Diffstat (limited to 'bdb/hash/hash.c')
-rw-r--r-- | bdb/hash/hash.c | 1386
1 file changed, 676 insertions, 710 deletions
diff --git a/bdb/hash/hash.c b/bdb/hash/hash.c index e96fd4898f0..2f972a3238d 100644 --- a/bdb/hash/hash.c +++ b/bdb/hash/hash.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 1996, 1997, 1998, 1999, 2000 + * Copyright (c) 1996-2002 * Sleepycat Software. All rights reserved. */ /* @@ -43,7 +43,7 @@ #include "db_config.h" #ifndef lint -static const char revid[] = "$Id: hash.c,v 11.94 2001/01/03 16:42:26 ubell Exp $"; +static const char revid[] = "$Id: hash.c,v 11.166 2002/08/06 06:11:25 bostic Exp $"; #endif /* not lint */ #ifndef NO_SYSTEM_INCLUDES @@ -54,446 +54,70 @@ static const char revid[] = "$Id: hash.c,v 11.94 2001/01/03 16:42:26 ubell Exp $ #endif #include "db_int.h" -#include "db_page.h" -#include "db_am.h" -#include "db_ext.h" -#include "db_shash.h" -#include "db_swap.h" -#include "hash.h" -#include "btree.h" -#include "log.h" -#include "lock.h" -#include "txn.h" +#include "dbinc/db_page.h" +#include "dbinc/db_shash.h" +#include "dbinc/btree.h" +#include "dbinc/hash.h" +#include "dbinc/lock.h" +static int __ham_bulk __P((DBC *, DBT *, u_int32_t)); static int __ham_c_close __P((DBC *, db_pgno_t, int *)); static int __ham_c_del __P((DBC *)); static int __ham_c_destroy __P((DBC *)); static int __ham_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *)); static int __ham_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *)); static int __ham_c_writelock __P((DBC *)); -static int __ham_del_dups __P((DBC *, DBT *)); -static int __ham_delete __P((DB *, DB_TXN *, DBT *, u_int32_t)); static int __ham_dup_return __P((DBC *, DBT *, u_int32_t)); static int __ham_expand_table __P((DBC *)); -static int __ham_init_htab __P((DBC *, - const char *, db_pgno_t, u_int32_t, u_int32_t)); static int __ham_lookup __P((DBC *, const DBT *, u_int32_t, db_lockmode_t, db_pgno_t *)); static int __ham_overwrite __P((DBC *, DBT *, u_int32_t)); /* - * __ham_metachk -- + * __ham_quick_delete -- + * When performing a DB->del operation that does not involve secondary + * indices and is not removing an off-page duplicate tree, we can + * speed things up substantially by removing the entire duplicate + * set, if any is present, in one operation, rather than by conjuring + * up and deleting each of the items individually. (All are stored + * in one big HKEYDATA structure.) We don't bother to distinguish + * on-page duplicate sets from single, non-dup items; they're deleted + * in exactly the same way. * - * PUBLIC: int __ham_metachk __P((DB *, const char *, HMETA *)); - */ -int -__ham_metachk(dbp, name, hashm) - DB *dbp; - const char *name; - HMETA *hashm; -{ - DB_ENV *dbenv; - u_int32_t vers; - int ret; - - dbenv = dbp->dbenv; - - /* - * At this point, all we know is that the magic number is for a Hash. - * Check the version, the database may be out of date. - */ - vers = hashm->dbmeta.version; - if (F_ISSET(dbp, DB_AM_SWAP)) - M_32_SWAP(vers); - switch (vers) { - case 4: - case 5: - case 6: - __db_err(dbenv, - "%s: hash version %lu requires a version upgrade", - name, (u_long)vers); - return (DB_OLD_VERSION); - case 7: - break; - default: - __db_err(dbenv, - "%s: unsupported hash version: %lu", name, (u_long)vers); - return (EINVAL); - } - - /* Swap the page if we need to. */ - if (F_ISSET(dbp, DB_AM_SWAP) && (ret = __ham_mswap((PAGE *)hashm)) != 0) - return (ret); - - /* Check the type. 
*/ - if (dbp->type != DB_HASH && dbp->type != DB_UNKNOWN) - return (EINVAL); - dbp->type = DB_HASH; - DB_ILLEGAL_METHOD(dbp, DB_OK_HASH); - - /* - * Check application info against metadata info, and set info, flags, - * and type based on metadata info. - */ - if ((ret = __db_fchk(dbenv, - "DB->open", hashm->dbmeta.flags, - DB_HASH_DUP | DB_HASH_SUBDB | DB_HASH_DUPSORT)) != 0) - return (ret); - - if (F_ISSET(&hashm->dbmeta, DB_HASH_DUP)) - F_SET(dbp, DB_AM_DUP); - else - if (F_ISSET(dbp, DB_AM_DUP)) { - __db_err(dbenv, - "%s: DB_DUP specified to open method but not set in database", - name); - return (EINVAL); - } - - if (F_ISSET(&hashm->dbmeta, DB_HASH_SUBDB)) - F_SET(dbp, DB_AM_SUBDB); - else - if (F_ISSET(dbp, DB_AM_SUBDB)) { - __db_err(dbenv, - "%s: multiple databases specified but not supported in file", - name); - return (EINVAL); - } - - if (F_ISSET(&hashm->dbmeta, DB_HASH_DUPSORT)) { - if (dbp->dup_compare == NULL) - dbp->dup_compare = __bam_defcmp; - } else - if (dbp->dup_compare != NULL) { - __db_err(dbenv, - "%s: duplicate sort function specified but not set in database", - name); - return (EINVAL); - } - - /* Set the page size. */ - dbp->pgsize = hashm->dbmeta.pagesize; - - /* Copy the file's ID. */ - memcpy(dbp->fileid, hashm->dbmeta.uid, DB_FILE_ID_LEN); - - return (0); -} - -/* - * __ham_open -- + * This function is called by __db_delete when the appropriate + * conditions are met, and it performs the delete in the optimized way. * - * PUBLIC: int __ham_open __P((DB *, const char *, db_pgno_t, u_int32_t)); + * The cursor should be set to the first item in the duplicate + * set, or to the sole key/data pair when the key does not have a + * duplicate set, before the function is called. + * + * PUBLIC: int __ham_quick_delete __P((DBC *)); */ int -__ham_open(dbp, name, base_pgno, flags) - DB *dbp; - const char *name; - db_pgno_t base_pgno; - u_int32_t flags; -{ - DB_ENV *dbenv; - DBC *dbc; - HASH_CURSOR *hcp; - HASH *hashp; - int need_sync, ret, t_ret; - - dbc = NULL; - dbenv = dbp->dbenv; - need_sync = 0; - - /* Initialize the remaining fields/methods of the DB. */ - dbp->del = __ham_delete; - dbp->stat = __ham_stat; - - /* - * Get a cursor. If DB_CREATE is specified, we may be creating - * pages, and to do that safely in CDB we need a write cursor. - * In STD_LOCKING mode, we'll synchronize using the meta page - * lock instead. - */ - if ((ret = dbp->cursor(dbp, - dbp->open_txn, &dbc, LF_ISSET(DB_CREATE) && CDB_LOCKING(dbenv) ? - DB_WRITECURSOR : 0)) != 0) - return (ret); - - hcp = (HASH_CURSOR *)dbc->internal; - hashp = dbp->h_internal; - hashp->meta_pgno = base_pgno; - if ((ret = __ham_get_meta(dbc)) != 0) - goto err1; - - /* - * If this is a new file, initialize it, and put it back dirty. - * - * Initialize the hdr structure. - */ - if (hcp->hdr->dbmeta.magic == DB_HASHMAGIC) { - /* File exists, verify the data in the header. */ - if (hashp->h_hash == NULL) - hashp->h_hash = hcp->hdr->dbmeta.version < 5 - ? __ham_func4 : __ham_func5; - if (!F_ISSET(dbp, DB_RDONLY) && - hashp->h_hash(dbp, - CHARKEY, sizeof(CHARKEY)) != hcp->hdr->h_charkey) { - __db_err(dbp->dbenv, - "hash: incompatible hash function"); - ret = EINVAL; - goto err2; - } - if (F_ISSET(&hcp->hdr->dbmeta, DB_HASH_DUP)) - F_SET(dbp, DB_AM_DUP); - if (F_ISSET(&hcp->hdr->dbmeta, DB_HASH_DUPSORT)) - F_SET(dbp, DB_AM_DUPSORT); - if (F_ISSET(&hcp->hdr->dbmeta, DB_HASH_SUBDB)) - F_SET(dbp, DB_AM_SUBDB); - } else if (!IS_RECOVERING(dbenv)) { - /* - * File does not exist, we must initialize the header. 
If - * locking is enabled that means getting a write lock first. - * During recovery the meta page will be in the log. - */ - dbc->lock.pgno = base_pgno; - - if (STD_LOCKING(dbc) && - ((ret = lock_put(dbenv, &hcp->hlock)) != 0 || - (ret = lock_get(dbenv, dbc->locker, - DB_NONBLOCK(dbc) ? DB_LOCK_NOWAIT : 0, - &dbc->lock_dbt, DB_LOCK_WRITE, &hcp->hlock)) != 0)) - goto err2; - else if (CDB_LOCKING(dbp->dbenv)) { - DB_ASSERT(LF_ISSET(DB_CREATE)); - if ((ret = lock_get(dbenv, dbc->locker, - DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE, - &dbc->mylock)) != 0) - goto err2; - } - if ((ret = __ham_init_htab(dbc, name, - base_pgno, hashp->h_nelem, hashp->h_ffactor)) != 0) - goto err2; - - need_sync = 1; - } - -err2: /* Release the meta data page */ - if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0) - ret = t_ret; -err1: if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0) - ret = t_ret; - - /* Sync the file so that we know that the meta data goes to disk. */ - if (ret == 0 && need_sync) - ret = dbp->sync(dbp, 0); -#if CONFIG_TEST - if (ret == 0) - DB_TEST_RECOVERY(dbp, DB_TEST_POSTSYNC, ret, name); - -DB_TEST_RECOVERY_LABEL -#endif - if (ret != 0) - (void)__ham_db_close(dbp); - - return (ret); -} - -/************************** LOCAL CREATION ROUTINES **********************/ -/* - * Returns 0 on No Error - */ -static int -__ham_init_htab(dbc, name, pgno, nelem, ffactor) +__ham_quick_delete(dbc) DBC *dbc; - const char *name; - db_pgno_t pgno; - u_int32_t nelem, ffactor; { - DB *dbp; - DB_LOCK metalock; - DB_LSN orig_lsn; - DBMETA *mmeta; - HASH_CURSOR *hcp; - HASH *hashp; - PAGE *h; - db_pgno_t mpgno; - int32_t l2, nbuckets; - int dirty_mmeta, i, ret, t_ret; - - hcp = (HASH_CURSOR *)dbc->internal; - dbp = dbc->dbp; - hashp = dbp->h_internal; - mmeta = NULL; - h = NULL; - ret = 0; - dirty_mmeta = 0; - metalock.off = LOCK_INVALID; - - if (hashp->h_hash == NULL) - hashp->h_hash = DB_HASHVERSION < 5 ? __ham_func4 : __ham_func5; - - if (nelem != 0 && ffactor != 0) { - nelem = (nelem - 1) / ffactor + 1; - l2 = __db_log2(nelem > 2 ? nelem : 2); - } else - l2 = 1; - nbuckets = 1 << l2; - - orig_lsn = hcp->hdr->dbmeta.lsn; - memset(hcp->hdr, 0, sizeof(HMETA)); - ZERO_LSN(hcp->hdr->dbmeta.lsn); - hcp->hdr->dbmeta.pgno = pgno; - hcp->hdr->dbmeta.magic = DB_HASHMAGIC; - hcp->hdr->dbmeta.version = DB_HASHVERSION; - hcp->hdr->dbmeta.pagesize = dbp->pgsize; - hcp->hdr->dbmeta.type = P_HASHMETA; - hcp->hdr->dbmeta.free = PGNO_INVALID; - hcp->hdr->max_bucket = hcp->hdr->high_mask = nbuckets - 1; - hcp->hdr->low_mask = (nbuckets >> 1) - 1; - hcp->hdr->ffactor = ffactor; - hcp->hdr->h_charkey = hashp->h_hash(dbp, CHARKEY, sizeof(CHARKEY)); - memcpy(hcp->hdr->dbmeta.uid, dbp->fileid, DB_FILE_ID_LEN); - - if (F_ISSET(dbp, DB_AM_DUP)) - F_SET(&hcp->hdr->dbmeta, DB_HASH_DUP); - if (F_ISSET(dbp, DB_AM_SUBDB)) - F_SET(&hcp->hdr->dbmeta, DB_HASH_SUBDB); - if (dbp->dup_compare != NULL) - F_SET(&hcp->hdr->dbmeta, DB_HASH_DUPSORT); - - if ((ret = memp_fset(dbp->mpf, hcp->hdr, DB_MPOOL_DIRTY)) != 0) - goto err; - - /* - * Create the first and second buckets pages so that we have the - * page numbers for them and we can store that page number - * in the meta-data header (spares[0]). - */ - hcp->hdr->spares[0] = nbuckets; - if ((ret = memp_fget(dbp->mpf, - &hcp->hdr->spares[0], DB_MPOOL_NEW_GROUP, &h)) != 0) - goto err; - - P_INIT(h, dbp->pgsize, hcp->hdr->spares[0], PGNO_INVALID, - PGNO_INVALID, 0, P_HASH); - - /* Fill in the last fields of the meta data page. 
*/ - hcp->hdr->spares[0] -= (nbuckets - 1); - for (i = 1; i <= l2; i++) - hcp->hdr->spares[i] = hcp->hdr->spares[0]; - for (; i < NCACHED; i++) - hcp->hdr->spares[i] = PGNO_INVALID; - - /* - * Before we are about to put any dirty pages, we need to log - * the meta-data page create. - */ - ret = __db_log_page(dbp, name, &orig_lsn, pgno, (PAGE *)hcp->hdr); - - if (dbp->open_txn != NULL) { - mmeta = (DBMETA *) hcp->hdr; - if (F_ISSET(dbp, DB_AM_SUBDB)) { - - /* - * If this is a subdatabase, then we need to - * get the LSN off the master meta data page - * because that's where free pages are linked - * and during recovery we need to access - * that page and roll it backward/forward - * correctly with respect to LSN. - */ - mpgno = PGNO_BASE_MD; - if ((ret = __db_lget(dbc, - 0, mpgno, DB_LOCK_WRITE, 0, &metalock)) != 0) - goto err; - if ((ret = memp_fget(dbp->mpf, - &mpgno, 0, (PAGE **)&mmeta)) != 0) - goto err; - } - if ((t_ret = __ham_groupalloc_log(dbp->dbenv, - dbp->open_txn, &LSN(mmeta), 0, dbp->log_fileid, - &LSN(mmeta), hcp->hdr->spares[0], - hcp->hdr->max_bucket + 1, mmeta->free)) != 0 && ret == 0) - ret = t_ret; - if (ret == 0) { - /* need to update real LSN for buffer manager */ - dirty_mmeta = 1; - } - - } - - DB_TEST_RECOVERY(dbp, DB_TEST_POSTLOG, ret, name); - -DB_TEST_RECOVERY_LABEL -err: if (h != NULL && - (t_ret = memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY)) != 0 && ret == 0) - ret = t_ret; - - if (F_ISSET(dbp, DB_AM_SUBDB) && mmeta != NULL) - if ((t_ret = memp_fput(dbp->mpf, mmeta, - dirty_mmeta ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0) - ret = t_ret; - if (metalock.off != LOCK_INVALID) - (void)__TLPUT(dbc, metalock); - - return (ret); -} - -static int -__ham_delete(dbp, txn, key, flags) - DB *dbp; - DB_TXN *txn; - DBT *key; - u_int32_t flags; -{ - DBC *dbc; - HASH_CURSOR *hcp; - db_pgno_t pgno; int ret, t_ret; - /* - * This is the only access method routine called directly from - * the dbp, so we have to do error checking. - */ - - PANIC_CHECK(dbp->dbenv); - DB_ILLEGAL_BEFORE_OPEN(dbp, "DB->del"); - DB_CHECK_TXN(dbp, txn); - - if ((ret = - __db_delchk(dbp, key, flags, F_ISSET(dbp, DB_AM_RDONLY))) != 0) - return (ret); - - if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0) + if ((ret = __ham_get_meta(dbc)) != 0) return (ret); - DEBUG_LWRITE(dbc, txn, "ham_delete", key, NULL, flags); + /* Assert that we're not using secondary indices. */ + DB_ASSERT(!F_ISSET(dbc->dbp, DB_AM_SECONDARY)); + /* + * We should assert that we're not a primary either, but that + * would require grabbing the dbp's mutex, so we don't bother. + */ - hcp = (HASH_CURSOR *)dbc->internal; - if ((ret = __ham_get_meta(dbc)) != 0) - goto out; + /* Assert that we're set, but not to an off-page duplicate. */ + DB_ASSERT(IS_INITIALIZED(dbc)); + DB_ASSERT(((HASH_CURSOR *)dbc->internal)->opd == NULL); - pgno = PGNO_INVALID; - if ((ret = __ham_lookup(dbc, key, 0, DB_LOCK_WRITE, &pgno)) == 0) { - if (F_ISSET(hcp, H_OK)) { - if (pgno == PGNO_INVALID) - ret = __ham_del_pair(dbc, 1); - else { - /* When we close the cursor in __ham_del_dups, - * that will make the off-page dup tree go - * go away as well as our current entry. When - * it updates cursors, ours should get marked - * as H_DELETED. 
- */ - ret = __ham_del_dups(dbc, key); - } - } else - ret = DB_NOTFOUND; - } + ret = __ham_del_pair(dbc, 1); if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0) ret = t_ret; -out: if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0) - ret = t_ret; return (ret); } @@ -517,8 +141,8 @@ __ham_c_init(dbc) 1, sizeof(struct cursor_t), &new_curs)) != 0) return (ret); if ((ret = __os_malloc(dbenv, - dbc->dbp->pgsize, NULL, &new_curs->split_buf)) != 0) { - __os_free(new_curs, sizeof(*new_curs)); + dbc->dbp->pgsize, &new_curs->split_buf)) != 0) { + __os_free(dbenv, new_curs); return (ret); } @@ -527,8 +151,10 @@ __ham_c_init(dbc) dbc->c_count = __db_c_count; dbc->c_del = __db_c_del; dbc->c_dup = __db_c_dup; - dbc->c_get = __db_c_get; + dbc->c_get = dbc->c_real_get = __db_c_get; + dbc->c_pget = __db_c_pget; dbc->c_put = __db_c_put; + dbc->c_am_bulk = __ham_bulk; dbc->c_am_close = __ham_c_close; dbc->c_am_del = __ham_c_del; dbc->c_am_destroy = __ham_c_destroy; @@ -551,12 +177,14 @@ __ham_c_close(dbc, root_pgno, rmroot) db_pgno_t root_pgno; int *rmroot; { + DB_MPOOLFILE *mpf; HASH_CURSOR *hcp; HKEYDATA *dp; int doroot, gotmeta, ret, t_ret; u_int32_t dirty; COMPQUIET(rmroot, 0); + mpf = dbc->dbp->mpf; dirty = 0; doroot = gotmeta = ret = 0; hcp = (HASH_CURSOR *) dbc->internal; @@ -568,9 +196,14 @@ __ham_c_close(dbc, root_pgno, rmroot) gotmeta = 1; if ((ret = __ham_get_cpage(dbc, DB_LOCK_READ)) != 0) goto out; - dp = (HKEYDATA *)H_PAIRDATA(hcp->page, hcp->indx); - DB_ASSERT(HPAGE_PTYPE(dp) == H_OFFDUP); - memcpy(&root_pgno, HOFFPAGE_PGNO(dp), sizeof(db_pgno_t)); + dp = (HKEYDATA *)H_PAIRDATA(dbc->dbp, hcp->page, hcp->indx); + + /* If its not a dup we aborted before we changed it. */ + if (HPAGE_PTYPE(dp) == H_OFFDUP) + memcpy(&root_pgno, + HOFFPAGE_PGNO(dp), sizeof(db_pgno_t)); + else + root_pgno = PGNO_INVALID; if ((ret = hcp->opd->c_am_close(hcp->opd, root_pgno, &doroot)) != 0) @@ -583,7 +216,7 @@ __ham_c_close(dbc, root_pgno, rmroot) } out: if (hcp->page != NULL && (t_ret = - memp_fput(dbc->dbp->mpf, hcp->page, dirty)) != 0 && ret == 0) + mpf->put(mpf, hcp->page, dirty)) != 0 && ret == 0) ret = t_ret; if (gotmeta != 0 && (t_ret = __ham_release_meta(dbc)) != 0 && ret == 0) ret = t_ret; @@ -605,8 +238,8 @@ __ham_c_destroy(dbc) hcp = (HASH_CURSOR *)dbc->internal; if (hcp->split_buf != NULL) - __os_free(hcp->split_buf, dbc->dbp->pgsize); - __os_free(hcp, sizeof(HASH_CURSOR)); + __os_free(dbc->dbp->dbenv, hcp->split_buf); + __os_free(dbc->dbp->dbenv, hcp); return (0); } @@ -623,6 +256,7 @@ __ham_c_count(dbc, recnop) db_recno_t *recnop; { DB *dbp; + DB_MPOOLFILE *mpf; HASH_CURSOR *hcp; db_indx_t len; db_recno_t recno; @@ -630,22 +264,23 @@ __ham_c_count(dbc, recnop) u_int8_t *p, *pend; dbp = dbc->dbp; - hcp = (HASH_CURSOR *) dbc->internal; + mpf = dbp->mpf; + hcp = (HASH_CURSOR *)dbc->internal; recno = 0; if ((ret = __ham_get_cpage(dbc, DB_LOCK_READ)) != 0) return (ret); - switch (HPAGE_PTYPE(H_PAIRDATA(hcp->page, hcp->indx))) { + switch (HPAGE_PTYPE(H_PAIRDATA(dbp, hcp->page, hcp->indx))) { case H_KEYDATA: case H_OFFPAGE: recno = 1; break; case H_DUPLICATE: - p = HKEYDATA_DATA(H_PAIRDATA(hcp->page, hcp->indx)); + p = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx)); pend = p + - LEN_HDATA(hcp->page, dbp->pgsize, hcp->indx); + LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx); for (; p < pend; recno++) { /* p may be odd, so copy rather than just dereffing */ memcpy(&len, p, sizeof(db_indx_t)); @@ -654,14 +289,13 @@ __ham_c_count(dbc, recnop) break; default: - ret = __db_unknown_type(dbp->dbenv, 
"__ham_c_count", - HPAGE_PTYPE(H_PAIRDATA(hcp->page, hcp->indx))); + ret = __db_pgfmt(dbp->dbenv, hcp->pgno); goto err; } *recnop = recno; -err: if ((t_ret = memp_fput(dbc->dbp->mpf, hcp->page, 0)) != 0 && ret == 0) +err: if ((t_ret = mpf->put(mpf, hcp->page, 0)) != 0 && ret == 0) ret = t_ret; hcp->page = NULL; return (ret); @@ -673,10 +307,12 @@ __ham_c_del(dbc) { DB *dbp; DBT repldbt; + DB_MPOOLFILE *mpf; HASH_CURSOR *hcp; int ret, t_ret; dbp = dbc->dbp; + mpf = dbp->mpf; hcp = (HASH_CURSOR *)dbc->internal; if (F_ISSET(hcp, H_DELETED)) @@ -689,12 +325,12 @@ __ham_c_del(dbc) goto out; /* Off-page duplicates. */ - if (HPAGE_TYPE(hcp->page, H_DATAINDEX(hcp->indx)) == H_OFFDUP) + if (HPAGE_TYPE(dbp, hcp->page, H_DATAINDEX(hcp->indx)) == H_OFFDUP) goto out; if (F_ISSET(hcp, H_ISDUP)) { /* On-page duplicate. */ if (hcp->dup_off == 0 && - DUP_SIZE(hcp->dup_len) == LEN_HDATA(hcp->page, + DUP_SIZE(hcp->dup_len) == LEN_HDATA(dbp, hcp->page, hcp->hdr->dbmeta.pagesize, hcp->indx)) ret = __ham_del_pair(dbc, 1); else { @@ -703,21 +339,25 @@ __ham_c_del(dbc) repldbt.doff = hcp->dup_off; repldbt.dlen = DUP_SIZE(hcp->dup_len); repldbt.size = 0; - repldbt.data = HKEYDATA_DATA(H_PAIRDATA(hcp->page, + repldbt.data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx)); - ret = __ham_replpair(dbc, &repldbt, 0); - hcp->dup_tlen -= DUP_SIZE(hcp->dup_len); - F_SET(hcp, H_DELETED); - ret = __ham_c_update(dbc, DUP_SIZE(hcp->dup_len), 0, 1); + if ((ret = __ham_replpair(dbc, &repldbt, 0)) == 0) { + hcp->dup_tlen -= DUP_SIZE(hcp->dup_len); + F_SET(hcp, H_DELETED); + ret = __ham_c_update(dbc, + DUP_SIZE(hcp->dup_len), 0, 1); + } } } else /* Not a duplicate */ ret = __ham_del_pair(dbc, 1); -out: if (ret == 0 && hcp->page != NULL && - (t_ret = memp_fput(dbp->mpf, hcp->page, DB_MPOOL_DIRTY)) != 0) - ret = t_ret; - hcp->page = NULL; +out: if (hcp->page != NULL) { + if ((t_ret = mpf->put(mpf, + hcp->page, ret == 0 ? DB_MPOOL_DIRTY : 0)) && ret == 0) + ret = t_ret; + hcp->page = NULL; + } if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0) ret = t_ret; return (ret); @@ -760,7 +400,7 @@ __ham_c_dup(orig_dbc, new_dbc) * holds a lock of the correct type, so if we need a write lock and * request it, we know that we'll get it. */ - if (orig->lock.off == LOCK_INVALID || orig_dbc->txn != NULL) + if (!LOCK_ISSET(orig->lock) || orig_dbc->txn != NULL) return (0); return (__ham_lock_bucket(new_dbc, DB_LOCK_READ)); @@ -775,12 +415,14 @@ __ham_c_get(dbc, key, data, flags, pgnop) db_pgno_t *pgnop; { DB *dbp; + DB_MPOOLFILE *mpf; HASH_CURSOR *hcp; db_lockmode_t lock_type; int get_key, ret, t_ret; hcp = (HASH_CURSOR *)dbc->internal; dbp = dbc->dbp; + mpf = dbp->mpf; /* Clear OR'd in additional bits so we can check for flag equality. 
*/ if (F_ISSET(dbc, DBC_RMW)) @@ -827,6 +469,7 @@ __ham_c_get(dbc, key, data, flags, pgnop) case DB_SET: case DB_SET_RANGE: case DB_GET_BOTH: + case DB_GET_BOTH_RANGE: ret = __ham_lookup(dbc, key, 0, lock_type, pgnop); get_key = 0; break; @@ -856,11 +499,11 @@ __ham_c_get(dbc, key, data, flags, pgnop) goto err; else if (F_ISSET(hcp, H_OK)) { if (*pgnop == PGNO_INVALID) - ret = __ham_dup_return (dbc, data, flags); + ret = __ham_dup_return(dbc, data, flags); break; } else if (!F_ISSET(hcp, H_NOMORE)) { __db_err(dbp->dbenv, - "H_NOMORE returned to __ham_c_get"); + "H_NOMORE returned to __ham_c_get"); ret = EINVAL; break; } @@ -872,7 +515,7 @@ __ham_c_get(dbc, key, data, flags, pgnop) case DB_LAST: case DB_PREV: case DB_PREV_NODUP: - ret = memp_fput(dbp->mpf, hcp->page, 0); + ret = mpf->put(mpf, hcp->page, 0); hcp->page = NULL; if (hcp->bucket == 0) { ret = DB_NOTFOUND; @@ -890,7 +533,7 @@ __ham_c_get(dbc, key, data, flags, pgnop) case DB_FIRST: case DB_NEXT: case DB_NEXT_NODUP: - ret = memp_fput(dbp->mpf, hcp->page, 0); + ret = mpf->put(mpf, hcp->page, 0); hcp->page = NULL; hcp->indx = NDX_INVALID; hcp->bucket++; @@ -907,6 +550,7 @@ __ham_c_get(dbc, key, data, flags, pgnop) break; case DB_GET_BOTH: case DB_GET_BOTHC: + case DB_GET_BOTH_RANGE: case DB_NEXT_DUP: case DB_SET: case DB_SET_RANGE: @@ -940,6 +584,382 @@ err: if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0) return (ret); } +/* + * __ham_bulk -- Return bulk data from a hash table. + */ +static int +__ham_bulk(dbc, data, flags) + DBC *dbc; + DBT *data; + u_int32_t flags; +{ + DB *dbp; + DB_MPOOLFILE *mpf; + HASH_CURSOR *cp; + PAGE *pg; + db_indx_t dup_len, dup_off, dup_tlen, indx, *inp; + db_lockmode_t lock_mode; + db_pgno_t pgno; + int32_t *endp, key_off, *offp, *saveoff; + u_int32_t key_size, size, space; + u_int8_t *dbuf, *dp, *hk, *np, *tmp; + int is_dup, is_key; + int need_pg, next_key, no_dup, pagesize, ret, t_ret; + + ret = 0; + key_off = 0; + dup_len = dup_off = dup_tlen = 0; + size = 0; + dbp = dbc->dbp; + pagesize = dbp->pgsize; + mpf = dbp->mpf; + cp = (HASH_CURSOR *)dbc->internal; + is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0; + next_key = is_key && LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP; + no_dup = LF_ISSET(DB_OPFLAGS_MASK) == DB_NEXT_NODUP; + dbuf = data->data; + np = dp = dbuf; + + /* Keep track of space that is left. There is an termination entry */ + space = data->ulen; + space -= sizeof(*offp); + + /* Build the offset/size table from the end up. */ + endp = (int32_t *) ((u_int8_t *)dbuf + data->ulen); + endp--; + offp = endp; + + key_size = 0; + lock_mode = F_ISSET(dbc, DBC_RMW) ? 
DB_LOCK_WRITE: DB_LOCK_READ; + +next_pg: + need_pg = 1; + indx = cp->indx; + pg = cp->page; + inp = P_INP(dbp, pg); + + do { + if (is_key) { + hk = H_PAIRKEY(dbp, pg, indx); + if (HPAGE_PTYPE(hk) == H_OFFPAGE) { + memcpy(&key_size, + HOFFPAGE_TLEN(hk), sizeof(u_int32_t)); + memcpy(&pgno, + HOFFPAGE_PGNO(hk), sizeof(db_pgno_t)); + size = key_size; + if (key_size > space) + goto get_key_space; + if ((ret = __bam_bulk_overflow( + dbc, key_size, pgno, np)) != 0) + return (ret); + space -= key_size; + key_off = (int32_t)(np - dbuf); + np += key_size; + } else { + if (need_pg) { + dp = np; + size = pagesize - HOFFSET(pg); + if (space < size) { +get_key_space: + if (offp == endp) { + data->size = + ALIGN(size + + pagesize, + sizeof(u_int32_t)); + return (ENOMEM); + } + goto back_up; + } + memcpy(dp, + (u_int8_t *)pg + HOFFSET(pg), size); + need_pg = 0; + space -= size; + np += size; + } + key_size = LEN_HKEY(dbp, pg, pagesize, indx); + key_off = (int32_t)(inp[indx] - HOFFSET(pg) + + dp - dbuf + SSZA(HKEYDATA, data)); + } + } + + hk = H_PAIRDATA(dbp, pg, indx); + switch (HPAGE_PTYPE(hk)) { + case H_DUPLICATE: + case H_KEYDATA: + if (need_pg) { + dp = np; + size = pagesize - HOFFSET(pg); + if (space < size) { +back_up: + if (indx != 0) { + indx -= 2; + /* XXX + * It's not clear that this is + * the right way to fix this, + * but here goes. + * If we are backing up onto a + * duplicate, then we need to + * position ourselves at the + * end of the duplicate set. + * We probably need to make + * this work for H_OFFDUP too. + * It might be worth making a + * dummy cursor and calling + * __ham_item_prev. + */ + tmp = H_PAIRDATA(dbp, pg, indx); + if (HPAGE_PTYPE(tmp) == + H_DUPLICATE) { + dup_off = dup_tlen = + LEN_HDATA(dbp, pg, + pagesize, indx + 1); + memcpy(&dup_len, + HKEYDATA_DATA(tmp), + sizeof(db_indx_t)); + } + goto get_space; + } + /* indx == 0 */ + if ((ret = __ham_item_prev(dbc, + lock_mode, &pgno)) != 0) { + if (ret != DB_NOTFOUND) + return (ret); + if ((ret = mpf->put(mpf, + cp->page, 0)) != 0) + return (ret); + cp->page = NULL; + if (cp->bucket == 0) { + cp->indx = indx = + NDX_INVALID; + goto get_space; + } + if ((ret = + __ham_get_meta(dbc)) != 0) + return (ret); + + cp->bucket--; + cp->pgno = BUCKET_TO_PAGE(cp, + cp->bucket); + cp->indx = NDX_INVALID; + if ((ret = __ham_release_meta( + dbc)) != 0) + return (ret); + if ((ret = __ham_item_prev(dbc, + lock_mode, &pgno)) != 0) + return (ret); + } + indx = cp->indx; +get_space: + /* + * See if we put any data in the buffer. + */ + if (offp >= endp || + F_ISSET(dbc, DBC_TRANSIENT)) { + data->size = ALIGN(size + + data->ulen - space, + sizeof(u_int32_t)); + return (ENOMEM); + } + /* + * Don't continue; we're all out + * of space, even though we're + * returning success. + */ + next_key = 0; + break; + } + memcpy(dp, (u_int8_t *)pg + HOFFSET(pg), size); + need_pg = 0; + space -= size; + np += size; + } + + /* + * We're about to crack the offset(s) and length(s) + * out of an H_KEYDATA or H_DUPLICATE item. + * There are three cases: + * 1. We were moved into a duplicate set by + * the standard hash cursor code. Respect + * the dup_off and dup_tlen we were given. + * 2. We stumbled upon a duplicate set while + * walking the page on our own. We need to + * recognize it as a dup and set dup_off and + * dup_tlen. + * 3. The current item is not a dup. 
+ */ + if (F_ISSET(cp, H_ISDUP)) { + /* Case 1 */ + is_dup = 1; + dup_len = cp->dup_len; + dup_off = cp->dup_off; + dup_tlen = cp->dup_tlen; + } else if (HPAGE_PTYPE(hk) == H_DUPLICATE) { + /* Case 2 */ + is_dup = 1; + /* + * If we run out of memory and bail, + * make sure the fact we're in a dup set + * isn't ignored later. + */ + F_SET(cp, H_ISDUP); + dup_off = 0; + memcpy(&dup_len, + HKEYDATA_DATA(hk), sizeof(db_indx_t)); + dup_tlen = LEN_HDATA(dbp, pg, pagesize, indx); + } else + /* Case 3 */ + is_dup = dup_len = dup_off = dup_tlen = 0; + + do { + space -= (is_key ? 4 : 2) * sizeof(*offp); + size += (is_key ? 4 : 2) * sizeof(*offp); + /* + * Since space is an unsigned, if we happen + * to wrap, then this comparison will turn out + * to be true. XXX Wouldn't it be better to + * simply check above that space is greater than + * the value we're about to subtract??? + */ + if (space > data->ulen) { + if (!is_dup || dup_off == 0) + goto back_up; + dup_off -= (db_indx_t)DUP_SIZE(offp[1]); + goto get_space; + } + if (is_key) { + *offp-- = key_off; + *offp-- = key_size; + } + if (is_dup) { + *offp-- = (int32_t)( + inp[indx + 1] - HOFFSET(pg) + + dp - dbuf + SSZA(HKEYDATA, data) + + dup_off + sizeof(db_indx_t)); + memcpy(&dup_len, + HKEYDATA_DATA(hk) + dup_off, + sizeof(db_indx_t)); + dup_off += DUP_SIZE(dup_len); + *offp-- = dup_len; + } else { + *offp-- = (int32_t)( + inp[indx + 1] - HOFFSET(pg) + + dp - dbuf + SSZA(HKEYDATA, data)); + *offp-- = LEN_HDATA(dbp, pg, + pagesize, indx); + } + } while (is_dup && dup_off < dup_tlen && no_dup == 0); + F_CLR(cp, H_ISDUP); + break; + case H_OFFDUP: + memcpy(&pgno, HOFFPAGE_PGNO(hk), sizeof(db_pgno_t)); + space -= 2 * sizeof(*offp); + if (space > data->ulen) + goto back_up; + + if (is_key) { + space -= 2 * sizeof(*offp); + if (space > data->ulen) + goto back_up; + *offp-- = key_off; + *offp-- = key_size; + } + saveoff = offp; + if ((ret = __bam_bulk_duplicates(dbc, + pgno, dbuf, is_key ? offp + 2 : NULL, + &offp, &np, &space, no_dup)) != 0) { + if (ret == ENOMEM) { + size = space; + if (is_key && saveoff == offp) { + offp += 2; + goto back_up; + } + goto get_space; + } + return (ret); + } + break; + case H_OFFPAGE: + space -= (is_key ? 4 : 2) * sizeof(*offp); + if (space > data->ulen) + goto back_up; + + memcpy(&size, HOFFPAGE_TLEN(hk), sizeof(u_int32_t)); + memcpy(&pgno, HOFFPAGE_PGNO(hk), sizeof(db_pgno_t)); + if (size > space) + goto back_up; + + if ((ret = + __bam_bulk_overflow(dbc, size, pgno, np)) != 0) + return (ret); + + if (is_key) { + *offp-- = key_off; + *offp-- = key_size; + } + + *offp-- = (int32_t)(np - dbuf); + *offp-- = size; + + np += size; + space -= size; + break; + } + } while (next_key && (indx += 2) < NUM_ENT(pg)); + + cp->indx = indx; + cp->dup_len = dup_len; + cp->dup_off = dup_off; + cp->dup_tlen = dup_tlen; + + /* If we are off the page then try to the next page. */ + if (ret == 0 && next_key && indx >= NUM_ENT(pg)) { + if ((ret = __ham_item_next(dbc, lock_mode, &pgno)) == 0) + goto next_pg; + if (ret != DB_NOTFOUND) + return (ret); + if ((ret = mpf->put(dbc->dbp->mpf, cp->page, 0)) != 0) + return (ret); + cp->page = NULL; + if ((ret = __ham_get_meta(dbc)) != 0) + return (ret); + + cp->bucket++; + if (cp->bucket > cp->hdr->max_bucket) { + /* + * Restore cursor to its previous state. We're past + * the last item in the last bucket, so the next + * DBC->c_get(DB_NEXT) will return DB_NOTFOUND. + */ + cp->bucket--; + ret = DB_NOTFOUND; + } else { + /* + * Start on the next bucket. 
+ * + * Note that if this new bucket happens to be empty, + * but there's another non-empty bucket after it, + * we'll return early. This is a rare case, and we + * don't guarantee any particular number of keys + * returned on each call, so just let the next call + * to bulk get move forward by yet another bucket. + */ + cp->pgno = BUCKET_TO_PAGE(cp, cp->bucket); + cp->indx = NDX_INVALID; + F_CLR(cp, H_ISDUP); + ret = __ham_item_next(dbc, lock_mode, &pgno); + } + + if ((t_ret = __ham_release_meta(dbc)) != 0) + return (t_ret); + if (ret == 0) + goto next_pg; + if (ret != DB_NOTFOUND) + return (ret); + } + *offp = (u_int32_t) -1; + return (0); +} + static int __ham_c_put(dbc, key, data, flags, pgnop) DBC *dbc; @@ -949,6 +969,7 @@ __ham_c_put(dbc, key, data, flags, pgnop) db_pgno_t *pgnop; { DB *dbp; + DB_MPOOLFILE *mpf; DBT tmp_val, *myval; HASH_CURSOR *hcp; u_int32_t nbytes; @@ -962,6 +983,7 @@ __ham_c_put(dbc, key, data, flags, pgnop) COMPQUIET(myval, NULL); dbp = dbc->dbp; + mpf = dbp->mpf; hcp = (HASH_CURSOR *)dbc->internal; if (F_ISSET(hcp, H_DELETED) && @@ -984,8 +1006,7 @@ __ham_c_put(dbc, key, data, flags, pgnop) ret = 0; if (hcp->seek_found_page != PGNO_INVALID && hcp->seek_found_page != hcp->pgno) { - if ((ret = memp_fput(dbp->mpf, hcp->page, 0)) - != 0) + if ((ret = mpf->put(mpf, hcp->page, 0)) != 0) goto err2; hcp->page = NULL; hcp->pgno = hcp->seek_found_page; @@ -1000,9 +1021,10 @@ __ham_c_put(dbc, key, data, flags, pgnop) * and then write the new bytes represented by * val. */ - if ((ret = __ham_init_dbt(dbp->dbenv, - &tmp_val, data->size + data->doff, - &dbc->rdata.data, &dbc->rdata.ulen)) == 0) { + if ((ret = __ham_init_dbt(dbp->dbenv, &tmp_val, + data->size + data->doff, + &dbc->my_rdata.data, + &dbc->my_rdata.ulen)) == 0) { memset(tmp_val.data, 0, data->doff); memcpy((u_int8_t *)tmp_val.data + data->doff, data->data, data->size); @@ -1038,8 +1060,8 @@ done: if (ret == 0 && F_ISSET(hcp, H_EXPAND)) { F_CLR(hcp, H_EXPAND); } - if (ret == 0 && - (t_ret = memp_fset(dbp->mpf, hcp->page, DB_MPOOL_DIRTY)) != 0) + if (hcp->page != NULL && + (t_ret = mpf->set(mpf, hcp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0) ret = t_ret; err2: if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0) @@ -1058,17 +1080,30 @@ __ham_expand_table(dbc) DBC *dbc; { DB *dbp; - PAGE *h; + DB_LOCK metalock; + DB_LSN lsn; + DB_MPOOLFILE *mpf; + DBMETA *mmeta; HASH_CURSOR *hcp; - db_pgno_t pgno; - u_int32_t old_bucket, new_bucket; - int ret; + PAGE *h; + db_pgno_t pgno, mpgno; + u_int32_t newalloc, new_bucket, old_bucket; + int dirty_meta, got_meta, logn, new_double, ret; dbp = dbc->dbp; + mpf = dbp->mpf; hcp = (HASH_CURSOR *)dbc->internal; if ((ret = __ham_dirty_meta(dbc)) != 0) return (ret); + LOCK_INIT(metalock); + mmeta = (DBMETA *) hcp->hdr; + mpgno = mmeta->pgno; + h = NULL; + dirty_meta = 0; + got_meta = 0; + newalloc = 0; + /* * If the split point is about to increase, make sure that we * have enough extra pages. The calculation here is weird. @@ -1078,86 +1113,116 @@ __ham_expand_table(dbc) * see what the log of one greater than that is; here we have to * look at the log of max + 2. VERY NASTY STUFF. * - * It just got even nastier. With subdatabases, we have to request - * a chunk of contiguous pages, so we do that here using an - * undocumented feature of mpool (the MPOOL_NEW_GROUP flag) to - * give us a number of contiguous pages. Ouch. + * We figure out what we need to do, then we log it, then request + * the pages from mpool. We don't want to fail after extending + * the file. 
+ * + * If the page we are about to split into has already been allocated, + * then we simply need to get it to get its LSN. If it hasn't yet + * been allocated, then we know it's LSN (0,0). */ - if (hcp->hdr->max_bucket == hcp->hdr->high_mask) { - /* - * Ask mpool to give us a set of contiguous page numbers - * large enough to contain the next doubling. - * - * Figure out how many new pages we need. This will return - * us the last page. We calculate its page number, initialize - * the page and then write it back to reserve all the pages - * in between. It is possible that the allocation of new pages - * has already been done, but the tranaction aborted. Since - * we don't undo the allocation, check for a valid pgno before - * doing the allocation. - */ - pgno = hcp->hdr->max_bucket + 1; - if (hcp->hdr->spares[__db_log2(pgno) + 1] == PGNO_INVALID) - /* Allocate a group of pages. */ - ret = memp_fget(dbp->mpf, - &pgno, DB_MPOOL_NEW_GROUP, &h); - else { - /* Just read in the last page of the batch */ - pgno = hcp->hdr->spares[__db_log2(pgno) + 1] + - hcp->hdr->max_bucket + 1; - /* Move to the last page of the group. */ - pgno += hcp->hdr->max_bucket; - ret = memp_fget(dbp->mpf, - &pgno, DB_MPOOL_CREATE, &h); - } - if (ret != 0) - return (ret); - P_INIT(h, dbp->pgsize, pgno, - PGNO_INVALID, PGNO_INVALID, 0, P_HASH); - pgno -= hcp->hdr->max_bucket; - } else { - pgno = BUCKET_TO_PAGE(hcp, hcp->hdr->max_bucket + 1); + new_bucket = hcp->hdr->max_bucket + 1; + old_bucket = new_bucket & hcp->hdr->low_mask; + + new_double = hcp->hdr->max_bucket == hcp->hdr->high_mask; + logn = __db_log2(new_bucket); + + if (!new_double || hcp->hdr->spares[logn + 1] != PGNO_INVALID) { + /* Page exists; get it so we can get its LSN */ + pgno = BUCKET_TO_PAGE(hcp, new_bucket); if ((ret = - memp_fget(dbp->mpf, &pgno, DB_MPOOL_CREATE, &h)) != 0) - return (ret); + mpf->get(mpf, &pgno, DB_MPOOL_CREATE, &h)) != 0) + goto err; + lsn = h->lsn; + } else { + /* Get the master meta-data page to do allocation. */ + if (F_ISSET(dbp, DB_AM_SUBDB)) { + mpgno = PGNO_BASE_MD; + if ((ret = __db_lget(dbc, + 0, mpgno, DB_LOCK_WRITE, 0, &metalock)) != 0) + goto err; + if ((ret = + mpf->get(mpf, &mpgno, 0, (PAGE **)&mmeta)) != 0) + goto err; + got_meta = 1; + } + pgno = mmeta->last_pgno + 1; + ZERO_LSN(lsn); + newalloc = 1; } - /* Now we can log the meta-data split. */ - if (DB_LOGGING(dbc)) { - if ((ret = __ham_metagroup_log(dbp->dbenv, - dbc->txn, &h->lsn, 0, dbp->log_fileid, - hcp->hdr->max_bucket, pgno, &hcp->hdr->dbmeta.lsn, - &h->lsn)) != 0) { - (void)memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY); - return (ret); - } + /* Log the meta-data split first. */ + if (DBC_LOGGING(dbc)) { + /* + * We always log the page number of the first page of + * the allocation group. However, the LSN that we log + * is either the LSN on the first page (if we did not + * do the actual allocation here) or the LSN on the last + * page of the unit (if we did do the allocation here). + */ + if ((ret = __ham_metagroup_log(dbp, dbc->txn, + &lsn, 0, hcp->hdr->max_bucket, mpgno, &mmeta->lsn, + hcp->hdr->dbmeta.pgno, &hcp->hdr->dbmeta.lsn, + pgno, &lsn, newalloc)) != 0) + goto err; + } else + LSN_NOT_LOGGED(lsn); - hcp->hdr->dbmeta.lsn = h->lsn; - } + hcp->hdr->dbmeta.lsn = lsn; - /* If we allocated some new pages, write out the last page. */ - if ((ret = memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY)) != 0) - return (ret); + if (new_double && hcp->hdr->spares[logn + 1] == PGNO_INVALID) { + /* + * We need to begin a new doubling and we have not allocated + * any pages yet. 
Read the last page in and initialize it to + * make the allocation contiguous. The pgno we calculated + * above is the first page allocated. The entry in spares is + * that page number minus any buckets already allocated (it + * simplifies bucket to page transaction). After we've set + * that, we calculate the last pgno. + */ + + hcp->hdr->spares[logn + 1] = pgno - new_bucket; + pgno += hcp->hdr->max_bucket; + mmeta->last_pgno = pgno; + mmeta->lsn = lsn; + dirty_meta = DB_MPOOL_DIRTY; - new_bucket = ++hcp->hdr->max_bucket; - old_bucket = (hcp->hdr->max_bucket & hcp->hdr->low_mask); + if ((ret = mpf->get(mpf, &pgno, DB_MPOOL_CREATE, &h)) != 0) + goto err; + + P_INIT(h, dbp->pgsize, + pgno, PGNO_INVALID, PGNO_INVALID, 0, P_HASH); + } + + /* Write out whatever page we ended up modifying. */ + h->lsn = lsn; + if ((ret = mpf->put(mpf, h, DB_MPOOL_DIRTY)) != 0) + goto err; + h = NULL; /* - * If we started a new doubling, fill in the spares array with - * the starting page number negatively offset by the bucket number. + * Update the meta-data page of this hash database. */ - if (new_bucket > hcp->hdr->high_mask) { - /* Starting a new doubling */ + hcp->hdr->max_bucket = new_bucket; + if (new_double) { hcp->hdr->low_mask = hcp->hdr->high_mask; hcp->hdr->high_mask = new_bucket | hcp->hdr->low_mask; - if (hcp->hdr->spares[__db_log2(new_bucket) + 1] == PGNO_INVALID) - hcp->hdr->spares[__db_log2(new_bucket) + 1] = - pgno - new_bucket; } /* Relocate records to the new bucket */ - return (__ham_split_page(dbc, old_bucket, new_bucket)); + ret = __ham_split_page(dbc, old_bucket, new_bucket); + +err: if (got_meta) + (void)mpf->put(mpf, mmeta, dirty_meta); + + if (LOCK_ISSET(metalock)) + (void)__TLPUT(dbc, metalock); + + if (h != NULL) + (void)mpf->put(mpf, h, 0); + + return (ret); } /* @@ -1191,7 +1256,7 @@ __ham_call_hash(dbc, k, len) * everything held by the cursor. */ static int -__ham_dup_return (dbc, val, flags) +__ham_dup_return(dbc, val, flags) DBC *dbc; DBT *val; u_int32_t flags; @@ -1211,7 +1276,7 @@ __ham_dup_return (dbc, val, flags) dbp = dbc->dbp; hcp = (HASH_CURSOR *)dbc->internal; ndx = H_DATAINDEX(hcp->indx); - type = HPAGE_TYPE(hcp->page, ndx); + type = HPAGE_TYPE(dbp, hcp->page, ndx); pp = hcp->page; myval = val; @@ -1228,8 +1293,8 @@ __ham_dup_return (dbc, val, flags) DB_ASSERT(type != H_OFFDUP); /* Case 1 */ - if (type != H_DUPLICATE && - flags != DB_GET_BOTH && flags != DB_GET_BOTHC) + if (type != H_DUPLICATE && flags != DB_GET_BOTH && + flags != DB_GET_BOTHC && flags != DB_GET_BOTH_RANGE) return (0); /* @@ -1239,11 +1304,11 @@ __ham_dup_return (dbc, val, flags) */ if (!F_ISSET(hcp, H_ISDUP) && type == H_DUPLICATE) { F_SET(hcp, H_ISDUP); - hcp->dup_tlen = LEN_HDATA(hcp->page, + hcp->dup_tlen = LEN_HDATA(dbp, hcp->page, hcp->hdr->dbmeta.pagesize, hcp->indx); - hk = H_PAIRDATA(hcp->page, hcp->indx); - if (flags == DB_LAST - || flags == DB_PREV || flags == DB_PREV_NODUP) { + hk = H_PAIRDATA(dbp, hcp->page, hcp->indx); + if (flags == DB_LAST || + flags == DB_PREV || flags == DB_PREV_NODUP) { hcp->dup_off = 0; do { memcpy(&len, @@ -1265,7 +1330,8 @@ __ham_dup_return (dbc, val, flags) * may need to adjust the cursor before returning data. 
* Case 4 */ - if (flags == DB_GET_BOTH || flags == DB_GET_BOTHC) { + if (flags == DB_GET_BOTH || + flags == DB_GET_BOTHC || flags == DB_GET_BOTH_RANGE) { if (F_ISSET(hcp, H_ISDUP)) { /* * If we're doing a join, search forward from the @@ -1274,7 +1340,7 @@ __ham_dup_return (dbc, val, flags) if (flags == DB_GET_BOTHC) F_SET(hcp, H_CONTINUE); - __ham_dsearch(dbc, val, &off, &cmp); + __ham_dsearch(dbc, val, &off, &cmp, flags); /* * This flag is set nowhere else and is safe to @@ -1283,7 +1349,7 @@ __ham_dup_return (dbc, val, flags) F_CLR(hcp, H_CONTINUE); hcp->dup_off = off; } else { - hk = H_PAIRDATA(hcp->page, hcp->indx); + hk = H_PAIRDATA(dbp, hcp->page, hcp->indx); if (((HKEYDATA *)hk)->type == H_OFFPAGE) { memcpy(&tlen, HOFFPAGE_TLEN(hk), sizeof(u_int32_t)); @@ -1298,7 +1364,7 @@ __ham_dup_return (dbc, val, flags) * routines may only look at data and size. */ tmp_val.data = HKEYDATA_DATA(hk); - tmp_val.size = LEN_HDATA(hcp->page, + tmp_val.size = LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx); cmp = dbp->dup_compare == NULL ? __bam_defcmp(dbp, &tmp_val, val) : @@ -1311,6 +1377,18 @@ __ham_dup_return (dbc, val, flags) } /* + * If we're doing a bulk get, we don't want to actually return + * the data: __ham_bulk will take care of cracking out the + * duplicates appropriately. + * + * The rest of this function calculates partial offsets and + * handles the actual __db_ret, so just return if + * DB_MULTIPLE(_KEY) is set. + */ + if (F_ISSET(dbc, DBC_MULTIPLE | DBC_MULTIPLE_KEY)) + return (0); + + /* * Now, everything is initialized, grab a duplicate if * necessary. */ @@ -1351,8 +1429,8 @@ __ham_dup_return (dbc, val, flags) * Finally, if we had a duplicate, pp, ndx, and myval should be * set appropriately. */ - if ((ret = __db_ret(dbp, pp, ndx, myval, &dbc->rdata.data, - &dbc->rdata.ulen)) != 0) + if ((ret = __db_ret(dbp, pp, ndx, myval, &dbc->rdata->data, + &dbc->rdata->ulen)) != 0) return (ret); /* @@ -1374,6 +1452,7 @@ __ham_overwrite(dbc, nval, flags) u_int32_t flags; { DB *dbp; + DB_ENV *dbenv; HASH_CURSOR *hcp; DBT *myval, tmp_val, tmp_val2; void *newrec; @@ -1383,6 +1462,7 @@ __ham_overwrite(dbc, nval, flags) int ret; dbp = dbc->dbp; + dbenv = dbp->dbenv; hcp = (HASH_CURSOR *)dbc->internal; if (F_ISSET(hcp, H_ISDUP)) { /* @@ -1399,7 +1479,7 @@ __ham_overwrite(dbc, nval, flags) */ memset(&tmp_val, 0, sizeof(tmp_val)); if ((ret = - __ham_dup_return (dbc, &tmp_val, DB_CURRENT)) != 0) + __ham_dup_return(dbc, &tmp_val, DB_CURRENT)) != 0) return (ret); /* Figure out new size. */ @@ -1435,7 +1515,7 @@ __ham_overwrite(dbc, nval, flags) } if ((ret = __os_malloc(dbp->dbenv, - DUP_SIZE(newsize), NULL, &newrec)) != 0) + DUP_SIZE(newsize), &newrec)) != 0) return (ret); memset(&tmp_val2, 0, sizeof(tmp_val2)); F_SET(&tmp_val2, DB_DBT_PARTIAL); @@ -1483,8 +1563,7 @@ __ham_overwrite(dbc, nval, flags) tmp_val2.size = newsize; if (dbp->dup_compare( dbp, &tmp_val, &tmp_val2) != 0) { - (void)__os_free(newrec, - DUP_SIZE(newsize)); + (void)__os_free(dbenv, newrec); return (__db_duperr(dbp, flags)); } } @@ -1495,7 +1574,7 @@ __ham_overwrite(dbc, nval, flags) tmp_val2.dlen = DUP_SIZE(hcp->dup_len); ret = __ham_replpair(dbc, &tmp_val2, 0); - (void)__os_free(newrec, DUP_SIZE(newsize)); + (void)__os_free(dbenv, newrec); /* Update cursor */ if (ret != 0) @@ -1520,7 +1599,7 @@ __ham_overwrite(dbc, nval, flags) /* Make sure we maintain sort order. 
*/ if (dbp->dup_compare != NULL) { tmp_val2.data = - HKEYDATA_DATA(H_PAIRDATA(hcp->page, + HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx)) + hcp->dup_off + sizeof(db_indx_t); tmp_val2.size = hcp->dup_len; @@ -1529,8 +1608,8 @@ __ham_overwrite(dbc, nval, flags) } /* Overwriting a complete duplicate. */ if ((ret = - __ham_make_dup(dbp->dbenv, nval, - &tmp_val, &dbc->rdata.data, &dbc->rdata.ulen)) != 0) + __ham_make_dup(dbp->dbenv, nval, &tmp_val, + &dbc->my_rdata.data, &dbc->my_rdata.ulen)) != 0) return (ret); /* Now fix what we are replacing. */ tmp_val.doff = hcp->dup_off; @@ -1541,7 +1620,7 @@ __ham_overwrite(dbc, nval, flags) hcp->dup_tlen += (nval->size - hcp->dup_len); else hcp->dup_tlen -= (hcp->dup_len - nval->size); - hcp->dup_len = DUP_SIZE(nval->size); + hcp->dup_len = (db_indx_t)DUP_SIZE(nval->size); } myval = &tmp_val; } else if (!F_ISSET(nval, DB_DBT_PARTIAL)) { @@ -1549,12 +1628,12 @@ __ham_overwrite(dbc, nval, flags) memcpy(&tmp_val, nval, sizeof(*nval)); F_SET(&tmp_val, DB_DBT_PARTIAL); tmp_val.doff = 0; - hk = H_PAIRDATA(hcp->page, hcp->indx); + hk = H_PAIRDATA(dbp, hcp->page, hcp->indx); if (HPAGE_PTYPE(hk) == H_OFFPAGE) memcpy(&tmp_val.dlen, HOFFPAGE_TLEN(hk), sizeof(u_int32_t)); else - tmp_val.dlen = LEN_HDATA(hcp->page, + tmp_val.dlen = LEN_HDATA(dbp, hcp->page, hcp->hdr->dbmeta.pagesize, hcp->indx); myval = &tmp_val; } else @@ -1601,7 +1680,7 @@ __ham_lookup(dbc, key, sought, mode, pgnop) hcp->bucket = __ham_call_hash(dbc, (u_int8_t *)key->data, key->size); hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket); - while (1) { + for (;;) { *pgnop = PGNO_INVALID; if ((ret = __ham_item_next(dbc, mode, pgnop)) != 0) return (ret); @@ -1609,7 +1688,7 @@ __ham_lookup(dbc, key, sought, mode, pgnop) if (F_ISSET(hcp, H_NOMORE)) break; - hk = H_PAIRKEY(hcp->page, hcp->indx); + hk = H_PAIRKEY(dbp, hcp->page, hcp->indx); switch (HPAGE_PTYPE(hk)) { case H_OFFPAGE: memcpy(&tlen, HOFFPAGE_TLEN(hk), sizeof(u_int32_t)); @@ -1625,12 +1704,12 @@ __ham_lookup(dbc, key, sought, mode, pgnop) break; case H_KEYDATA: if (key->size == - LEN_HKEY(hcp->page, dbp->pgsize, hcp->indx) && + LEN_HKEY(dbp, hcp->page, dbp->pgsize, hcp->indx) && memcmp(key->data, HKEYDATA_DATA(hk), key->size) == 0) { /* Found the key, check for data type. */ found_key: F_SET(hcp, H_OK); - dk = H_PAIRDATA(hcp->page, hcp->indx); + dk = H_PAIRDATA(dbp, hcp->page, hcp->indx); if (HPAGE_PTYPE(dk) == H_OFFDUP) memcpy(pgnop, HOFFDUP_PGNO(dk), sizeof(db_pgno_t)); @@ -1643,7 +1722,7 @@ found_key: F_SET(hcp, H_OK); * These are errors because keys are never * duplicated, only data items are. */ - return (__db_pgfmt(dbp, PGNO(hcp->page))); + return (__db_pgfmt(dbp->dbenv, PGNO(hcp->page))); } } @@ -1677,7 +1756,7 @@ __ham_init_dbt(dbenv, dbt, size, bufp, sizep) memset(dbt, 0, sizeof(*dbt)); if (*sizep < size) { - if ((ret = __os_realloc(dbenv, size, NULL, bufp)) != 0) { + if ((ret = __os_realloc(dbenv, size, bufp)) != 0) { *sizep = 0; return (ret); } @@ -1732,8 +1811,8 @@ __ham_c_update(dbc, len, add, is_dup) MUTEX_THREAD_LOCK(dbenv, dbenv->dblist_mutexp); /* - * Calcuate the order of this deleted record. - * This will be one grater than any cursor that is pointing + * Calculate the order of this deleted record. + * This will be one greater than any cursor that is pointing * at this record and already marked as deleted. 
*/ order = 0; @@ -1749,11 +1828,11 @@ __ham_c_update(dbc, len, add, is_dup) continue; lcp = (HASH_CURSOR *)cp->internal; if (F_ISSET(lcp, H_DELETED) && - hcp->pgno == lcp->pgno && - hcp->indx == lcp->indx && - order <= lcp->order && - (!is_dup || hcp->dup_off == lcp->dup_off)) - order = lcp->order +1; + hcp->pgno == lcp->pgno && + hcp->indx == lcp->indx && + order <= lcp->order && + (!is_dup || hcp->dup_off == lcp->dup_off)) + order = lcp->order + 1; } MUTEX_THREAD_UNLOCK(dbenv, dbp->mutexp); } @@ -1788,8 +1867,8 @@ __ham_c_update(dbc, len, add, is_dup) * We are "undeleting" so unmark all * cursors with the same order. */ - if (lcp->indx == hcp->indx - && F_ISSET(lcp, H_DELETED)) { + if (lcp->indx == hcp->indx && + F_ISSET(lcp, H_DELETED)) { if (lcp->order == hcp->order) F_CLR(lcp, H_DELETED); else if (lcp->order > @@ -1815,12 +1894,13 @@ __ham_c_update(dbc, len, add, is_dup) } else { if (lcp->indx > hcp->indx) { lcp->indx -= 2; - if (lcp->indx == hcp->indx - && F_ISSET(lcp, H_DELETED)) + if (lcp->indx == hcp->indx && + F_ISSET(lcp, H_DELETED)) lcp->order += order; - } else if (lcp->indx == hcp->indx - && !F_ISSET(lcp, H_DELETED)) { + } else if (lcp->indx == hcp->indx && + !F_ISSET(lcp, H_DELETED)) { F_SET(lcp, H_DELETED); + F_CLR(lcp, H_ISDUP); lcp->order = order; } } @@ -1833,10 +1913,10 @@ __ham_c_update(dbc, len, add, is_dup) */ if (add) { lcp->dup_tlen += len; - if (lcp->dup_off == hcp->dup_off - && F_ISSET(hcp, H_DELETED) - && F_ISSET(lcp, H_DELETED)) { - /* Abort of a delete. */ + if (lcp->dup_off == hcp->dup_off && + F_ISSET(hcp, H_DELETED) && + F_ISSET(lcp, H_DELETED)) { + /* Abort of a delete. */ if (lcp->order == hcp->order) F_CLR(lcp, H_DELETED); else if (lcp->order > @@ -1851,8 +1931,9 @@ __ham_c_update(dbc, len, add, is_dup) lcp->dup_tlen -= len; if (lcp->dup_off > hcp->dup_off) { lcp->dup_off -= len; - if (lcp->dup_off == hcp->dup_off - && F_ISSET(lcp, H_DELETED)) + if (lcp->dup_off == + hcp->dup_off && + F_ISSET(lcp, H_DELETED)) lcp->order += order; } else if (lcp->dup_off == hcp->dup_off && @@ -1867,10 +1948,9 @@ __ham_c_update(dbc, len, add, is_dup) } MUTEX_THREAD_UNLOCK(dbenv, dbenv->dblist_mutexp); - if (found != 0 && DB_LOGGING(dbc)) { - if ((ret = __ham_curadj_log(dbenv, - my_txn, &lsn, 0, dbp->log_fileid, hcp->pgno, - hcp->indx, len, hcp->dup_off, add, is_dup, order)) != 0) + if (found != 0 && DBC_LOGGING(dbc)) { + if ((ret = __ham_curadj_log(dbp, my_txn, &lsn, 0, hcp->pgno, + hcp->indx, len, hcp->dup_off, add, is_dup, order)) != 0) return (ret); } @@ -1885,13 +1965,12 @@ __ham_c_update(dbc, len, add, is_dup) * cursors on a split. The latter is so we can update cursors when we * move items off page. * - * PUBLIC: int __ham_get_clist __P((DB *, - * PUBLIC: db_pgno_t, u_int32_t, DBC ***)); + * PUBLIC: int __ham_get_clist __P((DB *, db_pgno_t, u_int32_t, DBC ***)); */ int -__ham_get_clist(dbp, bucket, indx, listp) +__ham_get_clist(dbp, pgno, indx, listp) DB *dbp; - db_pgno_t bucket; + db_pgno_t pgno; u_int32_t indx; DBC ***listp; { @@ -1915,18 +1994,20 @@ __ham_get_clist(dbp, bucket, indx, listp) MUTEX_THREAD_LOCK(dbenv, dbp->mutexp); for (cp = TAILQ_FIRST(&ldbp->active_queue); cp != NULL; cp = TAILQ_NEXT(cp, links)) - if (cp->dbtype == DB_HASH && - ((indx == NDX_INVALID && - ((HASH_CURSOR *)(cp->internal))->bucket - == bucket) || (indx != NDX_INVALID && - cp->internal->pgno == bucket && - cp->internal->indx == indx))) { + /* + * We match if cp->pgno matches the specified + * pgno, and if either the cp->indx matches + * or we weren't given an index. 
+ */ + if (cp->internal->pgno == pgno && + (indx == NDX_INVALID || + cp->internal->indx == indx)) { if (nused >= nalloc) { nalloc += 10; if ((ret = __os_realloc(dbp->dbenv, nalloc * sizeof(HASH_CURSOR *), - NULL, listp)) != 0) - return (ret); + listp)) != 0) + goto err; } (*listp)[nused++] = cp; } @@ -1939,74 +2020,25 @@ __ham_get_clist(dbp, bucket, indx, listp) if (nused >= nalloc) { nalloc++; if ((ret = __os_realloc(dbp->dbenv, - nalloc * sizeof(HASH_CURSOR *), NULL, listp)) != 0) + nalloc * sizeof(HASH_CURSOR *), listp)) != 0) return (ret); } (*listp)[nused] = NULL; } return (0); -} - -static int -__ham_del_dups(orig_dbc, key) - DBC *orig_dbc; - DBT *key; -{ - DBC *dbc; - DBT data, lkey; - int ret, t_ret; - - /* Allocate a cursor. */ - if ((ret = orig_dbc->c_dup(orig_dbc, &dbc, 0)) != 0) - return (ret); - - /* - * Walk a cursor through the key/data pairs, deleting as we go. Set - * the DB_DBT_USERMEM flag, as this might be a threaded application - * and the flags checking will catch us. We don't actually want the - * keys or data, so request a partial of length 0. - */ - memset(&lkey, 0, sizeof(lkey)); - F_SET(&lkey, DB_DBT_USERMEM | DB_DBT_PARTIAL); - memset(&data, 0, sizeof(data)); - F_SET(&data, DB_DBT_USERMEM | DB_DBT_PARTIAL); - - /* Walk through the set of key/data pairs, deleting as we go. */ - if ((ret = dbc->c_get(dbc, key, &data, DB_SET)) != 0) { - if (ret == DB_NOTFOUND) - ret = 0; - goto err; - } - - for (;;) { - if ((ret = dbc->c_del(dbc, 0)) != 0) - goto err; - if ((ret = dbc->c_get(dbc, &lkey, &data, DB_NEXT_DUP)) != 0) { - if (ret == DB_NOTFOUND) { - ret = 0; - break; - } - goto err; - } - } - -err: /* - * Discard the cursor. This will cause the underlying off-page dup - * tree to go away as well as the actual entry on the page. - */ - if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0) - ret = t_ret; - +err: + MUTEX_THREAD_UNLOCK(dbp->dbenv, dbp->mutexp); + MUTEX_THREAD_UNLOCK(dbenv, dbenv->dblist_mutexp); return (ret); - } static int __ham_c_writelock(dbc) DBC *dbc; { - HASH_CURSOR *hcp; + DB_ENV *dbenv; DB_LOCK tmp_lock; + HASH_CURSOR *hcp; int ret; /* @@ -2017,79 +2049,13 @@ __ham_c_writelock(dbc) return (0); hcp = (HASH_CURSOR *)dbc->internal; - if ((hcp->lock.off == LOCK_INVALID || hcp->lock_mode == DB_LOCK_READ)) { + if ((!LOCK_ISSET(hcp->lock) || hcp->lock_mode == DB_LOCK_READ)) { tmp_lock = hcp->lock; if ((ret = __ham_lock_bucket(dbc, DB_LOCK_WRITE)) != 0) return (ret); - if (tmp_lock.off != LOCK_INVALID && - (ret = lock_put(dbc->dbp->dbenv, &tmp_lock)) != 0) - return (ret); - } - return (0); -} - -/* - * __ham_c_chgpg -- - * - * Adjust the cursors after moving an item from one page to another. - * If the old_index is NDX_INVALID, that means that we copied the - * page wholesale and we're leaving indices intact and just changing - * the page number. - * - * PUBLIC: int __ham_c_chgpg - * PUBLIC: __P((DBC *, db_pgno_t, u_int32_t, db_pgno_t, u_int32_t)); - */ -int -__ham_c_chgpg(dbc, old_pgno, old_index, new_pgno, new_index) - DBC *dbc; - db_pgno_t old_pgno, new_pgno; - u_int32_t old_index, new_index; -{ - DB *dbp, *ldbp; - DB_ENV *dbenv; - DB_LSN lsn; - DB_TXN *my_txn; - DBC *cp; - HASH_CURSOR *hcp; - int found, ret; - - dbp = dbc->dbp; - dbenv = dbp->dbenv; - - my_txn = IS_SUBTRANSACTION(dbc->txn) ? 
dbc->txn : NULL; - found = 0; - - MUTEX_THREAD_LOCK(dbenv, dbenv->dblist_mutexp); - for (ldbp = __dblist_get(dbenv, dbp->adj_fileid); - ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid; - ldbp = LIST_NEXT(ldbp, dblistlinks)) { - MUTEX_THREAD_LOCK(dbenv, dbp->mutexp); - for (cp = TAILQ_FIRST(&ldbp->active_queue); cp != NULL; - cp = TAILQ_NEXT(cp, links)) { - if (cp == dbc || cp->dbtype != DB_HASH) - continue; - - hcp = (HASH_CURSOR *)cp->internal; - if (hcp->pgno == old_pgno) { - if (old_index == NDX_INVALID) { - hcp->pgno = new_pgno; - } else if (hcp->indx == old_index) { - hcp->pgno = new_pgno; - hcp->indx = new_index; - } else - continue; - if (my_txn != NULL && cp->txn != my_txn) - found = 1; - } - } - MUTEX_THREAD_UNLOCK(dbenv, dbp->mutexp); - } - MUTEX_THREAD_UNLOCK(dbenv, dbenv->dblist_mutexp); - - if (found != 0 && DB_LOGGING(dbc)) { - if ((ret = __ham_chgpg_log(dbenv, - my_txn, &lsn, 0, dbp->log_fileid, DB_HAM_CHGPG, - old_pgno, new_pgno, old_index, new_index)) != 0) + dbenv = dbc->dbp->dbenv; + if (LOCK_ISSET(tmp_lock) && + (ret = dbenv->lock_put(dbenv, &tmp_lock)) != 0) return (ret); } return (0); |
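
The new `__ham_bulk` path above packs keys and data into a caller-supplied buffer that applications then walk with Berkeley DB's documented `DB_MULTIPLE_*` macros. As a rough, hypothetical sketch of the consuming side only (the helper name `dump_hash_bulk`, the 1 MB buffer size, and the non-transactional cursor are illustrative assumptions, not part of this change):

```c
#include <errno.h>
#include <stdlib.h>
#include <string.h>

#include <db.h>

#define	BULK_BUF_LEN	(1024 * 1024)	/* arbitrary; at least one DB page */

/* Hypothetical example: iterate a hash database via bulk gets. */
int
dump_hash_bulk(DB *dbp)
{
	DBC *dbc;
	DBT key, data;
	void *p, *retkey, *retdata;
	size_t retklen, retdlen;
	int ret, t_ret;

	memset(&key, 0, sizeof(key));
	memset(&data, 0, sizeof(data));
	if ((data.data = malloc(BULK_BUF_LEN)) == NULL)
		return (ENOMEM);
	data.ulen = BULK_BUF_LEN;
	data.flags = DB_DBT_USERMEM;

	if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
		goto err;

	/* Each c_get call fills data via the bulk code (__ham_bulk here). */
	while ((ret = dbc->c_get(dbc,
	    &key, &data, DB_MULTIPLE_KEY | DB_NEXT)) == 0) {
		DB_MULTIPLE_INIT(p, &data);
		for (;;) {
			DB_MULTIPLE_KEY_NEXT(p, &data,
			    retkey, retklen, retdata, retdlen);
			if (p == NULL)		/* buffer exhausted */
				break;
			/* retkey/retklen, retdata/retdlen are one pair. */
		}
	}
	if (ret == DB_NOTFOUND)
		ret = 0;
	if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
		ret = t_ret;
err:	free(data.data);
	return (ret);
}
```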
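
The rewritten `__ham_expand_table` keeps the same linear-hashing bookkeeping as before: each expansion adds bucket `max_bucket + 1`, splits bucket `new_bucket & low_mask`, and begins a new doubling whenever `max_bucket` has reached `high_mask`. The following standalone sketch replays only that mask arithmetic; the starting values mirror `__ham_init_htab`'s two-bucket table, and everything else (the loop count, the printf) is illustrative rather than taken from the source:

```c
#include <stdio.h>

/*
 * Illustrative only: replay the updates __ham_expand_table applies to
 * max_bucket, low_mask and high_mask as buckets are added one at a time.
 */
int
main()
{
	unsigned int max_bucket, high_mask, low_mask;
	unsigned int new_bucket, old_bucket;
	int i, new_double;

	/* As in __ham_init_htab with two buckets: max_bucket = high_mask. */
	max_bucket = high_mask = 1;
	low_mask = 0;

	for (i = 0; i < 8; i++) {
		new_bucket = max_bucket + 1;
		old_bucket = new_bucket & low_mask;	/* bucket being split */
		new_double = max_bucket == high_mask;

		max_bucket = new_bucket;
		if (new_double) {			/* start a new doubling */
			low_mask = high_mask;
			high_mask = new_bucket | low_mask;
		}
		printf("bucket %2u added, bucket %2u split, masks %u/%u\n",
		    new_bucket, old_bucket, low_mask, high_mask);
	}
	return (0);
}
```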