diff options
author | Alan Conway <aconway@apache.org> | 2007-04-05 21:23:14 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-04-05 21:23:14 +0000 |
commit | f90f46c05b12e9e38bff1ac8418add1b3fa074d8 (patch) | |
tree | 675415764c64cb485d2c9eac93d4ad5639a6bc44 /qpid/cpp/src/client/MessageMessageChannel.cpp | |
parent | 6b5c686b366846b7ecb0bb298c41fe474e1fb3c8 (diff) | |
download | qpid-python-f90f46c05b12e9e38bff1ac8418add1b3fa074d8.tar.gz |
* cpp/src/broker/BrokerMessageMessage.h: Change reference from weak_ptr to
shared_ptr. Broker messages hold their reference.
* cpp/src/broker/Reference.cpp (close): clear messages array to break
shared_ptr cycle and avoid a leak.
* cpp/src/client/MessageMessageChannel.cpp (publish): Support references
for large messages.
* cpp/src/shared_ptr.h (make_shared_ptr): added deleter variant.
* cpp/src/tests/ClientChannelTest.cpp: Enabled testGetNoContent,
testGetFragmentedMessage
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@525964 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/client/MessageMessageChannel.cpp')
-rw-r--r-- | qpid/cpp/src/client/MessageMessageChannel.cpp | 43 |
1 files changed, 39 insertions, 4 deletions
diff --git a/qpid/cpp/src/client/MessageMessageChannel.cpp b/qpid/cpp/src/client/MessageMessageChannel.cpp index 164a1cb426..8d0fdc3189 100644 --- a/qpid/cpp/src/client/MessageMessageChannel.cpp +++ b/qpid/cpp/src/client/MessageMessageChannel.cpp @@ -192,6 +192,17 @@ MessageTransferBody::shared_ptr makeTransfer( )); } +// FIXME aconway 2007-04-05: Generated code should provide this. +/** + * Calculate the size of a frame containing the given body type + * if all variable-lengths parts are empty. + */ +template <class T> size_t overhead() { + static AMQFrame frame( + ProtocolVersion(), 0, make_shared_ptr(new T(ProtocolVersion()))); + return frame.size(); +} + void MessageMessageChannel::publish( const Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate) @@ -201,11 +212,35 @@ void MessageMessageChannel::publish( msg, exchange.getName(), routingKey, mandatory, immediate); // Frame itself uses 8 bytes. u_int32_t frameMax = channel.connection->getMaxFrameSize() - 8; - if (transfer->size() > frameMax) { - // FIXME aconway 2007-02-23: - throw QPID_ERROR(INTERNAL_ERROR, "References not yet implemented"); + if (transfer->size() <= frameMax) { + channel.sendAndReceive<MessageOkBody>(transfer); + } + else { + std::string ref = newTag(); + std::string data = transfer->getBody().getValue(); + size_t chunk = + channel.connection->getMaxFrameSize() - + (overhead<MessageAppendBody>() + ref.size()); + // TODO aconway 2007-04-05: cast around lack of generated setters + const_cast<Content&>(transfer->getBody()) = Content(REFERENCE,ref); + channel.send( + make_shared_ptr(new MessageOpenBody(channel.version, ref))); + channel.send(transfer); + const char* p = data.data(); + const char* end = data.data()+data.size(); + while (p+chunk <= end) { + channel.send( + make_shared_ptr( + new MessageAppendBody(channel.version, ref, std::string(p, chunk)))); + p += chunk; + } + if (p < end) { + channel.send( + make_shared_ptr( + new MessageAppendBody(channel.version, ref, std::string(p, end-p)))); + } + channel.send(make_shared_ptr(new MessageCloseBody(channel.version, ref))); } - channel.sendAndReceive<MessageOkBody>(transfer); } void copy(Message& msg, MessageTransferBody& transfer) { |