diff options
author | Alan Conway <aconway@apache.org> | 2012-01-19 23:08:02 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-01-19 23:08:02 +0000 |
commit | 9cd80cb66a9b832db519d03d75b6de21011c0c2f (patch) | |
tree | 0decad55031230860c21f59c10745ef5bf4c95e9 | |
parent | c22541a432b9734e015f78f46543ca97a9cab043 (diff) | |
download | qpid-python-9cd80cb66a9b832db519d03d75b6de21011c0c2f.tar.gz |
QPID-3603: HA logging improvements.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233678 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 4 |
4 files changed, 18 insertions, 16 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index d586233e6d..cfa8dfda87 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -249,7 +249,6 @@ void Link::ioThreadProcessing() if (state != STATE_OPERATIONAL) return; - QPID_LOG(debug, "Link::ioThreadProcessing()"); // check for bridge session errors and recover if (!active.empty()) { diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 0929cc718d..e11fb8eb37 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -55,7 +55,7 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l) { std::stringstream ss; - ss << "HA: Backup queue " << queue->getName() << ": "; + ss << "HA: Backup " << queue->getName() << ": "; logPrefix = ss.str(); QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings()); } @@ -133,12 +133,10 @@ template <class T> T decodeContent(Message& m) { void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) { // Thread safe: only calls thread safe Queue functions. - if (queue->getPosition() >= n) { // Ignore dequeus we haven't reached yet + if (queue->getPosition() >= n) { // Ignore messages we haven't reached yet QueuedMessage message; - if (queue->acquireMessageAt(n, message)) { + if (queue->acquireMessageAt(n, message)) queue->dequeue(0, message); - QPID_LOG(trace, logPrefix << "Dequeued message "<< message.position); - } } } @@ -148,13 +146,13 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const Fiel sys::Mutex::ScopedLock l(lock); if (key == DEQUEUE_EVENT_KEY) { SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage()); - QPID_LOG(trace, logPrefix << "Dequeue update: " << dequeues); + QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues); //TODO: should be able to optimise the following for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) dequeue(*i, l); } else if (key == POSITION_EVENT_KEY) { SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); - QPID_LOG(trace, logPrefix << "Position update: from " << queue->getPosition() + QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() << " to " << position); assert(queue->getPosition() <= position); //TODO aconway 2011-12-14: Optimize this? diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 733492db81..6c33002b5c 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -47,6 +47,7 @@ string mask(const string& in) return DOLLAR + in + INTERNAL; } +/* Called by SemanticState::consume to create a consumer */ boost::shared_ptr<broker::SemanticState::ConsumerImpl> ReplicatingSubscription::Factory::create( SemanticState* parent, @@ -122,8 +123,6 @@ bool ReplicatingSubscription::deliver(QueuedMessage& m) { SequenceNumber send(m.position); --send; // Send the position before m was enqueued. sendPositionEvent(send, l); - QPID_LOG(trace, logPrefix << "Sending position " << send - << ", was " << backupPosition); } backupPosition = m.position; QPID_LOG(trace, logPrefix << "Replicating message " << m.position); @@ -137,13 +136,14 @@ ReplicatingSubscription::~ReplicatingSubscription() {} void ReplicatingSubscription::cancel() { QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName()); - getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); + getQueue()->removeObserver( + boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); + ConsumerImpl::cancel(); } // Called before we get notified of the message being available and // under the message lock in the queue. Called in arbitrary connection thread. -void ReplicatingSubscription::enqueued(const QueuedMessage& m) -{ +void ReplicatingSubscription::enqueued(const QueuedMessage& m) { //delay completion m.payload->getIngressCompletion().startCompleter(); } @@ -164,6 +164,8 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l) void ReplicatingSubscription::sendPositionEvent( SequenceNumber position, const sys::Mutex::ScopedLock&l ) { + QPID_LOG(trace, logPrefix << "Sending position " << position + << ", was " << backupPosition); string buf(backupPosition.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); position.encode(buffer); @@ -207,7 +209,6 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& // the messageLock in the queue. Called in arbitrary connection threads. void ReplicatingSubscription::dequeued(const QueuedMessage& m) { - QPID_LOG(trace, logPrefix << "Dequeued message " << m.position); { sys::Mutex::ScopedLock l(lock); dequeues.add(m.position); @@ -219,8 +220,10 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& m) // not under the message lock? if (m.position > position) { m.payload->getIngressCompletion().finishCompleter(); - QPID_LOG(trace, logPrefix << "Completed message " << m.position << " early"); + QPID_LOG(trace, logPrefix << "Dequeued and completed message " << m.position << " early"); } + else + QPID_LOG(trace, logPrefix << "Dequeued message " << m.position); } notify(); // Ensure a call to doDispatch } diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index ddee9c8658..147c40ee6d 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -76,13 +76,15 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, ~ReplicatingSubscription(); - void cancel(); + // QueueObserver overrides. bool deliver(broker::QueuedMessage& msg); void enqueued(const broker::QueuedMessage&); void dequeued(const broker::QueuedMessage&); void acquired(const broker::QueuedMessage&) {} void requeued(const broker::QueuedMessage&) {} + // Consumer overrides. + void cancel(); bool isDelayedCompletion() const { return true; } protected: |