From 30f2d98fffb579b870c8969ddcc1dfc61ccbb70a Mon Sep 17 00:00:00 2001 From: westcott Date: Tue, 16 Mar 2010 13:09:20 +0000 Subject: [PATCH] modified mpi code to save ram by writing out every 10 seqs. --- distancecommand.cpp | 182 ++++++++++++++++++++++++++++++++---------- distancecommand.h | 3 +- filterseqscommand.cpp | 164 ++++++++----------------------------- filterseqscommand.h | 2 +- mothur.cpp | 5 +- sequence.cpp | 22 ++--- 6 files changed, 187 insertions(+), 191 deletions(-) diff --git a/distancecommand.cpp b/distancecommand.cpp index 23f2834..e18defe 100644 --- a/distancecommand.cpp +++ b/distancecommand.cpp @@ -194,46 +194,42 @@ int DistanceCommand::execute(){ start = int (sqrt(float(pid)/float(processors)) * numSeqs); end = int (sqrt(float(pid+1)/float(processors)) * numSeqs); + MPI_File outMPI; + int amode=MPI_MODE_CREATE|MPI_MODE_WRONLY; + + char filename[outputFile.length()]; + strcpy(filename, outputFile.c_str()); + + MPI_File_open(MPI_COMM_WORLD, filename, amode, MPI_INFO_NULL, &outMPI); + if (pid == 0) { //you are the root process + //do your part string outputMyPart; - driverMPI(start, end, outputMyPart, cutoff); + driverMPI(start, end, outMPI, cutoff); - ofstream out; - openOutputFile(outputFile, out); - - out << outputMyPart; - - //get the childrens parts + //wait on chidren for(int i = 1; i < processors; i++) { - int length; - MPI_Recv(&length, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); - - char buf[length]; - - MPI_Recv(buf, length, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); - - outputMyPart = buf; - out << outputMyPart; + char buf[4]; + MPI_Recv(buf, 4, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); } - out.close(); + if (output == "lt") { + convertToLowerTriangle(outputFile); + } }else { //you are a child process //do your part - string outputMyPart; - driverMPI(start, end, outputMyPart, cutoff); + driverMPI(start, end, outMPI, cutoff); - //send results to parent - int length = outputMyPart.length(); - char buf[length]; - strcpy(buf, outputMyPart.c_str()); + char buf[4]; + strcpy(buf, "done"); - MPI_Send( &length, 1, MPI_INT, 0, tag, MPI_COMM_WORLD); - MPI_Send(buf, length, MPI_CHAR, 0, tag, MPI_COMM_WORLD); + //tell parent you are done. + MPI_Send(buf, 4, MPI_CHAR, 0, tag, MPI_COMM_WORLD); } - + MPI_File_close(&outMPI); #else #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) @@ -391,24 +387,15 @@ int DistanceCommand::driver(int startLine, int endLine, string dFileName, float } /**************************************************************************************************/ /////// need to fix to work with calcs and sequencedb -int DistanceCommand::driverMPI(int startLine, int endLine, string& outputString, float cutoff){ +int DistanceCommand::driverMPI(int startLine, int endLine, MPI_File& outMPI, float cutoff){ try { - + MPI_Status status; int startTime = time(NULL); - outputString = ""; - - if((output == "lt") && startLine == 0){ outputString += (toString(alignDB.getNumSeqs()) + '\n'); } + string outputString = ""; for(int i=startLine;icontrol_pressed) { return 0; } @@ -419,23 +406,40 @@ int DistanceCommand::driverMPI(int startLine, int endLine, string& outputString, if(dist <= cutoff){ if (output == "column") { outputString += (alignDB.get(i).getName() + ' ' + alignDB.get(j).getName() + ' ' + toString(dist) + '\n'); } } - if (output == "lt") { outputString += (toString(dist) + '\t'); } - if (output == "square") { //make a square column you can convert to square phylip + if ((output == "square") || (output == "lt")){ //make a square column you can convert to square phylip outputString += (alignDB.get(i).getName() + ' ' + alignDB.get(j).getName() + ' ' + toString(dist) + '\n'); outputString += (alignDB.get(j).getName() + ' ' + alignDB.get(i).getName() + ' ' + toString(dist) + '\n'); } } - if (output == "lt") { outputString += '\n'; } - if(i % 100 == 0){ m->mothurOut(toString(i) + "\t" + toString(time(NULL) - startTime)); m->mothurOutEndLine(); } + if(i % 10 == 0){ //output to file + //send results to parent + int length = outputString.length(); + char buf[length]; + strcpy(buf, outputString.c_str()); + + MPI_File_write_shared(outMPI, buf, length, MPI_CHAR, &status); + outputString = ""; + } + } + m->mothurOut(toString(endLine-1) + "\t" + toString(time(NULL) - startTime)); m->mothurOutEndLine(); + if(outputString != ""){ //output to file + //send results to parent + int length = outputString.length(); + char buf[length]; + strcpy(buf, outputString.c_str()); + + MPI_File_write_shared(outMPI, buf, length, MPI_CHAR, &status); + outputString = ""; + } return 1; } @@ -533,6 +537,100 @@ int DistanceCommand::convertMatrix(string outputFile) { exit(1); } } +/**************************************************************************************************/ +int DistanceCommand::convertToLowerTriangle(string outputFile) { + try{ + + //sort file by first column so the distances for each row are together + string outfile = getRootName(outputFile) + "sorted.dist.temp"; + + //use the unix sort + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + string command = "sort -n " + outputFile + " -o " + outfile; + system(command.c_str()); + #else //sort using windows sort + string command = "sort " + outputFile + " /O " + outfile; + system(command.c_str()); + #endif + + + //output to new file distance for each row and save positions in file where new row begins + ifstream in; + openInputFile(outfile, in); + + ofstream out; + openOutputFile(outputFile, out); + + out.setf(ios::fixed, ios::floatfield); out.setf(ios::showpoint); + + out << alignDB.getNumSeqs() << endl; + + //get first currentRow + string first, currentRow, second; + float dist; + int i, j; + i = 0; j = 0; + map rowDists; //take advantage of the fact that maps are already sorted by key + map::iterator it; + + in >> first; + currentRow = first; + + rowDists[first] = 0.00; //distance to yourself is 0.0 + + in.seekg(0); + //openInputFile(outfile, in); + + while(!in.eof()) { + if (m->control_pressed) { in.close(); remove(outfile.c_str()); out.close(); return 0; } + + in >> first >> second >> dist; gobble(in); + + if (first != currentRow) { + //print out last row + out << currentRow << '\t'; //print name + + //print dists + for (it = rowDists.begin(); it != rowDists.end(); it++) { + if (j >= i) { break; } + out << it->second << '\t'; + j++; + } + out << endl; + + //start new row + currentRow = first; + rowDists.clear(); + rowDists[first] = 0.00; + rowDists[second] = dist; + j = 0; + i++; + }else{ + rowDists[second] = dist; + } + } + //print out last row + out << currentRow << '\t'; //print name + + //print dists + for (it = rowDists.begin(); it != rowDists.end(); it++) { + out << it->second << '\t'; + } + out << endl; + + in.close(); + out.close(); + + remove(outfile.c_str()); + + return 1; + + } + catch(exception& e) { + m->errorOut(e, "DistanceCommand", "convertToLowerTriangle"); + exit(1); + } +} /************************************************************************************************** void DistanceCommand::appendFiles(string temp, string filename) { try{ diff --git a/distancecommand.h b/distancecommand.h index 3fa69d5..b0a3c04 100644 --- a/distancecommand.h +++ b/distancecommand.h @@ -46,9 +46,10 @@ private: //void appendFiles(string, string); void createProcesses(string); int driver(/*Dist*, SequenceDB, */int, int, string, float); - int driverMPI(int, int, string&, float); + int driverMPI(int, int, MPI_File&, float); int convertMatrix(string); + int convertToLowerTriangle(string); }; diff --git a/filterseqscommand.cpp b/filterseqscommand.cpp index aa38e7a..fa1e93e 100644 --- a/filterseqscommand.cpp +++ b/filterseqscommand.cpp @@ -295,67 +295,27 @@ string FilterSeqsCommand::createFilter() { MPI_Send(&bufferSizes[j], 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how bytes for the read } } - //cout << "done sending" << endl; - //cout << "parent = " << pid << " lines = " << lines[pid]->start << '\t' << lines[pid]->numSeqs << " size = " << lines.size() << endl; - - cout << "parent = " << pid << " address of Filter " << &F << " address of FilterString " << &filterString << " address of numSeqs = " << &numSeqs << " address of soft = " << &soft << endl; - char* buf = new char(bufferSizes[0]); - //cout << pid << '\t' << bufferSizes[0] << " line 1 start pos = " << lines[1]->start << " buffer size 0 " << bufferSizes[0] << " buffer size 1 " << bufferSizes[1] << endl; + char buf[bufferSizes[0]]; MPI_File_read_at(in, 0, buf, bufferSizes[0], MPI_CHAR, &status); - - cout << pid << " done reading " << &buf << endl; - string tempBuf = buf; - delete buf; - //cout << pid << '\t' << (tempBuf.substr(0, 10)) << endl; - - //parse buffer - istringstream iss (tempBuf,istringstream::in); - string name, seqstring; - vector seqs; - - while (iss) { - if (m->control_pressed) { return filterString; } - cout << "here" << endl; - Sequence seq(iss); - cout << "here1" << endl; - gobble(iss); - cout << seq.getName() << endl; - if (seq.getName() != "") { - seqs.push_back(seq.getAligned()); - } - - } - - for(int i=0;icontrol_pressed) { return filterString; } - - Sequence seq("", seqs[i]); - - if(trump != '*'){ F.doTrump(seq); } - if(isTrue(vertical) || soft != 0){ F.getFreqs(seq); } - cout.flush(); - - //report progress - if((i+1) % 100 == 0){ m->mothurOut(toString(i+1)); m->mothurOutEndLine(); } - } - - //report progress - if((seqs.size()) % 100 != 0){ m->mothurOut(toString(seqs.size())); m->mothurOutEndLine(); } - - //do your part - //MPICreateFilter(F, seqs); - - vector temp; temp.resize(alignmentLength); + MPICreateFilter(F, buf); + + vector temp; temp.resize(alignmentLength+1); //get the frequencies from the child processes for(int i = 0; i < ((processors-1)*5); i++) { cout << "i = " << i << endl; - int ierr = MPI_Recv(&temp, alignmentLength, MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status); - + //vector trial; trial.resize(10); + //cout << "trials size = " << trial.size() << endl; + //int ierr = MPI_Recv(&trial[0], 10, MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status); + int ierr = MPI_Recv(&temp[0], (alignmentLength+1), MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status); + cout << "recieved something" << endl; + //for (int g = 0; g < trial.size(); g++) { cout << trial[g] << '\t'; } cout << endl; int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what letter count this is for + cout << "reciveve tag = " << receiveTag << endl; + for (int k = 0; k < alignmentLength; k++) { cout << k << '\t' << temp[k] << endl; } + cout << "done " << endl << endl; int sender = status.MPI_SOURCE; @@ -377,72 +337,27 @@ string FilterSeqsCommand::createFilter() { }else { //i am the child process int startPos, numLines, bufferSize; - cout << "child = " << pid << " address of Filter " << &F << " address of FilterString " << &filterString << " address of numSeqs = " << &numSeqs << " address of soft = " << &soft<< endl; ierr = MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); ierr = MPI_Recv(&numLines, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); ierr = MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); - //cout << "child = " << pid << " done recv messages startpos = " << startPos << " numLines = " << numLines << " buffersize = " << bufferSize << endl; - - + //send freqs - char* buf2 = new char(bufferSize); + char buf2[bufferSize]; MPI_File_read_at( in, startPos, buf2, bufferSize, MPI_CHAR, &status); - cout << pid << " done reading " << &buf2 << endl; - - string tempBuf = buf2; - delete buf2; - // cout << pid << '\t' << (tempBuf.substr(0, 10)) << endl; - istringstream iss (tempBuf,istringstream::in); - - string name, seqstring; - vector seqs; - - while (iss) { - - if (m->control_pressed) { return filterString; } - cout << "here" << endl; - Sequence seq(iss); - cout << "here1" << endl; - gobble(iss); - cout << seq.getName() << endl; - - if (seq.getName() != "") { - seqs.push_back(seq.getAligned()); - } - } - - for(int i=0;icontrol_pressed) { return filterString; } - - Sequence seq("", seqs[i]); - - if(trump != '*'){ F.doTrump(seq); } - if(isTrue(vertical) || soft != 0){ F.getFreqs(seq); } - cout.flush(); - //report progress - if((i+1) % 100 == 0){ m->mothurOut(toString(i+1)); m->mothurOutEndLine(); } - } - - //report progress - if((seqs.size()) % 100 != 0){ m->mothurOut(toString(seqs.size())); m->mothurOutEndLine(); } - - //MPICreateFilter(F, seqs); - + MPICreateFilter(F, buf2); + //send my fequency counts F.a.push_back(Atag); - int ierr = MPI_Send( &F.a[0], alignmentLength, MPI_INT, 0, tag, MPI_COMM_WORLD); + int ierr = MPI_Send(&(F.a[0]), (alignmentLength+1), MPI_INT, 0, tag, MPI_COMM_WORLD); F.t.push_back(Ttag); - ierr = MPI_Send( &F.t[0], alignmentLength, MPI_INT, 0, tag, MPI_COMM_WORLD); + ierr = MPI_Send (&(F.t[0]), (alignmentLength+1), MPI_INT, 0, tag, MPI_COMM_WORLD); F.c.push_back(Ctag); - ierr = MPI_Send( &F.c[0], alignmentLength, MPI_INT, 0, tag, MPI_COMM_WORLD); + ierr = MPI_Send(&(F.c[0]), (alignmentLength+1), MPI_INT, 0, tag, MPI_COMM_WORLD); F.g.push_back(Gtag); - ierr = MPI_Send( &F.g[0], alignmentLength, MPI_INT, 0, tag, MPI_COMM_WORLD); + ierr = MPI_Send(&(F.g[0]), (alignmentLength+1), MPI_INT, 0, tag, MPI_COMM_WORLD); F.gap.push_back(Gaptag); - ierr = MPI_Send( &F.gap[0], alignmentLength, MPI_INT, 0, tag, MPI_COMM_WORLD); - - cout << "child " << pid << " done sending counts" << endl; + ierr = MPI_Send(&(F.gap[0]), (alignmentLength+1), MPI_INT, 0, tag, MPI_COMM_WORLD); } MPI_Barrier(MPI_COMM_WORLD); @@ -482,16 +397,8 @@ string FilterSeqsCommand::createFilter() { } } -#ifdef USE_MPI - -//merge all frequency data and create filter string - //int pid; - //MPI_Comm_rank(MPI_COMM_WORLD, &pid); - - //if (pid == 0) { //only one process should output to screen -#endif - cout << "made it here" << endl; + cout << "made it here, numSeqs = " << numSeqs << endl; F.setNumSeqs(numSeqs); if(isTrue(vertical) == 1) { F.doVertical(); } @@ -543,14 +450,17 @@ int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair* } } /**************************************************************************************/ -int FilterSeqsCommand::MPICreateFilter(Filters& F, vector& seqStrings) { +int FilterSeqsCommand::MPICreateFilter(Filters& F, string input) { try { + vector seqStrings; + parseBuffer(input, seqStrings); + for(int i=0;icontrol_pressed) { return 1; } - Sequence seq("", seqStrings[0]); + Sequence seq("", seqStrings[i]); if(trump != '*'){ F.doTrump(seq); } if(isTrue(vertical) || soft != 0){ F.getFreqs(seq); } @@ -666,24 +576,16 @@ int FilterSeqsCommand::parseBuffer(string file, vector& seqs) { istringstream iss (file,istringstream::in); string name, seqstring; -int pid; -MPI_Comm_rank(MPI_COMM_WORLD, &pid); - Sequence* seq34 = new Sequence(); -cout << "address of new sequence " << pid << '\t' << seq34 << endl; -cout << "address of seqStrings " << pid << '\t' << &seqs << endl; while (iss) { if (m->control_pressed) { return 0; } - cout << "here" << endl; - Sequence* seq = new Sequence(iss); - cout << "here1" << endl; - gobble(iss); - cout << seq->getName() << endl; - if (seq->getName() != "") { - seqs.push_back(seq->getAligned()); + + Sequence seq(iss); gobble(iss); + + if (seq.getName() != "") { + seqs.push_back(seq.getAligned()); } - delete seq; } return 0; diff --git a/filterseqscommand.h b/filterseqscommand.h index 5eb49d3..3cc007c 100644 --- a/filterseqscommand.h +++ b/filterseqscommand.h @@ -45,7 +45,7 @@ private: string createFilter(); int createProcessesCreateFilter(Filters&, string); int driverCreateFilter(Filters&, string, linePair*); - int MPICreateFilter(Filters&, vector&); + int MPICreateFilter(Filters&, string); int setLines(string); int parseBuffer(string, vector&); diff --git a/mothur.cpp b/mothur.cpp index 75d03e1..6236012 100644 --- a/mothur.cpp +++ b/mothur.cpp @@ -107,7 +107,10 @@ int main(int argc, char *argv[]){ m->mothurOutEndLine(); #ifdef USE_MPI - m->mothurOutJustToLog("Using MPI\n"); + m->mothurOutJustToLog("Using MPI\tversion "); + int version, subversion; + MPI_Get_version(&version, &subversion); + m->mothurOutJustToLog(toString(version) + "." + toString(subversion) + "\n"); #endif //srand(54321); diff --git a/sequence.cpp b/sequence.cpp index 3bd80a8..4b56675 100644 --- a/sequence.cpp +++ b/sequence.cpp @@ -38,17 +38,12 @@ Sequence::Sequence(string newName, string sequence) { Sequence::Sequence(istringstream& fastaString){ try { m = MothurOut::getInstance(); - int pid; - MPI_Comm_rank(MPI_COMM_WORLD, &pid); - cout << pid << " after mothur instance " << &name << endl; + initialize(); - cout << "after mothur initialize" << endl; fastaString >> name; - cout << pid << " after name " << name << endl; name = name.substr(1); - string sequence; -cout << pid << " name = " << name << endl; + //read comments while ((name[0] == '#') && fastaString) { while (!fastaString.eof()) { char c = fastaString.get(); if (c == 10 || c == 13){ break; } } // get rest of line if there's any crap there @@ -61,13 +56,11 @@ cout << pid << " name = " << name << endl; name = ""; break; } - cout << pid << "in while comment" << endl; } - cout << pid << "after mothur comment" << endl; - while (!fastaString.eof()) { char c = fastaString.get(); cout << pid << " char = " << int(c) << endl; if (c == 10 || c == 13){ break; } } // get rest of line if there's any crap there - cout << pid << " after mothur name" << endl; + + while (!fastaString.eof()) { char c = fastaString.get(); if (c == 10 || c == 13){ break; } } // get rest of line if there's any crap there + sequence = getSequenceString(fastaString); - cout << pid << " after mothur sequence" << endl; setAligned(sequence); //setUnaligned removes any gap characters for us setUnaligned(sequence); @@ -169,11 +162,10 @@ string Sequence::getSequenceString(istringstream& fastaFile) { try { char letter; string sequence = ""; -int pid; -MPI_Comm_rank(MPI_COMM_WORLD, &pid); + while(!fastaFile.eof()){ letter= fastaFile.get(); - cout << pid << '\t' << letter << endl; + if(letter == '>'){ fastaFile.putback(letter); break; -- 2.39.2