]> git.donarmstrong.com Git - rsem.git/commitdiff
back to single threaded Gibbs
authorBo Li <bli@cs.wisc.edu>
Mon, 25 Apr 2011 18:19:40 +0000 (13:19 -0500)
committerBo Li <bli@cs.wisc.edu>
Mon, 25 Apr 2011 18:19:40 +0000 (13:19 -0500)
Gibbs.cpp

index 4e7377b679d8544ca3205fbadd7549bee196eac5..5bdfb247aa2db979cc8c3b345eb50b977462a43e 100644 (file)
--- a/Gibbs.cpp
+++ b/Gibbs.cpp
@@ -6,8 +6,6 @@
 #include<fstream>
 #include<sstream>
 #include<vector>
-#include<set>
-#include<pthread.h>
 
 #include "boost/random.hpp"
 
 
 using namespace std;
 
-typedef unsigned int seedType;
-typedef boost::mt19937 engine_type;
-typedef boost::gamma_distribution<> distribution_type;
-typedef boost::variate_generator<engine_type&, distribution_type> generator_type;
-typedef boost::uniform_01<engine_type> uniform_generator;
-
-struct Params {
-       int no, nsamples;
-       FILE *fo;
-       boost::mt19937 *rng;
-       double *pme_c, *pme_theta;
-};
-
 struct Item {
        int sid;
        double conprb;
@@ -47,12 +32,10 @@ struct Item {
        }
 };
 
-int nThreads;
-
 int model_type;
 int m, M, N0, N1, nHits;
 double totc;
-int BURNIN, NSAMPLES, GAP;
+int BURNIN, CHAINLEN, GAP;
 char imdName[STRLEN], statName[STRLEN];
 char thetaF[STRLEN], ofgF[STRLEN], groupF[STRLEN], refF[STRLEN], modelF[STRLEN];
 char cvsF[STRLEN];
@@ -62,18 +45,16 @@ GroupInfo gi;
 
 vector<double> theta, pme_theta, pme_c, eel;
 
-vector<int> s;
+vector<int> s, z;
 vector<Item> hits;
 vector<int> counts;
 
 bool quiet;
 
