diff options
author | sundb <sundbcn@gmail.com> | 2021-09-02 16:07:51 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-09-02 11:07:51 +0300 |
commit | 306a5ccd2d053ff653988b61a779e3cbce408874 (patch) | |
tree | 51225e0798b91f4166b950df31fba62e4d418bb8 /src/ae_kqueue.c | |
parent | c9931ddba53f43c291cb61b6976a91496976e2fd (diff) | |
download | redis-306a5ccd2d053ff653988b61a779e3cbce408874.tar.gz |
Fix the timing of read and write events under kqueue (#9416)
Normally we execute the read event first and then the write event.
When the barrier is set, we will do it reverse.
However, under `kqueue`, if an `fd` has both read and write events,
reading the event using `kevent` will generate two events, which will
result in uncontrolled read and write timing.
This also means that the guarantees of AOF `appendfsync` = `always` are
not met on MacOS without this fix.
The main change to this pr is to cache the events already obtained when reading
them, so that if the same `fd` occurs again, only the mask in the cache is updated,
rather than a new event is generated.
This was exposed by the following test failure on MacOS:
```
*** [err]: AOF fsync always barrier issue in tests/integration/aof.tcl
Expected 544 != 544 (context: type eval line 26 cmd {assert {$size1 != $size2}} proc ::test)
```
Diffstat (limited to 'src/ae_kqueue.c')
-rw-r--r-- | src/ae_kqueue.c | 62 |
1 files changed, 55 insertions, 7 deletions
diff --git a/src/ae_kqueue.c b/src/ae_kqueue.c index 8666b03cf..e8454245f 100644 --- a/src/ae_kqueue.c +++ b/src/ae_kqueue.c @@ -36,8 +36,29 @@ typedef struct aeApiState { int kqfd; struct kevent *events; + + /* Events mask for merge read and write event. + * To reduce memory consumption, we use 2 bits to store the mask + * of an event, so that 1 byte will store the mask of 4 events. */ + char *eventsMask; } aeApiState; +#define EVENT_MASK_MALLOC_SIZE(sz) (((sz) + 3) / 4) +#define EVENT_MASK_OFFSET(fd) ((fd) % 4 * 2) +#define EVENT_MASK_ENCODE(fd, mask) (((mask) & 0x3) << EVENT_MASK_OFFSET(fd)) + +static inline int getEventMask(const char *eventsMask, int fd) { + return (eventsMask[fd/4] >> EVENT_MASK_OFFSET(fd)) & 0x3; +} + +static inline void addEventMask(char *eventsMask, int fd, int mask) { + eventsMask[fd/4] |= EVENT_MASK_ENCODE(fd, mask); +} + +static inline void resetEventMask(char *eventsMask, int fd) { + eventsMask[fd/4] &= ~EVENT_MASK_ENCODE(fd, 0x3); +} + static int aeApiCreate(aeEventLoop *eventLoop) { aeApiState *state = zmalloc(sizeof(aeApiState)); @@ -54,6 +75,8 @@ static int aeApiCreate(aeEventLoop *eventLoop) { return -1; } anetCloexec(state->kqfd); + state->eventsMask = zmalloc(EVENT_MASK_MALLOC_SIZE(eventLoop->setsize)); + memset(state->eventsMask, 0, EVENT_MASK_MALLOC_SIZE(eventLoop->setsize)); eventLoop->apidata = state; return 0; } @@ -62,6 +85,8 @@ static int aeApiResize(aeEventLoop *eventLoop, int setsize) { aeApiState *state = eventLoop->apidata; state->events = zrealloc(state->events, sizeof(struct kevent)*setsize); + state->eventsMask = zrealloc(state->eventsMask, EVENT_MASK_MALLOC_SIZE(setsize)); + memset(state->eventsMask, 0, EVENT_MASK_MALLOC_SIZE(setsize)); return 0; } @@ -70,6 +95,7 @@ static void aeApiFree(aeEventLoop *eventLoop) { close(state->kqfd); zfree(state->events); + zfree(state->eventsMask); zfree(state); } @@ -120,15 +146,37 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { if (retval > 0) { int j; - numevents = retval; - for(j = 0; j < numevents; j++) { - int mask = 0; + /* Normally we execute the read event first and then the write event. + * When the barrier is set, we will do it reverse. + * + * However, under kqueue, read and write events would be separate + * events, which would make it impossible to control the order of + * reads and writes. So we store the event's mask we've got and merge + * the same fd events later. */ + for (j = 0; j < retval; j++) { struct kevent *e = state->events+j; + int fd = e->ident; + int mask = 0; - if (e->filter == EVFILT_READ) mask |= AE_READABLE; - if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE; - eventLoop->fired[j].fd = e->ident; - eventLoop->fired[j].mask = mask; + if (e->filter == EVFILT_READ) mask = AE_READABLE; + else if (e->filter == EVFILT_WRITE) mask = AE_WRITABLE; + addEventMask(state->eventsMask, fd, mask); + } + + /* Re-traversal to merge read and write events, and set the fd's mask to + * 0 so that events are not added again when the fd is encountered again. */ + numevents = 0; + for (j = 0; j < retval; j++) { + struct kevent *e = state->events+j; + int fd = e->ident; + int mask = getEventMask(state->eventsMask, fd); + + if (mask) { + eventLoop->fired[numevents].fd = fd; + eventLoop->fired[numevents].mask = mask; + resetEventMask(state->eventsMask, fd); + numevents++; + } } } else if (retval == -1 && errno != EINTR) { panic("aeApiPoll: kevent, %s", strerror(errno)); |