X-Git-Url: https://git.donarmstrong.com/?a=blobdiff_plain;f=seqsummarycommand.cpp;h=5009ff5c2eeb87e1cbe49fdd0f240355bfcaaf74;hb=8f7164a88df624fd0a8d1eddbb3d744463cc9ecb;hp=f551eddacd00dade4aff87526f0b5aca59ad2b58;hpb=4a2d841cb97fb02351022efe9d7068b1dc212bf9;p=mothur.git diff --git a/seqsummarycommand.cpp b/seqsummarycommand.cpp index f551edd..5009ff5 100644 --- a/seqsummarycommand.cpp +++ b/seqsummarycommand.cpp @@ -27,7 +27,7 @@ SeqSummaryCommand::SeqSummaryCommand(string option) { OptionParser parser(option); map parameters = parser.getParameters(); - ValidParameters validParameter; + ValidParameters validParameter("summary.seqs"); map::iterator it; //check to make sure all parameters are valid for command @@ -113,7 +113,7 @@ int SeqSummaryCommand::execute(){ int tag = 2001; int startTag = 1; int endTag = 2; int lengthTag = 3; int baseTag = 4; int lhomoTag = 5; int outMode=MPI_MODE_CREATE|MPI_MODE_WRONLY; - vector MPIPos; + vector MPIPos; MPI_Status status; MPI_Status statusOut; @@ -144,11 +144,12 @@ int SeqSummaryCommand::execute(){ delete buf2; MPIPos = setFilePosFasta(fastafile, numSeqs); //fills MPIPos, returns numSeqs + + for(int i = 1; i < processors; i++) { + MPI_Send(&numSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&MPIPos[0], (numSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } - //send file positions to all processes - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos - //figure out how many sequences you have to do numSeqsPerProcessor = numSeqs / processors; int startIndex = pid * numSeqsPerProcessor; @@ -156,54 +157,55 @@ int SeqSummaryCommand::execute(){ //do your part MPICreateSummary(startIndex, numSeqsPerProcessor, startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, inMPI, outMPI, MPIPos); - - if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; } - - //get the info from the child processes - for(int i = 1; i < processors; i++) { - - int size; - MPI_Recv(&size, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); - - vector temp; temp.resize(size+1); - - for(int j = 0; j < 5; j++) { - - MPI_Recv(&temp[0], (size+1), MPI_INT, i, 2001, MPI_COMM_WORLD, &status); - int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what count this is for - - if (receiveTag == startTag) { - for (int k = 0; k < size; k++) { startPosition.push_back(temp[k]); } - }else if (receiveTag == endTag) { - for (int k = 0; k < size; k++) { endPosition.push_back(temp[k]); } - }else if (receiveTag == lengthTag) { - for (int k = 0; k < size; k++) { seqLength.push_back(temp[k]); } - }else if (receiveTag == baseTag) { - for (int k = 0; k < size; k++) { ambigBases.push_back(temp[k]); } - }else if (receiveTag == lhomoTag) { - for (int k = 0; k < size; k++) { longHomoPolymer.push_back(temp[k]); } - } - } - } - - }else { //i am the child process - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); MPIPos.resize(numSeqs+1); - MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions - + MPI_Recv(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); + //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; int startIndex = pid * numSeqsPerProcessor; if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } - + //do your part MPICreateSummary(startIndex, numSeqsPerProcessor, startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, inMPI, outMPI, MPIPos); - - if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; } - + } + + MPI_File_close(&inMPI); + MPI_File_close(&outMPI); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case + + if (pid == 0) { + //get the info from the child processes + for(int i = 1; i < processors; i++) { + int size; + MPI_Recv(&size, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); + + vector temp; temp.resize(size+1); + + for(int j = 0; j < 5; j++) { + + MPI_Recv(&temp[0], (size+1), MPI_INT, i, 2001, MPI_COMM_WORLD, &status); + int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what count this is for + + if (receiveTag == startTag) { + for (int k = 0; k < size; k++) { startPosition.push_back(temp[k]); } + }else if (receiveTag == endTag) { + for (int k = 0; k < size; k++) { endPosition.push_back(temp[k]); } + }else if (receiveTag == lengthTag) { + for (int k = 0; k < size; k++) { seqLength.push_back(temp[k]); } + }else if (receiveTag == baseTag) { + for (int k = 0; k < size; k++) { ambigBases.push_back(temp[k]); } + }else if (receiveTag == lhomoTag) { + for (int k = 0; k < size; k++) { longHomoPolymer.push_back(temp[k]); } + } + } + } + + }else{ + //send my counts int size = startPosition.size(); MPI_Send(&size, 1, MPI_INT, 0, tag, MPI_COMM_WORLD); @@ -218,20 +220,17 @@ int SeqSummaryCommand::execute(){ ierr = MPI_Send(&(ambigBases[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); longHomoPolymer.push_back(lhomoTag); ierr = MPI_Send(&(longHomoPolymer[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); - } - MPI_File_close(&inMPI); - MPI_File_close(&outMPI); - + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) if(processors == 1){ ifstream inFASTA; openInputFile(fastafile, inFASTA); - numSeqs=count(istreambuf_iterator(inFASTA),istreambuf_iterator(), '>'); - inFASTA.close(); - + getNumSeqs(inFASTA, numSeqs); + inFASTA.close(); + lines.push_back(new linePair(0, numSeqs)); driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile, lines[0]); @@ -251,7 +250,7 @@ int SeqSummaryCommand::execute(){ #else ifstream inFASTA; openInputFile(fastafile, inFASTA); - numSeqs=count(istreambuf_iterator(inFASTA),istreambuf_iterator(), '>'); + getNumSeqs(inFASTA, numSeqs); inFASTA.close(); lines.push_back(new linePair(0, numSeqs)); @@ -335,6 +334,7 @@ int SeqSummaryCommand::driverCreateSummary(vector& startPosition, vectorcontrol_pressed) { in.close(); outSummary.close(); return 1; } Sequence current(in); + if (current.getName() != "") { startPosition.push_back(current.getStartPos()); endPosition.push_back(current.getEndPos()); @@ -360,10 +360,13 @@ int SeqSummaryCommand::driverCreateSummary(vector& startPosition, vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, MPI_File& inMPI, MPI_File& outMPI, vector& MPIPos) { +int SeqSummaryCommand::MPICreateSummary(int start, int num, vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, MPI_File& inMPI, MPI_File& outMPI, vector& MPIPos) { try { + int pid; MPI_Status status; + MPI_Comm_rank(MPI_COMM_WORLD, &pid); + for(int i=0;i& startPo MPI_File_write_shared(outMPI, buf3, length, MPI_CHAR, &status); delete buf3; - } + } } return 0; @@ -450,7 +453,7 @@ int SeqSummaryCommand::createProcessesCreateSummary(vector& startPosition, int SeqSummaryCommand::setLines(string filename) { try { - vector positions; + vector positions; ifstream inFASTA; openInputFile(filename, inFASTA); @@ -460,7 +463,7 @@ int SeqSummaryCommand::setLines(string filename) { input = getline(inFASTA); if (input.length() != 0) { - if(input[0] == '>'){ long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1); } + if(input[0] == '>'){ unsigned long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1); } } } inFASTA.close(); @@ -468,7 +471,7 @@ int SeqSummaryCommand::setLines(string filename) { int numFastaSeqs = positions.size(); FILE * pFile; - long size; + unsigned long int size; //get num bytes in file pFile = fopen (filename.c_str(),"rb"); @@ -483,11 +486,11 @@ int SeqSummaryCommand::setLines(string filename) { for (int i = 0; i < processors; i++) { - long int startPos = positions[ i * numSeqsPerProcessor ]; + unsigned long int startPos = positions[ i * numSeqsPerProcessor ]; if(i == processors - 1){ numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; }else{ - long int myEnd = positions[ (i+1) * numSeqsPerProcessor ]; + unsigned long int myEnd = positions[ (i+1) * numSeqsPerProcessor ]; } lines.push_back(new linePair(startPos, numSeqsPerProcessor)); }