From 08ea984e2729059e3d3e9667d19fa638c653e4c8 Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 5 Dec 2005 15:20:15 +0100 Subject: ndb - wl#2972 part 1: merge events per event op, gci storage/ndb/include/ndbapi/NdbDictionary.hpp: wl#2972 part 1: merge events per event op, gci storage/ndb/include/ndbapi/NdbEventOperation.hpp: wl#2972 part 1: merge events per event op, gci storage/ndb/ndbapi-examples/ndbapi_event/Makefile: wl#2972 part 1: merge events per event op, gci storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp: wl#2972 part 1: merge events per event op, gci storage/ndb/src/ndbapi/NdbEventOperation.cpp: wl#2972 part 1: merge events per event op, gci storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp: wl#2972 part 1: merge events per event op, gci storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp: wl#2972 part 1: merge events per event op, gci storage/ndb/test/ndbapi/test_event.cpp: wl#2972 part 1: merge events per event op, gci --- storage/ndb/include/ndbapi/NdbDictionary.hpp | 3 +- storage/ndb/include/ndbapi/NdbEventOperation.hpp | 6 + storage/ndb/ndbapi-examples/ndbapi_event/Makefile | 4 +- .../ndbapi-examples/ndbapi_event/ndbapi_event.cpp | 86 ++-- storage/ndb/src/ndbapi/NdbEventOperation.cpp | 5 + storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp | 561 +++++++++++++++++---- storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp | 52 +- storage/ndb/test/ndbapi/test_event.cpp | 4 + 8 files changed, 585 insertions(+), 136 deletions(-) (limited to 'storage') diff --git a/storage/ndb/include/ndbapi/NdbDictionary.hpp b/storage/ndb/include/ndbapi/NdbDictionary.hpp index 954dcc62a22..96b2cc09c6a 100644 --- a/storage/ndb/include/ndbapi/NdbDictionary.hpp +++ b/storage/ndb/include/ndbapi/NdbDictionary.hpp @@ -1027,7 +1027,8 @@ public: _TE_CREATE=6, _TE_GCP_COMPLETE=7, _TE_CLUSTER_FAILURE=8, - _TE_STOP=9 + _TE_STOP=9, + _TE_NUL=10 // internal (INS o DEL within same GCI) }; #endif /** diff --git a/storage/ndb/include/ndbapi/NdbEventOperation.hpp b/storage/ndb/include/ndbapi/NdbEventOperation.hpp index 6fe3573eb6a..0572147410b 100644 --- a/storage/ndb/include/ndbapi/NdbEventOperation.hpp +++ b/storage/ndb/include/ndbapi/NdbEventOperation.hpp @@ -93,6 +93,12 @@ public: * Retrieve current state of the NdbEventOperation object */ State getState(); + /** + * By default events on same NdbEventOperation within same GCI + * are merged into a single event. This can be changed with + * separateEvents(true). + */ + void separateEvents(bool flag); /** * Activates the NdbEventOperation to start receiving events. The diff --git a/storage/ndb/ndbapi-examples/ndbapi_event/Makefile b/storage/ndb/ndbapi-examples/ndbapi_event/Makefile index d8f7a03aac6..54c4c903a56 100644 --- a/storage/ndb/ndbapi-examples/ndbapi_event/Makefile +++ b/storage/ndb/ndbapi-examples/ndbapi_event/Makefile @@ -1,7 +1,7 @@ TARGET = ndbapi_event SRCS = ndbapi_event.cpp OBJS = ndbapi_event.o -CXX = g++ +CXX = g++ -g CFLAGS = -c -Wall -fno-rtti -fno-exceptions CXXFLAGS = DEBUG = @@ -17,7 +17,7 @@ $(TARGET): $(OBJS) $(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET) $(TARGET).o: $(SRCS) - $(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi $(SRCS) + $(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi -I$(TOP_SRCDIR)/include $(SRCS) clean: rm -f *.o $(TARGET) diff --git a/storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp b/storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp index 6c1ba0c067d..40878599d8f 100644 --- a/storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp +++ b/storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp @@ -58,24 +58,29 @@ /** * - * Assume that there is a table TAB0 which is being updated by + * Assume that there is a table t0 which is being updated by * another process (e.g. flexBench -l 0 -stdtables). - * We want to monitor what happens with columns COL0, COL2, COL11 + * We want to monitor what happens with columns c0,c1,c2,c3. * * or together with the mysql client; * * shell> mysql -u root * mysql> create database TEST_DB; * mysql> use TEST_DB; - * mysql> create table TAB0 (COL0 int primary key, COL1 int, COL11 int) engine=ndb; + * mysql> create table t0 (c0 int, c1 int, c2 char(4), c3 char(4), + * primary key(c0, c2)) engine ndb charset latin1; * * In another window start ndbapi_event, wait until properly started * - insert into TAB0 values (1,2,3); - insert into TAB0 values (2,2,3); - insert into TAB0 values (3,2,9); - update TAB0 set COL1=10 where COL0=1; - delete from TAB0 where COL0=1; + insert into t0 values (1, 2, 'a', 'b'); + insert into t0 values (3, 4, 'c', 'd'); + update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk + update t0 set c3 = 'f'; -- use scan + update t0 set c3 = 'F'; -- use scan update to 'same' + update t0 set c2 = 'g' where c0 = 1; -- update pk part + update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same' + update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK + delete from t0; * * you should see the data popping up in the example window * @@ -92,9 +97,10 @@ int myCreateEvent(Ndb* myNdb, const char **eventColumnName, const int noEventColumnName); -int main() +int main(int argc, char** argv) { ndb_init(); + bool sep = argc > 1 && strcmp(argv[1], "-s") == 0; Ndb_cluster_connection *cluster_connection= new Ndb_cluster_connection(); // Object representing the cluster @@ -126,13 +132,15 @@ int main() if (myNdb->init() == -1) APIERROR(myNdb->getNdbError()); - const char *eventName= "CHNG_IN_TAB0"; - const char *eventTableName= "TAB0"; - const int noEventColumnName= 3; + const char *eventName= "CHNG_IN_t0"; + const char *eventTableName= "t0"; + const int noEventColumnName= 4; const char *eventColumnName[noEventColumnName]= - {"COL0", - "COL1", - "COL11"}; + {"c0", + "c1", + "c2", + "c3" + }; // Create events myCreateEvent(myNdb, @@ -142,13 +150,14 @@ int main() noEventColumnName); int j= 0; - while (j < 5) { + while (j < 99) { // Start "transaction" for handling events NdbEventOperation* op; printf("create EventOperation\n"); if ((op = myNdb->createEventOperation(eventName)) == NULL) APIERROR(myNdb->getNdbError()); + op->separateEvents(sep); printf("get values\n"); NdbRecAttr* recAttr[noEventColumnName]; @@ -175,34 +184,45 @@ int main() i++; switch (op->getEventType()) { case NdbDictionary::Event::TE_INSERT: - printf("%u INSERT: ", i); + printf("%u INSERT", i); break; case NdbDictionary::Event::TE_DELETE: - printf("%u DELETE: ", i); + printf("%u DELETE", i); break; case NdbDictionary::Event::TE_UPDATE: - printf("%u UPDATE: ", i); + printf("%u UPDATE", i); break; default: abort(); // should not happen } - for (int i = 1; i < noEventColumnName; i++) { + printf(" gci=%d\n", op->getGCI()); + printf("post: "); + for (int i = 0; i < noEventColumnName; i++) { if (recAttr[i]->isNULL() >= 0) { // we have a value - printf(" post[%u]=", i); - if (recAttr[i]->isNULL() == 0) // we have a non-null value - printf("%u", recAttr[i]->u_32_value()); - else // we have a null value - printf("NULL"); - } + if (recAttr[i]->isNULL() == 0) { // we have a non-null value + if (i < 2) + printf("%-5u", recAttr[i]->u_32_value()); + else + printf("%-5.4s", recAttr[i]->aRef()); + } else // we have a null value + printf("%-5s", "NULL"); + } else + printf("%-5s", "-"); + } + printf("\npre : "); + for (int i = 0; i < noEventColumnName; i++) { if (recAttrPre[i]->isNULL() >= 0) { // we have a value - printf(" pre[%u]=", i); - if (recAttrPre[i]->isNULL() == 0) // we have a non-null value - printf("%u", recAttrPre[i]->u_32_value()); - else // we have a null value - printf("NULL"); - } + if (recAttrPre[i]->isNULL() == 0) { // we have a non-null value + if (i < 2) + printf("%-5u", recAttrPre[i]->u_32_value()); + else + printf("%-5.4s", recAttrPre[i]->aRef()); + } else // we have a null value + printf("%-5s", "NULL"); + } else + printf("%-5s", "-"); } - printf("\n"); + printf("\n"); } } else ;//printf("timed out\n"); diff --git a/storage/ndb/src/ndbapi/NdbEventOperation.cpp b/storage/ndb/src/ndbapi/NdbEventOperation.cpp index 14d6fe69b35..dd317f2911b 100644 --- a/storage/ndb/src/ndbapi/NdbEventOperation.cpp +++ b/storage/ndb/src/ndbapi/NdbEventOperation.cpp @@ -38,6 +38,11 @@ NdbEventOperation::State NdbEventOperation::getState() return m_impl.getState(); } +void NdbEventOperation::separateEvents(bool flag) +{ + m_impl.m_separateEvents = flag; +} + NdbRecAttr * NdbEventOperation::getValue(const char *colName, char *aValue) { diff --git a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp index 1fa0c6386be..953130be333 100644 --- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp +++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp @@ -104,6 +104,8 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, m_state= EO_CREATED; + m_separateEvents = false; + m_has_error= 0; DBUG_PRINT("exit",("this: 0x%x oid: %u", this, m_oid)); @@ -693,6 +695,21 @@ NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI) return ret; } +#ifdef VM_TRACE +static void +print_std(const char* tag, const SubTableData * sdata, LinearSectionPtr ptr[3]) +{ + printf("%s\n", tag); + printf("addr=%p gci=%d op=%d\n", (void*)sdata, sdata->gci, sdata->operation); + for (int i = 0; i <= 2; i++) { + printf("sec=%d addr=%p sz=%d\n", i, (void*)ptr[i].p, ptr[i].sz); + for (int j = 0; j < ptr[i].sz; j++) + printf("%08x ", ptr[i].p[j]); + printf("\n"); + } +} +#endif + NdbEventOperation * NdbEventBuffer::nextEvent() { @@ -734,6 +751,10 @@ NdbEventBuffer::nextEvent() op->m_data_done_count++; #endif + // NUL event is not returned + if (data->sdata->operation == NdbDictionary::Event::_TE_NUL) + continue; + int r= op->receive_event(); if (r > 0) { @@ -1099,13 +1120,15 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, DBUG_ENTER("NdbEventBuffer::insertDataL"); Uint64 gci= sdata->gci; - EventBufData *data= m_free_data; if ( likely((Uint32)op->mi_type & 1 << (Uint32)sdata->operation) ) { Gci_container* bucket= find_bucket(&m_active_gci, gci); DBUG_PRINT("info", ("data insertion in eventId %d", op->m_eventId)); + DBUG_PRINT("info", ("gci=%d tab=%d op=%d node=%d", + sdata->gci, sdata->tableId, sdata->operation, + sdata->req_nodeid)); if (unlikely(bucket == 0)) { @@ -1116,61 +1139,65 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, DBUG_RETURN(0); } - if (unlikely(data == 0)) + bool use_hash = + ! op->m_separateEvents && + sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT; + + // find position in bucket hash table + EventBufData* data = 0; + EventBufData_hash::Pos hpos; + if (use_hash) { -#ifdef VM_TRACE - assert(m_free_data_count == 0); - assert(m_free_data_sz == 0); -#endif - expand(4000); - reportStatus(); + bucket->m_data_hash.search(hpos, op, ptr); + data = hpos.data; + } - data= m_free_data; + if (data == 0) + { + // allocate new result buffer + data = alloc_data(); if (unlikely(data == 0)) { -#ifdef VM_TRACE - printf("m_latest_command: %s\n", m_latest_command); - printf("no free data, m_latestGCI %lld\n", - m_latestGCI); - printf("m_free_data_count %d\n", m_free_data_count); - printf("m_available_data_count %d first gci %d last gci %d\n", - m_available_data.m_count, - m_available_data.m_head ? m_available_data.m_head->sdata->gci : 0, - m_available_data.m_tail ? m_available_data.m_tail->sdata->gci : 0); - printf("m_used_data_count %d\n", m_used_data.m_count); -#endif - op->m_has_error= 2; - DBUG_RETURN(-1); // TODO handle this, overrun, or, skip? + op->m_has_error = 2; + DBUG_RETURN(-1); } - } - // remove data from free list - m_free_data= data->m_next; + if (unlikely(copy_data(sdata, ptr, data))) + { + op->m_has_error = 3; + DBUG_RETURN(-1); + } + // add it to list and hash table + bucket->m_data.append(data); + if (use_hash) + { + bucket->m_data_hash.append(hpos, data); + } #ifdef VM_TRACE - m_free_data_count--; - assert(m_free_data_sz >= data->sz); + op->m_data_count++; #endif - m_free_data_sz-= data->sz; - - if (unlikely(copy_data_alloc(sdata, ptr, data))) + } + else { - op->m_has_error= 3; - DBUG_RETURN(-1); + // event with same op, PK found, merge into old buffer + if (unlikely(merge_data(sdata, ptr, data))) + { + op->m_has_error = 3; + DBUG_RETURN(-1); + } + } + data->m_event_op = op; + if (use_hash) + { + data->m_pkhash = hpos.pkhash; } - - // add it to received data - bucket->m_data.append(data); - - data->m_event_op= op; -#ifdef VM_TRACE - op->m_data_count++; -#endif DBUG_RETURN(0); } #ifdef VM_TRACE if ((Uint32)op->m_eventImpl->mi_type & 1 << (Uint32)sdata->operation) { + // XXX never reached DBUG_PRINT("info",("Data arrived before ready eventId", op->m_eventId)); DBUG_RETURN(0); } @@ -1183,80 +1210,324 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, #endif } -int -NdbEventBuffer::copy_data_alloc(const SubTableData * const f_sdata, - LinearSectionPtr f_ptr[3], - EventBufData *ev_buf) +// allocate EventBufData +EventBufData* +NdbEventBuffer::alloc_data() { - DBUG_ENTER("NdbEventBuffer::copy_data_alloc"); - const unsigned min_alloc_size= 128; - const unsigned sz4= (sizeof(SubTableData)+3)>>2; - Uint32 f_ptr_sz_0= f_ptr[0].sz; - Uint32 f_ptr_sz_1= f_ptr[1].sz; - Uint32 f_ptr_sz_2= f_ptr[2].sz; - LinearSectionPtr *t_ptr= ev_buf->ptr; - SubTableData *sdata= ev_buf->sdata; - const unsigned alloc_size= (sz4 + - f_ptr_sz_0 + - f_ptr_sz_1 + - f_ptr_sz_2) * sizeof(Uint32); - Uint32 *ptr; - if (alloc_size > min_alloc_size) + DBUG_ENTER("alloc_data"); + EventBufData* data = m_free_data; + + if (unlikely(data == 0)) { - if (sdata) +#ifdef VM_TRACE + assert(m_free_data_count == 0); + assert(m_free_data_sz == 0); +#endif + expand(4000); + reportStatus(); + + data = m_free_data; + if (unlikely(data == 0)) { - NdbMem_Free((char*)sdata); #ifdef VM_TRACE - assert(m_total_alloc >= ev_buf->sz); + printf("m_latest_command: %s\n", m_latest_command); + printf("no free data, m_latestGCI %lld\n", + m_latestGCI); + printf("m_free_data_count %d\n", m_free_data_count); + printf("m_available_data_count %d first gci %d last gci %d\n", + m_available_data.m_count, + m_available_data.m_head ? m_available_data.m_head->sdata->gci : 0, + m_available_data.m_tail ? m_available_data.m_tail->sdata->gci : 0); + printf("m_used_data_count %d\n", m_used_data.m_count); #endif - m_total_alloc-= ev_buf->sz; + DBUG_RETURN(0); // TODO handle this, overrun, or, skip? } - ptr= (Uint32*)NdbMem_Allocate(alloc_size); - ev_buf->sdata= (SubTableData *)ptr; - ev_buf->sz= alloc_size; - m_total_alloc+= alloc_size; } - else /* alloc_size <= min_alloc_size */ + + // remove data from free list + m_free_data = data->m_next; + data->m_next = 0; +#ifdef VM_TRACE + m_free_data_count--; + assert(m_free_data_sz >= data->sz); +#endif + m_free_data_sz -= data->sz; + DBUG_RETURN(data); +} + +// allocate initial or bigger memory area in EventBufData +// takes sizes from given ptr and sets up data->ptr +int +NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) +{ + const Uint32 min_alloc_size = 128; + + Uint32 sz4 = (sizeof(SubTableData) + 3) >> 2; + Uint32 alloc_size = (sz4 + ptr[0].sz + ptr[1].sz + ptr[2].sz) << 2; + if (alloc_size < min_alloc_size) + alloc_size = min_alloc_size; + + if (data->sz < alloc_size) + { + NdbMem_Free((char*)data->memory); + assert(m_total_alloc >= data->sz); + m_total_alloc -= data->sz; + data->memory = 0; + data->sz = 0; + + data->memory = (Uint32*)NdbMem_Allocate(alloc_size); + if (data->memory == 0) + return -1; + data->sz = alloc_size; + m_total_alloc += data->sz; + } + + Uint32* memptr = data->memory; + memptr += sz4; + int i; + for (i = 0; i <= 2; i++) { - if (sdata) - ptr= (Uint32*)sdata; - else - { - ptr= (Uint32*)NdbMem_Allocate(min_alloc_size); - ev_buf->sdata= (SubTableData *)ptr; - ev_buf->sz= min_alloc_size; - m_total_alloc+= min_alloc_size; - } + data->ptr[i].p = memptr; + data->ptr[i].sz = ptr[i].sz; + memptr += ptr[i].sz; } - memcpy(ptr,f_sdata,sizeof(SubTableData)); - ptr+= sz4; + return 0; +} + +int +NdbEventBuffer::copy_data(const SubTableData * const sdata, + LinearSectionPtr ptr[3], + EventBufData* data) +{ + DBUG_ENTER("NdbEventBuffer::copy_data"); - t_ptr->p= ptr; - t_ptr->sz= f_ptr_sz_0; + if (alloc_mem(data, ptr) != 0) + DBUG_RETURN(-1); + memcpy(data->sdata, sdata, sizeof(SubTableData)); + int i; + for (i = 0; i <= 2; i++) + memcpy(data->ptr[i].p, ptr[i].p, ptr[i].sz << 2); + DBUG_RETURN(0); +} - memcpy(ptr, f_ptr[0].p, sizeof(Uint32)*f_ptr_sz_0); - ptr+= f_ptr_sz_0; - t_ptr++; +static struct Ev_t { + enum { + INS = NdbDictionary::Event::_TE_INSERT, + DEL = NdbDictionary::Event::_TE_DELETE, + UPD = NdbDictionary::Event::_TE_UPDATE, + NUL = NdbDictionary::Event::_TE_NUL, + ERR = 255 + }; + int t1, t2, t3; +} ev_t[] = { + { Ev_t::INS, Ev_t::INS, Ev_t::ERR }, + { Ev_t::INS, Ev_t::DEL, Ev_t::NUL }, //ok + { Ev_t::INS, Ev_t::UPD, Ev_t::INS }, //ok + { Ev_t::DEL, Ev_t::INS, Ev_t::UPD }, //ok + { Ev_t::DEL, Ev_t::DEL, Ev_t::ERR }, + { Ev_t::DEL, Ev_t::UPD, Ev_t::ERR }, + { Ev_t::UPD, Ev_t::INS, Ev_t::ERR }, + { Ev_t::UPD, Ev_t::DEL, Ev_t::DEL }, //ok + { Ev_t::UPD, Ev_t::UPD, Ev_t::UPD } //ok +}; - t_ptr->p= ptr; - t_ptr->sz= f_ptr_sz_1; +/* + * | INS | DEL | UPD + * 0 | pk ah + all ah | pk ah | pk ah + new ah + * 1 | pk ad + all ad | old pk ad | new pk ad + new ad + * 2 | empty | old non-pk ah+ad | old ah+ad + */ - memcpy(ptr, f_ptr[1].p, sizeof(Uint32)*f_ptr_sz_1); - ptr+= f_ptr_sz_1; - t_ptr++; +static AttributeHeader +copy_head(Uint32& i1, Uint32* p1, Uint32& i2, const Uint32* p2, + Uint32 flags) +{ + AttributeHeader ah(p2[i2]); + bool do_copy = (flags & 1); + if (do_copy) + p1[i1] = p2[i2]; + i1++; + i2++; + return ah; +} - if (f_ptr_sz_2) +static void +copy_attr(AttributeHeader ah, + Uint32& j1, Uint32* p1, Uint32& j2, const Uint32* p2, + Uint32 flags) +{ + bool do_copy = (flags & 1); + bool with_head = (flags & 2); + Uint32 n = with_head + ah.getDataSize(); + if (do_copy) { - t_ptr->p= ptr; - t_ptr->sz= f_ptr_sz_2; - memcpy(ptr, f_ptr[2].p, sizeof(Uint32)*f_ptr_sz_2); + Uint32 k; + for (k = 0; k < n; k++) + p1[j1++] = p2[j2++]; } else { - t_ptr->p= 0; - t_ptr->sz= 0; + j1 += n; + j2 += n; } +} + +int +NdbEventBuffer::merge_data(const SubTableData * const sdata, + LinearSectionPtr ptr2[3], + EventBufData* data) +{ + DBUG_ENTER("NdbEventBuffer::merge_data"); + + Uint32 nkey = data->m_event_op->m_eventImpl->m_tableImpl->m_noOfKeys; + + int t1 = data->sdata->operation; + int t2 = sdata->operation; + if (t1 == Ev_t::NUL) + DBUG_RETURN(copy_data(sdata, ptr2, data)); + + Ev_t* tp = 0; + int i; + for (i = 0; i < sizeof(ev_t)/sizeof(ev_t[0]); i++) { + if (ev_t[i].t1 == t1 && ev_t[i].t2 == t2) { + tp = &ev_t[i]; + break; + } + } + assert(tp != 0 && tp->t3 != Ev_t::ERR); + + // save old data + EventBufData olddata = *data; + data->memory = 0; + data->sz = 0; + + // compose ptr1 o ptr2 = ptr + LinearSectionPtr (&ptr1) [3] = olddata.ptr; + LinearSectionPtr (&ptr) [3] = data->ptr; + + // loop twice where first loop only sets sizes + int loop; + for (loop = 0; loop <= 1; loop++) + { + if (loop == 1) + { + if (alloc_mem(data, ptr) != 0) + DBUG_RETURN(-1); + *data->sdata = *sdata; + data->sdata->operation = tp->t3; + } + + ptr[0].sz = ptr[1].sz = ptr[3].sz = 0; + + // copy pk from new version + { + AttributeHeader ah; + Uint32 i = 0; + Uint32 j = 0; + Uint32 i2 = 0; + Uint32 j2 = 0; + while (i < nkey) + { + ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop); + copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop); + } + ptr[0].sz = i; + ptr[1].sz = j; + } + + // merge after values, new version overrides + if (tp->t3 != Ev_t::DEL) + { + AttributeHeader ah; + Uint32 i = ptr[0].sz; + Uint32 j = ptr[1].sz; + Uint32 i1 = 0; + Uint32 j1 = 0; + Uint32 i2 = nkey; + Uint32 j2 = ptr[1].sz; + while (i1 < nkey) + { + j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize(); + } + while (1) + { + bool b1 = (i1 < ptr1[0].sz); + bool b2 = (i2 < ptr2[0].sz); + if (b1 && b2) + { + Uint32 id1 = AttributeHeader(ptr1[0].p[i1]).getAttributeId(); + Uint32 id2 = AttributeHeader(ptr2[0].p[i2]).getAttributeId(); + if (id1 < id2) + b2 = false; + else if (id1 > id2) + b1 = false; + else + { + j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize(); + b1 = false; + } + } + if (b1) + { + ah = copy_head(i, ptr[0].p, i1, ptr1[0].p, loop); + copy_attr(ah, j, ptr[1].p, j1, ptr1[1].p, loop); + } + else if (b2) + { + ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop); + copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop); + } + else + break; + } + ptr[0].sz = i; + ptr[1].sz = j; + } + + // merge before values, old version overrides + if (tp->t3 != Ev_t::INS) + { + AttributeHeader ah; + Uint32 k = 0; + Uint32 k1 = 0; + Uint32 k2 = 0; + while (1) + { + bool b1 = (k1 < ptr1[2].sz); + bool b2 = (k2 < ptr2[2].sz); + if (b1 && b2) + { + Uint32 id1 = AttributeHeader(ptr1[2].p[k1]).getAttributeId(); + Uint32 id2 = AttributeHeader(ptr2[2].p[k2]).getAttributeId(); + if (id1 < id2) + b2 = false; + else if (id1 > id2) + b1 = false; + else + { + k2 += 1 + AttributeHeader(ptr2[2].p[k2]).getDataSize(); + b2 = false; + } + } + if (b1) + { + ah = AttributeHeader(ptr1[2].p[k1]); + copy_attr(ah, k, ptr[2].p, k1, ptr1[2].p, loop | 2); + } + else if (b2) + { + ah = AttributeHeader(ptr2[2].p[k2]); + copy_attr(ah, k, ptr[2].p, k2, ptr2[2].p, loop | 2); + } + else + break; + } + } + } + + // free old data + NdbMem_Free((char*)olddata.memory); DBUG_RETURN(0); } @@ -1399,5 +1670,107 @@ send_report: #endif } +// hash table routines + +// could optimize the all-fixed case +Uint32 +EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3]) +{ + const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl; + + // in all cases ptr[0] = pk ah.. ptr[1] = pk ad.. + // for pk update (to equivalent pk) post/pre values give same hash + Uint32 nkey = tab->m_noOfKeys; + assert(nkey != 0 && nkey <= ptr[0].sz); + const Uint32* hptr = ptr[0].p; + const uchar* dptr = (uchar*)ptr[1].p; + + // hash registers + ulong nr1 = 0; + ulong nr2 = 0; + while (nkey-- != 0) + { + AttributeHeader ah(*hptr++); + Uint32 bytesize = ah.getByteSize(); + assert(dptr + bytesize <= (uchar*)(ptr[1].p + ptr[1].sz)); + + Uint32 i = ah.getAttributeId(); + const NdbColumnImpl* col = tab->getColumn(i); + assert(col != 0); + + Uint32 lb, len; + bool ok = NdbSqlUtil::get_var_length(col->m_type, dptr, bytesize, lb, len); + assert(ok); + + CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin; + (*cs->coll->hash_sort)(cs, dptr + lb, len, &nr1, &nr2); + dptr += bytesize; + } + return nr1; +} + +// this is seldom invoked +bool +EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3]) +{ + const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl; + + Uint32 nkey = tab->m_noOfKeys; + assert(nkey != 0 && nkey <= ptr1[0].sz && nkey <= ptr2[0].sz); + const Uint32* hptr1 = ptr1[0].p; + const Uint32* hptr2 = ptr2[0].p; + const uchar* dptr1 = (uchar*)ptr1[1].p; + const uchar* dptr2 = (uchar*)ptr2[1].p; + + while (nkey-- != 0) + { + AttributeHeader ah1(*hptr1++); + AttributeHeader ah2(*hptr2++); + // sizes can differ on update of varchar endspace + Uint32 bytesize1 = ah1.getByteSize(); + Uint32 bytesize2 = ah1.getByteSize(); + assert(dptr1 + bytesize1 <= (uchar*)(ptr1[1].p + ptr1[1].sz)); + assert(dptr2 + bytesize2 <= (uchar*)(ptr2[1].p + ptr2[1].sz)); + + assert(ah1.getAttributeId() == ah2.getAttributeId()); + Uint32 i = ah1.getAttributeId(); + const NdbColumnImpl* col = tab->getColumn(i); + assert(col != 0); + + Uint32 lb1, len1; + bool ok1 = NdbSqlUtil::get_var_length(col->m_type, dptr1, bytesize1, lb1, len1); + Uint32 lb2, len2; + bool ok2 = NdbSqlUtil::get_var_length(col->m_type, dptr2, bytesize2, lb2, len2); + assert(ok1 && ok2 && lb1 == lb2); + + CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin; + int res = (cs->coll->strnncollsp)(cs, dptr1 + lb1, len1, dptr2 + lb2, len2, false); + if (res != 0) + return false; + dptr1 += bytesize1; + dptr2 += bytesize2; + } + return true; +} + +void +EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3]) +{ + Uint32 pkhash = getpkhash(op, ptr); + Uint32 index = (op->m_oid ^ pkhash) % GCI_EVENT_HASH_SIZE; + EventBufData* data = m_hash[index]; + while (data != 0) + { + if (data->m_event_op == op && + data->m_pkhash == pkhash && + getpkequal(op, data->ptr, ptr)) + break; + data = data->m_next_hash; + } + hpos.index = index; + hpos.data = data; + hpos.pkhash = pkhash; +} + template class Vector; template class Vector; diff --git a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp index 542a4a594a5..a04b5fecb34 100644 --- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp +++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp @@ -25,16 +25,19 @@ #define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4 class NdbEventOperationImpl; + struct EventBufData { union { SubTableData *sdata; - char *memory; + Uint32 *memory; }; LinearSectionPtr ptr[3]; unsigned sz; NdbEventOperationImpl *m_event_op; EventBufData *m_next; // Next wrt to global order + EventBufData *m_next_hash; // Next in per-GCI hash + Uint32 m_pkhash; // PK hash (without op) for fast compare }; class EventBufData_list @@ -116,6 +119,34 @@ void EventBufData_list::append(const EventBufData_list &list) m_sz+= list.m_sz; } +// GCI bucket has also a hash over data, with key event op, table PK. +// It can only be appended to and is invalid after remove_first(). +class EventBufData_hash +{ +public: + struct Pos { // search result + Uint32 index; // index into hash array + EventBufData* data; // non-zero if found + Uint32 pkhash; // PK hash + }; + + static Uint32 getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3]); + static bool getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3]); + + void search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3]); + void append(Pos& hpos, EventBufData* data); + + enum { GCI_EVENT_HASH_SIZE = 101 }; + EventBufData* m_hash[GCI_EVENT_HASH_SIZE]; +}; + +inline +void EventBufData_hash::append(Pos& hpos, EventBufData* data) +{ + data->m_next_hash = m_hash[hpos.index]; + m_hash[hpos.index] = data; +} + struct Gci_container { enum State @@ -127,6 +158,7 @@ struct Gci_container Uint32 m_gcp_complete_rep_count; // Remaining SUB_GCP_COMPLETE_REP until done Uint64 m_gci; // GCI EventBufData_list m_data; + EventBufData_hash m_data_hash; }; class NdbEventOperationImpl : public NdbEventOperation { @@ -173,6 +205,8 @@ public: */ Uint32 m_eventId; Uint32 m_oid; + + bool m_separateEvents; EventBufData *m_data_item; @@ -212,7 +246,6 @@ public: void add_op(); void remove_op(); void init_gci_containers(); - Uint32 m_active_op_count; // accessed from the "receive thread" int insertDataL(NdbEventOperationImpl *op, @@ -233,10 +266,15 @@ public: NdbEventOperationImpl *move_data(); - // used by both user thread and receive thread - int copy_data_alloc(const SubTableData * const f_sdata, - LinearSectionPtr f_ptr[3], - EventBufData *ev_buf); + // routines to copy/merge events + EventBufData* alloc_data(); + int alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]); + int copy_data(const SubTableData * const sdata, + LinearSectionPtr ptr[3], + EventBufData* data); + int merge_data(const SubTableData * const sdata, + LinearSectionPtr ptr[3], + EventBufData* data); void free_list(EventBufData_list &list); @@ -290,6 +328,8 @@ private: // dropped event operations that have not yet // been deleted NdbEventOperationImpl *m_dropped_ev_op; + + Uint32 m_active_op_count; }; inline diff --git a/storage/ndb/test/ndbapi/test_event.cpp b/storage/ndb/test/ndbapi/test_event.cpp index 1bb614c1c8b..f4b64c61f03 100644 --- a/storage/ndb/test/ndbapi/test_event.cpp +++ b/storage/ndb/test/ndbapi/test_event.cpp @@ -169,6 +169,7 @@ eventOperation(Ndb* pNdb, const NdbDictionary::Table &tab, void* pstats, int rec g_err << function << "Event operation creation failed\n"; return NDBT_FAILED; } + pOp->separateEvents(true); g_info << function << "get values\n"; NdbRecAttr* recAttr[1024]; @@ -370,6 +371,7 @@ int runCreateDropEventOperation(NDBT_Context* ctx, NDBT_Step* step) g_err << "Event operation creation failed\n"; return NDBT_FAILED; } + pOp->separateEvents(true); g_info << "dropping event operation" << endl; int res = pNdb->dropEventOperation(pOp); @@ -540,6 +542,7 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step) g_err << "Event operation creation failed on %s" << buf << endl; DBUG_RETURN(NDBT_FAILED); } + pOp->separateEvents(true); int i; int n_columns= table->getNoOfColumns(); @@ -1185,6 +1188,7 @@ static int createEventOperations(Ndb * ndb) { DBUG_RETURN(NDBT_FAILED); } + pOp->separateEvents(true); int n_columns= pTabs[i]->getNoOfColumns(); for (int j = 0; j < n_columns; j++) -- cgit v1.2.1