From: westcott Date: Thu, 29 Apr 2010 13:54:23 +0000 (+0000) Subject: paralellized summary.seqs and added mpi code to it. X-Git-Url: https://git.donarmstrong.com/?p=mothur.git;a=commitdiff_plain;h=8699fd2c88f10abda5fe32c89be061a89d673fd6 paralellized summary.seqs and added mpi code to it. --- diff --git a/commandfactory.cpp b/commandfactory.cpp index 8d78d63..32d4211 100644 --- a/commandfactory.cpp +++ b/commandfactory.cpp @@ -130,8 +130,7 @@ CommandFactory::CommandFactory(){ commands["get.rabund"] = "get.rabund"; commands["bootstrap.shared"] = "bootstrap.shared"; //commands["consensus"] = "consensus"; - commands["help"] = "help"; - commands["summary.seqs"] = "summary.seqs"; + commands["help"] = "help"; commands["reverse.seqs"] = "reverse.seqs"; commands["trim.seqs"] = "trim.seqs"; commands["list.seqs"] = "list.seqs"; @@ -163,6 +162,7 @@ CommandFactory::CommandFactory(){ commands["chimera.pintail"] = "MPIEnabled"; commands["chimera.bellerophon"] = "MPIEnabled"; commands["screen.seqs"] = "MPIEnabled"; + commands["summary.seqs"] = "MPIEnabled"; commands["quit"] = "MPIEnabled"; } diff --git a/screenseqscommand.cpp b/screenseqscommand.cpp index ab75032..f67dba7 100644 --- a/screenseqscommand.cpp +++ b/screenseqscommand.cpp @@ -134,7 +134,7 @@ ScreenSeqsCommand::ScreenSeqsCommand(string option) { void ScreenSeqsCommand::help(){ try { m->mothurOut("The screen.seqs command reads a fastafile and creates .....\n"); - m->mothurOut("The screen.seqs command parameters are fasta, start, end, maxambig, maxhomop, minlength, maxlength, name, and group.\n"); + m->mothurOut("The screen.seqs command parameters are fasta, start, end, maxambig, maxhomop, minlength, maxlength, name, group and processors.\n"); m->mothurOut("The fasta parameter is required.\n"); m->mothurOut("The start parameter .... The default is -1.\n"); m->mothurOut("The end parameter .... The default is -1.\n"); @@ -142,6 +142,7 @@ void ScreenSeqsCommand::help(){ m->mothurOut("The maxhomop parameter .... The default is -1.\n"); m->mothurOut("The minlength parameter .... The default is -1.\n"); m->mothurOut("The maxlength parameter .... The default is -1.\n"); + m->mothurOut("The processors parameter allows you to specify the number of processors to use while running the command. The default is 1.\n"); m->mothurOut("The name parameter allows you to provide a namesfile, and the group parameter allows you to provide a groupfile.\n"); m->mothurOut("The screen.seqs command should be in the following format: \n"); m->mothurOut("screen.seqs(fasta=yourFastaFile, name=youNameFile, group=yourGroupFIle, start=yourStart, end=yourEnd, maxambig=yourMaxambig, \n"); diff --git a/seqsummarycommand.cpp b/seqsummarycommand.cpp index 4514e01..f630052 100644 --- a/seqsummarycommand.cpp +++ b/seqsummarycommand.cpp @@ -21,7 +21,7 @@ SeqSummaryCommand::SeqSummaryCommand(string option) { else { //valid paramters for this command - string Array[] = {"fasta","outputdir","inputdir"}; + string Array[] = {"fasta","processors","outputdir","inputdir"}; vector myArray (Array, Array+(sizeof(Array)/sizeof(string))); OptionParser parser(option); @@ -59,6 +59,10 @@ SeqSummaryCommand::SeqSummaryCommand(string option) { outputDir = ""; outputDir += hasPath(fastafile); //if user entered a file with a path then preserve it } + + string temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = "1"; } + convert(temp, processors); + } } @@ -71,10 +75,10 @@ SeqSummaryCommand::SeqSummaryCommand(string option) { void SeqSummaryCommand::help(){ try { - m->mothurOut("The summary.seqs command reads a fastafile and ....\n"); - m->mothurOut("The summary.seqs command parameter is fasta and it is required.\n"); + m->mothurOut("The summary.seqs command reads a fastafile and summarizes the sequences.\n"); + m->mothurOut("The summary.seqs command parameters are fasta and processors, fasta is required.\n"); m->mothurOut("The summary.seqs command should be in the following format: \n"); - m->mothurOut("summary.seqs(fasta=yourFastaFile) \n"); + m->mothurOut("summary.seqs(fasta=yourFastaFile, processors=2) \n"); m->mothurOut("Note: No spaces between parameter labels (i.e. fasta), '=' and parameters (i.e.yourFastaFile).\n\n"); } catch(exception& e) { @@ -94,43 +98,172 @@ int SeqSummaryCommand::execute(){ if (abort == true) { return 0; } - ifstream inFASTA; - openInputFile(fastafile, inFASTA); - int numSeqs = 0; - - ofstream outSummary; string summaryFile = outputDir + getSimpleName(fastafile) + ".summary"; - openOutputFile(summaryFile, outSummary); + + int numSeqs = 0; vector startPosition; vector endPosition; vector seqLength; vector ambigBases; vector longHomoPolymer; + +#ifdef USE_MPI + int pid, numSeqsPerProcessor; + int tag = 2001; + int startTag = 1; int endTag = 2; int lengthTag = 3; int baseTag = 4; int lhomoTag = 5; + int outMode=MPI_MODE_CREATE|MPI_MODE_WRONLY; + vector MPIPos; + + MPI_Status status; + MPI_Status statusOut; + MPI_File inMPI; + MPI_File outMPI; + MPI_Comm_size(MPI_COMM_WORLD, &processors); + MPI_Comm_rank(MPI_COMM_WORLD, &pid); + + char tempFileName[1024]; + strcpy(tempFileName, fastafile.c_str()); + + char sumFileName[1024]; + strcpy(sumFileName, summaryFile.c_str()); - outSummary << "seqname\tstart\tend\tnbases\tambigs\tpolymer" << endl; + MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &inMPI); //comm, filename, mode, info, filepointer + MPI_File_open(MPI_COMM_WORLD, sumFileName, outMode, MPI_INFO_NULL, &outMPI); + + if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; } + + if (pid == 0) { //you are the root process + //print header + string outputString = "seqname\tstart\tend\tnbases\tambigs\tpolymer\n"; + int length = outputString.length(); + char* buf2 = new char[length]; + memcpy(buf2, outputString.c_str(), length); + + MPI_File_write_shared(outMPI, buf2, length, MPI_CHAR, &statusOut); + delete buf2; + + MPIPos = setFilePosFasta(fastafile, numSeqs); //fills MPIPos, returns numSeqs + + //send file positions to all processes + MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs + MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + + //figure out how many sequences you have to do + numSeqsPerProcessor = numSeqs / processors; + int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } + + //do your part + MPICreateSummary(startIndex, numSeqsPerProcessor, startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, inMPI, outMPI, MPIPos); + + if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; } + + //get the info from the child processes + for(int i = 1; i < processors; i++) { + + int size; + MPI_Recv(&size, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); + + vector temp; temp.resize(size+1); + + for(int j = 0; j < 5; j++) { + + MPI_Recv(&temp[0], (size+1), MPI_INT, i, 2001, MPI_COMM_WORLD, &status); + int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what count this is for + + if (receiveTag == startTag) { + for (int k = 0; k < size; k++) { startPosition.push_back(temp[k]); } + }else if (receiveTag == endTag) { + for (int k = 0; k < size; k++) { endPosition.push_back(temp[k]); } + }else if (receiveTag == lengthTag) { + for (int k = 0; k < size; k++) { seqLength.push_back(temp[k]); } + }else if (receiveTag == baseTag) { + for (int k = 0; k < size; k++) { ambigBases.push_back(temp[k]); } + }else if (receiveTag == lhomoTag) { + for (int k = 0; k < size; k++) { longHomoPolymer.push_back(temp[k]); } + } + } + + } - while(!inFASTA.eof()){ - if (m->control_pressed) { inFASTA.close(); outSummary.close(); remove(summaryFile.c_str()); return 0; } + + }else { //i am the child process - Sequence current(inFASTA); - if (current.getName() != "") { - startPosition.push_back(current.getStartPos()); - endPosition.push_back(current.getEndPos()); - seqLength.push_back(current.getNumBases()); - ambigBases.push_back(current.getAmbigBases()); - longHomoPolymer.push_back(current.getLongHomoPolymer()); + MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + MPIPos.resize(numSeqs+1); + MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + + //figure out how many sequences you have to align + numSeqsPerProcessor = numSeqs / processors; + int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } + + //do your part + MPICreateSummary(startIndex, numSeqsPerProcessor, startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, inMPI, outMPI, MPIPos); + + if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; } + + //send my counts + int size = startPosition.size(); + MPI_Send(&size, 1, MPI_INT, 0, tag, MPI_COMM_WORLD); + + startPosition.push_back(startTag); + int ierr = MPI_Send(&(startPosition[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); + endPosition.push_back(endTag); + ierr = MPI_Send (&(endPosition[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); + seqLength.push_back(lengthTag); + ierr = MPI_Send(&(seqLength[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); + ambigBases.push_back(baseTag); + ierr = MPI_Send(&(ambigBases[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); + longHomoPolymer.push_back(lhomoTag); + ierr = MPI_Send(&(longHomoPolymer[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); + + } - outSummary << current.getName() << '\t'; - outSummary << current.getStartPos() << '\t' << current.getEndPos() << '\t'; - outSummary << current.getNumBases() << '\t' << current.getAmbigBases() << '\t'; - outSummary << current.getLongHomoPolymer() << endl; + MPI_File_close(&inMPI); + MPI_File_close(&outMPI); - numSeqs++; - } - gobble(inFASTA); - } - inFASTA.close(); +#else + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + if(processors == 1){ + ifstream inFASTA; + openInputFile(fastafile, inFASTA); + numSeqs=count(istreambuf_iterator(inFASTA),istreambuf_iterator(), '>'); + inFASTA.close(); + + lines.push_back(new linePair(0, numSeqs)); + + driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile, lines[0]); + }else{ + numSeqs = setLines(fastafile); + createProcessesCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile); + + rename((summaryFile + toString(processIDS[0]) + ".temp").c_str(), summaryFile.c_str()); + //append files + for(int i=1;icontrol_pressed) { return 0; } + #else + ifstream inFASTA; + openInputFile(fastafileNames[s], inFASTA); + numSeqs=count(istreambuf_iterator(inFASTA),istreambuf_iterator(), '>'); + inFASTA.close(); + + lines.push_back(new linePair(0, numSeqs)); + + driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile, lines[0]); + if (m->control_pressed) { return 0; } + #endif +#endif + + #ifdef USE_MPI + if (pid == 0) { + #endif sort(startPosition.begin(), startPosition.end()); sort(endPosition.begin(), endPosition.end()); @@ -149,7 +282,7 @@ int SeqSummaryCommand::execute(){ if (startPosition[0] == -1) { startPosition[0] = 0; } if (endPosition[0] == -1) { endPosition[0] = 0; } - if (m->control_pressed) { outSummary.close(); remove(summaryFile.c_str()); return 0; } + if (m->control_pressed) { remove(summaryFile.c_str()); return 0; } m->mothurOutEndLine(); m->mothurOut("\t\tStart\tEnd\tNBases\tAmbigs\tPolymer"); m->mothurOutEndLine(); @@ -162,14 +295,16 @@ int SeqSummaryCommand::execute(){ m->mothurOut("Maximum:\t" + toString(startPosition[ptile100]) + "\t" + toString(endPosition[ptile100]) + "\t" + toString(seqLength[ptile100]) + "\t" + toString(ambigBases[ptile100]) + "\t" + toString(longHomoPolymer[ptile100])); m->mothurOutEndLine(); m->mothurOut("# of Seqs:\t" + toString(numSeqs)); m->mothurOutEndLine(); - outSummary.close(); - if (m->control_pressed) { remove(summaryFile.c_str()); return 0; } m->mothurOutEndLine(); m->mothurOut("Output File Name: "); m->mothurOutEndLine(); m->mothurOut(summaryFile); m->mothurOutEndLine(); m->mothurOutEndLine(); + + #ifdef USE_MPI + } + #endif return 0; } @@ -178,7 +313,192 @@ int SeqSummaryCommand::execute(){ exit(1); } } +/**************************************************************************************/ +int SeqSummaryCommand::driverCreateSummary(vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, string filename, string sumFile, linePair* line) { + try { + + ofstream outSummary; + openOutputFile(sumFile, outSummary); + + //print header if you are process 0 + if (line->start == 0) { + outSummary << "seqname\tstart\tend\tnbases\tambigs\tpolymer" << endl; + } + + ifstream in; + openInputFile(filename, in); + + in.seekg(line->start); + + for(int i=0;inum;i++){ + + if (m->control_pressed) { in.close(); outSummary.close(); return 1; } + + Sequence current(in); + if (current.getName() != "") { + startPosition.push_back(current.getStartPos()); + endPosition.push_back(current.getEndPos()); + seqLength.push_back(current.getNumBases()); + ambigBases.push_back(current.getAmbigBases()); + longHomoPolymer.push_back(current.getLongHomoPolymer()); + + outSummary << current.getName() << '\t'; + outSummary << current.getStartPos() << '\t' << current.getEndPos() << '\t'; + outSummary << current.getNumBases() << '\t' << current.getAmbigBases() << '\t'; + outSummary << current.getLongHomoPolymer() << endl; + } + gobble(in); + } + in.close(); + + return 0; + } + catch(exception& e) { + m->errorOut(e, "SeqSummaryCommand", "driverCreateSummary"); + exit(1); + } +} +#ifdef USE_MPI +/**************************************************************************************/ +int SeqSummaryCommand::MPICreateSummary(int start, int num, vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, MPI_File& inMPI, MPI_File& outMPI, vector& MPIPos) { + try { + + MPI_Status status; + + for(int i=0;icontrol_pressed) { return 0; } + + //read next sequence + int length = MPIPos[start+i+1] - MPIPos[start+i]; + + char* buf4 = new char[length]; + MPI_File_read_at(inMPI, MPIPos[start+i], buf4, length, MPI_CHAR, &status); + + string tempBuf = buf4; + if (tempBuf.length() > length) { tempBuf = tempBuf.substr(0, length); } + istringstream iss (tempBuf,istringstream::in); + delete buf4; + Sequence current(iss); + + if (current.getName() != "") { + startPosition.push_back(current.getStartPos()); + endPosition.push_back(current.getEndPos()); + seqLength.push_back(current.getNumBases()); + ambigBases.push_back(current.getAmbigBases()); + longHomoPolymer.push_back(current.getLongHomoPolymer()); + + string outputString = current.getName() + "\t" + toString(current.getStartPos()) + "\t" + toString(current.getEndPos()) + "\t"; + outputString += toString(current.getNumBases()) + "\t" + toString(current.getAmbigBases()) + "\t" + toString(current.getLongHomoPolymer()) + "\n"; + + //output to file + length = outputString.length(); + char* buf3 = new char[length]; + memcpy(buf3, outputString.c_str(), length); + + MPI_File_write_shared(outMPI, buf3, length, MPI_CHAR, &status); + delete buf3; + } + } + + return 0; + } + catch(exception& e) { + m->errorOut(e, "SeqSummaryCommand", "MPICreateSummary"); + exit(1); + } +} +#endif +/**************************************************************************************************/ +int SeqSummaryCommand::createProcessesCreateSummary(vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, string filename, string sumFile) { + try { +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + int process = 0; + int exitCommand = 1; + processIDS.clear(); + + //loop through and create all the processes you want + while (process != processors) { + int pid = vfork(); + + if (pid > 0) { + processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later + process++; + }else if (pid == 0){ + driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, sumFile + toString(getpid()) + ".temp", lines[process]); + exit(0); + }else { m->mothurOut("unable to spawn the necessary processes."); m->mothurOutEndLine(); exit(0); } + } + + //force parent to wait until all the processes are done + for (int i=0;ierrorOut(e, "SeqSummaryCommand", "createProcessesCreateSummary"); + exit(1); + } +} +/**************************************************************************************************/ + +int SeqSummaryCommand::setLines(string filename) { + try { + + vector positions; + + ifstream inFASTA; + openInputFile(filename, inFASTA); + + string input; + while(!inFASTA.eof()){ + input = getline(inFASTA); + + if (input.length() != 0) { + if(input[0] == '>'){ long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1); } + } + } + inFASTA.close(); + + int numFastaSeqs = positions.size(); + + FILE * pFile; + long size; + + //get num bytes in file + pFile = fopen (filename.c_str(),"rb"); + if (pFile==NULL) perror ("Error opening file"); + else{ + fseek (pFile, 0, SEEK_END); + size=ftell (pFile); + fclose (pFile); + } + + int numSeqsPerProcessor = numFastaSeqs / processors; + + for (int i = 0; i < processors; i++) { + + long int startPos = positions[ i * numSeqsPerProcessor ]; + if(i == processors - 1){ + numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; + }else{ + long int myEnd = positions[ (i+1) * numSeqsPerProcessor ]; + } + lines.push_back(new linePair(startPos, numSeqsPerProcessor)); + } + + return numFastaSeqs; + } + catch(exception& e) { + m->errorOut(e, "SeqSummaryCommand", "setLines"); + exit(1); + } +} //*************************************************************************************************************** diff --git a/seqsummarycommand.h b/seqsummarycommand.h index e6b3b87..58fbc0a 100644 --- a/seqsummarycommand.h +++ b/seqsummarycommand.h @@ -23,6 +23,24 @@ public: private: bool abort; string fastafile, outputDir; + int processors; + + struct linePair { + int start; + int num; + linePair(long int i, long int j) : start(i), num(j) {} + }; + vector lines; + vector processIDS; + + int createProcessesCreateSummary(vector&, vector&, vector&, vector&, vector&, string, string); + int driverCreateSummary(vector&, vector&, vector&, vector&, vector&, string, string, linePair*); + int setLines(string); + + #ifdef USE_MPI + int MPICreateSummary(int, int, vector&, vector&, vector&, vector&, vector&, MPI_File&, MPI_File&, vector&); + #endif + };