diff options
author | Gordon Sim <gsim@apache.org> | 2007-09-10 08:41:05 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-09-10 08:41:05 +0000 |
commit | a5c0fde5d0b96ae0b747f0cea21414753d6ee654 (patch) | |
tree | 4a809a880691db3e04fa3c7374db500b767ca85b /cpp/src | |
parent | 783b718d0b270121cd2e597424d0c81adea77a38 (diff) | |
download | qpid-python-a5c0fde5d0b96ae0b747f0cea21414753d6ee654.tar.gz |
Client side support for message and delivery properties in header segments.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@574176 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageAdapter.cpp | 87 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageAdapter.h | 65 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageBuilder.cpp | 40 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Session.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/ClientMessage.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 34 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.cpp | 17 | ||||
-rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/framing/AMQContentBody.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FrameSet.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FrameSet.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/framing/MethodContent.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/framing/TransferContent.cpp | 64 | ||||
-rw-r--r-- | cpp/src/qpid/framing/TransferContent.h | 46 | ||||
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 20 |
21 files changed, 330 insertions, 106 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index d97265c1d6..a1d8e38372 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -132,6 +132,7 @@ libqpidcommon_la_SOURCES = \ qpid/framing/Blob.cpp \ qpid/framing/MethodHolder.h qpid/framing/MethodHolder.cpp \ qpid/framing/MethodHolderMaxSize.h \ + qpid/framing/TransferContent.cpp \ qpid/Exception.cpp \ qpid/Plugin.h \ qpid/Plugin.cpp \ @@ -178,6 +179,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/FanOutExchange.cpp \ qpid/broker/HeadersExchange.cpp \ qpid/broker/Message.cpp \ + qpid/broker/MessageAdapter.cpp \ qpid/broker/MessageBuilder.cpp \ qpid/broker/MessageDelivery.cpp \ qpid/broker/MessageHandlerImpl.cpp \ @@ -344,6 +346,7 @@ nobase_include_HEADERS = \ qpid/framing/SequenceNumberSet.h \ qpid/framing/SerializeHandler.h \ qpid/framing/StructHelper.h \ + qpid/framing/TransferContent.h \ qpid/framing/TypeFilter.h \ qpid/framing/Value.h \ qpid/framing/Visitor.h \ diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index a094c7a804..1a13a31a5e 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -68,7 +68,7 @@ void Queue::deliver(Message::shared_ptr& msg){ if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); - alternateExchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders()); } } else { @@ -358,7 +358,7 @@ void Queue::destroy() while(!messages.empty()){ DeliverableMessage msg(messages.front().payload); alternateExchange->route(msg, msg.getMessage().getRoutingKey(), - &(msg.getMessage().getApplicationHeaders())); + msg.getMessage().getApplicationHeaders()); pop(); } alternateExchange->decAlternateUsers(); diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 9b33fd5f10..a8a0745104 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -95,7 +95,7 @@ void DeliveryRecord::reject() Exchange::shared_ptr alternate = queue->getAlternateExchange(); if (alternate) { DeliverableMessage delivery(msg.payload); - alternate->route(delivery, msg.payload->getRoutingKey(), &(msg.payload->getApplicationHeaders())); + alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders()); QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to " << alternate->getName()); } else { diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index e5f92297b7..84d3478173 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -38,12 +38,12 @@ PublishAdapter Message::PUBLISH; Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), store(0), adapter(0) {} -const std::string& Message::getRoutingKey() const +std::string Message::getRoutingKey() const { return getAdapter().getRoutingKey(frames); } -const std::string& Message::getExchangeName() const +std::string Message::getExchangeName() const { return getAdapter().getExchange(frames); } @@ -61,7 +61,7 @@ bool Message::isImmediate() const return getAdapter().isImmediate(frames); } -const FieldTable& Message::getApplicationHeaders() const +const FieldTable* Message::getApplicationHeaders() const { return getAdapter().getApplicationHeaders(frames); } diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 95b3f38b55..26b31d73e5 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -59,11 +59,11 @@ public: uint64_t contentSize() const; - const std::string& getRoutingKey() const; + std::string getRoutingKey() const; const boost::shared_ptr<Exchange> getExchange(ExchangeRegistry&) const; - const std::string& getExchangeName() const; + std::string getExchangeName() const; bool isImmediate() const; - const framing::FieldTable& getApplicationHeaders() const; + const framing::FieldTable* getApplicationHeaders() const; bool isPersistent(); framing::FrameSet& getFrames() { return frames; } diff --git a/cpp/src/qpid/broker/MessageAdapter.cpp b/cpp/src/qpid/broker/MessageAdapter.cpp new file mode 100644 index 0000000000..764bf02cf4 --- /dev/null +++ b/cpp/src/qpid/broker/MessageAdapter.cpp @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "MessageAdapter.h" + +namespace { + const std::string empty; +} + +namespace qpid { +namespace broker{ + + std::string PublishAdapter::getRoutingKey(const framing::FrameSet& f) + { + return f.as<framing::BasicPublishBody>()->getRoutingKey(); + } + + std::string PublishAdapter::getExchange(const framing::FrameSet& f) + { + return f.as<framing::BasicPublishBody>()->getExchange(); + } + + bool PublishAdapter::isImmediate(const framing::FrameSet& f) + { + return f.as<framing::BasicPublishBody>()->getImmediate(); + } + + const framing::FieldTable* PublishAdapter::getApplicationHeaders(const framing::FrameSet& f) + { + const framing::BasicHeaderProperties* p = f.getHeaders()->get<framing::BasicHeaderProperties>(); + return p ? &(p->getHeaders()) : 0; + } + + bool PublishAdapter::isPersistent(const framing::FrameSet& f) + { + const framing::BasicHeaderProperties* p = f.getHeaders()->get<framing::BasicHeaderProperties>(); + return p && p->getDeliveryMode() == 2; + } + + std::string TransferAdapter::getRoutingKey(const framing::FrameSet& f) + { + const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>(); + return p ? p->getRoutingKey() : empty; + } + + std::string TransferAdapter::getExchange(const framing::FrameSet& f) + { + return f.as<framing::MessageTransferBody>()->getDestination(); + } + + bool TransferAdapter::isImmediate(const framing::FrameSet&) + { + //TODO: we seem to have lost the immediate flag + return false; + } + + const framing::FieldTable* TransferAdapter::getApplicationHeaders(const framing::FrameSet& f) + { + const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>(); + return p ? &(p->getApplicationHeaders()) : 0; + } + + bool TransferAdapter::isPersistent(const framing::FrameSet& f) + { + const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>(); + return p && p->getDeliveryMode() == 2; + } + +}} diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h index 0b2dc6307a..e8337ec649 100644 --- a/cpp/src/qpid/broker/MessageAdapter.h +++ b/cpp/src/qpid/broker/MessageAdapter.h @@ -38,68 +38,29 @@ struct MessageAdapter { virtual ~MessageAdapter() {} - virtual const std::string& getRoutingKey(const framing::FrameSet& f) = 0; - virtual const std::string& getExchange(const framing::FrameSet& f) = 0; + virtual std::string getRoutingKey(const framing::FrameSet& f) = 0; + virtual std::string getExchange(const framing::FrameSet& f) = 0; virtual bool isImmediate(const framing::FrameSet& f) = 0; - virtual const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) = 0; + virtual const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f) = 0; virtual bool isPersistent(const framing::FrameSet& f) = 0; }; struct PublishAdapter : MessageAdapter { - const std::string& getRoutingKey(const framing::FrameSet& f) - { - return f.as<framing::BasicPublishBody>()->getRoutingKey(); - } - - const std::string& getExchange(const framing::FrameSet& f) - { - return f.as<framing::BasicPublishBody>()->getExchange(); - } - - bool isImmediate(const framing::FrameSet& f) - { - return f.as<framing::BasicPublishBody>()->getImmediate(); - } - - const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) - { - return f.getHeaders()->get<framing::BasicHeaderProperties>()->getHeaders(); - } - - bool isPersistent(const framing::FrameSet& f) - { - return f.getHeaders()->get<framing::BasicHeaderProperties>()->getDeliveryMode() == 2; - } + std::string getRoutingKey(const framing::FrameSet& f); + std::string getExchange(const framing::FrameSet& f); + bool isImmediate(const framing::FrameSet& f); + const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f); + bool isPersistent(const framing::FrameSet& f); }; struct TransferAdapter : MessageAdapter { - const std::string& getRoutingKey(const framing::FrameSet& f) - { - return f.getHeaders()->get<framing::DeliveryProperties>()->getRoutingKey(); - } - - const std::string& getExchange(const framing::FrameSet& f) - { - return f.as<framing::MessageTransferBody>()->getDestination(); - } - - bool isImmediate(const framing::FrameSet&) - { - //TODO: we seem to have lost the immediate flag - return false; - } - - const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) - { - return f.getHeaders()->get<framing::MessageProperties>()->getApplicationHeaders(); - } - - bool isPersistent(const framing::FrameSet& f) - { - return f.getHeaders()->get<framing::DeliveryProperties>()->getDeliveryMode() == 2; - } + std::string getRoutingKey(const framing::FrameSet& f); + std::string getExchange(const framing::FrameSet& f); + bool isImmediate(const framing::FrameSet&); + const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f); + bool isPersistent(const framing::FrameSet& f); }; }} diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp index 1a84aa9b65..84b3cbb2ac 100644 --- a/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -22,8 +22,8 @@ #include "Message.h" #include "MessageStore.h" -#include "qpid/Exception.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/reply_exceptions.h" using namespace qpid::broker; using namespace qpid::framing; @@ -46,7 +46,7 @@ void MessageBuilder::handle(AMQFrame& frame) checkType(CONTENT_BODY, frame.getBody()->type()); break; default: - throw ConnectionException(504, "Invalid frame sequence for message."); + throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")")); } if (staging) { store->appendContent(*message, frame.castBody<AMQContentBody>()->getData()); @@ -61,13 +61,6 @@ void MessageBuilder::handle(AMQFrame& frame) } } -void MessageBuilder::checkType(uint8_t expected, uint8_t actual) -{ - if (expected != actual) { - throw ConnectionException(504, "Invalid frame sequence for message."); - } -} - void MessageBuilder::end() { message.reset(); @@ -81,3 +74,32 @@ void MessageBuilder::start(const SequenceNumber& id) state = METHOD; staging = false; } + +namespace { + +const std::string HEADER_BODY_S = "HEADER"; +const std::string METHOD_BODY_S = "METHOD"; +const std::string CONTENT_BODY_S = "CONTENT"; +const std::string HEARTBEAT_BODY_S = "HEARTBEAT"; +const std::string UNKNOWN = "unknown"; + +std::string type_str(uint8_t type) +{ + switch(type) { + case METHOD_BODY: return METHOD_BODY_S; + case HEADER_BODY: return HEADER_BODY_S; + case CONTENT_BODY: return CONTENT_BODY_S; + case HEARTBEAT_BODY: return HEARTBEAT_BODY_S; + } + return UNKNOWN; +} + +} + +void MessageBuilder::checkType(uint8_t expected, uint8_t actual) +{ + if (expected != actual) { + throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected " + << type_str(expected) << " got " << type_str(actual) << ")")); + } +} diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp index a8b22cb12a..650182c807 100644 --- a/cpp/src/qpid/broker/Session.cpp +++ b/cpp/src/qpid/broker/Session.cpp @@ -336,13 +336,13 @@ void Session::route(Message::shared_ptr msg, Deliverable& strategy) { cacheExchange = getAdapter()->getConnection().broker.getExchanges().get(exchangeName); } - cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); if (!strategy.delivered) { //TODO:if reject-unroutable, then reject //else route to alternate exchange if (cacheExchange->getAlternate()) { - cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); } } diff --git a/cpp/src/qpid/client/ClientMessage.h b/cpp/src/qpid/client/ClientMessage.h index 19b0f867bc..1afe5585a9 100644 --- a/cpp/src/qpid/client/ClientMessage.h +++ b/cpp/src/qpid/client/ClientMessage.h @@ -58,8 +58,14 @@ class Message : public framing::BasicHeaderProperties, public framing::MethodCon bool isRedelivered() const { return redelivered; } void setRedelivered(bool _redelivered){ redelivered = _redelivered; } - const HeaderProperties& getMethodHeaders() const { return *this; } - + framing::AMQHeaderBody getHeader() const + { + framing::AMQHeaderBody header; + BasicHeaderProperties* properties = header.get<BasicHeaderProperties>(true); + BasicHeaderProperties::copy<BasicHeaderProperties, Message>(*properties, *this); + properties->setContentLength(data.size()); + return header; + } //TODO: move this elsewhere (GRS 24/08/2007) void populate(framing::FrameSet& frameset) diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 5ff34cde4e..d21d550ee2 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -24,6 +24,7 @@ using namespace qpid::client; using namespace qpid::framing; +using namespace qpid::sys; ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c) { @@ -38,6 +39,7 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c) void ConnectionImpl::allocated(SessionCore::shared_ptr session) { + Mutex::ScopedLock l(lock); if (sessions.find(session->getId()) != sessions.end()) { throw Exception("Id already in use."); } @@ -46,6 +48,7 @@ void ConnectionImpl::allocated(SessionCore::shared_ptr session) void ConnectionImpl::released(SessionCore::shared_ptr session) { + Mutex::ScopedLock l(lock); SessionMap::iterator i = sessions.find(session->getId()); if (i != sessions.end()) { sessions.erase(i); @@ -59,12 +62,7 @@ void ConnectionImpl::handle(framing::AMQFrame& frame) void ConnectionImpl::incoming(framing::AMQFrame& frame) { - uint16_t id = frame.getChannel(); - SessionMap::iterator i = sessions.find(id); - if (i == sessions.end()) { - throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str()); - } - i->second->handle(frame); + find(frame.getChannel())->handle(frame); } void ConnectionImpl::open(const std::string& host, int port, @@ -93,10 +91,7 @@ void ConnectionImpl::closed() void ConnectionImpl::closedByPeer(uint16_t code, const std::string& text) { - for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) { - i->second->closed(code, text); - } - sessions.clear(); + signalClose(code, text); connector->close(); } @@ -114,8 +109,25 @@ void ConnectionImpl::idleOut() void ConnectionImpl::shutdown() { //this indicates that the socket to the server has closed + signalClose(0, "Unexpected socket closure."); +} + +void ConnectionImpl::signalClose(uint16_t code, const std::string& text) +{ + Mutex::ScopedLock l(lock); for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) { - i->second->closed(0, "Unexpected socket closure."); + Mutex::ScopedUnlock u(lock); + i->second->closed(code, text); } sessions.clear(); } + +SessionCore::shared_ptr ConnectionImpl::find(uint16_t id) +{ + Mutex::ScopedLock l(lock); + SessionMap::iterator i = sessions.find(id); + if (i == sessions.end()) { + throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str()); + } + return i->second; +} diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index e37713e77d..a2ee14ea6e 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -25,6 +25,7 @@ #include <map> #include <boost/shared_ptr.hpp> #include "qpid/framing/FrameHandler.h" +#include "qpid/sys/Mutex.h" #include "qpid/sys/ShutdownHandler.h" #include "qpid/sys/TimeoutHandler.h" #include "ConnectionHandler.h" @@ -44,6 +45,7 @@ class ConnectionImpl : public framing::FrameHandler, ConnectionHandler handler; boost::shared_ptr<Connector> connector; framing::ProtocolVersion version; + sys::Mutex lock; void incoming(framing::AMQFrame& frame); void closed(); @@ -51,6 +53,9 @@ class ConnectionImpl : public framing::FrameHandler, void idleOut(); void idleIn(); void shutdown(); + void signalClose(uint16_t, const std::string&); + SessionCore::shared_ptr find(uint16_t); + public: typedef boost::shared_ptr<ConnectionImpl> shared_ptr; diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index 1520ba2272..c2b5e45928 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -181,31 +181,28 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodConten CompletionTracker::ResultListener l) { SequenceNumber id = send(command, l); - sendContent(dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData()); + sendContent(content); return id; } -void ExecutionHandler::sendContent(const BasicHeaderProperties& headers, const std::string& data) +void ExecutionHandler::sendContent(const MethodContent& content) { - AMQHeaderBody header; - BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers); - header.get<BasicHeaderProperties>(true)->setContentLength(data.size()); - AMQFrame h(0, header); - out(h); + AMQFrame header(0, content.getHeader()); + out(header); - u_int64_t data_length = data.length(); + u_int64_t data_length = content.getData().length(); if(data_length > 0){ //frame itself uses 8 bytes u_int32_t frag_size = maxFrameSize - 8; if(data_length < frag_size){ - AMQFrame frame(0, AMQContentBody(data)); + AMQFrame frame(0, AMQContentBody(content.getData())); out(frame); }else{ u_int32_t offset = 0; u_int32_t remaining = data_length - offset; while (remaining > 0) { u_int32_t length = remaining > frag_size ? frag_size : remaining; - string frag(data.substr(offset, length)); + string frag(content.getData().substr(offset, length)); AMQFrame frame(0, AMQContentBody(frag)); out(frame); offset += length; diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index a42697e26a..3078f6bc3a 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -59,7 +59,7 @@ class ExecutionHandler : void sendCompletion(); - void sendContent(const framing::BasicHeaderProperties& headers, const std::string& data); + void sendContent(const framing::MethodContent&); public: typedef CompletionTracker::ResultListener ResultListener; diff --git a/cpp/src/qpid/framing/AMQContentBody.cpp b/cpp/src/qpid/framing/AMQContentBody.cpp index b0850ea434..176114ea0c 100644 --- a/cpp/src/qpid/framing/AMQContentBody.cpp +++ b/cpp/src/qpid/framing/AMQContentBody.cpp @@ -41,6 +41,6 @@ void qpid::framing::AMQContentBody::print(std::ostream& out) const { out << "content (" << size() << " bytes)"; #ifndef NDEBUG - out << data.substr(0,10); + out << " " << data.substr(0,10); #endif } diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp index 434f1b3aad..12579f53cb 100644 --- a/cpp/src/qpid/framing/FrameSet.cpp +++ b/cpp/src/qpid/framing/FrameSet.cpp @@ -56,17 +56,17 @@ bool FrameSet::isComplete() const const AMQMethodBody* FrameSet::getMethod() const { - return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody()); + return parts.empty() ? 0 : parts[0].getMethod(); } const AMQHeaderBody* FrameSet::getHeaders() const { - return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody()); + return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>(); } AMQHeaderBody* FrameSet::getHeaders() { - return parts.size() < 2 ? 0 : dynamic_cast<AMQHeaderBody*>(parts[1].getBody()); + return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>(); } uint64_t FrameSet::getContentSize() const @@ -81,3 +81,10 @@ void FrameSet::getContent(std::string& out) const AccumulateContent accumulator(out); map_if(accumulator, TypeFilter(CONTENT_BODY)); } + +std::string FrameSet::getContent() const +{ + std::string out; + getContent(out); + return out; +} diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h index d6d5cd7a13..9a9512a6d4 100644 --- a/cpp/src/qpid/framing/FrameSet.h +++ b/cpp/src/qpid/framing/FrameSet.h @@ -48,6 +48,7 @@ public: uint64_t getContentSize() const; void getContent(std::string&) const; + std::string getContent() const; const AMQMethodBody* getMethod() const; const AMQHeaderBody* getHeaders() const; diff --git a/cpp/src/qpid/framing/MethodContent.h b/cpp/src/qpid/framing/MethodContent.h index 11d8d42cab..737c0d6b7b 100644 --- a/cpp/src/qpid/framing/MethodContent.h +++ b/cpp/src/qpid/framing/MethodContent.h @@ -21,7 +21,8 @@ #ifndef _MethodContent_ #define _MethodContent_ -#include "HeaderProperties.h" +#include <string> +#include "AMQHeaderBody.h" namespace qpid { namespace framing { @@ -31,7 +32,7 @@ class MethodContent public: virtual ~MethodContent() {} //TODO: rethink this interface - virtual const HeaderProperties& getMethodHeaders() const = 0; + virtual AMQHeaderBody getHeader() const = 0; virtual const std::string& getData() const = 0; }; diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp new file mode 100644 index 0000000000..1faa24ae0c --- /dev/null +++ b/cpp/src/qpid/framing/TransferContent.cpp @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "TransferContent.h" + +namespace qpid { +namespace framing { + +TransferContent::TransferContent(const std::string& _data) +{ + setData(_data); +} + +AMQHeaderBody TransferContent::getHeader() const +{ + return header; +} + +const std::string& TransferContent::getData() const +{ + return data; +} + +void TransferContent::setData(const std::string& _data) +{ + data = _data; + header.get<MessageProperties>(true)->setContentLength(data.size()); +} + +void TransferContent::appendData(const std::string& _data) +{ + data += _data; + header.get<MessageProperties>(true)->setContentLength(data.size()); +} + +MessageProperties& TransferContent::getMessageProperties() +{ + return *header.get<MessageProperties>(true); +} + +DeliveryProperties& TransferContent::getDeliveryProperties() +{ + return *header.get<DeliveryProperties>(true); +} + +}} diff --git a/cpp/src/qpid/framing/TransferContent.h b/cpp/src/qpid/framing/TransferContent.h new file mode 100644 index 0000000000..14ba209f4b --- /dev/null +++ b/cpp/src/qpid/framing/TransferContent.h @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TransferContent_ +#define _TransferContent_ + +#include "MethodContent.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/DeliveryProperties.h" + +namespace qpid { +namespace framing { + +class TransferContent : public MethodContent +{ + AMQHeaderBody header; + std::string data; +public: + TransferContent(const std::string& data); + AMQHeaderBody getHeader() const; + void setData(const std::string&); + void appendData(const std::string&); + const std::string& getData() const; + MessageProperties& getMessageProperties(); + DeliveryProperties& getDeliveryProperties(); +}; + +}} +#endif diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 1acac9c980..a3d50d0ae9 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -22,6 +22,7 @@ #include "qpid_test_plugin.h" #include "InProcessBroker.h" #include "qpid/client/Session.h" +#include "qpid/framing/TransferContent.h" using namespace qpid::client; using namespace qpid::framing; @@ -29,7 +30,8 @@ using namespace qpid::framing; class ClientSessionTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(ClientSessionTest); - CPPUNIT_TEST(testQueueQuery);; + CPPUNIT_TEST(testQueueQuery); + CPPUNIT_TEST(testTransfer); CPPUNIT_TEST_SUITE_END(); boost::shared_ptr<Connector> broker; @@ -55,14 +57,24 @@ class ClientSessionTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(alternate, result.get().getAlternateExchange()); } - void testCompletion() + void testTransfer() { std::string queue("my-queue"); std::string dest("my-dest"); + std::string data("my message"); session.queueDeclare(0, queue, "", false, false, true, true, FieldTable()); - //subcribe to the queue with confirm_mode = 1 + //subcribe to the queue with confirm_mode = 1: session.messageSubscribe(0, queue, dest, false, 1, 0, false, FieldTable()); - //publish some messages + //publish a message: + TransferContent content(data); + content.getDeliveryProperties().setRoutingKey("my-queue"); + session.messageTransfer(0, "", 0, 0, content); + //get & test the message: + FrameSet::shared_ptr msg = session.get(); + CPPUNIT_ASSERT(msg->isA<MessageTransferBody>()); + CPPUNIT_ASSERT_EQUAL(data, msg->getContent()); + //confirm receipt: + session.execution().completed(msg->getId(), true, true); } }; |