summaryrefslogtreecommitdiff
path: root/tests/test_redis.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_redis.py')
-rw-r--r--tests/test_redis.py96
1 files changed, 96 insertions, 0 deletions
diff --git a/tests/test_redis.py b/tests/test_redis.py
new file mode 100644
index 0000000..cc319cb
--- /dev/null
+++ b/tests/test_redis.py
@@ -0,0 +1,96 @@
+import logging
+from logutils.testing import TestHandler, Matcher
+from logutils.redis import RedisQueueHandler, RedisQueueListener
+from redis import Redis
+import socket
+import subprocess
+import time
+import unittest
+
+class QueueListener(RedisQueueListener):
+ def dequeue(self, block):
+ record = RedisQueueListener.dequeue(self, block)
+ if record:
+ record = logging.makeLogRecord(record)
+ return record
+
+class RedisQueueTest(unittest.TestCase):
+ def setUp(self):
+ self.handler = h = TestHandler(Matcher())
+ self.logger = l = logging.getLogger()
+ self.server = subprocess.Popen(['redis-server'],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ self.wait_for_server()
+ self.queue = q = Redis()
+ self.qh = qh = RedisQueueHandler(redis=q)
+ self.ql = ql = QueueListener(h, redis=q)
+ ql.start()
+ l.addHandler(qh)
+
+ def tearDown(self):
+ self.logger.removeHandler(self.qh)
+ self.qh.close()
+ self.handler.close()
+ self.server.terminate()
+
+ def wait_for_server(self):
+ maxtime = time.time() + 2 # 2 seconds to wait for server
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ while time.time() < maxtime:
+ try:
+ sock.connect(('localhost', 6379))
+ break
+ except socket.error:
+ pass
+ if time.time() >= maxtime:
+ raise Exception('unable to connect to Redis server')
+ sock.close()
+
+ def test_simple(self):
+ "Simple test of queue handling and listening."
+ # Just as a demo, let's log some messages.
+ # Only one should show up in the log.
+ self.logger.debug("This won't show up.")
+ self.logger.info("Neither will this.")
+ self.logger.warning("But this will.")
+ self.ql.stop() #ensure all records have come through.
+ h = self.handler
+ #import pdb; pdb.set_trace()
+ self.assertTrue(h.matches(levelno=logging.WARNING))
+ self.assertFalse(h.matches(levelno=logging.DEBUG))
+ self.assertFalse(h.matches(levelno=logging.INFO))
+
+ def test_partial(self):
+ "Test of partial matching through queues."
+ # Just as a demo, let's log some messages.
+ # Only one should show up in the log.
+ self.logger.debug("This won't show up.")
+ self.logger.info("Neither will this.")
+ self.logger.warning("But this will.")
+ self.ql.stop() #ensure all records have come through.
+ h = self.handler
+ self.assertTrue(h.matches(msg="ut th")) # from "But this will"
+ self.assertTrue(h.matches(message="ut th")) # from "But this will"
+ self.assertFalse(h.matches(message="either"))
+ self.assertFalse(h.matches(message="won't"))
+
+ def test_multiple(self):
+ "Test of matching multiple values through queues."
+ # Just as a demo, let's log some messages.
+ # Only one should show up in the log.
+ self.logger.debug("This won't show up.")
+ self.logger.info("Neither will this.")
+ self.logger.warning("But this will.")
+ self.logger.error("And so will this.")
+ self.ql.stop() #ensure all records have come through.
+ h = self.handler
+ self.assertTrue(h.matches(levelno=logging.WARNING,
+ message='ut thi'))
+ self.assertTrue(h.matches(levelno=logging.ERROR,
+ message='nd so wi'))
+ self.assertFalse(h.matches(levelno=logging.INFO))
+
+if __name__ == '__main__':
+ unittest.main()
+