diff options
author | Alan Conway <aconway@apache.org> | 2012-02-14 15:59:26 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-14 15:59:26 +0000 |
commit | 36e6a8b0a5bf229d07edfc7cd207ed168a03cdd4 (patch) | |
tree | f47d2b10c5e7b44b0accbfb298c81110e704c3fd | |
parent | 06d591b82d902829e4b2d9ccc22788c41ba85cf3 (diff) | |
download | qpid-python-36e6a8b0a5bf229d07edfc7cd207ed168a03cdd4.tar.gz |
QPID-3603: Move class ReplicatingSubscription into its own files.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-6@1244021 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp | 214 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ReplicatingSubscription.h | 77 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 230 |
4 files changed, 294 insertions, 229 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 825d4374c2..dcffb913d4 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -633,6 +633,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/QueueFlowLimit.cpp \ qpid/broker/QueueReplicator.h \ qpid/broker/QueueReplicator.cpp \ + qpid/broker/ReplicatingSubscription.h \ + qpid/broker/ReplicatingSubscription.cpp \ qpid/broker/RateFlowcontrol.h \ qpid/broker/RecoverableConfig.h \ qpid/broker/RecoverableExchange.h \ diff --git a/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp new file mode 100644 index 0000000000..4041e6ac7a --- /dev/null +++ b/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp @@ -0,0 +1,214 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ReplicatingSubscription.h" +#include "Queue.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace broker { + +using namespace framing; + +const std::string DOLLAR("$"); +const std::string INTERNAL("_internall"); + +class ReplicationStateInitialiser +{ + public: + ReplicationStateInitialiser( + qpid::framing::SequenceSet& r, + const qpid::framing::SequenceNumber& s, + const qpid::framing::SequenceNumber& e) : results(r), start(s), end(e) + { + results.add(start, end); + } + + void operator()(const QueuedMessage& message) { + if (message.position < start) { + //replica does not have a message that should still be on the queue + QPID_LOG(warning, "Replica appears to be missing message at " << message.position); + } else if (message.position >= start && message.position <= end) { + //i.e. message is within the intial range and has not been dequeued, so remove it from the results + results.remove(message.position); + } //else message has not been seen by replica yet so can be ignored here + } + + private: + qpid::framing::SequenceSet& results; + const qpid::framing::SequenceNumber start; + const qpid::framing::SequenceNumber end; +}; + +std::string mask(const std::string& in) +{ + return DOLLAR + in + INTERNAL; +} + +ReplicatingSubscription::ReplicatingSubscription( + SemanticState* _parent, + const std::string& _name, + Queue::shared_ptr _queue, + bool ack, + bool _acquire, + bool _exclusive, + const std::string& _tag, + const std::string& _resumeId, + uint64_t _resumeTtl, + const framing::FieldTable& _arguments +) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments), + events(new Queue(mask(_name))), + consumer(new DelegatingConsumer(*this)) +{ + + if (_arguments.isSet("qpid.high_sequence_number")) { + qpid::framing::SequenceNumber hwm = _arguments.getAsInt("qpid.high_sequence_number"); + qpid::framing::SequenceNumber lwm; + if (_arguments.isSet("qpid.low_sequence_number")) { + lwm = _arguments.getAsInt("qpid.low_sequence_number"); + } else { + lwm = hwm; + } + qpid::framing::SequenceNumber oldest; + if (_queue->getOldest(oldest)) { + if (oldest >= hwm) { + range.add(lwm, --oldest); + } else if (oldest >= lwm) { + ReplicationStateInitialiser initialiser(range, lwm, hwm); + _queue->eachMessage(initialiser); + } else { //i.e. have older message on master than is reported to exist on replica + QPID_LOG(warning, "Replica appears to be missing message on master"); + } + } else { + //local queue (i.e. master) is empty + range.add(lwm, _queue->getPosition()); + } + QPID_LOG(debug, "Initial set of dequeues for " << _queue->getName() << " are " << range + << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << _queue->getPosition() << ")"); + //set position of 'cursor' + position = hwm; + } +} + +bool ReplicatingSubscription::deliver(QueuedMessage& m) +{ + return ConsumerImpl::deliver(m); +} + +void ReplicatingSubscription::init() +{ + getQueue()->addObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); +} + +void ReplicatingSubscription::cancel() +{ + getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); +} + +ReplicatingSubscription::~ReplicatingSubscription() {} + +//called before we get notified of the message being available and +//under the message lock in the queue +void ReplicatingSubscription::enqueued(const QueuedMessage& m) +{ + QPID_LOG(debug, "Enqueued message at " << m.position); + //delay completion + m.payload->getIngressCompletion().startCompleter(); + QPID_LOG(debug, "Delayed " << m.payload.get()); +} + +void ReplicatingSubscription::generateDequeueEvent() +{ + std::string buf(range.encodedSize(),'\0'); + framing::Buffer buffer(&buf[0], buf.size()); + range.encode(buffer); + range.clear(); + buffer.reset(); + + //generate event message + boost::intrusive_ptr<Message> event = new Message(); + AMQFrame method((MessageTransferBody(ProtocolVersion(), std::string(), 0, 0))); + AMQFrame header((AMQHeaderBody())); + AMQFrame content((AMQContentBody())); + content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize()); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + event->getFrames().append(method); + event->getFrames().append(header); + event->getFrames().append(content); + + DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true); + props->setRoutingKey("dequeue-event"); + + events->deliver(event); +} + +//called after the message has been removed from the deque and under +//the message lock in the queue +void ReplicatingSubscription::dequeued(const QueuedMessage& m) +{ + { + sys::Mutex::ScopedLock l(lock); + range.add(m.position); + QPID_LOG(debug, "Updated dequeue event to include message at " << m.position << "; subscription is at " << position); + } + notify(); + if (m.position > position) { + m.payload->getIngressCompletion().finishCompleter(); + QPID_LOG(debug, "Completed " << m.payload.get() << " early due to dequeue"); + } +} + +bool ReplicatingSubscription::doDispatch() +{ + { + sys::Mutex::ScopedLock l(lock); + if (!range.empty()) { + generateDequeueEvent(); + } + } + bool r1 = events->dispatch(consumer); + bool r2 = ConsumerImpl::doDispatch(); + return r1 || r2; +} + +ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {} +ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {} +bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) +{ + return delegate.deliver(m); +} +void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } +bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); } +bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); } +void ReplicatingSubscription::DelegatingConsumer::cancel() {} +OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } + + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h b/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h new file mode 100644 index 0000000000..a79c712e85 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h @@ -0,0 +1,77 @@ +#ifndef QPID_BROKER_REPLICATINGSUBSCRIPTION_H +#define QPID_BROKER_REPLICATINGSUBSCRIPTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/SemanticState.h" + +namespace qpid { +namespace broker { + +/** + * Subscriber to a remote queue that replicates to a local queue. + */ +class ReplicatingSubscription : public SemanticState::ConsumerImpl, public QueueObserver +{ + public: + ReplicatingSubscription(SemanticState* parent, + const std::string& name, boost::shared_ptr<Queue> queue, + bool ack, bool acquire, bool exclusive, const std::string& tag, + const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); + ~ReplicatingSubscription(); + + void init(); + void cancel(); + bool deliver(QueuedMessage& msg); + void enqueued(const QueuedMessage&); + void dequeued(const QueuedMessage&); + void acquired(const QueuedMessage&) {} + void requeued(const QueuedMessage&) {} + + protected: + bool doDispatch(); + private: + boost::shared_ptr<Queue> events; + boost::shared_ptr<Consumer> consumer; + qpid::framing::SequenceSet range; + + void generateDequeueEvent(); + class DelegatingConsumer : public Consumer + { + public: + DelegatingConsumer(ReplicatingSubscription&); + ~DelegatingConsumer(); + bool deliver(QueuedMessage& msg); + void notify(); + bool filter(boost::intrusive_ptr<Message>); + bool accept(boost::intrusive_ptr<Message>); + void cancel(); + OwnershipToken* getSession(); + private: + ReplicatingSubscription& delegate; + }; +}; + + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_REPLICATINGSUBSCRIPTION_H*/ diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 57a5e1879e..941096702c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -28,6 +28,7 @@ #include "qpid/broker/NodeClone.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueReplicator.h" +#include "qpid/broker/ReplicatingSubscription.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/SessionOutputException.h" #include "qpid/broker/TxAccept.h" @@ -266,47 +267,6 @@ void SemanticState::record(const DeliveryRecord& delivery) const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -class ReplicatingSubscription : public SemanticState::ConsumerImpl, public QueueObserver -{ - public: - ReplicatingSubscription(SemanticState* parent, - const std::string& name, boost::shared_ptr<Queue> queue, - bool ack, bool acquire, bool exclusive, const std::string& tag, - const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); - ~ReplicatingSubscription(); - - void init(); - void cancel(); - bool deliver(QueuedMessage& msg); - void enqueued(const QueuedMessage&); - void dequeued(const QueuedMessage&); - void acquired(const QueuedMessage&) {} - void requeued(const QueuedMessage&) {} - - protected: - bool doDispatch(); - private: - boost::shared_ptr<Queue> events; - boost::shared_ptr<Consumer> consumer; - qpid::framing::SequenceSet range; - - void generateDequeueEvent(); - class DelegatingConsumer : public Consumer - { - public: - DelegatingConsumer(ReplicatingSubscription&); - ~DelegatingConsumer(); - bool deliver(QueuedMessage& msg); - void notify(); - bool filter(boost::intrusive_ptr<Message>); - bool accept(boost::intrusive_ptr<Message>); - void cancel() {} - OwnershipToken* getSession(); - private: - ReplicatingSubscription& delegate; - }; -}; - SemanticState::ConsumerImpl::shared_ptr SemanticState::ConsumerImpl::create(SemanticState* parent, const string& name, Queue::shared_ptr queue, @@ -327,163 +287,6 @@ SemanticState::ConsumerImpl::shared_ptr SemanticState::ConsumerImpl::create(Sema } } -std::string mask(const std::string& in) -{ - return std::string("$") + in + std::string("_internal"); -} - -class ReplicationStateInitialiser -{ - public: - ReplicationStateInitialiser(qpid::framing::SequenceSet& results, - const qpid::framing::SequenceNumber& start, - const qpid::framing::SequenceNumber& end); - void operator()(const QueuedMessage& m) { process(m); } - private: - qpid::framing::SequenceSet& results; - const qpid::framing::SequenceNumber start; - const qpid::framing::SequenceNumber end; - void process(const QueuedMessage&); -}; - -ReplicatingSubscription::ReplicatingSubscription(SemanticState* _parent, - const string& _name, - Queue::shared_ptr _queue, - bool ack, - bool _acquire, - bool _exclusive, - const string& _tag, - const string& _resumeId, - uint64_t _resumeTtl, - const framing::FieldTable& _arguments -) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments), - events(new Queue(mask(_name))), - consumer(new DelegatingConsumer(*this)) -{ - - if (_arguments.isSet("qpid.high_sequence_number")) { - qpid::framing::SequenceNumber hwm = _arguments.getAsInt("qpid.high_sequence_number"); - qpid::framing::SequenceNumber lwm; - if (_arguments.isSet("qpid.low_sequence_number")) { - lwm = _arguments.getAsInt("qpid.low_sequence_number"); - } else { - lwm = hwm; - } - qpid::framing::SequenceNumber oldest; - if (_queue->getOldest(oldest)) { - if (oldest >= hwm) { - range.add(lwm, --oldest); - } else if (oldest >= lwm) { - ReplicationStateInitialiser initialiser(range, lwm, hwm); - _queue->eachMessage(initialiser); - } else { //i.e. have older message on master than is reported to exist on replica - QPID_LOG(warning, "Replica appears to be missing message on master"); - } - } else { - //local queue (i.e. master) is empty - range.add(lwm, _queue->getPosition()); - } - QPID_LOG(debug, "Initial set of dequeues for " << _queue->getName() << " are " << range - << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << _queue->getPosition() << ")"); - //set position of 'cursor' - position = hwm; - } -} - -bool ReplicatingSubscription::deliver(QueuedMessage& m) -{ - return ConsumerImpl::deliver(m); -} - -void ReplicatingSubscription::init() -{ - getQueue()->addObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); -} - -void ReplicatingSubscription::cancel() -{ - getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); -} - -ReplicatingSubscription::~ReplicatingSubscription() {} - -//called before we get notified of the message being available and -//under the message lock in the queue -void ReplicatingSubscription::enqueued(const QueuedMessage& m) -{ - QPID_LOG(debug, "Enqueued message at " << m.position); - //delay completion - m.payload->getIngressCompletion().startCompleter(); - QPID_LOG(debug, "Delayed " << m.payload.get()); -} - -class Buffer : public qpid::framing::Buffer -{ - public: - Buffer(size_t size) : qpid::framing::Buffer(new char[size], size) {} - ~Buffer() { delete[] getPointer(); } -}; - -void ReplicatingSubscription::generateDequeueEvent() -{ - Buffer buffer(range.encodedSize()); - range.encode(buffer); - range.clear(); - buffer.reset(); - - //generate event message - boost::intrusive_ptr<Message> event = new Message(); - AMQFrame method((MessageTransferBody(ProtocolVersion(), std::string(), 0, 0))); - AMQFrame header((AMQHeaderBody())); - AMQFrame content((AMQContentBody())); - content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize()); - header.setBof(false); - header.setEof(false); - header.setBos(true); - header.setEos(true); - content.setBof(false); - content.setEof(true); - content.setBos(true); - content.setEos(true); - event->getFrames().append(method); - event->getFrames().append(header); - event->getFrames().append(content); - - DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true); - props->setRoutingKey("dequeue-event"); - - events->deliver(event); -} - -//called after the message has been removed from the deque and under -//the message lock in the queue -void ReplicatingSubscription::dequeued(const QueuedMessage& m) -{ - { - Mutex::ScopedLock l(lock); - range.add(m.position); - QPID_LOG(debug, "Updated dequeue event to include message at " << m.position << "; subscription is at " << position); - } - notify(); - if (m.position > position) { - m.payload->getIngressCompletion().finishCompleter(); - QPID_LOG(debug, "Completed " << m.payload.get() << " early due to dequeue"); - } -} - -bool ReplicatingSubscription::doDispatch() -{ - { - Mutex::ScopedLock l(lock); - if (!range.empty()) { - generateDequeueEvent(); - } - } - bool r1 = events->dispatch(consumer); - bool r2 = ConsumerImpl::doDispatch(); - return r1 || r2; -} - SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, const string& _name, Queue::shared_ptr _queue, @@ -1048,35 +851,4 @@ void SemanticState::detached() } } -ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {} -ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {} -bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) -{ - return delegate.deliver(m); -} -void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } -bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); } -bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); } -OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } - -ReplicationStateInitialiser::ReplicationStateInitialiser(qpid::framing::SequenceSet& r, - const qpid::framing::SequenceNumber& s, - const qpid::framing::SequenceNumber& e) - : results(r), start(s), end(e) -{ - results.add(start, end); -} - -void ReplicationStateInitialiser::process(const QueuedMessage& message) -{ - if (message.position < start) { - //replica does not have a message that should still be on the queue - QPID_LOG(warning, "Replica appears to be missing message at " << message.position); - } else if (message.position >= start && message.position <= end) { - //i.e. message is within the intial range and has not been dequeued, so remove it from the results - results.remove(message.position); - } //else message has not been seen by replica yet so can be ignored here - -} - }} // namespace qpid::broker |