summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp366
1 files changed, 366 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
new file mode 100644
index 0000000000..e343abde8a
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
@@ -0,0 +1,366 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MessageTransfer.h"
+#include "qpid/broker/MapHandler.h"
+#include "qpid/broker/Message.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/frame_functors.h"
+#include "qpid/framing/TypeFilter.h"
+#include "qpid/framing/SendContent.h"
+#include "qpid/log/Statement.h"
+
+using namespace qpid::framing;
+
+namespace qpid {
+namespace broker {
+namespace amqp_0_10 {
+namespace {
+const std::string QMF2("qmf2");
+const std::string PARTIAL("partial");
+}
+MessageTransfer::MessageTransfer() : frames(framing::SequenceNumber()) {}
+MessageTransfer::MessageTransfer(const framing::SequenceNumber& id) : frames(id) {}
+
+uint64_t MessageTransfer::getContentSize() const
+{
+ return frames.getContentSize();
+}
+
+std::string MessageTransfer::getAnnotationAsString(const std::string& key) const
+{
+ const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>();
+ if (mp && mp->hasApplicationHeaders()) {
+ return mp->getApplicationHeaders().getAsString(key);
+ } else {
+ return std::string();
+ }
+}
+std::string MessageTransfer::getPropertyAsString(const std::string& key) const { return getAnnotationAsString(key); }
+
+bool MessageTransfer::getTtl(uint64_t& result) const
+{
+ const qpid::framing::DeliveryProperties* dp = getProperties<qpid::framing::DeliveryProperties>();
+ if (dp && dp->hasTtl()) {
+ result = dp->getTtl();
+ return true;
+ } else {
+ return false;
+ }
+}
+bool MessageTransfer::hasExpiration() const
+{
+ const qpid::framing::DeliveryProperties* dp = getProperties<qpid::framing::DeliveryProperties>();
+ if (dp && dp->hasExpiration()) {
+ return true;
+ } else {
+ return false;
+ }
+}
+
+uint8_t MessageTransfer::getPriority() const
+{
+ const qpid::framing::DeliveryProperties* dp = getProperties<qpid::framing::DeliveryProperties>();
+ if (dp && dp->hasPriority()) {
+ return dp->getPriority();
+ } else {
+ return 0;
+ }
+}
+
+std::string MessageTransfer::getExchangeName() const
+{
+ return getFrames().as<framing::MessageTransferBody>()->getDestination();
+}
+
+bool MessageTransfer::requiresAccept() const
+{
+ const framing::MessageTransferBody* b = getFrames().as<framing::MessageTransferBody>();
+ return b && b->getAcceptMode() == 0/*EXPLICIT == 0*/;
+}
+uint32_t MessageTransfer::getRequiredCredit() const
+{
+ return requiredCredit;
+}
+void MessageTransfer::computeRequiredCredit()
+{
+ //add up payload for all header and content frames in the frameset
+ qpid::framing::SumBodySize sum;
+ frames.map_if(sum, qpid::framing::TypeFilter2<qpid::framing::HEADER_BODY, qpid::framing::CONTENT_BODY>());
+ requiredCredit = sum.getSize();
+}
+uint32_t MessageTransfer::getRequiredCredit(const qpid::broker::Message& msg)
+{
+ //TODO: may need to reflect annotations and other modifications in this also
+ return get(msg).getRequiredCredit();
+}
+
+qpid::framing::FrameSet& MessageTransfer::getFrames()
+{
+ return frames;
+}
+const qpid::framing::FrameSet& MessageTransfer::getFrames() const
+{
+ return frames;
+}
+void MessageTransfer::sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) const
+{
+ qpid::framing::Count c;
+ frames.map_if(c, qpid::framing::TypeFilter<qpid::framing::CONTENT_BODY>());
+
+ qpid::framing::SendContent f(out, maxFrameSize, c.getCount());
+ frames.map_if(f, qpid::framing::TypeFilter<qpid::framing::CONTENT_BODY>());
+}
+
+class SendHeader
+{
+ public:
+ SendHeader(FrameHandler& h, bool r, uint64_t t, uint64_t ts, const qpid::types::Variant::Map& a) : handler(h), redelivered(r), ttl(t), timestamp(ts), annotations(a) {}
+ void operator()(const AMQFrame& f)
+ {
+ AMQFrame copy = f;
+ if (redelivered || ttl || timestamp || annotations.size()) {
+ copy.cloneBody();
+ if (annotations.size()) {
+ MessageProperties* props =
+ copy.castBody<AMQHeaderBody>()->get<MessageProperties>(true);
+ for (qpid::types::Variant::Map::const_iterator i = annotations.begin();
+ i != annotations.end(); ++i) {
+ props->getApplicationHeaders().setString(i->first, i->second.asString());
+ }
+ }
+ if (redelivered || ttl || timestamp) {
+ DeliveryProperties* dp =
+ copy.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true);
+ if (ttl) dp->setTtl(ttl);
+ if (redelivered) dp->setRedelivered(redelivered);
+ if (timestamp) dp->setTimestamp(timestamp);
+ }
+ }
+ handler.handle(copy);
+ }
+ private:
+ FrameHandler& handler;
+ bool redelivered;
+ uint64_t ttl;
+ uint64_t timestamp;
+ const qpid::types::Variant::Map& annotations;
+};
+
+void MessageTransfer::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/,
+ bool redelivered, uint64_t ttl, uint64_t timestamp,
+ const qpid::types::Variant::Map& annotations) const
+{
+ SendHeader f(out, redelivered, ttl, timestamp, annotations);
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
+}
+bool MessageTransfer::isImmediateDeliveryRequired(const qpid::broker::Message& /*message*/)
+{
+ return false;//TODO
+}
+
+const framing::SequenceNumber& MessageTransfer::getCommandId() const { return frames.getId(); }
+
+std::string MessageTransfer::getRoutingKey() const
+{
+ const qpid::framing::DeliveryProperties* dp = getProperties<qpid::framing::DeliveryProperties>();
+ if (dp && dp->hasRoutingKey()) {
+ return dp->getRoutingKey();
+ } else {
+ return std::string();
+ }
+}
+bool MessageTransfer::isPersistent() const
+{
+ const qpid::framing::DeliveryProperties* dp = getProperties<qpid::framing::DeliveryProperties>();
+ if (dp && dp->hasDeliveryMode()) {
+ return dp->getDeliveryMode() == 2;
+ } else {
+ return false;
+ }
+}
+
+std::string MessageTransfer::getContent() const
+{
+ return frames.getContent();
+}
+
+void MessageTransfer::decodeHeader(framing::Buffer& buffer)
+{
+ AMQFrame method;
+ method.decode(buffer);
+ frames.append(method);
+
+ AMQFrame header;
+ header.decode(buffer);
+ frames.append(header);
+}
+void MessageTransfer::decodeContent(framing::Buffer& buffer)
+{
+ if (buffer.available()) {
+ //get the data as a string and set that as the content
+ //body on a frame then add that frame to the frameset
+ AMQFrame frame((AMQContentBody()));
+ frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
+ frame.setFirstSegment(false);
+ frames.append(frame);
+ } else {
+ //adjust header flags
+ MarkLastSegment f;
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
+ }
+}
+
+void MessageTransfer::encode(framing::Buffer& buffer) const
+{
+ //encode method and header frames
+ EncodeFrame f1(buffer);
+ frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>());
+
+ //then encode the payload of each content frame
+ framing::EncodeBody f2(buffer);
+ frames.map_if(f2, TypeFilter<CONTENT_BODY>());
+}
+
+void MessageTransfer::encodeContent(framing::Buffer& buffer) const
+{
+ //encode the payload of each content frame
+ EncodeBody f2(buffer);
+ frames.map_if(f2, TypeFilter<CONTENT_BODY>());
+}
+
+uint32_t MessageTransfer::encodedSize() const
+{
+ return encodedHeaderSize() + encodedContentSize();
+}
+
+uint32_t MessageTransfer::encodedContentSize() const
+{
+ return frames.getContentSize();
+}
+
+uint32_t MessageTransfer::encodedHeaderSize() const
+{
+ //add up the size for all method and header frames in the frameset
+ SumFrameSize sum;
+ frames.map_if(sum, TypeFilter2<METHOD_BODY, HEADER_BODY>());
+ return sum.getSize();
+}
+
+bool MessageTransfer::isQMFv2() const
+{
+ const framing::MessageProperties* props = getProperties<framing::MessageProperties>();
+ return props && props->getAppId() == QMF2 && props->hasApplicationHeaders();
+}
+
+bool MessageTransfer::isQMFv2(const qpid::broker::Message& message)
+{
+ const MessageTransfer* transfer = dynamic_cast<const MessageTransfer*>(&message.getEncoding());
+ return transfer && transfer->isQMFv2();
+}
+
+bool MessageTransfer::isLastQMFResponse(const std::string correlation) const
+{
+ const framing::MessageProperties* props = getProperties<framing::MessageProperties>();
+ return props && props->getCorrelationId() == correlation
+ && props->hasApplicationHeaders() && !props->getApplicationHeaders().isSet(PARTIAL);
+}
+
+bool MessageTransfer::isLastQMFResponse(const qpid::broker::Message& message, const std::string correlation)
+{
+ const MessageTransfer* transfer = dynamic_cast<const MessageTransfer*>(&message.getEncoding());
+ return transfer && transfer->isLastQMFResponse(correlation);
+}
+
+
+void MessageTransfer::processProperties(qpid::broker::MapHandler& handler) const
+{
+ const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>();
+ if (mp && mp->hasApplicationHeaders()) {
+ const FieldTable ft = mp->getApplicationHeaders();
+ for (FieldTable::const_iterator i = ft.begin(); i != ft.end(); ++i) {
+ qpid::broker::MapHandler::CharSequence key;
+ key.data = i->first.data();
+ key.size = i->first.size();
+ FieldTable::ValuePtr v = i->second;
+ //TODO: something more sophisticated...
+ if (v->empty()) {
+ handler.handleVoid(key);
+ } else if (v->convertsTo<uint64_t>()) {
+ handler.handleUint64(key, v->get<uint64_t>());
+ } else if (v->convertsTo<int64_t>()) {
+ handler.handleInt64(key, v->get<int64_t>());
+ } else if (v->convertsTo<std::string>()) {
+ std::string s = v->get<std::string>();
+ qpid::broker::MapHandler::CharSequence value;
+ value.data = s.data();
+ value.size = s.size();
+ qpid::broker::MapHandler::CharSequence encoding; encoding.size = 0; encoding.data = 0;
+ handler.handleString(key, value, encoding);
+ } else {
+ QPID_LOG(debug, "Unhandled key!" << *v);
+ }
+ }
+ }
+}
+
+std::string MessageTransfer::getUserId() const
+{
+ const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>();
+ if (mp && mp->hasUserId()) return mp->getUserId();
+ else return std::string();
+
+}
+MessageTransfer::MessageTransfer(const qpid::framing::FrameSet& f) : frames(f), requiredCredit(0) {}
+
+boost::intrusive_ptr<PersistableMessage> MessageTransfer::merge(const std::map<std::string, qpid::types::Variant>& annotations) const
+{
+ boost::intrusive_ptr<MessageTransfer> clone(new MessageTransfer(this->frames));
+ qpid::framing::MessageProperties* mp = clone->frames.getHeaders()->get<qpid::framing::MessageProperties>(true);
+ for (qpid::types::Variant::Map::const_iterator i = annotations.begin(); i != annotations.end(); ++i) {
+ mp->getApplicationHeaders().setString(i->first, i->second);
+ }
+ return clone;
+}
+}
+// qpid::broker namespace, TODO: move these elsewhere!
+void encode(const Message& in, std::string& out)
+{
+ const amqp_0_10::MessageTransfer& transfer = amqp_0_10::MessageTransfer::get(in);
+ uint32_t size = transfer.encodedSize();
+ std::vector<char> data(size);
+ qpid::framing::Buffer buffer(&(data[0]), size);
+ transfer.encode(buffer);
+ buffer.reset();
+ buffer.getRawData(out, size);
+}
+void decode(const std::string& in, Message& out)
+{
+ boost::intrusive_ptr<amqp_0_10::MessageTransfer> transfer(new amqp_0_10::MessageTransfer);
+ qpid::framing::Buffer buffer(const_cast<char*>(in.data()), in.size());
+ transfer->decodeHeader(buffer);
+ transfer->decodeContent(buffer);
+ out = Message(transfer, transfer);
+}
+
+}} // namespace qpid::broker::amqp_0_10