summaryrefslogtreecommitdiff
path: root/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
blob: c027226584cac0c04465e93667798b3df3abf58f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#ifndef NdbEventOperationImpl_H
#define NdbEventOperationImpl_H

#include <NdbEventOperation.hpp>
#include <signaldata/SumaImpl.hpp>
#include <transporter/TransporterDefinitions.hpp>
#include <NdbRecAttr.hpp>
#include <AttributeHeader.hpp>
#include <UtilBuffer.hpp>

#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
//#define EVENT_DEBUG
#ifdef EVENT_DEBUG
#define DBUG_ENTER_EVENT(A) DBUG_ENTER(A)
#define DBUG_RETURN_EVENT(A) DBUG_RETURN(A)
#define DBUG_VOID_RETURN_EVENT DBUG_VOID_RETURN
#define DBUG_PRINT_EVENT(A,B) DBUG_PRINT(A,B)
#define DBUG_DUMP_EVENT(A,B,C) DBUG_DUMP(A,B,C)
#else
#define DBUG_ENTER_EVENT(A)
#define DBUG_RETURN_EVENT(A) return(A)
#define DBUG_VOID_RETURN_EVENT return
#define DBUG_PRINT_EVENT(A,B)
#define DBUG_DUMP_EVENT(A,B,C)
#endif

#undef NDB_EVENT_VERIFY_SIZE
#ifdef VM_TRACE
// not much effect on performance, leave on
#define NDB_EVENT_VERIFY_SIZE
#endif

class NdbEventOperationImpl;

struct EventBufData
{
  union {
    SubTableData *sdata;
    Uint32 *memory;
  };
  LinearSectionPtr ptr[3];
  unsigned sz;
  NdbEventOperationImpl *m_event_op;

  /*
   * Blobs are stored in blob list (m_next_blob) where each entry
   * is list of parts (m_next).  TODO order by part number
   *
   * Processed data (m_used_data, m_free_data) keeps the old blob
   * list intact.  It is reconsumed when new data items are needed.
   *
   * Data item lists keep track of item count and sum(sz) and
   * these include both main items and blob parts.
   */

  EventBufData *m_next; // Next wrt to global order or Next blob part
  EventBufData *m_next_blob; // First part in next blob

  EventBufData *m_next_hash; // Next in per-GCI hash
  Uint32 m_pkhash; // PK hash (without op) for fast compare

  EventBufData() {}

  // Get blob part number from blob data
  Uint32 get_blob_part_no() const;

  /*
   * Main item does not include summary of parts (space / performance
   * tradeoff).  The summary is needed when moving single data item.
   * It is not needed when moving entire list.
   */
  void get_full_size(Uint32 & full_count, Uint32 & full_sz) const {
    full_count = 1;
    full_sz = sz;
    if (m_next_blob != 0)
      add_part_size(full_count, full_sz);
  }
  void add_part_size(Uint32 & full_count, Uint32 & full_sz) const;
};

class EventBufData_list
{
public:
  EventBufData_list();
  ~EventBufData_list();

  // remove first and return its size
  void remove_first(Uint32 & full_count, Uint32 & full_sz);
  // for remove+append avoid double call to get_full_size()
  void append_used_data(EventBufData *data, Uint32 full_count, Uint32 full_sz);
  // append data and insert data but ignore Gci_op list
  void append_used_data(EventBufData *data);
  // append data and insert data into Gci_op list with add_gci_op
  void append_data(EventBufData *data);
  // append list to another, will call move_gci_ops
  void append_list(EventBufData_list *list, Uint64 gci);

  int is_empty();

  EventBufData *m_head, *m_tail;
  Uint32 m_count;
  Uint32 m_sz;

