]> git.donarmstrong.com Git - mothur.git/blobdiff - filterseqscommand.cpp
testing mpi
[mothur.git] / filterseqscommand.cpp
index 0e33ab2047359b432886384cc41d60a806fbdc7c..326be01ff722fda90a61e0dd887f8d3e57107be4 100644 (file)
@@ -131,6 +131,13 @@ FilterSeqsCommand::FilterSeqsCommand(string option)  {
 
 void FilterSeqsCommand::help(){
        try {
+               #ifdef USE_MPI
+                               int pid;
+                               MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
+                                       
+                               if (pid == 0) {
+               #endif
+               
                m->mothurOut("The filter.seqs command reads a file containing sequences and creates a .filter and .filter.fasta file.\n");
                m->mothurOut("The filter.seqs command parameters are fasta, trump, soft, hard and vertical. \n");
                m->mothurOut("The fasta parameter is required. You may enter several fasta files to build the filter from and filter, by separating their names with -'s.\n");
@@ -144,6 +151,10 @@ void FilterSeqsCommand::help(){
                m->mothurOut("Example filter.seqs(fasta=abrecovery.fasta, trump=..., soft=..., hard=..., vertical=T).\n");
                m->mothurOut("Note: No spaces between parameter labels (i.e. fasta), '=' and parameters (i.e.yourFasta).\n\n");
                
+               #ifdef USE_MPI
+                       }
+               #endif
+               
        }
        catch(exception& e) {
                m->errorOut(e, "FilterSeqsCommand", "help");
@@ -221,7 +232,13 @@ int FilterSeqsCommand::execute() {
                
                if (m->control_pressed) {  for(int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); }  return 0; }
 
-               
+               #ifdef USE_MPI
+                               int pid;
+                               MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
+                                       
+                               if (pid == 0) {
+               #endif
+
                m->mothurOutEndLine();
                m->mothurOut("Length of filtered alignment: " + toString(filteredLength)); m->mothurOutEndLine();
                m->mothurOut("Number of columns removed: " + toString((alignmentLength-filteredLength))); m->mothurOutEndLine();
@@ -233,6 +250,10 @@ int FilterSeqsCommand::execute() {
                m->mothurOut("Output File Names: "); m->mothurOutEndLine();
                for(int i = 0; i < outputNames.size(); i++) {  m->mothurOut(outputNames[i]); m->mothurOutEndLine();      }
                m->mothurOutEndLine();
+               
+               #ifdef USE_MPI
+                       }
+               #endif
 
                return 0;
                
@@ -267,72 +288,156 @@ string FilterSeqsCommand::createFilter() {
                        for (int s = 0; s < fastafileNames.size(); s++) {
                        
                                for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
+                       
+#ifdef USE_MPI 
+                               int pid, rc, ierr; 
+                               char* buf;
+                               int Atag = 1; int Ttag = 2; int Ctag = 3; int Gtag = 4; int Gaptag = 5;
                                
-                               #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
-                                       if(processors == 1){
-                                               ifstream inFASTA;
-                                               openInputFile(fastafileNames[s], inFASTA);
-                                               int numFastaSeqs=count(istreambuf_iterator<char>(inFASTA),istreambuf_iterator<char>(), '>');
-                                               inFASTA.close();
+                               MPI_Status status; 
+                               MPI_File in; 
+                               rc = MPI_Comm_size(MPI_COMM_WORLD, &processors);
+                               rc = MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
                                                
-                                               numSeqs += numFastaSeqs;
-                               
-                                               lines.push_back(new linePair(0, numFastaSeqs));
-                               
-                                               driverCreateFilter(F, fastafileNames[s], lines[0]);
-                                       }else{
-                                               vector<int> positions;
-                               
-                                               ifstream inFASTA;
-                                               openInputFile(fastafileNames[s], inFASTA);
-                               
-                                               string input;
-                                               while(!inFASTA.eof()){
-                                                       input = getline(inFASTA);
-                                                       if (input.length() != 0) {
-                                                               if(input[0] == '>'){    long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1);  }
+                                                       
+                               char* tempFileName = &(fastafileNames[s][0]);
+                               MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &in);  //comm, filename, mode, info, filepointer
+                                                               
+                               if (pid == 0) { //you are the root process
+                                               setLines(fastafileNames[s]);
+                                               
+                                               for (int j = 0; j < lines.size(); j++) { //each process
+                                                       if (j != 0) { //don't send to yourself
+                                                               MPI_Send(&lines[j]->start, 1, MPI_INT, j, 2001, MPI_COMM_WORLD); //start position in file
+                                                               MPI_Send(&lines[j]->numSeqs, 1, MPI_INT, j, 2001, MPI_COMM_WORLD); //how many sequences we are sending
+                                                               MPI_Send(&bufferSizes[j], 1, MPI_INT, j, 2001, MPI_COMM_WORLD); //how bytes for the read
                                                        }
                                                }
-                                               inFASTA.close();
-                               
-                                               int numFastaSeqs = positions.size();
+                                               cout << "done sending" << endl;
+                                               cout << "parent = " << pid << " lines = " << lines[pid]->start << '\t' << lines[pid]->numSeqs << " size = " <<  lines.size() << endl;   
+                                               
+                                               buf = new char(bufferSizes[0]);
+                       cout << pid << '\t' << bufferSizes[0] << " line 1 start pos = " << lines[1]->start   << " buffer size 0 " << bufferSizes[0] << " buffer size 1 " << bufferSizes[1] << endl;                     
+                                               MPI_File_read_at(in, 0, buf, bufferSizes[0], MPI_CHAR, &status);
+                                               
+               cout << pid << " done reading " << endl;
+                                               string tempBuf = buf;
+                       cout << pid << '\t' << (tempBuf.substr(0, 10)) << endl;                                                                 
+                                               //do your part
+                                               MPICreateFilter(F, tempBuf);
+                                               
+                                               vector<int> temp; temp.resize(numSeqs);
+                                               
+                                               //get the frequencies from the child processes
+                                               for(int i = 0; i < ((processors-1)*5); i++) { 
+                               cout << "i = " << i << endl;
+                                                       int ierr = MPI_Recv(&temp, numSeqs, MPI_INT, MPI_ANY_SOURCE, 2001, MPI_COMM_WORLD, &status); 
+                                                       
+                                                       int receiveTag = temp[temp.size()-1];  //child process added a int to the end to indicate what letter count this is for
+                                                       
+                                                       int sender = status.MPI_SOURCE; 
+                                                       
+                                                       if (receiveTag == Atag) { //you are recieveing the A frequencies
+                                                               for (int k = 0; k < alignmentLength; k++) {             F.a[k] += temp[k];      }
+                                                       }else if (receiveTag == Ttag) { //you are recieveing the T frequencies
+                                                               for (int k = 0; k < alignmentLength; k++) {             F.t[k] += temp[k];      }
+                                                       }else if (receiveTag == Ctag) { //you are recieveing the C frequencies
+                                                               for (int k = 0; k < alignmentLength; k++) {             F.c[k] += temp[k];      }
+                                                       }else if (receiveTag == Gtag) { //you are recieveing the G frequencies
+                                                               for (int k = 0; k < alignmentLength; k++) {             F.g[k] += temp[k];      }
+                                                       }else if (receiveTag == Gaptag) { //you are recieveing the gap frequencies
+                                                               for (int k = 0; k < alignmentLength; k++) {             F.gap[k] += temp[k];    }
+                                                       }
+                                                       
+                                                       m->mothurOut("receive tag = " + toString(receiveTag) + " " + toString(sender) + " is complete."); m->mothurOutEndLine();
+                                               } 
+
                                                
-                                               numSeqs += numFastaSeqs;
+                               }else { //i am the child process
+                                       int startPos, numLines, bufferSize;
+                               cout << "child = " << pid << endl;
+                                       ierr = MPI_Recv(&startPos, 1, MPI_INT, 0, 2001, MPI_COMM_WORLD, &status);
+                                       ierr = MPI_Recv(&numLines, 1, MPI_INT, 0, 2001, MPI_COMM_WORLD, &status);
+                                       ierr = MPI_Recv(&bufferSize, 1, MPI_INT, 0, 2001, MPI_COMM_WORLD, &status);
+                               cout << "child = " << pid << " done recv messages startpos = " << startPos << " numLines = " << numLines << " buffersize = " << bufferSize << endl;     
                                
-                                               int numSeqsPerProcessor = numFastaSeqs / processors;
+                                       
+                                       //send freqs
+                                       char* buf2 = new char(bufferSize);
+                                       MPI_File_read_at( in, startPos, buf2, bufferSize, MPI_CHAR, &status);
+                               cout << pid << " done reading " << endl;
+                                       
+                                       string tempBuf = buf2;
+                       cout << pid << '\t' << (tempBuf.substr(0, 10)) << endl;
+                                       MPICreateFilter(F, tempBuf);
                                
-                                               for (int i = 0; i < processors; i++) {
-                                                       long int startPos = positions[ i * numSeqsPerProcessor ];
-                                                       if(i == processors - 1){
-                                                               numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor;
-                                                       }
-                                                       lines.push_back(new linePair(startPos, numSeqsPerProcessor));
-                                               }
+                                       //send my fequency counts
+                                       F.a.push_back(Atag);
+                                       int ierr = MPI_Send( &F.a[0], alignmentLength, MPI_INT, 0, 2001, MPI_COMM_WORLD);
+                                       F.t.push_back(Ttag);
+                                       ierr = MPI_Send( &F.t[0], alignmentLength, MPI_INT, 0, 2001, MPI_COMM_WORLD);
+                                       F.c.push_back(Ctag);
+                                       ierr = MPI_Send( &F.c[0], alignmentLength, MPI_INT, 0, 2001, MPI_COMM_WORLD);
+                                       F.g.push_back(Gtag);
+                                       ierr = MPI_Send( &F.g[0], alignmentLength, MPI_INT, 0, 2001, MPI_COMM_WORLD);
+                                       F.gap.push_back(Gaptag);
+                                       ierr = MPI_Send( &F.gap[0], alignmentLength, MPI_INT, 0, 2001, MPI_COMM_WORLD);
+                                       
+                                       cout << "child " << pid << " done sending counts" << endl;
+                               }
                                
-                                               createProcessesCreateFilter(F, fastafileNames[s]); 
-                                       }
-                               #else
+                               MPI_Barrier(MPI_COMM_WORLD);
+                               
+#else
+               #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+                               if(processors == 1){
                                        ifstream inFASTA;
                                        openInputFile(fastafileNames[s], inFASTA);
                                        int numFastaSeqs=count(istreambuf_iterator<char>(inFASTA),istreambuf_iterator<char>(), '>');
                                        inFASTA.close();
-                                               
+                                       
                                        numSeqs += numFastaSeqs;
-                               
+                                       
                                        lines.push_back(new linePair(0, numFastaSeqs));
+                                       
+                                       driverCreateFilter(F, fastafileNames[s], lines[0]);
+                               }else{
+                                       
+                                       setLines(fastafileNames[s]);                                    
+                                       createProcessesCreateFilter(F, fastafileNames[s]); 
+                               }
+               #else
+                               ifstream inFASTA;
+                               openInputFile(fastafileNames[s], inFASTA);
+                               int numFastaSeqs=count(istreambuf_iterator<char>(inFASTA),istreambuf_iterator<char>(), '>');
+                               inFASTA.close();
                                
-                                       driverCreateFilter(F, lines[0], fastafileNames[s]);
-                               #endif
-                       
+                               numSeqs += numFastaSeqs;
+                               
+                               lines.push_back(new linePair(0, numFastaSeqs));
+                               
+                               driverCreateFilter(F, lines[0], fastafileNames[s]);
+               #endif
+#endif
                        
                        }
                }
-               
+
+#ifdef USE_MPI
+
+//merge all frequency data and create filter string
+                                       //int pid;
+                                       //MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
+                                       
+                                       //if (pid == 0) { //only one process should output to screen
+#endif
+
+       cout << "made it here" << endl; 
                F.setNumSeqs(numSeqs);
                                
                if(isTrue(vertical) == 1)       {       F.doVertical(); }
                if(soft != 0)                           {       F.doSoft();             }
-               
+//cout << "Filter String = " << F.getFilter() << endl;                 
                filterString = F.getFilter();
 
                return filterString;
@@ -361,8 +466,14 @@ int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair*
                                        if(isTrue(vertical) || soft != 0){      F.getFreqs(seq);        }
                                        cout.flush();
                        }
+                       
+                       //report progress
+                       if((i+1) % 100 == 0){   m->mothurOut(toString(i+1)); m->mothurOutEndLine();             }
                }
-                               
+               
+               //report progress
+               if((line->numSeqs) % 100 != 0){ m->mothurOut(toString(line->numSeqs)); m->mothurOutEndLine();           }
+               
                in.close();
                
                return 0;
@@ -372,6 +483,38 @@ int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair*
                exit(1);
        }
 }
