summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/StatusCheck.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/StatusCheck.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/StatusCheck.cpp23
1 files changed, 18 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.cpp b/qpid/cpp/src/qpid/ha/StatusCheck.cpp
index 8acf8d6cdc..b6bce0fd7b 100644
--- a/qpid/cpp/src/qpid/ha/StatusCheck.cpp
+++ b/qpid/cpp/src/qpid/ha/StatusCheck.cpp
@@ -23,6 +23,7 @@
#include "HaBroker.h"
#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
+#include "qpid/messaging/shutdown.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Connection.h"
#include "qpid/messaging/Message.h"
@@ -97,14 +98,17 @@ void StatusCheckThread::run() {
string status = details["status"].getString();
QPID_LOG(debug, logPrefix << status);
if (status != "joining") {
- statusCheck.setPromote(false);
+ statusCheck.noPromote();
QPID_LOG(info, logPrefix << "Joining established cluster");
}
}
else
QPID_LOG(error, logPrefix << "Invalid response " << response.getContent());
- } catch(...) {}
+ } catch(const std::exception& e) {
+ QPID_LOG(info, logPrefix << e.what());
+ }
try { c.close(); } catch(...) {}
+ statusCheck.endThread();
delete this;
}
@@ -117,16 +121,25 @@ StatusCheck::StatusCheck(HaBroker& hb) :
{}
StatusCheck::~StatusCheck() {
- // Join any leftovers
+ // In case canPromote was never called.
for (size_t i = 0; i < threads.size(); ++i) threads[i].join();
}
void StatusCheck::setUrl(const Url& url) {
Mutex::ScopedLock l(lock);
+ threadCount = url.size();
for (size_t i = 0; i < url.size(); ++i)
threads.push_back(Thread(new StatusCheckThread(*this, url[i])));
}
+void StatusCheck::endThread() {
+ // Shut down the client poller ASAP to avoid conflict with the broker's poller.
+ // See https://issues.apache.org/jira/browse/QPID-7149
+ if (--threadCount == 0) {
+ messaging::shutdown();
+ }
+}
+
bool StatusCheck::canPromote() {
Mutex::ScopedLock l(lock);
while (!threads.empty()) {
@@ -138,9 +151,9 @@ bool StatusCheck::canPromote() {
return promote;
}
-void StatusCheck::setPromote(bool p) {
+void StatusCheck::noPromote() {
Mutex::ScopedLock l(lock);
- promote = p;
+ promote = false;
}
}} // namespace qpid::ha