]> git.donarmstrong.com Git - debbugs.git/commitdiff
New version of spamscan that uses internal locking for
authorBlars Blarson <blarson@uvula>
Sat, 8 Sep 2007 14:01:53 +0000 (07:01 -0700)
committerBlars Blarson <blarson@uvula>
Sat, 8 Sep 2007 14:01:53 +0000 (07:01 -0700)
assassinated/crossassassinated.

Many changes including split of spamscan into spamscan and spamscan-sa.

scripts/spamscan-sa [new file with mode: 0755]
scripts/spamscan.in

diff --git a/scripts/spamscan-sa b/scripts/spamscan-sa
new file mode 100755 (executable)
index 0000000..3ee7682
--- /dev/null
@@ -0,0 +1,155 @@
+#! /usr/bin/perl
+# spamassassin handling split from spamscan
+#
+
+# unfortunatly we can't use strict;
+
+use lib qw(/usr/lib/debbugs);
+use Mail::CrossAssassin;
+
+use Mail::SpamAssassin;
+use Mail::SpamAssassin::NoMailAudit;
+
+my $config_path = '/etc/debbugs';
+require "$config_path/config";
+# New versions of debbugs will not allow use in /etc/debbugs/config
+use POSIX qw(strftime);
+$gSpamMailbox = strftime($gSpamMailbox,gmtime);
+$gCrossMailbox = strftime($gCrossMailbox,gmtime);
+
+umask 002;
+$| = 1;
+STDOUT->autoflush(1);
+
+sub header_or_empty ($$) {
+    my ($mail, $hdr) = @_;
+    my $value = $mail->get_header($hdr);
+    if (defined $value) {
+       chomp $value;
+       $value =~ tr/\n/\\n/;
+       return $value;
+    }
+    return '';
+}
+
+my $user_prefs = "$ENV{HOME}/.spamassassin/user_prefs";
+
+my $spam = Mail::SpamAssassin->new({
+        dont_copy_prefs => 1,
+        site_rules_filename => $gSpamRulesDir,
+        userprefs_filename => $user_prefs,
+               local_tests_only => ($gSpamLocalTestsOnly || 0),
+               debug => ($ENV{DEBBUGS_SPAM_DEBUG} || 0),
+});
+$spam->compile_now(1); # use all user preferences
+
+       while (my $id = <STDIN>) {
+           chomp $id;
+           my $nf = <STDIN> or die "Could not read nf: $!";
+           chomp $nf;
+           unless (rename "incoming/S$id", "incoming/R$id") {
+               die "Could not rename incoming/S$id: $!";
+           }
+           my $out = "[$nf] $id scanning ...\n";
+           open MESSAGE, "< incoming/R$id" or die "open incoming/R$id: $!";
+           my @textarray;
+           # Kludge to work around Received: then From_ weirdness in receive;
+           # remove when receive is fixed? We may continue to need it for
+           # reprocessing old messages.
+           $textarray[0] = <MESSAGE>;
+           if ($textarray[0] =~ /^Received:/) {
+               my $maybefrom = <MESSAGE>;
+               if ($maybefrom =~ /^From /) {
+                   $textarray[1] = $textarray[0];
+                   $textarray[0] = $maybefrom;
+               } else {
+                   $textarray[1] = $maybefrom;
+               }
+           }
+           push @textarray, <MESSAGE>;
+           close MESSAGE;
+           my $mail = Mail::SpamAssassin::NoMailAudit->new(data => \@textarray);
+           $mail->{noexit} = 1;
+
+           my $messageid = header_or_empty($mail, 'Message-Id');
+           $out .= "  From: " . header_or_empty($mail, 'From') . "\n";
+           $out .= "  Subject: ". header_or_empty($mail, 'Subject') . "\n";
+           $out .= "  Date: " . header_or_empty($mail, 'Date') . "\n";
+           $out .= "  Message-Id: $messageid\n";
+           my $keys = ca_keys($mail->get_body);
+           print  "$keys\n$messageid\n"
+               or die "Could not send keys: $!";
+           my $ca_score = <STDIN> or die "Could not read ca_score: $!";
+           chomp $ca_score;
+           my $todo = 0;
+           my $seen = <STDIN> or die "Child could not read seen: $!";
+           chomp $seen;
+           my $status;
+           my $nseen = $seen;
+           if ($seen) {
+               $todo = 1;
+               $out .= "  spam $seen duplicate\n";
+           } else {
+               $status = $spam->check($mail);
+               $status->rewrite_mail();
+
+               if ($status->is_spam()) {
+#                  $mail->accept($gSpamMailbox);
+#                  unlink "incoming/R$id" or warn "unlink incoming/R$id: $!";
+                   $todo = 1;
+                   my $score = sprintf "%.1f/%.1f %d",
+                           $status->get_hits(), $status->get_required_hits(),
+                           $ca_score;
+                   $out .= "  spam $score\n";
+                   $nseen = $score;
+               } elsif ($status->get_hits() > 0 && $ca_score >= $gMaxCross) {
+#                  $mail->accept($gCrossMailbox);
+#                  unlink "incoming/R$id" or warn "unlink incoming/R$id: $!";
+                   $todo = 2;
+                   my $score = sprintf "%.1f/%.1f %d",
+                   $status->get_hits(), $status->get_required_hits(), $ca_score;
+                   $out .= "  spam $score\n";
+                   $nseen = $score;
+               } else {
+                   open OUT, "> incoming/I$id" or die "open incoming/I$id: $!";
+                   my @headers = $mail->get_all_headers();
+                   if ($headers[0] =~ /^From /) {
+                       my $from = $headers[0];
+                       $headers[0] = $headers[1];
+                       $headers[1] = $from;
+                   }
+                   print OUT join '', @headers or die "print incoming/I$id: $!";
+                   if ($ca_score > 1) {
+                       print OUT "X-CrossAssassin-Score: $ca_score\n"
+                           or die "print incoming/I$id: $!";
+                   }
+                   print OUT "\n" or die "print incoming/I$id: $!";
+                   print OUT @{$mail->get_body()} or die "print incoming/I$id: $!";
+                   close OUT or die "close incoming/I$id: $!";
+                   unlink "incoming/R$id" or warn "unlink incoming/R$id: $!";
+                   $out .= sprintf "  ok %.1f/%.1f %d\n",
+                           $status->get_hits(), $status->get_required_hits(),
+                           $ca_score;
+               }
+           }
+           print "$todo\n";
+           my $x = <STDIN>;
+           if ($todo) {
+               open OUT, '>>', ($todo == 1) ? $gSpamMailbox : $gCrossMailbox
+                   or die "Could not open assassinated: $!";
+               my @headers = $mail->get_all_headers();
+               print OUT @headers
+                   or die "print assassinated: $!";
+               print OUT "\n"
+                   or die "print assassinated: $!";
+               foreach (@{$mail->get_body()}) {
+                   s/^From />From /;
+                   print OUT $_
+                       or die "print assassinated: $!";
+               }
+               close OUT or die "Close assassinated: $!";
+           }
+           $out =~ tr/\n/\r/;
+           print "$nseen\n$out\n";
+           $status->finish() unless($seen);
+       }
index 987778a568eb2d364608b7e57271cb981897e1fe..7e25a7bf4438183f27201e90f87e67d4f7036bde 100755 (executable)
@@ -1,5 +1,5 @@
-#! /usr/bin/perl -T
-# $Id: spamscan.in,v 1.10 2005/07/22 21:37:31 don Exp $
+#! /usr/bin/perl
+# $Id: spamscan.in,v 1.8 2005/02/01 07:54:01 blarson Exp $
 #
 # Usage: spamscan
 #
 # Creates: incoming/I.nn
 # Stop:    spamscan-stop
 
