]> git.donarmstrong.com Git - rsem.git/blob - calcCI.cpp
rewrote parallelization of calcCI.cpp
[rsem.git] / calcCI.cpp
1 #include<ctime>
2 #include<cstdio>
3 #include<cstring>
4 #include<cstdlib>
5 #include<cassert>
6 #include<fstream>
7 #include<algorithm>
8 #include<vector>
9 #include<pthread.h>
10
11 #include "utils.h"
12 #include "my_assert.h"
13 #include "sampling.h"
14
15 #include "Model.h"
16 #include "SingleModel.h"
17 #include "SingleQModel.h"
18 #include "PairedEndModel.h"
19 #include "PairedEndQModel.h"
20
21 #include "Refs.h"
22 #include "GroupInfo.h"
23
24 #include "Buffer.h"
25 using namespace std;
26
27 struct Params {
28         int no;
29         FILE *fi;
30         bufsize_type size;
31         int sp;
32         engine_type *engine;
33         double *mw;
34 };
35
36 struct CIType {
37         float lb, ub; // the interval is [lb, ub]
38
39         CIType() { lb = ub = 0.0; }
40 };
41
42 struct CIParams {
43         int no;
44         int start_gene_id, end_gene_id;
45 };
46
47 int model_type;
48
49 int nMB;
50 double confidence;
51 int nCV, nSpC, nSamples; // nCV: number of count vectors; nSpC: number of theta vectors sampled per count vector; nSamples: nCV * nSpC
52 int nThreads;
53 int cvlen;
54
55 char cvsF[STRLEN], tmpF[STRLEN], command[STRLEN];
56
57 CIType *iso_tau, *gene_tau;
58
59 int M, m;
60 Refs refs;
61 GroupInfo gi;
62 char imdName[STRLEN], statName[STRLEN];
63 char modelF[STRLEN], groupF[STRLEN], refF[STRLEN];
64
65 vector<double> eel; //expected effective lengths
66
67 Buffer *buffer;
68
69 bool quiet;
70
71 Params *paramsArray;
72 pthread_t *threads;
73 pthread_attr_t attr;
74 void *status;
75 int rc;
76
77 CIParams *ciParamsArray;
78
79 template<class ModelType>
80 void calcExpectedEffectiveLengths(ModelType& model) {
81         int lb, ub, span;
82         double *pdf = NULL, *cdf = NULL, *clen = NULL; // clen[i] = \sigma_{j=1}^{i}pdf[i]*(lb+i)
83   
84         model.getGLD().copyTo(pdf, cdf, lb, ub, span);
85         clen = new double[span + 1];
86         clen[0] = 0.0;
87         for (int i = 1; i <= span; i++) {
88                 clen[i] = clen[i - 1] + pdf[i] * (lb + i);
89         }
90
91         eel.assign(M + 1, 0.0);
92         for (int i = 1; i <= M; i++) {
93                 int totLen = refs.getRef(i).getTotLen();
94                 int fullLen = refs.getRef(i).getFullLen();
95                 int pos1 = max(min(totLen - fullLen + 1, ub) - lb, 0);
96                 int pos2 = max(min(totLen, ub) - lb, 0);
97
98                 if (pos2 == 0) { eel[i] = 0.0; continue; }
99     
100                 eel[i] = fullLen * cdf[pos1] + ((cdf[pos2] - cdf[pos1]) * (totLen + 1) - (clen[pos2] - clen[pos1]));
101                 assert(eel[i] >= 0);
102                 if (eel[i] < MINEEL) { eel[i] = 0.0; }
103         }
104   
105         delete[] pdf;
106         delete[] cdf;
107         delete[] clen;
108 }
109
110 void* sample_theta_from_c(void* arg) {
111
112         int *cvec;
113         double *theta;
114         gamma_dist **gammas;
115         gamma_generator **rgs;
116
117         Params *params = (Params*)arg;
118         FILE *fi = params->fi;
119         double *mw = params->mw;
120         Buffer buffer(params->size, params->sp, nSamples, cvlen, tmpF);
121
122         cvec = new int[cvlen];
123         theta = new double[cvlen];
124         gammas = new gamma_dist*[cvlen];
125         rgs = new gamma_generator*[cvlen];
126
127         float *vec = new float[cvlen];
128
129         int cnt = 0;
130         while (fscanf(fi, "%d", &cvec[0]) == 1) {
131                 for (int j = 1; j < cvlen; j++) assert(fscanf(fi, "%d", &cvec[j]) == 1);
132
133                 ++cnt;
134
135                 for (int j = 0; j < cvlen; j++) {
136                         gammas[j] = new gamma_dist(cvec[j]);
137                         rgs[j] = new gamma_generator(*(params->engine), *gammas[j]);
138                 }
139
140                 for (int i = 0; i < nSpC; i++) {
141                         double sum = 0.0;
142                         for (int j = 0; j < cvlen; j++) {
143                                 theta[j] = ((j == 0 || eel[j] >= EPSILON) ? (*rgs[j])() : 0.0);
144                                 sum += theta[j];
145                         }
146                         assert(sum >= EPSILON);
147                         for (int j = 0; j < cvlen; j++) theta[j] /= sum;
148
149                         sum = 0.0;
150                         for (int j = 0; j < cvlen; j++) {
151                                 theta[j] = (mw[j] < EPSILON ? 0.0 : theta[j] / mw[j]);
152                                 sum += theta[j];
153                         }
154                         assert(sum >= EPSILON);
155                         for (int j = 0; j < cvlen; j++) theta[j] /= sum;
156
157
158                         sum = 0.0;
159                         vec[0] = theta[0];
160                         for (int j = 1; j < cvlen; j++)
161                                 if (eel[j] >= EPSILON) {
162                                         vec[j] = theta[j] / eel[j];
163                                         sum += vec[j];
164                                 }
165                                 else assert(theta[j] < EPSILON);
166                         assert(sum >= EPSILON);
167                         for (int j = 1; j < cvlen; j++) vec[j] /= sum;
168
169                         buffer.write(vec);
170                 }
171
172                 for (int j = 0; j < cvlen; j++) {
173                         delete gammas[j];
174                         delete rgs[j];
175                 }
176
177                 if (verbose && cnt % 100 == 0) { printf("Thread %d, %d count vectors are processed!\n", params->no, cnt); }
178         }
179
180         delete[] cvec;
181         delete[] theta;
182         delete[] gammas;
183         delete[] rgs;
184
185         delete[] vec;
186
187         return NULL;
188 }
189
190 template<class ModelType>
191 void sample_theta_vectors_from_count_vectors() {
192         ModelType model;
193         model.read(modelF);
194         calcExpectedEffectiveLengths<ModelType>(model);
195
196         char splitF[STRLEN];
197         bufsize_type buf_maxcv = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen;
198         bufsize_type quotient, left;
199         int sum = 0;
200
201         sprintf(splitF, "%s.split", imdName);
202         FILE *fi = fopen(splitF, "r");
203         int num_threads;
204         assert(fscanf(fi, "%d", &num_threads) == 1);
205         assert(num_threads <= nThreads);
206
207         quotient = buf_maxcv / num_threads;
208         assert(quotient > 0);
209         left = buf_maxcv % num_threads;
210
211         paramsArray = new Params[num_threads];
212         threads = new pthread_t[num_threads];
213
214         char inpF[STRLEN];
215         for (int i = 0; i < num_threads; i++) {
216                 paramsArray[i].no = i;
217                 sprintf(inpF, "%s%d", cvsF, i);
218                 paramsArray[i].fi = fopen(inpF, "r");
219
220                 int num_samples;
221                 assert(fscanf(fi, "%d", &num_samples) == 1);
222                 num_samples *= nSpC;
223                 if (bufsize_type(num_samples) <= quotient) paramsArray[i].size = num_samples;
224                 else {
225                         paramsArray[i].size = quotient;
226                         if (left > 0) { ++paramsArray[i].size; --left; }
227                 }
228                 paramsArray[i].size *=  cvlen * FLOATSIZE;
229                 paramsArray[i].sp = sum;
230                 sum += num_samples;
231
232                 paramsArray[i].engine = engineFactory::new_engine();
233                 paramsArray[i].mw = model.getMW();
234         }
235
236         fclose(fi);
237
238         /* set thread attribute to be joinable */
239         pthread_attr_init(&attr);
240         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
241
242         for (int i = 0; i < num_threads; i++) {
243                 rc = pthread_create(&threads[i], &attr, &sample_theta_from_c, (void*)(&paramsArray[i]));
244                 pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!");
245         }
246         for (int i = 0; i < num_threads; i++) {
247                 rc = pthread_join(threads[i], &status);
248                 pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!");
249         }
250
251         /* destroy attribute */
252         pthread_attr_destroy(&attr);
253         delete[] threads;
254
255         for (int i = 0; i < num_threads; i++) {
256                 fclose(paramsArray[i].fi);
257                 delete paramsArray[i].engine;
258         }
259         delete[] paramsArray;
260
261         if (verbose) { printf("Sampling is finished!\n"); }
262 }
263
264 void calcCI(int nSamples, float *samples, float &lb, float &ub) {
265         int p, q; // p pointer for lb, q pointer for ub;
266         int newp, newq;
267         int threshold = nSamples - (int(confidence * nSamples - 1e-8) + 1);
268         int nOutside = 0;
269
270         sort(samples, samples + nSamples);
271
272         p = 0; q = nSamples - 1;
273         newq = nSamples - 1;
274         do {
275                 q = newq;
276                 while (newq > 0 && samples[newq - 1] == samples[newq]) newq--;
277                 newq--;
278         } while (newq >= 0 && nSamples - (newq + 1) <= threshold);
279
280         nOutside = nSamples - (q + 1);
281
282         lb = -1e30; ub = 1e30;
283         do {
284                 if (samples[q] - samples[p] < ub - lb) {
285                         lb = samples[p];
286                         ub = samples[q];
287                 }
288
289                 newp = p;
290                 while (newp < nSamples - 1 && samples[newp] == samples[newp + 1]) newp++;
291                 newp++;
292                 if (newp <= threshold) {
293                         nOutside += newp - p;
294                         p = newp;
295                         while (nOutside > threshold && q < nSamples - 1) {
296                                 newq = q + 1;
297                                 while (newq < nSamples - 1 && samples[newq] == samples[newq + 1]) newq++;
298                                 nOutside -= newq - q;
299                                 q = newq;
300                         }
301                         assert(nOutside <= threshold);
302                 }
303                 else p = newp;
304         } while (p <= threshold);
305 }
306
307 void* calcCI_batch(void* arg) {
308         float *itsamples, *gtsamples;
309         ifstream fin;
310         CIParams *ciParams = (CIParams*)arg;
311
312         itsamples = new float[nSamples];
313         gtsamples = new float[nSamples];
314
315         fin.open(tmpF, ios::binary);
316         streampos pos = streampos(gi.spAt(ciParams->start_gene_id)) * nSamples * FLOATSIZE;
317         fin.seekg(pos, ios::beg);
318
319         int cnt = 0;
320         for (int i = ciParams->start_gene_id; i < ciParams->end_gene_id; i++) {
321                 int b = gi.spAt(i), e = gi.spAt(i + 1);
322                 memset(gtsamples, 0, FLOATSIZE * nSamples);
323                 for (int j = b; j < e; j++) {
324                         for (int k = 0; k < nSamples; k++) {
325                                 fin.read((char*)(&itsamples[k]), FLOATSIZE);
326                                 gtsamples[k] += itsamples[k];
327                         }
328                         calcCI(nSamples, itsamples, iso_tau[j].lb, iso_tau[j].ub);
329                 }
330                 calcCI(nSamples, gtsamples, gene_tau[i].lb, gene_tau[i].ub);
331
332                 ++cnt;
333                 if (verbose && cnt % 1000 == 0) { printf("In thread %d, %d genes are processed for CI calculation!\n", ciParams->no, cnt); }
334         }
335
336         fin.close();
337
338         delete[] itsamples;
339         delete[] gtsamples;
340
341         return NULL;
342 }
343
344 void calculate_credibility_intervals(char* imdName) {
345         FILE *fo;
346         char outF[STRLEN];
347
348         iso_tau = new CIType[M + 1];
349         gene_tau = new CIType[m];
350
351         // nThreads must be intact here.
352         assert(M > 0);
353         int quotient = M / nThreads;
354         if (quotient < 1) { nThreads = M; quotient = 1; }
355         int cur_gene_id = 0;
356         int num_isoforms = 0;
357
358         // A just so so strategy for paralleling
359         ciParamsArray = new CIParams[nThreads];
360         for (int i = 0; i < nThreads; i++) {
361                 ciParamsArray[i].no = i;
362                 ciParamsArray[i].start_gene_id = cur_gene_id;
363                 num_isoforms = 0;
364
365                 while ((m - cur_gene_id > nThreads - i - 1) && (i == nThreads - 1 || num_isoforms < quotient)) {
366                         num_isoforms += gi.spAt(cur_gene_id + 1) - gi.spAt(cur_gene_id);
367                         ++cur_gene_id;
368                 }
369
370                 ciParamsArray[i].end_gene_id = cur_gene_id;
371         }
372
373         threads = new pthread_t[nThreads];
374
375         /* set thread attribute to be joinable */
376         pthread_attr_init(&attr);
377         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
378
379         // paralleling
380         for (int i = 0; i < nThreads; i++) {
381                 rc = pthread_create(&threads[i], &attr, &calcCI_batch, (void*)(&ciParamsArray[i]));
382                 pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!");
383         }
384         for (int i = 0; i < nThreads; i++) {
385                 rc = pthread_join(threads[i], &status);
386                 pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!");
387         }
388
389         // releasing resources
390
391         /* destroy attribute */
392         pthread_attr_destroy(&attr);
393         delete[] threads;
394
395         delete[] ciParamsArray;
396
397         //isoform level results
398         sprintf(outF, "%s.iso_res", imdName);
399         fo = fopen(outF, "a");
400         for (int i = 1; i <= M; i++)
401                 fprintf(fo, "%.6g%c", iso_tau[i].lb, (i < M ? '\t' : '\n'));
402         for (int i = 1; i <= M; i++)
403                 fprintf(fo, "%.6g%c", iso_tau[i].ub, (i < M ? '\t' : '\n'));
404         fclose(fo);
405
406         //gene level results
407         sprintf(outF, "%s.gene_res", imdName);
408         fo = fopen(outF, "a");
409         for (int i = 0; i < m; i++)
410                 fprintf(fo, "%.6g%c", gene_tau[i].lb, (i < m - 1 ? '\t' : '\n'));
411         for (int i = 0; i < m; i++)
412                 fprintf(fo, "%.6g%c", gene_tau[i].ub, (i < m - 1 ? '\t' : '\n'));
413         fclose(fo);
414
415         delete[] iso_tau;
416         delete[] gene_tau;
417
418         if (verbose) { printf("All credibility intervals are calculated!\n"); }
419 }
420
421 int main(int argc, char* argv[]) {
422         if (argc < 8) {
423                 printf("Usage: rsem-calculate-credibility-intervals reference_name sample_name sampleToken confidence nCV nSpC nMB [-p #Threads] [-q]\n");
424                 exit(-1);
425         }
426
427         confidence = atof(argv[4]);
428         nCV = atoi(argv[5]);
429         nSpC = atoi(argv[6]);
430         nMB = atoi(argv[7]);
431
432         nThreads = 1;
433         quiet = false;
434         for (int i = 8; i < argc; i++) {
435                 if (!strcmp(argv[i], "-p")) nThreads = atoi(argv[i + 1]);
436                 if (!strcmp(argv[i], "-q")) quiet = true;
437         }
438         verbose = !quiet;
439
440         sprintf(refF, "%s.seq", argv[1]);
441         refs.loadRefs(refF, 1);
442         M = refs.getM();
443         sprintf(groupF, "%s.grp", argv[1]);
444         gi.load(groupF);
445         m = gi.getm();
446
447         nSamples = nCV * nSpC;
448         cvlen = M + 1;
449         assert(nSamples > 0 && cvlen > 1); // for Buffter.h: (bufsize_type)nSamples
450
451         sprintf(imdName, "%s.temp/%s", argv[2], argv[3]);
452         sprintf(statName, "%s.stat/%s", argv[2], argv[3]);
453         sprintf(tmpF, "%s.tmp", imdName);
454         sprintf(cvsF, "%s.countvectors", imdName);
455
456         sprintf(modelF, "%s.model", statName);
457         FILE *fi = fopen(modelF, "r");
458         general_assert(fi != NULL, "Cannot open " + cstrtos(modelF) + "!");
459         assert(fscanf(fi, "%d", &model_type) == 1);
460         fclose(fi);
461
462         // Phase I
463         switch(model_type) {
464         case 0 : sample_theta_vectors_from_count_vectors<SingleModel>(); break;
465         case 1 : sample_theta_vectors_from_count_vectors<SingleQModel>(); break;
466         case 2 : sample_theta_vectors_from_count_vectors<PairedEndModel>(); break;
467         case 3 : sample_theta_vectors_from_count_vectors<PairedEndQModel>(); break;
468         }
469
470         // Phase II
471         calculate_credibility_intervals(imdName);
472
473         /*
474         sprintf(command, "rm -f %s", tmpF);
475         int status = system(command);
476         if (status != 0) {
477                 fprintf(stderr, "Cannot delete %s!\n", tmpF);
478                 exit(-1);
479         }
480         */
481
482         return 0;
483 }