1 // ***************************************************************************
2 // BgzfStream_p.cpp (c) 2011 Derek Barnett
3 // Marth Lab, Department of Biology, Boston College
4 // ---------------------------------------------------------------------------
5 // Last modified: 25 October 2011 (DB)
6 // ---------------------------------------------------------------------------
7 // Based on BGZF routines developed at the Broad Institute.
8 // Provides the basic functionality for reading & writing BGZF files
9 // Replaces the old BGZF.* files to avoid clashing with other toolkits
10 // ***************************************************************************
12 #include "api/BamAux.h"
13 #include "api/BamConstants.h"
14 #include "api/internal/io/BamDeviceFactory_p.h"
15 #include "api/internal/io/BgzfStream_p.h"
16 #include "api/internal/utils/BamException_p.h"
17 using namespace BamTools;
18 using namespace BamTools::Internal;
28 // ---------------------------
29 // BgzfStream implementation
30 // ---------------------------
// Constructs a BgzfStream.
// The initializers visible here turn write compression on by default and
// construct the two block buffers from BGZF constants: the uncompressed
// buffer with BGZF_DEFAULT_BLOCK_SIZE, the compressed buffer with
// BGZF_MAX_BLOCK_SIZE (presumably their capacities in bytes -- confirm).
// NOTE(review): the remaining member initializers and the constructor body
// are not visible in this listing -- confirm against the full source.
33 BgzfStream::BgzfStream(void)
37 , m_isWriteCompressed(true)
39 , m_uncompressedBlock(Constants::BGZF_DEFAULT_BLOCK_SIZE)
40 , m_compressedBlock(Constants::BGZF_MAX_BLOCK_SIZE)
// Destructor.
// NOTE(review): the body is not visible in this listing; presumably it
// releases the device via Close() -- confirm against the full source.
44 BgzfStream::~BgzfStream(void) {
48 // checks BGZF block header
//
// Returns true only when every field tested below matches:
//   bytes  0-1  : GZIP_ID1, GZIP_ID2
//   byte   2    : compression method == Z_DEFLATED
//   byte   3    : flag byte with the FLG_FEXTRA bit set
//   bytes 10-11 : unsigned short == BGZF_XLEN
//   bytes 12-13 : BGZF_ID1, BGZF_ID2
//   bytes 14-15 : unsigned short == BGZF_LEN
// 'header' must point to at least 16 readable bytes; no length or null check
// is performed here. Bytes 4-9 are not inspected.
49 bool BgzfStream::CheckBlockHeader(char* header) {
50 return (header[0] == Constants::GZIP_ID1 &&
51 header[1] == Constants::GZIP_ID2 &&
52 header[2] == Z_DEFLATED &&
53 (header[3] & Constants::FLG_FEXTRA) != 0 &&
54 BamTools::UnpackUnsignedShort(&header[10]) == Constants::BGZF_XLEN &&
55 header[12] == Constants::BGZF_ID1 &&
56 header[13] == Constants::BGZF_ID2 &&
57 BamTools::UnpackUnsignedShort(&header[14]) == Constants::BGZF_LEN );
// Closes the stream.
// Does nothing when no device is attached. When the device is open in
// write-only mode, a block is deflated and written to the device before the
// two block buffers are cleared and write compression is switched back on.
// NOTE(review): the flush call, the device close/delete and the reset of the
// block counters are not visible in this listing -- confirm against the full
// source.
61 void BgzfStream::Close(void) {
63 // skip if no device open
64 if ( m_device == 0 ) return;
66 // if writing to file, flush the current BGZF block,
67 // then write an empty block (as EOF marker)
68 if ( m_device->IsOpen() && (m_device->Mode() == IBamIODevice::WriteOnly) ) {
70 const size_t blockLength = DeflateBlock();
// NOTE(review): the value returned by Write() is discarded, so a failed or
// short write of this final block is not reported to the caller.
71 m_device->Write(m_compressedBlock.Buffer, blockLength);
79 // ensure our buffers are cleared out
80 m_uncompressedBlock.Clear();
81 m_compressedBlock.Clear();
// restore the default write mode (compressed)
87 m_isWriteCompressed = true;
90 // compresses the current block
//
// Deflates the pending bytes of m_uncompressedBlock into m_compressedBlock as
// one BGZF block (header, deflate payload, then a footer holding the CRC32
// and the input length) and returns the total size in bytes of that block.
// Throws BamException when a zlib call fails, when the input length drops
// below zero while being reduced, when the built block is larger than
// BGZF_MAX_BLOCK_SIZE, or when more input is left over than was consumed.
// On return m_blockOffset holds the number of input bytes that were not
// consumed; those bytes have been moved to the front of m_uncompressedBlock.
// NOTE(review): several short lines (the declaration of 'zs', the retry loop
// header, the input-reduction step, closing braces) are not visible in this
// listing -- confirm the details against the full source.
91 size_t BgzfStream::DeflateBlock(void) {
93 // initialize the gzip header
94 char* buffer = m_compressedBlock.Buffer;
// zero the first 18 bytes so that every header byte not assigned below
// (including the second byte of each two-byte field) stays 0
95 memset(buffer, 0, 18);
96 buffer[0] = Constants::GZIP_ID1;
97 buffer[1] = Constants::GZIP_ID2;
98 buffer[2] = Constants::CM_DEFLATE;
99 buffer[3] = Constants::FLG_FEXTRA;
100 buffer[9] = Constants::OS_UNKNOWN;
101 buffer[10] = Constants::BGZF_XLEN;
102 buffer[12] = Constants::BGZF_ID1;
103 buffer[13] = Constants::BGZF_ID2;
104 buffer[14] = Constants::BGZF_LEN;
106 // set compression level
// level 0 (used when write compression is off) makes zlib store the data
// without compressing it; the block layout stays the same
107 const int compressionLevel = ( m_isWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 );
109 // loop to retry for blocks that do not compress enough
// start by trying to compress everything buffered so far
110 int inputLength = m_blockOffset;
111 size_t compressedLength = 0;
112 const unsigned int bufferSize = Constants::BGZF_MAX_BLOCK_SIZE;
116 // initialize zstream values
120 zs.next_in = (Bytef*)m_uncompressedBlock.Buffer;
121 zs.avail_in = inputLength;
// output starts right after the block header; room for the footer is
// held back from the space offered to zlib
122 zs.next_out = (Bytef*)&buffer[Constants::BGZF_BLOCK_HEADER_LENGTH];
123 zs.avail_out = bufferSize -
124 Constants::BGZF_BLOCK_HEADER_LENGTH -
125 Constants::BGZF_BLOCK_FOOTER_LENGTH;
127 // initialize the zlib compression algorithm
128 int status = deflateInit2(&zs,
131 Constants::GZIP_WINDOW_BITS,
132 Constants::Z_DEFAULT_MEM_LEVEL,
134 if ( status != Z_OK )
135 throw BamException("BgzfStream::DeflateBlock", "zlib deflateInit2 failed");
// single call with Z_FINISH: succeeds with Z_STREAM_END only if all of the
// input fits into the output space in one go
138 status = deflate(&zs, Z_FINISH);
140 // if not at stream end
141 if ( status != Z_STREAM_END ) {
145 // there was not enough space available in buffer
146 // try to reduce the input length & re-start loop
147 if ( status == Z_OK ) {
149 if ( inputLength < 0 )
150 throw BamException("BgzfStream::DeflateBlock", "input reduction failed");
// any other status is treated as a hard zlib failure
154 throw BamException("BgzfStream::DeflateBlock", "zlib deflate failed");
157 // finalize the compression routine
158 status = deflateEnd(&zs);
159 if ( status != Z_OK )
160 throw BamException("BgzfStream::DeflateBlock", "zlib deflateEnd failed");
162 // update compressedLength
// total block size = deflate payload + header + footer
163 compressedLength = zs.total_out +
164 Constants::BGZF_BLOCK_HEADER_LENGTH +
165 Constants::BGZF_BLOCK_FOOTER_LENGTH;
166 if ( compressedLength > Constants::BGZF_MAX_BLOCK_SIZE )
167 throw BamException("BgzfStream::DeflateBlock", "deflate overflow");
173 // store the compressed length
// the 16-bit field at offset 16 holds the total block size minus one
// (ReadBlock adds the 1 back when it parses this field)
174 BamTools::PackUnsignedShort(&buffer[16], static_cast<uint16_t>(compressedLength - 1));
176 // store the CRC32 checksum
// footer = last 8 bytes of the block: CRC32 of the uncompressed input,
// followed by the number of uncompressed input bytes
177 uint32_t crc = crc32(0, NULL, 0);
178 crc = crc32(crc, (Bytef*)m_uncompressedBlock.Buffer, inputLength);
179 BamTools::PackUnsignedInt(&buffer[compressedLength - 8], crc);
180 BamTools::PackUnsignedInt(&buffer[compressedLength - 4], inputLength);
182 // ensure that we have less than a block of data left
183 int remaining = m_blockOffset - inputLength;
184 if ( remaining > 0 ) {
// remaining <= inputLength also guarantees that the memcpy below never
// copies between overlapping ranges
185 if ( remaining > inputLength )
186 throw BamException("BgzfStream::DeflateBlock", "after deflate, remainder too large");
// move the unconsumed tail to the front of the input buffer
187 memcpy(m_uncompressedBlock.Buffer, m_uncompressedBlock.Buffer + inputLength, remaining);
// the buffer now holds only the leftover bytes
191 m_blockOffset = remaining;
194 return compressedLength;
197 // flushes the data in the BGZF block
//
// Repeatedly deflates the buffered data and writes each resulting block to
// the device until m_blockOffset reaches 0 (DeflateBlock lowers it on every
// pass). Throws BamException if the device reports an error (negative return
// from Write) or writes a different number of bytes than the block length.
// m_blockAddress is advanced by the size of each block written.
198 void BgzfStream::FlushBlock(void) {
200 BT_ASSERT_X( m_device, "BgzfStream::FlushBlock() - attempting to flush to null device" );
202 // flush all of the remaining blocks
203 while ( m_blockOffset > 0 ) {
205 // compress the data block
206 const size_t blockLength = DeflateBlock();
208 // flush the data to our output device
209 const int64_t numBytesWritten = m_device->Write(m_compressedBlock.Buffer, blockLength);
211 // check for device error
212 if ( numBytesWritten < 0 ) {
213 const string message = string("device error: ") + m_device->GetErrorString();
214 throw BamException("BgzfStream::FlushBlock", message);
217 // check that we wrote expected numBytes
218 if ( numBytesWritten != static_cast<int64_t>(blockLength) ) {
// NOTE(review): the declaration of the stream 's' is not visible in this
// listing.
220 s << "expected to write " << blockLength
221 << " bytes during flushing, but wrote " << numBytesWritten;
222 throw BamException("BgzfStream::FlushBlock", s.str());
// keep the running block address in step with what was written
226 m_blockAddress += blockLength;
230 // decompresses the current block
//
// Inflates the block held in m_compressedBlock into m_uncompressedBlock.
// 'blockLength' is the value the caller passes for the block's length
// (ReadBlock passes the total block length parsed from the header).
// Throws BamException if zlib initialisation, inflation or clean-up fails.
// NOTE(review): the declaration of 'zs', any clean-up performed before the
// throws, and the return statement are not visible in this listing;
// presumably the number of inflated bytes is returned -- confirm.
231 size_t BgzfStream::InflateBlock(const size_t& blockLength) {
233 // setup zlib stream object
// input starts 18 bytes into the compressed buffer, i.e. past the bytes that
// DeflateBlock fills in as the block header
237 zs.next_in = (Bytef*)m_compressedBlock.Buffer + 18;
// NOTE(review): with the input starting at offset 18, only blockLength - 18
// bytes of the block remain, yet blockLength - 16 are announced; inflation is
// required to end with Z_STREAM_END below -- confirm the 16 is intended.
238 zs.avail_in = blockLength - 16;
239 zs.next_out = (Bytef*)m_uncompressedBlock.Buffer;
240 zs.avail_out = Constants::BGZF_DEFAULT_BLOCK_SIZE;
243 int status = inflateInit2(&zs, Constants::GZIP_WINDOW_BITS);
244 if ( status != Z_OK )
245 throw BamException("BgzfStream::InflateBlock", "zlib inflateInit failed");
// single call with Z_FINISH: the whole block must inflate in one go
248 status = inflate(&zs, Z_FINISH);
249 if ( status != Z_STREAM_END ) {
251 throw BamException("BgzfStream::InflateBlock", "zlib inflate failed");
255 status = inflateEnd(&zs);
256 if ( status != Z_OK ) {
258 throw BamException("BgzfStream::InflateBlock", "zlib inflateEnd failed");
// Reports whether the underlying IO device is open.
// NOTE(review): the lines between the signature and this return are not
// visible in this listing; m_device is dereferenced here, so presumably a
// null check precedes it -- confirm against the full source.
265 bool BgzfStream::IsOpen(void) const {
268 return m_device->IsOpen();
// Opens a BGZF stream on 'filename' in the requested 'mode'.
// A new IO device is obtained from BamDeviceFactory based on the filename and
// then opened; if opening fails, a BamException carrying the device's error
// string is thrown. Both assertions below guard programming errors (a device
// still attached, or the factory returning no device).
// NOTE(review): the statement that closes a previously attached device is not
// visible in this listing -- confirm against the full source.
271 void BgzfStream::Open(const string& filename, const IBamIODevice::OpenMode mode) {
273 // close current device if necessary
275 BT_ASSERT_X( (m_device == 0), "BgzfStream::Open() - unable to properly close previous IO device" );
277 // retrieve new IO device depending on filename
278 m_device = BamDeviceFactory::CreateDevice(filename);
279 BT_ASSERT_X( m_device, "BgzfStream::Open() - unable to create IO device from filename" );
281 // if device fails to open
282 if ( !m_device->Open(mode) ) {
283 const string deviceError = m_device->GetErrorString();
284 const string message = string("could not open BGZF stream: \n\t") + deviceError;
285 throw BamException("BgzfStream::Open", message);
289 // reads BGZF data into a byte buffer
//
// Copies up to 'dataLength' uncompressed bytes into 'data', taking them from
// the current uncompressed block and moving on to further blocks as each one
// is used up. The loop tracks progress in numBytesRead.
// NOTE(review): the early-return statements, the declaration of 'output', the
// call that loads the next block, the loop exit and the final return are not
// visible in this listing -- confirm against the full source.
290 size_t BgzfStream::Read(char* data, const size_t dataLength) {
292 if ( dataLength == 0 )
295 // if stream not open for reading
296 BT_ASSERT_X( m_device, "BgzfStream::Read() - trying to read from null device");
297 if ( !m_device->IsOpen() || (m_device->Mode() != IBamIODevice::ReadOnly) )
300 // read blocks as needed until desired data length is retrieved
302 size_t numBytesRead = 0;
303 while ( numBytesRead < dataLength ) {
305 // determine bytes available in current block
306 int bytesAvailable = m_blockLength - m_blockOffset;
308 // read (and decompress) next block if needed
309 if ( bytesAvailable <= 0 ) {
// recompute after the refill; still nothing available means no more data
311 bytesAvailable = m_blockLength - m_blockOffset;
312 if ( bytesAvailable <= 0 )
316 // copy data from uncompressed source buffer into data destination buffer
// never copy more than the caller still wants or the block still holds
317 const size_t copyLength = min( (dataLength-numBytesRead), (size_t)bytesAvailable );
318 memcpy(output, m_uncompressedBlock.Buffer + m_blockOffset, copyLength);
// advance the block cursor, the destination pointer and the running total
321 m_blockOffset += copyLength;
322 output += copyLength;
323 numBytesRead += copyLength;
// block fully consumed: record the device position as the address of the
// next block
327 if ( m_blockOffset == m_blockLength ) {
328 m_blockAddress = m_device->Tell();
334 // return actual number of bytes read
338 // reads a BGZF block
//
// Reads one block from the device: first the fixed-size header, which is
// validated with CheckBlockHeader, then the rest of the block as announced by
// the header's size field. The block is inflated into m_uncompressedBlock and
// m_blockAddress / m_blockLength are updated to describe it.
// Throws BamException on a device error, a short header, invalid header
// contents, a short read of the block body, or (via InflateBlock) a zlib
// failure.
// NOTE(review): the body of the empty-header branch and the statement
// controlled by 'if ( m_blockLength != 0 )' are not visible in this listing
// -- confirm against the full source.
339 void BgzfStream::ReadBlock(void) {
341 BT_ASSERT_X( m_device, "BgzfStream::ReadBlock() - trying to read from null IO device");
343 // store block's starting address
344 int64_t blockAddress = m_device->Tell();
346 // read block header from file
347 char header[Constants::BGZF_BLOCK_HEADER_LENGTH];
348 int64_t numBytesRead = m_device->Read(header, Constants::BGZF_BLOCK_HEADER_LENGTH);
350 // check for device error
351 if ( numBytesRead < 0 ) {
352 const string message = string("device error: ") + m_device->GetErrorString();
353 throw BamException("BgzfStream::ReadBlock", message);
356 // if block header empty
357 if ( numBytesRead == 0 ) {
362 // if block header invalid size
// NOTE(review): the expected length is cast to int8_t before the comparison,
// which relies on BGZF_BLOCK_HEADER_LENGTH fitting in a signed 8-bit value.
363 if ( numBytesRead != static_cast<int8_t>(Constants::BGZF_BLOCK_HEADER_LENGTH) )
364 throw BamException("BgzfStream::ReadBlock", "invalid block header size");
366 // validate block header contents
367 if ( !BgzfStream::CheckBlockHeader(header) )
368 throw BamException("BgzfStream::ReadBlock", "invalid block header contents");
370 // copy header contents to compressed buffer
// the field at offset 16 stores the total block size minus one
// (DeflateBlock writes compressedLength - 1 there)
371 const size_t blockLength = BamTools::UnpackUnsignedShort(&header[16]) + 1;
372 memcpy(m_compressedBlock.Buffer, header, Constants::BGZF_BLOCK_HEADER_LENGTH);
374 // read remainder of block
375 const size_t remaining = blockLength - Constants::BGZF_BLOCK_HEADER_LENGTH;
376 numBytesRead = m_device->Read(&m_compressedBlock.Buffer[Constants::BGZF_BLOCK_HEADER_LENGTH], remaining);
378 // check for device error
379 if ( numBytesRead < 0 ) {
380 const string message = string("device error: ") + m_device->GetErrorString();
381 throw BamException("BgzfStream::ReadBlock", message);
384 // check that we read in expected numBytes
385 if ( numBytesRead != static_cast<int64_t>(remaining) )
386 throw BamException("BgzfStream::ReadBlock", "could not read data from block");
388 // decompress block data
389 const size_t newBlockLength = InflateBlock(blockLength);
392 if ( m_blockLength != 0 )
// publish the freshly read block: its device address and inflated length
394 m_blockAddress = blockAddress;
395 m_blockLength = newBlockLength;
398 // seek to position in BGZF file
//
// 'position' is a packed offset: the low 16 bits are the offset inside the
// uncompressed block, the next 48 bits are the block's address on the device
// (the same layout Tell() produces). Returns silently when the stream is not
// open. If the device supports random access and the device seek succeeds,
// the block address and offset are updated; the throw below reports the
// position that could not be reached.
// NOTE(review): the branch structure around the throw and the declaration of
// 's' are not visible in this listing -- confirm against the full source.
399 void BgzfStream::Seek(const int64_t& position) {
401 BT_ASSERT_X( m_device, "BgzfStream::Seek() - trying to seek on null IO device");
403 // skip if device is not open
404 if ( !IsOpen() ) return;
406 // determine adjusted offset & address
407 int blockOffset = (position & 0xFFFF);
408 int64_t blockAddress = (position >> 16) & 0xFFFFFFFFFFFFLL;
410 // attempt seek in file
411 if ( m_device->IsRandomAccess() && m_device->Seek(blockAddress) ) {
413 // update block data & return success
415 m_blockAddress = blockAddress;
416 m_blockOffset = blockOffset;
420 s << "unable to seek to position: " << position;
421 throw BamException("BgzfStream::Seek", s.str());
// Selects whether blocks are compressed when written: 'ok' is stored in
// m_isWriteCompressed, which DeflateBlock reads to choose between
// Z_DEFAULT_COMPRESSION (true) and compression level 0 (false).
425 void BgzfStream::SetWriteCompressed(bool ok) {
426 m_isWriteCompressed = ok;
429 // get file position in BGZF file
//
// Returns the current position as a packed offset: the block address shifted
// left by 16 bits, combined with the low 16 bits of the offset inside the
// block. Seek() splits a position using the same layout.
// NOTE(review): the lines between the signature and this return are not
// visible in this listing -- confirm against the full source.
430 int64_t BgzfStream::Tell(void) const {
433 return ( (m_blockAddress << 16) | (m_blockOffset & 0xFFFF) );
436 // writes the supplied data into the BGZF buffer
//
// Appends 'dataLength' bytes from 'data' to the uncompressed block buffer in
// chunks that never exceed the space left in the current block
// (BGZF_DEFAULT_BLOCK_SIZE - m_blockOffset). Returns the number of bytes
// accepted. Asserts that a device is attached and that it is in write-only
// mode.
// NOTE(review): the open-for-writing check, the statement that advances
// 'input', and the flush performed when the block is full are not visible in
// this listing -- confirm against the full source.
437 size_t BgzfStream::Write(const char* data, const size_t dataLength) {
439 BT_ASSERT_X( m_device, "BgzfStream::Write() - trying to write to null IO device");
440 BT_ASSERT_X( (m_device->Mode() == IBamIODevice::WriteOnly),
441 "BgzfStream::Write() - trying to write to non-writable IO device");
443 // skip if file not open for writing
447 // write blocks as needed til all data is written
448 size_t numBytesWritten = 0;
449 const char* input = data;
450 const size_t blockLength = Constants::BGZF_DEFAULT_BLOCK_SIZE;
451 while ( numBytesWritten < dataLength ) {
453 // copy data contents to uncompressed output buffer
// take whichever is smaller: the room left in the block or the data left
454 unsigned int copyLength = min(blockLength - m_blockOffset, dataLength - numBytesWritten);
455 char* buffer = m_uncompressedBlock.Buffer;
456 memcpy(buffer + m_blockOffset, input, copyLength);
// account for the bytes just buffered
459 m_blockOffset += copyLength;
461 numBytesWritten += copyLength;
463 // flush (& compress) output buffer when full
464 if ( m_blockOffset == blockLength )
468 // return actual number of bytes written
469 return numBytesWritten;