]> git.donarmstrong.com Git - bamtools.git/blobdiff - src/api/internal/BgzfStream_p.cpp
Bug fix in BgzfStream I/O
[bamtools.git] / src / api / internal / BgzfStream_p.cpp
index 69592d655e60811f87416315e8c86d4889c28a66..5891067a8ab17f154d2a37ad73e42ba8c9609eec 100644 (file)
@@ -2,20 +2,26 @@
 // BgzfStream_p.cpp (c) 2011 Derek Barnett
 // Marth Lab, Department of Biology, Boston College
 // ---------------------------------------------------------------------------
-// Last modified: 6 October 2011(DB)
+// Last modified: 11 October 2011(DB)
 // ---------------------------------------------------------------------------
 // Based on BGZF routines developed at the Broad Institute.
 // Provides the basic functionality for reading & writing BGZF files
 // Replaces the old BGZF.* files to avoid clashing with other toolkits
 // ***************************************************************************
 
-#include <api/internal/BamException_p.h>
-#include <api/internal/BgzfStream_p.h>
+#include "api/BamAux.h"
+#include "api/BamConstants.h"
+#include "api/internal/BamDeviceFactory_p.h"
+#include "api/internal/BamException_p.h"
+#include "api/internal/BgzfStream_p.h"
 using namespace BamTools;
 using namespace BamTools::Internal;
 
+#include "zlib.h"
+
 #include <cstring>
 #include <algorithm>
+#include <iostream>
 #include <sstream>
 using namespace std;
 
@@ -23,9 +29,7 @@ using namespace std;
 // RaiiWrapper implementation
 // ----------------------------
 
