diff options
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SenderContext.h | 5 |
9 files changed, 54 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 86fe34d8d3..3d2644380a 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -52,7 +52,8 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, isControllingUser(p), queue(q), deliveries(5000), link(l), out(o), current(0), outstanding(0), - buffer(1024)/*used only for header at present*/ + buffer(1024)/*used only for header at present*/, + unreliable(pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED) { for (size_t i = 0 ; i < deliveries.capacity(); ++i) { deliveries[i].init(i); @@ -105,6 +106,7 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery) write(&buffer[0], encoder.getPosition()); Translation t(r.msg); t.write(*this); + if (unreliable) pn_delivery_settle(delivery); if (pn_link_advance(link)) { --outstanding; outgoingMessageSent(); @@ -113,7 +115,10 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery) QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index); } } - if (pn_delivery_updated(delivery)) { + if (unreliable) { + if (preAcquires()) queue->dequeue(0, r.cursor); + r.reset(); + } else if (pn_delivery_updated(delivery)) { assert(r.delivery == delivery); r.disposition = pn_delivery_remote_state(delivery); if (r.disposition) { diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index f0f2226e10..d333c54672 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -135,6 +135,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public std::vector<char> buffer; std::string subjectFilter; boost::scoped_ptr<Selector> selector; + bool unreliable; }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 17d7560e75..0344ea537e 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -317,7 +317,6 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s target = targetAddress; } - if (node.queue) { authorise.outgoing(node.queue); SubscriptionType type = pn_terminus_get_distribution_mode(source) == PN_DIST_MODE_COPY ? BROWSER : CONSUMER; diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 2a358a99f7..9b0fd18ed1 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -73,6 +73,12 @@ const std::string SUBJECT_FILTER("subject-filter"); const std::string SOURCE("sender-source"); const std::string TARGET("receiver-target"); +//reliability options: +const std::string UNRELIABLE("unreliable"); +const std::string AT_MOST_ONCE("at-most-once"); +const std::string AT_LEAST_ONCE("at-least-once"); +const std::string EXACTLY_ONCE("exactly-once"); + //distribution modes: const std::string MOVE("move"); const std::string COPY("copy"); @@ -293,6 +299,7 @@ AddressHelper::AddressHelper(const Address& address) : bind(address, LINK, link); bind(node, PROPERTIES, properties); bind(node, CAPABILITIES, capabilities); + bind(link, RELIABILITY, reliability); durableNode = test(node, DURABLE); durableLink = test(link, DURABLE); timeout = get(link, TIMEOUT, durableLink ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT); @@ -506,6 +513,11 @@ bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const return result; } +bool AddressHelper::isUnreliable() const +{ + return reliability == AT_MOST_ONCE || reliability == UNRELIABLE; +} + const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const { return node; @@ -536,7 +548,7 @@ bool AddressHelper::getLinkOption(const std::string& name, std::string& out) con } } -void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode) +void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode) { bool createOnDemand(false); if (isTemporary) { @@ -581,7 +593,9 @@ void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode) pn_data_exit(filter); } } - + if (isUnreliable()) { + pn_link_set_snd_settle_mode(link, PN_SND_SETTLED); + } } void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create) diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h index cb48918e8f..3ee58cad8d 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h @@ -24,6 +24,7 @@ #include "qpid/types/Variant.h" #include <vector> +struct pn_link_t; struct pn_terminus_t; namespace qpid { @@ -36,9 +37,10 @@ class AddressHelper enum CheckMode {FOR_RECEIVER, FOR_SENDER}; AddressHelper(const Address& address); - void configure(pn_terminus_t* terminus, CheckMode mode); + void configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode); void checkAssertion(pn_terminus_t* terminus, CheckMode mode); + bool isUnreliable() const; const qpid::types::Variant::Map& getNodeProperties() const; bool getLinkSource(std::string& out) const; bool getLinkTarget(std::string& out) const; @@ -68,6 +70,7 @@ class AddressHelper qpid::types::Variant::List capabilities; std::string name; std::string type; + std::string reliability; bool durableNode; bool durableLink; uint32_t timeout; diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 12ec0d5b20..e42002aa0d 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -377,12 +377,12 @@ void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::share qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); checkClosed(ssn); SenderContext::Delivery* delivery(0); - while (!(delivery = snd->send(message))) { + while (!snd->send(message, &delivery)) { QPID_LOG(debug, "Waiting for capacity..."); wait(ssn, snd);//wait for capacity } wakeupDriver(); - if (sync) { + if (sync && delivery) { while (!delivery->accepted()) { QPID_LOG(debug, "Waiting for confirmation..."); wait(ssn, snd);//wait until message has been confirmed diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index 10178f31d0..661856122d 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -99,7 +99,7 @@ void ReceiverContext::configure() } void ReceiverContext::configure(pn_terminus_t* source) { - helper.configure(source, AddressHelper::FOR_RECEIVER); + helper.configure(receiver, source, AddressHelper::FOR_RECEIVER); std::string option; if (helper.getLinkTarget(option)) { pn_terminus_set_address(pn_link_target(receiver), option.c_str()); diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 92a8941571..1926afcb27 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -42,7 +42,7 @@ SenderContext::SenderContext(pn_session_t* session, const std::string& n, const : name(n), address(a), helper(address), - sender(pn_sender(session, n.c_str())), capacity(1000) {} + sender(pn_sender(session, n.c_str())), capacity(1000), unreliable(helper.isUnreliable()) {} SenderContext::~SenderContext() { @@ -80,16 +80,25 @@ const std::string& SenderContext::getTarget() const return address.getName(); } -SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message) +bool SenderContext::send(const qpid::messaging::Message& message, SenderContext::Delivery** out) { if (processUnsettled(false) < capacity && pn_link_credit(sender)) { - deliveries.push_back(Delivery(nextId++)); - Delivery& delivery = deliveries.back(); - delivery.encode(MessageImplAccess::get(message), address); - delivery.send(sender); - return &delivery; + if (unreliable) { + Delivery delivery(nextId++); + delivery.encode(MessageImplAccess::get(message), address); + delivery.send(sender, unreliable); + *out = 0; + return true; + } else { + deliveries.push_back(Delivery(nextId++)); + Delivery& delivery = deliveries.back(); + delivery.encode(MessageImplAccess::get(message), address); + delivery.send(sender, unreliable); + *out = &delivery; + return true; + } } else { - return 0; + return false; } } @@ -474,13 +483,14 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co //write footer (no annotations yet supported) } } -void SenderContext::Delivery::send(pn_link_t* sender) +void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable) { pn_delivery_tag_t tag; tag.size = sizeof(id); tag.bytes = reinterpret_cast<const char*>(&id); token = pn_delivery(sender, tag); pn_link_send(sender, encoded.getData(), encoded.getSize()); + if (unreliable) pn_delivery_settle(token); pn_link_advance(sender); } @@ -520,7 +530,7 @@ void SenderContext::configure() } void SenderContext::configure(pn_terminus_t* target) { - helper.configure(target, AddressHelper::FOR_SENDER); + helper.configure(sender, target, AddressHelper::FOR_SENDER); std::string option; if (helper.getLinkSource(option)) { pn_terminus_set_address(pn_link_source(sender), option.c_str()); diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h index 4d73d38afe..fcdfbbcf96 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -52,7 +52,7 @@ class SenderContext public: Delivery(int32_t id); void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&); - void send(pn_link_t*); + void send(pn_link_t*, bool unreliable); bool delivered(); bool accepted(); bool rejected(); @@ -71,7 +71,7 @@ class SenderContext uint32_t getUnsettled(); const std::string& getName() const; const std::string& getTarget() const; - Delivery* send(const qpid::messaging::Message& message); + bool send(const qpid::messaging::Message& message, Delivery**); void configure(); void verify(pn_terminus_t*); void check(); @@ -88,6 +88,7 @@ class SenderContext int32_t nextId; Deliveries deliveries; uint32_t capacity; + bool unreliable; uint32_t processUnsettled(bool silent); void configure(pn_terminus_t*); |