diff options
Diffstat (limited to 'cpp/lib')
-rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 5 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 11 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.h | 10 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.cpp | 46 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.h | 12 | ||||
-rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 42 |
6 files changed, 95 insertions, 31 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 6f55f32d47..c9d44c7445 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -334,6 +334,10 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) //no specific action required, generic response handling should be sufficient } + +// +// Message class method handlers +// void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context) { @@ -341,6 +345,7 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& con connection.client->getChannel().pong(context); } + void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context) { diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index c0250815e8..954eb391ea 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -187,6 +187,17 @@ void Channel::ConsumerImpl::requestDispatch(){ if(blocked) queue->dispatch(); } +void Channel::handleInlineTransfer(Message::shared_ptr& msg, Exchange::shared_ptr& exch){ + if(transactional){ + TxPublish* deliverable = new TxPublish(msg); + exch->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); + txBuffer.enlist(new DeletingTxOp(deliverable)); + }else{ + DeliverableMessage deliverable(msg); + exch->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); + } +} + // FIXME aconway 2007-02-05: Drop exchange member, calculate from // message in ::complete(). void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){ diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index 484a4d64e3..cbad2382a8 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -96,7 +96,9 @@ class Channel : public framing::ChannelAdapter, boost::scoped_ptr<BrokerAdapter> adapter; - virtual void complete(Message::shared_ptr msg); + // completion handler for MessageBuilder + void complete(Message::shared_ptr msg); + void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); void cancel(consumer_iterator consumer); bool checkPrefetch(Message::shared_ptr& msg); @@ -110,7 +112,9 @@ class Channel : public framing::ChannelAdapter, ~Channel(); + // For ChannelAdapter bool isOpen() const { return opened; } + void open() { opened = true; } void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } Queue::shared_ptr getDefaultQueue() const { return defaultQueue; } @@ -134,6 +138,10 @@ class Channel : public framing::ChannelAdapter, void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>); void handleContent(boost::shared_ptr<framing::AMQContentBody>); void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>); + + void handleInlineTransfer(Message::shared_ptr& msg, Exchange::shared_ptr& exchange); + + // For ChannelAdapter void handleMethodInContext( boost::shared_ptr<framing::AMQMethodBody> method, const framing::MethodContext& context); diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp index e2c4b94811..459a0e69e7 100644 --- a/cpp/lib/broker/BrokerMessageMessage.cpp +++ b/cpp/lib/broker/BrokerMessageMessage.cpp @@ -18,15 +18,27 @@ * under the License. * */ -#include <iostream> #include "BrokerMessageMessage.h" +#include "ChannelAdapter.h" #include "MessageTransferBody.h" #include "MessageAppendBody.h" #include "Reference.h" +#include <iostream> + using namespace std; using namespace qpid::broker; +using namespace qpid::framing; +MessageMessage::MessageMessage( + const boost::shared_ptr<MessageTransferBody> _methodBody, + const std::string& _exchange, const std::string& _routingKey, + bool _mandatory, bool _immediate) : + Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody), + methodBody(_methodBody) +{ +} + MessageMessage::MessageMessage(TransferPtr transfer_) : Message(transfer_->getExchange(), transfer_->getRoutingKey(), transfer_->getMandatory(), transfer_->getImmediate(), @@ -43,14 +55,36 @@ MessageMessage::MessageMessage(TransferPtr transfer_, const Reference& ref) {} void MessageMessage::deliver( - framing::ChannelAdapter& /*channel*/, - const std::string& /*consumerTag*/, + framing::ChannelAdapter& channel, + const std::string& consumerTag, u_int64_t /*deliveryTag*/, u_int32_t /*framesize*/) { - // FIXME aconway 2007-02-05: - cout << "MessageMessage::deliver" << *transfer << " + " << appends.size() - << " appends." << endl; + channel.send( + new MessageTransferBody(channel.getVersion(), + methodBody->getTicket(), + consumerTag, + getRedelivered(), + methodBody->getImmediate(), + methodBody->getTtl(), + methodBody->getPriority(), + methodBody->getTimestamp(), + methodBody->getDeliveryMode(), + methodBody->getExpiration(), + getExchange(), + getRoutingKey(), + methodBody->getMessageId(), + methodBody->getCorrelationId(), + methodBody->getReplyTo(), + methodBody->getContentType(), + methodBody->getContentEncoding(), + methodBody->getUserId(), + methodBody->getAppId(), + methodBody->getTransactionId(), + methodBody->getSecurityToken(), + methodBody->getApplicationHeaders(), + methodBody->getBody(), + methodBody->getMandatory())); } void MessageMessage::sendGetOk( diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h index aa136863a1..c943ce6102 100644 --- a/cpp/lib/broker/BrokerMessageMessage.h +++ b/cpp/lib/broker/BrokerMessageMessage.h @@ -21,10 +21,12 @@ * under the License. * */ -#include <vector> #include "BrokerMessageBase.h" +#include "MessageTransferBody.h" #include "Reference.h" +#include <vector> + namespace qpid { namespace framing { @@ -36,11 +38,17 @@ namespace broker { class Reference; class MessageMessage: public Message{ + const boost::shared_ptr<framing::MessageTransferBody> methodBody; + public: typedef Reference::TransferPtr TransferPtr; typedef Reference::AppendPtr AppendPtr; - typedef Reference::Appends Appends; + typedef Reference::Appends Appends; + MessageMessage( + const boost::shared_ptr<framing::MessageTransferBody> methodBody, + const std::string& exchange, const std::string& routingKey, + bool mandatory, bool immediate); MessageMessage(TransferPtr transfer); MessageMessage(TransferPtr transfer, const Reference&); diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 30b69e4654..e19afd0e67 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -80,8 +80,6 @@ MessageHandlerImpl::consume(const MethodContext& context, bool exclusive, const qpid::framing::FieldTable& filter ) { - //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); if(!destination.empty() && channel.exists(destination)){ throw ConnectionException(530, "Consumer tags must be unique"); @@ -139,7 +137,7 @@ MessageHandlerImpl::offset(const MethodContext&, void MessageHandlerImpl::ok( const MethodContext& ) { - // TODO aconway 2007-02-05: For HA, we can drop acked messages here. + // TODO: Need to ack the transfers acknowledged so far for flow control purp oses } void @@ -156,8 +154,6 @@ MessageHandlerImpl::qos(const MethodContext& context, u_int16_t prefetchCount, bool /*global*/ ) { - //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); @@ -196,14 +192,14 @@ MessageHandlerImpl::transfer(const MethodContext& context, u_int16_t /*ticket*/, const string& /*destination*/, bool /*redelivered*/, - bool /* immediate */, + bool immediate, u_int64_t /*ttl*/, u_int8_t /*priority*/, u_int64_t /*timestamp*/, u_int8_t /*deliveryMode*/, u_int64_t /*expiration*/, const string& exchangeName, - const string& /* routingKey */, + const string& routingKey, const string& /*messageId*/, const string& /*correlationId*/, const string& /*replyTo*/, @@ -215,22 +211,24 @@ MessageHandlerImpl::transfer(const MethodContext& context, const string& /*securityToken*/, const qpid::framing::FieldTable& /*applicationHeaders*/, qpid::framing::Content body, - bool /* mandatory */ ) + bool mandatory) { - //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - MessageTransferBody::shared_ptr transfer = - boost::shared_polymorphic_downcast<MessageTransferBody>( - context.methodBody); - // Verify the exchange exists, will throw if not. - broker.getExchanges().get(exchangeName); - if (body.isInline()) { - MessageMessage* msg = new MessageMessage(transfer); - // FIXME aconway 2007-02-05: Remove exchange parameter. - // use shared_ptr for message. - channel.handlePublish(msg, Exchange::shared_ptr()); - sendOk(context); - } else { - references.get(body.getValue()).transfer(transfer); + Exchange::shared_ptr exchange = exchangeName.empty() ? + broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); + boost::shared_ptr<MessageTransferBody> transfer(boost::dynamic_pointer_cast<MessageTransferBody>(context.methodBody)); + if(exchange){ + if (body.isInline()) { + Message::shared_ptr msg(new MessageMessage(transfer, exchangeName, + routingKey, mandatory, immediate)); + + channel.handleInlineTransfer(msg, exchange); + + connection.client->getMessageHandler()->ok(context); + } else { + references.get(body.getValue()).transfer(transfer); + } + }else{ + throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); } } |