]> git.donarmstrong.com Git - rsem.git/commitdiff
Version with Gibbs parallelized
authorBo Li <bli@cs.wisc.edu>
Mon, 25 Apr 2011 18:10:13 +0000 (13:10 -0500)
committerBo Li <bli@cs.wisc.edu>
Mon, 25 Apr 2011 18:10:13 +0000 (13:10 -0500)
Gibbs.cpp
calcCI.cpp
makefile
rsem-calculate-expression

index 5bdfb247aa2db979cc8c3b345eb50b977462a43e..4e7377b679d8544ca3205fbadd7549bee196eac5 100644 (file)
--- a/Gibbs.cpp
+++ b/Gibbs.cpp
@@ -6,6 +6,8 @@
 #include<fstream>
 #include<sstream>
 #include<vector>
+#include<set>
+#include<pthread.h>
 
 #include "boost/random.hpp"
 
 
 using namespace std;
 
+typedef unsigned int seedType;
+typedef boost::mt19937 engine_type;
+typedef boost::gamma_distribution<> distribution_type;
+typedef boost::variate_generator<engine_type&, distribution_type> generator_type;
+typedef boost::uniform_01<engine_type> uniform_generator;
+
+struct Params {
+       int no, nsamples;
+       FILE *fo;
+       boost::mt19937 *rng;
+       double *pme_c, *pme_theta;
+};
+
 struct Item {
        int sid;
        double conprb;
@@ -32,10 +47,12 @@ struct Item {
        }
 };
 
+int nThreads;
+
 int model_type;
 int m, M, N0, N1, nHits;
 double totc;
-int BURNIN, CHAINLEN, GAP;
+int BURNIN, NSAMPLES, GAP;
 char imdName[STRLEN], statName[STRLEN];
 char thetaF[STRLEN], ofgF[STRLEN], groupF[STRLEN], refF[STRLEN], modelF[STRLEN];
 char cvsF[STRLEN];
@@ -45,16 +62,18 @@ GroupInfo gi;
 
 vector<double> theta, pme_theta, pme_c, eel;
 
-vector<int> s, z;
+vector<int> s;
 vector<Item> hits;
 vector<int> counts;
 
 bool quiet;
 
