summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-09-10 08:41:05 +0000
committerGordon Sim <gsim@apache.org>2007-09-10 08:41:05 +0000
commita5c0fde5d0b96ae0b747f0cea21414753d6ee654 (patch)
tree4a809a880691db3e04fa3c7374db500b767ca85b /cpp/src
parent783b718d0b270121cd2e597424d0c81adea77a38 (diff)
downloadqpid-python-a5c0fde5d0b96ae0b747f0cea21414753d6ee654.tar.gz
Client side support for message and delivery properties in header segments.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@574176 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am3
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp4
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp2
-rw-r--r--cpp/src/qpid/broker/Message.cpp6
-rw-r--r--cpp/src/qpid/broker/Message.h6
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.cpp87
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.h65
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp40
-rw-r--r--cpp/src/qpid/broker/Session.cpp4
-rw-r--r--cpp/src/qpid/client/ClientMessage.h10
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp34
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h5
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp17
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h2
-rw-r--r--cpp/src/qpid/framing/AMQContentBody.cpp2
-rw-r--r--cpp/src/qpid/framing/FrameSet.cpp13
-rw-r--r--cpp/src/qpid/framing/FrameSet.h1
-rw-r--r--cpp/src/qpid/framing/MethodContent.h5
-rw-r--r--cpp/src/qpid/framing/TransferContent.cpp64
-rw-r--r--cpp/src/qpid/framing/TransferContent.h46
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp20
21 files changed, 330 insertions, 106 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index d97265c1d6..a1d8e38372 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -132,6 +132,7 @@ libqpidcommon_la_SOURCES = \
qpid/framing/Blob.cpp \
qpid/framing/MethodHolder.h qpid/framing/MethodHolder.cpp \
qpid/framing/MethodHolderMaxSize.h \
+ qpid/framing/TransferContent.cpp \
qpid/Exception.cpp \
qpid/Plugin.h \
qpid/Plugin.cpp \
@@ -178,6 +179,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/FanOutExchange.cpp \
qpid/broker/HeadersExchange.cpp \
qpid/broker/Message.cpp \
+ qpid/broker/MessageAdapter.cpp \
qpid/broker/MessageBuilder.cpp \
qpid/broker/MessageDelivery.cpp \
qpid/broker/MessageHandlerImpl.cpp \
@@ -344,6 +346,7 @@ nobase_include_HEADERS = \
qpid/framing/SequenceNumberSet.h \
qpid/framing/SerializeHandler.h \
qpid/framing/StructHelper.h \
+ qpid/framing/TransferContent.h \
qpid/framing/TypeFilter.h \
qpid/framing/Value.h \
qpid/framing/Visitor.h \
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index a094c7a804..1a13a31a5e 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -68,7 +68,7 @@ void Queue::deliver(Message::shared_ptr& msg){
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
- alternateExchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders());
}
} else {
@@ -358,7 +358,7 @@ void Queue::destroy()
while(!messages.empty()){
DeliverableMessage msg(messages.front().payload);
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
- &(msg.getMessage().getApplicationHeaders()));
+ msg.getMessage().getApplicationHeaders());
pop();
}
alternateExchange->decAlternateUsers();
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 9b33fd5f10..a8a0745104 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -95,7 +95,7 @@ void DeliveryRecord::reject()
Exchange::shared_ptr alternate = queue->getAlternateExchange();
if (alternate) {
DeliverableMessage delivery(msg.payload);
- alternate->route(delivery, msg.payload->getRoutingKey(), &(msg.payload->getApplicationHeaders()));
+ alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to "
<< alternate->getName());
} else {
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index e5f92297b7..84d3478173 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -38,12 +38,12 @@ PublishAdapter Message::PUBLISH;
Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), store(0), adapter(0) {}
-const std::string& Message::getRoutingKey() const
+std::string Message::getRoutingKey() const
{
return getAdapter().getRoutingKey(frames);
}
-const std::string& Message::getExchangeName() const
+std::string Message::getExchangeName() const
{
return getAdapter().getExchange(frames);
}
@@ -61,7 +61,7 @@ bool Message::isImmediate() const
return getAdapter().isImmediate(frames);
}
-const FieldTable& Message::getApplicationHeaders() const
+const FieldTable* Message::getApplicationHeaders() const
{
return getAdapter().getApplicationHeaders(frames);
}
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 95b3f38b55..26b31d73e5 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -59,11 +59,11 @@ public:
uint64_t contentSize() const;
- const std::string& getRoutingKey() const;
+ std::string getRoutingKey() const;
const boost::shared_ptr<Exchange> getExchange(ExchangeRegistry&) const;
- const std::string& getExchangeName() const;
+ std::string getExchangeName() const;
bool isImmediate() const;
- const framing::FieldTable& getApplicationHeaders() const;
+ const framing::FieldTable* getApplicationHeaders() const;
bool isPersistent();
framing::FrameSet& getFrames() { return frames; }
diff --git a/cpp/src/qpid/broker/MessageAdapter.cpp b/cpp/src/qpid/broker/MessageAdapter.cpp
new file mode 100644
index 0000000000..764bf02cf4
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageAdapter.cpp
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageAdapter.h"
+
+namespace {
+ const std::string empty;
+}
+
+namespace qpid {
+namespace broker{
+
+ std::string PublishAdapter::getRoutingKey(const framing::FrameSet& f)
+ {
+ return f.as<framing::BasicPublishBody>()->getRoutingKey();
+ }
+
+ std::string PublishAdapter::getExchange(const framing::FrameSet& f)
+ {
+ return f.as<framing::BasicPublishBody>()->getExchange();
+ }
+
+ bool PublishAdapter::isImmediate(const framing::FrameSet& f)
+ {
+ return f.as<framing::BasicPublishBody>()->getImmediate();
+ }
+
+ const framing::FieldTable* PublishAdapter::getApplicationHeaders(const framing::FrameSet& f)
+ {
+ const framing::BasicHeaderProperties* p = f.getHeaders()->get<framing::BasicHeaderProperties>();
+ return p ? &(p->getHeaders()) : 0;
+ }
+
+ bool PublishAdapter::isPersistent(const framing::FrameSet& f)
+ {
+ const framing::BasicHeaderProperties* p = f.getHeaders()->get<framing::BasicHeaderProperties>();
+ return p && p->getDeliveryMode() == 2;
+ }
+
+ std::string TransferAdapter::getRoutingKey(const framing::FrameSet& f)
+ {
+ const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
+ return p ? p->getRoutingKey() : empty;
+ }
+
+ std::string TransferAdapter::getExchange(const framing::FrameSet& f)
+ {
+ return f.as<framing::MessageTransferBody>()->getDestination();
+ }
+
+ bool TransferAdapter::isImmediate(const framing::FrameSet&)
+ {
+ //TODO: we seem to have lost the immediate flag
+ return false;
+ }
+
+ const framing::FieldTable* TransferAdapter::getApplicationHeaders(const framing::FrameSet& f)
+ {
+ const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>();
+ return p ? &(p->getApplicationHeaders()) : 0;
+ }
+
+ bool TransferAdapter::isPersistent(const framing::FrameSet& f)
+ {
+ const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
+ return p && p->getDeliveryMode() == 2;
+ }
+
+}}
diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h
index 0b2dc6307a..e8337ec649 100644
--- a/cpp/src/qpid/broker/MessageAdapter.h
+++ b/cpp/src/qpid/broker/MessageAdapter.h
@@ -38,68 +38,29 @@ struct MessageAdapter
{
virtual ~MessageAdapter() {}
- virtual const std::string& getRoutingKey(const framing::FrameSet& f) = 0;
- virtual const std::string& getExchange(const framing::FrameSet& f) = 0;
+ virtual std::string getRoutingKey(const framing::FrameSet& f) = 0;
+ virtual std::string getExchange(const framing::FrameSet& f) = 0;
virtual bool isImmediate(const framing::FrameSet& f) = 0;
- virtual const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) = 0;
+ virtual const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f) = 0;
virtual bool isPersistent(const framing::FrameSet& f) = 0;
};
struct PublishAdapter : MessageAdapter
{
- const std::string& getRoutingKey(const framing::FrameSet& f)
- {
- return f.as<framing::BasicPublishBody>()->getRoutingKey();
- }
-
- const std::string& getExchange(const framing::FrameSet& f)
- {
- return f.as<framing::BasicPublishBody>()->getExchange();
- }
-
- bool isImmediate(const framing::FrameSet& f)
- {
- return f.as<framing::BasicPublishBody>()->getImmediate();
- }
-
- const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f)
- {
- return f.getHeaders()->get<framing::BasicHeaderProperties>()->getHeaders();
- }
-
- bool isPersistent(const framing::FrameSet& f)
- {
- return f.getHeaders()->get<framing::BasicHeaderProperties>()->getDeliveryMode() == 2;
- }
+ std::string getRoutingKey(const framing::FrameSet& f);
+ std::string getExchange(const framing::FrameSet& f);
+ bool isImmediate(const framing::FrameSet& f);
+ const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
+ bool isPersistent(const framing::FrameSet& f);
};
struct TransferAdapter : MessageAdapter
{
- const std::string& getRoutingKey(const framing::FrameSet& f)
- {
- return f.getHeaders()->get<framing::DeliveryProperties>()->getRoutingKey();
- }
-
- const std::string& getExchange(const framing::FrameSet& f)
- {
- return f.as<framing::MessageTransferBody>()->getDestination();
- }
-
- bool isImmediate(const framing::FrameSet&)
- {
- //TODO: we seem to have lost the immediate flag
- return false;
- }
-
- const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f)
- {
- return f.getHeaders()->get<framing::MessageProperties>()->getApplicationHeaders();
- }
-
- bool isPersistent(const framing::FrameSet& f)
- {
- return f.getHeaders()->get<framing::DeliveryProperties>()->getDeliveryMode() == 2;
- }
+ std::string getRoutingKey(const framing::FrameSet& f);
+ std::string getExchange(const framing::FrameSet& f);
+ bool isImmediate(const framing::FrameSet&);
+ const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
+ bool isPersistent(const framing::FrameSet& f);
};
}}
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index 1a84aa9b65..84b3cbb2ac 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -22,8 +22,8 @@
#include "Message.h"
#include "MessageStore.h"
-#include "qpid/Exception.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/reply_exceptions.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -46,7 +46,7 @@ void MessageBuilder::handle(AMQFrame& frame)
checkType(CONTENT_BODY, frame.getBody()->type());
break;
default:
- throw ConnectionException(504, "Invalid frame sequence for message.");
+ throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")"));
}
if (staging) {
store->appendContent(*message, frame.castBody<AMQContentBody>()->getData());
@@ -61,13 +61,6 @@ void MessageBuilder::handle(AMQFrame& frame)
}
}
-void MessageBuilder::checkType(uint8_t expected, uint8_t actual)
-{
- if (expected != actual) {
- throw ConnectionException(504, "Invalid frame sequence for message.");
- }
-}
-
void MessageBuilder::end()
{
message.reset();
@@ -81,3 +74,32 @@ void MessageBuilder::start(const SequenceNumber& id)
state = METHOD;
staging = false;
}
+
+namespace {
+
+const std::string HEADER_BODY_S = "HEADER";
+const std::string METHOD_BODY_S = "METHOD";
+const std::string CONTENT_BODY_S = "CONTENT";
+const std::string HEARTBEAT_BODY_S = "HEARTBEAT";
+const std::string UNKNOWN = "unknown";
+
+std::string type_str(uint8_t type)
+{
+ switch(type) {
+ case METHOD_BODY: return METHOD_BODY_S;
+ case HEADER_BODY: return HEADER_BODY_S;
+ case CONTENT_BODY: return CONTENT_BODY_S;
+ case HEARTBEAT_BODY: return HEARTBEAT_BODY_S;
+ }
+ return UNKNOWN;
+}
+
+}
+
+void MessageBuilder::checkType(uint8_t expected, uint8_t actual)
+{
+ if (expected != actual) {
+ throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected "
+ << type_str(expected) << " got " << type_str(actual) << ")"));
+ }
+}
diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp
index a8b22cb12a..650182c807 100644
--- a/cpp/src/qpid/broker/Session.cpp
+++ b/cpp/src/qpid/broker/Session.cpp
@@ -336,13 +336,13 @@ void Session::route(Message::shared_ptr msg, Deliverable& strategy) {
cacheExchange = getAdapter()->getConnection().broker.getExchanges().get(exchangeName);
}
- cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
if (!strategy.delivered) {
//TODO:if reject-unroutable, then reject
//else route to alternate exchange
if (cacheExchange->getAlternate()) {
- cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
}
}
diff --git a/cpp/src/qpid/client/ClientMessage.h b/cpp/src/qpid/client/ClientMessage.h
index 19b0f867bc..1afe5585a9 100644
--- a/cpp/src/qpid/client/ClientMessage.h
+++ b/cpp/src/qpid/client/ClientMessage.h
@@ -58,8 +58,14 @@ class Message : public framing::BasicHeaderProperties, public framing::MethodCon
bool isRedelivered() const { return redelivered; }
void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
- const HeaderProperties& getMethodHeaders() const { return *this; }
-
+ framing::AMQHeaderBody getHeader() const
+ {
+ framing::AMQHeaderBody header;
+ BasicHeaderProperties* properties = header.get<BasicHeaderProperties>(true);
+ BasicHeaderProperties::copy<BasicHeaderProperties, Message>(*properties, *this);
+ properties->setContentLength(data.size());
+ return header;
+ }
//TODO: move this elsewhere (GRS 24/08/2007)
void populate(framing::FrameSet& frameset)
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index 5ff34cde4e..d21d550ee2 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -24,6 +24,7 @@
using namespace qpid::client;
using namespace qpid::framing;
+using namespace qpid::sys;
ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c)
{
@@ -38,6 +39,7 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c)
void ConnectionImpl::allocated(SessionCore::shared_ptr session)
{
+ Mutex::ScopedLock l(lock);
if (sessions.find(session->getId()) != sessions.end()) {
throw Exception("Id already in use.");
}
@@ -46,6 +48,7 @@ void ConnectionImpl::allocated(SessionCore::shared_ptr session)
void ConnectionImpl::released(SessionCore::shared_ptr session)
{
+ Mutex::ScopedLock l(lock);
SessionMap::iterator i = sessions.find(session->getId());
if (i != sessions.end()) {
sessions.erase(i);
@@ -59,12 +62,7 @@ void ConnectionImpl::handle(framing::AMQFrame& frame)
void ConnectionImpl::incoming(framing::AMQFrame& frame)
{
- uint16_t id = frame.getChannel();
- SessionMap::iterator i = sessions.find(id);
- if (i == sessions.end()) {
- throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
- }
- i->second->handle(frame);
+ find(frame.getChannel())->handle(frame);
}
void ConnectionImpl::open(const std::string& host, int port,
@@ -93,10 +91,7 @@ void ConnectionImpl::closed()
void ConnectionImpl::closedByPeer(uint16_t code, const std::string& text)
{
- for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
- i->second->closed(code, text);
- }
- sessions.clear();
+ signalClose(code, text);
connector->close();
}
@@ -114,8 +109,25 @@ void ConnectionImpl::idleOut()
void ConnectionImpl::shutdown()
{
//this indicates that the socket to the server has closed
+ signalClose(0, "Unexpected socket closure.");
+}
+
+void ConnectionImpl::signalClose(uint16_t code, const std::string& text)
+{
+ Mutex::ScopedLock l(lock);
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
- i->second->closed(0, "Unexpected socket closure.");
+ Mutex::ScopedUnlock u(lock);
+ i->second->closed(code, text);
}
sessions.clear();
}
+
+SessionCore::shared_ptr ConnectionImpl::find(uint16_t id)
+{
+ Mutex::ScopedLock l(lock);
+ SessionMap::iterator i = sessions.find(id);
+ if (i == sessions.end()) {
+ throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
+ }
+ return i->second;
+}
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index e37713e77d..a2ee14ea6e 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -25,6 +25,7 @@
#include <map>
#include <boost/shared_ptr.hpp>
#include "qpid/framing/FrameHandler.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/sys/ShutdownHandler.h"
#include "qpid/sys/TimeoutHandler.h"
#include "ConnectionHandler.h"
@@ -44,6 +45,7 @@ class ConnectionImpl : public framing::FrameHandler,
ConnectionHandler handler;
boost::shared_ptr<Connector> connector;
framing::ProtocolVersion version;
+ sys::Mutex lock;
void incoming(framing::AMQFrame& frame);
void closed();
@@ -51,6 +53,9 @@ class ConnectionImpl : public framing::FrameHandler,
void idleOut();
void idleIn();
void shutdown();
+ void signalClose(uint16_t, const std::string&);
+ SessionCore::shared_ptr find(uint16_t);
+
public:
typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index 1520ba2272..c2b5e45928 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -181,31 +181,28 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodConten
CompletionTracker::ResultListener l)
{
SequenceNumber id = send(command, l);
- sendContent(dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData());
+ sendContent(content);
return id;
}
-void ExecutionHandler::sendContent(const BasicHeaderProperties& headers, const std::string& data)
+void ExecutionHandler::sendContent(const MethodContent& content)
{
- AMQHeaderBody header;
- BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers);
- header.get<BasicHeaderProperties>(true)->setContentLength(data.size());
- AMQFrame h(0, header);
- out(h);
+ AMQFrame header(0, content.getHeader());
+ out(header);
- u_int64_t data_length = data.length();
+ u_int64_t data_length = content.getData().length();
if(data_length > 0){
//frame itself uses 8 bytes
u_int32_t frag_size = maxFrameSize - 8;
if(data_length < frag_size){
- AMQFrame frame(0, AMQContentBody(data));
+ AMQFrame frame(0, AMQContentBody(content.getData()));
out(frame);
}else{
u_int32_t offset = 0;
u_int32_t remaining = data_length - offset;
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
- string frag(data.substr(offset, length));
+ string frag(content.getData().substr(offset, length));
AMQFrame frame(0, AMQContentBody(frag));
out(frame);
offset += length;
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index a42697e26a..3078f6bc3a 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -59,7 +59,7 @@ class ExecutionHandler :
void sendCompletion();
- void sendContent(const framing::BasicHeaderProperties& headers, const std::string& data);
+ void sendContent(const framing::MethodContent&);
public:
typedef CompletionTracker::ResultListener ResultListener;
diff --git a/cpp/src/qpid/framing/AMQContentBody.cpp b/cpp/src/qpid/framing/AMQContentBody.cpp
index b0850ea434..176114ea0c 100644
--- a/cpp/src/qpid/framing/AMQContentBody.cpp
+++ b/cpp/src/qpid/framing/AMQContentBody.cpp
@@ -41,6 +41,6 @@ void qpid::framing::AMQContentBody::print(std::ostream& out) const
{
out << "content (" << size() << " bytes)";
#ifndef NDEBUG
- out << data.substr(0,10);
+ out << " " << data.substr(0,10);
#endif
}
diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp
index 434f1b3aad..12579f53cb 100644
--- a/cpp/src/qpid/framing/FrameSet.cpp
+++ b/cpp/src/qpid/framing/FrameSet.cpp
@@ -56,17 +56,17 @@ bool FrameSet::isComplete() const
const AMQMethodBody* FrameSet::getMethod() const
{
- return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody());
+ return parts.empty() ? 0 : parts[0].getMethod();
}
const AMQHeaderBody* FrameSet::getHeaders() const
{
- return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody());
+ return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>();
}
AMQHeaderBody* FrameSet::getHeaders()
{
- return parts.size() < 2 ? 0 : dynamic_cast<AMQHeaderBody*>(parts[1].getBody());
+ return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>();
}
uint64_t FrameSet::getContentSize() const
@@ -81,3 +81,10 @@ void FrameSet::getContent(std::string& out) const
AccumulateContent accumulator(out);
map_if(accumulator, TypeFilter(CONTENT_BODY));
}
+
+std::string FrameSet::getContent() const
+{
+ std::string out;
+ getContent(out);
+ return out;
+}
diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h
index d6d5cd7a13..9a9512a6d4 100644
--- a/cpp/src/qpid/framing/FrameSet.h
+++ b/cpp/src/qpid/framing/FrameSet.h
@@ -48,6 +48,7 @@ public:
uint64_t getContentSize() const;
void getContent(std::string&) const;
+ std::string getContent() const;
const AMQMethodBody* getMethod() const;
const AMQHeaderBody* getHeaders() const;
diff --git a/cpp/src/qpid/framing/MethodContent.h b/cpp/src/qpid/framing/MethodContent.h
index 11d8d42cab..737c0d6b7b 100644
--- a/cpp/src/qpid/framing/MethodContent.h
+++ b/cpp/src/qpid/framing/MethodContent.h
@@ -21,7 +21,8 @@
#ifndef _MethodContent_
#define _MethodContent_
-#include "HeaderProperties.h"
+#include <string>
+#include "AMQHeaderBody.h"
namespace qpid {
namespace framing {
@@ -31,7 +32,7 @@ class MethodContent
public:
virtual ~MethodContent() {}
//TODO: rethink this interface
- virtual const HeaderProperties& getMethodHeaders() const = 0;
+ virtual AMQHeaderBody getHeader() const = 0;
virtual const std::string& getData() const = 0;
};
diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp
new file mode 100644
index 0000000000..1faa24ae0c
--- /dev/null
+++ b/cpp/src/qpid/framing/TransferContent.cpp
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "TransferContent.h"
+
+namespace qpid {
+namespace framing {
+
+TransferContent::TransferContent(const std::string& _data)
+{
+ setData(_data);
+}
+
+AMQHeaderBody TransferContent::getHeader() const
+{
+ return header;
+}
+
+const std::string& TransferContent::getData() const
+{
+ return data;
+}
+
+void TransferContent::setData(const std::string& _data)
+{
+ data = _data;
+ header.get<MessageProperties>(true)->setContentLength(data.size());
+}
+
+void TransferContent::appendData(const std::string& _data)
+{
+ data += _data;
+ header.get<MessageProperties>(true)->setContentLength(data.size());
+}
+
+MessageProperties& TransferContent::getMessageProperties()
+{
+ return *header.get<MessageProperties>(true);
+}
+
+DeliveryProperties& TransferContent::getDeliveryProperties()
+{
+ return *header.get<DeliveryProperties>(true);
+}
+
+}}
diff --git a/cpp/src/qpid/framing/TransferContent.h b/cpp/src/qpid/framing/TransferContent.h
new file mode 100644
index 0000000000..14ba209f4b
--- /dev/null
+++ b/cpp/src/qpid/framing/TransferContent.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TransferContent_
+#define _TransferContent_
+
+#include "MethodContent.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/DeliveryProperties.h"
+
+namespace qpid {
+namespace framing {
+
+class TransferContent : public MethodContent
+{
+ AMQHeaderBody header;
+ std::string data;
+public:
+ TransferContent(const std::string& data);
+ AMQHeaderBody getHeader() const;
+ void setData(const std::string&);
+ void appendData(const std::string&);
+ const std::string& getData() const;
+ MessageProperties& getMessageProperties();
+ DeliveryProperties& getDeliveryProperties();
+};
+
+}}
+#endif
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 1acac9c980..a3d50d0ae9 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -22,6 +22,7 @@
#include "qpid_test_plugin.h"
#include "InProcessBroker.h"
#include "qpid/client/Session.h"
+#include "qpid/framing/TransferContent.h"
using namespace qpid::client;
using namespace qpid::framing;
@@ -29,7 +30,8 @@ using namespace qpid::framing;
class ClientSessionTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(ClientSessionTest);
- CPPUNIT_TEST(testQueueQuery);;
+ CPPUNIT_TEST(testQueueQuery);
+ CPPUNIT_TEST(testTransfer);
CPPUNIT_TEST_SUITE_END();
boost::shared_ptr<Connector> broker;
@@ -55,14 +57,24 @@ class ClientSessionTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(alternate, result.get().getAlternateExchange());
}
- void testCompletion()
+ void testTransfer()
{
std::string queue("my-queue");
std::string dest("my-dest");
+ std::string data("my message");
session.queueDeclare(0, queue, "", false, false, true, true, FieldTable());
- //subcribe to the queue with confirm_mode = 1
+ //subcribe to the queue with confirm_mode = 1:
session.messageSubscribe(0, queue, dest, false, 1, 0, false, FieldTable());
- //publish some messages
+ //publish a message:
+ TransferContent content(data);
+ content.getDeliveryProperties().setRoutingKey("my-queue");
+ session.messageTransfer(0, "", 0, 0, content);
+ //get & test the message:
+ FrameSet::shared_ptr msg = session.get();
+ CPPUNIT_ASSERT(msg->isA<MessageTransferBody>());
+ CPPUNIT_ASSERT_EQUAL(data, msg->getContent());
+ //confirm receipt:
+ session.execution().completed(msg->getId(), true, true);
}
};