X-Git-Url: https://git.donarmstrong.com/?a=blobdiff_plain;ds=sidebyside;f=filterseqscommand.cpp;h=3d3062b4cc371848fdbd222b7985415a41e62e57;hb=fdc1f6eaf544f695fc1511f24bddd7e6069c33ba;hp=b402a5bac5cc8cf8ad6a2ac4b4ab882fc91e306b;hpb=aba5f8811829037b0a3004ef33f0ad4ed5e5fcf8;p=mothur.git diff --git a/filterseqscommand.cpp b/filterseqscommand.cpp index b402a5b..3d3062b 100644 --- a/filterseqscommand.cpp +++ b/filterseqscommand.cpp @@ -167,9 +167,12 @@ int FilterSeqsCommand::execute() { inFASTA.close(); ////////////create filter///////////////// + m->mothurOut("Creating Filter... "); m->mothurOutEndLine(); filter = createFilter(); + m->mothurOutEndLine(); m->mothurOutEndLine(); + if (m->control_pressed) { return 0; } #ifdef USE_MPI @@ -193,8 +196,12 @@ int FilterSeqsCommand::execute() { ////////////run filter///////////////// + m->mothurOut("Running Filter... "); m->mothurOutEndLine(); + filterSequences(); - + + m->mothurOutEndLine(); m->mothurOutEndLine(); + int filteredLength = 0; for(int i=0;iMPIPos; MPI_Status status; MPI_Comm_size(MPI_COMM_WORLD, &processors); //set processors to the number of mpi processes running MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are MPI_File outMPI; + MPI_File tempMPI; MPI_File inMPI; int outMode=MPI_MODE_CREATE|MPI_MODE_WRONLY; int inMode=MPI_MODE_RDONLY; @@ -256,28 +265,26 @@ int FilterSeqsCommand::filterSequences() { MPI_File_open(MPI_COMM_WORLD, inFileName, inMode, MPI_INFO_NULL, &inMPI); //comm, filename, mode, info, filepointer MPI_File_open(MPI_COMM_WORLD, outFilename, outMode, MPI_INFO_NULL, &outMPI); + if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; } + if (pid == 0) { //you are the root process - setLines(fastafileNames[s]); - - char bufF[alignmentLength]; - strcpy(bufF, filter.c_str()); - - for (int j = 0; j < lines.size(); j++) { //each process - if (j != 0) { //don't send to yourself - MPI_Send(&lines[j]->start, 1, MPI_INT, j, tag, MPI_COMM_WORLD); //start position in file - MPI_Send(&bufferSizes[j], 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how bytes for the read - MPI_Send(bufF, alignmentLength, MPI_CHAR, j, tag, MPI_COMM_WORLD); - } - } + MPIPos = setFilePosFasta(fastafileNames[s], num); //fills MPIPos, returns numSeqs + numSeqs += num; - //read your peice of file - char buf[bufferSizes[0]]; - MPI_File_read_at(inMPI, lines[0]->start, buf, bufferSizes[0], MPI_CHAR, &status); - istringstream iss (buf,istringstream::in); + //send file positions to all processes + MPI_Bcast(&num, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs + MPI_Bcast(&MPIPos[0], (num+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + //figure out how many sequences you have to do + numSeqsPerProcessor = num / processors; + if(pid == (processors - 1)){ numSeqsPerProcessor = num - pid * numSeqsPerProcessor; } + int startIndex = pid * numSeqsPerProcessor; + //do your part - driverMPIRun(iss, outMPI); + driverMPIRun(startIndex, numSeqsPerProcessor, inMPI, outMPI, MPIPos); + + if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; } //wait on chidren for(int i = 1; i < processors; i++) { @@ -286,23 +293,21 @@ int FilterSeqsCommand::filterSequences() { } }else { //you are a child process - //receive your section of file - int startPos, bufferSize; - char bufF[alignmentLength]; - MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); - MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); - MPI_Recv(bufF, alignmentLength, MPI_CHAR, 0, tag, MPI_COMM_WORLD, &status); + MPI_Bcast(&num, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + numSeqs += num; + MPIPos.resize(num+1); + MPI_Bcast(&MPIPos[0], (num+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions - filter = bufF; //filter was made by process 0 so other processes need to get it - - //read your peice of file - char buf2[bufferSize]; - MPI_File_read_at(inMPI, startPos, buf2, bufferSize, MPI_CHAR, &status); - istringstream iss (buf2,istringstream::in); + //figure out how many sequences you have to align + numSeqsPerProcessor = num / processors; + if(pid == (processors - 1)){ numSeqsPerProcessor = num - pid * numSeqsPerProcessor; } + int startIndex = pid * numSeqsPerProcessor; + + //align your part + driverMPIRun(startIndex, numSeqsPerProcessor, inMPI, outMPI, MPIPos); + + if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; } - //do your part - driverMPIRun(iss, outMPI); - char buf[4]; strcpy(buf, "done"); @@ -361,16 +366,28 @@ int FilterSeqsCommand::filterSequences() { exit(1); } } +#ifdef USE_MPI /**************************************************************************************/ -int FilterSeqsCommand::driverMPIRun(istringstream& in, MPI_File& outMPI) { +int FilterSeqsCommand::driverMPIRun(int start, int num, MPI_File& inMPI, MPI_File& outMPI, vector& MPIPos) { try { string outputString = ""; int count = 0; MPI_Status status; - while (!in.eof()) { + for(int i=0;icontrol_pressed) { return 0; } + + //read next sequence + int length = MPIPos[start+i+1] - MPIPos[start+i]; + char buf4[length]; + MPI_File_read_at(inMPI, MPIPos[start+i], buf4, length, MPI_CHAR, &status); - Sequence seq(in); gobble(in); + string tempBuf = buf4; + if (tempBuf.length() > length) { tempBuf = tempBuf.substr(0, length); } + istringstream iss (tempBuf,istringstream::in); + + Sequence seq(iss); gobble(iss); if (seq.getName() != "") { string align = seq.getAligned(); @@ -396,6 +413,8 @@ int FilterSeqsCommand::driverMPIRun(istringstream& in, MPI_File& outMPI) { } } + + if((i+1) % 100 == 0){ cout << (i+1) << endl; m->mothurOutJustToLog(toString(i+1) + "\n"); } } if(outputString != ""){ //output to file @@ -407,7 +426,8 @@ int FilterSeqsCommand::driverMPIRun(istringstream& in, MPI_File& outMPI) { MPI_File_write_shared(outMPI, buf, length, MPI_CHAR, &status); outputString = ""; } - + + if((num) % 100 != 0){ cout << (num) << endl; m->mothurOutJustToLog(toString(num) + "\n"); } return 0; } @@ -416,6 +436,7 @@ int FilterSeqsCommand::driverMPIRun(istringstream& in, MPI_File& outMPI) { exit(1); } } +#endif /**************************************************************************************/ int FilterSeqsCommand::driverRunFilter(string F, string outputFilename, string inputFilename, linePair* line) { try { @@ -518,9 +539,9 @@ string FilterSeqsCommand::createFilter() { for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear(); #ifdef USE_MPI - int pid; - int Atag = 1; int Ttag = 2; int Ctag = 3; int Gtag = 4; int Gaptag = 5; + int pid, numSeqsPerProcessor, num; int tag = 2001; + vector MPIPos; MPI_Status status; MPI_File inMPI; @@ -532,80 +553,44 @@ string FilterSeqsCommand::createFilter() { MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &inMPI); //comm, filename, mode, info, filepointer + if (m->control_pressed) { MPI_File_close(&inMPI); return 0; } + if (pid == 0) { //you are the root process - setLines(fastafileNames[s]); - - for (int j = 0; j < lines.size(); j++) { //each process - if (j != 0) { //don't send to yourself - MPI_Send(&lines[j]->start, 1, MPI_INT, j, tag, MPI_COMM_WORLD); //start position in file - MPI_Send(&numSeqs, 1, MPI_INT, j, tag, MPI_COMM_WORLD); - MPI_Send(&bufferSizes[j], 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how bytes for the read - } - } - - char buf[bufferSizes[0]]; - MPI_File_read_at(inMPI, 0, buf, bufferSizes[0], MPI_CHAR, &status); - - string tempBuf = buf; - if (tempBuf.length() > bufferSizes[0]) { tempBuf = tempBuf.substr(0, bufferSizes[0]); } - - MPICreateFilter(F, tempBuf); - - if (m->control_pressed) { MPI_File_close(&inMPI); return filterString; } - - vector temp; temp.resize(alignmentLength+1); + MPIPos = setFilePosFasta(fastafileNames[s], num); //fills MPIPos, returns numSeqs + numSeqs += num; - //get the frequencies from the child processes - for(int i = 0; i < ((processors-1)*5); i++) { - MPI_Recv(&temp[0], (alignmentLength+1), MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status); - int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what letter count this is for - - if (receiveTag == Atag) { //you are recieveing the A frequencies - for (int k = 0; k < alignmentLength; k++) { F.a[k] += temp[k]; } - }else if (receiveTag == Ttag) { //you are recieveing the T frequencies - for (int k = 0; k < alignmentLength; k++) { F.t[k] += temp[k]; } - }else if (receiveTag == Ctag) { //you are recieveing the C frequencies - for (int k = 0; k < alignmentLength; k++) { F.c[k] += temp[k]; } - }else if (receiveTag == Gtag) { //you are recieveing the G frequencies - for (int k = 0; k < alignmentLength; k++) { F.g[k] += temp[k]; } - }else if (receiveTag == Gaptag) { //you are recieveing the gap frequencies - for (int k = 0; k < alignmentLength; k++) { F.gap[k] += temp[k]; } - } - } - + //send file positions to all processes + MPI_Bcast(&num, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs + MPI_Bcast(&MPIPos[0], (num+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + + //figure out how many sequences you have to do + numSeqsPerProcessor = num / processors; + if(pid == (processors - 1)){ numSeqsPerProcessor = num - pid * numSeqsPerProcessor; } + int startIndex = pid * numSeqsPerProcessor; + + //do your part + MPICreateFilter(startIndex, numSeqsPerProcessor, F, inMPI, MPIPos); + if (m->control_pressed) { MPI_File_close(&inMPI); return 0; } + }else { //i am the child process - int startPos, bufferSize; - MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); - MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); - MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); - - //send freqs - char buf2[bufferSize]; - MPI_File_read_at(inMPI, startPos, buf2, bufferSize, MPI_CHAR, &status); - - string tempBuf = buf2; - if (tempBuf.length() > bufferSize) { tempBuf = tempBuf.substr(0, bufferSize); } - - MPICreateFilter(F, tempBuf); - - if (m->control_pressed) { MPI_File_close(&inMPI); return filterString; } + MPI_Bcast(&num, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + MPIPos.resize(num+1); + numSeqs += num; + MPI_Bcast(&MPIPos[0], (num+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + + //figure out how many sequences you have to align + numSeqsPerProcessor = num / processors; + if(pid == (processors - 1)){ numSeqsPerProcessor = num - pid * numSeqsPerProcessor; } + int startIndex = pid * numSeqsPerProcessor; + + //do your part + MPICreateFilter(startIndex, numSeqsPerProcessor, F, inMPI, MPIPos); - //send my fequency counts - F.a.push_back(Atag); - int ierr = MPI_Send(&(F.a[0]), (alignmentLength+1), MPI_INT, 0, tag, MPI_COMM_WORLD); - F.t.push_back(Ttag); - ierr = MPI_Send (&(F.t[0]), (alignmentLength+1), MPI_INT, 0, tag, MPI_COMM_WORLD); - F.c.push_back(Ctag); - ierr = MPI_Send(&(F.c[0]), (alignmentLength+1), MPI_INT, 0, tag, MPI_COMM_WORLD); - F.g.push_back(Gtag); - ierr = MPI_Send(&(F.g[0]), (alignmentLength+1), MPI_INT, 0, tag, MPI_COMM_WORLD); - F.gap.push_back(Gaptag); - ierr = MPI_Send(&(F.gap[0]), (alignmentLength+1), MPI_INT, 0, tag, MPI_COMM_WORLD); + if (m->control_pressed) { MPI_File_close(&inMPI); return 0; } } - MPI_Barrier(MPI_COMM_WORLD); MPI_File_close(&inMPI); #else @@ -645,13 +630,74 @@ string FilterSeqsCommand::createFilter() { } } + +#ifdef USE_MPI + int pid; + int Atag = 1; int Ttag = 2; int Ctag = 3; int Gtag = 4; int Gaptag = 5; + MPI_Status status; + + MPI_Comm_rank(MPI_COMM_WORLD, &pid); + if (pid == 0) { //only one process should output the filter + + vector temp; temp.resize(alignmentLength+1); + + //get the frequencies from the child processes + for(int i = 0; i < ((processors-1)*5); i++) { + MPI_Recv(&temp[0], (alignmentLength+1), MPI_INT, MPI_ANY_SOURCE, 2001, MPI_COMM_WORLD, &status); + int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what letter count this is for + + if (receiveTag == Atag) { //you are recieveing the A frequencies + for (int k = 0; k < alignmentLength; k++) { F.a[k] += temp[k]; } + }else if (receiveTag == Ttag) { //you are recieveing the T frequencies + for (int k = 0; k < alignmentLength; k++) { F.t[k] += temp[k]; } + }else if (receiveTag == Ctag) { //you are recieveing the C frequencies + for (int k = 0; k < alignmentLength; k++) { F.c[k] += temp[k]; } + }else if (receiveTag == Gtag) { //you are recieveing the G frequencies + for (int k = 0; k < alignmentLength; k++) { F.g[k] += temp[k]; } + }else if (receiveTag == Gaptag) { //you are recieveing the gap frequencies + for (int k = 0; k < alignmentLength; k++) { F.gap[k] += temp[k]; } + } + } + }else{ + + //send my fequency counts + F.a.push_back(Atag); + int ierr = MPI_Send(&(F.a[0]), (alignmentLength+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); + F.t.push_back(Ttag); + ierr = MPI_Send (&(F.t[0]), (alignmentLength+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); + F.c.push_back(Ctag); + ierr = MPI_Send(&(F.c[0]), (alignmentLength+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); + F.g.push_back(Gtag); + ierr = MPI_Send(&(F.g[0]), (alignmentLength+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); + F.gap.push_back(Gaptag); + ierr = MPI_Send(&(F.gap[0]), (alignmentLength+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); + } + + if (pid == 0) { //only one process should output the filter +#endif F.setNumSeqs(numSeqs); if(isTrue(vertical) == 1) { F.doVertical(); } if(soft != 0) { F.doSoft(); } filterString = F.getFilter(); - + +#ifdef USE_MPI + //send filter string to kids + MPI_Bcast(&filterString[0], alignmentLength, MPI_CHAR, 0, MPI_COMM_WORLD); + }else{ + //recieve filterString + char tempBuf[alignmentLength]; + MPI_Bcast(tempBuf, alignmentLength, MPI_CHAR, 0, MPI_COMM_WORLD); + + filterString = tempBuf; + if (filterString.length() > alignmentLength) { filterString = filterString.substr(0, alignmentLength); } + } + + MPI_Barrier(MPI_COMM_WORLD); +#endif + + return filterString; } catch(exception& e) { @@ -697,31 +743,43 @@ int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair* exit(1); } } +#ifdef USE_MPI /**************************************************************************************/ -int FilterSeqsCommand::MPICreateFilter(Filters& F, string input) { +int FilterSeqsCommand::MPICreateFilter(int start, int num, Filters& F, MPI_File& inMPI, vector& MPIPos) { try { - vector seqStrings; - parseBuffer(input, seqStrings); + MPI_Status status; + int pid; + MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are - for(int i=0;icontrol_pressed = true; } - - if (m->control_pressed) { return 1; } + if (m->control_pressed) { return 0; } + + //read next sequence + int length = MPIPos[start+i+1] - MPIPos[start+i]; + + char buf4[length]; + MPI_File_read_at(inMPI, MPIPos[start+i], buf4, length, MPI_CHAR, &status); - Sequence seq("", seqStrings[i]); + string tempBuf = buf4; + if (tempBuf.length() > length) { tempBuf = tempBuf.substr(0, length); } + istringstream iss (tempBuf,istringstream::in); + + Sequence seq(iss); + + if (seq.getAligned().length() != alignmentLength) { cout << "Alignment length is " << alignmentLength << " and sequence " << seq.getName() << " has length " << seq.getAligned().length() << ", please correct." << endl; exit(1); } if(trump != '*'){ F.doTrump(seq); } if(isTrue(vertical) || soft != 0){ F.getFreqs(seq); } cout.flush(); //report progress - if((i+1) % 100 == 0){ m->mothurOut(toString(i+1)); m->mothurOutEndLine(); } + if((i+1) % 100 == 0){ cout << (i+1) << endl; m->mothurOutJustToLog(toString(i+1) + "\n"); } } //report progress - if((seqStrings.size()) % 100 != 0){ m->mothurOut(toString(seqStrings.size())); m->mothurOutEndLine(); } + if((num) % 100 != 0){ cout << num << endl; m->mothurOutJustToLog(toString(num) + "\n"); } return 0; } @@ -730,7 +788,7 @@ int FilterSeqsCommand::MPICreateFilter(Filters& F, string input) { exit(1); } } - +#endif /**************************************************************************************************/ int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename) { @@ -826,28 +884,4 @@ int FilterSeqsCommand::setLines(string filename) { exit(1); } } -/**************************************************************************************************/ -int FilterSeqsCommand::parseBuffer(string file, vector& seqs) { - try { - istringstream iss (file); //,istringstream::in - string name, seqstring; - - while (!iss.eof()) { - - if (m->control_pressed) { return 0; } - - Sequence seq(iss); gobble(iss); - - if (seq.getName() != "") { - seqs.push_back(seq.getAligned()); - } - } - - return 0; - } - catch(exception& e) { - m->errorOut(e, "FilterSeqsCommand", "parseBuffer"); - exit(1); - } -} /**************************************************************************************/