diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/include/qpid/messaging/Address.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/AddressResolution.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 24 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.h | 17 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Connection.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/qpid_send.cpp | 4 |
7 files changed, 47 insertions, 13 deletions
diff --git a/cpp/include/qpid/messaging/Address.h b/cpp/include/qpid/messaging/Address.h index 88863072bd..745949792d 100644 --- a/cpp/include/qpid/messaging/Address.h +++ b/cpp/include/qpid/messaging/Address.h @@ -120,7 +120,7 @@ class AddressImpl; * exactly-once (the latter is not yet correctly supported).</td></tr> * * <tr valign=top><td>filter</td><td>(only relevant for topics at present) allows bindings to - * be created for the queue that match the given criteris (or list of + * be created for the queue that match the given criteria (or list of * criteria).</td></tr> * * <tr valign=top><td>x-properties</td><td>allows protocol or implementation specific options diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 3e53f40ba4..f1b487c255 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -265,12 +265,12 @@ bool getSenderPolicy(const Address& address, const std::string& key) return in(address.getOption(key), list_of<std::string>(ALWAYS)(SENDER)); } -bool is_unreliable(const Address& address) +bool AddressResolution::is_unreliable(const Address& address) { return in(address.getOption(RELIABILITY), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE)); } -bool is_reliable(const Address& address) +bool AddressResolution::is_reliable(const Address& address) { return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE)); } @@ -346,7 +346,7 @@ const Variant& getNestedOption(const Variant::Map& options, const std::vector<st QueueSource::QueueSource(const Address& address) : Queue(address), - acceptMode(is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT), + acceptMode(AddressResolution::is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT), acquireMode(address.getOption(BROWSE).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED), exclusive(false) { @@ -393,7 +393,7 @@ std::string Subscription::getSubscriptionName(const std::string& base, const Var Subscription::Subscription(const Address& address, const std::string& exchangeType) : Exchange(address), queue(getSubscriptionName(name, address.getOption(NAME))), - reliable(is_reliable(address)), + reliable(AddressResolution::is_reliable(address)), durable(address.getOption(DURABLE_SUBSCRIPTION).asBool()) { if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1); diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.h b/cpp/src/qpid/client/amqp0_10/AddressResolution.h index 01c8c51595..5b81b06131 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.h +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.h @@ -56,7 +56,8 @@ class AddressResolution static qpid::messaging::Address convert(const qpid::framing::ReplyTo&); static qpid::framing::ReplyTo convert(const qpid::messaging::Address&); - + static bool is_unreliable(const qpid::messaging::Address& address); + static bool is_reliable(const qpid::messaging::Address& address); private: }; }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index b8dd299571..8782e6e813 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -32,12 +32,17 @@ namespace amqp0_10 { SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, const qpid::messaging::Address& _address) : parent(_parent), name(_name), address(_address), state(UNRESOLVED), - capacity(50), window(0), flushed(false) {} + capacity(50), window(0), flushed(false), unreliable(AddressResolution::is_unreliable(address)) {} void SenderImpl::send(const qpid::messaging::Message& message) { - Send f(*this, &message); - while (f.repeat) parent.execute(f); + if (unreliable) { + UnreliableSend f(*this, &message); + parent.execute(f); + } else { + Send f(*this, &message); + while (f.repeat) parent.execute(f); + } } void SenderImpl::close() @@ -78,7 +83,7 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) void SenderImpl::waitForCapacity() { //TODO: add option to throw exception rather than blocking? - if (capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) { + if (!unreliable && capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) { //Initial implementation is very basic. As outgoing is //currently only reduced on receiving completions and we are //blocking anyway we may as well sync(). If successful that @@ -93,9 +98,8 @@ void SenderImpl::waitForCapacity() } } -void SenderImpl::sendImpl(const qpid::messaging::Message& m) +void SenderImpl::sendImpl(const qpid::messaging::Message& m) { - //TODO: make recording for replay optional (would still want to track completion however) std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage()); msg->convert(m); msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject()); @@ -103,6 +107,14 @@ void SenderImpl::sendImpl(const qpid::messaging::Message& m) sink->send(session, name, outgoing.back()); } +void SenderImpl::sendUnreliable(const qpid::messaging::Message& m) +{ + OutgoingMessage msg; + msg.convert(m); + msg.setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject()); + sink->send(session, name, msg); +} + void SenderImpl::replay() { for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h index 881f3c754c..b65f8cf8cc 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -73,6 +73,7 @@ class SenderImpl : public qpid::messaging::SenderImpl uint32_t capacity; uint32_t window; bool flushed; + const bool unreliable; uint32_t checkPendingSends(bool flush); void replay(); @@ -80,6 +81,7 @@ class SenderImpl : public qpid::messaging::SenderImpl //logic for application visible methods: void sendImpl(const qpid::messaging::Message&); + void sendUnreliable(const qpid::messaging::Message&); void closeImpl(); @@ -108,6 +110,21 @@ class SenderImpl : public qpid::messaging::SenderImpl } }; + struct UnreliableSend : Command + { + const qpid::messaging::Message* message; + + UnreliableSend(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {} + void operator()() + { + //TODO: ideally want to put messages on the outbound + //queue and pull them off in io thread, but the old + //0-10 client doesn't support that option so for now + //we simply don't queue unreliable messages + impl.sendUnreliable(*message); + } + }; + struct Close : Command { Close(SenderImpl& i) : Command(i) {} diff --git a/cpp/src/qpid/messaging/Connection.cpp b/cpp/src/qpid/messaging/Connection.cpp index 06006c9d20..230c9d5dbf 100644 --- a/cpp/src/qpid/messaging/Connection.cpp +++ b/cpp/src/qpid/messaging/Connection.cpp @@ -47,7 +47,7 @@ Connection::Connection(const std::string& o) { Variant::Map options; AddressParser parser(o); - if (parser.parseMap(options)) { + if (o.empty() || parser.parseMap(options)) { PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(options)); } else { throw InvalidOptionString(o); diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp index 9556fb000f..57c348ab9c 100644 --- a/cpp/src/tests/qpid_send.cpp +++ b/cpp/src/tests/qpid_send.cpp @@ -63,6 +63,7 @@ struct Options : public qpid::Options std::string content; uint tx; uint rollbackFrequency; + uint capacity; qpid::log::Options log; Options(const std::string& argv0=std::string()) @@ -76,6 +77,7 @@ struct Options : public qpid::Options ttl(0), tx(0), rollbackFrequency(0), + capacity(0), log(argv0) { addOptions() @@ -94,6 +96,7 @@ struct Options : public qpid::Options ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message") ("user-id", qpid::optValue(userid, "USERID"), "userid for message") ("content", qpid::optValue(content, "CONTENT"), "specify textual content") + ("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue") ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") ("help", qpid::optValue(help), "print this usage statement"); @@ -184,6 +187,7 @@ int main(int argc, char ** argv) connection.open(opts.url); Session session = connection.newSession(opts.tx > 0); Sender sender = session.createSender(opts.address); + if (opts.capacity) sender.setCapacity(opts.capacity); Message msg; msg.setDurable(opts.durable); if (opts.ttl) { |