diff options
Diffstat (limited to 'src/db/partition.c')
-rw-r--r-- | src/db/partition.c | 292 |
1 files changed, 231 insertions, 61 deletions
diff --git a/src/db/partition.c b/src/db/partition.c index f8beaf16..86491ba3 100644 --- a/src/db/partition.c +++ b/src/db/partition.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 1996, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 1996, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -32,13 +32,12 @@ static int __partc_writelock __P((DBC*)); static int __partition_chk_meta __P((DB *, DB_THREAD_INFO *, DB_TXN *, u_int32_t)); static int __partition_setup_keys __P((DBC *, - DB_PARTITION *, DBMETA *, u_int32_t)); + DB_PARTITION *, u_int32_t, u_int32_t)); static int __part_key_cmp __P((const void *, const void *)); static inline void __part_search __P((DB *, DB_PARTITION *, DBT *, u_int32_t *)); -static char *Alloc_err = DB_STR_A("0644", - "Partition open failed to allocate %d bytes", "%d"); +#define ALLOC_ERR DB_STR_A("0764","Partition failed to allocate %d bytes","%d") /* * Allocate a partition cursor and copy flags to the partition cursor. @@ -70,20 +69,27 @@ static inline void __part_search(dbp, part, key, part_idp) { db_indx_t base, indx, limit; int cmp; - int (*func) __P((DB *, const DBT *, const DBT *)); + int (*func) __P((DB *, const DBT *, const DBT *, size_t *)); + size_t pos, pos_h, pos_l; DB_ASSERT(dbp->env, part->nparts != 0); COMPQUIET(cmp, 0); COMPQUIET(indx, 0); + pos_h = 0; + pos_l = 0; func = ((BTREE *)dbp->bt_internal)->bt_compare; DB_BINARY_SEARCH_FOR(base, limit, part->nparts, O_INDX) { + pos = pos_l > pos_h ? pos_h : pos_l; DB_BINARY_SEARCH_INCR(indx, base, limit, O_INDX); - cmp = func(dbp, key, &part->keys[indx]); + cmp = func(dbp, key, &part->keys[indx], &pos); if (cmp == 0) break; - if (cmp > 0) + if (cmp > 0) { DB_BINARY_SEARCH_SHIFT_BASE(indx, base, limit, O_INDX); + pos_l = pos; + } else + pos_h = pos; } if (cmp == 0) *part_idp = indx; @@ -146,7 +152,8 @@ __partition_set(dbp, parts, keys, callback) { DB_PARTITION *part; ENV *env; - int ret; + u_int32_t i; + int ret, t_ret; DB_ILLEGAL_AFTER_OPEN(dbp, "DB->set_partition"); env = dbp->dbenv->env; @@ -155,6 +162,11 @@ __partition_set(dbp, parts, keys, callback) __db_errx(env, DB_STR("0646", "Must specify at least 2 partitions.")); return (EINVAL); + } else if (parts > PART_MAXIMUM) { + __db_errx(env, DB_STR_A("0772", + "Must not specify more than %u partitions.", "%u"), + (unsigned int)PART_MAXIMUM); + return (EINVAL); } if (keys == NULL && callback == NULL) { @@ -178,11 +190,59 @@ bad: __db_errx(env, DB_STR("0648", (part->callback != NULL && keys != NULL)) goto bad; + /* + * Free a key array that was allocated by an earlier set_partition call. + */ + if (part->keys != NULL) { + for (i = 0; i < part->nparts - 1; i++) { + /* + * Always free all entries in the key array and return + * the first error code. + */ + if ((t_ret = __db_dbt_clone_free(dbp->env, + &part->keys[i])) != 0 && ret == 0) + ret = t_ret; + } + __os_free(dbp->env, part->keys); + part->keys = NULL; + } + + if (ret != 0) + return (ret); + part->nparts = parts; - part->keys = keys; part->callback = callback; - return (0); + /* + * Take a copy of the users key array otherwise we cannot be sure + * that the memory will still be valid when the database is opened. + */ + if (keys != NULL) { + if ((ret = __os_calloc(dbp->env, + part->nparts - 1, sizeof(DBT), &part->keys)) != 0) + goto err; + + for (i = 0, parts = 0; i < part->nparts - 1; i++, parts++) + if ((ret = __db_dbt_clone(dbp->env, + &part->keys[i], &keys[i])) != 0) + goto err; + } + +err: if (ret != 0 && part->keys != NULL) { + /* + * Always free those entries cloned successfully in the key + * array and the one which fails in __db_dbt_clone, and + * return the first error code. As ret != 0 here, so it is + * safe to ignore any error from __db_dbt_clone_free. + */ + for (i = 0; i < parts; i++) + (void)__db_dbt_clone_free(dbp->env, &part->keys[i]); + if (parts < part->nparts - 1 && part->keys[parts].data != NULL) + __os_free(dbp->env, part->keys[parts].data); + __os_free(dbp->env, part->keys); + part->keys = NULL; + } + return (ret); } /* @@ -288,15 +348,16 @@ __partition_open(dbp, ip, txn, fname, type, flags, mode, do_open) if ((ret = __os_calloc(env, part->nparts, sizeof(*part->handles), &part->handles)) != 0) { - __db_errx(env, - Alloc_err, part->nparts * sizeof(*part->handles)); + __db_errx(env, ALLOC_ERR, + (int)(part->nparts * sizeof(*part->handles))); goto err; } DB_ASSERT(env, fname != NULL); if ((ret = __os_malloc(env, strlen(fname) + PART_LEN + 1, &name)) != 0) { - __db_errx(env, Alloc_err, strlen(fname) + PART_LEN + 1); + __db_errx(env, ALLOC_ERR, + (int)(strlen(fname) + PART_LEN + 1)); goto err; } @@ -330,6 +391,9 @@ __partition_open(dbp, ip, txn, fname, type, flags, mode, do_open) part_db->dup_compare = dbp->dup_compare; part_db->app_private = dbp->app_private; part_db->api_internal = dbp->api_internal; + part_db->blob_threshold = dbp->blob_threshold; + part_db->blob_file_id = dbp->blob_file_id; + part_db->blob_sdb_id = dbp->blob_sdb_id; if (dbp->type == DB_BTREE) __bam_copy_config(dbp, part_db, part->nparts); @@ -388,7 +452,8 @@ __partition_chk_meta(dbp, ip, txn, flags) DB_MPOOLFILE *mpf; ENV *env; db_pgno_t base_pgno; - int ret, t_ret; + int ret, set_keys, t_ret; + u_int32_t pgsize; dbc = NULL; meta = NULL; @@ -397,6 +462,14 @@ __partition_chk_meta(dbp, ip, txn, flags) mpf = dbp->mpf; env = dbp->env; ret = 0; + set_keys = 0; + + /* + * Just to fix the lint warning. + * The real value will be set later, and we will + * only use the value after being set properly. + */ + pgsize = dbp->pgsize; /* Get a cursor on the main db. */ dbp->p_internal = NULL; @@ -475,10 +548,12 @@ __partition_chk_meta(dbp, ip, txn, flags) } } else if (meta->magic != DB_BTREEMAGIC) { __db_errx(env, DB_STR("0658", - "Partitioning only supported on BTREE nad HASH.")); + "Partitioning only supported on BTREE and HASH.")); ret = EINVAL; - } else - ret = __partition_setup_keys(dbc, part, meta, flags); + } else { + set_keys = 1; + pgsize = meta->pagesize; + } err: /* Put the metadata page back. */ if (meta != NULL && (t_ret = __memp_fput(mpf, @@ -487,6 +562,15 @@ err: /* Put the metadata page back. */ if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0) ret = t_ret; + /* + * We can only call __partition_setup_keys after putting + * the meta page and releasing the meta lock, or self-deadlock + * will occur. + */ + if (ret == 0 && set_keys && (t_ret = + __partition_setup_keys(dbc, part, pgsize, flags)) != 0) + ret = t_ret; + if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0) ret = t_ret; @@ -502,7 +586,7 @@ err: /* Put the metadata page back. */ struct key_sort { DB *dbp; DBT *key; - int (*compare) __P((DB *, const DBT *, const DBT *)); + int (*compare) __P((DB *, const DBT *, const DBT *, size_t *)); }; static int __part_key_cmp(a, b) @@ -512,7 +596,7 @@ static int __part_key_cmp(a, b) ka = a; kb = b; - return (ka->compare(ka->dbp, ka->key, kb->key)); + return (ka->compare(ka->dbp, ka->key, kb->key, NULL)); } /* * __partition_setup_keys -- @@ -520,25 +604,22 @@ static int __part_key_cmp(a, b) * are creating a partitioned database. */ static int -__partition_setup_keys(dbc, part, meta, flags) +__partition_setup_keys(dbc, part, pgsize, flags) DBC *dbc; DB_PARTITION *part; - DBMETA *meta; - u_int32_t flags; + u_int32_t flags, pgsize; { BTREE *t; DB *dbp; - DBT data, key, *keys, *kp; + DBT data, key, *keys, *kp, *okp; ENV *env; - u_int32_t ds, i, j; - u_int8_t *dd; + db_pgno_t last_pgno; + u_int32_t cgetflags, i, j; + size_t dsize; struct key_sort *ks; - int have_keys, ret; - int (*compare) __P((DB *, const DBT *, const DBT *)); - void *dp; + int have_keys, ret, t_ret; + int (*compare) __P((DB *, const DBT *, const DBT *, size_t *)); - COMPQUIET(dd, NULL); - COMPQUIET(ds, 0); memset(&data, 0, sizeof(data)); memset(&key, 0, sizeof(key)); ks = NULL; @@ -549,6 +630,9 @@ __partition_setup_keys(dbc, part, meta, flags) /* Need to just read the main database. */ dbp->p_internal = NULL; have_keys = 0; + dsize = 0; + + keys = part->keys; /* First verify that things what we expect. */ if ((ret = __dbc_get(dbc, &key, &data, DB_FIRST)) != 0) { @@ -581,11 +665,15 @@ __partition_setup_keys(dbc, part, meta, flags) } if (LF_ISSET(DB_CREATE) && have_keys == 0) { - /* Insert the keys into the master database. */ + /* + * Insert the keys into the master database. We will also + * compute the total size of the keys for later use. + */ for (i = 0; i < part->nparts - 1; i++) { if ((ret = __db_put(dbp, dbc->thread_info, dbc->txn, &part->keys[i], &data, 0)) != 0) goto err; + dsize += part->keys[i].size; } /* @@ -604,39 +692,71 @@ __partition_setup_keys(dbc, part, meta, flags) } done: if (F_ISSET(part, PART_RANGE)) { /* - * Allocate one page to hold the keys plus space at the - * end of the buffer to put an array of DBTs. If there - * is not enough space __dbc_get will return how much - * is needed and we realloc. + * If we just did the insert, we have known the total size of + * the keys. Otherwise, the keys must have been in the database, + * and we can calculate the size by checking the last pgno of + * the corresponding mpoolfile. + * + * We make the size aligned at 1024 for performance. */ + if (dsize == 0) { + ret = __memp_get_last_pgno(dbp->mpf, &last_pgno); + if (ret != 0) + goto err; + if (last_pgno > 1) + last_pgno--; + dsize = last_pgno * pgsize; + } + dsize = DB_ALIGN(dsize, 1024); + if ((ret = __os_malloc(env, - meta->pagesize + (sizeof(DBT) * part->nparts), + dsize + (sizeof(DBT) * part->nparts), &part->data)) != 0) { - __db_errx(env, Alloc_err, meta->pagesize); + __db_errx(env, ALLOC_ERR, (int)dsize); goto err; } + memset(part->data, 0, + dsize + (sizeof(DBT) * part->nparts)); + + kp = okp = (DBT *) + ((u_int8_t *)part->data + dsize); memset(&key, 0, sizeof(key)); memset(&data, 0, sizeof(data)); - data.data = part->data; - data.ulen = meta->pagesize; data.flags = DB_DBT_USERMEM; -again: if ((ret = __dbc_get(dbc, &key, &data, - DB_FIRST | DB_MULTIPLE_KEY)) == DB_BUFFER_SMALL) { - if ((ret = __os_realloc(env, - data.size + (sizeof(DBT) * part->nparts), - &part->data)) != 0) + j = 0; + cgetflags = DB_FIRST; + while ((ret = __dbc_get(dbc, &key, &data, cgetflags)) == 0) { + /* It is an error if we get more keys than expect. */ + if ((u_int32_t)(kp - okp) > part->nparts) { + ret = EINVAL; goto err; - data.data = part->data; - data.ulen = data.size; - goto again; + } + kp->size = key.size; + kp->data = (u_int8_t *)part->data + j; + /* It is an error if the keys overflow the space. */ + if (j + kp->size > dsize) { + ret = EINVAL; + goto err; + } + memcpy(kp->data, key.data, kp->size); + j += kp->size; + cgetflags = DB_NEXT; + kp++; } + + /* + * We should get part->nparts keys back, otherwise it means + * the passed-in keys are not valid. + */ + if (ret == DB_NOTFOUND && (u_int32_t)(kp - okp) == part->nparts) + ret = 0; + if (ret == 0) { /* * They passed in keys, they must match. */ - keys = NULL; compare = NULL; - if (have_keys == 1 && (keys = part->keys) != NULL) { + if (have_keys == 1 && keys != NULL) { t = dbc->dbp->bt_internal; compare = t->bt_compare; if ((ret = __os_malloc(env, (part->nparts - 1) @@ -651,20 +771,15 @@ again: if ((ret = __dbc_get(dbc, &key, &data, qsort(ks, (size_t)part->nparts - 1, sizeof(struct key_sort), __part_key_cmp); } - DB_MULTIPLE_INIT(dp, &data); part->keys = (DBT *) - ((u_int8_t *)part->data + data.size); + ((u_int8_t *)part->data + dsize); + F_SET(part, PART_KEYS_SETUP); j = 0; for (kp = part->keys; kp < &part->keys[part->nparts]; kp++, j++) { - DB_MULTIPLE_KEY_NEXT(dp, - &data, kp->data, kp->size, dd, ds); - if (dp == NULL) { - ret = DB_NOTFOUND; - break; - } - if (keys != NULL && j != 0 && - compare(dbc->dbp, ks[j - 1].key, kp) != 0) { + if (have_keys == 1 && keys != NULL && j != 0 && + compare(dbc->dbp, ks[j - 1].key, + kp, NULL) != 0) { if (kp->data == NULL && F_ISSET(dbp, DB_AM_RECOVER)) goto err; @@ -683,6 +798,24 @@ again: if ((ret = __dbc_get(dbc, &key, &data, err: dbp->p_internal = part; if (ks != NULL) __os_free(env, ks); + + /* + * We only free the original copy of the key array when + * the keys have been setup properly, otherwise we let + * the close function to free the memory. + */ + if (keys != NULL && F_ISSET(part, PART_KEYS_SETUP)) { + for (i = 0; i < part->nparts - 1; i++) + /* + * Always free all entries in the key array and return + * the first error code. + */ + if ((t_ret = __db_dbt_clone_free(env, + &keys[i])) != 0 && ret == 0) + ret = t_ret; + __os_free(env, keys); + } + return (ret); } @@ -1183,6 +1316,15 @@ __partition_close(dbp, txn, flags) ret = t_ret; __os_free(env, part->handles); } + if (!F_ISSET(part, PART_KEYS_SETUP) && part->keys != NULL) { + for (i = 0; i < part->nparts - 1; i++) { + if (part->keys[i].data != NULL && (t_ret = + __db_dbt_clone_free(env, &part->keys[i])) != 0 && + ret == 0) + ret = t_ret; + } + __os_free(env, part->keys); + } if (part->dirs != NULL) __os_free(env, (char **)part->dirs); if (part->data != NULL) @@ -1471,7 +1613,8 @@ __part_fileid_reset(env, ip, fname, nparts, encrypted) if ((ret = __os_malloc(env, strlen(fname) + PART_LEN + 1, &name)) != 0) { - __db_errx(env, Alloc_err, strlen(fname) + PART_LEN + 1); + __db_errx(env, ALLOC_ERR, + (int)(strlen(fname) + PART_LEN + 1)); return (ret); } @@ -1747,7 +1890,8 @@ __part_rr(dbp, ip, txn, name, subdb, newname, flags) COMPQUIET(np, NULL); if (newname != NULL && (ret = __os_malloc(env, strlen(newname) + PART_LEN + 1, &np)) != 0) { - __db_errx(env, Alloc_err, strlen(newname) + PART_LEN + 1); + __db_errx(env, ALLOC_ERR, + (int)(strlen(newname) + PART_LEN + 1)); goto err; } for (i = 0; i < part->nparts; i++, pdbp++) { @@ -1790,6 +1934,32 @@ err: /* } return (ret); } + +/* + * __partc_dup -- + * Duplicate a cursor on a partitioned database. + * + * PUBLIC: int __partc_dup __P((DBC *, DBC *)); + */ +int +__partc_dup(dbc_orig, dbc_n) + DBC *dbc_orig; + DBC *dbc_n; +{ + PART_CURSOR *orig, *new; + + orig = (PART_CURSOR *)dbc_orig->internal; + new = (PART_CURSOR *)dbc_n->internal; + + /* + * A cursor on a partitioned database contains the identifier + * of the underlying database and a regular cursor that points + * to the underlying database. Copy both pieces. + */ + new->part_id = orig->part_id; + + return (__dbc_dup(orig->sub_cursor, &new->sub_cursor, DB_POSITION)); +} #ifdef HAVE_VERIFY /* * __part_verify -- |