summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-19 23:04:04 +0000
committerAlan Conway <aconway@apache.org>2012-01-19 23:04:04 +0000
commitf87f955c057c15e8af6ae27c5b16f4a0c9c32ec9 (patch)
tree1fdbf98b832bd6a18e1b829ed3c13ae56d7fddf7
parent7305cf39aff572bca77d89c12cc6d403e20ab20d (diff)
downloadqpid-python-f87f955c057c15e8af6ae27c5b16f4a0c9c32ec9.tar.gz
QPID-3603: Move broker::ReplicatingSubscription to ha namespace and ha plugin.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233648 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am3
-rw-r--r--qpid/cpp/src/ha.mk2
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Consumer.h3
-rw-r--r--qpid/cpp/src/qpid/broker/ConsumerFactory.h70
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp41
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h9
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (renamed from qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp)7
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (renamed from qpid/cpp/src/qpid/broker/ReplicatingSubscription.h)37
11 files changed, 132 insertions, 54 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index ec8ff98a54..5caf54e2ba 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -540,6 +540,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Consumer.h \
qpid/broker/Credit.h \
qpid/broker/Credit.cpp \
+ qpid/broker/ConsumerFactory.h \
qpid/broker/Daemon.cpp \
qpid/broker/Daemon.h \
qpid/broker/Deliverable.h \
@@ -627,8 +628,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/QueuedMessage.h \
qpid/broker/QueueFlowLimit.h \
qpid/broker/QueueFlowLimit.cpp \
- qpid/broker/ReplicatingSubscription.h \
- qpid/broker/ReplicatingSubscription.cpp \
qpid/broker/RateFlowcontrol.h \
qpid/broker/RecoverableConfig.h \
qpid/broker/RecoverableExchange.h \
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index 3d465d235e..d367ba2101 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -31,6 +31,8 @@ ha_la_SOURCES = \
qpid/ha/Settings.h \
qpid/ha/QueueReplicator.h \
qpid/ha/QueueReplicator.cpp \
+ qpid/ha/ReplicatingSubscription.h \
+ qpid/ha/ReplicatingSubscription.cpp \
qpid/ha/WiringReplicator.cpp \
qpid/ha/WiringReplicator.h
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 840d47ac38..11cf81ea9e 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -37,6 +37,7 @@
#include "qpid/broker/Vhost.h"
#include "qpid/broker/System.h"
#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/ConsumerFactory.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
@@ -198,6 +199,7 @@ public:
bool inCluster, clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConnectionCounter connectionCounter;
+ ConsumerFactories consumerFactories;
public:
virtual ~Broker();
@@ -356,6 +358,8 @@ public:
const std::string& key,
const std::string& userId,
const std::string& connectionId);
+
+ ConsumerFactories& getConsumerFactories() { return consumerFactories; }
};
}}
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index 647f082e44..83f744c0fc 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -31,6 +31,9 @@ namespace broker {
class Queue;
class QueueListeners;
+/**
+ * Base class for consumers which represent a subscription to a queue.
+ */
class Consumer {
const bool acquires;
// inListeners allows QueueListeners to efficiently track if this instance is registered
diff --git a/qpid/cpp/src/qpid/broker/ConsumerFactory.h b/qpid/cpp/src/qpid/broker/ConsumerFactory.h
new file mode 100644
index 0000000000..abd39fb3f8
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ConsumerFactory.h
@@ -0,0 +1,70 @@
+#ifndef QPID_BROKER_CONSUMERFACTORY_H
+#define QPID_BROKER_CONSUMERFACTORY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2011-11-25: it's ugly exposing SemanticState::ConsumerImpl in public.
+// Refactor to use a more abstract interface.
+
+#include "qpid/broker/SemanticState.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Base class for consumer factoires. Plugins can register a
+ * ConsumerFactory via Broker:: getConsumerFactories() Each time a
+ * conumer is created, each factory is tried in turn till one returns
+ * non-0.
+ */
+class ConsumerFactory
+{
+ public:
+ virtual ~ConsumerFactory() {}
+
+ virtual boost::shared_ptr<SemanticState::ConsumerImpl> create(
+ SemanticState* parent,
+ const std::string& name, boost::shared_ptr<Queue> queue,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments) = 0;
+};
+
+/** A set of factories held by the broker
+ * THREAD UNSAFE: see notes on member functions.
+ */
+class ConsumerFactories {
+ public:
+ typedef std::vector<boost::shared_ptr<ConsumerFactory> > Factories;
+
+ /** Thread safety: May only be called during plug-in initialization. */
+ void add(const boost::shared_ptr<ConsumerFactory>& cf) { factories.push_back(cf); }
+
+ /** Thread safety: May only be called after plug-in initialization. */
+ const Factories& get() const { return factories; }
+
+ private:
+ Factories factories;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CONSUMERFACTORY_H*/
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index 43ca1ae04b..4fe76dabd8 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -37,7 +37,7 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
bool _acquired,
bool accepted,
bool _windowing,
- uint32_t _credit, bool _delayedCompletion) : msg(_msg),
+ uint32_t _credit, bool _isDelayedCompletion) : msg(_msg),
queue(_queue),
tag(_tag),
acquired(_acquired),
@@ -47,7 +47,7 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
ended(accepted && acquired),
windowing(_windowing),
credit(msg.payload ? msg.payload->getRequiredCredit() : _credit),
- delayedCompletion(_delayedCompletion)
+ isDelayedCompletion(_isDelayedCompletion)
{}
bool DeliveryRecord::setEnded()
@@ -115,7 +115,7 @@ bool DeliveryRecord::accept(TransactionContext* ctxt) {
if (!ended) {
if (acquired) {
queue->dequeue(ctxt, msg);
- } else if (delayedCompletion) {
+ } else if (isDelayedCompletion) {
//TODO: this is a nasty way to do this; change it
msg.payload->getIngressCompletion().finishCompleter();
QPID_LOG(debug, "Completed " << msg.payload.get());
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
index 90e72aaf0d..ea33ed5461 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
@@ -63,7 +63,7 @@ class DeliveryRecord
* after that).
*/
uint32_t credit;
- bool delayedCompletion;
+ bool isDelayedCompletion;
public:
QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg,
@@ -73,7 +73,7 @@ class DeliveryRecord
bool accepted,
bool windowing,
uint32_t credit=0, // Only used if msg is empty.
- bool delayedCompletion=false
+ bool isDelayedCompletion=false
);
bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); }
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 333b707308..de2b09660c 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -25,9 +25,7 @@
#include "qpid/broker/DtxAck.h"
#include "qpid/broker/DtxTimeout.h"
#include "qpid/broker/Message.h"
-#include "qpid/ha/WiringReplicator.h"
#include "qpid/broker/Queue.h"
-#include "qpid/broker/ReplicatingSubscription.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SessionOutputException.h"
#include "qpid/broker/TxAccept.h"
@@ -108,15 +106,25 @@ bool SemanticState::exists(const string& consumerTag){
namespace {
const std::string SEPARATOR("::");
}
-
+
void SemanticState::consume(const string& tag,
Queue::shared_ptr queue, bool ackRequired, bool acquire,
- bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
+ bool exclusive, const string& resumeId, uint64_t resumeTtl,
+ const FieldTable& arguments)
{
// "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination).
// Create a globally unique name so the broker can identify individual consumers
std::string name = session.getSessionId().str() + SEPARATOR + tag;
- ConsumerImpl::shared_ptr c(ConsumerImpl::create(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+ const ConsumerFactories::Factories& cf(
+ session.getBroker().getConsumerFactories().get());
+ ConsumerImpl::shared_ptr c;
+ for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != cf.end(); !c)
+ c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments);
+ if (!c) // Create plain consumer
+ c = ConsumerImpl::shared_ptr(
+ new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
}
@@ -266,26 +274,6 @@ void SemanticState::record(const DeliveryRecord& delivery)
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
-SemanticState::ConsumerImpl::shared_ptr SemanticState::ConsumerImpl::create(SemanticState* parent,
- const string& name,
- Queue::shared_ptr queue,
- bool ack,
- bool acquire,
- bool exclusive,
- const string& tag,
- const string& resumeId,
- uint64_t resumeTtl,
- const framing::FieldTable& arguments)
-{
- if (arguments.isSet("qpid.replicating-subscription")) {
- shared_ptr result(new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
- boost::dynamic_pointer_cast<ReplicatingSubscription>(result)->init();
- return result;
- } else {
- return shared_ptr(new ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
- }
-}
-
SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
const string& _name,
Queue::shared_ptr _queue,
@@ -297,7 +285,6 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
uint64_t _resumeTtl,
const framing::FieldTable& _arguments
-
) :
Consumer(_name, _acquire),
parent(_parent),
@@ -354,7 +341,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
assertClusterSafe();
allocateCredit(msg.payload);
- DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, credit.isWindowMode(), 0, dynamic_cast<const ReplicatingSubscription*>(this));
+ DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, credit.isWindowMode(), 0, isDelayedCompletion());
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index ec4bcb756c..9b81767fb6 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -153,11 +153,10 @@ class SemanticState : private boost::noncopyable {
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
- static shared_ptr create(SemanticState* parent,
- const std::string& name, boost::shared_ptr<Queue> queue,
- bool ack, bool acquire, bool exclusive, const std::string& tag,
- const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
-
+ /** This consumer wants delayed completion.
+ * Overridden by ConsumerImpl subclasses.
+ */
+ virtual bool isDelayedCompletion() const { return false; }
};
typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
diff --git a/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 4041e6ac7a..873d3784a7 100644
--- a/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -20,18 +20,19 @@
*/
#include "ReplicatingSubscription.h"
-#include "Queue.h"
+#include "qpid/broker/Queue.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
namespace qpid {
-namespace broker {
+namespace ha {
using namespace framing;
+using namespace broker;
const std::string DOLLAR("$");
-const std::string INTERNAL("_internall");
+const std::string INTERNAL("_internal");
class ReplicationStateInitialiser
{
diff --git a/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index a79c712e85..a1c10a7641 100644
--- a/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -23,35 +23,48 @@
*/
#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/QueueObserver.h"
namespace qpid {
+
namespace broker {
+class Message;
+class Queue;
+class QueuedMessage;
+class OwnershipToken;
+}
+
+namespace ha {
/**
* Subscriber to a remote queue that replicates to a local queue.
*/
-class ReplicatingSubscription : public SemanticState::ConsumerImpl, public QueueObserver
+class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
+ public broker::QueueObserver
{
public:
- ReplicatingSubscription(SemanticState* parent,
- const std::string& name, boost::shared_ptr<Queue> queue,
+ ReplicatingSubscription(broker::SemanticState* parent,
+ const std::string& name, boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const std::string& tag,
- const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
+ const std::string& resumeId, uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
~ReplicatingSubscription();
void init();
void cancel();
- bool deliver(QueuedMessage& msg);
- void enqueued(const QueuedMessage&);
- void dequeued(const QueuedMessage&);
- void acquired(const QueuedMessage&) {}
- void requeued(const QueuedMessage&) {}
+ bool deliver(broker::QueuedMessage& msg);
+ void enqueued(const broker::QueuedMessage&);
+ void dequeued(const broker::QueuedMessage&);
+ void acquired(const broker::QueuedMessage&) {}
+ void requeued(const broker::QueuedMessage&) {}
+
+ bool isDelayedCompletion() const { return true; }
protected:
bool doDispatch();
private:
- boost::shared_ptr<Queue> events;
- boost::shared_ptr<Consumer> consumer;
+ boost::shared_ptr<broker::Queue> events;
+ boost::shared_ptr<broker::Consumer> consumer;
qpid::framing::SequenceSet range;
void generateDequeueEvent();
@@ -60,7 +73,7 @@ class ReplicatingSubscription : public SemanticState::ConsumerImpl, public Queue
public:
DelegatingConsumer(ReplicatingSubscription&);
~DelegatingConsumer();
- bool deliver(QueuedMessage& msg);
+ bool deliver(broker::QueuedMessage& msg);
void notify();
bool filter(boost::intrusive_ptr<Message>);
bool accept(boost::intrusive_ptr<Message>);