summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/amqp0_10
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp153
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h88
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp1058
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h64
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp404
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h88
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp466
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h108
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h52
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h47
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp170
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h60
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp263
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h152
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp206
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h162
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp606
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h259
18 files changed, 4406 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
new file mode 100644
index 0000000000..d2accddcd0
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
@@ -0,0 +1,153 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "AcceptTracker.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+void AcceptTracker::State::accept()
+{
+ unconfirmed.add(unaccepted);
+ unaccepted.clear();
+}
+
+SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative)
+{
+ SequenceSet accepting;
+ if (cumulative) {
+ for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) {
+ accepting.add(*i);
+ }
+ unconfirmed.add(accepting);
+ unaccepted.remove(accepting);
+ } else {
+ if (unaccepted.contains(id)) {
+ unaccepted.remove(id);
+ unconfirmed.add(id);
+ accepting.add(id);
+ }
+ }
+ return accepting;
+}
+
+void AcceptTracker::State::release()
+{
+ unaccepted.clear();
+}
+
+uint32_t AcceptTracker::State::acceptsPending()
+{
+ return unconfirmed.size();
+}
+
+void AcceptTracker::State::completed(qpid::framing::SequenceSet& set)
+{
+ unconfirmed.remove(set);
+}
+
+void AcceptTracker::delivered(const std::string& destination, const qpid::framing::SequenceNumber& id)
+{
+ aggregateState.unaccepted.add(id);
+ destinationState[destination].unaccepted.add(id);
+}
+
+namespace
+{
+const size_t FLUSH_FREQUENCY = 1024;
+}
+
+void AcceptTracker::addToPending(qpid::client::AsyncSession& session, const Record& record)
+{
+ pending.push_back(record);
+ if (pending.size() % FLUSH_FREQUENCY == 0) session.flush();
+}
+
+
+void AcceptTracker::accept(qpid::client::AsyncSession& session)
+{
+ for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
+ i->second.accept();
+ }
+ Record record;
+ record.status = session.messageAccept(aggregateState.unaccepted);
+ record.accepted = aggregateState.unaccepted;
+ addToPending(session, record);
+ aggregateState.accept();
+}
+
+void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative)
+{
+ for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
+ i->second.accept(id, cumulative);
+ }
+ Record record;
+ record.accepted = aggregateState.accept(id, cumulative);
+ record.status = session.messageAccept(record.accepted);
+ addToPending(session, record);
+}
+
+void AcceptTracker::release(qpid::client::AsyncSession& session)
+{
+ session.messageRelease(aggregateState.unaccepted);
+ for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
+ i->second.release();
+ }
+ aggregateState.release();
+}
+
+uint32_t AcceptTracker::acceptsPending()
+{
+ checkPending();
+ return aggregateState.acceptsPending();
+}
+
+uint32_t AcceptTracker::acceptsPending(const std::string& destination)
+{
+ checkPending();
+ return destinationState[destination].acceptsPending();
+}
+
+void AcceptTracker::reset()
+{
+ destinationState.clear();
+ aggregateState.unaccepted.clear();
+ aggregateState.unconfirmed.clear();
+ pending.clear();
+}
+
+void AcceptTracker::checkPending()
+{
+ while (!pending.empty() && pending.front().status.isComplete()) {
+ completed(pending.front().accepted);
+ pending.pop_front();
+ }
+}
+
+void AcceptTracker::completed(qpid::framing::SequenceSet& set)
+{
+ for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
+ i->second.completed(set);
+ }
+ aggregateState.completed(set);
+}
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
new file mode 100644
index 0000000000..85209c3b87
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
@@ -0,0 +1,88 @@
+#ifndef QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H
+#define QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/Completion.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/SequenceSet.h"
+#include <deque>
+#include <map>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Tracks the set of messages requiring acceptance, and those for
+ * which an accept has been issued but is yet to be confirmed
+ * complete.
+ */
+class AcceptTracker
+{
+ public:
+ void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id);
+ void accept(qpid::client::AsyncSession&);
+ void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&, bool cumulative);
+ void release(qpid::client::AsyncSession&);
+ uint32_t acceptsPending();
+ uint32_t acceptsPending(const std::string& destination);
+ void reset();
+ private:
+ struct State
+ {
+ /**
+ * ids of messages that have been delivered but not yet
+ * accepted
+ */
+ qpid::framing::SequenceSet unaccepted;
+ /**
+ * ids of messages for which an accept has been issued but not
+ * yet confirmed as completed
+ */
+ qpid::framing::SequenceSet unconfirmed;
+
+ void accept();
+ qpid::framing::SequenceSet accept(qpid::framing::SequenceNumber, bool cumulative);
+ void release();
+ uint32_t acceptsPending();
+ void completed(qpid::framing::SequenceSet&);
+ };
+ typedef std::map<std::string, State> StateMap;
+ struct Record
+ {
+ qpid::client::Completion status;
+ qpid::framing::SequenceSet accepted;
+ };
+ typedef std::deque<Record> Records;
+
+ State aggregateState;
+ StateMap destinationState;
+ Records pending;
+
+ void addToPending(qpid::client::AsyncSession&, const Record&);
+ void checkPending();
+ void completed(qpid::framing::SequenceSet&);
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H*/
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
new file mode 100644
index 0000000000..ed931c90fb
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -0,0 +1,1058 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/AddressResolution.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/client/amqp0_10/MessageSource.h"
+#include "qpid/client/amqp0_10/MessageSink.h"
+#include "qpid/client/amqp0_10/OutgoingMessage.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/AddressImpl.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/types/Variant.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/enum.h"
+#include "qpid/framing/ExchangeBoundResult.h"
+#include "qpid/framing/ExchangeQueryResult.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/QueueQueryResult.h"
+#include "qpid/framing/ReplyTo.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/Uuid.h"
+#include <boost/assign.hpp>
+#include <boost/format.hpp>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::Exception;
+using qpid::messaging::Address;
+using qpid::messaging::AddressError;
+using qpid::messaging::MalformedAddress;
+using qpid::messaging::ResolutionError;
+using qpid::messaging::NotFound;
+using qpid::messaging::AssertionFailed;
+using qpid::framing::ExchangeBoundResult;
+using qpid::framing::ExchangeQueryResult;
+using qpid::framing::FieldTable;
+using qpid::framing::FieldValue;
+using qpid::framing::QueueQueryResult;
+using qpid::framing::ReplyTo;
+using qpid::framing::Uuid;
+using namespace qpid::types;
+using namespace qpid::framing::message;
+using namespace qpid::amqp_0_10;
+using namespace boost::assign;
+
+class Verifier
+{
+ public:
+ Verifier();
+ void verify(const Address& address) const;
+ private:
+ Variant::Map defined;
+ void verify(const Variant::Map& allowed, const Variant::Map& actual) const;
+};
+
+namespace{
+const Variant EMPTY_VARIANT;
+const FieldTable EMPTY_FIELD_TABLE;
+const Variant::List EMPTY_LIST;
+const std::string EMPTY_STRING;
+
+//policy types
+const std::string CREATE("create");
+const std::string ASSERT("assert");
+const std::string DELETE("delete");
+
+//option names
+const std::string NODE("node");
+const std::string LINK("link");
+const std::string MODE("mode");
+const std::string RELIABILITY("reliability");
+const std::string TIMEOUT("timeout");
+const std::string NAME("name");
+const std::string DURABLE("durable");
+const std::string X_DECLARE("x-declare");
+const std::string X_SUBSCRIBE("x-subscribe");
+const std::string X_BINDINGS("x-bindings");
+const std::string SELECTOR("selector");
+const std::string APACHE_SELECTOR("x-apache-selector");
+const std::string QPID_FILTER("qpid.filter");
+const std::string EXCHANGE("exchange");
+const std::string QUEUE("queue");
+const std::string KEY("key");
+const std::string ARGUMENTS("arguments");
+const std::string ALTERNATE_EXCHANGE("alternate-exchange");
+const std::string TYPE("type");
+const std::string EXCLUSIVE("exclusive");
+const std::string AUTO_DELETE("auto-delete");
+
+//policy values
+const std::string ALWAYS("always");
+const std::string NEVER("never");
+const std::string RECEIVER("receiver");
+const std::string SENDER("sender");
+
+//address types
+const std::string QUEUE_ADDRESS("queue");
+const std::string TOPIC_ADDRESS("topic");
+
+//reliability options:
+const std::string UNRELIABLE("unreliable");
+const std::string AT_MOST_ONCE("at-most-once");
+const std::string AT_LEAST_ONCE("at-least-once");
+const std::string EXACTLY_ONCE("exactly-once");
+
+//receiver modes:
+const std::string BROWSE("browse");
+const std::string CONSUME("consume");
+
+//0-10 exchange types:
+const std::string TOPIC_EXCHANGE("topic");
+const std::string FANOUT_EXCHANGE("fanout");
+const std::string DIRECT_EXCHANGE("direct");
+const std::string HEADERS_EXCHANGE("headers");
+const std::string XML_EXCHANGE("xml");
+const std::string WILDCARD_ANY("#");
+
+//exchange prefixes:
+const std::string PREFIX_AMQ("amq.");
+const std::string PREFIX_QPID("qpid.");
+
+const Verifier verifier;
+
+bool areEquivalent(const FieldValue& a, const FieldValue& b)
+{
+ return ((a == b) || (a.convertsTo<int64_t>() && b.convertsTo<int64_t>() && a.get<int64_t>() == b.get<int64_t>()));
+}
+}
+
+struct Binding
+{
+ Binding(const Variant::Map&);
+ Binding(const std::string& exchange, const std::string& queue, const std::string& key);
+
+ std::string exchange;
+ std::string queue;
+ std::string key;
+ FieldTable arguments;
+};
+
+struct Bindings : std::vector<Binding>
+{
+ void add(const Variant::List& bindings);
+ void setDefaultExchange(const std::string&);
+ void setDefaultQueue(const std::string&);
+ void bind(qpid::client::AsyncSession& session);
+ void unbind(qpid::client::AsyncSession& session);
+ void check(qpid::client::AsyncSession& session);
+};
+
+class Node
+{
+ protected:
+ enum CheckMode {FOR_RECEIVER, FOR_SENDER};
+
+ Node(const Address& address);
+
+ const std::string name;
+ Variant createPolicy;
+ Variant assertPolicy;
+ Variant deletePolicy;
+ Bindings nodeBindings;
+ Bindings linkBindings;
+
+ static bool enabled(const Variant& policy, CheckMode mode);
+ static bool createEnabled(const Address& address, CheckMode mode);
+ static void convert(const Variant& option, FieldTable& arguments);
+ static std::vector<std::string> RECEIVER_MODES;
+ static std::vector<std::string> SENDER_MODES;
+};
+
+
+class Queue : protected Node
+{
+ public:
+ Queue(const Address& address);
+ protected:
+ void checkCreate(qpid::client::AsyncSession&, CheckMode);
+ void checkAssert(qpid::client::AsyncSession&, CheckMode);
+ void checkDelete(qpid::client::AsyncSession&, CheckMode);
+ private:
+ const bool durable;
+ bool autoDelete;
+ bool exclusive;
+ const std::string alternateExchange;
+ FieldTable arguments;
+};
+
+class Exchange : protected Node
+{
+ public:
+ Exchange(const Address& address);
+ protected:
+ void checkCreate(qpid::client::AsyncSession&, CheckMode);
+ void checkAssert(qpid::client::AsyncSession&, CheckMode);
+ void checkDelete(qpid::client::AsyncSession&, CheckMode);
+ bool isReservedName();
+
+ protected:
+ const std::string specifiedType;
+ private:
+ const bool durable;
+ bool autoDelete;
+ const std::string alternateExchange;
+ FieldTable arguments;
+};
+
+class QueueSource : public Queue, public MessageSource
+{
+ public:
+ QueueSource(const Address& address);
+ void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
+ void cancel(qpid::client::AsyncSession& session, const std::string& destination);
+ private:
+ const AcquireMode acquireMode;
+ const AcceptMode acceptMode;
+ bool exclusive;
+ FieldTable options;
+};
+
+class Subscription : public Exchange, public MessageSource
+{
+ public:
+ Subscription(const Address&, const std::string& actualType);
+ void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
+ void cancel(qpid::client::AsyncSession& session, const std::string& destination);
+ private:
+ const std::string queue;
+ const bool durable;
+ const bool reliable;
+ const std::string actualType;
+ const bool exclusiveQueue;
+ const bool autoDeleteQueue;
+ const bool exclusiveSubscription;
+ const std::string alternateExchange;
+ FieldTable queueOptions;
+ FieldTable subscriptionOptions;
+ Bindings bindings;
+
+ void bindSubject(const std::string& subject);
+ void bindAll();
+ void add(const std::string& exchange, const std::string& key);
+ static std::string getSubscriptionName(const std::string& base, const std::string& name);
+};
+
+class ExchangeSink : public Exchange, public MessageSink
+{
+ public:
+ ExchangeSink(const Address& name);
+ void declare(qpid::client::AsyncSession& session, const std::string& name);
+ void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
+ void cancel(qpid::client::AsyncSession& session, const std::string& name);
+ private:
+};
+
+class QueueSink : public Queue, public MessageSink
+{
+ public:
+ QueueSink(const Address& name);
+ void declare(qpid::client::AsyncSession& session, const std::string& name);
+ void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
+ void cancel(qpid::client::AsyncSession& session, const std::string& name);
+ private:
+};
+bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address);
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address);
+
+bool in(const Variant& value, const std::vector<std::string>& choices)
+{
+ if (!value.isVoid()) {
+ for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) {
+ if (value.asString() == *i) return true;
+ }
+ }
+ return false;
+}
+
+const Variant& getOption(const Variant::Map& options, const std::string& name)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return EMPTY_VARIANT;
+ } else {
+ return j->second;
+ }
+}
+
+const Variant& getOption(const Address& address, const std::string& name)
+{
+ return getOption(address.getOptions(), name);
+}
+
+bool getReceiverPolicy(const Address& address, const std::string& key)
+{
+ return in(getOption(address, key), list_of<std::string>(ALWAYS)(RECEIVER));
+}
+
+bool getSenderPolicy(const Address& address, const std::string& key)
+{
+ return in(getOption(address, key), list_of<std::string>(ALWAYS)(SENDER));
+}
+
+struct Opt
+{
+ Opt(const Address& address);
+ Opt(const Variant::Map& base);
+ Opt& operator/(const std::string& name);
+ operator bool() const;
+ operator std::string() const;
+ std::string str() const;
+ bool asBool(bool defaultValue) const;
+ const Variant::List& asList() const;
+ void collect(qpid::framing::FieldTable& args) const;
+ bool hasKey(const std::string&) const;
+
+ const Variant::Map* options;
+ const Variant* value;
+};
+
+Opt::Opt(const Address& address) : options(&(address.getOptions())), value(0) {}
+Opt::Opt(const Variant::Map& base) : options(&base), value(0) {}
+Opt& Opt::operator/(const std::string& name)
+{
+ if (options) {
+ Variant::Map::const_iterator j = options->find(name);
+ if (j == options->end()) {
+ value = 0;
+ options = 0;
+ } else {
+ value = &(j->second);
+ if (value->getType() == VAR_MAP) options = &(value->asMap());
+ else options = 0;
+ }
+ }
+ return *this;
+}
+
+
+Opt::operator bool() const
+{
+ return value && !value->isVoid() && value->asBool();
+}
+
+Opt::operator std::string() const
+{
+ return str();
+}
+
+bool Opt::asBool(bool defaultValue) const
+{
+ if (value) return value->asBool();
+ else return defaultValue;
+}
+
+std::string Opt::str() const
+{
+ if (value) return value->asString();
+ else return EMPTY_STRING;
+}
+
+const Variant::List& Opt::asList() const
+{
+ if (value) return value->asList();
+ else return EMPTY_LIST;
+}
+
+void Opt::collect(qpid::framing::FieldTable& args) const
+{
+ if (value) {
+ translate(value->asMap(), args);
+ }
+}
+bool Opt::hasKey(const std::string& key) const
+{
+ if (value) {
+ Variant::Map::const_iterator i = value->asMap().find(key);
+ return i != value->asMap().end();
+ } else {
+ return false;
+ }
+}
+
+bool AddressResolution::is_unreliable(const Address& address)
+{
+
+ return in((Opt(address)/LINK/RELIABILITY).str(),
+ list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
+}
+
+bool AddressResolution::is_reliable(const Address& address)
+{
+ return in((Opt(address)/LINK/RELIABILITY).str(),
+ list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
+}
+
+std::string checkAddressType(qpid::client::Session session, const Address& address)
+{
+ verifier.verify(address);
+ if (address.getName().empty()) {
+ throw MalformedAddress("Name cannot be null");
+ }
+ std::string type = (Opt(address)/NODE/TYPE).str();
+ if (type.empty()) {
+ ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
+ if (result.getQueueNotFound() && result.getExchangeNotFound()) {
+ //neither a queue nor an exchange exists with that name; treat it as a queue
+ type = QUEUE_ADDRESS;
+ } else if (result.getExchangeNotFound()) {
+ //name refers to a queue
+ type = QUEUE_ADDRESS;
+ } else if (result.getQueueNotFound()) {
+ //name refers to an exchange
+ type = TOPIC_ADDRESS;
+ } else {
+ //both a queue and exchange exist for that name
+ throw ResolutionError("Ambiguous address, please specify queue or topic as node type");
+ }
+ }
+ return type;
+}
+
+std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session,
+ const Address& address)
+{
+ std::string type = checkAddressType(session, address);
+ if (type == TOPIC_ADDRESS) {
+ std::string exchangeType = sync(session).exchangeQuery(address.getName()).getType();
+ std::auto_ptr<MessageSource> source(new Subscription(address, exchangeType));
+ QPID_LOG(debug, "treating source address as topic: " << address);
+ return source;
+ } else if (type == QUEUE_ADDRESS) {
+ std::auto_ptr<MessageSource> source(new QueueSource(address));
+ QPID_LOG(debug, "treating source address as queue: " << address);
+ return source;
+ } else {
+ throw ResolutionError("Unrecognised type: " + type);
+ }
+}
+
+
+std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session,
+ const qpid::messaging::Address& address)
+{
+ std::string type = checkAddressType(session, address);
+ if (type == TOPIC_ADDRESS) {
+ std::auto_ptr<MessageSink> sink(new ExchangeSink(address));
+ QPID_LOG(debug, "treating target address as topic: " << address);
+ return sink;
+ } else if (type == QUEUE_ADDRESS) {
+ std::auto_ptr<MessageSink> sink(new QueueSink(address));
+ QPID_LOG(debug, "treating target address as queue: " << address);
+ return sink;
+ } else {
+ throw ResolutionError("Unrecognised type: " + type);
+ }
+}
+
+bool isBrowse(const Address& address)
+{
+ const Variant& mode = getOption(address, MODE);
+ if (!mode.isVoid()) {
+ std::string value = mode.asString();
+ if (value == BROWSE) return true;
+ else if (value != CONSUME) throw ResolutionError("Invalid mode");
+ }
+ return false;
+}
+
+QueueSource::QueueSource(const Address& address) :
+ Queue(address),
+ acquireMode(isBrowse(address) ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
+ //since this client does not provide any means by which an
+ //unacquired message can be acquired, there is no value in an
+ //explicit accept
+ acceptMode(acquireMode == ACQUIRE_MODE_NOT_ACQUIRED || AddressResolution::is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT),
+ exclusive(false)
+{
+ exclusive = Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE;
+ (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(options);
+ std::string selector = Opt(address)/LINK/SELECTOR;
+ if (!selector.empty()) options.setString(APACHE_SELECTOR, selector);
+}
+
+void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
+{
+ checkCreate(session, FOR_RECEIVER);
+ checkAssert(session, FOR_RECEIVER);
+ linkBindings.bind(session);
+ session.messageSubscribe(arg::queue=name,
+ arg::destination=destination,
+ arg::acceptMode=acceptMode,
+ arg::acquireMode=acquireMode,
+ arg::exclusive=exclusive,
+ arg::arguments=options);
+}
+
+void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination)
+{
+ linkBindings.unbind(session);
+ session.messageCancel(destination);
+ checkDelete(session, FOR_RECEIVER);
+}
+
+std::string Subscription::getSubscriptionName(const std::string& base, const std::string& name)
+{
+ if (name.empty()) {
+ return (boost::format("%1%_%2%") % base % Uuid(true).str()).str();
+ } else {
+ return name;
+ }
+}
+
+Subscription::Subscription(const Address& address, const std::string& type)
+ : Exchange(address),
+ queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
+ durable(Opt(address)/LINK/DURABLE),
+ //if the link is durable, then assume it is also reliable unless explicitly stated otherwise
+ //if not assume it is unreliable unless explicitly stated otherwise
+ reliable(durable ? !AddressResolution::is_unreliable(address) : AddressResolution::is_reliable(address)),
+ actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
+ exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
+ autoDeleteQueue((Opt(address)/LINK/X_DECLARE/AUTO_DELETE).asBool(!(durable || reliable))),
+ exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)),
+ alternateExchange((Opt(address)/LINK/X_DECLARE/ALTERNATE_EXCHANGE).str())
+{
+
+ if ((Opt(address)/LINK).hasKey(TIMEOUT)) {
+ const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value;
+ if (timeout->asUint32()) queueOptions.setInt("qpid.auto_delete_timeout", timeout->asUint32());
+ } else if (durable && !AddressResolution::is_reliable(address) && !(Opt(address)/LINK/X_DECLARE).hasKey(AUTO_DELETE)) {
+ //if durable, not explicitly reliable, and auto-delete not
+ //explicitly set, then set a non-zero default for the
+ //autodelete timeout
+ queueOptions.setInt("qpid.auto_delete_timeout", 2*60);
+ }
+ (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
+ (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
+ std::string selector = Opt(address)/LINK/SELECTOR;
+ if (!selector.empty()) queueOptions.setString(QPID_FILTER, selector);
+
+ if (!address.getSubject().empty()) bindSubject(address.getSubject());
+ else if (linkBindings.empty()) bindAll();
+}
+
+void Subscription::bindSubject(const std::string& subject)
+{
+ if (actualType == HEADERS_EXCHANGE) {
+ Binding b(name, queue, subject);
+ b.arguments.setString("qpid.subject", subject);
+ b.arguments.setString("x-match", "all");
+ bindings.push_back(b);
+ } else if (actualType == XML_EXCHANGE) {
+ Binding b(name, queue, subject);
+ std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'")
+ % subject).str();
+ b.arguments.setString("xquery", query);
+ bindings.push_back(b);
+ } else {
+ //Note: the fanout exchange doesn't support any filtering, so
+ //the subject is ignored in that case
+ add(name, subject);
+ }
+}
+
+void Subscription::bindAll()
+{
+ if (actualType == TOPIC_EXCHANGE) {
+ add(name, WILDCARD_ANY);
+ } else if (actualType == FANOUT_EXCHANGE) {
+ add(name, queue);
+ } else if (actualType == HEADERS_EXCHANGE) {
+ Binding b(name, queue, "match-all");
+ b.arguments.setString("x-match", "all");
+ bindings.push_back(b);
+ } else if (actualType == XML_EXCHANGE) {
+ Binding b(name, queue, EMPTY_STRING);
+ b.arguments.setString("xquery", "true()");
+ bindings.push_back(b);
+ } else {
+ add(name, EMPTY_STRING);
+ }
+}
+
+void Subscription::add(const std::string& exchange, const std::string& key)
+{
+ bindings.push_back(Binding(exchange, queue, key));
+}
+
+void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
+{
+ //create exchange if required and specified by policy:
+ checkCreate(session, FOR_RECEIVER);
+ checkAssert(session, FOR_RECEIVER);
+
+ //create subscription queue:
+ session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
+ arg::autoDelete=autoDeleteQueue, arg::durable=durable,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=queueOptions);
+ //'default' binding:
+ bindings.bind(session);
+ //any explicit bindings:
+ linkBindings.setDefaultQueue(queue);
+ linkBindings.bind(session);
+ //subscribe to subscription queue:
+ AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
+ session.messageSubscribe(arg::queue=queue, arg::destination=destination,
+ arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
+}
+
+void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
+{
+ linkBindings.unbind(session);
+ session.messageCancel(destination);
+ if (exclusiveQueue) session.queueDelete(arg::queue=queue, arg::ifUnused=true);
+ checkDelete(session, FOR_RECEIVER);
+}
+
+ExchangeSink::ExchangeSink(const Address& address) : Exchange(address) {}
+
+void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::string&)
+{
+ checkCreate(session, FOR_SENDER);
+ checkAssert(session, FOR_SENDER);
+ linkBindings.bind(session);
+}
+
+void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
+{
+ m.send(session, name, m.getSubject());
+}
+
+void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&)
+{
+ linkBindings.unbind(session);
+ checkDelete(session, FOR_SENDER);
+}
+
+QueueSink::QueueSink(const Address& address) : Queue(address) {}
+
+void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&)
+{
+ checkCreate(session, FOR_SENDER);
+ checkAssert(session, FOR_SENDER);
+ linkBindings.bind(session);
+}
+void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
+{
+ m.send(session, name);
+}
+
+void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
+{
+ linkBindings.unbind(session);
+ checkDelete(session, FOR_SENDER);
+}
+
+Address AddressResolution::convert(const qpid::framing::ReplyTo& rt)
+{
+ Address address;
+ if (rt.getExchange().empty()) {//if default exchange, treat as queue
+ if (!rt.getRoutingKey().empty()) {
+ address.setName(rt.getRoutingKey());
+ address.setType(QUEUE_ADDRESS);
+ }
+ } else {
+ address.setName(rt.getExchange());
+ address.setSubject(rt.getRoutingKey());
+ address.setType(TOPIC_ADDRESS);
+ }
+ return address;
+}
+
+qpid::framing::ReplyTo AddressResolution::convert(const Address& address)
+{
+ if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) {
+ return ReplyTo(EMPTY_STRING, address.getName());
+ } else if (address.getType() == TOPIC_ADDRESS) {
+ return ReplyTo(address.getName(), address.getSubject());
+ } else {
+ QPID_LOG(notice, "Unrecognised type for reply-to: " << address.getType());
+ return ReplyTo(EMPTY_STRING, address.getName());//treat as queue
+ }
+}
+
+bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address)
+{
+ return address.getType() == QUEUE_ADDRESS ||
+ (address.getType().empty() && session.queueQuery(address.getName()).getQueue() == address.getName());
+}
+
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address)
+{
+ if (address.getType().empty()) {
+ return !session.exchangeQuery(address.getName()).getNotFound();
+ } else if (address.getType() == TOPIC_ADDRESS) {
+ return true;
+ } else {
+ return false;
+ }
+}
+
+Node::Node(const Address& address) : name(address.getName()),
+ createPolicy(getOption(address, CREATE)),
+ assertPolicy(getOption(address, ASSERT)),
+ deletePolicy(getOption(address, DELETE))
+{
+ nodeBindings.add((Opt(address)/NODE/X_BINDINGS).asList());
+ linkBindings.add((Opt(address)/LINK/X_BINDINGS).asList());
+}
+
+Queue::Queue(const Address& a) : Node(a),
+ durable(Opt(a)/NODE/DURABLE),
+ autoDelete(Opt(a)/NODE/X_DECLARE/AUTO_DELETE),
+ exclusive(Opt(a)/NODE/X_DECLARE/EXCLUSIVE),
+ alternateExchange((Opt(a)/NODE/X_DECLARE/ALTERNATE_EXCHANGE).str())
+{
+ (Opt(a)/NODE/X_DECLARE/ARGUMENTS).collect(arguments);
+ nodeBindings.setDefaultQueue(name);
+ linkBindings.setDefaultQueue(name);
+ if (qpid::messaging::AddressImpl::isTemporary(a) && createPolicy.isVoid()) {
+ createPolicy = "always";
+ Opt specified = Opt(a)/NODE/X_DECLARE;
+ if (!specified.hasKey(AUTO_DELETE)) autoDelete = true;
+ if (!specified.hasKey(EXCLUSIVE)) exclusive = true;
+ }
+}
+
+void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(createPolicy, mode)) {
+ QPID_LOG(debug, "Auto-creating queue '" << name << "'");
+ try {
+ session.queueDeclare(arg::queue=name,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::exclusive=exclusive,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
+ nodeBindings.bind(session);
+ session.sync();
+ } catch (const qpid::framing::ResourceLockedException& e) {
+ throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str());
+ } catch (const qpid::framing::NotAllowedException& e) {
+ throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str());
+ } catch (const qpid::framing::NotFoundException& e) {//may be thrown when creating bindings
+ throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str());
+ }
+ } else {
+ try {
+ sync(session).queueDeclare(arg::queue=name, arg::passive=true);
+ } catch (const qpid::framing::NotFoundException& /*e*/) {
+ throw NotFound((boost::format("Queue %1% does not exist") % name).str());
+ }
+ }
+}
+
+void Queue::checkDelete(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ //Note: queue-delete will cause a session exception if the queue
+ //does not exist, the query here prevents obvious cases of this
+ //but there is a race whenever two deletions are made concurrently
+ //so careful use of the delete policy is recommended at present
+ if (enabled(deletePolicy, mode) && sync(session).queueQuery(name).getQueue() == name) {
+ QPID_LOG(debug, "Auto-deleting queue '" << name << "'");
+ sync(session).queueDelete(arg::queue=name);
+ }
+}
+
+void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(assertPolicy, mode)) {
+ QueueQueryResult result = sync(session).queueQuery(name);
+ if (result.getQueue() != name) {
+ throw NotFound((boost::format("Queue not found: %1%") % name).str());
+ } else {
+ if (durable && !result.getDurable()) {
+ throw AssertionFailed((boost::format("Queue not durable: %1%") % name).str());
+ }
+ if (autoDelete && !result.getAutoDelete()) {
+ throw AssertionFailed((boost::format("Queue not set to auto-delete: %1%") % name).str());
+ }
+ if (exclusive && !result.getExclusive()) {
+ throw AssertionFailed((boost::format("Queue not exclusive: %1%") % name).str());
+ }
+ if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) {
+ throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%")
+ % name % alternateExchange % result.getAlternateExchange()).str());
+ }
+ for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
+ FieldTable::ValuePtr v = result.getArguments().get(i->first);
+ if (!v) {
+ throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
+ } else if (!areEquivalent(*i->second, *v)) {
+ throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
+ % i->first % name % *(i->second) % *v).str());
+ }
+ }
+ nodeBindings.check(session);
+ }
+ }
+}
+
+Exchange::Exchange(const Address& a) : Node(a),
+ specifiedType((Opt(a)/NODE/X_DECLARE/TYPE).str()),
+ durable(Opt(a)/NODE/DURABLE),
+ autoDelete(Opt(a)/NODE/X_DECLARE/AUTO_DELETE),
+ alternateExchange((Opt(a)/NODE/X_DECLARE/ALTERNATE_EXCHANGE).str())
+{
+ (Opt(a)/NODE/X_DECLARE/ARGUMENTS).collect(arguments);
+ nodeBindings.setDefaultExchange(name);
+ linkBindings.setDefaultExchange(name);
+ if (qpid::messaging::AddressImpl::isTemporary(a) && createPolicy.isVoid()) {
+ createPolicy = "always";
+ if (!(Opt(a)/NODE/X_DECLARE).hasKey(AUTO_DELETE)) autoDelete = true;
+ }
+}
+
+bool Exchange::isReservedName()
+{
+ return name.find(PREFIX_AMQ) != std::string::npos || name.find(PREFIX_QPID) != std::string::npos;
+}
+
+void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(createPolicy, mode)) {
+ try {
+ if (isReservedName()) {
+ try {
+ sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
+ } catch (const qpid::framing::NotFoundException& /*e*/) {
+ throw ResolutionError((boost::format("Cannot create exchange %1%; names beginning with \"amq.\" or \"qpid.\" are reserved.") % name).str());
+ }
+
+ } else {
+ std::string type = specifiedType;
+ if (type.empty()) type = TOPIC_EXCHANGE;
+ session.exchangeDeclare(arg::exchange=name,
+ arg::type=type,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
+ }
+ nodeBindings.bind(session);
+ session.sync();
+ } catch (const qpid::framing::NotAllowedException& e) {
+ throw ResolutionError((boost::format("Create failed for exchange %1%; %2%") % name % e.what()).str());
+ } catch (const qpid::framing::NotFoundException& e) {//can be caused when creating bindings
+ throw ResolutionError((boost::format("Create failed for exchange %1%; %2%") % name % e.what()).str());
+ }
+ } else {
+ try {
+ sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
+ } catch (const qpid::framing::NotFoundException& /*e*/) {
+ throw NotFound((boost::format("Exchange %1% does not exist") % name).str());
+ }
+ }
+}
+
+void Exchange::checkDelete(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ //Note: exchange-delete will cause a session exception if the
+ //exchange does not exist, the query here prevents obvious cases
+ //of this but there is a race whenever two deletions are made
+ //concurrently so careful use of the delete policy is recommended
+ //at present
+ if (enabled(deletePolicy, mode) && !sync(session).exchangeQuery(name).getNotFound()) {
+ sync(session).exchangeDelete(arg::exchange=name);
+ }
+}
+
+void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(assertPolicy, mode)) {
+ ExchangeQueryResult result = sync(session).exchangeQuery(name);
+ if (result.getNotFound()) {
+ throw NotFound((boost::format("Exchange not found: %1%") % name).str());
+ } else {
+ if (specifiedType.size() && result.getType() != specifiedType) {
+ throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
+ % name % specifiedType % result.getType()).str());
+ }
+ if (durable && !result.getDurable()) {
+ throw AssertionFailed((boost::format("Exchange not durable: %1%") % name).str());
+ }
+ //Note: Can't check auto-delete or alternate-exchange via
+ //exchange-query-result as these are not returned
+ //TODO: could use a passive declare to check alternate-exchange
+ for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
+ FieldTable::ValuePtr v = result.getArguments().get(i->first);
+ if (!v) {
+ throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
+ } else if (!areEquivalent(*i->second, *v)) {
+ throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
+ % i->first % name % *(i->second) % *v).str());
+ }
+ }
+ nodeBindings.check(session);
+ }
+ }
+}
+
+Binding::Binding(const Variant::Map& b) :
+ exchange((Opt(b)/EXCHANGE).str()),
+ queue((Opt(b)/QUEUE).str()),
+ key((Opt(b)/KEY).str())
+{
+ (Opt(b)/ARGUMENTS).collect(arguments);
+}
+
+Binding::Binding(const std::string& e, const std::string& q, const std::string& k) : exchange(e), queue(q), key(k) {}
+
+
+void Bindings::add(const Variant::List& list)
+{
+ for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ push_back(Binding(i->asMap()));
+ }
+}
+
+void Bindings::setDefaultExchange(const std::string& exchange)
+{
+ for (Bindings::iterator i = begin(); i != end(); ++i) {
+ if (i->exchange.empty()) i->exchange = exchange;
+ }
+}
+
+void Bindings::setDefaultQueue(const std::string& queue)
+{
+ for (Bindings::iterator i = begin(); i != end(); ++i) {
+ if (i->queue.empty()) i->queue = queue;
+ }
+}
+
+void Bindings::bind(qpid::client::AsyncSession& session)
+{
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ session.exchangeBind(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key,
+ arg::arguments=i->arguments);
+ }
+}
+
+void Bindings::unbind(qpid::client::AsyncSession& session)
+{
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ session.exchangeUnbind(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key);
+ }
+}
+
+void Bindings::check(qpid::client::AsyncSession& session)
+{
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key);
+ if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
+ throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]")
+ % i->exchange % i->queue % i->key).str());
+ }
+ }
+}
+
+bool Node::enabled(const Variant& policy, CheckMode mode)
+{
+ bool result = false;
+ switch (mode) {
+ case FOR_RECEIVER:
+ result = in(policy, RECEIVER_MODES);
+ break;
+ case FOR_SENDER:
+ result = in(policy, SENDER_MODES);
+ break;
+ }
+ return result;
+}
+
+bool Node::createEnabled(const Address& address, CheckMode mode)
+{
+ const Variant& policy = getOption(address, CREATE);
+ return enabled(policy, mode);
+}
+
+void Node::convert(const Variant& options, FieldTable& arguments)
+{
+ if (!options.isVoid()) {
+ translate(options.asMap(), arguments);
+ }
+}
+std::vector<std::string> Node::RECEIVER_MODES = list_of<std::string>(ALWAYS) (RECEIVER);
+std::vector<std::string> Node::SENDER_MODES = list_of<std::string>(ALWAYS) (SENDER);
+
+Verifier::Verifier()
+{
+ defined[CREATE] = true;
+ defined[ASSERT] = true;
+ defined[DELETE] = true;
+ defined[MODE] = true;
+ Variant::Map node;
+ node[TYPE] = true;
+ node[DURABLE] = true;
+ node[X_DECLARE] = true;
+ node[X_BINDINGS] = true;
+ defined[NODE] = node;
+ Variant::Map link;
+ link[NAME] = true;
+ link[DURABLE] = true;
+ link[RELIABILITY] = true;
+ link[TIMEOUT] = true;
+ link[X_SUBSCRIBE] = true;
+ link[X_DECLARE] = true;
+ link[X_BINDINGS] = true;
+ link[SELECTOR] = true;
+ defined[LINK] = link;
+}
+void Verifier::verify(const Address& address) const
+{
+ verify(defined, address.getOptions());
+}
+
+void Verifier::verify(const Variant::Map& allowed, const Variant::Map& actual) const
+{
+ for (Variant::Map::const_iterator i = actual.begin(); i != actual.end(); ++i) {
+ Variant::Map::const_iterator option = allowed.find(i->first);
+ if (option == allowed.end()) {
+ throw AddressError((boost::format("Unrecognised option: %1%") % i->first).str());
+ } else if (option->second.getType() == qpid::types::VAR_MAP) {
+ verify(option->second.asMap(), i->second.asMap());
+ }
+ }
+}
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h
new file mode 100644
index 0000000000..fc8f1a1d18
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h
@@ -0,0 +1,64 @@
+#ifndef QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H
+#define QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/Session.h"
+
+namespace qpid {
+
+namespace framing{
+class ReplyTo;
+}
+
+namespace messaging {
+class Address;
+}
+
+namespace client {
+namespace amqp0_10 {
+
+class MessageSource;
+class MessageSink;
+
+/**
+ * Maps from a generic Address and optional Filter to an AMQP 0-10
+ * MessageSource which will then be used by a ReceiverImpl instance
+ * created for the address.
+ */
+class AddressResolution
+{
+ public:
+ std::auto_ptr<MessageSource> resolveSource(qpid::client::Session session,
+ const qpid::messaging::Address& address);
+
+ std::auto_ptr<MessageSink> resolveSink(qpid::client::Session session,
+ const qpid::messaging::Address& address);
+
+ static qpid::messaging::Address convert(const qpid::framing::ReplyTo&);
+ static qpid::framing::ReplyTo convert(const qpid::messaging::Address&);
+ static bool is_unreliable(const qpid::messaging::Address& address);
+ static bool is_reliable(const qpid::messaging::Address& address);
+ private:
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H*/
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
new file mode 100644
index 0000000000..11ef06e517
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -0,0 +1,404 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionImpl.h"
+#include "SessionImpl.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/PrivateImplRef.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Url.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include <boost/intrusive_ptr.hpp>
+#include <vector>
+#include <sstream>
+#include <limits>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::types::Variant;
+using qpid::types::VAR_LIST;
+using qpid::framing::Uuid;
+
+namespace {
+
+const std::string TCP("tcp");
+const std::string COLON(":");
+double FOREVER(std::numeric_limits<double>::max());
+
+// Time values in seconds can be specified as integer or floating point values.
+double timeValue(const Variant& value) {
+ if (types::isIntegerType(value.getType()))
+ return double(value.asInt64());
+ return value.asDouble();
+}
+
+void merge(const std::string& value, std::vector<std::string>& list) {
+ if (std::find(list.begin(), list.end(), value) == list.end())
+ list.push_back(value);
+}
+
+void merge(const Variant::List& from, std::vector<std::string>& to)
+{
+ for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i)
+ merge(i->asString(), to);
+}
+
+std::string asString(const std::vector<std::string>& v) {
+ std::stringstream os;
+ os << "[";
+ for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) {
+ if (i != v.begin()) os << ", ";
+ os << *i;
+ }
+ os << "]";
+ return os.str();
+}
+
+bool expired(const sys::AbsTime& start, double timeout)
+{
+ if (timeout == 0) return true;
+ if (timeout == FOREVER) return false;
+ sys::Duration used(start, sys::now());
+ sys::Duration allowed((int64_t)(timeout*sys::TIME_SEC));
+ return allowed < used;
+}
+
+} // namespace
+
+ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
+ replaceUrls(false), autoReconnect(false), timeout(FOREVER), limit(-1),
+ minReconnectInterval(0.001), maxReconnectInterval(2),
+ retries(0), reconnectOnLimitExceeded(true), disableAutoDecode(false)
+{
+ setOptions(options);
+ urls.insert(urls.begin(), url);
+}
+
+void ConnectionImpl::setOptions(const Variant::Map& options)
+{
+ for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) {
+ setOption(i->first, i->second);
+ }
+}
+
+void ConnectionImpl::setOption(const std::string& name, const Variant& value)
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (name == "reconnect") {
+ autoReconnect = value;
+ } else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
+ timeout = timeValue(value);
+ } else if (name == "reconnect-limit" || name == "reconnect_limit") {
+ limit = value;
+ } else if (name == "reconnect-interval" || name == "reconnect_interval") {
+ maxReconnectInterval = minReconnectInterval = timeValue(value);
+ } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") {
+ minReconnectInterval = timeValue(value);
+ } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") {
+ maxReconnectInterval = timeValue(value);
+ } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") {
+ replaceUrls = value.asBool();
+ } else if (name == "reconnect-urls" || name == "reconnect_urls") {
+ if (replaceUrls) urls.clear();
+ if (value.getType() == VAR_LIST) {
+ merge(value.asList(), urls);
+ } else {
+ merge(value.asString(), urls);
+ }
+ } else if (name == "username") {
+ settings.username = value.asString();
+ } else if (name == "password") {
+ settings.password = value.asString();
+ } else if (name == "sasl-mechanism" || name == "sasl_mechanism" ||
+ name == "sasl-mechanisms" || name == "sasl_mechanisms") {
+ settings.mechanism = value.asString();
+ } else if (name == "sasl-service" || name == "sasl_service") {
+ settings.service = value.asString();
+ } else if (name == "sasl-min-ssf" || name == "sasl_min_ssf") {
+ settings.minSsf = value;
+ } else if (name == "sasl-max-ssf" || name == "sasl_max_ssf") {
+ settings.maxSsf = value;
+ } else if (name == "heartbeat") {
+ settings.heartbeat = value;
+ } else if (name == "tcp-nodelay" || name == "tcp_nodelay") {
+ settings.tcpNoDelay = value;
+ } else if (name == "locale") {
+ settings.locale = value.asString();
+ } else if (name == "max-channels" || name == "max_channels") {
+ settings.maxChannels = value;
+ } else if (name == "max-frame-size" || name == "max_frame_size") {
+ settings.maxFrameSize = value;
+ } else if (name == "bounds") {
+ settings.bounds = value;
+ } else if (name == "transport") {
+ settings.protocol = value.asString();
+ } else if (name == "ssl-cert-name" || name == "ssl_cert_name") {
+ settings.sslCertName = value.asString();
+ } else if (name == "ssl-ignore-hostname-verification-failure" || name == "ssl_ignore_hostname_verification_failure") {
+ settings.sslIgnoreHostnameVerificationFailure = value;
+ } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") {
+ reconnectOnLimitExceeded = value;
+ } else if (name == "client-properties" || name == "client_properties") {
+ amqp_0_10::translate(value.asMap(), settings.clientProperties);
+ } else if (name == "disable-auto-decode" || name == "disable_auto_decode") {
+ disableAutoDecode = value;
+ } else {
+ throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
+ }
+}
+
+
+void ConnectionImpl::close()
+{
+ while(true) {
+ messaging::Session session;
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (sessions.empty()) break;
+ session = sessions.begin()->second;
+ }
+ session.close();
+ }
+ detach();
+}
+
+void ConnectionImpl::detach()
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ connection.close();
+}
+
+bool ConnectionImpl::isOpen() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return connection.isOpen();
+}
+
+boost::intrusive_ptr<SessionImpl> getImplPtr(qpid::messaging::Session& session)
+{
+ return boost::dynamic_pointer_cast<SessionImpl>(
+ qpid::messaging::PrivateImplRef<qpid::messaging::Session>::get(session)
+ );
+}
+
+void ConnectionImpl::closed(SessionImpl& s)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ if (getImplPtr(i->second).get() == &s) {
+ sessions.erase(i);
+ break;
+ }
+ }
+}
+
+qpid::messaging::Session ConnectionImpl::getSession(const std::string& name) const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ Sessions::const_iterator i = sessions.find(name);
+ if (i == sessions.end()) {
+ throw qpid::messaging::KeyError("No such session: " + name);
+ } else {
+ return i->second;
+ }
+}
+
+qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const std::string& n)
+{
+ std::string name = n.empty() ? Uuid(true).str() : n;
+ qpid::messaging::Session impl(new SessionImpl(*this, transactional));
+ while (true) {
+ try {
+ getImplPtr(impl)->setSession(connection.newSession(name));
+ qpid::sys::Mutex::ScopedLock l(lock);
+ sessions[name] = impl;
+ break;
+ } catch (const qpid::TransportFailure&) {
+ reopen();
+ } catch (const qpid::SessionException& e) {
+ SessionImpl::rethrow(e);
+ } catch (const std::exception& e) {
+ throw qpid::messaging::MessagingException(e.what());
+ }
+ }
+ return impl;
+}
+
+void ConnectionImpl::open()
+{
+ qpid::sys::AbsTime start = qpid::sys::now();
+ qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore);
+ try {
+ if (!connection.isOpen()) connect(start);
+ }
+ catch (const types::Exception&) { throw; }
+ catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); }
+}
+
+void ConnectionImpl::reopen()
+{
+ if (!autoReconnect) {
+ throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
+ }
+ open();
+}
+
+
+void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
+{
+ QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
+ for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
+ if (!autoReconnect) {
+ throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
+ }
+ if (limit >= 0 && retries++ >= limit) {
+ throw qpid::messaging::TransportFailure("Failed to connect within reconnect limit");
+ }
+ if (expired(started, timeout)) {
+ throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout");
+ }
+ QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls="
+ << asString(urls));
+ qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds.
+ }
+ QPID_LOG(debug, "Connection successful, urls=" << asString(urls));
+ retries = 0;
+}
+
+void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) {
+ for (std::vector<Url>::const_iterator i = more.begin(); i != more.end(); ++i)
+ merge(i->str(), urls);
+ QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
+}
+
+bool ConnectionImpl::tryConnect()
+{
+ sys::Mutex::ScopedLock l(lock);
+ for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
+ try {
+ QPID_LOG(info, "Trying to connect to " << *i << "...");
+ Url url(*i, settings.protocol.size() ? settings.protocol : TCP);
+ if (url.getUser().size()) settings.username = url.getUser();
+ if (url.getPass().size()) settings.password = url.getPass();
+ connection.open(url, settings);
+ QPID_LOG(info, "Connected to " << *i);
+ mergeUrls(connection.getInitialBrokers(), l);
+ return resetSessions(l);
+ } catch (const qpid::ProtocolVersionError& e) {
+ throw qpid::messaging::ProtocolVersionError("AMQP 0-10 not supported");
+ } catch (const qpid::TransportFailure& e) {
+ QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
+ }
+ }
+ return false;
+}
+
+bool ConnectionImpl::resetSessions(const sys::Mutex::ScopedLock& )
+{
+ try {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ if (!getImplPtr(i->second)->isTransactional()) {
+ getImplPtr(i->second)->setSession(connection.newSession(i->first));
+ }
+ }
+ return true;
+ } catch (const qpid::TransportFailure& e) {
+ QPID_LOG(debug, "Connection Failed to re-initialize sessions: " << e.what());
+ return false;
+ } catch (const qpid::framing::ResourceLimitExceededException& e) {
+ if (reconnectOnLimitExceeded) {
+ QPID_LOG(debug, "Detaching and reconnecting due to: " << e.what());
+ detach();
+ return false;
+ } else {
+ throw qpid::messaging::TargetCapacityExceeded(e.what());
+ }
+ }
+}
+
+bool ConnectionImpl::backoff()
+{
+ if (reconnectOnLimitExceeded) {
+ detach();
+ open();
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void ConnectionImpl::reconnect(const std::string& u)
+{
+ sys::Mutex::ScopedLock l(lock);
+ try {
+ QPID_LOG(info, "Trying to connect to " << u << "...");
+ Url url(u, settings.protocol.size() ? settings.protocol : TCP);
+ if (url.getUser().size()) settings.username = url.getUser();
+ if (url.getPass().size()) settings.password = url.getPass();
+ connection.open(url, settings);
+ QPID_LOG(info, "Connected to " << u);
+ mergeUrls(connection.getInitialBrokers(), l);
+ if (!resetSessions(l)) throw qpid::messaging::TransportFailure("Could not re-establish sessions");
+ } catch (const qpid::TransportFailure& e) {
+ QPID_LOG(info, "Failed to connect to " << u << ": " << e.what());
+ throw qpid::messaging::TransportFailure(e.what());
+ } catch (const std::exception& e) {
+ QPID_LOG(info, "Error while connecting to " << u << ": " << e.what());
+ throw qpid::messaging::MessagingException(e.what());
+ }
+}
+
+void ConnectionImpl::reconnect()
+{
+ if (!tryConnect()) {
+ throw qpid::messaging::TransportFailure("Could not reconnect");
+ }
+}
+std::string ConnectionImpl::getUrl() const
+{
+ if (isOpen()) {
+ std::stringstream u;
+ u << connection.getNegotiatedSettings().protocol << COLON << connection.getNegotiatedSettings().host << COLON << connection.getNegotiatedSettings().port;
+ return u.str();
+ } else {
+ return std::string();
+ }
+}
+
+std::string ConnectionImpl::getAuthenticatedUsername()
+{
+ return connection.getNegotiatedSettings().username;
+}
+
+bool ConnectionImpl::getAutoDecode() const
+{
+ return !disableAutoDecode;
+}
+bool ConnectionImpl::getAutoReconnect() const
+{
+ return autoReconnect;
+}
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
new file mode 100644
index 0000000000..bf8a759107
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -0,0 +1,88 @@
+#ifndef QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H
+#define QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/ConnectionImpl.h"
+#include "qpid/types/Variant.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Semaphore.h"
+#include <map>
+#include <vector>
+
+namespace qpid {
+struct Url;
+
+namespace client {
+namespace amqp0_10 {
+
+class SessionImpl;
+
+class ConnectionImpl : public qpid::messaging::ConnectionImpl
+{
+ public:
+ ConnectionImpl(const std::string& url, const qpid::types::Variant::Map& options);
+ void open();
+ void reopen();
+ bool isOpen() const;
+ void close();
+ qpid::messaging::Session newSession(bool transactional, const std::string& name);
+ qpid::messaging::Session getSession(const std::string& name) const;
+ void closed(SessionImpl&);
+ void detach();
+ void setOption(const std::string& name, const qpid::types::Variant& value);
+ bool backoff();
+ std::string getAuthenticatedUsername();
+ void reconnect(const std::string& url);
+ void reconnect();
+ std::string getUrl() const;
+ bool getAutoDecode() const;
+ bool getAutoReconnect() const;
+ private:
+ typedef std::map<std::string, qpid::messaging::Session> Sessions;
+
+ mutable qpid::sys::Mutex lock;//used to protect data structures
+ qpid::sys::Semaphore semaphore;//used to coordinate reconnection
+ Sessions sessions;
+ qpid::client::Connection connection;
+ bool replaceUrls; // Replace rather than merging with reconnect-urls
+ std::vector<std::string> urls;
+ qpid::client::ConnectionSettings settings;
+ bool autoReconnect;
+ double timeout;
+ int32_t limit;
+ double minReconnectInterval;
+ double maxReconnectInterval;
+ int32_t retries;
+ bool reconnectOnLimitExceeded;
+ bool disableAutoDecode;
+
+ void setOptions(const qpid::types::Variant::Map& options);
+ void connect(const qpid::sys::AbsTime& started);
+ bool tryConnect();
+ bool resetSessions(const sys::Mutex::ScopedLock&); // dummy parameter indicates call with lock held.
+ void mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&);
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H*/
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
new file mode 100644
index 0000000000..2ca2c85c64
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -0,0 +1,466 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/IncomingMessages.h"
+#include "qpid/client/amqp0_10/AddressResolution.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageImpl.h"
+#include "qpid/types/Variant.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/enum.h"
+#include <algorithm>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using namespace qpid::framing;
+using namespace qpid::framing::message;
+using namespace qpid::amqp_0_10;
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::messaging::MessageImplAccess;
+using qpid::types::Variant;
+
+namespace {
+const std::string EMPTY_STRING;
+
+
+struct GetNone : IncomingMessages::Handler
+{
+ bool accept(IncomingMessages::MessageTransfer&) { return false; }
+};
+
+struct GetAny : IncomingMessages::Handler
+{
+ bool accept(IncomingMessages::MessageTransfer& transfer)
+ {
+ transfer.retrieve(0);
+ return true;
+ }
+};
+
+struct MatchAndTrack
+{
+ const std::string destination;
+ SequenceSet ids;
+
+ MatchAndTrack(const std::string& d) : destination(d) {}
+
+ bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command)
+ {
+ if (command->as<MessageTransferBody>()->getDestination() == destination) {
+ ids.add(command->getId());
+ return true;
+ } else {
+ return false;
+ }
+ }
+};
+
+struct Match
+{
+ const std::string destination;
+ uint32_t matched;
+
+ Match(const std::string& d) : destination(d), matched(0) {}
+
+ bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command)
+ {
+ if (command->as<MessageTransferBody>()->getDestination() == destination) {
+ ++matched;
+ return true;
+ } else {
+ return false;
+ }
+ }
+};
+
+struct ScopedRelease
+{
+ bool& flag;
+ qpid::sys::Monitor& lock;
+
+ ScopedRelease(bool& f, qpid::sys::Monitor& l) : flag(f), lock(l) {}
+ ~ScopedRelease()
+ {
+ sys::Monitor::ScopedLock l(lock);
+ flag = false;
+ lock.notifyAll();
+ }
+};
+}
+
+IncomingMessages::IncomingMessages() : inUse(false) {}
+
+void IncomingMessages::setSession(qpid::client::AsyncSession s)
+{
+ sys::Mutex::ScopedLock l(lock);
+ session = s;
+ incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault();
+ acceptTracker.reset();
+}
+
+namespace {
+qpid::sys::Duration get_duration(qpid::sys::Duration timeout, qpid::sys::AbsTime deadline)
+{
+ if (timeout == qpid::sys::TIME_INFINITE) {
+ return qpid::sys::TIME_INFINITE;
+ } else {
+ return std::max(qpid::sys::Duration(0), qpid::sys::Duration(AbsTime::now(), deadline));
+ }
+}
+}
+
+bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout)
+{
+ sys::Mutex::ScopedLock l(lock);
+ AbsTime deadline(AbsTime::now(), timeout);
+ do {
+ //search through received list for any transfer of interest:
+ for (FrameSetQueue::iterator i = received.begin(); i != received.end();)
+ {
+ MessageTransfer transfer(*i, *this);
+ if (transfer.checkExpired()) {
+ i = received.erase(i);
+ } else if (handler.accept(transfer)) {
+ received.erase(i);
+ return true;
+ } else {
+ ++i;
+ }
+ }
+ if (inUse) {
+ //someone is already waiting on the incoming session queue, wait for them to finish
+ lock.wait(deadline);
+ } else {
+ inUse = true;
+ ScopedRelease release(inUse, lock);
+ sys::Mutex::ScopedUnlock l(lock);
+ //wait for suitable new message to arrive
+ switch (process(&handler, get_duration(timeout, deadline))) {
+ case OK:
+ return true;
+ case CLOSED:
+ return false;
+ case EMPTY:
+ break;
+ }
+ }
+ if (handler.isClosed()) throw qpid::messaging::ReceiverError("Receiver has been closed");
+ } while (AbsTime::now() < deadline);
+ return false;
+}
+namespace {
+struct Wakeup : public qpid::types::Exception {};
+}
+
+void IncomingMessages::wakeup()
+{
+ sys::Mutex::ScopedLock l(lock);
+ incoming->close(qpid::sys::ExceptionHolder(new Wakeup()));
+ lock.notifyAll();
+}
+
+bool IncomingMessages::getNextDestination(std::string& destination, qpid::sys::Duration timeout)
+{
+ sys::Mutex::ScopedLock l(lock);
+ AbsTime deadline(AbsTime::now(), timeout);
+ while (received.empty()) {
+ if (inUse) {
+ //someone is already waiting on the sessions incoming queue
+ lock.wait(deadline);
+ } else {
+ inUse = true;
+ ScopedRelease release(inUse, lock);
+ sys::Mutex::ScopedUnlock l(lock);
+ //wait for an incoming message
+ wait(get_duration(timeout, deadline));
+ }
+ if (!(AbsTime::now() < deadline)) break;
+ }
+ if (!received.empty()) {
+ destination = received.front()->as<MessageTransferBody>()->getDestination();
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void IncomingMessages::accept()
+{
+ sys::Mutex::ScopedLock l(lock);
+ acceptTracker.accept(session);
+}
+
+void IncomingMessages::accept(qpid::framing::SequenceNumber id, bool cumulative)
+{
+ sys::Mutex::ScopedLock l(lock);
+ acceptTracker.accept(id, session, cumulative);
+}
+
+
+void IncomingMessages::releaseAll()
+{
+ {
+ //first process any received messages...
+ sys::Mutex::ScopedLock l(lock);
+ while (!received.empty()) {
+ retrieve(received.front(), 0);
+ received.pop_front();
+ }
+ }
+ //then pump out any available messages from incoming queue...
+ GetAny handler;
+ while (process(&handler, 0) == OK) ;
+ //now release all messages
+ sys::Mutex::ScopedLock l(lock);
+ acceptTracker.release(session);
+}
+
+void IncomingMessages::releasePending(const std::string& destination)
+{
+ //first pump all available messages from incoming to received...
+ while (process(0, 0) == OK) ;
+
+ //now remove all messages for this destination from received list, recording their ids...
+ sys::Mutex::ScopedLock l(lock);
+ MatchAndTrack match(destination);
+ for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i = match(*i) ? received.erase(i) : ++i) ;
+ //now release those messages
+ session.messageRelease(match.ids);
+}
+
+bool IncomingMessages::pop(FrameSet::shared_ptr& content, qpid::sys::Duration timeout)
+{
+ try {
+ return incoming->pop(content, timeout);
+ } catch (const Wakeup&) {
+ incoming->open();
+ return false;
+ }
+}
+
+/**
+ * Get a frameset that is accepted by the specified handler from
+ * session queue, waiting for up to the specified duration and
+ * returning true if this could be achieved, false otherwise. Messages
+ * that are not accepted by the handler are pushed onto received queue
+ * for later retrieval.
+ */
+IncomingMessages::ProcessState IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
+{
+ AbsTime deadline(AbsTime::now(), duration);
+ FrameSet::shared_ptr content;
+ try {
+ for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
+ if (content->isA<MessageTransferBody>()) {
+ MessageTransfer transfer(content, *this);
+ if (transfer.checkExpired()) {
+ QPID_LOG(debug, "Expired received transfer: " << *content->getMethod());
+ } else if (handler && handler->accept(transfer)) {
+ QPID_LOG(debug, "Delivered " << *content->getMethod() << " "
+ << *content->getHeaders());
+ return OK;
+ } else {
+ //received message for another destination, keep for later
+ QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+ sys::Mutex::ScopedLock l(lock);
+ received.push_back(content);
+ lock.notifyAll();
+ }
+ } else {
+ //TODO: handle other types of commands (e.g. message-accept, message-flow etc)
+ }
+ }
+ }
+ catch (const qpid::ClosedException&) { return CLOSED; }
+ return EMPTY;
+}
+
+bool IncomingMessages::wait(qpid::sys::Duration duration)
+{
+ AbsTime deadline(AbsTime::now(), duration);
+ FrameSet::shared_ptr content;
+ for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
+ if (content->isA<MessageTransferBody>()) {
+ QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+ sys::Mutex::ScopedLock l(lock);
+ received.push_back(content);
+ lock.notifyAll();
+ return true;
+ } else {
+ //TODO: handle other types of commands (e.g. message-accept, message-flow etc)
+ }
+ }
+ return false;
+}
+
+uint32_t IncomingMessages::pendingAccept()
+{
+ sys::Mutex::ScopedLock l(lock);
+ return acceptTracker.acceptsPending();
+}
+uint32_t IncomingMessages::pendingAccept(const std::string& destination)
+{
+ sys::Mutex::ScopedLock l(lock);
+ return acceptTracker.acceptsPending(destination);
+}
+
+uint32_t IncomingMessages::available()
+{
+ //first pump all available messages from incoming to received...
+ while (process(0, 0) == OK) {}
+ //return the count of received messages
+ sys::Mutex::ScopedLock l(lock);
+ return received.size();
+}
+
+uint32_t IncomingMessages::available(const std::string& destination)
+{
+ //first pump all available messages from incoming to received...
+ while (process(0, 0) == OK) {}
+
+ //count all messages for this destination from received list
+ sys::Mutex::ScopedLock l(lock);
+ return std::for_each(received.begin(), received.end(), Match(destination)).matched;
+}
+
+void populate(qpid::messaging::Message& message, FrameSet& command);
+
+/**
+ * Called when message is retrieved; records retrieval for subsequent
+ * acceptance, marks the command as completed and converts command to
+ * message if message is required
+ */
+void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* message)
+{
+ if (message) {
+ populate(*message, *command);
+ }
+ const MessageTransferBody* transfer = command->as<MessageTransferBody>();
+ if (transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
+ sys::Mutex::ScopedLock l(lock);
+ acceptTracker.delivered(transfer->getDestination(), command->getId());
+ }
+ session.markCompleted(command->getId(), false, false);
+}
+
+IncomingMessages::MessageTransfer::MessageTransfer(FrameSetPtr c, IncomingMessages& p) : content(c), parent(p) {}
+
+const std::string& IncomingMessages::MessageTransfer::getDestination()
+{
+ return content->as<MessageTransferBody>()->getDestination();
+}
+void IncomingMessages::MessageTransfer::retrieve(qpid::messaging::Message* message)
+{
+ parent.retrieve(content, message);
+}
+
+bool IncomingMessages::MessageTransfer::checkExpired()
+{
+ if (content->hasExpired()) {
+ retrieve(0);
+ parent.accept(content->getId(), false);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+namespace {
+//TODO: unify conversion to and from 0-10 message that is currently
+//split between IncomingMessages and OutgoingMessage
+const std::string SUBJECT("qpid.subject");
+
+const std::string X_APP_ID("x-amqp-0-10.app-id");
+const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
+const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
+const std::string X_TIMESTAMP("x-amqp-0-10.timestamp");
+}
+
+void populateHeaders(qpid::messaging::Message& message,
+ const DeliveryProperties* deliveryProperties,
+ const MessageProperties* messageProperties)
+{
+ if (deliveryProperties) {
+ message.setTtl(qpid::messaging::Duration(deliveryProperties->getTtl()));
+ message.setDurable(deliveryProperties->getDeliveryMode() == DELIVERY_MODE_PERSISTENT);
+ message.setPriority(deliveryProperties->getPriority());
+ message.setRedelivered(deliveryProperties->getRedelivered());
+ }
+ if (messageProperties) {
+ message.setContentType(messageProperties->getContentType());
+ if (messageProperties->hasReplyTo()) {
+ message.setReplyTo(AddressResolution::convert(messageProperties->getReplyTo()));
+ }
+ message.setSubject(messageProperties->getApplicationHeaders().getAsString(SUBJECT));
+ message.getProperties().clear();
+ translate(messageProperties->getApplicationHeaders(), message.getProperties());
+ message.setCorrelationId(messageProperties->getCorrelationId());
+ message.setUserId(messageProperties->getUserId());
+ if (messageProperties->hasMessageId()) {
+ message.setMessageId(messageProperties->getMessageId().str());
+ }
+ //expose 0-10 specific items through special properties:
+ // app-id, content-encoding
+ if (messageProperties->hasAppId()) {
+ message.getProperties()[X_APP_ID] = messageProperties->getAppId();
+ }
+ if (messageProperties->hasContentEncoding()) {
+ message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding();
+ }
+ // routing-key, timestamp, others?
+ if (deliveryProperties && deliveryProperties->hasRoutingKey()) {
+ message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey();
+ }
+ if (deliveryProperties && deliveryProperties->hasTimestamp()) {
+ message.getProperties()[X_TIMESTAMP] = deliveryProperties->getTimestamp();
+ }
+ }
+}
+
+void populateHeaders(qpid::messaging::Message& message, const AMQHeaderBody* headers)
+{
+ populateHeaders(message, headers->get<DeliveryProperties>(), headers->get<MessageProperties>());
+}
+
+void populate(qpid::messaging::Message& message, FrameSet& command)
+{
+ //need to be able to link the message back to the transfer it was delivered by
+ //e.g. for rejecting.
+ MessageImplAccess::get(message).setInternalId(command.getId());
+
+ message.setContent(command.getContent());
+
+ populateHeaders(message, command.getHeaders());
+}
+
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
new file mode 100644
index 0000000000..4c9ee68ece
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -0,0 +1,108 @@
+#ifndef QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H
+#define QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/sys/BlockingQueue.h"
+#include "qpid/sys/Time.h"
+#include "qpid/client/amqp0_10/AcceptTracker.h"
+
+namespace qpid {
+
+namespace framing{
+class FrameSet;
+}
+
+namespace messaging {
+class Message;
+}
+
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Queue of incoming messages.
+ */
+class IncomingMessages
+{
+ public:
+ typedef boost::shared_ptr<qpid::framing::FrameSet> FrameSetPtr;
+ class MessageTransfer
+ {
+ public:
+ const std::string& getDestination();
+ void retrieve(qpid::messaging::Message* message);
+ private:
+ FrameSetPtr content;
+ IncomingMessages& parent;
+ bool checkExpired();
+
+ MessageTransfer(FrameSetPtr, IncomingMessages&);
+ friend class IncomingMessages;
+ };
+
+ struct Handler
+ {
+ virtual ~Handler() {}
+ virtual bool accept(MessageTransfer& transfer) = 0;
+ virtual bool isClosed() { return false; }
+ };
+
+ IncomingMessages();
+ void setSession(qpid::client::AsyncSession session);
+ bool get(Handler& handler, qpid::sys::Duration timeout);
+ void wakeup();
+ bool getNextDestination(std::string& destination, qpid::sys::Duration timeout);
+ void accept();
+ void accept(qpid::framing::SequenceNumber id, bool cumulative);
+ void releaseAll();
+ void releasePending(const std::string& destination);
+
+ uint32_t pendingAccept();
+ uint32_t pendingAccept(const std::string& destination);
+
+ uint32_t available();
+ uint32_t available(const std::string& destination);
+ private:
+ typedef std::deque<FrameSetPtr> FrameSetQueue;
+ enum ProcessState {EMPTY=0,OK=1,CLOSED=2};
+
+ sys::Monitor lock;
+ qpid::client::AsyncSession session;
+ boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming;
+ bool inUse;
+ FrameSetQueue received;
+ AcceptTracker acceptTracker;
+
+ ProcessState process(Handler*, qpid::sys::Duration);
+ bool wait(qpid::sys::Duration);
+ bool pop(FrameSetPtr&, qpid::sys::Duration);
+
+ void retrieve(FrameSetPtr, qpid::messaging::Message*);
+
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H*/
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h b/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
new file mode 100644
index 0000000000..d66d2ecb3c
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
@@ -0,0 +1,52 @@
+#ifndef QPID_CLIENT_AMQP0_10_MESSAGESINK_H
+#define QPID_CLIENT_AMQP0_10_MESSAGESINK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/client/AsyncSession.h"
+
+namespace qpid {
+
+namespace messaging {
+class Message;
+}
+
+namespace client {
+namespace amqp0_10 {
+
+class OutgoingMessage;
+
+/**
+ *
+ */
+class MessageSink
+{
+ public:
+ virtual ~MessageSink() {}
+ virtual void declare(qpid::client::AsyncSession& session, const std::string& name) = 0;
+ virtual void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message) = 0;
+ virtual void cancel(qpid::client::AsyncSession& session, const std::string& name) = 0;
+ private:
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_MESSAGESINK_H*/
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h b/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h
new file mode 100644
index 0000000000..74f2732f59
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h
@@ -0,0 +1,47 @@
+#ifndef QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H
+#define QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/client/AsyncSession.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Abstraction behind which the AMQP 0-10 commands required to
+ * establish (and tear down) an incoming stream of messages from a
+ * given address are hidden.
+ */
+class MessageSource
+{
+ public:
+ virtual ~MessageSource() {}
+ virtual void subscribe(qpid::client::AsyncSession& session, const std::string& destination) = 0;
+ virtual void cancel(qpid::client::AsyncSession& session, const std::string& destination) = 0;
+
+ private:
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H*/
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
new file mode 100644
index 0000000000..f2b205a78a
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
@@ -0,0 +1,170 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/OutgoingMessage.h"
+#include "qpid/client/amqp0_10/AddressResolution.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/types/encodings.h"
+#include "qpid/types/Variant.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageImpl.h"
+#include "qpid/framing/enum.h"
+#include "qpid/log/Statement.h"
+#include <sstream>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::messaging::Address;
+using qpid::messaging::MessageImplAccess;
+using qpid::types::Variant;
+using namespace qpid::framing::message;
+using namespace qpid::amqp_0_10;
+
+namespace {
+//TODO: unify conversion to and from 0-10 message that is currently
+//split between IncomingMessages and OutgoingMessage
+const std::string SUBJECT("qpid.subject");
+const std::string X_APP_ID("x-amqp-0-10.app-id");
+const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
+const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
+const std::string TEXT_PLAIN("text/plain");
+}
+
+void OutgoingMessage::convert(const qpid::messaging::Message& from)
+{
+ //TODO: need to avoid copying as much as possible
+ if (from.getContentObject().getType() == qpid::types::VAR_MAP) {
+ std::string content;
+ qpid::amqp_0_10::MapCodec::encode(from.getContentObject().asMap(), content);
+ message.getMessageProperties().setContentType(qpid::amqp_0_10::MapCodec::contentType);
+ message.setData(content);
+ } else if (from.getContentObject().getType() == qpid::types::VAR_LIST) {
+ std::string content;
+ qpid::amqp_0_10::ListCodec::encode(from.getContentObject().asList(), content);
+ message.getMessageProperties().setContentType(qpid::amqp_0_10::ListCodec::contentType);
+ message.setData(content);
+ } else if (from.getContentObject().getType() == qpid::types::VAR_STRING &&
+ (from.getContentObject().getEncoding() == qpid::types::encodings::UTF8 || from.getContentObject().getEncoding() == qpid::types::encodings::ASCII)) {
+ message.getMessageProperties().setContentType(TEXT_PLAIN);
+ message.setData(from.getContent());
+ } else {
+ message.setData(from.getContent());
+ message.getMessageProperties().setContentType(from.getContentType());
+ }
+ if ( !from.getCorrelationId().empty() )
+ message.getMessageProperties().setCorrelationId(from.getCorrelationId());
+ message.getMessageProperties().setUserId(from.getUserId());
+ const Address& address = from.getReplyTo();
+ if (address) {
+ message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
+ }
+ if (!subject.empty()) {
+ Variant v(subject); v.setEncoding("utf8");
+ translate(from.getProperties(), SUBJECT, v, message.getMessageProperties().getApplicationHeaders());
+ } else {
+ translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders());
+ }
+ if (from.getTtl().getMilliseconds()) {
+ message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
+ }
+ if (from.getDurable()) {
+ message.getDeliveryProperties().setDeliveryMode(DELIVERY_MODE_PERSISTENT);
+ }
+ if (from.getRedelivered()) {
+ message.getDeliveryProperties().setRedelivered(true);
+ }
+ if (from.getPriority()) message.getDeliveryProperties().setPriority(from.getPriority());
+
+ //allow certain 0-10 specific items to be set through special properties:
+ // message-id, app-id, content-encoding
+ if (from.getMessageId().size()) {
+ qpid::framing::Uuid uuid;
+ std::istringstream data(from.getMessageId());
+ data >> uuid;
+ message.getMessageProperties().setMessageId(uuid);
+ }
+ Variant::Map::const_iterator i;
+ i = from.getProperties().find(X_APP_ID);
+ if (i != from.getProperties().end()) {
+ message.getMessageProperties().setAppId(i->second.asString());
+ }
+ i = from.getProperties().find(X_CONTENT_ENCODING);
+ if (i != from.getProperties().end()) {
+ message.getMessageProperties().setContentEncoding(i->second.asString());
+ }
+ base = qpid::sys::now();
+}
+
+void OutgoingMessage::setSubject(const std::string& s)
+{
+ subject = s;
+}
+
+std::string OutgoingMessage::getSubject() const
+{
+ return subject;
+}
+
+void OutgoingMessage::send(qpid::client::AsyncSession& session, const std::string& destination, const std::string& routingKey)
+{
+ if (!expired) {
+ message.getDeliveryProperties().setRoutingKey(routingKey);
+ status = session.messageTransfer(arg::destination=destination, arg::content=message);
+ if (destination.empty()) {
+ QPID_LOG(debug, "Sending to queue " << routingKey << " " << message.getMessageProperties() << " " << message.getDeliveryProperties());
+ } else {
+ QPID_LOG(debug, "Sending to exchange " << destination << " " << message.getMessageProperties() << " " << message.getDeliveryProperties());
+ }
+ }
+}
+void OutgoingMessage::send(qpid::client::AsyncSession& session, const std::string& routingKey)
+{
+ send(session, std::string(), routingKey);
+}
+
+bool OutgoingMessage::isComplete()
+{
+ return expired || (status.isValid() && status.isComplete());
+}
+void OutgoingMessage::markRedelivered()
+{
+ message.setRedelivered(true);
+ if (message.getDeliveryProperties().hasTtl()) {
+ uint64_t delta = qpid::sys::Duration(base, qpid::sys::now())/qpid::sys::TIME_MSEC;
+ uint64_t ttl = message.getDeliveryProperties().getTtl();
+ if (ttl <= delta) {
+ QPID_LOG(debug, "Expiring outgoing message (" << ttl << " < " << delta << ")");
+ expired = true;
+ message.getDeliveryProperties().setTtl(1);
+ } else {
+ QPID_LOG(debug, "Adjusting ttl on outgoing message from " << ttl << " by " << delta);
+ ttl = ttl - delta;
+ message.getDeliveryProperties().setTtl(ttl);
+ }
+ }
+}
+OutgoingMessage::OutgoingMessage() : expired (false) {}
+
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
new file mode 100644
index 0000000000..a17ef03e10
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
@@ -0,0 +1,60 @@
+#ifndef QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H
+#define QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/Completion.h"
+#include "qpid/client/Message.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+namespace messaging {
+class Message;
+}
+namespace client {
+namespace amqp0_10 {
+
+class OutgoingMessage
+{
+ private:
+ qpid::client::Message message;
+ qpid::client::Completion status;
+ std::string subject;
+ qpid::sys::AbsTime base;
+ bool expired;
+
+ public:
+ OutgoingMessage();
+ void convert(const qpid::messaging::Message&);
+ void setSubject(const std::string& subject);
+ std::string getSubject() const;
+ void send(qpid::client::AsyncSession& session, const std::string& destination, const std::string& routingKey);
+ void send(qpid::client::AsyncSession& session,const std::string& routingKey);
+ bool isComplete();
+ void markRedelivered();
+};
+
+
+
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H*/
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
new file mode 100644
index 0000000000..c356bc298b
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -0,0 +1,263 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ReceiverImpl.h"
+#include "AddressResolution.h"
+#include "MessageSource.h"
+#include "SessionImpl.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/types/encodings.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::messaging::NoMessageAvailable;
+using qpid::messaging::Receiver;
+using qpid::messaging::Duration;
+
+void ReceiverImpl::received(qpid::messaging::Message&)
+{
+ //TODO: should this be configurable
+ sys::Mutex::ScopedLock l(lock);
+ if (capacity && --window <= capacity/2) {
+ session.sendCompletion();
+ window = capacity;
+ }
+}
+
+qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout)
+{
+ qpid::messaging::Message result;
+ if (!get(result, timeout)) throw NoMessageAvailable();
+ return result;
+}
+
+qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout)
+{
+ qpid::messaging::Message result;
+ if (!fetch(result, timeout)) throw NoMessageAvailable();
+ return result;
+}
+
+bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
+{
+ Get f(*this, message, timeout);
+ while (!parent->execute(f)) {}
+ return f.result;
+}
+
+bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
+{
+ Fetch f(*this, message, timeout);
+ while (!parent->execute(f)) {}
+ return f.result;
+}
+
+void ReceiverImpl::close()
+{
+ execute<Close>();
+}
+
+void ReceiverImpl::start()
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (state == STOPPED) {
+ state = STARTED;
+ startFlow(l);
+ session.sendCompletion();
+ }
+}
+
+void ReceiverImpl::stop()
+{
+ sys::Mutex::ScopedLock l(lock);
+ state = STOPPED;
+ session.messageStop(destination);
+}
+
+void ReceiverImpl::setCapacity(uint32_t c)
+{
+ execute1<SetCapacity>(c);
+}
+
+void ReceiverImpl::startFlow(const sys::Mutex::ScopedLock&)
+{
+ if (capacity > 0) {
+ session.messageSetFlowMode(destination, FLOW_MODE_WINDOW);
+ session.messageFlow(destination, CREDIT_UNIT_MESSAGE, capacity);
+ session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
+ window = capacity;
+ }
+}
+
+void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
+{
+ sys::Mutex::ScopedLock l(lock);
+ session = s;
+ if (state == CANCELLED) return;
+ if (state == UNRESOLVED) {
+ source = resolver.resolveSource(session, address);
+ assert(source.get());
+ state = STARTED;
+ }
+ source->subscribe(session, destination);
+ startFlow(l);
+}
+
+const std::string& ReceiverImpl::getName() const {
+ return destination;
+}
+
+uint32_t ReceiverImpl::getCapacity()
+{
+ sys::Mutex::ScopedLock l(lock);
+ return capacity;
+}
+
+uint32_t ReceiverImpl::getAvailable()
+{
+ return parent->getReceivable(destination);
+}
+
+uint32_t ReceiverImpl::getUnsettled()
+{
+ return parent->getUnsettledAcks(destination);
+}
+
+qpid::messaging::Address ReceiverImpl::getAddress() const
+{
+ return address;
+}
+
+ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
+ const qpid::messaging::Address& a, bool autoDecode_) :
+
+ parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), autoDecode(autoDecode_),
+ state(UNRESOLVED), capacity(0), window(0) {}
+
+namespace {
+const std::string TEXT_PLAIN("text/plain");
+}
+
+bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
+{
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (state == CANCELLED) return false;
+ }
+ if (parent->get(*this, message, timeout)) {
+ if (autoDecode) {
+ if (message.getContentType() == qpid::amqp_0_10::MapCodec::contentType) {
+ message.getContentObject() = qpid::types::Variant::Map();
+ decode(message, message.getContentObject().asMap());
+ } else if (message.getContentType() == qpid::amqp_0_10::ListCodec::contentType) {
+ message.getContentObject() = qpid::types::Variant::List();
+ decode(message, message.getContentObject().asList());
+ } else if (!message.getContentBytes().empty()) {
+ message.getContentObject() = message.getContentBytes();
+ if (message.getContentType() == TEXT_PLAIN) {
+ message.getContentObject().setEncoding(qpid::types::encodings::UTF8);
+ } else {
+ message.getContentObject().setEncoding(qpid::types::encodings::BINARY);
+ }
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
+{
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (state == CANCELLED) return false;
+
+ if (capacity == 0 || state != STARTED) {
+ session.messageSetFlowMode(destination, FLOW_MODE_CREDIT);
+ session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
+ session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF);
+ }
+ }
+ if (getImpl(message, timeout)) {
+ return true;
+ } else {
+ qpid::client::Session s;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (state == CANCELLED) return false; // Might have been closed during get.
+ s = sync(session);
+ }
+ s.messageFlush(destination);
+ {
+ sys::Mutex::ScopedLock l(lock);
+ startFlow(l); //reallocate credit
+ session.sendCompletion();//ensure previously received messages are signalled as completed
+ }
+ return getImpl(message, Duration::IMMEDIATE);
+ }
+}
+
+void ReceiverImpl::closeImpl()
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (state != CANCELLED) {
+ state = CANCELLED;
+ sync(session).messageStop(destination);
+ {
+ sys::Mutex::ScopedUnlock l(lock);
+ parent->releasePending(destination);
+ }
+ source->cancel(session, destination);
+ {
+ sys::Mutex::ScopedUnlock l(lock);
+ parent->receiverCancelled(destination);
+ }
+ }
+}
+
+bool ReceiverImpl::isClosed() const {
+ sys::Mutex::ScopedLock l(lock);
+ return state == CANCELLED;
+}
+
+void ReceiverImpl::setCapacityImpl(uint32_t c)
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (c != capacity) {
+ capacity = c;
+ if (state == STARTED) {
+ session.messageStop(destination);
+ startFlow(l);
+ }
+ }
+}
+
+qpid::messaging::Session ReceiverImpl::getSession() const
+{
+ return qpid::messaging::Session(parent.get());
+}
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
new file mode 100644
index 0000000000..0d3366907b
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
@@ -0,0 +1,152 @@
+#ifndef QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H
+#define QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/ReceiverImpl.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/amqp0_10/SessionImpl.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/intrusive_ptr.hpp>
+#include <memory>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+class AddressResolution;
+class MessageSource;
+
+/**
+ * A receiver implementation based on an AMQP 0-10 subscription.
+ */
+class ReceiverImpl : public qpid::messaging::ReceiverImpl
+{
+ public:
+
+ enum State {UNRESOLVED, STOPPED, STARTED, CANCELLED};
+
+ ReceiverImpl(SessionImpl& parent, const std::string& name,
+ const qpid::messaging::Address& address, bool autoDecode);
+
+ void init(qpid::client::AsyncSession session, AddressResolution& resolver);
+ bool get(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ qpid::messaging::Message get(qpid::messaging::Duration timeout);
+ bool fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ qpid::messaging::Message fetch(qpid::messaging::Duration timeout);
+ void close();
+ void start();
+ void stop();
+ const std::string& getName() const;
+ void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t getAvailable();
+ uint32_t getUnsettled();
+ void received(qpid::messaging::Message& message);
+ qpid::messaging::Session getSession() const;
+ bool isClosed() const;
+ qpid::messaging::Address getAddress() const;
+
+ private:
+ mutable sys::Mutex lock;
+ boost::intrusive_ptr<SessionImpl> parent;
+ const std::string destination;
+ const qpid::messaging::Address address;
+ const uint32_t byteCredit;
+ const bool autoDecode;
+ State state;
+
+ std::auto_ptr<MessageSource> source;
+ uint32_t capacity;
+ qpid::client::AsyncSession session;
+ uint32_t window;
+
+ void startFlow(const sys::Mutex::ScopedLock&); // Dummy param, call with lock held
+ //implementation of public facing methods
+ bool fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ bool getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ void closeImpl();
+ void setCapacityImpl(uint32_t);
+
+ //functors for public facing methods.
+ struct Command
+ {
+ ReceiverImpl& impl;
+
+ Command(ReceiverImpl& i) : impl(i) {}
+ };
+
+ struct Get : Command
+ {
+ qpid::messaging::Message& message;
+ qpid::messaging::Duration timeout;
+ bool result;
+
+ Get(ReceiverImpl& i, qpid::messaging::Message& m, qpid::messaging::Duration t) :
+ Command(i), message(m), timeout(t), result(false) {}
+ void operator()() { result = impl.getImpl(message, timeout); }
+ };
+
+ struct Fetch : Command
+ {
+ qpid::messaging::Message& message;
+ qpid::messaging::Duration timeout;
+ bool result;
+
+ Fetch(ReceiverImpl& i, qpid::messaging::Message& m, qpid::messaging::Duration t) :
+ Command(i), message(m), timeout(t), result(false) {}
+ void operator()() { result = impl.fetchImpl(message, timeout); }
+ };
+
+ struct Close : Command
+ {
+ Close(ReceiverImpl& i) : Command(i) {}
+ void operator()() { impl.closeImpl(); }
+ };
+
+ struct SetCapacity : Command
+ {
+ uint32_t capacity;
+
+ SetCapacity(ReceiverImpl& i, uint32_t c) : Command(i), capacity(c) {}
+ void operator()() { impl.setCapacityImpl(capacity); }
+ };
+
+ //helper templates for some common patterns
+ template <class F> void execute()
+ {
+ F f(*this);
+ parent->execute(f);
+ }
+
+ template <class F, class P> void execute1(P p)
+ {
+ F f(*this, p);
+ parent->execute(f);
+ }
+};
+
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H*/
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
new file mode 100644
index 0000000000..7575aaa306
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -0,0 +1,206 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SenderImpl.h"
+#include "MessageSink.h"
+#include "SessionImpl.h"
+#include "AddressResolution.h"
+#include "OutgoingMessage.h"
+#include "qpid/messaging/Session.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
+ const qpid::messaging::Address& _address, bool _autoReconnect) :
+ parent(&_parent), autoReconnect(_autoReconnect), name(_name), address(_address), state(UNRESOLVED),
+ capacity(50), window(0), flushed(false), unreliable(AddressResolution::is_unreliable(address)) {}
+
+qpid::messaging::Address SenderImpl::getAddress() const
+{
+ return address;
+}
+
+void SenderImpl::send(const qpid::messaging::Message& message, bool sync)
+{
+ if (unreliable) { // immutable, don't need lock
+ UnreliableSend f(*this, message);
+ parent->execute(f);
+ } else {
+ Send f(*this, message);
+ while (f.repeat) parent->execute(f);
+ }
+ if (sync) parent->sync(true);
+}
+
+void SenderImpl::close()
+{
+ execute<Close>();
+}
+
+void SenderImpl::setCapacity(uint32_t c)
+{
+ bool flush;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ flush = c < capacity;
+ capacity = c;
+ }
+ execute1<CheckPendingSends>(flush);
+}
+
+uint32_t SenderImpl::getCapacity() {
+ sys::Mutex::ScopedLock l(lock);
+ return capacity;
+}
+
+uint32_t SenderImpl::getUnsettled()
+{
+ CheckPendingSends f(*this, false);
+ parent->execute(f);
+ return f.pending;
+}
+
+void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
+{
+ sys::Mutex::ScopedLock l(lock);
+ session = s;
+ if (state == UNRESOLVED) {
+ sink = resolver.resolveSink(session, address);
+ state = ACTIVE;
+ }
+ if (state == CANCELLED) {
+ sink->cancel(session, name);
+ sys::Mutex::ScopedUnlock u(lock);
+ parent->senderCancelled(name);
+ } else {
+ sink->declare(session, name);
+ replay(l);
+ }
+}
+
+void SenderImpl::waitForCapacity()
+{
+ sys::Mutex::ScopedLock l(lock);
+ try {
+ //TODO: add option to throw exception rather than blocking?
+ if (!unreliable && capacity <=
+ (flushed ? checkPendingSends(false, l) : outgoing.size()))
+ {
+ //Initial implementation is very basic. As outgoing is
+ //currently only reduced on receiving completions and we are
+ //blocking anyway we may as well sync(). If successful that
+ //should clear all outstanding sends.
+ session.sync();
+ checkPendingSends(false, l);
+ }
+ //flush periodically and check for conmpleted sends
+ if (++window > (capacity / 4)) {//TODO: make this configurable?
+ checkPendingSends(true, l);
+ window = 0;
+ }
+ } catch (const qpid::TransportFailure&) {
+ //Disconnection prevents flushing or syncing. If we have any
+ //capacity we will return anyway (the subsequent attempt to
+ //send will fail, but message will be on replay buffer).
+ if (capacity > outgoing.size()) return;
+ //If we are out of capacity, but autoreconnect is on, then
+ //rethrow the transport failure to trigger reconnect which
+ //will have the effect of blocking until connected and
+ //capacity is freed up
+ if (autoReconnect) throw;
+ //Otherwise, in order to clearly signal to the application
+ //that the message was not pushed to replay buffer, throw an
+ //out of capacity error
+ throw qpid::messaging::OutOfCapacity(name);
+ }
+}
+
+void SenderImpl::sendImpl(const qpid::messaging::Message& m)
+{
+ sys::Mutex::ScopedLock l(lock);
+ std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
+ msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
+ msg->convert(m);
+ outgoing.push_back(msg.release());
+ sink->send(session, name, outgoing.back());
+}
+
+void SenderImpl::sendUnreliable(const qpid::messaging::Message& m)
+{
+ sys::Mutex::ScopedLock l(lock);
+ OutgoingMessage msg;
+ msg.setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
+ msg.convert(m);
+ sink->send(session, name, msg);
+}
+
+void SenderImpl::replay(const sys::Mutex::ScopedLock& l)
+{
+ checkPendingSends(false, l);
+ for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
+ i->markRedelivered();
+ sink->send(session, name, *i);
+ }
+}
+
+uint32_t SenderImpl::checkPendingSends(bool flush) {
+ sys::Mutex::ScopedLock l(lock);
+ return checkPendingSends(flush, l);
+}
+
+uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&)
+{
+ if (flush) {
+ session.flush();
+ flushed = true;
+ } else {
+ flushed = false;
+ }
+ while (!outgoing.empty() && outgoing.front().isComplete()) {
+ outgoing.pop_front();
+ }
+ return outgoing.size();
+}
+
+void SenderImpl::closeImpl()
+{
+ {
+ sys::Mutex::ScopedLock l(lock);
+ state = CANCELLED;
+ sink->cancel(session, name);
+ }
+ parent->senderCancelled(name);
+}
+
+const std::string& SenderImpl::getName() const
+{
+ sys::Mutex::ScopedLock l(lock);
+ return name;
+}
+
+qpid::messaging::Session SenderImpl::getSession() const
+{
+ sys::Mutex::ScopedLock l(lock);
+ return qpid::messaging::Session(parent.get());
+}
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
new file mode 100644
index 0000000000..35ce82cf5d
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
@@ -0,0 +1,162 @@
+#ifndef QPID_CLIENT_AMQP0_10_SENDERIMPL_H
+#define QPID_CLIENT_AMQP0_10_SENDERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/SenderImpl.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/amqp0_10/SessionImpl.h"
+#include <memory>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/ptr_container/ptr_deque.hpp>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+class AddressResolution;
+class MessageSink;
+class OutgoingMessage;
+
+/**
+ *
+ */
+class SenderImpl : public qpid::messaging::SenderImpl
+{
+ public:
+ enum State {UNRESOLVED, ACTIVE, CANCELLED};
+
+ SenderImpl(SessionImpl& parent, const std::string& name,
+ const qpid::messaging::Address& address, bool autoReconnect);
+ void send(const qpid::messaging::Message&, bool sync);
+ void close();
+ void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t getUnsettled();
+ void init(qpid::client::AsyncSession, AddressResolution&);
+ const std::string& getName() const;
+ qpid::messaging::Session getSession() const;
+ qpid::messaging::Address getAddress() const;
+
+ private:
+ mutable sys::Mutex lock;
+ boost::intrusive_ptr<SessionImpl> parent;
+ const bool autoReconnect;
+ const std::string name;
+ const qpid::messaging::Address address;
+ State state;
+ std::auto_ptr<MessageSink> sink;
+
+ qpid::client::AsyncSession session;
+ std::string destination;
+ std::string routingKey;
+
+ typedef boost::ptr_deque<OutgoingMessage> OutgoingMessages;
+ OutgoingMessages outgoing;
+ uint32_t capacity;
+ uint32_t window;
+ bool flushed;
+ const bool unreliable;
+
+ uint32_t checkPendingSends(bool flush);
+ // Dummy ScopedLock parameter means call with lock held
+ uint32_t checkPendingSends(bool flush, const sys::Mutex::ScopedLock&);
+ void replay(const sys::Mutex::ScopedLock&);
+ void waitForCapacity();
+
+ //logic for application visible methods:
+ void sendImpl(const qpid::messaging::Message&);
+ void sendUnreliable(const qpid::messaging::Message&);
+ void closeImpl();
+
+
+ //functors for application visible methods (allowing locking and
+ //retry to be centralised):
+ struct Command
+ {
+ SenderImpl& impl;
+
+ Command(SenderImpl& i) : impl(i) {}
+ };
+
+ struct Send : Command
+ {
+ const qpid::messaging::Message& message;
+ bool repeat;
+
+ Send(SenderImpl& i, const qpid::messaging::Message& m) : Command(i), message(m), repeat(true) {}
+ void operator()()
+ {
+ impl.waitForCapacity();
+ //from this point message will be recorded if there is any
+ //failure (and replayed) so need not repeat the call
+ repeat = false;
+ impl.sendImpl(message);
+ }
+ };
+
+ struct UnreliableSend : Command
+ {
+ const qpid::messaging::Message& message;
+
+ UnreliableSend(SenderImpl& i, const qpid::messaging::Message& m) : Command(i), message(m) {}
+ void operator()()
+ {
+ //TODO: ideally want to put messages on the outbound
+ //queue and pull them off in io thread, but the old
+ //0-10 client doesn't support that option so for now
+ //we simply don't queue unreliable messages
+ impl.sendUnreliable(message);
+ }
+ };
+
+ struct Close : Command
+ {
+ Close(SenderImpl& i) : Command(i) {}
+ void operator()() { impl.closeImpl(); }
+ };
+
+ struct CheckPendingSends : Command
+ {
+ bool flush;
+ uint32_t pending;
+ CheckPendingSends(SenderImpl& i, bool f) : Command(i), flush(f), pending(0) {}
+ void operator()() { pending = impl.checkPendingSends(flush); }
+ };
+
+ //helper templates for some common patterns
+ template <class F> void execute()
+ {
+ F f(*this);
+ parent->execute(f);
+ }
+
+ template <class F, class P> bool execute1(P p)
+ {
+ F f(*this, p);
+ return parent->execute(f);
+ }
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_SENDERIMPL_H*/
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
new file mode 100644
index 0000000000..1e2b68b24e
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -0,0 +1,606 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/SessionImpl.h"
+#include "qpid/client/amqp0_10/ConnectionImpl.h"
+#include "qpid/client/amqp0_10/ReceiverImpl.h"
+#include "qpid/client/amqp0_10/SenderImpl.h"
+#include "qpid/client/amqp0_10/MessageSource.h"
+#include "qpid/client/amqp0_10/MessageSink.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/messaging/PrivateImplRef.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageImpl.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/framing/enum.h"
+#include <boost/format.hpp>
+#include <boost/function.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+using qpid::messaging::KeyError;
+using qpid::messaging::NoMessageAvailable;
+using qpid::messaging::MessagingException;
+using qpid::messaging::TransactionError;
+using qpid::messaging::TransactionAborted;
+using qpid::messaging::TransactionUnknown;
+using qpid::messaging::SessionError;
+using qpid::messaging::MessageImplAccess;
+using qpid::messaging::Sender;
+using qpid::messaging::Receiver;
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+typedef qpid::sys::Mutex::ScopedLock ScopedLock;
+typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock;
+
+SessionImpl::SessionImpl(ConnectionImpl& c, bool t) :
+ connection(&c), transactional(t), committing(false) {}
+
+bool SessionImpl::isTransactional() const
+{
+ return transactional;
+}
+
+void SessionImpl::checkError()
+{
+ ScopedLock l(lock);
+ txError.raise();
+ qpid::client::SessionBase_0_10Access s(session);
+ try {
+ s.get()->assertOpen();
+ } catch (const qpid::TransportFailure&) {
+ throw qpid::messaging::TransportFailure(std::string());
+ } catch (const qpid::framing::ResourceLimitExceededException& e) {
+ throw qpid::messaging::TargetCapacityExceeded(e.what());
+ } catch (const qpid::framing::UnauthorizedAccessException& e) {
+ throw qpid::messaging::UnauthorizedAccess(e.what());
+ } catch (const qpid::framing::NotFoundException& e) {
+ throw qpid::messaging::NotFound(e.what());
+ } catch (const qpid::framing::ResourceDeletedException& e) {
+ throw qpid::messaging::NotFound(e.what());
+ } catch (const qpid::SessionException& e) {
+ throw qpid::messaging::SessionError(e.what());
+ } catch (const qpid::ConnectionException& e) {
+ throw qpid::messaging::ConnectionError(e.what());
+ } catch (const qpid::Exception& e) {
+ throw qpid::messaging::MessagingException(e.what());
+ }
+}
+
+bool SessionImpl::hasError()
+{
+ ScopedLock l(lock);
+ qpid::client::SessionBase_0_10Access s(session);
+ return s.get()->hasError();
+}
+
+void SessionImpl::sync(bool block)
+{
+ if (block) retry<Sync>();
+ else execute<NonBlockingSync>();
+}
+
+namespace {
+struct ScopedSet {
+ bool& flag;
+ ScopedSet(bool& f) : flag(f) { flag = true; }
+ ~ScopedSet() { flag = false; }
+};
+}
+
+void SessionImpl::commit()
+{
+ try {
+ checkError();
+ ScopedSet s(committing);
+ execute<Commit>();
+ }
+ catch (const TransactionError&) {
+ assert(txError); // Must be set by thrower of TransactionError
+ }
+ catch (const std::exception& e) {
+ txError = new TransactionAborted(Msg() << "Transaction aborted: " << e.what());
+ }
+ checkError();
+}
+
+void SessionImpl::rollback()
+{
+ //If the session fails during this operation, the transaction will
+ //be rolled back anyway.
+ execute<Rollback>();
+}
+
+void SessionImpl::acknowledge(bool sync_)
+{
+ //Should probably throw an exception on failure here, or indicate
+ //it through a return type at least. Failure means that the
+ //message may be redelivered; i.e. the application cannot delete
+ //any state necessary for preventing reprocessing of the message
+ execute<Acknowledge>();
+ sync(sync_);
+}
+
+void SessionImpl::reject(qpid::messaging::Message& m)
+{
+ //Possibly want to somehow indicate failure here as well. Less
+ //clear need as compared to acknowledge however.
+ execute1<Reject>(m);
+}
+
+void SessionImpl::release(qpid::messaging::Message& m)
+{
+ execute1<Release>(m);
+}
+
+void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative)
+{
+ //Should probably throw an exception on failure here, or indicate
+ //it through a return type at least. Failure means that the
+ //message may be redelivered; i.e. the application cannot delete
+ //any state necessary for preventing reprocessing of the message
+ Acknowledge2 ack(*this, m, cumulative);
+ execute(ack);
+}
+
+void SessionImpl::close()
+{
+ if (hasError()) {
+ ScopedLock l(lock);
+ senders.clear();
+ receivers.clear();
+ } else {
+ Senders sCopy;
+ Receivers rCopy;
+ {
+ ScopedLock l(lock);
+ senders.swap(sCopy);
+ receivers.swap(rCopy);
+ }
+ for (Senders::iterator i = sCopy.begin(); i != sCopy.end(); ++i)
+ {
+ // outside the lock, will call senderCancelled
+ i->second.close();
+ }
+ for (Receivers::iterator i = rCopy.begin(); i != rCopy.end(); ++i)
+ {
+ // outside the lock, will call receiverCancelled
+ i->second.close();
+ }
+ }
+ connection->closed(*this);
+ if (!hasError()) {
+ ScopedLock l(lock);
+ session.close();
+ }
+}
+
+template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t)
+{
+ return boost::dynamic_pointer_cast<S>(qpid::messaging::PrivateImplRef<T>::get(t));
+}
+
+template <class T> void getFreeKey(std::string& key, T& map)
+{
+ std::string name = key;
+ int count = 1;
+ for (typename T::const_iterator i = map.find(name); i != map.end(); i = map.find(name)) {
+ name = (boost::format("%1%_%2%") % key % ++count).str();
+ }
+ key = name;
+}
+
+void SessionImpl::setSession(qpid::client::Session s)
+{
+ session = s;
+ incoming.setSession(session);
+ if (transactional) {
+ session.txSelect();
+ }
+ for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+ getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver);
+ }
+ for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
+ getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver);
+ }
+ session.sync();
+}
+
+struct SessionImpl::CreateReceiver : Command
+{
+ qpid::messaging::Receiver result;
+ const qpid::messaging::Address& address;
+
+ CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) :
+ Command(i), address(a) {}
+ void operator()() { result = impl.createReceiverImpl(address); }
+};
+
+Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address)
+{
+ return get1<CreateReceiver, Receiver>(address);
+}
+
+Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address)
+{
+ ScopedLock l(lock);
+ std::string name = address.getName();
+ getFreeKey(name, receivers);
+ Receiver receiver(new ReceiverImpl(*this, name, address, connection->getAutoDecode()));
+ getImplPtr<Receiver, ReceiverImpl>(receiver)->init(session, resolver);
+ receivers[name] = receiver;
+ return receiver;
+}
+
+struct SessionImpl::CreateSender : Command
+{
+ qpid::messaging::Sender result;
+ const qpid::messaging::Address& address;
+
+ CreateSender(SessionImpl& i, const qpid::messaging::Address& a) :
+ Command(i), address(a) {}
+ void operator()() { result = impl.createSenderImpl(address); }
+};
+
+Sender SessionImpl::createSender(const qpid::messaging::Address& address)
+{
+ return get1<CreateSender, Sender>(address);
+}
+
+Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address)
+{
+ ScopedLock l(lock);
+ std::string name = address.getName();
+ getFreeKey(name, senders);
+ Sender sender(new SenderImpl(*this, name, address, connection->getAutoReconnect()));
+ getImplPtr<Sender, SenderImpl>(sender)->init(session, resolver);
+ senders[name] = sender;
+ return sender;
+}
+
+Sender SessionImpl::getSender(const std::string& name) const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ Senders::const_iterator i = senders.find(name);
+ if (i == senders.end()) {
+ throw KeyError(name);
+ } else {
+ return i->second;
+ }
+}
+
+Receiver SessionImpl::getReceiver(const std::string& name) const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ Receivers::const_iterator i = receivers.find(name);
+ if (i == receivers.end()) {
+ throw KeyError(name);
+ } else {
+ return i->second;
+ }
+}
+
+SessionImpl& SessionImpl::convert(qpid::messaging::Session& s)
+{
+ boost::intrusive_ptr<SessionImpl> impl = getImplPtr<qpid::messaging::Session, SessionImpl>(s);
+ if (!impl) {
+ throw SessionError(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl"));
+ }
+ return *impl;
+}
+
+namespace {
+
+struct IncomingMessageHandler : IncomingMessages::Handler
+{
+ typedef boost::function1<bool, IncomingMessages::MessageTransfer&> Callback;
+ Callback callback;
+ ReceiverImpl* receiver;
+
+ IncomingMessageHandler(Callback c) : callback(c), receiver(0) {}
+
+ bool accept(IncomingMessages::MessageTransfer& transfer)
+ {
+ return callback(transfer);
+ }
+
+ bool isClosed()
+ {
+ return receiver && receiver->isClosed();
+ }
+};
+
+}
+
+
+bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageTransfer& transfer)
+{
+ ScopedLock l(lock);
+ Receivers::const_iterator i = receivers.find(transfer.getDestination());
+ if (i == receivers.end()) {
+ QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination());
+ return false;
+ } else {
+ *receiver = i->second;
+ return true;
+ }
+}
+
+bool SessionImpl::accept(ReceiverImpl* receiver,
+ qpid::messaging::Message* message,
+ IncomingMessages::MessageTransfer& transfer)
+{
+ if (receiver->getName() == transfer.getDestination()) {
+ transfer.retrieve(message);
+ receiver->received(*message);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+qpid::sys::Duration adjust(qpid::messaging::Duration timeout)
+{
+ uint64_t ms = timeout.getMilliseconds();
+ if (ms < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) {
+ return ms * qpid::sys::TIME_MSEC;
+ } else {
+ return qpid::sys::TIME_INFINITE;
+ }
+}
+
+bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout)
+{
+ return incoming.get(handler, adjust(timeout));
+}
+
+bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
+{
+ IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, _1));
+ handler.receiver = &receiver;
+ return getIncoming(handler, timeout);
+}
+
+bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout)
+{
+ while (true) {
+ txError.raise();
+ try {
+ std::string destination;
+ if (incoming.getNextDestination(destination, adjust(timeout))) {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ Receivers::const_iterator i = receivers.find(destination);
+ if (i == receivers.end()) {
+ throw qpid::messaging::ReceiverError(QPID_MSG("Received message for unknown destination " << destination));
+ } else {
+ receiver = i->second;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ } catch (TransportFailure&) {
+ reconnect();
+ } catch (const qpid::framing::ResourceLimitExceededException& e) {
+ if (backoff()) return false;
+ else throw qpid::messaging::TargetCapacityExceeded(e.what());
+ } catch (const qpid::SessionException& e) {
+ rethrow(e);
+ } catch (const qpid::ClosedException&) {
+ throw qpid::messaging::SessionClosed();
+ } catch (const qpid::ConnectionException& e) {
+ throw qpid::messaging::ConnectionError(e.what());
+ } catch (const qpid::ChannelException& e) {
+ throw qpid::messaging::MessagingException(e.what());
+ }
+ }
+}
+
+qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::messaging::Duration timeout)
+{
+ qpid::messaging::Receiver receiver;
+ if (!nextReceiver(receiver, timeout)) throw NoMessageAvailable();
+ if (!receiver) throw SessionError("Bad receiver returned!");
+ return receiver;
+}
+
+uint32_t SessionImpl::getReceivable()
+{
+ return get1<Receivable, uint32_t>((const std::string*) 0);
+}
+uint32_t SessionImpl::getReceivable(const std::string& destination)
+{
+ return get1<Receivable, uint32_t>(&destination);
+}
+
+struct SessionImpl::Receivable : Command
+{
+ const std::string* destination;
+ uint32_t result;
+
+ Receivable(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
+ void operator()() { result = impl.getReceivableImpl(destination); }
+};
+
+uint32_t SessionImpl::getReceivableImpl(const std::string* destination)
+{
+ ScopedLock l(lock);
+ if (destination) {
+ return incoming.available(*destination);
+ } else {
+ return incoming.available();
+ }
+}
+
+uint32_t SessionImpl::getUnsettledAcks()
+{
+ return get1<UnsettledAcks, uint32_t>((const std::string*) 0);
+}
+
+uint32_t SessionImpl::getUnsettledAcks(const std::string& destination)
+{
+ return get1<UnsettledAcks, uint32_t>(&destination);
+}
+
+struct SessionImpl::UnsettledAcks : Command
+{
+ const std::string* destination;
+ uint32_t result;
+
+ UnsettledAcks(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
+ void operator()() { result = impl.getUnsettledAcksImpl(destination); }
+};
+
+uint32_t SessionImpl::getUnsettledAcksImpl(const std::string* destination)
+{
+ ScopedLock l(lock);
+ if (destination) {
+ return incoming.pendingAccept(*destination);
+ } else {
+ return incoming.pendingAccept();
+ }
+}
+
+void SessionImpl::syncImpl(bool block)
+{
+ {
+ ScopedLock l(lock);
+ if (block) session.sync();
+ else session.flush();
+ }
+ //cleanup unconfirmed accept records:
+ incoming.pendingAccept();
+}
+
+void SessionImpl::commitImpl()
+{
+ ScopedLock l(lock);
+ incoming.accept();
+ session.txCommit();
+}
+
+void SessionImpl::rollbackImpl()
+{
+ ScopedLock l(lock);
+ for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+ getImplPtr<Receiver, ReceiverImpl>(i->second)->stop();
+ }
+ //ensure that stop has been processed and all previously sent
+ //messages are available for release:
+ session.sync();
+ incoming.releaseAll();
+ session.txRollback();
+
+ for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+ getImplPtr<Receiver, ReceiverImpl>(i->second)->start();
+ }
+}
+
+void SessionImpl::acknowledgeImpl()
+{
+ if (!transactional) incoming.accept();
+}
+
+void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative)
+{
+ if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative);
+}
+
+void SessionImpl::rejectImpl(qpid::messaging::Message& m)
+{
+ SequenceSet set;
+ set.add(MessageImplAccess::get(m).getInternalId());
+ session.messageReject(set);
+}
+
+void SessionImpl::releaseImpl(qpid::messaging::Message& m)
+{
+ SequenceSet set;
+ set.add(MessageImplAccess::get(m).getInternalId());
+ session.messageRelease(set, true);
+}
+
+void SessionImpl::receiverCancelled(const std::string& name)
+{
+ {
+ ScopedLock l(lock);
+ receivers.erase(name);
+ session.sync();
+ incoming.releasePending(name);
+ }
+ incoming.wakeup();
+}
+
+void SessionImpl::releasePending(const std::string& name)
+{
+ ScopedLock l(lock);
+ incoming.releasePending(name);
+}
+
+void SessionImpl::senderCancelled(const std::string& name)
+{
+ ScopedLock l(lock);
+ senders.erase(name);
+}
+
+void SessionImpl::reconnect()
+{
+ if (transactional) {
+ if (committing)
+ txError = new TransactionUnknown("Transaction outcome unknown: transport failure");
+ else
+ txError = new TransactionAborted("Transaction aborted: transport failure");
+ txError.raise();
+ }
+ connection->reopen();
+}
+
+bool SessionImpl::backoff()
+{
+ return connection->backoff();
+}
+
+qpid::messaging::Connection SessionImpl::getConnection() const
+{
+ return qpid::messaging::Connection(connection.get());
+}
+
+void SessionImpl::rethrow(const qpid::SessionException& e) {
+ switch (e.code) {
+ case framing::execution::ERROR_CODE_NOT_ALLOWED:
+ case framing::execution::ERROR_CODE_UNAUTHORIZED_ACCESS: throw messaging::UnauthorizedAccess(e.what());
+
+ case framing::execution::ERROR_CODE_NOT_FOUND:
+ case framing::execution::ERROR_CODE_RESOURCE_DELETED: throw messaging::NotFound(e.what());
+
+ default: throw SessionError(e.what());
+ }
+}
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
new file mode 100644
index 0000000000..2bb72aa877
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -0,0 +1,259 @@
+#ifndef QPID_CLIENT_AMQP0_10_SESSIONIMPL_H
+#define QPID_CLIENT_AMQP0_10_SESSIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/SessionImpl.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/amqp0_10/AddressResolution.h"
+#include "qpid/client/amqp0_10/IncomingMessages.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/ExceptionHolder.h"
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+
+namespace messaging {
+class Address;
+class Connection;
+class Message;
+class Receiver;
+class Sender;
+class Session;
+}
+
+namespace client {
+namespace amqp0_10 {
+
+class ConnectionImpl;
+class ReceiverImpl;
+class SenderImpl;
+
+/**
+ * Implementation of the protocol independent Session interface using
+ * AMQP 0-10.
+ */
+class SessionImpl : public qpid::messaging::SessionImpl
+{
+ public:
+ SessionImpl(ConnectionImpl&, bool transactional);
+ void commit();
+ void rollback();
+ void acknowledge(bool sync);
+ void reject(qpid::messaging::Message&);
+ void release(qpid::messaging::Message&);
+ void acknowledge(qpid::messaging::Message& msg, bool cumulative);
+ void close();
+ void sync(bool block);
+ qpid::messaging::Sender createSender(const qpid::messaging::Address& address);
+ qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address);
+
+ qpid::messaging::Sender getSender(const std::string& name) const;
+ qpid::messaging::Receiver getReceiver(const std::string& name) const;
+
+ bool nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout);
+ qpid::messaging::Receiver nextReceiver(qpid::messaging::Duration timeout);
+
+ qpid::messaging::Connection getConnection() const;
+ void checkError();
+ bool hasError();
+ bool isTransactional() const;
+
+ bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+
+ void releasePending(const std::string& destination);
+ void receiverCancelled(const std::string& name);
+ void senderCancelled(const std::string& name);
+
+ uint32_t getReceivable();
+ uint32_t getReceivable(const std::string& destination);
+
+ uint32_t getUnsettledAcks();
+ uint32_t getUnsettledAcks(const std::string& destination);
+
+ void setSession(qpid::client::Session);
+
+ template <class T> bool execute(T& f)
+ {
+ try {
+ txError.raise();
+ f();
+ return true;
+ } catch (const qpid::TransportFailure&) {
+ reconnect();
+ return false;
+ } catch (const qpid::framing::ResourceLimitExceededException& e) {
+ if (backoff()) return false;
+ else throw qpid::messaging::TargetCapacityExceeded(e.what());
+ } catch (const qpid::framing::UnauthorizedAccessException& e) {
+ throw qpid::messaging::UnauthorizedAccess(e.what());
+ } catch (const qpid::framing::NotFoundException& e) {
+ throw qpid::messaging::NotFound(e.what());
+ } catch (const qpid::framing::ResourceDeletedException& e) {
+ throw qpid::messaging::NotFound(e.what());
+ } catch (const qpid::SessionException& e) {
+ rethrow(e);
+ return false; // Keep the compiler happy
+ } catch (const qpid::ConnectionException& e) {
+ throw qpid::messaging::ConnectionError(e.what());
+ } catch (const qpid::ChannelException& e) {
+ throw qpid::messaging::MessagingException(e.what());
+ }
+ }
+
+ static SessionImpl& convert(qpid::messaging::Session&);
+ static void rethrow(const qpid::SessionException&);
+
+ private:
+ typedef std::map<std::string, qpid::messaging::Receiver> Receivers;
+ typedef std::map<std::string, qpid::messaging::Sender> Senders;
+
+ mutable qpid::sys::Mutex lock;
+ boost::intrusive_ptr<ConnectionImpl> connection;
+ qpid::client::Session session;
+ AddressResolution resolver;
+ IncomingMessages incoming;
+ Receivers receivers;
+ Senders senders;
+ const bool transactional;
+ bool committing;
+ sys::ExceptionHolder txError;
+
+ bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&);
+ bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout);
+ bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer);
+ void reconnect();
+ bool backoff();
+
+ void commitImpl();
+ void rollbackImpl();
+ void acknowledgeImpl();
+ void acknowledgeImpl(qpid::messaging::Message&, bool cumulative);
+ void rejectImpl(qpid::messaging::Message&);
+ void releaseImpl(qpid::messaging::Message&);
+ void closeImpl();
+ void syncImpl(bool block);
+ qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address);
+ qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address);
+ uint32_t getReceivableImpl(const std::string* destination);
+ uint32_t getUnsettledAcksImpl(const std::string* destination);
+
+ //functors for public facing methods (allows locking and retry
+ //logic to be centralised)
+ struct Command
+ {
+ SessionImpl& impl;
+
+ Command(SessionImpl& i) : impl(i) {}
+ };
+
+ struct Commit : Command
+ {
+ Commit(SessionImpl& i) : Command(i) {}
+ void operator()() { impl.commitImpl(); }
+ };
+
+ struct Rollback : Command
+ {
+ Rollback(SessionImpl& i) : Command(i) {}
+ void operator()() { impl.rollbackImpl(); }
+ };
+
+ struct Acknowledge : Command
+ {
+ Acknowledge(SessionImpl& i) : Command(i) {}
+ void operator()() { impl.acknowledgeImpl(); }
+ };
+
+ struct Sync : Command
+ {
+ Sync(SessionImpl& i) : Command(i) {}
+ void operator()() { impl.syncImpl(true); }
+ };
+
+ struct NonBlockingSync : Command
+ {
+ NonBlockingSync(SessionImpl& i) : Command(i) {}
+ void operator()() { impl.syncImpl(false); }
+ };
+
+ struct Reject : Command
+ {
+ qpid::messaging::Message& message;
+
+ Reject(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {}
+ void operator()() { impl.rejectImpl(message); }
+ };
+
+ struct Release : Command
+ {
+ qpid::messaging::Message& message;
+
+ Release(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {}
+ void operator()() { impl.releaseImpl(message); }
+ };
+
+ struct Acknowledge2 : Command
+ {
+ qpid::messaging::Message& message;
+ bool cumulative;
+
+ Acknowledge2(SessionImpl& i, qpid::messaging::Message& m, bool c) : Command(i), message(m), cumulative(c) {}
+ void operator()() { impl.acknowledgeImpl(message, cumulative); }
+ };
+
+ struct CreateSender;
+ struct CreateReceiver;
+ struct UnsettledAcks;
+ struct Receivable;
+
+ //helper templates for some common patterns
+ template <class F> bool execute()
+ {
+ F f(*this);
+ return execute(f);
+ }
+
+ template <class F> void retry()
+ {
+ while (!execute<F>()) {}
+ }
+
+ template <class F, class P> bool execute1(P p)
+ {
+ F f(*this, p);
+ return execute(f);
+ }
+
+ template <class F, class R, class P> R get1(P p)
+ {
+ F f(*this, p);
+ while (!execute(f)) {}
+ return f.result;
+ }
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_SESSIONIMPL_H*/