diff options
author | Gordon Sim <gsim@apache.org> | 2007-10-04 12:01:28 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-10-04 12:01:28 +0000 |
commit | 44c50b719e5685bd7a25cdeb9c5c9202ed8dc29a (patch) | |
tree | 0227574c78bff90a747c573393616b17ecddfccf /cpp/src/qpid/broker/MessageHandlerImpl.cpp | |
parent | 3263908aff26ae784d0399b03d869bfc1b035ebd (diff) | |
download | qpid-python-44c50b719e5685bd7a25cdeb9c5c9202ed8dc29a.tar.gz |
Fix (and refactor) processing of ranges in message handler.
Alter release() to push released messages onto head in reverse order (todo: make this atomic instead)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@581869 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 100 |
1 files changed, 37 insertions, 63 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index 3d197e185d..b3880d86e5 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -29,13 +29,18 @@ #include <boost/format.hpp> #include <boost/cast.hpp> +#include <boost/bind.hpp> namespace qpid { namespace broker { using namespace framing; -MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerImpl(s) {} +MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : + HandlerImpl(s), + releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)), + rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)) + {} // // Message class method handlers @@ -86,52 +91,18 @@ MessageHandlerImpl::offset(uint64_t /*value*/ ) } void -MessageHandlerImpl::subscribe(uint16_t /*ticket*/, - const string& queueName, - const string& destination, - bool noLocal, - u_int8_t confirmMode, - u_int8_t acquireMode, - bool exclusive, - const framing::FieldTable& filter ) -{ - Queue::shared_ptr queue = state.getQueue(queueName); - if(!destination.empty() && state.exists(destination)) - throw ConnectionException(530, "Consumer tags must be unique"); - - string tag = destination; - //NB: am assuming pre-acquired = 0 as discussed on SIG list as is - //the previously expected behaviour - state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), - tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); - // Dispatch messages as there is now a consumer. - queue->requestDispatch(); -} - - -void MessageHandlerImpl::get(uint16_t /*ticket*/, - const string& queueName, - const string& destination, - bool noAck ) + const string& /*queueName*/, + const string& /*destination*/, + bool /*noAck*/ ) { - Queue::shared_ptr queue = state.getQueue(queueName); - - if (state.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ - //don't send any response... rely on execution completion - } else { - //temporarily disabled: - //client.empty(); - } + throw ConnectionException(540, "get no longer supported"); } void MessageHandlerImpl::empty() { - // Shouldn't ever receive this as it is a response to get - // which is never sent - // TODO astitcher 2007-02-09 What is the correct exception to throw here? - THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible"); + throw ConnectionException(540, "empty no longer supported"); } void @@ -151,6 +122,27 @@ MessageHandlerImpl::qos(uint32_t prefetchSize, } void +MessageHandlerImpl::subscribe(uint16_t /*ticket*/, + const string& queueName, + const string& destination, + bool noLocal, + u_int8_t confirmMode, + u_int8_t acquireMode, + bool exclusive, + const framing::FieldTable& filter ) +{ + Queue::shared_ptr queue = state.getQueue(queueName); + if(!destination.empty() && state.exists(destination)) + throw ConnectionException(530, "Consumer tags must be unique"); + + string tag = destination; + state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), + tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); + // Dispatch messages as there is now a consumer. + queue->requestDispatch(); +} + +void MessageHandlerImpl::recover(bool requeue) { state.recover(requeue); @@ -159,13 +151,7 @@ MessageHandlerImpl::recover(bool requeue) void MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const string& /*text*/ ) { - if (transfers.size() % 2) { //must be even number - throw InvalidArgumentException("Received odd number of elements in list of transfers"); - } - - for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - state.reject(i->getValue(), (++i)->getValue()); - } + transfers.processRanges(rejectOp); } void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value) @@ -209,29 +195,17 @@ void MessageHandlerImpl::stop(const std::string& destination) void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/) { - SequenceNumberSet results; - - if (transfers.size() % 2) { //must be even number - throw InvalidArgumentException("Received odd number of elements in list of transfers"); - } - - for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - state.acquire(i->getValue(), (++i)->getValue(), results); - } + //TODO: implement mode + SequenceNumberSet results; + transfers.processRanges(boost::bind(&SemanticState::acquire, &state, _1, _2, results)); results = results.condense(); getProxy().getMessage().acquired(results); } void MessageHandlerImpl::release(const SequenceNumberSet& transfers) { - if (transfers.size() % 2) { //must be even number - throw InvalidArgumentException("Received odd number of elements in list of transfers"); - } - - for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - state.release(i->getValue(), (++i)->getValue()); - } + transfers.processRanges(releaseOp); } }} // namespace qpid::broker |