11 #include "my_assert.h"
15 #include "SingleModel.h"
16 #include "SingleQModel.h"
17 #include "PairedEndModel.h"
18 #include "PairedEndQModel.h"
22 #include "GroupInfo.h"
23 #include "WriteResults.h"
31 double *pme_c, *pve_c; //posterior mean and variance vectors on counts
32 double *pme_tpm, *pme_fpkm;
40 Item(int sid, double conprb) {
42 this->conprb = conprb;
53 int BURNIN, NSAMPLES, GAP;
54 char refName[STRLEN], imdName[STRLEN], statName[STRLEN];
55 char thetaF[STRLEN], ofgF[STRLEN], refF[STRLEN], modelF[STRLEN];
60 vector<HIT_INT_TYPE> s;
66 vector<double> pme_c, pve_c; //global posterior mean and variance vectors on counts
67 vector<double> pme_tpm, pme_fpkm;
80 void load_data(char* refName, char* statName, char* imdName) {
86 sprintf(refF, "%s.seq", refName);
87 refs.loadRefs(refF, 1);
91 sprintf(ofgF, "%s.ofg", imdName);
93 general_assert(fin.is_open(), "Cannot open " + cstrtos(ofgF) + "!");
95 general_assert(tmpVal == M, "M in " + cstrtos(ofgF) + " is not consistent with " + cstrtos(refF) + "!");
98 s.clear(); hits.clear();
100 while (getline(fin, line)) {
101 istringstream strin(line);
105 while (strin>>sid>>conprb) {
106 hits.push_back(Item(sid, conprb));
108 s.push_back(hits.size());
115 totc = N0 + N1 + (M + 1);
117 if (verbose) { printf("Loading Data is finished!\n"); }
120 template<class ModelType>
121 void init_model_related(char* modelF) {
125 calcExpectedEffectiveLengths<ModelType>(M, refs, model, eel);
126 memcpy(mw, model.getMW(), sizeof(double) * (M + 1)); // otherwise, after exiting this procedure, mw becomes undefined
134 quotient = NSAMPLES / nThreads;
135 left = NSAMPLES % nThreads;
137 sprintf(cvsF, "%s.countvectors", imdName);
138 paramsArray = new Params[nThreads];
139 threads = new pthread_t[nThreads];
141 hasSeed ? engineFactory::init(seed) : engineFactory::init();
142 for (int i = 0; i < nThreads; i++) {
143 paramsArray[i].no = i;
145 paramsArray[i].nsamples = quotient;
146 if (i < left) paramsArray[i].nsamples++;
148 sprintf(outF, "%s%d", cvsF, i);
149 paramsArray[i].fo = fopen(outF, "w");
151 paramsArray[i].engine = engineFactory::new_engine();
152 paramsArray[i].pme_c = new double[M + 1];
153 memset(paramsArray[i].pme_c, 0, sizeof(double) * (M + 1));
154 paramsArray[i].pve_c = new double[M + 1];
155 memset(paramsArray[i].pve_c, 0, sizeof(double) * (M + 1));
156 paramsArray[i].pme_tpm = new double[M + 1];
157 memset(paramsArray[i].pme_tpm, 0, sizeof(double) * (M + 1));
158 paramsArray[i].pme_fpkm = new double[M + 1];
159 memset(paramsArray[i].pme_fpkm, 0, sizeof(double) * (M + 1));
161 engineFactory::finish();
163 /* set thread attribute to be joinable */
164 pthread_attr_init(&attr);
165 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
167 if (verbose) { printf("Initialization finished!\n"); }
170 //sample theta from Dir(1)
171 void sampleTheta(engine_type& engine, vector<double>& theta) {
173 gamma_generator gmg(engine, gm);
176 theta.assign(M + 1, 0);
178 for (int i = 0; i <= M; i++) {
182 assert(denom > EPSILON);
183 for (int i = 0; i <= M; i++) theta[i] /= denom;
186 void writeCountVector(FILE* fo, vector<int>& counts) {
187 for (int i = 0; i < M; i++) {
188 fprintf(fo, "%d ", counts[i]);
190 fprintf(fo, "%d\n", counts[M]);
193 void* Gibbs(void* arg) {
195 HIT_INT_TYPE len, fr, to;
196 Params *params = (Params*)arg;
198 vector<double> theta, tpm, fpkm;
199 vector<int> z, counts;
202 uniform01 rg(*params->engine);
204 // generate initial state
205 sampleTheta(*params->engine, theta);
209 counts.assign(M + 1, 1); // 1 pseudo count
212 for (READ_INT_TYPE i = 0; i < N1; i++) {
213 fr = s[i]; to = s[i + 1];
216 for (HIT_INT_TYPE j = fr; j < to; j++) {
217 arr[j - fr] = theta[hits[j].sid] * hits[j].conprb;
218 if (j > fr) arr[j - fr] += arr[j - fr - 1]; // cumulative
220 z[i] = hits[fr + sample(rg, arr, len)].sid;
225 CHAINLEN = 1 + (params->nsamples - 1) * GAP;
226 for (int ROUND = 1; ROUND <= BURNIN + CHAINLEN; ROUND++) {
228 for (READ_INT_TYPE i = 0; i < N1; i++) {
230 fr = s[i]; to = s[i + 1]; len = to - fr;
232 for (HIT_INT_TYPE j = fr; j < to; j++) {
233 arr[j - fr] = counts[hits[j].sid] * hits[j].conprb;
234 if (j > fr) arr[j - fr] += arr[j - fr - 1]; //cumulative
236 z[i] = hits[fr + sample(rg, arr, len)].sid;
240 if (ROUND > BURNIN) {
241 if ((ROUND - BURNIN - 1) % GAP == 0) {
242 writeCountVector(params->fo, counts);
243 for (int i = 0; i <= M; i++) theta[i] = counts[i] / totc;
244 polishTheta(M, theta, eel, mw);
245 calcExpressionValues(M, theta, eel, tpm, fpkm);
246 for (int i = 0; i <= M; i++) {
247 params->pme_c[i] += counts[i] - 1;
248 params->pve_c[i] += (counts[i] - 1) * (counts[i] - 1);
249 params->pme_tpm[i] += tpm[i];
250 params->pme_fpkm[i] += fpkm[i];
255 if (verbose && ROUND % 100 == 0) { printf("Thread %d, ROUND %d is finished!\n", params->no, ROUND); }
262 // char inpF[STRLEN], command[STRLEN];
265 /* destroy attribute */
266 pthread_attr_destroy(&attr);
269 pme_c.assign(M + 1, 0);
270 pve_c.assign(M + 1, 0);
271 pme_tpm.assign(M + 1, 0);
272 pme_fpkm.assign(M + 1, 0);
273 for (int i = 0; i < nThreads; i++) {
274 fclose(paramsArray[i].fo);
275 delete paramsArray[i].engine;
276 for (int j = 0; j <= M; j++) {
277 pme_c[j] += paramsArray[i].pme_c[j];
278 pve_c[j] += paramsArray[i].pve_c[j];
279 pme_tpm[j] += paramsArray[i].pme_tpm[j];
280 pme_fpkm[j] += paramsArray[i].pme_fpkm[j];
282 delete[] paramsArray[i].pme_c;
283 delete[] paramsArray[i].pve_c;
284 delete[] paramsArray[i].pme_tpm;
285 delete[] paramsArray[i].pme_fpkm;
287 delete[] paramsArray;
290 for (int i = 0; i <= M; i++) {
291 pme_c[i] /= NSAMPLES;
292 pve_c[i] = (pve_c[i] - NSAMPLES * pme_c[i] * pme_c[i]) / (NSAMPLES - 1);
293 pme_tpm[i] /= NSAMPLES;
294 pme_fpkm[i] /= NSAMPLES;
298 int main(int argc, char* argv[]) {
300 printf("Usage: rsem-run-gibbs reference_name imdName statName BURNIN NSAMPLES GAP [-p #Threads] [--var] [--seed seed] [-q]\n");
304 strcpy(refName, argv[1]);
305 strcpy(imdName, argv[2]);
306 strcpy(statName, argv[3]);
308 BURNIN = atoi(argv[4]);
309 NSAMPLES = atoi(argv[5]);
317 for (int i = 7; i < argc; i++) {
318 if (!strcmp(argv[i], "-p")) nThreads = atoi(argv[i + 1]);
319 if (!strcmp(argv[i], "--var")) var_opt = true;
320 if (!strcmp(argv[i], "--seed")) {
322 int len = strlen(argv[i + 1]);
324 for (int k = 0; k < len; k++) seed = seed * 10 + (argv[i + 1][k] - '0');
326 if (!strcmp(argv[i], "-q")) quiet = true;
330 assert(NSAMPLES > 1); // Otherwise, we cannot calculate posterior variance
332 if (nThreads > NSAMPLES) {
334 printf("Warning: Number of samples is less than number of threads! Change the number of threads to %d!\n", nThreads);
337 load_data(refName, statName, imdName);
339 sprintf(modelF, "%s.model", statName);
340 FILE *fi = fopen(modelF, "r");
341 general_assert(fi != NULL, "Cannot open " + cstrtos(modelF) + "!");
342 assert(fscanf(fi, "%d", &model_type) == 1);
345 mw = new double[M + 1]; // make an extra copy
348 case 0 : init_model_related<SingleModel>(modelF); break;
349 case 1 : init_model_related<SingleQModel>(modelF); break;
350 case 2 : init_model_related<PairedEndModel>(modelF); break;
351 case 3 : init_model_related<PairedEndQModel>(modelF); break;
354 if (verbose) printf("Gibbs started!\n");
357 for (int i = 0; i < nThreads; i++) {
358 rc = pthread_create(&threads[i], &attr, Gibbs, (void*)(¶msArray[i]));
359 pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0)!");
361 for (int i = 0; i < nThreads; i++) {
362 rc = pthread_join(threads[i], NULL);
363 pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0)!");
367 if (verbose) printf("Gibbs finished!\n");
369 writeResultsGibbs(M, refName, imdName, pme_c, pme_fpkm, pme_tpm);
378 sprintf(groupF, "%s.grp", refName);
382 sprintf(varF, "%s.var", statName);
383 FILE *fo = fopen(varF, "w");
384 general_assert(fo != NULL, "Cannot open " + cstrtos(varF) + "!");
385 for (int i = 0; i < m; i++) {
386 int b = gi.spAt(i), e = gi.spAt(i + 1), number_of_isoforms = e - b;
387 for (int j = b; j < e; j++) {
388 fprintf(fo, "%s\t%d\t%.15g\t%.15g\n", refs.getRef(j).getName().c_str(), number_of_isoforms, pme_c[j], pve_c[j]);
394 delete mw; // delete the copy