2 * clustersplitcommand.cpp
5 * Created by westcott on 5/19/10.
6 * Copyright 2010 Schloss Lab. All rights reserved.
10 #include "clustersplitcommand.h"
14 //**********************************************************************************************************************
15 vector<string> ClusterSplitCommand::setParameters(){
17 CommandParameter ptaxonomy("taxonomy", "InputTypes", "", "", "none", "none", "FastaTaxName",false,false); parameters.push_back(ptaxonomy);
18 CommandParameter pphylip("phylip", "InputTypes", "", "", "PhylipColumnFasta", "PhylipColumnFasta", "none",false,false); parameters.push_back(pphylip);
19 CommandParameter pfasta("fasta", "InputTypes", "", "", "PhylipColumnFasta", "PhylipColumnFasta", "FastaTaxName",false,false); parameters.push_back(pfasta);
20 CommandParameter pname("name", "InputTypes", "", "", "none", "none", "ColumnName-FastaTaxName",false,false); parameters.push_back(pname);
21 CommandParameter pcolumn("column", "InputTypes", "", "", "PhylipColumnFasta", "PhylipColumnFasta", "ColumnName",false,false); parameters.push_back(pcolumn);
22 CommandParameter ptaxlevel("taxlevel", "Number", "", "3", "", "", "",false,false); parameters.push_back(ptaxlevel);
23 CommandParameter psplitmethod("splitmethod", "Multiple", "classify-fasta-distance", "distance", "", "", "",false,false); parameters.push_back(psplitmethod);
24 CommandParameter plarge("large", "Boolean", "", "F", "", "", "",false,false); parameters.push_back(plarge);
25 CommandParameter pshowabund("showabund", "Boolean", "", "T", "", "", "",false,false); parameters.push_back(pshowabund);
26 CommandParameter ptiming("timing", "Boolean", "", "F", "", "", "",false,false); parameters.push_back(ptiming);
27 CommandParameter pprocessors("processors", "Number", "", "1", "", "", "",false,false); parameters.push_back(pprocessors);
28 CommandParameter pcutoff("cutoff", "Number", "", "0.25", "", "", "",false,false); parameters.push_back(pcutoff);
29 CommandParameter pprecision("precision", "Number", "", "100", "", "", "",false,false); parameters.push_back(pprecision);
30 CommandParameter pmethod("method", "Multiple", "furthest-nearest-average-weighted", "average", "", "", "",false,false); parameters.push_back(pmethod);
31 CommandParameter phard("hard", "Boolean", "", "T", "", "", "",false,false); parameters.push_back(phard);
32 CommandParameter pinputdir("inputdir", "String", "", "", "", "", "",false,false); parameters.push_back(pinputdir);
33 CommandParameter poutputdir("outputdir", "String", "", "", "", "", "",false,false); parameters.push_back(poutputdir);
35 vector<string> myArray;
36 for (int i = 0; i < parameters.size(); i++) { myArray.push_back(parameters[i].name); }
40 m->errorOut(e, "ClusterSplitCommand", "setParameters");
44 //**********************************************************************************************************************
45 string ClusterSplitCommand::getHelpString(){
47 string helpString = "";
48 helpString += "The cluster.split command parameter options are fasta, phylip, column, name, cutoff, precision, method, splitmethod, taxonomy, taxlevel, showabund, timing, hard, large, processors. Fasta or Phylip or column and name are required.\n";
49 helpString += "The cluster.split command can split your files in 3 ways. Splitting by distance file, by classification, or by classification also using a fasta file. \n";
50 helpString += "For the distance file method, you need only provide your distance file and mothur will split the file into distinct groups. \n";
51 helpString += "For the classification method, you need to provide your distance file and taxonomy file, and set the splitmethod to classify. \n";
52 helpString += "You will also need to set the taxlevel you want to split by. mothur will split the sequences into distinct taxonomy groups, and split the distance file based on those groups. \n";
53 helpString += "For the classification method using a fasta file, you need to provide your fasta file, names file and taxonomy file. \n";
54 helpString += "You will also need to set the taxlevel you want to split by. mothur will split the sequence into distinct taxonomy groups, and create distance files for each grouping. \n";
55 helpString += "The phylip and column parameter allow you to enter your distance file. \n";
56 helpString += "The fasta parameter allows you to enter your aligned fasta file. \n";
57 helpString += "The name parameter allows you to enter your name file and is required if your distance file is in column format. \n";
58 helpString += "The cutoff parameter allow you to set the distance you want to cluster to, default is 0.25. \n";
59 helpString += "The precision parameter allows you specify the precision of the precision of the distances outputted, default=100, meaning 2 decimal places. \n";
60 helpString += "The method allows you to specify what clustering algorythm you want to use, default=average, option furthest, nearest, or average. \n";
61 helpString += "The splitmethod parameter allows you to specify how you want to split your distance file before you cluster, default=distance, options distance, classify or fasta. \n";
62 helpString += "The taxonomy parameter allows you to enter the taxonomy file for your sequences, this is only valid if you are using splitmethod=classify. Be sure your taxonomy file does not include the probability scores. \n";
63 helpString += "The taxlevel parameter allows you to specify the taxonomy level you want to use to split the distance file, default=3, meaning use the first taxon in each list. \n";
64 helpString += "The large parameter allows you to indicate that your distance matrix is too large to fit in RAM. The default value is false.\n";
66 helpString += "When using MPI, the processors parameter is set to the number of MPI processes running. \n";
68 helpString += "The cluster.split command should be in the following format: \n";
69 helpString += "cluster.split(column=youDistanceFile, name=yourNameFile, method=yourMethod, cutoff=yourCutoff, precision=yourPrecision, splitmethod=yourSplitmethod, taxonomy=yourTaxonomyfile, taxlevel=yourtaxlevel) \n";
70 helpString += "Example: cluster.split(column=abrecovery.dist, name=abrecovery.names, method=furthest, cutoff=0.10, precision=1000, splitmethod=classify, taxonomy=abrecovery.silva.slv.taxonomy, taxlevel=5) \n";
74 m->errorOut(e, "ClusterSplitCommand", "getHelpString");
78 //**********************************************************************************************************************
79 ClusterSplitCommand::ClusterSplitCommand(){
81 abort = true; calledHelp = true;
83 vector<string> tempOutNames;
84 outputTypes["list"] = tempOutNames;
85 outputTypes["rabund"] = tempOutNames;
86 outputTypes["sabund"] = tempOutNames;
87 outputTypes["column"] = tempOutNames;
90 m->errorOut(e, "ClusterSplitCommand", "ClusterSplitCommand");
94 //**********************************************************************************************************************
95 //This function checks to make sure the cluster command has no errors and then clusters based on the method chosen.
96 ClusterSplitCommand::ClusterSplitCommand(string option) {
98 abort = false; calledHelp = false;
101 //allow user to run help
102 if(option == "help") { help(); abort = true; calledHelp = true; }
103 else if(option == "citation") { citation(); abort = true; calledHelp = true;}
106 vector<string> myArray = setParameters();
108 OptionParser parser(option);
109 map<string,string> parameters = parser.getParameters();
111 ValidParameters validParameter("cluster.split");
113 //check to make sure all parameters are valid for command
114 map<string,string>::iterator it;
115 for (it = parameters.begin(); it != parameters.end(); it++) {
116 if (validParameter.isValidParameter(it->first, myArray, it->second) != true) {
121 //initialize outputTypes
122 vector<string> tempOutNames;
123 outputTypes["list"] = tempOutNames;
124 outputTypes["rabund"] = tempOutNames;
125 outputTypes["sabund"] = tempOutNames;
126 outputTypes["column"] = tempOutNames;
128 //if the user changes the output directory command factory will send this info to us in the output parameter
129 outputDir = validParameter.validFile(parameters, "outputdir", false); if (outputDir == "not found"){ outputDir = ""; }
131 //if the user changes the input directory command factory will send this info to us in the output parameter
132 string inputDir = validParameter.validFile(parameters, "inputdir", false);
133 if (inputDir == "not found"){ inputDir = ""; }
136 it = parameters.find("phylip");
137 //user has given a template file
138 if(it != parameters.end()){
139 path = m->hasPath(it->second);
140 //if the user has not given a path then, add inputdir. else leave path alone.
141 if (path == "") { parameters["phylip"] = inputDir + it->second; }
144 it = parameters.find("column");
145 //user has given a template file
146 if(it != parameters.end()){
147 path = m->hasPath(it->second);
148 //if the user has not given a path then, add inputdir. else leave path alone.
149 if (path == "") { parameters["column"] = inputDir + it->second; }
152 it = parameters.find("name");
153 //user has given a template file
154 if(it != parameters.end()){
155 path = m->hasPath(it->second);
156 //if the user has not given a path then, add inputdir. else leave path alone.
157 if (path == "") { parameters["name"] = inputDir + it->second; }
160 it = parameters.find("taxonomy");
161 //user has given a template file
162 if(it != parameters.end()){
163 path = m->hasPath(it->second);
164 //if the user has not given a path then, add inputdir. else leave path alone.
165 if (path == "") { parameters["taxonomy"] = inputDir + it->second; }
168 it = parameters.find("fasta");
169 //user has given a template file
170 if(it != parameters.end()){
171 path = m->hasPath(it->second);
172 //if the user has not given a path then, add inputdir. else leave path alone.
173 if (path == "") { parameters["fasta"] = inputDir + it->second; }
177 //check for required parameters
178 phylipfile = validParameter.validFile(parameters, "phylip", true);
179 if (phylipfile == "not open") { abort = true; }
180 else if (phylipfile == "not found") { phylipfile = ""; }
181 else { distfile = phylipfile; format = "phylip"; m->setPhylipFile(phylipfile); }
183 columnfile = validParameter.validFile(parameters, "column", true);
184 if (columnfile == "not open") { abort = true; }
185 else if (columnfile == "not found") { columnfile = ""; }
186 else { distfile = columnfile; format = "column"; m->setColumnFile(columnfile); }
188 namefile = validParameter.validFile(parameters, "name", true);
189 if (namefile == "not open") { abort = true; }
190 else if (namefile == "not found") { namefile = ""; }
191 else { m->setNameFile(namefile); }
193 fastafile = validParameter.validFile(parameters, "fasta", true);
194 if (fastafile == "not open") { abort = true; }
195 else if (fastafile == "not found") { fastafile = ""; }
196 else { distfile = fastafile; splitmethod = "fasta"; m->setFastaFile(fastafile); }
198 taxFile = validParameter.validFile(parameters, "taxonomy", true);
199 if (taxFile == "not open") { taxFile = ""; abort = true; }
200 else if (taxFile == "not found") { taxFile = ""; }
201 else { m->setTaxonomyFile(taxFile); }
203 if ((phylipfile == "") && (columnfile == "") && (fastafile == "")) {
204 //is there are current file available for either of these?
205 //give priority to column, then phylip, then fasta
206 columnfile = m->getColumnFile();
207 if (columnfile != "") { m->mothurOut("Using " + columnfile + " as input file for the column parameter."); m->mothurOutEndLine(); }
209 phylipfile = m->getPhylipFile();
210 if (phylipfile != "") { m->mothurOut("Using " + phylipfile + " as input file for the phylip parameter."); m->mothurOutEndLine(); }
212 fastafile = m->getFastaFile();
213 if (fastafile != "") { m->mothurOut("Using " + fastafile + " as input file for the fasta parameter."); m->mothurOutEndLine(); }
215 m->mothurOut("No valid current files. When executing a cluster.split command you must enter a phylip or a column or fastafile."); m->mothurOutEndLine();
221 else if ((phylipfile != "") && (columnfile != "") && (fastafile != "")) { m->mothurOut("When executing a cluster.split command you must enter ONLY ONE of the following: fasta, phylip or column."); m->mothurOutEndLine(); abort = true; }
223 if (columnfile != "") {
224 if (namefile == "") {
225 namefile = m->getNameFile();
226 if (namefile != "") { m->mothurOut("Using " + namefile + " as input file for the name parameter."); m->mothurOutEndLine(); }
228 m->mothurOut("You need to provide a namefile if you are going to use the column format."); m->mothurOutEndLine();
234 if (fastafile != "") {
236 taxFile = m->getTaxonomyFile();
237 if (taxFile != "") { m->mothurOut("Using " + taxFile + " as input file for the taxonomy parameter."); m->mothurOutEndLine(); }
239 m->mothurOut("You need to provide a taxonomy file if you are if you are using a fasta file to generate the split."); m->mothurOutEndLine();
244 if (namefile == "") {
245 namefile = m->getNameFile();
246 if (namefile != "") { m->mothurOut("Using " + namefile + " as input file for the name parameter."); m->mothurOutEndLine(); }
248 m->mothurOut("You need to provide a namefile if you are if you are using a fasta file to generate the split."); m->mothurOutEndLine();
254 //check for optional parameter and set defaults
255 // ...at some point should added some additional type checking...
256 //get user cutoff and precision or use defaults
258 temp = validParameter.validFile(parameters, "precision", false);
259 if (temp == "not found") { temp = "100"; }
260 //saves precision legnth for formatting below
261 length = temp.length();
262 m->mothurConvert(temp, precision);
264 temp = validParameter.validFile(parameters, "hard", false); if (temp == "not found") { temp = "T"; }
265 hard = m->isTrue(temp);
267 temp = validParameter.validFile(parameters, "large", false); if (temp == "not found") { temp = "F"; }
268 large = m->isTrue(temp);
270 temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = m->getProcessors(); }
271 m->setProcessors(temp);
272 m->mothurConvert(temp, processors);
274 temp = validParameter.validFile(parameters, "splitmethod", false);
275 if (splitmethod != "fasta") {
276 if (temp == "not found") { splitmethod = "distance"; }
277 else { splitmethod = temp; }
280 temp = validParameter.validFile(parameters, "cutoff", false); if (temp == "not found") { temp = "0.25"; }
281 m->mothurConvert(temp, cutoff);
282 cutoff += (5 / (precision * 10.0));
284 temp = validParameter.validFile(parameters, "taxlevel", false); if (temp == "not found") { temp = "3"; }
285 m->mothurConvert(temp, taxLevelCutoff);
287 method = validParameter.validFile(parameters, "method", false); if (method == "not found") { method = "average"; }
289 if ((method == "furthest") || (method == "nearest") || (method == "average")) { }
290 else { m->mothurOut("Not a valid clustering method. Valid clustering algorithms are furthest, nearest or average."); m->mothurOutEndLine(); abort = true; }
292 if ((splitmethod == "distance") || (splitmethod == "classify") || (splitmethod == "fasta")) { }
293 else { m->mothurOut(splitmethod + " is not a valid splitting method. Valid splitting algorithms are distance, classify or fasta."); m->mothurOutEndLine(); abort = true; }
295 if ((splitmethod == "classify") && (taxFile == "")) { m->mothurOut("You need to provide a taxonomy file if you are going to use the classify splitmethod."); m->mothurOutEndLine(); abort = true; }
297 showabund = validParameter.validFile(parameters, "showabund", false);
298 if (showabund == "not found") { showabund = "T"; }
300 timing = validParameter.validFile(parameters, "timing", false);
301 if (timing == "not found") { timing = "F"; }
305 catch(exception& e) {
306 m->errorOut(e, "ClusterSplitCommand", "ClusterSplitCommand");
311 //**********************************************************************************************************************
313 int ClusterSplitCommand::execute(){
316 if (abort == true) { if (calledHelp) { return 0; } return 2; }
319 vector<string> listFileNames;
321 string singletonName = "";
322 double saveCutoff = cutoff;
324 //****************** file prep work ******************************//
329 MPI_Comm_size(MPI_COMM_WORLD, &processors); //set processors to the number of mpi processes running
330 MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
332 if (pid == 0) { //only process 0 converts and splits
336 //if user gave a phylip file convert to column file
337 if (format == "phylip") {
339 m->mothurOut("Converting to column format..."); m->mothurOutEndLine();
341 ReadCluster* convert = new ReadCluster(distfile, cutoff, outputDir, false);
343 NameAssignment* nameMap = NULL;
344 convert->setFormat("phylip");
345 convert->read(nameMap);
347 if (m->control_pressed) { delete convert; return 0; }
349 distfile = convert->getOutputFile();
351 //if no names file given with phylip file, create it
352 ListVector* listToMakeNameFile = convert->getListVector();
353 if (namefile == "") { //you need to make a namefile for split matrix
355 namefile = phylipfile + ".names";
356 m->openOutputFile(namefile, out);
357 for (int i = 0; i < listToMakeNameFile->getNumBins(); i++) {
358 string bin = listToMakeNameFile->get(i);
359 out << bin << '\t' << bin << endl;
363 delete listToMakeNameFile;
366 m->mothurOut("It took " + toString(time(NULL) - estart) + " seconds to convert the distance file."); m->mothurOutEndLine();
368 if (m->control_pressed) { return 0; }
371 m->mothurOut("Splitting the file..."); m->mothurOutEndLine();
373 //split matrix into non-overlapping groups
375 if (splitmethod == "distance") { split = new SplitMatrix(distfile, namefile, taxFile, cutoff, splitmethod, large); }
376 else if (splitmethod == "classify") { split = new SplitMatrix(distfile, namefile, taxFile, taxLevelCutoff, splitmethod, large); }
377 else if (splitmethod == "fasta") { split = new SplitMatrix(fastafile, namefile, taxFile, taxLevelCutoff, cutoff, splitmethod, processors, outputDir); }
378 else { m->mothurOut("Not a valid splitting method. Valid splitting algorithms are distance, classify or fasta."); m->mothurOutEndLine(); return 0; }
382 if (m->control_pressed) { delete split; return 0; }
384 singletonName = split->getSingletonNames();
385 vector< map<string, string> > distName = split->getDistanceFiles(); //returns map of distance files -> namefile sorted by distance file size
388 //output a merged distance file
389 if (splitmethod == "fasta") { createMergedDistanceFile(distName); }
392 if (m->control_pressed) { return 0; }
394 m->mothurOut("It took " + toString(time(NULL) - estart) + " seconds to split the distance file."); m->mothurOutEndLine();
397 //****************** break up files between processes and cluster each file set ******************************//
399 ////you are process 0 from above////
401 vector < vector < map<string, string> > > dividedNames; //distNames[1] = vector of filenames for process 1...
402 dividedNames.resize(processors);
404 //for each file group figure out which process will complete it
405 //want to divide the load intelligently so the big files are spread between processes
406 for (int i = 0; i < distName.size(); i++) {
407 int processToAssign = (i+1) % processors;
408 if (processToAssign == 0) { processToAssign = processors; }
410 dividedNames[(processToAssign-1)].push_back(distName[i]);
413 //not lets reverse the order of ever other process, so we balance big files running with little ones
414 for (int i = 0; i < processors; i++) {
415 int remainder = ((i+1) % processors);
416 if (remainder) { reverse(dividedNames[i].begin(), dividedNames[i].end()); }
420 //send each child the list of files it needs to process
421 for(int i = 1; i < processors; i++) {
422 //send number of file pairs
423 int num = dividedNames[i].size();
424 MPI_Send(&num, 1, MPI_INT, i, tag, MPI_COMM_WORLD);
426 for (int j = 0; j < num; j++) { //send filenames to process i
427 char tempDistFileName[1024];
428 strcpy(tempDistFileName, (dividedNames[i][j].begin()->first).c_str());
429 int lengthDist = (dividedNames[i][j].begin()->first).length();
431 MPI_Send(&lengthDist, 1, MPI_INT, i, tag, MPI_COMM_WORLD);
432 MPI_Send(tempDistFileName, 1024, MPI_CHAR, i, tag, MPI_COMM_WORLD);
434 char tempNameFileName[1024];
435 strcpy(tempNameFileName, (dividedNames[i][j].begin()->second).c_str());
436 int lengthName = (dividedNames[i][j].begin()->second).length();
438 MPI_Send(&lengthName, 1, MPI_INT, i, tag, MPI_COMM_WORLD);
439 MPI_Send(tempNameFileName, 1024, MPI_CHAR, i, tag, MPI_COMM_WORLD);
444 listFileNames = cluster(dividedNames[0], labels);
446 //receive the other processes info
447 for(int i = 1; i < processors; i++) {
448 int num = dividedNames[i].size();
451 MPI_Recv(&tempCutoff, 1, MPI_DOUBLE, i, tag, MPI_COMM_WORLD, &status);
452 if (tempCutoff < cutoff) { cutoff = tempCutoff; }
454 //send list filenames to root process
455 for (int j = 0; j < num; j++) {
457 char tempListFileName[1024];
459 MPI_Recv(&lengthList, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status);
460 MPI_Recv(tempListFileName, 1024, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status);
462 string myListFileName = tempListFileName;
463 myListFileName = myListFileName.substr(0, lengthList);
465 listFileNames.push_back(myListFileName);
468 //send Labels to root process
470 MPI_Recv(&numLabels, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status);
472 for (int j = 0; j < numLabels; j++) {
476 MPI_Recv(&lengthLabel, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status);
477 MPI_Recv(tempLabel, 100, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status);
479 string myLabel = tempLabel;
480 myLabel = myLabel.substr(0, lengthLabel);
482 if (labels.count(myLabel) == 0) { labels.insert(myLabel); }
486 }else { //you are a child process
487 vector < map<string, string> > myNames;
489 //recieve the files you need to process
490 //receive number of file pairs
492 MPI_Recv(&num, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
496 for (int j = 0; j < num; j++) { //receive filenames to process
498 char tempDistFileName[1024];
500 MPI_Recv(&lengthDist, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
501 MPI_Recv(tempDistFileName, 1024, MPI_CHAR, 0, tag, MPI_COMM_WORLD, &status);
503 string myDistFileName = tempDistFileName;
504 myDistFileName = myDistFileName.substr(0, lengthDist);
507 char tempNameFileName[1024];
509 MPI_Recv(&lengthName, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
510 MPI_Recv(tempNameFileName, 1024, MPI_CHAR, 0, tag, MPI_COMM_WORLD, &status);
512 string myNameFileName = tempNameFileName;
513 myNameFileName = myNameFileName.substr(0, lengthName);
516 myNames[j][myDistFileName] = myNameFileName;
520 listFileNames = cluster(myNames, labels);
523 MPI_Send(&cutoff, 1, MPI_DOUBLE, 0, tag, MPI_COMM_WORLD);
525 //send list filenames to root process
526 for (int j = 0; j < num; j++) {
527 char tempListFileName[1024];
528 strcpy(tempListFileName, listFileNames[j].c_str());
529 int lengthList = listFileNames[j].length();
531 MPI_Send(&lengthList, 1, MPI_INT, 0, tag, MPI_COMM_WORLD);
532 MPI_Send(tempListFileName, 1024, MPI_CHAR, 0, tag, MPI_COMM_WORLD);
535 //send Labels to root process
536 int numLabels = labels.size();
537 MPI_Send(&numLabels, 1, MPI_INT, 0, tag, MPI_COMM_WORLD);
539 for(set<string>::iterator it = labels.begin(); it != labels.end(); ++it) {
541 strcpy(tempLabel, (*it).c_str());
542 int lengthLabel = (*it).length();
544 MPI_Send(&lengthLabel, 1, MPI_INT, 0, tag, MPI_COMM_WORLD);
545 MPI_Send(tempLabel, 100, MPI_CHAR, 0, tag, MPI_COMM_WORLD);
550 MPI_Barrier(MPI_COMM_WORLD);
553 ///////////////////// WINDOWS CAN ONLY USE 1 PROCESSORS ACCESS VIOLATION UNRESOLVED ///////////////////////
555 if (processors > distName.size()) { processors = distName.size(); }
557 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
559 listFileNames = cluster(distName, labels); //clusters individual files and returns names of list files
561 listFileNames = createProcesses(distName, labels);
564 listFileNames = cluster(distName, labels); //clusters individual files and returns names of list files
567 if (m->control_pressed) { for (int i = 0; i < listFileNames.size(); i++) { m->mothurRemove(listFileNames[i]); } return 0; }
569 if (saveCutoff != cutoff) { m->mothurOut("Cutoff was " + toString(saveCutoff) + " changed cutoff to " + toString(cutoff)); m->mothurOutEndLine(); }
571 m->mothurOut("It took " + toString(time(NULL) - estart) + " seconds to cluster"); m->mothurOutEndLine();
573 //****************** merge list file and create rabund and sabund files ******************************//
575 m->mothurOut("Merging the clustered files..."); m->mothurOutEndLine();
578 if (pid == 0) { //only process 0 merges
581 ListVector* listSingle;
582 map<float, int> labelBins = completeListFile(listFileNames, singletonName, labels, listSingle); //returns map of label to numBins
584 if (m->control_pressed) { if (listSingle != NULL) { delete listSingle; } for (int i = 0; i < outputNames.size(); i++) { m->mothurRemove(outputNames[i]); } return 0; }
586 mergeLists(listFileNames, labelBins, listSingle);
588 if (m->control_pressed) { for (int i = 0; i < outputNames.size(); i++) { m->mothurRemove(outputNames[i]); } return 0; }
590 m->mothurOut("It took " + toString(time(NULL) - estart) + " seconds to merge."); m->mothurOutEndLine();
592 //set list file as new current listfile
594 itTypes = outputTypes.find("list");
595 if (itTypes != outputTypes.end()) {
596 if ((itTypes->second).size() != 0) { current = (itTypes->second)[0]; m->setListFile(current); }
599 //set rabund file as new current rabundfile
600 itTypes = outputTypes.find("rabund");
601 if (itTypes != outputTypes.end()) {
602 if ((itTypes->second).size() != 0) { current = (itTypes->second)[0]; m->setRabundFile(current); }
605 //set sabund file as new current sabundfile
606 itTypes = outputTypes.find("sabund");
607 if (itTypes != outputTypes.end()) {
608 if ((itTypes->second).size() != 0) { current = (itTypes->second)[0]; m->setSabundFile(current); }
611 m->mothurOutEndLine();
612 m->mothurOut("Output File Names: "); m->mothurOutEndLine();
613 for (int i = 0; i < outputNames.size(); i++) { m->mothurOut(outputNames[i]); m->mothurOutEndLine(); }
614 m->mothurOutEndLine();
617 } //only process 0 merges
620 MPI_Barrier(MPI_COMM_WORLD);
625 catch(exception& e) {
626 m->errorOut(e, "ClusterSplitCommand", "execute");
630 //**********************************************************************************************************************
631 map<float, int> ClusterSplitCommand::completeListFile(vector<string> listNames, string singleton, set<string>& userLabels, ListVector*& listSingle){
634 map<float, int> labelBin;
635 vector<float> orderFloat;
639 if (singleton != "none") {
641 m->openInputFile(singleton, in);
643 string firstCol, secondCol;
644 listSingle = new ListVector();
646 in >> firstCol >> secondCol; m->gobble(in);
647 listSingle->push_back(secondCol);
650 m->mothurRemove(singleton);
652 numSingleBins = listSingle->getNumBins();
653 }else{ listSingle = NULL; numSingleBins = 0; }
655 //go through users set and make them floats so we can sort them
656 for(set<string>::iterator it = userLabels.begin(); it != userLabels.end(); ++it) {
659 if ((*it != "unique") && (convertTestFloat(*it, temp) == true)) { convert(*it, temp); }
660 else if (*it == "unique") { temp = -1.0; }
662 if (temp <= cutoff) {
663 orderFloat.push_back(temp);
664 labelBin[temp] = numSingleBins; //initialize numbins
669 sort(orderFloat.begin(), orderFloat.end());
672 //get the list info from each file
673 for (int k = 0; k < listNames.size(); k++) {
675 if (m->control_pressed) {
676 if (listSingle != NULL) { delete listSingle; listSingle = NULL; m->mothurRemove(singleton); }
677 for (int i = 0; i < listNames.size(); i++) { m->mothurRemove(listNames[i]); }
681 InputData* input = new InputData(listNames[k], "list");
682 ListVector* list = input->getListVector();
683 string lastLabel = list->getLabel();
685 string filledInList = listNames[k] + "filledInTemp";
687 m->openOutputFile(filledInList, outFilled);
689 //for each label needed
690 for(int l = 0; l < orderFloat.size(); l++){
693 if (orderFloat[l] == -1) { thisLabel = "unique"; }
694 else { thisLabel = toString(orderFloat[l], length-1); }
696 //this file has reached the end
698 list = input->getListVector(lastLabel, true);
699 }else{ //do you have the distance, or do you need to fill in
702 if (list->getLabel() == "unique") { labelFloat = -1.0; }
703 else { convert(list->getLabel(), labelFloat); }
705 //check for missing labels
706 if (labelFloat > orderFloat[l]) { //you are missing the label, get the next smallest one
707 //if its bigger get last label, otherwise keep it
709 list = input->getListVector(lastLabel, true); //get last list vector to use, you actually want to move back in the file
711 lastLabel = list->getLabel();
715 list->setLabel(thisLabel);
716 list->print(outFilled);
719 labelBin[orderFloat[l]] += list->getNumBins();
723 list = input->getListVector();
726 if (list != NULL) { delete list; }
730 m->mothurRemove(listNames[k]);
731 rename(filledInList.c_str(), listNames[k].c_str());
736 catch(exception& e) {
737 m->errorOut(e, "ClusterSplitCommand", "completeListFile");
741 //**********************************************************************************************************************
742 int ClusterSplitCommand::mergeLists(vector<string> listNames, map<float, int> userLabels, ListVector* listSingle){
744 if (outputDir == "") { outputDir += m->hasPath(distfile); }
745 fileroot = outputDir + m->getRootName(m->getSimpleName(distfile));
747 m->openOutputFile(fileroot+ tag + ".sabund", outSabund);
748 m->openOutputFile(fileroot+ tag + ".rabund", outRabund);
749 m->openOutputFile(fileroot+ tag + ".list", outList);
751 outputNames.push_back(fileroot+ tag + ".sabund"); outputTypes["list"].push_back(fileroot+ tag + ".list");
752 outputNames.push_back(fileroot+ tag + ".rabund"); outputTypes["rabund"].push_back(fileroot+ tag + ".rabund");
753 outputNames.push_back(fileroot+ tag + ".list"); outputTypes["sabund"].push_back(fileroot+ tag + ".sabund");
755 map<float, int>::iterator itLabel;
757 //for each label needed
758 for(itLabel = userLabels.begin(); itLabel != userLabels.end(); itLabel++) {
761 if (itLabel->first == -1) { thisLabel = "unique"; }
762 else { thisLabel = toString(itLabel->first, length-1); }
764 outList << thisLabel << '\t' << itLabel->second << '\t';
766 RAbundVector* rabund = new RAbundVector();
767 rabund->setLabel(thisLabel);
770 if (listSingle != NULL) {
771 for (int j = 0; j < listSingle->getNumBins(); j++) {
772 outList << listSingle->get(j) << '\t';
773 rabund->push_back(m->getNumNames(listSingle->get(j)));
777 //get the list info from each file
778 for (int k = 0; k < listNames.size(); k++) {
780 if (m->control_pressed) { if (listSingle != NULL) { delete listSingle; } for (int i = 0; i < listNames.size(); i++) { m->mothurRemove(listNames[i]); } delete rabund; return 0; }
782 InputData* input = new InputData(listNames[k], "list");
783 ListVector* list = input->getListVector(thisLabel);
785 //this file has reached the end
786 if (list == NULL) { m->mothurOut("Error merging listvectors in file " + listNames[k]); m->mothurOutEndLine(); }
788 for (int j = 0; j < list->getNumBins(); j++) {
789 outList << list->get(j) << '\t';
790 rabund->push_back(m->getNumNames(list->get(j)));
797 SAbundVector sabund = rabund->getSAbundVector();
799 sabund.print(outSabund);
800 rabund->print(outRabund);
810 if (listSingle != NULL) { delete listSingle; }
812 for (int i = 0; i < listNames.size(); i++) { m->mothurRemove(listNames[i]); }
816 catch(exception& e) {
817 m->errorOut(e, "ClusterSplitCommand", "mergeLists");
822 //**********************************************************************************************************************
824 void ClusterSplitCommand::printData(ListVector* oldList){
826 string label = oldList->getLabel();
827 RAbundVector oldRAbund = oldList->getRAbundVector();
829 oldRAbund.setLabel(label);
830 if (m->isTrue(showabund)) {
831 oldRAbund.getSAbundVector().print(cout);
833 oldRAbund.print(outRabund);
834 oldRAbund.getSAbundVector().print(outSabund);
836 oldList->print(outList);
838 catch(exception& e) {
839 m->errorOut(e, "ClusterSplitCommand", "printData");
843 //**********************************************************************************************************************
844 vector<string> ClusterSplitCommand::createProcesses(vector< map<string, string> > distName, set<string>& labels){
847 vector<string> listFiles;
848 vector < vector < map<string, string> > > dividedNames; //distNames[1] = vector of filenames for process 1...
849 dividedNames.resize(processors);
851 //for each file group figure out which process will complete it
852 //want to divide the load intelligently so the big files are spread between processes
853 for (int i = 0; i < distName.size(); i++) {
855 int processToAssign = (i+1) % processors;
856 if (processToAssign == 0) { processToAssign = processors; }
858 dividedNames[(processToAssign-1)].push_back(distName[i]);
859 if ((processToAssign-1) == 1) { m->mothurOut(distName[i].begin()->first + "\n"); }
862 //not lets reverse the order of ever other process, so we balance big files running with little ones
863 for (int i = 0; i < processors; i++) {
865 int remainder = ((i+1) % processors);
866 if (remainder) { reverse(dividedNames[i].begin(), dividedNames[i].end()); }
869 if (m->control_pressed) { return listFiles; }
871 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
875 //loop through and create all the processes you want
876 while (process != processors) {
880 processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later
884 vector<string> listFileNames = cluster(dividedNames[process], labels);
886 //write out names to file
887 string filename = toString(getpid()) + ".temp";
889 m->openOutputFile(filename, out);
891 for (int j = 0; j < listFileNames.size(); j++) { out << listFileNames[j] << endl; }
896 filename = toString(getpid()) + ".temp.labels";
897 m->openOutputFile(filename, outLabels);
899 outLabels << cutoff << endl;
900 for (set<string>::iterator it = labels.begin(); it != labels.end(); it++) {
901 outLabels << (*it) << endl;
907 m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine();
908 for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
914 listFiles = cluster(dividedNames[0], labels);
916 //force parent to wait until all the processes are done
917 for (int i=0;i< processIDS.size();i++) {
918 int temp = processIDS[i];
922 //get list of list file names from each process
923 for(int i=0;i<processIDS.size();i++){
924 string filename = toString(processIDS[i]) + ".temp";
926 m->openInputFile(filename, in);
928 in >> tag; m->gobble(in);
932 in >> tempName; m->gobble(in);
933 listFiles.push_back(tempName);
936 m->mothurRemove((toString(processIDS[i]) + ".temp"));
939 filename = toString(processIDS[i]) + ".temp.labels";
941 m->openInputFile(filename, in2);
944 in2 >> tempCutoff; m->gobble(in2);
945 if (tempCutoff < cutoff) { cutoff = tempCutoff; }
949 in2 >> tempName; m->gobble(in2);
950 if (labels.count(tempName) == 0) { labels.insert(tempName); }
953 m->mothurRemove((toString(processIDS[i]) + ".temp.labels"));
959 //////////////////////////////////////////////////////////////////////////////////////////////////////
960 //Windows version shared memory, so be careful when passing variables through the clusterData struct.
961 //Above fork() will clone, so memory is separate, but that's not the case with windows,
962 //Taking advantage of shared memory to allow both threads to add labels.
963 //////////////////////////////////////////////////////////////////////////////////////////////////////
965 vector<clusterData*> pDataArray;
966 DWORD dwThreadIdArray[processors-1];
967 HANDLE hThreadArray[processors-1];
969 //Create processor worker threads.
970 for( int i=1; i<processors; i++ ){
971 // Allocate memory for thread data.
972 clusterData* tempCluster = new clusterData(dividedNames[i], m, cutoff, method, outputDir, hard, precision, length, i);
973 pDataArray.push_back(tempCluster);
974 processIDS.push_back(i);
976 //MySeqSumThreadFunction is in header. It must be global or static to work with the threads.
977 //default security attributes, thread function name, argument to thread function, use default creation flags, returns the thread identifier
978 hThreadArray[i-1] = CreateThread(NULL, 0, MyClusterThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]);
983 listFiles = cluster(dividedNames[0], labels);
985 //Wait until all threads have terminated.
986 WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
988 //Close all thread handles and free memory allocations.
989 for(int i=0; i < pDataArray.size(); i++){
991 tag = pDataArray[i]->tag;
992 //get listfiles created
993 for(int j=0; j < pDataArray[i]->listFiles.size(); j++){ listFiles.push_back(pDataArray[i]->listFiles[j]); }
995 set<string>::iterator it;
996 for(it = pDataArray[i]->labels.begin(); it != pDataArray[i]->labels.end(); it++){ labels.insert(*it); }
998 if (pDataArray[i]->cutoff < cutoff) { cutoff = pDataArray[i]->cutoff; }
999 CloseHandle(hThreadArray[i]);
1000 delete pDataArray[i];
1008 catch(exception& e) {
1009 m->errorOut(e, "ClusterSplitCommand", "createProcesses");
1013 //**********************************************************************************************************************
1015 vector<string> ClusterSplitCommand::cluster(vector< map<string, string> > distNames, set<string>& labels){
1018 vector<string> listFileNames;
1019 double smallestCutoff = cutoff;
1021 //cluster each distance file
1022 for (int i = 0; i < distNames.size(); i++) {
1024 Cluster* cluster = NULL;
1025 SparseMatrix* matrix = NULL;
1026 ListVector* list = NULL;
1028 RAbundVector* rabund = NULL;
1030 if (m->control_pressed) { return listFileNames; }
1032 string thisNamefile = distNames[i].begin()->second;
1033 string thisDistFile = distNames[i].begin()->first;
1037 MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
1039 //output your files too
1041 cout << endl << "Reading " << thisDistFile << endl;
1045 m->mothurOutEndLine(); m->mothurOut("Reading " + thisDistFile); m->mothurOutEndLine();
1047 ReadMatrix* read = new ReadColumnMatrix(thisDistFile);
1048 read->setCutoff(cutoff);
1050 NameAssignment* nameMap = new NameAssignment(thisNamefile);
1052 read->read(nameMap);
1054 if (m->control_pressed) { delete read; delete nameMap; return listFileNames; }
1056 list = read->getListVector();
1058 matrix = read->getMatrix();
1060 delete read; read = NULL;
1061 delete nameMap; nameMap = NULL;
1065 //output your files too
1067 cout << endl << "Clustering " << thisDistFile << endl;
1071 m->mothurOutEndLine(); m->mothurOut("Clustering " + thisDistFile); m->mothurOutEndLine();
1073 rabund = new RAbundVector(list->getRAbundVector());
1076 if (method == "furthest") { cluster = new CompleteLinkage(rabund, list, matrix, cutoff, method); }
1077 else if(method == "nearest"){ cluster = new SingleLinkage(rabund, list, matrix, cutoff, method); }
1078 else if(method == "average"){ cluster = new AverageLinkage(rabund, list, matrix, cutoff, method); }
1079 tag = cluster->getTag();
1081 if (outputDir == "") { outputDir += m->hasPath(thisDistFile); }
1082 fileroot = outputDir + m->getRootName(m->getSimpleName(thisDistFile));
1085 m->openOutputFile(fileroot+ tag + ".list", listFile);
1087 listFileNames.push_back(fileroot+ tag + ".list");
1089 float previousDist = 0.00000;
1090 float rndPreviousDist = 0.00000;
1096 double saveCutoff = cutoff;
1098 while (matrix->getSmallDist() < cutoff && matrix->getNNodes() > 0){
1100 if (m->control_pressed) { //clean up
1101 delete matrix; delete list; delete cluster; delete rabund;
1103 for (int i = 0; i < listFileNames.size(); i++) { m->mothurRemove(listFileNames[i]); }
1104 listFileNames.clear(); return listFileNames;
1107 cluster->update(saveCutoff);
1109 float dist = matrix->getSmallDist();
1112 rndDist = m->ceilDist(dist, precision);
1114 rndDist = m->roundDist(dist, precision);
1117 if(previousDist <= 0.0000 && dist != previousDist){
1118 oldList.setLabel("unique");
1119 oldList.print(listFile);
1120 if (labels.count("unique") == 0) { labels.insert("unique"); }
1122 else if(rndDist != rndPreviousDist){
1123 oldList.setLabel(toString(rndPreviousDist, length-1));
1124 oldList.print(listFile);
1125 if (labels.count(toString(rndPreviousDist, length-1)) == 0) { labels.insert(toString(rndPreviousDist, length-1)); }
1128 previousDist = dist;
1129 rndPreviousDist = rndDist;
1134 if(previousDist <= 0.0000){
1135 oldList.setLabel("unique");
1136 oldList.print(listFile);
1137 if (labels.count("unique") == 0) { labels.insert("unique"); }
1139 else if(rndPreviousDist<cutoff){
1140 oldList.setLabel(toString(rndPreviousDist, length-1));
1141 oldList.print(listFile);
1142 if (labels.count(toString(rndPreviousDist, length-1)) == 0) { labels.insert(toString(rndPreviousDist, length-1)); }
1145 delete matrix; delete list; delete cluster; delete rabund;
1146 matrix = NULL; list = NULL; cluster = NULL; rabund = NULL;
1149 if (m->control_pressed) { //clean up
1150 for (int i = 0; i < listFileNames.size(); i++) { m->mothurRemove(listFileNames[i]); }
1151 listFileNames.clear(); return listFileNames;
1154 m->mothurRemove(thisDistFile);
1155 m->mothurRemove(thisNamefile);
1157 if (saveCutoff != cutoff) {
1158 if (hard) { saveCutoff = m->ceilDist(saveCutoff, precision); }
1159 else { saveCutoff = m->roundDist(saveCutoff, precision); }
1161 m->mothurOut("Cutoff was " + toString(cutoff) + " changed cutoff to " + toString(saveCutoff)); m->mothurOutEndLine();
1164 if (saveCutoff < smallestCutoff) { smallestCutoff = saveCutoff; }
1167 cutoff = smallestCutoff;
1169 return listFileNames;
1172 catch(exception& e) {
1173 m->errorOut(e, "ClusterSplitCommand", "cluster");
1179 //**********************************************************************************************************************
1181 int ClusterSplitCommand::createMergedDistanceFile(vector< map<string, string> > distNames) {
1186 MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
1191 string thisOutputDir = outputDir;
1192 if (outputDir == "") { thisOutputDir = m->hasPath(fastafile); }
1193 string outputFileName = thisOutputDir + m->getRootName(m->getSimpleName(fastafile)) + "dist";
1194 m->mothurRemove(outputFileName);
1197 for (int i = 0; i < distNames.size(); i++) {
1198 if (m->control_pressed) { return 0; }
1200 string thisDistFile = distNames[i].begin()->first;
1202 m->appendFiles(thisDistFile, outputFileName);
1205 outputTypes["column"].push_back(outputFileName); outputNames.push_back(outputFileName);
1215 catch(exception& e) {
1216 m->errorOut(e, "ClusterSplitCommand", "createMergedDistanceFile");
1220 //**********************************************************************************************************************