]> git.donarmstrong.com Git - mothur.git/blobdiff - filterseqscommand.cpp
added MPI to dist.seqs command
[mothur.git] / filterseqscommand.cpp
index 326be01ff722fda90a61e0dd887f8d3e57107be4..aa38e7affa36e65ada9463813bfce76438c382fe 100644 (file)
@@ -131,13 +131,7 @@ FilterSeqsCommand::FilterSeqsCommand(string option)  {
 
 void FilterSeqsCommand::help(){
        try {
-               #ifdef USE_MPI
-                               int pid;
-                               MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
-                                       
-                               if (pid == 0) {
-               #endif
-               
+                               
                m->mothurOut("The filter.seqs command reads a file containing sequences and creates a .filter and .filter.fasta file.\n");
                m->mothurOut("The filter.seqs command parameters are fasta, trump, soft, hard and vertical. \n");
                m->mothurOut("The fasta parameter is required. You may enter several fasta files to build the filter from and filter, by separating their names with -'s.\n");
@@ -151,10 +145,6 @@ void FilterSeqsCommand::help(){
                m->mothurOut("Example filter.seqs(fasta=abrecovery.fasta, trump=..., soft=..., hard=..., vertical=T).\n");
                m->mothurOut("Note: No spaces between parameter labels (i.e. fasta), '=' and parameters (i.e.yourFasta).\n\n");
                
-               #ifdef USE_MPI
-                       }
-               #endif
-               
        }
        catch(exception& e) {
                m->errorOut(e, "FilterSeqsCommand", "help");
@@ -232,13 +222,7 @@ int FilterSeqsCommand::execute() {
                
                if (m->control_pressed) {  for(int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); }  return 0; }
 
-               #ifdef USE_MPI
-                               int pid;
-                               MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
-                                       
-                               if (pid == 0) {
-               #endif
-
+               
                m->mothurOutEndLine();
                m->mothurOut("Length of filtered alignment: " + toString(filteredLength)); m->mothurOutEndLine();
                m->mothurOut("Number of columns removed: " + toString((alignmentLength-filteredLength))); m->mothurOutEndLine();
@@ -251,10 +235,6 @@ int FilterSeqsCommand::execute() {
                for(int i = 0; i < outputNames.size(); i++) {  m->mothurOut(outputNames[i]); m->mothurOutEndLine();      }
                m->mothurOutEndLine();
                
-               #ifdef USE_MPI
-                       }
-               #endif
-
                return 0;
                
        }
@@ -291,8 +271,8 @@ string FilterSeqsCommand::createFilter() {
                        
 #ifdef USE_MPI 
                                int pid, rc, ierr; 
-                               char* buf;
                                int Atag = 1; int Ttag = 2; int Ctag = 3; int Gtag = 4; int Gaptag = 5;
+                               int tag = 2001;
                                
                                MPI_Status status; 
                                MPI_File in; 
@@ -300,38 +280,80 @@ string FilterSeqsCommand::createFilter() {
                                rc = MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
                                                
                                                        
-                               char* tempFileName = &(fastafileNames[s][0]);
+                               char* tempFileName = new char(fastafileNames[s].length());
+                               tempFileName = &(fastafileNames[s][0]);
+                               
                                MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &in);  //comm, filename, mode, info, filepointer
-                                                               
+                               
                                if (pid == 0) { //you are the root process
                                                setLines(fastafileNames[s]);
                                                
                                                for (int j = 0; j < lines.size(); j++) { //each process
                                                        if (j != 0) { //don't send to yourself
-                                                               MPI_Send(&lines[j]->start, 1, MPI_INT, j, 2001, MPI_COMM_WORLD); //start position in file
-                                                               MPI_Send(&lines[j]->numSeqs, 1, MPI_INT, j, 2001, MPI_COMM_WORLD); //how many sequences we are sending
-                                                               MPI_Send(&bufferSizes[j], 1, MPI_INT, j, 2001, MPI_COMM_WORLD); //how bytes for the read
+                                                               MPI_Send(&lines[j]->start, 1, MPI_INT, j, tag, MPI_COMM_WORLD); //start position in file
+                                                               MPI_Send(&lines[j]->numSeqs, 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how many sequences we are sending
+                                                               MPI_Send(&bufferSizes[j], 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how bytes for the read
                                                        }
                                                }
-                                               cout << "done sending" << endl;
-                                               cout << "parent = " << pid << " lines = " << lines[pid]->start << '\t' << lines[pid]->numSeqs << " size = " <<  lines.size() << endl;   
-                                               
-                                               buf = new char(bufferSizes[0]);
-                       cout << pid << '\t' << bufferSizes[0] << " line 1 start pos = " << lines[1]->start   << " buffer size 0 " << bufferSizes[0] << " buffer size 1 " << bufferSizes[1] << endl;                     
+                                               //cout << "done sending" << endl;
+                                               //cout << "parent = " << pid << " lines = " << lines[pid]->start << '\t' << lines[pid]->numSeqs << " size = " <<  lines.size() << endl; 
+                                                
+                               cout << "parent =  " << pid << " address of Filter " << &F << " address of FilterString  " << &filterString << " address of numSeqs = " << &numSeqs << " address of soft = " << &soft << endl;          
+                               
+                                               char* buf = new char(bufferSizes[0]);
+                       //cout << pid << '\t' << bufferSizes[0] << " line 1 start pos = " << lines[1]->start   << " buffer size 0 " << bufferSizes[0] << " buffer size 1 " << bufferSizes[1] << endl;                   
                                                MPI_File_read_at(in, 0, buf, bufferSizes[0], MPI_CHAR, &status);
                                                
-               cout << pid << " done reading " << endl;
+               cout << pid << " done reading " << &buf <<  endl;
                                                string tempBuf = buf;
-                       cout << pid << '\t' << (tempBuf.substr(0, 10)) << endl;                                                                 
+                                               delete buf;
+                       //cout << pid << '\t' << (tempBuf.substr(0, 10)) << endl;
+                       
+                                               //parse buffer                                          
+                                               istringstream iss (tempBuf,istringstream::in);
+                                               string name, seqstring;
+                                               vector<string> seqs;
+                                       
+                                               while (iss) {
+                       
+                                                       if (m->control_pressed) { return filterString; }
+                                                       cout << "here" << endl;                 
+                                                       Sequence seq(iss); 
+                                                       cout << "here1" << endl;                        
+                                                       gobble(iss);
+                                                       cout << seq.getName() << endl;          
+                                                       if (seq.getName() != "") {
+                                                               seqs.push_back(seq.getAligned());       
+                                                       }
+                                                       
+                                               }
+                                               
+                                               for(int i=0;i<seqs.size();i++){
+                               
+                                                       if (m->control_pressed) { return filterString; }
+                       
+                                                       Sequence seq("", seqs[i]);
+                       
+                                                       if(trump != '*'){       F.doTrump(seq); }
+                                                       if(isTrue(vertical) || soft != 0){      F.getFreqs(seq);        }
+                                                       cout.flush();
+                                               
+                                                       //report progress
+                                                       if((i+1) % 100 == 0){   m->mothurOut(toString(i+1)); m->mothurOutEndLine();             }
+                                               }
+               
+                                               //report progress
+                                               if((seqs.size()) % 100 != 0){   m->mothurOut(toString(seqs.size())); m->mothurOutEndLine();             }
+
                                                //do your part
-                                               MPICreateFilter(F, tempBuf);
+                                               //MPICreateFilter(F, seqs);
                                                
-                                               vector<int> temp; temp.resize(numSeqs);
+                                               vector<int> temp; temp.resize(alignmentLength);
                                                
                                                //get the frequencies from the child processes
                                                for(int i = 0; i < ((processors-1)*5); i++) { 
                                cout << "i = " << i << endl;
-                                                       int ierr = MPI_Recv(&temp, numSeqs, MPI_INT, MPI_ANY_SOURCE, 2001, MPI_COMM_WORLD, &status); 
+                                                       int ierr = MPI_Recv(&temp, alignmentLength, MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status); 
                                                        
                                                        int receiveTag = temp[temp.size()-1];  //child process added a int to the end to indicate what letter count this is for
                                                        
@@ -355,33 +377,70 @@ string FilterSeqsCommand::createFilter() {
                                                
                                }else { //i am the child process
                                        int startPos, numLines, bufferSize;
-                               cout << "child = " << pid << endl;
-                                       ierr = MPI_Recv(&startPos, 1, MPI_INT, 0, 2001, MPI_COMM_WORLD, &status);
-                                       ierr = MPI_Recv(&numLines, 1, MPI_INT, 0, 2001, MPI_COMM_WORLD, &status);
-                                       ierr = MPI_Recv(&bufferSize, 1, MPI_INT, 0, 2001, MPI_COMM_WORLD, &status);
-                               cout << "child = " << pid << " done recv messages startpos = " << startPos << " numLines = " << numLines << " buffersize = " << bufferSize << endl;     
+                               cout << "child = " << pid << " address of Filter " << &F << " address of FilterString  " << &filterString << " address of numSeqs = " << &numSeqs << " address of soft = " << &soft<< endl;     
+                                       ierr = MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+                                       ierr = MPI_Recv(&numLines, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+                                       ierr = MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+                               //cout << "child = " << pid << " done recv messages startpos = " << startPos << " numLines = " << numLines << " buffersize = " << bufferSize << endl;   
                                
                                        
                                        //send freqs
                                        char* buf2 = new char(bufferSize);
                                        MPI_File_read_at( in, startPos, buf2, bufferSize, MPI_CHAR, &status);
-                               cout << pid << " done reading " << endl;
+                               cout << pid << " done reading " << &buf2 <<  endl;
                                        
                                        string tempBuf = buf2;
-                       cout << pid << '\t' << (tempBuf.substr(0, 10)) << endl;
-                                       MPICreateFilter(F, tempBuf);
+                                       delete buf2;
+               //      cout << pid << '\t' << (tempBuf.substr(0, 10)) << endl;
+                                       istringstream iss (tempBuf,istringstream::in);
+                                       
+                                       string name, seqstring;
+                                       vector<string> seqs;
+                                       
+                                       while (iss) {
+                       
+                                               if (m->control_pressed) { return filterString; }
+                                               cout << "here" << endl;                 
+                                               Sequence seq(iss); 
+                                               cout << "here1" << endl;                        
+                                               gobble(iss);
+                                               cout << seq.getName() << endl;  
+                                                       
+                                               if (seq.getName() != "") {
+                                                       seqs.push_back(seq.getAligned());       
+                                               }
+                                       }
+
+                                       for(int i=0;i<seqs.size();i++){
+                               
+                                               if (m->control_pressed) { return filterString; }
+                       
+                                               Sequence seq("", seqs[i]);
+                       
+                                               if(trump != '*'){       F.doTrump(seq); }
+                                               if(isTrue(vertical) || soft != 0){      F.getFreqs(seq);        }
+                                               cout.flush();
+                                               
+                                               //report progress
+                                               if((i+1) % 100 == 0){   m->mothurOut(toString(i+1)); m->mothurOutEndLine();             }
+                                       }
+               
+                                       //report progress
+                                       if((seqs.size()) % 100 != 0){   m->mothurOut(toString(seqs.size())); m->mothurOutEndLine();             }
+               
+                                       //MPICreateFilter(F, seqs);
                                
                                        //send my fequency counts
                                        F.a.push_back(Atag);
-                                       int ierr = MPI_Send( &F.a[0], alignmentLength, MPI_INT, 0, 2001, MPI_COMM_WORLD);
+                                       int ierr = MPI_Send( &F.a[0], alignmentLength, MPI_INT, 0, tag, MPI_COMM_WORLD);
                                        F.t.push_back(Ttag);
-                                       ierr = MPI_Send( &F.t[0], alignmentLength, MPI_INT, 0, 2001, MPI_COMM_WORLD);
+                                       ierr = MPI_Send( &F.t[0], alignmentLength, MPI_INT, 0, tag, MPI_COMM_WORLD);
                                        F.c.push_back(Ctag);
-                                       ierr = MPI_Send( &F.c[0], alignmentLength, MPI_INT, 0, 2001, MPI_COMM_WORLD);
+                                       ierr = MPI_Send( &F.c[0], alignmentLength, MPI_INT, 0, tag, MPI_COMM_WORLD);
                                        F.g.push_back(Gtag);
-                                       ierr = MPI_Send( &F.g[0], alignmentLength, MPI_INT, 0, 2001, MPI_COMM_WORLD);
+                                       ierr = MPI_Send( &F.g[0], alignmentLength, MPI_INT, 0, tag, MPI_COMM_WORLD);
                                        F.gap.push_back(Gaptag);
-                                       ierr = MPI_Send( &F.gap[0], alignmentLength, MPI_INT, 0, 2001, MPI_COMM_WORLD);
+                                       ierr = MPI_Send( &F.gap[0], alignmentLength, MPI_INT, 0, tag, MPI_COMM_WORLD);
                                        
                                        cout << "child " << pid << " done sending counts" << endl;
                                }
@@ -484,12 +543,9 @@ int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair*
        }
 }
 /**************************************************************************************/
-int FilterSeqsCommand::MPICreateFilter(Filters& F, string temp) {      
+int FilterSeqsCommand::MPICreateFilter(Filters& F, vector<string>& seqStrings) {       
        try {
                
-               vector<string> seqStrings;
-               parseBuffer(temp, seqStrings);
-               
                for(int i=0;i<seqStrings.size();i++){
                                
                        if (m->control_pressed) { return 1; }
@@ -610,18 +666,24 @@ int FilterSeqsCommand::parseBuffer(string file, vector<string>& seqs) {
                
                istringstream iss (file,istringstream::in);
                string name, seqstring;
-               
+int pid;
+MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
+       Sequence* seq34 = new Sequence();       
+cout << "address of new sequence " << pid << '\t' << seq34 << endl;
+cout << "address of seqStrings " << pid << '\t' << &seqs << endl;
+       
                while (iss) {
                        
                        if (m->control_pressed) { return 0; }
                cout << "here" << endl;                 
-                       Sequence seq(iss); 
+                       Sequence* seq = new Sequence(iss); 
        cout << "here1" << endl;                        
                        gobble(iss);
-       cout << seq.getName() << endl;          
-                       if (seq.getName() != "") {
-                               seqs.push_back(seq.getAligned());       
+       cout << seq->getName() << endl;         
+                       if (seq->getName() != "") {
+                               seqs.push_back(seq->getAligned());      
                        }
+                       delete seq;
                }
                
                return 0;