summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2012-08-13 14:55:30 +0000
committerGordon Sim <gsim@apache.org>2012-08-13 14:55:30 +0000
commit796d89efad125c4728402a2e61db6abeb9b6d524 (patch)
treeae44db926b1d2953f7b35cf004c9a26d177062e4 /qpid
parent77fba0a917fb6757e6357aba073d3eaaf4bf1e89 (diff)
downloadqpid-python-796d89efad125c4728402a2e61db6abeb9b6d524.tar.gz
QPID-4178: Fix valgrind errors: prevent circular reference in messages, prevent uninitialised required credit value.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1372453 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.cpp28
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.h5
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp13
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h1
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp3
5 files changed, 43 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
index 8866675c5c..b604f77aab 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -30,7 +30,33 @@ namespace qpid {
namespace broker {
PersistableMessage::~PersistableMessage() {}
-PersistableMessage::PersistableMessage() : persistenceId(0) {}
+PersistableMessage::PersistableMessage() : ingressCompletion(0), persistenceId(0) {}
+
+void PersistableMessage::setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i)
+{
+ ingressCompletion = i.get();
+ /**
+ * What follows is a hack to account for the fact that the
+ * AsyncCompletion to use may be, but is not always, this same
+ * object.
+ *
+ * This is hopefully temporary, and allows the store interface to
+ * remain unchanged without requiring another object to be allocated
+ * for every message.
+ *
+ * The case in question is where a message previously passed to
+ * the store is modified by some other queue onto which it is
+ * pushed, and then again persisted to the store. These will be
+ * two separate PersistableMessage instances (since the latter now
+ * has different content), but need to share the same
+ * AsyncCompletion (since they refer to the same incoming transfer
+ * command).
+ */
+ if (static_cast<RefCounted*>(ingressCompletion) != static_cast<RefCounted*>(this)) {
+ holder = i;
+ }
+}
+
void PersistableMessage::flush()
{
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h
index eb6b444e4a..2c33dd5ecb 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.h
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h
@@ -57,7 +57,8 @@ class PersistableMessage : public Persistable
* operations have completed, the transfer of this message from the client
* may be considered complete.
*/
- boost::intrusive_ptr<AsyncCompletion> ingressCompletion;
+ AsyncCompletion* ingressCompletion;
+ boost::intrusive_ptr<AsyncCompletion> holder;
mutable uint64_t persistenceId;
public:
@@ -73,7 +74,7 @@ class PersistableMessage : public Persistable
/** track the progress of a message received by the broker - see ingressCompletion above */
QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion->isDone(); }
QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return *ingressCompletion; }
- QPID_BROKER_INLINE_EXTERN void setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i) { ingressCompletion = i; }
+ QPID_BROKER_EXTERN void setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i);
QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion->startCompleter(); }
QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion->finishCompleter(); }
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
index e343abde8a..cac4434c48 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
@@ -39,8 +39,8 @@ namespace {
const std::string QMF2("qmf2");
const std::string PARTIAL("partial");
}
-MessageTransfer::MessageTransfer() : frames(framing::SequenceNumber()) {}
-MessageTransfer::MessageTransfer(const framing::SequenceNumber& id) : frames(id) {}
+MessageTransfer::MessageTransfer() : frames(framing::SequenceNumber()), requiredCredit(0), cachedRequiredCredit(false) {}
+MessageTransfer::MessageTransfer(const framing::SequenceNumber& id) : frames(id), requiredCredit(0), cachedRequiredCredit(false) {}
uint64_t MessageTransfer::getContentSize() const
{
@@ -100,7 +100,13 @@ bool MessageTransfer::requiresAccept() const
}
uint32_t MessageTransfer::getRequiredCredit() const
{
- return requiredCredit;
+ if (cachedRequiredCredit) {
+ return requiredCredit;
+ } else {
+ qpid::framing::SumBodySize sum;
+ frames.map_if(sum, qpid::framing::TypeFilter2<qpid::framing::HEADER_BODY, qpid::framing::CONTENT_BODY>());
+ return sum.getSize();
+ }
}
void MessageTransfer::computeRequiredCredit()
{
@@ -108,6 +114,7 @@ void MessageTransfer::computeRequiredCredit()
qpid::framing::SumBodySize sum;
frames.map_if(sum, qpid::framing::TypeFilter2<qpid::framing::HEADER_BODY, qpid::framing::CONTENT_BODY>());
requiredCredit = sum.getSize();
+ cachedRequiredCredit = true;
}
uint32_t MessageTransfer::getRequiredCredit(const qpid::broker::Message& msg)
{
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
index 0d7ecb3956..590e389518 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
@@ -121,6 +121,7 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro
private:
qpid::framing::FrameSet frames;
uint32_t requiredCredit;
+ bool cachedRequiredCredit;
MessageTransfer(const qpid::framing::FrameSet&);
void encodeHeader(framing::Buffer& buffer) const;
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 7aed5a3c14..474c86ed48 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -605,7 +605,7 @@ void ManagementAgent::sendBufferLH(const string& data,
}
if (exchange.get() == 0) return;
- intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
+ intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer);
AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
AMQFrame header((AMQHeaderBody()));
AMQFrame content((AMQContentBody(data)));
@@ -638,6 +638,7 @@ void ManagementAgent::sendBufferLH(const string& data,
dp->setTtl(ttl_msec);
}
transfer->getFrames().append(content);
+ transfer->computeRequiredCredit();
Message msg(transfer, transfer);
msg.setIsManagementMessage(true);
msg.computeExpiration(broker->getExpiryPolicy());