From 9ba2fb5614a95263fc9a0b55b83a9f6f4f02e932 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 19 Jan 2012 23:07:15 +0000 Subject: QPID-3603: Format static log prefixes at consutruction time. Creating the prefix dynamically caused sporadic core dumps with trace logging on. It is also inefficient. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233672 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 21 ++++++++--------- qpid/cpp/src/qpid/ha/QueueReplicator.h | 2 +- qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 29 ++++++++++++------------ qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 5 ++-- 4 files changed, 27 insertions(+), 30 deletions(-) diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index befaaa31ff..f567d61078 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -31,7 +31,7 @@ #include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" #include -#include +#include namespace { const std::string QPID_REPLICATOR_("qpid.replicator-"); @@ -51,14 +51,13 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) { return QPID_REPLICATOR_ + queueName; } -std::ostream& operator<<(std::ostream& o, const QueueReplicator& qr) { - return o << "HA: Backup queue " << qr.queue->getName() << ": "; -} - QueueReplicator::QueueReplicator(boost::shared_ptr q, boost::shared_ptr l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l) { - QPID_LOG(info, *this << "Created, settings: " << q->getSettings()); + std::stringstream ss; + ss << "HA: Backup queue " << queue->getName() << ": "; + logPrefix = ss.str(); + QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings()); } // This must be separate from the constructor so we can call shared_from_this. @@ -112,7 +111,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false, "", 0, settings); peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); - QPID_LOG(debug, *this << "Activated bridge from " << args.i_src << " to " << args.i_dest); + QPID_LOG(debug, logPrefix << "Activated bridge from " << args.i_src << " to " << args.i_dest); // Reset self reference so this will be deleted when all external refs are gone. self.reset(); } @@ -134,7 +133,7 @@ void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) QueuedMessage message; if (queue->acquireMessageAt(n, message)) { queue->dequeue(0, message); - QPID_LOG(trace, *this << "Dequeued message "<< message.position); + QPID_LOG(trace, logPrefix << "Dequeued message "<< message.position); } } } @@ -145,13 +144,13 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const Fiel sys::Mutex::ScopedLock l(lock); if (key == DEQUEUE_EVENT_KEY) { SequenceSet dequeues = decodeContent(msg.getMessage()); - QPID_LOG(trace, *this << "Received dequeues: " << dequeues); + QPID_LOG(trace, logPrefix << "Received dequeues: " << dequeues); //TODO: should be able to optimise the following for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) dequeue(*i, l); } else if (key == POSITION_EVENT_KEY) { SequenceNumber position = decodeContent(msg.getMessage()); - QPID_LOG(trace, *this << "Advance position: from " << queue->getPosition() + QPID_LOG(trace, logPrefix << "Advance position: from " << queue->getPosition() << " to " << position); assert(queue->getPosition() <= position); //TODO aconway 2011-12-14: Optimize this? @@ -160,7 +159,7 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const Fiel queue->setPosition(position); } else { msg.deliverTo(queue); - QPID_LOG(trace, *this << "Enqueued message " << queue->getPosition()); + QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition()); } } diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 5bdafb83c8..ad820346d5 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -71,11 +71,11 @@ class QueueReplicator : public broker::Exchange, void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&); + std::string logPrefix; sys::Mutex lock; boost::shared_ptr queue; boost::shared_ptr link; boost::shared_ptr self; - friend std::ostream& operator<<(std::ostream&, const QueueReplicator&); }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index be7694b93c..a77154c595 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -26,7 +26,7 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" -#include "ostream" +#include namespace qpid { namespace ha { @@ -42,13 +42,6 @@ const string DOLLAR("$"); const string INTERNAL("-internal"); } // namespace - -ostream& operator<<(ostream& o, const ReplicatingSubscription& rs) { - string url = rs.parent->getSession().getConnection().getUrl(); - string qname= rs.getQueue()->getName(); - return o << "HA: Primary: " << qname << "(" << url << "):"; -} - string mask(const string& in) { return DOLLAR + in + INTERNAL; @@ -95,6 +88,12 @@ ReplicatingSubscription::ReplicatingSubscription( events(new Queue(mask(name))), consumer(new DelegatingConsumer(*this)) { + stringstream ss; + string url = parent->getSession().getConnection().getUrl(); + string qname = getQueue()->getName(); + ss << "HA: Primary queue " << qname << ", backup " << url << ": "; + logPrefix = ss.str(); + // FIXME aconway 2011-12-09: Failover optimization removed. // There was code here to re-use messages already on the backup // during fail-over. This optimization was removed to simplify @@ -102,7 +101,7 @@ ReplicatingSubscription::ReplicatingSubscription( // can be re-introduced later. Last revision with the optimization: // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. - QPID_LOG(debug, *this << "Created subscription " << name); + QPID_LOG(debug, logPrefix << "Created subscription " << name); // Note that broker::Queue::getPosition() returns the sequence // number that will be assigned to the next message *minus 1*. @@ -133,12 +132,12 @@ bool ReplicatingSubscription::deliver(QueuedMessage& m) { SequenceNumber send(position); --send; // Send the position before m was enqueued. sendPositionEvent(send, l); - QPID_LOG(trace, *this << "Sending position " << send + QPID_LOG(trace, logPrefix << "Sending position " << send << ", was " << backupPosition); } backupPosition = position; } - QPID_LOG(trace, *this << "Replicating message " << m.position); + QPID_LOG(trace, logPrefix << "Replicating message " << m.position); } return ConsumerImpl::deliver(m); } @@ -148,7 +147,7 @@ ReplicatingSubscription::~ReplicatingSubscription() {} // Called in the subscription's connection thread. void ReplicatingSubscription::cancel() { - QPID_LOG(debug, *this <<"Cancelled"); + QPID_LOG(debug, logPrefix <<"Cancelled"); getQueue()->removeObserver(boost::dynamic_pointer_cast(shared_from_this())); } @@ -163,7 +162,7 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& m) // Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l) { - QPID_LOG(trace, *this << "Sending dequeues " << dequeues); + QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); string buf(dequeues.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); dequeues.encode(buffer); @@ -219,7 +218,7 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& // the message lock in the queue. Called in arbitrary connection threads. void ReplicatingSubscription::dequeued(const QueuedMessage& m) { - QPID_LOG(trace, *this << "Dequeued message " << m.position); + QPID_LOG(trace, logPrefix << "Dequeued message " << m.position); { sys::Mutex::ScopedLock l(lock); dequeues.add(m.position); @@ -229,7 +228,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& m) // we're not in the dispatch thread. if (m.position > position) { m.payload->getIngressCompletion().finishCompleter(); - QPID_LOG(trace, *this << "Completed message " << m.position << " early"); + QPID_LOG(trace, logPrefix << "Completed message " << m.position << " early"); } } diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 688d2b7726..ddee9c8658 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -88,6 +88,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, protected: bool doDispatch(); private: + std::string logPrefix; boost::shared_ptr events; boost::shared_ptr consumer; qpid::framing::SequenceSet dequeues; @@ -97,6 +98,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&); void sendEvent(const std::string& key, framing::Buffer&, const sys::Mutex::ScopedLock&); + class DelegatingConsumer : public Consumer { public: @@ -112,9 +114,6 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, private: ReplicatingSubscription& delegate; }; - - /** Print a identifier for a ReplicatingSubscription */ - friend std::ostream& operator<<(std::ostream&, const ReplicatingSubscription&); }; -- cgit v1.2.1