diff options
Diffstat (limited to 'tests/test_redis.py')
-rw-r--r-- | tests/test_redis.py | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/tests/test_redis.py b/tests/test_redis.py new file mode 100644 index 0000000..cc319cb --- /dev/null +++ b/tests/test_redis.py @@ -0,0 +1,96 @@ +import logging +from logutils.testing import TestHandler, Matcher +from logutils.redis import RedisQueueHandler, RedisQueueListener +from redis import Redis +import socket +import subprocess +import time +import unittest + +class QueueListener(RedisQueueListener): + def dequeue(self, block): + record = RedisQueueListener.dequeue(self, block) + if record: + record = logging.makeLogRecord(record) + return record + +class RedisQueueTest(unittest.TestCase): + def setUp(self): + self.handler = h = TestHandler(Matcher()) + self.logger = l = logging.getLogger() + self.server = subprocess.Popen(['redis-server'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.wait_for_server() + self.queue = q = Redis() + self.qh = qh = RedisQueueHandler(redis=q) + self.ql = ql = QueueListener(h, redis=q) + ql.start() + l.addHandler(qh) + + def tearDown(self): + self.logger.removeHandler(self.qh) + self.qh.close() + self.handler.close() + self.server.terminate() + + def wait_for_server(self): + maxtime = time.time() + 2 # 2 seconds to wait for server + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + while time.time() < maxtime: + try: + sock.connect(('localhost', 6379)) + break + except socket.error: + pass + if time.time() >= maxtime: + raise Exception('unable to connect to Redis server') + sock.close() + + def test_simple(self): + "Simple test of queue handling and listening." + # Just as a demo, let's log some messages. + # Only one should show up in the log. + self.logger.debug("This won't show up.") + self.logger.info("Neither will this.") + self.logger.warning("But this will.") + self.ql.stop() #ensure all records have come through. + h = self.handler + #import pdb; pdb.set_trace() + self.assertTrue(h.matches(levelno=logging.WARNING)) + self.assertFalse(h.matches(levelno=logging.DEBUG)) + self.assertFalse(h.matches(levelno=logging.INFO)) + + def test_partial(self): + "Test of partial matching through queues." + # Just as a demo, let's log some messages. + # Only one should show up in the log. + self.logger.debug("This won't show up.") + self.logger.info("Neither will this.") + self.logger.warning("But this will.") + self.ql.stop() #ensure all records have come through. + h = self.handler + self.assertTrue(h.matches(msg="ut th")) # from "But this will" + self.assertTrue(h.matches(message="ut th")) # from "But this will" + self.assertFalse(h.matches(message="either")) + self.assertFalse(h.matches(message="won't")) + + def test_multiple(self): + "Test of matching multiple values through queues." + # Just as a demo, let's log some messages. + # Only one should show up in the log. + self.logger.debug("This won't show up.") + self.logger.info("Neither will this.") + self.logger.warning("But this will.") + self.logger.error("And so will this.") + self.ql.stop() #ensure all records have come through. + h = self.handler + self.assertTrue(h.matches(levelno=logging.WARNING, + message='ut thi')) + self.assertTrue(h.matches(levelno=logging.ERROR, + message='nd so wi')) + self.assertFalse(h.matches(levelno=logging.INFO)) + +if __name__ == '__main__': + unittest.main() + |