summaryrefslogtreecommitdiff
path: root/ndb/src/kernel/blocks/suma/Suma.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/kernel/blocks/suma/Suma.cpp')
-rw-r--r--ndb/src/kernel/blocks/suma/Suma.cpp142
1 files changed, 96 insertions, 46 deletions
diff --git a/ndb/src/kernel/blocks/suma/Suma.cpp b/ndb/src/kernel/blocks/suma/Suma.cpp
index 84a59f440d9..c4225ad2a4c 100644
--- a/ndb/src/kernel/blocks/suma/Suma.cpp
+++ b/ndb/src/kernel/blocks/suma/Suma.cpp
@@ -50,6 +50,17 @@
//#define EVENT_DEBUG
//#define EVENT_PH3_DEBUG
//#define EVENT_DEBUG2
+#if 0
+#undef DBUG_ENTER
+#undef DBUG_PRINT
+#undef DBUG_RETURN
+#undef DBUG_VOID_RETURN
+
+#define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);}
+#define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;}
+#define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); }
+#define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; }
+#endif
/**
* @todo:
@@ -112,15 +123,12 @@ Suma::getNodeGroupMembers(Signal* signal) {
void
Suma::execSTTOR(Signal* signal) {
jamEntry();
-
+
+ DBUG_ENTER("Suma::execSTTOR");
const Uint32 startphase = signal->theData[1];
const Uint32 typeOfStart = signal->theData[7];
-#ifdef NODEFAIL_DEBUG
- ndbout_c ("SUMA::execSTTOR startphase = %u, typeOfStart = %u",
- startphase, typeOfStart);
-
-#endif
+ DBUG_PRINT("info",("startphase = %u, typeOfStart = %u", startphase, typeOfStart));
if(startphase == 1){
jam();
@@ -155,7 +163,7 @@ Suma::execSTTOR(Signal* signal) {
g_subPtrI = subPtr.i;
// sendSTTORRY(signal);
#endif
- return;
+ DBUG_VOID_RETURN;
}
if(startphase == 5) {
@@ -178,9 +186,7 @@ Suma::execSTTOR(Signal* signal) {
for( int i = 0; i < NO_OF_BUCKETS; i++) {
if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
// I'm running this bucket
-#ifdef EVENT_DEBUG
- ndbout_c("bucket %u set to true", i);
-#endif
+ DBUG_PRINT("info",("bucket %u set to true", i));
c_buckets[i].active = true;
}
}
@@ -190,32 +196,31 @@ Suma::execSTTOR(Signal* signal) {
c_masterNodeId == getOwnNodeId()) {
jam();
createSequence(signal);
- return;
+ DBUG_VOID_RETURN;
}//if
}//if
sendSTTORRY(signal);
- return;
+ DBUG_VOID_RETURN;
}
void
Suma::createSequence(Signal* signal)
{
jam();
+ DBUG_ENTER("Suma::createSequence");
UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend();
req->senderData = RNIL;
req->sequenceId = SUMA_SEQUENCE;
req->requestType = UtilSequenceReq::Create;
-#ifdef DEBUG_SUMA_SEQUENCE
- ndbout_c("SUMA: Create sequence");
-#endif
sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
signal, UtilSequenceReq::SignalLength, JBB);
// execUTIL_SEQUENCE_CONF will call createSequenceReply()
+ DBUG_VOID_RETURN;
}
void
@@ -338,6 +343,7 @@ SumaParticipant::execCONTINUEB(Signal* signal)
void Suma::execAPI_FAILREQ(Signal* signal)
{
jamEntry();
+ DBUG_ENTER("Suma::execAPI_FAILREQ");
Uint32 failedApiNode = signal->theData[0];
//BlockReference retRef = signal->theData[1];
@@ -348,11 +354,13 @@ void Suma::execAPI_FAILREQ(Signal* signal)
jam();
c_failedApiNodes.clear(failedApiNode);
}
+ DBUG_VOID_RETURN;
}//execAPI_FAILREQ()
bool
SumaParticipant::removeSubscribersOnNode(Signal *signal, Uint32 nodeId)
{
+ DBUG_ENTER("SumaParticipant::removeSubscribersOnNode");
bool found = false;
SubscriberPtr i_subbPtr;
@@ -372,20 +380,15 @@ SumaParticipant::removeSubscribersOnNode(Signal *signal, Uint32 nodeId)
jam();
sendSubStopReq(signal);
}
- return found;
+ DBUG_RETURN(found);
}
void
-SumaParticipant::sendSubStopReq(Signal *signal){
+SumaParticipant::sendSubStopReq(Signal *signal, bool unlock){
+ DBUG_ENTER("SumaParticipant::sendSubStopReq");
static bool remove_lock = false;
jam();
- if(remove_lock) {
- jam();
- return;
- }
- remove_lock = true;
-
SubscriberPtr subbPtr;
c_removeDataSubscribers.first(subbPtr);
if (subbPtr.isNull()){
@@ -398,9 +401,15 @@ SumaParticipant::sendSubStopReq(Signal *signal){
c_failedApiNodes.clear();
remove_lock = false;
- return;
+ DBUG_VOID_RETURN;
}
+ if(remove_lock && !unlock) {
+ jam();
+ DBUG_VOID_RETURN;
+ }
+ remove_lock = true;
+
SubscriptionPtr subPtr;
c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
@@ -414,11 +423,13 @@ SumaParticipant::sendSubStopReq(Signal *signal){
req->part = SubscriptionData::TableData;
sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB);
+ DBUG_VOID_RETURN;
}
void
SumaParticipant::execSUB_STOP_CONF(Signal* signal){
jamEntry();
+ DBUG_ENTER("SumaParticipant::execSUB_STOP_CONF");
SubStopConf * const conf = (SubStopConf*)signal->getDataPtr();
@@ -444,12 +455,15 @@ SumaParticipant::execSUB_STOP_CONF(Signal* signal){
}
}
- sendSubStopReq(signal);
+ sendSubStopReq(signal,true);
+ DBUG_VOID_RETURN;
}
void
SumaParticipant::execSUB_STOP_REF(Signal* signal){
jamEntry();
+ DBUG_ENTER("SumaParticipant::execSUB_STOP_REF");
+
SubStopRef * const ref = (SubStopRef*)signal->getDataPtr();
Uint32 subscriptionId = ref->subscriptionId;
@@ -471,11 +485,14 @@ SumaParticipant::execSUB_STOP_REF(Signal* signal){
req->part = part;
sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB);
+
+ DBUG_VOID_RETURN;
}
void
Suma::execNODE_FAILREP(Signal* signal){
jamEntry();
+ DBUG_ENTER("Suma::execNODE_FAILREP");
NodeFailRep * const rep = (NodeFailRep*)signal->getDataPtr();
@@ -541,6 +558,7 @@ Suma::execNODE_FAILREP(Signal* signal){
c_aliveNodes.clear(nodePtr.p->nodeId); // this has to be done after the loop above
}
}
+ DBUG_VOID_RETURN;
}
void
@@ -610,6 +628,19 @@ Suma::execSIGNAL_DROPPED_REP(Signal* signal){
*
*/
+static unsigned
+count_subscribers(const DLList<SumaParticipant::Subscriber> &subs)
+{
+ unsigned n= 0;
+ SumaParticipant::SubscriberPtr i_subbPtr;
+ subs.first(i_subbPtr);
+ while(!i_subbPtr.isNull()){
+ n++;
+ subs.next(i_subbPtr);
+ }
+ return n;
+}
+
void
Suma::execDUMP_STATE_ORD(Signal* signal){
jamEntry();
@@ -664,6 +695,15 @@ Suma::execDUMP_STATE_ORD(Signal* signal){
infoEvent("Suma: c_dataBufferPool size: %d free: %d",
c_dataBufferPool.getSize(),
c_dataBufferPool.getNoOfFree());
+
+ infoEvent("Suma: c_metaSubscribers count: %d",
+ count_subscribers(c_metaSubscribers));
+ infoEvent("Suma: c_dataSubscribers count: %d",
+ count_subscribers(c_dataSubscribers));
+ infoEvent("Suma: c_prepDataSubscribers count: %d",
+ count_subscribers(c_prepDataSubscribers));
+ infoEvent("Suma: c_removeDataSubscribers count: %d",
+ count_subscribers(c_removeDataSubscribers));
}
}
@@ -812,16 +852,14 @@ Suma::execUTIL_SEQUENCE_CONF(Signal* signal)
{
jamEntry();
+ DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF");
CRASH_INSERTION(13002);
UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr();
-#ifdef DEBUG_SUMA_SEQUENCE
- ndbout_c("SUMA: Create sequence conf");
-#endif
if(conf->requestType == UtilSequenceReq::Create) {
jam();
createSequenceReply(signal, conf, NULL);
- return;
+ DBUG_VOID_RETURN;
}
Uint64 subId;
@@ -841,18 +879,21 @@ Suma::execUTIL_SEQUENCE_CONF(Signal* signal)
CreateSubscriptionIdConf::SignalLength, JBB);
c_subscriberPool.release(subbPtr);
+
+ DBUG_VOID_RETURN;
}
void
Suma::execUTIL_SEQUENCE_REF(Signal* signal)
{
jamEntry();
+ DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF");
UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr();
if(ref->requestType == UtilSequenceReq::Create) {
jam();
createSequenceReply(signal, NULL, ref);
- return;
+ DBUG_VOID_RETURN;
}
Uint32 subData = ref->senderData;
@@ -861,7 +902,7 @@ Suma::execUTIL_SEQUENCE_REF(Signal* signal)
c_subscriberPool.getPtr(subbPtr,subData);
sendSubIdRef(signal, GrepError::SEQUENCE_ERROR);
c_subscriberPool.release(subbPtr);
- return;
+ DBUG_VOID_RETURN;
}//execUTIL_SEQUENCE_REF()
@@ -1429,7 +1470,7 @@ SumaParticipant::execDIGETPRIMCONF(Signal* signal){
void
SumaParticipant::execCREATE_TRIG_CONF(Signal* signal){
jamEntry();
-
+ DBUG_ENTER("SumaParticipant::execCREATE_TRIG_CONF");
CRASH_INSERTION(13009);
CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr();
@@ -1442,6 +1483,7 @@ SumaParticipant::execCREATE_TRIG_CONF(Signal* signal){
* dodido
* @todo: I (Johan) dont know what to do here. Jonas, what do you mean?
*/
+ DBUG_VOID_RETURN;
}
void
@@ -1453,7 +1495,7 @@ SumaParticipant::execCREATE_TRIG_REF(Signal* signal){
void
SumaParticipant::execDROP_TRIG_CONF(Signal* signal){
jamEntry();
-
+ DBUG_ENTER("SumaParticipant::execDROP_TRIG_CONF");
CRASH_INSERTION(13010);
DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr();
@@ -1461,17 +1503,19 @@ SumaParticipant::execDROP_TRIG_CONF(Signal* signal){
const Uint32 senderData = conf->getConnectionPtr();
SyncRecord* tmp = c_syncPool.getPtr(senderData);
tmp->runDROP_TRIG_CONF(signal);
+ DBUG_VOID_RETURN;
}
void
SumaParticipant::execDROP_TRIG_REF(Signal* signal){
jamEntry();
-
+ DBUG_ENTER("SumaParticipant::execDROP_TRIG_CONF");
DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr();
const Uint32 senderData = ref->getConnectionPtr();
SyncRecord* tmp = c_syncPool.getPtr(senderData);
tmp->runDROP_TRIG_CONF(signal);
+ DBUG_VOID_RETURN;
}
/*************************************************************************
@@ -2055,9 +2099,7 @@ SumaParticipant::execSCAN_HBREP(Signal* signal){
void
SumaParticipant::execSUB_START_REQ(Signal* signal){
jamEntry();
-#ifdef NODEFAIL_DEBUG
- ndbout_c("Suma::execSUB_START_REQ");
-#endif
+ DBUG_ENTER("SumaParticipant::execSUB_START_REQ");
CRASH_INSERTION(13013);
@@ -2067,7 +2109,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){
if (RtoI(signal->getSendersBlockRef(), false) == RNIL) {
jam();
sendSubStartRef(signal, /** Error Code */ 0, true);
- return;
+ DBUG_VOID_RETURN;
}
// only allow other Suma's in the nodegroup to come through for restart purposes
}
@@ -2088,7 +2130,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){
if(!c_subscriptions.find(subPtr, key)){
jam();
sendSubStartRef(signal, /** Error Code */ 0);
- return;
+ DBUG_VOID_RETURN;
}
Ptr<SyncRecord> syncPtr;
@@ -2099,7 +2141,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){
ndbout_c("Locked");
#endif
sendSubStartRef(signal, /** Error Code */ 0, true);
- return;
+ DBUG_VOID_RETURN;
}
syncPtr.p->m_locked = true;
@@ -2108,7 +2150,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){
jam();
syncPtr.p->m_locked = false;
sendSubStartRef(signal, /** Error Code */ 0);
- return;
+ DBUG_VOID_RETURN;
}
Uint32 type = subPtr.p->m_subscriptionType;
@@ -2176,6 +2218,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){
break;
}
ndbrequire(ok);
+ DBUG_VOID_RETURN;
}
void
@@ -2800,7 +2843,7 @@ SumaParticipant::decideWhoToSend(Uint32 nBucket, Uint32 gci){
void
SumaParticipant::execFIRE_TRIG_ORD(Signal* signal){
jamEntry();
-
+ DBUG_ENTER("SumaParticipant::execFIRE_TRIG_ORD");
CRASH_INSERTION(13016);
FireTrigOrd* const trg = (FireTrigOrd*)signal->getDataPtr();
const Uint32 trigId = trg->getTriggerId();
@@ -2928,6 +2971,7 @@ SumaParticipant::execFIRE_TRIG_ORD(Signal* signal){
}
}
#endif
+ DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d", refToNode(ref)));
sendSignal(ref, GSN_SUB_TABLE_DATA, signal,
SubTableData::SignalLength, JBB, ptr, nptr);
data->logType = tmp;
@@ -2961,6 +3005,8 @@ SumaParticipant::execFIRE_TRIG_ORD(Signal* signal){
*/
f_bufferLock = 0;
b_bufferLock = 0;
+
+ DBUG_VOID_RETURN;
}
void
@@ -3209,6 +3255,7 @@ bool SumaParticipant::FailoverBuffer::nodeFailRep()
void
SumaParticipant::execSUB_STOP_REQ(Signal* signal){
jamEntry();
+ DBUG_ENTER("SumaParticipant::execSUB_STOP_REQ");
CRASH_INSERTION(13019);
@@ -3238,7 +3285,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){
SubStopConf::SignalLength, JBB);
removeSubscribersOnNode(signal, refToNode(subscriberRef));
- return;
+ DBUG_VOID_RETURN;
}
if(!c_subscriptions.find(subPtr, key)){
@@ -3264,7 +3311,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){
for (;!subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){
jam();
if (subbPtr.p->m_subPtrI == subPtr.i &&
- subbPtr.p->m_subscriberRef == subscriberRef &&
+ refToNode(subbPtr.p->m_subscriberRef) == refToNode(subscriberRef) &&
subbPtr.p->m_subscriberData == subscriberData){
// ndbout_c("STOP_REQ: before c_dataSubscribers.release");
jam();
@@ -3279,7 +3326,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){
if (!found) {
jam();
sendSubStopRef(signal, GrepError::SUBSCRIBER_NOT_FOUND);
- return;
+ DBUG_VOID_RETURN;
}
}
@@ -3292,11 +3339,12 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){
if (syncPtr.p->m_locked) {
jam();
sendSubStopRef(signal, /** Error Code */ 0, true);
- return;
+ DBUG_VOID_RETURN;
}
syncPtr.p->m_locked = true;
syncPtr.p->startDropTrigger(signal);
+ DBUG_VOID_RETURN;
}
void
@@ -3492,6 +3540,8 @@ SumaParticipant::sendSubRemoveRef(Signal* signal, const SubRemoveReq& req,
jam();
SubRemoveRef * ref = (SubRemoveRef *)signal->getDataPtrSend();
ref->senderRef = reference();
+ ref->subscriptionId = req.subscriptionId;
+ ref->subscriptionKey = req.subscriptionKey;
ref->senderData = req.senderData;
ref->err = errCode;
if (temporary)