#! /usr/bin/perl
# $Id: spamscan.in,v 1.8 2005/02/01 07:54:01 blarson Exp $
#
# Usage: spamscan
#
# Performs SpamAssassin checks on a message before allowing it through to
# the main incoming queue.
#
# Uses up: incoming/S.nn
# Temps:   incoming/R.nn
# Creates: incoming/I.nn
# Stop:    spamscan-stop

use warnings;
use strict;

use threads;
use threads::shared;

use Debbugs::Config qw(:config);
use Debbugs::Common qw(:lock);

use Mail::CrossAssassin;
use Socket;
use IO::Handle;
use IPC::Open2;

# Nothing to do unless spam scanning is enabled in the configuration.
exit unless $config{spam_scan};

chdir $config{spool_dir} or die "chdir spool: $!\n";

umask 002;

# Take the 'incoming-spamscan' lock; exit quietly if filelock() dies.
eval {
    filelock('incoming-spamscan');
};
exit if $@;

# State shared between the main thread, the cross thread and the sa threads.
my %spamseen : shared = ();             # message id => value reported by the helper
my @ids : shared = ();                  # queue of incoming ids waiting for an sa thread
my %fudged : shared = ();
my $spamscan_stop : shared = 0;         # 0 = run, 1 = sa threads stop, 2 = cross thread stops
my $cross_key : shared;                 # key handed from an sa thread to the cross thread
my @cross_return : shared;              # ca_set() results, indexed by requesting thread id
my $cross_tid : shared;                 # thread id of the sa thread that posted $cross_key
my $print_lock : shared;
my $assassinated_lock : shared;
my $crossassassinated_lock : shared;
my $threadsrunning : shared = 0;        # number of sa threads currently counted as running

# flush output immediately
$| = 1;

# lprint(STRING)
#
# Prints STRING to the currently selected handle while holding $print_lock,
# so output from different threads is not interleaved within one call.
sub lprint ($) {
    lock $print_lock;
    print $_[0];
}

my $user_prefs = "$ENV{HOME}/.spamassassin/user_prefs";
my $user_prefs_time;
if (-e $user_prefs) {
    # Remember the modification time so the main loop can notice changes.
    $user_prefs_time = (stat $user_prefs)[9];
}
else {
    die "$user_prefs not found";
}

# This thread handles the updating and querying of the crossassassin db.
#
# It initialises CrossAssassin with ca_init(), then loops until
# $spamscan_stop exceeds 1: it picks up the key an sa thread posted in
# $cross_key (with the poster's thread id in $cross_tid), passes it to
# ca_set(), and stores the result in $cross_return[<thread id>].
sub cross {
    ca_init('\b\d{3,8}(?:-(?:close|done|forwarded|maintonly|submitter|quiet|subscribe))?\@'.$config{email_domain},
            $config{spam_crossassassin_db});
    my $mytid = threads->self->tid();
  crosscheck:
    while ($spamscan_stop <= 1) {
        my ($ck, $ct);
        {
            # The lock is only taken when no key is pending; it is needed
            # for the cond_timedwait below and is released at block exit.
            lock $cross_key unless($cross_key);
            until ($cross_key) {
                last crosscheck if $spamscan_stop > 1;
                lprint "{$mytid} cross waiting\n";
                cond_timedwait $cross_key, (time() + 30);
            }
            last crosscheck if ($spamscan_stop > 1);
            # Take the request and free the slot for the next sa thread.
            $ck = $cross_key;
            $ct = $cross_tid;
            undef $cross_key;
        }
        unless ($ck) {
            lprint "{$mytid} Cross nothing\n";
            sleep 1;
            next crosscheck;
        }
        lprint "{$mytid} Cross{$ct}: $ck\n";
        {
            lock @cross_return;
            $cross_return[$ct] = ca_set($ck);
            cond_signal @cross_return;
        }
    }
}

