]> git.donarmstrong.com Git - debbugs.git/blob - scripts/spamscan
include function in instalsql for bin ver/src pkg linking
[debbugs.git] / scripts / spamscan
1 #! /usr/bin/perl
2 # $Id: spamscan.in,v 1.8 2005/02/01 07:54:01 blarson Exp $
3 #
4 # Usage: spamscan
5 #
6 # Performs SpamAssassin checks on a message before allowing it through to
7 # the main incoming queue.
8 #
9 # Uses up: incoming/S<code><bugnum>.nn
10 # Temps:   incoming/R.nn
11 # Creates: incoming/I.nn
12 # Stop:    spamscan-stop
13
14 use warnings;
15 use strict;
16
17 use threads;
18 use threads::shared;
19
20 use Debbugs::Config qw(:config);
21
22 use Debbugs::Common qw(:lock);
23
24 use Mail::CrossAssassin;
25 use Socket;
26 use IO::Handle;
27 use IPC::Open2;
28
29
30 exit unless $config{spam_scan};
31
32 chdir $config{spool_dir} or die "chdir spool: $!\n";
33
34 umask 002;
35
36 eval {
37     filelock('incoming-spamscan');
38 };
39 exit if $@;
40
41 my %spamseen : shared = ();
42 my @ids : shared = ();
43 my %fudged : shared = ();
44 my $spamscan_stop : shared = 0;
45 my $cross_key : shared;
46 my @cross_return : shared;
47 my $cross_tid : shared;
48 my $print_lock : shared;
49 my $assassinated_lock : shared;
50 my $crossassassinated_lock : shared;
51 my $threadsrunning : shared = 0;
52
53 # flush output immediately
54 $| = 1;
55
56 sub lprint ($) {
57     lock $print_lock;
58     print $_[0];
59 }
60
61 my $user_prefs = "$ENV{HOME}/.spamassassin/user_prefs";
62 my $user_prefs_time;
63 if (-e $user_prefs) {
64     $user_prefs_time = (stat $user_prefs)[9];
65 } else {
66     die "$user_prefs not found";
67 }
68
69 # This thread handles the updating and querying of the crossassassin db
70 sub cross {
71     ca_init('\b\d{3,8}(?:-(?:close|done|forwarded|maintonly|submitter|quiet|subscribe))?\@'.$config{email_domain}, $config{spam_crossassassin_db});
72     my $mytid = threads->self->tid();
73 crosscheck:
74     while ($spamscan_stop <= 1) {
75         my ($ck, $ct);
76         {
77             lock $cross_key unless($cross_key);
78             until ($cross_key) {
79                 last crosscheck if $spamscan_stop > 1;
80                 lprint "{$mytid} cross waiting\n";
81                 cond_timedwait $cross_key, (time() + 30);
82             }
83             last crosscheck if ($spamscan_stop > 1);
84             $ck = $cross_key;
85             $ct = $cross_tid;
86             undef $cross_key;
87         }
88         unless ($ck) {
89             lprint "{$mytid} Cross nothing\n";
90             sleep 1;
91             next crosscheck;
92         }
93         lprint "{$mytid} Cross{$ct}: $ck\n";
94         {
95             lock @cross_return;
96             $cross_return[$ct] = ca_set($ck);
97             cond_signal @cross_return;
98         }
99     }
100 }
101
102 # multiple threads handle spamassassin
103 sub sa {
104     {
105         lock $threadsrunning;
106         $threadsrunning++;
107     }
108     my $mytid = threads->self->tid();
109     sleep $mytid + 3;
110     return if $spamscan_stop;
111     my ($sain, $saout);
112
113     my $pid = open2($saout, $sain, "/usr/lib/debbugs/spamscan-sa");
114         lprint "{$mytid} forked $pid\n";
115         my $messages_handled=0;
116 pp:     until ($spamscan_stop) {
117             my ($id, $nf);
118             lprint "{$mytid} $messages_handled messages handled\n";
119             $messages_handled++;
120 getid:      for (;;) {
121                 {
122                     lock @ids;
123                     $nf = @ids;
124                     $id = shift @ids;
125                     last getid if $nf;
126                     cond_timedwait @ids, (time() + 30);
127                     last pp if $spamscan_stop;
128                     $nf = @ids;
129                     $id = shift @ids;
130                     last getid if $nf;
131                 }
132                 lprint "{$mytid} Waiting for spam to process\n";
133                 sleep 1;
134             }
135             print $sain "$id\n$nf\n";
136             lprint "{$mytid} $id is $nf\n";
137             my $keys = <$saout>;
138             unless (defined $keys) {
139                 lprint "{$mytid} Could not get keys: $!\n";
140                 last pp;
141             }
142             chomp $keys;
143             my $messageid = <$saout>;
144             unless (defined($messageid)) {
145                 lprint "{$mytid} Could not read messageid: $!\n";
146                 last pp;
147             }
148             chomp $messageid;
149             lprint "{$mytid} $id $keys\n";
150             my $ca_score;
151 crosskey:   for (;;) {
152                 {
153                     lock $cross_key;
154                     unless ($cross_key) {
155                         $cross_tid = $mytid;
156                         $cross_key = $keys;
157                         cond_signal $cross_key;
158                         last crosskey;
159                     }
160                 }
161                 lprint "{$mytid} zzz...\n";
162                 select undef, undef, undef, 0.1;
163             }
164 crossret:   for (;;) {
165                 {
166                     lock @cross_return;
167                     if ($cross_return[$mytid]) {
168                         $ca_score = $cross_return[$mytid];
169                         undef $cross_return[$mytid];
170                         last crossret;
171                     }
172                 }
173                 lprint "{$mytid} z z z...\n";
174                 select undef, undef, undef, 0.1;
175             }
176             lprint "{$mytid} $id: ca_score: $ca_score\n";
177             my $seen = $spamseen{$messageid};
178             $seen = '' unless $seen;
179             unless(print $sain "$ca_score\n$seen\n") {
180                 lprint "{$mytid} Could not send ca_score: $!\n";
181                 last pp;
182             }
183             my $todo = <$saout>;
184             unless (defined($todo)) {
185                 lprint "{$mytid} Could not read todo: $!\n";
186                 last pp;
187             }
188             chomp $todo;
189             my $nseen;
190             if ($todo == 1) {
191                 lock $assassinated_lock;
192                 print $sain "$todo\n";
193                 $nseen = <$saout>;
194             } elsif ($todo == 2) {
195                 lock $crossassassinated_lock;
196                 print $sain "$todo\n";
197                 $nseen = <$saout>;
198             } else {
199                 print $sain "$todo\n";
200                 $nseen = <$saout>;
201             }
202             unless(defined($nseen)) {
203                 lprint "{$mytid} Could not read seen: $!\n";
204                 start_sa() if (scalar(@ids) > ($threadsrunning * $config{spam_spams_per_thread})
205                     && $threadsrunning < $config{spam_max_threads});
206                 last pp;
207             }
208             chomp $nseen;
209             $spamseen{$messageid} = $nseen if ($nseen);
210             my $out = <$saout>;
211             unless(defined($out)) {
212                 lprint "{$mytid} Could not read out: $!\n";
213                 last pp;
214             }
215             chomp $out;
216             $out =~ tr/\r/\n/;
217             lprint $out;
218         }
219         {
220             lock $threadsrunning;
221             $threadsrunning--;
222         }
223         close $sain;
224         close $saout;
225         waitpid($pid,0);
226 }
227
228 my @sa_threads;
229 sub start_sa {
230     my $s = threads->create(\&sa)
231         or die "Could not start sa threads: $!";
232     $s->detach;
233     push @sa_threads, $s;
234 }
235
236 my $cross_thread = threads->create(\&cross)
237     or die "Could not start cross thread: $!";
238 $cross_thread->detach;
239 start_sa;
240 # start_sa;
241
242 my $stopafter = time() + $config{spam_keep_running};
243
244 for (;;) {
245     alarm 180;
246     if (-f 'spamscan-stop') {
247         lprint "spamscan-stop file created\n";
248         last;
249     }
250     if ($user_prefs_time != (stat $user_prefs)[9]) {
251         # stop and wait to be re-invoked from cron
252         lprint "File $user_prefs changed\n";
253         last;
254     }
255
256     unless (@ids) {
257         if (time() > $stopafter) {
258             lprint "KeepRunning timer expired\n";
259             last;
260         }
261         my @i;
262         opendir my $dir, 'incoming' or die "opendir incoming: $!";
263         while (defined($_ = readdir $dir)) {
264             push @i, $1 if /^S(.*)/;
265         }
266         close $dir;
267         unless (@i) {
268             lprint "No more spam to process\n";
269             last;
270         }
271         @i = sort {(split(/\./,$a))[1] <=> (split(/\./,$b))[1]} @i;
272         my $m = @i;
273         lprint "Messages to process: $m\n";
274         lock @ids;
275         push @ids, @i;
276         cond_broadcast @ids;
277     }
278     start_sa if (scalar(@ids) > (($threadsrunning - 1) * $config{spam_spams_per_thread})
279                  && $threadsrunning < $config{spam_max_threads});
280     sleep 30;
281 }
282
283 alarm 180;
284
285 # wait for the spamassasin threads
286 $spamscan_stop = 1;
287 {
288     lock @ids;
289     cond_broadcast @ids;
290 }
291
292 while (my $t = shift @sa_threads) {
293     my $tid = $t->tid;
294     lprint "{} waiting for thread $tid\n";
295     my $max_wait = 60;
296     while ($t->is_running and --$max_wait > 0) {
297         sleep 1;
298     }
299 #    $t->join;
300 }
301
302 # wait for the crossassasin thread
303 $spamscan_stop = 2;
304 {
305     lprint "{} waiting for cross thread\n";
306     lock $cross_key;
307     $cross_key = 1;
308     cond_signal $cross_key;
309 }
310 my $max_wait = 60;
311 while ($cross_thread->is_running and --$max_wait > 0) {
312     sleep 1;
313 }
314 #$cross_thread->join;
315
316 END{
317    foreach my $thread (threads->list()){
318       $thread->join;
319    }
320 }
321
322 &unfilelock;
323
324
325
326 #exit 0;