From: westcott Date: Mon, 24 Oct 2011 11:53:53 +0000 (+0000) Subject: added processors to pre.cluster for mac and windows. Fixed bug in reNameFile function... X-Git-Url: https://git.donarmstrong.com/?p=mothur.git;a=commitdiff_plain;h=192b4636345c51d962d4711206535e34cb2fd97c added processors to pre.cluster for mac and windows. Fixed bug in reNameFile function in mothurOut that prevented Windows from renaming the file. If user runs cluster with a phylip formatted distance file and no cutoff, use cluster.classic to save memory and time. --- diff --git a/chimerauchimecommand.cpp b/chimerauchimecommand.cpp index 2b24fd8..caddc27 100644 --- a/chimerauchimecommand.cpp +++ b/chimerauchimecommand.cpp @@ -1344,7 +1344,7 @@ int ChimeraUchimeCommand::createProcesses(string outputFileName, string filename //append output files - for(int i=0;iappendFiles((outputFileName + toString(processIDS[i]) + ".temp"), outputFileName); m->mothurRemove((outputFileName + toString(processIDS[i]) + ".temp")); @@ -1436,7 +1436,7 @@ int ChimeraUchimeCommand::createProcessesGroups(SequenceParser& parser, string o //append output files - for(int i=0;iappendFiles((outputFName + toString(processIDS[i]) + ".temp"), outputFName); m->mothurRemove((outputFName + toString(processIDS[i]) + ".temp")); diff --git a/clustercommand.cpp b/clustercommand.cpp index 678912c..47f5531 100644 --- a/clustercommand.cpp +++ b/clustercommand.cpp @@ -11,6 +11,7 @@ #include "readphylip.h" #include "readcolumn.h" #include "readmatrix.hpp" +#include "clusterdoturcommand.h" //********************************************************************************************************************** vector ClusterCommand::setParameters(){ @@ -207,7 +208,7 @@ ClusterCommand::ClusterCommand(string option) { timing = validParameter.validFile(parameters, "timing", false); if (timing == "not found") { timing = "F"; } - } + } } catch(exception& e) { m->errorOut(e, "ClusterCommand", "ClusterCommand"); @@ -223,6 +224,34 @@ int ClusterCommand::execute(){ if (abort == true) { if (calledHelp) { return 0; } return 2; } + //phylip file given and cutoff not given - use cluster.classic because it uses less memory and is faster + if ((format == "phylip") && (cutoff > 10.0)) { + m->mothurOutEndLine(); m->mothurOut("You are using a phylip file and no cutoff. I will run cluster.classic to save memory and time."); m->mothurOutEndLine(); + + //run unique.seqs for deconvolute results + string inputString = "phylip=" + distfile; + if (namefile != "") { inputString += ", name=" + namefile; } + inputString += ", precision=" + toString(precision); + inputString += ", method=" + method; + if (hard) { inputString += ", hard=T"; } + else { inputString += ", hard=F"; } + if (sim) { inputString += ", sim=T"; } + else { inputString += ", sim=F"; } + + + m->mothurOutEndLine(); + m->mothurOut("/------------------------------------------------------------/"); m->mothurOutEndLine(); + m->mothurOut("Running command: cluster.classic(" + inputString + ")"); m->mothurOutEndLine(); + + Command* clusterClassicCommand = new ClusterDoturCommand(inputString); + clusterClassicCommand->execute(); + delete clusterClassicCommand; + + m->mothurOut("/------------------------------------------------------------/"); m->mothurOutEndLine(); + + return 0; + } + ReadMatrix* read; if (format == "column") { read = new ReadColumnMatrix(columnfile, sim); } //sim indicates whether its a similarity matrix else if (format == "phylip") { read = new ReadPhylipMatrix(phylipfile, sim); } diff --git a/makefile b/makefile index 35857e0..44d8a81 100644 --- a/makefile +++ b/makefile @@ -31,7 +31,7 @@ ifeq ($(strip $(64BIT_VERSION)),yes) #if you using cygwin to build Windows the following line #CXX = x86_64-w64-mingw32-g++ #CC = x86_64-w64-mingw32-g++ - #TARGET_ARCH += -m64 + #TARGET_ARCH += -m64 -static #if you are a linux user use the following line #CXXFLAGS += -mtune=native -march=native -m64 diff --git a/mothurout.cpp b/mothurout.cpp index df1a1c0..1920221 100644 --- a/mothurout.cpp +++ b/mothurout.cpp @@ -891,10 +891,10 @@ int MothurOut::renameFile(string oldName, string newName){ try { ifstream inTest; int exist = openInputFile(newName, inTest, ""); + inTest.close(); #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) if (exist == 0) { //you could open it so you want to delete it - inTest.close(); string command = "rm " + newName; system(command.c_str()); } diff --git a/preclustercommand.cpp b/preclustercommand.cpp index 4b04529..0810a02 100644 --- a/preclustercommand.cpp +++ b/preclustercommand.cpp @@ -8,12 +8,8 @@ */ #include "preclustercommand.h" -#include "sequenceparser.h" #include "deconvolutecommand.h" -//********************************************************************************************************************** -inline bool comparePriority(seqPNode first, seqPNode second) { return (first.numIdentical > second.numIdentical); } - //********************************************************************************************************************** vector PreClusterCommand::setParameters(){ try { @@ -21,7 +17,7 @@ vector PreClusterCommand::setParameters(){ CommandParameter pname("name", "InputTypes", "", "", "none", "none", "none",false,false); parameters.push_back(pname); CommandParameter pgroup("group", "InputTypes", "", "", "none", "none", "none",false,false); parameters.push_back(pgroup); CommandParameter pdiffs("diffs", "Number", "", "0", "", "", "",false,false); parameters.push_back(pdiffs); - CommandParameter pbygroup("bygroup", "Boolean", "", "T", "", "", "",false,false); parameters.push_back(pbygroup); + CommandParameter pprocessors("processors", "Number", "", "1", "", "", "",false,false); parameters.push_back(pprocessors); CommandParameter pinputdir("inputdir", "String", "", "", "", "", "",false,false); parameters.push_back(pinputdir); CommandParameter poutputdir("outputdir", "String", "", "", "", "", "",false,false); parameters.push_back(poutputdir); @@ -42,9 +38,8 @@ string PreClusterCommand::getHelpString(){ helpString += "The pre.cluster command outputs a new fasta and name file.\n"; helpString += "The pre.cluster command parameters are fasta, names and diffs. The fasta parameter is required. \n"; helpString += "The names parameter allows you to give a list of seqs that are identical. This file is 2 columns, first column is name or representative sequence, second column is a list of its identical sequences separated by commas.\n"; - helpString += "The group parameter allows you to provide a group file. \n"; + helpString += "The group parameter allows you to provide a group file so you can cluster by group. \n"; helpString += "The diffs parameter allows you to specify maximum number of mismatched bases allowed between sequences in a grouping. The default is 1.\n"; - helpString += "The bygroup parameter allows you to indicate you whether you want to cluster sequences only within each group, default=T, when a groupfile is given. \n"; helpString += "The pre.cluster command should be in the following format: \n"; helpString += "pre.cluster(fasta=yourFastaFile, names=yourNamesFile, diffs=yourMaxDiffs) \n"; helpString += "Example pre.cluster(fasta=amazon.fasta, diffs=2).\n"; @@ -154,22 +149,18 @@ PreClusterCommand::PreClusterCommand(string option) { else { m->setNameFile(namefile); } groupfile = validParameter.validFile(parameters, "group", true); - if (groupfile == "not found") { groupfile = ""; } + if (groupfile == "not found") { groupfile = ""; bygroup = false; } else if (groupfile == "not open") { abort = true; groupfile = ""; } - else { m->setGroupFile(groupfile); } + else { m->setGroupFile(groupfile); bygroup = true; } string temp = validParameter.validFile(parameters, "diffs", false); if(temp == "not found"){ temp = "1"; } convert(temp, diffs); - temp = validParameter.validFile(parameters, "bygroup", false); - if (temp == "not found") { - if (groupfile != "") { temp = "T"; } - else { temp = "F"; } - } - bygroup = m->isTrue(temp); + temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = m->getProcessors(); } + m->setProcessors(temp); + convert(temp, processors); + - if (bygroup && (groupfile == "")) { m->mothurOut("You cannot set bygroup=T, unless you provide a groupfile."); m->mothurOutEndLine(); abort=true; } - } } @@ -203,40 +194,17 @@ int PreClusterCommand::execute(){ vector groups = parser->getNamesOfGroups(); - //precluster each group - for (int i = 0; i < groups.size(); i++) { - - start = time(NULL); - - if (m->control_pressed) { delete parser; m->mothurRemove(newFastaFile); m->mothurRemove(newNamesFile); return 0; } - - m->mothurOutEndLine(); m->mothurOut("Processing group " + groups[i] + ":"); m->mothurOutEndLine(); - - map thisNameMap; - if (namefile != "") { thisNameMap = parser->getNameMap(groups[i]); } - vector thisSeqs = parser->getSeqs(groups[i]); - - //fill alignSeqs with this groups info. - int numSeqs = loadSeqs(thisNameMap, thisSeqs); - - if (m->control_pressed) { delete parser; m->mothurRemove(newFastaFile); m->mothurRemove(newNamesFile); return 0; } - - if (diffs > length) { m->mothurOut("Error: diffs is greater than your sequence length."); m->mothurOutEndLine(); return 0; } - - int count = process(); - - if (m->control_pressed) { delete parser; m->mothurRemove(newFastaFile); m->mothurRemove(newNamesFile); return 0; } - - m->mothurOut("Total number of sequences before pre.cluster was " + toString(alignSeqs.size()) + "."); m->mothurOutEndLine(); - m->mothurOut("pre.cluster removed " + toString(count) + " sequences."); m->mothurOutEndLine(); m->mothurOutEndLine(); - printData(newFastaFile, newNamesFile); - - m->mothurOut("It took " + toString(time(NULL) - start) + " secs to cluster " + toString(numSeqs) + " sequences."); m->mothurOutEndLine(); - - } +//#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + if(processors == 1) { driverGroups(parser, newFastaFile, newNamesFile, 0, groups.size(), groups); } + else { createProcessesGroups(parser, newFastaFile, newNamesFile, groups); } +//#else +// driverGroups(parser, newFastaFile, newNamesFile, 0, groups.size(), groups); +//#endif delete parser; + if (m->control_pressed) { m->mothurRemove(newFastaFile); m->mothurRemove(newNamesFile); return 0; } + //run unique.seqs for deconvolute results string inputString = "fasta=" + newFastaFile; if (namefile != "") { inputString += ", name=" + newNamesFile; } @@ -254,6 +222,8 @@ int PreClusterCommand::execute(){ m->mothurOut("/******************************************/"); m->mothurOutEndLine(); m->renameFile(filenames["fasta"][0], newFastaFile); + + m->mothurOut("It took " + toString(time(NULL) - start) + " secs to run pre.cluster."); m->mothurOutEndLine(); }else { if (namefile != "") { readNameFile(); } @@ -306,6 +276,157 @@ int PreClusterCommand::execute(){ } } /**************************************************************************************************/ +int PreClusterCommand::createProcessesGroups(SequenceParser* parser, string newFName, string newNName, vector groups) { + try { + + vector processIDS; + int process = 1; + int num = 0; + + //sanity check + if (groups.size() < processors) { processors = groups.size(); } + + //divide the groups between the processors + vector lines; + int numGroupsPerProcessor = groups.size() / processors; + for (int i = 0; i < processors; i++) { + int startIndex = i * numGroupsPerProcessor; + int endIndex = (i+1) * numGroupsPerProcessor; + if(i == (processors - 1)){ endIndex = groups.size(); } + lines.push_back(linePair(startIndex, endIndex)); + } + +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + + //loop through and create all the processes you want + while (process != processors) { + int pid = fork(); + + if (pid > 0) { + processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later + process++; + }else if (pid == 0){ + num = driverGroups(parser, newFName + toString(getpid()) + ".temp", newNName + toString(getpid()) + ".temp", lines[process].start, lines[process].end, groups); + exit(0); + }else { + m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine(); + for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); } + exit(0); + } + } + + //do my part + num = driverGroups(parser, newFName, newNName, lines[0].start, lines[0].end, groups); + + //force parent to wait until all the processes are done + for (int i=0;i pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + + //Create processor worker threads. + for( int i=1; iappendFiles((newFName + toString(processIDS[i]) + ".temp"), newFName); + m->mothurRemove((newFName + toString(processIDS[i]) + ".temp")); + + m->appendFiles((newNName + toString(processIDS[i]) + ".temp"), newNName); + m->mothurRemove((newNName + toString(processIDS[i]) + ".temp")); + } + + return num; + + } + catch(exception& e) { + m->errorOut(e, "PreClusterCommand", "createProcessesGroups"); + exit(1); + } +} +/**************************************************************************************************/ +int PreClusterCommand::driverGroups(SequenceParser* parser, string newFFile, string newNFile, int start, int end, vector groups){ + try { + + int numSeqs = 0; + + //precluster each group + for (int i = start; i < end; i++) { + + start = time(NULL); + + if (m->control_pressed) { return 0; } + + m->mothurOutEndLine(); m->mothurOut("Processing group " + groups[i] + ":"); m->mothurOutEndLine(); + + map thisNameMap; + if (namefile != "") { thisNameMap = parser->getNameMap(groups[i]); } + vector thisSeqs = parser->getSeqs(groups[i]); + + //fill alignSeqs with this groups info. + numSeqs = loadSeqs(thisNameMap, thisSeqs); + + if (m->control_pressed) { return 0; } + + if (diffs > length) { m->mothurOut("Error: diffs is greater than your sequence length."); m->mothurOutEndLine(); m->control_pressed = true; return 0; } + + int count = process(); + + if (m->control_pressed) { return 0; } + + m->mothurOut("Total number of sequences before pre.cluster was " + toString(alignSeqs.size()) + "."); m->mothurOutEndLine(); + m->mothurOut("pre.cluster removed " + toString(count) + " sequences."); m->mothurOutEndLine(); m->mothurOutEndLine(); + printData(newFFile, newNFile); + + m->mothurOut("It took " + toString(time(NULL) - start) + " secs to cluster " + toString(numSeqs) + " sequences."); m->mothurOutEndLine(); + + } + + return numSeqs; + } + catch(exception& e) { + m->errorOut(e, "PreClusterCommand", "driverGroups"); + exit(1); + } +} +/**************************************************************************************************/ int PreClusterCommand::process(){ try { diff --git a/preclustercommand.h b/preclustercommand.h index 38bcd37..2868004 100644 --- a/preclustercommand.h +++ b/preclustercommand.h @@ -14,6 +14,7 @@ #include "command.hpp" #include "sequence.hpp" +#include "sequenceparser.h" /************************************************************/ struct seqPNode { @@ -26,6 +27,8 @@ struct seqPNode { ~seqPNode() {} }; /************************************************************/ +inline bool comparePriority(seqPNode first, seqPNode second) { return (first.numIdentical > second.numIdentical); } +//************************************************************/ class PreClusterCommand : public Command { @@ -46,7 +49,14 @@ public: void help() { m->mothurOut(getHelpString()); } private: - int diffs, length; + + struct linePair { + int start; + int end; + linePair(int i, int j) : start(i), end(j) {} + }; + + int diffs, length, processors; bool abort, bygroup; string fastafile, namefile, outputDir, groupfile; vector alignSeqs; //maps the number of identical seqs to a sequence @@ -64,12 +74,216 @@ private: void printData(string, string); //fasta filename, names file name int process(); int loadSeqs(map&, vector&); + int driverGroups(SequenceParser*, string, string, int, int, vector groups); + int createProcessesGroups(SequenceParser*, string, string, vector); }; -/************************************************************/ +/**************************************************************************************************/ +//custom data structure for threads to use. +// This is passed by void pointer so it can be any data type +// that can be passed using a single void pointer (LPVOID). +typedef struct preClusterData { + string fastafile; + string namefile; + string groupfile; + string newFName, newNName; + MothurOut* m; + int start; + int end; + int diffs, threadID; + vector groups; + + preClusterData(){} + preClusterData(string f, string n, string g, string nff, string nnf, vector gr, MothurOut* mout, int st, int en, int d, int tid) { + fastafile = f; + namefile = n; + groupfile = g; + newFName = nff; + newNName = nnf; + m = mout; + start = st; + end = en; + diffs = d; + threadID = tid; + groups = gr; + } +}; +/**************************************************************************************************/ +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) +#else +static DWORD WINAPI MyPreclusterThreadFunction(LPVOID lpParam){ + preClusterData* pDataArray; + pDataArray = (preClusterData*)lpParam; + + try { + + //parse fasta and name file by group + SequenceParser* parser; + if (pDataArray->namefile != "") { parser = new SequenceParser(pDataArray->groupfile, pDataArray->fastafile, pDataArray->namefile); } + else { parser = new SequenceParser(pDataArray->groupfile, pDataArray->fastafile); } + + int numSeqs = 0; + vector alignSeqs; + //clear out old files + ofstream outF; pDataArray->m->openOutputFile(pDataArray->newFName, outF); outF.close(); + ofstream outN; pDataArray->m->openOutputFile(pDataArray->newNName, outN); outN.close(); + + //precluster each group + for (int k = pDataArray->start; k < pDataArray->end; k++) { + + int start = time(NULL); + + if (pDataArray->m->control_pressed) { delete parser; return 0; } + + pDataArray->m->mothurOutEndLine(); pDataArray->m->mothurOut("Processing group " + pDataArray->groups[k] + ":"); pDataArray->m->mothurOutEndLine(); + + map thisNameMap; + if (pDataArray->namefile != "") { thisNameMap = parser->getNameMap(pDataArray->groups[k]); } + vector thisSeqs = parser->getSeqs(pDataArray->groups[k]); + + //fill alignSeqs with this groups info. + //////////////////////////////////////////////////// + //numSeqs = loadSeqs(thisNameMap, thisSeqs); same function below + + int length = 0; + alignSeqs.clear(); + map::iterator it; + bool error = false; + + for (int i = 0; i < thisSeqs.size(); i++) { + + if (pDataArray->m->control_pressed) { delete parser; return 0; } + + if (pDataArray->namefile != "") { + it = thisNameMap.find(thisSeqs[i].getName()); + + //should never be true since parser checks for this + if (it == thisNameMap.end()) { pDataArray->m->mothurOut(thisSeqs[i].getName() + " is not in your names file, please correct."); pDataArray->m->mothurOutEndLine(); error = true; } + else{ + //get number of reps + int numReps = 1; + for(int j=0;j<(it->second).length();j++){ + if((it->second)[j] == ','){ numReps++; } + } + + seqPNode tempNode(numReps, thisSeqs[i], it->second); + alignSeqs.push_back(tempNode); + if (thisSeqs[i].getAligned().length() > length) { length = thisSeqs[i].getAligned().length(); } + } + }else { //no names file, you are identical to yourself + seqPNode tempNode(1, thisSeqs[i], thisSeqs[i].getName()); + alignSeqs.push_back(tempNode); + if (thisSeqs[i].getAligned().length() > length) { length = thisSeqs[i].getAligned().length(); } + } + } + + //sanity check + if (error) { pDataArray->m->control_pressed = true; } + + thisSeqs.clear(); + numSeqs = alignSeqs.size(); + + //////////////////////////////////////////////////// + + if (pDataArray->m->control_pressed) { delete parser; return 0; } + + if (pDataArray->diffs > length) { pDataArray->m->mothurOut("Error: diffs is greater than your sequence length."); pDataArray->m->mothurOutEndLine(); pDataArray->m->control_pressed = true; return 0; } + + //////////////////////////////////////////////////// + //int count = process(); - same function below + + //sort seqs by number of identical seqs + sort(alignSeqs.begin(), alignSeqs.end(), comparePriority); + + int count = 0; + + //think about running through twice... + for (int i = 0; i < numSeqs; i++) { + + //are you active + // itActive = active.find(alignSeqs[i].seq.getName()); + + if (alignSeqs[i].active) { //this sequence has not been merged yet + + //try to merge it with all smaller seqs + for (int j = i+1; j < numSeqs; j++) { + + if (pDataArray->m->control_pressed) { delete parser; return 0; } + + if (alignSeqs[j].active) { //this sequence has not been merged yet + //are you within "diff" bases + //int mismatch = calcMisMatches(alignSeqs[i].seq.getAligned(), alignSeqs[j].seq.getAligned()); + int mismatch = 0; + + for (int k = 0; k < alignSeqs[i].seq.getAligned().length(); k++) { + //do they match + if (alignSeqs[i].seq.getAligned()[k] != alignSeqs[j].seq.getAligned()[k]) { mismatch++; } + if (mismatch > pDataArray->diffs) { mismatch = length; break; } //to far to cluster + } + + if (mismatch <= pDataArray->diffs) { + //merge + alignSeqs[i].names += ',' + alignSeqs[j].names; + alignSeqs[i].numIdentical += alignSeqs[j].numIdentical; + + alignSeqs[j].active = 0; + alignSeqs[j].numIdentical = 0; + count++; + } + }//end if j active + }//end if i != j + + //remove from active list + alignSeqs[i].active = 0; + + }//end if active i + if(i % 100 == 0) { pDataArray->m->mothurOut(toString(i) + "\t" + toString(numSeqs - count) + "\t" + toString(count)); pDataArray->m->mothurOutEndLine(); } + } + + if(numSeqs % 100 != 0) { pDataArray->m->mothurOut(toString(numSeqs) + "\t" + toString(numSeqs - count) + "\t" + toString(count)); pDataArray->m->mothurOutEndLine(); } + //////////////////////////////////////////////////// + + if (pDataArray->m->control_pressed) { delete parser; return 0; } + + pDataArray->m->mothurOut("Total number of sequences before pre.cluster was " + toString(alignSeqs.size()) + ".");pDataArray-> m->mothurOutEndLine(); + pDataArray->m->mothurOut("pre.cluster removed " + toString(count) + " sequences."); pDataArray->m->mothurOutEndLine(); pDataArray->m->mothurOutEndLine(); + + //////////////////////////////////////////////////// + //printData(pDataArray->newFFile, pDataArray->newNFile); - same as below + ofstream outFasta; + ofstream outNames; + + pDataArray->m->openOutputFileAppend(pDataArray->newFName, outFasta); + pDataArray->m->openOutputFileAppend(pDataArray->newNName, outNames); + + for (int i = 0; i < alignSeqs.size(); i++) { + if (alignSeqs[i].numIdentical != 0) { + alignSeqs[i].seq.printSequence(outFasta); + outNames << alignSeqs[i].seq.getName() << '\t' << alignSeqs[i].names << endl; + } + } + + outFasta.close(); + outNames.close(); + //////////////////////////////////////////////////// + + pDataArray->m->mothurOut("It took " + toString(time(NULL) - start) + " secs to cluster " + toString(numSeqs) + " sequences."); pDataArray->m->mothurOutEndLine(); + + } + + return numSeqs; + + } + catch(exception& e) { + pDataArray->m->errorOut(e, "AlignCommand", "MyPreclusterThreadFunction"); + exit(1); + } +} +#endif +/**************************************************************************************************/ #endif