]> git.donarmstrong.com Git - mothur.git/blobdiff - aligncommand.cpp
added count.groups command and paralellized align.seqs for windows
[mothur.git] / aligncommand.cpp
index a32ec4df279b2fa1158ec5e818ce03545de4ed0c..b7c684311a5e186576d6f70c91dade75c76b31f4 100644 (file)
  */
 
 #include "aligncommand.h"
-#include "sequence.hpp"
-
-#include "gotohoverlap.hpp"
-#include "needlemanoverlap.hpp"
-#include "blastalign.hpp"
-#include "noalign.hpp"
-
-#include "nast.hpp"
-#include "nastreport.hpp"
 #include "referencedb.h"
 
 //**********************************************************************************************************************
@@ -256,6 +247,11 @@ AlignCommand::AlignCommand(string option)  {
                        
                        temp = validParameter.validFile(parameters, "save", false);                     if (temp == "not found"){       temp = "f";                             }
                        save = m->isTrue(temp); 
+                       //this is so the threads can quickly load the reference data
+                       #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+                       #else
+                               if (processors != 1) { save = true; }
+                       #endif
                        rdb->save = save; 
                        if (save) { //clear out old references
                                rdb->clearMemory();
@@ -298,7 +294,6 @@ AlignCommand::~AlignCommand(){
        if (abort == false) {
                for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
                delete templateDB;
-               delete alignment;
        }
 }
 //**********************************************************************************************************************
@@ -308,17 +303,6 @@ int AlignCommand::execute(){
                if (abort == true) { if (calledHelp) { return 0; }  return 2;   }
 
                templateDB = new AlignmentDB(templateFileName, search, kmerSize, gapOpen, gapExtend, match, misMatch);
-               int longestBase = templateDB->getLongestBase();
-               
-               if(align == "gotoh")                    {       alignment = new GotohOverlap(gapOpen, gapExtend, match, misMatch, longestBase);                 }
-               else if(align == "needleman")   {       alignment = new NeedlemanOverlap(gapOpen, match, misMatch, longestBase);                                }
-               else if(align == "blast")               {       alignment = new BlastAlignment(gapOpen, gapExtend, match, misMatch);            }
-               else if(align == "noalign")             {       alignment = new NoAlign();                                                                                                      }
-               else {
-                       m->mothurOut(align + " is not a valid alignment option. I will run the command using needleman.");
-                       m->mothurOutEndLine();
-                       alignment = new NeedlemanOverlap(gapOpen, match, misMatch, longestBase);
-               }
                
                for (int s = 0; s < candidateFileNames.size(); s++) {
                        if (m->control_pressed) { outputTypes.clear(); return 0; }
@@ -442,20 +426,28 @@ int AlignCommand::execute(){
                                
 #else
 
-               vector<unsigned long int> positions = m->divideFile(candidateFileNames[s], processors);
-               for (int i = 0; i < (positions.size()-1); i++) {
-                       lines.push_back(new linePair(positions[i], positions[(i+1)]));
-               }       
-       #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+                       vector<unsigned long int> positions; 
+               #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+                       positions = m->divideFile(candidateFileNames[s], processors);
+                       for (int i = 0; i < (positions.size()-1); i++) {        lines.push_back(new linePair(positions[i], positions[(i+1)]));  }
+               #else
+                       positions = m->setFilePosFasta(candidateFileNames[s], numFastaSeqs); 
+                       
+                       //figure out how many sequences you have to process
+                       int numSeqsPerProcessor = numFastaSeqs / processors;
+                       for (int i = 0; i < processors; i++) {
+                               int startIndex =  i * numSeqsPerProcessor;
+                               if(i == (processors - 1)){      numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor;   }
+                               lines.push_back(new linePair(positions[startIndex], numSeqsPerProcessor));
+                       }
+               #endif
+                       
                        if(processors == 1){
                                numFastaSeqs = driver(lines[0], alignFileName, reportFileName, accnosFileName, candidateFileNames[s]);
                        }else{
                                numFastaSeqs = createProcesses(alignFileName, reportFileName, accnosFileName, candidateFileNames[s]); 
                        }
-       #else
-                       numFastaSeqs = driver(lines[0], alignFileName, reportFileName, accnosFileName, candidateFileNames[s]);
-       #endif
-                       
+                               
                        if (m->control_pressed) { m->mothurRemove(accnosFileName); m->mothurRemove(alignFileName); m->mothurRemove(reportFileName); outputTypes.clear();  return 0; }
                        
                        //delete accnos file if its blank else report to user
@@ -511,7 +503,6 @@ int AlignCommand::execute(){
 }
 
 //**********************************************************************************************************************
-
 int AlignCommand::driver(linePair* filePos, string alignFName, string reportFName, string accnosFName, string filename){
        try {
                ofstream alignmentFile;
@@ -529,10 +520,23 @@ int AlignCommand::driver(linePair* filePos, string alignFName, string reportFNam
 
                bool done = false;
                int count = 0;
+               
+               //moved this into driver to avoid deep copies in windows paralellized version
+               Alignment* alignment;
+               int longestBase = templateDB->getLongestBase();
+               if(align == "gotoh")                    {       alignment = new GotohOverlap(gapOpen, gapExtend, match, misMatch, longestBase);                 }
+               else if(align == "needleman")   {       alignment = new NeedlemanOverlap(gapOpen, match, misMatch, longestBase);                                }
+               else if(align == "blast")               {       alignment = new BlastAlignment(gapOpen, gapExtend, match, misMatch);            }
+               else if(align == "noalign")             {       alignment = new NoAlign();                                                                                                      }
+               else {
+                       m->mothurOut(align + " is not a valid alignment option. I will run the command using needleman.");
+                       m->mothurOutEndLine();
+                       alignment = new NeedlemanOverlap(gapOpen, match, misMatch, longestBase);
+               }
        
                while (!done) {
                        
-                       if (m->control_pressed) {  return 0; }
+                       if (m->control_pressed) {  break; }
                        
                        Sequence* candidateSeq = new Sequence(inFASTA);  m->gobble(inFASTA);
                        report.setCandidate(candidateSeq);
@@ -548,7 +552,7 @@ int AlignCommand::driver(linePair* filePos, string alignFName, string reportFNam
                                                                
                                Sequence temp = templateDB->findClosestSequence(candidateSeq);
                                Sequence* templateSeq = &temp;
-                       
+                               
                                float searchScore = templateDB->getSearchScore();
                                                                
                                Nast* nast = new Nast(alignment, candidateSeq, templateSeq);
@@ -628,6 +632,7 @@ int AlignCommand::driver(linePair* filePos, string alignFName, string reportFNam
                //report progress
                if((count) % 100 != 0){ m->mothurOut(toString(count)); m->mothurOutEndLine();           }
                
+               delete alignment;
                alignmentFile.close();
                inFASTA.close();
                accnosFile.close();
@@ -665,9 +670,22 @@ int AlignCommand::driverMPI(int start, int num, MPI_File& inMPI, MPI_File& align
             delete buf;
                }
                
+               Alignment* alignment;
+               int longestBase = templateDB->getLongestBase();
+               if(align == "gotoh")                    {       alignment = new GotohOverlap(gapOpen, gapExtend, match, misMatch, longestBase);                 }
+               else if(align == "needleman")   {       alignment = new NeedlemanOverlap(gapOpen, match, misMatch, longestBase);                                }
+               else if(align == "blast")               {       alignment = new BlastAlignment(gapOpen, gapExtend, match, misMatch);            }
+               else if(align == "noalign")             {       alignment = new NoAlign();                                                                                                      }
+               else {
+                       m->mothurOut(align + " is not a valid alignment option. I will run the command using needleman.");
+                       m->mothurOutEndLine();
+                       alignment = new NeedlemanOverlap(gapOpen, match, misMatch, longestBase);
+               }
+               
+               
                for(int i=0;i<num;i++){
                
-                       if (m->control_pressed) {  return 0; }
+                       if (m->control_pressed) { delete alignment; return 0; }
 
                        //read next sequence
                        int length = MPIPos[start+i+1] - MPIPos[start+i];
@@ -807,11 +825,10 @@ int AlignCommand::driverMPI(int start, int num, MPI_File& inMPI, MPI_File& align
 
 int AlignCommand::createProcesses(string alignFileName, string reportFileName, string accnosFName, string filename) {
        try {
-#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+               int num = 0;
                processIDS.resize(0);
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
                int process = 1;
-               int num = 0;
-               //              processIDS.resize(0);
                
                //loop through and create all the processes you want
                while (process != processors) {
@@ -883,9 +900,90 @@ int AlignCommand::createProcesses(string alignFileName, string reportFileName, s
                        m->openOutputFile(accnosFName, out);
                        out.close();
                }
+#else
+               //////////////////////////////////////////////////////////////////////////////////////////////////////
+               //Windows version shared memory, so be careful when passing variables through the alignData struct. 
+               //Above fork() will clone, so memory is separate, but that's not the case with windows, 
+               //////////////////////////////////////////////////////////////////////////////////////////////////////
+               
+               vector<alignData*> pDataArray; 
+               DWORD   dwThreadIdArray[processors-1];
+               HANDLE  hThreadArray[processors-1]; 
+               
+               //Create processor worker threads.
+               for( int i=0; i<processors-1; i++ ){
+                       //copy templateDb
+                       //AlignmentDB* tempDB = new AlignmentDB(*templateDB);
+                       
+                       // Allocate memory for thread data.
+                       string extension = "";
+                       if (i != 0) { extension = toString(i) + ".temp"; }
+                       
+                       alignData* tempalign = new alignData((alignFileName + extension), (reportFileName + extension), (accnosFName + extension), filename, align, search, kmerSize, m, lines[i]->start, lines[i]->end, flip, match, misMatch, gapOpen, gapExtend, threshold);
+                       pDataArray.push_back(tempalign);
+                       processIDS.push_back(i);
+                               
+                       //MySeqSumThreadFunction is in header. It must be global or static to work with the threads.
+                       //default security attributes, thread function name, argument to thread function, use default creation flags, returns the thread identifier
+                       hThreadArray[i] = CreateThread(NULL, 0, MyAlignThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]);   
+               }
+               
+               //need to check for line ending error
+               ifstream inFASTA;
+               m->openInputFile(filename, inFASTA);
+               inFASTA.seekg(lines[processors-1]->start-1);
+               char c = inFASTA.peek();
+               
+               if (c == '>') { //we need to move back
+                       lines[processors-1]->start--; 
+               }
+               
+               //using the main process as a worker saves time and memory
+               //do my part - do last piece because windows is looking for eof
+               num = driver(lines[processors-1], (alignFileName + toString(processors-1) + ".temp"), (reportFileName + toString(processors-1) + ".temp"), (accnosFName + toString(processors-1) + ".temp"), filename);
+               
+               //Wait until all threads have terminated.
+               WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+               
+               //Close all thread handles and free memory allocations.
+               for(int i=0; i < pDataArray.size(); i++){
+                       num += pDataArray[i]->count;
+                       CloseHandle(hThreadArray[i]);
+                       delete pDataArray[i];
+               }
+               
+               vector<string> nonBlankAccnosFiles;
+               if (!(m->isBlank(accnosFName))) { nonBlankAccnosFiles.push_back(accnosFName); }
+               else { m->mothurRemove(accnosFName); } //remove so other files can be renamed to it
+               
+               for (int i = 1; i < processors; i++) {
+                       appendAlignFiles((alignFileName + toString(i) + ".temp"), alignFileName);
+                       m->mothurRemove((alignFileName + toString(i) + ".temp"));
+                       
+                       appendReportFiles((reportFileName + toString(i) + ".temp"), reportFileName);
+                       m->mothurRemove((reportFileName + toString(i) + ".temp"));
+                       
+                       if (!(m->isBlank(accnosFName + toString(i) + ".temp"))) {
+                               nonBlankAccnosFiles.push_back(accnosFName + toString(i) + ".temp");
+                       }else { m->mothurRemove((accnosFName + toString(i) + ".temp"));  }
+               }
+               
+               //append accnos files
+               if (nonBlankAccnosFiles.size() != 0) { 
+                       rename(nonBlankAccnosFiles[0].c_str(), accnosFName.c_str());
+                       
+                       for (int h=1; h < nonBlankAccnosFiles.size(); h++) {
+                               appendAlignFiles(nonBlankAccnosFiles[h], accnosFName);
+                               m->mothurRemove(nonBlankAccnosFiles[h]);
+                       }
+               }else { //recreate the accnosfile if needed
+                       ofstream out;
+                       m->openOutputFile(accnosFName, out);
+                       out.close();
+               }       
+#endif 
                
                return num;
-#endif         
        }
        catch(exception& e) {
                m->errorOut(e, "AlignCommand", "createProcesses");