1 // ***************************************************************************
2 // BgzfStream_p.cpp (c) 2011 Derek Barnett
3 // Marth Lab, Department of Biology, Boston College
4 // ---------------------------------------------------------------------------
5 // Last modified: 17 January 2012(DB)
6 // ---------------------------------------------------------------------------
7 // Based on BGZF routines developed at the Broad Institute.
8 // Provides the basic functionality for reading & writing BGZF files
9 // Replaces the old BGZF.* files to avoid clashing with other toolkits
10 // ***************************************************************************
12 #include "api/BamAux.h"
13 #include "api/BamConstants.h"
14 #include "api/internal/io/BamDeviceFactory_p.h"
15 #include "api/internal/io/BgzfStream_p.h"
16 #include "api/internal/utils/BamException_p.h"
17 using namespace BamTools;
18 using namespace BamTools::Internal;
28 // ---------------------------
29 // BgzfStream implementation
30 // ---------------------------
33 BgzfStream::BgzfStream(void)
37 , m_isWriteCompressed(true)
39 , m_uncompressedBlock(Constants::BGZF_DEFAULT_BLOCK_SIZE)
40 , m_compressedBlock(Constants::BGZF_MAX_BLOCK_SIZE)
44 BgzfStream::~BgzfStream(void) {
48 // checks BGZF block header
49 bool BgzfStream::CheckBlockHeader(char* header) {
50 return (header[0] == Constants::GZIP_ID1 &&
51 header[1] == Constants::GZIP_ID2 &&
52 header[2] == Z_DEFLATED &&
53 (header[3] & Constants::FLG_FEXTRA) != 0 &&
54 BamTools::UnpackUnsignedShort(&header[10]) == Constants::BGZF_XLEN &&
55 header[12] == Constants::BGZF_ID1 &&
56 header[13] == Constants::BGZF_ID2 &&
57 BamTools::UnpackUnsignedShort(&header[14]) == Constants::BGZF_LEN );
61 void BgzfStream::Close(void) {
63 // skip if no device open
64 if ( m_device == 0 ) return;
66 // if writing to file, flush the current BGZF block,
67 // then write an empty block (as EOF marker)
68 if ( m_device->IsOpen() && (m_device->Mode() == IBamIODevice::WriteOnly) ) {
70 const size_t blockLength = DeflateBlock(0);
71 m_device->Write(m_compressedBlock.Buffer, blockLength);
79 // ensure our buffers are cleared out
80 m_uncompressedBlock.Clear();
81 m_compressedBlock.Clear();
87 m_isWriteCompressed = true;
90 // compresses the current block
91 size_t BgzfStream::DeflateBlock(int32_t blockLength) {
93 // initialize the gzip header
94 char* buffer = m_compressedBlock.Buffer;
95 memset(buffer, 0, 18);
96 buffer[0] = Constants::GZIP_ID1;
97 buffer[1] = Constants::GZIP_ID2;
98 buffer[2] = Constants::CM_DEFLATE;
99 buffer[3] = Constants::FLG_FEXTRA;
100 buffer[9] = Constants::OS_UNKNOWN;
101 buffer[10] = Constants::BGZF_XLEN;
102 buffer[12] = Constants::BGZF_ID1;
103 buffer[13] = Constants::BGZF_ID2;
104 buffer[14] = Constants::BGZF_LEN;
106 // set compression level
107 const int compressionLevel = ( m_isWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 );
109 // loop to retry for blocks that do not compress enough
110 int inputLength = blockLength;
111 size_t compressedLength = 0;
112 const unsigned int bufferSize = Constants::BGZF_MAX_BLOCK_SIZE;
116 // initialize zstream values
120 zs.next_in = (Bytef*)m_uncompressedBlock.Buffer;
121 zs.avail_in = inputLength;
122 zs.next_out = (Bytef*)&buffer[Constants::BGZF_BLOCK_HEADER_LENGTH];
123 zs.avail_out = bufferSize -
124 Constants::BGZF_BLOCK_HEADER_LENGTH -
125 Constants::BGZF_BLOCK_FOOTER_LENGTH;
127 // initialize the zlib compression algorithm
128 int status = deflateInit2(&zs,
131 Constants::GZIP_WINDOW_BITS,
132 Constants::Z_DEFAULT_MEM_LEVEL,
134 if ( status != Z_OK )
135 throw BamException("BgzfStream::DeflateBlock", "zlib deflateInit2 failed");
138 status = deflate(&zs, Z_FINISH);
140 // if not at stream end
141 if ( status != Z_STREAM_END ) {
145 // there was not enough space available in buffer
146 // try to reduce the input length & re-start loop
147 if ( status == Z_OK ) {
149 if ( inputLength < 0 )
150 throw BamException("BgzfStream::DeflateBlock", "input reduction failed");
154 throw BamException("BgzfStream::DeflateBlock", "zlib deflate failed");
157 // finalize the compression routine
158 status = deflateEnd(&zs);
159 if ( status != Z_OK )
160 throw BamException("BgzfStream::DeflateBlock", "zlib deflateEnd failed");
162 // update compressedLength
163 compressedLength = zs.total_out +
164 Constants::BGZF_BLOCK_HEADER_LENGTH +
165 Constants::BGZF_BLOCK_FOOTER_LENGTH;
166 if ( compressedLength > Constants::BGZF_MAX_BLOCK_SIZE )
167 throw BamException("BgzfStream::DeflateBlock", "deflate overflow");
173 // store the compressed length
174 BamTools::PackUnsignedShort(&buffer[16], static_cast<uint16_t>(compressedLength - 1));
176 // store the CRC32 checksum
177 uint32_t crc = crc32(0, NULL, 0);
178 crc = crc32(crc, (Bytef*)m_uncompressedBlock.Buffer, inputLength);
179 BamTools::PackUnsignedInt(&buffer[compressedLength - 8], crc);
180 BamTools::PackUnsignedInt(&buffer[compressedLength - 4], inputLength);
182 // ensure that we have less than a block of data left
183 int remaining = blockLength - inputLength;
184 if ( remaining > 0 ) {
185 if ( remaining > inputLength )
186 throw BamException("BgzfStream::DeflateBlock", "after deflate, remainder too large");
187 memcpy(m_uncompressedBlock.Buffer, m_uncompressedBlock.Buffer + inputLength, remaining);
191 m_blockOffset = remaining;
194 return compressedLength;
197 // flushes the data in the BGZF block
198 void BgzfStream::FlushBlock(void) {
200 BT_ASSERT_X( m_device, "BgzfStream::FlushBlock() - attempting to flush to null device" );
202 // flush all of the remaining blocks
203 while ( m_blockOffset > 0 ) {
205 // compress the data block
206 const size_t blockLength = DeflateBlock(m_blockOffset);
208 // flush the data to our output device
209 const int64_t numBytesWritten = m_device->Write(m_compressedBlock.Buffer, blockLength);
211 // check for device error
212 if ( numBytesWritten < 0 ) {
213 const string message = string("device error: ") + m_device->GetErrorString();
214 throw BamException("BgzfStream::FlushBlock", message);
217 // check that we wrote expected numBytes
218 if ( numBytesWritten != static_cast<int64_t>(blockLength) ) {
220 s << "expected to write " << blockLength
221 << " bytes during flushing, but wrote " << numBytesWritten;
222 throw BamException("BgzfStream::FlushBlock", s.str());
226 m_blockAddress += blockLength;
230 // decompresses the current block
231 size_t BgzfStream::InflateBlock(const size_t& blockLength) {
233 // setup zlib stream object
237 zs.next_in = (Bytef*)m_compressedBlock.Buffer + 18;
238 zs.avail_in = blockLength - 16;
239 zs.next_out = (Bytef*)m_uncompressedBlock.Buffer;
240 zs.avail_out = Constants::BGZF_DEFAULT_BLOCK_SIZE;
243 int status = inflateInit2(&zs, Constants::GZIP_WINDOW_BITS);
244 if ( status != Z_OK )
245 throw BamException("BgzfStream::InflateBlock", "zlib inflateInit failed");
248 status = inflate(&zs, Z_FINISH);
249 if ( status != Z_STREAM_END ) {
251 throw BamException("BgzfStream::InflateBlock", "zlib inflate failed");
255 status = inflateEnd(&zs);
256 if ( status != Z_OK ) {
258 throw BamException("BgzfStream::InflateBlock", "zlib inflateEnd failed");
265 bool BgzfStream::IsOpen(void) const {
268 return m_device->IsOpen();
271 void BgzfStream::Open(const string& filename, const IBamIODevice::OpenMode mode) {
273 // close current device if necessary
275 BT_ASSERT_X( (m_device == 0), "BgzfStream::Open() - unable to properly close previous IO device" );
277 // retrieve new IO device depending on filename
278 m_device = BamDeviceFactory::CreateDevice(filename);
279 BT_ASSERT_X( m_device, "BgzfStream::Open() - unable to create IO device from filename" );
281 // if device fails to open
282 if ( !m_device->Open(mode) ) {
283 const string deviceError = m_device->GetErrorString();
284 const string message = string("could not open BGZF stream: \n\t") + deviceError;
285 throw BamException("BgzfStream::Open", message);
289 // reads BGZF data into a byte buffer
290 size_t BgzfStream::Read(char* data, const size_t dataLength) {
292 if ( dataLength == 0 )
295 // if stream not open for reading
296 BT_ASSERT_X( m_device, "BgzfStream::Read() - trying to read from null device");
297 if ( !m_device->IsOpen() || (m_device->Mode() != IBamIODevice::ReadOnly) )
300 // read blocks as needed until desired data length is retrieved
302 size_t numBytesRead = 0;
303 while ( numBytesRead < dataLength ) {
305 // determine bytes available in current block
306 int bytesAvailable = m_blockLength - m_blockOffset;
308 // read (and decompress) next block if needed
309 if ( bytesAvailable <= 0 ) {
311 bytesAvailable = m_blockLength - m_blockOffset;
312 if ( bytesAvailable <= 0 )
316 // copy data from uncompressed source buffer into data destination buffer
317 const size_t copyLength = min( (dataLength-numBytesRead), (size_t)bytesAvailable );
318 memcpy(output, m_uncompressedBlock.Buffer + m_blockOffset, copyLength);
321 m_blockOffset += copyLength;
322 output += copyLength;
323 numBytesRead += copyLength;
327 if ( m_blockOffset == m_blockLength ) {
328 m_blockAddress = m_device->Tell();
333 // return actual number of bytes read
337 // reads a BGZF block
338 void BgzfStream::ReadBlock(void) {
340 BT_ASSERT_X( m_device, "BgzfStream::ReadBlock() - trying to read from null IO device");
342 // store block's starting address
343 const int64_t blockAddress = m_device->Tell();
345 // read block header from file
346 char header[Constants::BGZF_BLOCK_HEADER_LENGTH];
347 int64_t numBytesRead = m_device->Read(header, Constants::BGZF_BLOCK_HEADER_LENGTH);
349 // check for device error
350 if ( numBytesRead < 0 ) {
351 const string message = string("device error: ") + m_device->GetErrorString();
352 throw BamException("BgzfStream::ReadBlock", message);
355 // if block header empty
356 if ( numBytesRead == 0 ) {
361 // if block header invalid size
362 if ( numBytesRead != static_cast<int8_t>(Constants::BGZF_BLOCK_HEADER_LENGTH) )
363 throw BamException("BgzfStream::ReadBlock", "invalid block header size");
365 // validate block header contents
366 if ( !BgzfStream::CheckBlockHeader(header) )
367 throw BamException("BgzfStream::ReadBlock", "invalid block header contents");
369 // copy header contents to compressed buffer
370 const size_t blockLength = BamTools::UnpackUnsignedShort(&header[16]) + 1;
371 memcpy(m_compressedBlock.Buffer, header, Constants::BGZF_BLOCK_HEADER_LENGTH);
373 // read remainder of block
374 const size_t remaining = blockLength - Constants::BGZF_BLOCK_HEADER_LENGTH;
375 numBytesRead = m_device->Read(&m_compressedBlock.Buffer[Constants::BGZF_BLOCK_HEADER_LENGTH], remaining);
377 // check for device error
378 if ( numBytesRead < 0 ) {
379 const string message = string("device error: ") + m_device->GetErrorString();
380 throw BamException("BgzfStream::ReadBlock", message);
383 // check that we read in expected numBytes
384 if ( numBytesRead != static_cast<int64_t>(remaining) )
385 throw BamException("BgzfStream::ReadBlock", "could not read data from block");
387 // decompress block data
388 const size_t newBlockLength = InflateBlock(blockLength);
391 if ( m_blockLength != 0 )
393 m_blockAddress = blockAddress;
394 m_blockLength = newBlockLength;
397 // seek to position in BGZF file
398 void BgzfStream::Seek(const int64_t& position) {
400 BT_ASSERT_X( m_device, "BgzfStream::Seek() - trying to seek on null IO device");
402 // skip if device is not open
403 if ( !IsOpen() ) return;
405 // determine adjusted offset & address
406 int blockOffset = (position & 0xFFFF);
407 int64_t blockAddress = (position >> 16) & 0xFFFFFFFFFFFFLL;
409 // attempt seek in file
410 if ( m_device->IsRandomAccess() && m_device->Seek(blockAddress) ) {
412 // update block data & return success
414 m_blockAddress = blockAddress;
415 m_blockOffset = blockOffset;
419 s << "unable to seek to position: " << position;
420 throw BamException("BgzfStream::Seek", s.str());
424 void BgzfStream::SetWriteCompressed(bool ok) {
425 m_isWriteCompressed = ok;
428 // get file position in BGZF file
429 int64_t BgzfStream::Tell(void) const {
432 return ( (m_blockAddress << 16) | (m_blockOffset & 0xFFFF) );
435 // writes the supplied data into the BGZF buffer
436 size_t BgzfStream::Write(const char* data, const size_t dataLength) {
438 BT_ASSERT_X( m_device, "BgzfStream::Write() - trying to write to null IO device");
439 BT_ASSERT_X( (m_device->Mode() == IBamIODevice::WriteOnly),
440 "BgzfStream::Write() - trying to write to non-writable IO device");
442 // skip if file not open for writing
446 // write blocks as needed til all data is written
447 size_t numBytesWritten = 0;
448 const char* input = data;
449 const size_t blockLength = Constants::BGZF_DEFAULT_BLOCK_SIZE;
450 while ( numBytesWritten < dataLength ) {
452 // copy data contents to uncompressed output buffer
453 unsigned int copyLength = min(blockLength - m_blockOffset, dataLength - numBytesWritten);
454 char* buffer = m_uncompressedBlock.Buffer;
455 memcpy(buffer + m_blockOffset, input, copyLength);
458 m_blockOffset += copyLength;
460 numBytesWritten += copyLength;
462 // flush (& compress) output buffer when full
463 if ( m_blockOffset == static_cast<int32_t>(blockLength) )
467 // return actual number of bytes written
468 return numBytesWritten;