summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client')
-rw-r--r--qpid/cpp/src/qpid/client/TCPConnector.cpp8
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp42
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp51
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h6
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp5
5 files changed, 65 insertions, 47 deletions
diff --git a/qpid/cpp/src/qpid/client/TCPConnector.cpp b/qpid/cpp/src/qpid/client/TCPConnector.cpp
index 4660a41c07..51eacf77e8 100644
--- a/qpid/cpp/src/qpid/client/TCPConnector.cpp
+++ b/qpid/cpp/src/qpid/client/TCPConnector.cpp
@@ -97,7 +97,7 @@ void TCPConnector::connect(const std::string& host, const std::string& port) {
boost::bind(&TCPConnector::connected, this, _1),
boost::bind(&TCPConnector::connectFailed, this, _3));
closed = false;
-
+ identifier = str(format("[%1%]") % socket.getFullAddress());
connector->start(poller);
}
@@ -120,8 +120,6 @@ void TCPConnector::start(sys::AsynchIO* aio_) {
for (int i = 0; i < 4; i++) {
aio->queueReadBuffer(new Buff(maxFrameSize));
}
-
- identifier = str(format("[%1%]") % socket.getFullAddress());
}
void TCPConnector::initAmqp() {
@@ -131,7 +129,7 @@ void TCPConnector::initAmqp() {
void TCPConnector::connectFailed(const std::string& msg) {
connector = 0;
- QPID_LOG(warning, "Connect failed: " << msg);
+ QPID_LOG(warning, "Connect failed: " << msg << " " << identifier);
socket.close();
if (!closed)
closed = true;
@@ -185,7 +183,7 @@ sys::ShutdownHandler* TCPConnector::getShutdownHandler() const {
return shutdownHandler;
}
-const std::string& TCPConnector::getIdentifier() const {
+const std::string& TCPConnector::getIdentifier() const {
return identifier;
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index 16e5fde075..5924e30dd8 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -140,7 +140,7 @@ struct Binding
{
Binding(const Variant::Map&);
Binding(const std::string& exchange, const std::string& queue, const std::string& key);
-
+
std::string exchange;
std::string queue;
std::string key;
@@ -243,7 +243,7 @@ class Subscription : public Exchange, public MessageSource
FieldTable queueOptions;
FieldTable subscriptionOptions;
Bindings bindings;
-
+
void bindSubject(const std::string& subject);
void bindAll();
void add(const std::string& exchange, const std::string& key);
@@ -328,7 +328,7 @@ Opt& Opt::operator/(const std::string& name)
{
if (options) {
Variant::Map::const_iterator j = options->find(name);
- if (j == options->end()) {
+ if (j == options->end()) {
value = 0;
options = 0;
} else {
@@ -373,7 +373,7 @@ void Opt::collect(qpid::framing::FieldTable& args) const
bool AddressResolution::is_unreliable(const Address& address)
{
-
+
return in((Opt(address)/LINK/RELIABILITY).str(),
list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
}
@@ -475,7 +475,7 @@ void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::stri
checkCreate(session, FOR_RECEIVER);
checkAssert(session, FOR_RECEIVER);
linkBindings.bind(session);
- session.messageSubscribe(arg::queue=name,
+ session.messageSubscribe(arg::queue=name,
arg::destination=destination,
arg::acceptMode=acceptMode,
arg::acquireMode=acquireMode,
@@ -524,7 +524,7 @@ void Subscription::bindSubject(const std::string& subject)
bindings.push_back(b);
} else if (actualType == XML_EXCHANGE) {
Binding b(name, queue, subject);
- std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'")
+ std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'")
% subject).str();
b.arguments.setString("xquery", query);
bindings.push_back(b);
@@ -540,7 +540,7 @@ void Subscription::bindAll()
if (actualType == TOPIC_EXCHANGE) {
add(name, WILDCARD_ANY);
} else if (actualType == FANOUT_EXCHANGE) {
- add(name, queue);
+ add(name, queue);
} else if (actualType == HEADERS_EXCHANGE) {
Binding b(name, queue, "match-all");
b.arguments.setString("x-match", "all");
@@ -549,7 +549,7 @@ void Subscription::bindAll()
Binding b(name, queue, EMPTY_STRING);
b.arguments.setString("xquery", "true()");
bindings.push_back(b);
- } else {
+ } else {
add(name, EMPTY_STRING);
}
}
@@ -600,12 +600,13 @@ void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&,
{
m.message.getDeliveryProperties().setRoutingKey(m.getSubject());
m.status = session.messageTransfer(arg::destination=name, arg::content=m.message);
+ QPID_LOG(debug, "Sending to exchange " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties());
}
void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&)
{
linkBindings.unbind(session);
- checkDelete(session, FOR_SENDER);
+ checkDelete(session, FOR_SENDER);
}
QueueSink::QueueSink(const Address& address) : Queue(address) {}
@@ -620,6 +621,7 @@ void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, Ou
{
m.message.getDeliveryProperties().setRoutingKey(name);
m.status = session.messageTransfer(arg::content=m.message);
+ QPID_LOG(debug, "Sending to queue " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties());
}
void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
@@ -654,9 +656,9 @@ qpid::framing::ReplyTo AddressResolution::convert(const Address& address)
}
}
-bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address)
+bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address)
{
- return address.getType() == QUEUE_ADDRESS ||
+ return address.getType() == QUEUE_ADDRESS ||
(address.getType().empty() && session.queueQuery(address.getName()).getQueue() == address.getName());
}
@@ -695,7 +697,7 @@ void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
{
if (enabled(createPolicy, mode)) {
QPID_LOG(debug, "Auto-creating queue '" << name << "'");
- try {
+ try {
session.queueDeclare(arg::queue=name,
arg::durable=durable,
arg::autoDelete=autoDelete,
@@ -749,7 +751,7 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
throw AssertionFailed((boost::format("Queue not exclusive: %1%") % name).str());
}
if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) {
- throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%")
+ throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%")
% name % alternateExchange % result.getAlternateExchange()).str());
}
for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
@@ -839,7 +841,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
throw NotFound((boost::format("Exchange not found: %1%") % name).str());
} else {
if (specifiedType.size() && result.getType() != specifiedType) {
- throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
+ throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
% name % specifiedType % result.getType()).str());
}
if (durable && !result.getDurable()) {
@@ -862,7 +864,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
}
}
-Binding::Binding(const Variant::Map& b) :
+Binding::Binding(const Variant::Map& b) :
exchange((Opt(b)/EXCHANGE).str()),
queue((Opt(b)/QUEUE).str()),
key((Opt(b)/KEY).str())
@@ -916,11 +918,11 @@ void Bindings::unbind(qpid::client::AsyncSession& session)
void Bindings::check(qpid::client::AsyncSession& session)
{
for (Bindings::const_iterator i = begin(); i != end(); ++i) {
- ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue,
+ ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue,
arg::exchange=i->exchange,
arg::bindingKey=i->key);
if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
- throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]")
+ throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]")
% i->exchange % i->queue % i->key).str());
}
}
@@ -950,7 +952,7 @@ void Node::convert(const Variant& options, FieldTable& arguments)
{
if (!options.isVoid()) {
translate(options.asMap(), arguments);
- }
+ }
}
std::vector<std::string> Node::RECEIVER_MODES = list_of<std::string>(ALWAYS) (RECEIVER);
std::vector<std::string> Node::SENDER_MODES = list_of<std::string>(ALWAYS) (SENDER);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index cc6e9b9ab2..3cfd2e37f2 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -29,6 +29,7 @@
#include <boost/intrusive_ptr.hpp>
#include <vector>
#include <sstream>
+#include <limits>
namespace qpid {
namespace client {
@@ -39,6 +40,16 @@ using qpid::types::VAR_LIST;
using qpid::framing::Uuid;
namespace {
+
+double FOREVER(std::numeric_limits<double>::max());
+
+// Time values in seconds can be specified as integer or floating point values.
+double timeValue(const Variant& value) {
+ if (types::isIntegerType(value.getType()))
+ return double(value.asInt64());
+ return value.asDouble();
+}
+
void merge(const std::string& value, std::vector<std::string>& list) {
if (std::find(list.begin(), list.end(), value) == list.end())
list.push_back(value);
@@ -60,11 +71,21 @@ std::string asString(const std::vector<std::string>& v) {
os << "]";
return os.str();
}
+
+bool expired(const sys::AbsTime& start, double timeout)
+{
+ if (timeout == 0) return true;
+ if (timeout == FOREVER) return false;
+ sys::Duration used(start, sys::now());
+ sys::Duration allowed(int64_t(timeout*sys::TIME_SEC));
+ return allowed < used;
}
+} // namespace
+
ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
- replaceUrls(false), reconnect(false), timeout(-1), limit(-1),
- minReconnectInterval(3), maxReconnectInterval(60),
+ replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1),
+ minReconnectInterval(0.001), maxReconnectInterval(2),
retries(0), reconnectOnLimitExceeded(true)
{
setOptions(options);
@@ -85,15 +106,15 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value)
if (name == "reconnect") {
reconnect = value;
} else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
- timeout = value;
+ timeout = timeValue(value);
} else if (name == "reconnect-limit" || name == "reconnect_limit") {
limit = value;
} else if (name == "reconnect-interval" || name == "reconnect_interval") {
- maxReconnectInterval = minReconnectInterval = value;
+ maxReconnectInterval = minReconnectInterval = timeValue(value);
} else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") {
- minReconnectInterval = value;
+ minReconnectInterval = timeValue(value);
} else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") {
- maxReconnectInterval = value;
+ maxReconnectInterval = timeValue(value);
} else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") {
replaceUrls = value.asBool();
} else if (name == "reconnect-urls" || name == "reconnect_urls") {
@@ -236,18 +257,10 @@ void ConnectionImpl::reopen()
}
-bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
-{
- if (timeout == 0) return true;
- if (timeout < 0) return false;
- qpid::sys::Duration used(start, qpid::sys::now());
- qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC;
- return allowed < used;
-}
-
void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
{
- for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) {
+ QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
+ for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
if (!reconnect) {
throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
}
@@ -257,8 +270,11 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
if (expired(started, timeout)) {
throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout");
}
- else qpid::sys::sleep(i);
+ QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls="
+ << asString(urls));
+ qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds.
}
+ QPID_LOG(debug, "Connection successful, urls=" << asString(urls));
retries = 0;
}
@@ -320,6 +336,7 @@ bool ConnectionImpl::backoff()
return false;
}
}
+
std::string ConnectionImpl::getAuthenticatedUsername()
{
return connection.getNegotiatedSettings().username;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index 1b58cbbe3e..d1ac4533d5 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -64,10 +64,10 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
std::vector<std::string> urls;
qpid::client::ConnectionSettings settings;
bool reconnect;
- int64_t timeout;
+ double timeout;
int32_t limit;
- int64_t minReconnectInterval;
- int64_t maxReconnectInterval;
+ double minReconnectInterval;
+ double maxReconnectInterval;
int32_t retries;
bool reconnectOnLimitExceeded;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 715376fd8d..e832cd2567 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -198,7 +198,8 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
if (content->isA<MessageTransferBody>()) {
MessageTransfer transfer(content, *this);
if (handler && handler->accept(transfer)) {
- QPID_LOG(debug, "Delivered " << *content->getMethod());
+ QPID_LOG(debug, "Delivered " << *content->getMethod() << " "
+ << *content->getHeaders());
return true;
} else {
//received message for another destination, keep for later
@@ -275,7 +276,7 @@ void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* m
populate(*message, *command);
}
const MessageTransferBody* transfer = command->as<MessageTransferBody>();
- if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
+ if (transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
sys::Mutex::ScopedLock l(lock);
acceptTracker.delivered(transfer->getDestination(), command->getId());
}