+/**************************************************************************************/
+int FilterSeqsCommand::MPICreateFilter(Filters& F, string temp) {      
+       try {
+               
+               vector<string> seqStrings;
+               parseBuffer(temp, seqStrings);
+               
+               for(int i=0;i<seqStrings.size();i++){
+                               
+                       if (m->control_pressed) { return 1; }
+                       
+                       Sequence seq("", seqStrings[0]);
+                       
+                       if(trump != '*'){       F.doTrump(seq); }
+                       if(isTrue(vertical) || soft != 0){      F.getFreqs(seq);        }
+                       cout.flush();
+                                               
+                       //report progress
+                       if((i+1) % 100 == 0){   m->mothurOut(toString(i+1)); m->mothurOutEndLine();             }
+               }
+               
+               //report progress
+               if((seqStrings.size()) % 100 != 0){     m->mothurOut(toString(seqStrings.size())); m->mothurOutEndLine();               }
+               
+               return 0;
+       }
+       catch(exception& e) {
+               m->errorOut(e, "FilterSeqsCommand", "MPICreateFilter");
+               exit(1);
+       }
+}
+
 /**************************************************************************************************/
 
 int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename) {
@@ -408,4 +551,84 @@ int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename)
                exit(1);
        }
 }
+/**************************************************************************************************/
+
+int FilterSeqsCommand::setLines(string filename) {
+       try {
+               vector<int> positions;
+               map<int, int> buf;
+               bufferSizes.clear();
+               
+               int pid;
+               MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
+       
+               ifstream inFASTA;
+               openInputFile(filename, inFASTA);
+                       
+               string input;
+               int numbuf = 0;
+               while(!inFASTA.eof()){  
+                       input = getline(inFASTA);
+
+                       if (input.length() != 0) {
+                               numbuf += input.length();
+                               if(input[0] == '>'){    long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1);  buf[(pos - input.length() - 1)] = numbuf; }
+                       }
+               }
+
+               inFASTA.close();
+               
+               int numFastaSeqs = positions.size();
+               
+               numSeqs += numFastaSeqs;
+               
+               int numSeqsPerProcessor = numFastaSeqs / processors;
+               
+               for (int i = 0; i < processors; i++) {
+
+                       long int startPos = positions[ i * numSeqsPerProcessor ];
+                       if(i == processors - 1){
+                               numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor;
+                               bufferSizes.push_back(numbuf-buf[startPos]);
+                       }else{  
+                               int myEnd = positions[ (i+1) * numSeqsPerProcessor ];
+                               bufferSizes.push_back(buf[myEnd]-buf[startPos]);
+                       }
+                       lines.push_back(new linePair(startPos, numSeqsPerProcessor));
+               }
+               
+               return 0;
+       }
+       catch(exception& e) {
+               m->errorOut(e, "FilterSeqsCommand", "setLines");
+               exit(1);
+       }
+}
+/**************************************************************************************************/
+int FilterSeqsCommand::parseBuffer(string file, vector<string>& seqs) {
+       try {
+               
+               istringstream iss (file,istringstream::in);
+               string name, seqstring;
+               
+               while (iss) {
+                       
+                       if (m->control_pressed) { return 0; }
+               cout << "here" << endl;                 
+                       Sequence seq(iss); 
+       cout << "here1" << endl;                        
+                       gobble(iss);
+       cout << seq.getName() << endl;          
+                       if (seq.getName() != "") {
+                               seqs.push_back(seq.getAligned());       
+                       }
+               }
+               
+               return 0;
+       }
+       catch(exception& e) {
+               m->errorOut(e, "FilterSeqsCommand", "parseBuffer");
+               exit(1);
+       }
+}
 /**************************************************************************************/