summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-14 15:59:26 +0000
committerAlan Conway <aconway@apache.org>2012-02-14 15:59:26 +0000
commit36e6a8b0a5bf229d07edfc7cd207ed168a03cdd4 (patch)
treef47d2b10c5e7b44b0accbfb298c81110e704c3fd
parent06d591b82d902829e4b2d9ccc22788c41ba85cf3 (diff)
downloadqpid-python-36e6a8b0a5bf229d07edfc7cd207ed168a03cdd4.tar.gz
QPID-3603: Move class ReplicatingSubscription into its own files.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-6@1244021 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp214
-rw-r--r--qpid/cpp/src/qpid/broker/ReplicatingSubscription.h77
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp230
4 files changed, 294 insertions, 229 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 825d4374c2..dcffb913d4 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -633,6 +633,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/QueueFlowLimit.cpp \
qpid/broker/QueueReplicator.h \
qpid/broker/QueueReplicator.cpp \
+ qpid/broker/ReplicatingSubscription.h \
+ qpid/broker/ReplicatingSubscription.cpp \
qpid/broker/RateFlowcontrol.h \
qpid/broker/RecoverableConfig.h \
qpid/broker/RecoverableExchange.h \
diff --git a/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp
new file mode 100644
index 0000000000..4041e6ac7a
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp
@@ -0,0 +1,214 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReplicatingSubscription.h"
+#include "Queue.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace broker {
+
+using namespace framing;
+
+const std::string DOLLAR("$");
+const std::string INTERNAL("_internall");
+
+class ReplicationStateInitialiser
+{
+ public:
+ ReplicationStateInitialiser(
+ qpid::framing::SequenceSet& r,
+ const qpid::framing::SequenceNumber& s,
+ const qpid::framing::SequenceNumber& e) : results(r), start(s), end(e)
+ {
+ results.add(start, end);
+ }
+
+ void operator()(const QueuedMessage& message) {
+ if (message.position < start) {
+ //replica does not have a message that should still be on the queue
+ QPID_LOG(warning, "Replica appears to be missing message at " << message.position);
+ } else if (message.position >= start && message.position <= end) {
+ //i.e. message is within the intial range and has not been dequeued, so remove it from the results
+ results.remove(message.position);
+ } //else message has not been seen by replica yet so can be ignored here
+ }
+
+ private:
+ qpid::framing::SequenceSet& results;
+ const qpid::framing::SequenceNumber start;
+ const qpid::framing::SequenceNumber end;
+};
+
+std::string mask(const std::string& in)
+{
+ return DOLLAR + in + INTERNAL;
+}
+
+ReplicatingSubscription::ReplicatingSubscription(
+ SemanticState* _parent,
+ const std::string& _name,
+ Queue::shared_ptr _queue,
+ bool ack,
+ bool _acquire,
+ bool _exclusive,
+ const std::string& _tag,
+ const std::string& _resumeId,
+ uint64_t _resumeTtl,
+ const framing::FieldTable& _arguments
+) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments),
+ events(new Queue(mask(_name))),
+ consumer(new DelegatingConsumer(*this))
+{
+
+ if (_arguments.isSet("qpid.high_sequence_number")) {
+ qpid::framing::SequenceNumber hwm = _arguments.getAsInt("qpid.high_sequence_number");
+ qpid::framing::SequenceNumber lwm;
+ if (_arguments.isSet("qpid.low_sequence_number")) {
+ lwm = _arguments.getAsInt("qpid.low_sequence_number");
+ } else {
+ lwm = hwm;
+ }
+ qpid::framing::SequenceNumber oldest;
+ if (_queue->getOldest(oldest)) {
+ if (oldest >= hwm) {
+ range.add(lwm, --oldest);
+ } else if (oldest >= lwm) {
+ ReplicationStateInitialiser initialiser(range, lwm, hwm);
+ _queue->eachMessage(initialiser);
+ } else { //i.e. have older message on master than is reported to exist on replica
+ QPID_LOG(warning, "Replica appears to be missing message on master");
+ }
+ } else {
+ //local queue (i.e. master) is empty
+ range.add(lwm, _queue->getPosition());
+ }
+ QPID_LOG(debug, "Initial set of dequeues for " << _queue->getName() << " are " << range
+ << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << _queue->getPosition() << ")");
+ //set position of 'cursor'
+ position = hwm;
+ }
+}
+
+bool ReplicatingSubscription::deliver(QueuedMessage& m)
+{
+ return ConsumerImpl::deliver(m);
+}
+
+void ReplicatingSubscription::init()
+{
+ getQueue()->addObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+}
+
+void ReplicatingSubscription::cancel()
+{
+ getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+}
+
+ReplicatingSubscription::~ReplicatingSubscription() {}
+
+//called before we get notified of the message being available and
+//under the message lock in the queue
+void ReplicatingSubscription::enqueued(const QueuedMessage& m)
+{
+ QPID_LOG(debug, "Enqueued message at " << m.position);
+ //delay completion
+ m.payload->getIngressCompletion().startCompleter();
+ QPID_LOG(debug, "Delayed " << m.payload.get());
+}
+
+void ReplicatingSubscription::generateDequeueEvent()
+{
+ std::string buf(range.encodedSize(),'\0');
+ framing::Buffer buffer(&buf[0], buf.size());
+ range.encode(buffer);
+ range.clear();
+ buffer.reset();
+
+ //generate event message
+ boost::intrusive_ptr<Message> event = new Message();
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), std::string(), 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ AMQFrame content((AMQContentBody()));
+ content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ event->getFrames().append(method);
+ event->getFrames().append(header);
+ event->getFrames().append(content);
+
+ DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ props->setRoutingKey("dequeue-event");
+
+ events->deliver(event);
+}
+
+//called after the message has been removed from the deque and under
+//the message lock in the queue
+void ReplicatingSubscription::dequeued(const QueuedMessage& m)
+{
+ {
+ sys::Mutex::ScopedLock l(lock);
+ range.add(m.position);
+ QPID_LOG(debug, "Updated dequeue event to include message at " << m.position << "; subscription is at " << position);
+ }
+ notify();
+ if (m.position > position) {
+ m.payload->getIngressCompletion().finishCompleter();
+ QPID_LOG(debug, "Completed " << m.payload.get() << " early due to dequeue");
+ }
+}
+
+bool ReplicatingSubscription::doDispatch()
+{
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (!range.empty()) {
+ generateDequeueEvent();
+ }
+ }
+ bool r1 = events->dispatch(consumer);
+ bool r2 = ConsumerImpl::doDispatch();
+ return r1 || r2;
+}
+
+ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
+ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
+bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m)
+{
+ return delegate.deliver(m);
+}
+void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
+bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
+bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
+void ReplicatingSubscription::DelegatingConsumer::cancel() {}
+OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
+
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h b/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h
new file mode 100644
index 0000000000..a79c712e85
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h
@@ -0,0 +1,77 @@
+#ifndef QPID_BROKER_REPLICATINGSUBSCRIPTION_H
+#define QPID_BROKER_REPLICATINGSUBSCRIPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/SemanticState.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Subscriber to a remote queue that replicates to a local queue.
+ */
+class ReplicatingSubscription : public SemanticState::ConsumerImpl, public QueueObserver
+{
+ public:
+ ReplicatingSubscription(SemanticState* parent,
+ const std::string& name, boost::shared_ptr<Queue> queue,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
+ ~ReplicatingSubscription();
+
+ void init();
+ void cancel();
+ bool deliver(QueuedMessage& msg);
+ void enqueued(const QueuedMessage&);
+ void dequeued(const QueuedMessage&);
+ void acquired(const QueuedMessage&) {}
+ void requeued(const QueuedMessage&) {}
+
+ protected:
+ bool doDispatch();
+ private:
+ boost::shared_ptr<Queue> events;
+ boost::shared_ptr<Consumer> consumer;
+ qpid::framing::SequenceSet range;
+
+ void generateDequeueEvent();
+ class DelegatingConsumer : public Consumer
+ {
+ public:
+ DelegatingConsumer(ReplicatingSubscription&);
+ ~DelegatingConsumer();
+ bool deliver(QueuedMessage& msg);
+ void notify();
+ bool filter(boost::intrusive_ptr<Message>);
+ bool accept(boost::intrusive_ptr<Message>);
+ void cancel();
+ OwnershipToken* getSession();
+ private:
+ ReplicatingSubscription& delegate;
+ };
+};
+
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_REPLICATINGSUBSCRIPTION_H*/
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 57a5e1879e..941096702c 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -28,6 +28,7 @@
#include "qpid/broker/NodeClone.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueReplicator.h"
+#include "qpid/broker/ReplicatingSubscription.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SessionOutputException.h"
#include "qpid/broker/TxAccept.h"
@@ -266,47 +267,6 @@ void SemanticState::record(const DeliveryRecord& delivery)
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
-class ReplicatingSubscription : public SemanticState::ConsumerImpl, public QueueObserver
-{
- public:
- ReplicatingSubscription(SemanticState* parent,
- const std::string& name, boost::shared_ptr<Queue> queue,
- bool ack, bool acquire, bool exclusive, const std::string& tag,
- const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
- ~ReplicatingSubscription();
-
- void init();
- void cancel();
- bool deliver(QueuedMessage& msg);
- void enqueued(const QueuedMessage&);
- void dequeued(const QueuedMessage&);
- void acquired(const QueuedMessage&) {}
- void requeued(const QueuedMessage&) {}
-
- protected:
- bool doDispatch();
- private:
- boost::shared_ptr<Queue> events;
- boost::shared_ptr<Consumer> consumer;
- qpid::framing::SequenceSet range;
-
- void generateDequeueEvent();
- class DelegatingConsumer : public Consumer
- {
- public:
- DelegatingConsumer(ReplicatingSubscription&);
- ~DelegatingConsumer();
- bool deliver(QueuedMessage& msg);
- void notify();
- bool filter(boost::intrusive_ptr<Message>);
- bool accept(boost::intrusive_ptr<Message>);
- void cancel() {}
- OwnershipToken* getSession();
- private:
- ReplicatingSubscription& delegate;
- };
-};
-
SemanticState::ConsumerImpl::shared_ptr SemanticState::ConsumerImpl::create(SemanticState* parent,
const string& name,
Queue::shared_ptr queue,
@@ -327,163 +287,6 @@ SemanticState::ConsumerImpl::shared_ptr SemanticState::ConsumerImpl::create(Sema
}
}
-std::string mask(const std::string& in)
-{
- return std::string("$") + in + std::string("_internal");
-}
-
-class ReplicationStateInitialiser
-{
- public:
- ReplicationStateInitialiser(qpid::framing::SequenceSet& results,
- const qpid::framing::SequenceNumber& start,
- const qpid::framing::SequenceNumber& end);
- void operator()(const QueuedMessage& m) { process(m); }
- private:
- qpid::framing::SequenceSet& results;
- const qpid::framing::SequenceNumber start;
- const qpid::framing::SequenceNumber end;
- void process(const QueuedMessage&);
-};
-
-ReplicatingSubscription::ReplicatingSubscription(SemanticState* _parent,
- const string& _name,
- Queue::shared_ptr _queue,
- bool ack,
- bool _acquire,
- bool _exclusive,
- const string& _tag,
- const string& _resumeId,
- uint64_t _resumeTtl,
- const framing::FieldTable& _arguments
-) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments),
- events(new Queue(mask(_name))),
- consumer(new DelegatingConsumer(*this))
-{
-
- if (_arguments.isSet("qpid.high_sequence_number")) {
- qpid::framing::SequenceNumber hwm = _arguments.getAsInt("qpid.high_sequence_number");
- qpid::framing::SequenceNumber lwm;
- if (_arguments.isSet("qpid.low_sequence_number")) {
- lwm = _arguments.getAsInt("qpid.low_sequence_number");
- } else {
- lwm = hwm;
- }
- qpid::framing::SequenceNumber oldest;
- if (_queue->getOldest(oldest)) {
- if (oldest >= hwm) {
- range.add(lwm, --oldest);
- } else if (oldest >= lwm) {
- ReplicationStateInitialiser initialiser(range, lwm, hwm);
- _queue->eachMessage(initialiser);
- } else { //i.e. have older message on master than is reported to exist on replica
- QPID_LOG(warning, "Replica appears to be missing message on master");
- }
- } else {
- //local queue (i.e. master) is empty
- range.add(lwm, _queue->getPosition());
- }
- QPID_LOG(debug, "Initial set of dequeues for " << _queue->getName() << " are " << range
- << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << _queue->getPosition() << ")");
- //set position of 'cursor'
- position = hwm;
- }
-}
-
-bool ReplicatingSubscription::deliver(QueuedMessage& m)
-{
- return ConsumerImpl::deliver(m);
-}
-
-void ReplicatingSubscription::init()
-{
- getQueue()->addObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
-}
-
-void ReplicatingSubscription::cancel()
-{
- getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
-}
-
-ReplicatingSubscription::~ReplicatingSubscription() {}
-
-//called before we get notified of the message being available and
-//under the message lock in the queue
-void ReplicatingSubscription::enqueued(const QueuedMessage& m)
-{
- QPID_LOG(debug, "Enqueued message at " << m.position);
- //delay completion
- m.payload->getIngressCompletion().startCompleter();
- QPID_LOG(debug, "Delayed " << m.payload.get());
-}
-
-class Buffer : public qpid::framing::Buffer
-{
- public:
- Buffer(size_t size) : qpid::framing::Buffer(new char[size], size) {}
- ~Buffer() { delete[] getPointer(); }
-};
-
-void ReplicatingSubscription::generateDequeueEvent()
-{
- Buffer buffer(range.encodedSize());
- range.encode(buffer);
- range.clear();
- buffer.reset();
-
- //generate event message
- boost::intrusive_ptr<Message> event = new Message();
- AMQFrame method((MessageTransferBody(ProtocolVersion(), std::string(), 0, 0)));
- AMQFrame header((AMQHeaderBody()));
- AMQFrame content((AMQContentBody()));
- content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
- header.setBof(false);
- header.setEof(false);
- header.setBos(true);
- header.setEos(true);
- content.setBof(false);
- content.setEof(true);
- content.setBos(true);
- content.setEos(true);
- event->getFrames().append(method);
- event->getFrames().append(header);
- event->getFrames().append(content);
-
- DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
- props->setRoutingKey("dequeue-event");
-
- events->deliver(event);
-}
-
-//called after the message has been removed from the deque and under
-//the message lock in the queue
-void ReplicatingSubscription::dequeued(const QueuedMessage& m)
-{
- {
- Mutex::ScopedLock l(lock);
- range.add(m.position);
- QPID_LOG(debug, "Updated dequeue event to include message at " << m.position << "; subscription is at " << position);
- }
- notify();
- if (m.position > position) {
- m.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(debug, "Completed " << m.payload.get() << " early due to dequeue");
- }
-}
-
-bool ReplicatingSubscription::doDispatch()
-{
- {
- Mutex::ScopedLock l(lock);
- if (!range.empty()) {
- generateDequeueEvent();
- }
- }
- bool r1 = events->dispatch(consumer);
- bool r2 = ConsumerImpl::doDispatch();
- return r1 || r2;
-}
-
SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
const string& _name,
Queue::shared_ptr _queue,
@@ -1048,35 +851,4 @@ void SemanticState::detached()
}
}
-ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
-ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
-bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m)
-{
- return delegate.deliver(m);
-}
-void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
-bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
-bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
-OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
-
-ReplicationStateInitialiser::ReplicationStateInitialiser(qpid::framing::SequenceSet& r,
- const qpid::framing::SequenceNumber& s,
- const qpid::framing::SequenceNumber& e)
- : results(r), start(s), end(e)
-{
- results.add(start, end);
-}
-
-void ReplicationStateInitialiser::process(const QueuedMessage& message)
-{
- if (message.position < start) {
- //replica does not have a message that should still be on the queue
- QPID_LOG(warning, "Replica appears to be missing message at " << message.position);
- } else if (message.position >= start && message.position <= end) {
- //i.e. message is within the intial range and has not been dequeued, so remove it from the results
- results.remove(message.position);
- } //else message has not been seen by replica yet so can be ignored here
-
-}
-
}} // namespace qpid::broker