summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.cpp8
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.h3
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.cpp24
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.h17
-rw-r--r--cpp/src/qpid/messaging/Connection.cpp2
-rw-r--r--cpp/src/tests/qpid_send.cpp4
6 files changed, 46 insertions, 12 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index 3e53f40ba4..f1b487c255 100644
--- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -265,12 +265,12 @@ bool getSenderPolicy(const Address& address, const std::string& key)
return in(address.getOption(key), list_of<std::string>(ALWAYS)(SENDER));
}
-bool is_unreliable(const Address& address)
+bool AddressResolution::is_unreliable(const Address& address)
{
return in(address.getOption(RELIABILITY), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
}
-bool is_reliable(const Address& address)
+bool AddressResolution::is_reliable(const Address& address)
{
return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
}
@@ -346,7 +346,7 @@ const Variant& getNestedOption(const Variant::Map& options, const std::vector<st
QueueSource::QueueSource(const Address& address) :
Queue(address),
- acceptMode(is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT),
+ acceptMode(AddressResolution::is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT),
acquireMode(address.getOption(BROWSE).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
exclusive(false)
{
@@ -393,7 +393,7 @@ std::string Subscription::getSubscriptionName(const std::string& base, const Var
Subscription::Subscription(const Address& address, const std::string& exchangeType)
: Exchange(address),
queue(getSubscriptionName(name, address.getOption(NAME))),
- reliable(is_reliable(address)),
+ reliable(AddressResolution::is_reliable(address)),
durable(address.getOption(DURABLE_SUBSCRIPTION).asBool())
{
if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1);
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.h b/cpp/src/qpid/client/amqp0_10/AddressResolution.h
index 01c8c51595..5b81b06131 100644
--- a/cpp/src/qpid/client/amqp0_10/AddressResolution.h
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.h
@@ -56,7 +56,8 @@ class AddressResolution
static qpid::messaging::Address convert(const qpid::framing::ReplyTo&);
static qpid::framing::ReplyTo convert(const qpid::messaging::Address&);
-
+ static bool is_unreliable(const qpid::messaging::Address& address);
+ static bool is_reliable(const qpid::messaging::Address& address);
private:
};
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index b8dd299571..8782e6e813 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -32,12 +32,17 @@ namespace amqp0_10 {
SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
const qpid::messaging::Address& _address) :
parent(_parent), name(_name), address(_address), state(UNRESOLVED),
- capacity(50), window(0), flushed(false) {}
+ capacity(50), window(0), flushed(false), unreliable(AddressResolution::is_unreliable(address)) {}
void SenderImpl::send(const qpid::messaging::Message& message)
{
- Send f(*this, &message);
- while (f.repeat) parent.execute(f);
+ if (unreliable) {
+ UnreliableSend f(*this, &message);
+ parent.execute(f);
+ } else {
+ Send f(*this, &message);
+ while (f.repeat) parent.execute(f);
+ }
}
void SenderImpl::close()
@@ -78,7 +83,7 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
void SenderImpl::waitForCapacity()
{
//TODO: add option to throw exception rather than blocking?
- if (capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) {
+ if (!unreliable && capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) {
//Initial implementation is very basic. As outgoing is
//currently only reduced on receiving completions and we are
//blocking anyway we may as well sync(). If successful that
@@ -93,9 +98,8 @@ void SenderImpl::waitForCapacity()
}
}
-void SenderImpl::sendImpl(const qpid::messaging::Message& m)
+void SenderImpl::sendImpl(const qpid::messaging::Message& m)
{
- //TODO: make recording for replay optional (would still want to track completion however)
std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
msg->convert(m);
msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
@@ -103,6 +107,14 @@ void SenderImpl::sendImpl(const qpid::messaging::Message& m)
sink->send(session, name, outgoing.back());
}
+void SenderImpl::sendUnreliable(const qpid::messaging::Message& m)
+{
+ OutgoingMessage msg;
+ msg.convert(m);
+ msg.setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
+ sink->send(session, name, msg);
+}
+
void SenderImpl::replay()
{
for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
index 881f3c754c..b65f8cf8cc 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
@@ -73,6 +73,7 @@ class SenderImpl : public qpid::messaging::SenderImpl
uint32_t capacity;
uint32_t window;
bool flushed;
+ const bool unreliable;
uint32_t checkPendingSends(bool flush);
void replay();
@@ -80,6 +81,7 @@ class SenderImpl : public qpid::messaging::SenderImpl
//logic for application visible methods:
void sendImpl(const qpid::messaging::Message&);
+ void sendUnreliable(const qpid::messaging::Message&);
void closeImpl();
@@ -108,6 +110,21 @@ class SenderImpl : public qpid::messaging::SenderImpl
}
};
+ struct UnreliableSend : Command
+ {
+ const qpid::messaging::Message* message;
+
+ UnreliableSend(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {}
+ void operator()()
+ {
+ //TODO: ideally want to put messages on the outbound
+ //queue and pull them off in io thread, but the old
+ //0-10 client doesn't support that option so for now
+ //we simply don't queue unreliable messages
+ impl.sendUnreliable(*message);
+ }
+ };
+
struct Close : Command
{
Close(SenderImpl& i) : Command(i) {}
diff --git a/cpp/src/qpid/messaging/Connection.cpp b/cpp/src/qpid/messaging/Connection.cpp
index 06006c9d20..230c9d5dbf 100644
--- a/cpp/src/qpid/messaging/Connection.cpp
+++ b/cpp/src/qpid/messaging/Connection.cpp
@@ -47,7 +47,7 @@ Connection::Connection(const std::string& o)
{
Variant::Map options;
AddressParser parser(o);
- if (parser.parseMap(options)) {
+ if (o.empty() || parser.parseMap(options)) {
PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(options));
} else {
throw InvalidOptionString(o);
diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp
index 9556fb000f..57c348ab9c 100644
--- a/cpp/src/tests/qpid_send.cpp
+++ b/cpp/src/tests/qpid_send.cpp
@@ -63,6 +63,7 @@ struct Options : public qpid::Options
std::string content;
uint tx;
uint rollbackFrequency;
+ uint capacity;
qpid::log::Options log;
Options(const std::string& argv0=std::string())
@@ -76,6 +77,7 @@ struct Options : public qpid::Options
ttl(0),
tx(0),
rollbackFrequency(0),
+ capacity(0),
log(argv0)
{
addOptions()
@@ -94,6 +96,7 @@ struct Options : public qpid::Options
("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
("user-id", qpid::optValue(userid, "USERID"), "userid for message")
("content", qpid::optValue(content, "CONTENT"), "specify textual content")
+ ("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue")
("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
("help", qpid::optValue(help), "print this usage statement");
@@ -184,6 +187,7 @@ int main(int argc, char ** argv)
connection.open(opts.url);
Session session = connection.newSession(opts.tx > 0);
Sender sender = session.createSender(opts.address);
+ if (opts.capacity) sender.setCapacity(opts.capacity);
Message msg;
msg.setDurable(opts.durable);
if (opts.ttl) {