summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp')
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp61
1 files changed, 44 insertions, 17 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 2a48b2241a..b12af5eb25 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -18,8 +18,10 @@
* under the License.
*
*/
-#include "qpid/messaging/amqp/SenderContext.h"
-#include "qpid/messaging/amqp/EncodedMessage.h"
+#include "SenderContext.h"
+#include "Transaction.h"
+#include "EncodedMessage.h"
+#include "PnData.h"
#include "qpid/messaging/AddressImpl.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/Exception.h"
@@ -40,22 +42,29 @@ extern "C" {
namespace qpid {
namespace messaging {
namespace amqp {
+
//TODO: proper conversion to wide string for address
-SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a, bool setToOnSend_)
- : name(n),
+SenderContext::SenderContext(pn_session_t* session, const std::string& n,
+ const qpid::messaging::Address& a,
+ bool setToOnSend_,
+ const CoordinatorPtr& coord)
+ : sender(pn_sender(session, n.c_str())),
+ name(n),
address(a),
helper(address),
- sender(pn_sender(session, n.c_str())), nextId(0), capacity(50), unreliable(helper.isUnreliable()),
- setToOnSend(setToOnSend_) {}
+ nextId(0), capacity(50), unreliable(helper.isUnreliable()),
+ setToOnSend(setToOnSend_),
+ transaction(coord)
+{}
SenderContext::~SenderContext()
{
- pn_link_free(sender);
+ if (sender) pn_link_free(sender);
}
void SenderContext::close()
{
- pn_link_close(sender);
+ if (sender) pn_link_close(sender);
}
void SenderContext::setCapacity(uint32_t c)
@@ -88,10 +97,13 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext:
{
resend();//if there are any messages needing to be resent at the front of the queue, send them first
if (processUnsettled(false) < capacity && pn_link_credit(sender)) {
+ types::Variant state;
+ if (transaction)
+ state = transaction->getSendState();
if (unreliable) {
Delivery delivery(nextId++);
delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
- delivery.send(sender, unreliable);
+ delivery.send(sender, unreliable, state);
*out = 0;
return true;
} else {
@@ -99,7 +111,7 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext:
try {
Delivery& delivery = deliveries.back();
delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
- delivery.send(sender, unreliable);
+ delivery.send(sender, unreliable, state);
*out = &delivery;
return true;
} catch (const std::exception& e) {
@@ -507,7 +519,8 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co
throw SendError(e.what());
}
}
-void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)
+
+void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const types::Variant& state)
{
pn_delivery_tag_t tag;
tag.size = sizeof(id);
@@ -517,6 +530,11 @@ void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)
tag.bytes = reinterpret_cast<const char*>(&id);
#endif
token = pn_delivery(sender, tag);
+ if (!state.isVoid()) { // Add transaction state
+ PnData data(pn_disposition_data(pn_delivery_local(token)));
+ data.put(state);
+ pn_delivery_update(token, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE);
+ }
pn_link_send(sender, encoded.getData(), encoded.getSize());
if (unreliable) {
pn_delivery_settle(token);
@@ -551,6 +569,15 @@ bool SenderContext::Delivery::rejected()
{
return pn_delivery_remote_state(token) == PN_REJECTED;
}
+
+std::string SenderContext::Delivery::error()
+{
+ pn_condition_t *condition = pn_disposition_condition(pn_delivery_remote(token));
+ return (condition && pn_condition_is_set(condition)) ?
+ Msg() << pn_condition_get_name(condition) << ": " << pn_condition_get_description(condition) :
+ std::string();
+}
+
void SenderContext::Delivery::settle()
{
pn_delivery_settle(token);
@@ -570,10 +597,12 @@ void SenderContext::verify()
helper.checkAssertion(target, AddressHelper::FOR_SENDER);
}
+
void SenderContext::configure()
{
- configure(pn_link_target(sender));
+ if (sender) configure(pn_link_target(sender));
}
+
void SenderContext::configure(pn_terminus_t* target)
{
helper.configure(sender, target, AddressHelper::FOR_SENDER);
@@ -603,12 +632,10 @@ Address SenderContext::getAddress() const
void SenderContext::reset(pn_session_t* session)
{
- sender = pn_sender(session, name.c_str());
- configure();
-
- for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) {
+ sender = session ? pn_sender(session, name.c_str()) : 0;
+ if (sender) configure();
+ for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i)
i->reset();
- }
}
void SenderContext::resend()