diff options
Diffstat (limited to 'bdb/db/db_join.c')
-rw-r--r-- | bdb/db/db_join.c | 250 |
1 files changed, 171 insertions, 79 deletions
diff --git a/bdb/db/db_join.c b/bdb/db/db_join.c index 881dedde0fc..6281b1a8383 100644 --- a/bdb/db/db_join.c +++ b/bdb/db/db_join.c @@ -1,14 +1,14 @@ -/*- +/* * See the file LICENSE for redistribution information. * - * Copyright (c) 1998, 1999, 2000 + * Copyright (c) 1998-2002 * Sleepycat Software. All rights reserved. */ #include "db_config.h" #ifndef lint -static const char revid[] = "$Id: db_join.c,v 11.31 2000/12/20 22:41:54 krinsky Exp $"; +static const char revid[] = "$Id: db_join.c,v 11.55 2002/08/08 03:57:47 bostic Exp $"; #endif /* not lint */ #ifndef NO_SYSTEM_INCLUDES @@ -19,16 +19,17 @@ static const char revid[] = "$Id: db_join.c,v 11.31 2000/12/20 22:41:54 krinsky #endif #include "db_int.h" -#include "db_page.h" -#include "db_join.h" -#include "db_am.h" -#include "btree.h" +#include "dbinc/db_page.h" +#include "dbinc/db_join.h" +#include "dbinc/btree.h" static int __db_join_close __P((DBC *)); static int __db_join_cmp __P((const void *, const void *)); static int __db_join_del __P((DBC *, u_int32_t)); static int __db_join_get __P((DBC *, DBT *, DBT *, u_int32_t)); -static int __db_join_getnext __P((DBC *, DBT *, DBT *, u_int32_t)); +static int __db_join_getnext __P((DBC *, DBT *, DBT *, u_int32_t, u_int32_t)); +static int __db_join_primget __P((DB *, + DB_TXN *, u_int32_t, DBT *, DBT *, u_int32_t)); static int __db_join_put __P((DBC *, DBT *, DBT *, u_int32_t)); /* @@ -84,7 +85,8 @@ __db_join(primary, curslist, dbcp, flags) DBC *dbc; JOIN_CURSOR *jc; int ret; - u_int32_t i, ncurs, nslots; + u_int32_t i; + size_t ncurs, nslots; COMPQUIET(nslots, 0); @@ -104,11 +106,13 @@ __db_join(primary, curslist, dbcp, flags) 1, sizeof(JOIN_CURSOR), &jc)) != 0) goto err; - if ((ret = __os_malloc(dbenv, 256, NULL, &jc->j_key.data)) != 0) + if ((ret = __os_malloc(dbenv, 256, &jc->j_key.data)) != 0) goto err; jc->j_key.ulen = 256; F_SET(&jc->j_key, DB_DBT_USERMEM); + F_SET(&jc->j_rdata, DB_DBT_REALLOC); + for (jc->j_curslist = curslist; *jc->j_curslist != NULL; jc->j_curslist++) ; @@ -184,7 +188,7 @@ __db_join(primary, curslist, dbcp, flags) jc->j_fdupcurs[i] = NULL; jc->j_exhausted[i] = 0; } - jc->j_ncurs = ncurs; + jc->j_ncurs = (u_int32_t)ncurs; /* * If DB_JOIN_NOSORT is not set, optimize secondary cursors by @@ -226,20 +230,20 @@ __db_join(primary, curslist, dbcp, flags) err: if (jc != NULL) { if (jc->j_curslist != NULL) - __os_free(jc->j_curslist, nslots * sizeof(DBC *)); + __os_free(dbenv, jc->j_curslist); if (jc->j_workcurs != NULL) { if (jc->j_workcurs[0] != NULL) - __os_free(jc->j_workcurs[0], sizeof(DBC)); - __os_free(jc->j_workcurs, nslots * sizeof(DBC *)); + __os_free(dbenv, jc->j_workcurs[0]); + __os_free(dbenv, jc->j_workcurs); } if (jc->j_fdupcurs != NULL) - __os_free(jc->j_fdupcurs, nslots * sizeof(DBC *)); + __os_free(dbenv, jc->j_fdupcurs); if (jc->j_exhausted != NULL) - __os_free(jc->j_exhausted, nslots * sizeof(u_int8_t)); - __os_free(jc, sizeof(JOIN_CURSOR)); + __os_free(dbenv, jc->j_exhausted); + __os_free(dbenv, jc); } if (dbc != NULL) - __os_free(dbc, sizeof(DBC)); + __os_free(dbenv, dbc); return (ret); } @@ -279,8 +283,8 @@ __db_join_get(dbc, key_arg, data_arg, flags) DB *dbp; DBC *cp; JOIN_CURSOR *jc; - int ret; - u_int32_t i, j, operation; + int db_manage_data, ret; + u_int32_t i, j, operation, opmods; dbp = dbc->dbp; jc = (JOIN_CURSOR *)dbc->internal; @@ -289,6 +293,12 @@ __db_join_get(dbc, key_arg, data_arg, flags) operation = LF_ISSET(DB_OPFLAGS_MASK); + /* !!! + * If the set of flags here changes, check that __db_join_primget + * is updated to handle them properly. + */ + opmods = LF_ISSET(DB_RMW | DB_DIRTY_READ); + if ((ret = __db_joingetchk(dbp, key_arg, flags)) != 0) return (ret); @@ -319,13 +329,14 @@ __db_join_get(dbc, key_arg, data_arg, flags) goto samekey; F_CLR(jc, JOIN_RETRY); -retry: ret = jc->j_workcurs[0]->c_get(jc->j_workcurs[0], - &jc->j_key, key_n, jc->j_exhausted[0] ? DB_NEXT_DUP : DB_CURRENT); +retry: ret = jc->j_workcurs[0]->c_real_get(jc->j_workcurs[0], + &jc->j_key, key_n, + opmods | (jc->j_exhausted[0] ? DB_NEXT_DUP : DB_CURRENT)); if (ret == ENOMEM) { jc->j_key.ulen <<= 1; if ((ret = __os_realloc(dbp->dbenv, - jc->j_key.ulen, NULL, &jc->j_key.data)) != 0) + jc->j_key.ulen, &jc->j_key.data)) != 0) goto mem_err; goto retry; } @@ -379,7 +390,7 @@ retry: ret = jc->j_workcurs[0]->c_get(jc->j_workcurs[0], retry2: cp = jc->j_workcurs[i]; if ((ret = __db_join_getnext(cp, &jc->j_key, key_n, - jc->j_exhausted[i])) == DB_NOTFOUND) { + jc->j_exhausted[i], opmods)) == DB_NOTFOUND) { /* * jc->j_workcurs[i] has no more of the datum we're * interested in. Go back one cursor and get @@ -475,7 +486,7 @@ retry2: cp = jc->j_workcurs[i]; if (ret == ENOMEM) { jc->j_key.ulen <<= 1; if ((ret = __os_realloc(dbp->dbenv, jc->j_key.ulen, - NULL, &jc->j_key.data)) != 0) { + &jc->j_key.data)) != 0) { mem_err: __db_err(dbp->dbenv, "Allocation failed for join key, len = %lu", (u_long)jc->j_key.ulen); @@ -523,8 +534,8 @@ samekey: /* * Get the key we tried and failed to return last time; * it should be the current datum of all the secondary cursors. */ - if ((ret = jc->j_workcurs[0]->c_get(jc->j_workcurs[0], - &jc->j_key, key_n, DB_CURRENT)) != 0) + if ((ret = jc->j_workcurs[0]->c_real_get(jc->j_workcurs[0], + &jc->j_key, key_n, DB_CURRENT | opmods)) != 0) return (ret); F_CLR(jc, JOIN_RETRY); } @@ -532,36 +543,28 @@ samekey: /* /* * ret == 0; we have a key to return. * - * If DB_DBT_USERMEM or DB_DBT_MALLOC is set, we need to - * copy it back into the dbt we were given for the key; - * call __db_retcopy. - * - * Otherwise, assert that we do not in fact need to copy anything - * and simply proceed. + * If DB_DBT_USERMEM or DB_DBT_MALLOC is set, we need to copy the key + * back into the dbt we were given for the key; call __db_retcopy. + * Otherwise, assert that we do not need to copy anything and proceed. */ - if (F_ISSET(key_arg, DB_DBT_USERMEM) || - F_ISSET(key_arg, DB_DBT_MALLOC)) { + DB_ASSERT(F_ISSET( + key_arg, DB_DBT_USERMEM | DB_DBT_MALLOC) || key_n == key_arg); + + if (F_ISSET(key_arg, DB_DBT_USERMEM | DB_DBT_MALLOC) && + (ret = __db_retcopy(dbp->dbenv, + key_arg, key_n->data, key_n->size, NULL, NULL)) != 0) { /* - * We need to copy the key back into our original - * datum. Do so. + * The retcopy failed, most commonly because we have a user + * buffer for the key which is too small. Set things up to + * retry next time, and return. */ - if ((ret = __db_retcopy(dbp, - key_arg, key_n->data, key_n->size, NULL, NULL)) != 0) { - /* - * The retcopy failed, most commonly because we - * have a user buffer for the key which is too small. - * Set things up to retry next time, and return. - */ - F_SET(jc, JOIN_RETRY); - return (ret); - } - } else - DB_ASSERT(key_n == key_arg); + F_SET(jc, JOIN_RETRY); + return (ret); + } /* - * If DB_JOIN_ITEM is - * set, we return it; otherwise we do the lookup in the - * primary and then return. + * If DB_JOIN_ITEM is set, we return it; otherwise we do the lookup + * in the primary and then return. * * Note that we use key_arg here; it is safe (and appropriate) * to do so. @@ -569,14 +572,45 @@ samekey: /* if (operation == DB_JOIN_ITEM) return (0); - if ((ret = jc->j_primary->get(jc->j_primary, - jc->j_curslist[0]->txn, key_arg, data_arg, 0)) != 0) - /* - * The get on the primary failed, most commonly because we're - * using a user buffer that's not big enough. Flag our - * failure so we can return the same key next time. - */ - F_SET(jc, JOIN_RETRY); + /* + * If data_arg->flags == 0--that is, if DB is managing the + * data DBT's memory--it's not safe to just pass the DBT + * through to the primary get call, since we don't want that + * memory to belong to the primary DB handle (and if the primary + * is free-threaded, it can't anyway). + * + * Instead, use memory that is managed by the join cursor, in + * jc->j_rdata. + */ + if (!F_ISSET(data_arg, DB_DBT_MALLOC | DB_DBT_REALLOC | DB_DBT_USERMEM)) + db_manage_data = 1; + else + db_manage_data = 0; + if ((ret = __db_join_primget(jc->j_primary, + jc->j_curslist[0]->txn, jc->j_curslist[0]->locker, key_arg, + db_manage_data ? &jc->j_rdata : data_arg, opmods)) != 0) { + if (ret == DB_NOTFOUND) + /* + * If ret == DB_NOTFOUND, the primary and secondary + * are out of sync; every item in each secondary + * should correspond to something in the primary, + * or we shouldn't have done the join this way. + * Wail. + */ + ret = __db_secondary_corrupt(jc->j_primary); + else + /* + * The get on the primary failed for some other + * reason, most commonly because we're using a user + * buffer that's not big enough. Flag our failure + * so we can return the same key next time. + */ + F_SET(jc, JOIN_RETRY); + } + if (db_manage_data && ret == 0) { + data_arg->data = jc->j_rdata.data; + data_arg->size = jc->j_rdata.size; + } return (ret); } @@ -586,12 +620,14 @@ __db_join_close(dbc) DBC *dbc; { DB *dbp; + DB_ENV *dbenv; JOIN_CURSOR *jc; int ret, t_ret; u_int32_t i; jc = (JOIN_CURSOR *)dbc->internal; dbp = dbc->dbp; + dbenv = dbp->dbenv; ret = t_ret = 0; /* @@ -599,11 +635,11 @@ __db_join_close(dbc) * must happen before any action that can fail and return, or else * __db_close may loop indefinitely. */ - MUTEX_THREAD_LOCK(dbp->dbenv, dbp->mutexp); + MUTEX_THREAD_LOCK(dbenv, dbp->mutexp); TAILQ_REMOVE(&dbp->join_queue, dbc, links); - MUTEX_THREAD_UNLOCK(dbp->dbenv, dbp->mutexp); + MUTEX_THREAD_UNLOCK(dbenv, dbp->mutexp); - PANIC_CHECK(dbc->dbp->dbenv); + PANIC_CHECK(dbenv); /* * Close any open scratch cursors. In each case, there may @@ -625,13 +661,15 @@ __db_join_close(dbc) ret = t_ret; } - __os_free(jc->j_exhausted, 0); - __os_free(jc->j_curslist, 0); - __os_free(jc->j_workcurs, 0); - __os_free(jc->j_fdupcurs, 0); - __os_free(jc->j_key.data, jc->j_key.ulen); - __os_free(jc, sizeof(JOIN_CURSOR)); - __os_free(dbc, sizeof(DBC)); + __os_free(dbenv, jc->j_exhausted); + __os_free(dbenv, jc->j_curslist); + __os_free(dbenv, jc->j_workcurs); + __os_free(dbenv, jc->j_fdupcurs); + __os_free(dbenv, jc->j_key.data); + if (jc->j_rdata.data != NULL) + __os_ufree(dbenv, jc->j_rdata.data); + __os_free(dbenv, jc); + __os_free(dbenv, dbc); return (ret); } @@ -652,10 +690,10 @@ __db_join_close(dbc) * If no matching datum exists, returns DB_NOTFOUND, else 0. */ static int -__db_join_getnext(dbc, key, data, exhausted) +__db_join_getnext(dbc, key, data, exhausted, opmods) DBC *dbc; DBT *key, *data; - u_int32_t exhausted; + u_int32_t exhausted, opmods; { int ret, cmp; DB *dbp; @@ -667,10 +705,14 @@ __db_join_getnext(dbc, key, data, exhausted) switch (exhausted) { case 0: + /* + * We don't want to step on data->data; use a new + * DBT and malloc so we don't step on dbc's rdata memory. + */ memset(&ldata, 0, sizeof(DBT)); - /* We don't want to step on data->data; malloc. */ F_SET(&ldata, DB_DBT_MALLOC); - if ((ret = dbc->c_get(dbc, key, &ldata, DB_CURRENT)) != 0) + if ((ret = dbc->c_real_get(dbc, + key, &ldata, opmods | DB_CURRENT)) != 0) break; cmp = func(dbp, data, &ldata); if (cmp == 0) { @@ -679,10 +721,10 @@ __db_join_getnext(dbc, key, data, exhausted) * it into data, then free the buffer we malloc'ed * above. */ - if ((ret = __db_retcopy(dbp, data, ldata.data, + if ((ret = __db_retcopy(dbp->dbenv, data, ldata.data, ldata.size, &data->data, &data->size)) != 0) return (ret); - __os_free(ldata.data, 0); + __os_ufree(dbp->dbenv, ldata.data); return (0); } @@ -691,10 +733,10 @@ __db_join_getnext(dbc, key, data, exhausted) * dups. We just forget about ldata and free * its buffer--data contains the value we're searching for. */ - __os_free(ldata.data, 0); + __os_ufree(dbp->dbenv, ldata.data); /* FALLTHROUGH */ case 1: - ret = dbc->c_get(dbc, key, data, DB_GET_BOTHC); + ret = dbc->c_real_get(dbc, key, data, opmods | DB_GET_BOTHC); break; default: ret = EINVAL; @@ -708,7 +750,6 @@ __db_join_getnext(dbc, key, data, exhausted) * __db_join_cmp -- * Comparison function for sorting DBCs in cardinality order. */ - static int __db_join_cmp(a, b) const void *a, *b; @@ -728,3 +769,54 @@ __db_join_cmp(a, b) return (counta - countb); } + +/* + * __db_join_primget -- + * Perform a DB->get in the primary, being careful not to use a new + * locker ID if we're doing CDB locking. + */ +static int +__db_join_primget(dbp, txn, lockerid, key, data, flags) + DB *dbp; + DB_TXN *txn; + u_int32_t lockerid; + DBT *key, *data; + u_int32_t flags; +{ + DBC *dbc; + int dirty, ret, rmw, t_ret; + + /* + * The only allowable flags here are the two flags copied into + * "opmods" in __db_join_get, DB_RMW and DB_DIRTY_READ. The former + * is an op on the c_get call, the latter on the cursor call. + * It's a DB bug if we allow any other flags down in here. + */ + rmw = LF_ISSET(DB_RMW); + dirty = LF_ISSET(DB_DIRTY_READ); + LF_CLR(DB_RMW | DB_DIRTY_READ); + DB_ASSERT(flags == 0); + + if ((ret = __db_icursor(dbp, + txn, dbp->type, PGNO_INVALID, 0, lockerid, &dbc)) != 0) + return (ret); + + if (dirty || + (txn != NULL && F_ISSET(txn, TXN_DIRTY_READ))) + F_SET(dbc, DBC_DIRTY_READ); + F_SET(dbc, DBC_TRANSIENT); + + /* + * This shouldn't be necessary, thanks to the fact that join cursors + * swap in their own DB_DBT_REALLOC'ed buffers, but just for form's + * sake, we mirror what __db_get does. + */ + SET_RET_MEM(dbc, dbp); + + ret = dbc->c_get(dbc, key, data, DB_SET | rmw); + + if ((t_ret = __db_c_close(dbc)) != 0 && ret == 0) + ret = t_ret; + + return (ret); +} |