X-Git-Url: https://git.donarmstrong.com/?p=samtools.git;a=blobdiff_plain;f=bgzf.c;h=880d5afcbabee58a68fa8a998e68bc5509c6462f;hp=9badafb62394fe6f9659a1a386677071e3aa72d1;hb=307c147168f7154e3755712797078c513e0b242a;hpb=4e0a2f896484bc0792e81b7036a17a3f4b15d2dd diff --git a/bgzf.c b/bgzf.c index 9badafb..880d5af 100644 --- a/bgzf.c +++ b/bgzf.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include "bgzf.h" @@ -218,7 +219,7 @@ static int inflate_block(BGZF* fp, int block_length) zs.next_in = fp->compressed_block + 18; zs.avail_in = block_length - 16; zs.next_out = fp->uncompressed_block; - zs.avail_out = BGZF_BLOCK_SIZE; + zs.avail_out = BGZF_MAX_BLOCK_SIZE; if (inflateInit2(&zs, -15) != Z_OK) { fp->errcode |= BGZF_ERR_ZLIB; @@ -266,7 +267,7 @@ static int load_block_from_cache(BGZF *fp, int64_t block_address) if (fp->block_length != 0) fp->block_offset = 0; fp->block_address = block_address; fp->block_length = p->size; - memcpy(fp->uncompressed_block, p->block, BGZF_BLOCK_SIZE); + memcpy(fp->uncompressed_block, p->block, BGZF_MAX_BLOCK_SIZE); _bgzf_seek((_bgzf_file_t)fp->fp, p->end_offset, SEEK_SET); return p->size; } @@ -277,8 +278,8 @@ static void cache_block(BGZF *fp, int size) khint_t k; cache_t *p; khash_t(cache) *h = (khash_t(cache)*)fp->cache; - if (BGZF_BLOCK_SIZE >= fp->cache_size) return; - if ((kh_size(h) + 1) * BGZF_BLOCK_SIZE > fp->cache_size) { + if (BGZF_MAX_BLOCK_SIZE >= fp->cache_size) return; + if ((kh_size(h) + 1) * BGZF_MAX_BLOCK_SIZE > fp->cache_size) { /* A better way would be to remove the oldest block in the * cache, but here we remove a random one for simplicity. This * should not have a big impact on performance. */ @@ -294,8 +295,8 @@ static void cache_block(BGZF *fp, int size) p = &kh_val(h, k); p->size = fp->block_length; p->end_offset = fp->block_address + size; - p->block = malloc(BGZF_BLOCK_SIZE); - memcpy(kh_val(h, k).block, fp->uncompressed_block, BGZF_BLOCK_SIZE); + p->block = malloc(BGZF_MAX_BLOCK_SIZE); + memcpy(kh_val(h, k).block, fp->uncompressed_block, BGZF_MAX_BLOCK_SIZE); } #else static void free_cache(BGZF *fp) {} @@ -309,7 +310,7 @@ int bgzf_read_block(BGZF *fp) int count, size = 0, block_length, remaining; int64_t block_address; block_address = _bgzf_tell((_bgzf_file_t)fp->fp); - if (load_block_from_cache(fp, block_address)) return 0; + if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0; count = _bgzf_read(fp->fp, header, sizeof(header)); if (count == 0) { // no data read fp->block_length = 0; @@ -366,9 +367,166 @@ ssize_t bgzf_read(BGZF *fp, void *data, ssize_t length) return bytes_read; } +/***** BEGIN: multi-threading *****/ + +typedef struct { + BGZF *fp; + struct mtaux_t *mt; + void *buf; + int i, errcode, toproc; +} worker_t; + +typedef struct mtaux_t { + int n_threads, n_blks, curr, done; + volatile int proc_cnt; + void **blk; + int *len; + worker_t *w; + pthread_t *tid; + pthread_mutex_t lock; + pthread_cond_t cv; +} mtaux_t; + +static int worker_aux(worker_t *w) +{ + int i, tmp, stop = 0; + // wait for condition: to process or all done + pthread_mutex_lock(&w->mt->lock); + while (!w->toproc && !w->mt->done) + pthread_cond_wait(&w->mt->cv, &w->mt->lock); + if (w->mt->done) stop = 1; + w->toproc = 0; + pthread_mutex_unlock(&w->mt->lock); + if (stop) return 1; // to quit the thread + w->errcode = 0; + for (i = w->i; i < w->mt->curr; i += w->mt->n_threads) { + int clen = BGZF_MAX_BLOCK_SIZE; + if (bgzf_compress(w->buf, &clen, w->mt->blk[i], w->mt->len[i], w->fp->compress_level) != 0) + w->errcode |= BGZF_ERR_ZLIB; + memcpy(w->mt->blk[i], w->buf, clen); + w->mt->len[i] = clen; + } + tmp = __sync_fetch_and_add(&w->mt->proc_cnt, 1); + return 0; +} + +static void *mt_worker(void *data) +{ + while (worker_aux(data) == 0); + return 0; +} + +int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks) +{ + int i; + mtaux_t *mt; + pthread_attr_t attr; + if (!fp->is_write || fp->mt || n_threads <= 1) return -1; + mt = calloc(1, sizeof(mtaux_t)); + mt->n_threads = n_threads; + mt->n_blks = n_threads * n_sub_blks; + mt->len = calloc(mt->n_blks, sizeof(int)); + mt->blk = calloc(mt->n_blks, sizeof(void*)); + for (i = 0; i < mt->n_blks; ++i) + mt->blk[i] = malloc(BGZF_MAX_BLOCK_SIZE); + mt->tid = calloc(mt->n_threads, sizeof(pthread_t)); // tid[0] is not used, as the worker 0 is launched by the master + mt->w = calloc(mt->n_threads, sizeof(worker_t)); + for (i = 0; i < mt->n_threads; ++i) { + mt->w[i].i = i; + mt->w[i].mt = mt; + mt->w[i].fp = fp; + mt->w[i].buf = malloc(BGZF_MAX_BLOCK_SIZE); + } + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + pthread_mutex_init(&mt->lock, 0); + pthread_cond_init(&mt->cv, 0); + for (i = 1; i < mt->n_threads; ++i) // worker 0 is effectively launched by the master thread + pthread_create(&mt->tid[i], &attr, mt_worker, &mt->w[i]); + fp->mt = mt; + return 0; +} + +static void mt_destroy(mtaux_t *mt) +{ + int i; + // signal all workers to quit + pthread_mutex_lock(&mt->lock); + mt->done = 1; mt->proc_cnt = 0; + pthread_cond_broadcast(&mt->cv); + pthread_mutex_unlock(&mt->lock); + for (i = 1; i < mt->n_threads; ++i) pthread_join(mt->tid[i], 0); // worker 0 is effectively launched by the master thread + // free other data allocated on heap + for (i = 0; i < mt->n_blks; ++i) free(mt->blk[i]); + for (i = 0; i < mt->n_threads; ++i) free(mt->w[i].buf); + free(mt->blk); free(mt->len); free(mt->w); free(mt->tid); + pthread_cond_destroy(&mt->cv); + pthread_mutex_destroy(&mt->lock); + free(mt); +} + +static void mt_queue(BGZF *fp) +{ + mtaux_t *mt = (mtaux_t*)fp->mt; + assert(mt->curr < mt->n_blks); // guaranteed by the caller + memcpy(mt->blk[mt->curr], fp->uncompressed_block, fp->block_offset); + mt->len[mt->curr] = fp->block_offset; + fp->block_offset = 0; + ++mt->curr; +} + +static int mt_flush(BGZF *fp) +{ + int i; + mtaux_t *mt = (mtaux_t*)fp->mt; + if (fp->block_offset) mt_queue(fp); // guaranteed that assertion does not fail + // signal all the workers to compress + pthread_mutex_lock(&mt->lock); + for (i = 0; i < mt->n_threads; ++i) mt->w[i].toproc = 1; + mt->proc_cnt = 0; + pthread_cond_broadcast(&mt->cv); + pthread_mutex_unlock(&mt->lock); + // worker 0 is doing things here + worker_aux(&mt->w[0]); + // wait for all the threads to complete + while (mt->proc_cnt < mt->n_threads); + // dump data to disk + for (i = 0; i < mt->n_threads; ++i) fp->errcode |= mt->w[i].errcode; + for (i = 0; i < mt->curr; ++i) + if (fwrite(mt->blk[i], 1, mt->len[i], fp->fp) != mt->len[i]) + fp->errcode |= BGZF_ERR_IO; + mt->curr = 0; + return 0; +} + +static int mt_lazy_flush(BGZF *fp) +{ + mtaux_t *mt = (mtaux_t*)fp->mt; + if (fp->block_offset) mt_queue(fp); + if (mt->curr == mt->n_blks) + return mt_flush(fp); + return -1; +} + +static ssize_t mt_write(BGZF *fp, const void *data, ssize_t length) +{ + const uint8_t *input = data; + ssize_t rest = length; + while (rest) { + int copy_length = BGZF_BLOCK_SIZE - fp->block_offset < rest? BGZF_BLOCK_SIZE - fp->block_offset : rest; + memcpy(fp->uncompressed_block + fp->block_offset, input, copy_length); + fp->block_offset += copy_length; input += copy_length; rest -= copy_length; + if (fp->block_offset == BGZF_BLOCK_SIZE) mt_lazy_flush(fp); + } + return length - rest; +} + +/***** END: multi-threading *****/ + int bgzf_flush(BGZF *fp) { if (!fp->is_write) return 0; + if (fp->mt) return mt_flush(fp); while (fp->block_offset > 0) { int block_length; block_length = deflate_block(fp, fp->block_offset); @@ -384,18 +542,19 @@ int bgzf_flush(BGZF *fp) int bgzf_flush_try(BGZF *fp, ssize_t size) { - if (fp->block_offset + size > BGZF_BLOCK_SIZE) - return bgzf_flush(fp); + if (fp->block_offset + size > BGZF_BLOCK_SIZE) { + if (fp->mt) return mt_lazy_flush(fp); + else return bgzf_flush(fp); + } return -1; } ssize_t bgzf_write(BGZF *fp, const void *data, ssize_t length) { const uint8_t *input = data; - int block_length = BGZF_BLOCK_SIZE, bytes_written; + int block_length = BGZF_BLOCK_SIZE, bytes_written = 0; assert(fp->is_write); - input = data; - bytes_written = 0; + if (fp->mt) return mt_write(fp, data, length); while (bytes_written < length) { uint8_t* buffer = fp->uncompressed_block; int copy_length = block_length - fp->block_offset < length - bytes_written? block_length - fp->block_offset : length - bytes_written; @@ -414,12 +573,14 @@ int bgzf_close(BGZF* fp) if (fp == 0) return -1; if (fp->is_write) { if (bgzf_flush(fp) != 0) return -1; + fp->compress_level = -1; block_length = deflate_block(fp, 0); // write an empty block count = fwrite(fp->compressed_block, 1, block_length, fp->fp); if (fflush(fp->fp) != 0) { fp->errcode |= BGZF_ERR_IO; return -1; } + if (fp->mt) mt_destroy(fp->mt); } ret = fp->is_write? fclose(fp->fp) : _bgzf_close(fp->fp); if (ret != 0) return -1;