X-Git-Url: https://git.donarmstrong.com/?a=blobdiff_plain;f=seqsummarycommand.cpp;h=ee8835386e1937bd8bc6d51ab2b0779f05f6a679;hb=fc1ed1ae1b022719176910ab71993bd6535810ad;hp=2fbf6d1ca616033f4fd52993412f0e720f8187af;hpb=c651c46022761aef61644f78462365d8f767ff0b;p=mothur.git diff --git a/seqsummarycommand.cpp b/seqsummarycommand.cpp index 2fbf6d1..ee88353 100644 --- a/seqsummarycommand.cpp +++ b/seqsummarycommand.cpp @@ -8,7 +8,7 @@ */ #include "seqsummarycommand.h" -#include "sequence.hpp" + //********************************************************************************************************************** vector SeqSummaryCommand::setParameters(){ @@ -281,31 +281,30 @@ int SeqSummaryCommand::execute(){ MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else - vector positions = m->divideFile(fastafile, processors); - - for (int i = 0; i < (positions.size()-1); i++) { - lines.push_back(new linePair(positions[i], positions[(i+1)])); - } - - #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) - if(processors == 1){ - numSeqs = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile, lines[0]); - }else{ - numSeqs = createProcessesCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile); - - rename((summaryFile + toString(processIDS[0]) + ".temp").c_str(), summaryFile.c_str()); - //append files - for(int i=1;iappendFiles((summaryFile + toString(processIDS[i]) + ".temp"), summaryFile); - m->mothurRemove((summaryFile + toString(processIDS[i]) + ".temp")); - } + vector positions; + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + positions = m->divideFile(fastafile, processors); + for (int i = 0; i < (positions.size()-1); i++) { lines.push_back(new linePair(positions[i], positions[(i+1)])); } + #else + positions = m->setFilePosFasta(fastafile, numSeqs); + + //figure out how many sequences you have to process + int numSeqsPerProcessor = numSeqs / processors; + for (int i = 0; i < processors; i++) { + int startIndex = i * numSeqsPerProcessor; + if(i == (processors - 1)){ numSeqsPerProcessor = numSeqs - i * numSeqsPerProcessor; } + lines.push_back(new linePair(positions[startIndex], numSeqsPerProcessor)); } - - if (m->control_pressed) { return 0; } - #else + #endif + + + if(processors == 1){ numSeqs = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile, lines[0]); - if (m->control_pressed) { return 0; } - #endif + }else{ + numSeqs = createProcessesCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile); + } + + if (m->control_pressed) { return 0; } #endif #ifdef USE_MPI @@ -507,11 +506,12 @@ int SeqSummaryCommand::MPICreateSummary(int start, int num, vector& startPo /**************************************************************************************************/ int SeqSummaryCommand::createProcessesCreateSummary(vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, string filename, string sumFile) { try { -#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) - int process = 0; + int process = 1; int num = 0; processIDS.clear(); +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + //loop through and create all the processes you want while (process != processors) { int pid = fork(); @@ -545,8 +545,11 @@ int SeqSummaryCommand::createProcessesCreateSummary(vector& startPosition, } } + //do your part + num = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, sumFile, lines[0]); + //force parent to wait until all the processes are done - for (int i=0;i& startPosition, in.close(); m->mothurRemove(tempFilename); + + m->appendFiles((sumFile + toString(processIDS[i]) + ".temp"), sumFile); + m->mothurRemove((sumFile + toString(processIDS[i]) + ".temp")); } - return num; +#else + ////////////////////////////////////////////////////////////////////////////////////////////////////// + //Windows version shared memory, so be careful when passing variables through the seqSumData struct. + //Above fork() will clone, so memory is separate, but that's not the case with windows, + //Taking advantage of shared memory to allow both threads to add info to vectors. + ////////////////////////////////////////////////////////////////////////////////////////////////////// + + vector pDataArray; + DWORD dwThreadIdArray[processors]; + HANDLE hThreadArray[processors]; + + //Create processor worker threads. + for( int i=0; istart << '\t' << lines[i]->end << endl; + // Allocate memory for thread data. + seqSumData* tempSum = new seqSumData(&startPosition, &endPosition, &seqLength, &ambigBases, &longHomoPolymer, filename, (sumFile + toString(i) + ".temp"), m, lines[i]->start, lines[i]->end, namefile, nameMap); + pDataArray.push_back(tempSum); + processIDS.push_back(i); + + //MySeqSumThreadFunction is in header. It must be global or static to work with the threads. + //default security attributes, thread function name, argument to thread function, use default creation flags, returns the thread identifier + hThreadArray[i] = CreateThread(NULL, 0, MySeqSumThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]); + } + + //Wait until all threads have terminated. + WaitForMultipleObjects(processors, hThreadArray, TRUE, INFINITE); + + //Close all thread handles and free memory allocations. + for(int i=0; i < pDataArray.size(); i++){ + num += pDataArray[i]->count; + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } + + //rename((sumFile + toString(processIDS[0]) + ".temp").c_str(), sumFile.c_str()); + //append files + for(int i=0;iappendFiles((sumFile + toString(processIDS[i]) + ".temp"), sumFile); + m->mothurRemove((sumFile + toString(processIDS[i]) + ".temp")); + } #endif + return num; } catch(exception& e) { m->errorOut(e, "SeqSummaryCommand", "createProcessesCreateSummary");