+ if (m->control_pressed) {
+ if (mult) { m->mothurRemove(outAllFileName); }
+ m->mothurRemove(outputFileName);
+ delete input;
+ for (int i = 0; i < lookup.size(); i++) { delete lookup[i]; }
+ for(int i=0;i<sumCalculators.size();i++){ delete sumCalculators[i]; }
+ m->clearGroups();
+ return 0;
+ }
+ /******************************************************/
+
+
+ /******************************************************/
+ //comparison breakup to be used by different processes later
+ numGroups = m->getNumGroups();
+ lines.resize(processors);
+ for (int i = 0; i < processors; i++) {
+ lines[i].start = int (sqrt(float(i)/float(processors)) * numGroups);
+ lines[i].end = int (sqrt(float(i+1)/float(processors)) * numGroups);
+ }
+ /******************************************************/
+
+ //if the users enters label "0.06" and there is no "0.06" in their file use the next lowest label.
+ set<string> processedLabels;
+ set<string> userLabels = labels;
+
+ //as long as you are not at the end of the file or done wih the lines you want
+ while((lookup[0] != NULL) && ((allLines == 1) || (userLabels.size() != 0))) {
+ if (m->control_pressed) {
+ if (mult) { m->mothurRemove(outAllFileName); }
+ m->mothurRemove(outputFileName);
+ delete input;
+ for (int i = 0; i < lookup.size(); i++) { delete lookup[i]; }
+ for(int i=0;i<sumCalculators.size();i++){ delete sumCalculators[i]; }
+ m->clearGroups();
+ return 0;
+ }
+
+
+ if(allLines == 1 || labels.count(lookup[0]->getLabel()) == 1){
+ m->mothurOut(lookup[0]->getLabel()); m->mothurOutEndLine();
+ process(lookup, outputFileName, outAllFileName);
+
+ processedLabels.insert(lookup[0]->getLabel());
+ userLabels.erase(lookup[0]->getLabel());
+ }
+
+ if ((m->anyLabelsToProcess(lookup[0]->getLabel(), userLabels, "") == true) && (processedLabels.count(lastLabel) != 1)) {
+ string saveLabel = lookup[0]->getLabel();
+
+ for (int i = 0; i < lookup.size(); i++) { delete lookup[i]; }
+ lookup = input->getSharedRAbundVectors(lastLabel);
+
+ m->mothurOut(lookup[0]->getLabel()); m->mothurOutEndLine();
+ process(lookup, outputFileName, outAllFileName);
+
+ processedLabels.insert(lookup[0]->getLabel());
+ userLabels.erase(lookup[0]->getLabel());
+
+ //restore real lastlabel to save below
+ lookup[0]->setLabel(saveLabel);
+ }
+
+ lastLabel = lookup[0]->getLabel();
+
+ //get next line to process
+ //prevent memory leak
+ for (int i = 0; i < lookup.size(); i++) { delete lookup[i]; }
+ lookup = input->getSharedRAbundVectors();
+ }
+
+ if (m->control_pressed) {
+ if (mult) { m->mothurRemove(outAllFileName); }
+ m->mothurRemove(outputFileName);
+ delete input;
+ for(int i=0;i<sumCalculators.size();i++){ delete sumCalculators[i]; }
+ m->clearGroups();
+ return 0;
+ }
+
+ //output error messages about any remaining user labels
+ set<string>::iterator it;
+ bool needToRun = false;
+ for (it = userLabels.begin(); it != userLabels.end(); it++) {
+ m->mothurOut("Your file does not include the label " + *it);
+ if (processedLabels.count(lastLabel) != 1) {
+ m->mothurOut(". I will use " + lastLabel + "."); m->mothurOutEndLine();
+ needToRun = true;
+ }else {
+ m->mothurOut(". Please refer to " + lastLabel + "."); m->mothurOutEndLine();
+ }
+ }
+
+ //run last label if you need to
+ if (needToRun == true) {
+ for (int i = 0; i < lookup.size(); i++) { if (lookup[i] != NULL) { delete lookup[i]; } }
+ lookup = input->getSharedRAbundVectors(lastLabel);
+
+ m->mothurOut(lookup[0]->getLabel()); m->mothurOutEndLine();
+ process(lookup, outputFileName, outAllFileName);
+ for (int i = 0; i < lookup.size(); i++) { delete lookup[i]; }
+ }
+
+
+ //reset groups parameter
+ m->clearGroups();
+
+ for(int i=0;i<sumCalculators.size();i++){ delete sumCalculators[i]; }
+ delete input;
+
+ if (m->control_pressed) {
+ m->mothurRemove(outAllFileName);
+ m->mothurRemove(outputFileName);
+ return 0;
+ }
+
+ m->mothurOutEndLine();
+ m->mothurOut("Output File Names: "); m->mothurOutEndLine();
+ m->mothurOut(outputFileName); m->mothurOutEndLine();
+ if (mult) { m->mothurOut(outAllFileName); m->mothurOutEndLine(); outputTypes["summary"].push_back(outAllFileName); }
+ for (int i = 0; i < outputNames.size(); i++) { m->mothurOut(outputNames[i]); m->mothurOutEndLine(); } outputTypes["summary"].push_back(outputFileName);
+ m->mothurOutEndLine();
+
+ return 0;
+ }
+ catch(exception& e) {
+ m->errorOut(e, "SummarySharedCommand", "execute");
+ exit(1);
+ }
+}
+
+/***********************************************************/
+int SummarySharedCommand::process(vector<SharedRAbundVector*> thisLookup, string sumFileName, string sumAllFileName) {
+ try {
+ vector< vector<seqDist> > calcDists; //vector containing vectors that contains the summary results for each group compare
+ calcDists.resize(sumCalculators.size()); //one for each calc, this will be used to make .dist files
+
+
+ if(processors == 1){
+ driver(thisLookup, 0, numGroups, sumFileName+".temp", sumAllFileName+".temp", calcDists);
+ m->appendFiles((sumFileName + ".temp"), sumFileName);
+ m->mothurRemove((sumFileName + ".temp"));
+ if (mult) {
+ m->appendFiles((sumAllFileName + ".temp"), sumAllFileName);
+ m->mothurRemove((sumAllFileName + ".temp"));
+ }
+ }else{
+
+ int process = 1;
+ vector<int> processIDS;
+
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+ //loop through and create all the processes you want
+ while (process != processors) {
+ int pid = fork();
+
+ if (pid > 0) {
+ processIDS.push_back(pid);
+ process++;
+ }else if (pid == 0){
+ driver(thisLookup, lines[process].start, lines[process].end, sumFileName + toString(getpid()) + ".temp", sumAllFileName + toString(getpid()) + ".temp", calcDists);
+
+ //only do this if you want a distance file
+ if (createPhylip) {
+ string tempdistFileName = m->getRootName(m->getSimpleName(sumFileName)) + toString(getpid()) + ".dist";
+ ofstream outtemp;
+ m->openOutputFile(tempdistFileName, outtemp);
+
+ for (int i = 0; i < calcDists.size(); i++) {
+ outtemp << calcDists[i].size() << endl;
+
+ for (int j = 0; j < calcDists[i].size(); j++) {
+ outtemp << calcDists[i][j].seq1 << '\t' << calcDists[i][j].seq2 << '\t' << calcDists[i][j].dist << endl;
+ }
+ }
+ outtemp.close();
+ }
+
+ exit(0);
+ }else {
+ m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine();
+ for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
+ exit(0);
+ }
+ }
+
+ //parent do your part
+ driver(thisLookup, lines[0].start, lines[0].end, sumFileName + toString(getpid()) + ".temp", sumAllFileName + toString(getpid()) + ".temp", calcDists);
+ m->appendFiles((sumFileName + toString(getpid()) + ".temp"), sumFileName);
+ m->mothurRemove((sumFileName + toString(getpid()) + ".temp"));
+ if (mult) { m->appendFiles((sumAllFileName + toString(getpid()) + ".temp"), sumAllFileName); }
+
+ //force parent to wait until all the processes are done
+ for (int i = 0; i < processIDS.size(); i++) {
+ int temp = processIDS[i];
+ wait(&temp);
+ }
+
+ for (int i = 0; i < processIDS.size(); i++) {
+ m->appendFiles((sumFileName + toString(processIDS[i]) + ".temp"), sumFileName);
+ m->mothurRemove((sumFileName + toString(processIDS[i]) + ".temp"));
+ if (mult) { m->mothurRemove((sumAllFileName + toString(processIDS[i]) + ".temp")); }
+
+ if (createPhylip) {
+ string tempdistFileName = m->getRootName(m->getSimpleName(sumFileName)) + toString(processIDS[i]) + ".dist";
+ ifstream intemp;
+ m->openInputFile(tempdistFileName, intemp);
+
+ for (int k = 0; k < calcDists.size(); k++) {
+ int size = 0;
+ intemp >> size; m->gobble(intemp);
+
+ for (int j = 0; j < size; j++) {
+ int seq1 = 0;
+ int seq2 = 0;
+ float dist = 1.0;
+
+ intemp >> seq1 >> seq2 >> dist; m->gobble(intemp);
+
+ seqDist tempDist(seq1, seq2, dist);
+ calcDists[k].push_back(tempDist);
+ }
+ }
+ intemp.close();
+ m->mothurRemove(tempdistFileName);
+ }
+ }
+#else
+ //////////////////////////////////////////////////////////////////////////////////////////////////////
+ //Windows version shared memory, so be careful when passing variables through the summarySharedData struct.
+ //Above fork() will clone, so memory is separate, but that's not the case with windows,
+ //Taking advantage of shared memory to pass results vectors.
+ //////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ vector<summarySharedData*> pDataArray;
+ DWORD dwThreadIdArray[processors-1];
+ HANDLE hThreadArray[processors-1];
+
+ //Create processor worker threads.
+ for( int i=1; i<processors; i++ ){
+
+ //make copy of lookup so we don't get access violations
+ vector<SharedRAbundVector*> newLookup;
+ for (int k = 0; k < thisLookup.size(); k++) {
+ SharedRAbundVector* temp = new SharedRAbundVector();
+ temp->setLabel(thisLookup[k]->getLabel());
+ temp->setGroup(thisLookup[k]->getGroup());
+ newLookup.push_back(temp);
+ }
+
+ //for each bin
+ for (int k = 0; k < thisLookup[0]->getNumBins(); k++) {
+ if (m->control_pressed) { for (int j = 0; j < newLookup.size(); j++) { delete newLookup[j]; } return 0; }
+ for (int j = 0; j < thisLookup.size(); j++) { newLookup[j]->push_back(thisLookup[j]->getAbundance(k), thisLookup[j]->getGroup()); }
+ }
+
+ // Allocate memory for thread data.
+ summarySharedData* tempSum = new summarySharedData((sumFileName+toString(i)+".temp"), m, lines[i].start, lines[i].end, Estimators, newLookup);
+ pDataArray.push_back(tempSum);
+ processIDS.push_back(i);
+
+ hThreadArray[i-1] = CreateThread(NULL, 0, MySummarySharedThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]);
+ }
+
+ //parent do your part
+ driver(thisLookup, lines[0].start, lines[0].end, sumFileName +"0.temp", sumAllFileName + "0.temp", calcDists);
+ m->appendFiles((sumFileName + "0.temp"), sumFileName);
+ m->mothurRemove((sumFileName + "0.temp"));
+ if (mult) { m->appendFiles((sumAllFileName + "0.temp"), sumAllFileName); }
+
+ //Wait until all threads have terminated.
+ WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+
+ //Close all thread handles and free memory allocations.
+ for(int i=0; i < pDataArray.size(); i++){
+ m->appendFiles((sumFileName + toString(processIDS[i]) + ".temp"), sumFileName);
+ m->mothurRemove((sumFileName + toString(processIDS[i]) + ".temp"));
+
+ for (int j = 0; j < pDataArray[i]->thisLookup.size(); j++) { delete pDataArray[i]->thisLookup[j]; }
+
+ if (createPhylip) {
+ for (int k = 0; k < calcDists.size(); k++) {
+ int size = pDataArray[i]->calcDists[k].size();
+ for (int j = 0; j < size; j++) { calcDists[k].push_back(pDataArray[i]->calcDists[k][j]); }
+ }
+ }
+
+ CloseHandle(hThreadArray[i]);
+ delete pDataArray[i];
+ }
+
+#endif
+ }
+
+ if (createPhylip) {
+ for (int i = 0; i < calcDists.size(); i++) {
+ if (m->control_pressed) { break; }