#!/usr/bin/python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import unittest, threading, re, sys if sys.version_info < (3,): range = xrange import zookeeper, zktestbase ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"} class ConnectionTest(zktestbase.TestBase): """Test whether we can make a connection""" def setUp(self): pass def testconnection(self): cv = threading.Condition() self.connected = False def connection_watcher(handle, type, state, path): cv.acquire() self.connected = True self.assertEqual(zookeeper.CONNECTED_STATE, state) self.handle = handle cv.notify() cv.release() cv.acquire() ret = zookeeper.init(self.host, connection_watcher) cv.wait(15.0) cv.release() self.assertEqual(self.connected, True, "Connection timed out to " + self.host) self.assertEqual(zookeeper.CONNECTED_STATE, zookeeper.state(self.handle)) self.assertEqual(zookeeper.close(self.handle), zookeeper.OK) # Trying to close the same handle twice is an error, and the C library will segfault on it # so make sure this is caught at the Python module layer self.assertRaises(zookeeper.ZooKeeperException, zookeeper.close, self.handle) self.assertRaises(zookeeper.ZooKeeperException, zookeeper.get, self.handle, "/") def testsslconnection(self): cv = threading.Condition() self.connected = False def connection_watcher(handle, type, state, path): cv.acquire() self.connected = True self.assertEqual(zookeeper.CONNECTED_STATE, state) self.handle = handle cv.notify() cv.release() cv.acquire() ret = zookeeper.init_ssl(self.sslhost, self.sslcert, connection_watcher) cv.wait(15.0) cv.release() self.assertEqual(self.connected, True, "SSL Connection timed out to " + self.host) self.assertEqual(zookeeper.CONNECTED_STATE, zookeeper.state(self.handle)) self.assertEqual(zookeeper.close(self.handle), zookeeper.OK) # Trying to close the same handle twice is an error, and the C library will segfault on it # so make sure this is caught at the Python module layer self.assertRaises(zookeeper.ZooKeeperException, zookeeper.close, self.handle) self.assertRaises(zookeeper.ZooKeeperException, zookeeper.get, self.handle, "/") def testhandlereuse(self): """ Test a) multiple concurrent connections b) reuse of closed handles """ cv = threading.Condition() self.connected = False def connection_watcher(handle, type, state, path): cv.acquire() self.connected = True self.assertEqual(zookeeper.CONNECTED_STATE, state) self.handle = handle cv.notify() cv.release() cv.acquire() handles = [ zookeeper.init(self.host) for i in range(10) ] ret = zookeeper.init(self.host, connection_watcher) cv.wait(15.0) cv.release() self.assertEqual(self.connected, True, "Connection timed out to " + self.host) self.assertEqual(True, self.all( [ zookeeper.state(handle) == zookeeper.CONNECTED_STATE for handle in handles ] ), "Not all connections succeeded") oldhandle = handles[3] zookeeper.close(oldhandle) newhandle = zookeeper.init(self.host) # This assertion tests *internal* behaviour; i.e. that the module # correctly reuses closed handles. This is therefore implementation # dependent. self.assertEqual(newhandle, oldhandle, "Didn't get reused handle") def testmanyhandles(self): """ Test the ability of the module to support many handles. """ # We'd like to do more, but currently the C client doesn't # work with > 83 handles (fails to create a pipe) on MacOS 10.5.8 handles = [ zookeeper.init(self.host) for i in range(9) ] cv = threading.Condition() self.connected = False def connection_watcher(handle, type, state, path): cv.acquire() self.connected = True self.assertEqual(zookeeper.CONNECTED_STATE, state) self.handle = handle cv.notify() cv.release() cv.acquire() ret = zookeeper.init(self.host, connection_watcher) cv.wait(15.0) cv.release() self.assertEqual(self.connected, True, "Connection timed out to " + self.host) for i,h in enumerate(handles): path = "/zkpython-test-handles-%s" % str(i) self.assertEqual(path, zookeeper.create(h, path, "", [ZOO_OPEN_ACL_UNSAFE], zookeeper.EPHEMERAL)) self.assertEqual(True, self.all( zookeeper.close(h) == zookeeper.OK for h in handles )) def testversionstringexists(self): self.assertTrue(hasattr(zookeeper, '__version__')) self.assertTrue(re.match("\d.\d.\d", zookeeper.__version__)) def tearDown(self): pass if __name__ == '__main__': unittest.main()