diff options
Diffstat (limited to 'src/btree/col_modify.c')
-rw-r--r-- | src/btree/col_modify.c | 244 |
1 files changed, 146 insertions, 98 deletions
diff --git a/src/btree/col_modify.c b/src/btree/col_modify.c index c91bda56308..1ad680013f0 100644 --- a/src/btree/col_modify.c +++ b/src/btree/col_modify.c @@ -7,63 +7,51 @@ #include "wt_internal.h" -static int __col_insert_alloc( - WT_SESSION_IMPL *, uint64_t, u_int, WT_INSERT **, size_t *); +static int __col_insert_alloc(WT_SESSION_IMPL *, uint64_t, u_int, WT_INSERT **); /* * __wt_col_modify -- - * Column-store delete insert, and update. + * Column-store delete, insert, and update. */ int -__wt_col_modify(WT_SESSION_IMPL *session, WT_CURSOR_BTREE *cbt, int is_remove) +__wt_col_modify(WT_SESSION_IMPL *session, WT_CURSOR_BTREE *cbt, int op) { WT_BTREE *btree; WT_INSERT *ins; WT_INSERT_HEAD **inshead, *new_inshead, **new_inslist; WT_ITEM *value, _value; WT_PAGE *page; - WT_SESSION_BUFFER *sb; WT_UPDATE *upd; - size_t ins_size, new_inslist_size, new_inshead_size, upd_size; + size_t new_inshead_size, new_inslist_size; uint64_t recno; u_int skipdepth; - int hazard_ref, i, ret; + int i, ret; btree = cbt->btree; page = cbt->page; - recno = cbt->iface.recno; - if (is_remove) { + switch (op) { + case 1: /* Insert */ + page = btree->last_page; + __cursor_search_clear(cbt); + + value = (WT_ITEM *)&cbt->iface.value; + recno = 0; /* Engine allocates */ + break; + case 2: /* Remove */ if (btree->type == BTREE_COL_FIX) { value = &_value; value->data = ""; value->size = 1; } else value = NULL; - } else + recno = cbt->iface.recno; /* App specified */ + break; + case 3: /* Update */ + default: value = (WT_ITEM *)&cbt->iface.value; - - /* - * Append a column-store entry (the only place you can insert into a - * column-store file is after the key space, column-store records are - * immutable). If we don't have an exact match, it's an append and we - * need to extend the file. - */ - if (cbt->compare != 0) { - /* - * We may have, and need to hold, a hazard reference on a page, - * but we're possibly doing some page shuffling of the root, - * which means the standard test to determine whether we should - * release a hazard reference on the page isn't right. Check - * now, before we do the page shuffling. - */ - hazard_ref = page == session->btree->root_page.page ? 0 : 1; - ret = __wt_col_extend(session, page, recno); - if (hazard_ref) { - __wt_page_release(session, page); - cbt->page = NULL; /* XXX WRONG */ - } - return (ret == 0 ? WT_RESTART : 0); + recno = cbt->iface.recno; /* App specified */ + break; } ins = NULL; @@ -73,63 +61,45 @@ __wt_col_modify(WT_SESSION_IMPL *session, WT_CURSOR_BTREE *cbt, int is_remove) ret = 0; /* - * Delete or update a column-store entry. - * Column-store changes mean working in a WT_INSERT list. + * Delete, insert or update a column-store entry. */ - if (cbt->ins != NULL) { - /* - * If changing an already changed record, create a new WT_UPDATE - * entry and have the workQ link it into an existing WT_INSERT - * entry's WT_UPDATE list. - */ - WT_ERR(__wt_update_alloc(session, value, &upd, &upd_size)); - - /* workQ: insert the WT_UPDATE structure. */ - ret = __wt_update_serial(session, page, - cbt->write_gen, &cbt->ins->upd, NULL, 0, &upd, upd_size); - } else { - /* - * We may not have an WT_INSERT_HEAD array (in the case of - * variable length column store) or WT_INSERT_HEAD slot (in the - * case of fixed length column store). Also, there may be an - * insert array but no list at the point we are inserting. - * Allocate as necessary. - */ + if (cbt->ins == NULL) { + /* There may be no WT_INSERT_HEAD, allocate as necessary. */ new_inshead_size = new_inslist_size = 0; - if (page->u.col_leaf.ins == NULL) - switch (page->type) { - case WT_PAGE_COL_FIX: + if (op == 1) { + if (page == NULL || page->u.col_leaf.append == NULL) { new_inslist_size = 1 * sizeof(WT_INSERT_HEAD *); - WT_ERR(__wt_calloc_def(session, - 1, &new_inslist)); + WT_ERR( + __wt_calloc_def(session, 1, &new_inslist)); inshead = &new_inslist[0]; - break; - case WT_PAGE_COL_VAR: + } else + inshead = &page->u.col_leaf.append[0]; + cbt->ins_head = *inshead; + } else if (page->type == WT_PAGE_COL_FIX) { + if (page->u.col_leaf.ins == NULL) { + new_inslist_size = 1 * + sizeof(WT_INSERT_HEAD *); + WT_ERR( + __wt_calloc_def(session, 1, &new_inslist)); + inshead = &new_inslist[0]; + } else + inshead = &page->u.col_leaf.ins[0]; + } else { + if (page->u.col_leaf.ins == NULL) { new_inslist_size = page->entries * sizeof(WT_INSERT_HEAD *); WT_ERR(__wt_calloc_def( session, page->entries, &new_inslist)); inshead = &new_inslist[cbt->slot]; - break; - WT_ILLEGAL_FORMAT(session); - } - else - switch (page->type) { - case WT_PAGE_COL_FIX: - inshead = &page->u.col_leaf.ins[0]; - break; - case WT_PAGE_COL_VAR: + } else inshead = &page->u.col_leaf.ins[cbt->slot]; - break; - WT_ILLEGAL_FORMAT(session); - } + } + /* There may be no WT_INSERT list, allocate as necessary. */ if (*inshead == NULL) { new_inshead_size = sizeof(WT_INSERT_HEAD); - WT_RET(__wt_sb_alloc(session, - sizeof(WT_INSERT_HEAD), &new_inshead, &sb)); - new_inshead->sb = sb; + WT_RET(__wt_calloc_def(session, 1, &new_inshead)); for (i = 0; i < WT_SKIP_MAXDEPTH; i++) cbt->ins_stack[i] = &new_inshead->head[i]; cbt->ins_head = new_inshead; @@ -139,30 +109,43 @@ __wt_col_modify(WT_SESSION_IMPL *session, WT_CURSOR_BTREE *cbt, int is_remove) skipdepth = __wt_skip_choose_depth(); /* - * Allocate a new WT_INSERT/WT_UPDATE pair, link it into the - * WT_INSERT array. + * Allocate a WT_INSERT/WT_UPDATE pair, and update the cursor + * to reference it. */ - WT_ERR(__col_insert_alloc( - session, recno, skipdepth, &ins, &ins_size)); - WT_ERR(__wt_update_alloc(session, value, &upd, &upd_size)); + WT_ERR(__col_insert_alloc(session, recno, skipdepth, &ins)); + WT_ERR(__wt_update_alloc(session, value, &upd)); ins->upd = upd; - ins_size += upd_size; cbt->ins = ins; /* - * workQ: insert the WT_INSERT structure. - * - * For fixed-width stores, we are installing a single insert - * head for the page. Pass NULL to the insert serialization - * function, there is no need to set it again, and we only want - * to account for it once. + * workQ: insert or append the WT_INSERT structure. */ - ret = __wt_insert_serial(session, - page, cbt->write_gen, - inshead, cbt->ins_stack, - &new_inslist, new_inslist_size, - &new_inshead, new_inshead_size, - &ins, ins_size, skipdepth); + if (op == 1) { + WT_ERR(__wt_append_serial(session, + inshead, cbt->ins_stack, + &new_inslist, new_inslist_size, + &new_inshead, new_inshead_size, ins, skipdepth)); + + /* Set up the cursor for the inserted page and value. */ + cbt->page = btree->last_page; + cbt->recno = WT_INSERT_RECNO(ins); + } else + WT_ERR(__wt_insert_serial(session, + page, cbt->write_gen, + inshead, cbt->ins_stack, + &new_inslist, new_inslist_size, + &new_inshead, new_inshead_size, ins, skipdepth)); + } else { + /* + * If changing an already changed record, create a new WT_UPDATE + * entry and have the workQ link it into an existing WT_INSERT + * entry's WT_UPDATE list. + */ + WT_ERR(__wt_update_alloc(session, value, &upd)); + + /* workQ: insert the WT_UPDATE structure. */ + ret = __wt_update_serial(session, page, + cbt->write_gen, &cbt->ins->upd, NULL, 0, upd); } if (ret != 0) { @@ -173,6 +156,7 @@ err: if (ins != NULL) } __wt_free(session, new_inslist); + __wt_free(session, new_inshead); return (ret); } @@ -183,8 +167,8 @@ err: if (ins != NULL) * buffer and fill it in. */ static int -__col_insert_alloc(WT_SESSION_IMPL *session, - uint64_t recno, u_int skipdepth, WT_INSERT **insp, size_t *ins_sizep) +__col_insert_alloc( + WT_SESSION_IMPL *session, uint64_t recno, u_int skipdepth, WT_INSERT **insp) { WT_SESSION_BUFFER *sb; WT_INSERT *ins; @@ -194,14 +178,78 @@ __col_insert_alloc(WT_SESSION_IMPL *session, * Allocate the WT_INSERT structure and skiplist pointers, then copy * the record number into place. */ - ins_size = sizeof(WT_INSERT) + - skipdepth * sizeof(WT_INSERT *); + ins_size = sizeof(WT_INSERT) + skipdepth * sizeof(WT_INSERT *); WT_RET(__wt_sb_alloc(session, ins_size, &ins, &sb)); ins->sb = sb; WT_INSERT_RECNO(ins) = recno; *insp = ins; - *ins_sizep = ins_size; + return (0); +} + +/* + * __wt_append_serial_func -- + * Server function to append an WT_INSERT entry to the tree. + */ +int +__wt_append_serial_func(WT_SESSION_IMPL *session) +{ + WT_BTREE *btree; + WT_PAGE *page; + WT_INSERT_HEAD **inshead, **new_inslist, *new_inshead; + WT_INSERT *new_ins, ***ins_stack; + uint64_t recno; + u_int i, skipdepth; + + btree = session->btree; + page = btree->last_page; + + __wt_append_unpack(session, &inshead, &ins_stack, + &new_inslist, &new_inshead, &new_ins, &skipdepth); + + /* + * If the page does not yet have an insert array, our caller passed + * us one. + */ + if (page->u.col_leaf.append == NULL) { + page->u.col_leaf.append = new_inslist; + __wt_append_new_inslist_taken(session, page); + } + + /* + * If the insert head does not yet have an insert list, our caller + * passed us one. + */ + if (*inshead == NULL) { + *inshead = new_inshead; + __wt_append_new_inshead_taken(session, page); + } + + /* + * If the application specified a record number, there's a race: the + * application may have searched for the record, not found it, then + * called into the append code, and another thread might have added + * the record. Fortunately, we're in the right place because if the + * record didn't exist at some point, it can only have been created + * on this list. Search for the record, if specified. + */ + if ((recno = WT_INSERT_RECNO(new_ins)) == 0) + recno = WT_INSERT_RECNO(new_ins) = ++btree->last_recno; + (void)__col_insert_search_stack(*inshead, ins_stack, recno); + + /* + * First, point the new WT_INSERT item's skiplist references to the next + * elements in the insert list, then flush memory. Second, update the + * skiplist elements that reference the new WT_INSERT item, this ensures + * the list is never inconsistent. + */ + for (i = 0; i < skipdepth; i++) + new_ins->next[i] = *ins_stack[i]; + WT_MEMORY_FLUSH; + for (i = 0; i < skipdepth; i++) + *ins_stack[i] = new_ins; + + __wt_session_serialize_wrapup(session, page, 0); return (0); } |