summaryrefslogtreecommitdiff
path: root/storage/federatedx
diff options
context:
space:
mode:
authorMichael Widenius <monty@askmonty.org>2010-08-25 01:44:50 +0300
committerMichael Widenius <monty@askmonty.org>2010-08-25 01:44:50 +0300
commit58a75bb18b2a4080c8fae77024afed37f1be1314 (patch)
tree4d9d65f5747636d6dea9295e79464108df24a9a2 /storage/federatedx
parent99b79db5dca6909456a40d859298ba3992b145da (diff)
parenta82671178919afba86ddfdf2b64321eb9afff8e5 (diff)
downloadmariadb-git-58a75bb18b2a4080c8fae77024afed37f1be1314.tar.gz
Automerge with 5.1
Diffstat (limited to 'storage/federatedx')
-rw-r--r--storage/federatedx/federatedx_io_mysql.cc69
-rw-r--r--storage/federatedx/federatedx_io_null.cc22
-rw-r--r--storage/federatedx/ha_federatedx.cc202
-rw-r--r--storage/federatedx/ha_federatedx.h51
4 files changed, 250 insertions, 94 deletions
diff --git a/storage/federatedx/federatedx_io_mysql.cc b/storage/federatedx/federatedx_io_mysql.cc
index 5245395b060..d6844fab2c6 100644
--- a/storage/federatedx/federatedx_io_mysql.cc
+++ b/storage/federatedx/federatedx_io_mysql.cc
@@ -1,4 +1,4 @@
-/*
+/*
Copyright (c) 2007, Antony T Curtis
All rights reserved.
@@ -51,6 +51,12 @@ typedef struct federatedx_savepoint
uint flags;
} SAVEPT;
+struct mysql_position
+{
+ MYSQL_RES* result;
+ MYSQL_ROW_OFFSET offset;
+};
+
class federatedx_io_mysql :public federatedx_io
{
@@ -76,16 +82,16 @@ public:
virtual int error_code();
virtual const char *error_str();
-
+
void reset();
int commit();
int rollback();
-
+
int savepoint_set(ulong sp);
ulong savepoint_release(ulong sp);
ulong savepoint_rollback(ulong sp);
void savepoint_restrict(ulong sp);
-
+
ulong last_savepoint() const;
ulong actual_savepoint() const;
bool is_autocommit() const;
@@ -94,7 +100,7 @@ public:
uint table_name_length, uint flag);
/* resultset operations */
-
+
virtual void free_result(FEDERATEDX_IO_RESULT *io_result);
virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result);
virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result);
@@ -104,6 +110,12 @@ public:
unsigned int column);
virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
unsigned int column) const;
+
+ virtual size_t get_ref_length() const;
+ virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
+ void *ref);
+ virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
+ const void *ref);
};
@@ -466,14 +478,13 @@ const char *federatedx_io_mysql::error_str()
return mysql_error(&mysql);
}
-
FEDERATEDX_IO_RESULT *federatedx_io_mysql::store_result()
{
FEDERATEDX_IO_RESULT *result;
DBUG_ENTER("federatedx_io_mysql::store_result");
-
+
result= (FEDERATEDX_IO_RESULT *) mysql_store_result(&mysql);
-
+
DBUG_RETURN(result);
}
@@ -590,3 +601,45 @@ error:
free_result(result);
return 1;
}
+
+
+
+size_t federatedx_io_mysql::get_ref_length() const
+{
+ return sizeof(mysql_position);
+}
+
+
+void federatedx_io_mysql::mark_position(FEDERATEDX_IO_RESULT *io_result,
+ void *ref)
+{
+ MYSQL_ROWS *tmp= 0;
+ mysql_position& pos= *reinterpret_cast<mysql_position*>(ref);
+ pos.result= (MYSQL_RES *) io_result;
+
+ if (pos.result && pos.result->data)
+ {
+ for (tmp= pos.result->data->data;
+ tmp && (tmp->next != pos.result->data_cursor);
+ tmp= tmp->next)
+ {}
+ }
+
+ pos.offset= tmp;
+}
+
+int federatedx_io_mysql::seek_position(FEDERATEDX_IO_RESULT **io_result,
+ const void *ref)
+{
+ const mysql_position& pos= *reinterpret_cast<const mysql_position*>(ref);
+
+ if (!pos.result || !pos.offset)
+ return HA_ERR_END_OF_FILE;
+
+ pos.result->current_row= 0;
+ pos.result->data_cursor= pos.offset;
+ *io_result= (FEDERATEDX_IO_RESULT*) pos.result;
+
+ return 0;
+}
+
diff --git a/storage/federatedx/federatedx_io_null.cc b/storage/federatedx/federatedx_io_null.cc
index cd8fc3eaf85..49f93ab6546 100644
--- a/storage/federatedx/federatedx_io_null.cc
+++ b/storage/federatedx/federatedx_io_null.cc
@@ -96,6 +96,11 @@ public:
unsigned int column);
virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
unsigned int column) const;
+ virtual size_t get_ref_length() const;
+ virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
+ void *ref);
+ virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
+ const void *ref);
};
@@ -275,3 +280,20 @@ bool federatedx_io_null::table_metadata(ha_statistics *stats,
return 0;
}
+
+size_t federatedx_io_null::get_ref_length() const
+{
+ return sizeof(int);
+}
+
+
+void federatedx_io_null::mark_position(FEDERATEDX_IO_RESULT *io_result,
+ void *ref)
+{
+}
+
+int federatedx_io_null::seek_position(FEDERATEDX_IO_RESULT **io_result,
+ const void *ref)
+{
+ return 0;
+}
diff --git a/storage/federatedx/ha_federatedx.cc b/storage/federatedx/ha_federatedx.cc
index 04ba9984492..8735e5625e6 100644
--- a/storage/federatedx/ha_federatedx.cc
+++ b/storage/federatedx/ha_federatedx.cc
@@ -1717,14 +1717,14 @@ federatedx_txn *ha_federatedx::get_txn(THD *thd, bool no_create)
return *txnp;
}
-
+
int ha_federatedx::disconnect(handlerton *hton, MYSQL_THD thd)
{
federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
delete txn;
return 0;
}
-
+
/*
Used for opening tables. The name will be the name of the file.
@@ -1756,14 +1756,15 @@ int ha_federatedx::open(const char *name, int mode, uint test_if_locked)
free_share(txn, share);
DBUG_RETURN(error);
}
-
+
+ ref_length= io->get_ref_length();
+
txn->release(&io);
-
- ref_length= (table->s->primary_key != MAX_KEY ?
- table->key_info[table->s->primary_key].key_length :
- table->s->reclength);
+
DBUG_PRINT("info", ("ref_length: %u", ref_length));
+ my_init_dynamic_array(&results, sizeof(FEDERATEDX_IO_RESULT*), 4, 4);
+
reset();
DBUG_RETURN(0);
@@ -1788,8 +1789,9 @@ int ha_federatedx::close(void)
DBUG_ENTER("ha_federatedx::close");
/* free the result set */
- if (stored_result)
- retval= free_result();
+ reset();
+
+ delete_dynamic(&results);
/* Disconnect from mysql */
if (!thd || !(txn= get_txn(thd, true)))
@@ -1799,7 +1801,7 @@ int ha_federatedx::close(void)
tmp_txn.release(&io);
DBUG_ASSERT(io == NULL);
-
+
if ((error= free_share(&tmp_txn, share)))
retval= error;
}
@@ -2525,7 +2527,7 @@ int ha_federatedx::index_read_idx(uchar *buf, uint index, const uchar *key,
uint key_len, enum ha_rkey_function find_flag)
{
int retval;
- FEDERATEDX_IO_RESULT *io_result;
+ FEDERATEDX_IO_RESULT *io_result= 0;
DBUG_ENTER("ha_federatedx::index_read_idx");
if ((retval= index_read_idx_with_result_set(buf, index, key,
@@ -2601,7 +2603,7 @@ int ha_federatedx::index_read_idx_with_result_set(uchar *buf, uint index,
if (!(retval= read_next(buf, *result)))
DBUG_RETURN(retval);
- io->free_result(*result);
+ insert_dynamic(&results, (uchar*) result);
*result= 0;
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(retval);
@@ -2669,10 +2671,7 @@ int ha_federatedx::read_range_first(const key_range *start_key,
DBUG_RETURN(retval);
if (stored_result)
- {
- io->free_result(stored_result);
- stored_result= 0;
- }
+ (void) free_result();
if (io->query(sql_query.ptr(), sql_query.length()))
{
@@ -2773,10 +2772,7 @@ int ha_federatedx::rnd_init(bool scan)
DBUG_RETURN(error);
if (stored_result)
- {
- io->free_result(stored_result);
- stored_result= 0;
- }
+ (void) free_result();
if (io->query(share->select_query,
strlen(share->select_query)))
@@ -2803,17 +2799,35 @@ int ha_federatedx::rnd_end()
int ha_federatedx::free_result()
{
int error;
- federatedx_io *tmp_io= 0, **iop;
+ DBUG_ENTER("ha_federatedx::free_result");
DBUG_ASSERT(stored_result);
- if (!*(iop= &io) && (error= txn->acquire(share, TRUE, (iop= &tmp_io))))
+ for (uint i= 0; i < results.elements; ++i)
{
- DBUG_ASSERT(0); // Fail when testing
- return error;
+ FEDERATEDX_IO_RESULT *result= 0;
+ get_dynamic(&results, (uchar*) &result, i);
+ if (result == stored_result)
+ goto end;
}
- (*iop)->free_result(stored_result);
+ if (position_called)
+ {
+ insert_dynamic(&results, (uchar*) &stored_result);
+ }
+ else
+ {
+ federatedx_io *tmp_io= 0, **iop;
+ if (!*(iop= &io) && (error= txn->acquire(share, TRUE, (iop= &tmp_io))))
+ {
+ DBUG_ASSERT(0); // Fail when testing
+ insert_dynamic(&results, (uchar*) &stored_result);
+ goto end;
+ }
+ (*iop)->free_result(stored_result);
+ txn->release(&tmp_io);
+ }
+end:
stored_result= 0;
- txn->release(&tmp_io);
- return 0;
+ position_called= FALSE;
+ DBUG_RETURN(0);
}
int ha_federatedx::index_end(void)
@@ -2862,8 +2876,8 @@ int ha_federatedx::rnd_next(uchar *buf)
SYNOPSIS
field_in_record_is_null()
- buf byte pointer to record
- result mysql result set
+ buf byte pointer to record
+ result mysql result set
DESCRIPTION
This method is a wrapper method that reads one record from a result
@@ -2896,24 +2910,43 @@ int ha_federatedx::read_next(uchar *buf, FEDERATEDX_IO_RESULT *result)
}
-/*
- store reference to current row so that we can later find it for
- a re-read, update or delete.
-
- In case of federatedx, a reference is either a primary key or
- the whole record.
+/**
+ @brief Store a reference to current row.
+
+ @details During a query execution we may have different result sets (RS),
+ e.g. for different ranges. All the RS's used are stored in
+ memory and placed in @c results dynamic array. At the end of
+ execution all stored RS's are freed at once in the
+ @c ha_federated::reset().
+ So, in case of federated, a reference to current row is a
+ stored result address and current data cursor position.
+ As we keep all RS in memory during a query execution,
+ we can get any record using the reference any time until
+ @c ha_federated::reset() is called.
+ TODO: we don't have to store all RS's rows but only those
+ we call @c ha_federated::position() for, so we can free memory
+ where we store other rows in the @c ha_federated::index_end().
+
+ @param[in] record record data (unused)
- Called from filesort.cc, sql_select.cc, sql_delete.cc and sql_update.cc.
*/
-void ha_federatedx::position(const uchar *record)
+void ha_federatedx::position(const uchar *record __attribute__ ((unused)))
{
DBUG_ENTER("ha_federatedx::position");
- if (table->s->primary_key != MAX_KEY)
- key_copy(ref, (uchar *)record, table->key_info + table->s->primary_key,
- ref_length);
- else
- memcpy(ref, record, ref_length);
+
+ bzero(ref, ref_length);
+
+ if (!stored_result)
+ DBUG_VOID_RETURN;
+
+ if (txn->acquire(share, TRUE, &io))
+ DBUG_VOID_RETURN;
+
+ io->mark_position(stored_result, ref);
+
+ position_called= TRUE;
+
DBUG_VOID_RETURN;
}
@@ -2929,23 +2962,23 @@ void ha_federatedx::position(const uchar *record)
int ha_federatedx::rnd_pos(uchar *buf, uchar *pos)
{
- int result;
+ int retval;
+ FEDERATEDX_IO_RESULT *result= stored_result;
DBUG_ENTER("ha_federatedx::rnd_pos");
ha_statistic_increment(&SSV::ha_read_rnd_count);
- if (table->s->primary_key != MAX_KEY)
- {
- /* We have a primary key, so use index_read_idx to find row */
- result= index_read_idx(buf, table->s->primary_key, pos,
- ref_length, HA_READ_KEY_EXACT);
- }
- else
- {
- /* otherwise, get the old record ref as obtained in ::position */
- memcpy(buf, pos, ref_length);
- result= 0;
- }
- table->status= result ? STATUS_NOT_FOUND : 0;
- DBUG_RETURN(result);
+
+ if ((retval= txn->acquire(share, TRUE, &io)))
+ goto error;
+
+ if ((retval= io->seek_position(&result, pos)))
+ goto error;
+
+ retval= read_next(buf, result);
+ DBUG_RETURN(retval);
+
+error:
+ table->status= STATUS_NOT_FOUND;
+ DBUG_RETURN(retval);
}
@@ -2996,15 +3029,20 @@ int ha_federatedx::rnd_pos(uchar *buf, uchar *pos)
int ha_federatedx::info(uint flag)
{
uint error_code;
+ THD *thd= current_thd;
+ federatedx_txn *tmp_txn;
federatedx_io *tmp_io= 0, **iop= 0;
DBUG_ENTER("ha_federatedx::info");
error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
+ // external_lock may not have been called so txn may not be set
+ tmp_txn= get_txn(thd);
+
/* we want not to show table status if not needed to do so */
if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST | HA_STATUS_AUTO))
{
- if (!*(iop= &io) && (error_code= txn->acquire(share, TRUE, (iop= &tmp_io))))
+ if (!*(iop= &io) && (error_code= tmp_txn->acquire(share, TRUE, (iop= &tmp_io))))
goto fail;
}
@@ -3029,14 +3067,14 @@ int ha_federatedx::info(uint flag)
If ::info created it's own transaction, close it. This happens in case
of show table status;
*/
- txn->release(&tmp_io);
+ tmp_txn->release(&tmp_io);
DBUG_RETURN(0);
error:
if (iop && *iop)
{
- my_printf_error((*iop)->error_code(), "Got error: %d : %s", MYF(0),
+ my_printf_error((*iop)->error_code(), "Received error: %d : %s", MYF(0),
(*iop)->error_code(), (*iop)->error_str());
}
else if (remote_error_number != -1 /* error already reported */)
@@ -3045,7 +3083,7 @@ error:
my_error(error_code, MYF(0), ER(error_code));
}
fail:
- txn->release(&tmp_io);
+ tmp_txn->release(&tmp_io);
DBUG_RETURN(error_code);
}
@@ -3105,12 +3143,44 @@ int ha_federatedx::extra(ha_extra_function operation)
int ha_federatedx::reset(void)
{
+ int error = 0;
+
insert_dup_update= FALSE;
ignore_duplicates= FALSE;
replace_duplicates= FALSE;
- return 0;
-}
+ position_called= FALSE;
+
+ if (stored_result)
+ insert_dynamic(&results, (uchar*) &stored_result);
+ stored_result= 0;
+
+ if (results.elements)
+ {
+ federatedx_txn *tmp_txn;
+ federatedx_io *tmp_io= 0, **iop;
+
+ // external_lock may not have been called so txn may not be set
+ tmp_txn= get_txn(current_thd);
+
+ if (!*(iop= &io) && (error= tmp_txn->acquire(share, TRUE, (iop= &tmp_io))))
+ {
+ DBUG_ASSERT(0); // Fail when testing
+ return error;
+ }
+
+ for (uint i= 0; i < results.elements; ++i)
+ {
+ FEDERATEDX_IO_RESULT *result= 0;
+ get_dynamic(&results, (uchar*) &result, i);
+ (*iop)->free_result(result);
+ }
+ tmp_txn->release(&tmp_io);
+ reset_dynamic(&results);
+ }
+
+ return error;
+}
/*
Used to delete all rows in a table. Both for cases of truncate and
@@ -3237,7 +3307,7 @@ static int test_connection(MYSQL_THD thd, federatedx_io *io,
str.length(0);
str.append(STRING_WITH_LEN("SELECT * FROM "));
- append_identifier(thd, &str, share->table_name,
+ append_identifier(thd, &str, share->table_name,
share->table_name_length);
str.append(STRING_WITH_LEN(" WHERE 1=0"));
@@ -3288,14 +3358,14 @@ int ha_federatedx::create(const char *name, TABLE *table_arg,
pthread_mutex_lock(&federatedx_mutex);
tmp_share.s= get_server(&tmp_share, NULL);
pthread_mutex_unlock(&federatedx_mutex);
-
+
if (tmp_share.s)
{
tmp_txn= get_txn(thd);
if (!(retval= tmp_txn->acquire(&tmp_share, TRUE, &tmp_io)))
{
retval= test_connection(thd, tmp_io, &tmp_share);
- tmp_txn->release(&tmp_io);
+ tmp_txn->release(&tmp_io);
}
free_server(tmp_txn, tmp_share.s);
}
diff --git a/storage/federatedx/ha_federatedx.h b/storage/federatedx/ha_federatedx.h
index 2fd3c559321..2820f8a6c29 100644
--- a/storage/federatedx/ha_federatedx.h
+++ b/storage/federatedx/ha_federatedx.h
@@ -1,5 +1,5 @@
-/*
-Copyright (c) 2008, Patrick Galbraith
+/*
+Copyright (c) 2008, Patrick Galbraith
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -43,7 +43,7 @@ class federatedx_io;
typedef struct st_fedrated_server {
MEM_ROOT mem_root;
uint use_count, io_count;
-
+
uchar *key;
uint key_length;
@@ -74,10 +74,10 @@ typedef struct st_fedrated_server {
#include <mysql.h>
-/*
+/*
handler::print_error has a case statement for error numbers.
- This value is (10000) is far out of range and will envoke the
- default: case.
+ This value is (10000) is far out of range and will envoke the
+ default: case.
(Current error range is 120-159 from include/my_base.h)
*/
#define HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM 10000
@@ -158,7 +158,7 @@ public:
const char * get_database() const { return server->database; }
ushort get_port() const { return server->port; }
const char * get_socket() const { return server->socket; }
-
+
static bool handles_scheme(const char *scheme);
static federatedx_io *construct(MEM_ROOT *server_root,
FEDERATEDX_SERVER *server);
@@ -167,7 +167,7 @@ public:
{ return alloc_root(mem_root, size); }
static void operator delete(void *ptr, size_t size)
{ TRASH(ptr, size); }
-
+
virtual int query(const char *buffer, uint length)=0;
virtual FEDERATEDX_IO_RESULT *store_result()=0;
@@ -178,25 +178,25 @@ public:
virtual int error_code()=0;
virtual const char *error_str()=0;
-
+
virtual void reset()=0;
virtual int commit()=0;
virtual int rollback()=0;
-
+
virtual int savepoint_set(ulong sp)=0;
virtual ulong savepoint_release(ulong sp)=0;
virtual ulong savepoint_rollback(ulong sp)=0;
virtual void savepoint_restrict(ulong sp)=0;
-
+
virtual ulong last_savepoint() const=0;
virtual ulong actual_savepoint() const=0;
virtual bool is_autocommit() const=0;
virtual bool table_metadata(ha_statistics *stats, const char *table_name,
uint table_name_length, uint flag) = 0;
-
+
/* resultset operations */
-
+
virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0;
virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0;
virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0;
@@ -206,6 +206,13 @@ public:
unsigned int column)=0;
virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
unsigned int column) const=0;
+
+ virtual size_t get_ref_length() const=0;
+ virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
+ void *ref)=0;
+ virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
+ const void *ref)=0;
+
};
@@ -215,12 +222,12 @@ class federatedx_txn
ulong savepoint_level;
ulong savepoint_stmt;
ulong savepoint_next;
-
+
void release_scan();
public:
federatedx_txn();
~federatedx_txn();
-
+
bool has_connections() const { return txn_list != NULL; }
bool in_transaction() const { return savepoint_next != 0; }
int acquire(FEDERATEDX_SHARE *share, bool readonly, federatedx_io **io);
@@ -254,8 +261,12 @@ class ha_federatedx: public handler
federatedx_txn *txn;
federatedx_io *io;
FEDERATEDX_IO_RESULT *stored_result;
+ /**
+ Array of all stored results we get during a query execution.
+ */
+ DYNAMIC_ARRAY results;
+ bool position_called;
uint fetch_num; // stores the fetch num
- FEDERATEDX_IO_OFFSET current_position; // Current position used by ::position()
int remote_error_number;
char remote_error_buf[FEDERATEDX_QUERY_BUFFER_SIZE];
bool ignore_duplicates, replace_duplicates;
@@ -269,7 +280,7 @@ private:
*/
uint convert_row_to_internal_format(uchar *buf, FEDERATEDX_IO_ROW *row,
FEDERATEDX_IO_RESULT *result);
- bool create_where_from_key(String *to, KEY *key_info,
+ bool create_where_from_key(String *to, KEY *key_info,
const key_range *start_key,
const key_range *end_key,
bool records_in_range, bool eq_range);
@@ -348,18 +359,18 @@ public:
Talk to Kostja about this - how to get the
number of rows * ...
disk scan time on other side (block size, size of the row) + network time ...
- The reason for "records * 1000" is that such a large number forces
+ The reason for "records * 1000" is that such a large number forces
this to use indexes "
*/
double scan_time()
{
DBUG_PRINT("info", ("records %lu", (ulong) stats.records));
- return (double)(stats.records*1000);
+ return (double)(stats.records*1000);
}
/*
The next method will never be called if you do not implement indexes.
*/
- double read_time(uint index, uint ranges, ha_rows rows)
+ double read_time(uint index, uint ranges, ha_rows rows)
{
/*
Per Brian, this number is bugus, but this method must be implemented,