summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Bostic <keith@wiredtiger.com>2013-06-09 11:31:35 -0700
committerKeith Bostic <keith@wiredtiger.com>2013-06-09 11:31:35 -0700
commit8a77f8b692d5dca272f9e72c774914096af2df78 (patch)
tree43c3bbc75df6cf6673c83f5a23ce42780d02373f
parent45ef4219704184d30dc4f586ad1fa975cf5642c2 (diff)
parent417bc135b487df3421ee62acee14a80d1f96f150 (diff)
downloadmongo-8a77f8b692d5dca272f9e72c774914096af2df78.tar.gz
Merge pull request #575 from wiredtiger/memrata-txn
KVS library version 4.1 (upgrade to use the KVS name-spaces).
-rw-r--r--ext/test/memrata/memrata.c622
1 files changed, 218 insertions, 404 deletions
diff --git a/ext/test/memrata/memrata.c b/ext/test/memrata/memrata.c
index cadb5cc8766..692e2163f35 100644
--- a/ext/test/memrata/memrata.c
+++ b/ext/test/memrata/memrata.c
@@ -73,54 +73,29 @@
#define KVS_MAJOR 1 /* KVS major, minor version */
#define KVS_MINOR 0
-/*
- * We partition the flat space into a set of objects based on a packed, 8B
- * leading byte. Object IDs from 0 to KVS_MAXID_BASE are reserved for future
- * use -- I can't think of any reason I'd need them, but it's a cheap backup
- * plan.
- */
-#define KVS_MAXID "memrata:WiredTiger.maxid" /* max object ID key */
-#define KVS_MAXID_BASE 6 /* first object ID */
-
-/*
- * Each KVS source supports a set of URIs (named objects). Cursors reference
- * their underlying URI and underlying KVS source.
- */
-typedef struct __uri_source {
- char *name; /* Unique name */
+typedef struct __wt_source {
+ char *uri; /* Unique name */
pthread_rwlock_t lock; /* Lock */
- int configured; /* If URI source configured */
+ int configured; /* If structure configured */
u_int ref; /* Active reference count */
- /*
- * Each object has a unique leading byte prefix, which is the object's
- * ID turned into a packed string (any 8B unsigned value will pack
- * into a maximum of 9 bytes.) We create a packed copy of the object's
- * ID and a packed copy of the ID one greater than the object's ID,
- * the latter is what we use for a "previous" cursor traversal.
- */
-#define KVS_MAX_PACKED_8B 10
- uint8_t id[KVS_MAX_PACKED_8B]; /* Packed unique ID prefix */
- size_t idlen; /* ID prefix length */
- uint8_t idnext[KVS_MAX_PACKED_8B]; /* Packed next ID prefix */
- size_t idnextlen; /* Next ID prefix length */
-
uint64_t append_recno; /* Allocation record number */
int config_recno; /* config "key_format=r" */
int config_bitfield; /* config "value_format=#t" */
+ kvs_t kvs; /* Underlying KVS namespace */
struct __kvs_source *ks; /* Underlying KVS source */
- struct __uri_source *next; /* List of URIs */
-} URI_SOURCE;
+ struct __wt_source *next; /* List of WiredTiger objects */
+} WT_SOURCE;
typedef struct __kvs_source {
char *name; /* Unique name */
- kvs_t kvs; /* Underlying KVS reference */
+ kvs_t kvs_device; /* Underlying KVS store */
- struct __uri_source *uri_head; /* List of URIs */
+ struct __wt_source *ws_head; /* List of WiredTiger sources */
struct __kvs_source *next; /* List of KVS sources */
} KVS_SOURCE;
@@ -130,8 +105,7 @@ typedef struct __cursor {
WT_EXTENSION_API *wtext; /* Extension functions */
- KVS_SOURCE *ks; /* Underlying KVS source */
- URI_SOURCE *us; /* Underlying URI */
+ WT_SOURCE *ws; /* WiredTiger source */
struct kvs_record record; /* Record */
uint8_t key[KVS_MAX_KEY_LEN]; /* key, value */
@@ -232,28 +206,19 @@ copyin_key(WT_CURSOR *wtcursor, int allocate_key)
{
struct kvs_record *r;
CURSOR *cursor;
- URI_SOURCE *us;
WT_EXTENSION_API *wtext;
WT_SESSION *session;
+ WT_SOURCE *ws;
size_t i, size;
- uint8_t *p, *t;
int ret = 0;
session = wtcursor->session;
cursor = (CURSOR *)wtcursor;
- us = cursor->us;
+ ws = cursor->ws;
wtext = cursor->wtext;
r = &cursor->record;
- r->key = cursor->key;
- r->key_len = 0;
-
- /* Prefix the key with the object's unique ID. */
- for (p = us->id, t = r->key, i = 0; i < us->idlen; ++i)
- *t++ = *p++;
- r->key_len += us->idlen;
-
- if (us->config_recno) {
+ if (ws->config_recno) {
/*
* Allocate a new record for append operations.
*
@@ -272,33 +237,39 @@ copyin_key(WT_CURSOR *wtcursor, int allocate_key)
* not quite right.
*/
if (allocate_key && cursor->config_append) {
- if ((ret = writelock(wtext, session, &us->lock)) != 0)
+ if ((ret = writelock(wtext, session, &ws->lock)) != 0)
return (ret);
- wtcursor->recno = ++us->append_recno;
- if ((ret = unlock(wtext, session, &us->lock)) != 0)
+ wtcursor->recno = ++ws->append_recno;
+ if ((ret = unlock(wtext, session, &ws->lock)) != 0)
return (ret);
- } else if (wtcursor->recno > us->append_recno) {
- if ((ret = writelock(wtext, session, &us->lock)) != 0)
+ } else if (wtcursor->recno > ws->append_recno) {
+ if ((ret = writelock(wtext, session, &ws->lock)) != 0)
return (ret);
- if (wtcursor->recno > us->append_recno)
- us->append_recno = wtcursor->recno;
- if ((ret = unlock(wtext, session, &us->lock)) != 0)
+ if (wtcursor->recno > ws->append_recno)
+ ws->append_recno = wtcursor->recno;
+ if ((ret = unlock(wtext, session, &ws->lock)) != 0)
return (ret);
}
if ((ret = wtext->struct_size(wtext, session,
&size, "r", wtcursor->recno)) != 0 ||
- (ret = wtext->struct_pack(wtext, session, t,
- sizeof(r->key) - us->idlen, "r", wtcursor->recno)) != 0)
+ (ret = wtext->struct_pack(wtext, session, cursor->key,
+ sizeof(cursor->key), "r", wtcursor->recno)) != 0)
return (ret);
- r->key_len += size;
+ r->key = cursor->key;
+ r->key_len = size;
} else {
- if (wtcursor->key.size + us->idlen > KVS_MAX_KEY_LEN)
+ if (wtcursor->key.size > KVS_MAX_KEY_LEN)
ERET(wtext, session, ERANGE,
"key size of %" PRIuMAX " is too large",
(uintmax_t)wtcursor->key.size);
- memcpy(t, wtcursor->key.data, wtcursor->key.size);
- r->key_len += wtcursor->key.size;
+
+ /*
+ * XXX
+ * The underlying KVS library data fields aren't const.
+ */
+ r->key = (char *)wtcursor->key.data;
+ r->key_len = wtcursor->key.size;
}
return (0);
}
@@ -312,38 +283,24 @@ copyout_key(WT_CURSOR *wtcursor)
{
struct kvs_record *r;
CURSOR *cursor;
- URI_SOURCE *us;
WT_EXTENSION_API *wtext;
WT_SESSION *session;
- size_t i, len;
- uint8_t *p, *t;
+ WT_SOURCE *ws;
int ret = 0;
session = wtcursor->session;
cursor = (CURSOR *)wtcursor;
- us = cursor->us;
wtext = cursor->wtext;
+ ws = cursor->ws;
r = &cursor->record;
-
- /*
- * Check the object's unique ID, if it doesn't match we've hit the end
- * of the object on a cursor search.
- */
- if (r->key_len < us->idlen)
- return (WT_NOTFOUND);
- for (p = us->id, t = r->key, i = 0; i < us->idlen; ++i)
- if (*t++ != *p++)
- return (WT_NOTFOUND);
- len = r->key_len - us->idlen;
-
- if (us->config_recno) {
- if ((ret = wtext->struct_unpack(
- wtext, session, t, len, "r", &wtcursor->recno)) != 0)
+ if (ws->config_recno) {
+ if ((ret = wtext->struct_unpack(wtext,
+ session, r->key, r->key_len, "r", &wtcursor->recno)) != 0)
return (ret);
} else {
- wtcursor->key.data = t;
- wtcursor->key.size = (uint32_t)len;
+ wtcursor->key.data = r->key;
+ wtcursor->key.size = r->key_len;
}
return (0);
}
@@ -475,7 +432,7 @@ kvs_call(WT_CURSOR *wtcursor, const char *fname,
cursor = (CURSOR *)wtcursor;
wtext = cursor->wtext;
- kvs = cursor->ks->kvs;
+ kvs = cursor->ws->kvs;
cursor->record.val = cursor->val;
restart:
@@ -523,28 +480,6 @@ restart:
}
/*
- * nextprev_init --
- * Initialize a cursor for a next/previous traversal.
- */
-static INLINE void
-nextprev_init(WT_CURSOR *wtcursor, uint8_t *id, size_t idlen)
-{
- CURSOR *cursor;
- size_t i;
- uint8_t *p, *t;
-
- cursor = (CURSOR *)wtcursor;
-
- /* Prefix the key with a unique ID. */
- for (p = id,
- t = cursor->key, i = 0; i < idlen; ++i)
- *t++ = *p++;
-
- cursor->record.key = cursor->key;
- cursor->record.key_len = idlen;
-}
-
-/*
* kvs_cursor_next --
* WT_CURSOR::next method.
*/
@@ -552,18 +487,11 @@ static int
kvs_cursor_next(WT_CURSOR *wtcursor)
{
CURSOR *cursor;
- URI_SOURCE *us;
+ WT_SOURCE *ws;
int ret = 0;
cursor = (CURSOR *)wtcursor;
- us = cursor->us;
-
- /*
- * If this is the start of a new traversal, set the key to the first
- * possible record for the object.
- */
- if (cursor->record.key_len == 0)
- nextprev_init(wtcursor, us->id, us->idlen);
+ ws = cursor->ws;
if ((ret = copy_key(wtcursor)) != 0)
return (ret);
@@ -584,18 +512,11 @@ static int
kvs_cursor_prev(WT_CURSOR *wtcursor)
{
CURSOR *cursor;
- URI_SOURCE *us;
+ WT_SOURCE *ws;
int ret = 0;
cursor = (CURSOR *)wtcursor;
- us = cursor->us;
-
- /*
- * If this is the start of a new traversal, set the key to the first
- * possible record after the object.
- */
- if (cursor->record.key_len == 0)
- nextprev_init(wtcursor, us->idnext, us->idnextlen);
+ ws = cursor->ws;
if ((ret = copy_key(wtcursor)) != 0)
return (ret);
@@ -696,15 +617,15 @@ static int
kvs_cursor_insert(WT_CURSOR *wtcursor)
{
CURSOR *cursor;
- KVS_SOURCE *ks;
WT_EXTENSION_API *wtext;
WT_SESSION *session;
+ WT_SOURCE *ws;
int ret = 0;
session = wtcursor->session;
cursor = (CURSOR *)wtcursor;
wtext = cursor->wtext;
- ks = cursor->ks;
+ ws = cursor->ws;
if ((ret = copyin_key(wtcursor, 1)) != 0)
return (ret);
@@ -719,11 +640,11 @@ kvs_cursor_insert(WT_CURSOR *wtcursor)
* does not exist, fail if it does exist), maps to kvs_add.
*/
if (cursor->config_overwrite) {
- if ((ret = kvs_set(ks->kvs, &cursor->record)) != 0)
+ if ((ret = kvs_set(ws->kvs, &cursor->record)) != 0)
ERET(wtext, session, WT_ERROR,
"kvs_set: %s", kvs_strerror(ret));
} else
- if ((ret = kvs_add(ks->kvs, &cursor->record)) != 0) {
+ if ((ret = kvs_add(ws->kvs, &cursor->record)) != 0) {
if (ret == KVS_E_KEY_EXISTS)
return (WT_DUPLICATE_KEY);
ERET(wtext, session, WT_ERROR,
@@ -740,15 +661,15 @@ static int
kvs_cursor_update(WT_CURSOR *wtcursor)
{
CURSOR *cursor;
- KVS_SOURCE *ks;
WT_EXTENSION_API *wtext;
WT_SESSION *session;
+ WT_SOURCE *ws;
int ret = 0;
session = wtcursor->session;
cursor = (CURSOR *)wtcursor;
wtext = cursor->wtext;
- ks = cursor->ks;
+ ws = cursor->ws;
if ((ret = copyin_key(wtcursor, 0)) != 0)
return (ret);
@@ -759,7 +680,7 @@ kvs_cursor_update(WT_CURSOR *wtcursor)
* WT_CURSOR::update (update the record if it does exist, fail if it
* does not exist), maps to kvs_replace.
*/
- if ((ret = kvs_replace(ks->kvs, &cursor->record)) != 0)
+ if ((ret = kvs_replace(ws->kvs, &cursor->record)) != 0)
ERET(wtext,
session, WT_ERROR, "kvs_replace: %s", kvs_strerror(ret));
@@ -774,23 +695,21 @@ static int
kvs_cursor_remove(WT_CURSOR *wtcursor)
{
CURSOR *cursor;
- KVS_SOURCE *ks;
- URI_SOURCE *us;
WT_EXTENSION_API *wtext;
WT_SESSION *session;
+ WT_SOURCE *ws;
int ret = 0;
session = wtcursor->session;
cursor = (CURSOR *)wtcursor;
wtext = cursor->wtext;
- ks = cursor->ks;
- us = cursor->us;
+ ws = cursor->ws;
/*
* WiredTiger's "remove" of a bitfield is really an update with a value
* of a single byte of zero.
*/
- if (us->config_bitfield) {
+ if (ws->config_bitfield) {
wtcursor->value.size = 1;
wtcursor->value.data = "\0";
return (kvs_cursor_update(wtcursor));
@@ -798,7 +717,7 @@ kvs_cursor_remove(WT_CURSOR *wtcursor)
if ((ret = copyin_key(wtcursor, 0)) != 0)
return (ret);
- if ((ret = kvs_del(ks->kvs, &cursor->record)) == 0)
+ if ((ret = kvs_del(ws->kvs, &cursor->record)) == 0)
return (0);
if (ret == KVS_E_KEY_NOT_FOUND)
return (WT_NOTFOUND);
@@ -813,20 +732,20 @@ static int
kvs_cursor_close(WT_CURSOR *wtcursor)
{
CURSOR *cursor;
- URI_SOURCE *us;
WT_EXTENSION_API *wtext;
WT_SESSION *session;
+ WT_SOURCE *ws;
int ret = 0;
session = wtcursor->session;
cursor = (CURSOR *)wtcursor;
wtext = cursor->wtext;
- us = cursor->us;
+ ws = cursor->ws;
- if ((ret = writelock(wtext, session, &us->lock)) != 0)
+ if ((ret = writelock(wtext, session, &ws->lock)) != 0)
goto err;
- --us->ref;
- if ((ret = unlock(wtext, session, &us->lock)) != 0)
+ --ws->ref;
+ if ((ret = unlock(wtext, session, &ws->lock)) != 0)
goto err;
err: free(cursor->val);
@@ -850,13 +769,14 @@ static const KVS_OPTIONS kvs_options[] = {
* struct kvs_config configuration
*/
{ "kvs_parallelism=64", "int", "min=1,max=512" },
- { "kvs_granularity=4000000", "int", "min=1000000,max=10000000" },
+ { "kvs_granularity=2M", "int", "min=1M,max=10M" },
{ "kvs_avg_key_len=16", "int", "min=10,max=512" },
- { "kvs_avg_val_len=16", "int", "min=10,max=51" },
+ { "kvs_avg_val_len=100", "int", "min=10,max=1M" },
{ "kvs_write_bufs=32", "int", "min=16,max=256" },
- { "kvs_read_bufs=64", "int", "min=16,max=256" },
- { "kvs_commit_timeout=1000000", "int", "min=100,max=10000000" },
+ { "kvs_read_bufs=2048", "int", "min=16,max=1M" },
+ { "kvs_commit_timeout=5M", "int", "min=100,max=10M" },
{ "kvs_reclaim_threshold=60", "int", "min=1,max=80" },
+ { "kvs_reclaim_period=1000", "int", "min=100,max=10K" },
/*
* KVS_O_FLAG flag configuration
@@ -1031,6 +951,7 @@ kvs_config_read(WT_EXTENSION_API *wtext,
KVS_CONFIG_SET("kvs_read_bufs", read_bufs);
KVS_CONFIG_SET("kvs_commit_timeout", commit_timeout);
KVS_CONFIG_SET("kvs_reclaim_threshold", reclaim_threshold);
+ KVS_CONFIG_SET("kvs_reclaim_period", reclaim_period);
#define KVS_FLAG_SET(n, f) \
if (strcmp(name, n) == 0) { \
@@ -1038,10 +959,13 @@ kvs_config_read(WT_EXTENSION_API *wtext,
*flagsp |= f; \
continue; \
}
+ /*
+ * We don't export KVS_O_CREATE: WT_SESSION.create always adds
+ * it in.
+ */
KVS_FLAG_SET("kvs_open_o_debug", KVS_O_DEBUG);
- KVS_FLAG_SET("kvs_open_o_reclaim", KVS_O_SCAN);
- KVS_FLAG_SET("kvs_open_o_scan", KVS_O_RECLAIM);
- KVS_FLAG_SET("kvs_open_o_truncate", KVS_O_CREATE);
+ KVS_FLAG_SET("kvs_open_o_scan", KVS_O_SCAN);
+ KVS_FLAG_SET("kvs_open_o_truncate", KVS_O_TRUNCATE);
}
return (0);
}
@@ -1161,29 +1085,10 @@ kvs_source_open(WT_DATA_SOURCE *wtds,
ks->name = devices;
devices = NULL;
- /*
- * Open the KVS handle last, so cleanup is easier: we don't have any way
- * to say "create the object if it doesn't exist", and since the create
- * flag destroys the underlying store, we can't just always set it. If
- * we fail with "invalid volume", try again with a create flag.
- */
-#if defined(KVS_O_TRUNCATE)
- This is fragile: once KVS_O_CREATE changes to not always destroy the
- store, we can set it all the time, and whatever new flag destroys the
- store should get pushed out into the api as a new kvs flag. (See the
- above setting of KVS_O_CREATE for the kvs_open_o_truncate option.)
-#endif
- ks->kvs = kvs_open(device_list, &kvs_config, flags);
- if (ks->kvs == NULL)
- ret = os_errno();
- if (ret == KVS_E_VOL_INVALID) {
- ret = 0;
- flags |= KVS_O_CREATE;
- ks->kvs = kvs_open(device_list, &kvs_config, flags);
- if (ks->kvs == NULL)
- ret = os_errno();
- }
- if (ks->kvs == NULL) {
+ /* Open the KVS handle last, so cleanup is easier. */
+ ks->kvs_device =
+ kvs_open(device_list, &kvs_config, flags | KVS_O_CREATE);
+ if (ks->kvs_device == NULL) {
ESET(wtext, session, WT_ERROR,
"kvs_open: %s: %s", ks->name, kvs_strerror(os_errno()));
goto err;
@@ -1213,17 +1118,18 @@ err: if (locked)
}
/*
- * uri_source_open --
- * Return a locked URI, allocating and opening if it doesn't already exist.
+ * ws_source_open --
+ * Return a locked WiredTiger source, allocating and opening if it doesn't
+ * already exist.
*/
static int
-uri_source_open(WT_DATA_SOURCE *wtds, WT_SESSION *session,
- WT_CONFIG_ARG *config, const char *uri, int hold_global, URI_SOURCE **refp)
+ws_source_open(WT_DATA_SOURCE *wtds, WT_SESSION *session,
+ WT_CONFIG_ARG *config, const char *uri, int hold_global, WT_SOURCE **refp)
{
DATA_SOURCE *ds;
KVS_SOURCE *ks;
- URI_SOURCE *us;
WT_EXTENSION_API *wtext;
+ WT_SOURCE *ws;
int lockinit, ret = 0;
*refp = NULL;
@@ -1237,38 +1143,38 @@ uri_source_open(WT_DATA_SOURCE *wtds, WT_SESSION *session,
return (ret);
/* Check for a match: if we find one, we're done. */
- for (us = ks->uri_head; us != NULL; us = us->next)
- if (strcmp(us->name, uri) == 0)
+ for (ws = ks->ws_head; ws != NULL; ws = ws->next)
+ if (strcmp(ws->uri, uri) == 0)
goto done;
- /* Allocate and initialize a new underlying URI source object. */
- if ((us = calloc(1, sizeof(*us))) == NULL ||
- (us->name = strdup(uri)) == NULL) {
+ /* Allocate and initialize a new underlying WiredTiger source object. */
+ if ((ws = calloc(1, sizeof(*ws))) == NULL ||
+ (ws->uri = strdup(uri)) == NULL) {
ret = os_errno();
goto err;
}
- if ((ret = lock_init(wtext, session, &us->lock)) != 0)
+ if ((ret = lock_init(wtext, session, &ws->lock)) != 0)
goto err;
lockinit = 1;
- us->ks = ks;
+ ws->ks = ks;
/* Insert the new entry at the head of the list. */
- us->next = ks->uri_head;
- ks->uri_head = us;
+ ws->next = ks->ws_head;
+ ks->ws_head = ws;
/* Return the locked object. */
-done: if ((ret = writelock(wtext, session, &us->lock)) != 0)
+done: if ((ret = writelock(wtext, session, &ws->lock)) != 0)
goto err;
- *refp = us;
- us = NULL;
+ *refp = ws;
+ ws = NULL;
if (0) {
err: if (lockinit)
- ETRET(lock_destroy(wtext, session, &us->lock));
- if (us != NULL) {
- free(us->name);
- free(us);
+ ETRET(lock_destroy(wtext, session, &ws->lock));
+ if (ws != NULL) {
+ free(ws->uri);
+ free(ws);
}
}
@@ -1279,105 +1185,6 @@ err: if (lockinit)
}
/*
- * uri_truncate --
- * Discard the records for an object.
- */
-static int
-uri_truncate(WT_DATA_SOURCE *wtds, WT_SESSION *session, URI_SOURCE *us)
-{
- struct kvs_record *r, _r;
- DATA_SOURCE *ds;
- WT_EXTENSION_API *wtext;
- kvs_t kvs;
- size_t i;
- int ret = 0;
- uint8_t *p, *t;
- char key[KVS_MAX_KEY_LEN];
-
- ds = (DATA_SOURCE *)wtds;
- wtext = ds->wtext;
- kvs = us->ks->kvs;
-
- /* Walk the list of objects, discarding them all. */
- r = &_r;
- memset(r, 0, sizeof(*r));
- r->key = key;
- memcpy(r->key, us->id, us->idlen);
- r->key_len = us->idlen;
- r->val = NULL;
- r->val_len = 0;
- while ((ret = kvs_next(kvs, r, 0UL, 0UL)) == 0) {
- /*
- * Check for an object ID mismatch, if we find one, we're
- * done.
- */
- for (p = us->id, t = r->key, i = 0; i < us->idlen; ++i)
- if (*t++ != *p++)
- return (0);
- if ((ret = kvs_del(kvs, r)) != 0) {
- ESET(wtext, session,
- WT_ERROR, "kvs_del: %s", kvs_strerror(ret));
- return (ret);
- }
- }
- if (ret == KVS_E_KEY_NOT_FOUND)
- ret = 0;
- if (ret != 0)
- ESET(wtext, session,
- WT_ERROR, "kvs_next: %s", kvs_strerror(ret));
- return (ret);
-}
-
-/*
- * master_id_get --
- * Return the maximum file ID in the system.
- */
-static int
-master_id_get(WT_DATA_SOURCE *wtds, WT_SESSION *session, uint64_t *maxidp)
-{
- DATA_SOURCE *ds;
- WT_EXTENSION_API *wtext;
- int ret = 0;
- const char *value;
-
- ds = (DATA_SOURCE *)wtds;
- wtext = ds->wtext;
-
- if ((ret =
- wtext->metadata_search(wtext, session, KVS_MAXID, &value)) == 0) {
- *maxidp = strtouq(value, NULL, 10);
- return (0);
- }
- if (ret == WT_NOTFOUND) {
- *maxidp = KVS_MAXID_BASE;
- return (0);
- }
- ERET(wtext, session, ret, "%s: %s", KVS_MAXID, wtext->strerror(ret));
-}
-
-/*
- * master_id_set --
- * Increment the maximum file ID in the system.
- */
-static int
-master_id_set(WT_DATA_SOURCE *wtds, WT_SESSION *session, uint64_t maxid)
-{
- DATA_SOURCE *ds;
- WT_EXTENSION_API *wtext;
- int ret = 0;
- char value[32]; /* Large enough for any 8B value. */
-
- ds = (DATA_SOURCE *)wtds;
- wtext = ds->wtext;
-
- (void)snprintf(value, sizeof(value), "%" PRIu64, maxid);
- if ((ret =
- wtext->metadata_update(wtext, session, KVS_MAXID, value)) == 0)
- return (0);
- ERET(wtext, session, ret, "%s: %s", KVS_MAXID, wtext->strerror(ret));
-}
-
-/*
* master_uri_get --
* Get the KVS master record for a URI.
*/
@@ -1425,11 +1232,12 @@ master_uri_rename(WT_DATA_SOURCE *wtds,
ds = (DATA_SOURCE *)wtds;
wtext = ds->wtext;
+ value = NULL;
/* Insert the record under a new name. */
if ((ret = master_uri_get(wtds, session, uri, &value)) != 0 ||
(ret = wtext->metadata_insert(wtext, session, newuri, value)) != 0)
- return (ret);
+ goto err;
/*
* Remove the original record, and if that fails, attempt to remove
@@ -1437,6 +1245,8 @@ master_uri_rename(WT_DATA_SOURCE *wtds,
*/
if ((ret = wtext->metadata_remove(wtext, session, uri)) != 0)
(void)wtext->metadata_remove(wtext, session, newuri);
+
+err: free((void *)value);
return (ret);
}
@@ -1451,18 +1261,12 @@ master_uri_set(WT_DATA_SOURCE *wtds,
DATA_SOURCE *ds;
WT_CONFIG_ITEM a, b;
WT_EXTENSION_API *wtext;
- uint64_t maxid;
int exclusive, ret = 0;
char value[1024];
ds = (DATA_SOURCE *)wtds;
wtext = ds->wtext;
- /* Get the maximum file ID. */
- if ((ret = master_id_get(wtds, session, &maxid)) != 0)
- return (ret);
- ++maxid;
-
exclusive = 0;
if ((ret =
wtext->config_get(wtext, session, config, "exclusive", &a)) == 0)
@@ -1499,17 +1303,14 @@ master_uri_set(WT_DATA_SOURCE *wtds,
* update the master ID record.
*/
(void)snprintf(value, sizeof(value),
- "uid=%" PRIu64
",version=(major=%d,minor=%d)"
",key_format=%.*s,value_format=%.*s",
- maxid, KVS_MAJOR, KVS_MINOR, (int)a.len, a.str, (int)b.len, b.str);
+ KVS_MAJOR, KVS_MINOR, (int)a.len, a.str, (int)b.len, b.str);
if ((ret = wtext->metadata_insert(wtext, session, uri, value)) == 0)
- return (master_id_set(wtds, session, maxid));
-
+ return (0);
if (ret == WT_DUPLICATE_KEY)
return (exclusive ? EEXIST : 0);
-
- ERET(wtext, session, ret, "%s: %s", KVS_MAXID, wtext->strerror(ret));
+ ERET(wtext, session, ret, "%s: %s", uri, wtext->strerror(ret));
}
/*
@@ -1521,22 +1322,22 @@ kvs_session_create(WT_DATA_SOURCE *wtds,
WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
{
DATA_SOURCE *ds;
- URI_SOURCE *us;
WT_EXTENSION_API *wtext;
+ WT_SOURCE *ws;
int ret = 0;
ds = (DATA_SOURCE *)wtds;
wtext = ds->wtext;
- /* Get a locked reference to the URI. */
- if ((ret = uri_source_open(wtds, session, config, uri, 0, &us)) != 0)
+ /* Get a locked reference to the WiredTiger source. */
+ if ((ret = ws_source_open(wtds, session, config, uri, 0, &ws)) != 0)
return (ret);
/* Create the URI master record if it doesn't already exist. */
ret = master_uri_set(wtds, session, uri, config);
- /* Unlock the URI. */
- ETRET(unlock(wtext, session, &us->lock));
+ /* Unlock the WiredTiger source. */
+ ETRET(unlock(wtext, session, &ws->lock));
return (ret);
}
@@ -1551,49 +1352,57 @@ kvs_session_drop(WT_DATA_SOURCE *wtds,
{
DATA_SOURCE *ds;
KVS_SOURCE *ks;
- URI_SOURCE **p, *us;
WT_EXTENSION_API *wtext;
+ WT_SOURCE **p, *ws;
int ret = 0;
ds = (DATA_SOURCE *)wtds;
wtext = ds->wtext;
- /* Get a locked reference to the data source. */
- if ((ret = uri_source_open(wtds, session, config, uri, 1, &us)) != 0)
+ /*
+ * Get a locked reference to the data source; hold the global lock as
+ * well, we are going to change the list of objects for a KVS store.
+ */
+ if ((ret = ws_source_open(wtds, session, config, uri, 1, &ws)) != 0)
return (ret);
- ks = us->ks;
+ ks = ws->ks;
/* If there are active references to the object, we're busy. */
- if (us->ref != 0) {
+ if (ws->ref != 0) {
ret = EBUSY;
goto err;
}
- /* Discard all of the rows in the object. */
- if ((ret = uri_truncate(wtds, session, us)) != 0)
+ /* Close and delete the underlying object. */
+ if ((ret = kvs_close(ws->kvs)) != 0) {
+ ESET(wtext, session, WT_ERROR,
+ "kvs_close: %s: %s", ws->uri, kvs_strerror(ret));
goto err;
-
- /*
- * Remove the entry from the URI_SOURCE list -- it's a singly-linked
- * list, find the reference to it.
- */
- for (p = &ks->uri_head; *p != NULL; p = &(*p)->next)
- if (*p == us)
- break;
- /*
- * We should be guaranteed to find an entry, we just looked it up and
- * everything is locked down.
- */
- if (*p == NULL) {
- ret = WT_NOTFOUND;
+ }
+ if ((ret = kvs_delete_namespace(ks->kvs_device, ws->uri)) != 0) {
+ ESET(wtext, session, WT_ERROR,
+ "kvs_delete_namespace: %s: %s", ws->uri, kvs_strerror(ret));
goto err;
}
- *p = (*p)->next;
/* Discard the metadata entry. */
ret = master_uri_drop(wtds, session, uri);
-err: ETRET(unlock(wtext, session, &us->lock));
+ /*
+ * Remove the entry from the WT_SOURCE list -- it's a singly-linked
+ * list, find the reference to it.
+ */
+ for (p = &ks->ws_head; *p != NULL; p = &(*p)->next)
+ if (*p == ws) {
+ *p = (*p)->next;
+
+ ETRET(lock_destroy(wtext, session, &ws->lock));
+ free(ws->uri);
+ free(ws);
+ break;
+ }
+
+err:
ETRET(unlock(wtext, session, &ds->global_lock));
return (ret);
}
@@ -1608,10 +1417,10 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session,
{
CURSOR *cursor;
DATA_SOURCE *ds;
- URI_SOURCE *us;
WT_CONFIG_ITEM v;
WT_CURSOR *wtcursor;
WT_EXTENSION_API *wtext;
+ WT_SOURCE *ws;
int ret = 0;
const char *value;
@@ -1659,51 +1468,23 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session,
}
cursor->config_overwrite = v.val != 0;
- /* Get a locked reference to the URI. */
- if ((ret = uri_source_open(wtds, session, config, uri, 0, &us)) != 0)
+ /* Get a locked reference to the WiredTiger source. */
+ if ((ret = ws_source_open(wtds, session, config, uri, 0, &ws)) != 0)
goto err;
/*
- * Finish initializing the cursor (if the URI_SOURCE structure requires
+ * Finish initializing the cursor (if the WT_SOURCE structure requires
* initialization, we're going to use the cursor as part of that work).
*/
- cursor->ks = us->ks;
- cursor->us = us;
+ cursor->ws = ws;
/*
* If this is the first access to the URI, we have to configure it
* using information stored in the master record.
*/
- if (!us->configured) {
- us->configured = 1;
-
+ if (!ws->configured) {
if ((ret = master_uri_get(wtds, session, uri, &value)) != 0)
goto err;
- if ((ret = wtext->config_strget(
- wtext, session, value, "uid", &v)) != 0) {
- ESET(wtext, session, ret,
- "WT_EXTENSION_API.config: uid: %s",
- wtext->strerror(ret));
- goto err;
- }
-
- /*
- * Build packed versions of the unique ID and the next ID (the
- * next ID is what we need to do a "previous" cursor traversal.)
- */
- if ((ret = wtext->struct_size(wtext,
- session, &us->idlen, "r", v.val)) != 0 ||
- (ret = wtext->struct_pack(wtext,
- session, us->id, sizeof(us->id), "r", v.val)) != 0 ||
- (ret = wtext->struct_size(wtext,
- session, &us->idnextlen, "r", v.val + 1)) != 0 ||
- (ret = wtext->struct_pack(wtext, session,
- us->idnext, sizeof(us->idnext), "r", v.val + 1)) != 0) {
- ESET(wtext, session, ret,
- "WT_EXTENSION_API.config: uid: %s",
- wtext->strerror(ret));
- goto err;
- }
if ((ret = wtext->config_strget(
wtext, session, value, "key_format", &v)) != 0) {
@@ -1712,7 +1493,7 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session,
wtext->strerror(ret));
goto err;
}
- us->config_recno = v.len == 1 && v.str[0] == 'r';
+ ws->config_recno = v.len == 1 && v.str[0] == 'r';
if ((ret = wtext->config_strget(
wtext, session, value, "value_format", &v)) != 0) {
@@ -1721,31 +1502,42 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session,
wtext->strerror(ret));
goto err;
}
- us->config_bitfield =
+ ws->config_bitfield =
v.len == 2 && isdigit(v.str[0]) && v.str[1] == 't';
+ /* Open the underlying KVS namespace. */
+ if ((ws->kvs =
+ kvs_open_namespace(ws->ks->kvs_device, uri)) == NULL) {
+ ESET(wtext, session, WT_ERROR,
+ "kvs_open_namespace: %s: %s",
+ uri, kvs_strerror(os_errno()));
+ goto err;
+ }
+
/*
* If it's a record-number key, read the last record from the
* object and set the allocation record value.
*/
- if (us->config_recno) {
+ if (ws->config_recno) {
wtcursor = (WT_CURSOR *)cursor;
if ((ret = kvs_cursor_reset(wtcursor)) != 0)
goto err;
if ((ret = kvs_cursor_prev(wtcursor)) == 0)
- us->append_recno = wtcursor->recno;
+ ws->append_recno = wtcursor->recno;
else if (ret != WT_NOTFOUND)
goto err;
if ((ret = kvs_cursor_reset(wtcursor)) != 0)
goto err;
}
+
+ ws->configured = 1;
}
/* Increment the open reference count to pin the URI and unlock it. */
- ++us->ref;
- if ((ret = unlock(wtext, session, &us->lock)) != 0)
+ ++ws->ref;
+ if ((ret = unlock(wtext, session, &ws->lock)) != 0)
goto err;
*new_cursor = (WT_CURSOR *)cursor;
@@ -1769,8 +1561,9 @@ kvs_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session,
const char *uri, const char *newuri, WT_CONFIG_ARG *config)
{
DATA_SOURCE *ds;
- URI_SOURCE *us;
+ KVS_SOURCE *ks;
WT_EXTENSION_API *wtext;
+ WT_SOURCE *ws;
int ret = 0;
char *copy;
@@ -1778,30 +1571,40 @@ kvs_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session,
wtext = ds->wtext;
copy = NULL;
- /* Get a locked reference to the URI. */
- if ((ret = uri_source_open(wtds, session, config, uri, 1, &us)) != 0)
+ /*
+ * Get a locked reference to the data source; hold the global lock as
+ * well, we are going to change the object's name, and we can't allow
+ * other threads to be walking the list and comparing against the name.
+ */
+ if ((ret = ws_source_open(wtds, session, config, uri, 1, &ws)) != 0)
return (ret);
+ ks = ws->ks;
/* If there are active references to the object, we're busy. */
- if (us->ref != 0) {
+ if (ws->ref != 0) {
ret = EBUSY;
goto err;
}
- /* Get a copy of the new name to simplify cleanup. */
- if ((copy = strdup(newuri)) == NULL)
- return (os_errno());
-
/* Update the metadata record. */
if ((ret = master_uri_rename(wtds, session, uri, newuri)) != 0)
goto err;
- /* Swap our copy of the name. */
- free(us->name);
- us->name = copy;
+ /* Update the structure name. */
+ if ((copy = strdup(newuri)) == NULL)
+ return (os_errno());
+ free(ws->uri);
+ ws->uri = copy;
copy = NULL;
-err: ETRET(unlock(wtext, session, &us->lock));
+ /* Rename the KVS namespace. */
+ if ((ret = kvs_rename_namespace(ks->kvs_device, uri, newuri)) != 0) {
+ ESET(wtext, session, WT_ERROR,
+ "kvs_rename_namespace: %s: %s", ws->uri, kvs_strerror(ret));
+ goto err;
+ }
+
+err: ETRET(unlock(wtext, session, &ws->lock));
ETRET(unlock(wtext, session, &ds->global_lock));
free(copy);
@@ -1817,24 +1620,31 @@ kvs_session_truncate(WT_DATA_SOURCE *wtds,
WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
{
DATA_SOURCE *ds;
- URI_SOURCE *us;
WT_EXTENSION_API *wtext;
+ WT_SOURCE *ws;
int ret = 0;
ds = (DATA_SOURCE *)wtds;
wtext = ds->wtext;
- /* Get a locked reference to the URI. */
- if ((ret = uri_source_open(wtds, session, config, uri, 0, &us)) != 0)
+ /* Get a locked reference to the WiredTiger source. */
+ if ((ret = ws_source_open(wtds, session, config, uri, 0, &ws)) != 0)
return (ret);
- /*
- * If there are active references to the object, we're busy.
- * Else, discard the records.
- */
- ret = us->ref == 0 ? uri_truncate(wtds, session, us) : EBUSY;
+ /* If there are active references to the object, we're busy. */
+ if (ws->ref != 0) {
+ ret = EBUSY;
+ goto err;
+ }
- ETRET(unlock(wtext, session, &us->lock));
+ /* Truncate the underlying object. */
+ if ((ret = kvs_truncate(ws->kvs)) != 0) {
+ ESET(wtext, session, WT_ERROR,
+ "kvs_truncate: %s: %s", ws->uri, kvs_strerror(ret));
+ goto err;
+ }
+
+err: ETRET(unlock(wtext, session, &ws->lock));
return (ret);
}
@@ -1867,8 +1677,8 @@ kvs_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session)
{
DATA_SOURCE *ds;
KVS_SOURCE *ks;
- URI_SOURCE *us;
WT_EXTENSION_API *wtext;
+ WT_SOURCE *ws;
int tret, ret = 0;
ds = (DATA_SOURCE *)wtds;
@@ -1878,26 +1688,30 @@ kvs_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session)
/* Start a flush on any open KVS sources. */
for (ks = ds->kvs_head; ks != NULL; ks = ks->next)
- if ((tret = kvs_commit(ks->kvs)) != 0)
+ if ((tret = kvs_commit(ks->kvs_device)) != 0)
ESET(wtext, session, WT_ERROR,
"kvs_commit: %s: %s", ks->name, kvs_strerror(tret));
/* Close and discard all objects. */
while ((ks = ds->kvs_head) != NULL) {
- while ((us = ks->uri_head) != NULL) {
- if (us->ref != 0)
+ while ((ws = ks->ws_head) != NULL) {
+ if (ws->ref != 0)
ESET(wtext, session, WT_ERROR,
"%s: has open object %s with %u open "
"cursors during close",
- ks->name, us->name, us->ref);
+ ks->name, ws->uri, ws->ref);
+ if ((tret = kvs_close(ws->kvs)) != 0)
+ ESET(wtext, session, WT_ERROR,
+ "kvs_close: %s: %s",
+ ws->uri, kvs_strerror(tret));
- ks->uri_head = us->next;
- ETRET(lock_destroy(wtext, session, &us->lock));
- free(us->name);
- free(us);
+ ks->ws_head = ws->next;
+ ETRET(lock_destroy(wtext, session, &ws->lock));
+ free(ws->uri);
+ free(ws);
}
- if ((tret = kvs_close(ks->kvs)) != 0)
+ if ((tret = kvs_close(ks->kvs_device)) != 0)
ESET(wtext, session, WT_ERROR,
"kvs_close: %s: %s", ks->name, kvs_strerror(tret));
@@ -1947,7 +1761,7 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
wtext = connection->get_extension_api(connection);
/* Check the library version */
-#if KVS_VERSION_MAJOR != 2 || KVS_VERSION_MINOR != 8
+#if KVS_VERSION_MAJOR != 4 || KVS_VERSION_MINOR != 1
ERET(wtext, NULL, EINVAL,
"unsupported KVS library version %d.%d",
KVS_VERSION_MAJOR, KVS_VERSION_MINOR);