]> git.donarmstrong.com Git - samtools.git/commitdiff
use conditional variable
authorHeng Li <lh3@me.com>
Sun, 18 Mar 2012 17:51:16 +0000 (13:51 -0400)
committerHeng Li <lh3@me.com>
Sun, 18 Mar 2012 17:51:16 +0000 (13:51 -0400)
Makefile
bgzf.c
bgzf.h

index 0d3eb4cde7ca8c894dc2bbe01c70284d09ede3e1..6ddd4ddbd6aac23bd955ed15253081f19cb5fc83 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -51,7 +51,7 @@ razip:razip.o razf.o $(KNETFILE_O)
 bgzip:bgzip.o bgzf.o $(KNETFILE_O)
                $(CC) $(CFLAGS) -o $@ bgzf.o bgzip.o $(KNETFILE_O) -lz -lpthread
 
-bgzf.o:bgzf.h
+bgzf.o:bgzf.c bgzf.h
                $(CC) -c $(CFLAGS) $(DFLAGS) -DBGZF_CACHE $(INCLUDES) bgzf.c -o $@
 
 razip.o:razf.h
diff --git a/bgzf.c b/bgzf.c
index 9de2fc356bd927d85bae6ba30ced1010cff69560..4c38f236f75b840a761a85a78bd19629b8f2cd24 100644 (file)
--- a/bgzf.c
+++ b/bgzf.c
@@ -373,22 +373,51 @@ typedef struct {
        BGZF *fp;
        struct mtaux_t *mt;
        void *buf;
-       int i, errcode;
+       int i, errcode, toproc;
 } worker_t;
 
 typedef struct mtaux_t {
-       int n_threads, n_blks, curr;
+       int n_threads, n_blks, curr, done;
+       volatile int proc_cnt;
        void **blk;
        int *len;
-       pthread_t *tid;
-       pthread_attr_t attr;
        worker_t *w;
+       pthread_t *tid;
+       pthread_mutex_t lock;
+       pthread_cond_t cv;
 } mtaux_t;
 
+static void *mt_worker(void *data)
+{
+       int i, stop = 0;
+       worker_t *w = (worker_t*)data;
+       for (;;) {
+               int tmp;
+               pthread_mutex_lock(&w->mt->lock);
+               while (!w->toproc && !w->mt->done)
+                       pthread_cond_wait(&w->mt->cv, &w->mt->lock);
+               if (w->mt->done) stop = 1;
+               w->toproc = 0;
+               pthread_mutex_unlock(&w->mt->lock);
+               if (stop) break;
+               w->errcode = 0;
+               for (i = w->i; i < w->mt->curr; i += w->mt->n_threads) {
+                       int clen = BGZF_MAX_BLOCK_SIZE;
+                       if (bgzf_compress(w->buf, &clen, w->mt->blk[i], w->mt->len[i], w->fp->compress_level) != 0)
+                               w->errcode |= BGZF_ERR_ZLIB;
+                       memcpy(w->mt->blk[i], w->buf, clen);
+                       w->mt->len[i] = clen;
+               }
+               tmp = __sync_fetch_and_add(&w->mt->proc_cnt, 1);
+       }
+       return 0;
+}
+
 int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
 {
        int i;
        mtaux_t *mt;
+       pthread_attr_t attr;
        if (!fp->is_write || fp->mt || n_threads <= 1) return -1;
        mt = calloc(1, sizeof(mtaux_t));
        mt->n_threads = n_threads;
@@ -405,8 +434,12 @@ int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
                mt->w[i].fp = fp;
                mt->w[i].buf = malloc(BGZF_MAX_BLOCK_SIZE);
        }
