summary | refs | log | tree | commit | diff
path: root/src/db/partition.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/db/partition.c')
-rw-r--r-- src/db/partition.c 292
1 files changed, 231 insertions, 61 deletions
diff --git a/src/db/partition.c b/src/db/partition.c
index f8beaf16..86491ba3 100644
--- a/src/db/partition.c
+++ b/src/db/partition.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 1996, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1996, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -32,13 +32,12 @@ static int __partc_writelock __P((DBC*));
static int __partition_chk_meta __P((DB *,
DB_THREAD_INFO *, DB_TXN *, u_int32_t));
static int __partition_setup_keys __P((DBC *,
- DB_PARTITION *, DBMETA *, u_int32_t));
+ DB_PARTITION *, u_int32_t, u_int32_t));
static int __part_key_cmp __P((const void *, const void *));
static inline void __part_search __P((DB *,
DB_PARTITION *, DBT *, u_int32_t *));
-static char *Alloc_err = DB_STR_A("0644",
- "Partition open failed to allocate %d bytes", "%d");
+#define ALLOC_ERR DB_STR_A("0764","Partition failed to allocate %d bytes","%d")
/*
* Allocate a partition cursor and copy flags to the partition cursor.
@@ -70,20 +69,27 @@ static inline void __part_search(dbp, part, key, part_idp)
{
db_indx_t base, indx, limit;
int cmp;
- int (*func) __P((DB *, const DBT *, const DBT *));
+ int (*func) __P((DB *, const DBT *, const DBT *, size_t *));
+ size_t pos, pos_h, pos_l;
DB_ASSERT(dbp->env, part->nparts != 0);
COMPQUIET(cmp, 0);
COMPQUIET(indx, 0);
+ pos_h = 0;
+ pos_l = 0;
func = ((BTREE *)dbp->bt_internal)->bt_compare;
DB_BINARY_SEARCH_FOR(base, limit, part->nparts, O_INDX) {
+ pos = pos_l > pos_h ? pos_h : pos_l;
DB_BINARY_SEARCH_INCR(indx, base, limit, O_INDX);
- cmp = func(dbp, key, &part->keys[indx]);
+ cmp = func(dbp, key, &part->keys[indx], &pos);
if (cmp == 0)
break;
- if (cmp > 0)
+ if (cmp > 0) {
DB_BINARY_SEARCH_SHIFT_BASE(indx, base, limit, O_INDX);
+ pos_l = pos;
+ } else
+ pos_h = pos;
}
if (cmp == 0)
*part_idp = indx;
@@ -146,7 +152,8 @@ __partition_set(dbp, parts, keys, callback)
{
DB_PARTITION *part;
ENV *env;
- int ret;
+ u_int32_t i;
+ int ret, t_ret;
DB_ILLEGAL_AFTER_OPEN(dbp, "DB->set_partition");
env = dbp->dbenv->env;
@@ -155,6 +162,11 @@ __partition_set(dbp, parts, keys, callback)
__db_errx(env, DB_STR("0646",
"Must specify at least 2 partitions."));
return (EINVAL);
+ } else if (parts > PART_MAXIMUM) {
+ __db_errx(env, DB_STR_A("0772",
+ "Must not specify more than %u partitions.", "%u"),
+ (unsigned int)PART_MAXIMUM);
+ return (EINVAL);
}
if (keys == NULL && callback == NULL) {
@@ -178,11 +190,59 @@ bad: __db_errx(env, DB_STR("0648",
(part->callback != NULL && keys != NULL))
goto bad;
+ /*
+ * Free a key array that was allocated by an earlier set_partition call.
+ */
+ if (part->keys != NULL) {
+ for (i = 0; i < part->nparts - 1; i++) {
+ /*
+ * Always free all entries in the key array and return
+ * the first error code.
+ */
+ if ((t_ret = __db_dbt_clone_free(dbp->env,
+ &part->keys[i])) != 0 && ret == 0)
+ ret = t_ret;
+ }
+ __os_free(dbp->env, part->keys);
+ part->keys = NULL;
+ }
+
+ if (ret != 0)
+ return (ret);
+
part->nparts = parts;
- part->keys = keys;
part->callback = callback;
- return (0);
+ /*
+ * Take a copy of the user's key array; otherwise we cannot be sure
+ * that the memory will still be valid when the database is opened.
+ */
+ if (keys != NULL) {
+ if ((ret = __os_calloc(dbp->env,
+ part->nparts - 1, sizeof(DBT), &part->keys)) != 0)
+ goto err;
+
+ for (i = 0, parts = 0; i < part->nparts - 1; i++, parts++)
+ if ((ret = __db_dbt_clone(dbp->env,
+ &part->keys[i], &keys[i])) != 0)
+ goto err;
+ }
+
+err: if (ret != 0 && part->keys != NULL) {
+ /*
+ * Always free the entries that were cloned successfully in the
+ * key array, plus the one that failed in __db_dbt_clone, and
+ * return the first error code. Since ret != 0 here, it is
+ * safe to ignore any error from __db_dbt_clone_free.
+ */
+ for (i = 0; i < parts; i++)
+ (void)__db_dbt_clone_free(dbp->env, &part->keys[i]);
+ if (parts < part->nparts - 1 && part->keys[parts].data != NULL)
+ __os_free(dbp->env, part->keys[parts].data);
+ __os_free(dbp->env, part->keys);
+ part->keys = NULL;
+ }
+ return (ret);
}
/*
@@ -288,15 +348,16 @@ __partition_open(dbp, ip, txn, fname, type, flags, mode, do_open)
if ((ret = __os_calloc(env,
part->nparts, sizeof(*part->handles), &part->handles)) != 0) {
- __db_errx(env,
- Alloc_err, part->nparts * sizeof(*part->handles));
+ __db_errx(env, ALLOC_ERR,
+ (int)(part->nparts * sizeof(*part->handles)));
goto err;
}
DB_ASSERT(env, fname != NULL);
if ((ret = __os_malloc(env,
strlen(fname) + PART_LEN + 1, &name)) != 0) {
- __db_errx(env, Alloc_err, strlen(fname) + PART_LEN + 1);
+ __db_errx(env, ALLOC_ERR,
+ (int)(strlen(fname) + PART_LEN + 1));
goto err;
}
@@ -330,6 +391,9 @@ __partition_open(dbp, ip, txn, fname, type, flags, mode, do_open)
part_db->dup_compare = dbp->dup_compare;
part_db->app_private = dbp->app_private;
part_db->api_internal = dbp->api_internal;
+ part_db->blob_threshold = dbp->blob_threshold;
+ part_db->blob_file_id = dbp->blob_file_id;
+ part_db->blob_sdb_id = dbp->blob_sdb_id;
if (dbp->type == DB_BTREE)
__bam_copy_config(dbp, part_db, part->nparts);
@@ -388,7 +452,8 @@ __partition_chk_meta(dbp, ip, txn, flags)
DB_MPOOLFILE *mpf;
ENV *env;
db_pgno_t base_pgno;
- int ret, t_ret;
+ int ret, set_keys, t_ret;
+ u_int32_t pgsize;
dbc = NULL;
meta = NULL;
@@ -397,6 +462,14 @@ __partition_chk_meta(dbp, ip, txn, flags)
mpf = dbp->mpf;
env = dbp->env;
ret = 0;
+ set_keys = 0;
+
+ /*
+ * Just to fix the lint warning.
+ * The real value will be set later, and we will
+ * only use the value after being set properly.
+ */
+ pgsize = dbp->pgsize;
/* Get a cursor on the main db. */
dbp->p_internal = NULL;
@@ -475,10 +548,12 @@ __partition_chk_meta(dbp, ip, txn, flags)
}
} else if (meta->magic != DB_BTREEMAGIC) {
__db_errx(env, DB_STR("0658",
- "Partitioning only supported on BTREE nad HASH."));
+ "Partitioning only supported on BTREE and HASH."));
ret = EINVAL;
- } else
- ret = __partition_setup_keys(dbc, part, meta, flags);
+ } else {
+ set_keys = 1;
+ pgsize = meta->pagesize;
+ }
err: /* Put the metadata page back. */
if (meta != NULL && (t_ret = __memp_fput(mpf,
@@ -487,6 +562,15 @@ err: /* Put the metadata page back. */
if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
ret = t_ret;
+ /*
+ * We can only call __partition_setup_keys after putting
+ * the meta page and releasing the meta lock, or self-deadlock
+ * will occur.
+ */
+ if (ret == 0 && set_keys && (t_ret =
+ __partition_setup_keys(dbc, part, pgsize, flags)) != 0)
+ ret = t_ret;
+
if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
ret = t_ret;
@@ -502,7 +586,7 @@ err: /* Put the metadata page back. */
struct key_sort {
DB *dbp;
DBT *key;
- int (*compare) __P((DB *, const DBT *, const DBT *));
+ int (*compare) __P((DB *, const DBT *, const DBT *, size_t *));
};
static int __part_key_cmp(a, b)
@@ -512,7 +596,7 @@ static int __part_key_cmp(a, b)
ka = a;
kb = b;
- return (ka->compare(ka->dbp, ka->key, kb->key));
+ return (ka->compare(ka->dbp, ka->key, kb->key, NULL));
}
/*
* __partition_setup_keys --
@@ -520,25 +604,22 @@ static int __part_key_cmp(a, b)
* are creating a partitioned database.
*/
static int
-__partition_setup_keys(dbc, part, meta, flags)
+__partition_setup_keys(dbc, part, pgsize, flags)
DBC *dbc;
DB_PARTITION *part;
- DBMETA *meta;
- u_int32_t flags;
+ u_int32_t flags, pgsize;
{
BTREE *t;
DB *dbp;
- DBT data, key, *keys, *kp;
+ DBT data, key, *keys, *kp, *okp;
ENV *env;
- u_int32_t ds, i, j;
- u_int8_t *dd;
+ db_pgno_t last_pgno;
+ u_int32_t cgetflags, i, j;
+ size_t dsize;
struct key_sort *ks;
- int have_keys, ret;
- int (*compare) __P((DB *, const DBT *, const DBT *));
- void *dp;
+ int have_keys, ret, t_ret;
+ int (*compare) __P((DB *, const DBT *, const DBT *, size_t *));
- COMPQUIET(dd, NULL);
- COMPQUIET(ds, 0);
memset(&data, 0, sizeof(data));
memset(&key, 0, sizeof(key));
ks = NULL;
@@ -549,6 +630,9 @@ __partition_setup_keys(dbc, part, meta, flags)
/* Need to just read the main database. */
dbp->p_internal = NULL;
have_keys = 0;
+ dsize = 0;
+
+ keys = part->keys;
/* First verify that things what we expect. */
if ((ret = __dbc_get(dbc, &key, &data, DB_FIRST)) != 0) {
@@ -581,11 +665,15 @@ __partition_setup_keys(dbc, part, meta, flags)
}
if (LF_ISSET(DB_CREATE) && have_keys == 0) {
- /* Insert the keys into the master database. */
+ /*
+ * Insert the keys into the master database. We will also
+ * compute the total size of the keys for later use.
+ */
for (i = 0; i < part->nparts - 1; i++) {
if ((ret = __db_put(dbp, dbc->thread_info,
dbc->txn, &part->keys[i], &data, 0)) != 0)
goto err;
+ dsize += part->keys[i].size;
}
/*
@@ -604,39 +692,71 @@ __partition_setup_keys(dbc, part, meta, flags)
}
done: if (F_ISSET(part, PART_RANGE)) {
/*
- * Allocate one page to hold the keys plus space at the
- * end of the buffer to put an array of DBTs. If there
- * is not enough space __dbc_get will return how much
- * is needed and we realloc.
+ * If we just did the insert, we already know the total size of
+ * the keys. Otherwise, the keys must already be in the database,
+ * and we can derive an upper bound on their size from the last
+ * pgno of the corresponding mpoolfile.
+ *
+ * We align the size to 1024 bytes for performance.
*/
+ if (dsize == 0) {
+ ret = __memp_get_last_pgno(dbp->mpf, &last_pgno);
+ if (ret != 0)
+ goto err;
+ if (last_pgno > 1)
+ last_pgno--;
+ dsize = last_pgno * pgsize;
+ }
+ dsize = DB_ALIGN(dsize, 1024);
+
if ((ret = __os_malloc(env,
- meta->pagesize + (sizeof(DBT) * part->nparts),
+ dsize + (sizeof(DBT) * part->nparts),
&part->data)) != 0) {
- __db_errx(env, Alloc_err, meta->pagesize);
+ __db_errx(env, ALLOC_ERR, (int)dsize);
goto err;
}
+ memset(part->data, 0,
+ dsize + (sizeof(DBT) * part->nparts));
+
+ kp = okp = (DBT *)
+ ((u_int8_t *)part->data + dsize);
memset(&key, 0, sizeof(key));
memset(&data, 0, sizeof(data));
- data.data = part->data;
- data.ulen = meta->pagesize;
data.flags = DB_DBT_USERMEM;
-again: if ((ret = __dbc_get(dbc, &key, &data,
- DB_FIRST | DB_MULTIPLE_KEY)) == DB_BUFFER_SMALL) {
- if ((ret = __os_realloc(env,
- data.size + (sizeof(DBT) * part->nparts),
- &part->data)) != 0)
+ j = 0;
+ cgetflags = DB_FIRST;
+ while ((ret = __dbc_get(dbc, &key, &data, cgetflags)) == 0) {
+ /* It is an error if we get more keys than expected. */
+ if ((u_int32_t)(kp - okp) > part->nparts) {
+ ret = EINVAL;
goto err;
- data.data = part->data;
- data.ulen = data.size;
- goto again;
+ }
+ kp->size = key.size;
+ kp->data = (u_int8_t *)part->data + j;
+ /* It is an error if the keys overflow the space. */
+ if (j + kp->size > dsize) {
+ ret = EINVAL;
+ goto err;
+ }
+ memcpy(kp->data, key.data, kp->size);
+ j += kp->size;
+ cgetflags = DB_NEXT;
+ kp++;
}
+
+ /*
+ * We should get part->nparts keys back, otherwise it means
+ * the passed-in keys are not valid.
+ */
+ if (ret == DB_NOTFOUND && (u_int32_t)(kp - okp) == part->nparts)
+ ret = 0;
+
if (ret == 0) {
/*
* They passed in keys, they must match.
*/
- keys = NULL;
compare = NULL;
- if (have_keys == 1 && (keys = part->keys) != NULL) {
+ if (have_keys == 1 && keys != NULL) {
t = dbc->dbp->bt_internal;
compare = t->bt_compare;
if ((ret = __os_malloc(env, (part->nparts - 1)
@@ -651,20 +771,15 @@ again: if ((ret = __dbc_get(dbc, &key, &data,
qsort(ks, (size_t)part->nparts - 1,
sizeof(struct key_sort), __part_key_cmp);
}
- DB_MULTIPLE_INIT(dp, &data);
part->keys = (DBT *)
- ((u_int8_t *)part->data + data.size);
+ ((u_int8_t *)part->data + dsize);
+ F_SET(part, PART_KEYS_SETUP);
j = 0;
for (kp = part->keys;
kp < &part->keys[part->nparts]; kp++, j++) {
- DB_MULTIPLE_KEY_NEXT(dp,
- &data, kp->data, kp->size, dd, ds);
- if (dp == NULL) {
- ret = DB_NOTFOUND;
- break;
- }
- if (keys != NULL && j != 0 &&
- compare(dbc->dbp, ks[j - 1].key, kp) != 0) {
+ if (have_keys == 1 && keys != NULL && j != 0 &&
+ compare(dbc->dbp, ks[j - 1].key,
+ kp, NULL) != 0) {
if (kp->data == NULL &&
F_ISSET(dbp, DB_AM_RECOVER))
goto err;
@@ -683,6 +798,24 @@ again: if ((ret = __dbc_get(dbc, &key, &data,
err: dbp->p_internal = part;
if (ks != NULL)
__os_free(env, ks);
+
+ /*
+ * We only free the original copy of the key array when
+ * the keys have been set up properly; otherwise we let
+ * the close function free the memory.
+ */
+ if (keys != NULL && F_ISSET(part, PART_KEYS_SETUP)) {
+ for (i = 0; i < part->nparts - 1; i++)
+ /*
+ * Always free all entries in the key array and return
+ * the first error code.
+ */
+ if ((t_ret = __db_dbt_clone_free(env,
+ &keys[i])) != 0 && ret == 0)
+ ret = t_ret;
+ __os_free(env, keys);
+ }
+
return (ret);
}
@@ -1183,6 +1316,15 @@ __partition_close(dbp, txn, flags)
ret = t_ret;
__os_free(env, part->handles);
}
+ if (!F_ISSET(part, PART_KEYS_SETUP) && part->keys != NULL) {
+ for (i = 0; i < part->nparts - 1; i++) {
+ if (part->keys[i].data != NULL && (t_ret =
+ __db_dbt_clone_free(env, &part->keys[i])) != 0 &&
+ ret == 0)
+ ret = t_ret;
+ }
+ __os_free(env, part->keys);
+ }
if (part->dirs != NULL)
__os_free(env, (char **)part->dirs);
if (part->data != NULL)
@@ -1471,7 +1613,8 @@ __part_fileid_reset(env, ip, fname, nparts, encrypted)
if ((ret = __os_malloc(env,
strlen(fname) + PART_LEN + 1, &name)) != 0) {
- __db_errx(env, Alloc_err, strlen(fname) + PART_LEN + 1);
+ __db_errx(env, ALLOC_ERR,
+ (int)(strlen(fname) + PART_LEN + 1));
return (ret);
}
@@ -1747,7 +1890,8 @@ __part_rr(dbp, ip, txn, name, subdb, newname, flags)
COMPQUIET(np, NULL);
if (newname != NULL && (ret = __os_malloc(env,
strlen(newname) + PART_LEN + 1, &np)) != 0) {
- __db_errx(env, Alloc_err, strlen(newname) + PART_LEN + 1);
+ __db_errx(env, ALLOC_ERR,
+ (int)(strlen(newname) + PART_LEN + 1));
goto err;
}
for (i = 0; i < part->nparts; i++, pdbp++) {
@@ -1790,6 +1934,32 @@ err: /*
}
return (ret);
}
+
+/*
+ * __partc_dup --
+ * Duplicate a cursor on a partitioned database.
+ *
+ * PUBLIC: int __partc_dup __P((DBC *, DBC *));
+ */
+int
+__partc_dup(dbc_orig, dbc_n)
+ DBC *dbc_orig;
+ DBC *dbc_n;
+{
+ PART_CURSOR *orig, *new;
+
+ orig = (PART_CURSOR *)dbc_orig->internal;
+ new = (PART_CURSOR *)dbc_n->internal;
+
+ /*
+ * A cursor on a partitioned database contains the identifier
+ * of the underlying database and a regular cursor that points
+ * to the underlying database. Copy both pieces.
+ */
+ new->part_id = orig->part_id;
+
+ return (__dbc_dup(orig->sub_cursor, &new->sub_cursor, DB_POSITION));
+}
#ifdef HAVE_VERIFY
/*
* __part_verify --