X-Git-Url: https://git.donarmstrong.com/?a=blobdiff_plain;f=seqsummarycommand.cpp;h=66405394859d182134884e4141b53d377c7ae245;hb=d01397212a287495d3d0ec00beea93759bf25402;hp=f630052dd58878cf75b80226b4fa2e0d118fed62;hpb=8699fd2c88f10abda5fe32c89be061a89d673fd6;p=mothur.git diff --git a/seqsummarycommand.cpp b/seqsummarycommand.cpp index f630052..6640539 100644 --- a/seqsummarycommand.cpp +++ b/seqsummarycommand.cpp @@ -144,11 +144,12 @@ int SeqSummaryCommand::execute(){ delete buf2; MPIPos = setFilePosFasta(fastafile, numSeqs); //fills MPIPos, returns numSeqs + + for(int i = 1; i < processors; i++) { + MPI_Send(&numSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&MPIPos[0], (numSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } - //send file positions to all processes - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos - //figure out how many sequences you have to do numSeqsPerProcessor = numSeqs / processors; int startIndex = pid * numSeqsPerProcessor; @@ -156,54 +157,55 @@ int SeqSummaryCommand::execute(){ //do your part MPICreateSummary(startIndex, numSeqsPerProcessor, startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, inMPI, outMPI, MPIPos); - - if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; } - - //get the info from the child processes - for(int i = 1; i < processors; i++) { - - int size; - MPI_Recv(&size, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); - - vector temp; temp.resize(size+1); - - for(int j = 0; j < 5; j++) { - - MPI_Recv(&temp[0], (size+1), MPI_INT, i, 2001, MPI_COMM_WORLD, &status); - int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what count this is for - - if (receiveTag == startTag) { - for (int k = 0; k < size; k++) { startPosition.push_back(temp[k]); } - }else if (receiveTag == endTag) { - for (int k = 0; k < size; k++) { endPosition.push_back(temp[k]); } - }else if (receiveTag == lengthTag) { - for (int k = 0; k < size; k++) { seqLength.push_back(temp[k]); } - }else if (receiveTag == baseTag) { - for (int k = 0; k < size; k++) { ambigBases.push_back(temp[k]); } - }else if (receiveTag == lhomoTag) { - for (int k = 0; k < size; k++) { longHomoPolymer.push_back(temp[k]); } - } - } - } - - }else { //i am the child process - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); MPIPos.resize(numSeqs+1); - MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions - + MPI_Recv(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); + //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; int startIndex = pid * numSeqsPerProcessor; if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } - + //do your part MPICreateSummary(startIndex, numSeqsPerProcessor, startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, inMPI, outMPI, MPIPos); - - if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; } - + } + + MPI_File_close(&inMPI); + MPI_File_close(&outMPI); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case + + if (pid == 0) { + //get the info from the child processes + for(int i = 1; i < processors; i++) { + int size; + MPI_Recv(&size, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); + + vector temp; temp.resize(size+1); + + for(int j = 0; j < 5; j++) { + + MPI_Recv(&temp[0], (size+1), MPI_INT, i, 2001, MPI_COMM_WORLD, &status); + int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what count this is for + + if (receiveTag == startTag) { + for (int k = 0; k < size; k++) { startPosition.push_back(temp[k]); } + }else if (receiveTag == endTag) { + for (int k = 0; k < size; k++) { endPosition.push_back(temp[k]); } + }else if (receiveTag == lengthTag) { + for (int k = 0; k < size; k++) { seqLength.push_back(temp[k]); } + }else if (receiveTag == baseTag) { + for (int k = 0; k < size; k++) { ambigBases.push_back(temp[k]); } + }else if (receiveTag == lhomoTag) { + for (int k = 0; k < size; k++) { longHomoPolymer.push_back(temp[k]); } + } + } + } + + }else{ + //send my counts int size = startPosition.size(); MPI_Send(&size, 1, MPI_INT, 0, tag, MPI_COMM_WORLD); @@ -218,20 +220,17 @@ int SeqSummaryCommand::execute(){ ierr = MPI_Send(&(ambigBases[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); longHomoPolymer.push_back(lhomoTag); ierr = MPI_Send(&(longHomoPolymer[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); - } - MPI_File_close(&inMPI); - MPI_File_close(&outMPI); - + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) if(processors == 1){ ifstream inFASTA; openInputFile(fastafile, inFASTA); - numSeqs=count(istreambuf_iterator(inFASTA),istreambuf_iterator(), '>'); - inFASTA.close(); - + getNumSeqs(inFASTA, numSeqs); + inFASTA.close(); + lines.push_back(new linePair(0, numSeqs)); driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile, lines[0]); @@ -250,8 +249,8 @@ int SeqSummaryCommand::execute(){ if (m->control_pressed) { return 0; } #else ifstream inFASTA; - openInputFile(fastafileNames[s], inFASTA); - numSeqs=count(istreambuf_iterator(inFASTA),istreambuf_iterator(), '>'); + openInputFile(fastafile, inFASTA); + getNumSeqs(inFASTA, numSeqs); inFASTA.close(); lines.push_back(new linePair(0, numSeqs)); @@ -335,6 +334,7 @@ int SeqSummaryCommand::driverCreateSummary(vector& startPosition, vectorcontrol_pressed) { in.close(); outSummary.close(); return 1; } Sequence current(in); + if (current.getName() != "") { startPosition.push_back(current.getStartPos()); endPosition.push_back(current.getEndPos()); @@ -363,7 +363,10 @@ int SeqSummaryCommand::driverCreateSummary(vector& startPosition, vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, MPI_File& inMPI, MPI_File& outMPI, vector& MPIPos) { try { + int pid; MPI_Status status; + MPI_Comm_rank(MPI_COMM_WORLD, &pid); + for(int i=0;i& startPo MPI_File_write_shared(outMPI, buf3, length, MPI_CHAR, &status); delete buf3; - } + } } return 0; @@ -450,7 +453,7 @@ int SeqSummaryCommand::createProcessesCreateSummary(vector& startPosition, int SeqSummaryCommand::setLines(string filename) { try { - vector positions; + vector positions; ifstream inFASTA; openInputFile(filename, inFASTA); @@ -460,7 +463,7 @@ int SeqSummaryCommand::setLines(string filename) { input = getline(inFASTA); if (input.length() != 0) { - if(input[0] == '>'){ long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1); } + if(input[0] == '>'){ unsigned long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1); } } } inFASTA.close(); @@ -468,7 +471,7 @@ int SeqSummaryCommand::setLines(string filename) { int numFastaSeqs = positions.size(); FILE * pFile; - long size; + unsigned long int size; //get num bytes in file pFile = fopen (filename.c_str(),"rb"); @@ -483,11 +486,11 @@ int SeqSummaryCommand::setLines(string filename) { for (int i = 0; i < processors; i++) { - long int startPos = positions[ i * numSeqsPerProcessor ]; + unsigned long int startPos = positions[ i * numSeqsPerProcessor ]; if(i == processors - 1){ numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; }else{ - long int myEnd = positions[ (i+1) * numSeqsPerProcessor ]; + unsigned long int myEnd = positions[ (i+1) * numSeqsPerProcessor ]; } lines.push_back(new linePair(startPos, numSeqsPerProcessor)); }