summary | refs | log | tree | commit | diff
path: root/bdb/db/db_join.c
diff options
context:
space:
mode:
Diffstat (limited to 'bdb/db/db_join.c')
-rw-r--r-- bdb/db/db_join.c 250
1 files changed, 171 insertions, 79 deletions
diff --git a/bdb/db/db_join.c b/bdb/db/db_join.c
index 881dedde0fc..6281b1a8383 100644
--- a/bdb/db/db_join.c
+++ b/bdb/db/db_join.c
@@ -1,14 +1,14 @@
-/*-
+/*
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 1998, 1999, 2000
+ * Copyright (c) 1998-2002
* Sleepycat Software. All rights reserved.
*/
#include "db_config.h"
#ifndef lint
-static const char revid[] = "$Id: db_join.c,v 11.31 2000/12/20 22:41:54 krinsky Exp $";
+static const char revid[] = "$Id: db_join.c,v 11.55 2002/08/08 03:57:47 bostic Exp $";
#endif /* not lint */
#ifndef NO_SYSTEM_INCLUDES
@@ -19,16 +19,17 @@ static const char revid[] = "$Id: db_join.c,v 11.31 2000/12/20 22:41:54 krinsky
#endif
#include "db_int.h"
-#include "db_page.h"
-#include "db_join.h"
-#include "db_am.h"
-#include "btree.h"
+#include "dbinc/db_page.h"
+#include "dbinc/db_join.h"
+#include "dbinc/btree.h"
static int __db_join_close __P((DBC *));
static int __db_join_cmp __P((const void *, const void *));
static int __db_join_del __P((DBC *, u_int32_t));
static int __db_join_get __P((DBC *, DBT *, DBT *, u_int32_t));
-static int __db_join_getnext __P((DBC *, DBT *, DBT *, u_int32_t));
+static int __db_join_getnext __P((DBC *, DBT *, DBT *, u_int32_t, u_int32_t));
+static int __db_join_primget __P((DB *,
+ DB_TXN *, u_int32_t, DBT *, DBT *, u_int32_t));
static int __db_join_put __P((DBC *, DBT *, DBT *, u_int32_t));
/*
@@ -84,7 +85,8 @@ __db_join(primary, curslist, dbcp, flags)
DBC *dbc;
JOIN_CURSOR *jc;
int ret;
- u_int32_t i, ncurs, nslots;
+ u_int32_t i;
+ size_t ncurs, nslots;
COMPQUIET(nslots, 0);
@@ -104,11 +106,13 @@ __db_join(primary, curslist, dbcp, flags)
1, sizeof(JOIN_CURSOR), &jc)) != 0)
goto err;
- if ((ret = __os_malloc(dbenv, 256, NULL, &jc->j_key.data)) != 0)
+ if ((ret = __os_malloc(dbenv, 256, &jc->j_key.data)) != 0)
goto err;
jc->j_key.ulen = 256;
F_SET(&jc->j_key, DB_DBT_USERMEM);
+ F_SET(&jc->j_rdata, DB_DBT_REALLOC);
+
for (jc->j_curslist = curslist;
*jc->j_curslist != NULL; jc->j_curslist++)
;
@@ -184,7 +188,7 @@ __db_join(primary, curslist, dbcp, flags)
jc->j_fdupcurs[i] = NULL;
jc->j_exhausted[i] = 0;
}
- jc->j_ncurs = ncurs;
+ jc->j_ncurs = (u_int32_t)ncurs;
/*
* If DB_JOIN_NOSORT is not set, optimize secondary cursors by
@@ -226,20 +230,20 @@ __db_join(primary, curslist, dbcp, flags)
err: if (jc != NULL) {
if (jc->j_curslist != NULL)
- __os_free(jc->j_curslist, nslots * sizeof(DBC *));
+ __os_free(dbenv, jc->j_curslist);
if (jc->j_workcurs != NULL) {
if (jc->j_workcurs[0] != NULL)
- __os_free(jc->j_workcurs[0], sizeof(DBC));
- __os_free(jc->j_workcurs, nslots * sizeof(DBC *));
+ __os_free(dbenv, jc->j_workcurs[0]);
+ __os_free(dbenv, jc->j_workcurs);
}
if (jc->j_fdupcurs != NULL)
- __os_free(jc->j_fdupcurs, nslots * sizeof(DBC *));
+ __os_free(dbenv, jc->j_fdupcurs);
if (jc->j_exhausted != NULL)
- __os_free(jc->j_exhausted, nslots * sizeof(u_int8_t));
- __os_free(jc, sizeof(JOIN_CURSOR));
+ __os_free(dbenv, jc->j_exhausted);
+ __os_free(dbenv, jc);
}
if (dbc != NULL)
- __os_free(dbc, sizeof(DBC));
+ __os_free(dbenv, dbc);
return (ret);
}
@@ -279,8 +283,8 @@ __db_join_get(dbc, key_arg, data_arg, flags)
DB *dbp;
DBC *cp;
JOIN_CURSOR *jc;
- int ret;
- u_int32_t i, j, operation;
+ int db_manage_data, ret;
+ u_int32_t i, j, operation, opmods;
dbp = dbc->dbp;
jc = (JOIN_CURSOR *)dbc->internal;
@@ -289,6 +293,12 @@ __db_join_get(dbc, key_arg, data_arg, flags)
operation = LF_ISSET(DB_OPFLAGS_MASK);
+ /* !!!
+ * If the set of flags here changes, check that __db_join_primget
+ * is updated to handle them properly.
+ */
+ opmods = LF_ISSET(DB_RMW | DB_DIRTY_READ);
+
if ((ret = __db_joingetchk(dbp, key_arg, flags)) != 0)
return (ret);
@@ -319,13 +329,14 @@ __db_join_get(dbc, key_arg, data_arg, flags)
goto samekey;
F_CLR(jc, JOIN_RETRY);
-retry: ret = jc->j_workcurs[0]->c_get(jc->j_workcurs[0],
- &jc->j_key, key_n, jc->j_exhausted[0] ? DB_NEXT_DUP : DB_CURRENT);
+retry: ret = jc->j_workcurs[0]->c_real_get(jc->j_workcurs[0],
+ &jc->j_key, key_n,
+ opmods | (jc->j_exhausted[0] ? DB_NEXT_DUP : DB_CURRENT));
if (ret == ENOMEM) {
jc->j_key.ulen <<= 1;
if ((ret = __os_realloc(dbp->dbenv,
- jc->j_key.ulen, NULL, &jc->j_key.data)) != 0)
+ jc->j_key.ulen, &jc->j_key.data)) != 0)
goto mem_err;
goto retry;
}
@@ -379,7 +390,7 @@ retry: ret = jc->j_workcurs[0]->c_get(jc->j_workcurs[0],
retry2: cp = jc->j_workcurs[i];
if ((ret = __db_join_getnext(cp, &jc->j_key, key_n,
- jc->j_exhausted[i])) == DB_NOTFOUND) {
+ jc->j_exhausted[i], opmods)) == DB_NOTFOUND) {
/*
* jc->j_workcurs[i] has no more of the datum we're
* interested in. Go back one cursor and get
@@ -475,7 +486,7 @@ retry2: cp = jc->j_workcurs[i];
if (ret == ENOMEM) {
jc->j_key.ulen <<= 1;
if ((ret = __os_realloc(dbp->dbenv, jc->j_key.ulen,
- NULL, &jc->j_key.data)) != 0) {
+ &jc->j_key.data)) != 0) {
mem_err: __db_err(dbp->dbenv,
"Allocation failed for join key, len = %lu",
(u_long)jc->j_key.ulen);
@@ -523,8 +534,8 @@ samekey: /*
* Get the key we tried and failed to return last time;
* it should be the current datum of all the secondary cursors.
*/
- if ((ret = jc->j_workcurs[0]->c_get(jc->j_workcurs[0],
- &jc->j_key, key_n, DB_CURRENT)) != 0)
+ if ((ret = jc->j_workcurs[0]->c_real_get(jc->j_workcurs[0],
+ &jc->j_key, key_n, DB_CURRENT | opmods)) != 0)
return (ret);
F_CLR(jc, JOIN_RETRY);
}
@@ -532,36 +543,28 @@ samekey: /*
/*
* ret == 0; we have a key to return.
*
- * If DB_DBT_USERMEM or DB_DBT_MALLOC is set, we need to
- * copy it back into the dbt we were given for the key;
- * call __db_retcopy.
- *
- * Otherwise, assert that we do not in fact need to copy anything
- * and simply proceed.
+ * If DB_DBT_USERMEM or DB_DBT_MALLOC is set, we need to copy the key
+ * back into the dbt we were given for the key; call __db_retcopy.
+ * Otherwise, assert that we do not need to copy anything and proceed.
*/
- if (F_ISSET(key_arg, DB_DBT_USERMEM) ||
- F_ISSET(key_arg, DB_DBT_MALLOC)) {
+ DB_ASSERT(F_ISSET(
+ key_arg, DB_DBT_USERMEM | DB_DBT_MALLOC) || key_n == key_arg);
+
+ if (F_ISSET(key_arg, DB_DBT_USERMEM | DB_DBT_MALLOC) &&
+ (ret = __db_retcopy(dbp->dbenv,
+ key_arg, key_n->data, key_n->size, NULL, NULL)) != 0) {
/*
- * We need to copy the key back into our original
- * datum. Do so.
+ * The retcopy failed, most commonly because we have a user
+ * buffer for the key which is too small. Set things up to
+ * retry next time, and return.
*/
- if ((ret = __db_retcopy(dbp,
- key_arg, key_n->data, key_n->size, NULL, NULL)) != 0) {
- /*
- * The retcopy failed, most commonly because we
- * have a user buffer for the key which is too small.
- * Set things up to retry next time, and return.
- */
- F_SET(jc, JOIN_RETRY);
- return (ret);
- }
- } else
- DB_ASSERT(key_n == key_arg);
+ F_SET(jc, JOIN_RETRY);
+ return (ret);
+ }
/*
- * If DB_JOIN_ITEM is
- * set, we return it; otherwise we do the lookup in the
- * primary and then return.
+ * If DB_JOIN_ITEM is set, we return it; otherwise we do the lookup
+ * in the primary and then return.
*
* Note that we use key_arg here; it is safe (and appropriate)
* to do so.
@@ -569,14 +572,45 @@ samekey: /*
if (operation == DB_JOIN_ITEM)
return (0);
- if ((ret = jc->j_primary->get(jc->j_primary,
- jc->j_curslist[0]->txn, key_arg, data_arg, 0)) != 0)
- /*
- * The get on the primary failed, most commonly because we're
- * using a user buffer that's not big enough. Flag our
- * failure so we can return the same key next time.
- */
- F_SET(jc, JOIN_RETRY);
+ /*
+ * If data_arg->flags == 0--that is, if DB is managing the
+ * data DBT's memory--it's not safe to just pass the DBT
+ * through to the primary get call, since we don't want that
+ * memory to belong to the primary DB handle (and if the primary
+ * is free-threaded, it can't anyway).
+ *
+ * Instead, use memory that is managed by the join cursor, in
+ * jc->j_rdata.
+ */
+ if (!F_ISSET(data_arg, DB_DBT_MALLOC | DB_DBT_REALLOC | DB_DBT_USERMEM))
+ db_manage_data = 1;
+ else
+ db_manage_data = 0;
+ if ((ret = __db_join_primget(jc->j_primary,
+ jc->j_curslist[0]->txn, jc->j_curslist[0]->locker, key_arg,
+ db_manage_data ? &jc->j_rdata : data_arg, opmods)) != 0) {
+ if (ret == DB_NOTFOUND)
+ /*
+ * If ret == DB_NOTFOUND, the primary and secondary
+ * are out of sync; every item in each secondary
+ * should correspond to something in the primary,
+ * or we shouldn't have done the join this way.
+ * Wail.
+ */
+ ret = __db_secondary_corrupt(jc->j_primary);
+ else
+ /*
+ * The get on the primary failed for some other
+ * reason, most commonly because we're using a user
+ * buffer that's not big enough. Flag our failure
+ * so we can return the same key next time.
+ */
+ F_SET(jc, JOIN_RETRY);
+ }
+ if (db_manage_data && ret == 0) {
+ data_arg->data = jc->j_rdata.data;
+ data_arg->size = jc->j_rdata.size;
+ }
return (ret);
}
@@ -586,12 +620,14 @@ __db_join_close(dbc)
DBC *dbc;
{
DB *dbp;
+ DB_ENV *dbenv;
JOIN_CURSOR *jc;
int ret, t_ret;
u_int32_t i;
jc = (JOIN_CURSOR *)dbc->internal;
dbp = dbc->dbp;
+ dbenv = dbp->dbenv;
ret = t_ret = 0;
/*
@@ -599,11 +635,11 @@ __db_join_close(dbc)
* must happen before any action that can fail and return, or else
* __db_close may loop indefinitely.
*/
- MUTEX_THREAD_LOCK(dbp->dbenv, dbp->mutexp);
+ MUTEX_THREAD_LOCK(dbenv, dbp->mutexp);
TAILQ_REMOVE(&dbp->join_queue, dbc, links);
- MUTEX_THREAD_UNLOCK(dbp->dbenv, dbp->mutexp);
+ MUTEX_THREAD_UNLOCK(dbenv, dbp->mutexp);
- PANIC_CHECK(dbc->dbp->dbenv);
+ PANIC_CHECK(dbenv);
/*
* Close any open scratch cursors. In each case, there may
@@ -625,13 +661,15 @@ __db_join_close(dbc)
ret = t_ret;
}
- __os_free(jc->j_exhausted, 0);
- __os_free(jc->j_curslist, 0);
- __os_free(jc->j_workcurs, 0);
- __os_free(jc->j_fdupcurs, 0);
- __os_free(jc->j_key.data, jc->j_key.ulen);
- __os_free(jc, sizeof(JOIN_CURSOR));
- __os_free(dbc, sizeof(DBC));
+ __os_free(dbenv, jc->j_exhausted);
+ __os_free(dbenv, jc->j_curslist);
+ __os_free(dbenv, jc->j_workcurs);
+ __os_free(dbenv, jc->j_fdupcurs);
+ __os_free(dbenv, jc->j_key.data);
+ if (jc->j_rdata.data != NULL)
+ __os_ufree(dbenv, jc->j_rdata.data);
+ __os_free(dbenv, jc);
+ __os_free(dbenv, dbc);
return (ret);
}
@@ -652,10 +690,10 @@ __db_join_close(dbc)
* If no matching datum exists, returns DB_NOTFOUND, else 0.
*/
static int
-__db_join_getnext(dbc, key, data, exhausted)
+__db_join_getnext(dbc, key, data, exhausted, opmods)
DBC *dbc;
DBT *key, *data;
- u_int32_t exhausted;
+ u_int32_t exhausted, opmods;
{
int ret, cmp;
DB *dbp;
@@ -667,10 +705,14 @@ __db_join_getnext(dbc, key, data, exhausted)
switch (exhausted) {
case 0:
+ /*
+ * We don't want to step on data->data; use a new
+ * DBT and malloc so we don't step on dbc's rdata memory.
+ */
memset(&ldata, 0, sizeof(DBT));
- /* We don't want to step on data->data; malloc. */
F_SET(&ldata, DB_DBT_MALLOC);
- if ((ret = dbc->c_get(dbc, key, &ldata, DB_CURRENT)) != 0)
+ if ((ret = dbc->c_real_get(dbc,
+ key, &ldata, opmods | DB_CURRENT)) != 0)
break;
cmp = func(dbp, data, &ldata);
if (cmp == 0) {
@@ -679,10 +721,10 @@ __db_join_getnext(dbc, key, data, exhausted)
* it into data, then free the buffer we malloc'ed
* above.
*/
- if ((ret = __db_retcopy(dbp, data, ldata.data,
+ if ((ret = __db_retcopy(dbp->dbenv, data, ldata.data,
ldata.size, &data->data, &data->size)) != 0)
return (ret);
- __os_free(ldata.data, 0);
+ __os_ufree(dbp->dbenv, ldata.data);
return (0);
}
@@ -691,10 +733,10 @@ __db_join_getnext(dbc, key, data, exhausted)
* dups. We just forget about ldata and free
* its buffer--data contains the value we're searching for.
*/
- __os_free(ldata.data, 0);
+ __os_ufree(dbp->dbenv, ldata.data);
/* FALLTHROUGH */
case 1:
- ret = dbc->c_get(dbc, key, data, DB_GET_BOTHC);
+ ret = dbc->c_real_get(dbc, key, data, opmods | DB_GET_BOTHC);
break;
default:
ret = EINVAL;
@@ -708,7 +750,6 @@ __db_join_getnext(dbc, key, data, exhausted)
* __db_join_cmp --
* Comparison function for sorting DBCs in cardinality order.
*/
-
static int
__db_join_cmp(a, b)
const void *a, *b;
@@ -728,3 +769,54 @@ __db_join_cmp(a, b)
return (counta - countb);
}
+
+/*
+ * __db_join_primget --
+ * Perform a DB->get in the primary, being careful not to use a new
+ * locker ID if we're doing CDB locking.
+ *
+ * Parameters:
+ * dbp - handle of the primary database to search.
+ * txn - transaction handed to __db_icursor; may be NULL.
+ * lockerid - locker ID handed to __db_icursor, so the lookup cursor
+ * is created with the caller's locker. __db_join_get passes
+ * the txn and locker of its first secondary cursor.
+ * key - key to look up (the get is issued with DB_SET).
+ * data - DBT that receives the primary's data item.
+ * flags - only DB_RMW and/or DB_DIRTY_READ may be set; anything
+ * else trips the DB_ASSERT below.
+ *
+ * How it works: a cursor is opened on dbp with __db_icursor, marked
+ * DBC_TRANSIENT (and DBC_DIRTY_READ when DB_DIRTY_READ was passed or
+ * the transaction has TXN_DIRTY_READ set), used for a single c_get,
+ * and then closed with __db_c_close whether or not the get succeeded.
+ *
+ * Returns:
+ * The __db_icursor error if the cursor could not be opened (no get
+ * is attempted in that case). Otherwise the c_get return value; if
+ * the get returned 0 but closing the cursor failed, the close error
+ * is returned instead. A failed get is never masked by the close.
+ */
+static int
+__db_join_primget(dbp, txn, lockerid, key, data, flags)
+ DB *dbp;
+ DB_TXN *txn;
+ u_int32_t lockerid;
+ DBT *key, *data;
+ u_int32_t flags;
+{
+ DBC *dbc;
+ int dirty, ret, rmw, t_ret;
+
+ /*
+ * The only allowable flags here are the two flags copied into
+ * "opmods" in __db_join_get, DB_RMW and DB_DIRTY_READ. The former
+ * is an op on the c_get call, the latter on the cursor call.
+ * It's a DB bug if we allow any other flags down in here.
+ */
+ rmw = LF_ISSET(DB_RMW);
+ dirty = LF_ISSET(DB_DIRTY_READ);
+ LF_CLR(DB_RMW | DB_DIRTY_READ);
+ DB_ASSERT(flags == 0);
+
+ if ((ret = __db_icursor(dbp,
+ txn, dbp->type, PGNO_INVALID, 0, lockerid, &dbc)) != 0)
+ return (ret);
+
+ if (dirty ||
+ (txn != NULL && F_ISSET(txn, TXN_DIRTY_READ)))
+ F_SET(dbc, DBC_DIRTY_READ);
+ F_SET(dbc, DBC_TRANSIENT);
+
+ /*
+ * This shouldn't be necessary, thanks to the fact that join cursors
+ * swap in their own DB_DBT_REALLOC'ed buffers, but just for form's
+ * sake, we mirror what __db_get does.
+ */
+ SET_RET_MEM(dbc, dbp);
+
+ ret = dbc->c_get(dbc, key, data, DB_SET | rmw);
+
+ if ((t_ret = __db_c_close(dbc)) != 0 && ret == 0)
+ ret = t_ret;
+
+ return (ret);
+}