summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/messaging
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/messaging')
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp8
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp244
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h33
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/PnData.cpp84
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/PnData.h29
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp14
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp61
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.h54
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp3
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp69
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.h11
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp155
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/Transaction.h95
14 files changed, 665 insertions, 199 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index 7f19ca7ec0..2106e21686 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -510,9 +510,9 @@ void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode)
requested.erase(j->first);
}
} else if (key == AUTO_DELETE) {
- PnData(data).read(v);
+ PnData(data).get(v);
isAutoDeleted = v.asBool();
- } else if (j != requested.end() && (PnData(data).read(v) && v.asString() == j->second.asString())) {
+ } else if (j != requested.end() && (PnData(data).get(v) && v.asString() == j->second.asString())) {
requested.erase(j->first);
}
}
@@ -646,7 +646,7 @@ void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMod
} else {
pn_data_put_ulong(filter, i->descriptorCode);
}
- PnData(filter).write(i->value);
+ PnData(filter).put(i->value);
pn_data_exit(filter);
}
pn_data_exit(filter);
@@ -733,7 +733,7 @@ void AddressHelper::setNodeProperties(pn_terminus_t* terminus)
putLifetimePolicy(data, toLifetimePolicy(i->second.asString()));
} else {
pn_data_put_symbol(data, convert(i->first));
- PnData(data).write(i->second);
+ PnData(data).put(i->second);
}
}
pn_data_exit(data);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index a0b16c2b4c..d4a7b60e3c 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -25,8 +25,11 @@
#include "Sasl.h"
#include "SenderContext.h"
#include "SessionContext.h"
+#include "Transaction.h"
#include "Transport.h"
#include "qpid/amqp/descriptors.h"
+#include "qpid/amqp/Encoder.h"
+#include "qpid/amqp/Descriptor.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/AddressImpl.h"
#include "qpid/messaging/Duration.h"
@@ -43,6 +46,7 @@
#include "qpid/sys/urlAdd.h"
#include "config.h"
#include <boost/lexical_cast.hpp>
+#include <boost/bind.hpp>
#include <vector>
extern "C" {
#include <proton/engine.h>
@@ -151,20 +155,23 @@ ConnectionContext::~ConnectionContext()
if (ticker) ticker->cancel();
close();
sessions.clear();
- pn_transport_free(engine);
pn_connection_free(connection);
+ pn_transport_free(engine);
}
bool ConnectionContext::isOpen() const
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
return state == CONNECTED && pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
}
void ConnectionContext::sync(boost::shared_ptr<SessionContext> ssn)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
- //wait for outstanding sends to settle
+ sys::Monitor::ScopedLock l(lock);
+ syncLH(ssn, l);
+}
+
+void ConnectionContext::syncLH(boost::shared_ptr<SessionContext> ssn, sys::Monitor::ScopedLock&) {
while (!ssn->settled()) {
QPID_LOG(debug, "Waiting for sends to settle on sync()");
wait(ssn);//wait until message has been confirmed
@@ -175,18 +182,13 @@ void ConnectionContext::sync(boost::shared_ptr<SessionContext> ssn)
void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) {
//explicitly release messages that have yet to be fetched
for (SessionContext::ReceiverMap::iterator i = ssn->receivers.begin(); i != ssn->receivers.end(); ++i) {
drain_and_release_messages(ssn, i->second);
}
- //wait for outstanding sends to settle
- while (!ssn->settled()) {
- QPID_LOG(debug, "Waiting for sends to settle before closing");
- wait(ssn);//wait until message has been confirmed
- wakeupDriver();
- }
+ syncLH(ssn, l);
}
if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) {
@@ -199,17 +201,11 @@ void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
void ConnectionContext::close()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
if (state != CONNECTED) return;
if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) {
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
- //wait for outstanding sends to settle
- while (!i->second->settled()) {
- QPID_LOG(debug, "Waiting for sends to settle before closing");
- wait(i->second);//wait until message has been confirmed
- }
-
-
+ syncLH(i->second, l);
if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) {
pn_session_close(i->second->session);
}
@@ -246,7 +242,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar
*/
qpid::sys::AtomicCount::ScopedIncrement track(lnk->fetching);
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
checkClosed(ssn, lnk);
if (!lnk->capacity) {
pn_link_flow(lnk->receiver, 1);
@@ -257,10 +253,10 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar
return true;
} else {
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
pn_link_drain(lnk->receiver, 0);
wakeupDriver();
- while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
+ while (pn_link_draining(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
wait(ssn, lnk);
}
@@ -269,7 +265,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar
}
}
if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) {
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
if (lnk->capacity) {
pn_link_flow(lnk->receiver, 1);
wakeupDriver();
@@ -296,7 +292,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared
{
qpid::sys::AbsTime until(convert(timeout));
while (true) {
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
checkClosed(ssn, lnk);
pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver);
QPID_LOG(debug, "In ConnectionContext::get(), current=" << current);
@@ -320,6 +316,9 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared
haveOutput = true;
}
}
+ // Automatically ack messages if we are in a transaction.
+ if (ssn->transaction)
+ acknowledgeLH(ssn, &message, false, l);
return true;
} else if (until > qpid::sys::now()) {
waitUntil(ssn, lnk, until);
@@ -334,7 +333,7 @@ boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared
{
qpid::sys::AbsTime until(convert(timeout));
while (true) {
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
checkClosed(ssn);
boost::shared_ptr<ReceiverContext> r = ssn->nextReceiver();
if (r) {
@@ -347,9 +346,13 @@ boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared
}
}
-void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative)
+void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) {
+ sys::Monitor::ScopedLock l(lock);
+ acknowledgeLH(ssn, message, cumulative, l);
+}
+
+void ConnectionContext::acknowledgeLH(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
checkClosed(ssn);
if (message) {
ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative);
@@ -361,7 +364,7 @@ void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid:
void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
checkClosed(ssn);
ssn->nack(MessageImplAccess::get(message).getInternalId(), reject);
wakeupDriver();
@@ -369,7 +372,7 @@ void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messag
void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
if (pn_link_state(lnk->sender) & PN_LOCAL_ACTIVE) {
lnk->close();
}
@@ -401,7 +404,7 @@ void ConnectionContext::drain_and_release_messages(boost::shared_ptr<SessionCont
void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
drain_and_release_messages(ssn, lnk);
if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) {
lnk->close();
@@ -415,7 +418,7 @@ void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::sha
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
lnk->configure();
attach(ssn, lnk->sender);
checkClosed(ssn, lnk);
@@ -425,7 +428,7 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
lnk->configure();
attach(ssn, lnk->receiver, lnk->capacity);
checkClosed(ssn, lnk);
@@ -445,11 +448,26 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, pn_link_t*
}
}
-void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync)
+void ConnectionContext::send(
+ boost::shared_ptr<SessionContext> ssn,
+ boost::shared_ptr<SenderContext> snd,
+ const qpid::messaging::Message& message,
+ bool sync,
+ SenderContext::Delivery** delivery)
+{
+ sys::Monitor::ScopedLock l(lock);
+ sendLH(ssn, snd, message, sync, delivery, l);
+}
+
+void ConnectionContext::sendLH(
+ boost::shared_ptr<SessionContext> ssn,
+ boost::shared_ptr<SenderContext> snd,
+ const qpid::messaging::Message& message,
+ bool sync,
+ SenderContext::Delivery** delivery,
+ sys::Monitor::ScopedLock&)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
checkClosed(ssn);
- SenderContext::Delivery* delivery(0);
while (pn_transport_pending(engine) > 65536) {
QPID_LOG(debug, "Have " << pn_transport_pending(engine) << " bytes of output pending; waiting for this to be written...");
notifyOnWrite = true;
@@ -457,17 +475,17 @@ void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::share
wait(ssn, snd);
notifyOnWrite = false;
}
- while (!snd->send(message, &delivery)) {
+ while (!snd->send(message, delivery)) {
QPID_LOG(debug, "Waiting for capacity...");
wait(ssn, snd);//wait for capacity
}
wakeupDriver();
- if (sync && delivery) {
- while (!delivery->delivered()) {
+ if (sync && *delivery) {
+ while (!(*delivery)->delivered()) {
QPID_LOG(debug, "Waiting for confirmation...");
wait(ssn, snd);//wait until message has been confirmed
}
- if (delivery->rejected()) {
+ if ((*delivery)->rejected()) {
throw MessageRejected("Message was rejected by peer");
}
@@ -476,46 +494,46 @@ void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::share
void ConnectionContext::setCapacity(boost::shared_ptr<SenderContext> sender, uint32_t capacity)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
sender->setCapacity(capacity);
}
uint32_t ConnectionContext::getCapacity(boost::shared_ptr<SenderContext> sender)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
return sender->getCapacity();
}
uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<SenderContext> sender)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
return sender->getUnsettled();
}
void ConnectionContext::setCapacity(boost::shared_ptr<ReceiverContext> receiver, uint32_t capacity)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
receiver->setCapacity(capacity);
pn_link_flow((pn_link_t*) receiver->receiver, receiver->getCapacity());
wakeupDriver();
}
uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
return receiver->getCapacity();
}
uint32_t ConnectionContext::getAvailable(boost::shared_ptr<ReceiverContext> receiver)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
return receiver->getAvailable();
}
uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> receiver)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
return receiver->getUnsettled();
}
void ConnectionContext::activateOutput()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
if (state == CONNECTED) wakeupDriver();
}
/**
@@ -543,8 +561,8 @@ pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED;
void ConnectionContext::reset()
{
- pn_transport_free(engine);
pn_connection_free(connection);
+ pn_transport_free(engine);
engine = pn_transport();
connection = pn_connection();
@@ -555,7 +573,7 @@ void ConnectionContext::reset()
}
}
-void ConnectionContext::check() {
+bool ConnectionContext::check() {
if (checkDisconnected()) {
if (ConnectionOptions::reconnect) {
QPID_LOG(notice, "Auto-reconnecting to " << fullUrl);
@@ -564,7 +582,9 @@ void ConnectionContext::check() {
} else {
throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)");
}
+ return true;
}
+ return false;
}
bool ConnectionContext::checkDisconnected() {
@@ -588,7 +608,7 @@ bool ConnectionContext::checkDisconnected() {
void ConnectionContext::wait()
{
- check();
+ if (check()) return; // Reconnected, may need to re-test condition.
lock.wait();
check();
}
@@ -630,6 +650,7 @@ void ConnectionContext::waitUntil(boost::shared_ptr<SessionContext> ssn, boost::
void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn)
{
check();
+ ssn->error.raise();
if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
pn_condition_t* error = pn_session_remote_condition(ssn->session);
std::stringstream text;
@@ -690,6 +711,7 @@ void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_li
void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s)
{
+ if (s->error) return;
pn_session_open(s->session);
wakeupDriver();
while (pn_session_state(s->session) & PN_REMOTE_UNINIT) {
@@ -718,26 +740,31 @@ void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s)
boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
- if (transactional) throw qpid::messaging::MessagingException("Transactions not yet supported");
+ boost::shared_ptr<SessionContext> session;
std::string name = n.empty() ? qpid::framing::Uuid(true).str() : n;
- SessionMap::const_iterator i = sessions.find(name);
- if (i == sessions.end()) {
- boost::shared_ptr<SessionContext> s(new SessionContext(connection));
- s->setName(name);
- s->session = pn_session(connection);
- pn_session_open(s->session);
- wakeupDriver();
- while (pn_session_state(s->session) & PN_REMOTE_UNINIT) {
- wait();
+ {
+ sys::Monitor::ScopedLock l(lock);
+ SessionMap::const_iterator i = sessions.find(name);
+ if (i == sessions.end()) {
+ session = boost::shared_ptr<SessionContext>(new SessionContext(connection));
+ session->setName(name);
+ pn_session_open(session->session);
+ wakeupDriver();
+ sessions[name] = session; // Add it now so it will be restarted if we reconnect in wait()
+ while (pn_session_state(session->session) & PN_REMOTE_UNINIT) {
+ wait();
+ }
+ } else {
+ throw qpid::messaging::KeyError(std::string("Session already exists: ") + name);
}
- sessions[name] = s;
- return s;
- } else {
- throw qpid::messaging::KeyError(std::string("Session already exists: ") + name);
- }
+ }
+ if (transactional) { // Outside of lock
+ startTxSession(session);
+ }
+ return session;
}
+
boost::shared_ptr<SessionContext> ConnectionContext::getSession(const std::string& name) const
{
SessionMap::const_iterator i = sessions.find(name);
@@ -760,7 +787,7 @@ std::string ConnectionContext::getAuthenticatedUsername()
std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
QPID_LOG(trace, id << " decode(" << size << ")");
if (readHeader) {
size_t decoded = readProtocolHeader(buffer, size);
@@ -805,7 +832,7 @@ std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size)
}
std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
QPID_LOG(trace, id << " encode(" << size << ")");
if (writeHeader) {
size_t encoded = writeProtocolHeader(buffer, size);
@@ -843,19 +870,19 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size)
}
bool ConnectionContext::canEncodePlain()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC);
return haveOutput && state == CONNECTED;
}
void ConnectionContext::closed()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
state = DISCONNECTED;
lock.notifyAll();
}
void ConnectionContext::opened()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
state = CONNECTED;
lock.notifyAll();
}
@@ -921,7 +948,7 @@ const qpid::messaging::ConnectionOptions* ConnectionContext::getOptions()
std::size_t ConnectionContext::decode(const char* buffer, std::size_t size)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
size_t decoded = 0;
try {
if (sasl.get() && !sasl->authenticated()) {
@@ -939,7 +966,7 @@ std::size_t ConnectionContext::decode(const char* buffer, std::size_t size)
}
std::size_t ConnectionContext::encode(char* buffer, std::size_t size)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
size_t encoded = 0;
try {
if (sasl.get() && sasl->canEncode()) {
@@ -957,7 +984,7 @@ std::size_t ConnectionContext::encode(char* buffer, std::size_t size)
}
bool ConnectionContext::canEncode()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
if (sasl.get()) {
try {
if (sasl->canEncode()) return true;
@@ -978,26 +1005,21 @@ const std::string CLIENT_PPID("qpid.client_ppid");
}
void ConnectionContext::setProperties()
{
- pn_data_t* data = pn_connection_properties(connection);
- pn_data_put_map(data);
- pn_data_enter(data);
-
- pn_data_put_symbol(data, PnData::str(CLIENT_PROCESS_NAME));
- std::string processName = sys::SystemInfo::getProcessName();
- pn_data_put_string(data, PnData::str(processName));
-
- pn_data_put_symbol(data, PnData::str(CLIENT_PID));
- pn_data_put_int(data, sys::SystemInfo::getProcessId());
-
- pn_data_put_symbol(data, PnData::str(CLIENT_PPID));
- pn_data_put_int(data, sys::SystemInfo::getParentProcessId());
-
+ PnData data(pn_connection_properties(connection));
+ pn_data_put_map(data.data);
+ pn_data_enter(data.data);
+ data.putSymbol(CLIENT_PROCESS_NAME);
+ data.putSymbol(sys::SystemInfo::getProcessName());
+ data.putSymbol(CLIENT_PID);
+ data.put(int32_t(sys::SystemInfo::getProcessId()));
+ data.putSymbol(CLIENT_PPID);
+ data.put(int32_t(sys::SystemInfo::getParentProcessId()));
for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i)
{
- pn_data_put_symbol(data, PnData::str(i->first));
- PnData(data).write(i->second);
+ data.putSymbol(i->first);
+ data.put(i->second);
}
- pn_data_exit(data);
+ pn_data_exit(data.data);
}
const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettings()
@@ -1007,7 +1029,7 @@ const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettin
void ConnectionContext::open()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
if (!driver) driver = DriverImpl::getDefault();
QPID_LOG(info, "Starting connection to " << fullUrl);
@@ -1049,7 +1071,7 @@ void ConnectionContext::autoconnect()
void ConnectionContext::reconnect(const Url& url) {
QPID_LOG(notice, "Reconnecting to " << url);
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
if (!driver) driver = DriverImpl::getDefault();
reset();
@@ -1137,7 +1159,7 @@ bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) {
std::string ConnectionContext::getUrl() const
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
return (state == CONNECTED) ? currentUrl.str() : std::string();
}
@@ -1209,6 +1231,40 @@ bool ConnectionContext::CodecAdapter::canEncode()
return context.canEncodePlain();
}
+void ConnectionContext::startTxSession(boost::shared_ptr<SessionContext> session) {
+ try {
+ QPID_LOG(debug, id << " attaching transaction for " << session->getName());
+ boost::shared_ptr<Transaction> tx(new Transaction(session->session));
+ session->transaction = tx;
+ attach(session, tx);
+ tx->declare(boost::bind(&ConnectionContext::send, this, _1, _2, _3, _4, _5), session);
+ } catch (const Exception& e) {
+ throw TransactionError(Msg() << "Cannot start transaction: " << e.what());
+ }
+}
+
+void ConnectionContext::discharge(boost::shared_ptr<SessionContext> session, bool fail) {
+ {
+ sys::Monitor::ScopedLock l(lock);
+ checkClosed(session);
+ if (!session->transaction)
+ throw TransactionError("No Transaction");
+ Transaction::SendFunction sendFn = boost::bind(
+ &ConnectionContext::sendLH, this, _1, _2, _3, _4, _5, boost::ref(l));
+ syncLH(session, boost::ref(l)); // Sync to make sure all tx transfers have been received.
+ session->transaction->discharge(sendFn, session, fail);
+ session->transaction->declare(sendFn, session);
+ }
+}
+
+void ConnectionContext::commit(boost::shared_ptr<SessionContext> session) {
+ discharge(session, false);
+}
+
+void ConnectionContext::rollback(boost::shared_ptr<SessionContext> session) {
+ discharge(session, true);
+}
+
// setup the transport and connection objects:
void ConnectionContext::configureConnection()
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index 80da9dff10..b687219624 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -34,6 +34,7 @@
#include "qpid/sys/Monitor.h"
#include "qpid/types/Variant.h"
#include "qpid/messaging/amqp/TransportContext.h"
+#include "SenderContext.h"
struct pn_connection_t;
struct pn_link_t;
@@ -59,7 +60,6 @@ class DriverImpl;
class ReceiverContext;
class Sasl;
class SessionContext;
-class SenderContext;
class Transport;
/**
@@ -82,10 +82,20 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
void drain_and_release_messages(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
bool isClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
- void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
+
+ // Link operations
+ void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt,
+ const qpid::messaging::Message& message, bool sync,
+ SenderContext::Delivery** delivery);
+
bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+
+ // Session operations
void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
+ void commit(boost::shared_ptr<SessionContext> ssn);
+ void rollback(boost::shared_ptr<SessionContext> ssn);
+
void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject);
void sync(boost::shared_ptr<SessionContext> ssn);
boost::shared_ptr<ReceiverContext> nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout);
@@ -93,10 +103,10 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
void setOption(const std::string& name, const qpid::types::Variant& value);
std::string getAuthenticatedUsername();
+ // Link operations
void setCapacity(boost::shared_ptr<SenderContext>, uint32_t);
uint32_t getCapacity(boost::shared_ptr<SenderContext>);
uint32_t getUnsettled(boost::shared_ptr<SenderContext>);
-
void setCapacity(boost::shared_ptr<ReceiverContext>, uint32_t);
uint32_t getCapacity(boost::shared_ptr<ReceiverContext>);
uint32_t getAvailable(boost::shared_ptr<ReceiverContext>);
@@ -159,9 +169,12 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
bool notifyOnWrite;
boost::intrusive_ptr<qpid::sys::TimerTask> ticker;
- void check();
+ bool check();
bool checkDisconnected();
void waitNoReconnect();
+
+ // NOTE: All wait*() functions must be called in a loop that checks for the
+ // waited condition with the lock held.
void wait();
void waitUntil(qpid::sys::AbsTime until);
void wait(boost::shared_ptr<SessionContext>);
@@ -170,10 +183,12 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
void waitUntil(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>, qpid::sys::AbsTime until);
void waitUntil(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>, qpid::sys::AbsTime until);
+
void checkClosed(boost::shared_ptr<SessionContext>);
void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*);
+
void wakeupDriver();
void attach(boost::shared_ptr<SessionContext>, pn_link_t*, int credit=0);
void autoconnect();
@@ -194,8 +209,18 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
std::string getError();
bool useSasl();
void setProperties();
+
void configureConnection();
bool checkTransportError(std::string&);
+
+ void discharge(boost::shared_ptr<SessionContext>, bool fail);
+ void startTxSession(boost::shared_ptr<SessionContext>);
+
+ void syncLH(boost::shared_ptr<SessionContext> ssn, sys::Monitor::ScopedLock&);
+ void sendLH(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt,
+ const qpid::messaging::Message& message, bool sync,
+ SenderContext::Delivery** delivery, sys::Monitor::ScopedLock&);
+ void acknowledgeLH(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&);
};
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp b/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp
index 5c57c5b0a3..3309d1a683 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp
@@ -20,34 +20,53 @@
*/
#include "PnData.h"
#include "qpid/types/encodings.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace messaging {
namespace amqp {
using types::Variant;
+using namespace types::encodings;
-void PnData::write(const Variant::Map& map)
+// TODO aconway 2014-11-20: PnData duplicates functionality of qpid::amqp::Encoder,Decoder.
+// Collapse them all into a single proton-based codec.
+
+void PnData::put(const Variant::Map& map)
{
pn_data_put_map(data);
pn_data_enter(data);
for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
- pn_data_put_string(data, str(i->first));
- write(i->second);
+ pn_data_put_string(data, bytes(i->first));
+ put(i->second);
}
pn_data_exit(data);
}
-void PnData::write(const Variant::List& list)
+
+void PnData::put(const Variant::List& list)
{
pn_data_put_list(data);
pn_data_enter(data);
for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
- write(*i);
+ put(*i);
}
pn_data_exit(data);
}
-void PnData::write(const Variant& value)
+
+void PnData::put(const Variant& value)
{
+ // Open data descriptors associated with the value.
+ const Variant::List& descriptors = value.getDescriptors();
+ for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) {
+ pn_data_put_described(data);
+ pn_data_enter(data);
+ if (i->getType() == types::VAR_STRING)
+ pn_data_put_symbol(data, bytes(i->asString()));
+ else
+ pn_data_put_ulong(data, i->asUint64());
+ }
+
+ // Put the variant value
switch (value.getType()) {
case qpid::types::VAR_VOID:
pn_data_put_null(data);
@@ -65,61 +84,70 @@ void PnData::write(const Variant& value)
pn_data_put_double(data, value.asDouble());
break;
case qpid::types::VAR_STRING:
- pn_data_put_string(data, str(value.asString()));
+ if (value.getEncoding() == ASCII)
+ pn_data_put_symbol(data, bytes(value.asString()));
+ else if (value.getEncoding() == BINARY)
+ pn_data_put_binary(data, bytes(value.asString()));
+ else
+ pn_data_put_string(data, bytes(value.asString()));
break;
case qpid::types::VAR_MAP:
- write(value.asMap());
+ put(value.asMap());
break;
case qpid::types::VAR_LIST:
- write(value.asList());
+ put(value.asList());
break;
default:
break;
}
+
+ // Close any descriptors.
+ for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i)
+ pn_data_exit(data);
}
-bool PnData::read(qpid::types::Variant& value)
+bool PnData::get(qpid::types::Variant& value)
{
- return read(pn_data_type(data), value);
+ return get(pn_data_type(data), value);
}
-void PnData::readList(qpid::types::Variant::List& value)
+void PnData::getList(qpid::types::Variant::List& value)
{
size_t count = pn_data_get_list(data);
pn_data_enter(data);
for (size_t i = 0; i < count && pn_data_next(data); ++i) {
qpid::types::Variant e;
- if (read(e)) value.push_back(e);
+ if (get(e)) value.push_back(e);
}
pn_data_exit(data);
}
-void PnData::readMap(qpid::types::Variant::Map& value)
+void PnData::getMap(qpid::types::Variant::Map& value)
{
size_t count = pn_data_get_list(data);
pn_data_enter(data);
for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) {
- std::string key = str(pn_data_get_symbol(data));
+ std::string key = string(pn_data_get_symbol(data));
pn_data_next(data);
qpid::types::Variant e;
- if (read(e)) value[key]= e;
+ if (get(e)) value[key]= e;
}
pn_data_exit(data);
}
-void PnData::readArray(qpid::types::Variant::List& value)
+void PnData::getArray(qpid::types::Variant::List& value)
{
size_t count = pn_data_get_array(data);
pn_type_t type = pn_data_get_array_type(data);
pn_data_enter(data);
for (size_t i = 0; i < count && pn_data_next(data); ++i) {
qpid::types::Variant e;
- if (read(type, e)) value.push_back(e);
+ if (get(type, e)) value.push_back(e);
}
pn_data_exit(data);
}
-bool PnData::read(pn_type_t type, qpid::types::Variant& value)
+bool PnData::get(pn_type_t type, qpid::types::Variant& value)
{
switch (type) {
case PN_NULL:
@@ -168,41 +196,41 @@ bool PnData::read(pn_type_t type, qpid::types::Variant& value)
value = qpid::types::Uuid(pn_data_get_uuid(data).bytes);
return true;
case PN_BINARY:
- value = str(pn_data_get_binary(data));
+ value = string(pn_data_get_binary(data));
value.setEncoding(qpid::types::encodings::BINARY);
return true;
case PN_STRING:
- value = str(pn_data_get_string(data));
+ value = string(pn_data_get_string(data));
value.setEncoding(qpid::types::encodings::UTF8);
return true;
case PN_SYMBOL:
- value = str(pn_data_get_string(data));
+ value = string(pn_data_get_string(data));
value.setEncoding(qpid::types::encodings::ASCII);
return true;
case PN_LIST:
value = qpid::types::Variant::List();
- readList(value.asList());
+ getList(value.asList());
return true;
break;
case PN_MAP:
value = qpid::types::Variant::Map();
- readMap(value.asMap());
+ getMap(value.asMap());
return true;
case PN_ARRAY:
value = qpid::types::Variant::List();
- readArray(value.asList());
+ getArray(value.asList());
return true;
case PN_DESCRIBED:
+ // TODO aconway 2014-11-20: get described values.
case PN_DECIMAL32:
case PN_DECIMAL64:
case PN_DECIMAL128:
default:
return false;
}
-
}
-pn_bytes_t PnData::str(const std::string& s)
+pn_bytes_t PnData::bytes(const std::string& s)
{
pn_bytes_t result;
result.start = const_cast<char*>(s.data());
@@ -210,7 +238,7 @@ pn_bytes_t PnData::str(const std::string& s)
return result;
}
-std::string PnData::str(const pn_bytes_t& in)
+std::string PnData::string(const pn_bytes_t& in)
{
return std::string(in.start, in.size);
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/PnData.h b/qpid/cpp/src/qpid/messaging/amqp/PnData.h
index 6d03235432..b0119f88fd 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/PnData.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/PnData.h
@@ -32,28 +32,29 @@ namespace messaging {
namespace amqp {
/**
- * Helper class to read/write messaging types to/from pn_data_t.
+ * Helper class to put/get messaging types to/from pn_data_t.
*/
class PnData
{
public:
- PnData(pn_data_t* d) : data(d) {}
+ pn_data_t* data;
- void write(const types::Variant& value);
- void write(const types::Variant::Map& map);
- void write(const types::Variant::List& list);
+ PnData(pn_data_t* d) : data(d) {}
- bool read(pn_type_t type, types::Variant& value);
- bool read(types::Variant& value);
- void readList(types::Variant::List& value);
- void readMap(types::Variant::Map& value);
- void readArray(types::Variant::List& value);
+ void put(const types::Variant& value);
+ void put(const types::Variant::Map& map);
+ void put(const types::Variant::List& list);
+ void put(int32_t n) { pn_data_put_int(data, n); }
+ void putSymbol(const std::string& symbol) { pn_data_put_symbol(data, bytes(symbol)); }
- static pn_bytes_t str(const std::string&);
- static std::string str(const pn_bytes_t&);
+ bool get(pn_type_t type, types::Variant& value);
+ bool get(types::Variant& value);
+ void getList(types::Variant::List& value);
+ void getMap(types::Variant::Map& value);
+ void getArray(types::Variant::List& value);
- private:
- pn_data_t* data;
+ static pn_bytes_t bytes(const std::string&);
+ static std::string string(const pn_bytes_t&);
};
}}} // namespace messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index 5e0707056f..a28509b0b1 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -37,9 +37,10 @@ ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, co
helper(address),
receiver(pn_receiver(session, name.c_str())),
capacity(0), used(0) {}
+
ReceiverContext::~ReceiverContext()
{
- pn_link_free(receiver);
+ if (receiver) pn_link_free(receiver);
}
void ReceiverContext::setCapacity(uint32_t c)
@@ -63,12 +64,13 @@ uint32_t ReceiverContext::getAvailable()
uint32_t ReceiverContext::getUnsettled()
{
+ assert(pn_link_unsettled(receiver) >= pn_link_queued(receiver));
return pn_link_unsettled(receiver) - pn_link_queued(receiver);
}
void ReceiverContext::close()
{
- pn_link_close(receiver);
+ if (receiver) pn_link_close(receiver);
}
const std::string& ReceiverContext::getName() const
@@ -96,7 +98,7 @@ void ReceiverContext::verify()
}
void ReceiverContext::configure()
{
- configure(pn_link_source(receiver));
+ if (receiver) configure(pn_link_source(receiver));
}
void ReceiverContext::configure(pn_terminus_t* source)
{
@@ -116,13 +118,13 @@ Address ReceiverContext::getAddress() const
void ReceiverContext::reset(pn_session_t* session)
{
- receiver = pn_receiver(session, name.c_str());
- configure();
+ receiver = session ? pn_receiver(session, name.c_str()) : 0;
+ if (receiver) configure();
}
bool ReceiverContext::hasCurrent()
{
- return pn_link_current(receiver);
+ return receiver && pn_link_current(receiver);
}
bool ReceiverContext::wakeupToIssueCredit()
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 2a48b2241a..b12af5eb25 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -18,8 +18,10 @@
* under the License.
*
*/
-#include "qpid/messaging/amqp/SenderContext.h"
-#include "qpid/messaging/amqp/EncodedMessage.h"
+#include "SenderContext.h"
+#include "Transaction.h"
+#include "EncodedMessage.h"
+#include "PnData.h"
#include "qpid/messaging/AddressImpl.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/Exception.h"
@@ -40,22 +42,29 @@ extern "C" {
namespace qpid {
namespace messaging {
namespace amqp {
+
//TODO: proper conversion to wide string for address
-SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a, bool setToOnSend_)
- : name(n),
+SenderContext::SenderContext(pn_session_t* session, const std::string& n,
+ const qpid::messaging::Address& a,
+ bool setToOnSend_,
+ const CoordinatorPtr& coord)
+ : sender(pn_sender(session, n.c_str())),
+ name(n),
address(a),
helper(address),
- sender(pn_sender(session, n.c_str())), nextId(0), capacity(50), unreliable(helper.isUnreliable()),
- setToOnSend(setToOnSend_) {}
+ nextId(0), capacity(50), unreliable(helper.isUnreliable()),
+ setToOnSend(setToOnSend_),
+ transaction(coord)
+{}
SenderContext::~SenderContext()
{
- pn_link_free(sender);
+ if (sender) pn_link_free(sender);
}
void SenderContext::close()
{
- pn_link_close(sender);
+ if (sender) pn_link_close(sender);
}
void SenderContext::setCapacity(uint32_t c)
@@ -88,10 +97,13 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext:
{
resend();//if there are any messages needing to be resent at the front of the queue, send them first
if (processUnsettled(false) < capacity && pn_link_credit(sender)) {
+ types::Variant state;
+ if (transaction)
+ state = transaction->getSendState();
if (unreliable) {
Delivery delivery(nextId++);
delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
- delivery.send(sender, unreliable);
+ delivery.send(sender, unreliable, state);
*out = 0;
return true;
} else {
@@ -99,7 +111,7 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext:
try {
Delivery& delivery = deliveries.back();
delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
- delivery.send(sender, unreliable);
+ delivery.send(sender, unreliable, state);
*out = &delivery;
return true;
} catch (const std::exception& e) {
@@ -507,7 +519,8 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co
throw SendError(e.what());
}
}
-void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)
+
+void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const types::Variant& state)
{
pn_delivery_tag_t tag;
tag.size = sizeof(id);
@@ -517,6 +530,11 @@ void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)
tag.bytes = reinterpret_cast<const char*>(&id);
#endif
token = pn_delivery(sender, tag);
+ if (!state.isVoid()) { // Add transaction state
+ PnData data(pn_disposition_data(pn_delivery_local(token)));
+ data.put(state);
+ pn_delivery_update(token, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE);
+ }
pn_link_send(sender, encoded.getData(), encoded.getSize());
if (unreliable) {
pn_delivery_settle(token);
@@ -551,6 +569,15 @@ bool SenderContext::Delivery::rejected()
{
return pn_delivery_remote_state(token) == PN_REJECTED;
}
+
+std::string SenderContext::Delivery::error()
+{
+ pn_condition_t *condition = pn_disposition_condition(pn_delivery_remote(token));
+ return (condition && pn_condition_is_set(condition)) ?
+ Msg() << pn_condition_get_name(condition) << ": " << pn_condition_get_description(condition) :
+ std::string();
+}
+
void SenderContext::Delivery::settle()
{
pn_delivery_settle(token);
@@ -570,10 +597,12 @@ void SenderContext::verify()
helper.checkAssertion(target, AddressHelper::FOR_SENDER);
}
+
void SenderContext::configure()
{
- configure(pn_link_target(sender));
+ if (sender) configure(pn_link_target(sender));
}
+
void SenderContext::configure(pn_terminus_t* target)
{
helper.configure(sender, target, AddressHelper::FOR_SENDER);
@@ -603,12 +632,10 @@ Address SenderContext::getAddress() const
void SenderContext::reset(pn_session_t* session)
{
- sender = pn_sender(session, name.c_str());
- configure();
-
- for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) {
+ sender = session ? pn_sender(session, name.c_str()) : 0;
+ if (sender) configure();
+ for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i)
i->reset();
- }
}
void SenderContext::resend()
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
index 66e45a85a6..4d3c4bee79 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
@@ -24,6 +24,7 @@
#include <deque>
#include <string>
#include <vector>
+#include <boost/shared_ptr.hpp>
#include "qpid/sys/IntegerTypes.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/amqp/AddressHelper.h"
@@ -41,9 +42,10 @@ class Message;
class MessageImpl;
namespace amqp {
-/**
- *
- */
+
+class Transaction;
+
+
class SenderContext
{
public:
@@ -52,13 +54,15 @@ class SenderContext
public:
Delivery(int32_t id);
void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&, bool setToField);
- void send(pn_link_t*, bool unreliable);
+ void send(pn_link_t*, bool unreliable, const types::Variant& state=types::Variant());
bool delivered();
bool accepted();
bool rejected();
void settle();
void reset();
bool sent() const;
+ pn_delivery_t* getToken() const { return token; }
+ std::string error();
private:
int32_t id;
pn_delivery_t* token;
@@ -66,22 +70,32 @@ class SenderContext
bool presettled;
};
- SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target, bool setToOnSend);
+ typedef boost::shared_ptr<Transaction> CoordinatorPtr;
+
+ SenderContext(pn_session_t* session, const std::string& name,
+ const qpid::messaging::Address& target,
+ bool setToOnSend,
+ const CoordinatorPtr& transaction = CoordinatorPtr());
~SenderContext();
- void reset(pn_session_t* session);
- void close();
- void setCapacity(uint32_t);
- uint32_t getCapacity();
- uint32_t getUnsettled();
- const std::string& getName() const;
- const std::string& getTarget() const;
- bool send(const qpid::messaging::Message& message, Delivery**);
- void configure();
- void verify();
- void check();
- bool settled();
- bool closed();
- Address getAddress() const;
+
+ virtual void reset(pn_session_t* session);
+ virtual void close();
+ virtual void setCapacity(uint32_t);
+ virtual uint32_t getCapacity();
+ virtual uint32_t getUnsettled();
+ virtual const std::string& getName() const;
+ virtual const std::string& getTarget() const;
+ virtual bool send(const qpid::messaging::Message& message, Delivery**);
+ virtual void configure();
+ virtual void verify();
+ virtual void check();
+ virtual bool settled();
+ virtual bool closed();
+ virtual Address getAddress() const;
+
+ protected:
+ pn_link_t* sender;
+
private:
friend class ConnectionContext;
typedef std::deque<Delivery> Deliveries;
@@ -89,12 +103,12 @@ class SenderContext
const std::string name;
qpid::messaging::Address address;
AddressHelper helper;
- pn_link_t* sender;
int32_t nextId;
Deliveries deliveries;
uint32_t capacity;
bool unreliable;
bool setToOnSend;
+ boost::shared_ptr<Transaction> transaction;
uint32_t processUnsettled(bool silent);
void configure(pn_terminus_t*);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
index 367db701cb..98f2d34e7d 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
@@ -39,7 +39,8 @@ SenderHandle::SenderHandle(boost::shared_ptr<ConnectionContext> c,
void SenderHandle::send(const Message& message, bool sync)
{
- connection->send(session, sender, message, sync);
+ SenderContext::Delivery* d = 0;
+ connection->send(session, sender, message, sync, &d);
}
void SenderHandle::close()
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
index 824b958af3..2b82ffc377 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
@@ -21,11 +21,15 @@
#include "SessionContext.h"
#include "SenderContext.h"
#include "ReceiverContext.h"
+#include "Transaction.h"
+#include "PnData.h"
#include <boost/format.hpp>
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/log/Statement.h"
+#include "qpid/amqp/descriptors.h"
+
extern "C" {
#include <proton/engine.h>
}
@@ -35,23 +39,32 @@ namespace messaging {
namespace amqp {
SessionContext::SessionContext(pn_connection_t* connection) : session(pn_session(connection)) {}
+
SessionContext::~SessionContext()
{
- senders.clear(); receivers.clear();
- pn_session_free(session);
+ // Clear all pointers to senders and receivers before we free the session.
+ senders.clear();
+ receivers.clear();
+ transaction.reset(); // Transaction is a sender.
+ if (!error && session)
+ pn_session_free(session);
}
boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address, bool setToOnSend)
{
+ error.raise();
std::string name = AddressHelper::getLinkName(address);
- if (senders.find(name) != senders.end()) throw LinkError("Link name must be unique within the scope of the connection");
- boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address, setToOnSend));
+ if (senders.find(name) != senders.end())
+ throw LinkError("Link name must be unique within the scope of the connection");
+ boost::shared_ptr<SenderContext> s(
+ new SenderContext(session, name, address, setToOnSend, transaction));
senders[name] = s;
return s;
}
boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::messaging::Address& address)
{
+ error.raise();
std::string name = AddressHelper::getLinkName(address);
if (receivers.find(name) != receivers.end()) throw LinkError("Link name must be unique within the scope of the connection");
boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address));
@@ -61,6 +74,7 @@ boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::me
boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& name) const
{
+ error.raise();
SenderMap::const_iterator i = senders.find(name);
if (i == senders.end()) {
throw qpid::messaging::KeyError(std::string("No such sender") + name);
@@ -71,6 +85,7 @@ boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& na
boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string& name) const
{
+ error.raise();
ReceiverMap::const_iterator i = receivers.find(name);
if (i == receivers.end()) {
throw qpid::messaging::KeyError(std::string("No such receiver") + name);
@@ -81,16 +96,19 @@ boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string
void SessionContext::removeReceiver(const std::string& n)
{
+ error.raise();
receivers.erase(n);
}
void SessionContext::removeSender(const std::string& n)
{
+ error.raise();
senders.erase(n);
}
boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver()
{
+ error.raise();
for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) {
if (i->second->hasCurrent()) {
return i->second;
@@ -102,16 +120,19 @@ boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver()
uint32_t SessionContext::getReceivable()
{
+ error.raise();
return 0;//TODO
}
uint32_t SessionContext::getUnsettledAcks()
{
+ error.raise();
return 0;//TODO
}
qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery)
{
+ error.raise();
qpid::framing::SequenceNumber id = next++;
if (!pn_delivery_settled(delivery))
unacked[id] = delivery;
@@ -121,22 +142,32 @@ qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery)
void SessionContext::acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end)
{
+ error.raise();
for (DeliveryMap::iterator i = begin; i != end; ++i) {
- QPID_LOG(debug, "Setting disposition for delivery " << i->first << " -> " << i->second);
- pn_delivery_update(i->second, PN_ACCEPTED);
- pn_delivery_settle(i->second);//TODO: different settlement modes?
+ types::Variant txState;
+ if (transaction) {
+ QPID_LOG(trace, "Setting disposition for transactional delivery "
+ << i->first << " -> " << i->second);
+ transaction->acknowledge(i->second);
+ } else {
+ QPID_LOG(trace, "Setting disposition for delivery " << i->first << " -> " << i->second);
+ pn_delivery_update(i->second, PN_ACCEPTED);
+ pn_delivery_settle(i->second); //TODO: different settlement modes?
+ }
}
unacked.erase(begin, end);
}
void SessionContext::acknowledge()
{
+ error.raise();
QPID_LOG(debug, "acknowledging all " << unacked.size() << " messages");
acknowledge(unacked.begin(), unacked.end());
}
void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool cumulative)
{
+ error.raise();
QPID_LOG(debug, "acknowledging selected messages, id=" << id << ", cumulative=" << cumulative);
DeliveryMap::iterator i = unacked.find(id);
if (i != unacked.end()) {
@@ -149,6 +180,7 @@ void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool c
void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject)
{
+ error.raise();
DeliveryMap::iterator i = unacked.find(id);
if (i != unacked.end()) {
if (reject) {
@@ -166,7 +198,9 @@ void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject)
bool SessionContext::settled()
{
+ error.raise();
bool result = true;
+
for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
try {
if (!i->second->closed() && !i->second->settled()) result = false;
@@ -189,8 +223,25 @@ std::string SessionContext::getName() const
void SessionContext::reset(pn_connection_t* connection)
{
- session = pn_session(connection);
unacked.clear();
+ if (transaction) {
+ if (transaction->isCommitting())
+ error = new TransactionUnknown("Transaction outcome unknown: transport failure");
+ else
+ error = new TransactionAborted("Transaction aborted: transport failure");
+ resetSession(0);
+ senders.clear();
+ receivers.clear();
+ transaction.reset();
+ return;
+ }
+ resetSession(pn_session(connection));
+
+}
+
+void SessionContext::resetSession(pn_session_t* session_) {
+ session = session_;
+ if (transaction) transaction->reset(session);
for (SessionContext::SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
i->second->reset(session);
}
@@ -198,4 +249,6 @@ void SessionContext::reset(pn_connection_t* connection)
i->second->reset(session);
}
}
+
+
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
index 8c2bb040a6..67b3c1e401 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
@@ -26,6 +26,7 @@
#include <boost/shared_ptr.hpp>
#include "qpid/sys/IntegerTypes.h"
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/ExceptionHolder.h"
struct pn_connection_t;
struct pn_session_t;
@@ -42,6 +43,8 @@ namespace amqp {
class ConnectionContext;
class SenderContext;
class ReceiverContext;
+class Transaction;
+
/**
*
*/
@@ -63,23 +66,29 @@ class SessionContext
bool settled();
void setName(const std::string&);
std::string getName() const;
+
+ void nack(const qpid::framing::SequenceNumber& id, bool reject);
+
private:
friend class ConnectionContext;
typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap;
typedef std::map<std::string, boost::shared_ptr<ReceiverContext> > ReceiverMap;
typedef std::map<qpid::framing::SequenceNumber, pn_delivery_t*> DeliveryMap;
+
pn_session_t* session;
SenderMap senders;
+ boost::shared_ptr<Transaction> transaction;
ReceiverMap receivers;
DeliveryMap unacked;
qpid::framing::SequenceNumber next;
std::string name;
+ sys::ExceptionHolder error;
qpid::framing::SequenceNumber record(pn_delivery_t*);
void acknowledge();
void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative);
void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end);
- void nack(const qpid::framing::SequenceNumber& id, bool reject);
+ void resetSession(pn_session_t*);
};
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
index 4d427639d3..44294e5f04 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
@@ -42,12 +42,12 @@ SessionHandle::SessionHandle(boost::shared_ptr<ConnectionContext> c, boost::shar
void SessionHandle::commit()
{
-
+ connection->commit(session);
}
void SessionHandle::rollback()
{
-
+ connection->rollback(session);
}
void SessionHandle::acknowledge(bool /*sync*/)
diff --git a/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp b/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp
new file mode 100644
index 0000000000..754b00d802
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Transaction.h"
+#include "SessionContext.h"
+#include "ConnectionContext.h"
+#include "PnData.h"
+#include <proton/engine.h>
+#include <qpid/Exception.h>
+#include <qpid/amqp/descriptors.h>
+#include <qpid/messaging/exceptions.h>
+#include <qpid/log/Statement.h>
+#include "qpid/messaging/Message.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+using namespace types;
+using types::Exception;
+
+namespace {
+const std::string LOCAL_TRANSACTIONS("amqp:local-transactions");
+const std::string TX_COORDINATOR("tx-transaction");
+const std::string ADDRESS("tx-transaction;{link:{reliability:at-least-once}}");
+}
+
+Transaction::Transaction(pn_session_t* session) :
+ SenderContext(session, TX_COORDINATOR, Address(ADDRESS), false), committing(false)
+{}
+
+void Transaction::clear() {
+ id.clear();
+ sendState.reset();
+ acceptState.reset();
+}
+
+void Transaction::configure() {
+ SenderContext::configure();
+ pn_terminus_t* target = pn_link_target(sender);
+ pn_terminus_set_type(target, PN_COORDINATOR);
+ PnData(pn_terminus_capabilities(target)).putSymbol(LOCAL_TRANSACTIONS);
+}
+
+void Transaction::verify() {}
+
+const std::string& Transaction::getTarget() const { return getName(); }
+
+void Transaction::declare(SendFunction send, const SessionPtr& session) {
+ committing = false;
+ error.raise();
+ clear();
+ Variant declare = Variant::described(qpid::amqp::transaction::DECLARE_CODE, Variant::List());
+ SenderContext::Delivery* delivery = 0;
+ send(session, shared_from_this(), Message(declare), true, &delivery);
+ setId(*delivery);
+}
+
+void Transaction::discharge(SendFunction send, const SessionPtr& session, bool fail) {
+ error.raise();
+ committing = !fail;
+ try {
+ // Send a discharge message to the remote coordinator.
+ Variant::List dischargeList;
+ dischargeList.push_back(Variant(id));
+ dischargeList.push_back(Variant(fail));
+ Variant discharge(dischargeList);
+ discharge.setDescriptor(qpid::amqp::transaction::DISCHARGE_CODE);
+ SenderContext::Delivery* delivery = 0;
+ send(session, shared_from_this(), Message(discharge), true, &delivery);
+ if (!delivery->accepted())
+ throw TransactionAborted(delivery->error());
+ committing = false;
+ }
+ catch(const TransactionError&) {
+ throw;
+ }
+ catch(const Exception& e) {
+ committing = false;
+ throw TransactionAborted(e.what());
+ }
+}
+
+// Set the transaction ID from the delivery returned by the remote coordinator.
+void Transaction::setId(const SenderContext::Delivery& delivery)
+{
+ if (delivery.getToken() &&
+ pn_delivery_remote_state(delivery.getToken()) == qpid::amqp::transaction::DECLARED_CODE)
+ {
+ pn_data_t* data = pn_disposition_data(pn_delivery_remote(delivery.getToken()));
+ if (data && pn_data_next(data)) {
+ size_t count = pn_data_get_list(data);
+ if (count > 0) {
+ pn_data_enter(data);
+ pn_data_next(data);
+ setId(PnData::string(pn_data_get_binary(data)));
+ pn_data_exit(data);
+ return;
+ }
+ }
+ }
+ throw TransactionError("No transaction ID returned by remote coordinator.");
+}
+
+void Transaction::setId(const std::string& id_) {
+ id = id_;
+ if (id.empty()) {
+ clear();
+ }
+ else {
+ // NOTE: The send and accept states are NOT described, the descriptor
+ // is added in pn_delivery_update.
+ Variant::List list;
+ list.push_back(Variant(id, "binary"));
+ sendState = Variant(list);
+
+ Variant accepted = Variant::described(qpid::amqp::message::ACCEPTED_CODE, Variant::List());
+ list.push_back(accepted);
+ acceptState = Variant(list);
+ }
+}
+
+types::Variant Transaction::getSendState() const {
+ error.raise();
+ return sendState;
+}
+
+void Transaction::acknowledge(pn_delivery_t* delivery)
+{
+ error.raise();
+ PnData data(pn_disposition_data(pn_delivery_local(delivery)));
+ data.put(acceptState);
+ pn_delivery_update(delivery, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE);
+ pn_delivery_settle(delivery);
+}
+
+
+
+}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/Transaction.h b/qpid/cpp/src/qpid/messaging/amqp/Transaction.h
new file mode 100644
index 0000000000..35492c9bb3
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/Transaction.h
@@ -0,0 +1,95 @@
+#ifndef COORDINATORCONTEXT_H
+#define COORDINATORCONTEXT_H
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+#include "SenderContext.h"
+#include <qpid/types/Variant.h>
+#include "qpid/sys/ExceptionHolder.h"
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/function.hpp>
+
+struct pn_session_t;
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+class SessionContext;
+class ConnectionContext;
+
+/**
+ * Track the current transaction for a session.
+ *
+ * Implements SenderContext, to send transaction command messages to remote coordinator.
+ */
+class Transaction : public SenderContext, public boost::enable_shared_from_this<Transaction> {
+ public:
+ typedef boost::shared_ptr<SessionContext> SessionPtr;
+
+ typedef boost::function<void (boost::shared_ptr<SessionContext> ssn,
+ boost::shared_ptr<SenderContext> snd,
+ const qpid::messaging::Message& message,
+ bool sync,
+ SenderContext::Delivery** delivery)> SendFunction;
+
+ Transaction(pn_session_t*);
+
+ sys::ExceptionHolder error;
+
+ /** Declare a transaction using connection and session to send to remote co-ordinator. */
+ void declare(SendFunction, const SessionPtr& session);
+
+ /** Discharge a transaction using connection and session to send to remote co-ordinator.
+ *@param fail: true means rollback, false means commit.
+ */
+ void discharge(SendFunction, const SessionPtr& session, bool fail);
+
+ /** Update a delivery with a transactional accept state. */
+ void acknowledge(pn_delivery_t* delivery);
+
+ /** Get delivery state to attach to transfers sent in a transaction. */
+ types::Variant getSendState() const;
+
+ /** Override SenderContext::getTarget with a more readable value */
+ const std::string& getTarget() const;
+
+ bool isCommitting() const { return committing; }
+
+ protected:
+ // SenderContext overrides
+ void configure();
+ void verify();
+
+ private:
+ std::string id;
+ types::Variant sendState;
+ types::Variant acceptState;
+ bool committing;
+
+
+ void clear();
+ void setId(const SenderContext::Delivery& delivery);
+ void setId(const std::string& id);
+};
+
+}}}
+
+#endif