summaryrefslogtreecommitdiff
path: root/storage
diff options
context:
space:
mode:
authorunknown <jonas@perch.ndb.mysql.com>2005-11-07 13:28:19 +0100
committerunknown <jonas@perch.ndb.mysql.com>2005-11-07 13:28:19 +0100
commit6be5f8a340359cff3302d9e26c8620aff7a78938 (patch)
treea2e355801c606d5c737ae2f9ae567fdf2fd415a3 /storage
parentee740746aff3c5163adfff481bc4cf70c37728f5 (diff)
downloadmariadb-git-6be5f8a340359cff3302d9e26c8620aff7a78938.tar.gz
ndb - Fix problematic handling of TUP_ALLOC/DEALLOC REQ wrt varsize
same row - prepare delete, prepare insert (with different size), commit delete, commit insert would lead to assertion as prepare insert didnt call handle_size_change_after_update
Diffstat (limited to 'storage')
-rw-r--r--storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp1
-rw-r--r--storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp243
2 files changed, 147 insertions, 97 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
index ebb3cc98de0..8a60c26b4ef 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
@@ -2103,6 +2103,7 @@ private:
}
void prepare_initial_insert(KeyReqStruct*, Operationrec*, Tablerec*);
+ void fix_disk_insert_no_mem_insert(KeyReqStruct*, Operationrec*, Tablerec*);
void setup_fixed_part(KeyReqStruct* req_struct,
Operationrec* const regOperPtr,
Tablerec* const regTabPtr);
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
index 8e1c1ee43fd..8ce42b1dcf8 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
@@ -1298,12 +1298,56 @@ Dbtup::prepare_initial_insert(KeyReqStruct *req_struct,
disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0;
}
+void
+Dbtup::fix_disk_insert_no_mem_insert(KeyReqStruct *req_struct,
+ Operationrec* regOperPtr,
+ Tablerec* regTabPtr)
+{
+ regOperPtr->m_undo_buffer_space= sizeof(Dbtup::Disk_undo::Alloc);
+ req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
+
+ const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
+ const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
+ Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr);
+
+ if(cnt1)
+ {
+ // Disk part is 32-bit aligned
+ char *varptr = req_struct->m_var_data[MM].m_data_ptr;
+ ptr= ALIGN_WORD(varptr + regTabPtr->m_offsets[MM].m_max_var_offset);
+ }
+ else
+ {
+ ptr -= Tuple_header::HeaderSize;
+ }
+
+ req_struct->m_disk_ptr= (Tuple_header*)ptr;
+
+ if(cnt2)
+ {
+ KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD];
+ ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset;
+ dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1);
+ dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1);
+ dst->m_var_len_offset= cnt2;
+ dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset;
+ }
+
+ // Set all null bits
+ memset(req_struct->m_disk_ptr->m_null_bits+
+ regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
+ 4*regTabPtr->m_offsets[DD].m_null_words);
+ req_struct->m_tuple_ptr->m_header_bits =
+ (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE);
+}
+
int Dbtup::handleInsertReq(Signal* signal,
Ptr<Operationrec> regOperPtr,
Ptr<Fragrecord> fragPtr,
Tablerec* const regTabPtr,
KeyReqStruct *req_struct)
{
+ Uint32 tup_version = 1;
Fragrecord* regFragPtr = fragPtr.p;
Uint32 *dst, *ptr= 0;
Tuple_header *base= req_struct->m_tuple_ptr, *org= base;
@@ -1320,51 +1364,64 @@ int Dbtup::handleInsertReq(Signal* signal,
ndbout << "dst: " << hex << UintPtr(dst) << " - "
<< regOperPtr.p->m_copy_tuple_location << endl;
+ bool disk = regTabPtr->m_no_of_disk_attributes > 0;
+ bool mem_insert = get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT;
+ bool disk_insert = regOperPtr.p->is_first_operation() && disk;
- Uint32 tup_version;
union {
Uint32 sizes[4];
Uint64 cmp[2];
};
- if(regOperPtr.p->is_first_operation())
+
+ if(mem_insert)
{
- tup_version= 1;
+ jam();
+ ndbassert(regOperPtr.p->is_first_operation()); // disk insert
prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
- if(regTabPtr->m_no_of_disk_attributes)
- {
- int res;
- if((res= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
- regOperPtr.p->m_undo_buffer_space)))
- {
- terrorCode= res;
- regOperPtr.p->m_undo_buffer_space= 0;
- goto log_space_error;
- }
- }
}
else
{
- Operationrec* prevOp= req_struct->prevOpPtr.p;
- ndbassert(prevOp->op_struct.op_type == ZDELETE);
- tup_version= prevOp->tupVersion + 1;
+ if (!regOperPtr.p->is_first_operation())
+ {
+ Operationrec* prevOp= req_struct->prevOpPtr.p;
+ ndbassert(prevOp->op_struct.op_type == ZDELETE);
+ tup_version= prevOp->tupVersion + 1;
+
+ if(!prevOp->is_first_operation())
+ org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
+ }
- if(!prevOp->is_first_operation())
- org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
if (regTabPtr->need_expand())
- expand_tuple(req_struct, sizes, org, regTabPtr, true);
+ expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
else
memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
}
+ if (disk_insert)
+ {
+ int res;
+ if (unlikely(!mem_insert))
+ {
+ sizes[DD] = sizes[DD+2] = regTabPtr->m_offsets[DD].m_fix_header_size;
+ fix_disk_insert_no_mem_insert(req_struct, regOperPtr.p, regTabPtr);
+ }
+ if((res= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
+ regOperPtr.p->m_undo_buffer_space)))
+ {
+ terrorCode= res;
+ regOperPtr.p->m_undo_buffer_space= 0;
+ goto log_space_error;
+ }
+ }
+
regOperPtr.p->tupVersion= tup_version & ZTUP_VERSION_MASK;
tuple_ptr->set_tuple_version(tup_version);
if(updateAttributes(req_struct, &cinBuffer[0],
req_struct->attrinfo_len) == -1)
return -1;
-
+
if (checkNullAttributes(req_struct, regTabPtr) == false)
{
-
goto null_check_error;
}
@@ -1376,95 +1433,87 @@ int Dbtup::handleInsertReq(Signal* signal,
/**
* Alloc memory
*/
- if(regOperPtr.p->is_first_operation())
+ Uint32 frag_page_id = req_struct->frag_page_id;
+ Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
+ if(mem_insert)
{
- Uint32 frag_page_id = req_struct->frag_page_id;
- Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
- regOperPtr.p->m_tuple_location.m_page_no = frag_page_id;
- if (likely(get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT))
+ if (!regTabPtr->m_attributes[MM].m_no_of_varsize)
{
- if (!regTabPtr->m_attributes[MM].m_no_of_varsize)
- {
- jam();
- if ((ptr= alloc_fix_rec(regFragPtr,
- regTabPtr,
- &regOperPtr.p->m_tuple_location,
- &frag_page_id)) == 0)
- {
- goto mem_error;
- }
- }
- else
+ jam();
+ if ((ptr= alloc_fix_rec(regFragPtr,
+ regTabPtr,
+ &regOperPtr.p->m_tuple_location,
+ &frag_page_id)) == 0)
{
- jam();
- regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
- if ((ptr= alloc_var_rec(regFragPtr, regTabPtr,
- sizes[2+MM],
- &regOperPtr.p->m_tuple_location,
- &frag_page_id, 0)) == 0)
- goto mem_error;
+ goto mem_error;
}
-
- real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
- regOperPtr.p->m_tuple_location.m_page_no= frag_page_id;
- c_lqh->accminupdate(signal,
- regOperPtr.p->userpointer,
- &regOperPtr.p->m_tuple_location);
-
- ((Tuple_header*)ptr)->m_operation_ptr_i= regOperPtr.i;
- ((Tuple_header*)ptr)->m_header_bits= Tuple_header::ALLOC;
- }
-
- if (regTabPtr->m_no_of_disk_attributes)
+ }
+ else
{
- Local_key tmp;
- Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ?
- 1 : sizes[2+DD];
-
- int ret= disk_page_prealloc(signal, fragPtr, &tmp, size);
- ndbassert(ret >= 0);
-
- regOperPtr.p->op_struct.m_disk_preallocated= 1;
- tmp.m_page_idx= size;
- memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp));
-
- /**
- * Set ref from disk to mm
- */
- Tuple_header* disk_ptr= req_struct->m_disk_ptr;
- disk_ptr->m_header_bits = 0;
- disk_ptr->m_base_record_ref= regOperPtr.p->m_tuple_location.ref();
+ jam();
+ regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
+ if ((ptr= alloc_var_rec(regFragPtr, regTabPtr,
+ sizes[2+MM],
+ &regOperPtr.p->m_tuple_location,
+ &frag_page_id, 0)) == 0)
+ goto mem_error;
}
- regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
- tuple_ptr->m_header_bits |= Tuple_header::ALLOC;
+ real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
+ regOperPtr.p->m_tuple_location.m_page_no= frag_page_id;
+ c_lqh->accminupdate(signal,
+ regOperPtr.p->userpointer,
+ &regOperPtr.p->m_tuple_location);
- if (regTabPtr->checksumIndicator) {
- jam();
- setChecksum(req_struct->m_tuple_ptr, regTabPtr);
- }
-
- return 0;
+ ((Tuple_header*)ptr)->m_operation_ptr_i= regOperPtr.i;
+ ((Tuple_header*)ptr)->m_header_bits= Tuple_header::ALLOC;
+ regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
}
else
{
- if (regTabPtr->checksumIndicator) {
- jam();
- setChecksum(req_struct->m_tuple_ptr, regTabPtr);
+ int ret;
+ if (regTabPtr->need_shrink() && cmp[0] != cmp[1] &&
+ (ret = handle_size_change_after_update(req_struct,
+ base,
+ regOperPtr.p,
+ regFragPtr,
+ regTabPtr,
+ sizes)))
+ {
+ return ret;
}
+ }
+
+ if (disk_insert)
+ {
+ Local_key tmp;
+ Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ?
+ 1 : sizes[2+DD];
- if (!regTabPtr->need_shrink() || cmp[0] == cmp[1])
- return 0;
-
- return handle_size_change_after_update(req_struct,
- base,
- regOperPtr.p,
- regFragPtr,
- regTabPtr,
- sizes);
+ int ret= disk_page_prealloc(signal, fragPtr, &tmp, size);
+ ndbassert(ret >= 0);
+
+ regOperPtr.p->op_struct.m_disk_preallocated= 1;
+ tmp.m_page_idx= size;
+ memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp));
+
+ /**
+ * Set ref from disk to mm
+ */
+ Local_key ref = regOperPtr.p->m_tuple_location;
+ ref.m_page_no = frag_page_id;
+
+ Tuple_header* disk_ptr= req_struct->m_disk_ptr;
+ disk_ptr->m_header_bits = 0;
+ disk_ptr->m_base_record_ref= ref.ref();
}
-
-
+
+ if (regTabPtr->checksumIndicator)
+ {
+ jam();
+ setChecksum(req_struct->m_tuple_ptr, regTabPtr);
+ }
+ return 0;
mem_error:
terrorCode= ZMEM_NOMEM_ERROR;