diff options
Diffstat (limited to 'bdb/qam/qam.c')
-rw-r--r-- | bdb/qam/qam.c | 1070 |
1 files changed, 664 insertions, 406 deletions
diff --git a/bdb/qam/qam.c b/bdb/qam/qam.c index 0c9f453044f..b10f8743439 100644 --- a/bdb/qam/qam.c +++ b/bdb/qam/qam.c @@ -1,14 +1,14 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 1999, 2000 + * Copyright (c) 1999-2002 * Sleepycat Software. All rights reserved. */ #include "db_config.h" #ifndef lint -static const char revid[] = "$Id: qam.c,v 11.72 2001/01/16 20:10:55 ubell Exp $"; +static const char revid[] = "$Id: qam.c,v 11.134 2002/08/13 20:46:08 ubell Exp $"; #endif /* not lint */ #ifndef NO_SYSTEM_INCLUDES @@ -18,20 +18,20 @@ static const char revid[] = "$Id: qam.c,v 11.72 2001/01/16 20:10:55 ubell Exp $" #endif #include "db_int.h" -#include "db_page.h" -#include "db_shash.h" -#include "db_am.h" -#include "mp.h" -#include "lock.h" -#include "log.h" -#include "btree.h" -#include "qam.h" - +#include "dbinc/db_page.h" +#include "dbinc/db_shash.h" +#include "dbinc/btree.h" +#include "dbinc/lock.h" +#include "dbinc/log.h" +#include "dbinc/qam.h" + +static int __qam_bulk __P((DBC *, DBT *, u_int32_t)); static int __qam_c_close __P((DBC *, db_pgno_t, int *)); static int __qam_c_del __P((DBC *)); static int __qam_c_destroy __P((DBC *)); static int __qam_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *)); static int __qam_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *)); +static int __qam_consume __P((DBC *, QMETA *, db_recno_t)); static int __qam_getno __P((DB *, const DBT *, db_recno_t *)); /* @@ -61,17 +61,16 @@ __qam_position(dbc, recnop, mode, exactp) pg = QAM_RECNO_PAGE(dbp, *recnop); if ((ret = __db_lget(dbc, 0, pg, mode == QAM_READ ? - DB_LOCK_READ : DB_LOCK_WRITE, 0, &cp->lock)) != 0) + DB_LOCK_READ : DB_LOCK_WRITE, 0, &cp->lock)) != 0) return (ret); cp->page = NULL; *exactp = 0; if ((ret = __qam_fget(dbp, &pg, - mode == QAM_WRITE ? DB_MPOOL_CREATE : 0, - &cp->page)) != 0) { + mode == QAM_WRITE ? DB_MPOOL_CREATE : 0, &cp->page)) != 0) { /* We did not fetch it, we can release the lock. */ (void)__LPUT(dbc, cp->lock); - cp->lock.off = LOCK_INVALID; - if (mode != QAM_WRITE && (ret == EINVAL || ret == ENOENT)) + if (mode != QAM_WRITE && + (ret == DB_PAGE_NOTFOUND || ret == ENOENT)) return (0); return (ret); } @@ -88,7 +87,7 @@ __qam_position(dbc, recnop, mode, exactp) } qp = QAM_GET_RECORD(dbp, cp->page, cp->indx); - *exactp = F_ISSET(qp, QAM_VALID); + *exactp = F_ISSET(qp, QAM_VALID) ? 1 : 0; return (ret); } @@ -116,9 +115,9 @@ __qam_pitem(dbc, pagep, indx, recno, data) DBT olddata, pdata, *datap; QAMDATA *qp; QUEUE *t; - u_int32_t size; + u_int32_t alloced; u_int8_t *dest, *p; - int alloced, ret; + int ret; alloced = ret = 0; @@ -131,7 +130,6 @@ __qam_pitem(dbc, pagep, indx, recno, data) qp = QAM_GET_RECORD(dbp, pagep, indx); p = qp->data; - size = data->size; datap = data; if (F_ISSET(data, DB_DBT_PARTIAL)) { if (data->doff + data->dlen > t->re_len) { @@ -159,12 +157,12 @@ len_err: __db_err(dbp->dbenv, * to log so that both this and the recovery code is simpler. */ - if (DB_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) { + if (DBC_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) { datap = &pdata; memset(datap, 0, sizeof(*datap)); if ((ret = __os_malloc(dbp->dbenv, - t->re_len, NULL, &datap->data)) != 0) + t->re_len, &datap->data)) != 0) return (ret); alloced = 1; datap->size = t->re_len; @@ -188,14 +186,14 @@ len_err: __db_err(dbp->dbenv, } no_partial: - if (DB_LOGGING(dbc)) { + if (DBC_LOGGING(dbc)) { olddata.size = 0; if (F_ISSET(qp, QAM_SET)) { olddata.data = qp->data; olddata.size = t->re_len; } - if ((ret = __qam_add_log(dbp->dbenv, dbc->txn, &LSN(pagep), - 0, dbp->log_fileid, &LSN(pagep), pagep->pgno, + if ((ret = __qam_add_log(dbp, dbc->txn, &LSN(pagep), + 0, &LSN(pagep), pagep->pgno, indx, recno, datap, qp->flags, olddata.size == 0 ? NULL : &olddata)) != 0) goto err; @@ -207,7 +205,7 @@ no_partial: memset(p + datap->size, t->re_pad, t->re_len - datap->size); err: if (alloced) - __os_free(datap->data, t->re_len); + __os_free(dbp->dbenv, datap->data); return (ret); } @@ -223,23 +221,37 @@ __qam_c_put(dbc, key, data, flags, pgnop) u_int32_t flags; db_pgno_t *pgnop; { - QUEUE_CURSOR *cp; DB *dbp; DB_LOCK lock; + DB_MPOOLFILE *mpf; QMETA *meta; + QUEUE_CURSOR *cp; db_pgno_t pg; db_recno_t new_cur, new_first; u_int32_t opcode; int exact, ret, t_ret; - COMPQUIET(key, NULL); - dbp = dbc->dbp; + mpf = dbp->mpf; if (pgnop != NULL) *pgnop = PGNO_INVALID; cp = (QUEUE_CURSOR *)dbc->internal; + switch (flags) { + case DB_KEYFIRST: + case DB_KEYLAST: + if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0) + return (ret); + /* FALLTHROUGH */ + case DB_CURRENT: + break; + default: + /* The interface shouldn't let anything else through. */ + DB_ASSERT(0); + return (__db_ferr(dbp->dbenv, "__qam_c_put", flags)); + } + /* Write lock the record. */ if ((ret = __db_lget(dbc, 0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0) @@ -252,29 +264,14 @@ __qam_c_put(dbc, key, data, flags, pgnop) return (ret); } - if (exact && flags == DB_NOOVERWRITE) { - ret = __TLPUT(dbc, lock); - /* Doing record locking, release the page lock */ - if ((t_ret = __LPUT(dbc, cp->lock)) == 0) - cp->lock.off = LOCK_INVALID; - else - if (ret == 0) - ret = t_ret; - if ((t_ret = - __qam_fput(dbp, cp->pgno, cp->page, 0)) != 0 && ret == 0) - ret = t_ret; - cp->page = NULL; - return (ret == 0 ? DB_KEYEXIST : ret); - } - /* Put the item on the page. */ ret = __qam_pitem(dbc, (QPAGE *)cp->page, cp->indx, cp->recno, data); /* Doing record locking, release the page lock */ if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0) ret = t_ret; - if ((t_ret = - __qam_fput(dbp, cp->pgno, cp->page, DB_MPOOL_DIRTY)) && ret == 0) + if ((t_ret = __qam_fput( + dbp, cp->pgno, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0) ret = t_ret; cp->page = NULL; cp->lock = lock; @@ -284,11 +281,15 @@ __qam_c_put(dbc, key, data, flags, pgnop) /* We may need to reset the head or tail of the queue. */ pg = ((QUEUE *)dbp->q_internal)->q_meta; - if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) + + /* + * Get the meta page first, we don't want to write lock it while + * trying to pin it. + */ + if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0) return (ret); - if ((ret = memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) { - /* We did not fetch it, we can release the lock. */ - (void)__LPUT(dbc, lock); + if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) { + (void)mpf->put(mpf, meta, 0); return (ret); } @@ -313,7 +314,8 @@ __qam_c_put(dbc, key, data, flags, pgnop) } else { if (QAM_BEFORE_FIRST(meta, cp->recno) && (meta->first_recno <= meta->cur_recno || - meta->first_recno - cp->recno < cp->recno - meta->cur_recno)) { + meta->first_recno - cp->recno < + cp->recno - meta->cur_recno)) { new_first = cp->recno; opcode |= QAM_SETFIRST; } @@ -321,7 +323,8 @@ __qam_c_put(dbc, key, data, flags, pgnop) if (meta->cur_recno == cp->recno || (QAM_AFTER_CURRENT(meta, cp->recno) && (meta->first_recno <= meta->cur_recno || - cp->recno - meta->cur_recno <= meta->first_recno - cp->recno))) { + cp->recno - meta->cur_recno <= + meta->first_recno - cp->recno))) { new_cur = cp->recno + 1; if (new_cur == RECNO_OOB) new_cur++; @@ -329,10 +332,12 @@ __qam_c_put(dbc, key, data, flags, pgnop) } } - if (opcode != 0 && DB_LOGGING(dbc)) { - ret = __qam_mvptr_log(dbp->dbenv, dbc->txn, &meta->dbmeta.lsn, - 0, opcode, dbp->log_fileid, meta->first_recno, new_first, - meta->cur_recno, new_cur, &meta->dbmeta.lsn); + if (opcode != 0 && DBC_LOGGING(dbc)) { + ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn, + 0, opcode, meta->first_recno, new_first, + meta->cur_recno, new_cur, &meta->dbmeta.lsn, PGNO_BASE_MD); + if (ret != 0) + opcode = 0; } if (opcode & QAM_SETCUR) @@ -340,9 +345,8 @@ __qam_c_put(dbc, key, data, flags, pgnop) if (opcode & QAM_SETFIRST) meta->first_recno = new_first; - if ((t_ret = - memp_fput(dbp->mpf, meta, opcode != 0 ? DB_MPOOL_DIRTY : 0)) != 0 && - ret == 0) + if ((t_ret = mpf->put( + mpf, meta, opcode != 0 ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0) ret = t_ret; /* Don't hold the meta page long term. */ @@ -352,70 +356,42 @@ __qam_c_put(dbc, key, data, flags, pgnop) } /* - * __qam_put -- - * Add a record to the queue. - * If we are doing anything but appending, just call qam_c_put to do the - * work. Otherwise we fast path things here. + * __qam_append -- + * Perform a put(DB_APPEND) in queue. * - * PUBLIC: int __qam_put __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t)); + * PUBLIC: int __qam_append __P((DBC *, DBT *, DBT *)); */ int -__qam_put(dbp, txn, key, data, flags) - DB *dbp; - DB_TXN *txn; +__qam_append(dbc, key, data) + DBC *dbc; DBT *key, *data; - u_int32_t flags; { - QUEUE_CURSOR *cp; - DBC *dbc; + DB *dbp; DB_LOCK lock; + DB_MPOOLFILE *mpf; QMETA *meta; QPAGE *page; QUEUE *qp; + QUEUE_CURSOR *cp; db_pgno_t pg; db_recno_t recno; int ret, t_ret; - PANIC_CHECK(dbp->dbenv); - DB_CHECK_TXN(dbp, txn); - - /* Allocate a cursor. */ - if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0) - return (ret); - - DEBUG_LWRITE(dbc, dbc->txn, "qam_put", key, data, flags); - + dbp = dbc->dbp; + mpf = dbp->mpf; cp = (QUEUE_CURSOR *)dbc->internal; - /* Check for invalid flags. */ - if ((ret = __db_putchk(dbp, - key, data, flags, F_ISSET(dbp, DB_AM_RDONLY), 0)) != 0) - goto done; - - /* If not appending, then just call the cursor routine */ - if (flags != DB_APPEND) { - if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0) - goto done; - - ret = __qam_c_put(dbc, NULL, data, flags, NULL); - goto done; - } - - /* Write lock the meta page. */ pg = ((QUEUE *)dbp->q_internal)->q_meta; - if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) - goto done; - if ((ret = memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) { - /* We did not fetch it, we can release the lock. */ - (void)__LPUT(dbc, lock); - goto done; - } - - /* Record that we are going to allocate a record. */ - if (DB_LOGGING(dbc)) { - __qam_inc_log(dbp->dbenv, - dbc->txn, &meta->dbmeta.lsn, - 0, dbp->log_fileid, &meta->dbmeta.lsn); + /* + * Get the meta page first, we don't want to write lock it while + * trying to pin it. + */ + if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0) + return (ret); + /* Write lock the meta page. */ + if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) { + (void)mpf->put(mpf, meta, 0); + return (ret); } /* Get the next record number. */ @@ -436,15 +412,17 @@ __qam_put(dbp, txn, key, data, flags) meta->first_recno = recno; /* Lock the record and release meta page lock. */ - if ((ret = __db_lget(dbc, - 1, recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0) + if ((ret = __db_lget(dbc, LCK_COUPLE_ALWAYS, + recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0) { + (void)__LPUT(dbc, lock); goto err; + } /* * The application may modify the data based on the selected record * number. */ - if (flags == DB_APPEND && dbc->dbp->db_append_recno != NULL && + if (dbc->dbp->db_append_recno != NULL && (ret = dbc->dbp->db_append_recno(dbc->dbp, data, recno)) != 0) { (void)__LPUT(dbc, lock); goto err; @@ -484,16 +462,20 @@ __qam_put(dbp, txn, key, data, flags) /* Return the record number to the user. */ if (ret == 0) - ret = __db_retcopy(dbp, key, - &recno, sizeof(recno), &dbc->rkey.data, &dbc->rkey.ulen); + ret = __db_retcopy(dbp->dbenv, key, + &recno, sizeof(recno), &dbc->rkey->data, &dbc->rkey->ulen); + + /* Position the cursor on this record. */ + cp->recno = recno; /* See if we are leaving the extent. */ qp = (QUEUE *) dbp->q_internal; - if (qp->page_ext != 0 - && (recno % (qp->page_ext * qp->rec_page) == 0 - || recno == UINT32_T_MAX)) { - if ((ret = - __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) + if (qp->page_ext != 0 && + (recno % (qp->page_ext * qp->rec_page) == 0 || + recno == UINT32_T_MAX)) { + if ((ret = __db_lget(dbc, + 0, ((QUEUE *)dbp->q_internal)->q_meta, + DB_LOCK_WRITE, 0, &lock)) != 0) goto err; if (!QAM_AFTER_CURRENT(meta, recno)) ret = __qam_fclose(dbp, pg); @@ -502,13 +484,7 @@ __qam_put(dbp, txn, key, data, flags) err: /* Release the meta page. */ - if ((t_ret - = memp_fput(dbp->mpf, meta, DB_MPOOL_DIRTY)) != 0 && ret == 0) - ret = t_ret; - -done: - /* Discard the cursor. */ - if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0) + if ((t_ret = mpf->put(mpf, meta, DB_MPOOL_DIRTY)) != 0 && ret == 0) ret = t_ret; return (ret); @@ -522,50 +498,57 @@ static int __qam_c_del(dbc) DBC *dbc; { - QUEUE_CURSOR *cp; DB *dbp; DBT data; DB_LOCK lock; + DB_MPOOLFILE *mpf; PAGE *pagep; QAMDATA *qp; QMETA *meta; + QUEUE_CURSOR *cp; db_pgno_t pg; + db_recno_t first; int exact, ret, t_ret; dbp = dbc->dbp; + mpf = dbp->mpf; cp = (QUEUE_CURSOR *)dbc->internal; pg = ((QUEUE *)dbp->q_internal)->q_meta; - if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_READ, 0, &lock)) != 0) + /* + * Get the meta page first, we don't want to write lock it while + * trying to pin it. + */ + if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0) return (ret); - if ((ret = memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) { - /* We did not fetch it, we can release the lock. */ - (void)__LPUT(dbc, lock); + /* Write lock the meta page. */ + if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_READ, 0, &lock)) != 0) { + (void)mpf->put(mpf, meta, 0); return (ret); } if (QAM_NOT_VALID(meta, cp->recno)) ret = DB_NOTFOUND; + first = meta->first_recno; + /* Don't hold the meta page long term. */ if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0) ret = t_ret; - if ((t_ret = memp_fput(dbp->mpf, meta, 0)) != 0 && ret == 0) - ret = t_ret; if (ret != 0) - return (ret); + goto err1; if ((ret = __db_lget(dbc, 0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0) - return (ret); + goto err1; cp->lock_mode = DB_LOCK_WRITE; /* Find the record ; delete only deletes exact matches. */ if ((ret = __qam_position(dbc, &cp->recno, QAM_WRITE, &exact)) != 0) { cp->lock = lock; - return (ret); + goto err1; } if (!exact) { ret = DB_NOTFOUND; @@ -575,21 +558,18 @@ __qam_c_del(dbc) pagep = cp->page; qp = QAM_GET_RECORD(dbp, pagep, cp->indx); - if (DB_LOGGING(dbc)) { - if (((QUEUE *)dbp->q_internal)->page_ext == 0 - || ((QUEUE *)dbp->q_internal)->re_len == 0) { - if ((ret = - __qam_del_log(dbp->dbenv, - dbc->txn, &LSN(pagep), 0, - dbp->log_fileid, &LSN(pagep), + if (DBC_LOGGING(dbc)) { + if (((QUEUE *)dbp->q_internal)->page_ext == 0 || + ((QUEUE *)dbp->q_internal)->re_len == 0) { + if ((ret = __qam_del_log(dbp, + dbc->txn, &LSN(pagep), 0, &LSN(pagep), pagep->pgno, cp->indx, cp->recno)) != 0) goto err1; } else { data.size = ((QUEUE *)dbp->q_internal)->re_len; data.data = qp->data; - if ((ret = - __qam_delext_log(dbp->dbenv, dbc->txn, - &LSN(pagep), 0, dbp->log_fileid, &LSN(pagep), + if ((ret = __qam_delext_log(dbp, + dbc->txn, &LSN(pagep), 0, &LSN(pagep), pagep->pgno, cp->indx, cp->recno, &data)) != 0) goto err1; } @@ -597,60 +577,28 @@ __qam_c_del(dbc) F_CLR(qp, QAM_VALID); -err1: - if ((t_ret = __qam_fput( - dbp, cp->pgno, cp->page, ret == 0 ? DB_MPOOL_DIRTY : 0)) != 0) - return (ret ? ret : t_ret); - cp->page = NULL; - /* Doing record locking, release the page lock */ - if ((t_ret = __LPUT(dbc, cp->lock)) != 0) { - cp->lock = lock; - return (ret ? ret : t_ret); + if (cp->recno == first) { + pg = ((QUEUE *)dbp->q_internal)->q_meta; + if ((ret = + __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) + goto err1; + ret = __qam_consume(dbc, meta, first); + if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0) + ret = t_ret; } - cp->lock = lock; - return (ret); -} -/* - * __qam_delete -- - * Queue db->del function. - * - * PUBLIC: int __qam_delete __P((DB *, DB_TXN *, DBT *, u_int32_t)); - */ -int -__qam_delete(dbp, txn, key, flags) - DB *dbp; - DB_TXN *txn; - DBT *key; - u_int32_t flags; -{ - QUEUE_CURSOR *cp; - DBC *dbc; - int ret, t_ret; - - PANIC_CHECK(dbp->dbenv); - DB_CHECK_TXN(dbp, txn); - - /* Check for invalid flags. */ - if ((ret = - __db_delchk(dbp, key, flags, F_ISSET(dbp, DB_AM_RDONLY))) != 0) - return (ret); - - /* Acquire a cursor. */ - if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0) - return (ret); - - DEBUG_LWRITE(dbc, txn, "qam_delete", key, NULL, flags); - - cp = (QUEUE_CURSOR *)dbc->internal; - if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0) - goto err; - - ret = __qam_c_del(dbc); +err1: + if ((t_ret = mpf->put(mpf, meta, 0)) != 0 && ret == 0) + ret = t_ret; + if (cp->page != NULL && (t_ret = __qam_fput(dbp, cp->pgno, + cp->page, ret == 0 ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0) + ret = t_ret; + cp->page = NULL; - /* Release the cursor. */ -err: if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0) + /* Doing record locking, release the page lock */ + if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0) ret = t_ret; + cp->lock = lock; return (ret); } @@ -671,39 +619,40 @@ __qam_c_get(dbc, key, data, flags, pgnop) db_pgno_t *pgnop; { DB *dbp; - DB_LOCK lock, pglock, metalock, save_lock; + DBC *dbcdup; DBT tmp; + DB_ENV *dbenv; + DB_LOCK lock, pglock, metalock; + DB_MPOOLFILE *mpf; PAGE *pg; QAMDATA *qp; QMETA *meta; QUEUE *t; QUEUE_CURSOR *cp; - db_indx_t save_indx; db_lockmode_t lock_mode; - db_pgno_t metapno, save_page; - db_recno_t current, first, save_recno; + db_pgno_t metapno; + db_recno_t first; qam_position_mode mode; - u_int32_t rec_extent; int exact, is_first, locked, ret, t_ret, wait, with_delete; - int put_mode, meta_dirty, retrying, skip_again, wrapped; + int put_mode, meta_dirty, retrying; - cp = (QUEUE_CURSOR *)dbc->internal; dbp = dbc->dbp; + dbenv = dbp->dbenv; + mpf = dbp->mpf; + cp = (QUEUE_CURSOR *)dbc->internal; - PANIC_CHECK(dbp->dbenv); + PANIC_CHECK(dbenv); wait = 0; with_delete = 0; retrying = 0; - rec_extent = 0; lock_mode = DB_LOCK_READ; - mode = QAM_READ; put_mode = 0; t_ret = 0; *pgnop = 0; pg = NULL; - skip_again = 0; + mode = QAM_READ; if (F_ISSET(dbc, DBC_RMW)) { lock_mode = DB_LOCK_WRITE; mode = QAM_WRITE; @@ -714,7 +663,9 @@ __qam_c_get(dbc, key, data, flags, pgnop) flags = DB_CONSUME; } if (flags == DB_CONSUME) { - DB_CHECK_TXN(dbp, dbc->txn); + if ((ret = __db_check_txn(dbp, dbc->txn, dbc->locker, 0)) != 0) + return (ret); + with_delete = 1; flags = DB_FIRST; lock_mode = DB_LOCK_WRITE; @@ -724,30 +675,30 @@ __qam_c_get(dbc, key, data, flags, pgnop) DEBUG_LREAD(dbc, dbc->txn, "qam_c_get", flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags); + /* Make lint and friends happy. */ + meta_dirty = 0; + locked = 0; + is_first = 0; t = (QUEUE *)dbp->q_internal; - /* get the meta page */ metapno = t->q_meta; - if ((ret = __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0) + + /* + * Get the meta page first, we don't want to write lock it while + * trying to pin it. This is because someone my have it pinned + * but not locked. + */ + if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0) return (ret); + if ((ret = __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0) + goto err; locked = 1; - if ((ret = memp_fget(dbp->mpf, &metapno, 0, &meta)) != 0) { - /* We did not fetch it, we can release the lock. */ - (void)__LPUT(dbc, metalock); - return (ret); - } first = 0; - /* Make lint and friends happy. */ - meta_dirty = 0; - /* Release any previous lock if not in a transaction. */ - if (cp->lock.off != LOCK_INVALID) { - (void)__TLPUT(dbc, cp->lock); - cp->lock.off = LOCK_INVALID; - } + (void)__TLPUT(dbc, cp->lock); retry: /* Update the record number. */ switch (flags) { @@ -778,8 +729,8 @@ retry: /* Update the record number. */ case DB_PREV: case DB_PREV_NODUP: if (cp->recno != RECNO_OOB) { - if (QAM_BEFORE_FIRST(meta, cp->recno) - || cp->recno == meta->first_recno) { + if (QAM_BEFORE_FIRST(meta, cp->recno) || + cp->recno == meta->first_recno) { ret = DB_NOTFOUND; goto err; } @@ -799,14 +750,15 @@ retry: /* Update the record number. */ if (cp->recno == RECNO_OOB) cp->recno--; break; - case DB_GET_BOTH: case DB_SET: case DB_SET_RANGE: + case DB_GET_BOTH: + case DB_GET_BOTH_RANGE: if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0) goto err; break; default: - ret = __db_unknown_flag(dbp->dbenv, "__qam_c_get", flags); + ret = __db_unknown_flag(dbenv, "__qam_c_get", flags); goto err; } @@ -830,14 +782,16 @@ retry: /* Update the record number. */ retrying = 1; goto retry; } - if (CDB_LOCKING(dbp->dbenv)) { - if ((ret = lock_get(dbp->dbenv, dbc->locker, + if (CDB_LOCKING(dbenv)) { + if ((ret = dbenv->lock_get( + dbenv, dbc->locker, DB_LOCK_SWITCH, &dbc->lock_dbt, DB_LOCK_WAIT, &dbc->mylock)) != 0) goto err; - if ((ret = lock_get(dbp->dbenv, dbc->locker, - DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE, - &dbc->mylock)) != 0) + if ((ret = dbenv->lock_get( + dbenv, dbc->locker, + DB_LOCK_UPGRADE, &dbc->lock_dbt, + DB_LOCK_WRITE, &dbc->mylock)) != 0) goto err; goto retry; } @@ -859,7 +813,7 @@ retry: /* Update the record number. */ if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_WAIT, DB_LOCK_SWITCH, &metalock)) != 0) goto err; - if ((ret = lock_get(dbp->dbenv, dbc->locker, + if ((ret = dbenv->lock_get(dbenv, dbc->locker, DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE, &metalock)) != 0) goto err; @@ -883,11 +837,15 @@ retry: /* Update the record number. */ DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD, &lock)) == DB_LOCK_NOTGRANTED && with_delete) { #ifdef QDEBUG - __db_logmsg(dbp->dbenv, + __db_logmsg(dbenv, dbc->txn, "Queue S", 0, "%x %d %d %d", dbc->locker, cp->recno, first, meta->first_recno); #endif first = 0; + if ((ret = + __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0) + goto err; + locked = 1; goto retry; } @@ -929,9 +887,9 @@ retry: /* Update the record number. */ cp->lock_mode = lock_mode; if (!exact) { - if (flags == DB_NEXT || flags == DB_NEXT_NODUP - || flags == DB_PREV || flags == DB_PREV_NODUP - || flags == DB_LAST) { + if (flags == DB_NEXT || flags == DB_NEXT_NODUP || + flags == DB_PREV || flags == DB_PREV_NODUP || + flags == DB_LAST) { /* Release locks and try again. */ if (pg != NULL) (void)__qam_fput(dbp, cp->pgno, pg, 0); @@ -951,18 +909,20 @@ retry: /* Update the record number. */ } /* Return the key if the user didn't give us one. */ - if (key != NULL && flags != DB_SET && flags != DB_GET_BOTH && - (ret = __db_retcopy(dbp, key, &cp->recno, sizeof(cp->recno), - &dbc->rkey.data, &dbc->rkey.ulen)) != 0) - goto err1; - - if (key != NULL) + if (key != NULL) { + if (flags != DB_GET_BOTH && flags != DB_GET_BOTH_RANGE && + flags != DB_SET && flags != DB_SET_RANGE && + (ret = __db_retcopy(dbp->dbenv, + key, &cp->recno, sizeof(cp->recno), + &dbc->rkey->data, &dbc->rkey->ulen)) != 0) + goto err1; F_SET(key, DB_DBT_ISSET); + } qp = QAM_GET_RECORD(dbp, pg, cp->indx); /* Return the data item. */ - if (flags == DB_GET_BOTH) { + if (flags == DB_GET_BOTH || flags == DB_GET_BOTH_RANGE) { /* * Need to compare */ @@ -973,8 +933,10 @@ retry: /* Update the record number. */ goto err1; } } - if (data != NULL && (ret = __db_retcopy(dbp, data, - qp->data, t->re_len, &dbc->rdata.data, &dbc->rdata.ulen)) != 0) + if (data != NULL && + !F_ISSET(dbc, DBC_MULTIPLE|DBC_MULTIPLE_KEY) && + (ret = __db_retcopy(dbp->dbenv, data, + qp->data, t->re_len, &dbc->rdata->data, &dbc->rdata->ulen)) != 0) goto err1; if (data != NULL) @@ -982,18 +944,53 @@ retry: /* Update the record number. */ /* Finally, if we are doing DB_CONSUME mark the record. */ if (with_delete) { - if (DB_LOGGING(dbc)) { + /* + * Assert that we're not a secondary index. Doing a DB_CONSUME + * on a secondary makes very little sense, since one can't + * DB_APPEND there; attempting one should be forbidden by + * the interface. + */ + DB_ASSERT(!F_ISSET(dbp, DB_AM_SECONDARY)); + + /* + * Check and see if we *have* any secondary indices. + * If we do, we're a primary, so call __db_c_del_primary + * to delete the references to the item we're about to + * delete. + * + * Note that we work on a duplicated cursor, since the + * __db_ret work has already been done, so it's not safe + * to perform any additional ops on this cursor. + */ + if (LIST_FIRST(&dbp->s_secondaries) != NULL) { + if ((ret = __db_c_idup(dbc, + &dbcdup, DB_POSITIONI)) != 0) + goto err1; + + if ((ret = __db_c_del_primary(dbcdup)) != 0) { + /* + * The __db_c_del_primary return is more + * interesting. + */ + (void)dbcdup->c_close(dbcdup); + goto err1; + } + + if ((ret = dbcdup->c_close(dbcdup)) != 0) + goto err1; + } + + if (DBC_LOGGING(dbc)) { if (t->page_ext == 0 || t->re_len == 0) { - if ((ret = __qam_del_log(dbp->dbenv, dbc->txn, - &LSN(pg), 0, dbp->log_fileid, &LSN(pg), + if ((ret = __qam_del_log(dbp, dbc->txn, + &LSN(pg), 0, &LSN(pg), pg->pgno, cp->indx, cp->recno)) != 0) goto err1; } else { tmp.data = qp->data; tmp.size = t->re_len; - if ((ret = - __qam_delext_log(dbp->dbenv, dbc->txn, - &LSN(pg), 0, dbp->log_fileid, &LSN(pg), + if ((ret = __qam_delext_log(dbp, + dbc->txn, &LSN(pg), 0, &LSN(pg), pg->pgno, cp->indx, cp->recno, &tmp)) != 0) goto err1; } @@ -1003,7 +1000,7 @@ retry: /* Update the record number. */ put_mode = DB_MPOOL_DIRTY; if ((ret = __LPUT(dbc, pglock)) != 0) - goto err; + goto err1; /* * Now we need to update the metapage @@ -1021,8 +1018,9 @@ retry: /* Update the record number. */ dbc, 0, metapno, lock_mode, 0, &metalock)) != 0) goto err1; locked = 1; + #ifdef QDEBUG - __db_logmsg(dbp->dbenv, + __db_logmsg(dbenv, dbc->txn, "Queue D", 0, "%x %d %d %d", dbc->locker, cp->recno, first, meta->first_recno); #endif @@ -1037,190 +1035,380 @@ retry: /* Update the record number. */ if (first != meta->first_recno) goto done; - save_page = cp->pgno; - save_indx = cp->indx; - save_recno = cp->recno; - save_lock = cp->lock; + if ((ret = __qam_consume(dbc, meta, first)) != 0) + goto err1; + } - /* - * If we skipped some deleted records, we need to - * reposition on the first one. Get a lock - * in case someone is trying to put it back. - */ - if (first != cp->recno) { - ret = __db_lget(dbc, 0, first, DB_LOCK_READ, - DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock); - if (ret == DB_LOCK_NOTGRANTED) { - ret = 0; - goto done; - } - if (ret != 0) - goto err1; - if ((ret = - __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0) - goto err1; - cp->page = NULL; - put_mode = 0; - if ((ret = __qam_position(dbc, - &first, QAM_READ, &exact)) != 0 || exact != 0) { - (void)__LPUT(dbc, lock); - goto err1; - } - if ((ret =__LPUT(dbc, lock)) != 0) - goto err1; - if ((ret = __LPUT(dbc, cp->lock)) != 0) - goto err1; +done: +err1: if (cp->page != NULL) { + t_ret = __qam_fput(dbp, cp->pgno, cp->page, put_mode); + + if (!ret) + ret = t_ret; + /* Doing record locking, release the page lock */ + t_ret = __LPUT(dbc, pglock); + cp->page = NULL; + } + +err: if (!ret) + ret = t_ret; + if (meta) { + + /* release the meta page */ + t_ret = mpf->put(mpf, meta, meta_dirty ? DB_MPOOL_DIRTY : 0); + + if (!ret) + ret = t_ret; + + /* Don't hold the meta page long term. */ + if (locked) + t_ret = __LPUT(dbc, metalock); + } + DB_ASSERT(!LOCK_ISSET(metalock)); + + /* + * There is no need to keep the record locked if we are + * not in a transaction. + */ + if (t_ret == 0) + t_ret = __TLPUT(dbc, cp->lock); + + return (ret ? ret : t_ret); +} + +/* + * __qam_consume -- try to reset the head of the queue. + * + */ + +static int +__qam_consume(dbc, meta, first) + DBC *dbc; + QMETA *meta; + db_recno_t first; +{ + DB *dbp; + DB_LOCK lock, save_lock; + DB_MPOOLFILE *mpf; + QUEUE_CURSOR *cp; + db_indx_t save_indx; + db_pgno_t save_page; + db_recno_t current, save_recno; + u_int32_t rec_extent; + int exact, put_mode, ret, t_ret, wrapped; + + dbp = dbc->dbp; + mpf = dbp->mpf; + cp = (QUEUE_CURSOR *)dbc->internal; + put_mode = DB_MPOOL_DIRTY; + ret = t_ret = 0; + + save_page = cp->pgno; + save_indx = cp->indx; + save_recno = cp->recno; + save_lock = cp->lock; + + /* + * If we skipped some deleted records, we need to + * reposition on the first one. Get a lock + * in case someone is trying to put it back. + */ + if (first != cp->recno) { + ret = __db_lget(dbc, 0, first, DB_LOCK_READ, + DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock); + if (ret == DB_LOCK_NOTGRANTED) { + ret = 0; + goto done; } + if (ret != 0) + goto done; + if ((ret = + __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0) + goto done; + cp->page = NULL; + put_mode = 0; + if ((ret = __qam_position(dbc, + &first, QAM_READ, &exact)) != 0 || exact != 0) { + (void)__LPUT(dbc, lock); + goto done; + } + if ((ret =__LPUT(dbc, lock)) != 0) + goto done; + if ((ret = __LPUT(dbc, cp->lock)) != 0) + goto done; + } - current = meta->cur_recno; - wrapped = 0; - if (first > current) - wrapped = 1; - rec_extent = meta->page_ext * meta->rec_page; + current = meta->cur_recno; + wrapped = 0; + if (first > current) + wrapped = 1; + rec_extent = meta->page_ext * meta->rec_page; - /* Loop until we find a record or hit current */ - for (;;) { - /* - * Check to see if we are moving off the extent - * and remove the extent. - * If we are moving off a page we need to - * get rid of the buffer. - * Wait for the lagging readers to move off the - * page. - */ - if (rec_extent != 0 - && ((exact = first % rec_extent == 0) - || first % meta->rec_page == 0 - || first == UINT32_T_MAX)) { - if (exact == 1 && (ret = __db_lget(dbc, - 0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0) - break; + /* Loop until we find a record or hit current */ + for (;;) { + /* + * Check to see if we are moving off the extent + * and remove the extent. + * If we are moving off a page we need to + * get rid of the buffer. + * Wait for the lagging readers to move off the + * page. + */ + if (cp->page != NULL && rec_extent != 0 && + ((exact = (first % rec_extent == 0)) || + first % meta->rec_page == 0 || + first == UINT32_T_MAX)) { + if (exact == 1 && (ret = __db_lget(dbc, + 0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0) + break; #ifdef QDEBUG - __db_logmsg(dbp->dbenv, - dbc->txn, "Queue R", 0, "%x %d %d %d", - dbc->locker, cp->pgno, first, meta->first_recno); + __db_logmsg(dbp->dbenv, + dbc->txn, "Queue R", 0, "%x %d %d %d", + dbc->locker, cp->pgno, first, meta->first_recno); #endif - put_mode |= DB_MPOOL_DISCARD; - if ((ret = __qam_fput(dbp, - cp->pgno, cp->page, put_mode)) != 0) - break; - cp->page = NULL; - - if (exact == 1) { - ret = __qam_fremove(dbp, cp->pgno); - t_ret = __LPUT(dbc, cp->lock); - } - if (ret != 0) - break; - if (t_ret != 0) { - ret = t_ret; - break; - } - } else if ((ret = - __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0) + put_mode |= DB_MPOOL_DISCARD; + if ((ret = __qam_fput(dbp, + cp->pgno, cp->page, put_mode)) != 0) break; cp->page = NULL; - first++; - if (first == RECNO_OOB) { - wrapped = 0; - first++; - } - - /* - * LOOP EXIT when we come move to the current - * pointer. - */ - if (!wrapped && first >= current) - break; - ret = __db_lget(dbc, 0, first, DB_LOCK_READ, - DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock); - if (ret == DB_LOCK_NOTGRANTED) { - ret = 0; - break; + if (exact == 1) { + ret = __qam_fremove(dbp, cp->pgno); + t_ret = __LPUT(dbc, cp->lock); } if (ret != 0) break; - - if ((ret = __qam_position(dbc, - &first, QAM_READ, &exact)) != 0) { - (void)__LPUT(dbc, lock); - break; - } - put_mode = 0; - if ((ret =__LPUT(dbc, lock)) != 0 - || (ret = __LPUT(dbc, cp->lock)) != 0 ||exact) { - if ((t_ret = __qam_fput(dbp, cp->pgno, - cp->page, put_mode)) != 0 && ret == 0) - ret = t_ret; - cp->page = NULL; + if (t_ret != 0) { + ret = t_ret; break; } + } else if (cp->page != NULL && (ret = + __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0) + break; + cp->page = NULL; + first++; + if (first == RECNO_OOB) { + wrapped = 0; + first++; } - cp->pgno = save_page; - cp->indx = save_indx; - cp->recno = save_recno; - cp->lock = save_lock; - /* - * We have advanced as far as we can. - * Advance first_recno to this point. + * LOOP EXIT when we come move to the current + * pointer. */ - if (meta->first_recno != first) { + if (!wrapped && first >= current) + break; + + ret = __db_lget(dbc, 0, first, DB_LOCK_READ, + DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock); + if (ret == DB_LOCK_NOTGRANTED) { + ret = 0; + break; + } + if (ret != 0) + break; + + if ((ret = __qam_position(dbc, + &first, QAM_READ, &exact)) != 0) { + (void)__LPUT(dbc, lock); + break; + } + put_mode = 0; + if ((ret =__LPUT(dbc, lock)) != 0 || + (ret = __LPUT(dbc, cp->lock)) != 0 || exact) { + if ((t_ret = __qam_fput(dbp, cp->pgno, + cp->page, put_mode)) != 0 && ret == 0) + ret = t_ret; + cp->page = NULL; + break; + } + } + + cp->pgno = save_page; + cp->indx = save_indx; + cp->recno = save_recno; + cp->lock = save_lock; + + /* + * We have advanced as far as we can. + * Advance first_recno to this point. + */ + if (ret == 0 && meta->first_recno != first) { #ifdef QDEBUG __db_logmsg(dbp->dbenv, dbc->txn, "Queue M", 0, "%x %d %d %d", dbc->locker, cp->recno, first, meta->first_recno); #endif - if (DB_LOGGING(dbc)) - if ((ret = - __qam_incfirst_log(dbp->dbenv, - dbc->txn, &meta->dbmeta.lsn, 0, - dbp->log_fileid, cp->recno)) != 0) - goto err; - meta->first_recno = first; - meta_dirty = 1; - } + if (DBC_LOGGING(dbc)) + if ((ret = __qam_incfirst_log(dbp, + dbc->txn, &meta->dbmeta.lsn, 0, + cp->recno, PGNO_BASE_MD)) != 0) + goto done; + meta->first_recno = first; + (void)mpf->set(mpf, meta, DB_MPOOL_DIRTY); } done: -err1: if (cp->page != NULL) { - t_ret = __qam_fput(dbp, cp->pgno, cp->page, put_mode); + return (ret); +} - if (!ret) +static int +__qam_bulk(dbc, data, flags) + DBC *dbc; + DBT *data; + u_int32_t flags; +{ + DB *dbp; + DB_LOCK metalock; + DB_MPOOLFILE *mpf; + PAGE *pg; + QMETA *meta; + QAMDATA *qp; + QUEUE_CURSOR *cp; + db_indx_t indx; + db_pgno_t metapno; + qam_position_mode mode; + int32_t *endp, *offp; + u_int8_t *dbuf, *dp, *np; + int exact, recs, re_len, ret, t_ret, valid; + int is_key, need_pg, pagesize, size, space; + + dbp = dbc->dbp; + mpf = dbp->mpf; + cp = (QUEUE_CURSOR *)dbc->internal; + + mode = QAM_READ; + if (F_ISSET(dbc, DBC_RMW)) + mode = QAM_WRITE; + + pagesize = dbp->pgsize; + re_len = ((QUEUE *)dbp->q_internal)->re_len; + recs = ((QUEUE *)dbp->q_internal)->rec_page; + metapno = ((QUEUE *)dbp->q_internal)->q_meta; + + is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0; + size = 0; + + if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_READ, 0, &metalock)) != 0) + return (ret); + if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0) { + /* We did not fetch it, we can release the lock. */ + (void)__LPUT(dbc, metalock); + return (ret); + } + + dbuf = data->data; + np = dp = dbuf; + + /* Keep track of space that is left. There is an termination entry */ + space = data->ulen; + space -= sizeof(*offp); + + /* Build the offset/size table form the end up. */ + endp = (int32_t *) ((u_int8_t *)dbuf + data->ulen); + endp--; + offp = endp; + +next_pg: + if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0) + goto done; + + pg = cp->page; + indx = cp->indx; + need_pg = 1; + + do { + /* + * If this page is a nonexistent page at the end of an + * extent, pg may be NULL. A NULL page has no valid records, + * so just keep looping as though qp exists and isn't QAM_VALID; + * calling QAM_GET_RECORD is unsafe. + */ + valid = 0; + + /* Wrap around, skipping zero. */ + if (cp->recno == RECNO_OOB) + cp->recno++; + if (pg != NULL) { + qp = QAM_GET_RECORD(dbp, pg, indx); + if (F_ISSET(qp, QAM_VALID)) { + valid = 1; + space -= (is_key ? 3 : 2) * sizeof(*offp); + if (space < 0) + goto get_space; + if (need_pg) { + dp = np; + size = pagesize - QPAGE_SZ(dbp); + if (space < size) { +get_space: + if (offp == endp) { + data->size = + ALIGN(size + + pagesize, + sizeof(u_int32_t)); + ret = ENOMEM; + break; + } + if (indx != 0) + indx--; + cp->recno--; + break; + } + memcpy(dp, + (char *)pg + QPAGE_SZ(dbp), size); + need_pg = 0; + space -= size; + np += size; + } + if (is_key) + *offp-- = cp->recno; + *offp-- = (int32_t)((u_int8_t*)qp - + (u_int8_t*)pg - QPAGE_SZ(dbp) + + dp - dbuf + SSZA(QAMDATA, data)); + *offp-- = re_len; + } + } + if (!valid && is_key == 0) { + *offp-- = 0; + *offp-- = 0; + } + cp->recno++; + } while (++indx < recs && indx != RECNO_OOB + && cp->recno != meta->cur_recno + && !QAM_AFTER_CURRENT(meta, cp->recno)); + + if ((t_ret = __TLPUT(dbc, cp->lock)) != 0 && ret == 0) + ret = t_ret; + + if (cp->page != NULL) { + if ((t_ret = + __qam_fput(dbp, cp->pgno, cp->page, 0)) != 0 && ret == 0) ret = t_ret; - /* Doing record locking, release the page lock */ - t_ret = __LPUT(dbc, pglock); cp->page = NULL; } -err: if (!ret) - ret = t_ret; - if (meta) { + if (ret == 0 + && (indx >= recs || indx == RECNO_OOB) + && cp->recno != meta->cur_recno + && !QAM_AFTER_CURRENT(meta, cp->recno)) + goto next_pg; - /* release the meta page */ - t_ret = memp_fput( - dbp->mpf, meta, meta_dirty ? DB_MPOOL_DIRTY : 0); + if (is_key == 1) + *offp = RECNO_OOB; + else + *offp = -1; - if (!ret) - ret = t_ret; +done: + /* release the meta page */ + t_ret = mpf->put(mpf, meta, 0); - /* Don't hold the meta page long term. */ - if (locked) - t_ret = __LPUT(dbc, metalock); - } - DB_ASSERT(metalock.off == LOCK_INVALID); + if (!ret) + ret = t_ret; - /* - * There is no need to keep the record locked if we are - * not in a transaction. - */ - if (t_ret == 0) - t_ret = __TLPUT(dbc, cp->lock); + t_ret = __LPUT(dbc, metalock); - return (ret ? ret : t_ret); + return (ret); } /* @@ -1241,15 +1429,12 @@ __qam_c_close(dbc, root_pgno, rmroot) cp = (QUEUE_CURSOR *)dbc->internal; /* Discard any locks not acquired inside of a transaction. */ - if (cp->lock.off != LOCK_INVALID) { - (void)__TLPUT(dbc, cp->lock); - cp->lock.off = LOCK_INVALID; - } + (void)__TLPUT(dbc, cp->lock); + LOCK_INIT(cp->lock); cp->page = NULL; cp->pgno = PGNO_INVALID; cp->indx = 0; - cp->lock.off = LOCK_INVALID; cp->lock_mode = DB_LOCK_NG; cp->recno = RECNO_OOB; cp->flags = 0; @@ -1277,7 +1462,7 @@ __qam_c_dup(orig_dbc, new_dbc) /* reget the long term lock if we are not in a xact */ if (orig_dbc->txn != NULL || - !STD_LOCKING(orig_dbc) || orig->lock.off == LOCK_INVALID) + !STD_LOCKING(orig_dbc) || !LOCK_ISSET(orig->lock)) return (0); return (__db_lget(new_dbc, @@ -1313,8 +1498,10 @@ __qam_c_init(dbc) dbc->c_count = __db_c_count; dbc->c_del = __db_c_del; dbc->c_dup = __db_c_dup; - dbc->c_get = __db_c_get; + dbc->c_get = dbc->c_real_get = __db_c_get; + dbc->c_pget = __db_c_pget; dbc->c_put = __db_c_put; + dbc->c_am_bulk = __qam_bulk; dbc->c_am_close = __qam_c_close; dbc->c_am_del = __qam_c_del; dbc->c_am_destroy = __qam_c_destroy; @@ -1334,7 +1521,7 @@ __qam_c_destroy(dbc) DBC *dbc; { /* Discard the structures. */ - __os_free(dbc->internal, sizeof(QUEUE_CURSOR)); + __os_free(dbc->dbp->dbenv, dbc->internal); return (0); } @@ -1355,3 +1542,74 @@ __qam_getno(dbp, key, rep) } return (0); } + +/* + * __qam_truncate -- + * Truncate a queue database + * + * PUBLIC: int __qam_truncate __P((DB *, DB_TXN *, u_int32_t *)); + */ +int +__qam_truncate(dbp, txn, countp) + DB *dbp; + DB_TXN *txn; + u_int32_t *countp; +{ + DBC *dbc; + DB_LOCK metalock; + DB_MPOOLFILE *mpf; + QMETA *meta; + db_pgno_t metapno; + int count, ret, t_ret; + + mpf = dbp->mpf; + + /* Acquire a cursor. */ + if ((ret = dbp->cursor(dbp, txn, &dbc, 0)) != 0) + return (ret); + + /* Walk the queue, counting rows. */ + count = 0; + while ((ret = __qam_c_get(dbc, NULL, NULL, DB_CONSUME, &metapno)) == 0) + count++; + + if (ret == DB_NOTFOUND) + ret = 0; + + /* Discard the cursor. */ + if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0) + ret = t_ret; + + if (ret != 0) + return (ret); + + /* update the meta page */ + /* get the meta page */ + metapno = ((QUEUE *)dbp->q_internal)->q_meta; + if ((ret = + __db_lget(dbc, 0, metapno, DB_LOCK_WRITE, 0, &metalock)) != 0) + return (ret); + + if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0) { + /* We did not fetch it, we can release the lock. */ + (void)__LPUT(dbc, metalock); + return (ret); + } + if (DBC_LOGGING(dbc)) { + ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn, 0, + QAM_SETCUR | QAM_SETFIRST | QAM_TRUNCATE, meta->first_recno, + 1, meta->cur_recno, 1, &meta->dbmeta.lsn, PGNO_BASE_MD); + } + if (ret == 0) + meta->first_recno = meta->cur_recno = 1; + + if ((t_ret = + mpf->put(mpf, meta, ret == 0 ? DB_MPOOL_DIRTY: 0)) != 0 && ret == 0) + ret = t_ret; + if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0) + ret = t_ret; + + *countp = count; + + return (ret); +} |