diff options
Diffstat (limited to 'cpp/src/qpid/client/FailoverSession.cpp')
-rw-r--r-- | cpp/src/qpid/client/FailoverSession.cpp | 879 |
1 files changed, 861 insertions, 18 deletions
diff --git a/cpp/src/qpid/client/FailoverSession.cpp b/cpp/src/qpid/client/FailoverSession.cpp index 25867c2a24..d11d5aa362 100644 --- a/cpp/src/qpid/client/FailoverSession.cpp +++ b/cpp/src/qpid/client/FailoverSession.cpp @@ -38,7 +38,8 @@ namespace qpid { namespace client { FailoverSession::FailoverSession ( ) : - failover_in_progress(false) + failover_in_progress(false), + failover_count(0) { // The session is created by FailoverConnection::newSession failoverSubscriptionManager = 0; @@ -50,32 +51,108 @@ FailoverSession::~FailoverSession ( ) } + framing::FrameSet::shared_ptr FailoverSession::get() { + while(1) + { + try + { return session.get(); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } SessionId -FailoverSession::getId() const +FailoverSession::getId() { + while(1) + { + try + { return session.getId(); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } void FailoverSession::close() { + while(1) + { + try + { session.close(); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } void FailoverSession::sync() { + while(1) + { + try + { + session.sync(); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } - session.sync(); } @@ -90,16 +167,53 @@ FailoverSession::timeout(uint32_t /*seconds*/ ) Execution& FailoverSession::getExecution() { - + while(1) + { + try + { return session.getExecution(); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } void FailoverSession::flush() { - + while(1) + { + try + { session.flush(); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -109,8 +223,27 @@ FailoverSession::markCompleted(const framing::SequenceNumber& id, bool notifyPeer ) { - + while(1) + { + try + { session.markCompleted ( id, cumulative, notifyPeer ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -120,8 +253,27 @@ FailoverSession::markCompleted(const framing::SequenceNumber& id, void FailoverSession::executionSync() { - + while(1) + { + try + { session.executionSync(); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -131,10 +283,29 @@ FailoverSession::executionResult ( const SequenceNumber& commandId, const string& value ) { - + while(1) + { + try + { session.executionResult ( commandId, value ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -149,7 +320,10 @@ FailoverSession::executionException ( uint16_t errorCode, const FieldTable& errorInfo ) { - + while(1) + { + try + { session.executionException ( errorCode, commandId, classCode, @@ -158,6 +332,22 @@ FailoverSession::executionException ( uint16_t errorCode, description, errorInfo ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -179,7 +369,7 @@ FailoverSession::messageTransfer ( const string& destination, acquireMode, content ); - break; + return; } catch ( ... ) { @@ -197,8 +387,27 @@ FailoverSession::messageTransfer ( const string& destination, void FailoverSession::messageAccept ( const SequenceSet& transfers ) { - + while(1) + { + try + { session.messageAccept ( transfers ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -209,11 +418,30 @@ FailoverSession::messageReject ( const SequenceSet& transfers, const string& text ) { - + while(1) + { + try + { session.messageReject ( transfers, code, text ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -223,10 +451,29 @@ FailoverSession::messageRelease ( const SequenceSet& transfers, bool setRedelivered ) { - + while(1) + { + try + { session.messageRelease ( transfers, setRedelivered ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -234,8 +481,26 @@ FailoverSession::messageRelease ( const SequenceSet& transfers, qpid::framing::MessageAcquireResult FailoverSession::messageAcquire ( const SequenceSet& transfers ) { - + while(1) + { + try + { return session.messageAcquire ( transfers ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -245,10 +510,28 @@ FailoverSession::messageResume ( const string& destination, const string& resumeId ) { - + while(1) + { + try + { return session.messageResume ( destination, resumeId ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -264,7 +547,10 @@ FailoverSession::messageSubscribe ( const string& queue, const FieldTable& arguments ) { - + while(1) + { + try + { session.messageSubscribe ( queue, destination, acceptMode, @@ -274,6 +560,22 @@ FailoverSession::messageSubscribe ( const string& queue, resumeTtl, arguments ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -281,8 +583,27 @@ FailoverSession::messageSubscribe ( const string& queue, void FailoverSession::messageCancel ( const string& destinations ) { - + while(1) + { + try + { session.messageCancel ( destinations ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -292,9 +613,28 @@ FailoverSession::messageSetFlowMode ( const string& destination, uint8_t flowMode ) { + while(1) + { + try + { session.messageSetFlowMode ( destination, flowMode ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -304,10 +644,29 @@ FailoverSession::messageFlow(const string& destination, uint8_t unit, uint32_t value) { + while(1) + { + try + { session.messageFlow ( destination, unit, value ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -315,7 +674,26 @@ FailoverSession::messageFlow(const string& destination, void FailoverSession::messageFlush(const string& destination) { + while(1) + { + try + { session.messageFlush ( destination ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -323,7 +701,26 @@ FailoverSession::messageFlush(const string& destination) void FailoverSession::messageStop(const string& destination) { + while(1) + { + try + { session.messageStop ( destination ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -331,7 +728,26 @@ FailoverSession::messageStop(const string& destination) void FailoverSession::txSelect() { + while(1) + { + try + { session.txSelect ( ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -339,7 +755,26 @@ FailoverSession::txSelect() void FailoverSession::txCommit() { + while(1) + { + try + { session.txCommit ( ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -347,7 +782,26 @@ FailoverSession::txCommit() void FailoverSession::txRollback() { + while(1) + { + try + { session.txRollback ( ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -355,7 +809,26 @@ FailoverSession::txRollback() void FailoverSession::dtxSelect() { + while(1) + { + try + { session.dtxSelect ( ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -365,10 +838,28 @@ FailoverSession::dtxStart(const Xid& xid, bool join, bool resume) { + while(1) + { + try + { return session.dtxStart ( xid, join, resume ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -378,10 +869,28 @@ FailoverSession::dtxEnd(const Xid& xid, bool fail, bool suspend) { + while(1) + { + try + { return session.dtxEnd ( xid, fail, suspend ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -390,9 +899,27 @@ qpid::framing::XaResult FailoverSession::dtxCommit(const Xid& xid, bool onePhase) { + while(1) + { + try + { return session.dtxCommit ( xid, onePhase ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -400,7 +927,26 @@ FailoverSession::dtxCommit(const Xid& xid, void FailoverSession::dtxForget(const Xid& xid) { + while(1) + { + try + { session.dtxForget ( xid ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -408,7 +954,25 @@ FailoverSession::dtxForget(const Xid& xid) qpid::framing::DtxGetTimeoutResult FailoverSession::dtxGetTimeout(const Xid& xid) { + while(1) + { + try + { return session.dtxGetTimeout ( xid ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -416,7 +980,25 @@ FailoverSession::dtxGetTimeout(const Xid& xid) qpid::framing::XaResult FailoverSession::dtxPrepare(const Xid& xid) { + while(1) + { + try + { return session.dtxPrepare ( xid ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -424,7 +1006,25 @@ FailoverSession::dtxPrepare(const Xid& xid) qpid::framing::DtxRecoverResult FailoverSession::dtxRecover() { + while(1) + { + try + { return session.dtxRecover ( ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -432,7 +1032,25 @@ FailoverSession::dtxRecover() qpid::framing::XaResult FailoverSession::dtxRollback(const Xid& xid) { + while(1) + { + try + { return session.dtxRollback ( xid ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -441,9 +1059,28 @@ void FailoverSession::dtxSetTimeout(const Xid& xid, uint32_t timeout) { + while(1) + { + try + { session.dtxSetTimeout ( xid, timeout ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -457,6 +1094,10 @@ FailoverSession::exchangeDeclare(const string& exchange, bool autoDelete, const FieldTable& arguments) { + while(1) + { + try + { session.exchangeDeclare ( exchange, type, alternateExchange, @@ -465,6 +1106,21 @@ FailoverSession::exchangeDeclare(const string& exchange, autoDelete, arguments ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -473,9 +1129,28 @@ void FailoverSession::exchangeDelete(const string& exchange, bool ifUnused) { + while(1) + { + try + { session.exchangeDelete ( exchange, ifUnused ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -483,7 +1158,25 @@ FailoverSession::exchangeDelete(const string& exchange, qpid::framing::ExchangeQueryResult FailoverSession::exchangeQuery(const string& name) { + while(1) + { + try + { return session.exchangeQuery ( name ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -494,11 +1187,30 @@ FailoverSession::exchangeBind(const string& queue, const string& bindingKey, const FieldTable& arguments) { + while(1) + { + try + { session.exchangeBind ( queue, exchange, bindingKey, arguments ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -508,10 +1220,29 @@ FailoverSession::exchangeUnbind(const string& queue, const string& exchange, const string& bindingKey) { + while(1) + { + try + { session.exchangeUnbind ( queue, exchange, bindingKey ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -522,11 +1253,29 @@ FailoverSession::exchangeBound(const string& exchange, const string& bindingKey, const FieldTable& arguments) { + while(1) + { + try + { return session.exchangeBound ( exchange, queue, bindingKey, arguments ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -540,6 +1289,10 @@ FailoverSession::queueDeclare(const string& queue, bool autoDelete, const FieldTable& arguments) { + while(1) + { + try + { session.queueDeclare ( queue, alternateExchange, passive, @@ -548,6 +1301,21 @@ FailoverSession::queueDeclare(const string& queue, autoDelete, arguments ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -557,10 +1325,29 @@ FailoverSession::queueDelete(const string& queue, bool ifUnused, bool ifEmpty) { + while(1) + { + try + { session.queueDelete ( queue, ifUnused, ifEmpty ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -568,7 +1355,26 @@ FailoverSession::queueDelete(const string& queue, void FailoverSession::queuePurge(const string& queue) { + while(1) + { + try + { session.queuePurge ( queue) ; + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -576,7 +1382,25 @@ FailoverSession::queuePurge(const string& queue) qpid::framing::QueueQueryResult FailoverSession::queueQuery(const string& queue) { - return session.queueQuery ( queue ); + while(1) + { + try + { + return session.queueQuery ( queue ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -587,6 +1411,7 @@ FailoverSession::queueQuery(const string& queue) void FailoverSession::prepareForFailover ( Connection newConnection ) { + failover_in_progress = true; try { newSession = newConnection.newSession(); @@ -603,6 +1428,24 @@ FailoverSession::prepareForFailover ( Connection newConnection ) } +void +FailoverSession::failoverStarting ( ) +{ + sys::Monitor::ScopedLock l(lock); + failover_in_progress = true; +} + + +void +FailoverSession::failoverComplete ( ) +{ + sys::Monitor::ScopedLock l(lock); + failover_in_progress = false; + ++ failover_count; + lock.notifyAll(); +} + + void FailoverSession::failover ( ) |