#ifndef QPID_BROKER_AMQP1_OUTGOING_H #define QPID_BROKER_AMQP1_OUTGOING_H /* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/broker/amqp/Message.h" #include "qpid/broker/amqp/ManagedOutgoingLink.h" #include "qpid/broker/Consumer.h" #include "qpid/broker/QueueObserver.h" #include #include #include extern "C" { #include } namespace qpid { namespace sys { class OutputControl; } namespace framing { class SequenceSet; } namespace broker { class Broker; class Queue; class Selector; namespace amqp { class Session; template class CircularArray { public: CircularArray(size_t l) : limit(l), data(new T[limit]) {} T& operator[](size_t i) { return data[i]; } size_t capacity() { return limit; } ~CircularArray() { delete [] data; } private: const size_t limit; T* const data; size_t next; }; class Outgoing : public ManagedOutgoingLink { public: Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name); virtual void setSubjectFilter(const std::string&) = 0; virtual void setSelectorFilter(const std::string&) = 0; virtual void init() = 0; /** * Allows the link to initiate any outgoing transfers */ virtual bool doWork() = 0; /** * Signals that this link has been detached */ virtual void detached(bool closed) = 0; /** * Called when a delivery is writable */ virtual void handle(pn_delivery_t* delivery) = 0; void wakeup(); virtual ~Outgoing() {} protected: Session& session; }; /** * Logic for handling an outgoing link from a queue (even if it is a * subscription pseduo-queue created by the broker) */ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this, public qpid::broker::QueueObserver { public: OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, SubscriptionType type, bool exclusive, bool isControllingUser); ~OutgoingFromQueue(); void setSubjectFilter(const std::string&); void setSelectorFilter(const std::string&); void init(); bool doWork(); void write(const char* data, size_t size); void handle(pn_delivery_t* delivery); bool canDeliver(); void detached(bool closed); // Consumer interface: bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg); void notify(); bool accept(const qpid::broker::Message&); bool filter(const qpid::broker::Message&); void cancel(); void acknowledged(const qpid::broker::DeliveryRecord&); qpid::broker::OwnershipToken* getSession(); static boost::shared_ptr getExclusiveSubscriptionQueue(Outgoing*); // QueueObserver interface virtual void enqueued(const qpid::broker::Message&) {}; virtual void acquired(const qpid::broker::Message&) {}; virtual void requeued(const qpid::broker::Message&) {}; virtual void dequeued(const qpid::broker::Message&); virtual void consumerAdded(const qpid::broker::Consumer&) {}; virtual void consumerRemoved(const qpid::broker::Consumer&) {}; private: struct Record { QueueCursor cursor; qpid::broker::Message msg; pn_delivery_t* delivery; int disposition; size_t index; pn_delivery_tag_t tag; //The delivery tag is a 4 byte value representing the //index. It is encoded separately to avoid alignment issues. //The number of deliveries held here is always strictly //bounded, so 4 bytes is more than enough. static const size_t TAG_WIDTH = sizeof(uint32_t); char tagData[TAG_WIDTH]; Record(); void init(size_t i); void reset(); static size_t getIndex(pn_delivery_tag_t); }; const bool exclusive; const bool isControllingUser; boost::shared_ptr queue; CircularArray deliveries; pn_link_t* link; qpid::sys::OutputControl& out; size_t current; std::vector buffer; std::string subjectFilter; boost::scoped_ptr selector; bool unreliable; bool cancelled; bool trackingUndeliverableMessages; qpid::framing::SequenceSet undeliverableMessages; }; }}} // namespace qpid::broker::amqp #endif /*!QPID_BROKER_AMQP1_OUTGOING_H*/