diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-19 15:03:16 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-19 15:03:16 +0000 |
commit | 81584c84fadc886b0ad53dceb479073e56bf8cdd (patch) | |
tree | f48206d10d52fdbb5a4ce93ec8068f0de4fbc9f5 /cpp/src/qpid/broker/Message.cpp | |
parent | ccd0e27fdf0c5a90a7f85099dac4f63dbd7a5d15 (diff) | |
download | qpid-python-81584c84fadc886b0ad53dceb479073e56bf8cdd.tar.gz |
QPID-2935: merge producer flow control (C++ broker).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1072356 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 27 |
1 files changed, 5 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index c589669e5a..122c5b9c1a 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -50,14 +50,15 @@ TransferAdapter Message::TRANSFER; Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), - inCallback(false), requiredCredit(0) {} + expiration(FAR_FUTURE), dequeueCallback(0), + inCallback(false), requiredCredit(0) +{} Message::Message(const Message& original) : PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(original.expiration), enqueueCallback(0), dequeueCallback(0), - inCallback(false), requiredCredit(0) + expiration(original.expiration), dequeueCallback(0), + inCallback(false), requiredCredit(0) { setExpiryPolicy(original.expiryPolicy); } @@ -415,30 +416,12 @@ struct ScopedSet { }; } -void Message::allEnqueuesComplete() { - ScopedSet ss(callbackLock, inCallback); - MessageCallback* cb = enqueueCallback; - if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); -} - void Message::allDequeuesComplete() { ScopedSet ss(callbackLock, inCallback); MessageCallback* cb = dequeueCallback; if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); } -void Message::setEnqueueCompleteCallback(MessageCallback& cb) { - sys::Mutex::ScopedLock l(callbackLock); - while (inCallback) callbackLock.wait(); - enqueueCallback = &cb; -} - -void Message::resetEnqueueCompleteCallback() { - sys::Mutex::ScopedLock l(callbackLock); - while (inCallback) callbackLock.wait(); - enqueueCallback = 0; -} - void Message::setDequeueCompleteCallback(MessageCallback& cb) { sys::Mutex::ScopedLock l(callbackLock); while (inCallback) callbackLock.wait(); |