summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-19 23:08:30 +0000
committerAlan Conway <aconway@apache.org>2012-01-19 23:08:30 +0000
commit6df9229322558736ccb92af4f61f62a5798fd9df (patch)
tree6f4afd68f8e7f5530a666c42275fa58106c38868
parent6fd3663ff0c697584bbba5bcba5d61c3fefa9115 (diff)
downloadqpid-python-6df9229322558736ccb92af4f61f62a5798fd9df.tar.gz
QPID-3603: Handle backup crash/shutdown.
If a backup crashes or shuts down any messages that have been delayed completion for that backup must be marked complete to avoid the primary hanging. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233681 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Consumer.h15
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp57
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.h19
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h9
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp98
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h14
-rw-r--r--qpid/cpp/src/tests/DeliveryRecordTest.cpp2
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp1
11 files changed, 139 insertions, 87 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index d13d8075a9..3330e3918f 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -34,10 +34,12 @@ class QueueListeners;
/**
* Base class for consumers which represent a subscription to a queue.
*/
-class Consumer {
+class Consumer
+{
const bool acquires;
- // inListeners allows QueueListeners to efficiently track if this instance is registered
- // for notifications without having to search its containers
+ // inListeners allows QueueListeners to efficiently track if this
+ // instance is registered for notifications without having to
+ // search its containers
bool inListeners;
// the name is generated by broker and is unique within broker scope. It is not
// provided or known by the remote Consumer.
@@ -61,7 +63,12 @@ class Consumer {
virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
virtual OwnershipToken* getSession() = 0;
virtual void cancel() = 0;
- virtual bool isDelayedCompletion() const { return false; }
+
+ /** Called when the peer has acknowledged receipt of the message.
+ * Not to be confused with accept() above, which is asking if
+ * this consumer will consume/browse the message.
+ */
+ virtual void acknowledged(const QueuedMessage&) = 0;
protected:
framing::SequenceNumber position;
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index f17795743b..0181a88840 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/Consumer.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Queue.h"
#include "qpid/log/Statement.h"
@@ -31,23 +32,25 @@ using namespace qpid;
using namespace qpid::broker;
using std::string;
-DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
- const Queue::shared_ptr& _queue,
+DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
+ const Queue::shared_ptr& _queue,
const std::string& _tag,
+ const boost::shared_ptr<Consumer>& _consumer,
bool _acquired,
- bool accepted,
+ bool accepted,
bool _windowing,
- uint32_t _credit, bool _isDelayedCompletion) : msg(_msg),
- queue(_queue),
- tag(_tag),
- acquired(_acquired),
- acceptExpected(!accepted),
- cancelled(false),
- completed(false),
- ended(accepted && acquired),
- windowing(_windowing),
- credit(msg.payload ? msg.payload->getRequiredCredit() : _credit),
- isDelayedCompletion(_isDelayedCompletion)
+ uint32_t _credit):
+ msg(_msg),
+ queue(_queue),
+ tag(_tag),
+ consumer(_consumer),
+ acquired(_acquired),
+ acceptExpected(!accepted),
+ cancelled(false),
+ completed(false),
+ ended(accepted && acquired),
+ windowing(_windowing),
+ credit(msg.payload ? msg.payload->getRequiredCredit() : _credit)
{}
bool DeliveryRecord::setEnded()
@@ -95,7 +98,7 @@ void DeliveryRecord::requeue() const
}
}
-void DeliveryRecord::release(bool setRedelivered)
+void DeliveryRecord::release(bool setRedelivered)
{
if (acquired && !ended) {
if (setRedelivered) msg.payload->redeliver();
@@ -108,19 +111,13 @@ void DeliveryRecord::release(bool setRedelivered)
}
void DeliveryRecord::complete() {
- completed = true;
+ completed = true;
}
bool DeliveryRecord::accept(TransactionContext* ctxt) {
if (!ended) {
- if (acquired) {
- queue->dequeue(ctxt, msg);
- } else if (isDelayedCompletion) {
- // FIXME aconway 2011-12-05: This should be done in HA code.
- msg.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(debug, "Completed " << msg.queue->getName()
- << "[" << msg.position << "]");
- }
+ consumer->acknowledged(getMessage());
+ if (acquired) queue->dequeue(ctxt, msg);
setEnded();
QPID_LOG(debug, "Accepted " << id);
}
@@ -137,8 +134,8 @@ void DeliveryRecord::committed() const{
queue->dequeueCommitted(msg);
}
-void DeliveryRecord::reject()
-{
+void DeliveryRecord::reject()
+{
if (acquired && !ended) {
Exchange::shared_ptr alternate = queue->getAlternateExchange();
if (alternate) {
@@ -173,7 +170,7 @@ void DeliveryRecord::acquire(DeliveryIds& results) {
}
}
-void DeliveryRecord::cancel(const std::string& cancelledTag)
+void DeliveryRecord::cancel(const std::string& cancelledTag)
{
if (tag == cancelledTag)
cancelled = true;
@@ -192,7 +189,7 @@ AckRange DeliveryRecord::findRange(DeliveryRecords& records, DeliveryId first, D
namespace qpid {
namespace broker {
-std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r)
+std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r)
{
out << "{" << "id=" << r.id.getValue();
out << ", tag=" << r.tag << "}";
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
index ea33ed5461..21074d4274 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,6 +38,7 @@ namespace broker {
class TransactionContext;
class SemanticState;
struct AckRange;
+class Consumer;
/**
* Record of a delivery for which an ack is outstanding.
@@ -47,6 +48,7 @@ class DeliveryRecord
QueuedMessage msg;
mutable boost::shared_ptr<Queue> queue;
std::string tag; // name of consumer
+ boost::shared_ptr<Consumer> consumer;
DeliveryId id;
bool acquired : 1;
bool acceptExpected : 1;
@@ -63,21 +65,20 @@ class DeliveryRecord
* after that).
*/
uint32_t credit;
- bool isDelayedCompletion;
public:
QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg,
const boost::shared_ptr<Queue>& queue,
const std::string& tag,
+ const boost::shared_ptr<Consumer>& consumer,
bool acquired,
bool accepted,
bool windowing,
- uint32_t credit=0, // Only used if msg is empty.
- bool isDelayedCompletion=false
+ uint32_t credit=0 // Only used if msg is empty.
);
-
+
bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); }
-
+
void dequeue(TransactionContext* ctxt = 0) const;
void requeue() const;
void release(bool setRedelivered);
@@ -97,7 +98,7 @@ class DeliveryRecord
bool isAccepted() const { return !acceptExpected; }
bool isEnded() const { return ended; }
bool isWindowing() const { return windowing; }
-
+
uint32_t getCredit() const;
const std::string& getTag() const { return tag; }
@@ -134,7 +135,7 @@ typedef DeliveryRecord::DeliveryRecords DeliveryRecords;
struct AckRange
{
DeliveryRecords::iterator start;
- DeliveryRecords::iterator end;
+ DeliveryRecords::iterator end;
AckRange(DeliveryRecords::iterator _start, DeliveryRecords::iterator _end) : start(_start), end(_end) {}
};
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 775c4cd862..e7d2259c80 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -341,7 +341,8 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
assertClusterSafe();
allocateCredit(msg.payload);
- DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, credit.isWindowMode(), 0, isDelayedCompletion());
+ DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(),
+ shared_from_this(), acquire, !ackExpected, credit.isWindowMode(), 0);
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
@@ -364,7 +365,7 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
{
assertClusterSafe();
- // FIXME aconway 2009-06-08: if we have byte & message credit but
+ // TODO aconway 2009-06-08: if we have byte & message credit but
// checkCredit fails because the message is to big, we should
// remain on queue's listener list for possible smaller messages
// in future.
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 2be78e7233..5a83fd0fb3 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -149,14 +149,11 @@ class SemanticState : private boost::noncopyable {
SemanticState& getParent() { return *parent; }
const SemanticState& getParent() const { return *parent; }
- // Manageable entry points
+ void acknowledged(const broker::QueuedMessage&) {}
+
+ // manageable entry points
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
-
- /** This consumer wants delayed completion.
- * Overridden by ConsumerImpl subclasses.
- */
- virtual bool isDelayedCompletion() const { return false; }
};
typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index c16ab72876..00a343d71e 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -549,7 +549,7 @@ void Connection::deliveryRecord(const string& qname,
} else { // Message at original position in original queue
queue->find(position, m);
}
- // FIXME aconway 2011-08-19: removed:
+ // NOTE: removed:
// if (!m.payload)
// throw Exception(QPID_MSG("deliveryRecord no update message"));
//
@@ -561,7 +561,8 @@ void Connection::deliveryRecord(const string& qname,
//
}
- broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
+ broker::DeliveryRecord dr(m, queue, tag, semanticState().find(tag),
+ acquired, accepted, windowing, credit);
dr.setId(id);
if (cancelled) dr.cancel(dr.getTag());
if (completed) dr.complete();
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 36859909df..9b2598b3bf 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -70,6 +70,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
boost::shared_ptr<ReplicatingSubscription::Factory>(
new ReplicatingSubscription::Factory()));
}
+
HaBroker::~HaBroker() {}
Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 6c33002b5c..0070118102 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -132,20 +132,68 @@ bool ReplicatingSubscription::deliver(QueuedMessage& m) {
ReplicatingSubscription::~ReplicatingSubscription() {}
+
+// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg
+
+// Mark a message completed. May be called by acknowledge or dequeued
+void ReplicatingSubscription::complete(
+ const QueuedMessage& qm, const sys::Mutex::ScopedLock&)
+{
+ // Handle completions for the subscribed queue, not the internal event queue.
+ if (qm.queue && qm.queue == getQueue().get()) {
+ QPID_LOG(trace, logPrefix << "Completed message " << qm.position);
+ Delayed::iterator i= delayed.find(qm.position);
+ // The same message can be completed twice, by acknowledged and
+ // dequeued, remove it from the set so it only gets completed
+ // once.
+ if (i != delayed.end()) {
+ assert(i->second.payload == qm.payload);
+ qm.payload->getIngressCompletion().finishCompleter();
+ delayed.erase(i);
+ }
+ }
+}
+
+// Called before we get notified of the message being available and
+// under the message lock in the queue. Called in arbitrary connection thread.
+void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
+ sys::Mutex::ScopedLock l(lock);
+ // Delay completion
+ QPID_LOG(trace, logPrefix << "Delaying completion of message " << qm.position);
+ qm.payload->getIngressCompletion().startCompleter();
+ assert(delayed.find(qm.position) == delayed.end());
+ delayed[qm.position] = qm;
+}
+
+
+// Function to complete a delayed message, called by cancel()
+void ReplicatingSubscription::cancelComplete(
+ const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
+{
+ QPID_LOG(trace, logPrefix << "Cancel completed message " << v.second.position);
+ v.second.payload->getIngressCompletion().finishCompleter();
+}
+
// Called in the subscription's connection thread.
void ReplicatingSubscription::cancel()
{
- QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName());
getQueue()->removeObserver(
boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+ {
+ sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName());
+ for_each(delayed.begin(), delayed.end(),
+ boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
+ delayed.clear();
+ }
ConsumerImpl::cancel();
}
-// Called before we get notified of the message being available and
-// under the message lock in the queue. Called in arbitrary connection thread.
-void ReplicatingSubscription::enqueued(const QueuedMessage& m) {
- //delay completion
- m.payload->getIngressCompletion().startCompleter();
+// Called on primary in the backups IO thread.
+void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
+ sys::Mutex::ScopedLock l(lock);
+ // Finish completion of message, it has been acknowledged by the backup.
+ complete(msg, l);
}
// Called with lock held. Called in subscription's connection thread.
@@ -160,6 +208,21 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
}
+// Called after the message has been removed from the deque and under
+// the messageLock in the queue. Called in arbitrary connection threads.
+void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
+{
+ {
+ sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(trace, logPrefix << "Dequeued message " << qm.position);
+ dequeues.add(qm.position);
+ // If we have not yet sent this message to the backup, then
+ // complete it now as it will never be accepted.
+ if (qm.position > position) complete(qm, l);
+ }
+ notify(); // Ensure a call to doDispatch
+}
+
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendPositionEvent(
SequenceNumber position, const sys::Mutex::ScopedLock&l )
@@ -205,28 +268,6 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer&
events->dispatch(consumer);
}
-// Called after the message has been removed from the deque and under
-// the messageLock in the queue. Called in arbitrary connection threads.
-void ReplicatingSubscription::dequeued(const QueuedMessage& m)
-{
- {
- sys::Mutex::ScopedLock l(lock);
- dequeues.add(m.position);
- // If we have not yet sent this message to the backup, then
- // complete it now as it will never be accepted.
-
- // FIXME aconway 2012-01-05: suspect use of position in
- // foreign connection thread. Race with deliver() which is
- // not under the message lock?
- if (m.position > position) {
- m.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(trace, logPrefix << "Dequeued and completed message " << m.position << " early");
- }
- else
- QPID_LOG(trace, logPrefix << "Dequeued message " << m.position);
- }
- notify(); // Ensure a call to doDispatch
-}
// Called in subscription's connection thread.
bool ReplicatingSubscription::doDispatch()
@@ -244,7 +285,6 @@ bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { re
void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
-void ReplicatingSubscription::DelegatingConsumer::cancel() {}
OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 147c40ee6d..8af273e4d8 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -85,17 +85,21 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
// Consumer overrides.
void cancel();
- bool isDelayedCompletion() const { return true; }
+ void acknowledged(const broker::QueuedMessage&);
protected:
bool doDispatch();
private:
+ typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed;
std::string logPrefix;
boost::shared_ptr<broker::Queue> events;
boost::shared_ptr<broker::Consumer> consumer;
- qpid::framing::SequenceSet dequeues;
+ Delayed delayed;
+ framing::SequenceSet dequeues;
framing::SequenceNumber backupPosition;
+ void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&);
+ void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&);
void sendDequeueEvent(const sys::Mutex::ScopedLock&);
void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
void sendEvent(const std::string& key, framing::Buffer&,
@@ -110,9 +114,11 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
void notify();
bool filter(boost::intrusive_ptr<broker::Message>);
bool accept(boost::intrusive_ptr<broker::Message>);
- void cancel();
- bool isDelayedCompletion() const { return false; }
+ void cancel() {}
+ void acknowledged(const broker::QueuedMessage&) {}
+
broker::OwnershipToken* getSession();
+
private:
ReplicatingSubscription& delegate;
};
diff --git a/qpid/cpp/src/tests/DeliveryRecordTest.cpp b/qpid/cpp/src/tests/DeliveryRecordTest.cpp
index f7013014ff..fb7bd2f727 100644
--- a/qpid/cpp/src/tests/DeliveryRecordTest.cpp
+++ b/qpid/cpp/src/tests/DeliveryRecordTest.cpp
@@ -49,7 +49,7 @@ QPID_AUTO_TEST_CASE(testSort)
list<DeliveryRecord> records;
for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) {
- DeliveryRecord r(QueuedMessage(0), Queue::shared_ptr(), "tag", false, false, false);
+ DeliveryRecord r(QueuedMessage(0), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false);
r.setId(*i);
records.push_back(r);
}
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 9a76bb28e1..bb4f7b9f4b 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -67,6 +67,7 @@ public:
};
void notify() {}
void cancel() {}
+ void acknowledged(const QueuedMessage&) {}
OwnershipToken* getSession() { return 0; }
};