3 Copyright (c) 2008 Broad Institute / Massachusetts Institute of Technology
4 2011 Attractive Chaos <attractor@live.co.uk>
6 Permission is hereby granted, free of charge, to any person obtaining a copy
7 of this software and associated documentation files (the "Software"), to deal
8 in the Software without restriction, including without limitation the rights
9 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 copies of the Software, and to permit persons to whom the Software is
11 furnished to do so, subject to the following conditions:
13 The above copyright notice and this permission notice shall be included in
14 all copies or substantial portions of the Software.
16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
31 #include <sys/types.h>
36 typedef knetFile *_bgzf_file_t;
37 #define _bgzf_open(fn, mode) knet_open(fn, mode)
38 #define _bgzf_dopen(fp, mode) knet_dopen(fp, mode)
39 #define _bgzf_close(fp) knet_close(fp)
40 #define _bgzf_fileno(fp) ((fp)->fd)
41 #define _bgzf_tell(fp) knet_tell(fp)
42 #define _bgzf_seek(fp, offset, whence) knet_seek(fp, offset, whence)
43 #define _bgzf_read(fp, buf, len) knet_read(fp, buf, len)
44 #define _bgzf_write(fp, buf, len) knet_write(fp, buf, len)
45 #else // ~defined(_USE_KNETFILE)
46 #if defined(_WIN32) || defined(_MSC_VER)
47 #define ftello(fp) ftell(fp)
48 #define fseeko(fp, offset, whence) fseek(fp, offset, whence)
49 #else // ~defined(_WIN32)
50 extern off_t ftello(FILE *stream);
51 extern int fseeko(FILE *stream, off_t offset, int whence);
52 #endif // ~defined(_WIN32)
53 typedef FILE *_bgzf_file_t;
54 #define _bgzf_open(fn, mode) fopen(fn, mode)
55 #define _bgzf_dopen(fp, mode) fdopen(fp, mode)
56 #define _bgzf_close(fp) fclose(fp)
57 #define _bgzf_fileno(fp) fileno(fp)
58 #define _bgzf_tell(fp) ftello(fp)
59 #define _bgzf_seek(fp, offset, whence) fseeko(fp, offset, whence)
60 #define _bgzf_read(fp, buf, len) fread(buf, 1, len, fp)
61 #define _bgzf_write(fp, buf, len) fwrite(buf, 1, len, fp)
62 #endif // ~define(_USE_KNETFILE)
64 #define BLOCK_HEADER_LENGTH 18
65 #define BLOCK_FOOTER_LENGTH 8
68 /* BGZF/GZIP header (speciallized from RFC 1952; little endian):
69 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
70 | 31|139| 8| 4| 0| 0|255| 6| 66| 67| 2|BLK_LEN|
71 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
73 static const uint8_t g_magic[19] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\0\0";
82 KHASH_MAP_INIT_INT64(cache, cache_t)
85 static inline void packInt16(uint8_t *buffer, uint16_t value)
88 buffer[1] = value >> 8;
91 static inline int unpackInt16(const uint8_t *buffer)
93 return buffer[0] | buffer[1] << 8;
96 static inline void packInt32(uint8_t *buffer, uint32_t value)
99 buffer[1] = value >> 8;
100 buffer[2] = value >> 16;
101 buffer[3] = value >> 24;
104 static BGZF *bgzf_read_init()
107 fp = calloc(1, sizeof(BGZF));
109 fp->uncompressed_block = malloc(BGZF_MAX_BLOCK_SIZE);
110 fp->compressed_block = malloc(BGZF_MAX_BLOCK_SIZE);
112 fp->cache = kh_init(cache);
117 static BGZF *bgzf_write_init(int compress_level) // compress_level==-1 for the default level
120 fp = calloc(1, sizeof(BGZF));
122 fp->uncompressed_block = malloc(BGZF_MAX_BLOCK_SIZE);
123 fp->compressed_block = malloc(BGZF_MAX_BLOCK_SIZE);
124 fp->compress_level = compress_level < 0? Z_DEFAULT_COMPRESSION : compress_level; // Z_DEFAULT_COMPRESSION==-1
125 if (fp->compress_level > 9) fp->compress_level = Z_DEFAULT_COMPRESSION;
128 // get the compress level from the mode string
129 static int mode2level(const char *__restrict mode)
131 int i, compress_level = -1;
132 for (i = 0; mode[i]; ++i)
133 if (mode[i] >= '0' && mode[i] <= '9') break;
134 if (mode[i]) compress_level = (int)mode[i] - '0';
135 if (strchr(mode, 'u')) compress_level = 0;
136 return compress_level;
139 BGZF *bgzf_open(const char *path, const char *mode)
142 assert(compressBound(BGZF_BLOCK_SIZE) < BGZF_MAX_BLOCK_SIZE);
143 if (strchr(mode, 'r') || strchr(mode, 'R')) {
145 if ((fpr = _bgzf_open(path, "r")) == 0) return 0;
146 fp = bgzf_read_init();
148 } else if (strchr(mode, 'w') || strchr(mode, 'W')) {
150 if ((fpw = fopen(path, "w")) == 0) return 0;
151 fp = bgzf_write_init(mode2level(mode));
157 BGZF *bgzf_dopen(int fd, const char *mode)
160 assert(compressBound(BGZF_BLOCK_SIZE) < BGZF_MAX_BLOCK_SIZE);
161 if (strchr(mode, 'r') || strchr(mode, 'R')) {
163 if ((fpr = _bgzf_dopen(fd, "r")) == 0) return 0;
164 fp = bgzf_read_init();
166 } else if (strchr(mode, 'w') || strchr(mode, 'W')) {
168 if ((fpw = fdopen(fd, "w")) == 0) return 0;
169 fp = bgzf_write_init(mode2level(mode));
175 static int bgzf_compress(void *_dst, int *dlen, void *src, int slen, int level)
179 uint8_t *dst = (uint8_t*)_dst;
182 zs.zalloc = NULL; zs.zfree = NULL;
185 zs.next_out = dst + BLOCK_HEADER_LENGTH;
186 zs.avail_out = *dlen - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH;
187 if (deflateInit2(&zs, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY) != Z_OK) return -1; // -15 to disable zlib header/footer
188 if (deflate(&zs, Z_FINISH) != Z_STREAM_END) return -1;
189 if (deflateEnd(&zs) != Z_OK) return -1;
190 *dlen = zs.total_out + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
192 memcpy(dst, g_magic, BLOCK_HEADER_LENGTH); // the last two bytes are a place holder for the length of the block
193 packInt16(&dst[16], *dlen - 1); // write the compressed length; -1 to fit 2 bytes
195 crc = crc32(crc32(0L, NULL, 0L), src, slen);
196 packInt32((uint8_t*)&dst[*dlen - 8], crc);
197 packInt32((uint8_t*)&dst[*dlen - 4], slen);
201 // Deflate the block in fp->uncompressed_block into fp->compressed_block. Also adds an extra field that stores the compressed block length.
202 static int deflate_block(BGZF *fp, int block_length)
204 int comp_size = BGZF_MAX_BLOCK_SIZE;
205 if (bgzf_compress(fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level) != 0) {
206 fp->errcode |= BGZF_ERR_ZLIB;
209 fp->block_offset = 0;
213 // Inflate the block in fp->compressed_block into fp->uncompressed_block
214 static int inflate_block(BGZF* fp, int block_length)
219 zs.next_in = fp->compressed_block + 18;
220 zs.avail_in = block_length - 16;
221 zs.next_out = fp->uncompressed_block;
222 zs.avail_out = BGZF_MAX_BLOCK_SIZE;
224 if (inflateInit2(&zs, -15) != Z_OK) {
225 fp->errcode |= BGZF_ERR_ZLIB;
228 if (inflate(&zs, Z_FINISH) != Z_STREAM_END) {
230 fp->errcode |= BGZF_ERR_ZLIB;
233 if (inflateEnd(&zs) != Z_OK) {
234 fp->errcode |= BGZF_ERR_ZLIB;
240 static int check_header(const uint8_t *header)
242 return (header[0] == 31 && header[1] == 139 && header[2] == 8 && (header[3] & 4) != 0
243 && unpackInt16((uint8_t*)&header[10]) == 6
244 && header[12] == 'B' && header[13] == 'C'
245 && unpackInt16((uint8_t*)&header[14]) == 2);
249 static void free_cache(BGZF *fp)
252 khash_t(cache) *h = (khash_t(cache)*)fp->cache;
253 if (fp->is_write) return;
254 for (k = kh_begin(h); k < kh_end(h); ++k)
255 if (kh_exist(h, k)) free(kh_val(h, k).block);
256 kh_destroy(cache, h);
259 static int load_block_from_cache(BGZF *fp, int64_t block_address)
263 khash_t(cache) *h = (khash_t(cache)*)fp->cache;
264 k = kh_get(cache, h, block_address);
265 if (k == kh_end(h)) return 0;
267 if (fp->block_length != 0) fp->block_offset = 0;
268 fp->block_address = block_address;
269 fp->block_length = p->size;
270 memcpy(fp->uncompressed_block, p->block, BGZF_MAX_BLOCK_SIZE);
271 _bgzf_seek((_bgzf_file_t)fp->fp, p->end_offset, SEEK_SET);
275 static void cache_block(BGZF *fp, int size)
280 khash_t(cache) *h = (khash_t(cache)*)fp->cache;
281 if (BGZF_MAX_BLOCK_SIZE >= fp->cache_size) return;
282 if ((kh_size(h) + 1) * BGZF_MAX_BLOCK_SIZE > fp->cache_size) {
283 /* A better way would be to remove the oldest block in the
284 * cache, but here we remove a random one for simplicity. This
285 * should not have a big impact on performance. */
286 for (k = kh_begin(h); k < kh_end(h); ++k)
287 if (kh_exist(h, k)) break;
289 free(kh_val(h, k).block);
293 k = kh_put(cache, h, fp->block_address, &ret);
294 if (ret == 0) return; // if this happens, a bug!
296 p->size = fp->block_length;
297 p->end_offset = fp->block_address + size;
298 p->block = malloc(BGZF_MAX_BLOCK_SIZE);
299 memcpy(kh_val(h, k).block, fp->uncompressed_block, BGZF_MAX_BLOCK_SIZE);
302 static void free_cache(BGZF *fp) {}
303 static int load_block_from_cache(BGZF *fp, int64_t block_address) {return 0;}
304 static void cache_block(BGZF *fp, int size) {}
307 int bgzf_read_block(BGZF *fp)
309 uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
310 int count, size = 0, block_length, remaining;
311 int64_t block_address;
312 block_address = _bgzf_tell((_bgzf_file_t)fp->fp);
313 if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
314 count = _bgzf_read(fp->fp, header, sizeof(header));
315 if (count == 0) { // no data read
316 fp->block_length = 0;
319 if (count != sizeof(header) || !check_header(header)) {
320 fp->errcode |= BGZF_ERR_HEADER;
324 block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
325 compressed_block = (uint8_t*)fp->compressed_block;
326 memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
327 remaining = block_length - BLOCK_HEADER_LENGTH;
328 count = _bgzf_read(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
329 if (count != remaining) {
330 fp->errcode |= BGZF_ERR_IO;
334 if ((count = inflate_block(fp, block_length)) < 0) return -1;
335 if (fp->block_length != 0) fp->block_offset = 0; // Do not reset offset if this read follows a seek.
336 fp->block_address = block_address;
337 fp->block_length = count;
338 cache_block(fp, size);
342 ssize_t bgzf_read(BGZF *fp, void *data, ssize_t length)
344 ssize_t bytes_read = 0;
345 uint8_t *output = data;
346 if (length <= 0) return 0;
347 assert(fp->is_write == 0);
348 while (bytes_read < length) {
349 int copy_length, available = fp->block_length - fp->block_offset;
351 if (available <= 0) {
352 if (bgzf_read_block(fp) != 0) return -1;
353 available = fp->block_length - fp->block_offset;
354 if (available <= 0) break;
356 copy_length = length - bytes_read < available? length - bytes_read : available;
357 buffer = fp->uncompressed_block;
358 memcpy(output, buffer + fp->block_offset, copy_length);
359 fp->block_offset += copy_length;
360 output += copy_length;
361 bytes_read += copy_length;
363 if (fp->block_offset == fp->block_length) {
364 fp->block_address = _bgzf_tell((_bgzf_file_t)fp->fp);
365 fp->block_offset = fp->block_length = 0;
370 /***** BEGIN: multi-threading *****/
376 int i, errcode, toproc;
379 typedef struct mtaux_t {
380 int n_threads, n_blks, curr, done;
381 volatile int proc_cnt;
386 pthread_mutex_t lock;
390 static void *mt_worker(void *data)
393 worker_t *w = (worker_t*)data;
396 pthread_mutex_lock(&w->mt->lock);
397 while (!w->toproc && !w->mt->done)
398 pthread_cond_wait(&w->mt->cv, &w->mt->lock);
399 if (w->mt->done) stop = 1;
401 pthread_mutex_unlock(&w->mt->lock);
404 for (i = w->i; i < w->mt->curr; i += w->mt->n_threads) {
405 int clen = BGZF_MAX_BLOCK_SIZE;
406 if (bgzf_compress(w->buf, &clen, w->mt->blk[i], w->mt->len[i], w->fp->compress_level) != 0)
407 w->errcode |= BGZF_ERR_ZLIB;
408 memcpy(w->mt->blk[i], w->buf, clen);
409 w->mt->len[i] = clen;
411 tmp = __sync_fetch_and_add(&w->mt->proc_cnt, 1);
416 int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
421 if (!fp->is_write || fp->mt || n_threads <= 1) return -1;
422 mt = calloc(1, sizeof(mtaux_t));
423 mt->n_threads = n_threads;
424 mt->n_blks = n_threads * n_sub_blks;
425 mt->len = calloc(mt->n_blks, sizeof(int));
426 mt->blk = calloc(mt->n_blks, sizeof(void*));
427 for (i = 0; i < mt->n_blks; ++i)
428 mt->blk[i] = malloc(BGZF_MAX_BLOCK_SIZE);
429 mt->tid = calloc(mt->n_threads, sizeof(pthread_t));
430 mt->w = calloc(mt->n_threads, sizeof(worker_t));
431 for (i = 0; i < mt->n_threads; ++i) {
435 mt->w[i].buf = malloc(BGZF_MAX_BLOCK_SIZE);
437 pthread_attr_init(&attr);
438 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
439 pthread_mutex_init(&mt->lock, 0);
440 pthread_cond_init(&mt->cv, 0);
441 for (i = 0; i < mt->n_threads; ++i)
442 pthread_create(&mt->tid[i], &attr, mt_worker, &mt->w[i]);
447 static void mt_destroy(mtaux_t *mt)
450 pthread_mutex_lock(&mt->lock);
451 mt->done = 1; mt->proc_cnt = 0;
452 pthread_cond_broadcast(&mt->cv);
453 pthread_mutex_unlock(&mt->lock);
454 for (i = 0; i < mt->n_threads; ++i) pthread_join(mt->tid[i], 0);
455 for (i = 0; i < mt->n_blks; ++i) free(mt->blk[i]);
456 for (i = 0; i < mt->n_threads; ++i) free(mt->w[i].buf);
457 free(mt->blk); free(mt->len); free(mt->w); free(mt->tid);
458 pthread_cond_destroy(&mt->cv);
459 pthread_mutex_destroy(&mt->lock);
463 static void mt_queue(BGZF *fp)
465 mtaux_t *mt = (mtaux_t*)fp->mt;
466 assert(mt->curr < mt->n_blks); // guaranteed by the caller
467 memcpy(mt->blk[mt->curr], fp->uncompressed_block, fp->block_offset);
468 mt->len[mt->curr] = fp->block_offset;
469 fp->block_offset = 0;
473 static int mt_flush(BGZF *fp)
476 mtaux_t *mt = (mtaux_t*)fp->mt;
477 if (fp->block_offset) mt_queue(fp); // guaranteed that assertion does not fail
478 pthread_mutex_lock(&mt->lock);
479 for (i = 0; i < mt->n_threads; ++i) mt->w[i].toproc = 1;
481 pthread_cond_broadcast(&mt->cv);
482 pthread_mutex_unlock(&mt->lock);
483 while (mt->proc_cnt < mt->n_threads);
484 for (i = 0; i < mt->n_threads; ++i) fp->errcode |= mt->w[i].errcode;
485 for (i = 0; i < mt->curr; ++i)
486 if (fwrite(mt->blk[i], 1, mt->len[i], fp->fp) != mt->len[i])
487 fp->errcode |= BGZF_ERR_IO;
492 static int mt_lazy_flush(BGZF *fp)
494 mtaux_t *mt = (mtaux_t*)fp->mt;
496 if (mt->curr == mt->n_blks)
501 static ssize_t mt_write(BGZF *fp, const void *data, ssize_t length)
503 const uint8_t *input = data;
504 ssize_t rest = length;
506 int copy_length = BGZF_BLOCK_SIZE - fp->block_offset < rest? BGZF_BLOCK_SIZE - fp->block_offset : rest;
507 memcpy(fp->uncompressed_block + fp->block_offset, input, copy_length);
508 fp->block_offset += copy_length; input += copy_length; rest -= copy_length;
509 if (fp->block_offset == BGZF_BLOCK_SIZE) mt_lazy_flush(fp);
511 return length - rest;
514 /***** END: multi-threading *****/
516 int bgzf_flush(BGZF *fp)
518 if (!fp->is_write) return 0;
519 if (fp->mt) return mt_flush(fp);
520 while (fp->block_offset > 0) {
522 block_length = deflate_block(fp, fp->block_offset);
523 if (block_length < 0) return -1;
524 if (fwrite(fp->compressed_block, 1, block_length, fp->fp) != block_length) {
525 fp->errcode |= BGZF_ERR_IO; // possibly truncated file
528 fp->block_address += block_length;
533 int bgzf_flush_try(BGZF *fp, ssize_t size)
535 if (fp->block_offset + size > BGZF_BLOCK_SIZE) {
536 if (fp->mt) return mt_lazy_flush(fp);
537 else return bgzf_flush(fp);
542 ssize_t bgzf_write(BGZF *fp, const void *data, ssize_t length)
544 const uint8_t *input = data;
545 int block_length = BGZF_BLOCK_SIZE, bytes_written = 0;
546 assert(fp->is_write);
547 if (fp->mt) return mt_write(fp, data, length);
548 while (bytes_written < length) {
549 uint8_t* buffer = fp->uncompressed_block;
550 int copy_length = block_length - fp->block_offset < length - bytes_written? block_length - fp->block_offset : length - bytes_written;
551 memcpy(buffer + fp->block_offset, input, copy_length);
552 fp->block_offset += copy_length;
553 input += copy_length;
554 bytes_written += copy_length;
555 if (fp->block_offset == block_length && bgzf_flush(fp)) break;
557 return bytes_written;
560 int bgzf_close(BGZF* fp)
562 int ret, count, block_length;
563 if (fp == 0) return -1;
565 if (bgzf_flush(fp) != 0) return -1;
566 fp->compress_level = -1;
567 block_length = deflate_block(fp, 0); // write an empty block
568 count = fwrite(fp->compressed_block, 1, block_length, fp->fp);
569 if (fflush(fp->fp) != 0) {
570 fp->errcode |= BGZF_ERR_IO;
573 if (fp->mt) mt_destroy(fp->mt);
575 ret = fp->is_write? fclose(fp->fp) : _bgzf_close(fp->fp);
576 if (ret != 0) return -1;
577 free(fp->uncompressed_block);
578 free(fp->compressed_block);
584 void bgzf_set_cache_size(BGZF *fp, int cache_size)
586 if (fp) fp->cache_size = cache_size;
589 int bgzf_check_EOF(BGZF *fp)
591 static uint8_t magic[28] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0";
594 offset = _bgzf_tell((_bgzf_file_t)fp->fp);
595 if (_bgzf_seek(fp->fp, -28, SEEK_END) < 0) return 0;
596 _bgzf_read(fp->fp, buf, 28);
597 _bgzf_seek(fp->fp, offset, SEEK_SET);
598 return (memcmp(magic, buf, 28) == 0)? 1 : 0;
601 int64_t bgzf_seek(BGZF* fp, int64_t pos, int where)
604 int64_t block_address;
606 if (fp->is_write || where != SEEK_SET) {
607 fp->errcode |= BGZF_ERR_MISUSE;
610 block_offset = pos & 0xFFFF;
611 block_address = pos >> 16;
612 if (_bgzf_seek(fp->fp, block_address, SEEK_SET) < 0) {
613 fp->errcode |= BGZF_ERR_IO;
616 fp->block_length = 0; // indicates current block has not been loaded
617 fp->block_address = block_address;
618 fp->block_offset = block_offset;
622 int bgzf_is_bgzf(const char *fn)
627 if ((fp = _bgzf_open(fn, "r")) == 0) return 0;
628 n = _bgzf_read(fp, buf, 16);
630 if (n != 16) return 0;
631 return memcmp(g_magic, buf, 16) == 0? 1 : 0;
634 int bgzf_getc(BGZF *fp)
637 if (fp->block_offset >= fp->block_length) {
638 if (bgzf_read_block(fp) != 0) return -2; /* error */
639 if (fp->block_length == 0) return -1; /* end-of-file */
641 c = ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
642 if (fp->block_offset == fp->block_length) {
643 fp->block_address = _bgzf_tell((_bgzf_file_t)fp->fp);
644 fp->block_offset = 0;
645 fp->block_length = 0;
651 #define kroundup32(x) (--(x), (x)|=(x)>>1, (x)|=(x)>>2, (x)|=(x)>>4, (x)|=(x)>>8, (x)|=(x)>>16, ++(x))
654 int bgzf_getline(BGZF *fp, int delim, kstring_t *str)
657 unsigned char *buf = (unsigned char*)fp->uncompressed_block;
660 if (fp->block_offset >= fp->block_length) {
661 if (bgzf_read_block(fp) != 0) { state = -2; break; }
662 if (fp->block_length == 0) { state = -1; break; }
664 for (l = fp->block_offset; l < fp->block_length && buf[l] != delim; ++l);
665 if (l < fp->block_length) state = 1;
666 l -= fp->block_offset;
667 if (str->l + l + 1 >= str->m) {
668 str->m = str->l + l + 2;
670 str->s = (char*)realloc(str->s, str->m);
672 memcpy(str->s + str->l, buf + fp->block_offset, l);
674 fp->block_offset += l + 1;
675 if (fp->block_offset >= fp->block_length) {
676 fp->block_address = _bgzf_tell((_bgzf_file_t)fp->fp);
677 fp->block_offset = 0;
678 fp->block_length = 0;
680 } while (state == 0);
681 if (str->l == 0 && state < 0) return state;