]> git.donarmstrong.com Git - mothur.git/blobdiff - filterseqscommand.cpp
changed how we break up the files on parallelized commands to avoid scanning file.
[mothur.git] / filterseqscommand.cpp
index e6e616a3a5d3f4b7a81ef1ab91a974a9c993074c..1ebac0eba5a7e7fc54d85b24dc705c0f8a8314b5 100644 (file)
@@ -338,22 +338,18 @@ int FilterSeqsCommand::filterSequences() {
                                MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case
                                
 #else
+                       vector<unsigned long int> positions = divideFile(fastafileNames[s], processors);
+                               
+                       for (int i = 0; i < (positions.size()-1); i++) {
+                               lines.push_back(new linePair(positions[i], positions[(i+1)]));
+                       }       
                #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
                                if(processors == 1){
-                                       ifstream inFASTA;
-                                       int numFastaSeqs;
-                                       openInputFile(fastafileNames[s], inFASTA);
-                                       getNumSeqs(inFASTA, numFastaSeqs);
-                                       inFASTA.close();
-                                       
-                                       lines.push_back(new linePair(0, numFastaSeqs));
-                                       
+                                       int numFastaSeqs = driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
                                        numSeqs += numFastaSeqs;
-                                       
-                                       driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
                                }else{
-                                       setLines(fastafileNames[s]);
-                                       createProcessesRunFilter(filter, fastafileNames[s]); 
+                                       int numFastaSeqs = createProcessesRunFilter(filter, fastafileNames[s]); 
+                                       numSeqs += numFastaSeqs;
                                
                                        rename((fastafileNames[s] + toString(processIDS[0]) + ".temp").c_str(), filteredFasta.c_str());
                                
@@ -366,17 +362,8 @@ int FilterSeqsCommand::filterSequences() {
                                
                                if (m->control_pressed) {  return 1; }
                #else
-                               ifstream inFASTA;
-                               int numFastaSeqs;
-                               openInputFile(fastafileNames[s], inFASTA);
-                               getNumSeqs(inFASTA, numFastaSeqs);
-                               inFASTA.close();
-                                       
-                               lines.push_back(new linePair(0, numFastaSeqs));
-                               
+                               numFastaSeqs = driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
                                numSeqs += numFastaSeqs;
-                               
-                               driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
 
                                if (m->control_pressed) {  return 1; }
                #endif
@@ -466,7 +453,7 @@ int FilterSeqsCommand::driverMPIRun(int start, int num, MPI_File& inMPI, MPI_Fil
 }
 #endif
 /**************************************************************************************/
-int FilterSeqsCommand::driverRunFilter(string F, string outputFilename, string inputFilename, linePair* line) {        
+int FilterSeqsCommand::driverRunFilter(string F, string outputFilename, string inputFilename, linePair* filePos) {     
        try {
                ofstream out;
                openOutputFile(outputFilename, out);
@@ -474,13 +461,16 @@ int FilterSeqsCommand::driverRunFilter(string F, string outputFilename, string i
                ifstream in;
                openInputFile(inputFilename, in);
                                
-               in.seekg(line->start);
-               
-               for(int i=0;i<line->num;i++){
+               in.seekg(filePos->start);
+
+               bool done = false;
+               int count = 0;
+       
+               while (!done) {
                                
                                if (m->control_pressed) { in.close(); out.close(); return 0; }
                                
-                               Sequence seq(in);
+                               Sequence seq(in); gobble(in);
                                if (seq.getName() != "") {
                                        string align = seq.getAligned();
                                        string filterSeq = "";
@@ -492,20 +482,23 @@ int FilterSeqsCommand::driverRunFilter(string F, string outputFilename, string i
                                        }
                                        
                                        out << '>' << seq.getName() << endl << filterSeq << endl;
-                               }
-                               gobble(in);
-                               
+                               count++;
+                       }
+                       
+                       unsigned long int pos = in.tellg();
+                       if ((pos == -1) || (pos >= filePos->end)) { break; }
+                       
                        //report progress
-                       if((i+1) % 100 == 0){   m->mothurOut(toString(i+1)); m->mothurOutEndLine();             }
+                       if((count) % 100 == 0){ m->mothurOut(toString(count)); m->mothurOutEndLine();           }
                }
-               
                //report progress
-               if((line->num) % 100 != 0){     m->mothurOut(toString(line->num)); m->mothurOutEndLine();               }
+               if((count) % 100 != 0){ m->mothurOut(toString(count)); m->mothurOutEndLine();           }
+               
                
                out.close();
                in.close();
                
-               return 0;
+               return count;
        }
        catch(exception& e) {
                m->errorOut(e, "FilterSeqsCommand", "driverRunFilter");
@@ -518,7 +511,7 @@ int FilterSeqsCommand::createProcessesRunFilter(string F, string filename) {
        try {
 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
                int process = 0;
-               int exitCommand = 1;
+               int num = 0;
                processIDS.clear();
                
                //loop through and create all the processes you want
@@ -530,7 +523,15 @@ int FilterSeqsCommand::createProcessesRunFilter(string F, string filename) {
                                process++;
                        }else if (pid == 0){
                                string filteredFasta = filename + toString(getpid()) + ".temp";
-                               driverRunFilter(F, filteredFasta, filename, lines[process]);
+                               num = driverRunFilter(F, filteredFasta, filename, lines[process]);
+                               
+                               //pass numSeqs to parent
+                               ofstream out;
+                               string tempFile = toString(getpid()) + ".temp";
+                               openOutputFile(tempFile, out);
+                               out << num << endl;
+                               out.close();
+                               
                                exit(0);
                        }else { m->mothurOut("unable to spawn the necessary processes."); m->mothurOutEndLine(); exit(0); }
                }
@@ -539,9 +540,18 @@ int FilterSeqsCommand::createProcessesRunFilter(string F, string filename) {
                for (int i=0;i<processors;i++) { 
                        int temp = processIDS[i];
                        wait(&temp);
+               }       
+                                       
+               for (int i = 0; i < processIDS.size(); i++) {
+                       ifstream in;
+                       string tempFile =  toString(processIDS[i]) + ".temp";
+                       openInputFile(tempFile, in);
+                       if (!in.eof()) { int tempNum = 0; in >> tempNum; num += tempNum; }
+                       in.close(); remove(tempFile.c_str());
                }
+
                
-               return exitCommand;
+               return num;
 #endif         
        }
        catch(exception& e) {
@@ -636,37 +646,24 @@ string FilterSeqsCommand::createFilter() {
                                MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case
                                
 #else
+               vector<unsigned long int> positions = divideFile(fastafileNames[s], processors);
+                               
+               for (int i = 0; i < (positions.size()-1); i++) {
+                       lines.push_back(new linePair(positions[i], positions[(i+1)]));
+               }       
                #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
                                if(processors == 1){
-                                       ifstream inFASTA;
-                                       int numFastaSeqs;
-                                       openInputFile(fastafileNames[s], inFASTA);
-                                       getNumSeqs(inFASTA, numFastaSeqs);
-                                       inFASTA.close();
-                                       
+                                       int numFastaSeqs = driverCreateFilter(F, fastafileNames[s], lines[0]);
                                        numSeqs += numFastaSeqs;
-                                       
-                                       lines.push_back(new linePair(0, numFastaSeqs));
-                                       
-                                       driverCreateFilter(F, fastafileNames[s], lines[0]);
                                }else{
-                                       setLines(fastafileNames[s]);                                    
-                                       createProcessesCreateFilter(F, fastafileNames[s]); 
+                                       int numFastaSeqs = createProcessesCreateFilter(F, fastafileNames[s]); 
+                                       numSeqs += numFastaSeqs;
                                }
                                
                                if (m->control_pressed) {  return filterString; }
                #else
-                               ifstream inFASTA;
-                               int numFastaSeqs;
-                               openInputFile(fastafileNames[s], inFASTA);
-                               getNumSeqs(inFASTA, numFastaSeqs);
-                               inFASTA.close();
-                               
+                               numFastaSeqs = driverCreateFilter(F, fastafileNames[s], lines[0]);
                                numSeqs += numFastaSeqs;
-                               
-                               lines.push_back(new linePair(0, numFastaSeqs));
-                               
-                               driverCreateFilter(F, fastafileNames[s], lines[0]);
                                if (m->control_pressed) {  return filterString; }
                #endif
 #endif
@@ -765,37 +762,42 @@ string FilterSeqsCommand::createFilter() {
        }
 }
 /**************************************************************************************/
-int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair* line) {       
+int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair* filePos) {    
        try {
                
                ifstream in;
                openInputFile(filename, in);
                                
-               in.seekg(line->start);
-               
-               for(int i=0;i<line->num;i++){
+               in.seekg(filePos->start);
+
+               bool done = false;
+               int count = 0;
+       
+               while (!done) {
                                
                        if (m->control_pressed) { in.close(); return 1; }
                                        
-                       Sequence seq(in);
+                       Sequence seq(in); gobble(in);
                        if (seq.getName() != "") {
                                        if (seq.getAligned().length() != alignmentLength) { m->mothurOut("Sequences are not all the same length, please correct."); m->mothurOutEndLine(); m->control_pressed = true;  }
                                        
                                        if(trump != '*'){       F.doTrump(seq); }
                                        if(isTrue(vertical) || soft != 0){      F.getFreqs(seq);        }
                                        cout.flush();
+                                       count++;
                        }
                        
+                       unsigned long int pos = in.tellg();
+                       if ((pos == -1) || (pos >= filePos->end)) { break; }
+                       
                        //report progress
-                       if((i+1) % 100 == 0){   m->mothurOut(toString(i+1)); m->mothurOutEndLine();             }
+                       if((count) % 100 == 0){ m->mothurOut(toString(count)); m->mothurOutEndLine();           }
                }
-               
                //report progress
-               if((line->num) % 100 != 0){     m->mothurOut(toString(line->num)); m->mothurOutEndLine();               }
-               
+               if((count) % 100 != 0){ m->mothurOut(toString(count)); m->mothurOutEndLine();           }
                in.close();
                
-               return 0;
+               return count;
        }
        catch(exception& e) {
                m->errorOut(e, "FilterSeqsCommand", "driverCreateFilter");
@@ -855,7 +857,7 @@ int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename)
        try {
 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
                int process = 0;
-               int exitCommand = 1;
+               int num = 0;
                processIDS.clear();
                
                //loop through and create all the processes you want
@@ -866,13 +868,14 @@ int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename)
                                processIDS.push_back(pid);  //create map from line number to pid so you can append files in correct order later
                                process++;
                        }else if (pid == 0){
-                               driverCreateFilter(F, filename, lines[process]);
+                               num = driverCreateFilter(F, filename, lines[process]);
                                
                                //write out filter counts to file
                                filename += toString(getpid()) + "filterValues.temp";
                                ofstream out;
                                openOutputFile(filename, out);
                                
+                               out << num << endl;
                                for (int k = 0; k < alignmentLength; k++) {             out << F.a[k] << '\t'; }  out << endl;
                                for (int k = 0; k < alignmentLength; k++) {             out << F.t[k] << '\t'; }  out << endl;
                                for (int k = 0; k < alignmentLength; k++) {             out << F.g[k] << '\t'; }  out << endl;
@@ -897,7 +900,8 @@ int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename)
                        ifstream in;
                        openInputFile(tempFilename, in);
                        
-                       int temp;
+                       int temp, tempNum;
+                       in >> tempNum; gobble(in); num += tempNum;
                        for (int k = 0; k < alignmentLength; k++) {             in >> temp; F.a[k] += temp; }           gobble(in);
                        for (int k = 0; k < alignmentLength; k++) {             in >> temp; F.t[k] += temp; }           gobble(in);
                        for (int k = 0; k < alignmentLength; k++) {             in >> temp; F.g[k] += temp; }           gobble(in);
@@ -908,7 +912,7 @@ int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename)
                        remove(tempFilename.c_str());
                }
                
-               return exitCommand;
+               return num;
 #endif         
        }
        catch(exception& e) {
@@ -916,63 +920,4 @@ int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename)
                exit(1);
        }
 }
-/**************************************************************************************************/
-
-int FilterSeqsCommand::setLines(string filename) {
-       try {
-               
-               vector<unsigned long int> positions;
-               bufferSizes.clear();
-               
-               ifstream inFASTA;
-               openInputFile(filename, inFASTA);
-                       
-               string input;
-               while(!inFASTA.eof()){  
-                       input = getline(inFASTA);
-
-                       if (input.length() != 0) {
-                               if(input[0] == '>'){ unsigned long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1);    }
-                       }
-               }
-               inFASTA.close();
-               
-               int numFastaSeqs = positions.size();
-       
-               FILE * pFile;
-               unsigned long int size;
-               
-               //get num bytes in file
-               pFile = fopen (filename.c_str(),"rb");
-               if (pFile==NULL) perror ("Error opening file");
-               else{
-                       fseek (pFile, 0, SEEK_END);
-                       size=ftell (pFile);
-                       fclose (pFile);
-               }
-       
-               numSeqs += numFastaSeqs;
-               
-               int numSeqsPerProcessor = numFastaSeqs / processors;
-               
-               for (int i = 0; i < processors; i++) {
-
-                       unsigned long int startPos = positions[ i * numSeqsPerProcessor ];
-                       if(i == processors - 1){
-                               numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor;
-                               bufferSizes.push_back(size - startPos);
-                       }else{  
-                               unsigned long int myEnd = positions[ (i+1) * numSeqsPerProcessor ];
-                               bufferSizes.push_back(myEnd-startPos);
-                       }
-                       lines.push_back(new linePair(startPos, numSeqsPerProcessor));
-               }
-               
-               return 0;
-       }
-       catch(exception& e) {
-               m->errorOut(e, "FilterSeqsCommand", "setLines");
-               exit(1);
-       }
-}
 /**************************************************************************************/