diff options
author | Alan Conway <aconway@apache.org> | 2008-10-10 04:49:48 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-10 04:49:48 +0000 |
commit | 5d07d177cfc5eca21c44981bbe342f0cdcced4e5 (patch) | |
tree | 0f5f83642ed5effed52a5e2547565362ce2aea8c /cpp/src/qpid/client/FailoverSubscriptionManager.cpp | |
parent | e7ceead683231ef2cb35a6ee70488e859f023d12 (diff) | |
download | qpid-python-5d07d177cfc5eca21c44981bbe342f0cdcced4e5.tar.gz |
QPID-1340 froM Mick Goulish: preliminary client-side failover support.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703319 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/FailoverSubscriptionManager.cpp')
-rw-r--r-- | cpp/src/qpid/client/FailoverSubscriptionManager.cpp | 332 |
1 files changed, 332 insertions, 0 deletions
diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp new file mode 100644 index 0000000000..2b108c1303 --- /dev/null +++ b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp @@ -0,0 +1,332 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/FailoverSession.h" +#include "qpid/client/FailoverSubscriptionManager.h" + + + +using namespace std; + + +namespace qpid { +namespace client { + + + +FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) : + name("no_name"), + newSessionIsValid(false) +{ + subscriptionManager = new SubscriptionManager(fs->session); + fs->failoverSubscriptionManager = this; +} + + + +void +FailoverSubscriptionManager::prepareForFailover ( Session _newSession ) +{ + newSession = _newSession; + newSessionIsValid = true; +} + + + +void +FailoverSubscriptionManager::failover ( ) +{ + subscriptionManager->stop(); + // TODO -- save vector of boost bind fns. +} + + + + +FailoverSubscriptionManager::subscribeArgs::subscribeArgs + ( int _interface, + MessageListener * _listener, + LocalQueue * _localQueue, + const std::string * _queue, + const FlowControl * _flow, + const std::string * _tag + ) : + interface(_interface), + listener(_listener), + localQueue(_localQueue), + queue(_queue), + flow(_flow), + tag(_tag) +{ +} + + + + +void +FailoverSubscriptionManager::subscribe ( MessageListener & listener, + const std::string & queue, + const FlowControl & flow, + const std::string & tag + ) +{ + subscriptionManager->subscribe ( listener, + queue, + flow, + tag + ); + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) ); +} + + + +void +FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, + const std::string & queue, + const FlowControl & flow, + const std::string & tag + ) +{ + subscriptionManager->subscribe ( localQueue, + queue, + flow, + tag + ); + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) ); +} + + + +void +FailoverSubscriptionManager::subscribe ( MessageListener & listener, + const std::string & queue, + const std::string & tag + ) +{ + subscriptionManager->subscribe ( listener, + queue, + tag + ); + // TODO -- more than one subscription + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) ); +} + + + + +void +FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, + const std::string & queue, + const std::string & tag + ) +{ + subscriptionManager->subscribe ( localQueue, + queue, + tag + ); + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) ); +} + + + +bool +FailoverSubscriptionManager::get ( Message & result, + const std::string & queue, + sys::Duration timeout + ) +{ + return subscriptionManager->get ( result, queue, timeout ); +} + + + +void +FailoverSubscriptionManager::cancel ( const std::string tag ) +{ + subscriptionManager->cancel ( tag ); +} + + + +void +FailoverSubscriptionManager::run ( ) // User Thread +{ + // FIXME mgoulish -- wait on a monitor here instead of this infinite loop + while ( 1 ) + { + subscriptionManager->run ( ); + + // When we drop out of run, if there is a new Session + // waiting for us, this is a failover. Otherwise, just + // return control to usercode. + sleep(1); // FIXME mgoulish -- get rid of this when we have wait-on-monitor. + + if ( newSessionIsValid ) + { + delete subscriptionManager; + subscriptionManager = new SubscriptionManager(newSession); + // FIXME mgoulish make this an array of boost bind fns + // + for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin(); + i < subscribeFns.end(); + ++ i + ) + { + std::cerr << "MDEBUG new new resubscribe.\n"; + (*i) (); + } + + newSessionIsValid = false; + } + else + { + // break; TODO -- fix this + } + } + +} + + + +void +FailoverSubscriptionManager::start ( ) +{ + subscriptionManager->start ( ); +} + + + +void +FailoverSubscriptionManager::setAutoStop ( bool set ) +{ + subscriptionManager->setAutoStop ( set ); +} + + + +void +FailoverSubscriptionManager::stop ( ) +{ + subscriptionManager->stop ( ); +} + + + +void +FailoverSubscriptionManager::setFlowControl ( const std::string & destination, + const FlowControl & flow + ) +{ + subscriptionManager->setFlowControl ( destination, flow ); +} + + + +void +FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow ) +{ + subscriptionManager->setFlowControl ( flow ); +} + + + +const FlowControl & +FailoverSubscriptionManager::getFlowControl ( ) const +{ + return subscriptionManager->getFlowControl ( ); +} + + + + +void +FailoverSubscriptionManager::setFlowControl ( const std::string & tag, + uint32_t messages, + uint32_t bytes, + bool window + ) +{ + subscriptionManager->setFlowControl ( tag, + messages, + bytes, + window + ); +} + + + +void +FailoverSubscriptionManager::setFlowControl ( uint32_t messages, + uint32_t bytes, + bool window + ) +{ + subscriptionManager->setFlowControl ( messages, + bytes, + window + ); +} + + + +void +FailoverSubscriptionManager::setAcceptMode ( bool required ) +{ + subscriptionManager->setAcceptMode ( required ); +} + + + +void +FailoverSubscriptionManager::setAcquireMode ( bool acquire ) +{ + subscriptionManager->setAcquireMode ( acquire ); +} + + + +void +FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck ) +{ + subscriptionManager->setAckPolicy ( autoAck ); +} + + + +AckPolicy & +FailoverSubscriptionManager::getAckPolicy() +{ + return subscriptionManager->getAckPolicy ( ); +} + + + +void +FailoverSubscriptionManager::registerFailoverHandler ( boost::function<void ()> /* fh */ ) +{ + // FIXME mgoulish -- get rid of this mechanism -- i think it's unused. +} + + + + + +}} // namespace qpid::client |