diff options
-rw-r--r-- | ext/test/memrata/memrata.c | 402 |
1 files changed, 117 insertions, 285 deletions
diff --git a/ext/test/memrata/memrata.c b/ext/test/memrata/memrata.c index d417b99694d..692e2163f35 100644 --- a/ext/test/memrata/memrata.c +++ b/ext/test/memrata/memrata.c @@ -73,40 +73,19 @@ #define KVS_MAJOR 1 /* KVS major, minor version */ #define KVS_MINOR 0 -/* - * We partition the flat space into a set of objects based on a packed, 8B - * leading byte. Object IDs from 0 to KVS_MAXID_BASE are reserved for future - * use -- I can't think of any reason I'd need them, but it's a cheap backup - * plan. - */ -#define KVS_MAXID "memrata:WiredTiger.maxid" /* max object ID key */ -#define KVS_MAXID_BASE 6 /* first object ID */ - typedef struct __wt_source { - char *name; /* Unique name */ + char *uri; /* Unique name */ pthread_rwlock_t lock; /* Lock */ int configured; /* If structure configured */ u_int ref; /* Active reference count */ - /* - * Each object has a unique leading byte prefix, which is the object's - * ID turned into a packed string (any 8B unsigned value will pack - * into a maximum of 9 bytes.) We create a packed copy of the object's - * ID and a packed copy of the ID one greater than the object's ID, - * the latter is what we use for a "previous" cursor traversal. - */ -#define KVS_MAX_PACKED_8B 10 - uint8_t id[KVS_MAX_PACKED_8B]; /* Packed unique ID prefix */ - size_t idlen; /* ID prefix length */ - uint8_t idnext[KVS_MAX_PACKED_8B]; /* Packed next ID prefix */ - size_t idnextlen; /* Next ID prefix length */ - uint64_t append_recno; /* Allocation record number */ int config_recno; /* config "key_format=r" */ int config_bitfield; /* config "value_format=#t" */ + kvs_t kvs; /* Underlying KVS namespace */ struct __kvs_source *ks; /* Underlying KVS source */ struct __wt_source *next; /* List of WiredTiger objects */ @@ -114,7 +93,7 @@ typedef struct __wt_source { typedef struct __kvs_source { char *name; /* Unique name */ - kvs_t kvs; /* Underlying KVS reference */ + kvs_t kvs_device; /* Underlying KVS store */ struct __wt_source *ws_head; /* List of WiredTiger sources */ @@ -126,7 +105,6 @@ typedef struct __cursor { WT_EXTENSION_API *wtext; /* Extension functions */ - KVS_SOURCE *ks; /* KVS source */ WT_SOURCE *ws; /* WiredTiger source */ struct kvs_record record; /* Record */ @@ -232,7 +210,6 @@ copyin_key(WT_CURSOR *wtcursor, int allocate_key) WT_SESSION *session; WT_SOURCE *ws; size_t i, size; - uint8_t *p, *t; int ret = 0; session = wtcursor->session; @@ -241,14 +218,6 @@ copyin_key(WT_CURSOR *wtcursor, int allocate_key) wtext = cursor->wtext; r = &cursor->record; - r->key = cursor->key; - r->key_len = 0; - - /* Prefix the key with the object's unique ID. */ - for (p = ws->id, t = r->key, i = 0; i < ws->idlen; ++i) - *t++ = *p++; - r->key_len += ws->idlen; - if (ws->config_recno) { /* * Allocate a new record for append operations. @@ -284,17 +253,23 @@ copyin_key(WT_CURSOR *wtcursor, int allocate_key) if ((ret = wtext->struct_size(wtext, session, &size, "r", wtcursor->recno)) != 0 || - (ret = wtext->struct_pack(wtext, session, t, - sizeof(r->key) - ws->idlen, "r", wtcursor->recno)) != 0) + (ret = wtext->struct_pack(wtext, session, cursor->key, + sizeof(cursor->key), "r", wtcursor->recno)) != 0) return (ret); - r->key_len += size; + r->key = cursor->key; + r->key_len = size; } else { - if (wtcursor->key.size + ws->idlen > KVS_MAX_KEY_LEN) + if (wtcursor->key.size > KVS_MAX_KEY_LEN) ERET(wtext, session, ERANGE, "key size of %" PRIuMAX " is too large", (uintmax_t)wtcursor->key.size); - memcpy(t, wtcursor->key.data, wtcursor->key.size); - r->key_len += wtcursor->key.size; + + /* + * XXX + * The underlying KVS library data fields aren't const. + */ + r->key = (char *)wtcursor->key.data; + r->key_len = wtcursor->key.size; } return (0); } @@ -311,8 +286,6 @@ copyout_key(WT_CURSOR *wtcursor) WT_EXTENSION_API *wtext; WT_SESSION *session; WT_SOURCE *ws; - size_t i, len; - uint8_t *p, *t; int ret = 0; session = wtcursor->session; @@ -321,25 +294,13 @@ copyout_key(WT_CURSOR *wtcursor) ws = cursor->ws; r = &cursor->record; - - /* - * Check the object's unique ID, if it doesn't match we've hit the end - * of the object on a cursor search. - */ - if (r->key_len < ws->idlen) - return (WT_NOTFOUND); - for (p = ws->id, t = r->key, i = 0; i < ws->idlen; ++i) - if (*t++ != *p++) - return (WT_NOTFOUND); - len = r->key_len - ws->idlen; - if (ws->config_recno) { - if ((ret = wtext->struct_unpack( - wtext, session, t, len, "r", &wtcursor->recno)) != 0) + if ((ret = wtext->struct_unpack(wtext, + session, r->key, r->key_len, "r", &wtcursor->recno)) != 0) return (ret); } else { - wtcursor->key.data = t; - wtcursor->key.size = (uint32_t)len; + wtcursor->key.data = r->key; + wtcursor->key.size = r->key_len; } return (0); } @@ -471,7 +432,7 @@ kvs_call(WT_CURSOR *wtcursor, const char *fname, cursor = (CURSOR *)wtcursor; wtext = cursor->wtext; - kvs = cursor->ks->kvs; + kvs = cursor->ws->kvs; cursor->record.val = cursor->val; restart: @@ -519,28 +480,6 @@ restart: } /* - * nextprev_init -- - * Initialize a cursor for a next/previous traversal. - */ -static INLINE void -nextprev_init(WT_CURSOR *wtcursor, uint8_t *id, size_t idlen) -{ - CURSOR *cursor; - size_t i; - uint8_t *p, *t; - - cursor = (CURSOR *)wtcursor; - - /* Prefix the key with a unique ID. */ - for (p = id, - t = cursor->key, i = 0; i < idlen; ++i) - *t++ = *p++; - - cursor->record.key = cursor->key; - cursor->record.key_len = idlen; -} - -/* * kvs_cursor_next -- * WT_CURSOR::next method. */ @@ -554,13 +493,6 @@ kvs_cursor_next(WT_CURSOR *wtcursor) cursor = (CURSOR *)wtcursor; ws = cursor->ws; - /* - * If this is the start of a new traversal, set the key to the first - * possible record for the object. - */ - if (cursor->record.key_len == 0) - nextprev_init(wtcursor, ws->id, ws->idlen); - if ((ret = copy_key(wtcursor)) != 0) return (ret); if ((ret = kvs_call(wtcursor, "kvs_next", kvs_next)) != 0) @@ -586,13 +518,6 @@ kvs_cursor_prev(WT_CURSOR *wtcursor) cursor = (CURSOR *)wtcursor; ws = cursor->ws; - /* - * If this is the start of a new traversal, set the key to the first - * possible record after the object. - */ - if (cursor->record.key_len == 0) - nextprev_init(wtcursor, ws->idnext, ws->idnextlen); - if ((ret = copy_key(wtcursor)) != 0) return (ret); if ((ret = kvs_call(wtcursor, "kvs_prev", kvs_prev)) != 0) @@ -692,15 +617,15 @@ static int kvs_cursor_insert(WT_CURSOR *wtcursor) { CURSOR *cursor; - KVS_SOURCE *ks; WT_EXTENSION_API *wtext; WT_SESSION *session; + WT_SOURCE *ws; int ret = 0; session = wtcursor->session; cursor = (CURSOR *)wtcursor; wtext = cursor->wtext; - ks = cursor->ks; + ws = cursor->ws; if ((ret = copyin_key(wtcursor, 1)) != 0) return (ret); @@ -715,11 +640,11 @@ kvs_cursor_insert(WT_CURSOR *wtcursor) * does not exist, fail if it does exist), maps to kvs_add. */ if (cursor->config_overwrite) { - if ((ret = kvs_set(ks->kvs, &cursor->record)) != 0) + if ((ret = kvs_set(ws->kvs, &cursor->record)) != 0) ERET(wtext, session, WT_ERROR, "kvs_set: %s", kvs_strerror(ret)); } else - if ((ret = kvs_add(ks->kvs, &cursor->record)) != 0) { + if ((ret = kvs_add(ws->kvs, &cursor->record)) != 0) { if (ret == KVS_E_KEY_EXISTS) return (WT_DUPLICATE_KEY); ERET(wtext, session, WT_ERROR, @@ -736,15 +661,15 @@ static int kvs_cursor_update(WT_CURSOR *wtcursor) { CURSOR *cursor; - KVS_SOURCE *ks; WT_EXTENSION_API *wtext; WT_SESSION *session; + WT_SOURCE *ws; int ret = 0; session = wtcursor->session; cursor = (CURSOR *)wtcursor; wtext = cursor->wtext; - ks = cursor->ks; + ws = cursor->ws; if ((ret = copyin_key(wtcursor, 0)) != 0) return (ret); @@ -755,7 +680,7 @@ kvs_cursor_update(WT_CURSOR *wtcursor) * WT_CURSOR::update (update the record if it does exist, fail if it * does not exist), maps to kvs_replace. */ - if ((ret = kvs_replace(ks->kvs, &cursor->record)) != 0) + if ((ret = kvs_replace(ws->kvs, &cursor->record)) != 0) ERET(wtext, session, WT_ERROR, "kvs_replace: %s", kvs_strerror(ret)); @@ -770,7 +695,6 @@ static int kvs_cursor_remove(WT_CURSOR *wtcursor) { CURSOR *cursor; - KVS_SOURCE *ks; WT_EXTENSION_API *wtext; WT_SESSION *session; WT_SOURCE *ws; @@ -779,7 +703,6 @@ kvs_cursor_remove(WT_CURSOR *wtcursor) session = wtcursor->session; cursor = (CURSOR *)wtcursor; wtext = cursor->wtext; - ks = cursor->ks; ws = cursor->ws; /* @@ -794,7 +717,7 @@ kvs_cursor_remove(WT_CURSOR *wtcursor) if ((ret = copyin_key(wtcursor, 0)) != 0) return (ret); - if ((ret = kvs_del(ks->kvs, &cursor->record)) == 0) + if ((ret = kvs_del(ws->kvs, &cursor->record)) == 0) return (0); if (ret == KVS_E_KEY_NOT_FOUND) return (WT_NOTFOUND); @@ -1163,8 +1086,9 @@ kvs_source_open(WT_DATA_SOURCE *wtds, devices = NULL; /* Open the KVS handle last, so cleanup is easier. */ - ks->kvs = kvs_open(device_list, &kvs_config, flags | KVS_O_CREATE); - if (ks->kvs == NULL) { + ks->kvs_device = + kvs_open(device_list, &kvs_config, flags | KVS_O_CREATE); + if (ks->kvs_device == NULL) { ESET(wtext, session, WT_ERROR, "kvs_open: %s: %s", ks->name, kvs_strerror(os_errno())); goto err; @@ -1220,12 +1144,12 @@ ws_source_open(WT_DATA_SOURCE *wtds, WT_SESSION *session, /* Check for a match: if we find one, we're done. */ for (ws = ks->ws_head; ws != NULL; ws = ws->next) - if (strcmp(ws->name, uri) == 0) + if (strcmp(ws->uri, uri) == 0) goto done; /* Allocate and initialize a new underlying WiredTiger source object. */ if ((ws = calloc(1, sizeof(*ws))) == NULL || - (ws->name = strdup(uri)) == NULL) { + (ws->uri = strdup(uri)) == NULL) { ret = os_errno(); goto err; } @@ -1249,7 +1173,7 @@ done: if ((ret = writelock(wtext, session, &ws->lock)) != 0) err: if (lockinit) ETRET(lock_destroy(wtext, session, &ws->lock)); if (ws != NULL) { - free(ws->name); + free(ws->uri); free(ws); } } @@ -1261,105 +1185,6 @@ err: if (lockinit) } /* - * ws_truncate -- - * Discard the records for an object. - */ -static int -ws_truncate(WT_DATA_SOURCE *wtds, WT_SESSION *session, WT_SOURCE *ws) -{ - struct kvs_record *r, _r; - DATA_SOURCE *ds; - WT_EXTENSION_API *wtext; - kvs_t kvs; - size_t i; - int ret = 0; - uint8_t *p, *t; - char key[KVS_MAX_KEY_LEN]; - - ds = (DATA_SOURCE *)wtds; - wtext = ds->wtext; - kvs = ws->ks->kvs; - - /* Walk the list of objects, discarding them all. */ - r = &_r; - memset(r, 0, sizeof(*r)); - r->key = key; - memcpy(r->key, ws->id, ws->idlen); - r->key_len = ws->idlen; - r->val = NULL; - r->val_len = 0; - while ((ret = kvs_next(kvs, r, 0UL, 0UL)) == 0) { - /* - * Check for an object ID mismatch, if we find one, we're - * done. - */ - for (p = ws->id, t = r->key, i = 0; i < ws->idlen; ++i) - if (*t++ != *p++) - return (0); - if ((ret = kvs_del(kvs, r)) != 0) { - ESET(wtext, session, - WT_ERROR, "kvs_del: %s", kvs_strerror(ret)); - return (ret); - } - } - if (ret == KVS_E_KEY_NOT_FOUND) - ret = 0; - if (ret != 0) - ESET(wtext, session, - WT_ERROR, "kvs_next: %s", kvs_strerror(ret)); - return (ret); -} - -/* - * master_id_get -- - * Return the maximum file ID in the system. - */ -static int -master_id_get(WT_DATA_SOURCE *wtds, WT_SESSION *session, uint64_t *maxidp) -{ - DATA_SOURCE *ds; - WT_EXTENSION_API *wtext; - int ret = 0; - const char *value; - - ds = (DATA_SOURCE *)wtds; - wtext = ds->wtext; - - if ((ret = - wtext->metadata_search(wtext, session, KVS_MAXID, &value)) == 0) { - *maxidp = strtouq(value, NULL, 10); - return (0); - } - if (ret == WT_NOTFOUND) { - *maxidp = KVS_MAXID_BASE; - return (0); - } - ERET(wtext, session, ret, "%s: %s", KVS_MAXID, wtext->strerror(ret)); -} - -/* - * master_id_set -- - * Increment the maximum file ID in the system. - */ -static int -master_id_set(WT_DATA_SOURCE *wtds, WT_SESSION *session, uint64_t maxid) -{ - DATA_SOURCE *ds; - WT_EXTENSION_API *wtext; - int ret = 0; - char value[32]; /* Large enough for any 8B value. */ - - ds = (DATA_SOURCE *)wtds; - wtext = ds->wtext; - - (void)snprintf(value, sizeof(value), "%" PRIu64, maxid); - if ((ret = - wtext->metadata_update(wtext, session, KVS_MAXID, value)) == 0) - return (0); - ERET(wtext, session, ret, "%s: %s", KVS_MAXID, wtext->strerror(ret)); -} - -/* * master_uri_get -- * Get the KVS master record for a URI. */ @@ -1407,11 +1232,12 @@ master_uri_rename(WT_DATA_SOURCE *wtds, ds = (DATA_SOURCE *)wtds; wtext = ds->wtext; + value = NULL; /* Insert the record under a new name. */ if ((ret = master_uri_get(wtds, session, uri, &value)) != 0 || (ret = wtext->metadata_insert(wtext, session, newuri, value)) != 0) - return (ret); + goto err; /* * Remove the original record, and if that fails, attempt to remove @@ -1419,6 +1245,8 @@ master_uri_rename(WT_DATA_SOURCE *wtds, */ if ((ret = wtext->metadata_remove(wtext, session, uri)) != 0) (void)wtext->metadata_remove(wtext, session, newuri); + +err: free((void *)value); return (ret); } @@ -1433,18 +1261,12 @@ master_uri_set(WT_DATA_SOURCE *wtds, DATA_SOURCE *ds; WT_CONFIG_ITEM a, b; WT_EXTENSION_API *wtext; - uint64_t maxid; int exclusive, ret = 0; char value[1024]; ds = (DATA_SOURCE *)wtds; wtext = ds->wtext; - /* Get the maximum file ID. */ - if ((ret = master_id_get(wtds, session, &maxid)) != 0) - return (ret); - ++maxid; - exclusive = 0; if ((ret = wtext->config_get(wtext, session, config, "exclusive", &a)) == 0) @@ -1481,17 +1303,14 @@ master_uri_set(WT_DATA_SOURCE *wtds, * update the master ID record. */ (void)snprintf(value, sizeof(value), - "uid=%" PRIu64 ",version=(major=%d,minor=%d)" ",key_format=%.*s,value_format=%.*s", - maxid, KVS_MAJOR, KVS_MINOR, (int)a.len, a.str, (int)b.len, b.str); + KVS_MAJOR, KVS_MINOR, (int)a.len, a.str, (int)b.len, b.str); if ((ret = wtext->metadata_insert(wtext, session, uri, value)) == 0) - return (master_id_set(wtds, session, maxid)); - + return (0); if (ret == WT_DUPLICATE_KEY) return (exclusive ? EEXIST : 0); - - ERET(wtext, session, ret, "%s: %s", KVS_MAXID, wtext->strerror(ret)); + ERET(wtext, session, ret, "%s: %s", uri, wtext->strerror(ret)); } /* @@ -1540,7 +1359,10 @@ kvs_session_drop(WT_DATA_SOURCE *wtds, ds = (DATA_SOURCE *)wtds; wtext = ds->wtext; - /* Get a locked reference to the data source. */ + /* + * Get a locked reference to the data source; hold the global lock as + * well, we are going to change the list of objects for a KVS store. + */ if ((ret = ws_source_open(wtds, session, config, uri, 1, &ws)) != 0) return (ret); ks = ws->ks; @@ -1551,31 +1373,36 @@ kvs_session_drop(WT_DATA_SOURCE *wtds, goto err; } - /* Discard all of the rows in the object. */ - if ((ret = ws_truncate(wtds, session, ws)) != 0) + /* Close and delete the underlying object. */ + if ((ret = kvs_close(ws->kvs)) != 0) { + ESET(wtext, session, WT_ERROR, + "kvs_close: %s: %s", ws->uri, kvs_strerror(ret)); + goto err; + } + if ((ret = kvs_delete_namespace(ks->kvs_device, ws->uri)) != 0) { + ESET(wtext, session, WT_ERROR, + "kvs_delete_namespace: %s: %s", ws->uri, kvs_strerror(ret)); goto err; + } + + /* Discard the metadata entry. */ + ret = master_uri_drop(wtds, session, uri); /* * Remove the entry from the WT_SOURCE list -- it's a singly-linked * list, find the reference to it. */ for (p = &ks->ws_head; *p != NULL; p = &(*p)->next) - if (*p == ws) - break; - /* - * We should be guaranteed to find an entry, we just looked it up and - * everything is locked down. - */ - if (*p == NULL) { - ret = WT_NOTFOUND; - goto err; - } - *p = (*p)->next; + if (*p == ws) { + *p = (*p)->next; - /* Discard the metadata entry. */ - ret = master_uri_drop(wtds, session, uri); + ETRET(lock_destroy(wtext, session, &ws->lock)); + free(ws->uri); + free(ws); + break; + } -err: ETRET(unlock(wtext, session, &ws->lock)); +err: ETRET(unlock(wtext, session, &ds->global_lock)); return (ret); } @@ -1649,7 +1476,6 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session, * Finish initializing the cursor (if the WT_SOURCE structure requires * initialization, we're going to use the cursor as part of that work). */ - cursor->ks = ws->ks; cursor->ws = ws; /* @@ -1657,35 +1483,8 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session, * using information stored in the master record. */ if (!ws->configured) { - ws->configured = 1; - if ((ret = master_uri_get(wtds, session, uri, &value)) != 0) goto err; - if ((ret = wtext->config_strget( - wtext, session, value, "uid", &v)) != 0) { - ESET(wtext, session, ret, - "WT_EXTENSION_API.config: uid: %s", - wtext->strerror(ret)); - goto err; - } - - /* - * Build packed versions of the unique ID and the next ID (the - * next ID is what we need to do a "previous" cursor traversal.) - */ - if ((ret = wtext->struct_size(wtext, - session, &ws->idlen, "r", v.val)) != 0 || - (ret = wtext->struct_pack(wtext, - session, ws->id, sizeof(ws->id), "r", v.val)) != 0 || - (ret = wtext->struct_size(wtext, - session, &ws->idnextlen, "r", v.val + 1)) != 0 || - (ret = wtext->struct_pack(wtext, session, - ws->idnext, sizeof(ws->idnext), "r", v.val + 1)) != 0) { - ESET(wtext, session, ret, - "WT_EXTENSION_API.config: uid: %s", - wtext->strerror(ret)); - goto err; - } if ((ret = wtext->config_strget( wtext, session, value, "key_format", &v)) != 0) { @@ -1706,6 +1505,15 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session, ws->config_bitfield = v.len == 2 && isdigit(v.str[0]) && v.str[1] == 't'; + /* Open the underlying KVS namespace. */ + if ((ws->kvs = + kvs_open_namespace(ws->ks->kvs_device, uri)) == NULL) { + ESET(wtext, session, WT_ERROR, + "kvs_open_namespace: %s: %s", + uri, kvs_strerror(os_errno())); + goto err; + } + /* * If it's a record-number key, read the last record from the * object and set the allocation record value. @@ -1723,6 +1531,8 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session, if ((ret = kvs_cursor_reset(wtcursor)) != 0) goto err; } + + ws->configured = 1; } /* Increment the open reference count to pin the URI and unlock it. */ @@ -1751,6 +1561,7 @@ kvs_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri, const char *newuri, WT_CONFIG_ARG *config) { DATA_SOURCE *ds; + KVS_SOURCE *ks; WT_EXTENSION_API *wtext; WT_SOURCE *ws; int ret = 0; @@ -1760,9 +1571,14 @@ kvs_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session, wtext = ds->wtext; copy = NULL; - /* Get a locked reference to the WiredTiger source. */ + /* + * Get a locked reference to the data source; hold the global lock as + * well, we are going to change the object's name, and we can't allow + * other threads to be walking the list and comparing against the name. + */ if ((ret = ws_source_open(wtds, session, config, uri, 1, &ws)) != 0) return (ret); + ks = ws->ks; /* If there are active references to the object, we're busy. */ if (ws->ref != 0) { @@ -1770,19 +1586,24 @@ kvs_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session, goto err; } - /* Get a copy of the new name to simplify cleanup. */ - if ((copy = strdup(newuri)) == NULL) - return (os_errno()); - /* Update the metadata record. */ if ((ret = master_uri_rename(wtds, session, uri, newuri)) != 0) goto err; - /* Swap our copy of the name. */ - free(ws->name); - ws->name = copy; + /* Update the structure name. */ + if ((copy = strdup(newuri)) == NULL) + return (os_errno()); + free(ws->uri); + ws->uri = copy; copy = NULL; + /* Rename the KVS namespace. */ + if ((ret = kvs_rename_namespace(ks->kvs_device, uri, newuri)) != 0) { + ESET(wtext, session, WT_ERROR, + "kvs_rename_namespace: %s: %s", ws->uri, kvs_strerror(ret)); + goto err; + } + err: ETRET(unlock(wtext, session, &ws->lock)); ETRET(unlock(wtext, session, &ds->global_lock)); @@ -1810,13 +1631,20 @@ kvs_session_truncate(WT_DATA_SOURCE *wtds, if ((ret = ws_source_open(wtds, session, config, uri, 0, &ws)) != 0) return (ret); - /* - * If there are active references to the object, we're busy. - * Else, discard the records. - */ - ret = ws->ref == 0 ? ws_truncate(wtds, session, ws) : EBUSY; + /* If there are active references to the object, we're busy. */ + if (ws->ref != 0) { + ret = EBUSY; + goto err; + } - ETRET(unlock(wtext, session, &ws->lock)); + /* Truncate the underlying object. */ + if ((ret = kvs_truncate(ws->kvs)) != 0) { + ESET(wtext, session, WT_ERROR, + "kvs_truncate: %s: %s", ws->uri, kvs_strerror(ret)); + goto err; + } + +err: ETRET(unlock(wtext, session, &ws->lock)); return (ret); } @@ -1860,7 +1688,7 @@ kvs_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session) /* Start a flush on any open KVS sources. */ for (ks = ds->kvs_head; ks != NULL; ks = ks->next) - if ((tret = kvs_commit(ks->kvs)) != 0) + if ((tret = kvs_commit(ks->kvs_device)) != 0) ESET(wtext, session, WT_ERROR, "kvs_commit: %s: %s", ks->name, kvs_strerror(tret)); @@ -1871,15 +1699,19 @@ kvs_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session) ESET(wtext, session, WT_ERROR, "%s: has open object %s with %u open " "cursors during close", - ks->name, ws->name, ws->ref); + ks->name, ws->uri, ws->ref); + if ((tret = kvs_close(ws->kvs)) != 0) + ESET(wtext, session, WT_ERROR, + "kvs_close: %s: %s", + ws->uri, kvs_strerror(tret)); ks->ws_head = ws->next; ETRET(lock_destroy(wtext, session, &ws->lock)); - free(ws->name); + free(ws->uri); free(ws); } - if ((tret = kvs_close(ks->kvs)) != 0) + if ((tret = kvs_close(ks->kvs_device)) != 0) ESET(wtext, session, WT_ERROR, "kvs_close: %s: %s", ks->name, kvs_strerror(tret)); |