summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/messaging/amqp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/messaging/amqp')
-rw-r--r--cpp/src/qpid/messaging/amqp/AddressHelper.cpp182
-rw-r--r--cpp/src/qpid/messaging/amqp/AddressHelper.h57
-rw-r--r--cpp/src/qpid/messaging/amqp/ConnectionContext.cpp612
-rw-r--r--cpp/src/qpid/messaging/amqp/ConnectionContext.h150
-rw-r--r--cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp84
-rw-r--r--cpp/src/qpid/messaging/amqp/ConnectionHandle.h58
-rw-r--r--cpp/src/qpid/messaging/amqp/DriverImpl.cpp74
-rw-r--r--cpp/src/qpid/messaging/amqp/DriverImpl.h60
-rw-r--r--cpp/src/qpid/messaging/amqp/EncodedMessage.cpp263
-rw-r--r--cpp/src/qpid/messaging/amqp/EncodedMessage.h177
-rw-r--r--cpp/src/qpid/messaging/amqp/ReceiverContext.cpp146
-rw-r--r--cpp/src/qpid/messaging/amqp/ReceiverContext.h68
-rw-r--r--cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp106
-rw-r--r--cpp/src/qpid/messaging/amqp/ReceiverHandle.h63
-rw-r--r--cpp/src/qpid/messaging/amqp/Sasl.cpp157
-rw-r--r--cpp/src/qpid/messaging/amqp/Sasl.h72
-rw-r--r--cpp/src/qpid/messaging/amqp/SenderContext.cpp363
-rw-r--r--cpp/src/qpid/messaging/amqp/SenderContext.h90
-rw-r--r--cpp/src/qpid/messaging/amqp/SenderHandle.cpp75
-rw-r--r--cpp/src/qpid/messaging/amqp/SenderHandle.h58
-rw-r--r--cpp/src/qpid/messaging/amqp/SessionContext.cpp156
-rw-r--r--cpp/src/qpid/messaging/amqp/SessionContext.h81
-rw-r--r--cpp/src/qpid/messaging/amqp/SessionHandle.cpp148
-rw-r--r--cpp/src/qpid/messaging/amqp/SessionHandle.h64
-rw-r--r--cpp/src/qpid/messaging/amqp/SslTransport.cpp160
-rw-r--r--cpp/src/qpid/messaging/amqp/SslTransport.h74
-rw-r--r--cpp/src/qpid/messaging/amqp/TcpTransport.cpp162
-rw-r--r--cpp/src/qpid/messaging/amqp/TcpTransport.h71
-rw-r--r--cpp/src/qpid/messaging/amqp/Transport.cpp50
-rw-r--r--cpp/src/qpid/messaging/amqp/Transport.h48
-rw-r--r--cpp/src/qpid/messaging/amqp/TransportContext.h47
31 files changed, 3976 insertions, 0 deletions
diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
new file mode 100644
index 0000000000..359660dce5
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/amqp/AddressHelper.h"
+#include "qpid/messaging/Address.h"
+#include <vector>
+#include <boost/assign.hpp>
+extern "C" {
+#include <proton/engine.h>
+}
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+using qpid::types::Variant;
+
+namespace {
+//policy types
+const std::string CREATE("create");
+const std::string ASSERT("assert");
+const std::string DELETE("delete");
+
+//policy values
+const std::string ALWAYS("always");
+const std::string NEVER("never");
+const std::string RECEIVER("receiver");
+const std::string SENDER("sender");
+
+const std::string NODE("node");
+const std::string LINK("link");
+
+const std::string TYPE("type");
+const std::string TOPIC("topic");
+const std::string QUEUE("queue");
+
+//distribution modes:
+const std::string MOVE("move");
+const std::string COPY("copy");
+
+const std::string SUPPORTED_DIST_MODES("supported-dist-modes");
+
+
+const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER);
+const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER);
+
+pn_bytes_t convert(const std::string& s)
+{
+ pn_bytes_t result;
+ result.start = const_cast<char*>(s.data());
+ result.size = s.size();
+ return result;
+}
+
+bool bind(const Variant::Map& options, const std::string& name, std::string& variable)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return false;
+ } else {
+ variable = j->second.asString();
+ return true;
+ }
+}
+
+bool bind(const Variant::Map& options, const std::string& name, Variant::Map& variable)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return false;
+ } else {
+ variable = j->second.asMap();
+ return true;
+ }
+}
+
+bool bind(const Address& address, const std::string& name, std::string& variable)
+{
+ return bind(address.getOptions(), name, variable);
+}
+
+bool bind(const Address& address, const std::string& name, Variant::Map& variable)
+{
+ return bind(address.getOptions(), name, variable);
+}
+
+bool in(const std::string& value, const std::vector<std::string>& choices)
+{
+ for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) {
+ if (value == *i) return true;
+ }
+ return false;
+}
+}
+
+AddressHelper::AddressHelper(const Address& address)
+{
+ bind(address, CREATE, createPolicy);
+ bind(address, DELETE, deletePolicy);
+ bind(address, ASSERT, assertPolicy);
+
+ bind(address, NODE, node);
+ bind(address, LINK, link);
+}
+
+bool AddressHelper::createEnabled(CheckMode mode) const
+{
+ return enabled(createPolicy, mode);
+}
+bool AddressHelper::deleteEnabled(CheckMode mode) const
+{
+ return enabled(deletePolicy, mode);
+}
+bool AddressHelper::assertEnabled(CheckMode mode) const
+{
+ return enabled(assertPolicy, mode);
+}
+bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const
+{
+ bool result = false;
+ switch (mode) {
+ case FOR_RECEIVER:
+ result = in(policy, RECEIVER_MODES);
+ break;
+ case FOR_SENDER:
+ result = in(policy, SENDER_MODES);
+ break;
+ }
+ return result;
+}
+
+const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const
+{
+ return node;
+}
+const qpid::types::Variant::Map& AddressHelper::getLinkProperties() const
+{
+ return link;
+}
+
+void AddressHelper::setNodeProperties(pn_terminus_t* terminus)
+{
+ pn_terminus_set_dynamic(terminus, true);
+
+ //properties for dynamically created node:
+ pn_data_t* data = pn_terminus_properties(terminus);
+ if (node.size()) {
+ pn_data_put_map(data);
+ pn_data_enter(data);
+ }
+ for (qpid::types::Variant::Map::const_iterator i = node.begin(); i != node.end(); ++i) {
+ if (i->first == TYPE) {
+ pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
+ pn_data_put_string(data, convert(i->second == TOPIC ? COPY : MOVE));
+ } else {
+ pn_data_put_symbol(data, convert(i->first));
+ pn_data_put_string(data, convert(i->second.asString()));
+ }
+ }
+ if (node.size()) {
+ pn_data_exit(data);
+ }
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.h b/cpp/src/qpid/messaging/amqp/AddressHelper.h
new file mode 100644
index 0000000000..cd0aa1be9e
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/AddressHelper.h
@@ -0,0 +1,57 @@
+#ifndef QPID_MESSAGING_AMQP_ADDRESSHELPER_H
+#define QPID_MESSAGING_AMQP_ADDRESSHELPER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/types/Variant.h"
+
+struct pn_terminus_t;
+
+namespace qpid {
+namespace messaging {
+class Address;
+namespace amqp {
+
+class AddressHelper
+{
+ public:
+ enum CheckMode {FOR_RECEIVER, FOR_SENDER};
+
+ AddressHelper(const Address& address);
+ bool createEnabled(CheckMode mode) const;
+ bool deleteEnabled(CheckMode mode) const;
+ bool assertEnabled(CheckMode mode) const;
+
+ void setNodeProperties(pn_terminus_t*);
+ const qpid::types::Variant::Map& getNodeProperties() const;
+ const qpid::types::Variant::Map& getLinkProperties() const;
+ private:
+ std::string createPolicy;
+ std::string assertPolicy;
+ std::string deletePolicy;
+ qpid::types::Variant::Map node;
+ qpid::types::Variant::Map link;
+
+ bool enabled(const std::string& policy, CheckMode mode) const;
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_ADDRESSHELPER_H*/
diff --git a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
new file mode 100644
index 0000000000..b2a9b979b6
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -0,0 +1,612 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionContext.h"
+#include "DriverImpl.h"
+#include "ReceiverContext.h"
+#include "Sasl.h"
+#include "SenderContext.h"
+#include "SessionContext.h"
+#include "Transport.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageImpl.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Time.h"
+#include <vector>
+extern "C" {
+#include <proton/engine.h>
+}
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+
+ConnectionContext::ConnectionContext(const std::string& u, const qpid::types::Variant::Map& o)
+ : qpid::messaging::ConnectionOptions(o),
+ url(u),
+ engine(pn_transport()),
+ connection(pn_connection()),
+ //note: disabled read/write of header as now handled by engine
+ writeHeader(false),
+ readHeader(false),
+ haveOutput(false),
+ state(DISCONNECTED),
+ codecSwitch(*this)
+{
+ if (pn_transport_bind(engine, connection)) {
+ //error
+ }
+ pn_connection_set_container(connection, "qpid::messaging");//TODO: take this from a connection option
+ bool enableTrace(false);
+ QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
+ if (enableTrace) pn_transport_trace(engine, PN_TRACE_FRM);
+}
+
+ConnectionContext::~ConnectionContext()
+{
+ close();
+ sessions.clear();
+ pn_transport_free(engine);
+ pn_connection_free(connection);
+}
+
+namespace {
+const std::string COLON(":");
+}
+void ConnectionContext::open()
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
+ if (!driver) driver = DriverImpl::getDefault();
+
+ for (Url::const_iterator i = url.begin(); state != CONNECTED && i != url.end(); ++i) {
+ transport = driver->getTransport(i->protocol, *this);
+ std::stringstream port;
+ port << i->port;
+ id = i->host + COLON + port.str();
+ if (useSasl()) {
+ sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, i->host));
+ }
+ state = CONNECTING;
+ try {
+ QPID_LOG(debug, id << " Connecting ...");
+ transport->connect(i->host, port.str());
+ } catch (const std::exception& e) {
+ QPID_LOG(info, id << " Error while connecting: " << e.what());
+ }
+ while (state == CONNECTING) {
+ lock.wait();
+ }
+ if (state == DISCONNECTED) {
+ QPID_LOG(debug, id << " Failed to connect");
+ transport = boost::shared_ptr<Transport>();
+ } else {
+ QPID_LOG(debug, id << " Connected");
+ }
+ }
+
+ if (state != CONNECTED) throw qpid::messaging::TransportFailure(QPID_MSG("Could not connect to " << url));
+
+ if (sasl.get()) {
+ wakeupDriver();
+ while (!sasl->authenticated()) {
+ QPID_LOG(debug, id << " Waiting to be authenticated...");
+ wait();
+ }
+ QPID_LOG(debug, id << " Authenticated");
+ }
+
+ QPID_LOG(debug, id << " Opening...");
+ pn_connection_open(connection);
+ wakeupDriver(); //want to write
+ while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
+ wait();
+ }
+ if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
+ throw qpid::messaging::ConnectionError("Failed to open connection");
+ }
+ QPID_LOG(debug, id << " Opened");
+}
+
+bool ConnectionContext::isOpen() const
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ return pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+}
+
+void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ pn_session_close(ssn->session);
+ //TODO: need to destroy session and remove context from map
+ wakeupDriver();
+}
+
+void ConnectionContext::close()
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (state != CONNECTED) return;
+ if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) {
+ for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ //wait for outstanding sends to settle
+ while (!i->second->settled()) {
+ QPID_LOG(debug, "Waiting for sends to settle before closing");
+ wait();//wait until message has been confirmed
+ }
+
+
+ if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) {
+ pn_session_close(i->second->session);
+ }
+ }
+ pn_connection_close(connection);
+ wakeupDriver();
+ //wait for close to be confirmed by peer?
+ while (!(pn_connection_state(connection) & PN_REMOTE_CLOSED)) {
+ wait();
+ }
+ sessions.clear();
+ }
+ transport->close();
+ while (state != DISCONNECTED) {
+ lock.wait();
+ }
+}
+
+bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
+{
+ {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (!lnk->capacity) {
+ pn_link_flow(lnk->receiver, 1);
+ wakeupDriver();
+ }
+ }
+ if (get(ssn, lnk, message, timeout)) {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (lnk->capacity) {
+ pn_link_flow(lnk->receiver, 1);//TODO: is this the right approach?
+ wakeupDriver();
+ }
+ return true;
+ } else {
+ {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ pn_link_drain(lnk->receiver, 0);
+ wakeupDriver();
+ while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
+ QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
+ wait();
+ }
+ if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) {
+ pn_link_flow(lnk->receiver, lnk->capacity);
+ }
+ }
+ if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (lnk->capacity) {
+ pn_link_flow(lnk->receiver, 1);
+ wakeupDriver();
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+}
+
+qpid::sys::AbsTime convert(qpid::messaging::Duration timeout)
+{
+ qpid::sys::AbsTime until;
+ uint64_t ms = timeout.getMilliseconds();
+ if (ms < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) {
+ return qpid::sys::AbsTime(qpid::sys::now(), ms * qpid::sys::TIME_MSEC);
+ } else {
+ return qpid::sys::FAR_FUTURE;
+ }
+}
+
+bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
+{
+ qpid::sys::AbsTime until(convert(timeout));
+ while (true) {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver);
+ QPID_LOG(debug, "In ConnectionContext::get(), current=" << current);
+ if (current) {
+ qpid::messaging::MessageImpl& impl = MessageImplAccess::get(message);
+ boost::shared_ptr<EncodedMessage> encoded(new EncodedMessage(pn_delivery_pending(current)));
+ ssize_t read = pn_link_recv(lnk->receiver, encoded->getData(), encoded->getSize());
+ if (read < 0) throw qpid::messaging::MessagingException("Failed to read message");
+ encoded->trim((size_t) read);
+ QPID_LOG(debug, "Received message of " << encoded->getSize() << " bytes: ");
+ encoded->init(impl);
+ impl.setEncoded(encoded);
+ impl.setInternalId(ssn->record(current));
+ pn_link_advance(lnk->receiver);
+ return true;
+ } else if (until > qpid::sys::now()) {
+ wait();
+ } else {
+ return false;
+ }
+ }
+ return false;
+}
+
+void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (message) {
+ ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative);
+ } else {
+ ssn->acknowledge();
+ }
+ wakeupDriver();
+}
+
+
+void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
+{
+ lnk->configure();
+ attach(ssn->session, (pn_link_t*) lnk->sender);
+ if (!pn_link_remote_target((pn_link_t*) lnk->sender)) {
+ std::string msg("No such target : ");
+ msg += lnk->getTarget();
+ throw qpid::messaging::NotFound(msg);
+ }
+}
+
+void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+{
+ lnk->configure();
+ attach(ssn->session, lnk->receiver, lnk->capacity);
+ if (!pn_link_remote_source(lnk->receiver)) {
+ std::string msg("No such source : ");
+ msg += lnk->getSource();
+ throw qpid::messaging::NotFound(msg);
+ }
+}
+
+void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int credit)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ QPID_LOG(debug, "Attaching link " << link << ", state=" << pn_link_state(link));
+ pn_link_open(link);
+ QPID_LOG(debug, "Link attached " << link << ", state=" << pn_link_state(link));
+ if (credit) pn_link_flow(link, credit);
+ wakeupDriver();
+ while (pn_link_state(link) & PN_REMOTE_UNINIT) {
+ QPID_LOG(debug, "waiting for confirmation of link attach for " << link << ", state=" << pn_link_state(link));
+ wait();
+ }
+}
+
+void ConnectionContext::send(boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ SenderContext::Delivery* delivery(0);
+ while (!(delivery = snd->send(message))) {
+ QPID_LOG(debug, "Waiting for capacity...");
+ wait();//wait for capacity
+ }
+ wakeupDriver();
+ if (sync) {
+ while (!delivery->accepted()) {
+ QPID_LOG(debug, "Waiting for confirmation...");
+ wait();//wait until message has been confirmed
+ }
+ }
+}
+
+void ConnectionContext::setCapacity(boost::shared_ptr<SenderContext> sender, uint32_t capacity)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sender->setCapacity(capacity);
+}
+uint32_t ConnectionContext::getCapacity(boost::shared_ptr<SenderContext> sender)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ return sender->getCapacity();
+}
+uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<SenderContext> sender)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ return sender->getUnsettled();
+}
+
+void ConnectionContext::setCapacity(boost::shared_ptr<ReceiverContext> receiver, uint32_t capacity)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ receiver->setCapacity(capacity);
+ pn_link_flow((pn_link_t*) receiver->receiver, receiver->getCapacity());
+ wakeupDriver();
+}
+uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ return receiver->getCapacity();
+}
+uint32_t ConnectionContext::getAvailable(boost::shared_ptr<ReceiverContext> receiver)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ return receiver->getAvailable();
+}
+uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> receiver)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ return receiver->getUnsettled();
+}
+
+void ConnectionContext::activateOutput()
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ wakeupDriver();
+}
+/**
+ * Expects lock to be held by caller
+ */
+void ConnectionContext::wakeupDriver()
+{
+ switch (state) {
+ case CONNECTED:
+ haveOutput = true;
+ transport->activateOutput();
+ QPID_LOG(debug, "wakeupDriver()");
+ break;
+ case DISCONNECTED:
+ case CONNECTING:
+ QPID_LOG(error, "wakeupDriver() called while not connected");
+ break;
+ }
+}
+
+void ConnectionContext::wait()
+{
+ lock.wait();
+ if (state == DISCONNECTED) {
+ throw qpid::messaging::TransportFailure("Disconnected");
+ }
+ //check for any closed links, sessions or indeed the connection
+}
+
+boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (transactional) throw qpid::messaging::MessagingException("Transactions not yet supported");
+ std::string name = n.empty() ? qpid::framing::Uuid(true).str() : n;
+ SessionMap::const_iterator i = sessions.find(name);
+ if (i == sessions.end()) {
+ boost::shared_ptr<SessionContext> s(new SessionContext(connection));
+ s->session = pn_session(connection);
+ pn_session_open(s->session);
+ sessions[name] = s;
+ wakeupDriver();
+ while (pn_session_state(s->session) & PN_REMOTE_UNINIT) {
+ wait();
+ }
+ return s;
+ } else {
+ throw qpid::messaging::KeyError(std::string("Session already exists: ") + name);
+ }
+
+}
+boost::shared_ptr<SessionContext> ConnectionContext::getSession(const std::string& name) const
+{
+ SessionMap::const_iterator i = sessions.find(name);
+ if (i == sessions.end()) {
+ throw qpid::messaging::KeyError(std::string("No such session") + name);
+ } else {
+ return i->second;
+ }
+}
+
+void ConnectionContext::setOption(const std::string& name, const qpid::types::Variant& value)
+{
+ set(name, value);
+}
+
+std::string ConnectionContext::getAuthenticatedUsername()
+{
+ return sasl.get() ? sasl->getAuthenticatedUsername() : std::string();
+}
+
+std::size_t ConnectionContext::decode(const char* buffer, std::size_t size)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ QPID_LOG(trace, id << " decode(" << size << ")");
+ if (readHeader) {
+ size_t decoded = readProtocolHeader(buffer, size);
+ if (decoded < size) {
+ decoded += decode(buffer + decoded, size - decoded);
+ }
+ return decoded;
+ }
+
+ //TODO: Fix pn_engine_input() to take const buffer
+ ssize_t n = pn_transport_input(engine, const_cast<char*>(buffer), size);
+ if (n > 0 || n == PN_EOS) {
+ //If engine returns EOS, have no way of knowing how many bytes
+ //it processed, but can assume none need to be reprocessed so
+ //consider them all read:
+ if (n == PN_EOS) n = size;
+ QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size)
+ pn_transport_tick(engine, 0);
+ lock.notifyAll();
+ return n;
+ } else if (n == PN_ERR) {
+ throw qpid::Exception(QPID_MSG("Error on input: " << getError()));
+ } else {
+ return 0;
+ }
+
+}
+std::size_t ConnectionContext::encode(char* buffer, std::size_t size)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ QPID_LOG(trace, id << " encode(" << size << ")");
+ if (writeHeader) {
+ size_t encoded = writeProtocolHeader(buffer, size);
+ if (encoded < size) {
+ encoded += encode(buffer + encoded, size - encoded);
+ }
+ return encoded;
+ }
+
+ ssize_t n = pn_transport_output(engine, buffer, size);
+ if (n > 0) {
+ QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size)
+ haveOutput = true;
+ return n;
+ } else if (n == PN_ERR) {
+ throw qpid::Exception(QPID_MSG("Error on output: " << getError()));
+ } else if (n == PN_EOS) {
+ haveOutput = false;
+ return 0;//Is this right?
+ } else {
+ haveOutput = false;
+ return 0;
+ }
+}
+bool ConnectionContext::canEncode()
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ return haveOutput && state == CONNECTED;
+}
+void ConnectionContext::closed()
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ state = DISCONNECTED;
+ lock.notifyAll();
+}
+void ConnectionContext::opened()
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ state = CONNECTED;
+ lock.notifyAll();
+}
+bool ConnectionContext::isClosed() const
+{
+ return !isOpen();
+}
+namespace {
+qpid::framing::ProtocolVersion AMQP_1_0_PLAIN(1,0,qpid::framing::ProtocolVersion::AMQP);
+}
+
+std::string ConnectionContext::getError()
+{
+ std::stringstream text;
+ pn_error_t* cerror = pn_connection_error(connection);
+ if (cerror) text << "connection error " << pn_error_text(cerror);
+ pn_error_t* terror = pn_transport_error(engine);
+ if (terror) text << "transport error " << pn_error_text(terror);
+ return text.str();
+}
+
+framing::ProtocolVersion ConnectionContext::getVersion() const
+{
+ return AMQP_1_0_PLAIN;
+}
+
+std::size_t ConnectionContext::readProtocolHeader(const char* buffer, std::size_t size)
+{
+ framing::ProtocolInitiation pi(getVersion());
+ if (size >= pi.encodedSize()) {
+ readHeader = false;
+ qpid::framing::Buffer out(const_cast<char*>(buffer), size);
+ pi.decode(out);
+ QPID_LOG_CAT(debug, protocol, id << " read protocol header: " << pi);
+ return pi.encodedSize();
+ } else {
+ return 0;
+ }
+}
+std::size_t ConnectionContext::writeProtocolHeader(char* buffer, std::size_t size)
+{
+ framing::ProtocolInitiation pi(getVersion());
+ if (size >= pi.encodedSize()) {
+ QPID_LOG_CAT(debug, protocol, id << " writing protocol header: " << pi);
+ writeHeader = false;
+ qpid::framing::Buffer out(buffer, size);
+ pi.encode(out);
+ return pi.encodedSize();
+ } else {
+ QPID_LOG_CAT(debug, protocol, id << " insufficient buffer for protocol header: " << size)
+ return 0;
+ }
+}
+bool ConnectionContext::useSasl()
+{
+ return !(mechanism == "none" || mechanism == "NONE" || mechanism == "None");
+}
+
+qpid::sys::Codec& ConnectionContext::getCodec()
+{
+ return codecSwitch;
+}
+
+ConnectionContext::CodecSwitch::CodecSwitch(ConnectionContext& p) : parent(p) {}
+std::size_t ConnectionContext::CodecSwitch::decode(const char* buffer, std::size_t size)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+ size_t decoded = 0;
+ if (parent.sasl.get() && !parent.sasl->authenticated()) {
+ decoded = parent.sasl->decode(buffer, size);
+ if (!parent.sasl->authenticated()) return decoded;
+ }
+ if (decoded < size) {
+ if (parent.sasl.get() && parent.sasl->getSecurityLayer()) decoded += parent.sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded);
+ else decoded += parent.decode(buffer+decoded, size-decoded);
+ }
+ return decoded;
+}
+std::size_t ConnectionContext::CodecSwitch::encode(char* buffer, std::size_t size)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+ size_t encoded = 0;
+ if (parent.sasl.get() && parent.sasl->canEncode()) {
+ encoded += parent.sasl->encode(buffer, size);
+ if (!parent.sasl->authenticated()) return encoded;
+ }
+ if (encoded < size) {
+ if (parent.sasl.get() && parent.sasl->getSecurityLayer()) encoded += parent.sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded);
+ else encoded += parent.encode(buffer+encoded, size-encoded);
+ }
+ return encoded;
+}
+bool ConnectionContext::CodecSwitch::canEncode()
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+ if (parent.sasl.get()) {
+ if (parent.sasl->canEncode()) return true;
+ else if (!parent.sasl->authenticated()) return false;
+ else if (parent.sasl->getSecurityLayer()) return parent.sasl->getSecurityLayer()->canEncode();
+ }
+ return parent.canEncode();
+}
+
+
+}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/cpp/src/qpid/messaging/amqp/ConnectionContext.h
new file mode 100644
index 0000000000..3718184365
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -0,0 +1,150 @@
+#ifndef QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H
+#define QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <deque>
+#include <map>
+#include <memory>
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include "qpid/Url.h"
+#include "qpid/messaging/ConnectionOptions.h"
+#include "qpid/sys/AtomicValue.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/types/Variant.h"
+#include "qpid/messaging/amqp/TransportContext.h"
+
+struct pn_connection_t;
+struct pn_link_t;
+struct pn_session_t;
+struct pn_transport_t;
+
+
+namespace qpid {
+namespace framing {
+class ProtocolVersion;
+}
+namespace messaging {
+class Duration;
+class Message;
+namespace amqp {
+
+class DriverImpl;
+class ReceiverContext;
+class Sasl;
+class SessionContext;
+class SenderContext;
+class Transport;
+
+/**
+ *
+ */
+class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messaging::ConnectionOptions, public TransportContext
+{
+ public:
+ ConnectionContext(const std::string& url, const qpid::types::Variant::Map& options);
+ ~ConnectionContext();
+ void open();
+ bool isOpen() const;
+ void close();
+ boost::shared_ptr<SessionContext> newSession(bool transactional, const std::string& name);
+ boost::shared_ptr<SessionContext> getSession(const std::string& name) const;
+ void endSession(boost::shared_ptr<SessionContext>);
+ void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
+ void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
+ void send(boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
+ bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
+
+ void setOption(const std::string& name, const qpid::types::Variant& value);
+ std::string getAuthenticatedUsername();
+
+ void setCapacity(boost::shared_ptr<SenderContext>, uint32_t);
+ uint32_t getCapacity(boost::shared_ptr<SenderContext>);
+ uint32_t getUnsettled(boost::shared_ptr<SenderContext>);
+
+ void setCapacity(boost::shared_ptr<ReceiverContext>, uint32_t);
+ uint32_t getCapacity(boost::shared_ptr<ReceiverContext>);
+ uint32_t getAvailable(boost::shared_ptr<ReceiverContext>);
+ uint32_t getUnsettled(boost::shared_ptr<ReceiverContext>);
+
+
+ void activateOutput();
+ qpid::sys::Codec& getCodec();
+ //ConnectionCodec interface:
+ std::size_t decode(const char* buffer, std::size_t size);
+ std::size_t encode(char* buffer, std::size_t size);
+ bool canEncode();
+ void closed();
+ bool isClosed() const;
+ framing::ProtocolVersion getVersion() const;
+ //additionally, Transport needs:
+ void opened();//signal successful connection
+
+ private:
+ typedef std::map<std::string, boost::shared_ptr<SessionContext> > SessionMap;
+ qpid::Url url;
+
+ boost::shared_ptr<DriverImpl> driver;
+ boost::shared_ptr<Transport> transport;
+
+ pn_transport_t* engine;
+ pn_connection_t* connection;
+ SessionMap sessions;
+ mutable qpid::sys::Monitor lock;
+ bool writeHeader;
+ bool readHeader;
+ bool haveOutput;
+ std::string id;
+ enum {
+ DISCONNECTED,
+ CONNECTING,
+ CONNECTED
+ } state;
+ std::auto_ptr<Sasl> sasl;
+ class CodecSwitch : public qpid::sys::Codec
+ {
+ public:
+ CodecSwitch(ConnectionContext&);
+ std::size_t decode(const char* buffer, std::size_t size);
+ std::size_t encode(char* buffer, std::size_t size);
+ bool canEncode();
+ private:
+ ConnectionContext& parent;
+ };
+ CodecSwitch codecSwitch;
+
+ void wait();
+ void wakeupDriver();
+ void attach(pn_session_t*, pn_link_t*, int credit=0);
+
+ std::size_t readProtocolHeader(const char* buffer, std::size_t size);
+ std::size_t writeProtocolHeader(char* buffer, std::size_t size);
+ std::string getError();
+ bool useSasl();
+};
+
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H*/
diff --git a/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp b/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp
new file mode 100644
index 0000000000..0c4ec2bfcb
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionHandle.h"
+#include "ConnectionContext.h"
+#include "SessionHandle.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/ProtocolRegistry.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+// Static constructor which registers this implementation in the ProtocolRegistry
+namespace {
+ConnectionImpl* create(const std::string& u, const qpid::types::Variant::Map& o)
+{
+ return new ConnectionHandle(u, o);
+}
+
+struct StaticInit
+{
+ StaticInit()
+ {
+ ProtocolRegistry::add("amqp1.0", &create);
+ };
+} init;
+}
+
+ConnectionHandle::ConnectionHandle(const std::string& url, const qpid::types::Variant::Map& options) : connection(new ConnectionContext(url, options)) {}
+ConnectionHandle::ConnectionHandle(boost::shared_ptr<ConnectionContext> c) : connection(c) {}
+
+void ConnectionHandle::open()
+{
+ connection->open();
+}
+
+bool ConnectionHandle::isOpen() const
+{
+ return connection->isOpen();
+}
+
+void ConnectionHandle::close()
+{
+ connection->close();
+}
+
+Session ConnectionHandle::newSession(bool transactional, const std::string& name)
+{
+ return qpid::messaging::Session(new SessionHandle(connection, connection->newSession(transactional, name)));
+}
+
+Session ConnectionHandle::getSession(const std::string& name) const
+{
+ return qpid::messaging::Session(new SessionHandle(connection, connection->getSession(name)));
+}
+
+void ConnectionHandle::setOption(const std::string& name, const qpid::types::Variant& value)
+{
+ connection->setOption(name, value);
+}
+
+std::string ConnectionHandle::getAuthenticatedUsername()
+{
+ return connection->getAuthenticatedUsername();
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/ConnectionHandle.h b/cpp/src/qpid/messaging/amqp/ConnectionHandle.h
new file mode 100644
index 0000000000..d1eb27f6de
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/ConnectionHandle.h
@@ -0,0 +1,58 @@
+#ifndef QPID_MESSAGING_AMQP_CONNECTIONHANDLE_H
+#define QPID_MESSAGING_AMQP_CONNECTIONHANDLE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/shared_ptr.hpp>
+#include "qpid/messaging/ConnectionImpl.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+class ConnectionContext;
+/**
+ * Handles are directly referenced by applications; Contexts are
+ * referenced by Handles. This allows a graph structure that
+ * remains intact as long as the application references any part
+ * of it, but that can be automatically reclaimed if the whole
+ * graph becomes unreferenced.
+ */
+class ConnectionHandle : public qpid::messaging::ConnectionImpl
+{
+ public:
+ ConnectionHandle(const std::string& url, const qpid::types::Variant::Map& options);
+ ConnectionHandle(boost::shared_ptr<ConnectionContext>);
+ void open();
+ bool isOpen() const;
+ void close();
+ Session newSession(bool transactional, const std::string& name);
+ Session getSession(const std::string& name) const;
+ void setOption(const std::string& name, const qpid::types::Variant& value);
+ std::string getAuthenticatedUsername();
+ private:
+ boost::shared_ptr<ConnectionContext> connection;
+};
+
+}}} // namespace qpid::messaging::amqp_1.0
+
+#endif /*!QPID_MESSAGING_AMQP_CONNECTIONHANDLE_H*/
diff --git a/cpp/src/qpid/messaging/amqp/DriverImpl.cpp b/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
new file mode 100644
index 0000000000..16307b3c22
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DriverImpl.h"
+#include "Transport.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+DriverImpl::DriverImpl() : poller(new qpid::sys::Poller)
+{
+ start();
+}
+DriverImpl::~DriverImpl()
+{
+ stop();
+}
+
+void DriverImpl::start()
+{
+ thread = qpid::sys::Thread(*poller);
+ QPID_LOG(debug, "Driver started");
+}
+
+void DriverImpl::stop()
+{
+ QPID_LOG(debug, "Driver stopped");
+ poller->shutdown();
+ thread.join();
+}
+
+boost::shared_ptr<Transport> DriverImpl::getTransport(const std::string& protocol, TransportContext& connection)
+{
+ boost::shared_ptr<Transport> t(Transport::create(protocol, connection, poller));
+ if (!t) throw qpid::messaging::ConnectionError("No such transport: " + protocol);
+ return t;
+}
+
+
+qpid::sys::Mutex DriverImpl::defaultLock;
+boost::weak_ptr<DriverImpl> DriverImpl::theDefault;
+boost::shared_ptr<DriverImpl> DriverImpl::getDefault()
+{
+ qpid::sys::Mutex::ScopedLock l(defaultLock);
+ boost::shared_ptr<DriverImpl> p = theDefault.lock();
+ if (!p) {
+ p = boost::shared_ptr<DriverImpl>(new DriverImpl);
+ theDefault = p;
+ }
+ return p;
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/DriverImpl.h b/cpp/src/qpid/messaging/amqp/DriverImpl.h
new file mode 100644
index 0000000000..354fa1ae35
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/DriverImpl.h
@@ -0,0 +1,60 @@
+#ifndef QPID_MESSAGING_AMQP_DRIVERIMPL_H
+#define QPID_MESSAGING_AMQP_DRIVERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Thread.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
+
+namespace qpid {
+namespace sys {
+class Poller;
+}
+namespace messaging {
+namespace amqp {
+class TransportContext;
+class Transport;
+/**
+ *
+ */
+class DriverImpl
+{
+ public:
+ DriverImpl();
+ ~DriverImpl();
+
+ void start();
+ void stop();
+
+ boost::shared_ptr<Transport> getTransport(const std::string& protocol, TransportContext& connection);
+
+ static boost::shared_ptr<DriverImpl> getDefault();
+ private:
+ boost::shared_ptr<qpid::sys::Poller> poller;
+ qpid::sys::Thread thread;
+ static qpid::sys::Mutex defaultLock;
+ static boost::weak_ptr<DriverImpl> theDefault;
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_DRIVERIMPL_H*/
diff --git a/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp b/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
new file mode 100644
index 0000000000..54de3eae45
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
@@ -0,0 +1,263 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/amqp/EncodedMessage.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/MessageImpl.h"
+#include "qpid/amqp/Decoder.h"
+#include <boost/lexical_cast.hpp>
+#include <string.h>
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+using namespace qpid::amqp;
+
+EncodedMessage::EncodedMessage(size_t s) : size(s), data(size ? new char[size] : 0)
+{
+ init();
+}
+
+EncodedMessage::EncodedMessage() : size(0), data(0)
+{
+ init();
+}
+
+EncodedMessage::EncodedMessage(const EncodedMessage& other) : size(other.size), data(size ? new char[size] : 0)
+{
+ init();
+}
+
+void EncodedMessage::init()
+{
+ //init all CharSequence members
+ deliveryAnnotations.init();
+ messageAnnotations.init();
+ userId.init();
+ to.init();
+ subject.init();
+ replyTo.init();
+ contentType.init();
+ contentEncoding.init();
+ groupId.init();
+ replyToGroupId.init();
+ applicationProperties.init();
+ body.init();
+ footer.init();
+}
+
+EncodedMessage::~EncodedMessage()
+{
+ delete[] data;
+}
+
+size_t EncodedMessage::getSize() const
+{
+ return size;
+}
+void EncodedMessage::trim(size_t t)
+{
+ size = t;
+}
+void EncodedMessage::resize(size_t s)
+{
+ delete[] data;
+ size = s;
+ data = new char[size];
+}
+
+char* EncodedMessage::getData()
+{
+ return data;
+}
+const char* EncodedMessage::getData() const
+{
+ return data;
+}
+
+void EncodedMessage::init(qpid::messaging::MessageImpl& impl)
+{
+ //initial scan of raw data
+ qpid::amqp::Decoder decoder(data, size);
+ InitialScan reader(*this, impl);
+ decoder.read(reader);
+ bareMessage = reader.getBareMessage();
+ if (bareMessage.data && !bareMessage.size) {
+ bareMessage.size = (data + size) - bareMessage.data;
+ }
+
+}
+void EncodedMessage::populate(qpid::types::Variant::Map& map) const
+{
+ //decode application properties
+ if (applicationProperties) {
+ qpid::amqp::Decoder decoder(applicationProperties.data, applicationProperties.size);
+ decoder.readMap(map);
+ }
+ //add in 'x-amqp-' prefixed values
+ if (!!firstAcquirer) {
+ map["x-amqp-first-acquirer"] = firstAcquirer.get();
+ }
+ if (!!deliveryCount) {
+ map["x-amqp-delivery-count"] = deliveryCount.get();
+ }
+ if (to) {
+ map["x-amqp-delivery-count"] = to.str();
+ }
+ if (!!absoluteExpiryTime) {
+ map["x-amqp-absolute-expiry-time"] = absoluteExpiryTime.get();
+ }
+ if (!!creationTime) {
+ map["x-amqp-creation-time"] = creationTime.get();
+ }
+ if (groupId) {
+ map["x-amqp-group-id"] = groupId.str();
+ }
+ if (!!groupSequence) {
+ map["x-amqp-qroup-sequence"] = groupSequence.get();
+ }
+ if (replyToGroupId) {
+ map["x-amqp-reply-to-group-id"] = replyToGroupId.str();
+ }
+ //add in any annotations
+ if (deliveryAnnotations) {
+ qpid::types::Variant::Map& annotations = map["x-amqp-delivery-annotations"].asMap();
+ qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size);
+ decoder.readMap(annotations);
+ }
+ if (messageAnnotations) {
+ qpid::types::Variant::Map& annotations = map["x-amqp-message-annotations"].asMap();
+ qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size);
+ decoder.readMap(annotations);
+ }
+}
+qpid::amqp::CharSequence EncodedMessage::getBareMessage() const
+{
+ return bareMessage;
+}
+
+void EncodedMessage::getReplyTo(qpid::messaging::Address& a) const
+{
+ a = qpid::messaging::Address(replyTo.str());
+}
+void EncodedMessage::getSubject(std::string& s) const
+{
+ s.assign(subject.data, subject.size);
+}
+void EncodedMessage::getContentType(std::string& s) const
+{
+ s.assign(contentType.data, contentType.size);
+}
+void EncodedMessage::getUserId(std::string& s) const
+{
+ s.assign(userId.data, userId.size);
+}
+void EncodedMessage::getMessageId(std::string& s) const
+{
+ messageId.assign(s);
+}
+void EncodedMessage::getCorrelationId(std::string& s) const
+{
+ correlationId.assign(s);
+}
+void EncodedMessage::getBody(std::string& s) const
+{
+ s.assign(body.data, body.size);
+}
+
+qpid::amqp::CharSequence EncodedMessage::getBody() const
+{
+ return body;
+}
+
+bool EncodedMessage::hasHeaderChanged(const qpid::messaging::MessageImpl& msg) const
+{
+ if (!durable) {
+ if (msg.isDurable()) return true;
+ } else {
+ if (durable.get() != msg.isDurable()) return true;
+ }
+
+ if (!priority) {
+ if (msg.getPriority() != 4) return true;
+ } else {
+ if (priority.get() != msg.getPriority()) return true;
+ }
+
+ if (msg.getTtl() && (!ttl || msg.getTtl() != ttl.get())) {
+ return true;
+ }
+
+ //first-acquirer can't be changed via Message interface as yet
+
+ if (msg.isRedelivered() && (!deliveryCount || deliveryCount.get() == 0)) {
+ return true;
+ }
+
+ return false;
+}
+
+
+EncodedMessage::InitialScan::InitialScan(EncodedMessage& e, qpid::messaging::MessageImpl& m) : em(e), mi(m)
+{
+ //set up defaults as needed:
+ mi.setPriority(4);
+}
+//header:
+void EncodedMessage::InitialScan::onDurable(bool b) { mi.setDurable(b); em.durable = b; }
+void EncodedMessage::InitialScan::onPriority(uint8_t i) { mi.setPriority(i); em.priority = i; }
+void EncodedMessage::InitialScan::onTtl(uint32_t i) { mi.setTtl(i); em.ttl = i; }
+void EncodedMessage::InitialScan::onFirstAcquirer(bool b) { em.firstAcquirer = b; }
+void EncodedMessage::InitialScan::onDeliveryCount(uint32_t i)
+{
+ mi.setRedelivered(i);
+ em.deliveryCount = i;
+}
+
+//properties:
+void EncodedMessage::InitialScan::onMessageId(uint64_t v) { em.messageId.set(v); }
+void EncodedMessage::InitialScan::onMessageId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { em.messageId.set(v, t); }
+void EncodedMessage::InitialScan::onUserId(const qpid::amqp::CharSequence& v) { em.userId = v; }
+void EncodedMessage::InitialScan::onTo(const qpid::amqp::CharSequence& v) { em.to = v; }
+void EncodedMessage::InitialScan::onSubject(const qpid::amqp::CharSequence& v) { em.subject = v; }
+void EncodedMessage::InitialScan::onReplyTo(const qpid::amqp::CharSequence& v) { em.replyTo = v;}
+void EncodedMessage::InitialScan::onCorrelationId(uint64_t v) { em.correlationId.set(v); }
+void EncodedMessage::InitialScan::onCorrelationId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { em.correlationId.set(v, t); }
+void EncodedMessage::InitialScan::onContentType(const qpid::amqp::CharSequence& v) { em.contentType = v; }
+void EncodedMessage::InitialScan::onContentEncoding(const qpid::amqp::CharSequence& v) { em.contentEncoding = v; }
+void EncodedMessage::InitialScan::onAbsoluteExpiryTime(int64_t i) { em.absoluteExpiryTime = i; }
+void EncodedMessage::InitialScan::onCreationTime(int64_t i) { em.creationTime = i; }
+void EncodedMessage::InitialScan::onGroupId(const qpid::amqp::CharSequence& v) { em.groupId = v; }
+void EncodedMessage::InitialScan::onGroupSequence(uint32_t i) { em.groupSequence = i; }
+void EncodedMessage::InitialScan::onReplyToGroupId(const qpid::amqp::CharSequence& v) { em.replyToGroupId = v; }
+
+void EncodedMessage::InitialScan::onApplicationProperties(const qpid::amqp::CharSequence& v) { em.applicationProperties = v; }
+void EncodedMessage::InitialScan::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { em.deliveryAnnotations = v; }
+void EncodedMessage::InitialScan::onMessageAnnotations(const qpid::amqp::CharSequence& v) { em.messageAnnotations = v; }
+void EncodedMessage::InitialScan::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&)
+{
+ //TODO: how to communicate the type, i.e. descriptor?
+ em.body = v;
+}
+void EncodedMessage::InitialScan::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {}
+void EncodedMessage::InitialScan::onFooter(const qpid::amqp::CharSequence& v) { em.footer = v; }
+
+}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/EncodedMessage.h b/cpp/src/qpid/messaging/amqp/EncodedMessage.h
new file mode 100644
index 0000000000..09a9d948d5
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/EncodedMessage.h
@@ -0,0 +1,177 @@
+#ifndef QPID_MESSAGING_AMQP_ENCODEDMESSAGE_H
+#define QPID_MESSAGING_AMQP_ENCODEDMESSAGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/amqp/CharSequence.h"
+#include "qpid/amqp/MessageId.h"
+#include "qpid/amqp/MessageReader.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/types/Variant.h"
+#include <boost/optional.hpp>
+
+namespace qpid {
+namespace amqp {
+struct Descriptor;
+}
+namespace messaging {
+class Address;
+class MessageImpl;
+namespace amqp {
+
+/**
+ * Used to 'lazy-decode' an AMQP 1.0 message.
+ *
+ * There are four categories of data item:
+ *
+ * (i) simple, fixed width primitives - priority, ttl, durability,
+ * delivery count - for which lazy-decoding doesn't buy much. These
+ * are decoded unconditionally on an initial scan of the message.
+ *
+ * (ii) standard variable length string properties - subject,
+ * message-id, user-id etc - which require conversion to a std::string
+ * for returning to the application. By delaying the conversion of
+ * these to a std::string we can avoid allocation & copying until it
+ * is actually required. The initial scan of the message merely
+ * records the position of these strings within the raw message data.
+ *
+ * (iii) custom, application defined headers. These form a map, and
+ * again, delaying the creation of that map until it is actually
+ * required can be advantageous. The initial scan of the message merely
+ * records the position of this section within the raw message data.
+ *
+ * (iv) the body content. This may be retreived as a std::string, or
+ * as a char*. Avoiding conversion to the string until it is required
+ * is advantageous. The initial scan of the message merely records the
+ * position of this section within the raw message data.
+ *
+ * At present the Message class only explicitly exposes some of the
+ * standard property and headers defined by AMQP 1.0. The remainder
+ * will have to be accessed through the message 'headers' map, using
+ * the 'x-amqp-' prefix.
+ */
+class EncodedMessage
+{
+ public:
+ EncodedMessage();
+ EncodedMessage(size_t);
+ EncodedMessage(const EncodedMessage&);
+ ~EncodedMessage();
+
+
+ size_t getSize() const;
+ char* getData();
+ const char* getData() const;
+ void trim(size_t);
+ void resize(size_t);
+
+ void getReplyTo(qpid::messaging::Address&) const;
+ void getSubject(std::string&) const;
+ void getContentType(std::string&) const;
+ void getMessageId(std::string&) const;
+ void getUserId(std::string&) const;
+ void getCorrelationId(std::string&) const;
+
+ void init(qpid::messaging::MessageImpl&);
+ void populate(qpid::types::Variant::Map&) const;
+ void getBody(std::string&) const;
+ qpid::amqp::CharSequence getBareMessage() const;
+ qpid::amqp::CharSequence getBody() const;
+ bool hasHeaderChanged(const qpid::messaging::MessageImpl&) const;
+ private:
+ size_t size;
+ char* data;
+
+ class InitialScan : public qpid::amqp::MessageReader
+ {
+ public:
+ InitialScan(EncodedMessage& e, qpid::messaging::MessageImpl& m);
+ //header:
+ void onDurable(bool b);
+ void onPriority(uint8_t i);
+ void onTtl(uint32_t i);
+ void onFirstAcquirer(bool b);
+ void onDeliveryCount(uint32_t i);
+ //properties:
+ void onMessageId(uint64_t);
+ void onMessageId(const qpid::amqp::CharSequence&, qpid::types::VariantType);
+ void onUserId(const qpid::amqp::CharSequence& v);
+ void onTo(const qpid::amqp::CharSequence& v);
+ void onSubject(const qpid::amqp::CharSequence& v);
+ void onReplyTo(const qpid::amqp::CharSequence& v);
+ void onCorrelationId(uint64_t);
+ void onCorrelationId(const qpid::amqp::CharSequence&, qpid::types::VariantType);
+ void onContentType(const qpid::amqp::CharSequence& v);
+ void onContentEncoding(const qpid::amqp::CharSequence& v);
+ void onAbsoluteExpiryTime(int64_t i);
+ void onCreationTime(int64_t);
+ void onGroupId(const qpid::amqp::CharSequence&);
+ void onGroupSequence(uint32_t);
+ void onReplyToGroupId(const qpid::amqp::CharSequence&);
+
+ void onApplicationProperties(const qpid::amqp::CharSequence&);
+ void onDeliveryAnnotations(const qpid::amqp::CharSequence&);
+ void onMessageAnnotations(const qpid::amqp::CharSequence&);
+ void onBody(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor&);
+ void onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&);
+ void onFooter(const qpid::amqp::CharSequence&);
+ private:
+ EncodedMessage& em;
+ qpid::messaging::MessageImpl& mi;
+ };
+ //header:
+ boost::optional<bool> durable;
+ boost::optional<uint8_t> priority;
+ boost::optional<uint32_t> ttl;
+ boost::optional<bool> firstAcquirer;
+ boost::optional<uint32_t> deliveryCount;
+ //annotations:
+ qpid::amqp::CharSequence deliveryAnnotations;
+ qpid::amqp::CharSequence messageAnnotations;
+
+ qpid::amqp::CharSequence bareMessage;//properties, application-properties and content
+ //properties:
+ qpid::amqp::MessageId messageId;
+ qpid::amqp::CharSequence userId;
+ qpid::amqp::CharSequence to;
+ qpid::amqp::CharSequence subject;
+ qpid::amqp::CharSequence replyTo;
+ qpid::amqp::MessageId correlationId;
+ qpid::amqp::CharSequence contentType;
+ qpid::amqp::CharSequence contentEncoding;
+ boost::optional<int64_t> absoluteExpiryTime;
+ boost::optional<int64_t> creationTime;
+ qpid::amqp::CharSequence groupId;
+ boost::optional<uint32_t> groupSequence;
+ qpid::amqp::CharSequence replyToGroupId;
+ //application-properties:
+ qpid::amqp::CharSequence applicationProperties;
+ qpid::amqp::CharSequence body;
+ //footer:
+ qpid::amqp::CharSequence footer;
+
+ void init();
+ //not implemented:
+ EncodedMessage& operator=(const EncodedMessage&);
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_ENCODEDMESSAGE_H*/
diff --git a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
new file mode 100644
index 0000000000..414793c7fd
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/amqp/ReceiverContext.h"
+#include "qpid/messaging/amqp/AddressHelper.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/amqp/descriptors.h"
+extern "C" {
+#include <proton/engine.h>
+}
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+//TODO: proper conversion to wide string for address
+ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a)
+ : name(n),
+ address(a),
+ receiver(pn_receiver(session, name.c_str())),
+ capacity(0) {}
+ReceiverContext::~ReceiverContext()
+{
+ pn_link_free(receiver);
+}
+
+void ReceiverContext::setCapacity(uint32_t c)
+{
+ if (c != capacity) {
+ //stop
+ capacity = c;
+ //reissue credit
+ }
+}
+
+uint32_t ReceiverContext::getCapacity()
+{
+ return capacity;
+}
+
+uint32_t ReceiverContext::getAvailable()
+{
+ uint32_t count(0);
+ for (pn_delivery_t* d = pn_unsettled_head(receiver); d; d = pn_unsettled_next(d)) {
+ ++count;
+ if (d == pn_link_current(receiver)) break;
+ }
+ return count;
+}
+
+uint32_t ReceiverContext::getUnsettled()
+{
+ uint32_t count(0);
+ for (pn_delivery_t* d = pn_unsettled_head(receiver); d; d = pn_unsettled_next(d)) {
+ ++count;
+ }
+ return count;
+}
+
+void ReceiverContext::close()
+{
+
+}
+
+const std::string& ReceiverContext::getName() const
+{
+ return name;
+}
+
+const std::string& ReceiverContext::getSource() const
+{
+ return address.getName();
+}
+namespace {
+pn_bytes_t convert(const std::string& s)
+{
+ pn_bytes_t result;
+ result.start = const_cast<char*>(s.data());
+ result.size = s.size();
+ return result;
+}
+bool hasWildcards(const std::string& key)
+{
+ return key.find('*') != std::string::npos || key.find('#') != std::string::npos;
+}
+
+uint64_t getFilterDescriptor(const std::string& key)
+{
+ return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE;
+}
+}
+
+void ReceiverContext::configure() const
+{
+ configure(pn_link_source(receiver));
+}
+void ReceiverContext::configure(pn_terminus_t* source) const
+{
+ pn_terminus_set_address(source, address.getName().c_str());
+ //dynamic create:
+ AddressHelper helper(address);
+ if (helper.createEnabled(AddressHelper::FOR_RECEIVER)) {
+ helper.setNodeProperties(source);
+ }
+
+ if (!address.getSubject().empty()) {
+ //filter:
+ pn_data_t* filter = pn_terminus_filter(source);
+ pn_data_put_map(filter);
+ pn_data_enter(filter);
+ pn_data_put_symbol(filter, convert("subject"));
+ //TODO: At present inserting described values into the map doesn't seem to work; correct this once resolved
+ //pn_data_put_described(filter);
+ //pn_data_enter(filter);
+ //pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject()));
+ pn_data_put_string(filter, convert(address.getSubject()));
+ //pn_data_exit(filter);
+ pn_data_exit(filter);
+ }
+}
+
+bool ReceiverContext::isClosed() const
+{
+ return false;//TODO
+}
+
+
+
+}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/cpp/src/qpid/messaging/amqp/ReceiverContext.h
new file mode 100644
index 0000000000..34ecdda6be
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/ReceiverContext.h
@@ -0,0 +1,68 @@
+#ifndef QPID_MESSAGING_AMQP_RECEIVERCONTEXT_H
+#define QPID_MESSAGING_AMQP_RECEIVERCONTEXT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Address.h"
+#include <string>
+#include "qpid/sys/IntegerTypes.h"
+
+struct pn_link_t;
+struct pn_session_t;
+struct pn_terminus_t;
+
+namespace qpid {
+namespace messaging {
+
+class Duration;
+class Message;
+
+namespace amqp {
+
+/**
+ *
+ */
+class ReceiverContext
+{
+ public:
+ ReceiverContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& source);
+ ~ReceiverContext();
+ void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t getAvailable();
+ uint32_t getUnsettled();
+ void attach();
+ void close();
+ const std::string& getName() const;
+ const std::string& getSource() const;
+ bool isClosed() const;
+ void configure() const;
+ private:
+ friend class ConnectionContext;
+ const std::string name;
+ const Address address;
+ pn_link_t* receiver;
+ uint32_t capacity;
+ void configure(pn_terminus_t*) const;
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_RECEIVERCONTEXT_H*/
diff --git a/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp b/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp
new file mode 100644
index 0000000000..9bf64ebb8d
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ReceiverHandle.h"
+#include "ConnectionContext.h"
+#include "SessionContext.h"
+#include "SessionHandle.h"
+#include "ReceiverContext.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Session.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+ReceiverHandle::ReceiverHandle(boost::shared_ptr<ConnectionContext> c,
+ boost::shared_ptr<SessionContext> s,
+ boost::shared_ptr<ReceiverContext> r
+) : connection(c), session(s), receiver(r) {}
+
+
+bool ReceiverHandle::get(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
+{
+ return connection->get(session, receiver, message, timeout);
+}
+
+qpid::messaging::Message ReceiverHandle::get(qpid::messaging::Duration timeout)
+{
+ qpid::messaging::Message result;
+ if (!get(result, timeout)) throw qpid::messaging::NoMessageAvailable();
+ return result;
+}
+
+bool ReceiverHandle::fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
+{
+ return connection->fetch(session, receiver, message, timeout);
+}
+
+qpid::messaging::Message ReceiverHandle::fetch(qpid::messaging::Duration timeout)
+{
+ qpid::messaging::Message result;
+ if (!fetch(result, timeout)) throw qpid::messaging::NoMessageAvailable();
+ return result;
+}
+
+void ReceiverHandle::setCapacity(uint32_t capacity)
+{
+ connection->setCapacity(receiver, capacity);
+}
+
+uint32_t ReceiverHandle::getCapacity()
+{
+ return connection->getCapacity(receiver);
+}
+
+uint32_t ReceiverHandle::getAvailable()
+{
+ return connection->getAvailable(receiver);
+}
+
+uint32_t ReceiverHandle::getUnsettled()
+{
+ return connection->getUnsettled(receiver);
+}
+
+void ReceiverHandle::close()
+{
+ session->closeReceiver(getName());
+}
+
+const std::string& ReceiverHandle::getName() const
+{
+ return receiver->getName();
+}
+
+qpid::messaging::Session ReceiverHandle::getSession() const
+{
+ //create new SessionHandle instance; i.e. create new handle that shares the same context
+ return qpid::messaging::Session(new SessionHandle(connection, session));
+}
+
+bool ReceiverHandle::isClosed() const
+{
+ return receiver->isClosed();
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/ReceiverHandle.h b/cpp/src/qpid/messaging/amqp/ReceiverHandle.h
new file mode 100644
index 0000000000..a1a6f26025
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/ReceiverHandle.h
@@ -0,0 +1,63 @@
+#ifndef QPID_MESSAGING_AMQP_RECEIVERHANDLE_H
+#define QPID_MESSAGING_AMQP_RECEIVERHANDLE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/shared_ptr.hpp>
+#include "qpid/messaging/ReceiverImpl.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+class ConnectionContext;
+class SessionContext;
+class ReceiverContext;
+/**
+ *
+ */
+class ReceiverHandle : public qpid::messaging::ReceiverImpl
+{
+ public:
+ ReceiverHandle(boost::shared_ptr<ConnectionContext>,
+ boost::shared_ptr<SessionContext>,
+ boost::shared_ptr<ReceiverContext>
+ );
+ bool get(Message& message, qpid::messaging::Duration timeout);
+ qpid::messaging::Message get(qpid::messaging::Duration timeout);
+ bool fetch(Message& message, qpid::messaging::Duration timeout);
+ qpid::messaging::Message fetch(qpid::messaging::Duration timeout);
+ void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t getAvailable();
+ uint32_t getUnsettled();
+ void close();
+ const std::string& getName() const;
+ qpid::messaging::Session getSession() const;
+ bool isClosed() const;
+ private:
+ boost::shared_ptr<ConnectionContext> connection;
+ boost::shared_ptr<SessionContext> session;
+ boost::shared_ptr<ReceiverContext> receiver;
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_RECEIVERHANDLE_H*/
diff --git a/cpp/src/qpid/messaging/amqp/Sasl.cpp b/cpp/src/qpid/messaging/amqp/Sasl.cpp
new file mode 100644
index 0000000000..a8bae1adda
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/Sasl.cpp
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionContext.h"
+#include "qpid/messaging/amqp/Sasl.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/sys/SecurityLayer.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Sasl.h"
+#include "qpid/SaslFactory.h"
+#include "qpid/StringUtils.h"
+#include <sstream>
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+Sasl::Sasl(const std::string& id, ConnectionContext& c, const std::string& hostname_)
+ : qpid::amqp::SaslClient(id), context(c),
+ sasl(qpid::SaslFactory::getInstance().create(c.username, c.password, c.service, hostname_, c.minSsf, c.maxSsf, false)),
+ hostname(hostname_), readHeader(true), writeHeader(true), haveOutput(false), state(NONE) {}
+
+std::size_t Sasl::decode(const char* buffer, std::size_t size)
+{
+ size_t decoded = 0;
+ if (readHeader) {
+ decoded += readProtocolHeader(buffer, size);
+ readHeader = !decoded;
+ }
+ if (state == NONE && decoded < size) {
+ decoded += read(buffer + decoded, size - decoded);
+ }
+ QPID_LOG(trace, id << " Sasl::decode(" << size << "): " << decoded);
+ return decoded;
+}
+
+std::size_t Sasl::encode(char* buffer, std::size_t size)
+{
+ size_t encoded = 0;
+ if (writeHeader) {
+ encoded += writeProtocolHeader(buffer, size);
+ writeHeader = !encoded;
+ }
+ if (encoded < size) {
+ encoded += write(buffer + encoded, size - encoded);
+ }
+ haveOutput = (encoded == size);
+ QPID_LOG(trace, id << " Sasl::encode(" << size << "): " << encoded);
+ return encoded;
+}
+
+bool Sasl::canEncode()
+{
+ QPID_LOG(trace, id << " Sasl::canEncode(): " << writeHeader << " || " << haveOutput);
+ return writeHeader || haveOutput;
+}
+
+void Sasl::mechanisms(const std::string& offered)
+{
+ QPID_LOG_CAT(debug, protocol, id << " Received SASL-MECHANISMS(" << offered << ")");
+ std::string response;
+
+ std::string mechanisms;
+ if (context.mechanism.size()) {
+ std::vector<std::string> allowed = split(context.mechanism, " ");
+ std::vector<std::string> supported = split(offered, " ");
+ std::stringstream intersection;
+ for (std::vector<std::string>::const_iterator i = allowed.begin(); i != allowed.end(); ++i) {
+ if (std::find(supported.begin(), supported.end(), *i) != supported.end()) {
+ intersection << *i << " ";
+ }
+ }
+ mechanisms = intersection.str();
+ } else {
+ mechanisms = offered;
+ }
+
+ if (sasl->start(mechanisms, response)) {
+ init(sasl->getMechanism(), &response, hostname.size() ? &hostname : 0);
+ } else {
+ init(sasl->getMechanism(), 0, hostname.size() ? &hostname : 0);
+ }
+ haveOutput = true;
+ context.activateOutput();
+}
+void Sasl::challenge(const std::string& challenge)
+{
+ QPID_LOG_CAT(debug, protocol, id << " Received SASL-CHALLENGE(" << challenge.size() << " bytes)");
+ std::string r = sasl->step(challenge);
+ response(&r);
+ haveOutput = true;
+ context.activateOutput();
+}
+namespace {
+const std::string EMPTY;
+}
+void Sasl::challenge()
+{
+ QPID_LOG_CAT(debug, protocol, id << " Received SASL-CHALLENGE(null)");
+ std::string r = sasl->step(EMPTY);
+ response(&r);
+}
+void Sasl::outcome(uint8_t result, const std::string& extra)
+{
+ QPID_LOG_CAT(debug, protocol, id << " Received SASL-OUTCOME(" << result << ", " << extra << ")");
+ outcome(result);
+}
+void Sasl::outcome(uint8_t result)
+{
+ QPID_LOG_CAT(debug, protocol, id << " Received SASL-OUTCOME(" << result << ")");
+ if (result) state = FAILED;
+ else state = SUCCEEDED;
+
+ securityLayer = sasl->getSecurityLayer(context.maxFrameSize);
+ if (securityLayer.get()) {
+ securityLayer->init(&context);
+ }
+ context.activateOutput();
+}
+
+qpid::sys::Codec* Sasl::getSecurityLayer()
+{
+ return securityLayer.get();
+}
+
+bool Sasl::authenticated()
+{
+ switch (state) {
+ case SUCCEEDED: return true;
+ case FAILED: throw qpid::messaging::UnauthorizedAccess("Failed to authenticate");
+ case NONE: default: return false;
+ }
+}
+
+std::string Sasl::getAuthenticatedUsername()
+{
+ return sasl->getUserId();
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/Sasl.h b/cpp/src/qpid/messaging/amqp/Sasl.h
new file mode 100644
index 0000000000..6657779fdc
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/Sasl.h
@@ -0,0 +1,72 @@
+#ifndef QPID_MESSAGING_AMQP_SASL_H
+#define QPID_MESSAGING_AMQP_SASL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/Codec.h"
+#include "qpid/amqp/SaslClient.h"
+#include <memory>
+
+namespace qpid {
+class Sasl;
+namespace sys {
+class SecurityLayer;
+}
+namespace messaging {
+class ConnectionOptions;
+namespace amqp {
+class ConnectionContext;
+
+/**
+ *
+ */
+class Sasl : public qpid::sys::Codec, qpid::amqp::SaslClient
+{
+ public:
+ Sasl(const std::string& id, ConnectionContext& context, const std::string& hostname);
+ std::size_t decode(const char* buffer, std::size_t size);
+ std::size_t encode(char* buffer, std::size_t size);
+ bool canEncode();
+
+ bool authenticated();
+ qpid::sys::Codec* getSecurityLayer();
+ std::string getAuthenticatedUsername();
+ private:
+ ConnectionContext& context;
+ std::auto_ptr<qpid::Sasl> sasl;
+ std::string hostname;
+ bool readHeader;
+ bool writeHeader;
+ bool haveOutput;
+ enum {
+ NONE, FAILED, SUCCEEDED
+ } state;
+ std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
+
+ void mechanisms(const std::string&);
+ void challenge(const std::string&);
+ void challenge(); //null != empty string
+ void outcome(uint8_t result, const std::string&);
+ void outcome(uint8_t result);
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_SASL_H*/
diff --git a/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/cpp/src/qpid/messaging/amqp/SenderContext.cpp
new file mode 100644
index 0000000000..96c4437b89
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -0,0 +1,363 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/amqp/SenderContext.h"
+#include "qpid/messaging/amqp/EncodedMessage.h"
+#include "qpid/messaging/amqp/AddressHelper.h"
+#include "qpid/amqp/descriptors.h"
+#include "qpid/amqp/MessageEncoder.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageImpl.h"
+#include "qpid/log/Statement.h"
+extern "C" {
+#include <proton/engine.h>
+}
+#include <boost/shared_ptr.hpp>
+#include <string.h>
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+//TODO: proper conversion to wide string for address
+SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a)
+ : name(n),
+ address(a),
+ sender(pn_sender(session, n.c_str())), capacity(1000) {}
+
+SenderContext::~SenderContext()
+{
+ pn_link_free(sender);
+}
+
+void SenderContext::close()
+{
+
+}
+
+void SenderContext::setCapacity(uint32_t c)
+{
+ if (c < deliveries.size()) throw qpid::messaging::SenderError("Desired capacity is less than unsettled message count!");
+ capacity = c;
+}
+
+uint32_t SenderContext::getCapacity()
+{
+ return capacity;
+}
+
+uint32_t SenderContext::getUnsettled()
+{
+ return processUnsettled();
+}
+
+const std::string& SenderContext::getName() const
+{
+ return name;
+}
+
+const std::string& SenderContext::getTarget() const
+{
+ return address.getName();
+}
+
+SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message)
+{
+ if (processUnsettled() < capacity && pn_link_credit(sender)) {
+ deliveries.push_back(Delivery(nextId++));
+ Delivery& delivery = deliveries.back();
+ delivery.encode(MessageImplAccess::get(message), address);
+ delivery.send(sender);
+ return &delivery;
+ } else {
+ return 0;
+ }
+}
+
+uint32_t SenderContext::processUnsettled()
+{
+ //remove accepted messages from front of deque
+ while (!deliveries.empty() && deliveries.front().accepted()) {
+ deliveries.front().settle();
+ deliveries.pop_front();
+ }
+ return deliveries.size();
+}
+namespace {
+class HeaderAdapter : public qpid::amqp::MessageEncoder::Header
+{
+ public:
+ HeaderAdapter(const qpid::messaging::MessageImpl& impl) : msg(impl) {}
+ virtual bool isDurable() const
+ {
+ return msg.isDurable();
+ }
+ virtual uint8_t getPriority() const
+ {
+ return msg.getPriority();
+ }
+ virtual bool hasTtl() const
+ {
+ return msg.getTtl();
+ }
+ virtual uint32_t getTtl() const
+ {
+ return msg.getTtl();
+ }
+ virtual bool isFirstAcquirer() const
+ {
+ return false;
+ }
+ virtual uint32_t getDeliveryCount() const
+ {
+ return msg.isRedelivered() ? 1 : 0;
+ }
+ private:
+ const qpid::messaging::MessageImpl& msg;
+};
+const std::string EMPTY;
+
+class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties
+{
+ public:
+ PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s) : msg(impl), subject(s) {}
+ bool hasMessageId() const
+ {
+ return getMessageId().size();
+ }
+ std::string getMessageId() const
+ {
+ return msg.getMessageId();
+ }
+
+ bool hasUserId() const
+ {
+ return getUserId().size();
+ }
+
+ std::string getUserId() const
+ {
+ return msg.getUserId();
+ }
+
+ bool hasTo() const
+ {
+ return false;//not yet supported
+ }
+
+ std::string getTo() const
+ {
+ return EMPTY;//not yet supported
+ }
+
+ bool hasSubject() const
+ {
+ return subject.size() || getSubject().size();
+ }
+
+ std::string getSubject() const
+ {
+ return subject.size() ? subject : msg.getSubject();
+ }
+
+ bool hasReplyTo() const
+ {
+ return msg.getReplyTo();
+ }
+
+ std::string getReplyTo() const
+ {
+ return msg.getReplyTo().str();
+ }
+
+ bool hasCorrelationId() const
+ {
+ return getCorrelationId().size();
+ }
+
+ std::string getCorrelationId() const
+ {
+ return msg.getCorrelationId();
+ }
+
+ bool hasContentType() const
+ {
+ return getContentType().size();
+ }
+
+ std::string getContentType() const
+ {
+ return msg.getContentType();
+ }
+
+ bool hasContentEncoding() const
+ {
+ return false;//not yet supported
+ }
+
+ std::string getContentEncoding() const
+ {
+ return EMPTY;//not yet supported
+ }
+
+ bool hasAbsoluteExpiryTime() const
+ {
+ return false;//not yet supported
+ }
+
+ int64_t getAbsoluteExpiryTime() const
+ {
+ return 0;//not yet supported
+ }
+
+ bool hasCreationTime() const
+ {
+ return false;//not yet supported
+ }
+
+ int64_t getCreationTime() const
+ {
+ return 0;//not yet supported
+ }
+
+ bool hasGroupId() const
+ {
+ return false;//not yet supported
+ }
+
+ std::string getGroupId() const
+ {
+ return EMPTY;//not yet supported
+ }
+
+ bool hasGroupSequence() const
+ {
+ return false;//not yet supported
+ }
+
+ uint32_t getGroupSequence() const
+ {
+ return 0;//not yet supported
+ }
+
+ bool hasReplyToGroupId() const
+ {
+ return false;//not yet supported
+ }
+
+ std::string getReplyToGroupId() const
+ {
+ return EMPTY;//not yet supported
+ }
+ private:
+ const qpid::messaging::MessageImpl& msg;
+ const std::string subject;
+};
+
+bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
+{
+ return address.getSubject().size() && address.getSubject() != msg.getSubject();
+}
+
+}
+
+SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0) {}
+
+void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
+{
+ boost::shared_ptr<const EncodedMessage> original = msg.getEncoded();
+
+ if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered
+ //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received?
+ if (original->hasHeaderChanged(msg)) {
+ //since as yet have no annotations, just write the revised header then the rest of the message as received
+ encoded.resize(16/*max header size*/ + original->getBareMessage().size);
+ qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
+ HeaderAdapter header(msg);
+ encoder.writeHeader(header);
+ ::memcpy(encoded.getData() + encoder.getPosition(), original->getBareMessage().data, original->getBareMessage().size);
+ } else {
+ //since as yet have no annotations, if the header hasn't
+ //changed and we still have the original bare message, can
+ //send the entire content as is
+ encoded.resize(original->getSize());
+ ::memcpy(encoded.getData(), original->getData(), original->getSize());
+ }
+ } else {
+ HeaderAdapter header(msg);
+ PropertiesAdapter properties(msg, address.getSubject());
+ //compute size:
+ encoded.resize(qpid::amqp::MessageEncoder::getEncodedSize(header, properties, msg.getHeaders(), msg.getBytes()));
+ QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes")
+ qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
+ //write header:
+ encoder.writeHeader(header);
+ //write delivery-annotations, write message-annotations (none yet supported)
+ //write properties
+ encoder.writeProperties(properties);
+ //write application-properties
+ encoder.writeApplicationProperties(msg.getHeaders());
+ //write body
+ if (msg.getBytes().size()) encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported
+ if (encoder.getPosition() < encoded.getSize()) {
+ QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition());
+ encoded.trim(encoder.getPosition());
+ }
+ //write footer (no annotations yet supported)
+ }
+}
+void SenderContext::Delivery::send(pn_link_t* sender)
+{
+ pn_delivery_tag_t tag;
+ tag.size = sizeof(id);
+ tag.bytes = reinterpret_cast<const char*>(&id);
+ token = pn_delivery(sender, tag);
+ pn_link_send(sender, encoded.getData(), encoded.getSize());
+ pn_link_advance(sender);
+}
+
+bool SenderContext::Delivery::accepted()
+{
+ return pn_delivery_remote_state(token) == PN_ACCEPTED;
+}
+void SenderContext::Delivery::settle()
+{
+ pn_delivery_settle(token);
+}
+void SenderContext::configure() const
+{
+ configure(pn_link_target(sender));
+}
+void SenderContext::configure(pn_terminus_t* target) const
+{
+ pn_terminus_set_address(target, address.getName().c_str());
+ //dynamic create:
+ AddressHelper helper(address);
+ if (helper.createEnabled(AddressHelper::FOR_SENDER)) {
+ helper.setNodeProperties(target);
+ }
+}
+
+bool SenderContext::settled()
+{
+ return processUnsettled() == 0;
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/SenderContext.h b/cpp/src/qpid/messaging/amqp/SenderContext.h
new file mode 100644
index 0000000000..3595379e70
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/SenderContext.h
@@ -0,0 +1,90 @@
+#ifndef QPID_MESSAGING_AMQP_SENDERCONTEXT_H
+#define QPID_MESSAGING_AMQP_SENDERCONTEXT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <deque>
+#include <string>
+#include <vector>
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/amqp/EncodedMessage.h"
+
+struct pn_delivery_t;
+struct pn_link_t;
+struct pn_session_t;
+struct pn_terminus_t;
+
+namespace qpid {
+namespace messaging {
+
+class Message;
+class MessageImpl;
+
+namespace amqp {
+/**
+ *
+ */
+class SenderContext
+{
+ public:
+ class Delivery
+ {
+ public:
+ Delivery(int32_t id);
+ void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&);
+ void send(pn_link_t*);
+ bool accepted();
+ void settle();
+ private:
+ int32_t id;
+ pn_delivery_t* token;
+ EncodedMessage encoded;
+ };
+
+ SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target);
+ ~SenderContext();
+ void close();
+ void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t getUnsettled();
+ const std::string& getName() const;
+ const std::string& getTarget() const;
+ Delivery* send(const qpid::messaging::Message& message);
+ void configure() const;
+ bool settled();
+ private:
+ friend class ConnectionContext;
+ typedef std::deque<Delivery> Deliveries;
+
+ const std::string name;
+ const qpid::messaging::Address address;
+ pn_link_t* sender;
+ int32_t nextId;
+ Deliveries deliveries;
+ uint32_t capacity;
+
+ uint32_t processUnsettled();
+ void configure(pn_terminus_t*) const;
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_SENDERCONTEXT_H*/
diff --git a/cpp/src/qpid/messaging/amqp/SenderHandle.cpp b/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
new file mode 100644
index 0000000000..b7168e5b31
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SenderHandle.h"
+#include "ConnectionContext.h"
+#include "SessionContext.h"
+#include "SessionHandle.h"
+#include "SenderContext.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Session.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+SenderHandle::SenderHandle(boost::shared_ptr<ConnectionContext> c,
+ boost::shared_ptr<SessionContext> s,
+ boost::shared_ptr<SenderContext> sndr
+) : connection(c), session(s), sender(sndr) {}
+
+void SenderHandle::send(const Message& message, bool sync)
+{
+ connection->send(sender, message, sync);
+}
+
+void SenderHandle::close()
+{
+ session->closeSender(getName());
+}
+
+void SenderHandle::setCapacity(uint32_t capacity)
+{
+ connection->setCapacity(sender, capacity);
+}
+
+uint32_t SenderHandle::getCapacity()
+{
+ return connection->getCapacity(sender);
+}
+
+uint32_t SenderHandle::getUnsettled()
+{
+ return connection->getUnsettled(sender);
+}
+
+const std::string& SenderHandle::getName() const
+{
+ return sender->getName();
+}
+
+qpid::messaging::Session SenderHandle::getSession() const
+{
+ return qpid::messaging::Session(new SessionHandle(connection, session));
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/SenderHandle.h b/cpp/src/qpid/messaging/amqp/SenderHandle.h
new file mode 100644
index 0000000000..3c6b666582
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/SenderHandle.h
@@ -0,0 +1,58 @@
+#ifndef QPID_MESSAGING_AMQP_SENDERHANDLE_H
+#define QPID_MESSAGING_AMQP_SENDERHANDLE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/shared_ptr.hpp>
+#include "qpid/messaging/SenderImpl.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+class ConnectionContext;
+class SessionContext;
+class SenderContext;
+/**
+ *
+ */
+class SenderHandle : public qpid::messaging::SenderImpl
+{
+ public:
+ SenderHandle(boost::shared_ptr<ConnectionContext> connection,
+ boost::shared_ptr<SessionContext> session,
+ boost::shared_ptr<SenderContext> sender
+ );
+ void send(const Message& message, bool sync);
+ void close();
+ void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t getUnsettled();
+ const std::string& getName() const;
+ Session getSession() const;
+ private:
+ boost::shared_ptr<ConnectionContext> connection;
+ boost::shared_ptr<SessionContext> session;
+ boost::shared_ptr<SenderContext> sender;
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_SENDERHANDLE_H*/
diff --git a/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/cpp/src/qpid/messaging/amqp/SessionContext.cpp
new file mode 100644
index 0000000000..9bdc658bc7
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/SessionContext.cpp
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionContext.h"
+#include "SenderContext.h"
+#include "ReceiverContext.h"
+#include <boost/format.hpp>
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/log/Statement.h"
+extern "C" {
+#include <proton/engine.h>
+}
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+SessionContext::SessionContext(pn_connection_t* connection) : session(pn_session(connection)) {}
+SessionContext::~SessionContext()
+{
+ senders.clear(); receivers.clear();
+ pn_session_free(session);
+}
+
+boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address)
+{
+ std::string name = address.getName();
+
+ int count = 1;
+ for (SenderMap::const_iterator i = senders.find(name); i != senders.end(); i = senders.find(name)) {
+ name = (boost::format("%1%_%2%") % address.getName() % ++count).str();
+ }
+ boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address));
+ senders[name] = s;
+ return s;
+}
+
+boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::messaging::Address& address)
+{
+ std::string name = address.getName();
+
+ int count = 1;
+ for (ReceiverMap::const_iterator i = receivers.find(name); i != receivers.end(); i = receivers.find(name)) {
+ name = (boost::format("%1%_%2%") % address.getName() % ++count).str();
+ }
+ boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address));
+ receivers[name] = r;
+ return r;
+}
+
+boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& name) const
+{
+ SenderMap::const_iterator i = senders.find(name);
+ if (i == senders.end()) {
+ throw qpid::messaging::KeyError(std::string("No such sender") + name);
+ } else {
+ return i->second;
+ }
+}
+
+boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string& name) const
+{
+ ReceiverMap::const_iterator i = receivers.find(name);
+ if (i == receivers.end()) {
+ throw qpid::messaging::KeyError(std::string("No such receiver") + name);
+ } else {
+ return i->second;
+ }
+}
+
+void SessionContext::closeReceiver(const std::string&)
+{
+
+}
+
+void SessionContext::closeSender(const std::string&)
+{
+
+}
+
+boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver(qpid::messaging::Duration /*timeout*/)
+{
+ return boost::shared_ptr<ReceiverContext>();
+}
+
+uint32_t SessionContext::getReceivable()
+{
+ return 0;//TODO
+}
+
+uint32_t SessionContext::getUnsettledAcks()
+{
+ return 0;//TODO
+}
+
+qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery)
+{
+ qpid::framing::SequenceNumber id = next++;
+ unacked[id] = delivery;
+ QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery);
+ return id;
+}
+
+void SessionContext::acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end)
+{
+ for (DeliveryMap::iterator i = begin; i != end; ++i) {
+ QPID_LOG(debug, "Setting disposition for delivery " << i->first << " -> " << i->second);
+ pn_delivery_update(i->second, PN_ACCEPTED);
+ pn_delivery_settle(i->second);//TODO: different settlement modes?
+ }
+ unacked.erase(begin, end);
+}
+
+void SessionContext::acknowledge()
+{
+ QPID_LOG(debug, "acknowledging all " << unacked.size() << " messages");
+ acknowledge(unacked.begin(), unacked.end());
+}
+
+void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool cumulative)
+{
+ DeliveryMap::iterator i = unacked.find(id);
+ if (i != unacked.end()) {
+ acknowledge(cumulative ? unacked.begin() : i, ++i);
+ }
+}
+
+bool SessionContext::settled()
+{
+ bool result = true;
+ for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
+ if (!i->second->settled()) result = false;
+ }
+ return result;
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/SessionContext.h b/cpp/src/qpid/messaging/amqp/SessionContext.h
new file mode 100644
index 0000000000..eca30a0e97
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/SessionContext.h
@@ -0,0 +1,81 @@
+#ifndef QPID_MESSAGING_AMQP_SESSIONCONTEXT_H
+#define QPID_MESSAGING_AMQP_SESSIONCONTEXT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <map>
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/framing/SequenceNumber.h"
+
+struct pn_connection_t;
+struct pn_session_t;
+struct pn_delivery_t;
+
+namespace qpid {
+namespace messaging {
+
+class Address;
+class Duration;
+
+namespace amqp {
+
+class ConnectionContext;
+class SenderContext;
+class ReceiverContext;
+/**
+ *
+ */
+class SessionContext
+{
+ public:
+ SessionContext(pn_connection_t*);
+ ~SessionContext();
+ boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address);
+ boost::shared_ptr<ReceiverContext> createReceiver(const qpid::messaging::Address& address);
+ boost::shared_ptr<SenderContext> getSender(const std::string& name) const;
+ boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const;
+ void closeReceiver(const std::string&);
+ void closeSender(const std::string&);
+ boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout);
+ uint32_t getReceivable();
+ uint32_t getUnsettledAcks();
+ bool settled();
+ private:
+ friend class ConnectionContext;
+ typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap;
+ typedef std::map<std::string, boost::shared_ptr<ReceiverContext> > ReceiverMap;
+ typedef std::map<qpid::framing::SequenceNumber, pn_delivery_t*> DeliveryMap;
+ pn_session_t* session;
+ SenderMap senders;
+ ReceiverMap receivers;
+ DeliveryMap unacked;
+ qpid::framing::SequenceNumber next;
+
+ qpid::framing::SequenceNumber record(pn_delivery_t*);
+ void acknowledge();
+ void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative);
+ void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end);
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_SESSIONCONTEXT_H*/
diff --git a/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
new file mode 100644
index 0000000000..bf79771ca4
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
@@ -0,0 +1,148 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionHandle.h"
+#include "ConnectionContext.h"
+#include "ConnectionHandle.h"
+#include "ReceiverContext.h"
+#include "ReceiverHandle.h"
+#include "SenderContext.h"
+#include "SenderHandle.h"
+#include "SessionContext.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+SessionHandle::SessionHandle(boost::shared_ptr<ConnectionContext> c, boost::shared_ptr<SessionContext> s) : connection(c), session(s) {}
+
+void SessionHandle::commit()
+{
+
+}
+
+void SessionHandle::rollback()
+{
+
+}
+
+void SessionHandle::acknowledge(bool /*sync*/)
+{
+ connection->acknowledge(session, 0, false);
+}
+
+void SessionHandle::acknowledge(qpid::messaging::Message& msg, bool cumulative)
+{
+ //TODO: handle cumulative
+ connection->acknowledge(session, &msg, cumulative);
+}
+
+void SessionHandle::reject(qpid::messaging::Message&)
+{
+
+}
+
+void SessionHandle::release(qpid::messaging::Message&)
+{
+
+}
+
+void SessionHandle::close()
+{
+ connection->endSession(session);
+}
+
+void SessionHandle::sync(bool /*block*/)
+{
+
+}
+
+qpid::messaging::Sender SessionHandle::createSender(const qpid::messaging::Address& address)
+{
+ boost::shared_ptr<SenderContext> sender = session->createSender(address);
+ connection->attach(session, sender);
+ return qpid::messaging::Sender(new SenderHandle(connection, session, sender));
+}
+
+qpid::messaging::Receiver SessionHandle::createReceiver(const qpid::messaging::Address& address)
+{
+ boost::shared_ptr<ReceiverContext> receiver = session->createReceiver(address);
+ connection->attach(session, receiver);
+ return qpid::messaging::Receiver(new ReceiverHandle(connection, session, receiver));
+}
+
+bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout)
+{
+ boost::shared_ptr<ReceiverContext> r = session->nextReceiver(timeout);
+ if (r) {
+ //TODO: cache handles in this case to avoid frequent allocation
+ receiver = qpid::messaging::Receiver(new ReceiverHandle(connection, session, r));
+ return true;
+ } else {
+ return false;
+ }
+}
+
+qpid::messaging::Receiver SessionHandle::nextReceiver(Duration timeout)
+{
+ qpid::messaging::Receiver r;
+ if (nextReceiver(r, timeout)) return r;
+ else throw qpid::messaging::NoMessageAvailable();
+}
+
+uint32_t SessionHandle::getReceivable()
+{
+ return session->getReceivable();
+}
+
+uint32_t SessionHandle::getUnsettledAcks()
+{
+ return session->getUnsettledAcks();
+}
+
+Sender SessionHandle::getSender(const std::string& name) const
+{
+ return qpid::messaging::Sender(new SenderHandle(connection, session, session->getSender(name)));
+}
+
+Receiver SessionHandle::getReceiver(const std::string& name) const
+{
+ return qpid::messaging::Receiver(new ReceiverHandle(connection, session, session->getReceiver(name)));
+}
+
+Connection SessionHandle::getConnection() const
+{
+ return qpid::messaging::Connection(new ConnectionHandle(connection));
+}
+
+void SessionHandle::checkError()
+{
+
+}
+
+
+}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/SessionHandle.h b/cpp/src/qpid/messaging/amqp/SessionHandle.h
new file mode 100644
index 0000000000..5e843aaacc
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/SessionHandle.h
@@ -0,0 +1,64 @@
+#ifndef QPID_MESSAGING_AMQP_SESSIONIMPL_H
+#define QPID_MESSAGING_AMQP_SESSIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/shared_ptr.hpp>
+#include "qpid/messaging/SessionImpl.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+class ConnectionContext;
+class SessionContext;
+/**
+ *
+ */
+class SessionHandle : public qpid::messaging::SessionImpl
+{
+ public:
+ SessionHandle(boost::shared_ptr<ConnectionContext>, boost::shared_ptr<SessionContext>);
+ void commit();
+ void rollback();
+ void acknowledge(bool sync);
+ void acknowledge(Message&, bool);
+ void reject(Message&);
+ void release(Message&);
+ void close();
+ void sync(bool block);
+ qpid::messaging::Sender createSender(const Address& address);
+ qpid::messaging::Receiver createReceiver(const Address& address);
+ bool nextReceiver(Receiver& receiver, Duration timeout);
+ qpid::messaging::Receiver nextReceiver(Duration timeout);
+ uint32_t getReceivable();
+ uint32_t getUnsettledAcks();
+ qpid::messaging::Sender getSender(const std::string& name) const;
+ qpid::messaging::Receiver getReceiver(const std::string& name) const;
+ qpid::messaging::Connection getConnection() const;
+ void checkError();
+ private:
+ boost::shared_ptr<ConnectionContext> connection;
+ boost::shared_ptr<SessionContext> session;
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_SESSIONIMPL_H*/
diff --git a/cpp/src/qpid/messaging/amqp/SslTransport.cpp b/cpp/src/qpid/messaging/amqp/SslTransport.cpp
new file mode 100644
index 0000000000..ea2375cb26
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/SslTransport.cpp
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SslTransport.h"
+#include "TransportContext.h"
+#include "qpid/sys/ssl/SslSocket.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/log/Statement.h"
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+
+using namespace qpid::sys;
+using namespace qpid::sys::ssl;
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+// Static constructor which registers connector here
+namespace {
+Transport* create(TransportContext& c, Poller::shared_ptr p)
+{
+ return new SslTransport(c, p);
+}
+
+struct StaticInit
+{
+ StaticInit()
+ {
+ Transport::add("ssl", &create);
+ };
+} init;
+}
+
+
+SslTransport::SslTransport(TransportContext& c, boost::shared_ptr<Poller> p) : context(c), connector(0), aio(0), poller(p) {}
+
+void SslTransport::connect(const std::string& host, const std::string& port)
+{
+ assert(!connector);
+ assert(!aio);
+ connector = AsynchConnector::create(
+ socket,
+ host, port,
+ boost::bind(&SslTransport::connected, this, _1),
+ boost::bind(&SslTransport::failed, this, _3));
+
+ connector->start(poller);
+}
+
+void SslTransport::failed(const std::string& msg)
+{
+ QPID_LOG(debug, "Failed to connect: " << msg);
+ socket.close();
+ context.closed();
+}
+
+void SslTransport::connected(const Socket&)
+{
+ context.opened();
+ aio = AsynchIO::create(socket,
+ boost::bind(&SslTransport::read, this, _1, _2),
+ boost::bind(&SslTransport::eof, this, _1),
+ boost::bind(&SslTransport::disconnected, this, _1),
+ boost::bind(&SslTransport::socketClosed, this, _1, _2),
+ 0, // nobuffs
+ boost::bind(&SslTransport::write, this, _1));
+ aio->createBuffers(std::numeric_limits<uint16_t>::max());//note: AMQP 1.0 _can_ handle large frame sizes
+ id = boost::str(boost::format("[%1%]") % socket.getFullAddress());
+ aio->start(poller);
+}
+
+void SslTransport::read(AsynchIO&, AsynchIO::BufferBase* buffer)
+{
+ int32_t decoded = context.getCodec().decode(buffer->bytes+buffer->dataStart, buffer->dataCount);
+ if (decoded < buffer->dataCount) {
+ // Adjust buffer for used bytes and then "unread them"
+ buffer->dataStart += decoded;
+ buffer->dataCount -= decoded;
+ aio->unread(buffer);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio->queueReadBuffer(buffer);
+ }
+}
+
+void SslTransport::write(AsynchIO&)
+{
+ if (context.getCodec().canEncode()) {
+ AsynchIO::BufferBase* buffer = aio->getQueuedBuffer();
+ if (buffer) {
+ size_t encoded = context.getCodec().encode(buffer->bytes, buffer->byteCount);
+
+ buffer->dataStart = 0;
+ buffer->dataCount = encoded;
+ aio->queueWrite(buffer);
+ }
+ }
+
+}
+
+void SslTransport::close()
+{
+ QPID_LOG(debug, id << " SslTransport closing...");
+ if (aio)
+ aio->queueWriteClose();
+}
+
+void SslTransport::eof(AsynchIO&)
+{
+ close();
+}
+
+void SslTransport::disconnected(AsynchIO&)
+{
+ close();
+ socketClosed(*aio, socket);
+}
+
+void SslTransport::socketClosed(AsynchIO&, const Socket&)
+{
+ if (aio)
+ aio->queueForDeletion();
+ context.closed();
+ QPID_LOG(debug, id << " Socket closed");
+}
+
+void SslTransport::abort()
+{
+ if (aio) {
+ // Established connection
+ aio->requestCallback(boost::bind(&SslTransport::eof, this, _1));
+ }
+}
+
+void SslTransport::activateOutput()
+{
+ if (aio) aio->notifyPendingWrite();
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/SslTransport.h b/cpp/src/qpid/messaging/amqp/SslTransport.h
new file mode 100644
index 0000000000..f67ab95673
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/SslTransport.h
@@ -0,0 +1,74 @@
+#ifndef QPID_MESSAGING_AMQP_SSLTRANSPORT_H
+#define QPID_MESSAGING_AMQP_SSLTRANSPORT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/amqp/Transport.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/ssl/SslSocket.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace sys {
+class ConnectionCodec;
+class Poller;
+class AsynchConnector;
+class AsynchIO;
+class AsynchIOBufferBase;
+}
+
+namespace messaging {
+namespace amqp {
+class TransportContext;
+
+class SslTransport : public Transport
+{
+ public:
+ SslTransport(TransportContext&, boost::shared_ptr<qpid::sys::Poller> p);
+
+ void connect(const std::string& host, const std::string& port);
+
+ void activateOutput();
+ void abort();
+ void close();
+
+ private:
+ qpid::sys::ssl::SslSocket socket;
+ TransportContext& context;
+ qpid::sys::AsynchConnector* connector;
+ qpid::sys::AsynchIO* aio;
+ boost::shared_ptr<qpid::sys::Poller> poller;
+ bool closed;
+ std::string id;
+
+ void connected(const qpid::sys::Socket&);
+ void failed(const std::string& msg);
+ void read(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
+ void write(qpid::sys::AsynchIO&);
+ void eof(qpid::sys::AsynchIO&);
+ void disconnected(qpid::sys::AsynchIO&);
+ void socketClosed(qpid::sys::AsynchIO&, const qpid::sys::Socket&);
+
+ friend class DriverImpl;
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_SSLTRANSPORT_H*/
diff --git a/cpp/src/qpid/messaging/amqp/TcpTransport.cpp b/cpp/src/qpid/messaging/amqp/TcpTransport.cpp
new file mode 100644
index 0000000000..98022d634c
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/TcpTransport.cpp
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "TcpTransport.h"
+#include "ConnectionContext.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/log/Statement.h"
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+
+using namespace qpid::sys;
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+// Static constructor which registers connector here
+namespace {
+Transport* create(TransportContext& c, Poller::shared_ptr p)
+{
+ return new TcpTransport(c, p);
+}
+
+struct StaticInit
+{
+ StaticInit()
+ {
+ Transport::add("tcp", &create);
+ };
+} init;
+}
+
+TcpTransport::TcpTransport(TransportContext& c, boost::shared_ptr<Poller> p) : socket(createSocket()), context(c), connector(0), aio(0), poller(p) {}
+
+void TcpTransport::connect(const std::string& host, const std::string& port)
+{
+ assert(!connector);
+ assert(!aio);
+ connector = AsynchConnector::create(
+ *socket,
+ host, port,
+ boost::bind(&TcpTransport::connected, this, _1),
+ boost::bind(&TcpTransport::failed, this, _3));
+
+ connector->start(poller);
+}
+
+void TcpTransport::failed(const std::string& msg)
+{
+ QPID_LOG(debug, "Failed to connect: " << msg);
+ connector = 0;
+ socket->close();
+ context.closed();
+}
+
+void TcpTransport::connected(const Socket&)
+{
+ context.opened();
+ connector = 0;
+ aio = AsynchIO::create(*socket,
+ boost::bind(&TcpTransport::read, this, _1, _2),
+ boost::bind(&TcpTransport::eof, this, _1),
+ boost::bind(&TcpTransport::disconnected, this, _1),
+ boost::bind(&TcpTransport::socketClosed, this, _1, _2),
+ 0, // nobuffs
+ boost::bind(&TcpTransport::write, this, _1));
+ aio->createBuffers(std::numeric_limits<uint16_t>::max());//note: AMQP 1.0 _can_ handle large frame sizes
+ id = boost::str(boost::format("[%1%]") % socket->getFullAddress());
+ aio->start(poller);
+}
+
+void TcpTransport::read(AsynchIO&, AsynchIO::BufferBase* buffer)
+{
+ int32_t decoded = context.getCodec().decode(buffer->bytes+buffer->dataStart, buffer->dataCount);
+ if (decoded < buffer->dataCount) {
+ // Adjust buffer for used bytes and then "unread them"
+ buffer->dataStart += decoded;
+ buffer->dataCount -= decoded;
+ aio->unread(buffer);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio->queueReadBuffer(buffer);
+ }
+}
+
+void TcpTransport::write(AsynchIO&)
+{
+ if (context.getCodec().canEncode()) {
+ AsynchIO::BufferBase* buffer = aio->getQueuedBuffer();
+ if (buffer) {
+ size_t encoded = context.getCodec().encode(buffer->bytes, buffer->byteCount);
+
+ buffer->dataStart = 0;
+ buffer->dataCount = encoded;
+ aio->queueWrite(buffer);
+ }
+ }
+
+}
+
+void TcpTransport::close()
+{
+ QPID_LOG(debug, id << " TcpTransport closing...");
+ if (aio)
+ aio->queueWriteClose();
+}
+
+void TcpTransport::eof(AsynchIO&)
+{
+ close();
+}
+
+void TcpTransport::disconnected(AsynchIO&)
+{
+ close();
+ socketClosed(*aio, *socket);
+}
+
+void TcpTransport::socketClosed(AsynchIO&, const Socket&)
+{
+ if (aio)
+ aio->queueForDeletion();
+ context.closed();
+ QPID_LOG(debug, id << " Socket closed");
+}
+
+void TcpTransport::abort()
+{
+ if (aio) {
+ // Established connection
+ aio->requestCallback(boost::bind(&TcpTransport::eof, this, _1));
+ } else if (connector) {
+ // We're still connecting
+ connector->stop();
+ failed("Connection timedout");
+ }
+}
+
+void TcpTransport::activateOutput()
+{
+ if (aio) aio->notifyPendingWrite();
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/TcpTransport.h b/cpp/src/qpid/messaging/amqp/TcpTransport.h
new file mode 100644
index 0000000000..8c1087abb3
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/TcpTransport.h
@@ -0,0 +1,71 @@
+#ifndef QPID_MESSAGING_AMQP_TCPTRANSPORT_H
+#define QPID_MESSAGING_AMQP_TCPTRANSPORT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/amqp/Transport.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Socket.h"
+#include <boost/scoped_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace sys {
+class ConnectionCodec;
+class AsynchConnector;
+class AsynchIO;
+class AsynchIOBufferBase;
+class Poller;
+}
+namespace messaging {
+namespace amqp {
+class TransportContext;
+
+class TcpTransport : public Transport
+{
+ public:
+ TcpTransport(TransportContext&, boost::shared_ptr<qpid::sys::Poller>);
+
+ void connect(const std::string& host, const std::string& port);
+
+ void activateOutput();
+ void abort();
+ void close();
+
+ private:
+ boost::scoped_ptr<qpid::sys::Socket> socket;
+ TransportContext& context;
+ qpid::sys::AsynchConnector* connector;
+ qpid::sys::AsynchIO* aio;
+ boost::shared_ptr<qpid::sys::Poller> poller;
+ std::string id;
+
+ void connected(const qpid::sys::Socket&);
+ void failed(const std::string& msg);
+ void read(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
+ void write(qpid::sys::AsynchIO&);
+ void eof(qpid::sys::AsynchIO&);
+ void disconnected(qpid::sys::AsynchIO&);
+ void socketClosed(qpid::sys::AsynchIO&, const qpid::sys::Socket&);
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_TCPTRANSPORT_H*/
diff --git a/cpp/src/qpid/messaging/amqp/Transport.cpp b/cpp/src/qpid/messaging/amqp/Transport.cpp
new file mode 100644
index 0000000000..21f51046b1
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/Transport.cpp
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/amqp/Transport.h"
+#include "qpid/messaging/amqp/TransportContext.h"
+#include <map>
+#include <string>
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+namespace {
+typedef std::map<std::string, Transport::Factory*> Registry;
+
+Registry& theRegistry()
+{
+ static Registry factories;
+ return factories;
+}
+}
+
+Transport* Transport::create(const std::string& name, TransportContext& context, boost::shared_ptr<qpid::sys::Poller> poller)
+{
+ Registry::const_iterator i = theRegistry().find(name);
+ if (i != theRegistry().end()) return (i->second)(context, poller);
+ else return 0;
+}
+void Transport::add(const std::string& name, Factory* factory)
+{
+ theRegistry()[name] = factory;
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/Transport.h b/cpp/src/qpid/messaging/amqp/Transport.h
new file mode 100644
index 0000000000..ee021f645b
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/Transport.h
@@ -0,0 +1,48 @@
+#ifndef QPID_MESSAGING_AMQP_TRANSPORT_H
+#define QPID_MESSAGING_AMQP_TRANSPORT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/OutputControl.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace sys {
+class Poller;
+}
+namespace messaging {
+namespace amqp {
+class TransportContext;
+
+class Transport : public qpid::sys::OutputControl
+{
+ public:
+ virtual ~Transport() {}
+ virtual void connect(const std::string& host, const std::string& port) = 0;
+ virtual void close() = 0;
+
+ typedef Transport* Factory(TransportContext&, boost::shared_ptr<qpid::sys::Poller>);
+ static Transport* create(const std::string& name, TransportContext&, boost::shared_ptr<qpid::sys::Poller>);
+ static void add(const std::string& name, Factory* factory);
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_TRANSPORT_H*/
diff --git a/cpp/src/qpid/messaging/amqp/TransportContext.h b/cpp/src/qpid/messaging/amqp/TransportContext.h
new file mode 100644
index 0000000000..57192b5976
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/TransportContext.h
@@ -0,0 +1,47 @@
+#ifndef QPID_MESSAGING_AMQP_TRANSPORTCONTEXT_H
+#define QPID_MESSAGING_AMQP_TRANSPORTCONTEXT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace sys {
+class Codec;
+}
+namespace messaging {
+namespace amqp {
+
+/**
+ * Interface to be supplied by 'users' of Transport interface, in
+ * order to provide codec and handle callbaskc for opening and closing
+ * of connection.
+ */
+class TransportContext
+{
+ public:
+ virtual ~TransportContext() {}
+ virtual qpid::sys::Codec& getCodec() = 0;
+ virtual void closed() = 0;
+ virtual void opened() = 0;
+ private:
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_TRANSPORTCONTEXT_H*/