/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/broker/amqp/Translation.h" #include "qpid/broker/amqp/Outgoing.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/broker/Broker.h" #include "qpid/amqp/Decoder.h" #include "qpid/amqp/descriptors.h" #include "qpid/amqp/MessageEncoder.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/types/Variant.h" #include "qpid/types/encodings.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include namespace qpid { namespace broker { namespace amqp { namespace { const std::string EMPTY; const std::string FORWARD_SLASH("/"); const std::string TEXT_PLAIN("text/plain"); const std::string SUBJECT_KEY("qpid.subject"); const std::string APP_ID("x-amqp-0-10.app-id"); qpid::framing::ReplyTo translate(const std::string address, Broker* broker) { size_t i = address.find(FORWARD_SLASH); if (i == std::string::npos) { //is it a queue or an exchange? if (broker && broker->getQueues().find(address)) { return qpid::framing::ReplyTo(EMPTY, address); } else if (broker && broker->getExchanges().find(address)) { return qpid::framing::ReplyTo(address, EMPTY); } else { return qpid::framing::ReplyTo(); } } else { return qpid::framing::ReplyTo(i > 0 ? address.substr(0, i) : EMPTY, (i+1) < address.size() ? address.substr(i+1) : EMPTY); } } std::string translate(const qpid::framing::ReplyTo r) { if (r.getExchange().size()) { if (r.getRoutingKey().size()) return r.getExchange() + FORWARD_SLASH + r.getRoutingKey(); else return r.getExchange(); } else return r.getRoutingKey(); } std::string translate(const qpid::amqp::CharSequence& chars) { if (chars.data && chars.size) return std::string(chars.data, chars.size); else return EMPTY; } bool setMessageId(qpid::framing::MessageProperties& m, const qpid::amqp::CharSequence& chars) { if (chars.data && chars.size) { if (chars.size == 16) { m.setMessageId(qpid::framing::Uuid(chars.data)); return true; } else { std::istringstream in(translate(chars)); qpid::framing::Uuid uuid; in >> uuid; if (!in.fail()) { m.setMessageId(uuid); return true; } } } return false; } class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties { public: bool hasMessageId() const { return messageProperties && messageProperties->hasMessageId(); } std::string getMessageId() const { return messageProperties ? messageProperties->getMessageId().str() : EMPTY; } bool hasUserId() const { return messageProperties && messageProperties->hasUserId(); } std::string getUserId() const { return messageProperties ? messageProperties->getUserId() : EMPTY; } bool hasTo() const { return getDestination().size() || hasSubject(); } std::string getTo() const { return getDestination().size() ? getDestination() : getSubject(); } bool hasSubject() const { if (getDestination().empty()) { return getApplicationProperties().isSet(SUBJECT_KEY); } else { return deliveryProperties && deliveryProperties->hasRoutingKey(); } } std::string getSubject() const { if (getDestination().empty()) { //message was sent to default exchange, routing key is the queue name return getApplicationProperties().getAsString(SUBJECT_KEY); } else if (deliveryProperties) { return deliveryProperties->getRoutingKey(); } else { return EMPTY; } } bool hasReplyTo() const { return messageProperties && messageProperties->hasReplyTo(); } std::string getReplyTo() const { return messageProperties ? translate(messageProperties->getReplyTo()) : EMPTY; } bool hasCorrelationId() const { return messageProperties && messageProperties->hasCorrelationId(); } std::string getCorrelationId() const { return messageProperties ? messageProperties->getCorrelationId() : EMPTY; } bool hasContentType() const { return messageProperties && messageProperties->hasContentType(); } std::string getContentType() const { return messageProperties ? messageProperties->getContentType() : EMPTY; } bool hasContentEncoding() const { return messageProperties && messageProperties->hasContentEncoding(); } std::string getContentEncoding() const { return messageProperties ? messageProperties->getContentEncoding() : EMPTY; } bool hasAbsoluteExpiryTime() const { return deliveryProperties && deliveryProperties->hasExpiration(); } int64_t getAbsoluteExpiryTime() const { return deliveryProperties ? deliveryProperties->getExpiration() : 0; } bool hasCreationTime() const { return false; } int64_t getCreationTime() const { return 0; } bool hasGroupId() const {return false; } std::string getGroupId() const { return EMPTY; } bool hasGroupSequence() const { return false; } uint32_t getGroupSequence() const { return 0; } bool hasReplyToGroupId() const { return false; } std::string getReplyToGroupId() const { return EMPTY; } const qpid::framing::FieldTable& getApplicationProperties() const { return messageProperties->getApplicationHeaders(); } Properties_0_10(const qpid::broker::amqp_0_10::MessageTransfer& t) : transfer(t), messageProperties(transfer.getProperties()), deliveryProperties(transfer.getProperties()) {} private: const qpid::broker::amqp_0_10::MessageTransfer& transfer; const qpid::framing::MessageProperties* messageProperties; const qpid::framing::DeliveryProperties* deliveryProperties; std::string getDestination() const { return transfer.getMethod()->getDestination(); } }; } Translation::Translation(const qpid::broker::Message& m, Broker* b) : original(m), broker(b) {} boost::intrusive_ptr Translation::getTransfer() { boost::intrusive_ptr t = boost::intrusive_ptr(dynamic_cast(&original.getEncoding())); if (t) { return t;//no translation required } else { const Message* message = dynamic_cast(&original.getEncoding()); if (message) { //translate 1.0 message into 0-10 boost::intrusive_ptr transfer(new qpid::broker::amqp_0_10::MessageTransfer()); qpid::framing::AMQFrame method((qpid::framing::MessageTransferBody(qpid::framing::ProtocolVersion(), EMPTY, 0, 0))); qpid::framing::AMQFrame header((qpid::framing::AMQHeaderBody())); qpid::framing::AMQFrame content((qpid::framing::AMQContentBody())); method.setEof(false); header.setBof(false); header.setEof(false); content.setBof(false); transfer->getFrames().append(method); transfer->getFrames().append(header); qpid::framing::MessageProperties* props = transfer->getFrames().getHeaders()->get(true); if (message->isTypedBody()) { qpid::types::Variant body = message->getTypedBody(); std::string& data = content.castBody()->getData(); if (body.getType() == qpid::types::VAR_MAP) { qpid::amqp_0_10::MapCodec::encode(body.asMap(), data); props->setContentType(qpid::amqp_0_10::MapCodec::contentType); } else if (body.getType() == qpid::types::VAR_LIST) { qpid::amqp_0_10::ListCodec::encode(body.asList(), data); props->setContentType(qpid::amqp_0_10::ListCodec::contentType); } else if (body.getType() == qpid::types::VAR_STRING) { data = body.getString(); if (body.getEncoding() == qpid::types::encodings::UTF8 || body.getEncoding() == qpid::types::encodings::ASCII) { props->setContentType(TEXT_PLAIN); } } else { qpid::types::Variant::List container; container.push_back(body); qpid::amqp_0_10::ListCodec::encode(container, data); props->setContentType(qpid::amqp_0_10::ListCodec::contentType); } transfer->getFrames().append(content); props->setContentLength(data.size()); } else { qpid::amqp::CharSequence body = message->getBody(); content.castBody()->getData().assign(body.data, body.size); transfer->getFrames().append(content); props->setContentLength(body.size); } qpid::amqp::MessageId mid = message->getMessageId(); qpid::framing::Uuid uuid; switch (mid.type) { case qpid::amqp::MessageId::NONE: break; case qpid::amqp::MessageId::UUID: case qpid::amqp::MessageId::BYTES: if (mid.value.bytes.size == 0) break; if (setMessageId(*props, mid.value.bytes)) break; case qpid::amqp::MessageId::ULONG: QPID_LOG(info, "Skipping message id in translation from 1.0 to 0-10 as it is not a UUID"); break; } qpid::amqp::MessageId cid = message->getCorrelationId(); switch (cid.type) { case qpid::amqp::MessageId::NONE: break; case qpid::amqp::MessageId::UUID: assert(cid.value.bytes.size = 16); props->setCorrelationId(qpid::framing::Uuid(cid.value.bytes.data).str()); break; case qpid::amqp::MessageId::BYTES: if (cid.value.bytes.size) { props->setCorrelationId(translate(cid.value.bytes)); } break; case qpid::amqp::MessageId::ULONG: props->setCorrelationId(boost::lexical_cast(cid.value.ulong)); break; } if (message->getReplyToAsCharSequence()) props->setReplyTo(translate(message->getReplyTo(), broker)); if (message->getContentType()) props->setContentType(translate(message->getContentType())); if (message->getContentEncoding()) props->setContentEncoding(translate(message->getContentEncoding())); props->setUserId(message->getUserId()); // TODO: FieldTable applicationHeaders; qpid::amqp::CharSequence ap = message->getApplicationProperties(); if (ap) { qpid::amqp::Decoder d(ap.data, ap.size); qpid::amqp_0_10::translate(d.readMap(), props->getApplicationHeaders()); std::string appid = props->getApplicationHeaders().getAsString(APP_ID); if (!appid.empty()) { props->setAppId(appid); } } qpid::framing::DeliveryProperties* dp = transfer->getFrames().getHeaders()->get(true); dp->setPriority(message->getPriority()); if (message->isPersistent()) dp->setDeliveryMode(2); if (message->getRoutingKey().size()) { if (message->getRoutingKey().size() > std::numeric_limits::max()) { //have to truncate routing key as it is specified to be a str8 dp->setRoutingKey(message->getRoutingKey().substr(0,std::numeric_limits::max())); } else { dp->setRoutingKey(message->getRoutingKey()); } props->getApplicationHeaders().setString(SUBJECT_KEY, message->getRoutingKey()); } return transfer.get(); } else { throw qpid::Exception("Could not write message data in AMQP 0-10 format"); } } } void Translation::write(OutgoingFromQueue& out) { const Message* message = dynamic_cast(original.getPersistentContext().get()); //persistent context will contain any newly added annotations if (!message) message = dynamic_cast(&original.getEncoding()); if (message) { //write annotations qpid::amqp::CharSequence deliveryAnnotations = message->getDeliveryAnnotations(); qpid::amqp::CharSequence messageAnnotations = message->getMessageAnnotations(); if (deliveryAnnotations.size) out.write(deliveryAnnotations.data, deliveryAnnotations.size); if (messageAnnotations.size) out.write(messageAnnotations.data, messageAnnotations.size); //write bare message qpid::amqp::CharSequence bareMessage = message->getBareMessage(); if (bareMessage.size) out.write(bareMessage.data, bareMessage.size); //write footer: qpid::amqp::CharSequence footer = message->getFooter(); if (footer.size) out.write(footer.data, footer.size); } else { const qpid::broker::amqp_0_10::MessageTransfer* transfer = dynamic_cast(&original.getEncoding()); if (transfer) { Properties_0_10 properties(*transfer); qpid::types::Variant::Map applicationProperties; qpid::amqp_0_10::translate(properties.getApplicationProperties(), applicationProperties); if (properties.getContentType() == qpid::amqp_0_10::MapCodec::contentType) { qpid::types::Variant::Map content; qpid::amqp_0_10::MapCodec::decode(transfer->getContent(), content); size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties); size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/ size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/; std::vector buffer(size); qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); encoder.writeProperties(properties); encoder.writeApplicationProperties(applicationProperties); encoder.writeMap(content, &qpid::amqp::message::AMQP_VALUE); out.write(&buffer[0], encoder.getPosition()); } else if (properties.getContentType() == qpid::amqp_0_10::ListCodec::contentType) { qpid::types::Variant::List content; qpid::amqp_0_10::ListCodec::decode(transfer->getContent(), content); size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties); size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/ size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/; std::vector buffer(size); qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); encoder.writeProperties(properties); encoder.writeApplicationProperties(applicationProperties); encoder.writeList(content, &qpid::amqp::message::AMQP_VALUE); out.write(&buffer[0], encoder.getPosition()); } else { std::string content = transfer->getContent(); size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content); std::vector buffer(size); qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); encoder.writeProperties(properties); encoder.writeApplicationProperties(applicationProperties); if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA); out.write(&buffer[0], encoder.getPosition()); } } else { QPID_LOG(error, "Could not write message data in AMQP 1.0 format"); } } } }}} // namespace qpid::broker::amqp