diff options
author | Alan Conway <aconway@apache.org> | 2007-02-06 21:38:30 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-02-06 21:38:30 +0000 |
commit | 877e7ae368d4320bd60ba5750be207a5cac13f43 (patch) | |
tree | 9f0777c5e6069b537e13d1c1f88cc08560f47de3 /cpp | |
parent | a0c19714ccb547c401e598189a36573ac750e809 (diff) | |
download | qpid-python-877e7ae368d4320bd60ba5750be207a5cac13f43.tar.gz |
* cpp/lib/broker/BrokerQueue.cpp (): Centralized exceptions.
* cpp/lib/broker/BrokerAdapter.cpp (consume): Moved exceptions to Queue
* cpp/lib/broker/BrokerChannel.cpp (consume): Moved exceptions to Queue
* cpp/lib/broker/BrokerMessageBase.cpp:
- Added getApplicationHeaders.
* cpp/lib/broker/BrokerMessageMessage.cpp:
- Fixed exchangeName/destination mix up.
- Removed redundant constructor.
- Added getApplicationHeaders
* cpp/lib/broker/MessageHandlerImpl.cpp:
- Added missing acknowledgements
- Replaced assert(0) with throw "unimplemented".
- Moved exchange existence exceptions to ExchangeRegistry
- Handle transfers with references.
* cpp/tests/Makefile.am (check): Don't run tests unless all libs built OK.
* cpp/tests/python_tests: Re-enabled python tests. Not all passing.
* python/tests/message.py (MessageTests.test_get): Replace get-ok with ok.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@504305 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 29 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 15 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessage.cpp | 9 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessage.h | 1 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageBase.h | 6 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.cpp | 68 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.h | 9 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerQueue.cpp | 17 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerQueue.h | 6 | ||||
-rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 122 | ||||
-rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.h | 7 | ||||
-rw-r--r-- | cpp/tests/Makefile.am | 3 | ||||
-rwxr-xr-x | cpp/tests/python_tests | 4 |
13 files changed, 155 insertions, 141 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index c9d44c7445..625dda1480 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -15,6 +15,8 @@ * limitations under the License. * */ +#include <boost/format.hpp> + #include "BrokerAdapter.h" #include "Connection.h" #include "Exception.h" @@ -25,6 +27,7 @@ namespace qpid { namespace broker { +using boost::format; using namespace qpid; using namespace qpid::framing; @@ -151,9 +154,11 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext } } } - if (exclusive && !queue->isExclusiveOwner(&connection)) { - throw ChannelException(405, "Cannot grant exclusive access to queue"); - } + if (exclusive && !queue->isExclusiveOwner(&connection)) + throw ChannelException( + 405, + format("Cannot grant exclusive access to queue '%s'") + % queue->getName()); if (!nowait) { string queueName = queue->getName(); connection.client->getQueue().declareOk(context, queueName, queue->getMessageCount(), queue->getConsumerCount()); @@ -248,20 +253,14 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume( throw ConnectionException(530, "Consumer tags must be unique"); } - try{ - string newTag = consumerTag; - channel.consume( - newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); + string newTag = consumerTag; + channel.consume( + newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); - if(!nowait) connection.client->getBasic().consumeOk(context, newTag); - - //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); - }catch(ExclusiveAccessException& e){ - if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); - else throw ChannelException(403, "Access would violate previously granted exclusivity"); - } + if(!nowait) connection.client->getBasic().consumeOk(context, newTag); + //allow messages to be dispatched if required as there is now a consumer: + queue->dispatch(); } void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 954eb391ea..ba1ccb7031 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -82,10 +82,11 @@ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); try{ queue->consume(c, exclusive);//may throw exception - consumers[tag] = c; - }catch(ExclusiveAccessException& e){ + consumers[tag] = c; + } catch(...) { + // FIXME aconway 2007-02-06: auto_ptr for exception safe mem. mgmt. delete c; - throw e; + throw; } } @@ -190,11 +191,11 @@ void Channel::ConsumerImpl::requestDispatch(){ void Channel::handleInlineTransfer(Message::shared_ptr& msg, Exchange::shared_ptr& exch){ if(transactional){ TxPublish* deliverable = new TxPublish(msg); - exch->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); + exch->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); txBuffer.enlist(new DeletingTxOp(deliverable)); }else{ DeliverableMessage deliverable(msg); - exch->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); + exch->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); } } @@ -227,12 +228,12 @@ void Channel::complete(Message::shared_ptr msg) { if(transactional) { std::auto_ptr<TxPublish> deliverable(new TxPublish(msg)); exchange->route(*deliverable, msg->getRoutingKey(), - &(msg->getHeaderProperties()->getHeaders())); + &(msg->getApplicationHeaders())); txBuffer.enlist(new DeletingTxOp(deliverable.release())); } else { DeliverableMessage deliverable(msg); exchange->route(deliverable, msg->getRoutingKey(), - &(msg->getHeaderProperties()->getHeaders())); + &(msg->getApplicationHeaders())); } } diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index 69d4ba087f..43a22ab6b9 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -18,6 +18,8 @@ * under the License. * */ +#include <boost/cast.hpp> + #include <BrokerMessage.h> #include <iostream> @@ -116,7 +118,12 @@ void BasicMessage::sendContent( } BasicHeaderProperties* BasicMessage::getHeaderProperties(){ - return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); + return boost::polymorphic_downcast<BasicHeaderProperties*>( + header->getProperties()); +} + +const FieldTable& BasicMessage::getApplicationHeaders(){ + return getHeaderProperties()->getHeaders(); } const ConnectionToken* const BasicMessage::getPublisher(){ diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h index 5ac9c83d0e..d56912ea60 100644 --- a/cpp/lib/broker/BrokerMessage.h +++ b/cpp/lib/broker/BrokerMessage.h @@ -85,6 +85,7 @@ class BasicMessage : public Message { u_int32_t framesize); framing::BasicHeaderProperties* getHeaderProperties(); + const framing::FieldTable& getApplicationHeaders(); bool isPersistent(); u_int64_t contentSize() const { return size; } diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h index 9a5b136ada..d5e37fbc7a 100644 --- a/cpp/lib/broker/BrokerMessageBase.h +++ b/cpp/lib/broker/BrokerMessageBase.h @@ -37,6 +37,7 @@ namespace framing { class MethodContext; class ChannelAdapter; class BasicHeaderProperties; +class FieldTable; } namespace broker { @@ -114,7 +115,12 @@ class Message{ virtual bool isComplete() = 0; virtual u_int64_t contentSize() const = 0; + // FIXME aconway 2007-02-06: Get rid of BasicHeaderProperties + // at this level. Expose only generic properties available from both + // message types (e.g. getApplicationHeaders below). + // virtual framing::BasicHeaderProperties* getHeaderProperties() = 0; + virtual const framing::FieldTable& getApplicationHeaders() = 0; virtual bool isPersistent() = 0; virtual const ConnectionToken* const getPublisher() = 0; diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp index 459a0e69e7..d7020b8923 100644 --- a/cpp/lib/broker/BrokerMessageMessage.cpp +++ b/cpp/lib/broker/BrokerMessageMessage.cpp @@ -18,11 +18,13 @@ * under the License. * */ +#include "QpidError.h" #include "BrokerMessageMessage.h" #include "ChannelAdapter.h" #include "MessageTransferBody.h" #include "MessageAppendBody.h" #include "Reference.h" +#include "framing/FieldTable.h" #include <iostream> @@ -30,24 +32,15 @@ using namespace std; using namespace qpid::broker; using namespace qpid::framing; -MessageMessage::MessageMessage( - const boost::shared_ptr<MessageTransferBody> _methodBody, - const std::string& _exchange, const std::string& _routingKey, - bool _mandatory, bool _immediate) : - Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody), - methodBody(_methodBody) -{ -} - MessageMessage::MessageMessage(TransferPtr transfer_) - : Message(transfer_->getExchange(), transfer_->getRoutingKey(), + : Message(transfer_->getDestination(), transfer_->getRoutingKey(), transfer_->getMandatory(), transfer_->getImmediate(), transfer_), transfer(transfer_) {} MessageMessage::MessageMessage(TransferPtr transfer_, const Reference& ref) - : Message(transfer_->getExchange(), transfer_->getRoutingKey(), + : Message(transfer_->getDestination(), transfer_->getRoutingKey(), transfer_->getMandatory(), transfer_->getImmediate(), transfer_), transfer(transfer_), @@ -62,29 +55,29 @@ void MessageMessage::deliver( { channel.send( new MessageTransferBody(channel.getVersion(), - methodBody->getTicket(), + transfer->getTicket(), consumerTag, getRedelivered(), - methodBody->getImmediate(), - methodBody->getTtl(), - methodBody->getPriority(), - methodBody->getTimestamp(), - methodBody->getDeliveryMode(), - methodBody->getExpiration(), + transfer->getImmediate(), + transfer->getTtl(), + transfer->getPriority(), + transfer->getTimestamp(), + transfer->getDeliveryMode(), + transfer->getExpiration(), getExchange(), getRoutingKey(), - methodBody->getMessageId(), - methodBody->getCorrelationId(), - methodBody->getReplyTo(), - methodBody->getContentType(), - methodBody->getContentEncoding(), - methodBody->getUserId(), - methodBody->getAppId(), - methodBody->getTransactionId(), - methodBody->getSecurityToken(), - methodBody->getApplicationHeaders(), - methodBody->getBody(), - methodBody->getMandatory())); + transfer->getMessageId(), + transfer->getCorrelationId(), + transfer->getReplyTo(), + transfer->getContentType(), + transfer->getContentEncoding(), + transfer->getUserId(), + transfer->getAppId(), + transfer->getTransactionId(), + transfer->getSecurityToken(), + transfer->getApplicationHeaders(), + transfer->getBody(), + transfer->getMandatory())); } void MessageMessage::sendGetOk( @@ -98,11 +91,12 @@ void MessageMessage::sendGetOk( bool MessageMessage::isComplete() { - return true; // FIXME aconway 2007-02-05: + return true; } u_int64_t MessageMessage::contentSize() const { + THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); return 0; // FIXME aconway 2007-02-05: } @@ -110,33 +104,45 @@ qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties() { return 0; // FIXME aconway 2007-02-05: } + +const FieldTable& MessageMessage::getApplicationHeaders() +{ + THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); + return transfer->getApplicationHeaders(); +} bool MessageMessage::isPersistent() { + THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); return false; // FIXME aconway 2007-02-05: } const ConnectionToken* const MessageMessage::getPublisher() { + THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); return 0; // FIXME aconway 2007-02-05: } u_int32_t MessageMessage::encodedSize() { + THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); return 0; // FIXME aconway 2007-02-05: } u_int32_t MessageMessage::encodedHeaderSize() { + THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); return 0; // FIXME aconway 2007-02-05: } u_int32_t MessageMessage::encodedContentSize() { + THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); return 0; // FIXME aconway 2007-02-05: } u_int64_t MessageMessage::expectedContentSize() { + THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); return 0; // FIXME aconway 2007-02-05: } diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h index c943ce6102..5310ef65b3 100644 --- a/cpp/lib/broker/BrokerMessageMessage.h +++ b/cpp/lib/broker/BrokerMessageMessage.h @@ -38,17 +38,11 @@ namespace broker { class Reference; class MessageMessage: public Message{ - const boost::shared_ptr<framing::MessageTransferBody> methodBody; - public: typedef Reference::TransferPtr TransferPtr; typedef Reference::AppendPtr AppendPtr; typedef Reference::Appends Appends; - MessageMessage( - const boost::shared_ptr<framing::MessageTransferBody> methodBody, - const std::string& exchange, const std::string& routingKey, - bool mandatory, bool immediate); MessageMessage(TransferPtr transfer); MessageMessage(TransferPtr transfer, const Reference&); @@ -67,7 +61,8 @@ class MessageMessage: public Message{ bool isComplete(); u_int64_t contentSize() const; - qpid::framing::BasicHeaderProperties* getHeaderProperties(); + framing::BasicHeaderProperties* getHeaderProperties(); + const framing::FieldTable& getApplicationHeaders(); bool isPersistent(); const ConnectionToken* const getPublisher(); diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp index 0e48d3b13d..99c045c59a 100644 --- a/cpp/lib/broker/BrokerQueue.cpp +++ b/cpp/lib/broker/BrokerQueue.cpp @@ -18,6 +18,9 @@ * under the License. * */ + +#include <boost/format.hpp> + #include <BrokerQueue.h> #include <MessageStore.h> #include <sys/Monitor.h> @@ -27,6 +30,7 @@ using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; +using boost::format; Queue::Queue(const string& _name, u_int32_t _autodelete, MessageStore* const _store, @@ -128,12 +132,17 @@ void Queue::dispatch(){ void Queue::consume(Consumer* c, bool requestExclusive){ Mutex::ScopedLock locker(lock); - if(exclusive) throw ExclusiveAccessException(); - if(requestExclusive){ - if(!consumers.empty()) throw ExclusiveAccessException(); + if(exclusive) + throw ChannelException( + 403, format("Queue '%s' has an exclusive consumer." + " No more consumers allowed.") % getName()); + if(requestExclusive) { + if(!consumers.empty()) + throw ChannelException( + 403, format("Queue '%s' already has conumers." + "Exclusive access denied.") %getName()); exclusive = c; } - if(autodelete && consumers.empty()) lastUsed = 0; consumers.push_back(c); } diff --git a/cpp/lib/broker/BrokerQueue.h b/cpp/lib/broker/BrokerQueue.h index 860de45b9c..40fa4bd415 100644 --- a/cpp/lib/broker/BrokerQueue.h +++ b/cpp/lib/broker/BrokerQueue.h @@ -21,7 +21,6 @@ * under the License. * */ - #include <vector> #include <memory> #include <queue> @@ -35,6 +34,9 @@ #include <sys/Monitor.h> #include <QueuePolicy.h> +// TODO aconway 2007-02-06: Use auto_ptr and boost::ptr_vector to +// enforce ownership of Consumers. + namespace qpid { namespace broker { class MessageStore; @@ -42,8 +44,6 @@ namespace qpid { /** * Thrown when exclusive access would be violated. */ - struct ExclusiveAccessException{}; - using std::string; /** diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index e19afd0e67..5f5e9b84e7 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -1,4 +1,3 @@ - /* * * Copyright (c) 2006 The Apache Software Foundation @@ -17,6 +16,7 @@ * */ +#include "QpidError.h" #include "MessageHandlerImpl.h" #include "BrokerChannel.h" #include "FramingContent.h" @@ -31,6 +31,11 @@ namespace broker { using namespace framing; +MessageHandlerImpl::MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) + : channel(ch), connection(c), broker(b), references(ch), + client(connection.client->getMessage()) +{} + // // Message class method handlers // @@ -42,7 +47,7 @@ MessageHandlerImpl::append(const MethodContext& context, references.get(reference).append( boost::shared_polymorphic_downcast<MessageAppendBody>( context.methodBody)); - sendOk(context); + client.ok(context); } @@ -51,7 +56,7 @@ MessageHandlerImpl::cancel(const MethodContext& context, const string& destination ) { channel.cancel(destination); - sendOk(context); + client.ok(context); } void @@ -59,7 +64,8 @@ MessageHandlerImpl::checkpoint(const MethodContext&, const string& /*reference*/, const string& /*identifier*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + // FIXME astitcher 2007-01-11: 0-9 feature + THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); } void @@ -67,7 +73,7 @@ MessageHandlerImpl::close(const MethodContext& context, const string& reference) { references.get(reference).close(); - sendOk(context); + client.ok(context); } void @@ -80,32 +86,23 @@ MessageHandlerImpl::consume(const MethodContext& context, bool exclusive, const qpid::framing::FieldTable& filter ) { - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - if(!destination.empty() && channel.exists(destination)){ + Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); + if(!destination.empty() && channel.exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); - } - - try{ - string newTag = destination; - channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - - sendOk(context); - - //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); - }catch(ExclusiveAccessException& e){ - if(exclusive) - throw ChannelException(403, "Exclusive access cannot be granted"); - else - throw ChannelException( - 403, "Access would violate previously granted exclusivity"); - } + string tag = destination; + channel.consume( + tag, queue, !noAck, exclusive, + noLocal ? &connection : 0, &filter); + client.ok(context); + // Dispatch messages as there is now a consumer. + queue->dispatch(); } void MessageHandlerImpl::empty( const MethodContext& ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + // FIXME astitcher 2007-01-11: 0-9 feature + THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); } void @@ -121,17 +118,18 @@ MessageHandlerImpl::get( const MethodContext& context, connection.getQueue(queueName, context.channel->getId()); // FIXME: get is probably Basic specific - if(!channel.get(queue, !noAck)){ - connection.client->getMessageHandler()->empty(context); - } - + if(channel.get(queue, !noAck)) + client.ok(context); + else + client.empty(context); } void MessageHandlerImpl::offset(const MethodContext&, u_int64_t /*value*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + // FIXME astitcher 2007-01-11: 0-9 feature + THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); } void @@ -145,7 +143,7 @@ MessageHandlerImpl::open(const MethodContext& context, const string& reference) { references.open(reference); - sendOk(context); + client.ok(context); } void @@ -157,18 +155,17 @@ MessageHandlerImpl::qos(const MethodContext& context, //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - - sendOk(context); + client.ok(context); } void -MessageHandlerImpl::recover(const MethodContext&, - bool requeue ) +MessageHandlerImpl::recover(const MethodContext& context, + bool requeue) { - //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - + THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented"); + // FIXME aconway 2007-02-06: Call to recover hangs client. channel.recover(requeue); - + client.ok(context); } void @@ -176,7 +173,8 @@ MessageHandlerImpl::reject(const MethodContext&, u_int16_t /*code*/, const string& /*text*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + // FIXME astitcher 2007-01-11: 0-9 feature + THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); } void @@ -184,22 +182,23 @@ MessageHandlerImpl::resume(const MethodContext&, const string& /*reference*/, const string& /*identifier*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + // FIXME astitcher 2007-01-11: 0-9 feature + THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); } void MessageHandlerImpl::transfer(const MethodContext& context, u_int16_t /*ticket*/, - const string& /*destination*/, + const string& destination, bool /*redelivered*/, - bool immediate, + bool /*immediate*/, u_int64_t /*ttl*/, u_int8_t /*priority*/, u_int64_t /*timestamp*/, u_int8_t /*deliveryMode*/, u_int64_t /*expiration*/, - const string& exchangeName, - const string& routingKey, + const string& /*exchangeName*/, + const string& /*routingKey*/, const string& /*messageId*/, const string& /*correlationId*/, const string& /*replyTo*/, @@ -211,30 +210,23 @@ MessageHandlerImpl::transfer(const MethodContext& context, const string& /*securityToken*/, const qpid::framing::FieldTable& /*applicationHeaders*/, qpid::framing::Content body, - bool mandatory) + bool /*mandatory*/) { - Exchange::shared_ptr exchange = exchangeName.empty() ? - broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); - boost::shared_ptr<MessageTransferBody> transfer(boost::dynamic_pointer_cast<MessageTransferBody>(context.methodBody)); - if(exchange){ - if (body.isInline()) { - Message::shared_ptr msg(new MessageMessage(transfer, exchangeName, - routingKey, mandatory, immediate)); - - channel.handleInlineTransfer(msg, exchange); - - connection.client->getMessageHandler()->ok(context); - } else { - references.get(body.getValue()).transfer(transfer); - } - }else{ - throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + Exchange::shared_ptr exchange( + broker.getExchanges().get(destination)); + MessageTransferBody::shared_ptr transfer( + boost::shared_polymorphic_downcast<MessageTransferBody>( + context.methodBody)); + if (body.isInline()) { + Message::shared_ptr msg(new MessageMessage(transfer)); + channel.handleInlineTransfer(msg, exchange); + } + else { + // Add to reference. + references.get(body.getValue()).transfer(transfer); } + client.ok(context); } -void MessageHandlerImpl::sendOk(const MethodContext& context) { - connection.client->getMessageHandler()->ok(context); -} - }} // namespace qpid::broker diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h index 886ca5fb54..0fef45bb19 100644 --- a/cpp/lib/broker/MessageHandlerImpl.h +++ b/cpp/lib/broker/MessageHandlerImpl.h @@ -22,6 +22,7 @@ #include <memory> #include "AMQP_ServerOperations.h" +#include "AMQP_ClientProxy.h" #include "Reference.h" #include "BrokerChannel.h" @@ -36,8 +37,7 @@ class MessageHandlerImpl : public framing::AMQP_ServerOperations::MessageHandler { public: - MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) - : channel(ch), connection(c), broker(b), references(ch) {} + MessageHandlerImpl(Channel& ch, Connection& c, Broker& b); void append(const framing::MethodContext&, const std::string& reference, @@ -119,12 +119,11 @@ class MessageHandlerImpl : framing::Content body, bool mandatory ); private: - void sendOk(const framing::MethodContext&); - Channel& channel; Connection& connection; Broker& broker; ReferenceRegistry references; + framing::AMQP_ClientProxy::Message& client; }; }} // namespace qpid::broker diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am index 93a117b514..2b51a5b125 100644 --- a/cpp/tests/Makefile.am +++ b/cpp/tests/Makefile.am @@ -98,3 +98,6 @@ gen.mk: Makefile.am ) \ > $@-t mv $@-t $@ + + +check: $(check_LTLIBRARIES) $(lib_common) $(lib_client) $(lib_broker) diff --git a/cpp/tests/python_tests b/cpp/tests/python_tests index d41f8adb80..3f22082f51 100755 --- a/cpp/tests/python_tests +++ b/cpp/tests/python_tests @@ -1,8 +1,4 @@ #!/bin/sh -# FIXME aconway 2007-01-09: Re-enable. -echo "*** WARNING: PYTHON TESTS DISABLED till branch is functioning on 0-9." -exit - # Run the python tests. if test -d ../../python ; then cd ../../python && ./run-tests -v -I cpp_failing.txt |