summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/MessageHandlerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker/MessageHandlerImpl.cpp')
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp88
1 files changed, 48 insertions, 40 deletions
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 71100996e7..30b69e4654 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -23,6 +23,8 @@
#include "Connection.h"
#include "Broker.h"
#include "BrokerMessageMessage.h"
+#include "MessageAppendBody.h"
+#include "MessageTransferBody.h"
namespace qpid {
namespace broker {
@@ -33,23 +35,23 @@ using namespace framing;
// Message class method handlers
//
void
-MessageHandlerImpl::append(const MethodContext&,
- const string& /*reference*/,
+MessageHandlerImpl::append(const MethodContext& context,
+ const string& reference,
const string& /*bytes*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ references.get(reference).append(
+ boost::shared_polymorphic_downcast<MessageAppendBody>(
+ context.methodBody));
+ sendOk(context);
}
void
-MessageHandlerImpl::cancel( const MethodContext& context,
- const string& destination )
+MessageHandlerImpl::cancel(const MethodContext& context,
+ const string& destination )
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
channel.cancel(destination);
-
- connection.client->getMessageHandler()->ok(context);
+ sendOk(context);
}
void
@@ -61,10 +63,11 @@ MessageHandlerImpl::checkpoint(const MethodContext&,
}
void
-MessageHandlerImpl::close(const MethodContext&,
- const string& /*reference*/ )
+MessageHandlerImpl::close(const MethodContext& context,
+ const string& reference)
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ references.get(reference).close();
+ sendOk(context);
}
void
@@ -88,13 +91,16 @@ MessageHandlerImpl::consume(const MethodContext& context,
string newTag = destination;
channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
- connection.client->getMessageHandler()->ok(context);
+ sendOk(context);
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
}catch(ExclusiveAccessException& e){
- if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
- else throw ChannelException(403, "Access would violate previously granted exclusivity");
+ if(exclusive)
+ throw ChannelException(403, "Exclusive access cannot be granted");
+ else
+ throw ChannelException(
+ 403, "Access would violate previously granted exclusivity");
}
}
@@ -133,14 +139,15 @@ MessageHandlerImpl::offset(const MethodContext&,
void
MessageHandlerImpl::ok( const MethodContext& )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ // TODO aconway 2007-02-05: For HA, we can drop acked messages here.
}
void
-MessageHandlerImpl::open(const MethodContext&,
- const string& /*reference*/ )
+MessageHandlerImpl::open(const MethodContext& context,
+ const string& reference)
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ references.open(reference);
+ sendOk(context);
}
void
@@ -155,7 +162,7 @@ MessageHandlerImpl::qos(const MethodContext& context,
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- connection.client->getMessageHandler()->ok(context);
+ sendOk(context);
}
void
@@ -189,14 +196,14 @@ MessageHandlerImpl::transfer(const MethodContext& context,
u_int16_t /*ticket*/,
const string& /*destination*/,
bool /*redelivered*/,
- bool immediate,
+ bool /* immediate */,
u_int64_t /*ttl*/,
u_int8_t /*priority*/,
u_int64_t /*timestamp*/,
u_int8_t /*deliveryMode*/,
u_int64_t /*expiration*/,
const string& exchangeName,
- const string& routingKey,
+ const string& /* routingKey */,
const string& /*messageId*/,
const string& /*correlationId*/,
const string& /*replyTo*/,
@@ -208,27 +215,28 @@ MessageHandlerImpl::transfer(const MethodContext& context,
const string& /*securityToken*/,
const qpid::framing::FieldTable& /*applicationHeaders*/,
qpid::framing::Content body,
- bool mandatory )
+ bool /* mandatory */ )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- Exchange::shared_ptr exchange = exchangeName.empty() ?
- broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
- if(exchange){
- if (body.isInline()) {
- MessageMessage* msg =
- new MessageMessage(context.methodBody, exchangeName,
- routingKey, mandatory, immediate);
- channel.handlePublish(msg, exchange);
-
- connection.client->getMessageHandler()->ok(context);
- } else {
- // Don't handle reference content yet
- assert(body.isInline());
- }
- }else{
- throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+ MessageTransferBody::shared_ptr transfer =
+ boost::shared_polymorphic_downcast<MessageTransferBody>(
+ context.methodBody);
+ // Verify the exchange exists, will throw if not.
+ broker.getExchanges().get(exchangeName);
+ if (body.isInline()) {
+ MessageMessage* msg = new MessageMessage(transfer);
+ // FIXME aconway 2007-02-05: Remove exchange parameter.
+ // use shared_ptr for message.
+ channel.handlePublish(msg, Exchange::shared_ptr());
+ sendOk(context);
+ } else {
+ references.get(body.getValue()).transfer(transfer);
}
}
+
+void MessageHandlerImpl::sendOk(const MethodContext& context) {
+ connection.client->getMessageHandler()->ok(context);
+}
+
}} // namespace qpid::broker