]> git.donarmstrong.com Git - mothur.git/blobdiff - classifyseqscommand.cpp
added MPI code, broke up chimera.seqs into 5 separated commands, added parse.sff...
[mothur.git] / classifyseqscommand.cpp
index ba854e98b40403c8d963fb65c9c4bd6178950947..a9f0a36a2a440e17e8f4f433954c766abba041ed 100644 (file)
@@ -87,15 +87,37 @@ ClassifySeqsCommand::ClassifySeqsCommand(string option)  {
                                        }
                                        
                                        int ableToOpen;
+                                       
+                                       #ifdef USE_MPI  
+                                               int pid;
+                                               MPI_Comm_size(MPI_COMM_WORLD, &processors); //set processors to the number of mpi processes running
+                                               MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
+                               
+                                               if (pid == 0) {
+                                       #endif
+                                       
                                        ifstream in;
                                        ableToOpen = openInputFile(fastaFileNames[i], in);
+                                       in.close();
+                                       
+                                       #ifdef USE_MPI  
+                                                       for (int j = 1; j < processors; j++) {
+                                                               MPI_Send(&ableToOpen, 1, MPI_INT, j, 2001, MPI_COMM_WORLD); 
+                                                       }
+                                               }else{
+                                                       MPI_Status status;
+                                                       MPI_Recv(&ableToOpen, 1, MPI_INT, 0, 2001, MPI_COMM_WORLD, &status);
+                                               }
+                                               
+                                       #endif
+                                       
                                        if (ableToOpen == 1) { 
                                                m->mothurOut(fastaFileNames[i] + " will be disregarded."); m->mothurOutEndLine(); 
                                                //erase from file list
                                                fastaFileNames.erase(fastaFileNames.begin()+i);
                                                i--;
                                        }
-                                       in.close();
+                                       
                                }
                                
                                //make sure there is at least one valid file left
@@ -125,12 +147,32 @@ ClassifySeqsCommand::ClassifySeqsCommand(string option)  {
                                                //if the user has not given a path then, add inputdir. else leave path alone.
                                                if (path == "") {       namefileNames[i] = inputDir + namefileNames[i];         }
                                        }
-
                                        int ableToOpen;
+                                       
+                                       #ifdef USE_MPI  
+                                               int pid;
+                                               MPI_Comm_size(MPI_COMM_WORLD, &processors); //set processors to the number of mpi processes running
+                                               MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
+                               
+                                               if (pid == 0) {
+                                       #endif
+
                                        ifstream in;
                                        ableToOpen = openInputFile(namefileNames[i], in);
-                                       if (ableToOpen == 1) {  m->mothurOut("Unable to match name file with fasta file."); m->mothurOutEndLine(); abort = true;        }
                                        in.close();
+                                       
+                                       #ifdef USE_MPI  
+                                                       for (int j = 1; j < processors; j++) {
+                                                               MPI_Send(&ableToOpen, 1, MPI_INT, j, 2001, MPI_COMM_WORLD); 
+                                                       }
+                                               }else{
+                                                       MPI_Status status;
+                                                       MPI_Recv(&ableToOpen, 1, MPI_INT, 0, 2001, MPI_COMM_WORLD, &status);
+                                               }
+                                               
+                                       #endif
+                                       if (ableToOpen == 1) {  m->mothurOut("Unable to match name file with fasta file."); m->mothurOutEndLine(); abort = true;        }
+                                       
                                }
                        }
 
