diff options
author | Alan Conway <aconway@apache.org> | 2008-10-14 18:21:50 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-14 18:21:50 +0000 |
commit | 11db2ff9108e93a3146aa4608c3c355363bd5ad9 (patch) | |
tree | 36bd8f27e9f10072effb61cb48ff6240a3b510c8 | |
parent | 8f810755623ef26bd76ef2ebf28f6bc387f353dd (diff) | |
download | qpid-python-11db2ff9108e93a3146aa4608c3c355363bd5ad9.tar.gz |
Bug fixes for client-side failover.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@704596 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionImpl.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverConnection.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverSession.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp | 115 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h | 45 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ClusterMap.cpp | 2 |
7 files changed, 95 insertions, 87 deletions
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index 5d4723b442..bb3517f839 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -150,6 +150,7 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) { static const std::string CONN_CLOSED("Connection closed by broker"); void ConnectionImpl::shutdown() { + Mutex::ScopedLock l(lock); if (handler.isClosed()) return; diff --git a/qpid/cpp/src/qpid/client/FailoverConnection.cpp b/qpid/cpp/src/qpid/client/FailoverConnection.cpp index 3e982747ff..e98de868de 100644 --- a/qpid/cpp/src/qpid/client/FailoverConnection.cpp +++ b/qpid/cpp/src/qpid/client/FailoverConnection.cpp @@ -158,6 +158,10 @@ FailoverConnection::failover ( ) fs->prepareForFailover ( newConnection ); } + connection = newConnection; + connection.registerFailureCallback + ( boost::bind(&FailoverConnection::failover, this)); + /* * Tell all sessions to actually failover to the new connection. */ @@ -169,10 +173,6 @@ FailoverConnection::failover ( ) FailoverSession * fs = * sessions_iterator; fs->failover ( ); } - - connection = newConnection; - connection.registerFailureCallback - ( boost::bind(&FailoverConnection::failover, this)); } diff --git a/qpid/cpp/src/qpid/client/FailoverSession.cpp b/qpid/cpp/src/qpid/client/FailoverSession.cpp index c6fb573bce..a088a8c91b 100644 --- a/qpid/cpp/src/qpid/client/FailoverSession.cpp +++ b/qpid/cpp/src/qpid/client/FailoverSession.cpp @@ -82,8 +82,7 @@ FailoverSession::sync() uint32_t FailoverSession::timeout(uint32_t /*seconds*/ ) { - - // MICK WTF? return session.timeout ( seconds ); + // FIXME mgoulish return session.timeout ( seconds ); return 0; } diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp index 5a790e26cd..c1ef7d00c4 100644 --- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp +++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp @@ -34,7 +34,8 @@ namespace client { FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) : name("no_name"), - newSessionIsValid(false) + newSessionIsValid(false), + no_failover(false) { subscriptionManager = new SubscriptionManager(fs->session); fs->setFailoverSubscriptionManager(this); @@ -45,9 +46,10 @@ FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) void FailoverSubscriptionManager::prepareForFailover ( Session _newSession ) { - Lock l(lock); + sys::Monitor::ScopedLock l(lock); newSession = _newSession; newSessionIsValid = true; + // lock.notifyAll(); } @@ -55,28 +57,11 @@ FailoverSubscriptionManager::prepareForFailover ( Session _newSession ) void FailoverSubscriptionManager::failover ( ) { - // Stop the subscription manager thread so it can notice the failover in progress. + sys::Monitor::ScopedLock l(lock); + // Stop the subscription manager thread so it can notice + // the failover in progress. subscriptionManager->stop(); -} - - - - -FailoverSubscriptionManager::subscribeArgs::subscribeArgs -( int _interface, - MessageListener * _listener, - LocalQueue * _localQueue, - const std::string * _queue, - const FlowControl * _flow, - const std::string * _tag -) : - interface(_interface), - listener(_listener), - localQueue(_localQueue), - queue(_queue), - flow(_flow), - tag(_tag) -{ + lock.notifyAll(); } @@ -86,15 +71,19 @@ void FailoverSubscriptionManager::subscribe ( MessageListener & listener, const std::string & queue, const FlowControl & flow, - const std::string & tag + const std::string & tag, + bool record_this ) { + sys::Monitor::ScopedLock l(lock); + subscriptionManager->subscribe ( listener, queue, flow, tag ); - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) ); + if ( record_this ) + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag, false ) ); } @@ -103,15 +92,20 @@ void FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, const std::string & queue, const FlowControl & flow, - const std::string & tag + const std::string & tag, + bool record_this ) { + sys::Monitor::ScopedLock l(lock); + subscriptionManager->subscribe ( localQueue, queue, flow, tag ); - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) ); + + if ( record_this ) + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag, false ) ); } @@ -119,15 +113,19 @@ FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, void FailoverSubscriptionManager::subscribe ( MessageListener & listener, const std::string & queue, - const std::string & tag + const std::string & tag, + bool record_this ) { + sys::Monitor::ScopedLock l(lock); + subscriptionManager->subscribe ( listener, queue, tag ); - // TODO -- more than one subscription - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) ); + + if ( record_this ) + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag, false ) ); } @@ -136,14 +134,19 @@ FailoverSubscriptionManager::subscribe ( MessageListener & listener, void FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, const std::string & queue, - const std::string & tag + const std::string & tag, + bool record_this ) { + sys::Monitor::ScopedLock l(lock); + subscriptionManager->subscribe ( localQueue, queue, tag ); - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) ); + + if ( record_this ) + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag, false ) ); } @@ -172,31 +175,46 @@ FailoverSubscriptionManager::cancel ( const std::string tag ) void FailoverSubscriptionManager::run ( ) // User Thread { + std::vector<subscribeFn> mySubscribeFns; + while ( 1 ) { subscriptionManager->run ( ); - Lock l(lock); + // When we drop out of run, if there is a new Session // waiting for us, this is a failover. Otherwise, just // return control to usercode. - if ( newSessionIsValid ) + { - delete subscriptionManager; - subscriptionManager = new SubscriptionManager(newSession); - for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin(); - i < subscribeFns.end(); - ++ i - ) - { - (*i) (); - } - newSessionIsValid = false; + sys::Monitor::ScopedLock l(lock); + + + while ( !newSessionIsValid && !no_failover ) + lock.wait(); + + + if ( newSessionIsValid ) + { + newSessionIsValid = false; + delete subscriptionManager; + subscriptionManager = new SubscriptionManager(newSession); + mySubscribeFns.swap ( subscribeFns ); + } + else + { + // Not a failover, return to user code. + break; + } } - else + + for ( std::vector<subscribeFn>::iterator i = mySubscribeFns.begin(); + i != mySubscribeFns.end(); + ++ i + ) { - // Not a failover, return to user code. - break; + (*i) (); } + } } @@ -222,8 +240,11 @@ FailoverSubscriptionManager::setAutoStop ( bool set ) void FailoverSubscriptionManager::stop ( ) { + sys::Monitor::ScopedLock l(lock); + no_failover = true; subscriptionManager->stop ( ); + lock.notifyAll(); } diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h index 8678f5683c..651e2549c2 100644 --- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h +++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h @@ -33,7 +33,7 @@ #include <qpid/client/LocalQueue.h> #include <qpid/client/FlowControl.h> #include <qpid/sys/Runnable.h> -#include <qpid/sys/Mutex.h> +#include <qpid/sys/Monitor.h> @@ -48,25 +48,27 @@ class FailoverSubscriptionManager FailoverSubscriptionManager ( FailoverSession * fs ); - void foo ( int& arg_1 ); - void subscribe ( MessageListener & listener, const std::string & queue, const FlowControl & flow, - const std::string & tag = std::string() ); + const std::string & tag = std::string(), + bool record_this = true ); void subscribe ( LocalQueue & localQueue, const std::string & queue, const FlowControl & flow, - const std::string & tag=std::string()); + const std::string & tag=std::string(), + bool record_this = true ); void subscribe ( MessageListener & listener, const std::string & queue, - const std::string & tag = std::string()); + const std::string & tag = std::string(), + bool record_this = true ); void subscribe ( LocalQueue & localQueue, const std::string & queue, - const std::string & tag=std::string()); + const std::string & tag=std::string(), + bool record_this = true ); bool get ( Message & result, const std::string & queue, @@ -115,9 +117,9 @@ class FailoverSubscriptionManager std::string name; + private: - typedef sys::Mutex::ScopedLock Lock; - sys::Mutex lock; + sys::Monitor lock; SubscriptionManager * subscriptionManager; @@ -130,32 +132,11 @@ class FailoverSubscriptionManager Session newSession; bool newSessionIsValid; + bool no_failover; + - /* - * */ typedef boost::function<void ()> subscribeFn; std::vector < subscribeFn > subscribeFns; - - struct subscribeArgs - { - int interface; - MessageListener * listener; - LocalQueue * localQueue; - const std::string * queue; - const FlowControl * flow; - const std::string * tag; - - subscribeArgs ( int _interface, - MessageListener *, - LocalQueue *, - const std::string *, - const FlowControl *, - const std::string * - ); - }; - - std::vector < subscribeArgs * > subscriptionReplayVector; - }; }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index d63ad9646b..edf23683ae 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -163,7 +163,10 @@ void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& con void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); } void Cluster::mcast(const Event& e, Lock&) { - if (state == LEFT) return; + if (state == LEFT) { + lock.notifyAll(); // threads waiting in getUrls() + return; + } if (state < READY && e.isConnection()) { // Stall outgoing connection events. QPID_LOG(trace, *this << " MCAST deferred: " << e ); @@ -351,6 +354,7 @@ void Cluster::configChange ( map = ClusterMap(memberId, myUrl, true); memberUpdate(l); unstall(l); + lock.notifyAll(); // threads waiting in getUrls() } else { // Joining established group. state = NEWBIE; @@ -417,6 +421,8 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) { void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { map.ready(id, Url(url)); + if (id == memberId) + lock.notifyAll(); // threads waiting in getUrls() memberUpdate(l); } diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp index e2fc25bfaa..f8c5695b23 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp @@ -146,7 +146,7 @@ boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const MemberId& newbies.erase(i); // No longer a potential dumpee. return url; } - return boost::none; + return boost::optional<Url>(); } }} // namespace qpid::cluster |