  /*
    distinct ops per gci (assume no hash needed)

    list may be in 2 versions

    1. single list with on gci only
    - one linear array
    Gci_op  *m_gci_op_list;
    Uint32 m_gci_op_count;
    Uint32 m_gci_op_alloc != 0;

    2. multi list with several gcis
    - linked list of gci's
    - one linear array per gci
    Gci_ops *m_gci_ops_list;
    Gci_ops *m_gci_ops_list_tail;
    Uint32 m_is_not_multi_list == 0;

  */
  struct Gci_op                 // 1 + 2
  {
    NdbEventOperationImpl* op;
    Uint32 event_types;
  };
  struct Gci_ops                // 2
  {
    Uint64 m_gci;
    Gci_op *m_gci_op_list;
    Gci_ops *m_next;
    Uint32 m_gci_op_count;
  };
  union
  {
    Gci_op  *m_gci_op_list;      // 1
    Gci_ops *m_gci_ops_list;     // 2
  };
  union
  {
    Uint32 m_gci_op_count;       // 1
    Gci_ops *m_gci_ops_list_tail;// 2
  };
  union
  {
    Uint32 m_gci_op_alloc;       // 1
    Uint32 m_is_not_multi_list;  // 2
  };
  Gci_ops *first_gci_ops();
  Gci_ops *next_gci_ops();
  // case 1 above; add Gci_op to single list
  void add_gci_op(Gci_op g);
private:
  // case 2 above; move single list or multi list from
  // one list to another
  void move_gci_ops(EventBufData_list *list, Uint64 gci);
};

inline
EventBufData_list::EventBufData_list()
  : m_head(0), m_tail(0),
    m_count(0),
    m_sz(0),
    m_gci_op_list(NULL),
    m_gci_ops_list_tail(0),
    m_gci_op_alloc(0)
{
  DBUG_ENTER_EVENT("EventBufData_list::EventBufData_list");
  DBUG_PRINT_EVENT("info", ("this: %p", this));
  DBUG_VOID_RETURN_EVENT;
}

inline
EventBufData_list::~EventBufData_list()
{
  DBUG_ENTER_EVENT("EventBufData_list::~EventBufData_list");
  DBUG_PRINT_EVENT("info", ("this: %p  m_is_not_multi_list: %u",
                            this, m_is_not_multi_list));
  if (m_is_not_multi_list)
  {
    DBUG_PRINT_EVENT("info", ("delete m_gci_op_list: %p", m_gci_op_list));
    delete [] m_gci_op_list;
  }
  else
  {
    Gci_ops *op = first_gci_ops();
    while (op)
      op = next_gci_ops();
  }
  DBUG_VOID_RETURN_EVENT;
}

inline
int EventBufData_list::is_empty()
{
  return m_head == 0;
}

inline
void EventBufData_list::remove_first(Uint32 & full_count, Uint32 & full_sz)
{
  m_head->get_full_size(full_count, full_sz);
#ifdef VM_TRACE
  assert(m_count >= full_count);
  assert(m_sz >= full_sz);
#endif
  m_count -= full_count;
  m_sz -= full_sz;
  m_head = m_head->m_next;
  if (m_head == 0)
    m_tail = 0;
}

inline
void EventBufData_list::append_used_data(EventBufData *data, Uint32 full_count, Uint32 full_sz)
{
  data->m_next = 0;
  if (m_tail)
    m_tail->m_next = data;
  else
  {
#ifdef VM_TRACE
    assert(m_head == 0);
    assert(m_count == 0);
    assert(m_sz == 0);
#endif
    m_head = data;
  }
  m_tail = data;

  m_count += full_count;
  m_sz += full_sz;
}

inline
void EventBufData_list::append_used_data(EventBufData *data)
{
  Uint32 full_count, full_sz;
  data->get_full_size(full_count, full_sz);
  append_used_data(data, full_count, full_sz);
}

inline
void EventBufData_list::append_data(EventBufData *data)
{
  Gci_op g = { data->m_event_op, 
	       1 << SubTableData::getOperation(data->sdata->requestInfo) };
  add_gci_op(g);

  append_used_data(data);
}

inline EventBufData_list::Gci_ops *
EventBufData_list::first_gci_ops()
{
  assert(!m_is_not_multi_list);
  return m_gci_ops_list;
}