-$config_path = '/etc/debbugs';
-$lib_path = '/usr/lib/debbugs';
+# unfortunatly we can't use strict;
+use lib qw(/usr/lib/debbugs);
+use threads;
+use threads::shared;
+
+my $config_path = '/etc/debbugs';
+my $lib_path = '/usr/lib/debbugs';
+#use lib $lib_path;
+#use lib "/usr/lib/debbugs";
+use Mail::CrossAssassin;
+use Socket;
+use IO::Handle;
+use IPC::Open2;
 
 require "$config_path/config";
 require "$lib_path/errorlib";
@@ -21,12 +32,6 @@ $ENV{PATH} = $lib_path . ':' . $ENV{PATH};
 exit unless $gSpamScan;
 
 chdir $gSpoolDir or die "chdir spool: $!\n";
-push @INC, $lib_path;
-
-use Mail::SpamAssassin;
-
-use lib '/usr/lib/debbugs';
-use Mail::CrossAssassin;
 
 umask 002;
 
@@ -35,156 +40,291 @@ eval {
 };
 exit if $@;
 
-ca_init('\b\d{3,8}(?:-(?:close|done|forwarded|maintonly|submitter|quiet))?\@bugs\.debian\.org', '/org/bugs.debian.org/CrossAssassinDb');
+my %spamseen : shared = ();
+my @ids : shared = ();
+my %fudged : shared = ();
+my $spamscan_stop : shared = 0;
+my $cross_key : shared;
+my @cross_return : shared;
+my $cross_tid : shared;
+my $print_lock : shared;
+my $assassinated_lock : shared;
+my $crossassassinated_lock : shared;
+my $threadsrunning : shared = 0;
+
+$| = 1;
 
