summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDonovan Baarda <abo@minkirri.apana.org.au>2021-09-14 11:19:06 +1000
committerDonovan Baarda <abo@minkirri.apana.org.au>2021-09-14 11:19:06 +1000
commit32d824c77e6c534c78eafdf28251d5036309c8c8 (patch)
tree8696e2502be44b3c87d09e68d620923113858e8b
parent8ca6145d1d6247e64db0c185f2a08e1bd7101ca4 (diff)
downloadlibrsync-32d824c77e6c534c78eafdf28251d5036309c8c8.tar.gz
Add functions for getting and consuming contigous buffers from the scoop.
Add static inline fuctions to stream.h for getting and iterating through contiguous data buffers from the scoop. In tube.c remove rs_tube_copy_from_scoop() and rs_tube_copy_from_stream() and just make rs_tube_catchup_copy() iterate through contiguous buffers from the scoop. In rs_tube_catchup() use rs_scoop_eof() to check for eof instead of checking the scoop and stream directly. Remove undefined/unused rs_buffers_copy() from steam.h.
-rw-r--r--src/stream.h84
-rw-r--r--src/tube.c89
2 files changed, 107 insertions, 66 deletions
diff --git a/src/stream.h b/src/stream.h
index e5f6a44..f4c330e 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -76,8 +76,7 @@
#ifndef STREAM_H
# define STREAM_H
# include "job.h"
-
-size_t rs_buffers_copy(rs_buffers_t *stream, size_t len);
+# include <sys/types.h>
rs_result rs_tube_catchup(rs_job_t *job);
int rs_tube_is_idle(rs_job_t const *job);
@@ -101,4 +100,85 @@ static inline bool rs_scoop_eof(rs_job_t *job)
return !rs_scoop_avail(job) && job->stream->eof_in;
}
+/** Get a pointer to the next input in the scoop. */
+static inline void *rs_scoop_buf(rs_job_t *job)
+{
+ return job->scoop_avail ? (void *)job->scoop_next : (void *)job->
+ stream->next_in;
+}
+
+/** Get the contiguous length of the next input in the scoop. */
+static inline size_t rs_scoop_len(rs_job_t *job)
+{
+ return job->scoop_avail ? job->scoop_avail : job->stream->avail_in;
+}
+
+/** Get the next contiguous buffer of data available in the scoop.
+ *
+ * This will return a pointer to the data and reduce len to the amount of
+ * contiguous data available at that position.
+ *
+ * \param *job - the job instance to use.
+ *
+ * \param *len - the amount of data desired, updated to the amount available.
+ *
+ * \return A pointer to the data. */
+static inline void *rs_scoop_getbuf(rs_job_t *job, size_t *len)
+{
+ size_t max_len = rs_scoop_len(job);
+ if (*len > max_len)
+ *len = max_len;
+ return rs_scoop_buf(job);
+}
+
+/** Iterate through and consume contiguous data buffers in the scoop.
+ *
+ * Example: \code
+ * size_t len=rs_scoop_avail(job);
+ * ssize_t ilen;
+ *
+ * for (buf = rs_scoop_iterbuf(job, &len, &ilen); ilen > 0;
+ * buf = rs_scoop_nextbuf(job, &len, &ilen))
+ * ilen = fwrite(buf, ilen, 1, f);
+ * \endcode
+ *
+ * At each iteration buf and ilen are the data and its length for the current
+ * iteration. During an iteration you can change ilen to indicate only part of
+ * the buffer was processed and the next iteration will take this into account.
+ * Setting ilen <= 0 to indicate blocking or errors will terminate iteration.
+ * The pos and len are updated to the remaining data to iterate through,
+ * including the current iteration.
+ *
+ * At the end of iteration buf and pos will point at the next location in the
+ * scoop after the iterated data, len and ilen will be zero, or the remaining
+ * data and last ilen if iteration was terminated by setting ilen <= 0.
+ *
+ * \param *job - the job instance to use.
+ *
+ * \param *len - the size_t amount of data to iterate over.
+ *
+ * \param *ilen - the ssize_t amount of data in the current iteration.
+ *
+ * \return A pointer to data in the current iteration. */
+static inline void *rs_scoop_iterbuf(rs_job_t *job, size_t *len,
+ ssize_t *ilen)
+{
+ *ilen = *len;
+ return rs_scoop_getbuf(job, (size_t *)ilen);
+}
+
+/** Get the next iteration of contiguous data buffers from the scoop.
+ *
+ * This advances the scoop for the previous iteration, and gets the next
+ * iteration. \sa rs_scoop_iterbuf */
+static inline void *rs_scoop_nextbuf(rs_job_t *job, size_t *len,
+ ssize_t *ilen)
+{
+ if (*ilen <= 0)
+ return rs_scoop_buf(job);
+ rs_scoop_advance(job, *ilen);
+ *len -= *ilen;
+ return rs_scoop_iterbuf(job, len, ilen);
+}
+
#endif /* !STREAM_H */
diff --git a/src/tube.c b/src/tube.c
index a6672d4..00c936b 100644
--- a/src/tube.c
+++ b/src/tube.c
@@ -80,75 +80,37 @@ static void rs_tube_catchup_write(rs_job_t *job)
len, job->write_len);
}
-/** Execute a copy command, taking data from the scoop.
- *
- * \sa rs_tube_catchup_copy() */
-static void rs_tube_copy_from_scoop(rs_job_t *job)
-{
- rs_buffers_t *stream = job->stream;
- size_t len = job->copy_len;
-
- assert(len > 0);
- if (len > job->scoop_avail)
- len = job->scoop_avail;
- if (len > stream->avail_out)
- len = stream->avail_out;
- if (len) {
- memcpy(stream->next_out, job->scoop_next, len);
- stream->next_out += len;
- stream->avail_out -= len;
- job->scoop_avail -= len;
- job->scoop_next += len;
- job->copy_len -= len;
- }
- rs_trace("copied " FMT_SIZE " bytes from scoop, " FMT_SIZE
- " left in scoop, " FMT_SIZE " left to copy", len, job->scoop_avail,
- job->copy_len);
-}
-
-/** Execute a copy command, taking data from the stream.
- *
- * \sa rs_tube_catchup_copy() */
-static void rs_tube_copy_from_stream(rs_job_t *job)
-{
- rs_buffers_t *stream = job->stream;
- size_t len = job->copy_len;
-
- assert(len > 0);
- if (len > stream->avail_in)
- len = stream->avail_in;
- if (len > stream->avail_out)
- len = stream->avail_out;
- if (len) {
- memcpy(stream->next_out, stream->next_in, len);
- stream->next_out += len;
- stream->avail_out -= len;
- stream->next_in += len;
- stream->avail_in -= len;
- job->copy_len -= len;
- }
- rs_trace("copied " FMT_SIZE " bytes from stream, " FMT_SIZE
- "left in stream, " FMT_SIZE " left to copy", len, stream->avail_in,
- job->copy_len);
-}
-
/** Catch up on an outstanding copy command.
*
- * Takes data from the scoop, and the input (in that order), and writes as much
- * as will fit to the output, up to the limit of the outstanding copy. */
+ * Takes data from the scoop and writes as much as will fit to the output, up
+ * to the limit of the outstanding copy. */
static void rs_tube_catchup_copy(rs_job_t *job)
{
assert(job->write_len == 0);
assert(job->copy_len > 0);
-
- /* If there's data in the scoop, send that first. */
- if (job->scoop_avail && job->copy_len) {
- rs_tube_copy_from_scoop(job);
- }
- /* If there's more to copy and we emptied the scoop, send input. */
- if (job->copy_len && !job->scoop_avail) {
- rs_tube_copy_from_stream(job);
+ rs_buffers_t *stream = job->stream;
+ size_t copy_len = job->copy_len;
+ size_t avail_in = rs_scoop_avail(job);
+ size_t avail_out = stream->avail_out;
+ size_t len;
+ ssize_t ilen;
+ void *next;
+
+ if (copy_len > avail_in)
+ copy_len = avail_in;
+ if (copy_len > avail_out)
+ copy_len = avail_out;
+ len = copy_len;
+ for (next = rs_scoop_iterbuf(job, &len, &ilen); ilen > 0;
+ next = rs_scoop_nextbuf(job, &len, &ilen)) {
+ memcpy(stream->next_out, next, ilen);
+ stream->next_out += ilen;
+ stream->avail_out -= ilen;
+ job->copy_len -= ilen;
}
+ rs_trace("copied " FMT_SIZE " bytes from scoop, " FMT_SIZE
+ " left in scoop, " FMT_SIZE " left to copy", copy_len,
+ rs_scoop_avail(job), job->copy_len);
}
/** Put whatever will fit from the tube into the output of the stream.
@@ -166,8 +128,7 @@ rs_result rs_tube_catchup(rs_job_t *job)
if (job->copy_len) {
rs_tube_catchup_copy(job);
if (job->copy_len) {
- if (job->stream->eof_in && !job->stream->avail_in
- && !job->scoop_avail) {
+ if (rs_scoop_eof(job)) {
rs_error("reached end of file while copying data");
return RS_INPUT_ENDED;
}