inline EventBufData_list::Gci_ops *
EventBufData_list::next_gci_ops()
{
  assert(!m_is_not_multi_list);
  Gci_ops *first = m_gci_ops_list;
  m_gci_ops_list = first->m_next;
  if (first->m_gci_op_list)
  {
    DBUG_PRINT_EVENT("info", ("this: %p  delete m_gci_op_list: %p",
                              this, first->m_gci_op_list));
    delete [] first->m_gci_op_list;
  }
  delete first;
  if (m_gci_ops_list == 0)
    m_gci_ops_list_tail = 0;
  return m_gci_ops_list;
}

// GCI bucket has also a hash over data, with key event op, table PK.
// It can only be appended to and is invalid after remove_first().
class EventBufData_hash
{
public:
  struct Pos { // search result
    Uint32 index;       // index into hash array
    EventBufData* data; // non-zero if found
    Uint32 pkhash;      // PK hash
  };

  static Uint32 getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3]);
  static bool getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3]);

  void search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3]);
  void append(Pos& hpos, EventBufData* data);

  enum { GCI_EVENT_HASH_SIZE = 101 };
  EventBufData* m_hash[GCI_EVENT_HASH_SIZE];
};

inline
void EventBufData_hash::append(Pos& hpos, EventBufData* data)
{
  data->m_next_hash = m_hash[hpos.index];
  m_hash[hpos.index] = data;
}

struct Gci_container
{
  enum State 
  {
    GC_COMPLETE = 0x1 // GCI is complete, but waiting for out of order
  };
  
  Uint32 m_state;
  Uint32 m_gcp_complete_rep_count; // Remaining SUB_GCP_COMPLETE_REP until done
  Uint64 m_gci;                    // GCI
  EventBufData_list m_data;
  EventBufData_hash m_data_hash;
};

struct Gci_container_pod
{
  char data[sizeof(Gci_container)];
};

class NdbEventOperationImpl : public NdbEventOperation {
public:
  NdbEventOperationImpl(NdbEventOperation &f,
			Ndb *theNdb, 
			const char* eventName);
  NdbEventOperationImpl(Ndb *theNdb, 
			NdbEventImpl& evnt);
  void init(NdbEventImpl& evnt);
  NdbEventOperationImpl(NdbEventOperationImpl&); //unimplemented
  NdbEventOperationImpl& operator=(const NdbEventOperationImpl&); //unimplemented
  ~NdbEventOperationImpl();

  NdbEventOperation::State getState();

  int execute();
  int execute_nolock();
  int stop();
  NdbRecAttr *getValue(const char *colName, char *aValue, int n);
  NdbRecAttr *getValue(const NdbColumnImpl *, char *aValue, int n);
  NdbBlob *getBlobHandle(const char *colName, int n);
  NdbBlob *getBlobHandle(const NdbColumnImpl *, int n);
  int readBlobParts(char* buf, NdbBlob* blob, Uint32 part, Uint32 count);
  int receive_event();
  bool tableNameChanged() const;
  bool tableFrmChanged() const;
  bool tableFragmentationChanged() const;
  bool tableRangeListChanged() const;
  Uint64 getGCI();
  Uint32 getAnyValue() const;
  Uint64 getLatestGCI();
  bool execSUB_TABLE_DATA(NdbApiSignal * signal, 
                          LinearSectionPtr ptr[3]);

  NdbDictionary::Event::TableEvent getEventType();

  void print();
  void printAll();

  NdbEventOperation *m_facade;
  Uint32 m_magic_number;

  const NdbError & getNdbError() const;
  NdbError m_error;

  Ndb *m_ndb;
  NdbEventImpl *m_eventImpl;

  NdbRecAttr *theFirstPkAttrs[2];
  NdbRecAttr *theCurrentPkAttrs[2];
  NdbRecAttr *theFirstDataAttrs[2];
  NdbRecAttr *theCurrentDataAttrs[2];

  NdbBlob* theBlobList;
  NdbEventOperationImpl* theBlobOpList; // in main op, list of blob ops
  NdbEventOperationImpl* theMainOp; // in blob op, the main op

