indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap);
- pValues = getPValues(groupings, randomGroupingsMap, lookup.size(), indicatorValues);
+ pValues = getPValues(groupings, lookup.size(), indicatorValues);
}else {
vector< vector<SharedRAbundFloatVector*> > groupings;
set<string> groupsAlreadyAdded;
indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap);
- pValues = getPValues(groupings, randomGroupingsMap, lookupFloat.size(), indicatorValues);
+ pValues = getPValues(groupings, lookupFloat.size(), indicatorValues);
}
if (m->control_pressed) { out.close(); return 0; }
indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap);
- pValues = getPValues(groupings, randomGroupingsMap, lookup.size(), indicatorValues);
+ pValues = getPValues(groupings, lookup.size(), indicatorValues);
}else {
vector< vector<SharedRAbundFloatVector*> > groupings;
indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap);
- pValues = getPValues(groupings, randomGroupingsMap, lookupFloat.size(), indicatorValues);
+ pValues = getPValues(groupings, lookupFloat.size(), indicatorValues);
}
if (m->control_pressed) { out.close(); return 0; }
}
}
//**********************************************************************************************************************
-vector<float> IndicatorCommand::driver(vector< vector<SharedRAbundFloatVector*> >& groupings, map< vector<int>, vector<int> > groupingsMap, int num, vector<float> indicatorValues, int numIters){
+vector<float> IndicatorCommand::driver(vector< vector<SharedRAbundFloatVector*> >& groupings, int num, vector<float> indicatorValues, int numIters){
try {
vector<float> pvalues;
pvalues.resize(indicatorValues.size(), 0);
for(int i=0;i<numIters;i++){
if (m->control_pressed) { break; }
- groupingsMap = randomizeGroupings(groupings, num);
+ map< vector<int>, vector<int> > groupingsMap = randomizeGroupings(groupings, num);
vector<float> randomIndicatorValues = getValues(groupings, notUsedGroupings, groupingsMap);
for (int j = 0; j < indicatorValues.size(); j++) {
}
}
//**********************************************************************************************************************
-vector<float> IndicatorCommand::getPValues(vector< vector<SharedRAbundFloatVector*> >& groupings, map< vector<int>, vector<int> > groupingsMap, int num, vector<float> indicatorValues){
+vector<float> IndicatorCommand::getPValues(vector< vector<SharedRAbundFloatVector*> >& groupings, int num, vector<float> indicatorValues){
try {
vector<float> pvalues;
-
-#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+
if(processors == 1){
- pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters);
- for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
+ pvalues = driver(groupings, num, indicatorValues, iters);
+ for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
}else{
+ //divide iters between processors
+ vector<int> procIters;
+ int numItersPerProcessor = iters / processors;
+
+ //divide iters between processes
+ for (int h = 0; h < processors; h++) {
+ if(h == processors - 1){ numItersPerProcessor = iters - h * numItersPerProcessor; }
+ procIters.push_back(numItersPerProcessor);
+ }
+
+ vector<int> processIDS;
+ int process = 1;
- //divide iters between processors
- int numItersPerProcessor = iters / processors;
-
- vector<int> processIDS;
- int process = 1;
- int num = 0;
-
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+
//loop through and create all the processes you want
while (process != processors) {
int pid = fork();
processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later
process++;
}else if (pid == 0){
- pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor);
+ pvalues = driver(groupings, num, indicatorValues, procIters[process]);
//pass pvalues to parent
ofstream out;
}
//do my part
- //special case for last processor in case it doesn't divide evenly
- numItersPerProcessor = iters - ((processors-1) * numItersPerProcessor);
- pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor);
+ pvalues = driver(groupings, num, indicatorValues, procIters[0]);
//force parent to wait until all the processes are done
for (int i=0;i<processIDS.size();i++) {
}
in.close(); m->mothurRemove(tempFile);
}
- for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
- }
+ for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
#else
- pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters);
- for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
+
+ //fill in functions
+ vector<indicatorData*> pDataArray;
+ DWORD dwThreadIdArray[processors-1];
+ HANDLE hThreadArray[processors-1];
+
+ //Create processor worker threads.
+ for( int i=1; i<processors; i++ ){
+
+ //make copy of lookup so we don't get access violations
+ vector< vector<SharedRAbundFloatVector*> > newGroupings;
+
+ for (int k = 0; k < groupings.size(); k++) {
+ vector<SharedRAbundFloatVector*> newLookup;
+ for (int l = 0; l < groupings[k].size(); l++) {
+ SharedRAbundFloatVector* temp = new SharedRAbundFloatVector();
+ temp->setLabel(groupings[k][l]->getLabel());
+ temp->setGroup(groupings[k][l]->getGroup());
+ newLookup.push_back(temp);
+ }
+ newGroupings.push_back(newLookup);
+ }
+
+ //for each bin
+ for (int l = 0; l < groupings.size(); l++) {
+ for (int k = 0; k < groupings[l][0]->getNumBins(); k++) {
+ if (m->control_pressed) { for (int j = 0; j < newGroupings.size(); j++) { for (int u = 0; u < newGroupings[j].size(); u++) { delete newGroupings[j][u]; } } return pvalues; }
+
+ for (int j = 0; j < groupings[l].size(); j++) { newGroupings[l][j]->push_back(groupings[l][j]->getAbundance(k), groupings[l][j]->getGroup()); }
+ }
+ }
+
+ vector<float> copyIValues = indicatorValues;
+
+ indicatorData* temp = new indicatorData(m, procIters[i], newGroupings, num, copyIValues);
+ pDataArray.push_back(temp);
+ processIDS.push_back(i);
+
+ hThreadArray[i-1] = CreateThread(NULL, 0, MyIndicatorThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]);
+ }
+
+ //do my part
+ pvalues = driver(groupings, num, indicatorValues, procIters[0]);
+
+ //Wait until all threads have terminated.
+ WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+
+ //Close all thread handles and free memory allocations.
+ for(int i=0; i < pDataArray.size(); i++){
+ for (int j = 0; j < pDataArray[i]->pvalues.size(); j++) { pvalues[j] += pDataArray[i]->pvalues[j]; }
+
+ for (int l = 0; l < pDataArray[i]->groupings.size(); l++) {
+ for (int j = 0; j < pDataArray[i]->groupings[l].size(); j++) { delete pDataArray[i]->groupings[l][j]; }
+ }
+
+ CloseHandle(hThreadArray[i]);
+ delete pDataArray[i];
+ }
+
+ for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
#endif
+ }
+
return pvalues;
}
//**********************************************************************************************************************
//same as above, just data type difference
-vector<float> IndicatorCommand::driver(vector< vector<SharedRAbundVector*> >& groupings, map< vector<int>, vector<int> > groupingsMap, int num, vector<float> indicatorValues, int numIters){
+vector<float> IndicatorCommand::driver(vector< vector<SharedRAbundVector*> >& groupings, int num, vector<float> indicatorValues, int numIters){
try {
vector<float> pvalues;
pvalues.resize(indicatorValues.size(), 0);
for(int i=0;i<numIters;i++){
if (m->control_pressed) { break; }
- groupingsMap = randomizeGroupings(groupings, num);
+ map< vector<int>, vector<int> > groupingsMap = randomizeGroupings(groupings, num);
vector<float> randomIndicatorValues = getValues(groupings, notUsedGroupings, groupingsMap);
for (int j = 0; j < indicatorValues.size(); j++) {
}
//**********************************************************************************************************************
//same as above, just data type difference
-vector<float> IndicatorCommand::getPValues(vector< vector<SharedRAbundVector*> >& groupings, map< vector<int>, vector<int> > groupingsMap, int num, vector<float> indicatorValues){
+vector<float> IndicatorCommand::getPValues(vector< vector<SharedRAbundVector*> >& groupings, int num, vector<float> indicatorValues){
try {
vector<float> pvalues;
-
-#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+
if(processors == 1){
- pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters);
- for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
+ pvalues = driver(groupings, num, indicatorValues, iters);
+ for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
}else{
+ //divide iters between processors
+ vector<int> procIters;
+ int numItersPerProcessor = iters / processors;
+
+ //divide iters between processes
+ for (int h = 0; h < processors; h++) {
+ if(h == processors - 1){ numItersPerProcessor = iters - h * numItersPerProcessor; }
+ procIters.push_back(numItersPerProcessor);
+ }
+
+ vector<int> processIDS;
+ int process = 1;
- //divide iters between processors
- int numItersPerProcessor = iters / processors;
-
- vector<int> processIDS;
- int process = 1;
-
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+
//loop through and create all the processes you want
while (process != processors) {
int pid = fork();
processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later
process++;
}else if (pid == 0){
- pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor);
+ pvalues = driver(groupings, num, indicatorValues, procIters[process]);
//pass pvalues to parent
ofstream out;
out.close();
exit(0);
- }else {
- m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine();
+ }else {
+ m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine();
for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
exit(0);
}
}
//do my part
- //special case for last processor in case it doesn't divide evenly
- numItersPerProcessor = iters - ((processors-1) * numItersPerProcessor);
- pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor);
+ pvalues = driver(groupings, num, indicatorValues, procIters[0]);
//force parent to wait until all the processes are done
- for (int i=0;i<processIDS.size();i++) {
+ for (int i=0;i<processIDS.size();i++) {
int temp = processIDS[i];
wait(&temp);
}
- //combine results
+ //combine results
for (int i = 0; i < processIDS.size(); i++) {
ifstream in;
string tempFile = toString(processIDS[i]) + ".pvalues.temp";
m->openInputFile(tempFile, in);
////// to do ///////////
- int numTemp; numTemp = 0;
+ int numTemp; numTemp = 0;
for (int j = 0; j < pvalues.size(); j++) {
in >> numTemp; m->gobble(in);
pvalues[j] += numTemp;
}
in.close(); m->mothurRemove(tempFile);
}
- for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
- }
+ for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
#else
- pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters);
- for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
+
+ //fill in functions
+ vector<indicatorData*> pDataArray;
+ DWORD dwThreadIdArray[processors-1];
+ HANDLE hThreadArray[processors-1];
+
+ //Create processor worker threads.
+ for( int i=1; i<processors; i++ ){
+
+ //make copy of lookup so we don't get access violations
+ vector< vector<SharedRAbundFloatVector*> > newGroupings;
+
+ for (int k = 0; k < groupings.size(); k++) {
+ vector<SharedRAbundFloatVector*> newLookup;
+ for (int l = 0; l < groupings[k].size(); l++) {
+ SharedRAbundFloatVector* temp = new SharedRAbundFloatVector();
+ temp->setLabel(groupings[k][l]->getLabel());
+ temp->setGroup(groupings[k][l]->getGroup());
+ newLookup.push_back(temp);
+ }
+ newGroupings.push_back(newLookup);
+ }
+
+ //for each bin
+ for (int l = 0; l < groupings.size(); l++) {
+ for (int k = 0; k < groupings[l][0]->getNumBins(); k++) {
+ if (m->control_pressed) { for (int j = 0; j < newGroupings.size(); j++) { for (int u = 0; u < newGroupings[j].size(); u++) { delete newGroupings[j][u]; } } return pvalues; }
+
+ for (int j = 0; j < groupings[l].size(); j++) { newGroupings[l][j]->push_back((float)(groupings[l][j]->getAbundance(k)), groupings[l][j]->getGroup()); }
+ }
+ }
+
+ vector<float> copyIValues = indicatorValues;
+
+ indicatorData* temp = new indicatorData(m, procIters[i], newGroupings, num, copyIValues);
+ pDataArray.push_back(temp);
+ processIDS.push_back(i);
+
+ hThreadArray[i-1] = CreateThread(NULL, 0, MyIndicatorThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]);
+ }
+
+ //do my part
+ pvalues = driver(groupings, num, indicatorValues, procIters[0]);
+
+ //Wait until all threads have terminated.
+ WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+
+ //Close all thread handles and free memory allocations.
+ for(int i=0; i < pDataArray.size(); i++){
+ for (int j = 0; j < pDataArray[i]->pvalues.size(); j++) { pvalues[j] += pDataArray[i]->pvalues[j]; }
+
+ for (int l = 0; l < pDataArray[i]->groupings.size(); l++) {
+ for (int j = 0; j < pDataArray[i]->groupings[l].size(); j++) { delete pDataArray[i]->groupings[l][j]; }
+ }
+
+ CloseHandle(hThreadArray[i]);
+ delete pDataArray[i];
+ }
+
+ for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
#endif
+ }
+
return pvalues;
}
catch(exception& e) {
- m->errorOut(e, "IndicatorCommand", "getPValues");
+ m->errorOut(e, "IndicatorCommand", "getPValues");
exit(1);
}
}