*/
#include "clustersplitcommand.h"
-#include "readcluster.h"
-#include "splitmatrix.h"
-#include "readphylip.h"
-#include "readcolumn.h"
-#include "readmatrix.hpp"
-#include "inputdata.h"
+
//**********************************************************************************************************************
MPI_Barrier(MPI_COMM_WORLD);
#else
-
+ ///////////////////// WINDOWS CAN ONLY USE 1 PROCESSORS ACCESS VIOLATION UNRESOLVED ///////////////////////
//sanity check
if (processors > distName.size()) { processors = distName.size(); }
- #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+ #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
if(processors == 1){
listFileNames = cluster(distName, labels); //clusters individual files and returns names of list files
}else{
-
- //cout << processors << '\t' << distName.size() << endl;
- vector < vector < map<string, string> > > dividedNames; //distNames[1] = vector of filenames for process 1...
- dividedNames.resize(processors);
-
- //for each file group figure out which process will complete it
- //want to divide the load intelligently so the big files are spread between processes
- for (int i = 0; i < distName.size(); i++) {
- //cout << i << endl;
- int processToAssign = (i+1) % processors;
- if (processToAssign == 0) { processToAssign = processors; }
-
- dividedNames[(processToAssign-1)].push_back(distName[i]);
- }
-
- //not lets reverse the order of ever other process, so we balance big files running with little ones
- for (int i = 0; i < processors; i++) {
- //cout << i << endl;
- int remainder = ((i+1) % processors);
- if (remainder) { reverse(dividedNames[i].begin(), dividedNames[i].end()); }
- }
-
- createProcesses(dividedNames);
-
- if (m->control_pressed) { return 0; }
-
- //get list of list file names from each process
- for(int i=0;i<processors;i++){
- string filename = toString(processIDS[i]) + ".temp";
- ifstream in;
- m->openInputFile(filename, in);
-
- in >> tag; m->gobble(in);
-
- while(!in.eof()) {
- string tempName;
- in >> tempName; m->gobble(in);
- listFileNames.push_back(tempName);
- }
- in.close();
- m->mothurRemove((toString(processIDS[i]) + ".temp"));
-
- //get labels
- filename = toString(processIDS[i]) + ".temp.labels";
- ifstream in2;
- m->openInputFile(filename, in2);
-
- float tempCutoff;
- in2 >> tempCutoff; m->gobble(in2);
- if (tempCutoff < cutoff) { cutoff = tempCutoff; }
-
- while(!in2.eof()) {
- string tempName;
- in2 >> tempName; m->gobble(in2);
- if (labels.count(tempName) == 0) { labels.insert(tempName); }
- }
- in2.close();
- m->mothurRemove((toString(processIDS[i]) + ".temp.labels"));
- }
- }
+ listFileNames = createProcesses(distName, labels);
+ }
#else
listFileNames = cluster(distName, labels); //clusters individual files and returns names of list files
#endif
}
}
//**********************************************************************************************************************
-int ClusterSplitCommand::createProcesses(vector < vector < map<string, string> > > dividedNames){
+vector<string> ClusterSplitCommand::createProcesses(vector< map<string, string> > distName, set<string>& labels){
try {
+
+ vector<string> listFiles;
+ vector < vector < map<string, string> > > dividedNames; //distNames[1] = vector of filenames for process 1...
+ dividedNames.resize(processors);
+
+ //for each file group figure out which process will complete it
+ //want to divide the load intelligently so the big files are spread between processes
+ for (int i = 0; i < distName.size(); i++) {
+ //cout << i << endl;
+ int processToAssign = (i+1) % processors;
+ if (processToAssign == 0) { processToAssign = processors; }
+
+ dividedNames[(processToAssign-1)].push_back(distName[i]);
+ if ((processToAssign-1) == 1) { m->mothurOut(distName[i].begin()->first + "\n"); }
+ }
+
+ //not lets reverse the order of ever other process, so we balance big files running with little ones
+ for (int i = 0; i < processors; i++) {
+ //cout << i << endl;
+ int remainder = ((i+1) % processors);
+ if (remainder) { reverse(dividedNames[i].begin(), dividedNames[i].end()); }
+ }
+
+ if (m->control_pressed) { return listFiles; }
- #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
- int process = 0;
- int exitCommand = 1;
+ #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+ int process = 1;
processIDS.clear();
//loop through and create all the processes you want
}
}
+ //do your part
+ listFiles = cluster(dividedNames[0], labels);
+
//force parent to wait until all the processes are done
- for (int i=0;i<processors;i++) {
+ for (int i=0;i< processIDS.size();i++) {
int temp = processIDS[i];
wait(&temp);
}
+
+ //get list of list file names from each process
+ for(int i=0;i<processIDS.size();i++){
+ string filename = toString(processIDS[i]) + ".temp";
+ ifstream in;
+ m->openInputFile(filename, in);
+
+ in >> tag; m->gobble(in);
+
+ while(!in.eof()) {
+ string tempName;
+ in >> tempName; m->gobble(in);
+ listFiles.push_back(tempName);
+ }
+ in.close();
+ m->mothurRemove((toString(processIDS[i]) + ".temp"));
+
+ //get labels
+ filename = toString(processIDS[i]) + ".temp.labels";
+ ifstream in2;
+ m->openInputFile(filename, in2);
+
+ float tempCutoff;
+ in2 >> tempCutoff; m->gobble(in2);
+ if (tempCutoff < cutoff) { cutoff = tempCutoff; }
+
+ while(!in2.eof()) {
+ string tempName;
+ in2 >> tempName; m->gobble(in2);
+ if (labels.count(tempName) == 0) { labels.insert(tempName); }
+ }
+ in2.close();
+ m->mothurRemove((toString(processIDS[i]) + ".temp.labels"));
+ }
+
+
+ #else
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////////
+ //Windows version shared memory, so be careful when passing variables through the clusterData struct.
+ //Above fork() will clone, so memory is separate, but that's not the case with windows,
+ //Taking advantage of shared memory to allow both threads to add labels.
+ //////////////////////////////////////////////////////////////////////////////////////////////////////
- return exitCommand;
+ vector<clusterData*> pDataArray;
+ DWORD dwThreadIdArray[processors-1];
+ HANDLE hThreadArray[processors-1];
+
+ //Create processor worker threads.
+ for( int i=1; i<processors; i++ ){
+ // Allocate memory for thread data.
+ clusterData* tempCluster = new clusterData(dividedNames[i], m, cutoff, method, outputDir, hard, precision, length, i);
+ pDataArray.push_back(tempCluster);
+ processIDS.push_back(i);
+
+ //MySeqSumThreadFunction is in header. It must be global or static to work with the threads.
+ //default security attributes, thread function name, argument to thread function, use default creation flags, returns the thread identifier
+ hThreadArray[i-1] = CreateThread(NULL, 0, MyClusterThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]);
+
+ }
+
+ //do your part
+ listFiles = cluster(dividedNames[0], labels);
+
+ //Wait until all threads have terminated.
+ WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+
+ //Close all thread handles and free memory allocations.
+ for(int i=0; i < pDataArray.size(); i++){
+ //get tag
+ tag = pDataArray[i]->tag;
+ //get listfiles created
+ for(int j=0; j < pDataArray[i]->listFiles.size(); j++){ listFiles.push_back(pDataArray[i]->listFiles[j]); }
+ //get labels
+ set<string>::iterator it;
+ for(it = pDataArray[i]->labels.begin(); it != pDataArray[i]->labels.end(); it++){ labels.insert(*it); }
+ //check cutoff
+ if (pDataArray[i]->cutoff < cutoff) { cutoff = pDataArray[i]->cutoff; }
+ CloseHandle(hThreadArray[i]);
+ delete pDataArray[i];
+ }
+
#endif
+
+ return listFiles;
}
catch(exception& e) {
vector<string> ClusterSplitCommand::cluster(vector< map<string, string> > distNames, set<string>& labels){
try {
- Cluster* cluster;
- SparseMatrix* matrix;
- ListVector* list;
- ListVector oldList;
- RAbundVector* rabund;
vector<string> listFileNames;
-
double smallestCutoff = cutoff;
//cluster each distance file
for (int i = 0; i < distNames.size(); i++) {
+
+ Cluster* cluster = NULL;
+ SparseMatrix* matrix = NULL;
+ ListVector* list = NULL;
+ ListVector oldList;
+ RAbundVector* rabund = NULL;
+
if (m->control_pressed) { return listFileNames; }
string thisNamefile = distNames[i].begin()->second;
oldList = *list;
matrix = read->getMatrix();
- delete read;
- delete nameMap;
+ delete read; read = NULL;
+ delete nameMap; nameMap = NULL;
#ifdef USE_MPI
}
delete matrix; delete list; delete cluster; delete rabund;
+ matrix = NULL; list = NULL; cluster = NULL; rabund = NULL;
listFile.close();
if (m->control_pressed) { //clean up