summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageHandlerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp47
1 files changed, 45 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index da57439e21..c728a800ab 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -27,6 +27,8 @@
#include "qpid/framing/MessageTransferBody.h"
#include "BrokerAdapter.h"
+#include <boost/format.hpp>
+
namespace qpid {
namespace broker {
@@ -96,7 +98,7 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/,
if(!destination.empty() && channel.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
- channel.consume(MessageMessage::getToken(destination), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
+ channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, !noAck, exclusive, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
@@ -130,7 +132,7 @@ MessageHandlerImpl::empty()
void
MessageHandlerImpl::ok()
{
- channel.ack(adapter.getFirstAckRequest(), adapter.getLastAckRequest());
+ throw ConnectionException(540, "Message.Ok no longer supported");
}
void
@@ -171,4 +173,45 @@ MessageHandlerImpl::transfer(const framing::MethodContext& context)
}
+
+void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value)
+{
+
+ if (unit == 0) {
+ //message
+ channel.addMessageCredit(destination, value);
+ } else if (unit == 1) {
+ //bytes
+ channel.addByteCredit(destination, value);
+ } else {
+ //unknown
+ throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
+ }
+
+}
+
+void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
+{
+ if (mode == 0) {
+ //credit
+ channel.setCreditMode(destination);
+ } else if (mode == 1) {
+ //window
+ channel.setWindowMode(destination);
+ } else{
+ throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);
+ }
+}
+
+void MessageHandlerImpl::flush(const std::string& destination)
+{
+ channel.flush(destination);
+}
+
+void MessageHandlerImpl::stop(const std::string& destination)
+{
+ channel.stop(destination);
+}
+
+
}} // namespace qpid::broker