summaryrefslogtreecommitdiff
path: root/sql/rpl_handler.h
blob: 4d1bce45c541ee1f31c62c9465b59f19cf53bf2e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
/* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */

#ifndef RPL_HANDLER_H
#define RPL_HANDLER_H

#include "sql_priv.h"
#include "rpl_mi.h"
#include "rpl_rli.h"
#include "sql_plugin.h"
#include "replication.h"

class Observer_info {
public:
  void *observer;
  st_plugin_int *plugin_int;
  plugin_ref plugin;

  Observer_info(void *ob, st_plugin_int *p)
    :observer(ob), plugin_int(p)
  {
    plugin= plugin_int_to_ref(plugin_int);
  }
};

/**
  Base class holding a list of registered observers, protected by a
  read/write lock.

  List nodes are allocated from the delegate's own MEM_ROOT; the
  Observer_info objects themselves are allocated with new and released
  with delete.

  If construction fails to initialize the lock, the object stays in the
  "not inited" state and every operation that checks it returns TRUE.
*/
class Delegate {
public:
  typedef List<Observer_info> Observer_info_list;
  typedef List_iterator<Observer_info> Observer_info_iterator;
  
  /**
    Register an observer.

    @param observer  opaque observer pointer; also used as the lookup key
    @param plugin    plugin that owns the observer

    @return FALSE on success.  TRUE if the delegate is not initialized,
            the observer is already registered, or the new entry could
            not be allocated or appended to the list.
  */
  int add_observer(void *observer, st_plugin_int *plugin)
  {
    int ret= FALSE;
    if (!inited)
      return TRUE;
    write_lock();
    Observer_info_iterator iter(observer_info_list);
    Observer_info *info= iter++;
    while (info && info->observer != observer)
      info= iter++;
    if (!info)
    {
      info= new Observer_info(observer, plugin);
      if (!info)
        ret= TRUE;
      else if (observer_info_list.push_back(info, &memroot))
      {
        /* The list did not take the entry: release it instead of leaking. */
        delete info;
        ret= TRUE;
      }
    }
    else
      ret= TRUE;                                /* already registered */
    unlock();
    return ret;
  }
  
  /**
    Unregister an observer.

    @param observer  observer pointer previously passed to add_observer()
    @param plugin    not used by this function

    @return FALSE if the observer was found and removed.  TRUE if the
            delegate is not initialized or the observer is not registered.
  */
  int remove_observer(void *observer, st_plugin_int *plugin)
  {
    int ret= FALSE;
    if (!inited)
      return TRUE;
    write_lock();
    Observer_info_iterator iter(observer_info_list);
    Observer_info *info= iter++;
    while (info && info->observer != observer)
      info= iter++;
    if (info)
    {
      /* Unlink the list node first, then free the entry itself. */
      iter.remove();
      delete info;
    }
    else
      ret= TRUE;
    unlock();
    return ret;
  }

  /**
    Return a fresh iterator over the observer list.  No locking is done
    here.  NOTE(review): callers presumably hold read_lock() while
    iterating -- confirm at the call sites.
  */
  inline Observer_info_iterator observer_info_iter()
  {
    return Observer_info_iterator(observer_info_list);
  }

  /** Return true if no observer is registered.  Takes no lock. */
  inline bool is_empty()
  {
    return observer_info_list.is_empty();
  }

  /**
    Acquire the lock for reading.
    @return TRUE if the delegate is not initialized, otherwise the result
            of rw_rdlock().
  */
  inline int read_lock()
  {
    if (!inited)
      return TRUE;
    return rw_rdlock(&lock);
  }

  /**
    Acquire the lock for writing.
    @return TRUE if the delegate is not initialized, otherwise the result
            of rw_wrlock().
  */
  inline int write_lock()
  {
    if (!inited)
      return TRUE;
    return rw_wrlock(&lock);
  }

  /**
    Release the lock.
    @return TRUE if the delegate is not initialized, otherwise the result
            of rw_unlock().
  */
  inline int unlock()
  {
    if (!inited)
      return TRUE;
    return rw_unlock(&lock);
  }

  /** Return true if the constructor completed initialization. */
  inline bool is_inited()
  {
    return inited;
  }
  
  /**
    Initialize the lock and the MEM_ROOT used for list nodes.  If the lock
    cannot be initialized, the object is left with inited == FALSE and the
    MEM_ROOT is not set up.
  */
  Delegate()
  {
    inited= FALSE;
    if (my_rwlock_init(&lock, NULL))
      return;
    init_sql_alloc(&memroot, 1024, 0);
    inited= TRUE;
  }

