summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster')
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp5
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp2
3 files changed, 6 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index c16ab72876..00a343d71e 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -549,7 +549,7 @@ void Connection::deliveryRecord(const string& qname,
} else { // Message at original position in original queue
queue->find(position, m);
}
- // FIXME aconway 2011-08-19: removed:
+ // NOTE: removed:
// if (!m.payload)
// throw Exception(QPID_MSG("deliveryRecord no update message"));
//
@@ -561,7 +561,8 @@ void Connection::deliveryRecord(const string& qname,
//
}
- broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
+ broker::DeliveryRecord dr(m, queue, tag, semanticState().find(tag),
+ acquired, accepted, windowing, credit);
dr.setId(id);
if (cancelled) dr.cancel(dr.getTag());
if (completed) dr.complete();
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index f656ace45e..920c4937db 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -209,6 +209,8 @@ class Connection :
void queueDequeueSincePurgeState(const std::string&, uint32_t);
+ bool isAnnounced() const { return announced; }
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 4bf03eefa2..2cd1cf9a83 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -97,7 +97,7 @@ void OutputInterceptor::deliverDoOutput(uint32_t limit) {
}
void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&) {
- if (parent.isLocal() && !sentDoOutput && !closing) {
+ if (parent.isLocal() && !sentDoOutput && !closing && parent.isAnnounced()) {
sentDoOutput = true;
parent.getCluster().getMulticast().mcastControl(
ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit),