]> git.donarmstrong.com Git - biopieces.git/commitdiff
added multithreading to find_adaptor
authormartinahansen <martinahansen@74ccb610-7750-0410-82ae-013aeee3265d>
Sat, 28 May 2011 20:54:36 +0000 (20:54 +0000)
committermartinahansen <martinahansen@74ccb610-7750-0410-82ae-013aeee3265d>
Sat, 28 May 2011 20:54:36 +0000 (20:54 +0000)
git-svn-id: http://biopieces.googlecode.com/svn/trunk@1437 74ccb610-7750-0410-82ae-013aeee3265d

bp_bin/find_adaptor

index 53793095e7998b4c82d91b503025d756fd811611..78e633a34ca0365694022bdf310ed1ef354871f3 100755 (executable)
@@ -37,26 +37,76 @@ require 'maasha/fasta'
 class PatScanError < StandardError; end;
 
 class PatScan
-  def initialize(options, file_fasta, file_pattern, file_patscan)
+  def initialize(options, tmpdir, file_pattern, file_patscan, cpus)
     @options      = options
-    @file_fasta   = file_fasta
     @file_pattern = file_pattern
     @file_patscan = file_patscan
+    @cpus         = cpus
+    @files_fasta  = Dir.glob(File.join(tmpdir, "*.fna"))
 
     pat = Pattern.new(@options)
     pat.write(@file_pattern)
   end
 
+#  def run
+#    child_count = 0
+#
+#    @files_fasta.each do |file|
+#      if fork
+#        Process.wait if ( child_count += 1 ) >= @cpus
+#      else
+#        command = command_compile(file)
+#        system(command)
+#        raise PatScanError, "Command failed: #{command}" unless $?.success?
+#        exit
+#      end
+#    end
+#  end
+
   def run
+    child_count = 0
+
+    @files_fasta.each do |file|
+      Thread.pass while child_count >= @cpus
+      child_count += 1
+
+      Thread.new do
+        command = command_compile(file)
+        system(command)
+        raise PatScanError, "Command failed: #{command}" unless $?.success?
+        child_count -= 1
+      end
+    end
+  end
+
+#  def run
+#    child_count = 0
+#    ch_mutex = Mutex.new
+#    threads = []
+#
+#    @files_fasta.each do |file|
+#      Thread.pass while child_count >= @cpus
+#      ch_mutex.synchronize { child_count += 1 }
+#
+#      threads << Thread.new do
+#        command = command_compile(file)
+#        system(command)
+#        raise PatScanError, "Command failed: #{command}" unless $?.success?
+#        ch_mutex.synchronize { child_count -= 1 }
+#      end
+#    end
+#
+#    threads.each { |t| t.join }
+#  end
+
+  def command_compile(file)
     commands = []
     commands << "nice -n 19"
     commands << "scan_for_matches"
     commands << @file_pattern
-    commands << "< #{@file_fasta}"
-    commands << "> #{@file_patscan}"
+    commands << "< #{file}"
+    commands << "> #{file}.out"
     command = commands.join(" ")
-    system(command)
-    raise PatScanError, "Command failed: #{command}" unless $?.success?
   end
 
   def parse_results
@@ -157,47 +207,65 @@ casts << {:long=>'len',        :short=>'l', :type=>'uint',   :mandatory=>false,
 casts << {:long=>'mismatches', :short=>'m', :type=>'uint',   :mandatory=>false, :default=>10,  :allowed=>nil, :disallowed=>nil}
 casts << {:long=>'insertions', :short=>'i', :type=>'uint',   :mandatory=>false, :default=>5,   :allowed=>nil, :disallowed=>nil}
 casts << {:long=>'deletions',  :short=>'d', :type=>'uint',   :mandatory=>false, :default=>5,   :allowed=>nil, :disallowed=>nil}
+casts << {:long=>'cpus',       :short=>'c', :type=>'uint',   :mandatory=>false, :default=>1,   :allowed=>nil, :disallowed=>'0'}
+
+BASE_PER_FILE = 10_000_000
 
 options = Biopieces.options_parse(ARGV, casts)
 
-tmpdir       = Biopieces.mktmpdir
-file_fasta   = File.join(tmpdir, "data.fna")
+#tmpdir       = Biopieces.mktmpdir
+tmpdir       = "Tyt" # DEBUG TODO
 file_records = File.join(tmpdir, "data.stream")
 file_pattern = File.join(tmpdir, "pattern.txt")
-file_patscan = File.join(tmpdir, "patscan.fna")
+file_patscan = File.join(tmpdir, "patscan.out")
 
-count = 0
+number_file = 0
+number_seq  = 0
+bases       = 0
 
 Biopieces.open(options[:stream_in], file_records) do |input, output|
-  Fasta.open(file_fasta, mode='w') do |out_fa|
-    input.each do |record|
-      output.puts record
+  file_fasta = File.join(tmpdir, "#{number_file}.fna")
+  out_fa     = Fasta.open(file_fasta, mode='w')
 
-      if record.has_key? :SEQ
-        record[:SEQ_NAME] = count
-        out_fa.puts record
+  input.each do |record|
+    output.puts record
 
-        count += 1;
+    if record.has_key? :SEQ
+      record[:SEQ_NAME] = number_seq
+      out_fa.puts record
+
+      number_seq += 1;
+      bases      += record[:SEQ].length
+
+      if bases > BASE_PER_FILE
+        out_fa.close
+        bases = 0
+        number_file += 1
+        file_fasta = File.join(tmpdir, "#{number_file}.fna")
+        out_fa     = Fasta.open(file_fasta, mode='w')
       end
     end
   end
+
+  out_fa.close if out_fa.respond_to? :close
 end
 
-patscan = PatScan.new(options, file_fasta, file_pattern, file_patscan)
+patscan = PatScan.new(options, tmpdir, file_pattern, file_patscan, options[:cpus])
 patscan.run
+exit
 matches = patscan.parse_results
 
-count = 0
+number_seq = 0
 
 Biopieces.open(file_records, options[:stream_out]) do |input, output|
   input.each_record do |record|
     if record.has_key? :SEQ
-      if matches.has_key? count
-        record[:ADAPTOR_POS] = matches[count].first
-        record[:ADAPTOR_LEN] = matches[count].last
+      if matches.has_key? number_seq
+        record[:ADAPTOR_POS] = matches[number_seq].first
+        record[:ADAPTOR_LEN] = matches[number_seq].last
       end
 
-      count += 1;
+      number_seq += 1;
     end
 
     output.puts record