1 // ***************************************************************************
2 // BgzfStream_p.cpp (c) 2011 Derek Barnett
3 // Marth Lab, Department of Biology, Boston College
4 // ---------------------------------------------------------------------------
5 // Last modified: 7 October 2011(DB)
6 // ---------------------------------------------------------------------------
7 // Based on BGZF routines developed at the Broad Institute.
8 // Provides the basic functionality for reading & writing BGZF files
9 // Replaces the old BGZF.* files to avoid clashing with other toolkits
10 // ***************************************************************************
12 #include <api/internal/BamDeviceFactory_p.h>
13 #include <api/internal/BamException_p.h>
14 #include <api/internal/BgzfStream_p.h>
15 using namespace BamTools;
16 using namespace BamTools::Internal;
24 // ----------------------------
25 // RaiiWrapper implementation
26 // ----------------------------
28 BgzfStream::RaiiWrapper::RaiiWrapper(void)
31 CompressedBlock = new char[Constants::BGZF_MAX_BLOCK_SIZE];
32 UncompressedBlock = new char[Constants::BGZF_DEFAULT_BLOCK_SIZE];
35 BgzfStream::RaiiWrapper::~RaiiWrapper(void) {
38 delete[] CompressedBlock;
39 delete[] UncompressedBlock;
41 UncompressedBlock = 0;
50 // ---------------------------
51 // BgzfStream implementation
52 // ---------------------------
55 BgzfStream::BgzfStream(void)
60 , m_isWriteOnly(false)
61 , m_isWriteCompressed(true)
66 BgzfStream::~BgzfStream(void) {
70 // checks BGZF block header
71 bool BgzfStream::CheckBlockHeader(char* header) {
72 return (header[0] == Constants::GZIP_ID1 &&
73 header[1] == Constants::GZIP_ID2 &&
74 header[2] == Z_DEFLATED &&
75 (header[3] & Constants::FLG_FEXTRA) != 0 &&
76 BamTools::UnpackUnsignedShort(&header[10]) == Constants::BGZF_XLEN &&
77 header[12] == Constants::BGZF_ID1 &&
78 header[13] == Constants::BGZF_ID2 &&
79 BamTools::UnpackUnsignedShort(&header[14]) == Constants::BGZF_LEN );
83 void BgzfStream::Close(void) {
85 // skip if no device open
89 // if writing to file, flush the current BGZF block,
90 // then write an empty block (as EOF marker)
91 if ( m_device->IsOpen() && (m_device->Mode() == IBamIODevice::WriteOnly) ) {
93 const size_t blockLength = DeflateBlock();
94 m_device->Write(Resources.CompressedBlock, blockLength);
103 fflush(Resources.Stream);
104 fclose(Resources.Stream);
105 Resources.Stream = 0;
112 m_isWriteOnly = false;
113 m_isWriteCompressed = true;
117 // compresses the current block
118 size_t BgzfStream::DeflateBlock(void) {
120 // initialize the gzip header
121 char* buffer = Resources.CompressedBlock;
122 memset(buffer, 0, 18);
123 buffer[0] = Constants::GZIP_ID1;
124 buffer[1] = Constants::GZIP_ID2;
125 buffer[2] = Constants::CM_DEFLATE;
126 buffer[3] = Constants::FLG_FEXTRA;
127 buffer[9] = Constants::OS_UNKNOWN;
128 buffer[10] = Constants::BGZF_XLEN;
129 buffer[12] = Constants::BGZF_ID1;
130 buffer[13] = Constants::BGZF_ID2;
131 buffer[14] = Constants::BGZF_LEN;
133 // set compression level
134 const int compressionLevel = ( m_isWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 );
136 // loop to retry for blocks that do not compress enough
137 int inputLength = BlockOffset;
138 size_t compressedLength = 0;
139 const unsigned int bufferSize = Constants::BGZF_MAX_BLOCK_SIZE;
143 // initialize zstream values
147 zs.next_in = (Bytef*)Resources.UncompressedBlock;
148 zs.avail_in = inputLength;
149 zs.next_out = (Bytef*)&buffer[Constants::BGZF_BLOCK_HEADER_LENGTH];
150 zs.avail_out = bufferSize -
151 Constants::BGZF_BLOCK_HEADER_LENGTH -
152 Constants::BGZF_BLOCK_FOOTER_LENGTH;
154 // initialize the zlib compression algorithm
155 int status = deflateInit2(&zs,
158 Constants::GZIP_WINDOW_BITS,
159 Constants::Z_DEFAULT_MEM_LEVEL,
161 if ( status != Z_OK )
162 throw BamException("BgzfStream::DeflateBlock", "zlib deflateInit2 failed");
165 status = deflate(&zs, Z_FINISH);
167 // if not at stream end
168 if ( status != Z_STREAM_END ) {
173 if ( status != Z_OK )
174 throw BamException("BgzfStream::DeflateBlock", "zlib deflate failed");
176 // not enough space available in buffer
177 // try to reduce the input length & re-start loop
179 if ( inputLength <= 0 )
180 throw BamException("BgzfStream::DeflateBlock", "input reduction failed");
184 // finalize the compression routine
185 status = deflateEnd(&zs);
186 if ( status != Z_OK )
187 throw BamException("BgzfStream::DeflateBlock", "zlib deflateEnd failed");
189 // update compressedLength
190 compressedLength = zs.total_out +
191 Constants::BGZF_BLOCK_HEADER_LENGTH +
192 Constants::BGZF_BLOCK_FOOTER_LENGTH;
193 if ( compressedLength > Constants::BGZF_MAX_BLOCK_SIZE )
194 throw BamException("BgzfStream::DeflateBlock", "deflate overflow");
200 // store the compressed length
201 BamTools::PackUnsignedShort(&buffer[16], static_cast<uint16_t>(compressedLength - 1));
203 // store the CRC32 checksum
204 uint32_t crc = crc32(0, NULL, 0);
205 crc = crc32(crc, (Bytef*)Resources.UncompressedBlock, inputLength);
206 BamTools::PackUnsignedInt(&buffer[compressedLength - 8], crc);
207 BamTools::PackUnsignedInt(&buffer[compressedLength - 4], inputLength);
209 // ensure that we have less than a block of data left
210 int remaining = m_blockOffset - inputLength;
211 if ( remaining > 0 ) {
212 if ( remaining > inputLength )
213 throw BamException("BgzfStream::DeflateBlock", "after deflate, remainder too large");
214 memcpy(Resources.UncompressedBlock, Resources.UncompressedBlock + inputLength, remaining);
218 m_blockOffset = remaining;
221 return compressedLength;
224 // flushes the data in the BGZF block
225 void BgzfStream::FlushBlock(void) {
227 BT_ASSERT_X( m_device, "BgzfStream::FlushBlock() - attempting to flush to null device" );
229 // flush all of the remaining blocks
230 while ( m_blockOffset > 0 ) {
232 // compress the data block
233 const size_t blockLength = DeflateBlock();
235 // flush the data to our output device
236 const size_t numBytesWritten = m_device->Write(Resources.CompressedBlock, blockLength);
237 if ( numBytesWritten != blockLength ) {
239 s << "expected to write " << blockLength
240 << " bytes during flushing, but wrote " << numBytesWritten;
241 throw BamException("BgzfStream::FlushBlock", s.str());
245 m_blockAddress += blockLength;
249 // decompresses the current block
250 size_t BgzfStream::InflateBlock(const size_t& blockLength) {
252 // setup zlib stream object
256 zs.next_in = (Bytef*)Resources.CompressedBlock + 18;
257 zs.avail_in = blockLength - 16;
258 zs.next_out = (Bytef*)Resources.UncompressedBlock;
259 zs.avail_out = Constants::BGZF_DEFAULT_BLOCK_SIZE;
262 int status = inflateInit2(&zs, Constants::GZIP_WINDOW_BITS);
263 if ( status != Z_OK )
264 throw BamException("BgzfStream::InflateBlock", "zlib inflateInit failed");
267 status = inflate(&zs, Z_FINISH);
268 if ( status != Z_STREAM_END ) {
270 throw BamException("BgzfStream::InflateBlock", "zlib inflate failed");
274 status = inflateEnd(&zs);
275 if ( status != Z_OK ) {
277 throw BamException("BgzfStream::InflateBlock", "zlib inflateEnd failed");
284 bool BgzfStream::IsOpen(void) const {
287 return m_device->IsOpen();
290 bool BgzfStream::Open(const string& filename, const IBamIODevice::OpenMode mode) {
292 // close current device if necessary
296 BT_ASSERT_X( (m_device == 0), "BgzfStream::Open() - unable to properly close previous IO device" );
298 // retrieve new IO device depending on filename
299 m_device = BamDeviceFactory::CreateDevice(filename);
302 BT_ASSERT_X( m_device, "BgzfStream::Open() - unable to create IO device from filename" );
304 // if device fails to open
305 if ( !m_device->Open(mode) ) {
306 cerr << "BgzfStream::Open() - unable to open IO device:" << endl;
307 cerr << m_device->ErrorString();
311 // otherwise, set flag & return true
313 m_isWriteOnly = ( mode == IBamIODevice::WriteOnly );
318 // opens the BGZF file for reading (mode is either "rb" for reading, or "wb" for writing)
319 void BgzfStream::Open(const string& filename, const char* mode) {
321 // make sure we're starting with fresh state
325 // determine open mode
326 if ( strcmp(mode, "rb") == 0 )
327 m_isWriteOnly = false;
328 else if ( strcmp(mode, "wb") == 0)
329 m_isWriteOnly = true;
331 const string message = string("unknown file mode: ") + mode;
332 throw BamException("BgzfStream::Open", message);
335 // open BGZF stream on a file
336 if ( (filename != "stdin") && (filename != "stdout") && (filename != "-"))
337 Resources.Stream = fopen(filename.c_str(), mode);
339 // open BGZF stream on stdin
340 else if ( (filename == "stdin" || filename == "-") && (strcmp(mode, "rb") == 0 ) )
341 Resources.Stream = freopen(NULL, mode, stdin);
343 // open BGZF stream on stdout
344 else if ( (filename == "stdout" || filename == "-") && (strcmp(mode, "wb") == 0) )
345 Resources.Stream = freopen(NULL, mode, stdout);
347 // ensure valid Stream
348 if ( !Resources.Stream ) {
349 const string message = string("unable to open file: ") + filename;
350 throw BamException("BgzfStream::Open", message);
353 // set flag & return success
358 // reads BGZF data into a byte buffer
359 size_t BgzfStream::Read(char* data, const size_t dataLength) {
361 if ( dataLength == 0 )
364 // if stream not open for reading
365 BT_ASSERT_X( m_device, "BgzfStream::Read() - trying to read from null device");
366 if ( !m_device->IsOpen() || (m_device->Mode() != IBamIODevice::ReadOnly) )
369 // read blocks as needed until desired data length is retrieved
370 size_t numBytesRead = 0;
371 while ( numBytesRead < dataLength ) {
373 // determine bytes available in current block
374 int bytesAvailable = m_blockLength - m_blockOffset;
376 // read (and decompress) next block if needed
377 if ( bytesAvailable <= 0 ) {
379 bytesAvailable = m_blockLength - m_blockOffset;
380 if ( bytesAvailable <= 0 )
384 // copy data from uncompressed source buffer into data destination buffer
385 const size_t copyLength = min( (dataLength-numBytesRead), (size_t)bytesAvailable );
386 memcpy(data, Resources.UncompressedBlock + m_blockOffset, copyLength);
389 m_blockOffset += copyLength;
391 numBytesRead += copyLength;
395 if ( m_blockOffset == m_blockLength ) {
396 m_blockAddress = m_device->Tell();
402 // return actual number of bytes read
406 // reads a BGZF block
407 void BgzfStream::ReadBlock(void) {
409 BT_ASSERT_X( m_device, "BgzfStream::ReadBlock() - trying to read from null IO device");
411 // store block's starting address
412 int64_t blockAddress = m_device->Tell();
414 // read block header from file
415 char header[Constants::BGZF_BLOCK_HEADER_LENGTH];
416 size_t numBytesRead = m_device->Read(header, Constants::BGZF_BLOCK_HEADER_LENGTH);
418 // if block header empty
419 if ( numBytesRead == 0 ) {
424 // if block header invalid size
425 if ( numBytesRead != Constants::BGZF_BLOCK_HEADER_LENGTH )
426 throw BamException("BgzfStream::ReadBlock", "invalid block header size");
428 // validate block header contents
429 if ( !BgzfStream::CheckBlockHeader(header) )
430 throw BamException("BgzfStream::ReadBlock", "invalid block header contents");
432 // copy header contents to compressed buffer
433 const size_t blockLength = BamTools::UnpackUnsignedShort(&header[16]) + 1;
434 memcpy(Resources.CompressedBlock, header, Constants::BGZF_BLOCK_HEADER_LENGTH);
436 // read remainder of block
437 const size_t remaining = blockLength - Constants::BGZF_BLOCK_HEADER_LENGTH;
438 numBytesRead = m_device->Read(&Resources.CompressedBlock[Constants::BGZF_BLOCK_HEADER_LENGTH], remaining);
439 if ( numBytesRead != remaining )
440 throw BamException("BgzfStream::ReadBlock", "could not read data from block");
442 // decompress block data
443 numBytesRead = InflateBlock(blockLength);
446 if ( m_blockLength != 0 )
448 m_blockAddress = blockAddress;
449 m_blockLength = numBytesRead;
452 // seek to position in BGZF file
453 void BgzfStream::Seek(const int64_t& position) {
455 BT_ASSERT_X( m_device, "BgzfStream::Seek() - trying to seek on null IO device");
457 // skip if not open or not seek-able
458 if ( !IsOpen() /*|| !m_device->IsRandomAccess()*/ ) {
459 cerr << "BgzfStream::Seek() - device not open" << endl;
463 // determine adjusted offset & address
464 int blockOffset = (position & 0xFFFF);
465 int64_t blockAddress = (position >> 16) & 0xFFFFFFFFFFFFLL;
467 // attempt seek in file
468 if ( !m_device->Seek(blockAddress) ) {
470 s << "unable to seek to position: " << position;
471 throw BamException("BgzfStream::Seek", s.str());
474 // update block data & return success
476 m_blockAddress = blockAddress;
477 m_blockOffset = blockOffset;
480 void BgzfStream::SetWriteCompressed(bool ok) {
481 m_isWriteCompressed = ok;
484 // get file position in BGZF file
485 int64_t BgzfStream::Tell(void) const {
486 if ( !m_isOpen ) return 0;
487 return ( (m_blockAddress << 16) | (m_blockOffset & 0xFFFF) );
490 // writes the supplied data into the BGZF buffer
491 size_t BgzfStream::Write(const char* data, const size_t dataLength) {
493 BT_ASSERT_X( m_device, "BgzfStream::Write() - trying to write to null IO device");
494 BT_ASSERT_X( (m_device->Mode() == IBamIODevice::WriteOnly),
495 "BgzfStream::Write() - trying to write to non-writable IO device");
497 // skip if file not open for writing
498 if ( !IsOpen || !IsWriteOnly )
501 // write blocks as needed til all data is written
502 size_t numBytesWritten = 0;
503 const char* input = data;
504 const size_t blockLength = Constants::BGZF_DEFAULT_BLOCK_SIZE;
505 while ( numBytesWritten < dataLength ) {
507 // copy data contents to uncompressed output buffer
508 unsigned int copyLength = min(blockLength - m_blockOffset, dataLength - numBytesWritten);
509 char* buffer = Resources.UncompressedBlock;
510 memcpy(buffer + m_blockOffset, input, copyLength);
513 m_blockOffset += copyLength;
515 numBytesWritten += copyLength;
517 // flush (& compress) output buffer when full
518 if ( m_blockOffset == blockLength )
522 // return actual number of bytes written
523 return numBytesWritten;