summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-13 16:18:30 +0000
committerAlan Conway <aconway@apache.org>2012-02-13 16:18:30 +0000
commitb3cff4e5d1bcf0928b364d43785d8e5fadb062ed (patch)
tree1b6d84e3a5d837b6a4977af22f793ea0f51479b8
parentfb3419fce46397486151268098540d17c6ef7b46 (diff)
downloadqpid-python-b3cff4e5d1bcf0928b364d43785d8e5fadb062ed.tar.gz
QPID-3603: Additional debug logging for messaging client connections.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1243581 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp42
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp4
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp3
3 files changed, 28 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index 16e5fde075..5924e30dd8 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -140,7 +140,7 @@ struct Binding
{
Binding(const Variant::Map&);
Binding(const std::string& exchange, const std::string& queue, const std::string& key);
-
+
std::string exchange;
std::string queue;
std::string key;
@@ -243,7 +243,7 @@ class Subscription : public Exchange, public MessageSource
FieldTable queueOptions;
FieldTable subscriptionOptions;
Bindings bindings;
-
+
void bindSubject(const std::string& subject);
void bindAll();
void add(const std::string& exchange, const std::string& key);
@@ -328,7 +328,7 @@ Opt& Opt::operator/(const std::string& name)
{
if (options) {
Variant::Map::const_iterator j = options->find(name);
- if (j == options->end()) {
+ if (j == options->end()) {
value = 0;
options = 0;
} else {
@@ -373,7 +373,7 @@ void Opt::collect(qpid::framing::FieldTable& args) const
bool AddressResolution::is_unreliable(const Address& address)
{
-
+
return in((Opt(address)/LINK/RELIABILITY).str(),
list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
}
@@ -475,7 +475,7 @@ void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::stri
checkCreate(session, FOR_RECEIVER);
checkAssert(session, FOR_RECEIVER);
linkBindings.bind(session);
- session.messageSubscribe(arg::queue=name,
+ session.messageSubscribe(arg::queue=name,
arg::destination=destination,
arg::acceptMode=acceptMode,
arg::acquireMode=acquireMode,
@@ -524,7 +524,7 @@ void Subscription::bindSubject(const std::string& subject)
bindings.push_back(b);
} else if (actualType == XML_EXCHANGE) {
Binding b(name, queue, subject);
- std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'")
+ std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'")
% subject).str();
b.arguments.setString("xquery", query);
bindings.push_back(b);
@@ -540,7 +540,7 @@ void Subscription::bindAll()
if (actualType == TOPIC_EXCHANGE) {
add(name, WILDCARD_ANY);
} else if (actualType == FANOUT_EXCHANGE) {
- add(name, queue);
+ add(name, queue);
} else if (actualType == HEADERS_EXCHANGE) {
Binding b(name, queue, "match-all");
b.arguments.setString("x-match", "all");
@@ -549,7 +549,7 @@ void Subscription::bindAll()
Binding b(name, queue, EMPTY_STRING);
b.arguments.setString("xquery", "true()");
bindings.push_back(b);
- } else {
+ } else {
add(name, EMPTY_STRING);
}
}
@@ -600,12 +600,13 @@ void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&,
{
m.message.getDeliveryProperties().setRoutingKey(m.getSubject());
m.status = session.messageTransfer(arg::destination=name, arg::content=m.message);
+ QPID_LOG(debug, "Sending to exchange " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties());
}
void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&)
{
linkBindings.unbind(session);
- checkDelete(session, FOR_SENDER);
+ checkDelete(session, FOR_SENDER);
}
QueueSink::QueueSink(const Address& address) : Queue(address) {}
@@ -620,6 +621,7 @@ void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, Ou
{
m.message.getDeliveryProperties().setRoutingKey(name);
m.status = session.messageTransfer(arg::content=m.message);
+ QPID_LOG(debug, "Sending to queue " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties());
}
void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
@@ -654,9 +656,9 @@ qpid::framing::ReplyTo AddressResolution::convert(const Address& address)
}
}
-bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address)
+bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address)
{
- return address.getType() == QUEUE_ADDRESS ||
+ return address.getType() == QUEUE_ADDRESS ||
(address.getType().empty() && session.queueQuery(address.getName()).getQueue() == address.getName());
}
@@ -695,7 +697,7 @@ void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
{
if (enabled(createPolicy, mode)) {
QPID_LOG(debug, "Auto-creating queue '" << name << "'");
- try {
+ try {
session.queueDeclare(arg::queue=name,
arg::durable=durable,
arg::autoDelete=autoDelete,
@@ -749,7 +751,7 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
throw AssertionFailed((boost::format("Queue not exclusive: %1%") % name).str());
}
if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) {
- throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%")
+ throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%")
% name % alternateExchange % result.getAlternateExchange()).str());
}
for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
@@ -839,7 +841,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
throw NotFound((boost::format("Exchange not found: %1%") % name).str());
} else {
if (specifiedType.size() && result.getType() != specifiedType) {
- throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
+ throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
% name % specifiedType % result.getType()).str());
}
if (durable && !result.getDurable()) {
@@ -862,7 +864,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
}
}
-Binding::Binding(const Variant::Map& b) :
+Binding::Binding(const Variant::Map& b) :
exchange((Opt(b)/EXCHANGE).str()),
queue((Opt(b)/QUEUE).str()),
key((Opt(b)/KEY).str())
@@ -916,11 +918,11 @@ void Bindings::unbind(qpid::client::AsyncSession& session)
void Bindings::check(qpid::client::AsyncSession& session)
{
for (Bindings::const_iterator i = begin(); i != end(); ++i) {
- ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue,
+ ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue,
arg::exchange=i->exchange,
arg::bindingKey=i->key);
if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
- throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]")
+ throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]")
% i->exchange % i->queue % i->key).str());
}
}
@@ -950,7 +952,7 @@ void Node::convert(const Variant& options, FieldTable& arguments)
{
if (!options.isVoid()) {
translate(options.asMap(), arguments);
- }
+ }
}
std::vector<std::string> Node::RECEIVER_MODES = list_of<std::string>(ALWAYS) (RECEIVER);
std::vector<std::string> Node::SENDER_MODES = list_of<std::string>(ALWAYS) (SENDER);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index a20babdeb4..3cfd2e37f2 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -259,6 +259,7 @@ void ConnectionImpl::reopen()
void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
{
+ QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
if (!reconnect) {
throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
@@ -269,8 +270,11 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
if (expired(started, timeout)) {
throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout");
}
+ QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls="
+ << asString(urls));
qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds.
}
+ QPID_LOG(debug, "Connection successful, urls=" << asString(urls));
retries = 0;
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 2d05755fc7..e832cd2567 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -198,7 +198,8 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
if (content->isA<MessageTransferBody>()) {
MessageTransfer transfer(content, *this);
if (handler && handler->accept(transfer)) {
- QPID_LOG(debug, "Delivered " << *content->getMethod());
+ QPID_LOG(debug, "Delivered " << *content->getMethod() << " "
+ << *content->getHeaders());
return true;
} else {
//received message for another destination, keep for later