diff options
author | sundb <sundbcn@gmail.com> | 2021-09-02 16:07:51 +0800 |
---|---|---|
committer | Oran Agra <oran@redislabs.com> | 2021-10-04 13:59:19 +0300 |
commit | aabe95125d2cc5d315da575e64bebc8680607fc1 (patch) | |
tree | 91a46b9bc75a0f7b964f82de9f7c82d2950b3817 | |
parent | 03cb27e8ecd456a008192fcb7cb274dab7872335 (diff) | |
download | redis-aabe95125d2cc5d315da575e64bebc8680607fc1.tar.gz |
Fix the timing of read and write events under kqueue (#9416)
Normally we execute the read event first and then the write event.
When the barrier is set, we do it in the reverse order.
However, under `kqueue`, if an `fd` has both read and write events,
reading the event using `kevent` will generate two events, which will
result in uncontrolled read and write timing.
This also means that the guarantees of AOF `appendfsync` = `always` are
not met on MacOS without this fix.
The main change in this PR is to cache the events already obtained when reading
them, so that if the same `fd` occurs again, only the mask in the cache is updated,
rather than a new event being generated.
This was exposed by the following test failure on MacOS:
```
*** [err]: AOF fsync always barrier issue in tests/integration/aof.tcl
Expected 544 != 544 (context: type eval line 26 cmd {assert {$size1 != $size2}} proc ::test)
```
(cherry picked from commit 306a5ccd2d053ff653988b61a779e3cbce408874)
-rw-r--r-- | src/ae_kqueue.c | 62 |
1 files changed, 55 insertions, 7 deletions
diff --git a/src/ae_kqueue.c b/src/ae_kqueue.c index 6796f4ceb..cff4e25b2 100644 --- a/src/ae_kqueue.c +++ b/src/ae_kqueue.c @@ -36,8 +36,29 @@ typedef struct aeApiState { int kqfd; struct kevent *events; + + /* Events mask for merge read and write event. + * To reduce memory consumption, we use 2 bits to store the mask + * of an event, so that 1 byte will store the mask of 4 events. */ + char *eventsMask; } aeApiState; +#define EVENT_MASK_MALLOC_SIZE(sz) (((sz) + 3) / 4) +#define EVENT_MASK_OFFSET(fd) ((fd) % 4 * 2) +#define EVENT_MASK_ENCODE(fd, mask) (((mask) & 0x3) << EVENT_MASK_OFFSET(fd)) + +static inline int getEventMask(const char *eventsMask, int fd) { + return (eventsMask[fd/4] >> EVENT_MASK_OFFSET(fd)) & 0x3; +} + +static inline void addEventMask(char *eventsMask, int fd, int mask) { + eventsMask[fd/4] |= EVENT_MASK_ENCODE(fd, mask); +} + +static inline void resetEventMask(char *eventsMask, int fd) { + eventsMask[fd/4] &= ~EVENT_MASK_ENCODE(fd, 0x3); +} + static int aeApiCreate(aeEventLoop *eventLoop) { aeApiState *state = zmalloc(sizeof(aeApiState)); @@ -53,6 +74,8 @@ static int aeApiCreate(aeEventLoop *eventLoop) { zfree(state); return -1; } + state->eventsMask = zmalloc(EVENT_MASK_MALLOC_SIZE(eventLoop->setsize)); + memset(state->eventsMask, 0, EVENT_MASK_MALLOC_SIZE(eventLoop->setsize)); eventLoop->apidata = state; return 0; } @@ -61,6 +84,8 @@ static int aeApiResize(aeEventLoop *eventLoop, int setsize) { aeApiState *state = eventLoop->apidata; state->events = zrealloc(state->events, sizeof(struct kevent)*setsize); + state->eventsMask = zrealloc(state->eventsMask, EVENT_MASK_MALLOC_SIZE(setsize)); + memset(state->eventsMask, 0, EVENT_MASK_MALLOC_SIZE(setsize)); return 0; } @@ -69,6 +94,7 @@ static void aeApiFree(aeEventLoop *eventLoop) { close(state->kqfd); zfree(state->events); + zfree(state->eventsMask); zfree(state); } @@ -119,15 +145,37 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { if (retval > 0) { int j; - numevents = 
retval; - for(j = 0; j < numevents; j++) { - int mask = 0; + /* Normally we execute the read event first and then the write event. + * When the barrier is set, we will do it reverse. + * + * However, under kqueue, read and write events would be separate + * events, which would make it impossible to control the order of + * reads and writes. So we store the event's mask we've got and merge + * the same fd events later. */ + for (j = 0; j < retval; j++) { struct kevent *e = state->events+j; + int fd = e->ident; + int mask = 0; - if (e->filter == EVFILT_READ) mask |= AE_READABLE; - if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE; - eventLoop->fired[j].fd = e->ident; - eventLoop->fired[j].mask = mask; + if (e->filter == EVFILT_READ) mask = AE_READABLE; + else if (e->filter == EVFILT_WRITE) mask = AE_WRITABLE; + addEventMask(state->eventsMask, fd, mask); + } + + /* Re-traversal to merge read and write events, and set the fd's mask to + * 0 so that events are not added again when the fd is encountered again. */ + numevents = 0; + for (j = 0; j < retval; j++) { + struct kevent *e = state->events+j; + int fd = e->ident; + int mask = getEventMask(state->eventsMask, fd); + + if (mask) { + eventLoop->fired[numevents].fd = fd; + eventLoop->fired[numevents].mask = mask; + resetEventMask(state->eventsMask, fd); + numevents++; + } } } return numevents; |