summaryrefslogtreecommitdiff
path: root/Lib/test/test_logging.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_logging.py')
-rw-r--r--Lib/test/test_logging.py66
1 files changed, 62 insertions, 4 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index 017e9f38dc..06dd6d25cd 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -519,8 +519,17 @@ class HandlerTest(BaseTest):
os.unlink(fn)
h = logging.handlers.WatchedFileHandler(fn, delay=True)
if existing:
- self.assertNotEqual(h.dev, -1)
- self.assertNotEqual(h.ino, -1)
+ dev, ino = h.dev, h.ino
+ self.assertNotEqual(dev, -1)
+ self.assertNotEqual(ino, -1)
+ r = logging.makeLogRecord({'msg': 'Test'})
+ h.handle(r)
+ # Now remove the file.
+ os.unlink(fn)
+ self.assertFalse(os.path.exists(fn))
+ # The next call should recreate the file.
+ h.handle(r)
+ self.assertTrue(os.path.exists(fn))
else:
self.assertEqual(h.dev, -1)
self.assertEqual(h.ino, -1)
@@ -1045,8 +1054,9 @@ class SocketHandlerTest(BaseTest):
def tearDown(self):
"""Shutdown the TCP server."""
try:
- self.tcpserver.abort = True
- del self.tcpserver
+ if hasattr(self, 'tcpserver'):
+ self.tcpserver.abort = True
+ del self.tcpserver
self.root_logger.removeHandler(self.sock_hdlr)
self.sock_hdlr.close()
for thread in self.threads:
@@ -1068,6 +1078,22 @@ class SocketHandlerTest(BaseTest):
logger.debug("eggs")
self.assertEqual(self.get_output(), "spam\neggs\n")
+ def test_noserver(self):
+ # Kill the server
+ self.tcpserver.abort = True
+ del self.tcpserver
+ for thread in self.threads:
+ thread.join(2.0)
+ #The logging call should try to connect, which should fail
+ try:
+ raise RuntimeError('Deliberate mistake')
+ except RuntimeError:
+ self.root_logger.exception('Never sent')
+ self.root_logger.error('Never sent, either')
+ now = time.time()
+ self.assertTrue(self.sock_hdlr.retryTime > now)
+ time.sleep(self.sock_hdlr.retryTime - now + 0.001)
+ self.root_logger.error('Nor this')
class MemoryTest(BaseTest):
@@ -2613,6 +2639,38 @@ class LogRecordTest(BaseTest):
r.removeHandler(h)
h.close()
+ def test_multiprocessing(self):
+ r = logging.makeLogRecord({})
+ self.assertEqual(r.processName, 'MainProcess')
+ import multiprocessing as mp
+ r = logging.makeLogRecord({})
+ self.assertEqual(r.processName, mp.current_process().name)
+
+ def test_optional(self):
+ r = logging.makeLogRecord({})
+ NOT_NONE = self.assertIsNotNone
+ NOT_NONE(r.thread)
+ NOT_NONE(r.threadName)
+ NOT_NONE(r.process)
+ NOT_NONE(r.processName)
+ log_threads = logging.logThreads
+ log_processes = logging.logProcesses
+ log_multiprocessing = logging.logMultiprocessing
+ try:
+ logging.logThreads = False
+ logging.logProcesses = False
+ logging.logMultiprocessing = False
+ r = logging.makeLogRecord({})
+ NONE = self.assertIsNone
+ NONE(r.thread)
+ NONE(r.threadName)
+ NONE(r.process)
+ NONE(r.processName)
+ finally:
+ logging.logThreads = log_threads
+ logging.logProcesses = log_processes
+ logging.logMultiprocessing = log_multiprocessing
+
class BasicConfigTest(unittest.TestCase):
"""Test suite for logging.basicConfig."""