-my %spamseen = ();
+sub lprint ($) {
+    lock $print_lock;
+    print $_[0];
+}
 
 my $user_prefs = "$ENV{HOME}/.spamassassin/user_prefs";
 my $user_prefs_time;
 if (-e $user_prefs) {
     $user_prefs_time = (stat $user_prefs)[9];
+} else {
+    die "$user_prefs not found";
 }
 
-my $spam = Mail::SpamAssassin->new({
-    dont_copy_prefs => 1,
-    site_rules_filename => $gSpamRulesDir,
-    userprefs_filename => $user_prefs,
-    local_tests_only => ($gSpamLocalTestsOnly || 0),
-#    debug => ($ENV{DEBBUGS_SPAM_DEBUG} || 0),
-#    check_mx_delay => 2, # bit of a hack until we have parallelization
-});
-$spam->compile_now(1); # use all user preferences
+# This thread handles the updating and querying of the crossassassin db
+sub cross {
+    ca_init('\b\d{3,8}(?:-(?:close|done|forwarded|maintonly|submitter|quiet|subscribe))?\@bugs\.debian\.org', '/org/bugs.debian.org/CrossAssassinDb');
+    my $mytid = threads->self->tid();
+crosscheck:
+    while ($spamscan_stop <= 1) {
+       my ($ck, $ct);
+       {
+           lock $cross_key unless($cross_key);
+           until ($cross_key) {
+               last crosscheck if $spamscan_stop > 1;
+               lprint "{$mytid} cross waiting\n";
+               cond_timedwait $cross_key, (time() + 30);
+           }
+           last crosscheck if ($spamscan_stop > 1);
+           $ck = $cross_key;
+           $ct = $cross_tid;
+           undef $cross_key;
+       }
+       unless ($ck) {
+           lprint "{$mytid} Cross nothing\n";
+           sleep 1;
+           next crosscheck;
+       }
+       lprint "{$mytid} Cross{$ct}: $ck\n";
+       {
+           lock @cross_return;
+           $cross_return[$ct] = ca_set($ck);
+           cond_signal @cross_return;
+       }
+    }
+}
 
