]> git.donarmstrong.com Git - debbugs.git/blobdiff - scripts/spamscan.in
Distinguish between reports and followups (closes: #459866)
[debbugs.git] / scripts / spamscan.in
index 15a4c64273d37c1fc1ef6485b9ad525c4e85c040..9114b837439747b556b4ad9eede35218c8d72398 100755 (executable)
@@ -1,5 +1,5 @@
-#! /usr/bin/perl -T
-# $Id: spamscan.in,v 1.2 2004/01/13 19:01:13 cjwatson Exp $
+#! /usr/bin/perl
+# $Id: spamscan.in,v 1.8 2005/02/01 07:54:01 blarson Exp $
 #
 # Usage: spamscan
 #
 # Creates: incoming/I.nn
 # Stop:    spamscan-stop
 
-$config_path = '/etc/debbugs';
-$lib_path = '/usr/lib/debbugs';
+use warnings;
+use strict;
 
-require "$config_path/config";
-require "$lib_path/errorlib";
-$ENV{PATH} = $lib_path . ':' . $ENV{PATH};
+use threads;
+use threads::shared;
 
-chdir $gSpoolDir or die "chdir spool: $!\n";
-push @INC, $lib_path;
+use Debbugs::Config qw(:config);
 
-use Mail::SpamAssassin;
-use Mail::SpamAssassin::NoMailAudit;
+use Debbugs::Common qw(:lock);
+
+use Mail::CrossAssassin;
+use Socket;
+use IO::Handle;
+use IPC::Open2;
+
+
+exit unless $config{spam_scan};
+
+chdir $config{spool_dir} or die "chdir spool: $!\n";
 
 umask 002;
 
+eval {
+    filelock('incoming-spamscan');
+};
+exit if $@;
+
+my %spamseen : shared = ();
+my @ids : shared = ();
+my %fudged : shared = ();
+my $spamscan_stop : shared = 0;
+my $cross_key : shared;
+my @cross_return : shared;
+my $cross_tid : shared;
+my $print_lock : shared;
+my $assassinated_lock : shared;
+my $crossassassinated_lock : shared;
+my $threadsrunning : shared = 0;
+
+# flush output immediately
+$| = 1;
+
+sub lprint ($) {
+    lock $print_lock;
+    print $_[0];
+}
+
 my $user_prefs = "$ENV{HOME}/.spamassassin/user_prefs";
 my $user_prefs_time;
 if (-e $user_prefs) {
     $user_prefs_time = (stat $user_prefs)[9];
+} else {
+    die "$user_prefs not found";
 }
 
-my $spam = Mail::SpamAssassin->new({
-    dont_copy_prefs => 1,
-    site_rules_filename => $gSpamRulesDir,
-    userprefs_filename => $user_prefs,
-    local_tests_only => ($gSpamLocalTestsOnly || 0),
-    debug => ($ENV{DEBBUGS_SPAM_DEBUG} || 0),
-});
-$spam->compile_now(1); # use all user preferences
+# This thread handles the updating and querying of the crossassassin db
+sub cross {
+    ca_init('\b\d{3,8}(?:-(?:close|done|forwarded|maintonly|submitter|quiet|subscribe))?\@'.$config{email_domain}, $config{spam_crossassassin_db});
+    my $mytid = threads->self->tid();
+crosscheck:
+    while ($spamscan_stop <= 1) {
+       my ($ck, $ct);
+       {
+           lock $cross_key unless($cross_key);
+           until ($cross_key) {
+               last crosscheck if $spamscan_stop > 1;
+               lprint "{$mytid} cross waiting\n";
+               cond_timedwait $cross_key, (time() + 30);
+           }
+           last crosscheck if ($spamscan_stop > 1);
+           $ck = $cross_key;
+           $ct = $cross_tid;
+           undef $cross_key;
+       }
+       unless ($ck) {
+           lprint "{$mytid} Cross nothing\n";
+           sleep 1;
+           next crosscheck;
+       }
+       lprint "{$mytid} Cross{$ct}: $ck\n";
+       {
+           lock @cross_return;
+           $cross_return[$ct] = ca_set($ck);
+           cond_signal @cross_return;
+       }
+    }
+}
 
-$| = 1;
+# multiple threads handle spamassassin
+sub sa {
+    {
+       lock $threadsrunning;
+       $threadsrunning++;
+    }
+    my $mytid = threads->self->tid();
+    sleep $mytid + 3;
+    return if $spamscan_stop;
+    my ($sain, $saout);
 
-my @ids;
-my %fudged;
+    my $pid = open2($saout, $sain, "/usr/lib/debbugs/spamscan-sa");
+       lprint "{$mytid} forked $pid\n";
+       my $messages_handled=0;
+pp:    until ($spamscan_stop) {
+           my ($id, $nf);
+           lprint "{$mytid} $messages_handled messages handled\n";
+           $messages_handled++;
+getid:     for (;;) {
+               {
+                   lock @ids;
+                   $nf = @ids;
+                   $id = shift @ids;
+                   last getid if $nf;
+                   cond_timedwait @ids, (time() + 30);
+                   last pp if $spamscan_stop;
+                   $nf = @ids;
+                   $id = shift @ids;
+                   last getid if $nf;
+               }
+               lprint "{$mytid} Waiting for spam to process\n";
+               sleep 1;
+           }
+           print $sain "$id\n$nf\n";
+           lprint "{$mytid} $id is $nf\n";
+           my $keys = <$saout>;
+           unless (defined $keys) {
+               lprint "{$mytid} Could not get keys: $!\n";
+               last pp;
+           }
+           chomp $keys;
+           my $messageid = <$saout>;
+           unless (defined($messageid)) {
+               lprint "{$mytid} Could not read messageid: $!\n";
+               last pp;
+           }
+           chomp $messageid;
+           lprint "{$mytid} $id $keys\n";
+           my $ca_score;
+crosskey:   for (;;) {
+               {
+                   lock $cross_key;
+                   unless ($cross_key) {
+                       $cross_tid = $mytid;
+                       $cross_key = $keys;
+                       cond_signal $cross_key;
+                       last crosskey;
+                   }
+               }
+               lprint "{$mytid} zzz...\n";
+               select undef, undef, undef, 0.1;
+           }
+crossret:   for (;;) {
+               {
+                   lock @cross_return;
+                   if ($cross_return[$mytid]) {
+                       $ca_score = $cross_return[$mytid];
+                       undef $cross_return[$mytid];
+                       last crossret;
+                   }
+               }
+               lprint "{$mytid} z z z...\n";
+               select undef, undef, undef, 0.1;
+           }
+           lprint "{$mytid} $id: ca_score: $ca_score\n";
+           my $seen = $spamseen{$messageid};
+           $seen = '' unless $seen;
+           unless(print $sain "$ca_score\n$seen\n") {
+               lprint "{$mytid} Could not send ca_score: $!\n";
+               last pp;
+           }
+           my $todo = <$saout>;
+           unless (defined($todo)) {
+               lprint "{$mytid} Could not read todo: $!\n";
+               last pp;
+           }
+           chomp $todo;
+           my $nseen;
+           if ($todo == 1) {
+               lock $assassinated_lock;
+               print $sain "$todo\n";
+               $nseen = <$saout>;
+           } elsif ($todo == 2) {
+               lock $crossassassinated_lock;
+               print $sain "$todo\n";
+               $nseen = <$saout>;
+           } else {
+               print $sain "$todo\n";
+               $nseen = <$saout>;
+           }
+           unless(defined($nseen)) {
+               lprint "{$mytid} Could not read seen: $!\n";
+               start_sa() if (scalar(@ids) > ($threadsrunning * $config{spam_spams_per_thread})
+                   && $threadsrunning < $config{spam_max_threads});
+               last pp;
+           }
+           chomp $nseen;
+           $spamseen{$messageid} = $nseen if ($nseen);
+           my $out = <$saout>;
+           unless(defined($out)) {
+               lprint "{$mytid} Could not read out: $!\n";
+               last pp;
+           }
+           chomp $out;
+           $out =~ tr/\r/\n/;
+           lprint $out;
+       }
+       {
+           lock $threadsrunning;
+           $threadsrunning--;
+       }
+        close $sain;
+        close $saout;
+       waitpid($pid,0);
+}
 
-sub header_or_empty ($$) {
-    my ($mail, $hdr) = @_;
-    my $value = $mail->get_header($hdr);
-    if (defined $value) {
-       chomp $value;
-       return $value;
-    }
-    return '';
+my @sa_threads;
+sub start_sa {
+    my $s = threads->create(\&sa)
+       or die "Could not start sa threads: $!";
+    $s->detach;
+    push @sa_threads, $s;
 }
 
-&filelock('incoming-spamscan');
+my $cross_thread = threads->create(\&cross)
+    or die "Could not start cross thread: $!";
+$cross_thread->detach;
+start_sa;
+# start_sa;
+
+my $stopafter = time() + $config{spam_keep_running};
+
 for (;;) {
+    alarm 180;
     if (-f 'spamscan-stop') {
-       print STDERR "spamscan-stop file created\n";
+       lprint "spamscan-stop file created\n";
        last;
     }
-    if (-e $user_prefs) {
-       if ($user_prefs_time != (stat $user_prefs)[9]) {
-           # stop and wait to be re-invoked from cron
-           last;
-       }
+    if ($user_prefs_time != (stat $user_prefs)[9]) {
+       # stop and wait to be re-invoked from cron
+       lprint "File $user_prefs changed\n";
+       last;
     }
 
-    if (!@ids) {
+    unless (@ids) {
+       if (time() > $stopafter) {
+           lprint "KeepRunning timer expired\n";
+           last;
+       }
+        my @i;
        opendir DIR, 'incoming' or die "opendir incoming: $!";
        while (defined($_ = readdir DIR)) {
-           push @ids, $1 if /^S(.*)/;
+           push @i, $1 if /^S(.*)/;
        }
-       last unless @ids;
-       @ids = sort @ids;
-    }
-
-    my $nf = @ids;
-    my $id = shift @ids;
-    unless (rename "incoming/S$id", "incoming/R$id") {
-       if ($fudged{$id}) {
-           die "$id already fudged once! $!\n";
+       unless (@i) {
+           lprint "No more spam to process\n";
+           last;
        }
-       $fudged{$id} = 1;
-       next;
+       @i = sort {(split(/\./,$a))[1] <=> (split(/\./,$b))[1]} @i;
+       my $m = @i;
+       lprint "Messages to process: $m\n";
+       lock @ids;
+       push @ids, @i;
+       cond_broadcast @ids;
     }
+    start_sa if (scalar(@ids) > (($threadsrunning - 1) * $config{spam_spams_per_thread})
+                && $threadsrunning < $config{spam_max_threads});
+    sleep 30;
+}
 
-    print "[$nf] $id scanning ...\n" or die "print log: $!";
-
-    open MESSAGE, "< incoming/R$id" or die "open incoming/R$id: $!";
-    my @textarray;
-    # Kludge to work around Received: then From_ weirdness in receive;
-    # remove when receive is fixed? We may continue to need it for
-    # reprocessing old messages.
-    $textarray[0] = <MESSAGE>;
-    if ($textarray[0] =~ /^Received:/) {
-       my $maybefrom = <MESSAGE>;
-       if ($maybefrom =~ /^From /) {
-           $textarray[1] = $textarray[0];
-           $textarray[0] = $maybefrom;
-       } else {
-           $textarray[1] = $maybefrom;
-       }
-    }
-    push @textarray, <MESSAGE>;
-    close MESSAGE;
-    my $mail = Mail::SpamAssassin::NoMailAudit->new(data => \@textarray);
-    $mail->{noexit} = 1;
-
-    print "  From: ", header_or_empty($mail, 'From'), "\n";
-    print "  Subject: ", header_or_empty($mail, 'Subject'), "\n";
-    print "  Message-Id: ", header_or_empty($mail, 'Message-Id'), "\n";
-    my $status = $spam->check($mail);
-    $status->rewrite_mail();
-
-    if ($status->is_spam()) {
-       $mail->accept($gSpamMailbox);
-       unlink "incoming/R$id" or warn "unlink incoming/R$id: $!";
-       printf "  spam %.1f/%.1f\n",
-              $status->get_hits(), $status->get_required_hits()
-           or die "printf log: $!";
-    } else {
-       open OUT, "> incoming/I$id" or die "open incoming/I$id: $!";
-       print OUT $mail->as_string() or die "print incoming/I$id: $!";
-       close OUT or die "close incoming/I$id: $!";
-       unlink "incoming/R$id" or warn "unlink incoming/R$id: $!";
-       printf "  ok %.1f/%.1f\n",
-              $status->get_hits(), $status->get_required_hits()
-           or die "printf log: $!";
+alarm 180;
+
+# wait for the spamassasin threads
+$spamscan_stop = 1;
+{
+    lock @ids;
+    cond_broadcast @ids;
+}
+
+while (my $t = shift @sa_threads) {
+    my $tid = $t->tid;
+    lprint "{} waiting for thread $tid\n";
+    my $max_wait = 60;
+    while ($t->is_running and --$max_wait > 0) {
+        sleep 1;
     }
+#    $t->join;
+}
 
-    $status->finish();
+# wait for the crossassasin thread
+$spamscan_stop = 2;
+{
+    lprint "{} waiting for cross thread\n";
+    lock $cross_key;
+    $cross_key = 1;
+    cond_signal $cross_key;
+}
+my $max_wait = 60;
+while ($cross_thread->is_running and --$max_wait > 0) {
+    sleep 1;
 }
+#$cross_thread->join;
+
+END{
+   foreach my $thread (threads->list()){
+      $thread->join;
+   }
+}
+
 &unfilelock;
 
-exit 0;
+
+
+#exit 0;