diff options
Diffstat (limited to 'qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp | 643 |
1 files changed, 643 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp new file mode 100644 index 0000000000..5289fbdf9b --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -0,0 +1,643 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SenderContext.h" +#include "Transaction.h" +#include "EncodedMessage.h" +#include "PnData.h" +#include "util.h" +#include "qpid/messaging/AddressImpl.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/Exception.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/amqp/MapHandler.h" +#include "qpid/amqp/MessageEncoder.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/log/Statement.h" +#include "config.h" +extern "C" { +#include <proton/engine.h> +} +#include <boost/shared_ptr.hpp> +#include <string.h> + +namespace qpid { +namespace messaging { +namespace amqp { + +//TODO: proper conversion to wide string for address +SenderContext::SenderContext(pn_session_t* session, const std::string& n, + const qpid::messaging::Address& a, + bool setToOnSend_, + const CoordinatorPtr& coord) + : sender(pn_sender(session, n.c_str())), + name(n), + address(a), + helper(address), + nextId(0), capacity(50), unreliable(helper.isUnreliable()), + setToOnSend(setToOnSend_), + transaction(coord) +{} + +SenderContext::~SenderContext() +{ + if (sender) pn_link_free(sender); +} + +void SenderContext::close() +{ + if (sender) pn_link_close(sender); +} + +void SenderContext::setCapacity(uint32_t c) +{ + if (c < deliveries.size()) throw qpid::messaging::SenderError("Desired capacity is less than unsettled message count!"); + capacity = c; +} + +uint32_t SenderContext::getCapacity() +{ + return capacity; +} + +uint32_t SenderContext::getUnsettled() +{ + return processUnsettled(true/*always allow retrieval of unsettled count, even if link has failed*/); +} + +const std::string& SenderContext::getName() const +{ + return name; +} + +const std::string& SenderContext::getTarget() const +{ + return address.getName(); +} + +bool SenderContext::send(const qpid::messaging::Message& message, SenderContext::Delivery** out) +{ + resend();//if there are any messages needing to be resent at the front of the queue, send them first + if (processUnsettled(false) < capacity && pn_link_credit(sender)) { + types::Variant state; + if (transaction) + state = transaction->getSendState(); + if (unreliable) { + Delivery delivery(nextId++); + delivery.encode(MessageImplAccess::get(message), address, setToOnSend); + delivery.send(sender, unreliable, state); + *out = 0; + return true; + } else { + deliveries.push_back(Delivery(nextId++)); + try { + Delivery& delivery = deliveries.back(); + delivery.encode(MessageImplAccess::get(message), address, setToOnSend); + delivery.send(sender, unreliable, state); + *out = &delivery; + return true; + } catch (const std::exception& e) { + deliveries.pop_back(); + --nextId; + throw SendError(e.what()); + } + } + } else { + return false; + } +} + +void SenderContext::check() +{ + if (pn_link_state(sender) & PN_REMOTE_CLOSED && !(pn_link_state(sender) & PN_LOCAL_CLOSED)) { + std::string text = get_error_string(pn_link_remote_condition(sender), "Link detached by peer"); + pn_link_close(sender); + throw qpid::messaging::LinkError(text); + } +} + +uint32_t SenderContext::processUnsettled(bool silent) +{ + if (!silent) { + check(); + } + //remove messages from front of deque once peer has confirmed receipt + while (!deliveries.empty() && deliveries.front().delivered() && !(pn_link_state(sender) & PN_REMOTE_CLOSED)) { + deliveries.front().settle(); + deliveries.pop_front(); + } + return deliveries.size(); +} +namespace { +const std::string X_AMQP("x-amqp-"); +const std::string X_AMQP_FIRST_ACQUIRER("x-amqp-first-acquirer"); +const std::string X_AMQP_DELIVERY_COUNT("x-amqp-delivery-count"); +const std::string X_AMQP_0_10_APP_ID("x-amqp-0-10.app-id"); + +class HeaderAdapter : public qpid::amqp::MessageEncoder::Header +{ + public: + HeaderAdapter(const qpid::messaging::MessageImpl& impl) : msg(impl), headers(msg.getHeaders()) {} + virtual bool isDurable() const + { + return msg.isDurable(); + } + virtual uint8_t getPriority() const + { + return msg.getPriority(); + } + virtual bool hasTtl() const + { + return msg.getTtl(); + } + virtual uint32_t getTtl() const + { + return msg.getTtl(); + } + virtual bool isFirstAcquirer() const + { + qpid::types::Variant::Map::const_iterator i = headers.find(X_AMQP_FIRST_ACQUIRER); + if (i != headers.end()) { + return i->second; + } else { + return false; + } + } + virtual uint32_t getDeliveryCount() const + { + qpid::types::Variant::Map::const_iterator i = headers.find(X_AMQP_DELIVERY_COUNT); + if (i != headers.end()) { + return i->second; + } else { + return msg.isRedelivered() ? 1 : 0; + } + } + private: + const qpid::messaging::MessageImpl& msg; + const qpid::types::Variant::Map& headers; +}; +const std::string EMPTY; +const std::string FORWARD_SLASH("/"); +const std::string X_AMQP_TO("x-amqp-to"); +const std::string X_AMQP_CONTENT_ENCODING("x-amqp-content-encoding"); +const std::string X_AMQP_CREATION_TIME("x-amqp-creation-time"); +const std::string X_AMQP_ABSOLUTE_EXPIRY_TIME("x-amqp-absolute-expiry-time"); +const std::string X_AMQP_GROUP_ID("x-amqp-group-id"); +const std::string X_AMQP_GROUP_SEQUENCE("x-amqp-group-sequence"); +const std::string X_AMQP_REPLY_TO_GROUP_ID("x-amqp-reply-to-group-id"); +const std::string X_AMQP_MESSAGE_ANNOTATIONS("x-amqp-message-annotations"); +const std::string X_AMQP_DELIVERY_ANNOTATIONS("x-amqp-delivery-annotations"); + +class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties +{ + public: + PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s, const std::string& t) : msg(impl), headers(msg.getHeaders()), subject(s), to(t) {} + bool hasMessageId() const + { + return getMessageId().size(); + } + std::string getMessageId() const + { + return msg.getMessageId(); + } + + bool hasUserId() const + { + return getUserId().size(); + } + + std::string getUserId() const + { + return msg.getUserId(); + } + + bool hasTo() const + { + return hasHeader(X_AMQP_TO) || !to.empty(); + } + + std::string getTo() const + { + qpid::types::Variant::Map::const_iterator i = headers.find(X_AMQP_TO); + if (i == headers.end()) return to; + else return i->second; + } + + bool hasSubject() const + { + return subject.size() || getSubject().size(); + } + + std::string getSubject() const + { + return subject.size() ? subject : msg.getSubject(); + } + + bool hasReplyTo() const + { + return msg.getReplyTo(); + } + + std::string getReplyTo() const + { + Address a = msg.getReplyTo(); + if (a.getSubject().size()) { + return a.getName() + FORWARD_SLASH + a.getSubject(); + } else { + return a.getName(); + } + } + + bool hasCorrelationId() const + { + return getCorrelationId().size(); + } + + std::string getCorrelationId() const + { + return msg.getCorrelationId(); + } + + bool hasContentType() const + { + return getContentType().size(); + } + + std::string getContentType() const + { + return msg.getContentType(); + } + + bool hasContentEncoding() const + { + return hasHeader(X_AMQP_CONTENT_ENCODING); + } + + std::string getContentEncoding() const + { + return headers.find(X_AMQP_CONTENT_ENCODING)->second; + } + + bool hasAbsoluteExpiryTime() const + { + return hasHeader(X_AMQP_ABSOLUTE_EXPIRY_TIME); + } + + int64_t getAbsoluteExpiryTime() const + { + return headers.find(X_AMQP_ABSOLUTE_EXPIRY_TIME)->second; + } + + bool hasCreationTime() const + { + return hasHeader(X_AMQP_CREATION_TIME); + } + + int64_t getCreationTime() const + { + return headers.find(X_AMQP_CREATION_TIME)->second; + } + + bool hasGroupId() const + { + return hasHeader(X_AMQP_GROUP_ID); + } + + std::string getGroupId() const + { + return headers.find(X_AMQP_GROUP_ID)->second; + } + + bool hasGroupSequence() const + { + return hasHeader(X_AMQP_GROUP_SEQUENCE); + } + + uint32_t getGroupSequence() const + { + return headers.find(X_AMQP_GROUP_SEQUENCE)->second; + } + + bool hasReplyToGroupId() const + { + return hasHeader(X_AMQP_REPLY_TO_GROUP_ID); + } + + std::string getReplyToGroupId() const + { + return headers.find(X_AMQP_REPLY_TO_GROUP_ID)->second; + } + private: + const qpid::messaging::MessageImpl& msg; + const qpid::types::Variant::Map& headers; + const std::string subject; + const std::string to; + + bool hasHeader(const std::string& key) const + { + return headers.find(key) != headers.end(); + } +}; + +bool startsWith(const std::string& input, const std::string& pattern) +{ + if (input.size() < pattern.size()) return false; + for (std::string::const_iterator b = pattern.begin(), a = input.begin(); b != pattern.end(); ++b, ++a) { + if (*a != *b) return false; + } + return true; +} +class ApplicationPropertiesAdapter : public qpid::amqp::MessageEncoder::ApplicationProperties +{ + public: + ApplicationPropertiesAdapter(const qpid::types::Variant::Map& h) : headers(h) {} + void handle(qpid::amqp::MapHandler& h) const + { + for (qpid::types::Variant::Map::const_iterator i = headers.begin(); i != headers.end(); ++i) { + //strip out values with special keys as they are sent in standard fields + if (!startsWith(i->first, X_AMQP) || i->first == X_AMQP_0_10_APP_ID) { + qpid::amqp::CharSequence key(convert(i->first)); + switch (i->second.getType()) { + case qpid::types::VAR_VOID: + h.handleVoid(key); + break; + case qpid::types::VAR_BOOL: + h.handleBool(key, i->second); + break; + case qpid::types::VAR_UINT8: + h.handleUint8(key, i->second); + break; + case qpid::types::VAR_UINT16: + h.handleUint16(key, i->second); + break; + case qpid::types::VAR_UINT32: + h.handleUint32(key, i->second); + break; + case qpid::types::VAR_UINT64: + h.handleUint64(key, i->second); + break; + case qpid::types::VAR_INT8: + h.handleInt8(key, i->second); + break; + case qpid::types::VAR_INT16: + h.handleInt16(key, i->second); + break; + case qpid::types::VAR_INT32: + h.handleInt32(key, i->second); + break; + case qpid::types::VAR_INT64: + h.handleInt64(key, i->second); + break; + case qpid::types::VAR_FLOAT: + h.handleFloat(key, i->second); + break; + case qpid::types::VAR_DOUBLE: + h.handleDouble(key, i->second); + break; + case qpid::types::VAR_STRING: + h.handleString(key, convert(i->second), convert(i->second.getEncoding())); + break; + case qpid::types::VAR_UUID: + QPID_LOG(warning, "Skipping UUID in application properties; not yet handled correctly."); + break; + case qpid::types::VAR_MAP: + case qpid::types::VAR_LIST: + QPID_LOG(warning, "Skipping nested list and map; not allowed in application properties."); + break; + } + } + } + } + private: + const qpid::types::Variant::Map& headers; + + static qpid::amqp::CharSequence convert(const std::string& in) + { + qpid::amqp::CharSequence out; + out.data = in.data(); + out.size = in.size(); + return out; + } +}; + +bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address) +{ + return address.getSubject().size() && address.getSubject() != msg.getSubject(); +} + +} + +SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0), presettled(false) {} + +void SenderContext::Delivery::reset() +{ + token = 0; +} + +void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address, bool setToField) +{ + try { + boost::shared_ptr<const EncodedMessage> original = msg.getEncoded(); + + if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered + //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received? + if (original->hasHeaderChanged(msg)) { + //since as yet have no annotations, just write the revised header then the rest of the message as received + encoded.resize(16/*max header size*/ + original->getBareMessage().size); + qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize()); + HeaderAdapter header(msg); + encoder.writeHeader(header); + ::memcpy(encoded.getData() + encoder.getPosition(), original->getBareMessage().data, original->getBareMessage().size); + } else { + //since as yet have no annotations, if the header hasn't + //changed and we still have the original bare message, can + //send the entire content as is + encoded.resize(original->getSize()); + ::memcpy(encoded.getData(), original->getData(), original->getSize()); + } + } else { + HeaderAdapter header(msg); + PropertiesAdapter properties(msg, address.getSubject(), setToField ? address.getName() : EMPTY); + ApplicationPropertiesAdapter applicationProperties(msg.getHeaders()); + //compute size: + size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header) + + qpid::amqp::MessageEncoder::getEncodedSize(properties) + + qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties); + if (msg.getContent().isVoid()) { + contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForContent(msg.getBytes()); + } else { + contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForValue(msg.getContent()) + 3/*descriptor*/; + } + encoded.resize(contentSize); + QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes") + qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize()); + //write header: + encoder.writeHeader(header); + //write delivery-annotations, write message-annotations (none yet supported) + //write properties + encoder.writeProperties(properties); + //write application-properties + encoder.writeApplicationProperties(applicationProperties); + //write body + if (!msg.getContent().isVoid()) { + //write as AmqpValue + encoder.writeValue(msg.getContent(), &qpid::amqp::message::AMQP_VALUE); + } else if (msg.getBytes().size()) { + encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported + } + if (encoder.getPosition() < encoded.getSize()) { + QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition()); + encoded.trim(encoder.getPosition()); + } + //write footer (no annotations yet supported) + } + } catch (const qpid::Exception& e) { + throw SendError(e.what()); + } +} + +void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const types::Variant& state) +{ + pn_delivery_tag_t tag; + tag.size = sizeof(id); +#ifdef NO_PROTON_DELIVERY_TAG_T + tag.start = reinterpret_cast<const char*>(&id); +#else + tag.bytes = reinterpret_cast<const char*>(&id); +#endif + token = pn_delivery(sender, tag); + if (!state.isVoid()) { // Add transaction state + PnData data(pn_disposition_data(pn_delivery_local(token))); + data.put(state); + pn_delivery_update(token, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE); + } + pn_link_send(sender, encoded.getData(), encoded.getSize()); + if (unreliable) { + pn_delivery_settle(token); + presettled = true; + } + pn_link_advance(sender); +} + +bool SenderContext::Delivery::sent() const +{ + return presettled || token; +} +bool SenderContext::Delivery::delivered() +{ + if (presettled || (token && (pn_delivery_remote_state(token) || pn_delivery_settled(token)))) { + //TODO: need a better means for signalling outcomes other than accepted + if (rejected()) { + QPID_LOG(warning, "delivery " << id << " was rejected by peer"); + } else if (!accepted()) { + QPID_LOG(info, "delivery " << id << " was not accepted by peer"); + } + return true; + } else { + return false; + } +} +bool SenderContext::Delivery::accepted() +{ + return pn_delivery_remote_state(token) == PN_ACCEPTED; +} +bool SenderContext::Delivery::rejected() +{ + return pn_delivery_remote_state(token) == PN_REJECTED; +} + +std::string SenderContext::Delivery::error() +{ + pn_condition_t *condition = pn_disposition_condition(pn_delivery_remote(token)); + return (condition && pn_condition_is_set(condition)) ? + Msg() << get_error_string(condition, std::string(), std::string()) : + std::string(); +} + +void SenderContext::Delivery::settle() +{ + pn_delivery_settle(token); +} +void SenderContext::verify() +{ + pn_terminus_t* target = pn_link_remote_target(sender); + if (!pn_terminus_get_address(target)) { + std::string msg("No such target : "); + msg += getTarget(); + QPID_LOG(debug, msg); + throw qpid::messaging::NotFound(msg); + } else if (AddressImpl::isTemporary(address)) { + address.setName(pn_terminus_get_address(target)); + QPID_LOG(debug, "Dynamic target name set to " << address.getName()); + } + + helper.checkAssertion(target, AddressHelper::FOR_SENDER); +} + +void SenderContext::configure() +{ + if (sender) configure(pn_link_target(sender)); +} + +void SenderContext::configure(pn_terminus_t* target) +{ + helper.configure(sender, target, AddressHelper::FOR_SENDER); + std::string option; + if (helper.getLinkSource(option)) { + pn_terminus_set_address(pn_link_source(sender), option.c_str()); + } else { + pn_terminus_set_address(pn_link_source(sender), pn_terminus_get_address(pn_link_target(sender))); + } +} + +bool SenderContext::settled() +{ + return processUnsettled(false) == 0; +} + +bool SenderContext::closed() +{ + return pn_link_state(sender) & PN_LOCAL_CLOSED; +} + +Address SenderContext::getAddress() const +{ + return address; +} + + +void SenderContext::reset(pn_session_t* session) +{ + sender = session ? pn_sender(session, name.c_str()) : 0; + if (sender) configure(); + for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) + i->reset(); +} + +void SenderContext::resend() +{ + for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end() && pn_link_credit(sender) && !i->sent(); ++i) { + i->send(sender, false/*only resend reliable transfers*/); + } +} + +}}} // namespace qpid::messaging::amqp |