-$| = 1;
+# multiple threads handle spamassassin
+sub sa {
+    {
+       lock $threadsrunning;
+       $threadsrunning++;
+    }
+    my $mytid = threads->self->tid();
+    sleep $mytid + 3;
+    return if $spamscan_stop;
+    my ($sain, $saout);
 
-my @ids;
-my %fudged;
+    my $pid = open2($saout, $sain, "spamscan-sa");
+       lprint "{$mytid} forked $pid\n";
+       my $messages_handled=0;
+pp:    until ($spamscan_stop) {
+           my ($id, $nf);
+           lprint "{$mytid} $messages_handled messages handled\n";
+           $messages_handled++;
+getid:     for (;;) {
+               {
+                   lock @ids;
+                   $nf = @ids;
+                   $id = shift @ids;
+                   last getid if $nf;
+                   cond_timedwait @ids, (time() + 30);
+                   last pp if $spamscan_stop;
+                   $nf = @ids;
+                   $id = shift @ids;
+                   last getid if $nf;
+               }
+               lprint "{$mytid} Waiting for spam to process\n";
+               sleep 1;
+           }
+           print $sain "$id\n$nf\n";
+           lprint "{$mytid} $id is $nf\n";
+           my $keys;
+           unless ($keys = <$saout>) {
+               lprint "{$mytid} Could not get keys: $!\n";
+               last pp;
+           }
+           chomp $keys;
+           my $messageid;
+           unless ($messageid = <$saout>) {
+               lprint "{$mytid} Could not read messageid: $!\n";
+               last pp;
+           }
+           chomp $messageid;
+           lprint "{$mytid} $id $keys\n";
+           my $ca_score;
+crosskey:   for (;;) {
+               {
+                   lock $cross_key;
+                   unless ($cross_key) {
+                       $cross_tid = $mytid;
+                       $cross_key = $keys;
+                       cond_signal $cross_key;
+                       last crosskey;
+                   }
+               }
+               lprint "{$mytid} zzz...\n";
+               select undef, undef, undef, 0.1;
+           }
+crossret:   for (;;) {
+               {
+                   lock @cross_return;
+                   if ($cross_return[$mytid]) {
+                       $ca_score = $cross_return[$mytid];
+                       undef $cross_return[$mytid];
+                       last crossret;
+                   }
+               }
+               lprint "{$mytid} z z z...\n";
+               select undef, undef, undef, 0.1;
+           }
+           lprint "{$mytid} $id: ca_score: $ca_score\n";
+           my $seen = $spamseen{$messageid};
+           $seen = '' unless $seen;
+           unless(print $sain "$ca_score\n$seen\n") {
+               lprint "{$mytid} Could not send ca_score: $!\n";
+               last pp;
+           }
+           my $todo;
+           unless ($todo = <$saout>) {
+               lprint "{$mytid} Could not read todo: $!\n";
+               last pp;
+           }
+           chomp $todo;
+           my $nseen;
+           if ($todo == 1) {
+               lock $assassinated_lock;
+               print $sain "$todo\n";
+               $nseen = <$saout>;
+           } elsif ($todo == 2) {
+               lock $crossassassinated_lock;
+               print $sain "$todo\n";
+               $nseen = <$saout>;
+           } else {
+               print $sain "$todo\n";
+               $nseen = <$saout>;
+           }
+           unless($nseen) {
+               lprint "{$mytid} Could not read seen: $!\n";
+               start_sa if (scalar(@ids) > ($threadsrunning * $gSpamsPerThread)
+                   && $threadsrunning < $gMaxThreads);
+               last pp;
+           }
+           chomp $nseen;
+           $spamseen{$messageid} = $nseen if ($nseen);
+           my $out;
+           unless($out = <$saout>) {
+               lprint "{$mytid} Could not read out: $!\n";
+               last pp;
+           }
+           chomp $out;
+           $out =~ tr/\r/\n/;
+           lprint $out;
+       }
+       {
+           lock $threadsrunning;
+           $threadsrunning--;
+       }
+        close $sain;
+        close $saout;
+       waitpid($pid,0);
+}
 
