From: westcott Date: Wed, 17 Mar 2010 17:47:33 +0000 (+0000) Subject: finished mpi for filter.seqs X-Git-Url: https://git.donarmstrong.com/?p=mothur.git;a=commitdiff_plain;h=aba5f8811829037b0a3004ef33f0ad4ed5e5fcf8 finished mpi for filter.seqs --- diff --git a/distancecommand.cpp b/distancecommand.cpp index 23fe58c..4720df3 100644 --- a/distancecommand.cpp +++ b/distancecommand.cpp @@ -385,6 +385,7 @@ int DistanceCommand::driver(int startLine, int endLine, string dFileName, float exit(1); } } +#ifdef USE_MPI /**************************************************************************************************/ /////// need to fix to work with calcs and sequencedb int DistanceCommand::driverMPI(int startLine, int endLine, MPI_File& outMPI, float cutoff){ @@ -444,11 +445,11 @@ int DistanceCommand::driverMPI(int startLine, int endLine, MPI_File& outMPI, flo return 1; } catch(exception& e) { - m->errorOut(e, "DistanceCommand", "driver"); + m->errorOut(e, "DistanceCommand", "driverMPI"); exit(1); } } - +#endif /**************************************************************************************************/ int DistanceCommand::convertMatrix(string outputFile) { try{ diff --git a/distancecommand.h b/distancecommand.h index b0a3c04..c1dac14 100644 --- a/distancecommand.h +++ b/distancecommand.h @@ -46,7 +46,10 @@ private: //void appendFiles(string, string); void createProcesses(string); int driver(/*Dist*, SequenceDB, */int, int, string, float); + + #ifdef USE_MPI int driverMPI(int, int, MPI_File&, float); + #endif int convertMatrix(string); int convertToLowerTriangle(string); diff --git a/filterseqscommand.cpp b/filterseqscommand.cpp index 6d894d8..b402a5b 100644 --- a/filterseqscommand.cpp +++ b/filterseqscommand.cpp @@ -172,6 +172,13 @@ int FilterSeqsCommand::execute() { if (m->control_pressed) { return 0; } + #ifdef USE_MPI + int pid; + MPI_Comm_rank(MPI_COMM_WORLD, &pid); + + if (pid == 0) { //only one process should output the filter + #endif + ofstream outFilter; string filterFile = outputDir + filterFileName + ".filter"; @@ -180,6 +187,9 @@ int FilterSeqsCommand::execute() { outFilter.close(); outputNames.push_back(filterFile); + #ifdef USE_MPI + } + #endif ////////////run filter///////////////// @@ -216,7 +226,9 @@ int FilterSeqsCommand::execute() { /**************************************************************************************/ int FilterSeqsCommand::filterSequences() { try { - + + numSeqs = 0; + for (int s = 0; s < fastafileNames.size(); s++) { for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear(); @@ -247,11 +259,15 @@ int FilterSeqsCommand::filterSequences() { if (pid == 0) { //you are the root process setLines(fastafileNames[s]); - + + char bufF[alignmentLength]; + strcpy(bufF, filter.c_str()); + for (int j = 0; j < lines.size(); j++) { //each process if (j != 0) { //don't send to yourself MPI_Send(&lines[j]->start, 1, MPI_INT, j, tag, MPI_COMM_WORLD); //start position in file MPI_Send(&bufferSizes[j], 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how bytes for the read + MPI_Send(bufF, alignmentLength, MPI_CHAR, j, tag, MPI_COMM_WORLD); } } @@ -271,10 +287,14 @@ int FilterSeqsCommand::filterSequences() { }else { //you are a child process //receive your section of file - int startPos, numLines, bufferSize; + int startPos, bufferSize; + char bufF[alignmentLength]; MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); - + MPI_Recv(bufF, alignmentLength, MPI_CHAR, 0, tag, MPI_COMM_WORLD, &status); + + filter = bufF; //filter was made by process 0 so other processes need to get it + //read your peice of file char buf2[bufferSize]; MPI_File_read_at(inMPI, startPos, buf2, bufferSize, MPI_CHAR, &status); @@ -305,14 +325,14 @@ int FilterSeqsCommand::filterSequences() { driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]); }else{ - setLines(fastafileNames[s]); + setLines(fastafileNames[s]); createProcessesRunFilter(filter, fastafileNames[s]); - + rename((fastafileNames[s] + toString(processIDS[0]) + ".temp").c_str(), filteredFasta.c_str()); //append fasta files for(int i=1;istart); - for(int i=0;inumSeqs;i++){ + for(int i=0;inum;i++){ if (m->control_pressed) { in.close(); out.close(); return 0; } @@ -476,8 +496,7 @@ int FilterSeqsCommand::createProcessesRunFilter(string F, string filename) { /**************************************************************************************/ string FilterSeqsCommand::createFilter() { try { - string filterString = ""; - + string filterString = ""; Filters F; if (soft != 0) { F.setSoft(soft); } @@ -493,44 +512,46 @@ string FilterSeqsCommand::createFilter() { else { F.setFilter(string(alignmentLength, '1')); } numSeqs = 0; - if(trump != '*' || isTrue(vertical) || soft != 0){ for (int s = 0; s < fastafileNames.size(); s++) { + for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear(); + #ifdef USE_MPI - int pid, rc, ierr; + int pid; int Atag = 1; int Ttag = 2; int Ctag = 3; int Gtag = 4; int Gaptag = 5; int tag = 2001; MPI_Status status; - MPI_File in; - rc = MPI_Comm_size(MPI_COMM_WORLD, &processors); - rc = MPI_Comm_rank(MPI_COMM_WORLD, &pid); + MPI_File inMPI; + MPI_Comm_size(MPI_COMM_WORLD, &processors); + MPI_Comm_rank(MPI_COMM_WORLD, &pid); - char tempFileName[fastafileNames[s].length()]; - strcpy(tempFileName, fastafileNames[s].c_str()); - cout << pid << " tempFileName " << tempFileName << endl; - MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &in); //comm, filename, mode, info, filepointer - cout << pid << " here" << endl; + char* tempFileName = new char(fastafileNames[s].length()); + tempFileName = &(fastafileNames[s][0]); + + MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &inMPI); //comm, filename, mode, info, filepointer + if (pid == 0) { //you are the root process setLines(fastafileNames[s]); - cout << pid << " after setlines" << endl; + for (int j = 0; j < lines.size(); j++) { //each process if (j != 0) { //don't send to yourself MPI_Send(&lines[j]->start, 1, MPI_INT, j, tag, MPI_COMM_WORLD); //start position in file + MPI_Send(&numSeqs, 1, MPI_INT, j, tag, MPI_COMM_WORLD); MPI_Send(&bufferSizes[j], 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how bytes for the read } } - cout << pid << " done sending" << endl; + char buf[bufferSizes[0]]; - MPI_File_read_at(in, 0, buf, bufferSizes[0], MPI_CHAR, &status); - cout << pid << " done reading" << endl; + MPI_File_read_at(inMPI, 0, buf, bufferSizes[0], MPI_CHAR, &status); + string tempBuf = buf; if (tempBuf.length() > bufferSizes[0]) { tempBuf = tempBuf.substr(0, bufferSizes[0]); } MPICreateFilter(F, tempBuf); - cout << pid << "done with mpi create filter " << endl; - if (m->control_pressed) { MPI_File_close(&in); return filterString; } + + if (m->control_pressed) { MPI_File_close(&inMPI); return filterString; } vector temp; temp.resize(alignmentLength+1); @@ -538,7 +559,7 @@ string FilterSeqsCommand::createFilter() { for(int i = 0; i < ((processors-1)*5); i++) { MPI_Recv(&temp[0], (alignmentLength+1), MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status); int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what letter count this is for - + if (receiveTag == Atag) { //you are recieveing the A frequencies for (int k = 0; k < alignmentLength; k++) { F.a[k] += temp[k]; } }else if (receiveTag == Ttag) { //you are recieveing the T frequencies @@ -554,21 +575,22 @@ string FilterSeqsCommand::createFilter() { }else { //i am the child process - cout << pid << endl; + int startPos, bufferSize; - ierr = MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); - ierr = MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); - cout << pid << '\t' << startPos << '\t' << bufferSize << endl; + MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); + MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); + MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); + //send freqs char buf2[bufferSize]; - MPI_File_read_at( in, startPos, buf2, bufferSize, MPI_CHAR, &status); + MPI_File_read_at(inMPI, startPos, buf2, bufferSize, MPI_CHAR, &status); string tempBuf = buf2; if (tempBuf.length() > bufferSize) { tempBuf = tempBuf.substr(0, bufferSize); } MPICreateFilter(F, tempBuf); - cout << pid << "done with mpi create filter " << endl; - if (m->control_pressed) { MPI_File_close(&in); return filterString; } + + if (m->control_pressed) { MPI_File_close(&inMPI); return filterString; } //send my fequency counts F.a.push_back(Atag); @@ -584,7 +606,7 @@ string FilterSeqsCommand::createFilter() { } MPI_Barrier(MPI_COMM_WORLD); - MPI_File_close(&in); + MPI_File_close(&inMPI); #else #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) @@ -596,7 +618,6 @@ string FilterSeqsCommand::createFilter() { numSeqs += numFastaSeqs; - for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear(); lines.push_back(new linePair(0, numFastaSeqs)); driverCreateFilter(F, fastafileNames[s], lines[0]); @@ -614,7 +635,6 @@ string FilterSeqsCommand::createFilter() { numSeqs += numFastaSeqs; - for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear(); lines.push_back(new linePair(0, numFastaSeqs)); driverCreateFilter(F, fastafileNames[s], lines[0]); @@ -648,7 +668,7 @@ int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair* in.seekg(line->start); - for(int i=0;inumSeqs;i++){ + for(int i=0;inum;i++){ if (m->control_pressed) { in.close(); return 1; } @@ -666,7 +686,7 @@ int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair* } //report progress - if((line->numSeqs) % 100 != 0){ m->mothurOut(toString(line->numSeqs)); m->mothurOutEndLine(); } + if((line->num) % 100 != 0){ m->mothurOut(toString(line->num)); m->mothurOutEndLine(); } in.close(); @@ -751,7 +771,7 @@ int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename) int FilterSeqsCommand::setLines(string filename) { try { - for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear(); + vector positions; bufferSizes.clear(); diff --git a/filterseqscommand.h b/filterseqscommand.h index 2f318c0..1d2526f 100644 --- a/filterseqscommand.h +++ b/filterseqscommand.h @@ -26,8 +26,8 @@ public: private: struct linePair { int start; - int numSeqs; - linePair(long int i, int j) : start(i), numSeqs(j) {} + int num; + linePair(long int i, long int j) : start(i), num(j) {} }; vector lines; vector processIDS;