+
+ //divide the partitions between the processors
+ vector< vector<int> > dividedPartitions;
+ vector< vector<string> > rels, matrix;
+ vector<string> doneFlags;
+ dividedPartitions.resize(processors);
+ rels.resize(processors);
+ matrix.resize(processors);
+
+ //for each file group figure out which process will complete it
+ //want to divide the load intelligently so the big files are spread between processes
+ for (int i=1; i<=maxpartitions; i++) {
+ //cout << i << endl;
+ //round-robin: partition i is assigned to ((i+1) mod processors), mapped to 1..processors
+ int processToAssign = (i+1) % processors;
+ if (processToAssign == 0) { processToAssign = processors; }
+
+ if (m->debug) { m->mothurOut("[DEBUG]: assigning " + toString(i) + " to process " + toString(processToAssign-1) + "\n"); }
+ dividedPartitions[(processToAssign-1)].push_back(i);
+
+ //precompute the per-partition relabund/matrix output file names for the assigned process
+ variables["[tag]"] = toString(i);
+ string relName = getOutputFileName("relabund", variables);
+ string mName = getOutputFileName("matrix", variables);
+ rels[(processToAssign-1)].push_back(relName);
+ matrix[(processToAssign-1)].push_back(mName);
+ }
+
+ //create one empty "<rank>.done.temp" flag file per process; these are passed to
+ //processDriver so processes can coordinate (removed again after the join below)
+ for (int i = 0; i < processors; i++) { //read from everyone elses, just write to yours
+ string tempDoneFile = toString(i) + ".done.temp";
+ doneFlags.push_back(tempDoneFile);
+ ofstream out;
+ m->openOutputFile(tempDoneFile, out); //clear out
+ out.close();
+ }
+
+
+ //NOTE(review): terms after the first use bare macros without defined(); this works only
+ //because undefined identifiers evaluate to 0 in #if — prefer defined(...) on each term.
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+
+ //loop through and create all the processes you want
+ while (process != processors) {
+ int pid = fork();
+
+ if (pid > 0) {
+ processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later
+ process++;
+ }else if (pid == 0){
+ //child: run its share of partitions, then hand results back to the parent
+ //through a "<pid>.outputNames.temp" file before exiting
+ outputNames.clear();
+ num = processDriver(thislookup, dividedPartitions[process], (outputFileName + toString(getpid())), rels[process], matrix[process], doneFlags, process);
+
+ //pass numSeqs to parent
+ ofstream out;
+ string tempFile = toString(getpid()) + ".outputNames.temp";
+ m->openOutputFile(tempFile, out);
+ out << num << endl;
+ out << outputNames.size() << endl;
+ for (int i = 0; i < outputNames.size(); i++) { out << outputNames[i] << endl; }
+ out.close();
+
+ exit(0);
+ }else {
+ //fork failed: signal the children already spawned and bail out
+ //NOTE(review): exits with status 0 on an error path — confirm this is intended
+ m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine();
+ for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
+ exit(0);
+ }
+ }
+
+ //do my part
+ minPartition = processDriver(thislookup, dividedPartitions[0], outputFileName, rels[0], matrix[0], doneFlags, 0);
+
+ //force parent to wait until all the processes are done
+ //NOTE(review): wait(&temp) reaps ANY child (not specifically processIDS[i]) and
+ //overwrites temp with the exit status; looping processIDS.size() times still reaps
+ //every child, but waitpid(processIDS[i], &status, 0) would state the intent clearly.
+ for (int i=0;i<processIDS.size();i++) {
+ int temp = processIDS[i];
+ wait(&temp);
+ }
+
+ //gather each child's results: its minimum partition, its output-file names, and its
+ //chunk of the fit file (appended without a duplicate header, then removed)
+ vector<string> tempOutputNames = outputNames;
+ for (int i=0;i<processIDS.size();i++) {
+ ifstream in;
+ string tempFile = toString(processIDS[i]) + ".outputNames.temp";
+ m->openInputFile(tempFile, in);
+ if (!in.eof()) {
+ int tempNum = 0;
+ in >> tempNum; m->gobble(in);
+ if (tempNum < minPartition) { minPartition = tempNum; }
+ in >> tempNum; m->gobble(in);
+ //NOTE(review): inner loop variable i shadows the outer i; legal (separate scope)
+ //but easy to misread — consider renaming to j.
+ for (int i = 0; i < tempNum; i++) {
+ string tempName = "";
+ in >> tempName; m->gobble(in);
+ tempOutputNames.push_back(tempName);
+ }
+ }
+ in.close(); m->mothurRemove(tempFile);
+
+ m->appendFilesWithoutHeaders(outputFileName + toString(processIDS[i]), outputFileName);
+ m->mothurRemove(outputFileName + toString(processIDS[i]));
+ }
+
+ if (processors > 1) {
+ outputNames.clear();
+ for (int i = 0; i < tempOutputNames.size(); i++) { //remove files if needed
+ string name = tempOutputNames[i];
+ vector<string> parts;
+ m->splitAtChar(name, parts, '.');
+ bool keep = true;
+ //NOTE(review): indexes parts.size()-1/-2/-3 without checking parts.size() >= 3;
+ //out-of-bounds if a name has fewer than two '.' separators — confirm names always do.
+ if (((parts[parts.size()-1] == "relabund") || (parts[parts.size()-1] == "posterior")) && (parts[parts.size()-2] == "mix")) {
+ string tempNum = parts[parts.size()-3];
+ //num is currently unused because the removal below is commented out
+ int num; m->mothurConvert(tempNum, num);
+ //if (num > minPartition) {
+ // m->mothurRemove(tempOutputNames[i]);
+ // keep = false; if (m->debug) { m->mothurOut("[DEBUG]: removing " + tempOutputNames[i] + ".\n"); }
+ //}
+ }
+ if (keep) { outputNames.push_back(tempOutputNames[i]); }
+ }
+
+ //reorder fit file
+ //children appended their rows in pid order, so re-sort rows by the leading integer key
+ ifstream in;
+ m->openInputFile(outputFileName, in);
+ string headers = m->getline(in); m->gobble(in);
+
+ //map keyed on the row's leading number; iteration order gives the sorted rewrite
+ //(NOTE: duplicate keys would silently overwrite earlier rows)
+ map<int, string> file;
+ while (!in.eof()) {
+ string numString, line;
+ int num;
+ in >> numString; line = m->getline(in); m->gobble(in);
+ m->mothurConvert(numString, num);
+ file[num] = line;
+ }
+ in.close();
+ //reopening for output truncates the file; header first, then rows in ascending key order
+ ofstream out;
+ m->openOutputFile(outputFileName, out);
+ out << headers << endl;
+ for (map<int, string>::iterator it = file.begin(); it != file.end(); it++) {
+ out << it->first << '\t' << it->second << endl;
+ if (m->debug) { m->mothurOut("[DEBUG]: printing: " + toString(it->first) + '\t' + it->second + ".\n"); }
+ }
+ out.close();
+ }
+
+#else
+ //Windows path: the threaded implementation is commented out; falls back to running
+ //everything in this process via the single processDriver call after the comment block
+ /*
+ vector<metaCommunityData*> pDataArray;
+ DWORD dwThreadIdArray[processors-1];
+ HANDLE hThreadArray[processors-1];
+
+ //Create processor worker threads.
+ for( int i=1; i<processors; i++ ){
+ //copy lookup
+ //make copy of lookup so we don't get access violations
+ vector<SharedRAbundVector*> newLookup;
+ for (int k = 0; k < thislookup.size(); k++) {
+ SharedRAbundVector* temp = new SharedRAbundVector();
+ temp->setLabel(thislookup[k]->getLabel());
+ temp->setGroup(thislookup[k]->getGroup());
+ newLookup.push_back(temp);
+ }
+
+ //for each bin
+ for (int k = 0; k < thislookup[0]->getNumBins(); k++) {
+ if (m->control_pressed) { for (int j = 0; j < newLookup.size(); j++) { delete newLookup[j]; } return 0; }
+ for (int j = 0; j < thislookup.size(); j++) { newLookup[j]->push_back(thislookup[j]->getAbundance(k), thislookup[j]->getGroup()); }
+ }
+
+ processIDS.push_back(i);
+
+ // Allocate memory for thread data.
+ metaCommunityData* tempMeta = new metaCommunityData(newLookup, m, dividedPartitions[i], outputFileName + toString(i), rels[i], matrix[i], minpartitions, maxpartitions, optimizegap);
+ pDataArray.push_back(tempMeta);
+
+ hThreadArray[i] = CreateThread(NULL, 0, MyMetaCommunityThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]);
+ }
+
+ //do my part
+ minPartition = processDriver(thislookup, dividedPartitions[0], outputFileName, rels[0], matrix[0]);
+
+ //Wait until all threads have terminated.
+ WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+
+ //Close all thread handles and free memory allocations.
+ for(int i=0; i < pDataArray.size(); i++){
+ if (pDataArray[i]->minPartition < minPartition) { minPartition = pDataArray[i]->minPartition; }
+ for (int j = 0; j < pDataArray[i]->outputNames.size(); j++) {
+ outputNames.push_back(pDataArray[i]->outputNames[j]);
+ }
+ m->appendFilesWithoutHeaders(outputFileName + toString(processIDS[i]), outputFileName);
+ m->mothurRemove(outputFileName + toString(processIDS[i]));
+ CloseHandle(hThreadArray[i]);
+ delete pDataArray[i];
+ } */
+ //do my part
+ minPartition = processDriver(thislookup, dividedPartitions[0], outputFileName, rels[0], matrix[0], doneFlags, 0);
+#endif
+ //clean up the coordination flag files created above
+ for (int i = 0; i < processors; i++) { //read from everyone elses, just write to yours
+ string tempDoneFile = toString(i) + ".done.temp";
+ m->mothurRemove(tempDoneFile);
+ }
+
+ if (m->control_pressed) { return 0; }
+
+ if (m->debug) { m->mothurOut("[DEBUG]: minPartition = " + toString(minPartition) + "\n"); }
+
+ //run generate Summary function for smallest minPartition
+ variables["[tag]"] = toString(minPartition);
+ generateSummaryFile(minPartition, variables);
+
+ return 0;
+
+ }
+ catch(exception& e) {
+ m->errorOut(e, "GetMetaCommunityCommand", "createProcesses");
+ exit(1);
+ }
+}
+//**********************************************************************************************************************
+//Runs the fit for this process's share of partitions ('parts'), writing rows to outputFileName;
+//relabunds/matrix carry the per-partition output file names and doneFlags the per-process
+//"<rank>.done.temp" coordination files created by the caller. Returns the minimum partition
+//found, which createProcesses folds into its overall minPartition — confirm against the full
+//body, which continues beyond this chunk.
+int GetMetaCommunityCommand::processDriver(vector<SharedRAbundVector*>& thislookup, vector<int>& parts, string outputFileName, vector<string> relabunds, vector<string> matrix, vector<string> doneFlags, int processID){
+ try {
+
+ double minLaplace = 1e10;
+ int minPartition = 0;