5 * Created by westcott on 6/1/11.
6 * Copyright 2011 Schloss Lab. All rights reserved.
10 #include "countseqscommand.h"
11 #include "sharedutilities.h"
12 #include "counttable.h"
14 //**********************************************************************************************************************
15 vector<string> CountSeqsCommand::setParameters(){
17 CommandParameter pname("name", "InputTypes", "", "", "none", "none", "none","count",false,true,true); parameters.push_back(pname);
18 CommandParameter pgroup("group", "InputTypes", "", "", "none", "none", "none","",false,false,true); parameters.push_back(pgroup);
19 CommandParameter pprocessors("processors", "Number", "", "1", "", "", "","",false,false,true); parameters.push_back(pprocessors);
20 CommandParameter plarge("large", "Boolean", "", "F", "", "", "","",false,false); parameters.push_back(plarge);
21 CommandParameter pgroups("groups", "String", "", "", "", "", "","",false,false); parameters.push_back(pgroups);
22 CommandParameter pinputdir("inputdir", "String", "", "", "", "", "","",false,false); parameters.push_back(pinputdir);
23 CommandParameter poutputdir("outputdir", "String", "", "", "", "", "","",false,false); parameters.push_back(poutputdir);
25 vector<string> myArray;
26 for (int i = 0; i < parameters.size(); i++) { myArray.push_back(parameters[i].name); }
30 m->errorOut(e, "CountSeqsCommand", "setParameters");
34 //**********************************************************************************************************************
35 string CountSeqsCommand::getHelpString(){
37 string helpString = "";
38 helpString += "The count.seqs aka. make.table command reads a name file and outputs a .count_table file. You may also provide a group file to get the counts broken down by group.\n";
39 helpString += "The groups parameter allows you to indicate which groups you want to include in the counts, by default all groups in your groupfile are used.\n";
40 helpString += "The large parameter indicates the name and group files are too large to fit in RAM.\n";
41 helpString += "When you use the groups parameter and a sequence does not represent any sequences from the groups you specify it is not included in the .count.summary file.\n";
42 helpString += "The processors parameter allows you to specify the number of processors to use. The default is 1.\n";
43 helpString += "The count.seqs command should be in the following format: count.seqs(name=yourNameFile).\n";
44 helpString += "Example count.seqs(name=amazon.names) or make.table(name=amazon.names).\n";
45 helpString += "Note: No spaces between parameter labels (i.e. name), '=' and parameters (i.e.yourNameFile).\n";
49 m->errorOut(e, "CountSeqsCommand", "getHelpString");
53 //**********************************************************************************************************************
54 string CountSeqsCommand::getOutputPattern(string type) {
58 if (type == "count") { pattern = "[filename],count_table"; }
59 else { m->mothurOut("[ERROR]: No definition for type " + type + " output pattern.\n"); m->control_pressed = true; }
64 m->errorOut(e, "CountSeqsCommand", "getOutputPattern");
68 //**********************************************************************************************************************
69 CountSeqsCommand::CountSeqsCommand(){
71 abort = true; calledHelp = true;
73 vector<string> tempOutNames;
74 outputTypes["count"] = tempOutNames;
77 m->errorOut(e, "CountSeqsCommand", "CountSeqsCommand");
81 //**********************************************************************************************************************
83 CountSeqsCommand::CountSeqsCommand(string option) {
85 abort = false; calledHelp = false;
87 //allow user to run help
88 if(option == "help") { help(); abort = true; calledHelp = true; }
89 else if(option == "citation") { citation(); abort = true; calledHelp = true;}
91 vector<string> myArray = setParameters();
93 OptionParser parser(option);
94 map<string,string> parameters = parser.getParameters();
96 ValidParameters validParameter;
97 map<string,string>::iterator it;
99 //check to make sure all parameters are valid for command
100 for (map<string,string>::iterator it = parameters.begin(); it != parameters.end(); it++) {
101 if (validParameter.isValidParameter(it->first, myArray, it->second) != true) { abort = true; }
104 //initialize outputTypes
105 vector<string> tempOutNames;
106 outputTypes["count"] = tempOutNames;
109 //if the user changes the input directory command factory will send this info to us in the output parameter
110 string inputDir = validParameter.validFile(parameters, "inputdir", false);
111 if (inputDir == "not found"){ inputDir = ""; }
114 it = parameters.find("name");
115 //user has given a template file
116 if(it != parameters.end()){
117 path = m->hasPath(it->second);
118 //if the user has not given a path then, add inputdir. else leave path alone.
119 if (path == "") { parameters["name"] = inputDir + it->second; }
122 it = parameters.find("group");
123 //user has given a template file
124 if(it != parameters.end()){
125 path = m->hasPath(it->second);
126 //if the user has not given a path then, add inputdir. else leave path alone.
127 if (path == "") { parameters["group"] = inputDir + it->second; }
131 //check for required parameters
132 namefile = validParameter.validFile(parameters, "name", true);
133 if (namefile == "not open") { namefile = ""; abort = true; }
134 else if (namefile == "not found"){
135 namefile = m->getNameFile();
136 if (namefile != "") { m->mothurOut("Using " + namefile + " as input file for the name parameter."); m->mothurOutEndLine(); }
137 else { m->mothurOut("You have no current namefile and the name parameter is required."); m->mothurOutEndLine(); abort = true; }
138 }else { m->setNameFile(namefile); }
140 groupfile = validParameter.validFile(parameters, "group", true);
141 if (groupfile == "not open") { abort = true; }
142 else if (groupfile == "not found") { groupfile = ""; }
143 else { m->setGroupFile(groupfile); }
145 groups = validParameter.validFile(parameters, "groups", false);
146 if (groups == "not found") { groups = "all"; }
147 m->splitAtDash(groups, Groups);
149 string temp = validParameter.validFile(parameters, "large", false); if (temp == "not found") { temp = "F"; }
150 large = m->isTrue(temp);
152 temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = m->getProcessors(); }
153 m->setProcessors(temp);
154 m->mothurConvert(temp, processors);
156 //if the user changes the output directory command factory will send this info to us in the output parameter
157 outputDir = validParameter.validFile(parameters, "outputdir", false); if (outputDir == "not found"){ outputDir = m->hasPath(namefile); }
162 catch(exception& e) {
163 m->errorOut(e, "CountSeqsCommand", "CountSeqsCommand");
167 //**********************************************************************************************************************
169 int CountSeqsCommand::execute(){
172 if (abort == true) { if (calledHelp) { return 0; } return 2; }
174 map<string, string> variables;
175 variables["[filename]"] = outputDir + m->getRootName(m->getSimpleName(namefile));
176 string outputFileName = getOutputFileName("count", variables);
179 int start = time(NULL);
180 if (!large) { total = processSmall(outputFileName); }
181 else { total = processLarge(outputFileName); }
183 if (m->control_pressed) { m->mothurRemove(outputFileName); return 0; }
185 m->mothurOut("It took " + toString(time(NULL) - start) + " secs to create a table for " + toString(total) + " sequences.");
186 m->mothurOutEndLine();
187 m->mothurOutEndLine();
189 //set rabund file as new current rabundfile
190 itTypes = outputTypes.find("count");
191 if (itTypes != outputTypes.end()) {
192 if ((itTypes->second).size() != 0) { string current = (itTypes->second)[0]; m->setCountTableFile(current); }
195 m->mothurOutEndLine();
196 m->mothurOut("Total number of sequences: " + toString(total)); m->mothurOutEndLine();
197 m->mothurOutEndLine();
198 m->mothurOut("Output File Names: "); m->mothurOutEndLine();
199 m->mothurOut(outputFileName); m->mothurOutEndLine();
200 m->mothurOutEndLine();
205 catch(exception& e) {
206 m->errorOut(e, "CountSeqsCommand", "execute");
210 //**********************************************************************************************************************
212 int CountSeqsCommand::processSmall(string outputFileName){
215 m->openOutputFile(outputFileName, out); outputTypes["count"].push_back(outputFileName);
216 outputNames.push_back(outputFileName); outputTypes["count"].push_back(outputFileName);
217 out << "Representative_Sequence\ttotal\t";
220 if (groupfile != "") {
221 groupMap = new GroupMap(groupfile); groupMap->readMap();
223 //make sure groups are valid. takes care of user setting groupNames that are invalid or setting groups=all
224 SharedUtil* util = new SharedUtil();
225 vector<string> nameGroups = groupMap->getNamesOfGroups();
226 util->setGroups(Groups, nameGroups);
229 //sort groupNames so that the group title match the counts below, this is needed because the map object automatically sorts
230 sort(Groups.begin(), Groups.end());
233 for (int i = 0; i < Groups.size(); i++) {
234 out << Groups[i] << '\t';
240 int total = createProcesses(groupMap, outputFileName);
242 if (groupfile != "") { delete groupMap; }
246 catch(exception& e) {
247 m->errorOut(e, "CountSeqsCommand", "processSmall");
251 /**************************************************************************************************/
252 int CountSeqsCommand::createProcesses(GroupMap*& groupMap, string outputFileName) {
255 vector<int> processIDS;
257 vector<unsigned long long> positions;
258 vector<linePair> lines;
261 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
262 positions = m->divideFilePerLine(namefile, processors);
263 for (int i = 0; i < (positions.size()-1); i++) { lines.push_back(linePair(positions[i], positions[(i+1)])); }
265 if(processors == 1){ lines.push_back(linePair(0, 1000)); }
268 positions = m->setFilePosEachLine(namefile, numSeqs);
269 if (positions.size() < processors) { processors = positions.size(); }
271 //figure out how many sequences you have to process
272 int numSeqsPerProcessor = numSeqs / processors;
273 for (int i = 0; i < processors; i++) {
274 int startIndex = i * numSeqsPerProcessor;
275 if(i == (processors - 1)){ numSeqsPerProcessor = numSeqs - i * numSeqsPerProcessor; }
276 lines.push_back(linePair(positions[startIndex], numSeqsPerProcessor));
282 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
284 //loop through and create all the processes you want
285 while (process != processors-1) {
289 processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later
292 string filename = toString(getpid()) + ".temp";
293 numSeqs = driver(lines[process].start, lines[process].end, filename, groupMap);
295 string tempFile = toString(getpid()) + ".num.temp";
297 m->openOutputFile(tempFile, outTemp);
299 outTemp << numSeqs << endl;
304 m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine();
305 for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
310 string filename = toString(getpid()) + ".temp";
311 numSeqs = driver(lines[processors-1].start, lines[processors-1].end, filename, groupMap);
313 //force parent to wait until all the processes are done
314 for (int i=0;i<processIDS.size();i++) {
315 int temp = processIDS[i];
319 for (int i = 0; i < processIDS.size(); i++) {
320 string tempFile = toString(processIDS[i]) + ".num.temp";
322 m->openInputFile(tempFile, intemp);
325 intemp >> num; intemp.close();
327 m->mothurRemove(tempFile);
330 vector<countData*> pDataArray;
331 DWORD dwThreadIdArray[processors-1];
332 HANDLE hThreadArray[processors-1];
333 vector<GroupMap*> copies;
335 //Create processor worker threads.
336 for( int i=0; i<processors-1; i++ ){
337 string filename = toString(i) + ".temp";
339 GroupMap* copyGroup = new GroupMap();
340 copyGroup->getCopy(groupMap);
341 copies.push_back(copyGroup);
342 vector<string> cGroups = Groups;
344 countData* temp = new countData(filename, copyGroup, m, lines[i].start, lines[i].end, groupfile, namefile, cGroups);
345 pDataArray.push_back(temp);
346 processIDS.push_back(i);
348 hThreadArray[i] = CreateThread(NULL, 0, MyCountThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]);
351 string filename = toString(processors-1) + ".temp";
352 numSeqs = driver(lines[processors-1].start, lines[processors-1].end, filename, groupMap);
354 //Wait until all threads have terminated.
355 WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
357 //Close all thread handles and free memory allocations.
358 for(int i=0; i < pDataArray.size(); i++){
359 numSeqs += pDataArray[i]->total;
361 CloseHandle(hThreadArray[i]);
362 delete pDataArray[i];
366 //append output files
367 for(int i=0;i<processIDS.size();i++){
368 m->appendFiles((toString(processIDS[i]) + ".temp"), outputFileName);
369 m->mothurRemove((toString(processIDS[i]) + ".temp"));
371 m->appendFiles(filename, outputFileName);
372 m->mothurRemove(filename);
376 if (groupfile != "") {
377 if (numSeqs != groupMap->getNumSeqs()) {
378 m->mothurOut("[ERROR]: processes reported processing " + toString(numSeqs) + " sequences, but group file indicates you have " + toString(groupMap->getNumSeqs()) + " sequences.");
379 if (processors == 1) { m->mothurOut(" Could you have a file mismatch?\n"); }
380 else { m->mothurOut(" Either you have a file mismatch or a process failed to complete the task assigned to it.\n"); m->control_pressed = true; }
385 catch(exception& e) {
386 m->errorOut(e, "CountSeqsCommand", "createProcesses");
390 /**************************************************************************************************/
391 int CountSeqsCommand::driver(unsigned long long start, unsigned long long end, string outputFileName, GroupMap*& groupMap) {
395 m->openOutputFile(outputFileName, out);
398 m->openInputFile(namefile, in);
404 if (m->control_pressed) { break; }
406 string firstCol, secondCol;
407 in >> firstCol; m->gobble(in); in >> secondCol; m->gobble(in);
408 //cout << firstCol << '\t' << secondCol << endl;
409 m->checkName(firstCol);
410 m->checkName(secondCol);
411 //cout << firstCol << '\t' << secondCol << endl;
413 vector<string> names;
414 m->splitAtChar(secondCol, names, ',');
416 if (groupfile != "") {
418 map<string, int> groupCounts;
420 for (int i = 0; i < Groups.size(); i++) { groupCounts[Groups[i]] = 0; }
422 //get counts for each of the users groups
423 for (int i = 0; i < names.size(); i++) {
424 string group = groupMap->getGroup(names[i]);
426 if (group == "not found") { m->mothurOut("[ERROR]: " + names[i] + " is not in your groupfile, please correct."); m->mothurOutEndLine(); }
428 map<string, int>::iterator it = groupCounts.find(group);
430 //if not found, then this sequence is not from a group we care about
431 if (it != groupCounts.end()) {
439 out << firstCol << '\t' << total << '\t';
440 for (map<string, int>::iterator it = groupCounts.begin(); it != groupCounts.end(); it++) {
441 out << it->second << '\t';
446 out << firstCol << '\t' << names.size() << endl;
449 total += names.size();
451 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
452 unsigned long long pos = in.tellg();
453 if ((pos == -1) || (pos >= end)) { break; }
455 if (in.eof()) { break; }
465 catch(exception& e) {
466 m->errorOut(e, "CountSeqsCommand", "driver");
470 //**********************************************************************************************************************
472 int CountSeqsCommand::processLarge(string outputFileName){
474 set<string> namesOfGroups;
475 map<string, int> initial;
476 for (set<string>::iterator it = namesOfGroups.begin(); it != namesOfGroups.end(); it++) { initial[(*it)] = 0; }
478 m->openOutputFile(outputFileName, out);
479 outputNames.push_back(outputFileName); outputTypes["count"].push_back(outputFileName);
480 out << "Representative_Sequence\ttotal\t";
481 if (groupfile == "") { out << endl; }
483 map<string, unsigned long long> namesToIndex;
484 string outfile = m->getRootName(groupfile) + "sorted.groups.temp";
485 string outName = m->getRootName(namefile) + "sorted.name.temp";
486 map<int, string> indexToName;
487 map<int, string> indexToGroup;
488 if (groupfile != "") {
489 time_t estart = time(NULL);
490 //convert name file to redundant -> unique. set unique name equal to index so we can use vectors, save name for later.
491 string newNameFile = m->getRootName(namefile) + ".name.temp";
492 string newGroupFile = m->getRootName(groupfile) + ".group.temp";
493 indexToName = processNameFile(newNameFile);
494 indexToGroup = getGroupNames(newGroupFile, namesOfGroups);
496 //sort file by first column so the names of sequences will be easier to find
498 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
499 string command = "sort -n " + newGroupFile + " -o " + outfile;
500 system(command.c_str());
501 command = "sort -n " + newNameFile + " -o " + outName;
502 system(command.c_str());
503 #else //sort using windows sort
504 string command = "sort " + newGroupFile + " /O " + outfile;
505 system(command.c_str());
506 command = "sort " + newNameFile + " /O " + outName;
507 system(command.c_str());
509 m->mothurRemove(newNameFile);
510 m->mothurRemove(newGroupFile);
512 m->mothurOut("It took " + toString(time(NULL) - estart) + " seconds to sort and index the group and name files. "); m->mothurOutEndLine();
513 }else { outName = namefile; }
515 time_t estart = time(NULL);
518 m->openInputFile(outName, in);
524 vector< vector<int> > nameMapCount;
525 if (groupfile != "") {
526 m->openInputFile(outfile, in2);
527 nameMapCount.resize(indexToName.size());
528 for (int i = 0; i < nameMapCount.size(); i++) {
529 nameMapCount[i].resize(indexToGroup.size(), 0);
534 if (m->control_pressed) { break; }
537 in >> firstCol; m->gobble(in);
539 if (groupfile != "") {
541 in >> uniqueIndex; m->gobble(in);
543 string name; int groupIndex;
544 in2 >> name >> groupIndex; m->gobble(in2);
546 if (name != firstCol) { m->mothurOut("[ERROR]: found " + name + " in your groupfile, but " + firstCol + " was in your namefile, please correct.\n"); m->control_pressed = true; }
548 nameMapCount[uniqueIndex][groupIndex]++;
552 in >> secondCol; m->gobble(in);
553 int num = m->getNumNames(secondCol);
554 out << firstCol << '\t' << num << endl;
560 if (groupfile != "") {
561 m->mothurRemove(outfile);
562 m->mothurRemove(outName);
564 for (map<int, string>::iterator it = indexToGroup.begin(); it != indexToGroup.end(); it++) { out << it->second << '\t'; }
566 for (int i = 0; i < nameMapCount.size(); i++) {
567 string totalsLine = "";
569 for (int j = 0; j < nameMapCount[i].size(); j++) {
570 seqTotal += nameMapCount[i][j];
571 totalsLine += toString(nameMapCount[i][j]) + '\t';
573 out << indexToName[i] << '\t' << seqTotal << '\t' << totalsLine << endl;
579 m->mothurOut("It took " + toString(time(NULL) - estart) + " seconds to create the count table file. "); m->mothurOutEndLine();
583 catch(exception& e) {
584 m->errorOut(e, "CountSeqsCommand", "processLarge");
588 /**************************************************************************************************/
589 map<int, string> CountSeqsCommand::processNameFile(string name) {
591 map<int, string> indexToNames;
594 m->openOutputFile(name, out);
598 m->openInputFile(namefile, in);
602 bool pairDone = false;
603 bool columnOne = true;
604 string firstCol, secondCol;
608 if (m->control_pressed) { break; }
610 in.read(buffer, 4096);
611 vector<string> pieces = m->splitWhiteSpace(rest, buffer, in.gcount());
613 for (int i = 0; i < pieces.size(); i++) {
614 if (columnOne) { firstCol = pieces[i]; columnOne=false; }
615 else { secondCol = pieces[i]; pairDone = true; columnOne=true; }
618 m->checkName(firstCol);
619 m->checkName(secondCol);
620 //parse names into vector
621 vector<string> theseNames;
622 m->splitAtComma(secondCol, theseNames);
623 for (int i = 0; i < theseNames.size(); i++) { out << theseNames[i] << '\t' << count << endl; }
624 indexToNames[count] = firstCol;
634 vector<string> pieces = m->splitWhiteSpace(rest);
636 for (int i = 0; i < pieces.size(); i++) {
637 if (columnOne) { firstCol = pieces[i]; columnOne=false; }
638 else { secondCol = pieces[i]; pairDone = true; columnOne=true; }
641 m->checkName(firstCol);
642 m->checkName(secondCol);
643 //parse names into vector
644 vector<string> theseNames;
645 m->splitAtComma(secondCol, theseNames);
646 for (int i = 0; i < theseNames.size(); i++) { out << theseNames[i] << '\t' << count << endl; }
647 indexToNames[count] = firstCol;
657 catch(exception& e) {
658 m->errorOut(e, "CountSeqsCommand", "processNameFile");
662 /**************************************************************************************************/
663 map<int, string> CountSeqsCommand::getGroupNames(string filename, set<string>& namesOfGroups) {
665 map<int, string> indexToGroups;
666 map<string, int> groupIndex;
667 map<string, int>::iterator it;
670 m->openOutputFile(filename, out);
674 m->openInputFile(groupfile, in);
678 bool pairDone = false;
679 bool columnOne = true;
680 string firstCol, secondCol;
684 if (m->control_pressed) { break; }
686 in.read(buffer, 4096);
687 vector<string> pieces = m->splitWhiteSpace(rest, buffer, in.gcount());
689 for (int i = 0; i < pieces.size(); i++) {
690 if (columnOne) { firstCol = pieces[i]; columnOne=false; }
691 else { secondCol = pieces[i]; pairDone = true; columnOne=true; }
694 m->checkName(firstCol);
695 it = groupIndex.find(secondCol);
696 if (it == groupIndex.end()) { //add group, assigning the group and number so we can use vectors above
697 groupIndex[secondCol] = count;
700 out << firstCol << '\t' << groupIndex[secondCol] << endl;
701 namesOfGroups.insert(secondCol);
710 vector<string> pieces = m->splitWhiteSpace(rest);
712 for (int i = 0; i < pieces.size(); i++) {
713 if (columnOne) { firstCol = pieces[i]; columnOne=false; }
714 else { secondCol = pieces[i]; pairDone = true; columnOne=true; }
717 m->checkName(firstCol);
718 it = groupIndex.find(secondCol);
719 if (it == groupIndex.end()) { //add group, assigning the group and number so we can use vectors above
720 groupIndex[secondCol] = count;
723 out << firstCol << '\t' << groupIndex[secondCol] << endl;
724 namesOfGroups.insert(secondCol);
730 for (it = groupIndex.begin(); it != groupIndex.end(); it++) { indexToGroups[it->second] = it->first; }
732 return indexToGroups;
734 catch(exception& e) {
735 m->errorOut(e, "CountSeqsCommand", "getGroupNames");
739 //**********************************************************************************************************************