*/
#include "seqsummarycommand.h"
-#include "sequence.hpp"
+
//**********************************************************************************************************************
vector<string> SeqSummaryCommand::setParameters(){
MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case
#else
- vector<unsigned long int> positions = m->divideFile(fastafile, processors);
-
- for (int i = 0; i < (positions.size()-1); i++) {
- lines.push_back(new linePair(positions[i], positions[(i+1)]));
- }
-
- #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
- if(processors == 1){
- numSeqs = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile, lines[0]);
- }else{
- numSeqs = createProcessesCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile);
-
- rename((summaryFile + toString(processIDS[0]) + ".temp").c_str(), summaryFile.c_str());
- //append files
- for(int i=1;i<processors;i++){
- m->appendFiles((summaryFile + toString(processIDS[i]) + ".temp"), summaryFile);
- m->mothurRemove((summaryFile + toString(processIDS[i]) + ".temp"));
- }
+ vector<unsigned long int> positions;
+ #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+ positions = m->divideFile(fastafile, processors);
+ for (int i = 0; i < (positions.size()-1); i++) { lines.push_back(new linePair(positions[i], positions[(i+1)])); }
+ #else
+ positions = m->setFilePosFasta(fastafile, numSeqs);
+
+ //figure out how many sequences you have to process
+ int numSeqsPerProcessor = numSeqs / processors;
+ for (int i = 0; i < processors; i++) {
+ int startIndex = i * numSeqsPerProcessor;
+ if(i == (processors - 1)){ numSeqsPerProcessor = numSeqs - i * numSeqsPerProcessor; }
+ lines.push_back(new linePair(positions[startIndex], numSeqsPerProcessor));
}
-
- if (m->control_pressed) { return 0; }
- #else
+ #endif
+
+
+ if(processors == 1){
numSeqs = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile, lines[0]);
- if (m->control_pressed) { return 0; }
- #endif
+ }else{
+ numSeqs = createProcessesCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, summaryFile);
+ }
+
+ if (m->control_pressed) { return 0; }
#endif
#ifdef USE_MPI
/**************************************************************************************************/
int SeqSummaryCommand::createProcessesCreateSummary(vector<int>& startPosition, vector<int>& endPosition, vector<int>& seqLength, vector<int>& ambigBases, vector<int>& longHomoPolymer, string filename, string sumFile) {
try {
-#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
- int process = 0;
+ int process = 1;
int num = 0;
processIDS.clear();
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+
//loop through and create all the processes you want
while (process != processors) {
int pid = fork();
}
}
+ //do your part
+ num = driverCreateSummary(startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, fastafile, sumFile, lines[0]);
+
//force parent to wait until all the processes are done
- for (int i=0;i<processors;i++) {
+ for (int i=0;i<processIDS.size();i++) {
int temp = processIDS[i];
wait(&temp);
}
in.close();
m->mothurRemove(tempFilename);
+
+ m->appendFiles((sumFile + toString(processIDS[i]) + ".temp"), sumFile);
+ m->mothurRemove((sumFile + toString(processIDS[i]) + ".temp"));
}
- return num;
+#else
+ //////////////////////////////////////////////////////////////////////////////////////////////////////
+ //Windows version shared memory, so be careful when passing variables through the seqSumData struct.
+ //Above fork() will clone, so memory is separate, but that's not the case with windows,
+ //Taking advantage of shared memory to allow both threads to add info to vectors.
+ //////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ vector<seqSumData*> pDataArray;
+ DWORD dwThreadIdArray[processors];
+ HANDLE hThreadArray[processors];
+
+ //Create processor worker threads.
+ for( int i=0; i<processors; i++ ){
+
+ //cout << i << '\t' << lines[i]->start << '\t' << lines[i]->end << endl;
+ // Allocate memory for thread data.
+ seqSumData* tempSum = new seqSumData(&startPosition, &endPosition, &seqLength, &ambigBases, &longHomoPolymer, filename, (sumFile + toString(i) + ".temp"), m, lines[i]->start, lines[i]->end, namefile, nameMap);
+ pDataArray.push_back(tempSum);
+ processIDS.push_back(i);
+
+ //MySeqSumThreadFunction is in header. It must be global or static to work with the threads.
+ //default security attributes, thread function name, argument to thread function, use default creation flags, returns the thread identifier
+ hThreadArray[i] = CreateThread(NULL, 0, MySeqSumThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]);
+ }
+
+ //Wait until all threads have terminated.
+ WaitForMultipleObjects(processors, hThreadArray, TRUE, INFINITE);
+
+ //Close all thread handles and free memory allocations.
+ for(int i=0; i < pDataArray.size(); i++){
+ num += pDataArray[i]->count;
+ CloseHandle(hThreadArray[i]);
+ delete pDataArray[i];
+ }
+
+ //rename((sumFile + toString(processIDS[0]) + ".temp").c_str(), sumFile.c_str());
+ //append files
+ for(int i=0;i<processIDS.size();i++){
+ m->appendFiles((sumFile + toString(processIDS[i]) + ".temp"), sumFile);
+ m->mothurRemove((sumFile + toString(processIDS[i]) + ".temp"));
+ }
#endif
+ return num;
}
catch(exception& e) {
m->errorOut(e, "SeqSummaryCommand", "createProcessesCreateSummary");