*/
#include "classifyseqscommand.h"
-#include "sequence.hpp"
-#include "bayesian.h"
-#include "phylotree.h"
-#include "phylosummary.h"
-#include "knn.h"
temp = validParameter.validFile(parameters, "save", false); if (temp == "not found"){ temp = "f"; }
save = m->isTrue(temp);
+ //this is so the threads can quickly load the reference data
+ #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+ #else
+ if (processors != 1) { save = true; }
+ #endif
rdb->save = save;
if (save) { //clear out old references
rdb->clearMemory();
try {
if (abort == true) { if (calledHelp) { return 0; } return 2; }
- if(method == "bayesian"){ classify = new Bayesian(taxonomyFileName, templateFileName, search, kmerSize, cutoff, iters); }
- else if(method == "knn"){ classify = new Knn(taxonomyFileName, templateFileName, search, kmerSize, gapOpen, gapExtend, match, misMatch, numWanted); }
+ if(method == "bayesian"){ classify = new Bayesian(taxonomyFileName, templateFileName, search, kmerSize, cutoff, iters, rand()); }
+ else if(method == "knn"){ classify = new Knn(taxonomyFileName, templateFileName, search, kmerSize, gapOpen, gapExtend, match, misMatch, numWanted, rand()); }
else {
m->mothurOut(search + " is not a valid method option. I will run the command using bayesian.");
m->mothurOutEndLine();
- classify = new Bayesian(taxonomyFileName, templateFileName, search, kmerSize, cutoff, iters);
+ classify = new Bayesian(taxonomyFileName, templateFileName, search, kmerSize, cutoff, iters, rand());
}
if (m->control_pressed) { delete classify; return 0; }
-
for (int s = 0; s < fastaFileNames.size(); s++) {
#ifdef USE_MPI
int pid, numSeqsPerProcessor;
int tag = 2001;
- vector<unsigned long int> MPIPos;
+ vector<unsigned long long> MPIPos;
MPI_Status status;
MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
MPI_File_open(MPI_COMM_WORLD, outNewTax, outMode, MPI_INFO_NULL, &outMPINewTax);
MPI_File_open(MPI_COMM_WORLD, outTempTax, outMode, MPI_INFO_NULL, &outMPITempTax);
- if (m->control_pressed) { outputTypes.clear(); MPI_File_close(&inMPI); MPI_File_close(&outMPINewTax); MPI_File_close(&outMPITempTax); delete classify; return 0; }
+ if (m->control_pressed) { outputTypes.clear(); MPI_File_close(&inMPI); MPI_File_close(&outMPINewTax); MPI_File_close(&outMPITempTax); delete classify; return 0; }
if (pid == 0) { //you are the root process
//align your part
driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPINewTax, outMPITempTax, MPIPos);
- if (m->control_pressed) { outputTypes.clear(); MPI_File_close(&inMPI); MPI_File_close(&outMPINewTax); MPI_File_close(&outMPITempTax); for (int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); } delete classify; return 0; }
+ if (m->control_pressed) { outputTypes.clear(); MPI_File_close(&inMPI); MPI_File_close(&outMPINewTax); MPI_File_close(&outMPITempTax); for (int i = 0; i < outputNames.size(); i++) { m->mothurRemove(outputNames[i]); } delete classify; return 0; }
for (int i = 1; i < processors; i++) {
int done;
#else
- vector<unsigned long int> positions = m->divideFile(fastaFileNames[s], processors);
+ vector<unsigned long long> positions;
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+ positions = m->divideFile(fastaFileNames[s], processors);
+ for (int i = 0; i < (positions.size()-1); i++) { lines.push_back(new linePair(positions[i], positions[(i+1)])); }
+#else
+ if (processors == 1) {
+ lines.push_back(new linePair(0, 1000));
+ }else {
+ positions = m->setFilePosFasta(fastaFileNames[s], numFastaSeqs);
- for (int i = 0; i < (positions.size()-1); i++) {
- lines.push_back(new linePair(positions[i], positions[(i+1)]));
- }
-
- #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+ //figure out how many sequences you have to process
+ int numSeqsPerProcessor = numFastaSeqs / processors;
+ for (int i = 0; i < processors; i++) {
+ int startIndex = i * numSeqsPerProcessor;
+ if(i == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; }
+ lines.push_back(new linePair(positions[startIndex], numSeqsPerProcessor));
+ }
+ }
+#endif
if(processors == 1){
numFastaSeqs = driver(lines[0], newTaxonomyFile, tempTaxonomyFile, fastaFileNames[s]);
- }
- else{
- processIDS.resize(0);
-
+ }else{
numFastaSeqs = createProcesses(newTaxonomyFile, tempTaxonomyFile, fastaFileNames[s]);
-
}
- #else
- numFastaSeqs = driver(lines[0], newTaxonomyFile, tempTaxonomyFile, fastaFileNames[s]);
- #endif
#endif
m->mothurOutEndLine();
PhyloSummary taxaSum(baseTName, group);
- if (m->control_pressed) { outputTypes.clear(); for (int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); } delete classify; return 0; }
+ if (m->control_pressed) { outputTypes.clear(); for (int i = 0; i < outputNames.size(); i++) { m->mothurRemove(outputNames[i]); } delete classify; return 0; }
if (namefile == "") { taxaSum.summarize(tempTaxonomyFile); }
else {
}
in.close();
}
- remove(tempTaxonomyFile.c_str());
+ m->mothurRemove(tempTaxonomyFile);
- if (m->control_pressed) { outputTypes.clear(); for (int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); } delete classify; return 0; }
+ if (m->control_pressed) { outputTypes.clear(); for (int i = 0; i < outputNames.size(); i++) { m->mothurRemove(outputNames[i]); } delete classify; return 0; }
//print summary file
ofstream outTaxTree;
//read taxfile - this reading and rewriting is done to preserve the confidence scores.
string name, taxon;
while (!inTax.eof()) {
- if (m->control_pressed) { outputTypes.clear(); for (int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); } remove(unclass.c_str()); delete classify; return 0; }
+ if (m->control_pressed) { outputTypes.clear(); for (int i = 0; i < outputNames.size(); i++) { m->mothurRemove(outputNames[i]); } m->mothurRemove(unclass); delete classify; return 0; }
inTax >> name >> taxon; m->gobble(inTax);
inTax.close();
outTax.close();
- remove(newTaxonomyFile.c_str());
+ m->mothurRemove(newTaxonomyFile);
rename(unclass.c_str(), newTaxonomyFile.c_str());
m->mothurOutEndLine();
}
delete classify;
+
return 0;
}
catch(exception& e) {
int ClassifySeqsCommand::createProcesses(string taxFileName, string tempTaxFile, string filename) {
try {
+
+ int num = 0;
+ processIDS.clear();
+
#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
int process = 1;
- int num = 0;
//loop through and create all the processes you want
while (process != processors) {
process++;
}else if (pid == 0){
num = driver(lines[process], taxFileName + toString(getpid()) + ".temp", tempTaxFile + toString(getpid()) + ".temp", filename);
-
+
//pass numSeqs to parent
ofstream out;
string tempFile = filename + toString(getpid()) + ".num.temp";
string tempFile = filename + toString(processIDS[i]) + ".num.temp";
m->openInputFile(tempFile, in);
if (!in.eof()) { int tempNum = 0; in >> tempNum; num += tempNum; }
- in.close(); remove(tempFile.c_str());
+ in.close(); m->mothurRemove(m->getFullPathName(tempFile));
+ }
+#else
+ //////////////////////////////////////////////////////////////////////////////////////////////////////
+ //Windows version shared memory, so be careful when passing variables through the alignData struct.
+ //Above fork() will clone, so memory is separate, but that's not the case with windows,
+ //////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ vector<classifyData*> pDataArray;
+ DWORD dwThreadIdArray[processors-1];
+ HANDLE hThreadArray[processors-1];
+
+ //Create processor worker threads.
+ for( int i=0; i<processors-1; i++ ){
+ // Allocate memory for thread data.
+ string extension = "";
+ if (i != 0) { extension = toString(i) + ".temp"; processIDS.push_back(i); }
+
+ classifyData* tempclass = new classifyData(probs, method, templateFileName, taxonomyFileName, (taxFileName + extension), (tempTaxFile + extension), filename, search, kmerSize, iters, numWanted, m, lines[i]->start, lines[i]->end, match, misMatch, gapOpen, gapExtend, cutoff, i);
+ pDataArray.push_back(tempclass);
+
+ //MySeqSumThreadFunction is in header. It must be global or static to work with the threads.
+ //default security attributes, thread function name, argument to thread function, use default creation flags, returns the thread identifier
+ hThreadArray[i] = CreateThread(NULL, 0, MyClassThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]);
+
}
+ //parent does its part
+ num = driver(lines[processors-1], taxFileName + toString(processors-1) + ".temp", tempTaxFile + toString(processors-1) + ".temp", filename);
+ processIDS.push_back((processors-1));
+
+ //Wait until all threads have terminated.
+ WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+
+ //Close all thread handles and free memory allocations.
+ for(int i=0; i < pDataArray.size(); i++){
+ num += pDataArray[i]->count;
+ CloseHandle(hThreadArray[i]);
+ delete pDataArray[i];
+ }
+
+ #endif
+
for(int i=0;i<processIDS.size();i++){
appendTaxFiles((taxFileName + toString(processIDS[i]) + ".temp"), taxFileName);
appendTaxFiles((tempTaxFile + toString(processIDS[i]) + ".temp"), tempTaxFile);
- remove((taxFileName + toString(processIDS[i]) + ".temp").c_str());
- remove((tempTaxFile + toString(processIDS[i]) + ".temp").c_str());
+ m->mothurRemove((m->getFullPathName(taxFileName) + toString(processIDS[i]) + ".temp"));
+ m->mothurRemove((m->getFullPathName(tempTaxFile) + toString(processIDS[i]) + ".temp"));
}
return num;
-#endif
+
}
catch(exception& e) {
m->errorOut(e, "ClassifySeqsCommand", "createProcesses");
delete candidateSeq;
#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
- unsigned long int pos = inFASTA.tellg();
+ unsigned long long pos = inFASTA.tellg();
if ((pos == -1) || (pos >= filePos->end)) { break; }
#else
if (inFASTA.eof()) { break; }
}
//**********************************************************************************************************************
#ifdef USE_MPI
-int ClassifySeqsCommand::driverMPI(int start, int num, MPI_File& inMPI, MPI_File& newFile, MPI_File& tempFile, vector<unsigned long int>& MPIPos){
+int ClassifySeqsCommand::driverMPI(int start, int num, MPI_File& inMPI, MPI_File& newFile, MPI_File& tempFile, vector<unsigned long long>& MPIPos){
try {
MPI_Status statusNew;
MPI_Status statusTemp;