From 61ac4ad7ad6c01ee75fa3e3ea8d396618a08b96d Mon Sep 17 00:00:00 2001 From: Bo Li Date: Mon, 25 Apr 2011 13:10:13 -0500 Subject: [PATCH] Version with Gibbs parallelized --- Gibbs.cpp | 250 ++++++++++++++++++++++++++++++-------- calcCI.cpp | 2 + makefile | 2 +- rsem-calculate-expression | 5 +- 4 files changed, 206 insertions(+), 53 deletions(-) diff --git a/Gibbs.cpp b/Gibbs.cpp index 5bdfb24..4e7377b 100644 --- a/Gibbs.cpp +++ b/Gibbs.cpp @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include "boost/random.hpp" @@ -22,6 +24,19 @@ using namespace std; +typedef unsigned int seedType; +typedef boost::mt19937 engine_type; +typedef boost::gamma_distribution<> distribution_type; +typedef boost::variate_generator generator_type; +typedef boost::uniform_01 uniform_generator; + +struct Params { + int no, nsamples; + FILE *fo; + boost::mt19937 *rng; + double *pme_c, *pme_theta; +}; + struct Item { int sid; double conprb; @@ -32,10 +47,12 @@ struct Item { } }; +int nThreads; + int model_type; int m, M, N0, N1, nHits; double totc; -int BURNIN, CHAINLEN, GAP; +int BURNIN, NSAMPLES, GAP; char imdName[STRLEN], statName[STRLEN]; char thetaF[STRLEN], ofgF[STRLEN], groupF[STRLEN], refF[STRLEN], modelF[STRLEN]; char cvsF[STRLEN]; @@ -45,16 +62,18 @@ GroupInfo gi; vector theta, pme_theta, pme_c, eel; -vector s, z; +vector s; vector hits; vector counts; bool quiet; -vector arr; - -boost::mt19937 rng(time(NULL)); -boost::uniform_01 rg(rng); +engine_type engine(time(NULL)); +Params *params; +pthread_t *threads; +pthread_attr_t attr; +void *status; +int rc; void load_data(char* reference_name, char* statName, char* imdName) { ifstream fin; @@ -118,14 +137,81 @@ void load_data(char* reference_name, char* statName, char* imdName) { N1 = s.size() - 1; nHits = hits.size(); + totc = N0 + N1 + (M + 1); + if (verbose) { printf("Loading Data is finished!\n"); } } +// assign threads +void init() { + int quotient, left; + seedType seed; + set seedSet; + set::iterator iter; + char outF[STRLEN]; + + quotient = NSAMPLES / nThreads; + left = NSAMPLES % nThreads; + + sprintf(cvsF, "%s.countvectors", imdName); + seedSet.clear(); + params = new Params[nThreads]; + threads = new pthread_t[nThreads]; + for (int i = 0; i < nThreads; i++) { + params[i].no = i; + + params[i].nsamples = quotient; + if (i < left) params[i].nsamples++; + + if (i == 0) { + params[i].fo = fopen(cvsF, "w"); + fprintf(params[i].fo, "%d %d\n", NSAMPLES, M + 1); + } + else { + sprintf(outF, "%s%d", cvsF, i); + params[i].fo = fopen(outF, "w"); + } + + do { + seed = engine(); + iter = seedSet.find(seed); + } while (iter != seedSet.end()); + params[i].rng = new engine_type(seed); + seedSet.insert(seed); + + params[i].pme_c = new double[M + 1]; + memset(params[i].pme_c, 0, sizeof(double) * (M + 1)); + params[i].pme_theta = new double[M + 1]; + memset(params[i].pme_theta, 0, sizeof(double) * (M + 1)); + } + + /* set thread attribute to be joinable */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + +} + +void sampleTheta(engine_type& engine, vector& theta) { + distribution_type gm(1); + generator_type gmg(engine, gm); + double denom; + + theta.clear(); + theta.resize(M + 1); + denom = 0.0; + for (int i = 0; i <= M; i++) { + theta[i] = gmg(); + denom += theta[i]; + } + assert(denom > EPSILON); + for (int i = 0; i <= M; i++) theta[i] /= denom; +} + // arr should be cumulative! // interval : [,) // random number should be in [0, arr[len - 1]) // If by chance arr[len - 1] == 0.0, one possibility is to sample uniformly from 0...len-1 -int sample(vector& arr, int len) { +int sample(uniform_generator& rg, vector& arr, int len) { int l, r, mid; double prb = rg() * arr[len - 1]; @@ -142,14 +228,39 @@ int sample(vector& arr, int len) { return l; } -void init() { +void writeForSee(FILE* fo, vector& counts) { + for (int i = 1; i < M; i++) fprintf(fo, "%d ", counts[i]); + fprintf(fo, "%d\n", counts[M]); +} + +void writeCountVector(FILE* fo, vector& counts) { + for (int i = 0; i < M; i++) { + fprintf(fo, "%d ", counts[i]); + } + fprintf(fo, "%d\n", counts[M]); +} + +void* Gibbs(void* arg) { int len, fr, to; + int CHAINLEN; + Params *params = (Params*)arg; + + vector theta; + vector z, counts; + vector arr; + + uniform_generator rg(*params->rng); + + sampleTheta(*params->rng, theta); + + // generate initial state arr.clear(); - z.clear(); - counts.clear(); + z.clear(); z.resize(N1); + + counts.clear(); counts.resize(M + 1, 1); // 1 pseudo count counts[0] += N0; @@ -161,35 +272,17 @@ void init() { arr[j - fr] = theta[hits[j].sid] * hits[j].conprb; if (j > fr) arr[j - fr] += arr[j - fr - 1]; // cumulative } - z[i] = hits[fr + sample(arr, len)].sid; + z[i] = hits[fr + sample(rg, arr, len)].sid; ++counts[z[i]]; } - totc = N0 + N1 + (M + 1); - - if (verbose) { printf("Initialization is finished!\n"); } -} - -void writeCountVector(FILE* fo) { - for (int i = 0; i < M; i++) { - fprintf(fo, "%d ", counts[i]); - } - fprintf(fo, "%d\n", counts[M]); -} - -void Gibbs(char* imdName) { - FILE *fo; - int fr, to, len; + // Gibbs sampling + char seeF[STRLEN]; + sprintf(seeF, "%s.see", statName); + FILE *fsee = fopen(seeF, "w"); - sprintf(cvsF, "%s.countvectors", imdName); - fo = fopen(cvsF, "w"); - assert(CHAINLEN % GAP == 0); - fprintf(fo, "%d %d\n", CHAINLEN / GAP, M + 1); - //fprintf(fo, "%d %d\n", CHAINLEN, M + 1); - - pme_c.clear(); pme_c.resize(M + 1, 0.0); - pme_theta.clear(); pme_theta.resize(M + 1, 0.0); - for (int ROUND = 1; ROUND <= BURNIN + CHAINLEN; ROUND++) { + CHAINLEN = 1 + (params->nsamples - 1) * GAP; + for (int ROUND = 1; ROUND <= BURNIN + CHAINLEN ; ROUND++) { for (int i = 0; i < N1; i++) { --counts[z[i]]; @@ -199,28 +292,68 @@ void Gibbs(char* imdName) { arr[j - fr] = counts[hits[j].sid] * hits[j].conprb; if (j > fr) arr[j - fr] += arr[j - fr - 1]; //cumulative } - z[i] = hits[fr + sample(arr, len)].sid; + z[i] = hits[fr + sample(rg, arr, len)].sid; ++counts[z[i]]; } + writeForSee(fsee, counts); + if (ROUND > BURNIN) { - if ((ROUND - BURNIN - 1) % GAP == 0) writeCountVector(fo); - for (int i = 0; i <= M; i++) { - pme_c[i] += counts[i] - 1; - pme_theta[i] += counts[i] / totc; + if ((ROUND - BURNIN - 1) % GAP == 0) writeCountVector(params->fo, counts); + for (int i = 0; i <= M; i++) { + params->pme_c[i] += counts[i] - 1; + params->pme_theta[i] += counts[i] / totc; } } - if (verbose) { printf("ROUND %d is finished!\n", ROUND); } + if (verbose & ROUND % 10 == 0) { printf("Thread %d, ROUND %d is finished!\n", params->no, ROUND); } } - fclose(fo); + fclose(fsee); + + return NULL; +} + +void release() { + char inpF[STRLEN], command[STRLEN]; + string line; + + /* destroy attribute */ + pthread_attr_destroy(&attr); + delete[] threads; + + pme_c.clear(); pme_c.resize(M + 1, 0.0); + pme_theta.clear(); pme_theta.resize(M + 1, 0.0); + for (int i = 0; i < nThreads; i++) { + fclose(params[i].fo); + delete params[i].rng; + for (int j = 0; j <= M; j++) { + pme_c[j] += params[i].pme_c[j]; + pme_theta[j] += params[i].pme_theta[j]; + } + delete[] params[i].pme_c; + delete[] params[i].pme_theta; + } + delete[] params; for (int i = 0; i <= M; i++) { - pme_c[i] /= CHAINLEN; - pme_theta[i] /= CHAINLEN; + pme_c[i] /= NSAMPLES; + pme_theta[i] /= NSAMPLES; } - if (verbose) { printf("Gibbs is finished!\n"); } + // combine files + FILE *fo = fopen(cvsF, "a"); + for (int i = 1; i < nThreads; i++) { + sprintf(inpF, "%s%d", cvsF, i); + ifstream fin(inpF); + while (getline(fin, line)) { + fprintf(fo, "%s\n", line.c_str()); + } + fin.close(); + sprintf(command, "rm -f %s", inpF); + int status = system(command); + if (status != 0) { fprintf(stderr, "Fail to delete file %s!\n", inpF); exit(-1); } + } + fclose(fo); } template @@ -338,25 +471,42 @@ void writeEstimatedParameters(char* modelF, char* imdName) { int main(int argc, char* argv[]) { if (argc < 7) { - printf("Usage: rsem-run-gibbs reference_name sample_name sampleToken BURNIN CHAINLEN GAP [-q]\n"); + printf("Usage: rsem-run-gibbs reference_name sample_name sampleToken BURNIN NSAMPLES GAP [-p #Threads] [-q]\n"); exit(-1); } BURNIN = atoi(argv[4]); - CHAINLEN = atoi(argv[5]); + NSAMPLES = atoi(argv[5]); GAP = atoi(argv[6]); sprintf(imdName, "%s.temp/%s", argv[2], argv[3]); sprintf(statName, "%s.stat/%s", argv[2], argv[3]); load_data(argv[1], statName, imdName); + nThreads = 1; quiet = false; - if (argc > 7 && !strcmp(argv[7], "-q")) { - quiet = true; + for (int i = 7; i < argc; i++) { + if (!strcmp(argv[i], "-p")) nThreads = atoi(argv[i + 1]); + if (!strcmp(argv[i], "-q")) quiet = true; } verbose = !quiet; + if (nThreads > NSAMPLES) { + nThreads = NSAMPLES; + printf("Warning: Number of samples is less than number of threads! Change the number of threads to %d!\n", nThreads); + } + + if (verbose) printf("Gibbs started!\n"); init(); - Gibbs(imdName); + for (int i = 0; i < nThreads; i++) { + rc = pthread_create(&threads[i], &attr, Gibbs, (void*)(¶ms[i])); + if (rc != 0) { fprintf(stderr, "Cannot create thread %d! (numbered from 0)\n", i); } + } + for (int i = 0; i < nThreads; i++) { + rc = pthread_join(threads[i], &status); + if (rc != 0) { fprintf(stderr, "Cannot join thread %d! (numbered from 0)\n", i); } + } + release(); + if (verbose) printf("Gibbs finished!\n"); sprintf(modelF, "%s.model", statName); FILE *fi = fopen(modelF, "r"); diff --git a/calcCI.cpp b/calcCI.cpp index c9ccce5..f3964d1 100644 --- a/calcCI.cpp +++ b/calcCI.cpp @@ -382,12 +382,14 @@ int main(int argc, char* argv[]) { delete[] tau_denoms; + /* sprintf(command, "rm -f %s", tmpF); int status = system(command); if (status != 0) { fprintf(stderr, "Cannot delete %s!\n", tmpF); exit(-1); } + */ return 0; } diff --git a/makefile b/makefile index 7609459..97cba95 100644 --- a/makefile +++ b/makefile @@ -92,7 +92,7 @@ simulation.o : utils.h Read.h SingleRead.h SingleReadQ.h PairedEndRead.h PairedE $(CC) $(COFLAGS) simulation.cpp rsem-run-gibbs : Gibbs.o - $(CC) -o rsem-run-gibbs Gibbs.o + $(CC) -o rsem-run-gibbs Gibbs.o -lpthread #some header files are omitted Gibbs.o : utils.h Model.h SingleModel.h SingleQModel.h PairedEndModel.h PairedEndQModel.h RefSeq.h RefSeqPolicy.h PolyARules.h Refs.h GroupInfo.h boost/random.hpp Gibbs.cpp diff --git a/rsem-calculate-expression b/rsem-calculate-expression index 746fb7e..68aa258 100755 --- a/rsem-calculate-expression +++ b/rsem-calculate-expression @@ -8,7 +8,7 @@ use strict; #const my $BURNIN = 200; -my $CHAINLEN = 1000; +my $NSAMPLES = 1000; my $SAMPLEGAP = 1; my $CONFIDENCE = 0.95; my $NSPC = 50; @@ -285,7 +285,8 @@ if ($genBamF) { &collectResults("$imdName.gene_res", "$sampleName.genes.results"); # gene level if ($calcCI) { - $command = $dir."rsem-run-gibbs $refName $sampleName $sampleToken $BURNIN $CHAINLEN $SAMPLEGAP"; + $command = $dir."rsem-run-gibbs $refName $sampleName $sampleToken $BURNIN $NSAMPLES $SAMPLEGAP"; + $command .= " -p $nThreads"; if ($quiet) { $command .= " -q"; } print "$command\n"; $status = system($command); -- 2.39.2