diff options
Diffstat (limited to 'cpp/lib/broker/MessageHandlerImpl.cpp')
-rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 88 |
1 files changed, 48 insertions, 40 deletions
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 71100996e7..30b69e4654 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -23,6 +23,8 @@ #include "Connection.h" #include "Broker.h" #include "BrokerMessageMessage.h" +#include "MessageAppendBody.h" +#include "MessageTransferBody.h" namespace qpid { namespace broker { @@ -33,23 +35,23 @@ using namespace framing; // Message class method handlers // void -MessageHandlerImpl::append(const MethodContext&, - const string& /*reference*/, +MessageHandlerImpl::append(const MethodContext& context, + const string& reference, const string& /*bytes*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + references.get(reference).append( + boost::shared_polymorphic_downcast<MessageAppendBody>( + context.methodBody)); + sendOk(context); } void -MessageHandlerImpl::cancel( const MethodContext& context, - const string& destination ) +MessageHandlerImpl::cancel(const MethodContext& context, + const string& destination ) { - //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - channel.cancel(destination); - - connection.client->getMessageHandler()->ok(context); + sendOk(context); } void @@ -61,10 +63,11 @@ MessageHandlerImpl::checkpoint(const MethodContext&, } void -MessageHandlerImpl::close(const MethodContext&, - const string& /*reference*/ ) +MessageHandlerImpl::close(const MethodContext& context, + const string& reference) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + references.get(reference).close(); + sendOk(context); } void @@ -88,13 +91,16 @@ MessageHandlerImpl::consume(const MethodContext& context, string newTag = destination; channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - connection.client->getMessageHandler()->ok(context); + sendOk(context); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); }catch(ExclusiveAccessException& e){ - if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); - else throw ChannelException(403, "Access would violate previously granted exclusivity"); + if(exclusive) + throw ChannelException(403, "Exclusive access cannot be granted"); + else + throw ChannelException( + 403, "Access would violate previously granted exclusivity"); } } @@ -133,14 +139,15 @@ MessageHandlerImpl::offset(const MethodContext&, void MessageHandlerImpl::ok( const MethodContext& ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + // TODO aconway 2007-02-05: For HA, we can drop acked messages here. } void -MessageHandlerImpl::open(const MethodContext&, - const string& /*reference*/ ) +MessageHandlerImpl::open(const MethodContext& context, + const string& reference) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + references.open(reference); + sendOk(context); } void @@ -155,7 +162,7 @@ MessageHandlerImpl::qos(const MethodContext& context, channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - connection.client->getMessageHandler()->ok(context); + sendOk(context); } void @@ -189,14 +196,14 @@ MessageHandlerImpl::transfer(const MethodContext& context, u_int16_t /*ticket*/, const string& /*destination*/, bool /*redelivered*/, - bool immediate, + bool /* immediate */, u_int64_t /*ttl*/, u_int8_t /*priority*/, u_int64_t /*timestamp*/, u_int8_t /*deliveryMode*/, u_int64_t /*expiration*/, const string& exchangeName, - const string& routingKey, + const string& /* routingKey */, const string& /*messageId*/, const string& /*correlationId*/, const string& /*replyTo*/, @@ -208,27 +215,28 @@ MessageHandlerImpl::transfer(const MethodContext& context, const string& /*securityToken*/, const qpid::framing::FieldTable& /*applicationHeaders*/, qpid::framing::Content body, - bool mandatory ) + bool /* mandatory */ ) { //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - Exchange::shared_ptr exchange = exchangeName.empty() ? - broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); - if(exchange){ - if (body.isInline()) { - MessageMessage* msg = - new MessageMessage(context.methodBody, exchangeName, - routingKey, mandatory, immediate); - channel.handlePublish(msg, exchange); - - connection.client->getMessageHandler()->ok(context); - } else { - // Don't handle reference content yet - assert(body.isInline()); - } - }else{ - throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + MessageTransferBody::shared_ptr transfer = + boost::shared_polymorphic_downcast<MessageTransferBody>( + context.methodBody); + // Verify the exchange exists, will throw if not. + broker.getExchanges().get(exchangeName); + if (body.isInline()) { + MessageMessage* msg = new MessageMessage(transfer); + // FIXME aconway 2007-02-05: Remove exchange parameter. + // use shared_ptr for message. + channel.handlePublish(msg, Exchange::shared_ptr()); + sendOk(context); + } else { + references.get(body.getValue()).transfer(transfer); } } + +void MessageHandlerImpl::sendOk(const MethodContext& context) { + connection.client->getMessageHandler()->ok(context); +} + }} // namespace qpid::broker |