summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/cpp/src/qpid/client/SubscriptionManager.cpp')
-rw-r--r--M4-RCs/qpid/cpp/src/qpid/client/SubscriptionManager.cpp140
1 files changed, 0 insertions, 140 deletions
diff --git a/M4-RCs/qpid/cpp/src/qpid/client/SubscriptionManager.cpp b/M4-RCs/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
deleted file mode 100644
index c91ae178ac..0000000000
--- a/M4-RCs/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _Subscription_
-#define _Subscription_
-
-#include "SubscriptionManager.h"
-#include "SubscriptionImpl.h"
-#include <qpid/client/Dispatcher.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/MessageListener.h>
-#include <qpid/framing/Uuid.h>
-#include <set>
-#include <sstream>
-
-
-namespace qpid {
-namespace client {
-
-SubscriptionManager::SubscriptionManager(const Session& s)
- : dispatcher(s), session(s), autoStop(true)
-{}
-
-Subscription SubscriptionManager::subscribe(
- MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
-{
- std::string name=n.empty() ? q:n;
- boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, &listener);
- dispatcher.listen(si);
- //issue subscription request after listener is registered with dispatcher
- si->subscribe();
- return subscriptions[name] = Subscription(si.get());
-}
-
-Subscription SubscriptionManager::subscribe(
- LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
-{
- std::string name=n.empty() ? q:n;
- lq.queue=session.getExecution().getDemux().add(name, ByTransferDest(name));
- boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0);
- si->subscribe();
- lq.subscription = Subscription(si.get());
- return subscriptions[name] = lq.subscription;
-}
-
-Subscription SubscriptionManager::subscribe(
- MessageListener& listener, const std::string& q, const std::string& n)
-{
- return subscribe(listener, q, defaultSettings, n);
-}
-
-Subscription SubscriptionManager::subscribe(
- LocalQueue& lq, const std::string& q, const std::string& n)
-{
- return subscribe(lq, q, defaultSettings, n);
-}
-
-void SubscriptionManager::cancel(const std::string& dest)
-{
- sync(session).messageCancel(dest);
- dispatcher.cancel(dest);
-}
-
-void SubscriptionManager::setAutoStop(bool set) { autoStop=set; }
-
-void SubscriptionManager::run()
-{
- dispatcher.setAutoStop(autoStop);
- dispatcher.run();
-}
-
-void SubscriptionManager::start()
-{
- dispatcher.setAutoStop(autoStop);
- dispatcher.start();
-}
-
-void SubscriptionManager::wait()
-{
- dispatcher.wait();
-}
-
-void SubscriptionManager::stop()
-{
- dispatcher.stop();
-}
-
-bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) {
- LocalQueue lq;
- std::string unique = framing::Uuid(true).str();
- subscribe(lq, queue, SubscriptionSettings(FlowControl::messageCredit(1)), unique);
- AutoCancel ac(*this, unique);
- //first wait for message to be delivered if a timeout has been specified
- if (timeout && lq.get(result, timeout))
- return true;
- //make sure message is not on queue before final check
- sync(session).messageFlush(unique);
- return lq.get(result, 0);
-}
-
-Message SubscriptionManager::get(const std::string& queue, sys::Duration timeout) {
- Message result;
- if (!get(result, queue, timeout))
- throw Exception("Timed out waiting for a message");
- return result;
-}
-
-Session SubscriptionManager::getSession() const { return session; }
-
-Subscription SubscriptionManager::getSubscription(const std::string& name) const {
- std::map<std::string, Subscription>::const_iterator i = subscriptions.find(name);
- if (i == subscriptions.end())
- throw Exception(QPID_MSG("Subscription not found: " << name));
- return i->second;
-}
-
-void SubscriptionManager::registerFailoverHandler (boost::function<void ()> fh) {
- dispatcher.registerFailoverHandler(fh);
-}
-
-}} // namespace qpid::client
-
-#endif