From 153d8f44a0ae6aebd0323289d961e5c00ea2b212 Mon Sep 17 00:00:00 2001 From: Derek Barnett Date: Wed, 24 Jul 2013 16:08:13 -0400 Subject: [PATCH] Stablized HTTP access on all platforms. (issue #54, issue #11) --- src/api/internal/io/BamHttp_p.cpp | 394 ++++++++++++++++-------- src/api/internal/io/BamHttp_p.h | 8 +- src/api/internal/io/HttpHeader_p.h | 2 +- src/api/internal/io/RollingBuffer_p.cpp | 6 +- src/api/internal/io/TcpSocket_p.cpp | 6 +- src/api/internal/io/TcpSocket_p.h | 6 +- 6 files changed, 279 insertions(+), 143 deletions(-) diff --git a/src/api/internal/io/BamHttp_p.cpp b/src/api/internal/io/BamHttp_p.cpp index 377be82..b089172 100644 --- a/src/api/internal/io/BamHttp_p.cpp +++ b/src/api/internal/io/BamHttp_p.cpp @@ -2,7 +2,7 @@ // BamHttp_p.cpp (c) 2011 Derek Barnett // Marth Lab, Department of Biology, Boston College // --------------------------------------------------------------------------- -// Last modified: 8 December 2011 (DB) +// Last modified: 24 July 2013 (DB) // --------------------------------------------------------------------------- // Provides reading/writing of BAM files on HTTP server // *************************************************************************** @@ -16,6 +16,7 @@ using namespace BamTools::Internal; #include #include +#include #include #include using namespace std; @@ -34,9 +35,11 @@ static const size_t HTTP_PREFIX_LENGTH = 7; static const string DOUBLE_NEWLINE = "\n\n"; static const string GET_METHOD = "GET"; +static const string HEAD_METHOD = "HEAD"; static const string HOST_HEADER = "Host"; static const string RANGE_HEADER = "Range"; static const string BYTES_PREFIX = "bytes="; +static const string CONTENT_LENGTH_HEADER = "Content-Length"; static const char HOST_SEPARATOR = '/'; static const char PROXY_SEPARATOR = ':'; @@ -75,7 +78,8 @@ BamHttp::BamHttp(const string& url) , m_response(0) , m_isUrlParsed(false) , m_filePosition(-1) - , m_endRangeFilePosition(-1) + , m_fileEndPosition(-1) + , m_rangeEndPosition(-1) { ParseUrl(url); } @@ -88,25 +92,24 @@ BamHttp::~BamHttp(void) { delete m_socket; } -void BamHttp::Close(void) { - - // disconnect socket - m_socket->DisconnectFromHost(); - - // clean up request & response - if ( m_request ) { - delete m_request; - m_request = 0; - } +void BamHttp::ClearResponse(void) { if ( m_response ) { delete m_response; m_response = 0; } +} - // reset state - necessary?? +void BamHttp::Close(void) { + + // disconnect socket & clear related resources + DisconnectSocket(); + + // reset state m_isUrlParsed = false; - m_filePosition = -1; - m_endRangeFilePosition = -1; + m_filePosition = -1; + m_fileEndPosition = -1; + m_rangeEndPosition = -1; + m_mode = IBamIODevice::NotOpen; } bool BamHttp::ConnectSocket(void) { @@ -115,23 +118,7 @@ bool BamHttp::ConnectSocket(void) { // any state checks, etc? if ( !m_socket->ConnectToHost(m_hostname, m_port, m_mode) ) { - // TODO: set error string - return false; - } - - // attempt initial request - m_filePosition = 0; - m_endRangeFilePosition = -1; - if ( !SendRequest() ) { - // TODO: set error string - Close(); - return false; - } - - // wait for response from server - if ( !ReceiveResponse() ) { - // TODO: set error string - Close(); + SetErrorString("BamHttp::ConnectSocket", m_socket->GetErrorString()); return false; } @@ -139,10 +126,21 @@ bool BamHttp::ConnectSocket(void) { return true; } +void BamHttp::DisconnectSocket(void) { + + // disconnect socket & clean up + m_socket->DisconnectFromHost(); + ClearResponse(); + if ( m_request ) { + delete m_request; + m_request = 0; + } +} + bool BamHttp::EnsureSocketConnection(void) { if ( m_socket->IsConnected() ) return true; - else return ConnectSocket(); + return ConnectSocket(); } bool BamHttp::IsOpen(void) const { @@ -168,6 +166,20 @@ bool BamHttp::Open(const IBamIODevice::OpenMode mode) { return false; } + // initialize our file positions + m_filePosition = 0; + m_fileEndPosition = 0; + m_rangeEndPosition = 0; + + // attempt to send initial request (just 'HEAD' to check connection) + if ( !SendHeadRequest() ) { + SetErrorString("BamHttp::Open", m_socket->GetErrorString()); + return false; + } + + // clear response from HEAD request, not needed + ClearResponse(); + // return success return true; } @@ -216,62 +228,90 @@ int64_t BamHttp::Read(char* data, const unsigned int numBytes) { if ( !IsOpen() ) return -1; - // read until hit desired @numBytes - int64_t bytesReadSoFar = 0; - while ( bytesReadSoFar < numBytes ) { - - // calculate number of bytes we're going to try to read this iteration - const size_t remainingBytes = ( numBytes - bytesReadSoFar ); + int64_t numBytesReadSoFar = 0; + while ( numBytesReadSoFar < numBytes ) { - // if socket has access to entire file contents - // i.e. we received response with full data (status code == 200) - if ( m_endRangeFilePosition < 0 ) { + const size_t remaining = static_cast( numBytes - numBytesReadSoFar ); - // try to read 'remainingBytes' from socket - const int64_t socketBytesRead = ReadFromSocket(data+bytesReadSoFar, remainingBytes); - if ( socketBytesRead < 0 ) // error + // if we're not holding a valid GET reponse, get one + if ( m_response == 0 ) { + if ( !SendGetRequest(remaining) ) return -1; - else if ( socketBytesRead == 0 ) // EOF - return bytesReadSoFar; - bytesReadSoFar += socketBytesRead; - m_filePosition += socketBytesRead; } + BT_ASSERT_X(m_response, "null HTTP response"); - // socket has access to a range of data (might already be in buffer) - // i.e. we received response with partial data (status code == 206) - else { + // check response status code + const int statusCode = m_response->GetStatusCode(); + + // if we receieved full file contents in response + if ( statusCode == 200 ) { + + // try to read 'remaining' bytes from socket + const int64_t socketBytesRead = ReadFromSocket(data+numBytesReadSoFar, remaining); + + // if error + if ( socketBytesRead < 0 ) { + SetErrorString("BamHttp::Read", m_socket->GetErrorString()); + return -1; + } + + // EOF + else if ( socketBytesRead == 0 ) + return numBytesReadSoFar; - // there is data left from last request - if ( m_endRangeFilePosition > m_filePosition ) { + // update counters + numBytesReadSoFar += socketBytesRead; + m_filePosition += socketBytesRead; - // try to read either the total 'remainingBytes' or - // whatever we have remaining from last request range - const size_t rangeRemainingBytes = m_endRangeFilePosition - m_filePosition; - const size_t bytesToRead = std::min(remainingBytes, rangeRemainingBytes); - const int64_t socketBytesRead = ReadFromSocket(data+bytesReadSoFar, bytesToRead); - if ( socketBytesRead < 0 ) // error + } + + // else if we received a range of bytes in response + else if ( statusCode == 206 ) { + + // if we've exhausted the last request + if ( m_filePosition == m_rangeEndPosition ) { + if ( !SendGetRequest(remaining) ) return -1; - else if ( socketBytesRead == 0 ) // EOF - return bytesReadSoFar; - bytesReadSoFar += socketBytesRead; - m_filePosition += socketBytesRead; } - // otherwise, this is a 1st-time read or - // we already read everything from the last GET request else { - // request for next range - if ( !SendRequest(remainingBytes) || !ReceiveResponse() ) { - Close(); + // try to read 'remaining' bytes from socket + const int64_t socketBytesRead = ReadFromSocket(data+numBytesReadSoFar, remaining); + + // if error + if ( socketBytesRead < 0 ) { + SetErrorString("BamHttp::Read", m_socket->GetErrorString()); return -1; } + + // maybe EOF + else if ( socketBytesRead == 0 ) { + + // if we know we're not at end position, fire off a new request + if ( m_fileEndPosition > 0 && m_filePosition < m_fileEndPosition ) { + if ( !SendGetRequest() ) + return -1; + } else + return numBytesReadSoFar; + } + + // update counters + numBytesReadSoFar += socketBytesRead; + m_filePosition += socketBytesRead; } } + + + // else some other HTTP status + else { + SetErrorString("BamHttp::Read", "unsupported status code in response"); + return -1; + } } - // return actual number bytes successfully read - return bytesReadSoFar; + // return actual number of bytes read + return numBytesReadSoFar; } int64_t BamHttp::ReadFromSocket(char* data, const unsigned int maxNumBytes) { @@ -280,17 +320,14 @@ int64_t BamHttp::ReadFromSocket(char* data, const unsigned int maxNumBytes) { bool BamHttp::ReceiveResponse(void) { - // clear any prior response - if ( m_response ) - delete m_response; - - // make sure we're connected - if ( !EnsureSocketConnection() ) - return false; - // fetch header, up until double new line string responseHeader; do { + + // make sure we can read a line + if ( !m_socket->WaitForReadLine() ) + return false; + // read line & append to full header const string headerLine = m_socket->ReadLine(); responseHeader += headerLine; @@ -299,7 +336,7 @@ bool BamHttp::ReceiveResponse(void) { // sanity check if ( responseHeader.empty() ) { - // TODO: set error string + SetErrorString("BamHttp::ReceiveResponse", "empty HTTP response"); Close(); return false; } @@ -307,93 +344,184 @@ bool BamHttp::ReceiveResponse(void) { // create response from header text m_response = new HttpResponseHeader(responseHeader); if ( !m_response->IsValid() ) { - // TODO: set error string + SetErrorString("BamHttp::ReceiveResponse", "could not parse HTTP response"); Close(); return false; } - // if we got range response as requested - if ( m_response->GetStatusCode() == 206 ) - return true; - - // if we got the full file contents instead of range - else if ( m_response->GetStatusCode() == 200 ) { + // if we get here, success + return true; +} - // skip up to current file position - RaiiBuffer tmp(0x8000); - int64_t numBytesRead = 0; - while ( numBytesRead < m_filePosition ) { +bool BamHttp::Seek(const int64_t& position, const int origin) { - const int64_t remaining = m_filePosition - numBytesRead; - const size_t bytesToRead = static_cast( (remaining > 0x8000) ? 0x8000 : remaining ); - const int64_t socketBytesRead = ReadFromSocket(tmp.Buffer, bytesToRead); - if ( socketBytesRead < 0 ) { // error - Close(); - return false; - } - else if ( socketBytesRead == 0 ) // EOF - break; + // if HTTP device not in a valid state + if ( !IsOpen() ) { + SetErrorString("BamHttp::Seek", "cannot seek on unopen connection"); + return false; + } - numBytesRead += socketBytesRead; - } + // reset the connection + DisconnectSocket(); + if ( !ConnectSocket() ) { + SetErrorString("BamHttp::Seek", m_socket->GetErrorString()); + return false; + } - // return success - return ( numBytesRead == m_filePosition); + // udpate file position + switch ( origin ) { + case SEEK_CUR : m_filePosition += position; break; + case SEEK_SET : m_filePosition = position; break; + default : + SetErrorString("BamHttp::Seek", "unsupported seek origin"); + return false; } - // on any other reponse status - // TODO: set error string - Close(); - return false; + // return success + return true; } -bool BamHttp::Seek(const int64_t& position, const int origin) { +bool BamHttp::SendGetRequest(const size_t numBytes) { - // if HTTP device not in a valid state - if ( !IsOpen() ) { - // TODO: set error string + // clear previous data + ClearResponse(); + if ( m_request ) + delete m_request; + m_socket->ClearBuffer(); + + // make sure we're connected + if ( !EnsureSocketConnection() ) + return false; + + // create range string + const int64_t endPosition = m_filePosition + std::max(static_cast(0x10000), numBytes); + stringstream range(""); + range << BYTES_PREFIX << m_filePosition << '-' << endPosition; + + // create request + m_request = new HttpRequestHeader(GET_METHOD, m_filename); + m_request->SetField(HOST_HEADER, m_hostname); + m_request->SetField(RANGE_HEADER, range.str()); + + // send request + const string requestHeader = m_request->ToString(); + const size_t headerSize = requestHeader.size(); + if ( WriteToSocket(requestHeader.c_str(), headerSize) != headerSize ) { + SetErrorString("BamHttp::SendHeadRequest", m_socket->GetErrorString()); return false; } - // discard socket's buffer contents, update positions, & return success + // ensure clean buffer m_socket->ClearBuffer(); - if ( origin == SEEK_CUR ) - m_filePosition += position; - else if ( origin == SEEK_SET ) - m_filePosition = position; - else { - // TODO: set error string + // wait for response + if ( !ReceiveResponse() ) { + SetErrorString("BamHttp::SendGetRequest", m_socket->GetErrorString()); + Close(); return false; } - m_endRangeFilePosition = m_filePosition; - return true; + BT_ASSERT_X(m_response, "BamHttp::SendGetRequest : null HttpResponse"); + BT_ASSERT_X(m_response->IsValid(), "BamHttp::SendGetRequest : invalid HttpResponse"); + + // check response status code + const int statusCode = m_response->GetStatusCode(); + switch ( statusCode ) { + + // ranged response, as requested + case 206 : + // get content length if available + if ( m_response->ContainsKey(CONTENT_LENGTH_HEADER) ) { + const string contentLengthString = m_response->GetValue(CONTENT_LENGTH_HEADER); + m_rangeEndPosition = m_filePosition + atoi( contentLengthString.c_str() ); + } + return true; + + // full contents, not range + case 200 : + { + // skip up to current file position + RaiiBuffer tmp(0x8000); + int64_t numBytesRead = 0; + while ( numBytesRead < m_filePosition ) { + + // read data from response + const int64_t remaining = m_filePosition - numBytesRead; + const size_t bytesToRead = static_cast( (remaining > 0x8000) ? 0x8000 : remaining ); + const int64_t socketBytesRead = ReadFromSocket(tmp.Buffer, bytesToRead); + + // if error + if ( socketBytesRead < 0 ) { + SetErrorString("BamHttp::SendGetRequest", m_socket->GetErrorString()); + Close(); + return false; + } + + // else if EOF + else if ( socketBytesRead == 0 && m_socket->BufferBytesAvailable() == 0 ) + break; + + // update byte counter + numBytesRead += socketBytesRead; + } + + // return success + return ( numBytesRead == m_filePosition); + } + + // any other status codes + default: + break; + } + + // fail on unexpected status code + SetErrorString("BamHttp::SendGetRequest", "unsupported status code in response"); + Close(); + return false; } -bool BamHttp::SendRequest(const size_t numBytes) { +bool BamHttp::SendHeadRequest(void) { - // remove any currently active request + // ensure clean slate + ClearResponse(); if ( m_request ) delete m_request; - - // create range string - m_endRangeFilePosition = m_filePosition + numBytes; - stringstream range(""); - range << BYTES_PREFIX << m_filePosition << '-' << m_endRangeFilePosition; + m_socket->ClearBuffer(); // make sure we're connected if ( !EnsureSocketConnection() ) return false; // create request - m_request = new HttpRequestHeader(GET_METHOD, m_filename); - m_request->SetField(HOST_HEADER, m_hostname); - m_request->SetField(RANGE_HEADER, range.str()); + m_request = new HttpRequestHeader(HEAD_METHOD, m_filename); + m_request->SetField(HOST_HEADER, m_hostname); - // write request to socket + // send request const string requestHeader = m_request->ToString(); const size_t headerSize = requestHeader.size(); - return ( WriteToSocket(requestHeader.c_str(), headerSize) == headerSize ); + if ( WriteToSocket(requestHeader.c_str(), headerSize) != headerSize ) { + SetErrorString("BamHttp::SendHeadRequest", m_socket->GetErrorString()); + return false; + } + + m_socket->ClearBuffer(); + + // wait for response from server + if ( !ReceiveResponse() ) { + SetErrorString("BamHttp::SendHeadRequest", m_socket->GetErrorString()); + Close(); + return false; + } + BT_ASSERT_X(m_response, "BamHttp::SendHeadRequest : null HttpResponse"); + BT_ASSERT_X(m_response->IsValid(), "BamHttp::SendHeadRequest : invalid HttpResponse"); + + // get content length if available + if ( m_response->ContainsKey(CONTENT_LENGTH_HEADER) ) { + const string contentLengthString = m_response->GetValue(CONTENT_LENGTH_HEADER); + m_fileEndPosition = atoi( contentLengthString.c_str() ) - 1; + } + + // return whether we found any errors + return m_socket->GetError() == TcpSocket::NoError; } int64_t BamHttp::Tell(void) const { diff --git a/src/api/internal/io/BamHttp_p.h b/src/api/internal/io/BamHttp_p.h index 371ccce..cbbc95c 100644 --- a/src/api/internal/io/BamHttp_p.h +++ b/src/api/internal/io/BamHttp_p.h @@ -50,12 +50,15 @@ class BamHttp : public IBamIODevice { // internal methods private: + void ClearResponse(void); bool ConnectSocket(void); + void DisconnectSocket(void); bool EnsureSocketConnection(void); void ParseUrl(const std::string& url); int64_t ReadFromSocket(char* data, const unsigned int numBytes); bool ReceiveResponse(void); - bool SendRequest(const size_t numBytes = 0); + bool SendGetRequest(const size_t numBytes = 0x10000); + bool SendHeadRequest(void); int64_t WriteToSocket(const char* data, const unsigned int numBytes); // data members @@ -78,7 +81,8 @@ class BamHttp : public IBamIODevice { // file position int64_t m_filePosition; - int64_t m_endRangeFilePosition; + int64_t m_fileEndPosition; + int64_t m_rangeEndPosition; }; } // namespace Internal diff --git a/src/api/internal/io/HttpHeader_p.h b/src/api/internal/io/HttpHeader_p.h index 7a50ff9..6b838ff 100644 --- a/src/api/internal/io/HttpHeader_p.h +++ b/src/api/internal/io/HttpHeader_p.h @@ -75,7 +75,7 @@ class HttpRequestHeader : public HttpHeader { // ctor & dtor public: - HttpRequestHeader(const std::string& method, // "GET", "PUT", etc + HttpRequestHeader(const std::string& method, // "GET", "HEAD", ... const std::string& resource, // filename int majorVersion = 1, // version info int minorVersion = 1); diff --git a/src/api/internal/io/RollingBuffer_p.cpp b/src/api/internal/io/RollingBuffer_p.cpp index 10e7627..c712b57 100644 --- a/src/api/internal/io/RollingBuffer_p.cpp +++ b/src/api/internal/io/RollingBuffer_p.cpp @@ -237,7 +237,7 @@ size_t RollingBuffer::ReadLine(char* dest, size_t max) { bytesReadSoFar += bytesToRead; Free(bytesToRead); - if ( !((bytesReadSoFar < index+1)&&(bytesReadSoFar < max-1)) ) + if ( !((bytesReadSoFar < index+1) && (bytesReadSoFar < max-1)) ) finished = true; } @@ -274,7 +274,7 @@ char* RollingBuffer::Reserve(size_t n) { if ( (m_tail + n) <= m_data.at(m_tailBufferIndex).Size() ) { // fetch write pointer at current 'tail', increment tail by @n & return - char* ptr = m_data[m_tailBufferIndex].Data() + m_tail; + char* ptr = m_data[m_tailBufferIndex].Data(); //+ m_tail; m_tail += n; return ptr; } @@ -286,7 +286,7 @@ char* RollingBuffer::Reserve(size_t n) { m_data[m_tailBufferIndex].Resize(m_tail + n); // fetch write pointer at current 'tail', increment tail by @n & return - char* ptr = m_data[m_tailBufferIndex].Data() + m_tail; + char* ptr = m_data[m_tailBufferIndex].Data(); //+ m_tail; m_tail += n; return ptr; } diff --git a/src/api/internal/io/TcpSocket_p.cpp b/src/api/internal/io/TcpSocket_p.cpp index 1a5bd86..d390932 100644 --- a/src/api/internal/io/TcpSocket_p.cpp +++ b/src/api/internal/io/TcpSocket_p.cpp @@ -27,7 +27,7 @@ namespace BamTools { namespace Internal { // constants -static const size_t DEFAULT_BUFFER_SIZE = 0x4000; +static const size_t DEFAULT_BUFFER_SIZE = 0x10000; } // namespace Internal } // namespace BamTools @@ -43,7 +43,7 @@ TcpSocket::TcpSocket(void) , m_engine(0) , m_cachedSocketDescriptor(-1) , m_readBuffer(DEFAULT_BUFFER_SIZE) - , m_error(TcpSocket::UnknownSocketError) + , m_error(TcpSocket::NoError) , m_state(TcpSocket::UnconnectedState) { } @@ -79,7 +79,7 @@ bool TcpSocket::ConnectImpl(const HostInfo& hostInfo, m_hostName = hostInfo.HostName(); m_mode = mode; m_state = TcpSocket::UnconnectedState; - m_error = TcpSocket::UnknownSocketError; + m_error = TcpSocket::NoError; // m_localPort = 0; m_remotePort = 0; // m_localAddress.Clear(); diff --git a/src/api/internal/io/TcpSocket_p.h b/src/api/internal/io/TcpSocket_p.h index a25a11e..2ad2dee 100644 --- a/src/api/internal/io/TcpSocket_p.h +++ b/src/api/internal/io/TcpSocket_p.h @@ -28,13 +28,15 @@ namespace BamTools { namespace Internal { +class BamHttp; class TcpSocketEngine; class TcpSocket { // enums public: - enum SocketError { UnknownSocketError = -1 + enum SocketError { NoError = -2 + , UnknownSocketError = -1 , ConnectionRefusedError = 0 , RemoteHostClosedError , HostNotFoundError @@ -116,6 +118,8 @@ class TcpSocket { TcpSocket::SocketError m_error; TcpSocket::SocketState m_state; std::string m_errorString; + + friend class BamHttp; }; } // namespace Internal -- 2.39.2