diff options
author | Guy Benoish <guy.benoish@redislabs.com> | 2019-11-06 15:48:46 +0530 |
---|---|---|
committer | Oran Agra <oran@redislabs.com> | 2020-10-27 08:49:22 +0200 |
commit | da2906e507d63bdb9511e3fbce83c0165500b779 (patch) | |
tree | be06fce1b80ee1a018df4d5dd115aecec0853c8b | |
parent | 18264d641b1c1cb2d4b4053fe9a6934190ef84b7 (diff) | |
download | redis-da2906e507d63bdb9511e3fbce83c0165500b779.tar.gz |
Support streams in general module API functions
Fixes GitHub issue #6492
Added stream support in RM_KeyType and RM_ValueLength.
Also moduleDelKeyIfEmpty was updated, even though it has
no effect now (It will be relevant when stream type direct
API will be coded - i.e. RM_StreamAdd)
cherry picked from commit 1833d008b3af8628835b5f082c5b4b1359557893
* modified to avoid adding new API to 5.0 (reverting the change to
RM_KeyType)
-rw-r--r-- | src/module.c | 5 | ||||
-rw-r--r-- | src/stream.h | 1 | ||||
-rw-r--r-- | src/t_stream.c | 6 |
3 files changed, 11 insertions, 1 deletions
diff --git a/src/module.c b/src/module.c index f9e497b0c..7908d0d4b 100644 --- a/src/module.c +++ b/src/module.c @@ -471,7 +471,8 @@ int moduleDelKeyIfEmpty(RedisModuleKey *key) { case OBJ_LIST: isempty = listTypeLength(o) == 0; break; case OBJ_SET: isempty = setTypeSize(o) == 0; break; case OBJ_ZSET: isempty = zsetLength(o) == 0; break; - case OBJ_HASH : isempty = hashTypeLength(o) == 0; break; + case OBJ_HASH: isempty = hashTypeLength(o) == 0; break; + case OBJ_STREAM: isempty = streamLength(o) == 0; break; default: isempty = 0; } @@ -1684,6 +1685,7 @@ int RM_KeyType(RedisModuleKey *key) { case OBJ_ZSET: return REDISMODULE_KEYTYPE_ZSET; case OBJ_HASH: return REDISMODULE_KEYTYPE_HASH; case OBJ_MODULE: return REDISMODULE_KEYTYPE_MODULE; + /* case OBJ_STREAM: return REDISMODULE_KEYTYPE_STREAM; - don't wanna add new API to 5.0 */ default: return 0; } } @@ -1701,6 +1703,7 @@ size_t RM_ValueLength(RedisModuleKey *key) { case OBJ_SET: return setTypeSize(key->value); case OBJ_ZSET: return zsetLength(key->value); case OBJ_HASH: return hashTypeLength(key->value); + case OBJ_STREAM: return streamLength(key->value); default: return 0; } } diff --git a/src/stream.h b/src/stream.h index 4223b4177..ea135a668 100644 --- a/src/stream.h +++ b/src/stream.h @@ -103,6 +103,7 @@ struct client; stream *streamNew(void); void freeStream(stream *s); +unsigned long streamLength(const robj *subject); size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi); void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); diff --git a/src/t_stream.c b/src/t_stream.c index f080f9192..d7754985d 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -82,6 +82,12 @@ void streamIncrID(streamID *id) { } } +/* Return the length of a stream. */ +unsigned long streamLength(const robj *subject) { + stream *s = subject->ptr; + return s->length; +} + /* Generate the next stream item ID given the previous one. If the current * milliseconds Unix time is greater than the previous one, just use this * as time part and start with sequence part of zero. Otherwise we use the |