OptionParser parser(option);
map<string,string> parameters = parser.getParameters();
- ValidParameters validParameter;
+ ValidParameters validParameter("cluster.split");
//check to make sure all parameters are valid for command
map<string,string>::iterator it;
it = parameters.find("phylip");
//user has given a template file
if(it != parameters.end()){
- path = hasPath(it->second);
+ path = m->hasPath(it->second);
//if the user has not given a path then, add inputdir. else leave path alone.
if (path == "") { parameters["phylip"] = inputDir + it->second; }
}
it = parameters.find("column");
//user has given a template file
if(it != parameters.end()){
- path = hasPath(it->second);
+ path = m->hasPath(it->second);
//if the user has not given a path then, add inputdir. else leave path alone.
if (path == "") { parameters["column"] = inputDir + it->second; }
}
it = parameters.find("name");
//user has given a template file
if(it != parameters.end()){
- path = hasPath(it->second);
+ path = m->hasPath(it->second);
//if the user has not given a path then, add inputdir. else leave path alone.
if (path == "") { parameters["name"] = inputDir + it->second; }
}
it = parameters.find("taxonomy");
//user has given a template file
if(it != parameters.end()){
- path = hasPath(it->second);
+ path = m->hasPath(it->second);
//if the user has not given a path then, add inputdir. else leave path alone.
if (path == "") { parameters["taxonomy"] = inputDir + it->second; }
}
it = parameters.find("fasta");
//user has given a template file
if(it != parameters.end()){
- path = hasPath(it->second);
+ path = m->hasPath(it->second);
//if the user has not given a path then, add inputdir. else leave path alone.
if (path == "") { parameters["fasta"] = inputDir + it->second; }
}
fastafile = validParameter.validFile(parameters, "fasta", true);
if (fastafile == "not open") { abort = true; }
else if (fastafile == "not found") { fastafile = ""; }
- else { splitmethod = "fasta"; }
+ else { distfile = fastafile; splitmethod = "fasta"; }
taxFile = validParameter.validFile(parameters, "taxonomy", true);
if (taxFile == "not open") { abort = true; }
convert(temp, precision);
temp = validParameter.validFile(parameters, "hard", false); if (temp == "not found") { temp = "F"; }
- hard = isTrue(temp);
+ hard = m->isTrue(temp);
temp = validParameter.validFile(parameters, "large", false); if (temp == "not found") { temp = "F"; }
- large = isTrue(temp);
+ large = m->isTrue(temp);
temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = "1"; }
convert(temp, processors);
m->mothurOut("The taxonomy parameter allows you to enter the taxonomy file for your sequences, this is only valid if you are using splitmethod=classify. Be sure your taxonomy file does not include the probability scores. \n");
m->mothurOut("The taxlevel parameter allows you to specify the taxonomy level you want to use to split the distance file, default=1. \n");
m->mothurOut("The large parameter allows you to indicate that your distance matrix is too large to fit in RAM. The default value is false.\n");
+ #ifdef USE_MPI
+ m->mothurOut("When using MPI, the processors parameter is set to the number of MPI processes running. \n");
+ #endif
m->mothurOut("The cluster.split command should be in the following format: \n");
m->mothurOut("cluster.split(column=youDistanceFile, name=yourNameFile, method=yourMethod, cutoff=yourCutoff, precision=yourPrecision, splitmethod=yourSplitmethod, taxonomy=yourTaxonomyfile, taxlevel=yourtaxlevel) \n");
m->mothurOut("Example: cluster.split(column=abrecovery.dist, name=abrecovery.names, method=furthest, cutoff=0.10, precision=1000, splitmethod=classify, taxonomy=abrecovery.silva.slv.taxonomy, taxlevel=5) \n");
try {
if (abort == true) { return 0; }
+
time_t estart;
+ vector<string> listFileNames;
+ set<string> labels;
+ string singletonName = "";
+ double saveCutoff = cutoff;
+
//****************** file prep work ******************************//
+ #ifdef USE_MPI
+ int pid;
+ int tag = 2001;
+ MPI_Status status;
+ MPI_Comm_size(MPI_COMM_WORLD, &processors); //set processors to the number of mpi processes running
+ MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
+
+ if (pid == 0) { //only process 0 converts and splits
+
+ #endif
//if user gave a phylip file convert to column file
if (format == "phylip") {
if (namefile == "") { //you need to make a namefile for split matrix
ofstream out;
namefile = phylipfile + ".names";
- openOutputFile(namefile, out);
+ m->openOutputFile(namefile, out);
for (int i = 0; i < listToMakeNameFile->getNumBins(); i++) {
string bin = listToMakeNameFile->get(i);
out << bin << '\t' << bin << endl;
//split matrix into non-overlapping groups
SplitMatrix* split;
- if (splitmethod == "distance") { split = new SplitMatrix(distfile, namefile, taxFile, cutoff, splitmethod, large); }
- else if (splitmethod == "classify") { split = new SplitMatrix(distfile, namefile, taxFile, taxLevelCutoff, splitmethod, large); }
- else if (splitmethod == "fasta") { split = new SplitMatrix(fastafile, namefile, taxFile, taxLevelCutoff, splitmethod, processors); }
- else { m->mothurOut("Not a valid splitting method. Valid splitting algorithms are distance, classify or fasta."); m->mothurOutEndLine(); return 0; }
+ if (splitmethod == "distance") { split = new SplitMatrix(distfile, namefile, taxFile, cutoff, splitmethod, large); }
+ else if (splitmethod == "classify") { split = new SplitMatrix(distfile, namefile, taxFile, taxLevelCutoff, splitmethod, large); }
+ else if (splitmethod == "fasta") { split = new SplitMatrix(fastafile, namefile, taxFile, taxLevelCutoff, splitmethod, processors, outputDir); }
+ else { m->mothurOut("Not a valid splitting method. Valid splitting algorithms are distance, classify or fasta."); m->mothurOutEndLine(); return 0; }
split->split();
if (m->control_pressed) { delete split; return 0; }
- string singletonName = split->getSingletonNames();
+ singletonName = split->getSingletonNames();
vector< map<string, string> > distName = split->getDistanceFiles(); //returns map of distance files -> namefile sorted by distance file size
delete split;
estart = time(NULL);
//****************** break up files between processes and cluster each file set ******************************//
- vector<string> listFileNames;
- set<string> labels;
+ #ifdef USE_MPI
+ ////you are process 0 from above////
+
+ vector < vector < map<string, string> > > dividedNames; //distNames[1] = vector of filenames for process 1...
+ dividedNames.resize(processors);
+
+ //for each file group figure out which process will complete it
+ //want to divide the load intelligently so the big files are spread between processes
+ int count = 1;
+ for (int i = 0; i < distName.size(); i++) {
+ int processToAssign = (i+1) % processors;
+ if (processToAssign == 0) { processToAssign = processors; }
+
+ dividedNames[(processToAssign-1)].push_back(distName[i]);
+ }
+
+ //not lets reverse the order of ever other process, so we balance big files running with little ones
+ for (int i = 0; i < processors; i++) {
+ int remainder = ((i+1) % processors);
+ if (remainder) { reverse(dividedNames[i].begin(), dividedNames[i].end()); }
+ }
+
+
+ //send each child the list of files it needs to process
+ for(int i = 1; i < processors; i++) {
+ //send number of file pairs
+ int num = dividedNames[i].size();
+ MPI_Send(&num, 1, MPI_INT, i, tag, MPI_COMM_WORLD);
+
+ for (int j = 0; j < num; j++) { //send filenames to process i
+ char tempDistFileName[1024];
+ strcpy(tempDistFileName, (dividedNames[i][j].begin()->first).c_str());
+ int lengthDist = (dividedNames[i][j].begin()->first).length();
+
+ MPI_Send(&lengthDist, 1, MPI_INT, i, tag, MPI_COMM_WORLD);
+ MPI_Send(tempDistFileName, 1024, MPI_CHAR, i, tag, MPI_COMM_WORLD);
+
+ char tempNameFileName[1024];
+ strcpy(tempNameFileName, (dividedNames[i][j].begin()->second).c_str());
+ int lengthName = (dividedNames[i][j].begin()->second).length();
+
+ MPI_Send(&lengthName, 1, MPI_INT, i, tag, MPI_COMM_WORLD);
+ MPI_Send(tempNameFileName, 1024, MPI_CHAR, i, tag, MPI_COMM_WORLD);
+ }
+ }
+
+ //process your share
+ listFileNames = cluster(dividedNames[0], labels);
+
+ //receive the other processes info
+ for(int i = 1; i < processors; i++) {
+ int num = dividedNames[i].size();
+
+ double tempCutoff;
+ MPI_Recv(&tempCutoff, 1, MPI_DOUBLE, i, tag, MPI_COMM_WORLD, &status);
+ if (tempCutoff < cutoff) { cutoff = tempCutoff; }
+
+ //send list filenames to root process
+ for (int j = 0; j < num; j++) {
+ int lengthList = 0;
+ char tempListFileName[1024];
+
+ MPI_Recv(&lengthList, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status);
+ MPI_Recv(tempListFileName, 1024, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status);
+
+ string myListFileName = tempListFileName;
+ myListFileName = myListFileName.substr(0, lengthList);
+
+ listFileNames.push_back(myListFileName);
+ }
+
+ //send Labels to root process
+ int numLabels = 0;
+ MPI_Recv(&numLabels, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status);
+
+ for (int j = 0; j < numLabels; j++) {
+ int lengthLabel = 0;
+ char tempLabel[100];
+
+ MPI_Recv(&lengthLabel, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status);
+ MPI_Recv(tempLabel, 100, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status);
+
+ string myLabel = tempLabel;
+ myLabel = myLabel.substr(0, lengthLabel);
+
+ if (labels.count(myLabel) == 0) { labels.insert(myLabel); }
+ }
+ }
+
+ }else { //you are a child process
+ vector < map<string, string> > myNames;
+
+ //recieve the files you need to process
+ //receive number of file pairs
+ int num = 0;
+ MPI_Recv(&num, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+
+ myNames.resize(num);
+
+ for (int j = 0; j < num; j++) { //receive filenames to process
+ int lengthDist = 0;
+ char tempDistFileName[1024];
+
+ MPI_Recv(&lengthDist, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+ MPI_Recv(tempDistFileName, 1024, MPI_CHAR, 0, tag, MPI_COMM_WORLD, &status);
+
+ string myDistFileName = tempDistFileName;
+ myDistFileName = myDistFileName.substr(0, lengthDist);
+
+ int lengthName = 0;
+ char tempNameFileName[1024];
+
+ MPI_Recv(&lengthName, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+ MPI_Recv(tempNameFileName, 1024, MPI_CHAR, 0, tag, MPI_COMM_WORLD, &status);
+
+ string myNameFileName = tempNameFileName;
+ myNameFileName = myNameFileName.substr(0, lengthName);
+
+ //save file name
+ myNames[j][myDistFileName] = myNameFileName;
+ }
+
+ //process them
+ listFileNames = cluster(myNames, labels);
+
+ //send cutoff
+ MPI_Send(&cutoff, 1, MPI_DOUBLE, 0, tag, MPI_COMM_WORLD);
+
+ //send list filenames to root process
+ for (int j = 0; j < num; j++) {
+ char tempListFileName[1024];
+ strcpy(tempListFileName, listFileNames[j].c_str());
+ int lengthList = listFileNames[j].length();
+
+ MPI_Send(&lengthList, 1, MPI_INT, 0, tag, MPI_COMM_WORLD);
+ MPI_Send(tempListFileName, 1024, MPI_CHAR, 0, tag, MPI_COMM_WORLD);
+ }
+
+ //send Labels to root process
+ int numLabels = labels.size();
+ MPI_Send(&numLabels, 1, MPI_INT, 0, tag, MPI_COMM_WORLD);
+
+ for(set<string>::iterator it = labels.begin(); it != labels.end(); ++it) {
+ char tempLabel[100];
+ strcpy(tempLabel, (*it).c_str());
+ int lengthLabel = (*it).length();
+
+ MPI_Send(&lengthLabel, 1, MPI_INT, 0, tag, MPI_COMM_WORLD);
+ MPI_Send(tempLabel, 100, MPI_CHAR, 0, tag, MPI_COMM_WORLD);
+ }
+ }
+
+ //make everyone wait
+ MPI_Barrier(MPI_COMM_WORLD);
+
+ #else
+
#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
if(processors == 1){
listFileNames = cluster(distName, labels); //clusters individual files and returns names of list files
for(int i=0;i<processors;i++){
string filename = toString(processIDS[i]) + ".temp";
ifstream in;
- openInputFile(filename, in);
+ m->openInputFile(filename, in);
- in >> tag; gobble(in);
+ in >> tag; m->gobble(in);
while(!in.eof()) {
string tempName;
- in >> tempName; gobble(in);
+ in >> tempName; m->gobble(in);
listFileNames.push_back(tempName);
}
in.close();
//get labels
filename = toString(processIDS[i]) + ".temp.labels";
ifstream in2;
- openInputFile(filename, in2);
+ m->openInputFile(filename, in2);
+
+ float tempCutoff;
+ in2 >> tempCutoff; m->gobble(in2);
+ if (tempCutoff < cutoff) { cutoff = tempCutoff; }
while(!in2.eof()) {
string tempName;
- in2 >> tempName; gobble(in2);
+ in2 >> tempName; m->gobble(in2);
if (labels.count(tempName) == 0) { labels.insert(tempName); }
}
in2.close();
#else
listFileNames = cluster(distName, labels); //clusters individual files and returns names of list files
#endif
-
+ #endif
if (m->control_pressed) { for (int i = 0; i < listFileNames.size(); i++) { remove(listFileNames[i].c_str()); } return 0; }
+ if (saveCutoff != cutoff) { m->mothurOut("Cutoff was " + toString(saveCutoff) + " changed cutoff to " + toString(cutoff)); m->mothurOutEndLine(); }
+
m->mothurOut("It took " + toString(time(NULL) - estart) + " seconds to cluster"); m->mothurOutEndLine();
//****************** merge list file and create rabund and sabund files ******************************//
estart = time(NULL);
m->mothurOut("Merging the clustered files..."); m->mothurOutEndLine();
+ #ifdef USE_MPI
+ if (pid == 0) { //only process 0 merges
+ #endif
+
ListVector* listSingle;
map<float, int> labelBins = completeListFile(listFileNames, singletonName, labels, listSingle); //returns map of label to numBins
m->mothurOut("Output File Names: "); m->mothurOutEndLine();
for (int i = 0; i < outputNames.size(); i++) { m->mothurOut(outputNames[i]); m->mothurOutEndLine(); }
m->mothurOutEndLine();
+
+ #ifdef USE_MPI
+ } //only process 0 merges
+
+ //make everyone wait
+ MPI_Barrier(MPI_COMM_WORLD);
+ #endif
return 0;
}
//read in singletons
if (singleton != "none") {
ifstream in;
- openInputFile(singleton, in);
+ m->openInputFile(singleton, in);
string firstCol, secondCol;
listSingle = new ListVector();
while (!in.eof()) {
- in >> firstCol >> secondCol; gobble(in);
+ in >> firstCol >> secondCol; m->gobble(in);
listSingle->push_back(secondCol);
}
in.close();
if ((*it != "unique") && (convertTestFloat(*it, temp) == true)) { convert(*it, temp); }
else if (*it == "unique") { temp = -1.0; }
-
- orderFloat.push_back(temp);
- labelBin[temp] = numSingleBins; //initialize numbins
+
+ if (temp <= cutoff) {
+ orderFloat.push_back(temp);
+ labelBin[temp] = numSingleBins; //initialize numbins
+ }
}
//sort order
string filledInList = listNames[k] + "filledInTemp";
ofstream outFilled;
- openOutputFile(filledInList, outFilled);
+ m->openOutputFile(filledInList, outFilled);
//for each label needed
for(int l = 0; l < orderFloat.size(); l++){
//**********************************************************************************************************************
int ClusterSplitCommand::mergeLists(vector<string> listNames, map<float, int> userLabels, ListVector* listSingle){
try {
- if (outputDir == "") { outputDir += hasPath(distfile); }
- fileroot = outputDir + getRootName(getSimpleName(distfile));
+ if (outputDir == "") { outputDir += m->hasPath(distfile); }
+ fileroot = outputDir + m->getRootName(m->getSimpleName(distfile));
- openOutputFile(fileroot+ tag + ".sabund", outSabund);
- openOutputFile(fileroot+ tag + ".rabund", outRabund);
- openOutputFile(fileroot+ tag + ".list", outList);
+ m->openOutputFile(fileroot+ tag + ".sabund", outSabund);
+ m->openOutputFile(fileroot+ tag + ".rabund", outRabund);
+ m->openOutputFile(fileroot+ tag + ".list", outList);
outputNames.push_back(fileroot+ tag + ".sabund");
outputNames.push_back(fileroot+ tag + ".rabund");
if (listSingle != NULL) {
for (int j = 0; j < listSingle->getNumBins(); j++) {
outList << listSingle->get(j) << '\t';
- rabund->push_back(getNumNames(listSingle->get(j)));
+ rabund->push_back(m->getNumNames(listSingle->get(j)));
}
}
else {
for (int j = 0; j < list->getNumBins(); j++) {
outList << list->get(j) << '\t';
- rabund->push_back(getNumNames(list->get(j)));
+ rabund->push_back(m->getNumNames(list->get(j)));
}
delete list;
}
RAbundVector oldRAbund = oldList->getRAbundVector();
oldRAbund.setLabel(label);
- if (isTrue(showabund)) {
+ if (m->isTrue(showabund)) {
oldRAbund.getSAbundVector().print(cout);
}
oldRAbund.print(outRabund);
//write out names to file
string filename = toString(getpid()) + ".temp";
ofstream out;
- openOutputFile(filename, out);
+ m->openOutputFile(filename, out);
out << tag << endl;
for (int j = 0; j < listFileNames.size(); j++) { out << listFileNames[j] << endl; }
out.close();
//print out labels
ofstream outLabels;
filename = toString(getpid()) + ".temp.labels";
- openOutputFile(filename, outLabels);
-
+ m->openOutputFile(filename, outLabels);
+
+ outLabels << cutoff << endl;
for (set<string>::iterator it = labels.begin(); it != labels.end(); it++) {
outLabels << (*it) << endl;
}
vector<string> listFileNames;
+ double smallestCutoff = cutoff;
+
//cluster each distance file
for (int i = 0; i < distNames.size(); i++) {
+ if (m->control_pressed) { return listFileNames; }
string thisNamefile = distNames[i].begin()->second;
string thisDistFile = distNames[i].begin()->first;
globaldata->setNameFile(thisNamefile);
globaldata->setColumnFile(thisDistFile); globaldata->setFormat("column");
+ #ifdef USE_MPI
+ int pid;
+ MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
+
+ //output your files too
+ if (pid != 0) {
+ cout << endl << "Reading " << thisDistFile << endl;
+ }
+ #endif
+
m->mothurOutEndLine(); m->mothurOut("Reading " + thisDistFile); m->mothurOutEndLine();
ReadMatrix* read = new ReadColumnMatrix(thisDistFile);
delete read;
delete nameMap;
+
+ #ifdef USE_MPI
+ //output your files too
+ if (pid != 0) {
+ cout << endl << "Clustering " << thisDistFile << endl;
+ }
+ #endif
+
m->mothurOutEndLine(); m->mothurOut("Clustering " + thisDistFile); m->mothurOutEndLine();
rabund = new RAbundVector(list->getRAbundVector());
else if(method == "average"){ cluster = new AverageLinkage(rabund, list, matrix, cutoff, method); }
tag = cluster->getTag();
- if (outputDir == "") { outputDir += hasPath(thisDistFile); }
- fileroot = outputDir + getRootName(getSimpleName(thisDistFile));
+ if (outputDir == "") { outputDir += m->hasPath(thisDistFile); }
+ fileroot = outputDir + m->getRootName(m->getSimpleName(thisDistFile));
ofstream listFile;
- openOutputFile(fileroot+ tag + ".list", listFile);
+ m->openOutputFile(fileroot+ tag + ".list", listFile);
listFileNames.push_back(fileroot+ tag + ".list");
listFileNames.clear(); return listFileNames;
}
- cluster->update(cutoff);
+ cluster->update(saveCutoff);
float dist = matrix->getSmallDist();
float rndDist;
if (hard) {
- rndDist = ceilDist(dist, precision);
+ rndDist = m->ceilDist(dist, precision);
}else{
- rndDist = roundDist(dist, precision);
+ rndDist = m->roundDist(dist, precision);
}
if(previousDist <= 0.0000 && dist != previousDist){
remove(thisDistFile.c_str());
remove(thisNamefile.c_str());
+
+ if (saveCutoff != cutoff) {
+ if (hard) { saveCutoff = m->ceilDist(saveCutoff, precision); }
+ else { saveCutoff = m->roundDist(saveCutoff, precision); }
+
+ m->mothurOut("Cutoff was " + toString(cutoff) + " changed cutoff to " + toString(saveCutoff)); m->mothurOutEndLine();
+ }
+
+ if (saveCutoff < smallestCutoff) { smallestCutoff = saveCutoff; }
}
+ cutoff = smallestCutoff;
return listFileNames;