diff options
Diffstat (limited to 'sql/rpl_handler.cc')
-rw-r--r-- | sql/rpl_handler.cc | 79 |
1 files changed, 18 insertions, 61 deletions
diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc index a706fcd37ee..34d3df23435 100644 --- a/sql/rpl_handler.cc +++ b/sql/rpl_handler.cc @@ -170,40 +170,16 @@ void delegates_destroy() /* This macro is used by almost all the Delegate methods to iterate over all the observers running given callback function of the - delegate . - - Add observer plugins to the thd->lex list, after each statement, all - plugins add to thd->lex will be automatically unlocked. + delegate. */ -#define FOREACH_OBSERVER(r, f, thd, args) \ +#define FOREACH_OBSERVER(r, f, do_lock, args) \ param.server_id= thd->variables.server_id; \ - /* - Use a struct to make sure that they are allocated adjacent, check - delete_dynamic(). - */ \ - struct { \ - DYNAMIC_ARRAY plugins; \ - /* preallocate 8 slots */ \ - plugin_ref plugins_buffer[8]; \ - } s; \ - DYNAMIC_ARRAY *plugins= &s.plugins; \ - plugin_ref *plugins_buffer= s.plugins_buffer; \ - init_dynamic_array2(plugins, sizeof(plugin_ref), \ - plugins_buffer, 8, 8, MYF(0)); \ read_lock(); \ Observer_info_iterator iter= observer_info_iter(); \ Observer_info *info= iter++; \ for (; info; info= iter++) \ { \ - plugin_ref plugin= \ - my_plugin_lock(0, info->plugin); \ - if (!plugin) \ - { \ - /* plugin is not intialized or deleted, this is not an error */ \ - r= 0; \ - break; \ - } \ - insert_dynamic(plugins, (uchar *)&plugin); \ + if (do_lock) plugin_lock(thd, plugin_int_to_ref(info->plugin_int)); \ if (((Observer *)info->observer)->f \ && ((Observer *)info->observer)->f args) \ { \ @@ -213,17 +189,7 @@ void delegates_destroy() break; \ } \ } \ - unlock(); \ - /* - Unlock plugins should be done after we released the Delegate lock - to avoid possible deadlock when this is the last user of the - plugin, and when we unlock the plugin, it will try to - deinitialize the plugin, which will try to lock the Delegate in - order to remove the observers. - */ \ - plugin_unlock_list(0, (plugin_ref*)plugins->buffer, \ - plugins->elements); \ - delete_dynamic(plugins) + unlock(); int Trans_delegate::after_commit(THD *thd, bool all) @@ -240,7 +206,7 @@ int Trans_delegate::after_commit(THD *thd, bool all) param.log_pos= log_info ? log_info->log_pos : 0; int ret= 0; - FOREACH_OBSERVER(ret, after_commit, thd, (¶m)); + FOREACH_OBSERVER(ret, after_commit, false, (¶m)); /* This is the end of a real transaction or autocommit statement, we @@ -268,7 +234,7 @@ int Trans_delegate::after_rollback(THD *thd, bool all) param.log_pos= log_info ? log_info->log_pos : 0; int ret= 0; - FOREACH_OBSERVER(ret, after_rollback, thd, (¶m)); + FOREACH_OBSERVER(ret, after_rollback, false, (¶m)); /* This is the end of a real transaction or autocommit statement, we @@ -307,7 +273,7 @@ int Binlog_storage_delegate::after_flush(THD *thd, log_info->log_pos = log_pos; int ret= 0; - FOREACH_OBSERVER(ret, after_flush, thd, + FOREACH_OBSERVER(ret, after_flush, false, (¶m, log_info->log_file, log_info->log_pos, flags)); return ret; } @@ -321,7 +287,7 @@ int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags, param.flags= flags; int ret= 0; - FOREACH_OBSERVER(ret, transmit_start, thd, (¶m, log_file, log_pos)); + FOREACH_OBSERVER(ret, transmit_start, true, (¶m, log_file, log_pos)); return ret; } @@ -331,7 +297,7 @@ int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags) param.flags= flags; int ret= 0; - FOREACH_OBSERVER(ret, transmit_stop, thd, (¶m)); + FOREACH_OBSERVER(ret, transmit_stop, false, (¶m)); return ret; } @@ -356,13 +322,6 @@ int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags, Observer_info *info= iter++; for (; info; info= iter++) { - plugin_ref plugin= - my_plugin_lock(thd, info->plugin); - if (!plugin) - { - ret= 1; - break; - } hlen= 0; if (((Observer *)info->observer)->reserve_header && ((Observer *)info->observer)->reserve_header(¶m, @@ -371,10 +330,8 @@ int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags, &hlen)) { ret= 1; - plugin_unlock(thd, plugin); break; } - plugin_unlock(thd, plugin); if (hlen == 0) continue; if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen)) @@ -396,7 +353,7 @@ int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags, param.flags= flags; int ret= 0; - FOREACH_OBSERVER(ret, before_send_event, thd, + FOREACH_OBSERVER(ret, before_send_event, false, (¶m, (uchar *)packet->c_ptr(), packet->length(), log_file+dirname_length(log_file), log_pos)); @@ -410,7 +367,7 @@ int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags, param.flags= flags; int ret= 0; - FOREACH_OBSERVER(ret, after_send_event, thd, + FOREACH_OBSERVER(ret, after_send_event, false, (¶m, packet->c_ptr(), packet->length())); return ret; } @@ -422,7 +379,7 @@ int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags) param.flags= flags; int ret= 0; - FOREACH_OBSERVER(ret, after_reset_master, thd, (¶m)); + FOREACH_OBSERVER(ret, after_reset_master, false, (¶m)); return ret; } @@ -443,7 +400,7 @@ int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi) init_param(¶m, mi); int ret= 0; - FOREACH_OBSERVER(ret, thread_start, thd, (¶m)); + FOREACH_OBSERVER(ret, thread_start, true, (¶m)); return ret; } @@ -455,7 +412,7 @@ int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi) init_param(¶m, mi); int ret= 0; - FOREACH_OBSERVER(ret, thread_stop, thd, (¶m)); + FOREACH_OBSERVER(ret, thread_stop, false, (¶m)); return ret; } @@ -467,7 +424,7 @@ int Binlog_relay_IO_delegate::before_request_transmit(THD *thd, init_param(¶m, mi); int ret= 0; - FOREACH_OBSERVER(ret, before_request_transmit, thd, (¶m, (uint32)flags)); + FOREACH_OBSERVER(ret, before_request_transmit, false, (¶m, (uint32)flags)); return ret; } @@ -480,7 +437,7 @@ int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi, init_param(¶m, mi); int ret= 0; - FOREACH_OBSERVER(ret, after_read_event, thd, + FOREACH_OBSERVER(ret, after_read_event, false, (¶m, packet, len, event_buf, event_len)); return ret; } @@ -498,7 +455,7 @@ int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi, flags |= BINLOG_STORAGE_IS_SYNCED; int ret= 0; - FOREACH_OBSERVER(ret, after_queue_event, thd, + FOREACH_OBSERVER(ret, after_queue_event, false, (¶m, event_buf, event_len, flags)); return ret; } @@ -510,7 +467,7 @@ int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi) init_param(¶m, mi); int ret= 0; - FOREACH_OBSERVER(ret, after_reset_slave, thd, (¶m)); + FOREACH_OBSERVER(ret, after_reset_slave, false, (¶m)); return ret; } #endif /* HAVE_REPLICATION */ |