summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-19 23:08:02 +0000
committerAlan Conway <aconway@apache.org>2012-01-19 23:08:02 +0000
commit9cd80cb66a9b832db519d03d75b6de21011c0c2f (patch)
tree0decad55031230860c21f59c10745ef5bf4c95e9
parentc22541a432b9734e015f78f46543ca97a9cab043 (diff)
downloadqpid-python-9cd80cb66a9b832db519d03d75b6de21011c0c2f.tar.gz
QPID-3603: HA logging improvements.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233678 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp12
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp17
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h4
4 files changed, 18 insertions, 16 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index d586233e6d..cfa8dfda87 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -249,7 +249,6 @@ void Link::ioThreadProcessing()
if (state != STATE_OPERATIONAL)
return;
- QPID_LOG(debug, "Link::ioThreadProcessing()");
// check for bridge session errors and recover
if (!active.empty()) {
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 0929cc718d..e11fb8eb37 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -55,7 +55,7 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L
: Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
{
std::stringstream ss;
- ss << "HA: Backup queue " << queue->getName() << ": ";
+ ss << "HA: Backup " << queue->getName() << ": ";
logPrefix = ss.str();
QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
}
@@ -133,12 +133,10 @@ template <class T> T decodeContent(Message& m) {
void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) {
// Thread safe: only calls thread safe Queue functions.
- if (queue->getPosition() >= n) { // Ignore dequeus we haven't reached yet
+ if (queue->getPosition() >= n) { // Ignore messages we haven't reached yet
QueuedMessage message;
- if (queue->acquireMessageAt(n, message)) {
+ if (queue->acquireMessageAt(n, message))
queue->dequeue(0, message);
- QPID_LOG(trace, logPrefix << "Dequeued message "<< message.position);
- }
}
}
@@ -148,13 +146,13 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const Fiel
sys::Mutex::ScopedLock l(lock);
if (key == DEQUEUE_EVENT_KEY) {
SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
- QPID_LOG(trace, logPrefix << "Dequeue update: " << dequeues);
+ QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
//TODO: should be able to optimise the following
for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
dequeue(*i, l);
} else if (key == POSITION_EVENT_KEY) {
SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
- QPID_LOG(trace, logPrefix << "Position update: from " << queue->getPosition()
+ QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
<< " to " << position);
assert(queue->getPosition() <= position);
//TODO aconway 2011-12-14: Optimize this?
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 733492db81..6c33002b5c 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -47,6 +47,7 @@ string mask(const string& in)
return DOLLAR + in + INTERNAL;
}
+/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
SemanticState* parent,
@@ -122,8 +123,6 @@ bool ReplicatingSubscription::deliver(QueuedMessage& m) {
SequenceNumber send(m.position);
--send; // Send the position before m was enqueued.
sendPositionEvent(send, l);
- QPID_LOG(trace, logPrefix << "Sending position " << send
- << ", was " << backupPosition);
}
backupPosition = m.position;
QPID_LOG(trace, logPrefix << "Replicating message " << m.position);
@@ -137,13 +136,14 @@ ReplicatingSubscription::~ReplicatingSubscription() {}
void ReplicatingSubscription::cancel()
{
QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName());
- getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+ getQueue()->removeObserver(
+ boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+ ConsumerImpl::cancel();
}
// Called before we get notified of the message being available and
// under the message lock in the queue. Called in arbitrary connection thread.
-void ReplicatingSubscription::enqueued(const QueuedMessage& m)
-{
+void ReplicatingSubscription::enqueued(const QueuedMessage& m) {
//delay completion
m.payload->getIngressCompletion().startCompleter();
}
@@ -164,6 +164,8 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
void ReplicatingSubscription::sendPositionEvent(
SequenceNumber position, const sys::Mutex::ScopedLock&l )
{
+ QPID_LOG(trace, logPrefix << "Sending position " << position
+ << ", was " << backupPosition);
string buf(backupPosition.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
position.encode(buffer);
@@ -207,7 +209,6 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer&
// the messageLock in the queue. Called in arbitrary connection threads.
void ReplicatingSubscription::dequeued(const QueuedMessage& m)
{
- QPID_LOG(trace, logPrefix << "Dequeued message " << m.position);
{
sys::Mutex::ScopedLock l(lock);
dequeues.add(m.position);
@@ -219,8 +220,10 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& m)
// not under the message lock?
if (m.position > position) {
m.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(trace, logPrefix << "Completed message " << m.position << " early");
+ QPID_LOG(trace, logPrefix << "Dequeued and completed message " << m.position << " early");
}
+ else
+ QPID_LOG(trace, logPrefix << "Dequeued message " << m.position);
}
notify(); // Ensure a call to doDispatch
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index ddee9c8658..147c40ee6d 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -76,13 +76,15 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
~ReplicatingSubscription();
- void cancel();
+ // QueueObserver overrides.
bool deliver(broker::QueuedMessage& msg);
void enqueued(const broker::QueuedMessage&);
void dequeued(const broker::QueuedMessage&);
void acquired(const broker::QueuedMessage&) {}
void requeued(const broker::QueuedMessage&) {}
+ // Consumer overrides.
+ void cancel();
bool isDelayedCompletion() const { return true; }
protected: