git.donarmstrong.com Git - mothur.git/blobdiff - filterseqscommand.cpp
modified mpi code to save ram by writing out every 10 seqs.
[mothur.git] / filterseqscommand.cpp
index aa38e7affa36e65ada9463813bfce76438c382fe..fa1e93e7c475b9a58cec44ab42af2464a2632c3f 100644 (file)
@@ -295,67 +295,27 @@ string FilterSeqsCommand::createFilter() {
                                                                MPI_Send(&bufferSizes[j], 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how bytes for the read
                                                        }
                                                }
-                                               //cout << "done sending" << endl;
-                                               //cout << "parent = " << pid << " lines = " << lines[pid]->start << '\t' << lines[pid]->numSeqs << " size = " <<  lines.size() << endl; 
-                                                
-                               cout << "parent =  " << pid << " address of Filter " << &F << " address of FilterString  " << &filterString << " address of numSeqs = " << &numSeqs << " address of soft = " << &soft << endl;          
                                
-                                               char* buf = new char(bufferSizes[0]);
-                       //cout << pid << '\t' << bufferSizes[0] << " line 1 start pos = " << lines[1]->start   << " buffer size 0 " << bufferSizes[0] << " buffer size 1 " << bufferSizes[1] << endl;                   
+                                               char buf[bufferSizes[0]];
                                                MPI_File_read_at(in, 0, buf, bufferSizes[0], MPI_CHAR, &status);
-                                               
-               cout << pid << " done reading " << &buf <<  endl;
-                                               string tempBuf = buf;
-                                               delete buf;
-                       //cout << pid << '\t' << (tempBuf.substr(0, 10)) << endl;
-                       
-                                               //parse buffer                                          
-                                               istringstream iss (tempBuf,istringstream::in);
-                                               string name, seqstring;
-                                               vector<string> seqs;
-                                       
-                                               while (iss) {
                        
-                                                       if (m->control_pressed) { return filterString; }
-                                                       cout << "here" << endl;                 
-                                                       Sequence seq(iss); 
-                                                       cout << "here1" << endl;                        
-                                                       gobble(iss);
-                                                       cout << seq.getName() << endl;          
-                                                       if (seq.getName() != "") {
-                                                               seqs.push_back(seq.getAligned());       
-                                                       }
-                                                       
-                                               }
-                                               
-                                               for(int i=0;i<seqs.size();i++){
-                               
-                                                       if (m->control_pressed) { return filterString; }
-                       
-                                                       Sequence seq("", seqs[i]);
-                       
-                                                       if(trump != '*'){       F.doTrump(seq); }
-                                                       if(isTrue(vertical) || soft != 0){      F.getFreqs(seq);        }
-                                                       cout.flush();
-                                               
-                                                       //report progress
-                                                       if((i+1) % 100 == 0){   m->mothurOut(toString(i+1)); m->mothurOutEndLine();             }
-                                               }
-               
-                                               //report progress
-                                               if((seqs.size()) % 100 != 0){   m->mothurOut(toString(seqs.size())); m->mothurOutEndLine();             }
-
-                                               //do your part
-                                               //MPICreateFilter(F, seqs);
-                                               
-                                               vector<int> temp; temp.resize(alignmentLength);
+                                               MPICreateFilter(F, buf);
+                                                                                               
+                                               vector<int> temp; temp.resize(alignmentLength+1);
                                                
                                                //get the frequencies from the child processes
                                                for(int i = 0; i < ((processors-1)*5); i++) { 
                                cout << "i = " << i << endl;
-                                                       int ierr = MPI_Recv(&temp, alignmentLength, MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status); 
-                                                       
+                               //vector<int> trial; trial.resize(10);
+                               //cout << "trials size = " << trial.size() << endl;
+                               //int ierr = MPI_Recv(&trial[0], 10, MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status);
+                                                       int ierr = MPI_Recv(&temp[0], (alignmentLength+1), MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status); 
+                                       cout << "recieved something" << endl;           
+                       //for (int g = 0; g < trial.size(); g++) {  cout << trial[g] << '\t';  } cout << endl;
                                                        int receiveTag = temp[temp.size()-1];  //child process added a int to the end to indicate what letter count this is for
+                                       cout << "reciveve tag = " << receiveTag << endl;
+                                       for (int k = 0; k < alignmentLength; k++) {             cout << k << '\t' << temp[k] << endl;   }
+                                       cout << "done " << endl << endl;
                                                        
                                                        int sender = status.MPI_SOURCE; 
                                                        
@@ -377,72 +337,27 @@ string FilterSeqsCommand::createFilter() {
                                                
                                }else { //i am the child process
                                        int startPos, numLines, bufferSize;
-                               cout << "child = " << pid << " address of Filter " << &F << " address of FilterString  " << &filterString << " address of numSeqs = " << &numSeqs << " address of soft = " << &soft<< endl;     
                                        ierr = MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
                                        ierr = MPI_Recv(&numLines, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
                                        ierr = MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
-                               //cout << "child = " << pid << " done recv messages startpos = " << startPos << " numLines = " << numLines << " buffersize = " << bufferSize << endl;   
-                               
-                                       
+                                                                       
                                        //send freqs
-                                       char* buf2 = new char(bufferSize);
+                                       char buf2[bufferSize];
                                        MPI_File_read_at( in, startPos, buf2, bufferSize, MPI_CHAR, &status);
-                               cout << pid << " done reading " << &buf2 <<  endl;
-                                       
-                                       string tempBuf = buf2;
-                                       delete buf2;
-               //      cout << pid << '\t' << (tempBuf.substr(0, 10)) << endl;
-                                       istringstream iss (tempBuf,istringstream::in);
-                                       
-                                       string name, seqstring;
-                                       vector<string> seqs;
-                                       
-                                       while (iss) {
-                       
-                                               if (m->control_pressed) { return filterString; }
-                                               cout << "here" << endl;                 
-                                               Sequence seq(iss); 
-                                               cout << "here1" << endl;                        
-                                               gobble(iss);
-                                               cout << seq.getName() << endl;  
-                                                       
-                                               if (seq.getName() != "") {
-                                                       seqs.push_back(seq.getAligned());       
-                                               }
-                                       }
-
-                                       for(int i=0;i<seqs.size();i++){
-                               
-                                               if (m->control_pressed) { return filterString; }
-                       
-                                               Sequence seq("", seqs[i]);
-                       
-                                               if(trump != '*'){       F.doTrump(seq); }
-                                               if(isTrue(vertical) || soft != 0){      F.getFreqs(seq);        }
-                                               cout.flush();
                                                
-                                               //report progress
-                                               if((i+1) % 100 == 0){   m->mothurOut(toString(i+1)); m->mothurOutEndLine();             }
-                                       }
-               
-                                       //report progress
-                                       if((seqs.size()) % 100 != 0){   m->mothurOut(toString(seqs.size())); m->mothurOutEndLine();             }
-               
-                                       //MPICreateFilter(F, seqs);
-                               
+                                       MPICreateFilter(F, buf2);
+                                       
                                        //send my fequency counts
                                        F.a.push_back(Atag);
-                                       int ierr = MPI_Send( &F.a[0], alignmentLength, MPI_INT, 0, tag, MPI_COMM_WORLD);
+                                       int ierr = MPI_Send(&(F.a[0]), (alignmentLength+1), MPI_INT, 0, tag, MPI_COMM_WORLD);
                                        F.t.push_back(Ttag);
-                                       ierr = MPI_Send( &F.t[0], alignmentLength, MPI_INT, 0, tag, MPI_COMM_WORLD);
+                                       ierr = MPI_Send (&(F.t[0]), (alignmentLength+1), MPI_INT, 0, tag, MPI_COMM_WORLD);
                                        F.c.push_back(Ctag);
-                                       ierr = MPI_Send( &F.c[0], alignmentLength, MPI_INT, 0, tag, MPI_COMM_WORLD);
+                                       ierr = MPI_Send(&(F.c[0]), (alignmentLength+1), MPI_INT, 0, tag, MPI_COMM_WORLD);
                                        F.g.push_back(Gtag);
-                                       ierr = MPI_Send( &F.g[0], alignmentLength, MPI_INT, 0, tag, MPI_COMM_WORLD);
+                                       ierr = MPI_Send(&(F.g[0]), (alignmentLength+1), MPI_INT, 0, tag, MPI_COMM_WORLD);
                                        F.gap.push_back(Gaptag);
-                                       ierr = MPI_Send( &F.gap[0], alignmentLength, MPI_INT, 0, tag, MPI_COMM_WORLD);
-                                       
-                                       cout << "child " << pid << " done sending counts" << endl;
+                                       ierr = MPI_Send(&(F.gap[0]), (alignmentLength+1), MPI_INT, 0, tag, MPI_COMM_WORLD);
                                }
                                
                                MPI_Barrier(MPI_COMM_WORLD);
@@ -482,16 +397,8 @@ string FilterSeqsCommand::createFilter() {
                        }
                }
 
-#ifdef USE_MPI
-
-//merge all frequency data and create filter string
-                                       //int pid;
-                                       //MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
-                                       
-                                       //if (pid == 0) { //only one process should output to screen
-#endif
 
-       cout << "made it here" << endl; 
+       cout << "made it here, numSeqs = " << numSeqs << endl;  
                F.setNumSeqs(numSeqs);
                                
                if(isTrue(vertical) == 1)       {       F.doVertical(); }
@@ -543,14 +450,17 @@ int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair*
        }
 }
 /**************************************************************************************/
-int FilterSeqsCommand::MPICreateFilter(Filters& F, vector<string>& seqStrings) {       
+int FilterSeqsCommand::MPICreateFilter(Filters& F, string input) {     
        try {
                
+               vector<string> seqStrings;
+               parseBuffer(input, seqStrings);
+               
                for(int i=0;i<seqStrings.size();i++){
                                
                        if (m->control_pressed) { return 1; }
                        
-                       Sequence seq("", seqStrings[0]);
+                       Sequence seq("", seqStrings[i]);
                        
                        if(trump != '*'){       F.doTrump(seq); }
                        if(isTrue(vertical) || soft != 0){      F.getFreqs(seq);        }
@@ -666,24 +576,16 @@ int FilterSeqsCommand::parseBuffer(string file, vector<string>& seqs) {
                
                istringstream iss (file,istringstream::in);
                string name, seqstring;
-int pid;
-MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
-       Sequence* seq34 = new Sequence();       
-cout << "address of new sequence " << pid << '\t' << seq34 << endl;
-cout << "address of seqStrings " << pid << '\t' << &seqs << endl;
        
                while (iss) {
                        
                        if (m->control_pressed) { return 0; }
-               cout << "here" << endl;                 
-                       Sequence* seq = new Sequence(iss); 
-       cout << "here1" << endl;                        
-                       gobble(iss);
-       cout << seq->getName() << endl;         
-                       if (seq->getName() != "") {
-                               seqs.push_back(seq->getAligned());      
+                               
+                       Sequence seq(iss); gobble(iss);
+       
+                       if (seq.getName() != "") {
+                               seqs.push_back(seq.getAligned());       
                        }
-                       delete seq;
                }
                
                return 0;