From e50413dd979a616b604914afd2488a308359ec6a Mon Sep 17 00:00:00 2001 From: Heng Li Date: Sun, 18 Mar 2012 01:23:55 -0400 Subject: [PATCH] preliminary implementation; not tested --- bgzf.c | 123 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- bgzf.h | 6 ++- 2 files changed, 122 insertions(+), 7 deletions(-) diff --git a/bgzf.c b/bgzf.c index 9badafb..566a1e3 100644 --- a/bgzf.c +++ b/bgzf.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include "bgzf.h" @@ -366,9 +367,118 @@ ssize_t bgzf_read(BGZF *fp, void *data, ssize_t length) return bytes_read; } +/***** BEGIN: multi-threading *****/ + +typedef struct { + BGZF *fp; + struct mtaux_t *mt; + void *buf; + int i, errcode; +} worker_t; + +typedef struct mtaux_t { + int n_threads, n_blks, curr; + void **blk; + int *len; + pthread_t *tid; + pthread_attr_t attr; + worker_t *w; +} mtaux_t; + +void bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks) +{ + int i; + mtaux_t *mt; + if (!fp->is_write || fp->mt || n_threads <= 1) return; + mt = calloc(1, sizeof(mtaux_t)); + mt->n_threads = n_threads; + mt->n_blks = n_threads * n_sub_blks; + mt->blk = calloc(mt->n_blks, sizeof(void*)); + for (i = 0; i < mt->n_blks; ++i) + mt->blk[i] = malloc(BGZF_MAX_BLOCK_SIZE); + mt->tid = calloc(mt->n_threads, sizeof(pthread_t)); + mt->w = calloc(mt->n_threads, sizeof(worker_t)); + for (i = 0; i < mt->n_threads; ++i) { + mt->w[i].i = i; + mt->w[i].mt = mt; + mt->w[i].fp = fp; + mt->w[i].buf = malloc(BGZF_MAX_BLOCK_SIZE); + } + pthread_attr_init(&mt->attr); + pthread_attr_setdetachstate(&mt->attr, PTHREAD_CREATE_JOINABLE); + fp->mt = mt; +} + +static void mt_destroy(mtaux_t *mt) +{ + int i; + for (i = 0; i < mt->n_blks; ++i) free(mt->blk[i]); + for (i = 0; i < mt->n_threads; ++i) free(mt->w[i].buf); + free(mt->blk); free(mt->len); free(mt->w); free(mt->tid); + free(mt); +} + +static void *mt_worker(void *data) +{ + int i; + worker_t *w = (worker_t*)data; + w->errcode = 0; + for (i = w->i; i < w->mt->curr; i += w->mt->n_threads) { + int clen = BGZF_MAX_BLOCK_SIZE; + if (bgzf_compress(w->buf, &clen, w->mt->blk[i], w->mt->len[i], w->fp->compress_level) != 0) + w->errcode |= BGZF_ERR_ZLIB; + memcpy(w->mt->blk[i], w->buf, clen); + w->mt->len[i] = clen; + } + return 0; +} + +static int mt_flush(BGZF *fp) +{ + int i; + mtaux_t *mt = (mtaux_t*)fp->mt; + if (mt->curr == 0) return 0; + for (i = 0; i < mt->n_threads; ++i) pthread_create(&mt->tid[i], &mt->attr, mt_worker, &mt->w[i]); + for (i = 0; i < mt->n_threads; ++i) { + pthread_join(mt->tid[i], 0); + fp->errcode |= mt->w[i].errcode; + } + for (i = 0; i < mt->curr; ++i) + if (fwrite(mt->blk[i], 1, mt->len[i], fp->fp) != mt->len[i]) + fp->errcode |= BGZF_ERR_IO; + mt->curr = 0; + return 0; +} + +static int mt_push_blk(BGZF *fp) +{ + mtaux_t *mt = (mtaux_t*)fp->mt; + memcpy(mt->blk[mt->curr], fp->uncompressed_block, fp->block_offset); + mt->len[mt->curr] = fp->block_offset; + fp->block_offset = 0; + if (++mt->curr == mt->n_blks) mt_flush(fp); + return 0; +} + +static ssize_t mt_write(BGZF *fp, const void *data, ssize_t length) +{ + const uint8_t *input = data; + ssize_t rest = length; + while (rest) { + int copy_length = BGZF_BLOCK_SIZE - fp->block_offset < rest? BGZF_BLOCK_SIZE - fp->block_offset : rest; + memcpy(fp->uncompressed_block + fp->block_offset, input, copy_length); + fp->block_offset += copy_length; input += copy_length; rest -= copy_length; + if (fp->block_offset == BGZF_BLOCK_SIZE) mt_push_blk(fp); + } + return length - rest; +} + +/***** END: multi-threading *****/ + int bgzf_flush(BGZF *fp) { if (!fp->is_write) return 0; + if (fp->mt) return mt_flush(fp); while (fp->block_offset > 0) { int block_length; block_length = deflate_block(fp, fp->block_offset); @@ -384,18 +494,19 @@ int bgzf_flush(BGZF *fp) int bgzf_flush_try(BGZF *fp, ssize_t size) { - if (fp->block_offset + size > BGZF_BLOCK_SIZE) - return bgzf_flush(fp); + if (fp->block_offset + size > BGZF_BLOCK_SIZE) { + if (fp->mt) return mt_push_blk(fp); + else return bgzf_flush(fp); + } return -1; } ssize_t bgzf_write(BGZF *fp, const void *data, ssize_t length) { const uint8_t *input = data; - int block_length = BGZF_BLOCK_SIZE, bytes_written; + int block_length = BGZF_BLOCK_SIZE, bytes_written = 0; assert(fp->is_write); - input = data; - bytes_written = 0; + if (fp->mt) return mt_write(fp, data, length); while (bytes_written < length) { uint8_t* buffer = fp->uncompressed_block; int copy_length = block_length - fp->block_offset < length - bytes_written? block_length - fp->block_offset : length - bytes_written; @@ -414,12 +525,14 @@ int bgzf_close(BGZF* fp) if (fp == 0) return -1; if (fp->is_write) { if (bgzf_flush(fp) != 0) return -1; + fp->compress_level = -1; block_length = deflate_block(fp, 0); // write an empty block count = fwrite(fp->compressed_block, 1, block_length, fp->fp); if (fflush(fp->fp) != 0) { fp->errcode |= BGZF_ERR_IO; return -1; } + if (fp->mt) mt_destroy(fp->mt); } ret = fp->is_write? fclose(fp->fp) : _bgzf_close(fp->fp); if (ret != 0) return -1; diff --git a/bgzf.h b/bgzf.h index 4f69225..7d928a7 100644 --- a/bgzf.h +++ b/bgzf.h @@ -41,14 +41,14 @@ #define BGZF_ERR_MISUSE 8 typedef struct { - int errcode:30, is_write:2; - int compress_level, n_threads; + int errcode:16, is_write:2, compress_level:14; int cache_size; int block_length, block_offset; int64_t block_address; void *uncompressed_block, *compressed_block; void *cache; // a pointer to a hash table void *fp; // actual file handler; FILE* on writing; FILE* or knetFile* on reading + void *mt; // only used for multi-threading } BGZF; #ifndef KSTRING_T @@ -190,6 +190,8 @@ extern "C" { */ int bgzf_read_block(BGZF *fp); + void bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks); + #ifdef __cplusplus } #endif -- 2.39.2