summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageHandlerImpl.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-10-04 12:01:28 +0000
committerGordon Sim <gsim@apache.org>2007-10-04 12:01:28 +0000
commit44c50b719e5685bd7a25cdeb9c5c9202ed8dc29a (patch)
tree0227574c78bff90a747c573393616b17ecddfccf /cpp/src/qpid/broker/MessageHandlerImpl.cpp
parent3263908aff26ae784d0399b03d869bfc1b035ebd (diff)
downloadqpid-python-44c50b719e5685bd7a25cdeb9c5c9202ed8dc29a.tar.gz
Fix (and refactor) processing of ranges in message handler.
Alter release() to push released messages onto head in reverse order (todo: make this atomic instead) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@581869 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp100
1 files changed, 37 insertions, 63 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 3d197e185d..b3880d86e5 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -29,13 +29,18 @@
#include <boost/format.hpp>
#include <boost/cast.hpp>
+#include <boost/bind.hpp>
namespace qpid {
namespace broker {
using namespace framing;
-MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerImpl(s) {}
+MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
+ HandlerImpl(s),
+ releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)),
+ rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2))
+ {}
//
// Message class method handlers
@@ -86,52 +91,18 @@ MessageHandlerImpl::offset(uint64_t /*value*/ )
}
void
-MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
- const string& queueName,
- const string& destination,
- bool noLocal,
- u_int8_t confirmMode,
- u_int8_t acquireMode,
- bool exclusive,
- const framing::FieldTable& filter )
-{
- Queue::shared_ptr queue = state.getQueue(queueName);
- if(!destination.empty() && state.exists(destination))
- throw ConnectionException(530, "Consumer tags must be unique");
-
- string tag = destination;
- //NB: am assuming pre-acquired = 0 as discussed on SIG list as is
- //the previously expected behaviour
- state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
- tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
- // Dispatch messages as there is now a consumer.
- queue->requestDispatch();
-}
-
-
-void
MessageHandlerImpl::get(uint16_t /*ticket*/,
- const string& queueName,
- const string& destination,
- bool noAck )
+ const string& /*queueName*/,
+ const string& /*destination*/,
+ bool /*noAck*/ )
{
- Queue::shared_ptr queue = state.getQueue(queueName);
-
- if (state.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
- //don't send any response... rely on execution completion
- } else {
- //temporarily disabled:
- //client.empty();
- }
+ throw ConnectionException(540, "get no longer supported");
}
void
MessageHandlerImpl::empty()
{
- // Shouldn't ever receive this as it is a response to get
- // which is never sent
- // TODO astitcher 2007-02-09 What is the correct exception to throw here?
- THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible");
+ throw ConnectionException(540, "empty no longer supported");
}
void
@@ -151,6 +122,27 @@ MessageHandlerImpl::qos(uint32_t prefetchSize,
}
void
+MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
+ const string& queueName,
+ const string& destination,
+ bool noLocal,
+ u_int8_t confirmMode,
+ u_int8_t acquireMode,
+ bool exclusive,
+ const framing::FieldTable& filter )
+{
+ Queue::shared_ptr queue = state.getQueue(queueName);
+ if(!destination.empty() && state.exists(destination))
+ throw ConnectionException(530, "Consumer tags must be unique");
+
+ string tag = destination;
+ state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
+ tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
+ // Dispatch messages as there is now a consumer.
+ queue->requestDispatch();
+}
+
+void
MessageHandlerImpl::recover(bool requeue)
{
state.recover(requeue);
@@ -159,13 +151,7 @@ MessageHandlerImpl::recover(bool requeue)
void
MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const string& /*text*/ )
{
- if (transfers.size() % 2) { //must be even number
- throw InvalidArgumentException("Received odd number of elements in list of transfers");
- }
-
- for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- state.reject(i->getValue(), (++i)->getValue());
- }
+ transfers.processRanges(rejectOp);
}
void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value)
@@ -209,29 +195,17 @@ void MessageHandlerImpl::stop(const std::string& destination)
void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
{
- SequenceNumberSet results;
-
- if (transfers.size() % 2) { //must be even number
- throw InvalidArgumentException("Received odd number of elements in list of transfers");
- }
-
- for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- state.acquire(i->getValue(), (++i)->getValue(), results);
- }
+ //TODO: implement mode
+ SequenceNumberSet results;
+ transfers.processRanges(boost::bind(&SemanticState::acquire, &state, _1, _2, results));
results = results.condense();
getProxy().getMessage().acquired(results);
}
void MessageHandlerImpl::release(const SequenceNumberSet& transfers)
{
- if (transfers.size() % 2) { //must be even number
- throw InvalidArgumentException("Received odd number of elements in list of transfers");
- }
-
- for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- state.release(i->getValue(), (++i)->getValue());
- }
+ transfers.processRanges(releaseOp);
}
}} // namespace qpid::broker