From 5d00ef3e809832f08efdd691a9eace8ac20feb07 Mon Sep 17 00:00:00 2001 From: Sarah Westcott Date: Fri, 24 Feb 2012 12:49:41 -0500 Subject: [PATCH] paralellized screen.seqs for windows. --- screenseqscommand.cpp | 292 ++++++++++++++++++++++++++---------------- screenseqscommand.h | 205 ++++++++++++++++++++++++++++- seqsummarycommand.cpp | 27 ++-- 3 files changed, 400 insertions(+), 124 deletions(-) diff --git a/screenseqscommand.cpp b/screenseqscommand.cpp index 4106739..9494865 100644 --- a/screenseqscommand.cpp +++ b/screenseqscommand.cpp @@ -8,7 +8,7 @@ */ #include "screenseqscommand.h" -#include "sequence.hpp" + //********************************************************************************************************************** vector ScreenSeqsCommand::setParameters(){ @@ -289,16 +289,25 @@ int ScreenSeqsCommand::execute(){ } else { #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) - positions = m->divideFile(fastafile, processors); - for (int i = 0; i < (positions.size()-1); i++) { - lines.push_back(new linePair(positions[i], positions[(i+1)])); - } + positions = m->divideFile(fastafile, processors); + for (int i = 0; i < (positions.size()-1); i++) { lines.push_back(linePair(positions[i], positions[(i+1)])); } #else - positions.push_back(0); positions.push_back(1000); - lines.push_back(new linePair(0, 1000)); + if(processors == 1){ lines.push_back(linePair(0, 1000)); } + else { + int numFastaSeqs = 0; + positions = m->setFilePosFasta(fastafile, numFastaSeqs); + + //figure out how many sequences you have to process + int numSeqsPerProcessor = numFastaSeqs / processors; + for (int i = 0; i < processors; i++) { + int startIndex = i * numSeqsPerProcessor; + if(i == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; } + lines.push_back(linePair(positions[startIndex], numSeqsPerProcessor)); + } + } #endif } - + string goodSeqFile = outputDir + m->getRootName(m->getSimpleName(fastafile)) + "good" + m->getExtension(fastafile); string badAccnosFile = outputDir + m->getRootName(m->getSimpleName(fastafile)) + "bad.accnos"; @@ -351,29 +360,16 @@ int ScreenSeqsCommand::execute(){ numSeqsPerProcessor = numFastaSeqs / processors; int startIndex = pid * numSeqsPerProcessor; if(pid == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - pid * numSeqsPerProcessor; } - // cout << pid << '\t' << numSeqsPerProcessor << '\t' << startIndex << endl; + //align your part driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPIGood, outMPIBadAccnos, MPIPos, badSeqNames); - //cout << pid << " done" << endl; + if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPIGood); MPI_File_close(&outMPIBadAccnos); return 0; } for (int i = 1; i < processors; i++) { - //get bad lists int badSize; MPI_Recv(&badSize, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); - /*for (int j = 0; j < badSize; j++) { - int length; - MPI_Recv(&length, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); //recv the length of the name - char* buf2 = new char[length]; //make space to recieve it - MPI_Recv(buf2, length, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); //get name - - string tempBuf = buf2; - if (tempBuf.length() > length) { tempBuf = tempBuf.substr(0, length); } - delete buf2; - - badSeqNames.insert(tempBuf); - }*/ } }else{ //you are a child process MPI_Recv(&numFastaSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); @@ -384,27 +380,15 @@ int ScreenSeqsCommand::execute(){ numSeqsPerProcessor = numFastaSeqs / processors; int startIndex = pid * numSeqsPerProcessor; if(pid == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - pid * numSeqsPerProcessor; } - //cout << pid << '\t' << numSeqsPerProcessor << '\t' << startIndex << endl; + //align your part driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPIGood, outMPIBadAccnos, MPIPos, badSeqNames); -//cout << pid << " done" << endl; + if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPIGood); MPI_File_close(&outMPIBadAccnos); return 0; } //send bad list int badSize = badSeqNames.size(); MPI_Send(&badSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD); - - /* - set::iterator it; - for (it = badSeqNames.begin(); it != badSeqNames.end(); it++) { - string name = *it; - int length = name.length(); - char* buf2 = new char[length]; - memcpy(buf2, name.c_str(), length); - - MPI_Send(&length, 1, MPI_INT, 0, tag, MPI_COMM_WORLD); - MPI_Send(buf2, length, MPI_CHAR, 0, tag, MPI_COMM_WORLD); - }*/ } //close files @@ -415,52 +399,16 @@ int ScreenSeqsCommand::execute(){ #else - #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + //#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) if(processors == 1){ - numFastaSeqs = driver(lines[0], goodSeqFile, badAccnosFile, fastafile, badSeqNames); - - if (m->control_pressed) { m->mothurRemove(goodSeqFile); return 0; } - + numFastaSeqs = driver(lines[0], goodSeqFile, badAccnosFile, fastafile, badSeqNames); }else{ - processIDS.resize(0); - numFastaSeqs = createProcesses(goodSeqFile, badAccnosFile, fastafile, badSeqNames); - - rename((goodSeqFile + toString(processIDS[0]) + ".temp").c_str(), goodSeqFile.c_str()); - rename((badAccnosFile + toString(processIDS[0]) + ".temp").c_str(), badAccnosFile.c_str()); - - //append alignment and report files - for(int i=1;iappendFiles((goodSeqFile + toString(processIDS[i]) + ".temp"), goodSeqFile); - m->mothurRemove((goodSeqFile + toString(processIDS[i]) + ".temp")); - - m->appendFiles((badAccnosFile + toString(processIDS[i]) + ".temp"), badAccnosFile); - m->mothurRemove((badAccnosFile + toString(processIDS[i]) + ".temp")); - } - - if (m->control_pressed) { m->mothurRemove(goodSeqFile); return 0; } - - //read badSeqs in because root process doesnt know what other "bad" seqs the children found - ifstream inBad; - int ableToOpen = m->openInputFile(badAccnosFile, inBad, "no error"); - - if (ableToOpen == 0) { - badSeqNames.clear(); - string tempName; - while (!inBad.eof()) { - inBad >> tempName; m->gobble(inBad); - badSeqNames.insert(tempName); - } - inBad.close(); - } - } - #else - numFastaSeqs = driver(lines[0], goodSeqFile, badAccnosFile, fastafile, badSeqNames); - - if (m->control_pressed) { m->mothurRemove(goodSeqFile); return 0; } - - #endif - + } + //#else + // numFastaSeqs = driver(lines[0], goodSeqFile, badAccnosFile, fastafile, badSeqNames); + //#endif + if (m->control_pressed) { m->mothurRemove(goodSeqFile); return 0; } #endif #ifdef USE_MPI @@ -669,14 +617,24 @@ int ScreenSeqsCommand::getSummary(vector& positions){ vector ambigBases; vector longHomoPolymer; + vector positions; #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) - vector positions = m->divideFile(fastafile, processors); - - for (int i = 0; i < (positions.size()-1); i++) { - lines.push_back(new linePair(positions[i], positions[(i+1)])); - } + positions = m->divideFile(fastafile, processors); + for (int i = 0; i < (positions.size()-1); i++) { lines.push_back(linePair(positions[i], positions[(i+1)])); } #else - lines.push_back(new linePair(0, 1000)); + if(processors == 1){ lines.push_back(linePair(0, 1000)); } + else { + int numFastaSeqs = 0; + positions = m->setFilePosFasta(fastafile, numFastaSeqs); + + //figure out how many sequences you have to process + int numSeqsPerProcessor = numFastaSeqs / processors; + for (int i = 0; i < processors; i++) { + int startIndex = i * numSeqsPerProcessor; + if(i == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; } + lines.push_back(linePair(positions[startIndex], numSeqsPerProcessor)); + } + } #endif #ifdef USE_MPI @@ -687,7 +645,7 @@ int ScreenSeqsCommand::getSummary(vector& positions){ driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, lines[0]); #else int numSeqs = 0; - #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + //#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) if(processors == 1){ numSeqs = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, lines[0]); }else{ @@ -695,10 +653,10 @@ int ScreenSeqsCommand::getSummary(vector& positions){ } if (m->control_pressed) { return 0; } - #else - numSeqs = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, lines[0]); - if (m->control_pressed) { return 0; } - #endif + //#else + // numSeqs = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, lines[0]); + // if (m->control_pressed) { return 0; } + //#endif #endif sort(startPosition.begin(), startPosition.end()); sort(endPosition.begin(), endPosition.end()); @@ -753,13 +711,13 @@ int ScreenSeqsCommand::getSummary(vector& positions){ } } /**************************************************************************************/ -int ScreenSeqsCommand::driverCreateSummary(vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, string filename, linePair* filePos) { +int ScreenSeqsCommand::driverCreateSummary(vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, string filename, linePair filePos) { try { ifstream in; m->openInputFile(filename, in); - in.seekg(filePos->start); + in.seekg(filePos.start); bool done = false; int count = 0; @@ -794,7 +752,7 @@ int ScreenSeqsCommand::driverCreateSummary(vector& startPosition, vectormothurOut("Optimizing sequence: " + toString(count)); m->mothurOutEndLine(); } #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) unsigned long long pos = in.tellg(); - if ((pos == -1) || (pos >= filePos->end)) { break; } + if ((pos == -1) || (pos >= filePos.end)) { break; } #else if (in.eof()) { break; } #endif @@ -813,11 +771,13 @@ int ScreenSeqsCommand::driverCreateSummary(vector& startPosition, vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, string filename) { try { -#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) - int process = 1; + + int process = 1; int num = 0; - processIDS.clear(); - + vector processIDS; + +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + //loop through and create all the processes you want while (process != processors) { int pid = fork(); @@ -878,8 +838,50 @@ int ScreenSeqsCommand::createProcessesCreateSummary(vector& startPosition, m->mothurRemove(tempFilename); } - return num; + +#else + ////////////////////////////////////////////////////////////////////////////////////////////////////// + //Windows version shared memory, so be careful when passing variables through the seqSumData struct. + //Above fork() will clone, so memory is separate, but that's not the case with windows, + //Taking advantage of shared memory to allow both threads to add info to vectors. + ////////////////////////////////////////////////////////////////////////////////////////////////////// + + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + + //Create processor worker threads. + for( int i=0; icount; + for (int k = 0; k < pDataArray[i]->startPosition.size(); k++) { startPosition.push_back(pDataArray[i]->startPosition[k]); } + for (int k = 0; k < pDataArray[i]->endPosition.size(); k++) { endPosition.push_back(pDataArray[i]->endPosition[k]); } + for (int k = 0; k < pDataArray[i]->seqLength.size(); k++) { seqLength.push_back(pDataArray[i]->seqLength[k]); } + for (int k = 0; k < pDataArray[i]->ambigBases.size(); k++) { ambigBases.push_back(pDataArray[i]->ambigBases[k]); } + for (int k = 0; k < pDataArray[i]->longHomoPolymer.size(); k++) { longHomoPolymer.push_back(pDataArray[i]->longHomoPolymer[k]); } + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } + #endif + return num; } catch(exception& e) { m->errorOut(e, "ScreenSeqsCommand", "createProcessesCreateSummary"); @@ -1128,7 +1130,7 @@ int ScreenSeqsCommand::screenQual(set badSeqNames){ } //********************************************************************************************************************** -int ScreenSeqsCommand::driver(linePair* filePos, string goodFName, string badAccnosFName, string filename, set& badSeqNames){ +int ScreenSeqsCommand::driver(linePair filePos, string goodFName, string badAccnosFName, string filename, set& badSeqNames){ try { ofstream goodFile; m->openOutputFile(goodFName, goodFile); @@ -1139,7 +1141,7 @@ int ScreenSeqsCommand::driver(linePair* filePos, string goodFName, string badAcc ifstream inFASTA; m->openInputFile(filename, inFASTA); - inFASTA.seekg(filePos->start); + inFASTA.seekg(filePos.start); bool done = false; int count = 0; @@ -1170,7 +1172,7 @@ int ScreenSeqsCommand::driver(linePair* filePos, string goodFName, string badAcc #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) unsigned long long pos = inFASTA.tellg(); - if ((pos == -1) || (pos >= filePos->end)) { break; } + if ((pos == -1) || (pos >= filePos.end)) { break; } #else if (inFASTA.eof()) { break; } #endif @@ -1275,10 +1277,13 @@ int ScreenSeqsCommand::driverMPI(int start, int num, MPI_File& inMPI, MPI_File& int ScreenSeqsCommand::createProcesses(string goodFileName, string badAccnos, string filename, set& badSeqNames) { try { -#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) - int process = 0; + + vector processIDS; + int process = 1; int num = 0; - + +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + //loop through and create all the processes you want while (process != processors) { int pid = fork(); @@ -1304,8 +1309,10 @@ int ScreenSeqsCommand::createProcesses(string goodFileName, string badAccnos, st } } + num = driver(lines[0], goodFileName, badAccnos, filename, badSeqNames); + //force parent to wait until all the processes are done - for (int i=0;iopenInputFile(tempFile, in); if (!in.eof()) { int tempNum = 0; in >> tempNum; num += tempNum; } in.close(); m->mothurRemove(tempFile); + + m->appendFiles((goodFileName + toString(processIDS[i]) + ".temp"), goodFileName); + m->mothurRemove((goodFileName + toString(processIDS[i]) + ".temp")); + + m->appendFiles((badAccnos + toString(processIDS[i]) + ".temp"), badAccnos); + m->mothurRemove((badAccnos + toString(processIDS[i]) + ".temp")); } - return num; -#endif + //read badSeqs in because root process doesnt know what other "bad" seqs the children found + ifstream inBad; + int ableToOpen = m->openInputFile(badAccnos, inBad, "no error"); + + if (ableToOpen == 0) { + badSeqNames.clear(); + string tempName; + while (!inBad.eof()) { + inBad >> tempName; m->gobble(inBad); + badSeqNames.insert(tempName); + } + inBad.close(); + } +#else + + ////////////////////////////////////////////////////////////////////////////////////////////////////// + //Windows version shared memory, so be careful when passing variables through the sumScreenData struct. + //Above fork() will clone, so memory is separate, but that's not the case with windows, + //Taking advantage of shared memory to allow both threads to add info to badSeqNames. + ////////////////////////////////////////////////////////////////////////////////////////////////////// + + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + + //Create processor worker threads. + for( int i=0; icount; + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } + + for (int i = 0; i < processIDS.size(); i++) { + m->appendFiles((goodFileName + toString(processIDS[i]) + ".temp"), goodFileName); + m->mothurRemove((goodFileName + toString(processIDS[i]) + ".temp")); + + m->appendFiles((badAccnos + toString(processIDS[i]) + ".temp"), badAccnos); + m->mothurRemove((badAccnos + toString(processIDS[i]) + ".temp")); + } + +#endif + + return num; + } catch(exception& e) { m->errorOut(e, "ScreenSeqsCommand", "createProcesses"); diff --git a/screenseqscommand.h b/screenseqscommand.h index 49d992a..007b6d6 100644 --- a/screenseqscommand.h +++ b/screenseqscommand.h @@ -11,6 +11,7 @@ */ #include "mothur.h" #include "command.hpp" +#include "sequence.hpp" class ScreenSeqsCommand : public Command { @@ -38,8 +39,7 @@ private: linePair(unsigned long long i, unsigned long long j) : start(i), end(j) {} }; - vector processIDS; //processid - vector lines; + vector lines; int screenNameGroupFile(set); int screenGroupFile(set); @@ -47,7 +47,7 @@ private: int screenQual(set); int screenTaxonomy(set); - int driver(linePair*, string, string, string, set&); + int driver(linePair, string, string, string, set&); int createProcesses(string, string, string, set&); #ifdef USE_MPI @@ -64,7 +64,204 @@ private: int getSummary(vector&); int createProcessesCreateSummary(vector&, vector&, vector&, vector&, vector&, string); - int driverCreateSummary(vector&, vector&, vector&, vector&, vector&, string, linePair*); + int driverCreateSummary(vector&, vector&, vector&, vector&, vector&, string, linePair); }; +/**************************************************************************************************/ +//custom data structure for threads to use. +// This is passed by void pointer so it can be any data type +// that can be passed using a single void pointer (LPVOID). +struct sumData { + vector startPosition; + vector endPosition; + vector seqLength; + vector ambigBases; + vector longHomoPolymer; + string filename, namefile; + unsigned long long start; + unsigned long long end; + int count; + MothurOut* m; + map nameMap; + + + sumData(){} + sumData(string f, MothurOut* mout, unsigned long long st, unsigned long long en, string nf, map nam) { + filename = f; + namefile = nf; + m = mout; + start = st; + end = en; + nameMap = nam; + count = 0; + } +}; +/**************************************************************************************************/ +//custom data structure for threads to use. +// This is passed by void pointer so it can be any data type +// that can be passed using a single void pointer (LPVOID). +struct sumScreenData { + int startPos, endPos, maxAmbig, maxHomoP, minLength, maxLength; + unsigned long long start; + unsigned long long end; + int count; + MothurOut* m; + string goodFName, badAccnosFName, filename; + set* badSeqNames; + + + sumScreenData(){} + sumScreenData(int s, int e, int a, int h, int minl, int maxl, string f, MothurOut* mout, unsigned long long st, unsigned long long en, string gf, string bf, set* bn) { + startPos = s; + endPos = e; + minLength = minl; + maxLength = maxl; + maxAmbig = a; + maxHomoP = h; + filename = f; + goodFName = gf; + badAccnosFName = bf; + m = mout; + start = st; + end = en; + badSeqNames = bn; + count = 0; + } +}; + + +/**************************************************************************************************/ +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) +#else +static DWORD WINAPI MySumThreadFunction(LPVOID lpParam){ + sumData* pDataArray; + pDataArray = (sumData*)lpParam; + + try { + ifstream in; + pDataArray->m->openInputFile(pDataArray->filename, in); + + //print header if you are process 0 + if ((pDataArray->start == 0) || (pDataArray->start == 1)) { + in.seekg(0); + }else { //this accounts for the difference in line endings. + in.seekg(pDataArray->start-1); pDataArray->m->gobble(in); + } + + pDataArray->count = pDataArray->end; + for(int i = 0; i < pDataArray->end; i++){ //end is the number of sequences to process + + if (pDataArray->m->control_pressed) { in.close(); pDataArray->count = 1; return 1; } + + Sequence current(in); pDataArray->m->gobble(in); + + if (current.getName() != "") { + + int num = 1; + if (pDataArray->namefile != "") { + //make sure this sequence is in the namefile, else error + map::iterator it = pDataArray->nameMap.find(current.getName()); + + if (it == pDataArray->nameMap.end()) { pDataArray->m->mothurOut("[ERROR]: " + current.getName() + " is not in your namefile, please correct."); pDataArray->m->mothurOutEndLine(); pDataArray->m->control_pressed = true; } + else { num = it->second; } + } + + //for each sequence this sequence represents + for (int i = 0; i < num; i++) { + pDataArray->startPosition.push_back(current.getStartPos()); + pDataArray->endPosition.push_back(current.getEndPos()); + pDataArray->seqLength.push_back(current.getNumBases()); + pDataArray->ambigBases.push_back(current.getAmbigBases()); + pDataArray->longHomoPolymer.push_back(current.getLongHomoPolymer()); + } + } + } + + in.close(); + + return 0; + + } + catch(exception& e) { + pDataArray->m->errorOut(e, "ScreenSeqsCommand", "MySumThreadFunction"); + exit(1); + } +} + +/**************************************************************************************************/ + +static DWORD WINAPI MySumScreenThreadFunction(LPVOID lpParam){ + sumScreenData* pDataArray; + pDataArray = (sumScreenData*)lpParam; + + try { + + ofstream goodFile; + pDataArray->m->openOutputFile(pDataArray->goodFName, goodFile); + + ofstream badAccnosFile; + pDataArray->m->openOutputFile(pDataArray->badAccnosFName, badAccnosFile); + + ifstream in; + pDataArray->m->openInputFile(pDataArray->filename, in); + + //print header if you are process 0 + if ((pDataArray->start == 0) || (pDataArray->start == 1)) { + in.seekg(0); + }else { //this accounts for the difference in line endings. + in.seekg(pDataArray->start-1); pDataArray->m->gobble(in); + } + + pDataArray->count = pDataArray->end; + for(int i = 0; i < pDataArray->end; i++){ //end is the number of sequences to process + + if (pDataArray->m->control_pressed) { in.close(); badAccnosFile.close(); goodFile.close(); pDataArray->count = 1; return 1; } + + Sequence currSeq(in); pDataArray->m->gobble(in); + + if (currSeq.getName() != "") { + bool goodSeq = 1; // innocent until proven guilty + if(goodSeq == 1 && pDataArray->startPos != -1 && pDataArray->startPos < currSeq.getStartPos()) { goodSeq = 0; } + if(goodSeq == 1 && pDataArray->endPos != -1 && pDataArray->endPos > currSeq.getEndPos()) { goodSeq = 0; } + if(goodSeq == 1 && pDataArray->maxAmbig != -1 && pDataArray->maxAmbig < currSeq.getAmbigBases()) { goodSeq = 0; } + if(goodSeq == 1 && pDataArray->maxHomoP != -1 && pDataArray->maxHomoP < currSeq.getLongHomoPolymer()) { goodSeq = 0; } + if(goodSeq == 1 && pDataArray->minLength != -1 && pDataArray->minLength > currSeq.getNumBases()) { goodSeq = 0; } + if(goodSeq == 1 && pDataArray->maxLength != -1 && pDataArray->maxLength < currSeq.getNumBases()) { goodSeq = 0; } + + if(goodSeq == 1){ + currSeq.printSequence(goodFile); + } + else{ + badAccnosFile << currSeq.getName() << endl; + pDataArray->badSeqNames->insert(currSeq.getName()); + } + + } + //report progress + if((i+1) % 100 == 0){ pDataArray->m->mothurOut("Processing sequence: " + toString(i+1)); pDataArray->m->mothurOutEndLine(); } + } + //report progress + if((pDataArray->count) % 100 != 0){ pDataArray->m->mothurOut("Processing sequence: " + toString(pDataArray->count)); pDataArray->m->mothurOutEndLine(); } + + + + in.close(); + goodFile.close(); + badAccnosFile.close(); + + return 0; + + } + catch(exception& e) { + pDataArray->m->errorOut(e, "ScreenSeqsCommand", "MySumScreenThreadFunction"); + exit(1); + } +} + +#endif + +/**************************************************************************************************/ + + + #endif diff --git a/seqsummarycommand.cpp b/seqsummarycommand.cpp index e8f73ca..37d0fdf 100644 --- a/seqsummarycommand.cpp +++ b/seqsummarycommand.cpp @@ -603,25 +603,29 @@ int SeqSummaryCommand::createProcessesCreateSummary(vector& startPosition, ////////////////////////////////////////////////////////////////////////////////////////////////////// vector pDataArray; - DWORD dwThreadIdArray[processors]; - HANDLE hThreadArray[processors]; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; //Create processor worker threads. - for( int i=0; istart << '\t' << lines[i]->end << endl; + for( int i=0; istart, lines[i]->end, namefile, nameMap); + seqSumData* tempSum = new seqSumData(&startPosition, &endPosition, &seqLength, &ambigBases, &longHomoPolymer, filename, (sumFile+extension), m, lines[i]->start, lines[i]->end, namefile, nameMap); pDataArray.push_back(tempSum); - processIDS.push_back(i); - + //MySeqSumThreadFunction is in header. It must be global or static to work with the threads. //default security attributes, thread function name, argument to thread function, use default creation flags, returns the thread identifier hThreadArray[i] = CreateThread(NULL, 0, MySeqSumThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]); } - + + //do your part + num = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, (sumFile+toString(processors-1)+".temp"), lines[processors-1]); + processIDS.push_back(processors-1); + //Wait until all threads have terminated. - WaitForMultipleObjects(processors, hThreadArray, TRUE, INFINITE); + WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE); //Close all thread handles and free memory allocations. for(int i=0; i < pDataArray.size(); i++){ @@ -629,8 +633,7 @@ int SeqSummaryCommand::createProcessesCreateSummary(vector& startPosition, CloseHandle(hThreadArray[i]); delete pDataArray[i]; } - - //rename((sumFile + toString(processIDS[0]) + ".temp").c_str(), sumFile.c_str()); + //append files for(int i=0;iappendFiles((sumFile + toString(processIDS[i]) + ".temp"), sumFile); -- 2.39.2