*/
#include "summarysharedcommand.h"
-#include "sharedsobscollectsummary.h"
-#include "sharedchao1.h"
-#include "sharedace.h"
-#include "sharednseqs.h"
-#include "sharedjabund.h"
-#include "sharedsorabund.h"
-#include "sharedjclass.h"
-#include "sharedsorclass.h"
-#include "sharedjest.h"
-#include "sharedsorest.h"
-#include "sharedthetayc.h"
-#include "sharedthetan.h"
-#include "sharedkstest.h"
-#include "whittaker.h"
-#include "sharedochiai.h"
-#include "sharedanderbergs.h"
-#include "sharedkulczynski.h"
-#include "sharedkulczynskicody.h"
-#include "sharedlennon.h"
-#include "sharedmorisitahorn.h"
-#include "sharedbraycurtis.h"
-#include "sharedjackknife.h"
-#include "whittaker.h"
-#include "odum.h"
-#include "canberra.h"
-#include "structeuclidean.h"
-#include "structchord.h"
-#include "hellinger.h"
-#include "manhattan.h"
-#include "structpearson.h"
-#include "soergel.h"
-#include "spearman.h"
-#include "structkulczynski.h"
-#include "structchi2.h"
-#include "speciesprofile.h"
-#include "hamming.h"
-#include "gower.h"
-#include "memchi2.h"
-#include "memchord.h"
-#include "memeuclidean.h"
-#include "mempearson.h"
//**********************************************************************************************************************
vector<string> SummarySharedCommand::setParameters(){
/***********************************************************/
int SummarySharedCommand::process(vector<SharedRAbundVector*> thisLookup, string sumFileName, string sumAllFileName) {
try {
- vector< vector<seqDist> > calcDists; //vector containing vectors that contains the summary results for each group compare
- calcDists.resize(sumCalculators.size()); //one for each calc, this will be used to make .dist files
-
- #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
- if(processors == 1){
- driver(thisLookup, 0, numGroups, sumFileName+".temp", sumAllFileName+".temp", calcDists);
- m->appendFiles((sumFileName + ".temp"), sumFileName);
- m->mothurRemove((sumFileName + ".temp"));
- if (mult) {
- m->appendFiles((sumAllFileName + ".temp"), sumAllFileName);
- m->mothurRemove((sumAllFileName + ".temp"));
- }
- }else{
- int process = 1;
- vector<int> processIDS;
-
- //loop through and create all the processes you want
- while (process != processors) {
- int pid = fork();
-
- if (pid > 0) {
- processIDS.push_back(pid);
- process++;
- }else if (pid == 0){
- driver(thisLookup, lines[process].start, lines[process].end, sumFileName + toString(getpid()) + ".temp", sumAllFileName + toString(getpid()) + ".temp", calcDists);
-
- //only do this if you want a distance file
- if (createPhylip) {
- string tempdistFileName = m->getRootName(m->getSimpleName(sumFileName)) + toString(getpid()) + ".dist";
- ofstream outtemp;
- m->openOutputFile(tempdistFileName, outtemp);
-
- for (int i = 0; i < calcDists.size(); i++) {
- outtemp << calcDists[i].size() << endl;
-
- for (int j = 0; j < calcDists[i].size(); j++) {
- outtemp << calcDists[i][j].seq1 << '\t' << calcDists[i][j].seq2 << '\t' << calcDists[i][j].dist << endl;
- }
- }
- outtemp.close();
- }
-
- exit(0);
- }else {
- m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine();
- for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
- exit(0);
- }
- }
-
- //parent do your part
- driver(thisLookup, lines[0].start, lines[0].end, sumFileName + toString(getpid()) + ".temp", sumAllFileName + toString(getpid()) + ".temp", calcDists);
- m->appendFiles((sumFileName + toString(getpid()) + ".temp"), sumFileName);
- m->mothurRemove((sumFileName + toString(getpid()) + ".temp"));
- if (mult) { m->appendFiles((sumAllFileName + toString(getpid()) + ".temp"), sumAllFileName); }
-
- //force parent to wait until all the processes are done
- for (int i = 0; i < processIDS.size(); i++) {
- int temp = processIDS[i];
- wait(&temp);
- }
-
- for (int i = 0; i < processIDS.size(); i++) {
- m->appendFiles((sumFileName + toString(processIDS[i]) + ".temp"), sumFileName);
- m->mothurRemove((sumFileName + toString(processIDS[i]) + ".temp"));
- if (mult) { m->mothurRemove((sumAllFileName + toString(processIDS[i]) + ".temp")); }
-
- if (createPhylip) {
- string tempdistFileName = m->getRootName(m->getSimpleName(sumFileName)) + toString(processIDS[i]) + ".dist";
- ifstream intemp;
- m->openInputFile(tempdistFileName, intemp);
-
- for (int k = 0; k < calcDists.size(); k++) {
- int size = 0;
- intemp >> size; m->gobble(intemp);
-
- for (int j = 0; j < size; j++) {
- int seq1 = 0;
- int seq2 = 0;
- float dist = 1.0;
-
- intemp >> seq1 >> seq2 >> dist; m->gobble(intemp);
-
- seqDist tempDist(seq1, seq2, dist);
- calcDists[k].push_back(tempDist);
- }
- }
- intemp.close();
- m->mothurRemove(tempdistFileName);
- }
- }
+ vector< vector<seqDist> > calcDists; //vector containing vectors that contains the summary results for each group compare
+ calcDists.resize(sumCalculators.size()); //one for each calc, this will be used to make .dist files
+
+
+ if(processors == 1){
+ driver(thisLookup, 0, numGroups, sumFileName+".temp", sumAllFileName+".temp", calcDists);
+ m->appendFiles((sumFileName + ".temp"), sumFileName);
+ m->mothurRemove((sumFileName + ".temp"));
+ if (mult) {
+ m->appendFiles((sumAllFileName + ".temp"), sumAllFileName);
+ m->mothurRemove((sumAllFileName + ".temp"));
+ }
+ }else{
+
+ int process = 1;
+ vector<int> processIDS;
+
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+ //loop through and create all the processes you want
+ while (process != processors) {
+ int pid = fork();
+
+ if (pid > 0) {
+ processIDS.push_back(pid);
+ process++;
+ }else if (pid == 0){
+ driver(thisLookup, lines[process].start, lines[process].end, sumFileName + toString(getpid()) + ".temp", sumAllFileName + toString(getpid()) + ".temp", calcDists);
+
+ //only do this if you want a distance file
+ if (createPhylip) {
+ string tempdistFileName = m->getRootName(m->getSimpleName(sumFileName)) + toString(getpid()) + ".dist";
+ ofstream outtemp;
+ m->openOutputFile(tempdistFileName, outtemp);
+
+ for (int i = 0; i < calcDists.size(); i++) {
+ outtemp << calcDists[i].size() << endl;
+
+ for (int j = 0; j < calcDists[i].size(); j++) {
+ outtemp << calcDists[i][j].seq1 << '\t' << calcDists[i][j].seq2 << '\t' << calcDists[i][j].dist << endl;
+ }
+ }
+ outtemp.close();
+ }
+
+ exit(0);
+ }else {
+ m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine();
+ for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
+ exit(0);
+ }
+ }
+
+ //parent do your part
+ driver(thisLookup, lines[0].start, lines[0].end, sumFileName + toString(getpid()) + ".temp", sumAllFileName + toString(getpid()) + ".temp", calcDists);
+ m->appendFiles((sumFileName + toString(getpid()) + ".temp"), sumFileName);
+ m->mothurRemove((sumFileName + toString(getpid()) + ".temp"));
+ if (mult) { m->appendFiles((sumAllFileName + toString(getpid()) + ".temp"), sumAllFileName); }
+
+ //force parent to wait until all the processes are done
+ for (int i = 0; i < processIDS.size(); i++) {
+ int temp = processIDS[i];
+ wait(&temp);
+ }
+
+ for (int i = 0; i < processIDS.size(); i++) {
+ m->appendFiles((sumFileName + toString(processIDS[i]) + ".temp"), sumFileName);
+ m->mothurRemove((sumFileName + toString(processIDS[i]) + ".temp"));
+ if (mult) { m->mothurRemove((sumAllFileName + toString(processIDS[i]) + ".temp")); }
+
+ if (createPhylip) {
+ string tempdistFileName = m->getRootName(m->getSimpleName(sumFileName)) + toString(processIDS[i]) + ".dist";
+ ifstream intemp;
+ m->openInputFile(tempdistFileName, intemp);
+
+ for (int k = 0; k < calcDists.size(); k++) {
+ int size = 0;
+ intemp >> size; m->gobble(intemp);
+
+ for (int j = 0; j < size; j++) {
+ int seq1 = 0;
+ int seq2 = 0;
+ float dist = 1.0;
+
+ intemp >> seq1 >> seq2 >> dist; m->gobble(intemp);
+
+ seqDist tempDist(seq1, seq2, dist);
+ calcDists[k].push_back(tempDist);
+ }
+ }
+ intemp.close();
+ m->mothurRemove(tempdistFileName);
+ }
+ }
+#else
+ //////////////////////////////////////////////////////////////////////////////////////////////////////
+ //Windows version shared memory, so be careful when passing variables through the summarySharedData struct.
+ //Above fork() will clone, so memory is separate, but that's not the case with windows,
+ //Taking advantage of shared memory to pass results vectors.
+ //////////////////////////////////////////////////////////////////////////////////////////////////////
- }
- #else
- driver(thisLookup, 0, numGroups, (sumFileName + ".temp"), (sumAllFileName + ".temp"), calcDists);
- m->appendFiles((sumFileName + ".temp"), sumFileName);
- m->mothurRemove((sumFileName + ".temp"));
- if (mult) {
- m->appendFiles((sumAllFileName + ".temp"), sumAllFileName);
- m->mothurRemove((sumAllFileName + ".temp"));
- }
- #endif
-
- if (createPhylip) {
- for (int i = 0; i < calcDists.size(); i++) {
- if (m->control_pressed) { break; }
+ vector<summarySharedData*> pDataArray;
+ DWORD dwThreadIdArray[processors-1];
+ HANDLE hThreadArray[processors-1];
+
+ //Create processor worker threads.
+ for( int i=1; i<processors; i++ ){
+
+ //make copy of lookup so we don't get access violations
+ vector<SharedRAbundVector*> newLookup;
+ for (int k = 0; k < thisLookup.size(); k++) {
+ SharedRAbundVector* temp = new SharedRAbundVector();
+ temp->setLabel(thisLookup[k]->getLabel());
+ temp->setGroup(thisLookup[k]->getGroup());
+ newLookup.push_back(temp);
+ }
+
+ //for each bin
+ for (int k = 0; k < thisLookup[0]->getNumBins(); k++) {
+ if (m->control_pressed) { for (int j = 0; j < newLookup.size(); j++) { delete newLookup[j]; } return 0; }
+ for (int j = 0; j < thisLookup.size(); j++) { newLookup[j]->push_back(thisLookup[j]->getAbundance(k), thisLookup[j]->getGroup()); }
+ }
+
+ // Allocate memory for thread data.
+ summarySharedData* tempSum = new summarySharedData((sumFileName+toString(i)+".temp"), m, lines[i].start, lines[i].end, Estimators, newLookup);
+ pDataArray.push_back(tempSum);
+ processIDS.push_back(i);
+
+ hThreadArray[i-1] = CreateThread(NULL, 0, MySummarySharedThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]);
+ }
+
+ //parent do your part
+ driver(thisLookup, lines[0].start, lines[0].end, sumFileName +"0.temp", sumAllFileName + "0.temp", calcDists);
+ m->appendFiles((sumFileName + "0.temp"), sumFileName);
+ m->mothurRemove((sumFileName + "0.temp"));
+ if (mult) { m->appendFiles((sumAllFileName + "0.temp"), sumAllFileName); }
+
+ //Wait until all threads have terminated.
+ WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+
+ //Close all thread handles and free memory allocations.
+ for(int i=0; i < pDataArray.size(); i++){
+ m->appendFiles((sumFileName + toString(processIDS[i]) + ".temp"), sumFileName);
+ m->mothurRemove((sumFileName + toString(processIDS[i]) + ".temp"));
+
+ for (int j = 0; j < pDataArray[i]->thisLookup.size(); j++) { delete pDataArray[i]->thisLookup[j]; }
+
+ if (createPhylip) {
+ for (int k = 0; k < calcDists.size(); k++) {
+ int size = pDataArray[i]->calcDists[k].size();
+ for (int j = 0; j < size; j++) { calcDists[k].push_back(pDataArray[i]->calcDists[k][j]); }
+ }
+ }
+
+ CloseHandle(hThreadArray[i]);
+ delete pDataArray[i];
+ }
+
+#endif
+ }
+
+ if (createPhylip) {
+ for (int i = 0; i < calcDists.size(); i++) {
+ if (m->control_pressed) { break; }
- string distFileName = outputDir + m->getRootName(m->getSimpleName(sumFileName)) + sumCalculators[i]->getName() + "." + thisLookup[0]->getLabel() + ".dist";
- outputNames.push_back(distFileName);
- ofstream outDist;
- m->openOutputFile(distFileName, outDist);
- outDist.setf(ios::fixed, ios::floatfield); outDist.setf(ios::showpoint);
-
- //initialize matrix
- vector< vector<float> > matrix; //square matrix to represent the distance
- matrix.resize(thisLookup.size());
- for (int k = 0; k < thisLookup.size(); k++) { matrix[k].resize(thisLookup.size(), 0.0); }
-
-
- for (int j = 0; j < calcDists[i].size(); j++) {
- int row = calcDists[i][j].seq1;
- int column = calcDists[i][j].seq2;
- float dist = calcDists[i][j].dist;
-
- matrix[row][column] = dist;
- matrix[column][row] = dist;
- }
+ string distFileName = outputDir + m->getRootName(m->getSimpleName(sumFileName)) + sumCalculators[i]->getName() + "." + thisLookup[0]->getLabel() + ".dist";
+ outputNames.push_back(distFileName);
+ ofstream outDist;
+ m->openOutputFile(distFileName, outDist);
+ outDist.setf(ios::fixed, ios::floatfield); outDist.setf(ios::showpoint);
+
+ //initialize matrix
+ vector< vector<float> > matrix; //square matrix to represent the distance
+ matrix.resize(thisLookup.size());
+ for (int k = 0; k < thisLookup.size(); k++) { matrix[k].resize(thisLookup.size(), 0.0); }
+
+
+ for (int j = 0; j < calcDists[i].size(); j++) {
+ int row = calcDists[i][j].seq1;
+ int column = calcDists[i][j].seq2;
+ float dist = calcDists[i][j].dist;
+
+ matrix[row][column] = dist;
+ matrix[column][row] = dist;
+ }
+
+ //output to file
+ outDist << thisLookup.size() << endl;
+ for (int r=0; r<thisLookup.size(); r++) {
+ //output name
+ string name = thisLookup[r]->getGroup();
+ if (name.length() < 10) { //pad with spaces to make compatible
+ while (name.length() < 10) { name += " "; }
+ }
+ outDist << name << '\t';
- //output to file
- outDist << thisLookup.size() << endl;
- for (int r=0; r<thisLookup.size(); r++) {
- //output name
- string name = thisLookup[r]->getGroup();
- if (name.length() < 10) { //pad with spaces to make compatible
- while (name.length() < 10) { name += " "; }
- }
- outDist << name << '\t';
-
- //output distances
- for (int l = 0; l < r; l++) { outDist << matrix[r][l] << '\t'; }
- outDist << endl;
- }
-
- outDist.close();
- }
- }
+ //output distances
+ for (int l = 0; l < r; l++) { outDist << matrix[r][l] << '\t'; }
+ outDist << endl;
+ }
+
+ outDist.close();
+ }
+ }
return 0;
}
catch(exception& e) {