diff options
author | Gordon Sim <gsim@apache.org> | 2007-10-04 12:01:28 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-10-04 12:01:28 +0000 |
commit | 44c50b719e5685bd7a25cdeb9c5c9202ed8dc29a (patch) | |
tree | 0227574c78bff90a747c573393616b17ecddfccf /cpp/src | |
parent | 3263908aff26ae784d0399b03d869bfc1b035ebd (diff) | |
download | qpid-python-44c50b719e5685bd7a25cdeb9c5c9202ed8dc29a.tar.gz |
Fix (and refactor) processing of ranges in message handler.
Alter release() to push released messages onto head in reverse order (todo: make this atomic instead)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@581869 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 100 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/framing/SequenceNumberSet.h | 16 |
5 files changed, 65 insertions, 64 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 21944cd553..7312cd15b0 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -75,6 +75,7 @@ class DeliveryRecord{ friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&); }; +typedef std::list<DeliveryRecord> DeliveryRecords; typedef std::list<DeliveryRecord>::iterator ack_iterator; struct AckRange diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index 3d197e185d..b3880d86e5 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -29,13 +29,18 @@ #include <boost/format.hpp> #include <boost/cast.hpp> +#include <boost/bind.hpp> namespace qpid { namespace broker { using namespace framing; -MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerImpl(s) {} +MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : + HandlerImpl(s), + releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)), + rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)) + {} // // Message class method handlers @@ -86,52 +91,18 @@ MessageHandlerImpl::offset(uint64_t /*value*/ ) } void -MessageHandlerImpl::subscribe(uint16_t /*ticket*/, - const string& queueName, - const string& destination, - bool noLocal, - u_int8_t confirmMode, - u_int8_t acquireMode, - bool exclusive, - const framing::FieldTable& filter ) -{ - Queue::shared_ptr queue = state.getQueue(queueName); - if(!destination.empty() && state.exists(destination)) - throw ConnectionException(530, "Consumer tags must be unique"); - - string tag = destination; - //NB: am assuming pre-acquired = 0 as discussed on SIG list as is - //the previously expected behaviour - state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), - tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); - // Dispatch messages as there is now a consumer. - queue->requestDispatch(); -} - - -void MessageHandlerImpl::get(uint16_t /*ticket*/, - const string& queueName, - const string& destination, - bool noAck ) + const string& /*queueName*/, + const string& /*destination*/, + bool /*noAck*/ ) { - Queue::shared_ptr queue = state.getQueue(queueName); - - if (state.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ - //don't send any response... rely on execution completion - } else { - //temporarily disabled: - //client.empty(); - } + throw ConnectionException(540, "get no longer supported"); } void MessageHandlerImpl::empty() { - // Shouldn't ever receive this as it is a response to get - // which is never sent - // TODO astitcher 2007-02-09 What is the correct exception to throw here? - THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible"); + throw ConnectionException(540, "empty no longer supported"); } void @@ -151,6 +122,27 @@ MessageHandlerImpl::qos(uint32_t prefetchSize, } void +MessageHandlerImpl::subscribe(uint16_t /*ticket*/, + const string& queueName, + const string& destination, + bool noLocal, + u_int8_t confirmMode, + u_int8_t acquireMode, + bool exclusive, + const framing::FieldTable& filter ) +{ + Queue::shared_ptr queue = state.getQueue(queueName); + if(!destination.empty() && state.exists(destination)) + throw ConnectionException(530, "Consumer tags must be unique"); + + string tag = destination; + state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), + tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); + // Dispatch messages as there is now a consumer. + queue->requestDispatch(); +} + +void MessageHandlerImpl::recover(bool requeue) { state.recover(requeue); @@ -159,13 +151,7 @@ MessageHandlerImpl::recover(bool requeue) void MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const string& /*text*/ ) { - if (transfers.size() % 2) { //must be even number - throw InvalidArgumentException("Received odd number of elements in list of transfers"); - } - - for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - state.reject(i->getValue(), (++i)->getValue()); - } + transfers.processRanges(rejectOp); } void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value) @@ -209,29 +195,17 @@ void MessageHandlerImpl::stop(const std::string& destination) void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/) { - SequenceNumberSet results; - - if (transfers.size() % 2) { //must be even number - throw InvalidArgumentException("Received odd number of elements in list of transfers"); - } - - for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - state.acquire(i->getValue(), (++i)->getValue(), results); - } + //TODO: implement mode + SequenceNumberSet results; + transfers.processRanges(boost::bind(&SemanticState::acquire, &state, _1, _2, results)); results = results.condense(); getProxy().getMessage().acquired(results); } void MessageHandlerImpl::release(const SequenceNumberSet& transfers) { - if (transfers.size() % 2) { //must be even number - throw InvalidArgumentException("Received odd number of elements in list of transfers"); - } - - for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - state.release(i->getValue(), (++i)->getValue()); - } + transfers.processRanges(releaseOp); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h index d90159d4f7..dd70f35dbb 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.h +++ b/cpp/src/qpid/broker/MessageHandlerImpl.h @@ -25,6 +25,8 @@ #include "qpid/framing/AMQP_ClientProxy.h" #include "HandlerImpl.h" +#include <boost/function.hpp> + namespace qpid { namespace broker { @@ -36,6 +38,10 @@ class MessageHandlerImpl : public framing::AMQP_ServerOperations::MessageHandler, public HandlerImpl { + typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; + RangedOperation releaseOp; + RangedOperation rejectOp; + public: MessageHandlerImpl(SemanticState&); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 11bce282eb..e435ed0522 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -572,7 +572,11 @@ void SemanticState::release(DeliveryId first, DeliveryId last) { Mutex::ScopedLock locker(deliveryLock); AckRange range = findRange(first, last); - for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release)); + //release results in the message being added to the head so want + //to release in reverse order to keep the original transfer order + DeliveryRecords::reverse_iterator start(range.end); + DeliveryRecords::reverse_iterator end(range.start); + for_each(start, end, mem_fun_ref(&DeliveryRecord::release)); } void SemanticState::reject(DeliveryId first, DeliveryId last) diff --git a/cpp/src/qpid/framing/SequenceNumberSet.h b/cpp/src/qpid/framing/SequenceNumberSet.h index 7643d68071..f9d0cc1fd4 100644 --- a/cpp/src/qpid/framing/SequenceNumberSet.h +++ b/cpp/src/qpid/framing/SequenceNumberSet.h @@ -26,6 +26,7 @@ #include "amqp_types.h" #include "Buffer.h" #include "SequenceNumber.h" +#include "qpid/framing/reply_exceptions.h" namespace qpid { namespace framing { @@ -41,9 +42,24 @@ public: uint32_t encodedSize() const; SequenceNumberSet condense() const; + template <class T> + void processRanges(T t) const + { + if (size() % 2) { //must be even number + throw InvalidArgumentException("SequenceNumberSet contains odd number of elements"); + } + + for (SequenceNumberSet::const_iterator i = begin(); i != end(); i++) { + SequenceNumber first = i->getValue(); + SequenceNumber last = (++i)->getValue(); + t(first, last); + } + } + friend std::ostream& operator<<(std::ostream&, const SequenceNumberSet&); }; + }} // namespace qpid::framing |