From 635ca2939cfb1f519f19e9dec072ddd05e9fb450 Mon Sep 17 00:00:00 2001 From: Bo Li Date: Tue, 31 Jan 2012 10:24:31 -0600 Subject: [PATCH] rsem v1.1.16 --- BamConverter.h | 6 +- Buffer.h | 81 +++++++ EM.cpp | 93 ++++---- Gibbs.cpp | 350 +++++++++++++++++++++--------- WHAT_IS_NEW | 8 + calcCI.cpp | 432 ++++++++++++++++++++++---------------- makefile | 18 +- my_assert.h | 105 +++++++++ rsem-calculate-expression | 39 +++- sampling.h | 28 ++- utils.h | 28 --- 11 files changed, 796 insertions(+), 392 deletions(-) create mode 100644 Buffer.h create mode 100644 my_assert.h diff --git a/BamConverter.h b/BamConverter.h index cca0eae..e7253ba 100644 --- a/BamConverter.h +++ b/BamConverter.h @@ -14,6 +14,7 @@ #include "sam_rsem_cvt.h" #include "utils.h" +#include "my_assert.h" #include "bc_aux.h" #include "Transcript.h" #include "Transcripts.h" @@ -44,8 +45,7 @@ private: BamConverter::BamConverter(const char* inpF, const char* outF, const char* chr_list, Transcripts& transcripts) : transcripts(transcripts) { - if (transcripts.getType() != 0) - exitWithError("Genome information is not provided! RSEM cannot convert the transcript bam file!"); + general_assert(transcripts.getType() == 0, "Genome information is not provided! RSEM cannot convert the transcript bam file!"); in = samopen(inpF, "rb", NULL); assert(in != 0); @@ -123,7 +123,7 @@ void BamConverter::convert(bam1_t* b, const Transcript& transcript) { int pos = b->core.pos; int readlen = b->core.l_qseq; - if (readlen == 0) exitWithError("One alignment line has SEQ field as *. RSEM does not support this currently!"); + general_assert(readlen > 0, "One alignment line has SEQ field as *. RSEM does not support this currently!"); iter = refmap.find(transcript.getSeqName()); assert(iter != refmap.end()); diff --git a/Buffer.h b/Buffer.h new file mode 100644 index 0000000..1ce28bc --- /dev/null +++ b/Buffer.h @@ -0,0 +1,81 @@ +#ifndef BUFFER_H_ +#define BUFFER_H_ + +#include +#include +#include + +#include "my_assert.h" + +typedef unsigned long long bufsize_type; +const int FLOATSIZE = sizeof(float); + +class Buffer { +public: + Buffer(int nMB, int nSamples, int cvlen, const char* tmpF) { + cpos = 0; + size = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen; + if (size > (bufsize_type)nSamples) size = nSamples; + general_assert(size > 0, "Memory allocated for credibility intervals is not enough!"); + size *= cvlen; + + buffer = new float[size]; + ftmpOut.open(tmpF, std::ios::binary); + pthread_mutex_init(&lock, NULL); + + fr = to = 0; + this->nSamples = nSamples; + this->cvlen = cvlen; + } + + ~Buffer() { + if (fr < to) flushToTempFile(); + + delete[] buffer; + pthread_mutex_destroy(&lock); + ftmpOut.close(); + } + + void write(int n, float **vecs) { + pthread_assert(pthread_mutex_lock(&lock), "pthread_mutex_lock", "Error occurred while acquiring the lock!"); + for (int i = 0; i < n; i++) { + if (size - cpos < bufsize_type(cvlen)) flushToTempFile(); + memcpy(buffer + cpos, vecs[i], FLOATSIZE * cvlen); + cpos += cvlen; + ++to; + } + pthread_assert(pthread_mutex_unlock(&lock), "pthread_mutex_unlock", "Error occurred while releasing the lock!"); + } + +private: + bufsize_type size, cpos; // cpos : current position + + float *buffer; + std::ofstream ftmpOut; + pthread_mutex_t lock; + + int fr, to; // each flush, sample fr .. to - 1 + int nSamples, cvlen; + + void flushToTempFile() { + std::streampos gap1 = std::streampos(fr) * FLOATSIZE; + std::streampos gap2 = std::streampos(nSamples - to) * FLOATSIZE; + float *p = NULL; + + ftmpOut.seekp(0, std::ios_base::beg); + for (int i = 0; i < cvlen; i++) { + p = buffer + i; + ftmpOut.seekp(gap1, std::ios_base::cur); + for (int j = fr; j < to; j++) { + ftmpOut.write((char*)p, FLOATSIZE); + p += cvlen; + } + ftmpOut.seekp(gap2, std::ios_base::cur); + } + + cpos = 0; + fr = to; + } +}; + +#endif /* BUFFER_H_ */ diff --git a/EM.cpp b/EM.cpp index cf53fc3..b2493b2 100644 --- a/EM.cpp +++ b/EM.cpp @@ -10,6 +10,7 @@ #include #include "utils.h" +#include "my_assert.h" #include "sampling.h" #include "Read.h" @@ -113,12 +114,11 @@ void init(ReadReader **&readers, HitContainer **&hitvs, doubl sprintf(datF, "%s.dat", imdName); fin.open(datF); - if (!fin.is_open()) { fprintf(stderr, "Cannot open %s! It may not exist.\n", datF); exit(-1); } + general_assert(fin.is_open(), "Cannot open " + cstrtos(datF) + "! It may not exist."); fin>>nReads>>nHits>>rt; - if (nReads != N1) { fprintf(stderr, "Number of alignable reads does not match!\n"); exit(-1); } - //assert(nReads == N1); - if (rt != read_type) { fprintf(stderr, "Data file (.dat) does not have the right read type!\n"); exit(-1); } - //assert(rt == read_type); + general_assert(nReads == N1, "Number of alignable reads does not match!"); + general_assert(rt == read_type, "Data file (.dat) does not have the right read type!"); + //A just so so strategy for paralleling nhT = nHits / nThreads; @@ -128,12 +128,12 @@ void init(ReadReader **&readers, HitContainer **&hitvs, doubl ncpvs = new double*[nThreads]; for (int i = 0; i < nThreads; i++) { int ntLeft = nThreads - i - 1; // # of threads left - if (!readers[i]->locate(curnr)) { fprintf(stderr, "Read indices files do not match!\n"); exit(-1); } - //assert(readers[i]->locate(curnr)); + + general_assert(readers[i]->locate(curnr), "Read indices files do not match!"); while (nrLeft > ntLeft && (i == nThreads - 1 || hitvs[i]->getNHits() < nhT)) { - if (!hitvs[i]->read(fin)) { fprintf(stderr, "Cannot read alignments from .dat file!\n"); exit(-1); } - //assert(hitvs[i]->read(fin)); + general_assert(hitvs[i]->read(fin), "Cannot read alignments from .dat file!"); + --nrLeft; if (verbose && nrLeft % 1000000 == 0) { printf("DAT %d reads left!\n", nrLeft); } } @@ -186,12 +186,9 @@ void* E_STEP(void* arg) { memset(countv, 0, sizeof(double) * (M + 1)); for (int i = 0; i < N; i++) { if (needCalcConPrb || updateModel) { - if (!reader->next(read)) { - fprintf(stderr, "Can not load a read!\n"); - exit(-1); - } - //assert(reader->next(read)); + general_assert(reader->next(read), "Can not load a read!"); } + fr = hitv->getSAt(i); to = hitv->getSAt(i + 1); fracs.resize(to - fr + 1); @@ -253,10 +250,8 @@ void* calcConProbs(void* arg) { reader->reset(); for (int i = 0; i < N; i++) { - if (!reader->next(read)) { - fprintf(stderr, "Can not load a read!\n"); - exit(-1); - } + general_assert(reader->next(read), "Can not load a read!"); + fr = hitv->getSAt(i); to = hitv->getSAt(i + 1); @@ -321,8 +316,9 @@ void writeResults(ModelType& model, double* counts) { tau[i] = theta[i] / eel[i]; denom += tau[i]; } - if (denom <= 0) { fprintf(stderr, "No alignable reads?!\n"); exit(-1); } - //assert(denom > 0); + + general_assert(denom > 0, "No alignable reads?!"); + for (int i = 1; i <= M; i++) { tau[i] /= denom; } @@ -465,18 +461,12 @@ void EM() { //E step for (int i = 0; i < nThreads; i++) { rc = pthread_create(&threads[i], &attr, E_STEP, (void*)(&fparams[i])); - if (rc != 0) { - fprintf(stderr, "Cannot create thread %d at ROUND %d! (numbered from 0)", i, ROUND); - pthread_exception(rc); - } + pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) at ROUND " + itos(ROUND) + "!"); } for (int i = 0; i < nThreads; i++) { rc = pthread_join(threads[i], &status); - if (rc != 0) { - fprintf(stderr, "Cannot join thread %d at ROUND %d! (numbered from 0)\n", i, ROUND); - pthread_exception(rc); - } + pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) at ROUND " + itos(ROUND) + "!"); } model.setNeedCalcConPrb(false); @@ -522,17 +512,11 @@ void EM() { if (model.getNeedCalcConPrb()) { for (int i = 0; i < nThreads; i++) { rc = pthread_create(&threads[i], &attr, calcConProbs, (void*)(&fparams[i])); - if (rc != 0) { - fprintf(stderr, "Cannot create thread %d when generate files for Gibbs sampler! (numbered from 0)\n", i); - pthread_exception(rc); - } + pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) when generating files for Gibbs sampler!"); } for (int i = 0; i < nThreads; i++) { rc = pthread_join(threads[i], &status); - if (rc != 0) { - fprintf(stderr, "Cannot join thread %d when generate files for Gibbs sampler! (numbered from 0)\n", i); - pthread_exception(rc); - } + pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) when generating files for Gibbs sampler!"); } } model.setNeedCalcConPrb(false); @@ -578,7 +562,9 @@ void EM() { for (int i = 1; i <= M; i++) if (eel[i] < EPSILON) { theta[i] = 0.0; } else sum += theta[i]; - if (sum < EPSILON) { fprintf(stderr, "No Expected Effective Length is no less than %.6g?!\n", MINEEL); exit(-1); } + + general_assert(sum >= EPSILON, "No Expected Effective Length is no less than" + ftos(MINEEL, 6) + "?!"); + for (int i = 0; i <= M; i++) theta[i] /= sum; //calculate expected weights and counts using learned parameters @@ -586,17 +572,11 @@ void EM() { for (int i = 0; i <= M; i++) probv[i] = theta[i]; for (int i = 0; i < nThreads; i++) { rc = pthread_create(&threads[i], &attr, E_STEP, (void*)(&fparams[i])); - if (rc != 0) { - fprintf(stderr, "Cannot create thread %d when calculate expected weights! (numbered from 0)\n", i); - pthread_exception(rc); - } + pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) when calculating expected weights!"); } for (int i = 0; i < nThreads; i++) { rc = pthread_join(threads[i], &status); - if (rc != 0) { - fprintf(stderr, "Cannot join thread %d! (numbered from 0) when calculate expected weights!\n", i); - pthread_exception(rc); - } + pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) when calculating expected weights!"); } model.setNeedCalcConPrb(false); for (int i = 1; i < nThreads; i++) { @@ -610,7 +590,7 @@ void EM() { pthread_attr_destroy(&attr); //convert theta' to theta - double *mw = model.getMW(); + double *mw = model.getMW(); sum = 0.0; for (int i = 0; i <= M; i++) { theta[i] = (mw[i] < EPSILON ? 0.0 : theta[i] / mw[i]); @@ -634,7 +614,7 @@ void EM() { int local_N; int fr, to, len, id; vector arr; - arr.clear(); + uniform01 rg(engine_type(time(NULL))); if (verbose) printf("Begin to sample reads from their posteriors.\n"); for (int i = 0; i < nThreads; i++) { @@ -643,13 +623,14 @@ void EM() { fr = hitvs[i]->getSAt(j); to = hitvs[i]->getSAt(j + 1); len = to - fr + 1; - arr.resize(len); + arr.assign(len, 0); arr[0] = ncpvs[i][j]; for (int k = fr; k < to; k++) arr[k - fr + 1] = arr[k - fr] + hitvs[i]->getHitAt(k).getConPrb(); - id = (arr[len - 1] < EPSILON ? -1 : sample(arr, len)); // if all entries in arr are 0, let id be -1 + id = (arr[len - 1] < EPSILON ? -1 : sample(rg, arr, len)); // if all entries in arr are 0, let id be -1 for (int k = fr; k < to; k++) hitvs[i]->getHitAt(k).setConPrb(k - fr + 1 == id ? 1.0 : 0.0); } } + if (verbose) printf("Sampling is finished.\n"); } @@ -710,8 +691,8 @@ int main(int argc, char* argv[]) { if (!strcmp(argv[i], "--gibbs-out")) { genGibbsOut = true; } if (!strcmp(argv[i], "--sampling")) { bamSampling = true; } } - if (nThreads <= 0) { fprintf(stderr, "Number of threads should be bigger than 0!\n"); exit(-1); } - //assert(nThreads > 0); + + general_assert(nThreads > 0, "Number of threads should be bigger than 0!"); verbose = !quiet; @@ -728,11 +709,13 @@ int main(int argc, char* argv[]) { sprintf(cntF, "%s.cnt", statName); fin.open(cntF); - if (!fin.is_open()) { fprintf(stderr, "Cannot open %s! It may not exist.\n", cntF); exit(-1); } + + general_assert(fin.is_open(), "Cannot open " + cstrtos(cntF) + "! It may not exist."); + fin>>N0>>N1>>N2>>N_tot; fin.close(); - if (N1 <= 0) { fprintf(stderr, "There are no alignable reads!\n"); exit(-1); } + general_assert(N1 > 0, "There are no alignable reads!"); if (nThreads > N1) nThreads = N1; @@ -743,7 +726,9 @@ int main(int argc, char* argv[]) { sprintf(mparamsF, "%s.mparams", imdName); fin.open(mparamsF); - if (!fin.is_open()) { fprintf(stderr, "Cannot open %s! It may not exist.\n", mparamsF); exit(-1); } + + general_assert(fin.is_open(), "Cannot open " + cstrtos(mparamsF) + "It may not exist."); + fin>> mparams.minL>> mparams.maxL>> mparams.probF; int val; // 0 or 1 , for estRSPD fin>>val; diff --git a/Gibbs.cpp b/Gibbs.cpp index 979860b..1a084d6 100644 --- a/Gibbs.cpp +++ b/Gibbs.cpp @@ -5,8 +5,10 @@ #include #include #include +#include #include "utils.h" +#include "my_assert.h" #include "sampling.h" #include "Model.h" @@ -20,6 +22,15 @@ using namespace std; +struct Params { + int no, nsamples; + FILE *fo; + engine_type *engine; + double *pme_c, *pve_c; //posterior mean and variance vectors on counts + double *pme_theta; +}; + + struct Item { int sid; double conprb; @@ -30,10 +41,12 @@ struct Item { } }; +int nThreads; + int model_type; int m, M, N0, N1, nHits; double totc; -int BURNIN, CHAINLEN, GAP; +int BURNIN, NSAMPLES, GAP; char imdName[STRLEN], statName[STRLEN]; char thetaF[STRLEN], ofgF[STRLEN], groupF[STRLEN], refF[STRLEN], modelF[STRLEN]; char cvsF[STRLEN]; @@ -41,15 +54,22 @@ char cvsF[STRLEN]; Refs refs; GroupInfo gi; -vector theta, pme_theta, pme_c, eel; - -vector s, z; +vector s; vector hits; -vector counts; +vector theta; + +vector pme_c, pve_c; //global posterior mean and variance vectors on counts +vector pme_theta, eel; + +bool var_opt; bool quiet; -vector arr; +Params *paramsArray; +pthread_t *threads; +pthread_attr_t attr; +void *status; +int rc; void load_data(char* reference_name, char* statName, char* imdName) { ifstream fin; @@ -69,31 +89,19 @@ void load_data(char* reference_name, char* statName, char* imdName) { //load thetaF sprintf(thetaF, "%s.theta",statName); fin.open(thetaF); - if (!fin.is_open()) { - fprintf(stderr, "Cannot open %s!\n", thetaF); - exit(-1); - } + general_assert(fin.is_open(), "Cannot open " + cstrtos(thetaF) + "!"); fin>>tmpVal; - if (tmpVal != M + 1) { - fprintf(stderr, "Number of transcripts is not consistent in %s and %s!\n", refF, thetaF); - exit(-1); - } - theta.clear(); theta.resize(M + 1); + general_assert(tmpVal == M + 1, "Number of transcripts is not consistent in " + cstrtos(refF) + " and " + cstrtos(thetaF) + "!"); + theta.assign(M + 1, 0); for (int i = 0; i <= M; i++) fin>>theta[i]; fin.close(); //load ofgF; sprintf(ofgF, "%s.ofg", imdName); fin.open(ofgF); - if (!fin.is_open()) { - fprintf(stderr, "Cannot open %s!\n", ofgF); - exit(-1); - } + general_assert(fin.is_open(), "Cannot open " + cstrtos(ofgF) + "!"); fin>>tmpVal>>N0; - if (tmpVal != M) { - fprintf(stderr, "M in %s is not consistent with %s!\n", ofgF, refF); - exit(-1); - } + general_assert(tmpVal == M, "M in " + cstrtos(ofgF) + " is not consistent with " + cstrtos(refF) + "!"); getline(fin, line); s.clear(); hits.clear(); @@ -113,120 +121,214 @@ void load_data(char* reference_name, char* statName, char* imdName) { N1 = s.size() - 1; nHits = hits.size(); + totc = N0 + N1 + (M + 1); + if (verbose) { printf("Loading Data is finished!\n"); } } +// assign threads void init() { - int len, fr, to; + int quotient, left; + char outF[STRLEN]; - arr.clear(); - z.clear(); - counts.clear(); + quotient = NSAMPLES / nThreads; + left = NSAMPLES % nThreads; - z.resize(N1); - counts.resize(M + 1, 1); // 1 pseudo count - counts[0] += N0; + sprintf(cvsF, "%s.countvectors", imdName); + paramsArray = new Params[nThreads]; + threads = new pthread_t[nThreads]; - for (int i = 0; i < N1; i++) { - fr = s[i]; to = s[i + 1]; - len = to - fr; - arr.resize(len); - for (int j = fr; j < to; j++) { - arr[j - fr] = theta[hits[j].sid] * hits[j].conprb; - if (j > fr) arr[j - fr] += arr[j - fr - 1]; // cumulative - } - z[i] = hits[fr + sample(arr, len)].sid; - ++counts[z[i]]; + for (int i = 0; i < nThreads; i++) { + paramsArray[i].no = i; + + paramsArray[i].nsamples = quotient; + if (i < left) paramsArray[i].nsamples++; + + sprintf(outF, "%s%d", cvsF, i); + paramsArray[i].fo = fopen(outF, "w"); + + paramsArray[i].engine = engineFactory::new_engine(); + paramsArray[i].pme_c = new double[M + 1]; + memset(paramsArray[i].pme_c, 0, sizeof(double) * (M + 1)); + paramsArray[i].pve_c = new double[M + 1]; + memset(paramsArray[i].pve_c, 0, sizeof(double) * (M + 1)); + paramsArray[i].pme_theta = new double[M + 1]; + memset(paramsArray[i].pme_theta, 0, sizeof(double) * (M + 1)); } - totc = N0 + N1 + (M + 1); + /* set thread attribute to be joinable */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + if (verbose) { printf("Initialization finished!"); } +} + +//sample theta from Dir(1) +void sampleTheta(engine_type& engine, vector& theta) { + gamma_dist gm(1); + gamma_generator gmg(engine, gm); + double denom; - if (verbose) { printf("Initialization is finished!\n"); } + theta.assign(M + 1, 0); + denom = 0.0; + for (int i = 0; i <= M; i++) { + theta[i] = gmg(); + denom += theta[i]; + } + assert(denom > EPSILON); + for (int i = 0; i <= M; i++) theta[i] /= denom; } -void writeCountVector(FILE* fo) { +void writeCountVector(FILE* fo, vector& counts) { for (int i = 0; i < M; i++) { fprintf(fo, "%d ", counts[i]); } fprintf(fo, "%d\n", counts[M]); } -void Gibbs(char* imdName) { - FILE *fo; - int fr, to, len; +void* Gibbs(void* arg) { + int len, fr, to; + int CHAINLEN; + Params *params = (Params*)arg; - sprintf(cvsF, "%s.countvectors", imdName); - fo = fopen(cvsF, "w"); - assert(CHAINLEN % GAP == 0); - fprintf(fo, "%d %d\n", CHAINLEN / GAP, M + 1); - //fprintf(fo, "%d %d\n", CHAINLEN, M + 1); + vector theta; + vector z, counts; + vector arr; + + uniform01 rg(*params->engine); + + // generate initial state + sampleTheta(*params->engine, theta); + + z.assign(N1, 0); + + counts.assign(M + 1, 1); // 1 pseudo count + counts[0] += N0; - pme_c.clear(); pme_c.resize(M + 1, 0.0); - pme_theta.clear(); pme_theta.resize(M + 1, 0.0); + for (int i = 0; i < N1; i++) { + fr = s[i]; to = s[i + 1]; + len = to - fr; + arr.assign(len, 0); + for (int j = fr; j < to; j++) { + arr[j - fr] = theta[hits[j].sid] * hits[j].conprb; + if (j > fr) arr[j - fr] += arr[j - fr - 1]; // cumulative + } + z[i] = hits[fr + sample(rg, arr, len)].sid; + ++counts[z[i]]; + } + + // Gibbs sampling + CHAINLEN = 1 + (params->nsamples - 1) * GAP; for (int ROUND = 1; ROUND <= BURNIN + CHAINLEN; ROUND++) { for (int i = 0; i < N1; i++) { --counts[z[i]]; fr = s[i]; to = s[i + 1]; len = to - fr; - arr.resize(len); + arr.assign(len, 0); for (int j = fr; j < to; j++) { arr[j - fr] = counts[hits[j].sid] * hits[j].conprb; if (j > fr) arr[j - fr] += arr[j - fr - 1]; //cumulative } - z[i] = hits[fr + sample(arr, len)].sid; + z[i] = hits[fr + sample(rg, arr, len)].sid; ++counts[z[i]]; } if (ROUND > BURNIN) { - if ((ROUND - BURNIN - 1) % GAP == 0) writeCountVector(fo); - for (int i = 0; i <= M; i++) { - pme_c[i] += counts[i] - 1; - pme_theta[i] += counts[i] / totc; + if ((ROUND - BURNIN - 1) % GAP == 0) { + writeCountVector(params->fo, counts); + for (int i = 0; i <= M; i++) { + params->pme_c[i] += counts[i] - 1; + params->pve_c[i] += (counts[i] - 1) * (counts[i] - 1); + params->pme_theta[i] += counts[i] / totc; + } } } - if (verbose) { printf("ROUND %d is finished!\n", ROUND); } + if (verbose && ROUND % 100 == 0) { printf("Thread %d, ROUND %d is finished!\n", params->no, ROUND); } } - fclose(fo); + + return NULL; +} + +void release() { +// char inpF[STRLEN], command[STRLEN]; + string line; + + /* destroy attribute */ + pthread_attr_destroy(&attr); + delete[] threads; + + pme_c.assign(M + 1, 0); + pve_c.assign(M + 1, 0); + pme_theta.assign(M + 1, 0); + for (int i = 0; i < nThreads; i++) { + fclose(paramsArray[i].fo); + delete paramsArray[i].engine; + for (int j = 0; j <= M; j++) { + pme_c[j] += paramsArray[i].pme_c[j]; + pve_c[j] += paramsArray[i].pve_c[j]; + pme_theta[j] += paramsArray[i].pme_theta[j]; + } + delete[] paramsArray[i].pme_c; + delete[] paramsArray[i].pve_c; + delete[] paramsArray[i].pme_theta; + } + delete[] paramsArray; + for (int i = 0; i <= M; i++) { - pme_c[i] /= CHAINLEN; - pme_theta[i] /= CHAINLEN; + pme_c[i] /= NSAMPLES; + pve_c[i] = (pve_c[i] - NSAMPLES * pme_c[i] * pme_c[i]) / (NSAMPLES - 1); + pme_theta[i] /= NSAMPLES; } - if (verbose) { printf("Gibbs is finished!\n"); } + /* + // combine files + FILE *fo = fopen(cvsF, "a"); + for (int i = 1; i < nThreads; i++) { + sprintf(inpF, "%s%d", cvsF, i); + ifstream fin(inpF); + while (getline(fin, line)) { + fprintf(fo, "%s\n", line.c_str()); + } + fin.close(); + sprintf(command, "rm -f %s", inpF); + int status = system(command); + general_assert(status == 0, "Fail to delete file " + cstrtos(inpF) + "!"); + } + fclose(fo); + */ } template void calcExpectedEffectiveLengths(ModelType& model) { - int lb, ub, span; - double *pdf = NULL, *cdf = NULL, *clen = NULL; // clen[i] = sigma_{j=1}^{i}pdf[i]*(lb+i) + int lb, ub, span; + double *pdf = NULL, *cdf = NULL, *clen = NULL; // clen[i] = \sigma_{j=1}^{i}pdf[i]*(lb+i) - model.getGLD().copyTo(pdf, cdf, lb, ub, span); - clen = new double[span + 1]; - clen[0] = 0.0; - for (int i = 1; i <= span; i++) { - clen[i] = clen[i - 1] + pdf[i] * (lb + i); - } - - eel.clear(); - eel.resize(M + 1, 0.0); - for (int i = 1; i <= M; i++) { - int totLen = refs.getRef(i).getTotLen(); - int fullLen = refs.getRef(i).getFullLen(); - int pos1 = max(min(totLen - fullLen + 1, ub) - lb, 0); - int pos2 = max(min(totLen, ub) - lb, 0); - - if (pos2 == 0) { eel[i] = 0.0; continue; } + model.getGLD().copyTo(pdf, cdf, lb, ub, span); + clen = new double[span + 1]; + clen[0] = 0.0; + for (int i = 1; i <= span; i++) { + clen[i] = clen[i - 1] + pdf[i] * (lb + i); + } + + eel.assign(M + 1, 0.0); + for (int i = 1; i <= M; i++) { + int totLen = refs.getRef(i).getTotLen(); + int fullLen = refs.getRef(i).getFullLen(); + int pos1 = max(min(totLen - fullLen + 1, ub) - lb, 0); + int pos2 = max(min(totLen, ub) - lb, 0); + + if (pos2 == 0) { eel[i] = 0.0; continue; } - eel[i] = fullLen * cdf[pos1] + ((cdf[pos2] - cdf[pos1]) * (totLen + 1) - (clen[pos2] - clen[pos1])); - assert(eel[i] >= 0); - if (eel[i] < MINEEL) { eel[i] = 0.0; } - } + eel[i] = fullLen * cdf[pos1] + ((cdf[pos2] - cdf[pos1]) * (totLen + 1) - (clen[pos2] - clen[pos1])); + assert(eel[i] >= 0); + if (eel[i] < MINEEL) { eel[i] = 0.0; } + } - delete[] pdf; - delete[] cdf; - delete[] clen; + delete[] pdf; + delete[] cdf; + delete[] clen; } template @@ -244,7 +346,9 @@ void writeEstimatedParameters(char* modelF, char* imdName) { for (int i = 1; i <= M; i++) if (eel[i] < EPSILON) pme_theta[i] = 0.0; else denom += pme_theta[i]; - if (denom <= 0) { fprintf(stderr, "No Expected Effective Length is no less than %.6g?!\n", MINEEL); exit(-1); } + + general_assert(denom >= EPSILON, "No Expected Effective Length is no less than " + ftos(MINEEL, 6) + "?!"); + for (int i = 0; i <= M; i++) pme_theta[i] /= denom; denom = 0.0; @@ -266,8 +370,9 @@ void writeEstimatedParameters(char* modelF, char* imdName) { tau[i] = pme_theta[i] / eel[i]; denom += tau[i]; } - if (denom <= 0) { fprintf(stderr, "No alignable reads?!\n"); exit(-1); } - //assert(denom > 0); + + general_assert(denom >= EPSILON, "No alignable reads?!"); + for (int i = 1; i <= M; i++) { tau[i] /= denom; } @@ -275,17 +380,20 @@ void writeEstimatedParameters(char* modelF, char* imdName) { //isoform level results sprintf(outF, "%s.iso_res", imdName); fo = fopen(outF, "a"); - if (fo == NULL) { fprintf(stderr, "Cannot open %s!\n", outF); exit(-1); } + general_assert(fo != NULL, "Cannot open " + cstrtos(outF) + "!"); + for (int i = 1; i <= M; i++) fprintf(fo, "%.2f%c", pme_c[i], (i < M ? '\t' : '\n')); for (int i = 1; i <= M; i++) fprintf(fo, "%.15g%c", tau[i], (i < M ? '\t' : '\n')); + fclose(fo); //gene level results sprintf(outF, "%s.gene_res", imdName); fo = fopen(outF, "a"); - if (fo == NULL) { fprintf(stderr, "Cannot open %s!\n", outF); exit(-1); } + general_assert(fo != NULL, "Cannot open " + cstrtos(outF) + "!"); + for (int i = 0; i < m; i++) { double sumC = 0.0; // sum of pme counts int b = gi.spAt(i), e = gi.spAt(i + 1); @@ -309,33 +417,56 @@ void writeEstimatedParameters(char* modelF, char* imdName) { if (verbose) { printf("Gibbs based expression values are written!\n"); } } - int main(int argc, char* argv[]) { if (argc < 7) { - printf("Usage: rsem-run-gibbs reference_name sample_name sampleToken BURNIN CHAINLEN GAP [-q]\n"); + printf("Usage: rsem-run-gibbs-multi reference_name sample_name sampleToken BURNIN NSAMPLES GAP [-p #Threads] [--var] [-q]\n"); exit(-1); } BURNIN = atoi(argv[4]); - CHAINLEN = atoi(argv[5]); + NSAMPLES = atoi(argv[5]); GAP = atoi(argv[6]); sprintf(imdName, "%s.temp/%s", argv[2], argv[3]); sprintf(statName, "%s.stat/%s", argv[2], argv[3]); load_data(argv[1], statName, imdName); + nThreads = 1; + var_opt = false; quiet = false; - if (argc > 7 && !strcmp(argv[7], "-q")) { - quiet = true; + + for (int i = 7; i < argc; i++) { + if (!strcmp(argv[i], "-p")) nThreads = atoi(argv[i + 1]); + if (!strcmp(argv[i], "--var")) var_opt = true; + if (!strcmp(argv[i], "-q")) quiet = true; } verbose = !quiet; + assert(NSAMPLES > 1); // Otherwise, we cannot calculate posterior variance + + if (nThreads > NSAMPLES) { + nThreads = NSAMPLES; + printf("Warning: Number of samples is less than number of threads! Change the number of threads to %d!\n", nThreads); + } + + if (verbose) printf("Gibbs started!\n"); + init(); - Gibbs(imdName); + for (int i = 0; i < nThreads; i++) { + rc = pthread_create(&threads[i], &attr, Gibbs, (void*)(¶msArray[i])); + pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0)!"); + } + for (int i = 0; i < nThreads; i++) { + rc = pthread_join(threads[i], &status); + pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0)!"); + } + release(); + + if (verbose) printf("Gibbs finished!\n"); sprintf(modelF, "%s.model", statName); FILE *fi = fopen(modelF, "r"); - if (fi == NULL) { fprintf(stderr, "Cannot open %s!\n", modelF); exit(-1); } - fscanf(fi, "%d", &model_type); + general_assert(fi != NULL, "Cannot open " + cstrtos(modelF) + "!"); + assert(fscanf(fi, "%d", &model_type) == 1); fclose(fi); switch(model_type) { @@ -345,5 +476,20 @@ int main(int argc, char* argv[]) { case 3 : writeEstimatedParameters(modelF, imdName); break; } + if (var_opt) { + char varF[STRLEN]; + + sprintf(varF, "%s.var", statName); + FILE *fo = fopen(varF, "w"); + general_assert(fo != NULL, "Cannot open " + cstrtos(varF) + "!"); + for (int i = 0; i < m; i++) { + int b = gi.spAt(i), e = gi.spAt(i + 1), number_of_isoforms = e - b; + for (int j = b; j < e; j++) { + fprintf(fo, "%s\t%d\t%.15g\t%.15g\n", refs.getRef(j).getName().c_str(), number_of_isoforms, pme_c[j], pve_c[j]); + } + } + fclose(fo); + } + return 0; } diff --git a/WHAT_IS_NEW b/WHAT_IS_NEW index 2a6cc2f..08f9786 100644 --- a/WHAT_IS_NEW +++ b/WHAT_IS_NEW @@ -1,3 +1,10 @@ +RSEM v1.1.16 + +- Added --time option to show time consumed by each phase +- Moved the alignment file out of the temporary folder +- Enabled pthreads for calculating credibility intervals + +-------------------------------------------------------------------------------------------- RSEM v1.1.15 @@ -5,6 +12,7 @@ RSEM v1.1.15 - Modified samtools' Makefile for cygwin. For cygwin users, please uncomment the 4th and 8th lines in sam/Makefile before compiling RSEM -------------------------------------------------------------------------------------------- + RSEM v1.1.14 - Added --chunkmbs option to rsem-calculate-expression (patch contributed by earonesty) diff --git a/calcCI.cpp b/calcCI.cpp index b5a2608..79a6bb2 100644 --- a/calcCI.cpp +++ b/calcCI.cpp @@ -5,10 +5,12 @@ #include #include #include - -#include "boost/random.hpp" +#include +#include #include "utils.h" +#include "my_assert.h" +#include "sampling.h" #include "Model.h" #include "SingleModel.h" @@ -19,14 +21,15 @@ #include "Refs.h" #include "GroupInfo.h" +#include "Buffer.h" using namespace std; -typedef unsigned long bufsize_type; -typedef boost::mt19937 engine_type; -typedef boost::gamma_distribution<> distribution_type; -typedef boost::variate_generator generator_type; - -const int FLOATSIZE = sizeof(float); +struct Params { + int no; + FILE *fi; + engine_type *engine; + double *mw; +}; struct CIType { float lb, ub; // the interval is [lb, ub] @@ -34,28 +37,23 @@ struct CIType { CIType() { lb = ub = 0.0; } }; -bool quiet; +struct CIParams { + int no; + int start_gene_id, end_gene_id; +}; + int model_type; +int nMB; double confidence; +int nCV, nSpC, nSamples; // nCV: number of count vectors; nSpC: number of theta vectors sampled per count vector; nSamples: nCV * nSpC +int nThreads; +int cvlen; -int nC, cvlen, nSpC, nSamples; // nSpC : number of sample theta vectors per count vector -int fr, to; // each flush, sample fr .. to - 1 - -int nMB; -bufsize_type size; -float *buffer; char cvsF[STRLEN], tmpF[STRLEN], command[STRLEN]; -ofstream ftmpOut; -int *cvec; -double *theta; CIType *iso_tau, *gene_tau; -engine_type engine(time(NULL)); -distribution_type **gammas; -generator_type **rgs; - int M, m; Refs refs; GroupInfo gi; @@ -63,160 +61,177 @@ char imdName[STRLEN], statName[STRLEN]; char modelF[STRLEN], groupF[STRLEN], refF[STRLEN]; vector eel; //expected effective lengths -double *tau_denoms; // denominators for tau values -template -void calcExpectedEffectiveLengths(ModelType& model) { - int lb, ub, span; - double *pdf = NULL, *cdf = NULL, *clen = NULL; // clen[i] = sigma_{j=1}^{i}pdf[i]*(lb+i) - - model.getGLD().copyTo(pdf, cdf, lb, ub, span); - clen = new double[span + 1]; - clen[0] = 0.0; - for (int i = 1; i <= span; i++) { - clen[i] = clen[i - 1] + pdf[i] * (lb + i); - } - - eel.clear(); - eel.resize(M + 1, 0.0); - for (int i = 1; i <= M; i++) { - int totLen = refs.getRef(i).getTotLen(); - int fullLen = refs.getRef(i).getFullLen(); - int pos1 = max(min(totLen - fullLen + 1, ub) - lb, 0); - int pos2 = max(min(totLen, ub) - lb, 0); - - if (pos2 == 0) { eel[i] = 0.0; continue; } - - eel[i] = fullLen * cdf[pos1] + ((cdf[pos2] - cdf[pos1]) * (totLen + 1) - (clen[pos2] - clen[pos1])); - assert(eel[i] >= 0); - if (eel[i] < MINEEL) { eel[i] = 0.0; } - } - - delete[] pdf; - delete[] cdf; - delete[] clen; -} +Buffer *buffer; -void flushToTempFile() { - int gap1 = fr * FLOATSIZE; - int gap2 = (nSamples - to) * FLOATSIZE; - float *p = NULL; - - ftmpOut.seekp(0, ios_base::beg); - for (int i = 0; i < cvlen; i++) { - p = buffer + i; - ftmpOut.seekp(gap1, ios_base::cur); - for (int j = fr; j < to; j++) { - ftmpOut.write((char*)p, FLOATSIZE); - p += cvlen; - } - ftmpOut.seekp(gap2, ios_base::cur); - } -} +bool quiet; -template -void sampling() { - float *p, *ub; - ifstream fin(cvsF); - ModelType model; +Params *paramsArray; +pthread_t *threads; +pthread_attr_t attr; +void *status; +int rc; - model.read(modelF); - calcExpectedEffectiveLengths(model); +CIParams *ciParamsArray; - ftmpOut.open(tmpF, ios::binary); +template +void calcExpectedEffectiveLengths(ModelType& model) { + int lb, ub, span; + double *pdf = NULL, *cdf = NULL, *clen = NULL; // clen[i] = \sigma_{j=1}^{i}pdf[i]*(lb+i) + + model.getGLD().copyTo(pdf, cdf, lb, ub, span); + clen = new double[span + 1]; + clen[0] = 0.0; + for (int i = 1; i <= span; i++) { + clen[i] = clen[i - 1] + pdf[i] * (lb + i); + } - fin>>nC>>cvlen; - assert(cvlen = M + 1); + eel.assign(M + 1, 0.0); + for (int i = 1; i <= M; i++) { + int totLen = refs.getRef(i).getTotLen(); + int fullLen = refs.getRef(i).getFullLen(); + int pos1 = max(min(totLen - fullLen + 1, ub) - lb, 0); + int pos2 = max(min(totLen, ub) - lb, 0); - nSamples = nC * nSpC; + if (pos2 == 0) { eel[i] = 0.0; continue; } + + eel[i] = fullLen * cdf[pos1] + ((cdf[pos2] - cdf[pos1]) * (totLen + 1) - (clen[pos2] - clen[pos1])); + assert(eel[i] >= 0); + if (eel[i] < MINEEL) { eel[i] = 0.0; } + } + + delete[] pdf; + delete[] cdf; + delete[] clen; +} - fr = to = 0; +void* sample_theta_from_c(void* arg) { - size = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen; - if (size > (bufsize_type)nSamples) size = nSamples; - size *= cvlen; - buffer = new float[size]; + int *cvec; + double *theta; + gamma_dist **gammas; + gamma_generator **rgs; - ub = buffer + size; - p = buffer; + Params *params = (Params*)arg; + FILE *fi = params->fi; + double *mw = params->mw; cvec = new int[cvlen]; theta = new double[cvlen]; - gammas = new distribution_type*[cvlen]; - rgs = new generator_type*[cvlen]; + gammas = new gamma_dist*[cvlen]; + rgs = new gamma_generator*[cvlen]; - tau_denoms = new double[nSamples]; - memset(tau_denoms, 0, sizeof(double) * nSamples); + float **vecs = new float*[nSpC]; + for (int i = 0; i < nSpC; i++) vecs[i] = new float[cvlen]; - double *mw = model.getMW(); - for (int i = 0; i < nC; i++) { - for (int j = 0; j < cvlen; j++) { - fin>>cvec[j]; - } + int cnt = 0; + while (fscanf(fi, "%d", &cvec[0]) == 1) { + for (int j = 1; j < cvlen; j++) assert(fscanf(fi, "%d", &cvec[j]) == 1); + + ++cnt; for (int j = 0; j < cvlen; j++) { - gammas[j] = new distribution_type(cvec[j]); // need change back before publishing - rgs[j] = new generator_type(engine, *gammas[j]); + gammas[j] = new gamma_dist(cvec[j]); + rgs[j] = new gamma_generator(*(params->engine), *gammas[j]); } - for (int j = 0; j < nSpC; j++) { + for (int i = 0; i < nSpC; i++) { double sum = 0.0; - for (int k = 0; k < cvlen; k++) { - theta[k] = (k == 0 || eel[k] > EPSILON ? (*rgs[k])() : 0.0); - sum += theta[k]; + for (int j = 0; j < cvlen; j++) { + theta[j] = ((j == 0 || eel[j] >= EPSILON) ? (*rgs[j])() : 0.0); + sum += theta[j]; } - assert(sum > 0.0); - for (int k = 0; k < cvlen; k++) theta[k] /= sum; + assert(sum >= EPSILON); + for (int j = 0; j < cvlen; j++) theta[j] /= sum; sum = 0.0; - for (int k = 0; k < cvlen; k++) { - theta[k] = (mw[k] < EPSILON ? 0.0 : theta[k] / mw[k]); - sum += theta[k]; + for (int j = 0; j < cvlen; j++) { + theta[j] = (mw[j] < EPSILON ? 0.0 : theta[j] / mw[j]); + sum += theta[j]; } assert(sum >= EPSILON); - for (int k = 0; k < cvlen; k++) theta[k] /= sum; - - *p = (float)theta[0]; ++p; - assert(1.0 - theta[0] > 0.0); - for (int k = 1; k < cvlen; k++) { - if (eel[k] > EPSILON) { - theta[k] /= (1.0 - theta[0]); - tau_denoms[to] += theta[k] / eel[k]; - } - else { - if (theta[k] != 0.0) { fprintf(stderr, "K=%d Theta_K=%lf\n", k, theta[k]); exit(-1); } + for (int j = 0; j < cvlen; j++) theta[j] /= sum; + + + sum = 0.0; + vecs[i][0] = theta[0]; + for (int j = 1; j < cvlen; j++) + if (eel[j] >= EPSILON) { + vecs[i][j] = theta[j] / eel[j]; + sum += vecs[i][j]; } + else assert(theta[j] < EPSILON); - *p = (float)theta[k]; - ++p; - } - ++to; - if (p == ub) { - flushToTempFile(); - p = buffer; - fr = to; - if (verbose) { printf("%d vectors are sampled!\n", to); } - } + assert(sum >= EPSILON); + for (int j = 1; j < cvlen; j++) vecs[i][j] /= sum; } + buffer->write(nSpC, vecs); + for (int j = 0; j < cvlen; j++) { delete gammas[j]; delete rgs[j]; } - } - if (fr != to) { flushToTempFile(); } - - fin.close(); - ftmpOut.close(); - - delete[] buffer; + if (verbose && cnt % 100 == 0) { printf("Thread %d, %d count vectors are processed!\n", params->no, cnt); } + } delete[] cvec; delete[] theta; delete[] gammas; delete[] rgs; + for (int i = 0; i < nSpC; i++) delete[] vecs[i]; + delete[] vecs; + + return NULL; +} + +template +void sample_theta_vectors_from_count_vectors() { + ModelType model; + model.read(modelF); + calcExpectedEffectiveLengths(model); + + buffer = new Buffer(nMB, nSamples, cvlen, tmpF); + + paramsArray = new Params[nThreads]; + threads = new pthread_t[nThreads]; + + char inpF[STRLEN]; + for (int i = 0; i < nThreads; i++) { + paramsArray[i].no = i; + sprintf(inpF, "%s%d", cvsF, i); + paramsArray[i].fi = fopen(inpF, "r"); + paramsArray[i].engine = engineFactory::new_engine(); + paramsArray[i].mw = model.getMW(); + } + + /* set thread attribute to be joinable */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + for (int i = 0; i < nThreads; i++) { + rc = pthread_create(&threads[i], &attr, &sample_theta_from_c, (void*)(¶msArray[i])); + pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!"); + } + for (int i = 0; i < nThreads; i++) { + rc = pthread_join(threads[i], &status); + pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!"); + } + + /* destroy attribute */ + pthread_attr_destroy(&attr); + delete[] threads; + + for (int i = 0; i < nThreads; i++) { + fclose(paramsArray[i].fi); + delete paramsArray[i].engine; + } + delete[] paramsArray; + + delete buffer; // Must delete here, force the content left in the buffer be written into the disk + if (verbose) { printf("Sampling is finished!\n"); } } @@ -263,45 +278,95 @@ void calcCI(int nSamples, float *samples, float &lb, float &ub) { } while (p <= threshold); } -void generateResults(char* imdName) { +void* calcCI_batch(void* arg) { float *itsamples, *gtsamples; ifstream fin; - FILE *fo; - char outF[STRLEN]; - - iso_tau = new CIType[M + 1]; - gene_tau = new CIType[m]; + CIParams *ciParams = (CIParams*)arg; itsamples = new float[nSamples]; gtsamples = new float[nSamples]; fin.open(tmpF, ios::binary); + streampos pos = streampos(gi.spAt(ciParams->start_gene_id)) * nSamples * FLOATSIZE; + fin.seekg(pos, ios::beg); - //read sampled theta0 values - for (int k = 0; k < nSamples; k++) fin.read((char*)(&itsamples[k]), FLOATSIZE); - - for (int i = 0; i < m; i++) { + int cnt = 0; + for (int i = ciParams->start_gene_id; i < ciParams->end_gene_id; i++) { int b = gi.spAt(i), e = gi.spAt(i + 1); memset(gtsamples, 0, FLOATSIZE * nSamples); for (int j = b; j < e; j++) { for (int k = 0; k < nSamples; k++) { fin.read((char*)(&itsamples[k]), FLOATSIZE); - if (eel[j] > EPSILON && tau_denoms[k] > EPSILON) { itsamples[k] = itsamples[k] / eel[j] / tau_denoms[k]; } - else { - if (itsamples[k] != 0.0) { fprintf(stderr, "Not equal to 0! K=%d, Sampled Theta Value=%lf\n", k, itsamples[k]); exit(-1); } - itsamples[k] = 0.0; - } gtsamples[k] += itsamples[k]; } calcCI(nSamples, itsamples, iso_tau[j].lb, iso_tau[j].ub); } calcCI(nSamples, gtsamples, gene_tau[i].lb, gene_tau[i].ub); - if (verbose && (i + 1) % 1000 == 0) { printf("%d genes are done!\n", i + 1); } + ++cnt; + if (verbose && cnt % 1000 == 0) { printf("In thread %d, %d genes are processed for CI calculation!\n", ciParams->no, cnt); } } fin.close(); + delete[] itsamples; + delete[] gtsamples; + + return NULL; +} + +void calculate_credibility_intervals(char* imdName) { + FILE *fo; + char outF[STRLEN]; + + iso_tau = new CIType[M + 1]; + gene_tau = new CIType[m]; + + assert(M > 0); + int quotient = M / nThreads; + if (quotient < 1) { nThreads = M; quotient = 1; } + int cur_gene_id = 0; + int num_isoforms = 0; + + // A just so so strategy for paralleling + ciParamsArray = new CIParams[nThreads]; + for (int i = 0; i < nThreads; i++) { + ciParamsArray[i].no = i; + ciParamsArray[i].start_gene_id = cur_gene_id; + num_isoforms = 0; + + while ((m - cur_gene_id > nThreads - i - 1) && (i == nThreads - 1 || num_isoforms < quotient)) { + num_isoforms += gi.spAt(cur_gene_id + 1) - gi.spAt(cur_gene_id); + ++cur_gene_id; + } + + ciParamsArray[i].end_gene_id = cur_gene_id; + } + + threads = new pthread_t[nThreads]; + + /* set thread attribute to be joinable */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + // paralleling + for (int i = 0; i < nThreads; i++) { + rc = pthread_create(&threads[i], &attr, &calcCI_batch, (void*)(&ciParamsArray[i])); + pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!"); + } + for (int i = 0; i < nThreads; i++) { + rc = pthread_join(threads[i], &status); + pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!"); + } + + // releasing resources + + /* destroy attribute */ + pthread_attr_destroy(&attr); + delete[] threads; + + delete[] ciParamsArray; + //isoform level results sprintf(outF, "%s.iso_res", imdName); fo = fopen(outF, "a"); @@ -320,46 +385,35 @@ void generateResults(char* imdName) { fprintf(fo, "%.6g%c", gene_tau[i].ub, (i < m - 1 ? '\t' : '\n')); fclose(fo); - delete[] itsamples; - delete[] gtsamples; - delete[] iso_tau; delete[] gene_tau; if (verbose) { printf("All credibility intervals are calculated!\n"); } - - sprintf(outF, "%s.tau_denoms", imdName); - fo = fopen(outF, "w"); - fprintf(fo, "%d\n", nSamples); - for (int i = 0; i < nSamples; i++) fprintf(fo, "%.15g ", tau_denoms[i]); - fprintf(fo, "\n"); - fclose(fo); } int main(int argc, char* argv[]) { - if (argc < 7) { - printf("Usage: rsem-calculate-credibility-intervals reference_name sample_name sampleToken confidence nSpC nMB[-q]\n"); + if (argc < 8) { + printf("Usage: rsem-calculate-credibility-intervals reference_name sample_name sampleToken confidence nCV nSpC nMB [-p #Threads] [-q]\n"); exit(-1); } confidence = atof(argv[4]); - nSpC = atoi(argv[5]); - nMB = atoi(argv[6]); + nCV = atoi(argv[5]); + nSpC = atoi(argv[6]); + nMB = atoi(argv[7]); + nThreads = 1; quiet = false; - if (argc > 7 && !strcmp(argv[7], "-q")) { - quiet = true; + for (int i = 8; i < argc; i++) { + if (!strcmp(argv[i], "-p")) nThreads = atoi(argv[i + 1]); + if (!strcmp(argv[i], "-q")) quiet = true; } verbose = !quiet; - sprintf(imdName, "%s.temp/%s", argv[2], argv[3]); - sprintf(statName, "%s.stat/%s", argv[2], argv[3]); - - sprintf(modelF, "%s.model", statName); - FILE *fi = fopen(modelF, "r"); - if (fi == NULL) { fprintf(stderr, "Cannot open %s!\n", modelF); exit(-1); } - fscanf(fi, "%d", &model_type); - fclose(fi); + if (nThreads > nCV) { + nThreads = nCV; + printf("Warning: Number of count vectors is less than number of threads! Change the number of threads to %d!\n", nThreads); + } sprintf(refF, "%s.seq", argv[1]); refs.loadRefs(refF, 1); @@ -368,19 +422,31 @@ int main(int argc, char* argv[]) { gi.load(groupF); m = gi.getm(); + nSamples = nCV * nSpC; + cvlen = M + 1; + assert(nSamples > 0 && cvlen > 1); // for Buffter.h: (bufsize_type)nSamples + + sprintf(imdName, "%s.temp/%s", argv[2], argv[3]); + sprintf(statName, "%s.stat/%s", argv[2], argv[3]); sprintf(tmpF, "%s.tmp", imdName); sprintf(cvsF, "%s.countvectors", imdName); + sprintf(modelF, "%s.model", statName); + FILE *fi = fopen(modelF, "r"); + general_assert(fi != NULL, "Cannot open " + cstrtos(modelF) + "!"); + assert(fscanf(fi, "%d", &model_type) == 1); + fclose(fi); + + // Phase I switch(model_type) { - case 0 : sampling(); break; - case 1 : sampling(); break; - case 2 : sampling(); break; - case 3 : sampling(); break; + case 0 : sample_theta_vectors_from_count_vectors(); break; + case 1 : sample_theta_vectors_from_count_vectors(); break; + case 2 : sample_theta_vectors_from_count_vectors(); break; + case 3 : sample_theta_vectors_from_count_vectors(); break; } - generateResults(imdName); - - delete[] tau_denoms; + // Phase II + calculate_credibility_intervals(imdName); /* sprintf(command, "rm -f %s", tmpF); diff --git a/makefile b/makefile index 1b2b1b9..45ab35d 100644 --- a/makefile +++ b/makefile @@ -1,7 +1,7 @@ CC = g++ CFLAGS = -Wall -c -I. COFLAGS = -Wall -O3 -ffast-math -c -I. -PROGRAMS = rsem-extract-reference-transcripts rsem-synthesis-reference-transcripts rsem-preref rsem-parse-alignments rsem-build-read-index rsem-run-em rsem-tbam2gbam rsem-run-gibbs rsem-calculate-credibility-intervals rsem-simulate-reads rsem-bam2wig rsem-get-unique rsem-bam2readdepth +PROGRAMS = rsem-extract-reference-transcripts rsem-synthesis-reference-transcripts rsem-preref rsem-parse-alignments rsem-build-read-index rsem-run-em rsem-tbam2gbam rsem-run-gibbs rsem-calculate-credibility-intervals rsem-simulate-reads rsem-bam2wig rsem-get-unique rsem-bam2readdepth all : build-sam $(PROGRAMS) @@ -86,12 +86,12 @@ sampling.h : boost/random.hpp rsem-run-em : EM.o sam/libbam.a $(CC) -o rsem-run-em EM.o sam/libbam.a -lz -lpthread -EM.o : utils.h Read.h SingleRead.h SingleReadQ.h PairedEndRead.h PairedEndReadQ.h SingleHit.h PairedEndHit.h Model.h SingleModel.h SingleQModel.h PairedEndModel.h PairedEndQModel.h Refs.h GroupInfo.h HitContainer.h ReadIndex.h ReadReader.h Orientation.h LenDist.h RSPD.h QualDist.h QProfile.h NoiseQProfile.h ModelParams.h RefSeq.h RefSeqPolicy.h PolyARules.h Profile.h NoiseProfile.h Transcript.h Transcripts.h HitWrapper.h BamWriter.h sam/bam.h sam/sam.h simul.h sam_rsem_aux.h sampling.h boost/random.hpp EM.cpp +EM.o : utils.h my_assert.h Read.h SingleRead.h SingleReadQ.h PairedEndRead.h PairedEndReadQ.h SingleHit.h PairedEndHit.h Model.h SingleModel.h SingleQModel.h PairedEndModel.h PairedEndQModel.h Refs.h GroupInfo.h HitContainer.h ReadIndex.h ReadReader.h Orientation.h LenDist.h RSPD.h QualDist.h QProfile.h NoiseQProfile.h ModelParams.h RefSeq.h RefSeqPolicy.h PolyARules.h Profile.h NoiseProfile.h Transcript.h Transcripts.h HitWrapper.h BamWriter.h sam/bam.h sam/sam.h simul.h sam_rsem_aux.h sampling.h boost/random.hpp EM.cpp $(CC) $(COFLAGS) EM.cpp bc_aux.h : sam/bam.h -BamConverter.h : utils.h sam/sam.h sam/bam.h sam_rsem_aux.h sam_rsem_cvt.h bc_aux.h Transcript.h Transcripts.h +BamConverter.h : utils.h my_assert.h sam/sam.h sam/bam.h sam_rsem_aux.h sam_rsem_cvt.h bc_aux.h Transcript.h Transcripts.h rsem-tbam2gbam : utils.h Transcripts.h Transcript.h bc_aux.h BamConverter.h sam/sam.h sam/bam.h sam/libbam.a sam_rsem_aux.h sam_rsem_cvt.h tbam2gbam.cpp sam/libbam.a $(CC) -O3 -Wall tbam2gbam.cpp sam/libbam.a -lz -o $@ @@ -112,22 +112,24 @@ simulation.o : utils.h Read.h SingleRead.h SingleReadQ.h PairedEndRead.h PairedE $(CC) $(COFLAGS) simulation.cpp rsem-run-gibbs : Gibbs.o - $(CC) -o rsem-run-gibbs Gibbs.o + $(CC) -o rsem-run-gibbs Gibbs.o -lpthread #some header files are omitted -Gibbs.o : utils.h Model.h SingleModel.h SingleQModel.h PairedEndModel.h PairedEndQModel.h RefSeq.h RefSeqPolicy.h PolyARules.h Refs.h GroupInfo.h sampling.h boost/random.hpp Gibbs.cpp +Gibbs.o : utils.h my_assert.h boost/random.hpp sampling.h Model.h SingleModel.h SingleQModel.h PairedEndModel.h PairedEndQModel.h RefSeq.h RefSeqPolicy.h PolyARules.h Refs.h GroupInfo.h Gibbs.cpp $(CC) $(COFLAGS) Gibbs.cpp +Buffer.h : my_assert.h + rsem-calculate-credibility-intervals : calcCI.o - $(CC) -o rsem-calculate-credibility-intervals calcCI.o + $(CC) -o rsem-calculate-credibility-intervals calcCI.o -lpthread #some header files are omitted -calcCI.o : utils.h Model.h SingleModel.h SingleQModel.h PairedEndModel.h PairedEndQModel.h RefSeq.h RefSeqPolicy.h PolyARules.h Refs.h GroupInfo.h calcCI.cpp boost/random.hpp +calcCI.o : utils.h my_assert.h boost/random.hpp sampling.h Model.h SingleModel.h SingleQModel.h PairedEndModel.h PairedEndQModel.h RefSeq.h RefSeqPolicy.h PolyARules.h Refs.h GroupInfo.h Buffer.h calcCI.cpp $(CC) $(COFLAGS) calcCI.cpp rsem-get-unique : sam/bam.h sam/sam.h getUnique.cpp sam/libbam.a $(CC) -O3 -Wall getUnique.cpp sam/libbam.a -lz -o $@ - + clean: rm -f *.o *~ $(PROGRAMS) cd sam ; ${MAKE} clean diff --git a/my_assert.h b/my_assert.h new file mode 100644 index 0000000..74f0299 --- /dev/null +++ b/my_assert.h @@ -0,0 +1,105 @@ +#ifndef MY_ASSERT_H +#define MY_ASSERT_H + +#include +#include +#include +#include +#include +#include +#include + +std::string itos(int i) { + std::ostringstream strout; + strout<$sampleName.time"); - print OUTPUT "Alignment: $time_alignment s.\n"; - print OUTPUT "RSEM: $time_rsem s.\n"; - print OUTPUT "CI: $time_ci s.\n"; + print OUTPUT "Aligning reads: $time_alignment s.\n"; + print OUTPUT "Estimating expression levels: $time_rsem s.\n"; + print OUTPUT "Calculating credibility intervals: $time_ci s.\n"; my $time_del = $time_end - $time_start; - print OUTPUT "Delete: $time_del s.\n"; +# print OUTPUT "Delete: $time_del s.\n"; close(OUTPUT); } @@ -469,7 +470,7 @@ Generate a BAM file, 'sample_name.genome.bam', with alignments mapped to genomic =item B<--sampling-for-bam> When RSEM generates a BAM file, instead of outputing all alignments a read has with their posterior probabilities, one alignment is sampled and outputed according to the posterior probabilities. If the sampling result is that the read comes from the "noise" transcript, nothing is outputed. (Default: off) - + =item B<--calc-ci> Calculate 95% credibility intervals and posterior mean estimates. (Default: off) @@ -550,6 +551,10 @@ Amount of memory (in MB) RSEM is allowed to use for computing credibility interv Keep temporary files generated by RSEM. RSEM creates a temporary directory, 'sample_name.temp', into which it puts all intermediate output files. If this directory already exists, RSEM overwrites all files generated by previous RSEM runs inside of it. By default, after RSEM finishes, the temporary directory is deleted. Set this option to prevent the deletion of this directory and the intermediate files inside of it. (Default: off) +=item B<--time> + +Output time consumed by each step of RSEM to 'sample_name.time'. (Default: off) + =item B<-q/--quiet> Suppress the output of logging information. (Default: off) @@ -647,6 +652,18 @@ indicating the strand of the transcript it aligns to. 'sample_name.genome.sorted.bam' and 'sample_name.genome.sorted.bam.bai' are the sorted BAM file and indices generated by samtools (included in RSEM package). +=item B + +Only generated when the input files are raw reads instead of SAM/BAM format files + +It is the gzipped SAM output produced by bowtie aligner. + +=item B + +Only generated when --time is specified. + +It contains time (in seconds) consumed by aligning reads, estimating expression levels and calculating credibility intervals. + =item B This is a folder instead of a file. All model related statistics are stored in this folder. Use 'rsem-plot-model' can generate plots using this folder. diff --git a/sampling.h b/sampling.h index 4171403..8820090 100644 --- a/sampling.h +++ b/sampling.h @@ -5,17 +5,39 @@ #include #include #include +#include #include "boost/random.hpp" -boost::mt19937 rng(time(NULL)); -boost::uniform_01 rg(rng); +typedef unsigned int seedType; +typedef boost::mt19937 engine_type; +typedef boost::gamma_distribution<> gamma_dist; +typedef boost::uniform_01 uniform01; +typedef boost::variate_generator gamma_generator; + +class engineFactory { +public: + static engine_type *new_engine() { + seedType seed; + static engine_type seedEngine(time(NULL)); + static std::set seedSet; // empty set of seeds + std::set::iterator iter; + + do { + seed = seedEngine(); + iter = seedSet.find(seed); + } while (iter != seedSet.end()); + seedSet.insert(seed); + + return new engine_type(seed); + } +}; // arr should be cumulative! // interval : [,) // random number should be in [0, arr[len - 1]) // If by chance arr[len - 1] == 0.0, one possibility is to sample uniformly from 0...len-1 -int sample(std::vector& arr, int len) { +int sample(uniform01& rg, std::vector& arr, int len) { int l, r, mid; double prb = rg() * arr[len - 1]; diff --git a/utils.h b/utils.h index 4fa0750..278e95e 100644 --- a/utils.h +++ b/utils.h @@ -10,7 +10,6 @@ #include #include #include -#include const int STRLEN = 10005 ; const double EPSILON = 1e-300; @@ -157,31 +156,4 @@ void genReadFileNames(const char* readFN, int tagType, int read_type, int& s, ch } } -void exitWithError(const char* errmsg) { - fprintf(stderr, "%s\n", errmsg); - exit(-1); -} - -void pthread_exception(int rc) { - switch(rc) { - case EAGAIN: - fprintf(stderr, "Error code: EAGAIN. Insufficient resources to create another thread, or a system-imposed limit on the number of threads was encountered.\n"); - break; - case EINVAL: - fprintf(stderr, "Error code: EINVAL. Invalid settings in attr if pthread_create() is called. Or the implementation has detected that the value specified by thread_id does not refer to a joinable thread if pthread_join() is called.\n"); - break; - case EPERM: - fprintf(stderr, "Error code: EPERM. No permission to set the scheduling policy and parameters specified in attr.\n"); - break; - case EDEADLK: - fprintf(stderr, "Error code: EDEADLK. A deadlock was detected (e.g., two threads tried to join with each other); or thread_id specifies the calling thread."); - break; - case ESRCH: - fprintf(stderr, "Error code: ESRCH. No thread with thread_id could be found.\n"); - break; - default: fprintf(stderr, "Unknown error code: %d\n", rc); - } - exit(-1); -} - #endif -- 2.39.2