-sub header_or_empty ($$) {
-    my ($mail, $hdr) = @_;
-    my $value = $mail->get_header($hdr);
-    if (defined $value) {
-       chomp $value;
-       return $value;
-    }
-    return '';
+my @sa_threads;
+sub start_sa() {
+    my $s = threads->create(\&sa)
+       or die "Could not start sa threads: $!";
+    $s->detach;
+    push @sa_threads, $s;
 }
 
+$gKeepRunning = 3600 unless $gKeepRunning > 0;
+$gSpamsPerThread = 200 unless defined($gSpamsPerThread);
+$gMaxThreads = 20 unless defined($gMaxThreads);
+
+my $cross_thread = threads->create(\&cross)
+    or die "Could not start cross thread: $!";
+$cross_thread->detach;
+start_sa;
+# start_sa;
+
+my $stopafter = time() + $gKeepRunning;
+
 for (;;) {
+    alarm 180;
     if (-f 'spamscan-stop') {
-       print "spamscan-stop file created\n";
+       lprint "spamscan-stop file created\n";
        last;
     }
-    if (-e $user_prefs) {
-       if ($user_prefs_time != (stat $user_prefs)[9]) {
-           # stop and wait to be re-invoked from cron
-           last;
-       }
+    if ($user_prefs_time != (stat $user_prefs)[9]) {
+       # stop and wait to be re-invoked from cron
+       lprint "File $user_prefs changed\n";
+       last;
     }
 
-    if (!@ids) {
+    unless (@ids) {
+       if (time() > $stopafter) {
+           lprint "KeepRunning timer expired\n";
+           last;
+       }
+        my @i;
        opendir DIR, 'incoming' or die "opendir incoming: $!";
        while (defined($_ = readdir DIR)) {
-           push @ids, $1 if /^S(.*)/;
+           push @i, $1 if /^S(.*)/;
        }
-       last unless @ids;
-       @ids = sort @ids;
-    }
-
-    my $nf = @ids;
-    my $id = shift @ids;
-    unless (rename "incoming/S$id", "incoming/R$id") {
-       if ($fudged{$id}) {
-           die "$id already fudged once! $!\n";
+       unless (@i) {
+           lprint "No more spam to process\n";
+           last;
        }
-       $fudged{$id} = 1;
-       next;
+       @i = sort {(split(/\./,$a))[1] <=> (split(/\./,$b))[1]} @i;
+       my $m = @i;
+       lprint "Messages to process: $m\n";
+       lock @ids;
+       push @ids, @i;
+       cond_broadcast @ids;
     }
+    start_sa if (scalar(@ids) > (($threadsrunning - 1) * $gSpamsPerThread)
+                && $threadsrunning < $gMaxThreads);
+    sleep 30;
+}
 
-    print "[$nf] $id scanning ...\n" or die "print log: $!";
-
-    open MESSAGE, "< incoming/R$id" or die "open incoming/R$id: $!";
-    my @textarray;
-    # Kludge to work around Received: then From_ weirdness in receive;
-    # remove when receive is fixed? We may continue to need it for
-    # reprocessing old messages.
-    $textarray[0] = <MESSAGE>;
-    if ($textarray[0] =~ /^Received:/) {
-       my $maybefrom = <MESSAGE>;
-       if ($maybefrom =~ /^From /) {
-           $textarray[1] = $textarray[0];
-           $textarray[0] = $maybefrom;
-       } else {
-           $textarray[1] = $maybefrom;
-       }
-    }
-    push @textarray, <MESSAGE>;
-    close MESSAGE;
-    my $mail = $spam->parse(\@textarray);
-
-    my $messageid = header_or_empty($mail, 'Message-Id');
-    print "  From: ", header_or_empty($mail, 'From'), "\n";
-    print "  Subject: ", header_or_empty($mail, 'Subject'), "\n";
-    print "  Date: ", header_or_empty($mail, 'Date'), "\n";
-    print "  Message-Id: $messageid\n";
-    my $ca_score = ca_set(ca_keys($mail->get_body));
-    if (exists $spamseen{$messageid}) {
-       # XXX THIS DOES NOT DO LOCKING
-       open  OUT, ">> $gSpamMailbox" or die "open $gSpamMailbox failed: $!";
-       print OUT $mail->get_pristine or die "print $gSpamMailbox failed: $!";
-       close OUT or die "close $gSpamMailbox failed: $!";
-       unlink "incoming/R$id" or warn "unlink incoming/R$id: $!";
-       print "  spam $spamseen{$messageid} duplicate\n"
-           or die "printf log: $!";
-    } else {
-       my $status = $spam->check($mail);
-       my $munged_mail = $status->rewrite_mail();
-
-       if ($status->is_spam()) {
-           # XXX THIS DOES NOT DO LOCKING
-           open OUT, ">> $gSpamMailbox" or die "open $gSpamMailbox failed: $!";
-           print OUT $munged_mail or die "print $gSpamMailbox failed: $!";
-           close OUT  or die "close $gSpamMailbox failed: $!";
-           unlink "incoming/R$id" or warn "unlink incoming/R$id: $!";
-           my $score = sprintf "%.1f/%.1f %d",
-               $status->get_score(), $status->get_required_score(), $ca_score;
-           print "  spam $score\n" or die "print log: $!";
-           $spamseen{$messageid} = $score;
-       } elsif ($status->get_score() > 0 && $ca_score >= 4) {
-           # XXX THIS DOES NOT DO LOCKING
-           open OUT, ">> $gCrossMailbox" or die "open $gCrossMailbox failed: $!";
-           print OUT $munged_mail or die "print $gCrossMailbox failed: $!";
-           close OUT  or die "close $gCrossMailbox failed: $!";
-           unlink "incoming/R$id" or warn "unlink incoming/R$id: $!";
-           my $score = sprintf "%.1f/%.1f %d",
-               $status->get_score(), $status->get_required_score(), $ca_score;
-           printf "  spam $score\n" or die "printf log: $!";
-           $spamseen{$messageid} = $score;
-       } else {
-           open OUT, "> incoming/I$id" or die "open incoming/I$id: $!";
-           my ($received,$from,$rest_of_message) = split /\n/, $munged_mail, 3;
-           my ($headers,$body) = split /\n\n/, $rest_of_message, 2;
-           if ($received =~ /^From /) {
-               ($received,$from) = ($from,$received);
-           }
-           print OUT map { "$_\n"} ($received,$from,$headers) or die "print incoming/I$id: $!";
-           if ($ca_score > 1) {
-               print OUT "X-CrossAssassin-Score: $ca_score\n"
-                   or die "print incoming/I$id: $!";
-           }
-           print OUT "\n" or die "print incoming/I$id: $!";
-           print OUT $body or die "print incoming/I$id: $!";
-           close OUT or die "close incoming/I$id: $!";
-           unlink "incoming/R$id" or warn "unlink incoming/R$id: $!";
-           printf "  ok %.1f/%.1f %d\n",
-               $status->get_score(), $status->get_required_score(), $ca_score
-               or die "printf log: $!";
-       }
+alarm 180;
 
-       $status->finish();
+# wait for the spamassasin threads
+$spamscan_stop = 1;
+{
+    lock @ids;
+    cond_broadcast @ids;
+}
+
+while (my $t = shift @sa_threads) {
+    my $tid = $t->tid;
+    lprint "{} waiting for thread $tid\n";
+    my $max_wait = 60;
+    while ($t->is_running and --$max_wait > 0) {
+        sleep 1;
     }
-    $mail->finish;
+#    $t->join;
+}
+
+# wait for the crossassasin thread
+$spamscan_stop = 2;
+{
+    lprint "{} waiting for cross thread\n";
+    lock $cross_key;
+    $cross_key = 1;
+    cond_signal $cross_key;
+}
+my $max_wait = 60;
+while ($cross_thread->is_running and --$max_wait > 0) {
+    sleep 1;
+}
+#$cross_thread->join;
+
+END{
+   foreach my $thread (threads->list()){
+      $thread->join;
+   }
 }
+
 &unfilelock;
 
-exit 0;
+
+
+#exit 0;