X-Git-Url: https://git.donarmstrong.com/?a=blobdiff_plain;f=screenseqscommand.cpp;h=9494865c4f66e302182b8fd221e0b44951cb2518;hb=5d00ef3e809832f08efdd691a9eace8ac20feb07;hp=41067396c6f5333faeaf3346afd9b865ad351531;hpb=755185afe1c287b8c6eddf9eedd293a38fc9f998;p=mothur.git diff --git a/screenseqscommand.cpp b/screenseqscommand.cpp index 4106739..9494865 100644 --- a/screenseqscommand.cpp +++ b/screenseqscommand.cpp @@ -8,7 +8,7 @@ */ #include "screenseqscommand.h" -#include "sequence.hpp" + //********************************************************************************************************************** vector ScreenSeqsCommand::setParameters(){ @@ -289,16 +289,25 @@ int ScreenSeqsCommand::execute(){ } else { #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) - positions = m->divideFile(fastafile, processors); - for (int i = 0; i < (positions.size()-1); i++) { - lines.push_back(new linePair(positions[i], positions[(i+1)])); - } + positions = m->divideFile(fastafile, processors); + for (int i = 0; i < (positions.size()-1); i++) { lines.push_back(linePair(positions[i], positions[(i+1)])); } #else - positions.push_back(0); positions.push_back(1000); - lines.push_back(new linePair(0, 1000)); + if(processors == 1){ lines.push_back(linePair(0, 1000)); } + else { + int numFastaSeqs = 0; + positions = m->setFilePosFasta(fastafile, numFastaSeqs); + + //figure out how many sequences you have to process + int numSeqsPerProcessor = numFastaSeqs / processors; + for (int i = 0; i < processors; i++) { + int startIndex = i * numSeqsPerProcessor; + if(i == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; } + lines.push_back(linePair(positions[startIndex], numSeqsPerProcessor)); + } + } #endif } - + string goodSeqFile = outputDir + m->getRootName(m->getSimpleName(fastafile)) + "good" + m->getExtension(fastafile); string badAccnosFile = outputDir + m->getRootName(m->getSimpleName(fastafile)) + "bad.accnos"; @@ -351,29 +360,16 @@ int ScreenSeqsCommand::execute(){ numSeqsPerProcessor = numFastaSeqs / processors; int startIndex = pid * numSeqsPerProcessor; if(pid == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - pid * numSeqsPerProcessor; } - // cout << pid << '\t' << numSeqsPerProcessor << '\t' << startIndex << endl; + //align your part driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPIGood, outMPIBadAccnos, MPIPos, badSeqNames); - //cout << pid << " done" << endl; + if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPIGood); MPI_File_close(&outMPIBadAccnos); return 0; } for (int i = 1; i < processors; i++) { - //get bad lists int badSize; MPI_Recv(&badSize, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); - /*for (int j = 0; j < badSize; j++) { - int length; - MPI_Recv(&length, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); //recv the length of the name - char* buf2 = new char[length]; //make space to recieve it - MPI_Recv(buf2, length, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); //get name - - string tempBuf = buf2; - if (tempBuf.length() > length) { tempBuf = tempBuf.substr(0, length); } - delete buf2; - - badSeqNames.insert(tempBuf); - }*/ } }else{ //you are a child process MPI_Recv(&numFastaSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); @@ -384,27 +380,15 @@ int ScreenSeqsCommand::execute(){ numSeqsPerProcessor = numFastaSeqs / processors; int startIndex = pid * numSeqsPerProcessor; if(pid == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - pid * numSeqsPerProcessor; } - //cout << pid << '\t' << numSeqsPerProcessor << '\t' << startIndex << endl; + //align your part driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPIGood, outMPIBadAccnos, MPIPos, badSeqNames); -//cout << pid << " done" << endl; + if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPIGood); MPI_File_close(&outMPIBadAccnos); return 0; } //send bad list int badSize = badSeqNames.size(); MPI_Send(&badSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD); - - /* - set::iterator it; - for (it = badSeqNames.begin(); it != badSeqNames.end(); it++) { - string name = *it; - int length = name.length(); - char* buf2 = new char[length]; - memcpy(buf2, name.c_str(), length); - - MPI_Send(&length, 1, MPI_INT, 0, tag, MPI_COMM_WORLD); - MPI_Send(buf2, length, MPI_CHAR, 0, tag, MPI_COMM_WORLD); - }*/ } //close files @@ -415,52 +399,16 @@ int ScreenSeqsCommand::execute(){ #else - #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + //#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) if(processors == 1){ - numFastaSeqs = driver(lines[0], goodSeqFile, badAccnosFile, fastafile, badSeqNames); - - if (m->control_pressed) { m->mothurRemove(goodSeqFile); return 0; } - + numFastaSeqs = driver(lines[0], goodSeqFile, badAccnosFile, fastafile, badSeqNames); }else{ - processIDS.resize(0); - numFastaSeqs = createProcesses(goodSeqFile, badAccnosFile, fastafile, badSeqNames); - - rename((goodSeqFile + toString(processIDS[0]) + ".temp").c_str(), goodSeqFile.c_str()); - rename((badAccnosFile + toString(processIDS[0]) + ".temp").c_str(), badAccnosFile.c_str()); - - //append alignment and report files - for(int i=1;iappendFiles((goodSeqFile + toString(processIDS[i]) + ".temp"), goodSeqFile); - m->mothurRemove((goodSeqFile + toString(processIDS[i]) + ".temp")); - - m->appendFiles((badAccnosFile + toString(processIDS[i]) + ".temp"), badAccnosFile); - m->mothurRemove((badAccnosFile + toString(processIDS[i]) + ".temp")); - } - - if (m->control_pressed) { m->mothurRemove(goodSeqFile); return 0; } - - //read badSeqs in because root process doesnt know what other "bad" seqs the children found - ifstream inBad; - int ableToOpen = m->openInputFile(badAccnosFile, inBad, "no error"); - - if (ableToOpen == 0) { - badSeqNames.clear(); - string tempName; - while (!inBad.eof()) { - inBad >> tempName; m->gobble(inBad); - badSeqNames.insert(tempName); - } - inBad.close(); - } - } - #else - numFastaSeqs = driver(lines[0], goodSeqFile, badAccnosFile, fastafile, badSeqNames); - - if (m->control_pressed) { m->mothurRemove(goodSeqFile); return 0; } - - #endif - + } + //#else + // numFastaSeqs = driver(lines[0], goodSeqFile, badAccnosFile, fastafile, badSeqNames); + //#endif + if (m->control_pressed) { m->mothurRemove(goodSeqFile); return 0; } #endif #ifdef USE_MPI @@ -669,14 +617,24 @@ int ScreenSeqsCommand::getSummary(vector& positions){ vector ambigBases; vector longHomoPolymer; + vector positions; #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) - vector positions = m->divideFile(fastafile, processors); - - for (int i = 0; i < (positions.size()-1); i++) { - lines.push_back(new linePair(positions[i], positions[(i+1)])); - } + positions = m->divideFile(fastafile, processors); + for (int i = 0; i < (positions.size()-1); i++) { lines.push_back(linePair(positions[i], positions[(i+1)])); } #else - lines.push_back(new linePair(0, 1000)); + if(processors == 1){ lines.push_back(linePair(0, 1000)); } + else { + int numFastaSeqs = 0; + positions = m->setFilePosFasta(fastafile, numFastaSeqs); + + //figure out how many sequences you have to process + int numSeqsPerProcessor = numFastaSeqs / processors; + for (int i = 0; i < processors; i++) { + int startIndex = i * numSeqsPerProcessor; + if(i == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; } + lines.push_back(linePair(positions[startIndex], numSeqsPerProcessor)); + } + } #endif #ifdef USE_MPI @@ -687,7 +645,7 @@ int ScreenSeqsCommand::getSummary(vector& positions){ driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, lines[0]); #else int numSeqs = 0; - #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + //#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) if(processors == 1){ numSeqs = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, lines[0]); }else{ @@ -695,10 +653,10 @@ int ScreenSeqsCommand::getSummary(vector& positions){ } if (m->control_pressed) { return 0; } - #else - numSeqs = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, lines[0]); - if (m->control_pressed) { return 0; } - #endif + //#else + // numSeqs = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, lines[0]); + // if (m->control_pressed) { return 0; } + //#endif #endif sort(startPosition.begin(), startPosition.end()); sort(endPosition.begin(), endPosition.end()); @@ -753,13 +711,13 @@ int ScreenSeqsCommand::getSummary(vector& positions){ } } /**************************************************************************************/ -int ScreenSeqsCommand::driverCreateSummary(vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, string filename, linePair* filePos) { +int ScreenSeqsCommand::driverCreateSummary(vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, string filename, linePair filePos) { try { ifstream in; m->openInputFile(filename, in); - in.seekg(filePos->start); + in.seekg(filePos.start); bool done = false; int count = 0; @@ -794,7 +752,7 @@ int ScreenSeqsCommand::driverCreateSummary(vector& startPosition, vectormothurOut("Optimizing sequence: " + toString(count)); m->mothurOutEndLine(); } #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) unsigned long long pos = in.tellg(); - if ((pos == -1) || (pos >= filePos->end)) { break; } + if ((pos == -1) || (pos >= filePos.end)) { break; } #else if (in.eof()) { break; } #endif @@ -813,11 +771,13 @@ int ScreenSeqsCommand::driverCreateSummary(vector& startPosition, vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, string filename) { try { -#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) - int process = 1; + + int process = 1; int num = 0; - processIDS.clear(); - + vector processIDS; + +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + //loop through and create all the processes you want while (process != processors) { int pid = fork(); @@ -878,8 +838,50 @@ int ScreenSeqsCommand::createProcessesCreateSummary(vector& startPosition, m->mothurRemove(tempFilename); } - return num; + +#else + ////////////////////////////////////////////////////////////////////////////////////////////////////// + //Windows version shared memory, so be careful when passing variables through the seqSumData struct. + //Above fork() will clone, so memory is separate, but that's not the case with windows, + //Taking advantage of shared memory to allow both threads to add info to vectors. + ////////////////////////////////////////////////////////////////////////////////////////////////////// + + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + + //Create processor worker threads. + for( int i=0; icount; + for (int k = 0; k < pDataArray[i]->startPosition.size(); k++) { startPosition.push_back(pDataArray[i]->startPosition[k]); } + for (int k = 0; k < pDataArray[i]->endPosition.size(); k++) { endPosition.push_back(pDataArray[i]->endPosition[k]); } + for (int k = 0; k < pDataArray[i]->seqLength.size(); k++) { seqLength.push_back(pDataArray[i]->seqLength[k]); } + for (int k = 0; k < pDataArray[i]->ambigBases.size(); k++) { ambigBases.push_back(pDataArray[i]->ambigBases[k]); } + for (int k = 0; k < pDataArray[i]->longHomoPolymer.size(); k++) { longHomoPolymer.push_back(pDataArray[i]->longHomoPolymer[k]); } + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } + #endif + return num; } catch(exception& e) { m->errorOut(e, "ScreenSeqsCommand", "createProcessesCreateSummary"); @@ -1128,7 +1130,7 @@ int ScreenSeqsCommand::screenQual(set badSeqNames){ } //********************************************************************************************************************** -int ScreenSeqsCommand::driver(linePair* filePos, string goodFName, string badAccnosFName, string filename, set& badSeqNames){ +int ScreenSeqsCommand::driver(linePair filePos, string goodFName, string badAccnosFName, string filename, set& badSeqNames){ try { ofstream goodFile; m->openOutputFile(goodFName, goodFile); @@ -1139,7 +1141,7 @@ int ScreenSeqsCommand::driver(linePair* filePos, string goodFName, string badAcc ifstream inFASTA; m->openInputFile(filename, inFASTA); - inFASTA.seekg(filePos->start); + inFASTA.seekg(filePos.start); bool done = false; int count = 0; @@ -1170,7 +1172,7 @@ int ScreenSeqsCommand::driver(linePair* filePos, string goodFName, string badAcc #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) unsigned long long pos = inFASTA.tellg(); - if ((pos == -1) || (pos >= filePos->end)) { break; } + if ((pos == -1) || (pos >= filePos.end)) { break; } #else if (inFASTA.eof()) { break; } #endif @@ -1275,10 +1277,13 @@ int ScreenSeqsCommand::driverMPI(int start, int num, MPI_File& inMPI, MPI_File& int ScreenSeqsCommand::createProcesses(string goodFileName, string badAccnos, string filename, set& badSeqNames) { try { -#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) - int process = 0; + + vector processIDS; + int process = 1; int num = 0; - + +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + //loop through and create all the processes you want while (process != processors) { int pid = fork(); @@ -1304,8 +1309,10 @@ int ScreenSeqsCommand::createProcesses(string goodFileName, string badAccnos, st } } + num = driver(lines[0], goodFileName, badAccnos, filename, badSeqNames); + //force parent to wait until all the processes are done - for (int i=0;iopenInputFile(tempFile, in); if (!in.eof()) { int tempNum = 0; in >> tempNum; num += tempNum; } in.close(); m->mothurRemove(tempFile); + + m->appendFiles((goodFileName + toString(processIDS[i]) + ".temp"), goodFileName); + m->mothurRemove((goodFileName + toString(processIDS[i]) + ".temp")); + + m->appendFiles((badAccnos + toString(processIDS[i]) + ".temp"), badAccnos); + m->mothurRemove((badAccnos + toString(processIDS[i]) + ".temp")); } - return num; -#endif + //read badSeqs in because root process doesnt know what other "bad" seqs the children found + ifstream inBad; + int ableToOpen = m->openInputFile(badAccnos, inBad, "no error"); + + if (ableToOpen == 0) { + badSeqNames.clear(); + string tempName; + while (!inBad.eof()) { + inBad >> tempName; m->gobble(inBad); + badSeqNames.insert(tempName); + } + inBad.close(); + } +#else + + ////////////////////////////////////////////////////////////////////////////////////////////////////// + //Windows version shared memory, so be careful when passing variables through the sumScreenData struct. + //Above fork() will clone, so memory is separate, but that's not the case with windows, + //Taking advantage of shared memory to allow both threads to add info to badSeqNames. + ////////////////////////////////////////////////////////////////////////////////////////////////////// + + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + + //Create processor worker threads. + for( int i=0; icount; + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } + + for (int i = 0; i < processIDS.size(); i++) { + m->appendFiles((goodFileName + toString(processIDS[i]) + ".temp"), goodFileName); + m->mothurRemove((goodFileName + toString(processIDS[i]) + ".temp")); + + m->appendFiles((badAccnos + toString(processIDS[i]) + ".temp"), badAccnos); + m->mothurRemove((badAccnos + toString(processIDS[i]) + ".temp")); + } + +#endif + + return num; + } catch(exception& e) { m->errorOut(e, "ScreenSeqsCommand", "createProcesses");