]> git.donarmstrong.com Git - rsem.git/blobdiff - Gibbs.cpp
rewrote parallelization of calcCI.cpp
[rsem.git] / Gibbs.cpp
index 4e7377b679d8544ca3205fbadd7549bee196eac5..f911da53c3e95370a224b95d64250abb3434bb7c 100644 (file)
--- a/Gibbs.cpp
+++ b/Gibbs.cpp
@@ -1,4 +1,3 @@
-#include<ctime>
 #include<cstdio>
 #include<cstring>
 #include<cstdlib>
@@ -6,12 +5,11 @@
 #include<fstream>
 #include<sstream>
 #include<vector>
-#include<set>
 #include<pthread.h>
 
-#include "boost/random.hpp"
-
 #include "utils.h"
+#include "my_assert.h"
+#include "sampling.h"
 
 #include "Model.h"
 #include "SingleModel.h"
 
 using namespace std;
 
-typedef unsigned int seedType;
-typedef boost::mt19937 engine_type;
-typedef boost::gamma_distribution<> distribution_type;
-typedef boost::variate_generator<engine_type&, distribution_type> generator_type;
-typedef boost::uniform_01<engine_type> uniform_generator;
-
 struct Params {
        int no, nsamples;
        FILE *fo;
-       boost::mt19937 *rng;
-       double *pme_c, *pme_theta;
+       engine_type *engine;
+       double *pme_c, *pve_c; //posterior mean and variance vectors on counts
+       double *pme_theta;
 };
 