-vector<double> arr;
-
-boost::mt19937 rng(time(NULL));
-boost::uniform_01<boost::mt19937> rg(rng);
+engine_type engine(time(NULL));
+Params *params;
+pthread_t *threads;
+pthread_attr_t attr;
+void *status;
+int rc;
 
 void load_data(char* reference_name, char* statName, char* imdName) {
        ifstream fin;
@@ -118,14 +137,81 @@ void load_data(char* reference_name, char* statName, char* imdName) {
        N1 = s.size() - 1;
        nHits = hits.size();
 
+       totc = N0 + N1 + (M + 1);
+
        if (verbose) { printf("Loading Data is finished!\n"); }
 }
 
+// assign threads
+void init() {
+       int quotient, left;
+       seedType seed;
+       set<seedType> seedSet;
+       set<seedType>::iterator iter;
+       char outF[STRLEN];
+
+       quotient = NSAMPLES / nThreads;
+       left = NSAMPLES % nThreads;
+
+       sprintf(cvsF, "%s.countvectors", imdName);
+       seedSet.clear();
+       params = new Params[nThreads];
+       threads = new pthread_t[nThreads];
+       for (int i = 0; i < nThreads; i++) {
+               params[i].no = i;
+
+               params[i].nsamples = quotient;
+               if (i < left) params[i].nsamples++;
+
+               if (i == 0) {
+                       params[i].fo = fopen(cvsF, "w");
+                       fprintf(params[i].fo, "%d %d\n", NSAMPLES, M + 1);
+               }
+               else {
+                       sprintf(outF, "%s%d", cvsF, i);
+                       params[i].fo = fopen(outF, "w");
+               }
+
+               do {
+                       seed = engine();
+                       iter = seedSet.find(seed);
+               } while (iter != seedSet.end());
+               params[i].rng = new engine_type(seed);
+               seedSet.insert(seed);
+
+               params[i].pme_c = new double[M + 1];
+               memset(params[i].pme_c, 0, sizeof(double) * (M + 1));
+               params[i].pme_theta = new double[M + 1];
+               memset(params[i].pme_theta, 0, sizeof(double) * (M + 1));
+       }
+
+       /* set thread attribute to be joinable */
+       pthread_attr_init(&attr);
+       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+}
+
+void sampleTheta(engine_type& engine, vector<double>& theta) {
+       distribution_type gm(1);
+       generator_type gmg(engine, gm);
+       double denom;
+
+       theta.clear();
+       theta.resize(M + 1);
+       denom = 0.0;
+       for (int i = 0; i <= M; i++) {
+               theta[i] = gmg();
+               denom += theta[i];
+       }
+       assert(denom > EPSILON);
+       for (int i = 0; i <= M; i++) theta[i] /= denom;
+}
+
 // arr should be cumulative!
 // interval : [,)
 // random number should be in [0, arr[len - 1])
 // If by chance arr[len - 1] == 0.0, one possibility is to sample uniformly from 0...len-1
-int sample(vector<double>& arr, int len) {
+int sample(uniform_generator& rg, vector<double>& arr, int len) {
   int l, r, mid;
   double prb = rg() * arr[len - 1];
 
@@ -142,14 +228,39 @@ int sample(vector<double>& arr, int len) {
   return l;
 }
 
-void init() {
+void writeForSee(FILE* fo, vector<int>& counts) {
+  for (int i = 1; i < M; i++) fprintf(fo, "%d ", counts[i]);
+  fprintf(fo, "%d\n", counts[M]);
+}
+
+void writeCountVector(FILE* fo, vector<int>& counts) {
+       for (int i = 0; i < M; i++) {
+               fprintf(fo, "%d ", counts[i]);
+       }
+       fprintf(fo, "%d\n", counts[M]);
+}
+
+void* Gibbs(void* arg) {
        int len, fr, to;
+       int CHAINLEN;
+       Params *params = (Params*)arg;
+
+       vector<double> theta;
+       vector<int> z, counts;
+       vector<double> arr;
+
+       uniform_generator rg(*params->rng);
+
+       sampleTheta(*params->rng, theta);
 
+
+       // generate initial state
        arr.clear();
-       z.clear();
-       counts.clear();
 
+       z.clear();
        z.resize(N1);
+
+       counts.clear();
        counts.resize(M + 1, 1); // 1 pseudo count
        counts[0] += N0;
 
@@ -161,35 +272,17 @@ void init() {
                        arr[j - fr] = theta[hits[j].sid] * hits[j].conprb;
                        if (j > fr) arr[j - fr] += arr[j - fr - 1];  // cumulative
                }
-               z[i] = hits[fr + sample(arr, len)].sid;
+               z[i] = hits[fr + sample(rg, arr, len)].sid;
                ++counts[z[i]];
        }
 
-       totc = N0 + N1 + (M + 1);
-
-       if (verbose) { printf("Initialization is finished!\n"); }
-}
-
-void writeCountVector(FILE* fo) {
-       for (int i = 0; i < M; i++) {
-               fprintf(fo, "%d ", counts[i]);
-       }
-       fprintf(fo, "%d\n", counts[M]);
-}
-
-void Gibbs(char* imdName) {
-       FILE *fo;
-       int fr, to, len;
+       // Gibbs sampling
+    char seeF[STRLEN];
+       sprintf(seeF, "%s.see", statName);
+       FILE *fsee = fopen(seeF, "w");
 
-       sprintf(cvsF, "%s.countvectors", imdName);
-       fo = fopen(cvsF, "w");
-       assert(CHAINLEN % GAP == 0);
-       fprintf(fo, "%d %d\n", CHAINLEN / GAP, M + 1);
-       //fprintf(fo, "%d %d\n", CHAINLEN, M + 1);
-
-       pme_c.clear(); pme_c.resize(M + 1, 0.0);
-       pme_theta.clear(); pme_theta.resize(M + 1, 0.0);
-       for (int ROUND = 1; ROUND <= BURNIN + CHAINLEN; ROUND++) {
+       CHAINLEN = 1 + (params->nsamples - 1) * GAP;
+       for (int ROUND = 1; ROUND <= BURNIN + CHAINLEN  ; ROUND++) {
 
                for (int i = 0; i < N1; i++) {
                        --counts[z[i]];
@@ -199,28 +292,68 @@ void Gibbs(char* imdName) {
                                arr[j - fr] = counts[hits[j].sid] * hits[j].conprb;
                                if (j > fr) arr[j - fr] += arr[j - fr - 1]; //cumulative
                        }
-                       z[i] = hits[fr + sample(arr, len)].sid;
+                       z[i] = hits[fr + sample(rg, arr, len)].sid;
                        ++counts[z[i]];
                }
 
+               writeForSee(fsee, counts);
+
                if (ROUND > BURNIN) {
-                       if ((ROUND - BURNIN - 1) % GAP == 0) writeCountVector(fo);
-                       for (int i = 0; i <= M; i++) { 
-                         pme_c[i] += counts[i] - 1;
-                         pme_theta[i] += counts[i] / totc;
+                       if ((ROUND - BURNIN - 1) % GAP == 0) writeCountVector(params->fo, counts);
+                       for (int i = 0; i <= M; i++) {
+                               params->pme_c[i] += counts[i] - 1;
+                               params->pme_theta[i] += counts[i] / totc;
                        }
                }
 
-               if (verbose) { printf("ROUND %d is finished!\n", ROUND); }
+               if (verbose & ROUND % 10 == 0) { printf("Thread %d, ROUND %d is finished!\n", params->no, ROUND); }
        }
-       fclose(fo);
+       fclose(fsee);
+
+       return NULL;
+}
+
+void release() {
+       char inpF[STRLEN], command[STRLEN];
+       string line;
+
+       /* destroy attribute */
+       pthread_attr_destroy(&attr);
+       delete[] threads;
+
+       pme_c.clear(); pme_c.resize(M + 1, 0.0);
+       pme_theta.clear(); pme_theta.resize(M + 1, 0.0);
+       for (int i = 0; i < nThreads; i++) {
+               fclose(params[i].fo);
+               delete params[i].rng;
+               for (int j = 0; j <= M; j++) {
+                       pme_c[j] += params[i].pme_c[j];
+                       pme_theta[j] += params[i].pme_theta[j];
+               }
+               delete[] params[i].pme_c;
+               delete[] params[i].pme_theta;
+       }
+       delete[] params;
 
        for (int i = 0; i <= M; i++) {
-         pme_c[i] /= CHAINLEN;
-         pme_theta[i] /= CHAINLEN;
+               pme_c[i] /= NSAMPLES;
+               pme_theta[i] /= NSAMPLES;
        }
 
-       if (verbose) { printf("Gibbs is finished!\n"); }
+       // combine files
+       FILE *fo = fopen(cvsF, "a");
+       for (int i = 1; i < nThreads; i++) {
+               sprintf(inpF, "%s%d", cvsF, i);
+               ifstream fin(inpF);
+               while (getline(fin, line)) {
+                       fprintf(fo, "%s\n", line.c_str());
+               }
+               fin.close();
+               sprintf(command, "rm -f %s", inpF);
+               int status = system(command);
+               if (status != 0) { fprintf(stderr, "Fail to delete file %s!\n", inpF); exit(-1); }
+       }
+       fclose(fo);
 }
 
 template<class ModelType>
@@ -338,25 +471,42 @@ void writeEstimatedParameters(char* modelF, char* imdName) {
 
 int main(int argc, char* argv[]) {
        if (argc < 7) {
-               printf("Usage: rsem-run-gibbs reference_name sample_name sampleToken BURNIN CHAINLEN GAP [-q]\n");
+               printf("Usage: rsem-run-gibbs reference_name sample_name sampleToken BURNIN NSAMPLES GAP [-p #Threads] [-q]\n");
                exit(-1);
        }
 
        BURNIN = atoi(argv[4]);
-       CHAINLEN = atoi(argv[5]);
+       NSAMPLES = atoi(argv[5]);
        GAP = atoi(argv[6]);
        sprintf(imdName, "%s.temp/%s", argv[2], argv[3]);
        sprintf(statName, "%s.stat/%s", argv[2], argv[3]);
        load_data(argv[1], statName, imdName);
 
+       nThreads = 1;
        quiet = false;
-       if (argc > 7 && !strcmp(argv[7], "-q")) {
-               quiet = true;
+       for (int i = 7; i < argc; i++) {
+               if (!strcmp(argv[i], "-p")) nThreads = atoi(argv[i + 1]);
+               if (!strcmp(argv[i], "-q")) quiet = true;
        }
        verbose = !quiet;
 
+       if (nThreads > NSAMPLES) {
+               nThreads = NSAMPLES;
+               printf("Warning: Number of samples is less than number of threads! Change the number of threads to %d!\n", nThreads);
+       }
+
+       if (verbose) printf("Gibbs started!\n");
        init();
-       Gibbs(imdName);
+       for (int i = 0; i < nThreads; i++) {
+               rc = pthread_create(&threads[i], &attr, Gibbs, (void*)(&params[i]));
+               if (rc != 0) { fprintf(stderr, "Cannot create thread %d! (numbered from 0)\n", i); }
+       }
+       for (int i = 0; i < nThreads; i++) {
+               rc = pthread_join(threads[i], &status);
+               if (rc != 0) { fprintf(stderr, "Cannot join thread %d! (numbered from 0)\n", i); }
+       }
+       release();
+       if (verbose) printf("Gibbs finished!\n");
 
        sprintf(modelF, "%s.model", statName);
        FILE *fi = fopen(modelF, "r");
index c9ccce5909a2be59baabe8b3345232256e8a2da4..f3964d18b8472d65a42b07355041ddeaed82a045 100644 (file)
@@ -382,12 +382,14 @@ int main(int argc, char* argv[]) {
 
        delete[] tau_denoms;
 
+       /*
        sprintf(command, "rm -f %s", tmpF);
        int status = system(command);
        if (status != 0) {
                fprintf(stderr, "Cannot delete %s!\n", tmpF);
                exit(-1);
        }
+        */
 
        return 0;
 }
index 76094590dc81f6396538c4fbee38c335e1596283..97cba95d1e8aad570e1f40d871f2c80b8a054768 100644 (file)
--- a/makefile
+++ b/makefile
@@ -92,7 +92,7 @@ simulation.o : utils.h Read.h SingleRead.h SingleReadQ.h PairedEndRead.h PairedE
        $(CC) $(COFLAGS) simulation.cpp
 
 rsem-run-gibbs : Gibbs.o
-       $(CC) -o rsem-run-gibbs Gibbs.o 
+       $(CC) -o rsem-run-gibbs Gibbs.o -lpthread
 
 #some header files are omitted
 Gibbs.o : utils.h Model.h SingleModel.h SingleQModel.h PairedEndModel.h PairedEndQModel.h RefSeq.h RefSeqPolicy.h PolyARules.h Refs.h GroupInfo.h boost/random.hpp Gibbs.cpp 
index 746fb7e510006d9bc238bf5678adddac7c3d83c2..68aa2587abcfb96bf6fa8639ebb1876d4e7c4d83 100755 (executable)
@@ -8,7 +8,7 @@ use strict;
 
 #const
 my $BURNIN = 200;
-my $CHAINLEN = 1000;
+my $NSAMPLES = 1000;
 my $SAMPLEGAP = 1;
 my $CONFIDENCE = 0.95;
 my $NSPC = 50;
@@ -285,7 +285,8 @@ if ($genBamF) {
 &collectResults("$imdName.gene_res", "$sampleName.genes.results"); # gene level
 
 if ($calcCI) {
-    $command = $dir."rsem-run-gibbs $refName $sampleName $sampleToken $BURNIN $CHAINLEN $SAMPLEGAP";
+    $command = $dir."rsem-run-gibbs $refName $sampleName $sampleToken $BURNIN $NSAMPLES $SAMPLEGAP";
+    $command .= " -p $nThreads";
     if ($quiet) { $command .= " -q"; }
     print "$command\n";
     $status = system($command);