diff options
author | Gordon Sim <gsim@apache.org> | 2012-04-10 14:08:05 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2012-04-10 14:08:05 +0000 |
commit | dd007bcf3cba0074c42a6c13c151241fb6e2aad0 (patch) | |
tree | c3bee6bd265768e35b52595f00b803872574297c | |
parent | 66e1ad222d55a3ed0f4e1198a6bd45188fc76695 (diff) | |
download | qpid-python-dd007bcf3cba0074c42a6c13c151241fb6e2aad0.tar.gz |
Basic AMQP 1.0 support for qpid::messaging API
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/gs-amqp-1-0-sandbox@1311733 13f79535-47bb-0310-9956-ffa450edef68
29 files changed, 2519 insertions, 11 deletions
diff --git a/qpid/cpp/README_AMQP_1.0.txt b/qpid/cpp/README_AMQP_1.0.txt new file mode 100644 index 0000000000..0db15f2b7d --- /dev/null +++ b/qpid/cpp/README_AMQP_1.0.txt @@ -0,0 +1,41 @@ +This branch contains an initial integration of AMQP 1.0 support. This +is provided via the qpid 'proton-c' component, available from +https://svn.apache.org/repos/asf/qpid/proton/trunk/proton-c. + +The (absolute) path to the location of that library should be +specified via the '--with-proton' option to configure. + +At present only the messaging API implementation is integrated +(i.e. client only). I've used the python broker available from +https://github.com/rhs/amqp for my testing so far. + +There is no automatic protocol version detection yet, so you need to +explicitly enable AMQP 1.0 using the 'use_amqp1.0' connection option. + +E.g. start broker with: + +./broker -a -u example-users -n example-nodes + +then run: + +./examples/messaging/spout --connection-options '{use_amqp1.0:True}' --content 'my-message' queue + +then: + +./examples/messaging/drain --connection-options '{use_amqp1.0:True}' queue + +Note: at present the cluster tests don't run correctly from `make check` due to +inability to find the proton library. + +== Status / TODO == + +* client only at present +* there is no support for transactions +* there is no automatic protocol version detection +* only message content is encoded/decoded (not properties/headers) +* only the node names in addresses are actually used at present +* driver-integration: + - the code at present uses the proton driver for IO + - each connection uses its own driver and starts a thread for that purpose; TODO: charing drivers between connections + - there is no integration with the c++ 'transports' (ssl, rdma etc) + - there is no real SASL integration (just simple PLAIN and ANONYMOUS supported by proton driver) diff --git a/qpid/cpp/configure.ac b/qpid/cpp/configure.ac index 6ba5f62f0e..16c4c88083 100644 --- a/qpid/cpp/configure.ac +++ b/qpid/cpp/configure.ac @@ -107,6 +107,22 @@ AC_DISABLE_STATIC AC_PROG_LIBTOOL AC_SUBST([LIBTOOL_DEPS]) +# Allow integration against external AMQP 1.0 protocol engine +AC_ARG_WITH([proton], + [AS_HELP_STRING([--with-proton], + [Base directory for proton project])]) + +PROTON_BASE=$with_proton +PROTON_INCLUDE=$PROTON_BASE/include +PROTON_LIB=$PROTON_BASE +fail=0 +test -f $PROTON_INCLUDE/proton/engine.h || fail=1 +test $fail = 1 && + AC_MSG_ERROR([Please specify path to proton!]) +CXXFLAGS="$CXXFLAGS -I$PROTON_INCLUDE" +LDFLAGS="$LDFLAGS -L$PROTON_LIB -lqpidproton" +AC_SUBST(PROTON_LIB) + # For libraries (libcommon) that use dlopen, dlerror, etc., # test whether we need to link with -ldl. gl_saved_libs=$LIBS diff --git a/qpid/cpp/examples/messaging/spout.cpp b/qpid/cpp/examples/messaging/spout.cpp index cd11a7ad81..c758cb2967 100644 --- a/qpid/cpp/examples/messaging/spout.cpp +++ b/qpid/cpp/examples/messaging/spout.cpp @@ -159,7 +159,8 @@ int main(int argc, char** argv) std::stringstream spoutid; spoutid << id << ":" << count; message.getProperties()["spout-id"] = spoutid.str(); - sender.send(message); + sender.send(message, (count + 1 == options.count)); + //sender.send(message); } session.sync(); connection.close(); diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 5dcc4cd210..6b382bcec8 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -797,6 +797,24 @@ libqpidmessaging_la_SOURCES = \ qpid/messaging/ReceiverImpl.h \ qpid/messaging/SessionImpl.h \ qpid/messaging/FailoverUpdates.cpp \ + qpid/messaging/amqp/ConnectionContext.h \ + qpid/messaging/amqp/ConnectionContext.cpp \ + qpid/messaging/amqp/ConnectionHandle.h \ + qpid/messaging/amqp/ConnectionHandle.cpp \ + qpid/messaging/amqp/Decoder.h \ + qpid/messaging/amqp/Decoder.cpp \ + qpid/messaging/amqp/ReceiverContext.h \ + qpid/messaging/amqp/ReceiverContext.cpp \ + qpid/messaging/amqp/ReceiverHandle.h \ + qpid/messaging/amqp/ReceiverHandle.cpp \ + qpid/messaging/amqp/SenderContext.h \ + qpid/messaging/amqp/SenderContext.cpp \ + qpid/messaging/amqp/SenderHandle.h \ + qpid/messaging/amqp/SenderHandle.cpp \ + qpid/messaging/amqp/SessionContext.h \ + qpid/messaging/amqp/SessionContext.cpp \ + qpid/messaging/amqp/SessionHandle.h \ + qpid/messaging/amqp/SessionHandle.cpp \ qpid/client/amqp0_10/AcceptTracker.h \ qpid/client/amqp0_10/AcceptTracker.cpp \ qpid/client/amqp0_10/AddressResolution.h \ diff --git a/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp b/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp index e617015d64..5ae731944c 100644 --- a/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp +++ b/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp @@ -36,10 +36,17 @@ void ProtocolInitiation::encode(Buffer& buffer) const { buffer.putOctet('M'); buffer.putOctet('Q'); buffer.putOctet('P'); - buffer.putOctet(1);//class - buffer.putOctet(1);//instance - buffer.putOctet(version.getMajor()); - buffer.putOctet(version.getMinor()); + if (version.getMajor() == 1) { + buffer.putOctet(0); + buffer.putOctet(version.getMajor()); + buffer.putOctet(version.getMinor()); + buffer.putOctet(0);//revision + } else { + buffer.putOctet(1);//class + buffer.putOctet(1);//instance + buffer.putOctet(version.getMajor()); + buffer.putOctet(version.getMinor()); + } } bool ProtocolInitiation::decode(Buffer& buffer){ @@ -48,10 +55,17 @@ bool ProtocolInitiation::decode(Buffer& buffer){ buffer.getOctet();//M buffer.getOctet();//Q buffer.getOctet();//P - buffer.getOctet();//class - buffer.getOctet();//instance - version.setMajor(buffer.getOctet()); - version.setMinor(buffer.getOctet()); + uint8_t protocolClass = buffer.getOctet();//class + if (protocolClass == 1) { + //old (pre-1.0) style + buffer.getOctet();//instance + version.setMajor(buffer.getOctet()); + version.setMinor(buffer.getOctet()); + } else { + version.setMajor(buffer.getOctet()); + version.setMinor(buffer.getOctet()); + buffer.getOctet();//revision + } return true; }else{ return false; diff --git a/qpid/cpp/src/qpid/messaging/Connection.cpp b/qpid/cpp/src/qpid/messaging/Connection.cpp index bd90aa54a7..b9b3f2a4c3 100644 --- a/qpid/cpp/src/qpid/messaging/Connection.cpp +++ b/qpid/cpp/src/qpid/messaging/Connection.cpp @@ -25,6 +25,7 @@ #include "qpid/messaging/SessionImpl.h" #include "qpid/messaging/PrivateImplRef.h" #include "qpid/client/amqp0_10/ConnectionImpl.h" +#include "qpid/messaging/amqp/ConnectionHandle.h" #include "qpid/log/Statement.h" namespace qpid { @@ -44,7 +45,11 @@ Connection::Connection(const std::string& url, const std::string& o) Variant::Map options; AddressParser parser(o); if (o.empty() || parser.parseMap(options)) { - PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); + if (options.find("use_amqp1.0") != options.end()) { + PI::ctor(*this, new qpid::messaging::amqp::ConnectionHandle(url, options)); + } else { + PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); + } } else { throw InvalidOptionString("Invalid option string: " + o); } @@ -61,7 +66,14 @@ Connection::Connection() PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); } -void Connection::open() { impl->open(); } +void Connection::open() +{ + try { + impl->open(); + } catch (const ConnectionError& e) { + QPID_LOG(notice, "In qpid::messaging::Connection::open(), caught: " << e.what()); + } +} bool Connection::isOpen() { return impl->isOpen(); } bool Connection::isOpen() const { return impl->isOpen(); } void Connection::close() { impl->close(); } diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp index 0601800e46..3ef0fa4541 100644 --- a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp +++ b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp @@ -61,6 +61,7 @@ void MessageImpl::setHeader(const std::string& key, const qpid::types::Variant& //should these methods be on MessageContent? void MessageImpl::setBytes(const std::string& c) { bytes = c; } void MessageImpl::setBytes(const char* chars, size_t count) { bytes.assign(chars, count); } +void MessageImpl::appendBytes(const char* chars, size_t count) { bytes.append(chars, count); } const std::string& MessageImpl::getBytes() const { return bytes; } std::string& MessageImpl::getBytes() { return bytes; } diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.h b/qpid/cpp/src/qpid/messaging/MessageImpl.h index 57df6b3fda..4510385b6a 100644 --- a/qpid/cpp/src/qpid/messaging/MessageImpl.h +++ b/qpid/cpp/src/qpid/messaging/MessageImpl.h @@ -64,6 +64,7 @@ struct MessageImpl void setBytes(const std::string& bytes); void setBytes(const char* chars, size_t count); + void appendBytes(const char* chars, size_t count); const std::string& getBytes() const; std::string& getBytes(); diff --git a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h index 57059bfd28..e450693d2c 100644 --- a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h +++ b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h @@ -22,10 +22,12 @@ * */ #include "qpid/RefCounted.h" +#include "qpid/sys/IntegerTypes.h" namespace qpid { namespace messaging { +class Duration; class Message; class MessageListener; class Session; diff --git a/qpid/cpp/src/qpid/messaging/SenderImpl.h b/qpid/cpp/src/qpid/messaging/SenderImpl.h index a1ca02c72c..d978463fdb 100644 --- a/qpid/cpp/src/qpid/messaging/SenderImpl.h +++ b/qpid/cpp/src/qpid/messaging/SenderImpl.h @@ -22,6 +22,7 @@ * */ #include "qpid/RefCounted.h" +#include "qpid/sys/IntegerTypes.h" namespace qpid { namespace messaging { diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp new file mode 100644 index 0000000000..a16333b0bb --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -0,0 +1,480 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionContext.h" +#include "Decoder.h" +#include "ReceiverContext.h" +#include "SenderContext.h" +#include "SessionContext.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/framing/Uuid.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/Time.h" +#include <vector> + +namespace qpid { +namespace messaging { +namespace amqp { + +/* +ssize_t intercept_input(pn_transport_t *transport, char *bytes, size_t available, void* context, pn_input_fn_t *next) +{ + ConnectionContext* connection = reinterpret_cast<ConnectionContext*>(context); + qpid::sys::ScopedLock<qpid::sys::Monitor> l(connection->lock); + ssize_t result = next(transport, bytes, available); + connection->lock.notifyAll(); + return result; +} +ssize_t intercept_output(pn_transport_t *transport, char *bytes, size_t size, void* context, pn_output_fn_t *next) +{ + ConnectionContext* connection = reinterpret_cast<ConnectionContext*>(context); + qpid::sys::ScopedLock<qpid::sys::Monitor> l(connection->lock); + return next(transport, bytes, size); +} +time_t intercept_tick(pn_transport_t *transport, time_t now, void* context, pn_tick_fn_t *next) +{ + ConnectionContext* connection = reinterpret_cast<ConnectionContext*>(context); + qpid::sys::ScopedLock<qpid::sys::Monitor> l(connection->lock); + return next(transport, now); +} +*/ + +/* +void callback(pn_selectable_t* s) +{ + ConnectionContext* context = reinterpret_cast<ConnectionContext*>(pn_selectable_context(s)); + + if (context->read()) { + context->doInput(); + } + if (context->doOutput()) { + context->write(); + } +} +*/ + +ConnectionContext::ConnectionContext(const std::string& u, const qpid::types::Variant::Map& o) + : url(u), + driver(pn_driver()),//TODO: allow driver to be shared + socket(0), + connection(0), + //transport(0), + options(o), + waitingToWrite(false), + active(false) {} + +ConnectionContext::~ConnectionContext() +{ + if (active.boolCompareAndSwap(true, false)) { + wakeupDriver(); + driverThread.join();//if active was false, then the caller of + //close() will have joined the driver + //thread already + } + pn_connector_destroy(socket); + pn_driver_destroy(driver); + //What else am I responsible for deleting? +} + +void ConnectionContext::open() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + //TODO: check not already open + std::string host = "localhost"; + std::string port = "5672"; + + for (Url::const_iterator i = url.begin(); i != url.end(); ++i) { + std::stringstream port; + port << i->port; + socket = pn_connector(driver, i->host.c_str(), port.str().c_str(), this); + if (socket) break; + } + + if (socket) { + pn_sasl_t *sasl = pn_connector_sasl(socket); + + std::string user = url.getUser(); + if (user.size()) {//TODO: proper SASL + pn_sasl_plain(sasl, user.c_str(), url.getPass().c_str()); + } else { + pn_sasl_mechanisms(sasl, "ANONYMOUS"); + pn_sasl_client(sasl); + } + + connection = pn_connector_connection(socket); + pn_connection_open(connection); + active = true; + driverThread = qpid::sys::Thread(this); + //wait for open state + while (pn_sasl_outcome(sasl) == PN_SASL_NONE || + !(pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT))) { + wait(); + } + if (pn_sasl_outcome(sasl) != PN_SASL_OK) { + throw qpid::messaging::ConnectionError("Authentication failed!"); + } + } else { + //set error state + } +} + +void ConnectionContext::run() +{ + while (active.get()) { + pn_driver_wait(driver); + for (pn_connector_t* c = pn_driver_connector(driver); c; c = pn_driver_connector(driver)) { + ConnectionContext* context = reinterpret_cast<ConnectionContext*>(pn_connector_context(c)); + qpid::sys::ScopedLock<qpid::sys::Monitor> l(context->lock); + pn_connector_process(c); + context->lock.notifyAll(); + } + } +} + +bool ConnectionContext::isOpen() const +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); +} + +void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + pn_session_close(ssn->session); + //TODO: need to destroy session and remove context from map + wakeupDriver(); +} + +void ConnectionContext::close() +{ + { + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) { + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i){ + pn_session_close(i->second->session); + } + sessions.clear(); + pn_connection_close(connection); + wakeupDriver(); + //wait for close to be confirmed by peer? + while (!(pn_connection_state(connection) & PN_REMOTE_CLOSED)) { + wait(); + } + } + } + if (active.boolCompareAndSwap(true, false)) { + wakeupDriver(); + } + driverThread.join(); +} + +bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + if (!lnk->capacity) { + pn_flow(lnk->receiver, 1); + wakeupDriver(); + } + if (get(ssn, lnk, message, timeout)) { + if (lnk->capacity) { + pn_flow(lnk->receiver, 1);//TODO: is this the right approach? + } + return true; + } else { + //TODO: flush + return get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE); + } +} + +qpid::sys::AbsTime convert(qpid::messaging::Duration timeout) +{ + qpid::sys::AbsTime until; + uint64_t ms = timeout.getMilliseconds(); + if (ms < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) { + return qpid::sys::AbsTime(qpid::sys::now(), ms * qpid::sys::TIME_MSEC); + } else { + return qpid::sys::FAR_FUTURE; + } +} + +bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + qpid::sys::AbsTime until(convert(timeout)); + while (until > qpid::sys::now()) { + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + pn_delivery_t* current = pn_current((pn_link_t*) lnk->receiver); + if (current) { + //TODO: can we avoid copying here? + std::vector<char> data; + data.resize(pn_pending(current)); + ssize_t read = pn_recv(lnk->receiver, &data[0], data.size()); + MessageDataDecoder decoder(MessageImplAccess::get(message)); + decoder.decode(&data[0], read); + MessageImplAccess::get(message).setInternalId(ssn->record(current)); + pn_advance(lnk->receiver); + return true; + } else { + wait(); + } + } + return false; +} + +void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (message) { + ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative); + } else { + ssn->acknowledge(); + } + wakeupDriver(); +} + + +void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) +{ + pn_set_target((pn_link_t*) lnk->sender, lnk->getTarget().c_str()); + attach(ssn->session, (pn_link_t*) lnk->sender); +} + +void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + pn_set_source((pn_link_t*) lnk->receiver, lnk->getSource().c_str()); + attach(ssn->session, (pn_link_t*) lnk->receiver, lnk->capacity); +} + +void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int credit) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + pn_link_open(link); + if (credit) pn_flow(link, credit); + wakeupDriver(); + while (!(pn_link_state(link) & PN_REMOTE_UNINIT)) wait(); +} + +void ConnectionContext::send(boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + SenderContext::Delivery* delivery(0); + while (!(delivery = snd->send(message))) { + QPID_LOG(debug, "Waiting for capacity..."); + wait();//wait for capacity + } + wakeupDriver(); + if (sync) { + while (!delivery->accepted()) { + QPID_LOG(debug, "Waiting for confirmation..."); + wait();//wait until message has been confirmed + } + } +} + +void ConnectionContext::setCapacity(boost::shared_ptr<SenderContext> sender, uint32_t capacity) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sender->setCapacity(capacity); +} +uint32_t ConnectionContext::getCapacity(boost::shared_ptr<SenderContext> sender) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return sender->getCapacity(); +} +uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<SenderContext> sender) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return sender->getUnsettled(); +} + +void ConnectionContext::setCapacity(boost::shared_ptr<ReceiverContext> receiver, uint32_t capacity) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + receiver->setCapacity(capacity); +} +uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return receiver->getCapacity(); +} +uint32_t ConnectionContext::getAvailable(boost::shared_ptr<ReceiverContext> receiver) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return receiver->getAvailable(); +} +uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> receiver) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return receiver->getUnsettled(); +} + +/** + * Expects lock to be held by caller + */ +void ConnectionContext::wakeupDriver() +{ + pn_driver_wakeup(driver); + waitingToWrite = true; + QPID_LOG(debug, "wakeupDriver()"); +} + +void ConnectionContext::wait() +{ + lock.wait(); +} +/* +bool ConnectionContext::read() +{ + int r = pn_selectable_recv(socket, inputBuffer.position(), inputBuffer.capacity()); + if (r > 0) { + inputBuffer.advance(r); + return true; + } else if (r == 0) { + //closed + return false; + } else {//i.e. r < 0 + return false; + } +} + +void ConnectionContext::write() +{ + QPID_LOG(debug, "write() " << outputBuffer.available() << " bytes available"); + int w = pn_selectable_send(socket, outputBuffer.start(), outputBuffer.available()); + if (w < 0) { + //io error + } else { + outputBuffer.consume(w); + QPID_LOG(debug, "Wrote " << w << " bytes"); + } +} + +void ConnectionContext::doInput() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + ssize_t n = pn_input(transport, inputBuffer.start(), inputBuffer.available()); + if (n > 0) { + inputBuffer.consume(n); + lock.notifyAll(); + } else if (n < 0) { + QPID_LOG(error, "Error on input. Engine returned code: " << n); + } +} + +bool ConnectionContext::doOutput() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + ssize_t n = pn_output(transport, outputBuffer.position(), outputBuffer.capacity()); + if (n < 0) { + //engine error + } else { + outputBuffer.advance(n); + } + lock.notifyAll(); + if (!outputBuffer.available() && !waitingToWrite) { + pn_selectable_flags(socket, PN_SEL_RD); + } + return outputBuffer.available(); +} + +time_t ConnectionContext::doTick(time_t now) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + waitingToWrite = false; + return pn_tick(transport, now); +} +*/ + +boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (transactional) throw qpid::messaging::MessagingException("Transactions not yet supported"); + std::string name = n.empty() ? qpid::framing::Uuid(true).str() : n; + SessionMap::const_iterator i = sessions.find(name); + if (i == sessions.end()) { + boost::shared_ptr<SessionContext> s(new SessionContext(connection)); + s->session = pn_session(connection); + pn_session_open(s->session); + sessions[name] = s; + QPID_LOG(debug, "Session " << name << " begun"); + wakeupDriver(); + while (pn_session_state(s->session) & PN_REMOTE_UNINIT) wait(); + return s; + } else { + throw qpid::messaging::KeyError(std::string("Session already exists: ") + name); + } + +} +boost::shared_ptr<SessionContext> ConnectionContext::getSession(const std::string& name) const +{ + SessionMap::const_iterator i = sessions.find(name); + if (i == sessions.end()) { + throw qpid::messaging::KeyError(std::string("No such session") + name); + } else { + return i->second; + } +} + +void ConnectionContext::setOption(const std::string& name, const qpid::types::Variant& value) +{ + options[name] = value; +} + +std::string ConnectionContext::getAuthenticatedUsername() +{ + return std::string();//TODO +} + +ConnectionContext::Buffer::Buffer(size_t s) : data(new char[s]), size(s), used(0) {} +ConnectionContext::Buffer::~Buffer() { delete[](data); } +char* ConnectionContext::Buffer::position() +{ + return data + used; +} + +size_t ConnectionContext::Buffer::available() +{ + return used; +} + +size_t ConnectionContext::Buffer::capacity() +{ + return size - used; +} + +char* ConnectionContext::Buffer::start() +{ + return data; +} + +void ConnectionContext::Buffer::advance(size_t bytes) +{ + used += bytes; +} + +void ConnectionContext::Buffer::consume(size_t bytes) +{ + memmove(data, data + bytes, size - bytes); + used -= bytes; +} + + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h new file mode 100644 index 0000000000..c176b35fee --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -0,0 +1,137 @@ +#ifndef QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H +#define QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <deque> +#include <map> +#include <string> +#include <boost/shared_ptr.hpp> +#include "qpid/Url.h" +#include "qpid/sys/AtomicValue.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" +#include "qpid/types/Variant.h" +extern "C" { +#include <proton/engine.h> +#include <proton/driver.h> +} + +namespace qpid { +namespace messaging { +class Duration; +class Message; +namespace amqp { + +class ReceiverContext; +class SessionContext; +class SenderContext; + +/** + * + */ +class ConnectionContext : qpid::sys::Runnable +{ + public: + ConnectionContext(const std::string& url, const qpid::types::Variant::Map& options); + ~ConnectionContext(); + void open(); + bool isOpen() const; + void close(); + boost::shared_ptr<SessionContext> newSession(bool transactional, const std::string& name); + boost::shared_ptr<SessionContext> getSession(const std::string& name) const; + void endSession(boost::shared_ptr<SessionContext>); + void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); + void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); + void send(boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync); + bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative); + + void setOption(const std::string& name, const qpid::types::Variant& value); + std::string getAuthenticatedUsername(); + + void setCapacity(boost::shared_ptr<SenderContext>, uint32_t); + uint32_t getCapacity(boost::shared_ptr<SenderContext>); + uint32_t getUnsettled(boost::shared_ptr<SenderContext>); + + void setCapacity(boost::shared_ptr<ReceiverContext>, uint32_t); + uint32_t getCapacity(boost::shared_ptr<ReceiverContext>); + uint32_t getAvailable(boost::shared_ptr<ReceiverContext>); + uint32_t getUnsettled(boost::shared_ptr<ReceiverContext>); + + /* + void doInput(); + bool doOutput(); + time_t doTick(time_t now); + bool read(); + void write(); + */ + void readProtocolHeader(); + void writeProtocolHeader(); + + private: + class Buffer + { + public: + Buffer(size_t size=65536); + ~Buffer(); + char* position(); + size_t available(); + size_t capacity(); + char* start(); + void advance(size_t bytes); + void consume(size_t bytes); + private: + char* const data; + const size_t size; + size_t used; + }; + typedef std::map<std::string, boost::shared_ptr<SessionContext> > SessionMap; + qpid::Url url; + pn_driver_t* driver; + pn_connector_t* socket; + pn_connection_t* connection; + qpid::types::Variant::Map options; + SessionMap sessions; + mutable qpid::sys::Monitor lock; + qpid::sys::Thread driverThread; + Buffer inputBuffer; + Buffer outputBuffer; + bool waitingToWrite; + qpid::sys::AtomicValue<bool> active; + + void wait(); + void wakeupDriver(); + void run(); + void attach(pn_session_t*, pn_link_t*, int credit=0); + + /* + friend ssize_t intercept_input(pn_transport_t *transport, char *bytes, size_t available, void* context, pn_input_fn_t *next); + friend ssize_t intercept_output(pn_transport_t *transport, char *bytes, size_t size, void* context, pn_output_fn_t *next); + friend time_t intercept_tick(pn_transport_t *transport, time_t now, void* context, pn_tick_fn_t *next); + */ +}; + +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp new file mode 100644 index 0000000000..8fc3182c22 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionHandle.h" +#include "ConnectionContext.h" +#include "SessionHandle.h" +#include "qpid/messaging/Session.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +ConnectionHandle::ConnectionHandle(const std::string& url, const qpid::types::Variant::Map& options) : connection(new ConnectionContext(url, options)) {} +ConnectionHandle::ConnectionHandle(boost::shared_ptr<ConnectionContext> c) : connection(c) {} + +void ConnectionHandle::open() +{ + connection->open(); +} + +bool ConnectionHandle::isOpen() const +{ + return connection->isOpen(); +} + +void ConnectionHandle::close() +{ + connection->close(); +} + +Session ConnectionHandle::newSession(bool transactional, const std::string& name) +{ + return qpid::messaging::Session(new SessionHandle(connection, connection->newSession(transactional, name))); +} + +Session ConnectionHandle::getSession(const std::string& name) const +{ + return qpid::messaging::Session(new SessionHandle(connection, connection->getSession(name))); +} + +void ConnectionHandle::setOption(const std::string& name, const qpid::types::Variant& value) +{ + connection->setOption(name, value); +} + +std::string ConnectionHandle::getAuthenticatedUsername() +{ + return connection->getAuthenticatedUsername(); +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h new file mode 100644 index 0000000000..d1eb27f6de --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h @@ -0,0 +1,58 @@ +#ifndef QPID_MESSAGING_AMQP_CONNECTIONHANDLE_H +#define QPID_MESSAGING_AMQP_CONNECTIONHANDLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/shared_ptr.hpp> +#include "qpid/messaging/ConnectionImpl.h" +#include "qpid/types/Variant.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +class ConnectionContext; +/** + * Handles are directly referenced by applications; Contexts are + * referenced by Handles. This allows a graph structure that + * remains intact as long as the application references any part + * of it, but that can be automatically reclaimed if the whole + * graph becomes unreferenced. + */ +class ConnectionHandle : public qpid::messaging::ConnectionImpl +{ + public: + ConnectionHandle(const std::string& url, const qpid::types::Variant::Map& options); + ConnectionHandle(boost::shared_ptr<ConnectionContext>); + void open(); + bool isOpen() const; + void close(); + Session newSession(bool transactional, const std::string& name); + Session getSession(const std::string& name) const; + void setOption(const std::string& name, const qpid::types::Variant& value); + std::string getAuthenticatedUsername(); + private: + boost::shared_ptr<ConnectionContext> connection; +}; + +}}} // namespace qpid::messaging::amqp_1.0 + +#endif /*!QPID_MESSAGING_AMQP_CONNECTIONHANDLE_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp b/qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp new file mode 100644 index 0000000000..c61ae21e0e --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp @@ -0,0 +1,406 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Decoder.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/MessageImpl.h" +#include <string.h> + +namespace qpid { +namespace messaging { +namespace amqp { +namespace { +void on_null(void *ctx) +{ + reinterpret_cast<Decoder*>(ctx)->onNull(); +} +void on_bool(void *ctx, bool v) +{ + reinterpret_cast<Decoder*>(ctx)->onBool(v); +} +void on_ubyte(void *ctx, uint8_t v) +{ + reinterpret_cast<Decoder*>(ctx)->onUbyte(v); +} +void on_byte(void *ctx, int8_t v) +{ + reinterpret_cast<Decoder*>(ctx)->onByte(v); +} +void on_ushort(void *ctx, uint16_t v) +{ + reinterpret_cast<Decoder*>(ctx)->onUshort(v); +} +void on_short(void *ctx, int16_t v) +{ + reinterpret_cast<Decoder*>(ctx)->onShort(v); +} +void on_uint(void *ctx, uint32_t v) +{ + reinterpret_cast<Decoder*>(ctx)->onUint(v); +} +void on_int(void *ctx, int32_t v) +{ + reinterpret_cast<Decoder*>(ctx)->onInt(v); +} +void on_float(void *ctx, float v) +{ + reinterpret_cast<Decoder*>(ctx)->onFloat(v); +} +void on_ulong(void *ctx, uint64_t v) +{ + reinterpret_cast<Decoder*>(ctx)->onUlong(v); +} +void on_long(void *ctx, int64_t v) +{ + reinterpret_cast<Decoder*>(ctx)->onLong(v); +} +void on_double(void *ctx, double v) +{ + reinterpret_cast<Decoder*>(ctx)->onDouble(v); +} +void on_binary(void *ctx, size_t size, char *bytes) +{ + reinterpret_cast<Decoder*>(ctx)->onBinary(size, bytes); +} +void on_utf8(void *ctx, size_t size, char *utf8) +{ + reinterpret_cast<Decoder*>(ctx)->onUtf8(size, utf8); +} +void on_symbol(void *ctx, size_t size, char *str) +{ + reinterpret_cast<Decoder*>(ctx)->onSymbol(size, str); +} +void start_descriptor(void *ctx) +{ + reinterpret_cast<Decoder*>(ctx)->startDescriptor(); +} +void stop_descriptor(void *ctx) +{ + reinterpret_cast<Decoder*>(ctx)->stopDescriptor(); +} +void start_array(void *ctx, size_t count, uint8_t code) +{ + reinterpret_cast<Decoder*>(ctx)->startArray(count, code); +} +void stop_array(void *ctx, size_t count, uint8_t code) +{ + reinterpret_cast<Decoder*>(ctx)->stopArray(count, code); +} +void start_list(void *ctx, size_t count) +{ + reinterpret_cast<Decoder*>(ctx)->startList(count); +} +void stop_list(void *ctx, size_t count) +{ + reinterpret_cast<Decoder*>(ctx)->stopList(count); +} +void start_map(void *ctx, size_t count) +{ + reinterpret_cast<Decoder*>(ctx)->startMap(count); +} +void stop_map(void *ctx, size_t count) +{ + reinterpret_cast<Decoder*>(ctx)->stopMap(count); +} +} + +Decoder::~Decoder() {} + +ssize_t Decoder::decode(const char* data, size_t size) +{ + pn_data_callbacks_t callbacks; + callbacks.on_null = &on_null; + callbacks.on_bool = &on_bool; + callbacks.on_ubyte = &on_ubyte; + callbacks.on_byte = &on_byte; + callbacks.on_ushort = &on_ushort; + callbacks.on_short = &on_short; + callbacks.on_uint = &on_uint; + callbacks.on_int = &on_int; + callbacks.on_float = &on_float; + callbacks.on_ulong = &on_ulong; + callbacks.on_long = &on_long; + callbacks.on_double = &on_double; + callbacks.on_binary = &on_binary; + callbacks.on_utf8 = &on_utf8; + callbacks.on_symbol = &on_symbol; + callbacks.start_descriptor = &start_descriptor; + callbacks.stop_descriptor = &stop_descriptor; + callbacks.start_array = &start_array; + callbacks.stop_array = &stop_array; + callbacks.start_list = &start_list; + callbacks.stop_list = &stop_list; + callbacks.start_map = &start_map; + callbacks.stop_map = &stop_map; + size_t total = 0; + while (total < size) { + ssize_t result = pn_read_datum(data + total, size - total, &callbacks, this); + if (result < 0) return result; + else total += result; + } + return total; +} + +void DecoderBase::onNull() { QPID_LOG(debug, this << " onNull()"); } +void DecoderBase::onBool(bool v) { QPID_LOG(debug, this << " onBool(" << v << ")"); } +void DecoderBase::onUbyte(uint8_t v) { QPID_LOG(debug, this << " onUbyte(" << v << ")"); } +void DecoderBase::onByte(int8_t v) { QPID_LOG(debug, this << " onByte(" << v << ")"); } +void DecoderBase::onUshort(uint16_t v) { QPID_LOG(debug, this << " onUshort(" << v << ")"); } +void DecoderBase::onShort(int16_t v) { QPID_LOG(debug, this << " onShort(" << v << ")"); } +void DecoderBase::onUint(uint32_t v) { QPID_LOG(debug, this << " onUint(" << v << ")"); } +void DecoderBase::onInt(int32_t v) { QPID_LOG(debug, this << " onInt(" << v << ")"); } +void DecoderBase::onFloat(float v) { QPID_LOG(debug, this << " onFloat(" << v << ")"); } +void DecoderBase::onUlong(uint64_t v) { QPID_LOG(debug, this << " onUlong(" << v << ")"); } +void DecoderBase::onLong(int64_t v) { QPID_LOG(debug, this << " onLong(" << v << ")"); } +void DecoderBase::onDouble(double v) { QPID_LOG(debug, this << " onDouble(" << v << ")"); } +void DecoderBase::onBinary(size_t size, char*) { QPID_LOG(debug, this << " onBinary(" << size << ")"); } +void DecoderBase::onUtf8(size_t size, char*) { QPID_LOG(debug, this << " onUtf8(" << size << ")"); } +void DecoderBase::onSymbol(size_t size, char*) { QPID_LOG(debug, this << " onSymbol(" << size << ")"); } +void DecoderBase::startDescriptor() { inDescriptor = true; QPID_LOG(debug, this << " startDescriptor()"); } +void DecoderBase::stopDescriptor() { inDescriptor = false; QPID_LOG(debug, this << " stopDescriptor()"); } +void DecoderBase::startArray(size_t count, uint8_t code) { QPID_LOG(debug, this << " startArray(" << count << ", " << code << ")"); } +void DecoderBase::stopArray(size_t count, uint8_t code) { QPID_LOG(debug, this << " stopArray(" << count << ", " << code << ")"); } +void DecoderBase::startList(size_t count) { QPID_LOG(debug, this << " startList(" << count << ")"); } +void DecoderBase::stopList(size_t count) { QPID_LOG(debug, this << " stopList(" << count << ")"); } +void DecoderBase::startMap(size_t count) { QPID_LOG(debug, this << " startMap(" << count << ")"); } +void DecoderBase::stopMap(size_t count) { QPID_LOG(debug, this << " stopMap(" << count << ")"); } +DecoderBase::~DecoderBase() {} + +void ContextSensitiveDecoder::onNull() +{ + if ((decoder = getDecoder())) decoder->onNull(); + else QPID_LOG(debug, this << " onNull() not handled"); + onValue(); +} +void ContextSensitiveDecoder::onBool(bool v) +{ + if ((decoder = getDecoder())) decoder->onBool(v); + else QPID_LOG(debug, this << " onBool(" << v << ") not handled"); + onValue(); +} +void ContextSensitiveDecoder::onUbyte(uint8_t v) +{ + if ((decoder = getDecoder())) decoder->onUbyte(v); + else QPID_LOG(debug, this << " onUbyte(" << v << ") not handled"); + onValue(); +} + +void ContextSensitiveDecoder::onByte(int8_t v) +{ + if ((decoder = getDecoder())) decoder->onByte(v); + else QPID_LOG(debug, this << " onByte(" << v << ") not handled"); + onValue(); +} + +void ContextSensitiveDecoder::onUshort(uint16_t v) +{ + if ((decoder = getDecoder())) decoder->onUshort(v); + else QPID_LOG(debug, this << " onUshort(" << v << ") not handled"); + onValue(); +} + +void ContextSensitiveDecoder::onShort(int16_t v) +{ + if ((decoder = getDecoder())) decoder->onShort(v); + else QPID_LOG(debug, this << " onShort(" << v << ") not handled"); + onValue(); +} + +void ContextSensitiveDecoder::onUint(uint32_t v) +{ + if ((decoder = getDecoder())) decoder->onUint(v); + else QPID_LOG(debug, this << " onUint(" << v << ") not handled"); + onValue(); +} + +void ContextSensitiveDecoder::onInt(int32_t v) +{ + if ((decoder = getDecoder())) decoder->onInt(v); + else QPID_LOG(debug, this << " onInt(" << v << ") not handled"); + onValue(); +} + +void ContextSensitiveDecoder::onFloat(float v) +{ + if ((decoder = getDecoder())) decoder->onFloat(v); + else QPID_LOG(debug, this << " onFloat(" << v << ") not handled"); + onValue(); +} + +void ContextSensitiveDecoder::onUlong(uint64_t v) +{ + if ((decoder = getDecoder())) decoder->onUlong(v); + else QPID_LOG(debug, this << " onUlong(" << v << ") not handled"); + onValue(); +} + +void ContextSensitiveDecoder::onLong(int64_t v) +{ + if ((decoder = getDecoder())) decoder->onLong(v); + else QPID_LOG(debug, this << " onLong(" << v << ") not handled"); + onValue(); +} + +void ContextSensitiveDecoder::onDouble(double v) +{ + if ((decoder = getDecoder())) decoder->onDouble(v); + else QPID_LOG(debug, this << " onDouble(" << v << ") not handled"); + onValue(); +} + +void ContextSensitiveDecoder::onBinary(size_t size, char* bytes) +{ + if ((decoder = getDecoder())) decoder->onBinary(size, bytes); + else QPID_LOG(debug, this << " onBinary(" << size << ") not handled"); + onValue(); +} + +void ContextSensitiveDecoder::onUtf8(size_t size, char* utf8) +{ + if ((decoder = getDecoder())) decoder->onUtf8(size, utf8); + else QPID_LOG(debug, this << " onUtf8(" << size << ") not handled"); + onValue(); +} + +void ContextSensitiveDecoder::onSymbol(size_t size, char* symbol) +{ + if ((decoder = getDecoder())) decoder->onSymbol(size, symbol); + else QPID_LOG(debug, this << " onSymbol(" << size << ") not handled"); + onValue(); +} + +void ContextSensitiveDecoder::startDescriptor() +{ + if ((decoder = getDecoder())) decoder->startDescriptor(); + else QPID_LOG(debug, this << " startDescriptor() not handled"); +} + +void ContextSensitiveDecoder::stopDescriptor() +{ + ++descriptorLevel; + if ((decoder = getDecoder())) decoder->stopDescriptor(); + else QPID_LOG(debug, this << " stopDescriptor() not handled"); +} + +void ContextSensitiveDecoder::startArray(size_t count, uint8_t code) +{ + ++valueLevel; + if ((decoder = getDecoder())) decoder->startArray(count, code); + else QPID_LOG(debug, this << " startArray(" << count << ", " << code << ") not handled"); +} + +void ContextSensitiveDecoder::stopArray(size_t count, uint8_t code) +{ + stopNested(); + if ((decoder = getDecoder())) decoder->stopArray(count, code); + else QPID_LOG(debug, this << " stopArray(" << count << ", " << code << ") not handled"); +} + +void ContextSensitiveDecoder::startList(size_t count) +{ + startNested(); + if ((decoder = getDecoder())) decoder->startList(count); + else QPID_LOG(debug, this << " startList(" << count << ") not handled"); +} + +void ContextSensitiveDecoder::stopList(size_t count) +{ + stopNested(); + if ((decoder = getDecoder())) decoder->stopList(count); + else QPID_LOG(debug, this << " stopList(" << count << ") not handled"); +} + +void ContextSensitiveDecoder::startMap(size_t count) +{ + startNested(); + if ((decoder = getDecoder())) decoder->startMap(count); + else QPID_LOG(debug, this << " startMap(" << count << ") not handled"); +} + +void ContextSensitiveDecoder::stopMap(size_t count) +{ + stopNested(); + if ((decoder = getDecoder())) decoder->stopMap(count); + else QPID_LOG(debug, this << " stopMap(" << count << ") not handled"); +} + +void ContextSensitiveDecoder::onValue() +{ + if (valueLevel == descriptorLevel) { + described = false; + } +} + +void ContextSensitiveDecoder::startNested() +{ + ++valueLevel; +} + +void ContextSensitiveDecoder::stopNested() +{ + if (--valueLevel == descriptorLevel) { + --descriptorLevel; + described = false; + } +} + +ContextSensitiveDecoder::ContextSensitiveDecoder() : described(false), descriptorLevel(-1), valueLevel(0), decoder(0) {} +void ContextSensitiveDecoder::setDecoder(DecoderBase* d) { decoder = d; } +DecoderBase* ContextSensitiveDecoder::getDecoder() { return decoder; } +ContextSensitiveDecoder::~ContextSensitiveDecoder() {} + +DescribedValueDecoder::DescribedValueDecoder(uint64_t c, const std::string& s) : descriptorCode(c), descriptorSymbol(s), + descriptorDecoder(*this), matched(false) {} +DescribedValueDecoder::~DescribedValueDecoder() {} +DecoderBase* DescribedValueDecoder::getDecoder() +{ + if (inDescriptor) return &descriptorDecoder; + else if (described && matched) return this;//when does matched get switched off? + else return 0; +} + +DescribedValueDecoder::DescriptorDecoder::DescriptorDecoder(DescribedValueDecoder& p) : parent(p) {} + +void DescribedValueDecoder::DescriptorDecoder::onSymbol(size_t size, char *str) +{ + parent.matched = (::strncmp(str, parent.descriptorSymbol.c_str(), size) == 0); + QPID_LOG(debug, &parent << ":" << this << ":DescriptorDecoder: onSymbol(" + << std::string(str, size) << " -> " << (parent.matched ? "matched" : "did not match") << ")"); +} +void DescribedValueDecoder::DescriptorDecoder::onUlong(uint64_t v) +{ + parent.matched = (v == parent.descriptorCode); + QPID_LOG(debug, &parent << ":" << this << ":DescriptorDecoder: onUlong(" << v << " -> " << (parent.matched ? "matched" : "did not match") << ")"); +} + +MessageDataDecoder::MessageDataDecoder(qpid::messaging::MessageImpl& m) + : DescribedValueDecoder(0x00000075, "amqp:data:binary"), msg(m) +{ + msg.bytes.clear(); +} + +void MessageDataDecoder::onBinary(size_t size, char *bytes) +{ + QPID_LOG(debug, this << ":MessageDataDecoder: onBinary() with " << size << " bytes"); + msg.appendBytes(bytes, size); +} + + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/Decoder.h b/qpid/cpp/src/qpid/messaging/amqp/Decoder.h new file mode 100644 index 0000000000..b3c883a8d3 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/Decoder.h @@ -0,0 +1,153 @@ +#ifndef QPID_MESSAGING_AMQP_DECODER_H +#define QPID_MESSAGING_AMQP_DECODER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/IntegerTypes.h" +#include <string> +extern "C" { +#include "proton/codec.h" +} + +namespace qpid { +namespace messaging { +struct MessageImpl; +namespace amqp { + +class DecoderBase +{ + public: + virtual ~DecoderBase(); + virtual void onNull(); + virtual void onBool(bool v); + virtual void onUbyte(uint8_t v); + virtual void onByte(int8_t v); + virtual void onUshort(uint16_t v); + virtual void onShort(int16_t v); + virtual void onUint(uint32_t v); + virtual void onInt(int32_t v); + virtual void onFloat(float v); + virtual void onUlong(uint64_t v); + virtual void onLong(int64_t v); + virtual void onDouble(double v); + virtual void onBinary(size_t size, char *bytes); + virtual void onUtf8(size_t size, char *utf8); + virtual void onSymbol(size_t size, char *str); + virtual void startDescriptor(); + virtual void stopDescriptor(); + virtual void startArray(size_t count, uint8_t code); + virtual void stopArray(size_t count, uint8_t code); + virtual void startList(size_t count); + virtual void stopList(size_t count); + virtual void startMap(size_t count); + virtual void stopMap(size_t count); + protected: + bool inDescriptor; +}; + +/** + * + */ +class Decoder : public DecoderBase +{ + public: + virtual ~Decoder(); + ssize_t decode(const char*, size_t); +}; + +class ContextSensitiveDecoder : public Decoder +{ + public: + ContextSensitiveDecoder(); + virtual ~ContextSensitiveDecoder(); + protected: + bool described; + ssize_t descriptorLevel; + ssize_t valueLevel; + + void setDecoder(DecoderBase*); + virtual DecoderBase* getDecoder(); + virtual void startNested(); + virtual void stopNested(); + virtual void onValue(); + private: + DecoderBase* decoder; + void onNull(); + void onBool(bool v); + void onUbyte(uint8_t v); + void onByte(int8_t v); + void onUshort(uint16_t v); + void onShort(int16_t v); + void onUint(uint32_t v); + void onInt(int32_t v); + void onFloat(float v); + void onUlong(uint64_t v); + void onLong(int64_t v); + void onDouble(double v); + void onBinary(size_t size, char *bytes); + void onUtf8(size_t size, char *utf8); + void onSymbol(size_t size, char *str); + void startDescriptor(); + void stopDescriptor(); + void startArray(size_t count, uint8_t code); + void stopArray(size_t count, uint8_t code); + void startList(size_t count); + void stopList(size_t count); + void startMap(size_t count); + void stopMap(size_t count); +}; + +class DescribedValueDecoder : public ContextSensitiveDecoder +{ + public: + DescribedValueDecoder(uint64_t descriptorCode, const std::string& descriptorSymbol); + virtual ~DescribedValueDecoder(); + protected: + virtual DecoderBase* getDecoder(); + private: + class DescriptorDecoder : public DecoderBase + { + public: + DescriptorDecoder(DescribedValueDecoder&); + void onSymbol(size_t size, char *str); + void onUlong(uint64_t v); + private: + DescribedValueDecoder& parent; + }; + + uint64_t descriptorCode; + std::string descriptorSymbol; + DescriptorDecoder descriptorDecoder; + bool matched; +}; + +class MessageDataDecoder : public DescribedValueDecoder +{ + public: + MessageDataDecoder(qpid::messaging::MessageImpl& msg); + private: + qpid::messaging::MessageImpl& msg; + void onBinary(size_t size, char *bytes); +}; + +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_DECODER_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp new file mode 100644 index 0000000000..b9972dc112 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ReceiverContext.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/Message.h" +extern "C" { +#include "proton/engine.h" +} + +namespace qpid { +namespace messaging { +namespace amqp { +//TODO: proper conversion to wide string for address +ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const std::string& s) + : name(n), + source(s.begin(), s.end()), + receiver(pn_receiver(session, source.c_str())), + capacity(0) {} +ReceiverContext::~ReceiverContext() +{ + pn_link_destroy(receiver); +} + +void ReceiverContext::setCapacity(uint32_t c) +{ + if (c != capacity) { + //stop + capacity = c; + //reissue credit + } +} + +uint32_t ReceiverContext::getCapacity() +{ + return capacity; +} + +uint32_t ReceiverContext::getAvailable() +{ + uint32_t count(0); + for (pn_delivery_t* d = pn_unsettled_head(receiver); d; d = pn_unsettled_next(d)) { + ++count; + if (d == pn_current(receiver)) break; + } + return count; +} + +uint32_t ReceiverContext::getUnsettled() +{ + uint32_t count(0); + for (pn_delivery_t* d = pn_unsettled_head(receiver); d; d = pn_unsettled_next(d)) { + ++count; + } + return count; +} + +void ReceiverContext::close() +{ + +} + +const std::string& ReceiverContext::getName() const +{ + return name; +} + +const std::wstring& ReceiverContext::getSource() const +{ + return source; +} + +bool ReceiverContext::isClosed() const +{ + return false;//TODO +} + + + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h new file mode 100644 index 0000000000..f83de8e151 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h @@ -0,0 +1,63 @@ +#ifndef QPID_MESSAGING_AMQP_RECEIVERCONTEXT_H +#define QPID_MESSAGING_AMQP_RECEIVERCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/sys/IntegerTypes.h" + +struct pn_link_t; +struct pn_session_t; + +namespace qpid { +namespace messaging { + +class Duration; +class Message; + +namespace amqp { + +/** + * + */ +class ReceiverContext +{ + public: + ReceiverContext(pn_session_t* session, const std::string& name, const std::string& source); + ~ReceiverContext(); + void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t getAvailable(); + uint32_t getUnsettled(); + void close(); + const std::string& getName() const; + const std::wstring& getSource() const; + bool isClosed() const; + private: + friend class ConnectionContext; + const std::string name; + const std::wstring source; + pn_link_t* receiver; + uint32_t capacity; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_RECEIVERCONTEXT_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp new file mode 100644 index 0000000000..9bf64ebb8d --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ReceiverHandle.h" +#include "ConnectionContext.h" +#include "SessionContext.h" +#include "SessionHandle.h" +#include "ReceiverContext.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Session.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +ReceiverHandle::ReceiverHandle(boost::shared_ptr<ConnectionContext> c, + boost::shared_ptr<SessionContext> s, + boost::shared_ptr<ReceiverContext> r +) : connection(c), session(s), receiver(r) {} + + +bool ReceiverHandle::get(qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + return connection->get(session, receiver, message, timeout); +} + +qpid::messaging::Message ReceiverHandle::get(qpid::messaging::Duration timeout) +{ + qpid::messaging::Message result; + if (!get(result, timeout)) throw qpid::messaging::NoMessageAvailable(); + return result; +} + +bool ReceiverHandle::fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + return connection->fetch(session, receiver, message, timeout); +} + +qpid::messaging::Message ReceiverHandle::fetch(qpid::messaging::Duration timeout) +{ + qpid::messaging::Message result; + if (!fetch(result, timeout)) throw qpid::messaging::NoMessageAvailable(); + return result; +} + +void ReceiverHandle::setCapacity(uint32_t capacity) +{ + connection->setCapacity(receiver, capacity); +} + +uint32_t ReceiverHandle::getCapacity() +{ + return connection->getCapacity(receiver); +} + +uint32_t ReceiverHandle::getAvailable() +{ + return connection->getAvailable(receiver); +} + +uint32_t ReceiverHandle::getUnsettled() +{ + return connection->getUnsettled(receiver); +} + +void ReceiverHandle::close() +{ + session->closeReceiver(getName()); +} + +const std::string& ReceiverHandle::getName() const +{ + return receiver->getName(); +} + +qpid::messaging::Session ReceiverHandle::getSession() const +{ + //create new SessionHandle instance; i.e. create new handle that shares the same context + return qpid::messaging::Session(new SessionHandle(connection, session)); +} + +bool ReceiverHandle::isClosed() const +{ + return receiver->isClosed(); +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h new file mode 100644 index 0000000000..a1a6f26025 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h @@ -0,0 +1,63 @@ +#ifndef QPID_MESSAGING_AMQP_RECEIVERHANDLE_H +#define QPID_MESSAGING_AMQP_RECEIVERHANDLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/shared_ptr.hpp> +#include "qpid/messaging/ReceiverImpl.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +class ConnectionContext; +class SessionContext; +class ReceiverContext; +/** + * + */ +class ReceiverHandle : public qpid::messaging::ReceiverImpl +{ + public: + ReceiverHandle(boost::shared_ptr<ConnectionContext>, + boost::shared_ptr<SessionContext>, + boost::shared_ptr<ReceiverContext> + ); + bool get(Message& message, qpid::messaging::Duration timeout); + qpid::messaging::Message get(qpid::messaging::Duration timeout); + bool fetch(Message& message, qpid::messaging::Duration timeout); + qpid::messaging::Message fetch(qpid::messaging::Duration timeout); + void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t getAvailable(); + uint32_t getUnsettled(); + void close(); + const std::string& getName() const; + qpid::messaging::Session getSession() const; + bool isClosed() const; + private: + boost::shared_ptr<ConnectionContext> connection; + boost::shared_ptr<SessionContext> session; + boost::shared_ptr<ReceiverContext> receiver; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_RECEIVERHANDLE_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp new file mode 100644 index 0000000000..521538952d --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SenderContext.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Message.h" +#include "qpid/log/Statement.h" +extern "C" { +#include "proton/engine.h" +#include "proton/message.h" +} + +namespace qpid { +namespace messaging { +namespace amqp { +//TODO: proper conversion to wide string for address +SenderContext::SenderContext(pn_session_t* session, const std::string& n, const std::string& t) + : name(n), + target(t.begin(), t.end()), + sender(pn_sender(session, target.c_str())), capacity(1000) {} + +SenderContext::~SenderContext() +{ + pn_link_destroy(sender); +} + +void SenderContext::close() +{ + +} + +void SenderContext::setCapacity(uint32_t c) +{ + if (c < deliveries.size()) throw qpid::messaging::SenderError("Desired capacity is less than unsettled message count!"); + capacity = c; +} + +uint32_t SenderContext::getCapacity() +{ + return capacity; +} + +uint32_t SenderContext::getUnsettled() +{ + return processUnsettled(); +} + +const std::string& SenderContext::getName() const +{ + return name; +} + +const std::wstring& SenderContext::getTarget() const +{ + return target; +} + +SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message) +{ + if (processUnsettled() < capacity) { + deliveries.push_back(Delivery(nextId++, message)); + Delivery& delivery = deliveries.back(); + delivery.send(sender); + return &delivery; + } else { + return 0; + } +} + +uint32_t SenderContext::processUnsettled() +{ + //remove accepted messages from front of deque + while (!deliveries.empty() && deliveries.front().accepted()) { + deliveries.pop_front(); + } + return deliveries.size(); +} + +SenderContext::Delivery::Delivery(int32_t i, const qpid::messaging::Message& msg) : + id(i), token(0) +{ + data.resize(msg.getContentSize() + 15); + //TODO: full message encoding including headers etc + pn_message_data(&data[0], data.size(), msg.getContentPtr(), msg.getContentSize()); +} +void SenderContext::Delivery::send(pn_link_t* sender) +{ + pn_delivery_tag_t tag; + tag.size = sizeof(int32_t); + tag.bytes = reinterpret_cast<char*>(&id); + token = pn_delivery(sender, tag); + pn_send(sender, &data[0], data.size()); +} +bool SenderContext::Delivery::accepted() +{ + return pn_remote_disp(token) == PN_ACCEPTED; +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h new file mode 100644 index 0000000000..7abdee311d --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -0,0 +1,82 @@ +#ifndef QPID_MESSAGING_AMQP_SENDERCONTEXT_H +#define QPID_MESSAGING_AMQP_SENDERCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <deque> +#include <string> +#include <vector> +#include "qpid/sys/IntegerTypes.h" + +struct pn_delivery_t; +struct pn_link_t; +struct pn_session_t; + +namespace qpid { +namespace messaging { + +class Message; + +namespace amqp { + +/** + * + */ +class SenderContext +{ + public: + class Delivery + { + public: + Delivery(int32_t id, const qpid::messaging::Message& message); + void send(pn_link_t*); + bool accepted(); + private: + int32_t id; + std::vector<char> data; + pn_delivery_t* token; + }; + + SenderContext(pn_session_t* session, const std::string& name, const std::string& target); + ~SenderContext(); + void close(); + void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t getUnsettled(); + const std::string& getName() const; + const std::wstring& getTarget() const; + Delivery* send(const qpid::messaging::Message& message); + private: + friend class ConnectionContext; + typedef std::deque<Delivery> Deliveries; + + const std::string name; + const std::wstring target; + pn_link_t* sender; + int32_t nextId; + Deliveries deliveries; + uint32_t capacity; + + uint32_t processUnsettled(); +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_SENDERCONTEXT_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp new file mode 100644 index 0000000000..b7168e5b31 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SenderHandle.h" +#include "ConnectionContext.h" +#include "SessionContext.h" +#include "SessionHandle.h" +#include "SenderContext.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Session.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +SenderHandle::SenderHandle(boost::shared_ptr<ConnectionContext> c, + boost::shared_ptr<SessionContext> s, + boost::shared_ptr<SenderContext> sndr +) : connection(c), session(s), sender(sndr) {} + +void SenderHandle::send(const Message& message, bool sync) +{ + connection->send(sender, message, sync); +} + +void SenderHandle::close() +{ + session->closeSender(getName()); +} + +void SenderHandle::setCapacity(uint32_t capacity) +{ + connection->setCapacity(sender, capacity); +} + +uint32_t SenderHandle::getCapacity() +{ + return connection->getCapacity(sender); +} + +uint32_t SenderHandle::getUnsettled() +{ + return connection->getUnsettled(sender); +} + +const std::string& SenderHandle::getName() const +{ + return sender->getName(); +} + +qpid::messaging::Session SenderHandle::getSession() const +{ + return qpid::messaging::Session(new SessionHandle(connection, session)); +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h new file mode 100644 index 0000000000..3c6b666582 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h @@ -0,0 +1,58 @@ +#ifndef QPID_MESSAGING_AMQP_SENDERHANDLE_H +#define QPID_MESSAGING_AMQP_SENDERHANDLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/shared_ptr.hpp> +#include "qpid/messaging/SenderImpl.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +class ConnectionContext; +class SessionContext; +class SenderContext; +/** + * + */ +class SenderHandle : public qpid::messaging::SenderImpl +{ + public: + SenderHandle(boost::shared_ptr<ConnectionContext> connection, + boost::shared_ptr<SessionContext> session, + boost::shared_ptr<SenderContext> sender + ); + void send(const Message& message, bool sync); + void close(); + void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t getUnsettled(); + const std::string& getName() const; + Session getSession() const; + private: + boost::shared_ptr<ConnectionContext> connection; + boost::shared_ptr<SessionContext> session; + boost::shared_ptr<SenderContext> sender; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_SENDERHANDLE_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp new file mode 100644 index 0000000000..f7bd9a5e7a --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionContext.h" +#include "SenderContext.h" +#include "ReceiverContext.h" +#include <boost/format.hpp> +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/log/Statement.h" +extern "C" { +#include "proton/engine.h" +} + +namespace qpid { +namespace messaging { +namespace amqp { + +SessionContext::SessionContext(pn_connection_t* connection) : session(pn_session(connection)) {} +SessionContext::~SessionContext() +{ + senders.clear(); receivers.clear(); + pn_session_destroy(session); +} + +boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address) +{ + std::string name = address.getName(); + + int count = 1; + for (SenderMap::const_iterator i = senders.find(name); i != senders.end(); i = senders.find(name)) { + name = (boost::format("%1%_%2%") % address.getName() % ++count).str(); + } + boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address.str())); + senders[name] = s; + return s; +} + +boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::messaging::Address& address) +{ + std::string name = address.getName(); + + int count = 1; + for (ReceiverMap::const_iterator i = receivers.find(name); i != receivers.end(); i = receivers.find(name)) { + name = (boost::format("%1%_%2%") % address.getName() % ++count).str(); + } + boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address.str())); + receivers[name] = r; + return r; +} + +boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& name) const +{ + SenderMap::const_iterator i = senders.find(name); + if (i == senders.end()) { + throw qpid::messaging::KeyError(std::string("No such sender") + name); + } else { + return i->second; + } +} + +boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string& name) const +{ + ReceiverMap::const_iterator i = receivers.find(name); + if (i == receivers.end()) { + throw qpid::messaging::KeyError(std::string("No such receiver") + name); + } else { + return i->second; + } +} + +void SessionContext::closeReceiver(const std::string&) +{ + +} + +void SessionContext::closeSender(const std::string&) +{ + +} + +boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver(qpid::messaging::Duration /*timeout*/) +{ + return boost::shared_ptr<ReceiverContext>(); +} + +uint32_t SessionContext::getReceivable() +{ + return 0;//TODO +} + +uint32_t SessionContext::getUnsettledAcks() +{ + return 0;//TODO +} + +qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) +{ + qpid::framing::SequenceNumber id = next++; + unacked[id] = delivery; + QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery); + return id; +} + +void SessionContext::acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end) +{ + for (DeliveryMap::iterator i = begin; i != end; ++i) { + QPID_LOG(debug, "Setting disposition for delivery " << i->first << " -> " << i->second); + pn_disposition(i->second, PN_ACCEPTED); + pn_settle(i->second);//TODO: different settlement modes? + } + unacked.erase(begin, end); +} + +void SessionContext::acknowledge() +{ + acknowledge(unacked.begin(), unacked.end()); +} + +void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool cumulative) +{ + DeliveryMap::iterator i = unacked.find(id); + if (i != unacked.end()) { + acknowledge(cumulative ? unacked.begin() : i, ++i); + } +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h new file mode 100644 index 0000000000..fbc8731230 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -0,0 +1,80 @@ +#ifndef QPID_MESSAGING_AMQP_SESSIONCONTEXT_H +#define QPID_MESSAGING_AMQP_SESSIONCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <map> +#include <string> +#include <boost/shared_ptr.hpp> +#include "qpid/sys/IntegerTypes.h" +#include "qpid/framing/SequenceNumber.h" + +struct pn_connection_t; +struct pn_session_t; +struct pn_delivery_t; + +namespace qpid { +namespace messaging { + +class Address; +class Duration; + +namespace amqp { + +class ConnectionContext; +class SenderContext; +class ReceiverContext; +/** + * + */ +class SessionContext +{ + public: + SessionContext(pn_connection_t*); + ~SessionContext(); + boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address); + boost::shared_ptr<ReceiverContext> createReceiver(const qpid::messaging::Address& address); + boost::shared_ptr<SenderContext> getSender(const std::string& name) const; + boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const; + void closeReceiver(const std::string&); + void closeSender(const std::string&); + boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout); + uint32_t getReceivable(); + uint32_t getUnsettledAcks(); + private: + friend class ConnectionContext; + typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap; + typedef std::map<std::string, boost::shared_ptr<ReceiverContext> > ReceiverMap; + typedef std::map<qpid::framing::SequenceNumber, pn_delivery_t*> DeliveryMap; + pn_session_t* session; + SenderMap senders; + ReceiverMap receivers; + DeliveryMap unacked; + qpid::framing::SequenceNumber next; + + qpid::framing::SequenceNumber record(pn_delivery_t*); + void acknowledge(); + void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative); + void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end); +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_SESSIONCONTEXT_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp new file mode 100644 index 0000000000..bf79771ca4 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp @@ -0,0 +1,148 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionHandle.h" +#include "ConnectionContext.h" +#include "ConnectionHandle.h" +#include "ReceiverContext.h" +#include "ReceiverHandle.h" +#include "SenderContext.h" +#include "SenderHandle.h" +#include "SessionContext.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Session.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +SessionHandle::SessionHandle(boost::shared_ptr<ConnectionContext> c, boost::shared_ptr<SessionContext> s) : connection(c), session(s) {} + +void SessionHandle::commit() +{ + +} + +void SessionHandle::rollback() +{ + +} + +void SessionHandle::acknowledge(bool /*sync*/) +{ + connection->acknowledge(session, 0, false); +} + +void SessionHandle::acknowledge(qpid::messaging::Message& msg, bool cumulative) +{ + //TODO: handle cumulative + connection->acknowledge(session, &msg, cumulative); +} + +void SessionHandle::reject(qpid::messaging::Message&) +{ + +} + +void SessionHandle::release(qpid::messaging::Message&) +{ + +} + +void SessionHandle::close() +{ + connection->endSession(session); +} + +void SessionHandle::sync(bool /*block*/) +{ + +} + +qpid::messaging::Sender SessionHandle::createSender(const qpid::messaging::Address& address) +{ + boost::shared_ptr<SenderContext> sender = session->createSender(address); + connection->attach(session, sender); + return qpid::messaging::Sender(new SenderHandle(connection, session, sender)); +} + +qpid::messaging::Receiver SessionHandle::createReceiver(const qpid::messaging::Address& address) +{ + boost::shared_ptr<ReceiverContext> receiver = session->createReceiver(address); + connection->attach(session, receiver); + return qpid::messaging::Receiver(new ReceiverHandle(connection, session, receiver)); +} + +bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout) +{ + boost::shared_ptr<ReceiverContext> r = session->nextReceiver(timeout); + if (r) { + //TODO: cache handles in this case to avoid frequent allocation + receiver = qpid::messaging::Receiver(new ReceiverHandle(connection, session, r)); + return true; + } else { + return false; + } +} + +qpid::messaging::Receiver SessionHandle::nextReceiver(Duration timeout) +{ + qpid::messaging::Receiver r; + if (nextReceiver(r, timeout)) return r; + else throw qpid::messaging::NoMessageAvailable(); +} + +uint32_t SessionHandle::getReceivable() +{ + return session->getReceivable(); +} + +uint32_t SessionHandle::getUnsettledAcks() +{ + return session->getUnsettledAcks(); +} + +Sender SessionHandle::getSender(const std::string& name) const +{ + return qpid::messaging::Sender(new SenderHandle(connection, session, session->getSender(name))); +} + +Receiver SessionHandle::getReceiver(const std::string& name) const +{ + return qpid::messaging::Receiver(new ReceiverHandle(connection, session, session->getReceiver(name))); +} + +Connection SessionHandle::getConnection() const +{ + return qpid::messaging::Connection(new ConnectionHandle(connection)); +} + +void SessionHandle::checkError() +{ + +} + + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.h b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.h new file mode 100644 index 0000000000..5e843aaacc --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.h @@ -0,0 +1,64 @@ +#ifndef QPID_MESSAGING_AMQP_SESSIONIMPL_H +#define QPID_MESSAGING_AMQP_SESSIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/shared_ptr.hpp> +#include "qpid/messaging/SessionImpl.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +class ConnectionContext; +class SessionContext; +/** + * + */ +class SessionHandle : public qpid::messaging::SessionImpl +{ + public: + SessionHandle(boost::shared_ptr<ConnectionContext>, boost::shared_ptr<SessionContext>); + void commit(); + void rollback(); + void acknowledge(bool sync); + void acknowledge(Message&, bool); + void reject(Message&); + void release(Message&); + void close(); + void sync(bool block); + qpid::messaging::Sender createSender(const Address& address); + qpid::messaging::Receiver createReceiver(const Address& address); + bool nextReceiver(Receiver& receiver, Duration timeout); + qpid::messaging::Receiver nextReceiver(Duration timeout); + uint32_t getReceivable(); + uint32_t getUnsettledAcks(); + qpid::messaging::Sender getSender(const std::string& name) const; + qpid::messaging::Receiver getReceiver(const std::string& name) const; + qpid::messaging::Connection getConnection() const; + void checkError(); + private: + boost::shared_ptr<ConnectionContext> connection; + boost::shared_ptr<SessionContext> session; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_SESSIONIMPL_H*/ diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in index 5c07bcdc2e..9acdb77aa3 100644 --- a/qpid/cpp/src/tests/test_env.sh.in +++ b/qpid/cpp/src/tests/test_env.sh.in @@ -57,6 +57,7 @@ export SENDER_EXEC=$QPID_TEST_EXEC_DIR/sender # Path export PATH=$top_builddir/src:$builddir:$srcdir:$PYTHON_COMMANDS:$QPID_TEST_EXEC_DIR:$PYTHON_DIR/commands:$PATH +export LD_LIBRARY_PATH=@PROTON_LIB@ # Modules export TEST_STORE_LIB=$testmoduledir/test_store.so |