summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-19 23:07:15 +0000
committerAlan Conway <aconway@apache.org>2012-01-19 23:07:15 +0000
commit9ba2fb5614a95263fc9a0b55b83a9f6f4f02e932 (patch)
tree2d91407eda2bccbc79ef3bd892bd73227123695e
parent20216bef51bb1c1b7c005d8adaddb4d0d281a6ef (diff)
downloadqpid-python-9ba2fb5614a95263fc9a0b55b83a9f6f4f02e932.tar.gz
QPID-3603: Format static log prefixes at consutruction time.
Creating the prefix dynamically caused sporadic core dumps with trace logging on. It is also inefficient. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233672 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp21
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h2
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp29
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h5
4 files changed, 27 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index befaaa31ff..f567d61078 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -31,7 +31,7 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
-#include <ostream>
+#include <sstream>
namespace {
const std::string QPID_REPLICATOR_("qpid.replicator-");
@@ -51,14 +51,13 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) {
return QPID_REPLICATOR_ + queueName;
}
-std::ostream& operator<<(std::ostream& o, const QueueReplicator& qr) {
- return o << "HA: Backup queue " << qr.queue->getName() << ": ";
-}
-
QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
{
- QPID_LOG(info, *this << "Created, settings: " << q->getSettings());
+ std::stringstream ss;
+ ss << "HA: Backup queue " << queue->getName() << ": ";
+ logPrefix = ss.str();
+ QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
}
// This must be separate from the constructor so we can call shared_from_this.
@@ -112,7 +111,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false, "", 0, settings);
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
- QPID_LOG(debug, *this << "Activated bridge from " << args.i_src << " to " << args.i_dest);
+ QPID_LOG(debug, logPrefix << "Activated bridge from " << args.i_src << " to " << args.i_dest);
// Reset self reference so this will be deleted when all external refs are gone.
self.reset();
}
@@ -134,7 +133,7 @@ void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&)
QueuedMessage message;
if (queue->acquireMessageAt(n, message)) {
queue->dequeue(0, message);
- QPID_LOG(trace, *this << "Dequeued message "<< message.position);
+ QPID_LOG(trace, logPrefix << "Dequeued message "<< message.position);
}
}
}
@@ -145,13 +144,13 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const Fiel
sys::Mutex::ScopedLock l(lock);
if (key == DEQUEUE_EVENT_KEY) {
SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
- QPID_LOG(trace, *this << "Received dequeues: " << dequeues);
+ QPID_LOG(trace, logPrefix << "Received dequeues: " << dequeues);
//TODO: should be able to optimise the following
for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
dequeue(*i, l);
} else if (key == POSITION_EVENT_KEY) {
SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
- QPID_LOG(trace, *this << "Advance position: from " << queue->getPosition()
+ QPID_LOG(trace, logPrefix << "Advance position: from " << queue->getPosition()
<< " to " << position);
assert(queue->getPosition() <= position);
//TODO aconway 2011-12-14: Optimize this?
@@ -160,7 +159,7 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const Fiel
queue->setPosition(position);
} else {
msg.deliverTo(queue);
- QPID_LOG(trace, *this << "Enqueued message " << queue->getPosition());
+ QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
}
}
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 5bdafb83c8..ad820346d5 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -71,11 +71,11 @@ class QueueReplicator : public broker::Exchange,
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
+ std::string logPrefix;
sys::Mutex lock;
boost::shared_ptr<broker::Queue> queue;
boost::shared_ptr<broker::Link> link;
boost::shared_ptr<QueueReplicator> self;
- friend std::ostream& operator<<(std::ostream&, const QueueReplicator&);
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index be7694b93c..a77154c595 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -26,7 +26,7 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
-#include "ostream"
+#include <sstream>
namespace qpid {
namespace ha {
@@ -42,13 +42,6 @@ const string DOLLAR("$");
const string INTERNAL("-internal");
} // namespace
-
-ostream& operator<<(ostream& o, const ReplicatingSubscription& rs) {
- string url = rs.parent->getSession().getConnection().getUrl();
- string qname= rs.getQueue()->getName();
- return o << "HA: Primary: " << qname << "(" << url << "):";
-}
-
string mask(const string& in)
{
return DOLLAR + in + INTERNAL;
@@ -95,6 +88,12 @@ ReplicatingSubscription::ReplicatingSubscription(
events(new Queue(mask(name))),
consumer(new DelegatingConsumer(*this))
{
+ stringstream ss;
+ string url = parent->getSession().getConnection().getUrl();
+ string qname = getQueue()->getName();
+ ss << "HA: Primary queue " << qname << ", backup " << url << ": ";
+ logPrefix = ss.str();
+
// FIXME aconway 2011-12-09: Failover optimization removed.
// There was code here to re-use messages already on the backup
// during fail-over. This optimization was removed to simplify
@@ -102,7 +101,7 @@ ReplicatingSubscription::ReplicatingSubscription(
// can be re-introduced later. Last revision with the optimization:
// r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
- QPID_LOG(debug, *this << "Created subscription " << name);
+ QPID_LOG(debug, logPrefix << "Created subscription " << name);
// Note that broker::Queue::getPosition() returns the sequence
// number that will be assigned to the next message *minus 1*.
@@ -133,12 +132,12 @@ bool ReplicatingSubscription::deliver(QueuedMessage& m) {
SequenceNumber send(position);
--send; // Send the position before m was enqueued.
sendPositionEvent(send, l);
- QPID_LOG(trace, *this << "Sending position " << send
+ QPID_LOG(trace, logPrefix << "Sending position " << send
<< ", was " << backupPosition);
}
backupPosition = position;
}
- QPID_LOG(trace, *this << "Replicating message " << m.position);
+ QPID_LOG(trace, logPrefix << "Replicating message " << m.position);
}
return ConsumerImpl::deliver(m);
}
@@ -148,7 +147,7 @@ ReplicatingSubscription::~ReplicatingSubscription() {}
// Called in the subscription's connection thread.
void ReplicatingSubscription::cancel()
{
- QPID_LOG(debug, *this <<"Cancelled");
+ QPID_LOG(debug, logPrefix <<"Cancelled");
getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
}
@@ -163,7 +162,7 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& m)
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
{
- QPID_LOG(trace, *this << "Sending dequeues " << dequeues);
+ QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
string buf(dequeues.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
dequeues.encode(buffer);
@@ -219,7 +218,7 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer&
// the message lock in the queue. Called in arbitrary connection threads.
void ReplicatingSubscription::dequeued(const QueuedMessage& m)
{
- QPID_LOG(trace, *this << "Dequeued message " << m.position);
+ QPID_LOG(trace, logPrefix << "Dequeued message " << m.position);
{
sys::Mutex::ScopedLock l(lock);
dequeues.add(m.position);
@@ -229,7 +228,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& m)
// we're not in the dispatch thread.
if (m.position > position) {
m.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(trace, *this << "Completed message " << m.position << " early");
+ QPID_LOG(trace, logPrefix << "Completed message " << m.position << " early");
}
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 688d2b7726..ddee9c8658 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -88,6 +88,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
protected:
bool doDispatch();
private:
+ std::string logPrefix;
boost::shared_ptr<broker::Queue> events;
boost::shared_ptr<broker::Consumer> consumer;
qpid::framing::SequenceSet dequeues;
@@ -97,6 +98,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
void sendEvent(const std::string& key, framing::Buffer&,
const sys::Mutex::ScopedLock&);
+
class DelegatingConsumer : public Consumer
{
public:
@@ -112,9 +114,6 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
private:
ReplicatingSubscription& delegate;
};
-
- /** Print a identifier for a ReplicatingSubscription */
- friend std::ostream& operator<<(std::ostream&, const ReplicatingSubscription&);
};