diff options
author | Glenn Strauss <gstrauss@gluelogic.com> | 2021-09-22 16:12:05 -0400 |
---|---|---|
committer | Glenn Strauss <gstrauss@gluelogic.com> | 2021-09-30 17:34:03 -0400 |
commit | 6e62b84258fd955b34d3cbc0135c0e742aed4358 (patch) | |
tree | e1c6a733c50238aa22c01a997bf08f312b2ddd4b /src/chunk.c | |
parent | 6bd6226e90932b7ef1cafe6fb3f846d8cea2e9eb (diff) | |
download | lighttpd-git-6e62b84258fd955b34d3cbc0135c0e742aed4358.tar.gz |
[core] splice() data from backends to tempfiles
splice() data from backends to tempfiles (where splice() is available);
reduce copying data to userspace when writing data to tempfiles
Note: splice() on Linux returns EINVAL if target file has O_APPEND set
so lighttpd uses pwrite() (where available) when writing to tempfiles
(instead of lseek() + write(), or O_APPEND and write())
Diffstat (limited to 'src/chunk.c')
-rw-r--r-- | src/chunk.c | 168 |
1 files changed, 165 insertions, 3 deletions
diff --git a/src/chunk.c b/src/chunk.c index 5fdff9b9..ab736ede 100644 --- a/src/chunk.c +++ b/src/chunk.c @@ -643,7 +643,12 @@ void chunkqueue_steal(chunkqueue * const restrict dest, chunkqueue * const restr static int chunkqueue_get_append_mkstemp(buffer * const b, const char *path, const uint32_t len) { buffer_copy_path_len2(b,path,len,CONST_STR_LEN("lighttpd-upload-XXXXXX")); + #if defined(HAVE_SPLICE) && defined(HAVE_PWRITE) + /*(splice() rejects O_APPEND target; omit flag if also using pwrite())*/ + return fdevent_mkostemp(b->ptr, 0); + #else return fdevent_mkostemp(b->ptr, O_APPEND); + #endif } static chunk *chunkqueue_get_append_newtempfile(chunkqueue * const restrict cq, log_error_st * const restrict errh) { @@ -749,9 +754,13 @@ static int chunkqueue_to_tempfiles(chunkqueue * const restrict dest, log_error_s chunkqueue src = *dest; /*(copy struct)*/ dest->first = dest->last = NULL; dest->bytes_in -= cqlen; - return (0 == chunkqueue_steal_with_tempfiles(dest, &src, cqlen, errh)) - ? 0 - : (chunkqueue_release_chunks(&src), -1); + if (0 == chunkqueue_steal_with_tempfiles(dest, &src, cqlen, errh)) + return 0; + else { + const int errnum = errno; + chunkqueue_release_chunks(&src); + return -errnum; + } } int chunkqueue_append_mem_to_tempfile(chunkqueue * const restrict dest, const char * restrict mem, size_t len, log_error_st * const restrict errh) { @@ -777,8 +786,13 @@ int chunkqueue_append_mem_to_tempfile(chunkqueue * const restrict dest, const ch #ifdef __COVERITY__ if (dst_c->file.fd < 0) return -1; #endif + #ifdef HAVE_PWRITE + /* coverity[negative_returns : FALSE] */ + const ssize_t written =pwrite(dst_c->file.fd, mem, len, dst_c->file.length); + #else /* coverity[negative_returns : FALSE] */ const ssize_t written = write(dst_c->file.fd, mem, len); + #endif if ((size_t) written == len) { dst_c->file.length += len; @@ -800,6 +814,154 @@ int chunkqueue_append_mem_to_tempfile(chunkqueue * const restrict dest, const ch return -1; } +#ifdef HAVE_SPLICE + +__attribute_cold__ +__attribute_noinline__ +static ssize_t chunkqueue_append_drain_pipe_tempfile(chunkqueue * const restrict cq, const int fd, unsigned int len, log_error_st * const restrict errh) { + /* attempt to drain full 'len' from pipe + * (even if len not reduced to opts->max_per_read limit) + * since data may have already been moved from socket to pipe + *(returns 0 on success, or -errno (negative errno) if error, + * even if partial write occurred)*/ + char buf[16384]; + ssize_t rd; + do { + do { + rd = read(fd, buf, sizeof(buf)); + } while (rd < 0 && errno == EINTR); + if (rd < 0) break; + if (0 != chunkqueue_append_mem_to_tempfile(cq, buf, (size_t)rd, errh)) + break; + } while ((len -= (unsigned int)rd)); + + if (0 == len) + return 0; + else { + const int errnum = errno; + if (cq->last && 0 == chunk_remaining_length(cq->last)) { + /*(remove empty chunk and unlink tempfile)*/ + chunkqueue_remove_empty_chunks(cq); + } + return -errnum; + } +} + +ssize_t chunkqueue_append_splice_pipe_tempfile(chunkqueue * const restrict cq, const int fd, unsigned int len, log_error_st * const restrict errh) { + /* check if prior MEM_CHUNK(s) exist and write to tempfile + * (check first chunk only, since if we are using tempfiles, then + * we expect further chunks to be tempfiles after starting tempfiles)*/ + if (cq->first && cq->first->type == MEM_CHUNK) { + int rc = chunkqueue_to_tempfiles(cq, errh); + if (__builtin_expect( (0 != rc), 0)) return rc; + } + + /*(returns num bytes written, or -errno (negative errno) if error)*/ + ssize_t total = 0; + do { + chunk * const c = chunkqueue_get_append_tempfile(cq, errh); + if (__builtin_expect( (NULL == c), 0)) return -errno; + + loff_t off = c->file.length; + ssize_t wr = splice(fd, NULL, c->file.fd, &off, len, + SPLICE_F_MOVE | SPLICE_F_NONBLOCK); + + if (__builtin_expect(((size_t)wr == len), 1)) { + c->file.length += len; + cq->bytes_in += len; + return total + len; + } + else if (wr >= 0) { + /*(assume EINTR if partial write and retry; + * retry might fail with ENOSPC if no more space on volume)*/ + cq->bytes_in += wr; + total += wr; + len -= (size_t)wr; + c->file.length += (size_t)wr; + /* continue; retry */ + } + else { + const int errnum = errno; + switch (errnum) { + case EAGAIN: + #ifdef EWOULDBLOCK + #if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: + #endif + #endif + if (0 == chunk_remaining_length(c)) { + /*(remove empty chunk and unlink tempfile)*/ + chunkqueue_remove_empty_chunks(cq); + } + return total; + case EINVAL: /*(assume total == 0 if EINVAL)*/ + wr = chunkqueue_append_drain_pipe_tempfile(cq, fd, len, errh); + return (0 == wr) ? total + len : wr; + default: + if (!chunkqueue_append_tempfile_err(cq, errh, c)) + return -errnum; + break; /* else continue; retry */ + } + } + } while (len); + return -EIO; /*(not reached)*/ +} + +static int cqpipes[2] = { -1, -1 }; + +__attribute_cold__ +__attribute_noinline__ +void chunkqueue_internal_pipes(int init) { + /*(intended for internal use within a single lighttpd process; + * must be initialized after fork() and graceful-restart to avoid + * sharing pipes between processes)*/ + if (-1 != cqpipes[0]) { close(cqpipes[0]); cqpipes[0] = -1; } + if (-1 != cqpipes[1]) { close(cqpipes[1]); cqpipes[1] = -1; } + if (init) + fdevent_pipe_cloexec(cqpipes, 262144); +} + +__attribute_cold__ +__attribute_noinline__ +static void chunkqueue_pipe_read_discard (void) { + char buf[16384]; + ssize_t rd; + do { + rd = read(cqpipes[0], buf, sizeof(buf)); + } while (rd > 0 || (rd < 0 && errno == EINTR)); + if (rd < 0 + #ifdef EWOULDBLOCK + #if EWOULDBLOCK != EAGAIN + && errno != EWOULDBLOCK + #endif + #endif + && errno != EAGAIN) { + chunkqueue_internal_pipes(1); /*(close() and re-initialize)*/ + } +} + +ssize_t chunkqueue_append_splice_sock_tempfile(chunkqueue * const restrict cq, const int fd, unsigned int len, log_error_st * const restrict errh) { + /*(returns num bytes written, or -errno (negative errno) if error)*/ + int * const pipes = cqpipes; + if (-1 == pipes[1]) + return -EINVAL; /*(not configured; not handled here)*/ + + /* splice() socket data to intermediate pipe */ + ssize_t wr = splice(fd, NULL, pipes[1], NULL, len, + SPLICE_F_MOVE | SPLICE_F_NONBLOCK); + if (__builtin_expect( (wr <= 0), 0)) + return -EINVAL; /*(reuse to indicate not handled here)*/ + len = (unsigned int)wr; + + /* splice() data from intermediate pipe to tempfile */ + wr = chunkqueue_append_splice_pipe_tempfile(cq, pipes[0], len, errh); + if (wr < 0) /* expect (wr == (ssize_t)len) or (wr == -1) */ + chunkqueue_pipe_read_discard();/* discard data from intermediate pipe */ + return wr; +} + +#endif /* HAVE_SPLICE */ + int chunkqueue_steal_with_tempfiles(chunkqueue * const restrict dest, chunkqueue * const restrict src, off_t len, log_error_st * const restrict errh) { while (len > 0) { chunk *c = src->first; |