  NdbEventOperation::State m_state; /* note connection to mi_type */
  Uint32 mi_type; /* should be == 0 if m_state != EO_EXECUTING
		   * else same as in EventImpl
		   */
  Uint32 m_eventId;
  Uint32 m_oid;

  /*
    m_node_bit_mask keeps track of which ndb nodes have reference to
    an event op

    - add    - TE_ACTIVE
    - remove - TE_STOP, TE_NODE_FAILURE, TE_CLUSTER_FAILURE

    TE_NODE_FAILURE and TE_CLUSTER_FAILURE are created as events
    and added to all event ops listed as active or pending delete
    in m_dropped_ev_op using insertDataL, includeing the blob
    event ops referenced by a regular event op.
    - NdbEventBuffer::report_node_failure
    - NdbEventBuffer::completeClusterFailed

    TE_ACTIVE is sent from the kernel on initial execute/start of the
    event op, but is also internally generetad on node connect like
    TE_NODE_FAILURE and TE_CLUSTER_FAILURE
    - NdbEventBuffer::report_node_connected

    when m_node_bit_mask becomes clear, the kernel reference is
    removed from m_ref_count

    node id 0 is used to denote that cluster has a reference
   */

  Bitmask<(unsigned int)_NDB_NODE_BITMASK_SIZE> m_node_bit_mask;

  /*
    m_ref_count keeps track of outstanding references to an event
    operation impl object.  To make sure that the object is not
    deleted too early.

    If on dropEventOperation there are still references to an
    object it is queued for delete in NdbEventBuffer::m_dropped_ev_op
  
    the following references exists for a _non_ blob event op:
    * user reference
    - add    - NdbEventBuffer::createEventOperation
    - remove - NdbEventBuffer::dropEventOperation
    * kernel reference
    - add    - execute_nolock
    - remove - TE_STOP, TE_CLUSTER_FAILURE
    * blob reference
    - add    - execute_nolock on blob event
    - remove - TE_STOP, TE_CLUSTER_FAILURE on blob event
    * gci reference
    - add    - insertDataL/add_gci_op
    - remove - NdbEventBuffer::deleteUsedEventOperations

    the following references exists for a blob event op:
    * kernel reference
    - add    - execute_nolock
    - remove - TE_STOP, TE_CLUSTER_FAILURE    
   */

  int m_ref_count;
  bool m_mergeEvents;
  
  EventBufData *m_data_item;

  void *m_custom_data;
  int m_has_error;

  Uint32 m_fragmentId;
  UtilBuffer m_buffer;

  // Bit mask for what has changed in a table (for TE_ALTER event)
  Uint32 m_change_mask;

#ifdef VM_TRACE
  Uint32 m_data_done_count;
  Uint32 m_data_count;
#endif

  // managed by the ndb object
  NdbEventOperationImpl *m_next;
  NdbEventOperationImpl *m_prev;
private:
  void receive_data(NdbRecAttr *r, const Uint32 *data, Uint32 sz);
};

class NdbEventBuffer {
public:
  NdbEventBuffer(Ndb*);
  ~NdbEventBuffer();

