diff options
author | Alan Conway <aconway@apache.org> | 2007-02-06 21:38:30 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-02-06 21:38:30 +0000 |
commit | 877e7ae368d4320bd60ba5750be207a5cac13f43 (patch) | |
tree | 9f0777c5e6069b537e13d1c1f88cc08560f47de3 /cpp/lib/broker/MessageHandlerImpl.cpp | |
parent | a0c19714ccb547c401e598189a36573ac750e809 (diff) | |
download | qpid-python-877e7ae368d4320bd60ba5750be207a5cac13f43.tar.gz |
* cpp/lib/broker/BrokerQueue.cpp (): Centralized exceptions.
* cpp/lib/broker/BrokerAdapter.cpp (consume): Moved exceptions to Queue
* cpp/lib/broker/BrokerChannel.cpp (consume): Moved exceptions to Queue
* cpp/lib/broker/BrokerMessageBase.cpp:
- Added getApplicationHeaders.
* cpp/lib/broker/BrokerMessageMessage.cpp:
- Fixed exchangeName/destination mix up.
- Removed redundant constructor.
- Added getApplicationHeaders
* cpp/lib/broker/MessageHandlerImpl.cpp:
- Added missing acknowledgements
- Replaced assert(0) with throw "unimplemented".
- Moved exchange existence exceptions to ExchangeRegistry
- Handle transfers with references.
* cpp/tests/Makefile.am (check): Don't run tests unless all libs built OK.
* cpp/tests/python_tests: Re-enabled python tests. Not all passing.
* python/tests/message.py (MessageTests.test_get): Replace get-ok with ok.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@504305 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/MessageHandlerImpl.cpp')
-rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 122 |
1 files changed, 57 insertions, 65 deletions
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index e19afd0e67..5f5e9b84e7 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -1,4 +1,3 @@ - /* * * Copyright (c) 2006 The Apache Software Foundation @@ -17,6 +16,7 @@ * */ +#include "QpidError.h" #include "MessageHandlerImpl.h" #include "BrokerChannel.h" #include "FramingContent.h" @@ -31,6 +31,11 @@ namespace broker { using namespace framing; +MessageHandlerImpl::MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) + : channel(ch), connection(c), broker(b), references(ch), + client(connection.client->getMessage()) +{} + // // Message class method handlers // @@ -42,7 +47,7 @@ MessageHandlerImpl::append(const MethodContext& context, references.get(reference).append( boost::shared_polymorphic_downcast<MessageAppendBody>( context.methodBody)); - sendOk(context); + client.ok(context); } @@ -51,7 +56,7 @@ MessageHandlerImpl::cancel(const MethodContext& context, const string& destination ) { channel.cancel(destination); - sendOk(context); + client.ok(context); } void @@ -59,7 +64,8 @@ MessageHandlerImpl::checkpoint(const MethodContext&, const string& /*reference*/, const string& /*identifier*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + // FIXME astitcher 2007-01-11: 0-9 feature + THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); } void @@ -67,7 +73,7 @@ MessageHandlerImpl::close(const MethodContext& context, const string& reference) { references.get(reference).close(); - sendOk(context); + client.ok(context); } void @@ -80,32 +86,23 @@ MessageHandlerImpl::consume(const MethodContext& context, bool exclusive, const qpid::framing::FieldTable& filter ) { - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - if(!destination.empty() && channel.exists(destination)){ + Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); + if(!destination.empty() && channel.exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); - } - - try{ - string newTag = destination; - channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - - sendOk(context); - - //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); - }catch(ExclusiveAccessException& e){ - if(exclusive) - throw ChannelException(403, "Exclusive access cannot be granted"); - else - throw ChannelException( - 403, "Access would violate previously granted exclusivity"); - } + string tag = destination; + channel.consume( + tag, queue, !noAck, exclusive, + noLocal ? &connection : 0, &filter); + client.ok(context); + // Dispatch messages as there is now a consumer. + queue->dispatch(); } void MessageHandlerImpl::empty( const MethodContext& ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + // FIXME astitcher 2007-01-11: 0-9 feature + THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); } void @@ -121,17 +118,18 @@ MessageHandlerImpl::get( const MethodContext& context, connection.getQueue(queueName, context.channel->getId()); // FIXME: get is probably Basic specific - if(!channel.get(queue, !noAck)){ - connection.client->getMessageHandler()->empty(context); - } - + if(channel.get(queue, !noAck)) + client.ok(context); + else + client.empty(context); } void MessageHandlerImpl::offset(const MethodContext&, u_int64_t /*value*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + // FIXME astitcher 2007-01-11: 0-9 feature + THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); } void @@ -145,7 +143,7 @@ MessageHandlerImpl::open(const MethodContext& context, const string& reference) { references.open(reference); - sendOk(context); + client.ok(context); } void @@ -157,18 +155,17 @@ MessageHandlerImpl::qos(const MethodContext& context, //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - - sendOk(context); + client.ok(context); } void -MessageHandlerImpl::recover(const MethodContext&, - bool requeue ) +MessageHandlerImpl::recover(const MethodContext& context, + bool requeue) { - //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - + THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented"); + // FIXME aconway 2007-02-06: Call to recover hangs client. channel.recover(requeue); - + client.ok(context); } void @@ -176,7 +173,8 @@ MessageHandlerImpl::reject(const MethodContext&, u_int16_t /*code*/, const string& /*text*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + // FIXME astitcher 2007-01-11: 0-9 feature + THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); } void @@ -184,22 +182,23 @@ MessageHandlerImpl::resume(const MethodContext&, const string& /*reference*/, const string& /*identifier*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + // FIXME astitcher 2007-01-11: 0-9 feature + THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); } void MessageHandlerImpl::transfer(const MethodContext& context, u_int16_t /*ticket*/, - const string& /*destination*/, + const string& destination, bool /*redelivered*/, - bool immediate, + bool /*immediate*/, u_int64_t /*ttl*/, u_int8_t /*priority*/, u_int64_t /*timestamp*/, u_int8_t /*deliveryMode*/, u_int64_t /*expiration*/, - const string& exchangeName, - const string& routingKey, + const string& /*exchangeName*/, + const string& /*routingKey*/, const string& /*messageId*/, const string& /*correlationId*/, const string& /*replyTo*/, @@ -211,30 +210,23 @@ MessageHandlerImpl::transfer(const MethodContext& context, const string& /*securityToken*/, const qpid::framing::FieldTable& /*applicationHeaders*/, qpid::framing::Content body, - bool mandatory) + bool /*mandatory*/) { - Exchange::shared_ptr exchange = exchangeName.empty() ? - broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); - boost::shared_ptr<MessageTransferBody> transfer(boost::dynamic_pointer_cast<MessageTransferBody>(context.methodBody)); - if(exchange){ - if (body.isInline()) { - Message::shared_ptr msg(new MessageMessage(transfer, exchangeName, - routingKey, mandatory, immediate)); - - channel.handleInlineTransfer(msg, exchange); - - connection.client->getMessageHandler()->ok(context); - } else { - references.get(body.getValue()).transfer(transfer); - } - }else{ - throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + Exchange::shared_ptr exchange( + broker.getExchanges().get(destination)); + MessageTransferBody::shared_ptr transfer( + boost::shared_polymorphic_downcast<MessageTransferBody>( + context.methodBody)); + if (body.isInline()) { + Message::shared_ptr msg(new MessageMessage(transfer)); + channel.handleInlineTransfer(msg, exchange); + } + else { + // Add to reference. + references.get(body.getValue()).transfer(transfer); } + client.ok(context); } -void MessageHandlerImpl::sendOk(const MethodContext& context) { - connection.client->getMessageHandler()->ok(context); -} - }} // namespace qpid::broker |