diff options
Diffstat (limited to 'qpid/cpp/src/qpid/messaging/amqp')
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 244 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h | 33 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/PnData.cpp | 84 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/PnData.h | 29 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp | 61 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SenderContext.h | 54 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp | 69 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SessionContext.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp | 155 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/Transaction.h | 95 |
14 files changed, 665 insertions, 199 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 7f19ca7ec0..2106e21686 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -510,9 +510,9 @@ void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode) requested.erase(j->first); } } else if (key == AUTO_DELETE) { - PnData(data).read(v); + PnData(data).get(v); isAutoDeleted = v.asBool(); - } else if (j != requested.end() && (PnData(data).read(v) && v.asString() == j->second.asString())) { + } else if (j != requested.end() && (PnData(data).get(v) && v.asString() == j->second.asString())) { requested.erase(j->first); } } @@ -646,7 +646,7 @@ void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMod } else { pn_data_put_ulong(filter, i->descriptorCode); } - PnData(filter).write(i->value); + PnData(filter).put(i->value); pn_data_exit(filter); } pn_data_exit(filter); @@ -733,7 +733,7 @@ void AddressHelper::setNodeProperties(pn_terminus_t* terminus) putLifetimePolicy(data, toLifetimePolicy(i->second.asString())); } else { pn_data_put_symbol(data, convert(i->first)); - PnData(data).write(i->second); + PnData(data).put(i->second); } } pn_data_exit(data); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index a0b16c2b4c..d4a7b60e3c 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -25,8 +25,11 @@ #include "Sasl.h" #include "SenderContext.h" #include "SessionContext.h" +#include "Transaction.h" #include "Transport.h" #include "qpid/amqp/descriptors.h" +#include "qpid/amqp/Encoder.h" +#include "qpid/amqp/Descriptor.h" #include "qpid/messaging/exceptions.h" #include "qpid/messaging/AddressImpl.h" #include "qpid/messaging/Duration.h" @@ -43,6 +46,7 @@ #include "qpid/sys/urlAdd.h" #include "config.h" #include <boost/lexical_cast.hpp> +#include <boost/bind.hpp> #include <vector> extern "C" { #include <proton/engine.h> @@ -151,20 +155,23 @@ ConnectionContext::~ConnectionContext() if (ticker) ticker->cancel(); close(); sessions.clear(); - pn_transport_free(engine); pn_connection_free(connection); + pn_transport_free(engine); } bool ConnectionContext::isOpen() const { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return state == CONNECTED && pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); } void ConnectionContext::sync(boost::shared_ptr<SessionContext> ssn) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); - //wait for outstanding sends to settle + sys::Monitor::ScopedLock l(lock); + syncLH(ssn, l); +} + +void ConnectionContext::syncLH(boost::shared_ptr<SessionContext> ssn, sys::Monitor::ScopedLock&) { while (!ssn->settled()) { QPID_LOG(debug, "Waiting for sends to settle on sync()"); wait(ssn);//wait until message has been confirmed @@ -175,18 +182,13 @@ void ConnectionContext::sync(boost::shared_ptr<SessionContext> ssn) void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) { //explicitly release messages that have yet to be fetched for (SessionContext::ReceiverMap::iterator i = ssn->receivers.begin(); i != ssn->receivers.end(); ++i) { drain_and_release_messages(ssn, i->second); } - //wait for outstanding sends to settle - while (!ssn->settled()) { - QPID_LOG(debug, "Waiting for sends to settle before closing"); - wait(ssn);//wait until message has been confirmed - wakeupDriver(); - } + syncLH(ssn, l); } if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) { @@ -199,17 +201,11 @@ void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) void ConnectionContext::close() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (state != CONNECTED) return; if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) { for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { - //wait for outstanding sends to settle - while (!i->second->settled()) { - QPID_LOG(debug, "Waiting for sends to settle before closing"); - wait(i->second);//wait until message has been confirmed - } - - + syncLH(i->second, l); if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) { pn_session_close(i->second->session); } @@ -246,7 +242,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar */ qpid::sys::AtomicCount::ScopedIncrement track(lnk->fetching); { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); checkClosed(ssn, lnk); if (!lnk->capacity) { pn_link_flow(lnk->receiver, 1); @@ -257,10 +253,10 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar return true; } else { { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); pn_link_drain(lnk->receiver, 0); wakeupDriver(); - while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) { + while (pn_link_draining(lnk->receiver) && !pn_link_queued(lnk->receiver)) { QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver)); wait(ssn, lnk); } @@ -269,7 +265,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar } } if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (lnk->capacity) { pn_link_flow(lnk->receiver, 1); wakeupDriver(); @@ -296,7 +292,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared { qpid::sys::AbsTime until(convert(timeout)); while (true) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); checkClosed(ssn, lnk); pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver); QPID_LOG(debug, "In ConnectionContext::get(), current=" << current); @@ -320,6 +316,9 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared haveOutput = true; } } + // Automatically ack messages if we are in a transaction. + if (ssn->transaction) + acknowledgeLH(ssn, &message, false, l); return true; } else if (until > qpid::sys::now()) { waitUntil(ssn, lnk, until); @@ -334,7 +333,7 @@ boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared { qpid::sys::AbsTime until(convert(timeout)); while (true) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); checkClosed(ssn); boost::shared_ptr<ReceiverContext> r = ssn->nextReceiver(); if (r) { @@ -347,9 +346,13 @@ boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared } } -void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) +void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) { + sys::Monitor::ScopedLock l(lock); + acknowledgeLH(ssn, message, cumulative, l); +} + +void ConnectionContext::acknowledgeLH(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); checkClosed(ssn); if (message) { ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative); @@ -361,7 +364,7 @@ void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid: void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); checkClosed(ssn); ssn->nack(MessageImplAccess::get(message).getInternalId(), reject); wakeupDriver(); @@ -369,7 +372,7 @@ void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messag void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (pn_link_state(lnk->sender) & PN_LOCAL_ACTIVE) { lnk->close(); } @@ -401,7 +404,7 @@ void ConnectionContext::drain_and_release_messages(boost::shared_ptr<SessionCont void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); drain_and_release_messages(ssn, lnk); if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) { lnk->close(); @@ -415,7 +418,7 @@ void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::sha void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); lnk->configure(); attach(ssn, lnk->sender); checkClosed(ssn, lnk); @@ -425,7 +428,7 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); lnk->configure(); attach(ssn, lnk->receiver, lnk->capacity); checkClosed(ssn, lnk); @@ -445,11 +448,26 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, pn_link_t* } } -void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync) +void ConnectionContext::send( + boost::shared_ptr<SessionContext> ssn, + boost::shared_ptr<SenderContext> snd, + const qpid::messaging::Message& message, + bool sync, + SenderContext::Delivery** delivery) +{ + sys::Monitor::ScopedLock l(lock); + sendLH(ssn, snd, message, sync, delivery, l); +} + +void ConnectionContext::sendLH( + boost::shared_ptr<SessionContext> ssn, + boost::shared_ptr<SenderContext> snd, + const qpid::messaging::Message& message, + bool sync, + SenderContext::Delivery** delivery, + sys::Monitor::ScopedLock&) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); checkClosed(ssn); - SenderContext::Delivery* delivery(0); while (pn_transport_pending(engine) > 65536) { QPID_LOG(debug, "Have " << pn_transport_pending(engine) << " bytes of output pending; waiting for this to be written..."); notifyOnWrite = true; @@ -457,17 +475,17 @@ void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::share wait(ssn, snd); notifyOnWrite = false; } - while (!snd->send(message, &delivery)) { + while (!snd->send(message, delivery)) { QPID_LOG(debug, "Waiting for capacity..."); wait(ssn, snd);//wait for capacity } wakeupDriver(); - if (sync && delivery) { - while (!delivery->delivered()) { + if (sync && *delivery) { + while (!(*delivery)->delivered()) { QPID_LOG(debug, "Waiting for confirmation..."); wait(ssn, snd);//wait until message has been confirmed } - if (delivery->rejected()) { + if ((*delivery)->rejected()) { throw MessageRejected("Message was rejected by peer"); } @@ -476,46 +494,46 @@ void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::share void ConnectionContext::setCapacity(boost::shared_ptr<SenderContext> sender, uint32_t capacity) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); sender->setCapacity(capacity); } uint32_t ConnectionContext::getCapacity(boost::shared_ptr<SenderContext> sender) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return sender->getCapacity(); } uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<SenderContext> sender) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return sender->getUnsettled(); } void ConnectionContext::setCapacity(boost::shared_ptr<ReceiverContext> receiver, uint32_t capacity) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); receiver->setCapacity(capacity); pn_link_flow((pn_link_t*) receiver->receiver, receiver->getCapacity()); wakeupDriver(); } uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return receiver->getCapacity(); } uint32_t ConnectionContext::getAvailable(boost::shared_ptr<ReceiverContext> receiver) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return receiver->getAvailable(); } uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> receiver) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return receiver->getUnsettled(); } void ConnectionContext::activateOutput() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (state == CONNECTED) wakeupDriver(); } /** @@ -543,8 +561,8 @@ pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED; void ConnectionContext::reset() { - pn_transport_free(engine); pn_connection_free(connection); + pn_transport_free(engine); engine = pn_transport(); connection = pn_connection(); @@ -555,7 +573,7 @@ void ConnectionContext::reset() } } -void ConnectionContext::check() { +bool ConnectionContext::check() { if (checkDisconnected()) { if (ConnectionOptions::reconnect) { QPID_LOG(notice, "Auto-reconnecting to " << fullUrl); @@ -564,7 +582,9 @@ void ConnectionContext::check() { } else { throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)"); } + return true; } + return false; } bool ConnectionContext::checkDisconnected() { @@ -588,7 +608,7 @@ bool ConnectionContext::checkDisconnected() { void ConnectionContext::wait() { - check(); + if (check()) return; // Reconnected, may need to re-test condition. lock.wait(); check(); } @@ -630,6 +650,7 @@ void ConnectionContext::waitUntil(boost::shared_ptr<SessionContext> ssn, boost:: void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn) { check(); + ssn->error.raise(); if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { pn_condition_t* error = pn_session_remote_condition(ssn->session); std::stringstream text; @@ -690,6 +711,7 @@ void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_li void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s) { + if (s->error) return; pn_session_open(s->session); wakeupDriver(); while (pn_session_state(s->session) & PN_REMOTE_UNINIT) { @@ -718,26 +740,31 @@ void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s) boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); - if (transactional) throw qpid::messaging::MessagingException("Transactions not yet supported"); + boost::shared_ptr<SessionContext> session; std::string name = n.empty() ? qpid::framing::Uuid(true).str() : n; - SessionMap::const_iterator i = sessions.find(name); - if (i == sessions.end()) { - boost::shared_ptr<SessionContext> s(new SessionContext(connection)); - s->setName(name); - s->session = pn_session(connection); - pn_session_open(s->session); - wakeupDriver(); - while (pn_session_state(s->session) & PN_REMOTE_UNINIT) { - wait(); + { + sys::Monitor::ScopedLock l(lock); + SessionMap::const_iterator i = sessions.find(name); + if (i == sessions.end()) { + session = boost::shared_ptr<SessionContext>(new SessionContext(connection)); + session->setName(name); + pn_session_open(session->session); + wakeupDriver(); + sessions[name] = session; // Add it now so it will be restarted if we reconnect in wait() + while (pn_session_state(session->session) & PN_REMOTE_UNINIT) { + wait(); + } + } else { + throw qpid::messaging::KeyError(std::string("Session already exists: ") + name); } - sessions[name] = s; - return s; - } else { - throw qpid::messaging::KeyError(std::string("Session already exists: ") + name); - } + } + if (transactional) { // Outside of lock + startTxSession(session); + } + return session; } + boost::shared_ptr<SessionContext> ConnectionContext::getSession(const std::string& name) const { SessionMap::const_iterator i = sessions.find(name); @@ -760,7 +787,7 @@ std::string ConnectionContext::getAuthenticatedUsername() std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); QPID_LOG(trace, id << " decode(" << size << ")"); if (readHeader) { size_t decoded = readProtocolHeader(buffer, size); @@ -805,7 +832,7 @@ std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size) } std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); QPID_LOG(trace, id << " encode(" << size << ")"); if (writeHeader) { size_t encoded = writeProtocolHeader(buffer, size); @@ -843,19 +870,19 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size) } bool ConnectionContext::canEncodePlain() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC); return haveOutput && state == CONNECTED; } void ConnectionContext::closed() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); state = DISCONNECTED; lock.notifyAll(); } void ConnectionContext::opened() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); state = CONNECTED; lock.notifyAll(); } @@ -921,7 +948,7 @@ const qpid::messaging::ConnectionOptions* ConnectionContext::getOptions() std::size_t ConnectionContext::decode(const char* buffer, std::size_t size) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); size_t decoded = 0; try { if (sasl.get() && !sasl->authenticated()) { @@ -939,7 +966,7 @@ std::size_t ConnectionContext::decode(const char* buffer, std::size_t size) } std::size_t ConnectionContext::encode(char* buffer, std::size_t size) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); size_t encoded = 0; try { if (sasl.get() && sasl->canEncode()) { @@ -957,7 +984,7 @@ std::size_t ConnectionContext::encode(char* buffer, std::size_t size) } bool ConnectionContext::canEncode() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (sasl.get()) { try { if (sasl->canEncode()) return true; @@ -978,26 +1005,21 @@ const std::string CLIENT_PPID("qpid.client_ppid"); } void ConnectionContext::setProperties() { - pn_data_t* data = pn_connection_properties(connection); - pn_data_put_map(data); - pn_data_enter(data); - - pn_data_put_symbol(data, PnData::str(CLIENT_PROCESS_NAME)); - std::string processName = sys::SystemInfo::getProcessName(); - pn_data_put_string(data, PnData::str(processName)); - - pn_data_put_symbol(data, PnData::str(CLIENT_PID)); - pn_data_put_int(data, sys::SystemInfo::getProcessId()); - - pn_data_put_symbol(data, PnData::str(CLIENT_PPID)); - pn_data_put_int(data, sys::SystemInfo::getParentProcessId()); - + PnData data(pn_connection_properties(connection)); + pn_data_put_map(data.data); + pn_data_enter(data.data); + data.putSymbol(CLIENT_PROCESS_NAME); + data.putSymbol(sys::SystemInfo::getProcessName()); + data.putSymbol(CLIENT_PID); + data.put(int32_t(sys::SystemInfo::getProcessId())); + data.putSymbol(CLIENT_PPID); + data.put(int32_t(sys::SystemInfo::getParentProcessId())); for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { - pn_data_put_symbol(data, PnData::str(i->first)); - PnData(data).write(i->second); + data.putSymbol(i->first); + data.put(i->second); } - pn_data_exit(data); + pn_data_exit(data.data); } const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettings() @@ -1007,7 +1029,7 @@ const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettin void ConnectionContext::open() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); if (!driver) driver = DriverImpl::getDefault(); QPID_LOG(info, "Starting connection to " << fullUrl); @@ -1049,7 +1071,7 @@ void ConnectionContext::autoconnect() void ConnectionContext::reconnect(const Url& url) { QPID_LOG(notice, "Reconnecting to " << url); - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); if (!driver) driver = DriverImpl::getDefault(); reset(); @@ -1137,7 +1159,7 @@ bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) { std::string ConnectionContext::getUrl() const { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return (state == CONNECTED) ? currentUrl.str() : std::string(); } @@ -1209,6 +1231,40 @@ bool ConnectionContext::CodecAdapter::canEncode() return context.canEncodePlain(); } +void ConnectionContext::startTxSession(boost::shared_ptr<SessionContext> session) { + try { + QPID_LOG(debug, id << " attaching transaction for " << session->getName()); + boost::shared_ptr<Transaction> tx(new Transaction(session->session)); + session->transaction = tx; + attach(session, tx); + tx->declare(boost::bind(&ConnectionContext::send, this, _1, _2, _3, _4, _5), session); + } catch (const Exception& e) { + throw TransactionError(Msg() << "Cannot start transaction: " << e.what()); + } +} + +void ConnectionContext::discharge(boost::shared_ptr<SessionContext> session, bool fail) { + { + sys::Monitor::ScopedLock l(lock); + checkClosed(session); + if (!session->transaction) + throw TransactionError("No Transaction"); + Transaction::SendFunction sendFn = boost::bind( + &ConnectionContext::sendLH, this, _1, _2, _3, _4, _5, boost::ref(l)); + syncLH(session, boost::ref(l)); // Sync to make sure all tx transfers have been received. + session->transaction->discharge(sendFn, session, fail); + session->transaction->declare(sendFn, session); + } +} + +void ConnectionContext::commit(boost::shared_ptr<SessionContext> session) { + discharge(session, false); +} + +void ConnectionContext::rollback(boost::shared_ptr<SessionContext> session) { + discharge(session, true); +} + // setup the transport and connection objects: void ConnectionContext::configureConnection() diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 80da9dff10..b687219624 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -34,6 +34,7 @@ #include "qpid/sys/Monitor.h" #include "qpid/types/Variant.h" #include "qpid/messaging/amqp/TransportContext.h" +#include "SenderContext.h" struct pn_connection_t; struct pn_link_t; @@ -59,7 +60,6 @@ class DriverImpl; class ReceiverContext; class Sasl; class SessionContext; -class SenderContext; class Transport; /** @@ -82,10 +82,20 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); void drain_and_release_messages(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); bool isClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); - void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync); + + // Link operations + void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, + const qpid::messaging::Message& message, bool sync, + SenderContext::Delivery** delivery); + bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + + // Session operations void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative); + void commit(boost::shared_ptr<SessionContext> ssn); + void rollback(boost::shared_ptr<SessionContext> ssn); + void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject); void sync(boost::shared_ptr<SessionContext> ssn); boost::shared_ptr<ReceiverContext> nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout); @@ -93,10 +103,10 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void setOption(const std::string& name, const qpid::types::Variant& value); std::string getAuthenticatedUsername(); + // Link operations void setCapacity(boost::shared_ptr<SenderContext>, uint32_t); uint32_t getCapacity(boost::shared_ptr<SenderContext>); uint32_t getUnsettled(boost::shared_ptr<SenderContext>); - void setCapacity(boost::shared_ptr<ReceiverContext>, uint32_t); uint32_t getCapacity(boost::shared_ptr<ReceiverContext>); uint32_t getAvailable(boost::shared_ptr<ReceiverContext>); @@ -159,9 +169,12 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag bool notifyOnWrite; boost::intrusive_ptr<qpid::sys::TimerTask> ticker; - void check(); + bool check(); bool checkDisconnected(); void waitNoReconnect(); + + // NOTE: All wait*() functions must be called in a loop that checks for the + // waited condition with the lock held. void wait(); void waitUntil(qpid::sys::AbsTime until); void wait(boost::shared_ptr<SessionContext>); @@ -170,10 +183,12 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); void waitUntil(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>, qpid::sys::AbsTime until); void waitUntil(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>, qpid::sys::AbsTime until); + void checkClosed(boost::shared_ptr<SessionContext>); void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*); + void wakeupDriver(); void attach(boost::shared_ptr<SessionContext>, pn_link_t*, int credit=0); void autoconnect(); @@ -194,8 +209,18 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag std::string getError(); bool useSasl(); void setProperties(); + void configureConnection(); bool checkTransportError(std::string&); + + void discharge(boost::shared_ptr<SessionContext>, bool fail); + void startTxSession(boost::shared_ptr<SessionContext>); + + void syncLH(boost::shared_ptr<SessionContext> ssn, sys::Monitor::ScopedLock&); + void sendLH(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, + const qpid::messaging::Message& message, bool sync, + SenderContext::Delivery** delivery, sys::Monitor::ScopedLock&); + void acknowledgeLH(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&); }; }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp b/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp index 5c57c5b0a3..3309d1a683 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp @@ -20,34 +20,53 @@ */ #include "PnData.h" #include "qpid/types/encodings.h" +#include "qpid/log/Statement.h" namespace qpid { namespace messaging { namespace amqp { using types::Variant; +using namespace types::encodings; -void PnData::write(const Variant::Map& map) +// TODO aconway 2014-11-20: PnData duplicates functionality of qpid::amqp::Encoder,Decoder. +// Collapse them all into a single proton-based codec. + +void PnData::put(const Variant::Map& map) { pn_data_put_map(data); pn_data_enter(data); for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) { - pn_data_put_string(data, str(i->first)); - write(i->second); + pn_data_put_string(data, bytes(i->first)); + put(i->second); } pn_data_exit(data); } -void PnData::write(const Variant::List& list) + +void PnData::put(const Variant::List& list) { pn_data_put_list(data); pn_data_enter(data); for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { - write(*i); + put(*i); } pn_data_exit(data); } -void PnData::write(const Variant& value) + +void PnData::put(const Variant& value) { + // Open data descriptors associated with the value. + const Variant::List& descriptors = value.getDescriptors(); + for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) { + pn_data_put_described(data); + pn_data_enter(data); + if (i->getType() == types::VAR_STRING) + pn_data_put_symbol(data, bytes(i->asString())); + else + pn_data_put_ulong(data, i->asUint64()); + } + + // Put the variant value switch (value.getType()) { case qpid::types::VAR_VOID: pn_data_put_null(data); @@ -65,61 +84,70 @@ void PnData::write(const Variant& value) pn_data_put_double(data, value.asDouble()); break; case qpid::types::VAR_STRING: - pn_data_put_string(data, str(value.asString())); + if (value.getEncoding() == ASCII) + pn_data_put_symbol(data, bytes(value.asString())); + else if (value.getEncoding() == BINARY) + pn_data_put_binary(data, bytes(value.asString())); + else + pn_data_put_string(data, bytes(value.asString())); break; case qpid::types::VAR_MAP: - write(value.asMap()); + put(value.asMap()); break; case qpid::types::VAR_LIST: - write(value.asList()); + put(value.asList()); break; default: break; } + + // Close any descriptors. + for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) + pn_data_exit(data); } -bool PnData::read(qpid::types::Variant& value) +bool PnData::get(qpid::types::Variant& value) { - return read(pn_data_type(data), value); + return get(pn_data_type(data), value); } -void PnData::readList(qpid::types::Variant::List& value) +void PnData::getList(qpid::types::Variant::List& value) { size_t count = pn_data_get_list(data); pn_data_enter(data); for (size_t i = 0; i < count && pn_data_next(data); ++i) { qpid::types::Variant e; - if (read(e)) value.push_back(e); + if (get(e)) value.push_back(e); } pn_data_exit(data); } -void PnData::readMap(qpid::types::Variant::Map& value) +void PnData::getMap(qpid::types::Variant::Map& value) { size_t count = pn_data_get_list(data); pn_data_enter(data); for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) { - std::string key = str(pn_data_get_symbol(data)); + std::string key = string(pn_data_get_symbol(data)); pn_data_next(data); qpid::types::Variant e; - if (read(e)) value[key]= e; + if (get(e)) value[key]= e; } pn_data_exit(data); } -void PnData::readArray(qpid::types::Variant::List& value) +void PnData::getArray(qpid::types::Variant::List& value) { size_t count = pn_data_get_array(data); pn_type_t type = pn_data_get_array_type(data); pn_data_enter(data); for (size_t i = 0; i < count && pn_data_next(data); ++i) { qpid::types::Variant e; - if (read(type, e)) value.push_back(e); + if (get(type, e)) value.push_back(e); } pn_data_exit(data); } -bool PnData::read(pn_type_t type, qpid::types::Variant& value) +bool PnData::get(pn_type_t type, qpid::types::Variant& value) { switch (type) { case PN_NULL: @@ -168,41 +196,41 @@ bool PnData::read(pn_type_t type, qpid::types::Variant& value) value = qpid::types::Uuid(pn_data_get_uuid(data).bytes); return true; case PN_BINARY: - value = str(pn_data_get_binary(data)); + value = string(pn_data_get_binary(data)); value.setEncoding(qpid::types::encodings::BINARY); return true; case PN_STRING: - value = str(pn_data_get_string(data)); + value = string(pn_data_get_string(data)); value.setEncoding(qpid::types::encodings::UTF8); return true; case PN_SYMBOL: - value = str(pn_data_get_string(data)); + value = string(pn_data_get_string(data)); value.setEncoding(qpid::types::encodings::ASCII); return true; case PN_LIST: value = qpid::types::Variant::List(); - readList(value.asList()); + getList(value.asList()); return true; break; case PN_MAP: value = qpid::types::Variant::Map(); - readMap(value.asMap()); + getMap(value.asMap()); return true; case PN_ARRAY: value = qpid::types::Variant::List(); - readArray(value.asList()); + getArray(value.asList()); return true; case PN_DESCRIBED: + // TODO aconway 2014-11-20: get described values. case PN_DECIMAL32: case PN_DECIMAL64: case PN_DECIMAL128: default: return false; } - } -pn_bytes_t PnData::str(const std::string& s) +pn_bytes_t PnData::bytes(const std::string& s) { pn_bytes_t result; result.start = const_cast<char*>(s.data()); @@ -210,7 +238,7 @@ pn_bytes_t PnData::str(const std::string& s) return result; } -std::string PnData::str(const pn_bytes_t& in) +std::string PnData::string(const pn_bytes_t& in) { return std::string(in.start, in.size); } diff --git a/qpid/cpp/src/qpid/messaging/amqp/PnData.h b/qpid/cpp/src/qpid/messaging/amqp/PnData.h index 6d03235432..b0119f88fd 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/PnData.h +++ b/qpid/cpp/src/qpid/messaging/amqp/PnData.h @@ -32,28 +32,29 @@ namespace messaging { namespace amqp { /** - * Helper class to read/write messaging types to/from pn_data_t. + * Helper class to put/get messaging types to/from pn_data_t. */ class PnData { public: - PnData(pn_data_t* d) : data(d) {} + pn_data_t* data; - void write(const types::Variant& value); - void write(const types::Variant::Map& map); - void write(const types::Variant::List& list); + PnData(pn_data_t* d) : data(d) {} - bool read(pn_type_t type, types::Variant& value); - bool read(types::Variant& value); - void readList(types::Variant::List& value); - void readMap(types::Variant::Map& value); - void readArray(types::Variant::List& value); + void put(const types::Variant& value); + void put(const types::Variant::Map& map); + void put(const types::Variant::List& list); + void put(int32_t n) { pn_data_put_int(data, n); } + void putSymbol(const std::string& symbol) { pn_data_put_symbol(data, bytes(symbol)); } - static pn_bytes_t str(const std::string&); - static std::string str(const pn_bytes_t&); + bool get(pn_type_t type, types::Variant& value); + bool get(types::Variant& value); + void getList(types::Variant::List& value); + void getMap(types::Variant::Map& value); + void getArray(types::Variant::List& value); - private: - pn_data_t* data; + static pn_bytes_t bytes(const std::string&); + static std::string string(const pn_bytes_t&); }; }}} // namespace messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index 5e0707056f..a28509b0b1 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -37,9 +37,10 @@ ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, co helper(address), receiver(pn_receiver(session, name.c_str())), capacity(0), used(0) {} + ReceiverContext::~ReceiverContext() { - pn_link_free(receiver); + if (receiver) pn_link_free(receiver); } void ReceiverContext::setCapacity(uint32_t c) @@ -63,12 +64,13 @@ uint32_t ReceiverContext::getAvailable() uint32_t ReceiverContext::getUnsettled() { + assert(pn_link_unsettled(receiver) >= pn_link_queued(receiver)); return pn_link_unsettled(receiver) - pn_link_queued(receiver); } void ReceiverContext::close() { - pn_link_close(receiver); + if (receiver) pn_link_close(receiver); } const std::string& ReceiverContext::getName() const @@ -96,7 +98,7 @@ void ReceiverContext::verify() } void ReceiverContext::configure() { - configure(pn_link_source(receiver)); + if (receiver) configure(pn_link_source(receiver)); } void ReceiverContext::configure(pn_terminus_t* source) { @@ -116,13 +118,13 @@ Address ReceiverContext::getAddress() const void ReceiverContext::reset(pn_session_t* session) { - receiver = pn_receiver(session, name.c_str()); - configure(); + receiver = session ? pn_receiver(session, name.c_str()) : 0; + if (receiver) configure(); } bool ReceiverContext::hasCurrent() { - return pn_link_current(receiver); + return receiver && pn_link_current(receiver); } bool ReceiverContext::wakeupToIssueCredit() diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 2a48b2241a..b12af5eb25 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -18,8 +18,10 @@ * under the License. * */ -#include "qpid/messaging/amqp/SenderContext.h" -#include "qpid/messaging/amqp/EncodedMessage.h" +#include "SenderContext.h" +#include "Transaction.h" +#include "EncodedMessage.h" +#include "PnData.h" #include "qpid/messaging/AddressImpl.h" #include "qpid/messaging/exceptions.h" #include "qpid/Exception.h" @@ -40,22 +42,29 @@ extern "C" { namespace qpid { namespace messaging { namespace amqp { + //TODO: proper conversion to wide string for address -SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a, bool setToOnSend_) - : name(n), +SenderContext::SenderContext(pn_session_t* session, const std::string& n, + const qpid::messaging::Address& a, + bool setToOnSend_, + const CoordinatorPtr& coord) + : sender(pn_sender(session, n.c_str())), + name(n), address(a), helper(address), - sender(pn_sender(session, n.c_str())), nextId(0), capacity(50), unreliable(helper.isUnreliable()), - setToOnSend(setToOnSend_) {} + nextId(0), capacity(50), unreliable(helper.isUnreliable()), + setToOnSend(setToOnSend_), + transaction(coord) +{} SenderContext::~SenderContext() { - pn_link_free(sender); + if (sender) pn_link_free(sender); } void SenderContext::close() { - pn_link_close(sender); + if (sender) pn_link_close(sender); } void SenderContext::setCapacity(uint32_t c) @@ -88,10 +97,13 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext: { resend();//if there are any messages needing to be resent at the front of the queue, send them first if (processUnsettled(false) < capacity && pn_link_credit(sender)) { + types::Variant state; + if (transaction) + state = transaction->getSendState(); if (unreliable) { Delivery delivery(nextId++); delivery.encode(MessageImplAccess::get(message), address, setToOnSend); - delivery.send(sender, unreliable); + delivery.send(sender, unreliable, state); *out = 0; return true; } else { @@ -99,7 +111,7 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext: try { Delivery& delivery = deliveries.back(); delivery.encode(MessageImplAccess::get(message), address, setToOnSend); - delivery.send(sender, unreliable); + delivery.send(sender, unreliable, state); *out = &delivery; return true; } catch (const std::exception& e) { @@ -507,7 +519,8 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co throw SendError(e.what()); } } -void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable) + +void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const types::Variant& state) { pn_delivery_tag_t tag; tag.size = sizeof(id); @@ -517,6 +530,11 @@ void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable) tag.bytes = reinterpret_cast<const char*>(&id); #endif token = pn_delivery(sender, tag); + if (!state.isVoid()) { // Add transaction state + PnData data(pn_disposition_data(pn_delivery_local(token))); + data.put(state); + pn_delivery_update(token, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE); + } pn_link_send(sender, encoded.getData(), encoded.getSize()); if (unreliable) { pn_delivery_settle(token); @@ -551,6 +569,15 @@ bool SenderContext::Delivery::rejected() { return pn_delivery_remote_state(token) == PN_REJECTED; } + +std::string SenderContext::Delivery::error() +{ + pn_condition_t *condition = pn_disposition_condition(pn_delivery_remote(token)); + return (condition && pn_condition_is_set(condition)) ? + Msg() << pn_condition_get_name(condition) << ": " << pn_condition_get_description(condition) : + std::string(); +} + void SenderContext::Delivery::settle() { pn_delivery_settle(token); @@ -570,10 +597,12 @@ void SenderContext::verify() helper.checkAssertion(target, AddressHelper::FOR_SENDER); } + void SenderContext::configure() { - configure(pn_link_target(sender)); + if (sender) configure(pn_link_target(sender)); } + void SenderContext::configure(pn_terminus_t* target) { helper.configure(sender, target, AddressHelper::FOR_SENDER); @@ -603,12 +632,10 @@ Address SenderContext::getAddress() const void SenderContext::reset(pn_session_t* session) { - sender = pn_sender(session, name.c_str()); - configure(); - - for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) { + sender = session ? pn_sender(session, name.c_str()) : 0; + if (sender) configure(); + for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) i->reset(); - } } void SenderContext::resend() diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h index 66e45a85a6..4d3c4bee79 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -24,6 +24,7 @@ #include <deque> #include <string> #include <vector> +#include <boost/shared_ptr.hpp> #include "qpid/sys/IntegerTypes.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/amqp/AddressHelper.h" @@ -41,9 +42,10 @@ class Message; class MessageImpl; namespace amqp { -/** - * - */ + +class Transaction; + + class SenderContext { public: @@ -52,13 +54,15 @@ class SenderContext public: Delivery(int32_t id); void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&, bool setToField); - void send(pn_link_t*, bool unreliable); + void send(pn_link_t*, bool unreliable, const types::Variant& state=types::Variant()); bool delivered(); bool accepted(); bool rejected(); void settle(); void reset(); bool sent() const; + pn_delivery_t* getToken() const { return token; } + std::string error(); private: int32_t id; pn_delivery_t* token; @@ -66,22 +70,32 @@ class SenderContext bool presettled; }; - SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target, bool setToOnSend); + typedef boost::shared_ptr<Transaction> CoordinatorPtr; + + SenderContext(pn_session_t* session, const std::string& name, + const qpid::messaging::Address& target, + bool setToOnSend, + const CoordinatorPtr& transaction = CoordinatorPtr()); ~SenderContext(); - void reset(pn_session_t* session); - void close(); - void setCapacity(uint32_t); - uint32_t getCapacity(); - uint32_t getUnsettled(); - const std::string& getName() const; - const std::string& getTarget() const; - bool send(const qpid::messaging::Message& message, Delivery**); - void configure(); - void verify(); - void check(); - bool settled(); - bool closed(); - Address getAddress() const; + + virtual void reset(pn_session_t* session); + virtual void close(); + virtual void setCapacity(uint32_t); + virtual uint32_t getCapacity(); + virtual uint32_t getUnsettled(); + virtual const std::string& getName() const; + virtual const std::string& getTarget() const; + virtual bool send(const qpid::messaging::Message& message, Delivery**); + virtual void configure(); + virtual void verify(); + virtual void check(); + virtual bool settled(); + virtual bool closed(); + virtual Address getAddress() const; + + protected: + pn_link_t* sender; + private: friend class ConnectionContext; typedef std::deque<Delivery> Deliveries; @@ -89,12 +103,12 @@ class SenderContext const std::string name; qpid::messaging::Address address; AddressHelper helper; - pn_link_t* sender; int32_t nextId; Deliveries deliveries; uint32_t capacity; bool unreliable; bool setToOnSend; + boost::shared_ptr<Transaction> transaction; uint32_t processUnsettled(bool silent); void configure(pn_terminus_t*); diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp index 367db701cb..98f2d34e7d 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp @@ -39,7 +39,8 @@ SenderHandle::SenderHandle(boost::shared_ptr<ConnectionContext> c, void SenderHandle::send(const Message& message, bool sync) { - connection->send(session, sender, message, sync); + SenderContext::Delivery* d = 0; + connection->send(session, sender, message, sync, &d); } void SenderHandle::close() diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index 824b958af3..2b82ffc377 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -21,11 +21,15 @@ #include "SessionContext.h" #include "SenderContext.h" #include "ReceiverContext.h" +#include "Transaction.h" +#include "PnData.h" #include <boost/format.hpp> #include "qpid/messaging/Address.h" #include "qpid/messaging/Duration.h" #include "qpid/messaging/exceptions.h" #include "qpid/log/Statement.h" +#include "qpid/amqp/descriptors.h" + extern "C" { #include <proton/engine.h> } @@ -35,23 +39,32 @@ namespace messaging { namespace amqp { SessionContext::SessionContext(pn_connection_t* connection) : session(pn_session(connection)) {} + SessionContext::~SessionContext() { - senders.clear(); receivers.clear(); - pn_session_free(session); + // Clear all pointers to senders and receivers before we free the session. + senders.clear(); + receivers.clear(); + transaction.reset(); // Transaction is a sender. + if (!error && session) + pn_session_free(session); } boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address, bool setToOnSend) { + error.raise(); std::string name = AddressHelper::getLinkName(address); - if (senders.find(name) != senders.end()) throw LinkError("Link name must be unique within the scope of the connection"); - boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address, setToOnSend)); + if (senders.find(name) != senders.end()) + throw LinkError("Link name must be unique within the scope of the connection"); + boost::shared_ptr<SenderContext> s( + new SenderContext(session, name, address, setToOnSend, transaction)); senders[name] = s; return s; } boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::messaging::Address& address) { + error.raise(); std::string name = AddressHelper::getLinkName(address); if (receivers.find(name) != receivers.end()) throw LinkError("Link name must be unique within the scope of the connection"); boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address)); @@ -61,6 +74,7 @@ boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::me boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& name) const { + error.raise(); SenderMap::const_iterator i = senders.find(name); if (i == senders.end()) { throw qpid::messaging::KeyError(std::string("No such sender") + name); @@ -71,6 +85,7 @@ boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& na boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string& name) const { + error.raise(); ReceiverMap::const_iterator i = receivers.find(name); if (i == receivers.end()) { throw qpid::messaging::KeyError(std::string("No such receiver") + name); @@ -81,16 +96,19 @@ boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string void SessionContext::removeReceiver(const std::string& n) { + error.raise(); receivers.erase(n); } void SessionContext::removeSender(const std::string& n) { + error.raise(); senders.erase(n); } boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver() { + error.raise(); for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) { if (i->second->hasCurrent()) { return i->second; @@ -102,16 +120,19 @@ boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver() uint32_t SessionContext::getReceivable() { + error.raise(); return 0;//TODO } uint32_t SessionContext::getUnsettledAcks() { + error.raise(); return 0;//TODO } qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) { + error.raise(); qpid::framing::SequenceNumber id = next++; if (!pn_delivery_settled(delivery)) unacked[id] = delivery; @@ -121,22 +142,32 @@ qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) void SessionContext::acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end) { + error.raise(); for (DeliveryMap::iterator i = begin; i != end; ++i) { - QPID_LOG(debug, "Setting disposition for delivery " << i->first << " -> " << i->second); - pn_delivery_update(i->second, PN_ACCEPTED); - pn_delivery_settle(i->second);//TODO: different settlement modes? + types::Variant txState; + if (transaction) { + QPID_LOG(trace, "Setting disposition for transactional delivery " + << i->first << " -> " << i->second); + transaction->acknowledge(i->second); + } else { + QPID_LOG(trace, "Setting disposition for delivery " << i->first << " -> " << i->second); + pn_delivery_update(i->second, PN_ACCEPTED); + pn_delivery_settle(i->second); //TODO: different settlement modes? + } } unacked.erase(begin, end); } void SessionContext::acknowledge() { + error.raise(); QPID_LOG(debug, "acknowledging all " << unacked.size() << " messages"); acknowledge(unacked.begin(), unacked.end()); } void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool cumulative) { + error.raise(); QPID_LOG(debug, "acknowledging selected messages, id=" << id << ", cumulative=" << cumulative); DeliveryMap::iterator i = unacked.find(id); if (i != unacked.end()) { @@ -149,6 +180,7 @@ void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool c void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject) { + error.raise(); DeliveryMap::iterator i = unacked.find(id); if (i != unacked.end()) { if (reject) { @@ -166,7 +198,9 @@ void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject) bool SessionContext::settled() { + error.raise(); bool result = true; + for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { try { if (!i->second->closed() && !i->second->settled()) result = false; @@ -189,8 +223,25 @@ std::string SessionContext::getName() const void SessionContext::reset(pn_connection_t* connection) { - session = pn_session(connection); unacked.clear(); + if (transaction) { + if (transaction->isCommitting()) + error = new TransactionUnknown("Transaction outcome unknown: transport failure"); + else + error = new TransactionAborted("Transaction aborted: transport failure"); + resetSession(0); + senders.clear(); + receivers.clear(); + transaction.reset(); + return; + } + resetSession(pn_session(connection)); + +} + +void SessionContext::resetSession(pn_session_t* session_) { + session = session_; + if (transaction) transaction->reset(session); for (SessionContext::SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { i->second->reset(session); } @@ -198,4 +249,6 @@ void SessionContext::reset(pn_connection_t* connection) i->second->reset(session); } } + + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h index 8c2bb040a6..67b3c1e401 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -26,6 +26,7 @@ #include <boost/shared_ptr.hpp> #include "qpid/sys/IntegerTypes.h" #include "qpid/framing/SequenceNumber.h" +#include "qpid/sys/ExceptionHolder.h" struct pn_connection_t; struct pn_session_t; @@ -42,6 +43,8 @@ namespace amqp { class ConnectionContext; class SenderContext; class ReceiverContext; +class Transaction; + /** * */ @@ -63,23 +66,29 @@ class SessionContext bool settled(); void setName(const std::string&); std::string getName() const; + + void nack(const qpid::framing::SequenceNumber& id, bool reject); + private: friend class ConnectionContext; typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap; typedef std::map<std::string, boost::shared_ptr<ReceiverContext> > ReceiverMap; typedef std::map<qpid::framing::SequenceNumber, pn_delivery_t*> DeliveryMap; + pn_session_t* session; SenderMap senders; + boost::shared_ptr<Transaction> transaction; ReceiverMap receivers; DeliveryMap unacked; qpid::framing::SequenceNumber next; std::string name; + sys::ExceptionHolder error; qpid::framing::SequenceNumber record(pn_delivery_t*); void acknowledge(); void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative); void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end); - void nack(const qpid::framing::SequenceNumber& id, bool reject); + void resetSession(pn_session_t*); }; }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp index 4d427639d3..44294e5f04 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp @@ -42,12 +42,12 @@ SessionHandle::SessionHandle(boost::shared_ptr<ConnectionContext> c, boost::shar void SessionHandle::commit() { - + connection->commit(session); } void SessionHandle::rollback() { - + connection->rollback(session); } void SessionHandle::acknowledge(bool /*sync*/) diff --git a/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp b/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp new file mode 100644 index 0000000000..754b00d802 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "Transaction.h" +#include "SessionContext.h" +#include "ConnectionContext.h" +#include "PnData.h" +#include <proton/engine.h> +#include <qpid/Exception.h> +#include <qpid/amqp/descriptors.h> +#include <qpid/messaging/exceptions.h> +#include <qpid/log/Statement.h> +#include "qpid/messaging/Message.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +using namespace types; +using types::Exception; + +namespace { +const std::string LOCAL_TRANSACTIONS("amqp:local-transactions"); +const std::string TX_COORDINATOR("tx-transaction"); +const std::string ADDRESS("tx-transaction;{link:{reliability:at-least-once}}"); +} + +Transaction::Transaction(pn_session_t* session) : + SenderContext(session, TX_COORDINATOR, Address(ADDRESS), false), committing(false) +{} + +void Transaction::clear() { + id.clear(); + sendState.reset(); + acceptState.reset(); +} + +void Transaction::configure() { + SenderContext::configure(); + pn_terminus_t* target = pn_link_target(sender); + pn_terminus_set_type(target, PN_COORDINATOR); + PnData(pn_terminus_capabilities(target)).putSymbol(LOCAL_TRANSACTIONS); +} + +void Transaction::verify() {} + +const std::string& Transaction::getTarget() const { return getName(); } + +void Transaction::declare(SendFunction send, const SessionPtr& session) { + committing = false; + error.raise(); + clear(); + Variant declare = Variant::described(qpid::amqp::transaction::DECLARE_CODE, Variant::List()); + SenderContext::Delivery* delivery = 0; + send(session, shared_from_this(), Message(declare), true, &delivery); + setId(*delivery); +} + +void Transaction::discharge(SendFunction send, const SessionPtr& session, bool fail) { + error.raise(); + committing = !fail; + try { + // Send a discharge message to the remote coordinator. + Variant::List dischargeList; + dischargeList.push_back(Variant(id)); + dischargeList.push_back(Variant(fail)); + Variant discharge(dischargeList); + discharge.setDescriptor(qpid::amqp::transaction::DISCHARGE_CODE); + SenderContext::Delivery* delivery = 0; + send(session, shared_from_this(), Message(discharge), true, &delivery); + if (!delivery->accepted()) + throw TransactionAborted(delivery->error()); + committing = false; + } + catch(const TransactionError&) { + throw; + } + catch(const Exception& e) { + committing = false; + throw TransactionAborted(e.what()); + } +} + +// Set the transaction ID from the delivery returned by the remote coordinator. +void Transaction::setId(const SenderContext::Delivery& delivery) +{ + if (delivery.getToken() && + pn_delivery_remote_state(delivery.getToken()) == qpid::amqp::transaction::DECLARED_CODE) + { + pn_data_t* data = pn_disposition_data(pn_delivery_remote(delivery.getToken())); + if (data && pn_data_next(data)) { + size_t count = pn_data_get_list(data); + if (count > 0) { + pn_data_enter(data); + pn_data_next(data); + setId(PnData::string(pn_data_get_binary(data))); + pn_data_exit(data); + return; + } + } + } + throw TransactionError("No transaction ID returned by remote coordinator."); +} + +void Transaction::setId(const std::string& id_) { + id = id_; + if (id.empty()) { + clear(); + } + else { + // NOTE: The send and accept states are NOT described, the descriptor + // is added in pn_delivery_update. + Variant::List list; + list.push_back(Variant(id, "binary")); + sendState = Variant(list); + + Variant accepted = Variant::described(qpid::amqp::message::ACCEPTED_CODE, Variant::List()); + list.push_back(accepted); + acceptState = Variant(list); + } +} + +types::Variant Transaction::getSendState() const { + error.raise(); + return sendState; +} + +void Transaction::acknowledge(pn_delivery_t* delivery) +{ + error.raise(); + PnData data(pn_disposition_data(pn_delivery_local(delivery))); + data.put(acceptState); + pn_delivery_update(delivery, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE); + pn_delivery_settle(delivery); +} + + + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/Transaction.h b/qpid/cpp/src/qpid/messaging/amqp/Transaction.h new file mode 100644 index 0000000000..35492c9bb3 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/Transaction.h @@ -0,0 +1,95 @@ +#ifndef COORDINATORCONTEXT_H +#define COORDINATORCONTEXT_H +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +#include "SenderContext.h" +#include <qpid/types/Variant.h> +#include "qpid/sys/ExceptionHolder.h" +#include <boost/enable_shared_from_this.hpp> +#include <boost/function.hpp> + +struct pn_session_t; + +namespace qpid { +namespace messaging { +namespace amqp { + +class SessionContext; +class ConnectionContext; + +/** + * Track the current transaction for a session. + * + * Implements SenderContext, to send transaction command messages to remote coordinator. + */ +class Transaction : public SenderContext, public boost::enable_shared_from_this<Transaction> { + public: + typedef boost::shared_ptr<SessionContext> SessionPtr; + + typedef boost::function<void (boost::shared_ptr<SessionContext> ssn, + boost::shared_ptr<SenderContext> snd, + const qpid::messaging::Message& message, + bool sync, + SenderContext::Delivery** delivery)> SendFunction; + + Transaction(pn_session_t*); + + sys::ExceptionHolder error; + + /** Declare a transaction using connection and session to send to remote co-ordinator. */ + void declare(SendFunction, const SessionPtr& session); + + /** Discharge a transaction using connection and session to send to remote co-ordinator. + *@param fail: true means rollback, false means commit. + */ + void discharge(SendFunction, const SessionPtr& session, bool fail); + + /** Update a delivery with a transactional accept state. */ + void acknowledge(pn_delivery_t* delivery); + + /** Get delivery state to attach to transfers sent in a transaction. */ + types::Variant getSendState() const; + + /** Override SenderContext::getTarget with a more readable value */ + const std::string& getTarget() const; + + bool isCommitting() const { return committing; } + + protected: + // SenderContext overrides + void configure(); + void verify(); + + private: + std::string id; + types::Variant sendState; + types::Variant acceptState; + bool committing; + + + void clear(); + void setId(const SenderContext::Delivery& delivery); + void setId(const std::string& id); +}; + +}}} + +#endif |