-BgzfStream::RaiiWrapper::RaiiWrapper(void)
-    : Stream(0)
-{
+BgzfStream::RaiiWrapper::RaiiWrapper(void) {
     CompressedBlock   = new char[Constants::BGZF_MAX_BLOCK_SIZE];
     UncompressedBlock = new char[Constants::BGZF_DEFAULT_BLOCK_SIZE];
 }
@@ -37,12 +41,6 @@ BgzfStream::RaiiWrapper::~RaiiWrapper(void) {
     delete[] UncompressedBlock;
     CompressedBlock = 0;
     UncompressedBlock = 0;
-
-    if ( Stream ) {
-        fflush(Stream);
-        fclose(Stream);
-        Stream = 0;
-    }
 }
 
 // ---------------------------
@@ -51,12 +49,11 @@ BgzfStream::RaiiWrapper::~RaiiWrapper(void) {
 
 // constructor
 BgzfStream::BgzfStream(void)
-    : BlockLength(0)
-    , BlockOffset(0)
-    , BlockAddress(0)
-    , IsOpen(false)
-    , IsWriteOnly(false)
-    , IsWriteCompressed(true)
+  : m_blockLength(0)
+  , m_blockOffset(0)
+  , m_blockAddress(0)
+  , m_isWriteCompressed(true)
+  , m_device(0)
 { }
 
 // destructor
@@ -79,30 +76,27 @@ bool BgzfStream::CheckBlockHeader(char* header) {
 // closes BGZF file
 void BgzfStream::Close(void) {
 
-    // skip if file not open
-    if ( !IsOpen )
-        return;
+    // skip if no device open
+    if ( m_device == 0 ) return;
 
     // if writing to file, flush the current BGZF block,
     // then write an empty block (as EOF marker)
-    if ( IsWriteOnly ) {
+    if ( m_device->IsOpen() && (m_device->Mode() == IBamIODevice::WriteOnly) ) {
         FlushBlock();
         const size_t blockLength = DeflateBlock();
-        fwrite(Resources.CompressedBlock, 1, blockLength, Resources.Stream);
+        m_device->Write(Resources.CompressedBlock, blockLength);
     }
 
-    // flush and close stream
-    fflush(Resources.Stream);
-    fclose(Resources.Stream);
-    Resources.Stream = 0;
-
-    // reset initial state
-    BlockLength = 0;
-    BlockOffset = 0;
-    BlockAddress = 0;
-    IsOpen = false;
-    IsWriteOnly = false;
-    IsWriteCompressed = true;
+    // close device
+    m_device->Close();
+    delete m_device;
+    m_device = 0;
+
+    // reset state
+    m_blockLength = 0;
+    m_blockOffset = 0;
+    m_blockAddress = 0;
+    m_isWriteCompressed = true;
 }
 
 // compresses the current block
@@ -122,10 +116,10 @@ size_t BgzfStream::DeflateBlock(void) {
     buffer[14] = Constants::BGZF_LEN;
 
     // set compression level
-    const int compressionLevel = ( IsWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 );
+    const int compressionLevel = ( m_isWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 );
 
     // loop to retry for blocks that do not compress enough
-    int inputLength = BlockOffset;
+    int inputLength = m_blockOffset;
     size_t compressedLength = 0;
     const unsigned int bufferSize = Constants::BGZF_MAX_BLOCK_SIZE;
 
@@ -160,16 +154,16 @@ size_t BgzfStream::DeflateBlock(void) {
 
             deflateEnd(&zs);
 
-            // if error status
-            if ( status != Z_OK )
-                throw BamException("BgzfStream::DeflateBlock", "zlib deflate failed");
-
-            // not enough space available in buffer
+            // there was not enough space available in buffer
             // try to reduce the input length & re-start loop
-            inputLength -= 1024;
-            if ( inputLength <= 0 )
-                throw BamException("BgzfStream::DeflateBlock", "input reduction failed");
-            continue;
+            if ( status == Z_OK ) {
+                inputLength -= 1024;
+                if ( inputLength < 0 )
+                    throw BamException("BgzfStream::DeflateBlock", "input reduction failed");
+                continue;
+            }
+
+            throw BamException("BgzfStream::DeflateBlock", "zlib deflate failed");
         }
 
         // finalize the compression routine
@@ -198,29 +192,33 @@ size_t BgzfStream::DeflateBlock(void) {
     BamTools::PackUnsignedInt(&buffer[compressedLength - 4], inputLength);
 
     // ensure that we have less than a block of data left
-    int remaining = BlockOffset - inputLength;
+    int remaining = m_blockOffset - inputLength;
     if ( remaining > 0 ) {
         if ( remaining > inputLength )
             throw BamException("BgzfStream::DeflateBlock", "after deflate, remainder too large");
         memcpy(Resources.UncompressedBlock, Resources.UncompressedBlock + inputLength, remaining);
     }
 
-    // update block data & return compressedlength
-    BlockOffset = remaining;
+    // update block data
+    m_blockOffset = remaining;
+
+    // return result
     return compressedLength;
 }
 
 // flushes the data in the BGZF block
 void BgzfStream::FlushBlock(void) {
 
+    BT_ASSERT_X( m_device, "BgzfStream::FlushBlock() - attempting to flush to null device" );
+
     // flush all of the remaining blocks
-    while ( BlockOffset > 0 ) {
+    while ( m_blockOffset > 0 ) {
 
         // compress the data block
         const size_t blockLength = DeflateBlock();
 
-        // flush the data to our output stream
-        const size_t numBytesWritten = fwrite(Resources.CompressedBlock, 1, blockLength, Resources.Stream);
+        // flush the data to our output device
+        const size_t numBytesWritten = m_device->Write(Resources.CompressedBlock, blockLength);
         if ( numBytesWritten != blockLength ) {
             stringstream s("");
             s << "expected to write " << blockLength
@@ -229,7 +227,7 @@ void BgzfStream::FlushBlock(void) {
         }
 
         // update block data
-        BlockAddress += blockLength;
+        m_blockAddress += blockLength;
     }
 }
 
@@ -268,82 +266,73 @@ size_t BgzfStream::InflateBlock(const size_t& blockLength) {
     return zs.total_out;
 }
 
-// opens the BGZF file for reading (mode is either "rb" for reading, or "wb" for writing)
-void BgzfStream::Open(const string& filename, const char* mode) {
-
-    // make sure we're starting with fresh state
-    if ( IsOpen )
-        Close();
-
-    // determine open mode
-    if ( strcmp(mode, "rb") == 0 )
-        IsWriteOnly = false;
-    else if ( strcmp(mode, "wb") == 0)
-        IsWriteOnly = true;
-    else {
-        const string message = string("unknown file mode: ") + mode;
-        throw BamException("BgzfStream::Open", message);
-    }
+bool BgzfStream::IsOpen(void) const {
+    if ( m_device == 0 )
+        return false;
+    return m_device->IsOpen();
+}
 
-    // open BGZF stream on a file
-    if ( (filename != "stdin") && (filename != "stdout") && (filename != "-"))
-        Resources.Stream = fopen(filename.c_str(), mode);
+void BgzfStream::Open(const string& filename, const IBamIODevice::OpenMode mode) {
 
-    // open BGZF stream on stdin
-    else if ( (filename == "stdin" || filename == "-") && (strcmp(mode, "rb") == 0 ) )
-        Resources.Stream = freopen(NULL, mode, stdin);
+    // close current device if necessary
+    Close();
+    BT_ASSERT_X( (m_device == 0), "BgzfStream::Open() - unable to properly close previous IO device" );
 
-    // open BGZF stream on stdout
-    else if ( (filename == "stdout" || filename == "-") && (strcmp(mode, "wb") == 0) )
-        Resources.Stream = freopen(NULL, mode, stdout);
+    // retrieve new IO device depending on filename
+    m_device = BamDeviceFactory::CreateDevice(filename);
+    BT_ASSERT_X( m_device, "BgzfStream::Open() - unable to create IO device from filename" );
 
-    // ensure valid Stream
-    if ( !Resources.Stream ) {
-        const string message = string("unable to open file: ") + filename;
+    // if device fails to open
+    if ( !m_device->Open(mode) ) {
+        const string deviceError = m_device->GetErrorString();
+        const string message = string("could not open BGZF stream: \n\t") + deviceError;
         throw BamException("BgzfStream::Open", message);
     }
-
-    // set flag
-    IsOpen = true;
 }
 
 // reads BGZF data into a byte buffer
 size_t BgzfStream::Read(char* data, const size_t dataLength) {
 
-    // if stream not open for reading (or empty request)
-    if ( !IsOpen || IsWriteOnly || dataLength == 0 )
+    if ( dataLength == 0 )
+        return 0;
+
+    // if stream not open for reading
+    BT_ASSERT_X( m_device, "BgzfStream::Read() - trying to read from null device");
+    if ( !m_device->IsOpen() || (m_device->Mode() != IBamIODevice::ReadOnly) )
         return 0;
 
     // read blocks as needed until desired data length is retrieved
+    char* output = data;
     size_t numBytesRead = 0;
     while ( numBytesRead < dataLength ) {
 
         // determine bytes available in current block
-        int bytesAvailable = BlockLength - BlockOffset;
+        int bytesAvailable = m_blockLength - m_blockOffset;
 
         // read (and decompress) next block if needed
         if ( bytesAvailable <= 0 ) {
             ReadBlock();
-            bytesAvailable = BlockLength - BlockOffset;
+            bytesAvailable = m_blockLength - m_blockOffset;
             if ( bytesAvailable <= 0 )
                 break;
         }
 
         // copy data from uncompressed source buffer into data destination buffer
         const size_t copyLength = min( (dataLength-numBytesRead), (size_t)bytesAvailable );
-        memcpy(data, Resources.UncompressedBlock + BlockOffset, copyLength);
+        memcpy(output, Resources.UncompressedBlock + m_blockOffset, copyLength);
 
         // update counters
-        BlockOffset  += copyLength;
-        data         += copyLength;
-        numBytesRead += copyLength;
+        m_blockOffset += copyLength;
+        output        += copyLength;
+        numBytesRead  += copyLength;
     }
 
     // update block data
-    if ( BlockOffset == BlockLength ) {
-        BlockAddress = ftell64(Resources.Stream);
-        BlockOffset  = 0;
-        BlockLength  = 0;
+    if ( m_blockOffset == m_blockLength ) {
+        m_blockAddress = m_device->Tell();
+        m_blockOffset  = 0;
+        m_blockLength  = 0;
+
     }
 
     // return actual number of bytes read
@@ -353,21 +342,23 @@ size_t BgzfStream::Read(char* data, const size_t dataLength) {
 // reads a BGZF block
 void BgzfStream::ReadBlock(void) {
 
-    // store block start
-    int64_t blockAddress = ftell64(Resources.Stream);
+    BT_ASSERT_X( m_device, "BgzfStream::ReadBlock() - trying to read from null IO device");
+
+    // store block's starting address
+    int64_t blockAddress = m_device->Tell();
 
     // read block header from file
     char header[Constants::BGZF_BLOCK_HEADER_LENGTH];
-    size_t count = fread(header, 1, Constants::BGZF_BLOCK_HEADER_LENGTH, Resources.Stream);
+    size_t numBytesRead = m_device->Read(header, Constants::BGZF_BLOCK_HEADER_LENGTH);
 
-    // if block header empty, set marker & skip rest of method
-    if ( count == 0 ) {
-        BlockLength = 0;
+    // if block header empty
+    if ( numBytesRead == 0 ) {
+        m_blockLength = 0;
         return;
     }
 
     // if block header invalid size
-    if ( count != sizeof(header) )
+    if ( numBytesRead != Constants::BGZF_BLOCK_HEADER_LENGTH )
         throw BamException("BgzfStream::ReadBlock", "invalid block header size");
 
     // validate block header contents
@@ -380,57 +371,68 @@ void BgzfStream::ReadBlock(void) {
 
     // read remainder of block
     const size_t remaining = blockLength - Constants::BGZF_BLOCK_HEADER_LENGTH;
-    count = fread(&Resources.CompressedBlock[Constants::BGZF_BLOCK_HEADER_LENGTH], 1, remaining, Resources.Stream);
-    if ( count != remaining )
+    numBytesRead = m_device->Read(&Resources.CompressedBlock[Constants::BGZF_BLOCK_HEADER_LENGTH], remaining);
+    if ( numBytesRead != remaining )
         throw BamException("BgzfStream::ReadBlock", "could not read data from block");
 
     // decompress block data
-    count = InflateBlock(blockLength);
+    numBytesRead = InflateBlock(blockLength);
 
-    // update block metadata
-    if ( BlockLength != 0 )
-        BlockOffset = 0;
-    BlockAddress = blockAddress;
-    BlockLength  = count;
+    // update block data
+    if ( m_blockLength != 0 )
+        m_blockOffset = 0;
+    m_blockAddress = blockAddress;
+    m_blockLength  = numBytesRead;
 }
 
 // seek to position in BGZF file
 void BgzfStream::Seek(const int64_t& position) {
 
+    BT_ASSERT_X( m_device, "BgzfStream::Seek() - trying to seek on null IO device");
+
+    // skip if device is not open
+    if ( !IsOpen() ) return;
+
     // determine adjusted offset & address
     int     blockOffset  = (position & 0xFFFF);
     int64_t blockAddress = (position >> 16) & 0xFFFFFFFFFFFFLL;
 
     // attempt seek in file
-    if ( fseek64(Resources.Stream, blockAddress, SEEK_SET) != 0 ) {
+    if ( m_device->IsRandomAccess() && m_device->Seek(blockAddress) ) {
+
+        // update block data & return success
+        m_blockLength  = 0;
+        m_blockAddress = blockAddress;
+        m_blockOffset  = blockOffset;
+    }
+    else {
         stringstream s("");
         s << "unable to seek to position: " << position;
         throw BamException("BgzfStream::Seek", s.str());
     }
-
-    // if successful, update block metadata
-    BlockLength  = 0;
-    BlockAddress = blockAddress;
-    BlockOffset  = blockOffset;
 }
 
 void BgzfStream::SetWriteCompressed(bool ok) {
-    IsWriteCompressed = ok;
+    m_isWriteCompressed = ok;
 }
 
 // get file position in BGZF file
 int64_t BgzfStream::Tell(void) const {
-    if ( !IsOpen )
+    if ( !IsOpen() )
         return 0;
-    return ( (BlockAddress << 16) | (BlockOffset & 0xFFFF) );
+    return ( (m_blockAddress << 16) | (m_blockOffset & 0xFFFF) );
 }
 
 // writes the supplied data into the BGZF buffer
 size_t BgzfStream::Write(const char* data, const size_t dataLength) {
 
+    BT_ASSERT_X( m_device, "BgzfStream::Write() - trying to write to null IO device");
+    BT_ASSERT_X( (m_device->Mode() == IBamIODevice::WriteOnly),
+                 "BgzfStream::Write() - trying to write to non-writable IO device");
+
     // skip if file not open for writing
-    if ( !IsOpen || !IsWriteOnly )
-        return false;
+    if ( !IsOpen() )
+        return 0;
 
     // write blocks as needed til all data is written
     size_t numBytesWritten = 0;
@@ -439,17 +441,17 @@ size_t BgzfStream::Write(const char* data, const size_t dataLength) {
     while ( numBytesWritten < dataLength ) {
 
         // copy data contents to uncompressed output buffer
-        const size_t copyLength = min(blockLength - BlockOffset, dataLength - numBytesWritten);
+        unsigned int copyLength = min(blockLength - m_blockOffset, dataLength - numBytesWritten);
         char* buffer = Resources.UncompressedBlock;
-        memcpy(buffer + BlockOffset, input, copyLength);
+        memcpy(buffer + m_blockOffset, input, copyLength);
 
-        // update counters
-        BlockOffset     += copyLength;
+        // update counter
+        m_blockOffset   += copyLength;
         input           += copyLength;
         numBytesWritten += copyLength;
 
         // flush (& compress) output buffer when full
-        if ( BlockOffset == blockLength )
+        if ( m_blockOffset == blockLength )
             FlushBlock();
     }