summaryrefslogtreecommitdiff
path: root/ndb/src/kernel/blocks/suma/Suma.hpp
blob: e479ebb7691e82e55e9ef322b95c6efb304175b8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#ifndef SUMA_H
#define SUMA_H

#include <ndb_limits.h>
#include <SimulatedBlock.hpp>

#include <NodeBitmask.hpp>

#include <SLList.hpp>
#include <DLList.hpp>
#include <KeyTable.hpp>
#include <DataBuffer.hpp>
#include <SignalCounter.hpp>
#include <AttributeHeader.hpp>
#include <AttributeList.hpp>

#include <signaldata/UtilSequence.hpp>
#include <signaldata/SumaImpl.hpp>

/**
 * SumaParticipant
 *
 * Block class derived from SimulatedBlock.  It declares
 *   - the signal handlers (exec<SIGNAL> methods) for subscription
 *     create/remove/start/stop/sync and for the replies coming back from
 *     the dictionary, scan and DIH requests,
 *   - the record types Table, SyncRecord, Subscription and Subscriber,
 *   - the pools, lists and hash tables holding those records.
 *
 * Only declarations are given here; the implementations live outside
 * this header.
 */
class SumaParticipant : public SimulatedBlock {
protected:
  // Constructor/destructor are protected: the class is only created
  // through a derived class.
  SumaParticipant(const Configuration & conf);
  virtual ~SumaParticipant();
  BLOCK_DEFINES(SumaParticipant);
  
protected:
  /**
   * Private interface
   *
   * Handlers for the subscription request signals.  Each handler takes
   * the Signal carrying the request.
   */
  void execSUB_CREATE_REQ(Signal* signal);
  void execSUB_REMOVE_REQ(Signal* signal);
  
  void execSUB_START_REQ(Signal* signal);
  void execSUB_STOP_REQ(Signal* signal);
  
  void execSUB_SYNC_REQ(Signal* signal);
  void execSUB_ABORT_SYNC_REQ(Signal* signal);

  void execSUB_STOP_CONF(Signal* signal);
  void execSUB_STOP_REF(Signal* signal);

  /**
   * Dict interface
   *
   * Replies to table listing / table info requests.
   */
  void execLIST_TABLES_REF(Signal* signal);
  void execLIST_TABLES_CONF(Signal* signal);
  void execGET_TABINFOREF(Signal* signal);
  void execGET_TABINFO_CONF(Signal* signal);
#if 0
  void execGET_TABLEID_CONF(Signal* signal);
  void execGET_TABLEID_REF(Signal* signal);
#endif
  /**
   * Scan interface
   */
  void execSCAN_HBREP(Signal* signal);
  void execSCAN_FRAGREF(Signal* signal);
  void execSCAN_FRAGCONF(Signal* signal);
  void execTRANSID_AI(Signal* signal);
  void execSUB_SYNC_CONTINUE_REF(Signal* signal);
  void execSUB_SYNC_CONTINUE_CONF(Signal* signal);
  
  /**
   * DIH signals
   */
  void execDI_FCOUNTREF(Signal* signal);
  void execDI_FCOUNTCONF(Signal* signal);
  void execDIGETPRIMREF(Signal* signal);
  void execDIGETPRIMCONF(Signal* signal);

  /**
   * continueb
   */
  void execCONTINUEB(Signal* signal);

public:
  // Buffer type used for lists of tables (see SyncRecord::m_tableList).
  typedef DataBuffer<15> TableList;
  
  /**
   * Fragment number and node id, two Uint16 values overlaid on a single
   * Uint32 (m_dummy).
   * NOTE(review): presumably so a descriptor can be stored as one word
   * in Table::m_fragments - confirm against the implementation.
   */
  union FragmentDescriptor { 
    struct  {
      Uint16 m_fragmentNo;
      Uint16 m_nodeId;
    } m_fragDesc;
    Uint32 m_dummy;
  };
  
  /**
   * Used when sending SCAN_FRAG
   *
   * An attribute id (plus an unused Uint16) overlaid on a single Uint32.
   */
  union AttributeDescriptor {
    struct {
      Uint16 attrId;
      Uint16 unused;
    } m_attrDesc;
    Uint32 m_dummy;
  };

  /**
   * Per-table record, stored in c_tablePool_ and looked up via c_tables.
   */
  struct Table {
    // A freshly constructed record has its table id set to all ones.
    Table() { m_tableId = ~0; }
    void release(SumaParticipant&);

