diff options
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 42 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 3 |
3 files changed, 28 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 16e5fde075..5924e30dd8 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -140,7 +140,7 @@ struct Binding { Binding(const Variant::Map&); Binding(const std::string& exchange, const std::string& queue, const std::string& key); - + std::string exchange; std::string queue; std::string key; @@ -243,7 +243,7 @@ class Subscription : public Exchange, public MessageSource FieldTable queueOptions; FieldTable subscriptionOptions; Bindings bindings; - + void bindSubject(const std::string& subject); void bindAll(); void add(const std::string& exchange, const std::string& key); @@ -328,7 +328,7 @@ Opt& Opt::operator/(const std::string& name) { if (options) { Variant::Map::const_iterator j = options->find(name); - if (j == options->end()) { + if (j == options->end()) { value = 0; options = 0; } else { @@ -373,7 +373,7 @@ void Opt::collect(qpid::framing::FieldTable& args) const bool AddressResolution::is_unreliable(const Address& address) { - + return in((Opt(address)/LINK/RELIABILITY).str(), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE)); } @@ -475,7 +475,7 @@ void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::stri checkCreate(session, FOR_RECEIVER); checkAssert(session, FOR_RECEIVER); linkBindings.bind(session); - session.messageSubscribe(arg::queue=name, + session.messageSubscribe(arg::queue=name, arg::destination=destination, arg::acceptMode=acceptMode, arg::acquireMode=acquireMode, @@ -524,7 +524,7 @@ void Subscription::bindSubject(const std::string& subject) bindings.push_back(b); } else if (actualType == XML_EXCHANGE) { Binding b(name, queue, subject); - std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'") + std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'") % subject).str(); b.arguments.setString("xquery", query); bindings.push_back(b); @@ -540,7 +540,7 @@ void Subscription::bindAll() if (actualType == TOPIC_EXCHANGE) { add(name, WILDCARD_ANY); } else if (actualType == FANOUT_EXCHANGE) { - add(name, queue); + add(name, queue); } else if (actualType == HEADERS_EXCHANGE) { Binding b(name, queue, "match-all"); b.arguments.setString("x-match", "all"); @@ -549,7 +549,7 @@ void Subscription::bindAll() Binding b(name, queue, EMPTY_STRING); b.arguments.setString("xquery", "true()"); bindings.push_back(b); - } else { + } else { add(name, EMPTY_STRING); } } @@ -600,12 +600,13 @@ void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, { m.message.getDeliveryProperties().setRoutingKey(m.getSubject()); m.status = session.messageTransfer(arg::destination=name, arg::content=m.message); + QPID_LOG(debug, "Sending to exchange " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties()); } void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&) { linkBindings.unbind(session); - checkDelete(session, FOR_SENDER); + checkDelete(session, FOR_SENDER); } QueueSink::QueueSink(const Address& address) : Queue(address) {} @@ -620,6 +621,7 @@ void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, Ou { m.message.getDeliveryProperties().setRoutingKey(name); m.status = session.messageTransfer(arg::content=m.message); + QPID_LOG(debug, "Sending to queue " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties()); } void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&) @@ -654,9 +656,9 @@ qpid::framing::ReplyTo AddressResolution::convert(const Address& address) } } -bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address) +bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address) { - return address.getType() == QUEUE_ADDRESS || + return address.getType() == QUEUE_ADDRESS || (address.getType().empty() && session.queueQuery(address.getName()).getQueue() == address.getName()); } @@ -695,7 +697,7 @@ void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) { if (enabled(createPolicy, mode)) { QPID_LOG(debug, "Auto-creating queue '" << name << "'"); - try { + try { session.queueDeclare(arg::queue=name, arg::durable=durable, arg::autoDelete=autoDelete, @@ -749,7 +751,7 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) throw AssertionFailed((boost::format("Queue not exclusive: %1%") % name).str()); } if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) { - throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%") + throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%") % name % alternateExchange % result.getAlternateExchange()).str()); } for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) { @@ -839,7 +841,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) throw NotFound((boost::format("Exchange not found: %1%") % name).str()); } else { if (specifiedType.size() && result.getType() != specifiedType) { - throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") + throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") % name % specifiedType % result.getType()).str()); } if (durable && !result.getDurable()) { @@ -862,7 +864,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) } } -Binding::Binding(const Variant::Map& b) : +Binding::Binding(const Variant::Map& b) : exchange((Opt(b)/EXCHANGE).str()), queue((Opt(b)/QUEUE).str()), key((Opt(b)/KEY).str()) @@ -916,11 +918,11 @@ void Bindings::unbind(qpid::client::AsyncSession& session) void Bindings::check(qpid::client::AsyncSession& session) { for (Bindings::const_iterator i = begin(); i != end(); ++i) { - ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue, + ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue, arg::exchange=i->exchange, arg::bindingKey=i->key); if (result.getQueueNotMatched() || result.getKeyNotMatched()) { - throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]") + throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]") % i->exchange % i->queue % i->key).str()); } } @@ -950,7 +952,7 @@ void Node::convert(const Variant& options, FieldTable& arguments) { if (!options.isVoid()) { translate(options.asMap(), arguments); - } + } } std::vector<std::string> Node::RECEIVER_MODES = list_of<std::string>(ALWAYS) (RECEIVER); std::vector<std::string> Node::SENDER_MODES = list_of<std::string>(ALWAYS) (SENDER); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index a20babdeb4..3cfd2e37f2 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -259,6 +259,7 @@ void ConnectionImpl::reopen() void ConnectionImpl::connect(const qpid::sys::AbsTime& started) { + QPID_LOG(debug, "Starting connection, urls=" << asString(urls)); for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) { if (!reconnect) { throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); @@ -269,8 +270,11 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started) if (expired(started, timeout)) { throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout"); } + QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls=" + << asString(urls)); qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds. } + QPID_LOG(debug, "Connection successful, urls=" << asString(urls)); retries = 0; } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 2d05755fc7..e832cd2567 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -198,7 +198,8 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) if (content->isA<MessageTransferBody>()) { MessageTransfer transfer(content, *this); if (handler && handler->accept(transfer)) { - QPID_LOG(debug, "Delivered " << *content->getMethod()); + QPID_LOG(debug, "Delivered " << *content->getMethod() << " " + << *content->getHeaders()); return true; } else { //received message for another destination, keep for later |