diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 42 |
1 files changed, 34 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index 7529e3bb39..d9b91c1617 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -25,6 +25,7 @@ #include "MessageDelivery.h" #include "qpid/framing/MessageAppendBody.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/reply_exceptions.h" #include "BrokerAdapter.h" #include <boost/format.hpp> @@ -92,7 +93,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, const string& destination, bool noLocal, u_int8_t confirmMode, - u_int8_t acquireMode,//TODO: implement acquire modes + u_int8_t acquireMode, bool exclusive, const framing::FieldTable& filter ) { @@ -101,8 +102,10 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; + //NB: am assuming pre-acquired = 0 as discussed on SIG list as is + //the previously expected behaviour session.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), - tag, queue, noLocal, confirmMode == 1, exclusive, &filter); + tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } @@ -156,9 +159,15 @@ MessageHandlerImpl::recover(bool requeue) } void -MessageHandlerImpl::reject(const SequenceNumberSet& /*transfers*/, uint16_t /*code*/, const string& /*text*/ ) +MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const string& /*text*/ ) { - //TODO: implement + if (transfers.size() % 2) { //must be even number + throw InvalidArgumentException("Received odd number of elements in list of transfers"); + } + + for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { + session.reject(i->getValue(), (++i)->getValue()); + } } void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value) @@ -200,14 +209,31 @@ void MessageHandlerImpl::stop(const std::string& destination) session.stop(destination); } -void MessageHandlerImpl::acquire(const SequenceNumberSet& /*transfers*/, u_int8_t /*mode*/) +void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/) { - throw ConnectionException(540, "Not yet implemented"); + SequenceNumberSet results; + + if (transfers.size() % 2) { //must be even number + throw InvalidArgumentException("Received odd number of elements in list of transfers"); + } + + for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { + session.acquire(i->getValue(), (++i)->getValue(), results); + } + + results = results.condense(); + client.acquired(results); } -void MessageHandlerImpl::release(const SequenceNumberSet& /*transfers*/) +void MessageHandlerImpl::release(const SequenceNumberSet& transfers) { - throw ConnectionException(540, "Not yet implemented"); + if (transfers.size() % 2) { //must be even number + throw InvalidArgumentException("Received odd number of elements in list of transfers"); + } + + for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { + session.release(i->getValue(), (++i)->getValue()); + } } }} // namespace qpid::broker |