    // "key" is a second name for m_tableId (same storage).
    union { Uint32 m_tableId; Uint32 key; };
    Uint32 m_schemaVersion;
    Uint32 m_hasTriggerDefined[3]; // Insert/Update/Delete
    Uint32 m_triggerIds[3]; // Insert/Update/Delete
    
    /**
     * Default order in which to ask for attributes during scan
     *   1) Fixed, not nullable
     *   2) Rest
     */
    DataBuffer<15>::Head m_attributes; // Attribute id's
    
    /**
     * Fragments
     */
    DataBuffer<15>::Head m_fragments;  // Fragment descriptors
    
    /**
     * Hash table stuff
     *
     * prevHash and nextPool share storage.  The hash value is the table
     * id itself and two records are equal when their table ids match.
     */
    Uint32 nextHash;
    union { Uint32 prevHash; Uint32 nextPool; };
    Uint32 hashValue() const {
      return m_tableId;
    }
    bool equal(const Table& rec) const {
      return m_tableId == rec.m_tableId;
    }
  };
  typedef Ptr<Table> TablePtr;

  /**
   * Subscriptions
   */

  /**
   * State of one sync operation; records are stored in c_syncPool.
   */
  struct SyncRecord {
    /**
     * @param s  owning block, kept as the reference "suma"
     * @param p  buffer pool handed to m_tableList
     *
     * Only m_locked, m_tableList and suma (plus cerrorInsert when
     * ERROR_INSERT is defined) are initialised here; the remaining
     * members, including m_doSendSyncData and m_error, are not set by
     * this constructor.
     */
    SyncRecord(SumaParticipant& s, DataBuffer<15>::DataBufferPool & p)
      : m_locked(false), m_tableList(p), suma(s)
#ifdef ERROR_INSERT
	, cerrorInsert(s.cerrorInsert)
#endif
    {}
    
    void release();

    Uint32 m_subscriptionPtrI;
    bool   m_locked;
    bool   m_doSendSyncData;
    bool   m_error;
    TableList m_tableList;    // Tables to sync (snapshotted at beginning)
    TableList::DataBufferIterator m_tableList_it;

    /**
     * Sync meta
     */
    void startMeta(Signal*);
    void nextMeta(Signal*);
    void completeMeta(Signal*);
    
    /**
     * Sync data
     */
    Uint32 m_currentTable;          // Index in m_tableList
    Uint32 m_currentFragment;       // Index in tabPtr.p->m_fragments
    DataBuffer<15>::Head m_attributeList; // Attribute if other than default
    DataBuffer<15>::Head m_tabList; // tables if other than default
    
    Uint32 m_currentTableId;        // Current table
    Uint32 m_currentNoOfAttributes; // No of attributes for current table
    void startScan(Signal*);
    void nextScan(Signal*);
    bool getNextFragment(TablePtr * tab, FragmentDescriptor * fd);
    void completeScan(Signal*);

    // Back reference to the owning block.
    SumaParticipant & suma;
#ifdef ERROR_INSERT
    // Refers to the owning block's cerrorInsert (bound in the constructor).
    UintR &cerrorInsert;
#endif
    // number() and progError() simply forward to the owning block.
    // NOTE(review): presumably present so that block-level macros can be
    // used inside SyncRecord methods - confirm.
    BlockNumber number() const { return suma.number(); }
    void progError(int line, int cause, const char * extra) { 
      suma.progError(line, cause, extra); 
    }
    
    void runGET_TABINFO_CONF(Signal* signal);    
    void runGET_TABINFOREF(Signal* signal);
    
    void runDI_FCOUNTCONF(Signal* signal);
    void runDIGETPRIMCONF(Signal* signal);

    // Pool/list bookkeeping; nextPool and nextList share storage.
    Uint32 ptrI;
    union { Uint32 nextPool; Uint32 nextList; };
  };
  friend struct SyncRecord;
  
  /**
   * One subscription, identified by (m_subscriptionId, m_subscriptionKey).
   * Records are stored in c_subscriptionPool and hashed in c_subscriptions.
   */
  struct Subscription {
    // The constructor is empty: no member is initialised here.
    Subscription() {}
    Uint32 m_subscriberRef;
    Uint32 m_subscriberData;
    Uint32 m_senderRef;
    Uint32 m_senderData;
    Uint32 m_subscriptionId;
    Uint32 m_subscriptionKey;
    Uint32 m_subscriptionType;
    Uint32 m_coordinatorRef;
    Uint32 m_syncPtrI;  // Active sync operation
    Uint32 m_nSubscribers;
    bool m_markRemove;

