From cb9c5129766d46fa1a266f724c1ebd41047f3a03 Mon Sep 17 00:00:00 2001 From: westcott Date: Wed, 17 Mar 2010 14:59:45 +0000 Subject: [PATCH] testing --- distancecommand.cpp | 2 +- filterseqscommand.cpp | 419 ++++++++++++++++++++++++++++++++---------- filterseqscommand.h | 7 +- 3 files changed, 334 insertions(+), 94 deletions(-) diff --git a/distancecommand.cpp b/distancecommand.cpp index e18defe..23fe58c 100644 --- a/distancecommand.cpp +++ b/distancecommand.cpp @@ -193,7 +193,7 @@ int DistanceCommand::execute(){ //each process gets where it should start and stop in the file start = int (sqrt(float(pid)/float(processors)) * numSeqs); end = int (sqrt(float(pid+1)/float(processors)) * numSeqs); - + MPI_File outMPI; int amode=MPI_MODE_CREATE|MPI_MODE_WRONLY; diff --git a/filterseqscommand.cpp b/filterseqscommand.cpp index fa1e93e..6d894d8 100644 --- a/filterseqscommand.cpp +++ b/filterseqscommand.cpp @@ -158,7 +158,6 @@ int FilterSeqsCommand::execute() { try { if (abort == true) { return 0; } - vector outputNames; ifstream inFASTA; openInputFile(fastafileNames[0], inFASTA); @@ -171,6 +170,8 @@ int FilterSeqsCommand::execute() { filter = createFilter(); + if (m->control_pressed) { return 0; } + ofstream outFilter; string filterFile = outputDir + filterFileName + ".filter"; @@ -182,39 +183,8 @@ int FilterSeqsCommand::execute() { ////////////run filter///////////////// - numSeqs = 0; - for (int i = 0; i < fastafileNames.size(); i++) { - ifstream in; - openInputFile(fastafileNames[i], in); - string filteredFasta = outputDir + getRootName(getSimpleName(fastafileNames[i])) + "filter.fasta"; - ofstream outFASTA; - openOutputFile(filteredFasta, outFASTA); - outputNames.push_back(filteredFasta); - - - while(!in.eof()){ - if (m->control_pressed) { in.close(); outFASTA.close(); for(int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); } return 0; } - - Sequence seq(in); - if (seq.getName() != "") { - string align = seq.getAligned(); - string filterSeq = ""; - - for(int j=0;j' << seq.getName() << endl << filterSeq << endl; - numSeqs++; - } - gobble(in); - } - outFASTA.close(); - in.close(); - } - + filterSequences(); + int filteredLength = 0; for(int i=0;istart, 1, MPI_INT, j, tag, MPI_COMM_WORLD); //start position in file + MPI_Send(&bufferSizes[j], 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how bytes for the read + } + } + + //read your peice of file + char buf[bufferSizes[0]]; + MPI_File_read_at(inMPI, lines[0]->start, buf, bufferSizes[0], MPI_CHAR, &status); + istringstream iss (buf,istringstream::in); + + //do your part + driverMPIRun(iss, outMPI); + + //wait on chidren + for(int i = 1; i < processors; i++) { + char buf[4]; + MPI_Recv(buf, 4, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); + } + + }else { //you are a child process + //receive your section of file + int startPos, numLines, bufferSize; + MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); + MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); + + //read your peice of file + char buf2[bufferSize]; + MPI_File_read_at(inMPI, startPos, buf2, bufferSize, MPI_CHAR, &status); + istringstream iss (buf2,istringstream::in); + + //do your part + driverMPIRun(iss, outMPI); + + char buf[4]; + strcpy(buf, "done"); + + //tell parent you are done. + MPI_Send(buf, 4, MPI_CHAR, 0, tag, MPI_COMM_WORLD); + } + + MPI_File_close(&outMPI); + MPI_File_close(&inMPI); + +#else + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + if(processors == 1){ + ifstream inFASTA; + openInputFile(fastafileNames[s], inFASTA); + int numFastaSeqs=count(istreambuf_iterator(inFASTA),istreambuf_iterator(), '>'); + inFASTA.close(); + + lines.push_back(new linePair(0, numFastaSeqs)); + + driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]); + }else{ + setLines(fastafileNames[s]); + createProcessesRunFilter(filter, fastafileNames[s]); + + rename((fastafileNames[s] + toString(processIDS[0]) + ".temp").c_str(), filteredFasta.c_str()); + + //append fasta files + for(int i=1;icontrol_pressed) { return 1; } + #else + ifstream inFASTA; + openInputFile(fastafileNames[s], inFASTA); + int numFastaSeqs=count(istreambuf_iterator(inFASTA),istreambuf_iterator(), '>'); + inFASTA.close(); + + lines.push_back(new linePair(0, numFastaSeqs)); + + driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]); + + if (m->control_pressed) { return 1; } + #endif +#endif + outputNames.push_back(filteredFasta); + } + + return 0; + } + catch(exception& e) { + m->errorOut(e, "FilterSeqsCommand", "filterSequences"); + exit(1); + } +} +/**************************************************************************************/ +int FilterSeqsCommand::driverMPIRun(istringstream& in, MPI_File& outMPI) { + try { + string outputString = ""; + int count = 0; + MPI_Status status; + + while (!in.eof()) { + + Sequence seq(in); gobble(in); + + if (seq.getName() != "") { + string align = seq.getAligned(); + string filterSeq = ""; + + for(int j=0;jerrorOut(e, "FilterSeqsCommand", "driverRunFilter"); + exit(1); + } +} +/**************************************************************************************/ +int FilterSeqsCommand::driverRunFilter(string F, string outputFilename, string inputFilename, linePair* line) { + try { + ofstream out; + openOutputFile(outputFilename, out); + + ifstream in; + openInputFile(inputFilename, in); + + in.seekg(line->start); + + for(int i=0;inumSeqs;i++){ + + if (m->control_pressed) { in.close(); out.close(); return 0; } + + Sequence seq(in); + if (seq.getName() != "") { + string align = seq.getAligned(); + string filterSeq = ""; + + for(int j=0;j' << seq.getName() << endl << filterSeq << endl; + } + gobble(in); + } + out.close(); + in.close(); + + return 0; + } + catch(exception& e) { + m->errorOut(e, "FilterSeqsCommand", "driverRunFilter"); + exit(1); + } +} +/**************************************************************************************************/ + +int FilterSeqsCommand::createProcessesRunFilter(string F, string filename) { + try { +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + int process = 0; + int exitCommand = 1; + processIDS.clear(); + + //loop through and create all the processes you want + while (process != processors) { + int pid = fork(); + + if (pid > 0) { + processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later + process++; + }else if (pid == 0){ + string filteredFasta = filename + toString(getpid()) + ".temp"; + driverRunFilter(F, filteredFasta, filename, lines[process]); + exit(0); + }else { m->mothurOut("unable to spawn the necessary processes."); m->mothurOutEndLine(); exit(0); } + } + + //force parent to wait until all the processes are done + for (int i=0;ierrorOut(e, "FilterSeqsCommand", "createProcessesRunFilter"); + exit(1); + } +} +/**************************************************************************************/ string FilterSeqsCommand::createFilter() { try { string filterString = ""; @@ -267,8 +497,6 @@ string FilterSeqsCommand::createFilter() { if(trump != '*' || isTrue(vertical) || soft != 0){ for (int s = 0; s < fastafileNames.size(); s++) { - for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear(); - #ifdef USE_MPI int pid, rc, ierr; int Atag = 1; int Ttag = 2; int Ctag = 3; int Gtag = 4; int Gaptag = 5; @@ -278,47 +506,39 @@ string FilterSeqsCommand::createFilter() { MPI_File in; rc = MPI_Comm_size(MPI_COMM_WORLD, &processors); rc = MPI_Comm_rank(MPI_COMM_WORLD, &pid); - - char* tempFileName = new char(fastafileNames[s].length()); - tempFileName = &(fastafileNames[s][0]); - + char tempFileName[fastafileNames[s].length()]; + strcpy(tempFileName, fastafileNames[s].c_str()); + cout << pid << " tempFileName " << tempFileName << endl; MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &in); //comm, filename, mode, info, filepointer - + cout << pid << " here" << endl; if (pid == 0) { //you are the root process setLines(fastafileNames[s]); - + cout << pid << " after setlines" << endl; for (int j = 0; j < lines.size(); j++) { //each process if (j != 0) { //don't send to yourself MPI_Send(&lines[j]->start, 1, MPI_INT, j, tag, MPI_COMM_WORLD); //start position in file - MPI_Send(&lines[j]->numSeqs, 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how many sequences we are sending MPI_Send(&bufferSizes[j], 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how bytes for the read } } - + cout << pid << " done sending" << endl; char buf[bufferSizes[0]]; MPI_File_read_at(in, 0, buf, bufferSizes[0], MPI_CHAR, &status); - - MPICreateFilter(F, buf); + cout << pid << " done reading" << endl; + string tempBuf = buf; + if (tempBuf.length() > bufferSizes[0]) { tempBuf = tempBuf.substr(0, bufferSizes[0]); } + + MPICreateFilter(F, tempBuf); + cout << pid << "done with mpi create filter " << endl; + if (m->control_pressed) { MPI_File_close(&in); return filterString; } vector temp; temp.resize(alignmentLength+1); //get the frequencies from the child processes for(int i = 0; i < ((processors-1)*5); i++) { - cout << "i = " << i << endl; - //vector trial; trial.resize(10); - //cout << "trials size = " << trial.size() << endl; - //int ierr = MPI_Recv(&trial[0], 10, MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status); - int ierr = MPI_Recv(&temp[0], (alignmentLength+1), MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status); - cout << "recieved something" << endl; - //for (int g = 0; g < trial.size(); g++) { cout << trial[g] << '\t'; } cout << endl; + MPI_Recv(&temp[0], (alignmentLength+1), MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status); int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what letter count this is for - cout << "reciveve tag = " << receiveTag << endl; - for (int k = 0; k < alignmentLength; k++) { cout << k << '\t' << temp[k] << endl; } - cout << "done " << endl << endl; - - int sender = status.MPI_SOURCE; - + if (receiveTag == Atag) { //you are recieveing the A frequencies for (int k = 0; k < alignmentLength; k++) { F.a[k] += temp[k]; } }else if (receiveTag == Ttag) { //you are recieveing the T frequencies @@ -330,22 +550,25 @@ string FilterSeqsCommand::createFilter() { }else if (receiveTag == Gaptag) { //you are recieveing the gap frequencies for (int k = 0; k < alignmentLength; k++) { F.gap[k] += temp[k]; } } - - m->mothurOut("receive tag = " + toString(receiveTag) + " " + toString(sender) + " is complete."); m->mothurOutEndLine(); } }else { //i am the child process - int startPos, numLines, bufferSize; + cout << pid << endl; + int startPos, bufferSize; ierr = MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); - ierr = MPI_Recv(&numLines, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); ierr = MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); - + cout << pid << '\t' << startPos << '\t' << bufferSize << endl; //send freqs char buf2[bufferSize]; MPI_File_read_at( in, startPos, buf2, bufferSize, MPI_CHAR, &status); - - MPICreateFilter(F, buf2); + + string tempBuf = buf2; + if (tempBuf.length() > bufferSize) { tempBuf = tempBuf.substr(0, bufferSize); } + + MPICreateFilter(F, tempBuf); + cout << pid << "done with mpi create filter " << endl; + if (m->control_pressed) { MPI_File_close(&in); return filterString; } //send my fequency counts F.a.push_back(Atag); @@ -361,6 +584,7 @@ string FilterSeqsCommand::createFilter() { } MPI_Barrier(MPI_COMM_WORLD); + MPI_File_close(&in); #else #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) @@ -372,14 +596,16 @@ string FilterSeqsCommand::createFilter() { numSeqs += numFastaSeqs; + for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear(); lines.push_back(new linePair(0, numFastaSeqs)); driverCreateFilter(F, fastafileNames[s], lines[0]); }else{ - setLines(fastafileNames[s]); createProcessesCreateFilter(F, fastafileNames[s]); } + + if (m->control_pressed) { return filterString; } #else ifstream inFASTA; openInputFile(fastafileNames[s], inFASTA); @@ -388,22 +614,22 @@ string FilterSeqsCommand::createFilter() { numSeqs += numFastaSeqs; + for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear(); lines.push_back(new linePair(0, numFastaSeqs)); - driverCreateFilter(F, lines[0], fastafileNames[s]); + driverCreateFilter(F, fastafileNames[s], lines[0]); + if (m->control_pressed) { return filterString; } #endif #endif } } - - cout << "made it here, numSeqs = " << numSeqs << endl; F.setNumSeqs(numSeqs); if(isTrue(vertical) == 1) { F.doVertical(); } if(soft != 0) { F.doSoft(); } -//cout << "Filter String = " << F.getFilter() << endl; + filterString = F.getFilter(); return filterString; @@ -428,6 +654,8 @@ int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair* Sequence seq(in); if (seq.getName() != "") { + if (seq.getAligned().length() != alignmentLength) { m->mothurOut("Sequences are not all the same length, please correct."); m->mothurOutEndLine(); m->control_pressed = true; } + if(trump != '*'){ F.doTrump(seq); } if(isTrue(vertical) || soft != 0){ F.getFreqs(seq); } cout.flush(); @@ -457,7 +685,9 @@ int FilterSeqsCommand::MPICreateFilter(Filters& F, string input) { parseBuffer(input, seqStrings); for(int i=0;icontrol_pressed = true; } + if (m->control_pressed) { return 1; } Sequence seq("", seqStrings[i]); @@ -488,7 +718,7 @@ int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename) #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) int process = 0; int exitCommand = 1; - vector processIDS; + processIDS.clear(); //loop through and create all the processes you want while (process != processors) { @@ -521,31 +751,37 @@ int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename) int FilterSeqsCommand::setLines(string filename) { try { - vector positions; - map buf; + for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear(); + vector positions; bufferSizes.clear(); - int pid; - MPI_Comm_rank(MPI_COMM_WORLD, &pid); - ifstream inFASTA; openInputFile(filename, inFASTA); string input; - int numbuf = 0; while(!inFASTA.eof()){ input = getline(inFASTA); if (input.length() != 0) { - numbuf += input.length(); - if(input[0] == '>'){ long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1); buf[(pos - input.length() - 1)] = numbuf; } + if(input[0] == '>'){ long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1); } } } - inFASTA.close(); int numFastaSeqs = positions.size(); - + + FILE * pFile; + long size; + + //get num bytes in file + pFile = fopen (filename.c_str(),"rb"); + if (pFile==NULL) perror ("Error opening file"); + else{ + fseek (pFile, 0, SEEK_END); + size=ftell (pFile); + fclose (pFile); + } + numSeqs += numFastaSeqs; int numSeqsPerProcessor = numFastaSeqs / processors; @@ -555,10 +791,10 @@ int FilterSeqsCommand::setLines(string filename) { long int startPos = positions[ i * numSeqsPerProcessor ]; if(i == processors - 1){ numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; - bufferSizes.push_back(numbuf-buf[startPos]); + bufferSizes.push_back(size - startPos); }else{ - int myEnd = positions[ (i+1) * numSeqsPerProcessor ]; - bufferSizes.push_back(buf[myEnd]-buf[startPos]); + long int myEnd = positions[ (i+1) * numSeqsPerProcessor ]; + bufferSizes.push_back(myEnd-startPos); } lines.push_back(new linePair(startPos, numSeqsPerProcessor)); } @@ -572,17 +808,16 @@ int FilterSeqsCommand::setLines(string filename) { } /**************************************************************************************************/ int FilterSeqsCommand::parseBuffer(string file, vector& seqs) { - try { - - istringstream iss (file,istringstream::in); + try { + istringstream iss (file); //,istringstream::in string name, seqstring; - - while (iss) { + + while (!iss.eof()) { if (m->control_pressed) { return 0; } Sequence seq(iss); gobble(iss); - + if (seq.getName() != "") { seqs.push_back(seq.getAligned()); } diff --git a/filterseqscommand.h b/filterseqscommand.h index 3cc007c..2f318c0 100644 --- a/filterseqscommand.h +++ b/filterseqscommand.h @@ -30,12 +30,13 @@ private: linePair(long int i, int j) : start(i), numSeqs(j) {} }; vector lines; - + vector processIDS; string vertical, filter, fasta, hard, outputDir, filterFileName; vector fastafileNames; int alignmentLength, processors; vector bufferSizes; + vector outputNames; char trump; bool abort; @@ -43,8 +44,12 @@ private: int numSeqs; string createFilter(); + int filterSequences(); int createProcessesCreateFilter(Filters&, string); + int createProcessesRunFilter(string, string); int driverCreateFilter(Filters&, string, linePair*); + int driverRunFilter(string, string, string, linePair*); + int driverMPIRun(istringstream&, MPI_File&); int MPICreateFilter(Filters&, string); int setLines(string); int parseBuffer(string, vector&); -- 2.39.2