diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp | 222 |
1 files changed, 112 insertions, 110 deletions
diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp index 2b108c1303..5a790e26cd 100644 --- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp +++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp @@ -33,11 +33,11 @@ namespace client { FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) : - name("no_name"), - newSessionIsValid(false) + name("no_name"), + newSessionIsValid(false) { - subscriptionManager = new SubscriptionManager(fs->session); - fs->failoverSubscriptionManager = this; + subscriptionManager = new SubscriptionManager(fs->session); + fs->setFailoverSubscriptionManager(this); } @@ -45,8 +45,9 @@ FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) void FailoverSubscriptionManager::prepareForFailover ( Session _newSession ) { - newSession = _newSession; - newSessionIsValid = true; + Lock l(lock); + newSession = _newSession; + newSessionIsValid = true; } @@ -54,27 +55,27 @@ FailoverSubscriptionManager::prepareForFailover ( Session _newSession ) void FailoverSubscriptionManager::failover ( ) { - subscriptionManager->stop(); - // TODO -- save vector of boost bind fns. + // Stop the subscription manager thread so it can notice the failover in progress. + subscriptionManager->stop(); } FailoverSubscriptionManager::subscribeArgs::subscribeArgs - ( int _interface, - MessageListener * _listener, - LocalQueue * _localQueue, - const std::string * _queue, - const FlowControl * _flow, - const std::string * _tag - ) : - interface(_interface), - listener(_listener), - localQueue(_localQueue), - queue(_queue), - flow(_flow), - tag(_tag) +( int _interface, + MessageListener * _listener, + LocalQueue * _localQueue, + const std::string * _queue, + const FlowControl * _flow, + const std::string * _tag +) : + interface(_interface), + listener(_listener), + localQueue(_localQueue), + queue(_queue), + flow(_flow), + tag(_tag) { } @@ -86,14 +87,14 @@ FailoverSubscriptionManager::subscribe ( MessageListener & listener, const std::string & queue, const FlowControl & flow, const std::string & tag - ) +) { - subscriptionManager->subscribe ( listener, - queue, - flow, - tag - ); - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) ); + subscriptionManager->subscribe ( listener, + queue, + flow, + tag + ); + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) ); } @@ -103,14 +104,14 @@ FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, const std::string & queue, const FlowControl & flow, const std::string & tag - ) +) { - subscriptionManager->subscribe ( localQueue, - queue, - flow, - tag - ); - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) ); + subscriptionManager->subscribe ( localQueue, + queue, + flow, + tag + ); + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) ); } @@ -119,14 +120,14 @@ void FailoverSubscriptionManager::subscribe ( MessageListener & listener, const std::string & queue, const std::string & tag - ) +) { - subscriptionManager->subscribe ( listener, - queue, - tag - ); - // TODO -- more than one subscription - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) ); + subscriptionManager->subscribe ( listener, + queue, + tag + ); + // TODO -- more than one subscription + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) ); } @@ -136,13 +137,13 @@ void FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, const std::string & queue, const std::string & tag - ) +) { - subscriptionManager->subscribe ( localQueue, - queue, - tag - ); - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) ); + subscriptionManager->subscribe ( localQueue, + queue, + tag + ); + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) ); } @@ -151,9 +152,10 @@ bool FailoverSubscriptionManager::get ( Message & result, const std::string & queue, sys::Duration timeout - ) +) { - return subscriptionManager->get ( result, queue, timeout ); + + return subscriptionManager->get ( result, queue, timeout ); } @@ -161,7 +163,8 @@ FailoverSubscriptionManager::get ( Message & result, void FailoverSubscriptionManager::cancel ( const std::string tag ) { - subscriptionManager->cancel ( tag ); + + subscriptionManager->cancel ( tag ); } @@ -169,47 +172,40 @@ FailoverSubscriptionManager::cancel ( const std::string tag ) void FailoverSubscriptionManager::run ( ) // User Thread { - // FIXME mgoulish -- wait on a monitor here instead of this infinite loop - while ( 1 ) - { - subscriptionManager->run ( ); - - // When we drop out of run, if there is a new Session - // waiting for us, this is a failover. Otherwise, just - // return control to usercode. - sleep(1); // FIXME mgoulish -- get rid of this when we have wait-on-monitor. - - if ( newSessionIsValid ) - { - delete subscriptionManager; - subscriptionManager = new SubscriptionManager(newSession); - // FIXME mgoulish make this an array of boost bind fns - // - for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin(); - i < subscribeFns.end(); - ++ i - ) - { - std::cerr << "MDEBUG new new resubscribe.\n"; - (*i) (); - } - - newSessionIsValid = false; - } - else + while ( 1 ) { - // break; TODO -- fix this + subscriptionManager->run ( ); + Lock l(lock); + // When we drop out of run, if there is a new Session + // waiting for us, this is a failover. Otherwise, just + // return control to usercode. + if ( newSessionIsValid ) + { + delete subscriptionManager; + subscriptionManager = new SubscriptionManager(newSession); + for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin(); + i < subscribeFns.end(); + ++ i + ) + { + (*i) (); + } + newSessionIsValid = false; + } + else + { + // Not a failover, return to user code. + break; + } } - } - } - void FailoverSubscriptionManager::start ( ) { - subscriptionManager->start ( ); + + subscriptionManager->start ( ); } @@ -217,7 +213,8 @@ FailoverSubscriptionManager::start ( ) void FailoverSubscriptionManager::setAutoStop ( bool set ) { - subscriptionManager->setAutoStop ( set ); + + subscriptionManager->setAutoStop ( set ); } @@ -225,7 +222,8 @@ FailoverSubscriptionManager::setAutoStop ( bool set ) void FailoverSubscriptionManager::stop ( ) { - subscriptionManager->stop ( ); + + subscriptionManager->stop ( ); } @@ -233,9 +231,10 @@ FailoverSubscriptionManager::stop ( ) void FailoverSubscriptionManager::setFlowControl ( const std::string & destination, const FlowControl & flow - ) +) { - subscriptionManager->setFlowControl ( destination, flow ); + + subscriptionManager->setFlowControl ( destination, flow ); } @@ -243,7 +242,8 @@ FailoverSubscriptionManager::setFlowControl ( const std::string & destination, void FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow ) { - subscriptionManager->setFlowControl ( flow ); + + subscriptionManager->setFlowControl ( flow ); } @@ -251,7 +251,8 @@ FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow ) const FlowControl & FailoverSubscriptionManager::getFlowControl ( ) const { - return subscriptionManager->getFlowControl ( ); + + return subscriptionManager->getFlowControl ( ); } @@ -262,13 +263,14 @@ FailoverSubscriptionManager::setFlowControl ( const std::string & tag, uint32_t messages, uint32_t bytes, bool window - ) +) { - subscriptionManager->setFlowControl ( tag, - messages, - bytes, - window - ); + + subscriptionManager->setFlowControl ( tag, + messages, + bytes, + window + ); } @@ -277,12 +279,13 @@ void FailoverSubscriptionManager::setFlowControl ( uint32_t messages, uint32_t bytes, bool window - ) +) { - subscriptionManager->setFlowControl ( messages, - bytes, - window - ); + + subscriptionManager->setFlowControl ( messages, + bytes, + window + ); } @@ -290,7 +293,8 @@ FailoverSubscriptionManager::setFlowControl ( uint32_t messages, void FailoverSubscriptionManager::setAcceptMode ( bool required ) { - subscriptionManager->setAcceptMode ( required ); + + subscriptionManager->setAcceptMode ( required ); } @@ -298,7 +302,8 @@ FailoverSubscriptionManager::setAcceptMode ( bool required ) void FailoverSubscriptionManager::setAcquireMode ( bool acquire ) { - subscriptionManager->setAcquireMode ( acquire ); + + subscriptionManager->setAcquireMode ( acquire ); } @@ -306,7 +311,8 @@ FailoverSubscriptionManager::setAcquireMode ( bool acquire ) void FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck ) { - subscriptionManager->setAckPolicy ( autoAck ); + + subscriptionManager->setAckPolicy ( autoAck ); } @@ -314,16 +320,12 @@ FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck ) AckPolicy & FailoverSubscriptionManager::getAckPolicy() { - return subscriptionManager->getAckPolicy ( ); + + return subscriptionManager->getAckPolicy ( ); } -void -FailoverSubscriptionManager::registerFailoverHandler ( boost::function<void ()> /* fh */ ) -{ - // FIXME mgoulish -- get rid of this mechanism -- i think it's unused. -} |