  const Uint32 &m_system_nodes;
  Vector<Gci_container_pod> m_active_gci;
  NdbEventOperation *createEventOperation(const char* eventName,
					  NdbError &);
  NdbEventOperationImpl *createEventOperationImpl(NdbEventImpl& evnt,
                                                  NdbError &);
  void dropEventOperation(NdbEventOperation *);
  static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp);

  void add_drop_lock() { NdbMutex_Lock(m_add_drop_mutex); }
  void add_drop_unlock() { NdbMutex_Unlock(m_add_drop_mutex); }
  void lock() { NdbMutex_Lock(m_mutex); }
  void unlock() { NdbMutex_Unlock(m_mutex); }

  void add_op();
  void remove_op();
  void init_gci_containers();

  // accessed from the "receive thread"
  int insertDataL(NdbEventOperationImpl *op,
		  const SubTableData * const sdata,
		  LinearSectionPtr ptr[3]);
  void execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep);
  void complete_outof_order_gcis();
  
  void report_node_connected(Uint32 node_id);
  void report_node_failure(Uint32 node_id);
  void completeClusterFailed();

  // used by user thread 
  Uint64 getLatestGCI();
  Uint32 getEventId(int bufferId);

  int pollEvents(int aMillisecondNumber, Uint64 *latestGCI= 0);
  int flushIncompleteEvents(Uint64 gci);
  NdbEventOperation *nextEvent();
  NdbEventOperationImpl* getGCIEventOperations(Uint32* iter,
                                               Uint32* event_types);
  void deleteUsedEventOperations();

  NdbEventOperationImpl *move_data();

  // routines to copy/merge events
  EventBufData* alloc_data();
  int alloc_mem(EventBufData* data,
                LinearSectionPtr ptr[3],
                Uint32 * change_sz);
  void dealloc_mem(EventBufData* data,
                   Uint32 * change_sz);
  int copy_data(const SubTableData * const sdata,
                LinearSectionPtr ptr[3],
                EventBufData* data,
                Uint32 * change_sz);
  int merge_data(const SubTableData * const sdata,
                 LinearSectionPtr ptr[3],
                 EventBufData* data,
                 Uint32 * change_sz);
  int get_main_data(Gci_container* bucket,
                    EventBufData_hash::Pos& hpos,
                    EventBufData* blob_data);
  void add_blob_data(Gci_container* bucket,
                     EventBufData* main_data,
                     EventBufData* blob_data);

  void free_list(EventBufData_list &list);

  void reportStatus();

  // Global Mutex used for some things
  static NdbMutex *p_add_drop_mutex;

#ifdef VM_TRACE
  const char *m_latest_command;
  Uint64 m_flush_gci;
#endif

  Ndb *m_ndb;
  Uint64 m_latestGCI;           // latest "handover" GCI
  Uint64 m_latest_complete_GCI; // latest complete GCI (in case of outof order)

  NdbMutex *m_mutex;
  struct NdbCondition *p_cond;

  // receive thread
  Gci_container m_complete_data;
  EventBufData *m_free_data;
#ifdef VM_TRACE
  Uint32 m_free_data_count;
#endif
  Uint32 m_free_data_sz;

  // user thread
  EventBufData_list m_available_data;
  EventBufData_list m_used_data;

  unsigned m_total_alloc; // total allocated memory

  // threshholds to report status
  unsigned m_free_thresh, m_min_free_thresh, m_max_free_thresh;
  unsigned m_gci_slip_thresh;

  NdbError m_error;

#ifdef VM_TRACE
  static void verify_size(const EventBufData* data, Uint32 count, Uint32 sz);
  static void verify_size(const EventBufData_list & list);
#endif

private:
  void insert_event(NdbEventOperationImpl* impl,
                    SubTableData &data,
                    LinearSectionPtr *ptr,
                    Uint32 &oid_ref);
  
  int expand(unsigned sz);

  // all allocated data
  struct EventBufData_chunk
  {
    unsigned sz;
    EventBufData data[1];
  };
  Vector<EventBufData_chunk *> m_allocated_data;
  unsigned m_sz;

  /*
    dropped event operations (dropEventOperation) that have not yet
    been deleted because of outstanding m_ref_count

    check for delete is done on occations when the ref_count may have
    changed by calling deleteUsedEventOperations:
    - nextEvent - each time the user has completed processing a gci
  */
  NdbEventOperationImpl *m_dropped_ev_op;

  Uint32 m_active_op_count;
  NdbMutex *m_add_drop_mutex;
};

inline
NdbEventOperationImpl*
NdbEventBuffer::getEventOperationImpl(NdbEventOperation* tOp)
{
  return &tOp->m_impl;
}

inline void
NdbEventOperationImpl::receive_data(NdbRecAttr *r,
				    const Uint32 *data,
				    Uint32 sz)
{
  r->receive_data(data,sz);
#if 0
  if (sz)
  {
    assert((r->attrSize() * r->arraySize() + 3) >> 2 == sz);
    r->theNULLind= 0;
    memcpy(r->aRef(), data, 4 * sz);
    return;
  }
  r->theNULLind= 1;
#endif
}

#endif