diff options
author | Gordon Sim <gsim@apache.org> | 2007-09-03 17:35:35 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-09-03 17:35:35 +0000 |
commit | 331b0e84ae06da0c057a82c0f5b67f550bcd636b (patch) | |
tree | 91342743f16ad473b456a5ef336409e4af38cd5a /cpp/src/qpid/broker/MessageHandlerImpl.cpp | |
parent | cadb67eb27e00abb493793363dcf37f4d2f563dc (diff) | |
download | qpid-python-331b0e84ae06da0c057a82c0f5b67f550bcd636b.tar.gz |
Initial implementation (plus very simple tests) of message.acquire, message.release, message.reject and message.flush.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@572394 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 42 |
1 files changed, 34 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index 7529e3bb39..d9b91c1617 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -25,6 +25,7 @@ #include "MessageDelivery.h" #include "qpid/framing/MessageAppendBody.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/reply_exceptions.h" #include "BrokerAdapter.h" #include <boost/format.hpp> @@ -92,7 +93,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, const string& destination, bool noLocal, u_int8_t confirmMode, - u_int8_t acquireMode,//TODO: implement acquire modes + u_int8_t acquireMode, bool exclusive, const framing::FieldTable& filter ) { @@ -101,8 +102,10 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; + //NB: am assuming pre-acquired = 0 as discussed on SIG list as is + //the previously expected behaviour session.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), - tag, queue, noLocal, confirmMode == 1, exclusive, &filter); + tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } @@ -156,9 +159,15 @@ MessageHandlerImpl::recover(bool requeue) } void -MessageHandlerImpl::reject(const SequenceNumberSet& /*transfers*/, uint16_t /*code*/, const string& /*text*/ ) +MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const string& /*text*/ ) { - //TODO: implement + if (transfers.size() % 2) { //must be even number + throw InvalidArgumentException("Received odd number of elements in list of transfers"); + } + + for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { + session.reject(i->getValue(), (++i)->getValue()); + } } void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value) @@ -200,14 +209,31 @@ void MessageHandlerImpl::stop(const std::string& destination) session.stop(destination); } -void MessageHandlerImpl::acquire(const SequenceNumberSet& /*transfers*/, u_int8_t /*mode*/) +void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/) { - throw ConnectionException(540, "Not yet implemented"); + SequenceNumberSet results; + + if (transfers.size() % 2) { //must be even number + throw InvalidArgumentException("Received odd number of elements in list of transfers"); + } + + for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { + session.acquire(i->getValue(), (++i)->getValue(), results); + } + + results = results.condense(); + client.acquired(results); } -void MessageHandlerImpl::release(const SequenceNumberSet& /*transfers*/) +void MessageHandlerImpl::release(const SequenceNumberSet& transfers) { - throw ConnectionException(540, "Not yet implemented"); + if (transfers.size() % 2) { //must be even number + throw InvalidArgumentException("Received odd number of elements in list of transfers"); + } + + for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { + session.release(i->getValue(), (++i)->getValue()); + } } }} // namespace qpid::broker |