2 * clustersplitcommand.cpp
5 * Created by westcott on 5/19/10.
6 * Copyright 2010 Schloss Lab. All rights reserved.
10 #include "clustersplitcommand.h"
11 #include "readcluster.h"
12 #include "splitmatrix.h"
13 #include "readphylip.h"
14 #include "readcolumn.h"
15 #include "readmatrix.hpp"
16 #include "inputdata.h"
18 //**********************************************************************************************************************
19 //This function checks to make sure the cluster command has no errors and then clusters based on the method chosen.
20 ClusterSplitCommand::ClusterSplitCommand(string option) {
22 globaldata = GlobalData::getInstance();
25 //allow user to run help
26 if(option == "help") { help(); abort = true; }
29 //valid paramters for this command
30 string Array[] = {"phylip","column","name","cutoff","precision","method","splitmethod","taxonomy","taxlevel","large","showabund","timing","hard","processors","outputdir","inputdir"};
31 vector<string> myArray (Array, Array+(sizeof(Array)/sizeof(string)));
33 OptionParser parser(option);
34 map<string,string> parameters = parser.getParameters();
36 ValidParameters validParameter;
38 //check to make sure all parameters are valid for command
39 map<string,string>::iterator it;
40 for (it = parameters.begin(); it != parameters.end(); it++) {
41 if (validParameter.isValidParameter(it->first, myArray, it->second) != true) {
46 globaldata->newRead();
48 //if the user changes the output directory command factory will send this info to us in the output parameter
49 outputDir = validParameter.validFile(parameters, "outputdir", false); if (outputDir == "not found"){ outputDir = ""; }
51 //if the user changes the input directory command factory will send this info to us in the output parameter
52 string inputDir = validParameter.validFile(parameters, "inputdir", false);
53 if (inputDir == "not found"){ inputDir = ""; }
56 it = parameters.find("phylip");
57 //user has given a template file
58 if(it != parameters.end()){
59 path = hasPath(it->second);
60 //if the user has not given a path then, add inputdir. else leave path alone.
61 if (path == "") { parameters["phylip"] = inputDir + it->second; }
64 it = parameters.find("column");
65 //user has given a template file
66 if(it != parameters.end()){
67 path = hasPath(it->second);
68 //if the user has not given a path then, add inputdir. else leave path alone.
69 if (path == "") { parameters["column"] = inputDir + it->second; }
72 it = parameters.find("name");
73 //user has given a template file
74 if(it != parameters.end()){
75 path = hasPath(it->second);
76 //if the user has not given a path then, add inputdir. else leave path alone.
77 if (path == "") { parameters["name"] = inputDir + it->second; }
80 it = parameters.find("taxonomy");
81 //user has given a template file
82 if(it != parameters.end()){
83 path = hasPath(it->second);
84 //if the user has not given a path then, add inputdir. else leave path alone.
85 if (path == "") { parameters["taxonomy"] = inputDir + it->second; }
89 //check for required parameters
90 phylipfile = validParameter.validFile(parameters, "phylip", true);
91 if (phylipfile == "not open") { abort = true; }
92 else if (phylipfile == "not found") { phylipfile = ""; }
93 else { distfile = phylipfile; format = "phylip"; }
95 columnfile = validParameter.validFile(parameters, "column", true);
96 if (columnfile == "not open") { abort = true; }
97 else if (columnfile == "not found") { columnfile = ""; }
98 else { distfile = columnfile; format = "column"; }
100 namefile = validParameter.validFile(parameters, "name", true);
101 if (namefile == "not open") { abort = true; }
102 else if (namefile == "not found") { namefile = ""; }
104 taxFile = validParameter.validFile(parameters, "taxonomy", true);
105 if (taxFile == "not open") { abort = true; }
106 else if (taxFile == "not found") { taxFile = ""; }
108 if ((phylipfile == "") && (columnfile == "")) { m->mothurOut("When executing a cluster.split command you must enter a phylip or a column."); m->mothurOutEndLine(); abort = true; }
109 else if ((phylipfile != "") && (columnfile != "")) { m->mothurOut("When executing a cluster.split command you must enter ONLY ONE of the following: phylip or column."); m->mothurOutEndLine(); abort = true; }
111 if (columnfile != "") {
112 if (namefile == "") { m->mothurOut("You need to provide a namefile if you are going to use the column format."); m->mothurOutEndLine(); abort = true; }
115 //check for optional parameter and set defaults
116 // ...at some point should added some additional type checking...
117 //get user cutoff and precision or use defaults
119 temp = validParameter.validFile(parameters, "precision", false);
120 if (temp == "not found") { temp = "100"; }
121 //saves precision legnth for formatting below
122 length = temp.length();
123 convert(temp, precision);
125 temp = validParameter.validFile(parameters, "hard", false); if (temp == "not found") { temp = "F"; }
128 temp = validParameter.validFile(parameters, "large", false); if (temp == "not found") { temp = "F"; }
129 large = isTrue(temp);
131 temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = "1"; }
132 convert(temp, processors);
134 splitmethod = validParameter.validFile(parameters, "splitmethod", false); if (splitmethod == "not found") { splitmethod = "distance"; }
136 temp = validParameter.validFile(parameters, "cutoff", false); if (temp == "not found") { temp = "10"; }
137 convert(temp, cutoff);
138 cutoff += (5 / (precision * 10.0));
140 temp = validParameter.validFile(parameters, "taxlevel", false); if (temp == "not found") { temp = "1"; }
141 convert(temp, taxLevelCutoff);
143 method = validParameter.validFile(parameters, "method", false); if (method == "not found") { method = "furthest"; }
145 if ((method == "furthest") || (method == "nearest") || (method == "average")) { }
146 else { m->mothurOut("Not a valid clustering method. Valid clustering algorithms are furthest, nearest or average."); m->mothurOutEndLine(); abort = true; }
148 if ((splitmethod == "distance") || (splitmethod == "classify")) { }
149 else { m->mothurOut("Not a valid splitting method. Valid splitting algorithms are distance or classify."); m->mothurOutEndLine(); abort = true; }
151 if ((splitmethod == "classify") && (taxFile == "")) { m->mothurOut("You need to provide a taxonomy file if you are going to use the classify splitmethod."); m->mothurOutEndLine(); abort = true; }
153 showabund = validParameter.validFile(parameters, "showabund", false);
154 if (showabund == "not found") { showabund = "T"; }
156 timing = validParameter.validFile(parameters, "timing", false);
157 if (timing == "not found") { timing = "F"; }
161 catch(exception& e) {
162 m->errorOut(e, "ClusterSplitCommand", "ClusterSplitCommand");
167 //**********************************************************************************************************************
169 void ClusterSplitCommand::help(){
171 m->mothurOut("The cluster.split command parameter options are phylip, column, name, cutoff, precision, method, splitmethod, taxonomy, taxlevel, showabund, timing, hard, large, processors. Phylip or column and name are required.\n");
172 m->mothurOut("The phylip and column parameter allow you to enter your distance file. \n");
173 m->mothurOut("The name parameter allows you to enter your name file and is required if your distance file is in column format. \n");
174 m->mothurOut("The cutoff parameter allow you to set the distance you want to cluster to, default is 10.0. \n");
175 m->mothurOut("The precision parameter allows you specify the precision of the precision of the distances outputted, default=100, meaning 2 decimal places. \n");
176 m->mothurOut("The method allows you to specify what clustering algorythm you want to use, default=furthest, option furthest, nearest, or average. \n");
177 m->mothurOut("The splitmethod parameter allows you to specify how you want to split your distance file before you cluster, default=distance, options distance or classify. \n");
178 m->mothurOut("The taxonomy parameter allows you to enter the taxonomy file for your sequences, this is only valid if you are using splitmethod=classify. Be sure your taxonomy file does not include the probability scores. \n");
179 m->mothurOut("The taxlevel parameter allows you to specify the taxonomy level you want to use to split the distance file, default=1. \n");
180 m->mothurOut("The large parameter allows you to indicate that your distance matrix is too large to fit in RAM. The default value is false.\n");
181 m->mothurOut("The cluster.split command should be in the following format: \n");
182 m->mothurOut("cluster.split(column=youDistanceFile, name=yourNameFile, method=yourMethod, cutoff=yourCutoff, precision=yourPrecision, splitmethod=yourSplitmethod, taxonomy=yourTaxonomyfile, taxlevel=yourtaxlevel) \n");
183 m->mothurOut("Example: cluster.split(column=abrecovery.dist, name=abrecovery.names, method=furthest, cutoff=0.10, precision=1000, splitmethod=classify, taxonomy=abrecovery.silva.slv.taxonomy, taxlevel=5) \n");
186 catch(exception& e) {
187 m->errorOut(e, "ClusterSplitCommand", "help");
192 //**********************************************************************************************************************
194 ClusterSplitCommand::~ClusterSplitCommand(){}
196 //**********************************************************************************************************************
198 int ClusterSplitCommand::execute(){
201 if (abort == true) { return 0; }
203 //****************** file prep work ******************************//
205 //if user gave a phylip file convert to column file
206 if (format == "phylip") {
208 ReadCluster* convert = new ReadCluster(distfile, cutoff, outputDir, false);
210 NameAssignment* nameMap = NULL;
211 convert->setFormat("phylip");
212 convert->read(nameMap);
214 if (m->control_pressed) { delete convert; return 0; }
216 distfile = convert->getOutputFile();
218 //if no names file given with phylip file, create it
219 ListVector* listToMakeNameFile = convert->getListVector();
220 if (namefile == "") { //you need to make a namefile for split matrix
222 namefile = phylipfile + ".names";
223 openOutputFile(namefile, out);
224 for (int i = 0; i < listToMakeNameFile->getNumBins(); i++) {
225 string bin = listToMakeNameFile->get(i);
226 out << bin << '\t' << bin << endl;
230 delete listToMakeNameFile;
233 if (m->control_pressed) { return 0; }
235 time_t estart = time(NULL);
236 m->mothurOut("Splitting the distance file..."); m->mothurOutEndLine();
238 //split matrix into non-overlapping groups
240 if (splitmethod == "distance") { split = new SplitMatrix(distfile, namefile, taxFile, cutoff, splitmethod, large); }
241 else { split = new SplitMatrix(distfile, namefile, taxFile, taxLevelCutoff, splitmethod, large); }
245 if (m->control_pressed) { delete split; return 0; }
247 string singletonName = split->getSingletonNames();
248 vector< map<string, string> > distName = split->getDistanceFiles(); //returns map of distance files -> namefile sorted by distance file size
251 if (m->control_pressed) { return 0; }
253 m->mothurOut("It took " + toString(time(NULL) - estart) + " seconds to split the distance file."); m->mothurOutEndLine();
256 //****************** break up files between processes and cluster each file set ******************************//
257 vector<string> listFileNames;
259 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
261 listFileNames = cluster(distName, labels); //clusters individual files and returns names of list files
263 vector < vector < map<string, string> > > dividedNames; //distNames[1] = vector of filenames for process 1...
264 dividedNames.resize(processors);
266 //for each file group figure out which process will complete it
267 //want to divide the load intelligently so the big files are spread between processes
269 for (int i = 0; i < distName.size(); i++) {
270 int processToAssign = (i+1) % processors;
271 if (processToAssign == 0) { processToAssign = processors; }
273 dividedNames[(processToAssign-1)].push_back(distName[i]);
276 //not lets reverse the order of ever other process, so we balance big files running with little ones
277 for (int i = 0; i < processors; i++) {
278 int remainder = ((i+1) % processors);
279 if (remainder) { reverse(dividedNames[i].begin(), dividedNames[i].end()); }
282 createProcesses(dividedNames);
284 if (m->control_pressed) { return 0; }
286 //get list of list file names from each process
287 for(int i=0;i<processors;i++){
288 string filename = toString(processIDS[i]) + ".temp";
290 openInputFile(filename, in);
292 in >> tag; gobble(in);
296 in >> tempName; gobble(in);
297 listFileNames.push_back(tempName);
300 remove((toString(processIDS[i]) + ".temp").c_str());
303 filename = toString(processIDS[i]) + ".temp.labels";
305 openInputFile(filename, in2);
309 in2 >> tempName; gobble(in2);
310 if (labels.count(tempName) == 0) { labels.insert(tempName); }
313 remove((toString(processIDS[i]) + ".temp.labels").c_str());
317 listFileNames = cluster(distName, labels); //clusters individual files and returns names of list files
320 if (m->control_pressed) { for (int i = 0; i < listFileNames.size(); i++) { remove(listFileNames[i].c_str()); } return 0; }
322 m->mothurOut("It took " + toString(time(NULL) - estart) + " seconds to cluster"); m->mothurOutEndLine();
324 //****************** merge list file and create rabund and sabund files ******************************//
326 m->mothurOut("Merging the clustered files..."); m->mothurOutEndLine();
328 ListVector* listSingle;
329 map<float, int> labelBins = completeListFile(listFileNames, singletonName, labels, listSingle); //returns map of label to numBins
331 if (m->control_pressed) { if (listSingle != NULL) { delete listSingle; } for (int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); } return 0; }
333 mergeLists(listFileNames, labelBins, listSingle);
335 if (m->control_pressed) { for (int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); } return 0; }
337 m->mothurOut("It took " + toString(time(NULL) - estart) + " seconds to merge."); m->mothurOutEndLine();
339 m->mothurOutEndLine();
340 m->mothurOut("Output File Names: "); m->mothurOutEndLine();
341 for (int i = 0; i < outputNames.size(); i++) { m->mothurOut(outputNames[i]); m->mothurOutEndLine(); }
342 m->mothurOutEndLine();
346 catch(exception& e) {
347 m->errorOut(e, "ClusterSplitCommand", "execute");
351 //**********************************************************************************************************************
352 map<float, int> ClusterSplitCommand::completeListFile(vector<string> listNames, string singleton, set<string>& userLabels, ListVector*& listSingle){
355 map<float, int> labelBin;
356 vector<float> orderFloat;
360 if (singleton != "none") {
362 openInputFile(singleton, in);
364 string firstCol, secondCol;
365 listSingle = new ListVector();
367 in >> firstCol >> secondCol; gobble(in);
368 listSingle->push_back(secondCol);
371 remove(singleton.c_str());
373 numSingleBins = listSingle->getNumBins();
374 }else{ listSingle = NULL; numSingleBins = 0; }
376 //go through users set and make them floats so we can sort them
377 for(set<string>::iterator it = userLabels.begin(); it != userLabels.end(); ++it) {
380 if ((*it != "unique") && (convertTestFloat(*it, temp) == true)) { convert(*it, temp); }
381 else if (*it == "unique") { temp = -1.0; }
383 orderFloat.push_back(temp);
384 labelBin[temp] = numSingleBins; //initialize numbins
388 sort(orderFloat.begin(), orderFloat.end());
391 //get the list info from each file
392 for (int k = 0; k < listNames.size(); k++) {
394 if (m->control_pressed) {
395 if (listSingle != NULL) { delete listSingle; listSingle = NULL; remove(singleton.c_str()); }
396 for (int i = 0; i < listNames.size(); i++) { remove(listNames[i].c_str()); }
400 InputData* input = new InputData(listNames[k], "list");
401 ListVector* list = input->getListVector();
402 string lastLabel = list->getLabel();
404 string filledInList = listNames[k] + "filledInTemp";
406 openOutputFile(filledInList, outFilled);
408 //for each label needed
409 for(int l = 0; l < orderFloat.size(); l++){
412 if (orderFloat[l] == -1) { thisLabel = "unique"; }
413 else { thisLabel = toString(orderFloat[l], length-1); }
415 //this file has reached the end
417 list = input->getListVector(lastLabel, true);
418 }else{ //do you have the distance, or do you need to fill in
421 if (list->getLabel() == "unique") { labelFloat = -1.0; }
422 else { convert(list->getLabel(), labelFloat); }
424 //check for missing labels
425 if (labelFloat > orderFloat[l]) { //you are missing the label, get the next smallest one
426 //if its bigger get last label, otherwise keep it
428 list = input->getListVector(lastLabel, true); //get last list vector to use, you actually want to move back in the file
430 lastLabel = list->getLabel();
434 list->setLabel(thisLabel);
435 list->print(outFilled);
438 labelBin[orderFloat[l]] += list->getNumBins();
442 list = input->getListVector();
445 if (list != NULL) { delete list; }
449 remove(listNames[k].c_str());
450 rename(filledInList.c_str(), listNames[k].c_str());
455 catch(exception& e) {
456 m->errorOut(e, "ClusterSplitCommand", "completeListFile");
460 //**********************************************************************************************************************
461 int ClusterSplitCommand::mergeLists(vector<string> listNames, map<float, int> userLabels, ListVector* listSingle){
463 if (outputDir == "") { outputDir += hasPath(distfile); }
464 fileroot = outputDir + getRootName(getSimpleName(distfile));
466 openOutputFile(fileroot+ tag + ".sabund", outSabund);
467 openOutputFile(fileroot+ tag + ".rabund", outRabund);
468 openOutputFile(fileroot+ tag + ".list", outList);
470 outputNames.push_back(fileroot+ tag + ".sabund");
471 outputNames.push_back(fileroot+ tag + ".rabund");
472 outputNames.push_back(fileroot+ tag + ".list");
474 map<float, int>::iterator itLabel;
476 //for each label needed
477 for(itLabel = userLabels.begin(); itLabel != userLabels.end(); itLabel++) {
480 if (itLabel->first == -1) { thisLabel = "unique"; }
481 else { thisLabel = toString(itLabel->first, length-1); }
483 outList << thisLabel << '\t' << itLabel->second << '\t';
485 RAbundVector* rabund = new RAbundVector();
486 rabund->setLabel(thisLabel);
489 if (listSingle != NULL) {
490 for (int j = 0; j < listSingle->getNumBins(); j++) {
491 outList << listSingle->get(j) << '\t';
492 rabund->push_back(getNumNames(listSingle->get(j)));
496 //get the list info from each file
497 for (int k = 0; k < listNames.size(); k++) {
499 if (m->control_pressed) { if (listSingle != NULL) { delete listSingle; } for (int i = 0; i < listNames.size(); i++) { remove(listNames[i].c_str()); } delete rabund; return 0; }
501 InputData* input = new InputData(listNames[k], "list");
502 ListVector* list = input->getListVector(thisLabel);
504 //this file has reached the end
505 if (list == NULL) { m->mothurOut("Error merging listvectors in file " + listNames[k]); m->mothurOutEndLine(); }
507 for (int j = 0; j < list->getNumBins(); j++) {
508 outList << list->get(j) << '\t';
509 rabund->push_back(getNumNames(list->get(j)));
516 SAbundVector sabund = rabund->getSAbundVector();
518 sabund.print(outSabund);
519 rabund->print(outRabund);
529 if (listSingle != NULL) { delete listSingle; }
531 for (int i = 0; i < listNames.size(); i++) { remove(listNames[i].c_str()); }
535 catch(exception& e) {
536 m->errorOut(e, "ClusterSplitCommand", "mergeLists");
541 //**********************************************************************************************************************
543 void ClusterSplitCommand::printData(ListVector* oldList){
545 string label = oldList->getLabel();
546 RAbundVector oldRAbund = oldList->getRAbundVector();
548 oldRAbund.setLabel(label);
549 if (isTrue(showabund)) {
550 oldRAbund.getSAbundVector().print(cout);
552 oldRAbund.print(outRabund);
553 oldRAbund.getSAbundVector().print(outSabund);
555 oldList->print(outList);
557 catch(exception& e) {
558 m->errorOut(e, "ClusterSplitCommand", "printData");
562 //**********************************************************************************************************************
563 int ClusterSplitCommand::createProcesses(vector < vector < map<string, string> > > dividedNames){
566 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
571 //loop through and create all the processes you want
572 while (process != processors) {
576 processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later
580 vector<string> listFileNames = cluster(dividedNames[process], labels);
582 //write out names to file
583 string filename = toString(getpid()) + ".temp";
585 openOutputFile(filename, out);
587 for (int j = 0; j < listFileNames.size(); j++) { out << listFileNames[j] << endl; }
592 filename = toString(getpid()) + ".temp.labels";
593 openOutputFile(filename, outLabels);
595 for (set<string>::iterator it = labels.begin(); it != labels.end(); it++) {
596 outLabels << (*it) << endl;
601 }else { m->mothurOut("unable to spawn the necessary processes."); m->mothurOutEndLine(); exit(0); }
604 //force parent to wait until all the processes are done
605 for (int i=0;i<processors;i++) {
606 int temp = processIDS[i];
614 catch(exception& e) {
615 m->errorOut(e, "ClusterSplitCommand", "createProcesses");
619 //**********************************************************************************************************************
621 vector<string> ClusterSplitCommand::cluster(vector< map<string, string> > distNames, set<string>& labels){
624 SparseMatrix* matrix;
627 RAbundVector* rabund;
629 vector<string> listFileNames;
631 //cluster each distance file
632 for (int i = 0; i < distNames.size(); i++) {
634 string thisNamefile = distNames[i].begin()->second;
635 string thisDistFile = distNames[i].begin()->first;
637 //read in distance file
638 globaldata->setNameFile(thisNamefile);
639 globaldata->setColumnFile(thisDistFile); globaldata->setFormat("column");
641 m->mothurOutEndLine(); m->mothurOut("Reading " + thisDistFile); m->mothurOutEndLine();
643 ReadMatrix* read = new ReadColumnMatrix(thisDistFile);
644 read->setCutoff(cutoff);
646 NameAssignment* nameMap = new NameAssignment(thisNamefile);
650 if (m->control_pressed) { delete read; delete nameMap; return listFileNames; }
652 list = read->getListVector();
654 matrix = read->getMatrix();
659 m->mothurOutEndLine(); m->mothurOut("Clustering " + thisDistFile); m->mothurOutEndLine();
661 rabund = new RAbundVector(list->getRAbundVector());
664 if (method == "furthest") { cluster = new CompleteLinkage(rabund, list, matrix, cutoff, method); }
665 else if(method == "nearest"){ cluster = new SingleLinkage(rabund, list, matrix, cutoff, method); }
666 else if(method == "average"){ cluster = new AverageLinkage(rabund, list, matrix, cutoff, method); }
667 tag = cluster->getTag();
669 if (outputDir == "") { outputDir += hasPath(thisDistFile); }
670 fileroot = outputDir + getRootName(getSimpleName(thisDistFile));
673 openOutputFile(fileroot+ tag + ".list", listFile);
675 listFileNames.push_back(fileroot+ tag + ".list");
677 time_t estart = time(NULL);
679 float previousDist = 0.00000;
680 float rndPreviousDist = 0.00000;
686 double saveCutoff = cutoff;
688 while (matrix->getSmallDist() < cutoff && matrix->getNNodes() > 0){
690 if (m->control_pressed) { //clean up
691 delete matrix; delete list; delete cluster; delete rabund;
693 for (int i = 0; i < listFileNames.size(); i++) { remove(listFileNames[i].c_str()); }
694 listFileNames.clear(); return listFileNames;
697 cluster->update(cutoff);
699 float dist = matrix->getSmallDist();
702 rndDist = ceilDist(dist, precision);
704 rndDist = roundDist(dist, precision);
707 if(previousDist <= 0.0000 && dist != previousDist){
708 oldList.setLabel("unique");
709 oldList.print(listFile);
710 if (labels.count("unique") == 0) { labels.insert("unique"); }
712 else if(rndDist != rndPreviousDist){
713 oldList.setLabel(toString(rndPreviousDist, length-1));
714 oldList.print(listFile);
715 if (labels.count(toString(rndPreviousDist, length-1)) == 0) { labels.insert(toString(rndPreviousDist, length-1)); }
719 rndPreviousDist = rndDist;
724 if(previousDist <= 0.0000){
725 oldList.setLabel("unique");
726 oldList.print(listFile);
727 if (labels.count("unique") == 0) { labels.insert("unique"); }
729 else if(rndPreviousDist<cutoff){
730 oldList.setLabel(toString(rndPreviousDist, length-1));
731 oldList.print(listFile);
732 if (labels.count(toString(rndPreviousDist, length-1)) == 0) { labels.insert(toString(rndPreviousDist, length-1)); }
735 delete matrix; delete list; delete cluster; delete rabund;
738 if (m->control_pressed) { //clean up
739 for (int i = 0; i < listFileNames.size(); i++) { remove(listFileNames[i].c_str()); }
740 listFileNames.clear(); return listFileNames;
743 remove(thisDistFile.c_str());
744 remove(thisNamefile.c_str());
748 return listFileNames;
751 catch(exception& e) {
752 m->errorOut(e, "ClusterSplitCommand", "cluster");
759 //**********************************************************************************************************************