summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-10-04 12:01:28 +0000
committerGordon Sim <gsim@apache.org>2007-10-04 12:01:28 +0000
commit44c50b719e5685bd7a25cdeb9c5c9202ed8dc29a (patch)
tree0227574c78bff90a747c573393616b17ecddfccf /cpp/src
parent3263908aff26ae784d0399b03d869bfc1b035ebd (diff)
downloadqpid-python-44c50b719e5685bd7a25cdeb9c5c9202ed8dc29a.tar.gz
Fix (and refactor) processing of ranges in message handler.
Alter release() to push released messages onto head in reverse order (todo: make this atomic instead) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@581869 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h1
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp100
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.h6
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp6
-rw-r--r--cpp/src/qpid/framing/SequenceNumberSet.h16
5 files changed, 65 insertions, 64 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index 21944cd553..7312cd15b0 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -75,6 +75,7 @@ class DeliveryRecord{
friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
};
+typedef std::list<DeliveryRecord> DeliveryRecords;
typedef std::list<DeliveryRecord>::iterator ack_iterator;
struct AckRange
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 3d197e185d..b3880d86e5 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -29,13 +29,18 @@
#include <boost/format.hpp>
#include <boost/cast.hpp>
+#include <boost/bind.hpp>
namespace qpid {
namespace broker {
using namespace framing;
-MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerImpl(s) {}
+MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
+ HandlerImpl(s),
+ releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)),
+ rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2))
+ {}
//
// Message class method handlers
@@ -86,52 +91,18 @@ MessageHandlerImpl::offset(uint64_t /*value*/ )
}
void
-MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
- const string& queueName,
- const string& destination,
- bool noLocal,
- u_int8_t confirmMode,
- u_int8_t acquireMode,
- bool exclusive,
- const framing::FieldTable& filter )
-{
- Queue::shared_ptr queue = state.getQueue(queueName);
- if(!destination.empty() && state.exists(destination))
- throw ConnectionException(530, "Consumer tags must be unique");
-
- string tag = destination;
- //NB: am assuming pre-acquired = 0 as discussed on SIG list as is
- //the previously expected behaviour
- state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
- tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
- // Dispatch messages as there is now a consumer.
- queue->requestDispatch();
-}
-
-
-void
MessageHandlerImpl::get(uint16_t /*ticket*/,
- const string& queueName,
- const string& destination,
- bool noAck )
+ const string& /*queueName*/,
+ const string& /*destination*/,
+ bool /*noAck*/ )
{
- Queue::shared_ptr queue = state.getQueue(queueName);
-
- if (state.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
- //don't send any response... rely on execution completion
- } else {
- //temporarily disabled:
- //client.empty();
- }
+ throw ConnectionException(540, "get no longer supported");
}
void
MessageHandlerImpl::empty()
{
- // Shouldn't ever receive this as it is a response to get
- // which is never sent
- // TODO astitcher 2007-02-09 What is the correct exception to throw here?
- THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible");
+ throw ConnectionException(540, "empty no longer supported");
}
void
@@ -151,6 +122,27 @@ MessageHandlerImpl::qos(uint32_t prefetchSize,
}
void
+MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
+ const string& queueName,
+ const string& destination,
+ bool noLocal,
+ u_int8_t confirmMode,
+ u_int8_t acquireMode,
+ bool exclusive,
+ const framing::FieldTable& filter )
+{
+ Queue::shared_ptr queue = state.getQueue(queueName);
+ if(!destination.empty() && state.exists(destination))
+ throw ConnectionException(530, "Consumer tags must be unique");
+
+ string tag = destination;
+ state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
+ tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
+ // Dispatch messages as there is now a consumer.
+ queue->requestDispatch();
+}
+
+void
MessageHandlerImpl::recover(bool requeue)
{
state.recover(requeue);
@@ -159,13 +151,7 @@ MessageHandlerImpl::recover(bool requeue)
void
MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const string& /*text*/ )
{
- if (transfers.size() % 2) { //must be even number
- throw InvalidArgumentException("Received odd number of elements in list of transfers");
- }
-
- for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- state.reject(i->getValue(), (++i)->getValue());
- }
+ transfers.processRanges(rejectOp);
}
void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value)
@@ -209,29 +195,17 @@ void MessageHandlerImpl::stop(const std::string& destination)
void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
{
- SequenceNumberSet results;
-
- if (transfers.size() % 2) { //must be even number
- throw InvalidArgumentException("Received odd number of elements in list of transfers");
- }
-
- for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- state.acquire(i->getValue(), (++i)->getValue(), results);
- }
+ //TODO: implement mode
+ SequenceNumberSet results;
+ transfers.processRanges(boost::bind(&SemanticState::acquire, &state, _1, _2, results));
results = results.condense();
getProxy().getMessage().acquired(results);
}
void MessageHandlerImpl::release(const SequenceNumberSet& transfers)
{
- if (transfers.size() % 2) { //must be even number
- throw InvalidArgumentException("Received odd number of elements in list of transfers");
- }
-
- for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- state.release(i->getValue(), (++i)->getValue());
- }
+ transfers.processRanges(releaseOp);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h
index d90159d4f7..dd70f35dbb 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.h
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.h
@@ -25,6 +25,8 @@
#include "qpid/framing/AMQP_ClientProxy.h"
#include "HandlerImpl.h"
+#include <boost/function.hpp>
+
namespace qpid {
namespace broker {
@@ -36,6 +38,10 @@ class MessageHandlerImpl :
public framing::AMQP_ServerOperations::MessageHandler,
public HandlerImpl
{
+ typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
+ RangedOperation releaseOp;
+ RangedOperation rejectOp;
+
public:
MessageHandlerImpl(SemanticState&);
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 11bce282eb..e435ed0522 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -572,7 +572,11 @@ void SemanticState::release(DeliveryId first, DeliveryId last)
{
Mutex::ScopedLock locker(deliveryLock);
AckRange range = findRange(first, last);
- for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release));
+ //release results in the message being added to the head so want
+ //to release in reverse order to keep the original transfer order
+ DeliveryRecords::reverse_iterator start(range.end);
+ DeliveryRecords::reverse_iterator end(range.start);
+ for_each(start, end, mem_fun_ref(&DeliveryRecord::release));
}
void SemanticState::reject(DeliveryId first, DeliveryId last)
diff --git a/cpp/src/qpid/framing/SequenceNumberSet.h b/cpp/src/qpid/framing/SequenceNumberSet.h
index 7643d68071..f9d0cc1fd4 100644
--- a/cpp/src/qpid/framing/SequenceNumberSet.h
+++ b/cpp/src/qpid/framing/SequenceNumberSet.h
@@ -26,6 +26,7 @@
#include "amqp_types.h"
#include "Buffer.h"
#include "SequenceNumber.h"
+#include "qpid/framing/reply_exceptions.h"
namespace qpid {
namespace framing {
@@ -41,9 +42,24 @@ public:
uint32_t encodedSize() const;
SequenceNumberSet condense() const;
+ template <class T>
+ void processRanges(T t) const
+ {
+ if (size() % 2) { //must be even number
+ throw InvalidArgumentException("SequenceNumberSet contains odd number of elements");
+ }
+
+ for (SequenceNumberSet::const_iterator i = begin(); i != end(); i++) {
+ SequenceNumber first = i->getValue();
+ SequenceNumber last = (++i)->getValue();
+ t(first, last);
+ }
+ }
+
friend std::ostream& operator<<(std::ostream&, const SequenceNumberSet&);
};
+
}} // namespace qpid::framing