diff options
author | pedrofrazao <603718+pedrofrazao@users.noreply.github.com> | 2022-07-24 14:32:11 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-24 16:32:11 +0300 |
commit | a8c8227b0cddab6ae77fecbd935f327b30628b32 (patch) | |
tree | 351e63e7ea41923b0a51b512b963d35ffdec439f | |
parent | c70845d9783d66e48074fa3ebb7157bb52f11704 (diff) | |
download | redis-py-a8c8227b0cddab6ae77fecbd935f327b30628b32.tar.gz |
redis stream example (#2269)
* redis stream example
* redis stream example on docs/examples.rst
Co-authored-by: pedro.frazao <perl.pf@netcf.org>
-rw-r--r-- | docs/examples.rst | 1 | ||||
-rw-r--r-- | docs/examples/redis-stream-example.ipynb | 754 |
2 files changed, 755 insertions, 0 deletions
diff --git a/docs/examples.rst b/docs/examples.rst index 722fae2..08526ff 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -12,3 +12,4 @@ Examples examples/set_and_get_examples examples/search_vector_similarity_examples examples/pipeline_examples + examples/redis-stream-example.ipynb diff --git a/docs/examples/redis-stream-example.ipynb b/docs/examples/redis-stream-example.ipynb new file mode 100644 index 0000000..9303b52 --- /dev/null +++ b/docs/examples/redis-stream-example.ipynb @@ -0,0 +1,754 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Redis Stream Examples" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## basic config" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "redis_host = \"redis\"\n", + "stream_key = \"skey\"\n", + "stream2_key = \"s2key\"\n", + "group1 = \"grp1\"\n", + "group2 = \"grp2\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## connection" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import redis\n", + "from time import time\n", + "from redis.exceptions import ConnectionError, DataError, NoScriptError, RedisError, ResponseError\n", + "\n", + "r = redis.Redis( redis_host )\n", + "r.ping()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## xadd and xread" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### add some data to the stream" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "stream length: 10\n" + ] + } + ], + "source": [ + "for i in range(0,10):\n", + " r.xadd( stream_key, { 'ts': time(), 'v': i } )\n", + "print( f\"stream length: {r.xlen( stream_key )}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### read some data from the stream" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[[b'skey', [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'}), (b'1657571033117-0', {b'ts': b'1657571033.1176307', b'v': b'1'})]]]\n" + ] + } + ], + "source": [ + "## read 2 entries from stream_key\n", + "l = r.xread( count=2, streams={stream_key:0} )\n", + "print(l)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### extract data from the returned structure" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "got data from stream: b'skey'\n", + "id: b'1657571033115-0' value: b'0'\n", + "id: b'1657571033117-0' value: b'1'\n" + ] + } + ], + "source": [ + "first_stream = l[0]\n", + "print( f\"got data from stream: {first_stream[0]}\")\n", + "fs_data = first_stream[1]\n", + "for id, value in fs_data:\n", + " print( f\"id: {id} value: {value[b'v']}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### read more data from the stream\n", + "if we call the `xread` with the same arguments we will get the same data" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "id: b'1657571033115-0' value: b'0'\n", + "id: b'1657571033117-0' value: b'1'\n" + ] + } + ], + "source": [ + "l = r.xread( count=2, streams={stream_key:0} )\n", + "for id, value in l[0][1]:\n", + " print( f\"id: {id} value: {value[b'v']}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "to get new data we need to change the key passed to the call" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "id: b'1657571033118-0' value: b'2'\n", + "id: b'1657571033119-0' value: b'3'\n" + ] + } + ], + "source": [ + "last_id_returned = l[0][1][-1][0]\n", + "l = r.xread( count=2, streams={stream_key: last_id_returned} )\n", + "for id, value in l[0][1]:\n", + " print( f\"id: {id} value: {value[b'v']}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "id: b'1657571033119-1' value: b'4'\n", + "id: b'1657571033121-0' value: b'5'\n" + ] + } + ], + "source": [ + "last_id_returned = l[0][1][-1][0]\n", + "l = r.xread( count=2, streams={stream_key: last_id_returned} )\n", + "for id, value in l[0][1]:\n", + " print( f\"id: {id} value: {value[b'v']}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "to get only newer entries" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "stream length: 10\n", + "after 5s block, got an empty list [], no *new* messages on the stream\n", + "stream length: 10\n" + ] + } + ], + "source": [ + "print( f\"stream length: {r.xlen( stream_key )}\")\n", + "# wait for 5s for new messages\n", + "l = r.xread( count=1, block=5000, streams={stream_key: '$'} )\n", + "print( f\"after 5s block, got an empty list {l}, no *new* messages on the stream\")\n", + "print( f\"stream length: {r.xlen( stream_key )}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 2nd stream\n", + "Add some messages to a 2nd stream" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "stream length: 10\n" + ] + } + ], + "source": [ + "for i in range(1000,1010):\n", + " r.xadd( stream2_key, { 'v': i } )\n", + "print( f\"stream length: {r.xlen( stream2_key )}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "get messages from the 2 streams" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "got from b'skey' the entry [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'})]\n", + "got from b's2key' the entry [(b'1657571042111-0', {b'v': b'1000'})]\n" + ] + } + ], + "source": [ + "l = r.xread( count=1, streams={stream_key:0,stream2_key:0} )\n", + "for k,d in l:\n", + " print(f\"got from {k} the entry {d}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# stream groups\n", + "With the groups is possible track, for many consumers, and at the Redis side, which message have been already consumed.\n", + "## add some data to streams\n", + "Creating 2 streams with 10 messages each." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "stream 'skey' length: 20\n", + "stream 's2key' length: 20\n" + ] + } + ], + "source": [ + "def add_some_data_to_stream( sname, key_range ):\n", + " for i in key_range:\n", + " r.xadd( sname, { 'ts': time(), 'v': i } )\n", + " print( f\"stream '{sname}' length: {r.xlen( stream_key )}\")\n", + "\n", + "add_some_data_to_stream( stream_key, range(0,10) )\n", + "add_some_data_to_stream( stream2_key, range(1000,1010) )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## use a group to read from the stream\n", + "* create a group `grp1` with the stream `skey`, and\n", + "* create a group `grp2` with the streams `skey` and `s2key`\n", + "\n", + "Use the `xinfo_group` to verify the result of the group creation." + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "skey -> group name: b'grp1' with 0 consumers and b'0-0' as last read id\n", + "skey -> group name: b'grp2' with 0 consumers and b'0-0' as last read id\n", + "s2key -> group name: b'grp2' with 0 consumers and b'0-0' as last read id\n" + ] + } + ], + "source": [ + "## create the group\n", + "def create_group( skey, gname ):\n", + " try:\n", + " r.xgroup_create( name=skey, groupname=gname, id=0 )\n", + " except ResponseError as e:\n", + " print(f\"raised: {e}\")\n", + "\n", + "# group1 read the stream 'skey'\n", + "create_group( stream_key, group1 )\n", + "# group2 read the streams 'skey' and 's2key'\n", + "create_group( stream_key, group2 )\n", + "create_group( stream2_key, group2 )\n", + "\n", + "def group_info( skey ):\n", + " res = r.xinfo_groups( name=skey )\n", + " for i in res:\n", + " print( f\"{skey} -> group name: {i['name']} with {i['consumers']} consumers and {i['last-delivered-id']}\"\n", + " + f\" as last read id\")\n", + " \n", + "group_info( stream_key )\n", + "group_info( stream2_key )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## group read\n", + "The `xreadgroup` method permit to read from a stream group." + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [], + "source": [ + "def print_xreadgroup_reply( reply, group = None, run = None):\n", + " for d_stream in reply:\n", + " for element in d_stream[1]:\n", + " print( f\"got element {element[0]}\"\n", + " + f\"from stream {d_stream[0]}\" )\n", + " if run is not None:\n", + " run( d_stream[0], group, element[0] )" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "got element b'1657571033115-0'from stream b'skey'\n", + "got element b'1657571033117-0'from stream b'skey'\n" + ] + } + ], + "source": [ + "# read some messages on group1 with consumer 'c' \n", + "d = r.xreadgroup( groupname=group1, consumername='c', block=10,\n", + " count=2, streams={stream_key:'>'})\n", + "print_xreadgroup_reply( d )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "A **2nd consumer** for the same stream group will get not delivered messages." + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "got element b'1657571033118-0'from stream b'skey'\n", + "got element b'1657571033119-0'from stream b'skey'\n" + ] + } + ], + "source": [ + "# read some messages on group1 with consumer 'c' \n", + "d = r.xreadgroup( groupname=group1, consumername='c2', block=10,\n", + " count=2, streams={stream_key:'>'})\n", + "print_xreadgroup_reply( d )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "But a **2nd stream group** can read the already delivered messages again.\n", + "\n", + "Note that the 2nd stream group include also the 2nd stream.\n", + "That can be identified in the reply (1st element of the reply list)." + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "got element b'1657571033115-0'from stream b'skey'\n", + "got element b'1657571033117-0'from stream b'skey'\n", + "got element b'1657571042111-0'from stream b's2key'\n", + "got element b'1657571042113-0'from stream b's2key'\n" + ] + } + ], + "source": [ + "d2 = r.xreadgroup( groupname=group2, consumername='c', block=10,\n", + " count=2, streams={stream_key:'>',stream2_key:'>'})\n", + "print_xreadgroup_reply( d2 )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To check for pending messages (delivered messages without acknowledgment) we can use the `xpending`." + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "4 pending messages on 'skey' for group 'grp1'\n", + "2 pending messages on 'skey' for group 'grp2'\n", + "2 pending messages on 's2key' for group 'grp2'\n" + ] + } + ], + "source": [ + "# check pending status (read messages without a ack)\n", + "def print_pending_info( key_group ):\n", + " for s,k in key_group:\n", + " pr = r.xpending( name=s, groupname=k )\n", + " print( f\"{pr.get('pending')} pending messages on '{s}' for group '{k}'\" )\n", + " \n", + "print_pending_info( ((stream_key,group1),(stream_key,group2),(stream2_key,group2)) )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ack\n", + "Acknowledge some messages with `xack`." + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "got element b'1657571033118-0'from stream b'skey'\n", + "got element b'1657571033119-0'from stream b'skey'\n" + ] + } + ], + "source": [ + "# do acknowledges for group1\n", + "toack = lambda k,g,e: r.xack( k,g, e )\n", + "print_xreadgroup_reply( d, group=group1, run=toack )" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2 pending messages on 'skey' for group 'grp1'\n", + "2 pending messages on 'skey' for group 'grp2'\n", + "2 pending messages on 's2key' for group 'grp2'\n" + ] + } + ], + "source": [ + "# check pending again\n", + "print_pending_info( ((stream_key,group1),(stream_key,group2),(stream2_key,group2)) )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "ack all messages on the `group1`." + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "got element b'1657571033119-1'from stream b'skey'\n", + "got element b'1657571033121-0'from stream b'skey'\n", + "got element b'1657571033121-1'from stream b'skey'\n", + "got element b'1657571033121-2'from stream b'skey'\n", + "got element b'1657571033122-0'from stream b'skey'\n", + "got element b'1657571033122-1'from stream b'skey'\n", + "got element b'1657571049557-0'from stream b'skey'\n", + "got element b'1657571049557-1'from stream b'skey'\n", + "got element b'1657571049558-0'from stream b'skey'\n", + "got element b'1657571049559-0'from stream b'skey'\n", + "got element b'1657571049559-1'from stream b'skey'\n", + "got element b'1657571049559-2'from stream b'skey'\n", + "got element b'1657571049560-0'from stream b'skey'\n", + "got element b'1657571049562-0'from stream b'skey'\n", + "got element b'1657571049563-0'from stream b'skey'\n", + "got element b'1657571049563-1'from stream b'skey'\n", + "2 pending messages on 'skey' for group 'grp1'\n" + ] + } + ], + "source": [ + "d = r.xreadgroup( groupname=group1, consumername='c', block=10,\n", + " count=100, streams={stream_key:'>'})\n", + "print_xreadgroup_reply( d, group=group1, run=toack)\n", + "print_pending_info( ((stream_key,group1),) )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "But stream length will be the same after the `xack` of all messages on the `group1`." + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "20" + ] + }, + "execution_count": 24, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "r.xlen(stream_key)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## delete all\n", + "To remove the messages with need to remote them explicitly with `xdel`." + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": {}, + "outputs": [], + "source": [ + "s1 = r.xread( streams={stream_key:0} )\n", + "for streams in s1:\n", + " stream_name, messages = streams\n", + " # del all ids from the message list\n", + " [ r.xdel( stream_name, i[0] ) for i in messages ]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "stream length" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "0" + ] + }, + "execution_count": 26, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "r.xlen(stream_key)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "But with the `xdel` the 2nd group can read any not processed message from the `skey`." + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "got element b'1657571042113-1'from stream b's2key'\n", + "got element b'1657571042114-0'from stream b's2key'\n" + ] + } + ], + "source": [ + "d2 = r.xreadgroup( groupname=group2, consumername='c', block=10,\n", + " count=2, streams={stream_key:'>',stream2_key:'>'})\n", + "print_xreadgroup_reply( d2 )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} |