From 8ec6e84eb4a811fcccba1a95e40983461ff59261 Mon Sep 17 00:00:00 2001 From: Blars Blarson Date: Sat, 8 Sep 2007 07:01:53 -0700 Subject: [PATCH] New version of spamscan that uses internal locking for assassinated/crossassassinated. Many changes including split of spamscan into spamscan and spamscan-sa. --- scripts/spamscan-sa | 155 +++++++++++++++++ scripts/spamscan.in | 406 +++++++++++++++++++++++++++++--------------- 2 files changed, 428 insertions(+), 133 deletions(-) create mode 100755 scripts/spamscan-sa diff --git a/scripts/spamscan-sa b/scripts/spamscan-sa new file mode 100755 index 0000000..3ee7682 --- /dev/null +++ b/scripts/spamscan-sa @@ -0,0 +1,155 @@ +#! /usr/bin/perl +# spamassassin handling split from spamscan +# + +# unfortunatly we can't use strict; + +use lib qw(/usr/lib/debbugs); +use Mail::CrossAssassin; + +use Mail::SpamAssassin; +use Mail::SpamAssassin::NoMailAudit; + +my $config_path = '/etc/debbugs'; +require "$config_path/config"; +# New versions of debbugs will not allow use in /etc/debbugs/config +use POSIX qw(strftime); +$gSpamMailbox = strftime($gSpamMailbox,gmtime); +$gCrossMailbox = strftime($gCrossMailbox,gmtime); + +umask 002; +$| = 1; +STDOUT->autoflush(1); + +sub header_or_empty ($$) { + my ($mail, $hdr) = @_; + my $value = $mail->get_header($hdr); + if (defined $value) { + chomp $value; + $value =~ tr/\n/\\n/; + return $value; + } + return ''; +} + +my $user_prefs = "$ENV{HOME}/.spamassassin/user_prefs"; + +my $spam = Mail::SpamAssassin->new({ + dont_copy_prefs => 1, + site_rules_filename => $gSpamRulesDir, + userprefs_filename => $user_prefs, + local_tests_only => ($gSpamLocalTestsOnly || 0), + debug => ($ENV{DEBBUGS_SPAM_DEBUG} || 0), +}); +$spam->compile_now(1); # use all user preferences + + while (my $id = ) { + chomp $id; + my $nf = or die "Could not read nf: $!"; + chomp $nf; + unless (rename "incoming/S$id", "incoming/R$id") { + die "Could not rename incoming/S$id: $!"; + } + my $out = "[$nf] $id scanning ...\n"; + open MESSAGE, "< incoming/R$id" or die "open incoming/R$id: $!"; + my @textarray; + # Kludge to work around Received: then From_ weirdness in receive; + # remove when receive is fixed? We may continue to need it for + # reprocessing old messages. + $textarray[0] = ; + if ($textarray[0] =~ /^Received:/) { + my $maybefrom = ; + if ($maybefrom =~ /^From /) { + $textarray[1] = $textarray[0]; + $textarray[0] = $maybefrom; + } else { + $textarray[1] = $maybefrom; + } + } + push @textarray, ; + close MESSAGE; + my $mail = Mail::SpamAssassin::NoMailAudit->new(data => \@textarray); + $mail->{noexit} = 1; + + my $messageid = header_or_empty($mail, 'Message-Id'); + $out .= " From: " . header_or_empty($mail, 'From') . "\n"; + $out .= " Subject: ". header_or_empty($mail, 'Subject') . "\n"; + $out .= " Date: " . header_or_empty($mail, 'Date') . "\n"; + $out .= " Message-Id: $messageid\n"; + my $keys = ca_keys($mail->get_body); + print "$keys\n$messageid\n" + or die "Could not send keys: $!"; + my $ca_score = or die "Could not read ca_score: $!"; + chomp $ca_score; + my $todo = 0; + my $seen = or die "Child could not read seen: $!"; + chomp $seen; + my $status; + my $nseen = $seen; + if ($seen) { + $todo = 1; + $out .= " spam $seen duplicate\n"; + } else { + $status = $spam->check($mail); + $status->rewrite_mail(); + + if ($status->is_spam()) { +# $mail->accept($gSpamMailbox); +# unlink "incoming/R$id" or warn "unlink incoming/R$id: $!"; + $todo = 1; + my $score = sprintf "%.1f/%.1f %d", + $status->get_hits(), $status->get_required_hits(), + $ca_score; + $out .= " spam $score\n"; + $nseen = $score; + } elsif ($status->get_hits() > 0 && $ca_score >= $gMaxCross) { +# $mail->accept($gCrossMailbox); +# unlink "incoming/R$id" or warn "unlink incoming/R$id: $!"; + $todo = 2; + my $score = sprintf "%.1f/%.1f %d", + $status->get_hits(), $status->get_required_hits(), $ca_score; + $out .= " spam $score\n"; + $nseen = $score; + } else { + open OUT, "> incoming/I$id" or die "open incoming/I$id: $!"; + my @headers = $mail->get_all_headers(); + if ($headers[0] =~ /^From /) { + my $from = $headers[0]; + $headers[0] = $headers[1]; + $headers[1] = $from; + } + print OUT join '', @headers or die "print incoming/I$id: $!"; + if ($ca_score > 1) { + print OUT "X-CrossAssassin-Score: $ca_score\n" + or die "print incoming/I$id: $!"; + } + print OUT "\n" or die "print incoming/I$id: $!"; + print OUT @{$mail->get_body()} or die "print incoming/I$id: $!"; + close OUT or die "close incoming/I$id: $!"; + unlink "incoming/R$id" or warn "unlink incoming/R$id: $!"; + $out .= sprintf " ok %.1f/%.1f %d\n", + $status->get_hits(), $status->get_required_hits(), + $ca_score; + } + } + print "$todo\n"; + my $x = ; + if ($todo) { + open OUT, '>>', ($todo == 1) ? $gSpamMailbox : $gCrossMailbox + or die "Could not open assassinated: $!"; + my @headers = $mail->get_all_headers(); + print OUT @headers + or die "print assassinated: $!"; + print OUT "\n" + or die "print assassinated: $!"; + foreach (@{$mail->get_body()}) { + s/^From />From /; + print OUT $_ + or die "print assassinated: $!"; + } + close OUT or die "Close assassinated: $!"; + } + $out =~ tr/\n/\r/; + print "$nseen\n$out\n"; + $status->finish() unless($seen); + } diff --git a/scripts/spamscan.in b/scripts/spamscan.in index 987778a..7e25a7b 100755 --- a/scripts/spamscan.in +++ b/scripts/spamscan.in @@ -1,5 +1,5 @@ -#! /usr/bin/perl -T -# $Id: spamscan.in,v 1.10 2005/07/22 21:37:31 don Exp $ +#! /usr/bin/perl +# $Id: spamscan.in,v 1.8 2005/02/01 07:54:01 blarson Exp $ # # Usage: spamscan # @@ -11,8 +11,19 @@ # Creates: incoming/I.nn # Stop: spamscan-stop -$config_path = '/etc/debbugs'; -$lib_path = '/usr/lib/debbugs'; +# unfortunatly we can't use strict; +use lib qw(/usr/lib/debbugs); +use threads; +use threads::shared; + +my $config_path = '/etc/debbugs'; +my $lib_path = '/usr/lib/debbugs'; +#use lib $lib_path; +#use lib "/usr/lib/debbugs"; +use Mail::CrossAssassin; +use Socket; +use IO::Handle; +use IPC::Open2; require "$config_path/config"; require "$lib_path/errorlib"; @@ -21,12 +32,6 @@ $ENV{PATH} = $lib_path . ':' . $ENV{PATH}; exit unless $gSpamScan; chdir $gSpoolDir or die "chdir spool: $!\n"; -push @INC, $lib_path; - -use Mail::SpamAssassin; - -use lib '/usr/lib/debbugs'; -use Mail::CrossAssassin; umask 002; @@ -35,156 +40,291 @@ eval { }; exit if $@; -ca_init('\b\d{3,8}(?:-(?:close|done|forwarded|maintonly|submitter|quiet))?\@bugs\.debian\.org', '/org/bugs.debian.org/CrossAssassinDb'); +my %spamseen : shared = (); +my @ids : shared = (); +my %fudged : shared = (); +my $spamscan_stop : shared = 0; +my $cross_key : shared; +my @cross_return : shared; +my $cross_tid : shared; +my $print_lock : shared; +my $assassinated_lock : shared; +my $crossassassinated_lock : shared; +my $threadsrunning : shared = 0; + +$| = 1; -my %spamseen = (); +sub lprint ($) { + lock $print_lock; + print $_[0]; +} my $user_prefs = "$ENV{HOME}/.spamassassin/user_prefs"; my $user_prefs_time; if (-e $user_prefs) { $user_prefs_time = (stat $user_prefs)[9]; +} else { + die "$user_prefs not found"; } -my $spam = Mail::SpamAssassin->new({ - dont_copy_prefs => 1, - site_rules_filename => $gSpamRulesDir, - userprefs_filename => $user_prefs, - local_tests_only => ($gSpamLocalTestsOnly || 0), -# debug => ($ENV{DEBBUGS_SPAM_DEBUG} || 0), -# check_mx_delay => 2, # bit of a hack until we have parallelization -}); -$spam->compile_now(1); # use all user preferences +# This thread handles the updating and querying of the crossassassin db +sub cross { + ca_init('\b\d{3,8}(?:-(?:close|done|forwarded|maintonly|submitter|quiet|subscribe))?\@bugs\.debian\.org', '/org/bugs.debian.org/CrossAssassinDb'); + my $mytid = threads->self->tid(); +crosscheck: + while ($spamscan_stop <= 1) { + my ($ck, $ct); + { + lock $cross_key unless($cross_key); + until ($cross_key) { + last crosscheck if $spamscan_stop > 1; + lprint "{$mytid} cross waiting\n"; + cond_timedwait $cross_key, (time() + 30); + } + last crosscheck if ($spamscan_stop > 1); + $ck = $cross_key; + $ct = $cross_tid; + undef $cross_key; + } + unless ($ck) { + lprint "{$mytid} Cross nothing\n"; + sleep 1; + next crosscheck; + } + lprint "{$mytid} Cross{$ct}: $ck\n"; + { + lock @cross_return; + $cross_return[$ct] = ca_set($ck); + cond_signal @cross_return; + } + } +} -$| = 1; +# multiple threads handle spamassassin +sub sa { + { + lock $threadsrunning; + $threadsrunning++; + } + my $mytid = threads->self->tid(); + sleep $mytid + 3; + return if $spamscan_stop; + my ($sain, $saout); -my @ids; -my %fudged; + my $pid = open2($saout, $sain, "spamscan-sa"); + lprint "{$mytid} forked $pid\n"; + my $messages_handled=0; +pp: until ($spamscan_stop) { + my ($id, $nf); + lprint "{$mytid} $messages_handled messages handled\n"; + $messages_handled++; +getid: for (;;) { + { + lock @ids; + $nf = @ids; + $id = shift @ids; + last getid if $nf; + cond_timedwait @ids, (time() + 30); + last pp if $spamscan_stop; + $nf = @ids; + $id = shift @ids; + last getid if $nf; + } + lprint "{$mytid} Waiting for spam to process\n"; + sleep 1; + } + print $sain "$id\n$nf\n"; + lprint "{$mytid} $id is $nf\n"; + my $keys; + unless ($keys = <$saout>) { + lprint "{$mytid} Could not get keys: $!\n"; + last pp; + } + chomp $keys; + my $messageid; + unless ($messageid = <$saout>) { + lprint "{$mytid} Could not read messageid: $!\n"; + last pp; + } + chomp $messageid; + lprint "{$mytid} $id $keys\n"; + my $ca_score; +crosskey: for (;;) { + { + lock $cross_key; + unless ($cross_key) { + $cross_tid = $mytid; + $cross_key = $keys; + cond_signal $cross_key; + last crosskey; + } + } + lprint "{$mytid} zzz...\n"; + select undef, undef, undef, 0.1; + } +crossret: for (;;) { + { + lock @cross_return; + if ($cross_return[$mytid]) { + $ca_score = $cross_return[$mytid]; + undef $cross_return[$mytid]; + last crossret; + } + } + lprint "{$mytid} z z z...\n"; + select undef, undef, undef, 0.1; + } + lprint "{$mytid} $id: ca_score: $ca_score\n"; + my $seen = $spamseen{$messageid}; + $seen = '' unless $seen; + unless(print $sain "$ca_score\n$seen\n") { + lprint "{$mytid} Could not send ca_score: $!\n"; + last pp; + } + my $todo; + unless ($todo = <$saout>) { + lprint "{$mytid} Could not read todo: $!\n"; + last pp; + } + chomp $todo; + my $nseen; + if ($todo == 1) { + lock $assassinated_lock; + print $sain "$todo\n"; + $nseen = <$saout>; + } elsif ($todo == 2) { + lock $crossassassinated_lock; + print $sain "$todo\n"; + $nseen = <$saout>; + } else { + print $sain "$todo\n"; + $nseen = <$saout>; + } + unless($nseen) { + lprint "{$mytid} Could not read seen: $!\n"; + start_sa if (scalar(@ids) > ($threadsrunning * $gSpamsPerThread) + && $threadsrunning < $gMaxThreads); + last pp; + } + chomp $nseen; + $spamseen{$messageid} = $nseen if ($nseen); + my $out; + unless($out = <$saout>) { + lprint "{$mytid} Could not read out: $!\n"; + last pp; + } + chomp $out; + $out =~ tr/\r/\n/; + lprint $out; + } + { + lock $threadsrunning; + $threadsrunning--; + } + close $sain; + close $saout; + waitpid($pid,0); +} -sub header_or_empty ($$) { - my ($mail, $hdr) = @_; - my $value = $mail->get_header($hdr); - if (defined $value) { - chomp $value; - return $value; - } - return ''; +my @sa_threads; +sub start_sa() { + my $s = threads->create(\&sa) + or die "Could not start sa threads: $!"; + $s->detach; + push @sa_threads, $s; } +$gKeepRunning = 3600 unless $gKeepRunning > 0; +$gSpamsPerThread = 200 unless defined($gSpamsPerThread); +$gMaxThreads = 20 unless defined($gMaxThreads); + +my $cross_thread = threads->create(\&cross) + or die "Could not start cross thread: $!"; +$cross_thread->detach; +start_sa; +# start_sa; + +my $stopafter = time() + $gKeepRunning; + for (;;) { + alarm 180; if (-f 'spamscan-stop') { - print "spamscan-stop file created\n"; + lprint "spamscan-stop file created\n"; last; } - if (-e $user_prefs) { - if ($user_prefs_time != (stat $user_prefs)[9]) { - # stop and wait to be re-invoked from cron - last; - } + if ($user_prefs_time != (stat $user_prefs)[9]) { + # stop and wait to be re-invoked from cron + lprint "File $user_prefs changed\n"; + last; } - if (!@ids) { + unless (@ids) { + if (time() > $stopafter) { + lprint "KeepRunning timer expired\n"; + last; + } + my @i; opendir DIR, 'incoming' or die "opendir incoming: $!"; while (defined($_ = readdir DIR)) { - push @ids, $1 if /^S(.*)/; + push @i, $1 if /^S(.*)/; } - last unless @ids; - @ids = sort @ids; - } - - my $nf = @ids; - my $id = shift @ids; - unless (rename "incoming/S$id", "incoming/R$id") { - if ($fudged{$id}) { - die "$id already fudged once! $!\n"; + unless (@i) { + lprint "No more spam to process\n"; + last; } - $fudged{$id} = 1; - next; + @i = sort {(split(/\./,$a))[1] <=> (split(/\./,$b))[1]} @i; + my $m = @i; + lprint "Messages to process: $m\n"; + lock @ids; + push @ids, @i; + cond_broadcast @ids; } + start_sa if (scalar(@ids) > (($threadsrunning - 1) * $gSpamsPerThread) + && $threadsrunning < $gMaxThreads); + sleep 30; +} - print "[$nf] $id scanning ...\n" or die "print log: $!"; - - open MESSAGE, "< incoming/R$id" or die "open incoming/R$id: $!"; - my @textarray; - # Kludge to work around Received: then From_ weirdness in receive; - # remove when receive is fixed? We may continue to need it for - # reprocessing old messages. - $textarray[0] = ; - if ($textarray[0] =~ /^Received:/) { - my $maybefrom = ; - if ($maybefrom =~ /^From /) { - $textarray[1] = $textarray[0]; - $textarray[0] = $maybefrom; - } else { - $textarray[1] = $maybefrom; - } - } - push @textarray, ; - close MESSAGE; - my $mail = $spam->parse(\@textarray); - - my $messageid = header_or_empty($mail, 'Message-Id'); - print " From: ", header_or_empty($mail, 'From'), "\n"; - print " Subject: ", header_or_empty($mail, 'Subject'), "\n"; - print " Date: ", header_or_empty($mail, 'Date'), "\n"; - print " Message-Id: $messageid\n"; - my $ca_score = ca_set(ca_keys($mail->get_body)); - if (exists $spamseen{$messageid}) { - # XXX THIS DOES NOT DO LOCKING - open OUT, ">> $gSpamMailbox" or die "open $gSpamMailbox failed: $!"; - print OUT $mail->get_pristine or die "print $gSpamMailbox failed: $!"; - close OUT or die "close $gSpamMailbox failed: $!"; - unlink "incoming/R$id" or warn "unlink incoming/R$id: $!"; - print " spam $spamseen{$messageid} duplicate\n" - or die "printf log: $!"; - } else { - my $status = $spam->check($mail); - my $munged_mail = $status->rewrite_mail(); - - if ($status->is_spam()) { - # XXX THIS DOES NOT DO LOCKING - open OUT, ">> $gSpamMailbox" or die "open $gSpamMailbox failed: $!"; - print OUT $munged_mail or die "print $gSpamMailbox failed: $!"; - close OUT or die "close $gSpamMailbox failed: $!"; - unlink "incoming/R$id" or warn "unlink incoming/R$id: $!"; - my $score = sprintf "%.1f/%.1f %d", - $status->get_score(), $status->get_required_score(), $ca_score; - print " spam $score\n" or die "print log: $!"; - $spamseen{$messageid} = $score; - } elsif ($status->get_score() > 0 && $ca_score >= 4) { - # XXX THIS DOES NOT DO LOCKING - open OUT, ">> $gCrossMailbox" or die "open $gCrossMailbox failed: $!"; - print OUT $munged_mail or die "print $gCrossMailbox failed: $!"; - close OUT or die "close $gCrossMailbox failed: $!"; - unlink "incoming/R$id" or warn "unlink incoming/R$id: $!"; - my $score = sprintf "%.1f/%.1f %d", - $status->get_score(), $status->get_required_score(), $ca_score; - printf " spam $score\n" or die "printf log: $!"; - $spamseen{$messageid} = $score; - } else { - open OUT, "> incoming/I$id" or die "open incoming/I$id: $!"; - my ($received,$from,$rest_of_message) = split /\n/, $munged_mail, 3; - my ($headers,$body) = split /\n\n/, $rest_of_message, 2; - if ($received =~ /^From /) { - ($received,$from) = ($from,$received); - } - print OUT map { "$_\n"} ($received,$from,$headers) or die "print incoming/I$id: $!"; - if ($ca_score > 1) { - print OUT "X-CrossAssassin-Score: $ca_score\n" - or die "print incoming/I$id: $!"; - } - print OUT "\n" or die "print incoming/I$id: $!"; - print OUT $body or die "print incoming/I$id: $!"; - close OUT or die "close incoming/I$id: $!"; - unlink "incoming/R$id" or warn "unlink incoming/R$id: $!"; - printf " ok %.1f/%.1f %d\n", - $status->get_score(), $status->get_required_score(), $ca_score - or die "printf log: $!"; - } +alarm 180; - $status->finish(); +# wait for the spamassasin threads +$spamscan_stop = 1; +{ + lock @ids; + cond_broadcast @ids; +} + +while (my $t = shift @sa_threads) { + my $tid = $t->tid; + lprint "{} waiting for thread $tid\n"; + my $max_wait = 60; + while ($t->is_running and --$max_wait > 0) { + sleep 1; } - $mail->finish; +# $t->join; +} + +# wait for the crossassasin thread +$spamscan_stop = 2; +{ + lprint "{} waiting for cross thread\n"; + lock $cross_key; + $cross_key = 1; + cond_signal $cross_key; +} +my $max_wait = 60; +while ($cross_thread->is_running and --$max_wait > 0) { + sleep 1; +} +#$cross_thread->join; + +END{ + foreach my $thread (threads->list()){ + $thread->join; + } } + &unfilelock; -exit 0; + + +#exit 0; -- 2.39.2