diff options
author | Robert Gemmell <robbie@apache.org> | 2016-07-05 21:55:35 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2016-07-05 21:55:35 +0000 |
commit | f160cb6566c17945f7ebc4f3a752b2cc6a051685 (patch) | |
tree | 809f04fc1967c22e5abc52de07602555bed0e920 /qpid/cpp/src/qpid/broker/amqp/Outgoing.h | |
parent | ebb276cca41582b73223b55eff9f2d4386f4f746 (diff) | |
download | qpid-python-f160cb6566c17945f7ebc4f3a752b2cc6a051685.tar.gz |
QPID-7207: remove cpp and python subdirs from svn trunk, they have migrated to their own git repositories
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1751566 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Outgoing.h')
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 175 |
1 files changed, 0 insertions, 175 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h deleted file mode 100644 index f4ca4691b3..0000000000 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ /dev/null @@ -1,175 +0,0 @@ -#ifndef QPID_BROKER_AMQP1_OUTGOING_H -#define QPID_BROKER_AMQP1_OUTGOING_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/amqp/Message.h" -#include "qpid/broker/amqp/ManagedOutgoingLink.h" -#include "qpid/broker/Consumer.h" -#include "qpid/broker/QueueObserver.h" - -#include <boost/shared_ptr.hpp> -#include <boost/scoped_ptr.hpp> -#include <boost/enable_shared_from_this.hpp> -extern "C" { -#include <proton/engine.h> -} - -namespace qpid { -namespace sys { -class OutputControl; -} - -namespace framing { -class SequenceSet; -} - -namespace broker { -class Broker; -class Queue; -class Selector; - -namespace amqp { -class Session; - -template <class T> -class CircularArray -{ - public: - CircularArray(size_t l) : limit(l), data(new T[limit]) {} - T& operator[](size_t i) { return data[i]; } - size_t capacity() { return limit; } - ~CircularArray() { delete [] data; } - private: - const size_t limit; - T* const data; - size_t next; -}; - -class Outgoing : public ManagedOutgoingLink -{ - public: - Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name); - virtual void setSubjectFilter(const std::string&) = 0; - virtual void setSelectorFilter(const std::string&) = 0; - virtual void init() = 0; - /** - * Allows the link to initiate any outgoing transfers - */ - virtual bool doWork() = 0; - /** - * Signals that this link has been detached - */ - virtual void detached(bool closed) = 0; - /** - * Called when a delivery is writable - */ - virtual void handle(pn_delivery_t* delivery) = 0; - - void wakeup(); - virtual ~Outgoing() {} - protected: - Session& session; -}; - -/** - * Logic for handling an outgoing link from a queue (even if it is a - * subscription pseduo-queue created by the broker) - */ -class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, - public boost::enable_shared_from_this<OutgoingFromQueue>, - public qpid::broker::QueueObserver -{ - public: - OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, - qpid::sys::OutputControl& o, SubscriptionType type, bool exclusive, bool isControllingUser); - ~OutgoingFromQueue(); - void setSubjectFilter(const std::string&); - void setSelectorFilter(const std::string&); - void init(); - bool doWork(); - void write(const char* data, size_t size); - void handle(pn_delivery_t* delivery); - bool canDeliver(); - void detached(bool closed); - - // Consumer interface: - bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg); - void notify(); - bool accept(const qpid::broker::Message&); - bool filter(const qpid::broker::Message&); - void cancel(); - void acknowledged(const qpid::broker::DeliveryRecord&); - qpid::broker::OwnershipToken* getSession(); - static boost::shared_ptr<Queue> getExclusiveSubscriptionQueue(Outgoing*); - - // QueueObserver interface - virtual void enqueued(const qpid::broker::Message&) {}; - virtual void acquired(const qpid::broker::Message&) {}; - virtual void requeued(const qpid::broker::Message&) {}; - virtual void dequeued(const qpid::broker::Message&); - virtual void consumerAdded(const qpid::broker::Consumer&) {}; - virtual void consumerRemoved(const qpid::broker::Consumer&) {}; - - private: - - struct Record - { - QueueCursor cursor; - qpid::broker::Message msg; - pn_delivery_t* delivery; - int disposition; - size_t index; - pn_delivery_tag_t tag; - //The delivery tag is a 4 byte value representing the - //index. It is encoded separately to avoid alignment issues. - //The number of deliveries held here is always strictly - //bounded, so 4 bytes is more than enough. - static const size_t TAG_WIDTH = sizeof(uint32_t); - char tagData[TAG_WIDTH]; - - Record(); - void init(size_t i); - void reset(); - static size_t getIndex(pn_delivery_tag_t); - }; - - void mergeMessageAnnotationsIfRequired(const Record &r); - - const bool exclusive; - const bool isControllingUser; - boost::shared_ptr<Queue> queue; - CircularArray<Record> deliveries; - pn_link_t* link; - qpid::sys::OutputControl& out; - size_t current; - std::vector<char> buffer; - std::string subjectFilter; - boost::scoped_ptr<Selector> selector; - bool unreliable; - bool cancelled; - - bool trackingUndeliverableMessages; - qpid::framing::SequenceSet undeliverableMessages; -}; -}}} // namespace qpid::broker::amqp - -#endif /*!QPID_BROKER_AMQP1_OUTGOING_H*/ |