diff options
author | unknown <jonas@perch.ndb.mysql.com> | 2005-11-07 13:28:19 +0100 |
---|---|---|
committer | unknown <jonas@perch.ndb.mysql.com> | 2005-11-07 13:28:19 +0100 |
commit | 6be5f8a340359cff3302d9e26c8620aff7a78938 (patch) | |
tree | a2e355801c606d5c737ae2f9ae567fdf2fd415a3 /storage | |
parent | ee740746aff3c5163adfff481bc4cf70c37728f5 (diff) | |
download | mariadb-git-6be5f8a340359cff3302d9e26c8620aff7a78938.tar.gz |
ndb - Fix problematic handling of TUP_ALLOC/DEALLOC REQ wrt varsize
same row -
prepare delete, prepare insert (with different size), commit delete, commit insert would lead to assertion
as prepare insert didnt call handle_size_change_after_update
Diffstat (limited to 'storage')
-rw-r--r-- | storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp | 1 | ||||
-rw-r--r-- | storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp | 243 |
2 files changed, 147 insertions, 97 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp index ebb3cc98de0..8a60c26b4ef 100644 --- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp +++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp @@ -2103,6 +2103,7 @@ private: } void prepare_initial_insert(KeyReqStruct*, Operationrec*, Tablerec*); + void fix_disk_insert_no_mem_insert(KeyReqStruct*, Operationrec*, Tablerec*); void setup_fixed_part(KeyReqStruct* req_struct, Operationrec* const regOperPtr, Tablerec* const regTabPtr); diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp index 8e1c1ee43fd..8ce42b1dcf8 100644 --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp @@ -1298,12 +1298,56 @@ Dbtup::prepare_initial_insert(KeyReqStruct *req_struct, disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0; } +void +Dbtup::fix_disk_insert_no_mem_insert(KeyReqStruct *req_struct, + Operationrec* regOperPtr, + Tablerec* regTabPtr) +{ + regOperPtr->m_undo_buffer_space= sizeof(Dbtup::Disk_undo::Alloc); + req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD); + + const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize; + const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize; + Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr); + + if(cnt1) + { + // Disk part is 32-bit aligned + char *varptr = req_struct->m_var_data[MM].m_data_ptr; + ptr= ALIGN_WORD(varptr + regTabPtr->m_offsets[MM].m_max_var_offset); + } + else + { + ptr -= Tuple_header::HeaderSize; + } + + req_struct->m_disk_ptr= (Tuple_header*)ptr; + + if(cnt2) + { + KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD]; + ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset; + dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1); + dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1); + dst->m_var_len_offset= cnt2; + dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset; + } + + // Set all null bits + memset(req_struct->m_disk_ptr->m_null_bits+ + regTabPtr->m_offsets[DD].m_null_offset, 0xFF, + 4*regTabPtr->m_offsets[DD].m_null_words); + req_struct->m_tuple_ptr->m_header_bits = + (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE); +} + int Dbtup::handleInsertReq(Signal* signal, Ptr<Operationrec> regOperPtr, Ptr<Fragrecord> fragPtr, Tablerec* const regTabPtr, KeyReqStruct *req_struct) { + Uint32 tup_version = 1; Fragrecord* regFragPtr = fragPtr.p; Uint32 *dst, *ptr= 0; Tuple_header *base= req_struct->m_tuple_ptr, *org= base; @@ -1320,51 +1364,64 @@ int Dbtup::handleInsertReq(Signal* signal, ndbout << "dst: " << hex << UintPtr(dst) << " - " << regOperPtr.p->m_copy_tuple_location << endl; + bool disk = regTabPtr->m_no_of_disk_attributes > 0; + bool mem_insert = get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT; + bool disk_insert = regOperPtr.p->is_first_operation() && disk; - Uint32 tup_version; union { Uint32 sizes[4]; Uint64 cmp[2]; }; - if(regOperPtr.p->is_first_operation()) + + if(mem_insert) { - tup_version= 1; + jam(); + ndbassert(regOperPtr.p->is_first_operation()); // disk insert prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr); - if(regTabPtr->m_no_of_disk_attributes) - { - int res; - if((res= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id, - regOperPtr.p->m_undo_buffer_space))) - { - terrorCode= res; - regOperPtr.p->m_undo_buffer_space= 0; - goto log_space_error; - } - } } else { - Operationrec* prevOp= req_struct->prevOpPtr.p; - ndbassert(prevOp->op_struct.op_type == ZDELETE); - tup_version= prevOp->tupVersion + 1; + if (!regOperPtr.p->is_first_operation()) + { + Operationrec* prevOp= req_struct->prevOpPtr.p; + ndbassert(prevOp->op_struct.op_type == ZDELETE); + tup_version= prevOp->tupVersion + 1; + + if(!prevOp->is_first_operation()) + org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location); + } - if(!prevOp->is_first_operation()) - org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location); if (regTabPtr->need_expand()) - expand_tuple(req_struct, sizes, org, regTabPtr, true); + expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert); else memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size); } + if (disk_insert) + { + int res; + if (unlikely(!mem_insert)) + { + sizes[DD] = sizes[DD+2] = regTabPtr->m_offsets[DD].m_fix_header_size; + fix_disk_insert_no_mem_insert(req_struct, regOperPtr.p, regTabPtr); + } + if((res= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id, + regOperPtr.p->m_undo_buffer_space))) + { + terrorCode= res; + regOperPtr.p->m_undo_buffer_space= 0; + goto log_space_error; + } + } + regOperPtr.p->tupVersion= tup_version & ZTUP_VERSION_MASK; tuple_ptr->set_tuple_version(tup_version); if(updateAttributes(req_struct, &cinBuffer[0], req_struct->attrinfo_len) == -1) return -1; - + if (checkNullAttributes(req_struct, regTabPtr) == false) { - goto null_check_error; } @@ -1376,95 +1433,87 @@ int Dbtup::handleInsertReq(Signal* signal, /** * Alloc memory */ - if(regOperPtr.p->is_first_operation()) + Uint32 frag_page_id = req_struct->frag_page_id; + Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no; + if(mem_insert) { - Uint32 frag_page_id = req_struct->frag_page_id; - Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no; - regOperPtr.p->m_tuple_location.m_page_no = frag_page_id; - if (likely(get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT)) + if (!regTabPtr->m_attributes[MM].m_no_of_varsize) { - if (!regTabPtr->m_attributes[MM].m_no_of_varsize) - { - jam(); - if ((ptr= alloc_fix_rec(regFragPtr, - regTabPtr, - ®OperPtr.p->m_tuple_location, - &frag_page_id)) == 0) - { - goto mem_error; - } - } - else + jam(); + if ((ptr= alloc_fix_rec(regFragPtr, + regTabPtr, + ®OperPtr.p->m_tuple_location, + &frag_page_id)) == 0) { - jam(); - regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM]; - if ((ptr= alloc_var_rec(regFragPtr, regTabPtr, - sizes[2+MM], - ®OperPtr.p->m_tuple_location, - &frag_page_id, 0)) == 0) - goto mem_error; + goto mem_error; } - - real_page_id = regOperPtr.p->m_tuple_location.m_page_no; - regOperPtr.p->m_tuple_location.m_page_no= frag_page_id; - c_lqh->accminupdate(signal, - regOperPtr.p->userpointer, - ®OperPtr.p->m_tuple_location); - - ((Tuple_header*)ptr)->m_operation_ptr_i= regOperPtr.i; - ((Tuple_header*)ptr)->m_header_bits= Tuple_header::ALLOC; - } - - if (regTabPtr->m_no_of_disk_attributes) + } + else { - Local_key tmp; - Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ? - 1 : sizes[2+DD]; - - int ret= disk_page_prealloc(signal, fragPtr, &tmp, size); - ndbassert(ret >= 0); - - regOperPtr.p->op_struct.m_disk_preallocated= 1; - tmp.m_page_idx= size; - memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp)); - - /** - * Set ref from disk to mm - */ - Tuple_header* disk_ptr= req_struct->m_disk_ptr; - disk_ptr->m_header_bits = 0; - disk_ptr->m_base_record_ref= regOperPtr.p->m_tuple_location.ref(); + jam(); + regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM]; + if ((ptr= alloc_var_rec(regFragPtr, regTabPtr, + sizes[2+MM], + ®OperPtr.p->m_tuple_location, + &frag_page_id, 0)) == 0) + goto mem_error; } - regOperPtr.p->m_tuple_location.m_page_no = real_page_id; - tuple_ptr->m_header_bits |= Tuple_header::ALLOC; + real_page_id = regOperPtr.p->m_tuple_location.m_page_no; + regOperPtr.p->m_tuple_location.m_page_no= frag_page_id; + c_lqh->accminupdate(signal, + regOperPtr.p->userpointer, + ®OperPtr.p->m_tuple_location); - if (regTabPtr->checksumIndicator) { - jam(); - setChecksum(req_struct->m_tuple_ptr, regTabPtr); - } - - return 0; + ((Tuple_header*)ptr)->m_operation_ptr_i= regOperPtr.i; + ((Tuple_header*)ptr)->m_header_bits= Tuple_header::ALLOC; + regOperPtr.p->m_tuple_location.m_page_no = real_page_id; } else { - if (regTabPtr->checksumIndicator) { - jam(); - setChecksum(req_struct->m_tuple_ptr, regTabPtr); + int ret; + if (regTabPtr->need_shrink() && cmp[0] != cmp[1] && + (ret = handle_size_change_after_update(req_struct, + base, + regOperPtr.p, + regFragPtr, + regTabPtr, + sizes))) + { + return ret; } + } + + if (disk_insert) + { + Local_key tmp; + Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ? + 1 : sizes[2+DD]; - if (!regTabPtr->need_shrink() || cmp[0] == cmp[1]) - return 0; - - return handle_size_change_after_update(req_struct, - base, - regOperPtr.p, - regFragPtr, - regTabPtr, - sizes); + int ret= disk_page_prealloc(signal, fragPtr, &tmp, size); + ndbassert(ret >= 0); + + regOperPtr.p->op_struct.m_disk_preallocated= 1; + tmp.m_page_idx= size; + memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp)); + + /** + * Set ref from disk to mm + */ + Local_key ref = regOperPtr.p->m_tuple_location; + ref.m_page_no = frag_page_id; + + Tuple_header* disk_ptr= req_struct->m_disk_ptr; + disk_ptr->m_header_bits = 0; + disk_ptr->m_base_record_ref= ref.ref(); } - - + + if (regTabPtr->checksumIndicator) + { + jam(); + setChecksum(req_struct->m_tuple_ptr, regTabPtr); + } + return 0; mem_error: terrorCode= ZMEM_NOMEM_ERROR; |