summaryrefslogtreecommitdiff
path: root/sql/rpl_handler.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_handler.cc')
-rw-r--r--sql/rpl_handler.cc79
1 files changed, 18 insertions, 61 deletions
diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc
index a706fcd37ee..34d3df23435 100644
--- a/sql/rpl_handler.cc
+++ b/sql/rpl_handler.cc
@@ -170,40 +170,16 @@ void delegates_destroy()
/*
This macro is used by almost all the Delegate methods to iterate
over all the observers running given callback function of the
- delegate .
-
- Add observer plugins to the thd->lex list, after each statement, all
- plugins add to thd->lex will be automatically unlocked.
+ delegate.
*/
-#define FOREACH_OBSERVER(r, f, thd, args) \
+#define FOREACH_OBSERVER(r, f, do_lock, args) \
param.server_id= thd->variables.server_id; \
- /*
- Use a struct to make sure that they are allocated adjacent, check
- delete_dynamic().
- */ \
- struct { \
- DYNAMIC_ARRAY plugins; \
- /* preallocate 8 slots */ \
- plugin_ref plugins_buffer[8]; \
- } s; \
- DYNAMIC_ARRAY *plugins= &s.plugins; \
- plugin_ref *plugins_buffer= s.plugins_buffer; \
- init_dynamic_array2(plugins, sizeof(plugin_ref), \
- plugins_buffer, 8, 8, MYF(0)); \
read_lock(); \
Observer_info_iterator iter= observer_info_iter(); \
Observer_info *info= iter++; \
for (; info; info= iter++) \
{ \
- plugin_ref plugin= \
- my_plugin_lock(0, info->plugin); \
- if (!plugin) \
- { \
- /* plugin is not intialized or deleted, this is not an error */ \
- r= 0; \
- break; \
- } \
- insert_dynamic(plugins, (uchar *)&plugin); \
+ if (do_lock) plugin_lock(thd, plugin_int_to_ref(info->plugin_int)); \
if (((Observer *)info->observer)->f \
&& ((Observer *)info->observer)->f args) \
{ \
@@ -213,17 +189,7 @@ void delegates_destroy()
break; \
} \
} \
- unlock(); \
- /*
- Unlock plugins should be done after we released the Delegate lock
- to avoid possible deadlock when this is the last user of the
- plugin, and when we unlock the plugin, it will try to
- deinitialize the plugin, which will try to lock the Delegate in
- order to remove the observers.
- */ \
- plugin_unlock_list(0, (plugin_ref*)plugins->buffer, \
- plugins->elements); \
- delete_dynamic(plugins)
+ unlock();
int Trans_delegate::after_commit(THD *thd, bool all)
@@ -240,7 +206,7 @@ int Trans_delegate::after_commit(THD *thd, bool all)
param.log_pos= log_info ? log_info->log_pos : 0;
int ret= 0;
- FOREACH_OBSERVER(ret, after_commit, thd, (&param));
+ FOREACH_OBSERVER(ret, after_commit, false, (&param));
/*
This is the end of a real transaction or autocommit statement, we
@@ -268,7 +234,7 @@ int Trans_delegate::after_rollback(THD *thd, bool all)
param.log_pos= log_info ? log_info->log_pos : 0;
int ret= 0;
- FOREACH_OBSERVER(ret, after_rollback, thd, (&param));
+ FOREACH_OBSERVER(ret, after_rollback, false, (&param));
/*
This is the end of a real transaction or autocommit statement, we
@@ -307,7 +273,7 @@ int Binlog_storage_delegate::after_flush(THD *thd,
log_info->log_pos = log_pos;
int ret= 0;
- FOREACH_OBSERVER(ret, after_flush, thd,
+ FOREACH_OBSERVER(ret, after_flush, false,
(&param, log_info->log_file, log_info->log_pos, flags));
return ret;
}
@@ -321,7 +287,7 @@ int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
param.flags= flags;
int ret= 0;
- FOREACH_OBSERVER(ret, transmit_start, thd, (&param, log_file, log_pos));
+ FOREACH_OBSERVER(ret, transmit_start, true, (&param, log_file, log_pos));
return ret;
}
@@ -331,7 +297,7 @@ int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
param.flags= flags;
int ret= 0;
- FOREACH_OBSERVER(ret, transmit_stop, thd, (&param));
+ FOREACH_OBSERVER(ret, transmit_stop, false, (&param));
return ret;
}
@@ -356,13 +322,6 @@ int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
Observer_info *info= iter++;
for (; info; info= iter++)
{
- plugin_ref plugin=
- my_plugin_lock(thd, info->plugin);
- if (!plugin)
- {
- ret= 1;
- break;
- }
hlen= 0;
if (((Observer *)info->observer)->reserve_header
&& ((Observer *)info->observer)->reserve_header(&param,
@@ -371,10 +330,8 @@ int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
&hlen))
{
ret= 1;
- plugin_unlock(thd, plugin);
break;
}
- plugin_unlock(thd, plugin);
if (hlen == 0)
continue;
if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
@@ -396,7 +353,7 @@ int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
param.flags= flags;
int ret= 0;
- FOREACH_OBSERVER(ret, before_send_event, thd,
+ FOREACH_OBSERVER(ret, before_send_event, false,
(&param, (uchar *)packet->c_ptr(),
packet->length(),
log_file+dirname_length(log_file), log_pos));
@@ -410,7 +367,7 @@ int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
param.flags= flags;
int ret= 0;
- FOREACH_OBSERVER(ret, after_send_event, thd,
+ FOREACH_OBSERVER(ret, after_send_event, false,
(&param, packet->c_ptr(), packet->length()));
return ret;
}
@@ -422,7 +379,7 @@ int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
param.flags= flags;
int ret= 0;
- FOREACH_OBSERVER(ret, after_reset_master, thd, (&param));
+ FOREACH_OBSERVER(ret, after_reset_master, false, (&param));
return ret;
}
@@ -443,7 +400,7 @@ int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
init_param(&param, mi);
int ret= 0;
- FOREACH_OBSERVER(ret, thread_start, thd, (&param));
+ FOREACH_OBSERVER(ret, thread_start, true, (&param));
return ret;
}
@@ -455,7 +412,7 @@ int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
init_param(&param, mi);
int ret= 0;
- FOREACH_OBSERVER(ret, thread_stop, thd, (&param));
+ FOREACH_OBSERVER(ret, thread_stop, false, (&param));
return ret;
}
@@ -467,7 +424,7 @@ int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
init_param(&param, mi);
int ret= 0;
- FOREACH_OBSERVER(ret, before_request_transmit, thd, (&param, (uint32)flags));
+ FOREACH_OBSERVER(ret, before_request_transmit, false, (&param, (uint32)flags));
return ret;
}
@@ -480,7 +437,7 @@ int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
init_param(&param, mi);
int ret= 0;
- FOREACH_OBSERVER(ret, after_read_event, thd,
+ FOREACH_OBSERVER(ret, after_read_event, false,
(&param, packet, len, event_buf, event_len));
return ret;
}
@@ -498,7 +455,7 @@ int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
flags |= BINLOG_STORAGE_IS_SYNCED;
int ret= 0;
- FOREACH_OBSERVER(ret, after_queue_event, thd,
+ FOREACH_OBSERVER(ret, after_queue_event, false,
(&param, event_buf, event_len, flags));
return ret;
}
@@ -510,7 +467,7 @@ int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
init_param(&param, mi);
int ret= 0;
- FOREACH_OBSERVER(ret, after_reset_slave, thd, (&param));
+ FOREACH_OBSERVER(ret, after_reset_slave, false, (&param));
return ret;
}
#endif /* HAVE_REPLICATION */