From 4840fe938fd73412ad9393569b37b5a7416c1b51 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 2 Feb 2010 11:09:11 +0000 Subject: QPID-2380: recognise reliability option for sender (also added capacity to qpid_send test client and fixed handling of empty option string) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@905579 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 8 ++++---- cpp/src/qpid/client/amqp0_10/AddressResolution.h | 3 ++- cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 24 ++++++++++++++++------ cpp/src/qpid/client/amqp0_10/SenderImpl.h | 17 +++++++++++++++ cpp/src/qpid/messaging/Connection.cpp | 2 +- cpp/src/tests/qpid_send.cpp | 4 ++++ 6 files changed, 46 insertions(+), 12 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 3e53f40ba4..f1b487c255 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -265,12 +265,12 @@ bool getSenderPolicy(const Address& address, const std::string& key) return in(address.getOption(key), list_of(ALWAYS)(SENDER)); } -bool is_unreliable(const Address& address) +bool AddressResolution::is_unreliable(const Address& address) { return in(address.getOption(RELIABILITY), list_of(UNRELIABLE)(AT_MOST_ONCE)); } -bool is_reliable(const Address& address) +bool AddressResolution::is_reliable(const Address& address) { return in(address.getOption(RELIABILITY), list_of(AT_LEAST_ONCE)(EXACTLY_ONCE)); } @@ -346,7 +346,7 @@ const Variant& getNestedOption(const Variant::Map& options, const std::vector msg(new OutgoingMessage()); msg->convert(m); msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject()); @@ -103,6 +107,14 @@ void SenderImpl::sendImpl(const qpid::messaging::Message& m) sink->send(session, name, outgoing.back()); } +void SenderImpl::sendUnreliable(const qpid::messaging::Message& m) +{ + OutgoingMessage msg; + msg.convert(m); + msg.setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject()); + sink->send(session, name, msg); +} + void SenderImpl::replay() { for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h index 881f3c754c..b65f8cf8cc 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -73,6 +73,7 @@ class SenderImpl : public qpid::messaging::SenderImpl uint32_t capacity; uint32_t window; bool flushed; + const bool unreliable; uint32_t checkPendingSends(bool flush); void replay(); @@ -80,6 +81,7 @@ class SenderImpl : public qpid::messaging::SenderImpl //logic for application visible methods: void sendImpl(const qpid::messaging::Message&); + void sendUnreliable(const qpid::messaging::Message&); void closeImpl(); @@ -108,6 +110,21 @@ class SenderImpl : public qpid::messaging::SenderImpl } }; + struct UnreliableSend : Command + { + const qpid::messaging::Message* message; + + UnreliableSend(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {} + void operator()() + { + //TODO: ideally want to put messages on the outbound + //queue and pull them off in io thread, but the old + //0-10 client doesn't support that option so for now + //we simply don't queue unreliable messages + impl.sendUnreliable(*message); + } + }; + struct Close : Command { Close(SenderImpl& i) : Command(i) {} diff --git a/cpp/src/qpid/messaging/Connection.cpp b/cpp/src/qpid/messaging/Connection.cpp index 06006c9d20..230c9d5dbf 100644 --- a/cpp/src/qpid/messaging/Connection.cpp +++ b/cpp/src/qpid/messaging/Connection.cpp @@ -47,7 +47,7 @@ Connection::Connection(const std::string& o) { Variant::Map options; AddressParser parser(o); - if (parser.parseMap(options)) { + if (o.empty() || parser.parseMap(options)) { PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(options)); } else { throw InvalidOptionString(o); diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp index 9556fb000f..57c348ab9c 100644 --- a/cpp/src/tests/qpid_send.cpp +++ b/cpp/src/tests/qpid_send.cpp @@ -63,6 +63,7 @@ struct Options : public qpid::Options std::string content; uint tx; uint rollbackFrequency; + uint capacity; qpid::log::Options log; Options(const std::string& argv0=std::string()) @@ -76,6 +77,7 @@ struct Options : public qpid::Options ttl(0), tx(0), rollbackFrequency(0), + capacity(0), log(argv0) { addOptions() @@ -94,6 +96,7 @@ struct Options : public qpid::Options ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message") ("user-id", qpid::optValue(userid, "USERID"), "userid for message") ("content", qpid::optValue(content, "CONTENT"), "specify textual content") + ("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue") ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") ("help", qpid::optValue(help), "print this usage statement"); @@ -184,6 +187,7 @@ int main(int argc, char ** argv) connection.open(opts.url); Session session = connection.newSession(opts.tx > 0); Sender sender = session.createSender(opts.address); + if (opts.capacity) sender.setCapacity(opts.capacity); Message msg; msg.setDurable(opts.durable); if (opts.ttl) { -- cgit v1.2.1