summaryrefslogtreecommitdiff
path: root/storage/federatedx
diff options
context:
space:
mode:
Diffstat (limited to 'storage/federatedx')
-rw-r--r--storage/federatedx/federatedx_io_mysql.cc5
-rw-r--r--storage/federatedx/federatedx_txn.cc8
-rw-r--r--storage/federatedx/ha_federatedx.cc48
-rw-r--r--storage/federatedx/ha_federatedx.h3
4 files changed, 39 insertions, 25 deletions
diff --git a/storage/federatedx/federatedx_io_mysql.cc b/storage/federatedx/federatedx_io_mysql.cc
index 8f027e1b5e0..54059c0ecff 100644
--- a/storage/federatedx/federatedx_io_mysql.cc
+++ b/storage/federatedx/federatedx_io_mysql.cc
@@ -120,6 +120,7 @@ public:
void *ref);
virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
const void *ref);
+ virtual void set_thd(void *thd);
};
@@ -648,3 +649,7 @@ int federatedx_io_mysql::seek_position(FEDERATEDX_IO_RESULT **io_result,
return 0;
}
+void federatedx_io_mysql::set_thd(void *thd)
+{
+ mysql.net.thd= thd;
+}
diff --git a/storage/federatedx/federatedx_txn.cc b/storage/federatedx/federatedx_txn.cc
index 232ac335dfc..220896cc2a4 100644
--- a/storage/federatedx/federatedx_txn.cc
+++ b/storage/federatedx/federatedx_txn.cc
@@ -93,8 +93,8 @@ void federatedx_txn::close(FEDERATEDX_SERVER *server)
}
-int federatedx_txn::acquire(FEDERATEDX_SHARE *share, bool readonly,
- federatedx_io **ioptr)
+int federatedx_txn::acquire(FEDERATEDX_SHARE *share, void *thd,
+ bool readonly, federatedx_io **ioptr)
{
federatedx_io *io;
FEDERATEDX_SERVER *server= share->s;
@@ -131,6 +131,7 @@ int federatedx_txn::acquire(FEDERATEDX_SHARE *share, bool readonly,
io->busy= TRUE;
io->owner_ptr= ioptr;
+ io->set_thd(thd);
}
DBUG_ASSERT(io->busy && io->server == server);
@@ -157,7 +158,10 @@ void federatedx_txn::release(federatedx_io **ioptr)
io->active, io->is_autocommit()));
if (io->is_autocommit())
+ {
+ io->set_thd(NULL);
io->active= FALSE;
+ }
}
release_scan();
diff --git a/storage/federatedx/ha_federatedx.cc b/storage/federatedx/ha_federatedx.cc
index e3506e1a4df..ed97c08ddf7 100644
--- a/storage/federatedx/ha_federatedx.cc
+++ b/storage/federatedx/ha_federatedx.cc
@@ -1340,6 +1340,7 @@ bool ha_federatedx::create_where_from_key(String *to,
}
break;
}
+ /* fall through */
case HA_READ_KEY_OR_NEXT:
DBUG_PRINT("info", ("federatedx HA_READ_KEY_OR_NEXT %d", i));
if (emit_key_part_name(&tmp, key_part) ||
@@ -1359,6 +1360,7 @@ bool ha_federatedx::create_where_from_key(String *to,
goto err;
break;
}
+ /* fall through */
case HA_READ_KEY_OR_PREV:
DBUG_PRINT("info", ("federatedx HA_READ_KEY_OR_PREV %d", i));
if (emit_key_part_name(&tmp, key_part) ||
@@ -1760,7 +1762,7 @@ int ha_federatedx::open(const char *name, int mode, uint test_if_locked)
txn= get_txn(thd);
- if ((error= txn->acquire(share, TRUE, &io)))
+ if ((error= txn->acquire(share, thd, TRUE, &io)))
{
free_share(txn, share);
DBUG_RETURN(error);
@@ -2045,7 +2047,7 @@ int ha_federatedx::write_row(uchar *buf)
/* we always want to append this, even if there aren't any fields */
values_string.append(STRING_WITH_LEN(") "));
- if ((error= txn->acquire(share, FALSE, &io)))
+ if ((error= txn->acquire(share, ha_thd(), FALSE, &io)))
DBUG_RETURN(error);
if (use_bulk_insert)
@@ -2134,7 +2136,7 @@ void ha_federatedx::start_bulk_insert(ha_rows rows, uint flags)
Make sure we have an open connection so that we know the
maximum packet size.
*/
- if (txn->acquire(share, FALSE, &io))
+ if (txn->acquire(share, ha_thd(), FALSE, &io))
DBUG_VOID_RETURN;
page_size= (uint) my_getpagesize();
@@ -2165,7 +2167,7 @@ int ha_federatedx::end_bulk_insert()
if (bulk_insert.str && bulk_insert.length && !table_will_be_deleted)
{
- if ((error= txn->acquire(share, FALSE, &io)))
+ if ((error= txn->acquire(share, ha_thd(), FALSE, &io)))
DBUG_RETURN(error);
if (io->query(bulk_insert.str, bulk_insert.length))
error= stash_remote_error();
@@ -2217,7 +2219,7 @@ int ha_federatedx::optimize(THD* thd, HA_CHECK_OPT* check_opt)
DBUG_ASSERT(txn == get_txn(thd));
- if ((error= txn->acquire(share, FALSE, &io)))
+ if ((error= txn->acquire(share, thd, FALSE, &io)))
DBUG_RETURN(error);
if (io->query(query.ptr(), query.length()))
@@ -2249,7 +2251,7 @@ int ha_federatedx::repair(THD* thd, HA_CHECK_OPT* check_opt)
DBUG_ASSERT(txn == get_txn(thd));
- if ((error= txn->acquire(share, FALSE, &io)))
+ if ((error= txn->acquire(share, thd, FALSE, &io)))
DBUG_RETURN(error);
if (io->query(query.ptr(), query.length()))
@@ -2408,7 +2410,7 @@ int ha_federatedx::update_row(const uchar *old_data, uchar *new_data)
if (!has_a_primary_key)
update_string.append(STRING_WITH_LEN(" LIMIT 1"));
- if ((error= txn->acquire(share, FALSE, &io)))
+ if ((error= txn->acquire(share, ha_thd(), FALSE, &io)))
DBUG_RETURN(error);
if (io->query(update_string.ptr(), update_string.length()))
@@ -2486,7 +2488,7 @@ int ha_federatedx::delete_row(const uchar *buf)
DBUG_PRINT("info",
("Delete sql: %s", delete_string.c_ptr_quick()));
- if ((error= txn->acquire(share, FALSE, &io)))
+ if ((error= txn->acquire(share, ha_thd(), FALSE, &io)))
DBUG_RETURN(error);
if (io->query(delete_string.ptr(), delete_string.length()))
@@ -2595,7 +2597,7 @@ int ha_federatedx::index_read_idx_with_result_set(uchar *buf, uint index,
NULL, 0, 0);
sql_query.append(index_string);
- if ((retval= txn->acquire(share, TRUE, &io)))
+ if ((retval= txn->acquire(share, ha_thd(), TRUE, &io)))
DBUG_RETURN(retval);
if (io->query(sql_query.ptr(), sql_query.length()))
@@ -2675,7 +2677,7 @@ int ha_federatedx::read_range_first(const key_range *start_key,
&table->key_info[active_index],
start_key, end_key, 0, eq_range_arg);
- if ((retval= txn->acquire(share, TRUE, &io)))
+ if ((retval= txn->acquire(share, ha_thd(), TRUE, &io)))
DBUG_RETURN(retval);
if (stored_result)
@@ -2775,7 +2777,7 @@ int ha_federatedx::rnd_init(bool scan)
{
int error;
- if ((error= txn->acquire(share, TRUE, &io)))
+ if ((error= txn->acquire(share, ha_thd(), TRUE, &io)))
DBUG_RETURN(error);
if (stored_result)
@@ -2822,7 +2824,7 @@ int ha_federatedx::free_result()
else
{
federatedx_io *tmp_io= 0, **iop;
- if (!*(iop= &io) && (error= txn->acquire(share, TRUE, (iop= &tmp_io))))
+ if (!*(iop= &io) && (error= txn->acquire(share, ha_thd(), TRUE, (iop= &tmp_io))))
{
DBUG_ASSERT(0); // Fail when testing
insert_dynamic(&results, (uchar*) &stored_result);
@@ -2902,7 +2904,7 @@ int ha_federatedx::read_next(uchar *buf, FEDERATEDX_IO_RESULT *result)
FEDERATEDX_IO_ROW *row;
DBUG_ENTER("ha_federatedx::read_next");
- if ((retval= txn->acquire(share, TRUE, &io)))
+ if ((retval= txn->acquire(share, ha_thd(), TRUE, &io)))
DBUG_RETURN(retval);
/* Fetch a row, insert it back in a row format. */
@@ -2947,7 +2949,7 @@ void ha_federatedx::position(const uchar *record __attribute__ ((unused)))
DBUG_VOID_RETURN;
}
- if (txn->acquire(share, TRUE, &io))
+ if (txn->acquire(share, ha_thd(), TRUE, &io))
DBUG_VOID_RETURN;
io->mark_position(stored_result, ref);
@@ -2976,7 +2978,7 @@ int ha_federatedx::rnd_pos(uchar *buf, uchar *pos)
/* We have to move this to 'ref' to get things aligned */
bmove(ref, pos, ref_length);
- if ((retval= txn->acquire(share, TRUE, &io)))
+ if ((retval= txn->acquire(share, ha_thd(), TRUE, &io)))
goto error;
if ((retval= io->seek_position(&result, ref)))
@@ -3050,7 +3052,7 @@ int ha_federatedx::info(uint flag)
/* we want not to show table status if not needed to do so */
if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST | HA_STATUS_AUTO))
{
- if (!*(iop= &io) && (error_code= tmp_txn->acquire(share, TRUE, (iop= &tmp_io))))
+ if (!*(iop= &io) && (error_code= tmp_txn->acquire(share, thd, TRUE, (iop= &tmp_io))))
goto fail;
}
@@ -3151,6 +3153,7 @@ int ha_federatedx::extra(ha_extra_function operation)
int ha_federatedx::reset(void)
{
+ THD *thd= ha_thd();
int error = 0;
insert_dup_update= FALSE;
@@ -3168,9 +3171,9 @@ int ha_federatedx::reset(void)
federatedx_io *tmp_io= 0, **iop;
// external_lock may not have been called so txn may not be set
- tmp_txn= get_txn(ha_thd());
+ tmp_txn= get_txn(thd);
- if (!*(iop= &io) && (error= tmp_txn->acquire(share, TRUE, (iop= &tmp_io))))
+ if (!*(iop= &io) && (error= tmp_txn->acquire(share, thd, TRUE, (iop= &tmp_io))))
{
DBUG_ASSERT(0); // Fail when testing
return error;
@@ -3204,6 +3207,7 @@ int ha_federatedx::reset(void)
int ha_federatedx::delete_all_rows()
{
+ THD *thd= ha_thd();
char query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
int error;
@@ -3217,14 +3221,14 @@ int ha_federatedx::delete_all_rows()
ident_quote_char);
/* no need for savepoint in autocommit mode */
- if (!(ha_thd()->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
+ if (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
txn->stmt_autocommit();
/*
TRUNCATE won't return anything in mysql_affected_rows
*/
- if ((error= txn->acquire(share, FALSE, &io)))
+ if ((error= txn->acquire(share, thd, FALSE, &io)))
DBUG_RETURN(error);
if (io->query(query.ptr(), query.length()))
@@ -3369,7 +3373,7 @@ int ha_federatedx::create(const char *name, TABLE *table_arg,
if (tmp_share.s)
{
tmp_txn= get_txn(thd);
- if (!(retval= tmp_txn->acquire(&tmp_share, TRUE, &tmp_io)))
+ if (!(retval= tmp_txn->acquire(&tmp_share, thd, TRUE, &tmp_io)))
{
retval= test_connection(thd, tmp_io, &tmp_share);
tmp_txn->release(&tmp_io);
@@ -3466,7 +3470,7 @@ int ha_federatedx::external_lock(MYSQL_THD thd, int lock_type)
{
table_will_be_deleted = FALSE;
txn= get_txn(thd);
- if (!(error= txn->acquire(share, lock_type == F_RDLCK, &io)) &&
+ if (!(error= txn->acquire(share, ha_thd(), lock_type == F_RDLCK, &io)) &&
(lock_type == F_WRLCK || !io->is_autocommit()))
{
if (!thd_test_options(thd, (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
diff --git a/storage/federatedx/ha_federatedx.h b/storage/federatedx/ha_federatedx.h
index f8531014ee4..abf25abe0c5 100644
--- a/storage/federatedx/ha_federatedx.h
+++ b/storage/federatedx/ha_federatedx.h
@@ -215,6 +215,7 @@ public:
void *ref)=0;
virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
const void *ref)=0;
+ virtual void set_thd(void *thd) { }
};
@@ -233,7 +234,7 @@ public:
bool has_connections() const { return txn_list != NULL; }
bool in_transaction() const { return savepoint_next != 0; }
- int acquire(FEDERATEDX_SHARE *share, bool readonly, federatedx_io **io);
+ int acquire(FEDERATEDX_SHARE *share, void *thd, bool readonly, federatedx_io **io);
void release(federatedx_io **io);
void close(FEDERATEDX_SERVER *);