X-Git-Url: https://git.donarmstrong.com/?a=blobdiff_plain;f=src%2Fapi%2Finternal%2FBgzfStream_p.cpp;h=1744ddd37bf0bde872a41a8f60bbd117fc6634e3;hb=9f1ce8c47aeadb6dc1320b52ee671c3341b97935;hp=aba2a0786a095db16c6adf6c8830a3f14bd37f4a;hpb=a15dba1bdfe5a1a61e175cb18b1e2694cfcd1746;p=bamtools.git diff --git a/src/api/internal/BgzfStream_p.cpp b/src/api/internal/BgzfStream_p.cpp index aba2a07..1744ddd 100644 --- a/src/api/internal/BgzfStream_p.cpp +++ b/src/api/internal/BgzfStream_p.cpp @@ -1,98 +1,127 @@ // *************************************************************************** // BgzfStream_p.cpp (c) 2011 Derek Barnett // Marth Lab, Department of Biology, Boston College -// All rights reserved. // --------------------------------------------------------------------------- -// Last modified: 5 April 2011(DB) +// Last modified: 10 October 2011(DB) // --------------------------------------------------------------------------- // Based on BGZF routines developed at the Broad Institute. // Provides the basic functionality for reading & writing BGZF files // Replaces the old BGZF.* files to avoid clashing with other toolkits // *************************************************************************** -#include +#include "api/BamAux.h" +#include "api/BamConstants.h" +#include "api/internal/BamDeviceFactory_p.h" +#include "api/internal/BamException_p.h" +#include "api/internal/BgzfStream_p.h" using namespace BamTools; using namespace BamTools::Internal; +#include "zlib.h" + #include #include +#include +#include using namespace std; +// ---------------------------- +// RaiiWrapper implementation +// ---------------------------- + +BgzfStream::RaiiWrapper::RaiiWrapper(void) { + CompressedBlock = new char[Constants::BGZF_MAX_BLOCK_SIZE]; + UncompressedBlock = new char[Constants::BGZF_DEFAULT_BLOCK_SIZE]; +} + +BgzfStream::RaiiWrapper::~RaiiWrapper(void) { + + // clean up buffers + delete[] CompressedBlock; + delete[] UncompressedBlock; + CompressedBlock = 0; + UncompressedBlock = 0; +} + +// --------------------------- +// BgzfStream implementation +// --------------------------- + // constructor BgzfStream::BgzfStream(void) - : UncompressedBlockSize(Constants::BGZF_DEFAULT_BLOCK_SIZE) - , CompressedBlockSize(Constants::BGZF_MAX_BLOCK_SIZE) - , BlockLength(0) - , BlockOffset(0) - , BlockAddress(0) - , IsOpen(false) - , IsWriteOnly(false) - , IsWriteCompressed(true) - , Stream(NULL) - , UncompressedBlock(NULL) - , CompressedBlock(NULL) -{ - try { - CompressedBlock = new char[CompressedBlockSize]; - UncompressedBlock = new char[UncompressedBlockSize]; - } catch( std::bad_alloc& ba ) { - fprintf(stderr, "BgzfStream ERROR: unable to allocate memory\n"); - exit(1); - } -} + : m_blockLength(0) + , m_blockOffset(0) + , m_blockAddress(0) + , m_isWriteCompressed(true) + , m_device(0) +{ } // destructor BgzfStream::~BgzfStream(void) { - if( CompressedBlock ) delete[] CompressedBlock; - if( UncompressedBlock ) delete[] UncompressedBlock; + Close(); +} + +// checks BGZF block header +bool BgzfStream::CheckBlockHeader(char* header) { + return (header[0] == Constants::GZIP_ID1 && + header[1] == Constants::GZIP_ID2 && + header[2] == Z_DEFLATED && + (header[3] & Constants::FLG_FEXTRA) != 0 && + BamTools::UnpackUnsignedShort(&header[10]) == Constants::BGZF_XLEN && + header[12] == Constants::BGZF_ID1 && + header[13] == Constants::BGZF_ID2 && + BamTools::UnpackUnsignedShort(&header[14]) == Constants::BGZF_LEN ); } // closes BGZF file void BgzfStream::Close(void) { - // skip if file not open - if ( !IsOpen ) return; + // reset state + m_blockLength = 0; + m_blockOffset = 0; + m_blockAddress = 0; + m_isWriteCompressed = true; + + // skip if no device open + if ( m_device == 0 ) return; // if writing to file, flush the current BGZF block, // then write an empty block (as EOF marker) - if ( IsWriteOnly ) { + if ( m_device->IsOpen() && (m_device->Mode() == IBamIODevice::WriteOnly) ) { FlushBlock(); - int blockLength = DeflateBlock(); - fwrite(CompressedBlock, 1, blockLength, Stream); + const size_t blockLength = DeflateBlock(); + m_device->Write(Resources.CompressedBlock, blockLength); } - // flush and close stream - fflush(Stream); - fclose(Stream); - - // reset flags - IsWriteCompressed = true; - IsOpen = false; + // close device + m_device->Close(); + delete m_device; + m_device = 0; } // compresses the current block -int BgzfStream::DeflateBlock(void) { +size_t BgzfStream::DeflateBlock(void) { // initialize the gzip header - char* buffer = CompressedBlock; + char* buffer = Resources.CompressedBlock; memset(buffer, 0, 18); buffer[0] = Constants::GZIP_ID1; - buffer[1] = (char)Constants::GZIP_ID2; + buffer[1] = Constants::GZIP_ID2; buffer[2] = Constants::CM_DEFLATE; buffer[3] = Constants::FLG_FEXTRA; - buffer[9] = (char)Constants::OS_UNKNOWN; + buffer[9] = Constants::OS_UNKNOWN; buffer[10] = Constants::BGZF_XLEN; buffer[12] = Constants::BGZF_ID1; buffer[13] = Constants::BGZF_ID2; buffer[14] = Constants::BGZF_LEN; // set compression level - const int compressionLevel = ( IsWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 ); + const int compressionLevel = ( m_isWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 ); // loop to retry for blocks that do not compress enough - int inputLength = BlockOffset; - int compressedLength = 0; - unsigned int bufferSize = CompressedBlockSize; + int inputLength = m_blockOffset; + size_t compressedLength = 0; + const unsigned int bufferSize = Constants::BGZF_MAX_BLOCK_SIZE; while ( true ) { @@ -100,80 +129,78 @@ int BgzfStream::DeflateBlock(void) { z_stream zs; zs.zalloc = NULL; zs.zfree = NULL; - zs.next_in = (Bytef*)UncompressedBlock; + zs.next_in = (Bytef*)Resources.UncompressedBlock; zs.avail_in = inputLength; zs.next_out = (Bytef*)&buffer[Constants::BGZF_BLOCK_HEADER_LENGTH]; - zs.avail_out = bufferSize - Constants::BGZF_BLOCK_HEADER_LENGTH - Constants::BGZF_BLOCK_FOOTER_LENGTH; + zs.avail_out = bufferSize - + Constants::BGZF_BLOCK_HEADER_LENGTH - + Constants::BGZF_BLOCK_FOOTER_LENGTH; // initialize the zlib compression algorithm - if ( deflateInit2(&zs, - compressionLevel, - Z_DEFLATED, - Constants::GZIP_WINDOW_BITS, - Constants::Z_DEFAULT_MEM_LEVEL, - Z_DEFAULT_STRATEGY) != Z_OK ) - { - fprintf(stderr, "BgzfStream ERROR: zlib deflate initialization failed\n"); - exit(1); - } + int status = deflateInit2(&zs, + compressionLevel, + Z_DEFLATED, + Constants::GZIP_WINDOW_BITS, + Constants::Z_DEFAULT_MEM_LEVEL, + Z_DEFAULT_STRATEGY); + if ( status != Z_OK ) + throw BamException("BgzfStream::DeflateBlock", "zlib deflateInit2 failed"); // compress the data - int status = deflate(&zs, Z_FINISH); + status = deflate(&zs, Z_FINISH); + + // if not at stream end if ( status != Z_STREAM_END ) { deflateEnd(&zs); - // reduce the input length and try again - if ( status == Z_OK ) { - inputLength -= 1024; - if ( inputLength < 0 ) { - fprintf(stderr, "BgzfStream ERROR: input reduction failed\n"); - exit(1); - } - continue; - } - - fprintf(stderr, "BgzfStream ERROR: zlib::deflateEnd() failed\n"); - exit(1); - } + // if error status + if ( status != Z_OK ) + throw BamException("BgzfStream::DeflateBlock", "zlib deflate failed"); - // finalize the compression routine - if ( deflateEnd(&zs) != Z_OK ) { - fprintf(stderr, "BgzfStream ERROR: zlib::deflateEnd() failed\n"); - exit(1); - } - - compressedLength = zs.total_out; - compressedLength += Constants::BGZF_BLOCK_HEADER_LENGTH + Constants::BGZF_BLOCK_FOOTER_LENGTH; - if ( compressedLength > Constants::BGZF_MAX_BLOCK_SIZE ) { - fprintf(stderr, "BgzfStream ERROR: deflate overflow\n"); - exit(1); + // not enough space available in buffer + // try to reduce the input length & re-start loop + inputLength -= 1024; + if ( inputLength <= 0 ) + throw BamException("BgzfStream::DeflateBlock", "input reduction failed"); + continue; } + // finalize the compression routine + status = deflateEnd(&zs); + if ( status != Z_OK ) + throw BamException("BgzfStream::DeflateBlock", "zlib deflateEnd failed"); + + // update compressedLength + compressedLength = zs.total_out + + Constants::BGZF_BLOCK_HEADER_LENGTH + + Constants::BGZF_BLOCK_FOOTER_LENGTH; + if ( compressedLength > Constants::BGZF_MAX_BLOCK_SIZE ) + throw BamException("BgzfStream::DeflateBlock", "deflate overflow"); + + // quit while loop break; } // store the compressed length - BamTools::PackUnsignedShort(&buffer[16], (unsigned short)(compressedLength - 1)); + BamTools::PackUnsignedShort(&buffer[16], static_cast(compressedLength - 1)); // store the CRC32 checksum - unsigned int crc = crc32(0, NULL, 0); - crc = crc32(crc, (Bytef*)UncompressedBlock, inputLength); + uint32_t crc = crc32(0, NULL, 0); + crc = crc32(crc, (Bytef*)Resources.UncompressedBlock, inputLength); BamTools::PackUnsignedInt(&buffer[compressedLength - 8], crc); BamTools::PackUnsignedInt(&buffer[compressedLength - 4], inputLength); // ensure that we have less than a block of data left - int remaining = BlockOffset - inputLength; + int remaining = m_blockOffset - inputLength; if ( remaining > 0 ) { - if ( remaining > inputLength ) { - fprintf(stderr, "BgzfStream ERROR: after deflate, remainder too large\n"); - exit(1); - } - memcpy(UncompressedBlock, UncompressedBlock + inputLength, remaining); + if ( remaining > inputLength ) + throw BamException("BgzfStream::DeflateBlock", "after deflate, remainder too large"); + memcpy(Resources.UncompressedBlock, Resources.UncompressedBlock + inputLength, remaining); } // update block data - BlockOffset = remaining; + m_blockOffset = remaining; // return result return compressedLength; @@ -182,258 +209,251 @@ int BgzfStream::DeflateBlock(void) { // flushes the data in the BGZF block void BgzfStream::FlushBlock(void) { + BT_ASSERT_X( m_device, "BgzfStream::FlushBlock() - attempting to flush to null device" ); + // flush all of the remaining blocks - while ( BlockOffset > 0 ) { + while ( m_blockOffset > 0 ) { // compress the data block - int blockLength = DeflateBlock(); + const size_t blockLength = DeflateBlock(); - // flush the data to our output stream - int numBytesWritten = fwrite(CompressedBlock, 1, blockLength, Stream); + // flush the data to our output device + const size_t numBytesWritten = m_device->Write(Resources.CompressedBlock, blockLength); if ( numBytesWritten != blockLength ) { - fprintf(stderr, "BgzfStream ERROR: expected to write %u bytes during flushing, but wrote %u bytes\n", - blockLength, numBytesWritten); - exit(1); + stringstream s(""); + s << "expected to write " << blockLength + << " bytes during flushing, but wrote " << numBytesWritten; + throw BamException("BgzfStream::FlushBlock", s.str()); } // update block data - BlockAddress += blockLength; + m_blockAddress += blockLength; } } // decompresses the current block -int BgzfStream::InflateBlock(const int& blockLength) { +size_t BgzfStream::InflateBlock(const size_t& blockLength) { - // inflate the data from compressed buffer into uncompressed buffer + // setup zlib stream object z_stream zs; zs.zalloc = NULL; zs.zfree = NULL; - zs.next_in = (Bytef*)CompressedBlock + 18; + zs.next_in = (Bytef*)Resources.CompressedBlock + 18; zs.avail_in = blockLength - 16; - zs.next_out = (Bytef*)UncompressedBlock; - zs.avail_out = UncompressedBlockSize; + zs.next_out = (Bytef*)Resources.UncompressedBlock; + zs.avail_out = Constants::BGZF_DEFAULT_BLOCK_SIZE; + // initialize int status = inflateInit2(&zs, Constants::GZIP_WINDOW_BITS); - if ( status != Z_OK ) { - fprintf(stderr, "BgzfStream ERROR: could not decompress block - zlib::inflateInit() failed\n"); - return -1; - } + if ( status != Z_OK ) + throw BamException("BgzfStream::InflateBlock", "zlib inflateInit failed"); + // decompress status = inflate(&zs, Z_FINISH); if ( status != Z_STREAM_END ) { inflateEnd(&zs); - fprintf(stderr, "BgzfStream ERROR: could not decompress block - zlib::inflate() failed\n"); - return -1; + throw BamException("BgzfStream::InflateBlock", "zlib inflate failed"); } + // finalize status = inflateEnd(&zs); if ( status != Z_OK ) { - fprintf(stderr, "BgzfStream ERROR: could not decompress block - zlib::inflateEnd() failed\n"); - return -1; + inflateEnd(&zs); + throw BamException("BgzfStream::InflateBlock", "zlib inflateEnd failed"); } // return result return zs.total_out; } -// opens the BGZF file for reading (mode is either "rb" for reading, or "wb" for writing) -bool BgzfStream::Open(const string& filename, const char* mode) { - - // close current stream, if necessary, before opening next - if ( IsOpen ) Close(); - - // determine open mode - if ( strcmp(mode, "rb") == 0 ) - IsWriteOnly = false; - else if ( strcmp(mode, "wb") == 0) - IsWriteOnly = true; - else { - fprintf(stderr, "BgzfStream ERROR: unknown file mode: %s\n", mode); +bool BgzfStream::IsOpen(void) const { + if ( m_device == 0 ) return false; - } + return m_device->IsOpen(); +} - // open BGZF stream on a file - if ( (filename != "stdin") && (filename != "stdout") ) - Stream = fopen(filename.c_str(), mode); +void BgzfStream::Open(const string& filename, const IBamIODevice::OpenMode mode) { - // open BGZF stream on stdin - else if ( (filename == "stdin") && (strcmp(mode, "rb") == 0 ) ) - Stream = freopen(NULL, mode, stdin); + // close current device if necessary + Close(); + BT_ASSERT_X( (m_device == 0), "BgzfStream::Open() - unable to properly close previous IO device" ); - // open BGZF stream on stdout - else if ( (filename == "stdout") && (strcmp(mode, "wb") == 0) ) - Stream = freopen(NULL, mode, stdout); + // retrieve new IO device depending on filename + m_device = BamDeviceFactory::CreateDevice(filename); + BT_ASSERT_X( m_device, "BgzfStream::Open() - unable to create IO device from filename" ); - if ( !Stream ) { - fprintf(stderr, "BgzfStream ERROR: unable to open file %s\n", filename.c_str() ); - return false; + // if device fails to open + if ( !m_device->Open(mode) ) { + const string deviceError = m_device->GetErrorString(); + const string message = string("could not open BGZF stream: \n\t") + deviceError; + throw BamException("BgzfStream::Open", message); } - - // set flag & return success - IsOpen = true; - return true; } // reads BGZF data into a byte buffer -int BgzfStream::Read(char* data, const unsigned int dataLength) { +size_t BgzfStream::Read(char* data, const size_t dataLength) { - // if stream not open for reading (or empty request) - if ( !IsOpen || IsWriteOnly || dataLength == 0 ) + if ( dataLength == 0 ) + return 0; + + // if stream not open for reading + BT_ASSERT_X( m_device, "BgzfStream::Read() - trying to read from null device"); + if ( !m_device->IsOpen() || (m_device->Mode() != IBamIODevice::ReadOnly) ) return 0; // read blocks as needed until desired data length is retrieved - char* output = data; - unsigned int numBytesRead = 0; + size_t numBytesRead = 0; while ( numBytesRead < dataLength ) { // determine bytes available in current block - int bytesAvailable = BlockLength - BlockOffset; + int bytesAvailable = m_blockLength - m_blockOffset; // read (and decompress) next block if needed if ( bytesAvailable <= 0 ) { - if ( !ReadBlock() ) return -1; - bytesAvailable = BlockLength - BlockOffset; - if ( bytesAvailable <= 0 ) break; + ReadBlock(); + bytesAvailable = m_blockLength - m_blockOffset; + if ( bytesAvailable <= 0 ) + break; } // copy data from uncompressed source buffer into data destination buffer - char* buffer = UncompressedBlock; - int copyLength = min( (int)(dataLength-numBytesRead), bytesAvailable ); - memcpy(output, buffer + BlockOffset, copyLength); + const size_t copyLength = min( (dataLength-numBytesRead), (size_t)bytesAvailable ); + memcpy(data, Resources.UncompressedBlock + m_blockOffset, copyLength); // update counters - BlockOffset += copyLength; - output += copyLength; + m_blockOffset += copyLength; + data += copyLength; numBytesRead += copyLength; } // update block data - if ( BlockOffset == BlockLength ) { - BlockAddress = ftell64(Stream); - BlockOffset = 0; - BlockLength = 0; + if ( m_blockOffset == m_blockLength ) { + m_blockAddress = m_device->Tell(); + m_blockOffset = 0; + m_blockLength = 0; + } + // return actual number of bytes read return numBytesRead; } // reads a BGZF block -bool BgzfStream::ReadBlock(void) { +void BgzfStream::ReadBlock(void) { - char header[Constants::BGZF_BLOCK_HEADER_LENGTH]; - int64_t blockAddress = ftell64(Stream); + BT_ASSERT_X( m_device, "BgzfStream::ReadBlock() - trying to read from null IO device"); + + // store block's starting address + int64_t blockAddress = m_device->Tell(); // read block header from file - int count = fread(header, 1, sizeof(header), Stream); + char header[Constants::BGZF_BLOCK_HEADER_LENGTH]; + size_t numBytesRead = m_device->Read(header, Constants::BGZF_BLOCK_HEADER_LENGTH); // if block header empty - if ( count == 0 ) { - BlockLength = 0; - return true; + if ( numBytesRead == 0 ) { + m_blockLength = 0; + return; } // if block header invalid size - if ( count != sizeof(header) ) { - fprintf(stderr, "BgzfStream ERROR: read block failed - could not read block header\n"); - return false; - } + if ( numBytesRead != Constants::BGZF_BLOCK_HEADER_LENGTH ) + throw BamException("BgzfStream::ReadBlock", "invalid block header size"); // validate block header contents - if ( !BgzfStream::CheckBlockHeader(header) ) { - fprintf(stderr, "BgzfStream ERROR: read block failed - invalid block header\n"); - return false; - } + if ( !BgzfStream::CheckBlockHeader(header) ) + throw BamException("BgzfStream::ReadBlock", "invalid block header contents"); // copy header contents to compressed buffer - int blockLength = BamTools::UnpackUnsignedShort(&header[16]) + 1; - char* compressedBlock = CompressedBlock; - memcpy(compressedBlock, header, Constants::BGZF_BLOCK_HEADER_LENGTH); - int remaining = blockLength - Constants::BGZF_BLOCK_HEADER_LENGTH; + const size_t blockLength = BamTools::UnpackUnsignedShort(&header[16]) + 1; + memcpy(Resources.CompressedBlock, header, Constants::BGZF_BLOCK_HEADER_LENGTH); // read remainder of block - count = fread(&compressedBlock[Constants::BGZF_BLOCK_HEADER_LENGTH], 1, remaining, Stream); - if ( count != remaining ) { - fprintf(stderr, "BgzfStream ERROR: read block failed - could not read data from block\n"); - return false; - } + const size_t remaining = blockLength - Constants::BGZF_BLOCK_HEADER_LENGTH; + numBytesRead = m_device->Read(&Resources.CompressedBlock[Constants::BGZF_BLOCK_HEADER_LENGTH], remaining); + if ( numBytesRead != remaining ) + throw BamException("BgzfStream::ReadBlock", "could not read data from block"); // decompress block data - count = InflateBlock(blockLength); - if ( count < 0 ) { - fprintf(stderr, "BgzfStream ERROR: read block failed - could not decompress block data\n"); - return false; - } + numBytesRead = InflateBlock(blockLength); // update block data - if ( BlockLength != 0 ) - BlockOffset = 0; - BlockAddress = blockAddress; - BlockLength = count; - - // return success - return true; + if ( m_blockLength != 0 ) + m_blockOffset = 0; + m_blockAddress = blockAddress; + m_blockLength = numBytesRead; } // seek to position in BGZF file -bool BgzfStream::Seek(const int64_t& position) { +void BgzfStream::Seek(const int64_t& position) { + + BT_ASSERT_X( m_device, "BgzfStream::Seek() - trying to seek on null IO device"); - // skip if not open - if ( !IsOpen ) return false; + // skip if device is not open + if ( !IsOpen() ) return; // determine adjusted offset & address int blockOffset = (position & 0xFFFF); int64_t blockAddress = (position >> 16) & 0xFFFFFFFFFFFFLL; // attempt seek in file - if ( fseek64(Stream, blockAddress, SEEK_SET) != 0 ) { - fprintf(stderr, "BgzfStream ERROR: unable to seek in file\n"); - return false; - } + if ( m_device->IsRandomAccess() && m_device->Seek(blockAddress) ) { - // update block data & return success - BlockLength = 0; - BlockAddress = blockAddress; - BlockOffset = blockOffset; - return true; + // update block data & return success + m_blockLength = 0; + m_blockAddress = blockAddress; + m_blockOffset = blockOffset; + } + else { + stringstream s(""); + s << "unable to seek to position: " << position; + throw BamException("BgzfStream::Seek", s.str()); + } } void BgzfStream::SetWriteCompressed(bool ok) { - IsWriteCompressed = ok; + m_isWriteCompressed = ok; } // get file position in BGZF file int64_t BgzfStream::Tell(void) const { - if ( !IsOpen ) + if ( !IsOpen() ) return 0; - return ( (BlockAddress << 16) | (BlockOffset & 0xFFFF) ); + return ( (m_blockAddress << 16) | (m_blockOffset & 0xFFFF) ); } // writes the supplied data into the BGZF buffer -unsigned int BgzfStream::Write(const char* data, const unsigned int dataLen) { +size_t BgzfStream::Write(const char* data, const size_t dataLength) { + + BT_ASSERT_X( m_device, "BgzfStream::Write() - trying to write to null IO device"); + BT_ASSERT_X( (m_device->Mode() == IBamIODevice::WriteOnly), + "BgzfStream::Write() - trying to write to non-writable IO device"); // skip if file not open for writing - if ( !IsOpen || !IsWriteOnly ) return false; + if ( !IsOpen() ) + return 0; // write blocks as needed til all data is written - unsigned int numBytesWritten = 0; + size_t numBytesWritten = 0; const char* input = data; - unsigned int blockLength = UncompressedBlockSize; - while ( numBytesWritten < dataLen ) { + const size_t blockLength = Constants::BGZF_DEFAULT_BLOCK_SIZE; + while ( numBytesWritten < dataLength ) { // copy data contents to uncompressed output buffer - unsigned int copyLength = min(blockLength - BlockOffset, dataLen - numBytesWritten); - char* buffer = UncompressedBlock; - memcpy(buffer + BlockOffset, input, copyLength); + unsigned int copyLength = min(blockLength - m_blockOffset, dataLength - numBytesWritten); + char* buffer = Resources.UncompressedBlock; + memcpy(buffer + m_blockOffset, input, copyLength); // update counter - BlockOffset += copyLength; + m_blockOffset += copyLength; input += copyLength; numBytesWritten += copyLength; // flush (& compress) output buffer when full - if ( BlockOffset == blockLength ) FlushBlock(); + if ( m_blockOffset == blockLength ) + FlushBlock(); } - // return result + // return actual number of bytes written return numBytesWritten; }