]> git.donarmstrong.com Git - mothur.git/blobdiff - indicatorcommand.cpp
changed random forest output filename
[mothur.git] / indicatorcommand.cpp
index b92c58b54852b35c80ad039720c847b8d35d02bb..fd818acb44c4c5280c6a54971d3d39413307861d 100644 (file)
@@ -291,8 +291,8 @@ int IndicatorCommand::execute(){
             for (int i = 0; i < m->Treenames.size(); i++) { 
                 nameMap.insert(m->Treenames[i]); 
                 //sanity check - is this a group that is not in the sharedfile?
+                if (i == 0) { gps.insert("Group1"); }
                                if (designfile == "") {
-                    if (i == 0) { gps.insert("Group1"); }
                                        if (!(m->inUsersGroups(m->Treenames[i], m->getAllGroups()))) {
                                                m->mothurOut("[ERROR]: " + m->Treenames[i] + " is not a group in your shared or relabund file."); m->mothurOutEndLine();
                                                mismatch = true;
@@ -453,7 +453,7 @@ int IndicatorCommand::GetIndicatorSpecies(){
                                
                        indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap);
                        
-                       pValues = getPValues(groupings, randomGroupingsMap, lookup.size(), indicatorValues);                            
+                       pValues = getPValues(groupings, lookup.size(), indicatorValues);                                
                }else {
                        vector< vector<SharedRAbundFloatVector*> > groupings;
                        set<string> groupsAlreadyAdded;
@@ -476,7 +476,7 @@ int IndicatorCommand::GetIndicatorSpecies(){
                        
                        indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap);
                        
-                       pValues = getPValues(groupings, randomGroupingsMap, lookupFloat.size(), indicatorValues);
+                       pValues = getPValues(groupings, lookupFloat.size(), indicatorValues);
                }
                        
                if (m->control_pressed) { out.close(); return 0; }
@@ -627,7 +627,7 @@ int IndicatorCommand::GetIndicatorSpecies(Tree*& T){
                                                                
                                indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap);
                                
-                               pValues = getPValues(groupings, randomGroupingsMap, lookup.size(), indicatorValues);                            
+                               pValues = getPValues(groupings, lookup.size(), indicatorValues);                                
                        }else {
                                vector< vector<SharedRAbundFloatVector*> > groupings;
                                
@@ -676,7 +676,7 @@ int IndicatorCommand::GetIndicatorSpecies(Tree*& T){
                                
                                indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap);
                                
-                               pValues = getPValues(groupings, randomGroupingsMap, lookupFloat.size(), indicatorValues);
+                               pValues = getPValues(groupings, lookupFloat.size(), indicatorValues);
                        }
                        
                        if (m->control_pressed) { out.close(); return 0; }
@@ -1142,7 +1142,7 @@ int IndicatorCommand::getSharedFloat(){
        }
 }
 //**********************************************************************************************************************
-vector<float> IndicatorCommand::driver(vector< vector<SharedRAbundFloatVector*> >& groupings, map< vector<int>, vector<int> > groupingsMap, int num, vector<float> indicatorValues, int numIters){
+vector<float> IndicatorCommand::driver(vector< vector<SharedRAbundFloatVector*> >& groupings, int num, vector<float> indicatorValues, int numIters){
        try {
                vector<float> pvalues;
                pvalues.resize(indicatorValues.size(), 0);
@@ -1150,7 +1150,7 @@ vector<float> IndicatorCommand::driver(vector< vector<SharedRAbundFloatVector*>
                
                for(int i=0;i<numIters;i++){
                        if (m->control_pressed) { break; }
-                       groupingsMap = randomizeGroupings(groupings, num);
+                       map< vector<int>, vector<int> > groupingsMap = randomizeGroupings(groupings, num);
                        vector<float> randomIndicatorValues = getValues(groupings, notUsedGroupings, groupingsMap);
                        
                        for (int j = 0; j < indicatorValues.size(); j++) {
@@ -1166,23 +1166,29 @@ vector<float> IndicatorCommand::driver(vector< vector<SharedRAbundFloatVector*>
        }
 }
 //**********************************************************************************************************************
-vector<float> IndicatorCommand::getPValues(vector< vector<SharedRAbundFloatVector*> >& groupings, map< vector<int>, vector<int> > groupingsMap, int num, vector<float> indicatorValues){
+vector<float> IndicatorCommand::getPValues(vector< vector<SharedRAbundFloatVector*> >& groupings, int num, vector<float> indicatorValues){
        try {
                vector<float> pvalues;
-               
-#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+
                if(processors == 1){
-                       pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters);
-                       for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
+                       pvalues = driver(groupings, num, indicatorValues, iters);
+            for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
                }else{
+            //divide iters between processors
+                       vector<int> procIters;
+            int numItersPerProcessor = iters / processors;
+            
+            //divide iters between processes
+            for (int h = 0; h < processors; h++) {
+                if(h == processors - 1){ numItersPerProcessor = iters - h * numItersPerProcessor; }
+                procIters.push_back(numItersPerProcessor);
+            }
+            
+            vector<int> processIDS;
+            int process = 1;
                        
-                       //divide iters between processors
-                       int numItersPerProcessor = iters / processors;
-                       
-                       vector<int> processIDS;
-                       int process = 1;
-                       int num = 0;
-                       
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+                                               
                        //loop through and create all the processes you want
                        while (process != processors) {
                                int pid = fork();
@@ -1191,7 +1197,7 @@ vector<float> IndicatorCommand::getPValues(vector< vector<SharedRAbundFloatVecto
                                        processIDS.push_back(pid);  //create map from line number to pid so you can append files in correct order later
                                        process++;
                                }else if (pid == 0){
-                                       pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor);
+                                       pvalues = driver(groupings, num, indicatorValues, procIters[process]);
                                        
                                        //pass pvalues to parent
                                        ofstream out;
@@ -1215,9 +1221,7 @@ vector<float> IndicatorCommand::getPValues(vector< vector<SharedRAbundFloatVecto
                        }
                        
                        //do my part
-                       //special case for last processor in case it doesn't divide evenly
-                       numItersPerProcessor = iters - ((processors-1) * numItersPerProcessor);         
-                       pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor);
+                       pvalues = driver(groupings, num, indicatorValues, procIters[0]);
                        
                        //force parent to wait until all the processes are done
                        for (int i=0;i<processIDS.size();i++) { 
@@ -1239,12 +1243,71 @@ vector<float> IndicatorCommand::getPValues(vector< vector<SharedRAbundFloatVecto
                                }
                                in.close(); m->mothurRemove(tempFile);
                        }
-                       for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } 
-               }
+                       for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
 #else
-               pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters);
-               for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
+                       
+            //fill in functions
+            vector<indicatorData*> pDataArray;
+            DWORD   dwThreadIdArray[processors-1];
+            HANDLE  hThreadArray[processors-1];
+            
+            //Create processor worker threads.
+            for( int i=1; i<processors; i++ ){
+                
+                //make copy of lookup so we don't get access violations
+                vector< vector<SharedRAbundFloatVector*> > newGroupings;
+
+                for (int k = 0; k < groupings.size(); k++) {
+                    vector<SharedRAbundFloatVector*> newLookup;
+                    for (int l = 0; l < groupings[k].size(); l++) {
+                        SharedRAbundFloatVector* temp = new SharedRAbundFloatVector();
+                        temp->setLabel(groupings[k][l]->getLabel());
+                        temp->setGroup(groupings[k][l]->getGroup());
+                        newLookup.push_back(temp);
+                    }
+                    newGroupings.push_back(newLookup);
+                }
+                
+                //for each bin
+                for (int l = 0; l < groupings.size(); l++) {
+                    for (int k = 0; k < groupings[l][0]->getNumBins(); k++) {
+                        if (m->control_pressed) { for (int j = 0; j < newGroupings.size(); j++) { for (int u = 0; u < newGroupings[j].size(); u++) { delete newGroupings[j][u];  } } return pvalues; }
+                        
+                        for (int j = 0; j < groupings[l].size(); j++) { newGroupings[l][j]->push_back(groupings[l][j]->getAbundance(k), groupings[l][j]->getGroup()); }
+                    }
+                }
+        
+                vector<float> copyIValues = indicatorValues;
+
+                indicatorData* temp = new indicatorData(m, procIters[i], newGroupings, num, copyIValues);
+                pDataArray.push_back(temp);
+                processIDS.push_back(i);
+                
+                hThreadArray[i-1] = CreateThread(NULL, 0, MyIndicatorThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]);
+            }
+            
+            //do my part
+                       pvalues = driver(groupings, num, indicatorValues, procIters[0]);
+           
+            //Wait until all threads have terminated.
+            WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+            
+            //Close all thread handles and free memory allocations.
+            for(int i=0; i < pDataArray.size(); i++){
+                for (int j = 0; j < pDataArray[i]->pvalues.size(); j++) { pvalues[j] += pDataArray[i]->pvalues[j];  }
+                
+                for (int l = 0; l < pDataArray[i]->groupings.size(); l++) {
+                    for (int j = 0; j < pDataArray[i]->groupings[l].size(); j++) { delete pDataArray[i]->groupings[l][j]; }
+                }
+                
+                CloseHandle(hThreadArray[i]);
+                delete pDataArray[i];
+            }
+            
+            for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
 #endif
+               }
+
                
                return pvalues;
        }
@@ -1256,7 +1319,7 @@ vector<float> IndicatorCommand::getPValues(vector< vector<SharedRAbundFloatVecto
 
 //**********************************************************************************************************************
 //same as above, just data type difference
-vector<float> IndicatorCommand::driver(vector< vector<SharedRAbundVector*> >& groupings, map< vector<int>, vector<int> > groupingsMap, int num, vector<float> indicatorValues, int numIters){
+vector<float> IndicatorCommand::driver(vector< vector<SharedRAbundVector*> >& groupings, int num, vector<float> indicatorValues, int numIters){
        try {
                vector<float> pvalues;
                pvalues.resize(indicatorValues.size(), 0);
@@ -1264,7 +1327,7 @@ vector<float> IndicatorCommand::driver(vector< vector<SharedRAbundVector*> >& gr
                
                for(int i=0;i<numIters;i++){
                        if (m->control_pressed) { break; }
-                       groupingsMap = randomizeGroupings(groupings, num);
+                       map< vector<int>, vector<int> > groupingsMap = randomizeGroupings(groupings, num);
                        vector<float> randomIndicatorValues = getValues(groupings, notUsedGroupings, groupingsMap);
                        
                        for (int j = 0; j < indicatorValues.size(); j++) {
@@ -1281,22 +1344,29 @@ vector<float> IndicatorCommand::driver(vector< vector<SharedRAbundVector*> >& gr
 }
 //**********************************************************************************************************************
 //same as above, just data type difference
-vector<float> IndicatorCommand::getPValues(vector< vector<SharedRAbundVector*> >& groupings, map< vector<int>, vector<int> > groupingsMap, int num, vector<float> indicatorValues){
+vector<float> IndicatorCommand::getPValues(vector< vector<SharedRAbundVector*> >& groupings, int num, vector<float> indicatorValues){
        try {
                vector<float> pvalues;
-               
-#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+        
                if(processors == 1){
-                       pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters);
-                       for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
+                       pvalues = driver(groupings, num, indicatorValues, iters);
+            for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
                }else{
+            //divide iters between processors
+                       vector<int> procIters;
+            int numItersPerProcessor = iters / processors;
+            
+            //divide iters between processes
+            for (int h = 0; h < processors; h++) {
+                if(h == processors - 1){ numItersPerProcessor = iters - h * numItersPerProcessor; }
+                procIters.push_back(numItersPerProcessor);
+            }
+            
+            vector<int> processIDS;
+            int process = 1;
                        
-                       //divide iters between processors
-                       int numItersPerProcessor = iters / processors;
-                       
-                       vector<int> processIDS;
-                       int process = 1;
-                       
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+            
                        //loop through and create all the processes you want
                        while (process != processors) {
                                int pid = fork();
@@ -1305,7 +1375,7 @@ vector<float> IndicatorCommand::getPValues(vector< vector<SharedRAbundVector*> >
                                        processIDS.push_back(pid);  //create map from line number to pid so you can append files in correct order later
                                        process++;
                                }else if (pid == 0){
-                                       pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor);
+                                       pvalues = driver(groupings, num, indicatorValues, procIters[process]);
                                        
                                        //pass pvalues to parent
                                        ofstream out;
@@ -1321,49 +1391,106 @@ vector<float> IndicatorCommand::getPValues(vector< vector<SharedRAbundVector*> >
                                        out.close();
                                        
                                        exit(0);
-                               }else { 
-                                       m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine(); 
+                               }else {
+                                       m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine();
                                        for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
                                        exit(0);
                                }
                        }
                        
                        //do my part
-                       //special case for last processor in case it doesn't divide evenly
-                       numItersPerProcessor = iters - ((processors-1) * numItersPerProcessor);         
-                       pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor);
+                       pvalues = driver(groupings, num, indicatorValues, procIters[0]);
                        
                        //force parent to wait until all the processes are done
-                       for (int i=0;i<processIDS.size();i++) { 
+                       for (int i=0;i<processIDS.size();i++) {
                                int temp = processIDS[i];
                                wait(&temp);
                        }
                        
-                       //combine results                       
+                       //combine results
                        for (int i = 0; i < processIDS.size(); i++) {
                                ifstream in;
                                string tempFile =  toString(processIDS[i]) + ".pvalues.temp";
                                m->openInputFile(tempFile, in);
                                
                                ////// to do ///////////
-                               int numTemp; numTemp = 0; 
+                               int numTemp; numTemp = 0;
                                for (int j = 0; j < pvalues.size(); j++) {
                                        in >> numTemp; m->gobble(in);
                                        pvalues[j] += numTemp;
                                }
                                in.close(); m->mothurRemove(tempFile);
                        }
-                       for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } 
-               }
+                       for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
 #else
-               pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters);
-               for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
+            
+            //fill in functions
+            vector<indicatorData*> pDataArray;
+            DWORD   dwThreadIdArray[processors-1];
+            HANDLE  hThreadArray[processors-1];
+            
+            //Create processor worker threads.
+            for( int i=1; i<processors; i++ ){
+                
+                //make copy of lookup so we don't get access violations
+                vector< vector<SharedRAbundFloatVector*> > newGroupings;
+                
+                for (int k = 0; k < groupings.size(); k++) {
+                    vector<SharedRAbundFloatVector*> newLookup;
+                    for (int l = 0; l < groupings[k].size(); l++) {
+                        SharedRAbundFloatVector* temp = new SharedRAbundFloatVector();
+                        temp->setLabel(groupings[k][l]->getLabel());
+                        temp->setGroup(groupings[k][l]->getGroup());
+                        newLookup.push_back(temp);
+                    }
+                    newGroupings.push_back(newLookup);
+                }
+                
+                //for each bin
+                for (int l = 0; l < groupings.size(); l++) {
+                    for (int k = 0; k < groupings[l][0]->getNumBins(); k++) {
+                        if (m->control_pressed) { for (int j = 0; j < newGroupings.size(); j++) { for (int u = 0; u < newGroupings[j].size(); u++) { delete newGroupings[j][u];  } } return pvalues; }
+                        
+                        for (int j = 0; j < groupings[l].size(); j++) { newGroupings[l][j]->push_back((float)(groupings[l][j]->getAbundance(k)), groupings[l][j]->getGroup()); }
+                    }
+                }
+                
+                vector<float> copyIValues = indicatorValues;
+                
+                indicatorData* temp = new indicatorData(m, procIters[i], newGroupings, num, copyIValues);
+                pDataArray.push_back(temp);
+                processIDS.push_back(i);
+                
+                hThreadArray[i-1] = CreateThread(NULL, 0, MyIndicatorThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]);
+            }
+            
+            //do my part
+                       pvalues = driver(groupings, num, indicatorValues, procIters[0]);
+            
+            //Wait until all threads have terminated.
+            WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+            
+            //Close all thread handles and free memory allocations.
+            for(int i=0; i < pDataArray.size(); i++){
+                for (int j = 0; j < pDataArray[i]->pvalues.size(); j++) { pvalues[j] += pDataArray[i]->pvalues[j];  }
+                
+                for (int l = 0; l < pDataArray[i]->groupings.size(); l++) {
+                    for (int j = 0; j < pDataArray[i]->groupings[l].size(); j++) { delete pDataArray[i]->groupings[l][j]; }
+                }
+                
+                CloseHandle(hThreadArray[i]);
+                delete pDataArray[i];
+            }
+            
+            for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; }
 #endif
+               }
+        
                
                return pvalues;
        }
        catch(exception& e) {
-               m->errorOut(e, "IndicatorCommand", "getPValues");       
+               m->errorOut(e, "IndicatorCommand", "getPValues");
                exit(1);
        }
 }