summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/BrokerMessage.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-02 22:03:10 +0000
committerAlan Conway <aconway@apache.org>2007-02-02 22:03:10 +0000
commitb5c270f10496f522ef6a03a8fa60f85d55c9187d (patch)
tree714e7abf7ba591d00232d821440e51461175cb9e /cpp/lib/broker/BrokerMessage.cpp
parent750f272ac99e8c830807affb3ae68ab0beeca63f (diff)
downloadqpid-python-b5c270f10496f522ef6a03a8fa60f85d55c9187d.tar.gz
* cpp/lib/common/framing/MethodContext.h: Reduced MethodContext to
ChannelAdapter and Method Body. Request ID comes from body, ChannelAdapter is used to send frames, not OutputHandler. * cpp/lib/common/framing/ChannelAdapter.h,.cpp: Removed context member. Context is per-method not per-channel. * cpp/lib/broker/*: Replace direct use of OutputHandler and ChannelId with MethodContext (for responses) or ChannelAdapter (for requests.) Use context request-ID to construct responses, send all bodies via ChannelAdapter. * cpp/lib/broker/BrokerAdapter.cpp: Link broker::Channel to BrokerAdapter. * cpp/lib/broker/*: Remove unnecessary ProtocolVersion parameters. Fix bogus signatures: ProtocolVersion* -> const ProtocolVersion& * Cosmetic changes, many files: - fixed indentation, broke long lines. - removed unnecessary qpid:: prefixes. * broker/BrokerAdapter,BrokerChannel: Merged BrokerAdapter into broker::channel. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@502767 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/BrokerMessage.cpp')
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp96
1 files changed, 54 insertions, 42 deletions
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index b738040470..69d4ba087f 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -27,31 +27,35 @@
#include <BasicDeliverBody.h>
#include <BasicGetOkBody.h>
#include "AMQFrame.h"
+#include "framing/ChannelAdapter.h"
using namespace boost;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-BasicMessage::BasicMessage(const ConnectionToken* const _publisher,
- const string& _exchange, const string& _routingKey,
- bool _mandatory, bool _immediate) :
- Message(_exchange, _routingKey, _mandatory, _immediate),
- publisher(_publisher),
- size(0)
+BasicMessage::BasicMessage(
+ const ConnectionToken* const _publisher,
+ const string& _exchange, const string& _routingKey,
+ bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo
+) :
+ Message(_exchange, _routingKey, _mandatory, _immediate, respondTo),
+ publisher(_publisher),
+ size(0)
{
}
-BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
- publisher(0), size(0)
-{
+// FIXME aconway 2007-02-01: remove.
+// BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
+// publisher(0), size(0)
+// {
- decode(buffer, headersOnly, contentChunkSize);
-}
+// decode(buffer, headersOnly, contentChunkSize);
+// }
+// For tests only.
BasicMessage::BasicMessage() : publisher(0), size(0)
-{
-}
+{}
BasicMessage::~BasicMessage(){
if (content.get()) content->destroy();
@@ -73,34 +77,42 @@ bool BasicMessage::isComplete(){
return header.get() && (header->getContentSize() == contentSize());
}
-void BasicMessage::deliver(OutputHandler* out, int channel,
- const string& consumerTag, u_int64_t deliveryTag,
- u_int32_t framesize,
- ProtocolVersion* version){
- // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
- out->send(new AMQFrame(*version, channel,
- new BasicDeliverBody(*version, consumerTag, deliveryTag, getRedelivered(), getExchange(), getRoutingKey())));
- sendContent(out, channel, framesize, version);
-}
-
-void BasicMessage::sendGetOk(OutputHandler* out,
- int channel,
- u_int32_t messageCount,
- u_int64_t deliveryTag,
- u_int32_t framesize,
- ProtocolVersion* version){
- // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
- out->send(new AMQFrame(*version, channel,
- new BasicGetOkBody(*version, deliveryTag, getRedelivered(), getExchange(), getRoutingKey(), messageCount)));
- sendContent(out, channel, framesize, version);
-}
-
-void BasicMessage::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){
- AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
- out->send(new AMQFrame(*version, channel, headerBody));
-
+void BasicMessage::deliver(ChannelAdapter& channel,
+ const string& consumerTag, u_int64_t deliveryTag,
+ u_int32_t framesize)
+{
+ // CCT -- TODO - Update code generator to take pointer/ not
+ // instance to avoid extra contruction
+ channel.send(
+ new BasicDeliverBody(
+ channel.getVersion(), consumerTag, deliveryTag,
+ getRedelivered(), getExchange(), getRoutingKey()));
+ sendContent(channel, framesize);
+}
+
+void BasicMessage::sendGetOk(const MethodContext& context,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize)
+{
+ // CCT -- TODO - Update code generator to take pointer/ not
+ // instance to avoid extra contruction
+ context.channel->send(
+ new BasicGetOkBody(
+ context.channel->getVersion(),
+ context.methodBody->getRequestId(),
+ deliveryTag, getRedelivered(), getExchange(),
+ getRoutingKey(), messageCount));
+ sendContent(*context.channel, framesize);
+}
+
+void BasicMessage::sendContent(
+ ChannelAdapter& channel, u_int32_t framesize)
+{
+ channel.send(header);
Mutex::ScopedLock locker(contentLock);
- if (content.get()) content->send(*version, out, channel, framesize);
+ if (content.get())
+ content->send(channel, framesize);
}
BasicHeaderProperties* BasicMessage::getHeaderProperties(){
@@ -126,8 +138,8 @@ void BasicMessage::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChu
void BasicMessage::decodeHeader(Buffer& buffer)
{
- string exchange;
- string routingKey;
+ string exchange;
+ string routingKey;
buffer.getShortString(exchange);
buffer.getShortString(routingKey);