From: martinahansen Date: Sat, 28 May 2011 20:54:36 +0000 (+0000) Subject: added multithreading to find_adaptor X-Git-Url: https://git.donarmstrong.com/?a=commitdiff_plain;h=cf1ef27d93920bab503036f21bce0853ac40310a;p=biopieces.git added multithreading to find_adaptor git-svn-id: http://biopieces.googlecode.com/svn/trunk@1437 74ccb610-7750-0410-82ae-013aeee3265d --- diff --git a/bp_bin/find_adaptor b/bp_bin/find_adaptor index 5379309..78e633a 100755 --- a/bp_bin/find_adaptor +++ b/bp_bin/find_adaptor @@ -37,26 +37,76 @@ require 'maasha/fasta' class PatScanError < StandardError; end; class PatScan - def initialize(options, file_fasta, file_pattern, file_patscan) + def initialize(options, tmpdir, file_pattern, file_patscan, cpus) @options = options - @file_fasta = file_fasta @file_pattern = file_pattern @file_patscan = file_patscan + @cpus = cpus + @files_fasta = Dir.glob(File.join(tmpdir, "*.fna")) pat = Pattern.new(@options) pat.write(@file_pattern) end +# def run +# child_count = 0 +# +# @files_fasta.each do |file| +# if fork +# Process.wait if ( child_count += 1 ) >= @cpus +# else +# command = command_compile(file) +# system(command) +# raise PatScanError, "Command failed: #{command}" unless $?.success? +# exit +# end +# end +# end + def run + child_count = 0 + + @files_fasta.each do |file| + Thread.pass while child_count >= @cpus + child_count += 1 + + Thread.new do + command = command_compile(file) + system(command) + raise PatScanError, "Command failed: #{command}" unless $?.success? + child_count -= 1 + end + end + end + +# def run +# child_count = 0 +# ch_mutex = Mutex.new +# threads = [] +# +# @files_fasta.each do |file| +# Thread.pass while child_count >= @cpus +# ch_mutex.synchronize { child_count += 1 } +# +# threads << Thread.new do +# command = command_compile(file) +# system(command) +# raise PatScanError, "Command failed: #{command}" unless $?.success? +# ch_mutex.synchronize { child_count -= 1 } +# end +# end +# +# threads.each { |t| t.join } +# end + + def command_compile(file) commands = [] commands << "nice -n 19" commands << "scan_for_matches" commands << @file_pattern - commands << "< #{@file_fasta}" - commands << "> #{@file_patscan}" + commands << "< #{file}" + commands << "> #{file}.out" command = commands.join(" ") - system(command) - raise PatScanError, "Command failed: #{command}" unless $?.success? end def parse_results @@ -157,47 +207,65 @@ casts << {:long=>'len', :short=>'l', :type=>'uint', :mandatory=>false, casts << {:long=>'mismatches', :short=>'m', :type=>'uint', :mandatory=>false, :default=>10, :allowed=>nil, :disallowed=>nil} casts << {:long=>'insertions', :short=>'i', :type=>'uint', :mandatory=>false, :default=>5, :allowed=>nil, :disallowed=>nil} casts << {:long=>'deletions', :short=>'d', :type=>'uint', :mandatory=>false, :default=>5, :allowed=>nil, :disallowed=>nil} +casts << {:long=>'cpus', :short=>'c', :type=>'uint', :mandatory=>false, :default=>1, :allowed=>nil, :disallowed=>'0'} + +BASE_PER_FILE = 10_000_000 options = Biopieces.options_parse(ARGV, casts) -tmpdir = Biopieces.mktmpdir -file_fasta = File.join(tmpdir, "data.fna") +#tmpdir = Biopieces.mktmpdir +tmpdir = "Tyt" # DEBUG TODO file_records = File.join(tmpdir, "data.stream") file_pattern = File.join(tmpdir, "pattern.txt") -file_patscan = File.join(tmpdir, "patscan.fna") +file_patscan = File.join(tmpdir, "patscan.out") -count = 0 +number_file = 0 +number_seq = 0 +bases = 0 Biopieces.open(options[:stream_in], file_records) do |input, output| - Fasta.open(file_fasta, mode='w') do |out_fa| - input.each do |record| - output.puts record + file_fasta = File.join(tmpdir, "#{number_file}.fna") + out_fa = Fasta.open(file_fasta, mode='w') - if record.has_key? :SEQ - record[:SEQ_NAME] = count - out_fa.puts record + input.each do |record| + output.puts record - count += 1; + if record.has_key? :SEQ + record[:SEQ_NAME] = number_seq + out_fa.puts record + + number_seq += 1; + bases += record[:SEQ].length + + if bases > BASE_PER_FILE + out_fa.close + bases = 0 + number_file += 1 + file_fasta = File.join(tmpdir, "#{number_file}.fna") + out_fa = Fasta.open(file_fasta, mode='w') end end end + + out_fa.close if out_fa.respond_to? :close end -patscan = PatScan.new(options, file_fasta, file_pattern, file_patscan) +patscan = PatScan.new(options, tmpdir, file_pattern, file_patscan, options[:cpus]) patscan.run +exit matches = patscan.parse_results -count = 0 +number_seq = 0 Biopieces.open(file_records, options[:stream_out]) do |input, output| input.each_record do |record| if record.has_key? :SEQ - if matches.has_key? count - record[:ADAPTOR_POS] = matches[count].first - record[:ADAPTOR_LEN] = matches[count].last + if matches.has_key? number_seq + record[:ADAPTOR_POS] = matches[number_seq].first + record[:ADAPTOR_LEN] = matches[number_seq].last end - count += 1; + number_seq += 1; end output.puts record