summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp10
2 files changed, 9 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index fdb562b7c5..5d6aee9045 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -116,7 +116,7 @@ void DeliveryRecord::complete() {
bool DeliveryRecord::accept(TransactionContext* ctxt) {
if (!ended) {
- consumer->acknowledged(getMessage());
+ if (consumer) consumer->acknowledged(getMessage());
if (acquired) queue->dequeue(ctxt, msg);
setEnded();
QPID_LOG(debug, "Accepted " << id);
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 00a343d71e..fc6ada096f 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -561,8 +561,14 @@ void Connection::deliveryRecord(const string& qname,
//
}
- broker::DeliveryRecord dr(m, queue, tag, semanticState().find(tag),
- acquired, accepted, windowing, credit);
+ // If a subscription is cancelled while there are unacked messages for it
+ // there won't be a consumer. Just null it out in this case, it isn't needed.
+ boost::shared_ptr<broker::Consumer> consumer;
+ try { consumer = semanticState().find(tag); }
+ catch(...) {}
+
+ broker::DeliveryRecord dr(
+ m, queue, tag, consumer, acquired, accepted, windowing, credit);
dr.setId(id);
if (cancelled) dr.cancel(dr.getTag());
if (completed) dr.complete();