summaryrefslogtreecommitdiff
path: root/src/rep
diff options
context:
space:
mode:
authorLorry Tar Creator <lorry-tar-importer@baserock.org>2015-02-17 17:25:57 +0000
committer <>2015-03-17 16:26:24 +0000
commit780b92ada9afcf1d58085a83a0b9e6bc982203d1 (patch)
tree598f8b9fa431b228d29897e798de4ac0c1d3d970 /src/rep
parent7a2660ba9cc2dc03a69ddfcfd95369395cc87444 (diff)
downloadberkeleydb-master.tar.gz
Imported from /home/lorry/working-area/delta_berkeleydb/db-6.1.23.tar.gz.HEADdb-6.1.23master
Diffstat (limited to 'src/rep')
-rw-r--r--src/rep/mlease.html2
-rw-r--r--src/rep/rep.msg70
-rw-r--r--src/rep/rep_automsg.c467
-rw-r--r--src/rep/rep_backup.c2148
-rw-r--r--src/rep/rep_elect.c74
-rw-r--r--src/rep/rep_lease.c30
-rw-r--r--src/rep/rep_log.c39
-rw-r--r--src/rep/rep_method.c615
-rw-r--r--src/rep/rep_record.c315
-rw-r--r--src/rep/rep_region.c164
-rw-r--r--src/rep/rep_stat.c76
-rw-r--r--src/rep/rep_stub.c18
-rw-r--r--src/rep/rep_util.c568
-rw-r--r--src/rep/rep_verify.c27
14 files changed, 4165 insertions, 448 deletions
diff --git a/src/rep/mlease.html b/src/rep/mlease.html
index 7d44b465..4e82f63c 100644
--- a/src/rep/mlease.html
+++ b/src/rep/mlease.html
@@ -1,5 +1,5 @@
<!DOCTYPE doctype PUBLIC "-//w3c//dtd html 4.0 transitional//en">
-<!--Copyright (c) 2011, 2012 Oracle and/or its affiliates. All rights reserved.-->
+<!--Copyright (c) 2011, 2015 Oracle and/or its affiliates. All rights reserved.-->
<html>
<head>
<meta http-equiv="Content-Type"
diff --git a/src/rep/rep.msg b/src/rep/rep.msg
index b751a64d..d5c56d93 100644
--- a/src/rep/rep.msg
+++ b/src/rep/rep.msg
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -57,7 +57,22 @@ ARG pgsize u_int32_t
ARG pgno db_pgno_t
ARG max_pgno db_pgno_t
ARG filenum u_int32_t
-ARG finfo_flags u_int32_t
+ARG finfo_flags u_int32_t
+ARG type u_int32_t
+ARG db_flags u_int32_t
+ARG uid DBT
+ARG info DBT
+ARG dir DBT
+ARG blob_fid_lo u_int32_t
+ARG blob_fid_hi u_int32_t
+END
+
+BEGIN_MSG fileinfo_v7 alloc check_length version
+ARG pgsize u_int32_t
+ARG pgno db_pgno_t
+ARG max_pgno db_pgno_t
+ARG filenum u_int32_t
+ARG finfo_flags u_int32_t
ARG type u_int32_t
ARG db_flags u_int32_t
ARG uid DBT
@@ -158,3 +173,54 @@ ARG lsn DB_LSN
ARG hist_sec u_int32_t
ARG hist_nsec u_int32_t
END
+
+/*
+ * Request for blob files.
+ */
+BEGIN_MSG blob_update_req
+ARG blob_fid u_int64_t
+ARG blob_sid u_int64_t
+ARG blob_id u_int64_t
+ARG highest_id u_int64_t
+END
+
+/*
+ * A list of blob files for a database.
+ */
+BEGIN_MSG blob_update
+ARG blob_fid u_int64_t
+ARG highest_id u_int64_t
+ARG flags u_int32_t
+ARG num_blobs u_int32_t
+END
+
+/*
+ * Blob file description, part of blob_update.
+ */
+BEGIN_MSG blob_file
+ARG blob_sid u_int64_t
+ARG blob_id u_int64_t
+ARG blob_size u_int64_t
+END
+
+/*
+ * A piece of data from a blob file.
+ */
+BEGIN_MSG blob_chunk
+ARG flags u_int32_t
+ARG blob_fid u_int64_t
+ARG blob_sid u_int64_t
+ARG blob_id u_int64_t
+ARG offset u_int64_t
+ARG data DBT
+END
+
+/*
+ * Request for data from a blob file at the given offset.
+ */
+BEGIN_MSG blob_chunk_req
+ARG blob_fid u_int64_t
+ARG blob_sid u_int64_t
+ARG blob_id u_int64_t
+ARG offset u_int64_t
+END
diff --git a/src/rep/rep_automsg.c b/src/rep/rep_automsg.c
index 5d8155fb..cab68b3e 100644
--- a/src/rep/rep_automsg.c
+++ b/src/rep/rep_automsg.c
@@ -280,6 +280,16 @@ __rep_fileinfo_marshal(env, version, argp, bp, max, lenp)
memcpy(bp, argp->dir.data, argp->dir.size);
bp += argp->dir.size;
}
+ if (copy_only) {
+ memcpy(bp, &argp->blob_fid_lo, sizeof(u_int32_t));
+ bp += sizeof(u_int32_t);
+ } else
+ DB_HTONL_COPYOUT(env, bp, argp->blob_fid_lo);
+ if (copy_only) {
+ memcpy(bp, &argp->blob_fid_hi, sizeof(u_int32_t));
+ bp += sizeof(u_int32_t);
+ } else
+ DB_HTONL_COPYOUT(env, bp, argp->blob_fid_hi);
*lenp = (size_t)(bp - start);
return (0);
@@ -386,6 +396,16 @@ __rep_fileinfo_unmarshal(env, version, argpp, bp, max, nextp)
if (max < needed)
goto too_few;
bp += argp->dir.size;
+ if (copy_only) {
+ memcpy(&argp->blob_fid_lo, bp, sizeof(u_int32_t));
+ bp += sizeof(u_int32_t);
+ } else
+ DB_NTOHL_COPYIN(env, argp->blob_fid_lo, bp);
+ if (copy_only) {
+ memcpy(&argp->blob_fid_hi, bp, sizeof(u_int32_t));
+ bp += sizeof(u_int32_t);
+ } else
+ DB_NTOHL_COPYIN(env, argp->blob_fid_hi, bp);
if (nextp != NULL)
*nextp = bp;
@@ -399,6 +419,211 @@ too_few:
}
/*
+ * PUBLIC: int __rep_fileinfo_v7_marshal __P((ENV *, u_int32_t,
+ * PUBLIC: __rep_fileinfo_v7_args *, u_int8_t *, size_t, size_t *));
+ */
+int
+__rep_fileinfo_v7_marshal(env, version, argp, bp, max, lenp)
+ ENV *env;
+ u_int32_t version;
+ __rep_fileinfo_v7_args *argp;
+ u_int8_t *bp;
+ size_t *lenp, max;
+{
+ int copy_only;
+ u_int8_t *start;
+
+ /*
+ * Serialize *argp into the caller's buffer bp, which holds max bytes,
+ * and report the number of bytes written through *lenp. Returns
+ * ENOMEM when the buffer cannot hold the fixed-size part plus the
+ * uid, info and dir payloads; returns 0 otherwise.
+ */
+ if (max < __REP_FILEINFO_V7_SIZE
+ + (size_t)argp->uid.size
+ + (size_t)argp->info.size
+ + (size_t)argp->dir.size)
+ return (ENOMEM);
+ start = bp;
+
+ /*
+ * For replication versions older than DB_REPVERSION_47 every 32-bit
+ * field is copied raw with memcpy. Newer versions go through
+ * DB_HTONL_COPYOUT instead. NOTE(review): that macro is defined
+ * elsewhere; it presumably converts to network byte order and
+ * advances bp -- confirm.
+ */
+ copy_only = 0;
+ if (version < DB_REPVERSION_47)
+ copy_only = 1;
+ if (copy_only) {
+ memcpy(bp, &argp->pgsize, sizeof(u_int32_t));
+ bp += sizeof(u_int32_t);
+ } else
+ DB_HTONL_COPYOUT(env, bp, argp->pgsize);
+ if (copy_only) {
+ memcpy(bp, &argp->pgno, sizeof(u_int32_t));
+ bp += sizeof(u_int32_t);
+ } else
+ DB_HTONL_COPYOUT(env, bp, argp->pgno);
+ if (copy_only) {
+ memcpy(bp, &argp->max_pgno, sizeof(u_int32_t));
+ bp += sizeof(u_int32_t);
+ } else
+ DB_HTONL_COPYOUT(env, bp, argp->max_pgno);
+ if (copy_only) {
+ memcpy(bp, &argp->filenum, sizeof(u_int32_t));
+ bp += sizeof(u_int32_t);
+ } else
+ DB_HTONL_COPYOUT(env, bp, argp->filenum);
+ if (copy_only) {
+ memcpy(bp, &argp->finfo_flags, sizeof(u_int32_t));
+ bp += sizeof(u_int32_t);
+ } else
+ DB_HTONL_COPYOUT(env, bp, argp->finfo_flags);
+ if (copy_only) {
+ memcpy(bp, &argp->type, sizeof(u_int32_t));
+ bp += sizeof(u_int32_t);
+ } else
+ DB_HTONL_COPYOUT(env, bp, argp->type);
+ if (copy_only) {
+ memcpy(bp, &argp->db_flags, sizeof(u_int32_t));
+ bp += sizeof(u_int32_t);
+ } else
+ DB_HTONL_COPYOUT(env, bp, argp->db_flags);
+ /* Each DBT is written as a 32-bit length followed by its bytes. */
+ if (copy_only) {
+ memcpy(bp, &argp->uid.size, sizeof(u_int32_t));
+ bp += sizeof(u_int32_t);
+ } else
+ DB_HTONL_COPYOUT(env, bp, argp->uid.size);
+ if (argp->uid.size > 0) {
+ memcpy(bp, argp->uid.data, argp->uid.size);
+ bp += argp->uid.size;
+ }
+ if (copy_only) {
+ memcpy(bp, &argp->info.size, sizeof(u_int32_t));
+ bp += sizeof(u_int32_t);
+ } else
+ DB_HTONL_COPYOUT(env, bp, argp->info.size);
+ if (argp->info.size > 0) {
+ memcpy(bp, argp->info.data, argp->info.size);
+ bp += argp->info.size;
+ }
+ if (copy_only) {
+ memcpy(bp, &argp->dir.size, sizeof(u_int32_t));
+ bp += sizeof(u_int32_t);
+ } else
+ DB_HTONL_COPYOUT(env, bp, argp->dir.size);
+ if (argp->dir.size > 0) {
+ memcpy(bp, argp->dir.data, argp->dir.size);
+ bp += argp->dir.size;
+ }
+
+ /* The marshaled length is the distance the write pointer moved. */
+ *lenp = (size_t)(bp - start);
+ return (0);
+}
+
+/*
+ * __rep_fileinfo_v7_unmarshal --
+ *	Decode a __rep_fileinfo_v7 message from bp, which holds max bytes,
+ *	into a newly allocated __rep_fileinfo_v7_args returned through
+ *	*argpp.  The uid, info and dir DBTs are not copied: their data
+ *	pointers reference the input buffer (or are NULL when the length is
+ *	zero).  When nextp is non-NULL it receives the address just past the
+ *	consumed bytes.
+ *
+ *	Returns 0 on success, the __os_malloc error if the structure cannot
+ *	be allocated, or EINVAL when the input is too short.  On EINVAL the
+ *	partially filled structure is released and *argpp is left untouched.
+ *
+ * PUBLIC: int __rep_fileinfo_v7_unmarshal __P((ENV *, u_int32_t,
+ * PUBLIC: __rep_fileinfo_v7_args **, u_int8_t *, size_t, u_int8_t **));
+ */
+int
+__rep_fileinfo_v7_unmarshal(env, version, argpp, bp, max, nextp)
+	ENV *env;
+	u_int32_t version;
+	__rep_fileinfo_v7_args **argpp;
+	u_int8_t *bp;
+	size_t max;
+	u_int8_t **nextp;
+{
+	size_t needed;
+	__rep_fileinfo_v7_args *argp;
+	int ret;
+	int copy_only;
+
+	/* NULL until allocated so the too_few path knows what to release. */
+	argp = NULL;
+
+	needed = __REP_FILEINFO_V7_SIZE;
+	if (max < needed)
+		goto too_few;
+	if ((ret = __os_malloc(env, sizeof(*argp), &argp)) != 0)
+		return (ret);
+
+	/* Pre-4.7 peers exchange the 32-bit fields as raw bytes. */
+	copy_only = 0;
+	if (version < DB_REPVERSION_47)
+		copy_only = 1;
+	if (copy_only) {
+		memcpy(&argp->pgsize, bp, sizeof(u_int32_t));
+		bp += sizeof(u_int32_t);
+	} else
+		DB_NTOHL_COPYIN(env, argp->pgsize, bp);
+	if (copy_only) {
+		memcpy(&argp->pgno, bp, sizeof(u_int32_t));
+		bp += sizeof(u_int32_t);
+	} else
+		DB_NTOHL_COPYIN(env, argp->pgno, bp);
+	if (copy_only) {
+		memcpy(&argp->max_pgno, bp, sizeof(u_int32_t));
+		bp += sizeof(u_int32_t);
+	} else
+		DB_NTOHL_COPYIN(env, argp->max_pgno, bp);
+	if (copy_only) {
+		memcpy(&argp->filenum, bp, sizeof(u_int32_t));
+		bp += sizeof(u_int32_t);
+	} else
+		DB_NTOHL_COPYIN(env, argp->filenum, bp);
+	if (copy_only) {
+		memcpy(&argp->finfo_flags, bp, sizeof(u_int32_t));
+		bp += sizeof(u_int32_t);
+	} else
+		DB_NTOHL_COPYIN(env, argp->finfo_flags, bp);
+	if (copy_only) {
+		memcpy(&argp->type, bp, sizeof(u_int32_t));
+		bp += sizeof(u_int32_t);
+	} else
+		DB_NTOHL_COPYIN(env, argp->type, bp);
+	if (copy_only) {
+		memcpy(&argp->db_flags, bp, sizeof(u_int32_t));
+		bp += sizeof(u_int32_t);
+	} else
+		DB_NTOHL_COPYIN(env, argp->db_flags, bp);
+
+	/*
+	 * Each DBT is a 32-bit length followed by that many bytes.  The
+	 * length comes from the wire, so compare it against the space that
+	 * remains (max - needed, which cannot underflow because max >= needed
+	 * holds after every check) rather than adding first, which could
+	 * wrap a 32-bit size_t and defeat the bounds check.
+	 */
+	if (copy_only) {
+		memcpy(&argp->uid.size, bp, sizeof(u_int32_t));
+		bp += sizeof(u_int32_t);
+	} else
+		DB_NTOHL_COPYIN(env, argp->uid.size, bp);
+	if (argp->uid.size == 0)
+		argp->uid.data = NULL;
+	else
+		argp->uid.data = bp;
+	if ((size_t)argp->uid.size > max - needed)
+		goto too_few;
+	needed += (size_t)argp->uid.size;
+	bp += argp->uid.size;
+
+	if (copy_only) {
+		memcpy(&argp->info.size, bp, sizeof(u_int32_t));
+		bp += sizeof(u_int32_t);
+	} else
+		DB_NTOHL_COPYIN(env, argp->info.size, bp);
+	if (argp->info.size == 0)
+		argp->info.data = NULL;
+	else
+		argp->info.data = bp;
+	if ((size_t)argp->info.size > max - needed)
+		goto too_few;
+	needed += (size_t)argp->info.size;
+	bp += argp->info.size;
+
+	if (copy_only) {
+		memcpy(&argp->dir.size, bp, sizeof(u_int32_t));
+		bp += sizeof(u_int32_t);
+	} else
+		DB_NTOHL_COPYIN(env, argp->dir.size, bp);
+	if (argp->dir.size == 0)
+		argp->dir.data = NULL;
+	else
+		argp->dir.data = bp;
+	if ((size_t)argp->dir.size > max - needed)
+		goto too_few;
+	needed += (size_t)argp->dir.size;
+	bp += argp->dir.size;
+
+	if (nextp != NULL)
+		*nextp = bp;
+	*argpp = argp;
+	return (0);
+
+too_few:
+	/* Do not leak the structure when the message turns out truncated. */
+	if (argp != NULL)
+		__os_free(env, argp);
+	__db_errx(env, DB_STR("3675",
+	    "Not enough input bytes to fill a __rep_fileinfo_v7 message"));
+	return (EINVAL);
+}
+
+/*
* PUBLIC: int __rep_fileinfo_v6_marshal __P((ENV *, u_int32_t,
* PUBLIC: __rep_fileinfo_v6_args *, u_int8_t *, size_t, size_t *));
*/
@@ -1039,3 +1264,245 @@ too_few:
return (EINVAL);
}
+/*
+ * __rep_blob_update_req_marshal --
+ * Write the four 64-bit fields of a blob_update_req message to bp in
+ * the order blob_fid, blob_sid, blob_id, highest_id. No length check
+ * is made here. NOTE(review): the caller presumably supplies at least
+ * __REP_BLOB_UPDATE_REQ_SIZE bytes -- confirm.
+ *
+ * PUBLIC: void __rep_blob_update_req_marshal __P((ENV *,
+ * PUBLIC: __rep_blob_update_req_args *, u_int8_t *));
+ */
+void
+__rep_blob_update_req_marshal(env, argp, bp)
+ ENV *env;
+ __rep_blob_update_req_args *argp;
+ u_int8_t *bp;
+{
+ DB_HTONLL_COPYOUT(env, bp, argp->blob_fid);
+ DB_HTONLL_COPYOUT(env, bp, argp->blob_sid);
+ DB_HTONLL_COPYOUT(env, bp, argp->blob_id);
+ DB_HTONLL_COPYOUT(env, bp, argp->highest_id);
+}
+
+/*
+ * __rep_blob_update_req_unmarshal --
+ * Read a blob_update_req message from bp (max bytes available) into
+ * the caller-supplied *argp. If nextp is non-NULL it is set to the
+ * position in the buffer after the fields read. Returns 0 on
+ * success, or EINVAL after logging an error when max is smaller than
+ * __REP_BLOB_UPDATE_REQ_SIZE.
+ *
+ * PUBLIC: int __rep_blob_update_req_unmarshal __P((ENV *,
+ * PUBLIC: __rep_blob_update_req_args *, u_int8_t *, size_t, u_int8_t **));
+ */
+int
+__rep_blob_update_req_unmarshal(env, argp, bp, max, nextp)
+ ENV *env;
+ __rep_blob_update_req_args *argp;
+ u_int8_t *bp;
+ size_t max;
+ u_int8_t **nextp;
+{
+ /* The message is fixed-size, so one length check covers all fields. */
+ if (max < __REP_BLOB_UPDATE_REQ_SIZE)
+ goto too_few;
+ DB_NTOHLL_COPYIN(env, argp->blob_fid, bp);
+ DB_NTOHLL_COPYIN(env, argp->blob_sid, bp);
+ DB_NTOHLL_COPYIN(env, argp->blob_id, bp);
+ DB_NTOHLL_COPYIN(env, argp->highest_id, bp);
+
+ if (nextp != NULL)
+ *nextp = bp;
+ return (0);
+
+too_few:
+ __db_errx(env, DB_STR("3675",
+ "Not enough input bytes to fill a __rep_blob_update_req message"));
+ return (EINVAL);
+}
+
+/*
+ * __rep_blob_update_marshal --
+ * Write a blob_update header to bp: the 64-bit blob_fid and
+ * highest_id followed by the 32-bit flags and num_blobs. No length
+ * check is made here. NOTE(review): the caller presumably supplies at
+ * least __REP_BLOB_UPDATE_SIZE bytes -- confirm.
+ *
+ * PUBLIC: void __rep_blob_update_marshal __P((ENV *,
+ * PUBLIC: __rep_blob_update_args *, u_int8_t *));
+ */
+void
+__rep_blob_update_marshal(env, argp, bp)
+ ENV *env;
+ __rep_blob_update_args *argp;
+ u_int8_t *bp;
+{
+ DB_HTONLL_COPYOUT(env, bp, argp->blob_fid);
+ DB_HTONLL_COPYOUT(env, bp, argp->highest_id);
+ DB_HTONL_COPYOUT(env, bp, argp->flags);
+ DB_HTONL_COPYOUT(env, bp, argp->num_blobs);
+}
+
+/*
+ * __rep_blob_update_unmarshal --
+ * Read a blob_update header from bp (max bytes available) into the
+ * caller-supplied *argp. If nextp is non-NULL it is set to the
+ * position in the buffer after the header. Returns 0 on success, or
+ * EINVAL after logging an error when max is smaller than
+ * __REP_BLOB_UPDATE_SIZE.
+ *
+ * PUBLIC: int __rep_blob_update_unmarshal __P((ENV *,
+ * PUBLIC: __rep_blob_update_args *, u_int8_t *, size_t, u_int8_t **));
+ */
+int
+__rep_blob_update_unmarshal(env, argp, bp, max, nextp)
+ ENV *env;
+ __rep_blob_update_args *argp;
+ u_int8_t *bp;
+ size_t max;
+ u_int8_t **nextp;
+{
+ /* The header is fixed-size, so one length check covers all fields. */
+ if (max < __REP_BLOB_UPDATE_SIZE)
+ goto too_few;
+ DB_NTOHLL_COPYIN(env, argp->blob_fid, bp);
+ DB_NTOHLL_COPYIN(env, argp->highest_id, bp);
+ DB_NTOHL_COPYIN(env, argp->flags, bp);
+ DB_NTOHL_COPYIN(env, argp->num_blobs, bp);
+
+ if (nextp != NULL)
+ *nextp = bp;
+ return (0);
+
+too_few:
+ __db_errx(env, DB_STR("3675",
+ "Not enough input bytes to fill a __rep_blob_update message"));
+ return (EINVAL);
+}
+
+/*
+ * __rep_blob_file_marshal --
+ * Write one blob_file entry to bp as three 64-bit fields: blob_sid,
+ * blob_id, blob_size. No length check is made here. NOTE(review): the
+ * caller presumably supplies at least __REP_BLOB_FILE_SIZE bytes --
+ * confirm.
+ *
+ * PUBLIC: void __rep_blob_file_marshal __P((ENV *,
+ * PUBLIC: __rep_blob_file_args *, u_int8_t *));
+ */
+void
+__rep_blob_file_marshal(env, argp, bp)
+ ENV *env;
+ __rep_blob_file_args *argp;
+ u_int8_t *bp;
+{
+ DB_HTONLL_COPYOUT(env, bp, argp->blob_sid);
+ DB_HTONLL_COPYOUT(env, bp, argp->blob_id);
+ DB_HTONLL_COPYOUT(env, bp, argp->blob_size);
+}
+
+/*
+ * __rep_blob_file_unmarshal --
+ * Read one blob_file entry from bp (max bytes available) into the
+ * caller-supplied *argp. If nextp is non-NULL it is set to the
+ * position in the buffer after the entry. Returns 0 on success, or
+ * EINVAL after logging an error when max is smaller than
+ * __REP_BLOB_FILE_SIZE.
+ *
+ * PUBLIC: int __rep_blob_file_unmarshal __P((ENV *,
+ * PUBLIC: __rep_blob_file_args *, u_int8_t *, size_t, u_int8_t **));
+ */
+int
+__rep_blob_file_unmarshal(env, argp, bp, max, nextp)
+ ENV *env;
+ __rep_blob_file_args *argp;
+ u_int8_t *bp;
+ size_t max;
+ u_int8_t **nextp;
+{
+ /* The entry is fixed-size, so one length check covers all fields. */
+ if (max < __REP_BLOB_FILE_SIZE)
+ goto too_few;
+ DB_NTOHLL_COPYIN(env, argp->blob_sid, bp);
+ DB_NTOHLL_COPYIN(env, argp->blob_id, bp);
+ DB_NTOHLL_COPYIN(env, argp->blob_size, bp);
+
+ if (nextp != NULL)
+ *nextp = bp;
+ return (0);
+
+too_few:
+ __db_errx(env, DB_STR("3675",
+ "Not enough input bytes to fill a __rep_blob_file message"));
+ return (EINVAL);
+}
+
+/*
+ * __rep_blob_chunk_marshal --
+ * Write a blob_chunk message to bp: the 32-bit flags, the 64-bit
+ * blob_fid, blob_sid, blob_id and offset, then the data DBT as a
+ * 32-bit length followed by its bytes. No length check is made here.
+ * NOTE(review): the caller presumably sizes the buffer for
+ * __REP_BLOB_CHUNK_SIZE plus data.size bytes -- confirm.
+ *
+ * PUBLIC: void __rep_blob_chunk_marshal __P((ENV *,
+ * PUBLIC: __rep_blob_chunk_args *, u_int8_t *));
+ */
+void
+__rep_blob_chunk_marshal(env, argp, bp)
+ ENV *env;
+ __rep_blob_chunk_args *argp;
+ u_int8_t *bp;
+{
+ DB_HTONL_COPYOUT(env, bp, argp->flags);
+ DB_HTONLL_COPYOUT(env, bp, argp->blob_fid);
+ DB_HTONLL_COPYOUT(env, bp, argp->blob_sid);
+ DB_HTONLL_COPYOUT(env, bp, argp->blob_id);
+ DB_HTONLL_COPYOUT(env, bp, argp->offset);
+ DB_HTONL_COPYOUT(env, bp, argp->data.size);
+ /* The payload bytes are copied verbatim after the length. */
+ if (argp->data.size > 0) {
+ memcpy(bp, argp->data.data, argp->data.size);
+ bp += argp->data.size;
+ }
+}
+
+/*
+ * __rep_blob_chunk_unmarshal --
+ *	Read a blob_chunk message from bp (max bytes available) into the
+ *	caller-supplied *argp.  The data DBT is not copied: data.data points
+ *	into the input buffer, or is NULL when data.size is zero.  If nextp
+ *	is non-NULL it is set to the position after the payload.
+ *
+ *	Returns 0 on success, or EINVAL after logging an error when the
+ *	buffer is shorter than the fixed part or than the payload length the
+ *	message claims.
+ *
+ * PUBLIC: int __rep_blob_chunk_unmarshal __P((ENV *,
+ * PUBLIC: __rep_blob_chunk_args *, u_int8_t *, size_t, u_int8_t **));
+ */
+int
+__rep_blob_chunk_unmarshal(env, argp, bp, max, nextp)
+	ENV *env;
+	__rep_blob_chunk_args *argp;
+	u_int8_t *bp;
+	size_t max;
+	u_int8_t **nextp;
+{
+	size_t needed;
+
+	needed = __REP_BLOB_CHUNK_SIZE;
+	if (max < needed)
+		goto too_few;
+	DB_NTOHL_COPYIN(env, argp->flags, bp);
+	DB_NTOHLL_COPYIN(env, argp->blob_fid, bp);
+	DB_NTOHLL_COPYIN(env, argp->blob_sid, bp);
+	DB_NTOHLL_COPYIN(env, argp->blob_id, bp);
+	DB_NTOHLL_COPYIN(env, argp->offset, bp);
+	DB_NTOHL_COPYIN(env, argp->data.size, bp);
+	if (argp->data.size == 0)
+		argp->data.data = NULL;
+	else
+		argp->data.data = bp;
+	/*
+	 * data.size comes from the wire.  Compare it against the space left
+	 * (max >= needed is known here, so the subtraction cannot underflow)
+	 * instead of adding it to needed first, which could wrap a 32-bit
+	 * size_t and let an oversized length through.
+	 */
+	if ((size_t)argp->data.size > max - needed)
+		goto too_few;
+	needed += (size_t)argp->data.size;
+	bp += argp->data.size;
+
+	if (nextp != NULL)
+		*nextp = bp;
+	return (0);
+
+too_few:
+	__db_errx(env, DB_STR("3675",
+	    "Not enough input bytes to fill a __rep_blob_chunk message"));
+	return (EINVAL);
+}
+
+/*
+ * __rep_blob_chunk_req_marshal --
+ * Write the four 64-bit fields of a blob_chunk_req message to bp in
+ * the order blob_fid, blob_sid, blob_id, offset. No length check is
+ * made here. NOTE(review): the caller presumably supplies at least
+ * __REP_BLOB_CHUNK_REQ_SIZE bytes -- confirm.
+ *
+ * PUBLIC: void __rep_blob_chunk_req_marshal __P((ENV *,
+ * PUBLIC: __rep_blob_chunk_req_args *, u_int8_t *));
+ */
+void
+__rep_blob_chunk_req_marshal(env, argp, bp)
+ ENV *env;
+ __rep_blob_chunk_req_args *argp;
+ u_int8_t *bp;
+{
+ DB_HTONLL_COPYOUT(env, bp, argp->blob_fid);
+ DB_HTONLL_COPYOUT(env, bp, argp->blob_sid);
+ DB_HTONLL_COPYOUT(env, bp, argp->blob_id);
+ DB_HTONLL_COPYOUT(env, bp, argp->offset);
+}
+
+/*
+ * __rep_blob_chunk_req_unmarshal --
+ * Read a blob_chunk_req message from bp (max bytes available) into
+ * the caller-supplied *argp. If nextp is non-NULL it is set to the
+ * position in the buffer after the fields read. Returns 0 on
+ * success, or EINVAL after logging an error when max is smaller than
+ * __REP_BLOB_CHUNK_REQ_SIZE.
+ *
+ * PUBLIC: int __rep_blob_chunk_req_unmarshal __P((ENV *,
+ * PUBLIC: __rep_blob_chunk_req_args *, u_int8_t *, size_t, u_int8_t **));
+ */
+int
+__rep_blob_chunk_req_unmarshal(env, argp, bp, max, nextp)
+ ENV *env;
+ __rep_blob_chunk_req_args *argp;
+ u_int8_t *bp;
+ size_t max;
+ u_int8_t **nextp;
+{
+ /* The message is fixed-size, so one length check covers all fields. */
+ if (max < __REP_BLOB_CHUNK_REQ_SIZE)
+ goto too_few;
+ DB_NTOHLL_COPYIN(env, argp->blob_fid, bp);
+ DB_NTOHLL_COPYIN(env, argp->blob_sid, bp);
+ DB_NTOHLL_COPYIN(env, argp->blob_id, bp);
+ DB_NTOHLL_COPYIN(env, argp->offset, bp);
+
+ if (nextp != NULL)
+ *nextp = bp;
+ return (0);
+
+too_few:
+ __db_errx(env, DB_STR("3675",
+ "Not enough input bytes to fill a __rep_blob_chunk_req message"));
+ return (EINVAL);
+}
+
diff --git a/src/rep/rep_backup.c b/src/rep/rep_backup.c
index cfde7622..14bc63bb 100644
--- a/src/rep/rep_backup.c
+++ b/src/rep/rep_backup.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2004, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2004, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -9,6 +9,7 @@
#include "db_config.h"
#include "db_int.h"
+#include "dbinc/blob.h"
#include "dbinc/db_page.h"
#include "dbinc/db_am.h"
#include "dbinc/fop.h"
@@ -26,21 +27,45 @@
* Note that the fileinfo for the first file in the list always appears at
* (constant) offset __REP_UPDATE_SIZE in the buffer.
*/
+#define FILE_CTX_INMEM_ONLY 0x01
typedef struct {
u_int8_t *buf; /* Buffer base address. */
u_int32_t size; /* Total allocated buffer size. */
u_int8_t *fillptr; /* Pointer to first unused space. */
u_int32_t count; /* Number of entries currently in list. */
u_int32_t version; /* Rep version of marshaled format. */
+ u_int32_t flags; /* Context flags. */
} FILE_LIST_CTX;
#define FIRST_FILE_PTR(buf) ((buf) + __REP_UPDATE_SIZE)
/*
+ * Flags used to show the state of blob files on the master in messages
+ * sent to the client.
+ */
+#define BLOB_DONE 0x01
+#define BLOB_DELETE 0x02
+#define BLOB_CHUNK_FAIL 0x04
+
+#define BLOB_ID_SIZE sizeof(db_seq_t)
+#define BLOB_KEY_SIZE (2 * BLOB_ID_SIZE)
+
+/*
* Function that performs any desired processing on a single file, as part of
* the traversal of a list of database files, such as with internal init.
*/
typedef int (FILE_WALK_FN) __P((ENV *, __rep_fileinfo_args *, void *));
+static int __rep_add_files_to_list __P((
+ ENV *, const char *, const char *, FILE_LIST_CTX *, const char **, int));
+static int __rep_blob_chunk_gap
+ __P((ENV *, int, DB_THREAD_INFO *, REP *, int *, db_seq_t, int));
+static int __rep_blob_cleanup __P((ENV *, REP *));
+static int __rep_blobdone
+ __P((ENV *, int, DB_THREAD_INFO *, REP *, db_seq_t, int));
+static int __rep_blob_find_files __P((ENV *, DB_THREAD_INFO *, const char *,
+ db_seq_t *, db_seq_t, db_seq_t, db_seq_t *, DBT *, size_t *, u_int32_t *));
+static int __rep_blob_sort_dirs __P((ENV *,
+ int (*)(const char *), char **, int, char ***, int *));
static FILE_WALK_FN __rep_check_uid;
static int __rep_clean_interrupted __P((ENV *));
static FILE_WALK_FN __rep_cleanup_nimdbs;
@@ -52,6 +77,8 @@ static int __rep_get_fileinfo __P((ENV *, const char *,
const char *, __rep_fileinfo_args *, u_int8_t *));
static int __rep_get_file_list __P((ENV *,
DB_FH *, u_int32_t, u_int32_t *, DBT *));
+static int __rep_init_file_list_context __P((ENV *,
+ u_int32_t, u_int32_t, int, FILE_LIST_CTX *));
static int __rep_is_replicated_db __P((const char *, const char *));
static int __rep_log_setup __P((ENV *,
REP *, u_int32_t, u_int32_t, DB_LSN *));
@@ -72,9 +99,12 @@ static FILE_WALK_FN __rep_remove_file;
static int __rep_remove_logs __P((ENV *));
static int __rep_remove_nimdbs __P((ENV *));
static int __rep_rollback __P((ENV *, DB_LSN *));
+static int __rep_select_blob_file __P((const char *));
+static int __rep_select_blob_sdb __P((const char *));
static int __rep_unlink_by_list __P((ENV *, u_int32_t,
u_int8_t *, u_int32_t, u_int32_t));
static FILE_WALK_FN __rep_unlink_file;
+static int __rep_walk_blob_dir __P((ENV *, FILE_LIST_CTX*));
static int __rep_walk_filelist __P((ENV *, u_int32_t, u_int8_t *,
u_int32_t, u_int32_t, FILE_WALK_FN *, void *));
static int __rep_walk_dir __P((ENV *, const char *, const char *,
@@ -129,14 +159,12 @@ __rep_update_req(env, rp)
dblp = env->lg_handle;
logc = NULL;
- if ((ret = __os_calloc(env, 1, MEGABYTE, &context.buf)) != 0)
- goto err_noalloc;
- context.size = MEGABYTE;
- context.count = 0;
- context.version = rp->rep_version;
/* Reserve space for the update_args, and fill in file info. */
- context.fillptr = FIRST_FILE_PTR(context.buf);
+ if ((ret = __rep_init_file_list_context(env, rp->rep_version,
+ F_ISSET(rp, REPCTL_INMEM_ONLY) ? FILE_CTX_INMEM_ONLY : 0,
+ 1, &context)) != 0)
+ goto err_noalloc;
if ((ret = __rep_find_dbs(env, &context)) != 0)
goto err;
@@ -214,6 +242,472 @@ err_noalloc:
}
/*
+ * Passed to the __rep_blob_sort_dirs function.
+ * Select blob files, of the form __db.bl###
+ */
+static int
+__rep_select_blob_file(file)
+ const char *file;
+{
+ if (strncmp(BLOB_FILE_PREFIX, file, strlen(BLOB_FILE_PREFIX)) == 0)
+ return (1);
+ else
+ return (0);
+}
+
+/*
+ * __rep_select_blob_sdb --
+ *	Selector handed to __rep_blob_sort_dirs.  Returns 1 for blob
+ *	subdatabase directories, of the form __db###: names that start with
+ *	BLOB_DIR_PREFIX but are neither blob files (BLOB_FILE_PREFIX) nor the
+ *	blob meta database file (BLOB_META_FILE_NAME).  Returns 0 otherwise.
+ */
+static int
+__rep_select_blob_sdb(file)
+	const char *file;
+{
+	/* Must carry the directory prefix... */
+	if (strncmp(BLOB_DIR_PREFIX, file, strlen(BLOB_DIR_PREFIX)) != 0)
+		return (0);
+	/* ...without being a blob file... */
+	if (strncmp(BLOB_FILE_PREFIX, file, strlen(BLOB_FILE_PREFIX)) == 0)
+		return (0);
+	/* ...or the blob meta database. */
+	if (strcmp(BLOB_META_FILE_NAME, file) == 0)
+		return (0);
+	return (1);
+}
+
+/*
+ * __rep_blob_sort_dirs --
+ *	Build a sorted array of the entries in dirs that select_fn accepts.
+ *	The array is returned through *sorted with its length in
+ *	*sorted_cnt.  The array itself is newly allocated, but its elements
+ *	are the same string pointers found in dirs.  On allocation failure
+ *	the error is returned with *sorted NULL and *sorted_cnt 0.
+ */
+static int
+__rep_blob_sort_dirs(env, select_fn, dirs, dirs_cnt, sorted, sorted_cnt)
+	ENV *env;
+	int (*select_fn) __P((const char *));
+	char **dirs;
+	int dirs_cnt;
+	char ***sorted;
+	int *sorted_cnt;
+{
+	char **picked, *hold;
+	int idx, kept, limit, moved, ret;
+
+	*sorted = NULL;
+	*sorted_cnt = 0;
+
+	if ((ret = __os_malloc(env,
+	    (sizeof(char *) * (unsigned int)dirs_cnt), &picked)) != 0)
+		return (ret);
+
+	/* Keep only the names the selector accepts. */
+	kept = 0;
+	for (idx = 0; idx < dirs_cnt; idx++)
+		if (select_fn(dirs[idx]))
+			picked[kept++] = dirs[idx];
+
+	/*
+	 * Directory listings usually come back in order, or nearly so, so a
+	 * bubble sort that stops after the first pass without a swap is
+	 * used.  Each pass leaves the largest remaining name in its final
+	 * slot, so the range examined shrinks by one every time.
+	 */
+	for (limit = kept, moved = 1; moved == 1 && limit > 1; limit--) {
+		moved = 0;
+		for (idx = 1; idx < limit; idx++) {
+			if (strcmp(picked[idx - 1], picked[idx]) <= 0)
+				continue;
+			hold = picked[idx - 1];
+			picked[idx - 1] = picked[idx];
+			picked[idx] = hold;
+			moved = 1;
+		}
+	}
+
+	*sorted = picked;
+	*sorted_cnt = kept;
+
+	return (0);
+}
+
+#define BLOB_THROTTLE_DEFAULT (10 * MEGABYTE)
+
+/*
+ * __rep_blob_update_req
+ * Send a list of blob files, starting after the blob id and sub-database
+ * id sent in the BLOB_UPDATE_REQ message.
+ *
+ * The request in rec is unmarshaled and a reply buffer of MEGABYTE bytes is
+ * allocated, with room reserved at the front for the blob_update header.
+ * __rep_blob_find_files then appends one blob_file entry per file it finds.
+ * While the accumulated blob size stays below the throttle and the current
+ * sub-database id is non-zero, the next sub-database directory is searched.
+ * When the size is below the throttle and the sub-database id is zero, or
+ * no further directory exists, the reply is flagged BLOB_DONE. When the
+ * throttle is reached the st_nthrottles statistic is bumped instead. The
+ * reply is then broadcast as a REP_BLOB_UPDATE message.
+ *
+ * Returns 0 on success or the first error reported by a helper. The result
+ * of __rep_send_message is deliberately ignored.
+ *
+ * NOTE(review): throttle is a u_int32_t computed as gbytes * GIGABYTE +
+ * bytes; this looks like it could wrap for limits of 4GB or more -- confirm.
+ *
+ * PUBLIC: int __rep_blob_update_req __P((ENV *, DB_THREAD_INFO *, DBT *));
+ */
+int
+__rep_blob_update_req(env, ip, rec)
+ ENV *env;
+ DB_THREAD_INFO *ip;
+ DBT *rec;
+{
+ DBT rbudbt;
+ REP *rep;
+ __rep_blob_update_args rbu;
+ __rep_blob_update_req_args rbur;
+ db_seq_t blob_fid, blob_id, blob_sdb, tmp;
+ int cur, dirs_cnt, ret, sdb_cnt;
+ size_t sent;
+ char *blob_sub_dir, *dir, **dirs, **sdb;
+ u_int32_t num_blobs, throttle;
+ u_int8_t *ptr;
+
+ memset(&rbu, 0, sizeof(__rep_blob_update_args));
+ memset(&rbudbt, 0, sizeof(DBT));
+ blob_sub_dir = dir = NULL;
+ dirs = sdb = NULL;
+ sent = 0;
+ num_blobs = 0;
+ cur = dirs_cnt = sdb_cnt = 0;
+ rep = env->rep_handle->region;
+ /* A configured limit of zero falls back to BLOB_THROTTLE_DEFAULT. */
+ throttle = rep->gbytes * GIGABYTE + rep->bytes;
+ if (throttle == 0)
+ throttle = BLOB_THROTTLE_DEFAULT;
+
+ if ((ret = __rep_blob_update_req_unmarshal(
+ env, &rbur, rec->data, rec->size, &ptr)) != 0)
+ goto err;
+
+ RPRINT(env, (env, DB_VERB_REP_SYNC,
+"blob_update_req: file_id %llu sdb_id %llu blob_id %llu highest %llu",
+ (long long)rbur.blob_fid, (long long)rbur.blob_sid,
+ (long long)rbur.blob_id, (long long)rbur.highest_id));
+
+ rbu.blob_fid = rbur.blob_fid;
+
+ /*
+ * The header is marshaled last, once num_blobs is known, so the
+ * buffer size starts just past the space it will occupy.
+ */
+ if ((ret = __os_malloc(env, MEGABYTE, &rbudbt.data)) != 0)
+ goto err;
+ rbudbt.ulen = MEGABYTE;
+ rbudbt.size = __REP_BLOB_UPDATE_SIZE;
+
+ blob_fid = (db_seq_t)rbur.blob_fid;
+ blob_sdb = (db_seq_t)rbur.blob_sid;
+ blob_id = (db_seq_t)rbur.blob_id;
+
+ /* Find the first blob file if it is unknown. */
+ if (blob_id == 0 && blob_sdb == 0) {
+ /*
+ * find_sdb is also entered by goto from below, to advance to the
+ * next sub-database directory. The directory list is read only
+ * once; cur keeps its position across re-entries.
+ */
+find_sdb: if (dirs == NULL) {
+ if ((ret = __blob_make_sub_dir(
+ env, &blob_sub_dir, blob_fid, 0)) != 0)
+ goto err;
+ if ((ret = __db_appname(
+ env, DB_APP_BLOB, blob_sub_dir, NULL, &dir)) != 0)
+ goto err;
+ /* If no directory, there are no blobs to send. */
+ if (__os_exists(env, dir, NULL) != 0)
+ goto filedone;
+
+ if ((ret = __os_dirlist(
+ env, dir, 1, &dirs, &dirs_cnt)) != 0)
+ goto err;
+
+ if (dirs_cnt == 0)
+ goto filedone;
+
+ if ((ret = __rep_blob_sort_dirs(
+ env, __rep_select_blob_sdb,
+ dirs, dirs_cnt, &sdb, &sdb_cnt)) != 0)
+ goto err;
+ }
+ /*
+ * Iterate through the list of subdirectories, until we find
+ * one that has an id larger than the current subdirectory id.
+ */
+ while (cur < sdb_cnt) {
+ if ((ret = __blob_path_to_dir_ids(
+ env, sdb[cur], &tmp, NULL)) != 0)
+ goto err;
+ if (blob_sdb < tmp) {
+ blob_sdb = tmp;
+ break;
+ }
+ cur++;
+ }
+ /* Check if no more subdirectories to search */
+ if (sdb_cnt != 0 && cur == sdb_cnt)
+ goto filedone;
+ /*
+ * Drop the paths built for the top-level directory so they are
+ * rebuilt below for the sub-database that was just selected.
+ */
+ if (dir != NULL)
+ __os_free(env, dir);
+ dir = NULL;
+ if (blob_sub_dir != NULL)
+ __os_free(env, blob_sub_dir);
+ blob_sub_dir = NULL;
+ }
+
+ if (blob_sub_dir == NULL && (ret =
+ __blob_make_sub_dir(env, &blob_sub_dir, blob_fid, blob_sdb)) != 0)
+ goto err;
+
+ if (dir == NULL && (ret = __db_appname(
+ env, DB_APP_BLOB, blob_sub_dir, NULL, &dir)) != 0)
+ goto err;
+ /* Search the current directory for blob files with id > blob_id. */
+ if ((ret = __rep_blob_find_files(
+ env, ip, dir, &blob_id, blob_sdb, blob_fid,
+ (db_seq_t *)&rbur.highest_id, &rbudbt, &sent, &num_blobs)) != 0)
+ goto err;
+
+ /*
+ * If we have not reached the send limit, and there are still
+ * directories to search, then search the next directory.
+ */
+ if (sent < throttle) {
+ if (blob_sdb != 0) {
+ /* Restart ids and the highest-id lookup for the next one. */
+ rbur.highest_id = 0;
+ blob_id = 0;
+ __os_free(env, blob_sub_dir);
+ blob_sub_dir = NULL;
+ __os_free(env, dir);
+ dir = NULL;
+ goto find_sdb;
+ } else {
+ /* Mark as the end of the files. */
+filedone: F_SET(&rbu, BLOB_DONE);
+ rbur.highest_id = 0;
+ }
+ } else
+ STAT(rep->stat.st_nthrottles++);
+
+ /* Fill in the header at the front of the buffer and send the reply. */
+ rbu.num_blobs = num_blobs;
+ rbu.highest_id = rbur.highest_id;
+ __rep_blob_update_marshal(env, &rbu, rbudbt.data);
+ RPRINT(env, (env, DB_VERB_REP_SYNC,
+ "Sending blob_update: file_id %llu, num_blobs %lu, flags %lu",
+ (long long)rbu.blob_fid,
+ (long)num_blobs, (unsigned long)rbu.flags));
+ (void)__rep_send_message(
+ env, DB_EID_BROADCAST, REP_BLOB_UPDATE, NULL, &rbudbt, 0, 0);
+
+ /* Shared cleanup for both the success and the failure paths. */
+err: if (sdb != NULL)
+ __os_free(env, sdb);
+ if (dirs != NULL)
+ __os_dirfree(env, dirs, dirs_cnt);
+ if (dir != NULL)
+ __os_free(env, dir);
+ if (blob_sub_dir != NULL)
+ __os_free(env, blob_sub_dir);
+ if (rbudbt.data != NULL)
+ __os_free(env, rbudbt.data);
+ return (ret);
+}
+
+/*
+ * __rep_blob_find_files
+ *
+ * Search a directory for blob files, starting with the given blob id and
+ * sub-database id.  Add information for each blob to the message buffer until
+ * there are no more files, or it has reached the maximum send amount in terms
+ * of combined blob files size.
+ *
+ * This search is complicated because the blobs have to be sent in order by id,
+ * but there can be huge holes between a blob file and the one with the next
+ * highest id, so iterating through the ids looking to see if the file exists
+ * for each id will take too long.  The solution is to walk the directory
+ * hierarchy in order, reading every file in that directory, sorting them by
+ * id, and adding them to the update list.
+ *
+ * Parameters:
+ *	dir	  directory to search.
+ *	blob_id	  in: last blob id already handled; out: last id examined.
+ *	blob_sid  sub-database id, recorded in every entry added.
+ *	blob_fid  blob file id, used when looking up the highest blob id.
+ *	highest	  in/out: upper bound for the search; looked up from the blob
+ *		  meta database when it is 0 on entry.
+ *	buf	  reply buffer; entries are appended at buf->size and the
+ *		  buffer is doubled when it runs low.
+ *	sent	  in/out: running total of blob bytes described so far.
+ *	num	  in/out: running count of entries added.
+ *
+ * Returns 0 on success (including when the throttle stops the search early)
+ * or the first error from a helper.
+ */
+static int
+__rep_blob_find_files(
+    env, ip, dir, blob_id, blob_sid, blob_fid, highest, buf, sent, num)
+	ENV *env;
+	DB_THREAD_INFO *ip;
+	const char *dir;
+	db_seq_t *blob_id;
+	db_seq_t blob_sid;
+	db_seq_t blob_fid;
+	db_seq_t *highest;
+	DBT *buf;
+	size_t *sent;
+	u_int32_t *num;
+{
+	DB *bmd;
+	DB_FH *fhp;
+	DB_TXN *txn;
+	REP *rep;
+	__rep_blob_file_args rbf;
+	char blob_path[MAX_BLOB_PATH_SZ], **dirs, **files, *path, *ptr;
+	db_seq_t tmp;
+	int blob_path_len, cur, depth, dirs_cnt, files_cnt, ret;
+	off_t blob_size;
+	size_t len;
+	u_int32_t bytes, mbytes, throttle;
+
+	bmd = NULL;
+	txn = NULL;
+	fhp = NULL;
+	path = NULL;
+	dirs = files = NULL;
+	dirs_cnt = files_cnt = 0;
+	rbf.blob_sid = (u_int64_t)blob_sid;
+	rep = env->rep_handle->region;
+	throttle = rep->gbytes * GIGABYTE + rep->bytes;
+	if (throttle == 0)
+		throttle = BLOB_THROTTLE_DEFAULT;
+
+	if ((ret = __os_malloc(
+	    env, strlen(dir) + MAX_BLOB_PATH_SZ, &path)) != 0)
+		goto err;
+
+	/*
+	 * Read the highest possible blob id from the blob meta database, so
+	 * we know when to stop looking for files for this database.  The
+	 * highest value is reset every time we switch to a new subdatabase.
+	 */
+	if (*highest == 0) {
+		if ((ret = __db_create_internal(&bmd, env, 0)) != 0)
+			goto err;
+
+		if ((ret = __txn_begin(
+		    env, ip, NULL, &txn, DB_IGNORE_LEASE)) != 0)
+			goto err;
+
+		bmd->blob_file_id = blob_fid;
+		bmd->blob_sdb_id = blob_sid;
+		if ((ret = __blob_highest_id(bmd, txn, highest)) != 0)
+			goto err;
+
+		/*
+		 * A transaction or database handle must not be used again
+		 * once abort/close has been called, whatever it returned.
+		 * Clear each pointer before testing the result so that the
+		 * cleanup code below cannot release it a second time.
+		 */
+		ret = __txn_abort(txn);
+		txn = NULL;
+		if (ret != 0)
+			goto err;
+		ret = __db_close(bmd, NULL, 0);
+		bmd = NULL;
+		if (ret != 0)
+			goto err;
+		(*highest)++;
+	}
+
+	(*blob_id)++;
+	while (*sent < throttle && *blob_id < *highest) {
+		memset(blob_path, 0, MAX_BLOB_PATH_SZ);
+		blob_path_len = depth = 0;
+
+		/* Calculate the subdirectory from the blob id. */
+		__blob_calculate_dirs(
+		    *blob_id, blob_path, &blob_path_len, &depth);
+		if (blob_path_len != 0) {
+			(void)sprintf(path, "%s%c%s%c",
+			    dir, PATH_SEPARATOR[0], blob_path, PATH_SEPARATOR[0]);
+		} else
+			(void)sprintf(path, "%s", dir);
+		len = strlen(path);
+
+		/* If the sub-directory does not exist, look for the next. */
+		if (__os_exists(env, path, NULL) != 0) {
+			(*blob_id) +=
+			    BLOB_DIR_ELEMS - (*blob_id % BLOB_DIR_ELEMS);
+			continue;
+		}
+
+		/* Get a list of all the blob files, sorted by id. */
+		if ((ret = __os_dirlist(env, path, 0, &dirs, &dirs_cnt)) != 0)
+			goto err;
+
+		if ((ret = __rep_blob_sort_dirs(env, __rep_select_blob_file,
+		    dirs, dirs_cnt, &files, &files_cnt)) != 0)
+			goto err;
+
+		/*
+		 * Find the first blob file with an id greater than or equal to
+		 * the last id.
+		 */
+		for (cur = 0; cur < files_cnt; cur++) {
+			ptr = files[cur];
+			ptr += strlen(BLOB_FILE_PREFIX);
+			if ((ret = __blob_str_to_id(
+			    env, (const char **)&ptr, &tmp)) != 0)
+				goto err;
+			DB_ASSERT(env, tmp != 0);
+			if (tmp >= *blob_id)
+				break;
+		}
+
+		/* Add each remaining blob file to the message buffer. */
+		while (cur < files_cnt) {
+			/* Get the blob id from the current file name. */
+			(void)sprintf(path + len, "%s", files[cur]);
+			ptr = path + len + strlen(BLOB_FILE_PREFIX);
+			if ((ret = __blob_str_to_id(
+			    env, (const char **)&ptr, blob_id)) != 0)
+				goto err;
+			rbf.blob_id = (u_int64_t)*blob_id;
+			/* Open the file and get its size. */
+			if ((ret = __os_open(
+			    env, path, 0, DB_OSO_RDONLY, 0, &fhp)) != 0) {
+				/* The file may be removed while we scan. */
+				if (ret == ENOENT) {
+					ret = 0;
+					RPRINT(env, (env, DB_VERB_REP_SYNC,
+				"blob_update blob file: %llu deleted, skipping.",
+					    (unsigned long long)rbf.blob_id));
+					cur++;
+					continue;
+				}
+				goto err;
+			}
+			if ((ret = __os_ioinfo(
+			    env, path, fhp, &mbytes, &bytes, NULL)) != 0)
+				goto err;
+			/*
+			 * The handle is gone after the close call even if it
+			 * reports an error; forget it before checking so the
+			 * cleanup code does not close it again.
+			 */
+			ret = __os_closehandle(env, fhp);
+			fhp = NULL;
+			if (ret != 0)
+				goto err;
+			blob_size = ((off_t)mbytes * (off_t)MEGABYTE) + bytes;
+			rbf.blob_size = (u_int64_t)blob_size;
+			/*
+			 * Saturate the running total just past the throttle
+			 * when the file is huge or the sum would wrap.
+			 */
+			if (blob_size > UINT32_MAX)
+				(*sent) = throttle + 1;
+			else {
+				if (((*sent) + (size_t)blob_size) < (*sent))
+					(*sent) = throttle + 1;
+				else
+					(*sent) += (size_t)blob_size;
+			}
+			__rep_blob_file_marshal(
+			    env, &rbf, (u_int8_t *)buf->data + buf->size);
+			(*num)++;
+			buf->size += __REP_BLOB_FILE_SIZE;
+			RPRINT(env, (env, DB_VERB_REP_SYNC,
+		"blob_update adding: blob_sid %llu, blob_id %llu blob_size %llu",
+			    (unsigned long long)rbf.blob_sid,
+			    (unsigned long long)rbf.blob_id,
+			    (unsigned long long)rbf.blob_size));
+			/* Over the limit: stop here with ret still 0. */
+			if ((*sent) > throttle)
+				goto err;
+
+			/* Resize if there is not enough space to grow. */
+			if (buf->size > (buf->ulen - __REP_BLOB_FILE_SIZE)) {
+				if ((ret = __os_realloc(
+				    env, buf->ulen * 2, &buf->data)) != 0)
+					goto err;
+				buf->ulen *= 2;
+			}
+			cur++;
+		}
+		/*
+		 * Move to the next directory of blob files by setting the blob
+		 * id to the next largest possible value.
+		 */
+		(*blob_id) += BLOB_DIR_ELEMS - (*blob_id % BLOB_DIR_ELEMS);
+		__os_free(env, files);
+		files = NULL;
+		__os_dirfree(env, dirs, dirs_cnt);
+		dirs = NULL;
+	}
+	/* Shared cleanup for both the success and the failure paths. */
+err:
+	if (path != NULL)
+		__os_free(env, path);
+	if (files != NULL)
+		__os_free(env, files);
+	if (dirs != NULL)
+		__os_dirfree(env, dirs, dirs_cnt);
+	if (fhp != NULL)
+		(void)__os_closehandle(env, fhp);
+	if (txn != NULL)
+		(void)__txn_abort(txn);
+	if (bmd != NULL)
+		(void)__db_close(bmd, NULL, 0);
+
+	return (ret);
+}
+
+/*
* __rep_find_dbs -
* Walk through all the named files/databases including those in the
* environment or data_dirs and those that in named and in-memory. We
@@ -240,7 +734,8 @@ __rep_find_dbs(env, context)
* replicated user databases. If the application has a metadata_dir,
* this will also find any persistent internal system databases.
*/
- if (dbenv->db_data_dir != NULL) {
+ if (!F_ISSET(context, FILE_CTX_INMEM_ONLY) &&
+ dbenv->db_data_dir != NULL) {
for (ddir = dbenv->db_data_dir; *ddir != NULL; ++ddir) {
if ((ret = __db_appname(env,
DB_APP_NONE, *ddir, NULL, &real_dir)) != 0)
@@ -252,16 +747,24 @@ __rep_find_dbs(env, context)
real_dir = NULL;
}
}
+
/*
* Walk the environment directory. If the application doesn't
* have a metadata_dir, this will return persistent internal system
* databases. If the application doesn't have a separate data
* directory, this will also return all user databases.
*/
- if (ret == 0)
+ if (!F_ISSET(context, FILE_CTX_INMEM_ONLY) && ret == 0)
ret = __rep_walk_dir(env, env->db_home, NULL, context);
- /* Now, collect any in-memory named databases. */
+ /* Gather the databases in the blob directory. */
+ if (!F_ISSET(context, FILE_CTX_INMEM_ONLY) && ret == 0)
+ ret = __rep_walk_blob_dir(env, context);
+
+ /*
+ * Now, collect any in-memory named databases. We do this no
+ * matter if the INMEM_ONLY flag is set or not.
+ */
if (ret == 0)
ret = __rep_walk_dir(env, NULL, NULL, context);
@@ -271,6 +774,148 @@ __rep_find_dbs(env, context)
}
/*
+ * __rep_walk_blob_dir --
+ *
+ * The blob directory hierarchy consists of a top layer that contains the
+ * blob meta database (BMD) and a set of blob directories (BLDIR).
+ * Each BLDIR corresponds to a database file.  If the database file doesn't
+ * contain subdatabases, the BLDIR contains a BMD and blob files.  If the
+ * database file contains subdatabases, the BLDIR contains a BLSDIR
+ * subdirectory for each subdatabase.  Each BLSDIR contains a BMD and blob
+ * files.
+ *
+ * This function walks the blob directory hierarchy and records any BMD.
+ * It first checks if the top level BMD exists, and if it does searches
+ * the first and second layers of the hierarchy for BMDs.
+ *
+ * Every BMD found is passed to __rep_add_files_to_list() using a path
+ * relative to the blob directory.  Returns 0 on success, including the
+ * case where the top level BMD does not exist; otherwise returns the error
+ * from the failing helper.  Probing for a BMD that does not exist is not
+ * an error.  Entries whose path would not fit in the path buffers are
+ * skipped rather than written past the end of a buffer.
+ */
+static int
+__rep_walk_blob_dir(env, context)
+	ENV *env;
+	FILE_LIST_CTX *context;
+{
+	int cnt, cnt2, i, j, ret;
+	size_t len, name_sz, name2_sz, need;
+	char *blob_dir, *blob_sub, **dirs, *name, *name2, **subdirs;
+	char blob_sub_buf[MAX_BLOB_PATH_SZ];
+	const char *bmd, *dirp;
+
+	cnt = cnt2 = 0;
+	name_sz = name2_sz = 0;
+	blob_dir = name = name2 = NULL;
+	dirs = subdirs = NULL;
+	bmd = BLOB_META_FILE_NAME;
+	blob_sub = blob_sub_buf;
+
+	if ((ret = __db_appname(
+	    env, DB_APP_BLOB, BLOB_META_FILE_NAME, &dirp, &name)) != 0)
+		goto err;
+
+	/*
+	 * If the main blob meta database does not exist, then no databases in
+	 * the environment support blobs.
+	 */
+	if ((ret = __os_exists(env, name, NULL)) != 0) {
+		ret = 0;
+		goto err;
+	}
+
+	/* Get the blob directory. */
+	if ((ret = __db_appname(
+	    env, DB_APP_BLOB, NULL, &dirp, &blob_dir)) != 0)
+		goto err;
+
+	/* Record the top level BMD. */
+	if ((ret = __rep_add_files_to_list(
+	    env, blob_dir, NULL, context, &bmd, 1)) != 0)
+		goto err;
+
+	if ((ret = __os_dirlist(env, blob_dir, 1, &dirs, &cnt)) != 0)
+		goto err;
+
+	/* Replace the BMD path with a scratch buffer of known capacity. */
+	__os_free(env, name);
+	name = NULL;
+	name_sz = MAX_BLOB_PATH_SZ + strlen(blob_dir);
+	if ((ret = __os_malloc(env, name_sz, &name)) != 0)
+		goto err;
+
+	for (i = 0; i < cnt; i++) {
+		/*
+		 * Skip blob files and the top level BMD
+		 * (which was handled above).
+		 */
+		if (IS_BLOB_META(dirs[i]) || IS_BLOB_FILE(dirs[i]))
+			continue;
+		len = strlen(blob_dir) +
+		    strlen(dirs[i]) + strlen(BLOB_META_FILE_NAME) + 3;
+		/*
+		 * Never write past the end of the scratch buffer.
+		 * NOTE(review): assumes an entry this long cannot be a blob
+		 * directory, so skipping it loses nothing -- confirm.
+		 */
+		if (len > name_sz)
+			continue;
+		(void)snprintf(name, len, "%s%c%s%c%s", blob_dir,
+		    PATH_SEPARATOR[0], dirs[i], PATH_SEPARATOR[0],
+		    BLOB_META_FILE_NAME);
+		/*
+		 * If a blob meta database exists, add it to the list, and move
+		 * on to the next directory, otherwise get a directory list and
+		 * check the second layer for BMD.  If a directory contains a
+		 * BMD, then it cannot contain subdirectories with BMD.
+		 */
+		if (__os_exists(env, name, NULL) == 0) {
+			len = strlen(dirs[i]) + strlen(bmd) + 2;
+			if (len > sizeof(blob_sub_buf))
+				continue;
+			(void)snprintf(blob_sub, len,
+			    "%s%c%s", dirs[i], PATH_SEPARATOR[0], bmd);
+			if ((ret = __rep_add_files_to_list(env, blob_dir,
+			    NULL, context, (const char **)&blob_sub, 1)) != 0)
+				goto err;
+		} else {
+			/* Shorter than the path built above, so it fits. */
+			len = strlen(blob_dir) + strlen(dirs[i]) + 2;
+			(void)snprintf(name, len, "%s%c%s",
+			    blob_dir, PATH_SEPARATOR[0], dirs[i]);
+			if ((ret = __os_dirlist(
+			    env, name, 1, &subdirs, &cnt2)) != 0)
+				goto err;
+			/*
+			 * The second level buffer is sized from the current
+			 * directory name, so grow it when a later directory
+			 * has a longer name than the one it was sized for.
+			 */
+			need = MAX_BLOB_PATH_SZ + strlen(name);
+			if (need > name2_sz) {
+				if (name2 != NULL) {
+					__os_free(env, name2);
+					name2 = NULL;
+					name2_sz = 0;
+				}
+				if ((ret = __os_malloc(
+				    env, need, &name2)) != 0)
+					goto err;
+				name2_sz = need;
+			}
+			for (j = 0; j < cnt2; j++) {
+				if (IS_BLOB_FILE(subdirs[j]))
+					continue;
+				len = strlen(name) + strlen(subdirs[j])
+				    + strlen(BLOB_META_FILE_NAME) + 3;
+				if (len > name2_sz)
+					continue;
+				(void)snprintf(name2, len, "%s%c%s%c%s",
+				    name, PATH_SEPARATOR[0], subdirs[j],
+				    PATH_SEPARATOR[0], BLOB_META_FILE_NAME);
+				/*
+				 * A missing BMD is expected here.  Do not
+				 * store the result in ret, or a miss on the
+				 * final entry would become the return value.
+				 */
+				if (__os_exists(env, name2, NULL) != 0)
+					continue;
+				len = strlen(dirs[i])
+				    + strlen(subdirs[j])
+				    + strlen(bmd) + 3;
+				if (len > sizeof(blob_sub_buf))
+					continue;
+				(void)snprintf(blob_sub,
+				    len, "%s%c%s%c%s", dirs[i],
+				    PATH_SEPARATOR[0], subdirs[j],
+				    PATH_SEPARATOR[0], bmd);
+				if ((ret = __rep_add_files_to_list(
+				    env, blob_dir, NULL, context,
+				    (const char **)&blob_sub, 1)) != 0)
+					goto err;
+			}
+			__os_dirfree(env, subdirs, cnt2);
+			subdirs = NULL;
+		}
+	}
+
+err:	if (name != NULL)
+		__os_free(env, name);
+	if (name2 != NULL)
+		__os_free(env, name2);
+	if (blob_dir != NULL)
+		__os_free(env, blob_dir);
+	if (dirs != NULL)
+		__os_dirfree(env, dirs, cnt);
+	if (subdirs != NULL)
+		__os_dirfree(env, subdirs, cnt2);
+	return (ret);
+}
+
+/*
* __rep_walk_dir --
*
* This is the routine that walks a directory and fills in the structures
@@ -284,11 +929,8 @@ __rep_walk_dir(env, dir, datadir, context)
const char *dir, *datadir;
FILE_LIST_CTX *context;
{
- __rep_fileinfo_args tmpfp;
- size_t avail, len;
- int cnt, first_file, i, ret;
- u_int8_t uid[DB_FILE_ID_LEN];
- char *file, **names, *subdb;
+ int cnt, ret;
+ char **names;
if (dir == NULL) {
VPRINT(env, (env, DB_VERB_REP_SYNC,
@@ -304,7 +946,34 @@ __rep_walk_dir(env, dir, datadir, context)
}
VPRINT(env, (env, DB_VERB_REP_SYNC, "Walk_dir: Dir %s has %d files",
(dir == NULL) ? "INMEM" : dir, cnt));
+ ret = __rep_add_files_to_list(
+ env, dir, datadir, context, (const char **)names, cnt);
+
+ __os_dirfree(env, names, cnt);
+ return (ret);
+}
+
+/*
+ * __rep_add_files_to_list --
+ *
+ * Add the given files to the file list.
+ */
+static int
+__rep_add_files_to_list(env, dir, datadir, context, names, cnt)
+ ENV *env;
+ const char *dir, *datadir;
+ FILE_LIST_CTX *context;
+ const char **names;
+ int cnt;
+{
+ __rep_fileinfo_args tmpfp;
+ size_t avail, len;
+ int first_file, i, ret;
+ u_int8_t uid[DB_FILE_ID_LEN];
+ const char *file, *subdb;
+
first_file = 1;
+ ret = 0;
for (i = 0; i < cnt; i++) {
VPRINT(env, (env, DB_VERB_REP_SYNC,
"Walk_dir: File %d name: %s", i, names[i]));
@@ -372,15 +1041,19 @@ __rep_walk_dir(env, dir, datadir, context)
DB_SET_DBT(tmpfp.uid, uid, DB_FILE_ID_LEN);
retry: avail = (size_t)(&context->buf[context->size] -
context->fillptr);
+ /*
+ * It is safe to cast to the old structs
+ * because the first part of the current
+ * struct matches the old structs.
+ */
if (context->version < DB_REPVERSION_53)
- /*
- * It is safe to cast to the old struct
- * because the first part of the current
- * struct matches the old struct.
- */
ret = __rep_fileinfo_v6_marshal(env, context->version,
(__rep_fileinfo_v6_args *)&tmpfp,
context->fillptr, avail, &len);
+ else if (context->version < DB_REPVERSION_61)
+ ret = __rep_fileinfo_v7_marshal(env, context->version,
+ (__rep_fileinfo_v7_args *)&tmpfp,
+ context->fillptr, avail, &len);
else
ret = __rep_fileinfo_marshal(env, context->version,
&tmpfp, context->fillptr, avail, &len);
@@ -409,9 +1082,7 @@ retry: avail = (size_t)(&context->buf[context->size] -
*/
context->fillptr += len;
}
-err:
- __os_dirfree(env, names, cnt);
- return (ret);
+err: return (ret);
}
/*
@@ -430,7 +1101,7 @@ __rep_is_replicated_db(name, dir)
/*
* Remaining things that don't have a "__db" prefix are eligible.
*/
- if (!IS_DB_FILE(name))
+ if (!IS_DB_FILE(name) || IS_BLOB_META(name))
return (1);
/* Here, we know we have a "__db" name. */
@@ -470,7 +1141,7 @@ __rep_check_uid(env, rfp, uid)
if (memcmp(rfp->uid.data, uid, DB_FILE_ID_LEN) == 0) {
VPRINT(env, (env, DB_VERB_REP_SYNC,
"Check_uid: Found matching file."));
- ret = DB_KEYEXIST;
+ ret = USR_ERR(env, DB_KEYEXIST);
}
return (ret);
@@ -489,6 +1160,7 @@ __rep_get_fileinfo(env, file, subdb, rfp, uid)
DB_THREAD_INFO *ip;
PAGE *pagep;
int lorder, ret, t_ret;
+ u_int32_t flags;
dbp = NULL;
dbc = NULL;
@@ -503,11 +1175,15 @@ __rep_get_fileinfo(env, file, subdb, rfp, uid)
* database handles would block the master from handling UPDATE_REQ.
*/
F_SET(dbp, DB_AM_RECOVER);
- if ((ret = __db_open(dbp, ip, NULL, file, subdb, DB_UNKNOWN,
- DB_RDONLY | (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0),
- 0, PGNO_BASE_MD)) != 0)
+ flags = DB_RDONLY | (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0);
+ if (file != NULL && IS_BLOB_META(file))
+ LF_SET(DB_INTERNAL_BLOB_DB);
+ if ((ret = __db_open(dbp, ip, NULL,
+ file, subdb, DB_UNKNOWN, flags, 0, PGNO_BASE_MD)) != 0)
goto err;
+ SET_LO_HI_VAR(dbp->blob_file_id, rfp->blob_fid_lo, rfp->blob_fid_hi);
+
if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
goto err;
if ((ret = __memp_fget(dbp->mpf, &dbp->meta_pgno, ip, dbc->txn,
@@ -574,6 +1250,7 @@ __rep_page_req(env, ip, eid, rp, rec)
{
__rep_fileinfo_args *msgfp, msgf;
__rep_fileinfo_v6_args *msgfpv6;
+ __rep_fileinfo_v7_args *msgfpv7;
DB_MPOOLFILE *mpf;
DB_REP *db_rep;
REP *rep;
@@ -584,21 +1261,30 @@ __rep_page_req(env, ip, eid, rp, rec)
db_rep = env->rep_handle;
rep = db_rep->region;
+ /*
+ * Build a current struct by copying in the older
+ * version struct and then setting up the new fields.
+ * This is safe because all old fields are in the
+ * same location in the current struct.
+ */
if (rp->rep_version < DB_REPVERSION_53) {
- /*
- * Build a current struct by copying in the older
- * version struct and then setting up the data_dir.
- * This is safe because all old fields are in the
- * same location in the current struct.
- */
if ((ret = __rep_fileinfo_v6_unmarshal(env, rp->rep_version,
&msgfpv6, rec->data, rec->size, &next)) != 0)
return (ret);
memcpy(&msgf, msgfpv6, sizeof(__rep_fileinfo_v6_args));
msgf.dir.data = NULL;
msgf.dir.size = 0;
+ msgf.blob_fid_lo = msgf.blob_fid_hi = 0;
msgfp = &msgf;
msgfree = msgfpv6;
+ } else if (rp->rep_version < DB_REPVERSION_61) {
+ if ((ret = __rep_fileinfo_v7_unmarshal(env, rp->rep_version,
+ &msgfpv7, rec->data, rec->size, &next)) != 0)
+ return (ret);
+ memcpy(&msgf, msgfpv7, sizeof(__rep_fileinfo_v7_args));
+ msgf.blob_fid_lo = msgf.blob_fid_hi = 0;
+ msgfp = &msgf;
+ msgfree = msgfpv7;
} else {
if ((ret = __rep_fileinfo_unmarshal(env, rp->rep_version,
&msgfp, rec->data, rec->size, &next)) != 0)
@@ -624,7 +1310,7 @@ __rep_page_req(env, ip, eid, rp, rec)
(void)__rep_send_message(env, eid, REP_FILE_FAIL,
NULL, rec, 0, 0);
else
- ret = DB_NOTFOUND;
+ ret = USR_ERR(env, DB_NOTFOUND);
goto err;
}
@@ -738,7 +1424,7 @@ __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp)
#ifdef HAVE_QUEUE
if ((ret = __qam_fget(qdbc, &p, 0, &pagep)) == ENOENT)
#endif
- ret = DB_PAGE_NOTFOUND;
+ ret = USR_ERR(env, DB_PAGE_NOTFOUND);
} else
ret = __memp_fget(mpf, &p, ip, NULL, 0, &pagep);
msgfp->pgno = p;
@@ -748,16 +1434,21 @@ __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp)
RPRINT(env, (env, DB_VERB_REP_SYNC,
"sendpages: PAGE_FAIL on page %lu",
(u_long)p));
+ /*
+ * It is safe to cast to the old structs
+ * because the first part of the current
+ * struct matches the old structs.
+ */
if (rp->rep_version < DB_REPVERSION_53)
- /*
- * It is safe to cast to the old struct
- * because the first part of the current
- * struct matches the old struct.
- */
ret = __rep_fileinfo_v6_marshal(env,
rp->rep_version,
(__rep_fileinfo_v6_args *)msgfp,
buf, msgsz, &len);
+ else if (rp->rep_version < DB_REPVERSION_61)
+ ret = __rep_fileinfo_v7_marshal(env,
+ rp->rep_version,
+ (__rep_fileinfo_v7_args *)msgfp,
+ buf, msgsz, &len);
else
ret = __rep_fileinfo_marshal(env,
rp->rep_version, msgfp, buf,
@@ -772,7 +1463,7 @@ __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp)
REP_PAGE_FAIL, &lsn, &msgdbt, 0, 0);
continue;
} else
- ret = DB_NOTFOUND;
+ ret = USR_ERR(env, DB_NOTFOUND);
goto err;
} else if (ret != 0)
goto err;
@@ -796,16 +1487,21 @@ __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp)
RPRINT(env, (env, DB_VERB_REP_SYNC,
"sendpages: %lu, page lsn [%lu][%lu]", (u_long)p,
(u_long)pagep->lsn.file, (u_long)pagep->lsn.offset));
+ /*
+ * It is safe to cast to the old structs
+ * because the first part of the current
+ * structs matches the old struct.
+ */
if (rp->rep_version < DB_REPVERSION_53)
- /*
- * It is safe to cast to the old struct
- * because the first part of the current
- * struct matches the old struct.
- */
ret = __rep_fileinfo_v6_marshal(env,
rp->rep_version,
(__rep_fileinfo_v6_args *)msgfp,
buf, msgsz, &len);
+ else if (rp->rep_version < DB_REPVERSION_61)
+ ret = __rep_fileinfo_v7_marshal(env,
+ rp->rep_version,
+ (__rep_fileinfo_v7_args *)msgfp,
+ buf, msgsz, &len);
else
ret = __rep_fileinfo_marshal(env, rp->rep_version,
msgfp, buf, msgsz, &len);
@@ -1010,7 +1706,8 @@ __rep_update_setup(env, eid, rp, rec, savetime, lsn)
ZERO_LSN(lp->waiting_lsn);
ZERO_LSN(lp->max_wait_lsn);
ZERO_LSN(lp->max_perm_lsn);
- if (db_rep->rep_db == NULL)
+ ret = __rep_blob_cleanup(env, rep);
+ if (ret == 0 && db_rep->rep_db == NULL)
ret = __rep_client_dbinit(env, 0, REP_DB);
MUTEX_UNLOCK(env, rep->mtx_clientdb);
if (ret != 0)
@@ -1148,6 +1845,337 @@ err: /*
return (ret);
}
+/*
+ * __rep_blob_update
+ *	Prepare to receive blob file data by setting up the blob gap database,
+ *	then requesting the blob file data.
+ *
+ *	rec holds a marshalled __rep_blob_update_args header followed by
+ *	num_blobs marshalled __rep_blob_file_args entries.  For every listed
+ *	blob file one gap database entry is written per MEGABYTE of the file
+ *	(at least one entry per file).  The same payload is then sent to the
+ *	master in a REP_BLOB_ALL_REQ message.  eid is only passed on to
+ *	__rep_blobdone() when the update lists no blobs.
+ *
+ *	Returns 0 on success, and also when the message is ignored because
+ *	the sync state is not SYNC_PAGE, the message is for a different blob
+ *	file id, or the gap database is already populated.  Otherwise returns
+ *	the error from the failing helper, or EINVAL when a blob file offset
+ *	overflows off_t.
+ *
+ * PUBLIC: int __rep_blob_update __P((ENV *, int, DB_THREAD_INFO *, DBT *));
+ */
+int
+__rep_blob_update(env, eid, ip, rec)
+	ENV *env;
+	int eid;
+	DB_THREAD_INFO *ip;
+	DBT *rec;
+{
+	DBC *dbc;
+	DB_REP *db_rep;
+	DBT data, key;
+	REP *rep;
+	REGINFO *infop;
+	__rep_blob_file_args rbf;
+	__rep_blob_update_args rbu;
+	__rep_fileinfo_args *rfp;
+	db_seq_t blob_fid;
+	int ret;
+	off_t offset;
+	size_t len;
+	u_int32_t num_blobs;
+	u_int8_t keybuf[BLOB_KEY_SIZE], *ptr;
+
+	db_rep = env->rep_handle;
+	rep = db_rep->region;
+	infop = env->reginfo;
+	rfp = NULL;
+	dbc = NULL;
+	memset(&rbu, 0, sizeof(__rep_blob_update_args));
+	memset(&rbf, 0, sizeof(__rep_blob_file_args));
+
+	if ((ret = __rep_blob_update_unmarshal(
+	    env, &rbu, rec->data, rec->size, &ptr)) != 0)
+		return (ret);
+	/* Bytes remaining after the header: the list of blob files. */
+	len = rec->size - __REP_BLOB_UPDATE_SIZE;
+
+	RPRINT(env, (env, DB_VERB_REP_SYNC,
+"blob_update: file_id %llu, num_blobs %lu, flags %lu, highest %llu",
+	    (long long)rbu.blob_fid, (long)rbu.num_blobs,
+	    (unsigned long)rbu.flags, (long long)rbu.highest_id));
+
+	MUTEX_LOCK(env, rep->mtx_clientdb);
+	REP_SYSTEM_LOCK(env);
+
+	/*
+	 * Check if the world changed.
+	 */
+	if (rep->sync_state != SYNC_PAGE)
+		goto unlock;
+
+	/* Make sure this is for the current database. */
+	GET_CURINFO(rep, infop, rfp);
+	GET_LO_HI(env, rfp->blob_fid_lo, rfp->blob_fid_hi, blob_fid, ret);
+	if (ret != 0)
+		goto unlock;
+
+	if (blob_fid != (db_seq_t)rbu.blob_fid)
+		goto unlock;
+
+	rep->highest_id = (db_seq_t)rbu.highest_id;
+	/*
+	 * For each blob file, add an entry to the database for each 1 MB
+	 * section of that file.  The entries will be deleted as the
+	 * corresponding blob chunks arrive and are written to disk.
+	 */
+	if (db_rep->blob_dbp == NULL &&
+	    (ret = __rep_client_dbinit(env, 0, REP_BLOB)) != 0)
+		goto unlock;
+
+	if ((ret = __db_cursor(db_rep->blob_dbp, ip, NULL, &dbc, 0)) != 0)
+		goto unlock;
+
+	/*
+	 * Make sure no one else has populated the database, this could happen
+	 * if the update message is sent twice.  Only DB_NOTFOUND (an empty
+	 * database) lets us continue; success or any other error returns.
+	 */
+	memset(&key, 0, sizeof(DBT));
+	memset(&data, 0, sizeof(DBT));
+	if ((ret = __dbc_get(dbc, &key, &data, DB_FIRST)) != DB_NOTFOUND)
+		goto unlock;
+
+	/* It is possible for a blob database to have no blobs. */
+	if (rbu.num_blobs == 0) {
+		(void)__dbc_close(dbc);
+		dbc = NULL;
+		rep->blob_more_files = 0;
+		rep->gap_bl_hi_id = rep->gap_bl_hi_sid = 0;
+		rep->last_blob_id = rep->last_blob_sid = 0;
+		rep->prev_blob_id = rep->prev_blob_sid = 0;
+		rep->gap_bl_hi_off = 0;
+		rep->blob_sync = 0;
+		rep->highest_id = 0;
+		rep->blob_rereq = 0;
+		ret = __rep_blobdone(env, eid, ip, rep, blob_fid, 0);
+		goto unlock;
+	}
+
+	/* Key is (blob_sid, blob_id); data is the chunk offset. */
+	memset(&key, 0, sizeof(DBT));
+	memset(&data, 0, sizeof(DBT));
+	data.flags = key.flags = DB_DBT_USERMEM;
+	key.data = keybuf;
+	key.ulen = key.size = BLOB_KEY_SIZE;
+	data.data = (void *)&offset;
+	data.ulen = data.size = sizeof(offset);
+	num_blobs = 0;
+	while (num_blobs < rbu.num_blobs) {
+		if ((ret =
+		    __rep_blob_file_unmarshal(env, &rbf, ptr, len, &ptr)) != 0)
+			goto unlock;
+		len -= __REP_BLOB_FILE_SIZE;
+
+		RPRINT(env, (env, DB_VERB_REP_SYNC,
+    "blob_update adding file: blob_id %llu, sdb_id %llu, blob_size %llu",
+		    (long long)rbf.blob_id, (long long)rbf.blob_sid,
+		    (long long)rbf.blob_size));
+
+		memcpy(keybuf, &rbf.blob_sid, BLOB_ID_SIZE);
+		memcpy(&(keybuf[BLOB_ID_SIZE]), &rbf.blob_id, BLOB_ID_SIZE);
+		offset = 0;
+		/*
+		 * Add an entry for each megabyte of the blob file.  Zero
+		 * length blob files should have at least one entry.
+		 */
+		do {
+			if ((ret = __dbc_put(dbc, &key, &data, 0)) != 0)
+				goto unlock;
+			offset += MEGABYTE;
+			/*
+			 * Check for overflow, this can happen when the master
+			 * supports 64-bit file offsets, but the client does
+			 * not.
+			 */
+			if (offset < 0) {
+				__db_errx(env,
+				    DB_STR("3704",
+				    "Blob file offset overflow"));
+				ret = EINVAL;
+				goto unlock;
+			}
+			/*
+			 * Compare at full 64-bit width.  Truncating the offset
+			 * to 32 bits wraps it to 0 at 4 GB, so larger files
+			 * would never satisfy the loop's exit condition.
+			 */
+		} while ((u_int64_t)offset < rbf.blob_size);
+		num_blobs++;
+	}
+	/* Set whether there are more files after the ones on the list. */
+	if (F_ISSET(&rbu, BLOB_DONE))
+		rep->blob_more_files = 0;
+	else
+		rep->blob_more_files = 1;
+	rep->prev_blob_id = rep->last_blob_id;
+	rep->prev_blob_sid = rep->last_blob_sid;
+	rep->last_blob_sid = (db_seq_t)rbf.blob_sid;
+	rep->last_blob_id = (db_seq_t)rbf.blob_id;
+
+	/*
+	 * Send the same message payload in a REP_BLOB_ALL_REQ message to get
+	 * the blob data.  Peer-to-peer initialization is not supported for
+	 * blobs, so we can only send this back to the master despite the fact
+	 * that building the list of blob files is expensive.
+	 */
+	(void)__rep_send_message(
+	    env, rep->master_id, REP_BLOB_ALL_REQ, NULL, rec, 0, 0);
+
+unlock:	REP_SYSTEM_UNLOCK(env);
+	MUTEX_UNLOCK(env, rep->mtx_clientdb);
+	if (dbc != NULL)
+		(void)__dbc_close(dbc);
+
+	return (ret);
+}
+
+/*
+ * __rep_blob_allreq
+ * Request blob file data.
+ *
+ * Services a REP_BLOB_ALL_REQ message.  rec holds a marshalled
+ * __rep_blob_update_args header followed by num_blobs marshalled
+ * __rep_blob_file_args entries.  Each listed blob file is read in
+ * MEGABYTE sized pieces and each piece is sent to eid in a
+ * REP_BLOB_CHUNK message.  A file that can no longer be opened (ENOENT)
+ * is reported with a single BLOB_DELETE chunk, and a file that is now
+ * shorter than listed is reported with a BLOB_CHUNK_FAIL chunk.
+ * Failures from __rep_send_message() are ignored.
+ *
+ * Returns 0 on success, otherwise the error from the failing helper.
+ *
+ * PUBLIC: int __rep_blob_allreq __P((ENV *, int, DBT *));
+ */
+int
+__rep_blob_allreq(env, eid, rec)
+ ENV *env;
+ int eid;
+ DBT *rec;
+{
+ DB *dbp;
+ DB_FH *fhp;
+ DBT msg;
+ __rep_blob_chunk_args rbc;
+ __rep_blob_file_args rbf;
+ __rep_blob_update_args rbu;
+ db_seq_t old_sdb_id;
+ int done, ret;
+ off_t offset;
+ size_t len;
+ u_int32_t num_blobs;
+ u_int8_t *chunk_buf, *msg_buf, *ptr;
+
+ dbp = NULL;
+ fhp = NULL;
+ chunk_buf = msg_buf = NULL;
+ memset(&rbu, 0, sizeof(__rep_blob_update_args));
+ memset(&rbc, 0, sizeof(__rep_blob_chunk_args));
+ memset(&msg, 0, sizeof(DBT));
+
+ /* msg_buf holds one marshalled chunk: header plus up to 1 MB of data. */
+ if ((ret =
+ __os_malloc(env, MEGABYTE + __REP_BLOB_CHUNK_SIZE, &msg_buf)) != 0)
+ goto err;
+ msg.data = msg_buf;
+ msg.ulen = MEGABYTE + __REP_BLOB_CHUNK_SIZE;
+ /* chunk_buf receives the data read from the blob file. */
+ if ((ret = __os_malloc(env, MEGABYTE, &chunk_buf)) != 0)
+ goto err;
+ rbc.data.data = chunk_buf;
+ rbc.data.ulen = MEGABYTE;
+ rbc.data.flags = DB_DBT_USERMEM;
+
+ /*
+ * The REP_BLOB_ALL_REQ message sends the REP_BLOB_UPDATE message
+ * payload back to the master to request the actual blobs after the
+ * client has prepared itself to receive them.
+ */
+ len = rec->size;
+ if ((ret = __rep_blob_update_unmarshal(
+ env, &rbu, rec->data, rec->size, &ptr)) != 0)
+ goto err;
+ /* len now counts the bytes left for the list of blob files. */
+ len -= __REP_BLOB_UPDATE_SIZE;
+
+ RPRINT(env, (env, DB_VERB_REP_SYNC,
+ "blob_all_req: file_id %llu, num_blobs %lu, flags %lu",
+ (long long)rbu.blob_fid, (long)rbu.num_blobs,
+ (unsigned long)rbu.flags));
+
+ /*
+ * This handle is created but not opened here; it only carries the
+ * blob file id, subdatabase id and sub-directory that are passed to
+ * __blob_make_sub_dir() and __blob_file_open().
+ */
+ if ((ret = __db_create_internal(&dbp, env, 0)) != 0)
+ goto err;
+ dbp->blob_file_id = (db_seq_t)rbu.blob_fid;
+ rbc.blob_fid = rbu.blob_fid;
+ num_blobs = 0;
+ /*
+ * The list of files to send is included in the message, go
+ * through the list and send each file in pieces.
+ */
+ while (num_blobs < rbu.num_blobs) {
+ num_blobs++;
+ if ((ret = __rep_blob_file_unmarshal(
+ env, &rbf, ptr, len, &ptr)) != 0)
+ goto err;
+ len -= __REP_BLOB_FILE_SIZE;
+ old_sdb_id = dbp->blob_sdb_id;
+ dbp->blob_sdb_id = (db_seq_t)rbf.blob_sid;
+ rbc.flags = 0;
+ rbc.blob_sid = rbf.blob_sid;
+ rbc.blob_id = rbf.blob_id;
+ /* Free the sub-directory information if it has changed. */
+ if (old_sdb_id != dbp->blob_sdb_id &&
+ dbp->blob_sub_dir != NULL) {
+ __os_free(env, dbp->blob_sub_dir);
+ dbp->blob_sub_dir = NULL;
+ }
+ if (dbp->blob_sub_dir == NULL) {
+ if ((ret = __blob_make_sub_dir(env, &dbp->blob_sub_dir,
+ dbp->blob_file_id, dbp->blob_sdb_id)) != 0)
+ goto err;
+ }
+ if ((ret = __blob_file_open(dbp,
+ &fhp, (db_seq_t)rbf.blob_id, DB_FOP_READONLY, 0)) != 0) {
+ /*
+ * The file may have been deleted between creating the
+ * list and sending the data. Send a message saying
+ * the file has been deleted.
+ */
+ if (ret == ENOENT) {
+ F_SET(&rbc, BLOB_DELETE);
+ rbc.data.size = 0;
+ __rep_blob_chunk_marshal(env, &rbc, msg.data);
+ msg.size = __REP_BLOB_CHUNK_SIZE;
+ (void)__rep_send_message(env,
+ eid, REP_BLOB_CHUNK, NULL, &msg, 0, 0);
+ ret = 0;
+ fhp = NULL;
+ continue;
+ }
+ goto err;
+ }
+ offset = 0;
+ /*
+ * Send the file one MEGABYTE at a time. The loop body runs at
+ * least once, so a zero length file still produces one chunk.
+ */
+ do {
+ done = 0;
+ rbc.flags = 0;
+ if ((ret = __blob_file_read(
+ env, fhp, &rbc.data, offset, MEGABYTE)) != 0)
+ goto err;
+ DB_ASSERT(env, rbc.data.size <= MEGABYTE);
+
+ /*
+ * In rare cases the blob file may have gotten shorter
+ * since the list was created.
+ */
+ if (rbc.data.size < (u_int32_t)MEGABYTE && (u_int64_t)
+ (offset + rbc.data.size) < rbf.blob_size) {
+ F_SET(&rbc, BLOB_CHUNK_FAIL);
+ done = 1;
+ }
+ /* File may have grown since the list was made. */
+ if ((u_int64_t)
+ (offset + rbc.data.size) > rbf.blob_size) {
+ /* Send no more than the size recorded in the list. */
+ rbc.data.size =
+ (u_int32_t)((off_t)rbf.blob_size - offset);
+ }
+ rbc.offset = (u_int64_t)offset;
+ __rep_blob_chunk_marshal(env, &rbc, msg.data);
+ msg.size = __REP_BLOB_CHUNK_SIZE + rbc.data.size;
+ (void)__rep_send_message(
+ env, eid, REP_BLOB_CHUNK, NULL, &msg, 0, 0);
+ offset += MEGABYTE;
+ } while ((u_int64_t)offset < rbf.blob_size && !done);
+
+ if (fhp != NULL && (ret = __os_closehandle(env, fhp)) != 0)
+ goto err;
+ fhp = NULL;
+ }
+ /* Common exit: release the buffers, any open file handle and dbp. */
+err: if (chunk_buf != NULL)
+ __os_free(env, chunk_buf);
+ if (msg_buf != NULL)
+ __os_free(env, msg_buf);
+ if (fhp != NULL)
+ (void)__os_closehandle(env, fhp);
+ if (dbp != 0)
+ (void)__db_close(dbp, NULL, 0);
+ return (ret);
+}
+
static int
__rep_find_inmem(env, rfp, unused)
ENV *env;
@@ -1157,6 +2185,11 @@ __rep_find_inmem(env, rfp, unused)
COMPQUIET(env, NULL);
COMPQUIET(unused, NULL);
+ /*
+ * Cannot assume all databases are in-memory because abbreviated
+ * internal inits from 5.3 and earlier are not limited to in-memory
+ * databases.
+ */
return (FLD_ISSET(rfp->db_flags, DB_AM_INMEM) ? DB_KEYEXIST : 0);
}
@@ -1172,12 +2205,9 @@ __rep_remove_nimdbs(env)
FILE_LIST_CTX context;
int ret;
- if ((ret = __os_calloc(env, 1, MEGABYTE, &context.buf)) != 0)
+ if ((ret = __rep_init_file_list_context(env,
+ DB_REPVERSION, 0, 0, &context)) != 0)
return (ret);
- context.size = MEGABYTE;
- context.count = 0;
- context.fillptr = context.buf;
- context.version = DB_REPVERSION;
/* NB: "NULL" asks walk_dir to consider only in-memory DBs */
if ((ret = __rep_walk_dir(env, NULL, NULL, &context)) != 0)
@@ -1240,14 +2270,11 @@ __rep_remove_all(env, msg_version, rec)
* 1. Get list of databases currently present at this client, which we
* intend to remove.
*/
- if ((ret = __os_calloc(env, 1, MEGABYTE, &context.buf)) != 0)
- return (ret);
- context.size = MEGABYTE;
- context.count = 0;
- context.version = DB_REPVERSION;
/* Reserve space for the marshaled update_args. */
- context.fillptr = FIRST_FILE_PTR(context.buf);
+ if ((ret = __rep_init_file_list_context(env,
+ DB_REPVERSION, 0, 1, &context)) != 0)
+ return (ret);
if ((ret = __rep_find_dbs(env, &context)) != 0)
goto out;
@@ -1333,6 +2360,9 @@ __rep_remove_all(env, msg_version, rec)
FIRST_FILE_PTR(context.buf), context.size,
context.count, __rep_remove_file, NULL)) != 0)
goto out;
+ /* Remove the blob directory. */
+ if ((ret = __blob_del_hierarchy(env)) != 0)
+ goto out;
/*
* 4. Safe-store the (new) list of database files we intend to copy from
@@ -1445,6 +2475,8 @@ __rep_remove_file(env, rfp, unused)
#ifdef HAVE_QUEUE
DB_THREAD_INFO *ip;
#endif
+ APPNAME appname;
+ db_seq_t blob_fid, blob_sid;
char *name;
int ret, t_ret;
@@ -1496,29 +2528,53 @@ __rep_remove_file(env, rfp, unused)
* That will only have removed extent files. Now
* we need to deal with the actual file itself.
*/
+ appname = __rep_is_internal_rep_file(rfp->info.data) ?
+ DB_APP_META : (IS_BLOB_META(rfp->info.data) ?
+ DB_APP_BLOB : DB_APP_DATA);
if (FLD_ISSET(rfp->db_flags, DB_AM_INMEM)) {
if ((ret = __db_create_internal(&dbp, env, 0)) != 0)
return (ret);
MAKE_INMEM(dbp);
F_SET(dbp, DB_AM_RECOVER); /* Skirt locking. */
ret = __db_inmem_remove(dbp, NULL, name);
- } else if ((ret = __fop_remove(env,
- NULL, rfp->uid.data, name, (const char **)&rfp->dir.data,
- __rep_is_internal_rep_file(rfp->info.data) ?
- DB_APP_META : DB_APP_DATA, 0)) != 0)
+ } else if ((ret = __fop_remove(env, NULL, rfp->uid.data, name,
+ (const char **)&rfp->dir.data, appname, 0)) != 0) {
/*
* If fop_remove fails, it could be because
* the client has a different data_dir
* structure than the master. Retry with the
- * local, default settings.
+ * local, default settings.
*/
ret = __fop_remove(env,
- NULL, rfp->uid.data, name, NULL,
- __rep_is_internal_rep_file(rfp->info.data) ?
- DB_APP_META : DB_APP_DATA, 0);
-#ifdef HAVE_QUEUE
-out:
+ NULL, rfp->uid.data, name, NULL, appname, 0);
+#ifdef DB_WIN32
+ /*
+ * Deleting a blob meta database can result in a
+ * ERROR_PATH_NOT_FOUND error on windows, so treat
+ * that as an ENOENT.
+ */
+ if (__os_posix_err(ret) == ENOENT)
+ ret = ENOENT;
#endif
+ }
+ /* Clean any blob directories. */
+ if (ret == 0 && appname == DB_APP_BLOB) {
+ /* dbp has not been set, since queues do not support blobs. */
+ DB_ASSERT(env, dbp == NULL);
+ if ((ret = __db_create_internal(&dbp, env, 0)) != 0)
+ goto out;
+ if ((ret = __blob_path_to_dir_ids(
+ env, name, &blob_fid, &blob_sid)) != 0)
+ goto out;
+ /* blob_fid == 0 if it is the top level blob meta db. */
+ if (blob_fid != 0) {
+ dbp->blob_file_id = blob_fid;
+ dbp->blob_sdb_id = blob_sid;
+ if ((ret = __blob_del_all(dbp, NULL, 0)) != 0)
+ goto out;
+ }
+ }
+out:
if (dbp != NULL &&
(t_ret = __db_close(dbp, NULL, DB_NOSYNC)) != 0 && ret == 0)
ret = t_ret;
@@ -1610,10 +2666,11 @@ __rep_page(env, ip, eid, rp, rec)
{
DB_REP *db_rep;
- DBT key, data;
+ DBT data, key;
REP *rep;
__rep_fileinfo_args *msgfp, msgf;
__rep_fileinfo_v6_args *msgfpv6;
+ __rep_fileinfo_v7_args *msgfpv7;
db_recno_t recno;
int ret;
char *msg;
@@ -1647,21 +2704,30 @@ __rep_page(env, ip, eid, rp, rec)
(u_long)rep->first_lsn.offset));
return (DB_REP_PAGEDONE);
}
+ /*
+ * Build a current struct by copying in the older
+ * version struct and then setting up the new fields.
+ * This is safe because all old fields are in the
+ * same location in the current struct.
+ */
if (rp->rep_version < DB_REPVERSION_53) {
- /*
- * Build a current struct by copying in the older
- * version struct and then setting up the data_dir.
- * This is safe because all old fields are in the
- * same location in the current struct.
- */
if ((ret = __rep_fileinfo_v6_unmarshal(env, rp->rep_version,
&msgfpv6, rec->data, rec->size, NULL)) != 0)
return (ret);
memcpy(&msgf, msgfpv6, sizeof(__rep_fileinfo_v6_args));
msgf.dir.data = NULL;
msgf.dir.size = 0;
+ msgf.blob_fid_lo = msgf.blob_fid_hi = 0;
msgfp = &msgf;
msgfree = msgfpv6;
+ } else if (rp->rep_version < DB_REPVERSION_61) {
+ if ((ret = __rep_fileinfo_v7_unmarshal(env, rp->rep_version,
+ &msgfpv7, rec->data, rec->size, NULL)) != 0)
+ return (ret);
+ memcpy(&msgf, msgfpv7, sizeof(__rep_fileinfo_v7_args));
+ msgf.blob_fid_lo = msgf.blob_fid_hi = 0;
+ msgfp = &msgf;
+ msgfree = msgfpv7;
} else {
if ((ret = __rep_fileinfo_unmarshal(env, rp->rep_version,
&msgfp, rec->data, rec->size, NULL)) != 0)
@@ -1671,9 +2737,9 @@ __rep_page(env, ip, eid, rp, rec)
MUTEX_LOCK(env, rep->mtx_clientdb);
REP_SYSTEM_LOCK(env);
/*
- * Check if the world changed.
+ * Check if the world changed or if we are in the blob sync phase.
*/
- if (rep->sync_state != SYNC_PAGE) {
+ if (rep->sync_state != SYNC_PAGE || rep->blob_sync != 0) {
ret = DB_REP_PAGEDONE;
goto err;
}
@@ -1785,6 +2851,218 @@ err: REP_SYSTEM_UNLOCK(env);
}
/*
+ * __rep_blob_chunk
+ *	Process a blob chunk message.  When a blob chunk arrives, delete its
+ *	entry in the blob chunk gap database to show that it has arrived, and
+ *	write the data to the blob file.
+ *
+ *	rec holds a marshalled __rep_blob_chunk_args.  A chunk flagged
+ *	BLOB_DELETE removes every gap entry for the blob file.  A chunk
+ *	flagged BLOB_CHUNK_FAIL removes the matching entry and the duplicate
+ *	entries that follow it.  Neither writes any data.
+ *
+ *	Returns DB_REP_PAGEDONE when the sync state is not SYNC_PAGE or the
+ *	chunk is for a different blob file id.  Otherwise returns the result
+ *	of __rep_blobdone(), or the error from the failing helper.
+ *
+ * PUBLIC: int __rep_blob_chunk __P((ENV *, int, DB_THREAD_INFO *, DBT *));
+ */
+int
+__rep_blob_chunk(env, eid, ip, rec)
+	ENV *env;
+	int eid;
+	DB_THREAD_INFO *ip;
+	DBT *rec;
+{
+	DB_REP *db_rep;
+	DBC *dbc;
+	DB_FH *fhp;
+	DBT data, key;
+	REP *rep;
+	REGINFO *infop;
+	__rep_blob_chunk_args rbc;
+	__rep_fileinfo_args *rfp;
+	db_seq_t blob_fid;
+	char *blob_sub_dir, *last, *mkpath, *name, *path;
+	int ret;
+	off_t offset;
+	u_int8_t keybuf[BLOB_KEY_SIZE], *ptr;
+
+	ret = 0;
+	db_rep = env->rep_handle;
+	rep = db_rep->region;
+	infop = env->reginfo;
+	dbc = NULL;
+	blob_sub_dir = name = NULL;
+	path = NULL;
+	fhp = NULL;
+
+	/* Cheap check before unmarshalling; repeated below under the locks. */
+	if (rep->sync_state != SYNC_PAGE)
+		return (DB_REP_PAGEDONE);
+
+	if ((ret = __rep_blob_chunk_unmarshal(
+	    env, &rbc, rec->data, rec->size, &ptr)) != 0)
+		return (ret);
+
+	MUTEX_LOCK(env, rep->mtx_clientdb);
+	REP_SYSTEM_LOCK(env);
+	/*
+	 * Check if the world changed.
+	 */
+	if (rep->sync_state != SYNC_PAGE) {
+		ret = DB_REP_PAGEDONE;
+		goto err;
+	}
+	/*
+	 * We should not ever be in internal init with a lease granted.
+	 */
+	DB_ASSERT(env,
+	    !IS_USING_LEASES(env) || __rep_islease_granted(env) == 0);
+
+	/* Make sure this is for the current file. */
+	GET_CURINFO(rep, infop, rfp);
+	GET_LO_HI(env, rfp->blob_fid_lo, rfp->blob_fid_hi, blob_fid, ret);
+	if (ret != 0)
+		goto err;
+
+	if (blob_fid != (db_seq_t)rbc.blob_fid) {
+		ret = DB_REP_PAGEDONE;
+		goto err;
+	}
+
+	RPRINT(env, (env, DB_VERB_REP_SYNC,
+"REP_BLOB_CHUNK: blob_fid %llu, blob_sid %llu, blob_id %llu, offset %llu",
+	    (unsigned long long)rbc.blob_fid,
+	    (unsigned long long)rbc.blob_sid,
+	    (unsigned long long)rbc.blob_id, (long long)rbc.offset));
+
+	if (db_rep->blob_dbp == NULL &&
+	    (ret = __rep_client_dbinit(env, 0, REP_BLOB)) != 0) {
+		RPRINT(env, (env, DB_VERB_REP_SYNC,
+		    "REP_BLOB_CHUNK: Client_dbinit %s",
+		    db_strerror(ret)));
+		goto err;
+	}
+
+	/*
+	 * Set the highest blob chunk received, ordered by subdatabase id,
+	 * then blob id, then offset.
+	 */
+	if (rbc.blob_sid > (u_int64_t)rep->gap_bl_hi_sid ||
+	    (rbc.blob_sid == (u_int64_t)rep->gap_bl_hi_sid &&
+	    rbc.blob_id > (u_int64_t)rep->gap_bl_hi_id) ||
+	    (rbc.blob_sid == (u_int64_t)rep->gap_bl_hi_sid &&
+	    rbc.blob_id == (u_int64_t)rep->gap_bl_hi_id &&
+	    rbc.offset > (u_int64_t)rep->gap_bl_hi_off)) {
+		rep->gap_bl_hi_id = (db_seq_t)rbc.blob_id;
+		rep->gap_bl_hi_sid = (db_seq_t)rbc.blob_sid;
+		rep->gap_bl_hi_off = (off_t)rbc.offset;
+	}
+
+	/* Key is (blob_sid, blob_id); data is the chunk offset. */
+	memset(&key, 0, sizeof(DBT));
+	memset(&data, 0, sizeof(DBT));
+	data.flags = key.flags = DB_DBT_USERMEM;
+	key.data = keybuf;
+	key.ulen = key.size = BLOB_KEY_SIZE;
+	data.data = (void *)&offset;
+	data.ulen = data.size = sizeof(offset);
+	/* BLOB_DELETE is set if the blob file was deleted. */
+	if (F_ISSET(&rbc, BLOB_DELETE)) {
+		memcpy(keybuf, &rbc.blob_sid, BLOB_ID_SIZE);
+		memcpy(&(keybuf[BLOB_ID_SIZE]), &rbc.blob_id, BLOB_ID_SIZE);
+		if ((ret = __db_del(
+		    db_rep->blob_dbp, ip, NULL, &key, 0)) != 0) {
+			if (ret == DB_NOTFOUND)
+				ret = 0;
+			goto err;
+		}
+		goto done;
+	}
+
+	if ((ret = __db_cursor(db_rep->blob_dbp, ip, NULL, &dbc, 0)) != 0)
+		goto err;
+	offset = (off_t)rbc.offset;
+	memcpy(keybuf, &rbc.blob_sid, BLOB_ID_SIZE);
+	memcpy(&(keybuf[BLOB_ID_SIZE]), &rbc.blob_id, BLOB_ID_SIZE);
+	/* If not found we have already dealt with this chunk. */
+	if ((ret = __dbc_get(dbc, &key, &data, DB_GET_BOTH)) != 0) {
+		if (ret == DB_NOTFOUND) {
+			ret = 0;
+			goto done;
+		}
+		goto err;
+	}
+	/*
+	 * BLOB_CHUNK_FAIL is set if the blob file was truncated to shorter
+	 * than the BLOB_CHUNK offset.
+	 */
+	if (F_ISSET(&rbc, BLOB_CHUNK_FAIL)) {
+		while (ret == 0) {
+			if ((ret = __dbc_del(dbc, 0)) != 0)
+				goto err;
+			ret = __dbc_get(dbc, &key, &data, DB_NEXT_DUP);
+		}
+		/*
+		 * Running out of duplicates is the normal way to leave the
+		 * loop.  Any other failure is returned to the caller rather
+		 * than being overwritten by the close result.
+		 */
+		if (ret != DB_NOTFOUND)
+			goto err;
+		/*
+		 * Clear dbc before testing the close result so that the
+		 * cleanup code never closes the cursor a second time.
+		 */
+		ret = __dbc_close(dbc);
+		dbc = NULL;
+		if (ret != 0)
+			goto err;
+		goto done;
+	}
+	if ((ret = __dbc_del(dbc, 0)) != 0)
+		goto err;
+	/* As above: the cursor is gone even if the close reports an error. */
+	ret = __dbc_close(dbc);
+	dbc = NULL;
+	if (ret != 0)
+		goto err;
+
+	if ((ret = __blob_make_sub_dir(env, &blob_sub_dir,
+	    (db_seq_t)rbc.blob_fid, (db_seq_t)rbc.blob_sid)) != 0)
+		goto err;
+
+	if ((ret = __blob_id_to_path(
+	    env, blob_sub_dir, (db_seq_t)rbc.blob_id, &name)) != 0)
+		goto err;
+
+	if ((ret = __db_appname(env, DB_APP_BLOB, name, NULL, &path)) != 0)
+		goto err;
+
+	/*
+	 * Temporarily cut the path at its last separator to test whether the
+	 * containing directory exists, creating the directories if not.
+	 */
+	last = __db_rpath(path);
+	DB_ASSERT(env, last != NULL);
+	*last = '\0';
+	if (__os_exists(env, path, NULL) != 0) {
+		*last = PATH_SEPARATOR[0];
+		mkpath = path;
+#ifdef DB_WIN32
+		/*
+		 * Absolute paths on windows can result in it creating a "C"
+		 * or "D" directory in the working directory.
+		 */
+		if (__os_abspath(mkpath))
+			mkpath += 2;
+#endif
+		if ((ret = __db_mkpath(env, mkpath)) != 0)
+			goto err;
+	}
+	*last = PATH_SEPARATOR[0];
+	if ((ret = __os_open(
+	    env, path, 0, DB_OSO_CREATE, env->db_mode, &fhp)) != 0)
+		goto err;
+
+	/* Write the data into the blob file. */
+	if ((ret = __fop_write_file(env, NULL, name, NULL, DB_APP_BLOB,
+	    fhp, (off_t)rbc.offset, rbc.data.data, rbc.data.size, 0)) != 0)
+		goto err;
+	if ((ret = __os_closehandle(env, fhp)) != 0)
+		goto err;
+	fhp = NULL;
+
+done:	ret = __rep_blobdone(env, eid, ip, rep, blob_fid, 0);
+
+err:	REP_SYSTEM_UNLOCK(env);
+	MUTEX_UNLOCK(env, rep->mtx_clientdb);
+	if (path != NULL)
+		__os_free(env, path);
+	if (blob_sub_dir != NULL)
+		__os_free(env, blob_sub_dir);
+	if (name != NULL)
+		__os_free(env, name);
+	if (fhp != NULL)
+		(void)__os_closehandle(env, fhp);
+	if (dbc != NULL)
+		(void)__dbc_close(dbc);
+
+	return (ret);
+}
+
+/*
* __rep_write_page -
* Write this page into a database.
*/
@@ -1801,13 +3079,16 @@ __rep_write_page(env, ip, rep, msgfp)
DB_PGINFO *pginfo;
DB_REP *db_rep;
REGINFO *infop;
+ APPNAME appname;
__rep_fileinfo_args *rfp;
+ char *blob_path;
int ret;
void *dst;
db_rep = env->rep_handle;
infop = env->reginfo;
rfp = NULL;
+ blob_path = NULL;
/*
* If this is the first page we're putting in this database, we need
@@ -1830,15 +3111,39 @@ __rep_write_page(env, ip, rep, msgfp)
RPRINT(env, (env, DB_VERB_REP_SYNC,
"rep_write_page: Calling fop_create for %s",
(char *)rfp->info.data));
+ appname = (__rep_is_internal_rep_file(rfp->info.data) ?
+ DB_APP_META : (IS_BLOB_META((char *)rfp->info.data)
+ ? DB_APP_BLOB : DB_APP_DATA));
+ /*
+ * May have to create the directory structure for blob
+ * metadata databases.
+ */
+			if (appname == DB_APP_BLOB) {
+				char *mkpath;
+
+				if ((ret = __db_appname(env,
+				    appname, rfp->info.data,
+				    (const char **)&rfp->dir.data,
+				    &blob_path)) != 0)
+					goto err;
+				/*
+				 * Walk a scratch pointer, never blob_path
+				 * itself: blob_path is released with
+				 * __os_free at the err label and so must
+				 * keep the exact address that __db_appname
+				 * returned.
+				 */
+				mkpath = blob_path;
+#ifdef DB_WIN32
+				/*
+				 * Absolute paths on windows can result in
+				 * it creating a "C" or "D"
+				 * directory in the working directory.
+				 * Skip the two-character drive prefix.
+				 */
+				if (__os_abspath(mkpath))
+					mkpath += 2;
+#endif
+				if ((ret = __db_mkpath(env, mkpath)) != 0)
+					goto err;
+			}
if ((ret = __fop_create(env, NULL, NULL,
rfp->info.data, (const char **)&rfp->dir.data,
- __rep_is_internal_rep_file(rfp->info.data) ?
- DB_APP_META : DB_APP_DATA, env->db_mode, 0)) != 0) {
+ appname, env->db_mode, 0)) != 0) {
/*
* If fop_create fails, it could be because
* the client has a different data_dir
* structure than the master. Retry with the
- * local, default settings.
+ * local, default settings.
*/
RPRINT(env, (env, DB_VERB_REP_SYNC,
"rep_write_page: fop_create ret %d. Retry for %s, master datadir %s",
@@ -1929,7 +3234,10 @@ __rep_write_page(env, ip, rep, msgfp)
ret = __memp_fput(db_rep->file_mpf,
ip, dst, db_rep->file_dbp->priority);
-err: return (ret);
+err: if (blob_path != NULL)
+ __os_free(env, blob_path);
+
+ return (ret);
}
/*
@@ -1976,7 +3284,7 @@ __rep_page_gap(env, rep, msgfp, type)
* Make sure we're still talking about the same file.
* If not, we're done here.
*/
- if (rfp->filenum != msgfp->filenum) {
+ if (rfp->filenum != msgfp->filenum || rep->blob_sync != 0) {
ret = DB_REP_PAGEDONE;
goto err;
}
@@ -2135,6 +3443,53 @@ err:
}
/*
+ * __rep_blob_cleanup -
+ *	Discard all blob internal init state: empty and close the blob
+ *	chunk gap database (if it is open) and zero the blob bookkeeping
+ *	fields kept in the REP region.
+ *
+ *	Returns 0 on success.  If both the truncate and the close fail, the
+ *	truncate error is the one reported.  The REP fields are reset
+ *	regardless of any database error.
+ *
+ *	Caller must hold client database mutex (mtx_clientdb) and
+ *	REP_SYSTEM_LOCK.
+ */
+static int
+__rep_blob_cleanup(env, rep)
+	ENV *env;
+	REP *rep;
+{
+	DB *gap_dbp;
+	DB_REP *db_rep;
+	DB_THREAD_INFO *ip;
+	int close_ret, ret;
+	u_int32_t ntruncated;
+
+	db_rep = env->rep_handle;
+	gap_dbp = db_rep->blob_dbp;
+	ret = 0;
+
+	/*
+	 * Records left in the blob chunk gap database describe chunks that
+	 * have not arrived yet.  Stale records could interfere with how the
+	 * next REP_BLOB_UPDATE message is handled, so throw them all away
+	 * before closing the handle.
+	 */
+	if (gap_dbp != NULL) {
+		ENV_GET_THREAD_INFO(env, ip);
+		ret = __db_truncate(gap_dbp, ip, NULL, &ntruncated);
+		/* Always close; keep the first error encountered. */
+		if ((close_ret =
+		    __db_close(gap_dbp, NULL, DB_NOSYNC)) != 0 && ret == 0)
+			ret = close_ret;
+		db_rep->blob_dbp = NULL;
+	}
+
+	/* Reset blob internal init control values. */
+	rep->blob_sync = 0;
+	rep->blob_more_files = 0;
+	rep->blob_rereq = 0;
+	rep->highest_id = 0;
+	rep->gap_bl_hi_off = 0;
+	rep->gap_bl_hi_sid = 0;
+	rep->gap_bl_hi_id = 0;
+	rep->last_blob_sid = 0;
+	rep->last_blob_id = 0;
+	rep->prev_blob_sid = 0;
+	rep->prev_blob_id = 0;
+
+	return (ret);
+}
+
+/*
* __rep_init_cleanup -
* Clean up internal initialization pieces.
*
@@ -2162,9 +3517,10 @@ __rep_init_cleanup(env, rep, force)
/*
* 1. Close up the file data pointer we used.
* 2. Close/reset the page database.
- * 3. Close/reset the queue database if we're forcing a cleanup.
- * 4. Free current file info.
- * 5. If we have all files or need to force, free original file info.
+ * 3. Close/truncate the blob chunk gap database.
+ * 4. Close/reset the queue database if we're forcing a cleanup.
+ * 5. Free current file info.
+ * 6. If we have all files or need to force, free original file info.
*/
if (db_rep->file_mpf != NULL) {
ret = __memp_fclose(db_rep->file_mpf, 0);
@@ -2176,6 +3532,15 @@ __rep_init_cleanup(env, rep, force)
if (ret == 0)
ret = t_ret;
}
+ /*
+ * Truncate the blob chunk gap database, since entries in the database
+ * are for blob chunks we are expecting to arrive. Also reset blob
+ * internal init control values.
+ */
+ t_ret = __rep_blob_cleanup(env, rep);
+ if (ret == 0)
+ ret = t_ret;
+
if (force && db_rep->queue_dbc != NULL) {
queue_dbp = db_rep->queue_dbc->dbp;
if ((t_ret = __dbc_close(db_rep->queue_dbc)) != 0 && ret == 0)
@@ -2324,8 +3689,8 @@ __rep_clean_interrupted(env)
* __rep_filedone -
* We need to check if we're done with the current file after
* processing the current page. Stat the database to see if
- * we have all the pages. If so, we need to clean up/close
- * this one, set up for the next one, and ask for its pages,
+ * we have all the pages and blobs. If so, we need to clean up/close
+ * this one, set up for the next one, and ask for its pages and blobs,
* or if this is the last file, request the log records and
* move to the REP_RECOVER_LOG state.
*/
@@ -2338,9 +3703,14 @@ __rep_filedone(env, ip, eid, rep, msgfp, type)
__rep_fileinfo_args *msgfp;
u_int32_t type;
{
+ DBT msg;
REGINFO *infop;
__rep_fileinfo_args *rfp;
+ __rep_blob_update_req_args rbur;
int ret;
+ u_int8_t buf[__REP_BLOB_UPDATE_REQ_SIZE];
+
+ memset(&msg, 0, sizeof(DBT));
/*
* We've put our page, now we need to do any gap processing
@@ -2375,8 +3745,96 @@ __rep_filedone(env, ip, eid, rep, msgfp, type)
((ret = __rep_queue_filedone(env, ip, rep, rfp)) !=
DB_REP_PAGEDONE))
return (ret);
+
+ /* Request blob files. */
+ if (rfp->blob_fid_lo != 0 || rfp->blob_fid_hi != 0) {
+ ret = 0;
+ rep->blob_sync = 1;
+ memset(&rbur, 0, sizeof(__rep_blob_update_req_args));
+ GET_LO_HI(env,
+ rfp->blob_fid_lo, rfp->blob_fid_hi, rbur.blob_fid, ret);
+ msg.size = __REP_BLOB_UPDATE_REQ_SIZE;
+ msg.data = buf;
+ __rep_blob_update_req_marshal(env, &rbur, msg.data);
+ (void)__rep_send_message(env,
+ rep->master_id, REP_BLOB_UPDATE_REQ, NULL, &msg, 0, 0);
+ return (ret);
+ }
+
+ /*
+ * We have all the data for this file. Clean up.
+ */
+ if ((ret = __rep_init_cleanup(env, rep, 0)) != 0)
+ return (ret);
+
+ rep->curfile++;
+ ret = __rep_nextfile(env, eid, rep);
+
+ return (ret);
+}
+
+/*
+ * __rep_blobdone -
+ * We need to check if we're done with the current file after
+ * processing the current blob chunk.
+ *
+ * Caller must hold client database mutex (mtx_clientdb) and
+ * REP_SYSTEM_LOCK.
+ */
+static int
+__rep_blobdone(env, eid, ip, rep, blob_fid, force)
+ ENV *env;
+ int eid;
+ DB_THREAD_INFO *ip;
+ REP *rep;
+ db_seq_t blob_fid;
+ int force;
+{
+ DBT msg;
+ __rep_blob_update_req_args rbur;
+ int done, ret;
+ u_int8_t buf[__REP_BLOB_UPDATE_REQ_SIZE];
+
/*
- * We have all the pages for this file. Clean up.
+ * We've written our blob chunk, now we need to do any gap processing
+ * that might be needed to re-request chunks.
+ */
+ done = 0;
+ ret = __rep_blob_chunk_gap(env, eid, ip, rep, &done, blob_fid, force);
+ /*
+ * The world changed while we were doing gap processing.
+ * We're done here.
+ */
+ if (ret == DB_REP_PAGEDONE)
+ return (0);
+ else if (ret != 0)
+ goto err;
+
+ /*
+ * If the blob database is empty then all files in the current list
+ * have been processed. However, there may be more files on the
+ * master, so request the next list if that is the case.
+ */
+ if (done && rep->blob_more_files) {
+ memset(&rbur, 0, sizeof(__rep_blob_update_req_args));
+ rbur.blob_fid = (u_int64_t)blob_fid;
+ rbur.blob_sid = (u_int64_t)rep->last_blob_sid;
+ rbur.blob_id = (u_int64_t)rep->last_blob_id;
+ rbur.highest_id = (u_int64_t)rep->highest_id;
+ rep->gap_bl_hi_id = rep->gap_bl_hi_sid = 0;
+ rep->gap_bl_hi_off = 0;
+ rep->blob_rereq = 0;
+ msg.size = __REP_BLOB_UPDATE_REQ_SIZE;
+ msg.data = buf;
+ __rep_blob_update_req_marshal(env, &rbur, msg.data);
+ (void)__rep_send_message(env,
+ rep->master_id, REP_BLOB_UPDATE_REQ, NULL, &msg, 0, 0);
+ return (0);
+ } else if (!done)
+ return (0);
+
+ /*
+ * We have all the data for this file. Clean up.
*/
if ((ret = __rep_init_cleanup(env, rep, 0)) != 0)
goto err;
@@ -2388,6 +3846,255 @@ err:
}
/*
+ * __rep_blob_chunk_gap -
+ *	We have written a blob chunk.  Now check if there are any that need
+ *	to be re-requested.  The blob chunk gap database contains
+ *	descriptions of all the blob chunks that have yet to arrive.
+ *
+ *	On return *done is 1 if the gap database is empty (every expected
+ *	chunk has arrived) and 0 otherwise.  Returns DB_REP_PAGEDONE if
+ *	blob_fid is no longer the file being synced, 0 on success, or a
+ *	non-zero error code.  If force is non-zero the re-request is sent
+ *	without consulting __rep_check_doreq.
+ *
+ *	Caller must hold client database mutex (mtx_clientdb) and
+ *	REP_SYSTEM_LOCK.
+ */
+static int
+__rep_blob_chunk_gap(env, eid, ip, rep, done, blob_fid, force)
+	ENV *env;
+	int eid;
+	DB_THREAD_INFO *ip;
+	REP *rep;
+	int *done;
+	db_seq_t blob_fid;
+	int force;
+{
+	DBC *dbc;
+	DBT data, high, key, msg;
+	DB_LOG *dblp;
+	DB_REP *db_rep;
+	LOG *lp;
+	REGINFO *infop;
+	__rep_blob_chunk_req_args rbcr;
+	__rep_fileinfo_args *rfp;
+	db_seq_t cur_blob_fid;
+	off_t offset;
+	int cmp, past_high, ret;
+	u_int8_t buf[BLOB_KEY_SIZE], msgbuf[__REP_BLOB_CHUNK_REQ_SIZE];
+
+	db_rep = env->rep_handle;
+	dblp = env->lg_handle;
+	lp = dblp->reginfo.primary;
+	infop = env->reginfo;
+	ret = 0;
+	dbc = NULL;
+	*done = 0;
+
+	/* eid will be used when peer-to-peer is re-enabled for blobs. */
+	COMPQUIET(eid, 0);
+
+	/*
+	 * Make sure we're still talking about the same file.
+	 * If not, we're done here.
+	 */
+	GET_CURINFO(rep, infop, rfp);
+	GET_LO_HI(env, rfp->blob_fid_lo, rfp->blob_fid_hi, cur_blob_fid, ret);
+	/* GET_LO_HI reports failure through ret; do not silently drop it. */
+	if (ret != 0)
+		goto err;
+	if (cur_blob_fid != blob_fid) {
+		ret = DB_REP_PAGEDONE;
+		goto err;
+	}
+
+	/* Get the first missing blob chunk. */
+	if ((ret = __db_cursor(db_rep->blob_dbp, ip, NULL, &dbc, 0)) != 0)
+		goto err;
+	memset(&key, 0, sizeof(DBT));
+	memset(&data, 0, sizeof(DBT));
+	ret = __dbc_get(dbc, &key, &data, DB_FIRST);
+	if (ret == DB_NOTFOUND) {
+		/* All blobs received. */
+		ret = 0;
+		*done = 1;
+		goto err;
+	} else if (ret != 0)
+		goto err;
+
+	DB_ASSERT(env, key.size == BLOB_KEY_SIZE);
+	DB_ASSERT(env, data.size == sizeof(off_t));
+	/* Copy rather than cast: the record buffer need not be aligned. */
+	memcpy(&offset, data.data, sizeof(off_t));
+	/*
+	 * Format the sdbid and id of the high chunk as a blob gap
+	 * database key, so it can be compared with the entries in that
+	 * database.
+	 */
+	memset(&high, 0, sizeof(DBT));
+	memcpy(buf, &rep->gap_bl_hi_sid, BLOB_ID_SIZE);
+	memcpy(buf + BLOB_ID_SIZE, &rep->gap_bl_hi_id, BLOB_ID_SIZE);
+	high.data = buf;
+	high.size = BLOB_KEY_SIZE;
+
+	/*
+	 * past_high is true when the record under the cursor lies beyond
+	 * the highest chunk received so far: a larger key, or the same key
+	 * with a larger offset.
+	 */
+	cmp = __rep_blob_cmp(NULL, &key, &high, NULL);
+	past_high = cmp > 0 || (cmp == 0 && offset > rep->gap_bl_hi_off);
+
+	/*
+	 * If the first chunk in the database is larger than the highest chunk
+	 * received, then there is no gap.
+	 *
+	 * If a gap does exist, check if it is time to do a re-request.  If so,
+	 * re-request every chunk that exists before the highest received.
+	 */
+	if (!force && past_high) {
+		lp->wait_ts = db_rep->request_gap;
+		__os_gettime(env, &lp->rcvd_ts, 1);
+	} else if (force || __rep_check_doreq(env, rep)) {
+		/*
+		 * Re-request every chunk less than the highest one, plus the
+		 * next blob chunk that we are expecting.  The next expected
+		 * blob chunk is requested in case the last blob chunk is lost
+		 * in transit.
+		 */
+		/* Only size and data are set below; zero the rest once. */
+		memset(&msg, 0, sizeof(DBT));
+		for (;;) {
+			memset(&rbcr, 0, sizeof(__rep_blob_chunk_req_args));
+			memcpy(&(rbcr.blob_sid), key.data, BLOB_ID_SIZE);
+			memcpy(&(rbcr.blob_id),
+			    (u_int8_t *)key.data + BLOB_ID_SIZE, BLOB_ID_SIZE);
+			rbcr.offset = (u_int64_t)offset;
+			rbcr.blob_fid = (u_int64_t)blob_fid;
+			msg.size = __REP_BLOB_CHUNK_REQ_SIZE;
+			msg.data = msgbuf;
+			RPRINT(env, (env, DB_VERB_REP_SYNC,
+"blob_chunk_gap: Req file_id %llu, sdb_id %llu, blob_id %llu, offset %llu",
+			    (long long)rbcr.blob_fid, (long long)rbcr.blob_sid,
+			    (long long)rbcr.blob_id, (long long)rbcr.offset));
+			__rep_blob_chunk_req_marshal(env, &rbcr, msg.data);
+			/*
+			 * Note that peer-to-peer initialization is not
+			 * supported for blobs.
+			 */
+			(void)__rep_send_message(
+			    env, rep->master_id,
+			    REP_BLOB_CHUNK_REQ, NULL, &msg, 0, 0);
+			/*
+			 * Break after requesting the chunk after the highest
+			 * one.
+			 */
+			if (past_high)
+				break;
+			if ((ret = __dbc_get(
+			    dbc, &key, &data, DB_NEXT)) != 0) {
+				/* Running off the end is not an error. */
+				if (ret == DB_NOTFOUND)
+					ret = 0;
+				break;
+			}
+			/*
+			 * Re-evaluate the position against the record just
+			 * fetched; the offset of the first record says
+			 * nothing about this one.
+			 */
+			DB_ASSERT(env, key.size == BLOB_KEY_SIZE);
+			DB_ASSERT(env, data.size == sizeof(off_t));
+			memcpy(&offset, data.data, sizeof(off_t));
+			cmp = __rep_blob_cmp(NULL, &key, &high, NULL);
+			past_high = cmp > 0 ||
+			    (cmp == 0 && offset > rep->gap_bl_hi_off);
+		}
+	}
+
+err:	if (dbc != NULL)
+		(void)__dbc_close(dbc);
+
+	return (ret);
+}
+
+/*
+ * __rep_blob_chunk_req
+ *	Answer a request for a specific blob chunk.  The request in rec is
+ *	unmarshalled, up to MEGABYTE bytes are read from the named blob file
+ *	at the requested offset, and the result is sent to eid as a
+ *	REP_BLOB_CHUNK message.  A blob file that no longer exists is
+ *	answered with a BLOB_DELETE chunk; a read that yields no data is
+ *	answered with a BLOB_CHUNK_FAIL chunk.
+ *
+ * PUBLIC: int __rep_blob_chunk_req __P((ENV *, int, DBT *));
+ */
+int
+__rep_blob_chunk_req(env, eid, rec)
+	ENV *env;
+	int eid;
+	DBT *rec;
+{
+	DB *dbp;
+	DBT msg;
+	DB_FH *fhp;
+	__rep_blob_chunk_args rbc;
+	__rep_blob_chunk_req_args rbcr;
+	int ret;
+	u_int8_t *chunk_buf, *msg_buf, *next;
+
+	chunk_buf = msg_buf = NULL;
+	fhp = NULL;
+	dbp = NULL;
+
+	/*
+	 * Reply buffer: the fixed-size part of a REP_BLOB_CHUNK message
+	 * followed by at most MEGABYTE bytes of blob data.
+	 */
+	ret = __os_malloc(env, MEGABYTE + __REP_BLOB_CHUNK_SIZE, &msg_buf);
+	if (ret != 0)
+		goto err;
+	memset(&msg, 0, sizeof(msg));
+	msg.ulen = MEGABYTE + __REP_BLOB_CHUNK_SIZE;
+	msg.data = msg_buf;
+
+	/* Staging buffer the blob file is read into. */
+	ret = __os_malloc(env, MEGABYTE, &chunk_buf);
+	if (ret != 0)
+		goto err;
+	memset(&rbc, 0, sizeof(rbc));
+	rbc.data.flags = DB_DBT_USERMEM;
+	rbc.data.ulen = MEGABYTE;
+	rbc.data.data = chunk_buf;
+
+	ret = __rep_blob_chunk_req_unmarshal(
+	    env, &rbcr, rec->data, rec->size, &next);
+	if (ret != 0)
+		goto err;
+
+	RPRINT(env, (env, DB_VERB_REP_SYNC,
+	    "blob_chunk_req: file_id %llu, sdbid %llu, id %llu, offset %llu",
+	    (long long)rbcr.blob_fid, (long long)rbcr.blob_sid,
+	    (long long)rbcr.blob_id, (long long)rbcr.offset));
+
+	/* The reply identifies the chunk exactly as the request did. */
+	rbc.offset = rbcr.offset;
+	rbc.blob_sid = rbcr.blob_sid;
+	rbc.blob_id = rbcr.blob_id;
+	rbc.blob_fid = rbcr.blob_fid;
+
+	/* A scratch DB handle carries the ids used to open the blob file. */
+	ret = __db_create_internal(&dbp, env, 0);
+	if (ret != 0)
+		goto err;
+	dbp->blob_file_id = (db_seq_t)rbcr.blob_fid;
+	dbp->blob_sdb_id = (db_seq_t)rbcr.blob_sid;
+	ret = __blob_make_sub_dir(env, &dbp->blob_sub_dir,
+	    (db_seq_t)rbcr.blob_fid, (db_seq_t)rbcr.blob_sid);
+	if (ret != 0)
+		goto err;
+
+	ret = __blob_file_open(
+	    dbp, &fhp, (db_seq_t)rbcr.blob_id, DB_FOP_READONLY, 0);
+	if (ret == ENOENT) {
+		/*
+		 * The file may have been deleted between creating the
+		 * list and sending the request.  Send a message saying
+		 * the file has been deleted.
+		 */
+		ret = 0;
+		F_SET(&rbc, BLOB_DELETE);
+		rbc.data.size = 0;
+		__rep_blob_chunk_marshal(env, &rbc, msg.data);
+		msg.size = __REP_BLOB_CHUNK_SIZE;
+		(void)__rep_send_message(
+		    env, eid, REP_BLOB_CHUNK, NULL, &msg, 0, 0);
+		goto err;
+	}
+	if (ret != 0)
+		goto err;
+
+	ret = __blob_file_read(
+	    env, fhp, &rbc.data, (off_t)rbcr.offset, MEGABYTE);
+	if (ret != 0)
+		goto err;
+	DB_ASSERT(env, rbc.data.size <= MEGABYTE);
+
+	/*
+	 * In rare cases the blob file may have gotten shorter
+	 * since the list was created.
+	 */
+	if (rbc.data.size == 0)
+		F_SET(&rbc, BLOB_CHUNK_FAIL);
+	__rep_blob_chunk_marshal(env, &rbc, msg.data);
+	msg.size = __REP_BLOB_CHUNK_SIZE + rbc.data.size;
+	(void)__rep_send_message(env, eid, REP_BLOB_CHUNK, NULL, &msg, 0, 0);
+
+	/* Shared exit: release whatever was acquired above. */
+err:	if (chunk_buf != NULL)
+		__os_free(env, chunk_buf);
+	if (msg_buf != NULL)
+		__os_free(env, msg_buf);
+	if (fhp != NULL)
+		(void)__os_closehandle(env, fhp);
+	if (dbp != NULL)
+		(void)__db_close(dbp, NULL, 0);
+	return (ret);
+}
+
+/*
* Starts requesting pages for the next file in the list (if any), or if not,
* proceeds to the next stage: requesting logs.
*
@@ -2404,19 +4111,25 @@ __rep_nextfile(env, eid, rep)
DBT dbt;
__rep_logreq_args lr_args;
DB_LOG *dblp;
+ DB_REP *db_rep;
+ DELAYED_BLOB_LIST *dbl;
LOG *lp;
REGENV *renv;
REGINFO *infop;
__rep_fileinfo_args *curinfo, *rfp, rf;
__rep_fileinfo_v6_args *rfpv6;
- int *curbuf, ret;
+ __rep_fileinfo_v7_args *rfpv7;
+ int *curbuf, ret, view_partial;
u_int8_t *buf, *info_ptr, lrbuf[__REP_LOGREQ_SIZE], *nextinfo;
size_t len, msgsz;
+ char *name;
void *rffree;
infop = env->reginfo;
renv = infop->primary;
+ db_rep = env->rep_handle;
rfp = NULL;
+ dbl = NULL;
/*
* Always direct the next request to the master (at least nominally),
@@ -2430,13 +4143,13 @@ __rep_nextfile(env, eid, rep)
/* Set curinfo to next file and examine it. */
info_ptr = R_ADDR(infop,
rep->originfo_off + (rep->originfolen - rep->infolen));
+ /*
+ * Build a current struct by copying in the older
+ * version struct and then setting up the new fields.
+ * This is safe because all old fields are in the
+ * same location in the current struct.
+ */
if (rep->infoversion < DB_REPVERSION_53) {
- /*
- * Build a current struct by copying in the older
- * version struct and then setting up the data_dir.
- * This is safe because all old fields are in the
- * same location in the current struct.
- */
if ((ret = __rep_fileinfo_v6_unmarshal(env,
rep->infoversion, &rfpv6,
info_ptr, rep->infolen, &nextinfo)) != 0)
@@ -2444,8 +4157,18 @@ __rep_nextfile(env, eid, rep)
memcpy(&rf, rfpv6, sizeof(__rep_fileinfo_v6_args));
rf.dir.data = NULL;
rf.dir.size = 0;
+ rf.blob_fid_lo = rf.blob_fid_hi = 0;
rfp = &rf;
rffree = rfpv6;
+ } else if (rep->infoversion < DB_REPVERSION_61) {
+ if ((ret = __rep_fileinfo_v7_unmarshal(env,
+ rep->infoversion, &rfpv7,
+ info_ptr, rep->infolen, &nextinfo)) != 0)
+ return (ret);
+ memcpy(&rf, rfpv7, sizeof(__rep_fileinfo_v7_args));
+ rf.blob_fid_lo = rf.blob_fid_hi = 0;
+ rfp = &rf;
+ rffree = rfpv7;
} else {
if ((ret = __rep_fileinfo_unmarshal(env,
rep->infoversion, &rfp, info_ptr,
@@ -2457,6 +4180,14 @@ __rep_nextfile(env, eid, rep)
}
rffree = rfp;
}
+#ifndef HAVE_64BIT_TYPES
+ if (rfp->blob_fid_lo != 0 || rfp->blob_fid_hi != 0) {
+ __db_errx(env, DB_STR("3705",
+ "Blobs require 64 integer compiler support."));
+ __os_free(env, rffree);
+ return (DB_OPNOTSUP);
+ }
+#endif
rep->infolen -= (u_int32_t)(nextinfo - info_ptr);
MUTEX_LOCK(env, renv->mtx_regenv);
ret = __env_alloc(infop, sizeof(__rep_fileinfo_args) +
@@ -2484,19 +4215,55 @@ __rep_nextfile(env, eid, rep)
rfp->dir.data, rfp->dir.size);
__os_free(env, rffree);
- /* Skip over regular DB's in "abbreviated" internal inits. */
- if (F_ISSET(rep, REP_F_ABBREVIATED) &&
+ /*
+ * If a partial callback is set, invoke the callback to see if
+ * this file should be replicated.
+ */
+ if (IS_VIEW_SITE(env) && curinfo->info.size > 0 &&
!FLD_ISSET(curinfo->db_flags, DB_AM_INMEM)) {
+ name = (char *)curinfo->info.data;
+ DB_ASSERT(env, db_rep->partial != NULL);
+ /*
+ * Always replicate system owned databases.
+ */
+ if (IS_DB_FILE(name) && !IS_BLOB_META(name))
+ view_partial = 1;
+ else if ((ret = __rep_call_partial(env,
+ name, &view_partial, 0, &dbl)) != 0) {
+ VPRINT(env, (env, DB_VERB_REP_SYNC,
+ "rep_nextfile: partial cb err %d for %s",
+ ret, name));
+ return (ret);
+ }
+ /*
+ * dbl != NULL when we could not find the name of the
+ * database that owns a blob meta database. If that
+ * happens then it was never opened, which means it
+ * was not replicated, and as such neither should its
+ * bmd be replicated.
+ */
+ if (dbl != NULL) {
+ view_partial = 0;
+ __os_free(env, dbl);
+ dbl = NULL;
+ }
VPRINT(env, (env, DB_VERB_REP_SYNC,
- "Skipping file %d in abbreviated internal init",
- curinfo->filenum));
- MUTEX_LOCK(env, renv->mtx_regenv);
- __env_alloc_free(infop,
- R_ADDR(infop, rep->curinfo_off));
- MUTEX_UNLOCK(env, renv->mtx_regenv);
- rep->curinfo_off = INVALID_ROFF;
- rep->curfile++;
- continue;
+ "rep_nextfile: %s file %s %d on view site.",
+ view_partial == 0 ?
+ "Skipping" : "Replicating",
+ name, curinfo->filenum));
+ /*
+ * If we're skipping the file, move to the next one.
+ */
+ if (view_partial == 0) {
+ MUTEX_LOCK(env, renv->mtx_regenv);
+ __env_alloc_free(infop,
+ R_ADDR(infop, rep->curinfo_off));
+ MUTEX_UNLOCK(env, renv->mtx_regenv);
+ rep->curinfo_off = INVALID_ROFF;
+ rep->curfile++;
+ continue;
+ }
}
/* Request this file's pages. */
@@ -2519,15 +4286,19 @@ __rep_nextfile(env, eid, rep)
curinfo->uid.size + curinfo->info.size;
if ((ret = __os_calloc(env, 1, msgsz, &buf)) != 0)
return (ret);
+ /*
+ * It is safe to cast to the old structs
+ * because the first part of the current
+ * struct matches the old structs.
+ */
if (rep->infoversion < DB_REPVERSION_53)
- /*
- * It is safe to cast to the old struct
- * because the first part of the current
- * struct matches the old struct.
- */
ret = __rep_fileinfo_v6_marshal(env, rep->infoversion,
(__rep_fileinfo_v6_args *)curinfo, buf,
msgsz, &len);
+ else if (rep->infoversion < DB_REPVERSION_61)
+ ret = __rep_fileinfo_v7_marshal(env, rep->infoversion,
+ (__rep_fileinfo_v7_args *)curinfo, buf,
+ msgsz, &len);
else
ret = __rep_fileinfo_marshal(env, rep->infoversion,
curinfo, buf, msgsz, &len);
@@ -2834,16 +4605,19 @@ __rep_pggap_req(env, rep, reqfp, gapflags)
* new info into rep->finfo. Assert that the sizes never
* change. The only thing this should do is change
* the pgno field. Everything else remains the same.
+ *
+ * It is safe to cast to the old structs
+ * because the first part of the current
+ * struct matches the old structs.
*/
if (rep->infoversion < DB_REPVERSION_53)
- /*
- * It is safe to cast to the old struct
- * because the first part of the current
- * struct matches the old struct.
- */
ret = __rep_fileinfo_v6_marshal(env, rep->infoversion,
(__rep_fileinfo_v6_args *)tmpfp, buf,
msgsz, &len);
+ else if (rep->infoversion < DB_REPVERSION_61)
+ ret = __rep_fileinfo_v7_marshal(env, rep->infoversion,
+ (__rep_fileinfo_v7_args *)tmpfp, buf,
+ msgsz, &len);
else
ret = __rep_fileinfo_marshal(env, rep->infoversion,
tmpfp, buf, msgsz, &len);
@@ -2865,6 +4639,94 @@ err:
}
/*
+ * __rep_blob_rereq -
+ *
+ * Re-request lost blob messages, such as REP_BLOB_CHUNK_REQ, REP_BLOB_ALL_REQ,
+ * or REP_BLOB_UPDATE_REQ.  Note that the blob chunk gap database contains
+ * descriptions of the blob chunks that we are expecting to arrive.
+ *
+ * If no master is known, a REP_MASTER_REQ is broadcast and 0 is returned.
+ * Otherwise returns 0 on success or a non-zero error code.
+ *
+ * Assumes the caller holds mtx_clientdb and rep_mutex.
+ *
+ * PUBLIC: int __rep_blob_rereq __P((ENV *, REP *));
+ */
+int
+__rep_blob_rereq(env, rep)
+	ENV *env;
+	REP *rep;
+{
+	DB_REP *db_rep;
+	DB_THREAD_INFO *ip;
+	REGINFO *infop;
+	__rep_fileinfo_args *rfp;
+	db_seq_t blob_fid;
+	int master_eid, ret;
+	u_int32_t ntruncated;
+
+	ret = 0;
+	rfp = NULL;
+	infop = env->reginfo;
+	db_rep = env->rep_handle;
+
+	/* First check if the master is around to answer the re-request. */
+	master_eid = rep->master_id;
+	if (master_eid == DB_EID_INVALID) {
+		(void)__rep_send_message(env,
+		    DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0);
+		return (ret);
+	}
+
+	/* Open the blob chunk gap database if this handle lacks it. */
+	if (db_rep->blob_dbp == NULL) {
+		ret = __rep_client_dbinit(env, 0, REP_BLOB);
+		if (ret != 0) {
+			RPRINT(env, (env, DB_VERB_REP_SYNC,
+			    "REP_BLOB_CHUNK: Client_dbinit %s",
+			    db_strerror(ret)));
+			return (ret);
+		}
+	}
+
+	/*
+	 * If the gap blob id is 0 then we either lost a REP_BLOB_ALL_REQ or
+	 * a REP_BLOB_UPDATE_REQ message.  Since we do not have the information
+	 * to reconstruct a REP_BLOB_ALL_REQ message, reset the blob gap
+	 * database and start over at the REP_BLOB_UPDATE_REQ stage.
+	 *
+	 * If the blob gap id is not 0, we lost a REP_BLOB_CHUNK_REQ message,
+	 * so perform blob gap processing.
+	 */
+	ENV_GET_THREAD_INFO(env, ip);
+	if (rep->gap_bl_hi_id == 0) {
+		/*
+		 * It takes a while to create the blob update message, so skip
+		 * the first time it asks.
+		 */
+		if (rep->blob_rereq == 0) {
+			rep->blob_rereq = 1;
+			return (ret);
+		}
+		rep->blob_rereq = 0;
+		ret = __db_truncate(db_rep->blob_dbp, ip, NULL, &ntruncated);
+		if (ret != 0)
+			return (ret);
+		/* Rewind to the ids recorded before the lost update. */
+		rep->blob_more_files = 1;
+		rep->last_blob_id = rep->prev_blob_id;
+		rep->last_blob_sid = rep->prev_blob_sid;
+	}
+
+	GET_CURINFO(rep, infop, rfp);
+	GET_LO_HI(env, rfp->blob_fid_lo, rfp->blob_fid_hi, blob_fid, ret);
+	if (ret != 0)
+		return (ret);
+
+	/*
+	 * If there are entries in the blob gap database, __rep_blobdone
+	 * will perform gap processing, otherwise it will send
+	 * a REP_BLOB_UPDATE_REQ.
+	 */
+	return (__rep_blobdone(env, master_eid, ip, rep, blob_fid, 1));
+}
+
+/*
* __rep_finfo_alloc -
* Allocate and initialize a fileinfo structure.
*
@@ -3521,6 +5383,7 @@ __rep_walk_filelist(env, version, files, size, count, fn, arg)
{
__rep_fileinfo_args *rfp, rf;
__rep_fileinfo_v6_args *rfpv6;
+ __rep_fileinfo_v7_args *rfpv7;
u_int8_t *next;
int ret;
void *rffree;
@@ -3530,21 +5393,30 @@ __rep_walk_filelist(env, version, files, size, count, fn, arg)
rfpv6 = NULL;
rffree = NULL;
while (count-- > 0) {
+ /*
+ * Build a current struct by copying in the older
+ * version struct and then setting up the new fields.
+ * This is safe because all old fields are in the
+ * same location in the current struct.
+ */
if (version < DB_REPVERSION_53) {
- /*
- * Build a current struct by copying in the older
- * version struct and then setting up the data_dir.
- * This is safe because all old fields are in the
- * same location in the current struct.
- */
if ((ret = __rep_fileinfo_v6_unmarshal(env, version,
&rfpv6, files, size, &next)) != 0)
break;
memcpy(&rf, rfpv6, sizeof(__rep_fileinfo_v6_args));
rf.dir.data = NULL;
rf.dir.size = 0;
+ rf.blob_fid_lo = rf.blob_fid_hi = 0;
rfp = &rf;
rffree = rfpv6;
+ } else if (version < DB_REPVERSION_61) {
+ if ((ret = __rep_fileinfo_v7_unmarshal(env, version,
+ &rfpv7, files, size, &next)) != 0)
+ break;
+ memcpy(&rf, rfpv7, sizeof(__rep_fileinfo_v7_args));
+ rf.blob_fid_lo = rf.blob_fid_hi = 0;
+ rfp = &rf;
+ rffree = rfpv7;
} else {
if ((ret = __rep_fileinfo_unmarshal(env, version,
&rfp, files, size, &next)) != 0)
@@ -3566,3 +5438,33 @@ __rep_walk_filelist(env, version, files, size, count, fn, arg)
__os_free(env, rffree);
return (ret);
}
+
+/*
+ * __rep_init_file_list_context --
+ *	Set up a FILE_LIST_CTX: allocate a zeroed MEGABYTE buffer, record
+ *	the given version and flags, and start with a file count of zero.
+ *
+ *	A non-zero update_space reserves room for update_args at the front
+ *	of the buffer, so filling starts at FIRST_FILE_PTR(buf); otherwise
+ *	filling starts at the very beginning of the buffer.
+ *
+ *	Returns 0 on success or the allocation error, in which case the
+ *	remaining context fields are left untouched.
+ */
+static int
+__rep_init_file_list_context(env, version, flags, update_space, context)
+	ENV *env;
+	u_int32_t version;
+	u_int32_t flags;
+	int update_space;
+	FILE_LIST_CTX *context;
+{
+	int ret;
+
+	ret = __os_calloc(env, 1, MEGABYTE, &context->buf);
+	if (ret == 0) {
+		context->version = version;
+		context->flags = flags;
+		context->count = 0;
+		context->size = MEGABYTE;
+		/* Optionally skip the space set aside for update_args. */
+		context->fillptr = update_space ?
+		    FIRST_FILE_PTR(context->buf) : context->buf;
+	}
+	return (ret);
+}
diff --git a/src/rep/rep_elect.c b/src/rep/rep_elect.c
index 9e8c5249..234daf31 100644
--- a/src/rep/rep_elect.c
+++ b/src/rep/rep_elect.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2004, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2004, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -53,8 +53,9 @@ __rep_elect_pp(dbenv, given_nsites, nvotes, flags)
u_int32_t given_nsites, nvotes;
u_int32_t flags;
{
- DB_REP *db_rep;
ENV *env;
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
int ret;
env = dbenv->env;
@@ -89,7 +90,9 @@ __rep_elect_pp(dbenv, given_nsites, nvotes, flags)
return (EINVAL);
}
+ ENV_ENTER(env, ip);
ret = __rep_elect_int(env, given_nsites, nvotes, flags);
+ ENV_LEAVE(env, ip);
/*
* The DB_REP_IGNORE return code can be of use to repmgr (which of
@@ -120,7 +123,6 @@ __rep_elect_int(env, given_nsites, nvotes, flags)
DB_LOGC *logc;
DB_LSN lsn;
DB_REP *db_rep;
- DB_THREAD_INFO *ip;
LOG *lp;
REP *rep;
int done, elected, in_progress;
@@ -140,6 +142,15 @@ __rep_elect_int(env, given_nsites, nvotes, flags)
ret = 0;
/*
+ * View sites never participate in elections.
+ */
+ if (IS_VIEW_SITE(env)) {
+ __db_errx(env, DB_STR("3687",
+ "View sites may not participate in elections"));
+ return (EINVAL);
+ }
+
+ /*
* Specifying 0 for nsites signals us to use the value configured
* previously via rep_set_nsites. Similarly, if the given nvotes is 0,
* it asks us to compute the value representing a simple majority.
@@ -185,7 +196,6 @@ __rep_elect_int(env, given_nsites, nvotes, flags)
* real, configured priority, as retrieved from REP region.
*/
ctlflags = realpri != 0 ? REPCTL_ELECTABLE : 0;
- ENV_ENTER(env, ip);
orig_tally = 0;
/* If we are already master, simply broadcast that fact and return. */
@@ -597,8 +607,7 @@ out:
DB_ASSERT(env, rep->elect_th > 0);
rep->elect_th--;
if (rep->elect_th == 0) {
- need_req = F_ISSET(rep, REP_F_SKIPPED_APPLY) &&
- !I_HAVE_WON(rep, rep->winner);
+ need_req = F_ISSET(rep, REP_F_SKIPPED_APPLY) && !elected;
FLD_CLR(rep->lockout_flags, REP_LOCKOUT_APPLY);
F_CLR(rep, REP_F_SKIPPED_APPLY);
}
@@ -641,7 +650,6 @@ out:
unlck_lv: REP_SYSTEM_UNLOCK(env);
}
envleave:
- ENV_LEAVE(env, ip);
return (ret);
}
@@ -1106,7 +1114,7 @@ __rep_cmp_vote(env, rep, eid, lsnp, priority, gen, data_gen, tiebreaker, flags)
u_int32_t priority;
u_int32_t data_gen, flags, gen, tiebreaker;
{
- int cmp, like_pri;
+ int cmp, genlog_cmp, like_pri;
cmp = LOG_COMPARE(lsnp, &rep->w_lsn);
/*
@@ -1140,9 +1148,18 @@ __rep_cmp_vote(env, rep, eid, lsnp, priority, gen, data_gen, tiebreaker, flags)
like_pri = (priority == 0 && rep->w_priority == 0) ||
(priority != 0 && rep->w_priority != 0);
- if ((priority != 0 && rep->w_priority == 0) ||
- (like_pri && data_gen > rep->w_datagen) ||
- (like_pri && data_gen == rep->w_datagen && cmp > 0) ||
+ /*
+ * The undocumented ELECT_LOGLENGTH option requires that the
+ * election should be won based on log length without regard
+ * for datagen. Do not include datagen in the comparison if
+ * this option is enabled.
+ */
+ if (FLD_ISSET(rep->config, REP_C_ELECT_LOGLENGTH))
+ genlog_cmp = like_pri && cmp > 0;
+ else
+ genlog_cmp = (like_pri && data_gen > rep->w_datagen) ||
+ (like_pri && data_gen == rep->w_datagen && cmp > 0);
+ if ((priority != 0 && rep->w_priority == 0) || genlog_cmp ||
(cmp == 0 && (priority > rep->w_priority ||
(priority == rep->w_priority &&
(tiebreaker > rep->w_tiebreaker))))) {
@@ -1306,8 +1323,9 @@ __rep_wait(env, timeoutp, full_elect, egen, flags)
{
DB_REP *db_rep;
REP *rep;
- int done;
- u_int32_t sleeptime, sleeptotal, timeout;
+ db_timespec exptime, mytime;
+ int diff_timeout, done;
+ u_int32_t sleeptime, timeout;
db_rep = env->rep_handle;
rep = db_rep->region;
@@ -1315,10 +1333,20 @@ __rep_wait(env, timeoutp, full_elect, egen, flags)
timeout = *timeoutp;
sleeptime = SLEEPTIME(timeout);
- sleeptotal = 0;
- while (sleeptotal < timeout) {
+ __os_gettime(env, &exptime, 0);
+ TIMESPEC_ADD_DB_TIMEOUT(&exptime, timeout);
+ while (!done) {
+ __os_gettime(env, &mytime, 0);
+ /*
+ * Check if the timeout has expired. __os_yield might sleep
+ * a slightly shorter time than requested, so check the exact
+ * amount of time that has passed. If we do not sleep the
+ * full PHASE0 time, old unexpired lease grants could
+ * incorrectly prevent the election from happening.
+ */
+ if (timespeccmp(&mytime, &exptime, >))
+ break;
__os_yield(env, 0, sleeptime);
- sleeptotal += sleeptime;
REP_SYSTEM_LOCK(env);
/*
* Check if group membership changed while we were
@@ -1331,19 +1359,19 @@ __rep_wait(env, timeoutp, full_elect, egen, flags)
if (!LF_ISSET(REP_E_PHASE0) &&
full_elect && F_ISSET(rep, REP_F_GROUP_ESTD)) {
*timeoutp = rep->elect_timeout;
+ if ((diff_timeout = (int)(*timeoutp - timeout)) > 0)
+ TIMESPEC_ADD_DB_TIMEOUT(&exptime, diff_timeout);
+ else {
+ diff_timeout = -diff_timeout;
+ TIMESPEC_SUB_DB_TIMEOUT(&exptime, diff_timeout);
+ }
timeout = *timeoutp;
- if (sleeptotal >= timeout)
- done = 1;
- else
- sleeptime = SLEEPTIME(timeout);
+ sleeptime = SLEEPTIME(timeout);
}
if (egen != rep->egen || !FLD_ISSET(rep->elect_flags, flags))
done = 1;
REP_SYSTEM_UNLOCK(env);
-
- if (done)
- return (0);
}
return (0);
}
diff --git a/src/rep/rep_lease.c b/src/rep/rep_lease.c
index 047c39a7..b6010046 100644
--- a/src/rep/rep_lease.c
+++ b/src/rep/rep_lease.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2007, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2007, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -45,10 +45,20 @@ __rep_update_grant(env, ts)
timespecclear(&mytime);
/*
+ * If we are a view, we never grant a lease.
+ */
+ if (IS_VIEW_SITE(env))
+ return (0);
+
+ /*
* Get current time, and add in the (skewed) lease duration
- * time to send the grant to the master.
+ * time to send the grant to the master. We need to use '0'
+ * for a non-monotonic (i.e. realtime) timestamp. Some systems
+ * use "time since boot" for monotonic time, which would not
+ * work between machines here. We already document that for leases,
+ * the time cannot go backward.
*/
- __os_gettime(env, &mytime, 1);
+ __os_gettime(env, &mytime, 0);
timespecadd(&mytime, &rep->lease_duration);
REP_SYSTEM_LOCK(env);
/*
@@ -108,7 +118,7 @@ __rep_islease_granted(env)
* Get current time and compare against our granted lease.
*/
timespecclear(&mytime);
- __os_gettime(env, &mytime, 1);
+ __os_gettime(env, &mytime, 0);
return (timespeccmp(&mytime, &rep->grant_expire, <=) ? 1 : 0);
}
@@ -319,9 +329,15 @@ __rep_lease_check(env, refresh)
max_tries = LEASE_REFRESH_MIN;
retry:
REP_SYSTEM_LOCK(env);
- min_leases = rep->config_nsites / 2;
+ /*
+ * We need enough leases so that we're guaranteed any successful
+ * election will include at least one site with the lease-guaranteed
+ * data. Note this is based on total number of sites so leases
+ * cannot be used with half or more unelectable sites.
+ */
+ min_leases = (rep->config_nsites - 1) / 2;
ret = 0;
- __os_gettime(env, &curtime, 1);
+ __os_gettime(env, &curtime, 0);
VPRINT(env, (env, DB_VERB_REP_LEASE,
"%s %d of %d refresh %d min_leases %lu curtime %lu %lu, maxLSN [%lu][%lu]",
"lease_check: try ", tries, max_tries, refresh,
@@ -526,7 +542,7 @@ __rep_lease_waittime(env)
if (!F_ISSET(rep, REP_F_LEASE_EXPIRED))
to = rep->lease_timeout;
} else {
- __os_gettime(env, &mytime, 1);
+ __os_gettime(env, &mytime, 0);
RPRINT(env, (env, DB_VERB_REP_LEASE,
"wait_time: mytime %lu %lu, grant_expire %lu %lu",
(u_long)mytime.tv_sec, (u_long)mytime.tv_nsec,
diff --git a/src/rep/rep_log.c b/src/rep/rep_log.c
index 42300685..bf72db9e 100644
--- a/src/rep/rep_log.c
+++ b/src/rep/rep_log.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2004, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2004, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -110,7 +110,7 @@ __rep_allreq(env, rp, eid)
*/
if (ret == 0 && repth.lsn.file != 1 && flags == DB_FIRST) {
if (F_ISSET(rep, REP_F_CLIENT))
- ret = DB_NOTFOUND;
+ ret = USR_ERR(env, DB_NOTFOUND);
else
(void)__rep_send_message(env, eid,
REP_VERIFY_FAIL, &repth.lsn, NULL, 0, 0);
@@ -466,8 +466,8 @@ __rep_log_split(env, ip, rp, rec, ret_lsnp, last_lsnp)
if (p >= ep && save_flags)
F_SET(&tmprp, save_flags);
/*
- * A previous call to __rep_apply indicated an earlier
- * record is a dup and the next_new_lsn we are waiting for.
+ * A previous call to __rep_apply indicated an earlier record
+ * is a past dup and the next_new_lsn for which we are waiting.
* Skip log records until we catch up with next_new_lsn.
*/
if (is_dup && LOG_COMPARE(&tmprp.lsn, &next_new_lsn) < 0) {
@@ -482,7 +482,20 @@ __rep_log_split(env, ip, rp, rec, ret_lsnp, last_lsnp)
VPRINT(env, (env, DB_VERB_REP_MISC,
"log_split: rep_apply ret %d, dup %d, tmp_lsn [%lu][%lu]",
ret, is_dup, (u_long)tmp_lsn.file, (u_long)tmp_lsn.offset));
- if (is_dup)
+ /*
+ * We can skip log records between a past dup and tmp_lsn
+ * returned by rep_apply() because we know we have all
+ * those log records. For a past dup, this log record is
+ * less than or equal to tmp_lsn (which is either ready_lsn
+ * or max_perm_lsn) and we only have records to skip when
+ * it is less than tmp_lsn.
+ *
+ * We cannot skip log records for a future dup because we
+ * may not have all of them. In this case, this log record
+ * is greater than or equal to tmp_lsn (which is either
+ * ready_lsn or this log record).
+ */
+ if (is_dup && LOG_COMPARE(&tmprp.lsn, &tmp_lsn) < 0)
next_new_lsn = tmp_lsn;
switch (ret) {
/*
@@ -637,7 +650,7 @@ __rep_logreq(env, rp, rec, eid)
if (LOG_COMPARE(&firstlsn, &rp->lsn) > 0) {
/* Case 3 */
if (F_ISSET(rep, REP_F_CLIENT)) {
- ret = DB_NOTFOUND;
+ ret = USR_ERR(env, DB_NOTFOUND);
goto err;
}
(void)__rep_send_message(env, eid,
@@ -662,7 +675,7 @@ __rep_logreq(env, rp, rec, eid)
ret = 0;
goto err;
} else
- ret = DB_NOTFOUND;
+ ret = USR_ERR(env, DB_NOTFOUND);
}
}
@@ -812,6 +825,14 @@ __rep_loggap_req(env, rep, lsnp, gapflags)
ret = 0;
/*
+ * If we are in SYNC_LOG and have all the log we need (i.e.
+ * rep->last_lsn is ZERO_LSN), just return, as there is nothing
+ * to do while recovery is running.
+ */
+ if (rep->sync_state == SYNC_LOG && IS_ZERO_LSN(rep->last_lsn))
+ return (0);
+
+ /*
* Check if we need to ask for the gap.
* We ask for the gap if:
* We are forced to with gapflags.
@@ -1030,7 +1051,7 @@ __rep_chk_newfile(env, logc, rep, rp, eid)
REP_VERIFY_FAIL, &rp->lsn,
NULL, 0, 0);
} else
- ret = DB_NOTFOUND;
+ ret = USR_ERR(env, DB_NOTFOUND);
} else {
endlsn.offset += logc->len;
if ((ret = __logc_version(logc,
@@ -1054,7 +1075,7 @@ __rep_chk_newfile(env, logc, rep, rp, eid)
}
}
} else
- ret = DB_NOTFOUND;
+ ret = USR_ERR(env, DB_NOTFOUND);
return (ret);
}
diff --git a/src/rep/rep_method.c b/src/rep/rep_method.c
index f9f1924c..e0e7dd19 100644
--- a/src/rep/rep_method.c
+++ b/src/rep/rep_method.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -10,6 +10,7 @@
#include "db_int.h"
#include "dbinc/db_page.h"
+#include "dbinc/blob.h"
#include "dbinc/btree.h"
#include "dbinc/mp.h"
#include "dbinc/txn.h"
@@ -17,14 +18,12 @@
static int __rep_abort_prepared __P((ENV *));
static int __rep_await_condition __P((ENV *,
struct rep_waitgoal *, db_timeout_t));
-static int __rep_bt_cmp __P((DB *, const DBT *, const DBT *));
+static int __rep_bt_cmp __P((DB *, const DBT *, const DBT *, size_t *));
static int __rep_check_applied __P((ENV *,
DB_THREAD_INFO *, DB_COMMIT_INFO *, struct rep_waitgoal *));
static void __rep_config_map __P((ENV *, u_int32_t *, u_int32_t *));
static u_int32_t __rep_conv_vers __P((ENV *, u_int32_t));
-static int __rep_read_lsn_history __P((ENV *,
- DB_THREAD_INFO *, DB_TXN **, DBC **, u_int32_t,
- __rep_lsn_hist_data_args *, struct rep_waitgoal *, u_int32_t));
+static int __rep_defview __P((DB_ENV *, const char *, int *, u_int32_t));
static int __rep_restore_prepared __P((ENV *));
static int __rep_save_lsn_hist __P((ENV *, DB_THREAD_INFO *, DB_LSN *));
/*
@@ -123,9 +122,11 @@ __rep_get_config(dbenv, which, onp)
#undef OK_FLAGS
#define OK_FLAGS \
(DB_REP_CONF_AUTOINIT | DB_REP_CONF_AUTOROLLBACK | \
- DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | DB_REP_CONF_INMEM | \
+ DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | \
+ DB_REP_CONF_ELECT_LOGLENGTH | DB_REP_CONF_INMEM | \
DB_REP_CONF_LEASE | DB_REP_CONF_NOWAIT | \
- DB_REPMGR_CONF_2SITE_STRICT | DB_REPMGR_CONF_ELECTIONS)
+ DB_REPMGR_CONF_2SITE_STRICT | DB_REPMGR_CONF_ELECTIONS | \
+ DB_REPMGR_CONF_PREFMAS_CLIENT | DB_REPMGR_CONF_PREFMAS_MASTER)
if (FLD_ISSET(which, ~OK_FLAGS))
return (__db_ferr(env, "DB_ENV->rep_get_config", 0));
@@ -171,19 +172,30 @@ __rep_set_config(dbenv, which, on)
REP *rep;
REP_BULK bulk;
u_int32_t mapped, orig;
- int ret, t_ret;
+ int inmemlog, pm_ret, ret, t_ret;
env = dbenv->env;
db_rep = env->rep_handle;
ret = 0;
+ pm_ret = 0;
+ inmemlog = 0;
#undef OK_FLAGS
#define OK_FLAGS \
(DB_REP_CONF_AUTOINIT | DB_REP_CONF_AUTOROLLBACK | \
- DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | DB_REP_CONF_INMEM | \
+ DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | \
+ DB_REP_CONF_ELECT_LOGLENGTH | DB_REP_CONF_INMEM | \
DB_REP_CONF_LEASE | DB_REP_CONF_NOWAIT | \
- DB_REPMGR_CONF_2SITE_STRICT | DB_REPMGR_CONF_ELECTIONS)
-#define REPMGR_FLAGS (REP_C_2SITE_STRICT | REP_C_ELECTIONS)
+ DB_REPMGR_CONF_2SITE_STRICT | DB_REPMGR_CONF_ELECTIONS | \
+ DB_REPMGR_CONF_PREFMAS_CLIENT | DB_REPMGR_CONF_PREFMAS_MASTER)
+#define REPMGR_FLAGS (REP_C_2SITE_STRICT | REP_C_ELECTIONS | \
+ REP_C_PREFMAS_CLIENT | REP_C_PREFMAS_MASTER)
+
+#define TURNING_ON_PREFMAS(orig, curr) \
+ ((FLD_ISSET(curr, REP_C_PREFMAS_MASTER) && \
+ !FLD_ISSET(orig, REP_C_PREFMAS_MASTER)) || \
+ (FLD_ISSET(curr, REP_C_PREFMAS_CLIENT) && \
+ !FLD_ISSET(orig, REP_C_PREFMAS_CLIENT)))
ENV_NOT_CONFIGURED(
env, db_rep->region, "DB_ENV->rep_set_config", DB_INIT_REP);
@@ -224,6 +236,62 @@ __rep_set_config(dbenv, which, on)
return (EINVAL);
}
/*
+ * The undocumented ELECT_LOGLENGTH option and the preferred
+ * master options cannot be changed after calling repmgr_start.
+ */
+ if (FLD_ISSET(mapped, (REP_C_ELECT_LOGLENGTH |
+ REP_C_PREFMAS_MASTER | REP_C_PREFMAS_CLIENT)) &&
+ F_ISSET(rep, REP_F_START_CALLED)) {
+ __db_errx(env, DB_STR("3706",
+ "DB_ENV->rep_set_config: %s "
+ "must be configured before DB_ENV->repmgr_start"),
+ FLD_ISSET(mapped, REP_C_ELECT_LOGLENGTH) ?
+ "ELECT_LOGLENGTH" : "preferred master");
+ ENV_LEAVE(env, ip);
+ return (EINVAL);
+ }
+ /*
+ * Do not allow users to turn on preferred master if
+ * leases or in-memory replication files are in effect,
+ * or with a private environment or in-memory log files.
+ */
+ if (FLD_ISSET(mapped,
+ (REP_C_PREFMAS_MASTER | REP_C_PREFMAS_CLIENT)) &&
+ (REP_CONFIG_IS_SET(env, (REP_C_LEASE | REP_C_INMEM)) ||
+ (__log_get_config(dbenv,
+ DB_LOG_IN_MEMORY, &inmemlog) == 0 &&
+ (inmemlog > 0 || F_ISSET(env, ENV_PRIVATE))))) {
+ __db_errx(env, DB_STR("3707",
+ "DB_ENV->rep_set_config: preferred master mode "
+ "cannot be used with %s"),
+ REP_CONFIG_IS_SET(env, REP_C_LEASE) ?
+ "master leases" :
+ REP_CONFIG_IS_SET(env, REP_C_INMEM) ?
+ "in-memory replication files" :
+ inmemlog > 0 ? "in-memory log files" :
+ "a private environment");
+ ENV_LEAVE(env, ip);
+ return (EINVAL);
+ }
+ /*
+ * If we are already in preferred master mode, we can't
+ * turn off elections or 2site_strict and we can't turn on
+ * leases.
+ */
+ if (PREFMAS_IS_SET(env) && ((FLD_ISSET(mapped,
+ (REP_C_ELECTIONS | REP_C_2SITE_STRICT)) && on == 0) ||
+ (FLD_ISSET(mapped, REP_C_LEASE) && on > 0))) {
+ __db_errx(env, DB_STR("3708",
+ "DB_ENV->rep_set_config: cannot %s %s "
+ "in preferred master mode"),
+ on == 0 ? "disable" : "enable",
+ FLD_ISSET(mapped, REP_C_ELECTIONS) ? "elections" :
+ FLD_ISSET(mapped, REP_C_LEASE) ? "leases" :
+ "2SITE_STRICT");
+ ENV_LEAVE(env, ip);
+ return (EINVAL);
+ }
+ /*
* Leases must be turned on before calling rep_start.
* Leases can never be turned off once they're turned on.
*/
@@ -252,6 +320,17 @@ __rep_set_config(dbenv, which, on)
else
FLD_CLR(rep->config, mapped);
+#ifdef HAVE_REPLICATION_THREADS
+ /* Do automatic preferred master configuration. */
+ if (TURNING_ON_PREFMAS(orig, rep->config) &&
+ (pm_ret = __repmgr_prefmas_auto_config(dbenv,
+ &rep->config)) != 0) {
+ REP_SYSTEM_UNLOCK(env);
+ MUTEX_UNLOCK(env, rep->mtx_clientdb);
+ ENV_LEAVE(env, ip);
+ goto prefmas_err;
+ }
+#endif
/*
* Bulk transfer requires special processing if it is getting
* toggled.
@@ -297,10 +376,25 @@ __rep_set_config(dbenv, which, on)
ret = t_ret;
#endif
} else {
+ orig = db_rep->config;
if (on)
FLD_SET(db_rep->config, mapped);
else
FLD_CLR(db_rep->config, mapped);
+#ifdef HAVE_REPLICATION_THREADS
+ /* Do automatic preferred master configuration. */
+ if (TURNING_ON_PREFMAS(orig, db_rep->config))
+ pm_ret =
+ __repmgr_prefmas_auto_config(dbenv,
+ &db_rep->config);
+#endif
+ }
+prefmas_err:
+ if (pm_ret != 0) {
+ __db_errx(env, DB_STR("3709",
+ "DB_ENV->rep_set_config: could not complete automatic "
+ "preferred master configuration"));
+ ret = EINVAL;
}
/* Configuring 2SITE_STRICT, etc. makes this a repmgr application */
if (ret == 0 && FLD_ISSET(mapped, REPMGR_FLAGS))
@@ -331,6 +425,10 @@ __rep_config_map(env, inflagsp, outflagsp)
FLD_SET(*outflagsp, REP_C_DELAYCLIENT);
FLD_CLR(*inflagsp, DB_REP_CONF_DELAYCLIENT);
}
+ if (FLD_ISSET(*inflagsp, DB_REP_CONF_ELECT_LOGLENGTH)) {
+ FLD_SET(*outflagsp, REP_C_ELECT_LOGLENGTH);
+ FLD_CLR(*inflagsp, DB_REP_CONF_ELECT_LOGLENGTH);
+ }
if (FLD_ISSET(*inflagsp, DB_REP_CONF_INMEM)) {
FLD_SET(*outflagsp, REP_C_INMEM);
FLD_CLR(*inflagsp, DB_REP_CONF_INMEM);
@@ -351,6 +449,14 @@ __rep_config_map(env, inflagsp, outflagsp)
FLD_SET(*outflagsp, REP_C_ELECTIONS);
FLD_CLR(*inflagsp, DB_REPMGR_CONF_ELECTIONS);
}
+ if (FLD_ISSET(*inflagsp, DB_REPMGR_CONF_PREFMAS_CLIENT)) {
+ FLD_SET(*outflagsp, REP_C_PREFMAS_CLIENT);
+ FLD_CLR(*inflagsp, DB_REPMGR_CONF_PREFMAS_CLIENT);
+ }
+ if (FLD_ISSET(*inflagsp, DB_REPMGR_CONF_PREFMAS_MASTER)) {
+ FLD_SET(*outflagsp, REP_C_PREFMAS_MASTER);
+ FLD_CLR(*inflagsp, DB_REPMGR_CONF_PREFMAS_MASTER);
+ }
DB_ASSERT(env, *inflagsp == 0);
}
@@ -368,8 +474,10 @@ __rep_start_pp(dbenv, dbt, flags)
DBT *dbt;
u_int32_t flags;
{
- DB_REP *db_rep;
ENV *env;
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ int ret;
env = dbenv->env;
db_rep = env->rep_handle;
@@ -400,7 +508,11 @@ __rep_start_pp(dbenv, dbt, flags)
return (EINVAL);
}
- return (__rep_start_int(env, dbt, flags));
+ ENV_ENTER(env, ip);
+ ret = __rep_start_int(env, dbt, flags, 0);
+ ENV_LEAVE(env, ip);
+
+ return (ret);
}
/*
@@ -432,13 +544,14 @@ __rep_start_pp(dbenv, dbt, flags)
* clients that reference non-existent files whose creation was backed out
* during a synchronizing recovery.
*
- * PUBLIC: int __rep_start_int __P((ENV *, DBT *, u_int32_t));
+ * PUBLIC: int __rep_start_int __P((ENV *, DBT *, u_int32_t, u_int32_t));
*/
int
-__rep_start_int(env, dbt, flags)
+__rep_start_int(env, dbt, flags, startopts)
ENV *env;
DBT *dbt;
u_int32_t flags;
+ u_int32_t startopts;
{
DB *dbp;
DB_LOG *dblp;
@@ -474,9 +587,31 @@ __rep_start_int(env, dbt, flags)
return (EINVAL);
}
- ENV_ENTER(env, ip);
+ /*
+ * If we are a view, we can never become master.
+ */
+ if (IS_VIEW_SITE(env) && role == DB_REP_MASTER) {
+ __db_errx(env, DB_STR("3685",
+ "View site cannot become master"));
+ return (EINVAL);
+ }
+
+ /*
+ * Check for consistent view usage. We need to check here rather
+ * than in __rep_open because non-rep-aware processes such as
+ * db_stat may open/join the environment. Rep-aware handles must
+ * consistently set the view.
+ */
+ if ((ret = __rep_check_view(env)) != 0) {
+ RPRINT(env, (env, DB_VERB_REP_MISC,
+ "Application env/view mismatch."));
+ __db_errx(env, DB_STR("3686",
+ "Application environment and view callback mismatch"));
+ return (ret);
+ }
/* Serialize rep_start() calls. */
+ ENV_GET_THREAD_INFO(env, ip);
MUTEX_LOCK(env, rep->mtx_repstart);
start_th = 1;
@@ -492,8 +627,14 @@ __rep_start_int(env, dbt, flags)
goto out;
REP_SYSTEM_LOCK(env);
+ /*
+ * The FORCE_ROLECHG option is used when a side-effect of the role
+ * change such as incrementing the master gen is needed regardless
+ * of the previous role.
+ */
role_chg = (!F_ISSET(rep, REP_F_MASTER) && role == DB_REP_MASTER) ||
- (!F_ISSET(rep, REP_F_CLIENT) && role == DB_REP_CLIENT);
+ (!F_ISSET(rep, REP_F_CLIENT) && role == DB_REP_CLIENT) ||
+ FLD_ISSET(startopts, REP_START_FORCE_ROLECHG);
/*
* There is no need for lockout if all we're doing is sending a message.
@@ -511,9 +652,11 @@ __rep_start_int(env, dbt, flags)
goto out;
}
- if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_MSG)) {
+ if (!FLD_ISSET(startopts, REP_START_WAIT_LOCKMSG) &&
+ FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_MSG)) {
/*
- * There is already someone in msg lockout. Return.
+ * There is already someone in msg lockout and we are not
+ * waiting. Return.
*/
RPRINT(env, (env, DB_VERB_REP_MISC,
"Thread already in msg lockout"));
@@ -702,10 +845,15 @@ __rep_start_int(env, dbt, flags)
* now defunct on master.
* NEWFILE: Used to delay client apply during newfile
* operation, not applicable to master.
+ * READONLY_MASTER: Used to coordinate preferred master
+ * takeover, should not remain in effect after restart.
+ * HOLD_GEN: Freeze gen for preferred master, should not
+ * remain in effect after restart.
*/
F_CLR(rep, REP_F_CLIENT | REP_F_ABBREVIATED |
REP_F_MASTERELECT | REP_F_SKIPPED_APPLY | REP_F_DELAY |
- REP_F_LEASE_EXPIRED | REP_F_NEWFILE);
+ REP_F_LEASE_EXPIRED | REP_F_NEWFILE |
+ REP_F_READONLY_MASTER | REP_F_HOLD_GEN);
/*
* When becoming a master, set the following flags:
* MASTER: Indicate that this site is master.
@@ -842,11 +990,16 @@ __rep_start_int(env, dbt, flags)
}
/*
* When becoming a client, clear the following flags:
+ * HOLD_GEN: Freeze gen for preferred master, should not
+ * remain in effect after restart.
* MASTER: Site is no longer a master.
* MASTERELECT: Indicates that a master is elected
* rather than appointed, not applicable on client.
+ * READONLY_MASTER: Used to coordinate preferred master
+ * takeover, should not remain in effect after restart.
*/
- F_CLR(rep, REP_F_MASTER | REP_F_MASTERELECT);
+ F_CLR(rep, REP_F_HOLD_GEN | REP_F_MASTER | REP_F_MASTERELECT |
+ REP_F_READONLY_MASTER);
F_SET(rep, REP_F_CLIENT);
/*
@@ -928,6 +1081,15 @@ __rep_start_int(env, dbt, flags)
* sync with the master.
*/
SET_GEN(0);
+ /*
+ * If we are changing role to client, reset our min log file
+ * until we hear from a master or another client. In
+ * particular, in a dupmaster situation, if this site loses
+ * an election a stale min_log_file would prevent archiving.
+ */
+#ifdef HAVE_REPLICATION_THREADS
+ rep->min_log_file = 0;
+#endif
REP_SYSTEM_UNLOCK(env);
/*
@@ -935,6 +1097,15 @@ __rep_start_int(env, dbt, flags)
*/
if ((ret = __dbt_usercopy(env, dbt)) != 0)
goto out;
+ /*
+ * The HOLD_CLIGEN option does not allow this client's
+ * gen to change until the REP_F_HOLD_GEN flag is cleared.
+ * It prevents this site from responding to NEWMASTER messages
+ * and disables updating the gen from other incoming messages.
+ */
+ if (FLD_ISSET(startopts, REP_START_HOLD_CLIGEN))
+ F_SET(rep, REP_F_HOLD_GEN);
+
(void)__rep_send_message(env,
DB_EID_BROADCAST, REP_NEWCLIENT, NULL, dbt, 0, 0);
}
@@ -967,7 +1138,6 @@ out:
if (start_th)
MUTEX_UNLOCK(env, rep->mtx_repstart);
__dbt_userfree(env, dbt, NULL, NULL);
- ENV_LEAVE(env, ip);
return (ret);
}
@@ -1170,6 +1340,9 @@ __rep_client_dbinit(env, startup, which)
if (which == REP_DB) {
name = REPDBNAME;
rdbpp = &db_rep->rep_db;
+ } else if (which == REP_BLOB) {
+ name = REPBLOBNAME;
+ rdbpp = &db_rep->blob_dbp;
} else {
name = REPPAGENAME;
rdbpp = &db_rep->file_dbp;
@@ -1209,16 +1382,28 @@ __rep_client_dbinit(env, startup, which)
if (which == REP_DB &&
(ret = __bam_set_bt_compare(dbp, __rep_bt_cmp)) != 0)
goto err;
+	/*
+	 * The blob gap database needs both its key comparator and its
+	 * duplicate comparator installed; fail if either call fails.
+	 * (Joining the two calls with && would skip the second call
+	 * whenever the first one succeeds.)
+	 */
+	if (which == REP_BLOB &&
+	    ((ret = __bam_set_bt_compare(dbp, __rep_blob_cmp)) != 0 ||
+	    (ret = __db_set_dup_compare(dbp, __rep_offset_cmp)) != 0))
+		goto err;
/* Don't write log records on the client. */
if ((ret = __db_set_flags(dbp, DB_TXN_NOT_DURABLE)) != 0)
goto err;
+ /* Blob gap processing requires sorted duplicates. */
+ if (which == REP_BLOB) {
+ if ((ret = __db_set_blob_threshold(dbp, 0, 0)) != 0)
+ goto err;
+ if ((ret = __db_set_flags(dbp, DB_DUPSORT)) != 0)
+ goto err;
+ }
+
flags = DB_NO_AUTO_COMMIT | DB_CREATE | DB_INTERNAL_TEMPORARY_DB |
(F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0);
if ((ret = __db_open(dbp, ip, NULL, fname, subdb,
- (which == REP_DB ? DB_BTREE : DB_RECNO),
+ (which == REP_PG ? DB_RECNO : DB_BTREE),
flags, 0, PGNO_BASE_MD)) != 0)
goto err;
@@ -1243,14 +1428,16 @@ err: if (dbp != NULL &&
* care about the LSNs.
*/
static int
-__rep_bt_cmp(dbp, dbt1, dbt2)
+__rep_bt_cmp(dbp, dbt1, dbt2, locp)
DB *dbp;
const DBT *dbt1, *dbt2;
+ size_t *locp;
{
DB_LSN lsn1, lsn2;
__rep_control_args *rp1, *rp2;
COMPQUIET(dbp, NULL);
+ COMPQUIET(locp, NULL);
rp1 = dbt1->data;
rp2 = dbt2->data;
@@ -1274,6 +1461,82 @@ __rep_bt_cmp(dbp, dbt1, dbt2)
}
/*
+ * __rep_blob_cmp --
+ *
+ * Comparison function for the blob gap database. The key is the blob_sid
+ * appended with the blob_id.
+ *
+ * PUBLIC: int __rep_blob_cmp __P((DB *, const DBT *, const DBT *, size_t *));
+ */
+int
+__rep_blob_cmp(dbp, dbt1, dbt2, locp)
+	DB *dbp;
+	const DBT *dbt1, *dbt2;
+	size_t *locp;
+{
+	db_seq_t lhs[2], rhs[2];
+	int i;
+
+	COMPQUIET(dbp, NULL);
+	COMPQUIET(locp, NULL);
+
+	/*
+	 * Each key is two consecutive db_seq_t values: the blob_sid
+	 * followed by the blob_id.  Copy them out with memcpy to avoid
+	 * alignment issues.
+	 */
+	memcpy(lhs, dbt1->data, sizeof(lhs));
+	memcpy(rhs, dbt2->data, sizeof(rhs));
+
+	/* Order by blob_sid first, then by blob_id. */
+	for (i = 0; i < 2; i++) {
+		if (lhs[i] != rhs[i])
+			return (lhs[i] > rhs[i] ? 1 : -1);
+	}
+	return (0);
+}
+
+/*
+ * __rep_offset_cmp --
+ *
+ * Comparison function for duplicates in the blob gap database.
+ *
+ * PUBLIC: int __rep_offset_cmp
+ * PUBLIC: __P((DB *, const DBT *, const DBT *, size_t *));
+ */
+int
+__rep_offset_cmp(dbp, dbt1, dbt2, locp)
+	DB *dbp;
+	const DBT *dbt1, *dbt2;
+	size_t *locp;
+{
+	off_t lhs, rhs;
+
+	COMPQUIET(dbp, NULL);
+	COMPQUIET(locp, NULL);
+
+	/* Copy the offsets out with memcpy to avoid alignment issues. */
+	memcpy(&lhs, dbt1->data, sizeof(lhs));
+	memcpy(&rhs, dbt2->data, sizeof(rhs));
+
+	/* Three-way compare: -1, 0 or 1. */
+	if (lhs < rhs)
+		return (-1);
+	return (lhs > rhs ? 1 : 0);
+}
+
+/*
* __rep_abort_prepared --
* Abort any prepared transactions that recovery restored.
*
@@ -1684,7 +1947,10 @@ __rep_set_nsites_pp(dbenv, n)
"DB_ENV->rep_set_nsites: cannot call from Replication Manager application"));
return (EINVAL);
}
- if ((ret = __rep_set_nsites_int(env, n)) == 0)
+ ENV_ENTER(env, ip);
+ ret = __rep_set_nsites_int(env, n);
+ ENV_LEAVE(env, ip);
+ if (ret == 0)
APP_SET_BASEAPI(env);
return (ret);
}
@@ -1748,18 +2014,15 @@ __rep_get_nsites(dbenv, n)
}
/*
- * PUBLIC: int __rep_set_priority __P((DB_ENV *, u_int32_t));
+ * PUBLIC: int __rep_set_priority_pp __P((DB_ENV *, u_int32_t));
*/
int
-__rep_set_priority(dbenv, priority)
+__rep_set_priority_pp(dbenv, priority)
DB_ENV *dbenv;
u_int32_t priority;
{
DB_REP *db_rep;
ENV *env;
- REP *rep;
- u_int32_t prev;
- int ret;
env = dbenv->env;
db_rep = env->rep_handle;
@@ -1767,6 +2030,30 @@ __rep_set_priority(dbenv, priority)
ENV_NOT_CONFIGURED(
env, db_rep->region, "DB_ENV->rep_set_priority", DB_INIT_REP);
+ if (PREFMAS_IS_SET(env)) {
+ __db_errx(env, DB_STR_A("3710",
+"%s: cannot change priority in preferred master mode.",
+ "%s"), "DB_ENV->rep_set_priority");
+ return (EINVAL);
+ }
+
+ return (__rep_set_priority_int(env, priority));
+}
+
+/*
+ * PUBLIC: int __rep_set_priority_int __P((ENV *, u_int32_t));
+ */
+int
+__rep_set_priority_int(env, priority)
+ ENV *env;
+ u_int32_t priority;
+{
+ DB_REP *db_rep;
+ REP *rep;
+ u_int32_t prev;
+ int ret;
+
+ db_rep = env->rep_handle;
ret = 0;
if (REP_ON(env)) {
rep = db_rep->region;
@@ -1807,10 +2094,10 @@ __rep_get_priority(dbenv, priority)
}
/*
- * PUBLIC: int __rep_set_timeout __P((DB_ENV *, int, db_timeout_t));
+ * PUBLIC: int __rep_set_timeout_pp __P((DB_ENV *, int, db_timeout_t));
*/
int
-__rep_set_timeout(dbenv, which, timeout)
+__rep_set_timeout_pp(dbenv, which, timeout)
DB_ENV *dbenv;
int which;
db_timeout_t timeout;
@@ -1818,13 +2105,10 @@ __rep_set_timeout(dbenv, which, timeout)
DB_REP *db_rep;
DB_THREAD_INFO *ip;
ENV *env;
- REP *rep;
int repmgr_timeout, ret;
env = dbenv->env;
db_rep = env->rep_handle;
- rep = db_rep->region;
- ret = 0;
repmgr_timeout = 0;
if (timeout == 0 && (which == DB_REP_CONNECTION_RETRY ||
@@ -1850,12 +2134,46 @@ __rep_set_timeout(dbenv, which, timeout)
return (EINVAL);
}
if (which == DB_REP_LEASE_TIMEOUT && IS_REP_STARTED(env)) {
- ret = EINVAL;
__db_errx(env, DB_STR_A("3568",
"%s: lease timeout must be set before DB_ENV->rep_start.",
"%s"), "DB_ENV->rep_set_timeout");
return (EINVAL);
}
+ if (PREFMAS_IS_SET(env) &&
+ (which == DB_REP_HEARTBEAT_MONITOR ||
+ which == DB_REP_HEARTBEAT_SEND) &&
+ timeout == 0) {
+ __db_errx(env, DB_STR_A("3711",
+"%s: cannot turn off heartbeat timeout in preferred master mode.",
+ "%s"), "DB_ENV->rep_set_timeout");
+ return (EINVAL);
+ }
+
+ ret = __rep_set_timeout_int(env, which, timeout);
+
+ /* Setting a repmgr timeout makes this a repmgr application */
+ if (ret == 0 && repmgr_timeout)
+ APP_SET_REPMGR(env);
+ return (ret);
+
+}
+
+/*
+ * PUBLIC: int __rep_set_timeout_int __P((ENV *, int, db_timeout_t));
+ */
+int
+__rep_set_timeout_int(env, which, timeout)
+ ENV *env;
+ int which;
+ db_timeout_t timeout;
+{
+ DB_REP *db_rep;
+ REP *rep;
+ int ret;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ ret = 0;
switch (which) {
case DB_REP_CHECKPOINT_DELAY:
@@ -1888,6 +2206,7 @@ __rep_set_timeout(dbenv, which, timeout)
rep->ack_timeout = timeout;
else
db_rep->ack_timeout = timeout;
+ ADJUST_AUTOTAKEOVER_WAITS(db_rep, timeout);
break;
case DB_REP_CONNECTION_RETRY:
if (REP_ON(env))
@@ -1919,10 +2238,6 @@ __rep_set_timeout(dbenv, which, timeout)
"Unknown timeout type argument to DB_ENV->rep_set_timeout"));
ret = EINVAL;
}
-
- /* Setting a repmgr timeout makes this a repmgr application */
- if (ret == 0 && repmgr_timeout)
- APP_SET_REPMGR(env);
return (ret);
}
@@ -2099,6 +2414,144 @@ __rep_set_request(dbenv, min, max)
}
/*
+ * __rep_set_view --
+ * Set the view/partial replication function.
+ *
+ * PUBLIC: int __rep_set_view __P((DB_ENV *,
+ * PUBLIC: int (*)(DB_ENV *, const char *, int *, u_int32_t)));
+ */
+int
+__rep_set_view(dbenv, f_partial)
+	DB_ENV *dbenv;
+	int (*f_partial) __P((DB_ENV *,
+	    const char *, int *, u_int32_t));
+{
+	DB_REP *db_rep;
+	ENV *env;
+
+	env = dbenv->env;
+	db_rep = env->rep_handle;
+
+	ENV_NOT_CONFIGURED(
+	    env, db_rep->region, "DB_ENV->rep_set_view", DB_INIT_REP);
+
+	ENV_ILLEGAL_AFTER_OPEN(env, "DB_ENV->rep_set_view");
+
+	/* A NULL callback selects the default view function. */
+	db_rep->partial = f_partial == NULL ? __rep_defview : f_partial;
+	return (0);
+}
+
+/*
+ * __rep_defview --
+ * Default view function. Always replicate.
+ */
+static int
+__rep_defview(dbenv, name, result, flags)
+	DB_ENV *dbenv;
+	const char *name;
+	int *result;
+	u_int32_t flags;
+{
+	/* Every file is reported as replicated, whatever its name. */
+	*result = 1;
+
+	COMPQUIET(flags, 0);
+	COMPQUIET(name, NULL);
+	COMPQUIET(dbenv, NULL);
+	return (0);
+}
+
+/*
+ * __rep_call_partial --
+ * Calls the partial function, after doing some checks required for
+ * handling blobs.
+ *
+ * PUBLIC: int __rep_call_partial
+ * PUBLIC: __P((ENV *, const char *, int *, u_int32_t, DELAYED_BLOB_LIST **));
+ */
+int
+__rep_call_partial(env, name, result, flags, lsp)
+ ENV *env;
+ const char *name;
+ int *result;
+ u_int32_t flags;
+ DELAYED_BLOB_LIST **lsp;
+{
+ DB_LOG *dblp;
+ DB_REP *db_rep;
+ DELAYED_BLOB_LIST *dbl;
+ FNAME *fname;
+ db_seq_t blob_file_id;
+ char *file_name;
+ int ret;
+
+ /*
+ * name is the file under consideration and *result receives the
+ * decision.  flags is handed to the callback unchanged.  *lsp is
+ * the head of a list; an entry is pushed onto it for each blob file
+ * whose owning database cannot be looked up at this point.
+ */
+ ret = 0;
+ blob_file_id = 0;
+ db_rep = env->rep_handle;
+ dblp = env->lg_handle;
+ fname = NULL;
+
+ /*
+ * If the database being sent is a blob meta database or file, then the
+ * name of its associated database needs to be passed to the partial
+ * function. To do this, use the blob file id in the path to the
+ * file to look up the blob_file_id of the associated database. That
+ * can be used to look up the name of the associated database through
+ * dbreg.
+ */
+ /*
+ * Fast path: the default view, or any name that is neither a blob
+ * meta database nor a blob file, goes straight to the callback.
+ */
+ if (db_rep->partial == __rep_defview ||
+ (!IS_BLOB_META(name) && !IS_BLOB_FILE(name))) {
+ ret = db_rep->partial(env->dbenv, name, result, flags);
+ } else {
+ /*
+ * The top level blob meta database must always be replicated.
+ */
+ if (strcmp(name, BLOB_META_FILE_NAME) == 0) {
+ *result = 1;
+ return (ret);
+ }
+ /* Extract the blob file id from the path in name. */
+ if ((ret = __blob_path_to_dir_ids(
+ env, name, &blob_file_id, NULL)) != 0)
+ return (ret);
+ DB_ASSERT(env, blob_file_id > 0);
+
+ /*
+ * It is possible that the database that owns this blob meta
+ * database has not yet been processed on the client when
+ * processing the transaction, so assume it is not replicated.
+ * Return its information and process it later when its
+ * owning database is processed (which must happen in the
+ * same transaction).
+ */
+ if (__dbreg_blob_file_to_fname(
+ dblp, blob_file_id, 0, &fname) != 0) {
+ if ((ret = __os_malloc(
+ env, sizeof(DELAYED_BLOB_LIST), &dbl)) != 0)
+ return (ret);
+ memset(dbl, 0, sizeof(DELAYED_BLOB_LIST));
+ dbl->blob_file_id = blob_file_id;
+ /* Push the new entry onto the front of the list. */
+ if (*lsp == NULL)
+ *lsp = dbl;
+ else {
+ dbl->next = *lsp;
+ (*lsp)->prev = dbl;
+ *lsp = dbl;
+ }
+ /* Report "not replicated" for now and succeed. */
+ *result = 0;
+ return (0);
+ }
+
+ /*
+ * Resolve the owning database's name from the FNAME; it is
+ * NULL when no name offset is recorded.
+ * NOTE(review): if DB_ASSERT compiles to nothing, a NULL
+ * file_name would be passed to the callback -- confirm
+ * whether that case can occur.
+ */
+ file_name = fname->fname_off == INVALID_ROFF ?
+ NULL : R_ADDR(&dblp->reginfo, fname->fname_off);
+ DB_ASSERT(env, file_name != NULL);
+ ret = db_rep->partial(env->dbenv, file_name, result, flags);
+ }
+
+ return (ret);
+}
+
+/*
* __rep_set_transport_pp --
* Set the transport function for replication.
*
@@ -2288,25 +2741,46 @@ __rep_set_clockskew(dbenv, fast_clock, slow_clock)
}
/*
- * __rep_flush --
+ * __rep_flush_pp --
* Re-push the last log record to all clients, in case they've lost
* messages and don't know it.
*
- * PUBLIC: int __rep_flush __P((DB_ENV *));
+ * PUBLIC: int __rep_flush_pp __P((DB_ENV *));
*/
int
-__rep_flush(dbenv)
+__rep_flush_pp (dbenv)
DB_ENV *dbenv;
{
+ ENV *env;
+ DB_THREAD_INFO *ip;
+ int ret;
+
+ env = dbenv->env;
+
+ ENV_ENTER(env, ip);
+ ret = __rep_flush_int(env);
+ ENV_LEAVE(env, ip);
+
+ return (ret);
+}
+
+/*
+ * __rep_flush_int --
+ * Re-push the last log record to all clients, in case they've lost
+ * messages and don't know it.
+ *
+ * PUBLIC: int __rep_flush_int __P((ENV *));
+ */
+int
+__rep_flush_int(env)
+ ENV *env;
+{
DBT rec;
DB_LOGC *logc;
DB_LSN lsn;
DB_REP *db_rep;
- DB_THREAD_INFO *ip;
- ENV *env;
int ret, t_ret;
- env = dbenv->env;
db_rep = env->rep_handle;
ENV_REQUIRES_CONFIG_XX(
@@ -2322,8 +2796,6 @@ __rep_flush(dbenv)
return (EINVAL);
}
- ENV_ENTER(env, ip);
-
if ((ret = __log_cursor(env, &logc)) != 0)
return (ret);
@@ -2338,7 +2810,6 @@ __rep_flush(dbenv)
err: if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
ret = t_ret;
- ENV_LEAVE(env, ip);
return (ret);
}
@@ -2693,7 +3164,7 @@ __rep_check_applied(env, ip, commit_info, reasonp)
*/
if (commit_info->gen == gen) {
ret = __rep_read_lsn_history(env,
- ip, &txn, &dbc, gen, &hist, reasonp, DB_SET);
+ ip, &txn, &dbc, gen, &hist, reasonp, DB_SET, 1);
if (ret == DB_NOTFOUND) {
/*
* We haven't yet received the LSN history of the
@@ -2720,7 +3191,7 @@ __rep_check_applied(env, ip, commit_info, reasonp)
* masters at the same gen, and the txn of interest was
* rolled back.
*/
- ret = DB_NOTFOUND;
+ ret = USR_ERR(env, DB_NOTFOUND);
goto out;
}
@@ -2750,7 +3221,7 @@ __rep_check_applied(env, ip, commit_info, reasonp)
* description of the txn of interest doesn't match what we see
* in the history available to us now.
*/
- ret = DB_NOTFOUND;
+ ret = USR_ERR(env, DB_NOTFOUND);
} else if (commit_info->gen < gen || gen == 0) {
/*
@@ -2759,10 +3230,10 @@ __rep_check_applied(env, ip, commit_info, reasonp)
* the token LSN is within the close/open range defined by
* [base,next).
*/
- ret = __rep_read_lsn_history(env,
- ip, &txn, &dbc, commit_info->gen, &hist, reasonp, DB_SET);
- t_ret = __rep_read_lsn_history(env,
- ip, &txn, &dbc, commit_info->gen, &hist2, reasonp, DB_NEXT);
+ ret = __rep_read_lsn_history(env, ip,
+ &txn, &dbc, commit_info->gen, &hist, reasonp, DB_SET, 1);
+ t_ret = __rep_read_lsn_history(env, ip,
+ &txn, &dbc, commit_info->gen, &hist2, reasonp, DB_NEXT, 1);
if (ret == DB_NOTFOUND) {
/*
* If the desired gen is not in our database, it could
@@ -2812,7 +3283,7 @@ __rep_check_applied(env, ip, commit_info, reasonp)
* don't match, meaning the txn was written at a dup
* master and that gen instance was rolled back.
*/
- ret = DB_NOTFOUND;
+ ret = USR_ERR(env, DB_NOTFOUND);
goto out;
}
@@ -2837,7 +3308,7 @@ __rep_check_applied(env, ip, commit_info, reasonp)
LOG_COMPARE(&commit_info->lsn, &hist2.lsn) < 0)
ret = 0;
else
- ret = DB_NOTFOUND;
+ ret = USR_ERR(env, DB_NOTFOUND);
} else {
/*
* Token names a future gen. If we're a client and the LSN also
@@ -2851,7 +3322,7 @@ __rep_check_applied(env, ip, commit_info, reasonp)
reasonp->u.gen = commit_info->gen;
return (DB_TIMEOUT);
}
- return (DB_NOTFOUND);
+ return (USR_ERR(env, DB_NOTFOUND));
}
out:
@@ -2867,9 +3338,19 @@ out:
/*
* The txn and dbc handles are owned by caller, though we create them if
* necessary. Caller is responsible for closing them.
+ *
+ * The use_cache option is enabled for the read-your-writes feature, which
+ * makes frequent requests for the cached information (envid and lsn) when it
+ * is in use. Callers that require information that is not cached (e.g.
+ * timestamp) should not set use_cache.
+ *
+ * PUBLIC: int __rep_read_lsn_history __P((ENV *, DB_THREAD_INFO *, DB_TXN **,
+ * PUBLIC: DBC **, u_int32_t, __rep_lsn_hist_data_args *,
+ * PUBLIC: struct rep_waitgoal *, u_int32_t, int));
*/
-static int
-__rep_read_lsn_history(env, ip, txn, dbc, gen, gen_infop, reasonp, flags)
+int
+__rep_read_lsn_history(env, ip, txn, dbc, gen, gen_infop, reasonp, flags,
+ use_cache)
ENV *env;
DB_THREAD_INFO *ip;
DB_TXN **txn;
@@ -2878,6 +3359,7 @@ __rep_read_lsn_history(env, ip, txn, dbc, gen, gen_infop, reasonp, flags)
__rep_lsn_hist_data_args *gen_infop;
struct rep_waitgoal *reasonp;
u_int32_t flags;
+ int use_cache;
{
DB_REP *db_rep;
REP *rep;
@@ -2898,7 +3380,8 @@ __rep_read_lsn_history(env, ip, txn, dbc, gen, gen_infop, reasonp, flags)
/* Simply return cached info, if we already have it. */
desired_gen = flags == DB_SET ? gen : gen + 1;
REP_SYSTEM_LOCK(env);
- if (rep->gen == desired_gen && !IS_ZERO_LSN(rep->gen_base_lsn)) {
+ if (use_cache && rep->gen == desired_gen &&
+ !IS_ZERO_LSN(rep->gen_base_lsn)) {
gen_infop->lsn = rep->gen_base_lsn;
gen_infop->envid = rep->master_envid;
goto unlock;
@@ -3005,8 +3488,14 @@ __rep_conv_vers(env, log_ver)
/*
* We can't use a switch statement, some of the DB_LOGVERSION_XX
- * constants are the same
+ * constants are the same.
*/
+ if (log_ver == DB_LOGVERSION_61)
+ return (DB_REPVERSION_61);
+ if (log_ver == DB_LOGVERSION_60p1)
+ return (DB_REPVERSION_60);
+ if (log_ver == DB_LOGVERSION_60)
+ return (DB_REPVERSION_60);
if (log_ver == DB_LOGVERSION_53)
return (DB_REPVERSION_53);
if (log_ver == DB_LOGVERSION_52)
diff --git a/src/rep/rep_record.c b/src/rep/rep_record.c
index f4691974..b206e60e 100644
--- a/src/rep/rep_record.c
+++ b/src/rep/rep_record.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -9,13 +9,17 @@
#include "db_config.h"
#include "db_int.h"
+#include "dbinc/blob.h"
#include "dbinc/db_page.h"
#include "dbinc/db_am.h"
#include "dbinc/lock.h"
#include "dbinc/mp.h"
#include "dbinc/txn.h"
-static int __rep_collect_txn __P((ENV *, DB_LSN *, LSN_COLLECTION *));
+static int __rep_collect_txn
+ __P((ENV *, DB_LSN *, LSN_COLLECTION *, DELAYED_BLOB_LIST **));
+static int __rep_remove_delayed_blobs
+    __P((ENV *, db_seq_t, u_int32_t, DELAYED_BLOB_LIST **));
static int __rep_do_ckp __P((ENV *, DBT *, __rep_control_args *));
static int __rep_fire_newmaster __P((ENV *, u_int32_t, int));
static int __rep_fire_startupdone __P((ENV *, u_int32_t, int));
@@ -153,6 +157,7 @@ __rep_process_message_pp(dbenv, control, rec, eid, ret_lsnp)
DB_LSN *ret_lsnp;
{
ENV *env;
+ DB_THREAD_INFO *ip;
int ret;
env = dbenv->env;
@@ -193,7 +198,9 @@ __rep_process_message_pp(dbenv, control, rec, eid, ret_lsnp)
return (ret);
}
+ ENV_ENTER(env, ip);
ret = __rep_process_message_int(env, control, rec, eid, ret_lsnp);
+ ENV_LEAVE(env, ip);
__dbt_userfree(env, control, rec, NULL);
return (ret);
@@ -289,8 +296,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp)
if (ret_lsnp != NULL)
ZERO_LSN(*ret_lsnp);
- ENV_ENTER(env, ip);
-
+ ENV_GET_THREAD_INFO(env, ip);
REP_PRINT_MESSAGE(env, eid, rp, "rep_process_message", 0);
/*
* Check the version number for both rep and log. If it is
@@ -303,8 +309,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp)
"%lu %d"), (u_long)rp->rep_version,
DB_REPVERSION_MIN);
- ret = EINVAL;
- goto errlock;
+ return (EINVAL);
}
VPRINT(env, (env, DB_VERB_REP_MSGS,
"Received record %lu with old rep version %lu",
@@ -322,8 +327,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp)
__db_errx(env, DB_STR_A("3517",
"unexpected replication message version %lu, expected %d",
"%lu %d"), (u_long)rp->rep_version, DB_REPVERSION);
- ret = EINVAL;
- goto errlock;
+ return (EINVAL);
}
if (rp->log_version < DB_LOGVERSION) {
@@ -332,8 +336,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp)
"unsupported old replication log version %lu, minimum version %d",
"%lu %d"), (u_long)rp->log_version,
DB_LOGVERSION_MIN);
- ret = EINVAL;
- goto errlock;
+ return (EINVAL);
}
VPRINT(env, (env, DB_VERB_REP_MSGS,
"Received record %lu with old log version %lu",
@@ -342,8 +345,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp)
__db_errx(env, DB_STR_A("3519",
"unexpected log record version %lu, expected %d",
"%lu %d"), (u_long)rp->log_version, DB_LOGVERSION);
- ret = EINVAL;
- goto errlock;
+ return (EINVAL);
}
/*
@@ -465,9 +467,14 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp)
* accept the generation number and participate in future
* elections and communication. Otherwise, I need to hear about
* a new master and sync up.
+ *
+ * But do not do any of this if REP_F_HOLD_GEN is set. In
+ * this case we keep the site at its current gen until we
+ * clear this flag.
*/
- if (rp->rectype == REP_ALIVE ||
- rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) {
+ if ((rp->rectype == REP_ALIVE ||
+ rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) &&
+ !F_ISSET(rep, REP_F_HOLD_GEN)) {
REP_SYSTEM_LOCK(env);
RPRINT(env, (env, DB_VERB_REP_MSGS,
"Updating gen from %lu to %lu",
@@ -593,6 +600,38 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp)
ret = __rep_allreq(env, rp, eid);
CLIENT_REREQ;
break;
+ case REP_BLOB_ALL_REQ:
+ /* Blobs do not support peer-to-peer. */
+ RECOVERING_SKIP;
+ MASTER_ONLY(rep, rp);
+ ret = __rep_blob_allreq(env, eid, rec);
+ CLIENT_REREQ;
+ break;
+ case REP_BLOB_CHUNK:
+ /* Handle even if in recovery. */
+ CLIENT_ONLY(rep, rp);
+ ret = __rep_blob_chunk(env, eid, ip, rec);
+ if (ret == DB_REP_PAGEDONE)
+ ret = 0;
+ break;
+ case REP_BLOB_CHUNK_REQ:
+ /* Blobs do not support peer-to-peer. */
+ RECOVERING_SKIP;
+ MASTER_ONLY(rep, rp);
+ ret = __rep_blob_chunk_req(env, eid, rec);
+ CLIENT_REREQ;
+ break;
+ case REP_BLOB_UPDATE:
+ CLIENT_ONLY(rep, rp);
+ ret = __rep_blob_update(env, eid, ip, rec);
+ break;
+ case REP_BLOB_UPDATE_REQ:
+ MASTER_ONLY(rep, rp);
+ infop = env->reginfo;
+ renv = infop->primary;
+ MASTER_UPDATE(env, renv);
+ ret = __rep_blob_update_req(env, ip, rec);
+ break;
case REP_BULK_LOG:
RECOVERING_LOG_SKIP;
CLIENT_ONLY(rep, rp);
@@ -1059,8 +1098,6 @@ out:
*ret_lsnp = rp->lsn;
ret = DB_REP_NOTPERM;
}
- __dbt_userfree(env, control, rec, NULL);
- ENV_LEAVE(env, ip);
return (ret);
}
@@ -1290,8 +1327,24 @@ gap_check:
#endif
}
- if (ret == DB_KEYEXIST)
+ if (ret == DB_KEYEXIST) {
+ STAT(rep->stat.st_log_duplicated++);
+#ifdef CONFIG_TEST
+ STAT(rep->stat.st_log_futuredup++);
+#endif
+ if (is_dupp != NULL) {
+ *is_dupp = 1;
+ /*
+ * Could get overwritten by max_lsn later,
+ * but only when returning NOTPERM for a
+ * REPCTL_PERM record, in which case max_lsn
+ * is this log record.
+ */
+ if (ret_lsnp != NULL)
+ *ret_lsnp = lp->ready_lsn;
+ }
ret = 0;
+ }
if (ret != 0 && ret != ENOMEM)
goto done;
@@ -1337,10 +1390,11 @@ gap_check:
* But max_lsn is guaranteed <= ready_lsn, so
* it would be a more conservative LSN to return.
*/
- *ret_lsnp = lp->ready_lsn;
+ if (ret_lsnp != NULL)
+ *ret_lsnp = lp->ready_lsn;
}
LOGCOPY_32(env, &rectype, rec->data);
- if (rectype == DB___txn_regop || rectype == DB___txn_ckp)
+ if (IS_PERM_RECTYPE(rectype))
max_lsn = lp->max_perm_lsn;
/*
* We check REPCTL_LEASE here, because this client may
@@ -1536,6 +1590,7 @@ __rep_process_txn(env, rec)
DB_REP *db_rep;
DB_THREAD_INFO *ip;
DB_TXNHEAD *txninfo;
+ DELAYED_BLOB_LIST *dblp, *dummy;
LSN_COLLECTION lc;
REP *rep;
__txn_regop_args *txn_args;
@@ -1548,12 +1603,12 @@ __rep_process_txn(env, rec)
db_rep = env->rep_handle;
rep = db_rep->region;
logc = NULL;
+ dblp = dummy = NULL;
txn_args = NULL;
txn42_args = NULL;
prep_args = NULL;
txninfo = NULL;
- ENV_ENTER(env, ip);
memset(&data_dbt, 0, sizeof(data_dbt));
if (F_ISSET(env, ENV_THREAD))
F_SET(&data_dbt, DB_DBT_REALLOC);
@@ -1618,8 +1673,19 @@ __rep_process_txn(env, rec)
goto err;
/* Phase 1. Get a list of the LSNs in this transaction, and sort it. */
- if ((ret = __rep_collect_txn(env, &prev_lsn, &lc)) != 0)
+ if ((ret = __rep_collect_txn(env, &prev_lsn, &lc, &dblp)) != 0)
goto err;
+ /* Deal with any child transactions that had to be delayed. */
+ while (dblp != NULL) {
+ if ((ret = __rep_collect_txn(
+ env, &dblp->lsn, &lc, &dummy)) != 0)
+ goto err;
+ DB_ASSERT(env, dummy == NULL);
+ dummy = dblp;
+ dblp = dummy->next;
+ __os_free(env, dummy);
+ dummy = NULL;
+ }
qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp);
/*
@@ -1627,6 +1693,7 @@ __rep_process_txn(env, rec)
* records. Create a txnlist so that they can keep track of file
* state between records.
*/
+ ENV_GET_THREAD_INFO(env, ip);
if ((ret = __db_txnlist_init(env, ip, 0, 0, NULL, &txninfo)) != 0)
goto err;
@@ -1647,6 +1714,7 @@ __rep_process_txn(env, rec)
(u_long)lsnp->file, (u_long)lsnp->offset);
goto err;
}
+ LOGCOPY_32(env, &rectype, data_dbt.data);
}
err: memset(&req, 0, sizeof(req));
@@ -1658,6 +1726,12 @@ err: memset(&req, 0, sizeof(req));
if ((t_ret = __lock_id_free(env, locker)) != 0 && ret == 0)
ret = t_ret;
+ while (dblp != NULL) {
+ dummy = dblp;
+ dblp = dummy->next;
+ __os_free(env, dummy);
+ }
+
err1: if (txn_args != NULL)
__os_free(env, txn_args);
if (txn42_args != NULL)
@@ -1694,25 +1768,52 @@ err1: if (txn_args != NULL)
* the entire transaction family at once.
*/
static int
-__rep_collect_txn(env, lsnp, lc)
+__rep_collect_txn(env, lsnp, lc, dbl)
ENV *env;
DB_LSN *lsnp;
LSN_COLLECTION *lc;
+ DELAYED_BLOB_LIST **dbl;
{
+ __dbreg_register_args *dbregargp;
__txn_child_args *argp;
DB_LOGC *logc;
DB_LSN c_lsn;
+ DB_REP *db_rep;
DBT data;
- u_int32_t rectype;
+ db_seq_t blob_file_id;
+ u_int32_t child, rectype, skip_txnid;
u_int nalloc;
- int ret, t_ret;
+ int ret, t_ret, view_partial;
+ char *name;
memset(&data, 0, sizeof(data));
F_SET(&data, DB_DBT_REALLOC);
+ skip_txnid = TXN_INVALID;
if ((ret = __log_cursor(env, &logc)) != 0)
return (ret);
+ /*
+ * For partial replication we assume a certain sequence of
+ * log records to detect a database create and skip it if
+ * desired. We are walking backward through the records of
+ * a single transaction right now.
+ *
+ * A create operation is done inside a BDB-owned child txn.
+ * Nothing else is done within this BDB-owned child txn.
+ * The last piece of a create operation is the dbreg_register
+ * log record that records the opening of the file. That
+ * log record contains the child txnid in the 'id' field, and
+ * the file name. At this point we invoke the partial callback
+ * to determine if this database should be replicated. If it
+ * should not be replicated, we need to avoid collecting the
+ * entire child txn referenced in the 'id' field.
+ *
+ * So if processing the dbreg_register record finds a database
+ * to skip, we store the child txnid in 'skip_txnid'. We use
+ * 'skip_txnid' to avoid processing log records or making
+ * recursive calls for that txnid.
+ */
while (!IS_ZERO_LSN(*lsnp) &&
(ret = __logc_get(logc, lsnp, &data, DB_SET)) == 0) {
LOGCOPY_32(env, &rectype, data.data);
@@ -1722,9 +1823,66 @@ __rep_collect_txn(env, lsnp, lc)
goto err;
c_lsn = argp->c_lsn;
*lsnp = argp->prev_lsn;
+ child = argp->child;
__os_free(env, argp);
- ret = __rep_collect_txn(env, &c_lsn, lc);
- } else {
+
+ if (child == skip_txnid && *dbl != NULL &&
+ (*dbl)->child == child)
+ (*dbl)->lsn = c_lsn;
+ /*
+ * If skip_txnid is set, it is the id of the child txnid
+ * that creates a database we should skip. So, if
+ * this is that child txn, do not collect it.
+ */
+ if (skip_txnid == TXN_INVALID || child != skip_txnid)
+ ret = __rep_collect_txn(env, &c_lsn, lc, dbl);
+		} else if (IS_VIEW_SITE(env) &&
+		    rectype == DB___dbreg_register) {
+			db_rep = env->rep_handle;
+			/*
+			 * If we are a view, see if this is a file creation
+			 * stream.  On-disk files have the creating child txn
+			 * in the 'id' field and the name.  See if this view
+			 * wants this file.
+			 *
+			 * dbregargp is allocated by __dbreg_register_read and
+			 * 'name' points into it, so every exit from this
+			 * branch must free dbregargp, and only after the last
+			 * use of 'name'.
+			 */
+			if ((ret = __dbreg_register_read(
+			    env, data.data, &dbregargp)) != 0)
+				goto err;
+			child = dbregargp->id;
+			name = (char *)dbregargp->name.data;
+			skip_txnid = TXN_INVALID;
+			if (child != TXN_INVALID &&
+			    (!IS_DB_FILE(name) || IS_BLOB_META(name))) {
+				/*
+				 * The 'id' has a child txn so it is a create.
+				 */
+				DB_ASSERT(env, db_rep->partial != NULL);
+				GET_LO_HI(env, dbregargp->blob_fid_lo,
+				    dbregargp->blob_fid_hi, blob_file_id, ret);
+				if (ret != 0) {
+					/* Do not leak the decoded record. */
+					__os_free(env, dbregargp);
+					goto err;
+				}
+				if ((ret = __rep_call_partial(env,
+				    name, &view_partial, 0, dbl)) != 0) {
+					VPRINT(env, (env, DB_VERB_REP_MISC,
+			    "rep_collect_txn: partial cb err %d for %s", ret, name));
+					__os_free(env, dbregargp);
+					goto err;
+				}
+				/*
+				 * Save the child txnid for when we walk back
+				 * into the txn_child record.
+				 */
+				if (view_partial == 0) {
+					skip_txnid = child;
+					if ((ret =
+					    __rep_remove_delayed_blobs(env,
+					    blob_file_id, child, dbl)) != 0) {
+						/* Do not leak on failure. */
+						__os_free(env, dbregargp);
+						goto err;
+					}
+				}
+			}
+			__os_free(env, dbregargp);
+		}
+ if (rectype != DB___txn_child) {
if (lc->nalloc < lc->nlsns + 1) {
nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2;
if ((ret = __os_realloc(env,
@@ -1761,6 +1919,62 @@ err: if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
}
/*
+ * __rep_remove_delayed_blobs --
+ *
+ * If a blob meta database is opened in the same transaction as the database
+ * that owns it, then deciding whether it should be replicated or not needs
+ * to be delayed until after the rest of the transaction is processed. To do
+ * this, the transaction's information is added to a DELAYED_BLOB_LIST. When
+ * the owning database is processed, if it is not replicated then remove the
+ * entry of its blob meta database from the delayed list.
+ */
+static int
+__rep_remove_delayed_blobs(env, blob_file_id, child, dbl)
+	ENV *env;
+	db_seq_t blob_file_id;
+	u_int32_t child;
+	DELAYED_BLOB_LIST **dbl;
+{
+	DELAYED_BLOB_LIST *after, *before, *cur;
+
+	/* An empty delayed list leaves nothing to claim or remove. */
+	if (*dbl == NULL)
+		return (0);
+
+	/*
+	 * A head entry whose child is still 0 was just added to the list;
+	 * record the child transaction in it and stop.
+	 */
+	if ((*dbl)->child == 0) {
+		(*dbl)->child = child;
+		return (0);
+	}
+
+	if (blob_file_id == 0)
+		return (0);
+
+	/*
+	 * Find the first entry for this blob file id that belongs to a
+	 * different child transaction.  The owning database is not being
+	 * replicated, so its blob meta database must not be either.
+	 */
+	cur = *dbl;
+	while (cur != NULL &&
+	    (cur->blob_file_id != blob_file_id || cur->child == child))
+		cur = (DELAYED_BLOB_LIST *)cur->next;
+	if (cur == NULL)
+		return (0);
+
+	/* Unlink the entry, moving the list head if it was first. */
+	after = (DELAYED_BLOB_LIST *)cur->next;
+	before = (DELAYED_BLOB_LIST *)cur->prev;
+	if (*dbl == cur)
+		*dbl = after;
+	if (before != NULL)
+		before->next = cur->next;
+	if (after != NULL)
+		after->prev = cur->prev;
+	__os_free(env, cur);
+	return (0);
+}
+
+/*
* __rep_lsn_cmp --
* qsort-type-compatible wrapper for LOG_COMPARE.
*/
@@ -2138,9 +2352,13 @@ __rep_process_rec(env, ip, rp, rec, ret_tsp, ret_lsnp)
ret = __rep_process_txn(env, rec);
} while (ret == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED);
- /* Now flush the log unless we're running TXN_NOSYNC. */
- if (ret == 0 && !F_ISSET(env->dbenv, DB_ENV_TXN_NOSYNC))
- ret = __log_flush(env, NULL);
+ /* Now write/flush the log as appropriate. */
+ if (ret == 0) {
+ if (F_ISSET(env->dbenv, DB_ENV_TXN_WRITE_NOSYNC))
+ ret = __log_rep_write(env);
+ else if (!F_ISSET(env->dbenv, DB_ENV_TXN_NOSYNC))
+ ret = __log_flush(env, NULL);
+ }
if (ret != 0) {
__db_errx(env, DB_STR_A("3526",
"Error processing txn [%lu][%lu]", "%lu %lu"),
@@ -2256,7 +2474,7 @@ __rep_resend_req(env, rereq)
DB_REP *db_rep;
LOG *lp;
REP *rep;
- int master, ret;
+ int blob_sync, master, ret;
repsync_t sync_state;
u_int32_t gapflags, msgtype, repflags, sendflags;
@@ -2271,6 +2489,7 @@ __rep_resend_req(env, rereq)
repflags = rep->flags;
sync_state = rep->sync_state;
+ blob_sync = rep->blob_sync;
/*
* If we are delayed we do not rerequest anything.
*/
@@ -2293,9 +2512,17 @@ __rep_resend_req(env, rereq)
*/
msgtype = REP_UPDATE_REQ;
} else if (sync_state == SYNC_PAGE) {
- REP_SYSTEM_LOCK(env);
- ret = __rep_pggap_req(env, rep, NULL, gapflags);
- REP_SYSTEM_UNLOCK(env);
+ if (blob_sync == 0) {
+ REP_SYSTEM_LOCK(env);
+ ret = __rep_pggap_req(env, rep, NULL, gapflags);
+ REP_SYSTEM_UNLOCK(env);
+ } else {
+ MUTEX_LOCK(env, rep->mtx_clientdb);
+ REP_SYSTEM_LOCK(env);
+ ret = __rep_blob_rereq(env, rep);
+ REP_SYSTEM_UNLOCK(env);
+ MUTEX_UNLOCK(env, rep->mtx_clientdb);
+ }
} else {
MUTEX_LOCK(env, rep->mtx_clientdb);
ret = __rep_loggap_req(env, rep, NULL, gapflags);
@@ -2397,9 +2624,20 @@ __rep_skip_msg(env, rep, eid, rectype)
if (rep->master_id == DB_EID_INVALID) /* Case 1. */
(void)__rep_send_message(env,
DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0);
- else if (eid == rep->master_id) /* Case 2. */
- ret = __rep_resend_req(env, 0);
- else if (F_ISSET(rep, REP_F_CLIENT)) /* Case 3. */
+ else if (eid == rep->master_id) { /* Case 2. */
+ /*
+ * When we receive log messages in the SYNC_PAGE stage
+ * and we decide to rerequest, it often means the pages
+ * we expect have been dropped. Send a rerequest with
+ * gapflags for better performance.
+ */
+ if ((rectype == REP_LOG || rectype == REP_BULK_LOG ||
+ rectype == REP_LOG_MORE) &&
+ rep->sync_state == SYNC_PAGE)
+ ret = __rep_resend_req(env, 1);
+ else
+ ret = __rep_resend_req(env, 0);
+ } else if (F_ISSET(rep, REP_F_CLIENT)) /* Case 3. */
(void)__rep_send_message(env,
eid, REP_REREQUEST, NULL, NULL, 0, 0);
}
@@ -2421,7 +2659,6 @@ __rep_check_missing(env, gen, master_perm_lsn)
DB_LOG *dblp;
DB_LSN *end_lsn;
DB_REP *db_rep;
- DB_THREAD_INFO *ip;
LOG *lp;
REGINFO *infop;
REP *rep;
@@ -2434,7 +2671,6 @@ __rep_check_missing(env, gen, master_perm_lsn)
infop = env->reginfo;
has_log_gap = has_page_gap = ret = 0;
- ENV_ENTER(env, ip);
MUTEX_LOCK(env, rep->mtx_clientdb);
REP_SYSTEM_LOCK(env);
/*
@@ -2518,8 +2754,7 @@ __rep_check_missing(env, gen, master_perm_lsn)
rep->msg_th--;
REP_SYSTEM_UNLOCK(env);
-out: ENV_LEAVE(env, ip);
- return (ret);
+out: return (ret);
}
static int
diff --git a/src/rep/rep_region.c b/src/rep/rep_region.c
index f1d69dff..72372bff 100644
--- a/src/rep/rep_region.c
+++ b/src/rep/rep_region.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -14,6 +14,8 @@
static int __rep_egen_init __P((ENV *, REP *));
static int __rep_gen_init __P((ENV *, REP *));
+static int __rep_view_init __P((ENV *, REP *));
+static int __rep_viewfile_exists __P((ENV *, int *));
/*
* __rep_open --
@@ -29,7 +31,7 @@ __rep_open(env)
REGENV *renv;
REGINFO *infop;
REP *rep;
- int i, ret;
+ int i, ret, view;
char *p;
char fname[sizeof(REP_DIAGNAME) + 3];
@@ -37,10 +39,15 @@ __rep_open(env)
infop = env->reginfo;
renv = infop->primary;
ret = 0;
+ view = 0;
DB_ASSERT(env, DBREP_DIAG_FILES < 100);
if (renv->rep_off == INVALID_ROFF) {
- /* Must create the region. */
+ /*
+ * Must create the region. This environment either is being
+ * created for the first time or has just had its regions
+ * cleared by a recovery.
+ */
if ((ret = __env_alloc(infop, sizeof(REP), &rep)) != 0)
return (ret);
memset(rep, 0, sizeof(*rep));
@@ -108,6 +115,23 @@ __rep_open(env)
return (ret);
if ((ret = __rep_egen_init(env, rep)) != 0)
return (ret);
+ /*
+ * Determine if this is a view site or not. It is a view
+ * if the callback is set. If the site was a view in the
+ * past, we mark it as a view, but will check consistency
+ * later when starting replication.
+ */
+ if (db_rep->partial != NULL) {
+ rep->stat.st_view = 1;
+ if ((ret = __rep_view_init(env, rep)) != 0)
+ return (ret);
+ } else {
+ if ((ret = __rep_viewfile_exists(env, &view)) != 0)
+ return (ret);
+ if (view)
+ rep->stat.st_view = 1;
+ }
+
rep->gbytes = db_rep->gbytes;
rep->bytes = db_rep->bytes;
rep->request_gap = db_rep->request_gap;
@@ -157,6 +181,32 @@ __rep_open(env)
"process joining the environment"));
return (EINVAL);
}
+ /*
+ * If we are joining an existing environment and we
+ * have a view callback set, then the environment must
+ * already be a view. If not, error.
+ *
+ * The other mismatch is not an error here (no callback
+ * set, but environment is a view) because we may be a
+ * rep unaware process such as db_stat and that is allowed
+ * to proceed. There is additional checking in other rep
+ * functions like rep_start to confirm consistency before
+ * using replication.
+ */
+ if (db_rep->partial != NULL) {
+ if ((ret = __rep_viewfile_exists(env, &view)) != 0)
+ return (ret);
+ /*
+ * If there is a callback, and we are not in-memory,
+ * there better be a view system file too.
+ */
+ if (view == 0 && !FLD_ISSET(rep->config, REP_C_INMEM)) {
+ __db_errx(env, DB_STR("3688",
+ "Application environment and view mismatch "
+ "joining the environment"));
+ return (EINVAL);
+ }
+ }
#ifdef HAVE_REPLICATION_THREADS
if ((ret = __repmgr_join(env, rep)) != 0)
return (ret);
@@ -506,9 +556,8 @@ __rep_write_egen(env, rep, egen)
* If running in-memory replication, return without any file
* operations.
*/
- if (FLD_ISSET(rep->config, REP_C_INMEM)) {
+ if (FLD_ISSET(rep->config, REP_C_INMEM))
return (0);
- }
if ((ret = __db_appname(env,
DB_APP_META, REP_EGENNAME, NULL, &p)) != 0)
@@ -591,9 +640,8 @@ __rep_write_gen(env, rep, gen)
* If running in-memory replication, return without any file
* operations.
*/
- if (FLD_ISSET(rep->config, REP_C_INMEM)) {
+ if (FLD_ISSET(rep->config, REP_C_INMEM))
return (0);
- }
if ((ret = __db_appname(env,
DB_APP_META, REP_GENNAME, NULL, &p)) != 0)
@@ -608,3 +656,105 @@ __rep_write_gen(env, rep, gen)
__os_free(env, p);
return (ret);
}
+
+/*
+ * __rep_view_init --
+ * Initialize the permanent view file to know this site is a view
+ * forever. The existence of the file is the record.
+ */
+static int
+__rep_view_init(env, rep)
+	ENV *env;
+	REP *rep;
+{
+	DB_FH *fhp;
+	char *path;
+	int ret;
+
+	/* With in-memory replication configured, do no file operations. */
+	if (FLD_ISSET(rep->config, REP_C_INMEM))
+		return (0);
+
+	ret = __db_appname(env, DB_APP_META, REPVIEW, NULL, &path);
+	if (ret != 0)
+		return (ret);
+
+	/*
+	 * The file has no content; its existence is the record.  Create
+	 * it only when it is missing, then close it right away.  An
+	 * existing file needs no further work.
+	 */
+	if (__os_exists(env, path, NULL) != 0) {
+		RPRINT(env,
+		    (env, DB_VERB_REP_MISC, "View init: Create %s", path));
+		ret = __os_open(env, path, 0,
+		    DB_OSO_CREATE | DB_OSO_TRUNC, DB_MODE_600, &fhp);
+		if (ret == 0)
+			(void)__os_closehandle(env, fhp);
+	}
+	__os_free(env, path);
+	return (ret);
+}
+
+/*
+ * __rep_check_view --
+ * Check consistency between the view file and the db_rep handle.
+ *
+ * PUBLIC: int __rep_check_view __P((ENV *));
+ */
+int
+__rep_check_view(env)
+	ENV *env;
+{
+	DB_REP *db_rep;
+	REP *rep;
+	int exist, has_cb, ret;
+
+	db_rep = env->rep_handle;
+	rep = db_rep->region;
+	has_cb = db_rep->partial != NULL;
+	ret = 0;
+
+	/*
+	 * With in-memory replication there is no file to look at, so the
+	 * only evidence is the st_view value already held in the region.
+	 * Otherwise ask the file system whether the view file is present.
+	 */
+	if (FLD_ISSET(rep->config, REP_C_INMEM))
+		exist = (int)rep->stat.st_view;
+	else if ((ret = __rep_viewfile_exists(env, &exist)) != 0)
+		return (ret);
+
+	RPRINT(env, (env, DB_VERB_REP_MISC, "Check view. Exist %d, cb %d",
+	    exist, has_cb));
+	/*
+	 * The two must agree: a view file requires a partial callback,
+	 * and a partial callback requires a view file.
+	 */
+	if ((exist == 0 && has_cb) || (exist == 1 && !has_cb))
+		return (EINVAL);
+	return (ret);
+}
+
+/*
+ * __rep_viewfile_exists --
+ *	Report through *existp whether the REPVIEW file is present: 1 if
+ *	__os_exists finds it, 0 otherwise.  The return value is nonzero only
+ *	when the path name cannot be built, in which case *existp is 0.
+ */
+static int
+__rep_viewfile_exists(env, existp)
+	ENV *env;
+	int *existp;
+{
+	char *path;
+	int ret;
+
+	*existp = 0;
+	ret = __db_appname(env, DB_APP_META, REPVIEW, NULL, &path);
+	if (ret != 0)
+		return (ret);
+
+	*existp = __os_exists(env, path, NULL) == 0 ? 1 : 0;
+	__os_free(env, path);
+	return (ret);
+}
diff --git a/src/rep/rep_stat.c b/src/rep/rep_stat.c
index addfee25..ffb9f262 100644
--- a/src/rep/rep_stat.c
+++ b/src/rep/rep_stat.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -73,6 +73,13 @@ static const char *__rep_syncstate_to_string __P((repsync_t));
} \
} while (0)
+/*
+ * PRINT_VIEW --
+ *	Print one message line saying whether the statistics structure marks
+ *	this environment as a view site (st_view nonzero) or not.  The macro
+ *	uses the variable named "env" from the scope where it is expanded.
+ */
+#define PRINT_VIEW(sp) do { \
+ if ((sp)->st_view != 0) \
+ __db_msg(env, "Environment configured as view site"); \
+ else \
+ __db_msg(env, "Environment not configured as view site");\
+} while (0)
+
/*
* __rep_stat_pp --
* ENV->rep_stat pre/post processing.
@@ -120,7 +127,7 @@ __rep_stat(env, statp, flags)
DB_REP_STAT *stats;
LOG *lp;
REP *rep;
- u_int32_t startupdone;
+ u_int32_t startupdone, view;
uintmax_t queued;
int dolock, ret;
@@ -177,10 +184,12 @@ __rep_stat(env, statp, flags)
if (LF_ISSET(DB_STAT_CLEAR)) {
queued = rep->stat.st_log_queued;
startupdone = rep->stat.st_startup_complete;
+ view = rep->stat.st_view;
memset(&rep->stat, 0, sizeof(rep->stat));
rep->stat.st_log_queued = rep->stat.st_log_queued_total =
rep->stat.st_log_queued_max = queued;
rep->stat.st_startup_complete = startupdone;
+ rep->stat.st_view = view;
}
/*
@@ -377,6 +386,7 @@ __rep_print_stats(env, flags)
__db_dl(env, "Number of page records missed and requested",
(u_long)sp->st_pg_requested);
PRINT_STARTUPCOMPLETE(sp);
+ PRINT_VIEW(sp);
__db_dl(env,
"Number of transactions applied", (u_long)sp->st_txns_applied);
@@ -462,16 +472,20 @@ __rep_print_all(env, flags)
u_int32_t flags;
{
static const FN rep_cfn[] = {
- { REP_C_2SITE_STRICT, "REP_C_2SITE_STRICT" },
- { REP_C_AUTOINIT, "REP_C_AUTOINIT" },
- { REP_C_AUTOROLLBACK, "REP_C_AUTOROLLBACK" },
- { REP_C_BULK, "REP_C_BULK" },
- { REP_C_DELAYCLIENT, "REP_C_DELAYCLIENT" },
- { REP_C_ELECTIONS, "REP_C_ELECTIONS" },
- { REP_C_INMEM, "REP_C_INMEM" },
- { REP_C_LEASE, "REP_C_LEASE" },
- { REP_C_NOWAIT, "REP_C_NOWAIT" },
- { 0, NULL }
+ { REP_C_2SITE_STRICT, "REP_C_2SITE_STRICT" },
+ { REP_C_AUTOINIT, "REP_C_AUTOINIT" },
+ { REP_C_AUTOROLLBACK, "REP_C_AUTOROLLBACK" },
+ { REP_C_AUTOTAKEOVER, "REP_C_AUTOTAKEOVER" },
+ { REP_C_BULK, "REP_C_BULK" },
+ { REP_C_DELAYCLIENT, "REP_C_DELAYCLIENT" },
+ { REP_C_ELECT_LOGLENGTH, "REP_C_ELECT_LOGLENGTH" },
+ { REP_C_ELECTIONS, "REP_C_ELECTIONS" },
+ { REP_C_INMEM, "REP_C_INMEM" },
+ { REP_C_LEASE, "REP_C_LEASE" },
+ { REP_C_NOWAIT, "REP_C_NOWAIT" },
+ { REP_C_PREFMAS_CLIENT, "REP_C_PREFMAS_CLIENT" },
+ { REP_C_PREFMAS_MASTER, "REP_C_PREFMAS_MASTER" },
+ { 0, NULL }
};
static const FN rep_efn[] = {
{ REP_E_PHASE0, "REP_E_PHASE0" },
@@ -481,19 +495,21 @@ __rep_print_all(env, flags)
{ 0, NULL }
};
static const FN rep_fn[] = {
- { REP_F_ABBREVIATED, "REP_F_ABBREVIATED" },
- { REP_F_APP_BASEAPI, "REP_F_APP_BASEAPI" },
- { REP_F_APP_REPMGR, "REP_F_APP_REPMGR" },
- { REP_F_CLIENT, "REP_F_CLIENT" },
- { REP_F_DELAY, "REP_F_DELAY" },
- { REP_F_GROUP_ESTD, "REP_F_GROUP_ESTD" },
- { REP_F_LEASE_EXPIRED, "REP_F_LEASE_EXPIRED" },
- { REP_F_MASTER, "REP_F_MASTER" },
- { REP_F_MASTERELECT, "REP_F_MASTERELECT" },
- { REP_F_NEWFILE, "REP_F_NEWFILE" },
- { REP_F_NIMDBS_LOADED, "REP_F_NIMDBS_LOADED" },
- { REP_F_SKIPPED_APPLY, "REP_F_SKIPPED_APPLY" },
- { REP_F_START_CALLED, "REP_F_START_CALLED" },
+ { REP_F_ABBREVIATED, "REP_F_ABBREVIATED" },
+ { REP_F_APP_BASEAPI, "REP_F_APP_BASEAPI" },
+ { REP_F_APP_REPMGR, "REP_F_APP_REPMGR" },
+ { REP_F_CLIENT, "REP_F_CLIENT" },
+ { REP_F_DELAY, "REP_F_DELAY" },
+ { REP_F_GROUP_ESTD, "REP_F_GROUP_ESTD" },
+ { REP_F_HOLD_GEN, "REP_F_HOLD_GEN" },
+ { REP_F_LEASE_EXPIRED, "REP_F_LEASE_EXPIRED" },
+ { REP_F_MASTER, "REP_F_MASTER" },
+ { REP_F_MASTERELECT, "REP_F_MASTERELECT" },
+ { REP_F_NEWFILE, "REP_F_NEWFILE" },
+ { REP_F_NIMDBS_LOADED, "REP_F_NIMDBS_LOADED" },
+ { REP_F_READONLY_MASTER, "REP_F_READONLY_MASTER" },
+ { REP_F_SKIPPED_APPLY, "REP_F_SKIPPED_APPLY" },
+ { REP_F_START_CALLED, "REP_F_START_CALLED" },
{ 0, NULL }
};
static const FN rep_lfn[] = {
@@ -523,15 +539,16 @@ __rep_print_all(env, flags)
rep = db_rep->region;
infop = env->reginfo;
renv = infop->primary;
- ENV_ENTER(env, ip);
__db_msg(env, "%s", DB_GLOBAL(db_line));
__db_msg(env, "DB_REP handle information:");
if (db_rep->rep_db == NULL)
STAT_ISSET("Bookkeeping database", db_rep->rep_db);
- else
+ else {
+ ENV_GET_THREAD_INFO(env, ip);
(void)__db_stat_print(db_rep->rep_db, ip, flags);
+ }
__db_prflags(env, NULL, db_rep->flags, dbrep_fn, NULL, "\tFlags");
@@ -604,7 +621,6 @@ __rep_print_all(env, flags)
STAT_LONG("Maximum lease timestamp microseconds",
lp->max_lease_ts.tv_nsec / NS_PER_US);
MUTEX_UNLOCK(env, rep->mtx_clientdb);
- ENV_LEAVE(env, ip);
return (0);
}
@@ -648,8 +664,10 @@ __rep_stat_summary_print(env)
ret = 0;
if ((ret = __rep_stat(env, &sp, 0)) == 0) {
PRINT_STATUS(sp, is_client);
- if (is_client)
+ if (is_client) {
PRINT_STARTUPCOMPLETE(sp);
+ PRINT_VIEW(sp);
+ }
PRINT_MAXPERMLSN(sp);
/*
* Use the number of sites that is kept up-to-date most
diff --git a/src/rep/rep_stub.c b/src/rep/rep_stub.c
index 2d96ea59..51c79eb0 100644
--- a/src/rep/rep_stub.c
+++ b/src/rep/rep_stub.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 1996, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1996, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -130,7 +130,7 @@ __rep_elect_pp(dbenv, nsites, nvotes, flags)
}
int
-__rep_flush(dbenv)
+__rep_flush_pp(dbenv)
DB_ENV *dbenv;
{
return (__db_norep(dbenv->env));
@@ -201,7 +201,7 @@ __rep_get_nsites(dbenv, n)
}
int
-__rep_set_priority(dbenv, priority)
+__rep_set_priority_pp(dbenv, priority)
DB_ENV *dbenv;
u_int32_t priority;
{
@@ -219,7 +219,7 @@ __rep_get_priority(dbenv, priority)
}
int
-__rep_set_timeout(dbenv, which, timeout)
+__rep_set_timeout_pp(dbenv, which, timeout)
DB_ENV *dbenv;
int which;
db_timeout_t timeout;
@@ -342,6 +342,16 @@ __rep_set_transport_pp(dbenv, eid, f_send)
}
+/*
+ * __rep_set_view --
+ *	Stub version: the f_partial callback is ignored (COMPQUIET only
+ *	silences the unused-argument warning) and the result of
+ *	__db_norep() is returned.
+ */
int
+__rep_set_view(dbenv, f_partial)
+ DB_ENV *dbenv;
+ int (*f_partial) __P((DB_ENV *,
+ const char *, int *, u_int32_t));
+{
+ COMPQUIET(f_partial, NULL);
+ return (__db_norep(dbenv->env));
+}
+
+int
__rep_set_request(dbenv, min, max)
DB_ENV *dbenv;
u_int32_t min, max;
diff --git a/src/rep/rep_util.c b/src/rep/rep_util.c
index 0dfe6122..5ee2592f 100644
--- a/src/rep/rep_util.c
+++ b/src/rep/rep_util.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -11,6 +11,7 @@
#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/db_am.h"
+#include "dbinc/fop.h"
#include "dbinc/mp.h"
#include "dbinc/txn.h"
@@ -437,7 +438,7 @@ __rep_send_message(env, eid, rtype, lsnp, dbt, ctlflags, repflags)
FLD_ISSET(ctlflags, REPCTL_LEASE | REPCTL_PERM)) {
F_SET(&cntrl, REPCTL_LEASE);
DB_ASSERT(env, rep->version == DB_REPVERSION);
- __os_gettime(env, &msg_time, 1);
+ __os_gettime(env, &msg_time, 0);
cntrl.msg_sec = (u_int32_t)msg_time.tv_sec;
cntrl.msg_nsec = (u_int32_t)msg_time.tv_nsec;
}
@@ -591,6 +592,15 @@ __rep_new_master(env, cntrl, eid)
ret = 0;
logc = NULL;
lockout_msg = 0;
+
+ /*
+ * If REP_F_HOLD_GEN is set, we want to keep this site at its
+ * current gen. Do not process an incoming NEWMASTER, which
+ * would change the gen.
+ */
+ if (F_ISSET(rep, REP_F_HOLD_GEN))
+ return (ret);
+
REP_SYSTEM_LOCK(env);
change = rep->gen != cntrl->gen || rep->master_id != eid;
/*
@@ -1128,6 +1138,8 @@ __env_db_rep_exit(env)
rep = db_rep->region;
REP_SYSTEM_LOCK(env);
+ /* If we have a reference, it better not already be 0. */
+ DB_ASSERT(env, rep->handle_cnt != 0);
rep->handle_cnt--;
REP_SYSTEM_UNLOCK(env);
@@ -1190,7 +1202,7 @@ __db_rep_enter(dbp, checkgen, checklock, return_now)
* get an exclusive lock on this database.
*/
if (checkgen && dbp->mpf->mfp && IS_REP_CLIENT(env)) {
- if (dbp->mpf->mfp->excl_lockout)
+ if (dbp->mpf->mfp->excl_lockout)
return (DB_REP_HANDLE_DEAD);
}
@@ -1328,7 +1340,8 @@ __op_rep_exit(env)
rep = db_rep->region;
REP_SYSTEM_LOCK(env);
- DB_ASSERT(env, rep->op_cnt > 0);
+ /* If we have a reference, it better not already be 0. */
+ DB_ASSERT(env, rep->op_cnt != 0);
rep->op_cnt--;
REP_SYSTEM_UNLOCK(env);
@@ -1697,7 +1710,9 @@ __rep_msg_to_old(version, rectype)
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
- REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
+ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
+ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
+ REP_INVALID },
/*
* 4.2/DB_REPVERSION 1 no longer supported.
*/
@@ -1708,7 +1723,9 @@ __rep_msg_to_old(version, rectype)
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
- REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
+ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
+ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
+ REP_INVALID },
/*
* 4.3/DB_REPVERSION 2 no longer supported.
*/
@@ -1719,7 +1736,9 @@ __rep_msg_to_old(version, rectype)
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
- REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
+ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
+ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
+ REP_INVALID },
/*
* From 4.7 message number To 4.4/4.5 message number
*/
@@ -1727,6 +1746,11 @@ __rep_msg_to_old(version, rectype)
1, /* REP_ALIVE */
2, /* REP_ALIVE_REQ */
3, /* REP_ALL_REQ */
+ REP_INVALID, /* REP_BLOB_ALL_REQ */
+ REP_INVALID, /* REP_BLOB_CHUNK */
+ REP_INVALID, /* REP_BLOB_CHUNK_REQ */
+ REP_INVALID, /* REP_BLOB_UPDATE */
+ REP_INVALID, /* REP_BLOB_UPDATE_REQ */
4, /* REP_BULK_LOG */
5, /* REP_BULK_PAGE */
6, /* REP_DUPMASTER */
@@ -1765,6 +1789,11 @@ __rep_msg_to_old(version, rectype)
1, /* REP_ALIVE */
2, /* REP_ALIVE_REQ */
3, /* REP_ALL_REQ */
+ REP_INVALID, /* REP_BLOB_ALL_REQ */
+ REP_INVALID, /* REP_BLOB_CHUNK */
+ REP_INVALID, /* REP_BLOB_CHUNK_REQ */
+ REP_INVALID, /* REP_BLOB_UPDATE */
+ REP_INVALID, /* REP_BLOB_UPDATE_REQ */
4, /* REP_BULK_LOG */
5, /* REP_BULK_PAGE */
6, /* REP_DUPMASTER */
@@ -1803,6 +1832,11 @@ __rep_msg_to_old(version, rectype)
1, /* REP_ALIVE */
2, /* REP_ALIVE_REQ */
3, /* REP_ALL_REQ */
+ REP_INVALID, /* REP_BLOB_ALL_REQ */
+ REP_INVALID, /* REP_BLOB_CHUNK */
+ REP_INVALID, /* REP_BLOB_CHUNK_REQ */
+ REP_INVALID, /* REP_BLOB_UPDATE */
+ REP_INVALID, /* REP_BLOB_UPDATE_REQ */
4, /* REP_BULK_LOG */
5, /* REP_BULK_PAGE */
6, /* REP_DUPMASTER */
@@ -1841,6 +1875,53 @@ __rep_msg_to_old(version, rectype)
1, /* REP_ALIVE */
2, /* REP_ALIVE_REQ */
3, /* REP_ALL_REQ */
+ REP_INVALID, /* REP_BLOB_ALL_REQ */
+ REP_INVALID, /* REP_BLOB_CHUNK */
+ REP_INVALID, /* REP_BLOB_CHUNK_REQ */
+ REP_INVALID, /* REP_BLOB_UPDATE */
+ REP_INVALID, /* REP_BLOB_UPDATE_REQ */
+ 4, /* REP_BULK_LOG */
+ 5, /* REP_BULK_PAGE */
+ 6, /* REP_DUPMASTER */
+ 7, /* REP_FILE */
+ 8, /* REP_FILE_FAIL */
+ 9, /* REP_FILE_REQ */
+ 10, /* REP_LEASE_GRANT */
+ 11, /* REP_LOG */
+ 12, /* REP_LOG_MORE */
+ 13, /* REP_LOG_REQ */
+ 14, /* REP_MASTER_REQ */
+ 15, /* REP_NEWCLIENT */
+ 16, /* REP_NEWFILE */
+ 17, /* REP_NEWMASTER */
+ 18, /* REP_NEWSITE */
+ 19, /* REP_PAGE */
+ 20, /* REP_PAGE_FAIL */
+ 21, /* REP_PAGE_MORE */
+ 22, /* REP_PAGE_REQ */
+ 23, /* REP_REREQUEST */
+ 24, /* REP_START_SYNC */
+ 25, /* REP_UPDATE */
+ 26, /* REP_UPDATE_REQ */
+ 27, /* REP_VERIFY */
+ 28, /* REP_VERIFY_FAIL */
+ 29, /* REP_VERIFY_REQ */
+ 30, /* REP_VOTE1 */
+ 31 /* REP_VOTE2 */
+ },
+ /*
+ * From 6.1 message number To 5.3 message number. Messages
+ * handling BLOBs were added.
+ */
+ { REP_INVALID, /* NO message 0 */
+ 1, /* REP_ALIVE */
+ 2, /* REP_ALIVE_REQ */
+ 3, /* REP_ALL_REQ */
+ REP_INVALID, /* REP_BLOB_ALL_REQ */
+ REP_INVALID, /* REP_BLOB_CHUNK */
+ REP_INVALID, /* REP_BLOB_CHUNK_REQ */
+ REP_INVALID, /* REP_BLOB_UPDATE */
+ REP_INVALID, /* REP_BLOB_UPDATE_REQ */
4, /* REP_BULK_LOG */
5, /* REP_BULK_PAGE */
6, /* REP_DUPMASTER */
@@ -1901,7 +1982,9 @@ __rep_msg_from_old(version, rectype)
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
- REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
+ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
+ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
+ REP_INVALID },
/*
* 4.2/DB_REPVERSION 1 no longer supported.
*/
@@ -1912,7 +1995,9 @@ __rep_msg_from_old(version, rectype)
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
- REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
+ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
+ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
+ REP_INVALID },
/*
* 4.3/DB_REPVERSION 2 no longer supported.
*/
@@ -1923,7 +2008,9 @@ __rep_msg_from_old(version, rectype)
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
- REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
+ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
+ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
+ REP_INVALID },
/*
* From 4.4/4.5 message number To 4.7 message number
*/
@@ -1931,36 +2018,41 @@ __rep_msg_from_old(version, rectype)
1, /* 1, REP_ALIVE */
2, /* 2, REP_ALIVE_REQ */
3, /* 3, REP_ALL_REQ */
- 4, /* 4, REP_BULK_LOG */
- 5, /* 5, REP_BULK_PAGE */
- 6, /* 6, REP_DUPMASTER */
- 7, /* 7, REP_FILE */
- 8, /* 8, REP_FILE_FAIL */
- 9, /* 9, REP_FILE_REQ */
- /* 10, REP_LEASE_GRANT doesn't exist */
- 11, /* 10, REP_LOG */
- 12, /* 11, REP_LOG_MORE */
- 13, /* 12, REP_LOG_REQ */
- 14, /* 13, REP_MASTER_REQ */
- 15, /* 14, REP_NEWCLIENT */
- 16, /* 15, REP_NEWFILE */
- 17, /* 16, REP_NEWMASTER */
- 18, /* 17, REP_NEWSITE */
- 19, /* 18, REP_PAGE */
- 20, /* 19, REP_PAGE_FAIL */
- 21, /* 20, REP_PAGE_MORE */
- 22, /* 21, REP_PAGE_REQ */
- 23, /* 22, REP_REREQUEST */
- /* 24, REP_START_SYNC doesn't exist */
- 25, /* 23, REP_UPDATE */
- 26, /* 24, REP_UPDATE_REQ */
- 27, /* 25, REP_VERIFY */
- 28, /* 26, REP_VERIFY_FAIL */
- 29, /* 27, REP_VERIFY_REQ */
- 30, /* 28, REP_VOTE1 */
- 31, /* 29, REP_VOTE2 */
+ 9, /* 4, REP_BULK_LOG */
+ 10, /* 5, REP_BULK_PAGE */
+ 11, /* 6, REP_DUPMASTER */
+ 12, /* 7, REP_FILE */
+ 13, /* 8, REP_FILE_FAIL */
+ 14, /* 9, REP_FILE_REQ */
+ /* 15, REP_LEASE_GRANT doesn't exist */
+ 16, /* 10, REP_LOG */
+ 17, /* 11, REP_LOG_MORE */
+ 18, /* 12, REP_LOG_REQ */
+ 19, /* 13, REP_MASTER_REQ */
+ 20, /* 14, REP_NEWCLIENT */
+ 21, /* 15, REP_NEWFILE */
+ 22, /* 16, REP_NEWMASTER */
+ 23, /* 17, REP_NEWSITE */
+ 24, /* 18, REP_PAGE */
+ 25, /* 19, REP_PAGE_FAIL */
+ 26, /* 20, REP_PAGE_MORE */
+ 27, /* 21, REP_PAGE_REQ */
+ 28, /* 22, REP_REREQUEST */
+ /* 29, REP_START_SYNC doesn't exist */
+ 30, /* 23, REP_UPDATE */
+ 31, /* 24, REP_UPDATE_REQ */
+ 32, /* 25, REP_VERIFY */
+ 33, /* 26, REP_VERIFY_FAIL */
+ 34, /* 27, REP_VERIFY_REQ */
+ 35, /* 28, REP_VOTE1 */
+ 36, /* 29, REP_VOTE2 */
REP_INVALID, /* 30, 4.4/4.5 no message */
- REP_INVALID /* 31, 4.4/4.5 no message */
+ REP_INVALID, /* 31, 4.4/4.5 no message */
+ REP_INVALID, /* 32, 4.4/4.5 no message */
+ REP_INVALID, /* 33, 4.4/4.5 no message */
+ REP_INVALID, /* 34, 4.4/4.5 no message */
+ REP_INVALID, /* 35, 4.4/4.5 no message */
+ REP_INVALID /* 36, 4.4/4.5 no message */
},
/*
* From 4.6 message number To 4.7 message number. There are
@@ -1971,34 +2063,39 @@ __rep_msg_from_old(version, rectype)
1, /* 1, REP_ALIVE */
2, /* 2, REP_ALIVE_REQ */
3, /* 3, REP_ALL_REQ */
- 4, /* 4, REP_BULK_LOG */
- 5, /* 5, REP_BULK_PAGE */
- 6, /* 6, REP_DUPMASTER */
- 7, /* 7, REP_FILE */
- 8, /* 8, REP_FILE_FAIL */
- 9, /* 9, REP_FILE_REQ */
- 10, /* 10, REP_LEASE_GRANT */
- 11, /* 11, REP_LOG */
- 12, /* 12, REP_LOG_MORE */
- 13, /* 13, REP_LOG_REQ */
- 14, /* 14, REP_MASTER_REQ */
- 15, /* 15, REP_NEWCLIENT */
- 16, /* 16, REP_NEWFILE */
- 17, /* 17, REP_NEWMASTER */
- 18, /* 18, REP_NEWSITE */
- 19, /* 19, REP_PAGE */
- 20, /* 20, REP_PAGE_FAIL */
- 21, /* 21, REP_PAGE_MORE */
- 22, /* 22, REP_PAGE_REQ */
- 23, /* 22, REP_REREQUEST */
- 24, /* 24, REP_START_SYNC */
- 25, /* 25, REP_UPDATE */
- 26, /* 26, REP_UPDATE_REQ */
- 27, /* 27, REP_VERIFY */
- 28, /* 28, REP_VERIFY_FAIL */
- 29, /* 29, REP_VERIFY_REQ */
- 30, /* 30, REP_VOTE1 */
- 31 /* 31, REP_VOTE2 */
+ 9, /* 4, REP_BULK_LOG */
+ 10, /* 5, REP_BULK_PAGE */
+ 11, /* 6, REP_DUPMASTER */
+ 12, /* 7, REP_FILE */
+ 13, /* 8, REP_FILE_FAIL */
+ 14, /* 9, REP_FILE_REQ */
+ 15, /* 10, REP_LEASE_GRANT */
+ 16, /* 11, REP_LOG */
+ 17, /* 12, REP_LOG_MORE */
+ 18, /* 13, REP_LOG_REQ */
+ 19, /* 14, REP_MASTER_REQ */
+ 20, /* 15, REP_NEWCLIENT */
+ 21, /* 16, REP_NEWFILE */
+ 22, /* 17, REP_NEWMASTER */
+ 23, /* 18, REP_NEWSITE */
+ 24, /* 19, REP_PAGE */
+ 25, /* 20, REP_PAGE_FAIL */
+ 26, /* 21, REP_PAGE_MORE */
+ 27, /* 22, REP_PAGE_REQ */
+ 28, /* 23, REP_REREQUEST */
+ 29, /* 24, REP_START_SYNC */
+ 30, /* 25, REP_UPDATE */
+ 31, /* 26, REP_UPDATE_REQ */
+ 32, /* 27, REP_VERIFY */
+ 33, /* 28, REP_VERIFY_FAIL */
+ 34, /* 29, REP_VERIFY_REQ */
+ 35, /* 30, REP_VOTE1 */
+ 36, /* 31, REP_VOTE2 */
+ REP_INVALID, /* 32, 4.6/4.7 no message */
+ REP_INVALID, /* 33, 4.6/4.7 no message */
+ REP_INVALID, /* 34, 4.6/4.7 no message */
+ REP_INVALID, /* 35, 4.6/4.7 no message */
+ REP_INVALID /* 36, 4.6/4.7 no message */
},
/*
* From 4.7 message number To 5.2 message number. There are
@@ -2009,34 +2106,39 @@ __rep_msg_from_old(version, rectype)
1, /* 1, REP_ALIVE */
2, /* 2, REP_ALIVE_REQ */
3, /* 3, REP_ALL_REQ */
- 4, /* 4, REP_BULK_LOG */
- 5, /* 5, REP_BULK_PAGE */
- 6, /* 6, REP_DUPMASTER */
- 7, /* 7, REP_FILE */
- 8, /* 8, REP_FILE_FAIL */
- 9, /* 9, REP_FILE_REQ */
- 10, /* 10, REP_LEASE_GRANT */
- 11, /* 11, REP_LOG */
- 12, /* 12, REP_LOG_MORE */
- 13, /* 13, REP_LOG_REQ */
- 14, /* 14, REP_MASTER_REQ */
- 15, /* 15, REP_NEWCLIENT */
- 16, /* 16, REP_NEWFILE */
- 17, /* 17, REP_NEWMASTER */
- 18, /* 18, REP_NEWSITE */
- 19, /* 19, REP_PAGE */
- 20, /* 20, REP_PAGE_FAIL */
- 21, /* 21, REP_PAGE_MORE */
- 22, /* 22, REP_PAGE_REQ */
- 23, /* 22, REP_REREQUEST */
- 24, /* 24, REP_START_SYNC */
- 25, /* 25, REP_UPDATE */
- 26, /* 26, REP_UPDATE_REQ */
- 27, /* 27, REP_VERIFY */
- 28, /* 28, REP_VERIFY_FAIL */
- 29, /* 29, REP_VERIFY_REQ */
- 30, /* 30, REP_VOTE1 */
- 31 /* 31, REP_VOTE2 */
+ 9, /* 4, REP_BULK_LOG */
+ 10, /* 5, REP_BULK_PAGE */
+ 11, /* 6, REP_DUPMASTER */
+ 12, /* 7, REP_FILE */
+ 13, /* 8, REP_FILE_FAIL */
+ 14, /* 9, REP_FILE_REQ */
+ 15, /* 10, REP_LEASE_GRANT */
+ 16, /* 11, REP_LOG */
+ 17, /* 12, REP_LOG_MORE */
+ 18, /* 13, REP_LOG_REQ */
+ 19, /* 14, REP_MASTER_REQ */
+ 20, /* 15, REP_NEWCLIENT */
+ 21, /* 16, REP_NEWFILE */
+ 22, /* 17, REP_NEWMASTER */
+ 23, /* 18, REP_NEWSITE */
+ 24, /* 19, REP_PAGE */
+ 25, /* 20, REP_PAGE_FAIL */
+ 26, /* 21, REP_PAGE_MORE */
+ 27, /* 22, REP_PAGE_REQ */
+ 28, /* 23, REP_REREQUEST */
+ 29, /* 24, REP_START_SYNC */
+ 30, /* 25, REP_UPDATE */
+ 31, /* 26, REP_UPDATE_REQ */
+ 32, /* 27, REP_VERIFY */
+ 33, /* 28, REP_VERIFY_FAIL */
+ 34, /* 29, REP_VERIFY_REQ */
+ 35, /* 30, REP_VOTE1 */
+ 36, /* 31, REP_VOTE2 */
+ REP_INVALID, /* 32, 4.7/5.2 no message */
+ REP_INVALID, /* 33, 4.7/5.2 no message */
+ REP_INVALID, /* 34, 4.7/5.2 no message */
+ REP_INVALID, /* 35, 4.7/5.2 no message */
+ REP_INVALID /* 36, 4.7/5.2 no message */
},
/*
* From 4.7 message number To 5.3 message number. There are
@@ -2047,34 +2149,86 @@ __rep_msg_from_old(version, rectype)
1, /* 1, REP_ALIVE */
2, /* 2, REP_ALIVE_REQ */
3, /* 3, REP_ALL_REQ */
- 4, /* 4, REP_BULK_LOG */
- 5, /* 5, REP_BULK_PAGE */
- 6, /* 6, REP_DUPMASTER */
- 7, /* 7, REP_FILE */
- 8, /* 8, REP_FILE_FAIL */
- 9, /* 9, REP_FILE_REQ */
- 10, /* 10, REP_LEASE_GRANT */
- 11, /* 11, REP_LOG */
- 12, /* 12, REP_LOG_MORE */
- 13, /* 13, REP_LOG_REQ */
- 14, /* 14, REP_MASTER_REQ */
- 15, /* 15, REP_NEWCLIENT */
- 16, /* 16, REP_NEWFILE */
- 17, /* 17, REP_NEWMASTER */
- 18, /* 18, REP_NEWSITE */
- 19, /* 19, REP_PAGE */
- 20, /* 20, REP_PAGE_FAIL */
- 21, /* 21, REP_PAGE_MORE */
- 22, /* 22, REP_PAGE_REQ */
- 23, /* 22, REP_REREQUEST */
- 24, /* 24, REP_START_SYNC */
- 25, /* 25, REP_UPDATE */
- 26, /* 26, REP_UPDATE_REQ */
- 27, /* 27, REP_VERIFY */
- 28, /* 28, REP_VERIFY_FAIL */
- 29, /* 29, REP_VERIFY_REQ */
- 30, /* 30, REP_VOTE1 */
- 31 /* 31, REP_VOTE2 */
+ 9, /* 4, REP_BULK_LOG */
+ 10, /* 5, REP_BULK_PAGE */
+ 11, /* 6, REP_DUPMASTER */
+ 12, /* 7, REP_FILE */
+ 13, /* 8, REP_FILE_FAIL */
+ 14, /* 9, REP_FILE_REQ */
+ 15, /* 10, REP_LEASE_GRANT */
+ 16, /* 11, REP_LOG */
+ 17, /* 12, REP_LOG_MORE */
+ 18, /* 13, REP_LOG_REQ */
+ 19, /* 14, REP_MASTER_REQ */
+ 20, /* 15, REP_NEWCLIENT */
+ 21, /* 16, REP_NEWFILE */
+ 22, /* 17, REP_NEWMASTER */
+ 23, /* 18, REP_NEWSITE */
+ 24, /* 19, REP_PAGE */
+ 25, /* 20, REP_PAGE_FAIL */
+ 26, /* 21, REP_PAGE_MORE */
+ 27, /* 22, REP_PAGE_REQ */
+ 28, /* 23, REP_REREQUEST */
+ 29, /* 24, REP_START_SYNC */
+ 30, /* 25, REP_UPDATE */
+ 31, /* 26, REP_UPDATE_REQ */
+ 32, /* 27, REP_VERIFY */
+ 33, /* 28, REP_VERIFY_FAIL */
+ 34, /* 29, REP_VERIFY_REQ */
+ 35, /* 30, REP_VOTE1 */
+ 36, /* 31, REP_VOTE2 */
+ REP_INVALID, /* 32, 4.7/5.3 no message */
+ REP_INVALID, /* 33, 4.7/5.3 no message */
+ REP_INVALID, /* 34, 4.7/5.3 no message */
+ REP_INVALID, /* 35, 4.7/5.3 no message */
+ REP_INVALID /* 36, 4.7/5.3 no message */
+ },
+ /*
+ * From 5.3 message number To 6.1 message number. Messages to
+ * handle BLOBs were added.
+ */
+ { REP_INVALID, /* NO message 0 */
+ 1, /* 1, REP_ALIVE */
+ 2, /* 2, REP_ALIVE_REQ */
+ 3, /* 3, REP_ALL_REQ */
+ /* 4, REP_BLOB_ALL_REQ doesn't exist */
+ /* 5, REP_BLOB_CHUNK doesn't exist */
+ /* 6, REP_BLOB_CHUNK_REQ doesn't exist */
+ /* 7, REP_BLOB_UPDATE doesn't exist */
+ /* 8, REP_BLOB_UPDATE_REQ doesn't exist */
+ 9, /* 4, REP_BULK_LOG */
+ 10, /* 5, REP_BULK_PAGE */
+ 11, /* 6, REP_DUPMASTER */
+ 12, /* 7, REP_FILE */
+ 13, /* 8, REP_FILE_FAIL */
+ 14, /* 9, REP_FILE_REQ */
+ 15, /* 10, REP_LEASE_GRANT */
+ 16, /* 11, REP_LOG */
+ 17, /* 12, REP_LOG_MORE */
+ 18, /* 13, REP_LOG_REQ */
+ 19, /* 14, REP_MASTER_REQ */
+ 20, /* 15, REP_NEWCLIENT */
+ 21, /* 16, REP_NEWFILE */
+ 22, /* 17, REP_NEWMASTER */
+ 23, /* 18, REP_NEWSITE */
+ 24, /* 19, REP_PAGE */
+ 25, /* 20, REP_PAGE_FAIL */
+ 26, /* 21, REP_PAGE_MORE */
+ 27, /* 22, REP_PAGE_REQ */
+ 28, /* 23, REP_REREQUEST */
+ 29, /* 24, REP_START_SYNC */
+ 30, /* 25, REP_UPDATE */
+ 31, /* 26, REP_UPDATE_REQ */
+ 32, /* 27, REP_VERIFY */
+ 33, /* 28, REP_VERIFY_FAIL */
+ 34, /* 29, REP_VERIFY_REQ */
+ 35, /* 30, REP_VOTE1 */
+ 36, /* 31, REP_VOTE2 */
+ REP_INVALID, /* 32, 5.3/6.1 no message */
+ REP_INVALID, /* 33, 5.3/6.1 no message */
+ REP_INVALID, /* 34, 5.3/6.1 no message */
+ REP_INVALID, /* 35, 5.3/6.1 no message */
+ REP_INVALID /* 36, 5.3/6.1 no message */
}
};
return (table[version][rectype]);
@@ -2215,9 +2369,9 @@ __rep_print_int(env, verbose, fmt, ap)
__os_id(env->dbenv, &pid, &tid);
if (diag_msg)
MUTEX_LOCK(env, rep->mtx_diag);
- __os_gettime(env, &ts, 1);
+ __os_gettime(env, &ts, 0);
__db_msgadd(env, &mb, "[%lu:%lu][%s] %s: ",
- (u_long)ts.tv_sec, (u_long)ts.tv_nsec/NS_PER_US,
+ (u_long)ts.tv_sec, (u_long)ts.tv_nsec / NS_PER_US,
env->dbenv->thread_id_string(env->dbenv, pid, tid, buf), s);
__db_msgadd_ap(env, &mb, fmt, ap);
@@ -2260,6 +2414,26 @@ __rep_print_message(env, eid, rp, str, flags)
FLD_SET(verbflag, DB_VERB_REP_MISC);
type = "all_req";
break;
+ case REP_BLOB_ALL_REQ:
+ FLD_SET(verbflag, DB_VERB_REP_MISC);
+ type = "all_blob_req";
+ break;
+ case REP_BLOB_CHUNK:
+ FLD_SET(verbflag, DB_VERB_REP_MISC);
+ type = "blob_chunk";
+ break;
+ case REP_BLOB_CHUNK_REQ:
+ FLD_SET(verbflag, DB_VERB_REP_MISC);
+ type = "blob_chunk_req";
+ break;
+ case REP_BLOB_UPDATE:
+ FLD_SET(verbflag, DB_VERB_REP_MISC);
+ type = "blob_update";
+ break;
+ case REP_BLOB_UPDATE_REQ:
+ FLD_SET(verbflag, DB_VERB_REP_MISC);
+ type = "blob_update_req";
+ break;
case REP_BULK_LOG:
FLD_SET(verbflag, DB_VERB_REP_MISC);
type = "bulk_log";
@@ -2650,9 +2824,19 @@ __rep_log_backup(env, logc, lsn, match)
*/
if ((match == REP_REC_COMMIT &&
rectype == DB___txn_regop) ||
- (match == REP_REC_PERM &&
- (rectype == DB___txn_ckp || rectype == DB___txn_regop)))
+ ((match == REP_REC_PERM || match == REP_REC_PERM_DEL) &&
+ IS_PERM_RECTYPE(rectype)))
break;
+ /*
+ * Break early if a file remove is discovered in the logs.
+ * BDB cannot restore a deleted database or blob file from
+ * logs, so trigger internal init to recover the file.
+ * Used by Instant Internal Init in replication.
+ */
+ if (match == REP_REC_PERM_DEL && rectype == DB___fop_remove) {
+ ret = DB_NOTFOUND;
+ break;
+ }
}
return (ret);
}
@@ -2671,7 +2855,6 @@ __rep_get_maxpermlsn(env, max_perm_lsnp)
{
DB_LOG *dblp;
DB_REP *db_rep;
- DB_THREAD_INFO *ip;
LOG *lp;
REP *rep;
@@ -2680,11 +2863,9 @@ __rep_get_maxpermlsn(env, max_perm_lsnp)
dblp = env->lg_handle;
lp = dblp->reginfo.primary;
- ENV_ENTER(env, ip);
MUTEX_LOCK(env, rep->mtx_clientdb);
*max_perm_lsnp = lp->max_perm_lsn;
MUTEX_UNLOCK(env, rep->mtx_clientdb);
- ENV_LEAVE(env, ip);
return (0);
}
@@ -2724,12 +2905,13 @@ __rep_get_datagen(env, data_genp)
u_int8_t data_buf[__REP_LSN_HIST_DATA_SIZE];
DBT key_dbt, data_dbt;
u_int32_t flags;
- int ret, t_ret, tries;
+ int ret, t_ret, tries, was_open;
db_rep = env->rep_handle;
ret = 0;
*data_genp = 0;
tries = 0;
+ was_open = 0;
flags = DB_LAST;
retry:
if ((ret = __txn_begin(env, NULL, NULL, &txn, DB_IGNORE_LEASE)) != 0)
@@ -2746,10 +2928,10 @@ retry:
* That is not an error.
*/
ret = 0;
- goto out;
+ goto noclose;
}
- db_rep->lsn_db = dbp;
- }
+ } else
+ was_open = 1;
if ((ret = __db_cursor(dbp, NULL, txn, &dbc, 0)) != 0)
goto out;
@@ -2784,8 +2966,126 @@ retry:
&key, key_buf, __REP_LSN_HIST_KEY_SIZE, NULL)) == 0)
*data_genp = key.gen;
out:
+ if (!was_open && dbp != NULL &&
+ (t_ret = __db_close(dbp, txn, DB_NOSYNC)) != 0 && ret == 0)
+ ret = t_ret;
+noclose:
if ((t_ret = __txn_commit(txn, DB_TXN_NOSYNC)) != 0 && ret == 0)
ret = t_ret;
err:
return (ret);
}
+
+/*
+ * __rep_become_readonly_master --
+ *
+ * Put this master into a state where it no longer accepts writes but it
+ * is still a master that can respond to requests for missing messages.
+ * It fills in sync_lsnp to provide a mechanism to know the LSN of the
+ * next log record expected on this site, and fills in gen with the
+ * generation stored in the replication region. Generally, this site
+ * should be restarted as a client shortly after becoming a readonly
+ * master.
+ *
+ * Both outputs are reset before any lock is taken: *gen is set to 0 and
+ * *sync_lsnp to the zero LSN. If the message lockout flag is already
+ * set when this function checks it, the function returns 0 without
+ * setting REP_F_READONLY_MASTER, so the outputs keep those reset values.
+ * NOTE(review): callers presumably use the zero gen/LSN to detect that
+ * nothing was done -- confirm against the call sites.
+ *
+ * The lockouts taken here last only for the duration of the call: the
+ * success path falls through the errclearlockouts and errunlock labels,
+ * which clear REP_LOCKOUT_MSG and, when the API lockout was obtained,
+ * invoke CLR_LOCKOUT_BDB (presumably the inverse of __rep_lockout_api;
+ * verify against its definition). REP_F_READONLY_MASTER stays set.
+ *
+ * The return value is 0 unless __rep_lockout_msg or __rep_lockout_api
+ * fails, in which case that call's error is returned.
+ *
+ * PUBLIC: int __rep_become_readonly_master
+ * PUBLIC: __P((ENV *, u_int32_t *, DB_LSN *));
+ */
+int
+__rep_become_readonly_master(env, gen, sync_lsnp)
+ ENV *env;
+ u_int32_t *gen;
+ DB_LSN *sync_lsnp;
+{
+ DB_LOG *dblp;
+ DB_REP *db_rep;
+ LOG *lp;
+ REP *rep;
+ int locked, ret;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ dblp = env->lg_handle;
+ lp = dblp->reginfo.primary;
+ *gen = 0;
+ ZERO_LSN(*sync_lsnp);
+ ret = 0;
+ locked = 0;
+
+ REP_SYSTEM_LOCK(env);
+ /*
+ * Lock out replication message thread processing so that the
+ * replication world won't change (e.g. restart, client sync).
+ * The flag is tested and the lockout requested while the rep system
+ * lock taken just above is held.
+ */
+ if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_MSG)) {
+ /*
+ * There is already someone in msg lockout, return. Jump past
+ * errclearlockouts so that the REP_LOCKOUT_MSG flag, which this
+ * call did not set, is left alone. ret is still 0 here.
+ */
+ RPRINT(env, (env, DB_VERB_REP_MISC,
+ "Readonly master: thread already in msg lockout"));
+ goto errunlock;
+ } else if ((ret = __rep_lockout_msg(env, rep, 0)) != 0)
+ goto errclearlockouts;
+
+ /*
+ * Lock out API to wait for active txn/mpool operations to complete
+ * and prevent new ones from starting. "locked" is set only once
+ * this succeeds, so the cleanup code calls CLR_LOCKOUT_BDB only when
+ * the API lockout was actually obtained.
+ */
+ if ((ret = __rep_lockout_api(env, rep)) != 0)
+ goto errclearlockouts;
+ locked = 1;
+
+ /* Make this site a readonly master and get master generation. */
+ F_SET(rep, REP_F_READONLY_MASTER);
+ *gen = rep->gen;
+ REP_SYSTEM_UNLOCK(env);
+
+ /*
+ * Get the next log record the logging subsystem expects to write.
+ * The rep system lock is released before the log system lock is
+ * taken, so the two are not held together here. It is re-acquired
+ * afterwards because the cleanup code below modifies lockout_flags
+ * and then releases that lock at errunlock.
+ */
+ LOG_SYSTEM_LOCK(env);
+ *sync_lsnp = lp->lsn;
+ LOG_SYSTEM_UNLOCK(env);
+
+ REP_SYSTEM_LOCK(env);
+errclearlockouts:
+ FLD_CLR(rep->lockout_flags, REP_LOCKOUT_MSG);
+ if (locked)
+ CLR_LOCKOUT_BDB(rep);
+errunlock:
+ REP_SYSTEM_UNLOCK(env);
+ return (ret);
+}
+
+/*
+ * __rep_get_lsnhist_data --
+ *
+ *	Helper that looks up the complete LSN history database record for
+ *	one replication generation (gen) and hands it back through
+ *	lsnhist_data. Any cursor or transaction that the lookup leaves
+ *	open is closed/resolved before returning. The first error
+ *	encountered is the one reported to the caller.
+ *
+ * PUBLIC: int __rep_get_lsnhist_data __P((ENV *, DB_THREAD_INFO *,
+ * PUBLIC: u_int32_t, __rep_lsn_hist_data_args *));
+ */
+int
+__rep_get_lsnhist_data(env, ip, gen, lsnhist_data)
+	ENV *env;
+	DB_THREAD_INFO *ip;
+	u_int32_t gen;
+	__rep_lsn_hist_data_args *lsnhist_data;
+{
+	struct rep_waitgoal waitgoal;
+	DBC *cursor;
+	DB_TXN *hist_txn;
+	int cleanup_ret, ret;
+
+	cursor = NULL;
+	hist_txn = NULL;
+
+	/*
+	 * The timestamp is needed here and it is not part of the cached
+	 * LSN history values, so the cache cannot be used.
+	 */
+	ret = __rep_read_lsn_history(env, ip,
+	    &hist_txn, &cursor, gen, lsnhist_data, &waitgoal, DB_SET, 0);
+
+	/* Close the cursor if one was handed back; keep the first error. */
+	if (cursor != NULL) {
+		cleanup_ret = __dbc_close(cursor);
+		if (ret == 0 && cleanup_ret != 0)
+			ret = cleanup_ret;
+	}
+
+	/* Resolve the transaction, passing along the status so far. */
+	if (hist_txn != NULL) {
+		cleanup_ret = __db_txn_auto_resolve(env, hist_txn, 1, ret);
+		if (ret == 0 && cleanup_ret != 0)
+			ret = cleanup_ret;
+	}
+
+	return (ret);
+}
diff --git a/src/rep/rep_verify.c b/src/rep/rep_verify.c
index 5238f900..40a0dfce 100644
--- a/src/rep/rep_verify.c
+++ b/src/rep/rep_verify.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2004, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2004, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -119,8 +119,15 @@ __rep_verify(env, rp, rec, eid, savetime)
goto out;
}
}
+ /*
+ * Search for a matching perm record. If none is found,
+ * or a database or file delete is encountered before the
+ * perm record, begin internal init. Database and blob file
+ * deletes cannot be undone once committed, so internal init
+ * must be used to re-create the files.
+ */
if ((ret = __rep_log_backup(env, logc, &lsn,
- REP_REC_PERM)) == 0) {
+ REP_REC_PERM_DEL)) == 0) {
MUTEX_LOCK(env, rep->mtx_clientdb);
lp->verify_lsn = lsn;
__os_gettime(env, &lp->rcvd_ts, 1);
@@ -205,8 +212,10 @@ __rep_internal_init(env, abbrev)
u_int32_t abbrev;
{
REP *rep;
+ u_int32_t ctlflags;
int master, ret;
+ ctlflags = 0;
rep = env->rep_handle->region;
REP_SYSTEM_LOCK(env);
#ifdef HAVE_STATISTICS
@@ -227,6 +236,7 @@ __rep_internal_init(env, abbrev)
RPRINT(env, (env, DB_VERB_REP_SYNC,
"send UPDATE_REQ, merely to check for NIMDB refresh"));
F_SET(rep, REP_F_ABBREVIATED);
+ FLD_SET(ctlflags, REPCTL_INMEM_ONLY);
} else
F_CLR(rep, REP_F_ABBREVIATED);
ZERO_LSN(rep->first_lsn);
@@ -237,7 +247,7 @@ __rep_internal_init(env, abbrev)
REP_SYSTEM_UNLOCK(env);
if (ret == 0 && master != DB_EID_INVALID)
(void)__rep_send_message(env,
- master, REP_UPDATE_REQ, NULL, NULL, 0, 0);
+ master, REP_UPDATE_REQ, NULL, NULL, ctlflags, 0);
return (ret);
}
@@ -504,8 +514,7 @@ __rep_dorecovery(env, lsnp, trunclsnp)
*/
DB_ASSERT(env, rep->op_cnt == 0);
DB_ASSERT(env, rep->msg_th == 1);
- if (rectype == DB___txn_regop || rectype == DB___txn_ckp ||
- rectype == DB___dbreg_register)
+ if (IS_PERM_RECTYPE(rectype) || rectype == DB___dbreg_register)
skip_rec = 0;
if (rectype == DB___txn_regop) {
if (rep->version >= DB_REPVERSION_44) {
@@ -653,8 +662,10 @@ __rep_verify_match(env, reclsnp, savetime)
/*
* Lockout the API and wait for operations to complete.
*/
- if ((ret = __rep_lockout_api(env, rep)) != 0)
+ if ((ret = __rep_lockout_api(env, rep)) != 0) {
+ FLD_CLR(rep->lockout_flags, REP_LOCKOUT_MSG);
goto errunlock;
+ }
/* OK, everyone is out, we can now run recovery. */
REP_SYSTEM_UNLOCK(env);
@@ -690,6 +701,10 @@ __rep_verify_match(env, reclsnp, savetime)
*/
if (db_rep->rep_db == NULL &&
(ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) {
+ REP_SYSTEM_LOCK(env);
+ FLD_CLR(rep->lockout_flags,
+ REP_LOCKOUT_API | REP_LOCKOUT_MSG | REP_LOCKOUT_OP);
+ REP_SYSTEM_UNLOCK(env);
MUTEX_UNLOCK(env, rep->mtx_clientdb);
goto out;
}