summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Bostic <keith@wiredtiger.com>2013-06-09 14:27:20 -0400
committerKeith Bostic <keith@wiredtiger.com>2013-06-09 14:27:20 -0400
commit417bc135b487df3421ee62acee14a80d1f96f150 (patch)
tree43c3bbc75df6cf6673c83f5a23ce42780d02373f
parentb4a13d9a686360e41b3e695f6e6a456f550c7698 (diff)
downloadmongo-417bc135b487df3421ee62acee14a80d1f96f150.tar.gz
Upgrade to using KVS namespaces instead of imposing our own namespaces
on top of the underlying KVS store.
-rw-r--r--ext/test/memrata/memrata.c402
1 files changed, 117 insertions, 285 deletions
diff --git a/ext/test/memrata/memrata.c b/ext/test/memrata/memrata.c
index d417b99694d..692e2163f35 100644
--- a/ext/test/memrata/memrata.c
+++ b/ext/test/memrata/memrata.c
@@ -73,40 +73,19 @@
#define KVS_MAJOR 1 /* KVS major, minor version */
#define KVS_MINOR 0
-/*
- * We partition the flat space into a set of objects based on a packed, 8B
- * leading byte. Object IDs from 0 to KVS_MAXID_BASE are reserved for future
- * use -- I can't think of any reason I'd need them, but it's a cheap backup
- * plan.
- */
-#define KVS_MAXID "memrata:WiredTiger.maxid" /* max object ID key */
-#define KVS_MAXID_BASE 6 /* first object ID */
-
typedef struct __wt_source {
- char *name; /* Unique name */
+ char *uri; /* Unique name */
pthread_rwlock_t lock; /* Lock */
int configured; /* If structure configured */
u_int ref; /* Active reference count */
- /*
- * Each object has a unique leading byte prefix, which is the object's
- * ID turned into a packed string (any 8B unsigned value will pack
- * into a maximum of 9 bytes.) We create a packed copy of the object's
- * ID and a packed copy of the ID one greater than the object's ID,
- * the latter is what we use for a "previous" cursor traversal.
- */
-#define KVS_MAX_PACKED_8B 10
- uint8_t id[KVS_MAX_PACKED_8B]; /* Packed unique ID prefix */
- size_t idlen; /* ID prefix length */
- uint8_t idnext[KVS_MAX_PACKED_8B]; /* Packed next ID prefix */
- size_t idnextlen; /* Next ID prefix length */
-
uint64_t append_recno; /* Allocation record number */
int config_recno; /* config "key_format=r" */
int config_bitfield; /* config "value_format=#t" */
+ kvs_t kvs; /* Underlying KVS namespace */
struct __kvs_source *ks; /* Underlying KVS source */
struct __wt_source *next; /* List of WiredTiger objects */
@@ -114,7 +93,7 @@ typedef struct __wt_source {
typedef struct __kvs_source {
char *name; /* Unique name */
- kvs_t kvs; /* Underlying KVS reference */
+ kvs_t kvs_device; /* Underlying KVS store */
struct __wt_source *ws_head; /* List of WiredTiger sources */
@@ -126,7 +105,6 @@ typedef struct __cursor {
WT_EXTENSION_API *wtext; /* Extension functions */
- KVS_SOURCE *ks; /* KVS source */
WT_SOURCE *ws; /* WiredTiger source */
struct kvs_record record; /* Record */
@@ -232,7 +210,6 @@ copyin_key(WT_CURSOR *wtcursor, int allocate_key)
WT_SESSION *session;
WT_SOURCE *ws;
size_t i, size;
- uint8_t *p, *t;
int ret = 0;
session = wtcursor->session;
@@ -241,14 +218,6 @@ copyin_key(WT_CURSOR *wtcursor, int allocate_key)
wtext = cursor->wtext;
r = &cursor->record;
- r->key = cursor->key;
- r->key_len = 0;
-
- /* Prefix the key with the object's unique ID. */
- for (p = ws->id, t = r->key, i = 0; i < ws->idlen; ++i)
- *t++ = *p++;
- r->key_len += ws->idlen;
-
if (ws->config_recno) {
/*
* Allocate a new record for append operations.
@@ -284,17 +253,23 @@ copyin_key(WT_CURSOR *wtcursor, int allocate_key)
if ((ret = wtext->struct_size(wtext, session,
&size, "r", wtcursor->recno)) != 0 ||
- (ret = wtext->struct_pack(wtext, session, t,
- sizeof(r->key) - ws->idlen, "r", wtcursor->recno)) != 0)
+ (ret = wtext->struct_pack(wtext, session, cursor->key,
+ sizeof(cursor->key), "r", wtcursor->recno)) != 0)
return (ret);
- r->key_len += size;
+ r->key = cursor->key;
+ r->key_len = size;
} else {
- if (wtcursor->key.size + ws->idlen > KVS_MAX_KEY_LEN)
+ if (wtcursor->key.size > KVS_MAX_KEY_LEN)
ERET(wtext, session, ERANGE,
"key size of %" PRIuMAX " is too large",
(uintmax_t)wtcursor->key.size);
- memcpy(t, wtcursor->key.data, wtcursor->key.size);
- r->key_len += wtcursor->key.size;
+
+ /*
+ * XXX
+ * The underlying KVS library data fields aren't const.
+ */
+ r->key = (char *)wtcursor->key.data;
+ r->key_len = wtcursor->key.size;
}
return (0);
}
@@ -311,8 +286,6 @@ copyout_key(WT_CURSOR *wtcursor)
WT_EXTENSION_API *wtext;
WT_SESSION *session;
WT_SOURCE *ws;
- size_t i, len;
- uint8_t *p, *t;
int ret = 0;
session = wtcursor->session;
@@ -321,25 +294,13 @@ copyout_key(WT_CURSOR *wtcursor)
ws = cursor->ws;
r = &cursor->record;
-
- /*
- * Check the object's unique ID, if it doesn't match we've hit the end
- * of the object on a cursor search.
- */
- if (r->key_len < ws->idlen)
- return (WT_NOTFOUND);
- for (p = ws->id, t = r->key, i = 0; i < ws->idlen; ++i)
- if (*t++ != *p++)
- return (WT_NOTFOUND);
- len = r->key_len - ws->idlen;
-
if (ws->config_recno) {
- if ((ret = wtext->struct_unpack(
- wtext, session, t, len, "r", &wtcursor->recno)) != 0)
+ if ((ret = wtext->struct_unpack(wtext,
+ session, r->key, r->key_len, "r", &wtcursor->recno)) != 0)
return (ret);
} else {
- wtcursor->key.data = t;
- wtcursor->key.size = (uint32_t)len;
+ wtcursor->key.data = r->key;
+ wtcursor->key.size = r->key_len;
}
return (0);
}
@@ -471,7 +432,7 @@ kvs_call(WT_CURSOR *wtcursor, const char *fname,
cursor = (CURSOR *)wtcursor;
wtext = cursor->wtext;
- kvs = cursor->ks->kvs;
+ kvs = cursor->ws->kvs;
cursor->record.val = cursor->val;
restart:
@@ -519,28 +480,6 @@ restart:
}
/*
- * nextprev_init --
- * Initialize a cursor for a next/previous traversal.
- */
-static INLINE void
-nextprev_init(WT_CURSOR *wtcursor, uint8_t *id, size_t idlen)
-{
- CURSOR *cursor;
- size_t i;
- uint8_t *p, *t;
-
- cursor = (CURSOR *)wtcursor;
-
- /* Prefix the key with a unique ID. */
- for (p = id,
- t = cursor->key, i = 0; i < idlen; ++i)
- *t++ = *p++;
-
- cursor->record.key = cursor->key;
- cursor->record.key_len = idlen;
-}
-
-/*
* kvs_cursor_next --
* WT_CURSOR::next method.
*/
@@ -554,13 +493,6 @@ kvs_cursor_next(WT_CURSOR *wtcursor)
cursor = (CURSOR *)wtcursor;
ws = cursor->ws;
- /*
- * If this is the start of a new traversal, set the key to the first
- * possible record for the object.
- */
- if (cursor->record.key_len == 0)
- nextprev_init(wtcursor, ws->id, ws->idlen);
-
if ((ret = copy_key(wtcursor)) != 0)
return (ret);
if ((ret = kvs_call(wtcursor, "kvs_next", kvs_next)) != 0)
@@ -586,13 +518,6 @@ kvs_cursor_prev(WT_CURSOR *wtcursor)
cursor = (CURSOR *)wtcursor;
ws = cursor->ws;
- /*
- * If this is the start of a new traversal, set the key to the first
- * possible record after the object.
- */
- if (cursor->record.key_len == 0)
- nextprev_init(wtcursor, ws->idnext, ws->idnextlen);
-
if ((ret = copy_key(wtcursor)) != 0)
return (ret);
if ((ret = kvs_call(wtcursor, "kvs_prev", kvs_prev)) != 0)
@@ -692,15 +617,15 @@ static int
kvs_cursor_insert(WT_CURSOR *wtcursor)
{
CURSOR *cursor;
- KVS_SOURCE *ks;
WT_EXTENSION_API *wtext;
WT_SESSION *session;
+ WT_SOURCE *ws;
int ret = 0;
session = wtcursor->session;
cursor = (CURSOR *)wtcursor;
wtext = cursor->wtext;
- ks = cursor->ks;
+ ws = cursor->ws;
if ((ret = copyin_key(wtcursor, 1)) != 0)
return (ret);
@@ -715,11 +640,11 @@ kvs_cursor_insert(WT_CURSOR *wtcursor)
* does not exist, fail if it does exist), maps to kvs_add.
*/
if (cursor->config_overwrite) {
- if ((ret = kvs_set(ks->kvs, &cursor->record)) != 0)
+ if ((ret = kvs_set(ws->kvs, &cursor->record)) != 0)
ERET(wtext, session, WT_ERROR,
"kvs_set: %s", kvs_strerror(ret));
} else
- if ((ret = kvs_add(ks->kvs, &cursor->record)) != 0) {
+ if ((ret = kvs_add(ws->kvs, &cursor->record)) != 0) {
if (ret == KVS_E_KEY_EXISTS)
return (WT_DUPLICATE_KEY);
ERET(wtext, session, WT_ERROR,
@@ -736,15 +661,15 @@ static int
kvs_cursor_update(WT_CURSOR *wtcursor)
{
CURSOR *cursor;
- KVS_SOURCE *ks;
WT_EXTENSION_API *wtext;
WT_SESSION *session;
+ WT_SOURCE *ws;
int ret = 0;
session = wtcursor->session;
cursor = (CURSOR *)wtcursor;
wtext = cursor->wtext;
- ks = cursor->ks;
+ ws = cursor->ws;
if ((ret = copyin_key(wtcursor, 0)) != 0)
return (ret);
@@ -755,7 +680,7 @@ kvs_cursor_update(WT_CURSOR *wtcursor)
* WT_CURSOR::update (update the record if it does exist, fail if it
* does not exist), maps to kvs_replace.
*/
- if ((ret = kvs_replace(ks->kvs, &cursor->record)) != 0)
+ if ((ret = kvs_replace(ws->kvs, &cursor->record)) != 0)
ERET(wtext,
session, WT_ERROR, "kvs_replace: %s", kvs_strerror(ret));
@@ -770,7 +695,6 @@ static int
kvs_cursor_remove(WT_CURSOR *wtcursor)
{
CURSOR *cursor;
- KVS_SOURCE *ks;
WT_EXTENSION_API *wtext;
WT_SESSION *session;
WT_SOURCE *ws;
@@ -779,7 +703,6 @@ kvs_cursor_remove(WT_CURSOR *wtcursor)
session = wtcursor->session;
cursor = (CURSOR *)wtcursor;
wtext = cursor->wtext;
- ks = cursor->ks;
ws = cursor->ws;
/*
@@ -794,7 +717,7 @@ kvs_cursor_remove(WT_CURSOR *wtcursor)
if ((ret = copyin_key(wtcursor, 0)) != 0)
return (ret);
- if ((ret = kvs_del(ks->kvs, &cursor->record)) == 0)
+ if ((ret = kvs_del(ws->kvs, &cursor->record)) == 0)
return (0);
if (ret == KVS_E_KEY_NOT_FOUND)
return (WT_NOTFOUND);
@@ -1163,8 +1086,9 @@ kvs_source_open(WT_DATA_SOURCE *wtds,
devices = NULL;
/* Open the KVS handle last, so cleanup is easier. */
- ks->kvs = kvs_open(device_list, &kvs_config, flags | KVS_O_CREATE);
- if (ks->kvs == NULL) {
+ ks->kvs_device =
+ kvs_open(device_list, &kvs_config, flags | KVS_O_CREATE);
+ if (ks->kvs_device == NULL) {
ESET(wtext, session, WT_ERROR,
"kvs_open: %s: %s", ks->name, kvs_strerror(os_errno()));
goto err;
@@ -1220,12 +1144,12 @@ ws_source_open(WT_DATA_SOURCE *wtds, WT_SESSION *session,
/* Check for a match: if we find one, we're done. */
for (ws = ks->ws_head; ws != NULL; ws = ws->next)
- if (strcmp(ws->name, uri) == 0)
+ if (strcmp(ws->uri, uri) == 0)
goto done;
/* Allocate and initialize a new underlying WiredTiger source object. */
if ((ws = calloc(1, sizeof(*ws))) == NULL ||
- (ws->name = strdup(uri)) == NULL) {
+ (ws->uri = strdup(uri)) == NULL) {
ret = os_errno();
goto err;
}
@@ -1249,7 +1173,7 @@ done: if ((ret = writelock(wtext, session, &ws->lock)) != 0)
err: if (lockinit)
ETRET(lock_destroy(wtext, session, &ws->lock));
if (ws != NULL) {
- free(ws->name);
+ free(ws->uri);
free(ws);
}
}
@@ -1261,105 +1185,6 @@ err: if (lockinit)
}
/*
- * ws_truncate --
- * Discard the records for an object.
- */
-static int
-ws_truncate(WT_DATA_SOURCE *wtds, WT_SESSION *session, WT_SOURCE *ws)
-{
- struct kvs_record *r, _r;
- DATA_SOURCE *ds;
- WT_EXTENSION_API *wtext;
- kvs_t kvs;
- size_t i;
- int ret = 0;
- uint8_t *p, *t;
- char key[KVS_MAX_KEY_LEN];
-
- ds = (DATA_SOURCE *)wtds;
- wtext = ds->wtext;
- kvs = ws->ks->kvs;
-
- /* Walk the list of objects, discarding them all. */
- r = &_r;
- memset(r, 0, sizeof(*r));
- r->key = key;
- memcpy(r->key, ws->id, ws->idlen);
- r->key_len = ws->idlen;
- r->val = NULL;
- r->val_len = 0;
- while ((ret = kvs_next(kvs, r, 0UL, 0UL)) == 0) {
- /*
- * Check for an object ID mismatch, if we find one, we're
- * done.
- */
- for (p = ws->id, t = r->key, i = 0; i < ws->idlen; ++i)
- if (*t++ != *p++)
- return (0);
- if ((ret = kvs_del(kvs, r)) != 0) {
- ESET(wtext, session,
- WT_ERROR, "kvs_del: %s", kvs_strerror(ret));
- return (ret);
- }
- }
- if (ret == KVS_E_KEY_NOT_FOUND)
- ret = 0;
- if (ret != 0)
- ESET(wtext, session,
- WT_ERROR, "kvs_next: %s", kvs_strerror(ret));
- return (ret);
-}
-
-/*
- * master_id_get --
- * Return the maximum file ID in the system.
- */
-static int
-master_id_get(WT_DATA_SOURCE *wtds, WT_SESSION *session, uint64_t *maxidp)
-{
- DATA_SOURCE *ds;
- WT_EXTENSION_API *wtext;
- int ret = 0;
- const char *value;
-
- ds = (DATA_SOURCE *)wtds;
- wtext = ds->wtext;
-
- if ((ret =
- wtext->metadata_search(wtext, session, KVS_MAXID, &value)) == 0) {
- *maxidp = strtouq(value, NULL, 10);
- return (0);
- }
- if (ret == WT_NOTFOUND) {
- *maxidp = KVS_MAXID_BASE;
- return (0);
- }
- ERET(wtext, session, ret, "%s: %s", KVS_MAXID, wtext->strerror(ret));
-}
-
-/*
- * master_id_set --
- * Increment the maximum file ID in the system.
- */
-static int
-master_id_set(WT_DATA_SOURCE *wtds, WT_SESSION *session, uint64_t maxid)
-{
- DATA_SOURCE *ds;
- WT_EXTENSION_API *wtext;
- int ret = 0;
- char value[32]; /* Large enough for any 8B value. */
-
- ds = (DATA_SOURCE *)wtds;
- wtext = ds->wtext;
-
- (void)snprintf(value, sizeof(value), "%" PRIu64, maxid);
- if ((ret =
- wtext->metadata_update(wtext, session, KVS_MAXID, value)) == 0)
- return (0);
- ERET(wtext, session, ret, "%s: %s", KVS_MAXID, wtext->strerror(ret));
-}
-
-/*
* master_uri_get --
* Get the KVS master record for a URI.
*/
@@ -1407,11 +1232,12 @@ master_uri_rename(WT_DATA_SOURCE *wtds,
ds = (DATA_SOURCE *)wtds;
wtext = ds->wtext;
+ value = NULL;
/* Insert the record under a new name. */
if ((ret = master_uri_get(wtds, session, uri, &value)) != 0 ||
(ret = wtext->metadata_insert(wtext, session, newuri, value)) != 0)
- return (ret);
+ goto err;
/*
* Remove the original record, and if that fails, attempt to remove
@@ -1419,6 +1245,8 @@ master_uri_rename(WT_DATA_SOURCE *wtds,
*/
if ((ret = wtext->metadata_remove(wtext, session, uri)) != 0)
(void)wtext->metadata_remove(wtext, session, newuri);
+
+err: free((void *)value);
return (ret);
}
@@ -1433,18 +1261,12 @@ master_uri_set(WT_DATA_SOURCE *wtds,
DATA_SOURCE *ds;
WT_CONFIG_ITEM a, b;
WT_EXTENSION_API *wtext;
- uint64_t maxid;
int exclusive, ret = 0;
char value[1024];
ds = (DATA_SOURCE *)wtds;
wtext = ds->wtext;
- /* Get the maximum file ID. */
- if ((ret = master_id_get(wtds, session, &maxid)) != 0)
- return (ret);
- ++maxid;
-
exclusive = 0;
if ((ret =
wtext->config_get(wtext, session, config, "exclusive", &a)) == 0)
@@ -1481,17 +1303,14 @@ master_uri_set(WT_DATA_SOURCE *wtds,
* update the master ID record.
*/
(void)snprintf(value, sizeof(value),
- "uid=%" PRIu64
",version=(major=%d,minor=%d)"
",key_format=%.*s,value_format=%.*s",
- maxid, KVS_MAJOR, KVS_MINOR, (int)a.len, a.str, (int)b.len, b.str);
+ KVS_MAJOR, KVS_MINOR, (int)a.len, a.str, (int)b.len, b.str);
if ((ret = wtext->metadata_insert(wtext, session, uri, value)) == 0)
- return (master_id_set(wtds, session, maxid));
-
+ return (0);
if (ret == WT_DUPLICATE_KEY)
return (exclusive ? EEXIST : 0);
-
- ERET(wtext, session, ret, "%s: %s", KVS_MAXID, wtext->strerror(ret));
+ ERET(wtext, session, ret, "%s: %s", uri, wtext->strerror(ret));
}
/*
@@ -1540,7 +1359,10 @@ kvs_session_drop(WT_DATA_SOURCE *wtds,
ds = (DATA_SOURCE *)wtds;
wtext = ds->wtext;
- /* Get a locked reference to the data source. */
+ /*
+ * Get a locked reference to the data source; hold the global lock as
+ * well, we are going to change the list of objects for a KVS store.
+ */
if ((ret = ws_source_open(wtds, session, config, uri, 1, &ws)) != 0)
return (ret);
ks = ws->ks;
@@ -1551,31 +1373,36 @@ kvs_session_drop(WT_DATA_SOURCE *wtds,
goto err;
}
- /* Discard all of the rows in the object. */
- if ((ret = ws_truncate(wtds, session, ws)) != 0)
+ /* Close and delete the underlying object. */
+ if ((ret = kvs_close(ws->kvs)) != 0) {
+ ESET(wtext, session, WT_ERROR,
+ "kvs_close: %s: %s", ws->uri, kvs_strerror(ret));
+ goto err;
+ }
+ if ((ret = kvs_delete_namespace(ks->kvs_device, ws->uri)) != 0) {
+ ESET(wtext, session, WT_ERROR,
+ "kvs_delete_namespace: %s: %s", ws->uri, kvs_strerror(ret));
goto err;
+ }
+
+ /* Discard the metadata entry. */
+ ret = master_uri_drop(wtds, session, uri);
/*
* Remove the entry from the WT_SOURCE list -- it's a singly-linked
* list, find the reference to it.
*/
for (p = &ks->ws_head; *p != NULL; p = &(*p)->next)
- if (*p == ws)
- break;
- /*
- * We should be guaranteed to find an entry, we just looked it up and
- * everything is locked down.
- */
- if (*p == NULL) {
- ret = WT_NOTFOUND;
- goto err;
- }
- *p = (*p)->next;
+ if (*p == ws) {
+ *p = (*p)->next;
- /* Discard the metadata entry. */
- ret = master_uri_drop(wtds, session, uri);
+ ETRET(lock_destroy(wtext, session, &ws->lock));
+ free(ws->uri);
+ free(ws);
+ break;
+ }
-err: ETRET(unlock(wtext, session, &ws->lock));
+err:
ETRET(unlock(wtext, session, &ds->global_lock));
return (ret);
}
@@ -1649,7 +1476,6 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session,
* Finish initializing the cursor (if the WT_SOURCE structure requires
* initialization, we're going to use the cursor as part of that work).
*/
- cursor->ks = ws->ks;
cursor->ws = ws;
/*
@@ -1657,35 +1483,8 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session,
* using information stored in the master record.
*/
if (!ws->configured) {
- ws->configured = 1;
-
if ((ret = master_uri_get(wtds, session, uri, &value)) != 0)
goto err;
- if ((ret = wtext->config_strget(
- wtext, session, value, "uid", &v)) != 0) {
- ESET(wtext, session, ret,
- "WT_EXTENSION_API.config: uid: %s",
- wtext->strerror(ret));
- goto err;
- }
-
- /*
- * Build packed versions of the unique ID and the next ID (the
- * next ID is what we need to do a "previous" cursor traversal.)
- */
- if ((ret = wtext->struct_size(wtext,
- session, &ws->idlen, "r", v.val)) != 0 ||
- (ret = wtext->struct_pack(wtext,
- session, ws->id, sizeof(ws->id), "r", v.val)) != 0 ||
- (ret = wtext->struct_size(wtext,
- session, &ws->idnextlen, "r", v.val + 1)) != 0 ||
- (ret = wtext->struct_pack(wtext, session,
- ws->idnext, sizeof(ws->idnext), "r", v.val + 1)) != 0) {
- ESET(wtext, session, ret,
- "WT_EXTENSION_API.config: uid: %s",
- wtext->strerror(ret));
- goto err;
- }
if ((ret = wtext->config_strget(
wtext, session, value, "key_format", &v)) != 0) {
@@ -1706,6 +1505,15 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session,
ws->config_bitfield =
v.len == 2 && isdigit(v.str[0]) && v.str[1] == 't';
+ /* Open the underlying KVS namespace. */
+ if ((ws->kvs =
+ kvs_open_namespace(ws->ks->kvs_device, uri)) == NULL) {
+ ESET(wtext, session, WT_ERROR,
+ "kvs_open_namespace: %s: %s",
+ uri, kvs_strerror(os_errno()));
+ goto err;
+ }
+
/*
* If it's a record-number key, read the last record from the
* object and set the allocation record value.
@@ -1723,6 +1531,8 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session,
if ((ret = kvs_cursor_reset(wtcursor)) != 0)
goto err;
}
+
+ ws->configured = 1;
}
/* Increment the open reference count to pin the URI and unlock it. */
@@ -1751,6 +1561,7 @@ kvs_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session,
const char *uri, const char *newuri, WT_CONFIG_ARG *config)
{
DATA_SOURCE *ds;
+ KVS_SOURCE *ks;
WT_EXTENSION_API *wtext;
WT_SOURCE *ws;
int ret = 0;
@@ -1760,9 +1571,14 @@ kvs_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session,
wtext = ds->wtext;
copy = NULL;
- /* Get a locked reference to the WiredTiger source. */
+ /*
+ * Get a locked reference to the data source; hold the global lock as
+ * well, we are going to change the object's name, and we can't allow
+ * other threads to be walking the list and comparing against the name.
+ */
if ((ret = ws_source_open(wtds, session, config, uri, 1, &ws)) != 0)
return (ret);
+ ks = ws->ks;
/* If there are active references to the object, we're busy. */
if (ws->ref != 0) {
@@ -1770,19 +1586,24 @@ kvs_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session,
goto err;
}
- /* Get a copy of the new name to simplify cleanup. */
- if ((copy = strdup(newuri)) == NULL)
- return (os_errno());
-
/* Update the metadata record. */
if ((ret = master_uri_rename(wtds, session, uri, newuri)) != 0)
goto err;
- /* Swap our copy of the name. */
- free(ws->name);
- ws->name = copy;
+ /* Update the structure name. */
+ if ((copy = strdup(newuri)) == NULL)
+ return (os_errno());
+ free(ws->uri);
+ ws->uri = copy;
copy = NULL;
+ /* Rename the KVS namespace. */
+ if ((ret = kvs_rename_namespace(ks->kvs_device, uri, newuri)) != 0) {
+ ESET(wtext, session, WT_ERROR,
+ "kvs_rename_namespace: %s: %s", ws->uri, kvs_strerror(ret));
+ goto err;
+ }
+
err: ETRET(unlock(wtext, session, &ws->lock));
ETRET(unlock(wtext, session, &ds->global_lock));
@@ -1810,13 +1631,20 @@ kvs_session_truncate(WT_DATA_SOURCE *wtds,
if ((ret = ws_source_open(wtds, session, config, uri, 0, &ws)) != 0)
return (ret);
- /*
- * If there are active references to the object, we're busy.
- * Else, discard the records.
- */
- ret = ws->ref == 0 ? ws_truncate(wtds, session, ws) : EBUSY;
+ /* If there are active references to the object, we're busy. */
+ if (ws->ref != 0) {
+ ret = EBUSY;
+ goto err;
+ }
- ETRET(unlock(wtext, session, &ws->lock));
+ /* Truncate the underlying object. */
+ if ((ret = kvs_truncate(ws->kvs)) != 0) {
+ ESET(wtext, session, WT_ERROR,
+ "kvs_truncate: %s: %s", ws->uri, kvs_strerror(ret));
+ goto err;
+ }
+
+err: ETRET(unlock(wtext, session, &ws->lock));
return (ret);
}
@@ -1860,7 +1688,7 @@ kvs_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session)
/* Start a flush on any open KVS sources. */
for (ks = ds->kvs_head; ks != NULL; ks = ks->next)
- if ((tret = kvs_commit(ks->kvs)) != 0)
+ if ((tret = kvs_commit(ks->kvs_device)) != 0)
ESET(wtext, session, WT_ERROR,
"kvs_commit: %s: %s", ks->name, kvs_strerror(tret));
@@ -1871,15 +1699,19 @@ kvs_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session)
ESET(wtext, session, WT_ERROR,
"%s: has open object %s with %u open "
"cursors during close",
- ks->name, ws->name, ws->ref);
+ ks->name, ws->uri, ws->ref);
+ if ((tret = kvs_close(ws->kvs)) != 0)
+ ESET(wtext, session, WT_ERROR,
+ "kvs_close: %s: %s",
+ ws->uri, kvs_strerror(tret));
ks->ws_head = ws->next;
ETRET(lock_destroy(wtext, session, &ws->lock));
- free(ws->name);
+ free(ws->uri);
free(ws);
}
- if ((tret = kvs_close(ks->kvs)) != 0)
+ if ((tret = kvs_close(ks->kvs_device)) != 0)
ESET(wtext, session, WT_ERROR,
"kvs_close: %s: %s", ks->name, kvs_strerror(tret));