diff options
author | Alan Conway <aconway@apache.org> | 2008-10-22 13:09:33 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-22 13:09:33 +0000 |
commit | ff9a534510c37e8ec951bc0e0cb4528626b67b6e (patch) | |
tree | 1a00b5c6bbe76db1a07dc5c680b2ffa340f54985 | |
parent | f1709b592377abde0ddcc9eab07e85a3b3c928b5 (diff) | |
download | qpid-python-ff9a534510c37e8ec951bc0e0cb4528626b67b6e.tar.gz |
QPID-1382 from Mick Goulish: Improvement to Client-Side Cluster Failover code
Also:
Fix missing DispatchHandle.h include in sys/PollableQueue.h
Added ignore properties for failover example binaries & Makefile.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@707065 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/examples/failover/direct_producer.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/examples/failover/listener.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionImpl.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverConnection.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverSession.cpp | 879 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverSession.h | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/PollableQueue.h | 1 |
8 files changed, 878 insertions, 32 deletions
diff --git a/qpid/cpp/examples/failover/direct_producer.cpp b/qpid/cpp/examples/failover/direct_producer.cpp index 513971197e..2a0104a994 100644 --- a/qpid/cpp/examples/failover/direct_producer.cpp +++ b/qpid/cpp/examples/failover/direct_producer.cpp @@ -60,6 +60,8 @@ main ( int argc, char ** argv) if ( count > 1000 ) report = !(sent % 1000); + report = false; + if ( report ) { std::cout << "sending message " @@ -71,7 +73,7 @@ main ( int argc, char ** argv) message_data << sent; message.setData(message_data.str()); - /* MICK FIXME + /* FIXME mgoulish 21 oct 08 session.messageTransfer ( arg::content=message, arg::destination="amq.direct" ); */ @@ -85,7 +87,7 @@ main ( int argc, char ** argv) } message.setData ( "That's all, folks!" ); - /* FIXME mgoulish 16 Oct 08 + /* FIXME mgoulish 21 oct 08 session.messageTransfer ( arg::content=message, arg::destination="amq.direct" ); diff --git a/qpid/cpp/examples/failover/listener.cpp b/qpid/cpp/examples/failover/listener.cpp index d8cb78c9ce..82913a521a 100644 --- a/qpid/cpp/examples/failover/listener.cpp +++ b/qpid/cpp/examples/failover/listener.cpp @@ -64,8 +64,10 @@ Listener::Listener ( FailoverSubscriptionManager & s ) : void Listener::received ( Message & message ) { + /* if(! (count%1000)) std::cerr << "\t\tListener received: " << message.getData() << std::endl; + * */ ++ count; diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index 92cf756580..b95f03164e 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -151,8 +151,6 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) { static const std::string CONN_CLOSED("Connection closed by broker"); void ConnectionImpl::shutdown() { - Mutex::ScopedLock l(lock); - if ( failureCallback ) failureCallback(); diff --git a/qpid/cpp/src/qpid/client/FailoverConnection.cpp b/qpid/cpp/src/qpid/client/FailoverConnection.cpp index 33b06a6a1a..8b37ba5cfa 100644 --- a/qpid/cpp/src/qpid/client/FailoverConnection.cpp +++ b/qpid/cpp/src/qpid/client/FailoverConnection.cpp @@ -129,7 +129,7 @@ FailoverConnection::failover ( ) ++ sessions_iterator ) { FailoverSession * fs = * sessions_iterator; - fs->failover_in_progress = true; + fs->failoverStarting(); } std::vector<Url> knownBrokers = connection.getKnownBrokers(); @@ -187,7 +187,7 @@ FailoverConnection::failover ( ) ) { FailoverSession * fs = * sessions_iterator; - fs->failover_in_progress = false; + fs->failoverComplete(); } } diff --git a/qpid/cpp/src/qpid/client/FailoverSession.cpp b/qpid/cpp/src/qpid/client/FailoverSession.cpp index 25867c2a24..d11d5aa362 100644 --- a/qpid/cpp/src/qpid/client/FailoverSession.cpp +++ b/qpid/cpp/src/qpid/client/FailoverSession.cpp @@ -38,7 +38,8 @@ namespace qpid { namespace client { FailoverSession::FailoverSession ( ) : - failover_in_progress(false) + failover_in_progress(false), + failover_count(0) { // The session is created by FailoverConnection::newSession failoverSubscriptionManager = 0; @@ -50,32 +51,108 @@ FailoverSession::~FailoverSession ( ) } + framing::FrameSet::shared_ptr FailoverSession::get() { + while(1) + { + try + { return session.get(); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } SessionId -FailoverSession::getId() const +FailoverSession::getId() { + while(1) + { + try + { return session.getId(); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } void FailoverSession::close() { + while(1) + { + try + { session.close(); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } void FailoverSession::sync() { + while(1) + { + try + { + session.sync(); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } - session.sync(); } @@ -90,16 +167,53 @@ FailoverSession::timeout(uint32_t /*seconds*/ ) Execution& FailoverSession::getExecution() { - + while(1) + { + try + { return session.getExecution(); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } void FailoverSession::flush() { - + while(1) + { + try + { session.flush(); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -109,8 +223,27 @@ FailoverSession::markCompleted(const framing::SequenceNumber& id, bool notifyPeer ) { - + while(1) + { + try + { session.markCompleted ( id, cumulative, notifyPeer ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -120,8 +253,27 @@ FailoverSession::markCompleted(const framing::SequenceNumber& id, void FailoverSession::executionSync() { - + while(1) + { + try + { session.executionSync(); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -131,10 +283,29 @@ FailoverSession::executionResult ( const SequenceNumber& commandId, const string& value ) { - + while(1) + { + try + { session.executionResult ( commandId, value ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -149,7 +320,10 @@ FailoverSession::executionException ( uint16_t errorCode, const FieldTable& errorInfo ) { - + while(1) + { + try + { session.executionException ( errorCode, commandId, classCode, @@ -158,6 +332,22 @@ FailoverSession::executionException ( uint16_t errorCode, description, errorInfo ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -179,7 +369,7 @@ FailoverSession::messageTransfer ( const string& destination, acquireMode, content ); - break; + return; } catch ( ... ) { @@ -197,8 +387,27 @@ FailoverSession::messageTransfer ( const string& destination, void FailoverSession::messageAccept ( const SequenceSet& transfers ) { - + while(1) + { + try + { session.messageAccept ( transfers ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -209,11 +418,30 @@ FailoverSession::messageReject ( const SequenceSet& transfers, const string& text ) { - + while(1) + { + try + { session.messageReject ( transfers, code, text ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -223,10 +451,29 @@ FailoverSession::messageRelease ( const SequenceSet& transfers, bool setRedelivered ) { - + while(1) + { + try + { session.messageRelease ( transfers, setRedelivered ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -234,8 +481,26 @@ FailoverSession::messageRelease ( const SequenceSet& transfers, qpid::framing::MessageAcquireResult FailoverSession::messageAcquire ( const SequenceSet& transfers ) { - + while(1) + { + try + { return session.messageAcquire ( transfers ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -245,10 +510,28 @@ FailoverSession::messageResume ( const string& destination, const string& resumeId ) { - + while(1) + { + try + { return session.messageResume ( destination, resumeId ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -264,7 +547,10 @@ FailoverSession::messageSubscribe ( const string& queue, const FieldTable& arguments ) { - + while(1) + { + try + { session.messageSubscribe ( queue, destination, acceptMode, @@ -274,6 +560,22 @@ FailoverSession::messageSubscribe ( const string& queue, resumeTtl, arguments ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -281,8 +583,27 @@ FailoverSession::messageSubscribe ( const string& queue, void FailoverSession::messageCancel ( const string& destinations ) { - + while(1) + { + try + { session.messageCancel ( destinations ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } + } @@ -292,9 +613,28 @@ FailoverSession::messageSetFlowMode ( const string& destination, uint8_t flowMode ) { + while(1) + { + try + { session.messageSetFlowMode ( destination, flowMode ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -304,10 +644,29 @@ FailoverSession::messageFlow(const string& destination, uint8_t unit, uint32_t value) { + while(1) + { + try + { session.messageFlow ( destination, unit, value ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -315,7 +674,26 @@ FailoverSession::messageFlow(const string& destination, void FailoverSession::messageFlush(const string& destination) { + while(1) + { + try + { session.messageFlush ( destination ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -323,7 +701,26 @@ FailoverSession::messageFlush(const string& destination) void FailoverSession::messageStop(const string& destination) { + while(1) + { + try + { session.messageStop ( destination ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -331,7 +728,26 @@ FailoverSession::messageStop(const string& destination) void FailoverSession::txSelect() { + while(1) + { + try + { session.txSelect ( ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -339,7 +755,26 @@ FailoverSession::txSelect() void FailoverSession::txCommit() { + while(1) + { + try + { session.txCommit ( ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -347,7 +782,26 @@ FailoverSession::txCommit() void FailoverSession::txRollback() { + while(1) + { + try + { session.txRollback ( ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -355,7 +809,26 @@ FailoverSession::txRollback() void FailoverSession::dtxSelect() { + while(1) + { + try + { session.dtxSelect ( ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -365,10 +838,28 @@ FailoverSession::dtxStart(const Xid& xid, bool join, bool resume) { + while(1) + { + try + { return session.dtxStart ( xid, join, resume ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -378,10 +869,28 @@ FailoverSession::dtxEnd(const Xid& xid, bool fail, bool suspend) { + while(1) + { + try + { return session.dtxEnd ( xid, fail, suspend ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -390,9 +899,27 @@ qpid::framing::XaResult FailoverSession::dtxCommit(const Xid& xid, bool onePhase) { + while(1) + { + try + { return session.dtxCommit ( xid, onePhase ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -400,7 +927,26 @@ FailoverSession::dtxCommit(const Xid& xid, void FailoverSession::dtxForget(const Xid& xid) { + while(1) + { + try + { session.dtxForget ( xid ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -408,7 +954,25 @@ FailoverSession::dtxForget(const Xid& xid) qpid::framing::DtxGetTimeoutResult FailoverSession::dtxGetTimeout(const Xid& xid) { + while(1) + { + try + { return session.dtxGetTimeout ( xid ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -416,7 +980,25 @@ FailoverSession::dtxGetTimeout(const Xid& xid) qpid::framing::XaResult FailoverSession::dtxPrepare(const Xid& xid) { + while(1) + { + try + { return session.dtxPrepare ( xid ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -424,7 +1006,25 @@ FailoverSession::dtxPrepare(const Xid& xid) qpid::framing::DtxRecoverResult FailoverSession::dtxRecover() { + while(1) + { + try + { return session.dtxRecover ( ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -432,7 +1032,25 @@ FailoverSession::dtxRecover() qpid::framing::XaResult FailoverSession::dtxRollback(const Xid& xid) { + while(1) + { + try + { return session.dtxRollback ( xid ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -441,9 +1059,28 @@ void FailoverSession::dtxSetTimeout(const Xid& xid, uint32_t timeout) { + while(1) + { + try + { session.dtxSetTimeout ( xid, timeout ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -457,6 +1094,10 @@ FailoverSession::exchangeDeclare(const string& exchange, bool autoDelete, const FieldTable& arguments) { + while(1) + { + try + { session.exchangeDeclare ( exchange, type, alternateExchange, @@ -465,6 +1106,21 @@ FailoverSession::exchangeDeclare(const string& exchange, autoDelete, arguments ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -473,9 +1129,28 @@ void FailoverSession::exchangeDelete(const string& exchange, bool ifUnused) { + while(1) + { + try + { session.exchangeDelete ( exchange, ifUnused ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -483,7 +1158,25 @@ FailoverSession::exchangeDelete(const string& exchange, qpid::framing::ExchangeQueryResult FailoverSession::exchangeQuery(const string& name) { + while(1) + { + try + { return session.exchangeQuery ( name ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -494,11 +1187,30 @@ FailoverSession::exchangeBind(const string& queue, const string& bindingKey, const FieldTable& arguments) { + while(1) + { + try + { session.exchangeBind ( queue, exchange, bindingKey, arguments ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -508,10 +1220,29 @@ FailoverSession::exchangeUnbind(const string& queue, const string& exchange, const string& bindingKey) { + while(1) + { + try + { session.exchangeUnbind ( queue, exchange, bindingKey ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -522,11 +1253,29 @@ FailoverSession::exchangeBound(const string& exchange, const string& bindingKey, const FieldTable& arguments) { + while(1) + { + try + { return session.exchangeBound ( exchange, queue, bindingKey, arguments ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -540,6 +1289,10 @@ FailoverSession::queueDeclare(const string& queue, bool autoDelete, const FieldTable& arguments) { + while(1) + { + try + { session.queueDeclare ( queue, alternateExchange, passive, @@ -548,6 +1301,21 @@ FailoverSession::queueDeclare(const string& queue, autoDelete, arguments ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -557,10 +1325,29 @@ FailoverSession::queueDelete(const string& queue, bool ifUnused, bool ifEmpty) { + while(1) + { + try + { session.queueDelete ( queue, ifUnused, ifEmpty ); + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -568,7 +1355,26 @@ FailoverSession::queueDelete(const string& queue, void FailoverSession::queuePurge(const string& queue) { + while(1) + { + try + { session.queuePurge ( queue) ; + return; + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -576,7 +1382,25 @@ FailoverSession::queuePurge(const string& queue) qpid::framing::QueueQueryResult FailoverSession::queueQuery(const string& queue) { - return session.queueQuery ( queue ); + while(1) + { + try + { + return session.queueQuery ( queue ); + } + catch ( const std::exception& error ) + { + if ( ! failover_in_progress ) + throw ( error ); + else + { + sys::Monitor::ScopedLock l(lock); + int current_failover_count = failover_count; + while ( current_failover_count == failover_count ) + lock.wait(); + } + } + } } @@ -587,6 +1411,7 @@ FailoverSession::queueQuery(const string& queue) void FailoverSession::prepareForFailover ( Connection newConnection ) { + failover_in_progress = true; try { newSession = newConnection.newSession(); @@ -603,6 +1428,24 @@ FailoverSession::prepareForFailover ( Connection newConnection ) } +void +FailoverSession::failoverStarting ( ) +{ + sys::Monitor::ScopedLock l(lock); + failover_in_progress = true; +} + + +void +FailoverSession::failoverComplete ( ) +{ + sys::Monitor::ScopedLock l(lock); + failover_in_progress = false; + ++ failover_count; + lock.notifyAll(); +} + + void FailoverSession::failover ( ) diff --git a/qpid/cpp/src/qpid/client/FailoverSession.h b/qpid/cpp/src/qpid/client/FailoverSession.h index b301353968..7a743da452 100644 --- a/qpid/cpp/src/qpid/client/FailoverSession.h +++ b/qpid/cpp/src/qpid/client/FailoverSession.h @@ -35,7 +35,7 @@ #include "qpid/client/SessionImpl.h" #include "qpid/client/TypedResult.h" #include "qpid/shared_ptr.h" -#include "qpid/sys/Mutex.h" +#include "qpid/sys/Monitor.h" #include <string> @@ -61,7 +61,7 @@ class FailoverSession framing::FrameSet::shared_ptr get(); - SessionId getId() const; + SessionId getId(); void close(); @@ -80,8 +80,6 @@ class FailoverSession void sendCompletion ( ); - bool failover_in_progress; - // Wrapped functions from Session ---------------------------- @@ -293,15 +291,18 @@ class FailoverSession // end Wrapped functions from Session --------------------------- // Tells the FailoverSession to get ready for a failover. + void failoverStarting(); void prepareForFailover ( Connection newConnection ); - void failover ( ); + void failoverComplete(); void setFailoverSubscriptionManager(FailoverSubscriptionManager*); private: - typedef sys::Mutex::ScopedLock Lock; - sys::Mutex lock; + sys::Monitor lock; + bool failover_in_progress; + int failover_count; + FailoverSubscriptionManager * failoverSubscriptionManager; diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp index d12d976ef5..0331cbeb9e 100644 --- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp +++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp @@ -48,7 +48,6 @@ FailoverSubscriptionManager::prepareForFailover ( Session _newSession ) sys::Monitor::ScopedLock l(lock); newSession = _newSession; newSessionIsValid = true; - // lock.notifyAll(); } diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h index 8313196623..a594dab86d 100644 --- a/qpid/cpp/src/qpid/sys/PollableQueue.h +++ b/qpid/cpp/src/qpid/sys/PollableQueue.h @@ -24,6 +24,7 @@ #include "qpid/sys/PollableCondition.h" #include "qpid/sys/Dispatcher.h" +#include "qpid/sys/DispatchHandle.h" #include "qpid/sys/Monitor.h" #include <boost/function.hpp> #include <boost/bind.hpp> |