summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-14 18:21:50 +0000
committerAlan Conway <aconway@apache.org>2008-10-14 18:21:50 +0000
commit11db2ff9108e93a3146aa4608c3c355363bd5ad9 (patch)
tree36bd8f27e9f10072effb61cb48ff6240a3b510c8
parent8f810755623ef26bd76ef2ebf28f6bc387f353dd (diff)
downloadqpid-python-11db2ff9108e93a3146aa4608c3c355363bd5ad9.tar.gz
Bug fixes for client-side failover.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@704596 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.cpp1
-rw-r--r--qpid/cpp/src/qpid/client/FailoverConnection.cpp8
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSession.cpp3
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp115
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h45
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterMap.cpp2
7 files changed, 95 insertions, 87 deletions
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
index 5d4723b442..bb3517f839 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -150,6 +150,7 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) {
static const std::string CONN_CLOSED("Connection closed by broker");
void ConnectionImpl::shutdown() {
+
Mutex::ScopedLock l(lock);
if (handler.isClosed()) return;
diff --git a/qpid/cpp/src/qpid/client/FailoverConnection.cpp b/qpid/cpp/src/qpid/client/FailoverConnection.cpp
index 3e982747ff..e98de868de 100644
--- a/qpid/cpp/src/qpid/client/FailoverConnection.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverConnection.cpp
@@ -158,6 +158,10 @@ FailoverConnection::failover ( )
fs->prepareForFailover ( newConnection );
}
+ connection = newConnection;
+ connection.registerFailureCallback
+ ( boost::bind(&FailoverConnection::failover, this));
+
/*
* Tell all sessions to actually failover to the new connection.
*/
@@ -169,10 +173,6 @@ FailoverConnection::failover ( )
FailoverSession * fs = * sessions_iterator;
fs->failover ( );
}
-
- connection = newConnection;
- connection.registerFailureCallback
- ( boost::bind(&FailoverConnection::failover, this));
}
diff --git a/qpid/cpp/src/qpid/client/FailoverSession.cpp b/qpid/cpp/src/qpid/client/FailoverSession.cpp
index c6fb573bce..a088a8c91b 100644
--- a/qpid/cpp/src/qpid/client/FailoverSession.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverSession.cpp
@@ -82,8 +82,7 @@ FailoverSession::sync()
uint32_t
FailoverSession::timeout(uint32_t /*seconds*/ )
{
-
- // MICK WTF? return session.timeout ( seconds );
+ // FIXME mgoulish return session.timeout ( seconds );
return 0;
}
diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
index 5a790e26cd..c1ef7d00c4 100644
--- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
@@ -34,7 +34,8 @@ namespace client {
FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) :
name("no_name"),
- newSessionIsValid(false)
+ newSessionIsValid(false),
+ no_failover(false)
{
subscriptionManager = new SubscriptionManager(fs->session);
fs->setFailoverSubscriptionManager(this);
@@ -45,9 +46,10 @@ FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs)
void
FailoverSubscriptionManager::prepareForFailover ( Session _newSession )
{
- Lock l(lock);
+ sys::Monitor::ScopedLock l(lock);
newSession = _newSession;
newSessionIsValid = true;
+ // lock.notifyAll();
}
@@ -55,28 +57,11 @@ FailoverSubscriptionManager::prepareForFailover ( Session _newSession )
void
FailoverSubscriptionManager::failover ( )
{
- // Stop the subscription manager thread so it can notice the failover in progress.
+ sys::Monitor::ScopedLock l(lock);
+ // Stop the subscription manager thread so it can notice
+ // the failover in progress.
subscriptionManager->stop();
-}
-
-
-
-
-FailoverSubscriptionManager::subscribeArgs::subscribeArgs
-( int _interface,
- MessageListener * _listener,
- LocalQueue * _localQueue,
- const std::string * _queue,
- const FlowControl * _flow,
- const std::string * _tag
-) :
- interface(_interface),
- listener(_listener),
- localQueue(_localQueue),
- queue(_queue),
- flow(_flow),
- tag(_tag)
-{
+ lock.notifyAll();
}
@@ -86,15 +71,19 @@ void
FailoverSubscriptionManager::subscribe ( MessageListener & listener,
const std::string & queue,
const FlowControl & flow,
- const std::string & tag
+ const std::string & tag,
+ bool record_this
)
{
+ sys::Monitor::ScopedLock l(lock);
+
subscriptionManager->subscribe ( listener,
queue,
flow,
tag
);
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) );
+ if ( record_this )
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag, false ) );
}
@@ -103,15 +92,20 @@ void
FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
const std::string & queue,
const FlowControl & flow,
- const std::string & tag
+ const std::string & tag,
+ bool record_this
)
{
+ sys::Monitor::ScopedLock l(lock);
+
subscriptionManager->subscribe ( localQueue,
queue,
flow,
tag
);
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) );
+
+ if ( record_this )
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag, false ) );
}
@@ -119,15 +113,19 @@ FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
void
FailoverSubscriptionManager::subscribe ( MessageListener & listener,
const std::string & queue,
- const std::string & tag
+ const std::string & tag,
+ bool record_this
)
{
+ sys::Monitor::ScopedLock l(lock);
+
subscriptionManager->subscribe ( listener,
queue,
tag
);
- // TODO -- more than one subscription
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) );
+
+ if ( record_this )
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag, false ) );
}
@@ -136,14 +134,19 @@ FailoverSubscriptionManager::subscribe ( MessageListener & listener,
void
FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
const std::string & queue,
- const std::string & tag
+ const std::string & tag,
+ bool record_this
)
{
+ sys::Monitor::ScopedLock l(lock);
+
subscriptionManager->subscribe ( localQueue,
queue,
tag
);
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) );
+
+ if ( record_this )
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag, false ) );
}
@@ -172,31 +175,46 @@ FailoverSubscriptionManager::cancel ( const std::string tag )
void
FailoverSubscriptionManager::run ( ) // User Thread
{
+ std::vector<subscribeFn> mySubscribeFns;
+
while ( 1 )
{
subscriptionManager->run ( );
- Lock l(lock);
+
// When we drop out of run, if there is a new Session
// waiting for us, this is a failover. Otherwise, just
// return control to usercode.
- if ( newSessionIsValid )
+
{
- delete subscriptionManager;
- subscriptionManager = new SubscriptionManager(newSession);
- for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin();
- i < subscribeFns.end();
- ++ i
- )
- {
- (*i) ();
- }
- newSessionIsValid = false;
+ sys::Monitor::ScopedLock l(lock);
+
+
+ while ( !newSessionIsValid && !no_failover )
+ lock.wait();
+
+
+ if ( newSessionIsValid )
+ {
+ newSessionIsValid = false;
+ delete subscriptionManager;
+ subscriptionManager = new SubscriptionManager(newSession);
+ mySubscribeFns.swap ( subscribeFns );
+ }
+ else
+ {
+ // Not a failover, return to user code.
+ break;
+ }
}
- else
+
+ for ( std::vector<subscribeFn>::iterator i = mySubscribeFns.begin();
+ i != mySubscribeFns.end();
+ ++ i
+ )
{
- // Not a failover, return to user code.
- break;
+ (*i) ();
}
+
}
}
@@ -222,8 +240,11 @@ FailoverSubscriptionManager::setAutoStop ( bool set )
void
FailoverSubscriptionManager::stop ( )
{
+ sys::Monitor::ScopedLock l(lock);
+ no_failover = true;
subscriptionManager->stop ( );
+ lock.notifyAll();
}
diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
index 8678f5683c..651e2549c2 100644
--- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
+++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
@@ -33,7 +33,7 @@
#include <qpid/client/LocalQueue.h>
#include <qpid/client/FlowControl.h>
#include <qpid/sys/Runnable.h>
-#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Monitor.h>
@@ -48,25 +48,27 @@ class FailoverSubscriptionManager
FailoverSubscriptionManager ( FailoverSession * fs );
- void foo ( int& arg_1 );
-
void subscribe ( MessageListener & listener,
const std::string & queue,
const FlowControl & flow,
- const std::string & tag = std::string() );
+ const std::string & tag = std::string(),
+ bool record_this = true );
void subscribe ( LocalQueue & localQueue,
const std::string & queue,
const FlowControl & flow,
- const std::string & tag=std::string());
+ const std::string & tag=std::string(),
+ bool record_this = true );
void subscribe ( MessageListener & listener,
const std::string & queue,
- const std::string & tag = std::string());
+ const std::string & tag = std::string(),
+ bool record_this = true );
void subscribe ( LocalQueue & localQueue,
const std::string & queue,
- const std::string & tag=std::string());
+ const std::string & tag=std::string(),
+ bool record_this = true );
bool get ( Message & result,
const std::string & queue,
@@ -115,9 +117,9 @@ class FailoverSubscriptionManager
std::string name;
+
private:
- typedef sys::Mutex::ScopedLock Lock;
- sys::Mutex lock;
+ sys::Monitor lock;
SubscriptionManager * subscriptionManager;
@@ -130,32 +132,11 @@ class FailoverSubscriptionManager
Session newSession;
bool newSessionIsValid;
+ bool no_failover;
+
- /*
- * */
typedef boost::function<void ()> subscribeFn;
std::vector < subscribeFn > subscribeFns;
-
- struct subscribeArgs
- {
- int interface;
- MessageListener * listener;
- LocalQueue * localQueue;
- const std::string * queue;
- const FlowControl * flow;
- const std::string * tag;
-
- subscribeArgs ( int _interface,
- MessageListener *,
- LocalQueue *,
- const std::string *,
- const FlowControl *,
- const std::string *
- );
- };
-
- std::vector < subscribeArgs * > subscriptionReplayVector;
-
};
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index d63ad9646b..edf23683ae 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -163,7 +163,10 @@ void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& con
void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); }
void Cluster::mcast(const Event& e, Lock&) {
- if (state == LEFT) return;
+ if (state == LEFT) {
+ lock.notifyAll(); // threads waiting in getUrls()
+ return;
+ }
if (state < READY && e.isConnection()) {
// Stall outgoing connection events.
QPID_LOG(trace, *this << " MCAST deferred: " << e );
@@ -351,6 +354,7 @@ void Cluster::configChange (
map = ClusterMap(memberId, myUrl, true);
memberUpdate(l);
unstall(l);
+ lock.notifyAll(); // threads waiting in getUrls()
}
else { // Joining established group.
state = NEWBIE;
@@ -417,6 +421,8 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) {
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
map.ready(id, Url(url));
+ if (id == memberId)
+ lock.notifyAll(); // threads waiting in getUrls()
memberUpdate(l);
}
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
index e2fc25bfaa..f8c5695b23 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -146,7 +146,7 @@ boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const MemberId&
newbies.erase(i); // No longer a potential dumpee.
return url;
}
- return boost::none;
+ return boost::optional<Url>();
}
}} // namespace qpid::cluster