# Multiple threads handle SpamAssassin.
#
# Each sa thread starts one /usr/lib/debbugs/spamscan-sa helper via open2()
# and then, until $spamscan_stop is set, repeatedly:
#   - takes an id from @ids and sends "<id>\n<queue length>\n" to the helper,
#   - reads the keys line and the message id line from the helper,
#   - hands the keys to the cross thread and waits for its result,
#   - sends "<ca_score>\n<seen>\n", reads a todo code and echoes it back,
#   - reads the new "seen" value and one line of output to log.
# Any failed read/write on the helper pipes ends the thread's loop.
sub sa {
    {
        lock $threadsrunning;
        $threadsrunning++;
    }
    my $mytid = threads->self->tid();
    # Stagger start-up; later threads wait longer.
    sleep $mytid + 3;
    if ($spamscan_stop) {
        # Undo the increment above so the counter stays accurate when the
        # thread gives up before doing any work.
        lock $threadsrunning;
        $threadsrunning--;
        return;
    }
    my ($sain, $saout);
    my $pid = open2($saout, $sain, "/usr/lib/debbugs/spamscan-sa");
    lprint "{$mytid} forked $pid\n";
    my $messages_handled=0;
  pp:
    until ($spamscan_stop) {
        my ($id, $nf);
        lprint "{$mytid} $messages_handled messages handled\n";
        $messages_handled++;
      getid:
        for (;;) {
            {
                lock @ids;
                # $nf is the queue length before this thread removes its id.
                $nf = @ids;
                $id = shift @ids;
                last getid if $nf;
                cond_timedwait @ids, (time() + 30);
                last pp if $spamscan_stop;
                $nf = @ids;
                $id = shift @ids;
                last getid if $nf;
            }
            lprint "{$mytid} Waiting for spam to process\n";
            sleep 1;
        }
        print $sain "$id\n$nf\n";
        lprint "{$mytid} $id is $nf\n";
        my $keys = <$saout>;
        unless (defined $keys) {
            lprint "{$mytid} Could not get keys: $!\n";
            last pp;
        }
        chomp $keys;
        my $messageid = <$saout>;
        unless (defined($messageid)) {
            lprint "{$mytid} Could not read messageid: $!\n";
            last pp;
        }
        chomp $messageid;
        lprint "{$mytid} $id $keys\n";
        my $ca_score;
        # Post the keys to the cross thread once its single slot is free.
      crosskey:
        for (;;) {
            {
                lock $cross_key;
                unless ($cross_key) {
                    $cross_tid = $mytid;
                    $cross_key = $keys;
                    cond_signal $cross_key;
                    last crosskey;
                }
            }
            lprint "{$mytid} zzz...\n";
            select undef, undef, undef, 0.1;
        }
        # Poll for the cross thread's answer in this thread's slot.
        # NOTE(review): a false ca_set() result would never be picked up
        # here -- confirm ca_set() always returns a true value.
      crossret:
        for (;;) {
            {
                lock @cross_return;
                if ($cross_return[$mytid]) {
                    $ca_score = $cross_return[$mytid];
                    undef $cross_return[$mytid];
                    last crossret;
                }
            }
            lprint "{$mytid} z z z...\n";
            select undef, undef, undef, 0.1;
        }
        lprint "{$mytid} $id: ca_score: $ca_score\n";
        my $seen = $spamseen{$messageid};
        $seen = '' unless $seen;
        unless(print $sain "$ca_score\n$seen\n") {
            lprint "{$mytid} Could not send ca_score: $!\n";
            last pp;
        }
        my $todo = <$saout>;
        unless (defined($todo)) {
            lprint "{$mytid} Could not read todo: $!\n";
            last pp;
        }
        chomp $todo;
        my $nseen;
        # For todo codes 1 and 2 the echo/reply exchange is done while
        # holding a shared lock, so only one thread at a time is in that
        # step (presumably the helper writes a shared file -- confirm
        # against spamscan-sa).
        if ($todo == 1) {
            lock $assassinated_lock;
            print $sain "$todo\n";
            $nseen = <$saout>;
        }
        elsif ($todo == 2) {
            lock $crossassassinated_lock;
            print $sain "$todo\n";
            $nseen = <$saout>;
        }
        else {
            print $sain "$todo\n";
            $nseen = <$saout>;
        }
        unless(defined($nseen)) {
            lprint "{$mytid} Could not read seen: $!\n";
            # This thread is about to exit; start a replacement if the
            # backlog warrants it and the thread limit allows it.
            start_sa() if (scalar(@ids) > ($threadsrunning * $config{spam_spams_per_thread})
                           && $threadsrunning < $config{spam_max_threads});
            last pp;
        }
        chomp $nseen;
        $spamseen{$messageid} = $nseen if ($nseen);
        my $out = <$saout>;
        unless(defined($out)) {
            lprint "{$mytid} Could not read out: $!\n";
            last pp;
        }
        chomp $out;
        # The helper sends its multi-line output with \r as line separator.
        $out =~ tr/\r/\n/;
        lprint $out;
    }
    {
        lock $threadsrunning;
        $threadsrunning--;
    }
    close $sain;
    close $saout;
    waitpid($pid,0);
}

