X-Git-Url: https://git.donarmstrong.com/?a=blobdiff_plain;f=preclustercommand.cpp;h=582da493e4dc5cdf5267d65606181d1594f56a87;hb=8dd3c225255d7084e3aff8740aa4f1f1cabb367a;hp=74eddbee75d7021b3ee8584da6b8236df52eca4c;hpb=c47e480b743d1c242b8c527b6d12f992c68b8c2c;p=mothur.git diff --git a/preclustercommand.cpp b/preclustercommand.cpp index 74eddbe..582da49 100644 --- a/preclustercommand.cpp +++ b/preclustercommand.cpp @@ -8,12 +8,8 @@ */ #include "preclustercommand.h" -#include "sequenceparser.h" #include "deconvolutecommand.h" -//********************************************************************************************************************** -inline bool comparePriority(seqPNode first, seqPNode second) { return (first.numIdentical > second.numIdentical); } - //********************************************************************************************************************** vector PreClusterCommand::setParameters(){ try { @@ -21,7 +17,7 @@ vector PreClusterCommand::setParameters(){ CommandParameter pname("name", "InputTypes", "", "", "none", "none", "none",false,false); parameters.push_back(pname); CommandParameter pgroup("group", "InputTypes", "", "", "none", "none", "none",false,false); parameters.push_back(pgroup); CommandParameter pdiffs("diffs", "Number", "", "0", "", "", "",false,false); parameters.push_back(pdiffs); - CommandParameter pbygroup("bygroup", "Boolean", "", "T", "", "", "",false,false); parameters.push_back(pbygroup); + CommandParameter pprocessors("processors", "Number", "", "1", "", "", "",false,false); parameters.push_back(pprocessors); CommandParameter pinputdir("inputdir", "String", "", "", "", "", "",false,false); parameters.push_back(pinputdir); CommandParameter poutputdir("outputdir", "String", "", "", "", "", "",false,false); parameters.push_back(poutputdir); @@ -42,9 +38,8 @@ string PreClusterCommand::getHelpString(){ helpString += "The pre.cluster command outputs a new fasta and name file.\n"; helpString += "The pre.cluster command parameters are fasta, names and diffs. The fasta parameter is required. \n"; helpString += "The names parameter allows you to give a list of seqs that are identical. This file is 2 columns, first column is name or representative sequence, second column is a list of its identical sequences separated by commas.\n"; - helpString += "The group parameter allows you to provide a group file. \n"; + helpString += "The group parameter allows you to provide a group file so you can cluster by group. \n"; helpString += "The diffs parameter allows you to specify maximum number of mismatched bases allowed between sequences in a grouping. The default is 1.\n"; - helpString += "The bygroup parameter allows you to indicate you whether you want to cluster sequences only within each group, default=T, when a groupfile is given. \n"; helpString += "The pre.cluster command should be in the following format: \n"; helpString += "pre.cluster(fasta=yourFastaFile, names=yourNamesFile, diffs=yourMaxDiffs) \n"; helpString += "Example pre.cluster(fasta=amazon.fasta, diffs=2).\n"; @@ -65,6 +60,7 @@ PreClusterCommand::PreClusterCommand(){ vector tempOutNames; outputTypes["fasta"] = tempOutNames; outputTypes["name"] = tempOutNames; + outputTypes["map"] = tempOutNames; } catch(exception& e) { m->errorOut(e, "PreClusterCommand", "PreClusterCommand"); @@ -99,6 +95,7 @@ PreClusterCommand::PreClusterCommand(string option) { vector tempOutNames; outputTypes["fasta"] = tempOutNames; outputTypes["name"] = tempOutNames; + outputTypes["map"] = tempOutNames; //if the user changes the input directory command factory will send this info to us in the output parameter string inputDir = validParameter.validFile(parameters, "inputdir", false); @@ -150,26 +147,25 @@ PreClusterCommand::PreClusterCommand(string option) { // ...at some point should added some additional type checking... namefile = validParameter.validFile(parameters, "name", true); if (namefile == "not found") { namefile = ""; } - else if (namefile == "not open") { abort = true; } + else if (namefile == "not open") { namefile = ""; abort = true; } else { m->setNameFile(namefile); } groupfile = validParameter.validFile(parameters, "group", true); - if (groupfile == "not found") { groupfile = ""; } + if (groupfile == "not found") { groupfile = ""; bygroup = false; } else if (groupfile == "not open") { abort = true; groupfile = ""; } - else { m->setGroupFile(groupfile); } + else { m->setGroupFile(groupfile); bygroup = true; } string temp = validParameter.validFile(parameters, "diffs", false); if(temp == "not found"){ temp = "1"; } - convert(temp, diffs); + m->mothurConvert(temp, diffs); - temp = validParameter.validFile(parameters, "bygroup", false); - if (temp == "not found") { - if (groupfile != "") { temp = "T"; } - else { temp = "F"; } - } - bygroup = m->isTrue(temp); + temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = m->getProcessors(); } + m->setProcessors(temp); + m->mothurConvert(temp, processors); - if (bygroup && (groupfile == "")) { m->mothurOut("You cannot set bygroup=T, unless you provide a groupfile."); m->mothurOutEndLine(); abort=true; } - + if (namefile == "") { + vector files; files.push_back(fastafile); + parser.getNameFile(files); + } } } @@ -190,11 +186,16 @@ int PreClusterCommand::execute(){ string fileroot = outputDir + m->getRootName(m->getSimpleName(fastafile)); string newFastaFile = fileroot + "precluster" + m->getExtension(fastafile); string newNamesFile = fileroot + "precluster.names"; + string newMapFile = fileroot + "precluster.map"; //add group name if by group + outputNames.push_back(newFastaFile); outputTypes["fasta"].push_back(newFastaFile); + outputNames.push_back(newNamesFile); outputTypes["name"].push_back(newNamesFile); + if (bygroup) { //clear out old files ofstream outFasta; m->openOutputFile(newFastaFile, outFasta); outFasta.close(); ofstream outNames; m->openOutputFile(newNamesFile, outNames); outNames.close(); + newMapFile = fileroot + "precluster."; //parse fasta and name file by group SequenceParser* parser; @@ -203,40 +204,13 @@ int PreClusterCommand::execute(){ vector groups = parser->getNamesOfGroups(); - //precluster each group - for (int i = 0; i < groups.size(); i++) { - - start = time(NULL); - - if (m->control_pressed) { delete parser; m->mothurRemove(newFastaFile); m->mothurRemove(newNamesFile); return 0; } - - m->mothurOutEndLine(); m->mothurOut("Processing group " + groups[i] + ":"); m->mothurOutEndLine(); - - map thisNameMap; - if (namefile != "") { thisNameMap = parser->getNameMap(groups[i]); } - vector thisSeqs = parser->getSeqs(groups[i]); - - //fill alignSeqs with this groups info. - int numSeqs = loadSeqs(thisNameMap, thisSeqs); - - if (m->control_pressed) { delete parser; m->mothurRemove(newFastaFile); m->mothurRemove(newNamesFile); return 0; } - - if (diffs > length) { m->mothurOut("Error: diffs is greater than your sequence length."); m->mothurOutEndLine(); return 0; } - - int count = process(); - - if (m->control_pressed) { delete parser; m->mothurRemove(newFastaFile); m->mothurRemove(newNamesFile); return 0; } - - m->mothurOut("Total number of sequences before precluster was " + toString(alignSeqs.size()) + "."); m->mothurOutEndLine(); - m->mothurOut("pre.cluster removed " + toString(count) + " sequences."); m->mothurOutEndLine(); m->mothurOutEndLine(); - printData(newFastaFile, newNamesFile); - - m->mothurOut("It took " + toString(time(NULL) - start) + " secs to cluster " + toString(numSeqs) + " sequences."); m->mothurOutEndLine(); - - } + if(processors == 1) { driverGroups(parser, newFastaFile, newNamesFile, newMapFile, 0, groups.size(), groups); } + else { createProcessesGroups(parser, newFastaFile, newNamesFile, newMapFile, groups); } delete parser; + if (m->control_pressed) { for (int i = 0; i < outputNames.size(); i++) { m->mothurRemove(outputNames[i]); } return 0; } + //run unique.seqs for deconvolute results string inputString = "fasta=" + newFastaFile; if (namefile != "") { inputString += ", name=" + newNamesFile; } @@ -253,23 +227,25 @@ int PreClusterCommand::execute(){ m->mothurOut("/******************************************/"); m->mothurOutEndLine(); - newNamesFile = filenames["name"][0]; - newFastaFile = filenames["fasta"][0]; + m->renameFile(filenames["fasta"][0], newFastaFile); + m->mothurOut("It took " + toString(time(NULL) - start) + " secs to run pre.cluster."); m->mothurOutEndLine(); + }else { if (namefile != "") { readNameFile(); } //reads fasta file and return number of seqs int numSeqs = readFASTA(); //fills alignSeqs and makes all seqs active - if (m->control_pressed) { return 0; } + if (m->control_pressed) { for (int i = 0; i < outputNames.size(); i++) { m->mothurRemove(outputNames[i]); } return 0; } if (numSeqs == 0) { m->mothurOut("Error reading fasta file...please correct."); m->mothurOutEndLine(); return 0; } if (diffs > length) { m->mothurOut("Error: diffs is greater than your sequence length."); m->mothurOutEndLine(); return 0; } - int count = process(); + int count = process(newMapFile); + outputNames.push_back(newMapFile); outputTypes["map"].push_back(newMapFile); - if (m->control_pressed) { return 0; } + if (m->control_pressed) { for (int i = 0; i < outputNames.size(); i++) { m->mothurRemove(outputNames[i]); } return 0; } m->mothurOut("Total number of sequences before precluster was " + toString(alignSeqs.size()) + "."); m->mothurOutEndLine(); m->mothurOut("pre.cluster removed " + toString(count) + " sequences."); m->mothurOutEndLine(); m->mothurOutEndLine(); @@ -277,13 +253,12 @@ int PreClusterCommand::execute(){ m->mothurOut("It took " + toString(time(NULL) - start) + " secs to cluster " + toString(numSeqs) + " sequences."); m->mothurOutEndLine(); } - - if (m->control_pressed) { m->mothurRemove(newFastaFile); m->mothurRemove(newNamesFile); return 0; } + + if (m->control_pressed) { for (int i = 0; i < outputNames.size(); i++) { m->mothurRemove(outputNames[i]); } return 0; } m->mothurOutEndLine(); m->mothurOut("Output File Names: "); m->mothurOutEndLine(); - m->mothurOut(newFastaFile); m->mothurOutEndLine(); outputNames.push_back(newFastaFile); outputTypes["fasta"].push_back(newFastaFile); - m->mothurOut(newNamesFile); m->mothurOutEndLine(); outputNames.push_back(newNamesFile); outputTypes["name"].push_back(newNamesFile); + for (int i = 0; i < outputNames.size(); i++) { m->mothurOut(outputNames[i]); m->mothurOutEndLine(); } m->mothurOutEndLine(); //set fasta file as new current fastafile @@ -307,9 +282,169 @@ int PreClusterCommand::execute(){ } } /**************************************************************************************************/ -int PreClusterCommand::process(){ +int PreClusterCommand::createProcessesGroups(SequenceParser* parser, string newFName, string newNName, string newMFile, vector groups) { try { + vector processIDS; + int process = 1; + int num = 0; + + //sanity check + if (groups.size() < processors) { processors = groups.size(); } + + //divide the groups between the processors + vector lines; + int numGroupsPerProcessor = groups.size() / processors; + for (int i = 0; i < processors; i++) { + int startIndex = i * numGroupsPerProcessor; + int endIndex = (i+1) * numGroupsPerProcessor; + if(i == (processors - 1)){ endIndex = groups.size(); } + lines.push_back(linePair(startIndex, endIndex)); + } + +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + + //loop through and create all the processes you want + while (process != processors) { + int pid = fork(); + + if (pid > 0) { + processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later + process++; + }else if (pid == 0){ + num = driverGroups(parser, newFName + toString(getpid()) + ".temp", newNName + toString(getpid()) + ".temp", newMFile, lines[process].start, lines[process].end, groups); + exit(0); + }else { + m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine(); + for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); } + exit(0); + } + } + + //do my part + num = driverGroups(parser, newFName, newNName, newMFile, lines[0].start, lines[0].end, groups); + + //force parent to wait until all the processes are done + for (int i=0;i pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + + //Create processor worker threads. + for( int i=1; imapFileNames.size(); j++) { + outputNames.push_back(pDataArray[i]->mapFileNames[j]); outputTypes["map"].push_back(pDataArray[i]->mapFileNames[j]); + } + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } + +#endif + + //append output files + for(int i=0;igetFullPathName(".\\" + newFName); + //newNName = m->getFullPathName(".\\" + newNName); + + m->appendFiles((newFName + toString(processIDS[i]) + ".temp"), newFName); + m->mothurRemove((newFName + toString(processIDS[i]) + ".temp")); + + m->appendFiles((newNName + toString(processIDS[i]) + ".temp"), newNName); + m->mothurRemove((newNName + toString(processIDS[i]) + ".temp")); + } + + return num; + + } + catch(exception& e) { + m->errorOut(e, "PreClusterCommand", "createProcessesGroups"); + exit(1); + } +} +/**************************************************************************************************/ +int PreClusterCommand::driverGroups(SequenceParser* parser, string newFFile, string newNFile, string newMFile, int start, int end, vector groups){ + try { + + int numSeqs = 0; + + //precluster each group + for (int i = start; i < end; i++) { + + start = time(NULL); + + if (m->control_pressed) { return 0; } + + m->mothurOutEndLine(); m->mothurOut("Processing group " + groups[i] + ":"); m->mothurOutEndLine(); + + map thisNameMap; + if (namefile != "") { thisNameMap = parser->getNameMap(groups[i]); } + vector thisSeqs = parser->getSeqs(groups[i]); + + //fill alignSeqs with this groups info. + numSeqs = loadSeqs(thisNameMap, thisSeqs); + + if (m->control_pressed) { return 0; } + + if (diffs > length) { m->mothurOut("Error: diffs is greater than your sequence length."); m->mothurOutEndLine(); m->control_pressed = true; return 0; } + + int count = process(newMFile+groups[i]+".map"); + outputNames.push_back(newMFile+groups[i]+".map"); outputTypes["map"].push_back(newMFile+groups[i]+".map"); + + if (m->control_pressed) { return 0; } + + m->mothurOut("Total number of sequences before pre.cluster was " + toString(alignSeqs.size()) + "."); m->mothurOutEndLine(); + m->mothurOut("pre.cluster removed " + toString(count) + " sequences."); m->mothurOutEndLine(); m->mothurOutEndLine(); + printData(newFFile, newNFile); + + m->mothurOut("It took " + toString(time(NULL) - start) + " secs to cluster " + toString(numSeqs) + " sequences."); m->mothurOutEndLine(); + + } + + return numSeqs; + } + catch(exception& e) { + m->errorOut(e, "PreClusterCommand", "driverGroups"); + exit(1); + } +} +/**************************************************************************************************/ +int PreClusterCommand::process(string newMapFile){ + try { + ofstream out; + m->openOutputFile(newMapFile, out); + //sort seqs by number of identical seqs sort(alignSeqs.begin(), alignSeqs.end(), comparePriority); @@ -324,10 +459,12 @@ int PreClusterCommand::process(){ if (alignSeqs[i].active) { //this sequence has not been merged yet + string chunk = alignSeqs[i].seq.getName() + "\t" + toString(alignSeqs[i].numIdentical) + "\t" + toString(0) + "\t" + alignSeqs[i].seq.getAligned() + "\n"; + //try to merge it with all smaller seqs for (int j = i+1; j < numSeqs; j++) { - if (m->control_pressed) { return 0; } + if (m->control_pressed) { out.close(); return 0; } if (alignSeqs[j].active) { //this sequence has not been merged yet //are you within "diff" bases @@ -338,19 +475,24 @@ int PreClusterCommand::process(){ alignSeqs[i].names += ',' + alignSeqs[j].names; alignSeqs[i].numIdentical += alignSeqs[j].numIdentical; + chunk += alignSeqs[j].seq.getName() + "\t" + toString(alignSeqs[j].numIdentical) + "\t" + toString(mismatch) + "\t" + alignSeqs[j].seq.getAligned() + "\n"; + alignSeqs[j].active = 0; alignSeqs[j].numIdentical = 0; count++; } }//end if j active - }//end if i != j + }//end for loop j //remove from active list alignSeqs[i].active = 0; + out << "ideal_seq_" << (i+1) << '\t' << alignSeqs[i].numIdentical << endl << chunk << endl;; + }//end if active i if(i % 100 == 0) { m->mothurOut(toString(i) + "\t" + toString(numSeqs - count) + "\t" + toString(count)); m->mothurOutEndLine(); } } + out.close(); if(numSeqs % 100 != 0) { m->mothurOut(toString(numSeqs) + "\t" + toString(numSeqs - count) + "\t" + toString(count)); m->mothurOutEndLine(); } @@ -436,7 +578,7 @@ int PreClusterCommand::loadSeqs(map& thisName, vector& if (it == thisName.end()) { m->mothurOut(thisSeqs[i].getName() + " is not in your names file, please correct."); m->mothurOutEndLine(); error = true; } else{ //get number of reps - int numReps = 0; + int numReps = 1; for(int j=0;j<(it->second).length();j++){ if((it->second)[j] == ','){ numReps++; } } @@ -500,7 +642,7 @@ void PreClusterCommand::printData(string newfasta, string newname){ m->openOutputFile(newfasta, outFasta); m->openOutputFile(newname, outNames); } - + for (int i = 0; i < alignSeqs.size(); i++) { if (alignSeqs[i].numIdentical != 0) { alignSeqs[i].seq.printSequence(outFasta);