diff options
author | Alan Conway <aconway@apache.org> | 2012-02-14 16:02:04 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-14 16:02:04 +0000 |
commit | fb6bbc399e8a1911d4ec3693509cf7ca571a582a (patch) | |
tree | f5c80e3991186e0b0f54b2b41f482a7e3f2aa756 | |
parent | a016c5c107cee09c9c7709b4d865c361e735ab36 (diff) | |
download | qpid-python-fb6bbc399e8a1911d4ec3693509cf7ca571a582a.tar.gz |
QPID-3603: Move broker::ReplicatingSubscription to ha namespace and ha plugin.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-6@1244036 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/Makefile.am | 3 | ||||
-rw-r--r-- | qpid/cpp/src/ha.mk | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Consumer.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConsumerFactory.h | 70 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 41 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (renamed from qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp) | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (renamed from qpid/cpp/src/qpid/broker/ReplicatingSubscription.h) | 37 |
11 files changed, 132 insertions, 54 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 12e45bf1f3..b11f67a0a5 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -542,6 +542,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Consumer.h \ qpid/broker/Credit.h \ qpid/broker/Credit.cpp \ + qpid/broker/ConsumerFactory.h \ qpid/broker/Daemon.cpp \ qpid/broker/Daemon.h \ qpid/broker/Deliverable.h \ @@ -629,8 +630,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/QueuedMessage.h \ qpid/broker/QueueFlowLimit.h \ qpid/broker/QueueFlowLimit.cpp \ - qpid/broker/ReplicatingSubscription.h \ - qpid/broker/ReplicatingSubscription.cpp \ qpid/broker/RateFlowcontrol.h \ qpid/broker/RecoverableConfig.h \ qpid/broker/RecoverableExchange.h \ diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index 3d465d235e..d367ba2101 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -31,6 +31,8 @@ ha_la_SOURCES = \ qpid/ha/Settings.h \ qpid/ha/QueueReplicator.h \ qpid/ha/QueueReplicator.cpp \ + qpid/ha/ReplicatingSubscription.h \ + qpid/ha/ReplicatingSubscription.cpp \ qpid/ha/WiringReplicator.cpp \ qpid/ha/WiringReplicator.h diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 840d47ac38..11cf81ea9e 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -37,6 +37,7 @@ #include "qpid/broker/Vhost.h" #include "qpid/broker/System.h" #include "qpid/broker/ExpiryPolicy.h" +#include "qpid/broker/ConsumerFactory.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/broker/Broker.h" @@ -198,6 +199,7 @@ public: bool inCluster, clusterUpdatee; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; ConnectionCounter connectionCounter; + ConsumerFactories consumerFactories; public: virtual ~Broker(); @@ -356,6 +358,8 @@ public: const std::string& key, const std::string& userId, const std::string& connectionId); + + ConsumerFactories& getConsumerFactories() { return consumerFactories; } }; }} diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index 647f082e44..83f744c0fc 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -31,6 +31,9 @@ namespace broker { class Queue; class QueueListeners; +/** + * Base class for consumers which represent a subscription to a queue. + */ class Consumer { const bool acquires; // inListeners allows QueueListeners to efficiently track if this instance is registered diff --git a/qpid/cpp/src/qpid/broker/ConsumerFactory.h b/qpid/cpp/src/qpid/broker/ConsumerFactory.h new file mode 100644 index 0000000000..abd39fb3f8 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/ConsumerFactory.h @@ -0,0 +1,70 @@ +#ifndef QPID_BROKER_CONSUMERFACTORY_H +#define QPID_BROKER_CONSUMERFACTORY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// TODO aconway 2011-11-25: it's ugly exposing SemanticState::ConsumerImpl in public. +// Refactor to use a more abstract interface. + +#include "qpid/broker/SemanticState.h" + +namespace qpid { +namespace broker { + +/** + * Base class for consumer factoires. Plugins can register a + * ConsumerFactory via Broker:: getConsumerFactories() Each time a + * conumer is created, each factory is tried in turn till one returns + * non-0. + */ +class ConsumerFactory +{ + public: + virtual ~ConsumerFactory() {} + + virtual boost::shared_ptr<SemanticState::ConsumerImpl> create( + SemanticState* parent, + const std::string& name, boost::shared_ptr<Queue> queue, + bool ack, bool acquire, bool exclusive, const std::string& tag, + const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments) = 0; +}; + +/** A set of factories held by the broker + * THREAD UNSAFE: see notes on member functions. + */ +class ConsumerFactories { + public: + typedef std::vector<boost::shared_ptr<ConsumerFactory> > Factories; + + /** Thread safety: May only be called during plug-in initialization. */ + void add(const boost::shared_ptr<ConsumerFactory>& cf) { factories.push_back(cf); } + + /** Thread safety: May only be called after plug-in initialization. */ + const Factories& get() const { return factories; } + + private: + Factories factories; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_CONSUMERFACTORY_H*/ diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index a153ccbba6..47ddead0f8 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -37,7 +37,7 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, bool _acquired, bool accepted, bool _windowing, - uint32_t _credit, bool _delayedCompletion) : msg(_msg), + uint32_t _credit, bool _isDelayedCompletion) : msg(_msg), queue(_queue), tag(_tag), acquired(_acquired), @@ -47,7 +47,7 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, ended(accepted && acquired), windowing(_windowing), credit(msg.payload ? msg.payload->getRequiredCredit() : _credit), - delayedCompletion(_delayedCompletion) + isDelayedCompletion(_isDelayedCompletion) {} bool DeliveryRecord::setEnded() @@ -115,7 +115,7 @@ bool DeliveryRecord::accept(TransactionContext* ctxt) { if (!ended) { if (acquired) { queue->dequeue(ctxt, msg); - } else if (delayedCompletion) { + } else if (isDelayedCompletion) { //TODO: this is a nasty way to do this; change it msg.payload->getIngressCompletion().finishCompleter(); QPID_LOG(debug, "Completed " << msg.payload.get()); diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h index 90e72aaf0d..ea33ed5461 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h @@ -63,7 +63,7 @@ class DeliveryRecord * after that). */ uint32_t credit; - bool delayedCompletion; + bool isDelayedCompletion; public: QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg, @@ -73,7 +73,7 @@ class DeliveryRecord bool accepted, bool windowing, uint32_t credit=0, // Only used if msg is empty. - bool delayedCompletion=false + bool isDelayedCompletion=false ); bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 333b707308..de2b09660c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -25,9 +25,7 @@ #include "qpid/broker/DtxAck.h" #include "qpid/broker/DtxTimeout.h" #include "qpid/broker/Message.h" -#include "qpid/ha/WiringReplicator.h" #include "qpid/broker/Queue.h" -#include "qpid/broker/ReplicatingSubscription.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/SessionOutputException.h" #include "qpid/broker/TxAccept.h" @@ -108,15 +106,25 @@ bool SemanticState::exists(const string& consumerTag){ namespace { const std::string SEPARATOR("::"); } - + void SemanticState::consume(const string& tag, Queue::shared_ptr queue, bool ackRequired, bool acquire, - bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) + bool exclusive, const string& resumeId, uint64_t resumeTtl, + const FieldTable& arguments) { // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination). // Create a globally unique name so the broker can identify individual consumers std::string name = session.getSessionId().str() + SEPARATOR + tag; - ConsumerImpl::shared_ptr c(ConsumerImpl::create(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); + const ConsumerFactories::Factories& cf( + session.getBroker().getConsumerFactories().get()); + ConsumerImpl::shared_ptr c; + for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != cf.end(); !c) + c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive, tag, + resumeId, resumeTtl, arguments); + if (!c) // Create plain consumer + c = ConsumerImpl::shared_ptr( + new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, + resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception consumers[tag] = c; } @@ -266,26 +274,6 @@ void SemanticState::record(const DeliveryRecord& delivery) const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -SemanticState::ConsumerImpl::shared_ptr SemanticState::ConsumerImpl::create(SemanticState* parent, - const string& name, - Queue::shared_ptr queue, - bool ack, - bool acquire, - bool exclusive, - const string& tag, - const string& resumeId, - uint64_t resumeTtl, - const framing::FieldTable& arguments) -{ - if (arguments.isSet("qpid.replicating-subscription")) { - shared_ptr result(new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); - boost::dynamic_pointer_cast<ReplicatingSubscription>(result)->init(); - return result; - } else { - return shared_ptr(new ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); - } -} - SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, const string& _name, Queue::shared_ptr _queue, @@ -297,7 +285,6 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, uint64_t _resumeTtl, const framing::FieldTable& _arguments - ) : Consumer(_name, _acquire), parent(_parent), @@ -354,7 +341,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { assertClusterSafe(); allocateCredit(msg.payload); - DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, credit.isWindowMode(), 0, dynamic_cast<const ReplicatingSubscription*>(this)); + DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, credit.isWindowMode(), 0, isDelayedCompletion()); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset parent->deliver(record, sync); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index ec4bcb756c..9b81767fb6 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -153,11 +153,10 @@ class SemanticState : private boost::noncopyable { management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); - static shared_ptr create(SemanticState* parent, - const std::string& name, boost::shared_ptr<Queue> queue, - bool ack, bool acquire, bool exclusive, const std::string& tag, - const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); - + /** This consumer wants delayed completion. + * Overridden by ConsumerImpl subclasses. + */ + virtual bool isDelayedCompletion() const { return false; } }; typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; diff --git a/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 4041e6ac7a..873d3784a7 100644 --- a/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -20,18 +20,19 @@ */ #include "ReplicatingSubscription.h" -#include "Queue.h" +#include "qpid/broker/Queue.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" namespace qpid { -namespace broker { +namespace ha { using namespace framing; +using namespace broker; const std::string DOLLAR("$"); -const std::string INTERNAL("_internall"); +const std::string INTERNAL("_internal"); class ReplicationStateInitialiser { diff --git a/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index a79c712e85..a1c10a7641 100644 --- a/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -23,35 +23,48 @@ */ #include "qpid/broker/SemanticState.h" +#include "qpid/broker/QueueObserver.h" namespace qpid { + namespace broker { +class Message; +class Queue; +class QueuedMessage; +class OwnershipToken; +} + +namespace ha { /** * Subscriber to a remote queue that replicates to a local queue. */ -class ReplicatingSubscription : public SemanticState::ConsumerImpl, public QueueObserver +class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, + public broker::QueueObserver { public: - ReplicatingSubscription(SemanticState* parent, - const std::string& name, boost::shared_ptr<Queue> queue, + ReplicatingSubscription(broker::SemanticState* parent, + const std::string& name, boost::shared_ptr<broker::Queue> , bool ack, bool acquire, bool exclusive, const std::string& tag, - const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); + const std::string& resumeId, uint64_t resumeTtl, + const framing::FieldTable& arguments); ~ReplicatingSubscription(); void init(); void cancel(); - bool deliver(QueuedMessage& msg); - void enqueued(const QueuedMessage&); - void dequeued(const QueuedMessage&); - void acquired(const QueuedMessage&) {} - void requeued(const QueuedMessage&) {} + bool deliver(broker::QueuedMessage& msg); + void enqueued(const broker::QueuedMessage&); + void dequeued(const broker::QueuedMessage&); + void acquired(const broker::QueuedMessage&) {} + void requeued(const broker::QueuedMessage&) {} + + bool isDelayedCompletion() const { return true; } protected: bool doDispatch(); private: - boost::shared_ptr<Queue> events; - boost::shared_ptr<Consumer> consumer; + boost::shared_ptr<broker::Queue> events; + boost::shared_ptr<broker::Consumer> consumer; qpid::framing::SequenceSet range; void generateDequeueEvent(); @@ -60,7 +73,7 @@ class ReplicatingSubscription : public SemanticState::ConsumerImpl, public Queue public: DelegatingConsumer(ReplicatingSubscription&); ~DelegatingConsumer(); - bool deliver(QueuedMessage& msg); + bool deliver(broker::QueuedMessage& msg); void notify(); bool filter(boost::intrusive_ptr<Message>); bool accept(boost::intrusive_ptr<Message>); |