X-Git-Url: https://git.donarmstrong.com/?a=blobdiff_plain;f=src%2Fapi%2Finternal%2FBgzfStream_p.cpp;h=5891067a8ab17f154d2a37ad73e42ba8c9609eec;hb=a847098d20d1bb39a45123c6b886479c6a97001a;hp=69592d655e60811f87416315e8c86d4889c28a66;hpb=2e049ed7f28881bce09653e60f5aea54bfd7afbf;p=bamtools.git diff --git a/src/api/internal/BgzfStream_p.cpp b/src/api/internal/BgzfStream_p.cpp index 69592d6..5891067 100644 --- a/src/api/internal/BgzfStream_p.cpp +++ b/src/api/internal/BgzfStream_p.cpp @@ -2,20 +2,26 @@ // BgzfStream_p.cpp (c) 2011 Derek Barnett // Marth Lab, Department of Biology, Boston College // --------------------------------------------------------------------------- -// Last modified: 6 October 2011(DB) +// Last modified: 11 October 2011(DB) // --------------------------------------------------------------------------- // Based on BGZF routines developed at the Broad Institute. // Provides the basic functionality for reading & writing BGZF files // Replaces the old BGZF.* files to avoid clashing with other toolkits // *************************************************************************** -#include -#include +#include "api/BamAux.h" +#include "api/BamConstants.h" +#include "api/internal/BamDeviceFactory_p.h" +#include "api/internal/BamException_p.h" +#include "api/internal/BgzfStream_p.h" using namespace BamTools; using namespace BamTools::Internal; +#include "zlib.h" + #include #include +#include #include using namespace std; @@ -23,9 +29,7 @@ using namespace std; // RaiiWrapper implementation // ---------------------------- -BgzfStream::RaiiWrapper::RaiiWrapper(void) - : Stream(0) -{ +BgzfStream::RaiiWrapper::RaiiWrapper(void) { CompressedBlock = new char[Constants::BGZF_MAX_BLOCK_SIZE]; UncompressedBlock = new char[Constants::BGZF_DEFAULT_BLOCK_SIZE]; } @@ -37,12 +41,6 @@ BgzfStream::RaiiWrapper::~RaiiWrapper(void) { delete[] UncompressedBlock; CompressedBlock = 0; UncompressedBlock = 0; - - if ( Stream ) { - fflush(Stream); - fclose(Stream); - Stream = 0; - } } // --------------------------- @@ -51,12 +49,11 @@ BgzfStream::RaiiWrapper::~RaiiWrapper(void) { // constructor BgzfStream::BgzfStream(void) - : BlockLength(0) - , BlockOffset(0) - , BlockAddress(0) - , IsOpen(false) - , IsWriteOnly(false) - , IsWriteCompressed(true) + : m_blockLength(0) + , m_blockOffset(0) + , m_blockAddress(0) + , m_isWriteCompressed(true) + , m_device(0) { } // destructor @@ -79,30 +76,27 @@ bool BgzfStream::CheckBlockHeader(char* header) { // closes BGZF file void BgzfStream::Close(void) { - // skip if file not open - if ( !IsOpen ) - return; + // skip if no device open + if ( m_device == 0 ) return; // if writing to file, flush the current BGZF block, // then write an empty block (as EOF marker) - if ( IsWriteOnly ) { + if ( m_device->IsOpen() && (m_device->Mode() == IBamIODevice::WriteOnly) ) { FlushBlock(); const size_t blockLength = DeflateBlock(); - fwrite(Resources.CompressedBlock, 1, blockLength, Resources.Stream); + m_device->Write(Resources.CompressedBlock, blockLength); } - // flush and close stream - fflush(Resources.Stream); - fclose(Resources.Stream); - Resources.Stream = 0; - - // reset initial state - BlockLength = 0; - BlockOffset = 0; - BlockAddress = 0; - IsOpen = false; - IsWriteOnly = false; - IsWriteCompressed = true; + // close device + m_device->Close(); + delete m_device; + m_device = 0; + + // reset state + m_blockLength = 0; + m_blockOffset = 0; + m_blockAddress = 0; + m_isWriteCompressed = true; } // compresses the current block @@ -122,10 +116,10 @@ size_t BgzfStream::DeflateBlock(void) { buffer[14] = Constants::BGZF_LEN; // set compression level - const int compressionLevel = ( IsWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 ); + const int compressionLevel = ( m_isWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 ); // loop to retry for blocks that do not compress enough - int inputLength = BlockOffset; + int inputLength = m_blockOffset; size_t compressedLength = 0; const unsigned int bufferSize = Constants::BGZF_MAX_BLOCK_SIZE; @@ -160,16 +154,16 @@ size_t BgzfStream::DeflateBlock(void) { deflateEnd(&zs); - // if error status - if ( status != Z_OK ) - throw BamException("BgzfStream::DeflateBlock", "zlib deflate failed"); - - // not enough space available in buffer + // there was not enough space available in buffer // try to reduce the input length & re-start loop - inputLength -= 1024; - if ( inputLength <= 0 ) - throw BamException("BgzfStream::DeflateBlock", "input reduction failed"); - continue; + if ( status == Z_OK ) { + inputLength -= 1024; + if ( inputLength < 0 ) + throw BamException("BgzfStream::DeflateBlock", "input reduction failed"); + continue; + } + + throw BamException("BgzfStream::DeflateBlock", "zlib deflate failed"); } // finalize the compression routine @@ -198,29 +192,33 @@ size_t BgzfStream::DeflateBlock(void) { BamTools::PackUnsignedInt(&buffer[compressedLength - 4], inputLength); // ensure that we have less than a block of data left - int remaining = BlockOffset - inputLength; + int remaining = m_blockOffset - inputLength; if ( remaining > 0 ) { if ( remaining > inputLength ) throw BamException("BgzfStream::DeflateBlock", "after deflate, remainder too large"); memcpy(Resources.UncompressedBlock, Resources.UncompressedBlock + inputLength, remaining); } - // update block data & return compressedlength - BlockOffset = remaining; + // update block data + m_blockOffset = remaining; + + // return result return compressedLength; } // flushes the data in the BGZF block void BgzfStream::FlushBlock(void) { + BT_ASSERT_X( m_device, "BgzfStream::FlushBlock() - attempting to flush to null device" ); + // flush all of the remaining blocks - while ( BlockOffset > 0 ) { + while ( m_blockOffset > 0 ) { // compress the data block const size_t blockLength = DeflateBlock(); - // flush the data to our output stream - const size_t numBytesWritten = fwrite(Resources.CompressedBlock, 1, blockLength, Resources.Stream); + // flush the data to our output device + const size_t numBytesWritten = m_device->Write(Resources.CompressedBlock, blockLength); if ( numBytesWritten != blockLength ) { stringstream s(""); s << "expected to write " << blockLength @@ -229,7 +227,7 @@ void BgzfStream::FlushBlock(void) { } // update block data - BlockAddress += blockLength; + m_blockAddress += blockLength; } } @@ -268,82 +266,73 @@ size_t BgzfStream::InflateBlock(const size_t& blockLength) { return zs.total_out; } -// opens the BGZF file for reading (mode is either "rb" for reading, or "wb" for writing) -void BgzfStream::Open(const string& filename, const char* mode) { - - // make sure we're starting with fresh state - if ( IsOpen ) - Close(); - - // determine open mode - if ( strcmp(mode, "rb") == 0 ) - IsWriteOnly = false; - else if ( strcmp(mode, "wb") == 0) - IsWriteOnly = true; - else { - const string message = string("unknown file mode: ") + mode; - throw BamException("BgzfStream::Open", message); - } +bool BgzfStream::IsOpen(void) const { + if ( m_device == 0 ) + return false; + return m_device->IsOpen(); +} - // open BGZF stream on a file - if ( (filename != "stdin") && (filename != "stdout") && (filename != "-")) - Resources.Stream = fopen(filename.c_str(), mode); +void BgzfStream::Open(const string& filename, const IBamIODevice::OpenMode mode) { - // open BGZF stream on stdin - else if ( (filename == "stdin" || filename == "-") && (strcmp(mode, "rb") == 0 ) ) - Resources.Stream = freopen(NULL, mode, stdin); + // close current device if necessary + Close(); + BT_ASSERT_X( (m_device == 0), "BgzfStream::Open() - unable to properly close previous IO device" ); - // open BGZF stream on stdout - else if ( (filename == "stdout" || filename == "-") && (strcmp(mode, "wb") == 0) ) - Resources.Stream = freopen(NULL, mode, stdout); + // retrieve new IO device depending on filename + m_device = BamDeviceFactory::CreateDevice(filename); + BT_ASSERT_X( m_device, "BgzfStream::Open() - unable to create IO device from filename" ); - // ensure valid Stream - if ( !Resources.Stream ) { - const string message = string("unable to open file: ") + filename; + // if device fails to open + if ( !m_device->Open(mode) ) { + const string deviceError = m_device->GetErrorString(); + const string message = string("could not open BGZF stream: \n\t") + deviceError; throw BamException("BgzfStream::Open", message); } - - // set flag - IsOpen = true; } // reads BGZF data into a byte buffer size_t BgzfStream::Read(char* data, const size_t dataLength) { - // if stream not open for reading (or empty request) - if ( !IsOpen || IsWriteOnly || dataLength == 0 ) + if ( dataLength == 0 ) + return 0; + + // if stream not open for reading + BT_ASSERT_X( m_device, "BgzfStream::Read() - trying to read from null device"); + if ( !m_device->IsOpen() || (m_device->Mode() != IBamIODevice::ReadOnly) ) return 0; // read blocks as needed until desired data length is retrieved + char* output = data; size_t numBytesRead = 0; while ( numBytesRead < dataLength ) { // determine bytes available in current block - int bytesAvailable = BlockLength - BlockOffset; + int bytesAvailable = m_blockLength - m_blockOffset; // read (and decompress) next block if needed if ( bytesAvailable <= 0 ) { ReadBlock(); - bytesAvailable = BlockLength - BlockOffset; + bytesAvailable = m_blockLength - m_blockOffset; if ( bytesAvailable <= 0 ) break; } // copy data from uncompressed source buffer into data destination buffer const size_t copyLength = min( (dataLength-numBytesRead), (size_t)bytesAvailable ); - memcpy(data, Resources.UncompressedBlock + BlockOffset, copyLength); + memcpy(output, Resources.UncompressedBlock + m_blockOffset, copyLength); // update counters - BlockOffset += copyLength; - data += copyLength; - numBytesRead += copyLength; + m_blockOffset += copyLength; + output += copyLength; + numBytesRead += copyLength; } // update block data - if ( BlockOffset == BlockLength ) { - BlockAddress = ftell64(Resources.Stream); - BlockOffset = 0; - BlockLength = 0; + if ( m_blockOffset == m_blockLength ) { + m_blockAddress = m_device->Tell(); + m_blockOffset = 0; + m_blockLength = 0; + } // return actual number of bytes read @@ -353,21 +342,23 @@ size_t BgzfStream::Read(char* data, const size_t dataLength) { // reads a BGZF block void BgzfStream::ReadBlock(void) { - // store block start - int64_t blockAddress = ftell64(Resources.Stream); + BT_ASSERT_X( m_device, "BgzfStream::ReadBlock() - trying to read from null IO device"); + + // store block's starting address + int64_t blockAddress = m_device->Tell(); // read block header from file char header[Constants::BGZF_BLOCK_HEADER_LENGTH]; - size_t count = fread(header, 1, Constants::BGZF_BLOCK_HEADER_LENGTH, Resources.Stream); + size_t numBytesRead = m_device->Read(header, Constants::BGZF_BLOCK_HEADER_LENGTH); - // if block header empty, set marker & skip rest of method - if ( count == 0 ) { - BlockLength = 0; + // if block header empty + if ( numBytesRead == 0 ) { + m_blockLength = 0; return; } // if block header invalid size - if ( count != sizeof(header) ) + if ( numBytesRead != Constants::BGZF_BLOCK_HEADER_LENGTH ) throw BamException("BgzfStream::ReadBlock", "invalid block header size"); // validate block header contents @@ -380,57 +371,68 @@ void BgzfStream::ReadBlock(void) { // read remainder of block const size_t remaining = blockLength - Constants::BGZF_BLOCK_HEADER_LENGTH; - count = fread(&Resources.CompressedBlock[Constants::BGZF_BLOCK_HEADER_LENGTH], 1, remaining, Resources.Stream); - if ( count != remaining ) + numBytesRead = m_device->Read(&Resources.CompressedBlock[Constants::BGZF_BLOCK_HEADER_LENGTH], remaining); + if ( numBytesRead != remaining ) throw BamException("BgzfStream::ReadBlock", "could not read data from block"); // decompress block data - count = InflateBlock(blockLength); + numBytesRead = InflateBlock(blockLength); - // update block metadata - if ( BlockLength != 0 ) - BlockOffset = 0; - BlockAddress = blockAddress; - BlockLength = count; + // update block data + if ( m_blockLength != 0 ) + m_blockOffset = 0; + m_blockAddress = blockAddress; + m_blockLength = numBytesRead; } // seek to position in BGZF file void BgzfStream::Seek(const int64_t& position) { + BT_ASSERT_X( m_device, "BgzfStream::Seek() - trying to seek on null IO device"); + + // skip if device is not open + if ( !IsOpen() ) return; + // determine adjusted offset & address int blockOffset = (position & 0xFFFF); int64_t blockAddress = (position >> 16) & 0xFFFFFFFFFFFFLL; // attempt seek in file - if ( fseek64(Resources.Stream, blockAddress, SEEK_SET) != 0 ) { + if ( m_device->IsRandomAccess() && m_device->Seek(blockAddress) ) { + + // update block data & return success + m_blockLength = 0; + m_blockAddress = blockAddress; + m_blockOffset = blockOffset; + } + else { stringstream s(""); s << "unable to seek to position: " << position; throw BamException("BgzfStream::Seek", s.str()); } - - // if successful, update block metadata - BlockLength = 0; - BlockAddress = blockAddress; - BlockOffset = blockOffset; } void BgzfStream::SetWriteCompressed(bool ok) { - IsWriteCompressed = ok; + m_isWriteCompressed = ok; } // get file position in BGZF file int64_t BgzfStream::Tell(void) const { - if ( !IsOpen ) + if ( !IsOpen() ) return 0; - return ( (BlockAddress << 16) | (BlockOffset & 0xFFFF) ); + return ( (m_blockAddress << 16) | (m_blockOffset & 0xFFFF) ); } // writes the supplied data into the BGZF buffer size_t BgzfStream::Write(const char* data, const size_t dataLength) { + BT_ASSERT_X( m_device, "BgzfStream::Write() - trying to write to null IO device"); + BT_ASSERT_X( (m_device->Mode() == IBamIODevice::WriteOnly), + "BgzfStream::Write() - trying to write to non-writable IO device"); + // skip if file not open for writing - if ( !IsOpen || !IsWriteOnly ) - return false; + if ( !IsOpen() ) + return 0; // write blocks as needed til all data is written size_t numBytesWritten = 0; @@ -439,17 +441,17 @@ size_t BgzfStream::Write(const char* data, const size_t dataLength) { while ( numBytesWritten < dataLength ) { // copy data contents to uncompressed output buffer - const size_t copyLength = min(blockLength - BlockOffset, dataLength - numBytesWritten); + unsigned int copyLength = min(blockLength - m_blockOffset, dataLength - numBytesWritten); char* buffer = Resources.UncompressedBlock; - memcpy(buffer + BlockOffset, input, copyLength); + memcpy(buffer + m_blockOffset, input, copyLength); - // update counters - BlockOffset += copyLength; + // update counter + m_blockOffset += copyLength; input += copyLength; numBytesWritten += copyLength; // flush (& compress) output buffer when full - if ( BlockOffset == blockLength ) + if ( m_blockOffset == blockLength ) FlushBlock(); }