--- /dev/null
+#! /usr/bin/perl
+# spamassassin handling split from spamscan
+#
+
+# unfortunatly we can't use strict;
+
+use lib qw(/usr/lib/debbugs);
+use Mail::CrossAssassin;
+
+use Mail::SpamAssassin;
+use Mail::SpamAssassin::NoMailAudit;
+
+my $config_path = '/etc/debbugs';
+require "$config_path/config";
+# New versions of debbugs will not allow use in /etc/debbugs/config
+use POSIX qw(strftime);
+$gSpamMailbox = strftime($gSpamMailbox,gmtime);
+$gCrossMailbox = strftime($gCrossMailbox,gmtime);
+
+umask 002;
+$| = 1;
+STDOUT->autoflush(1);
+
+sub header_or_empty ($$) {
+ my ($mail, $hdr) = @_;
+ my $value = $mail->get_header($hdr);
+ if (defined $value) {
+ chomp $value;
+ $value =~ tr/\n/\\n/;
+ return $value;
+ }
+ return '';
+}
+
+my $user_prefs = "$ENV{HOME}/.spamassassin/user_prefs";
+
+my $spam = Mail::SpamAssassin->new({
+ dont_copy_prefs => 1,
+ site_rules_filename => $gSpamRulesDir,
+ userprefs_filename => $user_prefs,
+ local_tests_only => ($gSpamLocalTestsOnly || 0),
+ debug => ($ENV{DEBBUGS_SPAM_DEBUG} || 0),
+});
+$spam->compile_now(1); # use all user preferences
+
+ while (my $id = <STDIN>) {
+ chomp $id;
+ my $nf = <STDIN> or die "Could not read nf: $!";
+ chomp $nf;
+ unless (rename "incoming/S$id", "incoming/R$id") {
+ die "Could not rename incoming/S$id: $!";
+ }
+ my $out = "[$nf] $id scanning ...\n";
+ open MESSAGE, "< incoming/R$id" or die "open incoming/R$id: $!";
+ my @textarray;
+ # Kludge to work around Received: then From_ weirdness in receive;
+ # remove when receive is fixed? We may continue to need it for
+ # reprocessing old messages.
+ $textarray[0] = <MESSAGE>;
+ if ($textarray[0] =~ /^Received:/) {
+ my $maybefrom = <MESSAGE>;
+ if ($maybefrom =~ /^From /) {
+ $textarray[1] = $textarray[0];
+ $textarray[0] = $maybefrom;
+ } else {
+ $textarray[1] = $maybefrom;
+ }
+ }
+ push @textarray, <MESSAGE>;
+ close MESSAGE;
+ my $mail = Mail::SpamAssassin::NoMailAudit->new(data => \@textarray);
+ $mail->{noexit} = 1;
+
+ my $messageid = header_or_empty($mail, 'Message-Id');
+ $out .= " From: " . header_or_empty($mail, 'From') . "\n";
+ $out .= " Subject: ". header_or_empty($mail, 'Subject') . "\n";
+ $out .= " Date: " . header_or_empty($mail, 'Date') . "\n";
+ $out .= " Message-Id: $messageid\n";
+ my $keys = ca_keys($mail->get_body);
+ print "$keys\n$messageid\n"
+ or die "Could not send keys: $!";
+ my $ca_score = <STDIN> or die "Could not read ca_score: $!";
+ chomp $ca_score;
+ my $todo = 0;
+ my $seen = <STDIN> or die "Child could not read seen: $!";
+ chomp $seen;
+ my $status;
+ my $nseen = $seen;
+ if ($seen) {
+ $todo = 1;
+ $out .= " spam $seen duplicate\n";
+ } else {
+ $status = $spam->check($mail);
+ $status->rewrite_mail();
+
+ if ($status->is_spam()) {
+# $mail->accept($gSpamMailbox);
+# unlink "incoming/R$id" or warn "unlink incoming/R$id: $!";
+ $todo = 1;
+ my $score = sprintf "%.1f/%.1f %d",
+ $status->get_hits(), $status->get_required_hits(),
+ $ca_score;
+ $out .= " spam $score\n";
+ $nseen = $score;
+ } elsif ($status->get_hits() > 0 && $ca_score >= $gMaxCross) {
+# $mail->accept($gCrossMailbox);
+# unlink "incoming/R$id" or warn "unlink incoming/R$id: $!";
+ $todo = 2;
+ my $score = sprintf "%.1f/%.1f %d",
+ $status->get_hits(), $status->get_required_hits(), $ca_score;
+ $out .= " spam $score\n";
+ $nseen = $score;
+ } else {
+ open OUT, "> incoming/I$id" or die "open incoming/I$id: $!";
+ my @headers = $mail->get_all_headers();
+ if ($headers[0] =~ /^From /) {
+ my $from = $headers[0];
+ $headers[0] = $headers[1];
+ $headers[1] = $from;
+ }
+ print OUT join '', @headers or die "print incoming/I$id: $!";
+ if ($ca_score > 1) {
+ print OUT "X-CrossAssassin-Score: $ca_score\n"
+ or die "print incoming/I$id: $!";
+ }
+ print OUT "\n" or die "print incoming/I$id: $!";
+ print OUT @{$mail->get_body()} or die "print incoming/I$id: $!";
+ close OUT or die "close incoming/I$id: $!";
+ unlink "incoming/R$id" or warn "unlink incoming/R$id: $!";
+ $out .= sprintf " ok %.1f/%.1f %d\n",
+ $status->get_hits(), $status->get_required_hits(),
+ $ca_score;
+ }
+ }
+ print "$todo\n";
+ my $x = <STDIN>;
+ if ($todo) {
+ open OUT, '>>', ($todo == 1) ? $gSpamMailbox : $gCrossMailbox
+ or die "Could not open assassinated: $!";
+ my @headers = $mail->get_all_headers();
+ print OUT @headers
+ or die "print assassinated: $!";
+ print OUT "\n"
+ or die "print assassinated: $!";
+ foreach (@{$mail->get_body()}) {
+ s/^From />From /;
+ print OUT $_
+ or die "print assassinated: $!";
+ }
+ close OUT or die "Close assassinated: $!";
+ }
+ $out =~ tr/\n/\r/;
+ print "$nseen\n$out\n";
+ $status->finish() unless($seen);
+ }
-#! /usr/bin/perl -T
-# $Id: spamscan.in,v 1.10 2005/07/22 21:37:31 don Exp $
+#! /usr/bin/perl
+# $Id: spamscan.in,v 1.8 2005/02/01 07:54:01 blarson Exp $
#
# Usage: spamscan
#
# Creates: incoming/I.nn
# Stop: spamscan-stop
-$config_path = '/etc/debbugs';
-$lib_path = '/usr/lib/debbugs';
+# unfortunatly we can't use strict;
+use lib qw(/usr/lib/debbugs);
+use threads;
+use threads::shared;
+
+my $config_path = '/etc/debbugs';
+my $lib_path = '/usr/lib/debbugs';
+#use lib $lib_path;
+#use lib "/usr/lib/debbugs";
+use Mail::CrossAssassin;
+use Socket;
+use IO::Handle;
+use IPC::Open2;
require "$config_path/config";
require "$lib_path/errorlib";
exit unless $gSpamScan;
chdir $gSpoolDir or die "chdir spool: $!\n";
-push @INC, $lib_path;
-
-use Mail::SpamAssassin;
-
-use lib '/usr/lib/debbugs';
-use Mail::CrossAssassin;
umask 002;
};
exit if $@;
-ca_init('\b\d{3,8}(?:-(?:close|done|forwarded|maintonly|submitter|quiet))?\@bugs\.debian\.org', '/org/bugs.debian.org/CrossAssassinDb');
+my %spamseen : shared = ();
+my @ids : shared = ();
+my %fudged : shared = ();
+my $spamscan_stop : shared = 0;
+my $cross_key : shared;
+my @cross_return : shared;
+my $cross_tid : shared;
+my $print_lock : shared;
+my $assassinated_lock : shared;
+my $crossassassinated_lock : shared;
+my $threadsrunning : shared = 0;
+
+$| = 1;
-my %spamseen = ();
+sub lprint ($) {
+ lock $print_lock;
+ print $_[0];
+}
my $user_prefs = "$ENV{HOME}/.spamassassin/user_prefs";
my $user_prefs_time;
if (-e $user_prefs) {
$user_prefs_time = (stat $user_prefs)[9];
+} else {
+ die "$user_prefs not found";
}
-my $spam = Mail::SpamAssassin->new({
- dont_copy_prefs => 1,
- site_rules_filename => $gSpamRulesDir,
- userprefs_filename => $user_prefs,
- local_tests_only => ($gSpamLocalTestsOnly || 0),
-# debug => ($ENV{DEBBUGS_SPAM_DEBUG} || 0),
-# check_mx_delay => 2, # bit of a hack until we have parallelization
-});
-$spam->compile_now(1); # use all user preferences
+# This thread handles the updating and querying of the crossassassin db
+sub cross {
+ ca_init('\b\d{3,8}(?:-(?:close|done|forwarded|maintonly|submitter|quiet|subscribe))?\@bugs\.debian\.org', '/org/bugs.debian.org/CrossAssassinDb');
+ my $mytid = threads->self->tid();
+crosscheck:
+ while ($spamscan_stop <= 1) {
+ my ($ck, $ct);
+ {
+ lock $cross_key unless($cross_key);
+ until ($cross_key) {
+ last crosscheck if $spamscan_stop > 1;
+ lprint "{$mytid} cross waiting\n";
+ cond_timedwait $cross_key, (time() + 30);
+ }
+ last crosscheck if ($spamscan_stop > 1);
+ $ck = $cross_key;
+ $ct = $cross_tid;
+ undef $cross_key;
+ }
+ unless ($ck) {
+ lprint "{$mytid} Cross nothing\n";
+ sleep 1;
+ next crosscheck;
+ }
+ lprint "{$mytid} Cross{$ct}: $ck\n";
+ {
+ lock @cross_return;
+ $cross_return[$ct] = ca_set($ck);
+ cond_signal @cross_return;
+ }
+ }
+}
-$| = 1;
+# multiple threads handle spamassassin
+sub sa {
+ {
+ lock $threadsrunning;
+ $threadsrunning++;
+ }
+ my $mytid = threads->self->tid();
+ sleep $mytid + 3;
+ return if $spamscan_stop;
+ my ($sain, $saout);
-my @ids;
-my %fudged;
+ my $pid = open2($saout, $sain, "spamscan-sa");
+ lprint "{$mytid} forked $pid\n";
+ my $messages_handled=0;
+pp: until ($spamscan_stop) {
+ my ($id, $nf);
+ lprint "{$mytid} $messages_handled messages handled\n";
+ $messages_handled++;
+getid: for (;;) {
+ {
+ lock @ids;
+ $nf = @ids;
+ $id = shift @ids;
+ last getid if $nf;
+ cond_timedwait @ids, (time() + 30);
+ last pp if $spamscan_stop;
+ $nf = @ids;
+ $id = shift @ids;
+ last getid if $nf;
+ }
+ lprint "{$mytid} Waiting for spam to process\n";
+ sleep 1;
+ }
+ print $sain "$id\n$nf\n";
+ lprint "{$mytid} $id is $nf\n";
+ my $keys;
+ unless ($keys = <$saout>) {
+ lprint "{$mytid} Could not get keys: $!\n";
+ last pp;
+ }
+ chomp $keys;
+ my $messageid;
+ unless ($messageid = <$saout>) {
+ lprint "{$mytid} Could not read messageid: $!\n";
+ last pp;
+ }
+ chomp $messageid;
+ lprint "{$mytid} $id $keys\n";
+ my $ca_score;
+crosskey: for (;;) {
+ {
+ lock $cross_key;
+ unless ($cross_key) {
+ $cross_tid = $mytid;
+ $cross_key = $keys;
+ cond_signal $cross_key;
+ last crosskey;
+ }
+ }
+ lprint "{$mytid} zzz...\n";
+ select undef, undef, undef, 0.1;
+ }
+crossret: for (;;) {
+ {
+ lock @cross_return;
+ if ($cross_return[$mytid]) {
+ $ca_score = $cross_return[$mytid];
+ undef $cross_return[$mytid];
+ last crossret;
+ }
+ }
+ lprint "{$mytid} z z z...\n";
+ select undef, undef, undef, 0.1;
+ }
+ lprint "{$mytid} $id: ca_score: $ca_score\n";
+ my $seen = $spamseen{$messageid};
+ $seen = '' unless $seen;
+ unless(print $sain "$ca_score\n$seen\n") {
+ lprint "{$mytid} Could not send ca_score: $!\n";
+ last pp;
+ }
+ my $todo;
+ unless ($todo = <$saout>) {
+ lprint "{$mytid} Could not read todo: $!\n";
+ last pp;
+ }
+ chomp $todo;
+ my $nseen;
+ if ($todo == 1) {
+ lock $assassinated_lock;
+ print $sain "$todo\n";
+ $nseen = <$saout>;
+ } elsif ($todo == 2) {
+ lock $crossassassinated_lock;
+ print $sain "$todo\n";
+ $nseen = <$saout>;
+ } else {
+ print $sain "$todo\n";
+ $nseen = <$saout>;
+ }
+ unless($nseen) {
+ lprint "{$mytid} Could not read seen: $!\n";
+ start_sa if (scalar(@ids) > ($threadsrunning * $gSpamsPerThread)
+ && $threadsrunning < $gMaxThreads);
+ last pp;
+ }
+ chomp $nseen;
+ $spamseen{$messageid} = $nseen if ($nseen);
+ my $out;
+ unless($out = <$saout>) {
+ lprint "{$mytid} Could not read out: $!\n";
+ last pp;
+ }
+ chomp $out;
+ $out =~ tr/\r/\n/;
+ lprint $out;
+ }
+ {
+ lock $threadsrunning;
+ $threadsrunning--;
+ }
+ close $sain;
+ close $saout;
+ waitpid($pid,0);
+}
-sub header_or_empty ($$) {
- my ($mail, $hdr) = @_;
- my $value = $mail->get_header($hdr);
- if (defined $value) {
- chomp $value;
- return $value;
- }
- return '';
+my @sa_threads;
+sub start_sa() {
+ my $s = threads->create(\&sa)
+ or die "Could not start sa threads: $!";
+ $s->detach;
+ push @sa_threads, $s;
}
+$gKeepRunning = 3600 unless $gKeepRunning > 0;
+$gSpamsPerThread = 200 unless defined($gSpamsPerThread);
+$gMaxThreads = 20 unless defined($gMaxThreads);
+
+my $cross_thread = threads->create(\&cross)
+ or die "Could not start cross thread: $!";
+$cross_thread->detach;
+start_sa;
+# start_sa;
+
+my $stopafter = time() + $gKeepRunning;
+
for (;;) {
+ alarm 180;
if (-f 'spamscan-stop') {
- print "spamscan-stop file created\n";
+ lprint "spamscan-stop file created\n";
last;
}
- if (-e $user_prefs) {
- if ($user_prefs_time != (stat $user_prefs)[9]) {
- # stop and wait to be re-invoked from cron
- last;
- }
+ if ($user_prefs_time != (stat $user_prefs)[9]) {
+ # stop and wait to be re-invoked from cron
+ lprint "File $user_prefs changed\n";
+ last;
}
- if (!@ids) {
+ unless (@ids) {
+ if (time() > $stopafter) {
+ lprint "KeepRunning timer expired\n";
+ last;
+ }
+ my @i;
opendir DIR, 'incoming' or die "opendir incoming: $!";
while (defined($_ = readdir DIR)) {
- push @ids, $1 if /^S(.*)/;
+ push @i, $1 if /^S(.*)/;
}
- last unless @ids;
- @ids = sort @ids;
- }
-
- my $nf = @ids;
- my $id = shift @ids;
- unless (rename "incoming/S$id", "incoming/R$id") {
- if ($fudged{$id}) {
- die "$id already fudged once! $!\n";
+ unless (@i) {
+ lprint "No more spam to process\n";
+ last;
}
- $fudged{$id} = 1;
- next;
+ @i = sort {(split(/\./,$a))[1] <=> (split(/\./,$b))[1]} @i;
+ my $m = @i;
+ lprint "Messages to process: $m\n";
+ lock @ids;
+ push @ids, @i;
+ cond_broadcast @ids;
}
+ start_sa if (scalar(@ids) > (($threadsrunning - 1) * $gSpamsPerThread)
+ && $threadsrunning < $gMaxThreads);
+ sleep 30;
+}
- print "[$nf] $id scanning ...\n" or die "print log: $!";
-
- open MESSAGE, "< incoming/R$id" or die "open incoming/R$id: $!";
- my @textarray;
- # Kludge to work around Received: then From_ weirdness in receive;
- # remove when receive is fixed? We may continue to need it for
- # reprocessing old messages.
- $textarray[0] = <MESSAGE>;
- if ($textarray[0] =~ /^Received:/) {
- my $maybefrom = <MESSAGE>;
- if ($maybefrom =~ /^From /) {
- $textarray[1] = $textarray[0];
- $textarray[0] = $maybefrom;
- } else {
- $textarray[1] = $maybefrom;
- }
- }
- push @textarray, <MESSAGE>;
- close MESSAGE;
- my $mail = $spam->parse(\@textarray);
-
- my $messageid = header_or_empty($mail, 'Message-Id');
- print " From: ", header_or_empty($mail, 'From'), "\n";
- print " Subject: ", header_or_empty($mail, 'Subject'), "\n";
- print " Date: ", header_or_empty($mail, 'Date'), "\n";
- print " Message-Id: $messageid\n";
- my $ca_score = ca_set(ca_keys($mail->get_body));
- if (exists $spamseen{$messageid}) {
- # XXX THIS DOES NOT DO LOCKING
- open OUT, ">> $gSpamMailbox" or die "open $gSpamMailbox failed: $!";
- print OUT $mail->get_pristine or die "print $gSpamMailbox failed: $!";
- close OUT or die "close $gSpamMailbox failed: $!";
- unlink "incoming/R$id" or warn "unlink incoming/R$id: $!";
- print " spam $spamseen{$messageid} duplicate\n"
- or die "printf log: $!";
- } else {
- my $status = $spam->check($mail);
- my $munged_mail = $status->rewrite_mail();
-
- if ($status->is_spam()) {
- # XXX THIS DOES NOT DO LOCKING
- open OUT, ">> $gSpamMailbox" or die "open $gSpamMailbox failed: $!";
- print OUT $munged_mail or die "print $gSpamMailbox failed: $!";
- close OUT or die "close $gSpamMailbox failed: $!";
- unlink "incoming/R$id" or warn "unlink incoming/R$id: $!";
- my $score = sprintf "%.1f/%.1f %d",
- $status->get_score(), $status->get_required_score(), $ca_score;
- print " spam $score\n" or die "print log: $!";
- $spamseen{$messageid} = $score;
- } elsif ($status->get_score() > 0 && $ca_score >= 4) {
- # XXX THIS DOES NOT DO LOCKING
- open OUT, ">> $gCrossMailbox" or die "open $gCrossMailbox failed: $!";
- print OUT $munged_mail or die "print $gCrossMailbox failed: $!";
- close OUT or die "close $gCrossMailbox failed: $!";
- unlink "incoming/R$id" or warn "unlink incoming/R$id: $!";
- my $score = sprintf "%.1f/%.1f %d",
- $status->get_score(), $status->get_required_score(), $ca_score;
- printf " spam $score\n" or die "printf log: $!";
- $spamseen{$messageid} = $score;
- } else {
- open OUT, "> incoming/I$id" or die "open incoming/I$id: $!";
- my ($received,$from,$rest_of_message) = split /\n/, $munged_mail, 3;
- my ($headers,$body) = split /\n\n/, $rest_of_message, 2;
- if ($received =~ /^From /) {
- ($received,$from) = ($from,$received);
- }
- print OUT map { "$_\n"} ($received,$from,$headers) or die "print incoming/I$id: $!";
- if ($ca_score > 1) {
- print OUT "X-CrossAssassin-Score: $ca_score\n"
- or die "print incoming/I$id: $!";
- }
- print OUT "\n" or die "print incoming/I$id: $!";
- print OUT $body or die "print incoming/I$id: $!";
- close OUT or die "close incoming/I$id: $!";
- unlink "incoming/R$id" or warn "unlink incoming/R$id: $!";
- printf " ok %.1f/%.1f %d\n",
- $status->get_score(), $status->get_required_score(), $ca_score
- or die "printf log: $!";
- }
+alarm 180;
- $status->finish();
+# wait for the spamassasin threads
+$spamscan_stop = 1;
+{
+ lock @ids;
+ cond_broadcast @ids;
+}
+
+while (my $t = shift @sa_threads) {
+ my $tid = $t->tid;
+ lprint "{} waiting for thread $tid\n";
+ my $max_wait = 60;
+ while ($t->is_running and --$max_wait > 0) {
+ sleep 1;
}
- $mail->finish;
+# $t->join;
+}
+
+# wait for the crossassasin thread
+$spamscan_stop = 2;
+{
+ lprint "{} waiting for cross thread\n";
+ lock $cross_key;
+ $cross_key = 1;
+ cond_signal $cross_key;
+}
+my $max_wait = 60;
+while ($cross_thread->is_running and --$max_wait > 0) {
+ sleep 1;
+}
+#$cross_thread->join;
+
+END{
+ foreach my $thread (threads->list()){
+ $thread->join;
+ }
}
+
&unfilelock;
-exit 0;
+
+
+#exit 0;