diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client')
5 files changed, 65 insertions, 47 deletions
diff --git a/qpid/cpp/src/qpid/client/TCPConnector.cpp b/qpid/cpp/src/qpid/client/TCPConnector.cpp index 4660a41c07..51eacf77e8 100644 --- a/qpid/cpp/src/qpid/client/TCPConnector.cpp +++ b/qpid/cpp/src/qpid/client/TCPConnector.cpp @@ -97,7 +97,7 @@ void TCPConnector::connect(const std::string& host, const std::string& port) { boost::bind(&TCPConnector::connected, this, _1), boost::bind(&TCPConnector::connectFailed, this, _3)); closed = false; - + identifier = str(format("[%1%]") % socket.getFullAddress()); connector->start(poller); } @@ -120,8 +120,6 @@ void TCPConnector::start(sys::AsynchIO* aio_) { for (int i = 0; i < 4; i++) { aio->queueReadBuffer(new Buff(maxFrameSize)); } - - identifier = str(format("[%1%]") % socket.getFullAddress()); } void TCPConnector::initAmqp() { @@ -131,7 +129,7 @@ void TCPConnector::initAmqp() { void TCPConnector::connectFailed(const std::string& msg) { connector = 0; - QPID_LOG(warning, "Connect failed: " << msg); + QPID_LOG(warning, "Connect failed: " << msg << " " << identifier); socket.close(); if (!closed) closed = true; @@ -185,7 +183,7 @@ sys::ShutdownHandler* TCPConnector::getShutdownHandler() const { return shutdownHandler; } -const std::string& TCPConnector::getIdentifier() const { +const std::string& TCPConnector::getIdentifier() const { return identifier; } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 16e5fde075..5924e30dd8 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -140,7 +140,7 @@ struct Binding { Binding(const Variant::Map&); Binding(const std::string& exchange, const std::string& queue, const std::string& key); - + std::string exchange; std::string queue; std::string key; @@ -243,7 +243,7 @@ class Subscription : public Exchange, public MessageSource FieldTable queueOptions; FieldTable subscriptionOptions; Bindings bindings; - + void bindSubject(const std::string& subject); void bindAll(); void add(const std::string& exchange, const std::string& key); @@ -328,7 +328,7 @@ Opt& Opt::operator/(const std::string& name) { if (options) { Variant::Map::const_iterator j = options->find(name); - if (j == options->end()) { + if (j == options->end()) { value = 0; options = 0; } else { @@ -373,7 +373,7 @@ void Opt::collect(qpid::framing::FieldTable& args) const bool AddressResolution::is_unreliable(const Address& address) { - + return in((Opt(address)/LINK/RELIABILITY).str(), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE)); } @@ -475,7 +475,7 @@ void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::stri checkCreate(session, FOR_RECEIVER); checkAssert(session, FOR_RECEIVER); linkBindings.bind(session); - session.messageSubscribe(arg::queue=name, + session.messageSubscribe(arg::queue=name, arg::destination=destination, arg::acceptMode=acceptMode, arg::acquireMode=acquireMode, @@ -524,7 +524,7 @@ void Subscription::bindSubject(const std::string& subject) bindings.push_back(b); } else if (actualType == XML_EXCHANGE) { Binding b(name, queue, subject); - std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'") + std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'") % subject).str(); b.arguments.setString("xquery", query); bindings.push_back(b); @@ -540,7 +540,7 @@ void Subscription::bindAll() if (actualType == TOPIC_EXCHANGE) { add(name, WILDCARD_ANY); } else if (actualType == FANOUT_EXCHANGE) { - add(name, queue); + add(name, queue); } else if (actualType == HEADERS_EXCHANGE) { Binding b(name, queue, "match-all"); b.arguments.setString("x-match", "all"); @@ -549,7 +549,7 @@ void Subscription::bindAll() Binding b(name, queue, EMPTY_STRING); b.arguments.setString("xquery", "true()"); bindings.push_back(b); - } else { + } else { add(name, EMPTY_STRING); } } @@ -600,12 +600,13 @@ void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, { m.message.getDeliveryProperties().setRoutingKey(m.getSubject()); m.status = session.messageTransfer(arg::destination=name, arg::content=m.message); + QPID_LOG(debug, "Sending to exchange " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties()); } void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&) { linkBindings.unbind(session); - checkDelete(session, FOR_SENDER); + checkDelete(session, FOR_SENDER); } QueueSink::QueueSink(const Address& address) : Queue(address) {} @@ -620,6 +621,7 @@ void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, Ou { m.message.getDeliveryProperties().setRoutingKey(name); m.status = session.messageTransfer(arg::content=m.message); + QPID_LOG(debug, "Sending to queue " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties()); } void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&) @@ -654,9 +656,9 @@ qpid::framing::ReplyTo AddressResolution::convert(const Address& address) } } -bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address) +bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address) { - return address.getType() == QUEUE_ADDRESS || + return address.getType() == QUEUE_ADDRESS || (address.getType().empty() && session.queueQuery(address.getName()).getQueue() == address.getName()); } @@ -695,7 +697,7 @@ void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) { if (enabled(createPolicy, mode)) { QPID_LOG(debug, "Auto-creating queue '" << name << "'"); - try { + try { session.queueDeclare(arg::queue=name, arg::durable=durable, arg::autoDelete=autoDelete, @@ -749,7 +751,7 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) throw AssertionFailed((boost::format("Queue not exclusive: %1%") % name).str()); } if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) { - throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%") + throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%") % name % alternateExchange % result.getAlternateExchange()).str()); } for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) { @@ -839,7 +841,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) throw NotFound((boost::format("Exchange not found: %1%") % name).str()); } else { if (specifiedType.size() && result.getType() != specifiedType) { - throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") + throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") % name % specifiedType % result.getType()).str()); } if (durable && !result.getDurable()) { @@ -862,7 +864,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) } } -Binding::Binding(const Variant::Map& b) : +Binding::Binding(const Variant::Map& b) : exchange((Opt(b)/EXCHANGE).str()), queue((Opt(b)/QUEUE).str()), key((Opt(b)/KEY).str()) @@ -916,11 +918,11 @@ void Bindings::unbind(qpid::client::AsyncSession& session) void Bindings::check(qpid::client::AsyncSession& session) { for (Bindings::const_iterator i = begin(); i != end(); ++i) { - ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue, + ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue, arg::exchange=i->exchange, arg::bindingKey=i->key); if (result.getQueueNotMatched() || result.getKeyNotMatched()) { - throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]") + throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]") % i->exchange % i->queue % i->key).str()); } } @@ -950,7 +952,7 @@ void Node::convert(const Variant& options, FieldTable& arguments) { if (!options.isVoid()) { translate(options.asMap(), arguments); - } + } } std::vector<std::string> Node::RECEIVER_MODES = list_of<std::string>(ALWAYS) (RECEIVER); std::vector<std::string> Node::SENDER_MODES = list_of<std::string>(ALWAYS) (SENDER); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index cc6e9b9ab2..3cfd2e37f2 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -29,6 +29,7 @@ #include <boost/intrusive_ptr.hpp> #include <vector> #include <sstream> +#include <limits> namespace qpid { namespace client { @@ -39,6 +40,16 @@ using qpid::types::VAR_LIST; using qpid::framing::Uuid; namespace { + +double FOREVER(std::numeric_limits<double>::max()); + +// Time values in seconds can be specified as integer or floating point values. +double timeValue(const Variant& value) { + if (types::isIntegerType(value.getType())) + return double(value.asInt64()); + return value.asDouble(); +} + void merge(const std::string& value, std::vector<std::string>& list) { if (std::find(list.begin(), list.end(), value) == list.end()) list.push_back(value); @@ -60,11 +71,21 @@ std::string asString(const std::vector<std::string>& v) { os << "]"; return os.str(); } + +bool expired(const sys::AbsTime& start, double timeout) +{ + if (timeout == 0) return true; + if (timeout == FOREVER) return false; + sys::Duration used(start, sys::now()); + sys::Duration allowed(int64_t(timeout*sys::TIME_SEC)); + return allowed < used; } +} // namespace + ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) : - replaceUrls(false), reconnect(false), timeout(-1), limit(-1), - minReconnectInterval(3), maxReconnectInterval(60), + replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1), + minReconnectInterval(0.001), maxReconnectInterval(2), retries(0), reconnectOnLimitExceeded(true) { setOptions(options); @@ -85,15 +106,15 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value) if (name == "reconnect") { reconnect = value; } else if (name == "reconnect-timeout" || name == "reconnect_timeout") { - timeout = value; + timeout = timeValue(value); } else if (name == "reconnect-limit" || name == "reconnect_limit") { limit = value; } else if (name == "reconnect-interval" || name == "reconnect_interval") { - maxReconnectInterval = minReconnectInterval = value; + maxReconnectInterval = minReconnectInterval = timeValue(value); } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") { - minReconnectInterval = value; + minReconnectInterval = timeValue(value); } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") { - maxReconnectInterval = value; + maxReconnectInterval = timeValue(value); } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") { replaceUrls = value.asBool(); } else if (name == "reconnect-urls" || name == "reconnect_urls") { @@ -236,18 +257,10 @@ void ConnectionImpl::reopen() } -bool expired(const qpid::sys::AbsTime& start, int64_t timeout) -{ - if (timeout == 0) return true; - if (timeout < 0) return false; - qpid::sys::Duration used(start, qpid::sys::now()); - qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC; - return allowed < used; -} - void ConnectionImpl::connect(const qpid::sys::AbsTime& started) { - for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) { + QPID_LOG(debug, "Starting connection, urls=" << asString(urls)); + for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) { if (!reconnect) { throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); } @@ -257,8 +270,11 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started) if (expired(started, timeout)) { throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout"); } - else qpid::sys::sleep(i); + QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls=" + << asString(urls)); + qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds. } + QPID_LOG(debug, "Connection successful, urls=" << asString(urls)); retries = 0; } @@ -320,6 +336,7 @@ bool ConnectionImpl::backoff() return false; } } + std::string ConnectionImpl::getAuthenticatedUsername() { return connection.getNegotiatedSettings().username; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index 1b58cbbe3e..d1ac4533d5 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -64,10 +64,10 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl std::vector<std::string> urls; qpid::client::ConnectionSettings settings; bool reconnect; - int64_t timeout; + double timeout; int32_t limit; - int64_t minReconnectInterval; - int64_t maxReconnectInterval; + double minReconnectInterval; + double maxReconnectInterval; int32_t retries; bool reconnectOnLimitExceeded; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 715376fd8d..e832cd2567 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -198,7 +198,8 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) if (content->isA<MessageTransferBody>()) { MessageTransfer transfer(content, *this); if (handler && handler->accept(transfer)) { - QPID_LOG(debug, "Delivered " << *content->getMethod()); + QPID_LOG(debug, "Delivered " << *content->getMethod() << " " + << *content->getHeaders()); return true; } else { //received message for another destination, keep for later @@ -275,7 +276,7 @@ void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* m populate(*message, *command); } const MessageTransferBody* transfer = command->as<MessageTransferBody>(); - if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) { + if (transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) { sys::Mutex::ScopedLock l(lock); acceptTracker.delivered(transfer->getDestination(), command->getId()); } |