*/
#include "classifyseqscommand.h"
-#include "sequence.hpp"
-#include "bayesian.h"
-#include "phylotree.h"
-#include "phylosummary.h"
-#include "knn.h"
temp = validParameter.validFile(parameters, "ksize", false); if (temp == "not found"){ temp = "8"; }
convert(temp, kmerSize);
+ temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = m->getProcessors(); }
+ m->setProcessors(temp);
+ convert(temp, processors);
+
temp = validParameter.validFile(parameters, "save", false); if (temp == "not found"){ temp = "f"; }
save = m->isTrue(temp);
+ //this is so the threads can quickly load the reference data
+ #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+ #else
+ if ((processors != 1) && (rdb->referenceSeqs.size() == 0)) { save = true; }
+ #endif
rdb->save = save;
if (save) { //clear out old references
rdb->clearMemory();
}else if (taxonomyFileName == "not open") { abort = true; }
else { if (save) { rdb->setSavedTaxonomy(taxonomyFileName); } }
- temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = m->getProcessors(); }
- m->setProcessors(temp);
- convert(temp, processors);
-
search = validParameter.validFile(parameters, "search", false); if (search == "not found"){ search = "kmer"; }
method = validParameter.validFile(parameters, "method", false); if (method == "not found"){ method = "bayesian"; }
try {
if (abort == true) { if (calledHelp) { return 0; } return 2; }
- if(method == "bayesian"){ classify = new Bayesian(taxonomyFileName, templateFileName, search, kmerSize, cutoff, iters); }
- else if(method == "knn"){ classify = new Knn(taxonomyFileName, templateFileName, search, kmerSize, gapOpen, gapExtend, match, misMatch, numWanted); }
+ if(method == "bayesian"){ classify = new Bayesian(taxonomyFileName, templateFileName, search, kmerSize, cutoff, iters, rand()); }
+ else if(method == "knn"){ classify = new Knn(taxonomyFileName, templateFileName, search, kmerSize, gapOpen, gapExtend, match, misMatch, numWanted, rand()); }
else {
m->mothurOut(search + " is not a valid method option. I will run the command using bayesian.");
m->mothurOutEndLine();
- classify = new Bayesian(taxonomyFileName, templateFileName, search, kmerSize, cutoff, iters);
+ classify = new Bayesian(taxonomyFileName, templateFileName, search, kmerSize, cutoff, iters, rand());
}
if (m->control_pressed) { delete classify; return 0; }
-
for (int s = 0; s < fastaFileNames.size(); s++) {
#ifdef USE_MPI
int pid, numSeqsPerProcessor;
int tag = 2001;
- vector<unsigned long int> MPIPos;
+ vector<unsigned long long> MPIPos;
MPI_Status status;
MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
MPI_File_open(MPI_COMM_WORLD, outNewTax, outMode, MPI_INFO_NULL, &outMPINewTax);
MPI_File_open(MPI_COMM_WORLD, outTempTax, outMode, MPI_INFO_NULL, &outMPITempTax);
- if (m->control_pressed) { outputTypes.clear(); MPI_File_close(&inMPI); MPI_File_close(&outMPINewTax); MPI_File_close(&outMPITempTax); delete classify; return 0; }
+ if (m->control_pressed) { outputTypes.clear(); MPI_File_close(&inMPI); MPI_File_close(&outMPINewTax); MPI_File_close(&outMPITempTax); delete classify; return 0; }
if (pid == 0) { //you are the root process
#else
- vector<unsigned long int> positions = m->divideFile(fastaFileNames[s], processors);
+ vector<unsigned long long> positions;
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+ positions = m->divideFile(fastaFileNames[s], processors);
+ for (int i = 0; i < (positions.size()-1); i++) { lines.push_back(new linePair(positions[i], positions[(i+1)])); }
+#else
+ if (processors == 1) {
+ lines.push_back(new linePair(0, 1000));
+ }else {
+ positions = m->setFilePosFasta(fastaFileNames[s], numFastaSeqs);
- for (int i = 0; i < (positions.size()-1); i++) {
- lines.push_back(new linePair(positions[i], positions[(i+1)]));
- }
-
- #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+ //figure out how many sequences you have to process
+ int numSeqsPerProcessor = numFastaSeqs / processors;
+ for (int i = 0; i < processors; i++) {
+ int startIndex = i * numSeqsPerProcessor;
+ if(i == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; }
+ lines.push_back(new linePair(positions[startIndex], numSeqsPerProcessor));
+ }
+ }
+#endif
if(processors == 1){
numFastaSeqs = driver(lines[0], newTaxonomyFile, tempTaxonomyFile, fastaFileNames[s]);
- }
- else{
- processIDS.resize(0);
-
+ }else{
numFastaSeqs = createProcesses(newTaxonomyFile, tempTaxonomyFile, fastaFileNames[s]);
-
}
- #else
- numFastaSeqs = driver(lines[0], newTaxonomyFile, tempTaxonomyFile, fastaFileNames[s]);
- #endif
#endif
m->mothurOutEndLine();
}
delete classify;
+
return 0;
}
catch(exception& e) {
int ClassifySeqsCommand::createProcesses(string taxFileName, string tempTaxFile, string filename) {
try {
+
+ int num = 0;
+ processIDS.clear();
+
#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
int process = 1;
- int num = 0;
//loop through and create all the processes you want
while (process != processors) {
if (!in.eof()) { int tempNum = 0; in >> tempNum; num += tempNum; }
in.close(); m->mothurRemove(m->getFullPathName(tempFile));
}
+#else
+ //////////////////////////////////////////////////////////////////////////////////////////////////////
+ //Windows version shared memory, so be careful when passing variables through the alignData struct.
+ //Above fork() will clone, so memory is separate, but that's not the case with windows,
+ //////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ vector<classifyData*> pDataArray;
+ DWORD dwThreadIdArray[processors-1];
+ HANDLE hThreadArray[processors-1];
+
+ //Create processor worker threads.
+ for( int i=0; i<processors-1; i++ ){
+ // Allocate memory for thread data.
+ string extension = "";
+ if (i != 0) { extension = toString(i) + ".temp"; processIDS.push_back(i); }
+
+ classifyData* tempclass = new classifyData(probs, method, templateFileName, taxonomyFileName, (taxFileName + extension), (tempTaxFile + extension), filename, search, kmerSize, iters, numWanted, m, lines[i]->start, lines[i]->end, match, misMatch, gapOpen, gapExtend, cutoff, i);
+ pDataArray.push_back(tempclass);
+
+ //MySeqSumThreadFunction is in header. It must be global or static to work with the threads.
+ //default security attributes, thread function name, argument to thread function, use default creation flags, returns the thread identifier
+ hThreadArray[i] = CreateThread(NULL, 0, MyClassThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]);
+
+ }
+
+ //parent does its part
+ num = driver(lines[processors-1], taxFileName + toString(processors-1) + ".temp", tempTaxFile + toString(processors-1) + ".temp", filename);
+ processIDS.push_back((processors-1));
+
+ //Wait until all threads have terminated.
+ WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+
+ //Close all thread handles and free memory allocations.
+ for(int i=0; i < pDataArray.size(); i++){
+ num += pDataArray[i]->count;
+ CloseHandle(hThreadArray[i]);
+ delete pDataArray[i];
+ }
+
+ #endif
for(int i=0;i<processIDS.size();i++){
appendTaxFiles((taxFileName + toString(processIDS[i]) + ".temp"), taxFileName);
}
return num;
-#endif
+
}
catch(exception& e) {
m->errorOut(e, "ClassifySeqsCommand", "createProcesses");
delete candidateSeq;
#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
- unsigned long int pos = inFASTA.tellg();
+ unsigned long long pos = inFASTA.tellg();
if ((pos == -1) || (pos >= filePos->end)) { break; }
#else
if (inFASTA.eof()) { break; }
}
//**********************************************************************************************************************
#ifdef USE_MPI
-int ClassifySeqsCommand::driverMPI(int start, int num, MPI_File& inMPI, MPI_File& newFile, MPI_File& tempFile, vector<unsigned long int>& MPIPos){
+int ClassifySeqsCommand::driverMPI(int start, int num, MPI_File& inMPI, MPI_File& newFile, MPI_File& tempFile, vector<unsigned long long>& MPIPos){
try {
MPI_Status statusNew;
MPI_Status statusTemp;