diff options
Diffstat (limited to 'ndb/src/kernel/blocks/suma/Suma.cpp')
-rw-r--r-- | ndb/src/kernel/blocks/suma/Suma.cpp | 142 |
1 files changed, 96 insertions, 46 deletions
diff --git a/ndb/src/kernel/blocks/suma/Suma.cpp b/ndb/src/kernel/blocks/suma/Suma.cpp index 84a59f440d9..c4225ad2a4c 100644 --- a/ndb/src/kernel/blocks/suma/Suma.cpp +++ b/ndb/src/kernel/blocks/suma/Suma.cpp @@ -50,6 +50,17 @@ //#define EVENT_DEBUG //#define EVENT_PH3_DEBUG //#define EVENT_DEBUG2 +#if 0 +#undef DBUG_ENTER +#undef DBUG_PRINT +#undef DBUG_RETURN +#undef DBUG_VOID_RETURN + +#define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);} +#define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;} +#define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); } +#define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; } +#endif /** * @todo: @@ -112,15 +123,12 @@ Suma::getNodeGroupMembers(Signal* signal) { void Suma::execSTTOR(Signal* signal) { jamEntry(); - + + DBUG_ENTER("Suma::execSTTOR"); const Uint32 startphase = signal->theData[1]; const Uint32 typeOfStart = signal->theData[7]; -#ifdef NODEFAIL_DEBUG - ndbout_c ("SUMA::execSTTOR startphase = %u, typeOfStart = %u", - startphase, typeOfStart); - -#endif + DBUG_PRINT("info",("startphase = %u, typeOfStart = %u", startphase, typeOfStart)); if(startphase == 1){ jam(); @@ -155,7 +163,7 @@ Suma::execSTTOR(Signal* signal) { g_subPtrI = subPtr.i; // sendSTTORRY(signal); #endif - return; + DBUG_VOID_RETURN; } if(startphase == 5) { @@ -178,9 +186,7 @@ Suma::execSTTOR(Signal* signal) { for( int i = 0; i < NO_OF_BUCKETS; i++) { if (getResponsibleSumaNodeId(i) == refToNode(reference())) { // I'm running this bucket -#ifdef EVENT_DEBUG - ndbout_c("bucket %u set to true", i); -#endif + DBUG_PRINT("info",("bucket %u set to true", i)); c_buckets[i].active = true; } } @@ -190,32 +196,31 @@ Suma::execSTTOR(Signal* signal) { c_masterNodeId == getOwnNodeId()) { jam(); createSequence(signal); - return; + DBUG_VOID_RETURN; }//if }//if sendSTTORRY(signal); - return; + DBUG_VOID_RETURN; } void Suma::createSequence(Signal* signal) { jam(); + DBUG_ENTER("Suma::createSequence"); UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend(); req->senderData = RNIL; req->sequenceId = SUMA_SEQUENCE; req->requestType = UtilSequenceReq::Create; -#ifdef DEBUG_SUMA_SEQUENCE - ndbout_c("SUMA: Create sequence"); -#endif sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, signal, UtilSequenceReq::SignalLength, JBB); // execUTIL_SEQUENCE_CONF will call createSequenceReply() + DBUG_VOID_RETURN; } void @@ -338,6 +343,7 @@ SumaParticipant::execCONTINUEB(Signal* signal) void Suma::execAPI_FAILREQ(Signal* signal) { jamEntry(); + DBUG_ENTER("Suma::execAPI_FAILREQ"); Uint32 failedApiNode = signal->theData[0]; //BlockReference retRef = signal->theData[1]; @@ -348,11 +354,13 @@ void Suma::execAPI_FAILREQ(Signal* signal) jam(); c_failedApiNodes.clear(failedApiNode); } + DBUG_VOID_RETURN; }//execAPI_FAILREQ() bool SumaParticipant::removeSubscribersOnNode(Signal *signal, Uint32 nodeId) { + DBUG_ENTER("SumaParticipant::removeSubscribersOnNode"); bool found = false; SubscriberPtr i_subbPtr; @@ -372,20 +380,15 @@ SumaParticipant::removeSubscribersOnNode(Signal *signal, Uint32 nodeId) jam(); sendSubStopReq(signal); } - return found; + DBUG_RETURN(found); } void -SumaParticipant::sendSubStopReq(Signal *signal){ +SumaParticipant::sendSubStopReq(Signal *signal, bool unlock){ + DBUG_ENTER("SumaParticipant::sendSubStopReq"); static bool remove_lock = false; jam(); - if(remove_lock) { - jam(); - return; - } - remove_lock = true; - SubscriberPtr subbPtr; c_removeDataSubscribers.first(subbPtr); if (subbPtr.isNull()){ @@ -398,9 +401,15 @@ SumaParticipant::sendSubStopReq(Signal *signal){ c_failedApiNodes.clear(); remove_lock = false; - return; + DBUG_VOID_RETURN; } + if(remove_lock && !unlock) { + jam(); + DBUG_VOID_RETURN; + } + remove_lock = true; + SubscriptionPtr subPtr; c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); @@ -414,11 +423,13 @@ SumaParticipant::sendSubStopReq(Signal *signal){ req->part = SubscriptionData::TableData; sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB); + DBUG_VOID_RETURN; } void SumaParticipant::execSUB_STOP_CONF(Signal* signal){ jamEntry(); + DBUG_ENTER("SumaParticipant::execSUB_STOP_CONF"); SubStopConf * const conf = (SubStopConf*)signal->getDataPtr(); @@ -444,12 +455,15 @@ SumaParticipant::execSUB_STOP_CONF(Signal* signal){ } } - sendSubStopReq(signal); + sendSubStopReq(signal,true); + DBUG_VOID_RETURN; } void SumaParticipant::execSUB_STOP_REF(Signal* signal){ jamEntry(); + DBUG_ENTER("SumaParticipant::execSUB_STOP_REF"); + SubStopRef * const ref = (SubStopRef*)signal->getDataPtr(); Uint32 subscriptionId = ref->subscriptionId; @@ -471,11 +485,14 @@ SumaParticipant::execSUB_STOP_REF(Signal* signal){ req->part = part; sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB); + + DBUG_VOID_RETURN; } void Suma::execNODE_FAILREP(Signal* signal){ jamEntry(); + DBUG_ENTER("Suma::execNODE_FAILREP"); NodeFailRep * const rep = (NodeFailRep*)signal->getDataPtr(); @@ -541,6 +558,7 @@ Suma::execNODE_FAILREP(Signal* signal){ c_aliveNodes.clear(nodePtr.p->nodeId); // this has to be done after the loop above } } + DBUG_VOID_RETURN; } void @@ -610,6 +628,19 @@ Suma::execSIGNAL_DROPPED_REP(Signal* signal){ * */ +static unsigned +count_subscribers(const DLList<SumaParticipant::Subscriber> &subs) +{ + unsigned n= 0; + SumaParticipant::SubscriberPtr i_subbPtr; + subs.first(i_subbPtr); + while(!i_subbPtr.isNull()){ + n++; + subs.next(i_subbPtr); + } + return n; +} + void Suma::execDUMP_STATE_ORD(Signal* signal){ jamEntry(); @@ -664,6 +695,15 @@ Suma::execDUMP_STATE_ORD(Signal* signal){ infoEvent("Suma: c_dataBufferPool size: %d free: %d", c_dataBufferPool.getSize(), c_dataBufferPool.getNoOfFree()); + + infoEvent("Suma: c_metaSubscribers count: %d", + count_subscribers(c_metaSubscribers)); + infoEvent("Suma: c_dataSubscribers count: %d", + count_subscribers(c_dataSubscribers)); + infoEvent("Suma: c_prepDataSubscribers count: %d", + count_subscribers(c_prepDataSubscribers)); + infoEvent("Suma: c_removeDataSubscribers count: %d", + count_subscribers(c_removeDataSubscribers)); } } @@ -812,16 +852,14 @@ Suma::execUTIL_SEQUENCE_CONF(Signal* signal) { jamEntry(); + DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF"); CRASH_INSERTION(13002); UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr(); -#ifdef DEBUG_SUMA_SEQUENCE - ndbout_c("SUMA: Create sequence conf"); -#endif if(conf->requestType == UtilSequenceReq::Create) { jam(); createSequenceReply(signal, conf, NULL); - return; + DBUG_VOID_RETURN; } Uint64 subId; @@ -841,18 +879,21 @@ Suma::execUTIL_SEQUENCE_CONF(Signal* signal) CreateSubscriptionIdConf::SignalLength, JBB); c_subscriberPool.release(subbPtr); + + DBUG_VOID_RETURN; } void Suma::execUTIL_SEQUENCE_REF(Signal* signal) { jamEntry(); + DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF"); UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr(); if(ref->requestType == UtilSequenceReq::Create) { jam(); createSequenceReply(signal, NULL, ref); - return; + DBUG_VOID_RETURN; } Uint32 subData = ref->senderData; @@ -861,7 +902,7 @@ Suma::execUTIL_SEQUENCE_REF(Signal* signal) c_subscriberPool.getPtr(subbPtr,subData); sendSubIdRef(signal, GrepError::SEQUENCE_ERROR); c_subscriberPool.release(subbPtr); - return; + DBUG_VOID_RETURN; }//execUTIL_SEQUENCE_REF() @@ -1429,7 +1470,7 @@ SumaParticipant::execDIGETPRIMCONF(Signal* signal){ void SumaParticipant::execCREATE_TRIG_CONF(Signal* signal){ jamEntry(); - + DBUG_ENTER("SumaParticipant::execCREATE_TRIG_CONF"); CRASH_INSERTION(13009); CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr(); @@ -1442,6 +1483,7 @@ SumaParticipant::execCREATE_TRIG_CONF(Signal* signal){ * dodido * @todo: I (Johan) dont know what to do here. Jonas, what do you mean? */ + DBUG_VOID_RETURN; } void @@ -1453,7 +1495,7 @@ SumaParticipant::execCREATE_TRIG_REF(Signal* signal){ void SumaParticipant::execDROP_TRIG_CONF(Signal* signal){ jamEntry(); - + DBUG_ENTER("SumaParticipant::execDROP_TRIG_CONF"); CRASH_INSERTION(13010); DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr(); @@ -1461,17 +1503,19 @@ SumaParticipant::execDROP_TRIG_CONF(Signal* signal){ const Uint32 senderData = conf->getConnectionPtr(); SyncRecord* tmp = c_syncPool.getPtr(senderData); tmp->runDROP_TRIG_CONF(signal); + DBUG_VOID_RETURN; } void SumaParticipant::execDROP_TRIG_REF(Signal* signal){ jamEntry(); - + DBUG_ENTER("SumaParticipant::execDROP_TRIG_CONF"); DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr(); const Uint32 senderData = ref->getConnectionPtr(); SyncRecord* tmp = c_syncPool.getPtr(senderData); tmp->runDROP_TRIG_CONF(signal); + DBUG_VOID_RETURN; } /************************************************************************* @@ -2055,9 +2099,7 @@ SumaParticipant::execSCAN_HBREP(Signal* signal){ void SumaParticipant::execSUB_START_REQ(Signal* signal){ jamEntry(); -#ifdef NODEFAIL_DEBUG - ndbout_c("Suma::execSUB_START_REQ"); -#endif + DBUG_ENTER("SumaParticipant::execSUB_START_REQ"); CRASH_INSERTION(13013); @@ -2067,7 +2109,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ if (RtoI(signal->getSendersBlockRef(), false) == RNIL) { jam(); sendSubStartRef(signal, /** Error Code */ 0, true); - return; + DBUG_VOID_RETURN; } // only allow other Suma's in the nodegroup to come through for restart purposes } @@ -2088,7 +2130,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ if(!c_subscriptions.find(subPtr, key)){ jam(); sendSubStartRef(signal, /** Error Code */ 0); - return; + DBUG_VOID_RETURN; } Ptr<SyncRecord> syncPtr; @@ -2099,7 +2141,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ ndbout_c("Locked"); #endif sendSubStartRef(signal, /** Error Code */ 0, true); - return; + DBUG_VOID_RETURN; } syncPtr.p->m_locked = true; @@ -2108,7 +2150,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ jam(); syncPtr.p->m_locked = false; sendSubStartRef(signal, /** Error Code */ 0); - return; + DBUG_VOID_RETURN; } Uint32 type = subPtr.p->m_subscriptionType; @@ -2176,6 +2218,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ break; } ndbrequire(ok); + DBUG_VOID_RETURN; } void @@ -2800,7 +2843,7 @@ SumaParticipant::decideWhoToSend(Uint32 nBucket, Uint32 gci){ void SumaParticipant::execFIRE_TRIG_ORD(Signal* signal){ jamEntry(); - + DBUG_ENTER("SumaParticipant::execFIRE_TRIG_ORD"); CRASH_INSERTION(13016); FireTrigOrd* const trg = (FireTrigOrd*)signal->getDataPtr(); const Uint32 trigId = trg->getTriggerId(); @@ -2928,6 +2971,7 @@ SumaParticipant::execFIRE_TRIG_ORD(Signal* signal){ } } #endif + DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d", refToNode(ref))); sendSignal(ref, GSN_SUB_TABLE_DATA, signal, SubTableData::SignalLength, JBB, ptr, nptr); data->logType = tmp; @@ -2961,6 +3005,8 @@ SumaParticipant::execFIRE_TRIG_ORD(Signal* signal){ */ f_bufferLock = 0; b_bufferLock = 0; + + DBUG_VOID_RETURN; } void @@ -3209,6 +3255,7 @@ bool SumaParticipant::FailoverBuffer::nodeFailRep() void SumaParticipant::execSUB_STOP_REQ(Signal* signal){ jamEntry(); + DBUG_ENTER("SumaParticipant::execSUB_STOP_REQ"); CRASH_INSERTION(13019); @@ -3238,7 +3285,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){ SubStopConf::SignalLength, JBB); removeSubscribersOnNode(signal, refToNode(subscriberRef)); - return; + DBUG_VOID_RETURN; } if(!c_subscriptions.find(subPtr, key)){ @@ -3264,7 +3311,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){ for (;!subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){ jam(); if (subbPtr.p->m_subPtrI == subPtr.i && - subbPtr.p->m_subscriberRef == subscriberRef && + refToNode(subbPtr.p->m_subscriberRef) == refToNode(subscriberRef) && subbPtr.p->m_subscriberData == subscriberData){ // ndbout_c("STOP_REQ: before c_dataSubscribers.release"); jam(); @@ -3279,7 +3326,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){ if (!found) { jam(); sendSubStopRef(signal, GrepError::SUBSCRIBER_NOT_FOUND); - return; + DBUG_VOID_RETURN; } } @@ -3292,11 +3339,12 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){ if (syncPtr.p->m_locked) { jam(); sendSubStopRef(signal, /** Error Code */ 0, true); - return; + DBUG_VOID_RETURN; } syncPtr.p->m_locked = true; syncPtr.p->startDropTrigger(signal); + DBUG_VOID_RETURN; } void @@ -3492,6 +3540,8 @@ SumaParticipant::sendSubRemoveRef(Signal* signal, const SubRemoveReq& req, jam(); SubRemoveRef * ref = (SubRemoveRef *)signal->getDataPtrSend(); ref->senderRef = reference(); + ref->subscriptionId = req.subscriptionId; + ref->subscriptionKey = req.subscriptionKey; ref->senderData = req.senderData; ref->err = errCode; if (temporary) |