]> git.donarmstrong.com Git - samtools.git/commitdiff
avoid wasting CPU time by the master thread
authorHeng Li <lh3@me.com>
Sun, 18 Mar 2012 18:12:45 +0000 (14:12 -0400)
committerHeng Li <lh3@me.com>
Sun, 18 Mar 2012 18:12:45 +0000 (14:12 -0400)
bgzf.c

diff --git a/bgzf.c b/bgzf.c
index 4c38f236f75b840a761a85a78bd19629b8f2cd24..7254fa21cb96a7b010688e6e6ccb37e4b7b0a474 100644 (file)
--- a/bgzf.c
+++ b/bgzf.c
@@ -387,29 +387,32 @@ typedef struct mtaux_t {
        pthread_cond_t cv;
 } mtaux_t;
 
-static void *mt_worker(void *data)
-{
-       int i, stop = 0;
-       worker_t *w = (worker_t*)data;
-       for (;;) {
-               int tmp;
-               pthread_mutex_lock(&w->mt->lock);
-               while (!w->toproc && !w->mt->done)
-                       pthread_cond_wait(&w->mt->cv, &w->mt->lock);
-               if (w->mt->done) stop = 1;
-               w->toproc = 0;
-               pthread_mutex_unlock(&w->mt->lock);
-               if (stop) break;
-               w->errcode = 0;
-               for (i = w->i; i < w->mt->curr; i += w->mt->n_threads) {
-                       int clen = BGZF_MAX_BLOCK_SIZE;
+static int worker_aux(worker_t *w)
+{
+       int i, tmp, stop = 0;
+       // wait for condition: to process or all done
+       pthread_mutex_lock(&w->mt->lock);
+       while (!w->toproc && !w->mt->done)
+               pthread_cond_wait(&w->mt->cv, &w->mt->lock);
+       if (w->mt->done) stop = 1;
+       w->toproc = 0;
+       pthread_mutex_unlock(&w->mt->lock);
+       if (stop) return 1; // to quit the thread
+       w->errcode = 0;
+       for (i = w->i; i < w->mt->curr; i += w->mt->n_threads) {
+               int clen = BGZF_MAX_BLOCK_SIZE;
                        if (bgzf_compress(w->buf, &clen, w->mt->blk[i], w->mt->len[i], w->fp->compress_level) != 0)
-                               w->errcode |= BGZF_ERR_ZLIB;
-                       memcpy(w->mt->blk[i], w->buf, clen);
-                       w->mt->len[i] = clen;
-               }
-               tmp = __sync_fetch_and_add(&w->mt->proc_cnt, 1);
+                       w->errcode |= BGZF_ERR_ZLIB;
+               memcpy(w->mt->blk[i], w->buf, clen);
+               w->mt->len[i] = clen;
        }
+       tmp = __sync_fetch_and_add(&w->mt->proc_cnt, 1);
+       return 0;
+}
+
+static void *mt_worker(void *data)
+{
+       while (worker_aux(data) == 0);
        return 0;
 }
 
@@ -426,7 +429,7 @@ int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
        mt->blk = calloc(mt->n_blks, sizeof(void*));
        for (i = 0; i < mt->n_blks; ++i)
                mt->blk[i] = malloc(BGZF_MAX_BLOCK_SIZE);
-       mt->tid = calloc(mt->n_threads, sizeof(pthread_t));
+       mt->tid = calloc(mt->n_threads, sizeof(pthread_t)); // tid[0] is not used, as the worker 0 is launched by the master
        mt->w = calloc(mt->n_threads, sizeof(worker_t));
        for (i = 0; i < mt->n_threads; ++i) {
                mt->w[i].i = i;
@@ -438,7 +441,7 @@ int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
        pthread_mutex_init(&mt->lock, 0);
        pthread_cond_init(&mt->cv, 0);
-       for (i = 0; i < mt->n_threads; ++i)
+       for (i = 1; i < mt->n_threads; ++i) // worker 0 is effectively launched by the master thread
                pthread_create(&mt->tid[i], &attr, mt_worker, &mt->w[i]);
        fp->mt = mt;
        return 0;
@@ -447,11 +450,13 @@ int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
 static void mt_destroy(mtaux_t *mt)
 {
        int i;
+       // signal all workers to quit
        pthread_mutex_lock(&mt->lock);
        mt->done = 1; mt->proc_cnt = 0;
        pthread_cond_broadcast(&mt->cv);
        pthread_mutex_unlock(&mt->lock);
-       for (i = 0; i < mt->n_threads; ++i) pthread_join(mt->tid[i], 0);
+       for (i = 1; i < mt->n_threads; ++i) pthread_join(mt->tid[i], 0); // worker 0 is effectively launched by the master thread
+       // free other data allocated on heap
        for (i = 0; i < mt->n_blks; ++i) free(mt->blk[i]);
        for (i = 0; i < mt->n_threads; ++i) free(mt->w[i].buf);
        free(mt->blk); free(mt->len); free(mt->w); free(mt->tid);
@@ -475,12 +480,17 @@ static int mt_flush(BGZF *fp)
        int i;
        mtaux_t *mt = (mtaux_t*)fp->mt;
        if (fp->block_offset) mt_queue(fp); // guaranteed that assertion does not fail
+       // signal all the workers to compress
        pthread_mutex_lock(&mt->lock);
        for (i = 0; i < mt->n_threads; ++i) mt->w[i].toproc = 1;
        mt->proc_cnt = 0;
        pthread_cond_broadcast(&mt->cv);
        pthread_mutex_unlock(&mt->lock);
+       // worker 0 is doing things here
+       worker_aux(&mt->w[0]);
+       // wait for all the threads to complete
        while (mt->proc_cnt < mt->n_threads);
+       // dump data to disk
        for (i = 0; i < mt->n_threads; ++i) fp->errcode |= mt->w[i].errcode;
        for (i = 0; i < mt->curr; ++i)
                if (fwrite(mt->blk[i], 1, mt->len[i], fp->fp) != mt->len[i])