MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case
#else
-
+ vector<unsigned long long> positions = savedPositions[s];
#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
- vector<unsigned long long> positions = m->divideFile(fastafileNames[s], processors);
+ //vector<unsigned long long> positions = m->divideFile(fastafileNames[s], processors);
for (int i = 0; i < (positions.size()-1); i++) {
lines.push_back(new linePair(positions[i], positions[(i+1)]));
int numFastaSeqs = driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
numSeqs += numFastaSeqs;
}else{
- int numFastaSeqs = createProcessesRunFilter(filter, fastafileNames[s]);
+ int numFastaSeqs = createProcessesRunFilter(filter, fastafileNames[s], filteredFasta);
numSeqs += numFastaSeqs;
-
- rename((fastafileNames[s] + toString(processIDS[0]) + ".temp").c_str(), filteredFasta.c_str());
-
- //append fasta files
- for(int i=1;i<processors;i++){
- m->appendFiles((fastafileNames[s] + toString(processIDS[i]) + ".temp"), filteredFasta);
- m->mothurRemove((fastafileNames[s] + toString(processIDS[i]) + ".temp"));
- }
}
if (m->control_pressed) { return 1; }
#else
- lines.push_back(new linePair(0, 1000));
+ if(processors == 1){
+ lines.push_back(new linePair(0, 1000));
int numFastaSeqs = driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
numSeqs += numFastaSeqs;
+ }else {
+ int numFastaSeqs = positions.size()-1;
+ //positions = m->setFilePosFasta(fastafileNames[s], numFastaSeqs);
+
+ //figure out how many sequences you have to process
+ int numSeqsPerProcessor = numFastaSeqs / processors;
+ for (int i = 0; i < processors; i++) {
+ int startIndex = i * numSeqsPerProcessor;
+ if(i == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; }
+ lines.push_back(new linePair(positions[startIndex], numSeqsPerProcessor));
+ }
+
+ numFastaSeqs = createProcessesRunFilter(filter, fastafileNames[s], filteredFasta);
+ numSeqs += numFastaSeqs;
+ }
if (m->control_pressed) { return 1; }
#endif
}
/**************************************************************************************************/
-int FilterSeqsCommand::createProcessesRunFilter(string F, string filename) {
+int FilterSeqsCommand::createProcessesRunFilter(string F, string filename, string filteredFastaName) {
try {
-#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
- int process = 0;
+
+ int process = 1;
int num = 0;
processIDS.clear();
+
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+
//loop through and create all the processes you want
while (process != processors) {
}
}
+ num = driverRunFilter(F, filteredFastaName, filename, lines[0]);
+
//force parent to wait until all the processes are done
- for (int i=0;i<processors;i++) {
+ for (int i=0;i<processIDS.size();i++) {
int temp = processIDS[i];
wait(&temp);
}
m->openInputFile(tempFile, in);
if (!in.eof()) { int tempNum = 0; in >> tempNum; num += tempNum; }
in.close(); m->mothurRemove(tempFile);
+
+ m->appendFiles((filename + toString(processIDS[i]) + ".temp"), filteredFastaName);
+ m->mothurRemove((filename + toString(processIDS[i]) + ".temp"));
}
-
-
- return num;
-#endif
+
+#else
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////////
+ //Windows version shared memory, so be careful when passing variables through the filterData struct.
+ //Above fork() will clone, so memory is separate, but that's not the case with windows,
+ //Taking advantage of shared memory to allow both threads to add info to F.
+ //////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ vector<filterRunData*> pDataArray;
+ DWORD dwThreadIdArray[processors-1];
+ HANDLE hThreadArray[processors-1];
+
+ //Create processor worker threads.
+ for( int i=0; i<processors-1; i++){
+
+ string extension = "";
+ if (i != 0) { extension = toString(i) + ".temp"; }
+
+ filterRunData* tempFilter = new filterRunData(filter, filename, (filteredFastaName + extension), m, lines[i]->start, lines[i]->end, alignmentLength, i);
+ pDataArray.push_back(tempFilter);
+ processIDS.push_back(i);
+
+ hThreadArray[i] = CreateThread(NULL, 0, MyRunFilterThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]);
+ }
+
+ num = driverRunFilter(F, (filteredFastaName + toString(processors-1) + ".temp"), filename, lines[processors-1]);
+
+ //Wait until all threads have terminated.
+ WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+
+ //Close all thread handles and free memory allocations.
+ for(int i=0; i < pDataArray.size(); i++){
+ num += pDataArray[i]->count;
+ CloseHandle(hThreadArray[i]);
+ delete pDataArray[i];
+ }
+
+ for (int i = 1; i < processors; i++) {
+ m->appendFiles((filteredFastaName + toString(i) + ".temp"), filteredFastaName);
+ m->mothurRemove((filteredFastaName + toString(i) + ".temp"));
+ }
+#endif
+
+ return num;
+
}
catch(exception& e) {
m->errorOut(e, "FilterSeqsCommand", "createProcessesRunFilter");
#else
-
+ vector<unsigned long long> positions;
#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
- vector<unsigned long long> positions = m->divideFile(fastafileNames[s], processors);
+ positions = m->divideFile(fastafileNames[s], processors);
for (int i = 0; i < (positions.size()-1); i++) {
lines.push_back(new linePair(positions[i], positions[(i+1)]));
}
int numFastaSeqs = createProcessesCreateFilter(F, fastafileNames[s]);
numSeqs += numFastaSeqs;
}
-
- if (m->control_pressed) { return filterString; }
#else
- lines.push_back(new linePair(0, 1000));
- int numFastaSeqs = driverCreateFilter(F, fastafileNames[s], lines[0]);
- numSeqs += numFastaSeqs;
- if (m->control_pressed) { return filterString; }
+ if(processors == 1){
+ lines.push_back(new linePair(0, 1000));
+ int numFastaSeqs = driverCreateFilter(F, fastafileNames[s], lines[0]);
+ numSeqs += numFastaSeqs;
+ }else {
+ int numFastaSeqs = 0;
+ positions = m->setFilePosFasta(fastafileNames[s], numFastaSeqs);
+
+ //figure out how many sequences you have to process
+ int numSeqsPerProcessor = numFastaSeqs / processors;
+ for (int i = 0; i < processors; i++) {
+ int startIndex = i * numSeqsPerProcessor;
+ if(i == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; }
+ lines.push_back(new linePair(positions[startIndex], numSeqsPerProcessor));
+ }
+
+ numFastaSeqs = createProcessesCreateFilter(F, fastafileNames[s]);
+ numSeqs += numFastaSeqs;
+ }
#endif
+ //save the file positions so we can reuse them in the runFilter function
+ savedPositions[s] = positions;
+
+ if (m->control_pressed) { return filterString; }
#endif
}
MPI_Barrier(MPI_COMM_WORLD);
#endif
-
+
return filterString;
}
catch(exception& e) {
int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename) {
try {
-#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
- int process = 1;
+ int process = 1;
int num = 0;
processIDS.clear();
-
+
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+
//loop through and create all the processes you want
while (process != processors) {
int pid = fork();
m->mothurRemove(tempFilename);
}
- return num;
-#endif
+
+#else
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////////
+ //Windows version shared memory, so be careful when passing variables through the filterData struct.
+ //Above fork() will clone, so memory is separate, but that's not the case with windows,
+ //Taking advantage of shared memory to allow both threads to add info to F.
+ //////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ vector<filterData*> pDataArray;
+ DWORD dwThreadIdArray[processors];
+ HANDLE hThreadArray[processors];
+
+ //Create processor worker threads.
+ for( int i=0; i<processors; i++ ){
+
+ filterData* tempFilter = new filterData(filename, m, lines[i]->start, lines[i]->end, alignmentLength, trump, vertical, soft, hard, i);
+ pDataArray.push_back(tempFilter);
+ processIDS.push_back(i);
+
+ hThreadArray[i] = CreateThread(NULL, 0, MyCreateFilterThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]);
+ }
+
+ //Wait until all threads have terminated.
+ WaitForMultipleObjects(processors, hThreadArray, TRUE, INFINITE);
+
+ //Close all thread handles and free memory allocations.
+ for(int i=0; i < pDataArray.size(); i++){
+ num += pDataArray[i]->count;
+ F.mergeFilter(pDataArray[i]->F.getFilter());
+
+ for (int k = 0; k < alignmentLength; k++) { F.a[k] += pDataArray[i]->F.a[k]; }
+ for (int k = 0; k < alignmentLength; k++) { F.t[k] += pDataArray[i]->F.t[k]; }
+ for (int k = 0; k < alignmentLength; k++) { F.g[k] += pDataArray[i]->F.g[k]; }
+ for (int k = 0; k < alignmentLength; k++) { F.c[k] += pDataArray[i]->F.c[k]; }
+ for (int k = 0; k < alignmentLength; k++) { F.gap[k] += pDataArray[i]->F.gap[k]; }
+
+ CloseHandle(hThreadArray[i]);
+ delete pDataArray[i];
+ }
+
+#endif
+ return num;
+
}
catch(exception& e) {
m->errorOut(e, "FilterSeqsCommand", "createProcessesCreateFilter");
\r
vector<linePair*> lines;\r
vector<int> processIDS;\r
+ map<int, vector<unsigned long long> > savedPositions;\r
\r
string vertical, filter, fasta, hard, outputDir, filterFileName;\r
vector<string> fastafileNames; \r
string createFilter();\r
int filterSequences();\r
int createProcessesCreateFilter(Filters&, string);\r
- int createProcessesRunFilter(string, string);\r
+ int createProcessesRunFilter(string, string, string);\r
int driverRunFilter(string, string, string, linePair*);\r
int driverCreateFilter(Filters& F, string filename, linePair* line);\r
#ifdef USE_MPI\r
\r
};\r
\r
+\r
+/**************************************************************************************************/\r
+//custom data structure for threads to use.\r
+// This is passed by void pointer so it can be any data type\r
+// that can be passed using a single void pointer (LPVOID).\r
+//\r
+// Per-thread inputs and results for MyCreateFilterThreadFunction:\r
+//   F               - filter state the worker configures and accumulates into\r
+//   count           - written by the worker: set to 'end' when it starts reading, or 1 if it aborts\r
+//   tid             - index assigned by whoever creates the thread\r
+//   alignmentLength - aligned length every sequence is required to have\r
+//   start           - file offset the worker seeks to before reading\r
+//   end             - NUMBER OF SEQUENCES the worker reads (a count, not a file offset)\r
+//   m               - MothurOut used for file access, logging and the control_pressed flag\r
+//   filename        - fasta file the worker reads\r
+//   vertical        - option string evaluated with MothurOut::isTrue\r
+//   hard            - argument handed to Filters::doHard; "" means no hard filter\r
+//   trump           - trump character; '*' means trump filtering is off\r
+//   soft            - soft threshold; 0 means soft filtering is off\r
+struct filterData {\r
+	Filters F;\r
+	int count, tid, alignmentLength;\r
+	unsigned long long start, end;\r
+	MothurOut* m;\r
+	string filename, vertical, hard;\r
+	char trump;\r
+	float soft;\r
+	\r
+	// Default construction puts every scalar and pointer member in a defined state\r
+	// (previously they were left indeterminate). The defaults match the "feature off"\r
+	// sentinels the worker thread tests for: trump == '*' and soft == 0.\r
+	filterData() : count(0), tid(0), alignmentLength(0), start(0), end(0), m(0), trump('*'), soft(0) {}\r
+	\r
+	// fn = fasta file, mout = shared MothurOut, st = start offset, en = number of sequences,\r
+	// aLength = alignment length, tr = trump character, vert = vertical option,\r
+	// so = soft threshold, ha = hard filter argument, t = thread index.\r
+	// Initializers are listed in member declaration order.\r
+	filterData(string fn, MothurOut* mout, unsigned long long st, unsigned long long en, int aLength, char tr, string vert, float so, string ha, int t) :\r
+		count(0), tid(t), alignmentLength(aLength), start(st), end(en), m(mout),\r
+		filename(fn), vertical(vert), hard(ha), trump(tr), soft(so) {}\r
+};\r
+/**************************************************************************************************/\r
+//custom data structure for threads to use.\r
+// This is passed by void pointer so it can be any data type\r
+// that can be passed using a single void pointer (LPVOID).\r
+//\r
+// Per-thread inputs and results for MyRunFilterThreadFunction:\r
+//   count           - written by the worker: set to 'end' when it starts reading, or 1 if it aborts\r
+//   tid             - index assigned by whoever creates the thread\r
+//   alignmentLength - number of alignment columns the worker walks for each sequence\r
+//   start           - file offset the worker seeks to before reading\r
+//   end             - NUMBER OF SEQUENCES the worker reads (a count, not a file offset)\r
+//   m               - MothurOut used for file access, logging and the control_pressed flag\r
+//   filename        - fasta file the worker reads\r
+//   filter          - mask string; a column is kept when its character is '1'\r
+//   outputFilename  - file the worker writes the filtered sequences to\r
+struct filterRunData {\r
+	int count, tid, alignmentLength;\r
+	unsigned long long start, end;\r
+	MothurOut* m;\r
+	string filename;\r
+	string filter, outputFilename;\r
+	\r
+	// Default construction puts every scalar and pointer member in a defined state\r
+	// (previously they were left indeterminate).\r
+	filterRunData() : count(0), tid(0), alignmentLength(0), start(0), end(0), m(0) {}\r
+	\r
+	// f = filter mask, fn = input fasta file, ofn = output file, mout = shared MothurOut,\r
+	// st = start offset, en = number of sequences, aLength = alignment length, t = thread index.\r
+	// Initializers are listed in member declaration order.\r
+	filterRunData(string f, string fn, string ofn, MothurOut* mout, unsigned long long st, unsigned long long en, int aLength, int t) :\r
+		count(0), tid(t), alignmentLength(aLength), start(st), end(en), m(mout),\r
+		filename(fn), filter(f), outputFilename(ofn) {}\r
+};\r
+\r
+/**************************************************************************************************/\r
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)\r
+#else\r
+// Thread entry point: gathers filter statistics for one slice of a fasta file.\r
+// lpParam must point to a filterData. The worker configures its own Filters object\r
+// (pDataArray->F), reads pDataArray->end sequences beginning at file offset\r
+// pDataArray->start, and feeds each named sequence to doTrump / getFreqs as the\r
+// options require.\r
+// Returns 0 when the slice was read to the end, 1 when control_pressed was seen\r
+// (count is then set to 1). Any std::exception is reported and ends the program.\r
+static DWORD WINAPI MyCreateFilterThreadFunction(LPVOID lpParam){ \r
+	filterData* pDataArray;\r
+	pDataArray = (filterData*)lpParam;\r
+	\r
+	try {\r
+\r
+		//configure this thread's private Filters object from the options\r
+		if (pDataArray->soft != 0)		{  pDataArray->F.setSoft(pDataArray->soft);		}\r
+		if (pDataArray->trump != '*')	{  pDataArray->F.setTrump(pDataArray->trump);	}\r
+		\r
+		pDataArray->F.setLength(pDataArray->alignmentLength);\r
+		\r
+		if(pDataArray->trump != '*' || pDataArray->m->isTrue(pDataArray->vertical) || pDataArray->soft != 0){\r
+			pDataArray->F.initialize();\r
+		}\r
+		\r
+		//without a hard filter every column starts out as kept ('1')\r
+		if(pDataArray->hard.compare("") != 0)	{	pDataArray->F.doHard(pDataArray->hard);	}\r
+		else									{	pDataArray->F.setFilter(string(pDataArray->alignmentLength, '1'));	}\r
+		\r
+		ifstream in;\r
+		pDataArray->m->openInputFile(pDataArray->filename, in);\r
+		\r
+		//the first slice reads from the very beginning of the file\r
+		if ((pDataArray->start == 0) || (pDataArray->start == 1)) {\r
+			in.seekg(0);\r
+		}else { //this accounts for the difference in line endings. \r
+			in.seekg(pDataArray->start-1); pDataArray->m->gobble(in); \r
+		}\r
+		\r
+		pDataArray->count = pDataArray->end;\r
+		for(int i = 0; i < pDataArray->end; i++){ //end is the number of sequences to process\r
+			\r
+			if (pDataArray->m->control_pressed) { in.close(); pDataArray->count = 1; return 1; }\r
+			\r
+			Sequence current(in); pDataArray->m->gobble(in); \r
+			\r
+			if (current.getName() != "") {\r
+				if (current.getAligned().length() != pDataArray->alignmentLength) {\r
+					//Report the problem and request an abort. The offending sequence is NOT handed\r
+					//to the filter: its length does not match what F was sized for, and the results\r
+					//of an aborted run are discarded anyway.\r
+					pDataArray->m->mothurOut("Sequences are not all the same length, please correct."); pDataArray->m->mothurOutEndLine(); pDataArray->m->control_pressed = true;\r
+				}else {\r
+					if(pDataArray->trump != '*')	{	pDataArray->F.doTrump(current);	}\r
+					if(pDataArray->m->isTrue(pDataArray->vertical) || pDataArray->soft != 0)	{	pDataArray->F.getFreqs(current);	}\r
+				}\r
+			}\r
+			\r
+			//report progress\r
+			if((i) % 100 == 0){	pDataArray->m->mothurOut(toString(i)); pDataArray->m->mothurOutEndLine();	}\r
+		}\r
+		\r
+		//final progress line unless the loop already printed a multiple of 100\r
+		if((pDataArray->count) % 100 != 0){	pDataArray->m->mothurOut(toString(pDataArray->count)); pDataArray->m->mothurOutEndLine();	}\r
+		\r
+		in.close();\r
+		\r
+		return 0;\r
+		\r
+	}\r
+	catch(exception& e) {\r
+		pDataArray->m->errorOut(e, "FilterSeqsCommand", "MyCreateFilterThreadFunction");\r
+		exit(1);\r
+	}\r
+} \r
+/**************************************************************************************************/\r
+// Thread entry point: applies the filter mask to one slice of a fasta file.\r
+// lpParam must point to a filterRunData. The worker reads pDataArray->end sequences\r
+// beginning at file offset pDataArray->start, keeps only the columns whose mask\r
+// character is '1', and writes each result to pDataArray->outputFilename as\r
+// ">name" followed by the filtered sequence.\r
+// Returns 0 when the slice was read to the end, 1 when control_pressed was seen\r
+// (count is then set to 1). Any std::exception is reported and ends the program.\r
+static DWORD WINAPI MyRunFilterThreadFunction(LPVOID lpParam){ \r
+	filterRunData* pDataArray;\r
+	pDataArray = (filterRunData*)lpParam;\r
+	\r
+	try {\r
+		\r
+		ofstream out;\r
+		pDataArray->m->openOutputFile(pDataArray->outputFilename, out);\r
+\r
+		ifstream in;\r
+		pDataArray->m->openInputFile(pDataArray->filename, in);\r
+		\r
+		//the first slice reads from the very beginning of the file\r
+		if ((pDataArray->start == 0) || (pDataArray->start == 1)) {\r
+			in.seekg(0);\r
+		}else { //this accounts for the difference in line endings. \r
+			in.seekg(pDataArray->start-1); pDataArray->m->gobble(in); \r
+		}\r
+		\r
+		pDataArray->count = pDataArray->end;\r
+		for(int i = 0; i < pDataArray->end; i++){ //end is the number of sequences to process\r
+			\r
+			if (pDataArray->m->control_pressed) { in.close(); out.close(); pDataArray->count = 1; return 1; }\r
+			\r
+			Sequence seq(in); pDataArray->m->gobble(in);\r
+			if (seq.getName() != "") {\r
+				string align = seq.getAligned();\r
+				string filterSeq = "";\r
+				\r
+				//Walk alignmentLength columns, but never index past the end of the sequence\r
+				//or of the mask: string::operator[] beyond size() is undefined behaviour.\r
+				string::size_type numColumns = 0;\r
+				if (pDataArray->alignmentLength > 0)			{ numColumns = pDataArray->alignmentLength;		}\r
+				if (align.length() < numColumns)				{ numColumns = align.length();					}\r
+				if (pDataArray->filter.length() < numColumns)	{ numColumns = pDataArray->filter.length();		}\r
+				\r
+				for(string::size_type j=0;j<numColumns;j++){\r
+					if(pDataArray->filter[j] == '1'){\r
+						filterSeq += align[j];\r
+					}\r
+				}\r
+				\r
+				out << '>' << seq.getName() << endl << filterSeq << endl;\r
+			}\r
+			\r
+			//report progress\r
+			if((i) % 100 == 0){	pDataArray->m->mothurOut(toString(i)); pDataArray->m->mothurOutEndLine();	}\r
+		}\r
+		\r
+		//final progress line unless the loop already printed a multiple of 100\r
+		if((pDataArray->count) % 100 != 0){	pDataArray->m->mothurOut(toString(pDataArray->count)); pDataArray->m->mothurOutEndLine();	}\r
+		\r
+		in.close();\r
+		out.close();\r
+		\r
+		return 0;\r
+		\r
+	}\r
+	catch(exception& e) {\r
+		pDataArray->m->errorOut(e, "FilterSeqsCommand", "MyRunFilterThreadFunction");\r
+		exit(1);\r
+	}\r
+} \r
+/**************************************************************************************************/\r
+#endif\r
+\r
+\r
#endif\r