summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--00-RELEASENOTES30
-rw-r--r--deps/Makefile2
-rw-r--r--deps/lua/src/Makefile5
-rw-r--r--deps/lua/src/fpconv.c205
-rw-r--r--deps/lua/src/fpconv.h22
-rw-r--r--deps/lua/src/lua_bit.c189
-rw-r--r--deps/lua/src/lua_cjson.c730
-rw-r--r--deps/lua/src/lua_cmsgpack.c450
-rw-r--r--deps/lua/src/strbuf.c6
-rw-r--r--deps/lua/src/strbuf.h16
-rw-r--r--redis.conf43
-rw-r--r--src/anet.c36
-rw-r--r--src/anet.h2
-rw-r--r--src/config.c26
-rw-r--r--src/config.h1
-rw-r--r--src/db.c2
-rw-r--r--src/debug.c6
-rw-r--r--src/fmacros.h1
-rw-r--r--src/hyperloglog.c2
-rw-r--r--src/latency.c45
-rw-r--r--src/latency.h1
-rw-r--r--src/memtest.c3
-rw-r--r--src/networking.c23
-rw-r--r--src/object.c50
-rw-r--r--src/rdb.c354
-rw-r--r--src/rdb.h1
-rw-r--r--src/redis-benchmark.c75
-rw-r--r--src/redis-cli.c9
-rw-r--r--src/redis.c119
-rw-r--r--src/redis.h44
-rw-r--r--src/replication.c399
-rw-r--r--src/rio.c177
-rw-r--r--src/rio.h16
-rw-r--r--src/scripting.c2
-rw-r--r--src/sds.c12
-rw-r--r--src/sort.c17
-rw-r--r--src/sparkline.c3
-rw-r--r--src/syncio.c1
-rw-r--r--src/t_hash.c2
-rw-r--r--src/t_string.c2
-rw-r--r--src/version.h2
-rw-r--r--src/zipmap.c5
-rw-r--r--src/zmalloc.c24
-rw-r--r--src/zmalloc.h1
-rw-r--r--tests/integration/replication.tcl135
-rw-r--r--tests/support/server.tcl16
-rw-r--r--tests/test_helper.tcl73
-rw-r--r--tests/unit/scan.tcl10
-rw-r--r--tests/unit/scripting.tcl88
-rw-r--r--tests/unit/sort.tcl18
-rwxr-xr-xutils/redis_init_script.tpl5
-rwxr-xr-xutils/whatisdoing.sh8
52 files changed, 2730 insertions, 784 deletions
diff --git a/00-RELEASENOTES b/00-RELEASENOTES
index f06ff8090..6108816df 100644
--- a/00-RELEASENOTES
+++ b/00-RELEASENOTES
@@ -14,6 +14,36 @@ HIGH: There is a critical bug that may affect a subset of users. Upgrade!
CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP.
--------------------------------------------------------------------------------
+--[ Redis 2.8.18 ] Release date: 4 Dec 2014
+
+# UPGRADE URGENCY: LOW for both Redis and Sentinel. This release mostly
+ adds new features to Redis, and contains non critical
+ fixes.
+
+* [FIX] Linenoise updated to be more VT100 compatible. (Salvatore Sanfilippo)
+* [FIX] A number of typos fixed inside comments. (Various authors)
+* [FIX] redis-cli no longer quits after long timeouts. (Matt Stancliff)
+* [FIX] Test framework improved to detect never terminating scripts, cleanup
+ instances on crashes. (Salvatore Sanfilippo)
+* [FIX] PFCOUNT can be used on slaves now. (Salvatore Sanfilippo)
+* [FIX] ZSCAN no longer reports very small scores as 0. (Matt Stancliff,
+ Michael Grunder, Salvatore Sanfilippo)
+* [FIX] Don't show the ASCII logo if syslog is enabled. Redis is now
+ an Enterprise Grade product. (Salvatore Sanfilippo)
+
+* [NEW] EXPERIMENTAL: Diskless replication, for more info check the doc at
+ http://redis.io/topics/replication. (Salvatore Sanfilippo).
+* [NEW] Transparent Huge Pages detection and reporting in logs and
+ LATENCY DOCTOR output. (Salvatore Sanfilippo)
+* [NEW] Many Lua scripting enhancements: Bitops API, cjson upgrade and tests,
+ cmsgpack upgrade. (Matt Stancliff)
+* [NEW] Total and instantaneous Network bandwidth tracking in INFO.
+* [NEW] DEBUG POPULATE two args form implemented (old form still works).
+ The second argument is the key prefix. Default is "key:" (Salvatore
+ Sanfilippo)
+* [NEW] Check that tcp-backlog is matched by /proc/sys/net/core/somaxconn, and
+ warn about it if not. (Salvatore Sanfilippo)
+
--[ Redis 2.8.17 ] Release date: 19 Sep 2014
# UPGRADE URGENCY: HIGH for Redis Sentinel.
diff --git a/deps/Makefile b/deps/Makefile
index e183ede9c..1f623ea7b 100644
--- a/deps/Makefile
+++ b/deps/Makefile
@@ -58,7 +58,7 @@ ifeq ($(uname_S),SunOS)
LUA_CFLAGS= -D__C99FEATURES__=1
endif
-LUA_CFLAGS+= -O2 -Wall -DLUA_ANSI $(CFLAGS)
+LUA_CFLAGS+= -O2 -Wall -DLUA_ANSI -DENABLE_CJSON_GLOBAL $(CFLAGS)
LUA_LDFLAGS+= $(LDFLAGS)
# lua's Makefile defines AR="ar rcu", which is unusual, and makes it more
# challenging to cross-compile lua (and redis). These defines make it easier
diff --git a/deps/lua/src/Makefile b/deps/lua/src/Makefile
index 34b0c3617..f3bba2f81 100644
--- a/deps/lua/src/Makefile
+++ b/deps/lua/src/Makefile
@@ -25,9 +25,10 @@ PLATS= aix ansi bsd freebsd generic linux macosx mingw posix solaris
LUA_A= liblua.a
CORE_O= lapi.o lcode.o ldebug.o ldo.o ldump.o lfunc.o lgc.o llex.o lmem.o \
lobject.o lopcodes.o lparser.o lstate.o lstring.o ltable.o ltm.o \
- lundump.o lvm.o lzio.o strbuf.o
+ lundump.o lvm.o lzio.o strbuf.o fpconv.o
LIB_O= lauxlib.o lbaselib.o ldblib.o liolib.o lmathlib.o loslib.o ltablib.o \
- lstrlib.o loadlib.o linit.o lua_cjson.o lua_struct.o lua_cmsgpack.o
+ lstrlib.o loadlib.o linit.o lua_cjson.o lua_struct.o lua_cmsgpack.o \
+ lua_bit.o
LUA_T= lua
LUA_O= lua.o
diff --git a/deps/lua/src/fpconv.c b/deps/lua/src/fpconv.c
new file mode 100644
index 000000000..79908317a
--- /dev/null
+++ b/deps/lua/src/fpconv.c
@@ -0,0 +1,205 @@
+/* fpconv - Floating point conversion routines
+ *
+ * Copyright (c) 2011-2012 Mark Pulford <mark@kyne.com.au>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining
+ * a copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+ * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+/* JSON uses a '.' decimal separator. strtod() / sprintf() under C libraries
+ * with locale support will break when the decimal separator is a comma.
+ *
+ * fpconv_* will work around these issues with a translation buffer if required.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+
+#include "fpconv.h"
+
+/* Lua CJSON assumes the locale is the same for all threads within a
+ * process and doesn't change after initialisation.
+ *
+ * This avoids the need for per thread storage or expensive checks
+ * for call. */
+static char locale_decimal_point = '.';
+
+/* In theory multibyte decimal_points are possible, but
+ * Lua CJSON only supports UTF-8 and known locales only have
+ * single byte decimal points ([.,]).
+ *
+ * localconv() may not be thread safe (=>crash), and nl_langinfo() is
+ * not supported on some platforms. Use sprintf() instead - if the
+ * locale does change, at least Lua CJSON won't crash. */
+static void fpconv_update_locale()
+{
+ char buf[8];
+
+ snprintf(buf, sizeof(buf), "%g", 0.5);
+
+ /* Failing this test might imply the platform has a buggy dtoa
+ * implementation or wide characters */
+ if (buf[0] != '0' || buf[2] != '5' || buf[3] != 0) {
+ fprintf(stderr, "Error: wide characters found or printf() bug.");
+ abort();
+ }
+
+ locale_decimal_point = buf[1];
+}
+
+/* Check for a valid number character: [-+0-9a-yA-Y.]
+ * Eg: -0.6e+5, infinity, 0xF0.F0pF0
+ *
+ * Used to find the probable end of a number. It doesn't matter if
+ * invalid characters are counted - strtod() will find the valid
+ * number if it exists. The risk is that slightly more memory might
+ * be allocated before a parse error occurs. */
+static inline int valid_number_character(char ch)
+{
+ char lower_ch;
+
+ if ('0' <= ch && ch <= '9')
+ return 1;
+ if (ch == '-' || ch == '+' || ch == '.')
+ return 1;
+
+ /* Hex digits, exponent (e), base (p), "infinity",.. */
+ lower_ch = ch | 0x20;
+ if ('a' <= lower_ch && lower_ch <= 'y')
+ return 1;
+
+ return 0;
+}
+
+/* Calculate the size of the buffer required for a strtod locale
+ * conversion. */
+static int strtod_buffer_size(const char *s)
+{
+ const char *p = s;
+
+ while (valid_number_character(*p))
+ p++;
+
+ return p - s;
+}
+
+/* Similar to strtod(), but must be passed the current locale's decimal point
+ * character. Guaranteed to be called at the start of any valid number in a string */
+double fpconv_strtod(const char *nptr, char **endptr)
+{
+ char localbuf[FPCONV_G_FMT_BUFSIZE];
+ char *buf, *endbuf, *dp;
+ int buflen;
+ double value;
+
+ /* System strtod() is fine when decimal point is '.' */
+ if (locale_decimal_point == '.')
+ return strtod(nptr, endptr);
+
+ buflen = strtod_buffer_size(nptr);
+ if (!buflen) {
+ /* No valid characters found, standard strtod() return */
+ *endptr = (char *)nptr;
+ return 0;
+ }
+
+ /* Duplicate number into buffer */
+ if (buflen >= FPCONV_G_FMT_BUFSIZE) {
+ /* Handle unusually large numbers */
+ buf = malloc(buflen + 1);
+ if (!buf) {
+ fprintf(stderr, "Out of memory");
+ abort();
+ }
+ } else {
+ /* This is the common case.. */
+ buf = localbuf;
+ }
+ memcpy(buf, nptr, buflen);
+ buf[buflen] = 0;
+
+ /* Update decimal point character if found */
+ dp = strchr(buf, '.');
+ if (dp)
+ *dp = locale_decimal_point;
+
+ value = strtod(buf, &endbuf);
+ *endptr = (char *)&nptr[endbuf - buf];
+ if (buflen >= FPCONV_G_FMT_BUFSIZE)
+ free(buf);
+
+ return value;
+}
+
+/* "fmt" must point to a buffer of at least 6 characters */
+static void set_number_format(char *fmt, int precision)
+{
+ int d1, d2, i;
+
+ assert(1 <= precision && precision <= 14);
+
+ /* Create printf format (%.14g) from precision */
+ d1 = precision / 10;
+ d2 = precision % 10;
+ fmt[0] = '%';
+ fmt[1] = '.';
+ i = 2;
+ if (d1) {
+ fmt[i++] = '0' + d1;
+ }
+ fmt[i++] = '0' + d2;
+ fmt[i++] = 'g';
+ fmt[i] = 0;
+}
+
+/* Assumes there is always at least 32 characters available in the target buffer */
+int fpconv_g_fmt(char *str, double num, int precision)
+{
+ char buf[FPCONV_G_FMT_BUFSIZE];
+ char fmt[6];
+ int len;
+ char *b;
+
+ set_number_format(fmt, precision);
+
+ /* Pass through when decimal point character is dot. */
+ if (locale_decimal_point == '.')
+ return snprintf(str, FPCONV_G_FMT_BUFSIZE, fmt, num);
+
+ /* snprintf() to a buffer then translate for other decimal point characters */
+ len = snprintf(buf, FPCONV_G_FMT_BUFSIZE, fmt, num);
+
+ /* Copy into target location. Translate decimal point if required */
+ b = buf;
+ do {
+ *str++ = (*b == locale_decimal_point ? '.' : *b);
+ } while(*b++);
+
+ return len;
+}
+
+void fpconv_init()
+{
+ fpconv_update_locale();
+}
+
+/* vi:ai et sw=4 ts=4:
+ */
diff --git a/deps/lua/src/fpconv.h b/deps/lua/src/fpconv.h
new file mode 100644
index 000000000..7b0d0ee31
--- /dev/null
+++ b/deps/lua/src/fpconv.h
@@ -0,0 +1,22 @@
+/* Lua CJSON floating point conversion routines */
+
+/* Buffer required to store the largest string representation of a double.
+ *
+ * Longest double printed with %.14g is 21 characters long:
+ * -1.7976931348623e+308 */
+# define FPCONV_G_FMT_BUFSIZE 32
+
+#ifdef USE_INTERNAL_FPCONV
+static inline void fpconv_init()
+{
+ /* Do nothing - not required */
+}
+#else
+extern void fpconv_init();
+#endif
+
+extern int fpconv_g_fmt(char*, double, int);
+extern double fpconv_strtod(const char*, char**);
+
+/* vi:ai et sw=4 ts=4:
+ */
diff --git a/deps/lua/src/lua_bit.c b/deps/lua/src/lua_bit.c
new file mode 100644
index 000000000..690df7d3c
--- /dev/null
+++ b/deps/lua/src/lua_bit.c
@@ -0,0 +1,189 @@
+/*
+** Lua BitOp -- a bit operations library for Lua 5.1/5.2.
+** http://bitop.luajit.org/
+**
+** Copyright (C) 2008-2012 Mike Pall. All rights reserved.
+**
+** Permission is hereby granted, free of charge, to any person obtaining
+** a copy of this software and associated documentation files (the
+** "Software"), to deal in the Software without restriction, including
+** without limitation the rights to use, copy, modify, merge, publish,
+** distribute, sublicense, and/or sell copies of the Software, and to
+** permit persons to whom the Software is furnished to do so, subject to
+** the following conditions:
+**
+** The above copyright notice and this permission notice shall be
+** included in all copies or substantial portions of the Software.
+**
+** THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+** EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+** MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+** IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+** CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+** TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+** SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+**
+** [ MIT license: http://www.opensource.org/licenses/mit-license.php ]
+*/
+
+#define LUA_BITOP_VERSION "1.0.2"
+
+#define LUA_LIB
+#include "lua.h"
+#include "lauxlib.h"
+
+#ifdef _MSC_VER
+/* MSVC is stuck in the last century and doesn't have C99's stdint.h. */
+typedef __int32 int32_t;
+typedef unsigned __int32 uint32_t;
+typedef unsigned __int64 uint64_t;
+#else
+#include <stdint.h>
+#endif
+
+typedef int32_t SBits;
+typedef uint32_t UBits;
+
+typedef union {
+ lua_Number n;
+#ifdef LUA_NUMBER_DOUBLE
+ uint64_t b;
+#else
+ UBits b;
+#endif
+} BitNum;
+
+/* Convert argument to bit type. */
+static UBits barg(lua_State *L, int idx)
+{
+ BitNum bn;
+ UBits b;
+#if LUA_VERSION_NUM < 502
+ bn.n = lua_tonumber(L, idx);
+#else
+ bn.n = luaL_checknumber(L, idx);
+#endif
+#if defined(LUA_NUMBER_DOUBLE)
+ bn.n += 6755399441055744.0; /* 2^52+2^51 */
+#ifdef SWAPPED_DOUBLE
+ b = (UBits)(bn.b >> 32);
+#else
+ b = (UBits)bn.b;
+#endif
+#elif defined(LUA_NUMBER_INT) || defined(LUA_NUMBER_LONG) || \
+ defined(LUA_NUMBER_LONGLONG) || defined(LUA_NUMBER_LONG_LONG) || \
+ defined(LUA_NUMBER_LLONG)
+ if (sizeof(UBits) == sizeof(lua_Number))
+ b = bn.b;
+ else
+ b = (UBits)(SBits)bn.n;
+#elif defined(LUA_NUMBER_FLOAT)
+#error "A 'float' lua_Number type is incompatible with this library"
+#else
+#error "Unknown number type, check LUA_NUMBER_* in luaconf.h"
+#endif
+#if LUA_VERSION_NUM < 502
+ if (b == 0 && !lua_isnumber(L, idx)) {
+ luaL_typerror(L, idx, "number");
+ }
+#endif
+ return b;
+}
+
+/* Return bit type. */
+#define BRET(b) lua_pushnumber(L, (lua_Number)(SBits)(b)); return 1;
+
+static int bit_tobit(lua_State *L) { BRET(barg(L, 1)) }
+static int bit_bnot(lua_State *L) { BRET(~barg(L, 1)) }
+
+#define BIT_OP(func, opr) \
+ static int func(lua_State *L) { int i; UBits b = barg(L, 1); \
+ for (i = lua_gettop(L); i > 1; i--) b opr barg(L, i); BRET(b) }
+BIT_OP(bit_band, &=)
+BIT_OP(bit_bor, |=)
+BIT_OP(bit_bxor, ^=)
+
+#define bshl(b, n) (b << n)
+#define bshr(b, n) (b >> n)
+#define bsar(b, n) ((SBits)b >> n)
+#define brol(b, n) ((b << n) | (b >> (32-n)))
+#define bror(b, n) ((b << (32-n)) | (b >> n))
+#define BIT_SH(func, fn) \
+ static int func(lua_State *L) { \
+ UBits b = barg(L, 1); UBits n = barg(L, 2) & 31; BRET(fn(b, n)) }
+BIT_SH(bit_lshift, bshl)
+BIT_SH(bit_rshift, bshr)
+BIT_SH(bit_arshift, bsar)
+BIT_SH(bit_rol, brol)
+BIT_SH(bit_ror, bror)
+
+static int bit_bswap(lua_State *L)
+{
+ UBits b = barg(L, 1);
+ b = (b >> 24) | ((b >> 8) & 0xff00) | ((b & 0xff00) << 8) | (b << 24);
+ BRET(b)
+}
+
+static int bit_tohex(lua_State *L)
+{
+ UBits b = barg(L, 1);
+ SBits n = lua_isnone(L, 2) ? 8 : (SBits)barg(L, 2);
+ const char *hexdigits = "0123456789abcdef";
+ char buf[8];
+ int i;
+ if (n < 0) { n = -n; hexdigits = "0123456789ABCDEF"; }
+ if (n > 8) n = 8;
+ for (i = (int)n; --i >= 0; ) { buf[i] = hexdigits[b & 15]; b >>= 4; }
+ lua_pushlstring(L, buf, (size_t)n);
+ return 1;
+}
+
+static const struct luaL_Reg bit_funcs[] = {
+ { "tobit", bit_tobit },
+ { "bnot", bit_bnot },
+ { "band", bit_band },
+ { "bor", bit_bor },
+ { "bxor", bit_bxor },
+ { "lshift", bit_lshift },
+ { "rshift", bit_rshift },
+ { "arshift", bit_arshift },
+ { "rol", bit_rol },
+ { "ror", bit_ror },
+ { "bswap", bit_bswap },
+ { "tohex", bit_tohex },
+ { NULL, NULL }
+};
+
+/* Signed right-shifts are implementation-defined per C89/C99.
+** But the de facto standard are arithmetic right-shifts on two's
+** complement CPUs. This behaviour is required here, so test for it.
+*/
+#define BAD_SAR (bsar(-8, 2) != (SBits)-2)
+
+LUALIB_API int luaopen_bit(lua_State *L)
+{
+ UBits b;
+ lua_pushnumber(L, (lua_Number)1437217655L);
+ b = barg(L, -1);
+ if (b != (UBits)1437217655L || BAD_SAR) { /* Perform a simple self-test. */
+ const char *msg = "compiled with incompatible luaconf.h";
+#ifdef LUA_NUMBER_DOUBLE
+#ifdef _WIN32
+ if (b == (UBits)1610612736L)
+ msg = "use D3DCREATE_FPU_PRESERVE with DirectX";
+#endif
+ if (b == (UBits)1127743488L)
+ msg = "not compiled with SWAPPED_DOUBLE";
+#endif
+ if (BAD_SAR)
+ msg = "arithmetic right-shift broken";
+ luaL_error(L, "bit library self-test failed (%s)", msg);
+ }
+#if LUA_VERSION_NUM < 502
+ luaL_register(L, "bit", bit_funcs);
+#else
+ luaL_newlib(L, bit_funcs);
+#endif
+ return 1;
+}
+
diff --git a/deps/lua/src/lua_cjson.c b/deps/lua/src/lua_cjson.c
index 2e272b007..c26c0d7b8 100644
--- a/deps/lua/src/lua_cjson.c
+++ b/deps/lua/src/lua_cjson.c
@@ -1,8 +1,6 @@
-#define VERSION "1.0.3"
-
-/* CJSON - JSON support for Lua
+/* Lua CJSON - JSON support for Lua
*
- * Copyright (c) 2010-2011 Mark Pulford <mark@kyne.com.au>
+ * Copyright (c) 2010-2012 Mark Pulford <mark@kyne.com.au>
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
@@ -41,22 +39,42 @@
#include <assert.h>
#include <string.h>
#include <math.h>
+#include <limits.h>
#include "lua.h"
#include "lauxlib.h"
#include "strbuf.h"
+#include "fpconv.h"
+
+#include "../../../src/solarisfixes.h"
+
+#ifndef CJSON_MODNAME
+#define CJSON_MODNAME "cjson"
+#endif
+
+#ifndef CJSON_VERSION
+#define CJSON_VERSION "2.1.0"
+#endif
-#ifdef MISSING_ISINF
+/* Workaround for Solaris platforms missing isinf() */
+#if !defined(isinf) && (defined(USE_INTERNAL_ISINF) || defined(MISSING_ISINF))
#define isinf(x) (!isnan(x) && isnan((x) - (x)))
#endif
#define DEFAULT_SPARSE_CONVERT 0
#define DEFAULT_SPARSE_RATIO 2
#define DEFAULT_SPARSE_SAFE 10
-#define DEFAULT_MAX_DEPTH 20
-#define DEFAULT_ENCODE_REFUSE_BADNUM 1
-#define DEFAULT_DECODE_REFUSE_BADNUM 0
+#define DEFAULT_ENCODE_MAX_DEPTH 1000
+#define DEFAULT_DECODE_MAX_DEPTH 1000
+#define DEFAULT_ENCODE_INVALID_NUMBERS 0
+#define DEFAULT_DECODE_INVALID_NUMBERS 1
#define DEFAULT_ENCODE_KEEP_BUFFER 1
+#define DEFAULT_ENCODE_NUMBER_PRECISION 14
+
+#ifdef DISABLE_INVALID_NUMBERS
+#undef DEFAULT_DECODE_INVALID_NUMBERS
+#define DEFAULT_DECODE_INVALID_NUMBERS 0
+#endif
typedef enum {
T_OBJ_BEGIN,
@@ -96,29 +114,29 @@ static const char *json_token_type_name[] = {
typedef struct {
json_token_type_t ch2token[256];
char escape2char[256]; /* Decoding */
-#if 0
- char escapes[35][8]; /* Pre-generated escape string buffer */
- char *char2escape[256]; /* Encoding */
-#endif
+
+ /* encode_buf is only allocated and used when
+ * encode_keep_buffer is set */
strbuf_t encode_buf;
- char number_fmt[8]; /* "%.XXg\0" */
- int current_depth;
int encode_sparse_convert;
int encode_sparse_ratio;
int encode_sparse_safe;
int encode_max_depth;
- int encode_refuse_badnum;
- int decode_refuse_badnum;
- int encode_keep_buffer;
+ int encode_invalid_numbers; /* 2 => Encode as "null" */
int encode_number_precision;
+ int encode_keep_buffer;
+
+ int decode_invalid_numbers;
+ int decode_max_depth;
} json_config_t;
typedef struct {
const char *data;
- int index;
+ const char *ptr;
strbuf_t *tmp; /* Temporary storage for strings */
json_config_t *cfg;
+ int current_depth;
} json_parse_t;
typedef struct {
@@ -171,29 +189,76 @@ static const char *char2escape[256] = {
NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
};
-static int json_config_key;
-
/* ===== CONFIGURATION ===== */
static json_config_t *json_fetch_config(lua_State *l)
{
json_config_t *cfg;
- lua_pushlightuserdata(l, &json_config_key);
- lua_gettable(l, LUA_REGISTRYINDEX);
- cfg = lua_touserdata(l, -1);
+ cfg = lua_touserdata(l, lua_upvalueindex(1));
if (!cfg)
luaL_error(l, "BUG: Unable to fetch CJSON configuration");
- lua_pop(l, 1);
-
return cfg;
}
-static void json_verify_arg_count(lua_State *l, int args)
+/* Ensure the correct number of arguments have been provided.
+ * Pad with nil to allow other functions to simply check arg[i]
+ * to find whether an argument was provided */
+static json_config_t *json_arg_init(lua_State *l, int args)
{
luaL_argcheck(l, lua_gettop(l) <= args, args + 1,
"found too many arguments");
+
+ while (lua_gettop(l) < args)
+ lua_pushnil(l);
+
+ return json_fetch_config(l);
+}
+
+/* Process integer options for configuration functions */
+static int json_integer_option(lua_State *l, int optindex, int *setting,
+ int min, int max)
+{
+ char errmsg[64];
+ int value;
+
+ if (!lua_isnil(l, optindex)) {
+ value = luaL_checkinteger(l, optindex);
+ snprintf(errmsg, sizeof(errmsg), "expected integer between %d and %d", min, max);
+ luaL_argcheck(l, min <= value && value <= max, 1, errmsg);
+ *setting = value;
+ }
+
+ lua_pushinteger(l, *setting);
+
+ return 1;
+}
+
+/* Process enumerated arguments for a configuration function */
+static int json_enum_option(lua_State *l, int optindex, int *setting,
+ const char **options, int bool_true)
+{
+ static const char *bool_options[] = { "off", "on", NULL };
+
+ if (!options) {
+ options = bool_options;
+ bool_true = 1;
+ }
+
+ if (!lua_isnil(l, optindex)) {
+ if (bool_true && lua_isboolean(l, optindex))
+ *setting = lua_toboolean(l, optindex) * bool_true;
+ else
+ *setting = luaL_checkoption(l, optindex, NULL, options);
+ }
+
+ if (bool_true && (*setting == 0 || *setting == bool_true))
+ lua_pushboolean(l, *setting);
+ else
+ lua_pushstring(l, options[*setting]);
+
+ return 1;
}
/* Configures handling of extremely sparse arrays:
@@ -202,29 +267,11 @@ static void json_verify_arg_count(lua_State *l, int args)
* safe: Always use an array when the max index <= safe */
static int json_cfg_encode_sparse_array(lua_State *l)
{
- json_config_t *cfg;
- int val;
-
- json_verify_arg_count(l, 3);
- cfg = json_fetch_config(l);
-
- switch (lua_gettop(l)) {
- case 3:
- val = luaL_checkinteger(l, 3);
- luaL_argcheck(l, val >= 0, 3, "expected integer >= 0");
- cfg->encode_sparse_safe = val;
- case 2:
- val = luaL_checkinteger(l, 2);
- luaL_argcheck(l, val >= 0, 2, "expected integer >= 0");
- cfg->encode_sparse_ratio = val;
- case 1:
- luaL_argcheck(l, lua_isboolean(l, 1), 1, "expected boolean");
- cfg->encode_sparse_convert = lua_toboolean(l, 1);
- }
+ json_config_t *cfg = json_arg_init(l, 3);
- lua_pushboolean(l, cfg->encode_sparse_convert);
- lua_pushinteger(l, cfg->encode_sparse_ratio);
- lua_pushinteger(l, cfg->encode_sparse_safe);
+ json_enum_option(l, 1, &cfg->encode_sparse_convert, NULL, 1);
+ json_integer_option(l, 2, &cfg->encode_sparse_ratio, 0, INT_MAX);
+ json_integer_option(l, 3, &cfg->encode_sparse_safe, 0, INT_MAX);
return 3;
}
@@ -233,108 +280,80 @@ static int json_cfg_encode_sparse_array(lua_State *l)
* encoding */
static int json_cfg_encode_max_depth(lua_State *l)
{
- json_config_t *cfg;
- int depth;
+ json_config_t *cfg = json_arg_init(l, 1);
- json_verify_arg_count(l, 1);
- cfg = json_fetch_config(l);
-
- if (lua_gettop(l)) {
- depth = luaL_checkinteger(l, 1);
- luaL_argcheck(l, depth > 0, 1, "expected positive integer");
- cfg->encode_max_depth = depth;
- }
-
- lua_pushinteger(l, cfg->encode_max_depth);
-
- return 1;
+ return json_integer_option(l, 1, &cfg->encode_max_depth, 1, INT_MAX);
}
-static void json_set_number_precision(json_config_t *cfg, int prec)
+/* Configures the maximum number of nested arrays/objects allowed when
+ * decoding */
+static int json_cfg_decode_max_depth(lua_State *l)
{
- cfg->encode_number_precision = prec;
- sprintf(cfg->number_fmt, "%%.%dg", prec);
+ json_config_t *cfg = json_arg_init(l, 1);
+
+ return json_integer_option(l, 1, &cfg->decode_max_depth, 1, INT_MAX);
}
/* Configures number precision when converting doubles to text */
static int json_cfg_encode_number_precision(lua_State *l)
{
- json_config_t *cfg;
- int precision;
-
- json_verify_arg_count(l, 1);
- cfg = json_fetch_config(l);
-
- if (lua_gettop(l)) {
- precision = luaL_checkinteger(l, 1);
- luaL_argcheck(l, 1 <= precision && precision <= 14, 1,
- "expected integer between 1 and 14");
- json_set_number_precision(cfg, precision);
- }
+ json_config_t *cfg = json_arg_init(l, 1);
- lua_pushinteger(l, cfg->encode_number_precision);
-
- return 1;
+ return json_integer_option(l, 1, &cfg->encode_number_precision, 1, 14);
}
/* Configures JSON encoding buffer persistence */
static int json_cfg_encode_keep_buffer(lua_State *l)
{
- json_config_t *cfg;
+ json_config_t *cfg = json_arg_init(l, 1);
+ int old_value;
- json_verify_arg_count(l, 1);
- cfg = json_fetch_config(l);
+ old_value = cfg->encode_keep_buffer;
- if (lua_gettop(l)) {
- luaL_checktype(l, 1, LUA_TBOOLEAN);
- cfg->encode_keep_buffer = lua_toboolean(l, 1);
- }
+ json_enum_option(l, 1, &cfg->encode_keep_buffer, NULL, 1);
- lua_pushboolean(l, cfg->encode_keep_buffer);
+ /* Init / free the buffer if the setting has changed */
+ if (old_value ^ cfg->encode_keep_buffer) {
+ if (cfg->encode_keep_buffer)
+ strbuf_init(&cfg->encode_buf, 0);
+ else
+ strbuf_free(&cfg->encode_buf);
+ }
return 1;
}
-/* On argument: decode enum and set config variables
- * **options must point to a NULL terminated array of 4 enums
- * Returns: current enum value */
-static void json_enum_option(lua_State *l, const char **options,
- int *opt1, int *opt2)
+#if defined(DISABLE_INVALID_NUMBERS) && !defined(USE_INTERNAL_FPCONV)
+void json_verify_invalid_number_setting(lua_State *l, int *setting)
{
- int setting;
+ if (*setting == 1) {
+ *setting = 0;
+ luaL_error(l, "Infinity, NaN, and/or hexadecimal numbers are not supported.");
+ }
+}
+#else
+#define json_verify_invalid_number_setting(l, s) do { } while(0)
+#endif
- if (lua_gettop(l)) {
- if (lua_isboolean(l, 1))
- setting = lua_toboolean(l, 1) * 3;
- else
- setting = luaL_checkoption(l, 1, NULL, options);
+static int json_cfg_encode_invalid_numbers(lua_State *l)
+{
+ static const char *options[] = { "off", "on", "null", NULL };
+ json_config_t *cfg = json_arg_init(l, 1);
- *opt1 = setting & 1 ? 1 : 0;
- *opt2 = setting & 2 ? 1 : 0;
- } else {
- setting = *opt1 | (*opt2 << 1);
- }
+ json_enum_option(l, 1, &cfg->encode_invalid_numbers, options, 1);
- if (setting)
- lua_pushstring(l, options[setting]);
- else
- lua_pushboolean(l, 0);
-}
+ json_verify_invalid_number_setting(l, &cfg->encode_invalid_numbers);
+ return 1;
+}
-/* When enabled, rejects: NaN, Infinity, hexidecimal numbers */
-static int json_cfg_refuse_invalid_numbers(lua_State *l)
+static int json_cfg_decode_invalid_numbers(lua_State *l)
{
- static const char *options_enc_dec[] = { "none", "encode", "decode",
- "both", NULL };
- json_config_t *cfg;
+ json_config_t *cfg = json_arg_init(l, 1);
- json_verify_arg_count(l, 1);
- cfg = json_fetch_config(l);
+ json_enum_option(l, 1, &cfg->decode_invalid_numbers, NULL, 1);
- json_enum_option(l, options_enc_dec,
- &cfg->encode_refuse_badnum,
- &cfg->decode_refuse_badnum);
+ json_verify_invalid_number_setting(l, &cfg->encode_invalid_numbers);
return 1;
}
@@ -364,16 +383,19 @@ static void json_create_config(lua_State *l)
lua_setfield(l, -2, "__gc");
lua_setmetatable(l, -2);
- strbuf_init(&cfg->encode_buf, 0);
-
cfg->encode_sparse_convert = DEFAULT_SPARSE_CONVERT;
cfg->encode_sparse_ratio = DEFAULT_SPARSE_RATIO;
cfg->encode_sparse_safe = DEFAULT_SPARSE_SAFE;
- cfg->encode_max_depth = DEFAULT_MAX_DEPTH;
- cfg->encode_refuse_badnum = DEFAULT_ENCODE_REFUSE_BADNUM;
- cfg->decode_refuse_badnum = DEFAULT_DECODE_REFUSE_BADNUM;
+ cfg->encode_max_depth = DEFAULT_ENCODE_MAX_DEPTH;
+ cfg->decode_max_depth = DEFAULT_DECODE_MAX_DEPTH;
+ cfg->encode_invalid_numbers = DEFAULT_ENCODE_INVALID_NUMBERS;
+ cfg->decode_invalid_numbers = DEFAULT_DECODE_INVALID_NUMBERS;
cfg->encode_keep_buffer = DEFAULT_ENCODE_KEEP_BUFFER;
- json_set_number_precision(cfg, 14);
+ cfg->encode_number_precision = DEFAULT_ENCODE_NUMBER_PRECISION;
+
+#if DEFAULT_ENCODE_KEEP_BUFFER > 0
+ strbuf_init(&cfg->encode_buf, 0);
+#endif
/* Decoding init */
@@ -419,41 +441,15 @@ static void json_create_config(lua_State *l)
cfg->escape2char['f'] = '\f';
cfg->escape2char['r'] = '\r';
cfg->escape2char['u'] = 'u'; /* Unicode parsing required */
-
-
-#if 0
- /* Initialise separate storage for pre-generated escape codes.
- * Escapes 0-31 map directly, 34, 92, 127 follow afterwards to
- * save memory. */
- for (i = 0 ; i < 32; i++)
- sprintf(cfg->escapes[i], "\\u%04x", i);
- strcpy(cfg->escapes[8], "\b"); /* Override simpler escapes */
- strcpy(cfg->escapes[9], "\t");
- strcpy(cfg->escapes[10], "\n");
- strcpy(cfg->escapes[12], "\f");
- strcpy(cfg->escapes[13], "\r");
- strcpy(cfg->escapes[32], "\\\""); /* chr(34) */
- strcpy(cfg->escapes[33], "\\\\"); /* chr(92) */
- sprintf(cfg->escapes[34], "\\u%04x", 127); /* char(127) */
-
- /* Initialise encoding escape lookup table */
- for (i = 0; i < 32; i++)
- cfg->char2escape[i] = cfg->escapes[i];
- for (i = 32; i < 256; i++)
- cfg->char2escape[i] = NULL;
- cfg->char2escape[34] = cfg->escapes[32];
- cfg->char2escape[92] = cfg->escapes[33];
- cfg->char2escape[127] = cfg->escapes[34];
-#endif
}
/* ===== ENCODING ===== */
-static void json_encode_exception(lua_State *l, json_config_t *cfg, int lindex,
+static void json_encode_exception(lua_State *l, json_config_t *cfg, strbuf_t *json, int lindex,
const char *reason)
{
if (!cfg->encode_keep_buffer)
- strbuf_free(&cfg->encode_buf);
+ strbuf_free(json);
luaL_error(l, "Cannot serialise %s: %s",
lua_typename(l, lua_type(l, lindex)), reason);
}
@@ -494,7 +490,7 @@ static void json_append_string(lua_State *l, strbuf_t *json, int lindex)
* -1 object (not a pure array)
* >=0 elements in array
*/
-static int lua_array_length(lua_State *l, json_config_t *cfg)
+static int lua_array_length(lua_State *l, json_config_t *cfg, strbuf_t *json)
{
double k;
int max;
@@ -529,7 +525,7 @@ static int lua_array_length(lua_State *l, json_config_t *cfg)
max > items * cfg->encode_sparse_ratio &&
max > cfg->encode_sparse_safe) {
if (!cfg->encode_sparse_convert)
- json_encode_exception(l, cfg, -1, "excessively sparse array");
+ json_encode_exception(l, cfg, json, -1, "excessively sparse array");
return -1;
}
@@ -537,31 +533,41 @@ static int lua_array_length(lua_State *l, json_config_t *cfg)
return max;
}
-static void json_encode_descend(lua_State *l, json_config_t *cfg)
+static void json_check_encode_depth(lua_State *l, json_config_t *cfg,
+ int current_depth, strbuf_t *json)
{
- cfg->current_depth++;
+ /* Ensure there are enough slots free to traverse a table (key,
+ * value) and push a string for a potential error message.
+ *
+ * Unlike "decode", the key and value are still on the stack when
+ * lua_checkstack() is called. Hence an extra slot for luaL_error()
+ * below is required just in case the next check to lua_checkstack()
+ * fails.
+ *
+ * While this won't cause a crash due to the EXTRA_STACK reserve
+ * slots, it would still be an improper use of the API. */
+ if (current_depth <= cfg->encode_max_depth && lua_checkstack(l, 3))
+ return;
- if (cfg->current_depth > cfg->encode_max_depth) {
- if (!cfg->encode_keep_buffer)
- strbuf_free(&cfg->encode_buf);
- luaL_error(l, "Cannot serialise, excessive nesting (%d)",
- cfg->current_depth);
- }
+ if (!cfg->encode_keep_buffer)
+ strbuf_free(json);
+
+ luaL_error(l, "Cannot serialise, excessive nesting (%d)",
+ current_depth);
}
-static void json_append_data(lua_State *l, json_config_t *cfg, strbuf_t *json);
+static void json_append_data(lua_State *l, json_config_t *cfg,
+ int current_depth, strbuf_t *json);
/* json_append_array args:
* - lua_State
* - JSON strbuf
+ * - Size of passed Lua array (top of stack) */
-static void json_append_array(lua_State *l, json_config_t *cfg, strbuf_t *json,
- int array_length)
+static void json_append_array(lua_State *l, json_config_t *cfg, int current_depth,
+ strbuf_t *json, int array_length)
{
int comma, i;
- json_encode_descend(l, cfg);
-
strbuf_append_char(json, '[');
comma = 0;
@@ -572,38 +578,48 @@ static void json_append_array(lua_State *l, json_config_t *cfg, strbuf_t *json,
comma = 1;
lua_rawgeti(l, -1, i);
- json_append_data(l, cfg, json);
+ json_append_data(l, cfg, current_depth, json);
lua_pop(l, 1);
}
strbuf_append_char(json, ']');
-
- cfg->current_depth--;
}
-static void json_append_number(lua_State *l, strbuf_t *json, int index,
- json_config_t *cfg)
+static void json_append_number(lua_State *l, json_config_t *cfg,
+ strbuf_t *json, int lindex)
{
- double num = lua_tonumber(l, index);
+ double num = lua_tonumber(l, lindex);
+ int len;
- if (cfg->encode_refuse_badnum && (isinf(num) || isnan(num)))
- json_encode_exception(l, cfg, index, "must not be NaN or Inf");
+ if (cfg->encode_invalid_numbers == 0) {
+ /* Prevent encoding invalid numbers */
+ if (isinf(num) || isnan(num))
+ json_encode_exception(l, cfg, json, lindex, "must not be NaN or Inf");
+ } else if (cfg->encode_invalid_numbers == 1) {
+ /* Encode invalid numbers, but handle "nan" separately
+ * since some platforms may encode as "-nan". */
+ if (isnan(num)) {
+ strbuf_append_mem(json, "nan", 3);
+ return;
+ }
+ } else {
+ /* Encode invalid numbers as "null" */
+ if (isinf(num) || isnan(num)) {
+ strbuf_append_mem(json, "null", 4);
+ return;
+ }
+ }
- /* Lowest double printed with %.14g is 21 characters long:
- * -1.7976931348623e+308
- *
- * Use 32 to include the \0, and a few extra just in case..
- */
- strbuf_append_fmt(json, 32, cfg->number_fmt, num);
+ strbuf_ensure_empty_length(json, FPCONV_G_FMT_BUFSIZE);
+ len = fpconv_g_fmt(strbuf_empty_ptr(json), num, cfg->encode_number_precision);
+ strbuf_extend_length(json, len);
}
static void json_append_object(lua_State *l, json_config_t *cfg,
- strbuf_t *json)
+ int current_depth, strbuf_t *json)
{
int comma, keytype;
- json_encode_descend(l, cfg);
-
/* Object */
strbuf_append_char(json, '{');
@@ -620,30 +636,29 @@ static void json_append_object(lua_State *l, json_config_t *cfg,
keytype = lua_type(l, -2);
if (keytype == LUA_TNUMBER) {
strbuf_append_char(json, '"');
- json_append_number(l, json, -2, cfg);
+ json_append_number(l, cfg, json, -2);
strbuf_append_mem(json, "\":", 2);
} else if (keytype == LUA_TSTRING) {
json_append_string(l, json, -2);
strbuf_append_char(json, ':');
} else {
- json_encode_exception(l, cfg, -2,
+ json_encode_exception(l, cfg, json, -2,
"table key must be a number or string");
/* never returns */
}
/* table, key, value */
- json_append_data(l, cfg, json);
+ json_append_data(l, cfg, current_depth, json);
lua_pop(l, 1);
/* table, key */
}
strbuf_append_char(json, '}');
-
- cfg->current_depth--;
}
/* Serialise Lua data into JSON string. */
-static void json_append_data(lua_State *l, json_config_t *cfg, strbuf_t *json)
+static void json_append_data(lua_State *l, json_config_t *cfg,
+ int current_depth, strbuf_t *json)
{
int len;
@@ -652,7 +667,7 @@ static void json_append_data(lua_State *l, json_config_t *cfg, strbuf_t *json)
json_append_string(l, json, -1);
break;
case LUA_TNUMBER:
- json_append_number(l, json, -1, cfg);
+ json_append_number(l, cfg, json, -1);
break;
case LUA_TBOOLEAN:
if (lua_toboolean(l, -1))
@@ -661,11 +676,13 @@ static void json_append_data(lua_State *l, json_config_t *cfg, strbuf_t *json)
strbuf_append_mem(json, "false", 5);
break;
case LUA_TTABLE:
- len = lua_array_length(l, cfg);
+ current_depth++;
+ json_check_encode_depth(l, cfg, current_depth, json);
+ len = lua_array_length(l, cfg, json);
if (len > 0)
- json_append_array(l, cfg, json, len);
+ json_append_array(l, cfg, current_depth, json, len);
else
- json_append_object(l, cfg, json);
+ json_append_object(l, cfg, current_depth, json);
break;
case LUA_TNIL:
strbuf_append_mem(json, "null", 4);
@@ -678,38 +695,38 @@ static void json_append_data(lua_State *l, json_config_t *cfg, strbuf_t *json)
default:
/* Remaining types (LUA_TFUNCTION, LUA_TUSERDATA, LUA_TTHREAD,
* and LUA_TLIGHTUSERDATA) cannot be serialised */
- json_encode_exception(l, cfg, -1, "type not supported");
+ json_encode_exception(l, cfg, json, -1, "type not supported");
/* never returns */
}
}
static int json_encode(lua_State *l)
{
- json_config_t *cfg;
+ json_config_t *cfg = json_fetch_config(l);
+ strbuf_t local_encode_buf;
+ strbuf_t *encode_buf;
char *json;
int len;
- /* Can't use json_verify_arg_count() since we need to ensure
- * there is only 1 argument */
luaL_argcheck(l, lua_gettop(l) == 1, 1, "expected 1 argument");
- cfg = json_fetch_config(l);
- cfg->current_depth = 0;
-
- /* Reset the persistent buffer if it exists.
- * Otherwise allocate a new buffer. */
- if (strbuf_allocated(&cfg->encode_buf))
- strbuf_reset(&cfg->encode_buf);
- else
- strbuf_init(&cfg->encode_buf, 0);
+ if (!cfg->encode_keep_buffer) {
+ /* Use private buffer */
+ encode_buf = &local_encode_buf;
+ strbuf_init(encode_buf, 0);
+ } else {
+ /* Reuse existing buffer */
+ encode_buf = &cfg->encode_buf;
+ strbuf_reset(encode_buf);
+ }
- json_append_data(l, cfg, &cfg->encode_buf);
- json = strbuf_string(&cfg->encode_buf, &len);
+ json_append_data(l, cfg, 0, encode_buf);
+ json = strbuf_string(encode_buf, &len);
lua_pushlstring(l, json, len);
if (!cfg->encode_keep_buffer)
- strbuf_free(&cfg->encode_buf);
+ strbuf_free(encode_buf);
return 1;
}
@@ -808,7 +825,7 @@ static int json_append_unicode_escape(json_parse_t *json)
int escape_len = 6;
/* Fetch UTF-16 code unit */
- codepoint = decode_hex4(&json->data[json->index + 2]);
+ codepoint = decode_hex4(json->ptr + 2);
if (codepoint < 0)
return -1;
@@ -824,13 +841,13 @@ static int json_append_unicode_escape(json_parse_t *json)
return -1;
/* Ensure the next code is a unicode escape */
- if (json->data[json->index + escape_len] != '\\' ||
- json->data[json->index + escape_len + 1] != 'u') {
+ if (*(json->ptr + escape_len) != '\\' ||
+ *(json->ptr + escape_len + 1) != 'u') {
return -1;
}
/* Fetch the next codepoint */
- surrogate_low = decode_hex4(&json->data[json->index + 2 + escape_len]);
+ surrogate_low = decode_hex4(json->ptr + 2 + escape_len);
if (surrogate_low < 0)
return -1;
@@ -852,7 +869,7 @@ static int json_append_unicode_escape(json_parse_t *json)
/* Append bytes and advance parse index */
strbuf_append_mem_unsafe(json->tmp, utf8, len);
- json->index += escape_len;
+ json->ptr += escape_len;
return 0;
}
@@ -861,7 +878,7 @@ static void json_set_token_error(json_token_t *token, json_parse_t *json,
const char *errtype)
{
token->type = T_ERROR;
- token->index = json->index;
+ token->index = json->ptr - json->data;
token->value.string = errtype;
}
@@ -871,15 +888,18 @@ static void json_next_string_token(json_parse_t *json, json_token_t *token)
char ch;
/* Caller must ensure a string is next */
- assert(json->data[json->index] == '"');
+ assert(*json->ptr == '"');
/* Skip " */
- json->index++;
+ json->ptr++;
/* json->tmp is the temporary strbuf used to accumulate the
- * decoded string value. */
+ * decoded string value.
+ * json->tmp is sized to handle JSON containing only a string value.
+ */
strbuf_reset(json->tmp);
- while ((ch = json->data[json->index]) != '"') {
+
+ while ((ch = *json->ptr) != '"') {
if (!ch) {
/* Premature end of the string */
json_set_token_error(token, json, "unexpected end of string");
@@ -889,7 +909,7 @@ static void json_next_string_token(json_parse_t *json, json_token_t *token)
/* Handle escapes */
if (ch == '\\') {
/* Fetch escape character */
- ch = json->data[json->index + 1];
+ ch = *(json->ptr + 1);
/* Translate escape code and append to tmp string */
ch = escape2char[(unsigned char)ch];
@@ -907,14 +927,14 @@ static void json_next_string_token(json_parse_t *json, json_token_t *token)
}
/* Skip '\' */
- json->index++;
+ json->ptr++;
}
/* Append normal character or translated single character
* Unicode escapes are handled above */
strbuf_append_char_unsafe(json->tmp, ch);
- json->index++;
+ json->ptr++;
}
- json->index++; /* Eat final quote (") */
+ json->ptr++; /* Eat final quote (") */
strbuf_ensure_null(json->tmp);
@@ -928,7 +948,7 @@ static void json_next_string_token(json_parse_t *json, json_token_t *token)
* json_next_number_token() uses strtod() which allows other forms:
* - numbers starting with '+'
* - NaN, -NaN, infinity, -infinity
- * - hexidecimal numbers
+ * - hexadecimal numbers
* - numbers with leading zeros
*
* json_is_invalid_number() detects "numbers" which may pass strtod()'s
@@ -939,34 +959,33 @@ static void json_next_string_token(json_parse_t *json, json_token_t *token)
*/
static int json_is_invalid_number(json_parse_t *json)
{
- int i = json->index;
+ const char *p = json->ptr;
/* Reject numbers starting with + */
- if (json->data[i] == '+')
+ if (*p == '+')
return 1;
/* Skip minus sign if it exists */
- if (json->data[i] == '-')
- i++;
+ if (*p == '-')
+ p++;
/* Reject numbers starting with 0x, or leading zeros */
- if (json->data[i] == '0') {
- int ch2 = json->data[i + 1];
+ if (*p == '0') {
+ int ch2 = *(p + 1);
if ((ch2 | 0x20) == 'x' || /* Hex */
('0' <= ch2 && ch2 <= '9')) /* Leading zero */
return 1;
return 0;
- } else if (json->data[i] <= '9') {
+ } else if (*p <= '9') {
return 0; /* Ordinary number */
}
-
/* Reject inf/nan */
- if (!strncasecmp(&json->data[i], "inf", 3))
+ if (!strncasecmp(p, "inf", 3))
return 1;
- if (!strncasecmp(&json->data[i], "nan", 3))
+ if (!strncasecmp(p, "nan", 3))
return 1;
/* Pass all other numbers which may still be invalid, but
@@ -976,35 +995,39 @@ static int json_is_invalid_number(json_parse_t *json)
static void json_next_number_token(json_parse_t *json, json_token_t *token)
{
- const char *startptr;
char *endptr;
token->type = T_NUMBER;
- startptr = &json->data[json->index];
- token->value.number = strtod(&json->data[json->index], &endptr);
- if (startptr == endptr)
+ token->value.number = fpconv_strtod(json->ptr, &endptr);
+ if (json->ptr == endptr)
json_set_token_error(token, json, "invalid number");
else
- json->index += endptr - startptr; /* Skip the processed number */
+ json->ptr = endptr; /* Skip the processed number */
return;
}
/* Fills in the token struct.
* T_STRING will return a pointer to the json_parse_t temporary string
- * T_ERROR will leave the json->index pointer at the error.
+ * T_ERROR will leave the json->ptr pointer at the error.
*/
static void json_next_token(json_parse_t *json, json_token_t *token)
{
- json_token_type_t *ch2token = json->cfg->ch2token;
+ const json_token_type_t *ch2token = json->cfg->ch2token;
int ch;
- /* Eat whitespace. FIXME: UGLY */
- token->type = ch2token[(unsigned char)json->data[json->index]];
- while (token->type == T_WHITESPACE)
- token->type = ch2token[(unsigned char)json->data[++json->index]];
+ /* Eat whitespace. */
+ while (1) {
+ ch = (unsigned char)*(json->ptr);
+ token->type = ch2token[ch];
+ if (token->type != T_WHITESPACE)
+ break;
+ json->ptr++;
+ }
- token->index = json->index;
+ /* Store location of new token. Required when throwing errors
+ * for unexpected tokens (syntax errors). */
+ token->index = json->ptr - json->data;
/* Don't advance the pointer for an error or the end */
if (token->type == T_ERROR) {
@@ -1018,14 +1041,13 @@ static void json_next_token(json_parse_t *json, json_token_t *token)
/* Found a known single character token, advance index and return */
if (token->type != T_UNKNOWN) {
- json->index++;
+ json->ptr++;
return;
}
- /* Process characters which triggered T_UNKNOWN */
- ch = json->data[json->index];
-
- /* Must use strncmp() to match the front of the JSON string.
+ /* Process characters which triggered T_UNKNOWN
+ *
+ * Must use strncmp() to match the front of the JSON string.
* JSON identifier must be lowercase.
* When strict_numbers if disabled, either case is allowed for
* Infinity/NaN (since we are no longer following the spec..) */
@@ -1033,29 +1055,29 @@ static void json_next_token(json_parse_t *json, json_token_t *token)
json_next_string_token(json, token);
return;
} else if (ch == '-' || ('0' <= ch && ch <= '9')) {
- if (json->cfg->decode_refuse_badnum && json_is_invalid_number(json)) {
+ if (!json->cfg->decode_invalid_numbers && json_is_invalid_number(json)) {
json_set_token_error(token, json, "invalid number");
return;
}
json_next_number_token(json, token);
return;
- } else if (!strncmp(&json->data[json->index], "true", 4)) {
+ } else if (!strncmp(json->ptr, "true", 4)) {
token->type = T_BOOLEAN;
token->value.boolean = 1;
- json->index += 4;
+ json->ptr += 4;
return;
- } else if (!strncmp(&json->data[json->index], "false", 5)) {
+ } else if (!strncmp(json->ptr, "false", 5)) {
token->type = T_BOOLEAN;
token->value.boolean = 0;
- json->index += 5;
+ json->ptr += 5;
return;
- } else if (!strncmp(&json->data[json->index], "null", 4)) {
+ } else if (!strncmp(json->ptr, "null", 4)) {
token->type = T_NULL;
- json->index += 4;
+ json->ptr += 4;
return;
- } else if (!json->cfg->decode_refuse_badnum &&
+ } else if (json->cfg->decode_invalid_numbers &&
json_is_invalid_number(json)) {
- /* When refuse_badnum is disabled, only attempt to process
+ /* When decode_invalid_numbers is enabled, only attempt to process
* numbers we know are invalid JSON (Inf, NaN, hex)
* This is required to generate an appropriate token error,
* otherwise all bad tokens will register as "invalid number"
@@ -1091,13 +1113,23 @@ static void json_throw_parse_error(lua_State *l, json_parse_t *json,
exp, found, token->index + 1);
}
-static void json_decode_checkstack(lua_State *l, json_parse_t *json, int n)
+static inline void json_decode_ascend(json_parse_t *json)
{
- if (lua_checkstack(l, n))
+ json->current_depth--;
+}
+
+static void json_decode_descend(lua_State *l, json_parse_t *json, int slots)
+{
+ json->current_depth++;
+
+ if (json->current_depth <= json->cfg->decode_max_depth &&
+ lua_checkstack(l, slots)) {
return;
+ }
strbuf_free(json->tmp);
- luaL_error(l, "Too many nested data structures");
+ luaL_error(l, "Found too many nested data structures (%d) at character %d",
+ json->current_depth, json->ptr - json->data);
}
static void json_parse_object_context(lua_State *l, json_parse_t *json)
@@ -1106,7 +1138,7 @@ static void json_parse_object_context(lua_State *l, json_parse_t *json)
/* 3 slots required:
* .., table, key, value */
- json_decode_checkstack(l, json, 3);
+ json_decode_descend(l, json, 3);
lua_newtable(l);
@@ -1114,6 +1146,7 @@ static void json_parse_object_context(lua_State *l, json_parse_t *json)
/* Handle empty objects */
if (token.type == T_OBJ_END) {
+ json_decode_ascend(json);
return;
}
@@ -1137,8 +1170,10 @@ static void json_parse_object_context(lua_State *l, json_parse_t *json)
json_next_token(json, &token);
- if (token.type == T_OBJ_END)
+ if (token.type == T_OBJ_END) {
+ json_decode_ascend(json);
return;
+ }
if (token.type != T_COMMA)
json_throw_parse_error(l, json, "comma or object end", &token);
@@ -1155,15 +1190,17 @@ static void json_parse_array_context(lua_State *l, json_parse_t *json)
/* 2 slots required:
* .., table, value */
- json_decode_checkstack(l, json, 2);
+ json_decode_descend(l, json, 2);
lua_newtable(l);
json_next_token(json, &token);
/* Handle empty arrays */
- if (token.type == T_ARR_END)
+ if (token.type == T_ARR_END) {
+ json_decode_ascend(json);
return;
+ }
for (i = 1; ; i++) {
json_process_value(l, json, &token);
@@ -1171,8 +1208,10 @@ static void json_parse_array_context(lua_State *l, json_parse_t *json)
json_next_token(json, &token);
- if (token.type == T_ARR_END)
+ if (token.type == T_ARR_END) {
+ json_decode_ascend(json);
return;
+ }
if (token.type != T_COMMA)
json_throw_parse_error(l, json, "comma or array end", &token);
@@ -1211,15 +1250,26 @@ static void json_process_value(lua_State *l, json_parse_t *json,
}
}
-/* json_text must be null terminated string */
-static void lua_json_decode(lua_State *l, const char *json_text, int json_len)
+static int json_decode(lua_State *l)
{
json_parse_t json;
json_token_t token;
+ size_t json_len;
+
+ luaL_argcheck(l, lua_gettop(l) == 1, 1, "expected 1 argument");
json.cfg = json_fetch_config(l);
- json.data = json_text;
- json.index = 0;
+ json.data = luaL_checklstring(l, 1, &json_len);
+ json.current_depth = 0;
+ json.ptr = json.data;
+
+ /* Detect Unicode other than UTF-8 (see RFC 4627, Sec 3)
+ *
+ * CJSON can support any simple data type, hence only the first
+ * character is guaranteed to be ASCII (at worst: '"'). This is
+ * still enough to detect whether the wrong encoding is in use. */
+ if (json_len >= 2 && (!json.data[0] || !json.data[1]))
+ luaL_error(l, "JSON parser does not support UTF-16 or UTF-32");
/* Ensure the temporary buffer can hold the entire string.
* This means we no longer need to do length checks since the decoded
@@ -1236,64 +1286,142 @@ static void lua_json_decode(lua_State *l, const char *json_text, int json_len)
json_throw_parse_error(l, &json, "the end", &token);
strbuf_free(json.tmp);
+
+ return 1;
}
-static int json_decode(lua_State *l)
+/* ===== INITIALISATION ===== */
+
+#if !defined(LUA_VERSION_NUM) || LUA_VERSION_NUM < 502
+/* Compatibility for Lua 5.1.
+ *
+ * luaL_setfuncs() is used to create a module table where the functions have
+ * json_config_t as their first upvalue. Code borrowed from Lua 5.2 source. */
+static void luaL_setfuncs (lua_State *l, const luaL_Reg *reg, int nup)
{
- const char *json;
- size_t len;
+ int i;
- json_verify_arg_count(l, 1);
+ luaL_checkstack(l, nup, "too many upvalues");
+ for (; reg->name != NULL; reg++) { /* fill the table with given functions */
+ for (i = 0; i < nup; i++) /* copy upvalues to the top */
+ lua_pushvalue(l, -nup);
+ lua_pushcclosure(l, reg->func, nup); /* closure with those upvalues */
+ lua_setfield(l, -(nup + 2), reg->name);
+ }
+ lua_pop(l, nup); /* remove upvalues */
+}
+#endif
- json = luaL_checklstring(l, 1, &len);
+/* Call target function in protected mode with all supplied args.
+ * Assumes target function only returns a single non-nil value.
+ * Convert and return thrown errors as: nil, "error message" */
+static int json_protect_conversion(lua_State *l)
+{
+ int err;
- /* Detect Unicode other than UTF-8 (see RFC 4627, Sec 3)
- *
- * CJSON can support any simple data type, hence only the first
- * character is guaranteed to be ASCII (at worst: '"'). This is
- * still enough to detect whether the wrong encoding is in use. */
- if (len >= 2 && (!json[0] || !json[1]))
- luaL_error(l, "JSON parser does not support UTF-16 or UTF-32");
+ /* Deliberately throw an error for invalid arguments */
+ luaL_argcheck(l, lua_gettop(l) == 1, 1, "expected 1 argument");
- lua_json_decode(l, json, len);
+ /* pcall() the function stored as upvalue(1) */
+ lua_pushvalue(l, lua_upvalueindex(1));
+ lua_insert(l, 1);
+ err = lua_pcall(l, 1, 1, 0);
+ if (!err)
+ return 1;
- return 1;
-}
+ if (err == LUA_ERRRUN) {
+ lua_pushnil(l);
+ lua_insert(l, -2);
+ return 2;
+ }
-/* ===== INITIALISATION ===== */
+ /* Since we are not using a custom error handler, the only remaining
+ * errors are memory related */
+ return luaL_error(l, "Memory allocation error in CJSON protected call");
+}
-int luaopen_cjson(lua_State *l)
+/* Return cjson module table */
+static int lua_cjson_new(lua_State *l)
{
luaL_Reg reg[] = {
{ "encode", json_encode },
{ "decode", json_decode },
{ "encode_sparse_array", json_cfg_encode_sparse_array },
{ "encode_max_depth", json_cfg_encode_max_depth },
+ { "decode_max_depth", json_cfg_decode_max_depth },
{ "encode_number_precision", json_cfg_encode_number_precision },
{ "encode_keep_buffer", json_cfg_encode_keep_buffer },
- { "refuse_invalid_numbers", json_cfg_refuse_invalid_numbers },
+ { "encode_invalid_numbers", json_cfg_encode_invalid_numbers },
+ { "decode_invalid_numbers", json_cfg_decode_invalid_numbers },
+ { "new", lua_cjson_new },
{ NULL, NULL }
};
- /* Use json_fetch_config as a pointer.
- * It's faster than using a config string, and more unique */
- lua_pushlightuserdata(l, &json_config_key);
- json_create_config(l);
- lua_settable(l, LUA_REGISTRYINDEX);
+ /* Initialise number conversions */
+ fpconv_init();
- luaL_register(l, "cjson", reg);
+ /* cjson module table */
+ lua_newtable(l);
+
+ /* Register functions with config data as upvalue */
+ json_create_config(l);
+ luaL_setfuncs(l, reg, 1);
/* Set cjson.null */
lua_pushlightuserdata(l, NULL);
lua_setfield(l, -2, "null");
- /* Set cjson.version */
- lua_pushliteral(l, VERSION);
- lua_setfield(l, -2, "version");
+ /* Set module name / version fields */
+ lua_pushliteral(l, CJSON_MODNAME);
+ lua_setfield(l, -2, "_NAME");
+ lua_pushliteral(l, CJSON_VERSION);
+ lua_setfield(l, -2, "_VERSION");
+
+ return 1;
+}
+
+/* Return cjson.safe module table */
+static int lua_cjson_safe_new(lua_State *l)
+{
+ const char *func[] = { "decode", "encode", NULL };
+ int i;
+
+ lua_cjson_new(l);
+
+ /* Fix new() method */
+ lua_pushcfunction(l, lua_cjson_safe_new);
+ lua_setfield(l, -2, "new");
+
+ for (i = 0; func[i]; i++) {
+ lua_getfield(l, -1, func[i]);
+ lua_pushcclosure(l, json_protect_conversion, 1);
+ lua_setfield(l, -2, func[i]);
+ }
+
+ return 1;
+}
+
+int luaopen_cjson(lua_State *l)
+{
+ lua_cjson_new(l);
+
+#ifdef ENABLE_CJSON_GLOBAL
+ /* Register a global "cjson" table. */
+ lua_pushvalue(l, -1);
+ lua_setglobal(l, CJSON_MODNAME);
+#endif
/* Return cjson table */
return 1;
}
+int luaopen_cjson_safe(lua_State *l)
+{
+ lua_cjson_safe_new(l);
+
+ /* Return cjson.safe table */
+ return 1;
+}
+
/* vi:ai et sw=4 ts=4:
*/
diff --git a/deps/lua/src/lua_cmsgpack.c b/deps/lua/src/lua_cmsgpack.c
index 53dc1cf61..e13f053d2 100644
--- a/deps/lua/src/lua_cmsgpack.c
+++ b/deps/lua/src/lua_cmsgpack.c
@@ -7,14 +7,38 @@
#include "lua.h"
#include "lauxlib.h"
-#define LUACMSGPACK_VERSION "lua-cmsgpack 0.3.0"
+#define LUACMSGPACK_NAME "cmsgpack"
+#define LUACMSGPACK_SAFE_NAME "cmsgpack_safe"
+#define LUACMSGPACK_VERSION "lua-cmsgpack 0.4.0"
#define LUACMSGPACK_COPYRIGHT "Copyright (C) 2012, Salvatore Sanfilippo"
#define LUACMSGPACK_DESCRIPTION "MessagePack C implementation for Lua"
-#define LUACMSGPACK_MAX_NESTING 16 /* Max tables nesting. */
+/* Allows a preprocessor directive to override MAX_NESTING */
+#ifndef LUACMSGPACK_MAX_NESTING
+ #define LUACMSGPACK_MAX_NESTING 16 /* Max tables nesting. */
+#endif
-/* ==============================================================================
- * MessagePack implementation and bindings for Lua 5.1.
+/* Check if float or double can be an integer without loss of precision */
+#define IS_INT_TYPE_EQUIVALENT(x, T) (!isinf(x) && (T)(x) == (x))
+
+#define IS_INT64_EQUIVALENT(x) IS_INT_TYPE_EQUIVALENT(x, int64_t)
+#define IS_INT_EQUIVALENT(x) IS_INT_TYPE_EQUIVALENT(x, int)
+
+/* If size of pointer is equal to a 4 byte integer, we're on 32 bits. */
+#if UINTPTR_MAX == UINT_MAX
+ #define BITS_32 1
+#else
+ #define BITS_32 0
+#endif
+
+#if BITS_32
+ #define lua_pushunsigned(L, n) lua_pushnumber(L, n)
+#else
+ #define lua_pushunsigned(L, n) lua_pushinteger(L, n)
+#endif
+
+/* =============================================================================
+ * MessagePack implementation and bindings for Lua 5.1/5.2.
* Copyright(C) 2012 Salvatore Sanfilippo <antirez@gmail.com>
*
* http://github.com/antirez/lua-cmsgpack
@@ -29,23 +53,27 @@
* 20-Feb-2012 (ver 0.2.0): Tables encoding improved.
* 20-Feb-2012 (ver 0.2.1): Minor bug fixing.
* 20-Feb-2012 (ver 0.3.0): Module renamed lua-cmsgpack (was lua-msgpack).
- * ============================================================================ */
+ * 04-Apr-2014 (ver 0.3.1): Lua 5.2 support and minor bug fix.
+ * 07-Apr-2014 (ver 0.4.0): Multiple pack/unpack, lua allocator, efficiency.
+ * ========================================================================== */
-/* --------------------------- Endian conversion --------------------------------
- * We use it only for floats and doubles, all the other conversions are performed
+/* -------------------------- Endian conversion --------------------------------
+ * We use it only for floats and doubles, all the other conversions performed
* in an endian independent fashion. So the only thing we need is a function
- * that swaps a binary string if the arch is little endian (and left it untouched
+ * that swaps a binary string if arch is little endian (and left it untouched
* otherwise). */
/* Reverse memory bytes if arch is little endian. Given the conceptual
- * simplicity of the Lua build system we prefer to check for endianess at runtime.
+ * simplicity of the Lua build system we prefer check for endianess at runtime.
* The performance difference should be acceptable. */
static void memrevifle(void *ptr, size_t len) {
- unsigned char *p = ptr, *e = p+len-1, aux;
+ unsigned char *p = (unsigned char *)ptr,
+ *e = (unsigned char *)p+len-1,
+ aux;
int test = 1;
unsigned char *testp = (unsigned char*) &test;
- if (testp[0] == 0) return; /* Big endian, nothign to do. */
+ if (testp[0] == 0) return; /* Big endian, nothing to do. */
len /= 2;
while(len--) {
aux = *p;
@@ -56,30 +84,44 @@ static void memrevifle(void *ptr, size_t len) {
}
}
-/* ----------------------------- String buffer ----------------------------------
- * This is a simple implementation of string buffers. The only opereation
+/* ---------------------------- String buffer ----------------------------------
+ * This is a simple implementation of string buffers. The only operation
* supported is creating empty buffers and appending bytes to it.
* The string buffer uses 2x preallocation on every realloc for O(N) append
* behavior. */
typedef struct mp_buf {
+ lua_State *L;
unsigned char *b;
size_t len, free;
} mp_buf;
-static mp_buf *mp_buf_new(void) {
- mp_buf *buf = malloc(sizeof(*buf));
-
+static void *mp_realloc(lua_State *L, void *target, size_t osize,size_t nsize) {
+ void *(*local_realloc) (void *, void *, size_t osize, size_t nsize) = NULL;
+ void *ud;
+
+ local_realloc = lua_getallocf(L, &ud);
+
+ return local_realloc(ud, target, osize, nsize);
+}
+
+static mp_buf *mp_buf_new(lua_State *L) {
+ mp_buf *buf = NULL;
+
+ /* Old size = 0; new size = sizeof(*buf) */
+ buf = (mp_buf*)mp_realloc(L, NULL, 0, sizeof(*buf));
+
+ buf->L = L;
buf->b = NULL;
buf->len = buf->free = 0;
return buf;
}
-void mp_buf_append(mp_buf *buf, const unsigned char *s, size_t len) {
+static void mp_buf_append(mp_buf *buf, const unsigned char *s, size_t len) {
if (buf->free < len) {
size_t newlen = buf->len+len;
- buf->b = realloc(buf->b,newlen*2);
+ buf->b = (unsigned char*)mp_realloc(buf->L, buf->b, buf->len, newlen*2);
buf->free = newlen;
}
memcpy(buf->b+buf->len,s,len);
@@ -88,11 +130,11 @@ void mp_buf_append(mp_buf *buf, const unsigned char *s, size_t len) {
}
void mp_buf_free(mp_buf *buf) {
- free(buf->b);
- free(buf);
+ mp_realloc(buf->L, buf->b, buf->len, 0); /* realloc to 0 = free */
+ mp_realloc(buf->L, buf, sizeof(*buf), 0);
}
-/* ------------------------------ String cursor ----------------------------------
+/* ---------------------------- String cursor ----------------------------------
* This simple data structure is used for parsing. Basically you create a cursor
* using a string pointer and a length, then it is possible to access the
* current string position with cursor->p, check the remaining length
@@ -102,7 +144,7 @@ void mp_buf_free(mp_buf *buf) {
* be used to report errors. */
#define MP_CUR_ERROR_NONE 0
-#define MP_CUR_ERROR_EOF 1 /* Not enough data to complete the opereation. */
+#define MP_CUR_ERROR_EOF 1 /* Not enough data to complete operation. */
#define MP_CUR_ERROR_BADFMT 2 /* Bad data format */
typedef struct mp_cur {
@@ -111,22 +153,15 @@ typedef struct mp_cur {
int err;
} mp_cur;
-static mp_cur *mp_cur_new(const unsigned char *s, size_t len) {
- mp_cur *cursor = malloc(sizeof(*cursor));
-
+static void mp_cur_init(mp_cur *cursor, const unsigned char *s, size_t len) {
cursor->p = s;
cursor->left = len;
cursor->err = MP_CUR_ERROR_NONE;
- return cursor;
-}
-
-static void mp_cur_free(mp_cur *cursor) {
- free(cursor);
}
#define mp_cur_consume(_c,_len) do { _c->p += _len; _c->left -= _len; } while(0)
-/* When there is not enough room we set an error in the cursor and return, this
+/* When there is not enough room we set an error in the cursor and return. This
* is very common across the code so we have a macro to make the code look
* a bit simpler. */
#define mp_cur_need(_c,_len) do { \
@@ -136,7 +171,7 @@ static void mp_cur_free(mp_cur *cursor) {
} \
} while(0)
-/* --------------------------- Low level MP encoding -------------------------- */
+/* ------------------------- Low level MP encoding -------------------------- */
static void mp_encode_bytes(mp_buf *buf, const unsigned char *s, size_t len) {
unsigned char hdr[5];
@@ -219,7 +254,7 @@ static void mp_encode_int(mp_buf *buf, int64_t n) {
}
} else {
if (n >= -32) {
- b[0] = ((char)n); /* negative fixnum */
+ b[0] = ((signed char)n); /* negative fixnum */
enclen = 1;
} else if (n >= -128) {
b[0] = 0xd0; /* int 8 */
@@ -299,7 +334,7 @@ static void mp_encode_map(mp_buf *buf, int64_t n) {
mp_buf_append(buf,b,enclen);
}
-/* ----------------------------- Lua types encoding --------------------------- */
+/* --------------------------- Lua types encoding --------------------------- */
static void mp_encode_lua_string(lua_State *L, mp_buf *buf) {
size_t len;
@@ -314,13 +349,26 @@ static void mp_encode_lua_bool(lua_State *L, mp_buf *buf) {
mp_buf_append(buf,&b,1);
}
+/* Lua 5.3 has a built in 64-bit integer type */
+static void mp_encode_lua_integer(lua_State *L, mp_buf *buf) {
+#if (LUA_VERSION_NUM < 503) && BITS_32
+ lua_Number i = lua_tonumber(L,-1);
+#else
+ lua_Integer i = lua_tointeger(L,-1);
+#endif
+ mp_encode_int(buf, (int64_t)i);
+}
+
+/* Lua 5.2 and lower only has 64-bit doubles, so we need to
+ * detect if the double may be representable as an int
+ * for Lua < 5.3 */
static void mp_encode_lua_number(lua_State *L, mp_buf *buf) {
lua_Number n = lua_tonumber(L,-1);
- if (floor(n) != n) {
- mp_encode_double(buf,(double)n);
+ if (IS_INT64_EQUIVALENT(n)) {
+ mp_encode_lua_integer(L, buf);
} else {
- mp_encode_int(buf,(int64_t)n);
+ mp_encode_double(buf,(double)n);
}
}
@@ -328,7 +376,11 @@ static void mp_encode_lua_type(lua_State *L, mp_buf *buf, int level);
/* Convert a lua table into a message pack list. */
static void mp_encode_lua_table_as_array(lua_State *L, mp_buf *buf, int level) {
+#if LUA_VERSION_NUM < 502
size_t len = lua_objlen(L,-1), j;
+#else
+ size_t len = lua_rawlen(L,-1), j;
+#endif
mp_encode_array(buf,len);
for (j = 1; j <= len; j++) {
@@ -345,7 +397,7 @@ static void mp_encode_lua_table_as_map(lua_State *L, mp_buf *buf, int level) {
/* First step: count keys into table. No other way to do it with the
* Lua API, we need to iterate a first time. Note that an alternative
* would be to do a single run, and then hack the buffer to insert the
- * map opcodes for message pack. Too hachish for this lib. */
+ * map opcodes for message pack. Too hackish for this lib. */
lua_pushnil(L);
while(lua_next(L,-2)) {
lua_pop(L,1); /* remove value, keep key for next iteration. */
@@ -367,30 +419,43 @@ static void mp_encode_lua_table_as_map(lua_State *L, mp_buf *buf, int level) {
* of keys from numerical keys from 1 up to N, with N being the total number
* of elements, without any hole in the middle. */
static int table_is_an_array(lua_State *L) {
- long count = 0, max = 0, idx = 0;
+ int count = 0, max = 0;
+#if LUA_VERSION_NUM < 503
lua_Number n;
+#else
+ lua_Integer n;
+#endif
+
+ /* Stack top on function entry */
+ int stacktop;
+
+ stacktop = lua_gettop(L);
lua_pushnil(L);
while(lua_next(L,-2)) {
/* Stack: ... key value */
lua_pop(L,1); /* Stack: ... key */
- if (lua_type(L,-1) != LUA_TNUMBER) goto not_array;
- n = lua_tonumber(L,-1);
- idx = n;
- if (idx != n || idx < 1) goto not_array;
+ /* The <= 0 check is valid here because we're comparing indexes. */
+#if LUA_VERSION_NUM < 503
+ if ((LUA_TNUMBER != lua_type(L,-1)) || (n = lua_tonumber(L, -1)) <= 0 ||
+ !IS_INT_EQUIVALENT(n))
+#else
+ if (!lua_isinteger(L,-1) || (n = lua_tointeger(L, -1)) <= 0)
+#endif
+ {
+ lua_settop(L, stacktop);
+ return 0;
+ }
+ max = (n > max ? n : max);
count++;
- max = idx;
}
/* We have the total number of elements in "count". Also we have
- * the max index encountered in "idx". We can't reach this code
+ * the max index encountered in "max". We can't reach this code
* if there are indexes <= 0. If you also note that there can not be
- * repeated keys into a table, you have that if idx==count you are sure
+ * repeated keys into a table, you have that if max==count you are sure
* that there are all the keys form 1 to count (both included). */
- return idx == count;
-
-not_array:
- lua_pop(L,1);
- return 0;
+ lua_settop(L, stacktop);
+ return max == count;
}
/* If the length operator returns non-zero, that is, there is at least
@@ -405,6 +470,7 @@ static void mp_encode_lua_table(lua_State *L, mp_buf *buf, int level) {
static void mp_encode_lua_null(lua_State *L, mp_buf *buf) {
unsigned char b[1];
+ (void)L;
b[0] = 0xc0;
mp_buf_append(buf,b,1);
@@ -413,33 +479,70 @@ static void mp_encode_lua_null(lua_State *L, mp_buf *buf) {
static void mp_encode_lua_type(lua_State *L, mp_buf *buf, int level) {
int t = lua_type(L,-1);
- /* Limit the encoding of nested tables to a specfiied maximum depth, so that
+ /* Limit the encoding of nested tables to a specified maximum depth, so that
* we survive when called against circular references in tables. */
if (t == LUA_TTABLE && level == LUACMSGPACK_MAX_NESTING) t = LUA_TNIL;
switch(t) {
case LUA_TSTRING: mp_encode_lua_string(L,buf); break;
case LUA_TBOOLEAN: mp_encode_lua_bool(L,buf); break;
- case LUA_TNUMBER: mp_encode_lua_number(L,buf); break;
+ case LUA_TNUMBER:
+ #if LUA_VERSION_NUM < 503
+ mp_encode_lua_number(L,buf); break;
+ #else
+ if (lua_isinteger(L, -1)) {
+ mp_encode_lua_integer(L, buf);
+ } else {
+ mp_encode_lua_number(L, buf);
+ }
+ break;
+ #endif
case LUA_TTABLE: mp_encode_lua_table(L,buf,level); break;
default: mp_encode_lua_null(L,buf); break;
}
lua_pop(L,1);
}
+/*
+ * Packs all arguments as a stream for multiple unpacking later.
+ * Returns error if no arguments provided.
+ */
static int mp_pack(lua_State *L) {
- mp_buf *buf = mp_buf_new();
+ int nargs = lua_gettop(L);
+ int i;
+ mp_buf *buf;
- mp_encode_lua_type(L,buf,0);
- lua_pushlstring(L,(char*)buf->b,buf->len);
+ if (nargs == 0)
+ return luaL_argerror(L, 0, "MessagePack pack needs input.");
+
+ buf = mp_buf_new(L);
+ for(i = 1; i <= nargs; i++) {
+ /* Copy argument i to top of stack for _encode processing;
+ * the encode function pops it from the stack when complete. */
+ lua_pushvalue(L, i);
+
+ mp_encode_lua_type(L,buf,0);
+
+ lua_pushlstring(L,(char*)buf->b,buf->len);
+
+ /* Reuse the buffer for the next operation by
+ * setting its free count to the total buffer size
+ * and the current position to zero. */
+ buf->free += buf->len;
+ buf->len = 0;
+ }
mp_buf_free(buf);
+
+ /* Concatenate all nargs buffers together */
+ lua_concat(L, nargs);
return 1;
}
-/* --------------------------------- Decoding --------------------------------- */
+/* ------------------------------- Decoding --------------------------------- */
void mp_decode_to_lua_type(lua_State *L, mp_cur *c);
void mp_decode_to_lua_array(lua_State *L, mp_cur *c, size_t len) {
+ assert(len <= UINT_MAX);
int index = 1;
lua_newtable(L);
@@ -452,6 +555,7 @@ void mp_decode_to_lua_array(lua_State *L, mp_cur *c, size_t len) {
}
void mp_decode_to_lua_hash(lua_State *L, mp_cur *c, size_t len) {
+ assert(len <= UINT_MAX);
lua_newtable(L);
while(len--) {
mp_decode_to_lua_type(L,c); /* key */
@@ -466,34 +570,44 @@ void mp_decode_to_lua_hash(lua_State *L, mp_cur *c, size_t len) {
* a Lua type, that is left as the only result on the stack. */
void mp_decode_to_lua_type(lua_State *L, mp_cur *c) {
mp_cur_need(c,1);
+
+ /* If we return more than 18 elements, we must resize the stack to
+ * fit all our return values. But, there is no way to
+ * determine how many objects a msgpack will unpack to up front, so
+ * we request a +1 larger stack on each iteration (noop if stack is
+ * big enough, and when stack does require resize it doubles in size) */
+ luaL_checkstack(L, 1,
+ "too many return values at once; "
+ "use unpack_one or unpack_limit instead.");
+
switch(c->p[0]) {
case 0xcc: /* uint 8 */
mp_cur_need(c,2);
- lua_pushnumber(L,c->p[1]);
+ lua_pushunsigned(L,c->p[1]);
mp_cur_consume(c,2);
break;
case 0xd0: /* int 8 */
mp_cur_need(c,2);
- lua_pushnumber(L,(char)c->p[1]);
+ lua_pushinteger(L,(signed char)c->p[1]);
mp_cur_consume(c,2);
break;
case 0xcd: /* uint 16 */
mp_cur_need(c,3);
- lua_pushnumber(L,
+ lua_pushunsigned(L,
(c->p[1] << 8) |
c->p[2]);
mp_cur_consume(c,3);
break;
case 0xd1: /* int 16 */
mp_cur_need(c,3);
- lua_pushnumber(L,(int16_t)
+ lua_pushinteger(L,(int16_t)
(c->p[1] << 8) |
c->p[2]);
mp_cur_consume(c,3);
break;
case 0xce: /* uint 32 */
mp_cur_need(c,5);
- lua_pushnumber(L,
+ lua_pushunsigned(L,
((uint32_t)c->p[1] << 24) |
((uint32_t)c->p[2] << 16) |
((uint32_t)c->p[3] << 8) |
@@ -502,7 +616,7 @@ void mp_decode_to_lua_type(lua_State *L, mp_cur *c) {
break;
case 0xd2: /* int 32 */
mp_cur_need(c,5);
- lua_pushnumber(L,
+ lua_pushinteger(L,
((int32_t)c->p[1] << 24) |
((int32_t)c->p[2] << 16) |
((int32_t)c->p[3] << 8) |
@@ -511,7 +625,7 @@ void mp_decode_to_lua_type(lua_State *L, mp_cur *c) {
break;
case 0xcf: /* uint 64 */
mp_cur_need(c,9);
- lua_pushnumber(L,
+ lua_pushunsigned(L,
((uint64_t)c->p[1] << 56) |
((uint64_t)c->p[2] << 48) |
((uint64_t)c->p[3] << 40) |
@@ -524,7 +638,11 @@ void mp_decode_to_lua_type(lua_State *L, mp_cur *c) {
break;
case 0xd3: /* int 64 */
mp_cur_need(c,9);
+#if LUA_VERSION_NUM < 503
lua_pushnumber(L,
+#else
+ lua_pushinteger(L,
+#endif
((int64_t)c->p[1] << 56) |
((int64_t)c->p[2] << 48) |
((int64_t)c->p[3] << 40) |
@@ -581,13 +699,14 @@ void mp_decode_to_lua_type(lua_State *L, mp_cur *c) {
case 0xdb: /* raw 32 */
mp_cur_need(c,5);
{
- size_t l = (c->p[1] << 24) |
- (c->p[2] << 16) |
- (c->p[3] << 8) |
- c->p[4];
- mp_cur_need(c,5+l);
- lua_pushlstring(L,(char*)c->p+5,l);
- mp_cur_consume(c,5+l);
+ size_t l = ((size_t)c->p[1] << 24) |
+ ((size_t)c->p[2] << 16) |
+ ((size_t)c->p[3] << 8) |
+ (size_t)c->p[4];
+ mp_cur_consume(c,5);
+ mp_cur_need(c,l);
+ lua_pushlstring(L,(char*)c->p,l);
+ mp_cur_consume(c,l);
}
break;
case 0xdc: /* array 16 */
@@ -601,10 +720,10 @@ void mp_decode_to_lua_type(lua_State *L, mp_cur *c) {
case 0xdd: /* array 32 */
mp_cur_need(c,5);
{
- size_t l = (c->p[1] << 24) |
- (c->p[2] << 16) |
- (c->p[3] << 8) |
- c->p[4];
+ size_t l = ((size_t)c->p[1] << 24) |
+ ((size_t)c->p[2] << 16) |
+ ((size_t)c->p[3] << 8) |
+ (size_t)c->p[4];
mp_cur_consume(c,5);
mp_decode_to_lua_array(L,c,l);
}
@@ -620,20 +739,20 @@ void mp_decode_to_lua_type(lua_State *L, mp_cur *c) {
case 0xdf: /* map 32 */
mp_cur_need(c,5);
{
- size_t l = (c->p[1] << 24) |
- (c->p[2] << 16) |
- (c->p[3] << 8) |
- c->p[4];
+ size_t l = ((size_t)c->p[1] << 24) |
+ ((size_t)c->p[2] << 16) |
+ ((size_t)c->p[3] << 8) |
+ (size_t)c->p[4];
mp_cur_consume(c,5);
mp_decode_to_lua_hash(L,c,l);
}
break;
default: /* types that can't be idenitified by first byte value. */
if ((c->p[0] & 0x80) == 0) { /* positive fixnum */
- lua_pushnumber(L,c->p[0]);
+ lua_pushunsigned(L,c->p[0]);
mp_cur_consume(c,1);
} else if ((c->p[0] & 0xe0) == 0xe0) { /* negative fixnum */
- lua_pushnumber(L,(signed char)c->p[0]);
+ lua_pushinteger(L,(signed char)c->p[0]);
mp_cur_consume(c,1);
} else if ((c->p[0] & 0xe0) == 0xa0) { /* fix raw */
size_t l = c->p[0] & 0x1f;
@@ -654,54 +773,163 @@ void mp_decode_to_lua_type(lua_State *L, mp_cur *c) {
}
}
-static int mp_unpack(lua_State *L) {
+static int mp_unpack_full(lua_State *L, int limit, int offset) {
size_t len;
- const unsigned char *s;
- mp_cur *c;
+ const char *s;
+ mp_cur c;
+ int cnt; /* Number of objects unpacked */
+ int decode_all = (!limit && !offset);
- if (!lua_isstring(L,-1)) {
- lua_pushstring(L,"MessagePack decoding needs a string as input.");
- lua_error(L);
+ s = luaL_checklstring(L,1,&len); /* if no match, exits */
+
+ if (offset < 0 || limit < 0) /* requesting negative off or lim is invalid */
+ return luaL_error(L,
+ "Invalid request to unpack with offset of %d and limit of %d.",
+ offset, len);
+ else if (offset > len)
+ return luaL_error(L,
+ "Start offset %d greater than input length %d.", offset, len);
+
+ if (decode_all) limit = INT_MAX;
+
+ mp_cur_init(&c,(const unsigned char *)s+offset,len-offset);
+
+ /* We loop over the decode because this could be a stream
+ * of multiple top-level values serialized together */
+ for(cnt = 0; c.left > 0 && cnt < limit; cnt++) {
+ mp_decode_to_lua_type(L,&c);
+
+ if (c.err == MP_CUR_ERROR_EOF) {
+ return luaL_error(L,"Missing bytes in input.");
+ } else if (c.err == MP_CUR_ERROR_BADFMT) {
+ return luaL_error(L,"Bad data format in input.");
+ }
}
- s = (const unsigned char*) lua_tolstring(L,-1,&len);
- c = mp_cur_new(s,len);
- mp_decode_to_lua_type(L,c);
-
- if (c->err == MP_CUR_ERROR_EOF) {
- mp_cur_free(c);
- lua_pushstring(L,"Missing bytes in input.");
- lua_error(L);
- } else if (c->err == MP_CUR_ERROR_BADFMT) {
- mp_cur_free(c);
- lua_pushstring(L,"Bad data format in input.");
- lua_error(L);
- } else if (c->left != 0) {
- mp_cur_free(c);
- lua_pushstring(L,"Extra bytes in input.");
- lua_error(L);
+ if (!decode_all) {
+ /* c->left is the remaining size of the input buffer.
+ * subtract the entire buffer size from the unprocessed size
+ * to get our next start offset */
+ int offset = len - c.left;
+        /* Return offset -1 when we have processed the entire buffer. */
+ lua_pushinteger(L, c.left == 0 ? -1 : offset);
+ /* Results are returned with the arg elements still
+ * in place. Lua takes care of only returning
+ * elements above the args for us.
+ * In this case, we have one arg on the stack
+ * for this function, so we insert our first return
+ * value at position 2. */
+ lua_insert(L, 2);
+ cnt += 1; /* increase return count by one to make room for offset */
}
- mp_cur_free(c);
- return 1;
+
+ return cnt;
+}
+
+static int mp_unpack(lua_State *L) {
+ return mp_unpack_full(L, 0, 0);
+}
+
+static int mp_unpack_one(lua_State *L) {
+ int offset = luaL_optinteger(L, 2, 0);
+ /* Variable pop because offset may not exist */
+ lua_pop(L, lua_gettop(L)-1);
+ return mp_unpack_full(L, 1, offset);
+}
+
+static int mp_unpack_limit(lua_State *L) {
+ int limit = luaL_checkinteger(L, 2);
+ int offset = luaL_optinteger(L, 3, 0);
+ /* Variable pop because offset may not exist */
+ lua_pop(L, lua_gettop(L)-1);
+
+ return mp_unpack_full(L, limit, offset);
}
-/* ---------------------------------------------------------------------------- */
+static int mp_safe(lua_State *L) {
+ int argc, err, total_results;
+
+ argc = lua_gettop(L);
+
+ /* This adds our function to the bottom of the stack
+ * (the "call this function" position) */
+ lua_pushvalue(L, lua_upvalueindex(1));
+ lua_insert(L, 1);
+
+ err = lua_pcall(L, argc, LUA_MULTRET, 0);
+ total_results = lua_gettop(L);
+
+ if (!err) {
+ return total_results;
+ } else {
+ lua_pushnil(L);
+ lua_insert(L,-2);
+ return 2;
+ }
+}
-static const struct luaL_reg thislib[] = {
+/* -------------------------------------------------------------------------- */
+static const struct luaL_Reg cmds[] = {
{"pack", mp_pack},
{"unpack", mp_unpack},
- {NULL, NULL}
+ {"unpack_one", mp_unpack_one},
+ {"unpack_limit", mp_unpack_limit},
+ {0}
};
-LUALIB_API int luaopen_cmsgpack (lua_State *L) {
- luaL_register(L, "cmsgpack", thislib);
+static int luaopen_create(lua_State *L) {
+ int i;
+ /* Manually construct our module table instead of
+ * relying on _register or _newlib */
+ lua_newtable(L);
+
+ for (i = 0; i < (sizeof(cmds)/sizeof(*cmds) - 1); i++) {
+ lua_pushcfunction(L, cmds[i].func);
+ lua_setfield(L, -2, cmds[i].name);
+ }
+ /* Add metadata */
+ lua_pushliteral(L, LUACMSGPACK_NAME);
+ lua_setfield(L, -2, "_NAME");
lua_pushliteral(L, LUACMSGPACK_VERSION);
lua_setfield(L, -2, "_VERSION");
lua_pushliteral(L, LUACMSGPACK_COPYRIGHT);
lua_setfield(L, -2, "_COPYRIGHT");
lua_pushliteral(L, LUACMSGPACK_DESCRIPTION);
- lua_setfield(L, -2, "_DESCRIPTION");
+ lua_setfield(L, -2, "_DESCRIPTION");
+ return 1;
+}
+
+LUALIB_API int luaopen_cmsgpack(lua_State *L) {
+ luaopen_create(L);
+
+#if LUA_VERSION_NUM < 502
+ /* Register name globally for 5.1 */
+ lua_pushvalue(L, -1);
+ lua_setglobal(L, LUACMSGPACK_NAME);
+#endif
+
+ return 1;
+}
+
+LUALIB_API int luaopen_cmsgpack_safe(lua_State *L) {
+ int i;
+
+ luaopen_cmsgpack(L);
+
+ /* Wrap all functions in the safe handler */
+ for (i = 0; i < (sizeof(cmds)/sizeof(*cmds) - 1); i++) {
+ lua_getfield(L, -1, cmds[i].name);
+ lua_pushcclosure(L, mp_safe, 1);
+ lua_setfield(L, -2, cmds[i].name);
+ }
+
+#if LUA_VERSION_NUM < 502
+ /* Register name globally for 5.1 */
+ lua_pushvalue(L, -1);
+ lua_setglobal(L, LUACMSGPACK_SAFE_NAME);
+#endif
+
return 1;
}
diff --git a/deps/lua/src/strbuf.c b/deps/lua/src/strbuf.c
index 976925a88..f0f7f4b9a 100644
--- a/deps/lua/src/strbuf.c
+++ b/deps/lua/src/strbuf.c
@@ -1,6 +1,6 @@
-/* strbuf - string buffer routines
+/* strbuf - String buffer routines
*
- * Copyright (c) 2010-2011 Mark Pulford <mark@kyne.com.au>
+ * Copyright (c) 2010-2012 Mark Pulford <mark@kyne.com.au>
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
@@ -29,7 +29,7 @@
#include "strbuf.h"
-void die(const char *fmt, ...)
+static void die(const char *fmt, ...)
{
va_list arg;
diff --git a/deps/lua/src/strbuf.h b/deps/lua/src/strbuf.h
index f856543ad..d861108c1 100644
--- a/deps/lua/src/strbuf.h
+++ b/deps/lua/src/strbuf.h
@@ -1,6 +1,6 @@
/* strbuf - String buffer routines
*
- * Copyright (c) 2010-2011 Mark Pulford <mark@kyne.com.au>
+ * Copyright (c) 2010-2012 Mark Pulford <mark@kyne.com.au>
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
@@ -62,7 +62,9 @@ extern void strbuf_resize(strbuf_t *s, int len);
static int strbuf_empty_length(strbuf_t *s);
static int strbuf_length(strbuf_t *s);
static char *strbuf_string(strbuf_t *s, int *len);
-static void strbuf_ensure_empty_length(strbuf_t *s, int len);
+static void strbuf_ensure_empty_length(strbuf_t *s, int len);
+static char *strbuf_empty_ptr(strbuf_t *s);
+static void strbuf_extend_length(strbuf_t *s, int len);
/* Update */
extern void strbuf_append_fmt(strbuf_t *s, int len, const char *fmt, ...);
@@ -96,6 +98,16 @@ static inline void strbuf_ensure_empty_length(strbuf_t *s, int len)
strbuf_resize(s, s->length + len);
}
+static inline char *strbuf_empty_ptr(strbuf_t *s)
+{
+ return s->buf + s->length;
+}
+
+static inline void strbuf_extend_length(strbuf_t *s, int len)
+{
+ s->length += len;
+}
+
static inline int strbuf_length(strbuf_t *s)
{
return s->length;
diff --git a/redis.conf b/redis.conf
index 98ad40a53..6c765691d 100644
--- a/redis.conf
+++ b/redis.conf
@@ -240,6 +240,49 @@ slave-serve-stale-data yes
# administrative / dangerous commands.
slave-read-only yes
+# Replication SYNC strategy: disk or socket.
+#
+# -------------------------------------------------------
+# WARNING: DISKLESS REPLICATION IS EXPERIMENTAL CURRENTLY
+# -------------------------------------------------------
+#
+# New slaves and reconnecting slaves that are not able to continue the replication
+# process just receiving differences, need to do what is called a "full
+# synchronization". An RDB file is transmitted from the master to the slaves.
+# The transmission can happen in two different ways:
+#
+# 1) Disk-backed: The Redis master creates a new process that writes the RDB
+# file on disk. Later the file is transferred by the parent
+# process to the slaves incrementally.
+# 2) Diskless: The Redis master creates a new process that directly writes the
+# RDB file to slave sockets, without touching the disk at all.
+#
+# With disk-backed replication, while the RDB file is generated, more slaves
+# can be queued and served with the RDB file as soon as the current child producing
+# the RDB file finishes its work. With diskless replication instead once
+# the transfer starts, new slaves arriving will be queued and a new transfer
+# will start when the current one terminates.
+#
+# When diskless replication is used, the master waits a configurable amount of
+# time (in seconds) before starting the transfer in the hope that multiple slaves
+# will arrive and the transfer can be parallelized.
+#
+# With slow disks and fast (large bandwidth) networks, diskless replication
+# works better.
+repl-diskless-sync no
+
+# When diskless replication is enabled, it is possible to configure the delay
+# the server waits in order to spawn the child that transfers the RDB via socket
+# to the slaves.
+#
+# This is important since once the transfer starts, it is not possible to serve
+# new slaves arriving, that will be queued for the next RDB transfer, so the server
+# waits a delay in order to let more slaves arrive.
+#
+# The delay is specified in seconds, and by default is 5 seconds. To disable
+# it entirely just set it to 0 seconds and the transfer will start ASAP.
+repl-diskless-sync-delay 5
+
# Slaves send PINGs to server in a predefined interval. It's possible to change
# this interval with the repl_ping_slave_period option. The default value is 10
# seconds.
diff --git a/src/anet.c b/src/anet.c
index f36575bd4..732816308 100644
--- a/src/anet.c
+++ b/src/anet.c
@@ -34,6 +34,7 @@
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/un.h>
+#include <sys/time.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
@@ -57,24 +58,37 @@ static void anetSetError(char *err, const char *fmt, ...)
va_end(ap);
}
-int anetNonBlock(char *err, int fd)
-{
+int anetSetBlock(char *err, int fd, int non_block) {
int flags;
- /* Set the socket non-blocking.
+ /* Set the socket blocking (if non_block is zero) or non-blocking.
* Note that fcntl(2) for F_GETFL and F_SETFL can't be
* interrupted by a signal. */
if ((flags = fcntl(fd, F_GETFL)) == -1) {
anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno));
return ANET_ERR;
}
- if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
+
+ if (non_block)
+ flags |= O_NONBLOCK;
+ else
+ flags &= ~O_NONBLOCK;
+
+ if (fcntl(fd, F_SETFL, flags) == -1) {
anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));
return ANET_ERR;
}
return ANET_OK;
}
+int anetNonBlock(char *err, int fd) {
+ return anetSetBlock(err,fd,1);
+}
+
+int anetBlock(char *err, int fd) {
+ return anetSetBlock(err,fd,0);
+}
+
/* Set TCP keep alive option to detect dead peers. The interval option
* is only used for Linux as we are using Linux-specific APIs to set
* the probe send time, interval, and count. */
@@ -165,6 +179,20 @@ int anetTcpKeepAlive(char *err, int fd)
return ANET_OK;
}
+/* Set the socket send timeout (SO_SNDTIMEO socket option) to the specified
+ * number of milliseconds, or disable it if the 'ms' argument is zero. */
+int anetSendTimeout(char *err, int fd, long long ms) {
+ struct timeval tv;
+
+ tv.tv_sec = ms/1000;
+ tv.tv_usec = (ms%1000)*1000;
+ if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1) {
+ anetSetError(err, "setsockopt SO_SNDTIMEO: %s", strerror(errno));
+ return ANET_ERR;
+ }
+ return ANET_OK;
+}
+
/* anetGenericResolve() is called by anetResolve() and anetResolveIP() to
* do the actual work. It resolves the hostname "host" and set the string
* representation of the IP address into the buffer pointed by "ipbuf".
diff --git a/src/anet.h b/src/anet.h
index c4248b9df..d78b1e7a0 100644
--- a/src/anet.h
+++ b/src/anet.h
@@ -61,9 +61,11 @@ int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port)
int anetUnixAccept(char *err, int serversock);
int anetWrite(int fd, char *buf, int count);
int anetNonBlock(char *err, int fd);
+int anetBlock(char *err, int fd);
int anetEnableTcpNoDelay(char *err, int fd);
int anetDisableTcpNoDelay(char *err, int fd);
int anetTcpKeepAlive(char *err, int fd);
+int anetSendTimeout(char *err, int fd, long long ms);
int anetPeerToString(int fd, char *ip, size_t ip_len, int *port);
int anetKeepAlive(char *err, int fd, int interval);
int anetSockName(int fd, char *ip, size_t ip_len, int *port);
diff --git a/src/config.c b/src/config.c
index aebdf1047..1ce7febf7 100644
--- a/src/config.c
+++ b/src/config.c
@@ -267,6 +267,16 @@ void loadServerConfigFromString(char *config) {
if ((server.repl_disable_tcp_nodelay = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
+ } else if (!strcasecmp(argv[0],"repl-diskless-sync") && argc==2) {
+ if ((server.repl_diskless_sync = yesnotoi(argv[1])) == -1) {
+ err = "argument must be 'yes' or 'no'"; goto loaderr;
+ }
+ } else if (!strcasecmp(argv[0],"repl-diskless-sync-delay") && argc==2) {
+ server.repl_diskless_sync_delay = atoi(argv[1]);
+ if (server.repl_diskless_sync_delay < 0) {
+ err = "repl-diskless-sync-delay can't be negative";
+ goto loaderr;
+ }
} else if (!strcasecmp(argv[0],"repl-backlog-size") && argc == 2) {
long long size = memtoll(argv[1],NULL);
if (size <= 0) {
@@ -281,7 +291,7 @@ void loadServerConfigFromString(char *config) {
goto loaderr;
}
} else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
- server.masterauth = zstrdup(argv[1]);
+ server.masterauth = zstrdup(argv[1]);
} else if (!strcasecmp(argv[0],"slave-serve-stale-data") && argc == 2) {
if ((server.repl_serve_stale_data = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
@@ -864,6 +874,15 @@ void configSetCommand(redisClient *c) {
if (yn == -1) goto badfmt;
server.repl_disable_tcp_nodelay = yn;
+ } else if (!strcasecmp(c->argv[2]->ptr,"repl-diskless-sync")) {
+ int yn = yesnotoi(o->ptr);
+
+ if (yn == -1) goto badfmt;
+ server.repl_diskless_sync = yn;
+ } else if (!strcasecmp(c->argv[2]->ptr,"repl-diskless-sync-delay")) {
+ if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
+ ll < 0) goto badfmt;
+ server.repl_diskless_sync_delay = ll;
} else if (!strcasecmp(c->argv[2]->ptr,"slave-priority")) {
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
ll < 0) goto badfmt;
@@ -982,6 +1001,7 @@ void configGetCommand(redisClient *c) {
config_get_numerical_field("min-slaves-to-write",server.repl_min_slaves_to_write);
config_get_numerical_field("min-slaves-max-lag",server.repl_min_slaves_max_lag);
config_get_numerical_field("hz",server.hz);
+ config_get_numerical_field("repl-diskless-sync-delay",server.repl_diskless_sync_delay);
/* Bool (yes/no) values */
config_get_bool_field("no-appendfsync-on-rewrite",
@@ -998,6 +1018,8 @@ void configGetCommand(redisClient *c) {
config_get_bool_field("activerehashing", server.activerehashing);
config_get_bool_field("repl-disable-tcp-nodelay",
server.repl_disable_tcp_nodelay);
+ config_get_bool_field("repl-diskless-sync",
+ server.repl_diskless_sync);
config_get_bool_field("aof-rewrite-incremental-fsync",
server.aof_rewrite_incremental_fsync);
config_get_bool_field("aof-load-truncated",
@@ -1722,6 +1744,8 @@ int rewriteConfig(char *path) {
rewriteConfigBytesOption(state,"repl-backlog-size",server.repl_backlog_size,REDIS_DEFAULT_REPL_BACKLOG_SIZE);
rewriteConfigBytesOption(state,"repl-backlog-ttl",server.repl_backlog_time_limit,REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT);
rewriteConfigYesNoOption(state,"repl-disable-tcp-nodelay",server.repl_disable_tcp_nodelay,REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY);
+ rewriteConfigYesNoOption(state,"repl-diskless-sync",server.repl_diskless_sync,REDIS_DEFAULT_REPL_DISKLESS_SYNC);
+ rewriteConfigNumericalOption(state,"repl-diskless-sync-delay",server.repl_diskless_sync_delay,REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY);
rewriteConfigNumericalOption(state,"slave-priority",server.slave_priority,REDIS_DEFAULT_SLAVE_PRIORITY);
rewriteConfigNumericalOption(state,"min-slaves-to-write",server.repl_min_slaves_to_write,REDIS_DEFAULT_MIN_SLAVES_TO_WRITE);
rewriteConfigNumericalOption(state,"min-slaves-max-lag",server.repl_min_slaves_max_lag,REDIS_DEFAULT_MIN_SLAVES_MAX_LAG);
diff --git a/src/config.h b/src/config.h
index 57d07599a..1f2919ed2 100644
--- a/src/config.h
+++ b/src/config.h
@@ -48,6 +48,7 @@
#define HAVE_PROC_STAT 1
#define HAVE_PROC_MAPS 1
#define HAVE_PROC_SMAPS 1
+#define HAVE_PROC_SOMAXCONN 1
#endif
/* Test for task_info() */
diff --git a/src/db.c b/src/db.c
index b39fbe35d..dfec71942 100644
--- a/src/db.c
+++ b/src/db.c
@@ -371,7 +371,7 @@ void scanCallback(void *privdata, const dictEntry *de) {
} else if (o->type == REDIS_ZSET) {
key = dictGetKey(de);
incrRefCount(key);
- val = createStringObjectFromLongDouble(*(double*)dictGetVal(de));
+ val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0);
} else {
redisPanic("Type not handled in SCAN callback.");
}
diff --git a/src/debug.c b/src/debug.c
index 99f8b89ca..42b379adb 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -325,7 +325,8 @@ void debugCommand(redisClient *c) {
(long long) sdslen(val->ptr),
(long long) sdsavail(val->ptr));
}
- } else if (!strcasecmp(c->argv[1]->ptr,"populate") && c->argc == 3) {
+ } else if (!strcasecmp(c->argv[1]->ptr,"populate") &&
+ (c->argc == 3 || c->argc == 4)) {
long keys, j;
robj *key, *val;
char buf[128];
@@ -334,7 +335,8 @@ void debugCommand(redisClient *c) {
return;
dictExpand(c->db->dict,keys);
for (j = 0; j < keys; j++) {
- snprintf(buf,sizeof(buf),"key:%lu",j);
+ snprintf(buf,sizeof(buf),"%s:%lu",
+ (c->argc == 3) ? "key" : (char*)c->argv[3]->ptr, j);
key = createStringObject(buf,strlen(buf));
if (lookupKeyRead(c->db,key) != NULL) {
decrRefCount(key);
diff --git a/src/fmacros.h b/src/fmacros.h
index e49735ce5..6e56c759d 100644
--- a/src/fmacros.h
+++ b/src/fmacros.h
@@ -34,6 +34,7 @@
#if defined(__linux__)
#define _GNU_SOURCE
+#define _DEFAULT_SOURCE
#endif
#if defined(_AIX)
diff --git a/src/hyperloglog.c b/src/hyperloglog.c
index 005beb18f..a6cfdc742 100644
--- a/src/hyperloglog.c
+++ b/src/hyperloglog.c
@@ -1213,7 +1213,7 @@ void pfcountCommand(redisClient *c) {
for (j = 1; j < c->argc; j++) {
/* Check type and size. */
robj *o = lookupKeyRead(c->db,c->argv[j]);
- if (o == NULL) continue; /* Assume empty HLL for non existing var. */
+ if (o == NULL) continue; /* Assume empty HLL for non existing var.*/
if (isHLLObjectOrReply(c,o) != REDIS_OK) return;
/* Merge with this HLL with our 'max' HHL by setting max[i]
diff --git a/src/latency.c b/src/latency.c
index b7845ca29..cb116fb90 100644
--- a/src/latency.c
+++ b/src/latency.c
@@ -56,6 +56,32 @@ dictType latencyTimeSeriesDictType = {
dictVanillaFree /* val destructor */
};
+/* ------------------------- Utility functions ------------------------------ */
+
+#ifdef __linux__
+/* Returns 1 if Transparent Huge Pages support is enabled in the kernel.
+ * Otherwise (or if we are unable to check) 0 is returned. */
+int THPIsEnabled(void) {
+ char buf[1024];
+
+ FILE *fp = fopen("/sys/kernel/mm/transparent_hugepage/enabled","r");
+ if (!fp) return 0;
+ if (fgets(buf,sizeof(buf),fp) == NULL) {
+ fclose(fp);
+ return 0;
+ }
+ fclose(fp);
+ return (strstr(buf,"[never]") == NULL) ? 1 : 0;
+}
+#endif
+
+/* Report the amount of AnonHugePages in smap, in bytes. If the return
+ * value of the function is non-zero, the process is being targeted by
+ * THP support, and is likely to have memory usage / latency issues. */
+int THPGetAnonHugePagesSize(void) {
+ return zmalloc_get_smap_bytes_by_field("AnonHugePages:");
+}
+
/* ---------------------------- Latency API --------------------------------- */
/* Latency monitor initialization. We just need to create the dictionary
@@ -203,6 +229,7 @@ sds createLatencyReport(void) {
int advise_hz = 0; /* Use higher HZ. */
int advise_large_objects = 0; /* Deletion of large objects. */
int advise_relax_fsync_policy = 0; /* appendfsync always is slow. */
+ int advise_disable_thp = 0; /* AnonHugePages detected. */
int advices = 0;
/* Return ASAP if the latency engine is disabled and it looks like it
@@ -346,9 +373,15 @@ sds createLatencyReport(void) {
}
dictReleaseIterator(di);
- if (eventnum == 0) {
+ /* Add non event based advices. */
+ if (THPGetAnonHugePagesSize() > 0) {
+ advise_disable_thp = 1;
+ advices++;
+ }
+
+ if (eventnum == 0 && advices == 0) {
report = sdscat(report,"Dave, no latency spike was observed during the lifetime of this Redis instance, not in the slightest bit. I honestly think you ought to sit down calmly, take a stress pill, and think things over.\n");
- } else if (advices == 0) {
+ } else if (eventnum > 0 && advices == 0) {
report = sdscat(report,"\nWhile there are latency events logged, I'm not able to suggest any easy fix. Please use the Redis community to get some help, providing this report in your help request.\n");
} else {
/* Add all the suggestions accumulated so far. */
@@ -418,6 +451,10 @@ sds createLatencyReport(void) {
if (advise_large_objects) {
report = sdscat(report,"- Deleting, expiring or evicting (because of maxmemory policy) large objects is a blocking operation. If you have very large objects that are often deleted, expired, or evicted, try to fragment those objects into multiple smaller objects.\n");
}
+
+ if (advise_disable_thp) {
+ report = sdscat(report,"- I detected a non zero amount of anonymous huge pages used by your process. This creates very serious latency events in different conditions, especially when Redis is persisting on disk. To disable THP support use the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled', make sure to also add it into /etc/rc.local so that the command will be executed again after a reboot. Note that even if you have already disabled THP, you still need to restart the Redis process to get rid of the huge pages already created.\n");
+ }
}
return report;
@@ -475,7 +512,6 @@ sds latencyCommandGenSparkeline(char *event, struct latencyTimeSeries *ts) {
for (j = 0; j < LATENCY_TS_LEN; j++) {
int i = (ts->idx + j) % LATENCY_TS_LEN;
int elapsed;
- char *label;
char buf[64];
if (ts->samples[i].time == 0) continue;
@@ -497,8 +533,7 @@ sds latencyCommandGenSparkeline(char *event, struct latencyTimeSeries *ts) {
snprintf(buf,sizeof(buf),"%dh",elapsed/3600);
else
snprintf(buf,sizeof(buf),"%dd",elapsed/(3600*24));
- label = zstrdup(buf);
- sparklineSequenceAddSample(seq,ts->samples[i].latency,label);
+ sparklineSequenceAddSample(seq,ts->samples[i].latency,buf);
}
graph = sdscatprintf(graph,
diff --git a/src/latency.h b/src/latency.h
index 6ddbe0410..240f54b45 100644
--- a/src/latency.h
+++ b/src/latency.h
@@ -63,6 +63,7 @@ struct latencyStats {
void latencyMonitorInit(void);
void latencyAddSample(char *event, mstime_t latency);
+int THPIsEnabled(void);
/* Latency monitoring macros. */
diff --git a/src/memtest.c b/src/memtest.c
index 18d821b10..39fc4fcaa 100644
--- a/src/memtest.c
+++ b/src/memtest.c
@@ -35,6 +35,9 @@
#include <errno.h>
#include <termios.h>
#include <sys/ioctl.h>
+#if defined(__sun)
+#include <stropts.h>
+#endif
#include "config.h"
#if (ULONG_MAX == 4294967295UL)
diff --git a/src/networking.c b/src/networking.c
index 9f286b172..0dfd880a3 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -89,6 +89,7 @@ redisClient *createClient(int fd) {
c->ctime = c->lastinteraction = server.unixtime;
c->authenticated = 0;
c->replstate = REDIS_REPL_NONE;
+ c->repl_put_online_on_ack = 0;
c->reploff = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
@@ -661,12 +662,8 @@ void freeClient(redisClient *c) {
/* Log link disconnection with slave */
if ((c->flags & REDIS_SLAVE) && !(c->flags & REDIS_MONITOR)) {
- char ip[REDIS_IP_STR_LEN];
-
- if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) {
- redisLog(REDIS_WARNING,"Connection with slave %s:%d lost.",
- ip, c->slave_listening_port);
- }
+ redisLog(REDIS_WARNING,"Connection with slave %s lost.",
+ replicationGetSlaveName(c));
}
/* Free the query buffer */
@@ -716,8 +713,10 @@ void freeClient(redisClient *c) {
/* Master/slave cleanup Case 1:
* we lost the connection with a slave. */
if (c->flags & REDIS_SLAVE) {
- if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
- close(c->repldbfd);
+ if (c->replstate == REDIS_REPL_SEND_BULK) {
+ if (c->repldbfd != -1) close(c->repldbfd);
+ if (c->replpreamble) sdsfree(c->replpreamble);
+ }
list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
ln = listSearchKey(l,c);
redisAssert(ln != NULL);
@@ -823,6 +822,7 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
*
* However if we are over the maxmemory limit we ignore that and
* just deliver as much data as it is possible to deliver. */
+ server.stat_net_output_bytes += totwritten;
if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory)) break;
@@ -906,8 +906,10 @@ int processInlineBuffer(redisClient *c) {
sdsrange(c->querybuf,querylen+2,-1);
/* Setup argv array on client structure */
- if (c->argv) zfree(c->argv);
- c->argv = zmalloc(sizeof(robj*)*argc);
+ if (argc) {
+ if (c->argv) zfree(c->argv);
+ c->argv = zmalloc(sizeof(robj*)*argc);
+ }
/* Create redis objects for all arguments. */
for (c->argc = 0, j = 0; j < argc; j++) {
@@ -1156,6 +1158,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & REDIS_MASTER) c->reploff += nread;
+ server.stat_net_input_bytes += nread;
} else {
server.current_client = NULL;
return;
diff --git a/src/object.c b/src/object.c
index 56528690f..cd774cb84 100644
--- a/src/object.c
+++ b/src/object.c
@@ -69,26 +69,44 @@ robj *createStringObjectFromLongLong(long long value) {
return o;
}
-/* Note: this function is defined into object.c since here it is where it
- * belongs but it is actually designed to be used just for INCRBYFLOAT */
-robj *createStringObjectFromLongDouble(long double value) {
+/* Create a string object from a long double. If humanfriendly is non-zero
+ * it does not use exponential format and trims trailing zeroes at the end,
+ * however this results in loss of precision. Otherwise exp format is used
+ * and the output of snprintf() is not modified.
+ *
+ * The 'humanfriendly' option is used for INCRBYFLOAT and HINCRBYFLOAT. */
+robj *createStringObjectFromLongDouble(long double value, int humanfriendly) {
char buf[256];
int len;
- /* We use 17 digits precision since with 128 bit floats that precision
- * after rounding is able to represent most small decimal numbers in a way
- * that is "non surprising" for the user (that is, most small decimal
- * numbers will be represented in a way that when converted back into
- * a string are exactly the same as what the user typed.) */
- len = snprintf(buf,sizeof(buf),"%.17Lf", value);
- /* Now remove trailing zeroes after the '.' */
- if (strchr(buf,'.') != NULL) {
- char *p = buf+len-1;
- while(*p == '0') {
- p--;
- len--;
+ if (isinf(value)) {
+ /* Libc in odd systems (Hi Solaris!) will format infinite in a
+ * different way, so better to handle it in an explicit way. */
+ if (value > 0) {
+ memcpy(buf,"inf",3);
+ len = 3;
+ } else {
+ memcpy(buf,"-inf",4);
+ len = 4;
}
- if (*p == '.') len--;
+ } else if (humanfriendly) {
+ /* We use 17 digits precision since with 128 bit floats that precision
+ * after rounding is able to represent most small decimal numbers in a
+ * way that is "non surprising" for the user (that is, most small
+ * decimal numbers will be represented in a way that when converted
+ * back into a string are exactly the same as what the user typed.) */
+ len = snprintf(buf,sizeof(buf),"%.17Lf", value);
+ /* Now remove trailing zeroes after the '.' */
+ if (strchr(buf,'.') != NULL) {
+ char *p = buf+len-1;
+ while(*p == '0') {
+ p--;
+ len--;
+ }
+ if (*p == '.') len--;
+ }
+ } else {
+ len = snprintf(buf,sizeof(buf),"%.17Lg", value);
}
return createStringObject(buf,len);
}
diff --git a/src/rdb.c b/src/rdb.c
index 7bca926f2..dcc146145 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -626,45 +626,37 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
return 1;
}
-/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
-int rdbSave(char *filename) {
+/* Produces a dump of the database in RDB format sending it to the specified
+ * Redis I/O channel. On success REDIS_OK is returned, otherwise REDIS_ERR
+ * is returned and part of the output, or all the output, can be
+ * missing because of I/O errors.
+ *
+ * When the function returns REDIS_ERR and if 'error' is not NULL, the
+ * integer pointed by 'error' is set to the value of errno just after the I/O
+ * error. */
+int rdbSaveRio(rio *rdb, int *error) {
dictIterator *di = NULL;
dictEntry *de;
- char tmpfile[256];
char magic[10];
int j;
long long now = mstime();
- FILE *fp;
- rio rdb;
uint64_t cksum;
- snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
- fp = fopen(tmpfile,"w");
- if (!fp) {
- redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
- strerror(errno));
- return REDIS_ERR;
- }
-
- rioInitWithFile(&rdb,fp);
if (server.rdb_checksum)
- rdb.update_cksum = rioGenericUpdateChecksum;
+ rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
- if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;
+ if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
- if (!di) {
- fclose(fp);
- return REDIS_ERR;
- }
+ if (!di) return REDIS_ERR;
/* Write the SELECT DB opcode */
- if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
- if (rdbSaveLen(&rdb,j) == -1) goto werr;
+ if (rdbSaveType(rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
+ if (rdbSaveLen(rdb,j) == -1) goto werr;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
@@ -674,20 +666,74 @@ int rdbSave(char *filename) {
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
- if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
+ if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr;
}
dictReleaseIterator(di);
}
di = NULL; /* So that we don't release it again on error. */
/* EOF opcode */
- if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;
+ if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;
/* CRC64 checksum. It will be zero if checksum computation is disabled, the
* loading code skips the check in this case. */
- cksum = rdb.cksum;
+ cksum = rdb->cksum;
memrev64ifbe(&cksum);
- if (rioWrite(&rdb,&cksum,8) == 0) goto werr;
+ if (rioWrite(rdb,&cksum,8) == 0) goto werr;
+ return REDIS_OK;
+
+werr:
+ if (error) *error = errno;
+ if (di) dictReleaseIterator(di);
+ return REDIS_ERR;
+}
+
+/* This is just a wrapper to rdbSaveRio() that additionally adds a prefix
+ * and a suffix to the generated RDB dump. The prefix is:
+ *
+ * $EOF:<40 bytes unguessable hex string>\r\n
+ *
+ * While the suffix is the 40 bytes hex string we announced in the prefix.
+ * This way processes receiving the payload can understand when it ends
+ * without doing any processing of the content. */
+int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
+ char eofmark[REDIS_EOF_MARK_SIZE];
+
+ getRandomHexChars(eofmark,REDIS_EOF_MARK_SIZE);
+ if (error) *error = 0;
+ if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
+ if (rioWrite(rdb,eofmark,REDIS_EOF_MARK_SIZE) == 0) goto werr;
+ if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
+ if (rdbSaveRio(rdb,error) == REDIS_ERR) goto werr;
+ if (rioWrite(rdb,eofmark,REDIS_EOF_MARK_SIZE) == 0) goto werr;
+ return REDIS_OK;
+
+werr: /* Write error. */
+ /* Set 'error' only if not already set by rdbSaveRio() call. */
+ if (error && *error == 0) *error = errno;
+ return REDIS_ERR;
+}
+
+/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success. */
+int rdbSave(char *filename) {
+ char tmpfile[256];
+ FILE *fp;
+ rio rdb;
+ int error;
+
+ snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
+ fp = fopen(tmpfile,"w");
+ if (!fp) {
+ redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
+ strerror(errno));
+ return REDIS_ERR;
+ }
+
+ rioInitWithFile(&rdb,fp);
+ if (rdbSaveRio(&rdb,&error) == REDIS_ERR) {
+ errno = error;
+ goto werr;
+ }
/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
@@ -711,7 +757,6 @@ werr:
fclose(fp);
unlink(tmpfile);
redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
- if (di) dictReleaseIterator(di);
return REDIS_ERR;
}
@@ -756,6 +801,7 @@ int rdbSaveBackground(char *filename) {
redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid;
+ server.rdb_child_type = REDIS_RDB_CHILD_TYPE_DISK;
updateDictResizePolicy();
return REDIS_OK;
}
@@ -1191,8 +1237,9 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
return REDIS_ERR; /* Just to avoid warning */
}
-/* A background saving child (BGSAVE) terminated its work. Handle this. */
-void backgroundSaveDoneHandler(int exitcode, int bysignal) {
+/* A background saving child (BGSAVE) terminated its work. Handle this.
+ * This function covers the case of actual BGSAVEs. */
+void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) {
redisLog(REDIS_NOTICE,
"Background saving terminated with success");
@@ -1217,11 +1264,258 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) {
server.lastbgsave_status = REDIS_ERR;
}
server.rdb_child_pid = -1;
+ server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE;
server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
server.rdb_save_time_start = -1;
/* Possibly there are slaves waiting for a BGSAVE in order to be served
* (the first stage of SYNC is a bulk transfer of dump.rdb) */
- updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR);
+ updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR, REDIS_RDB_CHILD_TYPE_DISK);
+}
+
+/* A background saving child (BGSAVE) terminated its work. Handle this.
+ * This function covers the case of RDB -> Slaves socket transfers for
+ * diskless replication. */
+void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
+ uint64_t *ok_slaves;
+
+ if (!bysignal && exitcode == 0) {
+ redisLog(REDIS_NOTICE,
+ "Background RDB transfer terminated with success");
+ } else if (!bysignal && exitcode != 0) {
+ redisLog(REDIS_WARNING, "Background transfer error");
+ } else {
+ redisLog(REDIS_WARNING,
+ "Background transfer terminated by signal %d", bysignal);
+ }
+ server.rdb_child_pid = -1;
+ server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE;
+ server.rdb_save_time_start = -1;
+
+ /* If the child returns an OK exit code, read the set of slave client
+ * IDs and the associated status code. We'll terminate all the slaves
+ * in error state.
+ *
+ * If the process returned an error, consider the list of slaves that
+ * can continue to be empty, so that it's just a special case of the
+ * normal code path. */
+ ok_slaves = zmalloc(sizeof(uint64_t)); /* Make space for the count. */
+ ok_slaves[0] = 0;
+ if (!bysignal && exitcode == 0) {
+ int readlen = sizeof(uint64_t);
+
+ if (read(server.rdb_pipe_read_result_from_child, ok_slaves, readlen) ==
+ readlen)
+ {
+ readlen = ok_slaves[0]*sizeof(uint64_t)*2;
+
+ /* Make space for enough elements as specified by the first
+ * uint64_t element in the array. */
+ ok_slaves = zrealloc(ok_slaves,sizeof(uint64_t)+readlen);
+ if (readlen &&
+ read(server.rdb_pipe_read_result_from_child, ok_slaves+1,
+ readlen) != readlen)
+ {
+ ok_slaves[0] = 0;
+ }
+ }
+ }
+
+ close(server.rdb_pipe_read_result_from_child);
+ close(server.rdb_pipe_write_result_to_parent);
+
+ /* We can continue the replication process with all the slaves that
+ * correctly received the full payload. Others are terminated. */
+ listNode *ln;
+ listIter li;
+
+ listRewind(server.slaves,&li);
+ while((ln = listNext(&li))) {
+ redisClient *slave = ln->value;
+
+ if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
+ uint64_t j;
+ int errorcode = 0;
+
+ /* Search for the slave ID in the reply. In order for a slave to
+ * continue the replication process, we need to find it in the list,
+ * and it must have an error code set to 0 (which means success). */
+ for (j = 0; j < ok_slaves[0]; j++) {
+ if (slave->id == ok_slaves[2*j+1]) {
+ errorcode = ok_slaves[2*j+2];
+ break; /* Found in slaves list. */
+ }
+ }
+ if (j == ok_slaves[0] || errorcode != 0) {
+ redisLog(REDIS_WARNING,
+ "Closing slave %s: child->slave RDB transfer failed: %s",
+ replicationGetSlaveName(slave),
+ (errorcode == 0) ? "RDB transfer child aborted"
+ : strerror(errorcode));
+ freeClient(slave);
+ } else {
+ redisLog(REDIS_WARNING,
+ "Slave %s correctly received the streamed RDB file.",
+ replicationGetSlaveName(slave));
+ /* Restore the socket as non-blocking. */
+ anetNonBlock(NULL,slave->fd);
+ anetSendTimeout(NULL,slave->fd,0);
+ }
+ }
+ }
+ zfree(ok_slaves);
+
+ updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR, REDIS_RDB_CHILD_TYPE_SOCKET);
+}
+
+/* When a background RDB saving/transfer terminates, call the right handler. */
+void backgroundSaveDoneHandler(int exitcode, int bysignal) {
+ switch(server.rdb_child_type) {
+ case REDIS_RDB_CHILD_TYPE_DISK:
+ backgroundSaveDoneHandlerDisk(exitcode,bysignal);
+ break;
+ case REDIS_RDB_CHILD_TYPE_SOCKET:
+ backgroundSaveDoneHandlerSocket(exitcode,bysignal);
+ break;
+ default:
+ redisPanic("Unknown RDB child type.");
+ break;
+ }
+}
+
+/* Spawn an RDB child that writes the RDB to the sockets of the slaves
+ * that are currently in REDIS_REPL_WAIT_BGSAVE_START state. */
+int rdbSaveToSlavesSockets(void) {
+ int *fds;
+ uint64_t *clientids;
+ int numfds;
+ listNode *ln;
+ listIter li;
+ pid_t childpid;
+ long long start;
+ int pipefds[2];
+
+ if (server.rdb_child_pid != -1) return REDIS_ERR;
+
+ /* Before to fork, create a pipe that will be used in order to
+ * send back to the parent the IDs of the slaves that successfully
+ * received all the writes. */
+ if (pipe(pipefds) == -1) return REDIS_ERR;
+ server.rdb_pipe_read_result_from_child = pipefds[0];
+ server.rdb_pipe_write_result_to_parent = pipefds[1];
+
+ /* Collect the file descriptors of the slaves we want to transfer
+ * the RDB to, which are in WAIT_BGSAVE_START state. */
+ fds = zmalloc(sizeof(int)*listLength(server.slaves));
+ /* We also allocate an array of corresponding client IDs. This will
+ * be useful for the child process in order to build the report
+ * (sent via unix pipe) that will be sent to the parent. */
+ clientids = zmalloc(sizeof(uint64_t)*listLength(server.slaves));
+ numfds = 0;
+
+ listRewind(server.slaves,&li);
+ while((ln = listNext(&li))) {
+ redisClient *slave = ln->value;
+
+ if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
+ clientids[numfds] = slave->id;
+ fds[numfds++] = slave->fd;
+ slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
+            /* Put the socket in blocking mode to simplify RDB transfer.
+             * We'll restore it when the child returns (since the duped socket
+             * will share the O_NONBLOCK attribute with the parent). */
+ anetBlock(NULL,slave->fd);
+ anetSendTimeout(NULL,slave->fd,server.repl_timeout*1000);
+ }
+ }
+
+ /* Create the child process. */
+ start = ustime();
+ if ((childpid = fork()) == 0) {
+ /* Child */
+ int retval;
+ rio slave_sockets;
+
+ rioInitWithFdset(&slave_sockets,fds,numfds);
+ zfree(fds);
+
+ closeListeningSockets(0);
+ redisSetProcTitle("redis-rdb-to-slaves");
+
+ retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL);
+ if (retval == REDIS_OK && rioFlush(&slave_sockets) == 0)
+ retval = REDIS_ERR;
+
+ if (retval == REDIS_OK) {
+ size_t private_dirty = zmalloc_get_private_dirty();
+
+ if (private_dirty) {
+ redisLog(REDIS_NOTICE,
+ "RDB: %zu MB of memory used by copy-on-write",
+ private_dirty/(1024*1024));
+ }
+
+ /* If we are returning OK, at least one slave was served
+ * with the RDB file as expected, so we need to send a report
+ * to the parent via the pipe. The format of the message is:
+ *
+ * <len> <slave[0].id> <slave[0].error> ...
+ *
+ * len, slave IDs, and slave errors, are all uint64_t integers,
+ * so basically the reply is composed of 64 bits for the len field
+ * plus 2 additional 64 bit integers for each entry, for a total
+ * of 'len' entries.
+ *
+ * The 'id' represents the slave's client ID, so that the master
+ * can match the report with a specific slave, and 'error' is
+ * set to 0 if the replication process terminated with a success
+ * or the error code if an error occurred. */
+ void *msg = zmalloc(sizeof(uint64_t)*(1+2*numfds));
+ uint64_t *len = msg;
+ uint64_t *ids = len+1;
+ int j, msglen;
+
+ *len = numfds;
+ for (j = 0; j < numfds; j++) {
+ *ids++ = clientids[j];
+ *ids++ = slave_sockets.io.fdset.state[j];
+ }
+
+ /* Write the message to the parent. If we have no good slaves or
+ * we are unable to transfer the message to the parent, we exit
+ * with an error so that the parent will abort the replication
+ * process with all the children that were waiting. */
+ msglen = sizeof(uint64_t)*(1+2*numfds);
+ if (*len == 0 ||
+ write(server.rdb_pipe_write_result_to_parent,msg,msglen)
+ != msglen)
+ {
+ retval = REDIS_ERR;
+ }
+ }
+ exitFromChild((retval == REDIS_OK) ? 0 : 1);
+ } else {
+ /* Parent */
+ zfree(clientids); /* Not used by parent. Free ASAP. */
+ server.stat_fork_time = ustime()-start;
+ server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
+ latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
+ if (childpid == -1) {
+ redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
+ strerror(errno));
+ zfree(fds);
+ close(pipefds[0]);
+ close(pipefds[1]);
+ return REDIS_ERR;
+ }
+ redisLog(REDIS_NOTICE,"Background RDB transfer started by pid %d",childpid);
+ server.rdb_save_time_start = time(NULL);
+ server.rdb_child_pid = childpid;
+ server.rdb_child_type = REDIS_RDB_CHILD_TYPE_SOCKET;
+ updateDictResizePolicy();
+ zfree(fds);
+ return REDIS_OK;
+ }
+ return REDIS_OK; /* unreached */
}
void saveCommand(redisClient *c) {
diff --git a/src/rdb.h b/src/rdb.h
index 54ee4e514..eb40d4993 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -101,6 +101,7 @@ int rdbSaveObjectType(rio *rdb, robj *o);
int rdbLoadObjectType(rio *rdb);
int rdbLoad(char *filename);
int rdbSaveBackground(char *filename);
+int rdbSaveToSlavesSockets(void);
void rdbRemoveTempFile(pid_t childpid);
int rdbSave(char *filename);
int rdbSaveObject(rio *rdb, robj *o);
diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c
index 2e67f1021..8c9304dec 100644
--- a/src/redis-benchmark.c
+++ b/src/redis-benchmark.c
@@ -90,9 +90,10 @@ typedef struct _client {
long long start; /* Start time of a request */
long long latency; /* Request latency */
int pending; /* Number of pending requests (replies to consume) */
- int selectlen; /* If non-zero, a SELECT of 'selectlen' bytes is currently
- used as a prefix of the pipline of commands. This gets
- discarded the first time it's sent. */
+ int prefix_pending; /* If non-zero, number of pending prefix commands. Commands
+ such as auth and select are prefixed to the pipeline of
+ benchmark commands and discarded after the first send. */
+ int prefixlen; /* Size in bytes of the pending prefix commands */
} *client;
/* Prototypes */
@@ -212,20 +213,21 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
}
freeReplyObject(reply);
-
- if (c->selectlen) {
- size_t j;
-
- /* This is the OK from SELECT. Just discard the SELECT
- * from the buffer. */
+ /* This is an OK for prefix commands such as auth and select.*/
+ if (c->prefix_pending > 0) {
+ c->prefix_pending--;
c->pending--;
- sdsrange(c->obuf,c->selectlen,-1);
- /* We also need to fix the pointers to the strings
- * we need to randomize. */
- for (j = 0; j < c->randlen; j++)
- c->randptr[j] -= c->selectlen;
- c->selectlen = 0;
- continue;
+ /* Discard prefix commands on first response.*/
+ if (c->prefixlen > 0) {
+ size_t j;
+ sdsrange(c->obuf, c->prefixlen, -1);
+ /* We also need to fix the pointers to the strings
+ * we need to randomize. */
+ for (j = 0; j < c->randlen; j++)
+ c->randptr[j] -= c->prefixlen;
+ c->prefixlen = 0;
+ }
+ continue;
}
if (config.requests_finished < config.requests)
@@ -299,8 +301,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
* 2) The offsets of the __rand_int__ elements inside the command line, used
* for arguments randomization.
*
- * Even when cloning another client, the SELECT command is automatically prefixed
- * if needed. */
+ * Even when cloning another client, prefix commands are applied if needed.*/
static client createClient(char *cmd, size_t len, client from) {
int j;
client c = zmalloc(sizeof(struct _client));
@@ -325,12 +326,16 @@ static client createClient(char *cmd, size_t len, client from) {
* Queue N requests accordingly to the pipeline size, or simply clone
* the example client buffer. */
c->obuf = sdsempty();
-
+ /* Prefix the request buffer with AUTH and/or SELECT commands, if applicable.
+ * These commands are discarded after the first response, so if the client is
+ * reused the commands will not be used again. */
+ c->prefix_pending = 0;
if (config.auth) {
char *buf = NULL;
int len = redisFormatCommand(&buf, "AUTH %s", config.auth);
c->obuf = sdscatlen(c->obuf, buf, len);
free(buf);
+ c->prefix_pending++;
}
/* If a DB number different than zero is selected, prefix our request
@@ -340,26 +345,23 @@ static client createClient(char *cmd, size_t len, client from) {
if (config.dbnum != 0) {
c->obuf = sdscatprintf(c->obuf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
(int)sdslen(config.dbnumstr),config.dbnumstr);
- c->selectlen = sdslen(c->obuf);
- } else {
- c->selectlen = 0;
+ c->prefix_pending++;
}
-
+ c->prefixlen = sdslen(c->obuf);
/* Append the request itself. */
if (from) {
c->obuf = sdscatlen(c->obuf,
- from->obuf+from->selectlen,
- sdslen(from->obuf)-from->selectlen);
+ from->obuf+from->prefixlen,
+ sdslen(from->obuf)-from->prefixlen);
} else {
for (j = 0; j < config.pipeline; j++)
c->obuf = sdscatlen(c->obuf,cmd,len);
}
c->written = 0;
- c->pending = config.pipeline;
+ c->pending = config.pipeline+c->prefix_pending;
c->randptr = NULL;
c->randlen = 0;
- if (c->selectlen) c->pending++;
/* Find substrings in the output buffer that need to be randomized. */
if (config.randomkeys) {
@@ -371,7 +373,7 @@ static client createClient(char *cmd, size_t len, client from) {
for (j = 0; j < (int)c->randlen; j++) {
c->randptr[j] = c->obuf + (from->randptr[j]-from->obuf);
/* Adjust for the different select prefix length. */
- c->randptr[j] += c->selectlen - from->selectlen;
+ c->randptr[j] += c->prefixlen - from->prefixlen;
}
} else {
char *p = c->obuf;
@@ -390,7 +392,8 @@ static client createClient(char *cmd, size_t len, client from) {
}
}
}
- aeCreateFileEvent(config.el,c->context->fd,AE_WRITABLE,writeHandler,c);
+ if (config.idlemode == 0)
+ aeCreateFileEvent(config.el,c->context->fd,AE_WRITABLE,writeHandler,c);
listAddNodeTail(config.clients,c);
config.liveclients++;
return c;
@@ -555,7 +558,7 @@ usage:
" -s <socket> Server socket (overrides host and port)\n"
" -a <password> Password for Redis Auth\n"
" -c <clients> Number of parallel connections (default 50)\n"
-" -n <requests> Total number of requests (default 10000)\n"
+" -n <requests> Total number of requests (default 100000)\n"
" -d <size> Data size of SET/GET value in bytes (default 2)\n"
" -dbnum <db> SELECT the specified db number (default 0)\n"
" -k <boolean> 1=keep alive 0=reconnect (default 1)\n"
@@ -599,9 +602,13 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData
if (config.liveclients == 0) {
fprintf(stderr,"All clients disconnected... aborting.\n");
exit(1);
- }
-
+ }
if (config.csv) return 250;
+ if (config.idlemode == 1) {
+ printf("clients: %d\r", config.liveclients);
+ fflush(stdout);
+ return 250;
+ }
float dt = (float)(mstime()-config.start)/1000.0;
float rps = (float)config.requests_finished/dt;
printf("%s: %.2f\r", config.title, rps);
@@ -635,7 +642,7 @@ int main(int argc, const char **argv) {
signal(SIGPIPE, SIG_IGN);
config.numclients = 50;
- config.requests = 10000;
+ config.requests = 100000;
config.liveclients = 0;
config.el = aeCreateEventLoop(1024*10);
aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL);
@@ -693,8 +700,8 @@ int main(int argc, const char **argv) {
}
/* Run default benchmark suite. */
+ data = zmalloc(config.datasize+1);
do {
- data = zmalloc(config.datasize+1);
memset(data,'x',config.datasize);
data[config.datasize] = '\0';
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 10030f874..fc1265c3f 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -524,7 +524,8 @@ static int cliReadReply(int output_raw_strings) {
}
if (config.interactive) {
/* Filter cases where we should reconnect */
- if (context->err == REDIS_ERR_IO && errno == ECONNRESET)
+ if (context->err == REDIS_ERR_IO &&
+ (errno == ECONNRESET || errno == EPIPE))
return REDIS_ERR;
if (context->err == REDIS_ERR_EOF)
return REDIS_ERR;
@@ -828,6 +829,7 @@ static void usage(void) {
" not a tty).\n"
" --no-raw Force formatted output even when STDOUT is not a tty.\n"
" --csv Output in CSV format.\n"
+" --stat Print rolling stats about server: mem, clients, ...\n"
" --latency Enter a special mode continuously sampling latency.\n"
" --latency-history Like --latency but tracking latency changes over time.\n"
" Default time interval is 15 sec. Change it using -i.\n"
@@ -1510,7 +1512,7 @@ static void findBigKeys(void) {
for(i=0;i<TYPE_NONE; i++) {
maxkeys[i] = sdsempty();
if(!maxkeys[i]) {
- fprintf(stderr, "Failed to allocate memory for largest key names!");
+ fprintf(stderr, "Failed to allocate memory for largest key names!\n");
exit(1);
}
}
@@ -1961,6 +1963,9 @@ int main(int argc, char **argv) {
/* Start interactive mode when no command is provided */
if (argc == 0 && !config.eval) {
+ /* Ignore SIGPIPE in interactive mode to force a reconnect */
+ signal(SIGPIPE, SIG_IGN);
+
/* Note that in repl mode we don't abort on connection error.
* A new attempt will be performed for every command send. */
cliConnect(0);
diff --git a/src/redis.c b/src/redis.c
index d090b1ed3..289e37791 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -30,6 +30,7 @@
#include "redis.h"
#include "slowlog.h"
#include "bio.h"
+#include "latency.h"
#include <time.h>
#include <signal.h>
@@ -270,7 +271,7 @@ struct redisCommand redisCommandTable[] = {
{"command",commandCommand,0,"rlt",0,NULL,0,0,0,0,0},
{"pfselftest",pfselftestCommand,1,"r",0,NULL,0,0,0,0,0},
{"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0},
- {"pfcount",pfcountCommand,-2,"w",0,NULL,1,1,1,0,0},
+ {"pfcount",pfcountCommand,-2,"r",0,NULL,1,1,1,0,0},
{"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
{"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
{"latency",latencyCommand,-2,"arslt",0,NULL,0,0,0,0,0}
@@ -824,27 +825,30 @@ void updateLRUClock(void) {
/* Add a sample to the operations per second array of samples. */
-void trackOperationsPerSecond(void) {
- long long t = mstime() - server.ops_sec_last_sample_time;
- long long ops = server.stat_numcommands - server.ops_sec_last_sample_ops;
+void trackInstantaneousMetric(int metric, long long current_reading) {
+ long long t = mstime() - server.inst_metric[metric].last_sample_time;
+ long long ops = current_reading -
+ server.inst_metric[metric].last_sample_count;
long long ops_sec;
ops_sec = t > 0 ? (ops*1000/t) : 0;
- server.ops_sec_samples[server.ops_sec_idx] = ops_sec;
- server.ops_sec_idx = (server.ops_sec_idx+1) % REDIS_OPS_SEC_SAMPLES;
- server.ops_sec_last_sample_time = mstime();
- server.ops_sec_last_sample_ops = server.stat_numcommands;
+ server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
+ ops_sec;
+ server.inst_metric[metric].idx++;
+ server.inst_metric[metric].idx %= REDIS_METRIC_SAMPLES;
+ server.inst_metric[metric].last_sample_time = mstime();
+ server.inst_metric[metric].last_sample_count = current_reading;
}
/* Return the mean of all the samples. */
-long long getOperationsPerSecond(void) {
+long long getInstantaneousMetric(int metric) {
int j;
long long sum = 0;
- for (j = 0; j < REDIS_OPS_SEC_SAMPLES; j++)
- sum += server.ops_sec_samples[j];
- return sum / REDIS_OPS_SEC_SAMPLES;
+ for (j = 0; j < REDIS_METRIC_SAMPLES; j++)
+ sum += server.inst_metric[metric].samples[j];
+ return sum / REDIS_METRIC_SAMPLES;
}
/* Check for timeouts. Returns non-zero if the client was terminated */
@@ -1011,7 +1015,13 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* Update the time cache. */
updateCachedTime();
- run_with_period(100) trackOperationsPerSecond();
+ run_with_period(100) {
+ trackInstantaneousMetric(REDIS_METRIC_COMMAND,server.stat_numcommands);
+ trackInstantaneousMetric(REDIS_METRIC_NET_INPUT,
+ server.stat_net_input_bytes);
+ trackInstantaneousMetric(REDIS_METRIC_NET_OUTPUT,
+ server.stat_net_output_bytes);
+ }
/* We have just 22 bits per object for LRU information.
* So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
@@ -1394,6 +1404,8 @@ void initServerConfig(void) {
server.repl_slave_ro = REDIS_DEFAULT_SLAVE_READ_ONLY;
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY;
+ server.repl_diskless_sync = REDIS_DEFAULT_REPL_DISKLESS_SYNC;
+ server.repl_diskless_sync_delay = REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
server.master_repl_offset = 0;
@@ -1523,6 +1535,23 @@ void adjustOpenFilesLimit(void) {
}
}
+/* Check that server.tcp_backlog can be actually enforced in Linux according
+ * to the value of /proc/sys/net/core/somaxconn, or warn about it. */
+void checkTcpBacklogSettings(void) {
+#ifdef HAVE_PROC_SOMAXCONN
+ FILE *fp = fopen("/proc/sys/net/core/somaxconn","r");
+ char buf[1024];
+ if (!fp) return;
+ if (fgets(buf,sizeof(buf),fp) != NULL) {
+ int somaxconn = atoi(buf);
+ if (somaxconn > 0 && somaxconn < server.tcp_backlog) {
+ redisLog(REDIS_WARNING,"WARNING: The TCP backlog setting of %d cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of %d.", server.tcp_backlog, somaxconn);
+ }
+ }
+ fclose(fp);
+#endif
+}
+
/* Initialize a set of file descriptors to listen to the specified 'port'
* binding the addresses specified in the Redis server configuration.
*
@@ -1593,6 +1622,8 @@ int listenToPort(int port, int *fds, int *count) {
* to reset via CONFIG RESETSTAT. The function is also used in order to
* initialize these fields in initServer() at server startup. */
void resetServerStats(void) {
+ int j;
+
server.stat_numcommands = 0;
server.stat_numconnections = 0;
server.stat_expiredkeys = 0;
@@ -1605,10 +1636,15 @@ void resetServerStats(void) {
server.stat_sync_full = 0;
server.stat_sync_partial_ok = 0;
server.stat_sync_partial_err = 0;
- memset(server.ops_sec_samples,0,sizeof(server.ops_sec_samples));
- server.ops_sec_idx = 0;
- server.ops_sec_last_sample_time = mstime();
- server.ops_sec_last_sample_ops = 0;
+ for (j = 0; j < REDIS_METRIC_COUNT; j++) {
+ server.inst_metric[j].idx = 0;
+ server.inst_metric[j].last_sample_time = mstime();
+ server.inst_metric[j].last_sample_count = 0;
+ memset(server.inst_metric[j].samples,0,
+ sizeof(server.inst_metric[j].samples));
+ }
+ server.stat_net_input_bytes = 0;
+ server.stat_net_output_bytes = 0;
}
void initServer(void) {
@@ -1677,6 +1713,7 @@ void initServer(void) {
server.cronloops = 0;
server.rdb_child_pid = -1;
server.aof_child_pid = -1;
+ server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE;
aofRewriteBufferReset();
server.aof_buf = sdsempty();
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
@@ -2629,6 +2666,10 @@ sds genRedisInfoString(char *section) {
"total_connections_received:%lld\r\n"
"total_commands_processed:%lld\r\n"
"instantaneous_ops_per_sec:%lld\r\n"
+ "total_net_input_bytes:%lld\r\n"
+ "total_net_output_bytes:%lld\r\n"
+ "instantaneous_input_kbps:%.2f\r\n"
+ "instantaneous_output_kbps:%.2f\r\n"
"rejected_connections:%lld\r\n"
"sync_full:%lld\r\n"
"sync_partial_ok:%lld\r\n"
@@ -2642,7 +2683,11 @@ sds genRedisInfoString(char *section) {
"latest_fork_usec:%lld\r\n",
server.stat_numconnections,
server.stat_numcommands,
- getOperationsPerSecond(),
+ getInstantaneousMetric(REDIS_METRIC_COMMAND),
+ server.stat_net_input_bytes,
+ server.stat_net_output_bytes,
+ (float)getInstantaneousMetric(REDIS_METRIC_NET_INPUT)/1024,
+ (float)getInstantaneousMetric(REDIS_METRIC_NET_OUTPUT)/1024,
server.stat_rejected_conn,
server.stat_sync_full,
server.stat_sync_partial_ok,
@@ -3032,10 +3077,13 @@ int linuxOvercommitMemoryValue(void) {
return atoi(buf);
}
-void linuxOvercommitMemoryWarning(void) {
+void linuxMemoryWarnings(void) {
if (linuxOvercommitMemoryValue() == 0) {
redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
}
+ if (THPIsEnabled()) {
+ redisLog(REDIS_WARNING,"WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.");
+ }
}
#endif /* __linux__ */
@@ -3100,15 +3148,27 @@ void redisAsciiArt(void) {
if (server.sentinel_mode) mode = "sentinel";
- snprintf(buf,1024*16,ascii_logo,
- REDIS_VERSION,
- redisGitSHA1(),
- strtol(redisGitDirty(),NULL,10) > 0,
- (sizeof(long) == 8) ? "64" : "32",
- mode, server.port,
- (long) getpid()
- );
- redisLogRaw(REDIS_NOTICE|REDIS_LOG_RAW,buf);
+ if (server.syslog_enabled) {
+ redisLog(REDIS_NOTICE,
+ "Redis %s (%s/%d) %s bit, %s mode, port %d, pid %ld ready to start.",
+ REDIS_VERSION,
+ redisGitSHA1(),
+ strtol(redisGitDirty(),NULL,10) > 0,
+ (sizeof(long) == 8) ? "64" : "32",
+ mode, server.port,
+ (long) getpid()
+ );
+ } else {
+ snprintf(buf,1024*16,ascii_logo,
+ REDIS_VERSION,
+ redisGitSHA1(),
+ strtol(redisGitDirty(),NULL,10) > 0,
+ (sizeof(long) == 8) ? "64" : "32",
+ mode, server.port,
+ (long) getpid()
+ );
+ redisLogRaw(REDIS_NOTICE|REDIS_LOG_RAW,buf);
+ }
zfree(buf);
}
@@ -3301,8 +3361,9 @@ int main(int argc, char **argv) {
/* Things not needed when running in Sentinel mode. */
redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
#ifdef __linux__
- linuxOvercommitMemoryWarning();
+ linuxMemoryWarnings();
#endif
+ checkTcpBacklogSettings();
loadDataFromDisk();
if (server.ipfd_count > 0)
redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
diff --git a/src/redis.h b/src/redis.h
index e9da98a0c..bfe690abf 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -96,7 +96,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define REDIS_REPL_TIMEOUT 60
#define REDIS_REPL_PING_SLAVE_PERIOD 10
#define REDIS_RUN_ID_SIZE 40
-#define REDIS_OPS_SEC_SAMPLES 16
+#define REDIS_EOF_MARK_SIZE 40
#define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024) /* 1mb */
#define REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT (60*60) /* 1 hour */
#define REDIS_REPL_BACKLOG_MIN_SIZE (1024*16) /* 16k */
@@ -113,6 +113,8 @@ typedef long long mstime_t; /* millisecond time type. */
#define REDIS_DEFAULT_RDB_COMPRESSION 1
#define REDIS_DEFAULT_RDB_CHECKSUM 1
#define REDIS_DEFAULT_RDB_FILENAME "dump.rdb"
+#define REDIS_DEFAULT_REPL_DISKLESS_SYNC 0
+#define REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5
#define REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA 1
#define REDIS_DEFAULT_SLAVE_READ_ONLY 1
#define REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY 0
@@ -137,6 +139,13 @@ typedef long long mstime_t; /* millisecond time type. */
#define ACTIVE_EXPIRE_CYCLE_SLOW 0
#define ACTIVE_EXPIRE_CYCLE_FAST 1
+/* Instantaneous metrics tracking. */
+#define REDIS_METRIC_SAMPLES 16 /* Number of samples per metric. */
+#define REDIS_METRIC_COMMAND 0 /* Number of commands executed. */
+#define REDIS_METRIC_NET_INPUT 1 /* Bytes read from network. */
+#define REDIS_METRIC_NET_OUTPUT 2 /* Bytes written to network. */
+#define REDIS_METRIC_COUNT 3
+
/* Protocol and I/O related defines */
#define REDIS_MAX_QUERYBUF_LEN (1024*1024*1024) /* 1GB max query buffer. */
#define REDIS_IOBUF_LEN (1024*16) /* Generic I/O buffer size */
@@ -354,6 +363,11 @@ typedef long long mstime_t; /* millisecond time type. */
#define REDIS_PROPAGATE_AOF 1
#define REDIS_PROPAGATE_REPL 2
+/* RDB active child save type. */
+#define REDIS_RDB_CHILD_TYPE_NONE 0
+#define REDIS_RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */
+#define REDIS_RDB_CHILD_TYPE_SOCKET 2 /* RDB is written to slave socket. */
+
/* Keyspace changes notification classes. Every class is associated with a
* character for configuration purposes. */
#define REDIS_NOTIFY_KEYSPACE (1<<0) /* K */
@@ -485,9 +499,11 @@ typedef struct redisClient {
int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
int authenticated; /* when requirepass is non-NULL */
int replstate; /* replication state if this is a slave */
+ int repl_put_online_on_ack; /* Install slave write handler on ACK. */
int repldbfd; /* replication DB file descriptor */
off_t repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
+ sds replpreamble; /* replication DB preamble. */
long long reploff; /* replication offset if this is our master */
long long repl_ack_off; /* replication ack offset, if this is a slave */
long long repl_ack_time;/* replication ack time, if this is a slave */
@@ -651,12 +667,16 @@ struct redisServer {
long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */
size_t resident_set_size; /* RSS sampled in serverCron(). */
- /* The following two are used to track instantaneous "load" in terms
- * of operations per second. */
- long long ops_sec_last_sample_time; /* Timestamp of last sample (in ms) */
- long long ops_sec_last_sample_ops; /* numcommands in last sample */
- long long ops_sec_samples[REDIS_OPS_SEC_SAMPLES];
- int ops_sec_idx;
+ long long stat_net_input_bytes; /* Bytes read from network. */
+ long long stat_net_output_bytes; /* Bytes written to network. */
+ /* The following two are used to track instantaneous metrics, like
+ * number of operations per second, network traffic. */
+ struct {
+ long long last_sample_time; /* Timestamp of last sample in ms */
+ long long last_sample_count;/* Count in last sample */
+ long long samples[REDIS_METRIC_SAMPLES];
+ int idx;
+ } inst_metric[REDIS_METRIC_COUNT];
/* Configuration */
int verbosity; /* Loglevel in redis.conf */
int maxidletime; /* Client timeout in seconds */
@@ -704,8 +724,11 @@ struct redisServer {
time_t lastbgsave_try; /* Unix time of last attempted bgsave */
time_t rdb_save_time_last; /* Time used by last RDB save run. */
time_t rdb_save_time_start; /* Current RDB save start time. */
+ int rdb_child_type; /* Type of save by active child. */
int lastbgsave_status; /* REDIS_OK or REDIS_ERR */
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
+ int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */
+ int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */
/* Propagation of commands in AOF / replication */
redisOpArray also_propagate; /* Additional command to propagate. */
/* Logging */
@@ -730,6 +753,8 @@ struct redisServer {
int repl_min_slaves_to_write; /* Min number of slaves to write. */
int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */
int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
+ int repl_diskless_sync; /* Send RDB to slaves sockets directly. */
+ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
/* Replication (slave) */
char *masterauth; /* AUTH with this password with master */
char *masterhost; /* Hostname of master */
@@ -1022,7 +1047,7 @@ robj *tryObjectEncoding(robj *o);
robj *getDecodedObject(robj *o);
size_t stringObjectLen(robj *o);
robj *createStringObjectFromLongLong(long long value);
-robj *createStringObjectFromLongDouble(long double value);
+robj *createStringObjectFromLongDouble(long double value, int humanfriendly);
robj *createListObject(void);
robj *createZiplistObject(void);
robj *createSetObject(void);
@@ -1051,7 +1076,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc);
-void updateSlavesWaitingBgsave(int bgsaveerr);
+void updateSlavesWaitingBgsave(int bgsaveerr, int type);
void replicationCron(void);
void replicationHandleMasterDisconnection(void);
void replicationCacheMaster(redisClient *c);
@@ -1064,6 +1089,7 @@ int replicationScriptCacheExists(sds sha1);
void replicationSetMaster(char *ip, int port);
void replicationUnsetMaster(void);
void replicationSendNewlineToMaster(void);
+char *replicationGetSlaveName(redisClient *c);
/* Generic persistence functions */
void startLoading(FILE *fp);
diff --git a/src/replication.c b/src/replication.c
index f8b2ed92e..e6e477896 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -39,6 +39,32 @@
void replicationDiscardCachedMaster(void);
void replicationResurrectCachedMaster(int newfd);
+void replicationSendAck(void);
+void putSlaveOnline(redisClient *slave);
+
+/* --------------------------- Utility functions ---------------------------- */
+
+/* Return the pointer to a string representing the slave ip:listening_port
+ * pair. Mostly useful for logging, since we want to log a slave using its
+ * IP address and its listening port which is more clear for the user, for
+ * example: "Closing connection with slave 10.1.2.3:6380". */
+char *replicationGetSlaveName(redisClient *c) {
+ static char buf[REDIS_PEER_ID_LEN];
+ char ip[REDIS_IP_STR_LEN];
+
+ ip[0] = '\0';
+ buf[0] = '\0';
+ if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) {
+ if (c->slave_listening_port)
+ snprintf(buf,sizeof(buf),"%s:%d",ip,c->slave_listening_port);
+ else
+ snprintf(buf,sizeof(buf),"%s:<unknown-slave-port>",ip);
+ } else {
+ snprintf(buf,sizeof(buf),"client id #%llu",
+ (unsigned long long) c->id);
+ }
+ return buf;
+}
/* ---------------------------------- MASTER -------------------------------- */
@@ -211,7 +237,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
}
/* Write the command to every slave. */
- listRewind(slaves,&li);
+ listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
@@ -344,7 +370,8 @@ int masterTryPartialResynchronization(redisClient *c) {
"Runid mismatch (Client asked for '%s', I'm '%s')",
master_runid, server.runid);
} else {
- redisLog(REDIS_NOTICE,"Full resync requested by slave.");
+ redisLog(REDIS_NOTICE,"Full resync requested by slave %s",
+ replicationGetSlaveName(c));
}
goto need_full_resync;
}
@@ -357,10 +384,10 @@ int masterTryPartialResynchronization(redisClient *c) {
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
redisLog(REDIS_NOTICE,
- "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
+ "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset);
if (psync_offset > server.master_repl_offset) {
redisLog(REDIS_WARNING,
- "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
+ "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
}
goto need_full_resync;
}
@@ -372,6 +399,7 @@ int masterTryPartialResynchronization(redisClient *c) {
c->flags |= REDIS_SLAVE;
c->replstate = REDIS_REPL_ONLINE;
c->repl_ack_time = server.unixtime;
+ c->repl_put_online_on_ack = 0;
listAddNodeTail(server.slaves,c);
/* We can't use the connection buffers since they are used to accumulate
* new commands at this stage. But we are sure the socket send buffer is
@@ -383,7 +411,9 @@ int masterTryPartialResynchronization(redisClient *c) {
}
psync_len = addReplyReplicationBacklog(c,psync_offset);
redisLog(REDIS_NOTICE,
- "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
+ "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
+ replicationGetSlaveName(c),
+ psync_len, psync_offset);
/* Note that we don't need to set the selected DB at server.slaveseldb
* to -1 to force the master to emit SELECT, since the slave already
* has this state from the previous connection with the master. */
@@ -407,6 +437,28 @@ need_full_resync:
return REDIS_ERR;
}
+/* Start a BGSAVE for replication goals, which is, selecting the disk or
+ * socket target depending on the configuration, and making sure that
+ * the script cache is flushed before to start.
+ *
+ * Returns REDIS_OK on success or REDIS_ERR otherwise. */
+int startBgsaveForReplication(void) {
+ int retval;
+
+ redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s",
+ server.repl_diskless_sync ? "slaves sockets" : "disk");
+
+ if (server.repl_diskless_sync)
+ retval = rdbSaveToSlavesSockets();
+ else
+ retval = rdbSaveBackground(server.rdb_filename);
+
+ /* Flush the script cache, since we need that slave differences are
+ * accumulated without requiring slaves to match our cached scripts. */
+ if (retval == REDIS_OK) replicationScriptCacheFlush();
+ return retval;
+}
+
/* SYNC and PSYNC command implemenation. */
void syncCommand(redisClient *c) {
/* ignore SYNC if already slave or in monitor mode */
@@ -428,7 +480,8 @@ void syncCommand(redisClient *c) {
return;
}
- redisLog(REDIS_NOTICE,"Slave asks for synchronization");
+ redisLog(REDIS_NOTICE,"Slave %s asks for synchronization",
+ replicationGetSlaveName(c));
/* Try a partial resynchronization if this is a PSYNC command.
* If it fails, we continue with usual full resynchronization, however
@@ -464,10 +517,12 @@ void syncCommand(redisClient *c) {
/* Here we need to check if there is a background saving operation
* in progress, or if it is required to start one */
- if (server.rdb_child_pid != -1) {
+ if (server.rdb_child_pid != -1 &&
+ server.rdb_child_type == REDIS_RDB_CHILD_TYPE_DISK)
+ {
/* Ok a background save is in progress. Let's check if it is a good
* one for replication, i.e. if there is another slave that is
- * registering differences since the server forked to save */
+ * registering differences since the server forked to save. */
redisClient *slave;
listNode *ln;
listIter li;
@@ -485,21 +540,35 @@ void syncCommand(redisClient *c) {
redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
/* No way, we need to wait for the next BGSAVE in order to
- * register differences */
+ * register differences. */
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
}
+ } else if (server.rdb_child_pid != -1 &&
+ server.rdb_child_type == REDIS_RDB_CHILD_TYPE_SOCKET)
+ {
+ /* There is an RDB child process but it is writing directly to
+ * children sockets. We need to wait for the next BGSAVE
+ * in order to synchronize. */
+ c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
+ redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
} else {
- /* Ok we don't have a BGSAVE in progress, let's start one */
- redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
- if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
- redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
- addReplyError(c,"Unable to perform background save");
- return;
+ if (server.repl_diskless_sync) {
+ /* Diskless replication RDB child is created inside
+ * replicationCron() since we want to delay its start a
+ * few seconds to wait for more slaves to arrive. */
+ c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
+ if (server.repl_diskless_sync_delay)
+ redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC");
+ } else {
+ /* Ok we don't have a BGSAVE in progress, let's start one. */
+ if (startBgsaveForReplication() != REDIS_OK) {
+ redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
+ addReplyError(c,"Unable to perform background save");
+ return;
+ }
+ c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
}
- c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
- /* Flush the script cache for the new slave. */
- replicationScriptCacheFlush();
}
if (server.repl_disable_tcp_nodelay)
@@ -556,6 +625,11 @@ void replconfCommand(redisClient *c) {
if (offset > c->repl_ack_off)
c->repl_ack_off = offset;
c->repl_ack_time = server.unixtime;
+ /* If this was a diskless replication, we need to really put
+ * the slave online when the first ACK is received (which
+ * confirms slave is online and ready to get more data). */
+ if (c->repl_put_online_on_ack && c->replstate == REDIS_REPL_ONLINE)
+ putSlaveOnline(c);
/* Note: this command does not reply anything! */
return;
} else {
@@ -567,6 +641,32 @@ void replconfCommand(redisClient *c) {
addReply(c,shared.ok);
}
+/* This function puts a slave in the online state, and should be called just
+ * after a slave received the RDB file for the initial synchronization, and
+ * we are finally ready to send the incremental stream of commands.
+ *
+ * It does a few things:
+ *
+ * 1) Put the slave in ONLINE state.
+ * 2) Make sure the writable event is re-installed, since calling the SYNC
+ * command disables it, so that we can accumulate output buffer without
+ * sending it to the slave.
+ * 3) Update the count of good slaves. */
+void putSlaveOnline(redisClient *slave) {
+ slave->replstate = REDIS_REPL_ONLINE;
+ slave->repl_put_online_on_ack = 0;
+ slave->repl_ack_time = server.unixtime;
+ if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
+ sendReplyToClient, slave) == AE_ERR) {
+ redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
+ freeClient(slave);
+ return;
+ }
+ refreshGoodSlavesCount();
+ redisLog(REDIS_NOTICE,"Synchronization with slave %s succeeded",
+ replicationGetSlaveName(slave));
+}
+
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *slave = privdata;
REDIS_NOTUSED(el);
@@ -574,23 +674,29 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[REDIS_IOBUF_LEN];
ssize_t nwritten, buflen;
- if (slave->repldboff == 0) {
- /* Write the bulk write count before to transfer the DB. In theory here
- * we don't know how much room there is in the output buffer of the
- * socket, but in practice SO_SNDLOWAT (the minimum count for output
- * operations) will never be smaller than the few bytes we need. */
- sds bulkcount;
-
- bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
- slave->repldbsize);
- if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
- {
- sdsfree(bulkcount);
+ /* Before sending the RDB file, we send the preamble as configured by the
+ * replication process. Currently the preamble is just the bulk count of
+ * the file in the form "$<length>\r\n". */
+ if (slave->replpreamble) {
+ nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
+ if (nwritten == -1) {
+ redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s",
+ strerror(errno));
freeClient(slave);
return;
}
- sdsfree(bulkcount);
+ server.stat_net_output_bytes += nwritten;
+ sdsrange(slave->replpreamble,nwritten,-1);
+ if (sdslen(slave->replpreamble) == 0) {
+ sdsfree(slave->replpreamble);
+ slave->replpreamble = NULL;
+ /* fall through sending data. */
+ } else {
+ return;
+ }
}
+
+ /* If the preamble was already transferred, send the RDB bulk data. */
lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
if (buflen <= 0) {
@@ -608,30 +714,30 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
return;
}
slave->repldboff += nwritten;
+ server.stat_net_output_bytes += nwritten;
if (slave->repldboff == slave->repldbsize) {
close(slave->repldbfd);
slave->repldbfd = -1;
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
- slave->replstate = REDIS_REPL_ONLINE;
- slave->repl_ack_time = server.unixtime;
- if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
- sendReplyToClient, slave) == AE_ERR) {
- redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
- freeClient(slave);
- return;
- }
- refreshGoodSlavesCount();
- redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
+ putSlaveOnline(slave);
}
}
-/* This function is called at the end of every background saving.
- * The argument bgsaveerr is REDIS_OK if the background saving succeeded
- * otherwise REDIS_ERR is passed to the function.
+/* This function is called at the end of every background saving,
+ * or when the replication RDB transfer strategy is modified from
+ * disk to socket or the other way around.
*
* The goal of this function is to handle slaves waiting for a successful
- * background saving in order to perform non-blocking synchronization. */
-void updateSlavesWaitingBgsave(int bgsaveerr) {
+ * background saving in order to perform non-blocking synchronization, and
+ * to schedule a new BGSAVE if there are slaves that attached while a
+ * BGSAVE was in progress, but it was not a good one for replication (no
+ * other slave was accumulating differences).
+ *
+ * The argument bgsaveerr is REDIS_OK if the background saving succeeded
+ * otherwise REDIS_ERR is passed to the function.
+ * The 'type' argument is the type of the child that terminated
+ * (if it had a disk or socket target). */
+void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
listNode *ln;
int startbgsave = 0;
listIter li;
@@ -646,34 +752,50 @@ void updateSlavesWaitingBgsave(int bgsaveerr) {
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
struct redis_stat buf;
- if (bgsaveerr != REDIS_OK) {
- freeClient(slave);
- redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
- continue;
- }
- if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
- redis_fstat(slave->repldbfd,&buf) == -1) {
- freeClient(slave);
- redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
- continue;
- }
- slave->repldboff = 0;
- slave->repldbsize = buf.st_size;
- slave->replstate = REDIS_REPL_SEND_BULK;
- aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
- if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
- freeClient(slave);
- continue;
+ /* If this was an RDB on disk save, we have to prepare to send
+ * the RDB from disk to the slave socket. Otherwise if this was
+ * already an RDB -> Slaves socket transfer, used in the case of
+ * diskless replication, our work is trivial, we can just put
+ * the slave online. */
+ if (type == REDIS_RDB_CHILD_TYPE_SOCKET) {
+ redisLog(REDIS_NOTICE,
+ "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
+ replicationGetSlaveName(slave));
+ /* Note: we wait for a REPLCONF ACK message from slave in
+ * order to really put it online (install the write handler
+ * so that the accumulated data can be transferred). However
+ * we change the replication state ASAP, since our slave
+ * is technically online now. */
+ slave->replstate = REDIS_REPL_ONLINE;
+ slave->repl_put_online_on_ack = 1;
+ } else {
+ if (bgsaveerr != REDIS_OK) {
+ freeClient(slave);
+ redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
+ continue;
+ }
+ if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
+ redis_fstat(slave->repldbfd,&buf) == -1) {
+ freeClient(slave);
+ redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
+ continue;
+ }
+ slave->repldboff = 0;
+ slave->repldbsize = buf.st_size;
+ slave->replstate = REDIS_REPL_SEND_BULK;
+ slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
+ (unsigned long long) slave->repldbsize);
+
+ aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
+ if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
+ freeClient(slave);
+ continue;
+ }
}
}
}
if (startbgsave) {
- /* Since we are starting a new background save for one or more slaves,
- * we flush the Replication Script Cache to use EVAL to propagate every
- * new EVALSHA for the first time, since all the new slaves don't know
- * about previous scripts. */
- replicationScriptCacheFlush();
- if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
+ if (startBgsaveForReplication() != REDIS_OK) {
listIter li;
listRewind(server.slaves,&li);
@@ -737,6 +859,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
REDIS_NOTUSED(privdata);
REDIS_NOTUSED(mask);
+ /* Static vars used to hold the EOF mark, and the last bytes received
+ * from the server: when they match, we reached the end of the transfer. */
+ static char eofmark[REDIS_RUN_ID_SIZE];
+ static char lastbytes[REDIS_RUN_ID_SIZE];
+ static int usemark = 0;
+
/* If repl_transfer_size == -1 we still have to read the bulk length
* from the master reply. */
if (server.repl_transfer_size == -1) {
@@ -762,16 +890,44 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
goto error;
}
- server.repl_transfer_size = strtol(buf+1,NULL,10);
- redisLog(REDIS_NOTICE,
- "MASTER <-> SLAVE sync: receiving %lld bytes from master",
- (long long) server.repl_transfer_size);
+
+ /* There are two possible forms for the bulk payload. One is the
+ * usual $<count> bulk format. The other is used for diskless transfers
+ * when the master does not know beforehand the size of the file to
+ * transfer. In the latter case, the following format is used:
+ *
+ * $EOF:<40 bytes delimiter>
+ *
+ * At the end of the file the announced delimiter is transmitted. The
+ * delimiter is long and random enough that the probability of a
+ * collision with the actual file content can be ignored. */
+ if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= REDIS_RUN_ID_SIZE) {
+ usemark = 1;
+ memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE);
+ memset(lastbytes,0,REDIS_RUN_ID_SIZE);
+ /* Set any repl_transfer_size to avoid entering this code path
+ * at the next call. */
+ server.repl_transfer_size = 0;
+ redisLog(REDIS_NOTICE,
+ "MASTER <-> SLAVE sync: receiving streamed RDB from master");
+ } else {
+ usemark = 0;
+ server.repl_transfer_size = strtol(buf+1,NULL,10);
+ redisLog(REDIS_NOTICE,
+ "MASTER <-> SLAVE sync: receiving %lld bytes from master",
+ (long long) server.repl_transfer_size);
+ }
return;
}
/* Read bulk data */
- left = server.repl_transfer_size - server.repl_transfer_read;
- readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
+ if (usemark) {
+ readlen = sizeof(buf);
+ } else {
+ left = server.repl_transfer_size - server.repl_transfer_read;
+ readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
+ }
+
nread = read(fd,buf,readlen);
if (nread <= 0) {
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
@@ -779,6 +935,24 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
replicationAbortSyncTransfer();
return;
}
+ server.stat_net_input_bytes += nread;
+
+ /* When a mark is used, we want to detect EOF asap in order to avoid
+ * writing the EOF mark into the file... */
+ int eof_reached = 0;
+
+ if (usemark) {
+ /* Update the last bytes array, and check if it matches our delimiter.*/
+ if (nread >= REDIS_RUN_ID_SIZE) {
+ memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE);
+ } else {
+ int rem = REDIS_RUN_ID_SIZE-nread;
+ memmove(lastbytes,lastbytes+nread,rem);
+ memcpy(lastbytes+rem,buf,nread);
+ }
+ if (memcmp(lastbytes,eofmark,REDIS_RUN_ID_SIZE) == 0) eof_reached = 1;
+ }
+
server.repl_transfer_lastio = server.unixtime;
if (write(server.repl_transfer_fd,buf,nread) != nread) {
redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
@@ -786,6 +960,16 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
}
server.repl_transfer_read += nread;
+ /* Delete the last 40 bytes from the file if we reached EOF. */
+ if (usemark && eof_reached) {
+ if (ftruncate(server.repl_transfer_fd,
+ server.repl_transfer_read - REDIS_RUN_ID_SIZE) == -1)
+ {
+ redisLog(REDIS_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
+ goto error;
+ }
+ }
+
/* Sync data on disk from time to time, otherwise at the end of the transfer
* we may suffer a big delay as the memory buffers are copied into the
* actual disk. */
@@ -800,7 +984,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
}
/* Check if the transfer is now complete */
- if (server.repl_transfer_read == server.repl_transfer_size) {
+ if (!usemark) {
+ if (server.repl_transfer_read == server.repl_transfer_size)
+ eof_reached = 1;
+ }
+
+ if (eof_reached) {
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
replicationAbortSyncTransfer();
@@ -1644,7 +1833,9 @@ void replicationCron(void) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START ||
- slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
+ (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END &&
+ server.rdb_child_type != REDIS_RDB_CHILD_TYPE_SOCKET))
+ {
if (write(slave->fd, "\n", 1) == -1) {
/* Don't worry, it's just a ping. */
}
@@ -1665,14 +1856,8 @@ void replicationCron(void) {
if (slave->flags & REDIS_PRE_PSYNC) continue;
if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
{
- char ip[REDIS_IP_STR_LEN];
- int port;
-
- if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != -1) {
- redisLog(REDIS_WARNING,
- "Disconnecting timedout slave: %s:%d",
- ip, slave->slave_listening_port);
- }
+ redisLog(REDIS_WARNING, "Disconnecting timedout slave: %s",
+ replicationGetSlaveName(slave));
freeClient(slave);
}
}
@@ -1704,6 +1889,48 @@ void replicationCron(void) {
replicationScriptCacheFlush();
}
+ /* If we are using diskless replication and there are slaves waiting
+ * in WAIT_BGSAVE_START state, check if enough seconds elapsed and
+ * start a BGSAVE.
+ *
+ * This code is also useful to trigger a BGSAVE if the diskless
+ * replication was turned off with CONFIG SET, while there were already
+ * slaves in WAIT_BGSAVE_START state. */
+ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
+ time_t idle, max_idle = 0;
+ int slaves_waiting = 0;
+ listNode *ln;
+ listIter li;
+
+ listRewind(server.slaves,&li);
+ while((ln = listNext(&li))) {
+ redisClient *slave = ln->value;
+ if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
+ idle = server.unixtime - slave->lastinteraction;
+ if (idle > max_idle) max_idle = idle;
+ slaves_waiting++;
+ }
+ }
+
+ if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) {
+ /* Start a BGSAVE. Usually with socket target, or with disk target
+ * if there was a recent socket -> disk config change. */
+ if (startBgsaveForReplication() == REDIS_OK) {
+ /* It started! We need to change the state of slaves
+ * from WAIT_BGSAVE_START to WAIT_BGSAVE_END in case
+ * the current target is disk. Otherwise it was already done
+ * by rdbSaveToSlavesSockets() which is called by
+ * startBgsaveForReplication(). */
+ listRewind(server.slaves,&li);
+ while((ln = listNext(&li))) {
+ redisClient *slave = ln->value;
+ if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
+ slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
+ }
+ }
+ }
+ }
+
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
refreshGoodSlavesCount();
}
diff --git a/src/rio.c b/src/rio.c
index 44f9b7a01..738e56fd0 100644
--- a/src/rio.c
+++ b/src/rio.c
@@ -55,6 +55,8 @@
#include "config.h"
#include "redis.h"
+/* ------------------------- Buffer I/O implementation ----------------------- */
+
/* Returns 1 or 0 for success/failure. */
static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len);
@@ -76,6 +78,33 @@ static off_t rioBufferTell(rio *r) {
return r->io.buffer.pos;
}
+/* Flushes any buffer to target device if applicable. Returns 1 on success
+ * and 0 on failures. */
+static int rioBufferFlush(rio *r) {
+ REDIS_NOTUSED(r);
+ return 1; /* Nothing to do, our write just appends to the buffer. */
+}
+
+static const rio rioBufferIO = {
+ rioBufferRead,
+ rioBufferWrite,
+ rioBufferTell,
+ rioBufferFlush,
+ NULL, /* update_checksum */
+ 0, /* current checksum */
+ 0, /* bytes read or written */
+ 0, /* read/write chunk size */
+ { { NULL, 0 } } /* union for io-specific vars */
+};
+
+void rioInitWithBuffer(rio *r, sds s) {
+ *r = rioBufferIO;
+ r->io.buffer.ptr = s;
+ r->io.buffer.pos = 0;
+}
+
+/* --------------------- Stdio file pointer implementation ------------------- */
+
/* Returns 1 or 0 for success/failure. */
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
size_t retval;
@@ -103,21 +132,17 @@ static off_t rioFileTell(rio *r) {
return ftello(r->io.file.fp);
}
-static const rio rioBufferIO = {
- rioBufferRead,
- rioBufferWrite,
- rioBufferTell,
- NULL, /* update_checksum */
- 0, /* current checksum */
- 0, /* bytes read or written */
- 0, /* read/write chunk size */
- { { NULL, 0 } } /* union for io-specific vars */
-};
+/* Flushes any buffer to target device if applicable. Returns 1 on success
+ * and 0 on failures. */
+static int rioFileFlush(rio *r) {
+ return (fflush(r->io.file.fp) == 0) ? 1 : 0;
+}
static const rio rioFileIO = {
rioFileRead,
rioFileWrite,
rioFileTell,
+ rioFileFlush,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
@@ -132,12 +157,133 @@ void rioInitWithFile(rio *r, FILE *fp) {
r->io.file.autosync = 0;
}
-void rioInitWithBuffer(rio *r, sds s) {
- *r = rioBufferIO;
- r->io.buffer.ptr = s;
- r->io.buffer.pos = 0;
+/* ------------------- File descriptors set implementation ------------------- */
+
+/* Returns 1 or 0 for success/failure.
+ * The function returns success as long as we are able to correctly write
+ * to at least one file descriptor.
+ *
+ * When buf is NULL and len is 0, the function performs a flush operation
+ * if there is some pending buffer, so this function is also used in order
+ * to implement rioFdsetFlush(). */
+static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
+ ssize_t retval;
+ int j;
+ unsigned char *p = (unsigned char*) buf;
+ int doflush = (buf == NULL && len == 0);
+
+ /* To start we always append to our buffer. If it gets larger than
+ * a given size, we actually write to the sockets. */
+ if (len) {
+ r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len);
+        len = 0; /* Prevent entering the while below if we don't flush. */
+ if (sdslen(r->io.fdset.buf) > REDIS_IOBUF_LEN) doflush = 1;
+ }
+
+ if (doflush) {
+ p = (unsigned char*) r->io.fdset.buf;
+ len = sdslen(r->io.fdset.buf);
+ }
+
+    /* Write in little chunks so that when there are big writes we
+ * parallelize while the kernel is sending data in background to
+ * the TCP socket. */
+ while(len) {
+ size_t count = len < 1024 ? len : 1024;
+ int broken = 0;
+ for (j = 0; j < r->io.fdset.numfds; j++) {
+ if (r->io.fdset.state[j] != 0) {
+                /* Skip FDs already in error. */
+ broken++;
+ continue;
+ }
+
+ /* Make sure to write 'count' bytes to the socket regardless
+ * of short writes. */
+ size_t nwritten = 0;
+ while(nwritten != count) {
+ retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten);
+ if (retval <= 0) {
+ /* With blocking sockets, which is the sole user of this
+ * rio target, EWOULDBLOCK is returned only because of
+ * the SO_SNDTIMEO socket option, so we translate the error
+ * into one more recognizable by the user. */
+ if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
+ break;
+ }
+ nwritten += retval;
+ }
+
+ if (nwritten != count) {
+ /* Mark this FD as broken. */
+ r->io.fdset.state[j] = errno;
+ if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO;
+ }
+ }
+ if (broken == r->io.fdset.numfds) return 0; /* All the FDs in error. */
+ p += count;
+ len -= count;
+ r->io.fdset.pos += count;
+ }
+
+ if (doflush) sdsclear(r->io.fdset.buf);
+ return 1;
}
+/* Returns 1 or 0 for success/failure. */
+static size_t rioFdsetRead(rio *r, void *buf, size_t len) {
+ REDIS_NOTUSED(r);
+ REDIS_NOTUSED(buf);
+ REDIS_NOTUSED(len);
+ return 0; /* Error, this target does not support reading. */
+}
+
+/* Returns read/write position in file. */
+static off_t rioFdsetTell(rio *r) {
+ return r->io.fdset.pos;
+}
+
+/* Flushes any buffer to target device if applicable. Returns 1 on success
+ * and 0 on failures. */
+static int rioFdsetFlush(rio *r) {
+ /* Our flush is implemented by the write method, that recognizes a
+ * buffer set to NULL with a count of zero as a flush request. */
+ return rioFdsetWrite(r,NULL,0);
+}
+
+static const rio rioFdsetIO = {
+ rioFdsetRead,
+ rioFdsetWrite,
+ rioFdsetTell,
+ rioFdsetFlush,
+ NULL, /* update_checksum */
+ 0, /* current checksum */
+ 0, /* bytes read or written */
+ 0, /* read/write chunk size */
+ { { NULL, 0 } } /* union for io-specific vars */
+};
+
+void rioInitWithFdset(rio *r, int *fds, int numfds) {
+ int j;
+
+ *r = rioFdsetIO;
+ r->io.fdset.fds = zmalloc(sizeof(int)*numfds);
+ r->io.fdset.state = zmalloc(sizeof(int)*numfds);
+ memcpy(r->io.fdset.fds,fds,sizeof(int)*numfds);
+ for (j = 0; j < numfds; j++) r->io.fdset.state[j] = 0;
+ r->io.fdset.numfds = numfds;
+ r->io.fdset.pos = 0;
+ r->io.fdset.buf = sdsempty();
+}
+
+void rioFreeFdset(rio *r) {
+ zfree(r->io.fdset.fds);
+ zfree(r->io.fdset.state);
+ sdsfree(r->io.fdset.buf);
+}
+
+/* ---------------------------- Generic functions ---------------------------- */
+
/* This function can be installed both in memory and file streams when checksum
* computation is needed. */
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
@@ -157,7 +303,8 @@ void rioSetAutoSync(rio *r, off_t bytes) {
r->io.file.autosync = bytes;
}
-/* ------------------------------ Higher level interface ---------------------------
+/* --------------------------- Higher level interface --------------------------
+ *
* The following higher level functions use lower level rio.c functions to help
* generating the Redis protocol for the Append Only File. */
diff --git a/src/rio.h b/src/rio.h
index 2d12c6cc7..e5fa0cd33 100644
--- a/src/rio.h
+++ b/src/rio.h
@@ -43,6 +43,7 @@ struct _rio {
size_t (*read)(struct _rio *, void *buf, size_t len);
size_t (*write)(struct _rio *, const void *buf, size_t len);
off_t (*tell)(struct _rio *);
+ int (*flush)(struct _rio *);
/* The update_cksum method if not NULL is used to compute the checksum of
* all the data that was read or written so far. The method should be
* designed so that can be called with the current checksum, and the buf
@@ -61,15 +62,25 @@ struct _rio {
/* Backend-specific vars. */
union {
+ /* In-memory buffer target. */
struct {
sds ptr;
off_t pos;
} buffer;
+ /* Stdio file pointer target. */
struct {
FILE *fp;
off_t buffered; /* Bytes written since last fsync. */
off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
+ /* Multiple FDs target (used to write to N sockets). */
+ struct {
+ int *fds; /* File descriptors. */
+ int *state; /* Error state of each fd. 0 (if ok) or errno. */
+ int numfds;
+ off_t pos;
+ sds buf;
+ } fdset;
} io;
};
@@ -109,8 +120,13 @@ static inline off_t rioTell(rio *r) {
return r->tell(r);
}
+static inline int rioFlush(rio *r) {
+ return r->flush(r);
+}
+
void rioInitWithFile(rio *r, FILE *fp);
void rioInitWithBuffer(rio *r, sds s);
+void rioInitWithFdset(rio *r, int *fds, int numfds);
size_t rioWriteBulkCount(rio *r, char prefix, int count);
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
diff --git a/src/scripting.c b/src/scripting.c
index 0ae042f49..7dbd5f74f 100644
--- a/src/scripting.c
+++ b/src/scripting.c
@@ -536,6 +536,7 @@ void luaLoadLib(lua_State *lua, const char *libname, lua_CFunction luafunc) {
LUALIB_API int (luaopen_cjson) (lua_State *L);
LUALIB_API int (luaopen_struct) (lua_State *L);
LUALIB_API int (luaopen_cmsgpack) (lua_State *L);
+LUALIB_API int (luaopen_bit) (lua_State *L);
void luaLoadLibraries(lua_State *lua) {
luaLoadLib(lua, "", luaopen_base);
@@ -546,6 +547,7 @@ void luaLoadLibraries(lua_State *lua) {
luaLoadLib(lua, "cjson", luaopen_cjson);
luaLoadLib(lua, "struct", luaopen_struct);
luaLoadLib(lua, "cmsgpack", luaopen_cmsgpack);
+ luaLoadLib(lua, "bit", luaopen_bit);
#if 0 /* Stuff that we don't load currently, for sandboxing concerns. */
luaLoadLib(lua, LUA_LOADLIBNAME, luaopen_package);
diff --git a/src/sds.c b/src/sds.c
index 0ad925b4a..5a3bc82b1 100644
--- a/src/sds.c
+++ b/src/sds.c
@@ -295,7 +295,7 @@ sds sdscpy(sds s, const char *t) {
* conversion. 's' must point to a string with room for at least
* SDS_LLSTR_SIZE bytes.
*
- * The function returns the lenght of the null-terminated string
+ * The function returns the length of the null-terminated string
* representation stored at 's'. */
#define SDS_LLSTR_SIZE 21
int sdsll2str(char *s, long long value) {
@@ -369,7 +369,7 @@ sds sdsfromlonglong(long long value) {
return sdsnewlen(buf,len);
}
-/* Like sdscatpritf() but gets va_list instead of being variadic. */
+/* Like sdscatprintf() but gets va_list instead of being variadic. */
sds sdscatvprintf(sds s, const char *fmt, va_list ap) {
va_list cpy;
char staticbuf[1024], *buf = staticbuf, *t;
@@ -390,7 +390,7 @@ sds sdscatvprintf(sds s, const char *fmt, va_list ap) {
buf[buflen-2] = '\0';
va_copy(cpy,ap);
vsnprintf(buf, buflen, fmt, cpy);
- va_end(ap);
+ va_end(cpy);
if (buf[buflen-2] != '\0') {
if (buf != staticbuf) zfree(buf);
buflen *= 2;
@@ -415,7 +415,7 @@ sds sdscatvprintf(sds s, const char *fmt, va_list ap) {
*
* Example:
*
- * s = sdsempty("Sum is: ");
+ * s = sdsnew("Sum is: ");
* s = sdscatprintf(s,"%d+%d = %d",a,b,a+b).
*
* Often you need to create a string from scratch with the printf-alike
@@ -643,8 +643,8 @@ void sdstoupper(sds s) {
*
* Return value:
*
- * 1 if s1 > s2.
- * -1 if s1 < s2.
+ * positive if s1 > s2.
+ * negative if s1 < s2.
* 0 if s1 and s2 are exactly the same binary string.
*
* If two strings share exactly the same prefix, but one of the two has
diff --git a/src/sort.c b/src/sort.c
index 474a3cf64..372481969 100644
--- a/src/sort.c
+++ b/src/sort.c
@@ -264,16 +264,15 @@ void sortCommand(redisClient *c) {
j++;
}
- /* For the STORE option, or when SORT is called from a Lua script,
- * we want to force a specific ordering even when no explicit ordering
- * was asked (SORT BY nosort). This guarantees that replication / AOF
- * is deterministic.
+ /* When sorting a set with no sort specified, we must sort the output
+ * so the result is consistent across scripting and replication.
*
- * However in the case 'dontsort' is true, but the type to sort is a
- * sorted set, we don't need to do anything as ordering is guaranteed
- * in this special case. */
- if ((storekey || c->flags & REDIS_LUA_CLIENT) &&
- (dontsort && sortval->type != REDIS_ZSET))
+ * The other types (list, sorted set) will retain their native order
+ * even if no sort order is requested, so they remain stable across
+ * scripting and replication. */
+ if (dontsort &&
+ sortval->type == REDIS_SET &&
+ (storekey || c->flags & REDIS_LUA_CLIENT))
{
/* Force ALPHA sorting */
dontsort = 0;
diff --git a/src/sparkline.c b/src/sparkline.c
index 900f26ab7..8e2764aee 100644
--- a/src/sparkline.c
+++ b/src/sparkline.c
@@ -49,7 +49,7 @@ static int label_margin_top = 1;
* sparklineSequenceAddSample(seq, 10, NULL);
* sparklineSequenceAddSample(seq, 20, NULL);
* sparklineSequenceAddSample(seq, 30, "last sample label");
- * sds output = sparklineRender(seq, 80, 4);
+ * sds output = sparklineRender(sdsempty(), seq, 80, 4, SPARKLINE_FILL);
* freeSparklineSequence(seq);
* ------------------------------------------------------------------------- */
@@ -63,6 +63,7 @@ struct sequence *createSparklineSequence(void) {
/* Add a new sample into a sequence. */
void sparklineSequenceAddSample(struct sequence *seq, double value, char *label) {
+ label = (label == NULL || label[0] == '\0') ? NULL : zstrdup(label);
if (seq->length == 0) {
seq->min = seq->max = value;
} else {
diff --git a/src/syncio.c b/src/syncio.c
index 8810a842c..ac2a4a373 100644
--- a/src/syncio.c
+++ b/src/syncio.c
@@ -139,6 +139,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout) {
*ptr = '\0';
nread++;
}
+ size--;
}
return nread;
}
diff --git a/src/t_hash.c b/src/t_hash.c
index 6bec70686..fff3bf9a4 100644
--- a/src/t_hash.c
+++ b/src/t_hash.c
@@ -568,7 +568,7 @@ void hincrbyfloatCommand(redisClient *c) {
}
value += incr;
- new = createStringObjectFromLongDouble(value);
+ new = createStringObjectFromLongDouble(value,1);
hashTypeTryObjectEncoding(o,&c->argv[2],NULL);
hashTypeSet(o,c->argv[2],new);
addReplyBulk(c,new);
diff --git a/src/t_string.c b/src/t_string.c
index f06693815..96c978add 100644
--- a/src/t_string.c
+++ b/src/t_string.c
@@ -387,7 +387,7 @@ void incrbyfloatCommand(redisClient *c) {
addReplyError(c,"increment would produce NaN or Infinity");
return;
}
- new = createStringObjectFromLongDouble(value);
+ new = createStringObjectFromLongDouble(value,1);
if (o)
dbOverwrite(c->db,c->argv[1],new);
else
diff --git a/src/version.h b/src/version.h
index 4bf4a6d34..9a736509e 100644
--- a/src/version.h
+++ b/src/version.h
@@ -1 +1 @@
-#define REDIS_VERSION "2.8.17"
+#define REDIS_VERSION "2.8.18"
diff --git a/src/zipmap.c b/src/zipmap.c
index 803fedeec..384b76bba 100644
--- a/src/zipmap.c
+++ b/src/zipmap.c
@@ -51,10 +51,9 @@
* <len> is the length of the following string (key or value).
* <len> lengths are encoded in a single value or in a 5 bytes value.
* If the first byte value (as an unsigned 8 bit value) is between 0 and
- * 252, it's a single-byte length. If it is 253 then a four bytes unsigned
+ * 253, it's a single-byte length. If it is 254 then a four bytes unsigned
* integer follows (in the host byte ordering). A value of 255 is used to
- * signal the end of the hash. The special value 254 is used to mark
- * empty space that can be used to add new key/value pairs.
+ * signal the end of the hash.
*
* <free> is the number of free unused bytes after the string, resulting
* from modification of values associated to a key. For instance if "foo"
diff --git a/src/zmalloc.c b/src/zmalloc.c
index 11616e5ad..6df51a80f 100644
--- a/src/zmalloc.c
+++ b/src/zmalloc.c
@@ -328,27 +328,39 @@ float zmalloc_get_fragmentation_ratio(size_t rss) {
return (float)rss/zmalloc_used_memory();
}
+/* Get the sum of the specified field (converted from kb to bytes) in
+ * /proc/self/smaps. The field must be specified with trailing ":" as it
+ * appears in the smaps output.
+ *
+ * Example: zmalloc_get_smap_bytes_by_field("Rss:");
+ */
#if defined(HAVE_PROC_SMAPS)
-size_t zmalloc_get_private_dirty(void) {
+size_t zmalloc_get_smap_bytes_by_field(char *field) {
char line[1024];
- size_t pd = 0;
+ size_t bytes = 0;
FILE *fp = fopen("/proc/self/smaps","r");
+ int flen = strlen(field);
if (!fp) return 0;
while(fgets(line,sizeof(line),fp) != NULL) {
- if (strncmp(line,"Private_Dirty:",14) == 0) {
+ if (strncmp(line,field,flen) == 0) {
char *p = strchr(line,'k');
if (p) {
*p = '\0';
- pd += strtol(line+14,NULL,10) * 1024;
+ bytes += strtol(line+flen,NULL,10) * 1024;
}
}
}
fclose(fp);
- return pd;
+ return bytes;
}
#else
-size_t zmalloc_get_private_dirty(void) {
+size_t zmalloc_get_smap_bytes_by_field(char *field) {
+ ((void) field);
return 0;
}
#endif
+
+size_t zmalloc_get_private_dirty(void) {
+ return zmalloc_get_smap_bytes_by_field("Private_Dirty:");
+}
diff --git a/src/zmalloc.h b/src/zmalloc.h
index 72a4f8138..4de2cffea 100644
--- a/src/zmalloc.h
+++ b/src/zmalloc.h
@@ -76,6 +76,7 @@ void zmalloc_set_oom_handler(void (*oom_handler)(size_t));
float zmalloc_get_fragmentation_ratio(size_t rss);
size_t zmalloc_get_rss(void);
size_t zmalloc_get_private_dirty(void);
+size_t zmalloc_get_smap_bytes_by_field(char *field);
void zlibc_free(void *ptr);
#ifndef HAVE_MALLOC_SIZE
diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl
index 767349e56..71a7ec60a 100644
--- a/tests/integration/replication.tcl
+++ b/tests/integration/replication.tcl
@@ -94,79 +94,86 @@ start_server {tags {"repl"}} {
}
}
-start_server {tags {"repl"}} {
- set master [srv 0 client]
- set master_host [srv 0 host]
- set master_port [srv 0 port]
- set slaves {}
- set load_handle0 [start_write_load $master_host $master_port 3]
- set load_handle1 [start_write_load $master_host $master_port 5]
- set load_handle2 [start_write_load $master_host $master_port 20]
- set load_handle3 [start_write_load $master_host $master_port 8]
- set load_handle4 [start_write_load $master_host $master_port 4]
- start_server {} {
- lappend slaves [srv 0 client]
+foreach dl {no yes} {
+ start_server {tags {"repl"}} {
+ set master [srv 0 client]
+ $master config set repl-diskless-sync $dl
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+ set slaves {}
+ set load_handle0 [start_write_load $master_host $master_port 3]
+ set load_handle1 [start_write_load $master_host $master_port 5]
+ set load_handle2 [start_write_load $master_host $master_port 20]
+ set load_handle3 [start_write_load $master_host $master_port 8]
+ set load_handle4 [start_write_load $master_host $master_port 4]
start_server {} {
lappend slaves [srv 0 client]
start_server {} {
lappend slaves [srv 0 client]
- test "Connect multiple slaves at the same time (issue #141)" {
- # Send SALVEOF commands to slaves
- [lindex $slaves 0] slaveof $master_host $master_port
- [lindex $slaves 1] slaveof $master_host $master_port
- [lindex $slaves 2] slaveof $master_host $master_port
-
- # Wait for all the three slaves to reach the "online" state
- set retry 500
- while {$retry} {
- set info [r -3 info]
- if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} {
- break
+ start_server {} {
+ lappend slaves [srv 0 client]
+ test "Connect multiple slaves at the same time (issue #141), diskless=$dl" {
+                    # Send SLAVEOF commands to slaves
+ [lindex $slaves 0] slaveof $master_host $master_port
+ [lindex $slaves 1] slaveof $master_host $master_port
+ [lindex $slaves 2] slaveof $master_host $master_port
+
+ # Wait for all the three slaves to reach the "online"
+ # state from the POV of the master.
+ set retry 500
+ while {$retry} {
+ set info [r -3 info]
+ if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} {
+ break
+ } else {
+ incr retry -1
+ after 100
+ }
+ }
+ if {$retry == 0} {
+ error "assertion:Slaves not correctly synchronized"
+ }
+
+ # Wait that slaves acknowledge they are online so
+ # we are sure that DBSIZE and DEBUG DIGEST will not
+ # fail because of timing issues.
+ wait_for_condition 500 100 {
+ [lindex [[lindex $slaves 0] role] 3] eq {connected} &&
+ [lindex [[lindex $slaves 1] role] 3] eq {connected} &&
+ [lindex [[lindex $slaves 2] role] 3] eq {connected}
} else {
- incr retry -1
- after 100
+ fail "Slaves still not connected after some time"
}
- }
- if {$retry == 0} {
- error "assertion:Slaves not correctly synchronized"
- }
- # Stop the write load
- stop_write_load $load_handle0
- stop_write_load $load_handle1
- stop_write_load $load_handle2
- stop_write_load $load_handle3
- stop_write_load $load_handle4
-
- # Wait that slaves exit the "loading" state
- wait_for_condition 500 100 {
- ![string match {*loading:1*} [[lindex $slaves 0] info]] &&
- ![string match {*loading:1*} [[lindex $slaves 1] info]] &&
- ![string match {*loading:1*} [[lindex $slaves 2] info]]
- } else {
- fail "Slaves still loading data after too much time"
- }
+ # Stop the write load
+ stop_write_load $load_handle0
+ stop_write_load $load_handle1
+ stop_write_load $load_handle2
+ stop_write_load $load_handle3
+ stop_write_load $load_handle4
+
+ # Make sure that slaves and master have same
+ # number of keys
+ wait_for_condition 500 100 {
+ [$master dbsize] == [[lindex $slaves 0] dbsize] &&
+ [$master dbsize] == [[lindex $slaves 1] dbsize] &&
+ [$master dbsize] == [[lindex $slaves 2] dbsize]
+ } else {
+ fail "Different number of keys between masted and slave after too long time."
+ }
- # Make sure that slaves and master have same number of keys
- wait_for_condition 500 100 {
- [$master dbsize] == [[lindex $slaves 0] dbsize] &&
- [$master dbsize] == [[lindex $slaves 1] dbsize] &&
- [$master dbsize] == [[lindex $slaves 2] dbsize]
- } else {
- fail "Different number of keys between masted and slave after too long time."
+ # Check digests
+ set digest [$master debug digest]
+ set digest0 [[lindex $slaves 0] debug digest]
+ set digest1 [[lindex $slaves 1] debug digest]
+ set digest2 [[lindex $slaves 2] debug digest]
+ assert {$digest ne 0000000000000000000000000000000000000000}
+ assert {$digest eq $digest0}
+ assert {$digest eq $digest1}
+ assert {$digest eq $digest2}
}
-
- # Check digests
- set digest [$master debug digest]
- set digest0 [[lindex $slaves 0] debug digest]
- set digest1 [[lindex $slaves 1] debug digest]
- set digest2 [[lindex $slaves 2] debug digest]
- assert {$digest ne 0000000000000000000000000000000000000000}
- assert {$digest eq $digest0}
- assert {$digest eq $digest1}
- assert {$digest eq $digest2}
- }
- }
+ }
+ }
}
}
}
diff --git a/tests/support/server.tcl b/tests/support/server.tcl
index 233d4ce64..67ee24528 100644
--- a/tests/support/server.tcl
+++ b/tests/support/server.tcl
@@ -70,6 +70,9 @@ proc kill_server config {
if {$::valgrind} {
check_valgrind_errors [dict get $config stderr]
}
+
+ # Remove this pid from the set of active pids in the test server.
+ send_data_packet $::test_server_fd server-killed $pid
}
proc is_alive config {
@@ -204,11 +207,14 @@ proc start_server {options {code undefined}} {
set stderr [format "%s/%s" [dict get $config "dir"] "stderr"]
if {$::valgrind} {
- exec valgrind --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/redis-server $config_file > $stdout 2> $stderr &
+ set pid [exec valgrind --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/redis-server $config_file > $stdout 2> $stderr &]
} else {
- exec src/redis-server $config_file > $stdout 2> $stderr &
+ set pid [exec src/redis-server $config_file > $stdout 2> $stderr &]
}
+ # Tell the test server about this new instance.
+ send_data_packet $::test_server_fd server-spawned $pid
+
# check that the server actually started
# ugly but tries to be as fast as possible...
if {$::valgrind} {set retrynum 1000} else {set retrynum 100}
@@ -234,9 +240,9 @@ proc start_server {options {code undefined}} {
return
}
- # find out the pid
- while {![info exists pid]} {
- regexp {\[(\d+)\]} [exec cat $stdout] _ pid
+ # Wait for actual startup
+ while {![info exists _pid]} {
+ regexp {PID:\s(\d+)} [exec cat $stdout] _ _pid
after 100
}
diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index 6125a95d4..d1ebde1c4 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -65,6 +65,9 @@ set ::file ""; # If set, runs only the tests in this comma separated list
set ::curfile ""; # Hold the filename of the current suite
set ::accurate 0; # If true runs fuzz tests with more iterations
set ::force_failure 0
+set ::timeout 600; # 10 minutes without progresses will quit the test.
+set ::last_progress [clock seconds]
+set ::active_servers {} ; # Pids of active Redis instances.
# Set to 1 when we are running in client mode. The Redis test uses a
# server-client model to run tests simultaneously. The server instance
@@ -200,11 +203,19 @@ proc test_server_main {} {
vwait forever
}
-# This function gets called 10 times per second, for now does nothing but
-# may be used in the future in order to detect test clients taking too much
-# time to execute the task.
+# This function gets called 10 times per second.
proc test_server_cron {} {
- # Do some work here.
+ set elapsed [expr {[clock seconds]-$::last_progress}]
+
+ if {$elapsed > $::timeout} {
+ set err "\[[colorstr red TIMEOUT]\]: clients state report follows."
+ puts $err
+ show_clients_state
+ kill_clients
+ force_kill_all_servers
+ the_end
+ }
+
after 100 test_server_cron
}
@@ -230,6 +241,8 @@ proc read_from_test_client fd {
set bytes [gets $fd]
set payload [read $fd $bytes]
foreach {status data} $payload break
+ set ::last_progress [clock seconds]
+
if {$status eq {ready}} {
if {!$::quiet} {
puts "\[$status\]: $data"
@@ -256,12 +269,15 @@ proc read_from_test_client fd {
set ::active_clients_task($fd) "(ERR) $data"
} elseif {$status eq {exception}} {
puts "\[[colorstr red $status]\]: $data"
- foreach p $::clients_pids {
- catch {exec kill -9 $p}
- }
+ kill_clients
+ force_kill_all_servers
exit 1
} elseif {$status eq {testing}} {
set ::active_clients_task($fd) "(IN PROGRESS) $data"
+ } elseif {$status eq {server-spawned}} {
+ lappend ::active_servers $data
+ } elseif {$status eq {server-killed}} {
+ set ::active_servers [lsearch -all -inline -not -exact $::active_servers $data]
} else {
if {!$::quiet} {
puts "\[$status\]: $data"
@@ -269,6 +285,31 @@ proc read_from_test_client fd {
}
}
+proc show_clients_state {} {
+ # The following loop is only useful for debugging tests that may
+ # enter an infinite loop. Commented out normally.
+ foreach x $::active_clients {
+ if {[info exist ::active_clients_task($x)]} {
+ puts "$x => $::active_clients_task($x)"
+ } else {
+ puts "$x => ???"
+ }
+ }
+}
+
+proc kill_clients {} {
+ foreach p $::clients_pids {
+ catch {exec kill $p}
+ }
+}
+
+proc force_kill_all_servers {} {
+ foreach p $::active_servers {
+ puts "Killing still running Redis server $p"
+ catch {exec kill -9 $p}
+ }
+}
+
# A new client is idle. Remove it from the list of active clients and
# if there are still test units to run, launch them.
proc signal_idle_client fd {
@@ -276,17 +317,7 @@ proc signal_idle_client fd {
set ::active_clients \
[lsearch -all -inline -not -exact $::active_clients $fd]
- if 0 {
- # The following loop is only useful for debugging tests that may
- # enter an infinite loop. Commented out normally.
- foreach x $::active_clients {
- if {[info exist ::active_clients_task($x)]} {
- puts "$x => $::active_clients_task($x)"
- } else {
- puts "$x => ???"
- }
- }
- }
+ if 0 {show_clients_state}
# New unit to process?
if {$::next_test != [llength $::all_tests]} {
@@ -361,7 +392,8 @@ proc print_help_screen {} {
"--quiet Don't show individual tests."
"--single <unit> Just execute the specified unit (see next option)."
"--list-tests List all the available test units."
- "--clients <num> Number of test clients (16)."
+ "--clients <num> Number of test clients (default 16)."
+ "--timeout <sec> Test timeout in seconds (default 10 min)."
"--force-failure Force the execution of a test that always fails."
"--help Print this help screen."
} "\n"]
@@ -410,6 +442,9 @@ for {set j 0} {$j < [llength $argv]} {incr j} {
} elseif {$opt eq {--clients}} {
set ::numclients $arg
incr j
+ } elseif {$opt eq {--timeout}} {
+ set ::timeout $arg
+ incr j
} elseif {$opt eq {--help}} {
print_help_screen
exit 0
diff --git a/tests/unit/scan.tcl b/tests/unit/scan.tcl
index 2b1033e39..1d84f128d 100644
--- a/tests/unit/scan.tcl
+++ b/tests/unit/scan.tcl
@@ -226,4 +226,14 @@ start_server {tags {"scan"}} {
set res [r zscan mykey 0 MATCH foo* COUNT 10000]
lsort -unique [lindex $res 1]
}
+
+ test "ZSCAN scores: regression test for issue #2175" {
+ r del mykey
+ for {set j 0} {$j < 500} {incr j} {
+ r zadd mykey 9.8813129168249309e-323 $j
+ }
+ set res [lindex [r zscan mykey 0] 1]
+ set first_score [lindex $res 1]
+ assert {$first_score != 0}
+ }
}
diff --git a/tests/unit/scripting.tcl b/tests/unit/scripting.tcl
index 07016bc04..e1cd2174b 100644
--- a/tests/unit/scripting.tcl
+++ b/tests/unit/scripting.tcl
@@ -184,6 +184,94 @@ start_server {tags {"scripting"}} {
set e
} {*against a key*}
+ test {EVAL - JSON numeric decoding} {
+ # We must return the table as a string because otherwise
+ # Redis converts floats to ints and we get 0 and 1023 instead
+ # of 0.0003 and 1023.2 as the parsed output.
+ r eval {return
+ table.concat(
+ cjson.decode(
+ "[0.0, -5e3, -1, 0.3e-3, 1023.2, 0e10]"), " ")
+ } 0
+ } {0 -5000 -1 0.0003 1023.2 0}
+
+ test {EVAL - JSON string decoding} {
+ r eval {local decoded = cjson.decode('{"keya": "a", "keyb": "b"}')
+ return {decoded.keya, decoded.keyb}
+ } 0
+ } {a b}
+
+ test {EVAL - cmsgpack can pack double?} {
+ r eval {local encoded = cmsgpack.pack(0.1)
+ local h = ""
+ for i = 1, #encoded do
+ h = h .. string.format("%02x",string.byte(encoded,i))
+ end
+ return h
+ } 0
+ } {cb3fb999999999999a}
+
+ test {EVAL - cmsgpack can pack negative int64?} {
+ r eval {local encoded = cmsgpack.pack(-1099511627776)
+ local h = ""
+ for i = 1, #encoded do
+ h = h .. string.format("%02x",string.byte(encoded,i))
+ end
+ return h
+ } 0
+ } {d3ffffff0000000000}
+
+ test {EVAL - cmsgpack can pack and unpack circular references?} {
+ r eval {local a = {x=nil,y=5}
+ local b = {x=a}
+ a['x'] = b
+ local encoded = cmsgpack.pack(a)
+ local h = ""
+ -- cmsgpack encodes to a depth of 16, but can't encode
+              -- references, so the encoded object has a deep copy recursive
+ -- depth of 16.
+ for i = 1, #encoded do
+ h = h .. string.format("%02x",string.byte(encoded,i))
+ end
+ -- when unpacked, re.x.x != re because the unpack creates
+ -- individual tables down to a depth of 16.
+ -- (that's why the encoded output is so large)
+ local re = cmsgpack.unpack(encoded)
+ assert(re)
+ assert(re.x)
+ assert(re.x.x.y == re.y)
+ assert(re.x.x.x.x.y == re.y)
+ assert(re.x.x.x.x.x.x.y == re.y)
+ assert(re.x.x.x.x.x.x.x.x.x.x.y == re.y)
+ -- maximum working depth:
+ assert(re.x.x.x.x.x.x.x.x.x.x.x.x.x.x.y == re.y)
+ -- now the last x would be b above and has no y
+ assert(re.x.x.x.x.x.x.x.x.x.x.x.x.x.x.x)
+ -- so, the final x.x is at the depth limit and was assigned nil
+ assert(re.x.x.x.x.x.x.x.x.x.x.x.x.x.x.x.x == nil)
+ return {h, re.x.x.x.x.x.x.x.x.y == re.y, re.y == 5}
+ } 0
+ } {82a17905a17881a17882a17905a17881a17882a17905a17881a17882a17905a17881a17882a17905a17881a17882a17905a17881a17882a17905a17881a17882a17905a17881a178c0 1 1}
+
+ test {EVAL - Numerical sanity check from bitop} {
+ r eval {assert(0x7fffffff == 2147483647, "broken hex literals");
+ assert(0xffffffff == -1 or 0xffffffff == 2^32-1,
+ "broken hex literals");
+ assert(tostring(-1) == "-1", "broken tostring()");
+ assert(tostring(0xffffffff) == "-1" or
+ tostring(0xffffffff) == "4294967295",
+ "broken tostring()")
+ } 0
+ } {}
+
+ test {EVAL - Verify minimal bitop functionality} {
+ r eval {assert(bit.tobit(1) == 1);
+ assert(bit.band(1) == 1);
+ assert(bit.bxor(1,2) == 3);
+ assert(bit.bor(1,2,4,8,16,32,64,128) == 255)
+ } 0
+ } {}
+
test {SCRIPTING FLUSH - is able to clear the scripts cache?} {
r set mykey myval
set v [r evalsha fd758d1589d044dd850a6f05d52f2eefd27f033f 1 mykey]
diff --git a/tests/unit/sort.tcl b/tests/unit/sort.tcl
index 9903a183f..a25ffeb5c 100644
--- a/tests/unit/sort.tcl
+++ b/tests/unit/sort.tcl
@@ -238,6 +238,24 @@ start_server {
r sort mylist by num get x:*->
} {100}
+ test "SORT by nosort retains native order for lists" {
+ r del testa
+ r lpush testa 2 1 4 3 5
+ r sort testa by nosort
+ } {5 3 4 1 2}
+
+ test "SORT by nosort plus store retains native order for lists" {
+ r del testa
+ r lpush testa 2 1 4 3 5
+ r sort testa by nosort store testb
+ r lrange testb 0 -1
+ } {5 3 4 1 2}
+
+ test "SORT by nosort with limit returns based on original list order" {
+ r sort testa by nosort limit 0 3 store testb
+ r lrange testb 0 -1
+ } {5 3 4}
+
tags {"slow"} {
set num 100
set res [create_random_dataset $num lpush]
diff --git a/utils/redis_init_script.tpl b/utils/redis_init_script.tpl
index d65086312..2e5b61301 100755
--- a/utils/redis_init_script.tpl
+++ b/utils/redis_init_script.tpl
@@ -26,11 +26,12 @@ case "$1" in
fi
;;
status)
- if [ ! -f $PIDFILE ]
+ PID=$(cat $PIDFILE)
+ if [ ! -x /proc/${PID} ]
then
echo 'Redis is not running'
else
- echo "Redis is running ($(<$PIDFILE))"
+ echo "Redis is running ($PID)"
fi
;;
restart)
diff --git a/utils/whatisdoing.sh b/utils/whatisdoing.sh
index 8f441cfc0..e4059caed 100755
--- a/utils/whatisdoing.sh
+++ b/utils/whatisdoing.sh
@@ -1,9 +1,15 @@
# This script is from http://poormansprofiler.org/
+#
+# NOTE: Instead of using this script, you should use the Redis
+# Software Watchdog, which provides a similar functionality but in
+# a more reliable / easy to use way.
+#
+# Check http://redis.io/topics/latency for more information.
#!/bin/bash
nsamples=1
sleeptime=0
-pid=$(pidof redis-server)
+pid=$(ps auxww | grep '[r]edis-server' | awk '{print $2}')
for x in $(seq 1 $nsamples)
do