From 133c2d7b4614a74093137bc3c642e31152c115b7 Mon Sep 17 00:00:00 2001 From: unknown Date: Sun, 7 May 2006 08:12:00 +0200 Subject: ndb - dbacc - remove some unused variables storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp: remove unused variables storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp: remove unused variables --- storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp | 4 ---- storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp | 6 ++---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp index 7000d9dc8c4..726761726ce 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp +++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp @@ -358,7 +358,6 @@ struct Fragmentrec { // List of lock owners and list of lock waiters to support LCP handling //----------------------------------------------------------------------------- Uint32 lockOwnersList; - Uint32 m_current_sequence_no; //----------------------------------------------------------------------------- // References to Directory Ranges (which in turn references directories, which @@ -502,9 +501,6 @@ struct Operationrec { Uint32 scanRecPtr; Uint32 transId1; Uint32 transId2; - Uint32 longPagePtr; - Uint32 longKeyPageIndex; - Uint32 m_sequence_no; State opState; Uint32 userptr; State transactionstate; diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 2d6f579302c..9779d046169 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -984,8 +984,6 @@ void Dbacc::initOpRec(Signal* signal) operationRecPtr.p->insertIsDone = ZFALSE; operationRecPtr.p->elementIsDisappeared = ZFALSE; operationRecPtr.p->insertDeleteLen = fragrecptr.p->elementLength; - operationRecPtr.p->longPagePtr = RNIL; - operationRecPtr.p->longKeyPageIndex = RNIL; operationRecPtr.p->scanRecPtr = RNIL; // bit to mark lock operation @@ -1405,7 +1403,7 @@ void Dbacc::placeSerialQueueRead(Signal* signal) { readWriteOpPtr.i = queOperPtr.p->nextSerialQue; ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec); - PSQR_LOOP: +PSQR_LOOP: jam(); if (readWriteOpPtr.p->nextSerialQue == RNIL) { jam(); @@ -1596,7 +1594,7 @@ Uint32 Dbacc::placeWriteInLockQueue(Signal* signal) void Dbacc::placeSerialQueueWrite(Signal* signal) { readWriteOpPtr = queOperPtr; - PSQW_LOOP: +PSQW_LOOP: if (readWriteOpPtr.p->nextSerialQue == RNIL) { jam(); /* --------------------------------------------------------------------------------- */ -- cgit v1.2.1 From e65c68fc327344f2a286d8fdd1c5d1cc7df2f9a5 Mon Sep 17 00:00:00 2001 From: unknown Date: Sun, 7 May 2006 09:42:15 +0200 Subject: ndb - dbacc - unused variables storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp: more unused variables storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp: more unused variables --- storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp | 4 --- storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp | 43 +++++------------------ 2 files changed, 8 insertions(+), 39 deletions(-) diff --git a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp index 726761726ce..8373004fd0c 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp +++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp @@ -477,7 +477,6 @@ struct Fragmentrec { /* OPERATIONREC */ /* --------------------------------------------------------------------------------- */ struct Operationrec { - Uint32 keydata[8]; Uint32 localdata[2]; Uint32 elementIsforward; Uint32 elementPage; @@ -487,16 +486,13 @@ struct Operationrec { Uint32 hashvaluePart; Uint32 hashValue; Uint32 insertDeleteLen; - Uint32 keyinfoPage; Uint32 nextLockOwnerOp; Uint32 nextOp; Uint32 nextParallelQue; - Uint32 nextQueOp; Uint32 nextSerialQue; Uint32 prevOp; Uint32 prevLockOwnerOp; Uint32 prevParallelQue; - Uint32 prevQueOp; Uint32 prevSerialQue; Uint32 scanRecPtr; Uint32 transId1; diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 9779d046169..3768609860b 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -974,12 +974,9 @@ void Dbacc::initOpRec(Signal* signal) operationRecPtr.p->fragptr = fragrecptr.i; operationRecPtr.p->nextParallelQue = RNIL; operationRecPtr.p->prevParallelQue = RNIL; - operationRecPtr.p->prevQueOp = RNIL; - operationRecPtr.p->nextQueOp = RNIL; operationRecPtr.p->nextSerialQue = RNIL; operationRecPtr.p->prevSerialQue = RNIL; operationRecPtr.p->elementPage = RNIL; - operationRecPtr.p->keyinfoPage = RNIL; operationRecPtr.p->lockOwner = ZFALSE; operationRecPtr.p->insertIsDone = ZFALSE; operationRecPtr.p->elementIsDisappeared = ZFALSE; @@ -990,13 +987,6 @@ void Dbacc::initOpRec(Signal* signal) operationRecPtr.p->isAccLockReq = (Treqinfo >> 31) & 0x1; // undo log is not run via ACCKEYREQ - if(ERROR_INSERTED(5900) || ERROR_INSERTED(5901)) - { - for(unsigned i = 0; i<8 && itheData[4]; i++){ - operationRecPtr.p->keydata[i] = signal->theData[i+7]; - } - } - }//Dbacc::initOpRec() /* --------------------------------------------------------------------------------- */ @@ -1640,13 +1630,6 @@ PSQW_LOOP: /* ------------------------------------------------------------------------- */ void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code) { - if (operationRecPtr.p->keyinfoPage != RNIL) { - jam(); - rpPageptr.i = operationRecPtr.p->keyinfoPage; - ptrCheckGuard(rpPageptr, cpagesize, page8); - releasePage(signal); - operationRecPtr.p->keyinfoPage = RNIL; - }//if operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT; /* ************************<< */ /* ACCKEYREF */ @@ -4040,12 +4023,9 @@ void Dbacc::insertLockOwnersList(Signal* signal, insOperPtr.p->lockOwner = ZTRUE; insOperPtr.p->prevLockOwnerOp = RNIL; tmpOperPtr.i = fragrecptr.p->lockOwnersList; - const Uint32 seq = fragrecptr.p->m_current_sequence_no; insOperPtr.p->nextLockOwnerOp = tmpOperPtr.i; - insOperPtr.p->m_sequence_no = seq; fragrecptr.p->lockOwnersList = insOperPtr.i; - fragrecptr.p->m_current_sequence_no = seq+1; if (tmpOperPtr.i == RNIL) { return; } else { @@ -6014,9 +5994,6 @@ void Dbacc::initScanOpRec(Signal* signal) operationRecPtr.p->prevParallelQue = RNIL; operationRecPtr.p->nextSerialQue = RNIL; operationRecPtr.p->prevSerialQue = RNIL; - operationRecPtr.p->prevQueOp = RNIL; - operationRecPtr.p->nextQueOp = RNIL; - operationRecPtr.p->keyinfoPage = RNIL; // Safety precaution operationRecPtr.p->transId1 = scanPtr.p->scanTrid1; operationRecPtr.p->transId2 = scanPtr.p->scanTrid2; operationRecPtr.p->lockOwner = ZFALSE; @@ -6036,7 +6013,6 @@ void Dbacc::initScanOpRec(Signal* signal) tisoLocalPtr = tisoLocalPtr + tisoIsforward; }//for arrGuard(tisoLocalPtr, 2048); - operationRecPtr.p->keydata[0] = isoPageptr.p->word32[tisoLocalPtr]; operationRecPtr.p->tupkeylen = fragrecptr.p->keyLength; operationRecPtr.p->xfrmtupkeylen = 0; // not used }//Dbacc::initScanOpRec() @@ -7410,21 +7386,18 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal) infoEvent("fid=%d, fragptr=%d, hashvaluePart=%d ", tmpOpPtr.p->fid, tmpOpPtr.p->fragptr, tmpOpPtr.p->hashvaluePart); - infoEvent("hashValue=%d, insertDeleteLen=%d, keyinfoPage=%d ", - tmpOpPtr.p->hashValue, tmpOpPtr.p->insertDeleteLen, - tmpOpPtr.p->keyinfoPage); + infoEvent("hashValue=%d, insertDeleteLen=%d", + tmpOpPtr.p->hashValue, tmpOpPtr.p->insertDeleteLen); infoEvent("nextLockOwnerOp=%d, nextOp=%d, nextParallelQue=%d ", tmpOpPtr.p->nextLockOwnerOp, tmpOpPtr.p->nextOp, tmpOpPtr.p->nextParallelQue); - infoEvent("nextQueOp=%d, nextSerialQue=%d, prevOp=%d ", - tmpOpPtr.p->nextQueOp, tmpOpPtr.p->nextSerialQue, + infoEvent("nextSerialQue=%d, prevOp=%d ", + tmpOpPtr.p->nextSerialQue, tmpOpPtr.p->prevOp); - infoEvent("prevLockOwnerOp=%d, prevParallelQue=%d, prevQueOp=%d ", - tmpOpPtr.p->prevLockOwnerOp, tmpOpPtr.p->nextParallelQue, - tmpOpPtr.p->prevQueOp); - infoEvent("prevSerialQue=%d, scanRecPtr=%d, longPagePtr=%d ", - tmpOpPtr.p->prevSerialQue, tmpOpPtr.p->scanRecPtr, - tmpOpPtr.p->longPagePtr); + infoEvent("prevLockOwnerOp=%d, prevParallelQue=%d", + tmpOpPtr.p->prevLockOwnerOp, tmpOpPtr.p->nextParallelQue); + infoEvent("prevSerialQue=%d, scanRecPtr=%d", + tmpOpPtr.p->prevSerialQue, tmpOpPtr.p->scanRecPtr); infoEvent("transactionstate=%d, elementIsDisappeared=%d, insertIsDone=%d ", tmpOpPtr.p->transactionstate, tmpOpPtr.p->elementIsDisappeared, tmpOpPtr.p->insertIsDone); -- cgit v1.2.1 From f2824f10fc0ceccbdf1a3fcacd73e275150a86ac Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 17 May 2006 13:13:07 +0200 Subject: ndb - Fix recursive mutex lock in drop index (ndbapi programs only) storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp: Fix recursive mutex lock in drop index (ndbapi programs only) storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp: Fix recursive mutex lock in drop index (ndbapi programs only) --- storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp | 60 +---------- storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp | 145 +++++++++++++-------------- 2 files changed, 75 insertions(+), 130 deletions(-) diff --git a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp index 8eb0b37120d..be3df3aca4a 100644 --- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp +++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp @@ -2958,63 +2958,6 @@ NdbDictionaryImpl::removeCachedObject(NdbTableImpl & impl) DBUG_RETURN(0); } -/***************************************************************** - * Get index info - */ -NdbIndexImpl* -NdbDictionaryImpl::getIndexImpl(const char * externalName, - const BaseString& internalName) -{ - ASSERT_NOT_MYSQLD; - Ndb_local_table_info * info = get_local_table_info(internalName); - if(info == 0){ - m_error.code = 4243; - return 0; - } - NdbTableImpl * tab = info->m_table_impl; - - if(tab->m_indexType == NdbDictionary::Object::TypeUndefined) - { - // Not an index - m_error.code = 4243; - return 0; - } - - NdbTableImpl* prim = getTable(tab->m_primaryTable.c_str()); - if(prim == 0){ - m_error.code = 4243; - return 0; - } - - return getIndexImpl(externalName, internalName, *tab, *prim); -} - -NdbIndexImpl* -NdbDictionaryImpl::getIndexImpl(const char * externalName, - const BaseString& internalName, - NdbTableImpl &tab, - NdbTableImpl &prim) -{ - DBUG_ENTER("NdbDictionaryImpl::getIndexImpl"); - DBUG_ASSERT(tab.m_indexType != NdbDictionary::Object::TypeUndefined); - /** - * Create index impl - */ - NdbIndexImpl* idx; - if(NdbDictInterface::create_index_obj_from_table(&idx, &tab, &prim) == 0){ - idx->m_table = &tab; - idx->m_externalName.assign(externalName); - idx->m_internalName.assign(internalName); - idx->m_table_id = prim.getObjectId(); - idx->m_table_version = prim.getObjectVersion(); - // TODO Assign idx to tab->m_index - // Don't do it right now since assign can't asign a table with index - // tab->m_index = idx; - DBUG_RETURN(idx); - } - DBUG_RETURN(0); -} - int NdbDictInterface::create_index_obj_from_table(NdbIndexImpl** dst, NdbTableImpl* tab, @@ -3072,6 +3015,9 @@ NdbDictInterface::create_index_obj_from_table(NdbIndexImpl** dst, tab->m_columns[i]->m_distributionKey = 0; } + idx->m_table_id = prim->getObjectId(); + idx->m_table_version = prim->getObjectVersion(); + * dst = idx; DBUG_PRINT("exit", ("m_id: %d m_version: %d", idx->m_id, idx->m_version)); DBUG_RETURN(0); diff --git a/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp b/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp index 5a7a1ebb0ab..cf30abc6c3f 100644 --- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp +++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp @@ -617,6 +617,7 @@ public: get_local_table_info(const BaseString& internalTableName); NdbIndexImpl * getIndex(const char * indexName, const char * tableName); + NdbIndexImpl * getIndex(const char * indexName, const NdbTableImpl& prim); NdbEventImpl * getEvent(const char * eventName, NdbTableImpl* = NULL); NdbEventImpl * getBlobEvent(const NdbEventImpl& ev, uint col_no); NdbEventImpl * getEventImpl(const char * internalName); @@ -958,51 +959,36 @@ NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName) DBUG_RETURN(info); // autoincrement already initialized } -class InitIndexGlobal : public GlobalCacheInitObject +class InitIndex : public GlobalCacheInitObject { public: const char *m_index_name; - NdbTableImpl &m_prim; + const NdbTableImpl &m_prim; - InitIndexGlobal(NdbDictionaryImpl *dict, - const BaseString &internal_indexname, - const char *index_name, - NdbTableImpl &prim) : - GlobalCacheInitObject(dict, internal_indexname), + InitIndex(const BaseString &internal_indexname, + const char *index_name, + const NdbTableImpl &prim) : + GlobalCacheInitObject(0, internal_indexname), m_index_name(index_name), m_prim(prim) - {} - int init(NdbTableImpl &tab) const - { - tab.m_index= m_dict->getIndexImpl(m_index_name, m_name, tab, m_prim); - if (tab.m_index == 0) - return 1; - tab.m_index->m_table= &tab; - return 0; - } -}; - -class InitIndex : public GlobalCacheInitObject -{ -public: - const char *m_index_name; - - InitIndex(NdbDictionaryImpl *dict, - const BaseString &internal_indexname, - const char *index_name) : - GlobalCacheInitObject(dict, internal_indexname), - m_index_name(index_name) - {} - int init(NdbTableImpl &tab) const - { - DBUG_ASSERT(tab.m_index == 0); - tab.m_index= m_dict->getIndexImpl(m_index_name, m_name); - if (tab.m_index) + {} + + int init(NdbTableImpl &tab) const { + DBUG_ENTER("InitIndex::init"); + DBUG_ASSERT(tab.m_indexType != NdbDictionary::Object::TypeUndefined); + /** + * Create index impl + */ + NdbIndexImpl* idx; + if(NdbDictInterface::create_index_obj_from_table(&idx, &tab, &m_prim) == 0) { - tab.m_index->m_table= &tab; - return 0; + idx->m_table = &tab; + idx->m_externalName.assign(m_index_name); + idx->m_internalName.assign(m_name); + tab.m_index = idx; + DBUG_RETURN(0); } - return 1; + DBUG_RETURN(1); } }; @@ -1019,14 +1005,14 @@ NdbDictionaryImpl::getIndexGlobal(const char * index_name, while (retry) { NdbTableImpl *tab= - fetchGlobalTableImplRef(InitIndexGlobal(this, internal_indexname, - index_name, ndbtab)); + fetchGlobalTableImplRef(InitIndex(internal_indexname, + index_name, ndbtab)); if (tab) { // tab->m_index sould be set. otherwise tab == 0 NdbIndexImpl *idx= tab->m_index; - if (idx->m_table_id != ndbtab.getObjectId() || - idx->m_table_version != ndbtab.getObjectVersion()) + if (idx->m_table_id != (unsigned)ndbtab.getObjectId() || + idx->m_table_version != (unsigned)ndbtab.getObjectVersion()) { releaseIndexGlobal(*idx, 1); retry--; @@ -1067,41 +1053,54 @@ NdbIndexImpl * NdbDictionaryImpl::getIndex(const char * index_name, const char * table_name) { - while (table_name || m_ndb.usingFullyQualifiedNames()) + if (table_name == 0) { - const BaseString internal_indexname( - (table_name) - ? - m_ndb.internalize_index_name(getTable(table_name), index_name) - : - m_ndb.internalize_table_name(index_name)); // Index is also a table - - if (internal_indexname.length()) - { - Ndb_local_table_info *info= m_localHash.get(internal_indexname.c_str()); - NdbTableImpl *tab; - if (info == 0) - { - tab= fetchGlobalTableImplRef(InitIndex(this, internal_indexname, - index_name)); - if (tab) - { - info= Ndb_local_table_info::create(tab, 0); - if (info) - m_localHash.put(internal_indexname.c_str(), info); - else - break; - } - else - break; - } - else - tab= info->m_table_impl; - return tab->m_index; - } - break; + assert(0); + m_error.code= 4243; + return 0; + } + + + NdbTableImpl* prim = getTable(table_name); + if (prim == 0) + { + m_error.code= 4243; + return 0; } + return getIndex(index_name, *prim); +} + +inline +NdbIndexImpl * +NdbDictionaryImpl::getIndex(const char* index_name, + const NdbTableImpl& prim) +{ + + const BaseString + internal_indexname(m_ndb.internalize_index_name(&prim, index_name)); + + Ndb_local_table_info *info= m_localHash.get(internal_indexname.c_str()); + NdbTableImpl *tab; + if (info == 0) + { + tab= fetchGlobalTableImplRef(InitIndex(internal_indexname, + index_name, + prim)); + if (!tab) + goto err; + + info= Ndb_local_table_info::create(tab, 0); + if (!info) + goto err; + m_localHash.put(internal_indexname.c_str(), info); + } + else + tab= info->m_table_impl; + + return tab->m_index; + +err: m_error.code= 4243; return 0; } -- cgit v1.2.1 From 0185fae546d6b5779abd5ef5ea04f2b3c479978f Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 17 May 2006 16:59:44 +0200 Subject: ndb - Add mutex surronding sessions, as ndb_mgmd now actively tries to go and "purge stale sessions" storage/ndb/include/util/SocketServer.hpp: Add mutex surronding sessions, as ndb_mgmd now actively tries to go and "purge stale sessions" storage/ndb/src/common/util/SocketServer.cpp: Add mutex surronding sessions, as ndb_mgmd now actively tries to go and "purge stale sessions" storage/ndb/src/mgmsrv/Services.cpp: Add mutex surronding sessions, as ndb_mgmd now actively tries to go and "purge stale sessions" --- storage/ndb/include/util/SocketServer.hpp | 8 +++-- storage/ndb/src/common/util/SocketServer.cpp | 44 ++++++++++++++++++++++------ storage/ndb/src/mgmsrv/Services.cpp | 2 ++ 3 files changed, 42 insertions(+), 12 deletions(-) diff --git a/storage/ndb/include/util/SocketServer.hpp b/storage/ndb/include/util/SocketServer.hpp index e766a0b99c4..c4f7e8c0ade 100644 --- a/storage/ndb/include/util/SocketServer.hpp +++ b/storage/ndb/include/util/SocketServer.hpp @@ -106,7 +106,8 @@ public: void stopSessions(bool wait = false); void foreachSession(void (*f)(Session*, void*), void *data); - + void checkSessions(); + private: struct SessionInstance { Service * m_service; @@ -117,12 +118,13 @@ private: Service * m_service; NDB_SOCKET_TYPE m_socket; }; - MutexVector m_sessions; + NdbLockable m_session_mutex; + Vector m_sessions; MutexVector m_services; unsigned m_maxSessions; void doAccept(); - void checkSessions(); + void checkSessionsImpl(); void startSession(SessionInstance &); /** diff --git a/storage/ndb/src/common/util/SocketServer.cpp b/storage/ndb/src/common/util/SocketServer.cpp index f0af925cf6d..f9d2c7463be 100644 --- a/storage/ndb/src/common/util/SocketServer.cpp +++ b/storage/ndb/src/common/util/SocketServer.cpp @@ -184,9 +184,12 @@ SocketServer::doAccept(){ SessionInstance s; s.m_service = si.m_service; s.m_session = si.m_service->newSession(childSock); - if(s.m_session != 0){ + if(s.m_session != 0) + { + m_session_mutex.lock(); m_sessions.push_back(s); startSession(m_sessions.back()); + m_session_mutex.unlock(); } continue; @@ -240,10 +243,13 @@ void SocketServer::doRun(){ while(!m_stopThread){ - checkSessions(); + m_session_mutex.lock(); + checkSessionsImpl(); if(m_sessions.size() < m_maxSessions){ + m_session_mutex.unlock(); doAccept(); } else { + m_session_mutex.unlock(); NdbSleep_MilliSleep(200); } } @@ -276,17 +282,30 @@ transfer(NDB_SOCKET_TYPE sock){ void SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data) { + m_session_mutex.lock(); for(int i = m_sessions.size() - 1; i >= 0; i--){ (*func)(m_sessions[i].m_session, data); } - checkSessions(); + m_session_mutex.unlock(); } void -SocketServer::checkSessions(){ - for(int i = m_sessions.size() - 1; i >= 0; i--){ - if(m_sessions[i].m_session->m_stopped){ - if(m_sessions[i].m_thread != 0){ +SocketServer::checkSessions() +{ + m_session_mutex.lock(); + checkSessionsImpl(); + m_session_mutex.unlock(); +} + +void +SocketServer::checkSessionsImpl() +{ + for(int i = m_sessions.size() - 1; i >= 0; i--) + { + if(m_sessions[i].m_session->m_stopped) + { + if(m_sessions[i].m_thread != 0) + { void* ret; NdbThread_WaitFor(m_sessions[i].m_thread, &ret); NdbThread_Destroy(&m_sessions[i].m_thread); @@ -301,19 +320,26 @@ SocketServer::checkSessions(){ void SocketServer::stopSessions(bool wait){ int i; + m_session_mutex.lock(); for(i = m_sessions.size() - 1; i>=0; i--) { m_sessions[i].m_session->stopSession(); m_sessions[i].m_session->m_stop = true; // to make sure } + m_session_mutex.unlock(); + for(i = m_services.size() - 1; i>=0; i--) m_services[i].m_service->stopSessions(); if(wait){ + m_session_mutex.lock(); while(m_sessions.size() > 0){ - checkSessions(); + checkSessionsImpl(); + m_session_mutex.unlock(); NdbSleep_MilliSleep(100); + m_session_mutex.lock(); } + m_session_mutex.unlock(); } } @@ -348,4 +374,4 @@ sessionThread_C(void* _sc){ } template class MutexVector; -template class MutexVector; +template class Vector; diff --git a/storage/ndb/src/mgmsrv/Services.cpp b/storage/ndb/src/mgmsrv/Services.cpp index be15484688b..d7f58d124aa 100644 --- a/storage/ndb/src/mgmsrv/Services.cpp +++ b/storage/ndb/src/mgmsrv/Services.cpp @@ -502,6 +502,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, ps.tick= tick; m_mgmsrv.get_socket_server()-> foreachSession(stop_session_if_timed_out,&ps); + m_mgmsrv.get_socket_server()->checkSessions(); error_string = ""; continue; } @@ -1559,6 +1560,7 @@ MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx, ps.free_nodes.bitXORC(NodeBitmask()); // invert connected_nodes to get free nodes m_mgmsrv.get_socket_server()->foreachSession(stop_session_if_not_connected,&ps); + m_mgmsrv.get_socket_server()->checkSessions(); m_output->println("purge stale sessions reply"); if (str.length() > 0) -- cgit v1.2.1 From 7d6ee98f1c0be068755cef4baa586b99fd98e604 Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 18 May 2006 08:50:49 +0200 Subject: ndb - fix crashing dump numbers storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp: fix crashing dump numbers --- storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp b/storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp index 3e3d926a999..8172a034985 100644 --- a/storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp +++ b/storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp @@ -141,7 +141,7 @@ public: TuxSetLogFlags = 12002, TuxMetaDataJunk = 12009, - DumpTsman = 9000, + DumpTsman = 9002, DumpLgman = 10000, DumpPgman = 11000 }; -- cgit v1.2.1 From be0ab479b822a3b9d065dc659ef59d5450f2270b Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 18 May 2006 10:17:53 +0200 Subject: ndb - bug#19293 and family introduce acc per row logical mutex to fix difficult error handling cases storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp: 1) Fix per row mutex so that only 1 op at a time is running on a row 2) Change TUP_ALLOC/DEALLOC semantic, so that a new record will be allocated if LQ = { T1(DEL) - T2(INS) } 3) Rewrite lock queus to be O(1) in all cases but a few abort cases where we scan parallell queue 4) Impl. a validate_lock_queue/dump_lock_queue test framework storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp: 1) Fix per row mutex so that only 1 op at a time is running on a row 2) Change TUP_ALLOC/DEALLOC semantic, so that a new record will be allocated if LQ = { T1(DEL) - T2(INS) } 3) Rewrite lock queus to be O(1) in all cases but a few abort cases where we scan parallell queue 4) Impl. a validate_lock_queue/dump_lock_queue test framework storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp: 1) Fix per row mutex so that only 1 op at a time is running on a row 2) Change TUP_ALLOC/DEALLOC semantic, so that a new record will be allocated if LQ = { T1(DEL) - T2(INS) } 3) Rewrite lock queus to be O(1) in all cases but a few abort cases where we scan parallell queue 4) Impl. a validate_lock_queue/dump_lock_queue test framework storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp: 1) impl. a new read key from operation record needed by acc 2) expand TRACE_OP toolkit 3) impl. ACCKEY_ORD as needed by ACC changes storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: 1) impl. a new read key from operation record needed by acc 2) expand TRACE_OP toolkit 3) impl. ACCKEY_ORD as needed by ACC changes storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp: remove unused states/methods storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp: remove extremly tricky code that handles disk_insert_but_no_mem_insert that is no long needed with current acc changes storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp: remove unused states/methods storage/ndb/test/ndbapi/testOperations.cpp: renable last 3 lock upgrade testcases since they now pass --- storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp | 133 +- storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp | 4 - storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp | 3130 +++++++++++++------- storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp | 29 +- storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp | 417 ++- storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp | 3 - .../ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp | 120 +- storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp | 1 - storage/ndb/test/ndbapi/testOperations.cpp | 4 +- 9 files changed, 2412 insertions(+), 1429 deletions(-) diff --git a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp index 8373004fd0c..b1638599dcc 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp +++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp @@ -17,14 +17,13 @@ #ifndef DBACC_H #define DBACC_H - +#ifdef VM_TRACE +#define ACC_SAFE_QUEUE +#endif #include #include -// primary key is stored in TUP -#include "../dbtup/Dbtup.hpp" - #ifdef DBACC_C // Debug Macros #define dbgWord32(ptr, ind, val) @@ -135,7 +134,10 @@ ndbout << "Ptr: " << ptr.p->word32 << " \tIndex: " << tmp_string << " \tValue: " #define ZRIGHT 2 #define ZROOTFRAGMENTSIZE 32 #define ZSCAN_LOCK_ALL 3 -#define ZSCAN_OP 5 +/** + * Check kernel_types for other operation types + */ +#define ZSCAN_OP 6 #define ZSCAN_REC_SIZE 256 #define ZSTAND_BY 2 #define ZTABLESIZE 16 @@ -477,6 +479,7 @@ struct Fragmentrec { /* OPERATIONREC */ /* --------------------------------------------------------------------------------- */ struct Operationrec { + Uint32 m_op_bits; Uint32 localdata[2]; Uint32 elementIsforward; Uint32 elementPage; @@ -485,36 +488,62 @@ struct Operationrec { Uint32 fragptr; Uint32 hashvaluePart; Uint32 hashValue; - Uint32 insertDeleteLen; Uint32 nextLockOwnerOp; Uint32 nextOp; Uint32 nextParallelQue; - Uint32 nextSerialQue; + union { + Uint32 nextSerialQue; + Uint32 m_lock_owner_ptr_i; // if nextParallelQue = RNIL, else undefined + }; Uint32 prevOp; Uint32 prevLockOwnerOp; - Uint32 prevParallelQue; - Uint32 prevSerialQue; + union { + Uint32 prevParallelQue; + Uint32 m_lo_last_parallel_op_ptr_i; + }; + union { + Uint32 prevSerialQue; + Uint32 m_lo_last_serial_op_ptr_i; + }; Uint32 scanRecPtr; Uint32 transId1; Uint32 transId2; - State opState; Uint32 userptr; - State transactionstate; Uint16 elementContainer; Uint16 tupkeylen; Uint32 xfrmtupkeylen; Uint32 userblockref; Uint32 scanBits; - Uint8 elementIsDisappeared; - Uint8 insertIsDone; - Uint8 lockMode; - Uint8 lockOwner; - Uint8 nodeType; - Uint8 operation; - Uint8 opSimple; - Uint8 dirtyRead; - Uint8 commitDeleteCheckFlag; - Uint8 isAccLockReq; + + enum OpBits { + OP_MASK = 0x0000F // 4 bits for operation type + ,OP_LOCK_MODE = 0x00010 // 0 - shared lock, 1 = exclusive lock + ,OP_ACC_LOCK_MODE = 0x00020 // Or:de lock mode of all operation + // before me + ,OP_LOCK_OWNER = 0x00040 + ,OP_RUN_QUEUE = 0x00080 // In parallell queue of lock owner + ,OP_DIRTY_READ = 0x00100 + ,OP_LOCK_REQ = 0x00200 // isAccLockReq + ,OP_COMMIT_DELETE_CHECK = 0x00400 + ,OP_INSERT_IS_DONE = 0x00800 + ,OP_ELEMENT_DISAPPEARED = 0x01000 + + ,OP_STATE_MASK = 0xF0000 + ,OP_STATE_IDLE = 0xF0000 + ,OP_STATE_WAITING = 0x00000 + ,OP_STATE_RUNNING = 0x10000 + ,OP_STATE_EXECUTED = 0x30000 + ,OP_STATE_RUNNING_ABORT = 0x20000 + + ,OP_EXECUTED_DIRTY_READ = 0x3050F + ,OP_INITIAL = ~(Uint32)0 + }; + + bool is_same_trans(const Operationrec* op) const { + return + transId1 == op->transId1 && transId2 == op->transId2; + } + }; /* p2c: size = 168 bytes */ typedef Ptr OperationrecPtr; @@ -577,7 +606,6 @@ struct ScanRec { Uint32 scanUserblockref; Uint32 scanMask; Uint8 scanLockMode; - Uint8 scanKeyinfoFlag; Uint8 scanTimer; Uint8 scanContinuebCounter; Uint8 scanReadCommittedFlag; @@ -602,7 +630,8 @@ public: virtual ~Dbacc(); // pointer to TUP instance in this thread - Dbtup* c_tup; + class Dbtup* c_tup; + class Dblqh* c_lqh; void execACCMINUPDATE(Signal* signal); @@ -640,7 +669,8 @@ private: void ACCKEY_error(Uint32 fromWhere); void commitDeleteCheck(); - + void report_dealloc(Signal* signal, const Operationrec* opPtrP); + typedef void * RootfragmentrecPtr; void initRootFragPageZero(FragmentrecPtr, Page8Ptr); void initFragAdd(Signal*, FragmentrecPtr); @@ -679,14 +709,30 @@ private: bool addfragtotab(Signal* signal, Uint32 rootIndex, Uint32 fragId); void initOpRec(Signal* signal); void sendAcckeyconf(Signal* signal); - Uint32 placeReadInLockQueue(Signal* signal); - void placeSerialQueueRead(Signal* signal); - void checkOnlyReadEntry(Signal* signal); Uint32 getNoParallelTransaction(const Operationrec*); - void moveLastParallelQueue(Signal* signal); - void moveLastParallelQueueWrite(Signal* signal); - Uint32 placeWriteInLockQueue(Signal* signal); - void placeSerialQueueWrite(Signal* signal); + +#ifdef VM_TRACE + Uint32 getNoParallelTransactionFull(const Operationrec*); +#endif +#ifdef ACC_SAFE_QUEUE + bool validate_lock_queue(OperationrecPtr opPtr); + Uint32 get_parallel_head(OperationrecPtr opPtr); + void dump_lock_queue(OperationrecPtr loPtr); +#else + bool validate_lock_queue(OperationrecPtr) { return true;} +#endif + +public: + void execACCKEY_ORD(Signal* signal, Uint32 opPtrI); + void startNext(Signal* signal, OperationrecPtr lastOp); + +private: + Uint32 placeReadInLockQueue(OperationrecPtr lockOwnerPtr); + Uint32 placeWriteInLockQueue(OperationrecPtr lockOwnerPtr); + void placeSerialQueue(OperationrecPtr lockOwner, OperationrecPtr op); + void abortSerieQueueOperation(Signal* signal, OperationrecPtr op); + void abortParallelQueueOperation(Signal* signal, OperationrecPtr op); + void expandcontainer(Signal* signal); void shrinkcontainer(Signal* signal); void nextcontainerinfoExp(Signal* signal); @@ -716,8 +762,8 @@ private: void increaselistcont(Signal* signal); void seizeLeftlist(Signal* signal); void seizeRightlist(Signal* signal); - Uint32 readTablePk(Uint32 localkey1); - void getElement(Signal* signal); + Uint32 readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec*); + Uint32 getElement(Signal* signal, OperationrecPtr& lockOwner); void getdirindex(Signal* signal); void commitdelete(Signal* signal); void deleteElement(Signal* signal); @@ -726,12 +772,17 @@ private: void releaseRightlist(Signal* signal); void checkoverfreelist(Signal* signal); void abortOperation(Signal* signal); - void accAbortReqLab(Signal* signal); void commitOperation(Signal* signal); - void copyOpInfo(Signal* signal); + void copyOpInfo(OperationrecPtr dst, OperationrecPtr src); Uint32 executeNextOperation(Signal* signal); void releaselock(Signal* signal); + void release_lockowner(Signal* signal, OperationrecPtr, bool commit); + void startNew(Signal* signal, OperationrecPtr newOwner); + void abortWaitingOperation(Signal*, OperationrecPtr); + void abortExecutedOperation(Signal*, OperationrecPtr); + void takeOutFragWaitQue(Signal* signal); + void check_lock_upgrade(Signal* signal, OperationrecPtr release_op, bool lo); void check_lock_upgrade(Signal* signal, OperationrecPtr lock_owner, OperationrecPtr release_op); void allocOverflowPage(Signal* signal); @@ -780,8 +831,8 @@ private: void senddatapagesLab(Signal* signal); void sttorrysignalLab(Signal* signal); void sendholdconfsignalLab(Signal* signal); - void accIsLockedLab(Signal* signal); - void insertExistElemLab(Signal* signal); + void accIsLockedLab(Signal* signal, OperationrecPtr lockOwnerPtr); + void insertExistElemLab(Signal* signal, OperationrecPtr lockOwnerPtr); void refaccConnectLab(Signal* signal); void releaseScanLab(Signal* signal); void ndbrestart1Lab(Signal* signal); @@ -840,8 +891,6 @@ private: Operationrec *operationrec; OperationrecPtr operationRecPtr; OperationrecPtr idrOperationRecPtr; - OperationrecPtr copyInOperPtr; - OperationrecPtr copyOperPtr; OperationrecPtr mlpqOperPtr; OperationrecPtr queOperPtr; OperationrecPtr readWriteOpPtr; @@ -885,8 +934,6 @@ private: Page8Ptr lcnPageptr; Page8Ptr lcnCopyPageptr; Page8Ptr lupPageptr; - Page8Ptr priPageptr; - Page8Ptr pwiPageptr; Page8Ptr ciPageidptr; Page8Ptr gsePageidptr; Page8Ptr isoPageptr; @@ -926,8 +973,6 @@ private: Tabrec *tabrec; TabrecPtr tabptr; Uint32 ctablesize; - Uint32 tpwiElementptr; - Uint32 tpriElementptr; Uint32 tgseElementptr; Uint32 tgseContainerptr; Uint32 trlHead; @@ -961,8 +1006,6 @@ private: Uint32 tdelForward; Uint32 tiopPageId; Uint32 tipPageId; - Uint32 tgeLocked; - Uint32 tgeResult; Uint32 tgeContainerptr; Uint32 tgeElementptr; Uint32 tgeForward; diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp index 80fe82ed657..27355299a9c 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp @@ -130,8 +130,6 @@ Dbacc::Dbacc(Block_context& ctx): &fragrecptr, &operationRecPtr, &idrOperationRecPtr, - ©InOperPtr, - ©OperPtr, &mlpqOperPtr, &queOperPtr, &readWriteOpPtr, @@ -161,8 +159,6 @@ Dbacc::Dbacc(Block_context& ctx): &lcnPageptr, &lcnCopyPageptr, &lupPageptr, - &priPageptr, - &pwiPageptr, &ciPageidptr, &gsePageidptr, &isoPageptr, diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 3768609860b..5337c7c015d 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -41,6 +41,19 @@ #define DEBUG(x) #endif +#ifdef ACC_SAFE_QUEUE +#define vlqrequire(x) do { if (unlikely(!(x))) {\ + dump_lock_queue(loPtr); \ + ndbrequire(false); } } while(0) +#else +#define vlqrequire(x) ndbrequire(x) +#endif + + +// primary key is stored in TUP +#include "../dbtup/Dbtup.hpp" +#include "../dblqh/Dblqh.hpp" + // Signal entries and statement blocks /* --------------------------------------------------------------------------------- */ @@ -208,8 +221,8 @@ void Dbacc::execSTTOR(Signal* signal) switch (tstartphase) { case 1: jam(); - c_tup = (Dbtup*)globalData.getBlock(DBTUP); - ndbrequire(c_tup != 0); + ndbrequire((c_tup = (Dbtup*)globalData.getBlock(DBTUP)) != 0); + ndbrequire((c_lqh = (Dblqh*)globalData.getBlock(DBLQH)) != 0); break; } tuserblockref = signal->theData[3]; @@ -435,9 +448,7 @@ void Dbacc::initialiseOperationRec(Signal* signal) for (operationRecPtr.i = 0; operationRecPtr.i < coprecsize; operationRecPtr.i++) { refresh_watch_dog(); ptrAss(operationRecPtr, operationrec); - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; - operationRecPtr.p->opState = FREE_OP; + operationRecPtr.p->m_op_bits = ~0; operationRecPtr.p->nextOp = operationRecPtr.i + 1; }//for operationRecPtr.i = coprecsize - 1; @@ -899,8 +910,7 @@ void Dbacc::execACCSEIZEREQ(Signal* signal) ptrGuard(operationRecPtr); operationRecPtr.p->userptr = tuserptr; operationRecPtr.p->userblockref = tuserblockref; - operationRecPtr.p->operation = ZUNDEFINED_OP; - operationRecPtr.p->transactionstate = IDLE; + operationRecPtr.p->m_op_bits = ~0; /* ******************************< */ /* ACCSEIZECONF */ /* ******************************< */ @@ -954,22 +964,19 @@ void Dbacc::initOpRec(Signal* signal) operationRecPtr.p->xfrmtupkeylen = signal->theData[4]; operationRecPtr.p->transId1 = signal->theData[5]; operationRecPtr.p->transId2 = signal->theData[6]; - operationRecPtr.p->transactionstate = ACTIVE; - operationRecPtr.p->commitDeleteCheckFlag = ZFALSE; - operationRecPtr.p->operation = Treqinfo & 0x7; - /* --------------------------------------------------------------------------------- */ - // opSimple is not used in this version. Is needed for deadlock handling later on. - /* --------------------------------------------------------------------------------- */ - // operationRecPtr.p->opSimple = (Treqinfo >> 3) & 0x1; - - operationRecPtr.p->lockMode = (Treqinfo >> 4) & 0x3; Uint32 readFlag = (((Treqinfo >> 4) & 0x3) == 0); // Only 1 if Read Uint32 dirtyFlag = (((Treqinfo >> 6) & 0x1) == 1); // Only 1 if Dirty Uint32 dirtyReadFlag = readFlag & dirtyFlag; - operationRecPtr.p->dirtyRead = dirtyReadFlag; - operationRecPtr.p->nodeType = (Treqinfo >> 7) & 0x3; + Uint32 opbits = 0; + opbits |= Treqinfo & 0x7; + opbits |= ((Treqinfo >> 4) & 0x3) ? Operationrec::OP_LOCK_MODE : 0; + opbits |= ((Treqinfo >> 4) & 0x3) ? Operationrec::OP_ACC_LOCK_MODE : 0; + opbits |= (dirtyReadFlag) ? Operationrec::OP_DIRTY_READ : 0; + opbits |= ((Treqinfo >> 31) & 0x1) ? Operationrec::OP_LOCK_REQ : 0; + + //operationRecPtr.p->nodeType = (Treqinfo >> 7) & 0x3; operationRecPtr.p->fid = fragrecptr.p->myfid; operationRecPtr.p->fragptr = fragrecptr.i; operationRecPtr.p->nextParallelQue = RNIL; @@ -977,14 +984,10 @@ void Dbacc::initOpRec(Signal* signal) operationRecPtr.p->nextSerialQue = RNIL; operationRecPtr.p->prevSerialQue = RNIL; operationRecPtr.p->elementPage = RNIL; - operationRecPtr.p->lockOwner = ZFALSE; - operationRecPtr.p->insertIsDone = ZFALSE; - operationRecPtr.p->elementIsDisappeared = ZFALSE; - operationRecPtr.p->insertDeleteLen = fragrecptr.p->elementLength; operationRecPtr.p->scanRecPtr = RNIL; + operationRecPtr.p->m_op_bits = opbits; // bit to mark lock operation - operationRecPtr.p->isAccLockReq = (Treqinfo >> 31) & 0x1; // undo log is not run via ACCKEYREQ }//Dbacc::initOpRec() @@ -995,7 +998,7 @@ void Dbacc::initOpRec(Signal* signal) void Dbacc::sendAcckeyconf(Signal* signal) { signal->theData[0] = operationRecPtr.p->userptr; - signal->theData[1] = operationRecPtr.p->operation; + signal->theData[1] = operationRecPtr.p->m_op_bits & Operationrec::OP_MASK; signal->theData[2] = operationRecPtr.p->fid; signal->theData[3] = operationRecPtr.p->localdata[0]; signal->theData[4] = operationRecPtr.p->localdata[1]; @@ -1003,7 +1006,8 @@ void Dbacc::sendAcckeyconf(Signal* signal) }//Dbacc::sendAcckeyconf() -void Dbacc::ACCKEY_error(Uint32 fromWhere) +void +Dbacc::ACCKEY_error(Uint32 fromWhere) { switch(fromWhere) { case 0: @@ -1057,7 +1061,8 @@ void Dbacc::execACCKEYREQ(Signal* signal) }//if ptrAss(operationRecPtr, operationrec); ptrAss(fragrecptr, fragmentrec); - ndbrequire(operationRecPtr.p->transactionstate == IDLE); + + ndbrequire(operationRecPtr.p->m_op_bits == Operationrec::OP_INITIAL); initOpRec(signal); // normalize key if any char attr @@ -1071,23 +1076,31 @@ void Dbacc::execACCKEYREQ(Signal* signal) /* WE REMEMBER THESE ADDRESS IF WE LATER NEED TO INSERT */ /* THE ITEM AFTER NOT FINDING THE ITEM. */ /*---------------------------------------------------------------*/ - getElement(signal); - - if (tgeResult == ZTRUE) { - switch (operationRecPtr.p->operation) { + OperationrecPtr lockOwnerPtr; + const Uint32 found = getElement(signal, lockOwnerPtr); + + Uint32 opbits = operationRecPtr.p->m_op_bits; + Uint32 op = opbits & Operationrec::OP_MASK; + if (found == ZTRUE) + { + switch (op) { case ZREAD: case ZUPDATE: case ZDELETE: case ZWRITE: case ZSCAN_OP: - if (!tgeLocked){ - if(operationRecPtr.p->operation == ZWRITE) + if (!lockOwnerPtr.p) + { + if(op == ZWRITE) { jam(); - operationRecPtr.p->operation = ZUPDATE; + opbits &= ~(Uint32)Operationrec::OP_MASK; + opbits |= (op = ZUPDATE); } + opbits |= Operationrec::OP_STATE_RUNNING; + opbits |= Operationrec::OP_RUN_QUEUE; sendAcckeyconf(signal); - if (operationRecPtr.p->dirtyRead == ZFALSE) { + if (! (opbits & Operationrec::OP_DIRTY_READ)) { /*---------------------------------------------------------------*/ // It is not a dirty read. We proceed by locking and continue with // the operation. @@ -1104,42 +1117,44 @@ void Dbacc::execACCKEYREQ(Signal* signal) dbgWord32(gePageptr, tgeElementptr, eh); gePageptr.p->word32[tgeElementptr] = eh; - insertLockOwnersList(signal , operationRecPtr); - return; + opbits |= Operationrec::OP_LOCK_OWNER; + insertLockOwnersList(signal, operationRecPtr); } else { jam(); /*---------------------------------------------------------------*/ // It is a dirty read. We do not lock anything. Set state to // IDLE since no COMMIT call will come. /*---------------------------------------------------------------*/ - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; - return; + opbits = Operationrec::OP_EXECUTED_DIRTY_READ; }//if + operationRecPtr.p->m_op_bits = opbits; + return; } else { jam(); - accIsLockedLab(signal); + accIsLockedLab(signal, lockOwnerPtr); return; }//if break; case ZINSERT: jam(); - insertExistElemLab(signal); + insertExistElemLab(signal, lockOwnerPtr); return; break; default: ndbrequire(false); break; }//switch - } else if (tgeResult == ZFALSE) { - switch (operationRecPtr.p->operation) { - case ZINSERT: + } else if (found == ZFALSE) { + switch (op){ case ZWRITE: + opbits &= ~(Uint32)Operationrec::OP_MASK; + opbits |= (op = ZINSERT); + case ZINSERT: jam(); - // If a write operation makes an insert we switch operation to ZINSERT so - // that the commit-method knows an insert has been made and updates noOfElements. - operationRecPtr.p->operation = ZINSERT; - operationRecPtr.p->insertIsDone = ZTRUE; + opbits |= Operationrec::OP_INSERT_IS_DONE; + opbits |= Operationrec::OP_STATE_RUNNING; + opbits |= Operationrec::OP_RUN_QUEUE; + operationRecPtr.p->m_op_bits = opbits; insertelementLab(signal); return; break; @@ -1157,12 +1172,284 @@ void Dbacc::execACCKEYREQ(Signal* signal) }//switch } else { jam(); - acckeyref1Lab(signal, tgeResult); + acckeyref1Lab(signal, found); return; }//if return; }//Dbacc::execACCKEYREQ() +void +Dbacc::execACCKEY_ORD(Signal* signal, Uint32 opPtrI) +{ + OperationrecPtr lastOp; + lastOp.i = opPtrI; + ptrCheckGuard(lastOp, coprecsize, operationrec); + Uint32 opbits = lastOp.p->m_op_bits; + Uint32 opstate = opbits & Operationrec::OP_STATE_MASK; + + if (likely(opbits == Operationrec::OP_EXECUTED_DIRTY_READ)) + { + jam(); + lastOp.p->m_op_bits = Operationrec::OP_INITIAL; + return; + } + else if (likely(opstate == Operationrec::OP_STATE_RUNNING)) + { + opbits |= Operationrec::OP_STATE_EXECUTED; + lastOp.p->m_op_bits = opbits; + startNext(signal, lastOp); + return; + } + else if (opstate == Operationrec::OP_STATE_RUNNING_ABORT) + { + } + else + { + } + + ndbout_c("bits: %.8x state: %.8x", opbits, opstate); + ndbrequire(false); +} + +void +Dbacc::startNext(Signal* signal, OperationrecPtr lastOp) +{ + jam(); + OperationrecPtr nextOp; + OperationrecPtr loPtr; + nextOp.i = lastOp.p->nextParallelQue; + loPtr.i = lastOp.p->m_lock_owner_ptr_i; + Uint32 opbits = lastOp.p->m_op_bits; + + if ((opbits & Operationrec::OP_STATE_MASK)!= Operationrec::OP_STATE_EXECUTED) + { + jam(); + return; + } + + Uint32 nextbits; + if (nextOp.i != RNIL) + { + jam(); + ptrCheckGuard(nextOp, coprecsize, operationrec); + nextbits = nextOp.p->m_op_bits; + goto checkop; + } + + if ((opbits & Operationrec::OP_LOCK_OWNER) == 0) + { + jam(); + ptrCheckGuard(loPtr, coprecsize, operationrec); + nextOp.i = loPtr.p->nextSerialQue; + } + else + { + jam(); + nextOp.i = loPtr.i; + loPtr = lastOp; + } + + ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + + if (nextOp.i == RNIL) + { + jam(); + return; + } + + /** + * There is an op in serie queue... + * Check if it can run + */ + ptrCheckGuard(nextOp, coprecsize, operationrec); + nextbits = nextOp.p->m_op_bits; + + bool same = nextOp.p->is_same_trans(lastOp.p); + + if (!same && ((opbits & Operationrec::OP_ACC_LOCK_MODE) || + (nextbits & Operationrec::OP_LOCK_MODE))) + { + jam(); + /** + * Not same transaction + * and either last had exclusive lock + * or next had exclusive lock + */ + return; + } + + /** + * same trans and X-lock + */ + if (same && (opbits & Operationrec::OP_ACC_LOCK_MODE)) + { + jam(); + goto upgrade; + } + + /** + * all shared lock... + */ + if ((opbits & Operationrec::OP_ACC_LOCK_MODE) == 0 && + (nextbits & Operationrec::OP_LOCK_MODE) == 0) + { + jam(); + goto upgrade; + } + + /** + * There is a shared parallell queue & and exclusive op is first in queue + */ + ndbassert((opbits & Operationrec::OP_ACC_LOCK_MODE) == 0 && + (nextbits & Operationrec::OP_LOCK_MODE)); + + /** + * We must check if there are many transactions in parallel queue... + */ + OperationrecPtr tmp; + tmp.i = loPtr.p->nextParallelQue; + while (tmp.i != RNIL) + { + ptrCheckGuard(tmp, coprecsize, operationrec); + if (!nextOp.p->is_same_trans(tmp.p)) + { + jam(); + /** + * parallel queue contained another transaction, dont let it run + */ + return; + } + } + +upgrade: + /** + * Move first op in serie queue to end of parallell queue + */ + + tmp.i = loPtr.p->nextSerialQue = nextOp.p->nextSerialQue; + loPtr.p->m_lo_last_parallel_op_ptr_i = nextOp.i; + nextOp.p->nextSerialQue = RNIL; + nextOp.p->prevSerialQue = RNIL; + nextOp.p->m_lock_owner_ptr_i = loPtr.i; + nextOp.p->prevParallelQue = lastOp.i; + lastOp.p->nextParallelQue = nextOp.i; + + if (tmp.i != RNIL) + { + jam(); + ptrCheckGuard(tmp, coprecsize, operationrec); + tmp.p->prevSerialQue = loPtr.i; + } + else + { + jam(); + loPtr.p->m_lo_last_serial_op_ptr_i = RNIL; + } + + nextbits |= Operationrec::OP_RUN_QUEUE; + + /** + * Currently no grouping of ops in serie queue + */ + ndbrequire(nextOp.p->nextParallelQue == RNIL); + +checkop: + Uint32 errCode = 0; + OperationrecPtr save = operationRecPtr; + operationRecPtr = nextOp; + + Uint32 lastop = opbits & Operationrec::OP_MASK; + Uint32 nextop = nextbits & Operationrec::OP_MASK; + + nextbits &= nextbits & ~(Uint32)Operationrec::OP_STATE_MASK; + nextbits |= Operationrec::OP_STATE_RUNNING; + + if (lastop == ZDELETE) + { + jam(); + if (nextop != ZINSERT && nextop != ZWRITE) + { + errCode = ZREAD_ERROR; + goto ref; + } + + nextbits &= ~(Uint32)Operationrec::OP_MASK; + nextbits &= ~(Uint32)Operationrec::OP_ELEMENT_DISAPPEARED; + nextbits |= (nextop = ZINSERT); + nextbits |= Operationrec::OP_INSERT_IS_DONE; + goto conf; + } + else if (nextop == ZINSERT) + { + jam(); + errCode = ZWRITE_ERROR; + goto ref; + } + else if (nextop == ZWRITE) + { + jam(); + nextbits &= ~(Uint32)Operationrec::OP_MASK; + nextbits |= (nextop = ZUPDATE); + goto conf; + } + else + { + jam(); + } + +conf: + nextOp.p->m_op_bits = nextbits; + nextOp.p->localdata[0] = lastOp.p->localdata[0]; + nextOp.p->localdata[1] = lastOp.p->localdata[1]; + + if (nextop == ZSCAN_OP && (nextbits & Operationrec::OP_LOCK_REQ) == 0) + { + jam(); + takeOutScanLockQueue(nextOp.p->scanRecPtr); + putReadyScanQueue(signal, nextOp.p->scanRecPtr); + } + else + { + jam(); + sendAcckeyconf(signal); + sendSignal(nextOp.p->userblockref, GSN_ACCKEYCONF, + signal, 6, JBA); + } + + operationRecPtr = save; + return; + +ref: + nextOp.p->m_op_bits = nextbits; + + if (nextop == ZSCAN_OP && (nextbits & Operationrec::OP_LOCK_REQ) == 0) + { + jam(); + nextOp.p->m_op_bits |= Operationrec::OP_ELEMENT_DISAPPEARED; + takeOutScanLockQueue(nextOp.p->scanRecPtr); + putReadyScanQueue(signal, nextOp.p->scanRecPtr); + } + else + { + jam(); + signal->theData[0] = nextOp.p->userptr; + signal->theData[1] = errCode; + sendSignal(nextOp.p->userblockref, GSN_ACCKEYREF, signal, + 2, JBB); + } + + operationRecPtr = save; + return; +} + + +#if 0 +void +Dbacc::execACCKEY_REP_REF(Signal* signal, Uint32 opPtrI) +{ +} +#endif + void Dbacc::xfrmKeyData(Signal* signal) { @@ -1176,23 +1463,22 @@ Dbacc::xfrmKeyData(Signal* signal) operationRecPtr.p->xfrmtupkeylen = len; } -void Dbacc::accIsLockedLab(Signal* signal) +void +Dbacc::accIsLockedLab(Signal* signal, OperationrecPtr lockOwnerPtr) { ndbrequire(csystemRestart == ZFALSE); - queOperPtr.i = ElementHeader::getOpPtrI(gePageptr.p->word32[tgeElementptr]); - ptrCheckGuard(queOperPtr, coprecsize, operationrec); - if (operationRecPtr.p->dirtyRead == ZFALSE) { + + Uint32 bits = operationRecPtr.p->m_op_bits; + validate_lock_queue(lockOwnerPtr); + + if ((bits & Operationrec::OP_DIRTY_READ) == 0){ Uint32 return_result; - if (operationRecPtr.p->lockMode == ZREADLOCK) { + if ((bits & Operationrec::OP_LOCK_MODE) == ZREADLOCK) { jam(); - priPageptr = gePageptr; - tpriElementptr = tgeElementptr; - return_result = placeReadInLockQueue(signal); + return_result = placeReadInLockQueue(lockOwnerPtr); } else { jam(); - pwiPageptr = gePageptr; - tpwiElementptr = tgeElementptr; - return_result = placeWriteInLockQueue(signal); + return_result = placeWriteInLockQueue(lockOwnerPtr); }//if if (return_result == ZPARALLEL_QUEUE) { jam(); @@ -1202,24 +1488,29 @@ void Dbacc::accIsLockedLab(Signal* signal) jam(); signal->theData[0] = RNIL; return; - } else if (return_result == ZWRITE_ERROR) { + } else { jam(); acckeyref1Lab(signal, return_result); return; }//if ndbrequire(false); - } else { - if (queOperPtr.p->elementIsDisappeared == ZFALSE) { + } + else + { + if (!(lockOwnerPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED) && + lockOwnerPtr.p->localdata[0] != ~(Uint32)0) + { jam(); - /*---------------------------------------------------------------*/ - // It is a dirty read. We do not lock anything. Set state to - // IDLE since no COMMIT call will arrive. - /*---------------------------------------------------------------*/ + /* --------------------------------------------------------------- + * It is a dirty read. We do not lock anything. Set state to + *IDLE since no COMMIT call will arrive. + * ---------------------------------------------------------------*/ sendAcckeyconf(signal); - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; + operationRecPtr.p->m_op_bits = Operationrec::OP_EXECUTED_DIRTY_READ; return; - } else { + } + else + { jam(); /*---------------------------------------------------------------*/ // The tuple does not exist in the committed world currently. @@ -1231,17 +1522,18 @@ void Dbacc::accIsLockedLab(Signal* signal) }//if }//Dbacc::accIsLockedLab() -/* --------------------------------------------------------------------------------- */ -/* I N S E R T E X I S T E L E M E N T */ -/* --------------------------------------------------------------------------------- */ -void Dbacc::insertExistElemLab(Signal* signal) +/* ------------------------------------------------------------------------ */ +/* I N S E R T E X I S T E L E M E N T */ +/* ------------------------------------------------------------------------ */ +void Dbacc::insertExistElemLab(Signal* signal, OperationrecPtr lockOwnerPtr) { - if (!tgeLocked){ + if (!lockOwnerPtr.p) + { jam(); acckeyref1Lab(signal, ZWRITE_ERROR);/* THE ELEMENT ALREADY EXIST */ return; }//if - accIsLockedLab(signal); + accIsLockedLab(signal, lockOwnerPtr); }//Dbacc::insertExistElemLab() /* --------------------------------------------------------------------------------- */ @@ -1259,26 +1551,8 @@ void Dbacc::insertelementLab(Signal* signal) }//if }//if ndbrequire(operationRecPtr.p->tupkeylen <= fragrecptr.p->keyLength); - - Uint32 localKey; - if(!operationRecPtr.p->isAccLockReq) - { - signal->theData[0] = operationRecPtr.p->userptr; - Uint32 blockNo = refToBlock(operationRecPtr.p->userblockref); - EXECUTE_DIRECT(blockNo, GSN_LQH_ALLOCREQ, signal, 1); - jamEntry(); - if (signal->theData[0] != 0) { - jam(); - Uint32 result_code = signal->theData[0]; - acckeyref1Lab(signal, result_code); - return; - }//if - localKey = (signal->theData[1] << MAX_TUPLES_BITS) + signal->theData[2]; - } - else - { - localKey = signal->theData[7]; - } + ndbassert(!(operationRecPtr.p->m_op_bits & Operationrec::OP_LOCK_REQ)); + Uint32 localKey = ~(Uint32)0; insertLockOwnersList(signal, operationRecPtr); @@ -1293,344 +1567,672 @@ void Dbacc::insertelementLab(Signal* signal) idrOperationRecPtr = operationRecPtr; clocalkey[0] = localKey; operationRecPtr.p->localdata[0] = localKey; - /* --------------------------------------------------------------------------------- */ - /* WE SET THE LOCAL KEY TO MINUS ONE TO INDICATE IT IS NOT YET VALID. */ - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------------- */ + /* WE SET THE LOCAL KEY TO MINUS ONE TO INDICATE IT IS NOT YET VALID. */ + /* ----------------------------------------------------------------------- */ insertElement(signal); sendAcckeyconf(signal); return; }//Dbacc::insertelementLab() -/* --------------------------------------------------------------------------------- */ -/* PLACE_READ_IN_LOCK_QUEUE */ -/* INPUT: OPERATION_REC_PTR OUR OPERATION POINTER */ -/* QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER */ -/* PRI_PAGEPTR PAGE POINTER OF ELEMENT */ -/* TPRI_ELEMENTPTR ELEMENT POINTER OF ELEMENT */ -/* OUTPUT TRESULT = */ -/* ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE */ -/* OPERATION CAN PROCEED NOW. */ -/* ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE */ -/* ERROR CODE OPERATION NEEDS ABORTING */ -/* THE ELEMENT WAS LOCKED AND WE WANT TO READ THE TUPLE. WE WILL CHECK THE LOCK */ -/* QUEUES TO PERFORM THE PROPER ACTION. */ -/* */ -/* IN SOME PLACES IN THE CODE BELOW THAT HANDLES WHAT TO DO WHEN THE TUPLE IS LOCKED */ -/* WE DO ASSUME THAT NEXT_PARALLEL_QUEUE AND NEXT_SERIAL_QUEUE ON OPERATION_REC_PTR */ -/* HAVE BEEN INITIALISED TO RNIL. THUS WE DO NOT PERFORM THIS ONCE MORE EVEN IF IT */ -/* COULD BE NICE FOR READABILITY. */ -/* --------------------------------------------------------------------------------- */ -Uint32 Dbacc::placeReadInLockQueue(Signal* signal) + +/* ------------------------------------------------------------------------ */ +/* GET_NO_PARALLEL_TRANSACTION */ +/* ------------------------------------------------------------------------ */ +Uint32 +Dbacc::getNoParallelTransaction(const Operationrec * op) { - if (getNoParallelTransaction(queOperPtr.p) == 1) { - if ((queOperPtr.p->transId1 == operationRecPtr.p->transId1) && - (queOperPtr.p->transId2 == operationRecPtr.p->transId2)) { - /* --------------------------------------------------------------------------------- */ - /* WE ARE PERFORMING A READ OPERATION AND THIS TRANSACTION ALREADY OWNS THE LOCK */ - /* ALONE. PUT THE OPERATION LAST IN THE PARALLEL QUEUE. */ - /* --------------------------------------------------------------------------------- */ - jam(); - mlpqOperPtr = queOperPtr; - moveLastParallelQueue(signal); - operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1]; - operationRecPtr.p->prevParallelQue = mlpqOperPtr.i; - mlpqOperPtr.p->nextParallelQue = operationRecPtr.i; - switch (queOperPtr.p->lockMode) { - case ZREADLOCK: - jam(); - /*empty*/; - break; - default: - jam(); - /* --------------------------------------------------------------------------------- */ - /* IF THE TRANSACTION PREVIOUSLY SET A WRITE LOCK WE MUST ENSURE THAT ALL */ - /* OPERATIONS IN THE PARALLEL QUEUE HAVE WRITE LOCK MODE TO AVOID STRANGE BUGS.*/ - /* --------------------------------------------------------------------------------- */ - operationRecPtr.p->lockMode = queOperPtr.p->lockMode; - break; - }//switch - return ZPARALLEL_QUEUE; - }//if - }//if - if (queOperPtr.p->nextSerialQue == RNIL) { - /* --------------------------------------------------------------------------------- */ - /* WE ARE PERFORMING A READ OPERATION AND THERE IS NO SERIAL QUEUE. IF THERE IS NO */ - /* WRITE OPERATION THAT OWNS THE LOCK OR ANY WRITE OPERATION IN THE PARALLEL QUEUE */ - /* IT IS ENOUGH TO CHECK THE LOCK MODE OF THE LEADER IN THE PARALLEL QUEUE. IF IT IS */ - /* A READ LOCK THEN WE PLACE OURSELVES IN THE PARALLEL QUEUE OTHERWISE WE GO ON TO */ - /* PLACE OURSELVES IN THE SERIAL QUEUE. */ - /* --------------------------------------------------------------------------------- */ - switch (queOperPtr.p->lockMode) { - case ZREADLOCK: - jam(); - mlpqOperPtr = queOperPtr; - moveLastParallelQueue(signal); - operationRecPtr.p->prevParallelQue = mlpqOperPtr.i; - mlpqOperPtr.p->nextParallelQue = operationRecPtr.i; - operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1]; - return ZPARALLEL_QUEUE; - default: - jam(); - queOperPtr.p->nextSerialQue = operationRecPtr.i; - operationRecPtr.p->prevSerialQue = queOperPtr.i; - break; - }//switch - } else { + OperationrecPtr tmp; + + tmp.i= op->nextParallelQue; + Uint32 transId[2] = { op->transId1, op->transId2 }; + while (tmp.i != RNIL) + { jam(); - placeSerialQueueRead(signal); - }//if - return ZSERIAL_QUEUE; -}//Dbacc::placeReadInLockQueue() + ptrCheckGuard(tmp, coprecsize, operationrec); + if (tmp.p->transId1 == transId[0] && tmp.p->transId2 == transId[1]) + tmp.i = tmp.p->nextParallelQue; + else + return 2; + } + return 1; +}//Dbacc::getNoParallelTransaction() -/* --------------------------------------------------------------------------------- */ -/* WE WILL CHECK IF THIS TRANSACTION IS ALREADY PLACED AT SOME SPOT IN THE PARALLEL */ -/* SERIAL QUEUE WITHOUT ANY NEIGHBORS FROM OTHER TRANSACTION. IF SO WE WILL INSERT */ -/* IT IN THAT PARALLEL QUEUE. */ -/* --------------------------------------------------------------------------------- */ -void Dbacc::placeSerialQueueRead(Signal* signal) +#ifdef VM_TRACE +Uint32 +Dbacc::getNoParallelTransactionFull(const Operationrec * op) { - readWriteOpPtr.i = queOperPtr.p->nextSerialQue; - ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec); -PSQR_LOOP: - jam(); - if (readWriteOpPtr.p->nextSerialQue == RNIL) { - jam(); - /* --------------------------------------------------------------------------------- */ - /* THERE WAS NO PREVIOUS OPERATION IN THIS TRANSACTION WHICH WE COULD PUT IT */ - /* IN THE PARALLEL QUEUE TOGETHER WITH. */ - /* --------------------------------------------------------------------------------- */ - checkOnlyReadEntry(signal); - return; - }//if - if (getNoParallelTransaction(readWriteOpPtr.p) == 1) { - jam(); - /* --------------------------------------------------------------------------------- */ - /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */ - /* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK. */ - /* --------------------------------------------------------------------------------- */ - if ((readWriteOpPtr.p->transId1 == operationRecPtr.p->transId1) && - (readWriteOpPtr.p->transId2 == operationRecPtr.p->transId2)) { - jam(); - /* --------------------------------------------------------------------------------- */ - /* WE ARE PERFORMING A READ IN THE SAME TRANSACTION WHERE WE ALREADY */ - /* PREVIOUSLY HAVE EXECUTED AN OPERATION. INSERT-DELETE, READ-UPDATE, READ-READ, */ - /* UPDATE-UPDATE, UPDATE-DELETE, READ-DELETE, INSERT-READ, INSERT-UPDATE ARE ALLOWED */ - /* COMBINATIONS. A NEW INSERT AFTER A DELETE IS NOT ALLOWED AND SUCH AN INSERT WILL */ - /* GO TO THE SERIAL LOCK QUEUE WHICH IT WILL NOT LEAVE UNTIL A TIME-OUT AND THE */ - /* TRANSACTION IS ABORTED. READS AND UPDATES AFTER DELETES IS ALSO NOT ALLOWED. */ - /* --------------------------------------------------------------------------------- */ - mlpqOperPtr = readWriteOpPtr; - moveLastParallelQueue(signal); - readWriteOpPtr = mlpqOperPtr; - operationRecPtr.p->prevParallelQue = readWriteOpPtr.i; - readWriteOpPtr.p->nextParallelQue = operationRecPtr.i; - operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1]; - switch (readWriteOpPtr.p->lockMode) { - case ZREADLOCK: - jam(); - /*empty*/; - break; - default: - jam(); - /* --------------------------------------------------------------------------------- */ - /* IF THE TRANSACTION PREVIOUSLY SET A WRITE LOCK WE MUST ENSURE THAT ALL */ - /* OPERATIONS IN THE PARALLEL QUEUE HAVE WRITE LOCK MODE TO AVOID STRANGE BUGS.*/ - /* --------------------------------------------------------------------------------- */ - operationRecPtr.p->lockMode = readWriteOpPtr.p->lockMode; - break; - }//switch - return; - }//if - }//if - readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue; - ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec); - goto PSQR_LOOP; -}//Dbacc::placeSerialQueueRead() + ConstPtr tmp; + + tmp.p = op; + while ((tmp.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0) + { + tmp.i = tmp.p->prevParallelQue; + if (tmp.i != RNIL) + { + ptrCheckGuard(tmp, coprecsize, operationrec); + } + else + { + break; + } + } + + return getNoParallelTransaction(tmp.p); +} +#endif -/* --------------------------------------------------------------------------------- */ -/* WE WILL CHECK IF THE LAST ENTRY IN THE SERIAL QUEUE CONTAINS ONLY READ */ -/* OPERATIONS. IF SO WE WILL INSERT IT IN THAT PARALLEL QUEUE. OTHERWISE WE */ -/* WILL PLACE IT AT THE END OF THE SERIAL QUEUE. */ -/* --------------------------------------------------------------------------------- */ -void Dbacc::checkOnlyReadEntry(Signal* signal) -{ - switch (readWriteOpPtr.p->lockMode) { - case ZREADLOCK: - jam(); - /* --------------------------------------------------------------------------------- */ - /* SINCE THIS LAST QUEUE ONLY CONTAINS READ LOCKS WE CAN JOIN THE PARALLEL QUEUE AT */ - /* THE END. */ - /* --------------------------------------------------------------------------------- */ - mlpqOperPtr = readWriteOpPtr; - moveLastParallelQueue(signal); - readWriteOpPtr = mlpqOperPtr; - operationRecPtr.p->prevParallelQue = readWriteOpPtr.i; - readWriteOpPtr.p->nextParallelQue = operationRecPtr.i; - operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1]; - break; - default: - jam(); /* PUT THE OPERATION RECORD IN THE SERIAL QUEUE */ - readWriteOpPtr.p->nextSerialQue = operationRecPtr.i; - operationRecPtr.p->prevSerialQue = readWriteOpPtr.i; - break; - }//switch -}//Dbacc::checkOnlyReadEntry() +#ifdef ACC_SAFE_QUEUE -/* --------------------------------------------------------------------------------- */ -/* GET_NO_PARALLEL_TRANSACTION */ -/* --------------------------------------------------------------------------------- */ Uint32 -Dbacc::getNoParallelTransaction(const Operationrec * op) +Dbacc::get_parallel_head(OperationrecPtr opPtr) { - OperationrecPtr tmp; + while ((opPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 && + opPtr.p->prevParallelQue != RNIL) + { + opPtr.i = opPtr.p->prevParallelQue; + ptrCheckGuard(opPtr, coprecsize, operationrec); + } - tmp.i= op->nextParallelQue; - Uint32 transId[2] = { op->transId1, op->transId2 }; - while (tmp.i != RNIL) + return opPtr.i; +} + +bool +Dbacc::validate_lock_queue(OperationrecPtr opPtr) +{ + OperationrecPtr loPtr; + loPtr.i = get_parallel_head(opPtr); + ptrCheckGuard(loPtr, coprecsize, operationrec); + + while((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 && + loPtr.p->prevSerialQue != RNIL) { - jam(); - ptrCheckGuard(tmp, coprecsize, operationrec); - if (tmp.p->transId1 == transId[0] && tmp.p->transId2 == transId[1]) - tmp.i = tmp.p->nextParallelQue; + loPtr.i = loPtr.p->prevSerialQue; + ptrCheckGuard(loPtr, coprecsize, operationrec); + } + + // Now we have lock owner... + vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_RUN_QUEUE); + + // 1 Validate page pointer + { + Page8Ptr pagePtr; + pagePtr.i = loPtr.p->elementPage; + ptrCheckGuard(pagePtr, cpagesize, page8); + arrGuard(loPtr.p->elementPointer, 2048); + Uint32 eh = pagePtr.p->word32[loPtr.p->elementPointer]; + vlqrequire(ElementHeader::getLocked(eh)); + vlqrequire(ElementHeader::getOpPtrI(eh) == loPtr.i); + } + + // 2 Lock owner should always have same LOCK_MODE and ACC_LOCK_MODE + if (loPtr.p->m_op_bits & Operationrec::OP_LOCK_MODE) + { + vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE); + } + else + { + vlqrequire((loPtr.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE) == 0); + } + + // 3 Lock owner should never be waiting... + bool running = false; + { + Uint32 opstate = loPtr.p->m_op_bits & Operationrec::OP_STATE_MASK; + if (opstate == Operationrec::OP_STATE_RUNNING || + opstate == Operationrec::OP_STATE_RUNNING_ABORT) + running = true; else - return 2; + { + vlqrequire(opstate == Operationrec::OP_STATE_EXECUTED); + } } - return 1; -}//Dbacc::getNoParallelTransaction() + + // Validate parallel queue + { + bool many = false; + bool orlockmode = loPtr.p->m_op_bits & Operationrec::OP_LOCK_MODE; + OperationrecPtr lastP = loPtr; + + while (lastP.p->nextParallelQue != RNIL) + { + Uint32 prev = lastP.i; + lastP.i = lastP.p->nextParallelQue; + ptrCheckGuard(lastP, coprecsize, operationrec); + + vlqrequire(lastP.p->prevParallelQue == prev); + + Uint32 opbits = lastP.p->m_op_bits; + many |= loPtr.p->is_same_trans(lastP.p) ? 0 : 1; + orlockmode |= !!(opbits & Operationrec::OP_LOCK_MODE); + + vlqrequire(opbits & Operationrec::OP_RUN_QUEUE); + vlqrequire((opbits & Operationrec::OP_LOCK_OWNER) == 0); + + Uint32 opstate = opbits & Operationrec::OP_STATE_MASK; + if (running) + { + // If I found a running operation, + // all following should be waiting + vlqrequire(opstate == Operationrec::OP_STATE_WAITING); + } + else + { + if (opstate == Operationrec::OP_STATE_RUNNING || + opstate == Operationrec::OP_STATE_RUNNING_ABORT) + running = true; + else + vlqrequire(opstate == Operationrec::OP_STATE_EXECUTED); + } + + if (lastP.p->m_op_bits & Operationrec::OP_LOCK_MODE) + { + vlqrequire(lastP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE); + } + else + { + vlqrequire((lastP.p->m_op_bits && orlockmode) == orlockmode); + vlqrequire((lastP.p->m_op_bits & Operationrec::OP_MASK) == ZREAD || + (lastP.p->m_op_bits & Operationrec::OP_MASK) == ZSCAN_OP); + } + + if (many) + { + vlqrequire(orlockmode == 0); + } + } + + if (lastP.i != loPtr.i) + { + vlqrequire(loPtr.p->m_lo_last_parallel_op_ptr_i == lastP.i); + vlqrequire(lastP.p->m_lock_owner_ptr_i == loPtr.i); + } + else + { + vlqrequire(loPtr.p->m_lo_last_parallel_op_ptr_i == RNIL); + } + } + + // Validate serie queue + if (loPtr.p->nextSerialQue != RNIL) + { + Uint32 prev = loPtr.i; + OperationrecPtr lastS; + lastS.i = loPtr.p->nextSerialQue; + while (true) + { + ptrCheckGuard(lastS, coprecsize, operationrec); + vlqrequire(lastS.p->prevSerialQue == prev); + vlqrequire(getNoParallelTransaction(lastS.p) == 1); + vlqrequire((lastS.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0); + vlqrequire((lastS.p->m_op_bits & Operationrec::OP_RUN_QUEUE) == 0); + vlqrequire((lastS.p->m_op_bits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_WAITING); + if (lastS.p->nextSerialQue == RNIL) + break; + prev = lastS.i; + lastS.i = lastS.p->nextSerialQue; + } + + vlqrequire(loPtr.p->m_lo_last_serial_op_ptr_i == lastS.i); + } + else + { + vlqrequire(loPtr.p->m_lo_last_serial_op_ptr_i == RNIL); + } + return true; +} -void Dbacc::moveLastParallelQueue(Signal* signal) +NdbOut& +operator<<(NdbOut & out, Dbacc::OperationrecPtr ptr) { - while (mlpqOperPtr.p->nextParallelQue != RNIL) { - jam(); - mlpqOperPtr.i = mlpqOperPtr.p->nextParallelQue; - ptrCheckGuard(mlpqOperPtr, coprecsize, operationrec); - }//if -}//Dbacc::moveLastParallelQueue() + Uint32 opbits = ptr.p->m_op_bits; + out << "[ " << dec << ptr.i + << " [ " << hex << ptr.p->transId1 + << " " << hex << ptr.p->transId2 << "] " + << " bits: H'" << hex << opbits << " "; + + bool read = false; + switch(opbits & Dbacc::Operationrec::OP_MASK){ + case ZREAD: out << "READ "; read = true; break; + case ZINSERT: out << "INSERT "; break; + case ZUPDATE: out << "UPDATE "; break; + case ZDELETE: out << "DELETE "; break; + case ZWRITE: out << "WRITE "; break; + case ZSCAN_OP: out << "SCAN "; read = true; break; + default: + out << " "; + } + + if (read) + { + if (opbits & Dbacc::Operationrec::OP_LOCK_MODE) + out << "(X)"; + else + out << "(S)"; + if (opbits & Dbacc::Operationrec::OP_ACC_LOCK_MODE) + out << "(X)"; + else + out << "(S)"; + } -void Dbacc::moveLastParallelQueueWrite(Signal* signal) + if (opbits) + { + out << "(RQ)"; + } + + switch(opbits & Dbacc::Operationrec::OP_STATE_MASK){ + case Dbacc::Operationrec::OP_STATE_WAITING: + out << " WAITING "; break; + case Dbacc::Operationrec::OP_STATE_RUNNING: + out << " RUNNING "; break; + case Dbacc::Operationrec::OP_STATE_EXECUTED: + out << " EXECUTED "; break; + case Dbacc::Operationrec::OP_STATE_RUNNING_ABORT: + out << " RUNNIG_ABORT "; break; + case Dbacc::Operationrec::OP_STATE_IDLE: + out << " IDLE "; break; + default: + out << " "; + } + +/* + OP_MASK = 0x000F // 4 bits for operation type + ,OP_LOCK_MODE = 0x0010 // 0 - shared lock, 1 = exclusive lock + ,OP_ACC_LOCK_MODE = 0x0020 // Or:de lock mode of all operation + // before me + ,OP_LOCK_OWNER = 0x0040 + ,OP_DIRTY_READ = 0x0080 + ,OP_LOCK_REQ = 0x0100 // isAccLockReq + ,OP_COMMIT_DELETE_CHECK = 0x0200 + ,OP_INSERT_IS_DONE = 0x0400 + ,OP_ELEMENT_DISAPPEARED = 0x0800 + + ,OP_STATE_MASK = 0xF000 + ,OP_STATE_IDLE = 0xF000 + ,OP_STATE_WAITING = 0x0000 + ,OP_STATE_RUNNING = 0x1000 + ,OP_STATE_EXECUTED = 0x3000 + ,OP_STATE_RUNNING_ABORT = 0x2000 + }; +*/ + if (opbits & Dbacc::Operationrec::OP_LOCK_OWNER) + out << "LO "; + + if (opbits & Dbacc::Operationrec::OP_DIRTY_READ) + out << "DR "; + + if (opbits & Dbacc::Operationrec::OP_LOCK_REQ) + out << "LOCK_REQ "; + + if (opbits & Dbacc::Operationrec::OP_COMMIT_DELETE_CHECK) + out << "COMMIT_DELETE_CHECK "; + + if (opbits & Dbacc::Operationrec::OP_INSERT_IS_DONE) + out << "INSERT_IS_DONE "; + + if (opbits & Dbacc::Operationrec::OP_ELEMENT_DISAPPEARED) + out << "ELEMENT_DISAPPEARED "; + + if (opbits & Dbacc::Operationrec::OP_LOCK_OWNER) + { + out << "last_parallel: " << dec << ptr.p->m_lo_last_parallel_op_ptr_i << " "; + out << "last_serial: " << dec << ptr.p->m_lo_last_serial_op_ptr_i << " "; + } + + out << "]"; + return out; +} + +void +Dbacc::dump_lock_queue(OperationrecPtr loPtr) { - /* --------------------------------------------------------------------------------- */ - /* ENSURE THAT ALL OPERATIONS HAVE LOCK MODE SET TO WRITE SINCE WE INSERT A */ - /* WRITE LOCK INTO THE PARALLEL QUEUE. */ - /* --------------------------------------------------------------------------------- */ - while (mlpqOperPtr.p->nextParallelQue != RNIL) { - jam(); - mlpqOperPtr.p->lockMode = operationRecPtr.p->lockMode; - mlpqOperPtr.i = mlpqOperPtr.p->nextParallelQue; - ptrCheckGuard(mlpqOperPtr, coprecsize, operationrec); - }//if - mlpqOperPtr.p->lockMode = operationRecPtr.p->lockMode; -}//Dbacc::moveLastParallelQueueWrite() + if ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0) + { + while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 && + loPtr.p->prevParallelQue != RNIL) + { + loPtr.i = loPtr.p->prevParallelQue; + ptrCheckGuard(loPtr, coprecsize, operationrec); + } + + while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 && + loPtr.p->prevSerialQue != RNIL) + { + loPtr.i = loPtr.p->prevSerialQue; + ptrCheckGuard(loPtr, coprecsize, operationrec); + } -/* --------------------------------------------------------------------------------- */ -/* PLACE_WRITE_IN_LOCK_QUEUE */ -/* INPUT: OPERATION_REC_PTR OUR OPERATION POINTER */ -/* QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER */ -/* PWI_PAGEPTR PAGE POINTER OF ELEMENT */ -/* TPWI_ELEMENTPTR ELEMENT POINTER OF ELEMENT */ -/* OUTPUT TRESULT = */ -/* ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE */ -/* OPERATION CAN PROCEED NOW. */ -/* ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE */ -/* ERROR CODE OPERATION NEEDS ABORTING */ -/* --------------------------------------------------------------------------------- */ -Uint32 Dbacc::placeWriteInLockQueue(Signal* signal) + ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + } + + ndbout << "-- HEAD --" << endl; + OperationrecPtr tmp = loPtr; + while (tmp.i != RNIL) + { + ptrCheckGuard(tmp, coprecsize, operationrec); + ndbout << tmp << " "; + tmp.i = tmp.p->nextParallelQue; + + if (tmp.i == loPtr.i) + { + ndbout << " "; + break; + } + } + ndbout << endl; + + tmp.i = loPtr.p->nextSerialQue; + while (tmp.i != RNIL) + { + ptrCheckGuard(tmp, coprecsize, operationrec); + OperationrecPtr tmp2 = tmp; + + if (tmp.i == loPtr.i) + { + ndbout << "" << endl; + break; + } + + while (tmp2.i != RNIL) + { + ptrCheckGuard(tmp2, coprecsize, operationrec); + ndbout << tmp2 << " "; + tmp2.i = tmp2.p->nextParallelQue; + + if (tmp2.i == tmp.i) + { + ndbout << ""; + break; + } + } + ndbout << endl; + tmp.i = tmp.p->nextSerialQue; + } +} +#endif + +/* ------------------------------------------------------------------------- + * PLACE_WRITE_IN_LOCK_QUEUE + * INPUT: OPERATION_REC_PTR OUR OPERATION POINTER + * QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER + * PWI_PAGEPTR PAGE POINTER OF ELEMENT + * TPWI_ELEMENTPTR ELEMENT POINTER OF ELEMENT + * OUTPUT TRESULT = + * ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE + * OPERATION CAN PROCEED NOW. + * ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE + * ERROR CODE OPERATION NEEDS ABORTING + * ------------------------------------------------------------------------- */ +Uint32 +Dbacc::placeWriteInLockQueue(OperationrecPtr lockOwnerPtr) { - if (!((getNoParallelTransaction(queOperPtr.p) == 1) && - (queOperPtr.p->transId1 == operationRecPtr.p->transId1) && - (queOperPtr.p->transId2 == operationRecPtr.p->transId2))) { + OperationrecPtr lastOpPtr; + lastOpPtr.i = lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i; + Uint32 opbits = operationRecPtr.p->m_op_bits; + + if (lastOpPtr.i == RNIL) + { + lastOpPtr = lockOwnerPtr; + } + else + { + ptrCheckGuard(lastOpPtr, coprecsize, operationrec); + } + + ndbassert(get_parallel_head(lastOpPtr) == lockOwnerPtr.i); + + Uint32 lastbits = lastOpPtr.p->m_op_bits; + if (lastbits & Operationrec::OP_ACC_LOCK_MODE) + { + if(operationRecPtr.p->is_same_trans(lastOpPtr.p)) + { + goto checkop; + } + } + else + { + /** + * We dont have an exclusive lock on operation and + * + */ jam(); - placeSerialQueueWrite(signal); - return ZSERIAL_QUEUE; - }//if + + /** + * Scan parallell queue to see if we are the only one + */ + OperationrecPtr loopPtr = lockOwnerPtr; + do + { + ptrCheckGuard(loopPtr, coprecsize, operationrec); + if (!loopPtr.p->is_same_trans(operationRecPtr.p)) + { + goto serial; + } + loopPtr.i = loopPtr.p->nextParallelQue; + } while (loopPtr.i != RNIL); + + goto checkop; + } + +serial: + jam(); + placeSerialQueue(lockOwnerPtr, operationRecPtr); + + validate_lock_queue(lockOwnerPtr); + + return ZSERIAL_QUEUE; +checkop: /* - WE ARE PERFORMING AN READ EXCLUSIVE, INSERT, UPDATE OR DELETE IN THE SAME - TRANSACTION WHERE WE PREVIOUSLY HAVE EXECUTED AN OPERATION. - Read-All, Update-All, Insert-All and Delete-Insert are allowed - combinations. - Delete-Read, Delete-Update and Delete-Delete are not an allowed - combination and will result in tuple not found error. + WE ARE PERFORMING AN READ EXCLUSIVE, INSERT, UPDATE OR DELETE IN THE SAME + TRANSACTION WHERE WE PREVIOUSLY HAVE EXECUTED AN OPERATION. + Read-All, Update-All, Insert-All and Delete-Insert are allowed + combinations. + Delete-Read, Delete-Update and Delete-Delete are not an allowed + combination and will result in tuple not found error. */ - mlpqOperPtr = queOperPtr; - moveLastParallelQueueWrite(signal); + Uint32 lstate = lastbits & Operationrec::OP_STATE_MASK; - if (operationRecPtr.p->operation == ZINSERT && - mlpqOperPtr.p->operation != ZDELETE){ + Uint32 retValue = ZSERIAL_QUEUE; // So that it gets blocked... + if (lstate == Operationrec::OP_STATE_EXECUTED) + { jam(); - return ZWRITE_ERROR; - }//if - if(operationRecPtr.p->operation == ZWRITE) - { - operationRecPtr.p->operation = - (mlpqOperPtr.p->operation == ZDELETE) ? ZINSERT : ZUPDATE; + /** + * Since last operation has executed...we can now check operation types + * if not, we have to wait until it has executed + */ + Uint32 op = opbits & Operationrec::OP_MASK; + Uint32 lop = lastbits & Operationrec::OP_MASK; + if (op == ZINSERT && lop != ZDELETE) + { + jam(); + return ZWRITE_ERROR; + }//if + + /** + * NOTE. No checking op operation types, as one can read different save + * points... + */ +#if 0 + if (lop == ZDELETE && (op != ZINSERT && op != ZWRITE)) + { + jam(); + return ZREAD_ERROR; + } +#else + if (lop == ZDELETE && (op == ZUPDATE && op == ZDELETE)) + { + jam(); + return ZREAD_ERROR; + } +#endif + + if(op == ZWRITE) + { + opbits &= ~(Uint32)Operationrec::OP_MASK; + opbits |= (lop == ZDELETE) ? ZINSERT : ZUPDATE; + } + + opbits |= Operationrec::OP_STATE_RUNNING; + operationRecPtr.p->localdata[0] = lastOpPtr.p->localdata[0]; + operationRecPtr.p->localdata[1] = lastOpPtr.p->localdata[1]; + retValue = ZPARALLEL_QUEUE; } - operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1]; - operationRecPtr.p->prevParallelQue = mlpqOperPtr.i; - mlpqOperPtr.p->nextParallelQue = operationRecPtr.i; - return ZPARALLEL_QUEUE; + opbits |= Operationrec::OP_RUN_QUEUE; + operationRecPtr.p->m_op_bits = opbits; + operationRecPtr.p->prevParallelQue = lastOpPtr.i; + operationRecPtr.p->m_lock_owner_ptr_i = lockOwnerPtr.i; + lastOpPtr.p->nextParallelQue = operationRecPtr.i; + lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i = operationRecPtr.i; + + validate_lock_queue(lockOwnerPtr); + + return retValue; }//Dbacc::placeWriteInLockQueue() -/* --------------------------------------------------------------------------------- */ -/* WE HAVE TO PLACE IT SOMEWHERE IN THE SERIAL QUEUE INSTEAD. */ -/* --------------------------------------------------------------------------------- */ -void Dbacc::placeSerialQueueWrite(Signal* signal) +Uint32 +Dbacc::placeReadInLockQueue(OperationrecPtr lockOwnerPtr) { - readWriteOpPtr = queOperPtr; -PSQW_LOOP: - if (readWriteOpPtr.p->nextSerialQue == RNIL) { + OperationrecPtr lastOpPtr; + OperationrecPtr loopPtr = lockOwnerPtr; + lastOpPtr.i = lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i; + Uint32 opbits = operationRecPtr.p->m_op_bits; + + if (lastOpPtr.i == RNIL) + { + lastOpPtr = lockOwnerPtr; + } + else + { + ptrCheckGuard(lastOpPtr, coprecsize, operationrec); + } + + ndbassert(get_parallel_head(lastOpPtr) == lockOwnerPtr.i); + + /** + * Last operation in parallell queue of lock owner is same trans + * and ACC_LOCK_MODE is exlusive, then we can proceed + */ + Uint32 lastbits = lastOpPtr.p->m_op_bits; + bool same = operationRecPtr.p->is_same_trans(lastOpPtr.p); + if (same && (lastbits & Operationrec::OP_ACC_LOCK_MODE)) + { jam(); - /* --------------------------------------------------------------------------------- */ - /* WE COULD NOT PUT IN ANY PARALLEL QUEUE. WE MUST PUT IT LAST IN THE SERIAL QUEUE. */ - /* --------------------------------------------------------------------------------- */ - readWriteOpPtr.p->nextSerialQue = operationRecPtr.i; - operationRecPtr.p->prevSerialQue = readWriteOpPtr.i; - return; - }//if - readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue; - ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec); - if (getNoParallelTransaction(readWriteOpPtr.p) == 1) { - /* --------------------------------------------------------------------------------- */ - /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */ - /* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK. */ - /* --------------------------------------------------------------------------------- */ - if ((readWriteOpPtr.p->transId1 == operationRecPtr.p->transId1) && - (readWriteOpPtr.p->transId2 == operationRecPtr.p->transId2)) { + goto checkop; + } + + if ((lastbits & Operationrec::OP_ACC_LOCK_MODE) && !same) + { + jam(); + /** + * Last op in serial queue had X-lock and was not our transaction... + */ + goto serial; + } + + if (lockOwnerPtr.p->nextSerialQue == RNIL) + { + jam(); + goto checkop; + } + + /** + * Scan parallell queue to see if we are already there... + */ + do + { + ptrCheckGuard(loopPtr, coprecsize, operationrec); + if (loopPtr.p->is_same_trans(operationRecPtr.p)) + goto checkop; + loopPtr.i = loopPtr.p->nextParallelQue; + } while (loopPtr.i != RNIL); + +serial: + placeSerialQueue(lockOwnerPtr, operationRecPtr); + + validate_lock_queue(lockOwnerPtr); + + return ZSERIAL_QUEUE; + +checkop: + Uint32 lstate = lastbits & Operationrec::OP_STATE_MASK; + + Uint32 retValue = ZSERIAL_QUEUE; // So that it gets blocked... + if (lstate == Operationrec::OP_STATE_EXECUTED) + { + jam(); + + /** + * NOTE. No checking op operation types, as one can read different save + * points... + */ + +#if 0 + /** + * Since last operation has executed...we can now check operation types + * if not, we have to wait until it has executed + */ + if (lop == ZDELETE) + { jam(); - /* --------------------------------------------------------------------------------- */ - /* WE ARE PERFORMING AN UPDATE OR DELETE IN THE SAME TRANSACTION WHERE WE ALREADY */ - /* PREVIOUSLY HAVE EXECUTED AN OPERATION. INSERT-DELETE, READ-UPDATE, READ-READ, */ - /* UPDATE-UPDATE, UPDATE-DELETE, READ-DELETE, INSERT-READ, INSERT-UPDATE ARE ALLOWED */ - /* COMBINATIONS. A NEW INSERT AFTER A DELETE IS NOT ALLOWED AND SUCH AN INSERT WILL */ - /* GO TO THE SERIAL LOCK QUEUE WHICH IT WILL NOT LEAVE UNTIL A TIME-OUT AND THE */ - /* TRANSACTION IS ABORTED. READS AND UPDATES AFTER DELETES IS ALSO NOT ALLOWED. */ - /* --------------------------------------------------------------------------------- */ - mlpqOperPtr = readWriteOpPtr; - moveLastParallelQueueWrite(signal); - readWriteOpPtr = mlpqOperPtr; - operationRecPtr.p->prevParallelQue = readWriteOpPtr.i; - readWriteOpPtr.p->nextParallelQue = operationRecPtr.i; - operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1]; - return; - }//if - }//if - goto PSQW_LOOP; -}//Dbacc::placeSerialQueueWrite() + return ZREAD_ERROR; + } +#endif + + opbits |= Operationrec::OP_STATE_RUNNING; + operationRecPtr.p->localdata[0] = lastOpPtr.p->localdata[0]; + operationRecPtr.p->localdata[1] = lastOpPtr.p->localdata[1]; + retValue = ZPARALLEL_QUEUE; + } + opbits |= (lastbits & Operationrec::OP_ACC_LOCK_MODE); + opbits |= Operationrec::OP_RUN_QUEUE; + operationRecPtr.p->m_op_bits = opbits; + + operationRecPtr.p->prevParallelQue = lastOpPtr.i; + operationRecPtr.p->m_lock_owner_ptr_i = lockOwnerPtr.i; + lastOpPtr.p->nextParallelQue = operationRecPtr.i; + lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i = operationRecPtr.i; + + validate_lock_queue(lockOwnerPtr); + + return retValue; +}//Dbacc::placeReadInLockQueue + +void Dbacc::placeSerialQueue(OperationrecPtr lockOwnerPtr, + OperationrecPtr opPtr) +{ + OperationrecPtr lastOpPtr; + lastOpPtr.i = lockOwnerPtr.p->m_lo_last_serial_op_ptr_i; + + if (lastOpPtr.i == RNIL) + { + // Lock owner is last... + ndbrequire(lockOwnerPtr.p->nextSerialQue == RNIL); + lastOpPtr = lockOwnerPtr; + } + else + { + ptrCheckGuard(lastOpPtr, coprecsize, operationrec); + } + + operationRecPtr.p->prevSerialQue = lastOpPtr.i; + lastOpPtr.p->nextSerialQue = opPtr.i; + lockOwnerPtr.p->m_lo_last_serial_op_ptr_i = opPtr.i; +} /* ------------------------------------------------------------------------- */ /* ACC KEYREQ END */ /* ------------------------------------------------------------------------- */ void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code) { - operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT; + operationRecPtr.p->m_op_bits = ~0; /* ************************<< */ /* ACCKEYREF */ /* ************************<< */ @@ -1639,15 +2241,15 @@ void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code) return; }//Dbacc::acckeyref1Lab() -/* ******************--------------------------------------------------------------- */ -/* ACCMINUPDATE UPDATE LOCAL KEY REQ */ -/* DESCRIPTION: UPDATES LOCAL KEY OF AN ELEMENTS IN THE HASH TABLE */ -/* THIS SIGNAL IS WAITED AFTER ANY INSERT REQ */ -/* ENTER ACCMINUPDATE WITH SENDER: LQH, LEVEL B */ -/* OPERATION_REC_PTR, OPERATION RECORD PTR */ -/* CLOCALKEY(0), LOCAL KEY 1 */ -/* CLOCALKEY(1) LOCAL KEY 2 */ -/* ******************--------------------------------------------------------------- */ +/* ******************----------------------------------------------------- */ +/* ACCMINUPDATE UPDATE LOCAL KEY REQ */ +/* DESCRIPTION: UPDATES LOCAL KEY OF AN ELEMENTS IN THE HASH TABLE */ +/* THIS SIGNAL IS WAITED AFTER ANY INSERT REQ */ +/* ENTER ACCMINUPDATE WITH SENDER: LQH, LEVEL B */ +/* OPERATION_REC_PTR, OPERATION RECORD PTR */ +/* CLOCALKEY(0), LOCAL KEY 1 */ +/* CLOCALKEY(1) LOCAL KEY 2 */ +/* ******************----------------------------------------------------- */ void Dbacc::execACCMINUPDATE(Signal* signal) { Page8Ptr ulkPageidptr; @@ -1660,19 +2262,26 @@ void Dbacc::execACCMINUPDATE(Signal* signal) tlocalkey1 = signal->theData[1]; tlocalkey2 = signal->theData[2]; ptrCheckGuard(operationRecPtr, coprecsize, operationrec); - if (operationRecPtr.p->transactionstate == ACTIVE) { - fragrecptr.i = operationRecPtr.p->fragptr; - ulkPageidptr.i = operationRecPtr.p->elementPage; - tulkLocalPtr = operationRecPtr.p->elementPointer + operationRecPtr.p->elementIsforward; + Uint32 opbits = operationRecPtr.p->m_op_bits; + fragrecptr.i = operationRecPtr.p->fragptr; + ulkPageidptr.i = operationRecPtr.p->elementPage; + tulkLocalPtr = operationRecPtr.p->elementPointer + + operationRecPtr.p->elementIsforward; + + if ((opbits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_RUNNING) + { ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); ptrCheckGuard(ulkPageidptr, cpagesize, page8); dbgWord32(ulkPageidptr, tulkLocalPtr, tlocalkey1); arrGuard(tulkLocalPtr, 2048); ulkPageidptr.p->word32[tulkLocalPtr] = tlocalkey1; operationRecPtr.p->localdata[0] = tlocalkey1; - if (fragrecptr.p->localkeylen == 1) { + if (likely(fragrecptr.p->localkeylen == 1)) + { return; - } else if (fragrecptr.p->localkeylen == 2) { + } + else if (fragrecptr.p->localkeylen == 2) + { jam(); tulkLocalPtr = tulkLocalPtr + operationRecPtr.p->elementIsforward; operationRecPtr.p->localdata[1] = tlocalkey2; @@ -1696,15 +2305,17 @@ void Dbacc::execACC_COMMITREQ(Signal* signal) { Uint8 Toperation; jamEntry(); - operationRecPtr.i = signal->theData[0]; + Uint32 tmp = operationRecPtr.i = signal->theData[0]; ptrCheckGuard(operationRecPtr, coprecsize, operationrec); - ndbrequire(operationRecPtr.p->transactionstate == ACTIVE); + void* ptr = operationRecPtr.p; + Uint32 opbits = operationRecPtr.p->m_op_bits; fragrecptr.i = operationRecPtr.p->fragptr; ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); + Toperation = opbits & Operationrec::OP_MASK; commitOperation(signal); - Toperation = operationRecPtr.p->operation; - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; + ndbassert(operationRecPtr.i == tmp); + ndbassert(operationRecPtr.p == ptr); + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; if(Toperation != ZREAD){ fragrecptr.p->m_commit_count++; if (Toperation != ZINSERT) { @@ -1713,7 +2324,7 @@ void Dbacc::execACC_COMMITREQ(Signal* signal) } else { jam(); fragrecptr.p->noOfElements--; - fragrecptr.p->slack += operationRecPtr.p->insertDeleteLen; + fragrecptr.p->slack += fragrecptr.p->elementLength; if (fragrecptr.p->slack > fragrecptr.p->slackCheck) { /* TIME FOR JOIN BUCKETS PROCESS */ if (fragrecptr.p->expandCounter > 0) { @@ -1732,7 +2343,7 @@ void Dbacc::execACC_COMMITREQ(Signal* signal) } else { jam(); /* EXPAND PROCESS HANDLING */ fragrecptr.p->noOfElements++; - fragrecptr.p->slack -= operationRecPtr.p->insertDeleteLen; + fragrecptr.p->slack -= fragrecptr.p->elementLength; if (fragrecptr.p->slack >= (1u << 31)) { /* IT MEANS THAT IF SLACK < ZERO */ if (fragrecptr.p->expandFlag == 0) { @@ -1749,45 +2360,57 @@ void Dbacc::execACC_COMMITREQ(Signal* signal) return; }//Dbacc::execACC_COMMITREQ() -/* ******************--------------------------------------------------------------- */ -/* ACC ABORT REQ ABORT ALL OPERATION OF THE TRANSACTION */ -/* ******************------------------------------+ */ -/* SENDER: LQH, LEVEL B */ -/* ******************--------------------------------------------------------------- */ -/* ACC ABORT REQ ABORT TRANSACTION */ -/* ******************------------------------------+ */ +/* ******************------------------------------------------------------- */ +/* ACC ABORT REQ ABORT ALL OPERATION OF THE TRANSACTION */ +/* ******************------------------------------+ */ +/* SENDER: LQH, LEVEL B */ +/* ******************------------------------------------------------------- */ +/* ACC ABORT REQ ABORT TRANSACTION */ +/* ******************------------------------------+ */ /* SENDER: LQH, LEVEL B */ void Dbacc::execACC_ABORTREQ(Signal* signal) { jamEntry(); - accAbortReqLab(signal); -}//Dbacc::execACC_ABORTREQ() - -void Dbacc::accAbortReqLab(Signal* signal) -{ operationRecPtr.i = signal->theData[0]; - bool sendConf = signal->theData[1]; + Uint32 sendConf = signal->theData[1]; ptrCheckGuard(operationRecPtr, coprecsize, operationrec); + fragrecptr.i = operationRecPtr.p->fragptr; + Uint32 opbits = operationRecPtr.p->m_op_bits; + Uint32 opstate = opbits & Operationrec::OP_STATE_MASK; tresult = 0; /* ZFALSE */ - if ((operationRecPtr.p->transactionstate == ACTIVE) || - (operationRecPtr.p->transactionstate == WAIT_COMMIT_ABORT)) { + + if (opbits == Operationrec::OP_EXECUTED_DIRTY_READ) + { + jam(); + } + else if (opstate == Operationrec::OP_STATE_EXECUTED || + opstate == Operationrec::OP_STATE_WAITING || + opstate == Operationrec::OP_STATE_RUNNING) + { jam(); - fragrecptr.i = operationRecPtr.p->fragptr; ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); - operationRecPtr.p->transactionstate = ABORT; abortOperation(signal); - } else { - ndbrequire(operationRecPtr.p->transactionstate == IDLE); - jam(); - }//if - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; - if (! sendConf) - return; + } + + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; + signal->theData[0] = operationRecPtr.p->userptr; - sendSignal(operationRecPtr.p->userblockref, GSN_ACC_ABORTCONF, signal, 1, JBB); - return; -}//Dbacc::accAbortReqLab() + signal->theData[1] = 0; + switch(sendConf){ + case 0: + return; + case 2: + if (opstate != Operationrec::OP_STATE_RUNNING) + { + return; + } + case 1: + sendSignal(operationRecPtr.p->userblockref, GSN_ACC_ABORTCONF, + signal, 1, JBB); + } + + signal->theData[1] = RNIL; +} /* * Lock or unlock tuple. @@ -1828,8 +2451,7 @@ void Dbacc::execACC_LOCKREQ(Signal* signal) // init as in ACCSEIZEREQ operationRecPtr.p->userptr = req->userPtr; operationRecPtr.p->userblockref = req->userRef; - operationRecPtr.p->operation = ZUNDEFINED_OP; - operationRecPtr.p->transactionstate = IDLE; + operationRecPtr.p->m_op_bits = ~0; operationRecPtr.p->scanRecPtr = RNIL; // do read with lock via ACCKEYREQ Uint32 lockMode = (lockOp == AccLockReq::LockShared) ? 0 : 1; @@ -1880,8 +2502,8 @@ void Dbacc::execACC_LOCKREQ(Signal* signal) jam(); // do abort via ACC_ABORTREQ (immediate) signal->theData[0] = req->accOpPtr; - signal->theData[1] = false; // Dont send abort - accAbortReqLab(signal); + signal->theData[1] = 0; // Dont send abort + execACC_ABORTREQ(signal); releaseOpRec(signal); req->returnCode = AccLockReq::Success; *sig = *req; @@ -1891,8 +2513,8 @@ void Dbacc::execACC_LOCKREQ(Signal* signal) jam(); // do abort via ACC_ABORTREQ (with conf signal) signal->theData[0] = req->accOpPtr; - signal->theData[1] = true; // send abort - accAbortReqLab(signal); + signal->theData[1] = 1; // send abort + execACC_ABORTREQ(signal); releaseOpRec(signal); req->returnCode = AccLockReq::Success; *sig = *req; @@ -2551,16 +3173,29 @@ void Dbacc::getdirindex(Signal* signal) }//Dbacc::getdirindex() Uint32 -Dbacc::readTablePk(Uint32 localkey1) +Dbacc::readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec* op) { + int ret; Uint32 tableId = fragrecptr.p->myTableId; Uint32 fragId = fragrecptr.p->myfid; - Uint32 fragPageId = localkey1 >> MAX_TUPLES_BITS; - Uint32 pageIndex = localkey1 & ((1 << MAX_TUPLES_BITS ) - 1); + bool xfrm = fragrecptr.p->hasCharAttr; + #ifdef VM_TRACE memset(ckeys, 0x1f, (fragrecptr.p->keyLength * MAX_XFRM_MULTIPLY) << 2); #endif - int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, ckeys, true); + + if (likely(localkey1 != ~(Uint32)0)) + { + Uint32 fragPageId = localkey1 >> MAX_TUPLES_BITS; + Uint32 pageIndex = localkey1 & ((1 << MAX_TUPLES_BITS ) - 1); + ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, + ckeys, true); + } + else + { + ndbrequire(ElementHeader::getLocked(eh)); + ret = c_lqh->readPrimaryKeys(op->userptr, ckeys, xfrm); + } jamEntry(); ndbrequire(ret >= 0); return ret; @@ -2594,11 +3229,12 @@ void ndb_acc_ia64_icc810_dummy_func() #endif #endif -void Dbacc::getElement(Signal* signal) +Uint32 +Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr) { + Uint32 errcode; DirRangePtr geOverflowrangeptr; DirectoryarrayPtr geOverflowDirptr; - OperationrecPtr geTmpOperationRecPtr; Uint32 tgeElementHeader; Uint32 tgeElemStep; Uint32 tgeContainerhead; @@ -2613,7 +3249,6 @@ void Dbacc::getElement(Signal* signal) getdirindex(signal); tgePageindex = tgdiPageindex; gePageptr = gdiPageptr; - tgeResult = ZFALSE; /* * The value seached is * - table key for ACCKEYREQ, stored in TUP @@ -2623,7 +3258,6 @@ void Dbacc::getElement(Signal* signal) ndbrequire(TelemLen == ZELEM_HEAD_SIZE + fragrecptr.p->localkeylen); tgeNextptrtype = ZLEFT; - tgeLocked = 0; const Uint32 tmp = fragrecptr.p->k + fragrecptr.p->lhfragbits; const Uint32 opHashValuePart = (operationRecPtr.p->hashValue >> tmp) &0xFFFF; @@ -2636,9 +3270,17 @@ void Dbacc::getElement(Signal* signal) tgeKeyptr = (tgeElementptr + ZELEM_HEAD_SIZE) + fragrecptr.p->localkeylen; tgeElemStep = TelemLen; tgeForward = 1; - if (tgeContainerptr >= 2048) { ACCKEY_error(4); return;} + if (unlikely(tgeContainerptr >= 2048)) + { + errcode = 4; + goto error; + } tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26; - if ((tgeContainerptr + tgeRemLen - 1) >= 2048) { ACCKEY_error(5); return;} + if (unlikely(((tgeContainerptr + tgeRemLen - 1) >= 2048))) + { + errcode = 5; + goto error; + } } else if (tgeNextptrtype == ZRIGHT) { jam(); tgeContainerptr = tgeContainerptr + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE); @@ -2646,29 +3288,42 @@ void Dbacc::getElement(Signal* signal) tgeKeyptr = (tgeElementptr - ZELEM_HEAD_SIZE) - fragrecptr.p->localkeylen; tgeElemStep = 0 - TelemLen; tgeForward = (Uint32)-1; - if (tgeContainerptr >= 2048) { ACCKEY_error(4); return;} + if (unlikely(tgeContainerptr >= 2048)) + { + errcode = 4; + goto error; + } tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26; - if ((tgeContainerptr - tgeRemLen) >= 2048) { ACCKEY_error(5); return;} + if (unlikely((tgeContainerptr - tgeRemLen) >= 2048)) + { + errcode = 5; + goto error; + } } else { - ACCKEY_error(6); return; + errcode = 6; + goto error; }//if if (tgeRemLen >= ZCON_HEAD_SIZE + TelemLen) { - if (tgeRemLen > ZBUF_SIZE) { - ACCKEY_error(7); return; + if (unlikely(tgeRemLen > ZBUF_SIZE)) + { + errcode = 7; + goto error; }//if - /* --------------------------------------------------------------------------------- */ - // There is at least one element in this container. Check if it is the element - // searched for. - /* --------------------------------------------------------------------------------- */ + /* ------------------------------------------------------------------- */ + // There is at least one element in this container. + // Check if it is the element searched for. + /* ------------------------------------------------------------------- */ do { tgeElementHeader = gePageptr.p->word32[tgeElementptr]; tgeRemLen = tgeRemLen - TelemLen; Uint32 hashValuePart; + lockOwnerPtr.i = RNIL; + lockOwnerPtr.p = NULL; if (ElementHeader::getLocked(tgeElementHeader)) { jam(); - geTmpOperationRecPtr.i = ElementHeader::getOpPtrI(tgeElementHeader); - ptrCheckGuard(geTmpOperationRecPtr, coprecsize, operationrec); - hashValuePart = geTmpOperationRecPtr.p->hashvaluePart; + lockOwnerPtr.i = ElementHeader::getOpPtrI(tgeElementHeader); + ptrCheckGuard(lockOwnerPtr, coprecsize, operationrec); + hashValuePart = lockOwnerPtr.p->hashvaluePart; } else { jam(); hashValuePart = ElementHeader::getHashValuePart(tgeElementHeader); @@ -2678,21 +3333,21 @@ void Dbacc::getElement(Signal* signal) Uint32 localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward]; Uint32 localkey2 = 0; bool found; - if (! searchLocalKey) { - Uint32 len = readTablePk(localkey1); + if (! searchLocalKey) + { + Uint32 len = readTablePk(localkey1, tgeElementptr, lockOwnerPtr.p); found = (len == operationRecPtr.p->xfrmtupkeylen) && (memcmp(Tkeydata, ckeys, len << 2) == 0); } else { jam(); found = (localkey1 == Tkeydata[0]); } - if (found) { + if (found) + { jam(); - tgeLocked = ElementHeader::getLocked(tgeElementHeader); - tgeResult = ZTRUE; operationRecPtr.p->localdata[0] = localkey1; operationRecPtr.p->localdata[1] = localkey2; - return; + return ZTRUE; } } if (tgeRemLen <= ZCON_HEAD_SIZE) { @@ -2701,18 +3356,22 @@ void Dbacc::getElement(Signal* signal) tgeElementptr = tgeElementptr + tgeElemStep; } while (true); }//if - if (tgeRemLen != ZCON_HEAD_SIZE) { - ACCKEY_error(8); return; + if (unlikely(tgeRemLen != ZCON_HEAD_SIZE)) + { + errcode = 8; + goto error; }//if tgeContainerhead = gePageptr.p->word32[tgeContainerptr]; tgeNextptrtype = (tgeContainerhead >> 7) & 0x3; if (tgeNextptrtype == 0) { jam(); - return; /* NO MORE CONTAINER */ + return ZFALSE; /* NO MORE CONTAINER */ }//if tgePageindex = tgeContainerhead & 0x7f; /* NEXT CONTAINER PAGE INDEX 7 BITS */ - if (tgePageindex > ZEMPTYLIST) { - ACCKEY_error(9); return; + if (unlikely(tgePageindex > ZEMPTYLIST)) + { + errcode = 9; + goto error; }//if if (((tgeContainerhead >> 9) & 1) == ZFALSE) { jam(); @@ -2726,55 +3385,72 @@ void Dbacc::getElement(Signal* signal) ptrCheckGuard(gePageptr, cpagesize, page8); }//if } while (1); - return; + + return ZFALSE; + +error: + ACCKEY_error(errcode); + return ~0; }//Dbacc::getElement() -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* */ -/* END OF GET_ELEMENT MODULE */ -/* */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* */ -/* MODULE: DELETE */ -/* */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* COMMITDELETE */ -/* INPUT: OPERATION_REC_PTR, PTR TO AN OPERATION RECORD. */ -/* FRAGRECPTR, PTR TO A FRAGMENT RECORD */ -/* */ -/* OUTPUT: */ -/* NONE */ -/* DESCRIPTION: DELETE OPERATIONS WILL BE COMPLETED AT THE COMMIT OF TRANSA- */ -/* CTION. THIS SUBROUTINE SEARCHS FOR ELEMENT AND DELETES IT. IT DOES SO BY */ -/* REPLACING IT WITH THE LAST ELEMENT IN THE BUCKET. IF THE DELETED ELEMENT */ -/* IS ALSO THE LAST ELEMENT THEN IT IS ONLY NECESSARY TO REMOVE THE ELEMENT. */ -/* --------------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* */ +/* END OF GET_ELEMENT MODULE */ +/* */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* */ +/* MODULE: DELETE */ +/* */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* COMMITDELETE */ +/* INPUT: OPERATION_REC_PTR, PTR TO AN OPERATION RECORD. */ +/* FRAGRECPTR, PTR TO A FRAGMENT RECORD */ +/* */ +/* OUTPUT: */ +/* NONE */ +/* DESCRIPTION: DELETE OPERATIONS WILL BE COMPLETED AT THE + * COMMIT OF TRANSACTION. THIS SUBROUTINE SEARCHS FOR ELEMENT AND + * DELETES IT. IT DOES SO BY REPLACING IT WITH THE LAST + * ELEMENT IN THE BUCKET. IF THE DELETED ELEMENT IS ALSO THE LAST + * ELEMENT THEN IT IS ONLY NECESSARY TO REMOVE THE ELEMENT + * ------------------------------------------------------------------------- */ +void +Dbacc::report_dealloc(Signal* signal, const Operationrec* opPtrP) +{ + Uint32 localKey = opPtrP->localdata[0]; + Uint32 opbits = opPtrP->m_op_bits; + Uint32 userptr= opPtrP->userptr; + Uint32 scanInd = + ((opbits & Operationrec::OP_MASK) == ZSCAN_OP) || + (opbits & Operationrec::OP_LOCK_REQ); + + if (localKey != ~(Uint32)0) + { + signal->theData[0] = fragrecptr.p->myfid; + signal->theData[1] = fragrecptr.p->myTableId; + Uint32 pageId = localKey >> MAX_TUPLES_BITS; + Uint32 pageIndex = localKey & ((1 << MAX_TUPLES_BITS) - 1); + signal->theData[2] = pageId; + signal->theData[3] = pageIndex; + signal->theData[4] = userptr; + signal->theData[5] = scanInd; + EXECUTE_DIRECT(DBLQH, GSN_TUP_DEALLOCREQ, signal, 6); + jamEntry(); + } +} + void Dbacc::commitdelete(Signal* signal) { jam(); - Uint32 localKey = operationRecPtr.p->localdata[0]; - Uint32 userptr= operationRecPtr.p->userptr; - Uint32 scanInd = operationRecPtr.p->operation == ZSCAN_OP - || operationRecPtr.p->isAccLockReq; - - signal->theData[0] = fragrecptr.p->myfid; - signal->theData[1] = fragrecptr.p->myTableId; - Uint32 pageId = localKey >> MAX_TUPLES_BITS; - Uint32 pageIndex = localKey & ((1 << MAX_TUPLES_BITS) - 1); - signal->theData[2] = pageId; - signal->theData[3] = pageIndex; - signal->theData[4] = userptr; - signal->theData[5] = scanInd; - EXECUTE_DIRECT(DBLQH, GSN_TUP_DEALLOCREQ, signal, 6); - jamEntry(); + report_dealloc(signal, operationRecPtr.p); getdirindex(signal); tlastPageindex = tgdiPageindex; @@ -3263,31 +3939,316 @@ void Dbacc::checkoverfreelist(Signal* signal) /*BE CHECKED. THE OPERATION RECORD WILL BE REMOVED FROM THE QUEUE IF IT */ /*BELONGED TO ANY ONE, OTHERWISE THE ELEMENT HEAD WILL BE UPDATED. */ /* ------------------------------------------------------------------------- */ + +/** + * + * P0 - P1 - P2 - P3 + * S0 + * S1 + * S2 + */ +void +Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr) +{ + OperationrecPtr nextP; + OperationrecPtr prevP; + OperationrecPtr loPtr; + + Uint32 opbits = opPtr.p->m_op_bits; + Uint32 opstate = opbits & Operationrec::OP_STATE_MASK; + nextP.i = opPtr.p->nextParallelQue; + prevP.i = opPtr.p->prevParallelQue; + loPtr.i = opPtr.p->m_lock_owner_ptr_i; + + ndbassert(! (opbits & Operationrec::OP_LOCK_OWNER)); + ndbassert(opbits & Operationrec::OP_RUN_QUEUE); + + ptrCheckGuard(prevP, coprecsize, operationrec); + ndbassert(prevP.p->nextParallelQue == opPtr.i); + prevP.p->nextParallelQue = nextP.i; + + if (nextP.i != RNIL) + { + ptrCheckGuard(nextP, coprecsize, operationrec); + ndbassert(nextP.p->prevParallelQue == opPtr.i); + nextP.p->prevParallelQue = prevP.i; + } + else if (prevP.i != loPtr.i) + { + jam(); + ptrCheckGuard(loPtr, coprecsize, operationrec); + ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + ndbassert(loPtr.p->m_lo_last_parallel_op_ptr_i == opPtr.i); + loPtr.p->m_lo_last_parallel_op_ptr_i = prevP.i; + prevP.p->m_lock_owner_ptr_i = loPtr.i; + + /** + * Abort P3...check start next + */ + startNext(signal, prevP); + validate_lock_queue(prevP); + return; + } + else + { + jam(); + ndbassert(prevP.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + prevP.p->m_lo_last_parallel_op_ptr_i = RNIL; + startNext(signal, prevP); + validate_lock_queue(prevP); + return; + } + + if (opbits & Operationrec::OP_LOCK_MODE) + { + Uint32 nextbits = nextP.p->m_op_bits; + while ((nextbits & Operationrec::OP_LOCK_MODE) == 0) + { + ndbassert(nextbits & Operationrec::OP_ACC_LOCK_MODE); + nextbits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE; + nextP.p->m_op_bits = nextbits; + + if (nextP.p->nextParallelQue != RNIL) + { + nextP.i = nextP.p->nextParallelQue; + ptrCheckGuard(nextP, coprecsize, operationrec); + nextbits = nextP.p->m_op_bits; + } + else + { + break; + } + } + } + + /** + * Abort P1, P2 + */ + + /** + * Scan to last of run queue + */ + while (nextP.p->nextParallelQue != RNIL) + { + nextP.i = nextP.p->nextParallelQue; + ptrCheckGuard(nextP, coprecsize, operationrec); + } + +#ifdef VM_TRACE + loPtr.i = nextP.p->m_lock_owner_ptr_i; + ptrCheckGuard(loPtr, coprecsize, operationrec); + ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + ndbassert(loPtr.p->m_lo_last_parallel_op_ptr_i == nextP.i); +#endif + startNext(signal, nextP); + validate_lock_queue(nextP); + + return; +} + +void +Dbacc::abortSerieQueueOperation(Signal* signal, OperationrecPtr opPtr) +{ + OperationrecPtr prevS, nextS; + OperationrecPtr prevP, nextP; + OperationrecPtr loPtr; + + Uint32 opbits = opPtr.p->m_op_bits; + + prevS.i = opPtr.p->prevSerialQue; + nextS.i = opPtr.p->nextSerialQue; + + prevP.i = opPtr.p->prevParallelQue; + nextP.i = opPtr.p->nextParallelQue; + + ndbassert((opbits & Operationrec::OP_LOCK_OWNER) == 0); + ndbassert((opbits & Operationrec::OP_RUN_QUEUE) == 0); + + if (prevP.i != RNIL) + { + /** + * We're not list head... + */ + ptrCheckGuard(prevP, coprecsize, operationrec); + ndbassert(prevP.p->nextParallelQue == opPtr.i); + prevP.p->nextParallelQue = nextP.i; + + if (nextP.i != RNIL) + { + ptrCheckGuard(nextP, coprecsize, operationrec); + ndbassert(nextP.p->prevParallelQue == opPtr.i); + ndbassert((nextP.p->m_op_bits & Operationrec::OP_STATE_MASK) == + Operationrec::OP_STATE_WAITING); + nextP.p->prevParallelQue = prevP.i; + + if ((prevP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE) == 0 && + opbits & Operationrec::OP_LOCK_MODE) + { + /** + * Scan right in parallel queue to fix OP_ACC_LOCK_MODE + */ + while ((nextP.p->m_op_bits & Operationrec::OP_LOCK_MODE) == 0) + { + ndbassert(nextP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE); + nextP.p->m_op_bits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE; + nextP.i = nextP.p->nextParallelQue; + if (nextP.i == RNIL) + break; + ptrCheckGuard(nextP, coprecsize, operationrec); + } + } + } + validate_lock_queue(prevP); + return; + } + else + { + /** + * We're a list head + */ + ptrCheckGuard(prevS, coprecsize, operationrec); + ndbassert(prevS.p->nextSerialQue == opPtr.i); + + if (nextP.i != RNIL) + { + /** + * Promote nextP to list head + */ + ptrCheckGuard(nextP, coprecsize, operationrec); + ndbassert(nextP.p->prevParallelQue == opPtr.i); + prevS.p->nextSerialQue = nextP.i; + nextP.p->prevParallelQue = RNIL; + nextP.p->nextSerialQue = nextS.i; + if (nextS.i != RNIL) + { + jam(); + ptrCheckGuard(nextS, coprecsize, operationrec); + ndbassert(nextS.p->prevSerialQue == opPtr.i); + nextS.p->prevSerialQue = nextP.i; + validate_lock_queue(prevS); + return; + } + else + { + // nextS is RNIL, i.e we're last in serie queue... + // we must update lockOwner.m_lo_last_serial_op_ptr_i + loPtr = prevS; + while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0) + { + loPtr.i = loPtr.p->prevSerialQue; + ptrCheckGuard(loPtr, coprecsize, operationrec); + } + ndbassert(loPtr.p->m_lo_last_serial_op_ptr_i == opPtr.i); + loPtr.p->m_lo_last_serial_op_ptr_i = nextP.i; + validate_lock_queue(loPtr); + return; + } + } + + if (nextS.i == RNIL) + { + /** + * Abort S2 + */ + + // nextS is RNIL, i.e we're last in serie queue... + // and we have no parallel queue, + // we must update lockOwner.m_lo_last_serial_op_ptr_i + prevS.p->nextSerialQue = RNIL; + + loPtr = prevS; + while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0) + { + loPtr.i = loPtr.p->prevSerialQue; + ptrCheckGuard(loPtr, coprecsize, operationrec); + } + ndbassert(loPtr.p->m_lo_last_serial_op_ptr_i == opPtr.i); + if (prevS.i != loPtr.i) + { + jam(); + loPtr.p->m_lo_last_serial_op_ptr_i = prevS.i; + } + else + { + loPtr.p->m_lo_last_serial_op_ptr_i = RNIL; + } + validate_lock_queue(loPtr); + } + else if (nextP.i == RNIL) + { + ptrCheckGuard(nextS, coprecsize, operationrec); + ndbassert(nextS.p->prevSerialQue == opPtr.i); + prevS.p->nextSerialQue = nextS.i; + nextS.p->prevSerialQue = prevS.i; + + if (prevS.p->m_op_bits & Operationrec::OP_LOCK_OWNER) + { + /** + * Abort S0 + */ + OperationrecPtr lastOp; + lastOp.i = prevS.p->m_lo_last_parallel_op_ptr_i; + if (lastOp.i != RNIL) + { + jam(); + ptrCheckGuard(lastOp, coprecsize, operationrec); + ndbassert(lastOp.p->m_lock_owner_ptr_i = prevS.i); + } + else + { + jam(); + lastOp = prevS; + } + startNext(signal, lastOp); + validate_lock_queue(lastOp); + } + else + { + validate_lock_queue(prevS); + } + } + } +} + + void Dbacc::abortOperation(Signal* signal) { - OperationrecPtr aboOperRecPtr; - OperationrecPtr TaboOperRecPtr; - Page8Ptr aboPageidptr; - Uint32 taboElementptr; - Uint32 tmp2Olq; + Uint32 opbits = operationRecPtr.p->m_op_bits; + + validate_lock_queue(operationRecPtr); - if (operationRecPtr.p->lockOwner == ZTRUE) { + if (opbits & Operationrec::OP_LOCK_OWNER) + { takeOutLockOwnersList(signal, operationRecPtr); - if (operationRecPtr.p->insertIsDone == ZTRUE) { + opbits &= ~(Uint32)Operationrec::OP_LOCK_OWNER; + if (opbits & Operationrec::OP_INSERT_IS_DONE) + { jam(); - operationRecPtr.p->elementIsDisappeared = ZTRUE; + opbits |= Operationrec::OP_ELEMENT_DISAPPEARED; }//if - if ((operationRecPtr.p->nextParallelQue != RNIL) || - (operationRecPtr.p->nextSerialQue != RNIL)) { + operationRecPtr.p->m_op_bits = opbits; + const bool queue = (operationRecPtr.p->nextParallelQue != RNIL || + operationRecPtr.p->nextSerialQue != RNIL); + + if (queue) + { jam(); - releaselock(signal); - } else { - /* --------------------------------------------------------------------------------- */ - /* WE ARE OWNER OF THE LOCK AND NO OTHER OPERATIONS ARE QUEUED. IF INSERT OR STANDBY */ - /* WE DELETE THE ELEMENT OTHERWISE WE REMOVE THE LOCK FROM THE ELEMENT. */ - /* --------------------------------------------------------------------------------- */ - if (operationRecPtr.p->elementIsDisappeared == ZFALSE) { + release_lockowner(signal, operationRecPtr, false); + } + else + { + /* ------------------------------------------------------------------- + * WE ARE OWNER OF THE LOCK AND NO OTHER OPERATIONS ARE QUEUED. + * IF INSERT OR STANDBY WE DELETE THE ELEMENT OTHERWISE WE REMOVE + * THE LOCK FROM THE ELEMENT. + * ------------------------------------------------------------------ */ + if ((opbits & Operationrec::OP_ELEMENT_DISAPPEARED) == 0) + { jam(); + Page8Ptr aboPageidptr; + Uint32 taboElementptr; + Uint32 tmp2Olq; + taboElementptr = operationRecPtr.p->elementPointer; aboPageidptr.i = operationRecPtr.p->elementPage; tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart, @@ -3297,87 +4258,31 @@ void Dbacc::abortOperation(Signal* signal) arrGuard(taboElementptr, 2048); aboPageidptr.p->word32[taboElementptr] = tmp2Olq; return; - } else { + } + else + { jam(); commitdelete(signal); }//if }//if - } else { - /* --------------------------------------------------------------- */ - // We are not the lock owner. - /* --------------------------------------------------------------- */ - jam(); - if (operationRecPtr.p->prevParallelQue != RNIL) { - jam(); - /* ---------------------------------------------------------------------------------- */ - /* SINCE WE ARE NOT QUEUE LEADER WE NEED NOT CONSIDER IF THE ELEMENT IS TO BE DELETED.*/ - /* We will simply remove it from the parallel list without any other rearrangements. */ - /* ---------------------------------------------------------------------------------- */ - aboOperRecPtr.i = operationRecPtr.p->prevParallelQue; - ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec); - aboOperRecPtr.p->nextParallelQue = operationRecPtr.p->nextParallelQue; - if (operationRecPtr.p->nextParallelQue != RNIL) { - jam(); - aboOperRecPtr.i = operationRecPtr.p->nextParallelQue; - ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec); - aboOperRecPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue; - }//if - } else if (operationRecPtr.p->prevSerialQue != RNIL) { - /* ------------------------------------------------------------------------- */ - // We are not in the parallel queue owning the lock. Thus we are in another parallel - // queue longer down in the serial queue. We are however first since prevParallelQue - // == RNIL. - /* ------------------------------------------------------------------------- */ - if (operationRecPtr.p->nextParallelQue != RNIL) { - jam(); - /* ------------------------------------------------------------------------- */ - // We have an operation in the queue after us. We simply rearrange this parallel queue. - // The new leader of this parallel queue will be operation in the serial queue. - /* ------------------------------------------------------------------------- */ - aboOperRecPtr.i = operationRecPtr.p->nextParallelQue; - ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec); - aboOperRecPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue; - aboOperRecPtr.p->prevSerialQue = operationRecPtr.p->prevSerialQue; - aboOperRecPtr.p->prevParallelQue = RNIL; // Queue Leader - if (operationRecPtr.p->nextSerialQue != RNIL) { - jam(); - TaboOperRecPtr.i = operationRecPtr.p->nextSerialQue; - ptrCheckGuard(TaboOperRecPtr, coprecsize, operationrec); - TaboOperRecPtr.p->prevSerialQue = aboOperRecPtr.i; - }//if - TaboOperRecPtr.i = operationRecPtr.p->prevSerialQue; - ptrCheckGuard(TaboOperRecPtr, coprecsize, operationrec); - TaboOperRecPtr.p->nextSerialQue = aboOperRecPtr.i; - } else { - jam(); - /* ------------------------------------------------------------------------- */ - // We are the only operation in this parallel queue. We will thus shrink the serial - // queue. - /* ------------------------------------------------------------------------- */ - aboOperRecPtr.i = operationRecPtr.p->prevSerialQue; - ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec); - aboOperRecPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue; - if (operationRecPtr.p->nextSerialQue != RNIL) { - jam(); - aboOperRecPtr.i = operationRecPtr.p->nextSerialQue; - ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec); - aboOperRecPtr.p->prevSerialQue = operationRecPtr.p->prevSerialQue; - }//if - }//if - }//if - }//if - /* ------------------------------------------------------------------------- */ - // If prevParallelQue = RNIL and prevSerialQue = RNIL and we are not owner of the - // lock then we cannot be in any lock queue at all. - /* ------------------------------------------------------------------------- */ -}//Dbacc::abortOperation() + } + else if (opbits & Operationrec::OP_RUN_QUEUE) + { + abortParallelQueueOperation(signal, operationRecPtr); + } + else + { + abortSerieQueueOperation(signal, operationRecPtr); + } +} -void Dbacc::commitDeleteCheck() +void +Dbacc::commitDeleteCheck() { OperationrecPtr opPtr; OperationrecPtr lastOpPtr; OperationrecPtr deleteOpPtr; - bool elementDeleted = false; + Uint32 elementDeleted = 0; bool deleteCheckOngoing = true; Uint32 hashValue = 0; lastOpPtr = operationRecPtr; @@ -3390,7 +4295,9 @@ void Dbacc::commitDeleteCheck() }//while deleteOpPtr = lastOpPtr; do { - if (deleteOpPtr.p->operation == ZDELETE) { + Uint32 opbits = deleteOpPtr.p->m_op_bits; + Uint32 op = opbits & Operationrec::OP_MASK; + if (op == ZDELETE) { jam(); /* ------------------------------------------------------------------- * IF THE CURRENT OPERATION TO BE COMMITTED IS A DELETE OPERATION DUE TO @@ -3404,10 +4311,9 @@ void Dbacc::commitDeleteCheck() * DELETE-OPERATION THAT HAS A HASH VALUE. * ----------------------------------------------------------------- */ hashValue = deleteOpPtr.p->hashValue; - elementDeleted = true; + elementDeleted = Operationrec::OP_ELEMENT_DISAPPEARED; deleteCheckOngoing = false; - } else if ((deleteOpPtr.p->operation == ZREAD) || - (deleteOpPtr.p->operation == ZSCAN_OP)) { + } else if (op == ZREAD || op == ZSCAN_OP) { /* ------------------------------------------------------------------- * We are trying to find out whether the commit will in the end delete * the tuple. Normally the delete will be the last operation in the @@ -3418,7 +4324,7 @@ void Dbacc::commitDeleteCheck() * we have to continue scanning the list looking for a delete operation. */ deleteOpPtr.i = deleteOpPtr.p->prevParallelQue; - if (deleteOpPtr.i == RNIL) { + if (opbits & Operationrec::OP_LOCK_OWNER) { jam(); deleteCheckOngoing = false; } else { @@ -3437,14 +4343,14 @@ void Dbacc::commitDeleteCheck() opPtr = lastOpPtr; do { jam(); - opPtr.p->commitDeleteCheckFlag = ZTRUE; + opPtr.p->m_op_bits |= Operationrec::OP_COMMIT_DELETE_CHECK; if (elementDeleted) { jam(); - opPtr.p->elementIsDisappeared = ZTRUE; + opPtr.p->m_op_bits |= elementDeleted; opPtr.p->hashValue = hashValue; }//if opPtr.i = opPtr.p->prevParallelQue; - if (opPtr.i == RNIL) { + if (opPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) { jam(); break; }//if @@ -3460,14 +4366,13 @@ void Dbacc::commitDeleteCheck() /* ------------------------------------------------------------------------- */ void Dbacc::commitOperation(Signal* signal) { - OperationrecPtr tolqTmpPtr; - Page8Ptr coPageidptr; - Uint32 tcoElementptr; - Uint32 tmp2Olq; + validate_lock_queue(operationRecPtr); - if ((operationRecPtr.p->commitDeleteCheckFlag == ZFALSE) && - (operationRecPtr.p->operation != ZSCAN_OP) && - (operationRecPtr.p->operation != ZREAD)) { + Uint32 opbits = operationRecPtr.p->m_op_bits; + Uint32 op = opbits & Operationrec::OP_MASK; + ndbrequire((opbits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_EXECUTED); + if ((opbits & Operationrec::OP_COMMIT_DELETE_CHECK) == 0 && (op != ZREAD)) + { jam(); /* This method is used to check whether the end result of the transaction will be to delete the tuple. In this case all operation will be marked @@ -3479,16 +4384,30 @@ void Dbacc::commitOperation(Signal* signal) lock is released. */ commitDeleteCheck(); + opbits = operationRecPtr.p->m_op_bits; }//if - if (operationRecPtr.p->lockOwner == ZTRUE) { + + ndbassert(opbits & Operationrec::OP_RUN_QUEUE); + + if (opbits & Operationrec::OP_LOCK_OWNER) + { takeOutLockOwnersList(signal, operationRecPtr); - if ((operationRecPtr.p->nextParallelQue == RNIL) && - (operationRecPtr.p->nextSerialQue == RNIL) && - (operationRecPtr.p->elementIsDisappeared == ZFALSE)) { + opbits &= ~(Uint32)Operationrec::OP_LOCK_OWNER; + operationRecPtr.p->m_op_bits = opbits; + + const bool queue = (operationRecPtr.p->nextParallelQue != RNIL || + operationRecPtr.p->nextSerialQue != RNIL); + + if (!queue && (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) == 0) + { /* - This is the normal path through the commit for operations owning the - lock without any queues and not a delete operation. - */ + * This is the normal path through the commit for operations owning the + * lock without any queues and not a delete operation. + */ + Page8Ptr coPageidptr; + Uint32 tcoElementptr; + Uint32 tmp2Olq; + coPageidptr.i = operationRecPtr.p->elementPage; tcoElementptr = operationRecPtr.p->elementPointer; tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart, @@ -3498,445 +4417,421 @@ void Dbacc::commitOperation(Signal* signal) arrGuard(tcoElementptr, 2048); coPageidptr.p->word32[tcoElementptr] = tmp2Olq; return; - } else if ((operationRecPtr.p->nextParallelQue != RNIL) || - (operationRecPtr.p->nextSerialQue != RNIL)) { + } + else if (queue) + { jam(); /* - The case when there is a queue lined up. - Release the lock and pass it to the next operation lined up. - */ - releaselock(signal); + * The case when there is a queue lined up. + * Release the lock and pass it to the next operation lined up. + */ + release_lockowner(signal, operationRecPtr, true); return; - } else { + } + else + { jam(); /* - No queue and elementIsDisappeared is true. We perform the actual delete - operation. - */ + * No queue and elementIsDisappeared is true. + * We perform the actual delete operation. + */ commitdelete(signal); return; }//if - } else { - /* - THE OPERATION DOES NOT OWN THE LOCK. IT MUST BE IN A LOCK QUEUE OF THE - ELEMENT. - */ - ndbrequire(operationRecPtr.p->prevParallelQue != RNIL); + } + else + { + /** + * THE OPERATION DOES NOT OWN THE LOCK. IT MUST BE IN A LOCK QUEUE OF THE + * ELEMENT. + */ + jam(); + OperationrecPtr prev, next, lockOwner; + prev.i = operationRecPtr.p->prevParallelQue; + next.i = operationRecPtr.p->nextParallelQue; + lockOwner.i = operationRecPtr.p->m_lock_owner_ptr_i; + ptrCheckGuard(prev, coprecsize, operationrec); + + prev.p->nextParallelQue = next.i; + if (next.i != RNIL) + { + jam(); + ptrCheckGuard(next, coprecsize, operationrec); + next.p->prevParallelQue = prev.i; + } + else if (prev.p->m_op_bits & Operationrec::OP_LOCK_OWNER) + { + jam(); + ndbassert(lockOwner.i == prev.i); + prev.p->m_lo_last_parallel_op_ptr_i = RNIL; + next = prev; + } + else + { + jam(); + /** + * Last operation in parallell queue + */ + ndbassert(prev.i != lockOwner.i); + ptrCheckGuard(lockOwner, coprecsize, operationrec); + ndbassert(lockOwner.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + lockOwner.p->m_lo_last_parallel_op_ptr_i = prev.i; + prev.p->m_lock_owner_ptr_i = lockOwner.i; + next = prev; + } + + /** + * Check possible lock upgrade + */ + if(opbits & Operationrec::OP_ACC_LOCK_MODE) + { + jam(); + + /** + * Not lock owner...committing a exclusive operation... + * + * e.g + * T1(R) T1(X) + * T2(R/X) + * + * If T1(X) commits T2(R/X) is not supposed to run + * as T1(R) should also commit + * + * e.g + * T1(R) T1(X) T1*(R) + * T2(R/X) + * + * If T1*(R) commits T2(R/X) is not supposed to run + * as T1(R),T2(x) should also commit + */ + validate_lock_queue(prev); + return; + } + + /** + * We committed a shared lock + * Check if we can start next... + */ + while(next.p->nextParallelQue != RNIL) + { + jam(); + next.i = next.p->nextParallelQue; + ptrCheckGuard(next, coprecsize, operationrec); + + if ((next.p->m_op_bits & Operationrec::OP_STATE_MASK) != + Operationrec::OP_STATE_EXECUTED) + { + jam(); + return; + } + } + + startNext(signal, next); + + validate_lock_queue(prev); + } +}//Dbacc::commitOperation() + +void +Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit) +{ + OperationrecPtr nextP; + OperationrecPtr nextS; + OperationrecPtr newOwner; + OperationrecPtr lastP; + + Uint32 opbits = opPtr.p->m_op_bits; + nextP.i = opPtr.p->nextParallelQue; + nextS.i = opPtr.p->nextSerialQue; + lastP.i = opPtr.p->m_lo_last_parallel_op_ptr_i; + Uint32 lastS = opPtr.p->m_lo_last_serial_op_ptr_i; + + ndbassert(lastP.i != RNIL || lastS != RNIL); + ndbassert(nextP.i != RNIL || nextS.i != RNIL); + + enum { + NOTHING, + CHECK_LOCK_UPGRADE, + START_NEW + } action = NOTHING; + + if (nextP.i != RNIL) + { jam(); - tolqTmpPtr.i = operationRecPtr.p->prevParallelQue; - ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec); - tolqTmpPtr.p->nextParallelQue = operationRecPtr.p->nextParallelQue; - if (operationRecPtr.p->nextParallelQue != RNIL) { - jam(); - tolqTmpPtr.i = operationRecPtr.p->nextParallelQue; - ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec); - tolqTmpPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue; - }//if + ptrCheckGuard(nextP, coprecsize, operationrec); + newOwner = nextP; - /** - * Check possible lock upgrade - * 1) Find lock owner - * 2) Count transactions in parallel que - * 3) If count == 1 and TRANSID(next serial) == TRANSID(lock owner) - * upgrade next serial - */ - if(operationRecPtr.p->lockMode) + if (lastP.i == newOwner.i) { - jam(); - /** - * Committing a non shared operation can't lead to lock upgrade - */ - return; + newOwner.p->m_lo_last_parallel_op_ptr_i = RNIL; + lastP = nextP; + } + else + { + ptrCheckGuard(lastP, coprecsize, operationrec); + newOwner.p->m_lo_last_parallel_op_ptr_i = lastP.i; + lastP.p->m_lock_owner_ptr_i = newOwner.i; } - OperationrecPtr lock_owner; - lock_owner.i = operationRecPtr.p->prevParallelQue; - ptrCheckGuard(lock_owner, coprecsize, operationrec); - Uint32 transid[2] = { lock_owner.p->transId1, - lock_owner.p->transId2 }; + newOwner.p->m_lo_last_serial_op_ptr_i = lastS; + newOwner.p->nextSerialQue = nextS.i; + if (nextS.i != RNIL) + { + jam(); + ptrCheckGuard(nextS, coprecsize, operationrec); + ndbassert(nextS.p->prevSerialQue == opPtr.i); + nextS.p->prevSerialQue = newOwner.i; + } - while(lock_owner.p->prevParallelQue != RNIL) + if (commit) { - lock_owner.i = lock_owner.p->prevParallelQue; - ptrCheckGuard(lock_owner, coprecsize, operationrec); - - if(lock_owner.p->transId1 != transid[0] || - lock_owner.p->transId2 != transid[1]) + if ((opbits & Operationrec::OP_ACC_LOCK_MODE) == ZREADLOCK) { jam(); /** - * If more than 1 trans in lock queue -> no lock upgrade + * Lock owner...committing a shared operation... + * this can be a lock upgrade + * + * e.g + * T1(R) T2(R) + * T2(X) + * + * If T1(R) commits T2(X) is supposed to run + * + * e.g + * T1(X) T1(R) + * T2(R) + * + * If T1(X) commits, then T1(R) _should_ commit before T2(R) is + * allowed to proceed */ - return; + action = CHECK_LOCK_UPGRADE; + } + else + { + jam(); + newOwner.p->m_op_bits |= Operationrec::OP_LOCK_MODE; } } - - check_lock_upgrade(signal, lock_owner, operationRecPtr); - } -}//Dbacc::commitOperation() + else + { + /** + * Aborting an operation can *always* lead to lock upgrade + */ + action = CHECK_LOCK_UPGRADE; -void -Dbacc::check_lock_upgrade(Signal* signal, - OperationrecPtr lock_owner, - OperationrecPtr release_op) -{ - if((lock_owner.p->transId1 == release_op.p->transId1 && - lock_owner.p->transId2 == release_op.p->transId2) || - release_op.p->lockMode || - lock_owner.p->nextSerialQue == RNIL) + /** + * Update ACC_LOCK_MODE + */ + if (opbits & Operationrec::OP_LOCK_MODE) + { + Uint32 nextbits = nextP.p->m_op_bits; + while ((nextbits & Operationrec::OP_LOCK_MODE) == 0) + { + ndbassert(nextbits & Operationrec::OP_ACC_LOCK_MODE); + nextbits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE; + nextP.p->m_op_bits = nextbits; + + if (nextP.p->nextParallelQue != RNIL) + { + nextP.i = nextP.p->nextParallelQue; + ptrCheckGuard(nextP, coprecsize, operationrec); + nextbits = nextP.p->m_op_bits; + } + else + { + break; + } + } + } + } + } + else { jam(); - /** - * No lock upgrade if same trans or lock owner has no serial queue - * or releasing non shared op - */ - return; - } + ptrCheckGuard(nextS, coprecsize, operationrec); + newOwner = nextS; + + newOwner.p->m_op_bits |= Operationrec::OP_RUN_QUEUE; + + if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) + { + report_dealloc(signal, opPtr.p); + newOwner.p->localdata[0] = ~0; + } + else + { + jam(); + newOwner.p->localdata[0] = opPtr.p->localdata[0]; + newOwner.p->localdata[1] = opPtr.p->localdata[1]; + } + + lastP = newOwner; + while (lastP.p->nextParallelQue != RNIL) + { + lastP.i = lastP.p->nextParallelQue; + ptrCheckGuard(lastP, coprecsize, operationrec); + lastP.p->m_op_bits |= Operationrec::OP_RUN_QUEUE; + } + + if (newOwner.i != lastP.i) + { + jam(); + newOwner.p->m_lo_last_parallel_op_ptr_i = lastP.i; + } + else + { + jam(); + newOwner.p->m_lo_last_parallel_op_ptr_i = RNIL; + } - OperationrecPtr next; - next.i = lock_owner.p->nextSerialQue; - ptrCheckGuard(next, coprecsize, operationrec); + if (newOwner.i != lastS) + { + jam(); + newOwner.p->m_lo_last_serial_op_ptr_i = lastS; + } + else + { + jam(); + newOwner.p->m_lo_last_serial_op_ptr_i = RNIL; + } + + action = START_NEW; + } - if(lock_owner.p->transId1 != next.p->transId1 || - lock_owner.p->transId2 != next.p->transId2) + insertLockOwnersList(signal, newOwner); + + /** + * Copy op info, and store op in element + * + */ { - jam(); - /** - * No lock upgrad if !same trans in serial queue - */ - return; + newOwner.p->elementPage = opPtr.p->elementPage; + newOwner.p->elementIsforward = opPtr.p->elementIsforward; + newOwner.p->elementPointer = opPtr.p->elementPointer; + newOwner.p->elementContainer = opPtr.p->elementContainer; + newOwner.p->scanBits = opPtr.p->scanBits; + newOwner.p->hashvaluePart = opPtr.p->hashvaluePart; + newOwner.p->m_op_bits |= (opbits & Operationrec::OP_ELEMENT_DISAPPEARED); + if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) + { + /* ------------------------------------------------------------------- */ + // If the elementIsDisappeared is set then we know that the + // hashValue is also set since it always originates from a + // committing abort or a aborting insert. + // Scans do not initialise the hashValue and must have this + // value initialised if they are + // to successfully commit the delete. + /* ------------------------------------------------------------------- */ + jam(); + newOwner.p->hashValue = opPtr.p->hashValue; + }//if + + Page8Ptr pagePtr; + pagePtr.i = newOwner.p->elementPage; + ptrCheckGuard(pagePtr, cpagesize, page8); + const Uint32 tmp = ElementHeader::setLocked(newOwner.i); + arrGuard(newOwner.p->elementPointer, 2048); + pagePtr.p->word32[newOwner.p->elementPointer] = tmp; } - if (getNoParallelTransaction(lock_owner.p) > 1) - { - jam(); - /** - * No lock upgrade if more than 1 transaction in parallell queue - */ + switch(action){ + case NOTHING: + validate_lock_queue(newOwner); return; + case START_NEW: + startNew(signal, newOwner); + validate_lock_queue(newOwner); + return; + case CHECK_LOCK_UPGRADE: + startNext(signal, lastP); + validate_lock_queue(lastP); + break; } + +} - if (getNoParallelTransaction(next.p) > 1) +void +Dbacc::startNew(Signal* signal, OperationrecPtr newOwner) +{ + OperationrecPtr save = operationRecPtr; + operationRecPtr = newOwner; + + Uint32 opbits = newOwner.p->m_op_bits; + Uint32 op = opbits & Operationrec::OP_MASK; + Uint32 opstate = (opbits & Operationrec::OP_STATE_MASK); + ndbassert(opstate == Operationrec::OP_STATE_WAITING); + ndbassert(opbits & Operationrec::OP_LOCK_OWNER); + const bool deleted = opbits & Operationrec::OP_ELEMENT_DISAPPEARED; + Uint32 errCode = 0; + + opbits &= opbits & ~(Uint32)Operationrec::OP_STATE_MASK; + opbits |= Operationrec::OP_STATE_RUNNING; + + if (op == ZSCAN_OP && (opbits & Operationrec::OP_LOCK_REQ) == 0) + goto scan; + + if (deleted) { jam(); - /** - * No lock upgrade if more than 1 transaction in next's parallell queue - */ - return; + if (op != ZINSERT && op != ZWRITE) + { + errCode = ZREAD_ERROR; + goto ref; + } + + opbits &= ~(Uint32)Operationrec::OP_MASK; + opbits &= ~(Uint32)Operationrec::OP_ELEMENT_DISAPPEARED; + opbits |= (op = ZINSERT); + opbits |= Operationrec::OP_INSERT_IS_DONE; + goto conf; } - - OperationrecPtr tmp; - tmp.i = lock_owner.p->nextSerialQue = next.p->nextSerialQue; - if(tmp.i != RNIL) + else if (op == ZINSERT) { - ptrCheckGuard(tmp, coprecsize, operationrec); - ndbassert(tmp.p->prevSerialQue == next.i); - tmp.p->prevSerialQue = lock_owner.i; + jam(); + errCode = ZWRITE_ERROR; + goto ref; } - next.p->nextSerialQue = next.p->prevSerialQue = RNIL; - - // Find end of parallell que - tmp = lock_owner; - Uint32 lockMode = next.p->lockMode > lock_owner.p->lockMode ? - next.p->lockMode : lock_owner.p->lockMode; - while(tmp.p->nextParallelQue != RNIL) + else if (op == ZWRITE) { jam(); - tmp.i = tmp.p->nextParallelQue; - tmp.p->lockMode = lockMode; - ptrCheckGuard(tmp, coprecsize, operationrec); + opbits &= ~(Uint32)Operationrec::OP_MASK; + opbits |= (op = ZUPDATE); + goto conf; } - tmp.p->lockMode = lockMode; + +conf: + newOwner.p->m_op_bits = opbits; + + sendAcckeyconf(signal); + sendSignal(newOwner.p->userblockref, GSN_ACCKEYCONF, + signal, 6, JBA); + + operationRecPtr = save; + return; - next.p->prevParallelQue = tmp.i; - tmp.p->nextParallelQue = next.i; +scan: + jam(); + newOwner.p->m_op_bits = opbits; - OperationrecPtr save = operationRecPtr; + takeOutScanLockQueue(newOwner.p->scanRecPtr); + putReadyScanQueue(signal, newOwner.p->scanRecPtr); - Uint32 localdata[2]; - localdata[0] = lock_owner.p->localdata[0]; - localdata[1] = lock_owner.p->localdata[1]; - do { - next.p->localdata[0] = localdata[0]; - next.p->localdata[1] = localdata[1]; - next.p->lockMode = lockMode; - - operationRecPtr = next; - executeNextOperation(signal); - if (next.p->nextParallelQue != RNIL) - { - jam(); - next.i = next.p->nextParallelQue; - ptrCheckGuard(next, coprecsize, operationrec); - } else { - jam(); - break; - }//if - } while (1); - operationRecPtr = save; + return; -} - -/* ------------------------------------------------------------------------- */ -/* RELEASELOCK */ -/* RESETS LOCK OF AN ELEMENT. */ -/* INFORMATION ABOUT THE ELEMENT IS SAVED IN THE OPERATION RECORD */ -/* THESE INFORMATION IS USED TO UPDATE HEADER OF THE ELEMENT */ -/* ------------------------------------------------------------------------- */ -void Dbacc::releaselock(Signal* signal) -{ - OperationrecPtr rloOperPtr; - OperationrecPtr trlOperPtr; - OperationrecPtr trlTmpOperPtr; - Uint32 TelementIsDisappeared; - - trlOperPtr.i = RNIL; - if (operationRecPtr.p->nextParallelQue != RNIL) { - jam(); - /** --------------------------------------------------------------------- - * NEXT OPERATION TAKES OVER THE LOCK. - * We will simply move the info from the leader - * to the new queue leader. - * -------------------------------------------------------------------- */ - trlOperPtr.i = operationRecPtr.p->nextParallelQue; - ptrCheckGuard(trlOperPtr, coprecsize, operationrec); - copyInOperPtr = trlOperPtr; - copyOperPtr = operationRecPtr; - copyOpInfo(signal); - trlOperPtr.p->prevParallelQue = RNIL; - if (operationRecPtr.p->nextSerialQue != RNIL) { - jam(); - /* ----------------------------------------------------------------- - * THERE IS A SERIAL QUEUE. MOVE IT FROM RELEASED OP REC TO THE - * NEW LOCK OWNER. - * ------------------------------------------------------------------ */ - trlOperPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue; - trlTmpOperPtr.i = trlOperPtr.p->nextSerialQue; - ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec); - trlTmpOperPtr.p->prevSerialQue = trlOperPtr.i; - }//if - - check_lock_upgrade(signal, copyInOperPtr, operationRecPtr); - } else { - ndbrequire(operationRecPtr.p->nextSerialQue != RNIL); - jam(); - /** --------------------------------------------------------------------- - * THE PARALLEL QUEUE IS EMPTY AND THE SERIAL QUEUE IS NOT EMPTY. - * WE NEED TO REARRANGE LISTS AND START A NUMBER OF OPERATIONS. - * -------------------------------------------------------------------- */ - trlOperPtr.i = operationRecPtr.p->nextSerialQue; - ptrCheckGuard(trlOperPtr, coprecsize, operationrec); - copyOperPtr = operationRecPtr; - copyInOperPtr = trlOperPtr; - copyOpInfo(signal); - trlOperPtr.p->prevSerialQue = RNIL; - ndbrequire(trlOperPtr.p->prevParallelQue == RNIL); - /* --------------------------------------------------------------------- */ - /* WE HAVE MOVED TO THE NEXT PARALLEL QUEUE. WE MUST START ALL OF THOSE */ - /* OPERATIONS WHICH UP TILL NOW HAVE BEEN QUEUED WAITING FOR THE LOCK. */ - /* --------------------------------------------------------------------- */ - rloOperPtr = operationRecPtr; - trlTmpOperPtr = trlOperPtr; - TelementIsDisappeared = trlOperPtr.p->elementIsDisappeared; - Uint32 ThashValue = trlOperPtr.p->hashValue; - do { - /* ------------------------------------------------------------------ */ - // Ensure that all operations in the queue are assigned with the - // elementIsDisappeared to ensure that the element is removed after - // a previous delete. An insert does however revert this decision - // since the element is put back again. - // Local checkpoints complicate life here since they do not - // execute the next operation but simply change - // the state on the operation. - // We need to set-up the variable elementIsDisappeared - // properly even when local checkpoints and inserts/writes after - // deletes occur. - /* ------------------------------------------------------------------- */ - trlTmpOperPtr.p->elementIsDisappeared = TelementIsDisappeared; - if (TelementIsDisappeared == ZTRUE) { - /* ----------------------------------------------------------------- */ - // If the elementIsDisappeared is set then we know that the - // hashValue is also set since it always originates from a - // committing abort or a aborting insert. - // Scans do not initialise the hashValue and must have this - // value initialised if they are to successfully commit the delete. - /* ----------------------------------------------------------------- */ - jam(); - trlTmpOperPtr.p->hashValue = ThashValue; - }//if - trlTmpOperPtr.p->localdata[0] = trlOperPtr.p->localdata[0]; - trlTmpOperPtr.p->localdata[1] = trlOperPtr.p->localdata[1]; - /* ------------------------------------------------------------------- */ - // Restart the queued operation. - /* ------------------------------------------------------------------- */ - operationRecPtr = trlTmpOperPtr; - TelementIsDisappeared = executeNextOperation(signal); - ThashValue = operationRecPtr.p->hashValue; - if (trlTmpOperPtr.p->nextParallelQue != RNIL) { - jam(); - /* ----------------------------------------------------------------- */ - // We will continue with the next operation in the parallel - // queue and start this as well. - /* ----------------------------------------------------------------- */ - trlTmpOperPtr.i = trlTmpOperPtr.p->nextParallelQue; - ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec); - } else { - jam(); - break; - }//if - } while (1); - operationRecPtr = rloOperPtr; - }//if - - // Insert the next op into the lock owner list - insertLockOwnersList(signal, trlOperPtr); +ref: + newOwner.p->m_op_bits = opbits; + + signal->theData[0] = newOwner.p->userptr; + signal->theData[1] = errCode; + sendSignal(newOwner.p->userblockref, GSN_ACCKEYREF, signal, + 2, JBB); + + operationRecPtr = save; return; -}//Dbacc::releaselock() - -/* --------------------------------------------------------------------------------- */ -/* COPY_OP_INFO */ -/* INPUT: COPY_IN_OPER_PTR AND COPY_OPER_PTR. */ -/* DESCRIPTION:INFORMATION ABOUT THE ELEMENT WILL BE MOVED FROM OPERATION */ -/* REC TO QUEUE OP REC. QUE OP REC TAKES OVER THE LOCK. */ -/* --------------------------------------------------------------------------------- */ -void Dbacc::copyOpInfo(Signal* signal) -{ - Page8Ptr coiPageidptr; - - copyInOperPtr.p->elementPage = copyOperPtr.p->elementPage; - copyInOperPtr.p->elementIsforward = copyOperPtr.p->elementIsforward; - copyInOperPtr.p->elementContainer = copyOperPtr.p->elementContainer; - copyInOperPtr.p->elementPointer = copyOperPtr.p->elementPointer; - copyInOperPtr.p->scanBits = copyOperPtr.p->scanBits; - copyInOperPtr.p->hashvaluePart = copyOperPtr.p->hashvaluePart; - copyInOperPtr.p->elementIsDisappeared = copyOperPtr.p->elementIsDisappeared; - if (copyInOperPtr.p->elementIsDisappeared == ZTRUE) { - /* --------------------------------------------------------------------------------- */ - // If the elementIsDisappeared is set then we know that the hashValue is also set - // since it always originates from a committing abort or a aborting insert. Scans - // do not initialise the hashValue and must have this value initialised if they are - // to successfully commit the delete. - /* --------------------------------------------------------------------------------- */ - jam(); - copyInOperPtr.p->hashValue = copyOperPtr.p->hashValue; - }//if - coiPageidptr.i = copyOperPtr.p->elementPage; - ptrCheckGuard(coiPageidptr, cpagesize, page8); - const Uint32 tmp = ElementHeader::setLocked(copyInOperPtr.i); - dbgWord32(coiPageidptr, copyOperPtr.p->elementPointer, tmp); - arrGuard(copyOperPtr.p->elementPointer, 2048); - coiPageidptr.p->word32[copyOperPtr.p->elementPointer] = tmp; - copyInOperPtr.p->localdata[0] = copyOperPtr.p->localdata[0]; - copyInOperPtr.p->localdata[1] = copyOperPtr.p->localdata[1]; -}//Dbacc::copyOpInfo() - -/* ******************--------------------------------------------------------------- */ -/* EXECUTE NEXT OPERATION */ -/* NEXT OPERATION IN A LOCK QUEUE WILL BE EXECUTED. */ -/* --------------------------------------------------------------------------------- */ -Uint32 Dbacc::executeNextOperation(Signal* signal) -{ - ndbrequire(operationRecPtr.p->transactionstate == ACTIVE); - if (operationRecPtr.p->elementIsDisappeared == ZTRUE) { - /* --------------------------------------------------------------------- */ - /* PREVIOUS OPERATION WAS DELETE OPERATION AND THE ELEMENT IS DELETED. */ - /* --------------------------------------------------------------------- */ - if (((operationRecPtr.p->operation != ZINSERT) && - (operationRecPtr.p->operation != ZWRITE)) || - (operationRecPtr.p->prevParallelQue != RNIL)) { - if (operationRecPtr.p->operation != ZSCAN_OP || - operationRecPtr.p->isAccLockReq) { - jam(); - /* ----------------------------------------------------------------- */ - // Updates and reads with a previous delete simply aborts with read - // error indicating that tuple did not exist. - // Also inserts and writes not being the first operation. - /* ----------------------------------------------------------------- */ - operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT; - signal->theData[0] = operationRecPtr.p->userptr; - signal->theData[1] = ZREAD_ERROR; - sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal, - 2, JBB); - return operationRecPtr.p->elementIsDisappeared; - } else { - /* ----------------------------------------------------------------- */ - /* ABORT OF OPERATION NEEDED BUT THE OPERATION IS A - * SCAN => SPECIAL TREATMENT. - * IF THE SCAN WAITS IN QUEUE THEN WE MUST REMOVE THE OPERATION - * FROM THE SCAN LOCK QUEUE AND IF NO MORE OPERATIONS ARE QUEUED - * THEN WE SHOULD RESTART THE SCAN PROCESS. OTHERWISE WE SIMPLY - * RELEASE THE OPERATION AND DECREASE THE NUMBER OF LOCKS HELD. - * ----------------------------------------------------------------- */ - takeOutScanLockQueue(operationRecPtr.p->scanRecPtr); - putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr); - return operationRecPtr.p->elementIsDisappeared; - }//if - }//if - /* --------------------------------------------------------------------- */ - // Insert and writes can continue but need to be converted to inserts. - /* --------------------------------------------------------------------- */ - jam(); - operationRecPtr.p->elementIsDisappeared = ZFALSE; - operationRecPtr.p->operation = ZINSERT; - operationRecPtr.p->insertIsDone = ZTRUE; - } else if (operationRecPtr.p->operation == ZINSERT) { - bool abortFlag = true; - if (operationRecPtr.p->prevParallelQue != RNIL) { - OperationrecPtr prevOpPtr; - jam(); - prevOpPtr.i = operationRecPtr.p->prevParallelQue; - ptrCheckGuard(prevOpPtr, coprecsize, operationrec); - if (prevOpPtr.p->operation == ZDELETE) { - jam(); - abortFlag = false; - }//if - }//if - if (abortFlag) { - jam(); - /* ------------------------------------------------------------------- */ - /* ELEMENT STILL REMAINS AND WE ARE TRYING TO INSERT IT AGAIN. */ - /* THIS IS CLEARLY NOT A GOOD IDEA. */ - /* ------------------------------------------------------------------- */ - operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT; - signal->theData[0] = operationRecPtr.p->userptr; - signal->theData[1] = ZWRITE_ERROR; - sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal, - 2, JBB); - return operationRecPtr.p->elementIsDisappeared; - }//if - } - else if(operationRecPtr.p->operation == ZWRITE) - { - jam(); - operationRecPtr.p->operation = ZUPDATE; - if (operationRecPtr.p->prevParallelQue != RNIL) { - OperationrecPtr prevOpPtr; - jam(); - prevOpPtr.i = operationRecPtr.p->prevParallelQue; - ptrCheckGuard(prevOpPtr, coprecsize, operationrec); - if (prevOpPtr.p->operation == ZDELETE) - { - jam(); - operationRecPtr.p->operation = ZINSERT; - } - } - } - - if (operationRecPtr.p->operation == ZSCAN_OP && - ! operationRecPtr.p->isAccLockReq) { - jam(); - takeOutScanLockQueue(operationRecPtr.p->scanRecPtr); - putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr); - } else { - jam(); - sendAcckeyconf(signal); - sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYCONF, - signal, 6, JBB); - }//if - return operationRecPtr.p->elementIsDisappeared; -}//Dbacc::executeNextOperation() +} /** * takeOutLockOwnersList @@ -3950,7 +4845,6 @@ void Dbacc::takeOutLockOwnersList(Signal* signal, { const Uint32 Tprev = outOperPtr.p->prevLockOwnerOp; const Uint32 Tnext = outOperPtr.p->nextLockOwnerOp; - #ifdef VM_TRACE // Check that operation is already in the list OperationrecPtr tmpOperPtr; @@ -3965,8 +4859,7 @@ void Dbacc::takeOutLockOwnersList(Signal* signal, ndbrequire(inList == true); #endif - ndbrequire(outOperPtr.p->lockOwner == ZTRUE); - outOperPtr.p->lockOwner = ZFALSE; + ndbassert(outOperPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); // Fast path through the code for the common case. if ((Tprev == RNIL) && (Tnext == RNIL)) { @@ -4007,7 +4900,6 @@ void Dbacc::insertLockOwnersList(Signal* signal, const OperationrecPtr& insOperPtr) { OperationrecPtr tmpOperPtr; - #ifdef VM_TRACE // Check that operation is not already in list tmpOperPtr.i = fragrecptr.p->lockOwnersList; @@ -4017,12 +4909,12 @@ void Dbacc::insertLockOwnersList(Signal* signal, tmpOperPtr.i = tmpOperPtr.p->nextLockOwnerOp; } #endif + tmpOperPtr.i = fragrecptr.p->lockOwnersList; + + ndbrequire(! (insOperPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER)); - ndbrequire(insOperPtr.p->lockOwner == ZFALSE); - - insOperPtr.p->lockOwner = ZTRUE; + insOperPtr.p->m_op_bits |= Operationrec::OP_LOCK_OWNER; insOperPtr.p->prevLockOwnerOp = RNIL; - tmpOperPtr.i = fragrecptr.p->lockOwnersList; insOperPtr.p->nextLockOwnerOp = tmpOperPtr.i; fragrecptr.p->lockOwnersList = insOperPtr.i; @@ -5385,6 +6277,7 @@ void Dbacc::execNEXT_SCANREQ(Signal* signal) if (!scanPtr.p->scanReadCommittedFlag) { commitOperation(signal); }//if + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; takeOutActiveScanOp(signal); releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; @@ -5400,9 +6293,10 @@ void Dbacc::execNEXT_SCANREQ(Signal* signal) jam(); fragrecptr.i = scanPtr.p->activeLocalFrag; ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); - /* --------------------------------------------------------------------------------- */ - /* THE SCAN PROCESS IS FINISHED. RELOCK ALL LOCKED EL. RELESE ALL INVOLVED REC. */ - /* --------------------------------------------------------------------------------- */ + /* --------------------------------------------------------------------- + * THE SCAN PROCESS IS FINISHED. RELOCK ALL LOCKED EL. + * RELESE ALL INVOLVED REC. + * ------------------------------------------------------------------- */ releaseScanLab(signal); return; break; @@ -5452,24 +6346,26 @@ void Dbacc::checkNextBucketLab(Signal* signal) scanPtr.p->nextBucketIndex++; if (scanPtr.p->scanBucketState == ScanRec::SECOND_LAP) { if (scanPtr.p->nextBucketIndex > scanPtr.p->maxBucketIndexToRescan) { - /* --------------------------------------------------------------------------------- */ - // We have finished the rescan phase. We are ready to proceed with the next fragment part. - /* --------------------------------------------------------------------------------- */ + /* ---------------------------------------------------------------- */ + // We have finished the rescan phase. + // We are ready to proceed with the next fragment part. + /* ---------------------------------------------------------------- */ jam(); checkNextFragmentLab(signal); return; }//if } else if (scanPtr.p->scanBucketState == ScanRec::FIRST_LAP) { if ((fragrecptr.p->p + fragrecptr.p->maxp) < scanPtr.p->nextBucketIndex) { - /* --------------------------------------------------------------------------------- */ + /* ---------------------------------------------------------------- */ // All buckets have been scanned a first time. - /* --------------------------------------------------------------------------------- */ + /* ---------------------------------------------------------------- */ if (scanPtr.p->minBucketIndexToRescan == 0xFFFFFFFF) { jam(); - /* --------------------------------------------------------------------------------- */ - // We have not had any merges behind the scan. Thus it is not necessary to perform - // any rescan any buckets and we can proceed immediately with the next fragment part. - /* --------------------------------------------------------------------------------- */ + /* -------------------------------------------------------------- */ + // We have not had any merges behind the scan. + // Thus it is not necessary to perform any rescan any buckets + // and we can proceed immediately with the next fragment part. + /* --------------------------------------------------------------- */ checkNextFragmentLab(signal); return; } else { @@ -5561,18 +6457,23 @@ void Dbacc::checkNextBucketLab(Signal* signal) tslElementptr = tnsElementptr; setlock(signal); insertLockOwnersList(signal, operationRecPtr); + operationRecPtr.p->m_op_bits |= + Operationrec::OP_STATE_RUNNING | Operationrec::OP_RUN_QUEUE; }//if } else { arrGuard(tnsElementptr, 2048); queOperPtr.i = ElementHeader::getOpPtrI(nsPageptr.p->word32[tnsElementptr]); ptrCheckGuard(queOperPtr, coprecsize, operationrec); - if (queOperPtr.p->elementIsDisappeared == ZTRUE) { + if (queOperPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED || + queOperPtr.p->localdata[0] == ~(Uint32)0) + { jam(); - /* --------------------------------------------------------------------------------- */ - // If the lock owner indicates the element is disappeared then we will not report this - // tuple. We will continue with the next tuple. - /* --------------------------------------------------------------------------------- */ + /* ------------------------------------------------------------------ */ + // If the lock owner indicates the element is disappeared then + // we will not report this tuple. We will continue with the next tuple. + /* ------------------------------------------------------------------ */ + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; signal->theData[0] = scanPtr.i; @@ -5584,31 +6485,29 @@ void Dbacc::checkNextBucketLab(Signal* signal) Uint32 return_result; if (scanPtr.p->scanLockMode == ZREADLOCK) { jam(); - priPageptr = nsPageptr; - tpriElementptr = tnsElementptr; - return_result = placeReadInLockQueue(signal); + return_result = placeReadInLockQueue(queOperPtr); } else { jam(); - pwiPageptr = nsPageptr; - tpwiElementptr = tnsElementptr; - return_result = placeWriteInLockQueue(signal); + return_result = placeWriteInLockQueue(queOperPtr); }//if if (return_result == ZSERIAL_QUEUE) { - /* --------------------------------------------------------------------------------- */ - /* WE PLACED THE OPERATION INTO A SERIAL QUEUE AND THUS WE HAVE TO WAIT FOR */ - /* THE LOCK TO BE RELEASED. WE CONTINUE WITH THE NEXT ELEMENT. */ - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------- + * WE PLACED THE OPERATION INTO A SERIAL QUEUE AND THUS WE HAVE TO + * WAIT FOR THE LOCK TO BE RELEASED. WE CONTINUE WITH THE NEXT ELEMENT + * ----------------------------------------------------------------- */ putOpScanLockQue(); /* PUT THE OP IN A QUE IN THE SCAN REC */ signal->theData[0] = scanPtr.i; signal->theData[1] = AccCheckScan::ZCHECK_LCP_STOP; sendSignal(cownBlockref, GSN_ACC_CHECK_SCAN, signal, 2, JBB); return; - } else if (return_result == ZWRITE_ERROR) { + } else if (return_result != ZPARALLEL_QUEUE) { jam(); - /* --------------------------------------------------------------------------------- */ - // The tuple is either not committed yet or a delete in the same transaction (not - // possible here since we are a scan). Thus we simply continue with the next tuple. - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------- */ + // The tuple is either not committed yet or a delete in + // the same transaction (not possible here since we are a scan). + // Thus we simply continue with the next tuple. + /* ----------------------------------------------------------------- */ + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; signal->theData[0] = scanPtr.i; @@ -5619,10 +6518,11 @@ void Dbacc::checkNextBucketLab(Signal* signal) ndbassert(return_result == ZPARALLEL_QUEUE); }//if }//if - /* --------------------------------------------------------------------------------- */ - // Committed read proceed without caring for locks immediately down here except when - // the tuple was deleted permanently and no new operation has inserted it again. - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------------- */ + // Committed read proceed without caring for locks immediately + // down here except when the tuple was deleted permanently + // and no new operation has inserted it again. + /* ----------------------------------------------------------------------- */ putActiveScanOp(signal); sendNextScanConf(signal); return; @@ -5646,14 +6546,15 @@ void Dbacc::initScanFragmentPart(Signal* signal) DirRangePtr cnfDirRangePtr; DirectoryarrayPtr cnfDirptr; Page8Ptr cnfPageidptr; - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------------- */ // Set the active fragment part. // Set the current bucket scanned to the first. // Start with the first lap. // Remember the number of buckets at start of the scan. - // Set the minimum and maximum to values that will always be smaller and larger than. + // Set the minimum and maximum to values that will always be smaller and + // larger than. // Reset the scan indicator on the first bucket. - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------------- */ scanPtr.p->activeLocalFrag = fragrecptr.i; scanPtr.p->nextBucketIndex = 0; /* INDEX OF SCAN BUCKET */ scanPtr.p->scanBucketState = ScanRec::FIRST_LAP; @@ -5672,11 +6573,11 @@ void Dbacc::initScanFragmentPart(Signal* signal) releaseScanBucket(signal); }//Dbacc::initScanFragmentPart() -/* --------------------------------------------------------------------------------- */ -/* FLAG = 6 = ZCOPY_CLOSE THE SCAN PROCESS IS READY OR ABORTED. ALL OPERATION IN THE */ -/* ACTIVE OR WAIT QUEUE ARE RELEASED, SCAN FLAG OF ROOT FRAG IS RESET AND THE SCAN */ -/* RECORD IS RELEASED. */ -/* --------------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- + * FLAG = 6 = ZCOPY_CLOSE THE SCAN PROCESS IS READY OR ABORTED. + * ALL OPERATION IN THE ACTIVE OR WAIT QUEUE ARE RELEASED, + * SCAN FLAG OF ROOT FRAG IS RESET AND THE SCAN RECORD IS RELEASED. + * ------------------------------------------------------------------------ */ void Dbacc::releaseScanLab(Signal* signal) { releaseAndCommitActiveOps(signal); @@ -5715,8 +6616,17 @@ void Dbacc::releaseAndCommitActiveOps(Signal* signal) ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); if (!scanPtr.p->scanReadCommittedFlag) { jam(); - commitOperation(signal); + if ((operationRecPtr.p->m_op_bits & Operationrec::OP_STATE_MASK) == + Operationrec::OP_STATE_EXECUTED) + { + commitOperation(signal); + } + else + { + abortOperation(signal); + } }//if + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; takeOutActiveScanOp(signal); releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; @@ -5737,8 +6647,17 @@ void Dbacc::releaseAndCommitQueuedOps(Signal* signal) ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); if (!scanPtr.p->scanReadCommittedFlag) { jam(); - commitOperation(signal); + if ((operationRecPtr.p->m_op_bits & Operationrec::OP_STATE_MASK) == + Operationrec::OP_STATE_EXECUTED) + { + commitOperation(signal); + } + else + { + abortOperation(signal); + } }//if + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; takeOutReadyScanQueue(signal); releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; @@ -5761,6 +6680,7 @@ void Dbacc::releaseAndAbortLockedOps(Signal* signal) { abortOperation(signal); }//if takeOutScanLockQueue(scanPtr.i); + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; operationRecPtr.i = trsoOperPtr.i; @@ -5795,9 +6715,11 @@ void Dbacc::execACC_CHECK_SCAN(Signal* signal) takeOutReadyScanQueue(signal); fragrecptr.i = operationRecPtr.p->fragptr; ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); - if (operationRecPtr.p->elementIsDisappeared == ZTRUE) { + if (operationRecPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED) + { jam(); abortOperation(signal); + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; continue; @@ -5884,7 +6806,8 @@ void Dbacc::execACC_TO_REQ(Signal* signal) jamEntry(); tatrOpPtr.i = signal->theData[1]; /* OPER PTR OF ACC */ ptrCheckGuard(tatrOpPtr, coprecsize, operationrec); - if (tatrOpPtr.p->operation == ZSCAN_OP) { + if ((tatrOpPtr.p->m_op_bits & Operationrec::OP_MASK) == ZSCAN_OP) + { tatrOpPtr.p->transId1 = signal->theData[2]; tatrOpPtr.p->transId2 = signal->theData[3]; } else { @@ -5981,29 +6904,28 @@ void Dbacc::initScanOpRec(Signal* signal) scanPtr.p->scanOpsAllocated++; + Uint32 opbits = 0; + opbits |= ZSCAN_OP; + opbits |= scanPtr.p->scanLockMode ? Operationrec::OP_LOCK_MODE : 0; + opbits |= scanPtr.p->scanLockMode ? Operationrec::OP_ACC_LOCK_MODE : 0; + opbits |= scanPtr.p->scanReadCommittedFlag ? + Operationrec::OP_EXECUTED_DIRTY_READ : 0; + opbits |= Operationrec::OP_COMMIT_DELETE_CHECK; operationRecPtr.p->userptr = RNIL; operationRecPtr.p->scanRecPtr = scanPtr.i; - operationRecPtr.p->operation = ZSCAN_OP; - operationRecPtr.p->transactionstate = ACTIVE; - operationRecPtr.p->commitDeleteCheckFlag = ZFALSE; - operationRecPtr.p->lockMode = scanPtr.p->scanLockMode; operationRecPtr.p->fid = fragrecptr.p->myfid; operationRecPtr.p->fragptr = fragrecptr.i; - operationRecPtr.p->elementIsDisappeared = ZFALSE; operationRecPtr.p->nextParallelQue = RNIL; operationRecPtr.p->prevParallelQue = RNIL; operationRecPtr.p->nextSerialQue = RNIL; operationRecPtr.p->prevSerialQue = RNIL; operationRecPtr.p->transId1 = scanPtr.p->scanTrid1; operationRecPtr.p->transId2 = scanPtr.p->scanTrid2; - operationRecPtr.p->lockOwner = ZFALSE; - operationRecPtr.p->dirtyRead = 0; - operationRecPtr.p->nodeType = 0; // Not a stand-by node operationRecPtr.p->elementIsforward = tisoIsforward; operationRecPtr.p->elementContainer = tisoContainerptr; operationRecPtr.p->elementPointer = tisoElementptr; operationRecPtr.p->elementPage = isoPageptr.i; - operationRecPtr.p->isAccLockReq = ZFALSE; + operationRecPtr.p->m_op_bits = opbits; tisoLocalPtr = tisoElementptr + tisoIsforward; guard24 = fragrecptr.p->localkeylen - 1; for (tisoTmp = 0; tisoTmp <= guard24; tisoTmp++) { @@ -6868,14 +7790,12 @@ void Dbacc::releaseOpRec(Signal* signal) } ndbrequire(opInList == false); #endif - ndbrequire(operationRecPtr.p->lockOwner == ZFALSE); + ndbrequire(operationRecPtr.p->m_op_bits == Operationrec::OP_INITIAL); operationRecPtr.p->nextOp = cfreeopRec; cfreeopRec = operationRecPtr.i; /* UPDATE FREE LIST OF OP RECORDS */ operationRecPtr.p->prevOp = RNIL; - operationRecPtr.p->opState = FREE_OP; - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; }//Dbacc::releaseOpRec() /* --------------------------------------------------------------------------------- */ @@ -7377,8 +8297,8 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal) OperationrecPtr tmpOpPtr; tmpOpPtr.i = recordNo; ptrAss(tmpOpPtr, operationrec); - infoEvent("Dbacc::operationrec[%d]: opState=%d, transid(0x%x, 0x%x)", - tmpOpPtr.i, tmpOpPtr.p->opState, tmpOpPtr.p->transId1, + infoEvent("Dbacc::operationrec[%d]: transid(0x%x, 0x%x)", + tmpOpPtr.i, tmpOpPtr.p->transId1, tmpOpPtr.p->transId2); infoEvent("elementIsforward=%d, elementPage=%d, elementPointer=%d ", tmpOpPtr.p->elementIsforward, tmpOpPtr.p->elementPage, @@ -7386,8 +8306,7 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal) infoEvent("fid=%d, fragptr=%d, hashvaluePart=%d ", tmpOpPtr.p->fid, tmpOpPtr.p->fragptr, tmpOpPtr.p->hashvaluePart); - infoEvent("hashValue=%d, insertDeleteLen=%d", - tmpOpPtr.p->hashValue, tmpOpPtr.p->insertDeleteLen); + infoEvent("hashValue=%d", tmpOpPtr.p->hashValue); infoEvent("nextLockOwnerOp=%d, nextOp=%d, nextParallelQue=%d ", tmpOpPtr.p->nextLockOwnerOp, tmpOpPtr.p->nextOp, tmpOpPtr.p->nextParallelQue); @@ -7398,15 +8317,8 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal) tmpOpPtr.p->prevLockOwnerOp, tmpOpPtr.p->nextParallelQue); infoEvent("prevSerialQue=%d, scanRecPtr=%d", tmpOpPtr.p->prevSerialQue, tmpOpPtr.p->scanRecPtr); - infoEvent("transactionstate=%d, elementIsDisappeared=%d, insertIsDone=%d ", - tmpOpPtr.p->transactionstate, tmpOpPtr.p->elementIsDisappeared, - tmpOpPtr.p->insertIsDone); - infoEvent("lockMode=%d, lockOwner=%d, nodeType=%d ", - tmpOpPtr.p->lockMode, tmpOpPtr.p->lockOwner, - tmpOpPtr.p->nodeType); - infoEvent("operation=%d, opSimple=%d, dirtyRead=%d,scanBits=%d ", - tmpOpPtr.p->operation, tmpOpPtr.p->opSimple, - tmpOpPtr.p->dirtyRead, tmpOpPtr.p->scanBits); + infoEvent("m_op_bits=0x%x, scanBits=%d ", + tmpOpPtr.p->m_op_bits, tmpOpPtr.p->scanBits); return; } diff --git a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp index 407cd8074ab..d228d8ae819 100644 --- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp +++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp @@ -33,8 +33,6 @@ // primary key is stored in TUP #include "../dbtup/Dbtup.hpp" -#include "../dbacc/Dbacc.hpp" - #ifdef DBLQH_C // Constants /* ------------------------------------------------------------------------- */ @@ -2556,8 +2554,19 @@ private: Dbtup* c_tup; Dbacc* c_acc; + + /** + * Read primary key from tup + */ Uint32 readPrimaryKeys(ScanRecord*, TcConnectionrec*, Uint32 * dst); + /** + * Read primary key from operation + */ +public: + Uint32 readPrimaryKeys(Uint32 opPtrI, Uint32 * dst, bool xfrm); +private: + void acckeyconf_tupkeyreq(Signal*, TcConnectionrec*, Fragrecord*, Uint32, Uint32); void acckeyconf_load_diskpage(Signal*,TcConnectionrecPtr,Fragrecord*,Uint32); @@ -2924,6 +2933,11 @@ public: } DLHashTable c_scanTakeOverHash; + +#ifdef ERROR_INSERT + inline bool TRACE_OP_CHECK(const TcConnectionrec* regTcPtr); + void TRACE_OP_DUMP(const TcConnectionrec* regTcPtr, const char * pos); +#endif }; inline @@ -2991,10 +3005,19 @@ Dblqh::accminupdate(Signal* signal, Uint32 opId, const Local_key* key) signal->theData[1] = key->m_page_no << MAX_TUPLES_BITS | key->m_page_idx; c_acc->execACCMINUPDATE(signal); - if (ERROR_INSERTED(5712)) + if (ERROR_INSERTED(5712) || ERROR_INSERTED(5713)) ndbout << " LK: " << *key; regTcPtr.p->m_row_id = *key; } +inline +bool +Dblqh::TRACE_OP_CHECK(const TcConnectionrec* regTcPtr) +{ + return (ERROR_INSERTED(5712) && + (regTcPtr->operation == ZINSERT || + regTcPtr->operation == ZDELETE)) || + ERROR_INSERTED(5713); +} #endif diff --git a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index d031f9a00bf..18e5e6cd585 100644 --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -67,48 +67,70 @@ // seen only when we debug the product #ifdef VM_TRACE #define DEBUG(x) ndbout << "DBLQH: "<< x << endl; +static NdbOut & operator<<(NdbOut& out, Dblqh::TcConnectionrec::TransactionState state){ out << (int)state; return out; } +static NdbOut & operator<<(NdbOut& out, Dblqh::TcConnectionrec::LogWriteState state){ out << (int)state; return out; } +static NdbOut & operator<<(NdbOut& out, Dblqh::TcConnectionrec::ListState state){ out << (int)state; return out; } +static NdbOut & operator<<(NdbOut& out, Dblqh::TcConnectionrec::AbortState state){ out << (int)state; return out; } +static NdbOut & operator<<(NdbOut& out, Dblqh::ScanRecord::ScanState state){ out << (int)state; return out; } +static NdbOut & operator<<(NdbOut& out, Dblqh::LogFileOperationRecord::LfoState state){ out << (int)state; return out; } +static NdbOut & operator<<(NdbOut& out, Dblqh::ScanRecord::ScanType state){ out << (int)state; return out; } +static +NdbOut & +operator<<(NdbOut& out, Operation_t op) +{ + switch(op){ + case ZREAD: out << "READ"; break; + case ZREAD_EX: out << "READ-EX"; break; + case ZINSERT: out << "INSERT"; break; + case ZUPDATE: out << "UPDATE"; break; + case ZDELETE: out << "DELETE"; break; + case ZWRITE: out << "WRITE"; break; + } + return out; +} + #else #define DEBUG(x) #endif @@ -120,7 +142,7 @@ const Uint32 NR_ScanNo = 0; #if defined VM_TRACE || defined ERROR_INSERT || defined NDBD_TRACENR #include -NdbOut * tracenrout = 0; +static NdbOut * tracenrout = 0; static int TRACENR_FLAG = 0; #define TRACENR(x) (* tracenrout) << x #define SET_TRACENR_FLAG TRACENR_FLAG = 1 @@ -132,6 +154,13 @@ static int TRACENR_FLAG = 0; #define CLEAR_TRACENR_FLAG #endif +#ifdef ERROR_INSERT +static NdbOut * traceopout = 0; +#define TRACE_OP(regTcPtr, place) do { if (TRACE_OP_CHECK(regTcPtr)) TRACE_OP_DUMP(regTcPtr, place); } while(0) +#else +#define TRACE_OP(x) {} +#endif + /* ------------------------------------------------------------------------- */ /* ------- SEND SYSTEM ERROR ------- */ /* */ @@ -454,6 +483,10 @@ void Dblqh::execSTTOR(Signal* signal) name = NdbConfig_SignalLogFileName(getOwnNodeId()); tracenrout = new NdbOut(* new FileOutputStream(fopen(name, "w+"))); #endif + +#ifdef ERROR_INSERT + traceopout = &ndbout; +#endif return; break; @@ -2531,14 +2564,15 @@ void Dblqh::execTUPKEYCONF(Signal* signal) case TcConnectionrec::WAIT_ACC_ABORT: case TcConnectionrec::ABORT_QUEUED: jam(); -/* -------------------------------------------------------------------------- */ -/* IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */ -/* -------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */ +/* ------------------------------------------------------------------------- */ break; default: ndbrequire(false); break; }//switch + }//Dblqh::execTUPKEYCONF() /* ************> */ @@ -2560,6 +2594,8 @@ void Dblqh::execTUPKEYREF(Signal* signal) c_fragment_pool.getPtr(regFragptr); fragptr = regFragptr; + TRACE_OP(regTcPtr, "TUPKEYREF"); + if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) { jam(); @@ -2568,7 +2604,12 @@ void Dblqh::execTUPKEYREF(Signal* signal) ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP || regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT); } - + else if (getNodeState().startLevel == NodeState::SL_STARTED) + { + if (terrorCode == 899) + ndbout << "899: " << regTcPtr->m_row_id << endl; + } + switch (tcConnectptr.p->transactionState) { case TcConnectionrec::WAIT_TUP: jam(); @@ -3606,57 +3647,7 @@ void Dblqh::endgettupkeyLab(Signal* signal) regTcPtr->transactionState = TcConnectionrec::WAIT_ATTR; return; }//if -//#define TRACE_LQHKEYREQ -#ifdef TRACE_LQHKEYREQ - { - ndbout << (regTcPtr->operation == ZREAD ? "READ" : - regTcPtr->operation == ZUPDATE ? "UPDATE" : - regTcPtr->operation == ZINSERT ? "INSERT" : - regTcPtr->operation == ZDELETE ? "DELETE" : "") - << "(" << (int)regTcPtr->operation << ")" - << " from=(" << getBlockName(refToBlock(regTcPtr->clientBlockref)) - << ", " << refToNode(regTcPtr->clientBlockref) << ")" - << " table=" << regTcPtr->tableref << " "; - - ndbout << "hash: " << hex << regTcPtr->hashValue << endl; - - ndbout << "key=[" << hex; - Uint32 i; - for(i = 0; iprimKeyLen && i < 4; i++){ - ndbout << hex << regTcPtr->tupkeyData[i] << " "; - } - - DatabufPtr regDatabufptr; - regDatabufptr.i = regTcPtr->firstTupkeybuf; - while(i < regTcPtr->primKeyLen) - { - ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf); - for(Uint32 j = 0; j<4 && iprimKeyLen; j++, i++) - ndbout << hex << regDatabufptr.p->data[j] << " "; - } - ndbout << "]" << endl; - - ndbout << "attr=[" << hex; - for(i = 0; ireclenAiLqhkey && i < 5; i++) - ndbout << hex << regTcPtr->firstAttrinfo[i] << " "; - - AttrbufPtr regAttrinbufptr; - regAttrinbufptr.i= regTcPtr->firstAttrinbuf; - while(i < regTcPtr->totReclenAi) - { - ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf); - Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN]; - ndbrequire(dataLen != 0); - ndbrequire(i + dataLen <= regTcPtr->totReclenAi); - for(Uint32 j= 0; jattrbuf[j] << " "; - - regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT]; - } - - ndbout << "]" << endl; - } -#endif + /* ---------------------------------------------------------------------- */ /* NOW RECEPTION OF LQHKEYREQ IS COMPLETED THE NEXT STEP IS TO START*/ /* PROCESSING THE MESSAGE. IF THE MESSAGE IS TO A STAND-BY NODE */ @@ -3763,6 +3754,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal) /* ----------------------------------------------------------------- */ if (TRACENR_FLAG) { + TRACE_OP(regTcPtr, "RECEIVED"); switch (regTcPtr->operation) { case ZREAD: TRACENR("READ"); break; case ZUPDATE: TRACENR("UPDATE"); break; @@ -3847,6 +3839,9 @@ Dblqh::exec_acckeyreq(Signal* signal, TcConnectionrecPtr regTcPtr) signal->theData[8] = sig2; signal->theData[9] = sig3; signal->theData[10] = sig4; + + TRACE_OP(regTcPtr.p, "ACC"); + if (regTcPtr.p->primKeyLen > 4) { sendKeyinfoAcc(signal, 11); }//if @@ -4133,7 +4128,7 @@ Dblqh::nr_copy_delete_row(Signal* signal, jam(); ndbrequire(rowid == 0); signal->theData[0] = accPtr; - signal->theData[1] = false; + signal->theData[1] = 0; EXECUTE_DIRECT(ref, GSN_ACC_ABORTREQ, signal, 2); jamEntry(); return; @@ -4144,16 +4139,18 @@ Dblqh::nr_copy_delete_row(Signal* signal, */ ndbrequire(regTcPtr.p->m_dealloc == 0); Local_key save = regTcPtr.p->m_row_id; - signal->theData[0] = regTcPtr.p->accConnectrec; + + c_acc->execACCKEY_ORD(signal, accPtr); + signal->theData[0] = accPtr; EXECUTE_DIRECT(ref, GSN_ACC_COMMITREQ, signal, 1); jamEntry(); - + ndbrequire(regTcPtr.p->m_dealloc == 1); int ret = c_tup->nr_delete(signal, regTcPtr.i, fragPtr.p->tupFragptr, ®TcPtr.p->m_row_id, regTcPtr.p->gci); jamEntry(); - + if (ret) { ndbassert(ret == 1); @@ -4167,7 +4164,7 @@ Dblqh::nr_copy_delete_row(Signal* signal, } TRACENR("DELETED: " << regTcPtr.p->m_row_id << endl); - + regTcPtr.p->m_dealloc = 0; regTcPtr.p->m_row_id = save; fragptr = fragPtr; @@ -4274,6 +4271,45 @@ Dblqh::nr_delete_complete(Signal* signal, Nr_op_info* op) } } +Uint32 +Dblqh::readPrimaryKeys(Uint32 opPtrI, Uint32 * dst, bool xfrm) +{ + TcConnectionrecPtr regTcPtr; + DatabufPtr regDatabufptr; + Uint64 Tmp[MAX_KEY_SIZE_IN_WORDS >> 1]; + + jamEntry(); + regTcPtr.i = opPtrI; + ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec); + + Uint32 tableId = regTcPtr.p->tableref; + Uint32 keyLen = regTcPtr.p->primKeyLen; + regDatabufptr.i = regTcPtr.p->firstTupkeybuf; + Uint32 * tmp = xfrm ? (Uint32*)Tmp : dst; + + memcpy(tmp, regTcPtr.p->tupkeyData, sizeof(regTcPtr.p->tupkeyData)); + if (keyLen > 4) + { + tmp += 4; + Uint32 pos = 4; + do { + ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf); + memcpy(tmp, regDatabufptr.p->data, sizeof(regDatabufptr.p->data)); + regDatabufptr.i = regDatabufptr.p->nextDatabuf; + tmp += sizeof(regDatabufptr.p->data) >> 2; + pos += sizeof(regDatabufptr.p->data) >> 2; + } while(pos < keyLen); + } + + if (xfrm) + { + jam(); + Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]; + return xfrm_key(tableId, (Uint32*)Tmp, dst, ~0, keyPartLen); + } + + return keyLen; +} /* =*======================================================================= */ /* ======= SEND KEYINFO TO ACC ======= */ @@ -4447,10 +4483,6 @@ Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr, * ----------------------------------------------------------------------- */ Uint32 page_idx = local_key & MAX_TUPLES_PER_PAGE; Uint32 page_no = local_key >> MAX_TUPLES_BITS; -#ifdef TRACE_LQHKEYREQ - ndbout << "localkey: [ " << hex << page_no << " " << page_idx << "]" - << endl; -#endif Uint32 Ttupreq = regTcPtr->dirtyOp; Ttupreq = Ttupreq + (regTcPtr->opSimple << 1); Ttupreq = Ttupreq + (op << 6); @@ -4506,70 +4538,13 @@ Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr, tupKeyReq->m_row_id_page_no = sig0; tupKeyReq->m_row_id_page_idx = sig1; - if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT) - { - ndbout << "INSERT " << regFragptrP->tabRef - << "(" << regFragptrP->fragId << ")"; - - { - ndbout << "key=[" << hex; - Uint32 i; - for(i = 0; iprimKeyLen && i < 4; i++){ - ndbout << hex << regTcPtr->tupkeyData[i] << " "; - } - - DatabufPtr regDatabufptr; - regDatabufptr.i = regTcPtr->firstTupkeybuf; - while(i < regTcPtr->primKeyLen) - { - ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf); - for(Uint32 j = 0; j<4 && iprimKeyLen; j++, i++) - ndbout << hex << regDatabufptr.p->data[j] << " "; - } - ndbout << "] "; - } - - if(regTcPtr->m_use_rowid) - ndbout << " " << regTcPtr->m_row_id; - } - - if (ERROR_INSERTED(5712) && regTcPtr->operation == ZDELETE) - { - Local_key lk; lk.assref(local_key); - - ndbout << "DELETE " << regFragptrP->tabRef - << "(" << regFragptrP->fragId << ") " << lk; - - { - ndbout << "key=[" << hex; - Uint32 i; - for(i = 0; iprimKeyLen && i < 4; i++){ - ndbout << hex << regTcPtr->tupkeyData[i] << " "; - } - - DatabufPtr regDatabufptr; - regDatabufptr.i = regTcPtr->firstTupkeybuf; - while(i < regTcPtr->primKeyLen) - { - ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf); - for(Uint32 j = 0; j<4 && iprimKeyLen; j++, i++) - ndbout << hex << regDatabufptr.p->data[j] << " "; - } - ndbout << "]" << endl; - } - - } - + TRACE_OP(regTcPtr, "TUPKEYREQ"); + regTcPtr->m_use_rowid |= (op == ZINSERT); regTcPtr->m_row_id.m_page_no = page_no; regTcPtr->m_row_id.m_page_idx = page_idx; EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength); - - if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT) - { - ndbout << endl; - } }//Dblqh::execACCKEYCONF() void @@ -4654,27 +4629,37 @@ void Dblqh::tupkeyConfLab(Signal* signal) const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0]; TcConnectionrec * const regTcPtr = tcConnectptr.p; Uint32 activeCreat = regTcPtr->activeCreat; + Uint32 readLen = tupKeyConf->readLength; + Uint32 writeLen = tupKeyConf->writeLength; + Uint32 accOp = regTcPtr->accConnectrec; + c_acc->execACCKEY_ORD(signal, accOp); + + TRACE_OP(regTcPtr, "TUPKEYCONF"); + if (regTcPtr->simpleRead) { jam(); /* ---------------------------------------------------------------------- - * THE OPERATION IS A SIMPLE READ. WE WILL IMMEDIATELY COMMIT THE OPERATION. - * SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK (FOR LOCAL CHECKPOINTS) YET + * THE OPERATION IS A SIMPLE READ. + * WE WILL IMMEDIATELY COMMIT THE OPERATION. + * SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK + * (FOR LOCAL CHECKPOINTS) YET * WE CAN GO IMMEDIATELY TO COMMIT_CONTINUE_AFTER_BLOCKED. - * WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN READ LENGTH - * ---------------------------------------------------------------------- */ + * WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN + * READ LENGTH + * --------------------------------------------------------------------- */ commitContinueAfterBlockedLab(signal); return; }//if - if (tupKeyConf->readLength != 0) { + if (readLen != 0) + { jam(); /* SET BIT 15 IN REQINFO */ LqhKeyReq::setApplicationAddressFlag(regTcPtr->reqinfo, 1); - - regTcPtr->readlenAi = tupKeyConf->readLength; + regTcPtr->readlenAi = readLen; }//if - regTcPtr->totSendlenAi = tupKeyConf->writeLength; + regTcPtr->totSendlenAi = writeLen; ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen); if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) @@ -5597,6 +5582,8 @@ void Dblqh::releaseOprec(Signal* signal) if (TRACENR_FLAG) TRACENR("DELETED: " << regTcPtr->m_row_id << endl); + + TRACE_OP(regTcPtr, "DEALLOC"); signal->theData[0] = regTcPtr->fragmentid; signal->theData[1] = regTcPtr->tableref; @@ -5818,6 +5805,10 @@ void Dblqh::execCOMMIT(Signal* signal) ptrAss(tcConnectptr, regTcConnectionrec); if ((tcConnectptr.p->transid[0] == transid1) && (tcConnectptr.p->transid[1] == transid2)) { + + TcConnectionrec * const regTcPtr = tcConnectptr.p; + TRACE_OP(regTcPtr, "COMMIT"); + commitReqLab(signal, gci); return; }//if @@ -5937,6 +5928,10 @@ void Dblqh::execCOMPLETE(Signal* signal) if ((tcConnectptr.p->transactionState == TcConnectionrec::COMMITTED) && (tcConnectptr.p->transid[0] == transid1) && (tcConnectptr.p->transid[1] == transid2)) { + + TcConnectionrec * const regTcPtr = tcConnectptr.p; + TRACE_OP(regTcPtr, "COMPLETE"); + if (tcConnectptr.p->seqNoReplica != 0 && tcConnectptr.p->activeCreat == Fragrecord::AC_NORMAL) { jam(); @@ -6313,12 +6308,16 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal) TRACENR(endl); } + TRACE_OP(regTcPtr.p, "ACC_COMMITREQ"); + Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref); signal->theData[0] = regTcPtr.p->accConnectrec; EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1); } else { if(!dirtyOp){ + TRACE_OP(regTcPtr.p, "ACC_COMMITREQ"); + Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref); signal->theData[0] = regTcPtr.p->accConnectrec; EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1); @@ -6362,6 +6361,8 @@ Dblqh::tupcommit_conf_callback(Signal* signal, Uint32 tcPtrI) c_fragment_pool.getPtr(regFragptr); fragptr = regFragptr; + TRACE_OP(tcPtr, "ACC_COMMITREQ"); + Uint32 acc = refToBlock(tcPtr->tcAccBlockref); signal->theData[0] = tcPtr->accConnectrec; EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1); @@ -6670,6 +6671,8 @@ void Dblqh::execABORT(Signal* signal) regTcPtr->commitAckMarker = RNIL; } + TRACE_OP(regTcPtr, "ABORT"); + abortStateHandlerLab(signal); return; @@ -7087,23 +7090,30 @@ void Dblqh::abortContinueAfterBlockedLab(Signal* signal, bool canBlock) * ALSO AS PART OF A NORMAL ABORT WITHOUT BLOCKING. * WE MUST ABORT TUP BEFORE ACC TO ENSURE THAT NO ONE RACES IN * AND SEES A STATE IN TUP. - * ------------------------------------------------------------------------ */ + * ----------------------------------------------------------------------- */ TcConnectionrec * const regTcPtr = tcConnectptr.p; - fragptr.i = regTcPtr->fragmentptr; - c_fragment_pool.getPtr(fragptr); - signal->theData[0] = regTcPtr->tupConnectrec; - EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1); + + TRACE_OP(regTcPtr, "ACC ABORT"); + regTcPtr->transactionState = TcConnectionrec::WAIT_ACC_ABORT; signal->theData[0] = regTcPtr->accConnectrec; - signal->theData[1] = true; + signal->theData[1] = 2; // JOB BUFFER IF NEEDED EXECUTE_DIRECT(DBACC, GSN_ACC_ABORTREQ, signal, 2); - /* ------------------------------------------------------------------------ - * We need to insert a real-time break by sending ACC_ABORTCONF through the - * job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or - * TUPKEYREF that are in the job buffer but not yet processed. Doing - * everything without that would race and create a state error when they - * are executed. - * ----------------------------------------------------------------------- */ + + if (signal->theData[1] == RNIL) + { + jam(); + /* ------------------------------------------------------------------------ + * We need to insert a real-time break by sending ACC_ABORTCONF through the + * job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or + * TUPKEYREF that are in the job buffer but not yet processed. Doing + * everything without that would race and create a state error when they + * are executed. + * --------------------------------------------------------------------- */ + return; + } + + execACC_ABORTCONF(signal); return; }//Dblqh::abortContinueAfterBlockedLab() @@ -7117,6 +7127,11 @@ void Dblqh::execACC_ABORTCONF(Signal* signal) ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); TcConnectionrec * const regTcPtr = tcConnectptr.p; ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT); + + TRACE_OP(regTcPtr, "ACC_ABORTCONF"); + signal->theData[0] = regTcPtr->tupConnectrec; + EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1); + continueAbortLab(signal); return; }//Dblqh::execACC_ABORTCONF() @@ -8837,7 +8852,9 @@ void Dblqh::nextScanConfScanLab(Signal* signal) }//if // If accOperationPtr == RNIL no record was returned by ACC - if (nextScanConf->accOperationPtr == RNIL) { + Uint32 accOpPtr = nextScanConf->accOperationPtr; + if (accOpPtr == RNIL) + { jam(); /************************************************************* * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. @@ -8871,7 +8888,8 @@ void Dblqh::nextScanConfScanLab(Signal* signal) jam(); set_acc_ptr_in_scan_record(scanptr.p, scanptr.p->m_curr_batch_size_rows, - nextScanConf->accOperationPtr); + accOpPtr); + jam(); nextScanConfLoopLab(signal); }//Dblqh::nextScanConfScanLab() @@ -9071,12 +9089,24 @@ void Dblqh::scanTupkeyConfLab(Signal* signal) tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED; scanptr.i = tcConnectptr.p->tcScanRec; c_scanRecordPool.getPtr(scanptr); + + Uint32 rows = scanptr.p->m_curr_batch_size_rows; + Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, rows, false); + if (accOpPtr != (Uint32)-1) + { + c_acc->execACCKEY_ORD(signal, accOpPtr); + } + else + { + ndbassert(refToBlock(scanptr.p->scanBlockref) != DBACC); + } + if (scanptr.p->scanCompletedStatus == ZTRUE) { /* --------------------------------------------------------------------- * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. * --------------------------------------------------------------------- */ - if ((scanptr.p->scanLockHold == ZTRUE) && - (scanptr.p->m_curr_batch_size_rows > 0)) { + if ((scanptr.p->scanLockHold == ZTRUE) && rows) + { jam(); scanptr.p->scanReleaseCounter = 1; scanReleaseLocksLab(signal); @@ -9093,7 +9123,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal) }//if ndbrequire(scanptr.p->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN); scanptr.p->m_curr_batch_size_bytes+= tdata4; - scanptr.p->m_curr_batch_size_rows++; + scanptr.p->m_curr_batch_size_rows = rows + 1; scanptr.p->m_last_row = tdata5; if (scanptr.p->check_scan_batch_completed() | tdata5){ if (scanptr.p->scanLockHold == ZTRUE) { @@ -9103,7 +9133,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal) return; } else { jam(); - scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows; + scanptr.p->scanReleaseCounter = rows + 1; scanReleaseLocksLab(signal); return; } @@ -9187,12 +9217,24 @@ void Dblqh::scanTupkeyRefLab(Signal* signal) tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED; scanptr.i = tcConnectptr.p->tcScanRec; c_scanRecordPool.getPtr(scanptr); + + Uint32 rows = scanptr.p->m_curr_batch_size_rows; + Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, rows, false); + if (accOpPtr != (Uint32)-1) + { + c_acc->execACCKEY_ORD(signal, accOpPtr); + } + else + { + ndbassert(refToBlock(scanptr.p->scanBlockref) != DBACC); + } + if (scanptr.p->scanCompletedStatus == ZTRUE) { /* --------------------------------------------------------------------- * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. * --------------------------------------------------------------------- */ - if ((scanptr.p->scanLockHold == ZTRUE) && - (scanptr.p->m_curr_batch_size_rows > 0)) { + if ((scanptr.p->scanLockHold == ZTRUE) && rows) + { jam(); scanptr.p->scanReleaseCounter = 1; scanReleaseLocksLab(signal); @@ -9213,8 +9255,8 @@ void Dblqh::scanTupkeyRefLab(Signal* signal) scanptr.p->scanReleaseCounter = 1; } else { jam(); - scanptr.p->m_curr_batch_size_rows++; - scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows; + scanptr.p->m_curr_batch_size_rows = rows + 1; + scanptr.p->scanReleaseCounter = rows + 1; }//if /* -------------------------------------------------------------------- * WE NEED TO RELEASE ALL LOCKS CURRENTLY @@ -9224,7 +9266,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal) return; }//if Uint32 time_passed= tcConnectptr.p->tcTimer - cLqhTimeOutCount; - if (scanptr.p->m_curr_batch_size_rows > 0) { + if (rows) { if (time_passed > 1) { /* ----------------------------------------------------------------------- * WE NEED TO ENSURE THAT WE DO NOT SEARCH FOR THE NEXT TUPLE FOR A @@ -9232,7 +9274,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal) * THE FOUND TUPLE IF FOUND TUPLES ARE RARE. If more than 10 ms passed we * send the found tuples to the API. * ----------------------------------------------------------------------- */ - scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows + 1; + scanptr.p->scanReleaseCounter = rows + 1; scanReleaseLocksLab(signal); return; } @@ -9872,6 +9914,8 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal) fragptr.p->m_scanNumberMask.clear(NR_ScanNo); scanptr.p->scanBlockref = DBTUP_REF; scanptr.p->scanLockHold = ZFALSE; + scanptr.p->m_curr_batch_size_rows = 0; + scanptr.p->m_curr_batch_size_bytes= 0; initScanTc(0, 0, @@ -10074,7 +10118,7 @@ void Dblqh::nextScanConfCopyLab(Signal* signal) initCopyTc(signal, ZDELETE); set_acc_ptr_in_scan_record(scanptr.p, 0, RNIL); tcConP->gci = nextScanConf->gci; - + tcConP->primKeyLen = 0; tcConP->totSendlenAi = 0; tcConP->connectState = TcConnectionrec::COPY_CONNECTED; @@ -10197,6 +10241,12 @@ void Dblqh::copyTupkeyConfLab(Signal* signal) scanptr.i = tcConnectptr.p->tcScanRec; c_scanRecordPool.getPtr(scanptr); ScanRecord* scanP = scanptr.p; + + Uint32 rows = scanP->m_curr_batch_size_rows; + Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanP, rows, false); + ndbassert(accOpPtr != (Uint32)-1); + c_acc->execACCKEY_ORD(signal, accOpPtr); + if (tcConnectptr.p->errorCode != 0) { jam(); closeCopyLab(signal); @@ -18538,6 +18588,21 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal) } ndbrequire(arg != 2308); } + +#ifdef ERROR_INSERT + if (arg == 5712 || arg == 5713) + { + if (arg == 5712) + { + traceopout = &ndbout; + } + else if (arg == 5713) + { + traceopout = tracenrout; + } + SET_ERROR_INSERT_VALUE(arg); + } +#endif }//Dblqh::execDUMP_STATE_ORD() @@ -18702,3 +18767,39 @@ void Dblqh::writeDbgInfoPageHeader(LogPageRecordPtr logP, Uint32 place, logP.p->logPageWord[ZPOS_IN_WRITING]= 1; } +#if defined ERROR_INSERT +void +Dblqh::TRACE_OP_DUMP(const Dblqh::TcConnectionrec* regTcPtr, const char * pos) +{ + (* traceopout) + << "[ " << hex << regTcPtr->transid[0] + << " " << hex << regTcPtr->transid[1] << " ] " << dec + << pos + << " " << (Operation_t)regTcPtr->operation + << " " << regTcPtr->tableref + << "(" << regTcPtr->fragmentid << ")" + << "(" << (regTcPtr->seqNoReplica == 0 ? "P" : "B") << ")" ; + + { + (* traceopout) << "key=[" << hex; + Uint32 i; + for(i = 0; iprimKeyLen && i < 4; i++){ + (* traceopout) << hex << regTcPtr->tupkeyData[i] << " "; + } + + DatabufPtr regDatabufptr; + regDatabufptr.i = regTcPtr->firstTupkeybuf; + while(i < regTcPtr->primKeyLen) + { + ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf); + for(Uint32 j = 0; j<4 && iprimKeyLen; j++, i++) + (* traceopout) << hex << regDatabufptr.p->data[j] << " "; + } + (* traceopout) << "] "; + } + + if (regTcPtr->m_use_rowid) + (* traceopout) << " " << regTcPtr->m_row_id; + (* traceopout) << endl; +} +#endif diff --git a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp index b319932aec7..4ff6e069963 100644 --- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp +++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp @@ -297,7 +297,6 @@ enum TransState { }; enum TupleState { - TUPLE_INITIAL_INSERT = 0, TUPLE_PREPARED = 1, TUPLE_ALREADY_ABORTED = 2, TUPLE_TO_BE_COMMITTED = 3 @@ -305,7 +304,6 @@ enum TupleState { enum State { NOT_INITIALIZED = 0, - COMMON_AREA_PAGES = 1, IDLE = 17, ACTIVE = 18, SYSTEM_RESTART = 19, @@ -1441,7 +1439,6 @@ private: void execSET_VAR_REQ(Signal* signal); void execDROP_TAB_REQ(Signal* signal); void execALTER_TAB_REQ(Signal* signal); - void execTUP_ALLOCREQ(Signal* signal); void execTUP_DEALLOCREQ(Signal* signal); void execTUP_WRITELOG_REQ(Signal* signal); diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp index 90abe2cb809..d45c5f9d988 100644 --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp @@ -202,29 +202,6 @@ Dbtup::receive_attrinfo(Signal* signal, Uint32 op, } } -void Dbtup::execTUP_ALLOCREQ(Signal* signal) -{ - OperationrecPtr regOperPtr; - - jamEntry(); - - regOperPtr.i= signal->theData[0]; - c_operation_pool.getPtr(regOperPtr); - - regOperPtr.p->op_struct.tuple_state= TUPLE_INITIAL_INSERT; - //ndbout_c("execTUP_ALLOCREQ"); - - signal->theData[0]= 0; - signal->theData[1]= ~0 >> MAX_TUPLES_BITS; - signal->theData[2]= (1 << MAX_TUPLES_BITS) - 1; - return; - -mem_error: - jam(); - signal->theData[0]= ZMEM_NOMEM_ERROR; - return; -} - void Dbtup::setChecksum(Tuple_header* tuple_ptr, Tablerec* regTabPtr) @@ -455,13 +432,13 @@ Dbtup::load_diskpage(Signal* signal, ptrCheckGuard(tabptr, cnoOfTablerec, tablerec); Tablerec* regTabPtr = tabptr.p; - if(regOperPtr->op_struct.tuple_state == TUPLE_INITIAL_INSERT) + if(local_key == ~(Uint32)0) { jam(); regOperPtr->op_struct.m_wait_log_buffer= 1; regOperPtr->op_struct.m_load_diskpage_on_commit= 1; return 1; - } + } jam(); Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE; @@ -663,7 +640,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal) regOperPtr->savepointId= sig1; regOperPtr->op_struct.primary_replica= sig2; - regOperPtr->m_tuple_location.m_page_idx= sig3; + Uint32 pageidx = regOperPtr->m_tuple_location.m_page_idx= sig3; sig1= tupKeyReq->opRef; sig2= tupKeyReq->tcOpIndex; @@ -673,7 +650,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal) req_struct.tc_operation_ptr= sig1; req_struct.TC_index= sig2; req_struct.TC_ref= sig3; - req_struct.frag_page_id= sig4; + Uint32 pageid = req_struct.frag_page_id= sig4; req_struct.m_use_rowid = (TrequestInfo >> 11) & 1; sig1= tupKeyReq->attrBufLen; @@ -706,7 +683,8 @@ void Dbtup::execTUPKEYREQ(Signal* signal) copyAttrinfo(regOperPtr, &cinBuffer[0]); - if(Roptype == ZINSERT && get_tuple_state(regOperPtr)== TUPLE_INITIAL_INSERT) + Uint32 localkey = (pageid << MAX_TUPLES_BITS) + pageidx; + if(Roptype == ZINSERT && localkey == ~0) { // No tuple allocatated yet goto do_insert; @@ -1159,49 +1137,6 @@ Dbtup::prepare_initial_insert(KeyReqStruct *req_struct, disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0; } -void -Dbtup::fix_disk_insert_no_mem_insert(KeyReqStruct *req_struct, - Operationrec* regOperPtr, - Tablerec* regTabPtr) -{ - regOperPtr->m_undo_buffer_space= sizeof(Dbtup::Disk_undo::Alloc); - req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD); - - const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize; - const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize; - Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr); - - if(cnt1) - { - // Disk part is 32-bit aligned - char *varptr = req_struct->m_var_data[MM].m_data_ptr; - ptr= ALIGN_WORD(varptr + regTabPtr->m_offsets[MM].m_max_var_offset); - } - else - { - ptr -= Tuple_header::HeaderSize; - } - - req_struct->m_disk_ptr= (Tuple_header*)ptr; - - if(cnt2) - { - KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD]; - ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset; - dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1); - dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1); - dst->m_var_len_offset= cnt2; - dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset; - } - - // Set all null bits - memset(req_struct->m_disk_ptr->m_null_bits+ - regTabPtr->m_offsets[DD].m_null_offset, 0xFF, - 4*regTabPtr->m_offsets[DD].m_null_words); - req_struct->m_tuple_ptr->m_header_bits = - (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE); -} - int Dbtup::handleInsertReq(Signal* signal, Ptr regOperPtr, Ptr fragPtr, @@ -1215,8 +1150,8 @@ int Dbtup::handleInsertReq(Signal* signal, Tuple_header *tuple_ptr; bool disk = regTabPtr->m_no_of_disk_attributes > 0; - bool mem_insert = get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT; - bool disk_insert = regOperPtr.p->is_first_operation() && disk; + bool mem_insert = regOperPtr.p->is_first_operation(); + bool disk_insert = mem_insert && disk; bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize; bool rowid = req_struct->m_use_rowid; Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no; @@ -1244,21 +1179,16 @@ int Dbtup::handleInsertReq(Signal* signal, if(mem_insert) { jam(); - ndbassert(regOperPtr.p->is_first_operation()); // disk insert prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr); } else { - if (!regOperPtr.p->is_first_operation()) - { - Operationrec* prevOp= req_struct->prevOpPtr.p; - ndbassert(prevOp->op_struct.op_type == ZDELETE); - tup_version= prevOp->tupVersion + 1; - - if(!prevOp->is_first_operation()) - org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location); - } - + Operationrec* prevOp= req_struct->prevOpPtr.p; + ndbassert(prevOp->op_struct.op_type == ZDELETE); + tup_version= prevOp->tupVersion + 1; + + if(!prevOp->is_first_operation()) + org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location); if (regTabPtr->need_expand()) expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert); else @@ -1268,11 +1198,6 @@ int Dbtup::handleInsertReq(Signal* signal, if (disk_insert) { int res; - if (unlikely(!mem_insert)) - { - sizes[DD] = sizes[DD+2] = regTabPtr->m_offsets[DD].m_fix_header_size; - fix_disk_insert_no_mem_insert(req_struct, regOperPtr.p, regTabPtr); - } if (ERROR_INSERTED(4015)) { @@ -1381,6 +1306,7 @@ int Dbtup::handleInsertReq(Signal* signal, } if (unlikely(ptr == 0)) { + jam(); goto alloc_rowid_error; } } @@ -1396,7 +1322,7 @@ int Dbtup::handleInsertReq(Signal* signal, (varsize ? Tuple_header::CHAINED_ROW : 0); regOperPtr.p->m_tuple_location.m_page_no = real_page_id; } - else if(!rowid || !regOperPtr.p->is_first_operation()) + else { int ret; if (ERROR_INSERTED(4020)) @@ -1417,20 +1343,6 @@ int Dbtup::handleInsertReq(Signal* signal, req_struct->m_use_rowid = false; base->m_header_bits &= ~(Uint32)Tuple_header::FREE; } - else - { - if ((req_struct->m_row_id.m_page_no == frag_page_id && - req_struct->m_row_id.m_page_idx == regOperPtr.p->m_tuple_location.m_page_idx)) - { - ndbout_c("no mem insert but rowid (same)"); - base->m_header_bits &= ~(Uint32)Tuple_header::FREE; - } - else - { - // no mem insert, but rowid - ndbrequire(false); - } - } base->m_header_bits |= Tuple_header::ALLOC & (regOperPtr.p->is_first_operation() ? ~0 : 1); diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp index d1580f8da54..8a68905cef9 100644 --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp @@ -89,7 +89,6 @@ Dbtup::Dbtup(Block_context& ctx, Pgman* pgman) addRecSignal(GSN_DROP_TAB_REQ, &Dbtup::execDROP_TAB_REQ); - addRecSignal(GSN_TUP_ALLOCREQ, &Dbtup::execTUP_ALLOCREQ); addRecSignal(GSN_TUP_DEALLOCREQ, &Dbtup::execTUP_DEALLOCREQ); addRecSignal(GSN_TUP_WRITELOG_REQ, &Dbtup::execTUP_WRITELOG_REQ); diff --git a/storage/ndb/test/ndbapi/testOperations.cpp b/storage/ndb/test/ndbapi/testOperations.cpp index 65b406f155d..e116545f7e3 100644 --- a/storage/ndb/test/ndbapi/testOperations.cpp +++ b/storage/ndb/test/ndbapi/testOperations.cpp @@ -665,9 +665,9 @@ main(int argc, const char** argv){ for(Uint32 i = 0; i < 12; i++) { - if(i == 6 || i == 8 || i == 10) + if(false && (i == 6 || i == 8 || i == 10)) continue; - + BaseString name("bug_9749"); name.appfmt("_%d", i); NDBT_TestCaseImpl1 *pt = new NDBT_TestCaseImpl1(&ts, -- cgit v1.2.1 From 8bfc2d9c1b861224f8be8b707a1f9d5e52fda7e4 Mon Sep 17 00:00:00 2001 From: unknown Date: Fri, 19 May 2006 09:38:59 +0200 Subject: ndb - bug#19928 and bug#19929 fix to critical bugs in tup scan that affected lcp,backup and opt. nr storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp: 1) dont let dirty read scan find uncommitted inserts 2) force opt. nr scan to wait for locked rows 3) when finding LCP keep record, use accOpPtr -1, so that it will not be committed towards ACC --- storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp | 40 ++++++++++++++--------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp index 43875911f7f..0d2ffe0927f 100644 --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp @@ -582,12 +582,15 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr) Fragrecord& frag = *fragPtr.p; // tuple found Tuple_header* th = 0; + Uint32 thbits = 0; Uint32 loop_count = 0; Uint32 scanGCI = scanPtr.p->m_scanGCI; Uint32 foundGCI; - bool mm = (bits & ScanOp::SCAN_DD); - bool lcp = (bits & ScanOp::SCAN_LCP); + const bool mm = (bits & ScanOp::SCAN_DD); + const bool lcp = (bits & ScanOp::SCAN_LCP); + const bool dirty = (bits & ScanOp::SCAN_LOCK) == 0; + Uint32 lcp_list = fragPtr.p->m_lcp_keep_list; Uint32 size = table.m_offsets[mm].m_fix_header_size + (bits & ScanOp::SCAN_VS ? Tuple_header::HeaderSize + 1: 0); @@ -750,22 +753,22 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr) { pos.m_get = ScanPos::Get_next_tuple_fs; th = (Tuple_header*)&page->m_data[key.m_page_idx]; + thbits = th->m_header_bits; + if (likely(! (bits & ScanOp::SCAN_NR))) { - if (! (th->m_header_bits & Tuple_header::FREE)) { - goto found_tuple; - } - else + jam(); + if (! (thbits & Tuple_header::FREE)) { - jam(); - // skip free tuple - } + if (! ((thbits & Tuple_header::ALLOC) && dirty)) + goto found_tuple; + } } else { if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI) { - if (! (th->m_header_bits & Tuple_header::FREE)) + if (! (thbits & Tuple_header::FREE)) { jam(); goto found_tuple; @@ -775,9 +778,11 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr) goto found_deleted_rowid; } } - else + else if (thbits != Fix_page::FREE_RECORD && + th->m_operation_ptr_i != RNIL) { jam(); + goto found_tuple; // Locked tuple... // skip free tuple } } @@ -793,8 +798,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr) jam(); { // caller has already set pos.m_get to next tuple - if (! (bits & ScanOp::SCAN_LCP && - th->m_header_bits & Tuple_header::LCP_SKIP)) { + if (! (bits & ScanOp::SCAN_LCP && thbits & Tuple_header::LCP_SKIP)) { Local_key& key_mm = pos.m_key_mm; if (! (bits & ScanOp::SCAN_DD)) { key_mm = pos.m_key; @@ -810,7 +814,11 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr) } else { jam(); // clear it so that it will show up in next LCP - th->m_header_bits &= ~(Uint32)Tuple_header::LCP_SKIP; + th->m_header_bits = thbits & ~(Uint32)Tuple_header::LCP_SKIP; + if (tablePtr.p->m_bits & Tablerec::TR_Checksum) { + jam(); + setChecksum(th, tablePtr.p); + } } } break; @@ -833,7 +841,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr) th = (Tuple_header*)(mmpage->m_data + key_mm.m_page_idx); if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI) { - if (! (th->m_header_bits & Tuple_header::FREE)) + if (! (thbits & Tuple_header::FREE)) break; } } @@ -893,7 +901,7 @@ found_lcp_keep: NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); conf->scanPtr = scan.m_userPtr; - conf->accOperationPtr = RNIL + 1; + conf->accOperationPtr = (Uint32)-1; conf->fragId = frag.fragmentId; conf->localKey[0] = lcp_list; conf->localKey[1] = 0; -- cgit v1.2.1 From 56589afdc28142a708d2e8fd4acb12e81bee79f9 Mon Sep 17 00:00:00 2001 From: unknown Date: Fri, 19 May 2006 16:29:48 +0200 Subject: ndb - fix scan bugs introduced by acc modifications add more error testcases storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp: remove unused state storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp: 1) remove unused state 2) Fix abort of running lock owner 3) Fix abort of running op in parallell queue (especially scans) storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: remove some printouts add some jams fix so that close tupscan, can not acciently start acc scan in queue (NOTE limits #tupscans to 12 which is not really necessary...but the fix was easy) storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp: Use abort of locks when closing/blocked as Dbacc gets annoyed when committing an op with state running storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp: Use abort of locks when closing/blocked as Dbacc gets annoyed when committing an op with state running storage/ndb/test/include/HugoOperations.hpp: new method storage/ndb/test/ndbapi/testBasic.cpp: add more test cases storage/ndb/test/ndbapi/testScan.cpp: add more testcases storage/ndb/test/run-test/daily-basic-tests.txt: add more testcases storage/ndb/test/src/HugoOperations.cpp: add more testcases --- storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp | 1 - storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp | 62 ++++++++++++++---- storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp | 16 ++--- storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp | 10 +-- storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp | 12 ++-- storage/ndb/test/include/HugoOperations.hpp | 2 + storage/ndb/test/ndbapi/testBasic.cpp | 78 +++++++++++++++++++---- storage/ndb/test/ndbapi/testScan.cpp | 24 ++++++- storage/ndb/test/run-test/daily-basic-tests.txt | 12 ++++ storage/ndb/test/src/HugoOperations.cpp | 29 +++++++-- 10 files changed, 194 insertions(+), 52 deletions(-) diff --git a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp index b1638599dcc..f0c3dbc2866 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp +++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp @@ -533,7 +533,6 @@ struct Operationrec { ,OP_STATE_WAITING = 0x00000 ,OP_STATE_RUNNING = 0x10000 ,OP_STATE_EXECUTED = 0x30000 - ,OP_STATE_RUNNING_ABORT = 0x20000 ,OP_EXECUTED_DIRTY_READ = 0x3050F ,OP_INITIAL = ~(Uint32)0 diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 5337c7c015d..62a786d9f0e 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -1181,6 +1181,7 @@ void Dbacc::execACCKEYREQ(Signal* signal) void Dbacc::execACCKEY_ORD(Signal* signal, Uint32 opPtrI) { + jamEntry(); OperationrecPtr lastOp; lastOp.i = opPtrI; ptrCheckGuard(lastOp, coprecsize, operationrec); @@ -1200,9 +1201,6 @@ Dbacc::execACCKEY_ORD(Signal* signal, Uint32 opPtrI) startNext(signal, lastOp); return; } - else if (opstate == Operationrec::OP_STATE_RUNNING_ABORT) - { - } else { } @@ -1240,15 +1238,14 @@ Dbacc::startNext(Signal* signal, OperationrecPtr lastOp) { jam(); ptrCheckGuard(loPtr, coprecsize, operationrec); - nextOp.i = loPtr.p->nextSerialQue; } else { jam(); - nextOp.i = loPtr.i; loPtr = lastOp; } + nextOp.i = loPtr.p->nextSerialQue; ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); if (nextOp.i == RNIL) @@ -1411,6 +1408,9 @@ conf: else { jam(); + fragrecptr.i = nextOp.p->fragptr; + ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); + sendAcckeyconf(signal); sendSignal(nextOp.p->userblockref, GSN_ACCKEYCONF, signal, 6, JBA); @@ -1680,8 +1680,7 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr) bool running = false; { Uint32 opstate = loPtr.p->m_op_bits & Operationrec::OP_STATE_MASK; - if (opstate == Operationrec::OP_STATE_RUNNING || - opstate == Operationrec::OP_STATE_RUNNING_ABORT) + if (opstate == Operationrec::OP_STATE_RUNNING) running = true; else { @@ -1719,8 +1718,7 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr) } else { - if (opstate == Operationrec::OP_STATE_RUNNING || - opstate == Operationrec::OP_STATE_RUNNING_ABORT) + if (opstate == Operationrec::OP_STATE_RUNNING) running = true; else vlqrequire(opstate == Operationrec::OP_STATE_EXECUTED); @@ -1830,8 +1828,6 @@ operator<<(NdbOut & out, Dbacc::OperationrecPtr ptr) out << " RUNNING "; break; case Dbacc::Operationrec::OP_STATE_EXECUTED: out << " EXECUTED "; break; - case Dbacc::Operationrec::OP_STATE_RUNNING_ABORT: - out << " RUNNIG_ABORT "; break; case Dbacc::Operationrec::OP_STATE_IDLE: out << " IDLE "; break; default: @@ -1857,7 +1853,6 @@ operator<<(NdbOut & out, Dbacc::OperationrecPtr ptr) ,OP_STATE_WAITING = 0x0000 ,OP_STATE_RUNNING = 0x1000 ,OP_STATE_EXECUTED = 0x3000 - ,OP_STATE_RUNNING_ABORT = 0x2000 }; */ if (opbits & Dbacc::Operationrec::OP_LOCK_OWNER) @@ -3950,6 +3945,7 @@ void Dbacc::checkoverfreelist(Signal* signal) void Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr) { + jam(); OperationrecPtr nextP; OperationrecPtr prevP; OperationrecPtr loPtr; @@ -3992,13 +3988,21 @@ Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr) else { jam(); + /** + * P0 - P1 + * + * Abort P1, check start next + */ ndbassert(prevP.p->m_op_bits & Operationrec::OP_LOCK_OWNER); prevP.p->m_lo_last_parallel_op_ptr_i = RNIL; startNext(signal, prevP); validate_lock_queue(prevP); return; } - + + /** + * Abort P1/P2 + */ if (opbits & Operationrec::OP_LOCK_MODE) { Uint32 nextbits = nextP.p->m_op_bits; @@ -4024,12 +4028,23 @@ Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr) /** * Abort P1, P2 */ + if (opstate == Operationrec::OP_STATE_RUNNING) + { + jam(); + startNext(signal, prevP); + validate_lock_queue(prevP); + return; + } + + ndbassert(opstate == Operationrec::OP_STATE_EXECUTED || + opstate == Operationrec::OP_STATE_WAITING); /** * Scan to last of run queue */ while (nextP.p->nextParallelQue != RNIL) { + jam(); nextP.i = nextP.p->nextParallelQue; ptrCheckGuard(nextP, coprecsize, operationrec); } @@ -4049,6 +4064,7 @@ Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr) void Dbacc::abortSerieQueueOperation(Signal* signal, OperationrecPtr opPtr) { + jam(); OperationrecPtr prevS, nextS; OperationrecPtr prevP, nextP; OperationrecPtr loPtr; @@ -4620,7 +4636,25 @@ Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit) * Aborting an operation can *always* lead to lock upgrade */ action = CHECK_LOCK_UPGRADE; - + Uint32 opstate = opbits & Operationrec::OP_STATE_MASK; + if (opstate != Operationrec::OP_STATE_EXECUTED) + { + ndbassert(opstate == Operationrec::OP_STATE_RUNNING); + if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) + { + jam(); + report_dealloc(signal, opPtr.p); + newOwner.p->localdata[0] = ~0; + } + else + { + jam(); + newOwner.p->localdata[0] = opPtr.p->localdata[0]; + newOwner.p->localdata[1] = opPtr.p->localdata[1]; + } + action = START_NEW; + } + /** * Update ACC_LOCK_MODE */ diff --git a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 18e5e6cd585..89d275c6625 100644 --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -2604,11 +2604,6 @@ void Dblqh::execTUPKEYREF(Signal* signal) ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP || regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT); } - else if (getNodeState().startLevel == NodeState::SL_STARTED) - { - if (terrorCode == 899) - ndbout << "899: " << regTcPtr->m_row_id << endl; - } switch (tcConnectptr.p->transactionState) { case TcConnectionrec::WAIT_TUP: @@ -9095,6 +9090,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal) if (accOpPtr != (Uint32)-1) { c_acc->execACCKEY_ORD(signal, accOpPtr); + jamEntry(); } else { @@ -9419,7 +9415,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) const Uint32 readCommitted = ScanFragReq::getReadCommittedFlag(reqinfo); const Uint32 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo); const Uint32 descending = ScanFragReq::getDescendingFlag(reqinfo); - const Uint32 tupScan = ScanFragReq::getTupScanFlag(reqinfo); + Uint32 tupScan = ScanFragReq::getTupScanFlag(reqinfo); const Uint32 attrLen = ScanFragReq::getAttrLen(reqinfo); const Uint32 scanPrio = ScanFragReq::getScanPrio(reqinfo); @@ -9458,7 +9454,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr; scanptr.p->m_last_row = 0; scanptr.p->scanStoredProcId = RNIL; - + scanptr.p->copyPtr = RNIL; if (max_rows == 0 || (max_bytes > 0 && max_rows > max_bytes)){ jam(); return ScanFragRef::ZWRONG_BATCH_SIZE; @@ -9479,8 +9475,10 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) * !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11 * idx uses from MAX_PARALLEL_SCANS_PER_FRAG - MAX = 12-42) */ - Uint32 start = (rangeScan || tupScan ? MAX_PARALLEL_SCANS_PER_FRAG : 1 ); - Uint32 stop = (rangeScan || tupScan ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1); + tupScan = 0; // Make sure that close tup scan does not start acc scan incorrectly + Uint32 start = (rangeScan || tupScan) ? MAX_PARALLEL_SCANS_PER_FRAG : 1 ; + Uint32 stop = (rangeScan || tupScan) ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : + MAX_PARALLEL_SCANS_PER_FRAG - 1; stop += start; Uint32 free = tFragPtr.p->m_scanNumberMask.find(start); diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp index 0d2ffe0927f..d9e94e63726 100644 --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp @@ -156,7 +156,7 @@ Dbtup::execNEXT_SCANREQ(Signal* signal) conf->scanPtr = scan.m_userPtr; unsigned signalLength = 1; sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, - signal, signalLength, JBB); + signal, signalLength, JBB); return; } break; @@ -171,7 +171,7 @@ Dbtup::execNEXT_SCANREQ(Signal* signal) lockReq->requestInfo = AccLockReq::AbortWithConf; lockReq->accOpPtr = scan.m_accLockOp; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, - signal, AccLockReq::UndoSignalLength); + signal, AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); scan.m_state = ScanOp::Aborting; @@ -182,10 +182,10 @@ Dbtup::execNEXT_SCANREQ(Signal* signal) ndbrequire(scan.m_accLockOp != RNIL); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; - lockReq->requestInfo = AccLockReq::Unlock; + lockReq->requestInfo = AccLockReq::Abort; lockReq->accOpPtr = scan.m_accLockOp; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, - signal, AccLockReq::UndoSignalLength); + signal, AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); scan.m_accLockOp = RNIL; @@ -433,7 +433,7 @@ Dbtup::execACCKEYCONF(Signal* signal) jam(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; - lockReq->requestInfo = AccLockReq::Unlock; + lockReq->requestInfo = AccLockReq::Abort; lockReq->accOpPtr = scan.m_accLockOp; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); jamEntry(); diff --git a/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp index 3c0b2c4ed3f..55315806635 100644 --- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp +++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp @@ -321,7 +321,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) conf->scanPtr = scan.m_userPtr; unsigned signalLength = 1; sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, - signal, signalLength, JBB); + signal, signalLength, JBB); return; } break; @@ -344,7 +344,8 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) lockReq->returnCode = RNIL; lockReq->requestInfo = AccLockReq::AbortWithConf; lockReq->accOpPtr = scan.m_accLockOp; - EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); + EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, + AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); scan.m_state = ScanOp::Aborting; @@ -355,9 +356,10 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) ndbrequire(scan.m_accLockOp != RNIL); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; - lockReq->requestInfo = AccLockReq::Unlock; + lockReq->requestInfo = AccLockReq::Abort; lockReq->accOpPtr = scan.m_accLockOp; - EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); + EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, + AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); scan.m_accLockOp = RNIL; @@ -612,7 +614,7 @@ Dbtux::execACCKEYCONF(Signal* signal) jam(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; - lockReq->requestInfo = AccLockReq::Unlock; + lockReq->requestInfo = AccLockReq::Abort; lockReq->accOpPtr = scan.m_accLockOp; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); jamEntry(); diff --git a/storage/ndb/test/include/HugoOperations.hpp b/storage/ndb/test/include/HugoOperations.hpp index c6ecb4c574e..995463e5056 100644 --- a/storage/ndb/test/include/HugoOperations.hpp +++ b/storage/ndb/test/include/HugoOperations.hpp @@ -108,6 +108,8 @@ public: NDBT_ResultRow& get_row(Uint32 idx) { return *rows[idx];} int execute_async(Ndb*, NdbTransaction::ExecType, NdbTransaction::AbortOption = NdbTransaction::AbortOnError); + int execute_async_prepare(Ndb*, NdbTransaction::ExecType, NdbTransaction::AbortOption = NdbTransaction::AbortOnError); + int wait_async(Ndb*, int timeout = -1); protected: diff --git a/storage/ndb/test/ndbapi/testBasic.cpp b/storage/ndb/test/ndbapi/testBasic.cpp index d45a8ecb7b1..f6a4bf37478 100644 --- a/storage/ndb/test/ndbapi/testBasic.cpp +++ b/storage/ndb/test/ndbapi/testBasic.cpp @@ -1090,11 +1090,6 @@ runMassiveRollback4(NDBT_Context* ctx, NDBT_Step* step){ ok = false; break; } - if (hugoOps.execute_NoCommit(pNdb) != 0) - { - ok = false; - break; - } } hugoOps.execute_Rollback(pNdb); CHECK(hugoOps.closeTransaction(pNdb) == 0); @@ -1199,6 +1194,61 @@ runTupErrors(NDBT_Context* ctx, NDBT_Step* step){ return NDBT_OK; } +int +runInsertError(NDBT_Context* ctx, NDBT_Step* step){ + + int result = NDBT_OK; + HugoOperations hugoOp1(*ctx->getTab()); + HugoOperations hugoOp2(*ctx->getTab()); + Ndb* pNdb = GETNDB(step); + + NdbRestarter restarter; + restarter.insertErrorInAllNodes(4017); + const Uint32 LOOPS = 10; + for (Uint32 i = 0; igetTab()); + Ndb* pNdb = GETNDB(step); + + NdbRestarter restarter; + restarter.insertErrorInAllNodes(4017); + + const Uint32 LOOPS = 1; + for (Uint32 i = 0; igetNumRecords(); int parallelism = ctx->getProperty("Parallelism", 240); int abort = ctx->getProperty("AbortProb", 5); + int tupscan = ctx->getProperty("TupScan", (Uint32)0); int i = 0; HugoTransactions hugoTrans(*ctx->getTab()); while (iisTestStopped()) { g_info << i << ": "; NdbOperation::LockMode lm = (NdbOperation::LockMode)(rand() % 3); + int scan_flags = 0; + + if (tupscan == 1) + scan_flags |= NdbScanOperation::SF_TupScan; + else if (tupscan == 2 && ((rand() & 0x800))) + { + scan_flags |= NdbScanOperation::SF_TupScan; + } + if (hugoTrans.scanReadRecords(GETNDB(step), records, abort, parallelism, - lm) != 0){ + lm, + scan_flags) != 0){ return NDBT_FAILED; } i++; @@ -1320,6 +1331,16 @@ TESTCASE("ScanRead488", STEPS(runRandScanRead, 70); FINALIZER(runClearTable); } +TESTCASE("ScanRead488T", + "Verify scan requirement: It's only possible to have 11 concurrent "\ + "scans per fragment running in Ndb kernel at the same time. "\ + "When this limit is exceeded the scan will be aborted with errorcode "\ + "488."){ + TC_PROPERTY("TupScan", 1); + INITIALIZER(runLoadTable); + STEPS(runRandScanRead, 70); + FINALIZER(runClearTable); +} TESTCASE("ScanRead488O", "Verify scan requirement: It's only possible to have 11 concurrent "\ "scans per fragment running in Ndb kernel at the same time. "\ @@ -1336,6 +1357,7 @@ TESTCASE("ScanRead488_Mixed", "scans per fragment running in Ndb kernel at the same time. "\ "When this limit is exceeded the scan will be aborted with errorcode "\ "488."){ + TC_PROPERTY("TupScan", 2); INITIALIZER(createOrderedPkIndex); INITIALIZER(runLoadTable); STEPS(runRandScanRead, 50); diff --git a/storage/ndb/test/run-test/daily-basic-tests.txt b/storage/ndb/test/run-test/daily-basic-tests.txt index 3fead45533f..a251c4db305 100644 --- a/storage/ndb/test/run-test/daily-basic-tests.txt +++ b/storage/ndb/test/run-test/daily-basic-tests.txt @@ -219,6 +219,14 @@ max-time: 500 cmd: testBasic args: -n TupError +max-time: 500 +cmd: testBasic +args: -n InsertError T1 + +max-time: 500 +cmd: testBasic +args: -n InsertError2 T1 + max-time: 500 cmd: testTimeout args: T1 @@ -273,6 +281,10 @@ max-time: 500 cmd: testScan args: -n ScanRead488O -l 10 T6 D1 D2 +max-time: 1000 +cmd: testScan +args: -n ScanRead488T -l 10 T6 D1 D2 + max-time: 1000 cmd: testScan args: -n ScanRead488_Mixed -l 10 T6 D1 D2 diff --git a/storage/ndb/test/src/HugoOperations.cpp b/storage/ndb/test/src/HugoOperations.cpp index 84ea88388dc..2903cb8810e 100644 --- a/storage/ndb/test/src/HugoOperations.cpp +++ b/storage/ndb/test/src/HugoOperations.cpp @@ -471,16 +471,33 @@ HugoOperations::execute_async(Ndb* pNdb, NdbTransaction::ExecType et, return NDBT_OK; } +int +HugoOperations::execute_async_prepare(Ndb* pNdb, NdbTransaction::ExecType et, + NdbTransaction::AbortOption eao){ + + m_async_reply= 0; + pTrans->executeAsynchPrepare(et, + HugoOperations_async_callback, + this, + eao); + + return NDBT_OK; +} + int HugoOperations::wait_async(Ndb* pNdb, int timeout) { - pNdb->pollNdb(1000); - - if(m_async_reply) + volatile int * wait = &m_async_reply; + while (!* wait) { - if(m_async_return) - ndbout << "ERROR: " << pNdb->getNdbError(m_async_return) << endl; - return m_async_return; + pNdb->sendPollNdb(1000); + + if(* wait) + { + if(m_async_return) + ndbout << "ERROR: " << pNdb->getNdbError(m_async_return) << endl; + return m_async_return; + } } ndbout_c("wait returned nothing..."); return -1; -- cgit v1.2.1 From b416be25d3e109f9c862d140c58e706a1e4e865a Mon Sep 17 00:00:00 2001 From: unknown Date: Sat, 20 May 2006 16:54:29 +0200 Subject: gcc4 compile fix --- storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp index d228d8ae819..5caa823dbc3 100644 --- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp +++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp @@ -33,6 +33,9 @@ // primary key is stored in TUP #include "../dbtup/Dbtup.hpp" +class Dbacc; +class Dbtup; + #ifdef DBLQH_C // Constants /* ------------------------------------------------------------------------- */ @@ -2934,8 +2937,8 @@ public: DLHashTable c_scanTakeOverHash; -#ifdef ERROR_INSERT inline bool TRACE_OP_CHECK(const TcConnectionrec* regTcPtr); +#ifdef ERROR_INSERT void TRACE_OP_DUMP(const TcConnectionrec* regTcPtr, const char * pos); #endif }; -- cgit v1.2.1 From 27a213f3e0e0ef770a7fffcfc4df53e90cd96760 Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 22 May 2006 08:46:19 +0200 Subject: ndb - increase max time of MassiveRollback storage/ndb/test/run-test/daily-basic-tests.txt: increase max time --- storage/ndb/test/run-test/daily-basic-tests.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/ndb/test/run-test/daily-basic-tests.txt b/storage/ndb/test/run-test/daily-basic-tests.txt index a251c4db305..f5c6e451b8f 100644 --- a/storage/ndb/test/run-test/daily-basic-tests.txt +++ b/storage/ndb/test/run-test/daily-basic-tests.txt @@ -199,7 +199,7 @@ max-time: 500 cmd: testBasicAsynch args: -n PkDeleteAsynch -max-time: 500 +max-time: 1000 cmd: testBasic args: -n MassiveRollback T1 T7 D1 D2 -- cgit v1.2.1 From 3283735a2a5d3a32abc7e2aca730c81dd4e710be Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 22 May 2006 17:11:05 +0200 Subject: ndb - dbacc rewamp fix so that getElement read localkey from lockowner instead of from page plus some cleanups storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp: 1) Use OP_INITIAL instead of ~0 2) Use JBB instead of JBA (once that I temporary changed...) 3) Add more validation to validate_lock_queue (insert/delete) 4) make getElement read localkey from lockowner instead of from page --- storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp | 58 +++++++++++++++++++---- 1 file changed, 48 insertions(+), 10 deletions(-) diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 62a786d9f0e..4ee8a91f611 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -448,7 +448,7 @@ void Dbacc::initialiseOperationRec(Signal* signal) for (operationRecPtr.i = 0; operationRecPtr.i < coprecsize; operationRecPtr.i++) { refresh_watch_dog(); ptrAss(operationRecPtr, operationrec); - operationRecPtr.p->m_op_bits = ~0; + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; operationRecPtr.p->nextOp = operationRecPtr.i + 1; }//for operationRecPtr.i = coprecsize - 1; @@ -910,7 +910,7 @@ void Dbacc::execACCSEIZEREQ(Signal* signal) ptrGuard(operationRecPtr); operationRecPtr.p->userptr = tuserptr; operationRecPtr.p->userblockref = tuserblockref; - operationRecPtr.p->m_op_bits = ~0; + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; /* ******************************< */ /* ACCSEIZECONF */ /* ******************************< */ @@ -1413,7 +1413,7 @@ conf: sendAcckeyconf(signal); sendSignal(nextOp.p->userblockref, GSN_ACCKEYCONF, - signal, 6, JBA); + signal, 6, JBB); } operationRecPtr = save; @@ -1688,6 +1688,21 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr) } } + bool exists = true; + switch (loPtr.p->m_op_bits & Operationrec::OP_MASK){ + case ZREAD: + case ZINSERT: + case ZUPDATE: + case ZSCAN_OP: + exists = true; + break; + case ZDELETE: + exists = false; + break; + case ZWRITE: + vlqrequire(false); + } + // Validate parallel queue { bool many = false; @@ -1739,6 +1754,26 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr) { vlqrequire(orlockmode == 0); } + + if (opstate == Operationrec::OP_STATE_RUNNING || + opstate == Operationrec::OP_STATE_EXECUTED) + { + switch (lastP.p->m_op_bits & Operationrec::OP_MASK){ + case ZREAD: + case ZUPDATE: + case ZSCAN_OP: + vlqrequire(exists); + break; + case ZDELETE: + vlqrequire(exists); + exists = false; + break; + case ZINSERT: + vlqrequire(!exists); + exists = true; + break; + } + } } if (lastP.i != loPtr.i) @@ -2227,7 +2262,7 @@ void Dbacc::placeSerialQueue(OperationrecPtr lockOwnerPtr, /* ------------------------------------------------------------------------- */ void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code) { - operationRecPtr.p->m_op_bits = ~0; + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; /* ************************<< */ /* ACCKEYREF */ /* ************************<< */ @@ -2446,7 +2481,7 @@ void Dbacc::execACC_LOCKREQ(Signal* signal) // init as in ACCSEIZEREQ operationRecPtr.p->userptr = req->userPtr; operationRecPtr.p->userblockref = req->userRef; - operationRecPtr.p->m_op_bits = ~0; + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; operationRecPtr.p->scanRecPtr = RNIL; // do read with lock via ACCKEYREQ Uint32 lockMode = (lockOp == AccLockReq::LockShared) ? 0 : 1; @@ -3312,6 +3347,7 @@ Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr) tgeElementHeader = gePageptr.p->word32[tgeElementptr]; tgeRemLen = tgeRemLen - TelemLen; Uint32 hashValuePart; + Uint32 localkey1, localkey2; lockOwnerPtr.i = RNIL; lockOwnerPtr.p = NULL; if (ElementHeader::getLocked(tgeElementHeader)) { @@ -3319,14 +3355,16 @@ Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr) lockOwnerPtr.i = ElementHeader::getOpPtrI(tgeElementHeader); ptrCheckGuard(lockOwnerPtr, coprecsize, operationrec); hashValuePart = lockOwnerPtr.p->hashvaluePart; + localkey1 = lockOwnerPtr.p->localdata[0]; + localkey2 = lockOwnerPtr.p->localdata[1]; } else { jam(); hashValuePart = ElementHeader::getHashValuePart(tgeElementHeader); + localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward]; + localkey2 = 0; } if (hashValuePart == opHashValuePart) { jam(); - Uint32 localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward]; - Uint32 localkey2 = 0; bool found; if (! searchLocalKey) { @@ -4644,7 +4682,7 @@ Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit) { jam(); report_dealloc(signal, opPtr.p); - newOwner.p->localdata[0] = ~0; + newOwner.p->localdata[0] = ~(Uint32)0; } else { @@ -4692,7 +4730,7 @@ Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit) if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) { report_dealloc(signal, opPtr.p); - newOwner.p->localdata[0] = ~0; + newOwner.p->localdata[0] = ~(Uint32)0; } else { @@ -4840,7 +4878,7 @@ conf: sendAcckeyconf(signal); sendSignal(newOwner.p->userblockref, GSN_ACCKEYCONF, - signal, 6, JBA); + signal, 6, JBB); operationRecPtr = save; return; -- cgit v1.2.1 From b0add081a43c8fe8aeb0f366a1986126a4bbd72a Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 22 May 2006 20:39:53 +0200 Subject: ndb - dbacc remove too strong assertion storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp: remove too strong assertion --- storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp | 35 ----------------------- 1 file changed, 35 deletions(-) diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 4ee8a91f611..18decbf0238 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -1688,21 +1688,6 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr) } } - bool exists = true; - switch (loPtr.p->m_op_bits & Operationrec::OP_MASK){ - case ZREAD: - case ZINSERT: - case ZUPDATE: - case ZSCAN_OP: - exists = true; - break; - case ZDELETE: - exists = false; - break; - case ZWRITE: - vlqrequire(false); - } - // Validate parallel queue { bool many = false; @@ -1754,26 +1739,6 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr) { vlqrequire(orlockmode == 0); } - - if (opstate == Operationrec::OP_STATE_RUNNING || - opstate == Operationrec::OP_STATE_EXECUTED) - { - switch (lastP.p->m_op_bits & Operationrec::OP_MASK){ - case ZREAD: - case ZUPDATE: - case ZSCAN_OP: - vlqrequire(exists); - break; - case ZDELETE: - vlqrequire(exists); - exists = false; - break; - case ZINSERT: - vlqrequire(!exists); - exists = true; - break; - } - } } if (lastP.i != loPtr.i) -- cgit v1.2.1 From 9ed1d4128e7607630f85bd0b289dc02fd11d3add Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 31 May 2006 08:18:27 +0200 Subject: ndb - fix testNodeRestart -n Bug18612 when running with only 1 node group storage/ndb/test/ndbapi/testNodeRestart.cpp: This test needs 2 node groups. --- storage/ndb/test/ndbapi/testNodeRestart.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/storage/ndb/test/ndbapi/testNodeRestart.cpp b/storage/ndb/test/ndbapi/testNodeRestart.cpp index aed0b39f196..00bc6d38850 100644 --- a/storage/ndb/test/ndbapi/testNodeRestart.cpp +++ b/storage/ndb/test/ndbapi/testNodeRestart.cpp @@ -696,7 +696,10 @@ runBug18612(NDBT_Context* ctx, NDBT_Step* step){ do { int tmp = restarter.getRandomNodeOtherNodeGroup(node1, rand()); if (tmp == -1) - break; + { + ctx->stopTest(); + return NDBT_OK; + } node1 = tmp; } while(nodesmask.get(node1)); -- cgit v1.2.1 From 48969dc09567c90c9cf3bb2daeebe71bdb1aa025 Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 31 May 2006 08:22:54 +0200 Subject: ndb - fix return value of "testBasic -n InsertError2 T1" storage/ndb/test/ndbapi/testBasic.cpp: fix return value --- storage/ndb/test/ndbapi/testBasic.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/storage/ndb/test/ndbapi/testBasic.cpp b/storage/ndb/test/ndbapi/testBasic.cpp index f6a4bf37478..bcbadbc627b 100644 --- a/storage/ndb/test/ndbapi/testBasic.cpp +++ b/storage/ndb/test/ndbapi/testBasic.cpp @@ -1242,11 +1242,12 @@ runInsertError2(NDBT_Context* ctx, NDBT_Step* step){ CHECK(hugoOp1.pkInsertRecord(pNdb, 1) == 0); CHECK(hugoOp1.pkDeleteRecord(pNdb, 1) == 0); - CHECK(hugoOp1.execute_NoCommit(pNdb) == 0); + hugoOp1.execute_NoCommit(pNdb); CHECK(hugoOp1.closeTransaction(pNdb) == 0); } restarter.insertErrorInAllNodes(0); + return NDBT_OK; } NDBT_TESTSUITE(testBasic); -- cgit v1.2.1 From 1f33c408bf523623574026f3c475fc60ea5fe8fc Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 31 May 2006 11:57:35 +0200 Subject: ndb - print some in case of error storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: print scan state in case of error --- storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 89d275c6625..6b0b59ca8f9 100644 --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -7633,6 +7633,7 @@ void Dblqh::execNEXT_SCANCONF(Signal* signal) scanLockReleasedLab(signal); break; default: + ndbout_c("%d", scanptr.p->scanState); ndbrequire(false); }//switch }//Dblqh::execNEXT_SCANCONF() -- cgit v1.2.1 From 5f91dfde9e92436bc07df70bde7695292b72df3e Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 1 Jun 2006 09:42:49 +0200 Subject: ndb - dbacc store new op when converting WRITE to UPDATE so that ACCKEYCONF gets correct storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp: store new op when converting WRITE to UPDATE so that ACCKEYCONF gets correct --- storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 18decbf0238..7ddf491fafc 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -1096,6 +1096,7 @@ void Dbacc::execACCKEYREQ(Signal* signal) jam(); opbits &= ~(Uint32)Operationrec::OP_MASK; opbits |= (op = ZUPDATE); + operationRecPtr.p->m_op_bits = opbits; // store to get correct ACCKEYCONF } opbits |= Operationrec::OP_STATE_RUNNING; opbits |= Operationrec::OP_RUN_QUEUE; -- cgit v1.2.1 From 57b262f32a8f3e54ac07b9f2dc0a6429673472ad Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 1 Jun 2006 14:34:46 +0200 Subject: ndb - bug#20197 also close scan which are in "delivered" state, as it's impossible to release locks afterwards storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp: also close scan which are in "delivered" state, as it's impossible to release locks afterwards --- storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp index 869ae116f43..415720b430a 100644 --- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp +++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp @@ -7052,6 +7052,18 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal, found = true; } } + + ScanFragList deliv(c_scan_frag_pool, scanptr.p->m_delivered_scan_frags); + for(deliv.first(ptr); !ptr.isNull(); deliv.next(ptr)) + { + jam(); + if (refToNode(ptr.p->lqhBlockref) == failedNodeId) + { + jam(); + found = true; + break; + } + } } if(found){ jam(); -- cgit v1.2.1 From f68abd75360a75d3285133b6e5443d8a5f7f9635 Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 1 Jun 2006 14:36:53 +0200 Subject: ndb - acc - add assertion storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp: Add assertion that we dont try to read key from lqh on scan (as it does not have key...) --- storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 7ddf491fafc..ffb3d673713 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -3190,6 +3190,7 @@ Dbacc::readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec* op) else { ndbrequire(ElementHeader::getLocked(eh)); + ndbrequire((op->m_op_bits & Operationrec::OP_MASK) != ZSCAN_OP); ret = c_lqh->readPrimaryKeys(op->userptr, ckeys, xfrm); } jamEntry(); -- cgit v1.2.1 From d411a684bb15e0c4a84051ea653e626ffd664da6 Mon Sep 17 00:00:00 2001 From: unknown Date: Fri, 2 Jun 2006 15:22:49 +0200 Subject: ndb - bug#20185 second try - handle CS_PREPARE_TO_COMMIT explicitly storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp: handle CS_PREPARE_TO_COMMIT explictly --- storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp index 415720b430a..3f16e32d089 100644 --- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp +++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp @@ -7093,15 +7093,20 @@ Dbtc::nodeFailCheckTransactions(Signal* signal, for (transPtr.i = transPtrI; transPtr.i < capiConnectFilesize; transPtr.i++) { ptrCheckGuard(transPtr, capiConnectFilesize, apiConnectRecord); + Uiint32 state = transPtr.p->apiConnectstate; if (transPtr.p->m_transaction_nodes.get(failedNodeId)) { jam(); - - // Force timeout regardless of state - c_appl_timeout_value = 1; - setApiConTimer(transPtr.i, TtcTimer - 2, __LINE__); - timeOutFoundLab(signal, transPtr.i, ZNODEFAIL_BEFORE_COMMIT); - c_appl_timeout_value = TapplTimeout; + + // avoid assertion in timeoutfoundlab + if (state != CS_PREPARE_TO_COMMIT) + { + // Force timeout regardless of state + c_appl_timeout_value = 1; + setApiConTimer(transPtr.i, TtcTimer - 2, __LINE__); + timeOutFoundLab(signal, transPtr.i, ZNODEFAIL_BEFORE_COMMIT); + c_appl_timeout_value = TapplTimeout; + } } // Send CONTINUEB to continue later -- cgit v1.2.1 From 3f950c925e327a10e79be3cebc8d57d40018bda9 Mon Sep 17 00:00:00 2001 From: unknown Date: Fri, 2 Jun 2006 15:54:09 +0200 Subject: ndb - bug#20185 update test prg + fix typo storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp: compile error storage/ndb/test/ndbapi/testNodeRestart.cpp: upgrade test prg not to kill master node as master take over wont work with all these dealyed GCP_PREPARE --- storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp | 2 +- storage/ndb/test/ndbapi/testNodeRestart.cpp | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp index 3f16e32d089..28ff20e74ef 100644 --- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp +++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp @@ -7093,7 +7093,7 @@ Dbtc::nodeFailCheckTransactions(Signal* signal, for (transPtr.i = transPtrI; transPtr.i < capiConnectFilesize; transPtr.i++) { ptrCheckGuard(transPtr, capiConnectFilesize, apiConnectRecord); - Uiint32 state = transPtr.p->apiConnectstate; + Uint32 state = transPtr.p->apiConnectstate; if (transPtr.p->m_transaction_nodes.get(failedNodeId)) { jam(); diff --git a/storage/ndb/test/ndbapi/testNodeRestart.cpp b/storage/ndb/test/ndbapi/testNodeRestart.cpp index 2bf0d58e591..5474837228a 100644 --- a/storage/ndb/test/ndbapi/testNodeRestart.cpp +++ b/storage/ndb/test/ndbapi/testNodeRestart.cpp @@ -879,12 +879,15 @@ int runBug20185(NDBT_Context* ctx, NDBT_Step* step){ HugoOperations hugoOps(*ctx->getTab()); Ndb* pNdb = GETNDB(step); + const int masterNode = restarter.getMasterNodeId(); + int dump[] = { 7090, 20 } ; if (restarter.dumpStateAllNodes(dump, 2)) return NDBT_FAILED; NdbSleep_MilliSleep(3000); - + +retry: if(hugoOps.startTransaction(pNdb) != 0) return NDBT_FAILED; @@ -894,8 +897,14 @@ int runBug20185(NDBT_Context* ctx, NDBT_Step* step){ if (hugoOps.execute_NoCommit(pNdb) != 0) return NDBT_FAILED; - int nodeId; const int node = hugoOps.getTransaction()->getConnectedNodeId(); + if (node != masterNode) + { + hugoOps.closeTransaction(pNdb); + goto retry; + } + + int nodeId; do { nodeId = restarter.getDbNodeId(rand() % restarter.getNumDbNodes()); } while (nodeId == node); -- cgit v1.2.1 From 674e6e95660b108922eec7101938101ecf9720cb Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 5 Jun 2006 15:32:18 +0200 Subject: ndb - dbacc - fix gcc4 compile error storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp: add block to remove gcc compiler error storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: add jamEntry --- storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp | 40 ++++++++++++----------- storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp | 6 ++++ 2 files changed, 27 insertions(+), 19 deletions(-) diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index ffb3d673713..dc8b123f10f 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -1262,27 +1262,29 @@ Dbacc::startNext(Signal* signal, OperationrecPtr lastOp) ptrCheckGuard(nextOp, coprecsize, operationrec); nextbits = nextOp.p->m_op_bits; - bool same = nextOp.p->is_same_trans(lastOp.p); - - if (!same && ((opbits & Operationrec::OP_ACC_LOCK_MODE) || - (nextbits & Operationrec::OP_LOCK_MODE))) { - jam(); + const bool same = nextOp.p->is_same_trans(lastOp.p); + + if (!same && ((opbits & Operationrec::OP_ACC_LOCK_MODE) || + (nextbits & Operationrec::OP_LOCK_MODE))) + { + jam(); + /** + * Not same transaction + * and either last had exclusive lock + * or next had exclusive lock + */ + return; + } + /** - * Not same transaction - * and either last had exclusive lock - * or next had exclusive lock + * same trans and X-lock */ - return; - } - - /** - * same trans and X-lock - */ - if (same && (opbits & Operationrec::OP_ACC_LOCK_MODE)) - { - jam(); - goto upgrade; + if (same && (opbits & Operationrec::OP_ACC_LOCK_MODE)) + { + jam(); + goto upgrade; + } } /** @@ -1294,7 +1296,7 @@ Dbacc::startNext(Signal* signal, OperationrecPtr lastOp) jam(); goto upgrade; } - + /** * There is a shared parallell queue & and exclusive op is first in queue */ diff --git a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 6b0b59ca8f9..b4ea9a18de5 100644 --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -9063,6 +9063,7 @@ Dblqh::readPrimaryKeys(ScanRecord *scanP, TcConnectionrec *tcConP, Uint32 *dst) } int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, dst, false); + jamEntry(); if(0) ndbout_c("readPrimaryKeys(table: %d fragment: %d [ %d %d ] -> %d", tableId, fragId, fragPageId, pageIndex, ret); @@ -9434,6 +9435,11 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) scanptr.p->m_max_batch_size_rows = max_rows; scanptr.p->m_max_batch_size_bytes = max_bytes; +#if 0 + if (! rangeScan) + tupScan = 1; +#endif + if (! rangeScan && ! tupScan) scanptr.p->scanBlockref = tcConnectptr.p->tcAccBlockref; else if (! tupScan) -- cgit v1.2.1 From a775c47a1e82cc9db429a698955db43a8e48bab6 Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 7 Jun 2006 16:02:37 +0200 Subject: ndb - fix release compile error storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: fix release compile --- storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index b4ea9a18de5..0ddb32efded 100644 --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -158,7 +158,7 @@ static int TRACENR_FLAG = 0; static NdbOut * traceopout = 0; #define TRACE_OP(regTcPtr, place) do { if (TRACE_OP_CHECK(regTcPtr)) TRACE_OP_DUMP(regTcPtr, place); } while(0) #else -#define TRACE_OP(x) {} +#define TRACE_OP(x, y) {} #endif /* ------------------------------------------------------------------------- */ -- cgit v1.2.1 From 2394be4a7a1eb7e88974aedf0a5c416df0d0ed23 Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 8 Jun 2006 12:41:06 +0200 Subject: ndb - bug#20334 fix bug in tup scan wrt LCP storage/ndb/include/kernel/signaldata/AccScan.hpp: Add LCP flag to AccScan and ScanFrag storage/ndb/include/kernel/signaldata/ScanFrag.hpp: Add LCP flag to AccScan and ScanFrag storage/ndb/src/kernel/blocks/backup/Backup.cpp: Backup sets LCP scan when lcp storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp: Add LCP scan flag storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: Add LCP scan flag storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp: Fix assignment of scan record wrt LCP and disk --- storage/ndb/include/kernel/signaldata/AccScan.hpp | 18 +++++++++++++ storage/ndb/include/kernel/signaldata/ScanFrag.hpp | 21 +++++++++++++-- storage/ndb/src/kernel/blocks/backup/Backup.cpp | 30 ++++++++++++---------- storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp | 1 + storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp | 3 +++ storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp | 21 +++++++++------ 6 files changed, 71 insertions(+), 23 deletions(-) diff --git a/storage/ndb/include/kernel/signaldata/AccScan.hpp b/storage/ndb/include/kernel/signaldata/AccScan.hpp index fd1982c77af..3f6bc7f7d4d 100644 --- a/storage/ndb/include/kernel/signaldata/AccScan.hpp +++ b/storage/ndb/include/kernel/signaldata/AccScan.hpp @@ -67,6 +67,9 @@ private: static Uint32 getNRScanFlag(const Uint32 & requestInfo); static void setNRScanFlag(Uint32 & requestInfo, Uint32 nr); + + static Uint32 getLcpScanFlag(const Uint32 & requestInfo); + static void setLcpScanFlag(Uint32 & requestInfo, Uint32 nr); }; /** @@ -77,6 +80,7 @@ private: * z = Descending (TUX) - 1 Bit 6 * d = No disk scan - 1 Bit 7 * n = Node recovery scan - 1 Bit 8 + * c = LCP scan - 1 Bit 9 * * 1111111111222222222233 * 01234567890123456789012345678901 @@ -88,6 +92,7 @@ private: #define AS_DESCENDING_SHIFT (6) #define AS_NO_DISK_SCAN (7) #define AS_NR_SCAN (8) +#define AS_LCP_SCAN (9) inline Uint32 @@ -154,6 +159,19 @@ AccScanReq::setNRScanFlag(UintR & requestInfo, UintR val){ requestInfo |= (val << AS_NR_SCAN); } +inline +Uint32 +AccScanReq::getLcpScanFlag(const Uint32 & requestInfo){ + return (requestInfo >> AS_LCP_SCAN) & 1; +} + +inline +void +AccScanReq::setLcpScanFlag(UintR & requestInfo, UintR val){ + ASSERT_BOOL(val, "AccScanReq::setNoDiskScanFlag"); + requestInfo |= (val << AS_LCP_SCAN); +} + class AccScanConf { /** * Sender(s) diff --git a/storage/ndb/include/kernel/signaldata/ScanFrag.hpp b/storage/ndb/include/kernel/signaldata/ScanFrag.hpp index 3c767c7f69c..b5700addb15 100644 --- a/storage/ndb/include/kernel/signaldata/ScanFrag.hpp +++ b/storage/ndb/include/kernel/signaldata/ScanFrag.hpp @@ -61,7 +61,8 @@ public: static Uint32 getAttrLen(const Uint32 & requestInfo); static Uint32 getScanPrio(const Uint32 & requestInfo); static Uint32 getNoDiskFlag(const Uint32 & requestInfo); - + static Uint32 getLcpScanFlag(const Uint32 & requestInfo); + static void setLockMode(Uint32 & requestInfo, Uint32 lockMode); static void setHoldLockFlag(Uint32 & requestInfo, Uint32 holdLock); static void setKeyinfoFlag(Uint32 & requestInfo, Uint32 keyinfo); @@ -72,6 +73,7 @@ public: static void setAttrLen(Uint32 & requestInfo, Uint32 attrLen); static void setScanPrio(Uint32& requestInfo, Uint32 prio); static void setNoDiskFlag(Uint32& requestInfo, Uint32 val); + static void setLcpScanFlag(Uint32 & requestInfo, Uint32 val); }; class KeyInfo20 { @@ -198,6 +200,7 @@ public: * Request Info * * a = Length of attrinfo - 16 Bits (16-31) + * c = LCP scan - 1 Bit 3 * d = No disk - 1 Bit 4 * l = Lock Mode - 1 Bit 5 * h = Hold lock - 1 Bit 7 @@ -205,7 +208,7 @@ public: * r = read committed - 1 Bit 9 * x = range scan - 1 Bit 6 * z = descending - 1 Bit 10 - * t = tup scan -1 Bit 11 (implies x=z=0) + * t = tup scan - 1 Bit 11 (implies x=z=0) * p = Scan prio - 4 Bits (12-15) -> max 15 * * 1111111111222222222233 @@ -222,6 +225,7 @@ public: #define SF_RANGE_SCAN_SHIFT (6) #define SF_DESCENDING_SHIFT (10) #define SF_TUP_SCAN_SHIFT (11) +#define SF_LCP_SCAN_SHIFT (3) #define SF_ATTR_LEN_SHIFT (16) #define SF_ATTR_LEN_MASK (65535) @@ -359,6 +363,19 @@ ScanFragReq::setNoDiskFlag(UintR & requestInfo, UintR val){ requestInfo |= (val << SF_NO_DISK_SHIFT); } +inline +Uint32 +ScanFragReq::getLcpScanFlag(const Uint32 & requestInfo){ + return (requestInfo >> SF_LCP_SCAN_SHIFT) & 1; +} + +inline +void +ScanFragReq::setLcpScanFlag(UintR & requestInfo, UintR val){ + ASSERT_BOOL(val, "ScanFragReq::setLcpScanFlag"); + requestInfo |= (val << SF_LCP_SCAN_SHIFT); +} + inline Uint32 KeyInfo20::setScanInfo(Uint32 opNo, Uint32 scanNo){ diff --git a/storage/ndb/src/kernel/blocks/backup/Backup.cpp b/storage/ndb/src/kernel/blocks/backup/Backup.cpp index 620fac9eb05..07df1db862b 100644 --- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp +++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp @@ -2985,22 +2985,25 @@ Backup::parseTableDescription(Signal* signal, if (disk) { /** - * Remove all disk attributes, but add DISK_REF (8 bytes) + * Remove all disk attributes */ - tabPtr.p->noOfAttributes -= (disk - 1); + tabPtr.p->noOfAttributes -= disk; - AttributePtr attrPtr; - ndbrequire(tabPtr.p->attributes.seize(attrPtr)); - - Uint32 sz32 = 2; - attrPtr.p->data.attrId = AttributeHeader::DISK_REF; - attrPtr.p->data.m_flags = Attribute::COL_FIXED; - attrPtr.p->data.sz32 = 2; - - attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes; - tabPtr.p->sz_FixedAttributes += sz32; + { + AttributePtr attrPtr; + ndbrequire(tabPtr.p->attributes.seize(attrPtr)); + + Uint32 sz32 = 2; + attrPtr.p->data.attrId = AttributeHeader::DISK_REF; + attrPtr.p->data.m_flags = Attribute::COL_FIXED; + attrPtr.p->data.sz32 = 2; + + attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes; + tabPtr.p->sz_FixedAttributes += sz32; + tabPtr.p->noOfAttributes ++; + } } - + { AttributePtr attrPtr; ndbrequire(tabPtr.p->attributes.seize(attrPtr)); @@ -3309,6 +3312,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal) ScanFragReq::setScanPrio(req->requestInfo, 1); ScanFragReq::setTupScanFlag(req->requestInfo, 1); ScanFragReq::setNoDiskFlag(req->requestInfo, 1); + ScanFragReq::setLcpScanFlag(req->requestInfo, 1); } req->transId1 = 0; req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8); diff --git a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp index 5caa823dbc3..f1d1fdbf000 100644 --- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp +++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp @@ -572,6 +572,7 @@ public: Uint8 rangeScan; Uint8 descending; Uint8 tupScan; + Uint8 lcpScan; Uint8 scanTcWaiting; Uint8 scanKeyinfoFlag; Uint8 m_last_row; diff --git a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 0ddb32efded..53cf5c06fe4 100644 --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -8362,6 +8362,8 @@ void Dblqh::continueAfterReceivingAllAiLab(Signal* signal) AccScanReq::setDescendingFlag(req->requestInfo, scanptr.p->descending); AccScanReq::setNoDiskScanFlag(req->requestInfo, !tcConnectptr.p->m_disk_table); + AccScanReq::setLcpScanFlag(req->requestInfo, scanptr.p->lcpScan); + req->transId1 = tcConnectptr.p->transid[0]; req->transId2 = tcConnectptr.p->transid[1]; req->savePointId = tcConnectptr.p->savePointId; @@ -9453,6 +9455,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) scanptr.p->rangeScan = rangeScan; scanptr.p->descending = descending; scanptr.p->tupScan = tupScan; + scanptr.p->lcpScan = ScanFragReq::getLcpScanFlag(reqinfo); scanptr.p->scanState = ScanRecord::SCAN_FREE; scanptr.p->scanFlag = ZFALSE; scanptr.p->m_row_id.setNull(); diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp index d9e94e63726..3ebfbd4aaa9 100644 --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp @@ -53,7 +53,10 @@ Dbtup::execACC_SCANREQ(Signal* signal) Fragrecord& frag = *fragPtr.p; // flags Uint32 bits = 0; - if (frag.m_lcp_scan_op == RNIL) { + + if (!AccScanReq::getLcpScanFlag(req->requestInfo) || + tablePtr.p->m_no_of_disk_attributes == 0) + { // seize from pool and link to per-fragment list LocalDLList list(c_scanOpPool, frag.m_scanList); if (! list.seize(scanPtr)) { @@ -63,23 +66,25 @@ Dbtup::execACC_SCANREQ(Signal* signal) if (!AccScanReq::getNoDiskScanFlag(req->requestInfo) && tablePtr.p->m_no_of_disk_attributes) { - bits |= ScanOp::SCAN_DD; + bits |= ScanOp::SCAN_DD; } bool mm = (bits & ScanOp::SCAN_DD); if (tablePtr.p->m_attributes[mm].m_no_of_varsize > 0) { bits |= ScanOp::SCAN_VS; - // disk pages have fixed page format - ndbrequire(! (bits & ScanOp::SCAN_DD)); + // disk pages have fixed page format + ndbrequire(! (bits & ScanOp::SCAN_DD)); } if (! AccScanReq::getReadCommittedFlag(req->requestInfo)) { - if (AccScanReq::getLockMode(req->requestInfo) == 0) - bits |= ScanOp::SCAN_LOCK_SH; - else - bits |= ScanOp::SCAN_LOCK_EX; + if (AccScanReq::getLockMode(req->requestInfo) == 0) + bits |= ScanOp::SCAN_LOCK_SH; + else + bits |= ScanOp::SCAN_LOCK_EX; } } else { jam(); + // LCP scan and disk + ndbrequire(frag.m_lcp_scan_op == c_lcp_scan_op); c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op); bits |= ScanOp::SCAN_LCP; -- cgit v1.2.1 From 6925d636867a4e062a906392d472a6c77bf71ca3 Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 8 Jun 2006 15:43:41 +0200 Subject: ndb - hugo Print (and optionally specify) random seed storage/ndb/test/src/NDBT_Test.cpp: Print (and optionally specify) random seed --- storage/ndb/test/src/NDBT_Test.cpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/storage/ndb/test/src/NDBT_Test.cpp b/storage/ndb/test/src/NDBT_Test.cpp index 701c9912373..1d21a0d1756 100644 --- a/storage/ndb/test/src/NDBT_Test.cpp +++ b/storage/ndb/test/src/NDBT_Test.cpp @@ -1110,6 +1110,7 @@ static int opt_timer; static char * opt_remote_mgm = NULL; static char * opt_testname = NULL; static int opt_verbose; +static int opt_seed = 0; static struct my_option my_long_options[] = { @@ -1129,6 +1130,9 @@ static struct my_option my_long_options[] = { "loops", 'l', "Number of loops", (gptr*) &opt_loops, (gptr*) &opt_loops, 0, GET_INT, REQUIRED_ARG, 5, 0, 0, 0, 0, 0 }, + { "seed", 1024, "Random seed", + (gptr*) &opt_seed, (gptr*) &opt_seed, 0, + GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { "testname", 'n', "Name of test to run", (gptr*) &opt_testname, (gptr*) &opt_testname, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, @@ -1224,6 +1228,14 @@ int NDBT_TestSuite::execute(int argc, const char** argv){ { return NDBT_ProgramExit(NDBT_FAILED); } + + if (opt_seed == 0) + { + opt_seed = NdbTick_CurrentMillisecond(); + } + ndbout_c("random seed: %u", opt_seed); + srand(opt_seed); + srandom(opt_seed); { Ndb ndb(&con, "TEST_DB"); -- cgit v1.2.1 From 6aa63d18199bd2d6b5a0a1de8c1f8a45e094c8b6 Mon Sep 17 00:00:00 2001 From: unknown Date: Fri, 9 Jun 2006 08:22:50 +0200 Subject: ndb - testframework impl. ugly flag to skip dict cache invalidation as it break all testSystemRestart testcases storage/ndb/src/ndbapi/ClusterMgr.cpp: impl. ugly flag to skip dict cache invalidation as it break all testSystemRestart testcases storage/ndb/test/src/NDBT_Test.cpp: impl. ugly flag to skip dict cache invalidation as it break all testSystemRestart testcases --- storage/ndb/src/ndbapi/ClusterMgr.cpp | 14 +++++++++----- storage/ndb/test/src/NDBT_Test.cpp | 4 ++++ 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/storage/ndb/src/ndbapi/ClusterMgr.cpp b/storage/ndb/src/ndbapi/ClusterMgr.cpp index b108ed3fd41..63fdb73c49f 100644 --- a/storage/ndb/src/ndbapi/ClusterMgr.cpp +++ b/storage/ndb/src/ndbapi/ClusterMgr.cpp @@ -38,6 +38,7 @@ #include int global_flag_send_heartbeat_now= 0; +int global_flag_skip_invalidate_cache = 0; // Just a C wrapper for threadMain extern "C" @@ -458,11 +459,14 @@ ClusterMgr::reportNodeFailed(NodeId nodeId){ theNode.nfCompleteRep = false; if(noOfAliveNodes == 0) { - theFacade.m_globalDictCache.lock(); - theFacade.m_globalDictCache.invalidate_all(); - theFacade.m_globalDictCache.unlock(); - m_connect_count ++; - m_cluster_state = CS_waiting_for_clean_cache; + if (!global_flag_skip_invalidate_cache) + { + theFacade.m_globalDictCache.lock(); + theFacade.m_globalDictCache.invalidate_all(); + theFacade.m_globalDictCache.unlock(); + m_connect_count ++; + m_cluster_state = CS_waiting_for_clean_cache; + } NFCompleteRep rep; for(Uint32 i = 1; i Date: Fri, 9 Jun 2006 09:06:43 +0200 Subject: ndb - restore --with-ndb-ccflags functionality config/ac-macros/ha_ndbcluster.m4: restore --with-ndb-ccflags functionality --- config/ac-macros/ha_ndbcluster.m4 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/ac-macros/ha_ndbcluster.m4 b/config/ac-macros/ha_ndbcluster.m4 index 505d000c196..ee31fa9fca2 100644 --- a/config/ac-macros/ha_ndbcluster.m4 +++ b/config/ac-macros/ha_ndbcluster.m4 @@ -197,7 +197,7 @@ AC_DEFUN([MYSQL_SETUP_NDBCLUSTER], [ MAKE_BINARY_DISTRIBUTION_OPTIONS="$MAKE_BINARY_DISTRIBUTION_OPTIONS --with-ndbcluster" - # CXXFLAGS="$CXXFLAGS \$(NDB_CXXFLAGS)" + CXXFLAGS="$CXXFLAGS \$(NDB_CXXFLAGS)" if test "$have_ndb_debug" = "default" then have_ndb_debug=$with_debug -- cgit v1.2.1 From 26917822951431339821cfe94cf3e67dd1f3957e Mon Sep 17 00:00:00 2001 From: unknown Date: Fri, 9 Jun 2006 11:39:27 +0200 Subject: ndb - fix 5.1 scan error handling problem Make sure that error gets set corretly also when error code directly after taking mutex storage/ndb/src/ndbapi/NdbScanOperation.cpp: Make sure that error gets set corretly also when error code directly after taking mutex --- storage/ndb/src/ndbapi/NdbScanOperation.cpp | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/storage/ndb/src/ndbapi/NdbScanOperation.cpp b/storage/ndb/src/ndbapi/NdbScanOperation.cpp index 6a4e657d172..49ef742a93f 100644 --- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp @@ -478,10 +478,14 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend) */ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter, theNdb->theNdbBlockNumber); - if(theError.code) - return -1; - Uint32 seq = theNdbCon->theNodeSequence; + const Uint32 seq = theNdbCon->theNodeSequence; + + if(theError.code) + { + goto err4; + } + if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0) { @@ -564,6 +568,10 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend) if(theError.code == 0) setErrorCode(4028); // seq changed = Node fail break; + case -4: +err4: + setErrorCode(theError.code); + break; } theNdbCon->theTransactionIsStarted = false; -- cgit v1.2.1