Diffstat (limited to 'bdb/hash/hash.c')
-rw-r--r--	bdb/hash/hash.c	1386
1 files changed, 676 insertions, 710 deletions
diff --git a/bdb/hash/hash.c b/bdb/hash/hash.c
index e96fd4898f0..2f972a3238d 100644
--- a/bdb/hash/hash.c
+++ b/bdb/hash/hash.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 1996, 1997, 1998, 1999, 2000
+ * Copyright (c) 1996-2002
* Sleepycat Software. All rights reserved.
*/
/*
@@ -43,7 +43,7 @@
#include "db_config.h"
#ifndef lint
-static const char revid[] = "$Id: hash.c,v 11.94 2001/01/03 16:42:26 ubell Exp $";
+static const char revid[] = "$Id: hash.c,v 11.166 2002/08/06 06:11:25 bostic Exp $";
#endif /* not lint */
#ifndef NO_SYSTEM_INCLUDES
@@ -54,446 +54,70 @@ static const char revid[] = "$Id: hash.c,v 11.94 2001/01/03 16:42:26 ubell Exp $
#endif
#include "db_int.h"
-#include "db_page.h"
-#include "db_am.h"
-#include "db_ext.h"
-#include "db_shash.h"
-#include "db_swap.h"
-#include "hash.h"
-#include "btree.h"
-#include "log.h"
-#include "lock.h"
-#include "txn.h"
+#include "dbinc/db_page.h"
+#include "dbinc/db_shash.h"
+#include "dbinc/btree.h"
+#include "dbinc/hash.h"
+#include "dbinc/lock.h"
+static int __ham_bulk __P((DBC *, DBT *, u_int32_t));
static int __ham_c_close __P((DBC *, db_pgno_t, int *));
static int __ham_c_del __P((DBC *));
static int __ham_c_destroy __P((DBC *));
static int __ham_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
static int __ham_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
static int __ham_c_writelock __P((DBC *));
-static int __ham_del_dups __P((DBC *, DBT *));
-static int __ham_delete __P((DB *, DB_TXN *, DBT *, u_int32_t));
static int __ham_dup_return __P((DBC *, DBT *, u_int32_t));
static int __ham_expand_table __P((DBC *));
-static int __ham_init_htab __P((DBC *,
- const char *, db_pgno_t, u_int32_t, u_int32_t));
static int __ham_lookup __P((DBC *,
const DBT *, u_int32_t, db_lockmode_t, db_pgno_t *));
static int __ham_overwrite __P((DBC *, DBT *, u_int32_t));
/*
- * __ham_metachk --
+ * __ham_quick_delete --
+ * When performing a DB->del operation that does not involve secondary
+ * indices and is not removing an off-page duplicate tree, we can
+ * speed things up substantially by removing the entire duplicate
+ * set, if any is present, in one operation, rather than by conjuring
+ * up and deleting each of the items individually. (All are stored
+ * in one big HKEYDATA structure.) We don't bother to distinguish
+ * on-page duplicate sets from single, non-dup items; they're deleted
+ * in exactly the same way.
*
- * PUBLIC: int __ham_metachk __P((DB *, const char *, HMETA *));
- */
-int
-__ham_metachk(dbp, name, hashm)
- DB *dbp;
- const char *name;
- HMETA *hashm;
-{
- DB_ENV *dbenv;
- u_int32_t vers;
- int ret;
-
- dbenv = dbp->dbenv;
-
- /*
- * At this point, all we know is that the magic number is for a Hash.
- * Check the version, the database may be out of date.
- */
- vers = hashm->dbmeta.version;
- if (F_ISSET(dbp, DB_AM_SWAP))
- M_32_SWAP(vers);
- switch (vers) {
- case 4:
- case 5:
- case 6:
- __db_err(dbenv,
- "%s: hash version %lu requires a version upgrade",
- name, (u_long)vers);
- return (DB_OLD_VERSION);
- case 7:
- break;
- default:
- __db_err(dbenv,
- "%s: unsupported hash version: %lu", name, (u_long)vers);
- return (EINVAL);
- }
-
- /* Swap the page if we need to. */
- if (F_ISSET(dbp, DB_AM_SWAP) && (ret = __ham_mswap((PAGE *)hashm)) != 0)
- return (ret);
-
- /* Check the type. */
- if (dbp->type != DB_HASH && dbp->type != DB_UNKNOWN)
- return (EINVAL);
- dbp->type = DB_HASH;
- DB_ILLEGAL_METHOD(dbp, DB_OK_HASH);
-
- /*
- * Check application info against metadata info, and set info, flags,
- * and type based on metadata info.
- */
- if ((ret = __db_fchk(dbenv,
- "DB->open", hashm->dbmeta.flags,
- DB_HASH_DUP | DB_HASH_SUBDB | DB_HASH_DUPSORT)) != 0)
- return (ret);
-
- if (F_ISSET(&hashm->dbmeta, DB_HASH_DUP))
- F_SET(dbp, DB_AM_DUP);
- else
- if (F_ISSET(dbp, DB_AM_DUP)) {
- __db_err(dbenv,
- "%s: DB_DUP specified to open method but not set in database",
- name);
- return (EINVAL);
- }
-
- if (F_ISSET(&hashm->dbmeta, DB_HASH_SUBDB))
- F_SET(dbp, DB_AM_SUBDB);
- else
- if (F_ISSET(dbp, DB_AM_SUBDB)) {
- __db_err(dbenv,
- "%s: multiple databases specified but not supported in file",
- name);
- return (EINVAL);
- }
-
- if (F_ISSET(&hashm->dbmeta, DB_HASH_DUPSORT)) {
- if (dbp->dup_compare == NULL)
- dbp->dup_compare = __bam_defcmp;
- } else
- if (dbp->dup_compare != NULL) {
- __db_err(dbenv,
- "%s: duplicate sort function specified but not set in database",
- name);
- return (EINVAL);
- }
-
- /* Set the page size. */
- dbp->pgsize = hashm->dbmeta.pagesize;
-
- /* Copy the file's ID. */
- memcpy(dbp->fileid, hashm->dbmeta.uid, DB_FILE_ID_LEN);
-
- return (0);
-}
-
-/*
- * __ham_open --
+ * This function is called by __db_delete when the appropriate
+ * conditions are met, and it performs the delete in the optimized way.
*
- * PUBLIC: int __ham_open __P((DB *, const char *, db_pgno_t, u_int32_t));
+ * The cursor should be set to the first item in the duplicate
+ * set, or to the sole key/data pair when the key does not have a
+ * duplicate set, before the function is called.
+ *
+ * PUBLIC: int __ham_quick_delete __P((DBC *));
*/
int
-__ham_open(dbp, name, base_pgno, flags)
- DB *dbp;
- const char *name;
- db_pgno_t base_pgno;
- u_int32_t flags;
-{
- DB_ENV *dbenv;
- DBC *dbc;
- HASH_CURSOR *hcp;
- HASH *hashp;
- int need_sync, ret, t_ret;
-
- dbc = NULL;
- dbenv = dbp->dbenv;
- need_sync = 0;
-
- /* Initialize the remaining fields/methods of the DB. */
- dbp->del = __ham_delete;
- dbp->stat = __ham_stat;
-
- /*
- * Get a cursor. If DB_CREATE is specified, we may be creating
- * pages, and to do that safely in CDB we need a write cursor.
- * In STD_LOCKING mode, we'll synchronize using the meta page
- * lock instead.
- */
- if ((ret = dbp->cursor(dbp,
- dbp->open_txn, &dbc, LF_ISSET(DB_CREATE) && CDB_LOCKING(dbenv) ?
- DB_WRITECURSOR : 0)) != 0)
- return (ret);
-
- hcp = (HASH_CURSOR *)dbc->internal;
- hashp = dbp->h_internal;
- hashp->meta_pgno = base_pgno;
- if ((ret = __ham_get_meta(dbc)) != 0)
- goto err1;
-
- /*
- * If this is a new file, initialize it, and put it back dirty.
- *
- * Initialize the hdr structure.
- */
- if (hcp->hdr->dbmeta.magic == DB_HASHMAGIC) {
- /* File exists, verify the data in the header. */
- if (hashp->h_hash == NULL)
- hashp->h_hash = hcp->hdr->dbmeta.version < 5
- ? __ham_func4 : __ham_func5;
- if (!F_ISSET(dbp, DB_RDONLY) &&
- hashp->h_hash(dbp,
- CHARKEY, sizeof(CHARKEY)) != hcp->hdr->h_charkey) {
- __db_err(dbp->dbenv,
- "hash: incompatible hash function");
- ret = EINVAL;
- goto err2;
- }
- if (F_ISSET(&hcp->hdr->dbmeta, DB_HASH_DUP))
- F_SET(dbp, DB_AM_DUP);
- if (F_ISSET(&hcp->hdr->dbmeta, DB_HASH_DUPSORT))
- F_SET(dbp, DB_AM_DUPSORT);
- if (F_ISSET(&hcp->hdr->dbmeta, DB_HASH_SUBDB))
- F_SET(dbp, DB_AM_SUBDB);
- } else if (!IS_RECOVERING(dbenv)) {
- /*
- * File does not exist, we must initialize the header. If
- * locking is enabled that means getting a write lock first.
- * During recovery the meta page will be in the log.
- */
- dbc->lock.pgno = base_pgno;
-
- if (STD_LOCKING(dbc) &&
- ((ret = lock_put(dbenv, &hcp->hlock)) != 0 ||
- (ret = lock_get(dbenv, dbc->locker,
- DB_NONBLOCK(dbc) ? DB_LOCK_NOWAIT : 0,
- &dbc->lock_dbt, DB_LOCK_WRITE, &hcp->hlock)) != 0))
- goto err2;
- else if (CDB_LOCKING(dbp->dbenv)) {
- DB_ASSERT(LF_ISSET(DB_CREATE));
- if ((ret = lock_get(dbenv, dbc->locker,
- DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
- &dbc->mylock)) != 0)
- goto err2;
- }
- if ((ret = __ham_init_htab(dbc, name,
- base_pgno, hashp->h_nelem, hashp->h_ffactor)) != 0)
- goto err2;
-
- need_sync = 1;
- }
-
-err2: /* Release the meta data page */
- if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0)
- ret = t_ret;
-err1: if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
- ret = t_ret;
-
- /* Sync the file so that we know that the meta data goes to disk. */
- if (ret == 0 && need_sync)
- ret = dbp->sync(dbp, 0);
-#if CONFIG_TEST
- if (ret == 0)
- DB_TEST_RECOVERY(dbp, DB_TEST_POSTSYNC, ret, name);
-
-DB_TEST_RECOVERY_LABEL
-#endif
- if (ret != 0)
- (void)__ham_db_close(dbp);
-
- return (ret);
-}
-
-/************************** LOCAL CREATION ROUTINES **********************/
-/*
- * Returns 0 on No Error
- */
-static int
-__ham_init_htab(dbc, name, pgno, nelem, ffactor)
+__ham_quick_delete(dbc)
DBC *dbc;
- const char *name;
- db_pgno_t pgno;
- u_int32_t nelem, ffactor;
{
- DB *dbp;
- DB_LOCK metalock;
- DB_LSN orig_lsn;
- DBMETA *mmeta;
- HASH_CURSOR *hcp;
- HASH *hashp;
- PAGE *h;
- db_pgno_t mpgno;
- int32_t l2, nbuckets;
- int dirty_mmeta, i, ret, t_ret;
-
- hcp = (HASH_CURSOR *)dbc->internal;
- dbp = dbc->dbp;
- hashp = dbp->h_internal;
- mmeta = NULL;
- h = NULL;
- ret = 0;
- dirty_mmeta = 0;
- metalock.off = LOCK_INVALID;
-
- if (hashp->h_hash == NULL)
- hashp->h_hash = DB_HASHVERSION < 5 ? __ham_func4 : __ham_func5;
-
- if (nelem != 0 && ffactor != 0) {
- nelem = (nelem - 1) / ffactor + 1;
- l2 = __db_log2(nelem > 2 ? nelem : 2);
- } else
- l2 = 1;
- nbuckets = 1 << l2;
-
- orig_lsn = hcp->hdr->dbmeta.lsn;
- memset(hcp->hdr, 0, sizeof(HMETA));
- ZERO_LSN(hcp->hdr->dbmeta.lsn);
- hcp->hdr->dbmeta.pgno = pgno;
- hcp->hdr->dbmeta.magic = DB_HASHMAGIC;
- hcp->hdr->dbmeta.version = DB_HASHVERSION;
- hcp->hdr->dbmeta.pagesize = dbp->pgsize;
- hcp->hdr->dbmeta.type = P_HASHMETA;
- hcp->hdr->dbmeta.free = PGNO_INVALID;
- hcp->hdr->max_bucket = hcp->hdr->high_mask = nbuckets - 1;
- hcp->hdr->low_mask = (nbuckets >> 1) - 1;
- hcp->hdr->ffactor = ffactor;
- hcp->hdr->h_charkey = hashp->h_hash(dbp, CHARKEY, sizeof(CHARKEY));
- memcpy(hcp->hdr->dbmeta.uid, dbp->fileid, DB_FILE_ID_LEN);
-
- if (F_ISSET(dbp, DB_AM_DUP))
- F_SET(&hcp->hdr->dbmeta, DB_HASH_DUP);
- if (F_ISSET(dbp, DB_AM_SUBDB))
- F_SET(&hcp->hdr->dbmeta, DB_HASH_SUBDB);
- if (dbp->dup_compare != NULL)
- F_SET(&hcp->hdr->dbmeta, DB_HASH_DUPSORT);
-
- if ((ret = memp_fset(dbp->mpf, hcp->hdr, DB_MPOOL_DIRTY)) != 0)
- goto err;
-
- /*
- * Create the first and second buckets pages so that we have the
- * page numbers for them and we can store that page number
- * in the meta-data header (spares[0]).
- */
- hcp->hdr->spares[0] = nbuckets;
- if ((ret = memp_fget(dbp->mpf,
- &hcp->hdr->spares[0], DB_MPOOL_NEW_GROUP, &h)) != 0)
- goto err;
-
- P_INIT(h, dbp->pgsize, hcp->hdr->spares[0], PGNO_INVALID,
- PGNO_INVALID, 0, P_HASH);
-
- /* Fill in the last fields of the meta data page. */
- hcp->hdr->spares[0] -= (nbuckets - 1);
- for (i = 1; i <= l2; i++)
- hcp->hdr->spares[i] = hcp->hdr->spares[0];
- for (; i < NCACHED; i++)
- hcp->hdr->spares[i] = PGNO_INVALID;
-
- /*
- * Before we are about to put any dirty pages, we need to log
- * the meta-data page create.
- */
- ret = __db_log_page(dbp, name, &orig_lsn, pgno, (PAGE *)hcp->hdr);
-
- if (dbp->open_txn != NULL) {
- mmeta = (DBMETA *) hcp->hdr;
- if (F_ISSET(dbp, DB_AM_SUBDB)) {
-
- /*
- * If this is a subdatabase, then we need to
- * get the LSN off the master meta data page
- * because that's where free pages are linked
- * and during recovery we need to access
- * that page and roll it backward/forward
- * correctly with respect to LSN.
- */
- mpgno = PGNO_BASE_MD;
- if ((ret = __db_lget(dbc,
- 0, mpgno, DB_LOCK_WRITE, 0, &metalock)) != 0)
- goto err;
- if ((ret = memp_fget(dbp->mpf,
- &mpgno, 0, (PAGE **)&mmeta)) != 0)
- goto err;
- }
- if ((t_ret = __ham_groupalloc_log(dbp->dbenv,
- dbp->open_txn, &LSN(mmeta), 0, dbp->log_fileid,
- &LSN(mmeta), hcp->hdr->spares[0],
- hcp->hdr->max_bucket + 1, mmeta->free)) != 0 && ret == 0)
- ret = t_ret;
- if (ret == 0) {
- /* need to update real LSN for buffer manager */
- dirty_mmeta = 1;
- }
-
- }
-
- DB_TEST_RECOVERY(dbp, DB_TEST_POSTLOG, ret, name);
-
-DB_TEST_RECOVERY_LABEL
-err: if (h != NULL &&
- (t_ret = memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY)) != 0 && ret == 0)
- ret = t_ret;
-
- if (F_ISSET(dbp, DB_AM_SUBDB) && mmeta != NULL)
- if ((t_ret = memp_fput(dbp->mpf, mmeta,
- dirty_mmeta ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0)
- ret = t_ret;
- if (metalock.off != LOCK_INVALID)
- (void)__TLPUT(dbc, metalock);
-
- return (ret);
-}
-
-static int
-__ham_delete(dbp, txn, key, flags)
- DB *dbp;
- DB_TXN *txn;
- DBT *key;
- u_int32_t flags;
-{
- DBC *dbc;
- HASH_CURSOR *hcp;
- db_pgno_t pgno;
int ret, t_ret;
- /*
- * This is the only access method routine called directly from
- * the dbp, so we have to do error checking.
- */
-
- PANIC_CHECK(dbp->dbenv);
- DB_ILLEGAL_BEFORE_OPEN(dbp, "DB->del");
- DB_CHECK_TXN(dbp, txn);
-
- if ((ret =
- __db_delchk(dbp, key, flags, F_ISSET(dbp, DB_AM_RDONLY))) != 0)
- return (ret);
-
- if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
+ if ((ret = __ham_get_meta(dbc)) != 0)
return (ret);
- DEBUG_LWRITE(dbc, txn, "ham_delete", key, NULL, flags);
+ /* Assert that we're not using secondary indices. */
+ DB_ASSERT(!F_ISSET(dbc->dbp, DB_AM_SECONDARY));
+ /*
+ * We should assert that we're not a primary either, but that
+ * would require grabbing the dbp's mutex, so we don't bother.
+ */
- hcp = (HASH_CURSOR *)dbc->internal;
- if ((ret = __ham_get_meta(dbc)) != 0)
- goto out;
+ /* Assert that we're set, but not to an off-page duplicate. */
+ DB_ASSERT(IS_INITIALIZED(dbc));
+ DB_ASSERT(((HASH_CURSOR *)dbc->internal)->opd == NULL);
- pgno = PGNO_INVALID;
- if ((ret = __ham_lookup(dbc, key, 0, DB_LOCK_WRITE, &pgno)) == 0) {
- if (F_ISSET(hcp, H_OK)) {
- if (pgno == PGNO_INVALID)
- ret = __ham_del_pair(dbc, 1);
- else {
- /* When we close the cursor in __ham_del_dups,
- * that will make the off-page dup tree go
- * go away as well as our current entry. When
- * it updates cursors, ours should get marked
- * as H_DELETED.
- */
- ret = __ham_del_dups(dbc, key);
- }
- } else
- ret = DB_NOTFOUND;
- }
+ ret = __ham_del_pair(dbc, 1);
if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0)
ret = t_ret;
-out: if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
- ret = t_ret;
return (ret);
}
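
As a point of reference, the fast path above is what an ordinary DB->del ends up using on a hash database that has no secondary indices and whose duplicate sets still fit on-page. A minimal, hedged sketch of such a caller (assuming the DB 4.1-style DB->open signature that takes a transaction handle; the file name and key are illustrative and error handling is abbreviated):

#include <string.h>
#include <db.h>

int
delete_all_dups(const char *file)
{
	DB *dbp;
	DBT key;
	int ret, t_ret;

	if ((ret = db_create(&dbp, NULL, 0)) != 0)
		return (ret);
	/* Sorted duplicates; small sets are kept in one on-page HKEYDATA. */
	if ((ret = dbp->set_flags(dbp, DB_DUPSORT)) != 0)
		goto err;
	if ((ret = dbp->open(dbp,
	    NULL, file, NULL, DB_HASH, DB_CREATE, 0664)) != 0)
		goto err;

	memset(&key, 0, sizeof(key));
	key.data = "fruit";
	key.size = sizeof("fruit") - 1;

	/*
	 * With no secondaries and no off-page duplicate tree, this single
	 * DB->del removes the key and its whole duplicate set at once.
	 */
	ret = dbp->del(dbp, NULL, &key, 0);

err:	if ((t_ret = dbp->close(dbp, 0)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}
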
@@ -517,8 +141,8 @@ __ham_c_init(dbc)
1, sizeof(struct cursor_t), &new_curs)) != 0)
return (ret);
if ((ret = __os_malloc(dbenv,
- dbc->dbp->pgsize, NULL, &new_curs->split_buf)) != 0) {
- __os_free(new_curs, sizeof(*new_curs));
+ dbc->dbp->pgsize, &new_curs->split_buf)) != 0) {
+ __os_free(dbenv, new_curs);
return (ret);
}
@@ -527,8 +151,10 @@ __ham_c_init(dbc)
dbc->c_count = __db_c_count;
dbc->c_del = __db_c_del;
dbc->c_dup = __db_c_dup;
- dbc->c_get = __db_c_get;
+ dbc->c_get = dbc->c_real_get = __db_c_get;
+ dbc->c_pget = __db_c_pget;
dbc->c_put = __db_c_put;
+ dbc->c_am_bulk = __ham_bulk;
dbc->c_am_close = __ham_c_close;
dbc->c_am_del = __ham_c_del;
dbc->c_am_destroy = __ham_c_destroy;
@@ -551,12 +177,14 @@ __ham_c_close(dbc, root_pgno, rmroot)
db_pgno_t root_pgno;
int *rmroot;
{
+ DB_MPOOLFILE *mpf;
HASH_CURSOR *hcp;
HKEYDATA *dp;
int doroot, gotmeta, ret, t_ret;
u_int32_t dirty;
COMPQUIET(rmroot, 0);
+ mpf = dbc->dbp->mpf;
dirty = 0;
doroot = gotmeta = ret = 0;
hcp = (HASH_CURSOR *) dbc->internal;
@@ -568,9 +196,14 @@ __ham_c_close(dbc, root_pgno, rmroot)
gotmeta = 1;
if ((ret = __ham_get_cpage(dbc, DB_LOCK_READ)) != 0)
goto out;
- dp = (HKEYDATA *)H_PAIRDATA(hcp->page, hcp->indx);
- DB_ASSERT(HPAGE_PTYPE(dp) == H_OFFDUP);
- memcpy(&root_pgno, HOFFPAGE_PGNO(dp), sizeof(db_pgno_t));
+ dp = (HKEYDATA *)H_PAIRDATA(dbc->dbp, hcp->page, hcp->indx);
+
+	/* If it's not a dup, we aborted before we changed it. */
+ if (HPAGE_PTYPE(dp) == H_OFFDUP)
+ memcpy(&root_pgno,
+ HOFFPAGE_PGNO(dp), sizeof(db_pgno_t));
+ else
+ root_pgno = PGNO_INVALID;
if ((ret =
hcp->opd->c_am_close(hcp->opd, root_pgno, &doroot)) != 0)
@@ -583,7 +216,7 @@ __ham_c_close(dbc, root_pgno, rmroot)
}
out: if (hcp->page != NULL && (t_ret =
- memp_fput(dbc->dbp->mpf, hcp->page, dirty)) != 0 && ret == 0)
+ mpf->put(mpf, hcp->page, dirty)) != 0 && ret == 0)
ret = t_ret;
if (gotmeta != 0 && (t_ret = __ham_release_meta(dbc)) != 0 && ret == 0)
ret = t_ret;
@@ -605,8 +238,8 @@ __ham_c_destroy(dbc)
hcp = (HASH_CURSOR *)dbc->internal;
if (hcp->split_buf != NULL)
- __os_free(hcp->split_buf, dbc->dbp->pgsize);
- __os_free(hcp, sizeof(HASH_CURSOR));
+ __os_free(dbc->dbp->dbenv, hcp->split_buf);
+ __os_free(dbc->dbp->dbenv, hcp);
return (0);
}
@@ -623,6 +256,7 @@ __ham_c_count(dbc, recnop)
db_recno_t *recnop;
{
DB *dbp;
+ DB_MPOOLFILE *mpf;
HASH_CURSOR *hcp;
db_indx_t len;
db_recno_t recno;
@@ -630,22 +264,23 @@ __ham_c_count(dbc, recnop)
u_int8_t *p, *pend;
dbp = dbc->dbp;
- hcp = (HASH_CURSOR *) dbc->internal;
+ mpf = dbp->mpf;
+ hcp = (HASH_CURSOR *)dbc->internal;
recno = 0;
if ((ret = __ham_get_cpage(dbc, DB_LOCK_READ)) != 0)
return (ret);
- switch (HPAGE_PTYPE(H_PAIRDATA(hcp->page, hcp->indx))) {
+ switch (HPAGE_PTYPE(H_PAIRDATA(dbp, hcp->page, hcp->indx))) {
case H_KEYDATA:
case H_OFFPAGE:
recno = 1;
break;
case H_DUPLICATE:
- p = HKEYDATA_DATA(H_PAIRDATA(hcp->page, hcp->indx));
+ p = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx));
pend = p +
- LEN_HDATA(hcp->page, dbp->pgsize, hcp->indx);
+ LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
for (; p < pend; recno++) {
/* p may be odd, so copy rather than just dereffing */
memcpy(&len, p, sizeof(db_indx_t));
@@ -654,14 +289,13 @@ __ham_c_count(dbc, recnop)
break;
default:
- ret = __db_unknown_type(dbp->dbenv, "__ham_c_count",
- HPAGE_PTYPE(H_PAIRDATA(hcp->page, hcp->indx)));
+ ret = __db_pgfmt(dbp->dbenv, hcp->pgno);
goto err;
}
*recnop = recno;
-err: if ((t_ret = memp_fput(dbc->dbp->mpf, hcp->page, 0)) != 0 && ret == 0)
+err: if ((t_ret = mpf->put(mpf, hcp->page, 0)) != 0 && ret == 0)
ret = t_ret;
hcp->page = NULL;
return (ret);
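
The H_DUPLICATE case above depends on the on-page duplicate encoding: every element of the set is stored as its length, the data bytes, and the length again, so the set can be walked from either end. A small, self-contained sketch of that walk over a raw buffer (dup_count, buf, and buflen are illustrative names; DUP_SIZE here mirrors the macro in the hash headers):

#include <stdint.h>
#include <string.h>

typedef uint16_t db_indx_t;		/* on-page index/length type */

/* An element costs its data plus a leading and a trailing length. */
#define	DUP_SIZE(len)	((len) + 2 * sizeof(db_indx_t))

static uint32_t
dup_count(const unsigned char *buf, size_t buflen)
{
	const unsigned char *p, *pend;
	db_indx_t len;
	uint32_t recno;

	recno = 0;
	pend = buf + buflen;
	for (p = buf; p < pend; recno++) {
		/* p may be unaligned, so copy rather than dereference. */
		memcpy(&len, p, sizeof(db_indx_t));
		p += DUP_SIZE(len);
	}
	return (recno);
}
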
@@ -673,10 +307,12 @@ __ham_c_del(dbc)
{
DB *dbp;
DBT repldbt;
+ DB_MPOOLFILE *mpf;
HASH_CURSOR *hcp;
int ret, t_ret;
dbp = dbc->dbp;
+ mpf = dbp->mpf;
hcp = (HASH_CURSOR *)dbc->internal;
if (F_ISSET(hcp, H_DELETED))
@@ -689,12 +325,12 @@ __ham_c_del(dbc)
goto out;
/* Off-page duplicates. */
- if (HPAGE_TYPE(hcp->page, H_DATAINDEX(hcp->indx)) == H_OFFDUP)
+ if (HPAGE_TYPE(dbp, hcp->page, H_DATAINDEX(hcp->indx)) == H_OFFDUP)
goto out;
if (F_ISSET(hcp, H_ISDUP)) { /* On-page duplicate. */
if (hcp->dup_off == 0 &&
- DUP_SIZE(hcp->dup_len) == LEN_HDATA(hcp->page,
+ DUP_SIZE(hcp->dup_len) == LEN_HDATA(dbp, hcp->page,
hcp->hdr->dbmeta.pagesize, hcp->indx))
ret = __ham_del_pair(dbc, 1);
else {
@@ -703,21 +339,25 @@ __ham_c_del(dbc)
repldbt.doff = hcp->dup_off;
repldbt.dlen = DUP_SIZE(hcp->dup_len);
repldbt.size = 0;
- repldbt.data = HKEYDATA_DATA(H_PAIRDATA(hcp->page,
+ repldbt.data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page,
hcp->indx));
- ret = __ham_replpair(dbc, &repldbt, 0);
- hcp->dup_tlen -= DUP_SIZE(hcp->dup_len);
- F_SET(hcp, H_DELETED);
- ret = __ham_c_update(dbc, DUP_SIZE(hcp->dup_len), 0, 1);
+ if ((ret = __ham_replpair(dbc, &repldbt, 0)) == 0) {
+ hcp->dup_tlen -= DUP_SIZE(hcp->dup_len);
+ F_SET(hcp, H_DELETED);
+ ret = __ham_c_update(dbc,
+ DUP_SIZE(hcp->dup_len), 0, 1);
+ }
}
} else /* Not a duplicate */
ret = __ham_del_pair(dbc, 1);
-out: if (ret == 0 && hcp->page != NULL &&
- (t_ret = memp_fput(dbp->mpf, hcp->page, DB_MPOOL_DIRTY)) != 0)
- ret = t_ret;
- hcp->page = NULL;
+out: if (hcp->page != NULL) {
+ if ((t_ret = mpf->put(mpf,
+ hcp->page, ret == 0 ? DB_MPOOL_DIRTY : 0)) && ret == 0)
+ ret = t_ret;
+ hcp->page = NULL;
+ }
if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0)
ret = t_ret;
return (ret);
@@ -760,7 +400,7 @@ __ham_c_dup(orig_dbc, new_dbc)
* holds a lock of the correct type, so if we need a write lock and
* request it, we know that we'll get it.
*/
- if (orig->lock.off == LOCK_INVALID || orig_dbc->txn != NULL)
+ if (!LOCK_ISSET(orig->lock) || orig_dbc->txn != NULL)
return (0);
return (__ham_lock_bucket(new_dbc, DB_LOCK_READ));
@@ -775,12 +415,14 @@ __ham_c_get(dbc, key, data, flags, pgnop)
db_pgno_t *pgnop;
{
DB *dbp;
+ DB_MPOOLFILE *mpf;
HASH_CURSOR *hcp;
db_lockmode_t lock_type;
int get_key, ret, t_ret;
hcp = (HASH_CURSOR *)dbc->internal;
dbp = dbc->dbp;
+ mpf = dbp->mpf;
/* Clear OR'd in additional bits so we can check for flag equality. */
if (F_ISSET(dbc, DBC_RMW))
@@ -827,6 +469,7 @@ __ham_c_get(dbc, key, data, flags, pgnop)
case DB_SET:
case DB_SET_RANGE:
case DB_GET_BOTH:
+ case DB_GET_BOTH_RANGE:
ret = __ham_lookup(dbc, key, 0, lock_type, pgnop);
get_key = 0;
break;
@@ -856,11 +499,11 @@ __ham_c_get(dbc, key, data, flags, pgnop)
goto err;
else if (F_ISSET(hcp, H_OK)) {
if (*pgnop == PGNO_INVALID)
- ret = __ham_dup_return (dbc, data, flags);
+ ret = __ham_dup_return(dbc, data, flags);
break;
} else if (!F_ISSET(hcp, H_NOMORE)) {
__db_err(dbp->dbenv,
- "H_NOMORE returned to __ham_c_get");
+ "H_NOMORE returned to __ham_c_get");
ret = EINVAL;
break;
}
@@ -872,7 +515,7 @@ __ham_c_get(dbc, key, data, flags, pgnop)
case DB_LAST:
case DB_PREV:
case DB_PREV_NODUP:
- ret = memp_fput(dbp->mpf, hcp->page, 0);
+ ret = mpf->put(mpf, hcp->page, 0);
hcp->page = NULL;
if (hcp->bucket == 0) {
ret = DB_NOTFOUND;
@@ -890,7 +533,7 @@ __ham_c_get(dbc, key, data, flags, pgnop)
case DB_FIRST:
case DB_NEXT:
case DB_NEXT_NODUP:
- ret = memp_fput(dbp->mpf, hcp->page, 0);
+ ret = mpf->put(mpf, hcp->page, 0);
hcp->page = NULL;
hcp->indx = NDX_INVALID;
hcp->bucket++;
@@ -907,6 +550,7 @@ __ham_c_get(dbc, key, data, flags, pgnop)
break;
case DB_GET_BOTH:
case DB_GET_BOTHC:
+ case DB_GET_BOTH_RANGE:
case DB_NEXT_DUP:
case DB_SET:
case DB_SET_RANGE:
@@ -940,6 +584,382 @@ err: if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0)
return (ret);
}
+/*
+ * __ham_bulk -- Return bulk data from a hash table.
+ */
+static int
+__ham_bulk(dbc, data, flags)
+ DBC *dbc;
+ DBT *data;
+ u_int32_t flags;
+{
+ DB *dbp;
+ DB_MPOOLFILE *mpf;
+ HASH_CURSOR *cp;
+ PAGE *pg;
+ db_indx_t dup_len, dup_off, dup_tlen, indx, *inp;
+ db_lockmode_t lock_mode;
+ db_pgno_t pgno;
+ int32_t *endp, key_off, *offp, *saveoff;
+ u_int32_t key_size, size, space;
+ u_int8_t *dbuf, *dp, *hk, *np, *tmp;
+ int is_dup, is_key;
+ int need_pg, next_key, no_dup, pagesize, ret, t_ret;
+
+ ret = 0;
+ key_off = 0;
+ dup_len = dup_off = dup_tlen = 0;
+ size = 0;
+ dbp = dbc->dbp;
+ pagesize = dbp->pgsize;
+ mpf = dbp->mpf;
+ cp = (HASH_CURSOR *)dbc->internal;
+ is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
+ next_key = is_key && LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP;
+ no_dup = LF_ISSET(DB_OPFLAGS_MASK) == DB_NEXT_NODUP;
+ dbuf = data->data;
+ np = dp = dbuf;
+
+	/* Keep track of space that is left. There is a termination entry. */
+ space = data->ulen;
+ space -= sizeof(*offp);
+
+ /* Build the offset/size table from the end up. */
+ endp = (int32_t *) ((u_int8_t *)dbuf + data->ulen);
+ endp--;
+ offp = endp;
+
+ key_size = 0;
+ lock_mode = F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE: DB_LOCK_READ;
+
+next_pg:
+ need_pg = 1;
+ indx = cp->indx;
+ pg = cp->page;
+ inp = P_INP(dbp, pg);
+
+ do {
+ if (is_key) {
+ hk = H_PAIRKEY(dbp, pg, indx);
+ if (HPAGE_PTYPE(hk) == H_OFFPAGE) {
+ memcpy(&key_size,
+ HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
+ memcpy(&pgno,
+ HOFFPAGE_PGNO(hk), sizeof(db_pgno_t));
+ size = key_size;
+ if (key_size > space)
+ goto get_key_space;
+ if ((ret = __bam_bulk_overflow(
+ dbc, key_size, pgno, np)) != 0)
+ return (ret);
+ space -= key_size;
+ key_off = (int32_t)(np - dbuf);
+ np += key_size;
+ } else {
+ if (need_pg) {
+ dp = np;
+ size = pagesize - HOFFSET(pg);
+ if (space < size) {
+get_key_space:
+ if (offp == endp) {
+ data->size =
+ ALIGN(size +
+ pagesize,
+ sizeof(u_int32_t));
+ return (ENOMEM);
+ }
+ goto back_up;
+ }
+ memcpy(dp,
+ (u_int8_t *)pg + HOFFSET(pg), size);
+ need_pg = 0;
+ space -= size;
+ np += size;
+ }
+ key_size = LEN_HKEY(dbp, pg, pagesize, indx);
+ key_off = (int32_t)(inp[indx] - HOFFSET(pg)
+ + dp - dbuf + SSZA(HKEYDATA, data));
+ }
+ }
+
+ hk = H_PAIRDATA(dbp, pg, indx);
+ switch (HPAGE_PTYPE(hk)) {
+ case H_DUPLICATE:
+ case H_KEYDATA:
+ if (need_pg) {
+ dp = np;
+ size = pagesize - HOFFSET(pg);
+ if (space < size) {
+back_up:
+ if (indx != 0) {
+ indx -= 2;
+ /* XXX
+ * It's not clear that this is
+ * the right way to fix this,
+ * but here goes.
+ * If we are backing up onto a
+ * duplicate, then we need to
+ * position ourselves at the
+ * end of the duplicate set.
+ * We probably need to make
+ * this work for H_OFFDUP too.
+ * It might be worth making a
+ * dummy cursor and calling
+ * __ham_item_prev.
+ */
+ tmp = H_PAIRDATA(dbp, pg, indx);
+ if (HPAGE_PTYPE(tmp) ==
+ H_DUPLICATE) {
+ dup_off = dup_tlen =
+ LEN_HDATA(dbp, pg,
+ pagesize, indx + 1);
+ memcpy(&dup_len,
+ HKEYDATA_DATA(tmp),
+ sizeof(db_indx_t));
+ }
+ goto get_space;
+ }
+ /* indx == 0 */
+ if ((ret = __ham_item_prev(dbc,
+ lock_mode, &pgno)) != 0) {
+ if (ret != DB_NOTFOUND)
+ return (ret);
+ if ((ret = mpf->put(mpf,
+ cp->page, 0)) != 0)
+ return (ret);
+ cp->page = NULL;
+ if (cp->bucket == 0) {
+ cp->indx = indx =
+ NDX_INVALID;
+ goto get_space;
+ }
+ if ((ret =
+ __ham_get_meta(dbc)) != 0)
+ return (ret);
+
+ cp->bucket--;
+ cp->pgno = BUCKET_TO_PAGE(cp,
+ cp->bucket);
+ cp->indx = NDX_INVALID;
+ if ((ret = __ham_release_meta(
+ dbc)) != 0)
+ return (ret);
+ if ((ret = __ham_item_prev(dbc,
+ lock_mode, &pgno)) != 0)
+ return (ret);
+ }
+ indx = cp->indx;
+get_space:
+ /*
+ * See if we put any data in the buffer.
+ */
+ if (offp >= endp ||
+ F_ISSET(dbc, DBC_TRANSIENT)) {
+ data->size = ALIGN(size +
+ data->ulen - space,
+ sizeof(u_int32_t));
+ return (ENOMEM);
+ }
+ /*
+ * Don't continue; we're all out
+ * of space, even though we're
+ * returning success.
+ */
+ next_key = 0;
+ break;
+ }
+ memcpy(dp, (u_int8_t *)pg + HOFFSET(pg), size);
+ need_pg = 0;
+ space -= size;
+ np += size;
+ }
+
+ /*
+ * We're about to crack the offset(s) and length(s)
+ * out of an H_KEYDATA or H_DUPLICATE item.
+ * There are three cases:
+ * 1. We were moved into a duplicate set by
+ * the standard hash cursor code. Respect
+ * the dup_off and dup_tlen we were given.
+ * 2. We stumbled upon a duplicate set while
+ * walking the page on our own. We need to
+ * recognize it as a dup and set dup_off and
+ * dup_tlen.
+ * 3. The current item is not a dup.
+ */
+ if (F_ISSET(cp, H_ISDUP)) {
+ /* Case 1 */
+ is_dup = 1;
+ dup_len = cp->dup_len;
+ dup_off = cp->dup_off;
+ dup_tlen = cp->dup_tlen;
+ } else if (HPAGE_PTYPE(hk) == H_DUPLICATE) {
+ /* Case 2 */
+ is_dup = 1;
+ /*
+ * If we run out of memory and bail,
+ * make sure the fact we're in a dup set
+ * isn't ignored later.
+ */
+ F_SET(cp, H_ISDUP);
+ dup_off = 0;
+ memcpy(&dup_len,
+ HKEYDATA_DATA(hk), sizeof(db_indx_t));
+ dup_tlen = LEN_HDATA(dbp, pg, pagesize, indx);
+ } else
+ /* Case 3 */
+ is_dup = dup_len = dup_off = dup_tlen = 0;
+
+ do {
+ space -= (is_key ? 4 : 2) * sizeof(*offp);
+ size += (is_key ? 4 : 2) * sizeof(*offp);
+ /*
+ * Since space is an unsigned, if we happen
+ * to wrap, then this comparison will turn out
+ * to be true. XXX Wouldn't it be better to
+ * simply check above that space is greater than
+ * the value we're about to subtract???
+ */
+ if (space > data->ulen) {
+ if (!is_dup || dup_off == 0)
+ goto back_up;
+ dup_off -= (db_indx_t)DUP_SIZE(offp[1]);
+ goto get_space;
+ }
+ if (is_key) {
+ *offp-- = key_off;
+ *offp-- = key_size;
+ }
+ if (is_dup) {
+ *offp-- = (int32_t)(
+ inp[indx + 1] - HOFFSET(pg) +
+ dp - dbuf + SSZA(HKEYDATA, data) +
+ dup_off + sizeof(db_indx_t));
+ memcpy(&dup_len,
+ HKEYDATA_DATA(hk) + dup_off,
+ sizeof(db_indx_t));
+ dup_off += DUP_SIZE(dup_len);
+ *offp-- = dup_len;
+ } else {
+ *offp-- = (int32_t)(
+ inp[indx + 1] - HOFFSET(pg) +
+ dp - dbuf + SSZA(HKEYDATA, data));
+ *offp-- = LEN_HDATA(dbp, pg,
+ pagesize, indx);
+ }
+ } while (is_dup && dup_off < dup_tlen && no_dup == 0);
+ F_CLR(cp, H_ISDUP);
+ break;
+ case H_OFFDUP:
+ memcpy(&pgno, HOFFPAGE_PGNO(hk), sizeof(db_pgno_t));
+ space -= 2 * sizeof(*offp);
+ if (space > data->ulen)
+ goto back_up;
+
+ if (is_key) {
+ space -= 2 * sizeof(*offp);
+ if (space > data->ulen)
+ goto back_up;
+ *offp-- = key_off;
+ *offp-- = key_size;
+ }
+ saveoff = offp;
+ if ((ret = __bam_bulk_duplicates(dbc,
+ pgno, dbuf, is_key ? offp + 2 : NULL,
+ &offp, &np, &space, no_dup)) != 0) {
+ if (ret == ENOMEM) {
+ size = space;
+ if (is_key && saveoff == offp) {
+ offp += 2;
+ goto back_up;
+ }
+ goto get_space;
+ }
+ return (ret);
+ }
+ break;
+ case H_OFFPAGE:
+ space -= (is_key ? 4 : 2) * sizeof(*offp);
+ if (space > data->ulen)
+ goto back_up;
+
+ memcpy(&size, HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
+ memcpy(&pgno, HOFFPAGE_PGNO(hk), sizeof(db_pgno_t));
+ if (size > space)
+ goto back_up;
+
+ if ((ret =
+ __bam_bulk_overflow(dbc, size, pgno, np)) != 0)
+ return (ret);
+
+ if (is_key) {
+ *offp-- = key_off;
+ *offp-- = key_size;
+ }
+
+ *offp-- = (int32_t)(np - dbuf);
+ *offp-- = size;
+
+ np += size;
+ space -= size;
+ break;
+ }
+ } while (next_key && (indx += 2) < NUM_ENT(pg));
+
+ cp->indx = indx;
+ cp->dup_len = dup_len;
+ cp->dup_off = dup_off;
+ cp->dup_tlen = dup_tlen;
+
+	/* If we are off the page, then try the next page. */
+ if (ret == 0 && next_key && indx >= NUM_ENT(pg)) {
+ if ((ret = __ham_item_next(dbc, lock_mode, &pgno)) == 0)
+ goto next_pg;
+ if (ret != DB_NOTFOUND)
+ return (ret);
+ if ((ret = mpf->put(dbc->dbp->mpf, cp->page, 0)) != 0)
+ return (ret);
+ cp->page = NULL;
+ if ((ret = __ham_get_meta(dbc)) != 0)
+ return (ret);
+
+ cp->bucket++;
+ if (cp->bucket > cp->hdr->max_bucket) {
+ /*
+ * Restore cursor to its previous state. We're past
+ * the last item in the last bucket, so the next
+ * DBC->c_get(DB_NEXT) will return DB_NOTFOUND.
+ */
+ cp->bucket--;
+ ret = DB_NOTFOUND;
+ } else {
+ /*
+ * Start on the next bucket.
+ *
+ * Note that if this new bucket happens to be empty,
+ * but there's another non-empty bucket after it,
+ * we'll return early. This is a rare case, and we
+ * don't guarantee any particular number of keys
+ * returned on each call, so just let the next call
+ * to bulk get move forward by yet another bucket.
+ */
+ cp->pgno = BUCKET_TO_PAGE(cp, cp->bucket);
+ cp->indx = NDX_INVALID;
+ F_CLR(cp, H_ISDUP);
+ ret = __ham_item_next(dbc, lock_mode, &pgno);
+ }
+
+ if ((t_ret = __ham_release_meta(dbc)) != 0)
+ return (t_ret);
+ if (ret == 0)
+ goto next_pg;
+ if (ret != DB_NOTFOUND)
+ return (ret);
+ }
+ *offp = (u_int32_t) -1;
+ return (0);
+}
+
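
The buffer __ham_bulk fills -- data packed from the front, an offset/length table built back from the end, terminated by (u_int32_t)-1 -- is the layout the DB_MULTIPLE_* macros in db.h walk. A hedged sketch of a caller draining a DB_MULTIPLE_KEY bulk get (dump_all and BULK_LEN are illustrative; the buffer must be a multiple of 1024 bytes and at least a page in size):

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <db.h>

#define	BULK_LEN	(64 * 1024)

int
dump_all(DB *dbp)
{
	DBC *dbc;
	DBT key, data;
	void *p, *kptr, *dptr;
	size_t klen, dlen;
	int ret, t_ret;

	if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
		return (ret);

	memset(&key, 0, sizeof(key));
	memset(&data, 0, sizeof(data));
	if ((data.data = malloc(BULK_LEN)) == NULL) {
		ret = ENOMEM;
		goto err;
	}
	data.ulen = BULK_LEN;
	data.flags = DB_DBT_USERMEM;

	/* Each c_get returns as many key/data pairs as fit in the buffer. */
	while ((ret = dbc->c_get(dbc,
	    &key, &data, DB_MULTIPLE_KEY | DB_NEXT)) == 0) {
		DB_MULTIPLE_INIT(p, &data);
		for (;;) {
			DB_MULTIPLE_KEY_NEXT(p, &data, kptr, klen, dptr, dlen);
			if (p == NULL)
				break;
			printf("%.*s -> %.*s\n",
			    (int)klen, (char *)kptr, (int)dlen, (char *)dptr);
		}
	}
	if (ret == DB_NOTFOUND)
		ret = 0;

	free(data.data);
err:	if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}
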
static int
__ham_c_put(dbc, key, data, flags, pgnop)
DBC *dbc;
@@ -949,6 +969,7 @@ __ham_c_put(dbc, key, data, flags, pgnop)
db_pgno_t *pgnop;
{
DB *dbp;
+ DB_MPOOLFILE *mpf;
DBT tmp_val, *myval;
HASH_CURSOR *hcp;
u_int32_t nbytes;
@@ -962,6 +983,7 @@ __ham_c_put(dbc, key, data, flags, pgnop)
COMPQUIET(myval, NULL);
dbp = dbc->dbp;
+ mpf = dbp->mpf;
hcp = (HASH_CURSOR *)dbc->internal;
if (F_ISSET(hcp, H_DELETED) &&
@@ -984,8 +1006,7 @@ __ham_c_put(dbc, key, data, flags, pgnop)
ret = 0;
if (hcp->seek_found_page != PGNO_INVALID &&
hcp->seek_found_page != hcp->pgno) {
- if ((ret = memp_fput(dbp->mpf, hcp->page, 0))
- != 0)
+ if ((ret = mpf->put(mpf, hcp->page, 0)) != 0)
goto err2;
hcp->page = NULL;
hcp->pgno = hcp->seek_found_page;
@@ -1000,9 +1021,10 @@ __ham_c_put(dbc, key, data, flags, pgnop)
* and then write the new bytes represented by
* val.
*/
- if ((ret = __ham_init_dbt(dbp->dbenv,
- &tmp_val, data->size + data->doff,
- &dbc->rdata.data, &dbc->rdata.ulen)) == 0) {
+ if ((ret = __ham_init_dbt(dbp->dbenv, &tmp_val,
+ data->size + data->doff,
+ &dbc->my_rdata.data,
+ &dbc->my_rdata.ulen)) == 0) {
memset(tmp_val.data, 0, data->doff);
memcpy((u_int8_t *)tmp_val.data +
data->doff, data->data, data->size);
@@ -1038,8 +1060,8 @@ done: if (ret == 0 && F_ISSET(hcp, H_EXPAND)) {
F_CLR(hcp, H_EXPAND);
}
- if (ret == 0 &&
- (t_ret = memp_fset(dbp->mpf, hcp->page, DB_MPOOL_DIRTY)) != 0)
+ if (hcp->page != NULL &&
+ (t_ret = mpf->set(mpf, hcp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
ret = t_ret;
err2: if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0)
@@ -1058,17 +1080,30 @@ __ham_expand_table(dbc)
DBC *dbc;
{
DB *dbp;
- PAGE *h;
+ DB_LOCK metalock;
+ DB_LSN lsn;
+ DB_MPOOLFILE *mpf;
+ DBMETA *mmeta;
HASH_CURSOR *hcp;
- db_pgno_t pgno;
- u_int32_t old_bucket, new_bucket;
- int ret;
+ PAGE *h;
+ db_pgno_t pgno, mpgno;
+ u_int32_t newalloc, new_bucket, old_bucket;
+ int dirty_meta, got_meta, logn, new_double, ret;
dbp = dbc->dbp;
+ mpf = dbp->mpf;
hcp = (HASH_CURSOR *)dbc->internal;
if ((ret = __ham_dirty_meta(dbc)) != 0)
return (ret);
+ LOCK_INIT(metalock);
+ mmeta = (DBMETA *) hcp->hdr;
+ mpgno = mmeta->pgno;
+ h = NULL;
+ dirty_meta = 0;
+ got_meta = 0;
+ newalloc = 0;
+
/*
* If the split point is about to increase, make sure that we
* have enough extra pages. The calculation here is weird.
@@ -1078,86 +1113,116 @@ __ham_expand_table(dbc)
* see what the log of one greater than that is; here we have to
* look at the log of max + 2. VERY NASTY STUFF.
*
- * It just got even nastier. With subdatabases, we have to request
- * a chunk of contiguous pages, so we do that here using an
- * undocumented feature of mpool (the MPOOL_NEW_GROUP flag) to
- * give us a number of contiguous pages. Ouch.
+ * We figure out what we need to do, then we log it, then request
+ * the pages from mpool. We don't want to fail after extending
+ * the file.
+ *
+ * If the page we are about to split into has already been allocated,
+ * then we simply need to get it to get its LSN. If it hasn't yet
+	 * been allocated, then we know its LSN is (0,0).
*/
- if (hcp->hdr->max_bucket == hcp->hdr->high_mask) {
- /*
- * Ask mpool to give us a set of contiguous page numbers
- * large enough to contain the next doubling.
- *
- * Figure out how many new pages we need. This will return
- * us the last page. We calculate its page number, initialize
- * the page and then write it back to reserve all the pages
- * in between. It is possible that the allocation of new pages
- * has already been done, but the tranaction aborted. Since
- * we don't undo the allocation, check for a valid pgno before
- * doing the allocation.
- */
- pgno = hcp->hdr->max_bucket + 1;
- if (hcp->hdr->spares[__db_log2(pgno) + 1] == PGNO_INVALID)
- /* Allocate a group of pages. */
- ret = memp_fget(dbp->mpf,
- &pgno, DB_MPOOL_NEW_GROUP, &h);
- else {
- /* Just read in the last page of the batch */
- pgno = hcp->hdr->spares[__db_log2(pgno) + 1] +
- hcp->hdr->max_bucket + 1;
- /* Move to the last page of the group. */
- pgno += hcp->hdr->max_bucket;
- ret = memp_fget(dbp->mpf,
- &pgno, DB_MPOOL_CREATE, &h);
- }
- if (ret != 0)
- return (ret);
- P_INIT(h, dbp->pgsize, pgno,
- PGNO_INVALID, PGNO_INVALID, 0, P_HASH);
- pgno -= hcp->hdr->max_bucket;
- } else {
- pgno = BUCKET_TO_PAGE(hcp, hcp->hdr->max_bucket + 1);
+ new_bucket = hcp->hdr->max_bucket + 1;
+ old_bucket = new_bucket & hcp->hdr->low_mask;
+
+ new_double = hcp->hdr->max_bucket == hcp->hdr->high_mask;
+ logn = __db_log2(new_bucket);
+
+ if (!new_double || hcp->hdr->spares[logn + 1] != PGNO_INVALID) {
+ /* Page exists; get it so we can get its LSN */
+ pgno = BUCKET_TO_PAGE(hcp, new_bucket);
if ((ret =
- memp_fget(dbp->mpf, &pgno, DB_MPOOL_CREATE, &h)) != 0)
- return (ret);
+ mpf->get(mpf, &pgno, DB_MPOOL_CREATE, &h)) != 0)
+ goto err;
+ lsn = h->lsn;
+ } else {
+ /* Get the master meta-data page to do allocation. */
+ if (F_ISSET(dbp, DB_AM_SUBDB)) {
+ mpgno = PGNO_BASE_MD;
+ if ((ret = __db_lget(dbc,
+ 0, mpgno, DB_LOCK_WRITE, 0, &metalock)) != 0)
+ goto err;
+ if ((ret =
+ mpf->get(mpf, &mpgno, 0, (PAGE **)&mmeta)) != 0)
+ goto err;
+ got_meta = 1;
+ }
+ pgno = mmeta->last_pgno + 1;
+ ZERO_LSN(lsn);
+ newalloc = 1;
}
- /* Now we can log the meta-data split. */
- if (DB_LOGGING(dbc)) {
- if ((ret = __ham_metagroup_log(dbp->dbenv,
- dbc->txn, &h->lsn, 0, dbp->log_fileid,
- hcp->hdr->max_bucket, pgno, &hcp->hdr->dbmeta.lsn,
- &h->lsn)) != 0) {
- (void)memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY);
- return (ret);
- }
+ /* Log the meta-data split first. */
+ if (DBC_LOGGING(dbc)) {
+ /*
+ * We always log the page number of the first page of
+ * the allocation group. However, the LSN that we log
+ * is either the LSN on the first page (if we did not
+ * do the actual allocation here) or the LSN on the last
+ * page of the unit (if we did do the allocation here).
+ */
+ if ((ret = __ham_metagroup_log(dbp, dbc->txn,
+ &lsn, 0, hcp->hdr->max_bucket, mpgno, &mmeta->lsn,
+ hcp->hdr->dbmeta.pgno, &hcp->hdr->dbmeta.lsn,
+ pgno, &lsn, newalloc)) != 0)
+ goto err;
+ } else
+ LSN_NOT_LOGGED(lsn);
- hcp->hdr->dbmeta.lsn = h->lsn;
- }
+ hcp->hdr->dbmeta.lsn = lsn;
- /* If we allocated some new pages, write out the last page. */
- if ((ret = memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY)) != 0)
- return (ret);
+ if (new_double && hcp->hdr->spares[logn + 1] == PGNO_INVALID) {
+ /*
+ * We need to begin a new doubling and we have not allocated
+ * any pages yet. Read the last page in and initialize it to
+ * make the allocation contiguous. The pgno we calculated
+ * above is the first page allocated. The entry in spares is
+ * that page number minus any buckets already allocated (it
+	 * simplifies bucket to page translation). After we've set
+ * that, we calculate the last pgno.
+ */
+
+ hcp->hdr->spares[logn + 1] = pgno - new_bucket;
+ pgno += hcp->hdr->max_bucket;
+ mmeta->last_pgno = pgno;
+ mmeta->lsn = lsn;
+ dirty_meta = DB_MPOOL_DIRTY;
- new_bucket = ++hcp->hdr->max_bucket;
- old_bucket = (hcp->hdr->max_bucket & hcp->hdr->low_mask);
+ if ((ret = mpf->get(mpf, &pgno, DB_MPOOL_CREATE, &h)) != 0)
+ goto err;
+
+ P_INIT(h, dbp->pgsize,
+ pgno, PGNO_INVALID, PGNO_INVALID, 0, P_HASH);
+ }
+
+ /* Write out whatever page we ended up modifying. */
+ h->lsn = lsn;
+ if ((ret = mpf->put(mpf, h, DB_MPOOL_DIRTY)) != 0)
+ goto err;
+ h = NULL;
/*
- * If we started a new doubling, fill in the spares array with
- * the starting page number negatively offset by the bucket number.
+ * Update the meta-data page of this hash database.
*/
- if (new_bucket > hcp->hdr->high_mask) {
- /* Starting a new doubling */
+ hcp->hdr->max_bucket = new_bucket;
+ if (new_double) {
hcp->hdr->low_mask = hcp->hdr->high_mask;
hcp->hdr->high_mask = new_bucket | hcp->hdr->low_mask;
- if (hcp->hdr->spares[__db_log2(new_bucket) + 1] == PGNO_INVALID)
- hcp->hdr->spares[__db_log2(new_bucket) + 1] =
- pgno - new_bucket;
}
/* Relocate records to the new bucket */
- return (__ham_split_page(dbc, old_bucket, new_bucket));
+ ret = __ham_split_page(dbc, old_bucket, new_bucket);
+
+err: if (got_meta)
+ (void)mpf->put(mpf, mmeta, dirty_meta);
+
+ if (LOCK_ISSET(metalock))
+ (void)__TLPUT(dbc, metalock);
+
+ if (h != NULL)
+ (void)mpf->put(mpf, h, 0);
+
+ return (ret);
}
/*
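
The max_bucket/high_mask/low_mask bookkeeping maintained above is standard linear hashing: a hash value is masked with high_mask, and if that addresses a bucket that has not been created yet, it is masked down with low_mask instead. A hedged sketch of that address calculation (mirroring what __ham_call_hash does; bucket_for is an illustrative name):

#include <stdint.h>

/*
 * With max_bucket = 5, low_mask = 3, and high_mask = 7, hash values whose
 * low bits are 6 or 7 fall back to buckets 2 and 3, because buckets 6 and 7
 * do not exist until the table doubles again.
 */
static uint32_t
bucket_for(uint32_t hash,
    uint32_t max_bucket, uint32_t high_mask, uint32_t low_mask)
{
	uint32_t bucket;

	bucket = hash & high_mask;
	if (bucket > max_bucket)
		bucket &= low_mask;	/* image bucket: not yet split */
	return (bucket);
}
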
@@ -1191,7 +1256,7 @@ __ham_call_hash(dbc, k, len)
* everything held by the cursor.
*/
static int
-__ham_dup_return (dbc, val, flags)
+__ham_dup_return(dbc, val, flags)
DBC *dbc;
DBT *val;
u_int32_t flags;
@@ -1211,7 +1276,7 @@ __ham_dup_return (dbc, val, flags)
dbp = dbc->dbp;
hcp = (HASH_CURSOR *)dbc->internal;
ndx = H_DATAINDEX(hcp->indx);
- type = HPAGE_TYPE(hcp->page, ndx);
+ type = HPAGE_TYPE(dbp, hcp->page, ndx);
pp = hcp->page;
myval = val;
@@ -1228,8 +1293,8 @@ __ham_dup_return (dbc, val, flags)
DB_ASSERT(type != H_OFFDUP);
/* Case 1 */
- if (type != H_DUPLICATE &&
- flags != DB_GET_BOTH && flags != DB_GET_BOTHC)
+ if (type != H_DUPLICATE && flags != DB_GET_BOTH &&
+ flags != DB_GET_BOTHC && flags != DB_GET_BOTH_RANGE)
return (0);
/*
@@ -1239,11 +1304,11 @@ __ham_dup_return (dbc, val, flags)
*/
if (!F_ISSET(hcp, H_ISDUP) && type == H_DUPLICATE) {
F_SET(hcp, H_ISDUP);
- hcp->dup_tlen = LEN_HDATA(hcp->page,
+ hcp->dup_tlen = LEN_HDATA(dbp, hcp->page,
hcp->hdr->dbmeta.pagesize, hcp->indx);
- hk = H_PAIRDATA(hcp->page, hcp->indx);
- if (flags == DB_LAST
- || flags == DB_PREV || flags == DB_PREV_NODUP) {
+ hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
+ if (flags == DB_LAST ||
+ flags == DB_PREV || flags == DB_PREV_NODUP) {
hcp->dup_off = 0;
do {
memcpy(&len,
@@ -1265,7 +1330,8 @@ __ham_dup_return (dbc, val, flags)
* may need to adjust the cursor before returning data.
* Case 4
*/
- if (flags == DB_GET_BOTH || flags == DB_GET_BOTHC) {
+ if (flags == DB_GET_BOTH ||
+ flags == DB_GET_BOTHC || flags == DB_GET_BOTH_RANGE) {
if (F_ISSET(hcp, H_ISDUP)) {
/*
* If we're doing a join, search forward from the
@@ -1274,7 +1340,7 @@ __ham_dup_return (dbc, val, flags)
if (flags == DB_GET_BOTHC)
F_SET(hcp, H_CONTINUE);
- __ham_dsearch(dbc, val, &off, &cmp);
+ __ham_dsearch(dbc, val, &off, &cmp, flags);
/*
* This flag is set nowhere else and is safe to
@@ -1283,7 +1349,7 @@ __ham_dup_return (dbc, val, flags)
F_CLR(hcp, H_CONTINUE);
hcp->dup_off = off;
} else {
- hk = H_PAIRDATA(hcp->page, hcp->indx);
+ hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
if (((HKEYDATA *)hk)->type == H_OFFPAGE) {
memcpy(&tlen,
HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
@@ -1298,7 +1364,7 @@ __ham_dup_return (dbc, val, flags)
* routines may only look at data and size.
*/
tmp_val.data = HKEYDATA_DATA(hk);
- tmp_val.size = LEN_HDATA(hcp->page,
+ tmp_val.size = LEN_HDATA(dbp, hcp->page,
dbp->pgsize, hcp->indx);
cmp = dbp->dup_compare == NULL ?
__bam_defcmp(dbp, &tmp_val, val) :
@@ -1311,6 +1377,18 @@ __ham_dup_return (dbc, val, flags)
}
/*
+ * If we're doing a bulk get, we don't want to actually return
+ * the data: __ham_bulk will take care of cracking out the
+ * duplicates appropriately.
+ *
+ * The rest of this function calculates partial offsets and
+ * handles the actual __db_ret, so just return if
+ * DB_MULTIPLE(_KEY) is set.
+ */
+ if (F_ISSET(dbc, DBC_MULTIPLE | DBC_MULTIPLE_KEY))
+ return (0);
+
+ /*
* Now, everything is initialized, grab a duplicate if
* necessary.
*/
@@ -1351,8 +1429,8 @@ __ham_dup_return (dbc, val, flags)
* Finally, if we had a duplicate, pp, ndx, and myval should be
* set appropriately.
*/
- if ((ret = __db_ret(dbp, pp, ndx, myval, &dbc->rdata.data,
- &dbc->rdata.ulen)) != 0)
+ if ((ret = __db_ret(dbp, pp, ndx, myval, &dbc->rdata->data,
+ &dbc->rdata->ulen)) != 0)
return (ret);
/*
@@ -1374,6 +1452,7 @@ __ham_overwrite(dbc, nval, flags)
u_int32_t flags;
{
DB *dbp;
+ DB_ENV *dbenv;
HASH_CURSOR *hcp;
DBT *myval, tmp_val, tmp_val2;
void *newrec;
@@ -1383,6 +1462,7 @@ __ham_overwrite(dbc, nval, flags)
int ret;
dbp = dbc->dbp;
+ dbenv = dbp->dbenv;
hcp = (HASH_CURSOR *)dbc->internal;
if (F_ISSET(hcp, H_ISDUP)) {
/*
@@ -1399,7 +1479,7 @@ __ham_overwrite(dbc, nval, flags)
*/
memset(&tmp_val, 0, sizeof(tmp_val));
if ((ret =
- __ham_dup_return (dbc, &tmp_val, DB_CURRENT)) != 0)
+ __ham_dup_return(dbc, &tmp_val, DB_CURRENT)) != 0)
return (ret);
/* Figure out new size. */
@@ -1435,7 +1515,7 @@ __ham_overwrite(dbc, nval, flags)
}
if ((ret = __os_malloc(dbp->dbenv,
- DUP_SIZE(newsize), NULL, &newrec)) != 0)
+ DUP_SIZE(newsize), &newrec)) != 0)
return (ret);
memset(&tmp_val2, 0, sizeof(tmp_val2));
F_SET(&tmp_val2, DB_DBT_PARTIAL);
@@ -1483,8 +1563,7 @@ __ham_overwrite(dbc, nval, flags)
tmp_val2.size = newsize;
if (dbp->dup_compare(
dbp, &tmp_val, &tmp_val2) != 0) {
- (void)__os_free(newrec,
- DUP_SIZE(newsize));
+ (void)__os_free(dbenv, newrec);
return (__db_duperr(dbp, flags));
}
}
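
The sort-order check above uses the database's dup_compare callback, the same function an application installs with DB->set_dup_compare before opening a database with sorted duplicates. A minimal, hedged sketch of such a callback (compare_u32 is an illustrative name; it assumes each duplicate is a 4-byte unsigned integer in native byte order):

#include <string.h>
#include <db.h>

static int
compare_u32(DB *dbp, const DBT *a, const DBT *b)
{
	u_int32_t va, vb;

	(void)dbp;			/* unused */
	/* DBT data may be unaligned; copy before comparing. */
	memcpy(&va, a->data, sizeof(va));
	memcpy(&vb, b->data, sizeof(vb));
	return (va < vb ? -1 : (va > vb ? 1 : 0));
}

/*
 * Installed before DB->open, e.g.:
 *	dbp->set_dup_compare(dbp, compare_u32);
 *	dbp->set_flags(dbp, DB_DUPSORT);
 */
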
@@ -1495,7 +1574,7 @@ __ham_overwrite(dbc, nval, flags)
tmp_val2.dlen = DUP_SIZE(hcp->dup_len);
ret = __ham_replpair(dbc, &tmp_val2, 0);
- (void)__os_free(newrec, DUP_SIZE(newsize));
+ (void)__os_free(dbenv, newrec);
/* Update cursor */
if (ret != 0)
@@ -1520,7 +1599,7 @@ __ham_overwrite(dbc, nval, flags)
/* Make sure we maintain sort order. */
if (dbp->dup_compare != NULL) {
tmp_val2.data =
- HKEYDATA_DATA(H_PAIRDATA(hcp->page,
+ HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page,
hcp->indx)) + hcp->dup_off +
sizeof(db_indx_t);
tmp_val2.size = hcp->dup_len;
@@ -1529,8 +1608,8 @@ __ham_overwrite(dbc, nval, flags)
}
/* Overwriting a complete duplicate. */
if ((ret =
- __ham_make_dup(dbp->dbenv, nval,
- &tmp_val, &dbc->rdata.data, &dbc->rdata.ulen)) != 0)
+ __ham_make_dup(dbp->dbenv, nval, &tmp_val,
+ &dbc->my_rdata.data, &dbc->my_rdata.ulen)) != 0)
return (ret);
/* Now fix what we are replacing. */
tmp_val.doff = hcp->dup_off;
@@ -1541,7 +1620,7 @@ __ham_overwrite(dbc, nval, flags)
hcp->dup_tlen += (nval->size - hcp->dup_len);
else
hcp->dup_tlen -= (hcp->dup_len - nval->size);
- hcp->dup_len = DUP_SIZE(nval->size);
+ hcp->dup_len = (db_indx_t)DUP_SIZE(nval->size);
}
myval = &tmp_val;
} else if (!F_ISSET(nval, DB_DBT_PARTIAL)) {
@@ -1549,12 +1628,12 @@ __ham_overwrite(dbc, nval, flags)
memcpy(&tmp_val, nval, sizeof(*nval));
F_SET(&tmp_val, DB_DBT_PARTIAL);
tmp_val.doff = 0;
- hk = H_PAIRDATA(hcp->page, hcp->indx);
+ hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
if (HPAGE_PTYPE(hk) == H_OFFPAGE)
memcpy(&tmp_val.dlen,
HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
else
- tmp_val.dlen = LEN_HDATA(hcp->page,
+ tmp_val.dlen = LEN_HDATA(dbp, hcp->page,
hcp->hdr->dbmeta.pagesize, hcp->indx);
myval = &tmp_val;
} else
@@ -1601,7 +1680,7 @@ __ham_lookup(dbc, key, sought, mode, pgnop)
hcp->bucket = __ham_call_hash(dbc, (u_int8_t *)key->data, key->size);
hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket);
- while (1) {
+ for (;;) {
*pgnop = PGNO_INVALID;
if ((ret = __ham_item_next(dbc, mode, pgnop)) != 0)
return (ret);
@@ -1609,7 +1688,7 @@ __ham_lookup(dbc, key, sought, mode, pgnop)
if (F_ISSET(hcp, H_NOMORE))
break;
- hk = H_PAIRKEY(hcp->page, hcp->indx);
+ hk = H_PAIRKEY(dbp, hcp->page, hcp->indx);
switch (HPAGE_PTYPE(hk)) {
case H_OFFPAGE:
memcpy(&tlen, HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
@@ -1625,12 +1704,12 @@ __ham_lookup(dbc, key, sought, mode, pgnop)
break;
case H_KEYDATA:
if (key->size ==
- LEN_HKEY(hcp->page, dbp->pgsize, hcp->indx) &&
+ LEN_HKEY(dbp, hcp->page, dbp->pgsize, hcp->indx) &&
memcmp(key->data,
HKEYDATA_DATA(hk), key->size) == 0) {
/* Found the key, check for data type. */
found_key: F_SET(hcp, H_OK);
- dk = H_PAIRDATA(hcp->page, hcp->indx);
+ dk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
if (HPAGE_PTYPE(dk) == H_OFFDUP)
memcpy(pgnop, HOFFDUP_PGNO(dk),
sizeof(db_pgno_t));
@@ -1643,7 +1722,7 @@ found_key: F_SET(hcp, H_OK);
* These are errors because keys are never
* duplicated, only data items are.
*/
- return (__db_pgfmt(dbp, PGNO(hcp->page)));
+ return (__db_pgfmt(dbp->dbenv, PGNO(hcp->page)));
}
}
@@ -1677,7 +1756,7 @@ __ham_init_dbt(dbenv, dbt, size, bufp, sizep)
memset(dbt, 0, sizeof(*dbt));
if (*sizep < size) {
- if ((ret = __os_realloc(dbenv, size, NULL, bufp)) != 0) {
+ if ((ret = __os_realloc(dbenv, size, bufp)) != 0) {
*sizep = 0;
return (ret);
}
@@ -1732,8 +1811,8 @@ __ham_c_update(dbc, len, add, is_dup)
MUTEX_THREAD_LOCK(dbenv, dbenv->dblist_mutexp);
/*
- * Calcuate the order of this deleted record.
- * This will be one grater than any cursor that is pointing
+ * Calculate the order of this deleted record.
+ * This will be one greater than any cursor that is pointing
* at this record and already marked as deleted.
*/
order = 0;
@@ -1749,11 +1828,11 @@ __ham_c_update(dbc, len, add, is_dup)
continue;
lcp = (HASH_CURSOR *)cp->internal;
if (F_ISSET(lcp, H_DELETED) &&
- hcp->pgno == lcp->pgno &&
- hcp->indx == lcp->indx &&
- order <= lcp->order &&
- (!is_dup || hcp->dup_off == lcp->dup_off))
- order = lcp->order +1;
+ hcp->pgno == lcp->pgno &&
+ hcp->indx == lcp->indx &&
+ order <= lcp->order &&
+ (!is_dup || hcp->dup_off == lcp->dup_off))
+ order = lcp->order + 1;
}
MUTEX_THREAD_UNLOCK(dbenv, dbp->mutexp);
}
@@ -1788,8 +1867,8 @@ __ham_c_update(dbc, len, add, is_dup)
* We are "undeleting" so unmark all
* cursors with the same order.
*/
- if (lcp->indx == hcp->indx
- && F_ISSET(lcp, H_DELETED)) {
+ if (lcp->indx == hcp->indx &&
+ F_ISSET(lcp, H_DELETED)) {
if (lcp->order == hcp->order)
F_CLR(lcp, H_DELETED);
else if (lcp->order >
@@ -1815,12 +1894,13 @@ __ham_c_update(dbc, len, add, is_dup)
} else {
if (lcp->indx > hcp->indx) {
lcp->indx -= 2;
- if (lcp->indx == hcp->indx
- && F_ISSET(lcp, H_DELETED))
+ if (lcp->indx == hcp->indx &&
+ F_ISSET(lcp, H_DELETED))
lcp->order += order;
- } else if (lcp->indx == hcp->indx
- && !F_ISSET(lcp, H_DELETED)) {
+ } else if (lcp->indx == hcp->indx &&
+ !F_ISSET(lcp, H_DELETED)) {
F_SET(lcp, H_DELETED);
+ F_CLR(lcp, H_ISDUP);
lcp->order = order;
}
}
@@ -1833,10 +1913,10 @@ __ham_c_update(dbc, len, add, is_dup)
*/
if (add) {
lcp->dup_tlen += len;
- if (lcp->dup_off == hcp->dup_off
- && F_ISSET(hcp, H_DELETED)
- && F_ISSET(lcp, H_DELETED)) {
- /* Abort of a delete. */
+ if (lcp->dup_off == hcp->dup_off &&
+ F_ISSET(hcp, H_DELETED) &&
+ F_ISSET(lcp, H_DELETED)) {
+ /* Abort of a delete. */
if (lcp->order == hcp->order)
F_CLR(lcp, H_DELETED);
else if (lcp->order >
@@ -1851,8 +1931,9 @@ __ham_c_update(dbc, len, add, is_dup)
lcp->dup_tlen -= len;
if (lcp->dup_off > hcp->dup_off) {
lcp->dup_off -= len;
- if (lcp->dup_off == hcp->dup_off
- && F_ISSET(lcp, H_DELETED))
+ if (lcp->dup_off ==
+ hcp->dup_off &&
+ F_ISSET(lcp, H_DELETED))
lcp->order += order;
} else if (lcp->dup_off ==
hcp->dup_off &&
@@ -1867,10 +1948,9 @@ __ham_c_update(dbc, len, add, is_dup)
}
MUTEX_THREAD_UNLOCK(dbenv, dbenv->dblist_mutexp);
- if (found != 0 && DB_LOGGING(dbc)) {
- if ((ret = __ham_curadj_log(dbenv,
- my_txn, &lsn, 0, dbp->log_fileid, hcp->pgno,
- hcp->indx, len, hcp->dup_off, add, is_dup, order)) != 0)
+ if (found != 0 && DBC_LOGGING(dbc)) {
+ if ((ret = __ham_curadj_log(dbp, my_txn, &lsn, 0, hcp->pgno,
+ hcp->indx, len, hcp->dup_off, add, is_dup, order)) != 0)
return (ret);
}
@@ -1885,13 +1965,12 @@ __ham_c_update(dbc, len, add, is_dup)
* cursors on a split. The latter is so we can update cursors when we
* move items off page.
*
- * PUBLIC: int __ham_get_clist __P((DB *,
- * PUBLIC: db_pgno_t, u_int32_t, DBC ***));
+ * PUBLIC: int __ham_get_clist __P((DB *, db_pgno_t, u_int32_t, DBC ***));
*/
int
-__ham_get_clist(dbp, bucket, indx, listp)
+__ham_get_clist(dbp, pgno, indx, listp)
DB *dbp;
- db_pgno_t bucket;
+ db_pgno_t pgno;
u_int32_t indx;
DBC ***listp;
{
@@ -1915,18 +1994,20 @@ __ham_get_clist(dbp, bucket, indx, listp)
MUTEX_THREAD_LOCK(dbenv, dbp->mutexp);
for (cp = TAILQ_FIRST(&ldbp->active_queue); cp != NULL;
cp = TAILQ_NEXT(cp, links))
- if (cp->dbtype == DB_HASH &&
- ((indx == NDX_INVALID &&
- ((HASH_CURSOR *)(cp->internal))->bucket
- == bucket) || (indx != NDX_INVALID &&
- cp->internal->pgno == bucket &&
- cp->internal->indx == indx))) {
+ /*
+ * We match if cp->pgno matches the specified
+ * pgno, and if either the cp->indx matches
+ * or we weren't given an index.
+ */
+ if (cp->internal->pgno == pgno &&
+ (indx == NDX_INVALID ||
+ cp->internal->indx == indx)) {
if (nused >= nalloc) {
nalloc += 10;
if ((ret = __os_realloc(dbp->dbenv,
nalloc * sizeof(HASH_CURSOR *),
- NULL, listp)) != 0)
- return (ret);
+ listp)) != 0)
+ goto err;
}
(*listp)[nused++] = cp;
}
@@ -1939,74 +2020,25 @@ __ham_get_clist(dbp, bucket, indx, listp)
if (nused >= nalloc) {
nalloc++;
if ((ret = __os_realloc(dbp->dbenv,
- nalloc * sizeof(HASH_CURSOR *), NULL, listp)) != 0)
+ nalloc * sizeof(HASH_CURSOR *), listp)) != 0)
return (ret);
}
(*listp)[nused] = NULL;
}
return (0);
-}
-
-static int
-__ham_del_dups(orig_dbc, key)
- DBC *orig_dbc;
- DBT *key;
-{
- DBC *dbc;
- DBT data, lkey;
- int ret, t_ret;
-
- /* Allocate a cursor. */
- if ((ret = orig_dbc->c_dup(orig_dbc, &dbc, 0)) != 0)
- return (ret);
-
- /*
- * Walk a cursor through the key/data pairs, deleting as we go. Set
- * the DB_DBT_USERMEM flag, as this might be a threaded application
- * and the flags checking will catch us. We don't actually want the
- * keys or data, so request a partial of length 0.
- */
- memset(&lkey, 0, sizeof(lkey));
- F_SET(&lkey, DB_DBT_USERMEM | DB_DBT_PARTIAL);
- memset(&data, 0, sizeof(data));
- F_SET(&data, DB_DBT_USERMEM | DB_DBT_PARTIAL);
-
- /* Walk through the set of key/data pairs, deleting as we go. */
- if ((ret = dbc->c_get(dbc, key, &data, DB_SET)) != 0) {
- if (ret == DB_NOTFOUND)
- ret = 0;
- goto err;
- }
-
- for (;;) {
- if ((ret = dbc->c_del(dbc, 0)) != 0)
- goto err;
- if ((ret = dbc->c_get(dbc, &lkey, &data, DB_NEXT_DUP)) != 0) {
- if (ret == DB_NOTFOUND) {
- ret = 0;
- break;
- }
- goto err;
- }
- }
-
-err: /*
- * Discard the cursor. This will cause the underlying off-page dup
- * tree to go away as well as the actual entry on the page.
- */
- if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
- ret = t_ret;
-
+err:
+ MUTEX_THREAD_UNLOCK(dbp->dbenv, dbp->mutexp);
+ MUTEX_THREAD_UNLOCK(dbenv, dbenv->dblist_mutexp);
return (ret);
-
}
static int
__ham_c_writelock(dbc)
DBC *dbc;
{
- HASH_CURSOR *hcp;
+ DB_ENV *dbenv;
DB_LOCK tmp_lock;
+ HASH_CURSOR *hcp;
int ret;
/*
@@ -2017,79 +2049,13 @@ __ham_c_writelock(dbc)
return (0);
hcp = (HASH_CURSOR *)dbc->internal;
- if ((hcp->lock.off == LOCK_INVALID || hcp->lock_mode == DB_LOCK_READ)) {
+ if ((!LOCK_ISSET(hcp->lock) || hcp->lock_mode == DB_LOCK_READ)) {
tmp_lock = hcp->lock;
if ((ret = __ham_lock_bucket(dbc, DB_LOCK_WRITE)) != 0)
return (ret);
- if (tmp_lock.off != LOCK_INVALID &&
- (ret = lock_put(dbc->dbp->dbenv, &tmp_lock)) != 0)
- return (ret);
- }
- return (0);
-}
-
-/*
- * __ham_c_chgpg --
- *
- * Adjust the cursors after moving an item from one page to another.
- * If the old_index is NDX_INVALID, that means that we copied the
- * page wholesale and we're leaving indices intact and just changing
- * the page number.
- *
- * PUBLIC: int __ham_c_chgpg
- * PUBLIC: __P((DBC *, db_pgno_t, u_int32_t, db_pgno_t, u_int32_t));
- */
-int
-__ham_c_chgpg(dbc, old_pgno, old_index, new_pgno, new_index)
- DBC *dbc;
- db_pgno_t old_pgno, new_pgno;
- u_int32_t old_index, new_index;
-{
- DB *dbp, *ldbp;
- DB_ENV *dbenv;
- DB_LSN lsn;
- DB_TXN *my_txn;
- DBC *cp;
- HASH_CURSOR *hcp;
- int found, ret;
-
- dbp = dbc->dbp;
- dbenv = dbp->dbenv;
-
- my_txn = IS_SUBTRANSACTION(dbc->txn) ? dbc->txn : NULL;
- found = 0;
-
- MUTEX_THREAD_LOCK(dbenv, dbenv->dblist_mutexp);
- for (ldbp = __dblist_get(dbenv, dbp->adj_fileid);
- ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
- ldbp = LIST_NEXT(ldbp, dblistlinks)) {
- MUTEX_THREAD_LOCK(dbenv, dbp->mutexp);
- for (cp = TAILQ_FIRST(&ldbp->active_queue); cp != NULL;
- cp = TAILQ_NEXT(cp, links)) {
- if (cp == dbc || cp->dbtype != DB_HASH)
- continue;
-
- hcp = (HASH_CURSOR *)cp->internal;
- if (hcp->pgno == old_pgno) {
- if (old_index == NDX_INVALID) {
- hcp->pgno = new_pgno;
- } else if (hcp->indx == old_index) {
- hcp->pgno = new_pgno;
- hcp->indx = new_index;
- } else
- continue;
- if (my_txn != NULL && cp->txn != my_txn)
- found = 1;
- }
- }
- MUTEX_THREAD_UNLOCK(dbenv, dbp->mutexp);
- }
- MUTEX_THREAD_UNLOCK(dbenv, dbenv->dblist_mutexp);
-
- if (found != 0 && DB_LOGGING(dbc)) {
- if ((ret = __ham_chgpg_log(dbenv,
- my_txn, &lsn, 0, dbp->log_fileid, DB_HAM_CHGPG,
- old_pgno, new_pgno, old_index, new_index)) != 0)
+ dbenv = dbc->dbp->dbenv;
+ if (LOCK_ISSET(tmp_lock) &&
+ (ret = dbenv->lock_put(dbenv, &tmp_lock)) != 0)
return (ret);
}
return (0);