diff options
Diffstat (limited to 'Lib/test/test_logging.py')
-rw-r--r-- | Lib/test/test_logging.py | 439 |
1 files changed, 329 insertions, 110 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index ae4ca18444..ffd0c8bf2b 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -26,6 +26,7 @@ import logging.handlers import logging.config import codecs +import configparser import datetime import pickle import io @@ -58,7 +59,9 @@ try: import smtpd from urllib.parse import urlparse, parse_qs from socketserver import (ThreadingUDPServer, DatagramRequestHandler, - ThreadingTCPServer, StreamRequestHandler) + ThreadingTCPServer, StreamRequestHandler, + ThreadingUnixStreamServer, + ThreadingUnixDatagramServer) except ImportError: threading = None try: @@ -75,13 +78,12 @@ try: except ImportError: pass - class BaseTest(unittest.TestCase): """Base class for logging tests.""" log_format = "%(name)s -> %(levelname)s: %(message)s" - expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$" + expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$" message_num = 0 def setUp(self): @@ -93,7 +95,8 @@ class BaseTest(unittest.TestCase): self.saved_handlers = logging._handlers.copy() self.saved_handler_list = logging._handlerList[:] self.saved_loggers = saved_loggers = logger_dict.copy() - self.saved_level_names = logging._levelNames.copy() + self.saved_name_to_level = logging._nameToLevel.copy() + self.saved_level_to_name = logging._levelToName.copy() self.logger_states = logger_states = {} for name in saved_loggers: logger_states[name] = getattr(saved_loggers[name], @@ -135,8 +138,10 @@ class BaseTest(unittest.TestCase): self.root_logger.setLevel(self.original_logging_level) logging._acquireLock() try: - logging._levelNames.clear() - logging._levelNames.update(self.saved_level_names) + logging._levelToName.clear() + logging._levelToName.update(self.saved_level_to_name) + logging._nameToLevel.clear() + logging._nameToLevel.update(self.saved_name_to_level) logging._handlers.clear() logging._handlers.update(self.saved_handlers) logging._handlerList[:] = self.saved_handler_list @@ -150,12 +155,12 @@ class BaseTest(unittest.TestCase): finally: logging._releaseLock() - def assert_log_lines(self, expected_values, stream=None): + def assert_log_lines(self, expected_values, stream=None, pat=None): """Match the collected log lines against the regular expression self.expected_log_pat, and compare the extracted group values to the expected_values list of tuples.""" stream = stream or self.stream - pat = re.compile(self.expected_log_pat) + pat = re.compile(pat or self.expected_log_pat) actual_lines = stream.getvalue().splitlines() self.assertEqual(len(actual_lines), len(expected_values)) for actual, expected in zip(actual_lines, expected_values): @@ -430,7 +435,7 @@ class CustomLevelsAndFiltersTest(BaseTest): """Test various filtering possibilities with custom logging levels.""" # Skip the logger name group. - expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" + expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" def setUp(self): BaseTest.setUp(self) @@ -560,7 +565,7 @@ class HandlerTest(BaseTest): self.assertEqual(h.facility, h.LOG_USER) self.assertTrue(h.unixsocket) h.close() - except socket.error: # syslogd might not be available + except OSError: # syslogd might not be available pass for method in ('GET', 'POST', 'PUT'): if method == 'PUT': @@ -647,41 +652,6 @@ class StreamHandlerTest(BaseTest): # -- if it proves to be of wider utility than just test_logging if threading: - class TestSMTPChannel(smtpd.SMTPChannel): - """ - This derived class has had to be created because smtpd does not - support use of custom channel maps, although they are allowed by - asyncore's design. Issue #11959 has been raised to address this, - and if resolved satisfactorily, some of this code can be removed. - """ - def __init__(self, server, conn, addr, sockmap): - asynchat.async_chat.__init__(self, conn, sockmap) - self.smtp_server = server - self.conn = conn - self.addr = addr - self.data_size_limit = None - self.received_lines = [] - self.smtp_state = self.COMMAND - self.seen_greeting = '' - self.mailfrom = None - self.rcpttos = [] - self.received_data = '' - self.fqdn = socket.getfqdn() - self.num_bytes = 0 - try: - self.peer = conn.getpeername() - except socket.error as err: - # a race condition may occur if the other end is closing - # before we can get the peername - self.close() - if err.args[0] != errno.ENOTCONN: - raise - return - self.push('220 %s %s' % (self.fqdn, smtpd.__version__)) - self.set_terminator(b'\r\n') - self.extended_smtp = False - - class TestSMTPServer(smtpd.SMTPServer): """ This class implements a test SMTP server. @@ -702,37 +672,14 @@ if threading: :func:`asyncore.loop`. This avoids changing the :mod:`asyncore` module's global state. """ - channel_class = TestSMTPChannel def __init__(self, addr, handler, poll_interval, sockmap): - self._localaddr = addr - self._remoteaddr = None - self.data_size_limit = None - self.sockmap = sockmap - asyncore.dispatcher.__init__(self, map=sockmap) - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setblocking(0) - self.set_socket(sock, map=sockmap) - # try to re-use a server port if possible - self.set_reuse_addr() - self.bind(addr) - self.port = sock.getsockname()[1] - self.listen(5) - except: - self.close() - raise + smtpd.SMTPServer.__init__(self, addr, None, map=sockmap) + self.port = self.socket.getsockname()[1] self._handler = handler self._thread = None self.poll_interval = poll_interval - def handle_accepted(self, conn, addr): - """ - Redefined only because the base class does not pass in a - map, forcing use of a global in :mod:`asyncore`. - """ - channel = self.channel_class(self, conn, addr, self.sockmap) - def process_message(self, peer, mailfrom, rcpttos, data): """ Delegates to the handler passed in to the server's constructor. @@ -763,8 +710,8 @@ if threading: :func:`asyncore.loop`. """ try: - asyncore.loop(poll_interval, map=self.sockmap) - except select.error: + asyncore.loop(poll_interval, map=self._map) + except OSError: # On FreeBSD 8, closing the server repeatably # raises this error. We swallow it if the # server has been closed. @@ -871,7 +818,7 @@ if threading: sock, addr = self.socket.accept() if self.sslctx: sock = self.sslctx.wrap_socket(sock, server_side=True) - except socket.error as e: + except OSError as e: # socket errors are silenced by the caller, print them here sys.stderr.write("Got an error:\n%s\n" % e) raise @@ -908,6 +855,9 @@ if threading: super(TestTCPServer, self).server_bind() self.port = self.socket.getsockname()[1] + class TestUnixStreamServer(TestTCPServer): + address_family = socket.AF_UNIX + class TestUDPServer(ControlMixin, ThreadingUDPServer): """ A UDP server which is controllable using :class:`ControlMixin`. @@ -937,7 +887,7 @@ if threading: if data: try: super(DelegatingUDPRequestHandler, self).finish() - except socket.error: + except OSError: if not self.server._closed: raise @@ -955,6 +905,9 @@ if threading: super(TestUDPServer, self).server_close() self._closed = True + class TestUnixDatagramServer(TestUDPServer): + address_family = socket.AF_UNIX + # - end of server_helper section @unittest.skipUnless(threading, 'Threading required for this test.') @@ -991,7 +944,7 @@ class MemoryHandlerTest(BaseTest): """Tests for the MemoryHandler.""" # Do not bother with a logger name group. - expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" + expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" def setUp(self): BaseTest.setUp(self) @@ -1044,7 +997,7 @@ class ConfigFileTest(BaseTest): """Reading logging config from a .ini-style config file.""" - expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" + expected_log_pat = r"^(\w+) \+\+ (\w+)$" # config0 is a standard configuration. config0 = """ @@ -1292,6 +1245,24 @@ class ConfigFileTest(BaseTest): # Original logger output is empty. self.assert_log_lines([]) + def test_config0_using_cp_ok(self): + # A simple config file which overrides the default settings. + with captured_stdout() as output: + file = io.StringIO(textwrap.dedent(self.config0)) + cp = configparser.ConfigParser() + cp.read_file(file) + logging.config.fileConfig(cp) + logger = logging.getLogger() + # Won't output anything + logger.info(self.next_message()) + # Outputs a message + logger.error(self.next_message()) + self.assert_log_lines([ + ('ERROR', '2'), + ], stream=output) + # Original logger output is empty. + self.assert_log_lines([]) + def test_config1_ok(self, config=config1): # A config file defining a sub-parser as well. with captured_stdout() as output: @@ -1381,7 +1352,7 @@ class ConfigFileTest(BaseTest): def test_logger_disabling(self): self.apply_config(self.disable_test) - logger = logging.getLogger('foo') + logger = logging.getLogger('some_pristine_logger') self.assertFalse(logger.disabled) self.apply_config(self.disable_test) self.assertTrue(logger.disabled) @@ -1394,17 +1365,23 @@ class SocketHandlerTest(BaseTest): """Test for SocketHandler objects.""" + if threading: + server_class = TestTCPServer + address = ('localhost', 0) + def setUp(self): """Set up a TCP server to receive log messages, and a SocketHandler pointing to that server's address and port.""" BaseTest.setUp(self) - addr = ('localhost', 0) - self.server = server = TestTCPServer(addr, self.handle_socket, - 0.01) + self.server = server = self.server_class(self.address, + self.handle_socket, 0.01) server.start() server.ready.wait() - self.sock_hdlr = logging.handlers.SocketHandler('localhost', - server.port) + hcls = logging.handlers.SocketHandler + if isinstance(server.server_address, tuple): + self.sock_hdlr = hcls('localhost', server.port) + else: + self.sock_hdlr = hcls(server.server_address, None) self.log_output = '' self.root_logger.removeHandler(self.root_logger.handlers[0]) self.root_logger.addHandler(self.sock_hdlr) @@ -1444,35 +1421,69 @@ class SocketHandlerTest(BaseTest): self.assertEqual(self.log_output, "spam\neggs\n") def test_noserver(self): + # Avoid timing-related failures due to SocketHandler's own hard-wired + # one-second timeout on socket.create_connection() (issue #16264). + self.sock_hdlr.retryStart = 2.5 # Kill the server self.server.stop(2.0) - #The logging call should try to connect, which should fail + # The logging call should try to connect, which should fail try: raise RuntimeError('Deliberate mistake') except RuntimeError: self.root_logger.exception('Never sent') self.root_logger.error('Never sent, either') now = time.time() - self.assertTrue(self.sock_hdlr.retryTime > now) + self.assertGreater(self.sock_hdlr.retryTime, now) time.sleep(self.sock_hdlr.retryTime - now + 0.001) self.root_logger.error('Nor this') +def _get_temp_domain_socket(): + fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.sock') + os.close(fd) + # just need a name - file can't be present, or we'll get an + # 'address already in use' error. + os.remove(fn) + return fn + +@unittest.skipUnless(threading, 'Threading required for this test.') +class UnixSocketHandlerTest(SocketHandlerTest): + + """Test for SocketHandler with unix sockets.""" + + if threading: + server_class = TestUnixStreamServer + + def setUp(self): + # override the definition in the base class + self.address = _get_temp_domain_socket() + SocketHandlerTest.setUp(self) + + def tearDown(self): + SocketHandlerTest.tearDown(self) + os.remove(self.address) @unittest.skipUnless(threading, 'Threading required for this test.') class DatagramHandlerTest(BaseTest): """Test for DatagramHandler.""" + if threading: + server_class = TestUDPServer + address = ('localhost', 0) + def setUp(self): """Set up a UDP server to receive log messages, and a DatagramHandler pointing to that server's address and port.""" BaseTest.setUp(self) - addr = ('localhost', 0) - self.server = server = TestUDPServer(addr, self.handle_datagram, 0.01) + self.server = server = self.server_class(self.address, + self.handle_datagram, 0.01) server.start() server.ready.wait() - self.sock_hdlr = logging.handlers.DatagramHandler('localhost', - server.port) + hcls = logging.handlers.DatagramHandler + if isinstance(server.server_address, tuple): + self.sock_hdlr = hcls('localhost', server.port) + else: + self.sock_hdlr = hcls(server.server_address, None) self.log_output = '' self.root_logger.removeHandler(self.root_logger.handlers[0]) self.root_logger.addHandler(self.sock_hdlr) @@ -1507,21 +1518,44 @@ class DatagramHandlerTest(BaseTest): @unittest.skipUnless(threading, 'Threading required for this test.') +class UnixDatagramHandlerTest(DatagramHandlerTest): + + """Test for DatagramHandler using Unix sockets.""" + + if threading: + server_class = TestUnixDatagramServer + + def setUp(self): + # override the definition in the base class + self.address = _get_temp_domain_socket() + DatagramHandlerTest.setUp(self) + + def tearDown(self): + DatagramHandlerTest.tearDown(self) + os.remove(self.address) + +@unittest.skipUnless(threading, 'Threading required for this test.') class SysLogHandlerTest(BaseTest): """Test for SysLogHandler using UDP.""" + if threading: + server_class = TestUDPServer + address = ('localhost', 0) + def setUp(self): """Set up a UDP server to receive log messages, and a SysLogHandler pointing to that server's address and port.""" BaseTest.setUp(self) - addr = ('localhost', 0) - self.server = server = TestUDPServer(addr, self.handle_datagram, - 0.01) + self.server = server = self.server_class(self.address, + self.handle_datagram, 0.01) server.start() server.ready.wait() - self.sl_hdlr = logging.handlers.SysLogHandler(('localhost', - server.port)) + hcls = logging.handlers.SysLogHandler + if isinstance(server.server_address, tuple): + self.sl_hdlr = hcls(('localhost', server.port)) + else: + self.sl_hdlr = hcls(server.server_address) self.log_output = '' self.root_logger.removeHandler(self.root_logger.handlers[0]) self.root_logger.addHandler(self.sl_hdlr) @@ -1559,6 +1593,23 @@ class SysLogHandlerTest(BaseTest): @unittest.skipUnless(threading, 'Threading required for this test.') +class UnixSysLogHandlerTest(SysLogHandlerTest): + + """Test for SysLogHandler with Unix sockets.""" + + if threading: + server_class = TestUnixDatagramServer + + def setUp(self): + # override the definition in the base class + self.address = _get_temp_domain_socket() + SysLogHandlerTest.setUp(self) + + def tearDown(self): + SysLogHandlerTest.tearDown(self) + os.remove(self.address) + +@unittest.skipUnless(threading, 'Threading required for this test.') class HTTPHandlerTest(BaseTest): """Test for HTTPHandler.""" @@ -1779,7 +1830,7 @@ class WarningsTest(BaseTest): logger.removeHandler(h) s = stream.getvalue() h.close() - self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0) + self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0) #See if an explicit file uses the original implementation a_file = io.StringIO() @@ -1817,7 +1868,7 @@ class ConfigDictTest(BaseTest): """Reading logging config from a dictionary.""" - expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" + expected_log_pat = r"^(\w+) \+\+ (\w+)$" # config0 is a standard configuration. config0 = { @@ -2389,6 +2440,32 @@ class ConfigDictTest(BaseTest): }, } + # As config0, but with properties + config14 = { + 'version': 1, + 'formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + '.': { + 'foo': 'bar', + 'terminator': '!\n', + } + }, + }, + 'root' : { + 'level' : 'WARNING', + 'handlers' : ['hand1'], + }, + } + out_of_order = { "version": 1, "formatters": { @@ -2656,11 +2733,20 @@ class ConfigDictTest(BaseTest): def test_config13_failure(self): self.assertRaises(Exception, self.apply_config, self.config13) + def test_config14_ok(self): + with captured_stdout() as output: + self.apply_config(self.config14) + h = logging._handlers['hand1'] + self.assertEqual(h.foo, 'bar') + self.assertEqual(h.terminator, '!\n') + logging.warning('Exclamation') + self.assertTrue(output.getvalue().endswith('Exclamation!\n')) + @unittest.skipUnless(threading, 'listen() needs threading to work') - def setup_via_listener(self, text): + def setup_via_listener(self, text, verify=None): text = text.encode("utf-8") # Ask for a randomly assigned port (by using port 0) - t = logging.config.listen(0) + t = logging.config.listen(0, verify) t.start() t.ready.wait() # Now get the port allocated @@ -2720,6 +2806,69 @@ class ConfigDictTest(BaseTest): # Original logger output is empty. self.assert_log_lines([]) + @unittest.skipUnless(threading, 'Threading required for this test.') + def test_listen_verify(self): + + def verify_fail(stuff): + return None + + def verify_reverse(stuff): + return stuff[::-1] + + logger = logging.getLogger("compiler.parser") + to_send = textwrap.dedent(ConfigFileTest.config1) + # First, specify a verification function that will fail. + # We expect to see no output, since our configuration + # never took effect. + with captured_stdout() as output: + self.setup_via_listener(to_send, verify_fail) + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assert_log_lines([], stream=output) + # Original logger output has the stuff we logged. + self.assert_log_lines([ + ('INFO', '1'), + ('ERROR', '2'), + ], pat=r"^[\w.]+ -> (\w+): (\d+)$") + + # Now, perform no verification. Our configuration + # should take effect. + + with captured_stdout() as output: + self.setup_via_listener(to_send) # no verify callable specified + logger = logging.getLogger("compiler.parser") + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assert_log_lines([ + ('INFO', '3'), + ('ERROR', '4'), + ], stream=output) + # Original logger output still has the stuff we logged before. + self.assert_log_lines([ + ('INFO', '1'), + ('ERROR', '2'), + ], pat=r"^[\w.]+ -> (\w+): (\d+)$") + + # Now, perform verification which transforms the bytes. + + with captured_stdout() as output: + self.setup_via_listener(to_send[::-1], verify_reverse) + logger = logging.getLogger("compiler.parser") + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assert_log_lines([ + ('INFO', '5'), + ('ERROR', '6'), + ], stream=output) + # Original logger output still has the stuff we logged before. + self.assert_log_lines([ + ('INFO', '1'), + ('ERROR', '2'), + ], pat=r"^[\w.]+ -> (\w+): (\d+)$") + def test_out_of_order(self): self.apply_config(self.out_of_order) handler = logging.getLogger('mymodule').handlers[0] @@ -2779,14 +2928,14 @@ class ChildLoggerTest(BaseTest): l2 = logging.getLogger('def.ghi') c1 = r.getChild('xyz') c2 = r.getChild('uvw.xyz') - self.assertTrue(c1 is logging.getLogger('xyz')) - self.assertTrue(c2 is logging.getLogger('uvw.xyz')) + self.assertIs(c1, logging.getLogger('xyz')) + self.assertIs(c2, logging.getLogger('uvw.xyz')) c1 = l1.getChild('def') c2 = c1.getChild('ghi') c3 = l1.getChild('def.ghi') - self.assertTrue(c1 is logging.getLogger('abc.def')) - self.assertTrue(c2 is logging.getLogger('abc.def.ghi')) - self.assertTrue(c2 is c3) + self.assertIs(c1, logging.getLogger('abc.def')) + self.assertIs(c2, logging.getLogger('abc.def.ghi')) + self.assertIs(c2, c3) class DerivedLogRecord(logging.LogRecord): @@ -2829,7 +2978,7 @@ class LogRecordFactoryTest(BaseTest): class QueueHandlerTest(BaseTest): # Do not bother with a logger name group. - expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" + expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" def setUp(self): BaseTest.setUp(self) @@ -3123,13 +3272,13 @@ class ShutdownTest(BaseTest): self.assertEqual('0 - release', self.called[-1]) def test_with_ioerror_in_acquire(self): - self._test_with_failure_in_method('acquire', IOError) + self._test_with_failure_in_method('acquire', OSError) def test_with_ioerror_in_flush(self): - self._test_with_failure_in_method('flush', IOError) + self._test_with_failure_in_method('flush', OSError) def test_with_ioerror_in_close(self): - self._test_with_failure_in_method('close', IOError) + self._test_with_failure_in_method('close', OSError) def test_with_valueerror_in_acquire(self): self._test_with_failure_in_method('acquire', ValueError) @@ -3336,6 +3485,12 @@ class BasicConfigTest(unittest.TestCase): self.assertEqual(logging.root.level, self.original_logging_level) def test_filename(self): + + def cleanup(h1, h2, fn): + h1.close() + h2.close() + os.remove(fn) + logging.basicConfig(filename='test.log') self.assertEqual(len(logging.root.handlers), 1) @@ -3343,17 +3498,23 @@ class BasicConfigTest(unittest.TestCase): self.assertIsInstance(handler, logging.FileHandler) expected = logging.FileHandler('test.log', 'a') - self.addCleanup(expected.close) self.assertEqual(handler.stream.mode, expected.stream.mode) self.assertEqual(handler.stream.name, expected.stream.name) + self.addCleanup(cleanup, handler, expected, 'test.log') def test_filemode(self): + + def cleanup(h1, h2, fn): + h1.close() + h2.close() + os.remove(fn) + logging.basicConfig(filename='test.log', filemode='wb') handler = logging.root.handlers[0] expected = logging.FileHandler('test.log', 'wb') - self.addCleanup(expected.close) self.assertEqual(handler.stream.mode, expected.stream.mode) + self.addCleanup(cleanup, handler, expected, 'test.log') def test_stream(self): stream = io.StringIO() @@ -3809,6 +3970,63 @@ class TimedRotatingFileHandlerTest(BaseFileTest): assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, self.fn, 'W7', delay=True) + def test_compute_rollover_daily_attime(self): + currentTime = 0 + atTime = datetime.time(12, 0, 0) + rh = logging.handlers.TimedRotatingFileHandler( + self.fn, when='MIDNIGHT', interval=1, backupCount=0, utc=True, + atTime=atTime) + try: + actual = rh.computeRollover(currentTime) + self.assertEqual(actual, currentTime + 12 * 60 * 60) + + actual = rh.computeRollover(currentTime + 13 * 60 * 60) + self.assertEqual(actual, currentTime + 36 * 60 * 60) + finally: + rh.close() + + #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.') + def test_compute_rollover_weekly_attime(self): + currentTime = int(time.time()) + today = currentTime - currentTime % 86400 + + atTime = datetime.time(12, 0, 0) + + wday = time.gmtime(today).tm_wday + for day in range(7): + rh = logging.handlers.TimedRotatingFileHandler( + self.fn, when='W%d' % day, interval=1, backupCount=0, utc=True, + atTime=atTime) + try: + if wday > day: + # The rollover day has already passed this week, so we + # go over into next week + expected = (7 - wday + day) + else: + expected = (day - wday) + # At this point expected is in days from now, convert to seconds + expected *= 24 * 60 * 60 + # Add in the rollover time + expected += 12 * 60 * 60 + # Add in adjustment for today + expected += today + actual = rh.computeRollover(today) + if actual != expected: + print('failed in timezone: %d' % time.timezone) + print('local vars: %s' % locals()) + self.assertEqual(actual, expected) + if day == wday: + # goes into following week + expected += 7 * 24 * 60 * 60 + actual = rh.computeRollover(today + 13 * 60 * 60) + if actual != expected: + print('failed in timezone: %d' % time.timezone) + print('local vars: %s' % locals()) + self.assertEqual(actual, expected) + finally: + rh.close() + + def secs(**kw): return datetime.timedelta(**kw) // datetime.timedelta(seconds=1) @@ -3867,7 +4085,7 @@ class NTEventLogHandlerTest(BaseTest): h.handle(r) h.close() # Now see if the event is recorded - self.assertTrue(num_recs < win32evtlog.GetNumberOfEventLogRecords(elh)) + self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh)) flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \ win32evtlog.EVENTLOG_SEQUENTIAL_READ found = False @@ -3900,7 +4118,8 @@ def test_main(): SMTPHandlerTest, FileHandlerTest, RotatingFileHandlerTest, LastResortTest, LogRecordTest, ExceptionTest, SysLogHandlerTest, HTTPHandlerTest, NTEventLogHandlerTest, - TimedRotatingFileHandlerTest + TimedRotatingFileHandlerTest, UnixSocketHandlerTest, + UnixDatagramHandlerTest, UnixSysLogHandlerTest ) if __name__ == "__main__": |