    // Hash table links; prevHash and nextPool share storage.
    Uint32 nextHash;
    union { Uint32 prevHash; Uint32 nextPool; };

    // Hash value is the sum of subscription id and key.
    Uint32 hashValue() const {
      return m_subscriptionId + m_subscriptionKey;
    }

    // Two subscriptions are equal when both id and key match.
    bool equal(const Subscription & s) const {
      return 
	m_subscriptionId == s.m_subscriptionId && 
	m_subscriptionKey == s.m_subscriptionKey;
    }
    /**
     * The following holds the table names of tables included 
     * in the subscription.
     *
     * NOTE(review): m_tables is one char per table (MAX_TABLES entries),
     * not a name; the name array m_tableNames is compiled out below.
     * Presumably m_tables is indexed by table id - confirm.
     */
    // TODO we've got to fix this, this is too inefficient. Tomas
    char m_tables[MAX_TABLES];
#if 0
    char m_tableNames[MAX_TABLES][MAX_TAB_NAME_SIZE];
#endif
    /**
     * "Iterator" used to iterate through m_tableNames
     * (m_tableNames itself is currently compiled out, see above)
     */
    Uint32 m_maxTables;
    Uint32 m_currentTable;
  };
  typedef Ptr<Subscription> SubscriptionPtr;
  
  /**
   * One subscriber; records are stored in c_subscriberPool and linked
   * into one of the DLList<Subscriber> lists declared below.
   */
  struct Subscriber {
    Uint32 m_senderRef;
    Uint32 m_senderData;
    Uint32 m_subscriberRef;
    Uint32 m_subscriberData;
    Uint32 m_subPtrI; //reference to subscription
    // List links; nextPool and prevList share storage.
    Uint32 nextList;
    union { Uint32 nextPool; Uint32 prevList; };
  };
  typedef Ptr<Subscriber> SubscriberPtr;

  /**
   * Subscriber lists
   */
  DLList<Subscriber> c_metaSubscribers;
  DLList<Subscriber> c_dataSubscribers;
  DLList<Subscriber> c_prepDataSubscribers;
  DLList<Subscriber> c_removeDataSubscribers;

  /**
   * Lists
   */
  KeyTable<Table> c_tables;
  DLHashTable<Subscription> c_subscriptions;
  
  /**
   * Pools
   */
  ArrayPool<Subscriber> c_subscriberPool;
  ArrayPool<Table> c_tablePool_;
  ArrayPool<Subscription> c_subscriptionPool;
  ArrayPool<SyncRecord> c_syncPool;
  DataBuffer<15>::DataBufferPool c_dataBufferPool;

  /**
   * Functions
   */
  bool parseTable(Signal* signal, class GetTabInfoConf* conf, Uint32 tableId,
		  SyncRecord* syncPtr_p);
  bool checkTableTriggers(SegmentedSectionPtr ptr);

  void addTableId(Uint32 TableId,
		  SubscriptionPtr subPtr, SyncRecord *psyncRec);

  // Helpers for sending the REF/CONF replies to subscription requests.
  void sendSubIdRef(Signal* signal, Uint32 errorCode);
  void sendSubCreateConf(Signal* signal, Uint32 sender, SubscriptionPtr subPtr);  
  void sendSubCreateRef(Signal* signal, const SubCreateReq& req, Uint32 errorCode);  
  void sendSubSyncRef(Signal* signal, Uint32 errorCode);  
  void sendSubRemoveRef(Signal* signal, const SubRemoveReq& ref,
			Uint32 errorCode, bool temporary = false);
  void completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr);

  /**
   * Table admin
   */
  void convertNameToId( SubscriptionPtr subPtr, Signal * signal);


};

/**
 * Suma
 *
 * Block class built on top of SumaParticipant.  In addition to what it
 * inherits it declares
 *   - handlers for the subscription request signals
 *     (CREATE/DROP/START/STOP/SYNC_SUBSCRIPTION_REQ, ABORT_SYNC_REQ),
 *   - handlers for framework signals (STTOR, READ_NODESCONF,
 *     NODE_FAILREP, ...),
 *   - the sequence based subscription id generation,
 *   - bookkeeping of nodes and of the node group this Suma belongs to.
 *
 * Only declarations are given here; the implementations live outside
 * this header.
 */
