]> git.donarmstrong.com Git - rsem.git/commitdiff
roll back to original version plus fixed a small bug in calcCI.cpp
authorBo Li <bli@cs.wisc.edu>
Wed, 8 Feb 2012 16:13:42 +0000 (10:13 -0600)
committerBo Li <bli@cs.wisc.edu>
Wed, 8 Feb 2012 16:13:42 +0000 (10:13 -0600)
Buffer.h
Gibbs.cpp
calcCI.cpp
makefile
rsem-calculate-expression

index 1c5a2d6f5415a901a9ec7ee335207a2644d2d6d6..1ce28bca18d4e21193965886cd92e0d64e5a927e 100644 (file)
--- a/Buffer.h
+++ b/Buffer.h
@@ -3,36 +3,48 @@
 
 #include<cstdio>
 #include<fstream>
+#include<pthread.h>
+
+#include "my_assert.h"
 
 typedef unsigned long long bufsize_type;
 const int FLOATSIZE = sizeof(float);
 
 class Buffer {
 public:
-       Buffer(bufsize_type size, int sp, int nSamples, int cvlen, const char* tmpF) {
+       Buffer(int nMB, int nSamples, int cvlen, const char* tmpF) {
                cpos = 0;
-               this->size = size;
+               size = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen;
+               if (size > (bufsize_type)nSamples) size = nSamples;
+               general_assert(size > 0, "Memory allocated for credibility intervals is not enough!");
+               size *= cvlen;
+
                buffer = new float[size];
                ftmpOut.open(tmpF, std::ios::binary);
+               pthread_mutex_init(&lock, NULL);
 
-               fr = to = sp;
+               fr = to = 0;
                this->nSamples = nSamples;
                this->cvlen = cvlen;
-
        }
 
        ~Buffer() {
                if (fr < to) flushToTempFile();
 
                delete[] buffer;
+               pthread_mutex_destroy(&lock);
                ftmpOut.close();
        }
 
-       void write(float *vec) {
-               if (size - cpos < bufsize_type(cvlen)) flushToTempFile();
-               memcpy(buffer + cpos, vec, FLOATSIZE * cvlen);
-               cpos += cvlen;
-               ++to;
+       void write(int n, float **vecs) {
+               pthread_assert(pthread_mutex_lock(&lock), "pthread_mutex_lock", "Error occurred while acquiring the lock!");
+               for (int i = 0; i < n; i++) {
+                       if (size - cpos < bufsize_type(cvlen)) flushToTempFile();
+                       memcpy(buffer + cpos, vecs[i], FLOATSIZE * cvlen);
+                       cpos += cvlen;
+                       ++to;
+               }
+               pthread_assert(pthread_mutex_unlock(&lock), "pthread_mutex_unlock", "Error occurred while releasing the lock!");
        }
 
 private:
@@ -40,6 +52,7 @@ private:
 
        float *buffer;
        std::ofstream ftmpOut;
+       pthread_mutex_t lock;
 
        int fr, to; // each flush, sample fr .. to - 1
        int nSamples, cvlen;
@@ -49,15 +62,15 @@ private:
                std::streampos gap2 = std::streampos(nSamples - to) * FLOATSIZE;
                float *p = NULL;
 
-               ftmpOut.seekp(0, std::ios::beg);
+               ftmpOut.seekp(0, std::ios_base::beg);
                for (int i = 0; i < cvlen; i++) {
                        p = buffer + i;
-                       ftmpOut.seekp(gap1, std::ios::cur);
+                       ftmpOut.seekp(gap1, std::ios_base::cur);
                        for (int j = fr; j < to; j++) {
                                ftmpOut.write((char*)p, FLOATSIZE);
                                p += cvlen;
                        }
-                       ftmpOut.seekp(gap2, std::ios::cur);
+                       ftmpOut.seekp(gap2, std::ios_base::cur);
                }
 
                cpos = 0;
index f911da53c3e95370a224b95d64250abb3434bb7c..1a084d6b2828731daefe71d8b591d686fa13c19c 100644 (file)
--- a/Gibbs.cpp
+++ b/Gibbs.cpp
@@ -130,7 +130,6 @@ void load_data(char* reference_name, char* statName, char* imdName) {
 void init() {
        int quotient, left;
        char outF[STRLEN];
-       char splitF[STRLEN];
 
        quotient = NSAMPLES / nThreads;
        left = NSAMPLES % nThreads;
@@ -157,14 +156,6 @@ void init() {
                memset(paramsArray[i].pme_theta, 0, sizeof(double) * (M + 1));
        }
 
-       // output task splitting information
-       sprintf(splitF, "%s.split", imdName);
-       FILE *fo = fopen(splitF, "w");
-       fprintf(fo, "%d", nThreads);
-       for (int i = 0; i < nThreads; i++) fprintf(fo, " %d", paramsArray[i].nsamples);
-       fprintf(fo, "\n");
-       fclose(fo);
-
        /* set thread attribute to be joinable */
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
index 3ae71157c58a023bb4cd7fe316e26c7b9e572663..89414f9b02f00ba641b8f987845aea8e977ef1b1 100644 (file)
@@ -27,8 +27,6 @@ using namespace std;
 struct Params {
        int no;
        FILE *fi;
-       bufsize_type size;
-       int sp;
        engine_type *engine;
        double *mw;
 };
@@ -117,14 +115,14 @@ void* sample_theta_from_c(void* arg) {
        Params *params = (Params*)arg;
        FILE *fi = params->fi;
        double *mw = params->mw;
-       Buffer buffer(params->size, params->sp, nSamples, cvlen, tmpF);
 
        cvec = new int[cvlen];
        theta = new double[cvlen];
        gammas = new gamma_dist*[cvlen];
        rgs = new gamma_generator*[cvlen];
 
-       float *vec = new float[cvlen];
+       float **vecs = new float*[nSpC];
+       for (int i = 0; i < nSpC; i++) vecs[i] = new float[cvlen];
 
        int cnt = 0;
        while (fscanf(fi, "%d", &cvec[0]) == 1) {
@@ -156,19 +154,20 @@ void* sample_theta_from_c(void* arg) {
 
 
                        sum = 0.0;
-                       vec[0] = theta[0];
+                       vecs[i][0] = theta[0];
                        for (int j = 1; j < cvlen; j++)
                                if (eel[j] >= EPSILON) {
-                                       vec[j] = theta[j] / eel[j];
-                                       sum += vec[j];
+                                       vecs[i][j] = theta[j] / eel[j];
+                                       sum += vecs[i][j];
                                }
                                else assert(theta[j] < EPSILON);
-                       assert(sum >= EPSILON);
-                       for (int j = 1; j < cvlen; j++) vec[j] /= sum;
 
-                       buffer.write(vec);
+                       assert(sum >= EPSILON);
+                       for (int j = 1; j < cvlen; j++) vecs[i][j] /= sum;
                }
 
+               buffer->write(nSpC, vecs);
+
                for (int j = 0; j < cvlen; j++) {
                        delete gammas[j];
                        delete rgs[j];
@@ -182,7 +181,8 @@ void* sample_theta_from_c(void* arg) {
        delete[] gammas;
        delete[] rgs;
 
-       delete[] vec;
+       for (int i = 0; i < nSpC; i++) delete[] vecs[i];
+       delete[] vecs;
 
        return NULL;
 }
@@ -193,20 +193,10 @@ void sample_theta_vectors_from_count_vectors() {
        model.read(modelF);
        calcExpectedEffectiveLengths<ModelType>(model);
 
-       char splitF[STRLEN];
-       bufsize_type buf_maxcv = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen;
-       bufsize_type quotient, left;
-       int sum = 0;
 
-       sprintf(splitF, "%s.split", imdName);
-       FILE *fi = fopen(splitF, "r");
-       int num_threads;
-       assert(fscanf(fi, "%d", &num_threads) == 1);
-       assert(num_threads <= nThreads);
+       int num_threads = min(nThreads, nCV);
 
-       quotient = buf_maxcv / num_threads;
-       assert(quotient > 0);
-       left = buf_maxcv % num_threads;
+       buffer = new Buffer(nMB, nSamples, cvlen, tmpF);
 
        paramsArray = new Params[num_threads];
        threads = new pthread_t[num_threads];
@@ -216,25 +206,10 @@ void sample_theta_vectors_from_count_vectors() {
                paramsArray[i].no = i;
                sprintf(inpF, "%s%d", cvsF, i);
                paramsArray[i].fi = fopen(inpF, "r");
-
-               int num_samples;
-               assert(fscanf(fi, "%d", &num_samples) == 1);
-               num_samples *= nSpC;
-               if (bufsize_type(num_samples) <= quotient) paramsArray[i].size = num_samples;
-               else {
-                       paramsArray[i].size = quotient;
-                       if (left > 0) { ++paramsArray[i].size; --left; }
-               }
-               paramsArray[i].size *=  cvlen * FLOATSIZE;
-               paramsArray[i].sp = sum;
-               sum += num_samples;
-
                paramsArray[i].engine = engineFactory::new_engine();
                paramsArray[i].mw = model.getMW();
        }
 
-       fclose(fi);
-
        /* set thread attribute to be joinable */
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
@@ -258,6 +233,8 @@ void sample_theta_vectors_from_count_vectors() {
        }
        delete[] paramsArray;
 
+       delete buffer; // Must delete here, force the content left in the buffer be written into the disk
+
        if (verbose) { printf("Sampling is finished!\n"); }
 }
 
@@ -344,25 +321,25 @@ void* calcCI_batch(void* arg) {
 void calculate_credibility_intervals(char* imdName) {
        FILE *fo;
        char outF[STRLEN];
+       int num_threads = nThreads;
 
        iso_tau = new CIType[M + 1];
        gene_tau = new CIType[m];
 
-       // nThreads must be intact here.
        assert(M > 0);
-       int quotient = M / nThreads;
-       if (quotient < 1) { nThreads = M; quotient = 1; }
+       int quotient = M / num_threads;
+       if (quotient < 1) { num_threads = M; quotient = 1; }
        int cur_gene_id = 0;
        int num_isoforms = 0;
 
        // A just so so strategy for paralleling
-       ciParamsArray = new CIParams[nThreads];
-       for (int i = 0; i < nThreads; i++) {
+       ciParamsArray = new CIParams[num_threads];
+       for (int i = 0; i < num_threads; i++) {
                ciParamsArray[i].no = i;
                ciParamsArray[i].start_gene_id = cur_gene_id;
                num_isoforms = 0;
 
-               while ((m - cur_gene_id > nThreads - i - 1) && (i == nThreads - 1 || num_isoforms < quotient)) {
+               while ((m - cur_gene_id > num_threads - i - 1) && (i == num_threads - 1 || num_isoforms < quotient)) {
                        num_isoforms += gi.spAt(cur_gene_id + 1) - gi.spAt(cur_gene_id);
                        ++cur_gene_id;
                }
@@ -370,18 +347,18 @@ void calculate_credibility_intervals(char* imdName) {
                ciParamsArray[i].end_gene_id = cur_gene_id;
        }
 
-       threads = new pthread_t[nThreads];
+       threads = new pthread_t[num_threads];
 
        /* set thread attribute to be joinable */
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
 
        // paralleling
-       for (int i = 0; i < nThreads; i++) {
+       for (int i = 0; i < num_threads; i++) {
                rc = pthread_create(&threads[i], &attr, &calcCI_batch, (void*)(&ciParamsArray[i]));
                pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!");
        }
-       for (int i = 0; i < nThreads; i++) {
+       for (int i = 0; i < num_threads; i++) {
                rc = pthread_join(threads[i], &status);
                pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!");
        }
index 211a7ea5bbc86ee39362e3465c54fa6dd05f7304..45ab35d11cee55d44654949830bf4c0b43869d66 100644 (file)
--- a/makefile
+++ b/makefile
@@ -118,6 +118,8 @@ rsem-run-gibbs : Gibbs.o
 Gibbs.o : utils.h my_assert.h boost/random.hpp sampling.h Model.h SingleModel.h SingleQModel.h PairedEndModel.h PairedEndQModel.h RefSeq.h RefSeqPolicy.h PolyARules.h Refs.h GroupInfo.h Gibbs.cpp 
        $(CC) $(COFLAGS) Gibbs.cpp
 
+Buffer.h : my_assert.h
+
 rsem-calculate-credibility-intervals : calcCI.o
        $(CC) -o rsem-calculate-credibility-intervals calcCI.o -lpthread
 
index 2902239eff0513a186c2fc562d8541c99b578a62..00f26cee381fdf03f6028748bda1caa1aa9190ae 100755 (executable)
@@ -545,7 +545,7 @@ Number of bins in the RSPD. Only relevant when '--estimate-rspd' is specified.
 
 =item B<--ci-memory> <int>
 
-Amount of memory (in MB) RSEM is allowed to use for computing credibility intervals. (Default: 1024)
+Maximum size (in memory, MB) of the auxiliary buffer used for computing credibility intervals (CI). Set it larger for a faster CI calculation. However, leaving 2 GB memory free for other usage is recommended. (Default: 1024)
 
 =item B<--keep-intermediate-files>