summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorpedrofrazao <603718+pedrofrazao@users.noreply.github.com>2022-07-24 14:32:11 +0100
committerGitHub <noreply@github.com>2022-07-24 16:32:11 +0300
commita8c8227b0cddab6ae77fecbd935f327b30628b32 (patch)
tree351e63e7ea41923b0a51b512b963d35ffdec439f
parentc70845d9783d66e48074fa3ebb7157bb52f11704 (diff)
downloadredis-py-a8c8227b0cddab6ae77fecbd935f327b30628b32.tar.gz
redis stream example (#2269)
* redis stream example * redis stream example on docs/examples.rst Co-authored-by: pedro.frazao <perl.pf@netcf.org>
-rw-r--r--docs/examples.rst1
-rw-r--r--docs/examples/redis-stream-example.ipynb754
2 files changed, 755 insertions, 0 deletions
diff --git a/docs/examples.rst b/docs/examples.rst
index 722fae2..08526ff 100644
--- a/docs/examples.rst
+++ b/docs/examples.rst
@@ -12,3 +12,4 @@ Examples
examples/set_and_get_examples
examples/search_vector_similarity_examples
examples/pipeline_examples
+ examples/redis-stream-example.ipynb
diff --git a/docs/examples/redis-stream-example.ipynb b/docs/examples/redis-stream-example.ipynb
new file mode 100644
index 0000000..9303b52
--- /dev/null
+++ b/docs/examples/redis-stream-example.ipynb
@@ -0,0 +1,754 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Redis Stream Examples"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## basic config"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "redis_host = \"redis\"\n",
+ "stream_key = \"skey\"\n",
+ "stream2_key = \"s2key\"\n",
+ "group1 = \"grp1\"\n",
+ "group2 = \"grp2\""
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## connection"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "True"
+ ]
+ },
+ "execution_count": 2,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "import redis\n",
+ "from time import time\n",
+ "from redis.exceptions import ConnectionError, DataError, NoScriptError, RedisError, ResponseError\n",
+ "\n",
+ "r = redis.Redis( redis_host )\n",
+ "r.ping()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## xadd and xread"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### add some data to the stream"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "stream length: 10\n"
+ ]
+ }
+ ],
+ "source": [
+ "for i in range(0,10):\n",
+ " r.xadd( stream_key, { 'ts': time(), 'v': i } )\n",
+ "print( f\"stream length: {r.xlen( stream_key )}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### read some data from the stream"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "[[b'skey', [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'}), (b'1657571033117-0', {b'ts': b'1657571033.1176307', b'v': b'1'})]]]\n"
+ ]
+ }
+ ],
+ "source": [
+ "## read 2 entries from stream_key\n",
+ "l = r.xread( count=2, streams={stream_key:0} )\n",
+ "print(l)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### extract data from the returned structure"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "got data from stream: b'skey'\n",
+ "id: b'1657571033115-0' value: b'0'\n",
+ "id: b'1657571033117-0' value: b'1'\n"
+ ]
+ }
+ ],
+ "source": [
+ "first_stream = l[0]\n",
+ "print( f\"got data from stream: {first_stream[0]}\")\n",
+ "fs_data = first_stream[1]\n",
+ "for id, value in fs_data:\n",
+ " print( f\"id: {id} value: {value[b'v']}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### read more data from the stream\n",
+ "if we call the `xread` with the same arguments we will get the same data"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "id: b'1657571033115-0' value: b'0'\n",
+ "id: b'1657571033117-0' value: b'1'\n"
+ ]
+ }
+ ],
+ "source": [
+ "l = r.xread( count=2, streams={stream_key:0} )\n",
+ "for id, value in l[0][1]:\n",
+ " print( f\"id: {id} value: {value[b'v']}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "to get new data we need to change the key passed to the call"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "id: b'1657571033118-0' value: b'2'\n",
+ "id: b'1657571033119-0' value: b'3'\n"
+ ]
+ }
+ ],
+ "source": [
+ "last_id_returned = l[0][1][-1][0]\n",
+ "l = r.xread( count=2, streams={stream_key: last_id_returned} )\n",
+ "for id, value in l[0][1]:\n",
+ " print( f\"id: {id} value: {value[b'v']}\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "id: b'1657571033119-1' value: b'4'\n",
+ "id: b'1657571033121-0' value: b'5'\n"
+ ]
+ }
+ ],
+ "source": [
+ "last_id_returned = l[0][1][-1][0]\n",
+ "l = r.xread( count=2, streams={stream_key: last_id_returned} )\n",
+ "for id, value in l[0][1]:\n",
+ " print( f\"id: {id} value: {value[b'v']}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "to get only newer entries"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "stream length: 10\n",
+ "after 5s block, got an empty list [], no *new* messages on the stream\n",
+ "stream length: 10\n"
+ ]
+ }
+ ],
+ "source": [
+ "print( f\"stream length: {r.xlen( stream_key )}\")\n",
+ "# wait for 5s for new messages\n",
+ "l = r.xread( count=1, block=5000, streams={stream_key: '$'} )\n",
+ "print( f\"after 5s block, got an empty list {l}, no *new* messages on the stream\")\n",
+ "print( f\"stream length: {r.xlen( stream_key )}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### 2nd stream\n",
+ "Add some messages to a 2nd stream"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "stream length: 10\n"
+ ]
+ }
+ ],
+ "source": [
+ "for i in range(1000,1010):\n",
+ " r.xadd( stream2_key, { 'v': i } )\n",
+ "print( f\"stream length: {r.xlen( stream2_key )}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "get messages from the 2 streams"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "got from b'skey' the entry [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'})]\n",
+ "got from b's2key' the entry [(b'1657571042111-0', {b'v': b'1000'})]\n"
+ ]
+ }
+ ],
+ "source": [
+ "l = r.xread( count=1, streams={stream_key:0,stream2_key:0} )\n",
+ "for k,d in l:\n",
+ " print(f\"got from {k} the entry {d}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# stream groups\n",
+ "With the groups is possible track, for many consumers, and at the Redis side, which message have been already consumed.\n",
+ "## add some data to streams\n",
+ "Creating 2 streams with 10 messages each."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "stream 'skey' length: 20\n",
+ "stream 's2key' length: 20\n"
+ ]
+ }
+ ],
+ "source": [
+ "def add_some_data_to_stream( sname, key_range ):\n",
+ " for i in key_range:\n",
+ " r.xadd( sname, { 'ts': time(), 'v': i } )\n",
+ " print( f\"stream '{sname}' length: {r.xlen( stream_key )}\")\n",
+ "\n",
+ "add_some_data_to_stream( stream_key, range(0,10) )\n",
+ "add_some_data_to_stream( stream2_key, range(1000,1010) )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## use a group to read from the stream\n",
+ "* create a group `grp1` with the stream `skey`, and\n",
+ "* create a group `grp2` with the streams `skey` and `s2key`\n",
+ "\n",
+ "Use the `xinfo_group` to verify the result of the group creation."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 13,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "skey -> group name: b'grp1' with 0 consumers and b'0-0' as last read id\n",
+ "skey -> group name: b'grp2' with 0 consumers and b'0-0' as last read id\n",
+ "s2key -> group name: b'grp2' with 0 consumers and b'0-0' as last read id\n"
+ ]
+ }
+ ],
+ "source": [
+ "## create the group\n",
+ "def create_group( skey, gname ):\n",
+ " try:\n",
+ " r.xgroup_create( name=skey, groupname=gname, id=0 )\n",
+ " except ResponseError as e:\n",
+ " print(f\"raised: {e}\")\n",
+ "\n",
+ "# group1 read the stream 'skey'\n",
+ "create_group( stream_key, group1 )\n",
+ "# group2 read the streams 'skey' and 's2key'\n",
+ "create_group( stream_key, group2 )\n",
+ "create_group( stream2_key, group2 )\n",
+ "\n",
+ "def group_info( skey ):\n",
+ " res = r.xinfo_groups( name=skey )\n",
+ " for i in res:\n",
+ " print( f\"{skey} -> group name: {i['name']} with {i['consumers']} consumers and {i['last-delivered-id']}\"\n",
+ " + f\" as last read id\")\n",
+ " \n",
+ "group_info( stream_key )\n",
+ "group_info( stream2_key )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## group read\n",
+ "The `xreadgroup` method permit to read from a stream group."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def print_xreadgroup_reply( reply, group = None, run = None):\n",
+ " for d_stream in reply:\n",
+ " for element in d_stream[1]:\n",
+ " print( f\"got element {element[0]}\"\n",
+ " + f\"from stream {d_stream[0]}\" )\n",
+ " if run is not None:\n",
+ " run( d_stream[0], group, element[0] )"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 15,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "got element b'1657571033115-0'from stream b'skey'\n",
+ "got element b'1657571033117-0'from stream b'skey'\n"
+ ]
+ }
+ ],
+ "source": [
+ "# read some messages on group1 with consumer 'c' \n",
+ "d = r.xreadgroup( groupname=group1, consumername='c', block=10,\n",
+ " count=2, streams={stream_key:'>'})\n",
+ "print_xreadgroup_reply( d )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "A **2nd consumer** for the same stream group will get not delivered messages."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 16,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "got element b'1657571033118-0'from stream b'skey'\n",
+ "got element b'1657571033119-0'from stream b'skey'\n"
+ ]
+ }
+ ],
+ "source": [
+ "# read some messages on group1 with consumer 'c' \n",
+ "d = r.xreadgroup( groupname=group1, consumername='c2', block=10,\n",
+ " count=2, streams={stream_key:'>'})\n",
+ "print_xreadgroup_reply( d )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "But a **2nd stream group** can read the already delivered messages again.\n",
+ "\n",
+ "Note that the 2nd stream group include also the 2nd stream.\n",
+ "That can be identified in the reply (1st element of the reply list)."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 18,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "got element b'1657571033115-0'from stream b'skey'\n",
+ "got element b'1657571033117-0'from stream b'skey'\n",
+ "got element b'1657571042111-0'from stream b's2key'\n",
+ "got element b'1657571042113-0'from stream b's2key'\n"
+ ]
+ }
+ ],
+ "source": [
+ "d2 = r.xreadgroup( groupname=group2, consumername='c', block=10,\n",
+ " count=2, streams={stream_key:'>',stream2_key:'>'})\n",
+ "print_xreadgroup_reply( d2 )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "To check for pending messages (delivered messages without acknowledgment) we can use the `xpending`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 19,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "4 pending messages on 'skey' for group 'grp1'\n",
+ "2 pending messages on 'skey' for group 'grp2'\n",
+ "2 pending messages on 's2key' for group 'grp2'\n"
+ ]
+ }
+ ],
+ "source": [
+ "# check pending status (read messages without a ack)\n",
+ "def print_pending_info( key_group ):\n",
+ " for s,k in key_group:\n",
+ " pr = r.xpending( name=s, groupname=k )\n",
+ " print( f\"{pr.get('pending')} pending messages on '{s}' for group '{k}'\" )\n",
+ " \n",
+ "print_pending_info( ((stream_key,group1),(stream_key,group2),(stream2_key,group2)) )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## ack\n",
+ "Acknowledge some messages with `xack`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 20,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "got element b'1657571033118-0'from stream b'skey'\n",
+ "got element b'1657571033119-0'from stream b'skey'\n"
+ ]
+ }
+ ],
+ "source": [
+ "# do acknowledges for group1\n",
+ "toack = lambda k,g,e: r.xack( k,g, e )\n",
+ "print_xreadgroup_reply( d, group=group1, run=toack )"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 21,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "2 pending messages on 'skey' for group 'grp1'\n",
+ "2 pending messages on 'skey' for group 'grp2'\n",
+ "2 pending messages on 's2key' for group 'grp2'\n"
+ ]
+ }
+ ],
+ "source": [
+ "# check pending again\n",
+ "print_pending_info( ((stream_key,group1),(stream_key,group2),(stream2_key,group2)) )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "ack all messages on the `group1`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 22,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "got element b'1657571033119-1'from stream b'skey'\n",
+ "got element b'1657571033121-0'from stream b'skey'\n",
+ "got element b'1657571033121-1'from stream b'skey'\n",
+ "got element b'1657571033121-2'from stream b'skey'\n",
+ "got element b'1657571033122-0'from stream b'skey'\n",
+ "got element b'1657571033122-1'from stream b'skey'\n",
+ "got element b'1657571049557-0'from stream b'skey'\n",
+ "got element b'1657571049557-1'from stream b'skey'\n",
+ "got element b'1657571049558-0'from stream b'skey'\n",
+ "got element b'1657571049559-0'from stream b'skey'\n",
+ "got element b'1657571049559-1'from stream b'skey'\n",
+ "got element b'1657571049559-2'from stream b'skey'\n",
+ "got element b'1657571049560-0'from stream b'skey'\n",
+ "got element b'1657571049562-0'from stream b'skey'\n",
+ "got element b'1657571049563-0'from stream b'skey'\n",
+ "got element b'1657571049563-1'from stream b'skey'\n",
+ "2 pending messages on 'skey' for group 'grp1'\n"
+ ]
+ }
+ ],
+ "source": [
+ "d = r.xreadgroup( groupname=group1, consumername='c', block=10,\n",
+ " count=100, streams={stream_key:'>'})\n",
+ "print_xreadgroup_reply( d, group=group1, run=toack)\n",
+ "print_pending_info( ((stream_key,group1),) )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "But stream length will be the same after the `xack` of all messages on the `group1`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 24,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "20"
+ ]
+ },
+ "execution_count": 24,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "r.xlen(stream_key)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## delete all\n",
+ "To remove the messages with need to remote them explicitly with `xdel`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 25,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "s1 = r.xread( streams={stream_key:0} )\n",
+ "for streams in s1:\n",
+ " stream_name, messages = streams\n",
+ " # del all ids from the message list\n",
+ " [ r.xdel( stream_name, i[0] ) for i in messages ]"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "stream length"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 26,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "0"
+ ]
+ },
+ "execution_count": 26,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "r.xlen(stream_key)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "But with the `xdel` the 2nd group can read any not processed message from the `skey`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 27,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "got element b'1657571042113-1'from stream b's2key'\n",
+ "got element b'1657571042114-0'from stream b's2key'\n"
+ ]
+ }
+ ],
+ "source": [
+ "d2 = r.xreadgroup( groupname=group2, consumername='c', block=10,\n",
+ " count=2, streams={stream_key:'>',stream2_key:'>'})\n",
+ "print_xreadgroup_reply( d2 )"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": []
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.8.10"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 4
+}