From 672a430873e686b884bccb1b027321658c541b25 Mon Sep 17 00:00:00 2001 From: Don Armstrong Date: Sat, 25 Aug 2007 02:23:41 +0000 Subject: [PATCH] add threads implementation to actually handle doing the useful stuff git-svn-id: file:///srv/svn/function2gene/trunk@4 a0738b58-4706-0410-8799-fb830574a030 --- bin/do_it_all | 121 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 87 insertions(+), 34 deletions(-) diff --git a/bin/do_it_all b/bin/do_it_all index b85356d..3d3cfa3 100755 --- a/bin/do_it_all +++ b/bin/do_it_all @@ -79,17 +79,20 @@ Display this manual. use vars qw($DEBUG); +use Cwd qw(abs_path); +use IO::File; +use Storable qw(thaw freeze); my %options = (databases => [], keywords => [], debug => 0, help => 0, man => 0, - directory => '', + results => '', ); GetOptions(\%options,'keywords=s@','databases=s@', - 'restart_at|restart-at=s', + 'restart_at|restart-at=s','results=s', 'debug|d+','help|h|?','man|m'); pod2usage() if $options{help}; @@ -104,11 +107,11 @@ $ERRORS.="unknown database(s)" if @{$options{databases}} and grep {$_ !~ /^(?:ncbi|genecards|harvester)$/i} @{$options{databases}}; -if (not length $options{directory}) { - $ERRORS.="directory not specified"; +if (not length $options{results}) { + $ERRORS.="results directory not specified"; } -elsif (not -d $options{directory} or not -w $options{directory}) { - $ERRORS.="directory $options{directory} does not exist or is not writeable"; +elsif (not -d $options{results} or not -w $options{results}) { + $ERRORS.="results directory $options{results} does not exist or is not writeable"; } pod2usage($ERRORS) if length $ERRORS; @@ -128,21 +131,27 @@ $DEBUG = $options{debug}; my %state; -if (-e "$options{directory}/do_it_all_state") { +$options{keywords} = [map {abs_path($_)} @{$options{keywords}}]; + +chdir $options{results} or die "Unable to chdir to $options{results}"; + +if (-e "do_it_all_state") { ADVISE("Using existing state information"); - my $state_fh = IO::File->new("$options{directory}/do_it_all_state",'r') or die + my $state_fh = IO::File->new("do_it_all_state",'r') or die "Unable to open state file for reading: $!"; local $/; - my $state_file = <$state_fh> or die "Unabel to read state file $!"; + my $state_file = <$state_fh>; %state = %{thaw($state_file)} or die "Unable to thaw state file"; } else { ADVISE("Starting new run"); %state = (keywords => [], databases => [map {lc($_)} @{$options{databases}}], - gotten_keywords => {}, - parsed_keywords => {}, - combined_keywords => {}, + done_keywords => { + get => {}, + parse => {}, + combine => {}, + }, ); } @@ -188,40 +197,84 @@ if (exists $options{restart_at} and length $options{restart_at}) { # for each keyword, we check to see if we've got results, parsed # results, and combined it. If not, we queue up those actions. -my @get_needed = (); -my @parse_needed = (); -my $combine_needed = 0; +my %actions = (combine => 0, + get => {}, + parse => {}, + ); + +if (not @{$state{keywords}}) { + ADVISE("There are no keywords specified"); +} for my $keyword (@{$state{keywords}}) { for my $database (@{$state{databases}}) { - if (not exists $state{gotten_keywords}{$database}{$keyword}) { - push @get_needed,[$database,$keyword]; - delete $state{parsed_keywords}{$database}{$keyword} if - exists $state{gotten_keywords}{$database}{$keyword}; - delete $state{combined_keywords}{$database}{$keyword} if - exists $state{gotten_keywords}{$database}{$keyword}; + if (not exists $state{done_keywords}{get}{$database}{$keyword}) { + push @{$actions{get}{$database}}, $keyword; + delete $state{done_keywords}{parse}{$database}{$keyword} if + exists $state{done_keywords}{parse}{$database}{$keyword}; + delete $state{done_keywords}{combine}{$database}{$keyword} if + exists $state{done_keywords}{combine}{$database}{$keyword}; } - if (not exists $state{parsed_keywords}{$database}{$keyword}) { - push @parse_needed,[$database,$keyword]; - delete $state{combined_keywords}{$database}{$keyword} if - exists $state{gotten_keywords}{$database}{$keyword}; + if (not exists $state{done_keywords}{parse}{$database}{$keyword}) { + push @{$actions{parse}{$database}},$keyword; + delete $state{done_keywords}{combine}{$database}{$keyword} if + exists $state{done_keywords}{combine}{$database}{$keyword}; } - if (not exists $state{combined_keywords}{$database}{$keyword}) { - $combine_needed = 1; + if (not exists $state{done_keywords}{combine}{$database}{$keyword}) { + $actions{combine} = 1; } } } -# handle getting needed results -for my $action (@get_needed) { - -} -# handle parsing needed results -for my $action (@parse_needed) { +use threads; +use Thread::Queue; + +for my $state (qw(get parse)) { + my %databases; + for my $database (keys %{$actions{$state}}) { + next unless @{$actions{$state}{$database}}; + $databases{$database}{queue} = Thread::Queue->new; + $databases{$database}{thread} = threads->new(\&handle_action($state,$database,$databases{database}{queue})); + $databases{$database}{queue}->enqueue(@{$actions{$state}{$database}}); + $databases{$database}{queue}->enqueue(undef); + } + my $ERRORS=0; + for my $database (keys %databases) { + my ($actioned_keywords,$failed_keywords) = $databases{$database}{thread}->join; + if (@{$failed_keywords}) { + ADVISE("These keywords failed during '$state' of '$database':",@{$failed_keywords}); + $ERRORS=1; + } + @{$state{done_keywords}{$state}{$database}}{@{$actioned_keywords}} = (1) x @{$actioned_keywords}; + delete @{$state{done_keywords}{$state}{$database}}{@{$failed_keywords}}; + } + save_state(\%state); + if ($ERRORS) { + WARN("Stoping, as there are errors"); + exit 1; + } } -# handle combining results +sub handle_action{ + my ($state,$database,$queue) = @_; + my $keyword; + my $actioned_keywords = (); + my $failed_keywords = (); + while ($keyword = $queue->dequeue) { + # handle the action, baybee + ADVISE("$state results from '$database' for '$keyword'"); + push @{$actioned_keywords},$keyword; + } + return ($actioned_keywords,$failed_keywords); +} +sub save_state{ + my ($state) = @_; + my $state_fh = IO::File->new("do_it_all_state",'w') or die + "Unable to open state file for writing: $!"; + print {$state_fh} freeze($state) or die "Unable to freeze state file"; + close $state_fh or die "Unable to close state file: $!"; +} sub ADVISE{ -- 2.39.2