summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageHandlerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp42
1 files changed, 34 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 7529e3bb39..d9b91c1617 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -25,6 +25,7 @@
#include "MessageDelivery.h"
#include "qpid/framing/MessageAppendBody.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
#include "BrokerAdapter.h"
#include <boost/format.hpp>
@@ -92,7 +93,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
const string& destination,
bool noLocal,
u_int8_t confirmMode,
- u_int8_t acquireMode,//TODO: implement acquire modes
+ u_int8_t acquireMode,
bool exclusive,
const framing::FieldTable& filter )
{
@@ -101,8 +102,10 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
+ //NB: am assuming pre-acquired = 0 as discussed on SIG list as is
+ //the previously expected behaviour
session.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
- tag, queue, noLocal, confirmMode == 1, exclusive, &filter);
+ tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
@@ -156,9 +159,15 @@ MessageHandlerImpl::recover(bool requeue)
}
void
-MessageHandlerImpl::reject(const SequenceNumberSet& /*transfers*/, uint16_t /*code*/, const string& /*text*/ )
+MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const string& /*text*/ )
{
- //TODO: implement
+ if (transfers.size() % 2) { //must be even number
+ throw InvalidArgumentException("Received odd number of elements in list of transfers");
+ }
+
+ for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
+ session.reject(i->getValue(), (++i)->getValue());
+ }
}
void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value)
@@ -200,14 +209,31 @@ void MessageHandlerImpl::stop(const std::string& destination)
session.stop(destination);
}
-void MessageHandlerImpl::acquire(const SequenceNumberSet& /*transfers*/, u_int8_t /*mode*/)
+void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
{
- throw ConnectionException(540, "Not yet implemented");
+ SequenceNumberSet results;
+
+ if (transfers.size() % 2) { //must be even number
+ throw InvalidArgumentException("Received odd number of elements in list of transfers");
+ }
+
+ for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
+ session.acquire(i->getValue(), (++i)->getValue(), results);
+ }
+
+ results = results.condense();
+ client.acquired(results);
}
-void MessageHandlerImpl::release(const SequenceNumberSet& /*transfers*/)
+void MessageHandlerImpl::release(const SequenceNumberSet& transfers)
{
- throw ConnectionException(540, "Not yet implemented");
+ if (transfers.size() % 2) { //must be even number
+ throw InvalidArgumentException("Received odd number of elements in list of transfers");
+ }
+
+ for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
+ session.release(i->getValue(), (++i)->getValue());
+ }
}
}} // namespace qpid::broker