summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/messaging
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
commit66765100f4257159622cefe57bed50125a5ad017 (patch)
treea88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/cpp/src/qpid/messaging
parent1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff)
parent88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff)
downloadqpid-python-rajith_jms_client.tar.gz
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/messaging')
-rw-r--r--qpid/cpp/src/qpid/messaging/Address.cpp151
-rw-r--r--qpid/cpp/src/qpid/messaging/AddressParser.cpp265
-rw-r--r--qpid/cpp/src/qpid/messaging/AddressParser.h66
-rw-r--r--qpid/cpp/src/qpid/messaging/Connection.cpp82
-rw-r--r--qpid/cpp/src/qpid/messaging/ConnectionImpl.h52
-rw-r--r--qpid/cpp/src/qpid/messaging/Duration.cpp55
-rw-r--r--qpid/cpp/src/qpid/messaging/FailoverUpdates.cpp85
-rw-r--r--qpid/cpp/src/qpid/messaging/HandleInstantiator.cpp64
-rw-r--r--qpid/cpp/src/qpid/messaging/Message.cpp148
-rw-r--r--qpid/cpp/src/qpid/messaging/MessageImpl.cpp79
-rw-r--r--qpid/cpp/src/qpid/messaging/MessageImpl.h90
-rw-r--r--qpid/cpp/src/qpid/messaging/PrivateImplRef.h94
-rw-r--r--qpid/cpp/src/qpid/messaging/Receiver.cpp48
-rw-r--r--qpid/cpp/src/qpid/messaging/ReceiverImpl.h52
-rw-r--r--qpid/cpp/src/qpid/messaging/Sender.cpp44
-rw-r--r--qpid/cpp/src/qpid/messaging/SenderImpl.h47
-rw-r--r--qpid/cpp/src/qpid/messaging/Session.cpp109
-rw-r--r--qpid/cpp/src/qpid/messaging/SessionImpl.h63
-rw-r--r--qpid/cpp/src/qpid/messaging/exceptions.cpp58
19 files changed, 1652 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/messaging/Address.cpp b/qpid/cpp/src/qpid/messaging/Address.cpp
new file mode 100644
index 0000000000..a516959edb
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/Address.cpp
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Address.h"
+#include "qpid/framing/Uuid.h"
+#include <sstream>
+#include <boost/format.hpp>
+
+namespace qpid {
+namespace messaging {
+
+using namespace qpid::types;
+
+namespace {
+const std::string SUBJECT_DIVIDER = "/";
+const std::string OPTIONS_DIVIDER = ";";
+const std::string SPACE = " ";
+const std::string TYPE = "type";
+}
+class AddressImpl
+{
+ public:
+ std::string name;
+ std::string subject;
+ Variant::Map options;
+
+ AddressImpl() {}
+ AddressImpl(const std::string& n, const std::string& s, const Variant::Map& o) :
+ name(n), subject(s), options(o) {}
+};
+
+class AddressParser
+{
+ public:
+ AddressParser(const std::string&);
+ bool parse(Address& address);
+ private:
+ const std::string& input;
+ std::string::size_type current;
+ static const std::string RESERVED;
+
+ bool readChar(char c);
+ bool readQuotedString(std::string& s);
+ bool readQuotedValue(Variant& value);
+ bool readString(std::string& value, char delimiter);
+ bool readWord(std::string& word, const std::string& delims = RESERVED);
+ bool readSimpleValue(Variant& word);
+ bool readKey(std::string& key);
+ bool readValue(Variant& value);
+ bool readKeyValuePair(Variant::Map& map);
+ bool readMap(Variant& value);
+ bool readList(Variant& value);
+ bool readName(std::string& name);
+ bool readSubject(std::string& subject);
+ bool error(const std::string& message);
+ bool eos();
+ bool iswhitespace();
+ bool in(const std::string& delims);
+ bool isreserved();
+};
+
+Address::Address() : impl(new AddressImpl()) {}
+Address::Address(const std::string& address) : impl(new AddressImpl())
+{
+ AddressParser parser(address);
+ parser.parse(*this);
+}
+Address::Address(const std::string& name, const std::string& subject, const Variant::Map& options,
+ const std::string& type)
+ : impl(new AddressImpl(name, subject, options)) { setType(type); }
+Address::Address(const Address& a) :
+ impl(new AddressImpl(a.impl->name, a.impl->subject, a.impl->options)) {}
+Address::~Address() { delete impl; }
+
+Address& Address::operator=(const Address& a) { *impl = *a.impl; return *this; }
+
+
+std::string Address::str() const
+{
+ std::stringstream out;
+ out << impl->name;
+ if (!impl->subject.empty()) out << SUBJECT_DIVIDER << impl->subject;
+ if (!impl->options.empty()) out << OPTIONS_DIVIDER << impl->options;
+ return out.str();
+}
+Address::operator bool() const { return !impl->name.empty(); }
+bool Address::operator !() const { return impl->name.empty(); }
+
+const std::string& Address::getName() const { return impl->name; }
+void Address::setName(const std::string& name) { impl->name = name; }
+const std::string& Address::getSubject() const { return impl->subject; }
+void Address::setSubject(const std::string& subject) { impl->subject = subject; }
+const Variant::Map& Address::getOptions() const { return impl->options; }
+Variant::Map& Address::getOptions() { return impl->options; }
+void Address::setOptions(const Variant::Map& options) { impl->options = options; }
+
+
+namespace{
+const Variant EMPTY_VARIANT;
+const std::string EMPTY_STRING;
+const std::string NODE_PROPERTIES="node";
+}
+
+const Variant& find(const Variant::Map& map, const std::string& key)
+{
+ Variant::Map::const_iterator i = map.find(key);
+ if (i == map.end()) return EMPTY_VARIANT;
+ else return i->second;
+}
+
+std::string Address::getType() const
+{
+ const Variant& props = find(impl->options, NODE_PROPERTIES);
+ if (props.getType() == VAR_MAP) {
+ const Variant& type = find(props.asMap(), TYPE);
+ if (!type.isVoid()) return type.asString();
+ }
+ return EMPTY_STRING;
+}
+
+void Address::setType(const std::string& type)
+{
+ Variant& props = impl->options[NODE_PROPERTIES];
+ if (props.isVoid()) props = Variant::Map();
+ props.asMap()[TYPE] = type;
+}
+
+std::ostream& operator<<(std::ostream& out, const Address& address)
+{
+ out << address.str();
+ return out;
+}
+
+}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/AddressParser.cpp b/qpid/cpp/src/qpid/messaging/AddressParser.cpp
new file mode 100644
index 0000000000..4c8f35fbc5
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/AddressParser.cpp
@@ -0,0 +1,265 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "AddressParser.h"
+#include "qpid/framing/Uuid.h"
+#include <boost/format.hpp>
+
+namespace qpid {
+namespace messaging {
+
+using namespace qpid::types;
+
+AddressParser::AddressParser(const std::string& s) : input(s), current(0) {}
+
+bool AddressParser::error(const std::string& message)
+{
+ throw MalformedAddress((boost::format("%1%, character %2% of %3%") % message % current % input).str());
+}
+
+bool AddressParser::parse(Address& address)
+{
+ std::string name;
+ if (readName(name)) {
+ if (name.find('#') == 0) name = qpid::framing::Uuid(true).str() + name;
+ address.setName(name);
+ if (readChar('/')) {
+ std::string subject;
+ readSubject(subject);
+ address.setSubject(subject);
+ }
+ if (readChar(';')) {
+ Variant options = Variant::Map();
+ if (readMap(options)) {
+ address.setOptions(options.asMap());
+ }
+ }
+ //skip trailing whitespace
+ while (!eos() && iswhitespace()) ++current;
+ return eos() || error("Unexpected chars in address: " + input.substr(current));
+ } else {
+ return input.empty() || error("Expected name");
+ }
+}
+
+bool AddressParser::parseMap(Variant::Map& map)
+{
+ if (readChar('{')) {
+ readMapEntries(map);
+ return readChar('}') || error("Unmatched '{'!");
+ } else {
+ return false;
+ }
+}
+
+bool AddressParser::parseList(Variant::List& list)
+{
+ if (readChar('[')) {
+ readListItems(list);
+ return readChar(']') || error("Unmatched '['!");
+ } else {
+ return false;
+ }
+}
+
+
+bool AddressParser::readList(Variant& value)
+{
+ if (readChar('[')) {
+ value = Variant::List();
+ readListItems(value.asList());
+ return readChar(']') || error("Unmatched '['!");
+ } else {
+ return false;
+ }
+}
+
+void AddressParser::readListItems(Variant::List& list)
+{
+ Variant item;
+ while (readValueIfExists(item)) {
+ list.push_back(item);
+ if (!readChar(',')) break;
+ }
+}
+
+bool AddressParser::readMap(Variant& value)
+{
+ if (readChar('{')) {
+ value = Variant::Map();
+ readMapEntries(value.asMap());
+ return readChar('}') || error("Unmatched '{'!");
+ } else {
+ return false;
+ }
+}
+
+void AddressParser::readMapEntries(Variant::Map& map)
+{
+ while (readKeyValuePair(map) && readChar(',')) {}
+}
+
+bool AddressParser::readKeyValuePair(Variant::Map& map)
+{
+ std::string key;
+ Variant value;
+ if (readKey(key)) {
+ if (readChar(':') && readValue(value)) {
+ map[key] = value;
+ return true;
+ } else {
+ return error("Bad key-value pair, expected ':'");
+ }
+ } else {
+ return false;
+ }
+}
+
+bool AddressParser::readKey(std::string& key)
+{
+ return readWord(key) || readQuotedString(key);
+}
+
+bool AddressParser::readValue(Variant& value)
+{
+ return readValueIfExists(value) || error("Expected value");
+}
+
+bool AddressParser::readValueIfExists(Variant& value)
+{
+ return readSimpleValue(value) || readQuotedValue(value) ||
+ readMap(value) || readList(value);
+}
+
+bool AddressParser::readString(std::string& value, char delimiter)
+{
+ if (readChar(delimiter)) {
+ std::string::size_type start = current++;
+ while (!eos()) {
+ if (input.at(current) == delimiter) {
+ if (current > start) {
+ value = input.substr(start, current - start);
+ } else {
+ value = "";
+ }
+ ++current;
+ return true;
+ } else {
+ ++current;
+ }
+ }
+ return error("Unmatched delimiter");
+ } else {
+ return false;
+ }
+}
+
+bool AddressParser::readName(std::string& name)
+{
+ return readQuotedString(name) || readWord(name, "/;");
+}
+
+bool AddressParser::readSubject(std::string& subject)
+{
+ return readQuotedString(subject) || readWord(subject, ";");
+}
+
+bool AddressParser::readQuotedString(std::string& s)
+{
+ return readString(s, '"') || readString(s, '\'');
+}
+
+bool AddressParser::readQuotedValue(Variant& value)
+{
+ std::string s;
+ if (readQuotedString(s)) {
+ value = s;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool AddressParser::readSimpleValue(Variant& value)
+{
+ std::string s;
+ if (readWord(s)) {
+ value.parse(s);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool AddressParser::readWord(std::string& value, const std::string& delims)
+{
+ //skip leading whitespace
+ while (!eos() && iswhitespace()) ++current;
+
+ //read any number of non-whitespace, non-reserved chars into value
+ std::string::size_type start = current;
+ while (!eos() && !iswhitespace() && !in(delims)) ++current;
+
+ if (current > start) {
+ value = input.substr(start, current - start);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool AddressParser::readChar(char c)
+{
+ while (!eos()) {
+ if (iswhitespace()) {
+ ++current;
+ } else if (input.at(current) == c) {
+ ++current;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ return false;
+}
+
+bool AddressParser::iswhitespace()
+{
+ return ::isspace(input.at(current));
+}
+
+bool AddressParser::isreserved()
+{
+ return in(RESERVED);
+}
+
+bool AddressParser::in(const std::string& chars)
+{
+ return chars.find(input.at(current)) != std::string::npos;
+}
+
+bool AddressParser::eos()
+{
+ return current >= input.size();
+}
+
+const std::string AddressParser::RESERVED = "\'\"{}[],:/";
+
+}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/AddressParser.h b/qpid/cpp/src/qpid/messaging/AddressParser.h
new file mode 100644
index 0000000000..1635331d19
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/AddressParser.h
@@ -0,0 +1,66 @@
+#ifndef QPID_MESSAGING_ADDRESSPARSER_H
+#define QPID_MESSAGING_ADDRESSPARSER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Address.h"
+
+namespace qpid {
+namespace messaging {
+
+class AddressParser
+{
+ public:
+ AddressParser(const std::string&);
+ bool parse(Address& address);
+ bool parseMap(qpid::types::Variant::Map& map);
+ bool parseList(qpid::types::Variant::List& list);
+ private:
+ const std::string& input;
+ std::string::size_type current;
+ static const std::string RESERVED;
+
+ bool readChar(char c);
+ bool readQuotedString(std::string& s);
+ bool readQuotedValue(qpid::types::Variant& value);
+ bool readString(std::string& value, char delimiter);
+ bool readWord(std::string& word, const std::string& delims = RESERVED);
+ bool readSimpleValue(qpid::types::Variant& word);
+ bool readKey(std::string& key);
+ bool readValue(qpid::types::Variant& value);
+ bool readValueIfExists(qpid::types::Variant& value);
+ bool readKeyValuePair(qpid::types::Variant::Map& map);
+ bool readMap(qpid::types::Variant& value);
+ bool readList(qpid::types::Variant& value);
+ bool readName(std::string& name);
+ bool readSubject(std::string& subject);
+ bool error(const std::string& message);
+ bool eos();
+ bool iswhitespace();
+ bool in(const std::string& delims);
+ bool isreserved();
+ void readListItems(qpid::types::Variant::List& list);
+ void readMapEntries(qpid::types::Variant::Map& map);
+};
+
+}} // namespace qpid::messaging
+
+#endif /*!QPID_MESSAGING_ADDRESSPARSER_H*/
diff --git a/qpid/cpp/src/qpid/messaging/Connection.cpp b/qpid/cpp/src/qpid/messaging/Connection.cpp
new file mode 100644
index 0000000000..bd90aa54a7
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/Connection.cpp
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/AddressParser.h"
+#include "qpid/messaging/ConnectionImpl.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/SessionImpl.h"
+#include "qpid/messaging/PrivateImplRef.h"
+#include "qpid/client/amqp0_10/ConnectionImpl.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace messaging {
+
+using namespace qpid::types;
+
+typedef PrivateImplRef<qpid::messaging::Connection> PI;
+
+Connection::Connection(ConnectionImpl* impl) { PI::ctor(*this, impl); }
+Connection::Connection(const Connection& c) : Handle<ConnectionImpl>() { PI::copy(*this, c); }
+Connection& Connection::operator=(const Connection& c) { return PI::assign(*this, c); }
+Connection::~Connection() { PI::dtor(*this); }
+
+Connection::Connection(const std::string& url, const std::string& o)
+{
+ Variant::Map options;
+ AddressParser parser(o);
+ if (o.empty() || parser.parseMap(options)) {
+ PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+ } else {
+ throw InvalidOptionString("Invalid option string: " + o);
+ }
+}
+Connection::Connection(const std::string& url, const Variant::Map& options)
+{
+ PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+}
+
+Connection::Connection()
+{
+ Variant::Map options;
+ std::string url = "amqp:tcp:127.0.0.1:5672";
+ PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+}
+
+void Connection::open() { impl->open(); }
+bool Connection::isOpen() { return impl->isOpen(); }
+bool Connection::isOpen() const { return impl->isOpen(); }
+void Connection::close() { impl->close(); }
+Session Connection::createSession(const std::string& name) { return impl->newSession(false, name); }
+Session Connection::createTransactionalSession(const std::string& name)
+{
+ return impl->newSession(true, name);
+}
+Session Connection::getSession(const std::string& name) const { return impl->getSession(name); }
+void Connection::setOption(const std::string& name, const Variant& value)
+{
+ impl->setOption(name, value);
+}
+std::string Connection::getAuthenticatedUsername()
+{
+ return impl->getAuthenticatedUsername();
+}
+}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h b/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
new file mode 100644
index 0000000000..1e11d9a6d5
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
@@ -0,0 +1,52 @@
+#ifndef QPID_MESSAGING_CONNECTIONIMPL_H
+#define QPID_MESSAGING_CONNECTIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+
+namespace types {
+class Variant;
+}
+
+namespace messaging {
+
+class Session;
+
+class ConnectionImpl : public virtual qpid::RefCounted
+{
+ public:
+ virtual ~ConnectionImpl() {}
+ virtual void open() = 0;
+ virtual bool isOpen() const = 0;
+ virtual void close() = 0;
+ virtual Session newSession(bool transactional, const std::string& name) = 0;
+ virtual Session getSession(const std::string& name) const = 0;
+ virtual void setOption(const std::string& name, const qpid::types::Variant& value) = 0;
+ virtual std::string getAuthenticatedUsername() = 0;
+ private:
+};
+}} // namespace qpid::messaging
+
+#endif /*!QPID_MESSAGING_CONNECTIONIMPL_H*/
diff --git a/qpid/cpp/src/qpid/messaging/Duration.cpp b/qpid/cpp/src/qpid/messaging/Duration.cpp
new file mode 100644
index 0000000000..a23e9f5bcb
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/Duration.cpp
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Duration.h"
+#include <limits>
+
+namespace qpid {
+namespace messaging {
+
+Duration::Duration(uint64_t ms) : milliseconds(ms) {}
+uint64_t Duration::getMilliseconds() const { return milliseconds; }
+
+Duration operator*(const Duration& duration, uint64_t multiplier)
+{
+ return Duration(duration.getMilliseconds() * multiplier);
+}
+
+Duration operator*(uint64_t multiplier, const Duration& duration)
+{
+ return Duration(duration.getMilliseconds() * multiplier);
+}
+
+bool operator==(const Duration& a, const Duration& b)
+{
+ return a.getMilliseconds() == b.getMilliseconds();
+}
+
+bool operator!=(const Duration& a, const Duration& b)
+{
+ return a.getMilliseconds() != b.getMilliseconds();
+}
+
+const Duration Duration::FOREVER(std::numeric_limits<uint64_t>::max());
+const Duration Duration::IMMEDIATE(0);
+const Duration Duration::SECOND(1000);
+const Duration Duration::MINUTE(SECOND * 60);
+
+}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/FailoverUpdates.cpp b/qpid/cpp/src/qpid/messaging/FailoverUpdates.cpp
new file mode 100644
index 0000000000..4f2fcf2e82
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/FailoverUpdates.cpp
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/FailoverUpdates.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Exception.h"
+#include "qpid/Url.h"
+#include "qpid/framing/Uuid.h"
+#include <vector>
+
+namespace qpid {
+namespace messaging {
+using framing::Uuid;
+
+struct FailoverUpdatesImpl : qpid::sys::Runnable
+{
+ Connection connection;
+ Session session;
+ Receiver receiver;
+ qpid::sys::Thread thread;
+
+ FailoverUpdatesImpl(Connection& c) : connection(c)
+ {
+ session = connection.createSession("failover-updates."+Uuid(true).str());
+ receiver = session.createReceiver("amq.failover");
+ thread = qpid::sys::Thread(*this);
+ }
+
+ ~FailoverUpdatesImpl() {
+ try {
+ session.close();
+ } catch(...) {} // Squash exceptions in a destructor.
+ thread.join();
+ }
+
+ void run()
+ {
+ try {
+ Message message;
+ while (receiver.fetch(message)) {
+ connection.setOption("reconnect-urls", message.getProperties()["amq.failover"]);
+ QPID_LOG(debug, "Set reconnect-urls to " << message.getProperties()["amq.failover"]);
+ session.acknowledge();
+ }
+ }
+ catch (const ClosedException&) {}
+ catch (const qpid::TransportFailure& e) {
+ QPID_LOG(warning, "Failover updates stopped on loss of connection. " << e.what());
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(warning, "Failover updates stopped due to exception: " << e.what());
+ }
+ }
+};
+
+FailoverUpdates::FailoverUpdates(Connection& connection) : impl(new FailoverUpdatesImpl(connection)) {}
+FailoverUpdates::~FailoverUpdates() { if (impl) { delete impl; } }
+FailoverUpdates::FailoverUpdates(const FailoverUpdates&) : impl(0) {}
+FailoverUpdates& FailoverUpdates::operator=(const FailoverUpdates&) { return *this; }
+
+
+}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/HandleInstantiator.cpp b/qpid/cpp/src/qpid/messaging/HandleInstantiator.cpp
new file mode 100644
index 0000000000..c9a7680bb4
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/HandleInstantiator.cpp
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Session.h"
+
+namespace qpid {
+namespace messaging {
+
+using namespace qpid::types;
+
+void HandleInstantiatorDoNotCall(void)
+{
+ // This function exists to instantiate various template Handle
+ // bool functions. The instances are then available to
+ // the qpidmessaging DLL and subsequently exported.
+ // This function must not be exported nor called called.
+ // For further information refer to
+ // https://issues.apache.org/jira/browse/QPID-2926
+
+ Connection connection;
+ if (connection.isValid()) connection.close();
+ if (connection.isNull() ) connection.close();
+ if (connection ) connection.close();
+ if (!connection ) connection.close();
+
+ Receiver receiver;
+ if (receiver.isValid()) receiver.close();
+ if (receiver.isNull() ) receiver.close();
+ if (receiver ) receiver.close();
+ if (!receiver ) receiver.close();
+
+ Sender sender;
+ if (sender.isValid()) sender.close();
+ if (sender.isNull() ) sender.close();
+ if (sender ) sender.close();
+ if (!sender ) sender.close();
+
+ Session session;
+ if (session.isValid()) session.close();
+ if (session.isNull() ) session.close();
+ if (session ) session.close();
+ if (!session ) session.close();
+}
+}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/Message.cpp b/qpid/cpp/src/qpid/messaging/Message.cpp
new file mode 100644
index 0000000000..83cdfd3c55
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/Message.cpp
@@ -0,0 +1,148 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageImpl.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include <boost/format.hpp>
+
+namespace qpid {
+namespace messaging {
+
+using namespace qpid::types;
+
+Message::Message(const std::string& bytes) : impl(new MessageImpl(bytes)) {}
+Message::Message(const char* bytes, size_t count) : impl(new MessageImpl(bytes, count)) {}
+
+Message::Message(const Message& m) : impl(new MessageImpl(*m.impl)) {}
+Message::~Message() { delete impl; }
+
+Message& Message::operator=(const Message& m) { *impl = *m.impl; return *this; }
+
+void Message::setReplyTo(const Address& d) { impl->setReplyTo(d); }
+const Address& Message::getReplyTo() const { return impl->getReplyTo(); }
+
+void Message::setSubject(const std::string& s) { impl->setSubject(s); }
+const std::string& Message::getSubject() const { return impl->getSubject(); }
+
+void Message::setContentType(const std::string& s) { impl->setContentType(s); }
+const std::string& Message::getContentType() const { return impl->getContentType(); }
+
+void Message::setMessageId(const std::string& id) { impl->messageId = id; }
+const std::string& Message::getMessageId() const { return impl->messageId; }
+
+void Message::setUserId(const std::string& id) { impl->userId = id; }
+const std::string& Message::getUserId() const { return impl->userId; }
+
+void Message::setCorrelationId(const std::string& id) { impl->correlationId = id; }
+const std::string& Message::getCorrelationId() const { return impl->correlationId; }
+
+uint8_t Message::getPriority() const { return impl->priority; }
+void Message::setPriority(uint8_t priority) { impl->priority = priority; }
+
+void Message::setTtl(Duration ttl) { impl->ttl = ttl.getMilliseconds(); }
+Duration Message::getTtl() const { return Duration(impl->ttl); }
+
+void Message::setDurable(bool durable) { impl->durable = durable; }
+bool Message::getDurable() const { return impl->durable; }
+
+bool Message::getRedelivered() const { return impl->redelivered; }
+void Message::setRedelivered(bool redelivered) { impl->redelivered = redelivered; }
+
+const Variant::Map& Message::getProperties() const { return impl->getHeaders(); }
+Variant::Map& Message::getProperties() { return impl->getHeaders(); }
+void Message::setProperty(const std::string& k, const qpid::types::Variant& v) { impl->setHeader(k,v); }
+
+void Message::setContent(const std::string& c) { impl->setBytes(c); }
+void Message::setContent(const char* chars, size_t count) { impl->setBytes(chars, count); }
+std::string Message::getContent() const { return impl->getBytes(); }
+
+const char* Message::getContentPtr() const
+{
+ return impl->getBytes().data();
+}
+
+size_t Message::getContentSize() const
+{
+ return impl->getBytes().size();
+}
+
+EncodingException::EncodingException(const std::string& msg) : qpid::types::Exception(msg) {}
+
+const std::string BAD_ENCODING("Unsupported encoding: %1% (only %2% is supported at present).");
+
+template <class C> struct MessageCodec
+{
+ static bool checkEncoding(const std::string& requested)
+ {
+ if (requested.size()) {
+ if (requested == C::contentType) return true;
+ else throw EncodingException((boost::format(BAD_ENCODING) % requested % C::contentType).str());
+ } else {
+ return false;
+ }
+ }
+
+ /*
+ * Currently only support a single encoding type for both list and
+ * map, based on AMQP 0-10, though wider support is anticipated in the
+ * future. This method simply checks that the desired encoding (if one
+ * is specified, either through the message-content or through an
+ * override) is indeed supported.
+ */
+ static void checkEncoding(const Message& message, const std::string& requested)
+ {
+ checkEncoding(requested) || checkEncoding(message.getContentType());
+ }
+
+ static void decode(const Message& message, typename C::ObjectType& object, const std::string& encoding)
+ {
+ checkEncoding(message, encoding);
+ C::decode(message.getContent(), object);
+ }
+
+ static void encode(const typename C::ObjectType& map, Message& message, const std::string& encoding)
+ {
+ checkEncoding(message, encoding);
+ std::string content;
+ C::encode(map, content);
+ message.setContentType(C::contentType);
+ message.setContent(content);
+ }
+};
+
+void decode(const Message& message, Variant::Map& map, const std::string& encoding)
+{
+ MessageCodec<qpid::amqp_0_10::MapCodec>::decode(message, map, encoding);
+}
+void decode(const Message& message, Variant::List& list, const std::string& encoding)
+{
+ MessageCodec<qpid::amqp_0_10::ListCodec>::decode(message, list, encoding);
+}
+void encode(const Variant::Map& map, Message& message, const std::string& encoding)
+{
+ MessageCodec<qpid::amqp_0_10::MapCodec>::encode(map, message, encoding);
+}
+void encode(const Variant::List& list, Message& message, const std::string& encoding)
+{
+ MessageCodec<qpid::amqp_0_10::ListCodec>::encode(list, message, encoding);
+}
+
+}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
new file mode 100644
index 0000000000..0601800e46
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MessageImpl.h"
+#include "qpid/messaging/Message.h"
+
+namespace qpid {
+namespace messaging {
+
+namespace {
+const std::string EMPTY_STRING = "";
+}
+
+using namespace qpid::types;
+
+MessageImpl::MessageImpl(const std::string& c) :
+ priority(0),
+ ttl(0),
+ durable(false),
+ redelivered(false),
+ bytes(c),
+ internalId(0) {}
+MessageImpl::MessageImpl(const char* chars, size_t count) :
+ priority(0),
+ ttl(0),
+ durable (false),
+ redelivered(false),
+ bytes(chars, count),
+ internalId(0) {}
+
+void MessageImpl::setReplyTo(const Address& d) { replyTo = d; }
+const Address& MessageImpl::getReplyTo() const { return replyTo; }
+
+void MessageImpl::setSubject(const std::string& s) { subject = s; }
+const std::string& MessageImpl::getSubject() const { return subject; }
+
+void MessageImpl::setContentType(const std::string& s) { contentType = s; }
+const std::string& MessageImpl::getContentType() const { return contentType; }
+
+const Variant::Map& MessageImpl::getHeaders() const { return headers; }
+Variant::Map& MessageImpl::getHeaders() { return headers; }
+void MessageImpl::setHeader(const std::string& key, const qpid::types::Variant& val) { headers[key] = val; }
+
+//should these methods be on MessageContent?
+void MessageImpl::setBytes(const std::string& c) { bytes = c; }
+void MessageImpl::setBytes(const char* chars, size_t count) { bytes.assign(chars, count); }
+const std::string& MessageImpl::getBytes() const { return bytes; }
+std::string& MessageImpl::getBytes() { return bytes; }
+
+void MessageImpl::setInternalId(qpid::framing::SequenceNumber i) { internalId = i; }
+qpid::framing::SequenceNumber MessageImpl::getInternalId() { return internalId; }
+
+MessageImpl& MessageImplAccess::get(Message& msg)
+{
+ return *msg.impl;
+}
+const MessageImpl& MessageImplAccess::get(const Message& msg)
+{
+ return *msg.impl;
+}
+
+}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.h b/qpid/cpp/src/qpid/messaging/MessageImpl.h
new file mode 100644
index 0000000000..57df6b3fda
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/MessageImpl.h
@@ -0,0 +1,90 @@
+#ifndef QPID_MESSAGING_MESSAGEIMPL_H
+#define QPID_MESSAGING_MESSAGEIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Address.h"
+#include "qpid/types/Variant.h"
+#include "qpid/framing/SequenceNumber.h"
+
+namespace qpid {
+namespace messaging {
+
+struct MessageImpl
+{
+ Address replyTo;
+ std::string subject;
+ std::string contentType;
+ std::string messageId;
+ std::string userId;
+ std::string correlationId;
+ uint8_t priority;
+ uint64_t ttl;
+ bool durable;
+ bool redelivered;
+ qpid::types::Variant::Map headers;
+
+ std::string bytes;
+
+ qpid::framing::SequenceNumber internalId;
+
+ MessageImpl(const std::string& c);
+ MessageImpl(const char* chars, size_t count);
+
+ void setReplyTo(const Address& d);
+ const Address& getReplyTo() const;
+
+ void setSubject(const std::string& s);
+ const std::string& getSubject() const;
+
+ void setContentType(const std::string& s);
+ const std::string& getContentType() const;
+
+ const qpid::types::Variant::Map& getHeaders() const;
+ qpid::types::Variant::Map& getHeaders();
+ void setHeader(const std::string& key, const qpid::types::Variant& val);
+
+ void setBytes(const std::string& bytes);
+ void setBytes(const char* chars, size_t count);
+ const std::string& getBytes() const;
+ std::string& getBytes();
+
+ void setInternalId(qpid::framing::SequenceNumber id);
+ qpid::framing::SequenceNumber getInternalId();
+
+};
+
+class Message;
+
+/**
+ * Provides access to the internal MessageImpl for a message which is
+ * useful when accessing any message state not exposed directly
+ * through the public API.
+ */
+struct MessageImplAccess
+{
+ static MessageImpl& get(Message&);
+ static const MessageImpl& get(const Message&);
+};
+
+}} // namespace qpid::messaging
+
+#endif /*!QPID_MESSAGING_MESSAGEIMPL_H*/
diff --git a/qpid/cpp/src/qpid/messaging/PrivateImplRef.h b/qpid/cpp/src/qpid/messaging/PrivateImplRef.h
new file mode 100644
index 0000000000..e77c58d071
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/PrivateImplRef.h
@@ -0,0 +1,94 @@
+#ifndef QPID_MESSAGING_PRIVATEIMPL_H
+#define QPID_MESSAGING_PRIVATEIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/messaging/ImportExport.h"
+#include <boost/intrusive_ptr.hpp>
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace messaging {
+
+/**
+ * Helper class to implement a class with a private, reference counted
+ * implementation and reference semantics.
+ *
+ * Such classes are used in the public API to hide implementation, they
+ * should. Example of use:
+ *
+ * === Foo.h
+ *
+ * template <class T> PrivateImplRef;
+ * class FooImpl;
+ *
+ * Foo : public Handle<FooImpl> {
+ * public:
+ * Foo(FooImpl* = 0);
+ * Foo(const Foo&);
+ * ~Foo();
+ * Foo& operator=(const Foo&);
+ *
+ * int fooDo(); // and other Foo functions...
+ *
+ * private:
+ * typedef FooImpl Impl;
+ * Impl* impl;
+ * friend class PrivateImplRef<Foo>;
+ *
+ * === Foo.cpp
+ *
+ * typedef PrivateImplRef<Foo> PI;
+ * Foo::Foo(FooImpl* p) { PI::ctor(*this, p); }
+ * Foo::Foo(const Foo& c) : Handle<FooImpl>() { PI::copy(*this, c); }
+ * Foo::~Foo() { PI::dtor(*this); }
+ * Foo& Foo::operator=(const Foo& c) { return PI::assign(*this, c); }
+ *
+ * int foo::fooDo() { return impl->fooDo(); }
+ *
+ */
+template <class T> class PrivateImplRef {
+ public:
+ typedef typename T::Impl Impl;
+ typedef boost::intrusive_ptr<Impl> intrusive_ptr;
+
+ /** Get the implementation pointer from a handle */
+ static intrusive_ptr get(const T& t) { return intrusive_ptr(t.impl); }
+
+ /** Set the implementation pointer in a handle */
+ static void set(T& t, const intrusive_ptr& p) {
+ if (t.impl == p) return;
+ if (t.impl) boost::intrusive_ptr_release(t.impl);
+ t.impl = p.get();
+ if (t.impl) boost::intrusive_ptr_add_ref(t.impl);
+ }
+
+ // Helper functions to implement the ctor, dtor, copy, assign
+ static void ctor(T& t, Impl* p) { t.impl = p; if (p) boost::intrusive_ptr_add_ref(p); }
+ static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); }
+ static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); }
+ static T& assign(T& t, const T& x) { set(t, get(x)); return t;}
+};
+
+}} // namespace qpid::messaging
+
+#endif /*!QPID_MESSAGING_PRIVATEIMPL_H*/
diff --git a/qpid/cpp/src/qpid/messaging/Receiver.cpp b/qpid/cpp/src/qpid/messaging/Receiver.cpp
new file mode 100644
index 0000000000..78e0c5daa3
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/Receiver.cpp
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/ReceiverImpl.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/PrivateImplRef.h"
+
+namespace qpid {
+namespace messaging {
+
+typedef PrivateImplRef<qpid::messaging::Receiver> PI;
+
+Receiver::Receiver(ReceiverImpl* impl) { PI::ctor(*this, impl); }
+Receiver::Receiver(const Receiver& s) : Handle<ReceiverImpl>() { PI::copy(*this, s); }
+Receiver::~Receiver() { PI::dtor(*this); }
+Receiver& Receiver::operator=(const Receiver& s) { return PI::assign(*this, s); }
+bool Receiver::get(Message& message, Duration timeout) { return impl->get(message, timeout); }
+Message Receiver::get(Duration timeout) { return impl->get(timeout); }
+bool Receiver::fetch(Message& message, Duration timeout) { return impl->fetch(message, timeout); }
+Message Receiver::fetch(Duration timeout) { return impl->fetch(timeout); }
+void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); }
+uint32_t Receiver::getCapacity() { return impl->getCapacity(); }
+uint32_t Receiver::getAvailable() { return impl->getAvailable(); }
+uint32_t Receiver::getUnsettled() { return impl->getUnsettled(); }
+void Receiver::close() { impl->close(); }
+const std::string& Receiver::getName() const { return impl->getName(); }
+Session Receiver::getSession() const { return impl->getSession(); }
+bool Receiver::isClosed() const { return impl->isClosed(); }
+}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
new file mode 100644
index 0000000000..57059bfd28
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
@@ -0,0 +1,52 @@
+#ifndef QPID_MESSAGING_RECEIVERIMPL_H
+#define QPID_MESSAGING_RECEIVERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace messaging {
+
+class Message;
+class MessageListener;
+class Session;
+
+class ReceiverImpl : public virtual qpid::RefCounted
+{
+ public:
+ virtual ~ReceiverImpl() {}
+ virtual bool get(Message& message, Duration timeout) = 0;
+ virtual Message get(Duration timeout) = 0;
+ virtual bool fetch(Message& message, Duration timeout) = 0;
+ virtual Message fetch(Duration timeout) = 0;
+ virtual void setCapacity(uint32_t) = 0;
+ virtual uint32_t getCapacity() = 0;
+ virtual uint32_t getAvailable() = 0;
+ virtual uint32_t getUnsettled() = 0;
+ virtual void close() = 0;
+ virtual const std::string& getName() const = 0;
+ virtual Session getSession() const = 0;
+ virtual bool isClosed() const = 0;
+};
+}} // namespace qpid::messaging
+
+#endif /*!QPID_MESSAGING_RECEIVERIMPL_H*/
diff --git a/qpid/cpp/src/qpid/messaging/Sender.cpp b/qpid/cpp/src/qpid/messaging/Sender.cpp
new file mode 100644
index 0000000000..53dbb69777
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/Sender.cpp
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/SenderImpl.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/PrivateImplRef.h"
+
+namespace qpid {
+namespace messaging {
+typedef PrivateImplRef<qpid::messaging::Sender> PI;
+
+Sender::Sender(SenderImpl* impl) { PI::ctor(*this, impl); }
+Sender::Sender(const Sender& s) : qpid::messaging::Handle<SenderImpl>() { PI::copy(*this, s); }
+Sender::~Sender() { PI::dtor(*this); }
+Sender& Sender::operator=(const Sender& s) { return PI::assign(*this, s); }
+void Sender::send(const Message& message, bool sync) { impl->send(message, sync); }
+void Sender::close() { impl->close(); }
+void Sender::setCapacity(uint32_t c) { impl->setCapacity(c); }
+uint32_t Sender::getCapacity() { return impl->getCapacity(); }
+uint32_t Sender::getUnsettled() { return impl->getUnsettled(); }
+uint32_t Sender::getAvailable() { return getCapacity() - getUnsettled(); }
+const std::string& Sender::getName() const { return impl->getName(); }
+Session Sender::getSession() const { return impl->getSession(); }
+
+}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/SenderImpl.h b/qpid/cpp/src/qpid/messaging/SenderImpl.h
new file mode 100644
index 0000000000..a1ca02c72c
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/SenderImpl.h
@@ -0,0 +1,47 @@
+#ifndef QPID_MESSAGING_SENDERIMPL_H
+#define QPID_MESSAGING_SENDERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace messaging {
+
+class Message;
+class Session;
+
+class SenderImpl : public virtual qpid::RefCounted
+{
+ public:
+ virtual ~SenderImpl() {}
+ virtual void send(const Message& message, bool sync) = 0;
+ virtual void close() = 0;
+ virtual void setCapacity(uint32_t) = 0;
+ virtual uint32_t getCapacity() = 0;
+ virtual uint32_t getUnsettled() = 0;
+ virtual const std::string& getName() const = 0;
+ virtual Session getSession() const = 0;
+ private:
+};
+}} // namespace qpid::messaging
+
+#endif /*!QPID_MESSAGING_SENDERIMPL_H*/
diff --git a/qpid/cpp/src/qpid/messaging/Session.cpp b/qpid/cpp/src/qpid/messaging/Session.cpp
new file mode 100644
index 0000000000..496953a8e5
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/Session.cpp
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/SessionImpl.h"
+#include "qpid/messaging/PrivateImplRef.h"
+
+namespace qpid {
+namespace messaging {
+
+typedef PrivateImplRef<qpid::messaging::Session> PI;
+
+Session::Session(SessionImpl* impl) { PI::ctor(*this, impl); }
+Session::Session(const Session& s) : Handle<SessionImpl>() { PI::copy(*this, s); }
+Session::~Session() { PI::dtor(*this); }
+Session& Session::operator=(const Session& s) { return PI::assign(*this, s); }
+void Session::commit() { impl->commit(); }
+void Session::rollback() { impl->rollback(); }
+void Session::acknowledge(bool sync) { impl->acknowledge(sync); }
+void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m); sync(s); }
+void Session::reject(Message& m) { impl->reject(m); }
+void Session::release(Message& m) { impl->release(m); }
+void Session::close() { impl->close(); }
+
+Sender Session::createSender(const Address& address)
+{
+ return impl->createSender(address);
+}
+Receiver Session::createReceiver(const Address& address)
+{
+ return impl->createReceiver(address);
+}
+
+Sender Session::createSender(const std::string& address)
+{
+ return impl->createSender(Address(address));
+}
+Receiver Session::createReceiver(const std::string& address)
+{
+ return impl->createReceiver(Address(address));
+}
+
+void Session::sync(bool block)
+{
+ impl->sync(block);
+}
+
+bool Session::nextReceiver(Receiver& receiver, Duration timeout)
+{
+ return impl->nextReceiver(receiver, timeout);
+}
+
+
+Receiver Session::nextReceiver(Duration timeout)
+{
+ return impl->nextReceiver(timeout);
+}
+
+uint32_t Session::getReceivable() { return impl->getReceivable(); }
+uint32_t Session::getUnsettledAcks() { return impl->getUnsettledAcks(); }
+
+Sender Session::getSender(const std::string& name) const
+{
+ return impl->getSender(name);
+}
+Receiver Session::getReceiver(const std::string& name) const
+{
+ return impl->getReceiver(name);
+}
+
+Connection Session::getConnection() const
+{
+ return impl->getConnection();
+}
+
+void Session::checkError() { impl->checkError(); }
+bool Session::hasError()
+{
+ try {
+ checkError();
+ return false;
+ } catch (const std::exception&) {
+ return true;
+ }
+}
+
+}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/SessionImpl.h b/qpid/cpp/src/qpid/messaging/SessionImpl.h
new file mode 100644
index 0000000000..02a254e4f2
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/SessionImpl.h
@@ -0,0 +1,63 @@
+#ifndef QPID_MESSAGING_SESSIONIMPL_H
+#define QPID_MESSAGING_SESSIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/RefCounted.h"
+#include <string>
+#include "qpid/messaging/Duration.h"
+
+namespace qpid {
+namespace messaging {
+
+class Address;
+class Connection;
+class Message;
+class Sender;
+class Receiver;
+
+class SessionImpl : public virtual qpid::RefCounted
+{
+ public:
+ virtual ~SessionImpl() {}
+ virtual void commit() = 0;
+ virtual void rollback() = 0;
+ virtual void acknowledge(bool sync) = 0;
+ virtual void acknowledge(Message&) = 0;
+ virtual void reject(Message&) = 0;
+ virtual void release(Message&) = 0;
+ virtual void close() = 0;
+ virtual void sync(bool block) = 0;
+ virtual Sender createSender(const Address& address) = 0;
+ virtual Receiver createReceiver(const Address& address) = 0;
+ virtual bool nextReceiver(Receiver& receiver, Duration timeout) = 0;
+ virtual Receiver nextReceiver(Duration timeout) = 0;
+ virtual uint32_t getReceivable() = 0;
+ virtual uint32_t getUnsettledAcks() = 0;
+ virtual Sender getSender(const std::string& name) const = 0;
+ virtual Receiver getReceiver(const std::string& name) const = 0;
+ virtual Connection getConnection() const = 0;
+ virtual void checkError() = 0;
+ private:
+};
+}} // namespace qpid::messaging
+
+#endif /*!QPID_MESSAGING_SESSIONIMPL_H*/
diff --git a/qpid/cpp/src/qpid/messaging/exceptions.cpp b/qpid/cpp/src/qpid/messaging/exceptions.cpp
new file mode 100644
index 0000000000..5d2683fffe
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/exceptions.cpp
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/exceptions.h"
+
+namespace qpid {
+namespace messaging {
+
+MessagingException::MessagingException(const std::string& msg) : qpid::types::Exception(msg) {}
+MessagingException::~MessagingException() throw() {}
+
+InvalidOptionString::InvalidOptionString(const std::string& msg) : MessagingException(msg) {}
+KeyError::KeyError(const std::string& msg) : MessagingException(msg) {}
+
+
+LinkError::LinkError(const std::string& msg) : MessagingException(msg) {}
+
+AddressError::AddressError(const std::string& msg) : LinkError(msg) {}
+ResolutionError::ResolutionError(const std::string& msg) : AddressError(msg) {}
+MalformedAddress::MalformedAddress(const std::string& msg) : AddressError(msg) {}
+AssertionFailed::AssertionFailed(const std::string& msg) : ResolutionError(msg) {}
+NotFound::NotFound(const std::string& msg) : ResolutionError(msg) {}
+
+ReceiverError::ReceiverError(const std::string& msg) : LinkError(msg) {}
+FetchError::FetchError(const std::string& msg) : ReceiverError(msg) {}
+NoMessageAvailable::NoMessageAvailable() : FetchError("No message to fetch") {}
+
+SenderError::SenderError(const std::string& msg) : LinkError(msg) {}
+SendError::SendError(const std::string& msg) : SenderError(msg) {}
+TargetCapacityExceeded::TargetCapacityExceeded(const std::string& msg) : SendError(msg) {}
+
+SessionError::SessionError(const std::string& msg) : MessagingException(msg) {}
+TransactionError::TransactionError(const std::string& msg) : SessionError(msg) {}
+TransactionAborted::TransactionAborted(const std::string& msg) : TransactionError(msg) {}
+UnauthorizedAccess::UnauthorizedAccess(const std::string& msg) : SessionError(msg) {}
+
+ConnectionError::ConnectionError(const std::string& msg) : MessagingException(msg) {}
+
+TransportFailure::TransportFailure(const std::string& msg) : MessagingException(msg) {}
+
+}} // namespace qpid::messaging