+
 struct Item {
        int sid;
        double conprb;
@@ -60,16 +54,18 @@ char cvsF[STRLEN];
 Refs refs;
 GroupInfo gi;
 
-vector<double> theta, pme_theta, pme_c, eel;
-
 vector<int> s;
 vector<Item> hits;
-vector<int> counts;
 
+vector<double> theta;
+
+vector<double> pme_c, pve_c; //global posterior mean and variance vectors on counts
+vector<double> pme_theta, eel;
+
+bool var_opt;
 bool quiet;
 
-engine_type engine(time(NULL));
-Params *params;
+Params *paramsArray;
 pthread_t *threads;
 pthread_attr_t attr;
 void *status;
@@ -93,31 +89,19 @@ void load_data(char* reference_name, char* statName, char* imdName) {
        //load thetaF
        sprintf(thetaF, "%s.theta",statName);
        fin.open(thetaF);
-       if (!fin.is_open()) {
-               fprintf(stderr, "Cannot open %s!\n", thetaF);
-               exit(-1);
-       }
+       general_assert(fin.is_open(), "Cannot open " + cstrtos(thetaF) + "!");
        fin>>tmpVal;
-       if (tmpVal != M + 1) {
-               fprintf(stderr, "Number of transcripts is not consistent in %s and %s!\n", refF, thetaF);
-               exit(-1);
-       }
-       theta.clear(); theta.resize(M + 1);
+       general_assert(tmpVal == M + 1, "Number of transcripts is not consistent in " + cstrtos(refF) + " and " + cstrtos(thetaF) + "!");
+       theta.assign(M + 1, 0);
        for (int i = 0; i <= M; i++) fin>>theta[i];
        fin.close();
 
        //load ofgF;
        sprintf(ofgF, "%s.ofg", imdName);
        fin.open(ofgF);
-       if (!fin.is_open()) {
-               fprintf(stderr, "Cannot open %s!\n", ofgF);
-               exit(-1);
-       }
+       general_assert(fin.is_open(), "Cannot open " + cstrtos(ofgF) + "!");
        fin>>tmpVal>>N0;
-       if (tmpVal != M) {
-               fprintf(stderr, "M in %s is not consistent with %s!\n", ofgF, refF);
-               exit(-1);
-       }
+       general_assert(tmpVal == M, "M in " + cstrtos(ofgF) + " is not consistent with " + cstrtos(refF) + "!");
        getline(fin, line);
 
        s.clear(); hits.clear();
@@ -145,59 +129,56 @@ void load_data(char* reference_name, char* statName, char* imdName) {
 // assign threads
 void init() {
        int quotient, left;
-       seedType seed;
-       set<seedType> seedSet;
-       set<seedType>::iterator iter;
        char outF[STRLEN];
+       char splitF[STRLEN];
 
        quotient = NSAMPLES / nThreads;
        left = NSAMPLES % nThreads;
 
        sprintf(cvsF, "%s.countvectors", imdName);
-       seedSet.clear();
-       params = new Params[nThreads];
+       paramsArray = new Params[nThreads];
        threads = new pthread_t[nThreads];
+
        for (int i = 0; i < nThreads; i++) {
-               params[i].no = i;
+               paramsArray[i].no = i;
 
-               params[i].nsamples = quotient;
-               if (i < left) params[i].nsamples++;
+               paramsArray[i].nsamples = quotient;
+               if (i < left) paramsArray[i].nsamples++;
 
-               if (i == 0) {
-                       params[i].fo = fopen(cvsF, "w");
-                       fprintf(params[i].fo, "%d %d\n", NSAMPLES, M + 1);
-               }
-               else {
-                       sprintf(outF, "%s%d", cvsF, i);
-                       params[i].fo = fopen(outF, "w");
-               }
+               sprintf(outF, "%s%d", cvsF, i);
+               paramsArray[i].fo = fopen(outF, "w");
 
-               do {
-                       seed = engine();
-                       iter = seedSet.find(seed);
-               } while (iter != seedSet.end());
-               params[i].rng = new engine_type(seed);
-               seedSet.insert(seed);
-
-               params[i].pme_c = new double[M + 1];
-               memset(params[i].pme_c, 0, sizeof(double) * (M + 1));
-               params[i].pme_theta = new double[M + 1];
-               memset(params[i].pme_theta, 0, sizeof(double) * (M + 1));
+               paramsArray[i].engine = engineFactory::new_engine();
+               paramsArray[i].pme_c = new double[M + 1];
+               memset(paramsArray[i].pme_c, 0, sizeof(double) * (M + 1));
+               paramsArray[i].pve_c = new double[M + 1];
+               memset(paramsArray[i].pve_c, 0, sizeof(double) * (M + 1));
+               paramsArray[i].pme_theta = new double[M + 1];
+               memset(paramsArray[i].pme_theta, 0, sizeof(double) * (M + 1));
        }
 
+       // output task splitting information
+       sprintf(splitF, "%s.split", imdName);
+       FILE *fo = fopen(splitF, "w");
+       fprintf(fo, "%d", nThreads);
+       for (int i = 0; i < nThreads; i++) fprintf(fo, " %d", paramsArray[i].nsamples);
+       fprintf(fo, "\n");
+       fclose(fo);
+
        /* set thread attribute to be joinable */
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
 
+       if (verbose) { printf("Initialization finished!"); }
 }
 
+//sample theta from Dir(1)
 void sampleTheta(engine_type& engine, vector<double>& theta) {
-       distribution_type gm(1);
-       generator_type gmg(engine, gm);
+       gamma_dist gm(1);
+       gamma_generator gmg(engine, gm);
        double denom;
 
-       theta.clear();
-       theta.resize(M + 1);
+       theta.assign(M + 1, 0);
        denom = 0.0;
        for (int i = 0; i <= M; i++) {
                theta[i] = gmg();
@@ -207,32 +188,6 @@ void sampleTheta(engine_type& engine, vector<double>& theta) {
        for (int i = 0; i <= M; i++) theta[i] /= denom;
 }
 
-// arr should be cumulative!
-// interval : [,)
-// random number should be in [0, arr[len - 1])
-// If by chance arr[len - 1] == 0.0, one possibility is to sample uniformly from 0...len-1
-int sample(uniform_generator& rg, vector<double>& arr, int len) {
-  int l, r, mid;
-  double prb = rg() * arr[len - 1];
-
-  l = 0; r = len - 1;
-  while (l <= r) {
-    mid = (l + r) / 2;
-    if (arr[mid] <= prb) l = mid + 1;
-    else r = mid - 1;
-  }
-
-  if (l >= len) { printf("%d %lf %lf\n", len, arr[len - 1], prb); }
-  assert(l < len);
-
-  return l;
-}
-
-void writeForSee(FILE* fo, vector<int>& counts) {
-  for (int i = 1; i < M; i++) fprintf(fo, "%d ", counts[i]);
-  fprintf(fo, "%d\n", counts[M]);
-}
-
 void writeCountVector(FILE* fo, vector<int>& counts) {
        for (int i = 0; i < M; i++) {
                fprintf(fo, "%d ", counts[i]);
@@ -249,25 +204,20 @@ void* Gibbs(void* arg) {
        vector<int> z, counts;
        vector<double> arr;
 
-       uniform_generator rg(*params->rng);
-
-       sampleTheta(*params->rng, theta);
-
+       uniform01 rg(*params->engine);
 
        // generate initial state
-       arr.clear();
+       sampleTheta(*params->engine, theta);
 
-       z.clear();
-       z.resize(N1);
+       z.assign(N1, 0);
 
-       counts.clear();
-       counts.resize(M + 1, 1); // 1 pseudo count
+       counts.assign(M + 1, 1); // 1 pseudo count
        counts[0] += N0;
 
        for (int i = 0; i < N1; i++) {
                fr = s[i]; to = s[i + 1];
                len = to - fr;
-               arr.resize(len);
+               arr.assign(len, 0);
                for (int j = fr; j < to; j++) {
                        arr[j - fr] = theta[hits[j].sid] * hits[j].conprb;
                        if (j > fr) arr[j - fr] += arr[j - fr - 1];  // cumulative
@@ -277,17 +227,13 @@ void* Gibbs(void* arg) {
        }
 
        // Gibbs sampling
-    char seeF[STRLEN];
-       sprintf(seeF, "%s.see", statName);
-       FILE *fsee = fopen(seeF, "w");
-
        CHAINLEN = 1 + (params->nsamples - 1) * GAP;
-       for (int ROUND = 1; ROUND <= BURNIN + CHAINLEN  ; ROUND++) {
+       for (int ROUND = 1; ROUND <= BURNIN + CHAINLEN; ROUND++) {
 
                for (int i = 0; i < N1; i++) {
                        --counts[z[i]];
                        fr = s[i]; to = s[i + 1]; len = to - fr;
-                       arr.resize(len);
+                       arr.assign(len, 0);
                        for (int j = fr; j < to; j++) {
                                arr[j - fr] = counts[hits[j].sid] * hits[j].conprb;
                                if (j > fr) arr[j - fr] += arr[j - fr - 1]; //cumulative
@@ -296,50 +242,56 @@ void* Gibbs(void* arg) {
                        ++counts[z[i]];
                }
 
-               writeForSee(fsee, counts);
-
                if (ROUND > BURNIN) {
-                       if ((ROUND - BURNIN - 1) % GAP == 0) writeCountVector(params->fo, counts);
-                       for (int i = 0; i <= M; i++) {
-                               params->pme_c[i] += counts[i] - 1;
-                               params->pme_theta[i] += counts[i] / totc;
+                       if ((ROUND - BURNIN - 1) % GAP == 0) {
+                               writeCountVector(params->fo, counts);
+                               for (int i = 0; i <= M; i++) {
+                                       params->pme_c[i] += counts[i] - 1;
+                                       params->pve_c[i] += (counts[i] - 1) * (counts[i] - 1);
+                                       params->pme_theta[i] += counts[i] / totc;
+                               }
                        }
                }
 
-               if (verbose & ROUND % 10 == 0) { printf("Thread %d, ROUND %d is finished!\n", params->no, ROUND); }
+               if (verbose && ROUND % 100 == 0) { printf("Thread %d, ROUND %d is finished!\n", params->no, ROUND); }
        }
-       fclose(fsee);
 
        return NULL;
 }
 
 void release() {
-       char inpF[STRLEN], command[STRLEN];
+//     char inpF[STRLEN], command[STRLEN];
        string line;
 
        /* destroy attribute */
        pthread_attr_destroy(&attr);
        delete[] threads;
 
-       pme_c.clear(); pme_c.resize(M + 1, 0.0);
-       pme_theta.clear(); pme_theta.resize(M + 1, 0.0);
+       pme_c.assign(M + 1, 0);
+       pve_c.assign(M + 1, 0);
+       pme_theta.assign(M + 1, 0);
        for (int i = 0; i < nThreads; i++) {
-               fclose(params[i].fo);
-               delete params[i].rng;
+               fclose(paramsArray[i].fo);
+               delete paramsArray[i].engine;
                for (int j = 0; j <= M; j++) {
-                       pme_c[j] += params[i].pme_c[j];
-                       pme_theta[j] += params[i].pme_theta[j];
+                       pme_c[j] += paramsArray[i].pme_c[j];
+                       pve_c[j] += paramsArray[i].pve_c[j];
+                       pme_theta[j] += paramsArray[i].pme_theta[j];
                }
-               delete[] params[i].pme_c;
-               delete[] params[i].pme_theta;
+               delete[] paramsArray[i].pme_c;
+               delete[] paramsArray[i].pve_c;
+               delete[] paramsArray[i].pme_theta;
        }
-       delete[] params;
+       delete[] paramsArray;
+
 
        for (int i = 0; i <= M; i++) {
                pme_c[i] /= NSAMPLES;
+               pve_c[i] = (pve_c[i] - NSAMPLES * pme_c[i] * pme_c[i]) / (NSAMPLES - 1);
                pme_theta[i] /= NSAMPLES;
        }
 
+       /*
        // combine files
        FILE *fo = fopen(cvsF, "a");
        for (int i = 1; i < nThreads; i++) {
@@ -351,41 +303,41 @@ void release() {
                fin.close();
                sprintf(command, "rm -f %s", inpF);
                int status = system(command);
-               if (status != 0) { fprintf(stderr, "Fail to delete file %s!\n", inpF); exit(-1); }
+               general_assert(status == 0, "Fail to delete file " + cstrtos(inpF) + "!");
        }
        fclose(fo);
+       */
 }
 
 template<class ModelType>
 void calcExpectedEffectiveLengths(ModelType& model) {
-  int lb, ub, span;
-  double *pdf = NULL, *cdf = NULL, *clen = NULL; // clen[i] = sigma_{j=1}^{i}pdf[i]*(lb+i)
+       int lb, ub, span;
+       double *pdf = NULL, *cdf = NULL, *clen = NULL; // clen[i] = \sigma_{j=1}^{i}pdf[i]*(lb+i)
   
-  model.getGLD().copyTo(pdf, cdf, lb, ub, span);
-  clen = new double[span + 1];
-  clen[0] = 0.0;
-  for (int i = 1; i <= span; i++) {
-    clen[i] = clen[i - 1] + pdf[i] * (lb + i);
-  }
-
-  eel.clear();
-  eel.resize(M + 1, 0.0);
-  for (int i = 1; i <= M; i++) {
-    int totLen = refs.getRef(i).getTotLen();
-    int fullLen = refs.getRef(i).getFullLen();
-    int pos1 = max(min(totLen - fullLen + 1, ub) - lb, 0);
-    int pos2 = max(min(totLen, ub) - lb, 0);
-
-    if (pos2 == 0) { eel[i] = 0.0; continue; }
+       model.getGLD().copyTo(pdf, cdf, lb, ub, span);
+       clen = new double[span + 1];
+       clen[0] = 0.0;
+       for (int i = 1; i <= span; i++) {
+               clen[i] = clen[i - 1] + pdf[i] * (lb + i);
+       }
+
+       eel.assign(M + 1, 0.0);
+       for (int i = 1; i <= M; i++) {
+               int totLen = refs.getRef(i).getTotLen();
+               int fullLen = refs.getRef(i).getFullLen();
+               int pos1 = max(min(totLen - fullLen + 1, ub) - lb, 0);
+               int pos2 = max(min(totLen, ub) - lb, 0);
+
+               if (pos2 == 0) { eel[i] = 0.0; continue; }
     
-    eel[i] = fullLen * cdf[pos1] + ((cdf[pos2] - cdf[pos1]) * (totLen + 1) - (clen[pos2] - clen[pos1]));
-    assert(eel[i] >= 0);
-    if (eel[i] < MINEEL) { eel[i] = 0.0; }
-  }
+               eel[i] = fullLen * cdf[pos1] + ((cdf[pos2] - cdf[pos1]) * (totLen + 1) - (clen[pos2] - clen[pos1]));
+               assert(eel[i] >= 0);
+               if (eel[i] < MINEEL) { eel[i] = 0.0; }
+       }
   
-  delete[] pdf;
-  delete[] cdf;
-  delete[] clen;
+       delete[] pdf;
+       delete[] cdf;
+       delete[] clen;
 }
 
 template<class ModelType>
@@ -403,7 +355,9 @@ void writeEstimatedParameters(char* modelF, char* imdName) {
        for (int i = 1; i <= M; i++)
          if (eel[i] < EPSILON) pme_theta[i] = 0.0;
          else denom += pme_theta[i];
-       if (denom <= 0) { fprintf(stderr, "No Expected Effective Length is no less than %.6g?!\n", MINEEL); exit(-1); }
+
+       general_assert(denom >= EPSILON, "No Expected Effective Length is no less than " + ftos(MINEEL, 6) + "?!");
+
        for (int i = 0; i <= M; i++) pme_theta[i] /= denom;
 
        denom = 0.0;
@@ -425,8 +379,9 @@ void writeEstimatedParameters(char* modelF, char* imdName) {
            tau[i] = pme_theta[i] / eel[i];
            denom += tau[i];
          }
-       if (denom <= 0) { fprintf(stderr, "No alignable reads?!\n"); exit(-1); }
-       //assert(denom > 0);
+
+       general_assert(denom >= EPSILON, "No alignable reads?!");
+
        for (int i = 1; i <= M; i++) {
                tau[i] /= denom;
        }
@@ -434,17 +389,20 @@ void writeEstimatedParameters(char* modelF, char* imdName) {
        //isoform level results
        sprintf(outF, "%s.iso_res", imdName);
        fo = fopen(outF, "a");
-       if (fo == NULL) { fprintf(stderr, "Cannot open %s!\n", outF); exit(-1); }
+       general_assert(fo != NULL, "Cannot open " + cstrtos(outF) + "!");
+
        for (int i = 1; i <= M; i++)
                fprintf(fo, "%.2f%c", pme_c[i], (i < M ? '\t' : '\n'));
        for (int i = 1; i <= M; i++)
                fprintf(fo, "%.15g%c", tau[i], (i < M ? '\t' : '\n'));
+
        fclose(fo);
 
        //gene level results
        sprintf(outF, "%s.gene_res", imdName);
        fo = fopen(outF, "a");
-       if (fo == NULL) { fprintf(stderr, "Cannot open %s!\n", outF); exit(-1); }
+       general_assert(fo != NULL, "Cannot open " + cstrtos(outF) + "!");
+
        for (int i = 0; i < m; i++) {
                double sumC = 0.0; //  sum of pme counts
                int b = gi.spAt(i), e = gi.spAt(i + 1);
@@ -468,10 +426,9 @@ void writeEstimatedParameters(char* modelF, char* imdName) {
        if (verbose) { printf("Gibbs based expression values are written!\n"); }
 }
 
-
 int main(int argc, char* argv[]) {
        if (argc < 7) {
-               printf("Usage: rsem-run-gibbs reference_name sample_name sampleToken BURNIN NSAMPLES GAP [-p #Threads] [-q]\n");
+               printf("Usage: rsem-run-gibbs-multi reference_name sample_name sampleToken BURNIN NSAMPLES GAP [-p #Threads] [--var] [-q]\n");
                exit(-1);
        }
 
@@ -483,35 +440,42 @@ int main(int argc, char* argv[]) {
        load_data(argv[1], statName, imdName);
 
        nThreads = 1;
+       var_opt = false;
        quiet = false;
+
        for (int i = 7; i < argc; i++) {
                if (!strcmp(argv[i], "-p")) nThreads = atoi(argv[i + 1]);
+               if (!strcmp(argv[i], "--var")) var_opt = true;
                if (!strcmp(argv[i], "-q")) quiet = true;
        }
        verbose = !quiet;
 
+       assert(NSAMPLES > 1); // Otherwise, we cannot calculate posterior variance
+
        if (nThreads > NSAMPLES) {
                nThreads = NSAMPLES;
                printf("Warning: Number of samples is less than number of threads! Change the number of threads to %d!\n", nThreads);
        }
 
        if (verbose) printf("Gibbs started!\n");
+
        init();
        for (int i = 0; i < nThreads; i++) {
-               rc = pthread_create(&threads[i], &attr, Gibbs, (void*)(&params[i]));
-               if (rc != 0) { fprintf(stderr, "Cannot create thread %d! (numbered from 0)\n", i); }
+               rc = pthread_create(&threads[i], &attr, Gibbs, (void*)(&paramsArray[i]));
+               pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0)!");
        }
        for (int i = 0; i < nThreads; i++) {
                rc = pthread_join(threads[i], &status);
-               if (rc != 0) { fprintf(stderr, "Cannot join thread %d! (numbered from 0)\n", i); }
+               pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0)!");
        }
        release();
+
        if (verbose) printf("Gibbs finished!\n");
 
        sprintf(modelF, "%s.model", statName);
        FILE *fi = fopen(modelF, "r");
-       if (fi == NULL) { fprintf(stderr, "Cannot open %s!\n", modelF); exit(-1); }
-       fscanf(fi, "%d", &model_type);
+       general_assert(fi != NULL, "Cannot open " + cstrtos(modelF) + "!");
+       assert(fscanf(fi, "%d", &model_type) == 1);
        fclose(fi);
 
        switch(model_type) {
@@ -521,5 +485,20 @@ int main(int argc, char* argv[]) {
        case 3 : writeEstimatedParameters<PairedEndQModel>(modelF, imdName); break;
        }
 
+       if (var_opt) {
+               char varF[STRLEN];
+
+               sprintf(varF, "%s.var", statName);
+               FILE *fo = fopen(varF, "w");
+               general_assert(fo != NULL, "Cannot open " + cstrtos(varF) + "!");
+               for (int i = 0; i < m; i++) {
+                       int b = gi.spAt(i), e = gi.spAt(i + 1), number_of_isoforms = e - b;
+                       for (int j = b; j < e; j++) {
+                               fprintf(fo, "%s\t%d\t%.15g\t%.15g\n", refs.getRef(j).getName().c_str(), number_of_isoforms, pme_c[j], pve_c[j]);
+                       }
+               }
+               fclose(fo);
+       }
+
        return 0;
 }