  /**
    Release the lock and the MEM_ROOT, but only if the constructor
    actually initialized them; otherwise both are still uninitialized and
    must not be touched.
  */
  ~Delegate()
  {
    if (!inited)
      return;
    inited= FALSE;
    rwlock_destroy(&lock);
    free_root(&memroot, MYF(0));
  }

private:
  Observer_info_list observer_info_list;  /* registered observers */
  rw_lock_t lock;                         /* guards observer_info_list */
  MEM_ROOT memroot;                       /* backing store for list nodes */
  bool inited;                            /* set once lock and memroot are ready */
};

/**
  Delegate whose observer type is Trans_observer.

  The four hook entry points are only declared here; their definitions
  are not part of this header.  NOTE(review): each presumably forwards
  the call to the registered observers -- confirm in the implementation
  file.
*/
class Trans_delegate : public Delegate
{
public:
  typedef Trans_observer Observer;

  int before_commit(THD *thd, bool all);
  int before_rollback(THD *thd, bool all);

  int after_commit(THD *thd, bool all);
  int after_rollback(THD *thd, bool all);
};

/**
  Delegate whose observer type is Binlog_storage_observer.

  after_flush() is only declared here; its definition is not part of
  this header.
*/
class Binlog_storage_delegate : public Delegate
{
public:
  typedef Binlog_storage_observer Observer;

  int after_flush(THD *thd,
                  const char *log_file,
                  my_off_t log_pos,
                  bool synced);
};

#ifdef HAVE_REPLICATION
/**
  Delegate whose observer type is Binlog_transmit_observer.

  All hook entry points are only declared here; their definitions are
  not part of this header.
*/
class Binlog_transmit_delegate : public Delegate
{
public:
  typedef Binlog_transmit_observer Observer;

  int transmit_start(THD *thd, ushort flags,
                     const char *log_file, my_off_t log_pos);
  int transmit_stop(THD *thd, ushort flags);

  int reserve_header(THD *thd, ushort flags, String *packet);

  int before_send_event(THD *thd, ushort flags, String *packet,
                        const char *log_file, my_off_t log_pos);
  int after_send_event(THD *thd, ushort flags, String *packet);

  int after_reset_master(THD *thd, ushort flags);
};

/**
  Delegate whose observer type is Binlog_relay_IO_observer.

  All member functions are only declared here; their definitions are
  not part of this header.
*/
class Binlog_relay_IO_delegate : public Delegate
{
public:
  typedef Binlog_relay_IO_observer Observer;

  int thread_start(THD *thd, Master_info *mi);
  int thread_stop(THD *thd, Master_info *mi);

  int before_request_transmit(THD *thd, Master_info *mi, ushort flags);

  int after_read_event(THD *thd, Master_info *mi,
                       const char *packet, ulong len,
                       const char **event_buf, ulong *event_len);
  int after_queue_event(THD *thd, Master_info *mi,
                        const char *event_buf, ulong event_len,
                        bool synced);

  int after_reset_slave(THD *thd, Master_info *mi);

private:
  /*
    Private helper taking the parameter block and the Master_info.
    NOTE(review): presumably fills *param from *mi before a hook is
    invoked -- confirm in the implementation file.
  */
  void init_param(Binlog_relay_IO_param *param, Master_info *mi);
};
#endif /* HAVE_REPLICATION */

/*
  Set-up and tear-down entry points for the delegate objects; both are
  defined outside this header.  NOTE(review): delegates_init() presumably
  returns non-zero on failure -- confirm in the implementation file.
*/
int delegates_init();
void delegates_destroy();

/*
  Global delegate pointers, defined outside this header.  RUN_HOOK() pastes
  its `group` argument onto the `_delegate` suffix, so the usable group
  names are the prefixes of these identifiers.  The two binlog transmit and
  relay I/O delegates exist only when HAVE_REPLICATION is defined.
*/
extern Trans_delegate *transaction_delegate;
extern Binlog_storage_delegate *binlog_storage_delegate;
#ifdef HAVE_REPLICATION
extern Binlog_transmit_delegate *binlog_transmit_delegate;
extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
#endif /* HAVE_REPLICATION */

/*
  Invoke hook `hook` on the delegate named `group`_delegate, passing the
  parenthesized argument list `args`.

  If the delegate has no observers, the hook is not called at all and
  the whole expression evaluates to 0.  The emptiness test is done
  through is_empty(), which takes no lock.
*/
#define RUN_HOOK(group, hook, args)                     \
  ((group ## _delegate)->is_empty()                     \
     ? 0                                                \
     : (group ## _delegate)->hook args)

#endif /* RPL_HANDLER_H */