-       pthread_attr_init(&mt->attr);
-       pthread_attr_setdetachstate(&mt->attr, PTHREAD_CREATE_JOINABLE);
+       pthread_attr_init(&attr);
+       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+       pthread_mutex_init(&mt->lock, 0);
+       pthread_cond_init(&mt->cv, 0);
+       for (i = 0; i < mt->n_threads; ++i)
+               pthread_create(&mt->tid[i], &attr, mt_worker, &mt->w[i]);
        fp->mt = mt;
        return 0;
 }
@@ -414,27 +447,19 @@ int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
 static void mt_destroy(mtaux_t *mt)
 {
        int i;
+       pthread_mutex_lock(&mt->lock);
+       mt->done = 1; mt->proc_cnt = 0;
+       pthread_cond_broadcast(&mt->cv);
+       pthread_mutex_unlock(&mt->lock);
+       for (i = 0; i < mt->n_threads; ++i) pthread_join(mt->tid[i], 0);
        for (i = 0; i < mt->n_blks; ++i) free(mt->blk[i]);
        for (i = 0; i < mt->n_threads; ++i) free(mt->w[i].buf);
        free(mt->blk); free(mt->len); free(mt->w); free(mt->tid);
+       pthread_cond_destroy(&mt->cv);
+       pthread_mutex_destroy(&mt->lock);
        free(mt);
 }
 
-static void *mt_worker(void *data)
-{
-       int i;
-       worker_t *w = (worker_t*)data;
-       w->errcode = 0;
-       for (i = w->i; i < w->mt->curr; i += w->mt->n_threads) {
-               int clen = BGZF_MAX_BLOCK_SIZE;
-               if (bgzf_compress(w->buf, &clen, w->mt->blk[i], w->mt->len[i], w->fp->compress_level) != 0)
-                       w->errcode |= BGZF_ERR_ZLIB;
-               memcpy(w->mt->blk[i], w->buf, clen);
-               w->mt->len[i] = clen;
-       }
-       return 0;
-}
-
 static void mt_queue(BGZF *fp)
 {
        mtaux_t *mt = (mtaux_t*)fp->mt;
@@ -450,11 +475,13 @@ static int mt_flush(BGZF *fp)
        int i;
        mtaux_t *mt = (mtaux_t*)fp->mt;
        if (fp->block_offset) mt_queue(fp); // guaranteed that assertion does not fail
-       for (i = 0; i < mt->n_threads; ++i) pthread_create(&mt->tid[i], &mt->attr, mt_worker, &mt->w[i]);
-       for (i = 0; i < mt->n_threads; ++i) {
-               pthread_join(mt->tid[i], 0);
-               fp->errcode |= mt->w[i].errcode;
-       }
+       pthread_mutex_lock(&mt->lock);
+       for (i = 0; i < mt->n_threads; ++i) mt->w[i].toproc = 1;
+       mt->proc_cnt = 0;
+       pthread_cond_broadcast(&mt->cv);
+       pthread_mutex_unlock(&mt->lock);
+       while (mt->proc_cnt < mt->n_threads);
+       for (i = 0; i < mt->n_threads; ++i) fp->errcode |= mt->w[i].errcode;
        for (i = 0; i < mt->curr; ++i)
                if (fwrite(mt->blk[i], 1, mt->len[i], fp->fp) != mt->len[i])
                        fp->errcode |= BGZF_ERR_IO;
diff --git a/bgzf.h b/bgzf.h
index 2a70bb902771d937432602045842bf7e0cf1711c..9984d8fe8ea9146330335ac1d4ff5ed9db454ad5 100644 (file)
--- a/bgzf.h
+++ b/bgzf.h
@@ -190,6 +190,13 @@ extern "C" {
         */
        int bgzf_read_block(BGZF *fp);
 
+       /**
+        * Enable multi-threading (only effective on writing)
+        *
+        * @param fp          BGZF file handler; must be opened for writing
+        * @param n_threads   #threads used for writing
+        * @param n_sub_blks  #blocks processed by each thread; a value 64-256 is recommended
+        */
        int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks);
 
 #ifdef __cplusplus