diff options
author | Alan Conway <aconway@apache.org> | 2007-02-02 22:03:10 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-02-02 22:03:10 +0000 |
commit | b5c270f10496f522ef6a03a8fa60f85d55c9187d (patch) | |
tree | 714e7abf7ba591d00232d821440e51461175cb9e /cpp/lib/broker/BrokerMessage.cpp | |
parent | 750f272ac99e8c830807affb3ae68ab0beeca63f (diff) | |
download | qpid-python-b5c270f10496f522ef6a03a8fa60f85d55c9187d.tar.gz |
* cpp/lib/common/framing/MethodContext.h: Reduced MethodContext to
ChannelAdapter and Method Body. Request ID comes from body,
ChannelAdapter is used to send frames, not OutputHandler.
* cpp/lib/common/framing/ChannelAdapter.h,.cpp: Removed context member.
Context is per-method not per-channel.
* cpp/lib/broker/*: Replace direct use of OutputHandler and ChannelId
with MethodContext (for responses) or ChannelAdapter (for requests.)
Use context request-ID to construct responses, send all bodies via
ChannelAdapter.
* cpp/lib/broker/BrokerAdapter.cpp: Link broker::Channel to BrokerAdapter.
* cpp/lib/broker/*: Remove unnecessary ProtocolVersion parameters.
Fix bogus signatures: ProtocolVersion* -> const ProtocolVersion&
* Cosmetic changes, many files:
- fixed indentation, broke long lines.
- removed unnecessary qpid:: prefixes.
* broker/BrokerAdapter,BrokerChannel: Merged BrokerAdapter into
broker::channel.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@502767 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/BrokerMessage.cpp')
-rw-r--r-- | cpp/lib/broker/BrokerMessage.cpp | 96 |
1 files changed, 54 insertions, 42 deletions
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index b738040470..69d4ba087f 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -27,31 +27,35 @@ #include <BasicDeliverBody.h> #include <BasicGetOkBody.h> #include "AMQFrame.h" +#include "framing/ChannelAdapter.h" using namespace boost; using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -BasicMessage::BasicMessage(const ConnectionToken* const _publisher, - const string& _exchange, const string& _routingKey, - bool _mandatory, bool _immediate) : - Message(_exchange, _routingKey, _mandatory, _immediate), - publisher(_publisher), - size(0) +BasicMessage::BasicMessage( + const ConnectionToken* const _publisher, + const string& _exchange, const string& _routingKey, + bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo +) : + Message(_exchange, _routingKey, _mandatory, _immediate, respondTo), + publisher(_publisher), + size(0) { } -BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : - publisher(0), size(0) -{ +// FIXME aconway 2007-02-01: remove. +// BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : +// publisher(0), size(0) +// { - decode(buffer, headersOnly, contentChunkSize); -} +// decode(buffer, headersOnly, contentChunkSize); +// } +// For tests only. BasicMessage::BasicMessage() : publisher(0), size(0) -{ -} +{} BasicMessage::~BasicMessage(){ if (content.get()) content->destroy(); @@ -73,34 +77,42 @@ bool BasicMessage::isComplete(){ return header.get() && (header->getContentSize() == contentSize()); } -void BasicMessage::deliver(OutputHandler* out, int channel, - const string& consumerTag, u_int64_t deliveryTag, - u_int32_t framesize, - ProtocolVersion* version){ - // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction - out->send(new AMQFrame(*version, channel, - new BasicDeliverBody(*version, consumerTag, deliveryTag, getRedelivered(), getExchange(), getRoutingKey()))); - sendContent(out, channel, framesize, version); -} - -void BasicMessage::sendGetOk(OutputHandler* out, - int channel, - u_int32_t messageCount, - u_int64_t deliveryTag, - u_int32_t framesize, - ProtocolVersion* version){ - // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction - out->send(new AMQFrame(*version, channel, - new BasicGetOkBody(*version, deliveryTag, getRedelivered(), getExchange(), getRoutingKey(), messageCount))); - sendContent(out, channel, framesize, version); -} - -void BasicMessage::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){ - AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); - out->send(new AMQFrame(*version, channel, headerBody)); - +void BasicMessage::deliver(ChannelAdapter& channel, + const string& consumerTag, u_int64_t deliveryTag, + u_int32_t framesize) +{ + // CCT -- TODO - Update code generator to take pointer/ not + // instance to avoid extra contruction + channel.send( + new BasicDeliverBody( + channel.getVersion(), consumerTag, deliveryTag, + getRedelivered(), getExchange(), getRoutingKey())); + sendContent(channel, framesize); +} + +void BasicMessage::sendGetOk(const MethodContext& context, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize) +{ + // CCT -- TODO - Update code generator to take pointer/ not + // instance to avoid extra contruction + context.channel->send( + new BasicGetOkBody( + context.channel->getVersion(), + context.methodBody->getRequestId(), + deliveryTag, getRedelivered(), getExchange(), + getRoutingKey(), messageCount)); + sendContent(*context.channel, framesize); +} + +void BasicMessage::sendContent( + ChannelAdapter& channel, u_int32_t framesize) +{ + channel.send(header); Mutex::ScopedLock locker(contentLock); - if (content.get()) content->send(*version, out, channel, framesize); + if (content.get()) + content->send(channel, framesize); } BasicHeaderProperties* BasicMessage::getHeaderProperties(){ @@ -126,8 +138,8 @@ void BasicMessage::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChu void BasicMessage::decodeHeader(Buffer& buffer) { - string exchange; - string routingKey; + string exchange; + string routingKey; buffer.getShortString(exchange); buffer.getShortString(routingKey); |