my @sa_threads;

# start_sa()
#
# Creates one detached sa thread and records it in @sa_threads.
# Dies if the thread cannot be created.
sub start_sa {
    my $s = threads->create(\&sa)
        or die "Could not start sa threads: $!";
    $s->detach;
    push @sa_threads, $s;
}

my $cross_thread = threads->create(\&cross)
    or die "Could not start cross thread: $!";
$cross_thread->detach;
start_sa();
# start_sa;

my $stopafter = time() + $config{spam_keep_running};

# Main loop: refill @ids from incoming/S* when it is empty, start more sa
# threads when the backlog is large, and stop on any of the exit conditions.
for (;;) {
    # Re-arm the alarm on every pass of the loop.
    alarm 180;
    if (-f 'spamscan-stop') {
        lprint "spamscan-stop file created\n";
        last;
    }
    # A user_prefs file that can no longer be stat'ed counts as changed.
    my $current_prefs_time = (stat $user_prefs)[9];
    if (!defined $current_prefs_time || $user_prefs_time != $current_prefs_time) {
        # stop and wait to be re-invoked from cron
        lprint "File $user_prefs changed\n";
        last;
    }

    unless (@ids) {
        if (time() > $stopafter) {
            lprint "KeepRunning timer expired\n";
            last;
        }
        my @i;
        opendir my $dir, 'incoming' or die "opendir incoming: $!";
        while (defined(my $entry = readdir $dir)) {
            push @i, $1 if $entry =~ /^S(.*)/;
        }
        # Directory handles must be released with closedir, not close.
        closedir $dir;
        unless (@i) {
            lprint "No more spam to process\n";
            last;
        }
        # Order by the numeric part after the dot in the file name.
        @i = sort {(split(/\./,$a))[1] <=> (split(/\./,$b))[1]} @i;
        my $m = @i;
        lprint "Messages to process: $m\n";
        lock @ids;
        push @ids, @i;
        cond_broadcast @ids;
    }
    start_sa() if (scalar(@ids) > (($threadsrunning - 1) * $config{spam_spams_per_thread})
                   && $threadsrunning < $config{spam_max_threads});
    sleep 30;
}

alarm 180;

# wait for the SpamAssassin threads
$spamscan_stop = 1;
{
    lock @ids;
    cond_broadcast @ids;
}

while (my $t = shift @sa_threads) {
    my $tid = $t->tid;
    lprint "{} waiting for thread $tid\n";
    # Give each thread a bounded number of one-second waits to finish.
    my $max_wait = 60;
    while ($t->is_running and --$max_wait > 0) {
        sleep 1;
    }
    # $t->join;
}

# wait for the CrossAssassin thread
$spamscan_stop = 2;
{
    lprint "{} waiting for cross thread\n";
    # Post a dummy key so the cross thread wakes up and sees the stop flag.
    lock $cross_key;
    $cross_key = 1;
    cond_signal $cross_key;
}
my $max_wait = 60;
while ($cross_thread->is_running and --$max_wait > 0) {
    sleep 1;
}
#$cross_thread->join;

END {
    # Join whatever threads->list() still reports at exit.
    foreach my $thread (threads->list()) {
        $thread->join;
    }
}

unfilelock();

#exit 0;