diff options
Diffstat (limited to 'qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp | 61 |
1 files changed, 44 insertions, 17 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 2a48b2241a..b12af5eb25 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -18,8 +18,10 @@ * under the License. * */ -#include "qpid/messaging/amqp/SenderContext.h" -#include "qpid/messaging/amqp/EncodedMessage.h" +#include "SenderContext.h" +#include "Transaction.h" +#include "EncodedMessage.h" +#include "PnData.h" #include "qpid/messaging/AddressImpl.h" #include "qpid/messaging/exceptions.h" #include "qpid/Exception.h" @@ -40,22 +42,29 @@ extern "C" { namespace qpid { namespace messaging { namespace amqp { + //TODO: proper conversion to wide string for address -SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a, bool setToOnSend_) - : name(n), +SenderContext::SenderContext(pn_session_t* session, const std::string& n, + const qpid::messaging::Address& a, + bool setToOnSend_, + const CoordinatorPtr& coord) + : sender(pn_sender(session, n.c_str())), + name(n), address(a), helper(address), - sender(pn_sender(session, n.c_str())), nextId(0), capacity(50), unreliable(helper.isUnreliable()), - setToOnSend(setToOnSend_) {} + nextId(0), capacity(50), unreliable(helper.isUnreliable()), + setToOnSend(setToOnSend_), + transaction(coord) +{} SenderContext::~SenderContext() { - pn_link_free(sender); + if (sender) pn_link_free(sender); } void SenderContext::close() { - pn_link_close(sender); + if (sender) pn_link_close(sender); } void SenderContext::setCapacity(uint32_t c) @@ -88,10 +97,13 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext: { resend();//if there are any messages needing to be resent at the front of the queue, send them first if (processUnsettled(false) < capacity && pn_link_credit(sender)) { + types::Variant state; + if (transaction) + state = transaction->getSendState(); if (unreliable) { Delivery delivery(nextId++); delivery.encode(MessageImplAccess::get(message), address, setToOnSend); - delivery.send(sender, unreliable); + delivery.send(sender, unreliable, state); *out = 0; return true; } else { @@ -99,7 +111,7 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext: try { Delivery& delivery = deliveries.back(); delivery.encode(MessageImplAccess::get(message), address, setToOnSend); - delivery.send(sender, unreliable); + delivery.send(sender, unreliable, state); *out = &delivery; return true; } catch (const std::exception& e) { @@ -507,7 +519,8 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co throw SendError(e.what()); } } -void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable) + +void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const types::Variant& state) { pn_delivery_tag_t tag; tag.size = sizeof(id); @@ -517,6 +530,11 @@ void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable) tag.bytes = reinterpret_cast<const char*>(&id); #endif token = pn_delivery(sender, tag); + if (!state.isVoid()) { // Add transaction state + PnData data(pn_disposition_data(pn_delivery_local(token))); + data.put(state); + pn_delivery_update(token, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE); + } pn_link_send(sender, encoded.getData(), encoded.getSize()); if (unreliable) { pn_delivery_settle(token); @@ -551,6 +569,15 @@ bool SenderContext::Delivery::rejected() { return pn_delivery_remote_state(token) == PN_REJECTED; } + +std::string SenderContext::Delivery::error() +{ + pn_condition_t *condition = pn_disposition_condition(pn_delivery_remote(token)); + return (condition && pn_condition_is_set(condition)) ? + Msg() << pn_condition_get_name(condition) << ": " << pn_condition_get_description(condition) : + std::string(); +} + void SenderContext::Delivery::settle() { pn_delivery_settle(token); @@ -570,10 +597,12 @@ void SenderContext::verify() helper.checkAssertion(target, AddressHelper::FOR_SENDER); } + void SenderContext::configure() { - configure(pn_link_target(sender)); + if (sender) configure(pn_link_target(sender)); } + void SenderContext::configure(pn_terminus_t* target) { helper.configure(sender, target, AddressHelper::FOR_SENDER); @@ -603,12 +632,10 @@ Address SenderContext::getAddress() const void SenderContext::reset(pn_session_t* session) { - sender = pn_sender(session, name.c_str()); - configure(); - - for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) { + sender = session ? pn_sender(session, name.c_str()) : 0; + if (sender) configure(); + for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) i->reset(); - } } void SenderContext::resend() |