@@ -211,6 +253,9 @@ void ClassifySeqsCommand::help(){
                m->mothurOut("The method parameter allows you to specify classification method to use.  Your options are: bayesian and knn. The default is bayesian.\n");
                m->mothurOut("The ksize parameter allows you to specify the kmer size for finding most similar template to candidate.  The default is 8.\n");
                m->mothurOut("The processors parameter allows you to specify the number of processors to use. The default is 1.\n");
+               #ifdef USE_MPI
+               m->mothurOut("When using MPI, the processors parameter is set to the number of MPI processes running. \n");
+               #endif
                m->mothurOut("The match parameter allows you to specify the bonus for having the same base. The default is 1.0.\n");
                m->mothurOut("The mistmatch parameter allows you to specify the penalty for having different bases.  The default is -1.0.\n");
                m->mothurOut("The gapopen parameter allows you to specify the penalty for opening a gap in an alignment. The default is -2.0.\n");
@@ -253,21 +298,6 @@ int ClassifySeqsCommand::execute(){
                                
                for (int s = 0; s < fastaFileNames.size(); s++) {
                
-                       //read namefile
-                       if(namefile != "") {
-                               nameMap.clear(); //remove old names
-                               
-                               ifstream inNames;
-                               openInputFile(namefileNames[s], inNames);
-                               
-                               string firstCol, secondCol;
-                               while(!inNames.eof()) {
-                                       inNames >> firstCol >> secondCol; gobble(inNames);
-                                       nameMap[firstCol] = getNumNames(secondCol);  //ex. seq1 seq1,seq3,seq5 -> seq1 = 3.
-                               }
-                               inNames.close();
-                       }
-               
                        m->mothurOut("Classifying sequences from " + fastaFileNames[s] + " ..." ); m->mothurOutEndLine();
                        
                        if (outputDir == "") { outputDir += hasPath(fastaFileNames[s]); }
@@ -282,7 +312,102 @@ int ClassifySeqsCommand::execute(){
                        int numFastaSeqs = 0;
                        for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
                        
-#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+#ifdef USE_MPI 
+                               int pid, end, numSeqsPerProcessor; 
+                               int tag = 2001;
+                               vector<long> MPIPos;
+                               
+                               MPI_Status status; 
+                               MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
+                               MPI_Comm_size(MPI_COMM_WORLD, &processors); 
+
+                               MPI_File inMPI;
+                               MPI_File outMPINewTax;
+                               MPI_File outMPITempTax;
+                                                       
+                               int outMode=MPI_MODE_CREATE|MPI_MODE_WRONLY; 
+                               int inMode=MPI_MODE_RDONLY; 
+                                                               
+                               char outNewTax[newTaxonomyFile.length()];
+                               strcpy(outNewTax, newTaxonomyFile.c_str());
+                               
+                               char outTempTax[tempTaxonomyFile.length()];
+                               strcpy(outTempTax, tempTaxonomyFile.c_str());
+                               
+                               char inFileName[fastaFileNames[s].length()];
+                               strcpy(inFileName, fastaFileNames[s].c_str());
+
+                               MPI_File_open(MPI_COMM_WORLD, inFileName, inMode, MPI_INFO_NULL, &inMPI);  //comm, filename, mode, info, filepointer
+                               MPI_File_open(MPI_COMM_WORLD, outNewTax, outMode, MPI_INFO_NULL, &outMPINewTax);
+                               MPI_File_open(MPI_COMM_WORLD, outTempTax, outMode, MPI_INFO_NULL, &outMPITempTax);
+                               
+                               if (m->control_pressed) {  MPI_File_close(&inMPI);  MPI_File_close(&outMPINewTax);   MPI_File_close(&outMPITempTax);  delete classify; return 0;  }
+
+                               if(namefile != "") {  MPIReadNamesFile(namefileNames[s]);  }
+                               
+                               if (pid == 0) { //you are the root process 
+                                       
+                                       MPIPos = setFilePosFasta(fastaFileNames[s], numFastaSeqs); //fills MPIPos, returns numSeqs
+                                       
+                                       //send file positions to all processes
+                                       MPI_Bcast(&numFastaSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD);  //send numSeqs
+                                       MPI_Bcast(&MPIPos[0], (numFastaSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos   
+                                       
+                                       //figure out how many sequences you have to align
+                                       numSeqsPerProcessor = numFastaSeqs / processors;
+                                       if(pid == (processors - 1)){    numSeqsPerProcessor = numFastaSeqs - pid * numSeqsPerProcessor;         }
+                                       int startIndex =  pid * numSeqsPerProcessor;
+                               
+                                       //align your part
+                                       driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPINewTax, outMPITempTax, MPIPos);
+                                       
+                                       if (m->control_pressed) {  MPI_File_close(&inMPI);  MPI_File_close(&outMPINewTax);   MPI_File_close(&outMPITempTax);  for (int i = 0; i < outputNames.size(); i++) {    remove(outputNames[i].c_str()); } delete classify; return 0;  }
+                                       
+                                       for (int i = 1; i < processors; i++) {
+                                               int done;
+                                               MPI_Recv(&done, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status);
+                                       }
+                               }else{ //you are a child process
+                                       MPI_Bcast(&numFastaSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs
+                                       MPIPos.resize(numFastaSeqs+1);
+                                       MPI_Bcast(&MPIPos[0], (numFastaSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions
+                                       
+                                       //figure out how many sequences you have to align
+                                       numSeqsPerProcessor = numFastaSeqs / processors;
+                                       if(pid == (processors - 1)){    numSeqsPerProcessor = numFastaSeqs - pid * numSeqsPerProcessor;         }
+                                       int startIndex =  pid * numSeqsPerProcessor;
+                                       
+                                       //align your part
+                                       driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPINewTax, outMPITempTax, MPIPos);
+                                       
+                                       if (m->control_pressed) {  MPI_File_close(&inMPI);  MPI_File_close(&outMPINewTax);   MPI_File_close(&outMPITempTax);  delete classify; return 0;  }
+
+                                       int done = 0;
+                                       MPI_Send(&done, 1, MPI_INT, 0, tag, MPI_COMM_WORLD); 
+                               }
+                               
+                               //close files 
+                               MPI_File_close(&inMPI);
+                               MPI_File_close(&outMPINewTax);
+                               MPI_File_close(&outMPITempTax);
+                               
+#else
+                       //read namefile
+                       if(namefile != "") {
+                               nameMap.clear(); //remove old names
+                               
+                               ifstream inNames;
+                               openInputFile(namefileNames[s], inNames);
+                               
+                               string firstCol, secondCol;
+                               while(!inNames.eof()) {
+                                       inNames >> firstCol >> secondCol; gobble(inNames);
+                                       nameMap[firstCol] = getNumNames(secondCol);  //ex. seq1 seq1,seq3,seq5 -> seq1 = 3.
+                               }
+                               inNames.close();
+                       }
+
+       #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
                        if(processors == 1){
                                ifstream inFASTA;
                                openInputFile(fastaFileNames[s], inFASTA);
@@ -333,7 +458,7 @@ int ClassifySeqsCommand::execute(){
                                }
                                
                        }
-#else
+       #else
                        ifstream inFASTA;
                        openInputFile(fastaFileNames[s], inFASTA);
                        numFastaSeqs=count(istreambuf_iterator<char>(inFASTA),istreambuf_iterator<char>(), '>');
@@ -342,7 +467,13 @@ int ClassifySeqsCommand::execute(){
                        lines.push_back(new linePair(0, numFastaSeqs));
                        
                        driver(lines[0], newTaxonomyFile, tempTaxonomyFile, fastaFileNames[s]);
-#endif 
+       #endif  
+#endif
+
+               #ifdef USE_MPI  
+                       if (pid == 0) {  //this part does not need to be paralellized
+               #endif
+
                        //make taxonomy tree from new taxonomy file 
                        PhyloTree taxaBrowser;
                        
@@ -416,6 +547,10 @@ int ClassifySeqsCommand::execute(){
                        remove(newTaxonomyFile.c_str());
                        rename(unclass.c_str(), newTaxonomyFile.c_str());
                        
+                       #ifdef USE_MPI  
+                               }
+                       #endif
+
                        m->mothurOutEndLine();
                        m->mothurOut("Output File Names: "); m->mothurOutEndLine();
                        for (int i = 0; i < outputNames.size(); i++) {  m->mothurOut(outputNames[i]); m->mothurOutEndLine();    }
@@ -577,5 +712,113 @@ int ClassifySeqsCommand::driver(linePair* line, string taxFName, string tempTFNa
                exit(1);
        }
 }
+//**********************************************************************************************************************
+#ifdef USE_MPI
+int ClassifySeqsCommand::driverMPI(int start, int num, MPI_File& inMPI, MPI_File& newFile, MPI_File& tempFile, vector<long>& MPIPos){
+       try {
+               MPI_Status statusNew; 
+               MPI_Status statusTemp; 
+               MPI_Status status; 
+               
+               int pid;
+               MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
+       
+               string taxonomy;
+               string outputString;
+
+               for(int i=0;i<num;i++){
+               
+                       if (m->control_pressed) { return 0; }
+               
+                       //read next sequence
+                       int length = MPIPos[start+i+1] - MPIPos[start+i];
+                       char buf4[length];
+                       MPI_File_read_at(inMPI, MPIPos[start+i], buf4, length, MPI_CHAR, &status);
+                       
+                       string tempBuf = buf4;
+                       if (tempBuf.length() > length) { tempBuf = tempBuf.substr(0, length);  }
+                       istringstream iss (tempBuf,istringstream::in);
+
+                       Sequence* candidateSeq = new Sequence(iss);
+                       
+                       if (candidateSeq->getName() != "") {
+                               taxonomy = classify->getTaxonomy(candidateSeq);
+                               
+                               if (taxonomy != "bad seq") {
+                                       //output confidence scores or not
+                                       if (probs) {
+                                               outputString =  candidateSeq->getName() + "\t" + taxonomy + "\n";
+                                       }else{
+                                               outputString =  candidateSeq->getName() + "\t" + classify->getSimpleTax() + "\n";
+                                       }
+                                       
+                                       int length = outputString.length();
+                                       char buf2[length];
+                                       strcpy(buf2, outputString.c_str()); 
+                               
+                                       MPI_File_write_shared(newFile, buf2, length, MPI_CHAR, &statusNew);
+                                       
+                                       outputString =  candidateSeq->getName() + "\t" + classify->getSimpleTax() + "\n";
+                                       length = outputString.length();
+                                       char buf[length];
+                                       strcpy(buf, outputString.c_str()); 
+                               
+                                       MPI_File_write_shared(tempFile, buf, length, MPI_CHAR, &statusTemp);
+                               }
+                       }                               
+                       delete candidateSeq;
+                       
+                       if((i+1) % 100 == 0){   cout << "Classifying sequence " << (i+1) << endl;       }
+               }
+               
+               if(num % 100 != 0){     cout << "Classifying sequence " << (num) << endl;       }
+               
+               
+               return 1;
+       }
+       catch(exception& e) {
+               m->errorOut(e, "ClassifySeqsCommand", "driverMPI");
+               exit(1);
+       }
+}
 
+//**********************************************************************************************************************
+int ClassifySeqsCommand::MPIReadNamesFile(string nameFilename){
+       try {
+       
+               nameMap.clear(); //remove old names
+               
+               MPI_File inMPI;
+               MPI_Offset size;
+               MPI_Status status;
+               
+               char inFileName[nameFilename.length()];
+               strcpy(inFileName, nameFilename.c_str());
+
+               MPI_File_open(MPI_COMM_WORLD, inFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &inMPI);  
+               MPI_File_get_size(inMPI, &size);
+
+               char buffer[size];
+               MPI_File_read(inMPI, buffer, size, MPI_CHAR, &status);
+
+               string tempBuf = buffer;
+               if (tempBuf.length() > size) { tempBuf = tempBuf.substr(0, size);  }
+               istringstream iss (tempBuf,istringstream::in);
+               
+               string firstCol, secondCol;
+               while(!iss.eof()) {
+                       iss >> firstCol >> secondCol; gobble(iss);
+                       nameMap[firstCol] = getNumNames(secondCol);  //ex. seq1 seq1,seq3,seq5 -> seq1 = 3.
+               }
+       
+               MPI_File_close(&inMPI);
+               
+               return 1;
+       }
+       catch(exception& e) {
+               m->errorOut(e, "ClassifySeqsCommand", "MPIReadNamesFile");
+               exit(1);
+       }
+}
+#endif
 /**************************************************************************************************/