X-Git-Url: https://git.donarmstrong.com/?a=blobdiff_plain;f=seqsummarycommand.cpp;h=7362c1b6cfe9e8b11c03ff4c69d3e32b83531942;hb=6e81846c8e5b2614f6b06643a9f558fb0e6669fa;hp=49edc36f8e7c3d94d32b7995f08c14e955cd232a;hpb=74844a60d80c6dd06e3fb02ee9b928424f9019b0;p=mothur.git diff --git a/seqsummarycommand.cpp b/seqsummarycommand.cpp index 49edc36..7362c1b 100644 --- a/seqsummarycommand.cpp +++ b/seqsummarycommand.cpp @@ -21,13 +21,13 @@ SeqSummaryCommand::SeqSummaryCommand(string option) { else { //valid paramters for this command - string Array[] = {"fasta","outputdir","inputdir"}; + string Array[] = {"fasta","processors","outputdir","inputdir"}; vector myArray (Array, Array+(sizeof(Array)/sizeof(string))); OptionParser parser(option); map parameters = parser.getParameters(); - ValidParameters validParameter; + ValidParameters validParameter("summary.seqs"); map::iterator it; //check to make sure all parameters are valid for command @@ -59,6 +59,10 @@ SeqSummaryCommand::SeqSummaryCommand(string option) { outputDir = ""; outputDir += hasPath(fastafile); //if user entered a file with a path then preserve it } + + string temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = "1"; } + convert(temp, processors); + } } @@ -71,10 +75,10 @@ SeqSummaryCommand::SeqSummaryCommand(string option) { void SeqSummaryCommand::help(){ try { - m->mothurOut("The summary.seqs command reads a fastafile and ....\n"); - m->mothurOut("The summary.seqs command parameter is fasta and it is required.\n"); + m->mothurOut("The summary.seqs command reads a fastafile and summarizes the sequences.\n"); + m->mothurOut("The summary.seqs command parameters are fasta and processors, fasta is required.\n"); m->mothurOut("The summary.seqs command should be in the following format: \n"); - m->mothurOut("summary.seqs(fasta=yourFastaFile) \n"); + m->mothurOut("summary.seqs(fasta=yourFastaFile, processors=2) \n"); m->mothurOut("Note: No spaces between parameter labels (i.e. fasta), '=' and parameters (i.e.yourFastaFile).\n\n"); } catch(exception& e) { @@ -94,41 +98,162 @@ int SeqSummaryCommand::execute(){ if (abort == true) { return 0; } - ifstream inFASTA; - openInputFile(fastafile, inFASTA); - int numSeqs = 0; - - ofstream outSummary; string summaryFile = outputDir + getSimpleName(fastafile) + ".summary"; - openOutputFile(summaryFile, outSummary); + + int numSeqs = 0; vector startPosition; vector endPosition; vector seqLength; vector ambigBases; vector longHomoPolymer; + +#ifdef USE_MPI + int pid, numSeqsPerProcessor; + int tag = 2001; + int startTag = 1; int endTag = 2; int lengthTag = 3; int baseTag = 4; int lhomoTag = 5; + int outMode=MPI_MODE_CREATE|MPI_MODE_WRONLY; + vector MPIPos; + + MPI_Status status; + MPI_Status statusOut; + MPI_File inMPI; + MPI_File outMPI; + MPI_Comm_size(MPI_COMM_WORLD, &processors); + MPI_Comm_rank(MPI_COMM_WORLD, &pid); + + char tempFileName[1024]; + strcpy(tempFileName, fastafile.c_str()); + + char sumFileName[1024]; + strcpy(sumFileName, summaryFile.c_str()); - outSummary << "seqname\tstart\tend\tnbases\tambigs\tpolymer" << endl; + MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &inMPI); //comm, filename, mode, info, filepointer + MPI_File_open(MPI_COMM_WORLD, sumFileName, outMode, MPI_INFO_NULL, &outMPI); + + if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; } + + if (pid == 0) { //you are the root process + //print header + string outputString = "seqname\tstart\tend\tnbases\tambigs\tpolymer\n"; + int length = outputString.length(); + char* buf2 = new char[length]; + memcpy(buf2, outputString.c_str(), length); + + MPI_File_write_shared(outMPI, buf2, length, MPI_CHAR, &statusOut); + delete buf2; + + MPIPos = setFilePosFasta(fastafile, numSeqs); //fills MPIPos, returns numSeqs + + for(int i = 1; i < processors; i++) { + MPI_Send(&numSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&MPIPos[0], (numSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } + + //figure out how many sequences you have to do + numSeqsPerProcessor = numSeqs / processors; + int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } + + //do your part + MPICreateSummary(startIndex, numSeqsPerProcessor, startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, inMPI, outMPI, MPIPos); + + }else { //i am the child process + + MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); + MPIPos.resize(numSeqs+1); + MPI_Recv(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); + + //figure out how many sequences you have to align + numSeqsPerProcessor = numSeqs / processors; + int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } + + //do your part + MPICreateSummary(startIndex, numSeqsPerProcessor, startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, inMPI, outMPI, MPIPos); + } + + MPI_File_close(&inMPI); + MPI_File_close(&outMPI); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case + + if (pid == 0) { + //get the info from the child processes + for(int i = 1; i < processors; i++) { + int size; + MPI_Recv(&size, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); - while(!inFASTA.eof()){ - Sequence current(inFASTA); - if (current.getName() != "") { - startPosition.push_back(current.getStartPos()); - endPosition.push_back(current.getEndPos()); - seqLength.push_back(current.getNumBases()); - ambigBases.push_back(current.getAmbigBases()); - longHomoPolymer.push_back(current.getLongHomoPolymer()); + vector temp; temp.resize(size+1); + + for(int j = 0; j < 5; j++) { + + MPI_Recv(&temp[0], (size+1), MPI_INT, i, 2001, MPI_COMM_WORLD, &status); + int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what count this is for + + if (receiveTag == startTag) { + for (int k = 0; k < size; k++) { startPosition.push_back(temp[k]); } + }else if (receiveTag == endTag) { + for (int k = 0; k < size; k++) { endPosition.push_back(temp[k]); } + }else if (receiveTag == lengthTag) { + for (int k = 0; k < size; k++) { seqLength.push_back(temp[k]); } + }else if (receiveTag == baseTag) { + for (int k = 0; k < size; k++) { ambigBases.push_back(temp[k]); } + }else if (receiveTag == lhomoTag) { + for (int k = 0; k < size; k++) { longHomoPolymer.push_back(temp[k]); } + } + } + } + + }else{ - outSummary << current.getName() << '\t'; - outSummary << current.getStartPos() << '\t' << current.getEndPos() << '\t'; - outSummary << current.getNumBases() << '\t' << current.getAmbigBases() << '\t'; - outSummary << current.getLongHomoPolymer() << endl; + //send my counts + int size = startPosition.size(); + MPI_Send(&size, 1, MPI_INT, 0, tag, MPI_COMM_WORLD); + + startPosition.push_back(startTag); + int ierr = MPI_Send(&(startPosition[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); + endPosition.push_back(endTag); + ierr = MPI_Send (&(endPosition[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); + seqLength.push_back(lengthTag); + ierr = MPI_Send(&(seqLength[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); + ambigBases.push_back(baseTag); + ierr = MPI_Send(&(ambigBases[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); + longHomoPolymer.push_back(lhomoTag); + ierr = MPI_Send(&(longHomoPolymer[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); + } - numSeqs++; - } - gobble(inFASTA); - } - inFASTA.close(); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case +#else + vector positions = divideFile(fastafile, processors); + + for (int i = 0; i < (positions.size()-1); i++) { + lines.push_back(new linePair(positions[i], positions[(i+1)])); + } + + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + if(processors == 1){ + numSeqs = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile, lines[0]); + }else{ + numSeqs = createProcessesCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile); + + rename((summaryFile + toString(processIDS[0]) + ".temp").c_str(), summaryFile.c_str()); + //append files + for(int i=1;icontrol_pressed) { return 0; } + #else + numSeqs = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile, lines[0]); + if (m->control_pressed) { return 0; } + #endif +#endif + + #ifdef USE_MPI + if (pid == 0) { + #endif sort(startPosition.begin(), startPosition.end()); sort(endPosition.begin(), endPosition.end()); @@ -147,6 +272,8 @@ int SeqSummaryCommand::execute(){ if (startPosition[0] == -1) { startPosition[0] = 0; } if (endPosition[0] == -1) { endPosition[0] = 0; } + if (m->control_pressed) { remove(summaryFile.c_str()); return 0; } + m->mothurOutEndLine(); m->mothurOut("\t\tStart\tEnd\tNBases\tAmbigs\tPolymer"); m->mothurOutEndLine(); m->mothurOut("Minimum:\t" + toString(startPosition[0]) + "\t" + toString(endPosition[0]) + "\t" + toString(seqLength[0]) + "\t" + toString(ambigBases[0]) + "\t" + toString(longHomoPolymer[0])); m->mothurOutEndLine(); @@ -158,12 +285,16 @@ int SeqSummaryCommand::execute(){ m->mothurOut("Maximum:\t" + toString(startPosition[ptile100]) + "\t" + toString(endPosition[ptile100]) + "\t" + toString(seqLength[ptile100]) + "\t" + toString(ambigBases[ptile100]) + "\t" + toString(longHomoPolymer[ptile100])); m->mothurOutEndLine(); m->mothurOut("# of Seqs:\t" + toString(numSeqs)); m->mothurOutEndLine(); - outSummary.close(); + if (m->control_pressed) { remove(summaryFile.c_str()); return 0; } m->mothurOutEndLine(); m->mothurOut("Output File Name: "); m->mothurOutEndLine(); m->mothurOut(summaryFile); m->mothurOutEndLine(); m->mothurOutEndLine(); + + #ifdef USE_MPI + } + #endif return 0; } @@ -172,7 +303,187 @@ int SeqSummaryCommand::execute(){ exit(1); } } +/**************************************************************************************/ +int SeqSummaryCommand::driverCreateSummary(vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, string filename, string sumFile, linePair* filePos) { + try { + + ofstream outSummary; + openOutputFile(sumFile, outSummary); + + //print header if you are process 0 + if (filePos->start == 0) { + outSummary << "seqname\tstart\tend\tnbases\tambigs\tpolymer" << endl; + } + + ifstream in; + openInputFile(filename, in); + + in.seekg(filePos->start); + bool done = false; + int count = 0; + + while (!done) { + + if (m->control_pressed) { in.close(); outSummary.close(); return 1; } + + Sequence current(in); gobble(in); + + if (current.getName() != "") { + startPosition.push_back(current.getStartPos()); + endPosition.push_back(current.getEndPos()); + seqLength.push_back(current.getNumBases()); + ambigBases.push_back(current.getAmbigBases()); + longHomoPolymer.push_back(current.getLongHomoPolymer()); + + outSummary << current.getName() << '\t'; + outSummary << current.getStartPos() << '\t' << current.getEndPos() << '\t'; + outSummary << current.getNumBases() << '\t' << current.getAmbigBases() << '\t'; + outSummary << current.getLongHomoPolymer() << endl; + count++; + } + + unsigned long int pos = in.tellg(); + if ((pos == -1) || (pos >= filePos->end)) { break; } + + //report progress + if((count) % 100 == 0){ m->mothurOut(toString(count)); m->mothurOutEndLine(); } + } + //report progress + if((count) % 100 != 0){ m->mothurOut(toString(count)); m->mothurOutEndLine(); } + + in.close(); + + return count; + } + catch(exception& e) { + m->errorOut(e, "SeqSummaryCommand", "driverCreateSummary"); + exit(1); + } +} +#ifdef USE_MPI +/**************************************************************************************/ +int SeqSummaryCommand::MPICreateSummary(int start, int num, vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, MPI_File& inMPI, MPI_File& outMPI, vector& MPIPos) { + try { + + int pid; + MPI_Status status; + MPI_Comm_rank(MPI_COMM_WORLD, &pid); + + + for(int i=0;icontrol_pressed) { return 0; } + + //read next sequence + int length = MPIPos[start+i+1] - MPIPos[start+i]; + + char* buf4 = new char[length]; + MPI_File_read_at(inMPI, MPIPos[start+i], buf4, length, MPI_CHAR, &status); + + string tempBuf = buf4; + if (tempBuf.length() > length) { tempBuf = tempBuf.substr(0, length); } + istringstream iss (tempBuf,istringstream::in); + delete buf4; + + Sequence current(iss); + + if (current.getName() != "") { + startPosition.push_back(current.getStartPos()); + endPosition.push_back(current.getEndPos()); + seqLength.push_back(current.getNumBases()); + ambigBases.push_back(current.getAmbigBases()); + longHomoPolymer.push_back(current.getLongHomoPolymer()); + + string outputString = current.getName() + "\t" + toString(current.getStartPos()) + "\t" + toString(current.getEndPos()) + "\t"; + outputString += toString(current.getNumBases()) + "\t" + toString(current.getAmbigBases()) + "\t" + toString(current.getLongHomoPolymer()) + "\n"; + + //output to file + length = outputString.length(); + char* buf3 = new char[length]; + memcpy(buf3, outputString.c_str(), length); + + MPI_File_write_shared(outMPI, buf3, length, MPI_CHAR, &status); + delete buf3; + } + } + + return 0; + } + catch(exception& e) { + m->errorOut(e, "SeqSummaryCommand", "MPICreateSummary"); + exit(1); + } +} +#endif +/**************************************************************************************************/ +int SeqSummaryCommand::createProcessesCreateSummary(vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, string filename, string sumFile) { + try { +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + int process = 0; + int num = 0; + processIDS.clear(); + + //loop through and create all the processes you want + while (process != processors) { + int pid = fork(); + + if (pid > 0) { + processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later + process++; + }else if (pid == 0){ + num = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, sumFile + toString(getpid()) + ".temp", lines[process]); + + //pass numSeqs to parent + ofstream out; + string tempFile = toString(getpid()) + ".temp"; + openOutputFile(tempFile, out); + + out << num << endl; + for (int k = 0; k < startPosition.size(); k++) { out << startPosition[k] << '\t'; } out << endl; + for (int k = 0; k < endPosition.size(); k++) { out << endPosition[k] << '\t'; } out << endl; + for (int k = 0; k < seqLength.size(); k++) { out << seqLength[k] << '\t'; } out << endl; + for (int k = 0; k < ambigBases.size(); k++) { out << ambigBases[k] << '\t'; } out << endl; + for (int k = 0; k < longHomoPolymer.size(); k++) { out << longHomoPolymer[k] << '\t'; } out << endl; + + out.close(); + + exit(0); + }else { m->mothurOut("unable to spawn the necessary processes."); m->mothurOutEndLine(); exit(0); } + } + + //force parent to wait until all the processes are done + for (int i=0;i> tempNum; gobble(in); num += tempNum; + for (int k = 0; k < tempNum; k++) { in >> temp; startPosition.push_back(temp); } gobble(in); + for (int k = 0; k < tempNum; k++) { in >> temp; endPosition.push_back(temp); } gobble(in); + for (int k = 0; k < tempNum; k++) { in >> temp; seqLength.push_back(temp); } gobble(in); + for (int k = 0; k < tempNum; k++) { in >> temp; ambigBases.push_back(temp); } gobble(in); + for (int k = 0; k < tempNum; k++) { in >> temp; longHomoPolymer.push_back(temp); } gobble(in); + + in.close(); + remove(tempFilename.c_str()); + } + + return num; +#endif + } + catch(exception& e) { + m->errorOut(e, "SeqSummaryCommand", "createProcessesCreateSummary"); + exit(1); + } +} //***************************************************************************************************************