diff options
author | Alan Conway <aconway@apache.org> | 2007-04-05 19:16:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-04-05 19:16:09 +0000 |
commit | bb79efff2408de5f6cd66089cde8b8a82cc80cc2 (patch) | |
tree | 9d9c72158da31cebdd7ee538a11951b240922065 /cpp/src/client/MessageMessageChannel.cpp | |
parent | 2a1e4c9663ff0725c061248a96ebab763678fdd6 (diff) | |
download | qpid-python-bb79efff2408de5f6cd66089cde8b8a82cc80cc2.tar.gz |
* Exteneded use of shared pointers frame bodies across all send() commands.
* tests/Makefile.am: added check-unit target to run just unit tests.
* Introduced make_shared_ptr convenience function for wrapping
plain pointers with shared_ptr.
* cpp/src/client/ClientChannel.h,cpp (sendsendAndReceive,sendAndReceiveSync):
Pass shared_ptr instead of raw ptr to fix memory problems.
Updated the following files to use make_shared_ptr
- src/client/BasicMessageChannel.cpp
- src/client/ClientConnection.cpp
* src/client/MessageMessageChannel.cpp: implemented 0-9 message.get.
* src/framing/Correlator.h,cpp: Allow request sender to register actions
to take when the correlated response arrives.
* cpp/src/tests/FramingTest.cpp: Added Correlator tests.
* src/framing/ChannelAdapter.h,cpp: use Correlator to dispatch
response actions.
* cpp/src/shared_ptr.h (make_shared_ptr): Convenience function
to make a shared pointer from a raw pointer.
* cpp/src/tests/ClientChannelTest.cpp: Added message.get test.
* cpp/src/tests/Makefile.am (check-unit): Added test-unit target
to run unit tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@525932 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/client/MessageMessageChannel.cpp')
-rw-r--r-- | cpp/src/client/MessageMessageChannel.cpp | 103 |
1 files changed, 84 insertions, 19 deletions
diff --git a/cpp/src/client/MessageMessageChannel.cpp b/cpp/src/client/MessageMessageChannel.cpp index 25fbb95413..164a1cb426 100644 --- a/cpp/src/client/MessageMessageChannel.cpp +++ b/cpp/src/client/MessageMessageChannel.cpp @@ -25,6 +25,7 @@ #include "../framing/FieldTable.h" #include "Connection.h" #include "../shared_ptr.h" +#include <boost/bind.hpp> namespace qpid { namespace client { @@ -48,9 +49,9 @@ void MessageMessageChannel::consume( if (tag.empty()) tag = newTag(); channel.sendAndReceive<MessageOkBody>( - new MessageConsumeBody( + make_shared_ptr(new MessageConsumeBody( channel.getVersion(), 0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, fields ? *fields : FieldTable())); + ackMode == NO_ACK, false, fields ? *fields : FieldTable()))); // // FIXME aconway 2007-02-20: Race condition! // // We could receive the first message for the consumer @@ -115,16 +116,44 @@ void MessageMessageChannel::close(){ */ const string getDestinationId("__get__"); +/** + * A destination that provides a Correlator::Action to handle + * MessageEmpty responses. + */ +struct MessageGetDestination : public IncomingMessage::WaitableDestination +{ + void response(shared_ptr<AMQResponseBody> response) { + if (response->amqpClassId() == MessageOkBody::CLASS_ID) { + switch (response->amqpMethodId()) { + case MessageOkBody::METHOD_ID: + // Nothing to do, wait for transfer. + return; + case MessageEmptyBody::METHOD_ID: + empty(); // Wake up waiter with empty queue. + return; + } + } + throw QPID_ERROR(PROTOCOL_ERROR, "Invalid response"); + } + + Correlator::Action action() { + return boost::bind(&MessageGetDestination::response, this, _1); + } +}; + bool MessageMessageChannel::get( - Message& , const Queue& , AckMode ) + Message& msg, const Queue& queue, AckMode ackMode) { Mutex::ScopedLock l(lock); -// incoming.addDestination(getDestinationId, getDest); -// channel.send( -// new MessageGetBody( -// channel.version, 0, queue.getName(), getDestinationId, ackMode)); -// return getDest.wait(msg); - return false; + std::string destName=newTag(); + MessageGetDestination dest; + incoming.addDestination(destName, dest); + channel.send( + make_shared_ptr( + new MessageGetBody( + channel.version, 0, queue.getName(), destName, ackMode)), + dest.action()); + return dest.wait(msg); } @@ -176,9 +205,30 @@ void MessageMessageChannel::publish( // FIXME aconway 2007-02-23: throw QPID_ERROR(INTERNAL_ERROR, "References not yet implemented"); } - channel.sendAndReceive<MessageOkBody>(transfer.get()); + channel.sendAndReceive<MessageOkBody>(transfer); } +void copy(Message& msg, MessageTransferBody& transfer) { + // FIXME aconway 2007-04-05: Verify all required fields + // are copied. + msg.setContentType(transfer.getContentType()); + msg.setContentEncoding(transfer.getContentEncoding()); + msg.setHeaders(transfer.getApplicationHeaders()); + msg.setDeliveryMode(DeliveryMode(transfer.getDeliveryMode())); + msg.setPriority(transfer.getPriority()); + msg.setCorrelationId(transfer.getCorrelationId()); + msg.setReplyTo(transfer.getReplyTo()); + // FIXME aconway 2007-04-05: TTL/Expiration + msg.setMessageId(transfer.getMessageId()); + msg.setTimestamp(transfer.getTimestamp()); + msg.setUserId(transfer.getUserId()); + msg.setAppId(transfer.getAppId()); + msg.setDestination(transfer.getDestination()); + msg.setRedelivered(transfer.getRedelivered()); + msg.setDeliveryTag(0); // No meaning in 0-9 + if (transfer.getBody().isInline()) + msg.setData(transfer.getBody().getValue()); +} void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID); @@ -203,23 +253,38 @@ void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { break; } - case MessageEmptyBody::METHOD_ID: { - // FIXME aconway 2007-04-04: - // getDest.empty(); + case MessageTransferBody::METHOD_ID: { + MessageTransferBody::shared_ptr transfer= + shared_polymorphic_downcast<MessageTransferBody>(method); + if (transfer->getBody().isInline()) { + Message msg; + copy(msg, *transfer); + // Deliver it. + incoming.getDestination(transfer->getDestination()).message(msg); + } + else { + Message& msg=incoming.createMessage( + transfer->getDestination(), transfer->getBody().getValue()); + copy(msg, *transfer); + // Will be delivered when reference closes. + } break; } - case MessageCancelBody::METHOD_ID: - case MessageCheckpointBody::METHOD_ID: + case MessageEmptyBody::METHOD_ID: + case MessageOkBody::METHOD_ID: + // Nothing to do + break; // FIXME aconway 2007-04-03: TODO - case MessageOkBody::METHOD_ID: + case MessageCancelBody::METHOD_ID: + case MessageCheckpointBody::METHOD_ID: case MessageOffsetBody::METHOD_ID: case MessageQosBody::METHOD_ID: case MessageRecoverBody::METHOD_ID: case MessageRejectBody::METHOD_ID: case MessageResumeBody::METHOD_ID: - case MessageTransferBody::METHOD_ID: + break; default: throw Channel::UnknownMethod(); } @@ -322,10 +387,10 @@ void MessageMessageChannel::setReturnedMessageHandler( void MessageMessageChannel::setQos(){ channel.sendAndReceive<MessageOkBody>( - new MessageQosBody(channel.version, 0, channel.getPrefetch(), false)); + make_shared_ptr(new MessageQosBody(channel.version, 0, channel.getPrefetch(), false))); if(channel.isTransactional()) channel.sendAndReceive<TxSelectOkBody>( - new TxSelectBody(channel.version)); + make_shared_ptr(new TxSelectBody(channel.version))); } }} // namespace qpid::client |