diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10')
18 files changed, 4406 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp new file mode 100644 index 0000000000..d2accddcd0 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp @@ -0,0 +1,153 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "AcceptTracker.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +void AcceptTracker::State::accept() +{ + unconfirmed.add(unaccepted); + unaccepted.clear(); +} + +SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative) +{ + SequenceSet accepting; + if (cumulative) { + for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) { + accepting.add(*i); + } + unconfirmed.add(accepting); + unaccepted.remove(accepting); + } else { + if (unaccepted.contains(id)) { + unaccepted.remove(id); + unconfirmed.add(id); + accepting.add(id); + } + } + return accepting; +} + +void AcceptTracker::State::release() +{ + unaccepted.clear(); +} + +uint32_t AcceptTracker::State::acceptsPending() +{ + return unconfirmed.size(); +} + +void AcceptTracker::State::completed(qpid::framing::SequenceSet& set) +{ + unconfirmed.remove(set); +} + +void AcceptTracker::delivered(const std::string& destination, const qpid::framing::SequenceNumber& id) +{ + aggregateState.unaccepted.add(id); + destinationState[destination].unaccepted.add(id); +} + +namespace +{ +const size_t FLUSH_FREQUENCY = 1024; +} + +void AcceptTracker::addToPending(qpid::client::AsyncSession& session, const Record& record) +{ + pending.push_back(record); + if (pending.size() % FLUSH_FREQUENCY == 0) session.flush(); +} + + +void AcceptTracker::accept(qpid::client::AsyncSession& session) +{ + for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { + i->second.accept(); + } + Record record; + record.status = session.messageAccept(aggregateState.unaccepted); + record.accepted = aggregateState.unaccepted; + addToPending(session, record); + aggregateState.accept(); +} + +void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative) +{ + for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { + i->second.accept(id, cumulative); + } + Record record; + record.accepted = aggregateState.accept(id, cumulative); + record.status = session.messageAccept(record.accepted); + addToPending(session, record); +} + +void AcceptTracker::release(qpid::client::AsyncSession& session) +{ + session.messageRelease(aggregateState.unaccepted); + for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { + i->second.release(); + } + aggregateState.release(); +} + +uint32_t AcceptTracker::acceptsPending() +{ + checkPending(); + return aggregateState.acceptsPending(); +} + +uint32_t AcceptTracker::acceptsPending(const std::string& destination) +{ + checkPending(); + return destinationState[destination].acceptsPending(); +} + +void AcceptTracker::reset() +{ + destinationState.clear(); + aggregateState.unaccepted.clear(); + aggregateState.unconfirmed.clear(); + pending.clear(); +} + +void AcceptTracker::checkPending() +{ + while (!pending.empty() && pending.front().status.isComplete()) { + completed(pending.front().accepted); + pending.pop_front(); + } +} + +void AcceptTracker::completed(qpid::framing::SequenceSet& set) +{ + for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { + i->second.completed(set); + } + aggregateState.completed(set); +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h new file mode 100644 index 0000000000..85209c3b87 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h @@ -0,0 +1,88 @@ +#ifndef QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H +#define QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/AsyncSession.h" +#include "qpid/client/Completion.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/SequenceSet.h" +#include <deque> +#include <map> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +/** + * Tracks the set of messages requiring acceptance, and those for + * which an accept has been issued but is yet to be confirmed + * complete. + */ +class AcceptTracker +{ + public: + void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id); + void accept(qpid::client::AsyncSession&); + void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&, bool cumulative); + void release(qpid::client::AsyncSession&); + uint32_t acceptsPending(); + uint32_t acceptsPending(const std::string& destination); + void reset(); + private: + struct State + { + /** + * ids of messages that have been delivered but not yet + * accepted + */ + qpid::framing::SequenceSet unaccepted; + /** + * ids of messages for which an accept has been issued but not + * yet confirmed as completed + */ + qpid::framing::SequenceSet unconfirmed; + + void accept(); + qpid::framing::SequenceSet accept(qpid::framing::SequenceNumber, bool cumulative); + void release(); + uint32_t acceptsPending(); + void completed(qpid::framing::SequenceSet&); + }; + typedef std::map<std::string, State> StateMap; + struct Record + { + qpid::client::Completion status; + qpid::framing::SequenceSet accepted; + }; + typedef std::deque<Record> Records; + + State aggregateState; + StateMap destinationState; + Records pending; + + void addToPending(qpid::client::AsyncSession&, const Record&); + void checkPending(); + void completed(qpid::framing::SequenceSet&); +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp new file mode 100644 index 0000000000..ed931c90fb --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -0,0 +1,1058 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/amqp0_10/AddressResolution.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/client/amqp0_10/MessageSource.h" +#include "qpid/client/amqp0_10/MessageSink.h" +#include "qpid/client/amqp0_10/OutgoingMessage.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/AddressImpl.h" +#include "qpid/messaging/Message.h" +#include "qpid/types/Variant.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/enum.h" +#include "qpid/framing/ExchangeBoundResult.h" +#include "qpid/framing/ExchangeQueryResult.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/framing/QueueQueryResult.h" +#include "qpid/framing/ReplyTo.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/Uuid.h" +#include <boost/assign.hpp> +#include <boost/format.hpp> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using qpid::Exception; +using qpid::messaging::Address; +using qpid::messaging::AddressError; +using qpid::messaging::MalformedAddress; +using qpid::messaging::ResolutionError; +using qpid::messaging::NotFound; +using qpid::messaging::AssertionFailed; +using qpid::framing::ExchangeBoundResult; +using qpid::framing::ExchangeQueryResult; +using qpid::framing::FieldTable; +using qpid::framing::FieldValue; +using qpid::framing::QueueQueryResult; +using qpid::framing::ReplyTo; +using qpid::framing::Uuid; +using namespace qpid::types; +using namespace qpid::framing::message; +using namespace qpid::amqp_0_10; +using namespace boost::assign; + +class Verifier +{ + public: + Verifier(); + void verify(const Address& address) const; + private: + Variant::Map defined; + void verify(const Variant::Map& allowed, const Variant::Map& actual) const; +}; + +namespace{ +const Variant EMPTY_VARIANT; +const FieldTable EMPTY_FIELD_TABLE; +const Variant::List EMPTY_LIST; +const std::string EMPTY_STRING; + +//policy types +const std::string CREATE("create"); +const std::string ASSERT("assert"); +const std::string DELETE("delete"); + +//option names +const std::string NODE("node"); +const std::string LINK("link"); +const std::string MODE("mode"); +const std::string RELIABILITY("reliability"); +const std::string TIMEOUT("timeout"); +const std::string NAME("name"); +const std::string DURABLE("durable"); +const std::string X_DECLARE("x-declare"); +const std::string X_SUBSCRIBE("x-subscribe"); +const std::string X_BINDINGS("x-bindings"); +const std::string SELECTOR("selector"); +const std::string APACHE_SELECTOR("x-apache-selector"); +const std::string QPID_FILTER("qpid.filter"); +const std::string EXCHANGE("exchange"); +const std::string QUEUE("queue"); +const std::string KEY("key"); +const std::string ARGUMENTS("arguments"); +const std::string ALTERNATE_EXCHANGE("alternate-exchange"); +const std::string TYPE("type"); +const std::string EXCLUSIVE("exclusive"); +const std::string AUTO_DELETE("auto-delete"); + +//policy values +const std::string ALWAYS("always"); +const std::string NEVER("never"); +const std::string RECEIVER("receiver"); +const std::string SENDER("sender"); + +//address types +const std::string QUEUE_ADDRESS("queue"); +const std::string TOPIC_ADDRESS("topic"); + +//reliability options: +const std::string UNRELIABLE("unreliable"); +const std::string AT_MOST_ONCE("at-most-once"); +const std::string AT_LEAST_ONCE("at-least-once"); +const std::string EXACTLY_ONCE("exactly-once"); + +//receiver modes: +const std::string BROWSE("browse"); +const std::string CONSUME("consume"); + +//0-10 exchange types: +const std::string TOPIC_EXCHANGE("topic"); +const std::string FANOUT_EXCHANGE("fanout"); +const std::string DIRECT_EXCHANGE("direct"); +const std::string HEADERS_EXCHANGE("headers"); +const std::string XML_EXCHANGE("xml"); +const std::string WILDCARD_ANY("#"); + +//exchange prefixes: +const std::string PREFIX_AMQ("amq."); +const std::string PREFIX_QPID("qpid."); + +const Verifier verifier; + +bool areEquivalent(const FieldValue& a, const FieldValue& b) +{ + return ((a == b) || (a.convertsTo<int64_t>() && b.convertsTo<int64_t>() && a.get<int64_t>() == b.get<int64_t>())); +} +} + +struct Binding +{ + Binding(const Variant::Map&); + Binding(const std::string& exchange, const std::string& queue, const std::string& key); + + std::string exchange; + std::string queue; + std::string key; + FieldTable arguments; +}; + +struct Bindings : std::vector<Binding> +{ + void add(const Variant::List& bindings); + void setDefaultExchange(const std::string&); + void setDefaultQueue(const std::string&); + void bind(qpid::client::AsyncSession& session); + void unbind(qpid::client::AsyncSession& session); + void check(qpid::client::AsyncSession& session); +}; + +class Node +{ + protected: + enum CheckMode {FOR_RECEIVER, FOR_SENDER}; + + Node(const Address& address); + + const std::string name; + Variant createPolicy; + Variant assertPolicy; + Variant deletePolicy; + Bindings nodeBindings; + Bindings linkBindings; + + static bool enabled(const Variant& policy, CheckMode mode); + static bool createEnabled(const Address& address, CheckMode mode); + static void convert(const Variant& option, FieldTable& arguments); + static std::vector<std::string> RECEIVER_MODES; + static std::vector<std::string> SENDER_MODES; +}; + + +class Queue : protected Node +{ + public: + Queue(const Address& address); + protected: + void checkCreate(qpid::client::AsyncSession&, CheckMode); + void checkAssert(qpid::client::AsyncSession&, CheckMode); + void checkDelete(qpid::client::AsyncSession&, CheckMode); + private: + const bool durable; + bool autoDelete; + bool exclusive; + const std::string alternateExchange; + FieldTable arguments; +}; + +class Exchange : protected Node +{ + public: + Exchange(const Address& address); + protected: + void checkCreate(qpid::client::AsyncSession&, CheckMode); + void checkAssert(qpid::client::AsyncSession&, CheckMode); + void checkDelete(qpid::client::AsyncSession&, CheckMode); + bool isReservedName(); + + protected: + const std::string specifiedType; + private: + const bool durable; + bool autoDelete; + const std::string alternateExchange; + FieldTable arguments; +}; + +class QueueSource : public Queue, public MessageSource +{ + public: + QueueSource(const Address& address); + void subscribe(qpid::client::AsyncSession& session, const std::string& destination); + void cancel(qpid::client::AsyncSession& session, const std::string& destination); + private: + const AcquireMode acquireMode; + const AcceptMode acceptMode; + bool exclusive; + FieldTable options; +}; + +class Subscription : public Exchange, public MessageSource +{ + public: + Subscription(const Address&, const std::string& actualType); + void subscribe(qpid::client::AsyncSession& session, const std::string& destination); + void cancel(qpid::client::AsyncSession& session, const std::string& destination); + private: + const std::string queue; + const bool durable; + const bool reliable; + const std::string actualType; + const bool exclusiveQueue; + const bool autoDeleteQueue; + const bool exclusiveSubscription; + const std::string alternateExchange; + FieldTable queueOptions; + FieldTable subscriptionOptions; + Bindings bindings; + + void bindSubject(const std::string& subject); + void bindAll(); + void add(const std::string& exchange, const std::string& key); + static std::string getSubscriptionName(const std::string& base, const std::string& name); +}; + +class ExchangeSink : public Exchange, public MessageSink +{ + public: + ExchangeSink(const Address& name); + void declare(qpid::client::AsyncSession& session, const std::string& name); + void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message); + void cancel(qpid::client::AsyncSession& session, const std::string& name); + private: +}; + +class QueueSink : public Queue, public MessageSink +{ + public: + QueueSink(const Address& name); + void declare(qpid::client::AsyncSession& session, const std::string& name); + void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message); + void cancel(qpid::client::AsyncSession& session, const std::string& name); + private: +}; +bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address); +bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address); + +bool in(const Variant& value, const std::vector<std::string>& choices) +{ + if (!value.isVoid()) { + for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) { + if (value.asString() == *i) return true; + } + } + return false; +} + +const Variant& getOption(const Variant::Map& options, const std::string& name) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return EMPTY_VARIANT; + } else { + return j->second; + } +} + +const Variant& getOption(const Address& address, const std::string& name) +{ + return getOption(address.getOptions(), name); +} + +bool getReceiverPolicy(const Address& address, const std::string& key) +{ + return in(getOption(address, key), list_of<std::string>(ALWAYS)(RECEIVER)); +} + +bool getSenderPolicy(const Address& address, const std::string& key) +{ + return in(getOption(address, key), list_of<std::string>(ALWAYS)(SENDER)); +} + +struct Opt +{ + Opt(const Address& address); + Opt(const Variant::Map& base); + Opt& operator/(const std::string& name); + operator bool() const; + operator std::string() const; + std::string str() const; + bool asBool(bool defaultValue) const; + const Variant::List& asList() const; + void collect(qpid::framing::FieldTable& args) const; + bool hasKey(const std::string&) const; + + const Variant::Map* options; + const Variant* value; +}; + +Opt::Opt(const Address& address) : options(&(address.getOptions())), value(0) {} +Opt::Opt(const Variant::Map& base) : options(&base), value(0) {} +Opt& Opt::operator/(const std::string& name) +{ + if (options) { + Variant::Map::const_iterator j = options->find(name); + if (j == options->end()) { + value = 0; + options = 0; + } else { + value = &(j->second); + if (value->getType() == VAR_MAP) options = &(value->asMap()); + else options = 0; + } + } + return *this; +} + + +Opt::operator bool() const +{ + return value && !value->isVoid() && value->asBool(); +} + +Opt::operator std::string() const +{ + return str(); +} + +bool Opt::asBool(bool defaultValue) const +{ + if (value) return value->asBool(); + else return defaultValue; +} + +std::string Opt::str() const +{ + if (value) return value->asString(); + else return EMPTY_STRING; +} + +const Variant::List& Opt::asList() const +{ + if (value) return value->asList(); + else return EMPTY_LIST; +} + +void Opt::collect(qpid::framing::FieldTable& args) const +{ + if (value) { + translate(value->asMap(), args); + } +} +bool Opt::hasKey(const std::string& key) const +{ + if (value) { + Variant::Map::const_iterator i = value->asMap().find(key); + return i != value->asMap().end(); + } else { + return false; + } +} + +bool AddressResolution::is_unreliable(const Address& address) +{ + + return in((Opt(address)/LINK/RELIABILITY).str(), + list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE)); +} + +bool AddressResolution::is_reliable(const Address& address) +{ + return in((Opt(address)/LINK/RELIABILITY).str(), + list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE)); +} + +std::string checkAddressType(qpid::client::Session session, const Address& address) +{ + verifier.verify(address); + if (address.getName().empty()) { + throw MalformedAddress("Name cannot be null"); + } + std::string type = (Opt(address)/NODE/TYPE).str(); + if (type.empty()) { + ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName()); + if (result.getQueueNotFound() && result.getExchangeNotFound()) { + //neither a queue nor an exchange exists with that name; treat it as a queue + type = QUEUE_ADDRESS; + } else if (result.getExchangeNotFound()) { + //name refers to a queue + type = QUEUE_ADDRESS; + } else if (result.getQueueNotFound()) { + //name refers to an exchange + type = TOPIC_ADDRESS; + } else { + //both a queue and exchange exist for that name + throw ResolutionError("Ambiguous address, please specify queue or topic as node type"); + } + } + return type; +} + +std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session, + const Address& address) +{ + std::string type = checkAddressType(session, address); + if (type == TOPIC_ADDRESS) { + std::string exchangeType = sync(session).exchangeQuery(address.getName()).getType(); + std::auto_ptr<MessageSource> source(new Subscription(address, exchangeType)); + QPID_LOG(debug, "treating source address as topic: " << address); + return source; + } else if (type == QUEUE_ADDRESS) { + std::auto_ptr<MessageSource> source(new QueueSource(address)); + QPID_LOG(debug, "treating source address as queue: " << address); + return source; + } else { + throw ResolutionError("Unrecognised type: " + type); + } +} + + +std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session, + const qpid::messaging::Address& address) +{ + std::string type = checkAddressType(session, address); + if (type == TOPIC_ADDRESS) { + std::auto_ptr<MessageSink> sink(new ExchangeSink(address)); + QPID_LOG(debug, "treating target address as topic: " << address); + return sink; + } else if (type == QUEUE_ADDRESS) { + std::auto_ptr<MessageSink> sink(new QueueSink(address)); + QPID_LOG(debug, "treating target address as queue: " << address); + return sink; + } else { + throw ResolutionError("Unrecognised type: " + type); + } +} + +bool isBrowse(const Address& address) +{ + const Variant& mode = getOption(address, MODE); + if (!mode.isVoid()) { + std::string value = mode.asString(); + if (value == BROWSE) return true; + else if (value != CONSUME) throw ResolutionError("Invalid mode"); + } + return false; +} + +QueueSource::QueueSource(const Address& address) : + Queue(address), + acquireMode(isBrowse(address) ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED), + //since this client does not provide any means by which an + //unacquired message can be acquired, there is no value in an + //explicit accept + acceptMode(acquireMode == ACQUIRE_MODE_NOT_ACQUIRED || AddressResolution::is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT), + exclusive(false) +{ + exclusive = Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE; + (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(options); + std::string selector = Opt(address)/LINK/SELECTOR; + if (!selector.empty()) options.setString(APACHE_SELECTOR, selector); +} + +void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination) +{ + checkCreate(session, FOR_RECEIVER); + checkAssert(session, FOR_RECEIVER); + linkBindings.bind(session); + session.messageSubscribe(arg::queue=name, + arg::destination=destination, + arg::acceptMode=acceptMode, + arg::acquireMode=acquireMode, + arg::exclusive=exclusive, + arg::arguments=options); +} + +void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination) +{ + linkBindings.unbind(session); + session.messageCancel(destination); + checkDelete(session, FOR_RECEIVER); +} + +std::string Subscription::getSubscriptionName(const std::string& base, const std::string& name) +{ + if (name.empty()) { + return (boost::format("%1%_%2%") % base % Uuid(true).str()).str(); + } else { + return name; + } +} + +Subscription::Subscription(const Address& address, const std::string& type) + : Exchange(address), + queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())), + durable(Opt(address)/LINK/DURABLE), + //if the link is durable, then assume it is also reliable unless explicitly stated otherwise + //if not assume it is unreliable unless explicitly stated otherwise + reliable(durable ? !AddressResolution::is_unreliable(address) : AddressResolution::is_reliable(address)), + actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type), + exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)), + autoDeleteQueue((Opt(address)/LINK/X_DECLARE/AUTO_DELETE).asBool(!(durable || reliable))), + exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)), + alternateExchange((Opt(address)/LINK/X_DECLARE/ALTERNATE_EXCHANGE).str()) +{ + + if ((Opt(address)/LINK).hasKey(TIMEOUT)) { + const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value; + if (timeout->asUint32()) queueOptions.setInt("qpid.auto_delete_timeout", timeout->asUint32()); + } else if (durable && !AddressResolution::is_reliable(address) && !(Opt(address)/LINK/X_DECLARE).hasKey(AUTO_DELETE)) { + //if durable, not explicitly reliable, and auto-delete not + //explicitly set, then set a non-zero default for the + //autodelete timeout + queueOptions.setInt("qpid.auto_delete_timeout", 2*60); + } + (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions); + (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions); + std::string selector = Opt(address)/LINK/SELECTOR; + if (!selector.empty()) queueOptions.setString(QPID_FILTER, selector); + + if (!address.getSubject().empty()) bindSubject(address.getSubject()); + else if (linkBindings.empty()) bindAll(); +} + +void Subscription::bindSubject(const std::string& subject) +{ + if (actualType == HEADERS_EXCHANGE) { + Binding b(name, queue, subject); + b.arguments.setString("qpid.subject", subject); + b.arguments.setString("x-match", "all"); + bindings.push_back(b); + } else if (actualType == XML_EXCHANGE) { + Binding b(name, queue, subject); + std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'") + % subject).str(); + b.arguments.setString("xquery", query); + bindings.push_back(b); + } else { + //Note: the fanout exchange doesn't support any filtering, so + //the subject is ignored in that case + add(name, subject); + } +} + +void Subscription::bindAll() +{ + if (actualType == TOPIC_EXCHANGE) { + add(name, WILDCARD_ANY); + } else if (actualType == FANOUT_EXCHANGE) { + add(name, queue); + } else if (actualType == HEADERS_EXCHANGE) { + Binding b(name, queue, "match-all"); + b.arguments.setString("x-match", "all"); + bindings.push_back(b); + } else if (actualType == XML_EXCHANGE) { + Binding b(name, queue, EMPTY_STRING); + b.arguments.setString("xquery", "true()"); + bindings.push_back(b); + } else { + add(name, EMPTY_STRING); + } +} + +void Subscription::add(const std::string& exchange, const std::string& key) +{ + bindings.push_back(Binding(exchange, queue, key)); +} + +void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination) +{ + //create exchange if required and specified by policy: + checkCreate(session, FOR_RECEIVER); + checkAssert(session, FOR_RECEIVER); + + //create subscription queue: + session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue, + arg::autoDelete=autoDeleteQueue, arg::durable=durable, + arg::alternateExchange=alternateExchange, + arg::arguments=queueOptions); + //'default' binding: + bindings.bind(session); + //any explicit bindings: + linkBindings.setDefaultQueue(queue); + linkBindings.bind(session); + //subscribe to subscription queue: + AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE; + session.messageSubscribe(arg::queue=queue, arg::destination=destination, + arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions); +} + +void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination) +{ + linkBindings.unbind(session); + session.messageCancel(destination); + if (exclusiveQueue) session.queueDelete(arg::queue=queue, arg::ifUnused=true); + checkDelete(session, FOR_RECEIVER); +} + +ExchangeSink::ExchangeSink(const Address& address) : Exchange(address) {} + +void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::string&) +{ + checkCreate(session, FOR_SENDER); + checkAssert(session, FOR_SENDER); + linkBindings.bind(session); +} + +void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) +{ + m.send(session, name, m.getSubject()); +} + +void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&) +{ + linkBindings.unbind(session); + checkDelete(session, FOR_SENDER); +} + +QueueSink::QueueSink(const Address& address) : Queue(address) {} + +void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&) +{ + checkCreate(session, FOR_SENDER); + checkAssert(session, FOR_SENDER); + linkBindings.bind(session); +} +void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) +{ + m.send(session, name); +} + +void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&) +{ + linkBindings.unbind(session); + checkDelete(session, FOR_SENDER); +} + +Address AddressResolution::convert(const qpid::framing::ReplyTo& rt) +{ + Address address; + if (rt.getExchange().empty()) {//if default exchange, treat as queue + if (!rt.getRoutingKey().empty()) { + address.setName(rt.getRoutingKey()); + address.setType(QUEUE_ADDRESS); + } + } else { + address.setName(rt.getExchange()); + address.setSubject(rt.getRoutingKey()); + address.setType(TOPIC_ADDRESS); + } + return address; +} + +qpid::framing::ReplyTo AddressResolution::convert(const Address& address) +{ + if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) { + return ReplyTo(EMPTY_STRING, address.getName()); + } else if (address.getType() == TOPIC_ADDRESS) { + return ReplyTo(address.getName(), address.getSubject()); + } else { + QPID_LOG(notice, "Unrecognised type for reply-to: " << address.getType()); + return ReplyTo(EMPTY_STRING, address.getName());//treat as queue + } +} + +bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address) +{ + return address.getType() == QUEUE_ADDRESS || + (address.getType().empty() && session.queueQuery(address.getName()).getQueue() == address.getName()); +} + +bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address) +{ + if (address.getType().empty()) { + return !session.exchangeQuery(address.getName()).getNotFound(); + } else if (address.getType() == TOPIC_ADDRESS) { + return true; + } else { + return false; + } +} + +Node::Node(const Address& address) : name(address.getName()), + createPolicy(getOption(address, CREATE)), + assertPolicy(getOption(address, ASSERT)), + deletePolicy(getOption(address, DELETE)) +{ + nodeBindings.add((Opt(address)/NODE/X_BINDINGS).asList()); + linkBindings.add((Opt(address)/LINK/X_BINDINGS).asList()); +} + +Queue::Queue(const Address& a) : Node(a), + durable(Opt(a)/NODE/DURABLE), + autoDelete(Opt(a)/NODE/X_DECLARE/AUTO_DELETE), + exclusive(Opt(a)/NODE/X_DECLARE/EXCLUSIVE), + alternateExchange((Opt(a)/NODE/X_DECLARE/ALTERNATE_EXCHANGE).str()) +{ + (Opt(a)/NODE/X_DECLARE/ARGUMENTS).collect(arguments); + nodeBindings.setDefaultQueue(name); + linkBindings.setDefaultQueue(name); + if (qpid::messaging::AddressImpl::isTemporary(a) && createPolicy.isVoid()) { + createPolicy = "always"; + Opt specified = Opt(a)/NODE/X_DECLARE; + if (!specified.hasKey(AUTO_DELETE)) autoDelete = true; + if (!specified.hasKey(EXCLUSIVE)) exclusive = true; + } +} + +void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) +{ + if (enabled(createPolicy, mode)) { + QPID_LOG(debug, "Auto-creating queue '" << name << "'"); + try { + session.queueDeclare(arg::queue=name, + arg::durable=durable, + arg::autoDelete=autoDelete, + arg::exclusive=exclusive, + arg::alternateExchange=alternateExchange, + arg::arguments=arguments); + nodeBindings.bind(session); + session.sync(); + } catch (const qpid::framing::ResourceLockedException& e) { + throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str()); + } catch (const qpid::framing::NotAllowedException& e) { + throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str()); + } catch (const qpid::framing::NotFoundException& e) {//may be thrown when creating bindings + throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str()); + } + } else { + try { + sync(session).queueDeclare(arg::queue=name, arg::passive=true); + } catch (const qpid::framing::NotFoundException& /*e*/) { + throw NotFound((boost::format("Queue %1% does not exist") % name).str()); + } + } +} + +void Queue::checkDelete(qpid::client::AsyncSession& session, CheckMode mode) +{ + //Note: queue-delete will cause a session exception if the queue + //does not exist, the query here prevents obvious cases of this + //but there is a race whenever two deletions are made concurrently + //so careful use of the delete policy is recommended at present + if (enabled(deletePolicy, mode) && sync(session).queueQuery(name).getQueue() == name) { + QPID_LOG(debug, "Auto-deleting queue '" << name << "'"); + sync(session).queueDelete(arg::queue=name); + } +} + +void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) +{ + if (enabled(assertPolicy, mode)) { + QueueQueryResult result = sync(session).queueQuery(name); + if (result.getQueue() != name) { + throw NotFound((boost::format("Queue not found: %1%") % name).str()); + } else { + if (durable && !result.getDurable()) { + throw AssertionFailed((boost::format("Queue not durable: %1%") % name).str()); + } + if (autoDelete && !result.getAutoDelete()) { + throw AssertionFailed((boost::format("Queue not set to auto-delete: %1%") % name).str()); + } + if (exclusive && !result.getExclusive()) { + throw AssertionFailed((boost::format("Queue not exclusive: %1%") % name).str()); + } + if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) { + throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%") + % name % alternateExchange % result.getAlternateExchange()).str()); + } + for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) { + FieldTable::ValuePtr v = result.getArguments().get(i->first); + if (!v) { + throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); + } else if (!areEquivalent(*i->second, *v)) { + throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") + % i->first % name % *(i->second) % *v).str()); + } + } + nodeBindings.check(session); + } + } +} + +Exchange::Exchange(const Address& a) : Node(a), + specifiedType((Opt(a)/NODE/X_DECLARE/TYPE).str()), + durable(Opt(a)/NODE/DURABLE), + autoDelete(Opt(a)/NODE/X_DECLARE/AUTO_DELETE), + alternateExchange((Opt(a)/NODE/X_DECLARE/ALTERNATE_EXCHANGE).str()) +{ + (Opt(a)/NODE/X_DECLARE/ARGUMENTS).collect(arguments); + nodeBindings.setDefaultExchange(name); + linkBindings.setDefaultExchange(name); + if (qpid::messaging::AddressImpl::isTemporary(a) && createPolicy.isVoid()) { + createPolicy = "always"; + if (!(Opt(a)/NODE/X_DECLARE).hasKey(AUTO_DELETE)) autoDelete = true; + } +} + +bool Exchange::isReservedName() +{ + return name.find(PREFIX_AMQ) != std::string::npos || name.find(PREFIX_QPID) != std::string::npos; +} + +void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) +{ + if (enabled(createPolicy, mode)) { + try { + if (isReservedName()) { + try { + sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true); + } catch (const qpid::framing::NotFoundException& /*e*/) { + throw ResolutionError((boost::format("Cannot create exchange %1%; names beginning with \"amq.\" or \"qpid.\" are reserved.") % name).str()); + } + + } else { + std::string type = specifiedType; + if (type.empty()) type = TOPIC_EXCHANGE; + session.exchangeDeclare(arg::exchange=name, + arg::type=type, + arg::durable=durable, + arg::autoDelete=autoDelete, + arg::alternateExchange=alternateExchange, + arg::arguments=arguments); + } + nodeBindings.bind(session); + session.sync(); + } catch (const qpid::framing::NotAllowedException& e) { + throw ResolutionError((boost::format("Create failed for exchange %1%; %2%") % name % e.what()).str()); + } catch (const qpid::framing::NotFoundException& e) {//can be caused when creating bindings + throw ResolutionError((boost::format("Create failed for exchange %1%; %2%") % name % e.what()).str()); + } + } else { + try { + sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true); + } catch (const qpid::framing::NotFoundException& /*e*/) { + throw NotFound((boost::format("Exchange %1% does not exist") % name).str()); + } + } +} + +void Exchange::checkDelete(qpid::client::AsyncSession& session, CheckMode mode) +{ + //Note: exchange-delete will cause a session exception if the + //exchange does not exist, the query here prevents obvious cases + //of this but there is a race whenever two deletions are made + //concurrently so careful use of the delete policy is recommended + //at present + if (enabled(deletePolicy, mode) && !sync(session).exchangeQuery(name).getNotFound()) { + sync(session).exchangeDelete(arg::exchange=name); + } +} + +void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) +{ + if (enabled(assertPolicy, mode)) { + ExchangeQueryResult result = sync(session).exchangeQuery(name); + if (result.getNotFound()) { + throw NotFound((boost::format("Exchange not found: %1%") % name).str()); + } else { + if (specifiedType.size() && result.getType() != specifiedType) { + throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") + % name % specifiedType % result.getType()).str()); + } + if (durable && !result.getDurable()) { + throw AssertionFailed((boost::format("Exchange not durable: %1%") % name).str()); + } + //Note: Can't check auto-delete or alternate-exchange via + //exchange-query-result as these are not returned + //TODO: could use a passive declare to check alternate-exchange + for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) { + FieldTable::ValuePtr v = result.getArguments().get(i->first); + if (!v) { + throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); + } else if (!areEquivalent(*i->second, *v)) { + throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") + % i->first % name % *(i->second) % *v).str()); + } + } + nodeBindings.check(session); + } + } +} + +Binding::Binding(const Variant::Map& b) : + exchange((Opt(b)/EXCHANGE).str()), + queue((Opt(b)/QUEUE).str()), + key((Opt(b)/KEY).str()) +{ + (Opt(b)/ARGUMENTS).collect(arguments); +} + +Binding::Binding(const std::string& e, const std::string& q, const std::string& k) : exchange(e), queue(q), key(k) {} + + +void Bindings::add(const Variant::List& list) +{ + for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { + push_back(Binding(i->asMap())); + } +} + +void Bindings::setDefaultExchange(const std::string& exchange) +{ + for (Bindings::iterator i = begin(); i != end(); ++i) { + if (i->exchange.empty()) i->exchange = exchange; + } +} + +void Bindings::setDefaultQueue(const std::string& queue) +{ + for (Bindings::iterator i = begin(); i != end(); ++i) { + if (i->queue.empty()) i->queue = queue; + } +} + +void Bindings::bind(qpid::client::AsyncSession& session) +{ + for (Bindings::const_iterator i = begin(); i != end(); ++i) { + session.exchangeBind(arg::queue=i->queue, + arg::exchange=i->exchange, + arg::bindingKey=i->key, + arg::arguments=i->arguments); + } +} + +void Bindings::unbind(qpid::client::AsyncSession& session) +{ + for (Bindings::const_iterator i = begin(); i != end(); ++i) { + session.exchangeUnbind(arg::queue=i->queue, + arg::exchange=i->exchange, + arg::bindingKey=i->key); + } +} + +void Bindings::check(qpid::client::AsyncSession& session) +{ + for (Bindings::const_iterator i = begin(); i != end(); ++i) { + ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue, + arg::exchange=i->exchange, + arg::bindingKey=i->key); + if (result.getQueueNotMatched() || result.getKeyNotMatched()) { + throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]") + % i->exchange % i->queue % i->key).str()); + } + } +} + +bool Node::enabled(const Variant& policy, CheckMode mode) +{ + bool result = false; + switch (mode) { + case FOR_RECEIVER: + result = in(policy, RECEIVER_MODES); + break; + case FOR_SENDER: + result = in(policy, SENDER_MODES); + break; + } + return result; +} + +bool Node::createEnabled(const Address& address, CheckMode mode) +{ + const Variant& policy = getOption(address, CREATE); + return enabled(policy, mode); +} + +void Node::convert(const Variant& options, FieldTable& arguments) +{ + if (!options.isVoid()) { + translate(options.asMap(), arguments); + } +} +std::vector<std::string> Node::RECEIVER_MODES = list_of<std::string>(ALWAYS) (RECEIVER); +std::vector<std::string> Node::SENDER_MODES = list_of<std::string>(ALWAYS) (SENDER); + +Verifier::Verifier() +{ + defined[CREATE] = true; + defined[ASSERT] = true; + defined[DELETE] = true; + defined[MODE] = true; + Variant::Map node; + node[TYPE] = true; + node[DURABLE] = true; + node[X_DECLARE] = true; + node[X_BINDINGS] = true; + defined[NODE] = node; + Variant::Map link; + link[NAME] = true; + link[DURABLE] = true; + link[RELIABILITY] = true; + link[TIMEOUT] = true; + link[X_SUBSCRIBE] = true; + link[X_DECLARE] = true; + link[X_BINDINGS] = true; + link[SELECTOR] = true; + defined[LINK] = link; +} +void Verifier::verify(const Address& address) const +{ + verify(defined, address.getOptions()); +} + +void Verifier::verify(const Variant::Map& allowed, const Variant::Map& actual) const +{ + for (Variant::Map::const_iterator i = actual.begin(); i != actual.end(); ++i) { + Variant::Map::const_iterator option = allowed.find(i->first); + if (option == allowed.end()) { + throw AddressError((boost::format("Unrecognised option: %1%") % i->first).str()); + } else if (option->second.getType() == qpid::types::VAR_MAP) { + verify(option->second.asMap(), i->second.asMap()); + } + } +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h new file mode 100644 index 0000000000..fc8f1a1d18 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h @@ -0,0 +1,64 @@ +#ifndef QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H +#define QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/Session.h" + +namespace qpid { + +namespace framing{ +class ReplyTo; +} + +namespace messaging { +class Address; +} + +namespace client { +namespace amqp0_10 { + +class MessageSource; +class MessageSink; + +/** + * Maps from a generic Address and optional Filter to an AMQP 0-10 + * MessageSource which will then be used by a ReceiverImpl instance + * created for the address. + */ +class AddressResolution +{ + public: + std::auto_ptr<MessageSource> resolveSource(qpid::client::Session session, + const qpid::messaging::Address& address); + + std::auto_ptr<MessageSink> resolveSink(qpid::client::Session session, + const qpid::messaging::Address& address); + + static qpid::messaging::Address convert(const qpid::framing::ReplyTo&); + static qpid::framing::ReplyTo convert(const qpid::messaging::Address&); + static bool is_unreliable(const qpid::messaging::Address& address); + static bool is_reliable(const qpid::messaging::Address& address); + private: +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp new file mode 100644 index 0000000000..11ef06e517 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -0,0 +1,404 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionImpl.h" +#include "SessionImpl.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/PrivateImplRef.h" +#include "qpid/framing/Uuid.h" +#include "qpid/log/Statement.h" +#include "qpid/Url.h" +#include "qpid/amqp_0_10/Codecs.h" +#include <boost/intrusive_ptr.hpp> +#include <vector> +#include <sstream> +#include <limits> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using qpid::types::Variant; +using qpid::types::VAR_LIST; +using qpid::framing::Uuid; + +namespace { + +const std::string TCP("tcp"); +const std::string COLON(":"); +double FOREVER(std::numeric_limits<double>::max()); + +// Time values in seconds can be specified as integer or floating point values. +double timeValue(const Variant& value) { + if (types::isIntegerType(value.getType())) + return double(value.asInt64()); + return value.asDouble(); +} + +void merge(const std::string& value, std::vector<std::string>& list) { + if (std::find(list.begin(), list.end(), value) == list.end()) + list.push_back(value); +} + +void merge(const Variant::List& from, std::vector<std::string>& to) +{ + for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) + merge(i->asString(), to); +} + +std::string asString(const std::vector<std::string>& v) { + std::stringstream os; + os << "["; + for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) { + if (i != v.begin()) os << ", "; + os << *i; + } + os << "]"; + return os.str(); +} + +bool expired(const sys::AbsTime& start, double timeout) +{ + if (timeout == 0) return true; + if (timeout == FOREVER) return false; + sys::Duration used(start, sys::now()); + sys::Duration allowed((int64_t)(timeout*sys::TIME_SEC)); + return allowed < used; +} + +} // namespace + +ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) : + replaceUrls(false), autoReconnect(false), timeout(FOREVER), limit(-1), + minReconnectInterval(0.001), maxReconnectInterval(2), + retries(0), reconnectOnLimitExceeded(true), disableAutoDecode(false) +{ + setOptions(options); + urls.insert(urls.begin(), url); +} + +void ConnectionImpl::setOptions(const Variant::Map& options) +{ + for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) { + setOption(i->first, i->second); + } +} + +void ConnectionImpl::setOption(const std::string& name, const Variant& value) +{ + sys::Mutex::ScopedLock l(lock); + if (name == "reconnect") { + autoReconnect = value; + } else if (name == "reconnect-timeout" || name == "reconnect_timeout") { + timeout = timeValue(value); + } else if (name == "reconnect-limit" || name == "reconnect_limit") { + limit = value; + } else if (name == "reconnect-interval" || name == "reconnect_interval") { + maxReconnectInterval = minReconnectInterval = timeValue(value); + } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") { + minReconnectInterval = timeValue(value); + } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") { + maxReconnectInterval = timeValue(value); + } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") { + replaceUrls = value.asBool(); + } else if (name == "reconnect-urls" || name == "reconnect_urls") { + if (replaceUrls) urls.clear(); + if (value.getType() == VAR_LIST) { + merge(value.asList(), urls); + } else { + merge(value.asString(), urls); + } + } else if (name == "username") { + settings.username = value.asString(); + } else if (name == "password") { + settings.password = value.asString(); + } else if (name == "sasl-mechanism" || name == "sasl_mechanism" || + name == "sasl-mechanisms" || name == "sasl_mechanisms") { + settings.mechanism = value.asString(); + } else if (name == "sasl-service" || name == "sasl_service") { + settings.service = value.asString(); + } else if (name == "sasl-min-ssf" || name == "sasl_min_ssf") { + settings.minSsf = value; + } else if (name == "sasl-max-ssf" || name == "sasl_max_ssf") { + settings.maxSsf = value; + } else if (name == "heartbeat") { + settings.heartbeat = value; + } else if (name == "tcp-nodelay" || name == "tcp_nodelay") { + settings.tcpNoDelay = value; + } else if (name == "locale") { + settings.locale = value.asString(); + } else if (name == "max-channels" || name == "max_channels") { + settings.maxChannels = value; + } else if (name == "max-frame-size" || name == "max_frame_size") { + settings.maxFrameSize = value; + } else if (name == "bounds") { + settings.bounds = value; + } else if (name == "transport") { + settings.protocol = value.asString(); + } else if (name == "ssl-cert-name" || name == "ssl_cert_name") { + settings.sslCertName = value.asString(); + } else if (name == "ssl-ignore-hostname-verification-failure" || name == "ssl_ignore_hostname_verification_failure") { + settings.sslIgnoreHostnameVerificationFailure = value; + } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") { + reconnectOnLimitExceeded = value; + } else if (name == "client-properties" || name == "client_properties") { + amqp_0_10::translate(value.asMap(), settings.clientProperties); + } else if (name == "disable-auto-decode" || name == "disable_auto_decode") { + disableAutoDecode = value; + } else { + throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised")); + } +} + + +void ConnectionImpl::close() +{ + while(true) { + messaging::Session session; + { + qpid::sys::Mutex::ScopedLock l(lock); + if (sessions.empty()) break; + session = sessions.begin()->second; + } + session.close(); + } + detach(); +} + +void ConnectionImpl::detach() +{ + qpid::sys::Mutex::ScopedLock l(lock); + connection.close(); +} + +bool ConnectionImpl::isOpen() const +{ + qpid::sys::Mutex::ScopedLock l(lock); + return connection.isOpen(); +} + +boost::intrusive_ptr<SessionImpl> getImplPtr(qpid::messaging::Session& session) +{ + return boost::dynamic_pointer_cast<SessionImpl>( + qpid::messaging::PrivateImplRef<qpid::messaging::Session>::get(session) + ); +} + +void ConnectionImpl::closed(SessionImpl& s) +{ + qpid::sys::Mutex::ScopedLock l(lock); + for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { + if (getImplPtr(i->second).get() == &s) { + sessions.erase(i); + break; + } + } +} + +qpid::messaging::Session ConnectionImpl::getSession(const std::string& name) const +{ + qpid::sys::Mutex::ScopedLock l(lock); + Sessions::const_iterator i = sessions.find(name); + if (i == sessions.end()) { + throw qpid::messaging::KeyError("No such session: " + name); + } else { + return i->second; + } +} + +qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const std::string& n) +{ + std::string name = n.empty() ? Uuid(true).str() : n; + qpid::messaging::Session impl(new SessionImpl(*this, transactional)); + while (true) { + try { + getImplPtr(impl)->setSession(connection.newSession(name)); + qpid::sys::Mutex::ScopedLock l(lock); + sessions[name] = impl; + break; + } catch (const qpid::TransportFailure&) { + reopen(); + } catch (const qpid::SessionException& e) { + SessionImpl::rethrow(e); + } catch (const std::exception& e) { + throw qpid::messaging::MessagingException(e.what()); + } + } + return impl; +} + +void ConnectionImpl::open() +{ + qpid::sys::AbsTime start = qpid::sys::now(); + qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore); + try { + if (!connection.isOpen()) connect(start); + } + catch (const types::Exception&) { throw; } + catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); } +} + +void ConnectionImpl::reopen() +{ + if (!autoReconnect) { + throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); + } + open(); +} + + +void ConnectionImpl::connect(const qpid::sys::AbsTime& started) +{ + QPID_LOG(debug, "Starting connection, urls=" << asString(urls)); + for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) { + if (!autoReconnect) { + throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); + } + if (limit >= 0 && retries++ >= limit) { + throw qpid::messaging::TransportFailure("Failed to connect within reconnect limit"); + } + if (expired(started, timeout)) { + throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout"); + } + QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls=" + << asString(urls)); + qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds. + } + QPID_LOG(debug, "Connection successful, urls=" << asString(urls)); + retries = 0; +} + +void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) { + for (std::vector<Url>::const_iterator i = more.begin(); i != more.end(); ++i) + merge(i->str(), urls); + QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls)); +} + +bool ConnectionImpl::tryConnect() +{ + sys::Mutex::ScopedLock l(lock); + for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) { + try { + QPID_LOG(info, "Trying to connect to " << *i << "..."); + Url url(*i, settings.protocol.size() ? settings.protocol : TCP); + if (url.getUser().size()) settings.username = url.getUser(); + if (url.getPass().size()) settings.password = url.getPass(); + connection.open(url, settings); + QPID_LOG(info, "Connected to " << *i); + mergeUrls(connection.getInitialBrokers(), l); + return resetSessions(l); + } catch (const qpid::ProtocolVersionError& e) { + throw qpid::messaging::ProtocolVersionError("AMQP 0-10 not supported"); + } catch (const qpid::TransportFailure& e) { + QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); + } + } + return false; +} + +bool ConnectionImpl::resetSessions(const sys::Mutex::ScopedLock& ) +{ + try { + qpid::sys::Mutex::ScopedLock l(lock); + for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { + if (!getImplPtr(i->second)->isTransactional()) { + getImplPtr(i->second)->setSession(connection.newSession(i->first)); + } + } + return true; + } catch (const qpid::TransportFailure& e) { + QPID_LOG(debug, "Connection Failed to re-initialize sessions: " << e.what()); + return false; + } catch (const qpid::framing::ResourceLimitExceededException& e) { + if (reconnectOnLimitExceeded) { + QPID_LOG(debug, "Detaching and reconnecting due to: " << e.what()); + detach(); + return false; + } else { + throw qpid::messaging::TargetCapacityExceeded(e.what()); + } + } +} + +bool ConnectionImpl::backoff() +{ + if (reconnectOnLimitExceeded) { + detach(); + open(); + return true; + } else { + return false; + } +} + +void ConnectionImpl::reconnect(const std::string& u) +{ + sys::Mutex::ScopedLock l(lock); + try { + QPID_LOG(info, "Trying to connect to " << u << "..."); + Url url(u, settings.protocol.size() ? settings.protocol : TCP); + if (url.getUser().size()) settings.username = url.getUser(); + if (url.getPass().size()) settings.password = url.getPass(); + connection.open(url, settings); + QPID_LOG(info, "Connected to " << u); + mergeUrls(connection.getInitialBrokers(), l); + if (!resetSessions(l)) throw qpid::messaging::TransportFailure("Could not re-establish sessions"); + } catch (const qpid::TransportFailure& e) { + QPID_LOG(info, "Failed to connect to " << u << ": " << e.what()); + throw qpid::messaging::TransportFailure(e.what()); + } catch (const std::exception& e) { + QPID_LOG(info, "Error while connecting to " << u << ": " << e.what()); + throw qpid::messaging::MessagingException(e.what()); + } +} + +void ConnectionImpl::reconnect() +{ + if (!tryConnect()) { + throw qpid::messaging::TransportFailure("Could not reconnect"); + } +} +std::string ConnectionImpl::getUrl() const +{ + if (isOpen()) { + std::stringstream u; + u << connection.getNegotiatedSettings().protocol << COLON << connection.getNegotiatedSettings().host << COLON << connection.getNegotiatedSettings().port; + return u.str(); + } else { + return std::string(); + } +} + +std::string ConnectionImpl::getAuthenticatedUsername() +{ + return connection.getNegotiatedSettings().username; +} + +bool ConnectionImpl::getAutoDecode() const +{ + return !disableAutoDecode; +} +bool ConnectionImpl::getAutoReconnect() const +{ + return autoReconnect; +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h new file mode 100644 index 0000000000..bf8a759107 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -0,0 +1,88 @@ +#ifndef QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H +#define QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/ConnectionImpl.h" +#include "qpid/types/Variant.h" +#include "qpid/client/Connection.h" +#include "qpid/client/ConnectionSettings.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Semaphore.h" +#include <map> +#include <vector> + +namespace qpid { +struct Url; + +namespace client { +namespace amqp0_10 { + +class SessionImpl; + +class ConnectionImpl : public qpid::messaging::ConnectionImpl +{ + public: + ConnectionImpl(const std::string& url, const qpid::types::Variant::Map& options); + void open(); + void reopen(); + bool isOpen() const; + void close(); + qpid::messaging::Session newSession(bool transactional, const std::string& name); + qpid::messaging::Session getSession(const std::string& name) const; + void closed(SessionImpl&); + void detach(); + void setOption(const std::string& name, const qpid::types::Variant& value); + bool backoff(); + std::string getAuthenticatedUsername(); + void reconnect(const std::string& url); + void reconnect(); + std::string getUrl() const; + bool getAutoDecode() const; + bool getAutoReconnect() const; + private: + typedef std::map<std::string, qpid::messaging::Session> Sessions; + + mutable qpid::sys::Mutex lock;//used to protect data structures + qpid::sys::Semaphore semaphore;//used to coordinate reconnection + Sessions sessions; + qpid::client::Connection connection; + bool replaceUrls; // Replace rather than merging with reconnect-urls + std::vector<std::string> urls; + qpid::client::ConnectionSettings settings; + bool autoReconnect; + double timeout; + int32_t limit; + double minReconnectInterval; + double maxReconnectInterval; + int32_t retries; + bool reconnectOnLimitExceeded; + bool disableAutoDecode; + + void setOptions(const qpid::types::Variant::Map& options); + void connect(const qpid::sys::AbsTime& started); + bool tryConnect(); + bool resetSessions(const sys::Mutex::ScopedLock&); // dummy parameter indicates call with lock held. + void mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&); +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp new file mode 100644 index 0000000000..2ca2c85c64 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -0,0 +1,466 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/amqp0_10/IncomingMessages.h" +#include "qpid/client/amqp0_10/AddressResolution.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/types/Variant.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/enum.h" +#include <algorithm> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using namespace qpid::framing; +using namespace qpid::framing::message; +using namespace qpid::amqp_0_10; +using qpid::sys::AbsTime; +using qpid::sys::Duration; +using qpid::messaging::MessageImplAccess; +using qpid::types::Variant; + +namespace { +const std::string EMPTY_STRING; + + +struct GetNone : IncomingMessages::Handler +{ + bool accept(IncomingMessages::MessageTransfer&) { return false; } +}; + +struct GetAny : IncomingMessages::Handler +{ + bool accept(IncomingMessages::MessageTransfer& transfer) + { + transfer.retrieve(0); + return true; + } +}; + +struct MatchAndTrack +{ + const std::string destination; + SequenceSet ids; + + MatchAndTrack(const std::string& d) : destination(d) {} + + bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command) + { + if (command->as<MessageTransferBody>()->getDestination() == destination) { + ids.add(command->getId()); + return true; + } else { + return false; + } + } +}; + +struct Match +{ + const std::string destination; + uint32_t matched; + + Match(const std::string& d) : destination(d), matched(0) {} + + bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command) + { + if (command->as<MessageTransferBody>()->getDestination() == destination) { + ++matched; + return true; + } else { + return false; + } + } +}; + +struct ScopedRelease +{ + bool& flag; + qpid::sys::Monitor& lock; + + ScopedRelease(bool& f, qpid::sys::Monitor& l) : flag(f), lock(l) {} + ~ScopedRelease() + { + sys::Monitor::ScopedLock l(lock); + flag = false; + lock.notifyAll(); + } +}; +} + +IncomingMessages::IncomingMessages() : inUse(false) {} + +void IncomingMessages::setSession(qpid::client::AsyncSession s) +{ + sys::Mutex::ScopedLock l(lock); + session = s; + incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault(); + acceptTracker.reset(); +} + +namespace { +qpid::sys::Duration get_duration(qpid::sys::Duration timeout, qpid::sys::AbsTime deadline) +{ + if (timeout == qpid::sys::TIME_INFINITE) { + return qpid::sys::TIME_INFINITE; + } else { + return std::max(qpid::sys::Duration(0), qpid::sys::Duration(AbsTime::now(), deadline)); + } +} +} + +bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout) +{ + sys::Mutex::ScopedLock l(lock); + AbsTime deadline(AbsTime::now(), timeout); + do { + //search through received list for any transfer of interest: + for (FrameSetQueue::iterator i = received.begin(); i != received.end();) + { + MessageTransfer transfer(*i, *this); + if (transfer.checkExpired()) { + i = received.erase(i); + } else if (handler.accept(transfer)) { + received.erase(i); + return true; + } else { + ++i; + } + } + if (inUse) { + //someone is already waiting on the incoming session queue, wait for them to finish + lock.wait(deadline); + } else { + inUse = true; + ScopedRelease release(inUse, lock); + sys::Mutex::ScopedUnlock l(lock); + //wait for suitable new message to arrive + switch (process(&handler, get_duration(timeout, deadline))) { + case OK: + return true; + case CLOSED: + return false; + case EMPTY: + break; + } + } + if (handler.isClosed()) throw qpid::messaging::ReceiverError("Receiver has been closed"); + } while (AbsTime::now() < deadline); + return false; +} +namespace { +struct Wakeup : public qpid::types::Exception {}; +} + +void IncomingMessages::wakeup() +{ + sys::Mutex::ScopedLock l(lock); + incoming->close(qpid::sys::ExceptionHolder(new Wakeup())); + lock.notifyAll(); +} + +bool IncomingMessages::getNextDestination(std::string& destination, qpid::sys::Duration timeout) +{ + sys::Mutex::ScopedLock l(lock); + AbsTime deadline(AbsTime::now(), timeout); + while (received.empty()) { + if (inUse) { + //someone is already waiting on the sessions incoming queue + lock.wait(deadline); + } else { + inUse = true; + ScopedRelease release(inUse, lock); + sys::Mutex::ScopedUnlock l(lock); + //wait for an incoming message + wait(get_duration(timeout, deadline)); + } + if (!(AbsTime::now() < deadline)) break; + } + if (!received.empty()) { + destination = received.front()->as<MessageTransferBody>()->getDestination(); + return true; + } else { + return false; + } +} + +void IncomingMessages::accept() +{ + sys::Mutex::ScopedLock l(lock); + acceptTracker.accept(session); +} + +void IncomingMessages::accept(qpid::framing::SequenceNumber id, bool cumulative) +{ + sys::Mutex::ScopedLock l(lock); + acceptTracker.accept(id, session, cumulative); +} + + +void IncomingMessages::releaseAll() +{ + { + //first process any received messages... + sys::Mutex::ScopedLock l(lock); + while (!received.empty()) { + retrieve(received.front(), 0); + received.pop_front(); + } + } + //then pump out any available messages from incoming queue... + GetAny handler; + while (process(&handler, 0) == OK) ; + //now release all messages + sys::Mutex::ScopedLock l(lock); + acceptTracker.release(session); +} + +void IncomingMessages::releasePending(const std::string& destination) +{ + //first pump all available messages from incoming to received... + while (process(0, 0) == OK) ; + + //now remove all messages for this destination from received list, recording their ids... + sys::Mutex::ScopedLock l(lock); + MatchAndTrack match(destination); + for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i = match(*i) ? received.erase(i) : ++i) ; + //now release those messages + session.messageRelease(match.ids); +} + +bool IncomingMessages::pop(FrameSet::shared_ptr& content, qpid::sys::Duration timeout) +{ + try { + return incoming->pop(content, timeout); + } catch (const Wakeup&) { + incoming->open(); + return false; + } +} + +/** + * Get a frameset that is accepted by the specified handler from + * session queue, waiting for up to the specified duration and + * returning true if this could be achieved, false otherwise. Messages + * that are not accepted by the handler are pushed onto received queue + * for later retrieval. + */ +IncomingMessages::ProcessState IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) +{ + AbsTime deadline(AbsTime::now(), duration); + FrameSet::shared_ptr content; + try { + for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) { + if (content->isA<MessageTransferBody>()) { + MessageTransfer transfer(content, *this); + if (transfer.checkExpired()) { + QPID_LOG(debug, "Expired received transfer: " << *content->getMethod()); + } else if (handler && handler->accept(transfer)) { + QPID_LOG(debug, "Delivered " << *content->getMethod() << " " + << *content->getHeaders()); + return OK; + } else { + //received message for another destination, keep for later + QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue"); + sys::Mutex::ScopedLock l(lock); + received.push_back(content); + lock.notifyAll(); + } + } else { + //TODO: handle other types of commands (e.g. message-accept, message-flow etc) + } + } + } + catch (const qpid::ClosedException&) { return CLOSED; } + return EMPTY; +} + +bool IncomingMessages::wait(qpid::sys::Duration duration) +{ + AbsTime deadline(AbsTime::now(), duration); + FrameSet::shared_ptr content; + for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) { + if (content->isA<MessageTransferBody>()) { + QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue"); + sys::Mutex::ScopedLock l(lock); + received.push_back(content); + lock.notifyAll(); + return true; + } else { + //TODO: handle other types of commands (e.g. message-accept, message-flow etc) + } + } + return false; +} + +uint32_t IncomingMessages::pendingAccept() +{ + sys::Mutex::ScopedLock l(lock); + return acceptTracker.acceptsPending(); +} +uint32_t IncomingMessages::pendingAccept(const std::string& destination) +{ + sys::Mutex::ScopedLock l(lock); + return acceptTracker.acceptsPending(destination); +} + +uint32_t IncomingMessages::available() +{ + //first pump all available messages from incoming to received... + while (process(0, 0) == OK) {} + //return the count of received messages + sys::Mutex::ScopedLock l(lock); + return received.size(); +} + +uint32_t IncomingMessages::available(const std::string& destination) +{ + //first pump all available messages from incoming to received... + while (process(0, 0) == OK) {} + + //count all messages for this destination from received list + sys::Mutex::ScopedLock l(lock); + return std::for_each(received.begin(), received.end(), Match(destination)).matched; +} + +void populate(qpid::messaging::Message& message, FrameSet& command); + +/** + * Called when message is retrieved; records retrieval for subsequent + * acceptance, marks the command as completed and converts command to + * message if message is required + */ +void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* message) +{ + if (message) { + populate(*message, *command); + } + const MessageTransferBody* transfer = command->as<MessageTransferBody>(); + if (transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) { + sys::Mutex::ScopedLock l(lock); + acceptTracker.delivered(transfer->getDestination(), command->getId()); + } + session.markCompleted(command->getId(), false, false); +} + +IncomingMessages::MessageTransfer::MessageTransfer(FrameSetPtr c, IncomingMessages& p) : content(c), parent(p) {} + +const std::string& IncomingMessages::MessageTransfer::getDestination() +{ + return content->as<MessageTransferBody>()->getDestination(); +} +void IncomingMessages::MessageTransfer::retrieve(qpid::messaging::Message* message) +{ + parent.retrieve(content, message); +} + +bool IncomingMessages::MessageTransfer::checkExpired() +{ + if (content->hasExpired()) { + retrieve(0); + parent.accept(content->getId(), false); + return true; + } else { + return false; + } +} + +namespace { +//TODO: unify conversion to and from 0-10 message that is currently +//split between IncomingMessages and OutgoingMessage +const std::string SUBJECT("qpid.subject"); + +const std::string X_APP_ID("x-amqp-0-10.app-id"); +const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key"); +const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding"); +const std::string X_TIMESTAMP("x-amqp-0-10.timestamp"); +} + +void populateHeaders(qpid::messaging::Message& message, + const DeliveryProperties* deliveryProperties, + const MessageProperties* messageProperties) +{ + if (deliveryProperties) { + message.setTtl(qpid::messaging::Duration(deliveryProperties->getTtl())); + message.setDurable(deliveryProperties->getDeliveryMode() == DELIVERY_MODE_PERSISTENT); + message.setPriority(deliveryProperties->getPriority()); + message.setRedelivered(deliveryProperties->getRedelivered()); + } + if (messageProperties) { + message.setContentType(messageProperties->getContentType()); + if (messageProperties->hasReplyTo()) { + message.setReplyTo(AddressResolution::convert(messageProperties->getReplyTo())); + } + message.setSubject(messageProperties->getApplicationHeaders().getAsString(SUBJECT)); + message.getProperties().clear(); + translate(messageProperties->getApplicationHeaders(), message.getProperties()); + message.setCorrelationId(messageProperties->getCorrelationId()); + message.setUserId(messageProperties->getUserId()); + if (messageProperties->hasMessageId()) { + message.setMessageId(messageProperties->getMessageId().str()); + } + //expose 0-10 specific items through special properties: + // app-id, content-encoding + if (messageProperties->hasAppId()) { + message.getProperties()[X_APP_ID] = messageProperties->getAppId(); + } + if (messageProperties->hasContentEncoding()) { + message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding(); + } + // routing-key, timestamp, others? + if (deliveryProperties && deliveryProperties->hasRoutingKey()) { + message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey(); + } + if (deliveryProperties && deliveryProperties->hasTimestamp()) { + message.getProperties()[X_TIMESTAMP] = deliveryProperties->getTimestamp(); + } + } +} + +void populateHeaders(qpid::messaging::Message& message, const AMQHeaderBody* headers) +{ + populateHeaders(message, headers->get<DeliveryProperties>(), headers->get<MessageProperties>()); +} + +void populate(qpid::messaging::Message& message, FrameSet& command) +{ + //need to be able to link the message back to the transfer it was delivered by + //e.g. for rejecting. + MessageImplAccess::get(message).setInternalId(command.getId()); + + message.setContent(command.getContent()); + + populateHeaders(message, command.getHeaders()); +} + + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h new file mode 100644 index 0000000000..4c9ee68ece --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -0,0 +1,108 @@ +#ifndef QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H +#define QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include <boost/shared_ptr.hpp> +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/SequenceSet.h" +#include "qpid/sys/BlockingQueue.h" +#include "qpid/sys/Time.h" +#include "qpid/client/amqp0_10/AcceptTracker.h" + +namespace qpid { + +namespace framing{ +class FrameSet; +} + +namespace messaging { +class Message; +} + +namespace client { +namespace amqp0_10 { + +/** + * Queue of incoming messages. + */ +class IncomingMessages +{ + public: + typedef boost::shared_ptr<qpid::framing::FrameSet> FrameSetPtr; + class MessageTransfer + { + public: + const std::string& getDestination(); + void retrieve(qpid::messaging::Message* message); + private: + FrameSetPtr content; + IncomingMessages& parent; + bool checkExpired(); + + MessageTransfer(FrameSetPtr, IncomingMessages&); + friend class IncomingMessages; + }; + + struct Handler + { + virtual ~Handler() {} + virtual bool accept(MessageTransfer& transfer) = 0; + virtual bool isClosed() { return false; } + }; + + IncomingMessages(); + void setSession(qpid::client::AsyncSession session); + bool get(Handler& handler, qpid::sys::Duration timeout); + void wakeup(); + bool getNextDestination(std::string& destination, qpid::sys::Duration timeout); + void accept(); + void accept(qpid::framing::SequenceNumber id, bool cumulative); + void releaseAll(); + void releasePending(const std::string& destination); + + uint32_t pendingAccept(); + uint32_t pendingAccept(const std::string& destination); + + uint32_t available(); + uint32_t available(const std::string& destination); + private: + typedef std::deque<FrameSetPtr> FrameSetQueue; + enum ProcessState {EMPTY=0,OK=1,CLOSED=2}; + + sys::Monitor lock; + qpid::client::AsyncSession session; + boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming; + bool inUse; + FrameSetQueue received; + AcceptTracker acceptTracker; + + ProcessState process(Handler*, qpid::sys::Duration); + bool wait(qpid::sys::Duration); + bool pop(FrameSetPtr&, qpid::sys::Duration); + + void retrieve(FrameSetPtr, qpid::messaging::Message*); + +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h b/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h new file mode 100644 index 0000000000..d66d2ecb3c --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h @@ -0,0 +1,52 @@ +#ifndef QPID_CLIENT_AMQP0_10_MESSAGESINK_H +#define QPID_CLIENT_AMQP0_10_MESSAGESINK_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/client/AsyncSession.h" + +namespace qpid { + +namespace messaging { +class Message; +} + +namespace client { +namespace amqp0_10 { + +class OutgoingMessage; + +/** + * + */ +class MessageSink +{ + public: + virtual ~MessageSink() {} + virtual void declare(qpid::client::AsyncSession& session, const std::string& name) = 0; + virtual void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message) = 0; + virtual void cancel(qpid::client::AsyncSession& session, const std::string& name) = 0; + private: +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_MESSAGESINK_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h b/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h new file mode 100644 index 0000000000..74f2732f59 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h @@ -0,0 +1,47 @@ +#ifndef QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H +#define QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/client/AsyncSession.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +/** + * Abstraction behind which the AMQP 0-10 commands required to + * establish (and tear down) an incoming stream of messages from a + * given address are hidden. + */ +class MessageSource +{ + public: + virtual ~MessageSource() {} + virtual void subscribe(qpid::client::AsyncSession& session, const std::string& destination) = 0; + virtual void cancel(qpid::client::AsyncSession& session, const std::string& destination) = 0; + + private: +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp new file mode 100644 index 0000000000..f2b205a78a --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -0,0 +1,170 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/amqp0_10/OutgoingMessage.h" +#include "qpid/client/amqp0_10/AddressResolution.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/types/encodings.h" +#include "qpid/types/Variant.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/framing/enum.h" +#include "qpid/log/Statement.h" +#include <sstream> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using qpid::messaging::Address; +using qpid::messaging::MessageImplAccess; +using qpid::types::Variant; +using namespace qpid::framing::message; +using namespace qpid::amqp_0_10; + +namespace { +//TODO: unify conversion to and from 0-10 message that is currently +//split between IncomingMessages and OutgoingMessage +const std::string SUBJECT("qpid.subject"); +const std::string X_APP_ID("x-amqp-0-10.app-id"); +const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key"); +const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding"); +const std::string TEXT_PLAIN("text/plain"); +} + +void OutgoingMessage::convert(const qpid::messaging::Message& from) +{ + //TODO: need to avoid copying as much as possible + if (from.getContentObject().getType() == qpid::types::VAR_MAP) { + std::string content; + qpid::amqp_0_10::MapCodec::encode(from.getContentObject().asMap(), content); + message.getMessageProperties().setContentType(qpid::amqp_0_10::MapCodec::contentType); + message.setData(content); + } else if (from.getContentObject().getType() == qpid::types::VAR_LIST) { + std::string content; + qpid::amqp_0_10::ListCodec::encode(from.getContentObject().asList(), content); + message.getMessageProperties().setContentType(qpid::amqp_0_10::ListCodec::contentType); + message.setData(content); + } else if (from.getContentObject().getType() == qpid::types::VAR_STRING && + (from.getContentObject().getEncoding() == qpid::types::encodings::UTF8 || from.getContentObject().getEncoding() == qpid::types::encodings::ASCII)) { + message.getMessageProperties().setContentType(TEXT_PLAIN); + message.setData(from.getContent()); + } else { + message.setData(from.getContent()); + message.getMessageProperties().setContentType(from.getContentType()); + } + if ( !from.getCorrelationId().empty() ) + message.getMessageProperties().setCorrelationId(from.getCorrelationId()); + message.getMessageProperties().setUserId(from.getUserId()); + const Address& address = from.getReplyTo(); + if (address) { + message.getMessageProperties().setReplyTo(AddressResolution::convert(address)); + } + if (!subject.empty()) { + Variant v(subject); v.setEncoding("utf8"); + translate(from.getProperties(), SUBJECT, v, message.getMessageProperties().getApplicationHeaders()); + } else { + translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders()); + } + if (from.getTtl().getMilliseconds()) { + message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds()); + } + if (from.getDurable()) { + message.getDeliveryProperties().setDeliveryMode(DELIVERY_MODE_PERSISTENT); + } + if (from.getRedelivered()) { + message.getDeliveryProperties().setRedelivered(true); + } + if (from.getPriority()) message.getDeliveryProperties().setPriority(from.getPriority()); + + //allow certain 0-10 specific items to be set through special properties: + // message-id, app-id, content-encoding + if (from.getMessageId().size()) { + qpid::framing::Uuid uuid; + std::istringstream data(from.getMessageId()); + data >> uuid; + message.getMessageProperties().setMessageId(uuid); + } + Variant::Map::const_iterator i; + i = from.getProperties().find(X_APP_ID); + if (i != from.getProperties().end()) { + message.getMessageProperties().setAppId(i->second.asString()); + } + i = from.getProperties().find(X_CONTENT_ENCODING); + if (i != from.getProperties().end()) { + message.getMessageProperties().setContentEncoding(i->second.asString()); + } + base = qpid::sys::now(); +} + +void OutgoingMessage::setSubject(const std::string& s) +{ + subject = s; +} + +std::string OutgoingMessage::getSubject() const +{ + return subject; +} + +void OutgoingMessage::send(qpid::client::AsyncSession& session, const std::string& destination, const std::string& routingKey) +{ + if (!expired) { + message.getDeliveryProperties().setRoutingKey(routingKey); + status = session.messageTransfer(arg::destination=destination, arg::content=message); + if (destination.empty()) { + QPID_LOG(debug, "Sending to queue " << routingKey << " " << message.getMessageProperties() << " " << message.getDeliveryProperties()); + } else { + QPID_LOG(debug, "Sending to exchange " << destination << " " << message.getMessageProperties() << " " << message.getDeliveryProperties()); + } + } +} +void OutgoingMessage::send(qpid::client::AsyncSession& session, const std::string& routingKey) +{ + send(session, std::string(), routingKey); +} + +bool OutgoingMessage::isComplete() +{ + return expired || (status.isValid() && status.isComplete()); +} +void OutgoingMessage::markRedelivered() +{ + message.setRedelivered(true); + if (message.getDeliveryProperties().hasTtl()) { + uint64_t delta = qpid::sys::Duration(base, qpid::sys::now())/qpid::sys::TIME_MSEC; + uint64_t ttl = message.getDeliveryProperties().getTtl(); + if (ttl <= delta) { + QPID_LOG(debug, "Expiring outgoing message (" << ttl << " < " << delta << ")"); + expired = true; + message.getDeliveryProperties().setTtl(1); + } else { + QPID_LOG(debug, "Adjusting ttl on outgoing message from " << ttl << " by " << delta); + ttl = ttl - delta; + message.getDeliveryProperties().setTtl(ttl); + } + } +} +OutgoingMessage::OutgoingMessage() : expired (false) {} + + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h new file mode 100644 index 0000000000..a17ef03e10 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h @@ -0,0 +1,60 @@ +#ifndef QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H +#define QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/AsyncSession.h" +#include "qpid/client/Completion.h" +#include "qpid/client/Message.h" +#include "qpid/sys/Time.h" + +namespace qpid { +namespace messaging { +class Message; +} +namespace client { +namespace amqp0_10 { + +class OutgoingMessage +{ + private: + qpid::client::Message message; + qpid::client::Completion status; + std::string subject; + qpid::sys::AbsTime base; + bool expired; + + public: + OutgoingMessage(); + void convert(const qpid::messaging::Message&); + void setSubject(const std::string& subject); + std::string getSubject() const; + void send(qpid::client::AsyncSession& session, const std::string& destination, const std::string& routingKey); + void send(qpid::client::AsyncSession& session,const std::string& routingKey); + bool isComplete(); + void markRedelivered(); +}; + + + +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp new file mode 100644 index 0000000000..c356bc298b --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -0,0 +1,263 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ReceiverImpl.h" +#include "AddressResolution.h" +#include "MessageSource.h" +#include "SessionImpl.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Session.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/types/encodings.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using qpid::messaging::NoMessageAvailable; +using qpid::messaging::Receiver; +using qpid::messaging::Duration; + +void ReceiverImpl::received(qpid::messaging::Message&) +{ + //TODO: should this be configurable + sys::Mutex::ScopedLock l(lock); + if (capacity && --window <= capacity/2) { + session.sendCompletion(); + window = capacity; + } +} + +qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout) +{ + qpid::messaging::Message result; + if (!get(result, timeout)) throw NoMessageAvailable(); + return result; +} + +qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout) +{ + qpid::messaging::Message result; + if (!fetch(result, timeout)) throw NoMessageAvailable(); + return result; +} + +bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + Get f(*this, message, timeout); + while (!parent->execute(f)) {} + return f.result; +} + +bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + Fetch f(*this, message, timeout); + while (!parent->execute(f)) {} + return f.result; +} + +void ReceiverImpl::close() +{ + execute<Close>(); +} + +void ReceiverImpl::start() +{ + sys::Mutex::ScopedLock l(lock); + if (state == STOPPED) { + state = STARTED; + startFlow(l); + session.sendCompletion(); + } +} + +void ReceiverImpl::stop() +{ + sys::Mutex::ScopedLock l(lock); + state = STOPPED; + session.messageStop(destination); +} + +void ReceiverImpl::setCapacity(uint32_t c) +{ + execute1<SetCapacity>(c); +} + +void ReceiverImpl::startFlow(const sys::Mutex::ScopedLock&) +{ + if (capacity > 0) { + session.messageSetFlowMode(destination, FLOW_MODE_WINDOW); + session.messageFlow(destination, CREDIT_UNIT_MESSAGE, capacity); + session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit); + window = capacity; + } +} + +void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) +{ + sys::Mutex::ScopedLock l(lock); + session = s; + if (state == CANCELLED) return; + if (state == UNRESOLVED) { + source = resolver.resolveSource(session, address); + assert(source.get()); + state = STARTED; + } + source->subscribe(session, destination); + startFlow(l); +} + +const std::string& ReceiverImpl::getName() const { + return destination; +} + +uint32_t ReceiverImpl::getCapacity() +{ + sys::Mutex::ScopedLock l(lock); + return capacity; +} + +uint32_t ReceiverImpl::getAvailable() +{ + return parent->getReceivable(destination); +} + +uint32_t ReceiverImpl::getUnsettled() +{ + return parent->getUnsettledAcks(destination); +} + +qpid::messaging::Address ReceiverImpl::getAddress() const +{ + return address; +} + +ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, + const qpid::messaging::Address& a, bool autoDecode_) : + + parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), autoDecode(autoDecode_), + state(UNRESOLVED), capacity(0), window(0) {} + +namespace { +const std::string TEXT_PLAIN("text/plain"); +} + +bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + { + sys::Mutex::ScopedLock l(lock); + if (state == CANCELLED) return false; + } + if (parent->get(*this, message, timeout)) { + if (autoDecode) { + if (message.getContentType() == qpid::amqp_0_10::MapCodec::contentType) { + message.getContentObject() = qpid::types::Variant::Map(); + decode(message, message.getContentObject().asMap()); + } else if (message.getContentType() == qpid::amqp_0_10::ListCodec::contentType) { + message.getContentObject() = qpid::types::Variant::List(); + decode(message, message.getContentObject().asList()); + } else if (!message.getContentBytes().empty()) { + message.getContentObject() = message.getContentBytes(); + if (message.getContentType() == TEXT_PLAIN) { + message.getContentObject().setEncoding(qpid::types::encodings::UTF8); + } else { + message.getContentObject().setEncoding(qpid::types::encodings::BINARY); + } + } + } + return true; + } else { + return false; + } +} + +bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + { + sys::Mutex::ScopedLock l(lock); + if (state == CANCELLED) return false; + + if (capacity == 0 || state != STARTED) { + session.messageSetFlowMode(destination, FLOW_MODE_CREDIT); + session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1); + session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF); + } + } + if (getImpl(message, timeout)) { + return true; + } else { + qpid::client::Session s; + { + sys::Mutex::ScopedLock l(lock); + if (state == CANCELLED) return false; // Might have been closed during get. + s = sync(session); + } + s.messageFlush(destination); + { + sys::Mutex::ScopedLock l(lock); + startFlow(l); //reallocate credit + session.sendCompletion();//ensure previously received messages are signalled as completed + } + return getImpl(message, Duration::IMMEDIATE); + } +} + +void ReceiverImpl::closeImpl() +{ + sys::Mutex::ScopedLock l(lock); + if (state != CANCELLED) { + state = CANCELLED; + sync(session).messageStop(destination); + { + sys::Mutex::ScopedUnlock l(lock); + parent->releasePending(destination); + } + source->cancel(session, destination); + { + sys::Mutex::ScopedUnlock l(lock); + parent->receiverCancelled(destination); + } + } +} + +bool ReceiverImpl::isClosed() const { + sys::Mutex::ScopedLock l(lock); + return state == CANCELLED; +} + +void ReceiverImpl::setCapacityImpl(uint32_t c) +{ + sys::Mutex::ScopedLock l(lock); + if (c != capacity) { + capacity = c; + if (state == STARTED) { + session.messageStop(destination); + startFlow(l); + } + } +} + +qpid::messaging::Session ReceiverImpl::getSession() const +{ + return qpid::messaging::Session(parent.get()); +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h new file mode 100644 index 0000000000..0d3366907b --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -0,0 +1,152 @@ +#ifndef QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H +#define QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/ReceiverImpl.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/client/amqp0_10/SessionImpl.h" +#include "qpid/messaging/Duration.h" +#include "qpid/sys/Mutex.h" +#include <boost/intrusive_ptr.hpp> +#include <memory> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +class AddressResolution; +class MessageSource; + +/** + * A receiver implementation based on an AMQP 0-10 subscription. + */ +class ReceiverImpl : public qpid::messaging::ReceiverImpl +{ + public: + + enum State {UNRESOLVED, STOPPED, STARTED, CANCELLED}; + + ReceiverImpl(SessionImpl& parent, const std::string& name, + const qpid::messaging::Address& address, bool autoDecode); + + void init(qpid::client::AsyncSession session, AddressResolution& resolver); + bool get(qpid::messaging::Message& message, qpid::messaging::Duration timeout); + qpid::messaging::Message get(qpid::messaging::Duration timeout); + bool fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout); + qpid::messaging::Message fetch(qpid::messaging::Duration timeout); + void close(); + void start(); + void stop(); + const std::string& getName() const; + void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t getAvailable(); + uint32_t getUnsettled(); + void received(qpid::messaging::Message& message); + qpid::messaging::Session getSession() const; + bool isClosed() const; + qpid::messaging::Address getAddress() const; + + private: + mutable sys::Mutex lock; + boost::intrusive_ptr<SessionImpl> parent; + const std::string destination; + const qpid::messaging::Address address; + const uint32_t byteCredit; + const bool autoDecode; + State state; + + std::auto_ptr<MessageSource> source; + uint32_t capacity; + qpid::client::AsyncSession session; + uint32_t window; + + void startFlow(const sys::Mutex::ScopedLock&); // Dummy param, call with lock held + //implementation of public facing methods + bool fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout); + bool getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout); + void closeImpl(); + void setCapacityImpl(uint32_t); + + //functors for public facing methods. + struct Command + { + ReceiverImpl& impl; + + Command(ReceiverImpl& i) : impl(i) {} + }; + + struct Get : Command + { + qpid::messaging::Message& message; + qpid::messaging::Duration timeout; + bool result; + + Get(ReceiverImpl& i, qpid::messaging::Message& m, qpid::messaging::Duration t) : + Command(i), message(m), timeout(t), result(false) {} + void operator()() { result = impl.getImpl(message, timeout); } + }; + + struct Fetch : Command + { + qpid::messaging::Message& message; + qpid::messaging::Duration timeout; + bool result; + + Fetch(ReceiverImpl& i, qpid::messaging::Message& m, qpid::messaging::Duration t) : + Command(i), message(m), timeout(t), result(false) {} + void operator()() { result = impl.fetchImpl(message, timeout); } + }; + + struct Close : Command + { + Close(ReceiverImpl& i) : Command(i) {} + void operator()() { impl.closeImpl(); } + }; + + struct SetCapacity : Command + { + uint32_t capacity; + + SetCapacity(ReceiverImpl& i, uint32_t c) : Command(i), capacity(c) {} + void operator()() { impl.setCapacityImpl(capacity); } + }; + + //helper templates for some common patterns + template <class F> void execute() + { + F f(*this); + parent->execute(f); + } + + template <class F, class P> void execute1(P p) + { + F f(*this, p); + parent->execute(f); + } +}; + +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp new file mode 100644 index 0000000000..7575aaa306 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -0,0 +1,206 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SenderImpl.h" +#include "MessageSink.h" +#include "SessionImpl.h" +#include "AddressResolution.h" +#include "OutgoingMessage.h" +#include "qpid/messaging/Session.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, + const qpid::messaging::Address& _address, bool _autoReconnect) : + parent(&_parent), autoReconnect(_autoReconnect), name(_name), address(_address), state(UNRESOLVED), + capacity(50), window(0), flushed(false), unreliable(AddressResolution::is_unreliable(address)) {} + +qpid::messaging::Address SenderImpl::getAddress() const +{ + return address; +} + +void SenderImpl::send(const qpid::messaging::Message& message, bool sync) +{ + if (unreliable) { // immutable, don't need lock + UnreliableSend f(*this, message); + parent->execute(f); + } else { + Send f(*this, message); + while (f.repeat) parent->execute(f); + } + if (sync) parent->sync(true); +} + +void SenderImpl::close() +{ + execute<Close>(); +} + +void SenderImpl::setCapacity(uint32_t c) +{ + bool flush; + { + sys::Mutex::ScopedLock l(lock); + flush = c < capacity; + capacity = c; + } + execute1<CheckPendingSends>(flush); +} + +uint32_t SenderImpl::getCapacity() { + sys::Mutex::ScopedLock l(lock); + return capacity; +} + +uint32_t SenderImpl::getUnsettled() +{ + CheckPendingSends f(*this, false); + parent->execute(f); + return f.pending; +} + +void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) +{ + sys::Mutex::ScopedLock l(lock); + session = s; + if (state == UNRESOLVED) { + sink = resolver.resolveSink(session, address); + state = ACTIVE; + } + if (state == CANCELLED) { + sink->cancel(session, name); + sys::Mutex::ScopedUnlock u(lock); + parent->senderCancelled(name); + } else { + sink->declare(session, name); + replay(l); + } +} + +void SenderImpl::waitForCapacity() +{ + sys::Mutex::ScopedLock l(lock); + try { + //TODO: add option to throw exception rather than blocking? + if (!unreliable && capacity <= + (flushed ? checkPendingSends(false, l) : outgoing.size())) + { + //Initial implementation is very basic. As outgoing is + //currently only reduced on receiving completions and we are + //blocking anyway we may as well sync(). If successful that + //should clear all outstanding sends. + session.sync(); + checkPendingSends(false, l); + } + //flush periodically and check for conmpleted sends + if (++window > (capacity / 4)) {//TODO: make this configurable? + checkPendingSends(true, l); + window = 0; + } + } catch (const qpid::TransportFailure&) { + //Disconnection prevents flushing or syncing. If we have any + //capacity we will return anyway (the subsequent attempt to + //send will fail, but message will be on replay buffer). + if (capacity > outgoing.size()) return; + //If we are out of capacity, but autoreconnect is on, then + //rethrow the transport failure to trigger reconnect which + //will have the effect of blocking until connected and + //capacity is freed up + if (autoReconnect) throw; + //Otherwise, in order to clearly signal to the application + //that the message was not pushed to replay buffer, throw an + //out of capacity error + throw qpid::messaging::OutOfCapacity(name); + } +} + +void SenderImpl::sendImpl(const qpid::messaging::Message& m) +{ + sys::Mutex::ScopedLock l(lock); + std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage()); + msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject()); + msg->convert(m); + outgoing.push_back(msg.release()); + sink->send(session, name, outgoing.back()); +} + +void SenderImpl::sendUnreliable(const qpid::messaging::Message& m) +{ + sys::Mutex::ScopedLock l(lock); + OutgoingMessage msg; + msg.setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject()); + msg.convert(m); + sink->send(session, name, msg); +} + +void SenderImpl::replay(const sys::Mutex::ScopedLock& l) +{ + checkPendingSends(false, l); + for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { + i->markRedelivered(); + sink->send(session, name, *i); + } +} + +uint32_t SenderImpl::checkPendingSends(bool flush) { + sys::Mutex::ScopedLock l(lock); + return checkPendingSends(flush, l); +} + +uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&) +{ + if (flush) { + session.flush(); + flushed = true; + } else { + flushed = false; + } + while (!outgoing.empty() && outgoing.front().isComplete()) { + outgoing.pop_front(); + } + return outgoing.size(); +} + +void SenderImpl::closeImpl() +{ + { + sys::Mutex::ScopedLock l(lock); + state = CANCELLED; + sink->cancel(session, name); + } + parent->senderCancelled(name); +} + +const std::string& SenderImpl::getName() const +{ + sys::Mutex::ScopedLock l(lock); + return name; +} + +qpid::messaging::Session SenderImpl::getSession() const +{ + sys::Mutex::ScopedLock l(lock); + return qpid::messaging::Session(parent.get()); +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h new file mode 100644 index 0000000000..35ce82cf5d --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -0,0 +1,162 @@ +#ifndef QPID_CLIENT_AMQP0_10_SENDERIMPL_H +#define QPID_CLIENT_AMQP0_10_SENDERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/SenderImpl.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/client/amqp0_10/SessionImpl.h" +#include <memory> +#include <boost/intrusive_ptr.hpp> +#include <boost/ptr_container/ptr_deque.hpp> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +class AddressResolution; +class MessageSink; +class OutgoingMessage; + +/** + * + */ +class SenderImpl : public qpid::messaging::SenderImpl +{ + public: + enum State {UNRESOLVED, ACTIVE, CANCELLED}; + + SenderImpl(SessionImpl& parent, const std::string& name, + const qpid::messaging::Address& address, bool autoReconnect); + void send(const qpid::messaging::Message&, bool sync); + void close(); + void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t getUnsettled(); + void init(qpid::client::AsyncSession, AddressResolution&); + const std::string& getName() const; + qpid::messaging::Session getSession() const; + qpid::messaging::Address getAddress() const; + + private: + mutable sys::Mutex lock; + boost::intrusive_ptr<SessionImpl> parent; + const bool autoReconnect; + const std::string name; + const qpid::messaging::Address address; + State state; + std::auto_ptr<MessageSink> sink; + + qpid::client::AsyncSession session; + std::string destination; + std::string routingKey; + + typedef boost::ptr_deque<OutgoingMessage> OutgoingMessages; + OutgoingMessages outgoing; + uint32_t capacity; + uint32_t window; + bool flushed; + const bool unreliable; + + uint32_t checkPendingSends(bool flush); + // Dummy ScopedLock parameter means call with lock held + uint32_t checkPendingSends(bool flush, const sys::Mutex::ScopedLock&); + void replay(const sys::Mutex::ScopedLock&); + void waitForCapacity(); + + //logic for application visible methods: + void sendImpl(const qpid::messaging::Message&); + void sendUnreliable(const qpid::messaging::Message&); + void closeImpl(); + + + //functors for application visible methods (allowing locking and + //retry to be centralised): + struct Command + { + SenderImpl& impl; + + Command(SenderImpl& i) : impl(i) {} + }; + + struct Send : Command + { + const qpid::messaging::Message& message; + bool repeat; + + Send(SenderImpl& i, const qpid::messaging::Message& m) : Command(i), message(m), repeat(true) {} + void operator()() + { + impl.waitForCapacity(); + //from this point message will be recorded if there is any + //failure (and replayed) so need not repeat the call + repeat = false; + impl.sendImpl(message); + } + }; + + struct UnreliableSend : Command + { + const qpid::messaging::Message& message; + + UnreliableSend(SenderImpl& i, const qpid::messaging::Message& m) : Command(i), message(m) {} + void operator()() + { + //TODO: ideally want to put messages on the outbound + //queue and pull them off in io thread, but the old + //0-10 client doesn't support that option so for now + //we simply don't queue unreliable messages + impl.sendUnreliable(message); + } + }; + + struct Close : Command + { + Close(SenderImpl& i) : Command(i) {} + void operator()() { impl.closeImpl(); } + }; + + struct CheckPendingSends : Command + { + bool flush; + uint32_t pending; + CheckPendingSends(SenderImpl& i, bool f) : Command(i), flush(f), pending(0) {} + void operator()() { pending = impl.checkPendingSends(flush); } + }; + + //helper templates for some common patterns + template <class F> void execute() + { + F f(*this); + parent->execute(f); + } + + template <class F, class P> bool execute1(P p) + { + F f(*this, p); + return parent->execute(f); + } +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_SENDERIMPL_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp new file mode 100644 index 0000000000..1e2b68b24e --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -0,0 +1,606 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/amqp0_10/SessionImpl.h" +#include "qpid/client/amqp0_10/ConnectionImpl.h" +#include "qpid/client/amqp0_10/ReceiverImpl.h" +#include "qpid/client/amqp0_10/SenderImpl.h" +#include "qpid/client/amqp0_10/MessageSource.h" +#include "qpid/client/amqp0_10/MessageSink.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/messaging/PrivateImplRef.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Session.h" +#include "qpid/framing/enum.h" +#include <boost/format.hpp> +#include <boost/function.hpp> +#include <boost/intrusive_ptr.hpp> + +using qpid::messaging::KeyError; +using qpid::messaging::NoMessageAvailable; +using qpid::messaging::MessagingException; +using qpid::messaging::TransactionError; +using qpid::messaging::TransactionAborted; +using qpid::messaging::TransactionUnknown; +using qpid::messaging::SessionError; +using qpid::messaging::MessageImplAccess; +using qpid::messaging::Sender; +using qpid::messaging::Receiver; + +namespace qpid { +namespace client { +namespace amqp0_10 { + +typedef qpid::sys::Mutex::ScopedLock ScopedLock; +typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock; + +SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : + connection(&c), transactional(t), committing(false) {} + +bool SessionImpl::isTransactional() const +{ + return transactional; +} + +void SessionImpl::checkError() +{ + ScopedLock l(lock); + txError.raise(); + qpid::client::SessionBase_0_10Access s(session); + try { + s.get()->assertOpen(); + } catch (const qpid::TransportFailure&) { + throw qpid::messaging::TransportFailure(std::string()); + } catch (const qpid::framing::ResourceLimitExceededException& e) { + throw qpid::messaging::TargetCapacityExceeded(e.what()); + } catch (const qpid::framing::UnauthorizedAccessException& e) { + throw qpid::messaging::UnauthorizedAccess(e.what()); + } catch (const qpid::framing::NotFoundException& e) { + throw qpid::messaging::NotFound(e.what()); + } catch (const qpid::framing::ResourceDeletedException& e) { + throw qpid::messaging::NotFound(e.what()); + } catch (const qpid::SessionException& e) { + throw qpid::messaging::SessionError(e.what()); + } catch (const qpid::ConnectionException& e) { + throw qpid::messaging::ConnectionError(e.what()); + } catch (const qpid::Exception& e) { + throw qpid::messaging::MessagingException(e.what()); + } +} + +bool SessionImpl::hasError() +{ + ScopedLock l(lock); + qpid::client::SessionBase_0_10Access s(session); + return s.get()->hasError(); +} + +void SessionImpl::sync(bool block) +{ + if (block) retry<Sync>(); + else execute<NonBlockingSync>(); +} + +namespace { +struct ScopedSet { + bool& flag; + ScopedSet(bool& f) : flag(f) { flag = true; } + ~ScopedSet() { flag = false; } +}; +} + +void SessionImpl::commit() +{ + try { + checkError(); + ScopedSet s(committing); + execute<Commit>(); + } + catch (const TransactionError&) { + assert(txError); // Must be set by thrower of TransactionError + } + catch (const std::exception& e) { + txError = new TransactionAborted(Msg() << "Transaction aborted: " << e.what()); + } + checkError(); +} + +void SessionImpl::rollback() +{ + //If the session fails during this operation, the transaction will + //be rolled back anyway. + execute<Rollback>(); +} + +void SessionImpl::acknowledge(bool sync_) +{ + //Should probably throw an exception on failure here, or indicate + //it through a return type at least. Failure means that the + //message may be redelivered; i.e. the application cannot delete + //any state necessary for preventing reprocessing of the message + execute<Acknowledge>(); + sync(sync_); +} + +void SessionImpl::reject(qpid::messaging::Message& m) +{ + //Possibly want to somehow indicate failure here as well. Less + //clear need as compared to acknowledge however. + execute1<Reject>(m); +} + +void SessionImpl::release(qpid::messaging::Message& m) +{ + execute1<Release>(m); +} + +void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative) +{ + //Should probably throw an exception on failure here, or indicate + //it through a return type at least. Failure means that the + //message may be redelivered; i.e. the application cannot delete + //any state necessary for preventing reprocessing of the message + Acknowledge2 ack(*this, m, cumulative); + execute(ack); +} + +void SessionImpl::close() +{ + if (hasError()) { + ScopedLock l(lock); + senders.clear(); + receivers.clear(); + } else { + Senders sCopy; + Receivers rCopy; + { + ScopedLock l(lock); + senders.swap(sCopy); + receivers.swap(rCopy); + } + for (Senders::iterator i = sCopy.begin(); i != sCopy.end(); ++i) + { + // outside the lock, will call senderCancelled + i->second.close(); + } + for (Receivers::iterator i = rCopy.begin(); i != rCopy.end(); ++i) + { + // outside the lock, will call receiverCancelled + i->second.close(); + } + } + connection->closed(*this); + if (!hasError()) { + ScopedLock l(lock); + session.close(); + } +} + +template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t) +{ + return boost::dynamic_pointer_cast<S>(qpid::messaging::PrivateImplRef<T>::get(t)); +} + +template <class T> void getFreeKey(std::string& key, T& map) +{ + std::string name = key; + int count = 1; + for (typename T::const_iterator i = map.find(name); i != map.end(); i = map.find(name)) { + name = (boost::format("%1%_%2%") % key % ++count).str(); + } + key = name; +} + +void SessionImpl::setSession(qpid::client::Session s) +{ + session = s; + incoming.setSession(session); + if (transactional) { + session.txSelect(); + } + for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { + getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver); + } + for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { + getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver); + } + session.sync(); +} + +struct SessionImpl::CreateReceiver : Command +{ + qpid::messaging::Receiver result; + const qpid::messaging::Address& address; + + CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) : + Command(i), address(a) {} + void operator()() { result = impl.createReceiverImpl(address); } +}; + +Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address) +{ + return get1<CreateReceiver, Receiver>(address); +} + +Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address) +{ + ScopedLock l(lock); + std::string name = address.getName(); + getFreeKey(name, receivers); + Receiver receiver(new ReceiverImpl(*this, name, address, connection->getAutoDecode())); + getImplPtr<Receiver, ReceiverImpl>(receiver)->init(session, resolver); + receivers[name] = receiver; + return receiver; +} + +struct SessionImpl::CreateSender : Command +{ + qpid::messaging::Sender result; + const qpid::messaging::Address& address; + + CreateSender(SessionImpl& i, const qpid::messaging::Address& a) : + Command(i), address(a) {} + void operator()() { result = impl.createSenderImpl(address); } +}; + +Sender SessionImpl::createSender(const qpid::messaging::Address& address) +{ + return get1<CreateSender, Sender>(address); +} + +Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address) +{ + ScopedLock l(lock); + std::string name = address.getName(); + getFreeKey(name, senders); + Sender sender(new SenderImpl(*this, name, address, connection->getAutoReconnect())); + getImplPtr<Sender, SenderImpl>(sender)->init(session, resolver); + senders[name] = sender; + return sender; +} + +Sender SessionImpl::getSender(const std::string& name) const +{ + qpid::sys::Mutex::ScopedLock l(lock); + Senders::const_iterator i = senders.find(name); + if (i == senders.end()) { + throw KeyError(name); + } else { + return i->second; + } +} + +Receiver SessionImpl::getReceiver(const std::string& name) const +{ + qpid::sys::Mutex::ScopedLock l(lock); + Receivers::const_iterator i = receivers.find(name); + if (i == receivers.end()) { + throw KeyError(name); + } else { + return i->second; + } +} + +SessionImpl& SessionImpl::convert(qpid::messaging::Session& s) +{ + boost::intrusive_ptr<SessionImpl> impl = getImplPtr<qpid::messaging::Session, SessionImpl>(s); + if (!impl) { + throw SessionError(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl")); + } + return *impl; +} + +namespace { + +struct IncomingMessageHandler : IncomingMessages::Handler +{ + typedef boost::function1<bool, IncomingMessages::MessageTransfer&> Callback; + Callback callback; + ReceiverImpl* receiver; + + IncomingMessageHandler(Callback c) : callback(c), receiver(0) {} + + bool accept(IncomingMessages::MessageTransfer& transfer) + { + return callback(transfer); + } + + bool isClosed() + { + return receiver && receiver->isClosed(); + } +}; + +} + + +bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageTransfer& transfer) +{ + ScopedLock l(lock); + Receivers::const_iterator i = receivers.find(transfer.getDestination()); + if (i == receivers.end()) { + QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination()); + return false; + } else { + *receiver = i->second; + return true; + } +} + +bool SessionImpl::accept(ReceiverImpl* receiver, + qpid::messaging::Message* message, + IncomingMessages::MessageTransfer& transfer) +{ + if (receiver->getName() == transfer.getDestination()) { + transfer.retrieve(message); + receiver->received(*message); + return true; + } else { + return false; + } +} + +qpid::sys::Duration adjust(qpid::messaging::Duration timeout) +{ + uint64_t ms = timeout.getMilliseconds(); + if (ms < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) { + return ms * qpid::sys::TIME_MSEC; + } else { + return qpid::sys::TIME_INFINITE; + } +} + +bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout) +{ + return incoming.get(handler, adjust(timeout)); +} + +bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, _1)); + handler.receiver = &receiver; + return getIncoming(handler, timeout); +} + +bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout) +{ + while (true) { + txError.raise(); + try { + std::string destination; + if (incoming.getNextDestination(destination, adjust(timeout))) { + qpid::sys::Mutex::ScopedLock l(lock); + Receivers::const_iterator i = receivers.find(destination); + if (i == receivers.end()) { + throw qpid::messaging::ReceiverError(QPID_MSG("Received message for unknown destination " << destination)); + } else { + receiver = i->second; + } + return true; + } else { + return false; + } + } catch (TransportFailure&) { + reconnect(); + } catch (const qpid::framing::ResourceLimitExceededException& e) { + if (backoff()) return false; + else throw qpid::messaging::TargetCapacityExceeded(e.what()); + } catch (const qpid::SessionException& e) { + rethrow(e); + } catch (const qpid::ClosedException&) { + throw qpid::messaging::SessionClosed(); + } catch (const qpid::ConnectionException& e) { + throw qpid::messaging::ConnectionError(e.what()); + } catch (const qpid::ChannelException& e) { + throw qpid::messaging::MessagingException(e.what()); + } + } +} + +qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::messaging::Duration timeout) +{ + qpid::messaging::Receiver receiver; + if (!nextReceiver(receiver, timeout)) throw NoMessageAvailable(); + if (!receiver) throw SessionError("Bad receiver returned!"); + return receiver; +} + +uint32_t SessionImpl::getReceivable() +{ + return get1<Receivable, uint32_t>((const std::string*) 0); +} +uint32_t SessionImpl::getReceivable(const std::string& destination) +{ + return get1<Receivable, uint32_t>(&destination); +} + +struct SessionImpl::Receivable : Command +{ + const std::string* destination; + uint32_t result; + + Receivable(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} + void operator()() { result = impl.getReceivableImpl(destination); } +}; + +uint32_t SessionImpl::getReceivableImpl(const std::string* destination) +{ + ScopedLock l(lock); + if (destination) { + return incoming.available(*destination); + } else { + return incoming.available(); + } +} + +uint32_t SessionImpl::getUnsettledAcks() +{ + return get1<UnsettledAcks, uint32_t>((const std::string*) 0); +} + +uint32_t SessionImpl::getUnsettledAcks(const std::string& destination) +{ + return get1<UnsettledAcks, uint32_t>(&destination); +} + +struct SessionImpl::UnsettledAcks : Command +{ + const std::string* destination; + uint32_t result; + + UnsettledAcks(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} + void operator()() { result = impl.getUnsettledAcksImpl(destination); } +}; + +uint32_t SessionImpl::getUnsettledAcksImpl(const std::string* destination) +{ + ScopedLock l(lock); + if (destination) { + return incoming.pendingAccept(*destination); + } else { + return incoming.pendingAccept(); + } +} + +void SessionImpl::syncImpl(bool block) +{ + { + ScopedLock l(lock); + if (block) session.sync(); + else session.flush(); + } + //cleanup unconfirmed accept records: + incoming.pendingAccept(); +} + +void SessionImpl::commitImpl() +{ + ScopedLock l(lock); + incoming.accept(); + session.txCommit(); +} + +void SessionImpl::rollbackImpl() +{ + ScopedLock l(lock); + for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { + getImplPtr<Receiver, ReceiverImpl>(i->second)->stop(); + } + //ensure that stop has been processed and all previously sent + //messages are available for release: + session.sync(); + incoming.releaseAll(); + session.txRollback(); + + for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { + getImplPtr<Receiver, ReceiverImpl>(i->second)->start(); + } +} + +void SessionImpl::acknowledgeImpl() +{ + if (!transactional) incoming.accept(); +} + +void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative) +{ + if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative); +} + +void SessionImpl::rejectImpl(qpid::messaging::Message& m) +{ + SequenceSet set; + set.add(MessageImplAccess::get(m).getInternalId()); + session.messageReject(set); +} + +void SessionImpl::releaseImpl(qpid::messaging::Message& m) +{ + SequenceSet set; + set.add(MessageImplAccess::get(m).getInternalId()); + session.messageRelease(set, true); +} + +void SessionImpl::receiverCancelled(const std::string& name) +{ + { + ScopedLock l(lock); + receivers.erase(name); + session.sync(); + incoming.releasePending(name); + } + incoming.wakeup(); +} + +void SessionImpl::releasePending(const std::string& name) +{ + ScopedLock l(lock); + incoming.releasePending(name); +} + +void SessionImpl::senderCancelled(const std::string& name) +{ + ScopedLock l(lock); + senders.erase(name); +} + +void SessionImpl::reconnect() +{ + if (transactional) { + if (committing) + txError = new TransactionUnknown("Transaction outcome unknown: transport failure"); + else + txError = new TransactionAborted("Transaction aborted: transport failure"); + txError.raise(); + } + connection->reopen(); +} + +bool SessionImpl::backoff() +{ + return connection->backoff(); +} + +qpid::messaging::Connection SessionImpl::getConnection() const +{ + return qpid::messaging::Connection(connection.get()); +} + +void SessionImpl::rethrow(const qpid::SessionException& e) { + switch (e.code) { + case framing::execution::ERROR_CODE_NOT_ALLOWED: + case framing::execution::ERROR_CODE_UNAUTHORIZED_ACCESS: throw messaging::UnauthorizedAccess(e.what()); + + case framing::execution::ERROR_CODE_NOT_FOUND: + case framing::execution::ERROR_CODE_RESOURCE_DELETED: throw messaging::NotFound(e.what()); + + default: throw SessionError(e.what()); + } +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h new file mode 100644 index 0000000000..2bb72aa877 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -0,0 +1,259 @@ +#ifndef QPID_CLIENT_AMQP0_10_SESSIONIMPL_H +#define QPID_CLIENT_AMQP0_10_SESSIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/SessionImpl.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/client/Session.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/amqp0_10/AddressResolution.h" +#include "qpid/client/amqp0_10/IncomingMessages.h" +#include "qpid/sys/Mutex.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/sys/ExceptionHolder.h" +#include <boost/intrusive_ptr.hpp> + +namespace qpid { + +namespace messaging { +class Address; +class Connection; +class Message; +class Receiver; +class Sender; +class Session; +} + +namespace client { +namespace amqp0_10 { + +class ConnectionImpl; +class ReceiverImpl; +class SenderImpl; + +/** + * Implementation of the protocol independent Session interface using + * AMQP 0-10. + */ +class SessionImpl : public qpid::messaging::SessionImpl +{ + public: + SessionImpl(ConnectionImpl&, bool transactional); + void commit(); + void rollback(); + void acknowledge(bool sync); + void reject(qpid::messaging::Message&); + void release(qpid::messaging::Message&); + void acknowledge(qpid::messaging::Message& msg, bool cumulative); + void close(); + void sync(bool block); + qpid::messaging::Sender createSender(const qpid::messaging::Address& address); + qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address); + + qpid::messaging::Sender getSender(const std::string& name) const; + qpid::messaging::Receiver getReceiver(const std::string& name) const; + + bool nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout); + qpid::messaging::Receiver nextReceiver(qpid::messaging::Duration timeout); + + qpid::messaging::Connection getConnection() const; + void checkError(); + bool hasError(); + bool isTransactional() const; + + bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + + void releasePending(const std::string& destination); + void receiverCancelled(const std::string& name); + void senderCancelled(const std::string& name); + + uint32_t getReceivable(); + uint32_t getReceivable(const std::string& destination); + + uint32_t getUnsettledAcks(); + uint32_t getUnsettledAcks(const std::string& destination); + + void setSession(qpid::client::Session); + + template <class T> bool execute(T& f) + { + try { + txError.raise(); + f(); + return true; + } catch (const qpid::TransportFailure&) { + reconnect(); + return false; + } catch (const qpid::framing::ResourceLimitExceededException& e) { + if (backoff()) return false; + else throw qpid::messaging::TargetCapacityExceeded(e.what()); + } catch (const qpid::framing::UnauthorizedAccessException& e) { + throw qpid::messaging::UnauthorizedAccess(e.what()); + } catch (const qpid::framing::NotFoundException& e) { + throw qpid::messaging::NotFound(e.what()); + } catch (const qpid::framing::ResourceDeletedException& e) { + throw qpid::messaging::NotFound(e.what()); + } catch (const qpid::SessionException& e) { + rethrow(e); + return false; // Keep the compiler happy + } catch (const qpid::ConnectionException& e) { + throw qpid::messaging::ConnectionError(e.what()); + } catch (const qpid::ChannelException& e) { + throw qpid::messaging::MessagingException(e.what()); + } + } + + static SessionImpl& convert(qpid::messaging::Session&); + static void rethrow(const qpid::SessionException&); + + private: + typedef std::map<std::string, qpid::messaging::Receiver> Receivers; + typedef std::map<std::string, qpid::messaging::Sender> Senders; + + mutable qpid::sys::Mutex lock; + boost::intrusive_ptr<ConnectionImpl> connection; + qpid::client::Session session; + AddressResolution resolver; + IncomingMessages incoming; + Receivers receivers; + Senders senders; + const bool transactional; + bool committing; + sys::ExceptionHolder txError; + + bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&); + bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout); + bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer); + void reconnect(); + bool backoff(); + + void commitImpl(); + void rollbackImpl(); + void acknowledgeImpl(); + void acknowledgeImpl(qpid::messaging::Message&, bool cumulative); + void rejectImpl(qpid::messaging::Message&); + void releaseImpl(qpid::messaging::Message&); + void closeImpl(); + void syncImpl(bool block); + qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address); + qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address); + uint32_t getReceivableImpl(const std::string* destination); + uint32_t getUnsettledAcksImpl(const std::string* destination); + + //functors for public facing methods (allows locking and retry + //logic to be centralised) + struct Command + { + SessionImpl& impl; + + Command(SessionImpl& i) : impl(i) {} + }; + + struct Commit : Command + { + Commit(SessionImpl& i) : Command(i) {} + void operator()() { impl.commitImpl(); } + }; + + struct Rollback : Command + { + Rollback(SessionImpl& i) : Command(i) {} + void operator()() { impl.rollbackImpl(); } + }; + + struct Acknowledge : Command + { + Acknowledge(SessionImpl& i) : Command(i) {} + void operator()() { impl.acknowledgeImpl(); } + }; + + struct Sync : Command + { + Sync(SessionImpl& i) : Command(i) {} + void operator()() { impl.syncImpl(true); } + }; + + struct NonBlockingSync : Command + { + NonBlockingSync(SessionImpl& i) : Command(i) {} + void operator()() { impl.syncImpl(false); } + }; + + struct Reject : Command + { + qpid::messaging::Message& message; + + Reject(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} + void operator()() { impl.rejectImpl(message); } + }; + + struct Release : Command + { + qpid::messaging::Message& message; + + Release(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} + void operator()() { impl.releaseImpl(message); } + }; + + struct Acknowledge2 : Command + { + qpid::messaging::Message& message; + bool cumulative; + + Acknowledge2(SessionImpl& i, qpid::messaging::Message& m, bool c) : Command(i), message(m), cumulative(c) {} + void operator()() { impl.acknowledgeImpl(message, cumulative); } + }; + + struct CreateSender; + struct CreateReceiver; + struct UnsettledAcks; + struct Receivable; + + //helper templates for some common patterns + template <class F> bool execute() + { + F f(*this); + return execute(f); + } + + template <class F> void retry() + { + while (!execute<F>()) {} + } + + template <class F, class P> bool execute1(P p) + { + F f(*this, p); + return execute(f); + } + + template <class F, class R, class P> R get1(P p) + { + F f(*this, p); + while (!execute(f)) {} + return f.result; + } +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_SESSIONIMPL_H*/ |