2 # $Id: spamscan.in,v 1.8 2005/02/01 07:54:01 blarson Exp $
6 # Performs SpamAssassin checks on a message before allowing it through to
7 # the main incoming queue.
9 # Uses up: incoming/S<code><bugnum>.nn
10 # Temps: incoming/R.nn
11 # Creates: incoming/I.nn
17 use lib qw(/srv/bugs.debian.org/scripts);
22 use Debbugs::Config qw(:config);
24 use Debbugs::Common qw(:lock);
26 use Mail::CrossAssassin;
32 exit unless $config{spam_scan};
34 chdir $config{spool_dir} or die "chdir spool: $!\n";
39 filelock('incoming-spamscan');
43 my %spamseen : shared = ();
44 my @ids : shared = ();
45 my %fudged : shared = ();
46 my $spamscan_stop : shared = 0;
47 my $cross_key : shared;
48 my @cross_return : shared;
49 my $cross_tid : shared;
50 my $print_lock : shared;
51 my $assassinated_lock : shared;
52 my $crossassassinated_lock : shared;
53 my $threadsrunning : shared = 0;
55 # flush output immediately
63 my $user_prefs = "$ENV{HOME}/.spamassassin/user_prefs";
66 $user_prefs_time = (stat $user_prefs)[9];
68 die "$user_prefs not found";
71 # This thread handles the updating and querying of the crossassassin db
73 ca_init('\b\d{3,8}(?:-(?:close|done|forwarded|maintonly|submitter|quiet|subscribe))?\@'.$config{email_domain}, $config{spam_crossassassin_db});
74 my $mytid = threads->self->tid();
76 while ($spamscan_stop <= 1) {
79 lock $cross_key unless($cross_key);
81 last crosscheck if $spamscan_stop > 1;
82 lprint "{$mytid} cross waiting\n";
83 cond_timedwait $cross_key, (time() + 30);
85 last crosscheck if ($spamscan_stop > 1);
91 lprint "{$mytid} Cross nothing\n";
95 lprint "{$mytid} Cross{$ct}: $ck\n";
98 $cross_return[$ct] = ca_set($ck);
99 cond_signal @cross_return;
104 # multiple threads handle spamassassin
107 lock $threadsrunning;
110 my $mytid = threads->self->tid();
112 return if $spamscan_stop;
115 my $pid = open2($saout, $sain, "/srv/bugs.debian.org/scripts/spamscan-sa");
116 lprint "{$mytid} forked $pid\n";
117 my $messages_handled=0;
118 pp: until ($spamscan_stop) {
120 lprint "{$mytid} $messages_handled messages handled\n";
128 cond_timedwait @ids, (time() + 30);
129 last pp if $spamscan_stop;
134 lprint "{$mytid} Waiting for spam to process\n";
137 print $sain "$id\n$nf\n";
138 lprint "{$mytid} $id is $nf\n";
140 unless (defined $keys) {
141 lprint "{$mytid} Could not get keys: $!\n";
145 my $messageid = <$saout>;
146 unless (defined($messageid)) {
147 lprint "{$mytid} Could not read messageid: $!\n";
151 lprint "{$mytid} $id $keys\n";
156 unless ($cross_key) {
159 cond_signal $cross_key;
163 lprint "{$mytid} zzz...\n";
164 select undef, undef, undef, 0.1;
169 if ($cross_return[$mytid]) {
170 $ca_score = $cross_return[$mytid];
171 undef $cross_return[$mytid];
175 lprint "{$mytid} z z z...\n";
176 select undef, undef, undef, 0.1;
178 lprint "{$mytid} $id: ca_score: $ca_score\n";
179 my $seen = $spamseen{$messageid};
180 $seen = '' unless $seen;
181 unless(print $sain "$ca_score\n$seen\n") {
182 lprint "{$mytid} Could not send ca_score: $!\n";
186 unless (defined($todo)) {
187 lprint "{$mytid} Could not read todo: $!\n";
193 lock $assassinated_lock;
194 print $sain "$todo\n";
196 } elsif ($todo == 2) {
197 lock $crossassassinated_lock;
198 print $sain "$todo\n";
201 print $sain "$todo\n";
204 unless(defined($nseen)) {
205 lprint "{$mytid} Could not read seen: $!\n";
206 start_sa() if (scalar(@ids) > ($threadsrunning * $config{spam_spams_per_thread})
207 && $threadsrunning < $config{spam_max_threads});
211 $spamseen{$messageid} = $nseen if ($nseen);
213 unless(defined($out)) {
214 lprint "{$mytid} Could not read out: $!\n";
222 lock $threadsrunning;
232 my $s = threads->create(\&sa)
233 or die "Could not start sa threads: $!";
235 push @sa_threads, $s;
238 my $cross_thread = threads->create(\&cross)
239 or die "Could not start cross thread: $!";
240 $cross_thread->detach;
244 my $stopafter = time() + $config{spam_keep_running};
248 if (-f 'spamscan-stop') {
249 lprint "spamscan-stop file created\n";
252 if ($user_prefs_time != (stat $user_prefs)[9]) {
253 # stop and wait to be re-invoked from cron
254 lprint "File $user_prefs changed\n";
259 if (time() > $stopafter) {
260 lprint "KeepRunning timer expired\n";
264 opendir my $dir, 'incoming' or die "opendir incoming: $!";
265 while (defined($_ = readdir $dir)) {
266 push @i, $1 if /^S(.*)/;
270 lprint "No more spam to process\n";
273 @i = sort {(split(/\./,$a))[1] <=> (split(/\./,$b))[1]} @i;
275 lprint "Messages to process: $m\n";
280 start_sa if (scalar(@ids) > (($threadsrunning - 1) * $config{spam_spams_per_thread})
281 && $threadsrunning < $config{spam_max_threads});
287 # wait for the spamassasin threads
294 while (my $t = shift @sa_threads) {
296 lprint "{} waiting for thread $tid\n";
298 while ($t->is_running and --$max_wait > 0) {
304 # wait for the crossassasin thread
307 lprint "{} waiting for cross thread\n";
310 cond_signal $cross_key;
313 while ($cross_thread->is_running and --$max_wait > 0) {
316 #$cross_thread->join;
319 foreach my $thread (threads->list()){