-engine_type engine(time(NULL));
-Params *params;
-pthread_t *threads;
-pthread_attr_t attr;
-void *status;
-int rc;
+vector<double> arr;
+
+boost::mt19937 rng(time(NULL));
+boost::uniform_01<boost::mt19937> rg(rng);
 
 void load_data(char* reference_name, char* statName, char* imdName) {
        ifstream fin;
@@ -137,81 +118,14 @@ void load_data(char* reference_name, char* statName, char* imdName) {
        N1 = s.size() - 1;
        nHits = hits.size();
 
-       totc = N0 + N1 + (M + 1);
-
        if (verbose) { printf("Loading Data is finished!\n"); }
 }
 
-// assign threads
-void init() {
-       int quotient, left;
-       seedType seed;
-       set<seedType> seedSet;
-       set<seedType>::iterator iter;
-       char outF[STRLEN];
-
-       quotient = NSAMPLES / nThreads;
-       left = NSAMPLES % nThreads;
-
-       sprintf(cvsF, "%s.countvectors", imdName);
-       seedSet.clear();
-       params = new Params[nThreads];
-       threads = new pthread_t[nThreads];
-       for (int i = 0; i < nThreads; i++) {
-               params[i].no = i;
-
-               params[i].nsamples = quotient;
-               if (i < left) params[i].nsamples++;
-
-               if (i == 0) {
-                       params[i].fo = fopen(cvsF, "w");
-                       fprintf(params[i].fo, "%d %d\n", NSAMPLES, M + 1);
-               }
-               else {
-                       sprintf(outF, "%s%d", cvsF, i);
-                       params[i].fo = fopen(outF, "w");
-               }
-
-               do {
-                       seed = engine();
-                       iter = seedSet.find(seed);
-               } while (iter != seedSet.end());
-               params[i].rng = new engine_type(seed);
-               seedSet.insert(seed);
-
-               params[i].pme_c = new double[M + 1];
-               memset(params[i].pme_c, 0, sizeof(double) * (M + 1));
-               params[i].pme_theta = new double[M + 1];
-               memset(params[i].pme_theta, 0, sizeof(double) * (M + 1));
-       }
-
-       /* set thread attribute to be joinable */
-       pthread_attr_init(&attr);
-       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-
-}
-
-void sampleTheta(engine_type& engine, vector<double>& theta) {
-       distribution_type gm(1);
-       generator_type gmg(engine, gm);
-       double denom;
-
-       theta.clear();
-       theta.resize(M + 1);
-       denom = 0.0;
-       for (int i = 0; i <= M; i++) {
-               theta[i] = gmg();
-               denom += theta[i];
-       }
-       assert(denom > EPSILON);
-       for (int i = 0; i <= M; i++) theta[i] /= denom;
-}
-
 // arr should be cumulative!
 // interval : [,)
 // random number should be in [0, arr[len - 1])
 // If by chance arr[len - 1] == 0.0, one possibility is to sample uniformly from 0...len-1
-int sample(uniform_generator& rg, vector<double>& arr, int len) {
+int sample(vector<double>& arr, int len) {
   int l, r, mid;
   double prb = rg() * arr[len - 1];
 
@@ -228,39 +142,14 @@ int sample(uniform_generator& rg, vector<double>& arr, int len) {
   return l;
 }
 
-void writeForSee(FILE* fo, vector<int>& counts) {
-  for (int i = 1; i < M; i++) fprintf(fo, "%d ", counts[i]);
-  fprintf(fo, "%d\n", counts[M]);
-}
-
-void writeCountVector(FILE* fo, vector<int>& counts) {
-       for (int i = 0; i < M; i++) {
-               fprintf(fo, "%d ", counts[i]);
-       }
-       fprintf(fo, "%d\n", counts[M]);
-}
-
-void* Gibbs(void* arg) {
+void init() {
        int len, fr, to;
-       int CHAINLEN;
-       Params *params = (Params*)arg;
-
-       vector<double> theta;
-       vector<int> z, counts;
-       vector<double> arr;
-
-       uniform_generator rg(*params->rng);
-
-       sampleTheta(*params->rng, theta);
 
-
-       // generate initial state
        arr.clear();
-
        z.clear();
-       z.resize(N1);
-
        counts.clear();
+
+       z.resize(N1);
        counts.resize(M + 1, 1); // 1 pseudo count
        counts[0] += N0;
 
@@ -272,17 +161,35 @@ void* Gibbs(void* arg) {
                        arr[j - fr] = theta[hits[j].sid] * hits[j].conprb;
                        if (j > fr) arr[j - fr] += arr[j - fr - 1];  // cumulative
                }
-               z[i] = hits[fr + sample(rg, arr, len)].sid;
+               z[i] = hits[fr + sample(arr, len)].sid;
                ++counts[z[i]];
        }
 
-       // Gibbs sampling
-    char seeF[STRLEN];
-       sprintf(seeF, "%s.see", statName);
-       FILE *fsee = fopen(seeF, "w");
+       totc = N0 + N1 + (M + 1);
+
+       if (verbose) { printf("Initialization is finished!\n"); }
+}
+
+void writeCountVector(FILE* fo) {
+       for (int i = 0; i < M; i++) {
+               fprintf(fo, "%d ", counts[i]);
+       }
+       fprintf(fo, "%d\n", counts[M]);
+}
+
+void Gibbs(char* imdName) {
+       FILE *fo;
+       int fr, to, len;
 
-       CHAINLEN = 1 + (params->nsamples - 1) * GAP;
-       for (int ROUND = 1; ROUND <= BURNIN + CHAINLEN  ; ROUND++) {
+       sprintf(cvsF, "%s.countvectors", imdName);
+       fo = fopen(cvsF, "w");
+       assert(CHAINLEN % GAP == 0);
+       fprintf(fo, "%d %d\n", CHAINLEN / GAP, M + 1);
+       //fprintf(fo, "%d %d\n", CHAINLEN, M + 1);
+
+       pme_c.clear(); pme_c.resize(M + 1, 0.0);
+       pme_theta.clear(); pme_theta.resize(M + 1, 0.0);
+       for (int ROUND = 1; ROUND <= BURNIN + CHAINLEN; ROUND++) {
 
                for (int i = 0; i < N1; i++) {
                        --counts[z[i]];
@@ -292,68 +199,28 @@ void* Gibbs(void* arg) {
                                arr[j - fr] = counts[hits[j].sid] * hits[j].conprb;
                                if (j > fr) arr[j - fr] += arr[j - fr - 1]; //cumulative
                        }
-                       z[i] = hits[fr + sample(rg, arr, len)].sid;
+                       z[i] = hits[fr + sample(arr, len)].sid;
                        ++counts[z[i]];
                }
 
-               writeForSee(fsee, counts);
-
                if (ROUND > BURNIN) {
-                       if ((ROUND - BURNIN - 1) % GAP == 0) writeCountVector(params->fo, counts);
-                       for (int i = 0; i <= M; i++) {
-                               params->pme_c[i] += counts[i] - 1;
-                               params->pme_theta[i] += counts[i] / totc;
+                       if ((ROUND - BURNIN - 1) % GAP == 0) writeCountVector(fo);
+                       for (int i = 0; i <= M; i++) { 
+                         pme_c[i] += counts[i] - 1;
+                         pme_theta[i] += counts[i] / totc;
                        }
                }
 
-               if (verbose & ROUND % 10 == 0) { printf("Thread %d, ROUND %d is finished!\n", params->no, ROUND); }
-       }
-       fclose(fsee);
-
-       return NULL;
-}
-
-void release() {
-       char inpF[STRLEN], command[STRLEN];
-       string line;
-
-       /* destroy attribute */
-       pthread_attr_destroy(&attr);
-       delete[] threads;
-
-       pme_c.clear(); pme_c.resize(M + 1, 0.0);
-       pme_theta.clear(); pme_theta.resize(M + 1, 0.0);
-       for (int i = 0; i < nThreads; i++) {
-               fclose(params[i].fo);
-               delete params[i].rng;
-               for (int j = 0; j <= M; j++) {
-                       pme_c[j] += params[i].pme_c[j];
-                       pme_theta[j] += params[i].pme_theta[j];
-               }
-               delete[] params[i].pme_c;
-               delete[] params[i].pme_theta;
+               if (verbose) { printf("ROUND %d is finished!\n", ROUND); }
        }
-       delete[] params;
+       fclose(fo);
 
        for (int i = 0; i <= M; i++) {
-               pme_c[i] /= NSAMPLES;
-               pme_theta[i] /= NSAMPLES;
+         pme_c[i] /= CHAINLEN;
+         pme_theta[i] /= CHAINLEN;
        }
 
-       // combine files
-       FILE *fo = fopen(cvsF, "a");
-       for (int i = 1; i < nThreads; i++) {
-               sprintf(inpF, "%s%d", cvsF, i);
-               ifstream fin(inpF);
-               while (getline(fin, line)) {
-                       fprintf(fo, "%s\n", line.c_str());
-               }
-               fin.close();
-               sprintf(command, "rm -f %s", inpF);
-               int status = system(command);
-               if (status != 0) { fprintf(stderr, "Fail to delete file %s!\n", inpF); exit(-1); }
-       }
-       fclose(fo);
+       if (verbose) { printf("Gibbs is finished!\n"); }
 }
 
 template<class ModelType>
@@ -471,42 +338,25 @@ void writeEstimatedParameters(char* modelF, char* imdName) {
 
 int main(int argc, char* argv[]) {
        if (argc < 7) {
-               printf("Usage: rsem-run-gibbs reference_name sample_name sampleToken BURNIN NSAMPLES GAP [-p #Threads] [-q]\n");
+               printf("Usage: rsem-run-gibbs reference_name sample_name sampleToken BURNIN CHAINLEN GAP [-q]\n");
                exit(-1);
        }
 
        BURNIN = atoi(argv[4]);
-       NSAMPLES = atoi(argv[5]);
+       CHAINLEN = atoi(argv[5]);
        GAP = atoi(argv[6]);
        sprintf(imdName, "%s.temp/%s", argv[2], argv[3]);
        sprintf(statName, "%s.stat/%s", argv[2], argv[3]);
        load_data(argv[1], statName, imdName);
 
-       nThreads = 1;
        quiet = false;
-       for (int i = 7; i < argc; i++) {
-               if (!strcmp(argv[i], "-p")) nThreads = atoi(argv[i + 1]);
-               if (!strcmp(argv[i], "-q")) quiet = true;
+       if (argc > 7 && !strcmp(argv[7], "-q")) {
+               quiet = true;
        }
        verbose = !quiet;
 
-       if (nThreads > NSAMPLES) {
-               nThreads = NSAMPLES;
-               printf("Warning: Number of samples is less than number of threads! Change the number of threads to %d!\n", nThreads);
-       }
-
-       if (verbose) printf("Gibbs started!\n");
        init();
-       for (int i = 0; i < nThreads; i++) {
-               rc = pthread_create(&threads[i], &attr, Gibbs, (void*)(&params[i]));
-               if (rc != 0) { fprintf(stderr, "Cannot create thread %d! (numbered from 0)\n", i); }
-       }
-       for (int i = 0; i < nThreads; i++) {
-               rc = pthread_join(threads[i], &status);
-               if (rc != 0) { fprintf(stderr, "Cannot join thread %d! (numbered from 0)\n", i); }
-       }
-       release();
-       if (verbose) printf("Gibbs finished!\n");
+       Gibbs(imdName);
 
        sprintf(modelF, "%s.model", statName);
        FILE *fi = fopen(modelF, "r");