summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-10 04:49:48 +0000
committerAlan Conway <aconway@apache.org>2008-10-10 04:49:48 +0000
commit5d07d177cfc5eca21c44981bbe342f0cdcced4e5 (patch)
tree0f5f83642ed5effed52a5e2547565362ce2aea8c /cpp/src/qpid/client/FailoverSubscriptionManager.cpp
parente7ceead683231ef2cb35a6ee70488e859f023d12 (diff)
downloadqpid-python-5d07d177cfc5eca21c44981bbe342f0cdcced4e5.tar.gz
QPID-1340 froM Mick Goulish: preliminary client-side failover support.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703319 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/FailoverSubscriptionManager.cpp')
-rw-r--r--cpp/src/qpid/client/FailoverSubscriptionManager.cpp332
1 files changed, 332 insertions, 0 deletions
diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
new file mode 100644
index 0000000000..2b108c1303
--- /dev/null
+++ b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
@@ -0,0 +1,332 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/FailoverSession.h"
+#include "qpid/client/FailoverSubscriptionManager.h"
+
+
+
+using namespace std;
+
+
+namespace qpid {
+namespace client {
+
+
+
+FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) :
+ name("no_name"),
+ newSessionIsValid(false)
+{
+ subscriptionManager = new SubscriptionManager(fs->session);
+ fs->failoverSubscriptionManager = this;
+}
+
+
+
+void
+FailoverSubscriptionManager::prepareForFailover ( Session _newSession )
+{
+ newSession = _newSession;
+ newSessionIsValid = true;
+}
+
+
+
+void
+FailoverSubscriptionManager::failover ( )
+{
+ subscriptionManager->stop();
+ // TODO -- save vector of boost bind fns.
+}
+
+
+
+
+FailoverSubscriptionManager::subscribeArgs::subscribeArgs
+ ( int _interface,
+ MessageListener * _listener,
+ LocalQueue * _localQueue,
+ const std::string * _queue,
+ const FlowControl * _flow,
+ const std::string * _tag
+ ) :
+ interface(_interface),
+ listener(_listener),
+ localQueue(_localQueue),
+ queue(_queue),
+ flow(_flow),
+ tag(_tag)
+{
+}
+
+
+
+
+void
+FailoverSubscriptionManager::subscribe ( MessageListener & listener,
+ const std::string & queue,
+ const FlowControl & flow,
+ const std::string & tag
+ )
+{
+ subscriptionManager->subscribe ( listener,
+ queue,
+ flow,
+ tag
+ );
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) );
+}
+
+
+
+void
+FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
+ const std::string & queue,
+ const FlowControl & flow,
+ const std::string & tag
+ )
+{
+ subscriptionManager->subscribe ( localQueue,
+ queue,
+ flow,
+ tag
+ );
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) );
+}
+
+
+
+void
+FailoverSubscriptionManager::subscribe ( MessageListener & listener,
+ const std::string & queue,
+ const std::string & tag
+ )
+{
+ subscriptionManager->subscribe ( listener,
+ queue,
+ tag
+ );
+ // TODO -- more than one subscription
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) );
+}
+
+
+
+
+void
+FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
+ const std::string & queue,
+ const std::string & tag
+ )
+{
+ subscriptionManager->subscribe ( localQueue,
+ queue,
+ tag
+ );
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) );
+}
+
+
+
+bool
+FailoverSubscriptionManager::get ( Message & result,
+ const std::string & queue,
+ sys::Duration timeout
+ )
+{
+ return subscriptionManager->get ( result, queue, timeout );
+}
+
+
+
+void
+FailoverSubscriptionManager::cancel ( const std::string tag )
+{
+ subscriptionManager->cancel ( tag );
+}
+
+
+
+void
+FailoverSubscriptionManager::run ( ) // User Thread
+{
+ // FIXME mgoulish -- wait on a monitor here instead of this infinite loop
+ while ( 1 )
+ {
+ subscriptionManager->run ( );
+
+ // When we drop out of run, if there is a new Session
+ // waiting for us, this is a failover. Otherwise, just
+ // return control to usercode.
+ sleep(1); // FIXME mgoulish -- get rid of this when we have wait-on-monitor.
+
+ if ( newSessionIsValid )
+ {
+ delete subscriptionManager;
+ subscriptionManager = new SubscriptionManager(newSession);
+ // FIXME mgoulish make this an array of boost bind fns
+ //
+ for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin();
+ i < subscribeFns.end();
+ ++ i
+ )
+ {
+ std::cerr << "MDEBUG new new resubscribe.\n";
+ (*i) ();
+ }
+
+ newSessionIsValid = false;
+ }
+ else
+ {
+ // break; TODO -- fix this
+ }
+ }
+
+}
+
+
+
+void
+FailoverSubscriptionManager::start ( )
+{
+ subscriptionManager->start ( );
+}
+
+
+
+void
+FailoverSubscriptionManager::setAutoStop ( bool set )
+{
+ subscriptionManager->setAutoStop ( set );
+}
+
+
+
+void
+FailoverSubscriptionManager::stop ( )
+{
+ subscriptionManager->stop ( );
+}
+
+
+
+void
+FailoverSubscriptionManager::setFlowControl ( const std::string & destination,
+ const FlowControl & flow
+ )
+{
+ subscriptionManager->setFlowControl ( destination, flow );
+}
+
+
+
+void
+FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow )
+{
+ subscriptionManager->setFlowControl ( flow );
+}
+
+
+
+const FlowControl &
+FailoverSubscriptionManager::getFlowControl ( ) const
+{
+ return subscriptionManager->getFlowControl ( );
+}
+
+
+
+
+void
+FailoverSubscriptionManager::setFlowControl ( const std::string & tag,
+ uint32_t messages,
+ uint32_t bytes,
+ bool window
+ )
+{
+ subscriptionManager->setFlowControl ( tag,
+ messages,
+ bytes,
+ window
+ );
+}
+
+
+
+void
+FailoverSubscriptionManager::setFlowControl ( uint32_t messages,
+ uint32_t bytes,
+ bool window
+ )
+{
+ subscriptionManager->setFlowControl ( messages,
+ bytes,
+ window
+ );
+}
+
+
+
+void
+FailoverSubscriptionManager::setAcceptMode ( bool required )
+{
+ subscriptionManager->setAcceptMode ( required );
+}
+
+
+
+void
+FailoverSubscriptionManager::setAcquireMode ( bool acquire )
+{
+ subscriptionManager->setAcquireMode ( acquire );
+}
+
+
+
+void
+FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck )
+{
+ subscriptionManager->setAckPolicy ( autoAck );
+}
+
+
+
+AckPolicy &
+FailoverSubscriptionManager::getAckPolicy()
+{
+ return subscriptionManager->getAckPolicy ( );
+}
+
+
+
+void
+FailoverSubscriptionManager::registerFailoverHandler ( boost::function<void ()> /* fh */ )
+{
+ // FIXME mgoulish -- get rid of this mechanism -- i think it's unused.
+}
+
+
+
+
+
+}} // namespace qpid::client