class Suma : public SumaParticipant {
  BLOCK_DEFINES(Suma);
public:
  Suma(const Configuration & conf);
  virtual ~Suma();
private:
  /**
   * Public interface
   *
   * Handlers for the subscription request signals.  The handlers are
   * private members; each takes the Signal carrying the request.
   */
  void execCREATE_SUBSCRIPTION_REQ(Signal* signal);
  void execDROP_SUBSCRIPTION_REQ(Signal* signal);
  
  void execSTART_SUBSCRIPTION_REQ(Signal* signal);
  void execSTOP_SUBSCRIPTION_REQ(Signal* signal);
  
  void execSYNC_SUBSCRIPTION_REQ(Signal* signal);
  void execABORT_SYNC_REQ(Signal* signal);

  /**
   * Framework signals
   */

  void execREAD_CONFIG_REQ(Signal* signal);

  void execSTTOR(Signal* signal);
  void sendSTTORRY(Signal*);
  void execNDB_STTOR(Signal* signal);
  void execDUMP_STATE_ORD(Signal* signal);
  void execREAD_NODESCONF(Signal* signal);
  void execNODE_FAILREP(Signal* signal);
  void execINCL_NODEREQ(Signal* signal);
  void execCONTINUEB(Signal* signal);
  void execSIGNAL_DROPPED_REP(Signal* signal);

  /**
   * Controller interface
   */
  void execSUB_ABORT_SYNC_REF(Signal* signal);
  void execSUB_ABORT_SYNC_CONF(Signal* signal);

  /**
   * Subscription generation interface
   *
   * createSequenceReply() takes both a CONF and a REF pointer.
   * NOTE(review): presumably exactly one of them is non-NULL depending
   * on which reply arrived - confirm against the implementation.
   */
  void createSequence(Signal* signal);
  void createSequenceReply(Signal* signal,
			   UtilSequenceConf* conf,
			   UtilSequenceRef* ref);
  void execUTIL_SEQUENCE_CONF(Signal* signal);  
  void execUTIL_SEQUENCE_REF(Signal* signal);
  void execCREATE_SUBID_REQ(Signal* signal);
  
private:
  friend class Restart;

  /**
   * Coordinator side state of one subscription, identified by
   * (m_subscriptionId, m_subscriptionKey).  Records are stored in
   * c_subCoordinatorPool and linked into c_runningSubscriptions.
   */
  struct SubCoordinator {
    Uint32 m_subscriberRef;
    Uint32 m_subscriberData;
    
    Uint32 m_subscriptionId;
    Uint32 m_subscriptionKey;
    
    NdbNodeBitmask m_participants;
    
    Uint32 m_outstandingGsn;
    SignalCounter m_outstandingRequests;
    
    // List links; prevList and nextPool share storage.
    Uint32 nextList;
    union { Uint32 prevList; Uint32 nextPool; };
  };
  // Pointer type for SubCoordinator records.  This must be a typedef
  // (like NodePtr below); without the keyword the line would declare a
  // data member named SubCoordinatorPtr instead of a type.
  typedef Ptr<SubCoordinator> SubCoordinatorPtr;
  
  /**
   * One node; records are stored in c_nodePool and linked into c_nodes.
   */
  struct Node {
    Uint32 nodeId;
    Uint32 alive;
    // List links; prevList and nextPool share storage.
    Uint32 nextList;
    union { Uint32 prevList; Uint32 nextPool; };
  };
  typedef Ptr<Node> NodePtr;

  /**
   * Variables
   */
  NodeId c_masterNodeId;
  SLList<Node> c_nodes;
  NdbNodeBitmask c_aliveNodes;
  NdbNodeBitmask c_preparingNodes;

  // NOTE(review): presumably maps a Suma block reference to its index
  // among the nodes in the group; dieOnNotFound defaults to true.
  // Confirm against the implementation.
  Uint32 RtoI(Uint32 sumaRef, bool dieOnNotFound = true);

  /**
   * for all Suma's to keep track of other Suma's in Node group
   */
  Uint32 c_nodeGroup;
  Uint32 c_noNodesInGroup;
  Uint32 c_idInNodeGroup;
  NodeId c_nodesInGroup[MAX_REPLICAS];

  /**
   * The following do not seem to be used
   */
  ArrayPool<Node> c_nodePool;
  ArrayPool<SubCoordinator> c_subCoordinatorPool;
  DLList<SubCoordinator> c_runningSubscriptions;
};

#endif