summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-03-29 15:13:31 +0200
committerantirez <antirez@gmail.com>2018-03-29 15:13:31 +0200
commit0701cad3de615ef2fbdda27514dfaa6e52734e73 (patch)
tree42be2386a499d64ddc5e3f940a7c320f8d30af32 /src
parent8ac7af1c5d4d06d6c165e35d67a3a6a70e5d98c3 (diff)
downloadredis-0701cad3de615ef2fbdda27514dfaa6e52734e73.tar.gz
Modules Cluster API: message bus implementation.
Diffstat (limited to 'src')
-rw-r--r--src/cluster.c72
-rw-r--r--src/cluster.h20
-rw-r--r--src/module.c23
-rw-r--r--src/server.h1
4 files changed, 111 insertions, 5 deletions
diff --git a/src/cluster.c b/src/cluster.c
index d0f19bff4..0faa987e5 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -75,6 +75,7 @@ void clusterDelNode(clusterNode *delnode);
sds representClusterNodeFlags(sds ci, uint16_t flags);
uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
+void moduleCallClusterReceivers(char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
/* -----------------------------------------------------------------------------
* Initialization
@@ -1682,6 +1683,12 @@ int clusterProcessPacket(clusterLink *link) {
explen += sizeof(clusterMsgDataUpdate);
if (totlen != explen) return 1;
+ } else if (type == CLUSTERMSG_TYPE_MODULE) {
+ uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
+
+ explen += sizeof(clusterMsgDataPublish) -
+ 3 + ntohl(hdr->data.module.msg.len);
+ if (totlen != explen) return 1;
}
/* Check if the sender is a known node. */
@@ -2076,6 +2083,15 @@ int clusterProcessPacket(clusterLink *link) {
* config accordingly. */
clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
hdr->data.update.nodecfg.slots);
+ } else if (type == CLUSTERMSG_TYPE_MODULE) {
+ if (!sender) return 1; /* Protect the module from unknown nodes. */
+ /* We need to route this message back to the right module subscribed
+ * for the right message type. */
+ uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
+ uint32_t len = ntohl(hdr->data.module.msg.len);
+ uint8_t type = hdr->data.module.msg.type;
+ unsigned char *payload = hdr->data.module.msg.bulk_data;
+ moduleCallClusterReceivers(sender->name,module_id,type,payload,len);
} else {
serverLog(LL_WARNING,"Received unknown packet type: %d", type);
}
@@ -2563,6 +2579,61 @@ void clusterSendUpdate(clusterLink *link, clusterNode *node) {
clusterSendMessage(link,buf,ntohl(hdr->totlen));
}
+/* Send a MODULE message.
+ *
+ * If link is NULL, then the message is broadcasted to the whole cluster. */
+void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
+ unsigned char *payload, uint32_t len) {
+ unsigned char buf[sizeof(clusterMsg)], *heapbuf;
+ clusterMsg *hdr = (clusterMsg*) buf;
+ uint32_t totlen;
+
+ clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE);
+ totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
+ totlen += sizeof(clusterMsgModule) - 3 + len;
+
+ hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
+ hdr->data.module.msg.type = type;
+ hdr->data.module.msg.len = htonl(len);
+ hdr->totlen = htonl(totlen);
+
+ /* Try to use the local buffer if possible */
+ if (totlen < sizeof(buf)) {
+ heapbuf = buf;
+ } else {
+ heapbuf = zmalloc(totlen);
+ memcpy(heapbuf,hdr,sizeof(*hdr));
+ hdr = (clusterMsg*) heapbuf;
+ }
+ memcpy(hdr->data.module.msg.bulk_data,payload,len);
+
+ if (link)
+ clusterSendMessage(link,heapbuf,totlen);
+ else
+ clusterBroadcastMessage(heapbuf,totlen);
+
+ if (heapbuf != buf) zfree(heapbuf);
+}
+
+/* This function gets a cluster node ID string as target, the same way the nodes
+ * addresses are represented in the modules side, resolves the node, and sends
+ * the message. If the target is NULL the message is broadcasted.
+ *
+ * The function returns C_OK if the target is valid, otherwise C_ERR is
+ * returned. */
+int clusterSendModuleMessageToTarget(char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len) {
+ clusterNode *node = NULL;
+
+ if (target != NULL) {
+ node = clusterLookupNode(target);
+ if (node == NULL || node->link == NULL) return C_ERR;
+ }
+
+ clusterSendModule(target ? node->link : NULL,
+ module_id, type, payload, len);
+ return C_OK;
+}
+
/* -----------------------------------------------------------------------------
* CLUSTER Pub/Sub support
*
@@ -4008,6 +4079,7 @@ const char *clusterGetMessageTypeString(int type) {
case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
case CLUSTERMSG_TYPE_UPDATE: return "update";
case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
+ case CLUSTERMSG_TYPE_MODULE: return "module";
}
return "unknown";
}
diff --git a/src/cluster.h b/src/cluster.h
index f2b9a4ecf..4d4a4d60e 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -97,7 +97,8 @@ typedef struct clusterLink {
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 /* Yes, you have my vote */
#define CLUSTERMSG_TYPE_UPDATE 7 /* Another node slots configuration */
#define CLUSTERMSG_TYPE_MFSTART 8 /* Pause clients for manual failover */
-#define CLUSTERMSG_TYPE_COUNT 9 /* Total number of message types. */
+#define CLUSTERMSG_TYPE_MODULE 9 /* Module cluster API message. */
+#define CLUSTERMSG_TYPE_COUNT 10 /* Total number of message types. */
/* This structure represent elements of node->fail_reports. */
typedef struct clusterNodeFailReport {
@@ -195,10 +196,7 @@ typedef struct {
typedef struct {
uint32_t channel_len;
uint32_t message_len;
- /* We can't reclare bulk_data as bulk_data[] since this structure is
- * nested. The 8 bytes are removed from the count during the message
- * length computation. */
- unsigned char bulk_data[8];
+ unsigned char bulk_data[8]; /* 8 bytes just as placeholder. */
} clusterMsgDataPublish;
typedef struct {
@@ -207,6 +205,13 @@ typedef struct {
unsigned char slots[CLUSTER_SLOTS/8]; /* Slots bitmap. */
} clusterMsgDataUpdate;
+typedef struct {
+ uint64_t module_id; /* ID of the sender module. */
+ uint32_t len; /* ID of the sender module. */
+ uint8_t type; /* Type from 0 to 255. */
+ unsigned char bulk_data[3]; /* 3 bytes just as placeholder. */
+} clusterMsgModule;
+
union clusterMsgData {
/* PING, MEET and PONG */
struct {
@@ -228,6 +233,11 @@ union clusterMsgData {
struct {
clusterMsgDataUpdate nodecfg;
} update;
+
+ /* MODULE */
+ struct {
+ clusterMsgModule msg;
+ } module;
};
#define CLUSTER_PROTO_VER 1 /* Cluster bus protocol version. */
diff --git a/src/module.c b/src/module.c
index e8af8e3ff..fc00e9f9e 100644
--- a/src/module.c
+++ b/src/module.c
@@ -3809,6 +3809,29 @@ void moduleUnsubscribeNotifications(RedisModule *module) {
}
/* --------------------------------------------------------------------------
+ * Modules Cluster API
+ * -------------------------------------------------------------------------- */
+
+typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
+
+/* This structure identifies a registered caller: it must match a given module
+ * ID, for a given message type. The callback function is just the function
+ * that was registered as receiver. */
+struct moduleClusterReceiver {
+ uint64_t module_id;
+ uint8_t msg_type;
+ RedisModuleClusterMessageReceiver callback;
+ struct moduleClusterReceiver *next;
+};
+
+/* We have an array of message types: each bucket is a linked list of
+ * configured receivers. */
+static struct moduleClusterReceiver *clusterReceivers[UINT8_MAX];
+
+void moduleCallClusterReceivers(char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len) {
+}
+
+/* --------------------------------------------------------------------------
* Modules API internals
* -------------------------------------------------------------------------- */
diff --git a/src/server.h b/src/server.h
index 155937536..cca56bc35 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1808,6 +1808,7 @@ void clusterCron(void);
void clusterPropagatePublish(robj *channel, robj *message);
void migrateCloseTimedoutSockets(void);
void clusterBeforeSleep(void);
+int clusterSendModuleMessageToTarget(char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len);
/* Sentinel */
void initSentinelConfig(void);