summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp')
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp222
1 files changed, 112 insertions, 110 deletions
diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
index 2b108c1303..5a790e26cd 100644
--- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
@@ -33,11 +33,11 @@ namespace client {
FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) :
- name("no_name"),
- newSessionIsValid(false)
+ name("no_name"),
+ newSessionIsValid(false)
{
- subscriptionManager = new SubscriptionManager(fs->session);
- fs->failoverSubscriptionManager = this;
+ subscriptionManager = new SubscriptionManager(fs->session);
+ fs->setFailoverSubscriptionManager(this);
}
@@ -45,8 +45,9 @@ FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs)
void
FailoverSubscriptionManager::prepareForFailover ( Session _newSession )
{
- newSession = _newSession;
- newSessionIsValid = true;
+ Lock l(lock);
+ newSession = _newSession;
+ newSessionIsValid = true;
}
@@ -54,27 +55,27 @@ FailoverSubscriptionManager::prepareForFailover ( Session _newSession )
void
FailoverSubscriptionManager::failover ( )
{
- subscriptionManager->stop();
- // TODO -- save vector of boost bind fns.
+ // Stop the subscription manager thread so it can notice the failover in progress.
+ subscriptionManager->stop();
}
FailoverSubscriptionManager::subscribeArgs::subscribeArgs
- ( int _interface,
- MessageListener * _listener,
- LocalQueue * _localQueue,
- const std::string * _queue,
- const FlowControl * _flow,
- const std::string * _tag
- ) :
- interface(_interface),
- listener(_listener),
- localQueue(_localQueue),
- queue(_queue),
- flow(_flow),
- tag(_tag)
+( int _interface,
+ MessageListener * _listener,
+ LocalQueue * _localQueue,
+ const std::string * _queue,
+ const FlowControl * _flow,
+ const std::string * _tag
+) :
+ interface(_interface),
+ listener(_listener),
+ localQueue(_localQueue),
+ queue(_queue),
+ flow(_flow),
+ tag(_tag)
{
}
@@ -86,14 +87,14 @@ FailoverSubscriptionManager::subscribe ( MessageListener & listener,
const std::string & queue,
const FlowControl & flow,
const std::string & tag
- )
+)
{
- subscriptionManager->subscribe ( listener,
- queue,
- flow,
- tag
- );
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) );
+ subscriptionManager->subscribe ( listener,
+ queue,
+ flow,
+ tag
+ );
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) );
}
@@ -103,14 +104,14 @@ FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
const std::string & queue,
const FlowControl & flow,
const std::string & tag
- )
+)
{
- subscriptionManager->subscribe ( localQueue,
- queue,
- flow,
- tag
- );
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) );
+ subscriptionManager->subscribe ( localQueue,
+ queue,
+ flow,
+ tag
+ );
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) );
}
@@ -119,14 +120,14 @@ void
FailoverSubscriptionManager::subscribe ( MessageListener & listener,
const std::string & queue,
const std::string & tag
- )
+)
{
- subscriptionManager->subscribe ( listener,
- queue,
- tag
- );
- // TODO -- more than one subscription
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) );
+ subscriptionManager->subscribe ( listener,
+ queue,
+ tag
+ );
+ // TODO -- more than one subscription
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) );
}
@@ -136,13 +137,13 @@ void
FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
const std::string & queue,
const std::string & tag
- )
+)
{
- subscriptionManager->subscribe ( localQueue,
- queue,
- tag
- );
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) );
+ subscriptionManager->subscribe ( localQueue,
+ queue,
+ tag
+ );
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) );
}
@@ -151,9 +152,10 @@ bool
FailoverSubscriptionManager::get ( Message & result,
const std::string & queue,
sys::Duration timeout
- )
+)
{
- return subscriptionManager->get ( result, queue, timeout );
+
+ return subscriptionManager->get ( result, queue, timeout );
}
@@ -161,7 +163,8 @@ FailoverSubscriptionManager::get ( Message & result,
void
FailoverSubscriptionManager::cancel ( const std::string tag )
{
- subscriptionManager->cancel ( tag );
+
+ subscriptionManager->cancel ( tag );
}
@@ -169,47 +172,40 @@ FailoverSubscriptionManager::cancel ( const std::string tag )
void
FailoverSubscriptionManager::run ( ) // User Thread
{
- // FIXME mgoulish -- wait on a monitor here instead of this infinite loop
- while ( 1 )
- {
- subscriptionManager->run ( );
-
- // When we drop out of run, if there is a new Session
- // waiting for us, this is a failover. Otherwise, just
- // return control to usercode.
- sleep(1); // FIXME mgoulish -- get rid of this when we have wait-on-monitor.
-
- if ( newSessionIsValid )
- {
- delete subscriptionManager;
- subscriptionManager = new SubscriptionManager(newSession);
- // FIXME mgoulish make this an array of boost bind fns
- //
- for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin();
- i < subscribeFns.end();
- ++ i
- )
- {
- std::cerr << "MDEBUG new new resubscribe.\n";
- (*i) ();
- }
-
- newSessionIsValid = false;
- }
- else
+ while ( 1 )
{
- // break; TODO -- fix this
+ subscriptionManager->run ( );
+ Lock l(lock);
+ // When we drop out of run, if there is a new Session
+ // waiting for us, this is a failover. Otherwise, just
+ // return control to usercode.
+ if ( newSessionIsValid )
+ {
+ delete subscriptionManager;
+ subscriptionManager = new SubscriptionManager(newSession);
+ for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin();
+ i < subscribeFns.end();
+ ++ i
+ )
+ {
+ (*i) ();
+ }
+ newSessionIsValid = false;
+ }
+ else
+ {
+ // Not a failover, return to user code.
+ break;
+ }
}
- }
-
}
-
void
FailoverSubscriptionManager::start ( )
{
- subscriptionManager->start ( );
+
+ subscriptionManager->start ( );
}
@@ -217,7 +213,8 @@ FailoverSubscriptionManager::start ( )
void
FailoverSubscriptionManager::setAutoStop ( bool set )
{
- subscriptionManager->setAutoStop ( set );
+
+ subscriptionManager->setAutoStop ( set );
}
@@ -225,7 +222,8 @@ FailoverSubscriptionManager::setAutoStop ( bool set )
void
FailoverSubscriptionManager::stop ( )
{
- subscriptionManager->stop ( );
+
+ subscriptionManager->stop ( );
}
@@ -233,9 +231,10 @@ FailoverSubscriptionManager::stop ( )
void
FailoverSubscriptionManager::setFlowControl ( const std::string & destination,
const FlowControl & flow
- )
+)
{
- subscriptionManager->setFlowControl ( destination, flow );
+
+ subscriptionManager->setFlowControl ( destination, flow );
}
@@ -243,7 +242,8 @@ FailoverSubscriptionManager::setFlowControl ( const std::string & destination,
void
FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow )
{
- subscriptionManager->setFlowControl ( flow );
+
+ subscriptionManager->setFlowControl ( flow );
}
@@ -251,7 +251,8 @@ FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow )
const FlowControl &
FailoverSubscriptionManager::getFlowControl ( ) const
{
- return subscriptionManager->getFlowControl ( );
+
+ return subscriptionManager->getFlowControl ( );
}
@@ -262,13 +263,14 @@ FailoverSubscriptionManager::setFlowControl ( const std::string & tag,
uint32_t messages,
uint32_t bytes,
bool window
- )
+)
{
- subscriptionManager->setFlowControl ( tag,
- messages,
- bytes,
- window
- );
+
+ subscriptionManager->setFlowControl ( tag,
+ messages,
+ bytes,
+ window
+ );
}
@@ -277,12 +279,13 @@ void
FailoverSubscriptionManager::setFlowControl ( uint32_t messages,
uint32_t bytes,
bool window
- )
+)
{
- subscriptionManager->setFlowControl ( messages,
- bytes,
- window
- );
+
+ subscriptionManager->setFlowControl ( messages,
+ bytes,
+ window
+ );
}
@@ -290,7 +293,8 @@ FailoverSubscriptionManager::setFlowControl ( uint32_t messages,
void
FailoverSubscriptionManager::setAcceptMode ( bool required )
{
- subscriptionManager->setAcceptMode ( required );
+
+ subscriptionManager->setAcceptMode ( required );
}
@@ -298,7 +302,8 @@ FailoverSubscriptionManager::setAcceptMode ( bool required )
void
FailoverSubscriptionManager::setAcquireMode ( bool acquire )
{
- subscriptionManager->setAcquireMode ( acquire );
+
+ subscriptionManager->setAcquireMode ( acquire );
}
@@ -306,7 +311,8 @@ FailoverSubscriptionManager::setAcquireMode ( bool acquire )
void
FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck )
{
- subscriptionManager->setAckPolicy ( autoAck );
+
+ subscriptionManager->setAckPolicy ( autoAck );
}
@@ -314,16 +320,12 @@ FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck )
AckPolicy &
FailoverSubscriptionManager::getAckPolicy()
{
- return subscriptionManager->getAckPolicy ( );
+
+ return subscriptionManager->getAckPolicy ( );
}
-void
-FailoverSubscriptionManager::registerFailoverHandler ( boost::function<void ()> /* fh */ )
-{
- // FIXME mgoulish -- get rid of this mechanism -- i think it's unused.
-}