summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2012-04-10 14:08:05 +0000
committerGordon Sim <gsim@apache.org>2012-04-10 14:08:05 +0000
commitdd007bcf3cba0074c42a6c13c151241fb6e2aad0 (patch)
treec3bee6bd265768e35b52595f00b803872574297c
parent66e1ad222d55a3ed0f4e1198a6bd45188fc76695 (diff)
downloadqpid-python-dd007bcf3cba0074c42a6c13c151241fb6e2aad0.tar.gz
Basic AMQP 1.0 support for qpid::messaging API
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/gs-amqp-1-0-sandbox@1311733 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/README_AMQP_1.0.txt41
-rw-r--r--qpid/cpp/configure.ac16
-rw-r--r--qpid/cpp/examples/messaging/spout.cpp3
-rw-r--r--qpid/cpp/src/Makefile.am18
-rw-r--r--qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp30
-rw-r--r--qpid/cpp/src/qpid/messaging/Connection.cpp16
-rw-r--r--qpid/cpp/src/qpid/messaging/MessageImpl.cpp1
-rw-r--r--qpid/cpp/src/qpid/messaging/MessageImpl.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/ReceiverImpl.h2
-rw-r--r--qpid/cpp/src/qpid/messaging/SenderImpl.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp480
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h137
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp68
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h58
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp406
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/Decoder.h153
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp97
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h63
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp106
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h63
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp116
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.h82
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp75
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h58
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp146
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.h80
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp148
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionHandle.h64
-rw-r--r--qpid/cpp/src/tests/test_env.sh.in1
29 files changed, 2519 insertions, 11 deletions
diff --git a/qpid/cpp/README_AMQP_1.0.txt b/qpid/cpp/README_AMQP_1.0.txt
new file mode 100644
index 0000000000..0db15f2b7d
--- /dev/null
+++ b/qpid/cpp/README_AMQP_1.0.txt
@@ -0,0 +1,41 @@
+This branch contains an initial integration of AMQP 1.0 support. This
+is provided via the qpid 'proton-c' component, available from
+https://svn.apache.org/repos/asf/qpid/proton/trunk/proton-c.
+
+The (absolute) path to the location of that library should be
+specified via the '--with-proton' option to configure.
+
+At present only the messaging API implementation is integrated
+(i.e. client only). I've used the python broker available from
+https://github.com/rhs/amqp for my testing so far.
+
+There is no automatic protocol version detection yet, so you need to
+explicitly enable AMQP 1.0 using the 'use_amqp1.0' connection option.
+
+E.g. start broker with:
+
+./broker -a -u example-users -n example-nodes
+
+then run:
+
+./examples/messaging/spout --connection-options '{use_amqp1.0:True}' --content 'my-message' queue
+
+then:
+
+./examples/messaging/drain --connection-options '{use_amqp1.0:True}' queue
+
+Note: at present the cluster tests don't run correctly from `make check` due to
+inability to find the proton library.
+
+== Status / TODO ==
+
+* client only at present
+* there is no support for transactions
+* there is no automatic protocol version detection
+* only message content is encoded/decoded (not properties/headers)
+* only the node names in addresses are actually used at present
+* driver-integration:
+ - the code at present uses the proton driver for IO
+ - each connection uses its own driver and starts a thread for that purpose; TODO: charing drivers between connections
+ - there is no integration with the c++ 'transports' (ssl, rdma etc)
+ - there is no real SASL integration (just simple PLAIN and ANONYMOUS supported by proton driver)
diff --git a/qpid/cpp/configure.ac b/qpid/cpp/configure.ac
index 6ba5f62f0e..16c4c88083 100644
--- a/qpid/cpp/configure.ac
+++ b/qpid/cpp/configure.ac
@@ -107,6 +107,22 @@ AC_DISABLE_STATIC
AC_PROG_LIBTOOL
AC_SUBST([LIBTOOL_DEPS])
+# Allow integration against external AMQP 1.0 protocol engine
+AC_ARG_WITH([proton],
+ [AS_HELP_STRING([--with-proton],
+ [Base directory for proton project])])
+
+PROTON_BASE=$with_proton
+PROTON_INCLUDE=$PROTON_BASE/include
+PROTON_LIB=$PROTON_BASE
+fail=0
+test -f $PROTON_INCLUDE/proton/engine.h || fail=1
+test $fail = 1 &&
+ AC_MSG_ERROR([Please specify path to proton!])
+CXXFLAGS="$CXXFLAGS -I$PROTON_INCLUDE"
+LDFLAGS="$LDFLAGS -L$PROTON_LIB -lqpidproton"
+AC_SUBST(PROTON_LIB)
+
# For libraries (libcommon) that use dlopen, dlerror, etc.,
# test whether we need to link with -ldl.
gl_saved_libs=$LIBS
diff --git a/qpid/cpp/examples/messaging/spout.cpp b/qpid/cpp/examples/messaging/spout.cpp
index cd11a7ad81..c758cb2967 100644
--- a/qpid/cpp/examples/messaging/spout.cpp
+++ b/qpid/cpp/examples/messaging/spout.cpp
@@ -159,7 +159,8 @@ int main(int argc, char** argv)
std::stringstream spoutid;
spoutid << id << ":" << count;
message.getProperties()["spout-id"] = spoutid.str();
- sender.send(message);
+ sender.send(message, (count + 1 == options.count));
+ //sender.send(message);
}
session.sync();
connection.close();
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 5dcc4cd210..6b382bcec8 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -797,6 +797,24 @@ libqpidmessaging_la_SOURCES = \
qpid/messaging/ReceiverImpl.h \
qpid/messaging/SessionImpl.h \
qpid/messaging/FailoverUpdates.cpp \
+ qpid/messaging/amqp/ConnectionContext.h \
+ qpid/messaging/amqp/ConnectionContext.cpp \
+ qpid/messaging/amqp/ConnectionHandle.h \
+ qpid/messaging/amqp/ConnectionHandle.cpp \
+ qpid/messaging/amqp/Decoder.h \
+ qpid/messaging/amqp/Decoder.cpp \
+ qpid/messaging/amqp/ReceiverContext.h \
+ qpid/messaging/amqp/ReceiverContext.cpp \
+ qpid/messaging/amqp/ReceiverHandle.h \
+ qpid/messaging/amqp/ReceiverHandle.cpp \
+ qpid/messaging/amqp/SenderContext.h \
+ qpid/messaging/amqp/SenderContext.cpp \
+ qpid/messaging/amqp/SenderHandle.h \
+ qpid/messaging/amqp/SenderHandle.cpp \
+ qpid/messaging/amqp/SessionContext.h \
+ qpid/messaging/amqp/SessionContext.cpp \
+ qpid/messaging/amqp/SessionHandle.h \
+ qpid/messaging/amqp/SessionHandle.cpp \
qpid/client/amqp0_10/AcceptTracker.h \
qpid/client/amqp0_10/AcceptTracker.cpp \
qpid/client/amqp0_10/AddressResolution.h \
diff --git a/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp b/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp
index e617015d64..5ae731944c 100644
--- a/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp
+++ b/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp
@@ -36,10 +36,17 @@ void ProtocolInitiation::encode(Buffer& buffer) const {
buffer.putOctet('M');
buffer.putOctet('Q');
buffer.putOctet('P');
- buffer.putOctet(1);//class
- buffer.putOctet(1);//instance
- buffer.putOctet(version.getMajor());
- buffer.putOctet(version.getMinor());
+ if (version.getMajor() == 1) {
+ buffer.putOctet(0);
+ buffer.putOctet(version.getMajor());
+ buffer.putOctet(version.getMinor());
+ buffer.putOctet(0);//revision
+ } else {
+ buffer.putOctet(1);//class
+ buffer.putOctet(1);//instance
+ buffer.putOctet(version.getMajor());
+ buffer.putOctet(version.getMinor());
+ }
}
bool ProtocolInitiation::decode(Buffer& buffer){
@@ -48,10 +55,17 @@ bool ProtocolInitiation::decode(Buffer& buffer){
buffer.getOctet();//M
buffer.getOctet();//Q
buffer.getOctet();//P
- buffer.getOctet();//class
- buffer.getOctet();//instance
- version.setMajor(buffer.getOctet());
- version.setMinor(buffer.getOctet());
+ uint8_t protocolClass = buffer.getOctet();//class
+ if (protocolClass == 1) {
+ //old (pre-1.0) style
+ buffer.getOctet();//instance
+ version.setMajor(buffer.getOctet());
+ version.setMinor(buffer.getOctet());
+ } else {
+ version.setMajor(buffer.getOctet());
+ version.setMinor(buffer.getOctet());
+ buffer.getOctet();//revision
+ }
return true;
}else{
return false;
diff --git a/qpid/cpp/src/qpid/messaging/Connection.cpp b/qpid/cpp/src/qpid/messaging/Connection.cpp
index bd90aa54a7..b9b3f2a4c3 100644
--- a/qpid/cpp/src/qpid/messaging/Connection.cpp
+++ b/qpid/cpp/src/qpid/messaging/Connection.cpp
@@ -25,6 +25,7 @@
#include "qpid/messaging/SessionImpl.h"
#include "qpid/messaging/PrivateImplRef.h"
#include "qpid/client/amqp0_10/ConnectionImpl.h"
+#include "qpid/messaging/amqp/ConnectionHandle.h"
#include "qpid/log/Statement.h"
namespace qpid {
@@ -44,7 +45,11 @@ Connection::Connection(const std::string& url, const std::string& o)
Variant::Map options;
AddressParser parser(o);
if (o.empty() || parser.parseMap(options)) {
- PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+ if (options.find("use_amqp1.0") != options.end()) {
+ PI::ctor(*this, new qpid::messaging::amqp::ConnectionHandle(url, options));
+ } else {
+ PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+ }
} else {
throw InvalidOptionString("Invalid option string: " + o);
}
@@ -61,7 +66,14 @@ Connection::Connection()
PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
}
-void Connection::open() { impl->open(); }
+void Connection::open()
+{
+ try {
+ impl->open();
+ } catch (const ConnectionError& e) {
+ QPID_LOG(notice, "In qpid::messaging::Connection::open(), caught: " << e.what());
+ }
+}
bool Connection::isOpen() { return impl->isOpen(); }
bool Connection::isOpen() const { return impl->isOpen(); }
void Connection::close() { impl->close(); }
diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
index 0601800e46..3ef0fa4541 100644
--- a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
+++ b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
@@ -61,6 +61,7 @@ void MessageImpl::setHeader(const std::string& key, const qpid::types::Variant&
//should these methods be on MessageContent?
void MessageImpl::setBytes(const std::string& c) { bytes = c; }
void MessageImpl::setBytes(const char* chars, size_t count) { bytes.assign(chars, count); }
+void MessageImpl::appendBytes(const char* chars, size_t count) { bytes.append(chars, count); }
const std::string& MessageImpl::getBytes() const { return bytes; }
std::string& MessageImpl::getBytes() { return bytes; }
diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.h b/qpid/cpp/src/qpid/messaging/MessageImpl.h
index 57df6b3fda..4510385b6a 100644
--- a/qpid/cpp/src/qpid/messaging/MessageImpl.h
+++ b/qpid/cpp/src/qpid/messaging/MessageImpl.h
@@ -64,6 +64,7 @@ struct MessageImpl
void setBytes(const std::string& bytes);
void setBytes(const char* chars, size_t count);
+ void appendBytes(const char* chars, size_t count);
const std::string& getBytes() const;
std::string& getBytes();
diff --git a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
index 57059bfd28..e450693d2c 100644
--- a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
+++ b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
@@ -22,10 +22,12 @@
*
*/
#include "qpid/RefCounted.h"
+#include "qpid/sys/IntegerTypes.h"
namespace qpid {
namespace messaging {
+class Duration;
class Message;
class MessageListener;
class Session;
diff --git a/qpid/cpp/src/qpid/messaging/SenderImpl.h b/qpid/cpp/src/qpid/messaging/SenderImpl.h
index a1ca02c72c..d978463fdb 100644
--- a/qpid/cpp/src/qpid/messaging/SenderImpl.h
+++ b/qpid/cpp/src/qpid/messaging/SenderImpl.h
@@ -22,6 +22,7 @@
*
*/
#include "qpid/RefCounted.h"
+#include "qpid/sys/IntegerTypes.h"
namespace qpid {
namespace messaging {
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
new file mode 100644
index 0000000000..a16333b0bb
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -0,0 +1,480 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionContext.h"
+#include "Decoder.h"
+#include "ReceiverContext.h"
+#include "SenderContext.h"
+#include "SessionContext.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageImpl.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Time.h"
+#include <vector>
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+/*
+ssize_t intercept_input(pn_transport_t *transport, char *bytes, size_t available, void* context, pn_input_fn_t *next)
+{
+ ConnectionContext* connection = reinterpret_cast<ConnectionContext*>(context);
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(connection->lock);
+ ssize_t result = next(transport, bytes, available);
+ connection->lock.notifyAll();
+ return result;
+}
+ssize_t intercept_output(pn_transport_t *transport, char *bytes, size_t size, void* context, pn_output_fn_t *next)
+{
+ ConnectionContext* connection = reinterpret_cast<ConnectionContext*>(context);
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(connection->lock);
+ return next(transport, bytes, size);
+}
+time_t intercept_tick(pn_transport_t *transport, time_t now, void* context, pn_tick_fn_t *next)
+{
+ ConnectionContext* connection = reinterpret_cast<ConnectionContext*>(context);
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(connection->lock);
+ return next(transport, now);
+}
+*/
+
+/*
+void callback(pn_selectable_t* s)
+{
+ ConnectionContext* context = reinterpret_cast<ConnectionContext*>(pn_selectable_context(s));
+
+ if (context->read()) {
+ context->doInput();
+ }
+ if (context->doOutput()) {
+ context->write();
+ }
+}
+*/
+
+ConnectionContext::ConnectionContext(const std::string& u, const qpid::types::Variant::Map& o)
+ : url(u),
+ driver(pn_driver()),//TODO: allow driver to be shared
+ socket(0),
+ connection(0),
+ //transport(0),
+ options(o),
+ waitingToWrite(false),
+ active(false) {}
+
+ConnectionContext::~ConnectionContext()
+{
+ if (active.boolCompareAndSwap(true, false)) {
+ wakeupDriver();
+ driverThread.join();//if active was false, then the caller of
+ //close() will have joined the driver
+ //thread already
+ }
+ pn_connector_destroy(socket);
+ pn_driver_destroy(driver);
+ //What else am I responsible for deleting?
+}
+
+void ConnectionContext::open()
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ //TODO: check not already open
+ std::string host = "localhost";
+ std::string port = "5672";
+
+ for (Url::const_iterator i = url.begin(); i != url.end(); ++i) {
+ std::stringstream port;
+ port << i->port;
+ socket = pn_connector(driver, i->host.c_str(), port.str().c_str(), this);
+ if (socket) break;
+ }
+
+ if (socket) {
+ pn_sasl_t *sasl = pn_connector_sasl(socket);
+
+ std::string user = url.getUser();
+ if (user.size()) {//TODO: proper SASL
+ pn_sasl_plain(sasl, user.c_str(), url.getPass().c_str());
+ } else {
+ pn_sasl_mechanisms(sasl, "ANONYMOUS");
+ pn_sasl_client(sasl);
+ }
+
+ connection = pn_connector_connection(socket);
+ pn_connection_open(connection);
+ active = true;
+ driverThread = qpid::sys::Thread(this);
+ //wait for open state
+ while (pn_sasl_outcome(sasl) == PN_SASL_NONE ||
+ !(pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT))) {
+ wait();
+ }
+ if (pn_sasl_outcome(sasl) != PN_SASL_OK) {
+ throw qpid::messaging::ConnectionError("Authentication failed!");
+ }
+ } else {
+ //set error state
+ }
+}
+
+void ConnectionContext::run()
+{
+ while (active.get()) {
+ pn_driver_wait(driver);
+ for (pn_connector_t* c = pn_driver_connector(driver); c; c = pn_driver_connector(driver)) {
+ ConnectionContext* context = reinterpret_cast<ConnectionContext*>(pn_connector_context(c));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(context->lock);
+ pn_connector_process(c);
+ context->lock.notifyAll();
+ }
+ }
+}
+
+bool ConnectionContext::isOpen() const
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ return pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+}
+
+void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ pn_session_close(ssn->session);
+ //TODO: need to destroy session and remove context from map
+ wakeupDriver();
+}
+
+void ConnectionContext::close()
+{
+ {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) {
+ for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i){
+ pn_session_close(i->second->session);
+ }
+ sessions.clear();
+ pn_connection_close(connection);
+ wakeupDriver();
+ //wait for close to be confirmed by peer?
+ while (!(pn_connection_state(connection) & PN_REMOTE_CLOSED)) {
+ wait();
+ }
+ }
+ }
+ if (active.boolCompareAndSwap(true, false)) {
+ wakeupDriver();
+ }
+ driverThread.join();
+}
+
+bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
+{
+ if (!lnk->capacity) {
+ pn_flow(lnk->receiver, 1);
+ wakeupDriver();
+ }
+ if (get(ssn, lnk, message, timeout)) {
+ if (lnk->capacity) {
+ pn_flow(lnk->receiver, 1);//TODO: is this the right approach?
+ }
+ return true;
+ } else {
+ //TODO: flush
+ return get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE);
+ }
+}
+
+qpid::sys::AbsTime convert(qpid::messaging::Duration timeout)
+{
+ qpid::sys::AbsTime until;
+ uint64_t ms = timeout.getMilliseconds();
+ if (ms < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) {
+ return qpid::sys::AbsTime(qpid::sys::now(), ms * qpid::sys::TIME_MSEC);
+ } else {
+ return qpid::sys::FAR_FUTURE;
+ }
+}
+
+bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
+{
+ qpid::sys::AbsTime until(convert(timeout));
+ while (until > qpid::sys::now()) {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ pn_delivery_t* current = pn_current((pn_link_t*) lnk->receiver);
+ if (current) {
+ //TODO: can we avoid copying here?
+ std::vector<char> data;
+ data.resize(pn_pending(current));
+ ssize_t read = pn_recv(lnk->receiver, &data[0], data.size());
+ MessageDataDecoder decoder(MessageImplAccess::get(message));
+ decoder.decode(&data[0], read);
+ MessageImplAccess::get(message).setInternalId(ssn->record(current));
+ pn_advance(lnk->receiver);
+ return true;
+ } else {
+ wait();
+ }
+ }
+ return false;
+}
+
+void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (message) {
+ ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative);
+ } else {
+ ssn->acknowledge();
+ }
+ wakeupDriver();
+}
+
+
+void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
+{
+ pn_set_target((pn_link_t*) lnk->sender, lnk->getTarget().c_str());
+ attach(ssn->session, (pn_link_t*) lnk->sender);
+}
+
+void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+{
+ pn_set_source((pn_link_t*) lnk->receiver, lnk->getSource().c_str());
+ attach(ssn->session, (pn_link_t*) lnk->receiver, lnk->capacity);
+}
+
+void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int credit)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ pn_link_open(link);
+ if (credit) pn_flow(link, credit);
+ wakeupDriver();
+ while (!(pn_link_state(link) & PN_REMOTE_UNINIT)) wait();
+}
+
+void ConnectionContext::send(boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ SenderContext::Delivery* delivery(0);
+ while (!(delivery = snd->send(message))) {
+ QPID_LOG(debug, "Waiting for capacity...");
+ wait();//wait for capacity
+ }
+ wakeupDriver();
+ if (sync) {
+ while (!delivery->accepted()) {
+ QPID_LOG(debug, "Waiting for confirmation...");
+ wait();//wait until message has been confirmed
+ }
+ }
+}
+
+void ConnectionContext::setCapacity(boost::shared_ptr<SenderContext> sender, uint32_t capacity)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sender->setCapacity(capacity);
+}
+uint32_t ConnectionContext::getCapacity(boost::shared_ptr<SenderContext> sender)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ return sender->getCapacity();
+}
+uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<SenderContext> sender)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ return sender->getUnsettled();
+}
+
+void ConnectionContext::setCapacity(boost::shared_ptr<ReceiverContext> receiver, uint32_t capacity)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ receiver->setCapacity(capacity);
+}
+uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ return receiver->getCapacity();
+}
+uint32_t ConnectionContext::getAvailable(boost::shared_ptr<ReceiverContext> receiver)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ return receiver->getAvailable();
+}
+uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> receiver)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ return receiver->getUnsettled();
+}
+
+/**
+ * Expects lock to be held by caller
+ */
+void ConnectionContext::wakeupDriver()
+{
+ pn_driver_wakeup(driver);
+ waitingToWrite = true;
+ QPID_LOG(debug, "wakeupDriver()");
+}
+
+void ConnectionContext::wait()
+{
+ lock.wait();
+}
+/*
+bool ConnectionContext::read()
+{
+ int r = pn_selectable_recv(socket, inputBuffer.position(), inputBuffer.capacity());
+ if (r > 0) {
+ inputBuffer.advance(r);
+ return true;
+ } else if (r == 0) {
+ //closed
+ return false;
+ } else {//i.e. r < 0
+ return false;
+ }
+}
+
+void ConnectionContext::write()
+{
+ QPID_LOG(debug, "write() " << outputBuffer.available() << " bytes available");
+ int w = pn_selectable_send(socket, outputBuffer.start(), outputBuffer.available());
+ if (w < 0) {
+ //io error
+ } else {
+ outputBuffer.consume(w);
+ QPID_LOG(debug, "Wrote " << w << " bytes");
+ }
+}
+
+void ConnectionContext::doInput()
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ ssize_t n = pn_input(transport, inputBuffer.start(), inputBuffer.available());
+ if (n > 0) {
+ inputBuffer.consume(n);
+ lock.notifyAll();
+ } else if (n < 0) {
+ QPID_LOG(error, "Error on input. Engine returned code: " << n);
+ }
+}
+
+bool ConnectionContext::doOutput()
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ ssize_t n = pn_output(transport, outputBuffer.position(), outputBuffer.capacity());
+ if (n < 0) {
+ //engine error
+ } else {
+ outputBuffer.advance(n);
+ }
+ lock.notifyAll();
+ if (!outputBuffer.available() && !waitingToWrite) {
+ pn_selectable_flags(socket, PN_SEL_RD);
+ }
+ return outputBuffer.available();
+}
+
+time_t ConnectionContext::doTick(time_t now)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ waitingToWrite = false;
+ return pn_tick(transport, now);
+}
+*/
+
+boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (transactional) throw qpid::messaging::MessagingException("Transactions not yet supported");
+ std::string name = n.empty() ? qpid::framing::Uuid(true).str() : n;
+ SessionMap::const_iterator i = sessions.find(name);
+ if (i == sessions.end()) {
+ boost::shared_ptr<SessionContext> s(new SessionContext(connection));
+ s->session = pn_session(connection);
+ pn_session_open(s->session);
+ sessions[name] = s;
+ QPID_LOG(debug, "Session " << name << " begun");
+ wakeupDriver();
+ while (pn_session_state(s->session) & PN_REMOTE_UNINIT) wait();
+ return s;
+ } else {
+ throw qpid::messaging::KeyError(std::string("Session already exists: ") + name);
+ }
+
+}
+boost::shared_ptr<SessionContext> ConnectionContext::getSession(const std::string& name) const
+{
+ SessionMap::const_iterator i = sessions.find(name);
+ if (i == sessions.end()) {
+ throw qpid::messaging::KeyError(std::string("No such session") + name);
+ } else {
+ return i->second;
+ }
+}
+
+void ConnectionContext::setOption(const std::string& name, const qpid::types::Variant& value)
+{
+ options[name] = value;
+}
+
+std::string ConnectionContext::getAuthenticatedUsername()
+{
+ return std::string();//TODO
+}
+
+ConnectionContext::Buffer::Buffer(size_t s) : data(new char[s]), size(s), used(0) {}
+ConnectionContext::Buffer::~Buffer() { delete[](data); }
+char* ConnectionContext::Buffer::position()
+{
+ return data + used;
+}
+
+size_t ConnectionContext::Buffer::available()
+{
+ return used;
+}
+
+size_t ConnectionContext::Buffer::capacity()
+{
+ return size - used;
+}
+
+char* ConnectionContext::Buffer::start()
+{
+ return data;
+}
+
+void ConnectionContext::Buffer::advance(size_t bytes)
+{
+ used += bytes;
+}
+
+void ConnectionContext::Buffer::consume(size_t bytes)
+{
+ memmove(data, data + bytes, size - bytes);
+ used -= bytes;
+}
+
+
+}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
new file mode 100644
index 0000000000..c176b35fee
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -0,0 +1,137 @@
+#ifndef QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H
+#define QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <deque>
+#include <map>
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include "qpid/Url.h"
+#include "qpid/sys/AtomicValue.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/types/Variant.h"
+extern "C" {
+#include <proton/engine.h>
+#include <proton/driver.h>
+}
+
+namespace qpid {
+namespace messaging {
+class Duration;
+class Message;
+namespace amqp {
+
+class ReceiverContext;
+class SessionContext;
+class SenderContext;
+
+/**
+ *
+ */
+class ConnectionContext : qpid::sys::Runnable
+{
+ public:
+ ConnectionContext(const std::string& url, const qpid::types::Variant::Map& options);
+ ~ConnectionContext();
+ void open();
+ bool isOpen() const;
+ void close();
+ boost::shared_ptr<SessionContext> newSession(bool transactional, const std::string& name);
+ boost::shared_ptr<SessionContext> getSession(const std::string& name) const;
+ void endSession(boost::shared_ptr<SessionContext>);
+ void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
+ void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
+ void send(boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
+ bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
+
+ void setOption(const std::string& name, const qpid::types::Variant& value);
+ std::string getAuthenticatedUsername();
+
+ void setCapacity(boost::shared_ptr<SenderContext>, uint32_t);
+ uint32_t getCapacity(boost::shared_ptr<SenderContext>);
+ uint32_t getUnsettled(boost::shared_ptr<SenderContext>);
+
+ void setCapacity(boost::shared_ptr<ReceiverContext>, uint32_t);
+ uint32_t getCapacity(boost::shared_ptr<ReceiverContext>);
+ uint32_t getAvailable(boost::shared_ptr<ReceiverContext>);
+ uint32_t getUnsettled(boost::shared_ptr<ReceiverContext>);
+
+ /*
+ void doInput();
+ bool doOutput();
+ time_t doTick(time_t now);
+ bool read();
+ void write();
+ */
+ void readProtocolHeader();
+ void writeProtocolHeader();
+
+ private:
+ class Buffer
+ {
+ public:
+ Buffer(size_t size=65536);
+ ~Buffer();
+ char* position();
+ size_t available();
+ size_t capacity();
+ char* start();
+ void advance(size_t bytes);
+ void consume(size_t bytes);
+ private:
+ char* const data;
+ const size_t size;
+ size_t used;
+ };
+ typedef std::map<std::string, boost::shared_ptr<SessionContext> > SessionMap;
+ qpid::Url url;
+ pn_driver_t* driver;
+ pn_connector_t* socket;
+ pn_connection_t* connection;
+ qpid::types::Variant::Map options;
+ SessionMap sessions;
+ mutable qpid::sys::Monitor lock;
+ qpid::sys::Thread driverThread;
+ Buffer inputBuffer;
+ Buffer outputBuffer;
+ bool waitingToWrite;
+ qpid::sys::AtomicValue<bool> active;
+
+ void wait();
+ void wakeupDriver();
+ void run();
+ void attach(pn_session_t*, pn_link_t*, int credit=0);
+
+ /*
+ friend ssize_t intercept_input(pn_transport_t *transport, char *bytes, size_t available, void* context, pn_input_fn_t *next);
+ friend ssize_t intercept_output(pn_transport_t *transport, char *bytes, size_t size, void* context, pn_output_fn_t *next);
+ friend time_t intercept_tick(pn_transport_t *transport, time_t now, void* context, pn_tick_fn_t *next);
+ */
+};
+
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H*/
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp
new file mode 100644
index 0000000000..8fc3182c22
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionHandle.h"
+#include "ConnectionContext.h"
+#include "SessionHandle.h"
+#include "qpid/messaging/Session.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+ConnectionHandle::ConnectionHandle(const std::string& url, const qpid::types::Variant::Map& options) : connection(new ConnectionContext(url, options)) {}
+ConnectionHandle::ConnectionHandle(boost::shared_ptr<ConnectionContext> c) : connection(c) {}
+
+void ConnectionHandle::open()
+{
+ connection->open();
+}
+
+bool ConnectionHandle::isOpen() const
+{
+ return connection->isOpen();
+}
+
+void ConnectionHandle::close()
+{
+ connection->close();
+}
+
+Session ConnectionHandle::newSession(bool transactional, const std::string& name)
+{
+ return qpid::messaging::Session(new SessionHandle(connection, connection->newSession(transactional, name)));
+}
+
+Session ConnectionHandle::getSession(const std::string& name) const
+{
+ return qpid::messaging::Session(new SessionHandle(connection, connection->getSession(name)));
+}
+
+void ConnectionHandle::setOption(const std::string& name, const qpid::types::Variant& value)
+{
+ connection->setOption(name, value);
+}
+
+std::string ConnectionHandle::getAuthenticatedUsername()
+{
+ return connection->getAuthenticatedUsername();
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h
new file mode 100644
index 0000000000..d1eb27f6de
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h
@@ -0,0 +1,58 @@
+#ifndef QPID_MESSAGING_AMQP_CONNECTIONHANDLE_H
+#define QPID_MESSAGING_AMQP_CONNECTIONHANDLE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/shared_ptr.hpp>
+#include "qpid/messaging/ConnectionImpl.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+class ConnectionContext;
+/**
+ * Handles are directly referenced by applications; Contexts are
+ * referenced by Handles. This allows a graph structure that
+ * remains intact as long as the application references any part
+ * of it, but that can be automatically reclaimed if the whole
+ * graph becomes unreferenced.
+ */
+class ConnectionHandle : public qpid::messaging::ConnectionImpl
+{
+ public:
+ ConnectionHandle(const std::string& url, const qpid::types::Variant::Map& options);
+ ConnectionHandle(boost::shared_ptr<ConnectionContext>);
+ void open();
+ bool isOpen() const;
+ void close();
+ Session newSession(bool transactional, const std::string& name);
+ Session getSession(const std::string& name) const;
+ void setOption(const std::string& name, const qpid::types::Variant& value);
+ std::string getAuthenticatedUsername();
+ private:
+ boost::shared_ptr<ConnectionContext> connection;
+};
+
+}}} // namespace qpid::messaging::amqp_1.0
+
+#endif /*!QPID_MESSAGING_AMQP_CONNECTIONHANDLE_H*/
diff --git a/qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp b/qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp
new file mode 100644
index 0000000000..c61ae21e0e
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp
@@ -0,0 +1,406 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Decoder.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/MessageImpl.h"
+#include <string.h>
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+namespace {
+void on_null(void *ctx)
+{
+ reinterpret_cast<Decoder*>(ctx)->onNull();
+}
+void on_bool(void *ctx, bool v)
+{
+ reinterpret_cast<Decoder*>(ctx)->onBool(v);
+}
+void on_ubyte(void *ctx, uint8_t v)
+{
+ reinterpret_cast<Decoder*>(ctx)->onUbyte(v);
+}
+void on_byte(void *ctx, int8_t v)
+{
+ reinterpret_cast<Decoder*>(ctx)->onByte(v);
+}
+void on_ushort(void *ctx, uint16_t v)
+{
+ reinterpret_cast<Decoder*>(ctx)->onUshort(v);
+}
+void on_short(void *ctx, int16_t v)
+{
+ reinterpret_cast<Decoder*>(ctx)->onShort(v);
+}
+void on_uint(void *ctx, uint32_t v)
+{
+ reinterpret_cast<Decoder*>(ctx)->onUint(v);
+}
+void on_int(void *ctx, int32_t v)
+{
+ reinterpret_cast<Decoder*>(ctx)->onInt(v);
+}
+void on_float(void *ctx, float v)
+{
+ reinterpret_cast<Decoder*>(ctx)->onFloat(v);
+}
+void on_ulong(void *ctx, uint64_t v)
+{
+ reinterpret_cast<Decoder*>(ctx)->onUlong(v);
+}
+void on_long(void *ctx, int64_t v)
+{
+ reinterpret_cast<Decoder*>(ctx)->onLong(v);
+}
+void on_double(void *ctx, double v)
+{
+ reinterpret_cast<Decoder*>(ctx)->onDouble(v);
+}
+void on_binary(void *ctx, size_t size, char *bytes)
+{
+ reinterpret_cast<Decoder*>(ctx)->onBinary(size, bytes);
+}
+void on_utf8(void *ctx, size_t size, char *utf8)
+{
+ reinterpret_cast<Decoder*>(ctx)->onUtf8(size, utf8);
+}
+void on_symbol(void *ctx, size_t size, char *str)
+{
+ reinterpret_cast<Decoder*>(ctx)->onSymbol(size, str);
+}
+void start_descriptor(void *ctx)
+{
+ reinterpret_cast<Decoder*>(ctx)->startDescriptor();
+}
+void stop_descriptor(void *ctx)
+{
+ reinterpret_cast<Decoder*>(ctx)->stopDescriptor();
+}
+void start_array(void *ctx, size_t count, uint8_t code)
+{
+ reinterpret_cast<Decoder*>(ctx)->startArray(count, code);
+}
+void stop_array(void *ctx, size_t count, uint8_t code)
+{
+ reinterpret_cast<Decoder*>(ctx)->stopArray(count, code);
+}
+void start_list(void *ctx, size_t count)
+{
+ reinterpret_cast<Decoder*>(ctx)->startList(count);
+}
+void stop_list(void *ctx, size_t count)
+{
+ reinterpret_cast<Decoder*>(ctx)->stopList(count);
+}
+void start_map(void *ctx, size_t count)
+{
+ reinterpret_cast<Decoder*>(ctx)->startMap(count);
+}
+void stop_map(void *ctx, size_t count)
+{
+ reinterpret_cast<Decoder*>(ctx)->stopMap(count);
+}
+}
+
+Decoder::~Decoder() {}
+
+ssize_t Decoder::decode(const char* data, size_t size)
+{
+ pn_data_callbacks_t callbacks;
+ callbacks.on_null = &on_null;
+ callbacks.on_bool = &on_bool;
+ callbacks.on_ubyte = &on_ubyte;
+ callbacks.on_byte = &on_byte;
+ callbacks.on_ushort = &on_ushort;
+ callbacks.on_short = &on_short;
+ callbacks.on_uint = &on_uint;
+ callbacks.on_int = &on_int;
+ callbacks.on_float = &on_float;
+ callbacks.on_ulong = &on_ulong;
+ callbacks.on_long = &on_long;
+ callbacks.on_double = &on_double;
+ callbacks.on_binary = &on_binary;
+ callbacks.on_utf8 = &on_utf8;
+ callbacks.on_symbol = &on_symbol;
+ callbacks.start_descriptor = &start_descriptor;
+ callbacks.stop_descriptor = &stop_descriptor;
+ callbacks.start_array = &start_array;
+ callbacks.stop_array = &stop_array;
+ callbacks.start_list = &start_list;
+ callbacks.stop_list = &stop_list;
+ callbacks.start_map = &start_map;
+ callbacks.stop_map = &stop_map;
+ size_t total = 0;
+ while (total < size) {
+ ssize_t result = pn_read_datum(data + total, size - total, &callbacks, this);
+ if (result < 0) return result;
+ else total += result;
+ }
+ return total;
+}
+
+void DecoderBase::onNull() { QPID_LOG(debug, this << " onNull()"); }
+void DecoderBase::onBool(bool v) { QPID_LOG(debug, this << " onBool(" << v << ")"); }
+void DecoderBase::onUbyte(uint8_t v) { QPID_LOG(debug, this << " onUbyte(" << v << ")"); }
+void DecoderBase::onByte(int8_t v) { QPID_LOG(debug, this << " onByte(" << v << ")"); }
+void DecoderBase::onUshort(uint16_t v) { QPID_LOG(debug, this << " onUshort(" << v << ")"); }
+void DecoderBase::onShort(int16_t v) { QPID_LOG(debug, this << " onShort(" << v << ")"); }
+void DecoderBase::onUint(uint32_t v) { QPID_LOG(debug, this << " onUint(" << v << ")"); }
+void DecoderBase::onInt(int32_t v) { QPID_LOG(debug, this << " onInt(" << v << ")"); }
+void DecoderBase::onFloat(float v) { QPID_LOG(debug, this << " onFloat(" << v << ")"); }
+void DecoderBase::onUlong(uint64_t v) { QPID_LOG(debug, this << " onUlong(" << v << ")"); }
+void DecoderBase::onLong(int64_t v) { QPID_LOG(debug, this << " onLong(" << v << ")"); }
+void DecoderBase::onDouble(double v) { QPID_LOG(debug, this << " onDouble(" << v << ")"); }
+void DecoderBase::onBinary(size_t size, char*) { QPID_LOG(debug, this << " onBinary(" << size << ")"); }
+void DecoderBase::onUtf8(size_t size, char*) { QPID_LOG(debug, this << " onUtf8(" << size << ")"); }
+void DecoderBase::onSymbol(size_t size, char*) { QPID_LOG(debug, this << " onSymbol(" << size << ")"); }
+void DecoderBase::startDescriptor() { inDescriptor = true; QPID_LOG(debug, this << " startDescriptor()"); }
+void DecoderBase::stopDescriptor() { inDescriptor = false; QPID_LOG(debug, this << " stopDescriptor()"); }
+void DecoderBase::startArray(size_t count, uint8_t code) { QPID_LOG(debug, this << " startArray(" << count << ", " << code << ")"); }
+void DecoderBase::stopArray(size_t count, uint8_t code) { QPID_LOG(debug, this << " stopArray(" << count << ", " << code << ")"); }
+void DecoderBase::startList(size_t count) { QPID_LOG(debug, this << " startList(" << count << ")"); }
+void DecoderBase::stopList(size_t count) { QPID_LOG(debug, this << " stopList(" << count << ")"); }
+void DecoderBase::startMap(size_t count) { QPID_LOG(debug, this << " startMap(" << count << ")"); }
+void DecoderBase::stopMap(size_t count) { QPID_LOG(debug, this << " stopMap(" << count << ")"); }
+DecoderBase::~DecoderBase() {}
+
+void ContextSensitiveDecoder::onNull()
+{
+ if ((decoder = getDecoder())) decoder->onNull();
+ else QPID_LOG(debug, this << " onNull() not handled");
+ onValue();
+}
+void ContextSensitiveDecoder::onBool(bool v)
+{
+ if ((decoder = getDecoder())) decoder->onBool(v);
+ else QPID_LOG(debug, this << " onBool(" << v << ") not handled");
+ onValue();
+}
+void ContextSensitiveDecoder::onUbyte(uint8_t v)
+{
+ if ((decoder = getDecoder())) decoder->onUbyte(v);
+ else QPID_LOG(debug, this << " onUbyte(" << v << ") not handled");
+ onValue();
+}
+
+void ContextSensitiveDecoder::onByte(int8_t v)
+{
+ if ((decoder = getDecoder())) decoder->onByte(v);
+ else QPID_LOG(debug, this << " onByte(" << v << ") not handled");
+ onValue();
+}
+
+void ContextSensitiveDecoder::onUshort(uint16_t v)
+{
+ if ((decoder = getDecoder())) decoder->onUshort(v);
+ else QPID_LOG(debug, this << " onUshort(" << v << ") not handled");
+ onValue();
+}
+
+void ContextSensitiveDecoder::onShort(int16_t v)
+{
+ if ((decoder = getDecoder())) decoder->onShort(v);
+ else QPID_LOG(debug, this << " onShort(" << v << ") not handled");
+ onValue();
+}
+
+void ContextSensitiveDecoder::onUint(uint32_t v)
+{
+ if ((decoder = getDecoder())) decoder->onUint(v);
+ else QPID_LOG(debug, this << " onUint(" << v << ") not handled");
+ onValue();
+}
+
+void ContextSensitiveDecoder::onInt(int32_t v)
+{
+ if ((decoder = getDecoder())) decoder->onInt(v);
+ else QPID_LOG(debug, this << " onInt(" << v << ") not handled");
+ onValue();
+}
+
+void ContextSensitiveDecoder::onFloat(float v)
+{
+ if ((decoder = getDecoder())) decoder->onFloat(v);
+ else QPID_LOG(debug, this << " onFloat(" << v << ") not handled");
+ onValue();
+}
+
+void ContextSensitiveDecoder::onUlong(uint64_t v)
+{
+ if ((decoder = getDecoder())) decoder->onUlong(v);
+ else QPID_LOG(debug, this << " onUlong(" << v << ") not handled");
+ onValue();
+}
+
+void ContextSensitiveDecoder::onLong(int64_t v)
+{
+ if ((decoder = getDecoder())) decoder->onLong(v);
+ else QPID_LOG(debug, this << " onLong(" << v << ") not handled");
+ onValue();
+}
+
+void ContextSensitiveDecoder::onDouble(double v)
+{
+ if ((decoder = getDecoder())) decoder->onDouble(v);
+ else QPID_LOG(debug, this << " onDouble(" << v << ") not handled");
+ onValue();
+}
+
+void ContextSensitiveDecoder::onBinary(size_t size, char* bytes)
+{
+ if ((decoder = getDecoder())) decoder->onBinary(size, bytes);
+ else QPID_LOG(debug, this << " onBinary(" << size << ") not handled");
+ onValue();
+}
+
+void ContextSensitiveDecoder::onUtf8(size_t size, char* utf8)
+{
+ if ((decoder = getDecoder())) decoder->onUtf8(size, utf8);
+ else QPID_LOG(debug, this << " onUtf8(" << size << ") not handled");
+ onValue();
+}
+
+void ContextSensitiveDecoder::onSymbol(size_t size, char* symbol)
+{
+ if ((decoder = getDecoder())) decoder->onSymbol(size, symbol);
+ else QPID_LOG(debug, this << " onSymbol(" << size << ") not handled");
+ onValue();
+}
+
+void ContextSensitiveDecoder::startDescriptor()
+{
+ if ((decoder = getDecoder())) decoder->startDescriptor();
+ else QPID_LOG(debug, this << " startDescriptor() not handled");
+}
+
+void ContextSensitiveDecoder::stopDescriptor()
+{
+ ++descriptorLevel;
+ if ((decoder = getDecoder())) decoder->stopDescriptor();
+ else QPID_LOG(debug, this << " stopDescriptor() not handled");
+}
+
+void ContextSensitiveDecoder::startArray(size_t count, uint8_t code)
+{
+ ++valueLevel;
+ if ((decoder = getDecoder())) decoder->startArray(count, code);
+ else QPID_LOG(debug, this << " startArray(" << count << ", " << code << ") not handled");
+}
+
+void ContextSensitiveDecoder::stopArray(size_t count, uint8_t code)
+{
+ stopNested();
+ if ((decoder = getDecoder())) decoder->stopArray(count, code);
+ else QPID_LOG(debug, this << " stopArray(" << count << ", " << code << ") not handled");
+}
+
+void ContextSensitiveDecoder::startList(size_t count)
+{
+ startNested();
+ if ((decoder = getDecoder())) decoder->startList(count);
+ else QPID_LOG(debug, this << " startList(" << count << ") not handled");
+}
+
+void ContextSensitiveDecoder::stopList(size_t count)
+{
+ stopNested();
+ if ((decoder = getDecoder())) decoder->stopList(count);
+ else QPID_LOG(debug, this << " stopList(" << count << ") not handled");
+}
+
+void ContextSensitiveDecoder::startMap(size_t count)
+{
+ startNested();
+ if ((decoder = getDecoder())) decoder->startMap(count);
+ else QPID_LOG(debug, this << " startMap(" << count << ") not handled");
+}
+
+void ContextSensitiveDecoder::stopMap(size_t count)
+{
+ stopNested();
+ if ((decoder = getDecoder())) decoder->stopMap(count);
+ else QPID_LOG(debug, this << " stopMap(" << count << ") not handled");
+}
+
+void ContextSensitiveDecoder::onValue()
+{
+ if (valueLevel == descriptorLevel) {
+ described = false;
+ }
+}
+
+void ContextSensitiveDecoder::startNested()
+{
+ ++valueLevel;
+}
+
+void ContextSensitiveDecoder::stopNested()
+{
+ if (--valueLevel == descriptorLevel) {
+ --descriptorLevel;
+ described = false;
+ }
+}
+
+ContextSensitiveDecoder::ContextSensitiveDecoder() : described(false), descriptorLevel(-1), valueLevel(0), decoder(0) {}
+void ContextSensitiveDecoder::setDecoder(DecoderBase* d) { decoder = d; }
+DecoderBase* ContextSensitiveDecoder::getDecoder() { return decoder; }
+ContextSensitiveDecoder::~ContextSensitiveDecoder() {}
+
+DescribedValueDecoder::DescribedValueDecoder(uint64_t c, const std::string& s) : descriptorCode(c), descriptorSymbol(s),
+ descriptorDecoder(*this), matched(false) {}
+DescribedValueDecoder::~DescribedValueDecoder() {}
+DecoderBase* DescribedValueDecoder::getDecoder()
+{
+ if (inDescriptor) return &descriptorDecoder;
+ else if (described && matched) return this;//when does matched get switched off?
+ else return 0;
+}
+
+DescribedValueDecoder::DescriptorDecoder::DescriptorDecoder(DescribedValueDecoder& p) : parent(p) {}
+
+void DescribedValueDecoder::DescriptorDecoder::onSymbol(size_t size, char *str)
+{
+ parent.matched = (::strncmp(str, parent.descriptorSymbol.c_str(), size) == 0);
+ QPID_LOG(debug, &parent << ":" << this << ":DescriptorDecoder: onSymbol("
+ << std::string(str, size) << " -> " << (parent.matched ? "matched" : "did not match") << ")");
+}
+void DescribedValueDecoder::DescriptorDecoder::onUlong(uint64_t v)
+{
+ parent.matched = (v == parent.descriptorCode);
+ QPID_LOG(debug, &parent << ":" << this << ":DescriptorDecoder: onUlong(" << v << " -> " << (parent.matched ? "matched" : "did not match") << ")");
+}
+
+MessageDataDecoder::MessageDataDecoder(qpid::messaging::MessageImpl& m)
+ : DescribedValueDecoder(0x00000075, "amqp:data:binary"), msg(m)
+{
+ msg.bytes.clear();
+}
+
+void MessageDataDecoder::onBinary(size_t size, char *bytes)
+{
+ QPID_LOG(debug, this << ":MessageDataDecoder: onBinary() with " << size << " bytes");
+ msg.appendBytes(bytes, size);
+}
+
+
+}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/Decoder.h b/qpid/cpp/src/qpid/messaging/amqp/Decoder.h
new file mode 100644
index 0000000000..b3c883a8d3
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/Decoder.h
@@ -0,0 +1,153 @@
+#ifndef QPID_MESSAGING_AMQP_DECODER_H
+#define QPID_MESSAGING_AMQP_DECODER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/IntegerTypes.h"
+#include <string>
+extern "C" {
+#include "proton/codec.h"
+}
+
+namespace qpid {
+namespace messaging {
+struct MessageImpl;
+namespace amqp {
+
+class DecoderBase
+{
+ public:
+ virtual ~DecoderBase();
+ virtual void onNull();
+ virtual void onBool(bool v);
+ virtual void onUbyte(uint8_t v);
+ virtual void onByte(int8_t v);
+ virtual void onUshort(uint16_t v);
+ virtual void onShort(int16_t v);
+ virtual void onUint(uint32_t v);
+ virtual void onInt(int32_t v);
+ virtual void onFloat(float v);
+ virtual void onUlong(uint64_t v);
+ virtual void onLong(int64_t v);
+ virtual void onDouble(double v);
+ virtual void onBinary(size_t size, char *bytes);
+ virtual void onUtf8(size_t size, char *utf8);
+ virtual void onSymbol(size_t size, char *str);
+ virtual void startDescriptor();
+ virtual void stopDescriptor();
+ virtual void startArray(size_t count, uint8_t code);
+ virtual void stopArray(size_t count, uint8_t code);
+ virtual void startList(size_t count);
+ virtual void stopList(size_t count);
+ virtual void startMap(size_t count);
+ virtual void stopMap(size_t count);
+ protected:
+ bool inDescriptor;
+};
+
+/**
+ *
+ */
+class Decoder : public DecoderBase
+{
+ public:
+ virtual ~Decoder();
+ ssize_t decode(const char*, size_t);
+};
+
+class ContextSensitiveDecoder : public Decoder
+{
+ public:
+ ContextSensitiveDecoder();
+ virtual ~ContextSensitiveDecoder();
+ protected:
+ bool described;
+ ssize_t descriptorLevel;
+ ssize_t valueLevel;
+
+ void setDecoder(DecoderBase*);
+ virtual DecoderBase* getDecoder();
+ virtual void startNested();
+ virtual void stopNested();
+ virtual void onValue();
+ private:
+ DecoderBase* decoder;
+ void onNull();
+ void onBool(bool v);
+ void onUbyte(uint8_t v);
+ void onByte(int8_t v);
+ void onUshort(uint16_t v);
+ void onShort(int16_t v);
+ void onUint(uint32_t v);
+ void onInt(int32_t v);
+ void onFloat(float v);
+ void onUlong(uint64_t v);
+ void onLong(int64_t v);
+ void onDouble(double v);
+ void onBinary(size_t size, char *bytes);
+ void onUtf8(size_t size, char *utf8);
+ void onSymbol(size_t size, char *str);
+ void startDescriptor();
+ void stopDescriptor();
+ void startArray(size_t count, uint8_t code);
+ void stopArray(size_t count, uint8_t code);
+ void startList(size_t count);
+ void stopList(size_t count);
+ void startMap(size_t count);
+ void stopMap(size_t count);
+};
+
+class DescribedValueDecoder : public ContextSensitiveDecoder
+{
+ public:
+ DescribedValueDecoder(uint64_t descriptorCode, const std::string& descriptorSymbol);
+ virtual ~DescribedValueDecoder();
+ protected:
+ virtual DecoderBase* getDecoder();
+ private:
+ class DescriptorDecoder : public DecoderBase
+ {
+ public:
+ DescriptorDecoder(DescribedValueDecoder&);
+ void onSymbol(size_t size, char *str);
+ void onUlong(uint64_t v);
+ private:
+ DescribedValueDecoder& parent;
+ };
+
+ uint64_t descriptorCode;
+ std::string descriptorSymbol;
+ DescriptorDecoder descriptorDecoder;
+ bool matched;
+};
+
+class MessageDataDecoder : public DescribedValueDecoder
+{
+ public:
+ MessageDataDecoder(qpid::messaging::MessageImpl& msg);
+ private:
+ qpid::messaging::MessageImpl& msg;
+ void onBinary(size_t size, char *bytes);
+};
+
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_DECODER_H*/
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
new file mode 100644
index 0000000000..b9972dc112
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ReceiverContext.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/Message.h"
+extern "C" {
+#include "proton/engine.h"
+}
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+//TODO: proper conversion to wide string for address
+ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const std::string& s)
+ : name(n),
+ source(s.begin(), s.end()),
+ receiver(pn_receiver(session, source.c_str())),
+ capacity(0) {}
+ReceiverContext::~ReceiverContext()
+{
+ pn_link_destroy(receiver);
+}
+
+void ReceiverContext::setCapacity(uint32_t c)
+{
+ if (c != capacity) {
+ //stop
+ capacity = c;
+ //reissue credit
+ }
+}
+
+uint32_t ReceiverContext::getCapacity()
+{
+ return capacity;
+}
+
+uint32_t ReceiverContext::getAvailable()
+{
+ uint32_t count(0);
+ for (pn_delivery_t* d = pn_unsettled_head(receiver); d; d = pn_unsettled_next(d)) {
+ ++count;
+ if (d == pn_current(receiver)) break;
+ }
+ return count;
+}
+
+uint32_t ReceiverContext::getUnsettled()
+{
+ uint32_t count(0);
+ for (pn_delivery_t* d = pn_unsettled_head(receiver); d; d = pn_unsettled_next(d)) {
+ ++count;
+ }
+ return count;
+}
+
+void ReceiverContext::close()
+{
+
+}
+
+const std::string& ReceiverContext::getName() const
+{
+ return name;
+}
+
+const std::wstring& ReceiverContext::getSource() const
+{
+ return source;
+}
+
+bool ReceiverContext::isClosed() const
+{
+ return false;//TODO
+}
+
+
+
+}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
new file mode 100644
index 0000000000..f83de8e151
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
@@ -0,0 +1,63 @@
+#ifndef QPID_MESSAGING_AMQP_RECEIVERCONTEXT_H
+#define QPID_MESSAGING_AMQP_RECEIVERCONTEXT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/sys/IntegerTypes.h"
+
+struct pn_link_t;
+struct pn_session_t;
+
+namespace qpid {
+namespace messaging {
+
+class Duration;
+class Message;
+
+namespace amqp {
+
+/**
+ *
+ */
+class ReceiverContext
+{
+ public:
+ ReceiverContext(pn_session_t* session, const std::string& name, const std::string& source);
+ ~ReceiverContext();
+ void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t getAvailable();
+ uint32_t getUnsettled();
+ void close();
+ const std::string& getName() const;
+ const std::wstring& getSource() const;
+ bool isClosed() const;
+ private:
+ friend class ConnectionContext;
+ const std::string name;
+ const std::wstring source;
+ pn_link_t* receiver;
+ uint32_t capacity;
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_RECEIVERCONTEXT_H*/
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp
new file mode 100644
index 0000000000..9bf64ebb8d
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ReceiverHandle.h"
+#include "ConnectionContext.h"
+#include "SessionContext.h"
+#include "SessionHandle.h"
+#include "ReceiverContext.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Session.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+ReceiverHandle::ReceiverHandle(boost::shared_ptr<ConnectionContext> c,
+ boost::shared_ptr<SessionContext> s,
+ boost::shared_ptr<ReceiverContext> r
+) : connection(c), session(s), receiver(r) {}
+
+
+bool ReceiverHandle::get(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
+{
+ return connection->get(session, receiver, message, timeout);
+}
+
+qpid::messaging::Message ReceiverHandle::get(qpid::messaging::Duration timeout)
+{
+ qpid::messaging::Message result;
+ if (!get(result, timeout)) throw qpid::messaging::NoMessageAvailable();
+ return result;
+}
+
+bool ReceiverHandle::fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
+{
+ return connection->fetch(session, receiver, message, timeout);
+}
+
+qpid::messaging::Message ReceiverHandle::fetch(qpid::messaging::Duration timeout)
+{
+ qpid::messaging::Message result;
+ if (!fetch(result, timeout)) throw qpid::messaging::NoMessageAvailable();
+ return result;
+}
+
+void ReceiverHandle::setCapacity(uint32_t capacity)
+{
+ connection->setCapacity(receiver, capacity);
+}
+
+uint32_t ReceiverHandle::getCapacity()
+{
+ return connection->getCapacity(receiver);
+}
+
+uint32_t ReceiverHandle::getAvailable()
+{
+ return connection->getAvailable(receiver);
+}
+
+uint32_t ReceiverHandle::getUnsettled()
+{
+ return connection->getUnsettled(receiver);
+}
+
+void ReceiverHandle::close()
+{
+ session->closeReceiver(getName());
+}
+
+const std::string& ReceiverHandle::getName() const
+{
+ return receiver->getName();
+}
+
+qpid::messaging::Session ReceiverHandle::getSession() const
+{
+ //create new SessionHandle instance; i.e. create new handle that shares the same context
+ return qpid::messaging::Session(new SessionHandle(connection, session));
+}
+
+bool ReceiverHandle::isClosed() const
+{
+ return receiver->isClosed();
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h
new file mode 100644
index 0000000000..a1a6f26025
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h
@@ -0,0 +1,63 @@
+#ifndef QPID_MESSAGING_AMQP_RECEIVERHANDLE_H
+#define QPID_MESSAGING_AMQP_RECEIVERHANDLE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/shared_ptr.hpp>
+#include "qpid/messaging/ReceiverImpl.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+class ConnectionContext;
+class SessionContext;
+class ReceiverContext;
+/**
+ *
+ */
+class ReceiverHandle : public qpid::messaging::ReceiverImpl
+{
+ public:
+ ReceiverHandle(boost::shared_ptr<ConnectionContext>,
+ boost::shared_ptr<SessionContext>,
+ boost::shared_ptr<ReceiverContext>
+ );
+ bool get(Message& message, qpid::messaging::Duration timeout);
+ qpid::messaging::Message get(qpid::messaging::Duration timeout);
+ bool fetch(Message& message, qpid::messaging::Duration timeout);
+ qpid::messaging::Message fetch(qpid::messaging::Duration timeout);
+ void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t getAvailable();
+ uint32_t getUnsettled();
+ void close();
+ const std::string& getName() const;
+ qpid::messaging::Session getSession() const;
+ bool isClosed() const;
+ private:
+ boost::shared_ptr<ConnectionContext> connection;
+ boost::shared_ptr<SessionContext> session;
+ boost::shared_ptr<ReceiverContext> receiver;
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_RECEIVERHANDLE_H*/
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
new file mode 100644
index 0000000000..521538952d
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SenderContext.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/log/Statement.h"
+extern "C" {
+#include "proton/engine.h"
+#include "proton/message.h"
+}
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+//TODO: proper conversion to wide string for address
+SenderContext::SenderContext(pn_session_t* session, const std::string& n, const std::string& t)
+ : name(n),
+ target(t.begin(), t.end()),
+ sender(pn_sender(session, target.c_str())), capacity(1000) {}
+
+SenderContext::~SenderContext()
+{
+ pn_link_destroy(sender);
+}
+
+void SenderContext::close()
+{
+
+}
+
+void SenderContext::setCapacity(uint32_t c)
+{
+ if (c < deliveries.size()) throw qpid::messaging::SenderError("Desired capacity is less than unsettled message count!");
+ capacity = c;
+}
+
+uint32_t SenderContext::getCapacity()
+{
+ return capacity;
+}
+
+uint32_t SenderContext::getUnsettled()
+{
+ return processUnsettled();
+}
+
+const std::string& SenderContext::getName() const
+{
+ return name;
+}
+
+const std::wstring& SenderContext::getTarget() const
+{
+ return target;
+}
+
+SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message)
+{
+ if (processUnsettled() < capacity) {
+ deliveries.push_back(Delivery(nextId++, message));
+ Delivery& delivery = deliveries.back();
+ delivery.send(sender);
+ return &delivery;
+ } else {
+ return 0;
+ }
+}
+
+uint32_t SenderContext::processUnsettled()
+{
+ //remove accepted messages from front of deque
+ while (!deliveries.empty() && deliveries.front().accepted()) {
+ deliveries.pop_front();
+ }
+ return deliveries.size();
+}
+
+SenderContext::Delivery::Delivery(int32_t i, const qpid::messaging::Message& msg) :
+ id(i), token(0)
+{
+ data.resize(msg.getContentSize() + 15);
+ //TODO: full message encoding including headers etc
+ pn_message_data(&data[0], data.size(), msg.getContentPtr(), msg.getContentSize());
+}
+void SenderContext::Delivery::send(pn_link_t* sender)
+{
+ pn_delivery_tag_t tag;
+ tag.size = sizeof(int32_t);
+ tag.bytes = reinterpret_cast<char*>(&id);
+ token = pn_delivery(sender, tag);
+ pn_send(sender, &data[0], data.size());
+}
+bool SenderContext::Delivery::accepted()
+{
+ return pn_remote_disp(token) == PN_ACCEPTED;
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
new file mode 100644
index 0000000000..7abdee311d
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
@@ -0,0 +1,82 @@
+#ifndef QPID_MESSAGING_AMQP_SENDERCONTEXT_H
+#define QPID_MESSAGING_AMQP_SENDERCONTEXT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <deque>
+#include <string>
+#include <vector>
+#include "qpid/sys/IntegerTypes.h"
+
+struct pn_delivery_t;
+struct pn_link_t;
+struct pn_session_t;
+
+namespace qpid {
+namespace messaging {
+
+class Message;
+
+namespace amqp {
+
+/**
+ *
+ */
+class SenderContext
+{
+ public:
+ class Delivery
+ {
+ public:
+ Delivery(int32_t id, const qpid::messaging::Message& message);
+ void send(pn_link_t*);
+ bool accepted();
+ private:
+ int32_t id;
+ std::vector<char> data;
+ pn_delivery_t* token;
+ };
+
+ SenderContext(pn_session_t* session, const std::string& name, const std::string& target);
+ ~SenderContext();
+ void close();
+ void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t getUnsettled();
+ const std::string& getName() const;
+ const std::wstring& getTarget() const;
+ Delivery* send(const qpid::messaging::Message& message);
+ private:
+ friend class ConnectionContext;
+ typedef std::deque<Delivery> Deliveries;
+
+ const std::string name;
+ const std::wstring target;
+ pn_link_t* sender;
+ int32_t nextId;
+ Deliveries deliveries;
+ uint32_t capacity;
+
+ uint32_t processUnsettled();
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_SENDERCONTEXT_H*/
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
new file mode 100644
index 0000000000..b7168e5b31
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SenderHandle.h"
+#include "ConnectionContext.h"
+#include "SessionContext.h"
+#include "SessionHandle.h"
+#include "SenderContext.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Session.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+SenderHandle::SenderHandle(boost::shared_ptr<ConnectionContext> c,
+ boost::shared_ptr<SessionContext> s,
+ boost::shared_ptr<SenderContext> sndr
+) : connection(c), session(s), sender(sndr) {}
+
+void SenderHandle::send(const Message& message, bool sync)
+{
+ connection->send(sender, message, sync);
+}
+
+void SenderHandle::close()
+{
+ session->closeSender(getName());
+}
+
+void SenderHandle::setCapacity(uint32_t capacity)
+{
+ connection->setCapacity(sender, capacity);
+}
+
+uint32_t SenderHandle::getCapacity()
+{
+ return connection->getCapacity(sender);
+}
+
+uint32_t SenderHandle::getUnsettled()
+{
+ return connection->getUnsettled(sender);
+}
+
+const std::string& SenderHandle::getName() const
+{
+ return sender->getName();
+}
+
+qpid::messaging::Session SenderHandle::getSession() const
+{
+ return qpid::messaging::Session(new SessionHandle(connection, session));
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h
new file mode 100644
index 0000000000..3c6b666582
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h
@@ -0,0 +1,58 @@
+#ifndef QPID_MESSAGING_AMQP_SENDERHANDLE_H
+#define QPID_MESSAGING_AMQP_SENDERHANDLE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/shared_ptr.hpp>
+#include "qpid/messaging/SenderImpl.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+class ConnectionContext;
+class SessionContext;
+class SenderContext;
+/**
+ *
+ */
+class SenderHandle : public qpid::messaging::SenderImpl
+{
+ public:
+ SenderHandle(boost::shared_ptr<ConnectionContext> connection,
+ boost::shared_ptr<SessionContext> session,
+ boost::shared_ptr<SenderContext> sender
+ );
+ void send(const Message& message, bool sync);
+ void close();
+ void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t getUnsettled();
+ const std::string& getName() const;
+ Session getSession() const;
+ private:
+ boost::shared_ptr<ConnectionContext> connection;
+ boost::shared_ptr<SessionContext> session;
+ boost::shared_ptr<SenderContext> sender;
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_SENDERHANDLE_H*/
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
new file mode 100644
index 0000000000..f7bd9a5e7a
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionContext.h"
+#include "SenderContext.h"
+#include "ReceiverContext.h"
+#include <boost/format.hpp>
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/log/Statement.h"
+extern "C" {
+#include "proton/engine.h"
+}
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+SessionContext::SessionContext(pn_connection_t* connection) : session(pn_session(connection)) {}
+SessionContext::~SessionContext()
+{
+ senders.clear(); receivers.clear();
+ pn_session_destroy(session);
+}
+
+boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address)
+{
+ std::string name = address.getName();
+
+ int count = 1;
+ for (SenderMap::const_iterator i = senders.find(name); i != senders.end(); i = senders.find(name)) {
+ name = (boost::format("%1%_%2%") % address.getName() % ++count).str();
+ }
+ boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address.str()));
+ senders[name] = s;
+ return s;
+}
+
+boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::messaging::Address& address)
+{
+ std::string name = address.getName();
+
+ int count = 1;
+ for (ReceiverMap::const_iterator i = receivers.find(name); i != receivers.end(); i = receivers.find(name)) {
+ name = (boost::format("%1%_%2%") % address.getName() % ++count).str();
+ }
+ boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address.str()));
+ receivers[name] = r;
+ return r;
+}
+
+boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& name) const
+{
+ SenderMap::const_iterator i = senders.find(name);
+ if (i == senders.end()) {
+ throw qpid::messaging::KeyError(std::string("No such sender") + name);
+ } else {
+ return i->second;
+ }
+}
+
+boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string& name) const
+{
+ ReceiverMap::const_iterator i = receivers.find(name);
+ if (i == receivers.end()) {
+ throw qpid::messaging::KeyError(std::string("No such receiver") + name);
+ } else {
+ return i->second;
+ }
+}
+
+void SessionContext::closeReceiver(const std::string&)
+{
+
+}
+
+void SessionContext::closeSender(const std::string&)
+{
+
+}
+
+boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver(qpid::messaging::Duration /*timeout*/)
+{
+ return boost::shared_ptr<ReceiverContext>();
+}
+
+uint32_t SessionContext::getReceivable()
+{
+ return 0;//TODO
+}
+
+uint32_t SessionContext::getUnsettledAcks()
+{
+ return 0;//TODO
+}
+
+qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery)
+{
+ qpid::framing::SequenceNumber id = next++;
+ unacked[id] = delivery;
+ QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery);
+ return id;
+}
+
+void SessionContext::acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end)
+{
+ for (DeliveryMap::iterator i = begin; i != end; ++i) {
+ QPID_LOG(debug, "Setting disposition for delivery " << i->first << " -> " << i->second);
+ pn_disposition(i->second, PN_ACCEPTED);
+ pn_settle(i->second);//TODO: different settlement modes?
+ }
+ unacked.erase(begin, end);
+}
+
+void SessionContext::acknowledge()
+{
+ acknowledge(unacked.begin(), unacked.end());
+}
+
+void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool cumulative)
+{
+ DeliveryMap::iterator i = unacked.find(id);
+ if (i != unacked.end()) {
+ acknowledge(cumulative ? unacked.begin() : i, ++i);
+ }
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
new file mode 100644
index 0000000000..fbc8731230
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
@@ -0,0 +1,80 @@
+#ifndef QPID_MESSAGING_AMQP_SESSIONCONTEXT_H
+#define QPID_MESSAGING_AMQP_SESSIONCONTEXT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <map>
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/framing/SequenceNumber.h"
+
+struct pn_connection_t;
+struct pn_session_t;
+struct pn_delivery_t;
+
+namespace qpid {
+namespace messaging {
+
+class Address;
+class Duration;
+
+namespace amqp {
+
+class ConnectionContext;
+class SenderContext;
+class ReceiverContext;
+/**
+ *
+ */
+class SessionContext
+{
+ public:
+ SessionContext(pn_connection_t*);
+ ~SessionContext();
+ boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address);
+ boost::shared_ptr<ReceiverContext> createReceiver(const qpid::messaging::Address& address);
+ boost::shared_ptr<SenderContext> getSender(const std::string& name) const;
+ boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const;
+ void closeReceiver(const std::string&);
+ void closeSender(const std::string&);
+ boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout);
+ uint32_t getReceivable();
+ uint32_t getUnsettledAcks();
+ private:
+ friend class ConnectionContext;
+ typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap;
+ typedef std::map<std::string, boost::shared_ptr<ReceiverContext> > ReceiverMap;
+ typedef std::map<qpid::framing::SequenceNumber, pn_delivery_t*> DeliveryMap;
+ pn_session_t* session;
+ SenderMap senders;
+ ReceiverMap receivers;
+ DeliveryMap unacked;
+ qpid::framing::SequenceNumber next;
+
+ qpid::framing::SequenceNumber record(pn_delivery_t*);
+ void acknowledge();
+ void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative);
+ void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end);
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_SESSIONCONTEXT_H*/
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
new file mode 100644
index 0000000000..bf79771ca4
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
@@ -0,0 +1,148 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionHandle.h"
+#include "ConnectionContext.h"
+#include "ConnectionHandle.h"
+#include "ReceiverContext.h"
+#include "ReceiverHandle.h"
+#include "SenderContext.h"
+#include "SenderHandle.h"
+#include "SessionContext.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+SessionHandle::SessionHandle(boost::shared_ptr<ConnectionContext> c, boost::shared_ptr<SessionContext> s) : connection(c), session(s) {}
+
+void SessionHandle::commit()
+{
+
+}
+
+void SessionHandle::rollback()
+{
+
+}
+
+void SessionHandle::acknowledge(bool /*sync*/)
+{
+ connection->acknowledge(session, 0, false);
+}
+
+void SessionHandle::acknowledge(qpid::messaging::Message& msg, bool cumulative)
+{
+ //TODO: handle cumulative
+ connection->acknowledge(session, &msg, cumulative);
+}
+
+void SessionHandle::reject(qpid::messaging::Message&)
+{
+
+}
+
+void SessionHandle::release(qpid::messaging::Message&)
+{
+
+}
+
+void SessionHandle::close()
+{
+ connection->endSession(session);
+}
+
+void SessionHandle::sync(bool /*block*/)
+{
+
+}
+
+qpid::messaging::Sender SessionHandle::createSender(const qpid::messaging::Address& address)
+{
+ boost::shared_ptr<SenderContext> sender = session->createSender(address);
+ connection->attach(session, sender);
+ return qpid::messaging::Sender(new SenderHandle(connection, session, sender));
+}
+
+qpid::messaging::Receiver SessionHandle::createReceiver(const qpid::messaging::Address& address)
+{
+ boost::shared_ptr<ReceiverContext> receiver = session->createReceiver(address);
+ connection->attach(session, receiver);
+ return qpid::messaging::Receiver(new ReceiverHandle(connection, session, receiver));
+}
+
+bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout)
+{
+ boost::shared_ptr<ReceiverContext> r = session->nextReceiver(timeout);
+ if (r) {
+ //TODO: cache handles in this case to avoid frequent allocation
+ receiver = qpid::messaging::Receiver(new ReceiverHandle(connection, session, r));
+ return true;
+ } else {
+ return false;
+ }
+}
+
+qpid::messaging::Receiver SessionHandle::nextReceiver(Duration timeout)
+{
+ qpid::messaging::Receiver r;
+ if (nextReceiver(r, timeout)) return r;
+ else throw qpid::messaging::NoMessageAvailable();
+}
+
+uint32_t SessionHandle::getReceivable()
+{
+ return session->getReceivable();
+}
+
+uint32_t SessionHandle::getUnsettledAcks()
+{
+ return session->getUnsettledAcks();
+}
+
+Sender SessionHandle::getSender(const std::string& name) const
+{
+ return qpid::messaging::Sender(new SenderHandle(connection, session, session->getSender(name)));
+}
+
+Receiver SessionHandle::getReceiver(const std::string& name) const
+{
+ return qpid::messaging::Receiver(new ReceiverHandle(connection, session, session->getReceiver(name)));
+}
+
+Connection SessionHandle::getConnection() const
+{
+ return qpid::messaging::Connection(new ConnectionHandle(connection));
+}
+
+void SessionHandle::checkError()
+{
+
+}
+
+
+}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.h b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.h
new file mode 100644
index 0000000000..5e843aaacc
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.h
@@ -0,0 +1,64 @@
+#ifndef QPID_MESSAGING_AMQP_SESSIONIMPL_H
+#define QPID_MESSAGING_AMQP_SESSIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/shared_ptr.hpp>
+#include "qpid/messaging/SessionImpl.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+class ConnectionContext;
+class SessionContext;
+/**
+ *
+ */
+class SessionHandle : public qpid::messaging::SessionImpl
+{
+ public:
+ SessionHandle(boost::shared_ptr<ConnectionContext>, boost::shared_ptr<SessionContext>);
+ void commit();
+ void rollback();
+ void acknowledge(bool sync);
+ void acknowledge(Message&, bool);
+ void reject(Message&);
+ void release(Message&);
+ void close();
+ void sync(bool block);
+ qpid::messaging::Sender createSender(const Address& address);
+ qpid::messaging::Receiver createReceiver(const Address& address);
+ bool nextReceiver(Receiver& receiver, Duration timeout);
+ qpid::messaging::Receiver nextReceiver(Duration timeout);
+ uint32_t getReceivable();
+ uint32_t getUnsettledAcks();
+ qpid::messaging::Sender getSender(const std::string& name) const;
+ qpid::messaging::Receiver getReceiver(const std::string& name) const;
+ qpid::messaging::Connection getConnection() const;
+ void checkError();
+ private:
+ boost::shared_ptr<ConnectionContext> connection;
+ boost::shared_ptr<SessionContext> session;
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_SESSIONIMPL_H*/
diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in
index 5c07bcdc2e..9acdb77aa3 100644
--- a/qpid/cpp/src/tests/test_env.sh.in
+++ b/qpid/cpp/src/tests/test_env.sh.in
@@ -57,6 +57,7 @@ export SENDER_EXEC=$QPID_TEST_EXEC_DIR/sender
# Path
export PATH=$top_builddir/src:$builddir:$srcdir:$PYTHON_COMMANDS:$QPID_TEST_EXEC_DIR:$PYTHON_DIR/commands:$PATH
+export LD_LIBRARY_PATH=@PROTON_LIB@
# Modules
export TEST_STORE_LIB=$testmoduledir/test_store.so