summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-08-28 12:41:31 +0000
committerGordon Sim <gsim@apache.org>2013-08-28 12:41:31 +0000
commit01cb164d09b628206335c138eba796b3487c5ea0 (patch)
treef237bb35e462a28039cef852d4e8e03cda08bd8e /qpid/cpp
parentc29ed9b3d6d5bc2f772b7700166fa11d138ae3ec (diff)
downloadqpid-python-01cb164d09b628206335c138eba796b3487c5ea0.tar.gz
QPID-4978: add support for reliability option
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518182 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.h1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp18
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h5
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp2
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp30
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.h5
9 files changed, 54 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 86fe34d8d3..3d2644380a 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -52,7 +52,8 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source,
isControllingUser(p),
queue(q), deliveries(5000), link(l), out(o),
current(0), outstanding(0),
- buffer(1024)/*used only for header at present*/
+ buffer(1024)/*used only for header at present*/,
+ unreliable(pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED)
{
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
deliveries[i].init(i);
@@ -105,6 +106,7 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery)
write(&buffer[0], encoder.getPosition());
Translation t(r.msg);
t.write(*this);
+ if (unreliable) pn_delivery_settle(delivery);
if (pn_link_advance(link)) {
--outstanding;
outgoingMessageSent();
@@ -113,7 +115,10 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery)
QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
}
}
- if (pn_delivery_updated(delivery)) {
+ if (unreliable) {
+ if (preAcquires()) queue->dequeue(0, r.cursor);
+ r.reset();
+ } else if (pn_delivery_updated(delivery)) {
assert(r.delivery == delivery);
r.disposition = pn_delivery_remote_state(delivery);
if (r.disposition) {
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
index f0f2226e10..d333c54672 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -135,6 +135,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public
std::vector<char> buffer;
std::string subjectFilter;
boost::scoped_ptr<Selector> selector;
+ bool unreliable;
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 17d7560e75..0344ea537e 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -317,7 +317,6 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
target = targetAddress;
}
-
if (node.queue) {
authorise.outgoing(node.queue);
SubscriptionType type = pn_terminus_get_distribution_mode(source) == PN_DIST_MODE_COPY ? BROWSER : CONSUMER;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index 2a358a99f7..9b0fd18ed1 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -73,6 +73,12 @@ const std::string SUBJECT_FILTER("subject-filter");
const std::string SOURCE("sender-source");
const std::string TARGET("receiver-target");
+//reliability options:
+const std::string UNRELIABLE("unreliable");
+const std::string AT_MOST_ONCE("at-most-once");
+const std::string AT_LEAST_ONCE("at-least-once");
+const std::string EXACTLY_ONCE("exactly-once");
+
//distribution modes:
const std::string MOVE("move");
const std::string COPY("copy");
@@ -293,6 +299,7 @@ AddressHelper::AddressHelper(const Address& address) :
bind(address, LINK, link);
bind(node, PROPERTIES, properties);
bind(node, CAPABILITIES, capabilities);
+ bind(link, RELIABILITY, reliability);
durableNode = test(node, DURABLE);
durableLink = test(link, DURABLE);
timeout = get(link, TIMEOUT, durableLink ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT);
@@ -506,6 +513,11 @@ bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const
return result;
}
+bool AddressHelper::isUnreliable() const
+{
+ return reliability == AT_MOST_ONCE || reliability == UNRELIABLE;
+}
+
const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const
{
return node;
@@ -536,7 +548,7 @@ bool AddressHelper::getLinkOption(const std::string& name, std::string& out) con
}
}
-void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode)
+void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode)
{
bool createOnDemand(false);
if (isTemporary) {
@@ -581,7 +593,9 @@ void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode)
pn_data_exit(filter);
}
}
-
+ if (isUnreliable()) {
+ pn_link_set_snd_settle_mode(link, PN_SND_SETTLED);
+ }
}
void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create)
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
index cb48918e8f..3ee58cad8d 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
@@ -24,6 +24,7 @@
#include "qpid/types/Variant.h"
#include <vector>
+struct pn_link_t;
struct pn_terminus_t;
namespace qpid {
@@ -36,9 +37,10 @@ class AddressHelper
enum CheckMode {FOR_RECEIVER, FOR_SENDER};
AddressHelper(const Address& address);
- void configure(pn_terminus_t* terminus, CheckMode mode);
+ void configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode);
void checkAssertion(pn_terminus_t* terminus, CheckMode mode);
+ bool isUnreliable() const;
const qpid::types::Variant::Map& getNodeProperties() const;
bool getLinkSource(std::string& out) const;
bool getLinkTarget(std::string& out) const;
@@ -68,6 +70,7 @@ class AddressHelper
qpid::types::Variant::List capabilities;
std::string name;
std::string type;
+ std::string reliability;
bool durableNode;
bool durableLink;
uint32_t timeout;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 12ec0d5b20..e42002aa0d 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -377,12 +377,12 @@ void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::share
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
checkClosed(ssn);
SenderContext::Delivery* delivery(0);
- while (!(delivery = snd->send(message))) {
+ while (!snd->send(message, &delivery)) {
QPID_LOG(debug, "Waiting for capacity...");
wait(ssn, snd);//wait for capacity
}
wakeupDriver();
- if (sync) {
+ if (sync && delivery) {
while (!delivery->accepted()) {
QPID_LOG(debug, "Waiting for confirmation...");
wait(ssn, snd);//wait until message has been confirmed
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index 10178f31d0..661856122d 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -99,7 +99,7 @@ void ReceiverContext::configure()
}
void ReceiverContext::configure(pn_terminus_t* source)
{
- helper.configure(source, AddressHelper::FOR_RECEIVER);
+ helper.configure(receiver, source, AddressHelper::FOR_RECEIVER);
std::string option;
if (helper.getLinkTarget(option)) {
pn_terminus_set_address(pn_link_target(receiver), option.c_str());
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 92a8941571..1926afcb27 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -42,7 +42,7 @@ SenderContext::SenderContext(pn_session_t* session, const std::string& n, const
: name(n),
address(a),
helper(address),
- sender(pn_sender(session, n.c_str())), capacity(1000) {}
+ sender(pn_sender(session, n.c_str())), capacity(1000), unreliable(helper.isUnreliable()) {}
SenderContext::~SenderContext()
{
@@ -80,16 +80,25 @@ const std::string& SenderContext::getTarget() const
return address.getName();
}
-SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message)
+bool SenderContext::send(const qpid::messaging::Message& message, SenderContext::Delivery** out)
{
if (processUnsettled(false) < capacity && pn_link_credit(sender)) {
- deliveries.push_back(Delivery(nextId++));
- Delivery& delivery = deliveries.back();
- delivery.encode(MessageImplAccess::get(message), address);
- delivery.send(sender);
- return &delivery;
+ if (unreliable) {
+ Delivery delivery(nextId++);
+ delivery.encode(MessageImplAccess::get(message), address);
+ delivery.send(sender, unreliable);
+ *out = 0;
+ return true;
+ } else {
+ deliveries.push_back(Delivery(nextId++));
+ Delivery& delivery = deliveries.back();
+ delivery.encode(MessageImplAccess::get(message), address);
+ delivery.send(sender, unreliable);
+ *out = &delivery;
+ return true;
+ }
} else {
- return 0;
+ return false;
}
}
@@ -474,13 +483,14 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co
//write footer (no annotations yet supported)
}
}
-void SenderContext::Delivery::send(pn_link_t* sender)
+void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)
{
pn_delivery_tag_t tag;
tag.size = sizeof(id);
tag.bytes = reinterpret_cast<const char*>(&id);
token = pn_delivery(sender, tag);
pn_link_send(sender, encoded.getData(), encoded.getSize());
+ if (unreliable) pn_delivery_settle(token);
pn_link_advance(sender);
}
@@ -520,7 +530,7 @@ void SenderContext::configure()
}
void SenderContext::configure(pn_terminus_t* target)
{
- helper.configure(target, AddressHelper::FOR_SENDER);
+ helper.configure(sender, target, AddressHelper::FOR_SENDER);
std::string option;
if (helper.getLinkSource(option)) {
pn_terminus_set_address(pn_link_source(sender), option.c_str());
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
index 4d73d38afe..fcdfbbcf96 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
@@ -52,7 +52,7 @@ class SenderContext
public:
Delivery(int32_t id);
void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&);
- void send(pn_link_t*);
+ void send(pn_link_t*, bool unreliable);
bool delivered();
bool accepted();
bool rejected();
@@ -71,7 +71,7 @@ class SenderContext
uint32_t getUnsettled();
const std::string& getName() const;
const std::string& getTarget() const;
- Delivery* send(const qpid::messaging::Message& message);
+ bool send(const qpid::messaging::Message& message, Delivery**);
void configure();
void verify(pn_terminus_t*);
void check();
@@ -88,6 +88,7 @@ class SenderContext
int32_t nextId;
Deliveries deliveries;
uint32_t capacity;
+ bool unreliable;
uint32_t processUnsettled(bool silent);
void configure(pn_terminus_t*);