From: Bo Li Date: Mon, 25 Apr 2011 18:19:40 +0000 (-0500) Subject: back to single threaded Gibbs X-Git-Url: https://git.donarmstrong.com/?p=rsem.git;a=commitdiff_plain;h=b82ba774b1f3510ab8732b58d34f153c7b2f7558 back to single threaded Gibbs --- diff --git a/Gibbs.cpp b/Gibbs.cpp index 4e7377b..5bdfb24 100644 --- a/Gibbs.cpp +++ b/Gibbs.cpp @@ -6,8 +6,6 @@ #include #include #include -#include -#include #include "boost/random.hpp" @@ -24,19 +22,6 @@ using namespace std; -typedef unsigned int seedType; -typedef boost::mt19937 engine_type; -typedef boost::gamma_distribution<> distribution_type; -typedef boost::variate_generator generator_type; -typedef boost::uniform_01 uniform_generator; - -struct Params { - int no, nsamples; - FILE *fo; - boost::mt19937 *rng; - double *pme_c, *pme_theta; -}; - struct Item { int sid; double conprb; @@ -47,12 +32,10 @@ struct Item { } }; -int nThreads; - int model_type; int m, M, N0, N1, nHits; double totc; -int BURNIN, NSAMPLES, GAP; +int BURNIN, CHAINLEN, GAP; char imdName[STRLEN], statName[STRLEN]; char thetaF[STRLEN], ofgF[STRLEN], groupF[STRLEN], refF[STRLEN], modelF[STRLEN]; char cvsF[STRLEN]; @@ -62,18 +45,16 @@ GroupInfo gi; vector theta, pme_theta, pme_c, eel; -vector s; +vector s, z; vector hits; vector counts; bool quiet; -engine_type engine(time(NULL)); -Params *params; -pthread_t *threads; -pthread_attr_t attr; -void *status; -int rc; +vector arr; + +boost::mt19937 rng(time(NULL)); +boost::uniform_01 rg(rng); void load_data(char* reference_name, char* statName, char* imdName) { ifstream fin; @@ -137,81 +118,14 @@ void load_data(char* reference_name, char* statName, char* imdName) { N1 = s.size() - 1; nHits = hits.size(); - totc = N0 + N1 + (M + 1); - if (verbose) { printf("Loading Data is finished!\n"); } } -// assign threads -void init() { - int quotient, left; - seedType seed; - set seedSet; - set::iterator iter; - char outF[STRLEN]; - - quotient = NSAMPLES / nThreads; - left = NSAMPLES % nThreads; - - sprintf(cvsF, "%s.countvectors", imdName); - seedSet.clear(); - params = new Params[nThreads]; - threads = new pthread_t[nThreads]; - for (int i = 0; i < nThreads; i++) { - params[i].no = i; - - params[i].nsamples = quotient; - if (i < left) params[i].nsamples++; - - if (i == 0) { - params[i].fo = fopen(cvsF, "w"); - fprintf(params[i].fo, "%d %d\n", NSAMPLES, M + 1); - } - else { - sprintf(outF, "%s%d", cvsF, i); - params[i].fo = fopen(outF, "w"); - } - - do { - seed = engine(); - iter = seedSet.find(seed); - } while (iter != seedSet.end()); - params[i].rng = new engine_type(seed); - seedSet.insert(seed); - - params[i].pme_c = new double[M + 1]; - memset(params[i].pme_c, 0, sizeof(double) * (M + 1)); - params[i].pme_theta = new double[M + 1]; - memset(params[i].pme_theta, 0, sizeof(double) * (M + 1)); - } - - /* set thread attribute to be joinable */ - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - -} - -void sampleTheta(engine_type& engine, vector& theta) { - distribution_type gm(1); - generator_type gmg(engine, gm); - double denom; - - theta.clear(); - theta.resize(M + 1); - denom = 0.0; - for (int i = 0; i <= M; i++) { - theta[i] = gmg(); - denom += theta[i]; - } - assert(denom > EPSILON); - for (int i = 0; i <= M; i++) theta[i] /= denom; -} - // arr should be cumulative! // interval : [,) // random number should be in [0, arr[len - 1]) // If by chance arr[len - 1] == 0.0, one possibility is to sample uniformly from 0...len-1 -int sample(uniform_generator& rg, vector& arr, int len) { +int sample(vector& arr, int len) { int l, r, mid; double prb = rg() * arr[len - 1]; @@ -228,39 +142,14 @@ int sample(uniform_generator& rg, vector& arr, int len) { return l; } -void writeForSee(FILE* fo, vector& counts) { - for (int i = 1; i < M; i++) fprintf(fo, "%d ", counts[i]); - fprintf(fo, "%d\n", counts[M]); -} - -void writeCountVector(FILE* fo, vector& counts) { - for (int i = 0; i < M; i++) { - fprintf(fo, "%d ", counts[i]); - } - fprintf(fo, "%d\n", counts[M]); -} - -void* Gibbs(void* arg) { +void init() { int len, fr, to; - int CHAINLEN; - Params *params = (Params*)arg; - - vector theta; - vector z, counts; - vector arr; - - uniform_generator rg(*params->rng); - - sampleTheta(*params->rng, theta); - - // generate initial state arr.clear(); - z.clear(); - z.resize(N1); - counts.clear(); + + z.resize(N1); counts.resize(M + 1, 1); // 1 pseudo count counts[0] += N0; @@ -272,17 +161,35 @@ void* Gibbs(void* arg) { arr[j - fr] = theta[hits[j].sid] * hits[j].conprb; if (j > fr) arr[j - fr] += arr[j - fr - 1]; // cumulative } - z[i] = hits[fr + sample(rg, arr, len)].sid; + z[i] = hits[fr + sample(arr, len)].sid; ++counts[z[i]]; } - // Gibbs sampling - char seeF[STRLEN]; - sprintf(seeF, "%s.see", statName); - FILE *fsee = fopen(seeF, "w"); + totc = N0 + N1 + (M + 1); + + if (verbose) { printf("Initialization is finished!\n"); } +} + +void writeCountVector(FILE* fo) { + for (int i = 0; i < M; i++) { + fprintf(fo, "%d ", counts[i]); + } + fprintf(fo, "%d\n", counts[M]); +} + +void Gibbs(char* imdName) { + FILE *fo; + int fr, to, len; - CHAINLEN = 1 + (params->nsamples - 1) * GAP; - for (int ROUND = 1; ROUND <= BURNIN + CHAINLEN ; ROUND++) { + sprintf(cvsF, "%s.countvectors", imdName); + fo = fopen(cvsF, "w"); + assert(CHAINLEN % GAP == 0); + fprintf(fo, "%d %d\n", CHAINLEN / GAP, M + 1); + //fprintf(fo, "%d %d\n", CHAINLEN, M + 1); + + pme_c.clear(); pme_c.resize(M + 1, 0.0); + pme_theta.clear(); pme_theta.resize(M + 1, 0.0); + for (int ROUND = 1; ROUND <= BURNIN + CHAINLEN; ROUND++) { for (int i = 0; i < N1; i++) { --counts[z[i]]; @@ -292,68 +199,28 @@ void* Gibbs(void* arg) { arr[j - fr] = counts[hits[j].sid] * hits[j].conprb; if (j > fr) arr[j - fr] += arr[j - fr - 1]; //cumulative } - z[i] = hits[fr + sample(rg, arr, len)].sid; + z[i] = hits[fr + sample(arr, len)].sid; ++counts[z[i]]; } - writeForSee(fsee, counts); - if (ROUND > BURNIN) { - if ((ROUND - BURNIN - 1) % GAP == 0) writeCountVector(params->fo, counts); - for (int i = 0; i <= M; i++) { - params->pme_c[i] += counts[i] - 1; - params->pme_theta[i] += counts[i] / totc; + if ((ROUND - BURNIN - 1) % GAP == 0) writeCountVector(fo); + for (int i = 0; i <= M; i++) { + pme_c[i] += counts[i] - 1; + pme_theta[i] += counts[i] / totc; } } - if (verbose & ROUND % 10 == 0) { printf("Thread %d, ROUND %d is finished!\n", params->no, ROUND); } - } - fclose(fsee); - - return NULL; -} - -void release() { - char inpF[STRLEN], command[STRLEN]; - string line; - - /* destroy attribute */ - pthread_attr_destroy(&attr); - delete[] threads; - - pme_c.clear(); pme_c.resize(M + 1, 0.0); - pme_theta.clear(); pme_theta.resize(M + 1, 0.0); - for (int i = 0; i < nThreads; i++) { - fclose(params[i].fo); - delete params[i].rng; - for (int j = 0; j <= M; j++) { - pme_c[j] += params[i].pme_c[j]; - pme_theta[j] += params[i].pme_theta[j]; - } - delete[] params[i].pme_c; - delete[] params[i].pme_theta; + if (verbose) { printf("ROUND %d is finished!\n", ROUND); } } - delete[] params; + fclose(fo); for (int i = 0; i <= M; i++) { - pme_c[i] /= NSAMPLES; - pme_theta[i] /= NSAMPLES; + pme_c[i] /= CHAINLEN; + pme_theta[i] /= CHAINLEN; } - // combine files - FILE *fo = fopen(cvsF, "a"); - for (int i = 1; i < nThreads; i++) { - sprintf(inpF, "%s%d", cvsF, i); - ifstream fin(inpF); - while (getline(fin, line)) { - fprintf(fo, "%s\n", line.c_str()); - } - fin.close(); - sprintf(command, "rm -f %s", inpF); - int status = system(command); - if (status != 0) { fprintf(stderr, "Fail to delete file %s!\n", inpF); exit(-1); } - } - fclose(fo); + if (verbose) { printf("Gibbs is finished!\n"); } } template @@ -471,42 +338,25 @@ void writeEstimatedParameters(char* modelF, char* imdName) { int main(int argc, char* argv[]) { if (argc < 7) { - printf("Usage: rsem-run-gibbs reference_name sample_name sampleToken BURNIN NSAMPLES GAP [-p #Threads] [-q]\n"); + printf("Usage: rsem-run-gibbs reference_name sample_name sampleToken BURNIN CHAINLEN GAP [-q]\n"); exit(-1); } BURNIN = atoi(argv[4]); - NSAMPLES = atoi(argv[5]); + CHAINLEN = atoi(argv[5]); GAP = atoi(argv[6]); sprintf(imdName, "%s.temp/%s", argv[2], argv[3]); sprintf(statName, "%s.stat/%s", argv[2], argv[3]); load_data(argv[1], statName, imdName); - nThreads = 1; quiet = false; - for (int i = 7; i < argc; i++) { - if (!strcmp(argv[i], "-p")) nThreads = atoi(argv[i + 1]); - if (!strcmp(argv[i], "-q")) quiet = true; + if (argc > 7 && !strcmp(argv[7], "-q")) { + quiet = true; } verbose = !quiet; - if (nThreads > NSAMPLES) { - nThreads = NSAMPLES; - printf("Warning: Number of samples is less than number of threads! Change the number of threads to %d!\n", nThreads); - } - - if (verbose) printf("Gibbs started!\n"); init(); - for (int i = 0; i < nThreads; i++) { - rc = pthread_create(&threads[i], &attr, Gibbs, (void*)(¶ms[i])); - if (rc != 0) { fprintf(stderr, "Cannot create thread %d! (numbered from 0)\n", i); } - } - for (int i = 0; i < nThreads; i++) { - rc = pthread_join(threads[i], &status); - if (rc != 0) { fprintf(stderr, "Cannot join thread %d! (numbered from 0)\n", i); } - } - release(); - if (verbose) printf("Gibbs finished!\n"); + Gibbs(imdName); sprintf(modelF, "%s.model", statName); FILE *fi = fopen(modelF, "r");