summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2016-07-05 21:55:35 +0000
committerRobert Gemmell <robbie@apache.org>2016-07-05 21:55:35 +0000
commitf160cb6566c17945f7ebc4f3a752b2cc6a051685 (patch)
tree809f04fc1967c22e5abc52de07602555bed0e920 /qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
parentebb276cca41582b73223b55eff9f2d4386f4f746 (diff)
downloadqpid-python-f160cb6566c17945f7ebc4f3a752b2cc6a051685.tar.gz
QPID-7207: remove cpp and python subdirs from svn trunk, they have migrated to their own git repositories
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1751566 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp373
1 files changed, 0 insertions, 373 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
deleted file mode 100644
index abd96a61e9..0000000000
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/amqp/DataReader.h"
-#include "qpid/broker/amqp/Outgoing.h"
-#include "qpid/broker/amqp/Exception.h"
-#include "qpid/broker/amqp/Header.h"
-#include "qpid/broker/amqp/Session.h"
-#include "qpid/broker/amqp/Translation.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/Selector.h"
-#include "qpid/broker/TopicKeyNode.h"
-#include "qpid/sys/OutputControl.h"
-#include "qpid/amqp/descriptors.h"
-#include "qpid/amqp/Descriptor.h"
-#include "qpid/amqp/MessageEncoder.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/log/Statement.h"
-#include "config.h"
-
-namespace qpid {
-namespace broker {
-namespace amqp {
-
-Outgoing::Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name)
- : ManagedOutgoingLink(broker, parent, source, target, name), session(parent) {}
-
-void Outgoing::wakeup()
-{
- session.wakeup();
-}
-
-namespace {
-bool requested_reliable(pn_link_t* link)
-{
- return pn_link_remote_snd_settle_mode(link) == PN_SND_UNSETTLED;
-}
-bool requested_unreliable(pn_link_t* link)
-{
- return pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED;
-}
-}
-
-OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session,
- qpid::sys::OutputControl& o, SubscriptionType type, bool e, bool p)
- : Outgoing(broker, session, source, target, pn_link_name(l)),
- Consumer(pn_link_name(l), type, target),
- exclusive(e),
- isControllingUser(p),
- queue(q), deliveries(5000), link(l), out(o),
- current(0),
- buffer(1024)/*used only for header at present*/,
- //for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested
- unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)),
- cancelled(false),
- trackingUndeliverableMessages(false)
-{
- for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
- deliveries[i].init(i);
- }
- if (isControllingUser) queue->markInUse(true);
-}
-
-void OutgoingFromQueue::init()
-{
- queue->consume(shared_from_this(), exclusive);//may throw exception
-}
-
-bool OutgoingFromQueue::doWork()
-{
- QPID_LOG(trace, "Dispatching to " << getName() << ": " << pn_link_credit(link));
- if (canDeliver()) {
- try{
- if (queue->dispatch(shared_from_this())) {
- return true;
- } else {
- pn_link_drained(link);
- QPID_LOG(trace, "No message available on " << queue->getName());
- }
- } catch (const qpid::framing::ResourceDeletedException& e) {
- throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, e.what());
- }
- } else {
- QPID_LOG(trace, "Can't deliver to " << getName() << " from " << queue->getName() << ": " << pn_link_credit(link));
- }
- return false;
-}
-
-void OutgoingFromQueue::write(const char* data, size_t size)
-{
- pn_link_send(link, data, size);
-}
-
-void OutgoingFromQueue::mergeMessageAnnotationsIfRequired(const Record &r)
-{
- pn_data_t *remoteAnnotationsRaw =
- pn_disposition_annotations(pn_delivery_remote(r.delivery));
- if (remoteAnnotationsRaw == 0) {
- return;
- }
-
- qpid::types::Variant::Map remoteMessageAnnotations;
- DataReader::read(remoteAnnotationsRaw, remoteMessageAnnotations);
- queue->mergeMessageAnnotations(r.cursor, remoteMessageAnnotations);
-}
-
-void OutgoingFromQueue::handle(pn_delivery_t* delivery)
-{
- size_t i = Record::getIndex(pn_delivery_tag(delivery));
- Record& r = deliveries[i];
- if (r.delivery && pn_delivery_updated(delivery)) {
- assert(r.delivery == delivery);
- r.disposition = pn_delivery_remote_state(delivery);
-
- std::pair<TxBuffer*,uint64_t> txn = session.getTransactionalState(delivery);
- if (txn.first) {
- r.disposition = txn.second;
- }
-
- if (!r.disposition && pn_delivery_settled(delivery)) {
- //if peer has settled without setting state, assume accepted
- r.disposition = PN_ACCEPTED;
- }
- if (r.disposition) {
- switch (r.disposition) {
- case PN_ACCEPTED:
- if (preAcquires()) queue->dequeue(r.cursor, txn.first);
- outgoingMessageAccepted();
- break;
- case PN_REJECTED:
- if (preAcquires()) queue->reject(r.cursor);
- outgoingMessageRejected();
- break;
- case PN_RELEASED:
- if (preAcquires()) queue->release(r.cursor, false);//for PN_RELEASED, delivery count should not be incremented
- outgoingMessageRejected();//TODO: not quite true...
- break;
- case PN_MODIFIED:
- if (preAcquires()) {
- mergeMessageAnnotationsIfRequired(r);
- if (pn_disposition_is_undeliverable(pn_delivery_remote(delivery))) {
- if (!trackingUndeliverableMessages) {
- // observe queue for changes to track undeliverable messages
- queue->getObservers().add(
- boost::dynamic_pointer_cast<OutgoingFromQueue>(shared_from_this()));
- trackingUndeliverableMessages = true;
- }
-
- undeliverableMessages.add(r.msg.getSequence());
- }
-
- queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery)));
- }
- outgoingMessageRejected();//TODO: not quite true...
- break;
- default:
- QPID_LOG(warning, "Unhandled disposition: " << r.disposition);
- }
- //TODO: only settle once any dequeue on store has completed
- pn_delivery_settle(delivery);
- r.reset();
- }
- }
-}
-
-bool OutgoingFromQueue::canDeliver()
-{
- return deliveries[current].delivery == 0 && pn_link_credit(link);
-}
-
-void OutgoingFromQueue::detached(bool closed)
-{
- QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName());
-
- if (trackingUndeliverableMessages) {
- // stop observation of the queue
- queue->getObservers().remove(
- boost::dynamic_pointer_cast<OutgoingFromQueue>(shared_from_this()));
- }
-
- queue->cancel(shared_from_this());
- //TODO: release in a clearer order?
- for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
- if (deliveries[i].msg) queue->release(deliveries[i].cursor, true);
- }
- if (exclusive) {
- queue->releaseExclusiveOwnership(closed);
- } else if (isControllingUser) {
- queue->releaseFromUse(true);
- }
- cancelled = true;
-}
-
-OutgoingFromQueue::~OutgoingFromQueue()
-{
- if (!cancelled && isControllingUser) queue->releaseFromUse(true);
-}
-
-//Consumer interface:
-bool OutgoingFromQueue::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg)
-{
- Record& r = deliveries[current++];
- if (current >= deliveries.capacity()) current = 0;
- r.cursor = cursor;
- r.msg = msg;
- r.delivery = pn_delivery(link, r.tag);
- //write header
- qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
- encoder.writeHeader(Header(r.msg));
- write(&buffer[0], encoder.getPosition());
- Translation t(r.msg);
- t.write(*this);
- if (pn_link_advance(link)) {
- if (unreliable) pn_delivery_settle(r.delivery);
- outgoingMessageSent();
- QPID_LOG(debug, "Sent message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
- } else {
- QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
- }
- if (unreliable) {
- if (preAcquires()) queue->dequeue(0, r.cursor);
- r.reset();
- }
- QPID_LOG(debug, "Requested delivery of " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
- return true;
-}
-
-void OutgoingFromQueue::notify()
-{
- QPID_LOG(trace, "Notification received for " << queue->getName());
- out.activateOutput();
-}
-
-bool OutgoingFromQueue::accept(const qpid::broker::Message&)
-{
- return true;
-}
-
-void OutgoingFromQueue::setSubjectFilter(const std::string& f)
-{
- subjectFilter = f;
-}
-
-void OutgoingFromQueue::setSelectorFilter(const std::string& f)
-{
- selector.reset(new Selector(f));
-}
-
-namespace {
-
-bool match(TokenIterator& filter, TokenIterator& target)
-{
- bool wild = false;
- while (!filter.finished())
- {
- if (filter.match1('*')) {
- if (target.finished()) return false;
- //else move to next word in filter target
- filter.next();
- target.next();
- } else if (filter.match1('#')) {
- // i.e. filter word is '#' which can match a variable number of words in the target
- filter.next();
- if (filter.finished()) return true;
- else if (target.finished()) return false;
- wild = true;
- } else {
- //filter word needs to match target exactly
- if (target.finished()) return false;
- std::string word;
- target.pop(word);
- if (filter.match(word)) {
- wild = false;
- filter.next();
- } else if (!wild) {
- return false;
- }
- }
- }
- return target.finished();
-}
-bool match(const std::string& filter, const std::string& target)
-{
- TokenIterator lhs(filter);
- TokenIterator rhs(target);
- return match(lhs, rhs);
-}
-}
-
-bool OutgoingFromQueue::filter(const qpid::broker::Message& m)
-{
- if (undeliverableMessages.contains(m.getSequence())) return false;
- return (subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey()))
- && (!selector || selector->filter(m));
-}
-
-void OutgoingFromQueue::cancel() {}
-
-void OutgoingFromQueue::acknowledged(const qpid::broker::DeliveryRecord&) {}
-
-qpid::broker::OwnershipToken* OutgoingFromQueue::getSession()
-{
- return 0;
-}
-
-OutgoingFromQueue::Record::Record() : delivery(0), disposition(0), index(0)
-{
-#ifdef NO_PROTON_DELIVERY_TAG_T
- tag.start = tagData;
-#else
- tag.bytes = tagData;
-#endif
- tag.size = TAG_WIDTH;
-}
-void OutgoingFromQueue::Record::init(size_t i)
-{
- index = i;
- qpid::framing::Buffer buffer(tagData, tag.size);
- assert(index <= std::numeric_limits<uint32_t>::max());
- buffer.putLong(index);
-}
-void OutgoingFromQueue::Record::reset()
-{
- cursor = QueueCursor();
- msg = qpid::broker::Message();
- delivery = 0;
- disposition = 0;
-}
-
-size_t OutgoingFromQueue::Record::getIndex(pn_delivery_tag_t t)
-{
- assert(t.size == TAG_WIDTH);
-#ifdef NO_PROTON_DELIVERY_TAG_T
- qpid::framing::Buffer buffer(const_cast<char*>(t.start)/*won't ever be written to*/, t.size);
-#else
- qpid::framing::Buffer buffer(const_cast<char*>(t.bytes)/*won't ever be written to*/, t.size);
-#endif
- return (size_t) buffer.getLong();
-}
-
-boost::shared_ptr<Queue> OutgoingFromQueue::getExclusiveSubscriptionQueue(Outgoing* o)
-{
- OutgoingFromQueue* s = dynamic_cast<OutgoingFromQueue*>(o);
- if (s && s->exclusive) return s->queue;
- else return boost::shared_ptr<Queue>();
-}
-
-void OutgoingFromQueue::dequeued(const qpid::broker::Message &m)
-{
- if (undeliverableMessages.contains(m.getSequence())) {
- undeliverableMessages.remove(m.getSequence());
- }
-}
-
-}}} // namespace qpid::broker::amqp