summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhenwei pi <pizhenwei@bytedance.com>2022-07-27 12:18:28 +0800
committerzhenwei pi <pizhenwei@bytedance.com>2022-08-22 15:16:08 +0800
commit0b27cfe37d2d996cdab92629cfe6d6a39a15b464 (patch)
treef14eae4b629bc8a64bc747762b76cad430e4aed2
parent45617385e72eb814c6548064835cf19cdf466a5b (diff)
downloadredis-0b27cfe37d2d996cdab92629cfe6d6a39a15b464.tar.gz
Introduce .listen into connection type
Introduce listen method into connection type, this allows no hard code of listen logic. Originally, we initialize server during startup like this: if (server.port) listenToPort(server.port,&server.ipfd); if (server.tls_port) listenToPort(server.port,&server.tlsfd); if (server.unixsocket) anetUnixServer(...server.unixsocket...); ... if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) if (createSocketAcceptHandler(&server.tlsfd, acceptTcpHandler) != C_OK) if (createSocketAcceptHandler(&server.sofd, acceptTcpHandler) != C_OK) ... If a new connection type gets supported, we have to add more hard code to setup listener. Introduce .listen and refactor listener, and Unix socket supports this. this allows to setup listener arguments and create listener in a loop. What's more, '.listen' is defined in connection.h, so we should include server.h to import 'struct socketFds', but server.h has already include 'connection.h'. To avoid including loop(also to make code reasonable), define 'struct connListener' in connection.h instead of 'struct socketFds' in server.h. This leads this commit to get more changes. There are more fields in 'struct connListener', hence it's possible to simplify changeBindAddr & applyTLSPort() & updatePort() into a single logic: update the listener config from the server.xxx, and re-create the listener. Because of the new field 'priv' in struct connListener, we expect to pass this to the accept handler(even it's not used currently), this may be used in the future. Signed-off-by: zhenwei pi <pizhenwei@bytedance.com>
-rw-r--r--src/cluster.c22
-rw-r--r--src/config.c43
-rw-r--r--src/connection.h20
-rw-r--r--src/server.c193
-rw-r--r--src/server.h20
-rw-r--r--src/socket.c5
-rw-r--r--src/tls.c5
-rw-r--r--src/unix.c26
8 files changed, 213 insertions, 121 deletions
diff --git a/src/cluster.c b/src/cluster.c
index 7aed7b35b..2a259b83c 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -678,9 +678,6 @@ void clusterInit(void) {
}
if (saveconf) clusterSaveConfigOrDie(1);
- /* We need a listening TCP port for our cluster messaging needs. */
- server.cfd.count = 0;
-
/* Port sanity check II
* The other handshake port check is triggered too late to stop
* us from trying to use a too-high cluster port number. */
@@ -696,14 +693,25 @@ void clusterInit(void) {
serverLog(LL_WARNING, "No bind address is configured, but it is required for the Cluster bus.");
exit(1);
}
- int cport = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR;
- if (listenToPort(cport, &server.cfd) == C_ERR ) {
+
+ if (connectionIndexByType(connTypeOfCluster()->get_type(NULL)) < 0) {
+ serverLog(LL_WARNING, "Missing connection type %s, but it is required for the Cluster bus.", connTypeOfCluster()->get_type(NULL));
+ exit(1);
+ }
+
+ connListener *listener = &server.clistener;
+ listener->count = 0;
+ listener->bindaddr = server.bindaddr;
+ listener->bindaddr_count = server.bindaddr_count;
+ listener->port = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR;
+ listener->ct = connTypeOfCluster();
+ if (connListen(listener) == C_ERR ) {
/* Note: the following log text is matched by the test suite. */
- serverLog(LL_WARNING, "Failed listening on port %u (cluster), aborting.", cport);
+ serverLog(LL_WARNING, "Failed listening on port %u (cluster), aborting.", listener->port);
exit(1);
}
- if (createSocketAcceptHandler(&server.cfd, clusterAcceptHandler) != C_OK) {
+ if (createSocketAcceptHandler(&server.clistener, clusterAcceptHandler) != C_OK) {
serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");
}
diff --git a/src/config.c b/src/config.c
index 561810330..20a918dd8 100644
--- a/src/config.c
+++ b/src/config.c
@@ -2430,7 +2430,14 @@ static int updateHZ(const char **err) {
}
static int updatePort(const char **err) {
- if (changeListenPort(server.port, &server.ipfd, connAcceptHandler(connectionTypeTcp())) == C_ERR) {
+ connListener *listener = listenerByType(CONN_TYPE_SOCKET);
+
+ serverAssert(listener != NULL);
+ listener->bindaddr = server.bindaddr;
+ listener->bindaddr_count = server.bindaddr_count;
+ listener->port = server.port;
+ listener->ct = connectionByType(CONN_TYPE_SOCKET);
+ if (changeListener(listener) == C_ERR) {
*err = "Unable to listen on this port. Check server logs.";
return 0;
}
@@ -2545,12 +2552,36 @@ int updateRequirePass(const char **err) {
return 1;
}
+/* applyBind affects both TCP and TLS (if enabled) together */
static int applyBind(const char **err) {
- if (changeBindAddr() == C_ERR) {
+ connListener *tcp_listener = listenerByType(CONN_TYPE_SOCKET);
+ connListener *tls_listener = listenerByType(CONN_TYPE_TLS);
+
+ serverAssert(tcp_listener != NULL);
+ tcp_listener->bindaddr = server.bindaddr;
+ tcp_listener->bindaddr_count = server.bindaddr_count;
+ tcp_listener->port = server.port;
+ tcp_listener->ct = connectionByType(CONN_TYPE_SOCKET);
+ if (changeListener(tcp_listener) == C_ERR) {
*err = "Failed to bind to specified addresses.";
+ if (tls_listener)
+ closeListener(tls_listener); /* failed with TLS together */
return 0;
}
+ if (server.tls_port != 0) {
+ serverAssert(tls_listener != NULL);
+ tls_listener->bindaddr = server.bindaddr;
+ tls_listener->bindaddr_count = server.bindaddr_count;
+ tls_listener->port = server.tls_port;
+ tls_listener->ct = connectionByType(CONN_TYPE_TLS);
+ if (changeListener(tls_listener) == C_ERR) {
+ *err = "Failed to bind to specified addresses.";
+ closeListener(tcp_listener); /* failed with TCP together */
+ return 0;
+ }
+ }
+
return 1;
}
@@ -2591,7 +2622,13 @@ static int applyTLSPort(const char **err) {
return 0;
}
- if (changeListenPort(server.tls_port, &server.tlsfd, connAcceptHandler(connectionTypeTls())) == C_ERR) {
+ connListener *listener = listenerByType(CONN_TYPE_TLS);
+ serverAssert(listener != NULL);
+ listener->bindaddr = server.bindaddr;
+ listener->bindaddr_count = server.bindaddr_count;
+ listener->port = server.tls_port;
+ listener->ct = connectionByType(CONN_TYPE_TLS);
+ if (changeListener(listener) == C_ERR) {
*err = "Unable to listen on this port. Check server logs.";
return 0;
}
diff --git a/src/connection.h b/src/connection.h
index c61b56451..ab68b6975 100644
--- a/src/connection.h
+++ b/src/connection.h
@@ -44,6 +44,7 @@
struct aeEventLoop;
typedef struct connection connection;
+typedef struct connListener connListener;
typedef enum {
CONN_STATE_NONE = 0,
@@ -77,6 +78,7 @@ typedef struct ConnectionType {
void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
aeFileProc *accept_handler;
int (*addr)(connection *conn, char *ip, size_t ip_len, int *port, int remote);
+ int (*listen)(connListener *listener);
/* create/close connection */
connection* (*conn_create)(void);
@@ -122,6 +124,19 @@ struct connection {
int fd;
};
+#define CONFIG_BINDADDR_MAX 16
+
+/* Setup a listener by a connection type */
+struct connListener {
+ int fd[CONFIG_BINDADDR_MAX];
+ int count;
+ char **bindaddr;
+ int bindaddr_count;
+ int port;
+ ConnectionType *ct;
+ void *priv; /* used by connection type specified data */
+};
+
/* The connection module does not deal with listening and accepting sockets,
* so we assume we have a socket when an incoming connection is created.
*
@@ -406,6 +421,11 @@ int connTypeHasPendingData(void);
/* walk all the connection types and process pending data for each connection type */
int connTypeProcessPendingData(void);
+/* Listen on an initialized listener */
+static inline int connListen(connListener *listener) {
+ return listener->ct->listen(listener);
+}
+
/* Get accept_handler of a connection type */
static inline aeFileProc *connAcceptHandler(ConnectionType *ct) {
if (ct)
diff --git a/src/server.c b/src/server.c
index 787bc3412..29397880d 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1862,9 +1862,7 @@ void initServerConfig(void) {
server.bindaddr_count = CONFIG_DEFAULT_BINDADDR_COUNT;
for (j = 0; j < CONFIG_DEFAULT_BINDADDR_COUNT; j++)
server.bindaddr[j] = zstrdup(default_bindaddr[j]);
- server.ipfd.count = 0;
- server.tlsfd.count = 0;
- server.sofd.count = 0;
+ memset(server.listeners, 0x00, sizeof(server.listeners));
server.active_expire_enabled = 1;
server.skip_checksum_validation = 0;
server.loading = 0;
@@ -2232,7 +2230,7 @@ void checkTcpBacklogSettings(void) {
#endif
}
-void closeSocketListeners(socketFds *sfd) {
+void closeListener(connListener *sfd) {
int j;
for (j = 0; j < sfd->count; j++) {
@@ -2247,11 +2245,11 @@ void closeSocketListeners(socketFds *sfd) {
/* Create an event handler for accepting new connections in TCP or TLS domain sockets.
* This works atomically for all socket fds */
-int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {
+int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler) {
int j;
for (j = 0; j < sfd->count; j++) {
- if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL) == AE_ERR) {
+ if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,sfd) == AE_ERR) {
/* Rollback */
for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
return C_ERR;
@@ -2264,7 +2262,8 @@ int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {
* binding the addresses specified in the Redis server configuration.
*
* The listening file descriptors are stored in the integer array 'fds'
- * and their number is set in '*count'.
+ * and their number is set in '*count'. Actually @sfd should be 'listener',
+ * for the historical reasons, let's keep 'sfd' here.
*
* The addresses to bind are specified in the global server.bindaddr array
* and their number is server.bindaddr_count. If the server configuration
@@ -2278,14 +2277,15 @@ int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {
* impossible to bind, or no bind addresses were specified in the server
* configuration but the function is not able to bind * for at least
* one of the IPv4 or IPv6 protocols. */
-int listenToPort(int port, socketFds *sfd) {
+int listenToPort(connListener *sfd) {
int j;
- char **bindaddr = server.bindaddr;
+ int port = sfd->port;
+ char **bindaddr = sfd->bindaddr;
/* If we have no bind address, we don't listen on a TCP socket */
- if (server.bindaddr_count == 0) return C_OK;
+ if (sfd->bindaddr_count == 0) return C_OK;
- for (j = 0; j < server.bindaddr_count; j++) {
+ for (j = 0; j < sfd->bindaddr_count; j++) {
char* addr = bindaddr[j];
int optional = *addr == '-';
if (optional) addr++;
@@ -2309,7 +2309,7 @@ int listenToPort(int port, socketFds *sfd) {
continue;
/* Rollback successful listens before exiting */
- closeSocketListeners(sfd);
+ closeListener(sfd);
return C_ERR;
}
if (server.socket_mark_id > 0) anetSetSockMarkId(NULL, sfd->fd[sfd->count], server.socket_mark_id);
@@ -2470,38 +2470,38 @@ void initServer(void) {
}
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
- /* Open the TCP listening socket for the user commands. */
- if (server.port != 0 &&
- listenToPort(server.port,&server.ipfd) == C_ERR) {
- /* Note: the following log text is matched by the test suite. */
- serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
- exit(1);
- }
- if (server.tls_port != 0 &&
- listenToPort(server.tls_port,&server.tlsfd) == C_ERR) {
- /* Note: the following log text is matched by the test suite. */
- serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.", server.tls_port);
- exit(1);
+ /* Setup listeners from server config for TCP/TLS/Unix */
+ int conn_index;
+ connListener *listener;
+ if (server.port != 0) {
+ conn_index = connectionIndexByType(CONN_TYPE_SOCKET);
+ if (conn_index < 0)
+ serverPanic("Failed finding connection listener of %s", CONN_TYPE_SOCKET);
+ listener = &server.listeners[conn_index];
+ listener->bindaddr = server.bindaddr;
+ listener->bindaddr_count = server.bindaddr_count;
+ listener->port = server.port;
+ listener->ct = connectionByType(CONN_TYPE_SOCKET);
+ }
+ if (server.tls_port != 0) {
+ conn_index = connectionIndexByType(CONN_TYPE_TLS);
+ if (conn_index < 0)
+ serverPanic("Failed finding connection listener of %s", CONN_TYPE_TLS);
+ listener = &server.listeners[conn_index];
+ listener->bindaddr = server.bindaddr;
+ listener->bindaddr_count = server.bindaddr_count;
+ listener->port = server.tls_port;
+ listener->ct = connectionByType(CONN_TYPE_TLS);
}
-
- /* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
- unlink(server.unixsocket); /* don't care if this fails */
- server.sofd.fd[0] = anetUnixServer(server.neterr,server.unixsocket,
- (mode_t)server.unixsocketperm, server.tcp_backlog);
- if (server.sofd.fd[0] == ANET_ERR) {
- serverLog(LL_WARNING, "Failed opening Unix socket: %s", server.neterr);
- exit(1);
- }
- anetNonBlock(NULL,server.sofd.fd[0]);
- anetCloexec(server.sofd.fd[0]);
- server.sofd.count = 1;
- }
-
- /* Abort if there are no listening sockets at all. */
- if (server.ipfd.count == 0 && server.tlsfd.count == 0 && server.sofd.count == 0) {
- serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
- exit(1);
+ conn_index = connectionIndexByType(CONN_TYPE_UNIX);
+ if (conn_index < 0)
+ serverPanic("Failed finding connection listener of %s", CONN_TYPE_UNIX);
+ listener = &server.listeners[conn_index];
+ listener->bindaddr = &server.unixsocket;
+ listener->bindaddr_count = 1;
+ listener->ct = connectionByType(CONN_TYPE_UNIX);
+ listener->priv = &server.unixsocketperm; /* Unix socket specified */
}
/* Create the Redis databases, and initialize other internal state. */
@@ -2584,18 +2584,28 @@ void initServer(void) {
exit(1);
}
- /* Create an event handler for accepting new connections in TCP and Unix
- * domain sockets. */
- if (createSocketAcceptHandler(&server.ipfd, connAcceptHandler(connectionTypeTcp())) != C_OK) {
- serverPanic("Unrecoverable error creating TCP socket accept handler.");
- }
- if (createSocketAcceptHandler(&server.tlsfd, connAcceptHandler(connectionTypeTls())) != C_OK) {
- serverPanic("Unrecoverable error creating TLS socket accept handler.");
- }
- if (createSocketAcceptHandler(&server.sofd, connAcceptHandler(connectionTypeUnix())) != C_OK) {
- serverPanic("Unrecoverable error creating server.sofd file event.");
+ /* create all the configured listener, and add handler to start to accept */
+ int listen_fds = 0;
+ for (j = 0; j < CONN_TYPE_MAX; j++) {
+ listener = &server.listeners[j];
+ if (listener->ct == NULL)
+ continue;
+
+ if (connListen(listener) == C_ERR) {
+ serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting.", listener->port, listener->ct->get_type(NULL));
+ exit(1);
+ }
+
+ if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK)
+ serverPanic("Unrecoverable error creating %s listener accept handler.", listener->ct->get_type(NULL));
+
+ listen_fds += listener->count;
}
+ if (listen_fds == 0) {
+ serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
+ exit(1);
+ }
/* Register a readable event for the pipe used to awake the event loop
* from module threads. */
@@ -4067,11 +4077,16 @@ void incrementErrorCount(const char *fullerr, size_t namelen) {
void closeListeningSockets(int unlink_unix_socket) {
int j;
- for (j = 0; j < server.ipfd.count; j++) close(server.ipfd.fd[j]);
- for (j = 0; j < server.tlsfd.count; j++) close(server.tlsfd.fd[j]);
- for (j = 0; j < server.sofd.count; j++) close(server.sofd.fd[j]);
+ for (int i = 0; i < CONN_TYPE_MAX; i++) {
+ connListener *listener = &server.listeners[i];
+ if (listener->ct == NULL)
+ continue;
+
+ for (j = 0; j < listener->count; j++) close(listener->fd[j]);
+ }
+
if (server.cluster_enabled)
- for (j = 0; j < server.cfd.count; j++) close(server.cfd.fd[j]);
+ for (j = 0; j < server.clistener.count; j++) close(server.clistener.fd[j]);
if (unlink_unix_socket && server.unixsocket) {
serverLog(LL_NOTICE,"Removing the unix socket file.");
if (unlink(server.unixsocket) != 0)
@@ -6266,61 +6281,38 @@ void redisAsciiArt(void) {
zfree(buf);
}
-int changeBindAddr(void) {
- /* Close old TCP and TLS servers */
- closeSocketListeners(&server.ipfd);
- closeSocketListeners(&server.tlsfd);
+/* Get the server listener by type name */
+connListener *listenerByType(const char *typename) {
+ int conn_index;
- /* Bind to the new port */
- if ((server.port != 0 && listenToPort(server.port, &server.ipfd) != C_OK) ||
- (server.tls_port != 0 && listenToPort(server.tls_port, &server.tlsfd) != C_OK)) {
- serverLog(LL_WARNING, "Failed to bind");
-
- closeSocketListeners(&server.ipfd);
- closeSocketListeners(&server.tlsfd);
- return C_ERR;
- }
-
- /* Create TCP and TLS event handlers */
- if (createSocketAcceptHandler(&server.ipfd, connAcceptHandler(connectionTypeTcp())) != C_OK) {
- serverPanic("Unrecoverable error creating TCP socket accept handler.");
- }
- if (createSocketAcceptHandler(&server.tlsfd, connAcceptHandler(connectionTypeTls())) != C_OK) {
- serverPanic("Unrecoverable error creating TLS socket accept handler.");
- }
-
- if (server.set_proc_title) redisSetProcTitle(NULL);
+ conn_index = connectionIndexByType(typename);
+ if (conn_index < 0)
+ return NULL;
- return C_OK;
+ return &server.listeners[conn_index];
}
-int changeListenPort(int port, socketFds *sfd, aeFileProc *accept_handler) {
- socketFds new_sfd = {{0}};
-
+/* Close original listener, re-create a new listener from the updated bind address & port */
+int changeListener(connListener *listener) {
/* Close old servers */
- closeSocketListeners(sfd);
+ closeListener(listener);
/* Just close the server if port disabled */
- if (port == 0) {
+ if (listener->port == 0) {
if (server.set_proc_title) redisSetProcTitle(NULL);
return C_OK;
}
- /* Bind to the new port */
- if (listenToPort(port, &new_sfd) != C_OK) {
+ /* Re-create listener */
+ if (connListen(listener) != C_OK) {
return C_ERR;
}
/* Create event handlers */
- if (createSocketAcceptHandler(&new_sfd, accept_handler) != C_OK) {
- closeSocketListeners(&new_sfd);
- return C_ERR;
+ if (createSocketAcceptHandler(listener, listener->ct->accept_handler) != C_OK) {
+ serverPanic("Unrecoverable error creating %s accept handler.", listener->ct->get_type(NULL));
}
- /* Copy new descriptors */
- sfd->count = new_sfd.count;
- memcpy(sfd->fd, new_sfd.fd, sizeof(new_sfd.fd));
-
if (server.set_proc_title) redisSetProcTitle(NULL);
return C_OK;
@@ -7135,10 +7127,15 @@ int main(int argc, char **argv) {
exit(1);
}
}
- if (server.ipfd.count > 0 || server.tlsfd.count > 0)
- serverLog(LL_NOTICE,"Ready to accept connections");
- if (server.sofd.count > 0)
- serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
+
+ for (j = 0; j < CONN_TYPE_MAX; j++) {
+ connListener *listener = &server.listeners[j];
+ if (listener->ct == NULL)
+ continue;
+
+ serverLog(LL_NOTICE,"Ready to accept connections %s", listener->ct->get_type(NULL));
+ }
+
if (server.supervised_mode == SUPERVISED_SYSTEMD) {
if (!server.masterhost) {
redisCommunicateSystemd("STATUS=Ready to accept connections\n");
diff --git a/src/server.h b/src/server.h
index e6088a8b4..65faa84af 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1376,11 +1376,6 @@ struct malloc_stats {
size_t allocator_resident;
};
-typedef struct socketFds {
- int fd[CONFIG_BINDADDR_MAX];
- int count;
-} socketFds;
-
/*-----------------------------------------------------------------------------
* TLS Context Configuration
*----------------------------------------------------------------------------*/
@@ -1514,11 +1509,9 @@ struct redisServer {
char *bind_source_addr; /* Source address to bind on for outgoing connections */
char *unixsocket; /* UNIX socket path */
unsigned int unixsocketperm; /* UNIX socket permission (see mode_t) */
- socketFds ipfd; /* TCP socket file descriptors */
- socketFds tlsfd; /* TLS socket file descriptors */
- socketFds sofd; /* Unix socket file descriptor */
+ connListener listeners[CONN_TYPE_MAX]; /* TCP/Unix/TLS even more types */
uint32_t socket_mark_id; /* ID for listen socket marking */
- socketFds cfd; /* Cluster bus listening socket */
+ connListener clistener; /* Cluster bus listener */
list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
list *clients_pending_write; /* There is to write or install handler. */
@@ -2525,7 +2518,7 @@ char *getClientTypeName(int class);
void flushSlavesOutputBuffers(void);
void disconnectSlaves(void);
void evictClients(void);
-int listenToPort(int port, socketFds *fds);
+int listenToPort(connListener *fds);
void pauseClients(pause_purpose purpose, mstime_t end, pause_type type);
void unpauseClients(pause_purpose purpose);
int areClientsPaused(void);
@@ -2891,9 +2884,10 @@ int processCommand(client *c);
int processPendingCommandAndInputBuffer(client *c);
void setupSignalHandlers(void);
void removeSignalHandlers(void);
-int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler);
-int changeListenPort(int port, socketFds *sfd, aeFileProc *accept_handler);
-int changeBindAddr(void);
+int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler);
+connListener *listenerByType(const char *typename);
+int changeListener(connListener *listener);
+void closeListener(connListener *listener);
struct redisCommand *lookupSubcommand(struct redisCommand *container, sds sub_name);
struct redisCommand *lookupCommand(robj **argv, int argc);
struct redisCommand *lookupCommandBySdsLogic(dict *commands, sds s);
diff --git a/src/socket.c b/src/socket.c
index 387330f97..83000eaf8 100644
--- a/src/socket.c
+++ b/src/socket.c
@@ -329,6 +329,10 @@ static int connSocketAddr(connection *conn, char *ip, size_t ip_len, int *port,
return C_ERR;
}
+static int connSocketListen(connListener *listener) {
+ return listenToPort(listener);
+}
+
static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) {
int fd = anetTcpNonBlockConnect(NULL,addr,port);
if (fd == -1) {
@@ -382,6 +386,7 @@ static ConnectionType CT_Socket = {
.ae_handler = connSocketEventHandler,
.accept_handler = connSocketAcceptHandler,
.addr = connSocketAddr,
+ .listen = connSocketListen,
/* create/close connection */
.conn_create = connCreateSocket,
diff --git a/src/tls.c b/src/tls.c
index 0757797fe..be6bf4e1b 100644
--- a/src/tls.c
+++ b/src/tls.c
@@ -743,6 +743,10 @@ static int connTLSAddr(connection *conn, char *ip, size_t ip_len, int *port, int
return anetFdToString(conn->fd, ip, ip_len, port, remote);
}
+static int connTLSListen(connListener *listener) {
+ return listenToPort(listener);
+}
+
static void connTLSClose(connection *conn_) {
tls_connection *conn = (tls_connection *) conn_;
@@ -1104,6 +1108,7 @@ static ConnectionType CT_TLS = {
.ae_handler = tlsEventHandler,
.accept_handler = tlsAcceptHandler,
.addr = connTLSAddr,
+ .listen = connTLSListen,
/* create/close connection */
.conn_create = connCreateTLS,
diff --git a/src/unix.c b/src/unix.c
index 97bef0db4..c4ffad4fb 100644
--- a/src/unix.c
+++ b/src/unix.c
@@ -43,6 +43,31 @@ static int connUnixAddr(connection *conn, char *ip, size_t ip_len, int *port, in
return connectionTypeTcp()->addr(conn, ip, ip_len, port, remote);
}
+static int connUnixListen(connListener *listener) {
+ int fd;
+ mode_t *perm = (mode_t *)listener->priv;
+
+ if (listener->bindaddr_count == 0)
+ return C_OK;
+
+ /* currently listener->bindaddr_count is always 1, we still use a loop here in case Redis supports multi Unix socket in the future */
+ for (int j = 0; j < listener->bindaddr_count; j++) {
+ char *addr = listener->bindaddr[j];
+
+ unlink(addr); /* don't care if this fails */
+ fd = anetUnixServer(server.neterr, addr, *perm, server.tcp_backlog);
+ if (fd == ANET_ERR) {
+ serverLog(LL_WARNING, "Failed opening Unix socket: %s", server.neterr);
+ exit(1);
+ }
+ anetNonBlock(NULL, fd);
+ anetCloexec(fd);
+ listener->fd[listener->count++] = fd;
+ }
+
+ return C_OK;
+}
+
static connection *connCreateUnix(void) {
connection *conn = zcalloc(sizeof(connection));
conn->type = &CT_Unix;
@@ -135,6 +160,7 @@ static ConnectionType CT_Unix = {
.ae_handler = connUnixEventHandler,
.accept_handler = connUnixAcceptHandler,
.addr = connUnixAddr,
+ .listen = connUnixListen,
/* create/close connection */
.conn_create = connCreateUnix,