]> git.donarmstrong.com Git - mothur.git/blobdiff - seqsummarycommand.cpp
moved utilities out of mothur.h and into mothurOut class.
[mothur.git] / seqsummarycommand.cpp
index f551eddacd00dade4aff87526f0b5aca59ad2b58..5dbcdec9cb90f2082912017064f876fd6210e464 100644 (file)
@@ -27,7 +27,7 @@ SeqSummaryCommand::SeqSummaryCommand(string option)  {
                        OptionParser parser(option);
                        map<string,string> parameters = parser.getParameters();
                        
-                       ValidParameters validParameter;
+                       ValidParameters validParameter("summary.seqs");
                        map<string,string>::iterator it;
                        
                        //check to make sure all parameters are valid for command
@@ -43,7 +43,7 @@ SeqSummaryCommand::SeqSummaryCommand(string option)  {
                                it = parameters.find("fasta");
                                //user has given a template file
                                if(it != parameters.end()){ 
-                                       path = hasPath(it->second);
+                                       path = m->hasPath(it->second);
                                        //if the user has not given a path then, add inputdir. else leave path alone.
                                        if (path == "") {       parameters["fasta"] = inputDir + it->second;            }
                                }
@@ -57,7 +57,7 @@ SeqSummaryCommand::SeqSummaryCommand(string option)  {
                        //if the user changes the output directory command factory will send this info to us in the output parameter 
                        outputDir = validParameter.validFile(parameters, "outputdir", false);           if (outputDir == "not found"){  
                                outputDir = ""; 
-                               outputDir += hasPath(fastafile); //if user entered a file with a path then preserve it  
+                               outputDir += m->hasPath(fastafile); //if user entered a file with a path then preserve it       
                        }
                        
                        string temp = validParameter.validFile(parameters, "processors", false);        if (temp == "not found"){       temp = "1";                             }
@@ -98,7 +98,7 @@ int SeqSummaryCommand::execute(){
                
                if (abort == true) { return 0; }
                
-               string summaryFile = outputDir + getSimpleName(fastafile) + ".summary";
+               string summaryFile = outputDir + m->getSimpleName(fastafile) + ".summary";
                                
                int numSeqs = 0;
                
@@ -113,7 +113,7 @@ int SeqSummaryCommand::execute(){
                                int tag = 2001;
                                int startTag = 1; int endTag = 2; int lengthTag = 3; int baseTag = 4; int lhomoTag = 5;
                                int outMode=MPI_MODE_CREATE|MPI_MODE_WRONLY; 
-                               vector<long> MPIPos;
+                               vector<unsigned long int> MPIPos;
                                
                                MPI_Status status; 
                                MPI_Status statusOut;
@@ -143,12 +143,13 @@ int SeqSummaryCommand::execute(){
                                                MPI_File_write_shared(outMPI, buf2, length, MPI_CHAR, &statusOut);
                                                delete buf2;
                                                
-                                               MPIPos = setFilePosFasta(fastafile, numSeqs); //fills MPIPos, returns numSeqs
+                                               MPIPos = m->setFilePosFasta(fastafile, numSeqs); //fills MPIPos, returns numSeqs
+                                       
+                                               for(int i = 1; i < processors; i++) { 
+                                                       MPI_Send(&numSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD);
+                                                       MPI_Send(&MPIPos[0], (numSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD);
+                                               }
                                                
-                                               //send file positions to all processes
-                                               MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD);  //send numSeqs
-                                               MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos        
-                                                               
                                                //figure out how many sequences you have to do
                                                numSeqsPerProcessor = numSeqs / processors;
                                                int startIndex =  pid * numSeqsPerProcessor;
@@ -156,54 +157,55 @@ int SeqSummaryCommand::execute(){
                                                
                                                //do your part
                                                MPICreateSummary(startIndex, numSeqsPerProcessor, startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, inMPI, outMPI, MPIPos);
-                                       
-                                               if (m->control_pressed) {  MPI_File_close(&inMPI); MPI_File_close(&outMPI);  return 0;  }
-                                               
-                                               //get the info from the child processes
-                                               for(int i = 1; i < processors; i++) { 
-                                                       
-                                                       int size;
-                                                       MPI_Recv(&size, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status);
-
-                                                       vector<int> temp; temp.resize(size+1);
-                                                       
-                                                       for(int j = 0; j < 5; j++) { 
-                                                       
-                                                               MPI_Recv(&temp[0], (size+1), MPI_INT, i, 2001, MPI_COMM_WORLD, &status); 
-                                                               int receiveTag = temp[temp.size()-1];  //child process added a int to the end to indicate what count this is for
-                                                               
-                                                               if (receiveTag == startTag) { 
-                                                                       for (int k = 0; k < size; k++) {                startPosition.push_back(temp[k]);       }
-                                                               }else if (receiveTag == endTag) { 
-                                                                       for (int k = 0; k < size; k++) {                endPosition.push_back(temp[k]); }
-                                                               }else if (receiveTag == lengthTag) { 
-                                                                       for (int k = 0; k < size; k++) {                seqLength.push_back(temp[k]);   }
-                                                               }else if (receiveTag == baseTag) { 
-                                                                       for (int k = 0; k < size; k++) {                ambigBases.push_back(temp[k]);  }
-                                                               }else if (receiveTag == lhomoTag) { 
-                                                                       for (int k = 0; k < size; k++) {                longHomoPolymer.push_back(temp[k]);     }
-                                                               }
-                                                       } 
                                                
-                                               }
-
-                                                                                               
                                }else { //i am the child process
                        
-                                       MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs
+                                       MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
                                        MPIPos.resize(numSeqs+1);
-                                       MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions
-                                       
+                                       MPI_Recv(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status);
+                               
                                        //figure out how many sequences you have to align
                                        numSeqsPerProcessor = numSeqs / processors;
                                        int startIndex =  pid * numSeqsPerProcessor;
                                        if(pid == (processors - 1)){    numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor;      }
-                                       
+                               
                                        //do your part
                                        MPICreateSummary(startIndex, numSeqsPerProcessor, startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, inMPI, outMPI, MPIPos);
-                                       
-                                       if (m->control_pressed) {  MPI_File_close(&inMPI);  MPI_File_close(&outMPI); return 0;  }
-                                       
+                               }
+                               
+                               MPI_File_close(&inMPI);
+                               MPI_File_close(&outMPI);
+                               MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case
+                               
+                               if (pid == 0) {
+                                       //get the info from the child processes
+                                       for(int i = 1; i < processors; i++) { 
+                                               int size;
+                                               MPI_Recv(&size, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status);
+
+                                               vector<int> temp; temp.resize(size+1);
+                                               
+                                               for(int j = 0; j < 5; j++) { 
+                                               
+                                                       MPI_Recv(&temp[0], (size+1), MPI_INT, i, 2001, MPI_COMM_WORLD, &status); 
+                                                       int receiveTag = temp[temp.size()-1];  //child process added a int to the end to indicate what count this is for
+                                                       
+                                                       if (receiveTag == startTag) { 
+                                                               for (int k = 0; k < size; k++) {                startPosition.push_back(temp[k]);       }
+                                                       }else if (receiveTag == endTag) { 
+                                                               for (int k = 0; k < size; k++) {                endPosition.push_back(temp[k]); }
+                                                       }else if (receiveTag == lengthTag) { 
+                                                               for (int k = 0; k < size; k++) {                seqLength.push_back(temp[k]);   }
+                                                       }else if (receiveTag == baseTag) { 
+                                                               for (int k = 0; k < size; k++) {                ambigBases.push_back(temp[k]);  }
+                                                       }else if (receiveTag == lhomoTag) { 
+                                                               for (int k = 0; k < size; k++) {                longHomoPolymer.push_back(temp[k]);     }
+                                                       }
+                                               } 
+                                       }
+
+                               }else{
+                               
                                        //send my counts
                                        int size = startPosition.size();
                                        MPI_Send(&size, 1, MPI_INT, 0, tag, MPI_COMM_WORLD);
@@ -218,45 +220,33 @@ int SeqSummaryCommand::execute(){
                                        ierr = MPI_Send(&(ambigBases[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD);
                                        longHomoPolymer.push_back(lhomoTag);
                                        ierr = MPI_Send(&(longHomoPolymer[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD);
-
                                }
                                
-                               MPI_File_close(&inMPI);
-                               MPI_File_close(&outMPI);
-                               
+                               MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case
 #else
+                       vector<unsigned long int> positions = m->divideFile(fastafile, processors);
+                               
+                       for (int i = 0; i < (positions.size()-1); i++) {
+                               lines.push_back(new linePair(positions[i], positions[(i+1)]));
+                       }       
+
                #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
                                if(processors == 1){
-                                       ifstream inFASTA;
-                                       openInputFile(fastafile, inFASTA);
-                                       numSeqs=count(istreambuf_iterator<char>(inFASTA),istreambuf_iterator<char>(), '>');
-                                       inFASTA.close();
-                                       
-                                       lines.push_back(new linePair(0, numSeqs));
-                                       
-                                       driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile, lines[0]);
+                                       numSeqs = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile, lines[0]);
                                }else{
-                                       numSeqs = setLines(fastafile);                                  
-                                       createProcessesCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile); 
+                                       numSeqs = createProcessesCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile); 
                                        
                                        rename((summaryFile + toString(processIDS[0]) + ".temp").c_str(), summaryFile.c_str());
                                        //append files
                                        for(int i=1;i<processors;i++){
-                                               appendFiles((summaryFile + toString(processIDS[i]) + ".temp"), summaryFile);
+                                               m->appendFiles((summaryFile + toString(processIDS[i]) + ".temp"), summaryFile);
                                                remove((summaryFile + toString(processIDS[i]) + ".temp").c_str());
                                        }
                                }
                                
                                if (m->control_pressed) {  return 0; }
                #else
-                               ifstream inFASTA;
-                               openInputFile(fastafile, inFASTA);
-                               numSeqs=count(istreambuf_iterator<char>(inFASTA),istreambuf_iterator<char>(), '>');
-                               inFASTA.close();
-                               
-                               lines.push_back(new linePair(0, numSeqs));
-                               
-                               driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile, lines[0]);
+                               numSeqs = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile, lines[0]);
                                if (m->control_pressed) {  return 0; }
                #endif
 #endif
@@ -314,27 +304,31 @@ int SeqSummaryCommand::execute(){
        }
 }
 /**************************************************************************************/
-int SeqSummaryCommand::driverCreateSummary(vector<int>& startPosition, vector<int>& endPosition, vector<int>& seqLength, vector<int>& ambigBases, vector<int>& longHomoPolymer, string filename, string sumFile, linePair* line) {     
+int SeqSummaryCommand::driverCreateSummary(vector<int>& startPosition, vector<int>& endPosition, vector<int>& seqLength, vector<int>& ambigBases, vector<int>& longHomoPolymer, string filename, string sumFile, linePair* filePos) {  
        try {
                
                ofstream outSummary;
-               openOutputFile(sumFile, outSummary);
+               m->openOutputFile(sumFile, outSummary);
                
                //print header if you are process 0
-               if (line->start == 0) {
+               if (filePos->start == 0) {
                        outSummary << "seqname\tstart\tend\tnbases\tambigs\tpolymer" << endl;   
                }
                                
                ifstream in;
-               openInputFile(filename, in);
+               m->openInputFile(filename, in);
                                
-               in.seekg(line->start);
-               
-               for(int i=0;i<line->num;i++){
+               in.seekg(filePos->start);
+
+               bool done = false;
+               int count = 0;
+       
+               while (!done) {
                                
                        if (m->control_pressed) { in.close(); outSummary.close(); return 1; }
                                        
-                       Sequence current(in);
+                       Sequence current(in); m->gobble(in);
+       
                        if (current.getName() != "") {
                                startPosition.push_back(current.getStartPos());
                                endPosition.push_back(current.getEndPos());
@@ -346,12 +340,21 @@ int SeqSummaryCommand::driverCreateSummary(vector<int>& startPosition, vector<in
                                outSummary << current.getStartPos() << '\t' << current.getEndPos() << '\t';
                                outSummary << current.getNumBases() << '\t' << current.getAmbigBases() << '\t';
                                outSummary << current.getLongHomoPolymer() << endl;
+                               count++;
                        }
-                       gobble(in);
+                       
+                       unsigned long int pos = in.tellg();
+                       if ((pos == -1) || (pos >= filePos->end)) { break; }
+                       
+                       //report progress
+                       if((count) % 100 == 0){ m->mothurOut(toString(count)); m->mothurOutEndLine();           }
                }
+               //report progress
+               if((count) % 100 != 0){ m->mothurOut(toString(count)); m->mothurOutEndLine();           }
+               
                in.close();
                
-               return 0;
+               return count;
        }
        catch(exception& e) {
                m->errorOut(e, "SeqSummaryCommand", "driverCreateSummary");
@@ -360,10 +363,13 @@ int SeqSummaryCommand::driverCreateSummary(vector<int>& startPosition, vector<in
 }
 #ifdef USE_MPI
 /**************************************************************************************/
-int SeqSummaryCommand::MPICreateSummary(int start, int num, vector<int>& startPosition, vector<int>& endPosition, vector<int>& seqLength, vector<int>& ambigBases, vector<int>& longHomoPolymer, MPI_File& inMPI, MPI_File& outMPI, vector<long>& MPIPos) {    
+int SeqSummaryCommand::MPICreateSummary(int start, int num, vector<int>& startPosition, vector<int>& endPosition, vector<int>& seqLength, vector<int>& ambigBases, vector<int>& longHomoPolymer, MPI_File& inMPI, MPI_File& outMPI, vector<unsigned long int>& MPIPos) {       
        try {
                
+               int pid;
                MPI_Status status; 
+               MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
+
                
                for(int i=0;i<num;i++){
                        
@@ -399,7 +405,7 @@ int SeqSummaryCommand::MPICreateSummary(int start, int num, vector<int>& startPo
                                        
                                MPI_File_write_shared(outMPI, buf3, length, MPI_CHAR, &status);
                                delete buf3;
-                       }                                               
+                       }       
                }
                
                return 0;
@@ -415,18 +421,33 @@ int SeqSummaryCommand::createProcessesCreateSummary(vector<int>& startPosition,
        try {
 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
                int process = 0;
-               int exitCommand = 1;
+               int num = 0;
                processIDS.clear();
                
                //loop through and create all the processes you want
                while (process != processors) {
-                       int pid = vfork();
+                       int pid = fork();
                        
                        if (pid > 0) {
                                processIDS.push_back(pid);  //create map from line number to pid so you can append files in correct order later
                                process++;
                        }else if (pid == 0){
-                               driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, sumFile + toString(getpid()) + ".temp", lines[process]);
+                               num = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, sumFile + toString(getpid()) + ".temp", lines[process]);
+                               
+                               //pass numSeqs to parent
+                               ofstream out;
+                               string tempFile = fastafile + toString(getpid()) + ".num.temp";
+                               m->openOutputFile(tempFile, out);
+                               
+                               out << num << endl;
+                               for (int k = 0; k < startPosition.size(); k++)          {               out << startPosition[k] << '\t'; }  out << endl;
+                               for (int k = 0; k < endPosition.size(); k++)            {               out << endPosition[k] << '\t'; }  out << endl;
+                               for (int k = 0; k < seqLength.size(); k++)                      {               out << seqLength[k] << '\t'; }  out << endl;
+                               for (int k = 0; k < ambigBases.size(); k++)                     {               out << ambigBases[k] << '\t'; }  out << endl;
+                               for (int k = 0; k < longHomoPolymer.size(); k++)        {               out << longHomoPolymer[k] << '\t'; }  out << endl;
+                               
+                               out.close();
+                               
                                exit(0);
                        }else { m->mothurOut("unable to spawn the necessary processes."); m->mothurOutEndLine(); exit(0); }
                }
@@ -437,65 +458,29 @@ int SeqSummaryCommand::createProcessesCreateSummary(vector<int>& startPosition,
                        wait(&temp);
                }
                
-               return exitCommand;
-#endif         
-       }
-       catch(exception& e) {
-               m->errorOut(e, "SeqSummaryCommand", "createProcessesCreateSummary");
-               exit(1);
-       }
-}
-/**************************************************************************************************/
-
-int SeqSummaryCommand::setLines(string filename) {
-       try {
-               
-               vector<long int> positions;
-               
-               ifstream inFASTA;
-               openInputFile(filename, inFASTA);
+               //parent reads in and combine Filter info
+               for (int i = 0; i < processIDS.size(); i++) {
+                       string tempFilename = fastafile + toString(processIDS[i]) + ".num.temp";
+                       ifstream in;
+                       m->openInputFile(tempFilename, in);
                        
-               string input;
-               while(!inFASTA.eof()){  
-                       input = getline(inFASTA);
-
-                       if (input.length() != 0) {
-                               if(input[0] == '>'){ long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1);     }
-                       }
-               }
-               inFASTA.close();
-               
-               int numFastaSeqs = positions.size();
-       
-               FILE * pFile;
-               long size;
-               
-               //get num bytes in file
-               pFile = fopen (filename.c_str(),"rb");
-               if (pFile==NULL) perror ("Error opening file");
-               else{
-                       fseek (pFile, 0, SEEK_END);
-                       size=ftell (pFile);
-                       fclose (pFile);
-               }
-               
-               int numSeqsPerProcessor = numFastaSeqs / processors;
-               
-               for (int i = 0; i < processors; i++) {
-
-                       long int startPos = positions[ i * numSeqsPerProcessor ];
-                       if(i == processors - 1){
-                               numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor;
-                       }else{  
-                               long int myEnd = positions[ (i+1) * numSeqsPerProcessor ];
-                       }
-                       lines.push_back(new linePair(startPos, numSeqsPerProcessor));
+                       int temp, tempNum;
+                       in >> tempNum; m->gobble(in); num += tempNum;
+                       for (int k = 0; k < tempNum; k++)                       {               in >> temp; startPosition.push_back(temp);              }               m->gobble(in);
+                       for (int k = 0; k < tempNum; k++)                       {               in >> temp; endPosition.push_back(temp);                }               m->gobble(in);
+                       for (int k = 0; k < tempNum; k++)                       {               in >> temp; seqLength.push_back(temp);                  }               m->gobble(in);
+                       for (int k = 0; k < tempNum; k++)                       {               in >> temp; ambigBases.push_back(temp);                 }               m->gobble(in);
+                       for (int k = 0; k < tempNum; k++)                       {               in >> temp; longHomoPolymer.push_back(temp);    }               m->gobble(in);
+                               
+                       in.close();
+                       remove(tempFilename.c_str());
                }
                
-               return numFastaSeqs;
+               return num;
+#endif         
        }
        catch(exception& e) {
-               m->errorOut(e, "SeqSummaryCommand", "setLines");
+               m->errorOut(e, "SeqSummaryCommand", "createProcessesCreateSummary");
                exit(1);
        }
 }