summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-25 18:48:55 +0000
committerAlan Conway <aconway@apache.org>2012-01-25 18:48:55 +0000
commit9f5d14cfd7d497a5f70c508b22fa7c31070a0a62 (patch)
tree67888fca8b473242908f9ac2612213c4ad20de9e
parent1986c2fa3d11de7856458d5eae34fdc3f80a42b0 (diff)
downloadqpid-python-9f5d14cfd7d497a5f70c508b22fa7c31070a0a62.tar.gz
QPID-3603: Test to verify C++ client failover is working.
- TcpConnector: set identifier early so it is available in error messages. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1235871 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/client/TCPConnector.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp2
-rw-r--r--qpid/cpp/src/tests/brokertest.py52
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py4
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py53
5 files changed, 71 insertions, 48 deletions
diff --git a/qpid/cpp/src/qpid/client/TCPConnector.cpp b/qpid/cpp/src/qpid/client/TCPConnector.cpp
index 4660a41c07..51eacf77e8 100644
--- a/qpid/cpp/src/qpid/client/TCPConnector.cpp
+++ b/qpid/cpp/src/qpid/client/TCPConnector.cpp
@@ -97,7 +97,7 @@ void TCPConnector::connect(const std::string& host, const std::string& port) {
boost::bind(&TCPConnector::connected, this, _1),
boost::bind(&TCPConnector::connectFailed, this, _3));
closed = false;
-
+ identifier = str(format("[%1%]") % socket.getFullAddress());
connector->start(poller);
}
@@ -120,8 +120,6 @@ void TCPConnector::start(sys::AsynchIO* aio_) {
for (int i = 0; i < 4; i++) {
aio->queueReadBuffer(new Buff(maxFrameSize));
}
-
- identifier = str(format("[%1%]") % socket.getFullAddress());
}
void TCPConnector::initAmqp() {
@@ -131,7 +129,7 @@ void TCPConnector::initAmqp() {
void TCPConnector::connectFailed(const std::string& msg) {
connector = 0;
- QPID_LOG(warning, "Connect failed: " << msg);
+ QPID_LOG(warning, "Connect failed: " << msg << " " << identifier);
socket.close();
if (!closed)
closed = true;
@@ -185,7 +183,7 @@ sys::ShutdownHandler* TCPConnector::getShutdownHandler() const {
return shutdownHandler;
}
-const std::string& TCPConnector::getIdentifier() const {
+const std::string& TCPConnector::getIdentifier() const {
return identifier;
}
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
index 396217e0ff..141e1da2bc 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
@@ -34,8 +34,6 @@ void ConnectionExcluder::opened(broker::Connection& connection) {
&& !connection.getClientProperties().isSet(ADMIN_TAG))
throw Exception(
QPID_MSG("HA: Backup broker rejected connection " << connection.getMgmtId()));
- else
- QPID_LOG(debug, "HA: Backup broker accepted connection" << connection.getMgmtId());
}
const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin";
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index f4a20eda5a..b2b4d89b0f 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -198,16 +198,17 @@ class Popen(subprocess.Popen):
os.kill( self.pid , signal.SIGTERM)
except AttributeError: # no os.kill, using taskkill.. (Windows only)
os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
- self._cleanup()
+ self.wait()
def kill(self):
- try: subprocess.Popen.kill(self)
+ try:
+ subprocess.Popen.kill(self)
except AttributeError: # No terminate method
try:
os.kill( self.pid , signal.SIGKILL)
except AttributeError: # no os.kill, using taskkill.. (Windows only)
os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
- self._cleanup()
+ self.wait()
def _cleanup(self):
"""Clean up after a dead process"""
@@ -555,22 +556,22 @@ class NumberedSender(Thread):
"""
def __init__(self, broker, max_depth=None, queue="test-queue",
- connection_options=Cluster.CONNECTION_OPTIONS):
+ connection_options=Cluster.CONNECTION_OPTIONS,
+ failover_updates=True, url=None):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
Requires self.notify_received(n) to be called each time messages are received.
"""
Thread.__init__(self)
+ cmd = ["qpid-send",
+ "--broker", url or broker.host_port(),
+ "--address", "%s;{create:always}"%queue,
+ "--connection-options", "{%s}"%(connection_options),
+ "--content-stdin"
+ ]
+ if failover_updates: cmd += ["--failover-updates"]
self.sender = broker.test.popen(
- ["qpid-send",
- "--broker", "localhost:%s"%broker.port(),
- "--address", "%s;{create:always}"%queue,
- "--failover-updates",
- "--connection-options", "{%s}"%(connection_options),
- "--content-stdin"
- ],
- expect=EXPECT_RUNNING,
- stdin=PIPE)
+ cmd, expect=EXPECT_RUNNING, stdin=PIPE)
self.condition = Condition()
self.max = max_depth
self.received = 0
@@ -617,30 +618,31 @@ class NumberedReceiver(Thread):
Thread to run a receiver client and verify it receives
sequentially numbered messages.
"""
- def __init__(self, broker, sender = None, queue="test-queue",
- connection_options=Cluster.CONNECTION_OPTIONS):
+ def __init__(self, broker, sender=None, queue="test-queue",
+ connection_options=Cluster.CONNECTION_OPTIONS,
+ failover_updates=True, url=None):
"""
sender: enable flow control. Call sender.received(n) for each message received.
"""
Thread.__init__(self)
self.test = broker.test
+ cmd = ["qpid-receive",
+ "--broker", url or broker.host_port(),
+ "--address", "%s;{create:always}"%queue,
+ "--connection-options", "{%s}"%(connection_options),
+ "--forever"
+ ]
+ if failover_updates: cmd += [ "--failover-updates" ]
self.receiver = self.test.popen(
- ["qpid-receive",
- "--broker", "localhost:%s"%broker.port(),
- "--address", "%s;{create:always}"%queue,
- "--failover-updates",
- "--connection-options", "{%s}"%(connection_options),
- "--forever"
- ],
- expect=EXPECT_RUNNING,
- stdout=PIPE)
+ cmd, expect=EXPECT_RUNNING, stdout=PIPE)
self.lock = Lock()
self.error = None
self.sender = sender
self.received = 0
def read_message(self):
- return int(self.receiver.stdout.readline())
+ n = int(self.receiver.stdout.readline())
+ return n
def run(self):
try:
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 2db2cdd433..d2de384f08 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -1046,8 +1046,8 @@ class LongTests(BrokerTest):
# Start sender and receiver threads
cluster[0].declare_queue("test-queue")
- sender = NumberedSender(cluster[0], 1000) # Max queue depth
- receiver = NumberedReceiver(cluster[0], sender)
+ sender = NumberedSender(cluster[0], max_depth=1000)
+ receiver = NumberedReceiver(cluster[0], sender=sender)
receiver.start()
sender.start()
# Wait for sender & receiver to get up and running
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index f3c45ba7a3..6d5dc19aa6 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -22,7 +22,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
from qpid.messaging import Message, NotFound, ConnectionError, Connection
from brokertest import *
from threading import Thread, Lock, Condition
-from logging import getLogger
+from logging import getLogger, WARN, ERROR, DEBUG
log = getLogger("qpid.ha-tests")
@@ -39,7 +39,8 @@ class ShortTests(BrokerTest):
] + args,
**kwargs)
- # FIXME aconway 2011-11-15: work around async replication.
+ # FIXME aconway 2011-11-15: work around async wiring replication.
+ # Wait for an address to become valid.
def wait(self, session, address):
def check():
try:
@@ -48,6 +49,13 @@ class ShortTests(BrokerTest):
except NotFound: return False
assert retry(check), "Timed out waiting for %s"%(address)
+ # FIXME aconway 2012-01-23: workaround: we need to give the
+ # backup a chance to attach to the queue.
+ def wait_backup(self, backup, address):
+ bs = self.connect_admin(backup).session()
+ self.wait(bs, address)
+ bs.connection.close()
+
def set_ha_status(self, address, status):
os.system("qpid-ha-status %s %s"%(address, status))
@@ -209,7 +217,8 @@ class ShortTests(BrokerTest):
raise
def test_failover(self):
- """Verify that backups rejects connections and that fail-over works"""
+ """Verify that backups rejects connections and that fail-over works in python client"""
+ getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover
primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary
backup = self.ha_broker(name="backup", broker_url=primary.host_port())
# Check that backup rejects normal connections
@@ -223,20 +232,36 @@ class ShortTests(BrokerTest):
# Test discovery: should connect to primary after reject by backup
c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
s = c.session()
- s.sender("q;{create:always,%s}"%(self.qpid_replicate())).send("foo", sync=True)
- # FIXME aconway 2012-01-23: we shouldn't need the wait and retry here,
- # send(sync=True) shouldn't return till the backup acknowledges.
- bs = self.connect_admin(backup).session()
- self.wait(bs, "q")
- self.assert_browse_retry(bs, "q", ["foo"])
- bs.connection.close()
+ sender = s.sender("q;{create:always,%s}"%(self.qpid_replicate()))
+ self.wait_backup(backup, "q")
+ sender.send("foo")
+ primary.kill()
+ assert retry(lambda: not is_running(primary.pid))
+ self.set_ha_status(backup.host_port(), "primary") # Promote the backup
+ self.assert_browse_retry(s, "q", ["foo"])
+ c.close()
+
+ def test_failover_cpp(self):
+ primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary
+ backup = self.ha_broker(name="backup", broker_url=primary.host_port())
+ url="%s,%s"%(primary.host_port(), backup.host_port())
+ primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate()))
+ self.wait_backup(backup, "q")
+
+ sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
+ receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False)
+ receiver.start()
+ sender.start()
+ self.wait_backup(backup, "q")
+ assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru
primary.kill()
- # Promote the backup
+ assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
self.set_ha_status(backup.host_port(), "primary")
- # FIXME aconway 2012-01-23: should re-use session s below
- self.assert_browse_retry(c.session(), "q", ["foo"])
- c.close()
+ n = receiver.received # Make sure we are still running
+ assert retry(lambda: receiver.received > n + 10)
+ sender.stop()
+ receiver.stop()
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)