summaryrefslogtreecommitdiff
path: root/src/pv/transfer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pv/transfer.c')
-rw-r--r--src/pv/transfer.c341
1 files changed, 341 insertions, 0 deletions
diff --git a/src/pv/transfer.c b/src/pv/transfer.c
new file mode 100644
index 0000000..55d61b5
--- /dev/null
+++ b/src/pv/transfer.c
@@ -0,0 +1,341 @@
+/*
+ * Functions for transferring between file descriptors.
+ *
+ * Copyright 2012 Andrew Wood, distributed under the Artistic License 2.0.
+ */
+
+#include "options.h"
+
+#define BUFFER_SIZE 409600
+#define BUFFER_SIZE_MAX 524288
+
+#define MAXIMISE_BUFFER_FILL 1
+
+#define _GNU_SOURCE 1 /* for splice() */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <time.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <sys/file.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <signal.h>
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+static unsigned long long pv__bufsize = BUFFER_SIZE;
+
+
+/*
+ * Set the buffer size for transfers.
+ */
+void pv_set_buffer_size(unsigned long long sz, int force)
+{
+ if ((sz > BUFFER_SIZE_MAX) && (!force))
+ sz = BUFFER_SIZE_MAX;
+ pv__bufsize = sz;
+}
+
+
+/*
+ * Transfer some data from "fd" to standard output, timing out after 9/100
+ * of a second. If opts->rate_limit is >0, only up to "allowed" bytes can
+ * be written. The variables that "eof_in" and "eof_out" point to are used
+ * to flag that we've finished reading and writing respectively.
+ *
+ * Returns the number of bytes written, or negative on error (in which case
+ * opts->exit_status is updated). In line mode, the number of lines written
+ * will be put into *lineswritten.
+ *
+ * If "opts" is NULL, then the transfer buffer is freed, and zero is
+ * returned.
+ */
+long pv_transfer(opts_t opts, int fd, int *eof_in, int *eof_out,
+ unsigned long long allowed, long *lineswritten)
+{
+ static unsigned char *buf = NULL;
+ static unsigned long long buf_alloced = 0;
+ static unsigned long in_buffer = 0;
+ static unsigned long bytes_written = 0;
+ struct timeval tv;
+ fd_set readfds;
+ fd_set writefds;
+ int max_fd;
+ long to_write, written;
+ ssize_t r, w;
+#ifdef HAVE_SPLICE
+ static int splice_failed_fd = -1;
+ int splice_used = 0;
+#endif
+ int n;
+
+ if (opts == NULL) {
+ if (buf)
+ free(buf);
+ buf = NULL;
+ in_buffer = 0;
+ bytes_written = 0;
+ return 0;
+ }
+
+ if (buf == NULL) {
+ buf_alloced = pv__bufsize;
+ buf = (unsigned char *) malloc(pv__bufsize + 32);
+ if (buf == NULL) {
+ fprintf(stderr, "%s: %s: %s\n",
+ opts->program_name,
+ _("buffer allocation failed"),
+ strerror(errno));
+ opts->exit_status |= 64;
+ return -1;
+ }
+ }
+
+ /*
+ * Reallocate the buffer if the buffer size has changed mid-transfer.
+ */
+ if (buf_alloced < pv__bufsize) {
+ unsigned char *newptr;
+ newptr =
+ realloc( /* RATS: ignore */ buf, pv__bufsize + 32);
+ if (newptr == NULL) {
+ pv__bufsize = buf_alloced;
+ } else {
+ buf = newptr;
+ buf_alloced = pv__bufsize;
+ }
+ }
+
+ if ((opts->linemode) && (lineswritten != NULL))
+ *lineswritten = 0;
+
+ tv.tv_sec = 0;
+ tv.tv_usec = 90000;
+
+ FD_ZERO(&readfds);
+ FD_ZERO(&writefds);
+
+ max_fd = 0;
+
+ if ((!(*eof_in)) && (in_buffer < pv__bufsize)) {
+ FD_SET(fd, &readfds);
+ if (fd > max_fd)
+ max_fd = fd;
+ }
+
+ to_write = in_buffer - bytes_written;
+ if (opts->rate_limit > 0) {
+ if (to_write > allowed) {
+ to_write = allowed;
+ }
+ }
+
+ if ((!(*eof_out)) && (to_write > 0)) {
+ FD_SET(STDOUT_FILENO, &writefds);
+ if (STDOUT_FILENO > max_fd)
+ max_fd = STDOUT_FILENO;
+ }
+
+ if ((*eof_in) && (*eof_out))
+ return 0;
+
+ n = select(max_fd + 1, &readfds, &writefds, NULL, &tv);
+
+ if (n < 0) {
+ if (errno == EINTR)
+ return 0;
+ fprintf(stderr, "%s: %s: %s: %d: %s\n",
+ opts->program_name, opts->current_file,
+ _("select call failed"), n, strerror(errno));
+ opts->exit_status |= 16;
+ return -1;
+ }
+
+ written = 0;
+
+ if (FD_ISSET(fd, &readfds)) {
+#ifdef HAVE_SPLICE
+ splice_used = 0;
+ if ((!opts->linemode) && (fd != splice_failed_fd)
+ && (to_write == 0)) {
+ r = splice(fd, NULL, STDOUT_FILENO, NULL, allowed,
+ SPLICE_F_MORE);
+ splice_used = 1;
+ if ((r < 0) && (errno == EINVAL)) {
+ splice_failed_fd = fd;
+ splice_used = 0;
+ } else if (r > 0) {
+ written = r;
+ } else {
+ /* EOF might not really be EOF, it seems */
+ splice_used = 0;
+ }
+ }
+ if (splice_used == 0) {
+ r = read( /* RATS: ignore (checked OK) */ fd,
+ buf + in_buffer, pv__bufsize - in_buffer);
+ }
+#else
+ r = read( /* RATS: ignore (checked OK) */ fd,
+ buf + in_buffer, pv__bufsize - in_buffer);
+#endif /* HAVE_SPLICE */
+ if (r < 0) {
+ /*
+ * If a read error occurred but it was EINTR or
+ * EAGAIN, just wait a bit and then return zero,
+ * since this was a transient error.
+ */
+ if ((errno == EINTR) || (errno == EAGAIN)) {
+ tv.tv_sec = 0;
+ tv.tv_usec = 10000;
+ select(0, NULL, NULL, NULL, &tv);
+ return 0;
+ }
+ fprintf(stderr, "%s: %s: %s: %s\n",
+ opts->program_name,
+ opts->current_file,
+ _("read failed"), strerror(errno));
+ opts->exit_status |= 16;
+ *eof_in = 1;
+ if (bytes_written >= in_buffer)
+ *eof_out = 1;
+ } else if (r == 0) {
+ *eof_in = 1;
+ if (bytes_written >= in_buffer)
+ *eof_out = 1;
+ } else {
+#ifdef HAVE_SPLICE
+ if (splice_used == 0)
+ in_buffer += r;
+#else
+ in_buffer += r;
+#endif /* HAVE_SPLICE */
+
+ }
+ }
+
+ /*
+ * In line mode, only write up to and including the last newline,
+ * so that we're writing output line-by-line.
+ */
+ if ((to_write > 0) && (opts->linemode)) {
+ /*
+ * Guillaume Marcais: use strrchr to find last \n
+ */
+ unsigned char save;
+ char *start;
+ char *end;
+
+ save = buf[bytes_written + to_write];
+ buf[bytes_written + to_write] = 0;
+
+ start = (char *) (buf + bytes_written);
+ end = strrchr(start, '\n');
+ buf[bytes_written + to_write] = save;
+
+ if (end != NULL) {
+ to_write = (end - start) + 1;
+ }
+ }
+
+ if (FD_ISSET(STDOUT_FILENO, &writefds)
+#ifdef HAVE_SPLICE
+ && (splice_used == 0)
+#endif /* HAVE_SPLICE */
+ && (in_buffer > bytes_written)
+ && (to_write > 0)) {
+
+ signal(SIGALRM, SIG_IGN); /* RATS: ignore */
+ alarm(1);
+
+ w = write(STDOUT_FILENO, buf + bytes_written, to_write);
+
+ alarm(0);
+
+ if (w < 0) {
+ /*
+ * If a write error occurred but it was EINTR or
+ * EAGAIN, just wait a bit and then return zero,
+ * since this was a transient error.
+ */
+ if ((errno == EINTR) || (errno == EAGAIN)) {
+ tv.tv_sec = 0;
+ tv.tv_usec = 10000;
+ select(0, NULL, NULL, NULL, &tv);
+ return 0;
+ }
+ /*
+ * SIGPIPE means we've finished. Don't output an
+ * error because it's not really our error to report.
+ */
+ if (errno == EPIPE) {
+ *eof_in = 1;
+ *eof_out = 1;
+ return 0;
+ }
+ fprintf(stderr, "%s: %s: %s\n",
+ opts->program_name,
+ _("write failed"), strerror(errno));
+ opts->exit_status |= 16;
+ *eof_out = 1;
+ written = -1;
+ } else if (w == 0) {
+ *eof_out = 1;
+ } else {
+ if ((opts->linemode) && (lineswritten != NULL)) {
+ /*
+ * Guillaume Marcais: use strchr to count \n
+ */
+ unsigned char save;
+ char *ptr;
+ long lines = 0;
+
+ save = buf[bytes_written + w];
+ buf[bytes_written + w] = 0;
+ ptr = (char *) (buf + bytes_written - 1);
+
+ while ((ptr =
+ strchr((char *) (ptr + 1), '\n')))
+ ++lines;
+
+ *lineswritten += lines;
+ buf[bytes_written + w] = save;
+ }
+ bytes_written += w;
+ written += w;
+ if (bytes_written >= in_buffer) {
+ bytes_written = 0;
+ in_buffer = 0;
+ if (*eof_in)
+ *eof_out = 1;
+ }
+ }
+ }
+#ifdef MAXIMISE_BUFFER_FILL
+ /*
+ * Rotate the written bytes out of the buffer so that it can be
+ * filled up completely by the next read.
+ */
+ if (bytes_written > 0) {
+ if (bytes_written < in_buffer) {
+ memmove(buf, buf + bytes_written,
+ in_buffer - bytes_written);
+ in_buffer -= bytes_written;
+ bytes_written = 0;
+ } else {
+ bytes_written = 0;
+ in_buffer = 0;
+ }
+ }
+#endif /* MAXIMISE_BUFFER_FILL */
+ return written;
+}
+
+/* EOF */