X-Git-Url: https://git.donarmstrong.com/?a=blobdiff_plain;ds=sidebyside;f=classifyseqscommand.cpp;h=3b7b4a8c595cfeb9d6149a05a2b5347e6adfe6cc;hb=ae57e166b2ed7b475ec3f466106bd76fabadd063;hp=4291132c74ea9537e9abfdea7779ac404d636c2a;hpb=55386dddad84cc1140d736cabaf4dd0ae16f2e01;p=mothur.git diff --git a/classifyseqscommand.cpp b/classifyseqscommand.cpp index 4291132..3b7b4a8 100644 --- a/classifyseqscommand.cpp +++ b/classifyseqscommand.cpp @@ -8,11 +8,6 @@ */ #include "classifyseqscommand.h" -#include "sequence.hpp" -#include "bayesian.h" -#include "phylotree.h" -#include "phylosummary.h" -#include "knn.h" @@ -383,6 +378,11 @@ ClassifySeqsCommand::ClassifySeqsCommand(string option) { temp = validParameter.validFile(parameters, "save", false); if (temp == "not found"){ temp = "f"; } save = m->isTrue(temp); + //this is so the threads can quickly load the reference data + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + #else + if (processors != 1) { save = true; } + #endif rdb->save = save; if (save) { //clear out old references rdb->clearMemory(); @@ -484,7 +484,6 @@ int ClassifySeqsCommand::execute(){ } if (m->control_pressed) { delete classify; return 0; } - for (int s = 0; s < fastaFileNames.size(); s++) { @@ -518,7 +517,7 @@ int ClassifySeqsCommand::execute(){ #ifdef USE_MPI int pid, numSeqsPerProcessor; int tag = 2001; - vector MPIPos; + vector MPIPos; MPI_Status status; MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are @@ -544,7 +543,7 @@ int ClassifySeqsCommand::execute(){ MPI_File_open(MPI_COMM_WORLD, outNewTax, outMode, MPI_INFO_NULL, &outMPINewTax); MPI_File_open(MPI_COMM_WORLD, outTempTax, outMode, MPI_INFO_NULL, &outMPITempTax); - if (m->control_pressed) { outputTypes.clear(); MPI_File_close(&inMPI); MPI_File_close(&outMPINewTax); MPI_File_close(&outMPITempTax); delete classify; return 0; } + if (m->control_pressed) { outputTypes.clear(); MPI_File_close(&inMPI); MPI_File_close(&outMPINewTax); MPI_File_close(&outMPITempTax); delete classify; return 0; } if (pid == 0) { //you are the root process @@ -599,25 +598,30 @@ int ClassifySeqsCommand::execute(){ #else - vector positions = m->divideFile(fastaFileNames[s], processors); + vector positions; +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + positions = m->divideFile(fastaFileNames[s], processors); + for (int i = 0; i < (positions.size()-1); i++) { lines.push_back(new linePair(positions[i], positions[(i+1)])); } +#else + if (processors == 1) { + lines.push_back(new linePair(0, 1000)); + }else { + positions = m->setFilePosFasta(fastaFileNames[s], numFastaSeqs); - for (int i = 0; i < (positions.size()-1); i++) { - lines.push_back(new linePair(positions[i], positions[(i+1)])); - } - - #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + //figure out how many sequences you have to process + int numSeqsPerProcessor = numFastaSeqs / processors; + for (int i = 0; i < processors; i++) { + int startIndex = i * numSeqsPerProcessor; + if(i == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; } + lines.push_back(new linePair(positions[startIndex], numSeqsPerProcessor)); + } + } +#endif if(processors == 1){ numFastaSeqs = driver(lines[0], newTaxonomyFile, tempTaxonomyFile, fastaFileNames[s]); - } - else{ - processIDS.resize(0); - + }else{ numFastaSeqs = createProcesses(newTaxonomyFile, tempTaxonomyFile, fastaFileNames[s]); - } - #else - numFastaSeqs = driver(lines[0], newTaxonomyFile, tempTaxonomyFile, fastaFileNames[s]); - #endif #endif m->mothurOutEndLine(); @@ -746,6 +750,7 @@ int ClassifySeqsCommand::execute(){ } delete classify; + return 0; } catch(exception& e) { @@ -787,9 +792,12 @@ string ClassifySeqsCommand::addUnclassifieds(string tax, int maxlevel) { int ClassifySeqsCommand::createProcesses(string taxFileName, string tempTaxFile, string filename) { try { + + int num = 0; + processIDS.clear(); + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) int process = 1; - int num = 0; //loop through and create all the processes you want while (process != processors) { @@ -832,6 +840,46 @@ int ClassifySeqsCommand::createProcesses(string taxFileName, string tempTaxFile, if (!in.eof()) { int tempNum = 0; in >> tempNum; num += tempNum; } in.close(); m->mothurRemove(m->getFullPathName(tempFile)); } +#else + ////////////////////////////////////////////////////////////////////////////////////////////////////// + //Windows version shared memory, so be careful when passing variables through the alignData struct. + //Above fork() will clone, so memory is separate, but that's not the case with windows, + ////////////////////////////////////////////////////////////////////////////////////////////////////// + + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + + //Create processor worker threads. + for( int i=0; istart, lines[i]->end, match, misMatch, gapOpen, gapExtend, cutoff, i); + pDataArray.push_back(tempclass); + + //MySeqSumThreadFunction is in header. It must be global or static to work with the threads. + //default security attributes, thread function name, argument to thread function, use default creation flags, returns the thread identifier + hThreadArray[i] = CreateThread(NULL, 0, MyClassThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]); + + } + + //parent does its part + num = driver(lines[processors-1], taxFileName + toString(processors-1) + ".temp", tempTaxFile + toString(processors-1) + ".temp", filename); + processIDS.push_back((processors-1)); + + //Wait until all threads have terminated. + WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE); + + //Close all thread handles and free memory allocations. + for(int i=0; i < pDataArray.size(); i++){ + num += pDataArray[i]->count; + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } + + #endif for(int i=0;ierrorOut(e, "ClassifySeqsCommand", "createProcesses"); @@ -918,7 +966,7 @@ int ClassifySeqsCommand::driver(linePair* filePos, string taxFName, string tempT delete candidateSeq; #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) - unsigned long int pos = inFASTA.tellg(); + unsigned long long pos = inFASTA.tellg(); if ((pos == -1) || (pos >= filePos->end)) { break; } #else if (inFASTA.eof()) { break; } @@ -944,7 +992,7 @@ int ClassifySeqsCommand::driver(linePair* filePos, string taxFName, string tempT } //********************************************************************************************************************** #ifdef USE_MPI -int ClassifySeqsCommand::driverMPI(int start, int num, MPI_File& inMPI, MPI_File& newFile, MPI_File& tempFile, vector& MPIPos){ +int ClassifySeqsCommand::driverMPI(int start, int num, MPI_File& inMPI, MPI_File& newFile, MPI_File& tempFile, vector& MPIPos){ try { MPI_Status statusNew; MPI_Status statusTemp;