diff options
author | Lorry Tar Creator <lorry-tar-importer@baserock.org> | 2015-02-17 17:25:57 +0000 |
---|---|---|
committer | <> | 2015-03-17 16:26:24 +0000 |
commit | 780b92ada9afcf1d58085a83a0b9e6bc982203d1 (patch) | |
tree | 598f8b9fa431b228d29897e798de4ac0c1d3d970 /src/rep | |
parent | 7a2660ba9cc2dc03a69ddfcfd95369395cc87444 (diff) | |
download | berkeleydb-master.tar.gz |
Diffstat (limited to 'src/rep')
-rw-r--r-- | src/rep/mlease.html | 2 | ||||
-rw-r--r-- | src/rep/rep.msg | 70 | ||||
-rw-r--r-- | src/rep/rep_automsg.c | 467 | ||||
-rw-r--r-- | src/rep/rep_backup.c | 2148 | ||||
-rw-r--r-- | src/rep/rep_elect.c | 74 | ||||
-rw-r--r-- | src/rep/rep_lease.c | 30 | ||||
-rw-r--r-- | src/rep/rep_log.c | 39 | ||||
-rw-r--r-- | src/rep/rep_method.c | 615 | ||||
-rw-r--r-- | src/rep/rep_record.c | 315 | ||||
-rw-r--r-- | src/rep/rep_region.c | 164 | ||||
-rw-r--r-- | src/rep/rep_stat.c | 76 | ||||
-rw-r--r-- | src/rep/rep_stub.c | 18 | ||||
-rw-r--r-- | src/rep/rep_util.c | 568 | ||||
-rw-r--r-- | src/rep/rep_verify.c | 27 |
14 files changed, 4165 insertions, 448 deletions
diff --git a/src/rep/mlease.html b/src/rep/mlease.html index 7d44b465..4e82f63c 100644 --- a/src/rep/mlease.html +++ b/src/rep/mlease.html @@ -1,5 +1,5 @@ <!DOCTYPE doctype PUBLIC "-//w3c//dtd html 4.0 transitional//en"> -<!--Copyright (c) 2011, 2012 Oracle and/or its affiliates. All rights reserved.--> +<!--Copyright (c) 2011, 2015 Oracle and/or its affiliates. All rights reserved.--> <html> <head> <meta http-equiv="Content-Type" diff --git a/src/rep/rep.msg b/src/rep/rep.msg index b751a64d..d5c56d93 100644 --- a/src/rep/rep.msg +++ b/src/rep/rep.msg @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -57,7 +57,22 @@ ARG pgsize u_int32_t ARG pgno db_pgno_t ARG max_pgno db_pgno_t ARG filenum u_int32_t -ARG finfo_flags u_int32_t +ARG finfo_flags u_int32_t +ARG type u_int32_t +ARG db_flags u_int32_t +ARG uid DBT +ARG info DBT +ARG dir DBT +ARG blob_fid_lo u_int32_t +ARG blob_fid_hi u_int32_t +END + +BEGIN_MSG fileinfo_v7 alloc check_length version +ARG pgsize u_int32_t +ARG pgno db_pgno_t +ARG max_pgno db_pgno_t +ARG filenum u_int32_t +ARG finfo_flags u_int32_t ARG type u_int32_t ARG db_flags u_int32_t ARG uid DBT @@ -158,3 +173,54 @@ ARG lsn DB_LSN ARG hist_sec u_int32_t ARG hist_nsec u_int32_t END + +/* + * Request for blob files. + */ +BEGIN_MSG blob_update_req +ARG blob_fid u_int64_t +ARG blob_sid u_int64_t +ARG blob_id u_int64_t +ARG highest_id u_int64_t +END + +/* + * A list of blob file for a database. + */ +BEGIN_MSG blob_update +ARG blob_fid u_int64_t +ARG highest_id u_int64_t +ARG flags u_int32_t +ARG num_blobs u_int32_t +END + +/* + * Blob file description, part of blob_update. + */ +BEGIN_MSG blob_file +ARG blob_sid u_int64_t +ARG blob_id u_int64_t +ARG blob_size u_int64_t +END + +/* + * A piece of data from a blob file. + */ +BEGIN_MSG blob_chunk +ARG flags u_int32_t +ARG blob_fid u_int64_t +ARG blob_sid u_int64_t +ARG blob_id u_int64_t +ARG offset u_int64_t +ARG data DBT +END + +/* + * Request for data from a blob file at the given offset. + */ +BEGIN_MSG blob_chunk_req +ARG blob_fid u_int64_t +ARG blob_sid u_int64_t +ARG blob_id u_int64_t +ARG offset u_int64_t +END diff --git a/src/rep/rep_automsg.c b/src/rep/rep_automsg.c index 5d8155fb..cab68b3e 100644 --- a/src/rep/rep_automsg.c +++ b/src/rep/rep_automsg.c @@ -280,6 +280,16 @@ __rep_fileinfo_marshal(env, version, argp, bp, max, lenp) memcpy(bp, argp->dir.data, argp->dir.size); bp += argp->dir.size; } + if (copy_only) { + memcpy(bp, &argp->blob_fid_lo, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_HTONL_COPYOUT(env, bp, argp->blob_fid_lo); + if (copy_only) { + memcpy(bp, &argp->blob_fid_hi, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_HTONL_COPYOUT(env, bp, argp->blob_fid_hi); *lenp = (size_t)(bp - start); return (0); @@ -386,6 +396,16 @@ __rep_fileinfo_unmarshal(env, version, argpp, bp, max, nextp) if (max < needed) goto too_few; bp += argp->dir.size; + if (copy_only) { + memcpy(&argp->blob_fid_lo, bp, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_NTOHL_COPYIN(env, argp->blob_fid_lo, bp); + if (copy_only) { + memcpy(&argp->blob_fid_hi, bp, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_NTOHL_COPYIN(env, argp->blob_fid_hi, bp); if (nextp != NULL) *nextp = bp; @@ -399,6 +419,211 @@ too_few: } /* + * PUBLIC: int __rep_fileinfo_v7_marshal __P((ENV *, u_int32_t, + * PUBLIC: __rep_fileinfo_v7_args *, u_int8_t *, size_t, size_t *)); + */ +int +__rep_fileinfo_v7_marshal(env, version, argp, bp, max, lenp) + ENV *env; + u_int32_t version; + __rep_fileinfo_v7_args *argp; + u_int8_t *bp; + size_t *lenp, max; +{ + int copy_only; + u_int8_t *start; + + if (max < __REP_FILEINFO_V7_SIZE + + (size_t)argp->uid.size + + (size_t)argp->info.size + + (size_t)argp->dir.size) + return (ENOMEM); + start = bp; + + copy_only = 0; + if (version < DB_REPVERSION_47) + copy_only = 1; + if (copy_only) { + memcpy(bp, &argp->pgsize, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_HTONL_COPYOUT(env, bp, argp->pgsize); + if (copy_only) { + memcpy(bp, &argp->pgno, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_HTONL_COPYOUT(env, bp, argp->pgno); + if (copy_only) { + memcpy(bp, &argp->max_pgno, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_HTONL_COPYOUT(env, bp, argp->max_pgno); + if (copy_only) { + memcpy(bp, &argp->filenum, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_HTONL_COPYOUT(env, bp, argp->filenum); + if (copy_only) { + memcpy(bp, &argp->finfo_flags, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_HTONL_COPYOUT(env, bp, argp->finfo_flags); + if (copy_only) { + memcpy(bp, &argp->type, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_HTONL_COPYOUT(env, bp, argp->type); + if (copy_only) { + memcpy(bp, &argp->db_flags, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_HTONL_COPYOUT(env, bp, argp->db_flags); + if (copy_only) { + memcpy(bp, &argp->uid.size, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_HTONL_COPYOUT(env, bp, argp->uid.size); + if (argp->uid.size > 0) { + memcpy(bp, argp->uid.data, argp->uid.size); + bp += argp->uid.size; + } + if (copy_only) { + memcpy(bp, &argp->info.size, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_HTONL_COPYOUT(env, bp, argp->info.size); + if (argp->info.size > 0) { + memcpy(bp, argp->info.data, argp->info.size); + bp += argp->info.size; + } + if (copy_only) { + memcpy(bp, &argp->dir.size, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_HTONL_COPYOUT(env, bp, argp->dir.size); + if (argp->dir.size > 0) { + memcpy(bp, argp->dir.data, argp->dir.size); + bp += argp->dir.size; + } + + *lenp = (size_t)(bp - start); + return (0); +} + +/* + * PUBLIC: int __rep_fileinfo_v7_unmarshal __P((ENV *, u_int32_t, + * PUBLIC: __rep_fileinfo_v7_args **, u_int8_t *, size_t, u_int8_t **)); + */ +int +__rep_fileinfo_v7_unmarshal(env, version, argpp, bp, max, nextp) + ENV *env; + u_int32_t version; + __rep_fileinfo_v7_args **argpp; + u_int8_t *bp; + size_t max; + u_int8_t **nextp; +{ + size_t needed; + __rep_fileinfo_v7_args *argp; + int ret; + int copy_only; + + needed = __REP_FILEINFO_V7_SIZE; + if (max < needed) + goto too_few; + if ((ret = __os_malloc(env, sizeof(*argp), &argp)) != 0) + return (ret); + + copy_only = 0; + if (version < DB_REPVERSION_47) + copy_only = 1; + if (copy_only) { + memcpy(&argp->pgsize, bp, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_NTOHL_COPYIN(env, argp->pgsize, bp); + if (copy_only) { + memcpy(&argp->pgno, bp, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_NTOHL_COPYIN(env, argp->pgno, bp); + if (copy_only) { + memcpy(&argp->max_pgno, bp, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_NTOHL_COPYIN(env, argp->max_pgno, bp); + if (copy_only) { + memcpy(&argp->filenum, bp, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_NTOHL_COPYIN(env, argp->filenum, bp); + if (copy_only) { + memcpy(&argp->finfo_flags, bp, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_NTOHL_COPYIN(env, argp->finfo_flags, bp); + if (copy_only) { + memcpy(&argp->type, bp, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_NTOHL_COPYIN(env, argp->type, bp); + if (copy_only) { + memcpy(&argp->db_flags, bp, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_NTOHL_COPYIN(env, argp->db_flags, bp); + if (copy_only) { + memcpy(&argp->uid.size, bp, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_NTOHL_COPYIN(env, argp->uid.size, bp); + if (argp->uid.size == 0) + argp->uid.data = NULL; + else + argp->uid.data = bp; + needed += (size_t)argp->uid.size; + if (max < needed) + goto too_few; + bp += argp->uid.size; + if (copy_only) { + memcpy(&argp->info.size, bp, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_NTOHL_COPYIN(env, argp->info.size, bp); + if (argp->info.size == 0) + argp->info.data = NULL; + else + argp->info.data = bp; + needed += (size_t)argp->info.size; + if (max < needed) + goto too_few; + bp += argp->info.size; + if (copy_only) { + memcpy(&argp->dir.size, bp, sizeof(u_int32_t)); + bp += sizeof(u_int32_t); + } else + DB_NTOHL_COPYIN(env, argp->dir.size, bp); + if (argp->dir.size == 0) + argp->dir.data = NULL; + else + argp->dir.data = bp; + needed += (size_t)argp->dir.size; + if (max < needed) + goto too_few; + bp += argp->dir.size; + + if (nextp != NULL) + *nextp = bp; + *argpp = argp; + return (0); + +too_few: + __db_errx(env, DB_STR("3675", + "Not enough input bytes to fill a __rep_fileinfo_v7 message")); + return (EINVAL); +} + +/* * PUBLIC: int __rep_fileinfo_v6_marshal __P((ENV *, u_int32_t, * PUBLIC: __rep_fileinfo_v6_args *, u_int8_t *, size_t, size_t *)); */ @@ -1039,3 +1264,245 @@ too_few: return (EINVAL); } +/* + * PUBLIC: void __rep_blob_update_req_marshal __P((ENV *, + * PUBLIC: __rep_blob_update_req_args *, u_int8_t *)); + */ +void +__rep_blob_update_req_marshal(env, argp, bp) + ENV *env; + __rep_blob_update_req_args *argp; + u_int8_t *bp; +{ + DB_HTONLL_COPYOUT(env, bp, argp->blob_fid); + DB_HTONLL_COPYOUT(env, bp, argp->blob_sid); + DB_HTONLL_COPYOUT(env, bp, argp->blob_id); + DB_HTONLL_COPYOUT(env, bp, argp->highest_id); +} + +/* + * PUBLIC: int __rep_blob_update_req_unmarshal __P((ENV *, + * PUBLIC: __rep_blob_update_req_args *, u_int8_t *, size_t, u_int8_t **)); + */ +int +__rep_blob_update_req_unmarshal(env, argp, bp, max, nextp) + ENV *env; + __rep_blob_update_req_args *argp; + u_int8_t *bp; + size_t max; + u_int8_t **nextp; +{ + if (max < __REP_BLOB_UPDATE_REQ_SIZE) + goto too_few; + DB_NTOHLL_COPYIN(env, argp->blob_fid, bp); + DB_NTOHLL_COPYIN(env, argp->blob_sid, bp); + DB_NTOHLL_COPYIN(env, argp->blob_id, bp); + DB_NTOHLL_COPYIN(env, argp->highest_id, bp); + + if (nextp != NULL) + *nextp = bp; + return (0); + +too_few: + __db_errx(env, DB_STR("3675", + "Not enough input bytes to fill a __rep_blob_update_req message")); + return (EINVAL); +} + +/* + * PUBLIC: void __rep_blob_update_marshal __P((ENV *, + * PUBLIC: __rep_blob_update_args *, u_int8_t *)); + */ +void +__rep_blob_update_marshal(env, argp, bp) + ENV *env; + __rep_blob_update_args *argp; + u_int8_t *bp; +{ + DB_HTONLL_COPYOUT(env, bp, argp->blob_fid); + DB_HTONLL_COPYOUT(env, bp, argp->highest_id); + DB_HTONL_COPYOUT(env, bp, argp->flags); + DB_HTONL_COPYOUT(env, bp, argp->num_blobs); +} + +/* + * PUBLIC: int __rep_blob_update_unmarshal __P((ENV *, + * PUBLIC: __rep_blob_update_args *, u_int8_t *, size_t, u_int8_t **)); + */ +int +__rep_blob_update_unmarshal(env, argp, bp, max, nextp) + ENV *env; + __rep_blob_update_args *argp; + u_int8_t *bp; + size_t max; + u_int8_t **nextp; +{ + if (max < __REP_BLOB_UPDATE_SIZE) + goto too_few; + DB_NTOHLL_COPYIN(env, argp->blob_fid, bp); + DB_NTOHLL_COPYIN(env, argp->highest_id, bp); + DB_NTOHL_COPYIN(env, argp->flags, bp); + DB_NTOHL_COPYIN(env, argp->num_blobs, bp); + + if (nextp != NULL) + *nextp = bp; + return (0); + +too_few: + __db_errx(env, DB_STR("3675", + "Not enough input bytes to fill a __rep_blob_update message")); + return (EINVAL); +} + +/* + * PUBLIC: void __rep_blob_file_marshal __P((ENV *, + * PUBLIC: __rep_blob_file_args *, u_int8_t *)); + */ +void +__rep_blob_file_marshal(env, argp, bp) + ENV *env; + __rep_blob_file_args *argp; + u_int8_t *bp; +{ + DB_HTONLL_COPYOUT(env, bp, argp->blob_sid); + DB_HTONLL_COPYOUT(env, bp, argp->blob_id); + DB_HTONLL_COPYOUT(env, bp, argp->blob_size); +} + +/* + * PUBLIC: int __rep_blob_file_unmarshal __P((ENV *, + * PUBLIC: __rep_blob_file_args *, u_int8_t *, size_t, u_int8_t **)); + */ +int +__rep_blob_file_unmarshal(env, argp, bp, max, nextp) + ENV *env; + __rep_blob_file_args *argp; + u_int8_t *bp; + size_t max; + u_int8_t **nextp; +{ + if (max < __REP_BLOB_FILE_SIZE) + goto too_few; + DB_NTOHLL_COPYIN(env, argp->blob_sid, bp); + DB_NTOHLL_COPYIN(env, argp->blob_id, bp); + DB_NTOHLL_COPYIN(env, argp->blob_size, bp); + + if (nextp != NULL) + *nextp = bp; + return (0); + +too_few: + __db_errx(env, DB_STR("3675", + "Not enough input bytes to fill a __rep_blob_file message")); + return (EINVAL); +} + +/* + * PUBLIC: void __rep_blob_chunk_marshal __P((ENV *, + * PUBLIC: __rep_blob_chunk_args *, u_int8_t *)); + */ +void +__rep_blob_chunk_marshal(env, argp, bp) + ENV *env; + __rep_blob_chunk_args *argp; + u_int8_t *bp; +{ + DB_HTONL_COPYOUT(env, bp, argp->flags); + DB_HTONLL_COPYOUT(env, bp, argp->blob_fid); + DB_HTONLL_COPYOUT(env, bp, argp->blob_sid); + DB_HTONLL_COPYOUT(env, bp, argp->blob_id); + DB_HTONLL_COPYOUT(env, bp, argp->offset); + DB_HTONL_COPYOUT(env, bp, argp->data.size); + if (argp->data.size > 0) { + memcpy(bp, argp->data.data, argp->data.size); + bp += argp->data.size; + } +} + +/* + * PUBLIC: int __rep_blob_chunk_unmarshal __P((ENV *, + * PUBLIC: __rep_blob_chunk_args *, u_int8_t *, size_t, u_int8_t **)); + */ +int +__rep_blob_chunk_unmarshal(env, argp, bp, max, nextp) + ENV *env; + __rep_blob_chunk_args *argp; + u_int8_t *bp; + size_t max; + u_int8_t **nextp; +{ + size_t needed; + + needed = __REP_BLOB_CHUNK_SIZE; + if (max < needed) + goto too_few; + DB_NTOHL_COPYIN(env, argp->flags, bp); + DB_NTOHLL_COPYIN(env, argp->blob_fid, bp); + DB_NTOHLL_COPYIN(env, argp->blob_sid, bp); + DB_NTOHLL_COPYIN(env, argp->blob_id, bp); + DB_NTOHLL_COPYIN(env, argp->offset, bp); + DB_NTOHL_COPYIN(env, argp->data.size, bp); + if (argp->data.size == 0) + argp->data.data = NULL; + else + argp->data.data = bp; + needed += (size_t)argp->data.size; + if (max < needed) + goto too_few; + bp += argp->data.size; + + if (nextp != NULL) + *nextp = bp; + return (0); + +too_few: + __db_errx(env, DB_STR("3675", + "Not enough input bytes to fill a __rep_blob_chunk message")); + return (EINVAL); +} + +/* + * PUBLIC: void __rep_blob_chunk_req_marshal __P((ENV *, + * PUBLIC: __rep_blob_chunk_req_args *, u_int8_t *)); + */ +void +__rep_blob_chunk_req_marshal(env, argp, bp) + ENV *env; + __rep_blob_chunk_req_args *argp; + u_int8_t *bp; +{ + DB_HTONLL_COPYOUT(env, bp, argp->blob_fid); + DB_HTONLL_COPYOUT(env, bp, argp->blob_sid); + DB_HTONLL_COPYOUT(env, bp, argp->blob_id); + DB_HTONLL_COPYOUT(env, bp, argp->offset); +} + +/* + * PUBLIC: int __rep_blob_chunk_req_unmarshal __P((ENV *, + * PUBLIC: __rep_blob_chunk_req_args *, u_int8_t *, size_t, u_int8_t **)); + */ +int +__rep_blob_chunk_req_unmarshal(env, argp, bp, max, nextp) + ENV *env; + __rep_blob_chunk_req_args *argp; + u_int8_t *bp; + size_t max; + u_int8_t **nextp; +{ + if (max < __REP_BLOB_CHUNK_REQ_SIZE) + goto too_few; + DB_NTOHLL_COPYIN(env, argp->blob_fid, bp); + DB_NTOHLL_COPYIN(env, argp->blob_sid, bp); + DB_NTOHLL_COPYIN(env, argp->blob_id, bp); + DB_NTOHLL_COPYIN(env, argp->offset, bp); + + if (nextp != NULL) + *nextp = bp; + return (0); + +too_few: + __db_errx(env, DB_STR("3675", + "Not enough input bytes to fill a __rep_blob_chunk_req message")); + return (EINVAL); +} + diff --git a/src/rep/rep_backup.c b/src/rep/rep_backup.c index cfde7622..14bc63bb 100644 --- a/src/rep/rep_backup.c +++ b/src/rep/rep_backup.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2004, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2004, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -9,6 +9,7 @@ #include "db_config.h" #include "db_int.h" +#include "dbinc/blob.h" #include "dbinc/db_page.h" #include "dbinc/db_am.h" #include "dbinc/fop.h" @@ -26,21 +27,45 @@ * Note that the fileinfo for the first file in the list always appears at * (constant) offset __REP_UPDATE_SIZE in the buffer. */ +#define FILE_CTX_INMEM_ONLY 0x01 typedef struct { u_int8_t *buf; /* Buffer base address. */ u_int32_t size; /* Total allocated buffer size. */ u_int8_t *fillptr; /* Pointer to first unused space. */ u_int32_t count; /* Number of entries currently in list. */ u_int32_t version; /* Rep version of marshaled format. */ + u_int32_t flags; /* Context flags. */ } FILE_LIST_CTX; #define FIRST_FILE_PTR(buf) ((buf) + __REP_UPDATE_SIZE) /* + * Flags used to show the state of blob files on the master in messages + * sent to the client. + */ +#define BLOB_DONE 0x01 +#define BLOB_DELETE 0x02 +#define BLOB_CHUNK_FAIL 0x04 + +#define BLOB_ID_SIZE sizeof(db_seq_t) +#define BLOB_KEY_SIZE (2 * BLOB_ID_SIZE) + +/* * Function that performs any desired processing on a single file, as part of * the traversal of a list of database files, such as with internal init. */ typedef int (FILE_WALK_FN) __P((ENV *, __rep_fileinfo_args *, void *)); +static int __rep_add_files_to_list __P(( + ENV *, const char *, const char *, FILE_LIST_CTX *, const char **, int)); +static int __rep_blob_chunk_gap + __P((ENV *, int, DB_THREAD_INFO *, REP *, int *, db_seq_t, int)); +static int __rep_blob_cleanup __P((ENV *, REP *)); +static int __rep_blobdone + __P((ENV *, int, DB_THREAD_INFO *, REP *, db_seq_t, int)); +static int __rep_blob_find_files __P((ENV *, DB_THREAD_INFO *, const char *, + db_seq_t *, db_seq_t, db_seq_t, db_seq_t *, DBT *, size_t *, u_int32_t *)); +static int __rep_blob_sort_dirs __P((ENV *, + int (*)(const char *), char **, int, char ***, int *)); static FILE_WALK_FN __rep_check_uid; static int __rep_clean_interrupted __P((ENV *)); static FILE_WALK_FN __rep_cleanup_nimdbs; @@ -52,6 +77,8 @@ static int __rep_get_fileinfo __P((ENV *, const char *, const char *, __rep_fileinfo_args *, u_int8_t *)); static int __rep_get_file_list __P((ENV *, DB_FH *, u_int32_t, u_int32_t *, DBT *)); +static int __rep_init_file_list_context __P((ENV *, + u_int32_t, u_int32_t, int, FILE_LIST_CTX *)); static int __rep_is_replicated_db __P((const char *, const char *)); static int __rep_log_setup __P((ENV *, REP *, u_int32_t, u_int32_t, DB_LSN *)); @@ -72,9 +99,12 @@ static FILE_WALK_FN __rep_remove_file; static int __rep_remove_logs __P((ENV *)); static int __rep_remove_nimdbs __P((ENV *)); static int __rep_rollback __P((ENV *, DB_LSN *)); +static int __rep_select_blob_file __P((const char *)); +static int __rep_select_blob_sdb __P((const char *)); static int __rep_unlink_by_list __P((ENV *, u_int32_t, u_int8_t *, u_int32_t, u_int32_t)); static FILE_WALK_FN __rep_unlink_file; +static int __rep_walk_blob_dir __P((ENV *, FILE_LIST_CTX*)); static int __rep_walk_filelist __P((ENV *, u_int32_t, u_int8_t *, u_int32_t, u_int32_t, FILE_WALK_FN *, void *)); static int __rep_walk_dir __P((ENV *, const char *, const char *, @@ -129,14 +159,12 @@ __rep_update_req(env, rp) dblp = env->lg_handle; logc = NULL; - if ((ret = __os_calloc(env, 1, MEGABYTE, &context.buf)) != 0) - goto err_noalloc; - context.size = MEGABYTE; - context.count = 0; - context.version = rp->rep_version; /* Reserve space for the update_args, and fill in file info. */ - context.fillptr = FIRST_FILE_PTR(context.buf); + if ((ret = __rep_init_file_list_context(env, rp->rep_version, + F_ISSET(rp, REPCTL_INMEM_ONLY) ? FILE_CTX_INMEM_ONLY : 0, + 1, &context)) != 0) + goto err_noalloc; if ((ret = __rep_find_dbs(env, &context)) != 0) goto err; @@ -214,6 +242,472 @@ err_noalloc: } /* + * Passed to the __rep_blob_sort_dirs function. + * Select blob files, of the form __db.bl### + */ +static int +__rep_select_blob_file(file) + const char *file; +{ + if (strncmp(BLOB_FILE_PREFIX, file, strlen(BLOB_FILE_PREFIX)) == 0) + return (1); + else + return (0); +} + +/* + * Passed to the __rep_blob_sort_dirs function. + * Select blob subdatabase directories, of the form __db### + */ +static int +__rep_select_blob_sdb(file) + const char *file; +{ + if (strncmp(BLOB_DIR_PREFIX, file, strlen(BLOB_DIR_PREFIX)) == 0 && + strncmp(BLOB_FILE_PREFIX, file, strlen(BLOB_FILE_PREFIX)) != 0 && + strcmp(BLOB_META_FILE_NAME, file) != 0) + return (1); + else + return (0); +} + +/* + * __rep_blob_sort_dirs + * Create a sorted list of directory names that all share a type that + * is selected using the given function. + */ +static int +__rep_blob_sort_dirs(env, select_fn, dirs, dirs_cnt, sorted, sorted_cnt) + ENV *env; + int (*select_fn) __P((const char *)); + char **dirs; + int dirs_cnt; + char ***sorted; + int *sorted_cnt; +{ + char **sort, *tmp; + int i, ret, size, sort_cnt, swapped; + + *sorted = NULL; + *sorted_cnt = 0; + sort_cnt = 0; + + if ((ret = __os_malloc(env, + (sizeof(char *) * (unsigned int)dirs_cnt), &sort)) != 0) + return (ret); + + for (i = 0; i < dirs_cnt; i++) { + if (select_fn(dirs[i])) { + sort[sort_cnt] = dirs[i]; + sort_cnt++; + } + } + + /* + * Directories are usually returned in order, or close to it, so use + * Bubble Sort to sort the list. + */ + size = sort_cnt; + swapped = 1; + while (swapped == 1 && size > 1) { + swapped = 0; + for (i = 0; (i + 1) < size; i++) { + if (strcmp(sort[i], sort[i+1]) > 0) { + tmp = sort[i]; + sort[i] = sort[i+1]; + sort[i+1] = tmp; + swapped = 1; + } + } + size--; + } + + *sorted = sort; + *sorted_cnt = sort_cnt; + + return (0); +} + +#define BLOB_THROTTLE_DEFAULT (10 * MEGABYTE) + +/* + * __rep_blob_update_req + * Send a list of blob files, starting after the blob id and sub-database + * id sent in the BLOB_UPDATE_REQ message. + * + * PUBLIC: int __rep_blob_update_req __P((ENV *, DB_THREAD_INFO *, DBT *)); + */ +int +__rep_blob_update_req(env, ip, rec) + ENV *env; + DB_THREAD_INFO *ip; + DBT *rec; +{ + DBT rbudbt; + REP *rep; + __rep_blob_update_args rbu; + __rep_blob_update_req_args rbur; + db_seq_t blob_fid, blob_id, blob_sdb, tmp; + int cur, dirs_cnt, ret, sdb_cnt; + size_t sent; + char *blob_sub_dir, *dir, **dirs, **sdb; + u_int32_t num_blobs, throttle; + u_int8_t *ptr; + + memset(&rbu, 0, sizeof(__rep_blob_update_args)); + memset(&rbudbt, 0, sizeof(DBT)); + blob_sub_dir = dir = NULL; + dirs = sdb = NULL; + sent = 0; + num_blobs = 0; + cur = dirs_cnt = sdb_cnt = 0; + rep = env->rep_handle->region; + throttle = rep->gbytes * GIGABYTE + rep->bytes; + if (throttle == 0) + throttle = BLOB_THROTTLE_DEFAULT; + + if ((ret = __rep_blob_update_req_unmarshal( + env, &rbur, rec->data, rec->size, &ptr)) != 0) + goto err; + + RPRINT(env, (env, DB_VERB_REP_SYNC, +"blob_update_req: file_id %llu sdb_id %llu blob_id %llu highest %llu", + (long long)rbur.blob_fid, (long long)rbur.blob_sid, + (long long)rbur.blob_id, (long long)rbur.highest_id)); + + rbu.blob_fid = rbur.blob_fid; + + if ((ret = __os_malloc(env, MEGABYTE, &rbudbt.data)) != 0) + goto err; + rbudbt.ulen = MEGABYTE; + rbudbt.size = __REP_BLOB_UPDATE_SIZE; + + blob_fid = (db_seq_t)rbur.blob_fid; + blob_sdb = (db_seq_t)rbur.blob_sid; + blob_id = (db_seq_t)rbur.blob_id; + + /* Find the first blob file if it is unknown. */ + if (blob_id == 0 && blob_sdb == 0) { +find_sdb: if (dirs == NULL) { + if ((ret = __blob_make_sub_dir( + env, &blob_sub_dir, blob_fid, 0)) != 0) + goto err; + if ((ret = __db_appname( + env, DB_APP_BLOB, blob_sub_dir, NULL, &dir)) != 0) + goto err; + /* If no directory, there are no blobs to send. */ + if (__os_exists(env, dir, NULL) != 0) + goto filedone; + + if ((ret = __os_dirlist( + env, dir, 1, &dirs, &dirs_cnt)) != 0) + goto err; + + if (dirs_cnt == 0) + goto filedone; + + if ((ret = __rep_blob_sort_dirs( + env, __rep_select_blob_sdb, + dirs, dirs_cnt, &sdb, &sdb_cnt)) != 0) + goto err; + } + /* + * Iterate through the list of subdirectories, until we find + * one that has an id larger than the current subdirectory id. + */ + while (cur < sdb_cnt) { + if ((ret = __blob_path_to_dir_ids( + env, sdb[cur], &tmp, NULL)) != 0) + goto err; + if (blob_sdb < tmp) { + blob_sdb = tmp; + break; + } + cur++; + } + /* Check if no more subdirectories to search */ + if (sdb_cnt != 0 && cur == sdb_cnt) + goto filedone; + if (dir != NULL) + __os_free(env, dir); + dir = NULL; + if (blob_sub_dir != NULL) + __os_free(env, blob_sub_dir); + blob_sub_dir = NULL; + } + + if (blob_sub_dir == NULL && (ret = + __blob_make_sub_dir(env, &blob_sub_dir, blob_fid, blob_sdb)) != 0) + goto err; + + if (dir == NULL && (ret = __db_appname( + env, DB_APP_BLOB, blob_sub_dir, NULL, &dir)) != 0) + goto err; + /* Search the current directory for blob files with id > blob_id. */ + if ((ret = __rep_blob_find_files( + env, ip, dir, &blob_id, blob_sdb, blob_fid, + (db_seq_t *)&rbur.highest_id, &rbudbt, &sent, &num_blobs)) != 0) + goto err; + + /* + * If we have not reached the send limit, and there are still + * directories to search, then search the next directory. + */ + if (sent < throttle) { + if (blob_sdb != 0) { + rbur.highest_id = 0; + blob_id = 0; + __os_free(env, blob_sub_dir); + blob_sub_dir = NULL; + __os_free(env, dir); + dir = NULL; + goto find_sdb; + } else { + /* Mark as the end of the files. */ +filedone: F_SET(&rbu, BLOB_DONE); + rbur.highest_id = 0; + } + } else + STAT(rep->stat.st_nthrottles++); + + rbu.num_blobs = num_blobs; + rbu.highest_id = rbur.highest_id; + __rep_blob_update_marshal(env, &rbu, rbudbt.data); + RPRINT(env, (env, DB_VERB_REP_SYNC, + "Sending blob_update: file_id %llu, num_blobs %lu, flags %lu", + (long long)rbu.blob_fid, + (long)num_blobs, (unsigned long)rbu.flags)); + (void)__rep_send_message( + env, DB_EID_BROADCAST, REP_BLOB_UPDATE, NULL, &rbudbt, 0, 0); + +err: if (sdb != NULL) + __os_free(env, sdb); + if (dirs != NULL) + __os_dirfree(env, dirs, dirs_cnt); + if (dir != NULL) + __os_free(env, dir); + if (blob_sub_dir != NULL) + __os_free(env, blob_sub_dir); + if (rbudbt.data != NULL) + __os_free(env, rbudbt.data); + return (ret); +} + +/* + * __rep_blob_find_files + * + * Search a directory for blob files, starting with the given blob id and + * sub-database id. Add information for each blob to the message buffer until + * there are no more files, or it has reached the maximum send amount in terms + * of combined blob files size. + * + * This search is complicated because the blobs have to be sent in order by id, + * but there can be huge holes between a blob file and the one with the next + * highest id, so iterating through the ids looking to see if the file exists + * for each id will take too long. The solution is to walk the directory + * hierarchy in order, reading every file in that directory, sorting them by + * id, and adding them to the update list. + */ +static int +__rep_blob_find_files( + env, ip, dir, blob_id, blob_sid, blob_fid, highest, buf, sent, num) + ENV *env; + DB_THREAD_INFO *ip; + const char *dir; + db_seq_t *blob_id; + db_seq_t blob_sid; + db_seq_t blob_fid; + db_seq_t *highest; + DBT *buf; + size_t *sent; + u_int32_t *num; +{ + DB *bmd; + DB_FH *fhp; + DB_TXN *txn; + REP *rep; + __rep_blob_file_args rbf; + char blob_path[MAX_BLOB_PATH_SZ], **dirs, **files, *path, *ptr; + db_seq_t tmp; + int blob_path_len, cur, depth, dirs_cnt, files_cnt, ret; + off_t blob_size; + size_t len; + u_int32_t bytes, mbytes, throttle; + + bmd = NULL; + txn = NULL; + fhp = NULL; + path = NULL; + dirs = files = NULL; + dirs_cnt = files_cnt = 0; + rbf.blob_sid = (u_int64_t)blob_sid; + rep = env->rep_handle->region; + throttle = rep->gbytes * GIGABYTE + rep->bytes; + if (throttle == 0) + throttle = BLOB_THROTTLE_DEFAULT; + + if ((ret = __os_malloc( + env, strlen(dir) + MAX_BLOB_PATH_SZ, &path)) != 0) + goto err; + + /* + * Read the highest possible blob id from the blob meta database, so + * we know when to stop looking for files for this database. The + * highest value is reset everytime we switch to a new subdatabase. + */ + if (*highest == 0) { + if ((ret = __db_create_internal(&bmd, env, 0)) != 0) + goto err; + + if ((ret = __txn_begin( + env, ip, NULL, &txn, DB_IGNORE_LEASE)) != 0) + goto err; + + bmd->blob_file_id = blob_fid; + bmd->blob_sdb_id = blob_sid; + if ((ret = __blob_highest_id(bmd, txn, highest) ) != 0) + goto err; + + if ((ret = __txn_abort(txn)) != 0) + goto err; + txn = NULL; + if ((ret = __db_close(bmd, NULL, 0)) != 0) + goto err; + bmd = NULL; + (*highest)++; + } + + (*blob_id)++; + while (*sent < throttle && *blob_id < *highest) { + memset(blob_path, 0, MAX_BLOB_PATH_SZ); + blob_path_len = depth = 0; + + /* Calucate the subdirectory from the blob id. */ + __blob_calculate_dirs( + *blob_id, blob_path, &blob_path_len, &depth); + if (blob_path_len != 0) { + (void)sprintf(path, "%s%c%s%c", + dir, PATH_SEPARATOR[0], blob_path, PATH_SEPARATOR[0]); + } else + (void)sprintf(path, "%s", dir); + len = strlen(path); + + /* If the sub-directory does not exist, look for the next. */ + if (__os_exists(env, path, NULL) != 0) { + (*blob_id) += + BLOB_DIR_ELEMS - (*blob_id % BLOB_DIR_ELEMS); + continue; + } + + /* Get a list of all the blob files, sorted by id. */ + if ((ret = __os_dirlist(env, path, 0, &dirs, &dirs_cnt)) != 0) + goto err; + + if ((ret = __rep_blob_sort_dirs(env, __rep_select_blob_file, + dirs, dirs_cnt, &files, &files_cnt)) != 0) + goto err; + + /* + * Find the first blob file with an id greater than or equal to + * the last id. + */ + for (cur = 0; cur < files_cnt; cur++) { + ptr = files[cur]; + ptr += strlen(BLOB_FILE_PREFIX); + if ((ret = __blob_str_to_id( + env, (const char **)&ptr, &tmp)) != 0) + goto err; + DB_ASSERT(env, tmp != 0); + if (tmp >= *blob_id) + break; + } + + /* Add each remaining blob file to the message buffer. */ + while (cur < files_cnt) { + /* Get the blob id from the current file name. */ + (void)sprintf(path + len, "%s", files[cur]); + ptr = path + len + strlen(BLOB_FILE_PREFIX); + if ((ret = __blob_str_to_id( + env, (const char **)&ptr, blob_id)) != 0) + goto err; + rbf.blob_id = (u_int64_t)*blob_id; + /* Open the file and get its size. */ + if ((ret = __os_open( + env, path, 0, DB_OSO_RDONLY, 0, &fhp)) != 0) { + if (ret == ENOENT) { + ret = 0; + RPRINT(env, (env, DB_VERB_REP_SYNC, + "blob_update blob file: %llu deleted, skipping.", + (long long)rbf.blob_id)); + cur++; + continue; + } + goto err; + } + if ((ret = __os_ioinfo( + env, path, fhp, &mbytes, &bytes, NULL)) != 0) + goto err; + if ((ret =__os_closehandle(env, fhp)) != 0) + goto err; + fhp = NULL; + blob_size = ((off_t)mbytes * (off_t)MEGABYTE) + bytes; + rbf.blob_size = (u_int64_t)blob_size; + if (blob_size > UINT32_MAX) + (*sent) = throttle + 1; + else { + if (((*sent) + (size_t)blob_size) < (*sent)) + (*sent) = throttle + 1; + else + (*sent) += (size_t)blob_size; + } + __rep_blob_file_marshal( + env, &rbf, (u_int8_t *)buf->data + buf->size); + (*num)++; + buf->size += __REP_BLOB_FILE_SIZE; + RPRINT(env, (env, DB_VERB_REP_SYNC, + "blob_update adding: blob_sid %llu, blob_id %llu blob_size %llu", + (long long)rbf.blob_sid, + (long long)rbf.blob_id, (long long)rbf.blob_size)); + if ((*sent) > throttle) + goto err; + + /* Resize if there is not enough space to grow. */ + if (buf->size > (buf->ulen - __REP_BLOB_FILE_SIZE)) { + if ((ret = __os_realloc( + env, buf->ulen * 2, &buf->data)) != 0) + goto err; + buf->ulen *= 2; + } + cur++; + } + /* + * Move to the next directory of blob files by setting the blob + * id to the next largest possible value. + */ + (*blob_id) += BLOB_DIR_ELEMS - (*blob_id % BLOB_DIR_ELEMS); + __os_free(env, files); + files = NULL; + __os_dirfree(env, dirs, dirs_cnt); + dirs = NULL; + } +err: + if (path != NULL) + __os_free(env, path); + if (files != NULL) + __os_free(env, files); + if (dirs != NULL) + __os_dirfree(env, dirs, dirs_cnt); + if (fhp != NULL) + (void)__os_closehandle(env, fhp); + if (txn != NULL) + (void)__txn_abort(txn); + if (bmd != NULL) + (void)__db_close(bmd, NULL, 0); + + return (ret); +} + +/* * __rep_find_dbs - * Walk through all the named files/databases including those in the * environment or data_dirs and those that in named and in-memory. We @@ -240,7 +734,8 @@ __rep_find_dbs(env, context) * replicated user databases. If the application has a metadata_dir, * this will also find any persistent internal system databases. */ - if (dbenv->db_data_dir != NULL) { + if (!F_ISSET(context, FILE_CTX_INMEM_ONLY) && + dbenv->db_data_dir != NULL) { for (ddir = dbenv->db_data_dir; *ddir != NULL; ++ddir) { if ((ret = __db_appname(env, DB_APP_NONE, *ddir, NULL, &real_dir)) != 0) @@ -252,16 +747,24 @@ __rep_find_dbs(env, context) real_dir = NULL; } } + /* * Walk the environment directory. If the application doesn't * have a metadata_dir, this will return persistent internal system * databases. If the application doesn't have a separate data * directory, this will also return all user databases. */ - if (ret == 0) + if (!F_ISSET(context, FILE_CTX_INMEM_ONLY) && ret == 0) ret = __rep_walk_dir(env, env->db_home, NULL, context); - /* Now, collect any in-memory named databases. */ + /* Gather the databases in the blob directory. */ + if (!F_ISSET(context, FILE_CTX_INMEM_ONLY) && ret == 0) + ret = __rep_walk_blob_dir(env, context); + + /* + * Now, collect any in-memory named databases. We do this no + * matter if the INMEM_ONLY flag is set or not. + */ if (ret == 0) ret = __rep_walk_dir(env, NULL, NULL, context); @@ -271,6 +774,148 @@ __rep_find_dbs(env, context) } /* + * __rep_walk_blob_dir -- + * + * The blob directory hierarchy consists of a top layer that contains the + * blob meta database (BMD) and a set of blob directories (BLDIR). + * Each BLDIR corresponds to a database file. If the database file doesn't + * contain subdatabases, the BLDIR contains a BMD and blob files. If the + * database file contains subdatabases, the BLDIR contains a BLSDIR + * subdirectory for each subdatabase. Each BLSDIR contains a BMD and blob + * files. + * + * This function walks the blob directory hierarchy and records any BMD. + * It first checks if the top level BMD exists, and if it does searches + * the first and second layers of the hierarchy for BMDs. + */ +static int +__rep_walk_blob_dir(env, context) + ENV *env; + FILE_LIST_CTX *context; +{ + int cnt, cnt2, i, j, ret; + size_t len; + char *blob_dir, *blob_sub, **dirs, *name, *name2, **subdirs; + char blob_sub_buf[MAX_BLOB_PATH_SZ]; + const char *bmd, *dirp; + + cnt = cnt2 = 0; + blob_dir = name = name2 = NULL; + dirs = subdirs = NULL; + bmd = BLOB_META_FILE_NAME; + blob_sub = blob_sub_buf; + + if ((ret = __db_appname( + env, DB_APP_BLOB, BLOB_META_FILE_NAME, &dirp, &name)) != 0) + goto err; + + /* + * If the main blob meta database does not exist, then no databases in + * the environment supports blobs. + */ + if ((ret = __os_exists(env, name, NULL)) != 0) { + ret = 0; + goto err; + } + + /* Get the blob directory. */ + if ((ret = __db_appname( + env, DB_APP_BLOB, NULL, &dirp, &blob_dir)) != 0) + goto err; + + if ((ret = __rep_add_files_to_list( + env, blob_dir, NULL, context, &bmd, 1)) != 0) + goto err; + + if ((ret = __os_dirlist(env, blob_dir, 1, &dirs, &cnt)) != 0) + goto err; + + __os_free(env, name); + name = NULL; + if ((ret = __os_malloc( + env, MAX_BLOB_PATH_SZ + strlen(blob_dir), &name)) != 0) + goto err; + + for (i = 0; i < cnt; i++) { + /* + * Skip blob files and the top level BMD + * (which was handled above). + */ + if (IS_BLOB_META(dirs[i]) || IS_BLOB_FILE(dirs[i])) + continue; + len = strlen(blob_dir) + + strlen(dirs[i]) + strlen(BLOB_META_FILE_NAME) + 3; + (void)snprintf(name, len, "%s%c%s%c%s", blob_dir, + PATH_SEPARATOR[0], dirs[i], PATH_SEPARATOR[0], + BLOB_META_FILE_NAME); + /* + * If a blob meta database exists, add it to the list, and move + * on to the next directory, otherwise get a directory list and + * check the second layer for BMD. If a directory contains a + * BMD, then it cannot contain subdirectories with BMD. + */ + if (__os_exists(env, name, NULL) == 0) { + (void)snprintf(blob_sub, + strlen(dirs[i]) + strlen(bmd) + 2, + "%s%c%s", dirs[i], PATH_SEPARATOR[0], bmd); + if ((ret = __rep_add_files_to_list(env, blob_dir, + NULL, context, (const char **)&blob_sub, 1)) != 0) + goto err; + } else { + len = strlen(blob_dir) + strlen(dirs[i]) + 2; + (void)snprintf(name, len, "%s%c%s", + blob_dir, PATH_SEPARATOR[0], dirs[i]); + if ((ret = __os_dirlist( + env, name, 1, &subdirs, &cnt2)) != 0) + goto err; + if (name2 == NULL) { + if ((ret = __os_malloc(env, + MAX_BLOB_PATH_SZ + strlen(name), + &name2)) != 0) + goto err; + } + for (j = 0; j < cnt2; j++) { + if (IS_BLOB_FILE(subdirs[j])) + continue; + len = strlen(name) + strlen(subdirs[j]) + + strlen(BLOB_META_FILE_NAME) + 3; + (void)snprintf(name2, len, "%s%c%s%c%s", + name, PATH_SEPARATOR[0], subdirs[j], + PATH_SEPARATOR[0], BLOB_META_FILE_NAME); + if ((ret = __os_exists( + env, name2, NULL)) == 0) { + len = strlen(dirs[i]) + + strlen(subdirs[j]) + + strlen(bmd) + 3; + (void)snprintf(blob_sub, + len, "%s%c%s%c%s", dirs[i], + PATH_SEPARATOR[0], subdirs[j], + PATH_SEPARATOR[0], bmd); + if ((ret = __rep_add_files_to_list( + env, blob_dir, NULL, context, + (const char **)&blob_sub, 1)) != 0) + goto err; + } + } + __os_dirfree(env, subdirs, cnt2); + subdirs = NULL; + } + } + +err: if (name != NULL) + __os_free(env, name); + if (name2 != NULL) + __os_free(env, name2); + if (blob_dir != NULL) + __os_free(env, blob_dir); + if (dirs != NULL) + __os_dirfree(env, dirs, cnt); + if (subdirs != NULL) + __os_dirfree(env, subdirs, cnt2); + return (ret); +} + +/* * __rep_walk_dir -- * * This is the routine that walks a directory and fills in the structures @@ -284,11 +929,8 @@ __rep_walk_dir(env, dir, datadir, context) const char *dir, *datadir; FILE_LIST_CTX *context; { - __rep_fileinfo_args tmpfp; - size_t avail, len; - int cnt, first_file, i, ret; - u_int8_t uid[DB_FILE_ID_LEN]; - char *file, **names, *subdb; + int cnt, ret; + char **names; if (dir == NULL) { VPRINT(env, (env, DB_VERB_REP_SYNC, @@ -304,7 +946,34 @@ __rep_walk_dir(env, dir, datadir, context) } VPRINT(env, (env, DB_VERB_REP_SYNC, "Walk_dir: Dir %s has %d files", (dir == NULL) ? "INMEM" : dir, cnt)); + ret = __rep_add_files_to_list( + env, dir, datadir, context, (const char **)names, cnt); + + __os_dirfree(env, names, cnt); + return (ret); +} + +/* + * __rep_add_files_to_list -- + * + * Add the given files to the file list. + */ +static int +__rep_add_files_to_list(env, dir, datadir, context, names, cnt) + ENV *env; + const char *dir, *datadir; + FILE_LIST_CTX *context; + const char **names; + int cnt; +{ + __rep_fileinfo_args tmpfp; + size_t avail, len; + int first_file, i, ret; + u_int8_t uid[DB_FILE_ID_LEN]; + const char *file, *subdb; + first_file = 1; + ret = 0; for (i = 0; i < cnt; i++) { VPRINT(env, (env, DB_VERB_REP_SYNC, "Walk_dir: File %d name: %s", i, names[i])); @@ -372,15 +1041,19 @@ __rep_walk_dir(env, dir, datadir, context) DB_SET_DBT(tmpfp.uid, uid, DB_FILE_ID_LEN); retry: avail = (size_t)(&context->buf[context->size] - context->fillptr); + /* + * It is safe to cast to the old structs + * because the first part of the current + * struct matches the old structs. + */ if (context->version < DB_REPVERSION_53) - /* - * It is safe to cast to the old struct - * because the first part of the current - * struct matches the old struct. - */ ret = __rep_fileinfo_v6_marshal(env, context->version, (__rep_fileinfo_v6_args *)&tmpfp, context->fillptr, avail, &len); + else if (context->version < DB_REPVERSION_61) + ret = __rep_fileinfo_v7_marshal(env, context->version, + (__rep_fileinfo_v7_args *)&tmpfp, + context->fillptr, avail, &len); else ret = __rep_fileinfo_marshal(env, context->version, &tmpfp, context->fillptr, avail, &len); @@ -409,9 +1082,7 @@ retry: avail = (size_t)(&context->buf[context->size] - */ context->fillptr += len; } -err: - __os_dirfree(env, names, cnt); - return (ret); +err: return (ret); } /* @@ -430,7 +1101,7 @@ __rep_is_replicated_db(name, dir) /* * Remaining things that don't have a "__db" prefix are eligible. */ - if (!IS_DB_FILE(name)) + if (!IS_DB_FILE(name) || IS_BLOB_META(name)) return (1); /* Here, we know we have a "__db" name. */ @@ -470,7 +1141,7 @@ __rep_check_uid(env, rfp, uid) if (memcmp(rfp->uid.data, uid, DB_FILE_ID_LEN) == 0) { VPRINT(env, (env, DB_VERB_REP_SYNC, "Check_uid: Found matching file.")); - ret = DB_KEYEXIST; + ret = USR_ERR(env, DB_KEYEXIST); } return (ret); @@ -489,6 +1160,7 @@ __rep_get_fileinfo(env, file, subdb, rfp, uid) DB_THREAD_INFO *ip; PAGE *pagep; int lorder, ret, t_ret; + u_int32_t flags; dbp = NULL; dbc = NULL; @@ -503,11 +1175,15 @@ __rep_get_fileinfo(env, file, subdb, rfp, uid) * database handles would block the master from handling UPDATE_REQ. */ F_SET(dbp, DB_AM_RECOVER); - if ((ret = __db_open(dbp, ip, NULL, file, subdb, DB_UNKNOWN, - DB_RDONLY | (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0), - 0, PGNO_BASE_MD)) != 0) + flags = DB_RDONLY | (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0); + if (file != NULL && IS_BLOB_META(file)) + LF_SET(DB_INTERNAL_BLOB_DB); + if ((ret = __db_open(dbp, ip, NULL, + file, subdb, DB_UNKNOWN, flags, 0, PGNO_BASE_MD)) != 0) goto err; + SET_LO_HI_VAR(dbp->blob_file_id, rfp->blob_fid_lo, rfp->blob_fid_hi); + if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0) goto err; if ((ret = __memp_fget(dbp->mpf, &dbp->meta_pgno, ip, dbc->txn, @@ -574,6 +1250,7 @@ __rep_page_req(env, ip, eid, rp, rec) { __rep_fileinfo_args *msgfp, msgf; __rep_fileinfo_v6_args *msgfpv6; + __rep_fileinfo_v7_args *msgfpv7; DB_MPOOLFILE *mpf; DB_REP *db_rep; REP *rep; @@ -584,21 +1261,30 @@ __rep_page_req(env, ip, eid, rp, rec) db_rep = env->rep_handle; rep = db_rep->region; + /* + * Build a current struct by copying in the older + * version struct and then setting up the new fields. + * This is safe because all old fields are in the + * same location in the current struct. + */ if (rp->rep_version < DB_REPVERSION_53) { - /* - * Build a current struct by copying in the older - * version struct and then setting up the data_dir. - * This is safe because all old fields are in the - * same location in the current struct. - */ if ((ret = __rep_fileinfo_v6_unmarshal(env, rp->rep_version, &msgfpv6, rec->data, rec->size, &next)) != 0) return (ret); memcpy(&msgf, msgfpv6, sizeof(__rep_fileinfo_v6_args)); msgf.dir.data = NULL; msgf.dir.size = 0; + msgf.blob_fid_lo = msgf.blob_fid_hi = 0; msgfp = &msgf; msgfree = msgfpv6; + } else if (rp->rep_version < DB_REPVERSION_61) { + if ((ret = __rep_fileinfo_v7_unmarshal(env, rp->rep_version, + &msgfpv7, rec->data, rec->size, &next)) != 0) + return (ret); + memcpy(&msgf, msgfpv7, sizeof(__rep_fileinfo_v7_args)); + msgf.blob_fid_lo = msgf.blob_fid_hi = 0; + msgfp = &msgf; + msgfree = msgfpv7; } else { if ((ret = __rep_fileinfo_unmarshal(env, rp->rep_version, &msgfp, rec->data, rec->size, &next)) != 0) @@ -624,7 +1310,7 @@ __rep_page_req(env, ip, eid, rp, rec) (void)__rep_send_message(env, eid, REP_FILE_FAIL, NULL, rec, 0, 0); else - ret = DB_NOTFOUND; + ret = USR_ERR(env, DB_NOTFOUND); goto err; } @@ -738,7 +1424,7 @@ __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp) #ifdef HAVE_QUEUE if ((ret = __qam_fget(qdbc, &p, 0, &pagep)) == ENOENT) #endif - ret = DB_PAGE_NOTFOUND; + ret = USR_ERR(env, DB_PAGE_NOTFOUND); } else ret = __memp_fget(mpf, &p, ip, NULL, 0, &pagep); msgfp->pgno = p; @@ -748,16 +1434,21 @@ __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp) RPRINT(env, (env, DB_VERB_REP_SYNC, "sendpages: PAGE_FAIL on page %lu", (u_long)p)); + /* + * It is safe to cast to the old structs + * because the first part of the current + * struct matches the old structs. + */ if (rp->rep_version < DB_REPVERSION_53) - /* - * It is safe to cast to the old struct - * because the first part of the current - * struct matches the old struct. - */ ret = __rep_fileinfo_v6_marshal(env, rp->rep_version, (__rep_fileinfo_v6_args *)msgfp, buf, msgsz, &len); + else if (rp->rep_version < DB_REPVERSION_61) + ret = __rep_fileinfo_v7_marshal(env, + rp->rep_version, + (__rep_fileinfo_v7_args *)msgfp, + buf, msgsz, &len); else ret = __rep_fileinfo_marshal(env, rp->rep_version, msgfp, buf, @@ -772,7 +1463,7 @@ __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp) REP_PAGE_FAIL, &lsn, &msgdbt, 0, 0); continue; } else - ret = DB_NOTFOUND; + ret = USR_ERR(env, DB_NOTFOUND); goto err; } else if (ret != 0) goto err; @@ -796,16 +1487,21 @@ __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp) RPRINT(env, (env, DB_VERB_REP_SYNC, "sendpages: %lu, page lsn [%lu][%lu]", (u_long)p, (u_long)pagep->lsn.file, (u_long)pagep->lsn.offset)); + /* + * It is safe to cast to the old structs + * because the first part of the current + * structs matches the old struct. + */ if (rp->rep_version < DB_REPVERSION_53) - /* - * It is safe to cast to the old struct - * because the first part of the current - * struct matches the old struct. - */ ret = __rep_fileinfo_v6_marshal(env, rp->rep_version, (__rep_fileinfo_v6_args *)msgfp, buf, msgsz, &len); + else if (rp->rep_version < DB_REPVERSION_61) + ret = __rep_fileinfo_v7_marshal(env, + rp->rep_version, + (__rep_fileinfo_v7_args *)msgfp, + buf, msgsz, &len); else ret = __rep_fileinfo_marshal(env, rp->rep_version, msgfp, buf, msgsz, &len); @@ -1010,7 +1706,8 @@ __rep_update_setup(env, eid, rp, rec, savetime, lsn) ZERO_LSN(lp->waiting_lsn); ZERO_LSN(lp->max_wait_lsn); ZERO_LSN(lp->max_perm_lsn); - if (db_rep->rep_db == NULL) + ret = __rep_blob_cleanup(env, rep); + if (ret == 0 && db_rep->rep_db == NULL) ret = __rep_client_dbinit(env, 0, REP_DB); MUTEX_UNLOCK(env, rep->mtx_clientdb); if (ret != 0) @@ -1148,6 +1845,337 @@ err: /* return (ret); } +/* + * __rep_blob_update + * Prepare to receive blob file data by setting up the blob gap database, + * then requesting the blob file data. + * + * PUBLIC: int __rep_blob_update __P((ENV *, int, DB_THREAD_INFO *, DBT *)); + */ +int +__rep_blob_update(env, eid, ip, rec) + ENV *env; + int eid; + DB_THREAD_INFO *ip; + DBT *rec; +{ + DBC *dbc; + DB_REP *db_rep; + DBT data, key; + REP *rep; + REGINFO *infop; + __rep_blob_file_args rbf; + __rep_blob_update_args rbu; + __rep_fileinfo_args *rfp; + db_seq_t blob_fid; + int ret; + off_t offset; + size_t len; + u_int32_t num_blobs; + u_int8_t keybuf[BLOB_KEY_SIZE], *ptr; + + db_rep = env->rep_handle; + rep = db_rep->region; + infop = env->reginfo; + rfp = NULL; + dbc = NULL; + memset(&rbu, 0, sizeof(__rep_blob_update_args)); + memset(&rbf, 0, sizeof(__rep_blob_file_args)); + + if ((ret = __rep_blob_update_unmarshal( + env, &rbu, rec->data, rec->size, &ptr)) != 0) + return (ret); + len = rec->size - __REP_BLOB_UPDATE_SIZE; + + RPRINT(env, (env, DB_VERB_REP_SYNC, +"blob_update: file_id %llu, num_blobs %lu, flags %lu, highest %llu", + (long long)rbu.blob_fid, (long)rbu.num_blobs, + (unsigned long)rbu.flags, (long long)rbu.highest_id)); + + MUTEX_LOCK(env, rep->mtx_clientdb); + REP_SYSTEM_LOCK(env); + + /* + * Check if the world changed. + */ + if (rep->sync_state != SYNC_PAGE) + goto unlock; + + /* Make sure this is for the current database. */ + GET_CURINFO(rep, infop, rfp); + GET_LO_HI(env, rfp->blob_fid_lo, rfp->blob_fid_hi, blob_fid, ret); + if (ret != 0) + goto unlock; + + if (blob_fid != (db_seq_t)rbu.blob_fid) + goto unlock; + + rep->highest_id = (db_seq_t)rbu.highest_id; + /* + * For each blob file, add an entry to the database for each 1 MB + * section of that file. The entries will be deleted as the + * coresponding blob chunks arrive and are written to disk. + */ + if (db_rep->blob_dbp == NULL && + (ret = __rep_client_dbinit(env, 0, REP_BLOB)) != 0) + goto unlock; + + if ((ret = __db_cursor(db_rep->blob_dbp, ip, NULL, &dbc, 0)) != 0) + goto unlock; + + /* + * Make sure no one else has populated the database, this could happen + * if the update message is sent twice. + */ + memset(&key, 0, sizeof(DBT)); + memset(&data, 0, sizeof(DBT)); + if ((ret = __dbc_get(dbc, &key, &data, DB_FIRST)) != DB_NOTFOUND) + goto unlock; + + /* It is possible for a blob database to have no blobs. */ + if (rbu.num_blobs == 0) { + (void)__dbc_close(dbc); + dbc = NULL; + rep->blob_more_files = 0; + rep->gap_bl_hi_id = rep->gap_bl_hi_sid = 0; + rep->last_blob_id = rep->last_blob_sid = 0; + rep->prev_blob_id = rep->prev_blob_sid = 0; + rep->gap_bl_hi_off = 0; + rep->blob_sync = 0; + rep->highest_id = 0; + rep->blob_rereq = 0; + ret = __rep_blobdone(env, eid, ip, rep, blob_fid, 0); + goto unlock; + } + + memset(&key, 0, sizeof(DBT)); + memset(&data, 0, sizeof(DBT)); + data.flags = key.flags = DB_DBT_USERMEM; + key.data = keybuf; + key.ulen = key.size = BLOB_KEY_SIZE; + data.data = (void *)&offset; + data.ulen = data.size = sizeof(offset); + num_blobs = 0; + while (num_blobs < rbu.num_blobs) { + if ((ret = + __rep_blob_file_unmarshal(env, &rbf, ptr, len, &ptr)) != 0) + goto unlock; + len -= __REP_BLOB_FILE_SIZE; + + RPRINT(env, (env, DB_VERB_REP_SYNC, + "blob_update adding file: blob_id %llu, sdb_id %llu, blob_size %llu", + (long long)rbf.blob_id, (long long)rbf.blob_sid, + (long long)rbf.blob_size)); + + memcpy(keybuf, &rbf.blob_sid, BLOB_ID_SIZE); + memcpy(&(keybuf[BLOB_ID_SIZE]), &rbf.blob_id, BLOB_ID_SIZE); + offset = 0; + /* + * Add an entry for each megabyte of the blob file. Zero + * length blob files should have at least one entry. + */ + do { + if ((ret = __dbc_put(dbc, &key, &data, 0)) != 0) + goto unlock; + offset += MEGABYTE; + /* + * Check for overflow, this can happen when the master + * supports 64 file offsets, but the client does not. + */ + if (offset < 0) { + __db_errx(env, + DB_STR("3704", + "Blob file offset overflow")); + ret = EINVAL; + goto unlock; + } + } while ((u_int32_t)offset < rbf.blob_size); + num_blobs++; + } + /* Set whether there are more files after the ones on the list. */ + if (F_ISSET(&rbu, BLOB_DONE)) + rep->blob_more_files = 0; + else + rep->blob_more_files = 1; + rep->prev_blob_id = rep->last_blob_id; + rep->prev_blob_sid = rep->last_blob_sid; + rep->last_blob_sid = (db_seq_t)rbf.blob_sid; + rep->last_blob_id = (db_seq_t)rbf.blob_id; + + /* + * Send the same message payload in a REP_BLOB_ALL_REQ message to get + * the blob data. Peer-to-peer initialization is not supported for + * blobs, so we can only send this back to the master despite the fact + * that building the list of blob files is expensive. + */ + (void)__rep_send_message( + env, rep->master_id, REP_BLOB_ALL_REQ, NULL, rec, 0, 0); + +unlock: REP_SYSTEM_UNLOCK(env); + MUTEX_UNLOCK(env, rep->mtx_clientdb); + if (dbc != NULL) + (void)__dbc_close(dbc); + + return (ret); +} + +/* + * __rep_blob_allreq + * Request blob file data. + * + * PUBLIC: int __rep_blob_allreq __P((ENV *, int, DBT *)); + */ +int +__rep_blob_allreq(env, eid, rec) + ENV *env; + int eid; + DBT *rec; +{ + DB *dbp; + DB_FH *fhp; + DBT msg; + __rep_blob_chunk_args rbc; + __rep_blob_file_args rbf; + __rep_blob_update_args rbu; + db_seq_t old_sdb_id; + int done, ret; + off_t offset; + size_t len; + u_int32_t num_blobs; + u_int8_t *chunk_buf, *msg_buf, *ptr; + + dbp = NULL; + fhp = NULL; + chunk_buf = msg_buf = NULL; + memset(&rbu, 0, sizeof(__rep_blob_update_args)); + memset(&rbc, 0, sizeof(__rep_blob_chunk_args)); + memset(&msg, 0, sizeof(DBT)); + + if ((ret = + __os_malloc(env, MEGABYTE + __REP_BLOB_CHUNK_SIZE, &msg_buf)) != 0) + goto err; + msg.data = msg_buf; + msg.ulen = MEGABYTE + __REP_BLOB_CHUNK_SIZE; + if ((ret = __os_malloc(env, MEGABYTE, &chunk_buf)) != 0) + goto err; + rbc.data.data = chunk_buf; + rbc.data.ulen = MEGABYTE; + rbc.data.flags = DB_DBT_USERMEM; + + /* + * The REP_BLOB_ALL_REQ message sends the REP_BLOB_UPDATE message + * payload back to the master to request the actual blobs after the + * client has prepared itself to receive them. + */ + len = rec->size; + if ((ret = __rep_blob_update_unmarshal( + env, &rbu, rec->data, rec->size, &ptr)) != 0) + goto err; + len -= __REP_BLOB_UPDATE_SIZE; + + RPRINT(env, (env, DB_VERB_REP_SYNC, + "blob_all_req: file_id %llu, num_blobs %lu, flags %lu", + (long long)rbu.blob_fid, (long)rbu.num_blobs, + (unsigned long)rbu.flags)); + + if ((ret = __db_create_internal(&dbp, env, 0)) != 0) + goto err; + dbp->blob_file_id = (db_seq_t)rbu.blob_fid; + rbc.blob_fid = rbu.blob_fid; + num_blobs = 0; + /* + * The list of files to send is included in the message, go + * through the list and send each file in pieces. + */ + while (num_blobs < rbu.num_blobs) { + num_blobs++; + if ((ret = __rep_blob_file_unmarshal( + env, &rbf, ptr, len, &ptr)) != 0) + goto err; + len -= __REP_BLOB_FILE_SIZE; + old_sdb_id = dbp->blob_sdb_id; + dbp->blob_sdb_id = (db_seq_t)rbf.blob_sid; + rbc.flags = 0; + rbc.blob_sid = rbf.blob_sid; + rbc.blob_id = rbf.blob_id; + /* Free the sub-directory information if it has changed. */ + if (old_sdb_id != dbp->blob_sdb_id && + dbp->blob_sub_dir != NULL) { + __os_free(env, dbp->blob_sub_dir); + dbp->blob_sub_dir = NULL; + } + if (dbp->blob_sub_dir == NULL) { + if ((ret = __blob_make_sub_dir(env, &dbp->blob_sub_dir, + dbp->blob_file_id, dbp->blob_sdb_id)) != 0) + goto err; + } + if ((ret = __blob_file_open(dbp, + &fhp, (db_seq_t)rbf.blob_id, DB_FOP_READONLY, 0)) != 0) { + /* + * The file may have been deleted between creating the + * list and sending the data. Send a message saying + * the file has been deleted. + */ + if (ret == ENOENT) { + F_SET(&rbc, BLOB_DELETE); + rbc.data.size = 0; + __rep_blob_chunk_marshal(env, &rbc, msg.data); + msg.size = __REP_BLOB_CHUNK_SIZE; + (void)__rep_send_message(env, + eid, REP_BLOB_CHUNK, NULL, &msg, 0, 0); + ret = 0; + fhp = NULL; + continue; + } + goto err; + } + offset = 0; + do { + done = 0; + rbc.flags = 0; + if ((ret = __blob_file_read( + env, fhp, &rbc.data, offset, MEGABYTE)) != 0) + goto err; + DB_ASSERT(env, rbc.data.size <= MEGABYTE); + + /* + * In rare cases the blob file may have gotten shorter + * since the list was created. + */ + if (rbc.data.size < (u_int32_t)MEGABYTE && (u_int64_t) + (offset + rbc.data.size) < rbf.blob_size) { + F_SET(&rbc, BLOB_CHUNK_FAIL); + done = 1; + } + /* File may have grown since the list was made. */ + if ((u_int64_t) + (offset + rbc.data.size) > rbf.blob_size) { + rbc.data.size = + (u_int32_t)((off_t)rbf.blob_size - offset); + } + rbc.offset = (u_int64_t)offset; + __rep_blob_chunk_marshal(env, &rbc, msg.data); + msg.size = __REP_BLOB_CHUNK_SIZE + rbc.data.size; + (void)__rep_send_message( + env, eid, REP_BLOB_CHUNK, NULL, &msg, 0, 0); + offset += MEGABYTE; + } while ((u_int64_t)offset < rbf.blob_size && !done); + + if (fhp != NULL && (ret = __os_closehandle(env, fhp)) != 0) + goto err; + fhp = NULL; + } +err: if (chunk_buf != NULL) + __os_free(env, chunk_buf); + if (msg_buf != NULL) + __os_free(env, msg_buf); + if (fhp != NULL) + (void)__os_closehandle(env, fhp); + if (dbp != 0) + (void)__db_close(dbp, NULL, 0); + return (ret); +} + static int __rep_find_inmem(env, rfp, unused) ENV *env; @@ -1157,6 +2185,11 @@ __rep_find_inmem(env, rfp, unused) COMPQUIET(env, NULL); COMPQUIET(unused, NULL); + /* + * Cannot assume all databases are in-memory because abbreviated + * internal inits from 5.3 and earlier are not limited to in-memory + * databases. + */ return (FLD_ISSET(rfp->db_flags, DB_AM_INMEM) ? DB_KEYEXIST : 0); } @@ -1172,12 +2205,9 @@ __rep_remove_nimdbs(env) FILE_LIST_CTX context; int ret; - if ((ret = __os_calloc(env, 1, MEGABYTE, &context.buf)) != 0) + if ((ret = __rep_init_file_list_context(env, + DB_REPVERSION, 0, 0, &context)) != 0) return (ret); - context.size = MEGABYTE; - context.count = 0; - context.fillptr = context.buf; - context.version = DB_REPVERSION; /* NB: "NULL" asks walk_dir to consider only in-memory DBs */ if ((ret = __rep_walk_dir(env, NULL, NULL, &context)) != 0) @@ -1240,14 +2270,11 @@ __rep_remove_all(env, msg_version, rec) * 1. Get list of databases currently present at this client, which we * intend to remove. */ - if ((ret = __os_calloc(env, 1, MEGABYTE, &context.buf)) != 0) - return (ret); - context.size = MEGABYTE; - context.count = 0; - context.version = DB_REPVERSION; /* Reserve space for the marshaled update_args. */ - context.fillptr = FIRST_FILE_PTR(context.buf); + if ((ret = __rep_init_file_list_context(env, + DB_REPVERSION, 0, 1, &context)) != 0) + return (ret); if ((ret = __rep_find_dbs(env, &context)) != 0) goto out; @@ -1333,6 +2360,9 @@ __rep_remove_all(env, msg_version, rec) FIRST_FILE_PTR(context.buf), context.size, context.count, __rep_remove_file, NULL)) != 0) goto out; + /* Remove the blob directory. */ + if ((ret = __blob_del_hierarchy(env)) != 0) + goto out; /* * 4. Safe-store the (new) list of database files we intend to copy from @@ -1445,6 +2475,8 @@ __rep_remove_file(env, rfp, unused) #ifdef HAVE_QUEUE DB_THREAD_INFO *ip; #endif + APPNAME appname; + db_seq_t blob_fid, blob_sid; char *name; int ret, t_ret; @@ -1496,29 +2528,53 @@ __rep_remove_file(env, rfp, unused) * That will only have removed extent files. Now * we need to deal with the actual file itself. */ + appname = __rep_is_internal_rep_file(rfp->info.data) ? + DB_APP_META : (IS_BLOB_META(rfp->info.data) ? + DB_APP_BLOB : DB_APP_DATA); if (FLD_ISSET(rfp->db_flags, DB_AM_INMEM)) { if ((ret = __db_create_internal(&dbp, env, 0)) != 0) return (ret); MAKE_INMEM(dbp); F_SET(dbp, DB_AM_RECOVER); /* Skirt locking. */ ret = __db_inmem_remove(dbp, NULL, name); - } else if ((ret = __fop_remove(env, - NULL, rfp->uid.data, name, (const char **)&rfp->dir.data, - __rep_is_internal_rep_file(rfp->info.data) ? - DB_APP_META : DB_APP_DATA, 0)) != 0) + } else if ((ret = __fop_remove(env, NULL, rfp->uid.data, name, + (const char **)&rfp->dir.data, appname, 0)) != 0) { /* * If fop_remove fails, it could be because * the client has a different data_dir * structure than the master. Retry with the - * local, default settings. + * local, default settings. */ ret = __fop_remove(env, - NULL, rfp->uid.data, name, NULL, - __rep_is_internal_rep_file(rfp->info.data) ? - DB_APP_META : DB_APP_DATA, 0); -#ifdef HAVE_QUEUE -out: + NULL, rfp->uid.data, name, NULL, appname, 0); +#ifdef DB_WIN32 + /* + * Deleting a blob meta database can result in a + * ERROR_PATH_NOT_FOUND error on windows, so treat + * that as an ENOENT. + */ + if (__os_posix_err(ret) == ENOENT) + ret = ENOENT; #endif + } + /* Clean any blob directories. */ + if (ret == 0 && appname == DB_APP_BLOB) { + /* dbp has not been set, since queues do not support blobs. */ + DB_ASSERT(env, dbp == NULL); + if ((ret = __db_create_internal(&dbp, env, 0)) != 0) + goto out; + if ((ret = __blob_path_to_dir_ids( + env, name, &blob_fid, &blob_sid)) != 0) + goto out; + /* blob_fid == 0 if it is the top level blob meta db. */ + if (blob_fid != 0) { + dbp->blob_file_id = blob_fid; + dbp->blob_sdb_id = blob_sid; + if ((ret = __blob_del_all(dbp, NULL, 0)) != 0) + goto out; + } + } +out: if (dbp != NULL && (t_ret = __db_close(dbp, NULL, DB_NOSYNC)) != 0 && ret == 0) ret = t_ret; @@ -1610,10 +2666,11 @@ __rep_page(env, ip, eid, rp, rec) { DB_REP *db_rep; - DBT key, data; + DBT data, key; REP *rep; __rep_fileinfo_args *msgfp, msgf; __rep_fileinfo_v6_args *msgfpv6; + __rep_fileinfo_v7_args *msgfpv7; db_recno_t recno; int ret; char *msg; @@ -1647,21 +2704,30 @@ __rep_page(env, ip, eid, rp, rec) (u_long)rep->first_lsn.offset)); return (DB_REP_PAGEDONE); } + /* + * Build a current struct by copying in the older + * version struct and then setting up the new fields. + * This is safe because all old fields are in the + * same location in the current struct. + */ if (rp->rep_version < DB_REPVERSION_53) { - /* - * Build a current struct by copying in the older - * version struct and then setting up the data_dir. - * This is safe because all old fields are in the - * same location in the current struct. - */ if ((ret = __rep_fileinfo_v6_unmarshal(env, rp->rep_version, &msgfpv6, rec->data, rec->size, NULL)) != 0) return (ret); memcpy(&msgf, msgfpv6, sizeof(__rep_fileinfo_v6_args)); msgf.dir.data = NULL; msgf.dir.size = 0; + msgf.blob_fid_lo = msgf.blob_fid_hi = 0; msgfp = &msgf; msgfree = msgfpv6; + } else if (rp->rep_version < DB_REPVERSION_61) { + if ((ret = __rep_fileinfo_v7_unmarshal(env, rp->rep_version, + &msgfpv7, rec->data, rec->size, NULL)) != 0) + return (ret); + memcpy(&msgf, msgfpv7, sizeof(__rep_fileinfo_v7_args)); + msgf.blob_fid_lo = msgf.blob_fid_hi = 0; + msgfp = &msgf; + msgfree = msgfpv7; } else { if ((ret = __rep_fileinfo_unmarshal(env, rp->rep_version, &msgfp, rec->data, rec->size, NULL)) != 0) @@ -1671,9 +2737,9 @@ __rep_page(env, ip, eid, rp, rec) MUTEX_LOCK(env, rep->mtx_clientdb); REP_SYSTEM_LOCK(env); /* - * Check if the world changed. + * Check if the world changed or if we are in the blob sync phase. */ - if (rep->sync_state != SYNC_PAGE) { + if (rep->sync_state != SYNC_PAGE || rep->blob_sync != 0) { ret = DB_REP_PAGEDONE; goto err; } @@ -1785,6 +2851,218 @@ err: REP_SYSTEM_UNLOCK(env); } /* + * __rep_blob_chunk + * Process a blob chunk message. When a blob chunk arrives, delete its + * entry in the blob chunk gap database to show that it has arrived, and + * write the data to the blob file. + * + * PUBLIC: int __rep_blob_chunk __P((ENV *, int, DB_THREAD_INFO *, DBT *)); + */ +int +__rep_blob_chunk(env, eid, ip, rec) + ENV *env; + int eid; + DB_THREAD_INFO *ip; + DBT *rec; +{ + DB_REP *db_rep; + DBC *dbc; + DB_FH *fhp; + DBT data, key; + REP *rep; + REGINFO *infop; + __rep_blob_chunk_args rbc; + __rep_fileinfo_args *rfp; + db_seq_t blob_fid; + char *blob_sub_dir, *last, *mkpath, *name, *path; + int ret; + off_t offset; + u_int8_t keybuf[BLOB_KEY_SIZE], *ptr; + + ret = 0; + db_rep = env->rep_handle; + rep = db_rep->region; + infop = env->reginfo; + dbc = NULL; + blob_sub_dir = name = NULL; + path = NULL; + fhp = NULL; + + if (rep->sync_state != SYNC_PAGE) + return (DB_REP_PAGEDONE); + + if ((ret = __rep_blob_chunk_unmarshal( + env, &rbc, rec->data, rec->size, &ptr)) != 0) + return (ret); + + MUTEX_LOCK(env, rep->mtx_clientdb); + REP_SYSTEM_LOCK(env); + /* + * Check if the world changed. + */ + if (rep->sync_state != SYNC_PAGE) { + ret = DB_REP_PAGEDONE; + goto err; + } + /* + * We should not ever be in internal init with a lease granted. + */ + DB_ASSERT(env, + !IS_USING_LEASES(env) || __rep_islease_granted(env) == 0); + + /* Make sure this is for the current file. */ + GET_CURINFO(rep, infop, rfp); + GET_LO_HI(env, rfp->blob_fid_lo, rfp->blob_fid_hi, blob_fid, ret); + if (ret != 0) + goto err; + + if (blob_fid != (db_seq_t)rbc.blob_fid) { + ret = DB_REP_PAGEDONE; + goto err; + } + + RPRINT(env, (env, DB_VERB_REP_SYNC, +"REP_BLOB_CHUNK: blob_fid %llu, blob_sid %llu, blob_id %llu, offset %llu", + (unsigned long long)rbc.blob_fid, + (unsigned long long)rbc.blob_sid, + (unsigned long long)rbc.blob_id, (long long)rbc.offset)); + + if (db_rep->blob_dbp == NULL && + (ret = __rep_client_dbinit(env, 0, REP_BLOB)) != 0) { + RPRINT(env, (env, DB_VERB_REP_SYNC, + "REP_BLOB_CHUNK: Client_dbinit %s", + db_strerror(ret))); + goto err; + } + + /* Set the highest blob chunk received. */ + if (rbc.blob_sid > (u_int64_t)rep->gap_bl_hi_sid || + (rbc.blob_sid == (u_int64_t)rep->gap_bl_hi_sid && + rbc.blob_id > (u_int64_t)rep->gap_bl_hi_id) || + (rbc.blob_sid == (u_int64_t)rep->gap_bl_hi_sid && + rbc.blob_id == (u_int64_t)rep->gap_bl_hi_id && + rbc.offset > (u_int64_t)rep->gap_bl_hi_off)) { + rep->gap_bl_hi_id = (db_seq_t)rbc.blob_id; + rep->gap_bl_hi_sid = (db_seq_t)rbc.blob_sid; + rep->gap_bl_hi_off = (off_t)rbc.offset; + } + + memset(&key, 0, sizeof(DBT)); + memset(&data, 0, sizeof(DBT)); + data.flags = key.flags = DB_DBT_USERMEM; + key.data = keybuf; + key.ulen = key.size = BLOB_KEY_SIZE; + data.data = (void *)&offset; + data.ulen = data.size = sizeof(offset); + /* BLOB_DELETE is set if the blob file was deleted. */ + if (F_ISSET(&rbc, BLOB_DELETE)) { + memcpy(keybuf, &rbc.blob_sid, BLOB_ID_SIZE); + memcpy(&(keybuf[BLOB_ID_SIZE]), &rbc.blob_id, BLOB_ID_SIZE); + if ((ret = __db_del( + db_rep->blob_dbp, ip, NULL, &key, 0)) != 0) { + if (ret == DB_NOTFOUND) + ret = 0; + goto err; + } + goto done; + } + + if ((ret = __db_cursor(db_rep->blob_dbp, ip, NULL, &dbc, 0)) != 0) + goto err; + offset = (off_t)rbc.offset; + memcpy(keybuf, &rbc.blob_sid, BLOB_ID_SIZE); + memcpy(&(keybuf[BLOB_ID_SIZE]), &rbc.blob_id, BLOB_ID_SIZE); + /* If not found we have already dealt with this chunk. */ + if ((ret = __dbc_get(dbc, &key, &data, DB_GET_BOTH)) != 0) { + if (ret == DB_NOTFOUND) { + ret = 0; + goto done; + } + goto err; + } + /* + * BLOB_CHUNK_FAIL is set if the blob file was truncated to shorter + * than the BLOB_CHUNK offset. + */ + if (F_ISSET(&rbc, BLOB_CHUNK_FAIL)) { + while (ret == 0) { + if ((ret = __dbc_del(dbc, 0)) != 0) + goto err; + ret = __dbc_get(dbc, &key, &data, DB_NEXT_DUP); + } + if (ret == DB_NOTFOUND) + ret = 0; + if ((ret = __dbc_close(dbc)) != 0) + goto err; + dbc = NULL; + goto done; + } + if ((ret = __dbc_del(dbc, 0)) != 0) + goto err; + if ((ret = __dbc_close(dbc)) != 0) + goto err; + dbc = NULL; + + if ((ret = __blob_make_sub_dir(env, &blob_sub_dir, + (db_seq_t)rbc.blob_fid, (db_seq_t)rbc.blob_sid)) != 0) + goto err; + + if ((ret = __blob_id_to_path( + env, blob_sub_dir, (db_seq_t)rbc.blob_id, &name)) != 0) + goto err; + + if ((ret = __db_appname(env, DB_APP_BLOB, name, NULL, &path)) != 0 ) + goto err; + + last = __db_rpath(path); + DB_ASSERT(env, last != NULL); + *last = '\0'; + if (__os_exists(env, path, NULL) != 0) { + *last = PATH_SEPARATOR[0]; + mkpath = path; +#ifdef DB_WIN32 + /* + * Absolute paths on windows can result in it creating a "C" + * or "D" directory in the working directory. + */ + if (__os_abspath(mkpath)) + mkpath += 2; +#endif + if ((ret = __db_mkpath(env, mkpath)) != 0) + goto err; + } + *last = PATH_SEPARATOR[0]; + if ((ret = __os_open( + env, path, 0, DB_OSO_CREATE, env->db_mode, &fhp)) != 0) + goto err; + + /* Write the data into the blob file. */ + if ((ret = __fop_write_file(env, NULL, name, NULL, DB_APP_BLOB, + fhp, (off_t)rbc.offset, rbc.data.data, rbc.data.size, 0)) != 0) + goto err; + if ((ret = __os_closehandle(env, fhp)) != 0) + goto err; + fhp = NULL; + +done: ret = __rep_blobdone(env, eid, ip, rep, blob_fid, 0); + +err: REP_SYSTEM_UNLOCK(env); + MUTEX_UNLOCK(env, rep->mtx_clientdb); + if (path != NULL) + __os_free(env, path); + if (blob_sub_dir != NULL) + __os_free(env, blob_sub_dir); + if (name != NULL) + __os_free(env, name); + if (fhp != NULL) + (void)__os_closehandle(env, fhp); + if (dbc != NULL) + (void)__dbc_close(dbc); + + return (ret); +} + +/* * __rep_write_page - * Write this page into a database. */ @@ -1801,13 +3079,16 @@ __rep_write_page(env, ip, rep, msgfp) DB_PGINFO *pginfo; DB_REP *db_rep; REGINFO *infop; + APPNAME appname; __rep_fileinfo_args *rfp; + char *blob_path; int ret; void *dst; db_rep = env->rep_handle; infop = env->reginfo; rfp = NULL; + blob_path = NULL; /* * If this is the first page we're putting in this database, we need @@ -1830,15 +3111,39 @@ __rep_write_page(env, ip, rep, msgfp) RPRINT(env, (env, DB_VERB_REP_SYNC, "rep_write_page: Calling fop_create for %s", (char *)rfp->info.data)); + appname = (__rep_is_internal_rep_file(rfp->info.data) ? + DB_APP_META : (IS_BLOB_META((char *)rfp->info.data) + ? DB_APP_BLOB : DB_APP_DATA)); + /* + * May have to create the directory structure for blob + * metadata databases. + */ + if (appname == DB_APP_BLOB) { + if ((ret = __db_appname(env, + appname, rfp->info.data, + (const char **)&rfp->dir.data, + &blob_path)) != 0) + goto err; +#ifdef DB_WIN32 + /* + * Absolute paths on windows can result in + * it creating a "C" or "D" + * directory in the working directory. + */ + if (__os_abspath(blob_path)) + blob_path += 2; +#endif + if ((ret = __db_mkpath(env, blob_path)) != 0) + goto err; + } if ((ret = __fop_create(env, NULL, NULL, rfp->info.data, (const char **)&rfp->dir.data, - __rep_is_internal_rep_file(rfp->info.data) ? - DB_APP_META : DB_APP_DATA, env->db_mode, 0)) != 0) { + appname, env->db_mode, 0)) != 0) { /* * If fop_create fails, it could be because * the client has a different data_dir * structure than the master. Retry with the - * local, default settings. + * local, default settings. */ RPRINT(env, (env, DB_VERB_REP_SYNC, "rep_write_page: fop_create ret %d. Retry for %s, master datadir %s", @@ -1929,7 +3234,10 @@ __rep_write_page(env, ip, rep, msgfp) ret = __memp_fput(db_rep->file_mpf, ip, dst, db_rep->file_dbp->priority); -err: return (ret); +err: if (blob_path != NULL) + __os_free(env, blob_path); + + return (ret); } /* @@ -1976,7 +3284,7 @@ __rep_page_gap(env, rep, msgfp, type) * Make sure we're still talking about the same file. * If not, we're done here. */ - if (rfp->filenum != msgfp->filenum) { + if (rfp->filenum != msgfp->filenum || rep->blob_sync != 0) { ret = DB_REP_PAGEDONE; goto err; } @@ -2135,6 +3443,53 @@ err: } /* + * __rep_blob_cleanup - + * Clean up blob internal init information. + * + * Caller must hold client database mutex (mtx_clientdb) and + * REP_SYSTEM_LOCK. + */ +static int +__rep_blob_cleanup(env, rep) + ENV *env; + REP *rep; +{ + DB_REP *db_rep; + DB_THREAD_INFO *ip; + int ret, t_ret; + u_int32_t count; + + ret = 0; + db_rep = env->rep_handle; + + /* + * Delete any remaining records in the blob chunk database. The blob + * chunk database contains descriptions of the blob chunks that have + * yet to arrive. If not deleted, the remaining records could + * interfere with how the next REP_BLOB_UPDATE message is handled. + */ + if (db_rep->blob_dbp != NULL) { + ENV_GET_THREAD_INFO(env, ip); + ret = __db_truncate(db_rep->blob_dbp, ip, NULL, &count); + t_ret = __db_close(db_rep->blob_dbp, NULL, DB_NOSYNC); + if (ret == 0) + ret = t_ret; + db_rep->blob_dbp = NULL; + } + /* Reset blob internal init control values. */ + rep->gap_bl_hi_id = rep->gap_bl_hi_sid = 0; + rep->last_blob_id = rep->last_blob_sid = 0; + rep->prev_blob_id = rep->prev_blob_sid = 0; + rep->gap_bl_hi_off = 0; + rep->blob_more_files = 0; + rep->blob_sync = 0; + rep->highest_id = 0; + rep->blob_rereq = 0; + + return (ret); +} + +/* * __rep_init_cleanup - * Clean up internal initialization pieces. * @@ -2162,9 +3517,10 @@ __rep_init_cleanup(env, rep, force) /* * 1. Close up the file data pointer we used. * 2. Close/reset the page database. - * 3. Close/reset the queue database if we're forcing a cleanup. - * 4. Free current file info. - * 5. If we have all files or need to force, free original file info. + * 3. Close/truncate the blob chunk gap database. + * 4. Close/reset the queue database if we're forcing a cleanup. + * 5. Free current file info. + * 6. If we have all files or need to force, free original file info. */ if (db_rep->file_mpf != NULL) { ret = __memp_fclose(db_rep->file_mpf, 0); @@ -2176,6 +3532,15 @@ __rep_init_cleanup(env, rep, force) if (ret == 0) ret = t_ret; } + /* + * Truncate the blob chunk gap database, since entries in the database + * are for blob chunks we are expecting to arrive. Also reset blob + * internal init control values. + */ + t_ret = __rep_blob_cleanup(env, rep); + if (ret == 0) + ret = t_ret; + if (force && db_rep->queue_dbc != NULL) { queue_dbp = db_rep->queue_dbc->dbp; if ((t_ret = __dbc_close(db_rep->queue_dbc)) != 0 && ret == 0) @@ -2324,8 +3689,8 @@ __rep_clean_interrupted(env) * __rep_filedone - * We need to check if we're done with the current file after * processing the current page. Stat the database to see if - * we have all the pages. If so, we need to clean up/close - * this one, set up for the next one, and ask for its pages, + * we have all the pages and blobs. If so, we need to clean up/close + * this one, set up for the next one, and ask for its pages and blobs, * or if this is the last file, request the log records and * move to the REP_RECOVER_LOG state. */ @@ -2338,9 +3703,14 @@ __rep_filedone(env, ip, eid, rep, msgfp, type) __rep_fileinfo_args *msgfp; u_int32_t type; { + DBT msg; REGINFO *infop; __rep_fileinfo_args *rfp; + __rep_blob_update_req_args rbur; int ret; + u_int8_t buf[__REP_BLOB_UPDATE_REQ_SIZE]; + + memset(&msg, 0, sizeof(DBT)); /* * We've put our page, now we need to do any gap processing @@ -2375,8 +3745,96 @@ __rep_filedone(env, ip, eid, rep, msgfp, type) ((ret = __rep_queue_filedone(env, ip, rep, rfp)) != DB_REP_PAGEDONE)) return (ret); + + /* Request blob files. */ + if (rfp->blob_fid_lo != 0 || rfp->blob_fid_hi != 0) { + ret = 0; + rep->blob_sync = 1; + memset(&rbur, 0, sizeof(__rep_blob_update_req_args)); + GET_LO_HI(env, + rfp->blob_fid_lo, rfp->blob_fid_hi, rbur.blob_fid, ret); + msg.size = __REP_BLOB_UPDATE_REQ_SIZE; + msg.data = buf; + __rep_blob_update_req_marshal(env, &rbur, msg.data); + (void)__rep_send_message(env, + rep->master_id, REP_BLOB_UPDATE_REQ, NULL, &msg, 0, 0); + return (ret); + } + + /* + * We have all the data for this file. Clean up. + */ + if ((ret = __rep_init_cleanup(env, rep, 0)) != 0) + return (ret); + + rep->curfile++; + ret = __rep_nextfile(env, eid, rep); + + return (ret); +} + +/* + * __rep_blobdone - + * We need to check if we're done with the current file after + * processing the current blob chunk. + * + * Caller must hold client database mutex (mtx_clientdb) and + * REP_SYSTEM_LOCK. + */ +static int +__rep_blobdone(env, eid, ip, rep, blob_fid, force) + ENV *env; + int eid; + DB_THREAD_INFO *ip; + REP *rep; + db_seq_t blob_fid; + int force; +{ + DBT msg; + __rep_blob_update_req_args rbur; + int done, ret; + u_int8_t buf[__REP_BLOB_UPDATE_REQ_SIZE]; + /* - * We have all the pages for this file. Clean up. + * We've written our blob chunk, now we need to do any gap processing + * that might be needed to re-request chunks. + */ + done = 0; + ret = __rep_blob_chunk_gap(env, eid, ip, rep, &done, blob_fid, force); + /* + * The world changed while we were doing gap processing. + * We're done here. + */ + if (ret == DB_REP_PAGEDONE) + return (0); + else if (ret != 0) + goto err; + + /* + * If the blob database is empty then all files in the current list + * have been processed. However, there may be more files on the + * master, so request the next list if that is the case. + */ + if (done && rep->blob_more_files) { + memset(&rbur, 0, sizeof(__rep_blob_update_req_args)); + rbur.blob_fid = (u_int64_t)blob_fid; + rbur.blob_sid = (u_int64_t)rep->last_blob_sid; + rbur.blob_id = (u_int64_t)rep->last_blob_id; + rbur.highest_id = (u_int64_t)rep->highest_id; + rep->gap_bl_hi_id = rep->gap_bl_hi_sid = 0; + rep->gap_bl_hi_off = 0; + rep->blob_rereq = 0; + msg.size = __REP_BLOB_UPDATE_REQ_SIZE; + msg.data = buf; + __rep_blob_update_req_marshal(env, &rbur, msg.data); + (void)__rep_send_message(env, + rep->master_id, REP_BLOB_UPDATE_REQ, NULL, &msg, 0, 0); + return (0); + } else if (!done) + return (0); + + /* + * We have all the data for this file. Clean up. */ if ((ret = __rep_init_cleanup(env, rep, 0)) != 0) goto err; @@ -2388,6 +3846,255 @@ err: } /* + * __rep_blob_chunk_gap - + * We have written a blob chunk. Now check if there are any that need + * to be re-requested. The blob chunk gap database contains + * descriptions of all the blob chunks that have yet to arrive. + * + * Caller must hold client database mutex (mtx_clientdb) and + * REP_SYSTEM_LOCK. + */ +static int +__rep_blob_chunk_gap(env, eid, ip, rep, done, blob_fid, force) + ENV *env; + int eid; + DB_THREAD_INFO *ip; + REP *rep; + int *done; + db_seq_t blob_fid; + int force; +{ + DBC *dbc; + DBT data, high, key, msg; + DB_LOG *dblp; + DB_REP *db_rep; + LOG *lp; + REGINFO *infop; + __rep_blob_chunk_req_args rbcr; + __rep_fileinfo_args *rfp; + db_seq_t cur_blob_fid; + off_t offset; + int ret; + u_int8_t buf[BLOB_KEY_SIZE], msgbuf[__REP_BLOB_CHUNK_REQ_SIZE]; + + db_rep = env->rep_handle; + dblp = env->lg_handle; + lp = dblp->reginfo.primary; + infop = env->reginfo; + ret = 0; + dbc = NULL; + *done = 0; + + /* eid will be used when peer-to-peer is re-enabled for blobs. */ + COMPQUIET(eid, 0); + + /* + * Make sure we're still talking about the same file. + * If not, we're done here. + */ + GET_CURINFO(rep, infop, rfp); + GET_LO_HI(env, rfp->blob_fid_lo, rfp->blob_fid_hi, cur_blob_fid, ret); + if (cur_blob_fid != blob_fid) { + ret = DB_REP_PAGEDONE; + goto err; + } + + /* Get the first missing blob chunk. */ + if ((ret = __db_cursor(db_rep->blob_dbp, ip, NULL, &dbc, 0)) != 0) + goto err; + memset(&key, 0, sizeof(DBT)); + memset(&data, 0, sizeof(DBT)); + ret = __dbc_get(dbc, &key, &data, DB_FIRST); + if (ret == DB_NOTFOUND) { + /* All blobs received. */ + ret = 0; + *done = 1; + goto err; + } else if (ret != 0) + goto err; + + DB_ASSERT(env, key.size == BLOB_KEY_SIZE); + DB_ASSERT(env, data.size == sizeof(off_t)); + offset = *(off_t *)data.data; + /* + * Format the sdbid and id of the high chunk as a blob gap + * database key, so it can be compared with the entries in that + * database. + */ + memset(&high, 0, sizeof(DBT)); + memcpy(buf, &rep->gap_bl_hi_sid, BLOB_ID_SIZE); + memcpy(buf + BLOB_ID_SIZE, &rep->gap_bl_hi_id, BLOB_ID_SIZE); + high.data = buf; + high.size = BLOB_KEY_SIZE; + + /* + * If the first chunk in the database is larger than the highest chunk + * received, then there is no gap. + * + * If a gap does exist, check if it is time to do a re-request. If so, + * re-request every chunk that exists before the highest received. + */ + if (!force && (__rep_blob_cmp(NULL, &key, &high, NULL) > 0 || + (__rep_blob_cmp(NULL, &key, &high, NULL) == 0 && + offset > rep->gap_bl_hi_off))) { + lp->wait_ts = db_rep->request_gap; + __os_gettime(env, &lp->rcvd_ts, 1); + } else if (force || __rep_check_doreq(env, rep)) { + /* + * Re-request every chunk less than the highest one, plus the + * next blob chunk that we are expecting. The next expected + * blob chunk is requested in case the last blob chunk is lost + * in transit. + */ + do { + memset(&rbcr, 0, sizeof(__rep_blob_chunk_req_args)); + memcpy(&(rbcr.blob_sid), key.data, BLOB_ID_SIZE); + memcpy(&(rbcr.blob_id), + (u_int8_t *)key.data + BLOB_ID_SIZE, BLOB_ID_SIZE); + rbcr.offset = *(u_int64_t *)data.data; + rbcr.blob_fid = (u_int64_t)blob_fid; + msg.size = __REP_BLOB_CHUNK_REQ_SIZE; + msg.data = msgbuf; + RPRINT(env, (env, DB_VERB_REP_SYNC, +"blob_chunk_gap: Req file_id %llu, sdb_id %llu, blob_id %llu, offset %llu", + (long long)rbcr.blob_fid, (long long)rbcr.blob_sid, + (long long)rbcr.blob_id, (long long)rbcr.offset)); + __rep_blob_chunk_req_marshal(env, &rbcr, msg.data); + /* + * Note that peer-to-peer initialization is not + * supported for blobs. + */ + (void)__rep_send_message( + env, rep->master_id, + REP_BLOB_CHUNK_REQ, NULL, &msg, 0, 0); + /* + * Break after requesting the chunk after the highest + * one. + */ + if (__rep_blob_cmp(NULL, &key, &high, NULL) > 0 || + (__rep_blob_cmp(NULL, &key, &high, NULL) == 0 && + offset > rep->gap_bl_hi_off)) + break; + if ((ret = __dbc_get( + dbc, &key, &data, DB_NEXT)) != 0) { + if (ret == DB_NOTFOUND) { + ret = 0; + break; + } + goto err; + } + } while (1); + } + +err: if (dbc != NULL) + (void)__dbc_close(dbc); + + return (ret); +} + +/* + * __rep_blob_chunk_req + * Answer a request for a specific blob chunk. + * + * PUBLIC: int __rep_blob_chunk_req __P((ENV *, int, DBT *)); + */ +int +__rep_blob_chunk_req(env, eid, rec) + ENV *env; + int eid; + DBT *rec; +{ + DB *dbp; + DBT msg; + DB_FH *fhp; + __rep_blob_chunk_args rbc; + __rep_blob_chunk_req_args rbcr; + int ret; + u_int8_t *chunk_buf, *msg_buf, *ptr; + + dbp = NULL; + fhp = NULL; + chunk_buf = msg_buf = NULL; + + if ((ret = + __os_malloc(env, MEGABYTE + __REP_BLOB_CHUNK_SIZE, &msg_buf)) != 0) + goto err; + memset(&msg, 0, sizeof(DBT)); + msg.data = msg_buf; + msg.ulen = MEGABYTE + __REP_BLOB_CHUNK_SIZE; + if ((ret = __os_malloc(env, MEGABYTE, &chunk_buf)) != 0) + goto err; + memset(&rbc, 0, sizeof(__rep_blob_chunk_args)); + rbc.data.data = chunk_buf; + rbc.data.ulen = MEGABYTE; + rbc.data.flags = DB_DBT_USERMEM; + + if ((ret = __rep_blob_chunk_req_unmarshal( + env, &rbcr, rec->data, rec->size, &ptr)) != 0) + goto err; + + RPRINT(env, (env, DB_VERB_REP_SYNC, + "blob_chunk_req: file_id %llu, sdbid %llu, id %llu, offset %llu", + (long long)rbcr.blob_fid, (long long)rbcr.blob_sid, + (long long)rbcr.blob_id, (long long)rbcr.offset)); + + rbc.blob_fid = rbcr.blob_fid; + rbc.blob_id = rbcr.blob_id; + rbc.blob_sid = rbcr.blob_sid; + rbc.offset = rbcr.offset; + if ((ret = __db_create_internal(&dbp, env, 0)) != 0) + goto err; + dbp->blob_file_id = (db_seq_t)rbcr.blob_fid; + dbp->blob_sdb_id = (db_seq_t)rbcr.blob_sid; + if ((ret = __blob_make_sub_dir(env, &dbp->blob_sub_dir, + (db_seq_t)rbcr.blob_fid, (db_seq_t)rbcr.blob_sid)) != 0) + goto err; + if ((ret = __blob_file_open( + dbp, &fhp, (db_seq_t)rbcr.blob_id, DB_FOP_READONLY, 0)) != 0) { + /* + * The file may have been deleted between creating the + * list and sending the request. Send a message saying + * the file has been deleted. + */ + if (ret == ENOENT) { + ret = 0; + F_SET(&rbc, BLOB_DELETE); + rbc.data.size = 0; + __rep_blob_chunk_marshal(env, &rbc, msg.data); + msg.size = __REP_BLOB_CHUNK_SIZE; + (void)__rep_send_message( + env, eid, REP_BLOB_CHUNK, NULL, &msg, 0, 0); + goto err; + } + goto err; + } + if ((ret = __blob_file_read( + env, fhp, &rbc.data, (off_t)rbcr.offset, MEGABYTE)) != 0) + goto err; + DB_ASSERT(env, rbc.data.size <= MEGABYTE); + + /* + * In rare cases the blob file may have gotten shorter + * since the list was created. + */ + if (rbc.data.size == 0) + F_SET(&rbc, BLOB_CHUNK_FAIL); + __rep_blob_chunk_marshal(env, &rbc, msg.data); + msg.size = __REP_BLOB_CHUNK_SIZE + rbc.data.size; + (void)__rep_send_message(env, eid, REP_BLOB_CHUNK, NULL, &msg, 0, 0); + +err: if (chunk_buf != NULL) + __os_free(env, chunk_buf); + if (msg_buf != NULL) + __os_free(env, msg_buf); + if (fhp != NULL) + (void)__os_closehandle(env, fhp); + if (dbp != 0) + (void)__db_close(dbp, NULL, 0); + return (ret); +} + +/* * Starts requesting pages for the next file in the list (if any), or if not, * proceeds to the next stage: requesting logs. * @@ -2404,19 +4111,25 @@ __rep_nextfile(env, eid, rep) DBT dbt; __rep_logreq_args lr_args; DB_LOG *dblp; + DB_REP *db_rep; + DELAYED_BLOB_LIST *dbl; LOG *lp; REGENV *renv; REGINFO *infop; __rep_fileinfo_args *curinfo, *rfp, rf; __rep_fileinfo_v6_args *rfpv6; - int *curbuf, ret; + __rep_fileinfo_v7_args *rfpv7; + int *curbuf, ret, view_partial; u_int8_t *buf, *info_ptr, lrbuf[__REP_LOGREQ_SIZE], *nextinfo; size_t len, msgsz; + char *name; void *rffree; infop = env->reginfo; renv = infop->primary; + db_rep = env->rep_handle; rfp = NULL; + dbl = NULL; /* * Always direct the next request to the master (at least nominally), @@ -2430,13 +4143,13 @@ __rep_nextfile(env, eid, rep) /* Set curinfo to next file and examine it. */ info_ptr = R_ADDR(infop, rep->originfo_off + (rep->originfolen - rep->infolen)); + /* + * Build a current struct by copying in the older + * version struct and then setting up the new fields. + * This is safe because all old fields are in the + * same location in the current struct. + */ if (rep->infoversion < DB_REPVERSION_53) { - /* - * Build a current struct by copying in the older - * version struct and then setting up the data_dir. - * This is safe because all old fields are in the - * same location in the current struct. - */ if ((ret = __rep_fileinfo_v6_unmarshal(env, rep->infoversion, &rfpv6, info_ptr, rep->infolen, &nextinfo)) != 0) @@ -2444,8 +4157,18 @@ __rep_nextfile(env, eid, rep) memcpy(&rf, rfpv6, sizeof(__rep_fileinfo_v6_args)); rf.dir.data = NULL; rf.dir.size = 0; + rf.blob_fid_lo = rf.blob_fid_hi = 0; rfp = &rf; rffree = rfpv6; + } else if (rep->infoversion < DB_REPVERSION_61) { + if ((ret = __rep_fileinfo_v7_unmarshal(env, + rep->infoversion, &rfpv7, + info_ptr, rep->infolen, &nextinfo)) != 0) + return (ret); + memcpy(&rf, rfpv7, sizeof(__rep_fileinfo_v7_args)); + rf.blob_fid_lo = rf.blob_fid_hi = 0; + rfp = &rf; + rffree = rfpv7; } else { if ((ret = __rep_fileinfo_unmarshal(env, rep->infoversion, &rfp, info_ptr, @@ -2457,6 +4180,14 @@ __rep_nextfile(env, eid, rep) } rffree = rfp; } +#ifndef HAVE_64BIT_TYPES + if (rfp->blob_fid_lo != 0 || rfp->blob_fid_hi != 0) { + __db_errx(env, DB_STR("3705", + "Blobs require 64 integer compiler support.")); + __os_free(env, rffree); + return (DB_OPNOTSUP); + } +#endif rep->infolen -= (u_int32_t)(nextinfo - info_ptr); MUTEX_LOCK(env, renv->mtx_regenv); ret = __env_alloc(infop, sizeof(__rep_fileinfo_args) + @@ -2484,19 +4215,55 @@ __rep_nextfile(env, eid, rep) rfp->dir.data, rfp->dir.size); __os_free(env, rffree); - /* Skip over regular DB's in "abbreviated" internal inits. */ - if (F_ISSET(rep, REP_F_ABBREVIATED) && + /* + * If a partial callback is set, invoke the callback to see if + * this file should be replicated. + */ + if (IS_VIEW_SITE(env) && curinfo->info.size > 0 && !FLD_ISSET(curinfo->db_flags, DB_AM_INMEM)) { + name = (char *)curinfo->info.data; + DB_ASSERT(env, db_rep->partial != NULL); + /* + * Always replicate system owned databases. + */ + if (IS_DB_FILE(name) && !IS_BLOB_META(name)) + view_partial = 1; + else if ((ret = __rep_call_partial(env, + name, &view_partial, 0, &dbl)) != 0) { + VPRINT(env, (env, DB_VERB_REP_SYNC, + "rep_nextfile: partial cb err %d for %s", + ret, name)); + return (ret); + } + /* + * dbl != NULL when we could not find the name of the + * database that owns a blob meta database. If that + * happens then it was never opened, which means it + * was not replicated, and as such neither should its + * bmd be replicated. + */ + if (dbl != NULL) { + view_partial = 0; + __os_free(env, dbl); + dbl = NULL; + } VPRINT(env, (env, DB_VERB_REP_SYNC, - "Skipping file %d in abbreviated internal init", - curinfo->filenum)); - MUTEX_LOCK(env, renv->mtx_regenv); - __env_alloc_free(infop, - R_ADDR(infop, rep->curinfo_off)); - MUTEX_UNLOCK(env, renv->mtx_regenv); - rep->curinfo_off = INVALID_ROFF; - rep->curfile++; - continue; + "rep_nextfile: %s file %s %d on view site.", + view_partial == 0 ? + "Skipping" : "Replicating", + name, curinfo->filenum)); + /* + * If we're skipping the file, move to the next one. + */ + if (view_partial == 0) { + MUTEX_LOCK(env, renv->mtx_regenv); + __env_alloc_free(infop, + R_ADDR(infop, rep->curinfo_off)); + MUTEX_UNLOCK(env, renv->mtx_regenv); + rep->curinfo_off = INVALID_ROFF; + rep->curfile++; + continue; + } } /* Request this file's pages. */ @@ -2519,15 +4286,19 @@ __rep_nextfile(env, eid, rep) curinfo->uid.size + curinfo->info.size; if ((ret = __os_calloc(env, 1, msgsz, &buf)) != 0) return (ret); + /* + * It is safe to cast to the old structs + * because the first part of the current + * struct matches the old structs. + */ if (rep->infoversion < DB_REPVERSION_53) - /* - * It is safe to cast to the old struct - * because the first part of the current - * struct matches the old struct. - */ ret = __rep_fileinfo_v6_marshal(env, rep->infoversion, (__rep_fileinfo_v6_args *)curinfo, buf, msgsz, &len); + else if (rep->infoversion < DB_REPVERSION_61) + ret = __rep_fileinfo_v7_marshal(env, rep->infoversion, + (__rep_fileinfo_v7_args *)curinfo, buf, + msgsz, &len); else ret = __rep_fileinfo_marshal(env, rep->infoversion, curinfo, buf, msgsz, &len); @@ -2834,16 +4605,19 @@ __rep_pggap_req(env, rep, reqfp, gapflags) * new info into rep->finfo. Assert that the sizes never * change. The only thing this should do is change * the pgno field. Everything else remains the same. + * + * It is safe to cast to the old structs + * because the first part of the current + * struct matches the old structs. */ if (rep->infoversion < DB_REPVERSION_53) - /* - * It is safe to cast to the old struct - * because the first part of the current - * struct matches the old struct. - */ ret = __rep_fileinfo_v6_marshal(env, rep->infoversion, (__rep_fileinfo_v6_args *)tmpfp, buf, msgsz, &len); + else if (rep->infoversion < DB_REPVERSION_61) + ret = __rep_fileinfo_v7_marshal(env, rep->infoversion, + (__rep_fileinfo_v7_args *)tmpfp, buf, + msgsz, &len); else ret = __rep_fileinfo_marshal(env, rep->infoversion, tmpfp, buf, msgsz, &len); @@ -2865,6 +4639,94 @@ err: } /* + * __rep_blob_rereq - + * + * Re-request lost blob messages, such as REP_BLOB_CHUNK_REQ, REP_BLOB_ALL_REQ, + * or REP_BLOB_UPDATE_REQ. Note that the blob chunk gap database contains + * descriptions of the blob chunks that we are expecting to arrive. + * + * Assumes the caller holds mtx_clientdb and rep_mutex. + * + * PUBLIC: int __rep_blob_rereq __P((ENV *, REP *)); + */ +int +__rep_blob_rereq(env, rep) + ENV *env; + REP *rep; +{ + DB_REP *db_rep; + DB_THREAD_INFO *ip; + REGINFO *infop; + __rep_fileinfo_args *rfp; + db_seq_t blob_fid; + int master, ret; + u_int32_t count; + + db_rep = env->rep_handle; + infop = env->reginfo; + rfp = NULL; + ret = 0; + + /* First check if the master is around to answer the re-request. */ + master = rep->master_id; + if (master == DB_EID_INVALID) { + (void)__rep_send_message(env, + DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0); + goto err; + } + + if (db_rep->blob_dbp == NULL && + (ret = __rep_client_dbinit(env, 0, REP_BLOB)) != 0) { + RPRINT(env, (env, DB_VERB_REP_SYNC, + "REP_BLOB_CHUNK: Client_dbinit %s", + db_strerror(ret))); + goto err; + } + + /* + * If the gap blob id is 0 then we either lost a REP_BLOB_ALL_REQ or + * a REP_BLOB_UPDATE_REQ message. Since we do not have the information + * to reconstruct a REP_BLOB_ALL_REQ message, reset the blob gap + * database and start over at the REP_BLOB_UPDATE_REQ stage. + * + * If the blob gap id is not 0, we lost a REP_BLOB_CHUNK_REQ message, + * so perform blob gap processing. + */ + ENV_GET_THREAD_INFO(env, ip); + if (rep->gap_bl_hi_id == 0) { + /* + * It takes a while to create the blob update message, so skip + * the first time it asks. + */ + if (rep->blob_rereq == 0) { + rep->blob_rereq = 1; + goto err; + } + rep->blob_rereq = 0; + if ((ret = __db_truncate( + db_rep->blob_dbp, ip, NULL, &count)) != 0) + goto err; + rep->blob_more_files = 1; + rep->last_blob_id = rep->prev_blob_id; + rep->last_blob_sid = rep->prev_blob_sid; + } + + GET_CURINFO(rep, infop, rfp); + GET_LO_HI(env, rfp->blob_fid_lo, rfp->blob_fid_hi, blob_fid, ret); + if (ret != 0) + goto err; + /* + * If there are entries in the blob gap database, __rep_blobdone + * will perform gap processing, otherwise it will send + * a REP_BLOB_UPDATE_REQ. + */ + ret = __rep_blobdone(env, master, ip, rep, blob_fid, 1); + +err: + return (ret); +} + +/* * __rep_finfo_alloc - * Allocate and initialize a fileinfo structure. * @@ -3521,6 +5383,7 @@ __rep_walk_filelist(env, version, files, size, count, fn, arg) { __rep_fileinfo_args *rfp, rf; __rep_fileinfo_v6_args *rfpv6; + __rep_fileinfo_v7_args *rfpv7; u_int8_t *next; int ret; void *rffree; @@ -3530,21 +5393,30 @@ __rep_walk_filelist(env, version, files, size, count, fn, arg) rfpv6 = NULL; rffree = NULL; while (count-- > 0) { + /* + * Build a current struct by copying in the older + * version struct and then setting up the new fields. + * This is safe because all old fields are in the + * same location in the current struct. + */ if (version < DB_REPVERSION_53) { - /* - * Build a current struct by copying in the older - * version struct and then setting up the data_dir. - * This is safe because all old fields are in the - * same location in the current struct. - */ if ((ret = __rep_fileinfo_v6_unmarshal(env, version, &rfpv6, files, size, &next)) != 0) break; memcpy(&rf, rfpv6, sizeof(__rep_fileinfo_v6_args)); rf.dir.data = NULL; rf.dir.size = 0; + rf.blob_fid_lo = rf.blob_fid_hi = 0; rfp = &rf; rffree = rfpv6; + } else if (version < DB_REPVERSION_61) { + if ((ret = __rep_fileinfo_v7_unmarshal(env, version, + &rfpv7, files, size, &next)) != 0) + break; + memcpy(&rf, rfpv7, sizeof(__rep_fileinfo_v7_args)); + rf.blob_fid_lo = rf.blob_fid_hi = 0; + rfp = &rf; + rffree = rfpv7; } else { if ((ret = __rep_fileinfo_unmarshal(env, version, &rfp, files, size, &next)) != 0) @@ -3566,3 +5438,33 @@ __rep_walk_filelist(env, version, files, size, count, fn, arg) __os_free(env, rffree); return (ret); } + +/* + * Initializes a FILE_LIST_CTX structure. + * + * Pass in a non-zero value for update_space to reserve space for + * update_args in the context's buffer. + */ +static int +__rep_init_file_list_context(env, version, flags, update_space, context) + ENV *env; + u_int32_t version; + u_int32_t flags; + int update_space; + FILE_LIST_CTX *context; +{ + int ret; + + if ((ret = __os_calloc(env, 1, MEGABYTE, &context->buf)) != 0) + return (ret); + context->size = MEGABYTE; + context->count = 0; + context->version = version; + context->flags = flags; + /* Reserve space for update_args. */ + if (update_space) + context->fillptr = FIRST_FILE_PTR(context->buf); + else + context->fillptr = context->buf; + return (ret); +} diff --git a/src/rep/rep_elect.c b/src/rep/rep_elect.c index 9e8c5249..234daf31 100644 --- a/src/rep/rep_elect.c +++ b/src/rep/rep_elect.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2004, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2004, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -53,8 +53,9 @@ __rep_elect_pp(dbenv, given_nsites, nvotes, flags) u_int32_t given_nsites, nvotes; u_int32_t flags; { - DB_REP *db_rep; ENV *env; + DB_REP *db_rep; + DB_THREAD_INFO *ip; int ret; env = dbenv->env; @@ -89,7 +90,9 @@ __rep_elect_pp(dbenv, given_nsites, nvotes, flags) return (EINVAL); } + ENV_ENTER(env, ip); ret = __rep_elect_int(env, given_nsites, nvotes, flags); + ENV_LEAVE(env, ip); /* * The DB_REP_IGNORE return code can be of use to repmgr (which of @@ -120,7 +123,6 @@ __rep_elect_int(env, given_nsites, nvotes, flags) DB_LOGC *logc; DB_LSN lsn; DB_REP *db_rep; - DB_THREAD_INFO *ip; LOG *lp; REP *rep; int done, elected, in_progress; @@ -140,6 +142,15 @@ __rep_elect_int(env, given_nsites, nvotes, flags) ret = 0; /* + * View sites never participate in elections. + */ + if (IS_VIEW_SITE(env)) { + __db_errx(env, DB_STR("3687", + "View sites may not participate in elections")); + return (EINVAL); + } + + /* * Specifying 0 for nsites signals us to use the value configured * previously via rep_set_nsites. Similarly, if the given nvotes is 0, * it asks us to compute the value representing a simple majority. @@ -185,7 +196,6 @@ __rep_elect_int(env, given_nsites, nvotes, flags) * real, configured priority, as retrieved from REP region. */ ctlflags = realpri != 0 ? REPCTL_ELECTABLE : 0; - ENV_ENTER(env, ip); orig_tally = 0; /* If we are already master, simply broadcast that fact and return. */ @@ -597,8 +607,7 @@ out: DB_ASSERT(env, rep->elect_th > 0); rep->elect_th--; if (rep->elect_th == 0) { - need_req = F_ISSET(rep, REP_F_SKIPPED_APPLY) && - !I_HAVE_WON(rep, rep->winner); + need_req = F_ISSET(rep, REP_F_SKIPPED_APPLY) && !elected; FLD_CLR(rep->lockout_flags, REP_LOCKOUT_APPLY); F_CLR(rep, REP_F_SKIPPED_APPLY); } @@ -641,7 +650,6 @@ out: unlck_lv: REP_SYSTEM_UNLOCK(env); } envleave: - ENV_LEAVE(env, ip); return (ret); } @@ -1106,7 +1114,7 @@ __rep_cmp_vote(env, rep, eid, lsnp, priority, gen, data_gen, tiebreaker, flags) u_int32_t priority; u_int32_t data_gen, flags, gen, tiebreaker; { - int cmp, like_pri; + int cmp, genlog_cmp, like_pri; cmp = LOG_COMPARE(lsnp, &rep->w_lsn); /* @@ -1140,9 +1148,18 @@ __rep_cmp_vote(env, rep, eid, lsnp, priority, gen, data_gen, tiebreaker, flags) like_pri = (priority == 0 && rep->w_priority == 0) || (priority != 0 && rep->w_priority != 0); - if ((priority != 0 && rep->w_priority == 0) || - (like_pri && data_gen > rep->w_datagen) || - (like_pri && data_gen == rep->w_datagen && cmp > 0) || + /* + * The undocumented ELECT_LOGLENGTH option requires that the + * election should be won based on log length without regard + * for datagen. Do not include datagen in the comparison if + * this option is enabled. + */ + if (FLD_ISSET(rep->config, REP_C_ELECT_LOGLENGTH)) + genlog_cmp = like_pri && cmp > 0; + else + genlog_cmp = (like_pri && data_gen > rep->w_datagen) || + (like_pri && data_gen == rep->w_datagen && cmp > 0); + if ((priority != 0 && rep->w_priority == 0) || genlog_cmp || (cmp == 0 && (priority > rep->w_priority || (priority == rep->w_priority && (tiebreaker > rep->w_tiebreaker))))) { @@ -1306,8 +1323,9 @@ __rep_wait(env, timeoutp, full_elect, egen, flags) { DB_REP *db_rep; REP *rep; - int done; - u_int32_t sleeptime, sleeptotal, timeout; + db_timespec exptime, mytime; + int diff_timeout, done; + u_int32_t sleeptime, timeout; db_rep = env->rep_handle; rep = db_rep->region; @@ -1315,10 +1333,20 @@ __rep_wait(env, timeoutp, full_elect, egen, flags) timeout = *timeoutp; sleeptime = SLEEPTIME(timeout); - sleeptotal = 0; - while (sleeptotal < timeout) { + __os_gettime(env, &exptime, 0); + TIMESPEC_ADD_DB_TIMEOUT(&exptime, timeout); + while (!done) { + __os_gettime(env, &mytime, 0); + /* + * Check if the timeout has expired. __os_yield might sleep + * a slightly shorter time than requested, so check the exact + * amount of time that has passed. If we do not sleep the + * full PHASE0 time, old unexpired lease grants could + * incorrectly prevent the election from happening. + */ + if (timespeccmp(&mytime, &exptime, >)) + break; __os_yield(env, 0, sleeptime); - sleeptotal += sleeptime; REP_SYSTEM_LOCK(env); /* * Check if group membership changed while we were @@ -1331,19 +1359,19 @@ __rep_wait(env, timeoutp, full_elect, egen, flags) if (!LF_ISSET(REP_E_PHASE0) && full_elect && F_ISSET(rep, REP_F_GROUP_ESTD)) { *timeoutp = rep->elect_timeout; + if ((diff_timeout = (int)(*timeoutp - timeout)) > 0) + TIMESPEC_ADD_DB_TIMEOUT(&exptime, diff_timeout); + else { + diff_timeout = -diff_timeout; + TIMESPEC_SUB_DB_TIMEOUT(&exptime, diff_timeout); + } timeout = *timeoutp; - if (sleeptotal >= timeout) - done = 1; - else - sleeptime = SLEEPTIME(timeout); + sleeptime = SLEEPTIME(timeout); } if (egen != rep->egen || !FLD_ISSET(rep->elect_flags, flags)) done = 1; REP_SYSTEM_UNLOCK(env); - - if (done) - return (0); } return (0); } diff --git a/src/rep/rep_lease.c b/src/rep/rep_lease.c index 047c39a7..b6010046 100644 --- a/src/rep/rep_lease.c +++ b/src/rep/rep_lease.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2007, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2007, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -45,10 +45,20 @@ __rep_update_grant(env, ts) timespecclear(&mytime); /* + * If we are a view, we never grant a lease. + */ + if (IS_VIEW_SITE(env)) + return (0); + + /* * Get current time, and add in the (skewed) lease duration - * time to send the grant to the master. + * time to send the grant to the master. We need to use '0' + * for a non-monotonic (i.e. realtime) timestamp. Some systems + * use "time since boot" for monotonic time, which would not + * work between machines here. We already document that for leases, + * the time cannot go backward. */ - __os_gettime(env, &mytime, 1); + __os_gettime(env, &mytime, 0); timespecadd(&mytime, &rep->lease_duration); REP_SYSTEM_LOCK(env); /* @@ -108,7 +118,7 @@ __rep_islease_granted(env) * Get current time and compare against our granted lease. */ timespecclear(&mytime); - __os_gettime(env, &mytime, 1); + __os_gettime(env, &mytime, 0); return (timespeccmp(&mytime, &rep->grant_expire, <=) ? 1 : 0); } @@ -319,9 +329,15 @@ __rep_lease_check(env, refresh) max_tries = LEASE_REFRESH_MIN; retry: REP_SYSTEM_LOCK(env); - min_leases = rep->config_nsites / 2; + /* + * We need enough leases so that we're guaranteed any successful + * election will include at least one site with the lease-guaranteed + * data. Note this is based on total number of sites so leases + * cannot be used with half or more unelectable sites. + */ + min_leases = (rep->config_nsites - 1) / 2; ret = 0; - __os_gettime(env, &curtime, 1); + __os_gettime(env, &curtime, 0); VPRINT(env, (env, DB_VERB_REP_LEASE, "%s %d of %d refresh %d min_leases %lu curtime %lu %lu, maxLSN [%lu][%lu]", "lease_check: try ", tries, max_tries, refresh, @@ -526,7 +542,7 @@ __rep_lease_waittime(env) if (!F_ISSET(rep, REP_F_LEASE_EXPIRED)) to = rep->lease_timeout; } else { - __os_gettime(env, &mytime, 1); + __os_gettime(env, &mytime, 0); RPRINT(env, (env, DB_VERB_REP_LEASE, "wait_time: mytime %lu %lu, grant_expire %lu %lu", (u_long)mytime.tv_sec, (u_long)mytime.tv_nsec, diff --git a/src/rep/rep_log.c b/src/rep/rep_log.c index 42300685..bf72db9e 100644 --- a/src/rep/rep_log.c +++ b/src/rep/rep_log.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2004, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2004, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -110,7 +110,7 @@ __rep_allreq(env, rp, eid) */ if (ret == 0 && repth.lsn.file != 1 && flags == DB_FIRST) { if (F_ISSET(rep, REP_F_CLIENT)) - ret = DB_NOTFOUND; + ret = USR_ERR(env, DB_NOTFOUND); else (void)__rep_send_message(env, eid, REP_VERIFY_FAIL, &repth.lsn, NULL, 0, 0); @@ -466,8 +466,8 @@ __rep_log_split(env, ip, rp, rec, ret_lsnp, last_lsnp) if (p >= ep && save_flags) F_SET(&tmprp, save_flags); /* - * A previous call to __rep_apply indicated an earlier - * record is a dup and the next_new_lsn we are waiting for. + * A previous call to __rep_apply indicated an earlier record + * is a past dup and the next_new_lsn for which we are waiting. * Skip log records until we catch up with next_new_lsn. */ if (is_dup && LOG_COMPARE(&tmprp.lsn, &next_new_lsn) < 0) { @@ -482,7 +482,20 @@ __rep_log_split(env, ip, rp, rec, ret_lsnp, last_lsnp) VPRINT(env, (env, DB_VERB_REP_MISC, "log_split: rep_apply ret %d, dup %d, tmp_lsn [%lu][%lu]", ret, is_dup, (u_long)tmp_lsn.file, (u_long)tmp_lsn.offset)); - if (is_dup) + /* + * We can skip log records between a past dup and tmp_lsn + * returned by rep_apply() because we know we have all + * those log records. For a past dup, this log record is + * less than or equal to tmp_lsn (which is either ready_lsn + * or max_perm_lsn) and we only have records to skip when + * it is less than tmp_lsn. + * + * We cannot skip log records for a future dup because we + * may not have all of them. In this case, this log record + * is greater than or equal to tmp_lsn (which is either + * ready_lsn or this log record). + */ + if (is_dup && LOG_COMPARE(&tmprp.lsn, &tmp_lsn) < 0) next_new_lsn = tmp_lsn; switch (ret) { /* @@ -637,7 +650,7 @@ __rep_logreq(env, rp, rec, eid) if (LOG_COMPARE(&firstlsn, &rp->lsn) > 0) { /* Case 3 */ if (F_ISSET(rep, REP_F_CLIENT)) { - ret = DB_NOTFOUND; + ret = USR_ERR(env, DB_NOTFOUND); goto err; } (void)__rep_send_message(env, eid, @@ -662,7 +675,7 @@ __rep_logreq(env, rp, rec, eid) ret = 0; goto err; } else - ret = DB_NOTFOUND; + ret = USR_ERR(env, DB_NOTFOUND); } } @@ -812,6 +825,14 @@ __rep_loggap_req(env, rep, lsnp, gapflags) ret = 0; /* + * If we are in SYNC_LOG and have all the log we need (i.e. + * rep->last_lsn is ZERO_LSN), just return, as there is nothing + * to do while recovery is running. + */ + if (rep->sync_state == SYNC_LOG && IS_ZERO_LSN(rep->last_lsn)) + return (0); + + /* * Check if we need to ask for the gap. * We ask for the gap if: * We are forced to with gapflags. @@ -1030,7 +1051,7 @@ __rep_chk_newfile(env, logc, rep, rp, eid) REP_VERIFY_FAIL, &rp->lsn, NULL, 0, 0); } else - ret = DB_NOTFOUND; + ret = USR_ERR(env, DB_NOTFOUND); } else { endlsn.offset += logc->len; if ((ret = __logc_version(logc, @@ -1054,7 +1075,7 @@ __rep_chk_newfile(env, logc, rep, rp, eid) } } } else - ret = DB_NOTFOUND; + ret = USR_ERR(env, DB_NOTFOUND); return (ret); } diff --git a/src/rep/rep_method.c b/src/rep/rep_method.c index f9f1924c..e0e7dd19 100644 --- a/src/rep/rep_method.c +++ b/src/rep/rep_method.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -10,6 +10,7 @@ #include "db_int.h" #include "dbinc/db_page.h" +#include "dbinc/blob.h" #include "dbinc/btree.h" #include "dbinc/mp.h" #include "dbinc/txn.h" @@ -17,14 +18,12 @@ static int __rep_abort_prepared __P((ENV *)); static int __rep_await_condition __P((ENV *, struct rep_waitgoal *, db_timeout_t)); -static int __rep_bt_cmp __P((DB *, const DBT *, const DBT *)); +static int __rep_bt_cmp __P((DB *, const DBT *, const DBT *, size_t *)); static int __rep_check_applied __P((ENV *, DB_THREAD_INFO *, DB_COMMIT_INFO *, struct rep_waitgoal *)); static void __rep_config_map __P((ENV *, u_int32_t *, u_int32_t *)); static u_int32_t __rep_conv_vers __P((ENV *, u_int32_t)); -static int __rep_read_lsn_history __P((ENV *, - DB_THREAD_INFO *, DB_TXN **, DBC **, u_int32_t, - __rep_lsn_hist_data_args *, struct rep_waitgoal *, u_int32_t)); +static int __rep_defview __P((DB_ENV *, const char *, int *, u_int32_t)); static int __rep_restore_prepared __P((ENV *)); static int __rep_save_lsn_hist __P((ENV *, DB_THREAD_INFO *, DB_LSN *)); /* @@ -123,9 +122,11 @@ __rep_get_config(dbenv, which, onp) #undef OK_FLAGS #define OK_FLAGS \ (DB_REP_CONF_AUTOINIT | DB_REP_CONF_AUTOROLLBACK | \ - DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | DB_REP_CONF_INMEM | \ + DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | \ + DB_REP_CONF_ELECT_LOGLENGTH | DB_REP_CONF_INMEM | \ DB_REP_CONF_LEASE | DB_REP_CONF_NOWAIT | \ - DB_REPMGR_CONF_2SITE_STRICT | DB_REPMGR_CONF_ELECTIONS) + DB_REPMGR_CONF_2SITE_STRICT | DB_REPMGR_CONF_ELECTIONS | \ + DB_REPMGR_CONF_PREFMAS_CLIENT | DB_REPMGR_CONF_PREFMAS_MASTER) if (FLD_ISSET(which, ~OK_FLAGS)) return (__db_ferr(env, "DB_ENV->rep_get_config", 0)); @@ -171,19 +172,30 @@ __rep_set_config(dbenv, which, on) REP *rep; REP_BULK bulk; u_int32_t mapped, orig; - int ret, t_ret; + int inmemlog, pm_ret, ret, t_ret; env = dbenv->env; db_rep = env->rep_handle; ret = 0; + pm_ret = 0; + inmemlog = 0; #undef OK_FLAGS #define OK_FLAGS \ (DB_REP_CONF_AUTOINIT | DB_REP_CONF_AUTOROLLBACK | \ - DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | DB_REP_CONF_INMEM | \ + DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | \ + DB_REP_CONF_ELECT_LOGLENGTH | DB_REP_CONF_INMEM | \ DB_REP_CONF_LEASE | DB_REP_CONF_NOWAIT | \ - DB_REPMGR_CONF_2SITE_STRICT | DB_REPMGR_CONF_ELECTIONS) -#define REPMGR_FLAGS (REP_C_2SITE_STRICT | REP_C_ELECTIONS) + DB_REPMGR_CONF_2SITE_STRICT | DB_REPMGR_CONF_ELECTIONS | \ + DB_REPMGR_CONF_PREFMAS_CLIENT | DB_REPMGR_CONF_PREFMAS_MASTER) +#define REPMGR_FLAGS (REP_C_2SITE_STRICT | REP_C_ELECTIONS | \ + REP_C_PREFMAS_CLIENT | REP_C_PREFMAS_MASTER) + +#define TURNING_ON_PREFMAS(orig, curr) \ + ((FLD_ISSET(curr, REP_C_PREFMAS_MASTER) && \ + !FLD_ISSET(orig, REP_C_PREFMAS_MASTER)) || \ + (FLD_ISSET(curr, REP_C_PREFMAS_CLIENT) && \ + !FLD_ISSET(orig, REP_C_PREFMAS_CLIENT))) ENV_NOT_CONFIGURED( env, db_rep->region, "DB_ENV->rep_set_config", DB_INIT_REP); @@ -224,6 +236,62 @@ __rep_set_config(dbenv, which, on) return (EINVAL); } /* + * The undocumented ELECT_LOGLENGTH option and the preferred + * master options cannot be changed after calling repmgr_start. + */ + if (FLD_ISSET(mapped, (REP_C_ELECT_LOGLENGTH | + REP_C_PREFMAS_MASTER | REP_C_PREFMAS_CLIENT)) && + F_ISSET(rep, REP_F_START_CALLED)) { + __db_errx(env, DB_STR("3706", + "DB_ENV->rep_set_config: %s " + "must be configured before DB_ENV->repmgr_start"), + FLD_ISSET(mapped, REP_C_ELECT_LOGLENGTH) ? + "ELECT_LOGLENGTH" : "preferred master"); + ENV_LEAVE(env, ip); + return (EINVAL); + } + /* + * Do not allow users to turn on preferred master if + * leases or in-memory replication files are in effect, + * or with a private environment or in-memory log files. + */ + if (FLD_ISSET(mapped, + (REP_C_PREFMAS_MASTER | REP_C_PREFMAS_CLIENT)) && + (REP_CONFIG_IS_SET(env, (REP_C_LEASE | REP_C_INMEM)) || + (__log_get_config(dbenv, + DB_LOG_IN_MEMORY, &inmemlog) == 0 && + (inmemlog > 0 || F_ISSET(env, ENV_PRIVATE))))) { + __db_errx(env, DB_STR("3707", + "DB_ENV->rep_set_config: preferred master mode " + "cannot be used with %s"), + REP_CONFIG_IS_SET(env, REP_C_LEASE) ? + "master leases" : + REP_CONFIG_IS_SET(env, REP_C_INMEM) ? + "in-memory replication files" : + inmemlog > 0 ? "in-memory log files" : + "a private environment"); + ENV_LEAVE(env, ip); + return (EINVAL); + } + /* + * If we are already in preferred master mode, we can't + * turn off elections or 2site_strict and we can't turn on + * leases. + */ + if (PREFMAS_IS_SET(env) && ((FLD_ISSET(mapped, + (REP_C_ELECTIONS | REP_C_2SITE_STRICT)) && on == 0) || + (FLD_ISSET(mapped, REP_C_LEASE) && on > 0))) { + __db_errx(env, DB_STR("3708", + "DB_ENV->rep_set_config: cannot %s %s " + "in preferred master mode"), + on == 0 ? "disable" : "enable", + FLD_ISSET(mapped, REP_C_ELECTIONS) ? "elections" : + FLD_ISSET(mapped, REP_C_LEASE) ? "leases" : + "2SITE_STRICT"); + ENV_LEAVE(env, ip); + return (EINVAL); + } + /* * Leases must be turned on before calling rep_start. * Leases can never be turned off once they're turned on. */ @@ -252,6 +320,17 @@ __rep_set_config(dbenv, which, on) else FLD_CLR(rep->config, mapped); +#ifdef HAVE_REPLICATION_THREADS + /* Do automatic preferred master configuration. */ + if (TURNING_ON_PREFMAS(orig, rep->config) && + (pm_ret = __repmgr_prefmas_auto_config(dbenv, + &rep->config)) != 0) { + REP_SYSTEM_UNLOCK(env); + MUTEX_UNLOCK(env, rep->mtx_clientdb); + ENV_LEAVE(env, ip); + goto prefmas_err; + } +#endif /* * Bulk transfer requires special processing if it is getting * toggled. @@ -297,10 +376,25 @@ __rep_set_config(dbenv, which, on) ret = t_ret; #endif } else { + orig = db_rep->config; if (on) FLD_SET(db_rep->config, mapped); else FLD_CLR(db_rep->config, mapped); +#ifdef HAVE_REPLICATION_THREADS + /* Do automatic preferred master configuration. */ + if (TURNING_ON_PREFMAS(orig, db_rep->config)) + pm_ret = + __repmgr_prefmas_auto_config(dbenv, + &db_rep->config); +#endif + } +prefmas_err: + if (pm_ret != 0) { + __db_errx(env, DB_STR("3709", + "DB_ENV->rep_set_config: could not complete automatic " + "preferred master configuration")); + ret = EINVAL; } /* Configuring 2SITE_STRICT, etc. makes this a repmgr application */ if (ret == 0 && FLD_ISSET(mapped, REPMGR_FLAGS)) @@ -331,6 +425,10 @@ __rep_config_map(env, inflagsp, outflagsp) FLD_SET(*outflagsp, REP_C_DELAYCLIENT); FLD_CLR(*inflagsp, DB_REP_CONF_DELAYCLIENT); } + if (FLD_ISSET(*inflagsp, DB_REP_CONF_ELECT_LOGLENGTH)) { + FLD_SET(*outflagsp, REP_C_ELECT_LOGLENGTH); + FLD_CLR(*inflagsp, DB_REP_CONF_ELECT_LOGLENGTH); + } if (FLD_ISSET(*inflagsp, DB_REP_CONF_INMEM)) { FLD_SET(*outflagsp, REP_C_INMEM); FLD_CLR(*inflagsp, DB_REP_CONF_INMEM); @@ -351,6 +449,14 @@ __rep_config_map(env, inflagsp, outflagsp) FLD_SET(*outflagsp, REP_C_ELECTIONS); FLD_CLR(*inflagsp, DB_REPMGR_CONF_ELECTIONS); } + if (FLD_ISSET(*inflagsp, DB_REPMGR_CONF_PREFMAS_CLIENT)) { + FLD_SET(*outflagsp, REP_C_PREFMAS_CLIENT); + FLD_CLR(*inflagsp, DB_REPMGR_CONF_PREFMAS_CLIENT); + } + if (FLD_ISSET(*inflagsp, DB_REPMGR_CONF_PREFMAS_MASTER)) { + FLD_SET(*outflagsp, REP_C_PREFMAS_MASTER); + FLD_CLR(*inflagsp, DB_REPMGR_CONF_PREFMAS_MASTER); + } DB_ASSERT(env, *inflagsp == 0); } @@ -368,8 +474,10 @@ __rep_start_pp(dbenv, dbt, flags) DBT *dbt; u_int32_t flags; { - DB_REP *db_rep; ENV *env; + DB_REP *db_rep; + DB_THREAD_INFO *ip; + int ret; env = dbenv->env; db_rep = env->rep_handle; @@ -400,7 +508,11 @@ __rep_start_pp(dbenv, dbt, flags) return (EINVAL); } - return (__rep_start_int(env, dbt, flags)); + ENV_ENTER(env, ip); + ret = __rep_start_int(env, dbt, flags, 0); + ENV_LEAVE(env, ip); + + return (ret); } /* @@ -432,13 +544,14 @@ __rep_start_pp(dbenv, dbt, flags) * clients that reference non-existent files whose creation was backed out * during a synchronizing recovery. * - * PUBLIC: int __rep_start_int __P((ENV *, DBT *, u_int32_t)); + * PUBLIC: int __rep_start_int __P((ENV *, DBT *, u_int32_t, u_int32_t)); */ int -__rep_start_int(env, dbt, flags) +__rep_start_int(env, dbt, flags, startopts) ENV *env; DBT *dbt; u_int32_t flags; + u_int32_t startopts; { DB *dbp; DB_LOG *dblp; @@ -474,9 +587,31 @@ __rep_start_int(env, dbt, flags) return (EINVAL); } - ENV_ENTER(env, ip); + /* + * If we are a view, we can never become master. + */ + if (IS_VIEW_SITE(env) && role == DB_REP_MASTER) { + __db_errx(env, DB_STR("3685", + "View site cannot become master")); + return (EINVAL); + } + + /* + * Check for consistent view usage. We need to check here rather + * than in __rep_open because non-rep-aware processes such as + * db_stat may open/join the environment. Rep-aware handles must + * consistently set the view. + */ + if ((ret = __rep_check_view(env)) != 0) { + RPRINT(env, (env, DB_VERB_REP_MISC, + "Application env/view mismatch.")); + __db_errx(env, DB_STR("3686", + "Application environment and view callback mismatch")); + return (ret); + } /* Serialize rep_start() calls. */ + ENV_GET_THREAD_INFO(env, ip); MUTEX_LOCK(env, rep->mtx_repstart); start_th = 1; @@ -492,8 +627,14 @@ __rep_start_int(env, dbt, flags) goto out; REP_SYSTEM_LOCK(env); + /* + * The FORCE_ROLECHG option is used when a side-effect of the role + * change such as incrementing the master gen is needed regardless + * of the previous role. + */ role_chg = (!F_ISSET(rep, REP_F_MASTER) && role == DB_REP_MASTER) || - (!F_ISSET(rep, REP_F_CLIENT) && role == DB_REP_CLIENT); + (!F_ISSET(rep, REP_F_CLIENT) && role == DB_REP_CLIENT) || + FLD_ISSET(startopts, REP_START_FORCE_ROLECHG); /* * There is no need for lockout if all we're doing is sending a message. @@ -511,9 +652,11 @@ __rep_start_int(env, dbt, flags) goto out; } - if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_MSG)) { + if (!FLD_ISSET(startopts, REP_START_WAIT_LOCKMSG) && + FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_MSG)) { /* - * There is already someone in msg lockout. Return. + * There is already someone in msg lockout and we are not + * waiting. Return. */ RPRINT(env, (env, DB_VERB_REP_MISC, "Thread already in msg lockout")); @@ -702,10 +845,15 @@ __rep_start_int(env, dbt, flags) * now defunct on master. * NEWFILE: Used to delay client apply during newfile * operation, not applicable to master. + * READONLY_MASTER: Used to coordinate preferred master + * takeover, should not remain in effect after restart. + * HOLD_GEN: Freeze gen for preferred master, should not + * remain in effect after restart. */ F_CLR(rep, REP_F_CLIENT | REP_F_ABBREVIATED | REP_F_MASTERELECT | REP_F_SKIPPED_APPLY | REP_F_DELAY | - REP_F_LEASE_EXPIRED | REP_F_NEWFILE); + REP_F_LEASE_EXPIRED | REP_F_NEWFILE | + REP_F_READONLY_MASTER | REP_F_HOLD_GEN); /* * When becoming a master, set the following flags: * MASTER: Indicate that this site is master. @@ -842,11 +990,16 @@ __rep_start_int(env, dbt, flags) } /* * When becoming a client, clear the following flags: + * HOLD_GEN: Freeze gen for preferred master, should not + * remain in effect after restart. * MASTER: Site is no longer a master. * MASTERELECT: Indicates that a master is elected * rather than appointed, not applicable on client. + * READONLY_MASTER: Used to coordinate preferred master + * takeover, should not remain in effect after restart. */ - F_CLR(rep, REP_F_MASTER | REP_F_MASTERELECT); + F_CLR(rep, REP_F_HOLD_GEN | REP_F_MASTER | REP_F_MASTERELECT | + REP_F_READONLY_MASTER); F_SET(rep, REP_F_CLIENT); /* @@ -928,6 +1081,15 @@ __rep_start_int(env, dbt, flags) * sync with the master. */ SET_GEN(0); + /* + * If we are changing role to client, reset our min log file + * until we hear from a master or another client. In + * particular, in a dupmaster situation, if this site loses + * an election a stale min_log_file would prevent archiving. + */ +#ifdef HAVE_REPLICATION_THREADS + rep->min_log_file = 0; +#endif REP_SYSTEM_UNLOCK(env); /* @@ -935,6 +1097,15 @@ __rep_start_int(env, dbt, flags) */ if ((ret = __dbt_usercopy(env, dbt)) != 0) goto out; + /* + * The HOLD_CLIGEN option does not allow this client's + * gen to change until the REP_F_HOLD_GEN flag is cleared. + * It prevents this site from responding to NEWMASTER messages + * and disables updating the gen from other incoming messages. + */ + if (FLD_ISSET(startopts, REP_START_HOLD_CLIGEN)) + F_SET(rep, REP_F_HOLD_GEN); + (void)__rep_send_message(env, DB_EID_BROADCAST, REP_NEWCLIENT, NULL, dbt, 0, 0); } @@ -967,7 +1138,6 @@ out: if (start_th) MUTEX_UNLOCK(env, rep->mtx_repstart); __dbt_userfree(env, dbt, NULL, NULL); - ENV_LEAVE(env, ip); return (ret); } @@ -1170,6 +1340,9 @@ __rep_client_dbinit(env, startup, which) if (which == REP_DB) { name = REPDBNAME; rdbpp = &db_rep->rep_db; + } else if (which == REP_BLOB) { + name = REPBLOBNAME; + rdbpp = &db_rep->blob_dbp; } else { name = REPPAGENAME; rdbpp = &db_rep->file_dbp; @@ -1209,16 +1382,28 @@ __rep_client_dbinit(env, startup, which) if (which == REP_DB && (ret = __bam_set_bt_compare(dbp, __rep_bt_cmp)) != 0) goto err; + if (which == REP_BLOB && + (ret = __bam_set_bt_compare(dbp, __rep_blob_cmp)) != 0 && + (ret = __db_set_dup_compare(dbp, __rep_offset_cmp)) != 0) + goto err; /* Don't write log records on the client. */ if ((ret = __db_set_flags(dbp, DB_TXN_NOT_DURABLE)) != 0) goto err; + /* Blob gap processing requires sorted duplicates. */ + if (which == REP_BLOB) { + if ((ret = __db_set_blob_threshold(dbp, 0, 0)) != 0) + goto err; + if ((ret = __db_set_flags(dbp, DB_DUPSORT)) != 0) + goto err; + } + flags = DB_NO_AUTO_COMMIT | DB_CREATE | DB_INTERNAL_TEMPORARY_DB | (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0); if ((ret = __db_open(dbp, ip, NULL, fname, subdb, - (which == REP_DB ? DB_BTREE : DB_RECNO), + (which == REP_PG ? DB_RECNO : DB_BTREE), flags, 0, PGNO_BASE_MD)) != 0) goto err; @@ -1243,14 +1428,16 @@ err: if (dbp != NULL && * care about the LSNs. */ static int -__rep_bt_cmp(dbp, dbt1, dbt2) +__rep_bt_cmp(dbp, dbt1, dbt2, locp) DB *dbp; const DBT *dbt1, *dbt2; + size_t *locp; { DB_LSN lsn1, lsn2; __rep_control_args *rp1, *rp2; COMPQUIET(dbp, NULL); + COMPQUIET(locp, NULL); rp1 = dbt1->data; rp2 = dbt2->data; @@ -1274,6 +1461,82 @@ __rep_bt_cmp(dbp, dbt1, dbt2) } /* + * __rep_blob_cmp -- + * + * Comparison function for the blob gap database. The key is the blob_sid + * appended with the blob_id. + * + * PUBLIC: int __rep_blob_cmp __P((DB *, const DBT *, const DBT *, size_t *)); + */ +int +__rep_blob_cmp(dbp, dbt1, dbt2, locp) + DB *dbp; + const DBT *dbt1, *dbt2; + size_t *locp; +{ + db_seq_t blob_id1, blob_id2, blob_sid1, blob_sid2; + u_int8_t *p; + + COMPQUIET(dbp, NULL); + COMPQUIET(locp, NULL); + + /* Use memcpy here to prevent alignment issues. */ + p = dbt1->data; + memcpy(&blob_sid1, p, sizeof(db_seq_t)); + p += sizeof(db_seq_t); + memcpy(&blob_id1, p, sizeof(db_seq_t)); + p = dbt2->data; + memcpy(&blob_sid2, p, sizeof(db_seq_t)); + p += sizeof(db_seq_t); + memcpy(&blob_id2, p, sizeof(db_seq_t)); + + if (blob_sid1 > blob_sid2) + return (1); + + if (blob_sid1 < blob_sid2) + return (-1); + + if (blob_id1 > blob_id2) + return (1); + + if (blob_id1 < blob_id2) + return (-1); + + return (0); +} + +/* + * __rep_offset_cmp -- + * + * Comparison function for duplicates in the the blob gap database. + * + * PUBLIC: int __rep_offset_cmp + * PUBLIC: __P((DB *, const DBT *, const DBT *, size_t *)); + */ +int +__rep_offset_cmp(dbp, dbt1, dbt2, locp) + DB *dbp; + const DBT *dbt1, *dbt2; + size_t *locp; +{ + off_t offset1, offset2; + + COMPQUIET(dbp, NULL); + COMPQUIET(locp, NULL); + + /* Use memcpy here to prevent alignment issues. */ + memcpy(&offset1, dbt1->data, sizeof(off_t)); + memcpy(&offset2, dbt2->data, sizeof(off_t)); + + if (offset1 == offset2) + return (0); + else if (offset1 > offset2) + return (1); + + return (-1); +} + +/* * __rep_abort_prepared -- * Abort any prepared transactions that recovery restored. * @@ -1684,7 +1947,10 @@ __rep_set_nsites_pp(dbenv, n) "DB_ENV->rep_set_nsites: cannot call from Replication Manager application")); return (EINVAL); } - if ((ret = __rep_set_nsites_int(env, n)) == 0) + ENV_ENTER(env, ip); + ret = __rep_set_nsites_int(env, n); + ENV_LEAVE(env, ip); + if (ret == 0) APP_SET_BASEAPI(env); return (ret); } @@ -1748,18 +2014,15 @@ __rep_get_nsites(dbenv, n) } /* - * PUBLIC: int __rep_set_priority __P((DB_ENV *, u_int32_t)); + * PUBLIC: int __rep_set_priority_pp __P((DB_ENV *, u_int32_t)); */ int -__rep_set_priority(dbenv, priority) +__rep_set_priority_pp(dbenv, priority) DB_ENV *dbenv; u_int32_t priority; { DB_REP *db_rep; ENV *env; - REP *rep; - u_int32_t prev; - int ret; env = dbenv->env; db_rep = env->rep_handle; @@ -1767,6 +2030,30 @@ __rep_set_priority(dbenv, priority) ENV_NOT_CONFIGURED( env, db_rep->region, "DB_ENV->rep_set_priority", DB_INIT_REP); + if (PREFMAS_IS_SET(env)) { + __db_errx(env, DB_STR_A("3710", +"%s: cannot change priority in preferred master mode.", + "%s"), "DB_ENV->rep_set_priority"); + return (EINVAL); + } + + return (__rep_set_priority_int(env, priority)); +} + +/* + * PUBLIC: int __rep_set_priority_int __P((ENV *, u_int32_t)); + */ +int +__rep_set_priority_int(env, priority) + ENV *env; + u_int32_t priority; +{ + DB_REP *db_rep; + REP *rep; + u_int32_t prev; + int ret; + + db_rep = env->rep_handle; ret = 0; if (REP_ON(env)) { rep = db_rep->region; @@ -1807,10 +2094,10 @@ __rep_get_priority(dbenv, priority) } /* - * PUBLIC: int __rep_set_timeout __P((DB_ENV *, int, db_timeout_t)); + * PUBLIC: int __rep_set_timeout_pp __P((DB_ENV *, int, db_timeout_t)); */ int -__rep_set_timeout(dbenv, which, timeout) +__rep_set_timeout_pp(dbenv, which, timeout) DB_ENV *dbenv; int which; db_timeout_t timeout; @@ -1818,13 +2105,10 @@ __rep_set_timeout(dbenv, which, timeout) DB_REP *db_rep; DB_THREAD_INFO *ip; ENV *env; - REP *rep; int repmgr_timeout, ret; env = dbenv->env; db_rep = env->rep_handle; - rep = db_rep->region; - ret = 0; repmgr_timeout = 0; if (timeout == 0 && (which == DB_REP_CONNECTION_RETRY || @@ -1850,12 +2134,46 @@ __rep_set_timeout(dbenv, which, timeout) return (EINVAL); } if (which == DB_REP_LEASE_TIMEOUT && IS_REP_STARTED(env)) { - ret = EINVAL; __db_errx(env, DB_STR_A("3568", "%s: lease timeout must be set before DB_ENV->rep_start.", "%s"), "DB_ENV->rep_set_timeout"); return (EINVAL); } + if (PREFMAS_IS_SET(env) && + (which == DB_REP_HEARTBEAT_MONITOR || + which == DB_REP_HEARTBEAT_SEND) && + timeout == 0) { + __db_errx(env, DB_STR_A("3711", +"%s: cannot turn off heartbeat timeout in preferred master mode.", + "%s"), "DB_ENV->rep_set_timeout"); + return (EINVAL); + } + + ret = __rep_set_timeout_int(env, which, timeout); + + /* Setting a repmgr timeout makes this a repmgr application */ + if (ret == 0 && repmgr_timeout) + APP_SET_REPMGR(env); + return (ret); + +} + +/* + * PUBLIC: int __rep_set_timeout_int __P((ENV *, int, db_timeout_t)); + */ +int +__rep_set_timeout_int(env, which, timeout) + ENV *env; + int which; + db_timeout_t timeout; +{ + DB_REP *db_rep; + REP *rep; + int ret; + + db_rep = env->rep_handle; + rep = db_rep->region; + ret = 0; switch (which) { case DB_REP_CHECKPOINT_DELAY: @@ -1888,6 +2206,7 @@ __rep_set_timeout(dbenv, which, timeout) rep->ack_timeout = timeout; else db_rep->ack_timeout = timeout; + ADJUST_AUTOTAKEOVER_WAITS(db_rep, timeout); break; case DB_REP_CONNECTION_RETRY: if (REP_ON(env)) @@ -1919,10 +2238,6 @@ __rep_set_timeout(dbenv, which, timeout) "Unknown timeout type argument to DB_ENV->rep_set_timeout")); ret = EINVAL; } - - /* Setting a repmgr timeout makes this a repmgr application */ - if (ret == 0 && repmgr_timeout) - APP_SET_REPMGR(env); return (ret); } @@ -2099,6 +2414,144 @@ __rep_set_request(dbenv, min, max) } /* + * __rep_set_view -- + * Set the view/partial replication function. + * + * PUBLIC: int __rep_set_view __P((DB_ENV *, + * PUBLIC: int (*)(DB_ENV *, const char *, int *, u_int32_t))); + */ +int +__rep_set_view(dbenv, f_partial) + DB_ENV *dbenv; + int (*f_partial) __P((DB_ENV *, + const char *, int *, u_int32_t)); +{ + DB_REP *db_rep; + ENV *env; + + env = dbenv->env; + db_rep = env->rep_handle; + + ENV_NOT_CONFIGURED( + env, db_rep->region, "DB_ENV->rep_set_view", DB_INIT_REP); + + ENV_ILLEGAL_AFTER_OPEN(env, "DB_ENV->rep_set_view"); + + if (f_partial == NULL) + db_rep->partial = __rep_defview; + else + db_rep->partial = f_partial; + return (0); +} + +/* + * __rep_defview -- + * Default view function. Always replicate. + */ +static int +__rep_defview(dbenv, name, result, flags) + DB_ENV *dbenv; + const char *name; + int *result; + u_int32_t flags; +{ + COMPQUIET(dbenv, NULL); + COMPQUIET(name, NULL); + COMPQUIET(flags, 0); + *result = 1; + return (0); +} + +/* + * __rep_call_partial -- + * Calls the partial function, after doing some checks required for + * handling blobs. + * + * PUBLIC: int __rep_call_partial + * PUBLIC: __P((ENV *, const char *, int *, u_int32_t, DELAYED_BLOB_LIST **)); + */ +int +__rep_call_partial(env, name, result, flags, lsp) + ENV *env; + const char *name; + int *result; + u_int32_t flags; + DELAYED_BLOB_LIST **lsp; +{ + DB_LOG *dblp; + DB_REP *db_rep; + DELAYED_BLOB_LIST *dbl; + FNAME *fname; + db_seq_t blob_file_id; + char *file_name; + int ret; + + ret = 0; + blob_file_id = 0; + db_rep = env->rep_handle; + dblp = env->lg_handle; + fname = NULL; + + /* + * If the database being sent is a blob meta database or file, then the + * name of its associated database needs to be passed to the partial + * function. To do this, use the blob file id in the path to the + * file to look up the blob_file_id of the associated database. That + * can be used to look up the name of the associated database through + * dbreg. + */ + if (db_rep->partial == __rep_defview || + (!IS_BLOB_META(name) && !IS_BLOB_FILE(name))) { + ret = db_rep->partial(env->dbenv, name, result, flags); + } else { + /* + * The top level blob meta database must always be replicated. + */ + if (strcmp(name, BLOB_META_FILE_NAME) == 0) { + *result = 1; + return (ret); + } + if ((ret = __blob_path_to_dir_ids( + env, name, &blob_file_id, NULL)) != 0) + return (ret); + DB_ASSERT(env, blob_file_id > 0); + + /* + * It is possible that the database that owns this blob meta + * database has not yet been processed on the client when + * processing the transaction, so assume it is not replicated. + * Return its information and process it later when its + * owning database is processed (which must happen in the + * same transaction). + */ + if (__dbreg_blob_file_to_fname( + dblp, blob_file_id, 0, &fname) != 0) { + if ((ret = __os_malloc( + env, sizeof(DELAYED_BLOB_LIST), &dbl)) != 0) + return (ret); + memset(dbl, 0, sizeof(DELAYED_BLOB_LIST)); + dbl->blob_file_id = blob_file_id; + if (*lsp == NULL) + *lsp = dbl; + else { + dbl->next = *lsp; + (*lsp)->prev = dbl; + *lsp = dbl; + } + *result = 0; + return (0); + } + + file_name = fname->fname_off == INVALID_ROFF ? + NULL : R_ADDR(&dblp->reginfo, fname->fname_off); + DB_ASSERT(env, file_name != NULL); + ret = db_rep->partial(env->dbenv, file_name, result, flags); + } + + return (ret); +} + +/* * __rep_set_transport_pp -- * Set the transport function for replication. * @@ -2288,25 +2741,46 @@ __rep_set_clockskew(dbenv, fast_clock, slow_clock) } /* - * __rep_flush -- + * __rep_flush_pp -- * Re-push the last log record to all clients, in case they've lost * messages and don't know it. * - * PUBLIC: int __rep_flush __P((DB_ENV *)); + * PUBLIC: int __rep_flush_pp __P((DB_ENV *)); */ int -__rep_flush(dbenv) +__rep_flush_pp (dbenv) DB_ENV *dbenv; { + ENV *env; + DB_THREAD_INFO *ip; + int ret; + + env = dbenv->env; + + ENV_ENTER(env, ip); + ret = __rep_flush_int(env); + ENV_LEAVE(env, ip); + + return (ret); +} + +/* + * __rep_flush_int -- + * Re-push the last log record to all clients, in case they've lost + * messages and don't know it. + * + * PUBLIC: int __rep_flush_int __P((ENV *)); + */ +int +__rep_flush_int(env) + ENV *env; +{ DBT rec; DB_LOGC *logc; DB_LSN lsn; DB_REP *db_rep; - DB_THREAD_INFO *ip; - ENV *env; int ret, t_ret; - env = dbenv->env; db_rep = env->rep_handle; ENV_REQUIRES_CONFIG_XX( @@ -2322,8 +2796,6 @@ __rep_flush(dbenv) return (EINVAL); } - ENV_ENTER(env, ip); - if ((ret = __log_cursor(env, &logc)) != 0) return (ret); @@ -2338,7 +2810,6 @@ __rep_flush(dbenv) err: if ((t_ret = __logc_close(logc)) != 0 && ret == 0) ret = t_ret; - ENV_LEAVE(env, ip); return (ret); } @@ -2693,7 +3164,7 @@ __rep_check_applied(env, ip, commit_info, reasonp) */ if (commit_info->gen == gen) { ret = __rep_read_lsn_history(env, - ip, &txn, &dbc, gen, &hist, reasonp, DB_SET); + ip, &txn, &dbc, gen, &hist, reasonp, DB_SET, 1); if (ret == DB_NOTFOUND) { /* * We haven't yet received the LSN history of the @@ -2720,7 +3191,7 @@ __rep_check_applied(env, ip, commit_info, reasonp) * masters at the same gen, and the txn of interest was * rolled back. */ - ret = DB_NOTFOUND; + ret = USR_ERR(env, DB_NOTFOUND); goto out; } @@ -2750,7 +3221,7 @@ __rep_check_applied(env, ip, commit_info, reasonp) * description of the txn of interest doesn't match what we see * in the history available to us now. */ - ret = DB_NOTFOUND; + ret = USR_ERR(env, DB_NOTFOUND); } else if (commit_info->gen < gen || gen == 0) { /* @@ -2759,10 +3230,10 @@ __rep_check_applied(env, ip, commit_info, reasonp) * the token LSN is within the close/open range defined by * [base,next). */ - ret = __rep_read_lsn_history(env, - ip, &txn, &dbc, commit_info->gen, &hist, reasonp, DB_SET); - t_ret = __rep_read_lsn_history(env, - ip, &txn, &dbc, commit_info->gen, &hist2, reasonp, DB_NEXT); + ret = __rep_read_lsn_history(env, ip, + &txn, &dbc, commit_info->gen, &hist, reasonp, DB_SET, 1); + t_ret = __rep_read_lsn_history(env, ip, + &txn, &dbc, commit_info->gen, &hist2, reasonp, DB_NEXT, 1); if (ret == DB_NOTFOUND) { /* * If the desired gen is not in our database, it could @@ -2812,7 +3283,7 @@ __rep_check_applied(env, ip, commit_info, reasonp) * don't match, meaning the txn was written at a dup * master and that gen instance was rolled back. */ - ret = DB_NOTFOUND; + ret = USR_ERR(env, DB_NOTFOUND); goto out; } @@ -2837,7 +3308,7 @@ __rep_check_applied(env, ip, commit_info, reasonp) LOG_COMPARE(&commit_info->lsn, &hist2.lsn) < 0) ret = 0; else - ret = DB_NOTFOUND; + ret = USR_ERR(env, DB_NOTFOUND); } else { /* * Token names a future gen. If we're a client and the LSN also @@ -2851,7 +3322,7 @@ __rep_check_applied(env, ip, commit_info, reasonp) reasonp->u.gen = commit_info->gen; return (DB_TIMEOUT); } - return (DB_NOTFOUND); + return (USR_ERR(env, DB_NOTFOUND)); } out: @@ -2867,9 +3338,19 @@ out: /* * The txn and dbc handles are owned by caller, though we create them if * necessary. Caller is responsible for closing them. + * + * The use_cache option is enabled for the read-your-writes feature, which + * makes frequent requests for the cached information (envid and lsn) when it + * is in use. Callers that require information that is not cached (e.g. + * timestamp) should not set use_cache. + * + * PUBLIC: int __rep_read_lsn_history __P((ENV *, DB_THREAD_INFO *, DB_TXN **, + * PUBLIC: DBC **, u_int32_t, __rep_lsn_hist_data_args *, + * PUBLIC: struct rep_waitgoal *, u_int32_t, int)); */ -static int -__rep_read_lsn_history(env, ip, txn, dbc, gen, gen_infop, reasonp, flags) +int +__rep_read_lsn_history(env, ip, txn, dbc, gen, gen_infop, reasonp, flags, + use_cache) ENV *env; DB_THREAD_INFO *ip; DB_TXN **txn; @@ -2878,6 +3359,7 @@ __rep_read_lsn_history(env, ip, txn, dbc, gen, gen_infop, reasonp, flags) __rep_lsn_hist_data_args *gen_infop; struct rep_waitgoal *reasonp; u_int32_t flags; + int use_cache; { DB_REP *db_rep; REP *rep; @@ -2898,7 +3380,8 @@ __rep_read_lsn_history(env, ip, txn, dbc, gen, gen_infop, reasonp, flags) /* Simply return cached info, if we already have it. */ desired_gen = flags == DB_SET ? gen : gen + 1; REP_SYSTEM_LOCK(env); - if (rep->gen == desired_gen && !IS_ZERO_LSN(rep->gen_base_lsn)) { + if (use_cache && rep->gen == desired_gen && + !IS_ZERO_LSN(rep->gen_base_lsn)) { gen_infop->lsn = rep->gen_base_lsn; gen_infop->envid = rep->master_envid; goto unlock; @@ -3005,8 +3488,14 @@ __rep_conv_vers(env, log_ver) /* * We can't use a switch statement, some of the DB_LOGVERSION_XX - * constants are the same + * constants are the same. */ + if (log_ver == DB_LOGVERSION_61) + return (DB_REPVERSION_61); + if (log_ver == DB_LOGVERSION_60p1) + return (DB_REPVERSION_60); + if (log_ver == DB_LOGVERSION_60) + return (DB_REPVERSION_60); if (log_ver == DB_LOGVERSION_53) return (DB_REPVERSION_53); if (log_ver == DB_LOGVERSION_52) diff --git a/src/rep/rep_record.c b/src/rep/rep_record.c index f4691974..b206e60e 100644 --- a/src/rep/rep_record.c +++ b/src/rep/rep_record.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -9,13 +9,17 @@ #include "db_config.h" #include "db_int.h" +#include "dbinc/blob.h" #include "dbinc/db_page.h" #include "dbinc/db_am.h" #include "dbinc/lock.h" #include "dbinc/mp.h" #include "dbinc/txn.h" -static int __rep_collect_txn __P((ENV *, DB_LSN *, LSN_COLLECTION *)); +static int __rep_collect_txn + __P((ENV *, DB_LSN *, LSN_COLLECTION *, DELAYED_BLOB_LIST **)); +static int __rep_remove_delayed_blobs + __P((ENV *, db_seq_t, u_int32_t ,DELAYED_BLOB_LIST **)); static int __rep_do_ckp __P((ENV *, DBT *, __rep_control_args *)); static int __rep_fire_newmaster __P((ENV *, u_int32_t, int)); static int __rep_fire_startupdone __P((ENV *, u_int32_t, int)); @@ -153,6 +157,7 @@ __rep_process_message_pp(dbenv, control, rec, eid, ret_lsnp) DB_LSN *ret_lsnp; { ENV *env; + DB_THREAD_INFO *ip; int ret; env = dbenv->env; @@ -193,7 +198,9 @@ __rep_process_message_pp(dbenv, control, rec, eid, ret_lsnp) return (ret); } + ENV_ENTER(env, ip); ret = __rep_process_message_int(env, control, rec, eid, ret_lsnp); + ENV_LEAVE(env, ip); __dbt_userfree(env, control, rec, NULL); return (ret); @@ -289,8 +296,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp) if (ret_lsnp != NULL) ZERO_LSN(*ret_lsnp); - ENV_ENTER(env, ip); - + ENV_GET_THREAD_INFO(env, ip); REP_PRINT_MESSAGE(env, eid, rp, "rep_process_message", 0); /* * Check the version number for both rep and log. If it is @@ -303,8 +309,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp) "%lu %d"), (u_long)rp->rep_version, DB_REPVERSION_MIN); - ret = EINVAL; - goto errlock; + return (EINVAL); } VPRINT(env, (env, DB_VERB_REP_MSGS, "Received record %lu with old rep version %lu", @@ -322,8 +327,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp) __db_errx(env, DB_STR_A("3517", "unexpected replication message version %lu, expected %d", "%lu %d"), (u_long)rp->rep_version, DB_REPVERSION); - ret = EINVAL; - goto errlock; + return (EINVAL); } if (rp->log_version < DB_LOGVERSION) { @@ -332,8 +336,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp) "unsupported old replication log version %lu, minimum version %d", "%lu %d"), (u_long)rp->log_version, DB_LOGVERSION_MIN); - ret = EINVAL; - goto errlock; + return (EINVAL); } VPRINT(env, (env, DB_VERB_REP_MSGS, "Received record %lu with old log version %lu", @@ -342,8 +345,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp) __db_errx(env, DB_STR_A("3519", "unexpected log record version %lu, expected %d", "%lu %d"), (u_long)rp->log_version, DB_LOGVERSION); - ret = EINVAL; - goto errlock; + return (EINVAL); } /* @@ -465,9 +467,14 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp) * accept the generation number and participate in future * elections and communication. Otherwise, I need to hear about * a new master and sync up. + * + * But do not do any of this if REP_F_HOLD_GEN is set. In + * this case we keep the site at its current gen until we + * clear this flag. */ - if (rp->rectype == REP_ALIVE || - rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) { + if ((rp->rectype == REP_ALIVE || + rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) && + !F_ISSET(rep, REP_F_HOLD_GEN)) { REP_SYSTEM_LOCK(env); RPRINT(env, (env, DB_VERB_REP_MSGS, "Updating gen from %lu to %lu", @@ -593,6 +600,38 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp) ret = __rep_allreq(env, rp, eid); CLIENT_REREQ; break; + case REP_BLOB_ALL_REQ: + /* Blobs do not support peer-to-peer. */ + RECOVERING_SKIP; + MASTER_ONLY(rep, rp); + ret = __rep_blob_allreq(env, eid, rec); + CLIENT_REREQ; + break; + case REP_BLOB_CHUNK: + /* Handle even if in recovery. */ + CLIENT_ONLY(rep, rp); + ret = __rep_blob_chunk(env, eid, ip, rec); + if (ret == DB_REP_PAGEDONE) + ret = 0; + break; + case REP_BLOB_CHUNK_REQ: + /* Blobs do not support peer-to-peer. */ + RECOVERING_SKIP; + MASTER_ONLY(rep, rp); + ret = __rep_blob_chunk_req(env, eid, rec); + CLIENT_REREQ; + break; + case REP_BLOB_UPDATE: + CLIENT_ONLY(rep, rp); + ret = __rep_blob_update(env, eid, ip, rec); + break; + case REP_BLOB_UPDATE_REQ: + MASTER_ONLY(rep, rp); + infop = env->reginfo; + renv = infop->primary; + MASTER_UPDATE(env, renv); + ret = __rep_blob_update_req(env, ip, rec); + break; case REP_BULK_LOG: RECOVERING_LOG_SKIP; CLIENT_ONLY(rep, rp); @@ -1059,8 +1098,6 @@ out: *ret_lsnp = rp->lsn; ret = DB_REP_NOTPERM; } - __dbt_userfree(env, control, rec, NULL); - ENV_LEAVE(env, ip); return (ret); } @@ -1290,8 +1327,24 @@ gap_check: #endif } - if (ret == DB_KEYEXIST) + if (ret == DB_KEYEXIST) { + STAT(rep->stat.st_log_duplicated++); +#ifdef CONFIG_TEST + STAT(rep->stat.st_log_futuredup++); +#endif + if (is_dupp != NULL) { + *is_dupp = 1; + /* + * Could get overwritten by max_lsn later, + * but only when returning NOTPERM for a + * REPCTL_PERM record, in which case max_lsn + * is this log record. + */ + if (ret_lsnp != NULL) + *ret_lsnp = lp->ready_lsn; + } ret = 0; + } if (ret != 0 && ret != ENOMEM) goto done; @@ -1337,10 +1390,11 @@ gap_check: * But max_lsn is guaranteed <= ready_lsn, so * it would be a more conservative LSN to return. */ - *ret_lsnp = lp->ready_lsn; + if (ret_lsnp != NULL) + *ret_lsnp = lp->ready_lsn; } LOGCOPY_32(env, &rectype, rec->data); - if (rectype == DB___txn_regop || rectype == DB___txn_ckp) + if (IS_PERM_RECTYPE(rectype)) max_lsn = lp->max_perm_lsn; /* * We check REPCTL_LEASE here, because this client may @@ -1536,6 +1590,7 @@ __rep_process_txn(env, rec) DB_REP *db_rep; DB_THREAD_INFO *ip; DB_TXNHEAD *txninfo; + DELAYED_BLOB_LIST *dblp, *dummy; LSN_COLLECTION lc; REP *rep; __txn_regop_args *txn_args; @@ -1548,12 +1603,12 @@ __rep_process_txn(env, rec) db_rep = env->rep_handle; rep = db_rep->region; logc = NULL; + dblp = dummy = NULL; txn_args = NULL; txn42_args = NULL; prep_args = NULL; txninfo = NULL; - ENV_ENTER(env, ip); memset(&data_dbt, 0, sizeof(data_dbt)); if (F_ISSET(env, ENV_THREAD)) F_SET(&data_dbt, DB_DBT_REALLOC); @@ -1618,8 +1673,19 @@ __rep_process_txn(env, rec) goto err; /* Phase 1. Get a list of the LSNs in this transaction, and sort it. */ - if ((ret = __rep_collect_txn(env, &prev_lsn, &lc)) != 0) + if ((ret = __rep_collect_txn(env, &prev_lsn, &lc, &dblp)) != 0) goto err; + /* Deal with any child transactions that had to be delayed. */ + while (dblp != NULL) { + if ((ret = __rep_collect_txn( + env, &dblp->lsn, &lc, &dummy)) != 0) + goto err; + DB_ASSERT(env, dummy == NULL); + dummy = dblp; + dblp = dummy->next; + __os_free(env, dummy); + dummy = NULL; + } qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp); /* @@ -1627,6 +1693,7 @@ __rep_process_txn(env, rec) * records. Create a txnlist so that they can keep track of file * state between records. */ + ENV_GET_THREAD_INFO(env, ip); if ((ret = __db_txnlist_init(env, ip, 0, 0, NULL, &txninfo)) != 0) goto err; @@ -1647,6 +1714,7 @@ __rep_process_txn(env, rec) (u_long)lsnp->file, (u_long)lsnp->offset); goto err; } + LOGCOPY_32(env, &rectype, data_dbt.data); } err: memset(&req, 0, sizeof(req)); @@ -1658,6 +1726,12 @@ err: memset(&req, 0, sizeof(req)); if ((t_ret = __lock_id_free(env, locker)) != 0 && ret == 0) ret = t_ret; + while (dblp != NULL) { + dummy = dblp; + dblp = dummy->next; + __os_free(env, dummy); + } + err1: if (txn_args != NULL) __os_free(env, txn_args); if (txn42_args != NULL) @@ -1694,25 +1768,52 @@ err1: if (txn_args != NULL) * the entire transaction family at once. */ static int -__rep_collect_txn(env, lsnp, lc) +__rep_collect_txn(env, lsnp, lc, dbl) ENV *env; DB_LSN *lsnp; LSN_COLLECTION *lc; + DELAYED_BLOB_LIST **dbl; { + __dbreg_register_args *dbregargp; __txn_child_args *argp; DB_LOGC *logc; DB_LSN c_lsn; + DB_REP *db_rep; DBT data; - u_int32_t rectype; + db_seq_t blob_file_id; + u_int32_t child, rectype, skip_txnid; u_int nalloc; - int ret, t_ret; + int ret, t_ret, view_partial; + char *name; memset(&data, 0, sizeof(data)); F_SET(&data, DB_DBT_REALLOC); + skip_txnid = TXN_INVALID; if ((ret = __log_cursor(env, &logc)) != 0) return (ret); + /* + * For partial replication we assume a certain sequence of + * log records to detect a database create and skip it if + * desired. We are walking backward through the records of + * a single transaction right now. + * + * A create operation is done inside a BDB-owned child txn. + * Nothing else is done within this BDB-owned child txn. + * The last piece of a create operations is the dbreg_register + * log record that records the opening of the file. That + * log record contains the child txnid in the 'id' field, and + * the file name. At this point we invoke the partial callback + * to determine if this database should be replicated. If it + * should not be replicated, we need to avoid collecting the + * entire child txn referenced in the 'id' field. + * + * So if processing the dbreg_register record finds a database + * to skip, we store the child txnid in 'skip_txnid'. We use + * 'skip_txnid' to avoid processing log records or making + * recursive calls for that txnid. + */ while (!IS_ZERO_LSN(*lsnp) && (ret = __logc_get(logc, lsnp, &data, DB_SET)) == 0) { LOGCOPY_32(env, &rectype, data.data); @@ -1722,9 +1823,66 @@ __rep_collect_txn(env, lsnp, lc) goto err; c_lsn = argp->c_lsn; *lsnp = argp->prev_lsn; + child = argp->child; __os_free(env, argp); - ret = __rep_collect_txn(env, &c_lsn, lc); - } else { + + if (child == skip_txnid && *dbl != NULL && + (*dbl)->child == child) + (*dbl)->lsn = c_lsn; + /* + * If skip_txnid is set, it is the id of the child txnid + * that creates a database we should skip. So, if + * this is that child txn, do not collect it. + */ + if (skip_txnid == TXN_INVALID || child != skip_txnid) + ret = __rep_collect_txn(env, &c_lsn, lc, dbl); + } else if (IS_VIEW_SITE(env) && + rectype == DB___dbreg_register) { + db_rep = env->rep_handle; + /* + * If we are a view see if this is a file creation + * stream. On-disk files have the creating child txn + * in the 'id' field and the name. See if this view + * wants this file. + */ + if ((ret = __dbreg_register_read( + env, data.data, &dbregargp)) != 0) + goto err; + child = dbregargp->id; + name = (char *)dbregargp->name.data; + skip_txnid = TXN_INVALID; + if (child != TXN_INVALID && + (!IS_DB_FILE(name) || IS_BLOB_META(name))) { + /* + * The 'id' has a child txn so it is a create. + */ + DB_ASSERT(env, db_rep->partial != NULL); + GET_LO_HI(env, dbregargp->blob_fid_lo, + dbregargp->blob_fid_hi, blob_file_id, ret); + if (ret != 0) + goto err; + if ((ret = __rep_call_partial(env, + name, &view_partial, 0, dbl)) != 0) { + VPRINT(env, (env, DB_VERB_REP_MISC, + "rep_collect_txn: partial cb err %d for %s", ret, name)); + __os_free(env, dbregargp); + goto err; + } + /* + * Save the child txnid for when we walk back + * into the txn_child record. + */ + if (view_partial == 0) { + skip_txnid = child; + if ((ret = + __rep_remove_delayed_blobs(env, + blob_file_id, child, dbl)) != 0) + goto err; + } + } + __os_free(env, dbregargp); + } + if (rectype != DB___txn_child) { if (lc->nalloc < lc->nlsns + 1) { nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2; if ((ret = __os_realloc(env, @@ -1761,6 +1919,62 @@ err: if ((t_ret = __logc_close(logc)) != 0 && ret == 0) } /* + * __rep_remove_delayed_blobs -- + * + * If a blob meta database is opened in the same transaction as the database + * that owns it, then deciding whether it should be replicated or not needs + * to be delayed until after the rest of the transaction is processed. To do + * this, the transaction's information is added to a DELAYED_BLOB_LIST. When + * the owning database is processed, if it is not replicated then remove the + * entry of its blob meta database from the delayed list. + */ +static int +__rep_remove_delayed_blobs(env, blob_file_id, child, dbl) + ENV *env; + db_seq_t blob_file_id; + u_int32_t child; + DELAYED_BLOB_LIST **dbl; +{ + DELAYED_BLOB_LIST *ent, *next, *prev; + + if (*dbl == NULL) + return (0); + + /* + * If the child transaction has not been set, then a new entry was just + * added to the list. + */ + if ((*dbl)->child == 0) { + (*dbl)->child = child; + return (0); + } + + if (blob_file_id == 0) + return (0); + + /* + * This blob meta database should not be replicated if its associated + * database is not replicated. Remove it from the delayed + * list so it will not be processed at a later time. + */ + for (ent = *dbl; ent != NULL; ent = (DELAYED_BLOB_LIST *)ent->next) { + if (ent->blob_file_id == blob_file_id && ent->child != child) { + next = (DELAYED_BLOB_LIST *)ent->next; + prev = (DELAYED_BLOB_LIST *)ent->prev; + if (ent == *dbl) + *dbl = next; + if (prev != NULL) + prev->next = ent->next; + if (next != NULL) + next->prev = ent->prev; + __os_free(env, ent); + break; + } + } + return (0); +} + +/* * __rep_lsn_cmp -- * qsort-type-compatible wrapper for LOG_COMPARE. */ @@ -2138,9 +2352,13 @@ __rep_process_rec(env, ip, rp, rec, ret_tsp, ret_lsnp) ret = __rep_process_txn(env, rec); } while (ret == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED); - /* Now flush the log unless we're running TXN_NOSYNC. */ - if (ret == 0 && !F_ISSET(env->dbenv, DB_ENV_TXN_NOSYNC)) - ret = __log_flush(env, NULL); + /* Now write/flush the log as appropriate. */ + if (ret == 0) { + if (F_ISSET(env->dbenv, DB_ENV_TXN_WRITE_NOSYNC)) + ret = __log_rep_write(env); + else if (!F_ISSET(env->dbenv, DB_ENV_TXN_NOSYNC)) + ret = __log_flush(env, NULL); + } if (ret != 0) { __db_errx(env, DB_STR_A("3526", "Error processing txn [%lu][%lu]", "%lu %lu"), @@ -2256,7 +2474,7 @@ __rep_resend_req(env, rereq) DB_REP *db_rep; LOG *lp; REP *rep; - int master, ret; + int blob_sync, master, ret; repsync_t sync_state; u_int32_t gapflags, msgtype, repflags, sendflags; @@ -2271,6 +2489,7 @@ __rep_resend_req(env, rereq) repflags = rep->flags; sync_state = rep->sync_state; + blob_sync = rep->blob_sync; /* * If we are delayed we do not rerequest anything. */ @@ -2293,9 +2512,17 @@ __rep_resend_req(env, rereq) */ msgtype = REP_UPDATE_REQ; } else if (sync_state == SYNC_PAGE) { - REP_SYSTEM_LOCK(env); - ret = __rep_pggap_req(env, rep, NULL, gapflags); - REP_SYSTEM_UNLOCK(env); + if (blob_sync == 0) { + REP_SYSTEM_LOCK(env); + ret = __rep_pggap_req(env, rep, NULL, gapflags); + REP_SYSTEM_UNLOCK(env); + } else { + MUTEX_LOCK(env, rep->mtx_clientdb); + REP_SYSTEM_LOCK(env); + ret = __rep_blob_rereq(env, rep); + REP_SYSTEM_UNLOCK(env); + MUTEX_UNLOCK(env, rep->mtx_clientdb); + } } else { MUTEX_LOCK(env, rep->mtx_clientdb); ret = __rep_loggap_req(env, rep, NULL, gapflags); @@ -2397,9 +2624,20 @@ __rep_skip_msg(env, rep, eid, rectype) if (rep->master_id == DB_EID_INVALID) /* Case 1. */ (void)__rep_send_message(env, DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0); - else if (eid == rep->master_id) /* Case 2. */ - ret = __rep_resend_req(env, 0); - else if (F_ISSET(rep, REP_F_CLIENT)) /* Case 3. */ + else if (eid == rep->master_id) { /* Case 2. */ + /* + * When we receive log messages in the SYNC_PAGE stage + * and we decide to rerequest, it often means the pages + * we expect have been dropped. Send a rerequest with + * gapflags for better performance. + */ + if ((rectype == REP_LOG || rectype == REP_BULK_LOG || + rectype == REP_LOG_MORE) && + rep->sync_state == SYNC_PAGE) + ret = __rep_resend_req(env, 1); + else + ret = __rep_resend_req(env, 0); + } else if (F_ISSET(rep, REP_F_CLIENT)) /* Case 3. */ (void)__rep_send_message(env, eid, REP_REREQUEST, NULL, NULL, 0, 0); } @@ -2421,7 +2659,6 @@ __rep_check_missing(env, gen, master_perm_lsn) DB_LOG *dblp; DB_LSN *end_lsn; DB_REP *db_rep; - DB_THREAD_INFO *ip; LOG *lp; REGINFO *infop; REP *rep; @@ -2434,7 +2671,6 @@ __rep_check_missing(env, gen, master_perm_lsn) infop = env->reginfo; has_log_gap = has_page_gap = ret = 0; - ENV_ENTER(env, ip); MUTEX_LOCK(env, rep->mtx_clientdb); REP_SYSTEM_LOCK(env); /* @@ -2518,8 +2754,7 @@ __rep_check_missing(env, gen, master_perm_lsn) rep->msg_th--; REP_SYSTEM_UNLOCK(env); -out: ENV_LEAVE(env, ip); - return (ret); +out: return (ret); } static int diff --git a/src/rep/rep_region.c b/src/rep/rep_region.c index f1d69dff..72372bff 100644 --- a/src/rep/rep_region.c +++ b/src/rep/rep_region.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -14,6 +14,8 @@ static int __rep_egen_init __P((ENV *, REP *)); static int __rep_gen_init __P((ENV *, REP *)); +static int __rep_view_init __P((ENV *, REP *)); +static int __rep_viewfile_exists __P((ENV *, int *)); /* * __rep_open -- @@ -29,7 +31,7 @@ __rep_open(env) REGENV *renv; REGINFO *infop; REP *rep; - int i, ret; + int i, ret, view; char *p; char fname[sizeof(REP_DIAGNAME) + 3]; @@ -37,10 +39,15 @@ __rep_open(env) infop = env->reginfo; renv = infop->primary; ret = 0; + view = 0; DB_ASSERT(env, DBREP_DIAG_FILES < 100); if (renv->rep_off == INVALID_ROFF) { - /* Must create the region. */ + /* + * Must create the region. This environment either is being + * created for the first time or has just had its regions + * cleared by a recovery. + */ if ((ret = __env_alloc(infop, sizeof(REP), &rep)) != 0) return (ret); memset(rep, 0, sizeof(*rep)); @@ -108,6 +115,23 @@ __rep_open(env) return (ret); if ((ret = __rep_egen_init(env, rep)) != 0) return (ret); + /* + * Determine if this is a view site or not. It is a view + * if the callback is set. If the site was a view in the + * past, we mark it as a view, but will check consistency + * later when starting replication. + */ + if (db_rep->partial != NULL) { + rep->stat.st_view = 1; + if ((ret = __rep_view_init(env, rep)) != 0) + return (ret); + } else { + if ((ret = __rep_viewfile_exists(env, &view)) != 0) + return (ret); + if (view) + rep->stat.st_view = 1; + } + rep->gbytes = db_rep->gbytes; rep->bytes = db_rep->bytes; rep->request_gap = db_rep->request_gap; @@ -157,6 +181,32 @@ __rep_open(env) "process joining the environment")); return (EINVAL); } + /* + * If we are joining an existing environment and we + * have a view callback set, then the environment must + * already be a view. If not, error. + * + * The other mismatch is not an error here (no callback + * set, but environment is a view) because we may be a + * rep unaware process such as db_stat and that is allowed + * to proceed. There is additional checking in other rep + * functions like rep_start to confirm consistency before + * using replication. + */ + if (db_rep->partial != NULL) { + if ((ret = __rep_viewfile_exists(env, &view)) != 0) + return (ret); + /* + * If there is a callback, and we are not in-memory, + * there better be a view system file too. + */ + if (view == 0 && !FLD_ISSET(rep->config, REP_C_INMEM)) { + __db_errx(env, DB_STR("3688", + "Application environment and view mismatch " + "joining the environment")); + return (EINVAL); + } + } #ifdef HAVE_REPLICATION_THREADS if ((ret = __repmgr_join(env, rep)) != 0) return (ret); @@ -506,9 +556,8 @@ __rep_write_egen(env, rep, egen) * If running in-memory replication, return without any file * operations. */ - if (FLD_ISSET(rep->config, REP_C_INMEM)) { + if (FLD_ISSET(rep->config, REP_C_INMEM)) return (0); - } if ((ret = __db_appname(env, DB_APP_META, REP_EGENNAME, NULL, &p)) != 0) @@ -591,9 +640,8 @@ __rep_write_gen(env, rep, gen) * If running in-memory replication, return without any file * operations. */ - if (FLD_ISSET(rep->config, REP_C_INMEM)) { + if (FLD_ISSET(rep->config, REP_C_INMEM)) return (0); - } if ((ret = __db_appname(env, DB_APP_META, REP_GENNAME, NULL, &p)) != 0) @@ -608,3 +656,105 @@ __rep_write_gen(env, rep, gen) __os_free(env, p); return (ret); } + +/* + * __rep_view_init -- + * Initialize the permanent view file to know this site is a view + * forever. The existence of the file is the record. + */ +static int +__rep_view_init(env, rep) + ENV *env; + REP *rep; +{ + DB_FH *fhp; + int ret; + char *p; + + /* + * If running in-memory replication, return without any file + * operations. + */ + if (FLD_ISSET(rep->config, REP_C_INMEM)) + return (0); + + if ((ret = __db_appname(env, + DB_APP_META, REPVIEW, NULL, &p)) != 0) + return (ret); + + /* + * If the file doesn't exist, create it. We just want to open + * and close the file. It doesn't have any content. + * If the file already exists, there is nothing else to do. + */ + if (__os_exists(env, p, NULL) != 0) { + RPRINT(env, (env, DB_VERB_REP_MISC, "View init: Create %s", p)); + if ((ret = __os_open(env, p, 0, + DB_OSO_CREATE | DB_OSO_TRUNC, DB_MODE_600, &fhp)) != 0) + goto out; + (void)__os_closehandle(env, fhp); + } +out: __os_free(env, p); + return (ret); +} + +/* + * __rep_check_view -- + * Check consistency between the view file and the db_rep handle. + * + * PUBLIC: int __rep_check_view __P((ENV *)); + */ +int +__rep_check_view(env) + ENV *env; +{ + DB_REP *db_rep; + REP *rep; + int exist, ret; + + db_rep = env->rep_handle; + rep = db_rep->region; + ret = 0; + + /* + * If running in-memory replication, check without any file + * operations. We can only check what exists in the region, + * which is the st_view field from a previous open. + */ + if (FLD_ISSET(rep->config, REP_C_INMEM)) + exist = (int)rep->stat.st_view; + else if ((ret = __rep_viewfile_exists(env, &exist)) != 0) + return (ret); + + RPRINT(env, (env, DB_VERB_REP_MISC, "Check view. Exist %d, cb %d", + exist, (db_rep->partial != NULL))); + /* + * If view file exists, a partial function must be set. + * If view file does not exist, a partial function must not be set. + */ + if ((exist == 0 && db_rep->partial != NULL) || + (exist == 1 && db_rep->partial == NULL)) + ret = EINVAL; + return (ret); +} + +static int +__rep_viewfile_exists(env, existp) + ENV *env; + int *existp; +{ + char *p; + int ret; + + *existp = 0; + if ((ret = __db_appname(env, + DB_APP_META, REPVIEW, NULL, &p)) != 0) + return (ret); + + if (__os_exists(env, p, NULL) == 0) + *existp = 1; + + __os_free(env, p); + return (ret); + +} diff --git a/src/rep/rep_stat.c b/src/rep/rep_stat.c index addfee25..ffb9f262 100644 --- a/src/rep/rep_stat.c +++ b/src/rep/rep_stat.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -73,6 +73,13 @@ static const char *__rep_syncstate_to_string __P((repsync_t)); } \ } while (0) +#define PRINT_VIEW(sp) do { \ + if ((sp)->st_view != 0) \ + __db_msg(env, "Environment configured as view site"); \ + else \ + __db_msg(env, "Environment not configured as view site");\ +} while (0) + /* * __rep_stat_pp -- * ENV->rep_stat pre/post processing. @@ -120,7 +127,7 @@ __rep_stat(env, statp, flags) DB_REP_STAT *stats; LOG *lp; REP *rep; - u_int32_t startupdone; + u_int32_t startupdone, view; uintmax_t queued; int dolock, ret; @@ -177,10 +184,12 @@ __rep_stat(env, statp, flags) if (LF_ISSET(DB_STAT_CLEAR)) { queued = rep->stat.st_log_queued; startupdone = rep->stat.st_startup_complete; + view = rep->stat.st_view; memset(&rep->stat, 0, sizeof(rep->stat)); rep->stat.st_log_queued = rep->stat.st_log_queued_total = rep->stat.st_log_queued_max = queued; rep->stat.st_startup_complete = startupdone; + rep->stat.st_view = view; } /* @@ -377,6 +386,7 @@ __rep_print_stats(env, flags) __db_dl(env, "Number of page records missed and requested", (u_long)sp->st_pg_requested); PRINT_STARTUPCOMPLETE(sp); + PRINT_VIEW(sp); __db_dl(env, "Number of transactions applied", (u_long)sp->st_txns_applied); @@ -462,16 +472,20 @@ __rep_print_all(env, flags) u_int32_t flags; { static const FN rep_cfn[] = { - { REP_C_2SITE_STRICT, "REP_C_2SITE_STRICT" }, - { REP_C_AUTOINIT, "REP_C_AUTOINIT" }, - { REP_C_AUTOROLLBACK, "REP_C_AUTOROLLBACK" }, - { REP_C_BULK, "REP_C_BULK" }, - { REP_C_DELAYCLIENT, "REP_C_DELAYCLIENT" }, - { REP_C_ELECTIONS, "REP_C_ELECTIONS" }, - { REP_C_INMEM, "REP_C_INMEM" }, - { REP_C_LEASE, "REP_C_LEASE" }, - { REP_C_NOWAIT, "REP_C_NOWAIT" }, - { 0, NULL } + { REP_C_2SITE_STRICT, "REP_C_2SITE_STRICT" }, + { REP_C_AUTOINIT, "REP_C_AUTOINIT" }, + { REP_C_AUTOROLLBACK, "REP_C_AUTOROLLBACK" }, + { REP_C_AUTOTAKEOVER, "REP_C_AUTOTAKEOVER" }, + { REP_C_BULK, "REP_C_BULK" }, + { REP_C_DELAYCLIENT, "REP_C_DELAYCLIENT" }, + { REP_C_ELECT_LOGLENGTH, "REP_C_ELECT_LOGLENGTH" }, + { REP_C_ELECTIONS, "REP_C_ELECTIONS" }, + { REP_C_INMEM, "REP_C_INMEM" }, + { REP_C_LEASE, "REP_C_LEASE" }, + { REP_C_NOWAIT, "REP_C_NOWAIT" }, + { REP_C_PREFMAS_CLIENT, "REP_C_PREFMAS_CLIENT" }, + { REP_C_PREFMAS_MASTER, "REP_C_PREFMAS_MASTER" }, + { 0, NULL } }; static const FN rep_efn[] = { { REP_E_PHASE0, "REP_E_PHASE0" }, @@ -481,19 +495,21 @@ __rep_print_all(env, flags) { 0, NULL } }; static const FN rep_fn[] = { - { REP_F_ABBREVIATED, "REP_F_ABBREVIATED" }, - { REP_F_APP_BASEAPI, "REP_F_APP_BASEAPI" }, - { REP_F_APP_REPMGR, "REP_F_APP_REPMGR" }, - { REP_F_CLIENT, "REP_F_CLIENT" }, - { REP_F_DELAY, "REP_F_DELAY" }, - { REP_F_GROUP_ESTD, "REP_F_GROUP_ESTD" }, - { REP_F_LEASE_EXPIRED, "REP_F_LEASE_EXPIRED" }, - { REP_F_MASTER, "REP_F_MASTER" }, - { REP_F_MASTERELECT, "REP_F_MASTERELECT" }, - { REP_F_NEWFILE, "REP_F_NEWFILE" }, - { REP_F_NIMDBS_LOADED, "REP_F_NIMDBS_LOADED" }, - { REP_F_SKIPPED_APPLY, "REP_F_SKIPPED_APPLY" }, - { REP_F_START_CALLED, "REP_F_START_CALLED" }, + { REP_F_ABBREVIATED, "REP_F_ABBREVIATED" }, + { REP_F_APP_BASEAPI, "REP_F_APP_BASEAPI" }, + { REP_F_APP_REPMGR, "REP_F_APP_REPMGR" }, + { REP_F_CLIENT, "REP_F_CLIENT" }, + { REP_F_DELAY, "REP_F_DELAY" }, + { REP_F_GROUP_ESTD, "REP_F_GROUP_ESTD" }, + { REP_F_HOLD_GEN, "REP_F_HOLD_GEN" }, + { REP_F_LEASE_EXPIRED, "REP_F_LEASE_EXPIRED" }, + { REP_F_MASTER, "REP_F_MASTER" }, + { REP_F_MASTERELECT, "REP_F_MASTERELECT" }, + { REP_F_NEWFILE, "REP_F_NEWFILE" }, + { REP_F_NIMDBS_LOADED, "REP_F_NIMDBS_LOADED" }, + { REP_F_READONLY_MASTER, "REP_F_READONLY_MASTER" }, + { REP_F_SKIPPED_APPLY, "REP_F_SKIPPED_APPLY" }, + { REP_F_START_CALLED, "REP_F_START_CALLED" }, { 0, NULL } }; static const FN rep_lfn[] = { @@ -523,15 +539,16 @@ __rep_print_all(env, flags) rep = db_rep->region; infop = env->reginfo; renv = infop->primary; - ENV_ENTER(env, ip); __db_msg(env, "%s", DB_GLOBAL(db_line)); __db_msg(env, "DB_REP handle information:"); if (db_rep->rep_db == NULL) STAT_ISSET("Bookkeeping database", db_rep->rep_db); - else + else { + ENV_GET_THREAD_INFO(env, ip); (void)__db_stat_print(db_rep->rep_db, ip, flags); + } __db_prflags(env, NULL, db_rep->flags, dbrep_fn, NULL, "\tFlags"); @@ -604,7 +621,6 @@ __rep_print_all(env, flags) STAT_LONG("Maximum lease timestamp microseconds", lp->max_lease_ts.tv_nsec / NS_PER_US); MUTEX_UNLOCK(env, rep->mtx_clientdb); - ENV_LEAVE(env, ip); return (0); } @@ -648,8 +664,10 @@ __rep_stat_summary_print(env) ret = 0; if ((ret = __rep_stat(env, &sp, 0)) == 0) { PRINT_STATUS(sp, is_client); - if (is_client) + if (is_client) { PRINT_STARTUPCOMPLETE(sp); + PRINT_VIEW(sp); + } PRINT_MAXPERMLSN(sp); /* * Use the number of sites that is kept up-to-date most diff --git a/src/rep/rep_stub.c b/src/rep/rep_stub.c index 2d96ea59..51c79eb0 100644 --- a/src/rep/rep_stub.c +++ b/src/rep/rep_stub.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 1996, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 1996, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -130,7 +130,7 @@ __rep_elect_pp(dbenv, nsites, nvotes, flags) } int -__rep_flush(dbenv) +__rep_flush_pp(dbenv) DB_ENV *dbenv; { return (__db_norep(dbenv->env)); @@ -201,7 +201,7 @@ __rep_get_nsites(dbenv, n) } int -__rep_set_priority(dbenv, priority) +__rep_set_priority_pp(dbenv, priority) DB_ENV *dbenv; u_int32_t priority; { @@ -219,7 +219,7 @@ __rep_get_priority(dbenv, priority) } int -__rep_set_timeout(dbenv, which, timeout) +__rep_set_timeout_pp(dbenv, which, timeout) DB_ENV *dbenv; int which; db_timeout_t timeout; @@ -342,6 +342,16 @@ __rep_set_transport_pp(dbenv, eid, f_send) } int +__rep_set_view(dbenv, f_partial) + DB_ENV *dbenv; + int (*f_partial) __P((DB_ENV *, + const char *, int *, u_int32_t)); +{ + COMPQUIET(f_partial, NULL); + return (__db_norep(dbenv->env)); +} + +int __rep_set_request(dbenv, min, max) DB_ENV *dbenv; u_int32_t min, max; diff --git a/src/rep/rep_util.c b/src/rep/rep_util.c index 0dfe6122..5ee2592f 100644 --- a/src/rep/rep_util.c +++ b/src/rep/rep_util.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -11,6 +11,7 @@ #include "db_int.h" #include "dbinc/db_page.h" #include "dbinc/db_am.h" +#include "dbinc/fop.h" #include "dbinc/mp.h" #include "dbinc/txn.h" @@ -437,7 +438,7 @@ __rep_send_message(env, eid, rtype, lsnp, dbt, ctlflags, repflags) FLD_ISSET(ctlflags, REPCTL_LEASE | REPCTL_PERM)) { F_SET(&cntrl, REPCTL_LEASE); DB_ASSERT(env, rep->version == DB_REPVERSION); - __os_gettime(env, &msg_time, 1); + __os_gettime(env, &msg_time, 0); cntrl.msg_sec = (u_int32_t)msg_time.tv_sec; cntrl.msg_nsec = (u_int32_t)msg_time.tv_nsec; } @@ -591,6 +592,15 @@ __rep_new_master(env, cntrl, eid) ret = 0; logc = NULL; lockout_msg = 0; + + /* + * If REP_F_HOLD_GEN is set, we want to keep this site at its + * current gen. Do not process an incoming NEWMASTER, which + * would change the gen. + */ + if (F_ISSET(rep, REP_F_HOLD_GEN)) + return (ret); + REP_SYSTEM_LOCK(env); change = rep->gen != cntrl->gen || rep->master_id != eid; /* @@ -1128,6 +1138,8 @@ __env_db_rep_exit(env) rep = db_rep->region; REP_SYSTEM_LOCK(env); + /* If we have a reference, it better not already be 0. */ + DB_ASSERT(env, rep->handle_cnt != 0); rep->handle_cnt--; REP_SYSTEM_UNLOCK(env); @@ -1190,7 +1202,7 @@ __db_rep_enter(dbp, checkgen, checklock, return_now) * get an exclusive lock on this database. */ if (checkgen && dbp->mpf->mfp && IS_REP_CLIENT(env)) { - if (dbp->mpf->mfp->excl_lockout) + if (dbp->mpf->mfp->excl_lockout) return (DB_REP_HANDLE_DEAD); } @@ -1328,7 +1340,8 @@ __op_rep_exit(env) rep = db_rep->region; REP_SYSTEM_LOCK(env); - DB_ASSERT(env, rep->op_cnt > 0); + /* If we have a reference, it better not already be 0. */ + DB_ASSERT(env, rep->op_cnt != 0); rep->op_cnt--; REP_SYSTEM_UNLOCK(env); @@ -1697,7 +1710,9 @@ __rep_msg_to_old(version, rectype) REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, + REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, + REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, + REP_INVALID }, /* * 4.2/DB_REPVERSION 1 no longer supported. */ @@ -1708,7 +1723,9 @@ __rep_msg_to_old(version, rectype) REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, + REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, + REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, + REP_INVALID }, /* * 4.3/DB_REPVERSION 2 no longer supported. */ @@ -1719,7 +1736,9 @@ __rep_msg_to_old(version, rectype) REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, + REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, + REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, + REP_INVALID }, /* * From 4.7 message number To 4.4/4.5 message number */ @@ -1727,6 +1746,11 @@ __rep_msg_to_old(version, rectype) 1, /* REP_ALIVE */ 2, /* REP_ALIVE_REQ */ 3, /* REP_ALL_REQ */ + REP_INVALID, /* REP_BLOB_ALL_REQ */ + REP_INVALID, /* REP_BLOB_CHUNK */ + REP_INVALID, /* REP_BLOB_CHUNK_REQ */ + REP_INVALID, /* REP_BLOB_UPDATE */ + REP_INVALID, /* REP_BLOB_UPDATE_REQ */ 4, /* REP_BULK_LOG */ 5, /* REP_BULK_PAGE */ 6, /* REP_DUPMASTER */ @@ -1765,6 +1789,11 @@ __rep_msg_to_old(version, rectype) 1, /* REP_ALIVE */ 2, /* REP_ALIVE_REQ */ 3, /* REP_ALL_REQ */ + REP_INVALID, /* REP_BLOB_ALL_REQ */ + REP_INVALID, /* REP_BLOB_CHUNK */ + REP_INVALID, /* REP_BLOB_CHUNK_REQ */ + REP_INVALID, /* REP_BLOB_UPDATE */ + REP_INVALID, /* REP_BLOB_UPDATE_REQ */ 4, /* REP_BULK_LOG */ 5, /* REP_BULK_PAGE */ 6, /* REP_DUPMASTER */ @@ -1803,6 +1832,11 @@ __rep_msg_to_old(version, rectype) 1, /* REP_ALIVE */ 2, /* REP_ALIVE_REQ */ 3, /* REP_ALL_REQ */ + REP_INVALID, /* REP_BLOB_ALL_REQ */ + REP_INVALID, /* REP_BLOB_CHUNK */ + REP_INVALID, /* REP_BLOB_CHUNK_REQ */ + REP_INVALID, /* REP_BLOB_UPDATE */ + REP_INVALID, /* REP_BLOB_UPDATE_REQ */ 4, /* REP_BULK_LOG */ 5, /* REP_BULK_PAGE */ 6, /* REP_DUPMASTER */ @@ -1841,6 +1875,53 @@ __rep_msg_to_old(version, rectype) 1, /* REP_ALIVE */ 2, /* REP_ALIVE_REQ */ 3, /* REP_ALL_REQ */ + REP_INVALID, /* REP_BLOB_ALL_REQ */ + REP_INVALID, /* REP_BLOB_CHUNK */ + REP_INVALID, /* REP_BLOB_CHUNK_REQ */ + REP_INVALID, /* REP_BLOB_UPDATE */ + REP_INVALID, /* REP_BLOB_UPDATE_REQ */ + 4, /* REP_BULK_LOG */ + 5, /* REP_BULK_PAGE */ + 6, /* REP_DUPMASTER */ + 7, /* REP_FILE */ + 8, /* REP_FILE_FAIL */ + 9, /* REP_FILE_REQ */ + 10, /* REP_LEASE_GRANT */ + 11, /* REP_LOG */ + 12, /* REP_LOG_MORE */ + 13, /* REP_LOG_REQ */ + 14, /* REP_MASTER_REQ */ + 15, /* REP_NEWCLIENT */ + 16, /* REP_NEWFILE */ + 17, /* REP_NEWMASTER */ + 18, /* REP_NEWSITE */ + 19, /* REP_PAGE */ + 20, /* REP_PAGE_FAIL */ + 21, /* REP_PAGE_MORE */ + 22, /* REP_PAGE_REQ */ + 23, /* REP_REREQUEST */ + 24, /* REP_START_SYNC */ + 25, /* REP_UPDATE */ + 26, /* REP_UPDATE_REQ */ + 27, /* REP_VERIFY */ + 28, /* REP_VERIFY_FAIL */ + 29, /* REP_VERIFY_REQ */ + 30, /* REP_VOTE1 */ + 31 /* REP_VOTE2 */ + }, + /* + * From 6.1 message number To 5.3 message number. Messages + * handling BLOBs were added. + */ + { REP_INVALID, /* NO message 0 */ + 1, /* REP_ALIVE */ + 2, /* REP_ALIVE_REQ */ + 3, /* REP_ALL_REQ */ + REP_INVALID, /* REP_BLOB_ALL_REQ */ + REP_INVALID, /* REP_BLOB_CHUNK */ + REP_INVALID, /* REP_BLOB_CHUNK_REQ */ + REP_INVALID, /* REP_BLOB_UPDATE */ + REP_INVALID, /* REP_BLOB_UPDATE_REQ */ 4, /* REP_BULK_LOG */ 5, /* REP_BULK_PAGE */ 6, /* REP_DUPMASTER */ @@ -1901,7 +1982,9 @@ __rep_msg_from_old(version, rectype) REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, + REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, + REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, + REP_INVALID }, /* * 4.2/DB_REPVERSION 1 no longer supported. */ @@ -1912,7 +1995,9 @@ __rep_msg_from_old(version, rectype) REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, + REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, + REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, + REP_INVALID }, /* * 4.3/DB_REPVERSION 2 no longer supported. */ @@ -1923,7 +2008,9 @@ __rep_msg_from_old(version, rectype) REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, + REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, + REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, + REP_INVALID }, /* * From 4.4/4.5 message number To 4.7 message number */ @@ -1931,36 +2018,41 @@ __rep_msg_from_old(version, rectype) 1, /* 1, REP_ALIVE */ 2, /* 2, REP_ALIVE_REQ */ 3, /* 3, REP_ALL_REQ */ - 4, /* 4, REP_BULK_LOG */ - 5, /* 5, REP_BULK_PAGE */ - 6, /* 6, REP_DUPMASTER */ - 7, /* 7, REP_FILE */ - 8, /* 8, REP_FILE_FAIL */ - 9, /* 9, REP_FILE_REQ */ - /* 10, REP_LEASE_GRANT doesn't exist */ - 11, /* 10, REP_LOG */ - 12, /* 11, REP_LOG_MORE */ - 13, /* 12, REP_LOG_REQ */ - 14, /* 13, REP_MASTER_REQ */ - 15, /* 14, REP_NEWCLIENT */ - 16, /* 15, REP_NEWFILE */ - 17, /* 16, REP_NEWMASTER */ - 18, /* 17, REP_NEWSITE */ - 19, /* 18, REP_PAGE */ - 20, /* 19, REP_PAGE_FAIL */ - 21, /* 20, REP_PAGE_MORE */ - 22, /* 21, REP_PAGE_REQ */ - 23, /* 22, REP_REREQUEST */ - /* 24, REP_START_SYNC doesn't exist */ - 25, /* 23, REP_UPDATE */ - 26, /* 24, REP_UPDATE_REQ */ - 27, /* 25, REP_VERIFY */ - 28, /* 26, REP_VERIFY_FAIL */ - 29, /* 27, REP_VERIFY_REQ */ - 30, /* 28, REP_VOTE1 */ - 31, /* 29, REP_VOTE2 */ + 9, /* 4, REP_BULK_LOG */ + 10, /* 5, REP_BULK_PAGE */ + 11, /* 6, REP_DUPMASTER */ + 12, /* 7, REP_FILE */ + 13, /* 8, REP_FILE_FAIL */ + 14, /* 9, REP_FILE_REQ */ + /* 15, REP_LEASE_GRANT doesn't exist */ + 16, /* 10, REP_LOG */ + 17, /* 11, REP_LOG_MORE */ + 18, /* 12, REP_LOG_REQ */ + 19, /* 13, REP_MASTER_REQ */ + 20, /* 14, REP_NEWCLIENT */ + 21, /* 15, REP_NEWFILE */ + 22, /* 16, REP_NEWMASTER */ + 23, /* 17, REP_NEWSITE */ + 24, /* 18, REP_PAGE */ + 25, /* 19, REP_PAGE_FAIL */ + 26, /* 20, REP_PAGE_MORE */ + 27, /* 21, REP_PAGE_REQ */ + 28, /* 22, REP_REREQUEST */ + /* 29, REP_START_SYNC doesn't exist */ + 30, /* 23, REP_UPDATE */ + 31, /* 24, REP_UPDATE_REQ */ + 32, /* 25, REP_VERIFY */ + 33, /* 26, REP_VERIFY_FAIL */ + 34, /* 27, REP_VERIFY_REQ */ + 35, /* 28, REP_VOTE1 */ + 36, /* 29, REP_VOTE2 */ REP_INVALID, /* 30, 4.4/4.5 no message */ - REP_INVALID /* 31, 4.4/4.5 no message */ + REP_INVALID, /* 31, 4.4/4.5 no message */ + REP_INVALID, /* 32, 4.4/4.5 no message */ + REP_INVALID, /* 33, 4.4/4.5 no message */ + REP_INVALID, /* 34, 4.4/4.5 no message */ + REP_INVALID, /* 35, 4.4/4.5 no message */ + REP_INVALID /* 36, 4.4/4.5 no message */ }, /* * From 4.6 message number To 4.7 message number. There are @@ -1971,34 +2063,39 @@ __rep_msg_from_old(version, rectype) 1, /* 1, REP_ALIVE */ 2, /* 2, REP_ALIVE_REQ */ 3, /* 3, REP_ALL_REQ */ - 4, /* 4, REP_BULK_LOG */ - 5, /* 5, REP_BULK_PAGE */ - 6, /* 6, REP_DUPMASTER */ - 7, /* 7, REP_FILE */ - 8, /* 8, REP_FILE_FAIL */ - 9, /* 9, REP_FILE_REQ */ - 10, /* 10, REP_LEASE_GRANT */ - 11, /* 11, REP_LOG */ - 12, /* 12, REP_LOG_MORE */ - 13, /* 13, REP_LOG_REQ */ - 14, /* 14, REP_MASTER_REQ */ - 15, /* 15, REP_NEWCLIENT */ - 16, /* 16, REP_NEWFILE */ - 17, /* 17, REP_NEWMASTER */ - 18, /* 18, REP_NEWSITE */ - 19, /* 19, REP_PAGE */ - 20, /* 20, REP_PAGE_FAIL */ - 21, /* 21, REP_PAGE_MORE */ - 22, /* 22, REP_PAGE_REQ */ - 23, /* 22, REP_REREQUEST */ - 24, /* 24, REP_START_SYNC */ - 25, /* 25, REP_UPDATE */ - 26, /* 26, REP_UPDATE_REQ */ - 27, /* 27, REP_VERIFY */ - 28, /* 28, REP_VERIFY_FAIL */ - 29, /* 29, REP_VERIFY_REQ */ - 30, /* 30, REP_VOTE1 */ - 31 /* 31, REP_VOTE2 */ + 9, /* 4, REP_BULK_LOG */ + 10, /* 5, REP_BULK_PAGE */ + 11, /* 6, REP_DUPMASTER */ + 12, /* 7, REP_FILE */ + 13, /* 8, REP_FILE_FAIL */ + 14, /* 9, REP_FILE_REQ */ + 15, /* 10, REP_LEASE_GRANT */ + 16, /* 11, REP_LOG */ + 17, /* 12, REP_LOG_MORE */ + 18, /* 13, REP_LOG_REQ */ + 19, /* 14, REP_MASTER_REQ */ + 20, /* 15, REP_NEWCLIENT */ + 21, /* 16, REP_NEWFILE */ + 22, /* 17, REP_NEWMASTER */ + 23, /* 18, REP_NEWSITE */ + 24, /* 19, REP_PAGE */ + 25, /* 20, REP_PAGE_FAIL */ + 26, /* 21, REP_PAGE_MORE */ + 27, /* 22, REP_PAGE_REQ */ + 28, /* 22, REP_REREQUEST */ + 29, /* 24, REP_START_SYNC */ + 30, /* 25, REP_UPDATE */ + 31, /* 26, REP_UPDATE_REQ */ + 32, /* 27, REP_VERIFY */ + 33, /* 28, REP_VERIFY_FAIL */ + 34, /* 29, REP_VERIFY_REQ */ + 35, /* 30, REP_VOTE1 */ + 36, /* 31, REP_VOTE2 */ + REP_INVALID, /* 32, 4.6/4.7 no message */ + REP_INVALID, /* 33, 4.6/4.7 no message */ + REP_INVALID, /* 34, 4.6/4.7 no message */ + REP_INVALID, /* 35, 4.6/4.7 no message */ + REP_INVALID /* 36, 4.6/4.7 no message */ }, /* * From 4.7 message number To 5.2 message number. There are @@ -2009,34 +2106,39 @@ __rep_msg_from_old(version, rectype) 1, /* 1, REP_ALIVE */ 2, /* 2, REP_ALIVE_REQ */ 3, /* 3, REP_ALL_REQ */ - 4, /* 4, REP_BULK_LOG */ - 5, /* 5, REP_BULK_PAGE */ - 6, /* 6, REP_DUPMASTER */ - 7, /* 7, REP_FILE */ - 8, /* 8, REP_FILE_FAIL */ - 9, /* 9, REP_FILE_REQ */ - 10, /* 10, REP_LEASE_GRANT */ - 11, /* 11, REP_LOG */ - 12, /* 12, REP_LOG_MORE */ - 13, /* 13, REP_LOG_REQ */ - 14, /* 14, REP_MASTER_REQ */ - 15, /* 15, REP_NEWCLIENT */ - 16, /* 16, REP_NEWFILE */ - 17, /* 17, REP_NEWMASTER */ - 18, /* 18, REP_NEWSITE */ - 19, /* 19, REP_PAGE */ - 20, /* 20, REP_PAGE_FAIL */ - 21, /* 21, REP_PAGE_MORE */ - 22, /* 22, REP_PAGE_REQ */ - 23, /* 22, REP_REREQUEST */ - 24, /* 24, REP_START_SYNC */ - 25, /* 25, REP_UPDATE */ - 26, /* 26, REP_UPDATE_REQ */ - 27, /* 27, REP_VERIFY */ - 28, /* 28, REP_VERIFY_FAIL */ - 29, /* 29, REP_VERIFY_REQ */ - 30, /* 30, REP_VOTE1 */ - 31 /* 31, REP_VOTE2 */ + 9, /* 4, REP_BULK_LOG */ + 10, /* 5, REP_BULK_PAGE */ + 11, /* 6, REP_DUPMASTER */ + 12, /* 7, REP_FILE */ + 13, /* 8, REP_FILE_FAIL */ + 14, /* 9, REP_FILE_REQ */ + 15, /* 10, REP_LEASE_GRANT */ + 16, /* 11, REP_LOG */ + 17, /* 12, REP_LOG_MORE */ + 18, /* 13, REP_LOG_REQ */ + 19, /* 14, REP_MASTER_REQ */ + 20, /* 15, REP_NEWCLIENT */ + 21, /* 16, REP_NEWFILE */ + 22, /* 17, REP_NEWMASTER */ + 23, /* 18, REP_NEWSITE */ + 24, /* 19, REP_PAGE */ + 25, /* 20, REP_PAGE_FAIL */ + 26, /* 21, REP_PAGE_MORE */ + 27, /* 22, REP_PAGE_REQ */ + 28, /* 22, REP_REREQUEST */ + 29, /* 24, REP_START_SYNC */ + 30, /* 25, REP_UPDATE */ + 31, /* 26, REP_UPDATE_REQ */ + 32, /* 27, REP_VERIFY */ + 33, /* 28, REP_VERIFY_FAIL */ + 34, /* 29, REP_VERIFY_REQ */ + 35, /* 30, REP_VOTE1 */ + 36, /* 31, REP_VOTE2 */ + REP_INVALID, /* 32, 4.7/5.2 no message */ + REP_INVALID, /* 33, 4.7/5.2 no message */ + REP_INVALID, /* 34, 4.7/5.2 no message */ + REP_INVALID, /* 35, 4.7/5.2 no message */ + REP_INVALID /* 36, 4.7/5.2 no message */ }, /* * From 4.7 message number To 5.3 message number. There are @@ -2047,34 +2149,86 @@ __rep_msg_from_old(version, rectype) 1, /* 1, REP_ALIVE */ 2, /* 2, REP_ALIVE_REQ */ 3, /* 3, REP_ALL_REQ */ - 4, /* 4, REP_BULK_LOG */ - 5, /* 5, REP_BULK_PAGE */ - 6, /* 6, REP_DUPMASTER */ - 7, /* 7, REP_FILE */ - 8, /* 8, REP_FILE_FAIL */ - 9, /* 9, REP_FILE_REQ */ - 10, /* 10, REP_LEASE_GRANT */ - 11, /* 11, REP_LOG */ - 12, /* 12, REP_LOG_MORE */ - 13, /* 13, REP_LOG_REQ */ - 14, /* 14, REP_MASTER_REQ */ - 15, /* 15, REP_NEWCLIENT */ - 16, /* 16, REP_NEWFILE */ - 17, /* 17, REP_NEWMASTER */ - 18, /* 18, REP_NEWSITE */ - 19, /* 19, REP_PAGE */ - 20, /* 20, REP_PAGE_FAIL */ - 21, /* 21, REP_PAGE_MORE */ - 22, /* 22, REP_PAGE_REQ */ - 23, /* 22, REP_REREQUEST */ - 24, /* 24, REP_START_SYNC */ - 25, /* 25, REP_UPDATE */ - 26, /* 26, REP_UPDATE_REQ */ - 27, /* 27, REP_VERIFY */ - 28, /* 28, REP_VERIFY_FAIL */ - 29, /* 29, REP_VERIFY_REQ */ - 30, /* 30, REP_VOTE1 */ - 31 /* 31, REP_VOTE2 */ + 9, /* 4, REP_BULK_LOG */ + 10, /* 5, REP_BULK_PAGE */ + 11, /* 6, REP_DUPMASTER */ + 12, /* 7, REP_FILE */ + 13, /* 8, REP_FILE_FAIL */ + 14, /* 9, REP_FILE_REQ */ + 15, /* 10, REP_LEASE_GRANT */ + 16, /* 11, REP_LOG */ + 17, /* 12, REP_LOG_MORE */ + 18, /* 13, REP_LOG_REQ */ + 19, /* 14, REP_MASTER_REQ */ + 20, /* 15, REP_NEWCLIENT */ + 21, /* 16, REP_NEWFILE */ + 22, /* 17, REP_NEWMASTER */ + 23, /* 18, REP_NEWSITE */ + 24, /* 19, REP_PAGE */ + 25, /* 20, REP_PAGE_FAIL */ + 26, /* 21, REP_PAGE_MORE */ + 27, /* 22, REP_PAGE_REQ */ + 28, /* 22, REP_REREQUEST */ + 29, /* 24, REP_START_SYNC */ + 30, /* 25, REP_UPDATE */ + 31, /* 26, REP_UPDATE_REQ */ + 32, /* 27, REP_VERIFY */ + 33, /* 28, REP_VERIFY_FAIL */ + 34, /* 29, REP_VERIFY_REQ */ + 35, /* 30, REP_VOTE1 */ + 36, /* 31, REP_VOTE2 */ + REP_INVALID, /* 32, 4.7/5.3 no message */ + REP_INVALID, /* 33, 4.7/5.3 no message */ + REP_INVALID, /* 34, 4.7/5.3 no message */ + REP_INVALID, /* 35, 4.7/5.3 no message */ + REP_INVALID /* 36, 4.7/5.3 no message */ + }, + /* + * From 5.3 message number To 6.1 message number. Messages to + * handle BLOBs were added. + */ + { REP_INVALID, /* NO message 0 */ + 1, /* 1, REP_ALIVE */ + 2, /* 2, REP_ALIVE_REQ */ + 3, /* 3, REP_ALL_REQ */ + /* 4, REP_BLOB_ALL_REQ doesn't exist */ + /* 5, REP_BLOB_CHUNK doesn't exist */ + /* 6, REP_BLOB_CHUNK_REQ doesn't exist */ + /* 7, REP_BLOB_UPDATE doesn't exist */ + /* 8, REP_BLOB_UPDATE_REQ doesn't exist */ + 9, /* 4, REP_BULK_LOG */ + 10, /* 5, REP_BULK_PAGE */ + 11, /* 6, REP_DUPMASTER */ + 12, /* 7, REP_FILE */ + 13, /* 8, REP_FILE_FAIL */ + 14, /* 9, REP_FILE_REQ */ + 15, /* 10, REP_LEASE_GRANT */ + 16, /* 11, REP_LOG */ + 17, /* 12, REP_LOG_MORE */ + 18, /* 13, REP_LOG_REQ */ + 19, /* 14, REP_MASTER_REQ */ + 20, /* 15, REP_NEWCLIENT */ + 21, /* 16, REP_NEWFILE */ + 22, /* 17, REP_NEWMASTER */ + 23, /* 18, REP_NEWSITE */ + 24, /* 19, REP_PAGE */ + 25, /* 20, REP_PAGE_FAIL */ + 26, /* 21, REP_PAGE_MORE */ + 27, /* 22, REP_PAGE_REQ */ + 28, /* 23, REP_REREQUEST */ + 29, /* 24, REP_START_SYNC */ + 30, /* 25, REP_UPDATE */ + 31, /* 26, REP_UPDATE_REQ */ + 32, /* 27, REP_VERIFY */ + 33, /* 28, REP_VERIFY_FAIL */ + 34, /* 29, REP_VERIFY_REQ */ + 35, /* 30, REP_VOTE1 */ + 36, /* 31, REP_VOTE2 */ + REP_INVALID, /* 32, 5.3/6.1 no message */ + REP_INVALID, /* 33, 5.3/6.1 no message */ + REP_INVALID, /* 34, 5.3/6.1 no message */ + REP_INVALID, /* 35, 5.3/6.1 no message */ + REP_INVALID /* 36, 5.3/6.1 no message */ } }; return (table[version][rectype]); @@ -2215,9 +2369,9 @@ __rep_print_int(env, verbose, fmt, ap) __os_id(env->dbenv, &pid, &tid); if (diag_msg) MUTEX_LOCK(env, rep->mtx_diag); - __os_gettime(env, &ts, 1); + __os_gettime(env, &ts, 0); __db_msgadd(env, &mb, "[%lu:%lu][%s] %s: ", - (u_long)ts.tv_sec, (u_long)ts.tv_nsec/NS_PER_US, + (u_long)ts.tv_sec, (u_long)ts.tv_nsec / NS_PER_US, env->dbenv->thread_id_string(env->dbenv, pid, tid, buf), s); __db_msgadd_ap(env, &mb, fmt, ap); @@ -2260,6 +2414,26 @@ __rep_print_message(env, eid, rp, str, flags) FLD_SET(verbflag, DB_VERB_REP_MISC); type = "all_req"; break; + case REP_BLOB_ALL_REQ: + FLD_SET(verbflag, DB_VERB_REP_MISC); + type = "all_blob_req"; + break; + case REP_BLOB_CHUNK: + FLD_SET(verbflag, DB_VERB_REP_MISC); + type = "blob_chunk"; + break; + case REP_BLOB_CHUNK_REQ: + FLD_SET(verbflag, DB_VERB_REP_MISC); + type = "blob_chunk_req"; + break; + case REP_BLOB_UPDATE: + FLD_SET(verbflag, DB_VERB_REP_MISC); + type = "blob_update"; + break; + case REP_BLOB_UPDATE_REQ: + FLD_SET(verbflag, DB_VERB_REP_MISC); + type = "blob_update_req"; + break; case REP_BULK_LOG: FLD_SET(verbflag, DB_VERB_REP_MISC); type = "bulk_log"; @@ -2650,9 +2824,19 @@ __rep_log_backup(env, logc, lsn, match) */ if ((match == REP_REC_COMMIT && rectype == DB___txn_regop) || - (match == REP_REC_PERM && - (rectype == DB___txn_ckp || rectype == DB___txn_regop))) + ((match == REP_REC_PERM || match == REP_REC_PERM_DEL) && + IS_PERM_RECTYPE(rectype))) break; + /* + * Break early if a file remove is discovered in the logs. + * BDB cannot restore a deleted database or blob file from + * logs, so trigger internal init to recover the file. + * Used by Instant Internal Init in replication. + */ + if (match == REP_REC_PERM_DEL && rectype == DB___fop_remove) { + ret = DB_NOTFOUND; + break; + } } return (ret); } @@ -2671,7 +2855,6 @@ __rep_get_maxpermlsn(env, max_perm_lsnp) { DB_LOG *dblp; DB_REP *db_rep; - DB_THREAD_INFO *ip; LOG *lp; REP *rep; @@ -2680,11 +2863,9 @@ __rep_get_maxpermlsn(env, max_perm_lsnp) dblp = env->lg_handle; lp = dblp->reginfo.primary; - ENV_ENTER(env, ip); MUTEX_LOCK(env, rep->mtx_clientdb); *max_perm_lsnp = lp->max_perm_lsn; MUTEX_UNLOCK(env, rep->mtx_clientdb); - ENV_LEAVE(env, ip); return (0); } @@ -2724,12 +2905,13 @@ __rep_get_datagen(env, data_genp) u_int8_t data_buf[__REP_LSN_HIST_DATA_SIZE]; DBT key_dbt, data_dbt; u_int32_t flags; - int ret, t_ret, tries; + int ret, t_ret, tries, was_open; db_rep = env->rep_handle; ret = 0; *data_genp = 0; tries = 0; + was_open = 0; flags = DB_LAST; retry: if ((ret = __txn_begin(env, NULL, NULL, &txn, DB_IGNORE_LEASE)) != 0) @@ -2746,10 +2928,10 @@ retry: * That is not an error. */ ret = 0; - goto out; + goto noclose; } - db_rep->lsn_db = dbp; - } + } else + was_open = 1; if ((ret = __db_cursor(dbp, NULL, txn, &dbc, 0)) != 0) goto out; @@ -2784,8 +2966,126 @@ retry: &key, key_buf, __REP_LSN_HIST_KEY_SIZE, NULL)) == 0) *data_genp = key.gen; out: + if (!was_open && dbp != NULL && + (t_ret = __db_close(dbp, txn, DB_NOSYNC)) != 0 && ret == 0) + ret = t_ret; +noclose: if ((t_ret = __txn_commit(txn, DB_TXN_NOSYNC)) != 0 && ret == 0) ret = t_ret; err: return (ret); } + +/* + * __rep_become_readonly_master -- + * + * Put this master into a state where it no longer accepts writes but it + * is still a master that can respond to requests for missing messages. + * It fills in sync_lsn to provide a mechanism to know the LSN of the + * next log record expected on this site. Generally, this site should + * be restarted as a client shortly after becoming a readonly master. + * + * PUBLIC: int __rep_become_readonly_master + * PUBLIC: __P((ENV *, u_int32_t *, DB_LSN *)); + */ +int +__rep_become_readonly_master(env, gen, sync_lsnp) + ENV *env; + u_int32_t *gen; + DB_LSN *sync_lsnp; +{ + DB_LOG *dblp; + DB_REP *db_rep; + LOG *lp; + REP *rep; + int locked, ret; + + db_rep = env->rep_handle; + rep = db_rep->region; + dblp = env->lg_handle; + lp = dblp->reginfo.primary; + *gen = 0; + ZERO_LSN(*sync_lsnp); + ret = 0; + locked = 0; + + REP_SYSTEM_LOCK(env); + /* + * Lock out replication message thread processing so that replication + * world won't change (e.g. restart, client sync). + */ + if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_MSG)) { + /* There is already someone in msg lockout, return. */ + RPRINT(env, (env, DB_VERB_REP_MISC, + "Readonly master: thread already in msg lockout")); + goto errunlock; + } else if ((ret = __rep_lockout_msg(env, rep, 0)) != 0) + goto errclearlockouts; + + /* + * Lock out API to wait for active txn/mpool operations to complete + * and prevent new ones from starting. + */ + if ((ret = __rep_lockout_api(env, rep)) != 0) + goto errclearlockouts; + locked = 1; + + /* Make this site a readonly master and get master generation. */ + F_SET(rep, REP_F_READONLY_MASTER); + *gen = rep->gen; + REP_SYSTEM_UNLOCK(env); + + /* Get the next log record the logging subsystem expects to write. */ + LOG_SYSTEM_LOCK(env); + *sync_lsnp = lp->lsn; + LOG_SYSTEM_UNLOCK(env); + + REP_SYSTEM_LOCK(env); +errclearlockouts: + FLD_CLR(rep->lockout_flags, REP_LOCKOUT_MSG); + if (locked) + CLR_LOCKOUT_BDB(rep); +errunlock: + REP_SYSTEM_UNLOCK(env); + return (ret); +} + +/* + * __rep_get_lsnhist_data -- + * + * A utility function to get the full LSN history database record for a + * particular gen. + * + * PUBLIC: int __rep_get_lsnhist_data __P((ENV *, DB_THREAD_INFO *, + * PUBLIC: u_int32_t, __rep_lsn_hist_data_args *)); + */ +int +__rep_get_lsnhist_data(env, ip, gen, lsnhist_data) + ENV *env; + DB_THREAD_INFO *ip; + u_int32_t gen; + __rep_lsn_hist_data_args *lsnhist_data; +{ + DB_TXN *txn; + DBC *dbc; + struct rep_waitgoal reason; + int ret, t_ret; + + txn = NULL; + dbc = NULL; + + /* + * Cannot use cached LSN history values because we need the + * timestamp value here, which is not cached. + */ + ret = __rep_read_lsn_history(env, + ip, &txn, &dbc, gen, lsnhist_data, &reason, DB_SET, 0); + + if (dbc != NULL && + (t_ret = __dbc_close(dbc)) != 0 && ret == 0) + ret = t_ret; + if (txn != NULL && + (t_ret = __db_txn_auto_resolve(env, txn, 1, ret)) != 0 && ret == 0) + ret = t_ret; + return (ret); +} diff --git a/src/rep/rep_verify.c b/src/rep/rep_verify.c index 5238f900..40a0dfce 100644 --- a/src/rep/rep_verify.c +++ b/src/rep/rep_verify.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2004, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2004, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -119,8 +119,15 @@ __rep_verify(env, rp, rec, eid, savetime) goto out; } } + /* + * Search for a matching perm record. If none is found, + * or a database or file delete is encountered before the + * perm record, begin internal init. Database and blob file + * deletes cannot be undone once committed, so internal init + * must be used to re-create the files. + */ if ((ret = __rep_log_backup(env, logc, &lsn, - REP_REC_PERM)) == 0) { + REP_REC_PERM_DEL)) == 0) { MUTEX_LOCK(env, rep->mtx_clientdb); lp->verify_lsn = lsn; __os_gettime(env, &lp->rcvd_ts, 1); @@ -205,8 +212,10 @@ __rep_internal_init(env, abbrev) u_int32_t abbrev; { REP *rep; + u_int32_t ctlflags; int master, ret; + ctlflags = 0; rep = env->rep_handle->region; REP_SYSTEM_LOCK(env); #ifdef HAVE_STATISTICS @@ -227,6 +236,7 @@ __rep_internal_init(env, abbrev) RPRINT(env, (env, DB_VERB_REP_SYNC, "send UPDATE_REQ, merely to check for NIMDB refresh")); F_SET(rep, REP_F_ABBREVIATED); + FLD_SET(ctlflags, REPCTL_INMEM_ONLY); } else F_CLR(rep, REP_F_ABBREVIATED); ZERO_LSN(rep->first_lsn); @@ -237,7 +247,7 @@ __rep_internal_init(env, abbrev) REP_SYSTEM_UNLOCK(env); if (ret == 0 && master != DB_EID_INVALID) (void)__rep_send_message(env, - master, REP_UPDATE_REQ, NULL, NULL, 0, 0); + master, REP_UPDATE_REQ, NULL, NULL, ctlflags, 0); return (ret); } @@ -504,8 +514,7 @@ __rep_dorecovery(env, lsnp, trunclsnp) */ DB_ASSERT(env, rep->op_cnt == 0); DB_ASSERT(env, rep->msg_th == 1); - if (rectype == DB___txn_regop || rectype == DB___txn_ckp || - rectype == DB___dbreg_register) + if (IS_PERM_RECTYPE(rectype) || rectype == DB___dbreg_register) skip_rec = 0; if (rectype == DB___txn_regop) { if (rep->version >= DB_REPVERSION_44) { @@ -653,8 +662,10 @@ __rep_verify_match(env, reclsnp, savetime) /* * Lockout the API and wait for operations to complete. */ - if ((ret = __rep_lockout_api(env, rep)) != 0) + if ((ret = __rep_lockout_api(env, rep)) != 0) { + FLD_CLR(rep->lockout_flags, REP_LOCKOUT_MSG); goto errunlock; + } /* OK, everyone is out, we can now run recovery. */ REP_SYSTEM_UNLOCK(env); @@ -690,6 +701,10 @@ __rep_verify_match(env, reclsnp, savetime) */ if (db_rep->rep_db == NULL && (ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) { + REP_SYSTEM_LOCK(env); + FLD_CLR(rep->lockout_flags, + REP_LOCKOUT_API | REP_LOCKOUT_MSG | REP_LOCKOUT_OP); + REP_SYSTEM_UNLOCK(env); MUTEX_UNLOCK(env, rep->mtx_clientdb); goto out; } |