summaryrefslogtreecommitdiff
path: root/src/btree/col_modify.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/btree/col_modify.c')
-rw-r--r--src/btree/col_modify.c244
1 files changed, 146 insertions, 98 deletions
diff --git a/src/btree/col_modify.c b/src/btree/col_modify.c
index c91bda56308..1ad680013f0 100644
--- a/src/btree/col_modify.c
+++ b/src/btree/col_modify.c
@@ -7,63 +7,51 @@
#include "wt_internal.h"
-static int __col_insert_alloc(
- WT_SESSION_IMPL *, uint64_t, u_int, WT_INSERT **, size_t *);
+static int __col_insert_alloc(WT_SESSION_IMPL *, uint64_t, u_int, WT_INSERT **);
/*
* __wt_col_modify --
- * Column-store delete insert, and update.
+ * Column-store delete, insert, and update.
*/
int
-__wt_col_modify(WT_SESSION_IMPL *session, WT_CURSOR_BTREE *cbt, int is_remove)
+__wt_col_modify(WT_SESSION_IMPL *session, WT_CURSOR_BTREE *cbt, int op)
{
WT_BTREE *btree;
WT_INSERT *ins;
WT_INSERT_HEAD **inshead, *new_inshead, **new_inslist;
WT_ITEM *value, _value;
WT_PAGE *page;
- WT_SESSION_BUFFER *sb;
WT_UPDATE *upd;
- size_t ins_size, new_inslist_size, new_inshead_size, upd_size;
+ size_t new_inshead_size, new_inslist_size;
uint64_t recno;
u_int skipdepth;
- int hazard_ref, i, ret;
+ int i, ret;
btree = cbt->btree;
page = cbt->page;
- recno = cbt->iface.recno;
- if (is_remove) {
+ switch (op) {
+ case 1: /* Insert */
+ page = btree->last_page;
+ __cursor_search_clear(cbt);
+
+ value = (WT_ITEM *)&cbt->iface.value;
+ recno = 0; /* Engine allocates */
+ break;
+ case 2: /* Remove */
if (btree->type == BTREE_COL_FIX) {
value = &_value;
value->data = "";
value->size = 1;
} else
value = NULL;
- } else
+ recno = cbt->iface.recno; /* App specified */
+ break;
+ case 3: /* Update */
+ default:
value = (WT_ITEM *)&cbt->iface.value;
-
- /*
- * Append a column-store entry (the only place you can insert into a
- * column-store file is after the key space, column-store records are
- * immutable). If we don't have an exact match, it's an append and we
- * need to extend the file.
- */
- if (cbt->compare != 0) {
- /*
- * We may have, and need to hold, a hazard reference on a page,
- * but we're possibly doing some page shuffling of the root,
- * which means the standard test to determine whether we should
- * release a hazard reference on the page isn't right. Check
- * now, before we do the page shuffling.
- */
- hazard_ref = page == session->btree->root_page.page ? 0 : 1;
- ret = __wt_col_extend(session, page, recno);
- if (hazard_ref) {
- __wt_page_release(session, page);
- cbt->page = NULL; /* XXX WRONG */
- }
- return (ret == 0 ? WT_RESTART : 0);
+ recno = cbt->iface.recno; /* App specified */
+ break;
}
ins = NULL;
@@ -73,63 +61,45 @@ __wt_col_modify(WT_SESSION_IMPL *session, WT_CURSOR_BTREE *cbt, int is_remove)
ret = 0;
/*
- * Delete or update a column-store entry.
- * Column-store changes mean working in a WT_INSERT list.
+ * Delete, insert or update a column-store entry.
*/
- if (cbt->ins != NULL) {
- /*
- * If changing an already changed record, create a new WT_UPDATE
- * entry and have the workQ link it into an existing WT_INSERT
- * entry's WT_UPDATE list.
- */
- WT_ERR(__wt_update_alloc(session, value, &upd, &upd_size));
-
- /* workQ: insert the WT_UPDATE structure. */
- ret = __wt_update_serial(session, page,
- cbt->write_gen, &cbt->ins->upd, NULL, 0, &upd, upd_size);
- } else {
- /*
- * We may not have an WT_INSERT_HEAD array (in the case of
- * variable length column store) or WT_INSERT_HEAD slot (in the
- * case of fixed length column store). Also, there may be an
- * insert array but no list at the point we are inserting.
- * Allocate as necessary.
- */
+ if (cbt->ins == NULL) {
+ /* There may be no WT_INSERT_HEAD, allocate as necessary. */
new_inshead_size = new_inslist_size = 0;
- if (page->u.col_leaf.ins == NULL)
- switch (page->type) {
- case WT_PAGE_COL_FIX:
+ if (op == 1) {
+ if (page == NULL || page->u.col_leaf.append == NULL) {
new_inslist_size = 1 *
sizeof(WT_INSERT_HEAD *);
- WT_ERR(__wt_calloc_def(session,
- 1, &new_inslist));
+ WT_ERR(
+ __wt_calloc_def(session, 1, &new_inslist));
inshead = &new_inslist[0];
- break;
- case WT_PAGE_COL_VAR:
+ } else
+ inshead = &page->u.col_leaf.append[0];
+ cbt->ins_head = *inshead;
+ } else if (page->type == WT_PAGE_COL_FIX) {
+ if (page->u.col_leaf.ins == NULL) {
+ new_inslist_size = 1 *
+ sizeof(WT_INSERT_HEAD *);
+ WT_ERR(
+ __wt_calloc_def(session, 1, &new_inslist));
+ inshead = &new_inslist[0];
+ } else
+ inshead = &page->u.col_leaf.ins[0];
+ } else {
+ if (page->u.col_leaf.ins == NULL) {
new_inslist_size = page->entries *
sizeof(WT_INSERT_HEAD *);
WT_ERR(__wt_calloc_def(
session, page->entries, &new_inslist));
inshead = &new_inslist[cbt->slot];
- break;
- WT_ILLEGAL_FORMAT(session);
- }
- else
- switch (page->type) {
- case WT_PAGE_COL_FIX:
- inshead = &page->u.col_leaf.ins[0];
- break;
- case WT_PAGE_COL_VAR:
+ } else
inshead = &page->u.col_leaf.ins[cbt->slot];
- break;
- WT_ILLEGAL_FORMAT(session);
- }
+ }
+ /* There may be no WT_INSERT list, allocate as necessary. */
if (*inshead == NULL) {
new_inshead_size = sizeof(WT_INSERT_HEAD);
- WT_RET(__wt_sb_alloc(session,
- sizeof(WT_INSERT_HEAD), &new_inshead, &sb));
- new_inshead->sb = sb;
+ WT_RET(__wt_calloc_def(session, 1, &new_inshead));
for (i = 0; i < WT_SKIP_MAXDEPTH; i++)
cbt->ins_stack[i] = &new_inshead->head[i];
cbt->ins_head = new_inshead;
@@ -139,30 +109,43 @@ __wt_col_modify(WT_SESSION_IMPL *session, WT_CURSOR_BTREE *cbt, int is_remove)
skipdepth = __wt_skip_choose_depth();
/*
- * Allocate a new WT_INSERT/WT_UPDATE pair, link it into the
- * WT_INSERT array.
+ * Allocate a WT_INSERT/WT_UPDATE pair, and update the cursor
+ * to reference it.
*/
- WT_ERR(__col_insert_alloc(
- session, recno, skipdepth, &ins, &ins_size));
- WT_ERR(__wt_update_alloc(session, value, &upd, &upd_size));
+ WT_ERR(__col_insert_alloc(session, recno, skipdepth, &ins));
+ WT_ERR(__wt_update_alloc(session, value, &upd));
ins->upd = upd;
- ins_size += upd_size;
cbt->ins = ins;
/*
- * workQ: insert the WT_INSERT structure.
- *
- * For fixed-width stores, we are installing a single insert
- * head for the page. Pass NULL to the insert serialization
- * function, there is no need to set it again, and we only want
- * to account for it once.
+ * workQ: insert or append the WT_INSERT structure.
*/
- ret = __wt_insert_serial(session,
- page, cbt->write_gen,
- inshead, cbt->ins_stack,
- &new_inslist, new_inslist_size,
- &new_inshead, new_inshead_size,
- &ins, ins_size, skipdepth);
+ if (op == 1) {
+ WT_ERR(__wt_append_serial(session,
+ inshead, cbt->ins_stack,
+ &new_inslist, new_inslist_size,
+ &new_inshead, new_inshead_size, ins, skipdepth));
+
+ /* Set up the cursor for the inserted page and value. */
+ cbt->page = btree->last_page;
+ cbt->recno = WT_INSERT_RECNO(ins);
+ } else
+ WT_ERR(__wt_insert_serial(session,
+ page, cbt->write_gen,
+ inshead, cbt->ins_stack,
+ &new_inslist, new_inslist_size,
+ &new_inshead, new_inshead_size, ins, skipdepth));
+ } else {
+ /*
+ * If changing an already changed record, create a new WT_UPDATE
+ * entry and have the workQ link it into an existing WT_INSERT
+ * entry's WT_UPDATE list.
+ */
+ WT_ERR(__wt_update_alloc(session, value, &upd));
+
+ /* workQ: insert the WT_UPDATE structure. */
+ ret = __wt_update_serial(session, page,
+ cbt->write_gen, &cbt->ins->upd, NULL, 0, upd);
}
if (ret != 0) {
@@ -173,6 +156,7 @@ err: if (ins != NULL)
}
__wt_free(session, new_inslist);
+ __wt_free(session, new_inshead);
return (ret);
}
@@ -183,8 +167,8 @@ err: if (ins != NULL)
* buffer and fill it in.
*/
static int
-__col_insert_alloc(WT_SESSION_IMPL *session,
- uint64_t recno, u_int skipdepth, WT_INSERT **insp, size_t *ins_sizep)
+__col_insert_alloc(
+ WT_SESSION_IMPL *session, uint64_t recno, u_int skipdepth, WT_INSERT **insp)
{
WT_SESSION_BUFFER *sb;
WT_INSERT *ins;
@@ -194,14 +178,78 @@ __col_insert_alloc(WT_SESSION_IMPL *session,
* Allocate the WT_INSERT structure and skiplist pointers, then copy
* the record number into place.
*/
- ins_size = sizeof(WT_INSERT) +
- skipdepth * sizeof(WT_INSERT *);
+ ins_size = sizeof(WT_INSERT) + skipdepth * sizeof(WT_INSERT *);
WT_RET(__wt_sb_alloc(session, ins_size, &ins, &sb));
ins->sb = sb;
WT_INSERT_RECNO(ins) = recno;
*insp = ins;
- *ins_sizep = ins_size;
+ return (0);
+}
+
+/*
+ * __wt_append_serial_func --
+ * Server function to append an WT_INSERT entry to the tree.
+ */
+int
+__wt_append_serial_func(WT_SESSION_IMPL *session)
+{
+ WT_BTREE *btree;
+ WT_PAGE *page;
+ WT_INSERT_HEAD **inshead, **new_inslist, *new_inshead;
+ WT_INSERT *new_ins, ***ins_stack;
+ uint64_t recno;
+ u_int i, skipdepth;
+
+ btree = session->btree;
+ page = btree->last_page;
+
+ __wt_append_unpack(session, &inshead, &ins_stack,
+ &new_inslist, &new_inshead, &new_ins, &skipdepth);
+
+ /*
+ * If the page does not yet have an insert array, our caller passed
+ * us one.
+ */
+ if (page->u.col_leaf.append == NULL) {
+ page->u.col_leaf.append = new_inslist;
+ __wt_append_new_inslist_taken(session, page);
+ }
+
+ /*
+ * If the insert head does not yet have an insert list, our caller
+ * passed us one.
+ */
+ if (*inshead == NULL) {
+ *inshead = new_inshead;
+ __wt_append_new_inshead_taken(session, page);
+ }
+
+ /*
+ * If the application specified a record number, there's a race: the
+ * application may have searched for the record, not found it, then
+ * called into the append code, and another thread might have added
+ * the record. Fortunately, we're in the right place because if the
+ * record didn't exist at some point, it can only have been created
+ * on this list. Search for the record, if specified.
+ */
+ if ((recno = WT_INSERT_RECNO(new_ins)) == 0)
+ recno = WT_INSERT_RECNO(new_ins) = ++btree->last_recno;
+ (void)__col_insert_search_stack(*inshead, ins_stack, recno);
+
+ /*
+ * First, point the new WT_INSERT item's skiplist references to the next
+ * elements in the insert list, then flush memory. Second, update the
+ * skiplist elements that reference the new WT_INSERT item, this ensures
+ * the list is never inconsistent.
+ */
+ for (i = 0; i < skipdepth; i++)
+ new_ins->next[i] = *ins_stack[i];
+ WT_MEMORY_FLUSH;
+ for (i = 0; i < skipdepth; i++)
+ *ins_stack[i] = new_ins;
+
+ __wt_session_serialize_wrapup(session, page, 0);
return (0);
}