summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp')
-rw-r--r--M4-RCs/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp149
1 files changed, 0 insertions, 149 deletions
diff --git a/M4-RCs/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp b/M4-RCs/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
deleted file mode 100644
index 5ea87110c2..0000000000
--- a/M4-RCs/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "SubscriptionImpl.h"
-#include "SubscriptionManager.h"
-#include "SubscriptionSettings.h"
-
-namespace qpid {
-namespace client {
-
-using sys::Mutex;
-using framing::MessageAcquireResult;
-
-SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l)
- : manager(m), name(n), queue(q), settings(s), listener(l)
-{}
-
-void SubscriptionImpl::subscribe()
-{
- async(manager.getSession()).messageSubscribe(
- arg::queue=queue,
- arg::destination=name,
- arg::acceptMode=settings.acceptMode,
- arg::acquireMode=settings.acquireMode,
- arg::exclusive=settings.exclusive);
- setFlowControl(settings.flowControl);
-}
-
-std::string SubscriptionImpl::getName() const { return name; }
-
-std::string SubscriptionImpl::getQueue() const { return queue; }
-
-const SubscriptionSettings& SubscriptionImpl::getSettings() const {
- Mutex::ScopedLock l(lock);
- return settings;
-}
-
-void SubscriptionImpl::setFlowControl(const FlowControl& f) {
- Mutex::ScopedLock l(lock);
- AsyncSession s=manager.getSession();
- if (&settings.flowControl != &f) settings.flowControl = f;
- s.messageSetFlowMode(name, f.window);
- s.messageFlow(name, CREDIT_UNIT_MESSAGE, f.messages);
- s.messageFlow(name, CREDIT_UNIT_BYTE, f.bytes);
- s.sync();
-}
-
-void SubscriptionImpl::grantCredit(framing::message::CreditUnit unit, uint32_t value) {
- async(manager.getSession()).messageFlow(name, unit, value);
-}
-
-void SubscriptionImpl::setAutoAck(size_t n) {
- Mutex::ScopedLock l(lock);
- settings.autoAck = n;
-}
-
-SequenceSet SubscriptionImpl::getUnacquired() const { Mutex::ScopedLock l(lock); return unacquired; }
-SequenceSet SubscriptionImpl::getUnaccepted() const { Mutex::ScopedLock l(lock); return unaccepted; }
-
-void SubscriptionImpl::acquire(const SequenceSet& messageIds) {
- Mutex::ScopedLock l(lock);
- MessageAcquireResult result = manager.getSession().messageAcquire(messageIds);
- unacquired.remove(result.getTransfers());
- if (settings.acceptMode == ACCEPT_MODE_EXPLICIT)
- unaccepted.add(result.getTransfers());
-}
-
-void SubscriptionImpl::accept(const SequenceSet& messageIds) {
- Mutex::ScopedLock l(lock);
- manager.getSession().messageAccept(messageIds);
- unaccepted.remove(messageIds);
- switch (settings.completionMode) {
- case COMPLETE_ON_ACCEPT:
- manager.getSession().markCompleted(messageIds, true);
- break;
- case COMPLETE_ON_DELIVERY:
- manager.getSession().sendCompletion();
- break;
- default://do nothing
- break;
- }
-}
-
-void SubscriptionImpl::release(const SequenceSet& messageIds) {
- Mutex::ScopedLock l(lock);
- manager.getSession().messageRelease(messageIds);
- if (settings.acceptMode == ACCEPT_MODE_EXPLICIT)
- unaccepted.remove(messageIds);
-}
-
-Session SubscriptionImpl::getSession() const { return manager.getSession(); }
-
-SubscriptionManager& SubscriptionImpl::getSubscriptionManager() const { return manager; }
-
-void SubscriptionImpl::cancel() { manager.cancel(name); }
-
-void SubscriptionImpl::received(Message& m) {
- Mutex::ScopedLock l(lock);
- if (m.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED)
- unacquired.add(m.getId());
- else if (m.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
- unaccepted.add(m.getId());
-
- if (listener) {
- Mutex::ScopedUnlock u(lock);
- listener->received(m);
- }
-
- if (settings.completionMode == COMPLETE_ON_DELIVERY) {
- manager.getSession().markCompleted(m.getId(), false, false);
- }
- if (settings.autoAck) {
- if (unaccepted.size() >= settings.autoAck) {
- async(manager.getSession()).messageAccept(unaccepted);
- switch (settings.completionMode) {
- case COMPLETE_ON_ACCEPT:
- manager.getSession().markCompleted(unaccepted, true);
- break;
- case COMPLETE_ON_DELIVERY:
- manager.getSession().sendCompletion();
- break;
- default://do nothing
- break;
- }
- unaccepted.clear();
- }
- }
-}
-
-}} // namespace qpid::client
-