summaryrefslogtreecommitdiff
path: root/src/chunk.c
diff options
context:
space:
mode:
authorGlenn Strauss <gstrauss@gluelogic.com>2021-09-22 16:12:05 -0400
committerGlenn Strauss <gstrauss@gluelogic.com>2021-09-30 17:34:03 -0400
commit6e62b84258fd955b34d3cbc0135c0e742aed4358 (patch)
treee1c6a733c50238aa22c01a997bf08f312b2ddd4b /src/chunk.c
parent6bd6226e90932b7ef1cafe6fb3f846d8cea2e9eb (diff)
downloadlighttpd-git-6e62b84258fd955b34d3cbc0135c0e742aed4358.tar.gz
[core] splice() data from backends to tempfiles
splice() data from backends to tempfiles (where splice() is available); reduce copying data to userspace when writing data to tempfiles Note: splice() on Linux returns EINVAL if target file has O_APPEND set so lighttpd uses pwrite() (where available) when writing to tempfiles (instead of lseek() + write(), or O_APPEND and write())
Diffstat (limited to 'src/chunk.c')
-rw-r--r--src/chunk.c168
1 files changed, 165 insertions, 3 deletions
diff --git a/src/chunk.c b/src/chunk.c
index 5fdff9b9..ab736ede 100644
--- a/src/chunk.c
+++ b/src/chunk.c
@@ -643,7 +643,12 @@ void chunkqueue_steal(chunkqueue * const restrict dest, chunkqueue * const restr
static int chunkqueue_get_append_mkstemp(buffer * const b, const char *path, const uint32_t len) {
buffer_copy_path_len2(b,path,len,CONST_STR_LEN("lighttpd-upload-XXXXXX"));
+ #if defined(HAVE_SPLICE) && defined(HAVE_PWRITE)
+ /*(splice() rejects O_APPEND target; omit flag if also using pwrite())*/
+ return fdevent_mkostemp(b->ptr, 0);
+ #else
return fdevent_mkostemp(b->ptr, O_APPEND);
+ #endif
}
static chunk *chunkqueue_get_append_newtempfile(chunkqueue * const restrict cq, log_error_st * const restrict errh) {
@@ -749,9 +754,13 @@ static int chunkqueue_to_tempfiles(chunkqueue * const restrict dest, log_error_s
chunkqueue src = *dest; /*(copy struct)*/
dest->first = dest->last = NULL;
dest->bytes_in -= cqlen;
- return (0 == chunkqueue_steal_with_tempfiles(dest, &src, cqlen, errh))
- ? 0
- : (chunkqueue_release_chunks(&src), -1);
+ if (0 == chunkqueue_steal_with_tempfiles(dest, &src, cqlen, errh))
+ return 0;
+ else {
+ const int errnum = errno;
+ chunkqueue_release_chunks(&src);
+ return -errnum;
+ }
}
int chunkqueue_append_mem_to_tempfile(chunkqueue * const restrict dest, const char * restrict mem, size_t len, log_error_st * const restrict errh) {
@@ -777,8 +786,13 @@ int chunkqueue_append_mem_to_tempfile(chunkqueue * const restrict dest, const ch
#ifdef __COVERITY__
if (dst_c->file.fd < 0) return -1;
#endif
+ #ifdef HAVE_PWRITE
+ /* coverity[negative_returns : FALSE] */
+ const ssize_t written =pwrite(dst_c->file.fd, mem, len, dst_c->file.length);
+ #else
/* coverity[negative_returns : FALSE] */
const ssize_t written = write(dst_c->file.fd, mem, len);
+ #endif
if ((size_t) written == len) {
dst_c->file.length += len;
@@ -800,6 +814,154 @@ int chunkqueue_append_mem_to_tempfile(chunkqueue * const restrict dest, const ch
return -1;
}
+#ifdef HAVE_SPLICE
+
+__attribute_cold__
+__attribute_noinline__
+static ssize_t chunkqueue_append_drain_pipe_tempfile(chunkqueue * const restrict cq, const int fd, unsigned int len, log_error_st * const restrict errh) {
+ /* attempt to drain full 'len' from pipe
+ * (even if len not reduced to opts->max_per_read limit)
+ * since data may have already been moved from socket to pipe
+ *(returns 0 on success, or -errno (negative errno) if error,
+ * even if partial write occurred)*/
+ char buf[16384];
+ ssize_t rd;
+ do {
+ do {
+ rd = read(fd, buf, sizeof(buf));
+ } while (rd < 0 && errno == EINTR);
+ if (rd < 0) break;
+ if (0 != chunkqueue_append_mem_to_tempfile(cq, buf, (size_t)rd, errh))
+ break;
+ } while ((len -= (unsigned int)rd));
+
+ if (0 == len)
+ return 0;
+ else {
+ const int errnum = errno;
+ if (cq->last && 0 == chunk_remaining_length(cq->last)) {
+ /*(remove empty chunk and unlink tempfile)*/
+ chunkqueue_remove_empty_chunks(cq);
+ }
+ return -errnum;
+ }
+}
+
+ssize_t chunkqueue_append_splice_pipe_tempfile(chunkqueue * const restrict cq, const int fd, unsigned int len, log_error_st * const restrict errh) {
+ /* check if prior MEM_CHUNK(s) exist and write to tempfile
+ * (check first chunk only, since if we are using tempfiles, then
+ * we expect further chunks to be tempfiles after starting tempfiles)*/
+ if (cq->first && cq->first->type == MEM_CHUNK) {
+ int rc = chunkqueue_to_tempfiles(cq, errh);
+ if (__builtin_expect( (0 != rc), 0)) return rc;
+ }
+
+ /*(returns num bytes written, or -errno (negative errno) if error)*/
+ ssize_t total = 0;
+ do {
+ chunk * const c = chunkqueue_get_append_tempfile(cq, errh);
+ if (__builtin_expect( (NULL == c), 0)) return -errno;
+
+ loff_t off = c->file.length;
+ ssize_t wr = splice(fd, NULL, c->file.fd, &off, len,
+ SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
+
+ if (__builtin_expect(((size_t)wr == len), 1)) {
+ c->file.length += len;
+ cq->bytes_in += len;
+ return total + len;
+ }
+ else if (wr >= 0) {
+ /*(assume EINTR if partial write and retry;
+ * retry might fail with ENOSPC if no more space on volume)*/
+ cq->bytes_in += wr;
+ total += wr;
+ len -= (size_t)wr;
+ c->file.length += (size_t)wr;
+ /* continue; retry */
+ }
+ else {
+ const int errnum = errno;
+ switch (errnum) {
+ case EAGAIN:
+ #ifdef EWOULDBLOCK
+ #if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+ #endif
+ #endif
+ if (0 == chunk_remaining_length(c)) {
+ /*(remove empty chunk and unlink tempfile)*/
+ chunkqueue_remove_empty_chunks(cq);
+ }
+ return total;
+ case EINVAL: /*(assume total == 0 if EINVAL)*/
+ wr = chunkqueue_append_drain_pipe_tempfile(cq, fd, len, errh);
+ return (0 == wr) ? total + len : wr;
+ default:
+ if (!chunkqueue_append_tempfile_err(cq, errh, c))
+ return -errnum;
+ break; /* else continue; retry */
+ }
+ }
+ } while (len);
+ return -EIO; /*(not reached)*/
+}
+
+static int cqpipes[2] = { -1, -1 };
+
+__attribute_cold__
+__attribute_noinline__
+void chunkqueue_internal_pipes(int init) {
+ /*(intended for internal use within a single lighttpd process;
+ * must be initialized after fork() and graceful-restart to avoid
+ * sharing pipes between processes)*/
+ if (-1 != cqpipes[0]) { close(cqpipes[0]); cqpipes[0] = -1; }
+ if (-1 != cqpipes[1]) { close(cqpipes[1]); cqpipes[1] = -1; }
+ if (init)
+ fdevent_pipe_cloexec(cqpipes, 262144);
+}
+
+__attribute_cold__
+__attribute_noinline__
+static void chunkqueue_pipe_read_discard (void) {
+ char buf[16384];
+ ssize_t rd;
+ do {
+ rd = read(cqpipes[0], buf, sizeof(buf));
+ } while (rd > 0 || (rd < 0 && errno == EINTR));
+ if (rd < 0
+ #ifdef EWOULDBLOCK
+ #if EWOULDBLOCK != EAGAIN
+ && errno != EWOULDBLOCK
+ #endif
+ #endif
+ && errno != EAGAIN) {
+ chunkqueue_internal_pipes(1); /*(close() and re-initialize)*/
+ }
+}
+
+ssize_t chunkqueue_append_splice_sock_tempfile(chunkqueue * const restrict cq, const int fd, unsigned int len, log_error_st * const restrict errh) {
+ /*(returns num bytes written, or -errno (negative errno) if error)*/
+ int * const pipes = cqpipes;
+ if (-1 == pipes[1])
+ return -EINVAL; /*(not configured; not handled here)*/
+
+ /* splice() socket data to intermediate pipe */
+ ssize_t wr = splice(fd, NULL, pipes[1], NULL, len,
+ SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
+ if (__builtin_expect( (wr <= 0), 0))
+ return -EINVAL; /*(reuse to indicate not handled here)*/
+ len = (unsigned int)wr;
+
+ /* splice() data from intermediate pipe to tempfile */
+ wr = chunkqueue_append_splice_pipe_tempfile(cq, pipes[0], len, errh);
+ if (wr < 0) /* expect (wr == (ssize_t)len) or (wr == -1) */
+ chunkqueue_pipe_read_discard();/* discard data from intermediate pipe */
+ return wr;
+}
+
+#endif /* HAVE_SPLICE */
+
int chunkqueue_steal_with_tempfiles(chunkqueue * const restrict dest, chunkqueue * const restrict src, off_t len, log_error_st * const restrict errh) {
while (len > 0) {
chunk *c = src->first;