]> git.donarmstrong.com Git - debbugs.git/blob - scripts/spamscan
Fix default user for usertags
[debbugs.git] / scripts / spamscan
1 #! /usr/bin/perl
2 # $Id: spamscan.in,v 1.8 2005/02/01 07:54:01 blarson Exp $
3 #
4 # Usage: spamscan
5 #
6 # Performs SpamAssassin checks on a message before allowing it through to
7 # the main incoming queue.
8 #
9 # Uses up: incoming/S<code><bugnum>.nn
10 # Temps:   incoming/R.nn
11 # Creates: incoming/I.nn
12 # Stop:    spamscan-stop
13
14 use warnings;
15 use strict;
16
17 use lib qw(/srv/bugs.debian.org/scripts);
18
19 use threads;
20 use threads::shared;
21
22 use Debbugs::Config qw(:config);
23
24 use Debbugs::Common qw(:lock);
25
26 use Mail::CrossAssassin;
27 use Socket;
28 use IO::Handle;
29 use IPC::Open2;
30
31
32 exit unless $config{spam_scan};
33
34 chdir $config{spool_dir} or die "chdir spool: $!\n";
35
36 umask 002;
37
38 eval {
39     filelock('incoming-spamscan');
40 };
41 exit if $@;
42
43 my %spamseen : shared = ();
44 my @ids : shared = ();
45 my %fudged : shared = ();
46 my $spamscan_stop : shared = 0;
47 my $cross_key : shared;
48 my @cross_return : shared;
49 my $cross_tid : shared;
50 my $print_lock : shared;
51 my $assassinated_lock : shared;
52 my $crossassassinated_lock : shared;
53 my $threadsrunning : shared = 0;
54
55 # flush output immediately
56 $| = 1;
57
58 sub lprint ($) {
59     lock $print_lock;
60     print $_[0];
61 }
62
63 my $user_prefs = "$ENV{HOME}/.spamassassin/user_prefs";
64 my $user_prefs_time;
65 if (-e $user_prefs) {
66     $user_prefs_time = (stat $user_prefs)[9];
67 } else {
68     die "$user_prefs not found";
69 }
70
71 # This thread handles the updating and querying of the crossassassin db
72 sub cross {
73     ca_init('\b\d{3,8}(?:-(?:close|done|forwarded|maintonly|submitter|quiet|subscribe))?\@'.$config{email_domain}, $config{spam_crossassassin_db});
74     my $mytid = threads->self->tid();
75 crosscheck:
76     while ($spamscan_stop <= 1) {
77         my ($ck, $ct);
78         {
79             lock $cross_key unless($cross_key);
80             until ($cross_key) {
81                 last crosscheck if $spamscan_stop > 1;
82                 lprint "{$mytid} cross waiting\n";
83                 cond_timedwait $cross_key, (time() + 30);
84             }
85             last crosscheck if ($spamscan_stop > 1);
86             $ck = $cross_key;
87             $ct = $cross_tid;
88             undef $cross_key;
89         }
90         unless ($ck) {
91             lprint "{$mytid} Cross nothing\n";
92             sleep 1;
93             next crosscheck;
94         }
95         lprint "{$mytid} Cross{$ct}: $ck\n";
96         {
97             lock @cross_return;
98             $cross_return[$ct] = ca_set($ck);
99             cond_signal @cross_return;
100         }
101     }
102 }
103
104 # multiple threads handle spamassassin
105 sub sa {
106     {
107         lock $threadsrunning;
108         $threadsrunning++;
109     }
110     my $mytid = threads->self->tid();
111     sleep $mytid + 3;
112     return if $spamscan_stop;
113     my ($sain, $saout);
114
115     my $pid = open2($saout, $sain, "/srv/bugs.debian.org/scripts/spamscan-sa");
116         lprint "{$mytid} forked $pid\n";
117         my $messages_handled=0;
118 pp:     until ($spamscan_stop) {
119             my ($id, $nf);
120             lprint "{$mytid} $messages_handled messages handled\n";
121             $messages_handled++;
122 getid:      for (;;) {
123                 {
124                     lock @ids;
125                     $nf = @ids;
126                     $id = shift @ids;
127                     last getid if $nf;
128                     cond_timedwait @ids, (time() + 30);
129                     last pp if $spamscan_stop;
130                     $nf = @ids;
131                     $id = shift @ids;
132                     last getid if $nf;
133                 }
134                 lprint "{$mytid} Waiting for spam to process\n";
135                 sleep 1;
136             }
137             print $sain "$id\n$nf\n";
138             lprint "{$mytid} $id is $nf\n";
139             my $keys = <$saout>;
140             unless (defined $keys) {
141                 lprint "{$mytid} Could not get keys: $!\n";
142                 last pp;
143             }
144             chomp $keys;
145             my $messageid = <$saout>;
146             unless (defined($messageid)) {
147                 lprint "{$mytid} Could not read messageid: $!\n";
148                 last pp;
149             }
150             chomp $messageid;
151             lprint "{$mytid} $id $keys\n";
152             my $ca_score;
153 crosskey:   for (;;) {
154                 {
155                     lock $cross_key;
156                     unless ($cross_key) {
157                         $cross_tid = $mytid;
158                         $cross_key = $keys;
159                         cond_signal $cross_key;
160                         last crosskey;
161                     }
162                 }
163                 lprint "{$mytid} zzz...\n";
164                 select undef, undef, undef, 0.1;
165             }
166 crossret:   for (;;) {
167                 {
168                     lock @cross_return;
169                     if ($cross_return[$mytid]) {
170                         $ca_score = $cross_return[$mytid];
171                         undef $cross_return[$mytid];
172                         last crossret;
173                     }
174                 }
175                 lprint "{$mytid} z z z...\n";
176                 select undef, undef, undef, 0.1;
177             }
178             lprint "{$mytid} $id: ca_score: $ca_score\n";
179             my $seen = $spamseen{$messageid};
180             $seen = '' unless $seen;
181             unless(print $sain "$ca_score\n$seen\n") {
182                 lprint "{$mytid} Could not send ca_score: $!\n";
183                 last pp;
184             }
185             my $todo = <$saout>;
186             unless (defined($todo)) {
187                 lprint "{$mytid} Could not read todo: $!\n";
188                 last pp;
189             }
190             chomp $todo;
191             my $nseen;
192             if ($todo == 1) {
193                 lock $assassinated_lock;
194                 print $sain "$todo\n";
195                 $nseen = <$saout>;
196             } elsif ($todo == 2) {
197                 lock $crossassassinated_lock;
198                 print $sain "$todo\n";
199                 $nseen = <$saout>;
200             } else {
201                 print $sain "$todo\n";
202                 $nseen = <$saout>;
203             }
204             unless(defined($nseen)) {
205                 lprint "{$mytid} Could not read seen: $!\n";
206                 start_sa() if (scalar(@ids) > ($threadsrunning * $config{spam_spams_per_thread})
207                     && $threadsrunning < $config{spam_max_threads});
208                 last pp;
209             }
210             chomp $nseen;
211             $spamseen{$messageid} = $nseen if ($nseen);
212             my $out = <$saout>;
213             unless(defined($out)) {
214                 lprint "{$mytid} Could not read out: $!\n";
215                 last pp;
216             }
217             chomp $out;
218             $out =~ tr/\r/\n/;
219             lprint $out;
220         }
221         {
222             lock $threadsrunning;
223             $threadsrunning--;
224         }
225         close $sain;
226         close $saout;
227         waitpid($pid,0);
228 }
229
230 my @sa_threads;
231 sub start_sa {
232     my $s = threads->create(\&sa)
233         or die "Could not start sa threads: $!";
234     $s->detach;
235     push @sa_threads, $s;
236 }
237
238 my $cross_thread = threads->create(\&cross)
239     or die "Could not start cross thread: $!";
240 $cross_thread->detach;
241 start_sa;
242 # start_sa;
243
244 my $stopafter = time() + $config{spam_keep_running};
245
246 for (;;) {
247     alarm 180;
248     if (-f 'spamscan-stop') {
249         lprint "spamscan-stop file created\n";
250         last;
251     }
252     if ($user_prefs_time != (stat $user_prefs)[9]) {
253         # stop and wait to be re-invoked from cron
254         lprint "File $user_prefs changed\n";
255         last;
256     }
257
258     unless (@ids) {
259         if (time() > $stopafter) {
260             lprint "KeepRunning timer expired\n";
261             last;
262         }
263         my @i;
264         opendir my $dir, 'incoming' or die "opendir incoming: $!";
265         while (defined($_ = readdir $dir)) {
266             push @i, $1 if /^S(.*)/;
267         }
268         close $dir;
269         unless (@i) {
270             lprint "No more spam to process\n";
271             last;
272         }
273         @i = sort {(split(/\./,$a))[1] <=> (split(/\./,$b))[1]} @i;
274         my $m = @i;
275         lprint "Messages to process: $m\n";
276         lock @ids;
277         push @ids, @i;
278         cond_broadcast @ids;
279     }
280     start_sa if (scalar(@ids) > (($threadsrunning - 1) * $config{spam_spams_per_thread})
281                  && $threadsrunning < $config{spam_max_threads});
282     sleep 30;
283 }
284
285 alarm 180;
286
287 # wait for the spamassasin threads
288 $spamscan_stop = 1;
289 {
290     lock @ids;
291     cond_broadcast @ids;
292 }
293
294 while (my $t = shift @sa_threads) {
295     my $tid = $t->tid;
296     lprint "{} waiting for thread $tid\n";
297     my $max_wait = 60;
298     while ($t->is_running and --$max_wait > 0) {
299         sleep 1;
300     }
301 #    $t->join;
302 }
303
304 # wait for the crossassasin thread
305 $spamscan_stop = 2;
306 {
307     lprint "{} waiting for cross thread\n";
308     lock $cross_key;
309     $cross_key = 1;
310     cond_signal $cross_key;
311 }
312 my $max_wait = 60;
313 while ($cross_thread->is_running and --$max_wait > 0) {
314     sleep 1;
315 }
316 #$cross_thread->join;
317
318 END{
319    foreach my $thread (threads->list()){
320       $thread->join;
321    }
322 }
323
324 &unfilelock;
325
326
327
328 #exit 0;