diff options
author | Keith Bostic <keith@wiredtiger.com> | 2013-06-09 11:31:35 -0700 |
---|---|---|
committer | Keith Bostic <keith@wiredtiger.com> | 2013-06-09 11:31:35 -0700 |
commit | 8a77f8b692d5dca272f9e72c774914096af2df78 (patch) | |
tree | 43c3bbc75df6cf6673c83f5a23ce42780d02373f | |
parent | 45ef4219704184d30dc4f586ad1fa975cf5642c2 (diff) | |
parent | 417bc135b487df3421ee62acee14a80d1f96f150 (diff) | |
download | mongo-8a77f8b692d5dca272f9e72c774914096af2df78.tar.gz |
Merge pull request #575 from wiredtiger/memrata-txn
KVS library version 4.1 (upgrade to use the KVS name-spaces).
-rw-r--r-- | ext/test/memrata/memrata.c | 622 |
1 file changed, 218 insertions, 404 deletions
diff --git a/ext/test/memrata/memrata.c b/ext/test/memrata/memrata.c index cadb5cc8766..692e2163f35 100644 --- a/ext/test/memrata/memrata.c +++ b/ext/test/memrata/memrata.c @@ -73,54 +73,29 @@ #define KVS_MAJOR 1 /* KVS major, minor version */ #define KVS_MINOR 0 -/* - * We partition the flat space into a set of objects based on a packed, 8B - * leading byte. Object IDs from 0 to KVS_MAXID_BASE are reserved for future - * use -- I can't think of any reason I'd need them, but it's a cheap backup - * plan. - */ -#define KVS_MAXID "memrata:WiredTiger.maxid" /* max object ID key */ -#define KVS_MAXID_BASE 6 /* first object ID */ - -/* - * Each KVS source supports a set of URIs (named objects). Cursors reference - * their underlying URI and underlying KVS source. - */ -typedef struct __uri_source { - char *name; /* Unique name */ +typedef struct __wt_source { + char *uri; /* Unique name */ pthread_rwlock_t lock; /* Lock */ - int configured; /* If URI source configured */ + int configured; /* If structure configured */ u_int ref; /* Active reference count */ - /* - * Each object has a unique leading byte prefix, which is the object's - * ID turned into a packed string (any 8B unsigned value will pack - * into a maximum of 9 bytes.) We create a packed copy of the object's - * ID and a packed copy of the ID one greater than the object's ID, - * the latter is what we use for a "previous" cursor traversal. 
- */ -#define KVS_MAX_PACKED_8B 10 - uint8_t id[KVS_MAX_PACKED_8B]; /* Packed unique ID prefix */ - size_t idlen; /* ID prefix length */ - uint8_t idnext[KVS_MAX_PACKED_8B]; /* Packed next ID prefix */ - size_t idnextlen; /* Next ID prefix length */ - uint64_t append_recno; /* Allocation record number */ int config_recno; /* config "key_format=r" */ int config_bitfield; /* config "value_format=#t" */ + kvs_t kvs; /* Underlying KVS namespace */ struct __kvs_source *ks; /* Underlying KVS source */ - struct __uri_source *next; /* List of URIs */ -} URI_SOURCE; + struct __wt_source *next; /* List of WiredTiger objects */ +} WT_SOURCE; typedef struct __kvs_source { char *name; /* Unique name */ - kvs_t kvs; /* Underlying KVS reference */ + kvs_t kvs_device; /* Underlying KVS store */ - struct __uri_source *uri_head; /* List of URIs */ + struct __wt_source *ws_head; /* List of WiredTiger sources */ struct __kvs_source *next; /* List of KVS sources */ } KVS_SOURCE; @@ -130,8 +105,7 @@ typedef struct __cursor { WT_EXTENSION_API *wtext; /* Extension functions */ - KVS_SOURCE *ks; /* Underlying KVS source */ - URI_SOURCE *us; /* Underlying URI */ + WT_SOURCE *ws; /* WiredTiger source */ struct kvs_record record; /* Record */ uint8_t key[KVS_MAX_KEY_LEN]; /* key, value */ @@ -232,28 +206,19 @@ copyin_key(WT_CURSOR *wtcursor, int allocate_key) { struct kvs_record *r; CURSOR *cursor; - URI_SOURCE *us; WT_EXTENSION_API *wtext; WT_SESSION *session; + WT_SOURCE *ws; size_t i, size; - uint8_t *p, *t; int ret = 0; session = wtcursor->session; cursor = (CURSOR *)wtcursor; - us = cursor->us; + ws = cursor->ws; wtext = cursor->wtext; r = &cursor->record; - r->key = cursor->key; - r->key_len = 0; - - /* Prefix the key with the object's unique ID. */ - for (p = us->id, t = r->key, i = 0; i < us->idlen; ++i) - *t++ = *p++; - r->key_len += us->idlen; - - if (us->config_recno) { + if (ws->config_recno) { /* * Allocate a new record for append operations. 
* @@ -272,33 +237,39 @@ copyin_key(WT_CURSOR *wtcursor, int allocate_key) * not quite right. */ if (allocate_key && cursor->config_append) { - if ((ret = writelock(wtext, session, &us->lock)) != 0) + if ((ret = writelock(wtext, session, &ws->lock)) != 0) return (ret); - wtcursor->recno = ++us->append_recno; - if ((ret = unlock(wtext, session, &us->lock)) != 0) + wtcursor->recno = ++ws->append_recno; + if ((ret = unlock(wtext, session, &ws->lock)) != 0) return (ret); - } else if (wtcursor->recno > us->append_recno) { - if ((ret = writelock(wtext, session, &us->lock)) != 0) + } else if (wtcursor->recno > ws->append_recno) { + if ((ret = writelock(wtext, session, &ws->lock)) != 0) return (ret); - if (wtcursor->recno > us->append_recno) - us->append_recno = wtcursor->recno; - if ((ret = unlock(wtext, session, &us->lock)) != 0) + if (wtcursor->recno > ws->append_recno) + ws->append_recno = wtcursor->recno; + if ((ret = unlock(wtext, session, &ws->lock)) != 0) return (ret); } if ((ret = wtext->struct_size(wtext, session, &size, "r", wtcursor->recno)) != 0 || - (ret = wtext->struct_pack(wtext, session, t, - sizeof(r->key) - us->idlen, "r", wtcursor->recno)) != 0) + (ret = wtext->struct_pack(wtext, session, cursor->key, + sizeof(cursor->key), "r", wtcursor->recno)) != 0) return (ret); - r->key_len += size; + r->key = cursor->key; + r->key_len = size; } else { - if (wtcursor->key.size + us->idlen > KVS_MAX_KEY_LEN) + if (wtcursor->key.size > KVS_MAX_KEY_LEN) ERET(wtext, session, ERANGE, "key size of %" PRIuMAX " is too large", (uintmax_t)wtcursor->key.size); - memcpy(t, wtcursor->key.data, wtcursor->key.size); - r->key_len += wtcursor->key.size; + + /* + * XXX + * The underlying KVS library data fields aren't const. 
+ */ + r->key = (char *)wtcursor->key.data; + r->key_len = wtcursor->key.size; } return (0); } @@ -312,38 +283,24 @@ copyout_key(WT_CURSOR *wtcursor) { struct kvs_record *r; CURSOR *cursor; - URI_SOURCE *us; WT_EXTENSION_API *wtext; WT_SESSION *session; - size_t i, len; - uint8_t *p, *t; + WT_SOURCE *ws; int ret = 0; session = wtcursor->session; cursor = (CURSOR *)wtcursor; - us = cursor->us; wtext = cursor->wtext; + ws = cursor->ws; r = &cursor->record; - - /* - * Check the object's unique ID, if it doesn't match we've hit the end - * of the object on a cursor search. - */ - if (r->key_len < us->idlen) - return (WT_NOTFOUND); - for (p = us->id, t = r->key, i = 0; i < us->idlen; ++i) - if (*t++ != *p++) - return (WT_NOTFOUND); - len = r->key_len - us->idlen; - - if (us->config_recno) { - if ((ret = wtext->struct_unpack( - wtext, session, t, len, "r", &wtcursor->recno)) != 0) + if (ws->config_recno) { + if ((ret = wtext->struct_unpack(wtext, + session, r->key, r->key_len, "r", &wtcursor->recno)) != 0) return (ret); } else { - wtcursor->key.data = t; - wtcursor->key.size = (uint32_t)len; + wtcursor->key.data = r->key; + wtcursor->key.size = r->key_len; } return (0); } @@ -475,7 +432,7 @@ kvs_call(WT_CURSOR *wtcursor, const char *fname, cursor = (CURSOR *)wtcursor; wtext = cursor->wtext; - kvs = cursor->ks->kvs; + kvs = cursor->ws->kvs; cursor->record.val = cursor->val; restart: @@ -523,28 +480,6 @@ restart: } /* - * nextprev_init -- - * Initialize a cursor for a next/previous traversal. - */ -static INLINE void -nextprev_init(WT_CURSOR *wtcursor, uint8_t *id, size_t idlen) -{ - CURSOR *cursor; - size_t i; - uint8_t *p, *t; - - cursor = (CURSOR *)wtcursor; - - /* Prefix the key with a unique ID. */ - for (p = id, - t = cursor->key, i = 0; i < idlen; ++i) - *t++ = *p++; - - cursor->record.key = cursor->key; - cursor->record.key_len = idlen; -} - -/* * kvs_cursor_next -- * WT_CURSOR::next method. 
*/ @@ -552,18 +487,11 @@ static int kvs_cursor_next(WT_CURSOR *wtcursor) { CURSOR *cursor; - URI_SOURCE *us; + WT_SOURCE *ws; int ret = 0; cursor = (CURSOR *)wtcursor; - us = cursor->us; - - /* - * If this is the start of a new traversal, set the key to the first - * possible record for the object. - */ - if (cursor->record.key_len == 0) - nextprev_init(wtcursor, us->id, us->idlen); + ws = cursor->ws; if ((ret = copy_key(wtcursor)) != 0) return (ret); @@ -584,18 +512,11 @@ static int kvs_cursor_prev(WT_CURSOR *wtcursor) { CURSOR *cursor; - URI_SOURCE *us; + WT_SOURCE *ws; int ret = 0; cursor = (CURSOR *)wtcursor; - us = cursor->us; - - /* - * If this is the start of a new traversal, set the key to the first - * possible record after the object. - */ - if (cursor->record.key_len == 0) - nextprev_init(wtcursor, us->idnext, us->idnextlen); + ws = cursor->ws; if ((ret = copy_key(wtcursor)) != 0) return (ret); @@ -696,15 +617,15 @@ static int kvs_cursor_insert(WT_CURSOR *wtcursor) { CURSOR *cursor; - KVS_SOURCE *ks; WT_EXTENSION_API *wtext; WT_SESSION *session; + WT_SOURCE *ws; int ret = 0; session = wtcursor->session; cursor = (CURSOR *)wtcursor; wtext = cursor->wtext; - ks = cursor->ks; + ws = cursor->ws; if ((ret = copyin_key(wtcursor, 1)) != 0) return (ret); @@ -719,11 +640,11 @@ kvs_cursor_insert(WT_CURSOR *wtcursor) * does not exist, fail if it does exist), maps to kvs_add. 
*/ if (cursor->config_overwrite) { - if ((ret = kvs_set(ks->kvs, &cursor->record)) != 0) + if ((ret = kvs_set(ws->kvs, &cursor->record)) != 0) ERET(wtext, session, WT_ERROR, "kvs_set: %s", kvs_strerror(ret)); } else - if ((ret = kvs_add(ks->kvs, &cursor->record)) != 0) { + if ((ret = kvs_add(ws->kvs, &cursor->record)) != 0) { if (ret == KVS_E_KEY_EXISTS) return (WT_DUPLICATE_KEY); ERET(wtext, session, WT_ERROR, @@ -740,15 +661,15 @@ static int kvs_cursor_update(WT_CURSOR *wtcursor) { CURSOR *cursor; - KVS_SOURCE *ks; WT_EXTENSION_API *wtext; WT_SESSION *session; + WT_SOURCE *ws; int ret = 0; session = wtcursor->session; cursor = (CURSOR *)wtcursor; wtext = cursor->wtext; - ks = cursor->ks; + ws = cursor->ws; if ((ret = copyin_key(wtcursor, 0)) != 0) return (ret); @@ -759,7 +680,7 @@ kvs_cursor_update(WT_CURSOR *wtcursor) * WT_CURSOR::update (update the record if it does exist, fail if it * does not exist), maps to kvs_replace. */ - if ((ret = kvs_replace(ks->kvs, &cursor->record)) != 0) + if ((ret = kvs_replace(ws->kvs, &cursor->record)) != 0) ERET(wtext, session, WT_ERROR, "kvs_replace: %s", kvs_strerror(ret)); @@ -774,23 +695,21 @@ static int kvs_cursor_remove(WT_CURSOR *wtcursor) { CURSOR *cursor; - KVS_SOURCE *ks; - URI_SOURCE *us; WT_EXTENSION_API *wtext; WT_SESSION *session; + WT_SOURCE *ws; int ret = 0; session = wtcursor->session; cursor = (CURSOR *)wtcursor; wtext = cursor->wtext; - ks = cursor->ks; - us = cursor->us; + ws = cursor->ws; /* * WiredTiger's "remove" of a bitfield is really an update with a value * of a single byte of zero. 
*/ - if (us->config_bitfield) { + if (ws->config_bitfield) { wtcursor->value.size = 1; wtcursor->value.data = "\0"; return (kvs_cursor_update(wtcursor)); @@ -798,7 +717,7 @@ kvs_cursor_remove(WT_CURSOR *wtcursor) if ((ret = copyin_key(wtcursor, 0)) != 0) return (ret); - if ((ret = kvs_del(ks->kvs, &cursor->record)) == 0) + if ((ret = kvs_del(ws->kvs, &cursor->record)) == 0) return (0); if (ret == KVS_E_KEY_NOT_FOUND) return (WT_NOTFOUND); @@ -813,20 +732,20 @@ static int kvs_cursor_close(WT_CURSOR *wtcursor) { CURSOR *cursor; - URI_SOURCE *us; WT_EXTENSION_API *wtext; WT_SESSION *session; + WT_SOURCE *ws; int ret = 0; session = wtcursor->session; cursor = (CURSOR *)wtcursor; wtext = cursor->wtext; - us = cursor->us; + ws = cursor->ws; - if ((ret = writelock(wtext, session, &us->lock)) != 0) + if ((ret = writelock(wtext, session, &ws->lock)) != 0) goto err; - --us->ref; - if ((ret = unlock(wtext, session, &us->lock)) != 0) + --ws->ref; + if ((ret = unlock(wtext, session, &ws->lock)) != 0) goto err; err: free(cursor->val); @@ -850,13 +769,14 @@ static const KVS_OPTIONS kvs_options[] = { * struct kvs_config configuration */ { "kvs_parallelism=64", "int", "min=1,max=512" }, - { "kvs_granularity=4000000", "int", "min=1000000,max=10000000" }, + { "kvs_granularity=2M", "int", "min=1M,max=10M" }, { "kvs_avg_key_len=16", "int", "min=10,max=512" }, - { "kvs_avg_val_len=16", "int", "min=10,max=51" }, + { "kvs_avg_val_len=100", "int", "min=10,max=1M" }, { "kvs_write_bufs=32", "int", "min=16,max=256" }, - { "kvs_read_bufs=64", "int", "min=16,max=256" }, - { "kvs_commit_timeout=1000000", "int", "min=100,max=10000000" }, + { "kvs_read_bufs=2048", "int", "min=16,max=1M" }, + { "kvs_commit_timeout=5M", "int", "min=100,max=10M" }, { "kvs_reclaim_threshold=60", "int", "min=1,max=80" }, + { "kvs_reclaim_period=1000", "int", "min=100,max=10K" }, /* * KVS_O_FLAG flag configuration @@ -1031,6 +951,7 @@ kvs_config_read(WT_EXTENSION_API *wtext, KVS_CONFIG_SET("kvs_read_bufs", read_bufs); 
KVS_CONFIG_SET("kvs_commit_timeout", commit_timeout); KVS_CONFIG_SET("kvs_reclaim_threshold", reclaim_threshold); + KVS_CONFIG_SET("kvs_reclaim_period", reclaim_period); #define KVS_FLAG_SET(n, f) \ if (strcmp(name, n) == 0) { \ @@ -1038,10 +959,13 @@ kvs_config_read(WT_EXTENSION_API *wtext, *flagsp |= f; \ continue; \ } + /* + * We don't export KVS_O_CREATE: WT_SESSION.create always adds + * it in. + */ KVS_FLAG_SET("kvs_open_o_debug", KVS_O_DEBUG); - KVS_FLAG_SET("kvs_open_o_reclaim", KVS_O_SCAN); - KVS_FLAG_SET("kvs_open_o_scan", KVS_O_RECLAIM); - KVS_FLAG_SET("kvs_open_o_truncate", KVS_O_CREATE); + KVS_FLAG_SET("kvs_open_o_scan", KVS_O_SCAN); + KVS_FLAG_SET("kvs_open_o_truncate", KVS_O_TRUNCATE); } return (0); } @@ -1161,29 +1085,10 @@ kvs_source_open(WT_DATA_SOURCE *wtds, ks->name = devices; devices = NULL; - /* - * Open the KVS handle last, so cleanup is easier: we don't have any way - * to say "create the object if it doesn't exist", and since the create - * flag destroys the underlying store, we can't just always set it. If - * we fail with "invalid volume", try again with a create flag. - */ -#if defined(KVS_O_TRUNCATE) - This is fragile: once KVS_O_CREATE changes to not always destroy the - store, we can set it all the time, and whatever new flag destroys the - store should get pushed out into the api as a new kvs flag. (See the - above setting of KVS_O_CREATE for the kvs_open_o_truncate option.) -#endif - ks->kvs = kvs_open(device_list, &kvs_config, flags); - if (ks->kvs == NULL) - ret = os_errno(); - if (ret == KVS_E_VOL_INVALID) { - ret = 0; - flags |= KVS_O_CREATE; - ks->kvs = kvs_open(device_list, &kvs_config, flags); - if (ks->kvs == NULL) - ret = os_errno(); - } - if (ks->kvs == NULL) { + /* Open the KVS handle last, so cleanup is easier. 
*/ + ks->kvs_device = + kvs_open(device_list, &kvs_config, flags | KVS_O_CREATE); + if (ks->kvs_device == NULL) { ESET(wtext, session, WT_ERROR, "kvs_open: %s: %s", ks->name, kvs_strerror(os_errno())); goto err; @@ -1213,17 +1118,18 @@ err: if (locked) } /* - * uri_source_open -- - * Return a locked URI, allocating and opening if it doesn't already exist. + * ws_source_open -- + * Return a locked WiredTiger source, allocating and opening if it doesn't + * already exist. */ static int -uri_source_open(WT_DATA_SOURCE *wtds, WT_SESSION *session, - WT_CONFIG_ARG *config, const char *uri, int hold_global, URI_SOURCE **refp) +ws_source_open(WT_DATA_SOURCE *wtds, WT_SESSION *session, + WT_CONFIG_ARG *config, const char *uri, int hold_global, WT_SOURCE **refp) { DATA_SOURCE *ds; KVS_SOURCE *ks; - URI_SOURCE *us; WT_EXTENSION_API *wtext; + WT_SOURCE *ws; int lockinit, ret = 0; *refp = NULL; @@ -1237,38 +1143,38 @@ uri_source_open(WT_DATA_SOURCE *wtds, WT_SESSION *session, return (ret); /* Check for a match: if we find one, we're done. */ - for (us = ks->uri_head; us != NULL; us = us->next) - if (strcmp(us->name, uri) == 0) + for (ws = ks->ws_head; ws != NULL; ws = ws->next) + if (strcmp(ws->uri, uri) == 0) goto done; - /* Allocate and initialize a new underlying URI source object. */ - if ((us = calloc(1, sizeof(*us))) == NULL || - (us->name = strdup(uri)) == NULL) { + /* Allocate and initialize a new underlying WiredTiger source object. */ + if ((ws = calloc(1, sizeof(*ws))) == NULL || + (ws->uri = strdup(uri)) == NULL) { ret = os_errno(); goto err; } - if ((ret = lock_init(wtext, session, &us->lock)) != 0) + if ((ret = lock_init(wtext, session, &ws->lock)) != 0) goto err; lockinit = 1; - us->ks = ks; + ws->ks = ks; /* Insert the new entry at the head of the list. */ - us->next = ks->uri_head; - ks->uri_head = us; + ws->next = ks->ws_head; + ks->ws_head = ws; /* Return the locked object. 
*/ -done: if ((ret = writelock(wtext, session, &us->lock)) != 0) +done: if ((ret = writelock(wtext, session, &ws->lock)) != 0) goto err; - *refp = us; - us = NULL; + *refp = ws; + ws = NULL; if (0) { err: if (lockinit) - ETRET(lock_destroy(wtext, session, &us->lock)); - if (us != NULL) { - free(us->name); - free(us); + ETRET(lock_destroy(wtext, session, &ws->lock)); + if (ws != NULL) { + free(ws->uri); + free(ws); } } @@ -1279,105 +1185,6 @@ err: if (lockinit) } /* - * uri_truncate -- - * Discard the records for an object. - */ -static int -uri_truncate(WT_DATA_SOURCE *wtds, WT_SESSION *session, URI_SOURCE *us) -{ - struct kvs_record *r, _r; - DATA_SOURCE *ds; - WT_EXTENSION_API *wtext; - kvs_t kvs; - size_t i; - int ret = 0; - uint8_t *p, *t; - char key[KVS_MAX_KEY_LEN]; - - ds = (DATA_SOURCE *)wtds; - wtext = ds->wtext; - kvs = us->ks->kvs; - - /* Walk the list of objects, discarding them all. */ - r = &_r; - memset(r, 0, sizeof(*r)); - r->key = key; - memcpy(r->key, us->id, us->idlen); - r->key_len = us->idlen; - r->val = NULL; - r->val_len = 0; - while ((ret = kvs_next(kvs, r, 0UL, 0UL)) == 0) { - /* - * Check for an object ID mismatch, if we find one, we're - * done. - */ - for (p = us->id, t = r->key, i = 0; i < us->idlen; ++i) - if (*t++ != *p++) - return (0); - if ((ret = kvs_del(kvs, r)) != 0) { - ESET(wtext, session, - WT_ERROR, "kvs_del: %s", kvs_strerror(ret)); - return (ret); - } - } - if (ret == KVS_E_KEY_NOT_FOUND) - ret = 0; - if (ret != 0) - ESET(wtext, session, - WT_ERROR, "kvs_next: %s", kvs_strerror(ret)); - return (ret); -} - -/* - * master_id_get -- - * Return the maximum file ID in the system. 
- */ -static int -master_id_get(WT_DATA_SOURCE *wtds, WT_SESSION *session, uint64_t *maxidp) -{ - DATA_SOURCE *ds; - WT_EXTENSION_API *wtext; - int ret = 0; - const char *value; - - ds = (DATA_SOURCE *)wtds; - wtext = ds->wtext; - - if ((ret = - wtext->metadata_search(wtext, session, KVS_MAXID, &value)) == 0) { - *maxidp = strtouq(value, NULL, 10); - return (0); - } - if (ret == WT_NOTFOUND) { - *maxidp = KVS_MAXID_BASE; - return (0); - } - ERET(wtext, session, ret, "%s: %s", KVS_MAXID, wtext->strerror(ret)); -} - -/* - * master_id_set -- - * Increment the maximum file ID in the system. - */ -static int -master_id_set(WT_DATA_SOURCE *wtds, WT_SESSION *session, uint64_t maxid) -{ - DATA_SOURCE *ds; - WT_EXTENSION_API *wtext; - int ret = 0; - char value[32]; /* Large enough for any 8B value. */ - - ds = (DATA_SOURCE *)wtds; - wtext = ds->wtext; - - (void)snprintf(value, sizeof(value), "%" PRIu64, maxid); - if ((ret = - wtext->metadata_update(wtext, session, KVS_MAXID, value)) == 0) - return (0); - ERET(wtext, session, ret, "%s: %s", KVS_MAXID, wtext->strerror(ret)); -} - -/* * master_uri_get -- * Get the KVS master record for a URI. */ @@ -1425,11 +1232,12 @@ master_uri_rename(WT_DATA_SOURCE *wtds, ds = (DATA_SOURCE *)wtds; wtext = ds->wtext; + value = NULL; /* Insert the record under a new name. 
*/ if ((ret = master_uri_get(wtds, session, uri, &value)) != 0 || (ret = wtext->metadata_insert(wtext, session, newuri, value)) != 0) - return (ret); + goto err; /* * Remove the original record, and if that fails, attempt to remove @@ -1437,6 +1245,8 @@ master_uri_rename(WT_DATA_SOURCE *wtds, */ if ((ret = wtext->metadata_remove(wtext, session, uri)) != 0) (void)wtext->metadata_remove(wtext, session, newuri); + +err: free((void *)value); return (ret); } @@ -1451,18 +1261,12 @@ master_uri_set(WT_DATA_SOURCE *wtds, DATA_SOURCE *ds; WT_CONFIG_ITEM a, b; WT_EXTENSION_API *wtext; - uint64_t maxid; int exclusive, ret = 0; char value[1024]; ds = (DATA_SOURCE *)wtds; wtext = ds->wtext; - /* Get the maximum file ID. */ - if ((ret = master_id_get(wtds, session, &maxid)) != 0) - return (ret); - ++maxid; - exclusive = 0; if ((ret = wtext->config_get(wtext, session, config, "exclusive", &a)) == 0) @@ -1499,17 +1303,14 @@ master_uri_set(WT_DATA_SOURCE *wtds, * update the master ID record. */ (void)snprintf(value, sizeof(value), - "uid=%" PRIu64 ",version=(major=%d,minor=%d)" ",key_format=%.*s,value_format=%.*s", - maxid, KVS_MAJOR, KVS_MINOR, (int)a.len, a.str, (int)b.len, b.str); + KVS_MAJOR, KVS_MINOR, (int)a.len, a.str, (int)b.len, b.str); if ((ret = wtext->metadata_insert(wtext, session, uri, value)) == 0) - return (master_id_set(wtds, session, maxid)); - + return (0); if (ret == WT_DUPLICATE_KEY) return (exclusive ? EEXIST : 0); - - ERET(wtext, session, ret, "%s: %s", KVS_MAXID, wtext->strerror(ret)); + ERET(wtext, session, ret, "%s: %s", uri, wtext->strerror(ret)); } /* @@ -1521,22 +1322,22 @@ kvs_session_create(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config) { DATA_SOURCE *ds; - URI_SOURCE *us; WT_EXTENSION_API *wtext; + WT_SOURCE *ws; int ret = 0; ds = (DATA_SOURCE *)wtds; wtext = ds->wtext; - /* Get a locked reference to the URI. 
*/ - if ((ret = uri_source_open(wtds, session, config, uri, 0, &us)) != 0) + /* Get a locked reference to the WiredTiger source. */ + if ((ret = ws_source_open(wtds, session, config, uri, 0, &ws)) != 0) return (ret); /* Create the URI master record if it doesn't already exist. */ ret = master_uri_set(wtds, session, uri, config); - /* Unlock the URI. */ - ETRET(unlock(wtext, session, &us->lock)); + /* Unlock the WiredTiger source. */ + ETRET(unlock(wtext, session, &ws->lock)); return (ret); } @@ -1551,49 +1352,57 @@ kvs_session_drop(WT_DATA_SOURCE *wtds, { DATA_SOURCE *ds; KVS_SOURCE *ks; - URI_SOURCE **p, *us; WT_EXTENSION_API *wtext; + WT_SOURCE **p, *ws; int ret = 0; ds = (DATA_SOURCE *)wtds; wtext = ds->wtext; - /* Get a locked reference to the data source. */ - if ((ret = uri_source_open(wtds, session, config, uri, 1, &us)) != 0) + /* + * Get a locked reference to the data source; hold the global lock as + * well, we are going to change the list of objects for a KVS store. + */ + if ((ret = ws_source_open(wtds, session, config, uri, 1, &ws)) != 0) return (ret); - ks = us->ks; + ks = ws->ks; /* If there are active references to the object, we're busy. */ - if (us->ref != 0) { + if (ws->ref != 0) { ret = EBUSY; goto err; } - /* Discard all of the rows in the object. */ - if ((ret = uri_truncate(wtds, session, us)) != 0) + /* Close and delete the underlying object. */ + if ((ret = kvs_close(ws->kvs)) != 0) { + ESET(wtext, session, WT_ERROR, + "kvs_close: %s: %s", ws->uri, kvs_strerror(ret)); goto err; - - /* - * Remove the entry from the URI_SOURCE list -- it's a singly-linked - * list, find the reference to it. - */ - for (p = &ks->uri_head; *p != NULL; p = &(*p)->next) - if (*p == us) - break; - /* - * We should be guaranteed to find an entry, we just looked it up and - * everything is locked down. 
- */ - if (*p == NULL) { - ret = WT_NOTFOUND; + } + if ((ret = kvs_delete_namespace(ks->kvs_device, ws->uri)) != 0) { + ESET(wtext, session, WT_ERROR, + "kvs_delete_namespace: %s: %s", ws->uri, kvs_strerror(ret)); goto err; } - *p = (*p)->next; /* Discard the metadata entry. */ ret = master_uri_drop(wtds, session, uri); -err: ETRET(unlock(wtext, session, &us->lock)); + /* + * Remove the entry from the WT_SOURCE list -- it's a singly-linked + * list, find the reference to it. + */ + for (p = &ks->ws_head; *p != NULL; p = &(*p)->next) + if (*p == ws) { + *p = (*p)->next; + + ETRET(lock_destroy(wtext, session, &ws->lock)); + free(ws->uri); + free(ws); + break; + } + +err: ETRET(unlock(wtext, session, &ds->global_lock)); return (ret); } @@ -1608,10 +1417,10 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session, { CURSOR *cursor; DATA_SOURCE *ds; - URI_SOURCE *us; WT_CONFIG_ITEM v; WT_CURSOR *wtcursor; WT_EXTENSION_API *wtext; + WT_SOURCE *ws; int ret = 0; const char *value; @@ -1659,51 +1468,23 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session, } cursor->config_overwrite = v.val != 0; - /* Get a locked reference to the URI. */ - if ((ret = uri_source_open(wtds, session, config, uri, 0, &us)) != 0) + /* Get a locked reference to the WiredTiger source. */ + if ((ret = ws_source_open(wtds, session, config, uri, 0, &ws)) != 0) goto err; /* - * Finish initializing the cursor (if the URI_SOURCE structure requires + * Finish initializing the cursor (if the WT_SOURCE structure requires * initialization, we're going to use the cursor as part of that work). */ - cursor->ks = us->ks; - cursor->us = us; + cursor->ws = ws; /* * If this is the first access to the URI, we have to configure it * using information stored in the master record. 
*/ - if (!us->configured) { - us->configured = 1; - + if (!ws->configured) { if ((ret = master_uri_get(wtds, session, uri, &value)) != 0) goto err; - if ((ret = wtext->config_strget( - wtext, session, value, "uid", &v)) != 0) { - ESET(wtext, session, ret, - "WT_EXTENSION_API.config: uid: %s", - wtext->strerror(ret)); - goto err; - } - - /* - * Build packed versions of the unique ID and the next ID (the - * next ID is what we need to do a "previous" cursor traversal.) - */ - if ((ret = wtext->struct_size(wtext, - session, &us->idlen, "r", v.val)) != 0 || - (ret = wtext->struct_pack(wtext, - session, us->id, sizeof(us->id), "r", v.val)) != 0 || - (ret = wtext->struct_size(wtext, - session, &us->idnextlen, "r", v.val + 1)) != 0 || - (ret = wtext->struct_pack(wtext, session, - us->idnext, sizeof(us->idnext), "r", v.val + 1)) != 0) { - ESET(wtext, session, ret, - "WT_EXTENSION_API.config: uid: %s", - wtext->strerror(ret)); - goto err; - } if ((ret = wtext->config_strget( wtext, session, value, "key_format", &v)) != 0) { @@ -1712,7 +1493,7 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session, wtext->strerror(ret)); goto err; } - us->config_recno = v.len == 1 && v.str[0] == 'r'; + ws->config_recno = v.len == 1 && v.str[0] == 'r'; if ((ret = wtext->config_strget( wtext, session, value, "value_format", &v)) != 0) { @@ -1721,31 +1502,42 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session, wtext->strerror(ret)); goto err; } - us->config_bitfield = + ws->config_bitfield = v.len == 2 && isdigit(v.str[0]) && v.str[1] == 't'; + /* Open the underlying KVS namespace. */ + if ((ws->kvs = + kvs_open_namespace(ws->ks->kvs_device, uri)) == NULL) { + ESET(wtext, session, WT_ERROR, + "kvs_open_namespace: %s: %s", + uri, kvs_strerror(os_errno())); + goto err; + } + /* * If it's a record-number key, read the last record from the * object and set the allocation record value. 
*/ - if (us->config_recno) { + if (ws->config_recno) { wtcursor = (WT_CURSOR *)cursor; if ((ret = kvs_cursor_reset(wtcursor)) != 0) goto err; if ((ret = kvs_cursor_prev(wtcursor)) == 0) - us->append_recno = wtcursor->recno; + ws->append_recno = wtcursor->recno; else if (ret != WT_NOTFOUND) goto err; if ((ret = kvs_cursor_reset(wtcursor)) != 0) goto err; } + + ws->configured = 1; } /* Increment the open reference count to pin the URI and unlock it. */ - ++us->ref; - if ((ret = unlock(wtext, session, &us->lock)) != 0) + ++ws->ref; + if ((ret = unlock(wtext, session, &ws->lock)) != 0) goto err; *new_cursor = (WT_CURSOR *)cursor; @@ -1769,8 +1561,9 @@ kvs_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri, const char *newuri, WT_CONFIG_ARG *config) { DATA_SOURCE *ds; - URI_SOURCE *us; + KVS_SOURCE *ks; WT_EXTENSION_API *wtext; + WT_SOURCE *ws; int ret = 0; char *copy; @@ -1778,30 +1571,40 @@ kvs_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session, wtext = ds->wtext; copy = NULL; - /* Get a locked reference to the URI. */ - if ((ret = uri_source_open(wtds, session, config, uri, 1, &us)) != 0) + /* + * Get a locked reference to the data source; hold the global lock as + * well, we are going to change the object's name, and we can't allow + * other threads to be walking the list and comparing against the name. + */ + if ((ret = ws_source_open(wtds, session, config, uri, 1, &ws)) != 0) return (ret); + ks = ws->ks; /* If there are active references to the object, we're busy. */ - if (us->ref != 0) { + if (ws->ref != 0) { ret = EBUSY; goto err; } - /* Get a copy of the new name to simplify cleanup. */ - if ((copy = strdup(newuri)) == NULL) - return (os_errno()); - /* Update the metadata record. */ if ((ret = master_uri_rename(wtds, session, uri, newuri)) != 0) goto err; - /* Swap our copy of the name. */ - free(us->name); - us->name = copy; + /* Update the structure name. 
*/ + if ((copy = strdup(newuri)) == NULL) + return (os_errno()); + free(ws->uri); + ws->uri = copy; copy = NULL; -err: ETRET(unlock(wtext, session, &us->lock)); + /* Rename the KVS namespace. */ + if ((ret = kvs_rename_namespace(ks->kvs_device, uri, newuri)) != 0) { + ESET(wtext, session, WT_ERROR, + "kvs_rename_namespace: %s: %s", ws->uri, kvs_strerror(ret)); + goto err; + } + +err: ETRET(unlock(wtext, session, &ws->lock)); ETRET(unlock(wtext, session, &ds->global_lock)); free(copy); @@ -1817,24 +1620,31 @@ kvs_session_truncate(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config) { DATA_SOURCE *ds; - URI_SOURCE *us; WT_EXTENSION_API *wtext; + WT_SOURCE *ws; int ret = 0; ds = (DATA_SOURCE *)wtds; wtext = ds->wtext; - /* Get a locked reference to the URI. */ - if ((ret = uri_source_open(wtds, session, config, uri, 0, &us)) != 0) + /* Get a locked reference to the WiredTiger source. */ + if ((ret = ws_source_open(wtds, session, config, uri, 0, &ws)) != 0) return (ret); - /* - * If there are active references to the object, we're busy. - * Else, discard the records. - */ - ret = us->ref == 0 ? uri_truncate(wtds, session, us) : EBUSY; + /* If there are active references to the object, we're busy. */ + if (ws->ref != 0) { + ret = EBUSY; + goto err; + } - ETRET(unlock(wtext, session, &us->lock)); + /* Truncate the underlying object. */ + if ((ret = kvs_truncate(ws->kvs)) != 0) { + ESET(wtext, session, WT_ERROR, + "kvs_truncate: %s: %s", ws->uri, kvs_strerror(ret)); + goto err; + } + +err: ETRET(unlock(wtext, session, &ws->lock)); return (ret); } @@ -1867,8 +1677,8 @@ kvs_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session) { DATA_SOURCE *ds; KVS_SOURCE *ks; - URI_SOURCE *us; WT_EXTENSION_API *wtext; + WT_SOURCE *ws; int tret, ret = 0; ds = (DATA_SOURCE *)wtds; @@ -1878,26 +1688,30 @@ kvs_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session) /* Start a flush on any open KVS sources. 
*/ for (ks = ds->kvs_head; ks != NULL; ks = ks->next) - if ((tret = kvs_commit(ks->kvs)) != 0) + if ((tret = kvs_commit(ks->kvs_device)) != 0) ESET(wtext, session, WT_ERROR, "kvs_commit: %s: %s", ks->name, kvs_strerror(tret)); /* Close and discard all objects. */ while ((ks = ds->kvs_head) != NULL) { - while ((us = ks->uri_head) != NULL) { - if (us->ref != 0) + while ((ws = ks->ws_head) != NULL) { + if (ws->ref != 0) ESET(wtext, session, WT_ERROR, "%s: has open object %s with %u open " "cursors during close", - ks->name, us->name, us->ref); + ks->name, ws->uri, ws->ref); + if ((tret = kvs_close(ws->kvs)) != 0) + ESET(wtext, session, WT_ERROR, + "kvs_close: %s: %s", + ws->uri, kvs_strerror(tret)); - ks->uri_head = us->next; - ETRET(lock_destroy(wtext, session, &us->lock)); - free(us->name); - free(us); + ks->ws_head = ws->next; + ETRET(lock_destroy(wtext, session, &ws->lock)); + free(ws->uri); + free(ws); } - if ((tret = kvs_close(ks->kvs)) != 0) + if ((tret = kvs_close(ks->kvs_device)) != 0) ESET(wtext, session, WT_ERROR, "kvs_close: %s: %s", ks->name, kvs_strerror(tret)); @@ -1947,7 +1761,7 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) wtext = connection->get_extension_api(connection); /* Check the library version */ -#if KVS_VERSION_MAJOR != 2 || KVS_VERSION_MINOR != 8 +#if KVS_VERSION_MAJOR != 4 || KVS_VERSION_MINOR != 1 ERET(wtext, NULL, EINVAL, "unsupported KVS library version %d.%d", KVS_VERSION_MAJOR, KVS_VERSION_MINOR); |