summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDonovan Baarda <abo@minkirri.apana.org.au>2021-09-16 11:37:18 +1000
committerGitHub <noreply@github.com>2021-09-16 11:37:18 +1000
commitd202e4eef208165aa7768ad15236e1cd1d6fae0c (patch)
tree7fe3f27a77873051d571b21880254fda9ab7e2cd
parentb8609e9fb1ca888c852f6a7cf5b96690098f94e1 (diff)
parentc50bd630dde1c62c56cac0e917570a8c13173e07 (diff)
downloadlibrsync-d202e4eef208165aa7768ad15236e1cd1d6fae0c.tar.gz
Merge pull request #234 from dbaarda/dev/scoop1
Make delta directly process the input stream if it has enough data.
-rw-r--r--NEWS.md13
-rw-r--r--src/buf.c55
-rw-r--r--src/delta.c130
-rw-r--r--src/job.c7
-rw-r--r--src/job.h31
-rw-r--r--src/librsync.h16
-rw-r--r--src/mksum.c2
-rw-r--r--src/netint.c2
-rw-r--r--src/patch.c2
-rw-r--r--src/readsums.c2
-rw-r--r--src/scoop.c45
-rw-r--r--src/scoop.h (renamed from src/stream.h)117
-rw-r--r--src/tube.c90
-rw-r--r--src/whole.c11
14 files changed, 292 insertions, 231 deletions
diff --git a/NEWS.md b/NEWS.md
index 1c72ab1..3dc9476 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -4,6 +4,19 @@
NOT RELEASED YET
+ * Make delta directly process the input stream if it has enough data. Delta
+ operations will only accumulate data into the internal scoop buffer if the
+ input buffer is too small, otherwise it will process the input directly.
+ This makes delta calculations 5%~15% faster by avoiding extra data copying.
+ (dbaarda, https://github.com/librsync/librsync/pull/234)
+
+ * Add .gitignore for `.cmake` created by LSP on Windows. (sourcefrog,
+ https://github.com/librsync/librsync/pull/232)
+
+ * Improve documentation so that Doxygen generates more complete documentation
+ with diagrams, renders better, and is more navigable as markdown docs on
+ GitHub. (dbaarda, https://github.com/librsync/librsync/pull/230)
+
* Add github action and make targets for clang-tidy and iwyu. Added
`clang-tidy` and `iwyu` make targets for checking code and includes, and
`iwyu-fix` for fixing includes. Added `lint.yml` GitHub action to run these
diff --git a/src/buf.c b/src/buf.c
index 1e7a69d..5885d56 100644
--- a/src/buf.c
+++ b/src/buf.c
@@ -65,44 +65,34 @@ rs_result rs_infilebuf_fill(rs_job_t *job, rs_buffers_t *buf, void *opaque)
rs_filebuf_t *fb = (rs_filebuf_t *)opaque;
FILE *f = fb->f;
- /* This is only allowed if either the buf has no input buffer yet, or that
- buffer could possibly be BUF. */
- if (buf->next_in != NULL) {
- assert(buf->avail_in <= fb->buf_len);
+ /* If buf has data, it must be in the buffer. */
+ if (buf->avail_in) {
assert(buf->next_in >= fb->buf);
- assert(buf->next_in <= fb->buf + fb->buf_len);
- } else {
- assert(buf->avail_in == 0);
+ assert(buf->next_in + buf->avail_in <= fb->buf + fb->buf_len);
}
-
- if (buf->eof_in || (buf->eof_in = feof(f))) {
- rs_trace("seen end of file on input");
+ if (buf->eof_in) {
return RS_DONE;
- }
-
- if (buf->avail_in)
- /* Still some data remaining. Perhaps we should read anyhow? */
+ } else if (buf->avail_in > fb->buf_len / 2) {
+ /* Buf is already full enough, do nothing. */
return RS_DONE;
-
- len = fread(fb->buf, 1, fb->buf_len, f);
+ } else if (buf->avail_in) {
+ /* Some leftover tail data, move it to the front of the buffer. */
+ rs_trace("moving buffer " FMT_SIZE " bytes to reuse " FMT_SIZE " bytes",
+ buf->avail_in, (size_t)(buf->next_in - fb->buf));
+ memmove(fb->buf, buf->next_in, buf->avail_in);
+ }
+ buf->next_in = fb->buf;
+ len = fread(fb->buf + buf->avail_in, 1, fb->buf_len - buf->avail_in, f);
if (len == 0) {
- /* This will happen if file size is a multiple of input block len */
- if (feof(f)) {
+ if ((buf->eof_in = feof(f))) {
rs_trace("seen end of file on input");
- buf->eof_in = 1;
return RS_DONE;
- }
- if (ferror(f)) {
- rs_error("error filling buf from file: %s", strerror(errno));
- return RS_IO_ERROR;
} else {
- rs_error("no error bit, but got " FMT_SIZE
- " return when trying to read", len);
+ rs_error("error filling buf from file: %s", strerror(errno));
return RS_IO_ERROR;
}
}
- buf->avail_in = len;
- buf->next_in = fb->buf;
+ buf->avail_in += len;
job->stats.in_bytes += len;
return RS_DONE;
}
@@ -114,18 +104,15 @@ rs_result rs_outfilebuf_drain(rs_job_t *job, rs_buffers_t *buf, void *opaque)
rs_filebuf_t *fb = (rs_filebuf_t *)opaque;
FILE *f = fb->f;
- /* This is only allowed if either the buf has no output buffer yet, or that
- buffer could possibly be BUF. */
- if (buf->next_out == NULL) {
+ /* If next_out is NULL, we haven't pointed it at fb->buf yet. */
+ if (!buf->next_out) {
assert(buf->avail_out == 0);
buf->next_out = fb->buf;
buf->avail_out = fb->buf_len;
- return RS_DONE;
}
-
- assert(buf->avail_out <= fb->buf_len);
+ /* The buf output buffer must be at the end of fb->buf. */
assert(buf->next_out >= fb->buf);
- assert(buf->next_out <= fb->buf + fb->buf_len);
+ assert(buf->next_out + buf->avail_out == fb->buf + fb->buf_len);
size_t present = buf->next_out - fb->buf;
if (present > 0) {
diff --git a/src/delta.c b/src/delta.c
index ed09ca7..2d7fdbc 100644
--- a/src/delta.c
+++ b/src/delta.c
@@ -63,9 +63,9 @@
* this happens.
*
* In pysync a 'last' attribute is used to hold the last miss or match for
- * extending if possible. In this code, basis_len and scoop_pos are used
- * instead of 'last'. When basis_len > 0, last is a match. When basis_len = 0
- * and scoop_pos is > 0, last is a miss. When both are 0, last is None (ie,
+ * extending if possible. In this code, basis_len and scan_pos are used instead
+ * of 'last'. When basis_len > 0, last is a match. When basis_len = 0 and
+ * scan_pos is > 0, last is a miss. When both are 0, last is None (ie,
* nothing).
*
* Pysync is also slightly different in that a 'flush' method is available to
@@ -82,10 +82,10 @@
* data consumes it off the input scoop and outputs the processed miss data
* into the tube.
*
- * The scoop contains all data yet to be processed. The scoop_pos is an index
+ * The scoop contains all data yet to be processed. The scan_pos is an index
* into the scoop that indicates the point scanned to. As data is scanned,
- * scoop_pos is incremented. As data is processed, it is removed from the scoop
- * and scoop_pos adjusted. Everything gets complicated because the tube can
+ * scan_pos is incremented. As data is processed, it is removed from the scoop
+ * and scan_pos adjusted. Everything gets complicated because the tube can
* block. When the tube is blocked, no data can be processed. */
#include <assert.h>
@@ -94,14 +94,17 @@
#include "job.h"
#include "sumset.h"
#include "checksum.h"
-#include "stream.h"
+#include "scoop.h"
#include "emit.h"
#include "trace.h"
+/** Max length of a miss is 64K including 3 command bytes. */
+#define MAX_MISS_LEN (MAX_DELTA_CMD - 3)
+
static rs_result rs_delta_s_scan(rs_job_t *job);
static rs_result rs_delta_s_flush(rs_job_t *job);
static rs_result rs_delta_s_end(rs_job_t *job);
-static inline void rs_getinput(rs_job_t *job);
+static inline rs_result rs_getinput(rs_job_t *job, size_t block_len);
static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
size_t *match_len);
static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos,
@@ -123,13 +126,14 @@ static rs_result rs_delta_s_scan(rs_job_t *job)
rs_result result;
rs_job_check(job);
- /* read the input into the scoop */
- rs_getinput(job);
/* output any pending output from the tube */
- result = rs_tube_catchup(job);
+ if ((result = rs_tube_catchup(job)) != RS_DONE)
+ return result;
+ /* read the input into the scoop */
+ if ((result = rs_getinput(job, block_len)) != RS_DONE)
+ return result;
/* while output is not blocked and there is a block of data */
- while ((result == RS_DONE)
- && ((job->scoop_pos + block_len) < job->scoop_avail)) {
+ while ((result == RS_DONE) && ((job->scan_pos + block_len) < job->scan_len)) {
/* check if this block matches */
if (rs_findmatch(job, &match_pos, &match_len)) {
/* append the match and reset the weak_sum */
@@ -137,8 +141,8 @@ static rs_result rs_delta_s_scan(rs_job_t *job)
weaksum_reset(&job->weak_sum);
} else {
/* rotate the weak_sum and append the miss byte */
- weaksum_rotate(&job->weak_sum, job->scoop_next[job->scoop_pos],
- job->scoop_next[job->scoop_pos + block_len]);
+ weaksum_rotate(&job->weak_sum, job->scan_buf[job->scan_pos],
+ job->scan_buf[job->scan_pos + block_len]);
result = rs_appendmiss(job, 1);
}
}
@@ -158,17 +162,20 @@ static rs_result rs_delta_s_scan(rs_job_t *job)
static rs_result rs_delta_s_flush(rs_job_t *job)
{
+ const size_t block_len = job->signature->block_len;
rs_long_t match_pos;
size_t match_len;
rs_result result;
rs_job_check(job);
+ /* output any pending output from the tube */
+ if ((result = rs_tube_catchup(job)) != RS_DONE)
+ return result;
/* read the input into the scoop */
- rs_getinput(job);
- /* output any pending output */
- result = rs_tube_catchup(job);
+ if ((result = rs_getinput(job, block_len)) != RS_DONE)
+ return result;
/* while output is not blocked and there is any remaining data */
- while ((result == RS_DONE) && (job->scoop_pos < job->scoop_avail)) {
+ while ((result == RS_DONE) && (job->scan_pos < job->scan_len)) {
/* check if this block matches */
if (rs_findmatch(job, &match_pos, &match_len)) {
/* append the match and reset the weak_sum */
@@ -176,7 +183,7 @@ static rs_result rs_delta_s_flush(rs_job_t *job)
weaksum_reset(&job->weak_sum);
} else {
/* rollout from weak_sum and append the miss byte */
- weaksum_rollout(&job->weak_sum, job->scoop_next[job->scoop_pos]);
+ weaksum_rollout(&job->weak_sum, job->scan_buf[job->scan_pos]);
rs_trace("block reduced to " FMT_SIZE "",
weaksum_count(&job->weak_sum));
result = rs_appendmiss(job, 1);
@@ -199,25 +206,25 @@ static rs_result rs_delta_s_end(rs_job_t *job)
return RS_DONE;
}
-static inline void rs_getinput(rs_job_t *job)
+static inline rs_result rs_getinput(rs_job_t *job, size_t block_len)
{
- size_t len;
+ size_t min_len = block_len + MAX_DELTA_CMD;
- len = rs_scoop_total_avail(job);
- if (job->scoop_avail < len) {
- rs_scoop_input(job, len);
- }
+ job->scan_len = rs_scoop_avail(job);
+ if (job->scan_len < min_len && !job->stream->eof_in)
+ job->scan_len = min_len;
+ return rs_scoop_readahead(job, job->scan_len, (void **)&job->scan_buf);
}
-/** find a match at scoop_pos, returning the match_pos and match_len.
+/** find a match at scan_pos, returning the match_pos and match_len.
*
* Note that this will calculate weak_sum if required. It will also determine
* the match_len.
*
* This routine could be modified to do xdelta style matches that would extend
* matches past block boundaries by matching backwards and forwards beyond the
- * block boundaries. Extending backwards would require decrementing scoop_pos
- * as appropriate. */
+ * block boundaries. Extending backwards would require decrementing scan_pos as
+ * appropriate. */
static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
size_t *match_len)
{
@@ -226,12 +233,12 @@ static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
/* calculate the weak_sum if we don't have one */
if (weaksum_count(&job->weak_sum) == 0) {
/* set match_len to min(block_len, scan_avail) */
- *match_len = job->scoop_avail - job->scoop_pos;
+ *match_len = job->scan_len - job->scan_pos;
if (*match_len > block_len) {
*match_len = block_len;
}
/* Update the weak_sum */
- weaksum_update(&job->weak_sum, job->scoop_next + job->scoop_pos,
+ weaksum_update(&job->weak_sum, job->scan_buf + job->scan_pos,
*match_len);
rs_trace("calculate weak sum from scratch length " FMT_SIZE "",
weaksum_count(&job->weak_sum));
@@ -241,7 +248,7 @@ static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
}
*match_pos =
rs_signature_find_match(job->signature, weaksum_digest(&job->weak_sum),
- job->scoop_next + job->scoop_pos, *match_len);
+ job->scan_buf + job->scan_pos, *match_len);
return *match_pos != -1;
}
@@ -262,8 +269,8 @@ static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos,
job->basis_pos = match_pos;
job->basis_len = match_len;
}
- /* increment scoop_pos to point at next unscanned data */
- job->scoop_pos += match_len;
+ /* increment scan_pos to point at next unscanned data */
+ job->scan_pos += match_len;
/* we can only process from the scoop if output is not blocked */
if (result == RS_DONE) {
/* process the match data off the scoop */
@@ -279,15 +286,14 @@ static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos,
* in memory. */
static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len)
{
- const size_t max_miss = 32768; /* For 0.01% 3 command bytes overhead. */
rs_result result = RS_DONE;
- /* If last was a match, or max_miss misses, appendflush it. */
- if (job->basis_len || (job->scoop_pos >= max_miss)) {
+ /* If last was a match, or MAX_MISS_LEN misses, appendflush it. */
+ if (job->basis_len || (job->scan_pos >= MAX_MISS_LEN)) {
result = rs_appendflush(job);
}
- /* increment scoop_pos */
- job->scoop_pos += miss_len;
+ /* increment scan_pos */
+ job->scan_pos += miss_len;
return result;
}
@@ -302,9 +308,9 @@ static inline rs_result rs_appendflush(rs_job_t *job)
job->basis_len = 0;
return rs_processmatch(job);
/* else if last is a miss, emit and process it */
- } else if (job->scoop_pos) {
- rs_trace("got " FMT_SIZE " bytes of literal data", job->scoop_pos);
- rs_emit_literal_cmd(job, (int)job->scoop_pos);
+ } else if (job->scan_pos) {
+ rs_trace("got " FMT_SIZE " bytes of literal data", job->scan_pos);
+ rs_emit_literal_cmd(job, (int)job->scan_pos);
return rs_processmiss(job);
}
/* otherwise, nothing to flush so we are done */
@@ -313,40 +319,45 @@ static inline rs_result rs_appendflush(rs_job_t *job)
/** Process matching data in the scoop.
*
- * The scoop contains match data at scoop_next of length scoop_pos. This
- * function processes that match data, returning RS_DONE if it completes, or
- * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to
- * still point at the next unscanned data.
+ * The scoop contains match data at scan_buf of length scan_pos. This function
+ * processes that match data, returning RS_DONE if it completes, or RS_BLOCKED
+ * if it gets blocked. After it completes scan_pos is reset to still point at
+ * the next unscanned data.
*
* This function currently just removes data from the scoop and adjusts
- * scoop_pos appropriately. In the future this could be used for something like
+ * scan_pos appropriately. In the future this could be used for something like
* context compressing of miss data. Note that it also calls rs_tube_catchup to
* output any pending output. */
static inline rs_result rs_processmatch(rs_job_t *job)
{
- job->scoop_avail -= job->scoop_pos;
- job->scoop_next += job->scoop_pos;
- job->scoop_pos = 0;
+ assert(job->copy_len == 0);
+ rs_scoop_advance(job, job->scan_pos);
+ job->scan_buf += job->scan_pos;
+ job->scan_len -= job->scan_pos;
+ job->scan_pos = 0;
return rs_tube_catchup(job);
}
/** Process miss data in the scoop.
*
- * The scoop contains miss data at scoop_next of length scoop_pos. This
- * function processes that miss data, returning RS_DONE if it completes, or
- * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to
- * still point at the next unscanned data.
+ * The scoop contains miss data at scan_buf of length scan_pos. This function
+ * processes that miss data, returning RS_DONE if it completes, or RS_BLOCKED
+ * if it gets blocked. After it completes scan_pos is reset to still point at
+ * the next unscanned data.
*
* This function uses rs_tube_copy to queue copying from the scoop into output.
* and uses rs_tube_catchup to do the copying. This automaticly removes data
* from the scoop, but this can block. While rs_tube_catchup is blocked,
- * scoop_pos does not point at legit data, so scanning can also not proceed.
+ * scan_pos does not point at legit data, so scanning can also not proceed.
*
* In the future this could do compression of miss data before outputing it. */
static inline rs_result rs_processmiss(rs_job_t *job)
{
- rs_tube_copy(job, job->scoop_pos);
- job->scoop_pos = 0;
+ assert(job->write_len > 0);
+ rs_tube_copy(job, job->scan_pos);
+ job->scan_buf += job->scan_pos;
+ job->scan_len -= job->scan_pos;
+ job->scan_pos = 0;
return rs_tube_catchup(job);
}
@@ -354,15 +365,14 @@ static inline rs_result rs_processmiss(rs_job_t *job)
* recreate the input. */
static rs_result rs_delta_s_slack(rs_job_t *job)
{
- rs_buffers_t *const stream = job->stream;
- size_t avail = stream->avail_in;
+ size_t avail = rs_scoop_avail(job);
if (avail) {
rs_trace("emit slack delta for " FMT_SIZE " available bytes", avail);
rs_emit_literal_cmd(job, (int)avail);
rs_tube_copy(job, avail);
return RS_RUNNING;
- } else if (rs_job_input_is_ending(job)) {
+ } else if (rs_scoop_eof(job)) {
job->statefn = rs_delta_s_end;
return RS_RUNNING;
}
diff --git a/src/job.c b/src/job.c
index 84b72f4..1826b0f 100644
--- a/src/job.c
+++ b/src/job.c
@@ -30,7 +30,7 @@
#include <time.h>
#include "librsync.h"
#include "job.h"
-#include "stream.h"
+#include "scoop.h"
#include "trace.h"
#include "util.h"
@@ -135,11 +135,6 @@ const rs_stats_t *rs_job_statistics(rs_job_t *job)
return &job->stats;
}
-int rs_job_input_is_ending(rs_job_t *job)
-{
- return job->stream->eof_in;
-}
-
rs_result rs_job_drive(rs_job_t *job, rs_buffers_t *buf, rs_driven_cb in_cb,
void *in_opaque, rs_driven_cb out_cb, void *out_opaque)
{
diff --git a/src/job.h b/src/job.h
index e5c8ba5..fb22461 100644
--- a/src/job.h
+++ b/src/job.h
@@ -35,6 +35,14 @@
# include "checksum.h"
# include "librsync.h"
+/** Magic job tag number for checking jobs have been initialized. */
+# define RS_JOB_TAG 20010225
+
+/** Max length of a singled delta command is including command bytes.
+ *
+ * This is used to constrain and set the internal buffer sizes. */
+# define MAX_DELTA_CMD (1<<16)
+
/** The contents of this structure are private. */
struct rs_job {
int dogtag;
@@ -85,13 +93,17 @@ struct rs_job {
rs_stats_t stats;
/** Buffer of data in the scoop. Allocation is scoop_buf[0..scoop_alloc],
- * and scoop_next[0..scoop_avail] contains data yet to be processed.
- * scoop_next[scoop_pos..scoop_avail] is the data yet to be scanned. */
- rs_byte_t *scoop_buf; /* the allocation pointer */
- rs_byte_t *scoop_next; /* the data pointer */
- size_t scoop_alloc; /* the allocation size */
- size_t scoop_avail; /* the data size */
- size_t scoop_pos; /* the scan position */
+ * and scoop_next[0..scoop_avail] contains data yet to be processed. */
+ rs_byte_t *scoop_buf; /**< The buffer allocation pointer. */
+ rs_byte_t *scoop_next; /**< The next data pointer. */
+ size_t scoop_alloc; /**< The buffer allocation size. */
+ size_t scoop_avail; /**< The amount of data available. */
+
+ /** The delta scan buffer, where scan_buf[scan_pos..scan_len] is the data
+ * yet to be scanned. */
+ rs_byte_t *scan_buf; /**< The delta scan buffer pointer. */
+ size_t scan_len; /**< The delta scan buffer length. */
+ size_t scan_pos; /**< The delta scan position. */
/** If USED is >0, then buf contains that much write data to be sent out. */
rs_byte_t write_buf[36];
@@ -111,11 +123,6 @@ struct rs_job {
rs_job_t *rs_job_new(const char *, rs_result (*statefn)(rs_job_t *));
-int rs_job_input_is_ending(rs_job_t *job);
-
-/** Magic job tag number for checking jobs have been initialized. */
-# define RS_JOB_TAG 20010225
-
/** Assert that a job is valid.
*
* We don't use a static inline function here so that assert failure output
diff --git a/src/librsync.h b/src/librsync.h
index 83fb27d..fec8fed 100644
--- a/src/librsync.h
+++ b/src/librsync.h
@@ -312,11 +312,17 @@ LIBRSYNC_EXPORT void rs_sumset_dump(rs_signature_t const *);
* entry, and suitable to be passed in on a second call, but they don't
* directly tell you how much output data was produced.
*
- * Note also that if *#avail_in is nonzero on return, then not all of the input
- * data has been consumed. The caller should either provide more output buffer
- * space and call ::rs_job_iter() again passing the same #next_in and
- * #avail_in, or put the remaining input data into some persistent buffer and
- * call rs_job_iter() with it again when there is more output space.
+ * If the input buffer was large enough, it will be processed directly,
+ * otherwise the data can be copied and accumulated into an internal buffer for
+ * processing. This means using larger buffers can be much more efficient.
+ *
+ * Note also that if #avail_in is nonzero on return, then not all of the input
+ * data has been consumed. This can happen either because it ran out of output
+ * buffer space, or because it processed as much data as possible directly from
+ * the input buffer and needs more input to proceed without copying into
+ * internal buffers. The caller should provide more output buffer space and/or
+ * pack the remaining input data into another buffer with more input before
+ * calling rs_job_iter() again.
*
* \sa rs_job_iter() */
struct rs_buffers_s {
diff --git a/src/mksum.c b/src/mksum.c
index b0fcb2c..5982bb1 100644
--- a/src/mksum.c
+++ b/src/mksum.c
@@ -31,7 +31,7 @@
#include "librsync.h"
#include "job.h"
#include "sumset.h"
-#include "stream.h"
+#include "scoop.h"
#include "netint.h"
#include "trace.h"
#include "util.h"
diff --git a/src/netint.c b/src/netint.c
index e533b19..7d38071 100644
--- a/src/netint.c
+++ b/src/netint.c
@@ -30,7 +30,7 @@
#include <assert.h>
#include "librsync.h"
#include "netint.h"
-#include "stream.h"
+#include "scoop.h"
#define RS_MAX_INT_BYTES 8
diff --git a/src/patch.c b/src/patch.c
index 5fae2bb..e288d7d 100644
--- a/src/patch.c
+++ b/src/patch.c
@@ -33,7 +33,7 @@
#include "librsync.h"
#include "job.h"
#include "netint.h"
-#include "stream.h"
+#include "scoop.h"
#include "command.h"
#include "prototab.h"
#include "trace.h"
diff --git a/src/readsums.c b/src/readsums.c
index dd32416..4f7cd62 100644
--- a/src/readsums.c
+++ b/src/readsums.c
@@ -26,7 +26,7 @@
#include "librsync.h"
#include "job.h"
#include "sumset.h"
-#include "stream.h"
+#include "scoop.h"
#include "netint.h"
#include "trace.h"
#include "util.h"
diff --git a/src/scoop.c b/src/scoop.c
index d883693..6ac3999 100644
--- a/src/scoop.c
+++ b/src/scoop.c
@@ -28,23 +28,22 @@
/** \file scoop.c
* This file deals with readahead from caller-supplied buffers.
*
- * Many functions require a certain minimum amount of input to do their
- * processing. For example, to calculate a strong checksum of a block we need
- * at least a block of input.
+ * Many functions require a certain minimum amount of contiguous input data to
+ * do their processing. For example, to calculate a strong checksum of a block
+ * we need at least a block of input.
*
* Since we put the buffers completely under the control of the caller, we
* can't count on ever getting this much data all in one go. We can't simply
* wait, because the caller might have a smaller buffer than we require and so
- * we'll never get it. For the same reason we must always accept all the data
- * we're given.
+ * we'll never get it.
*
- * So, stream input data that's required for readahead is put into a special
- * buffer, from which the caller can then read. It's essentially like an
- * internal pipe, which on any given read request may or may not be able to
- * actually supply the data.
- *
- * As a future optimization, we might try to take data directly from the input
- * buffer if there's already enough there.
+ * Stream input data is used directly if there is sufficient data to satisfy
+ * the readhead requests, otherwise it is copied and accumulated into an
+ * internal buffer until there is enough. This means for large input buffers we
+ * can leave a "tail" of unprocessed data in the input buffer, and only consume
+ * all the data if it was too small and start accumulating into the internal
+ * buffer. Provided the input buffers always have enough data we avoid copying
+ * into the internal buffer at all.
*
* \todo We probably know a maximum amount of data that can be scooped up, so
* we could just avoid dynamic allocation. However that can't be fixed at
@@ -58,12 +57,12 @@
#include <string.h>
#include "librsync.h"
#include "job.h"
-#include "stream.h"
+#include "scoop.h"
#include "trace.h"
#include "util.h"
/** Try to accept a from the input buffer to get LEN bytes in the scoop. */
-void rs_scoop_input(rs_job_t *job, size_t len)
+static inline void rs_scoop_input(rs_job_t *job, size_t len)
{
rs_buffers_t *stream = job->stream;
size_t tocopy;
@@ -84,7 +83,7 @@ void rs_scoop_input(rs_job_t *job, size_t len)
rs_trace("resized scoop buffer to " FMT_SIZE " bytes from " FMT_SIZE "",
newsize, job->scoop_alloc);
job->scoop_alloc = newsize;
- } else if (job->scoop_buf != job->scoop_next) {
+ } else if (job->scoop_buf + job->scoop_alloc < job->scoop_next + len) {
/* Move existing data to the front of the scoop. */
rs_trace("moving scoop " FMT_SIZE " bytes to reuse " FMT_SIZE " bytes",
job->scoop_avail, (size_t)(job->scoop_next - job->scoop_buf));
@@ -96,7 +95,8 @@ void rs_scoop_input(rs_job_t *job, size_t len)
tocopy = len - job->scoop_avail;
if (tocopy > stream->avail_in)
tocopy = stream->avail_in;
- assert(tocopy + job->scoop_avail <= job->scoop_alloc);
+ assert(job->scoop_next + tocopy + job->scoop_avail <=
+ job->scoop_buf + job->scoop_alloc);
memcpy(job->scoop_next + job->scoop_avail, stream->next_in, tocopy);
rs_trace("accepted " FMT_SIZE " bytes from input to scoop", tocopy);
@@ -210,20 +210,11 @@ rs_result rs_scoop_read(rs_job_t *job, size_t len, void **ptr)
* at EOF, RS_BLOCKED if there was no data and not at EOF. */
rs_result rs_scoop_read_rest(rs_job_t *job, size_t *len, void **ptr)
{
- rs_buffers_t *stream = job->stream;
-
- *len = job->scoop_avail + stream->avail_in;
+ *len = rs_scoop_avail(job);
if (*len)
return rs_scoop_read(job, *len, ptr);
- else if (stream->eof_in)
+ else if (job->stream->eof_in)
return RS_INPUT_ENDED;
else
return RS_BLOCKED;
}
-
-/** Return the total number of bytes available including the scoop and input
- * buffer. */
-size_t rs_scoop_total_avail(rs_job_t *job)
-{
- return job->scoop_avail + job->stream->avail_in;
-}
diff --git a/src/stream.h b/src/scoop.h
index be6e286..4d4fde5 100644
--- a/src/stream.h
+++ b/src/scoop.h
@@ -27,7 +27,7 @@
| And sons who died on the Burma Railway.
*/
-/** \file stream.h
+/** \file scoop.h
* Manage librsync streams of IO.
*
* See \sa scoop.c and \sa tube.c for related code for input and output
@@ -52,12 +52,13 @@
*
* In buf.c you will find functions that then map buffers onto stdio files.
*
- * So on return from an encoding function, either the input or the output or
- * possibly both will have no more bytes available.
+ * On return from an encoding function, some input will have been consumed
+ * and/or some output produced, but either the input or the output or possibly
+ * both could have some remaining bytes available.
*
- * librsync never does IO or memory allocation, but relies on the caller. This
- * is very nice for integration, but means that we have to be fairly flexible
- * as to when we can `read' or `write' stuff internally.
+ * librsync never does IO or memory allocation for stream input, but relies on
+ * the caller. This is very nice for integration, but means that we have to be
+ * fairly flexible as to when we can `read' or `write' stuff internally.
*
* librsync basically does two types of IO. It reads network integers of
* various lengths which encode command and control information such as
@@ -73,21 +74,111 @@
* On each call into a stream iterator, it should begin by trying to flush
* output. This may well use up all the remaining stream space, in which case
* nothing else can be done. */
-#ifndef STREAM_H
-# define STREAM_H
-
-size_t rs_buffers_copy(rs_buffers_t *stream, size_t len);
+#ifndef SCOOP_H
+# define SCOOP_H
+# include <stdbool.h>
+# include <stddef.h>
+# include "job.h"
+# include "librsync.h"
rs_result rs_tube_catchup(rs_job_t *job);
int rs_tube_is_idle(rs_job_t const *job);
void rs_tube_write(rs_job_t *job, void const *buf, size_t len);
void rs_tube_copy(rs_job_t *job, size_t len);
-void rs_scoop_input(rs_job_t *job, size_t len);
void rs_scoop_advance(rs_job_t *job, size_t len);
rs_result rs_scoop_readahead(rs_job_t *job, size_t len, void **ptr);
rs_result rs_scoop_read(rs_job_t *job, size_t len, void **ptr);
rs_result rs_scoop_read_rest(rs_job_t *job, size_t *len, void **ptr);
-size_t rs_scoop_total_avail(rs_job_t *job);
-#endif /* !STREAM_H */
+static inline size_t rs_scoop_avail(rs_job_t *job)
+{
+ return job->scoop_avail + job->stream->avail_in;
+}
+
+/** Test if the scoop has reached eof. */
+static inline bool rs_scoop_eof(rs_job_t *job)
+{
+ return !rs_scoop_avail(job) && job->stream->eof_in;
+}
+
+/** Get a pointer to the next input in the scoop. */
+static inline void *rs_scoop_buf(rs_job_t *job)
+{
+ return job->scoop_avail ? (void *)job->scoop_next : (void *)job->
+ stream->next_in;
+}
+
+/** Get the contiguous length of the next input in the scoop. */
+static inline size_t rs_scoop_len(rs_job_t *job)
+{
+ return job->scoop_avail ? job->scoop_avail : job->stream->avail_in;
+}
+
+/** Get the next contiguous buffer of data available in the scoop.
+ *
+ * This will return a pointer to the data and reduce len to the amount of
+ * contiguous data available at that position.
+ *
+ * \param *job - the job instance to use.
+ *
+ * \param *len - the amount of data desired, updated to the amount available.
+ *
+ * \return A pointer to the data. */
+static inline void *rs_scoop_getbuf(rs_job_t *job, size_t *len)
+{
+ size_t max_len = rs_scoop_len(job);
+ if (*len > max_len)
+ *len = max_len;
+ return rs_scoop_buf(job);
+}
+
+/** Iterate through and consume contiguous data buffers in the scoop.
+ *
+ * Example: \code
+ * size_t len=rs_scoop_avail(job);
+ * size_t ilen;
+ *
+ * for (buf = rs_scoop_iterbuf(job, &len, &ilen); ilen > 0;
+ * buf = rs_scoop_nextbuf(job, &len, &ilen))
+ * ilen = fwrite(buf, ilen, 1, f);
+ * \endcode
+ *
+ * At each iteration buf and ilen are the data and its length for the current
+ * iteration, and len is the remaining data to iterate through including the
+ * current iteration. During an iteration you can change ilen to indicate only
+ * part of the buffer was processed and the next iteration will take this into
+ * account. Setting ilen = 0 to indicate blocking or errors will terminate
+ * iteration.
+ *
+ * At the end of iteration buf will point at the next location in the scoop
+ * after the iterated data, len and ilen will be zero, or the remaining data
+ * and last ilen if iteration was terminated by setting ilen = 0.
+ *
+ * \param *job - the job instance to use.
+ *
+ * \param *len - the size_t amount of data to iterate over.
+ *
+ * \param *ilen - the size_t amount of data in the current iteration.
+ *
+ * \return A pointer to data in the current iteration. */
+static inline void *rs_scoop_iterbuf(rs_job_t *job, size_t *len, size_t *ilen)
+{
+ *ilen = *len;
+ return rs_scoop_getbuf(job, ilen);
+}
+
+/** Get the next iteration of contiguous data buffers from the scoop.
+ *
+ * This advances the scoop for the previous iteration, and gets the next
+ * iteration. \sa rs_scoop_iterbuf */
+static inline void *rs_scoop_nextbuf(rs_job_t *job, size_t *len, size_t *ilen)
+{
+ if (*ilen == 0)
+ return rs_scoop_buf(job);
+ rs_scoop_advance(job, *ilen);
+ *len -= *ilen;
+ return rs_scoop_iterbuf(job, len, ilen);
+}
+
+#endif /* !SCOOP_H */
diff --git a/src/tube.c b/src/tube.c
index a6672d4..8faaec5 100644
--- a/src/tube.c
+++ b/src/tube.c
@@ -56,7 +56,7 @@
#include <string.h>
#include "librsync.h"
#include "job.h"
-#include "stream.h"
+#include "scoop.h"
#include "trace.h"
static void rs_tube_catchup_write(rs_job_t *job)
@@ -80,75 +80,36 @@ static void rs_tube_catchup_write(rs_job_t *job)
len, job->write_len);
}
-/** Execute a copy command, taking data from the scoop.
- *
- * \sa rs_tube_catchup_copy() */
-static void rs_tube_copy_from_scoop(rs_job_t *job)
-{
- rs_buffers_t *stream = job->stream;
- size_t len = job->copy_len;
-
- assert(len > 0);
- if (len > job->scoop_avail)
- len = job->scoop_avail;
- if (len > stream->avail_out)
- len = stream->avail_out;
- if (len) {
- memcpy(stream->next_out, job->scoop_next, len);
- stream->next_out += len;
- stream->avail_out -= len;
- job->scoop_avail -= len;
- job->scoop_next += len;
- job->copy_len -= len;
- }
- rs_trace("copied " FMT_SIZE " bytes from scoop, " FMT_SIZE
- " left in scoop, " FMT_SIZE " left to copy", len, job->scoop_avail,
- job->copy_len);
-}
-
-/** Execute a copy command, taking data from the stream.
- *
- * \sa rs_tube_catchup_copy() */
-static void rs_tube_copy_from_stream(rs_job_t *job)
-{
- rs_buffers_t *stream = job->stream;
- size_t len = job->copy_len;
-
- assert(len > 0);
- if (len > stream->avail_in)
- len = stream->avail_in;
- if (len > stream->avail_out)
- len = stream->avail_out;
- if (len) {
- memcpy(stream->next_out, stream->next_in, len);
- stream->next_out += len;
- stream->avail_out -= len;
- stream->next_in += len;
- stream->avail_in -= len;
- job->copy_len -= len;
- }
- rs_trace("copied " FMT_SIZE " bytes from stream, " FMT_SIZE
- "left in stream, " FMT_SIZE " left to copy", len, stream->avail_in,
- job->copy_len);
-}
-
/** Catch up on an outstanding copy command.
*
- * Takes data from the scoop, and the input (in that order), and writes as much
- * as will fit to the output, up to the limit of the outstanding copy. */
+ * Takes data from the scoop and writes as much as will fit to the output, up
+ * to the limit of the outstanding copy. */
static void rs_tube_catchup_copy(rs_job_t *job)
{
assert(job->write_len == 0);
assert(job->copy_len > 0);
-
- /* If there's data in the scoop, send that first. */
- if (job->scoop_avail && job->copy_len) {
- rs_tube_copy_from_scoop(job);
- }
- /* If there's more to copy and we emptied the scoop, send input. */
- if (job->copy_len && !job->scoop_avail) {
- rs_tube_copy_from_stream(job);
+ rs_buffers_t *stream = job->stream;
+ size_t copy_len = job->copy_len;
+ size_t avail_in = rs_scoop_avail(job);
+ size_t avail_out = stream->avail_out;
+ size_t len, ilen;
+ void *next;
+
+ if (copy_len > avail_in)
+ copy_len = avail_in;
+ if (copy_len > avail_out)
+ copy_len = avail_out;
+ len = copy_len;
+ for (next = rs_scoop_iterbuf(job, &len, &ilen); ilen > 0;
+ next = rs_scoop_nextbuf(job, &len, &ilen)) {
+ memcpy(stream->next_out, next, ilen);
+ stream->next_out += ilen;
+ stream->avail_out -= ilen;
+ job->copy_len -= ilen;
}
+ rs_trace("copied " FMT_SIZE " bytes from scoop, " FMT_SIZE
+ " left in scoop, " FMT_SIZE " left to copy", copy_len,
+ rs_scoop_avail(job), job->copy_len);
}
/** Put whatever will fit from the tube into the output of the stream.
@@ -166,8 +127,7 @@ rs_result rs_tube_catchup(rs_job_t *job)
if (job->copy_len) {
rs_tube_catchup_copy(job);
if (job->copy_len) {
- if (job->stream->eof_in && !job->stream->avail_in
- && !job->scoop_avail) {
+ if (rs_scoop_eof(job)) {
rs_error("reached end of file while copying data");
return RS_INPUT_ENDED;
}
diff --git a/src/whole.c b/src/whole.c
index 04777f8..e667f88 100644
--- a/src/whole.c
+++ b/src/whole.c
@@ -111,9 +111,9 @@ rs_result rs_delta_file(rs_signature_t *sig, FILE *new_file, FILE *delta_file,
rs_result r;
job = rs_delta_begin(sig);
- /* Size inbuf for 1 block, outbuf for literal cmd + 4 blocks. */
- r = rs_whole_run(job, new_file, delta_file, sig->block_len,
- 10 + 4 * sig->block_len);
+ /* Size inbuf for 4*(CMD + 1 block), outbuf for 4*CMD. */
+ r = rs_whole_run(job, new_file, delta_file,
+ 4 * (MAX_DELTA_CMD + sig->block_len), 4 * MAX_DELTA_CMD);
if (stats)
memcpy(stats, &job->stats, sizeof *stats);
rs_job_free(job);
@@ -127,8 +127,9 @@ rs_result rs_patch_file(FILE *basis_file, FILE *delta_file, FILE *new_file,
rs_result r;
job = rs_patch_begin(rs_file_copy_cb, basis_file);
- /* Default size inbuf and outbuf 64K. */
- r = rs_whole_run(job, delta_file, new_file, 64 * 1024, 64 * 1024);
+ /* Default size inbuf 1*CMD and outbuf 4*CMD. */
+ r = rs_whole_run(job, delta_file, new_file, MAX_DELTA_CMD,
+ 4 * MAX_DELTA_CMD);
if (stats)
memcpy(stats, &job->stats, sizeof *stats);
rs_job_free(job);