]> git.donarmstrong.com Git - debbugs.git/blob - scripts/spamscan.in
add default package support
[debbugs.git] / scripts / spamscan.in
1 #! /usr/bin/perl
2 # $Id: spamscan.in,v 1.8 2005/02/01 07:54:01 blarson Exp $
3 #
4 # Usage: spamscan
5 #
6 # Performs SpamAssassin checks on a message before allowing it through to
7 # the main incoming queue.
8 #
9 # Uses up: incoming/S<code><bugnum>.nn
10 # Temps:   incoming/R.nn
11 # Creates: incoming/I.nn
12 # Stop:    spamscan-stop
13
14 # unfortunatly we can't use strict;
15 use lib qw(/usr/lib/debbugs);
16 use threads;
17 use threads::shared;
18
19 my $config_path = '/etc/debbugs';
20 my $lib_path = '/usr/lib/debbugs';
21 #use lib $lib_path;
22 #use lib "/usr/lib/debbugs";
23 use Mail::CrossAssassin;
24 use Socket;
25 use IO::Handle;
26 use IPC::Open2;
27
28 require "$config_path/config";
29 require "$lib_path/errorlib";
30 $ENV{PATH} = $lib_path . ':' . $ENV{PATH};
31
32 exit unless $gSpamScan;
33
34 chdir $gSpoolDir or die "chdir spool: $!\n";
35
36 umask 002;
37
38 eval {
39     &filelock('incoming-spamscan');
40 };
41 exit if $@;
42
43 my %spamseen : shared = ();
44 my @ids : shared = ();
45 my %fudged : shared = ();
46 my $spamscan_stop : shared = 0;
47 my $cross_key : shared;
48 my @cross_return : shared;
49 my $cross_tid : shared;
50 my $print_lock : shared;
51 my $assassinated_lock : shared;
52 my $crossassassinated_lock : shared;
53 my $threadsrunning : shared = 0;
54
55 $| = 1;
56
57 sub lprint ($) {
58     lock $print_lock;
59     print $_[0];
60 }
61
62 my $user_prefs = "$ENV{HOME}/.spamassassin/user_prefs";
63 my $user_prefs_time;
64 if (-e $user_prefs) {
65     $user_prefs_time = (stat $user_prefs)[9];
66 } else {
67     die "$user_prefs not found";
68 }
69
70 # This thread handles the updating and querying of the crossassassin db
71 sub cross {
72     ca_init('\b\d{3,8}(?:-(?:close|done|forwarded|maintonly|submitter|quiet|subscribe))?\@bugs\.debian\.org', '/org/bugs.debian.org/CrossAssassinDb');
73     my $mytid = threads->self->tid();
74 crosscheck:
75     while ($spamscan_stop <= 1) {
76         my ($ck, $ct);
77         {
78             lock $cross_key unless($cross_key);
79             until ($cross_key) {
80                 last crosscheck if $spamscan_stop > 1;
81                 lprint "{$mytid} cross waiting\n";
82                 cond_timedwait $cross_key, (time() + 30);
83             }
84             last crosscheck if ($spamscan_stop > 1);
85             $ck = $cross_key;
86             $ct = $cross_tid;
87             undef $cross_key;
88         }
89         unless ($ck) {
90             lprint "{$mytid} Cross nothing\n";
91             sleep 1;
92             next crosscheck;
93         }
94         lprint "{$mytid} Cross{$ct}: $ck\n";
95         {
96             lock @cross_return;
97             $cross_return[$ct] = ca_set($ck);
98             cond_signal @cross_return;
99         }
100     }
101 }
102
103 # multiple threads handle spamassassin
104 sub sa {
105     {
106         lock $threadsrunning;
107         $threadsrunning++;
108     }
109     my $mytid = threads->self->tid();
110     sleep $mytid + 3;
111     return if $spamscan_stop;
112     my ($sain, $saout);
113
114     my $pid = open2($saout, $sain, "spamscan-sa");
115         lprint "{$mytid} forked $pid\n";
116         my $messages_handled=0;
117 pp:     until ($spamscan_stop) {
118             my ($id, $nf);
119             lprint "{$mytid} $messages_handled messages handled\n";
120             $messages_handled++;
121 getid:      for (;;) {
122                 {
123                     lock @ids;
124                     $nf = @ids;
125                     $id = shift @ids;
126                     last getid if $nf;
127                     cond_timedwait @ids, (time() + 30);
128                     last pp if $spamscan_stop;
129                     $nf = @ids;
130                     $id = shift @ids;
131                     last getid if $nf;
132                 }
133                 lprint "{$mytid} Waiting for spam to process\n";
134                 sleep 1;
135             }
136             print $sain "$id\n$nf\n";
137             lprint "{$mytid} $id is $nf\n";
138             my $keys;
139             unless ($keys = <$saout>) {
140                 lprint "{$mytid} Could not get keys: $!\n";
141                 last pp;
142             }
143             chomp $keys;
144             my $messageid;
145             unless ($messageid = <$saout>) {
146                 lprint "{$mytid} Could not read messageid: $!\n";
147                 last pp;
148             }
149             chomp $messageid;
150             lprint "{$mytid} $id $keys\n";
151             my $ca_score;
152 crosskey:   for (;;) {
153                 {
154                     lock $cross_key;
155                     unless ($cross_key) {
156                         $cross_tid = $mytid;
157                         $cross_key = $keys;
158                         cond_signal $cross_key;
159                         last crosskey;
160                     }
161                 }
162                 lprint "{$mytid} zzz...\n";
163                 select undef, undef, undef, 0.1;
164             }
165 crossret:   for (;;) {
166                 {
167                     lock @cross_return;
168                     if ($cross_return[$mytid]) {
169                         $ca_score = $cross_return[$mytid];
170                         undef $cross_return[$mytid];
171                         last crossret;
172                     }
173                 }
174                 lprint "{$mytid} z z z...\n";
175                 select undef, undef, undef, 0.1;
176             }
177             lprint "{$mytid} $id: ca_score: $ca_score\n";
178             my $seen = $spamseen{$messageid};
179             $seen = '' unless $seen;
180             unless(print $sain "$ca_score\n$seen\n") {
181                 lprint "{$mytid} Could not send ca_score: $!\n";
182                 last pp;
183             }
184             my $todo;
185             unless ($todo = <$saout>) {
186                 lprint "{$mytid} Could not read todo: $!\n";
187                 last pp;
188             }
189             chomp $todo;
190             my $nseen;
191             if ($todo == 1) {
192                 lock $assassinated_lock;
193                 print $sain "$todo\n";
194                 $nseen = <$saout>;
195             } elsif ($todo == 2) {
196                 lock $crossassassinated_lock;
197                 print $sain "$todo\n";
198                 $nseen = <$saout>;
199             } else {
200                 print $sain "$todo\n";
201                 $nseen = <$saout>;
202             }
203             unless($nseen) {
204                 lprint "{$mytid} Could not read seen: $!\n";
205                 start_sa if (scalar(@ids) > ($threadsrunning * $gSpamsPerThread)
206                     && $threadsrunning < $gMaxThreads);
207                 last pp;
208             }
209             chomp $nseen;
210             $spamseen{$messageid} = $nseen if ($nseen);
211             my $out;
212             unless($out = <$saout>) {
213                 lprint "{$mytid} Could not read out: $!\n";
214                 last pp;
215             }
216             chomp $out;
217             $out =~ tr/\r/\n/;
218             lprint $out;
219         }
220         {
221             lock $threadsrunning;
222             $threadsrunning--;
223         }
224         close $sain;
225         close $saout;
226         waitpid($pid,0);
227 }
228
229 my @sa_threads;
230 sub start_sa() {
231     my $s = threads->create(\&sa)
232         or die "Could not start sa threads: $!";
233     $s->detach;
234     push @sa_threads, $s;
235 }
236
237 $gKeepRunning = 3600 unless $gKeepRunning > 0;
238 $gSpamsPerThread = 200 unless defined($gSpamsPerThread);
239 $gMaxThreads = 20 unless defined($gMaxThreads);
240
241 my $cross_thread = threads->create(\&cross)
242     or die "Could not start cross thread: $!";
243 $cross_thread->detach;
244 start_sa;
245 # start_sa;
246
247 my $stopafter = time() + $gKeepRunning;
248
249 for (;;) {
250     alarm 180;
251     if (-f 'spamscan-stop') {
252         lprint "spamscan-stop file created\n";
253         last;
254     }
255     if ($user_prefs_time != (stat $user_prefs)[9]) {
256         # stop and wait to be re-invoked from cron
257         lprint "File $user_prefs changed\n";
258         last;
259     }
260
261     unless (@ids) {
262         if (time() > $stopafter) {
263             lprint "KeepRunning timer expired\n";
264             last;
265         }
266         my @i;
267         opendir DIR, 'incoming' or die "opendir incoming: $!";
268         while (defined($_ = readdir DIR)) {
269             push @i, $1 if /^S(.*)/;
270         }
271         unless (@i) {
272             lprint "No more spam to process\n";
273             last;
274         }
275         @i = sort {(split(/\./,$a))[1] <=> (split(/\./,$b))[1]} @i;
276         my $m = @i;
277         lprint "Messages to process: $m\n";
278         lock @ids;
279         push @ids, @i;
280         cond_broadcast @ids;
281     }
282     start_sa if (scalar(@ids) > (($threadsrunning - 1) * $gSpamsPerThread)
283                  && $threadsrunning < $gMaxThreads);
284     sleep 30;
285 }
286
287 alarm 180;
288
289 # wait for the spamassasin threads
290 $spamscan_stop = 1;
291 {
292     lock @ids;
293     cond_broadcast @ids;
294 }
295
296 while (my $t = shift @sa_threads) {
297     my $tid = $t->tid;
298     lprint "{} waiting for thread $tid\n";
299     my $max_wait = 60;
300     while ($t->is_running and --$max_wait > 0) {
301         sleep 1;
302     }
303 #    $t->join;
304 }
305
306 # wait for the crossassasin thread
307 $spamscan_stop = 2;
308 {
309     lprint "{} waiting for cross thread\n";
310     lock $cross_key;
311     $cross_key = 1;
312     cond_signal $cross_key;
313 }
314 my $max_wait = 60;
315 while ($cross_thread->is_running and --$max_wait > 0) {
316     sleep 1;
317 }
318 #$cross_thread->join;
319
320 END{
321    foreach my $thread (threads->list()){
322       $thread->join;
323    }
324 }
325
326 &unfilelock;
327
328
329
330 #exit 0;