2 # $Id: spamscan.in,v 1.8 2005/02/01 07:54:01 blarson Exp $
6 # Performs SpamAssassin checks on a message before allowing it through to
7 # the main incoming queue.
9 # Uses up: incoming/S<code><bugnum>.nn
10 # Temps: incoming/R.nn
11 # Creates: incoming/I.nn
14 # unfortunately we can't use strict;
15 use lib qw(/usr/lib/debbugs);
19 my $config_path = '/etc/debbugs';
20 my $lib_path = '/usr/lib/debbugs';
22 #use lib "/usr/lib/debbugs";
23 use Mail::CrossAssassin;
28 require "$config_path/config";
29 require "$lib_path/errorlib";
# Startup: put the debbugs library directory at the front of PATH, leave
# early when spam scanning is disabled, move into the spool directory and
# declare the variables shared between threads.
30 $ENV{PATH} = $lib_path . ':' . $ENV{PATH};
# $gSpamScan and $gSpoolDir are not assigned in the visible lines;
# presumably they come from the config file required above -- TODO confirm.
32 exit unless $gSpamScan;
34 chdir $gSpoolDir or die "chdir spool: $!\n";
# filelock is defined outside this excerpt (presumably in errorlib).
39 &filelock('incoming-spamscan');
# Indexed by message id: a worker reads the entry before reporting to its
# helper and stores a new value afterwards.
43 my %spamseen : shared = ();
# Workers wait on this array with cond_timedwait, and its length drives the
# decision to start more workers.  Presumably the queue of message ids that
# still need scanning -- TODO confirm.
44 my @ids : shared = ();
# Not referenced in the visible lines.
45 my %fudged : shared = ();
# Stop flag: sa workers leave their loop as soon as it is true, while the
# cross thread keeps running until it is greater than 1.
46 my $spamscan_stop : shared = 0;
# Hand-off variable for the cross thread: it waits on this variable with
# cond_timedwait and other threads wake it with cond_signal.
47 my $cross_key : shared;
# ca_set results: written by the cross thread at index $ct and read by a
# worker at the index of its own thread id.
48 my @cross_return : shared;
# Not referenced in the visible lines; presumably the thread id of the
# worker that filled $cross_key -- TODO confirm.
49 my $cross_tid : shared;
# Not referenced in the visible lines; presumably serialises lprint
# output -- TODO confirm.
50 my $print_lock : shared;
# Locks taken by a worker while it sends $todo back to its helper; the
# second one is used in the $todo == 2 branch.
51 my $assassinated_lock : shared;
52 my $crossassassinated_lock : shared;
# Compared against $gMaxThreads (and scaled by $gSpamsPerThread) when
# deciding whether another worker should be started.
53 my $threadsrunning : shared = 0;
# SpamAssassin user preferences file under $HOME.
62 my $user_prefs = "$ENV{HOME}/.spamassassin/user_prefs";
# Element 9 of stat is the modification time; the same expression is
# evaluated again later to detect that the file has changed.
65 $user_prefs_time = (stat $user_prefs)[9];
# NOTE(review): the condition guarding this die is not visible in this
# excerpt.
67 die "$user_prefs not found";
70 # This thread handles the updating and querying of the crossassassin db
# NOTE(review): only part of this routine is visible in this excerpt (the
# sub header, the crosscheck label and the assignments of $ck and $ct are
# missing), so the notes below cover the visible lines only.
#
# ca_init and ca_set are not defined in this file (presumably they come from
# Mail::CrossAssassin).  ca_init is given a pattern matching 3-8 digit bug
# addresses at bugs.debian.org, with an optional -close/-done/... suffix,
# and the path of the database.
72 ca_init('\b\d{3,8}(?:-(?:close|done|forwarded|maintonly|submitter|quiet|subscribe))?\@bugs\.debian\.org', '/org/bugs.debian.org/CrossAssassinDb');
73 my $mytid = threads->self->tid();
# Keep running until the stop flag is raised above 1.
75 while ($spamscan_stop <= 1) {
# The lock is only requested while $cross_key is still empty.
78 lock $cross_key unless($cross_key);
80 last crosscheck if $spamscan_stop > 1;
81 lprint "{$mytid} cross waiting\n";
# cond_timedwait takes an absolute time, so the wait ends after at most
# 30 seconds even without a signal and the stop flag gets re-checked.
82 cond_timedwait $cross_key, (time() + 30);
84 last crosscheck if ($spamscan_stop > 1);
# Logged when there is no key to work on; the test itself is not visible.
90 lprint "{$mytid} Cross nothing\n";
# $ck and $ct are presumably copies of $cross_key and $cross_tid -- TODO
# confirm.
94 lprint "{$mytid} Cross{$ct}: $ck\n";
# Store the ca_set result in slot $ct and wake a thread waiting on
# @cross_return.
97 $cross_return[$ct] = ca_set($ck);
98 cond_signal @cross_return;
103 # multiple threads handle spamassassin
# NOTE(review): only part of this worker routine is visible in this excerpt;
# the sub header, several conditions and the assignments of $id, $nf and
# $nseen are missing.  The notes below describe the visible lines only.
#
# Each worker starts a "spamscan-sa" helper process and exchanges
# line-oriented messages with it: requests are printed to $sain and replies
# are read from $saout.
106 lock $threadsrunning;
109 my $mytid = threads->self->tid();
# Do not start any work when a stop has already been requested.
111 return if $spamscan_stop;
# open2 is not imported in the visible lines (presumably IPC::Open2).
114 my $pid = open2($saout, $sain, "spamscan-sa");
115 lprint "{$mytid} forked $pid\n";
116 my $messages_handled=0;
# Work loop, left as soon as the stop flag is set.
117 pp: until ($spamscan_stop) {
119 lprint "{$mytid} $messages_handled messages handled\n";
# Wait on @ids until signalled or until 30 seconds have passed (the timeout
# is an absolute time), then re-check the stop flag.
127 cond_timedwait @ids, (time() + 30);
128 last pp if $spamscan_stop;
133 lprint "{$mytid} Waiting for spam to process\n";
# Request: $id and $nf, one per line.
136 print $sain "$id\n$nf\n";
137 lprint "{$mytid} $id is $nf\n";
# First reply line: the keys.
139 unless ($keys = <$saout>) {
140 lprint "{$mytid} Could not get keys: $!\n";
# Second reply line: the message id, used below as the key into %spamseen.
145 unless ($messageid = <$saout>) {
146 lprint "{$mytid} Could not read messageid: $!\n";
150 lprint "{$mytid} $id $keys\n";
# Hand-off to the cross thread: taken only when $cross_key is empty, then
# the cross thread is woken.  Presumably $cross_key is filled in between
# these two lines -- TODO confirm.
155 unless ($cross_key) {
158 cond_signal $cross_key;
# Four-argument select with no handles acts as a 0.1 second sleep.
162 lprint "{$mytid} zzz...\n";
163 select undef, undef, undef, 0.1;
# Pick up the result stored in this worker's slot and clear the slot.
168 if ($cross_return[$mytid]) {
169 $ca_score = $cross_return[$mytid];
170 undef $cross_return[$mytid];
174 lprint "{$mytid} z z z...\n";
175 select undef, undef, undef, 0.1;
177 lprint "{$mytid} $id: ca_score: $ca_score\n";
# Earlier "seen" value for this message id, or the empty string when the
# stored value is missing or false.
178 my $seen = $spamseen{$messageid};
179 $seen = '' unless $seen;
# Send the score and the seen value to the helper, one per line.
180 unless(print $sain "$ca_score\n$seen\n") {
181 lprint "{$mytid} Could not send ca_score: $!\n";
# The helper answers with $todo.
185 unless ($todo = <$saout>) {
186 lprint "{$mytid} Could not read todo: $!\n";
# $todo is echoed back to the helper in every visible branch.  The first
# branch (its condition is not visible) holds $assassinated_lock while
# doing so, the $todo == 2 branch holds $crossassassinated_lock, and the
# remaining one takes no lock.
192 lock $assassinated_lock;
193 print $sain "$todo\n";
195 } elsif ($todo == 2) {
196 lock $crossassassinated_lock;
197 print $sain "$todo\n";
200 print $sain "$todo\n";
204 lprint "{$mytid} Could not read seen: $!\n";
# Start another worker when the length of @ids exceeds
# $threadsrunning * $gSpamsPerThread and the thread limit is not reached.
205 start_sa if (scalar(@ids) > ($threadsrunning * $gSpamsPerThread)
206 && $threadsrunning < $gMaxThreads);
# Remember the new seen value, but only when it is true.
210 $spamseen{$messageid} = $nseen if ($nseen);
212 unless($out = <$saout>) {
213 lprint "{$mytid} Could not read out: $!\n";
# NOTE(review): the statement protected by this lock is not visible;
# presumably the running-thread counter is adjusted here -- TODO confirm.
221 lock $threadsrunning;
# Start one more worker thread running sa, dying if it cannot be created,
# and remember it in @sa_threads so it can be waited for later.
# NOTE(review): the enclosing sub header is not visible; presumably this is
# the body of start_sa -- TODO confirm.
231 my $s = threads->create(\&sa)
232 or die "Could not start sa threads: $!";
234 push @sa_threads, $s;
# Main thread.  NOTE(review): loop headers, closing braces and several
# assignments are missing from this excerpt; the notes below describe the
# visible lines only.
#
# Defaults for the tunables: run for 3600 seconds unless $gKeepRunning is
# positive, 200 spams per thread and at most 20 threads unless defined.
237 $gKeepRunning = 3600 unless $gKeepRunning > 0;
238 $gSpamsPerThread = 200 unless defined($gSpamsPerThread);
239 $gMaxThreads = 20 unless defined($gMaxThreads);
# Start the cross thread and detach it.
241 my $cross_thread = threads->create(\&cross)
242 or die "Could not start cross thread: $!";
243 $cross_thread->detach;
# Absolute time after which the KeepRunning check below fires.
247 my $stopafter = time() + $gKeepRunning;
# Three conditions are checked and logged: a 'spamscan-stop' file exists,
# the user_prefs modification time differs from the one recorded at
# startup, or the KeepRunning time has passed.  What happens after each log
# line is not visible here.
251 if (-f 'spamscan-stop') {
252 lprint "spamscan-stop file created\n";
255 if ($user_prefs_time != (stat $user_prefs)[9]) {
256 # stop and wait to be re-invoked from cron
257 lprint "File $user_prefs changed\n";
262 if (time() > $stopafter) {
263 lprint "KeepRunning timer expired\n";
# Collect the entries of incoming/ whose names start with "S", keeping the
# part of the name after that letter.
267 opendir DIR, 'incoming' or die "opendir incoming: $!";
268 while (defined($_ = readdir DIR)) {
269 push @i, $1 if /^S(.*)/;
272 lprint "No more spam to process\n";
# Sort numerically on the second dot-separated field of each name.
275 @i = sort {(split(/\./,$a))[1] <=> (split(/\./,$b))[1]} @i;
277 lprint "Messages to process: $m\n";
# Start another worker when the length of @ids exceeds
# ($threadsrunning - 1) * $gSpamsPerThread and the thread limit is not
# reached.
282 start_sa if (scalar(@ids) > (($threadsrunning - 1) * $gSpamsPerThread)
283 && $threadsrunning < $gMaxThreads);
289 # wait for the spamassassin threads
296 while (my $t = shift @sa_threads) {
298 lprint "{} waiting for thread $tid\n";
# Poll while the thread is still running and the $max_wait counter (set
# outside the visible lines) has not run out.
300 while ($t->is_running and --$max_wait > 0) {
306 # wait for the crossassassin thread
309 lprint "{} waiting for cross thread\n";
# Wake the cross thread, which waits on $cross_key and re-checks the stop
# flag after waking.
312 cond_signal $cross_key;
315 while ($cross_thread->is_running and --$max_wait > 0) {
318 #$cross_thread->join;
# Iterate over the threads still known to the interpreter; the loop body is
# not visible in this excerpt.
321 foreach my $thread (threads->list()){