diff options
author | Oran Agra <oran@redislabs.com> | 2022-08-24 08:35:46 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-08-24 08:35:46 +0300 |
commit | 41d9eb0291417c36d694894c936d8f4f29ec5504 (patch) | |
tree | 5c316244f4c134522d0051e6437e793b5afc6ecc | |
parent | 90223759a37d2613cd6f2085050f8ce2f9a54ee3 (diff) | |
parent | 4faddf18ca8ca3adb93cf1e4e620be9eaf0f6bf4 (diff) | |
download | redis-41d9eb0291417c36d694894c936d8f4f29ec5504.tar.gz |
Merge: Fully abstract connection and make TLS dynamically loadable (#9320)
There are many commits in this PR, the detailed changes is described
in each commit message.
### Main changes in this PR
* Fully abstract connection type, and hide connection type specified methods.
Ex, currently TLS class looks like:
```
static ConnectionType CT_TLS = {
/* connection type */
.get_type = connTLSGetType,
/* connection type initialize & finalize & configure */
.init = tlsInit,
.cleanup = tlsCleanup,
.configure = tlsConfigure,
/* ae & accept & listen & error & address handler */
.ae_handler = tlsEventHandler,
.accept_handler = tlsAcceptHandler,
.addr = connTLSAddr,
.listen = connTLSListen,
/* create/close connection */
.conn_create = connCreateTLS,
.conn_create_accepted = connCreateAcceptedTLS,
.close = connTLSClose,
/* connect & accept */
.connect = connTLSConnect,
.blocking_connect = connTLSBlockingConnect,
.accept = connTLSAccept,
/* IO */
.read = connTLSRead,
.write = connTLSWrite,
.writev = connTLSWritev,
.set_write_handler = connTLSSetWriteHandler,
.set_read_handler = connTLSSetReadHandler,
.get_last_error = connTLSGetLastError,
.sync_write = connTLSSyncWrite,
.sync_read = connTLSSyncRead,
.sync_readline = connTLSSyncReadLine,
/* pending data */
.has_pending_data = tlsHasPendingData,
.process_pending_data = tlsProcessPendingData,
/* TLS specified methods */
.get_peer_cert = connTLSGetPeerCert,
};
int RedisRegisterConnectionTypeTLS()
{
return connTypeRegister(&CT_TLS);
}
```
* Also abstract Unix socket class. Currently, the connection framework becomes like:
```
uplayer
|
connection layer
/ | \
TCP Unix TLS
```
* It's possible to build TLS as a shared library (`make BUILD_TLS=module`).
Loading the shared library(redis-tls.so) into Redis by Redis module subsystem,
and Redis starts to listen TLS port. Ex:
```
./src/redis-server --tls-port 6379 --port 0 \
--tls-cert-file ./tests/tls/redis.crt \
--tls-key-file ./tests/tls/redis.key \
--tls-ca-cert-file ./tests/tls/ca.crt \
--loadmodule src/redis-tls.so
```
### Interface changes
* RM_GetContextFlags supports a new flag: REDISMODULE_CTX_FLAGS_SERVER_STARTUP
* INFO SERVER includes a list of listeners:
```
listener0:name=tcp,bind=127.0.0.1,port=6380
listener1:name=unix,bind=/run/redis.sock
listener2:name=tls,bind=127.0.0.1,port=6379
```
### Other notes
* Fix wrong signature of RedisModuleDefragFunc, this could break
compilation of a module, but not the ABI
* Some reordering of initialization order in server.c:
* Move initialization of listeners to be after loading the modules
* Config TLS after initialization of listeners
* Init cluster after initialization of listeners
* Sentinel does not support the TLS module or any connection module
since it uses hiredis for outbound connections, so when TLS is built as
a module, sentinel lacks TLS support.
-rw-r--r-- | .github/workflows/ci.yml | 3 | ||||
-rw-r--r-- | .github/workflows/daily.yml | 12 | ||||
-rw-r--r-- | TLS.md | 17 | ||||
-rw-r--r-- | deps/Makefile | 2 | ||||
-rw-r--r-- | src/Makefile | 41 | ||||
-rw-r--r-- | src/anet.c | 30 | ||||
-rw-r--r-- | src/anet.h | 8 | ||||
-rw-r--r-- | src/cluster.c | 52 | ||||
-rw-r--r-- | src/config.c | 53 | ||||
-rw-r--r-- | src/connection.c | 525 | ||||
-rw-r--r-- | src/connection.h | 226 | ||||
-rwxr-xr-x | src/mkreleasehdr.sh | 2 | ||||
-rw-r--r-- | src/module.c | 20 | ||||
-rw-r--r-- | src/networking.c | 76 | ||||
-rw-r--r-- | src/redis-cli.c | 3 | ||||
-rw-r--r-- | src/redismodule.h | 48 | ||||
-rw-r--r-- | src/release.c | 7 | ||||
-rw-r--r-- | src/replication.c | 20 | ||||
-rw-r--r-- | src/sentinel.c | 14 | ||||
-rw-r--r-- | src/server.c | 260 | ||||
-rw-r--r-- | src/server.h | 55 | ||||
-rw-r--r-- | src/socket.c | 453 | ||||
-rw-r--r-- | src/tls.c | 208 | ||||
-rw-r--r-- | src/unix.c | 194 | ||||
-rw-r--r-- | tests/instances.tcl | 11 | ||||
-rw-r--r-- | tests/modules/defragtest.c | 4 | ||||
-rw-r--r-- | tests/support/server.tcl | 5 | ||||
-rw-r--r-- | tests/support/util.tcl | 22 | ||||
-rw-r--r-- | tests/test_helper.tcl | 7 | ||||
-rw-r--r-- | tests/unit/moduleapi/infra.tcl | 9 | ||||
-rw-r--r-- | tests/unit/moduleapi/moduleconfigs.tcl | 40 |
31 files changed, 1567 insertions, 860 deletions
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2b406f74b..f6a61b0e0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -24,7 +24,8 @@ jobs: steps: - uses: actions/checkout@v3 - name: make - run: make SANITIZER=address REDIS_CFLAGS='-Werror' + # build with TLS module just for compilation coverage + run: make SANITIZER=address REDIS_CFLAGS='-Werror' BUILD_TLS=module - name: testprep run: sudo apt-get install tcl8.6 tclx -y - name: test diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml index da3c8a21f..e9e0549af 100644 --- a/.github/workflows/daily.yml +++ b/.github/workflows/daily.yml @@ -539,7 +539,7 @@ jobs: run: | yum -y install centos-release-scl epel-release yum -y install devtoolset-7 openssl-devel openssl - scl enable devtoolset-7 "make BUILD_TLS=yes REDIS_CFLAGS='-Werror'" + scl enable devtoolset-7 "make BUILD_TLS=module REDIS_CFLAGS='-Werror'" - name: testprep run: | yum -y install tcl tcltls tclx @@ -547,19 +547,19 @@ jobs: - name: test if: true && !contains(github.event.inputs.skiptests, 'redis') run: | - ./runtest --accurate --verbose --dump-logs --tls --dump-logs ${{github.event.inputs.test_args}} + ./runtest --accurate --verbose --dump-logs --tls-module --dump-logs ${{github.event.inputs.test_args}} - name: module api test if: true && !contains(github.event.inputs.skiptests, 'modules') run: | - ./runtest-moduleapi --verbose --dump-logs --tls --dump-logs ${{github.event.inputs.test_args}} + ./runtest-moduleapi --verbose --dump-logs --tls-module --dump-logs ${{github.event.inputs.test_args}} - name: sentinel tests if: true && !contains(github.event.inputs.skiptests, 'sentinel') run: | - ./runtest-sentinel --tls ${{github.event.inputs.cluster_test_args}} + ./runtest-sentinel ${{github.event.inputs.cluster_test_args}} - name: cluster tests if: true && !contains(github.event.inputs.skiptests, 'cluster') run: | - ./runtest-cluster --tls ${{github.event.inputs.cluster_test_args}} + ./runtest-cluster --tls-module ${{github.event.inputs.cluster_test_args}} test-centos7-tls-no-tls: runs-on: ubuntu-latest @@ -582,7 +582,7 @@ jobs: run: | yum -y install centos-release-scl epel-release yum -y install devtoolset-7 openssl-devel openssl - scl enable devtoolset-7 "make BUILD_TLS=yes REDIS_CFLAGS='-Werror'" + scl enable devtoolset-7 "make BUILD_TLS=module REDIS_CFLAGS='-Werror'" - name: testprep run: | yum -y install tcl tcltls tclx @@ -9,8 +9,14 @@ Getting Started To build with TLS support you'll need OpenSSL development libraries (e.g. libssl-dev on Debian/Ubuntu). +To build TLS support as Redis built-in: Run `make BUILD_TLS=yes`. +Or to build TLS as Redis module: +Run `make BUILD_TLS=module`. + +Note that sentinel mode does not support TLS module. + ### Tests To run Redis test suite with TLS, you'll need TLS support for TCL (i.e. @@ -22,16 +28,27 @@ To run Redis test suite with TLS, you'll need TLS support for TCL (i.e. 2. Run `./runtest --tls` or `./runtest-cluster --tls` to run Redis and Redis Cluster tests in TLS mode. +3. Run `./runtest --tls-module` or `./runtest-cluster --tls-module` to + run Redis and Redis cluster tests in TLS mode with Redis module. + ### Running manually To manually run a Redis server with TLS mode (assuming `gen-test-certs.sh` was invoked so sample certificates/keys are available): +For TLS built-in mode: ./src/redis-server --tls-port 6379 --port 0 \ --tls-cert-file ./tests/tls/redis.crt \ --tls-key-file ./tests/tls/redis.key \ --tls-ca-cert-file ./tests/tls/ca.crt +For TLS module mode: + ./src/redis-server --tls-port 6379 --port 0 \ + --tls-cert-file ./tests/tls/redis.crt \ + --tls-key-file ./tests/tls/redis.key \ + --tls-ca-cert-file ./tests/tls/ca.crt \ + --loadmodule src/redis-tls.so + To connect to this Redis server with `redis-cli`: ./src/redis-cli --tls \ diff --git a/deps/Makefile b/deps/Makefile index 8592e1766..96dbb8c1d 100644 --- a/deps/Makefile +++ b/deps/Makefile @@ -44,7 +44,7 @@ distclean: .PHONY: distclean -ifeq ($(BUILD_TLS),yes) +ifneq (,$(filter $(BUILD_TLS),yes module)) HIREDIS_MAKE_FLAGS = USE_SSL=1 endif diff --git a/src/Makefile b/src/Makefile index 097ef2454..fdfef2b3c 100644 --- a/src/Makefile +++ b/src/Makefile @@ -267,24 +267,41 @@ ifeq ($(MALLOC),jemalloc) FINAL_LIBS := ../deps/jemalloc/lib/libjemalloc.a $(FINAL_LIBS) endif -ifeq ($(BUILD_TLS),yes) - FINAL_CFLAGS+=-DUSE_OPENSSL $(OPENSSL_CFLAGS) - FINAL_LDFLAGS+=$(OPENSSL_LDFLAGS) - LIBSSL_PKGCONFIG := $(shell $(PKG_CONFIG) --exists libssl && echo $$?) +# LIBSSL & LIBCRYPTO +LIBSSL_LIBS= +LIBSSL_PKGCONFIG := $(shell $(PKG_CONFIG) --exists libssl && echo $$?) ifeq ($(LIBSSL_PKGCONFIG),0) LIBSSL_LIBS=$(shell $(PKG_CONFIG) --libs libssl) else LIBSSL_LIBS=-lssl endif - LIBCRYPTO_PKGCONFIG := $(shell $(PKG_CONFIG) --exists libcrypto && echo $$?) +LIBCRYPTO_LIBS= +LIBCRYPTO_PKGCONFIG := $(shell $(PKG_CONFIG) --exists libcrypto && echo $$?) ifeq ($(LIBCRYPTO_PKGCONFIG),0) LIBCRYPTO_LIBS=$(shell $(PKG_CONFIG) --libs libcrypto) else LIBCRYPTO_LIBS=-lcrypto endif + +BUILD_NO:=0 +BUILD_YES:=1 +BUILD_MODULE:=2 +ifeq ($(BUILD_TLS),yes) + FINAL_CFLAGS+=-DUSE_OPENSSL=$(BUILD_YES) $(OPENSSL_CFLAGS) -DBUILD_TLS_MODULE=$(BUILD_NO) + FINAL_LDFLAGS+=$(OPENSSL_LDFLAGS) FINAL_LIBS += ../deps/hiredis/libhiredis_ssl.a $(LIBSSL_LIBS) $(LIBCRYPTO_LIBS) endif +TLS_MODULE= +TLS_MODULE_NAME:=redis-tls$(PROG_SUFFIX).so +TLS_MODULE_CFLAGS:=$(FINAL_CFLAGS) +ifeq ($(BUILD_TLS),module) + FINAL_CFLAGS+=-DUSE_OPENSSL=$(BUILD_MODULE) $(OPENSSL_CFLAGS) + TLS_CLIENT_LIBS = ../deps/hiredis/libhiredis_ssl.a $(LIBSSL_LIBS) $(LIBCRYPTO_LIBS) + TLS_MODULE=$(TLS_MODULE_NAME) + TLS_MODULE_CFLAGS+=-DUSE_OPENSSL=$(BUILD_MODULE) $(OPENSSL_CFLAGS) -DBUILD_TLS_MODULE=$(BUILD_MODULE) +endif + ifndef V define MAKE_INSTALL @printf ' %b %b\n' $(LINKCOLOR)INSTALL$(ENDCOLOR) $(BINCOLOR)$(1)$(ENDCOLOR) 1>&2 @@ -316,7 +333,7 @@ endif REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX) REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX) @@ -325,7 +342,7 @@ REDIS_CHECK_RDB_NAME=redis-check-rdb$(PROG_SUFFIX) REDIS_CHECK_AOF_NAME=redis-check-aof$(PROG_SUFFIX) ALL_SOURCES=$(sort $(patsubst %.o,%.c,$(REDIS_SERVER_OBJ) $(REDIS_CLI_OBJ) $(REDIS_BENCHMARK_OBJ))) -all: $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) +all: $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) $(TLS_MODULE) @echo "" @echo "Hint: It's a good idea to run 'make test' ;)" @echo "" @@ -385,13 +402,17 @@ $(REDIS_CHECK_RDB_NAME): $(REDIS_SERVER_NAME) $(REDIS_CHECK_AOF_NAME): $(REDIS_SERVER_NAME) $(REDIS_INSTALL) $(REDIS_SERVER_NAME) $(REDIS_CHECK_AOF_NAME) +# redis-tls.so +$(TLS_MODULE_NAME): $(REDIS_SERVER_NAME) + $(QUIET_CC)$(CC) -o $@ tls.c -shared -fPIC $(TLS_MODULE_CFLAGS) $(TLS_CLIENT_LIBS) + # redis-cli $(REDIS_CLI_NAME): $(REDIS_CLI_OBJ) - $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/linenoise/linenoise.o $(FINAL_LIBS) + $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/linenoise/linenoise.o $(FINAL_LIBS) $(TLS_CLIENT_LIBS) # redis-benchmark $(REDIS_BENCHMARK_NAME): $(REDIS_BENCHMARK_OBJ) - $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/hdr_histogram/libhdrhistogram.a $(FINAL_LIBS) + $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/hdr_histogram/libhdrhistogram.a $(FINAL_LIBS) $(TLS_CLIENT_LIBS) DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ:%.o=%.d) -include $(DEP) @@ -410,7 +431,7 @@ commands.c: commands/*.json ../utils/generate-command-code.py endif clean: - rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep + rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep *.so rm -f $(DEP) .PHONY: clean diff --git a/src/anet.c b/src/anet.c index 753f2fe42..00c30b83d 100644 --- a/src/anet.c +++ b/src/anet.c @@ -62,6 +62,15 @@ static void anetSetError(char *err, const char *fmt, ...) va_end(ap); } +int anetGetError(int fd) { + int sockerr = 0; + socklen_t errlen = sizeof(sockerr); + + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) + sockerr = errno; + return sockerr; +} + int anetSetBlock(char *err, int fd, int non_block) { int flags; @@ -570,11 +579,11 @@ int anetUnixAccept(char *err, int s) { return fd; } -int anetFdToString(int fd, char *ip, size_t ip_len, int *port, int fd_to_str_type) { +int anetFdToString(int fd, char *ip, size_t ip_len, int *port, int remote) { struct sockaddr_storage sa; socklen_t salen = sizeof(sa); - if (fd_to_str_type == FD_TO_PEER_NAME) { + if (remote) { if (getpeername(fd, (struct sockaddr *)&sa, &salen) == -1) goto error; } else { if (getsockname(fd, (struct sockaddr *)&sa, &salen) == -1) goto error; @@ -618,23 +627,6 @@ error: return -1; } -/* Format an IP,port pair into something easy to parse. If IP is IPv6 - * (matches for ":"), the ip is surrounded by []. IP and port are just - * separated by colons. This the standard to display addresses within Redis. */ -int anetFormatAddr(char *buf, size_t buf_len, char *ip, int port) { - return snprintf(buf,buf_len, strchr(ip,':') ? - "[%s]:%d" : "%s:%d", ip, port); -} - -/* Like anetFormatAddr() but extract ip and port from the socket's peer/sockname. */ -int anetFormatFdAddr(int fd, char *buf, size_t buf_len, int fd_to_str_type) { - char ip[INET6_ADDRSTRLEN]; - int port; - - anetFdToString(fd,ip,sizeof(ip),&port,fd_to_str_type); - return anetFormatAddr(buf, buf_len, ip, port); -} - /* Create a pipe buffer with given flags for read end and write end. * Note that it supports the file flags defined by pipe2() and fcntl(F_SETFL), * and one of the use cases is O_CLOEXEC|O_NONBLOCK. */ diff --git a/src/anet.h b/src/anet.h index ff86e2029..b571e52c1 100644 --- a/src/anet.h +++ b/src/anet.h @@ -49,10 +49,6 @@ #undef ip_len #endif -/* FD to address string conversion types */ -#define FD_TO_PEER_NAME 0 -#define FD_TO_SOCK_NAME 1 - int anetTcpNonBlockConnect(char *err, const char *addr, int port); int anetTcpNonBlockBestEffortBindConnect(char *err, const char *addr, int port, const char *source_addr); int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len, int flags); @@ -68,11 +64,11 @@ int anetEnableTcpNoDelay(char *err, int fd); int anetDisableTcpNoDelay(char *err, int fd); int anetSendTimeout(char *err, int fd, long long ms); int anetRecvTimeout(char *err, int fd, long long ms); -int anetFdToString(int fd, char *ip, size_t ip_len, int *port, int fd_to_str_type); +int anetFdToString(int fd, char *ip, size_t ip_len, int *port, int remote); int anetKeepAlive(char *err, int fd, int interval); int anetFormatAddr(char *fmt, size_t fmt_len, char *ip, int port); -int anetFormatFdAddr(int fd, char *buf, size_t buf_len, int fd_to_str_type); int anetPipe(int fds[2], int read_flags, int write_flags); int anetSetSockMarkId(char *err, int fd, uint32_t id); +int anetGetError(int fd); #endif diff --git a/src/cluster.c b/src/cluster.c index c739a3892..2a259b83c 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -119,6 +119,14 @@ dictType clusterNodesBlackListDictType = { NULL /* allow to expand */ }; +static ConnectionType *connTypeOfCluster() { + if (server.tls_cluster) { + return connectionTypeTls(); + } + + return connectionTypeTcp(); +} + /* ----------------------------------------------------------------------------- * Initialization * -------------------------------------------------------------------------- */ @@ -670,9 +678,6 @@ void clusterInit(void) { } if (saveconf) clusterSaveConfigOrDie(1); - /* We need a listening TCP port for our cluster messaging needs. */ - server.cfd.count = 0; - /* Port sanity check II * The other handshake port check is triggered too late to stop * us from trying to use a too-high cluster port number. */ @@ -688,14 +693,25 @@ void clusterInit(void) { serverLog(LL_WARNING, "No bind address is configured, but it is required for the Cluster bus."); exit(1); } - int cport = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR; - if (listenToPort(cport, &server.cfd) == C_ERR ) { + + if (connectionIndexByType(connTypeOfCluster()->get_type(NULL)) < 0) { + serverLog(LL_WARNING, "Missing connection type %s, but it is required for the Cluster bus.", connTypeOfCluster()->get_type(NULL)); + exit(1); + } + + connListener *listener = &server.clistener; + listener->count = 0; + listener->bindaddr = server.bindaddr; + listener->bindaddr_count = server.bindaddr_count; + listener->port = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR; + listener->ct = connTypeOfCluster(); + if (connListen(listener) == C_ERR ) { /* Note: the following log text is matched by the test suite. */ - serverLog(LL_WARNING, "Failed listening on port %u (cluster), aborting.", cport); + serverLog(LL_WARNING, "Failed listening on port %u (cluster), aborting.", listener->port); exit(1); } - if (createSocketAcceptHandler(&server.cfd, clusterAcceptHandler) != C_OK) { + if (createSocketAcceptHandler(&server.clistener, clusterAcceptHandler) != C_OK) { serverPanic("Unrecoverable error creating Redis Cluster socket accept handler."); } @@ -865,6 +881,7 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd; int max = MAX_CLUSTER_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; + int require_auth = TLS_CLIENT_AUTH_YES; UNUSED(el); UNUSED(mask); UNUSED(privdata); @@ -882,8 +899,7 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { return; } - connection *conn = server.tls_cluster ? - connCreateAcceptedTLS(cfd, TLS_CLIENT_AUTH_YES) : connCreateAcceptedSocket(cfd); + connection *conn = connCreateAccepted(connTypeOfCluster(), cfd, &require_auth); /* Make sure connection is not in an error state */ if (connGetState(conn) != CONN_STATE_ACCEPTING) { @@ -1769,7 +1785,7 @@ int nodeIp2String(char *buf, clusterLink *link, char *announced_ip) { buf[NET_IP_STR_LEN-1] = '\0'; /* We are not sure the input is sane. */ return C_OK; } else { - if (connPeerToString(link->conn, buf, NET_IP_STR_LEN, NULL) == C_ERR) { + if (connAddrPeerName(link->conn, buf, NET_IP_STR_LEN, NULL) == C_ERR) { serverLog(LL_NOTICE, "Error converting peer IP to string: %s", link->conn ? connGetLastError(link->conn) : "no link"); return C_ERR; @@ -2273,7 +2289,7 @@ int clusterProcessPacket(clusterLink *link) { { char ip[NET_IP_STR_LEN]; - if (connSockName(link->conn,ip,sizeof(ip),NULL) != -1 && + if (connAddrSockName(link->conn,ip,sizeof(ip),NULL) != -1 && strcmp(ip,myself->ip)) { memcpy(myself->ip,ip,NET_IP_STR_LEN); @@ -3969,7 +3985,7 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_ if (node->link == NULL) { clusterLink *link = createClusterLink(node); - link->conn = server.tls_cluster ? connCreateTLS() : connCreateSocket(); + link->conn = connCreate(connTypeOfCluster()); connSetPrivateData(link->conn, link); if (connConnect(link->conn, node->ip, node->cport, server.bind_source_addr, clusterLinkConnectHandler) == -1) { @@ -5022,7 +5038,7 @@ void addNodeToNodeReply(client *c, clusterNode *node) { /* Report non-TLS ports to non-TLS client in TLS cluster if available. */ int use_pport = (server.tls_cluster && - c->conn && connGetType(c->conn) != CONN_TYPE_TLS); + c->conn && (c->conn->type != connectionTypeTls())); addReplyLongLong(c, use_pport && node->pport ? node->pport : node->port); addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN); @@ -5327,7 +5343,7 @@ NULL /* Report plaintext ports, only if cluster is TLS but client is known to * be non-TLS). */ int use_pport = (server.tls_cluster && - c->conn && connGetType(c->conn) != CONN_TYPE_TLS); + c->conn && (c->conn->type != connectionTypeTls())); sds nodes = clusterGenNodesDescription(0, use_pport); addReplyVerbatim(c,nodes,sdslen(nodes),"txt"); sdsfree(nodes); @@ -5759,7 +5775,7 @@ NULL /* Use plaintext port if cluster is TLS but client is non-TLS. */ int use_pport = (server.tls_cluster && - c->conn && connGetType(c->conn) != CONN_TYPE_TLS); + c->conn && (c->conn->type != connectionTypeTls())); addReplyArrayLen(c,n->numslaves); for (j = 0; j < n->numslaves; j++) { sds ni = clusterGenNodeDescription(n->slaves[j], use_pport); @@ -6175,8 +6191,8 @@ migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long ti dictDelete(server.migrate_cached_sockets,dictGetKey(de)); } - /* Create the socket */ - conn = server.tls_cluster ? connCreateTLS() : connCreateSocket(); + /* Create the connection */ + conn = connCreate(connTypeOfCluster()); if (connBlockingConnect(conn, host->ptr, atoi(port->ptr), timeout) != C_OK) { addReplyError(c,"-IOERR error or timeout connecting to the client"); @@ -6888,7 +6904,7 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co /* Redirect to IP:port. Include plaintext port if cluster is TLS but * client is non-TLS. */ int use_pport = (server.tls_cluster && - c->conn && connGetType(c->conn) != CONN_TYPE_TLS); + c->conn && (c->conn->type != connectionTypeTls())); int port = use_pport && n->pport ? n->pport : n->port; addReplyErrorSds(c,sdscatprintf(sdsempty(), "-%s %d %s:%d", diff --git a/src/config.c b/src/config.c index f98225cbe..20a918dd8 100644 --- a/src/config.c +++ b/src/config.c @@ -30,6 +30,7 @@ #include "server.h" #include "cluster.h" +#include "connection.h" #include <fcntl.h> #include <sys/stat.h> @@ -2429,7 +2430,14 @@ static int updateHZ(const char **err) { } static int updatePort(const char **err) { - if (changeListenPort(server.port, &server.ipfd, acceptTcpHandler) == C_ERR) { + connListener *listener = listenerByType(CONN_TYPE_SOCKET); + + serverAssert(listener != NULL); + listener->bindaddr = server.bindaddr; + listener->bindaddr_count = server.bindaddr_count; + listener->port = server.port; + listener->ct = connectionByType(CONN_TYPE_SOCKET); + if (changeListener(listener) == C_ERR) { *err = "Unable to listen on this port. Check server logs."; return 0; } @@ -2544,12 +2552,36 @@ int updateRequirePass(const char **err) { return 1; } +/* applyBind affects both TCP and TLS (if enabled) together */ static int applyBind(const char **err) { - if (changeBindAddr() == C_ERR) { + connListener *tcp_listener = listenerByType(CONN_TYPE_SOCKET); + connListener *tls_listener = listenerByType(CONN_TYPE_TLS); + + serverAssert(tcp_listener != NULL); + tcp_listener->bindaddr = server.bindaddr; + tcp_listener->bindaddr_count = server.bindaddr_count; + tcp_listener->port = server.port; + tcp_listener->ct = connectionByType(CONN_TYPE_SOCKET); + if (changeListener(tcp_listener) == C_ERR) { *err = "Failed to bind to specified addresses."; + if (tls_listener) + closeListener(tls_listener); /* failed with TLS together */ return 0; } + if (server.tls_port != 0) { + serverAssert(tls_listener != NULL); + tls_listener->bindaddr = server.bindaddr; + tls_listener->bindaddr_count = server.bindaddr_count; + tls_listener->port = server.tls_port; + tls_listener->ct = connectionByType(CONN_TYPE_TLS); + if (changeListener(tls_listener) == C_ERR) { + *err = "Failed to bind to specified addresses."; + closeListener(tcp_listener); /* failed with TCP together */ + return 0; + } + } + return 1; } @@ -2571,13 +2603,12 @@ int updateClusterHostname(const char **err) { return 1; } -#ifdef USE_OPENSSL static int applyTlsCfg(const char **err) { UNUSED(err); /* If TLS is enabled, try to configure OpenSSL. */ if ((server.tls_port || server.tls_replication || server.tls_cluster) - && tlsConfigure(&server.tls_ctx_config) == C_ERR) { + && connTypeConfigure(connectionTypeTls(), &server.tls_ctx_config, 1) == C_ERR) { *err = "Unable to update TLS configuration. Check server logs."; return 0; } @@ -2586,12 +2617,18 @@ static int applyTlsCfg(const char **err) { static int applyTLSPort(const char **err) { /* Configure TLS in case it wasn't enabled */ - if (!isTlsConfigured() && tlsConfigure(&server.tls_ctx_config) == C_ERR) { + if (connTypeConfigure(connectionTypeTls(), &server.tls_ctx_config, 0) == C_ERR) { *err = "Unable to update TLS configuration. Check server logs."; return 0; } - if (changeListenPort(server.tls_port, &server.tlsfd, acceptTLSHandler) == C_ERR) { + connListener *listener = listenerByType(CONN_TYPE_TLS); + serverAssert(listener != NULL); + listener->bindaddr = server.bindaddr; + listener->bindaddr_count = server.bindaddr_count; + listener->port = server.tls_port; + listener->ct = connectionByType(CONN_TYPE_TLS); + if (changeListener(listener) == C_ERR) { *err = "Unable to listen on this port. Check server logs."; return 0; } @@ -2599,8 +2636,6 @@ static int applyTLSPort(const char **err) { return 1; } -#endif /* USE_OPENSSL */ - static int setConfigDirOption(standardConfig *config, sds *argv, int argc, const char **err) { UNUSED(config); if (argc != 1) { @@ -3109,7 +3144,6 @@ standardConfig static_configs[] = { createOffTConfig("auto-aof-rewrite-min-size", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.aof_rewrite_min_size, 64*1024*1024, MEMORY_CONFIG, NULL, NULL), createOffTConfig("loading-process-events-interval-bytes", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 1024, INT_MAX, server.loading_process_events_interval_bytes, 1024*1024*2, INTEGER_CONFIG, NULL, NULL), -#ifdef USE_OPENSSL createIntConfig("tls-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.tls_port, 0, INTEGER_CONFIG, NULL, applyTLSPort), /* TCP port. */ createIntConfig("tls-session-cache-size", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.tls_ctx_config.session_cache_size, 20*1024, INTEGER_CONFIG, NULL, applyTlsCfg), createIntConfig("tls-session-cache-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.tls_ctx_config.session_cache_timeout, 300, INTEGER_CONFIG, NULL, applyTlsCfg), @@ -3130,7 +3164,6 @@ standardConfig static_configs[] = { createStringConfig("tls-protocols", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.tls_ctx_config.protocols, NULL, NULL, applyTlsCfg), createStringConfig("tls-ciphers", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.tls_ctx_config.ciphers, NULL, NULL, applyTlsCfg), createStringConfig("tls-ciphersuites", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.tls_ctx_config.ciphersuites, NULL, NULL, applyTlsCfg), -#endif /* Special configs */ createSpecialConfig("dir", NULL, MODIFIABLE_CONFIG | PROTECTED_CONFIG | DENY_LOADING_CONFIG, setConfigDirOption, getConfigDirOption, rewriteConfigDirOption, NULL), diff --git a/src/connection.c b/src/connection.c index f61ed2404..5a5cd2767 100644 --- a/src/connection.c +++ b/src/connection.c @@ -1,453 +1,204 @@ -/* - * Copyright (c) 2019, Redis Labs - * All rights reserved. +/* ========================================================================== + * connection.c - connection layer framework + * -------------------------------------------------------------------------- + * Copyright (C) 2022 zhenwei pi * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to permit + * persons to whom the Software is furnished to do so, subject to the + * following conditions: * - * * Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Redis nor the names of its contributors may be used - * to endorse or promote products derived from this software without - * specific prior written permission. + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS + * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE + * USE OR OTHER DEALINGS IN THE SOFTWARE. + * ========================================================================== */ #include "server.h" -#include "connhelpers.h" +#include "connection.h" -/* The connections module provides a lean abstraction of network connections - * to avoid direct socket and async event management across the Redis code base. - * - * It does NOT provide advanced connection features commonly found in similar - * libraries such as complete in/out buffer management, throttling, etc. These - * functions remain in networking.c. - * - * The primary goal is to allow transparent handling of TCP and TLS based - * connections. To do so, connections have the following properties: - * - * 1. A connection may live before its corresponding socket exists. This - * allows various context and configuration setting to be handled before - * establishing the actual connection. - * 2. The caller may register/unregister logical read/write handlers to be - * called when the connection has data to read from/can accept writes. - * These logical handlers may or may not correspond to actual AE events, - * depending on the implementation (for TCP they are; for TLS they aren't). - */ - -ConnectionType CT_Socket; - -/* When a connection is created we must know its type already, but the - * underlying socket may or may not exist: - * - * - For accepted connections, it exists as we do not model the listen/accept - * part; So caller calls connCreateSocket() followed by connAccept(). - * - For outgoing connections, the socket is created by the connection module - * itself; So caller calls connCreateSocket() followed by connConnect(), - * which registers a connect callback that fires on connected/error state - * (and after any transport level handshake was done). - * - * NOTE: An earlier version relied on connections being part of other structs - * and not independently allocated. This could lead to further optimizations - * like using container_of(), etc. However it was discontinued in favor of - * this approach for these reasons: - * - * 1. In some cases conns are created/handled outside the context of the - * containing struct, in which case it gets a bit awkward to copy them. - * 2. Future implementations may wish to allocate arbitrary data for the - * connection. - * 3. The container_of() approach is anyway risky because connections may - * be embedded in different structs, not just client. - */ - -connection *connCreateSocket() { - connection *conn = zcalloc(sizeof(connection)); - conn->type = &CT_Socket; - conn->fd = -1; +static ConnectionType *connTypes[CONN_TYPE_MAX]; - return conn; -} +int connTypeRegister(ConnectionType *ct) { + const char *typename = ct->get_type(NULL); + ConnectionType *tmpct; + int type; -/* Create a new socket-type connection that is already associated with - * an accepted connection. - * - * The socket is not ready for I/O until connAccept() was called and - * invoked the connection-level accept handler. - * - * Callers should use connGetState() and verify the created connection - * is not in an error state (which is not possible for a socket connection, - * but could but possible with other protocols). - */ -connection *connCreateAcceptedSocket(int fd) { - connection *conn = connCreateSocket(); - conn->fd = fd; - conn->state = CONN_STATE_ACCEPTING; - return conn; -} + /* find an empty slot to store the new connection type */ + for (type = 0; type < CONN_TYPE_MAX; type++) { + tmpct = connTypes[type]; + if (!tmpct) + break; -static int connSocketConnect(connection *conn, const char *addr, int port, const char *src_addr, - ConnectionCallbackFunc connect_handler) { - int fd = anetTcpNonBlockBestEffortBindConnect(NULL,addr,port,src_addr); - if (fd == -1) { - conn->state = CONN_STATE_ERROR; - conn->last_errno = errno; - return C_ERR; + /* ignore case, we really don't care "tls"/"TLS" */ + if (!strcasecmp(typename, tmpct->get_type(NULL))) { + serverLog(LL_WARNING, "Connection types %s already registered", typename); + return C_ERR; + } } - conn->fd = fd; - conn->state = CONN_STATE_CONNECTING; + serverLog(LL_VERBOSE, "Connection type %s registered", typename); + connTypes[type] = ct; - conn->conn_handler = connect_handler; - aeCreateFileEvent(server.el, conn->fd, AE_WRITABLE, - conn->type->ae_handler, conn); + if (ct->init) { + ct->init(); + } return C_OK; } -/* Returns true if a write handler is registered */ -int connHasWriteHandler(connection *conn) { - return conn->write_handler != NULL; -} +int connTypeInitialize() { + /* currently socket connection type is necessary */ + serverAssert(RedisRegisterConnectionTypeSocket() == C_OK); -/* Returns true if a read handler is registered */ -int connHasReadHandler(connection *conn) { - return conn->read_handler != NULL; -} + /* currently unix socket connection type is necessary */ + serverAssert(RedisRegisterConnectionTypeUnix() == C_OK); -/* Associate a private data pointer with the connection */ -void connSetPrivateData(connection *conn, void *data) { - conn->private_data = data; -} + /* may fail if without BUILD_TLS=yes */ + RedisRegisterConnectionTypeTLS(); -/* Get the associated private data pointer */ -void *connGetPrivateData(connection *conn) { - return conn->private_data; + return C_OK; } -/* ------ Pure socket connections ------- */ +ConnectionType *connectionByType(const char *typename) { + ConnectionType *ct; -/* A very incomplete list of implementation-specific calls. Much of the above shall - * move here as we implement additional connection types. - */ + for (int type = 0; type < CONN_TYPE_MAX; type++) { + ct = connTypes[type]; + if (!ct) + break; -/* Close the connection and free resources. */ -static void connSocketClose(connection *conn) { - if (conn->fd != -1) { - aeDeleteFileEvent(server.el,conn->fd, AE_READABLE | AE_WRITABLE); - close(conn->fd); - conn->fd = -1; + if (!strcasecmp(typename, ct->get_type(NULL))) + return ct; } - /* If called from within a handler, schedule the close but - * keep the connection until the handler returns. - */ - if (connHasRefs(conn)) { - conn->flags |= CONN_FLAG_CLOSE_SCHEDULED; - return; - } + serverLog(LL_WARNING, "Missing implement of connection type %s", typename); - zfree(conn); + return NULL; } -static int connSocketWrite(connection *conn, const void *data, size_t data_len) { - int ret = write(conn->fd, data, data_len); - if (ret < 0 && errno != EAGAIN) { - conn->last_errno = errno; - - /* Don't overwrite the state of a connection that is not already - * connected, not to mess with handler callbacks. - */ - if (errno != EINTR && conn->state == CONN_STATE_CONNECTED) - conn->state = CONN_STATE_ERROR; - } +/* Cache TCP connection type, query it by string once */ +ConnectionType *connectionTypeTcp() { + static ConnectionType *ct_tcp = NULL; - return ret; -} - -static int connSocketWritev(connection *conn, const struct iovec *iov, int iovcnt) { - int ret = writev(conn->fd, iov, iovcnt); - if (ret < 0 && errno != EAGAIN) { - conn->last_errno = errno; - - /* Don't overwrite the state of a connection that is not already - * connected, not to mess with handler callbacks. - */ - if (errno != EINTR && conn->state == CONN_STATE_CONNECTED) - conn->state = CONN_STATE_ERROR; - } - - return ret; -} + if (ct_tcp != NULL) + return ct_tcp; -static int connSocketRead(connection *conn, void *buf, size_t buf_len) { - int ret = read(conn->fd, buf, buf_len); - if (!ret) { - conn->state = CONN_STATE_CLOSED; - } else if (ret < 0 && errno != EAGAIN) { - conn->last_errno = errno; - - /* Don't overwrite the state of a connection that is not already - * connected, not to mess with handler callbacks. - */ - if (errno != EINTR && conn->state == CONN_STATE_CONNECTED) - conn->state = CONN_STATE_ERROR; - } + ct_tcp = connectionByType(CONN_TYPE_SOCKET); + serverAssert(ct_tcp != NULL); - return ret; + return ct_tcp; } -static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) { - int ret = C_OK; +/* Cache TLS connection type, query it by string once */ +ConnectionType *connectionTypeTls() { + static ConnectionType *ct_tls = NULL; - if (conn->state != CONN_STATE_ACCEPTING) return C_ERR; - conn->state = CONN_STATE_CONNECTED; + if (ct_tls != NULL) + return ct_tls; - connIncrRefs(conn); - if (!callHandler(conn, accept_handler)) ret = C_ERR; - connDecrRefs(conn); - - return ret; + ct_tls = connectionByType(CONN_TYPE_TLS); + return ct_tls; } -/* Register a write handler, to be called when the connection is writable. - * If NULL, the existing handler is removed. - * - * The barrier flag indicates a write barrier is requested, resulting with - * CONN_FLAG_WRITE_BARRIER set. This will ensure that the write handler is - * always called before and not after the read handler in a single event - * loop. - */ -static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) { - if (func == conn->write_handler) return C_OK; - - conn->write_handler = func; - if (barrier) - conn->flags |= CONN_FLAG_WRITE_BARRIER; - else - conn->flags &= ~CONN_FLAG_WRITE_BARRIER; - if (!conn->write_handler) - aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE); - else - if (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE, - conn->type->ae_handler,conn) == AE_ERR) return C_ERR; - return C_OK; -} +/* Cache Unix connection type, query it by string once */ +ConnectionType *connectionTypeUnix() { + static ConnectionType *ct_unix = NULL; -/* Register a read handler, to be called when the connection is readable. - * If NULL, the existing handler is removed. - */ -static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) { - if (func == conn->read_handler) return C_OK; - - conn->read_handler = func; - if (!conn->read_handler) - aeDeleteFileEvent(server.el,conn->fd,AE_READABLE); - else - if (aeCreateFileEvent(server.el,conn->fd, - AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR; - return C_OK; -} + if (ct_unix != NULL) + return ct_unix; -static const char *connSocketGetLastError(connection *conn) { - return strerror(conn->last_errno); + ct_unix = connectionByType(CONN_TYPE_UNIX); + return ct_unix; } -static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) -{ - UNUSED(el); - UNUSED(fd); - connection *conn = clientData; - - if (conn->state == CONN_STATE_CONNECTING && - (mask & AE_WRITABLE) && conn->conn_handler) { - - int conn_error = connGetSocketError(conn); - if (conn_error) { - conn->last_errno = conn_error; - conn->state = CONN_STATE_ERROR; - } else { - conn->state = CONN_STATE_CONNECTED; - } - - if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE); +int connectionIndexByType(const char *typename) { + ConnectionType *ct; - if (!callHandler(conn, conn->conn_handler)) return; - conn->conn_handler = NULL; - } - - /* Normally we execute the readable event first, and the writable - * event later. This is useful as sometimes we may be able - * to serve the reply of a query immediately after processing the - * query. - * - * However if WRITE_BARRIER is set in the mask, our application is - * asking us to do the reverse: never fire the writable event - * after the readable. In such a case, we invert the calls. - * This is useful when, for instance, we want to do things - * in the beforeSleep() hook, like fsync'ing a file to disk, - * before replying to a client. */ - int invert = conn->flags & CONN_FLAG_WRITE_BARRIER; - - int call_write = (mask & AE_WRITABLE) && conn->write_handler; - int call_read = (mask & AE_READABLE) && conn->read_handler; - - /* Handle normal I/O flows */ - if (!invert && call_read) { - if (!callHandler(conn, conn->read_handler)) return; - } - /* Fire the writable event. */ - if (call_write) { - if (!callHandler(conn, conn->write_handler)) return; - } - /* If we have to invert the call, fire the readable event now - * after the writable one. */ - if (invert && call_read) { - if (!callHandler(conn, conn->read_handler)) return; - } -} - -static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) { - int fd = anetTcpNonBlockConnect(NULL,addr,port); - if (fd == -1) { - conn->state = CONN_STATE_ERROR; - conn->last_errno = errno; - return C_ERR; - } + for (int type = 0; type < CONN_TYPE_MAX; type++) { + ct = connTypes[type]; + if (!ct) + break; - if ((aeWait(fd, AE_WRITABLE, timeout) & AE_WRITABLE) == 0) { - conn->state = CONN_STATE_ERROR; - conn->last_errno = ETIMEDOUT; + if (!strcasecmp(typename, ct->get_type(NULL))) + return type; } - conn->fd = fd; - conn->state = CONN_STATE_CONNECTED; - return C_OK; + return -1; } -/* Connection-based versions of syncio.c functions. - * NOTE: This should ideally be refactored out in favor of pure async work. - */ - -static ssize_t connSocketSyncWrite(connection *conn, char *ptr, ssize_t size, long long timeout) { - return syncWrite(conn->fd, ptr, size, timeout); -} +void connTypeCleanupAll() { + ConnectionType *ct; + int type; -static ssize_t connSocketSyncRead(connection *conn, char *ptr, ssize_t size, long long timeout) { - return syncRead(conn->fd, ptr, size, timeout); -} + for (type = 0; type < CONN_TYPE_MAX; type++) { + ct = connTypes[type]; + if (!ct) + break; -static ssize_t connSocketSyncReadLine(connection *conn, char *ptr, ssize_t size, long long timeout) { - return syncReadLine(conn->fd, ptr, size, timeout); -} - -static int connSocketGetType(connection *conn) { - (void) conn; - - return CONN_TYPE_SOCKET; -} - -ConnectionType CT_Socket = { - .ae_handler = connSocketEventHandler, - .close = connSocketClose, - .write = connSocketWrite, - .writev = connSocketWritev, - .read = connSocketRead, - .accept = connSocketAccept, - .connect = connSocketConnect, - .set_write_handler = connSocketSetWriteHandler, - .set_read_handler = connSocketSetReadHandler, - .get_last_error = connSocketGetLastError, - .blocking_connect = connSocketBlockingConnect, - .sync_write = connSocketSyncWrite, - .sync_read = connSocketSyncRead, - .sync_readline = connSocketSyncReadLine, - .get_type = connSocketGetType -}; - - -int connGetSocketError(connection *conn) { - int sockerr = 0; - socklen_t errlen = sizeof(sockerr); - - if (getsockopt(conn->fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) - sockerr = errno; - return sockerr; -} - -int connPeerToString(connection *conn, char *ip, size_t ip_len, int *port) { - if (anetFdToString(conn ? conn->fd : -1, ip, ip_len, port, FD_TO_PEER_NAME) == -1) { - if (conn) conn->last_errno = errno; - return C_ERR; + if (ct->cleanup) + ct->cleanup(); } - return C_OK; } -int connSockName(connection *conn, char *ip, size_t ip_len, int *port) { - return anetFdToString(conn->fd, ip, ip_len, port, FD_TO_SOCK_NAME); -} +/* walk all the connection types until has pending data */ +int connTypeHasPendingData(void) { + ConnectionType *ct; + int type; + int ret = 0; -int connFormatFdAddr(connection *conn, char *buf, size_t buf_len, int fd_to_str_type) { - return anetFormatFdAddr(conn ? conn->fd : -1, buf, buf_len, fd_to_str_type); -} + for (type = 0; type < CONN_TYPE_MAX; type++) { + ct = connTypes[type]; + if (ct && ct->has_pending_data && (ret = ct->has_pending_data())) { + return ret; + } + } -int connBlock(connection *conn) { - if (conn->fd == -1) return C_ERR; - return anetBlock(NULL, conn->fd); + return ret; } -int connNonBlock(connection *conn) { - if (conn->fd == -1) return C_ERR; - return anetNonBlock(NULL, conn->fd); -} +/* walk all the connection types and process pending data for each connection type */ +int connTypeProcessPendingData(void) { + ConnectionType *ct; + int type; + int ret = 0; -int connEnableTcpNoDelay(connection *conn) { - if (conn->fd == -1) return C_ERR; - return anetEnableTcpNoDelay(NULL, conn->fd); -} + for (type = 0; type < CONN_TYPE_MAX; type++) { + ct = connTypes[type]; + if (ct && ct->process_pending_data) { + ret += ct->process_pending_data(); + } + } -int connDisableTcpNoDelay(connection *conn) { - if (conn->fd == -1) return C_ERR; - return anetDisableTcpNoDelay(NULL, conn->fd); + return ret; } -int connKeepAlive(connection *conn, int interval) { - if (conn->fd == -1) return C_ERR; - return anetKeepAlive(NULL, conn->fd, interval); -} +sds getListensInfoString(sds info) { + for (int j = 0; j < CONN_TYPE_MAX; j++) { + connListener *listener = &server.listeners[j]; + if (listener->ct == NULL) + continue; -int connSendTimeout(connection *conn, long long ms) { - return anetSendTimeout(NULL, conn->fd, ms); -} + info = sdscatfmt(info, "listener%i:name=%s", j, listener->ct->get_type(NULL)); + for (int i = 0; i < listener->count; i++) { + info = sdscatfmt(info, ",bind=%s", listener->bindaddr[i]); + } -int connRecvTimeout(connection *conn, long long ms) { - return anetRecvTimeout(NULL, conn->fd, ms); -} + if (listener->port) + info = sdscatfmt(info, ",port=%i", listener->port); -int connGetState(connection *conn) { - return conn->state; -} + info = sdscatfmt(info, "\r\n"); + } -/* Return a text that describes the connection, suitable for inclusion - * in CLIENT LIST and similar outputs. - * - * For sockets, we always return "fd=<fdnum>" to maintain compatibility. - */ -const char *connGetInfo(connection *conn, char *buf, size_t buf_len) { - snprintf(buf, buf_len-1, "fd=%i", conn == NULL ? -1 : conn->fd); - return buf; + return info; } - diff --git a/src/connection.h b/src/connection.h index dad2e2fd6..61ca205fe 100644 --- a/src/connection.h +++ b/src/connection.h @@ -32,12 +32,19 @@ #define __REDIS_CONNECTION_H #include <errno.h> +#include <stdio.h> +#include <string.h> #include <sys/uio.h> +#include "ae.h" + #define CONN_INFO_LEN 32 +#define CONN_ADDR_STR_LEN 128 /* Similar to INET6_ADDRSTRLEN, hoping to handle other protocols. */ +#define MAX_ACCEPTS_PER_CALL 1000 struct aeEventLoop; typedef struct connection connection; +typedef struct connListener connListener; typedef enum { CONN_STATE_NONE = 0, @@ -51,27 +58,55 @@ typedef enum { #define CONN_FLAG_CLOSE_SCHEDULED (1<<0) /* Closed scheduled by a handler */ #define CONN_FLAG_WRITE_BARRIER (1<<1) /* Write barrier requested */ -#define CONN_TYPE_SOCKET 1 -#define CONN_TYPE_TLS 2 +#define CONN_TYPE_SOCKET "tcp" +#define CONN_TYPE_UNIX "unix" +#define CONN_TYPE_TLS "tls" +#define CONN_TYPE_MAX 8 /* 8 is enough to be extendable */ typedef void (*ConnectionCallbackFunc)(struct connection *conn); typedef struct ConnectionType { + /* connection type */ + const char *(*get_type)(struct connection *conn); + + /* connection type initialize & finalize & configure */ + void (*init)(void); /* auto-call during register */ + void (*cleanup)(void); + int (*configure)(void *priv, int reconfigure); + + /* ae & accept & listen & error & address handler */ void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask); + aeFileProc *accept_handler; + int (*addr)(connection *conn, char *ip, size_t ip_len, int *port, int remote); + int (*listen)(connListener *listener); + + /* create/close connection */ + connection* (*conn_create)(void); + connection* (*conn_create_accepted)(int fd, void *priv); + void (*close)(struct connection *conn); + + /* connect & accept */ int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler); + int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout); + int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler); + + /* IO */ int (*write)(struct connection *conn, const void *data, size_t data_len); int (*writev)(struct connection *conn, const struct iovec *iov, int iovcnt); int (*read)(struct connection *conn, void *buf, size_t buf_len); - void (*close)(struct connection *conn); - int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler); int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier); int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler); const char *(*get_last_error)(struct connection *conn); - int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout); ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout); ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout); ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout); - int (*get_type)(struct connection *conn); + + /* pending data */ + int (*has_pending_data)(void); + int (*process_pending_data)(void); + + /* TLS specified methods */ + sds (*get_peer_cert)(struct connection *conn); } ConnectionType; struct connection { @@ -87,6 +122,19 @@ struct connection { int fd; }; +#define CONFIG_BINDADDR_MAX 16 + +/* Setup a listener by a connection type */ +struct connListener { + int fd[CONFIG_BINDADDR_MAX]; + int count; + char **bindaddr; + int bindaddr_count; + int port; + ConnectionType *ct; + void *priv; /* used by connection type specified data */ +}; + /* The connection module does not deal with listening and accepting sockets, * so we assume we have a socket when an incoming connection is created. * @@ -216,7 +264,7 @@ static inline ssize_t connSyncReadLine(connection *conn, char *ptr, ssize_t size } /* Return CONN_TYPE_* for the specified connection */ -static inline int connGetType(connection *conn) { +static inline const char *connGetType(connection *conn) { return conn->type->get_type(conn); } @@ -224,18 +272,77 @@ static inline int connLastErrorRetryable(connection *conn) { return conn->last_errno == EINTR; } -connection *connCreateSocket(); -connection *connCreateAcceptedSocket(int fd); +/* Get address information of a connection. + * remote works as boolean type to get local/remote address */ +static inline int connAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) { + if (conn && conn->type->addr) { + return conn->type->addr(conn, ip, ip_len, port, remote); + } -connection *connCreateTLS(); -connection *connCreateAcceptedTLS(int fd, int require_auth); + return -1; +} -void connSetPrivateData(connection *conn, void *data); -void *connGetPrivateData(connection *conn); -int connGetState(connection *conn); -int connHasWriteHandler(connection *conn); -int connHasReadHandler(connection *conn); -int connGetSocketError(connection *conn); +/* Format an IP,port pair into something easy to parse. If IP is IPv6 + * (matches for ":"), the ip is surrounded by []. IP and port are just + * separated by colons. This the standard to display addresses within Redis. */ +static inline int formatAddr(char *buf, size_t buf_len, char *ip, int port) { + return snprintf(buf, buf_len, strchr(ip,':') ? + "[%s]:%d" : "%s:%d", ip, port); +} + +static inline int connFormatAddr(connection *conn, char *buf, size_t buf_len, int remote) +{ + char ip[CONN_ADDR_STR_LEN]; + int port; + + if (connAddr(conn, ip, sizeof(ip), &port, remote) < 0) { + return -1; + } + + return formatAddr(buf, buf_len, ip, port); +} + +static inline int connAddrPeerName(connection *conn, char *ip, size_t ip_len, int *port) { + return connAddr(conn, ip, ip_len, port, 1); +} + +static inline int connAddrSockName(connection *conn, char *ip, size_t ip_len, int *port) { + return connAddr(conn, ip, ip_len, port, 0); +} + +static inline int connGetState(connection *conn) { + return conn->state; +} + +/* Returns true if a write handler is registered */ +static inline int connHasWriteHandler(connection *conn) { + return conn->write_handler != NULL; +} + +/* Returns true if a read handler is registered */ +static inline int connHasReadHandler(connection *conn) { + return conn->read_handler != NULL; +} + +/* Associate a private data pointer with the connection */ +static inline void connSetPrivateData(connection *conn, void *data) { + conn->private_data = data; +} + +/* Get the associated private data pointer */ +static inline void *connGetPrivateData(connection *conn) { + return conn->private_data; +} + +/* Return a text that describes the connection, suitable for inclusion + * in CLIENT LIST and similar outputs. + * + * For sockets, we always return "fd=<fdnum>" to maintain compatibility. + */ +static inline const char *connGetInfo(connection *conn, char *buf, size_t buf_len) { + snprintf(buf, buf_len-1, "fd=%i", conn == NULL ? -1 : conn->fd); + return buf; +} /* anet-style wrappers to conns */ int connBlock(connection *conn); @@ -245,14 +352,81 @@ int connDisableTcpNoDelay(connection *conn); int connKeepAlive(connection *conn, int interval); int connSendTimeout(connection *conn, long long ms); int connRecvTimeout(connection *conn, long long ms); -int connPeerToString(connection *conn, char *ip, size_t ip_len, int *port); -int connFormatFdAddr(connection *conn, char *buf, size_t buf_len, int fd_to_str_type); -int connSockName(connection *conn, char *ip, size_t ip_len, int *port); -const char *connGetInfo(connection *conn, char *buf, size_t buf_len); - -/* Helpers for tls special considerations */ -sds connTLSGetPeerCert(connection *conn); -int tlsHasPendingData(); -int tlsProcessPendingData(); + +/* Get cert for the secure connection */ +static inline sds connGetPeerCert(connection *conn) { + if (conn->type->get_peer_cert) { + return conn->type->get_peer_cert(conn); + } + + return NULL; +} + +/* Initialize the redis connection framework */ +int connTypeInitialize(); + +/* Register a connection type into redis connection framework */ +int connTypeRegister(ConnectionType *ct); + +/* Lookup a connection type by type name */ +ConnectionType *connectionByType(const char *typename); + +/* Fast path to get TCP connection type */ +ConnectionType *connectionTypeTcp(); + +/* Fast path to get TLS connection type */ +ConnectionType *connectionTypeTls(); + +/* Fast path to get Unix connection type */ +ConnectionType *connectionTypeUnix(); + +/* Lookup the index of a connection type by type name, return -1 if not found */ +int connectionIndexByType(const char *typename); + +/* Create a connection of specified type */ +static inline connection *connCreate(ConnectionType *ct) { + return ct->conn_create(); +} + +/* Create an accepted connection of specified type. + * priv is connection type specified argument */ +static inline connection *connCreateAccepted(ConnectionType *ct, int fd, void *priv) { + return ct->conn_create_accepted(fd, priv); +} + +/* Configure a connection type. A typical case is to configure TLS. + * priv is connection type specified, + * reconfigure is boolean type to specify if overwrite the original config */ +static inline int connTypeConfigure(ConnectionType *ct, void *priv, int reconfigure) { + return ct->configure(priv, reconfigure); +} + +/* Walk all the connection types and cleanup them all if possible */ +void connTypeCleanupAll(); + +/* Test all the connection type has pending data or not. */ +int connTypeHasPendingData(void); + +/* walk all the connection types and process pending data for each connection type */ +int connTypeProcessPendingData(void); + +/* Listen on an initialized listener */ +static inline int connListen(connListener *listener) { + return listener->ct->listen(listener); +} + +/* Get accept_handler of a connection type */ +static inline aeFileProc *connAcceptHandler(ConnectionType *ct) { + if (ct) + return ct->accept_handler; + return NULL; +} + +/* Get Listeners information, note that caller should free the non-empty string */ +sds getListensInfoString(sds info); + +int RedisRegisterConnectionTypeSocket(); +int RedisRegisterConnectionTypeUnix(); +int RedisRegisterConnectionTypeTLS(); #endif /* __REDIS_CONNECTION_H */ diff --git a/src/mkreleasehdr.sh b/src/mkreleasehdr.sh index 236c26c2b..117b9e86f 100755 --- a/src/mkreleasehdr.sh +++ b/src/mkreleasehdr.sh @@ -11,4 +11,6 @@ test -f release.h || touch release.h echo "#define REDIS_GIT_SHA1 \"$GIT_SHA1\"" > release.h echo "#define REDIS_GIT_DIRTY \"$GIT_DIRTY\"" >> release.h echo "#define REDIS_BUILD_ID \"$BUILD_ID\"" >> release.h +echo "#include \"version.h\"" >> release.h +echo "#define REDIS_BUILD_ID_RAW REDIS_VERSION REDIS_BUILD_ID REDIS_GIT_DIRTY REDIS_GIT_SHA1" >> release.h touch release.c # Force recompile of release.c diff --git a/src/module.c b/src/module.c index d915b56a1..f0f49837b 100644 --- a/src/module.c +++ b/src/module.c @@ -69,14 +69,14 @@ * pointers that have an API the module can call with them) * -------------------------------------------------------------------------- */ -typedef struct RedisModuleInfoCtx { +struct RedisModuleInfoCtx { struct RedisModule *module; dict *requested_sections; sds info; /* info string we collected so far */ int sections; /* number of sections we collected so far */ int in_section; /* indication if we're in an active section or not */ int in_dict_field; /* indication that we're currently appending to a dict */ -} RedisModuleInfoCtx; +}; /* This represents a shared API. Shared APIs will be used to populate * the server.sharedapi dictionary, mapping names of APIs exported by @@ -3327,11 +3327,11 @@ int modulePopulateClientInfoStructure(void *ci, client *client, int structver) { ci1->flags |= REDISMODULE_CLIENTINFO_FLAG_TRACKING; if (client->flags & CLIENT_BLOCKED) ci1->flags |= REDISMODULE_CLIENTINFO_FLAG_BLOCKED; - if (connGetType(client->conn) == CONN_TYPE_TLS) + if (client->conn->type == connectionTypeTls()) ci1->flags |= REDISMODULE_CLIENTINFO_FLAG_SSL; int port; - connPeerToString(client->conn,ci1->addr,sizeof(ci1->addr),&port); + connAddrPeerName(client->conn,ci1->addr,sizeof(ci1->addr),&port); ci1->port = port; ci1->db = client->db->id; ci1->id = client->id; @@ -3529,6 +3529,8 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) { * * * REDISMODULE_CTX_FLAGS_RESP3: Indicate the that client attached to this * context is using RESP3. + * + * * REDISMODULE_CTX_FLAGS_SERVER_STARTUP: The Redis instance is starting */ int RM_GetContextFlags(RedisModuleCtx *ctx) { int flags = 0; @@ -3614,6 +3616,10 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) { if (hasActiveChildProcess()) flags |= REDISMODULE_CTX_FLAGS_ACTIVE_CHILD; if (server.in_fork_child) flags |= REDISMODULE_CTX_FLAGS_IS_CHILD; + /* Non-empty server.loadmodule_queue means that Redis is starting. */ + if (listLength(server.loadmodule_queue) > 0) + flags |= REDISMODULE_CTX_FLAGS_SERVER_STARTUP; + return flags; } @@ -8946,7 +8952,7 @@ RedisModuleString *RM_GetClientCertificate(RedisModuleCtx *ctx, uint64_t client_ client *c = lookupClientByID(client_id); if (c == NULL) return NULL; - sds cert = connTLSGetPeerCert(c->conn); + sds cert = connGetPeerCert(c->conn); if (!cert) return NULL; RedisModuleString *s = createObject(OBJ_STRING, cert); @@ -12205,13 +12211,13 @@ const char *RM_GetCurrentCommandName(RedisModuleCtx *ctx) { /* The defrag context, used to manage state during calls to the data type * defrag callback. */ -typedef struct RedisModuleDefragCtx { +struct RedisModuleDefragCtx { long defragged; long long int endtime; unsigned long *cursor; struct redisObject *key; /* Optional name of key processed, NULL when unknown. */ int dbid; /* The dbid of the key being processed, -1 when unknown. */ -} RedisModuleDefragCtx; +}; /* Register a defrag callback for global data, i.e. anything that the module * may allocate that is not tied to a specific data type. diff --git a/src/networking.c b/src/networking.c index d6471ee7c..7b3225fe9 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1212,7 +1212,7 @@ int islocalClient(client *c) { /* tcp */ char cip[NET_IP_STR_LEN+1] = { 0 }; - connPeerToString(c->conn, cip, sizeof(cip)-1, NULL); + connAddrPeerName(c->conn, cip, sizeof(cip)-1, NULL); return !strcmp(cip,"127.0.0.1") || !strcmp(cip,"::1"); } @@ -1271,8 +1271,7 @@ void clientAcceptHandler(connection *conn) { c); } -#define MAX_ACCEPTS_PER_CALL 1000 -static void acceptCommonHandler(connection *conn, int flags, char *ip) { +void acceptCommonHandler(connection *conn, int flags, char *ip) { client *c; char conninfo[100]; UNUSED(ip); @@ -1344,65 +1343,6 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip) { } } -void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - int cport, cfd, max = MAX_ACCEPTS_PER_CALL; - char cip[NET_IP_STR_LEN]; - UNUSED(el); - UNUSED(mask); - UNUSED(privdata); - - while(max--) { - cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); - if (cfd == ANET_ERR) { - if (errno != EWOULDBLOCK) - serverLog(LL_WARNING, - "Accepting client connection: %s", server.neterr); - return; - } - serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); - acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip); - } -} - -void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - int cport, cfd, max = MAX_ACCEPTS_PER_CALL; - char cip[NET_IP_STR_LEN]; - UNUSED(el); - UNUSED(mask); - UNUSED(privdata); - - while(max--) { - cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); - if (cfd == ANET_ERR) { - if (errno != EWOULDBLOCK) - serverLog(LL_WARNING, - "Accepting client connection: %s", server.neterr); - return; - } - serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); - acceptCommonHandler(connCreateAcceptedTLS(cfd, server.tls_auth_clients),0,cip); - } -} - -void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - int cfd, max = MAX_ACCEPTS_PER_CALL; - UNUSED(el); - UNUSED(mask); - UNUSED(privdata); - - while(max--) { - cfd = anetUnixAccept(server.neterr, fd); - if (cfd == ANET_ERR) { - if (errno != EWOULDBLOCK) - serverLog(LL_WARNING, - "Accepting client connection: %s", server.neterr); - return; - } - serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket); - acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL); - } -} - void freeClientOriginalArgv(client *c) { /* We didn't rewrite this client */ if (!c->original_argv) return; @@ -2716,13 +2656,13 @@ done: * you want to relax error checking or need to display something anyway (see * anetFdToString implementation for more info). */ void genClientAddrString(client *client, char *addr, - size_t addr_len, int fd_to_str_type) { + size_t addr_len, int remote) { if (client->flags & CLIENT_UNIX_SOCKET) { /* Unix socket client. */ snprintf(addr,addr_len,"%s:0",server.unixsocket); } else { /* TCP client. */ - connFormatFdAddr(client->conn,addr,addr_len,fd_to_str_type); + connFormatAddr(client->conn,addr,addr_len,remote); } } @@ -2731,10 +2671,10 @@ void genClientAddrString(client *client, char *addr, * The Peer ID never changes during the life of the client, however it * is expensive to compute. */ char *getClientPeerId(client *c) { - char peerid[NET_ADDR_STR_LEN]; + char peerid[NET_ADDR_STR_LEN] = {0}; if (c->peerid == NULL) { - genClientAddrString(c,peerid,sizeof(peerid),FD_TO_PEER_NAME); + genClientAddrString(c,peerid,sizeof(peerid),1); c->peerid = sdsnew(peerid); } return c->peerid; @@ -2745,10 +2685,10 @@ char *getClientPeerId(client *c) { * The Socket Name never changes during the life of the client, however it * is expensive to compute. */ char *getClientSockname(client *c) { - char sockname[NET_ADDR_STR_LEN]; + char sockname[NET_ADDR_STR_LEN] = {0}; if (c->sockname == NULL) { - genClientAddrString(c,sockname,sizeof(sockname),FD_TO_SOCK_NAME); + genClientAddrString(c,sockname,sizeof(sockname),0); c->sockname = sdsnew(sockname); } return c->sockname; diff --git a/src/redis-cli.c b/src/redis-cli.c index bc85c21d4..131e79a78 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -61,6 +61,7 @@ #include "help.h" /* Used for backwards-compatibility with pre-7.0 servers that don't support COMMAND DOCS. */ #include "anet.h" #include "ae.h" +#include "connection.h" #include "cli_common.h" #include "mt19937-64.h" @@ -322,7 +323,7 @@ static void cliRefreshPrompt(void) { prompt = sdscatfmt(prompt,"redis %s",config.hostsocket); } else { char addr[256]; - anetFormatAddr(addr, sizeof(addr), config.conn_info.hostip, config.conn_info.hostport); + formatAddr(addr, sizeof(addr), config.conn_info.hostip, config.conn_info.hostport); prompt = sdscatlen(prompt,addr,strlen(addr)); } diff --git a/src/redismodule.h b/src/redismodule.h index 046677eed..36e8bf51f 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -162,11 +162,13 @@ typedef struct RedisModuleStreamID { #define REDISMODULE_CTX_FLAGS_RESP3 (1<<22) /* Redis is currently async loading database for diskless replication. */ #define REDISMODULE_CTX_FLAGS_ASYNC_LOADING (1<<23) +/* Redis is starting. */ +#define REDISMODULE_CTX_FLAGS_SERVER_STARTUP (1<<24) /* Next context flag, must be updated when adding new flags above! This flag should not be used directly by the module. * Use RedisModule_GetContextFlagsAll instead. */ -#define _REDISMODULE_CTX_FLAGS_NEXT (1<<24) +#define _REDISMODULE_CTX_FLAGS_NEXT (1<<25) /* Keyspace changes notification classes. Every class is associated with a * character for configuration purposes. @@ -748,13 +750,41 @@ typedef enum { REDISMODULE_ACL_LOG_CHANNEL /* Channel authorization failure */ } RedisModuleACLLogEntryReason; +/* Incomplete structures needed by both the core and modules. */ +typedef struct RedisModuleString RedisModuleString; +typedef struct RedisModuleIO RedisModuleIO; +typedef struct RedisModuleDigest RedisModuleDigest; +typedef struct RedisModuleInfoCtx RedisModuleInfoCtx; +typedef struct RedisModuleDefragCtx RedisModuleDefragCtx; + +/* Function pointers needed by both the core and modules, these needs to be + * exposed since you can't cast a function pointer to (void *). */ +typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report); +typedef void (*RedisModuleDefragFunc)(RedisModuleDefragCtx *ctx); +typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata); + /* ------------------------- End of common defines ------------------------ */ -#ifndef REDISMODULE_CORE +#if defined REDISMODULE_CORE +/* Things only defined for the modules core (server), not exported to modules + * that include this file. */ + +#define RedisModuleString robj + +#endif /* defined REDISMODULE_CORE */ + +#if !defined REDISMODULE_CORE && !defined REDISMODULE_CORE_MODULE +/* Things defined for modules, but not for core-modules. */ typedef long long mstime_t; typedef long long ustime_t; +#endif /* !defined REDISMODULE_CORE && !defined REDISMODULE_CORE_MODULE */ + +/* ----------- The rest of the defines are only for modules ----------------- */ +#if !defined REDISMODULE_CORE || defined REDISMODULE_CORE_MODULE +/* Things defined for modules and core-modules. */ + /* Macro definitions specific to individual compilers */ #ifndef REDISMODULE_ATTR_UNUSED # ifdef __GNUC__ @@ -784,21 +814,16 @@ typedef long long ustime_t; typedef struct RedisModuleCtx RedisModuleCtx; typedef struct RedisModuleCommand RedisModuleCommand; typedef struct RedisModuleKey RedisModuleKey; -typedef struct RedisModuleString RedisModuleString; typedef struct RedisModuleCallReply RedisModuleCallReply; -typedef struct RedisModuleIO RedisModuleIO; typedef struct RedisModuleType RedisModuleType; -typedef struct RedisModuleDigest RedisModuleDigest; typedef struct RedisModuleBlockedClient RedisModuleBlockedClient; typedef struct RedisModuleClusterInfo RedisModuleClusterInfo; typedef struct RedisModuleDict RedisModuleDict; typedef struct RedisModuleDictIter RedisModuleDictIter; typedef struct RedisModuleCommandFilterCtx RedisModuleCommandFilterCtx; typedef struct RedisModuleCommandFilter RedisModuleCommandFilter; -typedef struct RedisModuleInfoCtx RedisModuleInfoCtx; typedef struct RedisModuleServerInfoData RedisModuleServerInfoData; typedef struct RedisModuleScanCursor RedisModuleScanCursor; -typedef struct RedisModuleDefragCtx RedisModuleDefragCtx; typedef struct RedisModuleUser RedisModuleUser; typedef struct RedisModuleKeyOptCtx RedisModuleKeyOptCtx; @@ -825,11 +850,8 @@ typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const cha typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); -typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report); typedef void (*RedisModuleScanCB)(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata); typedef void (*RedisModuleScanKeyCB)(RedisModuleKey *key, RedisModuleString *field, RedisModuleString *value, void *privdata); -typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata); -typedef int (*RedisModuleDefragFunc)(RedisModuleDefragCtx *ctx); typedef RedisModuleString * (*RedisModuleConfigGetStringFunc)(const char *name, void *privdata); typedef long long (*RedisModuleConfigGetNumericFunc)(const char *name, void *privdata); typedef int (*RedisModuleConfigGetBoolFunc)(const char *name, void *privdata); @@ -1557,11 +1579,5 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int #define RMAPI_FUNC_SUPPORTED(func) (func != NULL) -#else - -/* Things only defined for the modules core, not exported to modules - * including this file. */ -#define RedisModuleString robj - #endif /* REDISMODULE_CORE */ #endif /* REDISMODULE_H */ diff --git a/src/release.c b/src/release.c index e0bd018fc..adc7e55dd 100644 --- a/src/release.c +++ b/src/release.c @@ -35,7 +35,6 @@ #include <stdio.h> #include "release.h" -#include "version.h" #include "crc64.h" char *redisGitSHA1(void) { @@ -46,8 +45,12 @@ char *redisGitDirty(void) { return REDIS_GIT_DIRTY; } +const char *redisBuildIdRaw(void) { + return REDIS_BUILD_ID_RAW; +} + uint64_t redisBuildId(void) { - char *buildid = REDIS_VERSION REDIS_BUILD_ID REDIS_GIT_DIRTY REDIS_GIT_SHA1; + char *buildid = REDIS_BUILD_ID_RAW; return crc64(0,(unsigned char*)buildid,strlen(buildid)); } diff --git a/src/replication.c b/src/replication.c index 58ed10833..7a71ac091 100644 --- a/src/replication.c +++ b/src/replication.c @@ -33,6 +33,7 @@ #include "cluster.h" #include "bio.h" #include "functions.h" +#include "connection.h" #include <memory.h> #include <sys/time.h> @@ -54,6 +55,13 @@ int cancelReplicationHandshake(int reconnect); int RDBGeneratedByReplication = 0; /* --------------------------- Utility functions ---------------------------- */ +static ConnectionType *connTypeOfReplication() { + if (server.tls_replication) { + return connectionTypeTls(); + } + + return connectionTypeTcp(); +} /* Return the pointer to a string representing the slave ip:listening_port * pair. Mostly useful for logging, since we want to log a slave using its @@ -66,11 +74,11 @@ char *replicationGetSlaveName(client *c) { ip[0] = '\0'; buf[0] = '\0'; if (c->slave_addr || - connPeerToString(c->conn,ip,sizeof(ip),NULL) != -1) + connAddrPeerName(c->conn,ip,sizeof(ip),NULL) != -1) { char *addr = c->slave_addr ? c->slave_addr : ip; if (c->slave_listening_port) - anetFormatAddr(buf,sizeof(buf),addr,c->slave_listening_port); + formatAddr(buf,sizeof(buf),addr,c->slave_listening_port); else snprintf(buf,sizeof(buf),"%s:<unknown-replica-port>",addr); } else { @@ -2863,7 +2871,7 @@ write_error: /* Handle sendCommand() errors. */ } int connectWithMaster(void) { - server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket(); + server.repl_transfer_s = connCreate(connTypeOfReplication()); if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport, server.bind_source_addr, syncWithMaster) == C_ERR) { serverLog(LL_WARNING,"Unable to connect to MASTER: %s", @@ -3156,7 +3164,7 @@ void roleCommand(client *c) { char ip[NET_IP_STR_LEN], *slaveaddr = slave->slave_addr; if (!slaveaddr) { - if (connPeerToString(slave->conn,ip,sizeof(ip),NULL) == -1) + if (connAddrPeerName(slave->conn,ip,sizeof(ip),NULL) == -1) continue; slaveaddr = ip; } @@ -3817,7 +3825,7 @@ static client *findReplica(char *host, int port) { char ip[NET_IP_STR_LEN], *replicaip = replica->slave_addr; if (!replicaip) { - if (connPeerToString(replica->conn, ip, sizeof(ip), NULL) == -1) + if (connAddrPeerName(replica->conn, ip, sizeof(ip), NULL) == -1) continue; replicaip = ip; } @@ -4048,7 +4056,7 @@ void updateFailoverStatus(void) { char ip[NET_IP_STR_LEN], *replicaaddr = replica->slave_addr; if (!replicaaddr) { - if (connPeerToString(replica->conn,ip,sizeof(ip),NULL) == -1) + if (connAddrPeerName(replica->conn,ip,sizeof(ip),NULL) == -1) continue; replicaaddr = ip; } diff --git a/src/sentinel.c b/src/sentinel.c index f819af11a..a75eb3236 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -30,7 +30,7 @@ #include "server.h" #include "hiredis.h" -#ifdef USE_OPENSSL +#if USE_OPENSSL == 1 /* BUILD_YES */ #include "openssl/ssl.h" #include "hiredis_ssl.h" #endif @@ -44,7 +44,7 @@ extern char **environ; -#ifdef USE_OPENSSL +#if USE_OPENSSL == 1 /* BUILD_YES */ extern SSL_CTX *redis_tls_ctx; extern SSL_CTX *redis_tls_client_ctx; #endif @@ -848,7 +848,7 @@ void sentinelRunPendingScripts(void) { sj->pid = 0; } else if (pid == 0) { /* Child */ - tlsCleanup(); + connTypeCleanupAll(); execve(sj->argv[0],sj->argv,environ); /* If we are here an error occurred. */ _exit(2); /* Don't retry execution. */ @@ -2378,9 +2378,7 @@ void sentinelSetClientName(sentinelRedisInstance *ri, redisAsyncContext *c, char } static int instanceLinkNegotiateTLS(redisAsyncContext *context) { -#ifndef USE_OPENSSL - (void) context; -#else +#if USE_OPENSSL == 1 /* BUILD_YES */ if (!redis_tls_ctx) return C_ERR; SSL *ssl = SSL_new(redis_tls_client_ctx ? redis_tls_client_ctx : redis_tls_ctx); if (!ssl) return C_ERR; @@ -2389,6 +2387,8 @@ static int instanceLinkNegotiateTLS(redisAsyncContext *context) { SSL_free(ssl); return C_ERR; } +#else + UNUSED(context); #endif return C_OK; } @@ -3027,7 +3027,7 @@ int sentinelSendHello(sentinelRedisInstance *ri) { if (sentinel.announce_ip) { announce_ip = sentinel.announce_ip; } else { - if (anetFdToString(ri->link->cc->c.fd,ip,sizeof(ip),NULL,FD_TO_SOCK_NAME) == -1) + if (anetFdToString(ri->link->cc->c.fd,ip,sizeof(ip),NULL,0) == -1) return C_ERR; announce_ip = ip; } diff --git a/src/server.c b/src/server.c index b96d95b75..b6a72ae88 100644 --- a/src/server.c +++ b/src/server.c @@ -1535,7 +1535,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { if (ProcessingEventsWhileBlocked) { uint64_t processed = 0; processed += handleClientsWithPendingReadsUsingThreads(); - processed += tlsProcessPendingData(); + processed += connTypeProcessPendingData(); if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) flushAppendOnlyFile(0); processed += handleClientsWithPendingWrites(); @@ -1550,11 +1550,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* We should handle pending reads clients ASAP after event loop. */ handleClientsWithPendingReadsUsingThreads(); - /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */ - tlsProcessPendingData(); + /* Handle pending data(typical TLS). (must be done before flushAppendOnlyFile) */ + connTypeProcessPendingData(); - /* If tls still has pending unread data don't sleep at all. */ - aeSetDontWait(server.el, tlsHasPendingData()); + /* If any connection type(typical TLS) still has pending unread data don't sleep at all. */ + aeSetDontWait(server.el, connTypeHasPendingData()); /* Call the Redis Cluster before sleep function. Note that this function * may change the state of Redis Cluster (from ok to fail or vice versa), @@ -1862,9 +1862,7 @@ void initServerConfig(void) { server.bindaddr_count = CONFIG_DEFAULT_BINDADDR_COUNT; for (j = 0; j < CONFIG_DEFAULT_BINDADDR_COUNT; j++) server.bindaddr[j] = zstrdup(default_bindaddr[j]); - server.ipfd.count = 0; - server.tlsfd.count = 0; - server.sofd = -1; + memset(server.listeners, 0x00, sizeof(server.listeners)); server.active_expire_enabled = 1; server.skip_checksum_validation = 0; server.loading = 0; @@ -2232,7 +2230,7 @@ void checkTcpBacklogSettings(void) { #endif } -void closeSocketListeners(socketFds *sfd) { +void closeListener(connListener *sfd) { int j; for (j = 0; j < sfd->count; j++) { @@ -2247,11 +2245,11 @@ void closeSocketListeners(socketFds *sfd) { /* Create an event handler for accepting new connections in TCP or TLS domain sockets. * This works atomically for all socket fds */ -int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) { +int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler) { int j; for (j = 0; j < sfd->count; j++) { - if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL) == AE_ERR) { + if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,sfd) == AE_ERR) { /* Rollback */ for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE); return C_ERR; @@ -2264,7 +2262,8 @@ int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) { * binding the addresses specified in the Redis server configuration. * * The listening file descriptors are stored in the integer array 'fds' - * and their number is set in '*count'. + * and their number is set in '*count'. Actually @sfd should be 'listener', + * for the historical reasons, let's keep 'sfd' here. * * The addresses to bind are specified in the global server.bindaddr array * and their number is server.bindaddr_count. If the server configuration @@ -2278,14 +2277,15 @@ int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) { * impossible to bind, or no bind addresses were specified in the server * configuration but the function is not able to bind * for at least * one of the IPv4 or IPv6 protocols. */ -int listenToPort(int port, socketFds *sfd) { +int listenToPort(connListener *sfd) { int j; - char **bindaddr = server.bindaddr; + int port = sfd->port; + char **bindaddr = sfd->bindaddr; /* If we have no bind address, we don't listen on a TCP socket */ - if (server.bindaddr_count == 0) return C_OK; + if (sfd->bindaddr_count == 0) return C_OK; - for (j = 0; j < server.bindaddr_count; j++) { + for (j = 0; j < sfd->bindaddr_count; j++) { char* addr = bindaddr[j]; int optional = *addr == '-'; if (optional) addr++; @@ -2309,7 +2309,7 @@ int listenToPort(int port, socketFds *sfd) { continue; /* Rollback successful listens before exiting */ - closeSocketListeners(sfd); + closeListener(sfd); return C_ERR; } if (server.socket_mark_id > 0) anetSetSockMarkId(NULL, sfd->fd[sfd->count], server.socket_mark_id); @@ -2446,12 +2446,6 @@ void initServer(void) { exit(1); } - if ((server.tls_port || server.tls_replication || server.tls_cluster) - && tlsConfigure(&server.tls_ctx_config) == C_ERR) { - serverLog(LL_WARNING, "Failed to configure TLS. Check logs for more info."); - exit(1); - } - for (j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) { server.client_mem_usage_buckets[j].mem_usage_sum = 0; server.client_mem_usage_buckets[j].clients = listCreate(); @@ -2470,39 +2464,6 @@ void initServer(void) { } server.db = zmalloc(sizeof(redisDb)*server.dbnum); - /* Open the TCP listening socket for the user commands. */ - if (server.port != 0 && - listenToPort(server.port,&server.ipfd) == C_ERR) { - /* Note: the following log text is matched by the test suite. */ - serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port); - exit(1); - } - if (server.tls_port != 0 && - listenToPort(server.tls_port,&server.tlsfd) == C_ERR) { - /* Note: the following log text is matched by the test suite. */ - serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.", server.tls_port); - exit(1); - } - - /* Open the listening Unix domain socket. */ - if (server.unixsocket != NULL) { - unlink(server.unixsocket); /* don't care if this fails */ - server.sofd = anetUnixServer(server.neterr,server.unixsocket, - (mode_t)server.unixsocketperm, server.tcp_backlog); - if (server.sofd == ANET_ERR) { - serverLog(LL_WARNING, "Failed opening Unix socket: %s", server.neterr); - exit(1); - } - anetNonBlock(NULL,server.sofd); - anetCloexec(server.sofd); - } - - /* Abort if there are no listening sockets at all. */ - if (server.ipfd.count == 0 && server.tlsfd.count == 0 && server.sofd < 0) { - serverLog(LL_WARNING, "Configured to not listen anywhere, exiting."); - exit(1); - } - /* Create the Redis databases, and initialize other internal state. */ for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType); @@ -2583,18 +2544,6 @@ void initServer(void) { exit(1); } - /* Create an event handler for accepting new connections in TCP and Unix - * domain sockets. */ - if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) { - serverPanic("Unrecoverable error creating TCP socket accept handler."); - } - if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) { - serverPanic("Unrecoverable error creating TLS socket accept handler."); - } - if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, - acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); - - /* Register a readable event for the pipe used to awake the event loop * from module threads. */ if (aeCreateFileEvent(server.el, server.module_pipe[0], AE_READABLE, @@ -2618,7 +2567,6 @@ void initServer(void) { server.maxmemory_policy = MAXMEMORY_NO_EVICTION; } - if (server.cluster_enabled) clusterInit(); scriptingInit(1); functionsInit(); slowlogInit(); @@ -2630,6 +2578,78 @@ void initServer(void) { applyWatchdogPeriod(); } +void initListeners() { + /* Setup listeners from server config for TCP/TLS/Unix */ + int conn_index; + connListener *listener; + if (server.port != 0) { + conn_index = connectionIndexByType(CONN_TYPE_SOCKET); + if (conn_index < 0) + serverPanic("Failed finding connection listener of %s", CONN_TYPE_SOCKET); + listener = &server.listeners[conn_index]; + listener->bindaddr = server.bindaddr; + listener->bindaddr_count = server.bindaddr_count; + listener->port = server.port; + listener->ct = connectionByType(CONN_TYPE_SOCKET); + } + + if (server.tls_port || server.tls_replication || server.tls_cluster) { + ConnectionType *ct_tls = connectionTypeTls(); + if (!ct_tls) { + serverLog(LL_WARNING, "Failed finding TLS support."); + exit(1); + } + if (connTypeConfigure(ct_tls, &server.tls_ctx_config, 1) == C_ERR) { + serverLog(LL_WARNING, "Failed to configure TLS. Check logs for more info."); + exit(1); + } + } + + if (server.tls_port != 0) { + conn_index = connectionIndexByType(CONN_TYPE_TLS); + if (conn_index < 0) + serverPanic("Failed finding connection listener of %s", CONN_TYPE_TLS); + listener = &server.listeners[conn_index]; + listener->bindaddr = server.bindaddr; + listener->bindaddr_count = server.bindaddr_count; + listener->port = server.tls_port; + listener->ct = connectionByType(CONN_TYPE_TLS); + } + if (server.unixsocket != NULL) { + conn_index = connectionIndexByType(CONN_TYPE_UNIX); + if (conn_index < 0) + serverPanic("Failed finding connection listener of %s", CONN_TYPE_UNIX); + listener = &server.listeners[conn_index]; + listener->bindaddr = &server.unixsocket; + listener->bindaddr_count = 1; + listener->ct = connectionByType(CONN_TYPE_UNIX); + listener->priv = &server.unixsocketperm; /* Unix socket specified */ + } + + /* create all the configured listener, and add handler to start to accept */ + int listen_fds = 0; + for (int j = 0; j < CONN_TYPE_MAX; j++) { + listener = &server.listeners[j]; + if (listener->ct == NULL) + continue; + + if (connListen(listener) == C_ERR) { + serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting.", listener->port, listener->ct->get_type(NULL)); + exit(1); + } + + if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK) + serverPanic("Unrecoverable error creating %s listener accept handler.", listener->ct->get_type(NULL)); + + listen_fds += listener->count; + } + + if (listen_fds == 0) { + serverLog(LL_WARNING, "Configured to not listen anywhere, exiting."); + exit(1); + } +} + /* Some steps in server initialization need to be done last (after modules * are loaded). * Specifically, creation of threads due to a race bug in ld.so, in which @@ -4065,11 +4085,16 @@ void incrementErrorCount(const char *fullerr, size_t namelen) { void closeListeningSockets(int unlink_unix_socket) { int j; - for (j = 0; j < server.ipfd.count; j++) close(server.ipfd.fd[j]); - for (j = 0; j < server.tlsfd.count; j++) close(server.tlsfd.fd[j]); - if (server.sofd != -1) close(server.sofd); + for (int i = 0; i < CONN_TYPE_MAX; i++) { + connListener *listener = &server.listeners[i]; + if (listener->ct == NULL) + continue; + + for (j = 0; j < listener->count; j++) close(listener->fd[j]); + } + if (server.cluster_enabled) - for (j = 0; j < server.cfd.count; j++) close(server.cfd.fd[j]); + for (j = 0; j < server.clistener.count; j++) close(server.clistener.fd[j]); if (unlink_unix_socket && server.unixsocket) { serverLog(LL_NOTICE,"Removing the unix socket file."); if (unlink(server.unixsocket) != 0) @@ -5409,6 +5434,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "shutdown_in_milliseconds:%I\r\n", (int64_t)(server.shutdown_mstime - server.mstime)); } + + /* get all the listeners information */ + info = getListensInfoString(info); } /* Clients */ @@ -5934,7 +5962,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { long lag = 0; if (!slaveip) { - if (connPeerToString(slave->conn,ip,sizeof(ip),&port) == -1) + if (connAddrPeerName(slave->conn,ip,sizeof(ip),&port) == -1) continue; slaveip = ip; } @@ -6264,61 +6292,38 @@ void redisAsciiArt(void) { zfree(buf); } -int changeBindAddr(void) { - /* Close old TCP and TLS servers */ - closeSocketListeners(&server.ipfd); - closeSocketListeners(&server.tlsfd); - - /* Bind to the new port */ - if ((server.port != 0 && listenToPort(server.port, &server.ipfd) != C_OK) || - (server.tls_port != 0 && listenToPort(server.tls_port, &server.tlsfd) != C_OK)) { - serverLog(LL_WARNING, "Failed to bind"); - - closeSocketListeners(&server.ipfd); - closeSocketListeners(&server.tlsfd); - return C_ERR; - } - - /* Create TCP and TLS event handlers */ - if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) { - serverPanic("Unrecoverable error creating TCP socket accept handler."); - } - if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) { - serverPanic("Unrecoverable error creating TLS socket accept handler."); - } +/* Get the server listener by type name */ +connListener *listenerByType(const char *typename) { + int conn_index; - if (server.set_proc_title) redisSetProcTitle(NULL); + conn_index = connectionIndexByType(typename); + if (conn_index < 0) + return NULL; - return C_OK; + return &server.listeners[conn_index]; } -int changeListenPort(int port, socketFds *sfd, aeFileProc *accept_handler) { - socketFds new_sfd = {{0}}; - +/* Close original listener, re-create a new listener from the updated bind address & port */ +int changeListener(connListener *listener) { /* Close old servers */ - closeSocketListeners(sfd); + closeListener(listener); /* Just close the server if port disabled */ - if (port == 0) { + if (listener->port == 0) { if (server.set_proc_title) redisSetProcTitle(NULL); return C_OK; } - /* Bind to the new port */ - if (listenToPort(port, &new_sfd) != C_OK) { + /* Re-create listener */ + if (connListen(listener) != C_OK) { return C_ERR; } /* Create event handlers */ - if (createSocketAcceptHandler(&new_sfd, accept_handler) != C_OK) { - closeSocketListeners(&new_sfd); - return C_ERR; + if (createSocketAcceptHandler(listener, listener->ct->accept_handler) != C_OK) { + serverPanic("Unrecoverable error creating %s accept handler.", listener->ct->get_type(NULL)); } - /* Copy new descriptors */ - sfd->count = new_sfd.count; - memcpy(sfd->fd, new_sfd.fd, sizeof(new_sfd.fd)); - if (server.set_proc_title) redisSetProcTitle(NULL); return C_OK; @@ -6944,7 +6949,7 @@ int main(int argc, char **argv) { ACLInit(); /* The ACL subsystem must be initialized ASAP because the basic networking code and client creation depends on it. */ moduleInitModulesSystem(); - tlsInit(); + connTypeInitialize(); /* Store the executable path and arguments in a safe place in order * to be able to restart the server later. */ @@ -7089,6 +7094,16 @@ int main(int argc, char **argv) { if (server.set_proc_title) redisSetProcTitle(NULL); redisAsciiArt(); checkTcpBacklogSettings(); + if (!server.sentinel_mode) { + moduleInitModulesSystemLast(); + moduleLoadFromQueue(); + } + ACLLoadUsersAtStartup(); + initListeners(); + if (server.cluster_enabled) { + clusterInit(); + } + InitServerLast(); if (!server.sentinel_mode) { /* Things not needed when running in Sentinel mode. */ @@ -7117,10 +7132,6 @@ int main(int argc, char **argv) { } #endif /* __arm64__ */ #endif /* __linux__ */ - moduleInitModulesSystemLast(); - moduleLoadFromQueue(); - ACLLoadUsersAtStartup(); - InitServerLast(); aofLoadManifestFromDisk(); loadDataFromDisk(); aofOpenIfNeededOnServerStart(); @@ -7133,10 +7144,15 @@ int main(int argc, char **argv) { exit(1); } } - if (server.ipfd.count > 0 || server.tlsfd.count > 0) - serverLog(LL_NOTICE,"Ready to accept connections"); - if (server.sofd > 0) - serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); + + for (j = 0; j < CONN_TYPE_MAX; j++) { + connListener *listener = &server.listeners[j]; + if (listener->ct == NULL) + continue; + + serverLog(LL_NOTICE,"Ready to accept connections %s", listener->ct->get_type(NULL)); + } + if (server.supervised_mode == SUPERVISED_SYSTEMD) { if (!server.masterhost) { redisCommunicateSystemd("STATUS=Ready to accept connections\n"); @@ -7146,8 +7162,6 @@ int main(int argc, char **argv) { redisCommunicateSystemd("READY=1\n"); } } else { - ACLLoadUsersAtStartup(); - InitServerLast(); sentinelIsRunning(); if (server.supervised_mode == SUPERVISED_SYSTEMD) { redisCommunicateSystemd("STATUS=Ready to accept connections\n"); diff --git a/src/server.h b/src/server.h index c0f68c73f..8a0a2f5f6 100644 --- a/src/server.h +++ b/src/server.h @@ -81,6 +81,7 @@ typedef long long ustime_t; /* microsecond time type. */ #include "connection.h" /* Connection abstraction */ #define REDISMODULE_CORE 1 +typedef struct redisObject robj; #include "redismodule.h" /* Redis modules API defines. */ /* Following includes allow test functions to be called from Redis main() */ @@ -679,9 +680,6 @@ struct RedisModuleIO; struct RedisModuleDigest; struct RedisModuleCtx; struct moduleLoadQueueEntry; -struct redisObject; -struct RedisModuleDefragCtx; -struct RedisModuleInfoCtx; struct RedisModuleKeyOptCtx; struct RedisModuleCommand; @@ -701,20 +699,12 @@ typedef size_t (*moduleTypeFreeEffortFunc)(struct redisObject *key, const void * typedef void (*moduleTypeUnlinkFunc)(struct redisObject *key, void *value); typedef void *(*moduleTypeCopyFunc)(struct redisObject *fromkey, struct redisObject *tokey, const void *value); typedef int (*moduleTypeDefragFunc)(struct RedisModuleDefragCtx *ctx, struct redisObject *key, void **value); -typedef void (*RedisModuleInfoFunc)(struct RedisModuleInfoCtx *ctx, int for_crash_report); -typedef void (*RedisModuleDefragFunc)(struct RedisModuleDefragCtx *ctx); typedef size_t (*moduleTypeMemUsageFunc2)(struct RedisModuleKeyOptCtx *ctx, const void *value, size_t sample_size); typedef void (*moduleTypeFreeFunc2)(struct RedisModuleKeyOptCtx *ctx, void *value); typedef size_t (*moduleTypeFreeEffortFunc2)(struct RedisModuleKeyOptCtx *ctx, const void *value); typedef void (*moduleTypeUnlinkFunc2)(struct RedisModuleKeyOptCtx *ctx, void *value); typedef void *(*moduleTypeCopyFunc2)(struct RedisModuleKeyOptCtx *ctx, const void *value); -/* This callback type is called by moduleNotifyUserChanged() every time - * a user authenticated via the module API is associated with a different - * user or gets disconnected. This needs to be exposed since you can't cast - * a function pointer to (void *). */ -typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata); - /* The module type, which is referenced in each value of a given type, defines * the methods and links to the module exporting the type. */ @@ -786,7 +776,7 @@ typedef struct RedisModule RedisModule; /* This is a wrapper for the 'rio' streams used inside rdb.c in Redis, so that * the user does not have to take the total count of the written bytes nor * to care about error conditions. */ -typedef struct RedisModuleIO { +struct RedisModuleIO { size_t bytes; /* Bytes read / written so far. */ rio *rio; /* Rio stream. */ moduleType *type; /* Module type doing the operation. */ @@ -794,7 +784,7 @@ typedef struct RedisModuleIO { struct RedisModuleCtx *ctx; /* Optional context, see RM_GetContextFromIO()*/ struct redisObject *key; /* Optional name of key processed */ int dbid; /* The dbid of the key being processed, -1 when unknown. */ -} RedisModuleIO; +}; /* Macro to initialize an IO context. Note that the 'ver' field is populated * inside rdb.c according to the version of the value to load. */ @@ -813,12 +803,12 @@ typedef struct RedisModuleIO { * a data structure, so that a digest can be created in a way that correctly * reflects the values. See the DEBUG DIGEST command implementation for more * background. */ -typedef struct RedisModuleDigest { +struct RedisModuleDigest { unsigned char o[20]; /* Ordered elements. */ unsigned char x[20]; /* Xored elements. */ struct redisObject *key; /* Optional name of key processed */ int dbid; /* The dbid of the key being processed */ -} RedisModuleDigest; +}; /* Just start with a digest composed of all zero bytes. */ #define moduleInitDigestContext(mdvar) do { \ @@ -849,7 +839,7 @@ typedef struct RedisModuleDigest { #define OBJ_SHARED_REFCOUNT INT_MAX /* Global object never destroyed. */ #define OBJ_STATIC_REFCOUNT (INT_MAX-1) /* Object allocated in the stack. */ #define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT -typedef struct redisObject { +struct redisObject { unsigned type:4; unsigned encoding:4; unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or @@ -857,7 +847,7 @@ typedef struct redisObject { * and most significant 16 bits access time). */ int refcount; void *ptr; -} robj; +}; /* The a string name for an object's type as listed above * Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines, @@ -1376,11 +1366,6 @@ struct malloc_stats { size_t allocator_resident; }; -typedef struct socketFds { - int fd[CONFIG_BINDADDR_MAX]; - int count; -} socketFds; - /*----------------------------------------------------------------------------- * TLS Context Configuration *----------------------------------------------------------------------------*/ @@ -1514,11 +1499,9 @@ struct redisServer { char *bind_source_addr; /* Source address to bind on for outgoing connections */ char *unixsocket; /* UNIX socket path */ unsigned int unixsocketperm; /* UNIX socket permission (see mode_t) */ - socketFds ipfd; /* TCP socket file descriptors */ - socketFds tlsfd; /* TLS socket file descriptors */ - int sofd; /* Unix socket file descriptor */ + connListener listeners[CONN_TYPE_MAX]; /* TCP/Unix/TLS even more types */ uint32_t socket_mark_id; /* ID for listen socket marking */ - socketFds cfd; /* Cluster bus listening socket */ + connListener clistener; /* Cluster bus listener */ list *clients; /* List of active clients */ list *clients_to_close; /* Clients to close asynchronously */ list *clients_pending_write; /* There is to write or install handler. */ @@ -2460,9 +2443,7 @@ void setDeferredSetLen(client *c, void *node, long length); void setDeferredAttributeLen(client *c, void *node, long length); void setDeferredPushLen(client *c, void *node, long length); int processInputBuffer(client *c); -void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); -void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask); -void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask); +void acceptCommonHandler(connection *conn, int flags, char *ip); void readQueryFromClient(connection *conn); int prepareClientToWrite(client *c); void addReplyNull(client *c); @@ -2527,7 +2508,7 @@ char *getClientTypeName(int class); void flushSlavesOutputBuffers(void); void disconnectSlaves(void); void evictClients(void); -int listenToPort(int port, socketFds *fds); +int listenToPort(connListener *fds); void pauseClients(pause_purpose purpose, mstime_t end, pause_type type); void unpauseClients(pause_purpose purpose); int areClientsPaused(void); @@ -2893,9 +2874,10 @@ int processCommand(client *c); int processPendingCommandAndInputBuffer(client *c); void setupSignalHandlers(void); void removeSignalHandlers(void); -int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler); -int changeListenPort(int port, socketFds *sfd, aeFileProc *accept_handler); -int changeBindAddr(void); +int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler); +connListener *listenerByType(const char *typename); +int changeListener(connListener *listener); +void closeListener(connListener *listener); struct redisCommand *lookupSubcommand(struct redisCommand *container, sds sub_name); struct redisCommand *lookupCommand(robj **argv, int argc); struct redisCommand *lookupCommandBySdsLogic(dict *commands, sds s); @@ -3277,6 +3259,7 @@ void *dictSdsDup(dict *d, const void *key); char *redisGitSHA1(void); char *redisGitDirty(void); uint64_t redisBuildId(void); +const char *redisBuildIdRaw(void); char *redisBuildIdString(void); /* Commands prototypes */ @@ -3579,12 +3562,6 @@ void swapMainDbWithTempDb(redisDb *tempDb); _serverLog(level, __VA_ARGS__);\ } while(0) -/* TLS stuff */ -void tlsInit(void); -void tlsCleanup(void); -int tlsConfigure(redisTLSContextConfig *ctx_config); -int isTlsConfigured(void); - #define redisDebug(fmt, ...) \ printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__) #define redisDebugMark() \ diff --git a/src/socket.c b/src/socket.c new file mode 100644 index 000000000..83000eaf8 --- /dev/null +++ b/src/socket.c @@ -0,0 +1,453 @@ +/* + * Copyright (c) 2019, Redis Labs + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "server.h" +#include "connhelpers.h" + +/* The connections module provides a lean abstraction of network connections + * to avoid direct socket and async event management across the Redis code base. + * + * It does NOT provide advanced connection features commonly found in similar + * libraries such as complete in/out buffer management, throttling, etc. These + * functions remain in networking.c. + * + * The primary goal is to allow transparent handling of TCP and TLS based + * connections. To do so, connections have the following properties: + * + * 1. A connection may live before its corresponding socket exists. This + * allows various context and configuration setting to be handled before + * establishing the actual connection. + * 2. The caller may register/unregister logical read/write handlers to be + * called when the connection has data to read from/can accept writes. + * These logical handlers may or may not correspond to actual AE events, + * depending on the implementation (for TCP they are; for TLS they aren't). + */ + +static ConnectionType CT_Socket; + +/* When a connection is created we must know its type already, but the + * underlying socket may or may not exist: + * + * - For accepted connections, it exists as we do not model the listen/accept + * part; So caller calls connCreateSocket() followed by connAccept(). + * - For outgoing connections, the socket is created by the connection module + * itself; So caller calls connCreateSocket() followed by connConnect(), + * which registers a connect callback that fires on connected/error state + * (and after any transport level handshake was done). + * + * NOTE: An earlier version relied on connections being part of other structs + * and not independently allocated. This could lead to further optimizations + * like using container_of(), etc. However it was discontinued in favor of + * this approach for these reasons: + * + * 1. In some cases conns are created/handled outside the context of the + * containing struct, in which case it gets a bit awkward to copy them. + * 2. Future implementations may wish to allocate arbitrary data for the + * connection. + * 3. The container_of() approach is anyway risky because connections may + * be embedded in different structs, not just client. + */ + +static connection *connCreateSocket(void) { + connection *conn = zcalloc(sizeof(connection)); + conn->type = &CT_Socket; + conn->fd = -1; + + return conn; +} + +/* Create a new socket-type connection that is already associated with + * an accepted connection. + * + * The socket is not ready for I/O until connAccept() was called and + * invoked the connection-level accept handler. + * + * Callers should use connGetState() and verify the created connection + * is not in an error state (which is not possible for a socket connection, + * but could but possible with other protocols). + */ +static connection *connCreateAcceptedSocket(int fd, void *priv) { + UNUSED(priv); + connection *conn = connCreateSocket(); + conn->fd = fd; + conn->state = CONN_STATE_ACCEPTING; + return conn; +} + +static int connSocketConnect(connection *conn, const char *addr, int port, const char *src_addr, + ConnectionCallbackFunc connect_handler) { + int fd = anetTcpNonBlockBestEffortBindConnect(NULL,addr,port,src_addr); + if (fd == -1) { + conn->state = CONN_STATE_ERROR; + conn->last_errno = errno; + return C_ERR; + } + + conn->fd = fd; + conn->state = CONN_STATE_CONNECTING; + + conn->conn_handler = connect_handler; + aeCreateFileEvent(server.el, conn->fd, AE_WRITABLE, + conn->type->ae_handler, conn); + + return C_OK; +} + +/* ------ Pure socket connections ------- */ + +/* A very incomplete list of implementation-specific calls. Much of the above shall + * move here as we implement additional connection types. + */ + +/* Close the connection and free resources. */ +static void connSocketClose(connection *conn) { + if (conn->fd != -1) { + aeDeleteFileEvent(server.el,conn->fd, AE_READABLE | AE_WRITABLE); + close(conn->fd); + conn->fd = -1; + } + + /* If called from within a handler, schedule the close but + * keep the connection until the handler returns. + */ + if (connHasRefs(conn)) { + conn->flags |= CONN_FLAG_CLOSE_SCHEDULED; + return; + } + + zfree(conn); +} + +static int connSocketWrite(connection *conn, const void *data, size_t data_len) { + int ret = write(conn->fd, data, data_len); + if (ret < 0 && errno != EAGAIN) { + conn->last_errno = errno; + + /* Don't overwrite the state of a connection that is not already + * connected, not to mess with handler callbacks. + */ + if (errno != EINTR && conn->state == CONN_STATE_CONNECTED) + conn->state = CONN_STATE_ERROR; + } + + return ret; +} + +static int connSocketWritev(connection *conn, const struct iovec *iov, int iovcnt) { + int ret = writev(conn->fd, iov, iovcnt); + if (ret < 0 && errno != EAGAIN) { + conn->last_errno = errno; + + /* Don't overwrite the state of a connection that is not already + * connected, not to mess with handler callbacks. + */ + if (errno != EINTR && conn->state == CONN_STATE_CONNECTED) + conn->state = CONN_STATE_ERROR; + } + + return ret; +} + +static int connSocketRead(connection *conn, void *buf, size_t buf_len) { + int ret = read(conn->fd, buf, buf_len); + if (!ret) { + conn->state = CONN_STATE_CLOSED; + } else if (ret < 0 && errno != EAGAIN) { + conn->last_errno = errno; + + /* Don't overwrite the state of a connection that is not already + * connected, not to mess with handler callbacks. + */ + if (errno != EINTR && conn->state == CONN_STATE_CONNECTED) + conn->state = CONN_STATE_ERROR; + } + + return ret; +} + +static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) { + int ret = C_OK; + + if (conn->state != CONN_STATE_ACCEPTING) return C_ERR; + conn->state = CONN_STATE_CONNECTED; + + connIncrRefs(conn); + if (!callHandler(conn, accept_handler)) ret = C_ERR; + connDecrRefs(conn); + + return ret; +} + +/* Register a write handler, to be called when the connection is writable. + * If NULL, the existing handler is removed. + * + * The barrier flag indicates a write barrier is requested, resulting with + * CONN_FLAG_WRITE_BARRIER set. This will ensure that the write handler is + * always called before and not after the read handler in a single event + * loop. + */ +static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) { + if (func == conn->write_handler) return C_OK; + + conn->write_handler = func; + if (barrier) + conn->flags |= CONN_FLAG_WRITE_BARRIER; + else + conn->flags &= ~CONN_FLAG_WRITE_BARRIER; + if (!conn->write_handler) + aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE); + else + if (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE, + conn->type->ae_handler,conn) == AE_ERR) return C_ERR; + return C_OK; +} + +/* Register a read handler, to be called when the connection is readable. + * If NULL, the existing handler is removed. + */ +static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) { + if (func == conn->read_handler) return C_OK; + + conn->read_handler = func; + if (!conn->read_handler) + aeDeleteFileEvent(server.el,conn->fd,AE_READABLE); + else + if (aeCreateFileEvent(server.el,conn->fd, + AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR; + return C_OK; +} + +static const char *connSocketGetLastError(connection *conn) { + return strerror(conn->last_errno); +} + +static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) +{ + UNUSED(el); + UNUSED(fd); + connection *conn = clientData; + + if (conn->state == CONN_STATE_CONNECTING && + (mask & AE_WRITABLE) && conn->conn_handler) { + + int conn_error = anetGetError(conn->fd); + if (conn_error) { + conn->last_errno = conn_error; + conn->state = CONN_STATE_ERROR; + } else { + conn->state = CONN_STATE_CONNECTED; + } + + if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE); + + if (!callHandler(conn, conn->conn_handler)) return; + conn->conn_handler = NULL; + } + + /* Normally we execute the readable event first, and the writable + * event later. This is useful as sometimes we may be able + * to serve the reply of a query immediately after processing the + * query. + * + * However if WRITE_BARRIER is set in the mask, our application is + * asking us to do the reverse: never fire the writable event + * after the readable. In such a case, we invert the calls. + * This is useful when, for instance, we want to do things + * in the beforeSleep() hook, like fsync'ing a file to disk, + * before replying to a client. */ + int invert = conn->flags & CONN_FLAG_WRITE_BARRIER; + + int call_write = (mask & AE_WRITABLE) && conn->write_handler; + int call_read = (mask & AE_READABLE) && conn->read_handler; + + /* Handle normal I/O flows */ + if (!invert && call_read) { + if (!callHandler(conn, conn->read_handler)) return; + } + /* Fire the writable event. */ + if (call_write) { + if (!callHandler(conn, conn->write_handler)) return; + } + /* If we have to invert the call, fire the readable event now + * after the writable one. */ + if (invert && call_read) { + if (!callHandler(conn, conn->read_handler)) return; + } +} + +static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + int cport, cfd, max = MAX_ACCEPTS_PER_CALL; + char cip[NET_IP_STR_LEN]; + UNUSED(el); + UNUSED(mask); + UNUSED(privdata); + + while(max--) { + cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); + if (cfd == ANET_ERR) { + if (errno != EWOULDBLOCK) + serverLog(LL_WARNING, + "Accepting client connection: %s", server.neterr); + return; + } + serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); + acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL),0,cip); + } +} + +static int connSocketAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) { + if (anetFdToString(conn->fd, ip, ip_len, port, remote) == 0) + return C_OK; + + conn->last_errno = errno; + return C_ERR; +} + +static int connSocketListen(connListener *listener) { + return listenToPort(listener); +} + +static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) { + int fd = anetTcpNonBlockConnect(NULL,addr,port); + if (fd == -1) { + conn->state = CONN_STATE_ERROR; + conn->last_errno = errno; + return C_ERR; + } + + if ((aeWait(fd, AE_WRITABLE, timeout) & AE_WRITABLE) == 0) { + conn->state = CONN_STATE_ERROR; + conn->last_errno = ETIMEDOUT; + } + + conn->fd = fd; + conn->state = CONN_STATE_CONNECTED; + return C_OK; +} + +/* Connection-based versions of syncio.c functions. + * NOTE: This should ideally be refactored out in favor of pure async work. + */ + +static ssize_t connSocketSyncWrite(connection *conn, char *ptr, ssize_t size, long long timeout) { + return syncWrite(conn->fd, ptr, size, timeout); +} + +static ssize_t connSocketSyncRead(connection *conn, char *ptr, ssize_t size, long long timeout) { + return syncRead(conn->fd, ptr, size, timeout); +} + +static ssize_t connSocketSyncReadLine(connection *conn, char *ptr, ssize_t size, long long timeout) { + return syncReadLine(conn->fd, ptr, size, timeout); +} + +static const char *connSocketGetType(connection *conn) { + (void) conn; + + return CONN_TYPE_SOCKET; +} + +static ConnectionType CT_Socket = { + /* connection type */ + .get_type = connSocketGetType, + + /* connection type initialize & finalize & configure */ + .init = NULL, + .cleanup = NULL, + .configure = NULL, + + /* ae & accept & listen & error & address handler */ + .ae_handler = connSocketEventHandler, + .accept_handler = connSocketAcceptHandler, + .addr = connSocketAddr, + .listen = connSocketListen, + + /* create/close connection */ + .conn_create = connCreateSocket, + .conn_create_accepted = connCreateAcceptedSocket, + .close = connSocketClose, + + /* connect & accept */ + .connect = connSocketConnect, + .blocking_connect = connSocketBlockingConnect, + .accept = connSocketAccept, + + /* IO */ + .write = connSocketWrite, + .writev = connSocketWritev, + .read = connSocketRead, + .set_write_handler = connSocketSetWriteHandler, + .set_read_handler = connSocketSetReadHandler, + .get_last_error = connSocketGetLastError, + .sync_write = connSocketSyncWrite, + .sync_read = connSocketSyncRead, + .sync_readline = connSocketSyncReadLine, + + /* pending data */ + .has_pending_data = NULL, + .process_pending_data = NULL, +}; + +int connBlock(connection *conn) { + if (conn->fd == -1) return C_ERR; + return anetBlock(NULL, conn->fd); +} + +int connNonBlock(connection *conn) { + if (conn->fd == -1) return C_ERR; + return anetNonBlock(NULL, conn->fd); +} + +int connEnableTcpNoDelay(connection *conn) { + if (conn->fd == -1) return C_ERR; + return anetEnableTcpNoDelay(NULL, conn->fd); +} + +int connDisableTcpNoDelay(connection *conn) { + if (conn->fd == -1) return C_ERR; + return anetDisableTcpNoDelay(NULL, conn->fd); +} + +int connKeepAlive(connection *conn, int interval) { + if (conn->fd == -1) return C_ERR; + return anetKeepAlive(NULL, conn->fd, interval); +} + +int connSendTimeout(connection *conn, long long ms) { + return anetSendTimeout(NULL, conn->fd, ms); +} + +int connRecvTimeout(connection *conn, long long ms) { + return anetRecvTimeout(NULL, conn->fd, ms); +} + +int RedisRegisterConnectionTypeSocket() +{ + return connTypeRegister(&CT_Socket); +} @@ -27,12 +27,13 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#define REDISMODULE_CORE_MODULE /* A module that's part of the redis core, uses server.h too. */ #include "server.h" #include "connhelpers.h" #include "adlist.h" -#ifdef USE_OPENSSL +#if (USE_OPENSSL == 1 /* BUILD_YES */ ) || ((USE_OPENSSL == 2 /* BUILD_MODULE */) && (BUILD_TLS_MODULE == 2)) #include <openssl/conf.h> #include <openssl/ssl.h> @@ -56,8 +57,6 @@ #define REDIS_TLS_PROTO_DEFAULT (REDIS_TLS_PROTO_TLSv1_2) #endif -extern ConnectionType CT_Socket; - SSL_CTX *redis_tls_ctx = NULL; SSL_CTX *redis_tls_client_ctx = NULL; @@ -141,7 +140,7 @@ static void initCryptoLocks(void) { } #endif /* USE_CRYPTO_LOCKS */ -void tlsInit(void) { +static void tlsInit(void) { /* Enable configuring OpenSSL using the standard openssl.cnf * OPENSSL_config()/OPENSSL_init_crypto() should be the first * call to the OpenSSL* library. @@ -169,7 +168,7 @@ void tlsInit(void) { pending_list = listCreate(); } -void tlsCleanup(void) { +static void tlsCleanup(void) { if (redis_tls_ctx) { SSL_CTX_free(redis_tls_ctx); redis_tls_ctx = NULL; @@ -281,12 +280,20 @@ error: /* Attempt to configure/reconfigure TLS. This operation is atomic and will * leave the SSL_CTX unchanged if fails. + * @priv: config of redisTLSContextConfig. + * @reconfigure: if true, ignore the previous configure; if false, only + * configure from @ctx_config if redis_tls_ctx is NULL. */ -int tlsConfigure(redisTLSContextConfig *ctx_config) { +static int tlsConfigure(void *priv, int reconfigure) { + redisTLSContextConfig *ctx_config = (redisTLSContextConfig *)priv; char errbuf[256]; SSL_CTX *ctx = NULL; SSL_CTX *client_ctx = NULL; + if (!reconfigure && redis_tls_ctx) { + return C_OK; + } + if (!ctx_config->cert_file) { serverLog(LL_WARNING, "No tls-cert-file configured!"); goto error; @@ -406,12 +413,6 @@ error: return C_ERR; } -/* Return 1 if TLS was already configured, 0 otherwise. - */ -int isTlsConfigured(void) { - return redis_tls_ctx != NULL; -} - #ifdef TLS_DEBUGGING #define TLSCONN_DEBUG(fmt, ...) \ serverLog(LL_DEBUG, "TLSCONN: " fmt, __VA_ARGS__) @@ -419,7 +420,7 @@ int isTlsConfigured(void) { #define TLSCONN_DEBUG(fmt, ...) #endif -ConnectionType CT_TLS; +static ConnectionType CT_TLS; /* Normal socket connections have a simple events/handler correlation. * @@ -464,7 +465,7 @@ static connection *createTLSConnection(int client_side) { return (connection *) conn; } -connection *connCreateTLS(void) { +static connection *connCreateTLS(void) { return createTLSConnection(1); } @@ -485,7 +486,8 @@ static void updateTLSError(tls_connection *conn) { * Callers should use connGetState() and verify the created connection * is not in an error state. */ -connection *connCreateAcceptedTLS(int fd, int require_auth) { +static connection *connCreateAcceptedTLS(int fd, void *priv) { + int require_auth = *(int *)priv; tls_connection *conn = (tls_connection *) createTLSConnection(0); conn->c.fd = fd; conn->c.state = CONN_STATE_ACCEPTING; @@ -548,7 +550,7 @@ static int handleSSLReturnCode(tls_connection *conn, int ret_value, WantIOType * return 0; } -void registerSSLEvent(tls_connection *conn, WantIOType want) { +static void registerSSLEvent(tls_connection *conn, WantIOType want) { int mask = aeGetFileEvents(server.el, conn->c.fd); switch (want) { @@ -568,7 +570,7 @@ void registerSSLEvent(tls_connection *conn, WantIOType want) { } } -void updateSSLEvent(tls_connection *conn) { +static void updateSSLEvent(tls_connection *conn) { int mask = aeGetFileEvents(server.el, conn->c.fd); int need_read = conn->c.read_handler || (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ); int need_write = conn->c.write_handler || (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE); @@ -595,7 +597,7 @@ static void tlsHandleEvent(tls_connection *conn, int mask) { switch (conn->c.state) { case CONN_STATE_CONNECTING: - conn_error = connGetSocketError((connection *) conn); + conn_error = anetGetError(conn->c.fd); if (conn_error) { conn->c.last_errno = conn_error; conn->c.state = CONN_STATE_ERROR; @@ -718,6 +720,34 @@ static void tlsEventHandler(struct aeEventLoop *el, int fd, void *clientData, in tlsHandleEvent(conn, mask); } +static void tlsAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + int cport, cfd, max = MAX_ACCEPTS_PER_CALL; + char cip[NET_IP_STR_LEN]; + UNUSED(el); + UNUSED(mask); + UNUSED(privdata); + + while(max--) { + cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); + if (cfd == ANET_ERR) { + if (errno != EWOULDBLOCK) + serverLog(LL_WARNING, + "Accepting client connection: %s", server.neterr); + return; + } + serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); + acceptCommonHandler(connCreateAcceptedTLS(cfd, &server.tls_auth_clients),0,cip); + } +} + +static int connTLSAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) { + return anetFdToString(conn->fd, ip, ip_len, port, remote); +} + +static int connTLSListen(connListener *listener) { + return listenToPort(listener); +} + static void connTLSClose(connection *conn_) { tls_connection *conn = (tls_connection *) conn_; @@ -738,7 +768,7 @@ static void connTLSClose(connection *conn_) { conn->pending_list_node = NULL; } - CT_Socket.close(conn_); + connectionTypeTcp()->close(conn_); } static int connTLSAccept(connection *_conn, ConnectionCallbackFunc accept_handler) { @@ -777,7 +807,7 @@ static int connTLSConnect(connection *conn_, const char *addr, int port, const c ERR_clear_error(); /* Initiate Socket connection first */ - if (CT_Socket.connect(conn_, addr, port, src_addr, connect_handler) == C_ERR) return C_ERR; + if (connectionTypeTcp()->connect(conn_, addr, port, src_addr, connect_handler) == C_ERR) return C_ERR; /* Return now, once the socket is connected we'll initiate * TLS connection from the event handler. @@ -905,7 +935,7 @@ static const char *connTLSGetLastError(connection *conn_) { return NULL; } -int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) { +static int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) { conn->write_handler = func; if (barrier) conn->flags |= CONN_FLAG_WRITE_BARRIER; @@ -915,7 +945,7 @@ int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int ba return C_OK; } -int connTLSSetReadHandler(connection *conn, ConnectionCallbackFunc func) { +static int connTLSSetReadHandler(connection *conn, ConnectionCallbackFunc func) { conn->read_handler = func; updateSSLEvent((tls_connection *) conn); return C_OK; @@ -940,7 +970,7 @@ static int connTLSBlockingConnect(connection *conn_, const char *addr, int port, if (conn->c.state != CONN_STATE_NONE) return C_ERR; /* Initiate socket blocking connect first */ - if (CT_Socket.blocking_connect(conn_, addr, port, timeout) == C_ERR) return C_ERR; + if (connectionTypeTcp()->blocking_connect(conn_, addr, port, timeout) == C_ERR) return C_ERR; /* Initiate TLS connection now. We set up a send/recv timeout on the socket, * which means the specified timeout will not be enforced accurately. */ @@ -1009,37 +1039,19 @@ exit: return nread; } -static int connTLSGetType(connection *conn_) { +static const char *connTLSGetType(connection *conn_) { (void) conn_; return CONN_TYPE_TLS; } -ConnectionType CT_TLS = { - .ae_handler = tlsEventHandler, - .accept = connTLSAccept, - .connect = connTLSConnect, - .blocking_connect = connTLSBlockingConnect, - .read = connTLSRead, - .write = connTLSWrite, - .writev = connTLSWritev, - .close = connTLSClose, - .set_write_handler = connTLSSetWriteHandler, - .set_read_handler = connTLSSetReadHandler, - .get_last_error = connTLSGetLastError, - .sync_write = connTLSSyncWrite, - .sync_read = connTLSSyncRead, - .sync_readline = connTLSSyncReadLine, - .get_type = connTLSGetType -}; - -int tlsHasPendingData() { +static int tlsHasPendingData() { if (!pending_list) return 0; return listLength(pending_list) > 0; } -int tlsProcessPendingData() { +static int tlsProcessPendingData() { listIter li; listNode *ln; @@ -1055,9 +1067,9 @@ int tlsProcessPendingData() { /* Fetch the peer certificate used for authentication on the specified * connection and return it as a PEM-encoded sds. */ -sds connTLSGetPeerCert(connection *conn_) { +static sds connTLSGetPeerCert(connection *conn_) { tls_connection *conn = (tls_connection *) conn_; - if (conn_->type->get_type(conn_) != CONN_TYPE_TLS || !conn->ssl) return NULL; + if ((conn_->type != connectionTypeTls()) || !conn->ssl) return NULL; X509 *cert = SSL_get_peer_certificate(conn->ssl); if (!cert) return NULL; @@ -1076,41 +1088,97 @@ sds connTLSGetPeerCert(connection *conn_) { return cert_pem; } -#else /* USE_OPENSSL */ +static ConnectionType CT_TLS = { + /* connection type */ + .get_type = connTLSGetType, -void tlsInit(void) { -} + /* connection type initialize & finalize & configure */ + .init = tlsInit, + .cleanup = tlsCleanup, + .configure = tlsConfigure, -void tlsCleanup(void) { -} + /* ae & accept & listen & error & address handler */ + .ae_handler = tlsEventHandler, + .accept_handler = tlsAcceptHandler, + .addr = connTLSAddr, + .listen = connTLSListen, -int tlsConfigure(redisTLSContextConfig *ctx_config) { - UNUSED(ctx_config); - return C_OK; -} + /* create/close connection */ + .conn_create = connCreateTLS, + .conn_create_accepted = connCreateAcceptedTLS, + .close = connTLSClose, -connection *connCreateTLS(void) { - return NULL; -} + /* connect & accept */ + .connect = connTLSConnect, + .blocking_connect = connTLSBlockingConnect, + .accept = connTLSAccept, -connection *connCreateAcceptedTLS(int fd, int require_auth) { - UNUSED(fd); - UNUSED(require_auth); + /* IO */ + .read = connTLSRead, + .write = connTLSWrite, + .writev = connTLSWritev, + .set_write_handler = connTLSSetWriteHandler, + .set_read_handler = connTLSSetReadHandler, + .get_last_error = connTLSGetLastError, + .sync_write = connTLSSyncWrite, + .sync_read = connTLSSyncRead, + .sync_readline = connTLSSyncReadLine, - return NULL; -} + /* pending data */ + .has_pending_data = tlsHasPendingData, + .process_pending_data = tlsProcessPendingData, -int tlsHasPendingData() { - return 0; + /* TLS specified methods */ + .get_peer_cert = connTLSGetPeerCert, +}; + +int RedisRegisterConnectionTypeTLS() { + return connTypeRegister(&CT_TLS); } -int tlsProcessPendingData() { - return 0; +#else /* USE_OPENSSL */ + +int RedisRegisterConnectionTypeTLS() { + serverLog(LL_VERBOSE, "Connection type %s not builtin", CONN_TYPE_TLS); + return C_ERR; } -sds connTLSGetPeerCert(connection *conn_) { - (void) conn_; - return NULL; +#endif + +#if BUILD_TLS_MODULE == 2 /* BUILD_MODULE */ + +#include "release.h" + +int RedisModule_OnLoad(void *ctx, RedisModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + + /* Connection modules must be part of the same build as redis. */ + if (strcmp(REDIS_BUILD_ID_RAW, redisBuildIdRaw())) { + serverLog(LL_NOTICE, "Connection type %s was not built together with the redis-server used.", CONN_TYPE_TLS); + return REDISMODULE_ERR; + } + + if (RedisModule_Init(ctx,"tls",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + /* Connection modules is available only bootup. */ + if ((RedisModule_GetContextFlags(ctx) & REDISMODULE_CTX_FLAGS_SERVER_STARTUP) == 0) { + serverLog(LL_NOTICE, "Connection type %s can be loaded only during bootup", CONN_TYPE_TLS); + return REDISMODULE_ERR; + } + + RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD); + + if(connTypeRegister(&CT_TLS) != C_OK) + return REDISMODULE_ERR; + + return REDISMODULE_OK; } +int RedisModule_OnUnload(void *arg) { + UNUSED(arg); + serverLog(LL_NOTICE, "Connection type %s can not be unloaded", CONN_TYPE_TLS); + return REDISMODULE_ERR; +} #endif diff --git a/src/unix.c b/src/unix.c new file mode 100644 index 000000000..c4ffad4fb --- /dev/null +++ b/src/unix.c @@ -0,0 +1,194 @@ +/* ========================================================================== + * unix.c - unix socket connection implementation + * -------------------------------------------------------------------------- + * Copyright (C) 2022 zhenwei pi + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to permit + * persons to whom the Software is furnished to do so, subject to the + * following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS + * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE + * USE OR OTHER DEALINGS IN THE SOFTWARE. + * ========================================================================== + */ + +#include "server.h" +#include "connection.h" + +static ConnectionType CT_Unix; + +static const char *connUnixGetType(connection *conn) { + UNUSED(conn); + + return CONN_TYPE_UNIX; +} + +static void connUnixEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) { + connectionTypeTcp()->ae_handler(el, fd, clientData, mask); +} + +static int connUnixAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) { + return connectionTypeTcp()->addr(conn, ip, ip_len, port, remote); +} + +static int connUnixListen(connListener *listener) { + int fd; + mode_t *perm = (mode_t *)listener->priv; + + if (listener->bindaddr_count == 0) + return C_OK; + + /* currently listener->bindaddr_count is always 1, we still use a loop here in case Redis supports multi Unix socket in the future */ + for (int j = 0; j < listener->bindaddr_count; j++) { + char *addr = listener->bindaddr[j]; + + unlink(addr); /* don't care if this fails */ + fd = anetUnixServer(server.neterr, addr, *perm, server.tcp_backlog); + if (fd == ANET_ERR) { + serverLog(LL_WARNING, "Failed opening Unix socket: %s", server.neterr); + exit(1); + } + anetNonBlock(NULL, fd); + anetCloexec(fd); + listener->fd[listener->count++] = fd; + } + + return C_OK; +} + +static connection *connCreateUnix(void) { + connection *conn = zcalloc(sizeof(connection)); + conn->type = &CT_Unix; + conn->fd = -1; + + return conn; +} + +static connection *connCreateAcceptedUnix(int fd, void *priv) { + UNUSED(priv); + connection *conn = connCreateUnix(); + conn->fd = fd; + conn->state = CONN_STATE_ACCEPTING; + return conn; +} + +static void connUnixAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + int cfd, max = MAX_ACCEPTS_PER_CALL; + UNUSED(el); + UNUSED(mask); + UNUSED(privdata); + + while(max--) { + cfd = anetUnixAccept(server.neterr, fd); + if (cfd == ANET_ERR) { + if (errno != EWOULDBLOCK) + serverLog(LL_WARNING, + "Accepting client connection: %s", server.neterr); + return; + } + serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket); + acceptCommonHandler(connCreateAcceptedUnix(cfd, NULL),CLIENT_UNIX_SOCKET,NULL); + } +} + +static void connUnixClose(connection *conn) { + connectionTypeTcp()->close(conn); +} + +static int connUnixAccept(connection *conn, ConnectionCallbackFunc accept_handler) { + return connectionTypeTcp()->accept(conn, accept_handler); +} + +static int connUnixWrite(connection *conn, const void *data, size_t data_len) { + return connectionTypeTcp()->write(conn, data, data_len); +} + +static int connUnixWritev(connection *conn, const struct iovec *iov, int iovcnt) { + return connectionTypeTcp()->writev(conn, iov, iovcnt); +} + +static int connUnixRead(connection *conn, void *buf, size_t buf_len) { + return connectionTypeTcp()->read(conn, buf, buf_len); +} + +static int connUnixSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) { + return connectionTypeTcp()->set_write_handler(conn, func, barrier); +} + +static int connUnixSetReadHandler(connection *conn, ConnectionCallbackFunc func) { + return connectionTypeTcp()->set_read_handler(conn, func); +} + +static const char *connUnixGetLastError(connection *conn) { + return strerror(conn->last_errno); +} + +static ssize_t connUnixSyncWrite(connection *conn, char *ptr, ssize_t size, long long timeout) { + return syncWrite(conn->fd, ptr, size, timeout); +} + +static ssize_t connUnixSyncRead(connection *conn, char *ptr, ssize_t size, long long timeout) { + return syncRead(conn->fd, ptr, size, timeout); +} + +static ssize_t connUnixSyncReadLine(connection *conn, char *ptr, ssize_t size, long long timeout) { + return syncReadLine(conn->fd, ptr, size, timeout); +} + +static ConnectionType CT_Unix = { + /* connection type */ + .get_type = connUnixGetType, + + /* connection type initialize & finalize & configure */ + .init = NULL, + .cleanup = NULL, + .configure = NULL, + + /* ae & accept & listen & error & address handler */ + .ae_handler = connUnixEventHandler, + .accept_handler = connUnixAcceptHandler, + .addr = connUnixAddr, + .listen = connUnixListen, + + /* create/close connection */ + .conn_create = connCreateUnix, + .conn_create_accepted = connCreateAcceptedUnix, + .close = connUnixClose, + + /* connect & accept */ + .connect = NULL, + .blocking_connect = NULL, + .accept = connUnixAccept, + + /* IO */ + .write = connUnixWrite, + .writev = connUnixWritev, + .read = connUnixRead, + .set_write_handler = connUnixSetWriteHandler, + .set_read_handler = connUnixSetReadHandler, + .get_last_error = connUnixGetLastError, + .sync_write = connUnixSyncWrite, + .sync_read = connUnixSyncRead, + .sync_readline = connUnixSyncReadLine, + + /* pending data */ + .has_pending_data = NULL, + .process_pending_data = NULL, +}; + +int RedisRegisterConnectionTypeUnix() +{ + return connTypeRegister(&CT_Unix); +} diff --git a/tests/instances.tcl b/tests/instances.tcl index 8faf6fb31..9cbb11a92 100644 --- a/tests/instances.tcl +++ b/tests/instances.tcl @@ -19,6 +19,7 @@ source ../support/test.tcl set ::verbose 0 set ::valgrind 0 set ::tls 0 +set ::tls_module 0 set ::pause_on_error 0 set ::dont_clean 0 set ::simulate_error 0 @@ -85,6 +86,10 @@ proc spawn_instance {type base_port count {conf {}} {base_conf_file ""}} { } if {$::tls} { + if {$::tls_module} { + puts $cfg [format "loadmodule %s/../../../src/redis-tls.so" [pwd]] + } + puts $cfg "tls-port $port" puts $cfg "tls-replication yes" puts $cfg "tls-cluster yes" @@ -271,13 +276,16 @@ proc parse_options {} { } elseif {$opt eq {--host}} { incr j set ::host ${val} - } elseif {$opt eq {--tls}} { + } elseif {$opt eq {--tls} || $opt eq {--tls-module}} { package require tls 1.6 ::tls::init \ -cafile "$::tlsdir/ca.crt" \ -certfile "$::tlsdir/client.crt" \ -keyfile "$::tlsdir/client.key" set ::tls 1 + if {$opt eq {--tls-module}} { + set ::tls_module 1 + } } elseif {$opt eq {--config}} { set val2 [lindex $::argv [expr $j+2]] dict set ::global_config $val $val2 @@ -293,6 +301,7 @@ proc parse_options {} { puts "--fail Simulate a test failure." puts "--valgrind Run with valgrind." puts "--tls Run tests in TLS mode." + puts "--tls-module Run tests in TLS mode with Redis module." puts "--host <host> Use hostname instead of 127.0.0.1." puts "--config <k> <v> Extra config argument(s)." puts "--stop Blocks once the first test fails." diff --git a/tests/modules/defragtest.c b/tests/modules/defragtest.c index 221280df1..6a02a059f 100644 --- a/tests/modules/defragtest.c +++ b/tests/modules/defragtest.c @@ -35,7 +35,7 @@ static void createGlobalStrings(RedisModuleCtx *ctx, int count) } } -static int defragGlobalStrings(RedisModuleDefragCtx *ctx) +static void defragGlobalStrings(RedisModuleDefragCtx *ctx) { for (int i = 0; i < global_strings_len; i++) { RedisModuleString *new = RedisModule_DefragRedisModuleString(ctx, global_strings[i]); @@ -45,8 +45,6 @@ static int defragGlobalStrings(RedisModuleDefragCtx *ctx) global_defragged++; } } - - return 0; } static void FragInfo(RedisModuleInfoCtx *ctx, int for_crash_report) { diff --git a/tests/support/server.tcl b/tests/support/server.tcl index b673b70ae..6cc846b97 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -300,7 +300,7 @@ proc wait_server_started {config_file stdout pid} { set maxiter [expr {120*1000/$checkperiod}] ; # Wait up to 2 minutes. set port_busy 0 while 1 { - if {[regexp -- " PID: $pid" [exec cat $stdout]]} { + if {[regexp -- " PID: $pid.*Server initialized" [exec cat $stdout]]} { break } after $checkperiod @@ -464,6 +464,9 @@ proc start_server {options {code undefined}} { set data [split [exec cat "tests/assets/$baseconfig"] "\n"] set config {} if {$::tls} { + if {$::tls_module} { + lappend config_lines [list "loadmodule" [format "%s/src/redis-tls.so" [pwd]]] + } dict set config "tls-cert-file" [format "%s/tests/tls/server.crt" [pwd]] dict set config "tls-key-file" [format "%s/tests/tls/server.key" [pwd]] dict set config "tls-client-cert-file" [format "%s/tests/tls/client.crt" [pwd]] diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 8153ad8bb..c7aef0f50 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -1039,3 +1039,25 @@ proc memory_usage {key} { } return $usage } + +# forward compatibility, lmap missing in TCL 8.5 +proc lmap args { + set body [lindex $args end] + set args [lrange $args 0 end-1] + set n 0 + set pairs [list] + foreach {varnames listval} $args { + set varlist [list] + foreach varname $varnames { + upvar 1 $varname var$n + lappend varlist var$n + incr n + } + lappend pairs $varlist $listval + } + set temp [list] + foreach {*}$pairs { + lappend temp [uplevel 1 $body] + } + set temp +} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 5c951dda6..efa7a0a16 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -111,6 +111,7 @@ set ::traceleaks 0 set ::valgrind 0 set ::durable 0 set ::tls 0 +set ::tls_module 0 set ::stack_logging 0 set ::verbose 0 set ::quiet 0 @@ -611,6 +612,7 @@ proc print_help_screen {} { "--wait-server Wait after server is started (so that you can attach a debugger)." "--dump-logs Dump server log on test failure." "--tls Run tests in TLS mode." + "--tls-module Run tests in TLS mode with Redis module." "--host <addr> Run tests against an external host." "--port <port> TCP port to use against external host." "--baseport <port> Initial port number for spawned redis servers." @@ -659,13 +661,16 @@ for {set j 0} {$j < [llength $argv]} {incr j} { } } elseif {$opt eq {--quiet}} { set ::quiet 1 - } elseif {$opt eq {--tls}} { + } elseif {$opt eq {--tls} || $opt eq {--tls-module}} { package require tls 1.6 set ::tls 1 ::tls::init \ -cafile "$::tlsdir/ca.crt" \ -certfile "$::tlsdir/client.crt" \ -keyfile "$::tlsdir/client.key" + if {$opt eq {--tls-module}} { + set ::tls_module 1 + } } elseif {$opt eq {--host}} { set ::external 1 set ::host $arg diff --git a/tests/unit/moduleapi/infra.tcl b/tests/unit/moduleapi/infra.tcl index 7bfa7d4b3..1140e5ad5 100644 --- a/tests/unit/moduleapi/infra.tcl +++ b/tests/unit/moduleapi/infra.tcl @@ -5,18 +5,21 @@ test {modules config rewrite} { start_server {tags {"modules"}} { r module load $testmodule - assert_equal [lindex [lindex [r module list] 0] 1] infotest + set modules [lmap x [r module list] {dict get $x name}] + assert_not_equal [lsearch $modules infotest] -1 r config rewrite restart_server 0 true false - assert_equal [lindex [lindex [r module list] 0] 1] infotest + set modules [lmap x [r module list] {dict get $x name}] + assert_not_equal [lsearch $modules infotest] -1 assert_equal {OK} [r module unload infotest] r config rewrite restart_server 0 true false - assert_equal [llength [r module list]] 0 + set modules [lmap x [r module list] {dict get $x name}] + assert_equal [lsearch $modules infotest] -1 } } diff --git a/tests/unit/moduleapi/moduleconfigs.tcl b/tests/unit/moduleapi/moduleconfigs.tcl index 8ebce3514..2b28fc307 100644 --- a/tests/unit/moduleapi/moduleconfigs.tcl +++ b/tests/unit/moduleapi/moduleconfigs.tcl @@ -5,7 +5,7 @@ start_server {tags {"modules"}} { r module load $testmodule test {Config get commands work} { # Make sure config get module config works - assert_equal [lindex [lindex [r module list] 0] 1] moduleconfigs + assert_not_equal [lsearch [lmap x [r module list] {dict get $x name}] moduleconfigs] -1 assert_equal [r config get moduleconfigs.mutable_bool] "moduleconfigs.mutable_bool yes" assert_equal [r config get moduleconfigs.immutable_bool] "moduleconfigs.immutable_bool no" assert_equal [r config get moduleconfigs.memory_numeric] "moduleconfigs.memory_numeric 1024" @@ -94,7 +94,7 @@ start_server {tags {"modules"}} { test {test loadex functionality} { r module loadex $testmodule CONFIG moduleconfigs.mutable_bool no CONFIG moduleconfigs.immutable_bool yes CONFIG moduleconfigs.memory_numeric 2mb CONFIG moduleconfigs.string tclortickle - assert_equal [lindex [lindex [r module list] 0] 1] moduleconfigs + assert_not_equal [lsearch [lmap x [r module list] {dict get $x name}] moduleconfigs] -1 assert_equal [r config get moduleconfigs.mutable_bool] "moduleconfigs.mutable_bool no" assert_equal [r config get moduleconfigs.immutable_bool] "moduleconfigs.immutable_bool yes" assert_equal [r config get moduleconfigs.memory_numeric] "moduleconfigs.memory_numeric 2097152" @@ -157,7 +157,7 @@ start_server {tags {"modules"}} { test {test config rewrite with dynamic load} { #translates to: super \0secret password r module loadex $testmodule CONFIG moduleconfigs.string \x73\x75\x70\x65\x72\x20\x00\x73\x65\x63\x72\x65\x74\x20\x70\x61\x73\x73\x77\x6f\x72\x64 ARGS - assert_equal [lindex [lindex [r module list] 0] 1] moduleconfigs + assert_not_equal [lsearch [lmap x [r module list] {dict get $x name}] moduleconfigs] -1 assert_equal [r config get moduleconfigs.string] "moduleconfigs.string {super \0secret password}" r config set moduleconfigs.mutable_bool yes r config set moduleconfigs.memory_numeric 750 @@ -207,7 +207,7 @@ start_server {tags {"modules"}} { test {test 1.module load 2.config rewrite 3.module unload 4.config rewrite works} { # Configs need to be removed from the old config file in this case. r module loadex $testmodule CONFIG moduleconfigs.memory_numeric 500 ARGS - assert_equal [lindex [lindex [r module list] 0] 1] moduleconfigs + assert_not_equal [lsearch [lmap x [r module list] {dict get $x name}] moduleconfigs] -1 r config rewrite r module unload moduleconfigs r config rewrite @@ -217,34 +217,18 @@ start_server {tags {"modules"}} { } test {startup moduleconfigs} { # No loadmodule directive - set nomodload [start_server [list overrides [list moduleconfigs.string "hello"]]] - wait_for_condition 100 50 { - ! [is_alive $nomodload] - } else { - fail "startup should've failed with no load and module configs supplied" - } - set stdout [dict get $nomodload stdout] - assert_equal [count_message_lines $stdout "Module Configuration detected without loadmodule directive or no ApplyConfig call: aborting"] 1 + catch {exec src/redis-server --moduleconfigs.string "hello"} err + assert_match {*Module Configuration detected without loadmodule directive or no ApplyConfig call: aborting*} $err # Bad config value - set badconfig [start_server [list overrides [list loadmodule "$testmodule" moduleconfigs.string "rejectisfreed"]]] - wait_for_condition 100 50 { - ! [is_alive $badconfig] - } else { - fail "startup with bad moduleconfigs should've failed" - } - set stdout [dict get $badconfig stdout] - assert_equal [count_message_lines $stdout "Issue during loading of configuration moduleconfigs.string : Cannot set string to 'rejectisfreed'"] 1 + catch {exec src/redis-server --loadmodule "$testmodule" --moduleconfigs.string "rejectisfreed"} err + assert_match {*Issue during loading of configuration moduleconfigs.string : Cannot set string to 'rejectisfreed'*} $err - set noload [start_server [list overrides [list loadmodule "$testmodule noload" moduleconfigs.string "hello"]]] - wait_for_condition 100 50 { - ! [is_alive $noload] - } else { - fail "startup with moduleconfigs and no loadconfigs call should've failed" - } - set stdout [dict get $noload stdout] - assert_equal [count_message_lines $stdout "Module Configurations were not set, likely a missing LoadConfigs call. Unloading the module."] 1 + # missing LoadConfigs call + catch {exec src/redis-server --loadmodule "$testmodule" noload --moduleconfigs.string "hello"} err + assert_match {*Module Configurations were not set, likely a missing LoadConfigs call. Unloading the module.*} $err + # successful start_server [list overrides [list loadmodule "$testmodule" moduleconfigs.string "bootedup" moduleconfigs.enum two moduleconfigs.flags "two four"]] { assert_equal [r config get moduleconfigs.string] "moduleconfigs.string bootedup" assert_equal [r config get moduleconfigs.mutable_bool] "moduleconfigs.mutable_bool yes" |