From 757263aa0ff5367f826468db30d983282e51e7f0 Mon Sep 17 00:00:00 2001 From: Bo Li Date: Wed, 8 Feb 2012 10:13:42 -0600 Subject: [PATCH] roll back to original version plus fixed a small bug in calcCI.cpp --- Buffer.h | 37 +++++++++++++------- Gibbs.cpp | 9 ----- calcCI.cpp | 71 +++++++++++++-------------------------- makefile | 2 ++ rsem-calculate-expression | 2 +- 5 files changed, 52 insertions(+), 69 deletions(-) diff --git a/Buffer.h b/Buffer.h index 1c5a2d6..1ce28bc 100644 --- a/Buffer.h +++ b/Buffer.h @@ -3,36 +3,48 @@ #include #include +#include + +#include "my_assert.h" typedef unsigned long long bufsize_type; const int FLOATSIZE = sizeof(float); class Buffer { public: - Buffer(bufsize_type size, int sp, int nSamples, int cvlen, const char* tmpF) { + Buffer(int nMB, int nSamples, int cvlen, const char* tmpF) { cpos = 0; - this->size = size; + size = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen; + if (size > (bufsize_type)nSamples) size = nSamples; + general_assert(size > 0, "Memory allocated for credibility intervals is not enough!"); + size *= cvlen; + buffer = new float[size]; ftmpOut.open(tmpF, std::ios::binary); + pthread_mutex_init(&lock, NULL); - fr = to = sp; + fr = to = 0; this->nSamples = nSamples; this->cvlen = cvlen; - } ~Buffer() { if (fr < to) flushToTempFile(); delete[] buffer; + pthread_mutex_destroy(&lock); ftmpOut.close(); } - void write(float *vec) { - if (size - cpos < bufsize_type(cvlen)) flushToTempFile(); - memcpy(buffer + cpos, vec, FLOATSIZE * cvlen); - cpos += cvlen; - ++to; + void write(int n, float **vecs) { + pthread_assert(pthread_mutex_lock(&lock), "pthread_mutex_lock", "Error occurred while acquiring the lock!"); + for (int i = 0; i < n; i++) { + if (size - cpos < bufsize_type(cvlen)) flushToTempFile(); + memcpy(buffer + cpos, vecs[i], FLOATSIZE * cvlen); + cpos += cvlen; + ++to; + } + pthread_assert(pthread_mutex_unlock(&lock), "pthread_mutex_unlock", "Error occurred while releasing the lock!"); } private: @@ -40,6 +52,7 @@ private: float *buffer; std::ofstream ftmpOut; + pthread_mutex_t lock; int fr, to; // each flush, sample fr .. to - 1 int nSamples, cvlen; @@ -49,15 +62,15 @@ private: std::streampos gap2 = std::streampos(nSamples - to) * FLOATSIZE; float *p = NULL; - ftmpOut.seekp(0, std::ios::beg); + ftmpOut.seekp(0, std::ios_base::beg); for (int i = 0; i < cvlen; i++) { p = buffer + i; - ftmpOut.seekp(gap1, std::ios::cur); + ftmpOut.seekp(gap1, std::ios_base::cur); for (int j = fr; j < to; j++) { ftmpOut.write((char*)p, FLOATSIZE); p += cvlen; } - ftmpOut.seekp(gap2, std::ios::cur); + ftmpOut.seekp(gap2, std::ios_base::cur); } cpos = 0; diff --git a/Gibbs.cpp b/Gibbs.cpp index f911da5..1a084d6 100644 --- a/Gibbs.cpp +++ b/Gibbs.cpp @@ -130,7 +130,6 @@ void load_data(char* reference_name, char* statName, char* imdName) { void init() { int quotient, left; char outF[STRLEN]; - char splitF[STRLEN]; quotient = NSAMPLES / nThreads; left = NSAMPLES % nThreads; @@ -157,14 +156,6 @@ void init() { memset(paramsArray[i].pme_theta, 0, sizeof(double) * (M + 1)); } - // output task splitting information - sprintf(splitF, "%s.split", imdName); - FILE *fo = fopen(splitF, "w"); - fprintf(fo, "%d", nThreads); - for (int i = 0; i < nThreads; i++) fprintf(fo, " %d", paramsArray[i].nsamples); - fprintf(fo, "\n"); - fclose(fo); - /* set thread attribute to be joinable */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); diff --git a/calcCI.cpp b/calcCI.cpp index 3ae7115..89414f9 100644 --- a/calcCI.cpp +++ b/calcCI.cpp @@ -27,8 +27,6 @@ using namespace std; struct Params { int no; FILE *fi; - bufsize_type size; - int sp; engine_type *engine; double *mw; }; @@ -117,14 +115,14 @@ void* sample_theta_from_c(void* arg) { Params *params = (Params*)arg; FILE *fi = params->fi; double *mw = params->mw; - Buffer buffer(params->size, params->sp, nSamples, cvlen, tmpF); cvec = new int[cvlen]; theta = new double[cvlen]; gammas = new gamma_dist*[cvlen]; rgs = new gamma_generator*[cvlen]; - float *vec = new float[cvlen]; + float **vecs = new float*[nSpC]; + for (int i = 0; i < nSpC; i++) vecs[i] = new float[cvlen]; int cnt = 0; while (fscanf(fi, "%d", &cvec[0]) == 1) { @@ -156,19 +154,20 @@ void* sample_theta_from_c(void* arg) { sum = 0.0; - vec[0] = theta[0]; + vecs[i][0] = theta[0]; for (int j = 1; j < cvlen; j++) if (eel[j] >= EPSILON) { - vec[j] = theta[j] / eel[j]; - sum += vec[j]; + vecs[i][j] = theta[j] / eel[j]; + sum += vecs[i][j]; } else assert(theta[j] < EPSILON); - assert(sum >= EPSILON); - for (int j = 1; j < cvlen; j++) vec[j] /= sum; - buffer.write(vec); + assert(sum >= EPSILON); + for (int j = 1; j < cvlen; j++) vecs[i][j] /= sum; } + buffer->write(nSpC, vecs); + for (int j = 0; j < cvlen; j++) { delete gammas[j]; delete rgs[j]; @@ -182,7 +181,8 @@ void* sample_theta_from_c(void* arg) { delete[] gammas; delete[] rgs; - delete[] vec; + for (int i = 0; i < nSpC; i++) delete[] vecs[i]; + delete[] vecs; return NULL; } @@ -193,20 +193,10 @@ void sample_theta_vectors_from_count_vectors() { model.read(modelF); calcExpectedEffectiveLengths(model); - char splitF[STRLEN]; - bufsize_type buf_maxcv = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen; - bufsize_type quotient, left; - int sum = 0; - sprintf(splitF, "%s.split", imdName); - FILE *fi = fopen(splitF, "r"); - int num_threads; - assert(fscanf(fi, "%d", &num_threads) == 1); - assert(num_threads <= nThreads); + int num_threads = min(nThreads, nCV); - quotient = buf_maxcv / num_threads; - assert(quotient > 0); - left = buf_maxcv % num_threads; + buffer = new Buffer(nMB, nSamples, cvlen, tmpF); paramsArray = new Params[num_threads]; threads = new pthread_t[num_threads]; @@ -216,25 +206,10 @@ void sample_theta_vectors_from_count_vectors() { paramsArray[i].no = i; sprintf(inpF, "%s%d", cvsF, i); paramsArray[i].fi = fopen(inpF, "r"); - - int num_samples; - assert(fscanf(fi, "%d", &num_samples) == 1); - num_samples *= nSpC; - if (bufsize_type(num_samples) <= quotient) paramsArray[i].size = num_samples; - else { - paramsArray[i].size = quotient; - if (left > 0) { ++paramsArray[i].size; --left; } - } - paramsArray[i].size *= cvlen * FLOATSIZE; - paramsArray[i].sp = sum; - sum += num_samples; - paramsArray[i].engine = engineFactory::new_engine(); paramsArray[i].mw = model.getMW(); } - fclose(fi); - /* set thread attribute to be joinable */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); @@ -258,6 +233,8 @@ void sample_theta_vectors_from_count_vectors() { } delete[] paramsArray; + delete buffer; // Must delete here, force the content left in the buffer be written into the disk + if (verbose) { printf("Sampling is finished!\n"); } } @@ -344,25 +321,25 @@ void* calcCI_batch(void* arg) { void calculate_credibility_intervals(char* imdName) { FILE *fo; char outF[STRLEN]; + int num_threads = nThreads; iso_tau = new CIType[M + 1]; gene_tau = new CIType[m]; - // nThreads must be intact here. assert(M > 0); - int quotient = M / nThreads; - if (quotient < 1) { nThreads = M; quotient = 1; } + int quotient = M / num_threads; + if (quotient < 1) { num_threads = M; quotient = 1; } int cur_gene_id = 0; int num_isoforms = 0; // A just so so strategy for paralleling - ciParamsArray = new CIParams[nThreads]; - for (int i = 0; i < nThreads; i++) { + ciParamsArray = new CIParams[num_threads]; + for (int i = 0; i < num_threads; i++) { ciParamsArray[i].no = i; ciParamsArray[i].start_gene_id = cur_gene_id; num_isoforms = 0; - while ((m - cur_gene_id > nThreads - i - 1) && (i == nThreads - 1 || num_isoforms < quotient)) { + while ((m - cur_gene_id > num_threads - i - 1) && (i == num_threads - 1 || num_isoforms < quotient)) { num_isoforms += gi.spAt(cur_gene_id + 1) - gi.spAt(cur_gene_id); ++cur_gene_id; } @@ -370,18 +347,18 @@ void calculate_credibility_intervals(char* imdName) { ciParamsArray[i].end_gene_id = cur_gene_id; } - threads = new pthread_t[nThreads]; + threads = new pthread_t[num_threads]; /* set thread attribute to be joinable */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); // paralleling - for (int i = 0; i < nThreads; i++) { + for (int i = 0; i < num_threads; i++) { rc = pthread_create(&threads[i], &attr, &calcCI_batch, (void*)(&ciParamsArray[i])); pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!"); } - for (int i = 0; i < nThreads; i++) { + for (int i = 0; i < num_threads; i++) { rc = pthread_join(threads[i], &status); pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!"); } diff --git a/makefile b/makefile index 211a7ea..45ab35d 100644 --- a/makefile +++ b/makefile @@ -118,6 +118,8 @@ rsem-run-gibbs : Gibbs.o Gibbs.o : utils.h my_assert.h boost/random.hpp sampling.h Model.h SingleModel.h SingleQModel.h PairedEndModel.h PairedEndQModel.h RefSeq.h RefSeqPolicy.h PolyARules.h Refs.h GroupInfo.h Gibbs.cpp $(CC) $(COFLAGS) Gibbs.cpp +Buffer.h : my_assert.h + rsem-calculate-credibility-intervals : calcCI.o $(CC) -o rsem-calculate-credibility-intervals calcCI.o -lpthread diff --git a/rsem-calculate-expression b/rsem-calculate-expression index 2902239..00f26ce 100755 --- a/rsem-calculate-expression +++ b/rsem-calculate-expression @@ -545,7 +545,7 @@ Number of bins in the RSPD. Only relevant when '--estimate-rspd' is specified. =item B<--ci-memory> -Amount of memory (in MB) RSEM is allowed to use for computing credibility intervals. (Default: 1024) +Maximum size (in memory, MB) of the auxiliary buffer used for computing credibility intervals (CI). Set it larger for a faster CI calculation. However, leaving 2 GB memory free for other usage is recommended. (Default: 1024) =item B<--keep-intermediate-files> -- 2.39.2