diff options
Diffstat (limited to 'db2/btree/bt_cursor.c')
-rw-r--r-- | db2/btree/bt_cursor.c | 1738 |
1 files changed, 948 insertions, 790 deletions
diff --git a/db2/btree/bt_cursor.c b/db2/btree/bt_cursor.c index 5d3366a3a1..10bc095c9d 100644 --- a/db2/btree/bt_cursor.c +++ b/db2/btree/bt_cursor.c @@ -8,148 +8,219 @@ #include "config.h" #ifndef lint -static const char sccsid[] = "@(#)bt_cursor.c 10.53 (Sleepycat) 5/25/98"; +static const char sccsid[] = "@(#)bt_cursor.c 10.81 (Sleepycat) 12/16/98"; #endif /* not lint */ #ifndef NO_SYSTEM_INCLUDES #include <sys/types.h> #include <errno.h> +#include <stdlib.h> #include <string.h> #endif #include "db_int.h" #include "db_page.h" #include "btree.h" +#include "shqueue.h" +#include "db_shash.h" +#include "lock.h" +#include "lock_ext.h" static int __bam_c_close __P((DBC *)); static int __bam_c_del __P((DBC *, u_int32_t)); -static int __bam_c_first __P((DB *, CURSOR *)); +static int __bam_c_destroy __P((DBC *)); +static int __bam_c_first __P((DBC *, CURSOR *)); static int __bam_c_get __P((DBC *, DBT *, DBT *, u_int32_t)); -static int __bam_c_getstack __P((DB *, CURSOR *)); -static int __bam_c_last __P((DB *, CURSOR *)); -static int __bam_c_next __P((DB *, CURSOR *, int)); -static int __bam_c_physdel __P((DB *, CURSOR *, PAGE *)); -static int __bam_c_prev __P((DB *, CURSOR *)); +static int __bam_c_getstack __P((DBC *, CURSOR *)); +static int __bam_c_last __P((DBC *, CURSOR *)); +static int __bam_c_next __P((DBC *, CURSOR *, int)); +static int __bam_c_physdel __P((DBC *, CURSOR *, PAGE *)); +static int __bam_c_prev __P((DBC *, CURSOR *)); static int __bam_c_put __P((DBC *, DBT *, DBT *, u_int32_t)); -static int __bam_c_rget __P((DB *, CURSOR *, DBT *, u_int32_t)); -static int __bam_c_search - __P((DB *, CURSOR *, const DBT *, u_int32_t, int, int *)); +static void __bam_c_reset __P((CURSOR *)); +static int __bam_c_rget __P((DBC *, DBT *, u_int32_t)); +static int __bam_c_search __P((DBC *, CURSOR *, const DBT *, u_int32_t, int *)); +static int __bam_dsearch __P((DBC *, CURSOR *, DBT *, u_int32_t *)); /* Discard the current page/lock held by a cursor. */ #undef DISCARD -#define DISCARD(dbp, cp) { \ +#define DISCARD(dbc, cp) { \ if ((cp)->page != NULL) { \ - (void)memp_fput(dbp->mpf, (cp)->page, 0); \ + (void)memp_fput((dbc)->dbp->mpf, (cp)->page, 0); \ (cp)->page = NULL; \ } \ if ((cp)->lock != LOCK_INVALID) { \ - (void)__BT_TLPUT((dbp), (cp)->lock); \ + (void)__BT_TLPUT((dbc), (cp)->lock); \ (cp)->lock = LOCK_INVALID; \ } \ } +/* If the cursor references a deleted record. */ +#undef IS_CUR_DELETED +#define IS_CUR_DELETED(cp) \ + (((cp)->dpgno == PGNO_INVALID && \ + B_DISSET(GET_BKEYDATA((cp)->page, \ + (cp)->indx + O_INDX)->type)) || \ + ((cp)->dpgno != PGNO_INVALID && \ + B_DISSET(GET_BKEYDATA((cp)->page, (cp)->dindx)->type))) + +/* If the cursor and index combination references a deleted record. */ +#undef IS_DELETED +#define IS_DELETED(cp, indx) \ + (((cp)->dpgno == PGNO_INVALID && \ + B_DISSET(GET_BKEYDATA((cp)->page, (indx) + O_INDX)->type)) || \ + ((cp)->dpgno != PGNO_INVALID && \ + B_DISSET(GET_BKEYDATA((cp)->page, (indx))->type))) + /* - * __bam_cursor -- - * Interface to the cursor functions. + * Test to see if two cursors could point to duplicates of the same key, + * whether on-page or off-page. The leaf page numbers must be the same + * in both cases. In the case of off-page duplicates, the key indices + * on the leaf page will be the same. In the case of on-page duplicates, + * the duplicate page number must not be set, and the key index offsets + * must be the same. For the last test, as the saved copy of the cursor + * will not have a valid page pointer, we use the cursor's. + */ +#undef POSSIBLE_DUPLICATE +#define POSSIBLE_DUPLICATE(cursor, saved_copy) \ + ((cursor)->pgno == (saved_copy).pgno && \ + ((cursor)->indx == (saved_copy).indx || \ + ((cursor)->dpgno == PGNO_INVALID && \ + (saved_copy).dpgno == PGNO_INVALID && \ + (cursor)->page->inp[(cursor)->indx] == \ + (cursor)->page->inp[(saved_copy).indx]))) + +/* + * __bam_c_reset -- + * Initialize internal cursor structure. + */ +static void +__bam_c_reset(cp) + CURSOR *cp; +{ + cp->sp = cp->csp = cp->stack; + cp->esp = cp->stack + sizeof(cp->stack) / sizeof(cp->stack[0]); + cp->page = NULL; + cp->pgno = PGNO_INVALID; + cp->indx = 0; + cp->dpgno = PGNO_INVALID; + cp->dindx = 0; + cp->lock = LOCK_INVALID; + cp->mode = DB_LOCK_NG; + cp->recno = RECNO_OOB; + cp->flags = 0; +} + +/* + * __bam_c_init -- + * Initialize the access private portion of a cursor * - * PUBLIC: int __bam_cursor __P((DB *, DB_TXN *, DBC **)); + * PUBLIC: int __bam_c_init __P((DBC *)); */ int -__bam_cursor(dbp, txn, dbcp) - DB *dbp; - DB_TXN *txn; - DBC **dbcp; +__bam_c_init(dbc) + DBC *dbc; { + DB *dbp; CURSOR *cp; - DBC *dbc; - - DEBUG_LWRITE(dbp, txn, "bam_cursor", NULL, NULL, 0); + int ret; - if ((dbc = (DBC *)__db_calloc(1, sizeof(DBC))) == NULL) - return (ENOMEM); - if ((cp = (CURSOR *)__db_calloc(1, sizeof(CURSOR))) == NULL) { - __db_free(dbc); - return (ENOMEM); - } + if ((ret = __os_calloc(1, sizeof(CURSOR), &cp)) != 0) + return (ret); + dbp = dbc->dbp; cp->dbc = dbc; - cp->pgno = cp->dpgno = PGNO_INVALID; - cp->lock = LOCK_INVALID; - - dbc->dbp = dbp; - dbc->txn = txn; - dbc->internal = cp; - dbc->c_close = __bam_c_close; - dbc->c_del = __bam_c_del; - dbc->c_get = __bam_c_get; - dbc->c_put = __bam_c_put; /* - * All cursors are queued from the master DB structure. Add the - * cursor to that queue. + * Logical record numbers are always the same size, and we don't want + * to have to check for space every time we return one. Allocate it + * in advance. */ - CURSOR_SETUP(dbp); - TAILQ_INSERT_HEAD(&dbp->curs_queue, dbc, links); - CURSOR_TEARDOWN(dbp); + if (dbp->type == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM)) { + if ((ret = __os_malloc(sizeof(db_recno_t), + NULL, &dbc->rkey.data)) != 0) { + __os_free(cp, sizeof(CURSOR)); + return (ret); + } + dbc->rkey.ulen = sizeof(db_recno_t); + } + + /* Initialize methods. */ + dbc->internal = cp; + if (dbp->type == DB_BTREE) { + dbc->c_am_close = __bam_c_close; + dbc->c_am_destroy = __bam_c_destroy; + dbc->c_del = __bam_c_del; + dbc->c_get = __bam_c_get; + dbc->c_put = __bam_c_put; + } else { + dbc->c_am_close = __bam_c_close; + dbc->c_am_destroy = __bam_c_destroy; + dbc->c_del = __ram_c_del; + dbc->c_get = __ram_c_get; + dbc->c_put = __ram_c_put; + } + + /* Initialize dynamic information. */ + __bam_c_reset(cp); - *dbcp = dbc; return (0); } /* * __bam_c_close -- - * Close a single cursor. + * Close down the cursor from a single use. */ static int __bam_c_close(dbc) DBC *dbc; { + CURSOR *cp; DB *dbp; int ret; - DEBUG_LWRITE(dbc->dbp, dbc->txn, "bam_c_close", NULL, NULL, 0); + dbp = dbc->dbp; + cp = dbc->internal; + ret = 0; - GETHANDLE(dbc->dbp, dbc->txn, &dbp, ret); + /* + * If a cursor deleted a btree key, perform the actual deletion. + * (Recno keys are either deleted immediately or never deleted.) + */ + if (dbp->type == DB_BTREE && F_ISSET(cp, C_DELETED)) + ret = __bam_c_physdel(dbc, cp, NULL); - ret = __bam_c_iclose(dbp, dbc); + /* Discard any locks not acquired inside of a transaction. */ + if (cp->lock != LOCK_INVALID) { + (void)__BT_TLPUT(dbc, cp->lock); + cp->lock = LOCK_INVALID; + } + + /* Sanity checks. */ +#ifdef DIAGNOSTIC + if (cp->csp != cp->stack) + __db_err(dbp->dbenv, "btree cursor close: stack not empty"); +#endif + + /* Initialize dynamic information. */ + __bam_c_reset(cp); - PUTHANDLE(dbp); return (ret); } /* - * __bam_c_iclose -- + * __bam_c_destroy -- * Close a single cursor -- internal version. - * - * PUBLIC: int __bam_c_iclose __P((DB *, DBC *)); */ -int -__bam_c_iclose(dbp, dbc) - DB *dbp; +static int +__bam_c_destroy(dbc) DBC *dbc; { - CURSOR *cp; - int ret; - - /* If a cursor key was deleted, perform the actual deletion. */ - cp = dbc->internal; - ret = F_ISSET(cp, C_DELETED) ? __bam_c_physdel(dbp, cp, NULL) : 0; - - /* Discard any lock if we're not inside a transaction. */ - if (cp->lock != LOCK_INVALID) - (void)__BT_TLPUT(dbp, cp->lock); - - /* Remove the cursor from the queue. */ - CURSOR_SETUP(dbp); - TAILQ_REMOVE(&dbp->curs_queue, dbc, links); - CURSOR_TEARDOWN(dbp); - /* Discard the structures. */ - FREE(dbc->internal, sizeof(CURSOR)); - FREE(dbc, sizeof(DBC)); + __os_free(dbc->internal, sizeof(CURSOR)); - return (ret); + return (0); } /* @@ -161,7 +232,6 @@ __bam_c_del(dbc, flags) DBC *dbc; u_int32_t flags; { - BTREE *t; CURSOR *cp; DB *dbp; DB_LOCK lock; @@ -170,23 +240,31 @@ __bam_c_del(dbc, flags) db_indx_t indx; int ret; - DEBUG_LWRITE(dbc->dbp, dbc->txn, "bam_c_del", NULL, NULL, flags); - + dbp = dbc->dbp; cp = dbc->internal; h = NULL; + DB_PANIC_CHECK(dbp); + /* Check for invalid flags. */ - if ((ret = __db_cdelchk(dbc->dbp, flags, - F_ISSET(dbc->dbp, DB_AM_RDONLY), cp->pgno != PGNO_INVALID)) != 0) + if ((ret = __db_cdelchk(dbp, flags, + F_ISSET(dbp, DB_AM_RDONLY), cp->pgno != PGNO_INVALID)) != 0) return (ret); + /* + * If we are running CDB, this had better be either a write + * cursor or an immediate writer. + */ + if (F_ISSET(dbp, DB_AM_CDB)) + if (!F_ISSET(dbc, DBC_RMW | DBC_WRITER)) + return (EINVAL); + + DEBUG_LWRITE(dbc, dbc->txn, "bam_c_del", NULL, NULL, flags); + /* If already deleted, return failure. */ - if (F_ISSET(cp, C_DELETED | C_REPLACE)) + if (F_ISSET(cp, C_DELETED)) return (DB_KEYEMPTY); - GETHANDLE(dbc->dbp, dbc->txn, &dbp, ret); - t = dbp->internal; - /* * We don't physically delete the record until the cursor moves, * so we have to have a long-lived write lock on the page instead @@ -194,10 +272,10 @@ __bam_c_del(dbc, flags) * to even get here, so we simply discard it. */ if (F_ISSET(dbp, DB_AM_LOCKING) && cp->mode != DB_LOCK_WRITE) { - if ((ret = __bam_lget(dbp, + if ((ret = __bam_lget(dbc, 0, cp->pgno, DB_LOCK_WRITE, &lock)) != 0) goto err; - (void)__BT_TLPUT(dbp, cp->lock); + (void)__BT_TLPUT(dbc, cp->lock); cp->lock = lock; cp->mode = DB_LOCK_WRITE; } @@ -216,85 +294,50 @@ __bam_c_del(dbc, flags) indx = cp->dindx; } - if ((ret = __bam_pget(dbp, &h, &pgno, 0)) != 0) + if ((ret = memp_fget(dbp->mpf, &pgno, 0, &h)) != 0) goto err; /* Log the change. */ - if (DB_LOGGING(dbp) && - (ret = __bam_cdel_log(dbp->dbenv->lg_info, dbp->txn, &LSN(h), + if (DB_LOGGING(dbc) && + (ret = __bam_cdel_log(dbp->dbenv->lg_info, dbc->txn, &LSN(h), 0, dbp->log_fileid, PGNO(h), &LSN(h), indx)) != 0) { (void)memp_fput(dbp->mpf, h, 0); goto err; } - /* Set the intent-to-delete flag on the page and in all cursors. */ + /* + * Set the intent-to-delete flag on the page and update all cursors. */ if (cp->dpgno == PGNO_INVALID) B_DSET(GET_BKEYDATA(h, indx + O_INDX)->type); else B_DSET(GET_BKEYDATA(h, indx)->type); - (void)__bam_ca_delete(dbp, pgno, indx, NULL, 0); + (void)__bam_ca_delete(dbp, pgno, indx, 1); ret = memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY); h = NULL; /* - * If it's a btree with record numbers, we have to adjust the - * counts. + * If the tree has record numbers, we have to adjust the counts. + * + * !!! + * This test is right -- we don't yet support duplicates and record + * numbers in the same tree, so ignore duplicates if DB_BT_RECNUM + * set. */ - if (F_ISSET(dbp, DB_BT_RECNUM) && - (ret = __bam_c_getstack(dbp, cp)) == 0) { - ret = __bam_adjust(dbp, t, -1); - (void)__bam_stkrel(dbp); + if (F_ISSET(dbp, DB_BT_RECNUM)) { + if ((ret = __bam_c_getstack(dbc, cp)) != 0) + goto err; + if ((ret = __bam_adjust(dbc, -1)) != 0) + goto err; + (void)__bam_stkrel(dbc, 0); } err: if (h != NULL) (void)memp_fput(dbp->mpf, h, 0); - PUTHANDLE(dbp); return (ret); } /* - * __bam_get -- - * Retrieve a key/data pair from the tree. - * - * PUBLIC: int __bam_get __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t)); - */ -int -__bam_get(argdbp, txn, key, data, flags) - DB *argdbp; - DB_TXN *txn; - DBT *key, *data; - u_int32_t flags; -{ - DBC dbc; - CURSOR cp; - int ret; - - DEBUG_LREAD(argdbp, txn, "bam_get", key, NULL, flags); - - /* Check for invalid flags. */ - if ((ret = __db_getchk(argdbp, key, data, flags)) != 0) - return (ret); - - /* Build an internal cursor. */ - memset(&cp, 0, sizeof(cp)); - cp.dbc = &dbc; - cp.pgno = cp.dpgno = PGNO_INVALID; - cp.lock = LOCK_INVALID; - cp.flags = C_INTERNAL; - - /* Build an external cursor. */ - memset(&dbc, 0, sizeof(dbc)); - dbc.dbp = argdbp; - dbc.txn = txn; - dbc.internal = &cp; - - /* Get the key. */ - return(__bam_c_get(&dbc, - key, data, LF_ISSET(DB_SET_RECNO) ? DB_SET_RECNO : DB_SET)); -} - -/* * __bam_c_get -- * Get using a cursor (btree). */ @@ -304,91 +347,197 @@ __bam_c_get(dbc, key, data, flags) DBT *key, *data; u_int32_t flags; { - BTREE *t; - CURSOR *cp, copy; + CURSOR *cp, copy, start; DB *dbp; PAGE *h; - int exact, ret; - - DEBUG_LREAD(dbc->dbp, dbc->txn, "bam_c_get", - flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags); + int exact, ret, tmp_rmw; + dbp = dbc->dbp; cp = dbc->internal; + DB_PANIC_CHECK(dbp); + /* Check for invalid flags. */ - if ((ret = __db_cgetchk(dbc->dbp, + if ((ret = __db_cgetchk(dbp, key, data, flags, cp->pgno != PGNO_INVALID)) != 0) return (ret); - GETHANDLE(dbc->dbp, dbc->txn, &dbp, ret); - t = dbp->internal; + /* Clear OR'd in additional bits so we can check for flag equality. */ + tmp_rmw = 0; + if (LF_ISSET(DB_RMW)) { + if (!F_ISSET(dbp, DB_AM_CDB)) { + tmp_rmw = 1; + F_SET(dbc, DBC_RMW); + } + LF_CLR(DB_RMW); + } + + DEBUG_LREAD(dbc, dbc->txn, "bam_c_get", + flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags); /* - * Break out the code to return a cursor's record number. It - * has nothing to do with the cursor get code except that it's - * been rammed into the interface. + * Return a cursor's record number. It has nothing to do with the + * cursor get code except that it's been rammed into the interface. */ - if (LF_ISSET(DB_GET_RECNO)) { - ret = __bam_c_rget(dbp, cp, data, flags); - PUTHANDLE(dbp); + if (flags == DB_GET_RECNO) { + ret = __bam_c_rget(dbc, data, flags); + if (tmp_rmw) + F_CLR(dbc, DBC_RMW); return (ret); } - /* Initialize the cursor for a new retrieval. */ - copy = *cp; + /* + * Initialize the cursor for a new retrieval. Clear the cursor's + * page pointer, it was set before this operation, and no longer + * has any meaning. + */ cp->page = NULL; + copy = *cp; cp->lock = LOCK_INVALID; switch (flags) { case DB_CURRENT: /* It's not possible to return a deleted record. */ - if (F_ISSET(cp, C_DELETED | C_REPLACE)) { - PUTHANDLE(dbp); - return (DB_KEYEMPTY); + if (F_ISSET(cp, C_DELETED)) { + ret = DB_KEYEMPTY; + goto err; } - /* Get the page with the current item on it. */ - if ((ret = __bam_pget(dbp, &cp->page, &cp->pgno, 0)) != 0) + /* Acquire the current page. */ + if ((ret = __bam_lget(dbc, + 0, cp->pgno, DB_LOCK_READ, &cp->lock)) == 0) + ret = memp_fget(dbp->mpf, + cp->dpgno == PGNO_INVALID ? &cp->pgno : &cp->dpgno, + 0, &cp->page); + if (ret != 0) goto err; break; + case DB_NEXT_DUP: + if (cp->pgno == PGNO_INVALID) { + ret = EINVAL; + goto err; + } + if ((ret = __bam_c_next(dbc, cp, 1)) != 0) + goto err; + + /* Make sure we didn't go past the end of the duplicates. */ + if (!POSSIBLE_DUPLICATE(cp, copy)) { + ret = DB_NOTFOUND; + goto err; + } + break; case DB_NEXT: if (cp->pgno != PGNO_INVALID) { - if ((ret = __bam_c_next(dbp, cp, 1)) != 0) + if ((ret = __bam_c_next(dbc, cp, 1)) != 0) goto err; break; } /* FALLTHROUGH */ case DB_FIRST: - if ((ret = __bam_c_first(dbp, cp)) != 0) + if ((ret = __bam_c_first(dbc, cp)) != 0) goto err; break; case DB_PREV: if (cp->pgno != PGNO_INVALID) { - if ((ret = __bam_c_prev(dbp, cp)) != 0) + if ((ret = __bam_c_prev(dbc, cp)) != 0) goto err; break; } /* FALLTHROUGH */ case DB_LAST: - if ((ret = __bam_c_last(dbp, cp)) != 0) + if ((ret = __bam_c_last(dbc, cp)) != 0) goto err; break; + case DB_SET: + if ((ret = __bam_c_search(dbc, cp, key, flags, &exact)) != 0) + goto err; + + /* + * We cannot currently be referencing a deleted record, but we + * may be referencing off-page duplicates. + * + * If we're referencing off-page duplicates, move off-page. + * If we moved off-page, move to the next non-deleted record. + * If we moved to the next non-deleted record, check to make + * sure we didn't switch records because our current record + * had no non-deleted data items. + */ + start = *cp; + if ((ret = __bam_dup(dbc, cp, cp->indx, 0)) != 0) + goto err; + if (cp->dpgno != PGNO_INVALID && IS_CUR_DELETED(cp)) { + if ((ret = __bam_c_next(dbc, cp, 0)) != 0) + goto err; + if (!POSSIBLE_DUPLICATE(cp, start)) { + ret = DB_NOTFOUND; + goto err; + } + } + break; case DB_SET_RECNO: - exact = 1; - if ((ret = - __bam_c_search(dbp, cp, key, S_FIND, 1, &exact)) != 0) + if ((ret = __bam_c_search(dbc, cp, key, flags, &exact)) != 0) goto err; break; - case DB_SET: - exact = 1; - if ((ret = - __bam_c_search(dbp, cp, key, S_FIND, 0, &exact)) != 0) + case DB_GET_BOTH: + if (F_ISSET(dbc, DBC_CONTINUE | DBC_KEYSET)) { + /* Acquire the current page. */ + if ((ret = memp_fget(dbp->mpf, + cp->dpgno == PGNO_INVALID ? &cp->pgno : &cp->dpgno, + 0, &cp->page)) != 0) + goto err; + + /* If DBC_CONTINUE, move to the next item. */ + if (F_ISSET(dbc, DBC_CONTINUE) && + (ret = __bam_c_next(dbc, cp, 1)) != 0) + goto err; + } else { + if ((ret = + __bam_c_search(dbc, cp, key, flags, &exact)) != 0) + goto err; + + /* + * We may be referencing a duplicates page. Move to + * the first duplicate. + */ + if ((ret = __bam_dup(dbc, cp, cp->indx, 0)) != 0) + goto err; + } + + /* Search for a matching entry. */ + if ((ret = __bam_dsearch(dbc, cp, data, NULL)) != 0) goto err; + + /* Ignore deleted entries. */ + if (IS_CUR_DELETED(cp)) { + ret = DB_NOTFOUND; + goto err; + } break; case DB_SET_RANGE: - exact = 0; - if ((ret = - __bam_c_search(dbp, cp, key, S_FIND, 0, &exact)) != 0) + if ((ret = __bam_c_search(dbc, cp, key, flags, &exact)) != 0) + goto err; + + /* + * As we didn't require an exact match, the search function + * may have returned an entry past the end of the page. If + * so, move to the next entry. + */ + if (cp->indx == NUM_ENT(cp->page) && + (ret = __bam_c_next(dbc, cp, 0)) != 0) + goto err; + + /* + * We may be referencing off-page duplicates, if so, move + * off-page. + */ + if ((ret = __bam_dup(dbc, cp, cp->indx, 0)) != 0) + goto err; + + /* + * We may be referencing a deleted record, if so, move to + * the next non-deleted record. + */ + if (IS_CUR_DELETED(cp) && (ret = __bam_c_next(dbc, cp, 0)) != 0) goto err; break; } @@ -401,12 +550,12 @@ __bam_c_get(dbc, key, data, flags) */ if (flags != DB_SET) { if (cp->dpgno != PGNO_INVALID) { - if ((ret = __bam_pget(dbp, &h, &cp->pgno, 0)) != 0) + if ((ret = memp_fget(dbp->mpf, &cp->pgno, 0, &h)) != 0) goto err; } else h = cp->page; ret = __db_ret(dbp, - h, cp->indx, key, &t->bt_rkey.data, &t->bt_rkey.ulen); + h, cp->indx, key, &dbc->rkey.data, &dbc->rkey.ulen); if (cp->dpgno != PGNO_INVALID) (void)memp_fput(dbp->mpf, h, 0); if (ret) @@ -416,62 +565,163 @@ __bam_c_get(dbc, key, data, flags) /* Return the data. */ if ((ret = __db_ret(dbp, cp->page, cp->dpgno == PGNO_INVALID ? cp->indx + O_INDX : cp->dindx, - data, &t->bt_rdata.data, &t->bt_rdata.ulen)) != 0) + data, &dbc->rdata.data, &dbc->rdata.ulen)) != 0) goto err; /* - * If the previous cursor record has been deleted, delete it. The - * returned key isn't a deleted key, so clear the flag. + * If the previous cursor record has been deleted, physically delete + * the entry from the page. We clear the deleted flag before we call + * the underlying delete routine so that, if an error occurs, and we + * restore the cursor, the deleted flag is cleared. This is because, + * if we manage to physically modify the page, and then restore the + * cursor, we might try to repeat the page modification when closing + * the cursor. */ - if (F_ISSET(©, C_DELETED) && __bam_c_physdel(dbp, ©, cp->page)) - goto err; - F_CLR(cp, C_DELETED | C_REPLACE); + if (F_ISSET(©, C_DELETED)) { + F_CLR(©, C_DELETED); + if ((ret = __bam_c_physdel(dbc, ©, cp->page)) != 0) + goto err; + } + F_CLR(cp, C_DELETED); - /* Release the previous lock, if any. */ + /* Release the previous lock, if any; the current lock is retained. */ if (copy.lock != LOCK_INVALID) - (void)__BT_TLPUT(dbp, copy.lock); - - /* Release the pinned page. */ - ret = memp_fput(dbp->mpf, cp->page, 0); + (void)__BT_TLPUT(dbc, copy.lock); - /* Internal cursors don't hold locks. */ - if (F_ISSET(cp, C_INTERNAL) && cp->lock != LOCK_INVALID) - (void)__BT_TLPUT(dbp, cp->lock); - - ++t->lstat.bt_get; + /* Release the current page. */ + if ((ret = memp_fput(dbp->mpf, cp->page, 0)) != 0) + goto err; if (0) { err: if (cp->page != NULL) (void)memp_fput(dbp->mpf, cp->page, 0); if (cp->lock != LOCK_INVALID) - (void)__BT_TLPUT(dbp, cp->lock); + (void)__BT_TLPUT(dbc, cp->lock); *cp = copy; } - PUTHANDLE(dbp); + /* Release temporary lock upgrade. */ + if (tmp_rmw) + F_CLR(dbc, DBC_RMW); + return (ret); } /* + * __bam_dsearch -- + * Search for a matching data item (or the first data item that's + * equal to or greater than the one we're searching for). + */ +static int +__bam_dsearch(dbc, cp, data, iflagp) + DBC *dbc; + CURSOR *cp; + DBT *data; + u_int32_t *iflagp; +{ + DB *dbp; + CURSOR copy, last; + int cmp, ret; + + dbp = dbc->dbp; + + /* + * If iflagp is non-NULL, we're doing an insert. + * + * If the duplicates are off-page, use the duplicate search routine. + */ + if (cp->dpgno != PGNO_INVALID) { + if ((ret = __db_dsearch(dbc, iflagp != NULL, + data, cp->dpgno, &cp->dindx, &cp->page, &cmp)) != 0) + return (ret); + cp->dpgno = cp->page->pgno; + + if (iflagp == NULL) { + if (cmp != 0) + return (DB_NOTFOUND); + return (0); + } + *iflagp = DB_BEFORE; + return (0); + } + + /* Otherwise, do the search ourselves. */ + copy = *cp; + for (;;) { + /* Save the last interesting cursor position. */ + last = *cp; + + /* See if the data item matches the one we're looking for. */ + if ((cmp = __bam_cmp(dbp, data, cp->page, cp->indx + O_INDX, + dbp->dup_compare == NULL ? + __bam_defcmp : dbp->dup_compare)) == 0) { + if (iflagp != NULL) + *iflagp = DB_AFTER; + return (0); + } + + /* + * If duplicate entries are sorted, we're done if we find a + * page entry that sorts greater than the application item. + * If doing an insert, return success, otherwise DB_NOTFOUND. + */ + if (dbp->dup_compare != NULL && cmp < 0) { + if (iflagp == NULL) + return (DB_NOTFOUND); + *iflagp = DB_BEFORE; + return (0); + } + + /* + * Move to the next item. If we reach the end of the page and + * we're doing an insert, set the cursor to the last item and + * set the referenced memory location so callers know to insert + * after the item, instead of before it. If not inserting, we + * return DB_NOTFOUND. + */ + if ((cp->indx += P_INDX) >= NUM_ENT(cp->page)) { + if (iflagp == NULL) + return (DB_NOTFOUND); + goto use_last; + } + + /* + * Make sure we didn't go past the end of the duplicates. The + * error conditions are the same as above. + */ + if (!POSSIBLE_DUPLICATE(cp, copy)) { + if (iflagp == NULL) + return (DB_NOTFOUND); +use_last: *cp = last; + *iflagp = DB_AFTER; + return (0); + } + } + /* NOTREACHED */ +} + +/* * __bam_c_rget -- * Return the record number for a cursor. */ static int -__bam_c_rget(dbp, cp, data, flags) - DB *dbp; - CURSOR *cp; +__bam_c_rget(dbc, data, flags) + DBC *dbc; DBT *data; u_int32_t flags; { - BTREE *t; + CURSOR *cp; + DB *dbp; DBT dbt; db_recno_t recno; int exact, ret; COMPQUIET(flags, 0); + dbp = dbc->dbp; + cp = dbc->internal; /* Get the page with the current item on it. */ - if ((ret = __bam_pget(dbp, &cp->page, &cp->pgno, 0)) != 0) + if ((ret = memp_fget(dbp->mpf, &cp->pgno, 0, &cp->page)) != 0) return (ret); /* Get a copy of the key. */ @@ -481,18 +731,19 @@ __bam_c_rget(dbp, cp, data, flags) goto err; exact = 1; - if ((ret = __bam_search(dbp, &dbt, S_FIND, 1, &recno, &exact)) != 0) + if ((ret = __bam_search(dbc, &dbt, + F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND, + 1, &recno, &exact)) != 0) goto err; - t = dbp->internal; ret = __db_retcopy(data, &recno, sizeof(recno), - &t->bt_rdata.data, &t->bt_rdata.ulen, dbp->db_malloc); + &dbc->rdata.data, &dbc->rdata.ulen, dbp->db_malloc); /* Release the stack. */ - __bam_stkrel(dbp); + __bam_stkrel(dbc, 0); err: (void)memp_fput(dbp->mpf, cp->page, 0); - __db_free(dbt.data); + __os_free(dbt.data, dbt.size); return (ret); } @@ -506,62 +757,97 @@ __bam_c_put(dbc, key, data, flags) DBT *key, *data; u_int32_t flags; { - BTREE *t; CURSOR *cp, copy; DB *dbp; DBT dbt; db_indx_t indx; db_pgno_t pgno; - u_int32_t iiflags; + u_int32_t iiflags, iiop; int exact, needkey, ret, stack; void *arg; - DEBUG_LWRITE(dbc->dbp, dbc->txn, "bam_c_put", - flags == DB_KEYFIRST || flags == DB_KEYLAST ? key : NULL, - data, flags); - + dbp = dbc->dbp; cp = dbc->internal; - if ((ret = __db_cputchk(dbc->dbp, key, data, flags, - F_ISSET(dbc->dbp, DB_AM_RDONLY), cp->pgno != PGNO_INVALID)) != 0) - return (ret); + DB_PANIC_CHECK(dbp); - GETHANDLE(dbc->dbp, dbc->txn, &dbp, ret); - t = dbp->internal; + DEBUG_LWRITE(dbc, dbc->txn, "bam_c_put", + flags == DB_KEYFIRST || flags == DB_KEYLAST ? key : NULL, + data, flags); - /* Initialize the cursor for a new retrieval. */ - copy = *cp; - cp->page = NULL; - cp->lock = LOCK_INVALID; + if ((ret = __db_cputchk(dbp, key, data, flags, + F_ISSET(dbp, DB_AM_RDONLY), cp->pgno != PGNO_INVALID)) != 0) + return (ret); /* - * To split, we need a valid key for the page. Since it's a cursor, - * we have to build one. + * If we are running CDB, this had better be either a write + * cursor or an immediate writer. If it's a regular writer, + * that means we have an IWRITE lock and we need to upgrade + * it to a write lock. */ - stack = 0; + if (F_ISSET(dbp, DB_AM_CDB)) { + if (!F_ISSET(dbc, DBC_RMW | DBC_WRITER)) + return (EINVAL); + + if (F_ISSET(dbc, DBC_RMW) && + (ret = lock_get(dbp->dbenv->lk_info, dbc->locker, + DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE, + &dbc->mylock)) != 0) + return (EAGAIN); + } + if (0) { -split: /* Acquire a copy of a key from the page. */ +split: /* + * To split, we need a valid key for the page. Since it's a + * cursor, we have to build one. + * + * Acquire a copy of a key from the page. + */ if (needkey) { memset(&dbt, 0, sizeof(DBT)); if ((ret = __db_ret(dbp, cp->page, indx, - &dbt, &t->bt_rkey.data, &t->bt_rkey.ulen)) != 0) + &dbt, &dbc->rkey.data, &dbc->rkey.ulen)) != 0) goto err; arg = &dbt; } else arg = key; - /* Discard any pinned pages. */ + /* + * Discard any locks and pinned pages (the locks are discarded + * even if we're running with transactions, as they lock pages + * that we're sorry we ever acquired). If stack is set and the + * cursor entries are valid, they point to the same entries as + * the stack, don't free them twice. + */ if (stack) { - (void)__bam_stkrel(dbp); + (void)__bam_stkrel(dbc, 1); stack = 0; } else - DISCARD(dbp, cp); + DISCARD(dbc, cp); - if ((ret = __bam_split(dbp, arg)) != 0) + /* + * Restore the cursor to its original value. This is necessary + * for two reasons. First, we are about to copy it in case of + * error, again. Second, we adjust cursors during the split, + * and we have to ensure this cursor is adjusted appropriately, + * along with all the other cursors. + */ + *cp = copy; + + if ((ret = __bam_split(dbc, arg)) != 0) goto err; } - ret = 0; + /* + * Initialize the cursor for a new retrieval. Clear the cursor's + * page pointer, it was set before this operation, and no longer + * has any meaning. + */ + cp->page = NULL; + copy = *cp; + cp->lock = LOCK_INVALID; + + iiflags = needkey = ret = stack = 0; switch (flags) { case DB_AFTER: case DB_BEFORE: @@ -574,64 +860,148 @@ split: /* Acquire a copy of a key from the page. */ pgno = cp->dpgno; indx = cp->dindx; } + /* - * XXX - * This test is right -- we don't currently support duplicates - * in the presence of record numbers, so we don't worry about - * them if DB_BT_RECNUM is set. + * !!! + * This test is right -- we don't yet support duplicates and + * record numbers in the same tree, so ignore duplicates if + * DB_BT_RECNUM set. */ if (F_ISSET(dbp, DB_BT_RECNUM) && (flags != DB_CURRENT || F_ISSET(cp, C_DELETED))) { /* Acquire a complete stack. */ - if ((ret = __bam_c_getstack(dbp, cp)) != 0) + if ((ret = __bam_c_getstack(dbc, cp)) != 0) goto err; - cp->page = t->bt_csp->page; + cp->page = cp->csp->page; stack = 1; iiflags = BI_DOINCR; } else { /* Acquire the current page. */ - if ((ret = __bam_lget(dbp, + if ((ret = __bam_lget(dbc, 0, cp->pgno, DB_LOCK_WRITE, &cp->lock)) == 0) - ret = __bam_pget(dbp, &cp->page, &pgno, 0); + ret = memp_fget(dbp->mpf, &pgno, 0, &cp->page); if (ret != 0) goto err; iiflags = 0; } - if ((ret = __bam_iitem(dbp, &cp->page, - &indx, key, data, flags, iiflags)) == DB_NEEDSPLIT) - goto split; - break; - case DB_KEYFIRST: - exact = needkey = 0; - if ((ret = - __bam_c_search(dbp, cp, key, S_KEYFIRST, 0, &exact)) != 0) - goto err; - stack = 1; - indx = cp->dpgno == PGNO_INVALID ? cp->indx : cp->dindx; - if ((ret = __bam_iitem(dbp, &cp->page, &indx, key, - data, DB_BEFORE, exact ? 0 : BI_NEWKEY)) == DB_NEEDSPLIT) - goto split; + /* + * If the user has specified a duplicate comparison function, + * we return an error if DB_CURRENT was specified and the + * replacement data doesn't compare equal to the current data. + * This stops apps from screwing up the duplicate sort order. + */ + if (flags == DB_CURRENT && dbp->dup_compare != NULL) + if (__bam_cmp(dbp, data, + cp->page, indx, dbp->dup_compare) != 0) { + ret = EINVAL; + goto err; + } + + iiop = flags; break; + case DB_KEYFIRST: case DB_KEYLAST: - exact = needkey = 0; - if ((ret = - __bam_c_search(dbp, cp, key, S_KEYLAST, 0, &exact)) != 0) + /* + * If we have a duplicate comparison function, we position to + * the first of any on-page duplicates, and use __bam_dsearch + * to search for the right slot. Otherwise, we position to + * the first/last of any on-page duplicates based on the flag + * value. + */ + if ((ret = __bam_c_search(dbc, cp, key, + flags == DB_KEYFIRST || dbp->dup_compare != NULL ? + DB_KEYFIRST : DB_KEYLAST, &exact)) != 0) goto err; stack = 1; - indx = cp->dpgno == PGNO_INVALID ? cp->indx : cp->dindx; - if ((ret = __bam_iitem(dbp, &cp->page, &indx, key, - data, DB_AFTER, exact ? 0 : BI_NEWKEY)) == DB_NEEDSPLIT) - goto split; + /* + * If an exact match: + * If duplicates aren't supported, replace the current + * item. (When implementing the DB->put function, our + * caller has already checked the DB_NOOVERWRITE flag.) + * + * If there's a duplicate comparison function, find the + * correct slot for this duplicate item. + * + * If there's no duplicate comparison function, set the + * insert flag based on the argument flags. + * + * If there's no match, the search function returned the + * smallest slot greater than the key, use it. + */ + if (exact) { + if (F_ISSET(dbp, DB_AM_DUP)) { + /* + * If at off-page duplicate page, move to the + * first or last entry -- if a comparison + * function was specified, start searching at + * the first entry. Otherwise, move based on + * the DB_KEYFIRST/DB_KEYLAST flags. + */ + if ((ret = __bam_dup(dbc, cp, cp->indx, + dbp->dup_compare == NULL && + flags != DB_KEYFIRST)) != 0) + goto err; + + /* + * If there's a comparison function, search for + * the correct slot. Otherwise, set the insert + * flag based on the argment flag. + */ + if (dbp->dup_compare == NULL) + iiop = flags == DB_KEYFIRST ? + DB_BEFORE : DB_AFTER; + else + if ((ret = __bam_dsearch(dbc, + cp, data, &iiop)) != 0) + goto err; + } else + iiop = DB_CURRENT; + iiflags = 0; + } else { + iiop = DB_BEFORE; + iiflags = BI_NEWKEY; + } + + if (cp->dpgno == PGNO_INVALID) { + pgno = cp->pgno; + indx = cp->indx; + } else { + pgno = cp->dpgno; + indx = cp->dindx; + } break; } - if (ret) + + ret = __bam_iitem(dbc, &cp->page, &indx, key, data, iiop, iiflags); + + if (ret == DB_NEEDSPLIT) + goto split; + if (ret != 0) goto err; /* + * Reset any cursors referencing this item that might have the item + * marked for deletion. + */ + if (iiop == DB_CURRENT) { + (void)__bam_ca_delete(dbp, pgno, indx, 0); + + /* + * It's also possible that we are the cursor that had the + * item marked for deletion, in which case we want to make + * sure that we don't delete it because we had the delete + * flag set already. + */ + if (cp->pgno == copy.pgno && cp->indx == copy.indx && + cp->dpgno == copy.dpgno && cp->dindx == copy.dindx) + F_CLR(©, C_DELETED); + } + + /* * Update the cursor to point to the new entry. The new entry was * stored on the current page, because we split pages until it was * possible. @@ -642,17 +1012,24 @@ split: /* Acquire a copy of a key from the page. */ cp->dindx = indx; /* - * If the previous cursor record has been deleted, delete it. The - * returned key isn't a deleted key, so clear the flag. + * If the previous cursor record has been deleted, physically delete + * the entry from the page. We clear the deleted flag before we call + * the underlying delete routine so that, if an error occurs, and we + * restore the cursor, the deleted flag is cleared. This is because, + * if we manage to physically modify the page, and then restore the + * cursor, we might try to repeat the page modification when closing + * the cursor. */ - if (F_ISSET(©, C_DELETED) && - (ret = __bam_c_physdel(dbp, ©, cp->page)) != 0) - goto err; - F_CLR(cp, C_DELETED | C_REPLACE); + if (F_ISSET(©, C_DELETED)) { + F_CLR(©, C_DELETED); + if ((ret = __bam_c_physdel(dbc, ©, cp->page)) != 0) + goto err; + } + F_CLR(cp, C_DELETED); - /* Release the previous lock, if any. */ + /* Release the previous lock, if any; the current lock is retained. */ if (copy.lock != LOCK_INVALID) - (void)__BT_TLPUT(dbp, copy.lock); + (void)__BT_TLPUT(dbc, copy.lock); /* * Discard any pages pinned in the tree and their locks, except for @@ -662,23 +1039,26 @@ split: /* Acquire a copy of a key from the page. */ * we have to adjust the stack as necessary. If there was only a * single page on the stack, we don't have to free further stack pages. */ + if (stack && BT_STK_POP(cp) != NULL) + (void)__bam_stkrel(dbc, 0); - if (stack && BT_STK_POP(t) != NULL) - (void)__bam_stkrel(dbp); - + /* Release the current page. */ if ((ret = memp_fput(dbp->mpf, cp->page, 0)) != 0) goto err; if (0) { err: /* Discard any pinned pages. */ if (stack) - (void)__bam_stkrel(dbp); + (void)__bam_stkrel(dbc, 0); else - DISCARD(dbp, cp); + DISCARD(dbc, cp); *cp = copy; } - PUTHANDLE(dbp); + if (F_ISSET(dbp, DB_AM_CDB) && F_ISSET(dbc, DBC_RMW)) + (void)__lock_downgrade(dbp->dbenv->lk_info, dbc->mylock, + DB_LOCK_IWRITE, 0); + return (ret); } @@ -687,19 +1067,22 @@ err: /* Discard any pinned pages. */ * Return the first record. */ static int -__bam_c_first(dbp, cp) - DB *dbp; +__bam_c_first(dbc, cp) + DBC *dbc; CURSOR *cp; { + DB *dbp; db_pgno_t pgno; int ret; + dbp = dbc->dbp; + /* Walk down the left-hand side of the tree. */ for (pgno = PGNO_ROOT;;) { if ((ret = - __bam_lget(dbp, 0, pgno, DB_LOCK_READ, &cp->lock)) != 0) + __bam_lget(dbc, 0, pgno, DB_LOCK_READ, &cp->lock)) != 0) return (ret); - if ((ret = __bam_pget(dbp, &cp->page, &pgno, 0)) != 0) + if ((ret = memp_fget(dbp->mpf, &pgno, 0, &cp->page)) != 0) return (ret); /* If we find a leaf page, we're done. */ @@ -707,28 +1090,22 @@ __bam_c_first(dbp, cp) break; pgno = GET_BINTERNAL(cp->page, 0)->pgno; - DISCARD(dbp, cp); + DISCARD(dbc, cp); } cp->pgno = cp->page->pgno; cp->indx = 0; cp->dpgno = PGNO_INVALID; - /* If it's an empty page or a deleted record, go to the next one. */ - if (NUM_ENT(cp->page) == 0 || - B_DISSET(GET_BKEYDATA(cp->page, cp->indx + O_INDX)->type)) - if ((ret = __bam_c_next(dbp, cp, 0)) != 0) - return (ret); - - /* If it's a duplicate reference, go to the first entry. */ - if ((ret = __bam_ovfl_chk(dbp, cp, O_INDX, 0)) != 0) + /* Check for duplicates. */ + if ((ret = __bam_dup(dbc, cp, cp->indx, 0)) != 0) return (ret); - /* If it's a deleted record, go to the next one. */ - if (cp->dpgno != PGNO_INVALID && - B_DISSET(GET_BKEYDATA(cp->page, cp->dindx)->type)) - if ((ret = __bam_c_next(dbp, cp, 0)) != 0) + /* If on an empty page or a deleted record, move to the next one. */ + if (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(cp)) + if ((ret = __bam_c_next(dbc, cp, 0)) != 0) return (ret); + return (0); } @@ -737,19 +1114,22 @@ __bam_c_first(dbp, cp) * Return the last record. */ static int -__bam_c_last(dbp, cp) - DB *dbp; +__bam_c_last(dbc, cp) + DBC *dbc; CURSOR *cp; { + DB *dbp; db_pgno_t pgno; int ret; + dbp = dbc->dbp; + /* Walk down the right-hand side of the tree. */ for (pgno = PGNO_ROOT;;) { if ((ret = - __bam_lget(dbp, 0, pgno, DB_LOCK_READ, &cp->lock)) != 0) + __bam_lget(dbc, 0, pgno, DB_LOCK_READ, &cp->lock)) != 0) return (ret); - if ((ret = __bam_pget(dbp, &cp->page, &pgno, 0)) != 0) + if ((ret = memp_fget(dbp->mpf, &pgno, 0, &cp->page)) != 0) return (ret); /* If we find a leaf page, we're done. */ @@ -758,28 +1138,22 @@ __bam_c_last(dbp, cp) pgno = GET_BINTERNAL(cp->page, NUM_ENT(cp->page) - O_INDX)->pgno; - DISCARD(dbp, cp); + DISCARD(dbc, cp); } cp->pgno = cp->page->pgno; cp->indx = NUM_ENT(cp->page) == 0 ? 0 : NUM_ENT(cp->page) - P_INDX; cp->dpgno = PGNO_INVALID; - /* If it's an empty page or a deleted record, go to the previous one. */ - if (NUM_ENT(cp->page) == 0 || - B_DISSET(GET_BKEYDATA(cp->page, cp->indx + O_INDX)->type)) - if ((ret = __bam_c_prev(dbp, cp)) != 0) - return (ret); - - /* If it's a duplicate reference, go to the last entry. */ - if ((ret = __bam_ovfl_chk(dbp, cp, cp->indx + O_INDX, 1)) != 0) + /* Check for duplicates. */ + if ((ret = __bam_dup(dbc, cp, cp->indx, 1)) != 0) return (ret); - /* If it's a deleted record, go to the previous one. */ - if (cp->dpgno != PGNO_INVALID && - B_DISSET(GET_BKEYDATA(cp->page, cp->dindx)->type)) - if ((ret = __bam_c_prev(dbp, cp)) != 0) + /* If on an empty page or a deleted record, move to the next one. */ + if (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(cp)) + if ((ret = __bam_c_prev(dbc, cp)) != 0) return (ret); + return (0); } @@ -788,15 +1162,18 @@ __bam_c_last(dbp, cp) * Move to the next record. */ static int -__bam_c_next(dbp, cp, initial_move) - DB *dbp; +__bam_c_next(dbc, cp, initial_move) + DBC *dbc; CURSOR *cp; int initial_move; { + DB *dbp; db_indx_t adjust, indx; db_pgno_t pgno; int ret; + dbp = dbc->dbp; + /* * We're either moving through a page of duplicates or a btree leaf * page. @@ -812,9 +1189,9 @@ __bam_c_next(dbp, cp, initial_move) } if (cp->page == NULL) { if ((ret = - __bam_lget(dbp, 0, pgno, DB_LOCK_READ, &cp->lock)) != 0) + __bam_lget(dbc, 0, pgno, DB_LOCK_READ, &cp->lock)) != 0) return (ret); - if ((ret = __bam_pget(dbp, &cp->page, &pgno, 0)) != 0) + if ((ret = memp_fget(dbp->mpf, &pgno, 0, &cp->page)) != 0) return (ret); } @@ -832,15 +1209,13 @@ __bam_c_next(dbp, cp, initial_move) indx += adjust; for (;;) { if (indx >= NUM_ENT(cp->page)) { - pgno = cp->page->next_pgno; - DISCARD(dbp, cp); - /* * If we're in a btree leaf page, we've reached the end * of the tree. If we've reached the end of a page of * duplicates, continue from the btree leaf page where * we found this page of duplicates. */ + pgno = cp->page->next_pgno; if (pgno == PGNO_INVALID) { /* If in a btree leaf page, it's EOF. */ if (cp->dpgno == PGNO_INVALID) @@ -855,20 +1230,18 @@ __bam_c_next(dbp, cp, initial_move) } else indx = 0; - if ((ret = __bam_lget(dbp, + DISCARD(dbc, cp); + if ((ret = __bam_lget(dbc, 0, pgno, DB_LOCK_READ, &cp->lock)) != 0) return (ret); - if ((ret = __bam_pget(dbp, &cp->page, &pgno, 0)) != 0) + if ((ret = + memp_fget(dbp->mpf, &pgno, 0, &cp->page)) != 0) return (ret); continue; } /* Ignore deleted records. */ - if (dbp->type == DB_BTREE && - ((cp->dpgno == PGNO_INVALID && - B_DISSET(GET_BKEYDATA(cp->page, indx + O_INDX)->type)) || - (cp->dpgno != PGNO_INVALID && - B_DISSET(GET_BKEYDATA(cp->page, indx)->type)))) { + if (IS_DELETED(cp, indx)) { indx += adjust; continue; } @@ -882,8 +1255,7 @@ __bam_c_next(dbp, cp, initial_move) cp->pgno = cp->page->pgno; cp->indx = indx; - if ((ret = - __bam_ovfl_chk(dbp, cp, indx + O_INDX, 0)) != 0) + if ((ret = __bam_dup(dbc, cp, indx, 0)) != 0) return (ret); if (cp->dpgno != PGNO_INVALID) { indx = cp->dindx; @@ -904,14 +1276,17 @@ __bam_c_next(dbp, cp, initial_move) * Move to the previous record. */ static int -__bam_c_prev(dbp, cp) - DB *dbp; +__bam_c_prev(dbc, cp) + DBC *dbc; CURSOR *cp; { + DB *dbp; db_indx_t indx, adjust; db_pgno_t pgno; int ret, set_indx; + dbp = dbc->dbp; + /* * We're either moving through a page of duplicates or a btree leaf * page. @@ -927,9 +1302,9 @@ __bam_c_prev(dbp, cp) } if (cp->page == NULL) { if ((ret = - __bam_lget(dbp, 0, pgno, DB_LOCK_READ, &cp->lock)) != 0) + __bam_lget(dbc, 0, pgno, DB_LOCK_READ, &cp->lock)) != 0) return (ret); - if ((ret = __bam_pget(dbp, &cp->page, &pgno, 0)) != 0) + if ((ret = memp_fget(dbp->mpf, &pgno, 0, &cp->page)) != 0) return (ret); } @@ -941,15 +1316,13 @@ __bam_c_prev(dbp, cp) */ for (;;) { if (indx == 0) { - pgno = cp->page->prev_pgno; - DISCARD(dbp, cp); - /* * If we're in a btree leaf page, we've reached the * beginning of the tree. If we've reached the first * of a page of duplicates, continue from the btree * leaf page where we found this page of duplicates. */ + pgno = cp->page->prev_pgno; if (pgno == PGNO_INVALID) { /* If in a btree leaf page, it's SOF. */ if (cp->dpgno == PGNO_INVALID) @@ -965,10 +1338,12 @@ __bam_c_prev(dbp, cp) } else set_indx = 1; - if ((ret = __bam_lget(dbp, + DISCARD(dbc, cp); + if ((ret = __bam_lget(dbc, 0, pgno, DB_LOCK_READ, &cp->lock)) != 0) return (ret); - if ((ret = __bam_pget(dbp, &cp->page, &pgno, 0)) != 0) + if ((ret = + memp_fget(dbp->mpf, &pgno, 0, &cp->page)) != 0) return (ret); if (set_indx) @@ -979,11 +1354,7 @@ __bam_c_prev(dbp, cp) /* Ignore deleted records. */ indx -= adjust; - if (dbp->type == DB_BTREE && - ((cp->dpgno == PGNO_INVALID && - B_DISSET(GET_BKEYDATA(cp->page, indx + O_INDX)->type)) || - (cp->dpgno != PGNO_INVALID && - B_DISSET(GET_BKEYDATA(cp->page, indx)->type)))) + if (IS_DELETED(cp, indx)) continue; /* @@ -995,8 +1366,7 @@ __bam_c_prev(dbp, cp) cp->pgno = cp->page->pgno; cp->indx = indx; - if ((ret = - __bam_ovfl_chk(dbp, cp, indx + O_INDX, 1)) != 0) + if ((ret = __bam_dup(dbc, cp, indx, 1)) != 0) return (ret); if (cp->dpgno != PGNO_INVALID) { indx = cp->dindx + O_INDX; @@ -1017,499 +1387,261 @@ __bam_c_prev(dbp, cp) * Move to a specified record. */ static int -__bam_c_search(dbp, cp, key, flags, isrecno, exactp) - DB *dbp; +__bam_c_search(dbc, cp, key, flags, exactp) + DBC *dbc; CURSOR *cp; const DBT *key; u_int32_t flags; - int isrecno, *exactp; + int *exactp; { BTREE *t; + DB *dbp; + DB_LOCK lock; + PAGE *h; db_recno_t recno; - int needexact, ret; + db_indx_t indx; + u_int32_t sflags; + int cmp, needexact, ret; + dbp = dbc->dbp; t = dbp->internal; - needexact = *exactp; - /* - * Find any matching record; the search function pins the page. Make - * sure it's a valid key (__bam_search may return an index just past - * the end of a page) and return it. - */ - if (isrecno) { - if ((ret = __ram_getno(dbp, key, &recno, 0)) != 0) + /* Find an entry in the database. */ + switch (flags) { + case DB_SET_RECNO: + if ((ret = __ram_getno(dbc, key, &recno, 0)) != 0) return (ret); - ret = __bam_rsearch(dbp, &recno, flags, 1, exactp); - } else - ret = __bam_search(dbp, key, flags, 1, NULL, exactp); + sflags = F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND; + needexact = *exactp = 1; + ret = __bam_rsearch(dbc, &recno, sflags, 1, exactp); + break; + case DB_SET: + case DB_GET_BOTH: + sflags = F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND; + needexact = *exactp = 1; + goto search; + case DB_SET_RANGE: + sflags = F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND; + needexact = *exactp = 0; + goto search; + case DB_KEYFIRST: + sflags = S_KEYFIRST; + goto fast_search; + case DB_KEYLAST: + sflags = S_KEYLAST; +fast_search: needexact = *exactp = 0; + /* + * If the application has a history of inserting into the first + * or last pages of the database, we check those pages first to + * avoid doing a full search. + * + * Record numbers can't be fast-tracked, the entire tree has to + * be locked. + */ + h = NULL; + lock = LOCK_INVALID; + if (F_ISSET(dbp, DB_BT_RECNUM)) + goto search; + + /* Check if the application has a history of sorted input. */ + if (t->bt_lpgno == PGNO_INVALID) + goto search; + + /* + * Lock and retrieve the page on which we did the last insert. + * It's okay if it doesn't exist, or if it's not the page type + * we expected, it just means that the world changed. + */ + if (__bam_lget(dbc, 0, t->bt_lpgno, DB_LOCK_WRITE, &lock)) + goto fast_miss; + if (memp_fget(dbp->mpf, &t->bt_lpgno, 0, &h)) + goto fast_miss; + if (TYPE(h) != P_LBTREE) + goto fast_miss; + if (NUM_ENT(h) == 0) + goto fast_miss; + + /* + * What we do here is test to see if we're at the beginning or + * end of the tree and if the new item sorts before/after the + * first/last page entry. We don't try and catch inserts into + * the middle of the tree (although we could, as long as there + * were two keys on the page and we saved both the index and + * the page number of the last insert). + */ + if (h->next_pgno == PGNO_INVALID) { + indx = NUM_ENT(h) - P_INDX; + if ((cmp = + __bam_cmp(dbp, key, h, indx, t->bt_compare)) < 0) + goto try_begin; + if (cmp > 0) { + indx += P_INDX; + goto fast_hit; + } + + /* + * Found a duplicate. If doing DB_KEYLAST, we're at + * the correct position, otherwise, move to the first + * of the duplicates. + */ + if (flags == DB_KEYLAST) + goto fast_hit; + for (; + indx > 0 && h->inp[indx - P_INDX] == h->inp[indx]; + indx -= P_INDX) + ; + goto fast_hit; + } +try_begin: if (h->prev_pgno == PGNO_INVALID) { + indx = 0; + if ((cmp = + __bam_cmp(dbp, key, h, indx, t->bt_compare)) > 0) + goto fast_miss; + if (cmp < 0) + goto fast_hit; + /* + * Found a duplicate. If doing DB_KEYFIRST, we're at + * the correct position, otherwise, move to the last + * of the duplicates. + */ + if (flags == DB_KEYFIRST) + goto fast_hit; + for (; + indx < (db_indx_t)(NUM_ENT(h) - P_INDX) && + h->inp[indx] == h->inp[indx + P_INDX]; + indx += P_INDX) + ; + goto fast_hit; + } + goto fast_miss; + +fast_hit: /* Set the exact match flag, we may have found a duplicate. */ + *exactp = cmp == 0; + + /* Enter the entry in the stack. */ + BT_STK_CLR(cp); + BT_STK_ENTER(cp, h, indx, lock, ret); + break; + +fast_miss: if (h != NULL) + (void)memp_fput(dbp->mpf, h, 0); + if (lock != LOCK_INVALID) + (void)__BT_LPUT(dbc, lock); + +search: ret = __bam_search(dbc, key, sflags, 1, NULL, exactp); + break; + default: /* XXX: Impossible. */ + abort(); + /* NOTREACHED */ + } if (ret != 0) return (ret); - cp->page = t->bt_csp->page; - cp->pgno = cp->page->pgno; - cp->indx = t->bt_csp->indx; - cp->lock = t->bt_csp->lock; - cp->dpgno = PGNO_INVALID; - /* - * If we have an exact match, make sure that we're not looking at a - * chain of duplicates -- if so, move to an entry in that chain. + * Initialize the cursor to reference it. This has to be done + * before we return (even with DB_NOTFOUND) because we have to + * free the page(s) we locked in __bam_search. */ - if (*exactp) { - if ((ret = __bam_ovfl_chk(dbp, - cp, cp->indx + O_INDX, LF_ISSET(S_DUPLAST))) != 0) - return (ret); - } else - if (needexact) - return (DB_NOTFOUND); - - /* If past the end of a page, find the next entry. */ - if (cp->indx == NUM_ENT(cp->page) && - (ret = __bam_c_next(dbp, cp, 0)) != 0) - return (ret); + cp->page = cp->csp->page; + cp->pgno = cp->csp->page->pgno; + cp->indx = cp->csp->indx; + cp->lock = cp->csp->lock; + cp->dpgno = PGNO_INVALID; - /* If it's a deleted record, go to the next or previous one. */ - if (cp->dpgno != PGNO_INVALID && - B_DISSET(GET_BKEYDATA(cp->page, cp->dindx)->type)) { - if (flags == S_KEYLAST) { - if ((ret = __bam_c_prev(dbp, cp)) != 0) - return (ret); - } else - if ((ret = __bam_c_next(dbp, cp, 0)) != 0) - return (ret); - } /* - * If we don't specify an exact match (the DB_KEYFIRST/DB_KEYLAST or - * DB_SET_RANGE flags were set) __bam_search() may return a deleted - * item. For DB_KEYFIRST/DB_KEYLAST, we don't care since we're only - * using it for a tree position. For DB_SET_RANGE, we're returning - * the key, so we have to adjust it. + * If we inserted a key into the first or last slot of the tree, + * remember where it was so we can do it more quickly next time. */ - if (LF_ISSET(S_DELNO) && cp->dpgno == PGNO_INVALID && - B_DISSET(GET_BKEYDATA(cp->page, cp->indx + O_INDX)->type)) - if ((ret = __bam_c_next(dbp, cp, 0)) != 0) - return (ret); + if (flags == DB_KEYFIRST || flags == DB_KEYLAST) + t->bt_lpgno = + ((cp->page->next_pgno == PGNO_INVALID && + cp->indx >= NUM_ENT(cp->page)) || + (cp->page->prev_pgno == PGNO_INVALID && cp->indx == 0)) ? + cp->pgno : PGNO_INVALID; + + /* If we need an exact match and didn't find one, we're done. */ + if (needexact && *exactp == 0) + return (DB_NOTFOUND); return (0); } /* - * __bam_ovfl_chk -- - * Check for an overflow record, and if found, move to the correct - * record. + * __bam_dup -- + * Check for an off-page duplicates entry, and if found, move to the + * first or last entry. * - * PUBLIC: int __bam_ovfl_chk __P((DB *, CURSOR *, u_int32_t, int)); + * PUBLIC: int __bam_dup __P((DBC *, CURSOR *, u_int32_t, int)); */ int -__bam_ovfl_chk(dbp, cp, indx, to_end) - DB *dbp; +__bam_dup(dbc, cp, indx, last_dup) + DBC *dbc; CURSOR *cp; u_int32_t indx; - int to_end; + int last_dup; { BOVERFLOW *bo; + DB *dbp; db_pgno_t pgno; int ret; - /* Check for an overflow entry. */ - bo = GET_BOVERFLOW(cp->page, indx); - if (B_TYPE(bo->type) != B_DUPLICATE) - return (0); + dbp = dbc->dbp; /* - * If we find one, go to the duplicates page, and optionally move - * to the last record on that page. + * Check for an overflow entry. If we find one, move to the + * duplicates page, and optionally move to the last record on + * that page. * - * XXX + * !!! * We don't lock duplicates pages, we've already got the correct * lock on the main page. */ + bo = GET_BOVERFLOW(cp->page, indx + O_INDX); + if (B_TYPE(bo->type) != B_DUPLICATE) + return (0); + pgno = bo->pgno; if ((ret = memp_fput(dbp->mpf, cp->page, 0)) != 0) return (ret); cp->page = NULL; - if (to_end) { - if ((ret = __db_dend(dbp, pgno, &cp->page)) != 0) + if (last_dup) { + if ((ret = __db_dend(dbc, pgno, &cp->page)) != 0) return (ret); indx = NUM_ENT(cp->page) - O_INDX; } else { - if ((ret = __bam_pget(dbp, &cp->page, &pgno, 0)) != 0) + if ((ret = memp_fget(dbp->mpf, &pgno, 0, &cp->page)) != 0) return (ret); indx = 0; } - /* Update the duplicate entry in the cursor. */ + /* Update the cursor's duplicate information. */ cp->dpgno = cp->page->pgno; cp->dindx = indx; return (0); } -#ifdef DEBUG -/* - * __bam_cprint -- - * Display the current btree cursor list. - * - * PUBLIC: int __bam_cprint __P((DB *)); - */ -int -__bam_cprint(dbp) - DB *dbp; -{ - CURSOR *cp; - DBC *dbc; - - CURSOR_SETUP(dbp); - for (dbc = TAILQ_FIRST(&dbp->curs_queue); - dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) { - cp = (CURSOR *)dbc->internal; - fprintf(stderr, - "%#0x: page: %lu index: %lu dpage %lu dindex: %lu", - (u_int)cp, (u_long)cp->pgno, (u_long)cp->indx, - (u_long)cp->dpgno, (u_long)cp->dindx); - if (F_ISSET(cp, C_DELETED)) - fprintf(stderr, "(deleted)"); - fprintf(stderr, "\n"); - } - CURSOR_TEARDOWN(dbp); - - return (0); -} -#endif /* DEBUG */ - -/* - * __bam_ca_delete -- - * Check if any of the cursors refer to the item we are about to delete, - * returning the number of cursors that refer to the item in question. - * - * PUBLIC: int __bam_ca_delete __P((DB *, db_pgno_t, u_int32_t, CURSOR *, int)); - */ -int -__bam_ca_delete(dbp, pgno, indx, curs, key_delete) - DB *dbp; - db_pgno_t pgno; - u_int32_t indx; - CURSOR *curs; - int key_delete; -{ - DBC *dbc; - CURSOR *cp; - int count; /* !!!: Has to contain max number of cursors. */ - - /* - * Adjust the cursors. We don't have to review the cursors for any - * process other than the current one, because we have the page write - * locked at this point, and any other process had better be using a - * different locker ID, meaning that only cursors in our process can - * be on the page. - * - * It's possible for multiple cursors within the thread to have write - * locks on the same page, but, cursors within a thread must be single - * threaded, so all we're locking here is the cursor linked list. - */ - CURSOR_SETUP(dbp); - for (count = 0, dbc = TAILQ_FIRST(&dbp->curs_queue); - dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) { - cp = (CURSOR *)dbc->internal; - - /* - * Optionally, a cursor passed in is the one initiating the - * delete, so we don't want to count it or set its deleted - * flag. Otherwise, if a cursor refers to the item, then we - * set its deleted flag. - */ - if (curs == cp) - continue; - - /* - * If we're deleting the key itself and not just one of its - * duplicates, repoint the cursor to the main-page key/data - * pair, everything else is about to be discarded. - */ - if (key_delete || cp->dpgno == PGNO_INVALID) { - if (cp->pgno == pgno && cp->indx == indx) { - cp->dpgno = PGNO_INVALID; - ++count; - F_SET(cp, C_DELETED); - } - } else - if (cp->dpgno == pgno && cp->dindx == indx) { - ++count; - F_SET(cp, C_DELETED); - } - } - CURSOR_TEARDOWN(dbp); - - return (count); -} - -/* - * __bam_ca_di -- - * Adjust the cursors during a delete or insert. - * - * PUBLIC: void __bam_ca_di __P((DB *, db_pgno_t, u_int32_t, int)); - */ -void -__bam_ca_di(dbp, pgno, indx, adjust) - DB *dbp; - db_pgno_t pgno; - u_int32_t indx; - int adjust; -{ - CURSOR *cp; - DBC *dbc; - - /* Recno is responsible for its own adjustments. */ - if (dbp->type == DB_RECNO) - return; - - /* - * Adjust the cursors. See the comment in __bam_ca_delete(). - */ - CURSOR_SETUP(dbp); - for (dbc = TAILQ_FIRST(&dbp->curs_queue); - dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) { - cp = (CURSOR *)dbc->internal; - if (cp->pgno == pgno && cp->indx >= indx) - cp->indx += adjust; - if (cp->dpgno == pgno && cp->dindx >= indx) - cp->dindx += adjust; - } - CURSOR_TEARDOWN(dbp); -} - -/* - * __bam_ca_dup -- - * Adjust the cursors when moving data items to a duplicates page. - * - * PUBLIC: void __bam_ca_dup __P((DB *, - * PUBLIC: db_pgno_t, u_int32_t, u_int32_t, db_pgno_t, u_int32_t)); - */ -void -__bam_ca_dup(dbp, fpgno, first, fi, tpgno, ti) - DB *dbp; - db_pgno_t fpgno, tpgno; - u_int32_t first, fi, ti; -{ - CURSOR *cp; - DBC *dbc; - - /* - * Adjust the cursors. See the comment in __bam_ca_delete(). - * - * No need to test duplicates, this only gets called when moving - * leaf page data items onto a duplicates page. - */ - CURSOR_SETUP(dbp); - for (dbc = TAILQ_FIRST(&dbp->curs_queue); - dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) { - cp = (CURSOR *)dbc->internal; - /* - * Ignore matching entries that have already been moved, - * we move from the same location on the leaf page more - * than once. - */ - if (cp->dpgno == PGNO_INVALID && - cp->pgno == fpgno && cp->indx == fi) { - cp->indx = first; - cp->dpgno = tpgno; - cp->dindx = ti; - } - } - CURSOR_TEARDOWN(dbp); -} - -/* - * __bam_ca_move -- - * Adjust the cursors when moving data items to another page. - * - * PUBLIC: void __bam_ca_move __P((DB *, db_pgno_t, db_pgno_t)); - */ -void -__bam_ca_move(dbp, fpgno, tpgno) - DB *dbp; - db_pgno_t fpgno, tpgno; -{ - CURSOR *cp; - DBC *dbc; - - /* Recno is responsible for its own adjustments. */ - if (dbp->type == DB_RECNO) - return; - - /* - * Adjust the cursors. See the comment in __bam_ca_delete(). - * - * No need to test duplicates, this only gets called when copying - * over the root page with a leaf or internal page. - */ - CURSOR_SETUP(dbp); - for (dbc = TAILQ_FIRST(&dbp->curs_queue); - dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) { - cp = (CURSOR *)dbc->internal; - if (cp->pgno == fpgno) - cp->pgno = tpgno; - } - CURSOR_TEARDOWN(dbp); -} - -/* - * __bam_ca_replace -- - * Check if any of the cursors refer to the item we are about to replace. - * If so, their flags should be changed from deleted to replaced. - * - * PUBLIC: void __bam_ca_replace - * PUBLIC: __P((DB *, db_pgno_t, u_int32_t, ca_replace_arg)); - */ -void -__bam_ca_replace(dbp, pgno, indx, pass) - DB *dbp; - db_pgno_t pgno; - u_int32_t indx; - ca_replace_arg pass; -{ - CURSOR *cp; - DBC *dbc; - - /* - * Adjust the cursors. See the comment in __bam_ca_delete(). - * - * Find any cursors that have logically deleted a record we're about - * to overwrite. - * - * Pass == REPLACE_SETUP: - * Set C_REPLACE_SETUP so we can find the cursors again. - * - * Pass == REPLACE_SUCCESS: - * Clear C_DELETED and C_REPLACE_SETUP, set C_REPLACE, the - * overwrite was successful. - * - * Pass == REPLACE_FAILED: - * Clear C_REPLACE_SETUP, the overwrite failed. - * - * For REPLACE_SUCCESS and REPLACE_FAILED, we reset the indx value - * for the cursor as it may have been changed by other cursor update - * routines as the item was deleted/inserted. - */ - CURSOR_SETUP(dbp); - switch (pass) { - case REPLACE_SETUP: /* Setup. */ - for (dbc = TAILQ_FIRST(&dbp->curs_queue); - dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) { - cp = (CURSOR *)dbc->internal; - if ((cp->pgno == pgno && cp->indx == indx) || - (cp->dpgno == pgno && cp->dindx == indx)) - F_SET(cp, C_REPLACE_SETUP); - } - break; - case REPLACE_SUCCESS: /* Overwrite succeeded. */ - for (dbc = TAILQ_FIRST(&dbp->curs_queue); - dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) { - cp = (CURSOR *)dbc->internal; - if (F_ISSET(cp, C_REPLACE_SETUP)) { - if (cp->dpgno == pgno) - cp->dindx = indx; - if (cp->pgno == pgno) - cp->indx = indx; - F_SET(cp, C_REPLACE); - F_CLR(cp, C_DELETED | C_REPLACE_SETUP); - } - } - break; - case REPLACE_FAILED: /* Overwrite failed. */ - for (dbc = TAILQ_FIRST(&dbp->curs_queue); - dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) { - cp = (CURSOR *)dbc->internal; - if (F_ISSET(cp, C_REPLACE_SETUP)) { - if (cp->dpgno == pgno) - cp->dindx = indx; - if (cp->pgno == pgno) - cp->indx = indx; - F_CLR(cp, C_REPLACE_SETUP); - } - } - break; - } - CURSOR_TEARDOWN(dbp); -} - -/* - * __bam_ca_split -- - * Adjust the cursors when splitting a page. - * - * PUBLIC: void __bam_ca_split __P((DB *, - * PUBLIC: db_pgno_t, db_pgno_t, db_pgno_t, u_int32_t, int)); - */ -void -__bam_ca_split(dbp, ppgno, lpgno, rpgno, split_indx, cleft) - DB *dbp; - db_pgno_t ppgno, lpgno, rpgno; - u_int32_t split_indx; - int cleft; -{ - DBC *dbc; - CURSOR *cp; - - /* Recno is responsible for its own adjustments. */ - if (dbp->type == DB_RECNO) - return; - - /* - * Adjust the cursors. See the comment in __bam_ca_delete(). - * - * If splitting the page that a cursor was on, the cursor has to be - * adjusted to point to the same record as before the split. Most - * of the time we don't adjust pointers to the left page, because - * we're going to copy its contents back over the original page. If - * the cursor is on the right page, it is decremented by the number of - * records split to the left page. - */ - CURSOR_SETUP(dbp); - for (dbc = TAILQ_FIRST(&dbp->curs_queue); - dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) { - cp = (CURSOR *)dbc->internal; - if (cp->pgno == ppgno) { - if (cp->indx < split_indx) { - if (cleft) - cp->pgno = lpgno; - } else { - cp->pgno = rpgno; - cp->indx -= split_indx; - } - } - if (cp->dpgno == ppgno) { - if (cp->dindx < split_indx) { - if (cleft) - cp->dpgno = lpgno; - } else { - cp->dpgno = rpgno; - cp->dindx -= split_indx; - } - } - } - CURSOR_TEARDOWN(dbp); -} - /* * __bam_c_physdel -- * Actually do the cursor deletion. */ static int -__bam_c_physdel(dbp, cp, h) - DB *dbp; +__bam_c_physdel(dbc, cp, h) + DBC *dbc; CURSOR *cp; PAGE *h; { enum { DELETE_ITEM, DELETE_PAGE, NOTHING_FURTHER } cmd; BOVERFLOW bo; - BTREE *t; + DB *dbp; DBT dbt; DB_LOCK lock; db_indx_t indx; db_pgno_t pgno, next_pgno, prev_pgno; int delete_page, local_page, ret; - t = dbp->internal; + dbp = dbc->dbp; + delete_page = ret = 0; /* Figure out what we're deleting. */ @@ -1522,20 +1654,37 @@ __bam_c_physdel(dbp, cp, h) } /* - * If the item is referenced by another cursor, leave it up to that - * cursor to do the delete. + * If the item is referenced by another cursor, set that cursor's + * delete flag and leave it up to it to do the delete. + * + * !!! + * This test for > 0 is a tricky. There are two ways that we can + * be called here. Either we are closing the cursor or we've moved + * off the page with the deleted entry. In the first case, we've + * already removed the cursor from the active queue, so we won't see + * it in __bam_ca_delete. In the second case, it will be on a different + * item, so we won't bother with it in __bam_ca_delete. */ - if (__bam_ca_delete(dbp, pgno, indx, cp, 0) != 0) + if (__bam_ca_delete(dbp, pgno, indx, 1) > 0) return (0); /* + * If this is concurrent DB, upgrade the lock if necessary. + */ + if (F_ISSET(dbp, DB_AM_CDB) && F_ISSET(dbc, DBC_RMW) && + (ret = lock_get(dbp->dbenv->lk_info, + dbc->locker, DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE, + &dbc->mylock)) != 0) + return (EAGAIN); + + /* * If we don't already have the page locked, get it and delete the * items. */ if ((h == NULL || h->pgno != pgno)) { - if ((ret = __bam_lget(dbp, 0, pgno, DB_LOCK_WRITE, &lock)) != 0) + if ((ret = __bam_lget(dbc, 0, pgno, DB_LOCK_WRITE, &lock)) != 0) return (ret); - if ((ret = __bam_pget(dbp, &h, &pgno, 0)) != 0) + if ((ret = memp_fget(dbp->mpf, &pgno, 0, &h)) != 0) return (ret); local_page = 1; } else @@ -1581,7 +1730,7 @@ __bam_c_physdel(dbp, cp, h) cmd = DELETE_ITEM; /* Delete the duplicate. */ - if ((ret = __db_drem(dbp, &h, indx, __bam_free)) != 0) + if ((ret = __db_drem(dbc, &h, indx, __bam_free)) != 0) goto err; /* @@ -1610,7 +1759,7 @@ __bam_c_physdel(dbp, cp, h) if (local_page) { if (h != NULL) (void)memp_fput(dbp->mpf, h, 0); - (void)__BT_TLPUT(dbp, lock); + (void)__BT_TLPUT(dbc, lock); local_page = 0; } @@ -1619,10 +1768,10 @@ __bam_c_physdel(dbp, cp, h) /* Acquire the parent page and switch the index to its entry. */ if ((ret = - __bam_lget(dbp, 0, cp->pgno, DB_LOCK_WRITE, &lock)) != 0) + __bam_lget(dbc, 0, cp->pgno, DB_LOCK_WRITE, &lock)) != 0) goto err; - if ((ret = __bam_pget(dbp, &h, &cp->pgno, 0)) != 0) { - (void)__BT_TLPUT(dbp, lock); + if ((ret = memp_fget(dbp->mpf, &cp->pgno, 0, &h)) != 0) { + (void)__BT_TLPUT(dbc, lock); goto err; } local_page = 1; @@ -1641,12 +1790,12 @@ __bam_c_physdel(dbp, cp, h) */ indx += O_INDX; bo = *GET_BOVERFLOW(h, indx); - (void)__db_ditem(dbp, h, indx, BOVERFLOW_SIZE); + (void)__db_ditem(dbc, h, indx, BOVERFLOW_SIZE); bo.pgno = next_pgno; memset(&dbt, 0, sizeof(dbt)); dbt.data = &bo; dbt.size = BOVERFLOW_SIZE; - (void)__db_pitem(dbp, h, indx, BOVERFLOW_SIZE, &dbt, NULL); + (void)__db_pitem(dbc, h, indx, BOVERFLOW_SIZE, &dbt, NULL); (void)memp_fset(dbp->mpf, h, DB_MPOOL_DIRTY); goto done; } @@ -1661,7 +1810,7 @@ btd: /* * set them is because we're (potentially) about to do a reverse split, * which would make our saved page information useless. * - * XXX + * !!! * The following operations to delete a page might deadlock. I think * that's OK. The problem is if we're deleting an item because we're * closing cursors because we've already deadlocked and want to call @@ -1680,37 +1829,44 @@ btd: /* /* * Do a normal btree delete. * - * XXX + * !!! * Delete the key item first, otherwise the duplicate checks in * __bam_ditem() won't work! */ - if ((ret = __bam_ditem(dbp, h, indx)) != 0) + if ((ret = __bam_ditem(dbc, h, indx)) != 0) goto err; - if ((ret = __bam_ditem(dbp, h, indx)) != 0) + if ((ret = __bam_ditem(dbc, h, indx)) != 0) goto err; /* Discard any remaining locks/pages. */ if (local_page) { (void)memp_fput(dbp->mpf, h, 0); - (void)__BT_TLPUT(dbp, lock); + (void)__BT_TLPUT(dbc, lock); local_page = 0; } /* Delete the page if it was emptied. */ if (delete_page) - ret = __bam_dpage(dbp, &dbt); + ret = __bam_dpage(dbc, &dbt); err: done: if (delete_page) - __db_free(dbt.data); + __os_free(dbt.data, dbt.size); if (local_page) { - (void)memp_fput(dbp->mpf, h, 0); - (void)__BT_TLPUT(dbp, lock); + /* + * It's possible for h to be NULL, as __db_drem may have + * been relinking pages by the time that it deadlocked. + */ + if (h != NULL) + (void)memp_fput(dbp->mpf, h, 0); + (void)__BT_TLPUT(dbc, lock); } - if (ret == 0) - ++t->lstat.bt_deleted; + if (F_ISSET(dbp, DB_AM_CDB) && F_ISSET(dbc, DBC_RMW)) + (void)__lock_downgrade(dbp->dbenv->lk_info, dbc->mylock, + DB_LOCK_IWRITE, 0); + return (ret); } @@ -1719,22 +1875,24 @@ done: if (delete_page) * Acquire a full stack for a cursor. */ static int -__bam_c_getstack(dbp, cp) - DB *dbp; +__bam_c_getstack(dbc, cp) + DBC *dbc; CURSOR *cp; { + DB *dbp; DBT dbt; PAGE *h; db_pgno_t pgno; int exact, ret; - ret = 0; + dbp = dbc->dbp; h = NULL; memset(&dbt, 0, sizeof(DBT)); + ret = 0; /* Get the page with the current item on it. */ pgno = cp->pgno; - if ((ret = __bam_pget(dbp, &h, &pgno, 0)) != 0) + if ((ret = memp_fget(dbp->mpf, &pgno, 0, &h)) != 0) return (ret); /* Get a copy of a key from the page. */ @@ -1744,12 +1902,12 @@ __bam_c_getstack(dbp, cp) /* Get a write-locked stack for that page. */ exact = 0; - ret = __bam_search(dbp, &dbt, S_KEYFIRST, 1, NULL, &exact); + ret = __bam_search(dbc, &dbt, S_KEYFIRST, 1, NULL, &exact); /* We no longer need the key or the page. */ err: if (h != NULL) (void)memp_fput(dbp->mpf, h, 0); if (dbt.data != NULL) - __db_free(dbt.data); + __os_free(dbt.data, dbt.size); return (ret); } |