summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDonovan Baarda <abo@minkirri.apana.org.au>2021-09-14 13:09:22 +1000
committerDonovan Baarda <abo@minkirri.apana.org.au>2021-09-14 13:09:22 +1000
commitf330d5e50c904fe1f732471bc52c2b2af4bbd818 (patch)
treec64aedb76cb69460ec8d5825ff7f5b2f578379eb
parent32d824c77e6c534c78eafdf28251d5036309c8c8 (diff)
downloadlibrsync-f330d5e50c904fe1f732471bc52c2b2af4bbd818.tar.gz
Make delta directly process the input stream if it has enough data.
This means we only accumulate data into the scoop buffer if the input stream is too small, otherwise we process directly from the input stream. In job.h rename scoop_pos to scan_pos and add scan_buf and scan_len for pointing at the curren scan data, which can be either in the input stream or the scoop buffer. Also give the scoop and scan fields proper doxygen comments. In delta.c use scan_pos, scan_buf, and scan_len instead of scoop_pos, scoop_next, and scoop_avail respectively. Change rs_getinput() to return an rs_result_t and take the block_len as an argument, and have it set scan_buf and scan_len using rs_scoop_readhead() to get at least enough data to scan and emit a full miss literal command. Change rs_delta_s_scan() and rs_delta_s_flush() to do rs_tube_catchup() before rs_getinput() to consume any literal data off the scoop buffer before refilling it. Change the MAX_MISS_LEN to 64K - 3 cmd bytes from 32K. In whole.c change rs_delta_file() to use buffers large enough for 4x 64K literal commands, which is large enough to scan without copying into the scoop buffer.
-rw-r--r--src/delta.c123
-rw-r--r--src/job.h18
-rw-r--r--src/whole.c6
3 files changed, 81 insertions, 66 deletions
diff --git a/src/delta.c b/src/delta.c
index bee6663..fcfcf5a 100644
--- a/src/delta.c
+++ b/src/delta.c
@@ -63,9 +63,9 @@
* this happens.
*
* In pysync a 'last' attribute is used to hold the last miss or match for
- * extending if possible. In this code, basis_len and scoop_pos are used
- * instead of 'last'. When basis_len > 0, last is a match. When basis_len = 0
- * and scoop_pos is > 0, last is a miss. When both are 0, last is None (ie,
+ * extending if possible. In this code, basis_len and scan_pos are used instead
+ * of 'last'. When basis_len > 0, last is a match. When basis_len = 0 and
+ * scan_pos is > 0, last is a miss. When both are 0, last is None (ie,
* nothing).
*
* Pysync is also slightly different in that a 'flush' method is available to
@@ -82,10 +82,10 @@
* data consumes it off the input scoop and outputs the processed miss data
* into the tube.
*
- * The scoop contains all data yet to be processed. The scoop_pos is an index
+ * The scoop contains all data yet to be processed. The scan_pos is an index
* into the scoop that indicates the point scanned to. As data is scanned,
- * scoop_pos is incremented. As data is processed, it is removed from the scoop
- * and scoop_pos adjusted. Everything gets complicated because the tube can
+ * scan_pos is incremented. As data is processed, it is removed from the scoop
+ * and scan_pos adjusted. Everything gets complicated because the tube can
* block. When the tube is blocked, no data can be processed. */
#include <assert.h>
@@ -98,10 +98,13 @@
#include "emit.h"
#include "trace.h"
+/** Max length of a miss is 64K including 3 command bytes. */
+#define MAX_MISS_LEN ((1<<16)-3)
+
static rs_result rs_delta_s_scan(rs_job_t *job);
static rs_result rs_delta_s_flush(rs_job_t *job);
static rs_result rs_delta_s_end(rs_job_t *job);
-static inline void rs_getinput(rs_job_t *job);
+static inline rs_result rs_getinput(rs_job_t *job, size_t block_len);
static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
size_t *match_len);
static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos,
@@ -123,13 +126,14 @@ static rs_result rs_delta_s_scan(rs_job_t *job)
rs_result result;
rs_job_check(job);
- /* read the input into the scoop */
- rs_getinput(job);
/* output any pending output from the tube */
- result = rs_tube_catchup(job);
+ if ((result = rs_tube_catchup(job)) != RS_DONE)
+ return result;
+ /* read the input into the scoop */
+ if ((result = rs_getinput(job, block_len)) != RS_DONE)
+ return result;
/* while output is not blocked and there is a block of data */
- while ((result == RS_DONE)
- && ((job->scoop_pos + block_len) < job->scoop_avail)) {
+ while ((result == RS_DONE) && ((job->scan_pos + block_len) < job->scan_len)) {
/* check if this block matches */
if (rs_findmatch(job, &match_pos, &match_len)) {
/* append the match and reset the weak_sum */
@@ -137,8 +141,8 @@ static rs_result rs_delta_s_scan(rs_job_t *job)
weaksum_reset(&job->weak_sum);
} else {
/* rotate the weak_sum and append the miss byte */
- weaksum_rotate(&job->weak_sum, job->scoop_next[job->scoop_pos],
- job->scoop_next[job->scoop_pos + block_len]);
+ weaksum_rotate(&job->weak_sum, job->scan_buf[job->scan_pos],
+ job->scan_buf[job->scan_pos + block_len]);
result = rs_appendmiss(job, 1);
}
}
@@ -158,17 +162,20 @@ static rs_result rs_delta_s_scan(rs_job_t *job)
static rs_result rs_delta_s_flush(rs_job_t *job)
{
+ const size_t block_len = job->signature->block_len;
rs_long_t match_pos;
size_t match_len;
rs_result result;
rs_job_check(job);
+ /* output any pending output from the tube */
+ if ((result = rs_tube_catchup(job)) != RS_DONE)
+ return result;
/* read the input into the scoop */
- rs_getinput(job);
- /* output any pending output */
- result = rs_tube_catchup(job);
+ if ((result = rs_getinput(job, block_len)) != RS_DONE)
+ return result;
/* while output is not blocked and there is any remaining data */
- while ((result == RS_DONE) && (job->scoop_pos < job->scoop_avail)) {
+ while ((result == RS_DONE) && (job->scan_pos < job->scan_len)) {
/* check if this block matches */
if (rs_findmatch(job, &match_pos, &match_len)) {
/* append the match and reset the weak_sum */
@@ -176,7 +183,7 @@ static rs_result rs_delta_s_flush(rs_job_t *job)
weaksum_reset(&job->weak_sum);
} else {
/* rollout from weak_sum and append the miss byte */
- weaksum_rollout(&job->weak_sum, job->scoop_next[job->scoop_pos]);
+ weaksum_rollout(&job->weak_sum, job->scan_buf[job->scan_pos]);
rs_trace("block reduced to " FMT_SIZE "",
weaksum_count(&job->weak_sum));
result = rs_appendmiss(job, 1);
@@ -199,25 +206,25 @@ static rs_result rs_delta_s_end(rs_job_t *job)
return RS_DONE;
}
-static inline void rs_getinput(rs_job_t *job)
+static inline rs_result rs_getinput(rs_job_t *job, size_t block_len)
{
- size_t len;
+ size_t min_len = block_len + MAX_MISS_LEN + 1;
- len = rs_scoop_avail(job);
- if (job->scoop_avail < len) {
- rs_scoop_input(job, len);
- }
+ job->scan_len = rs_scoop_avail(job);
+ if (job->scan_len < min_len && !job->stream->eof_in)
+ job->scan_len = min_len;
+ return rs_scoop_readahead(job, job->scan_len, (void **)&job->scan_buf);
}
-/** find a match at scoop_pos, returning the match_pos and match_len.
+/** find a match at scan_pos, returning the match_pos and match_len.
*
* Note that this will calculate weak_sum if required. It will also determine
* the match_len.
*
* This routine could be modified to do xdelta style matches that would extend
* matches past block boundaries by matching backwards and forwards beyond the
- * block boundaries. Extending backwards would require decrementing scoop_pos
- * as appropriate. */
+ * block boundaries. Extending backwards would require decrementing scan_pos as
+ * appropriate. */
static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
size_t *match_len)
{
@@ -226,12 +233,12 @@ static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
/* calculate the weak_sum if we don't have one */
if (weaksum_count(&job->weak_sum) == 0) {
/* set match_len to min(block_len, scan_avail) */
- *match_len = job->scoop_avail - job->scoop_pos;
+ *match_len = job->scan_len - job->scan_pos;
if (*match_len > block_len) {
*match_len = block_len;
}
/* Update the weak_sum */
- weaksum_update(&job->weak_sum, job->scoop_next + job->scoop_pos,
+ weaksum_update(&job->weak_sum, job->scan_buf + job->scan_pos,
*match_len);
rs_trace("calculate weak sum from scratch length " FMT_SIZE "",
weaksum_count(&job->weak_sum));
@@ -241,7 +248,7 @@ static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
}
*match_pos =
rs_signature_find_match(job->signature, weaksum_digest(&job->weak_sum),
- job->scoop_next + job->scoop_pos, *match_len);
+ job->scan_buf + job->scan_pos, *match_len);
return *match_pos != -1;
}
@@ -262,8 +269,8 @@ static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos,
job->basis_pos = match_pos;
job->basis_len = match_len;
}
- /* increment scoop_pos to point at next unscanned data */
- job->scoop_pos += match_len;
+ /* increment scan_pos to point at next unscanned data */
+ job->scan_pos += match_len;
/* we can only process from the scoop if output is not blocked */
if (result == RS_DONE) {
/* process the match data off the scoop */
@@ -279,15 +286,14 @@ static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos,
* in memory. */
static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len)
{
- const size_t max_miss = 32768; /* For 0.01% 3 command bytes overhead. */
rs_result result = RS_DONE;
- /* If last was a match, or max_miss misses, appendflush it. */
- if (job->basis_len || (job->scoop_pos >= max_miss)) {
+ /* If last was a match, or MAX_MISS_LEN misses, appendflush it. */
+ if (job->basis_len || (job->scan_pos >= MAX_MISS_LEN)) {
result = rs_appendflush(job);
}
- /* increment scoop_pos */
- job->scoop_pos += miss_len;
+ /* increment scan_pos */
+ job->scan_pos += miss_len;
return result;
}
@@ -302,9 +308,9 @@ static inline rs_result rs_appendflush(rs_job_t *job)
job->basis_len = 0;
return rs_processmatch(job);
/* else if last is a miss, emit and process it */
- } else if (job->scoop_pos) {
- rs_trace("got " FMT_SIZE " bytes of literal data", job->scoop_pos);
- rs_emit_literal_cmd(job, (int)job->scoop_pos);
+ } else if (job->scan_pos) {
+ rs_trace("got " FMT_SIZE " bytes of literal data", job->scan_pos);
+ rs_emit_literal_cmd(job, (int)job->scan_pos);
return rs_processmiss(job);
}
/* otherwise, nothing to flush so we are done */
@@ -313,40 +319,45 @@ static inline rs_result rs_appendflush(rs_job_t *job)
/** Process matching data in the scoop.
*
- * The scoop contains match data at scoop_next of length scoop_pos. This
- * function processes that match data, returning RS_DONE if it completes, or
- * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to
- * still point at the next unscanned data.
+ * The scoop contains match data at scan_buf of length scan_pos. This function
+ * processes that match data, returning RS_DONE if it completes, or RS_BLOCKED
+ * if it gets blocked. After it completes scan_pos is reset to still point at
+ * the next unscanned data.
*
* This function currently just removes data from the scoop and adjusts
- * scoop_pos appropriately. In the future this could be used for something like
+ * scan_pos appropriately. In the future this could be used for something like
* context compressing of miss data. Note that it also calls rs_tube_catchup to
* output any pending output. */
static inline rs_result rs_processmatch(rs_job_t *job)
{
- job->scoop_avail -= job->scoop_pos;
- job->scoop_next += job->scoop_pos;
- job->scoop_pos = 0;
+ assert(job->copy_len == 0);
+ rs_scoop_advance(job, job->scan_pos);
+ job->scan_buf += job->scan_pos;
+ job->scan_len -= job->scan_pos;
+ job->scan_pos = 0;
return rs_tube_catchup(job);
}
/** Process miss data in the scoop.
*
- * The scoop contains miss data at scoop_next of length scoop_pos. This
- * function processes that miss data, returning RS_DONE if it completes, or
- * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to
- * still point at the next unscanned data.
+ * The scoop contains miss data at scan_buf of length scan_pos. This function
+ * processes that miss data, returning RS_DONE if it completes, or RS_BLOCKED
+ * if it gets blocked. After it completes scan_pos is reset to still point at
+ * the next unscanned data.
*
* This function uses rs_tube_copy to queue copying from the scoop into output.
* and uses rs_tube_catchup to do the copying. This automaticly removes data
* from the scoop, but this can block. While rs_tube_catchup is blocked,
- * scoop_pos does not point at legit data, so scanning can also not proceed.
+ * scan_pos does not point at legit data, so scanning can also not proceed.
*
* In the future this could do compression of miss data before outputing it. */
static inline rs_result rs_processmiss(rs_job_t *job)
{
- rs_tube_copy(job, job->scoop_pos);
- job->scoop_pos = 0;
+ assert(job->write_len > 0);
+ rs_tube_copy(job, job->scan_pos);
+ job->scan_buf += job->scan_pos;
+ job->scan_len -= job->scan_pos;
+ job->scan_pos = 0;
return rs_tube_catchup(job);
}
diff --git a/src/job.h b/src/job.h
index 95782bf..b9d9ae7 100644
--- a/src/job.h
+++ b/src/job.h
@@ -85,13 +85,17 @@ struct rs_job {
rs_stats_t stats;
/** Buffer of data in the scoop. Allocation is scoop_buf[0..scoop_alloc],
- * and scoop_next[0..scoop_avail] contains data yet to be processed.
- * scoop_next[scoop_pos..scoop_avail] is the data yet to be scanned. */
- rs_byte_t *scoop_buf; /* the allocation pointer */
- rs_byte_t *scoop_next; /* the data pointer */
- size_t scoop_alloc; /* the allocation size */
- size_t scoop_avail; /* the data size */
- size_t scoop_pos; /* the scan position */
+ * and scoop_next[0..scoop_avail] contains data yet to be processed. */
+ rs_byte_t *scoop_buf; /**< The buffer allocation pointer. */
+ rs_byte_t *scoop_next; /**< The next data pointer. */
+ size_t scoop_alloc; /**< The buffer allocation size. */
+ size_t scoop_avail; /**< The amount of data available. */
+
+ /** The delta scan buffer, where scan_buf[scan_pos..scan_len] is the data
+ * yet to be scanned. */
+ rs_byte_t *scan_buf; /**< The delta scan buffer pointer. */
+ size_t scan_len; /**< The delta scan buffer length. */
+ size_t scan_pos; /**< The delta scan position. */
/** If USED is >0, then buf contains that much write data to be sent out. */
rs_byte_t write_buf[36];
diff --git a/src/whole.c b/src/whole.c
index 04777f8..ec44967 100644
--- a/src/whole.c
+++ b/src/whole.c
@@ -111,9 +111,9 @@ rs_result rs_delta_file(rs_signature_t *sig, FILE *new_file, FILE *delta_file,
rs_result r;
job = rs_delta_begin(sig);
- /* Size inbuf for 1 block, outbuf for literal cmd + 4 blocks. */
- r = rs_whole_run(job, new_file, delta_file, sig->block_len,
- 10 + 4 * sig->block_len);
+ /* Size inbuf for 4*(64K + 1 block), outbuf for 4*(64K). */
+ r = rs_whole_run(job, new_file, delta_file,
+ 4 * ((1 << 16) + sig->block_len), 4 * (1 << 16));
if (stats)
memcpy(stats, &job->stats, sizeof *stats);
rs_job_free(job);