/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "Message.h" #include "qpid/amqp/Decoder.h" #include "qpid/amqp/descriptors.h" #include "qpid/amqp/MessageEncoder.h" #include "qpid/log/Statement.h" #include "qpid/framing/Buffer.h" #include namespace qpid { namespace broker { namespace amqp { namespace { std::string empty; } std::string Message::getRoutingKey() const { std::string v; v.assign(subject.data, subject.size); return v; } std::string Message::getUserId() const { std::string v; v.assign(userId.data, userId.size); return v; } bool Message::isPersistent() const { return durable && durable.get(); } bool Message::getTtl(uint64_t& t) const { if (!ttl) { return false; } else { t = ttl.get(); return true; } } uint8_t Message::getPriority() const { if (!priority) return 4; else return priority.get(); } std::string Message::getPropertyAsString(const std::string& /*key*/) const { return empty; } std::string Message::getAnnotationAsString(const std::string& /*key*/) const { return empty; } void Message::processProperties(MapHandler&) const {} //getContentSize() is primarily used in stats about the number of //bytes enqueued/dequeued etc, not sure whether this is the right name //and whether it should indeed only be the content that is thus //measured uint64_t Message::getContentSize() const { return data.size(); } //getContent() is used primarily for decoding qmf messages in management and ha std::string Message::getContent() const { return empty; } Message::Message(size_t size) : data(size) { deliveryAnnotations.init(); messageAnnotations.init(); bareMessage.init(); userId.init(); to.init(); subject.init(); replyTo.init(); contentType.init(); contentEncoding.init(); applicationProperties.init(); body.init(); footer.init(); } char* Message::getData() { return &data[0]; } const char* Message::getData() const { return &data[0]; } size_t Message::getSize() const { return data.size(); } qpid::amqp::MessageId Message::getMessageId() const { return messageId; } qpid::amqp::CharSequence Message::getReplyTo() const { return replyTo; } qpid::amqp::MessageId Message::getCorrelationId() const { return correlationId; } qpid::amqp::CharSequence Message::getContentType() const { return contentType; } qpid::amqp::CharSequence Message::getContentEncoding() const { return contentEncoding; } qpid::amqp::CharSequence Message::getDeliveryAnnotations() const { return deliveryAnnotations; } qpid::amqp::CharSequence Message::getMessageAnnotations() const { return messageAnnotations; } qpid::amqp::CharSequence Message::getApplicationProperties() const { return applicationProperties; } qpid::amqp::CharSequence Message::getBareMessage() const { return bareMessage; } qpid::amqp::CharSequence Message::getBody() const { return body; } qpid::amqp::CharSequence Message::getFooter() const { return footer; } void Message::scan() { qpid::amqp::Decoder decoder(getData(), getSize()); decoder.read(*this); bareMessage = qpid::amqp::MessageReader::getBareMessage(); if (bareMessage.data && !bareMessage.size) { bareMessage.size = getSize() - (bareMessage.data - getData()); } } const Message& Message::get(const qpid::broker::Message& message) { const Message* m = dynamic_cast(&message.getEncoding()); if (!m) throw qpid::Exception("Translation not yet implemented!!"); return *m; } void Message::onDurable(bool b) { durable = b; } void Message::onPriority(uint8_t i) { priority = i; } void Message::onTtl(uint32_t i) { ttl = i; } void Message::onFirstAcquirer(bool b) { firstAcquirer = b; } void Message::onDeliveryCount(uint32_t i) { deliveryCount = i; } void Message::onMessageId(uint64_t v) { messageId.set(v); } void Message::onMessageId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { messageId.set(v, t); } void Message::onUserId(const qpid::amqp::CharSequence& v) { userId = v; } void Message::onTo(const qpid::amqp::CharSequence& v) { to = v; } void Message::onSubject(const qpid::amqp::CharSequence& v) { subject = v; } void Message::onReplyTo(const qpid::amqp::CharSequence& v) { replyTo = v; } void Message::onCorrelationId(uint64_t v) { correlationId.set(v); } void Message::onCorrelationId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { correlationId.set(v, t);} void Message::onContentType(const qpid::amqp::CharSequence& v) { contentType = v; } void Message::onContentEncoding(const qpid::amqp::CharSequence& v) { contentEncoding = v; } void Message::onAbsoluteExpiryTime(int64_t) {} void Message::onCreationTime(int64_t) {} void Message::onGroupId(const qpid::amqp::CharSequence&) {} void Message::onGroupSequence(uint32_t) {} void Message::onReplyToGroupId(const qpid::amqp::CharSequence&) {} void Message::onApplicationProperties(const qpid::amqp::CharSequence& v) { applicationProperties = v; } void Message::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { deliveryAnnotations = v; } void Message::onMessageAnnotations(const qpid::amqp::CharSequence& v) { messageAnnotations = v; } void Message::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&) { body = v; } void Message::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {} void Message::onFooter(const qpid::amqp::CharSequence& v) { footer = v; } //PersistableMessage interface: void Message::encode(framing::Buffer& buffer) const { buffer.putLong(0);//4-byte format indicator buffer.putRawData((const uint8_t*) getData(), getSize()); QPID_LOG(debug, "Encoded 1.0 message of " << getSize() << " bytes, including " << bareMessage.size << " bytes of 'bare message'"); } uint32_t Message::encodedSize() const { return 4/*format indicator*/ + data.size(); } //in 1.0 the binary header/content makes less sense and in any case //the functionality that split originally supported (i.e. lazy-loaded //messages) is no longer in use; for 1.0 we therefore treat the whole //content as 'header' and load it in the first stage. uint32_t Message::encodedHeaderSize() const { return encodedSize(); } void Message::decodeHeader(framing::Buffer& buffer) { if (buffer.available() != getSize()) { QPID_LOG(warning, "1.0 Message buffer was " << data.size() << " bytes, but " << buffer.available() << " bytes are available. Resizing."); data.resize(buffer.available()); } buffer.getRawData((uint8_t*) getData(), getSize()); scan(); QPID_LOG(debug, "Decoded 1.0 message of " << getSize() << " bytes, including " << bareMessage.size << " bytes of 'bare message'"); } void Message::decodeContent(framing::Buffer& /*buffer*/) {} boost::intrusive_ptr Message::merge(const std::map& annotations) const { //message- or delivery- annotations? would have to determine that from the name, for now assume always message-annotations size_t extra = 0; if (messageAnnotations) { //TODO: actual merge required } else { //add whole new section extra = qpid::amqp::MessageEncoder::getEncodedSize(annotations, true); } boost::intrusive_ptr copy(new Message(data.size()+extra)); size_t position(0); if (deliveryAnnotations) { ::memcpy(©->data[position], deliveryAnnotations.data, deliveryAnnotations.size); position += deliveryAnnotations.size; } if (messageAnnotations) { //TODO: actual merge required ::memcpy(©->data[position], messageAnnotations.data, messageAnnotations.size); position += messageAnnotations.size; } else { qpid::amqp::MessageEncoder encoder(©->data[position], extra); encoder.writeMap(annotations, &qpid::amqp::message::MESSAGE_ANNOTATIONS, true); position += extra; } if (bareMessage) { ::memcpy(©->data[position], bareMessage.data, bareMessage.size); position += bareMessage.size; } if (footer) { ::memcpy(©->data[position], footer.data, footer.size); position += footer.size; } copy->scan(); return copy; } }}} // namespace qpid::broker::amqp