*/
#include "countseqscommand.h"
-#include "groupmap.h"
#include "sharedutilities.h"
#include "counttable.h"
string temp = validParameter.validFile(parameters, "large", false); if (temp == "not found") { temp = "F"; }
large = m->isTrue(temp);
+
+ temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = m->getProcessors(); }
+ m->setProcessors(temp);
+ m->mothurConvert(temp, processors);
//if the user changes the output directory command factory will send this info to us in the output parameter
outputDir = validParameter.validFile(parameters, "outputdir", false); if (outputDir == "not found"){ outputDir = m->hasPath(namefile); }
string outputFileName = getOutputFileName("count", variables);
int total = 0;
+ int start = time(NULL);
if (!large) { total = processSmall(outputFileName); }
else { total = processLarge(outputFileName); }
if (m->control_pressed) { m->mothurRemove(outputFileName); return 0; }
+ m->mothurOut("It took " + toString(time(NULL) - start) + " secs to create a table for " + toString(total) + " sequences.");
+ m->mothurOutEndLine();
+ m->mothurOutEndLine();
+
//set rabund file as new current rabundfile
itTypes = outputTypes.find("count");
if (itTypes != outputTypes.end()) {
}
}
out << endl;
+ out.close();
+
+ int total = createProcesses(groupMap, outputFileName);
+
+ if (groupfile != "") { delete groupMap; }
+
+ return total;
+ }
+ catch(exception& e) {
+ m->errorOut(e, "CountSeqsCommand", "processSmall");
+ exit(1);
+ }
+}
+/**************************************************************************************************/
+int CountSeqsCommand::createProcesses(GroupMap*& groupMap, string outputFileName) {
+ try {
- //open input file
- ifstream in;
+ vector<int> processIDS;
+ int process = 0;
+ vector<unsigned long long> positions;
+ vector<linePair> lines;
+ int numSeqs = 0;
+
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+ positions = m->divideFilePerLine(namefile, processors);
+ for (int i = 0; i < (positions.size()-1); i++) { lines.push_back(linePair(positions[i], positions[(i+1)])); }
+#else
+ if(processors == 1){ lines.push_back(linePair(0, 1000)); }
+ else {
+ int numSeqs = 0;
+ positions = m->setFilePosEachLine(namefile, numSeqs);
+ if (positions.size() < processors) { processors = positions.size(); }
+
+ //figure out how many sequences you have to process
+ int numSeqsPerProcessor = numSeqs / processors;
+ for (int i = 0; i < processors; i++) {
+ int startIndex = i * numSeqsPerProcessor;
+ if(i == (processors - 1)){ numSeqsPerProcessor = numSeqs - i * numSeqsPerProcessor; }
+ lines.push_back(linePair(positions[startIndex], numSeqsPerProcessor));
+ }
+ }
+#endif
+
+
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+
+ //loop through and create all the processes you want
+ while (process != processors-1) {
+ int pid = fork();
+
+ if (pid > 0) {
+ processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later
+ process++;
+ }else if (pid == 0){
+ string filename = toString(getpid()) + ".temp";
+ numSeqs = driver(lines[process].start, lines[process].end, filename, groupMap);
+
+ string tempFile = toString(getpid()) + ".num.temp";
+ ofstream outTemp;
+ m->openOutputFile(tempFile, outTemp);
+
+ outTemp << numSeqs << endl;
+ outTemp.close();
+
+ exit(0);
+ }else {
+ m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine();
+ for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
+ exit(0);
+ }
+ }
+
+ string filename = toString(getpid()) + ".temp";
+ numSeqs = driver(lines[processors-1].start, lines[processors-1].end, filename, groupMap);
+
+ //force parent to wait until all the processes are done
+ for (int i=0;i<processIDS.size();i++) {
+ int temp = processIDS[i];
+ wait(&temp);
+ }
+
+ for (int i = 0; i < processIDS.size(); i++) {
+ string tempFile = toString(processIDS[i]) + ".num.temp";
+ ifstream intemp;
+ m->openInputFile(tempFile, intemp);
+
+ int num;
+ intemp >> num; intemp.close();
+ numSeqs += num;
+ m->mothurRemove(tempFile);
+ }
+#else
+ vector<countData*> pDataArray;
+ DWORD dwThreadIdArray[processors-1];
+ HANDLE hThreadArray[processors-1];
+ vector<GroupMap*> copies;
+
+ //Create processor worker threads.
+ for( int i=0; i<processors-1; i++ ){
+ string filename = toString(i) + ".temp";
+
+ GroupMap* copyGroup = new GroupMap();
+ copyGroup->getCopy(groupMap);
+ copies.push_back(copyGroup);
+ vector<string> cGroups = Groups;
+
+ countData* temp = new countData(filename, copyGroup, m, lines[i].start, lines[i].end, groupfile, namefile, cGroups);
+ pDataArray.push_back(temp);
+ processIDS.push_back(i);
+
+ hThreadArray[i] = CreateThread(NULL, 0, MyCountThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]);
+ }
+
+ string filename = toString(processors-1) + ".temp";
+ numSeqs = driver(lines[processors-1].start, lines[processors-1].end, filename, groupMap);
+
+ //Wait until all threads have terminated.
+ WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+
+ //Close all thread handles and free memory allocations.
+ for(int i=0; i < pDataArray.size(); i++){
+ numSeqs += pDataArray[i]->total;
+ delete copies[i];
+ CloseHandle(hThreadArray[i]);
+ delete pDataArray[i];
+ }
+#endif
+
+ //append output files
+ for(int i=0;i<processIDS.size();i++){
+ m->appendFiles((toString(processIDS[i]) + ".temp"), outputFileName);
+ m->mothurRemove((toString(processIDS[i]) + ".temp"));
+ }
+ m->appendFiles(filename, outputFileName);
+ m->mothurRemove(filename);
+
+
+ //sanity check
+ if (numSeqs != groupMap->getNumSeqs()) {
+ m->mothurOut("[ERROR]: processes reported processing " + toString(numSeqs) + " sequences, but group file indicates you have " + toString(groupMap->getNumSeqs()) + " sequences.");
+ if (processors == 1) { m->mothurOut(" Could you have a file mismatch?\n"); }
+ else { m->mothurOut(" Either you have a file mismatch or a process failed to complete the task assigned to it.\n"); m->control_pressed = true; }
+ }
+
+ return numSeqs;
+ }
+ catch(exception& e) {
+ m->errorOut(e, "CountSeqsCommand", "createProcesses");
+ exit(1);
+ }
+}
+/**************************************************************************************************/
+int CountSeqsCommand::driver(unsigned long long start, unsigned long long end, string outputFileName, GroupMap*& groupMap) {
+ try {
+
+ ofstream out;
+ m->openOutputFile(outputFileName, out);
+
+ ifstream in;
m->openInputFile(namefile, in);
+ in.seekg(start);
- int total = 0;
- while (!in.eof()) {
+ bool done = false;
+ int total = 0;
+ while (!done) {
if (m->control_pressed) { break; }
string firstCol, secondCol;
m->checkName(firstCol);
m->checkName(secondCol);
//cout << firstCol << '\t' << secondCol << endl;
-
+
vector<string> names;
m->splitAtChar(secondCol, names, ',');
}
total += names.size();
+
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+ unsigned long long pos = in.tellg();
+ if ((pos == -1) || (pos >= end)) { break; }
+#else
+ if (in.eof()) { break; }
+#endif
+
}
in.close();
out.close();
-
- if (groupfile != "") { delete groupMap; }
-
+
return total;
+
}
catch(exception& e) {
- m->errorOut(e, "CountSeqsCommand", "processSmall");
+ m->errorOut(e, "CountSeqsCommand", "driver");
exit(1);
}
}
*/
#include "command.hpp"
+#include "groupmap.h"
class CountSeqsCommand : public Command {
private:
+
+ struct linePair {
+ unsigned long long start;
+ unsigned long long end;
+ linePair(unsigned long long i, unsigned long long j) : start(i), end(j) {}
+ };
+
string namefile, groupfile, outputDir, groups;
bool abort, large;
vector<string> Groups, outputNames;
map<int, string> processNameFile(string);
map<int, string> getGroupNames(string, set<string>&);
+ int createProcesses(GroupMap*&, string);
+ int driver(unsigned long long, unsigned long long, string, GroupMap*&);
+
+};
+
+/***********************************************************************/
+struct countData {
+ unsigned long long start;
+ unsigned long long end;
+ MothurOut* m;
+ string outputFileName, namefile, groupfile;
+ GroupMap* groupMap;
+ int total;
+ vector<string> Groups;
+
+ countData(){}
+ countData(string fn, GroupMap* g, MothurOut* mout, unsigned long long st, unsigned long long en, string gfn, string nfn, vector<string> gr) {
+ m = mout;
+ start = st;
+ end = en;
+ groupMap = g;
+ groupfile = gfn;
+ namefile = nfn;
+ outputFileName = fn;
+ Groups = gr;
+ total = 0;
+ }
};
+/**************************************************************************************************/
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+#else
+static DWORD WINAPI MyCountThreadFunction(LPVOID lpParam){
+ countData* pDataArray;
+ pDataArray = (countData*)lpParam;
+ try {
+ ofstream out;
+ pDataArray->m->openOutputFile(pDataArray->outputFileName, out);
+
+ ifstream in;
+ pDataArray->m->openInputFile(pDataArray->namefile, in);
+ in.seekg(pDataArray->start);
+
+ //print header if you are process 0
+ if ((pDataArray->start == 0) || (pDataArray->start == 1)) {
+ in.seekg(0);
+ }else { //this accounts for the difference in line endings.
+ in.seekg(pDataArray->start-1); pDataArray->m->gobble(in);
+ }
+
+ pDataArray->total = 0;
+ for(int i = 0; i < pDataArray->end; i++){ //end is the number of sequences to process
+
+ if (pDataArray->m->control_pressed) { break; }
+
+ string firstCol, secondCol;
+ in >> firstCol; pDataArray->m->gobble(in); in >> secondCol; pDataArray->m->gobble(in);
+ //cout << firstCol << '\t' << secondCol << endl;
+ pDataArray->m->checkName(firstCol);
+ pDataArray->m->checkName(secondCol);
+
+ vector<string> names;
+ pDataArray->m->splitAtChar(secondCol, names, ',');
+
+ if (pDataArray->groupfile != "") {
+ //set to 0
+ map<string, int> groupCounts;
+ int total = 0;
+ for (int i = 0; i < pDataArray->Groups.size(); i++) { groupCounts[pDataArray->Groups[i]] = 0; }
+
+ //get counts for each of the users groups
+ for (int i = 0; i < names.size(); i++) {
+ string group = pDataArray->groupMap->getGroup(names[i]);
+
+ if (group == "not found") { pDataArray->m->mothurOut("[ERROR]: " + names[i] + " is not in your groupfile, please correct."); pDataArray->m->mothurOutEndLine(); }
+ else {
+ map<string, int>::iterator it = groupCounts.find(group);
+
+ //if not found, then this sequence is not from a group we care about
+ if (it != groupCounts.end()) {
+ it->second++;
+ total++;
+ }
+ }
+ }
+
+ if (total != 0) {
+ out << firstCol << '\t' << total << '\t';
+ for (map<string, int>::iterator it = groupCounts.begin(); it != groupCounts.end(); it++) {
+ out << it->second << '\t';
+ }
+ out << endl;
+ }
+ }else {
+ out << firstCol << '\t' << names.size() << endl;
+ }
+
+ pDataArray->total += names.size();
+ }
+ in.close();
+ out.close();
+
+
+ return 0;
+ }
+ catch(exception& e) {
+ pDataArray->m->errorOut(e, "CountSeqsCommand", "MyCountThreadFunction");
+ exit(1);
+ }
+}
+#endif
+
+
+
#endif