diff options
author | Gordon Sim <gsim@apache.org> | 2012-08-13 14:55:30 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2012-08-13 14:55:30 +0000 |
commit | 796d89efad125c4728402a2e61db6abeb9b6d524 (patch) | |
tree | ae44db926b1d2953f7b35cf004c9a26d177062e4 /qpid | |
parent | 77fba0a917fb6757e6357aba073d3eaaf4bf1e89 (diff) | |
download | qpid-python-796d89efad125c4728402a2e61db6abeb9b6d524.tar.gz |
QPID-4178: Fix valgrind errors: prevent circular reference in messages, prevent uninitialised required credit value.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1372453 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
5 files changed, 43 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp index 8866675c5c..b604f77aab 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp @@ -30,7 +30,33 @@ namespace qpid { namespace broker { PersistableMessage::~PersistableMessage() {} -PersistableMessage::PersistableMessage() : persistenceId(0) {} +PersistableMessage::PersistableMessage() : ingressCompletion(0), persistenceId(0) {} + +void PersistableMessage::setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i) +{ + ingressCompletion = i.get(); + /** + * What follows is a hack to account for the fact that the + * AsyncCompletion to use may be, but is not always, this same + * object. + * + * This is hopefully temporary, and allows the store interface to + * remain unchanged without requiring another object to be allocated + * for every message. + * + * The case in question is where a message previously passed to + * the store is modified by some other queue onto which it is + * pushed, and then again persisted to the store. These will be + * two separate PersistableMessage instances (since the latter now + * has different content), but need to share the same + * AsyncCompletion (since they refer to the same incoming transfer + * command). + */ + if (static_cast<RefCounted*>(ingressCompletion) != static_cast<RefCounted*>(this)) { + holder = i; + } +} + void PersistableMessage::flush() { diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h index eb6b444e4a..2c33dd5ecb 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.h +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h @@ -57,7 +57,8 @@ class PersistableMessage : public Persistable * operations have completed, the transfer of this message from the client * may be considered complete. */ - boost::intrusive_ptr<AsyncCompletion> ingressCompletion; + AsyncCompletion* ingressCompletion; + boost::intrusive_ptr<AsyncCompletion> holder; mutable uint64_t persistenceId; public: @@ -73,7 +74,7 @@ class PersistableMessage : public Persistable /** track the progress of a message received by the broker - see ingressCompletion above */ QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion->isDone(); } QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return *ingressCompletion; } - QPID_BROKER_INLINE_EXTERN void setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i) { ingressCompletion = i; } + QPID_BROKER_EXTERN void setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i); QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion->startCompleter(); } QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion->finishCompleter(); } diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp index e343abde8a..cac4434c48 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp @@ -39,8 +39,8 @@ namespace { const std::string QMF2("qmf2"); const std::string PARTIAL("partial"); } -MessageTransfer::MessageTransfer() : frames(framing::SequenceNumber()) {} -MessageTransfer::MessageTransfer(const framing::SequenceNumber& id) : frames(id) {} +MessageTransfer::MessageTransfer() : frames(framing::SequenceNumber()), requiredCredit(0), cachedRequiredCredit(false) {} +MessageTransfer::MessageTransfer(const framing::SequenceNumber& id) : frames(id), requiredCredit(0), cachedRequiredCredit(false) {} uint64_t MessageTransfer::getContentSize() const { @@ -100,7 +100,13 @@ bool MessageTransfer::requiresAccept() const } uint32_t MessageTransfer::getRequiredCredit() const { - return requiredCredit; + if (cachedRequiredCredit) { + return requiredCredit; + } else { + qpid::framing::SumBodySize sum; + frames.map_if(sum, qpid::framing::TypeFilter2<qpid::framing::HEADER_BODY, qpid::framing::CONTENT_BODY>()); + return sum.getSize(); + } } void MessageTransfer::computeRequiredCredit() { @@ -108,6 +114,7 @@ void MessageTransfer::computeRequiredCredit() qpid::framing::SumBodySize sum; frames.map_if(sum, qpid::framing::TypeFilter2<qpid::framing::HEADER_BODY, qpid::framing::CONTENT_BODY>()); requiredCredit = sum.getSize(); + cachedRequiredCredit = true; } uint32_t MessageTransfer::getRequiredCredit(const qpid::broker::Message& msg) { diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h index 0d7ecb3956..590e389518 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h @@ -121,6 +121,7 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro private: qpid::framing::FrameSet frames; uint32_t requiredCredit; + bool cachedRequiredCredit; MessageTransfer(const qpid::framing::FrameSet&); void encodeHeader(framing::Buffer& buffer) const; diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 7aed5a3c14..474c86ed48 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -605,7 +605,7 @@ void ManagementAgent::sendBufferLH(const string& data, } if (exchange.get() == 0) return; - intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); + intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer); AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); AMQFrame header((AMQHeaderBody())); AMQFrame content((AMQContentBody(data))); @@ -638,6 +638,7 @@ void ManagementAgent::sendBufferLH(const string& data, dp->setTtl(ttl_msec); } transfer->getFrames().append(content); + transfer->computeRequiredCredit(); Message msg(transfer, transfer); msg.setIsManagementMessage(true); msg.computeExpiration(broker->getExpiryPolicy()); |