#include<cstdio>
#include<fstream>
+#include<pthread.h>
+
+#include "my_assert.h"
typedef unsigned long long bufsize_type;
const int FLOATSIZE = sizeof(float);
class Buffer {
public:
- Buffer(bufsize_type size, int sp, int nSamples, int cvlen, const char* tmpF) {
+ Buffer(int nMB, int nSamples, int cvlen, const char* tmpF) {
cpos = 0;
- this->size = size;
+ size = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen;
+ if (size > (bufsize_type)nSamples) size = nSamples;
+ general_assert(size > 0, "Memory allocated for credibility intervals is not enough!");
+ size *= cvlen;
+
buffer = new float[size];
ftmpOut.open(tmpF, std::ios::binary);
+ pthread_mutex_init(&lock, NULL);
- fr = to = sp;
+ fr = to = 0;
this->nSamples = nSamples;
this->cvlen = cvlen;
-
}
~Buffer() {
if (fr < to) flushToTempFile();
delete[] buffer;
+ pthread_mutex_destroy(&lock);
ftmpOut.close();
}
- void write(float *vec) {
- if (size - cpos < bufsize_type(cvlen)) flushToTempFile();
- memcpy(buffer + cpos, vec, FLOATSIZE * cvlen);
- cpos += cvlen;
- ++to;
+ void write(int n, float **vecs) {
+ pthread_assert(pthread_mutex_lock(&lock), "pthread_mutex_lock", "Error occurred while acquiring the lock!");
+ for (int i = 0; i < n; i++) {
+ if (size - cpos < bufsize_type(cvlen)) flushToTempFile();
+ memcpy(buffer + cpos, vecs[i], FLOATSIZE * cvlen);
+ cpos += cvlen;
+ ++to;
+ }
+ pthread_assert(pthread_mutex_unlock(&lock), "pthread_mutex_unlock", "Error occurred while releasing the lock!");
}
private:
float *buffer;
std::ofstream ftmpOut;
+ pthread_mutex_t lock;
int fr, to; // each flush, sample fr .. to - 1
int nSamples, cvlen;
std::streampos gap2 = std::streampos(nSamples - to) * FLOATSIZE;
float *p = NULL;
- ftmpOut.seekp(0, std::ios::beg);
+ ftmpOut.seekp(0, std::ios_base::beg);
for (int i = 0; i < cvlen; i++) {
p = buffer + i;
- ftmpOut.seekp(gap1, std::ios::cur);
+ ftmpOut.seekp(gap1, std::ios_base::cur);
for (int j = fr; j < to; j++) {
ftmpOut.write((char*)p, FLOATSIZE);
p += cvlen;
}
- ftmpOut.seekp(gap2, std::ios::cur);
+ ftmpOut.seekp(gap2, std::ios_base::cur);
}
cpos = 0;
struct Params {
int no;
FILE *fi;
- bufsize_type size;
- int sp;
engine_type *engine;
double *mw;
};
Params *params = (Params*)arg;
FILE *fi = params->fi;
double *mw = params->mw;
- Buffer buffer(params->size, params->sp, nSamples, cvlen, tmpF);
cvec = new int[cvlen];
theta = new double[cvlen];
gammas = new gamma_dist*[cvlen];
rgs = new gamma_generator*[cvlen];
- float *vec = new float[cvlen];
+ float **vecs = new float*[nSpC];
+ for (int i = 0; i < nSpC; i++) vecs[i] = new float[cvlen];
int cnt = 0;
while (fscanf(fi, "%d", &cvec[0]) == 1) {
sum = 0.0;
- vec[0] = theta[0];
+ vecs[i][0] = theta[0];
for (int j = 1; j < cvlen; j++)
if (eel[j] >= EPSILON) {
- vec[j] = theta[j] / eel[j];
- sum += vec[j];
+ vecs[i][j] = theta[j] / eel[j];
+ sum += vecs[i][j];
}
else assert(theta[j] < EPSILON);
- assert(sum >= EPSILON);
- for (int j = 1; j < cvlen; j++) vec[j] /= sum;
- buffer.write(vec);
+ assert(sum >= EPSILON);
+ for (int j = 1; j < cvlen; j++) vecs[i][j] /= sum;
}
+ buffer->write(nSpC, vecs);
+
for (int j = 0; j < cvlen; j++) {
delete gammas[j];
delete rgs[j];
delete[] gammas;
delete[] rgs;
- delete[] vec;
+ for (int i = 0; i < nSpC; i++) delete[] vecs[i];
+ delete[] vecs;
return NULL;
}
model.read(modelF);
calcExpectedEffectiveLengths<ModelType>(model);
- char splitF[STRLEN];
- bufsize_type buf_maxcv = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen;
- bufsize_type quotient, left;
- int sum = 0;
- sprintf(splitF, "%s.split", imdName);
- FILE *fi = fopen(splitF, "r");
- int num_threads;
- assert(fscanf(fi, "%d", &num_threads) == 1);
- assert(num_threads <= nThreads);
+ int num_threads = min(nThreads, nCV);
- quotient = buf_maxcv / num_threads;
- assert(quotient > 0);
- left = buf_maxcv % num_threads;
+ buffer = new Buffer(nMB, nSamples, cvlen, tmpF);
paramsArray = new Params[num_threads];
threads = new pthread_t[num_threads];
paramsArray[i].no = i;
sprintf(inpF, "%s%d", cvsF, i);
paramsArray[i].fi = fopen(inpF, "r");
-
- int num_samples;
- assert(fscanf(fi, "%d", &num_samples) == 1);
- num_samples *= nSpC;
- if (bufsize_type(num_samples) <= quotient) paramsArray[i].size = num_samples;
- else {
- paramsArray[i].size = quotient;
- if (left > 0) { ++paramsArray[i].size; --left; }
- }
- paramsArray[i].size *= cvlen * FLOATSIZE;
- paramsArray[i].sp = sum;
- sum += num_samples;
-
paramsArray[i].engine = engineFactory::new_engine();
paramsArray[i].mw = model.getMW();
}
- fclose(fi);
-
/* set thread attribute to be joinable */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
}
delete[] paramsArray;
+ delete buffer; // Must delete here, force the content left in the buffer be written into the disk
+
if (verbose) { printf("Sampling is finished!\n"); }
}
void calculate_credibility_intervals(char* imdName) {
FILE *fo;
char outF[STRLEN];
+ int num_threads = nThreads;
iso_tau = new CIType[M + 1];
gene_tau = new CIType[m];
- // nThreads must be intact here.
assert(M > 0);
- int quotient = M / nThreads;
- if (quotient < 1) { nThreads = M; quotient = 1; }
+ int quotient = M / num_threads;
+ if (quotient < 1) { num_threads = M; quotient = 1; }
int cur_gene_id = 0;
int num_isoforms = 0;
// A just so so strategy for paralleling
- ciParamsArray = new CIParams[nThreads];
- for (int i = 0; i < nThreads; i++) {
+ ciParamsArray = new CIParams[num_threads];
+ for (int i = 0; i < num_threads; i++) {
ciParamsArray[i].no = i;
ciParamsArray[i].start_gene_id = cur_gene_id;
num_isoforms = 0;
- while ((m - cur_gene_id > nThreads - i - 1) && (i == nThreads - 1 || num_isoforms < quotient)) {
+ while ((m - cur_gene_id > num_threads - i - 1) && (i == num_threads - 1 || num_isoforms < quotient)) {
num_isoforms += gi.spAt(cur_gene_id + 1) - gi.spAt(cur_gene_id);
++cur_gene_id;
}
ciParamsArray[i].end_gene_id = cur_gene_id;
}
- threads = new pthread_t[nThreads];
+ threads = new pthread_t[num_threads];
/* set thread attribute to be joinable */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
// paralleling
- for (int i = 0; i < nThreads; i++) {
+ for (int i = 0; i < num_threads; i++) {
rc = pthread_create(&threads[i], &attr, &calcCI_batch, (void*)(&ciParamsArray[i]));
pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!");
}
- for (int i = 0; i < nThreads; i++) {
+ for (int i = 0; i < num_threads; i++) {
rc = pthread_join(threads[i], &status);
pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!");
}