diff options
author | Michael Widenius <monty@askmonty.org> | 2010-08-25 01:44:50 +0300 |
---|---|---|
committer | Michael Widenius <monty@askmonty.org> | 2010-08-25 01:44:50 +0300 |
commit | 58a75bb18b2a4080c8fae77024afed37f1be1314 (patch) | |
tree | 4d9d65f5747636d6dea9295e79464108df24a9a2 /storage/federatedx | |
parent | 99b79db5dca6909456a40d859298ba3992b145da (diff) | |
parent | a82671178919afba86ddfdf2b64321eb9afff8e5 (diff) | |
download | mariadb-git-58a75bb18b2a4080c8fae77024afed37f1be1314.tar.gz |
Automerge with 5.1
Diffstat (limited to 'storage/federatedx')
-rw-r--r-- | storage/federatedx/federatedx_io_mysql.cc | 69 | ||||
-rw-r--r-- | storage/federatedx/federatedx_io_null.cc | 22 | ||||
-rw-r--r-- | storage/federatedx/ha_federatedx.cc | 202 | ||||
-rw-r--r-- | storage/federatedx/ha_federatedx.h | 51 |
4 files changed, 250 insertions, 94 deletions
diff --git a/storage/federatedx/federatedx_io_mysql.cc b/storage/federatedx/federatedx_io_mysql.cc index 5245395b060..d6844fab2c6 100644 --- a/storage/federatedx/federatedx_io_mysql.cc +++ b/storage/federatedx/federatedx_io_mysql.cc @@ -1,4 +1,4 @@ -/* +/* Copyright (c) 2007, Antony T Curtis All rights reserved. @@ -51,6 +51,12 @@ typedef struct federatedx_savepoint uint flags; } SAVEPT; +struct mysql_position +{ + MYSQL_RES* result; + MYSQL_ROW_OFFSET offset; +}; + class federatedx_io_mysql :public federatedx_io { @@ -76,16 +82,16 @@ public: virtual int error_code(); virtual const char *error_str(); - + void reset(); int commit(); int rollback(); - + int savepoint_set(ulong sp); ulong savepoint_release(ulong sp); ulong savepoint_rollback(ulong sp); void savepoint_restrict(ulong sp); - + ulong last_savepoint() const; ulong actual_savepoint() const; bool is_autocommit() const; @@ -94,7 +100,7 @@ public: uint table_name_length, uint flag); /* resultset operations */ - + virtual void free_result(FEDERATEDX_IO_RESULT *io_result); virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result); virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result); @@ -104,6 +110,12 @@ public: unsigned int column); virtual bool is_column_null(const FEDERATEDX_IO_ROW *row, unsigned int column) const; + + virtual size_t get_ref_length() const; + virtual void mark_position(FEDERATEDX_IO_RESULT *io_result, + void *ref); + virtual int seek_position(FEDERATEDX_IO_RESULT **io_result, + const void *ref); }; @@ -466,14 +478,13 @@ const char *federatedx_io_mysql::error_str() return mysql_error(&mysql); } - FEDERATEDX_IO_RESULT *federatedx_io_mysql::store_result() { FEDERATEDX_IO_RESULT *result; DBUG_ENTER("federatedx_io_mysql::store_result"); - + result= (FEDERATEDX_IO_RESULT *) mysql_store_result(&mysql); - + DBUG_RETURN(result); } @@ -590,3 +601,45 @@ error: free_result(result); return 1; } + + + +size_t federatedx_io_mysql::get_ref_length() const +{ + return sizeof(mysql_position); +} + + +void federatedx_io_mysql::mark_position(FEDERATEDX_IO_RESULT *io_result, + void *ref) +{ + MYSQL_ROWS *tmp= 0; + mysql_position& pos= *reinterpret_cast<mysql_position*>(ref); + pos.result= (MYSQL_RES *) io_result; + + if (pos.result && pos.result->data) + { + for (tmp= pos.result->data->data; + tmp && (tmp->next != pos.result->data_cursor); + tmp= tmp->next) + {} + } + + pos.offset= tmp; +} + +int federatedx_io_mysql::seek_position(FEDERATEDX_IO_RESULT **io_result, + const void *ref) +{ + const mysql_position& pos= *reinterpret_cast<const mysql_position*>(ref); + + if (!pos.result || !pos.offset) + return HA_ERR_END_OF_FILE; + + pos.result->current_row= 0; + pos.result->data_cursor= pos.offset; + *io_result= (FEDERATEDX_IO_RESULT*) pos.result; + + return 0; +} + diff --git a/storage/federatedx/federatedx_io_null.cc b/storage/federatedx/federatedx_io_null.cc index cd8fc3eaf85..49f93ab6546 100644 --- a/storage/federatedx/federatedx_io_null.cc +++ b/storage/federatedx/federatedx_io_null.cc @@ -96,6 +96,11 @@ public: unsigned int column); virtual bool is_column_null(const FEDERATEDX_IO_ROW *row, unsigned int column) const; + virtual size_t get_ref_length() const; + virtual void mark_position(FEDERATEDX_IO_RESULT *io_result, + void *ref); + virtual int seek_position(FEDERATEDX_IO_RESULT **io_result, + const void *ref); }; @@ -275,3 +280,20 @@ bool federatedx_io_null::table_metadata(ha_statistics *stats, return 0; } + +size_t federatedx_io_null::get_ref_length() const +{ + return sizeof(int); +} + + +void federatedx_io_null::mark_position(FEDERATEDX_IO_RESULT *io_result, + void *ref) +{ +} + +int federatedx_io_null::seek_position(FEDERATEDX_IO_RESULT **io_result, + const void *ref) +{ + return 0; +} diff --git a/storage/federatedx/ha_federatedx.cc b/storage/federatedx/ha_federatedx.cc index 04ba9984492..8735e5625e6 100644 --- a/storage/federatedx/ha_federatedx.cc +++ b/storage/federatedx/ha_federatedx.cc @@ -1717,14 +1717,14 @@ federatedx_txn *ha_federatedx::get_txn(THD *thd, bool no_create) return *txnp; } - + int ha_federatedx::disconnect(handlerton *hton, MYSQL_THD thd) { federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton); delete txn; return 0; } - + /* Used for opening tables. The name will be the name of the file. @@ -1756,14 +1756,15 @@ int ha_federatedx::open(const char *name, int mode, uint test_if_locked) free_share(txn, share); DBUG_RETURN(error); } - + + ref_length= io->get_ref_length(); + txn->release(&io); - - ref_length= (table->s->primary_key != MAX_KEY ? - table->key_info[table->s->primary_key].key_length : - table->s->reclength); + DBUG_PRINT("info", ("ref_length: %u", ref_length)); + my_init_dynamic_array(&results, sizeof(FEDERATEDX_IO_RESULT*), 4, 4); + reset(); DBUG_RETURN(0); @@ -1788,8 +1789,9 @@ int ha_federatedx::close(void) DBUG_ENTER("ha_federatedx::close"); /* free the result set */ - if (stored_result) - retval= free_result(); + reset(); + + delete_dynamic(&results); /* Disconnect from mysql */ if (!thd || !(txn= get_txn(thd, true))) @@ -1799,7 +1801,7 @@ int ha_federatedx::close(void) tmp_txn.release(&io); DBUG_ASSERT(io == NULL); - + if ((error= free_share(&tmp_txn, share))) retval= error; } @@ -2525,7 +2527,7 @@ int ha_federatedx::index_read_idx(uchar *buf, uint index, const uchar *key, uint key_len, enum ha_rkey_function find_flag) { int retval; - FEDERATEDX_IO_RESULT *io_result; + FEDERATEDX_IO_RESULT *io_result= 0; DBUG_ENTER("ha_federatedx::index_read_idx"); if ((retval= index_read_idx_with_result_set(buf, index, key, @@ -2601,7 +2603,7 @@ int ha_federatedx::index_read_idx_with_result_set(uchar *buf, uint index, if (!(retval= read_next(buf, *result))) DBUG_RETURN(retval); - io->free_result(*result); + insert_dynamic(&results, (uchar*) result); *result= 0; table->status= STATUS_NOT_FOUND; DBUG_RETURN(retval); @@ -2669,10 +2671,7 @@ int ha_federatedx::read_range_first(const key_range *start_key, DBUG_RETURN(retval); if (stored_result) - { - io->free_result(stored_result); - stored_result= 0; - } + (void) free_result(); if (io->query(sql_query.ptr(), sql_query.length())) { @@ -2773,10 +2772,7 @@ int ha_federatedx::rnd_init(bool scan) DBUG_RETURN(error); if (stored_result) - { - io->free_result(stored_result); - stored_result= 0; - } + (void) free_result(); if (io->query(share->select_query, strlen(share->select_query))) @@ -2803,17 +2799,35 @@ int ha_federatedx::rnd_end() int ha_federatedx::free_result() { int error; - federatedx_io *tmp_io= 0, **iop; + DBUG_ENTER("ha_federatedx::free_result"); DBUG_ASSERT(stored_result); - if (!*(iop= &io) && (error= txn->acquire(share, TRUE, (iop= &tmp_io)))) + for (uint i= 0; i < results.elements; ++i) { - DBUG_ASSERT(0); // Fail when testing - return error; + FEDERATEDX_IO_RESULT *result= 0; + get_dynamic(&results, (uchar*) &result, i); + if (result == stored_result) + goto end; } - (*iop)->free_result(stored_result); + if (position_called) + { + insert_dynamic(&results, (uchar*) &stored_result); + } + else + { + federatedx_io *tmp_io= 0, **iop; + if (!*(iop= &io) && (error= txn->acquire(share, TRUE, (iop= &tmp_io)))) + { + DBUG_ASSERT(0); // Fail when testing + insert_dynamic(&results, (uchar*) &stored_result); + goto end; + } + (*iop)->free_result(stored_result); + txn->release(&tmp_io); + } +end: stored_result= 0; - txn->release(&tmp_io); - return 0; + position_called= FALSE; + DBUG_RETURN(0); } int ha_federatedx::index_end(void) @@ -2862,8 +2876,8 @@ int ha_federatedx::rnd_next(uchar *buf) SYNOPSIS field_in_record_is_null() - buf byte pointer to record - result mysql result set + buf byte pointer to record + result mysql result set DESCRIPTION This method is a wrapper method that reads one record from a result @@ -2896,24 +2910,43 @@ int ha_federatedx::read_next(uchar *buf, FEDERATEDX_IO_RESULT *result) } -/* - store reference to current row so that we can later find it for - a re-read, update or delete. - - In case of federatedx, a reference is either a primary key or - the whole record. +/** + @brief Store a reference to current row. + + @details During a query execution we may have different result sets (RS), + e.g. for different ranges. All the RS's used are stored in + memory and placed in @c results dynamic array. At the end of + execution all stored RS's are freed at once in the + @c ha_federated::reset(). + So, in case of federated, a reference to current row is a + stored result address and current data cursor position. + As we keep all RS in memory during a query execution, + we can get any record using the reference any time until + @c ha_federated::reset() is called. + TODO: we don't have to store all RS's rows but only those + we call @c ha_federated::position() for, so we can free memory + where we store other rows in the @c ha_federated::index_end(). + + @param[in] record record data (unused) - Called from filesort.cc, sql_select.cc, sql_delete.cc and sql_update.cc. */ -void ha_federatedx::position(const uchar *record) +void ha_federatedx::position(const uchar *record __attribute__ ((unused))) { DBUG_ENTER("ha_federatedx::position"); - if (table->s->primary_key != MAX_KEY) - key_copy(ref, (uchar *)record, table->key_info + table->s->primary_key, - ref_length); - else - memcpy(ref, record, ref_length); + + bzero(ref, ref_length); + + if (!stored_result) + DBUG_VOID_RETURN; + + if (txn->acquire(share, TRUE, &io)) + DBUG_VOID_RETURN; + + io->mark_position(stored_result, ref); + + position_called= TRUE; + DBUG_VOID_RETURN; } @@ -2929,23 +2962,23 @@ void ha_federatedx::position(const uchar *record) int ha_federatedx::rnd_pos(uchar *buf, uchar *pos) { - int result; + int retval; + FEDERATEDX_IO_RESULT *result= stored_result; DBUG_ENTER("ha_federatedx::rnd_pos"); ha_statistic_increment(&SSV::ha_read_rnd_count); - if (table->s->primary_key != MAX_KEY) - { - /* We have a primary key, so use index_read_idx to find row */ - result= index_read_idx(buf, table->s->primary_key, pos, - ref_length, HA_READ_KEY_EXACT); - } - else - { - /* otherwise, get the old record ref as obtained in ::position */ - memcpy(buf, pos, ref_length); - result= 0; - } - table->status= result ? STATUS_NOT_FOUND : 0; - DBUG_RETURN(result); + + if ((retval= txn->acquire(share, TRUE, &io))) + goto error; + + if ((retval= io->seek_position(&result, pos))) + goto error; + + retval= read_next(buf, result); + DBUG_RETURN(retval); + +error: + table->status= STATUS_NOT_FOUND; + DBUG_RETURN(retval); } @@ -2996,15 +3029,20 @@ int ha_federatedx::rnd_pos(uchar *buf, uchar *pos) int ha_federatedx::info(uint flag) { uint error_code; + THD *thd= current_thd; + federatedx_txn *tmp_txn; federatedx_io *tmp_io= 0, **iop= 0; DBUG_ENTER("ha_federatedx::info"); error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE; + // external_lock may not have been called so txn may not be set + tmp_txn= get_txn(thd); + /* we want not to show table status if not needed to do so */ if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST | HA_STATUS_AUTO)) { - if (!*(iop= &io) && (error_code= txn->acquire(share, TRUE, (iop= &tmp_io)))) + if (!*(iop= &io) && (error_code= tmp_txn->acquire(share, TRUE, (iop= &tmp_io)))) goto fail; } @@ -3029,14 +3067,14 @@ int ha_federatedx::info(uint flag) If ::info created it's own transaction, close it. This happens in case of show table status; */ - txn->release(&tmp_io); + tmp_txn->release(&tmp_io); DBUG_RETURN(0); error: if (iop && *iop) { - my_printf_error((*iop)->error_code(), "Got error: %d : %s", MYF(0), + my_printf_error((*iop)->error_code(), "Received error: %d : %s", MYF(0), (*iop)->error_code(), (*iop)->error_str()); } else if (remote_error_number != -1 /* error already reported */) @@ -3045,7 +3083,7 @@ error: my_error(error_code, MYF(0), ER(error_code)); } fail: - txn->release(&tmp_io); + tmp_txn->release(&tmp_io); DBUG_RETURN(error_code); } @@ -3105,12 +3143,44 @@ int ha_federatedx::extra(ha_extra_function operation) int ha_federatedx::reset(void) { + int error = 0; + insert_dup_update= FALSE; ignore_duplicates= FALSE; replace_duplicates= FALSE; - return 0; -} + position_called= FALSE; + + if (stored_result) + insert_dynamic(&results, (uchar*) &stored_result); + stored_result= 0; + + if (results.elements) + { + federatedx_txn *tmp_txn; + federatedx_io *tmp_io= 0, **iop; + + // external_lock may not have been called so txn may not be set + tmp_txn= get_txn(current_thd); + + if (!*(iop= &io) && (error= tmp_txn->acquire(share, TRUE, (iop= &tmp_io)))) + { + DBUG_ASSERT(0); // Fail when testing + return error; + } + + for (uint i= 0; i < results.elements; ++i) + { + FEDERATEDX_IO_RESULT *result= 0; + get_dynamic(&results, (uchar*) &result, i); + (*iop)->free_result(result); + } + tmp_txn->release(&tmp_io); + reset_dynamic(&results); + } + + return error; +} /* Used to delete all rows in a table. Both for cases of truncate and @@ -3237,7 +3307,7 @@ static int test_connection(MYSQL_THD thd, federatedx_io *io, str.length(0); str.append(STRING_WITH_LEN("SELECT * FROM ")); - append_identifier(thd, &str, share->table_name, + append_identifier(thd, &str, share->table_name, share->table_name_length); str.append(STRING_WITH_LEN(" WHERE 1=0")); @@ -3288,14 +3358,14 @@ int ha_federatedx::create(const char *name, TABLE *table_arg, pthread_mutex_lock(&federatedx_mutex); tmp_share.s= get_server(&tmp_share, NULL); pthread_mutex_unlock(&federatedx_mutex); - + if (tmp_share.s) { tmp_txn= get_txn(thd); if (!(retval= tmp_txn->acquire(&tmp_share, TRUE, &tmp_io))) { retval= test_connection(thd, tmp_io, &tmp_share); - tmp_txn->release(&tmp_io); + tmp_txn->release(&tmp_io); } free_server(tmp_txn, tmp_share.s); } diff --git a/storage/federatedx/ha_federatedx.h b/storage/federatedx/ha_federatedx.h index 2fd3c559321..2820f8a6c29 100644 --- a/storage/federatedx/ha_federatedx.h +++ b/storage/federatedx/ha_federatedx.h @@ -1,5 +1,5 @@ -/* -Copyright (c) 2008, Patrick Galbraith +/* +Copyright (c) 2008, Patrick Galbraith All rights reserved. Redistribution and use in source and binary forms, with or without @@ -43,7 +43,7 @@ class federatedx_io; typedef struct st_fedrated_server { MEM_ROOT mem_root; uint use_count, io_count; - + uchar *key; uint key_length; @@ -74,10 +74,10 @@ typedef struct st_fedrated_server { #include <mysql.h> -/* +/* handler::print_error has a case statement for error numbers. - This value is (10000) is far out of range and will envoke the - default: case. + This value is (10000) is far out of range and will envoke the + default: case. (Current error range is 120-159 from include/my_base.h) */ #define HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM 10000 @@ -158,7 +158,7 @@ public: const char * get_database() const { return server->database; } ushort get_port() const { return server->port; } const char * get_socket() const { return server->socket; } - + static bool handles_scheme(const char *scheme); static federatedx_io *construct(MEM_ROOT *server_root, FEDERATEDX_SERVER *server); @@ -167,7 +167,7 @@ public: { return alloc_root(mem_root, size); } static void operator delete(void *ptr, size_t size) { TRASH(ptr, size); } - + virtual int query(const char *buffer, uint length)=0; virtual FEDERATEDX_IO_RESULT *store_result()=0; @@ -178,25 +178,25 @@ public: virtual int error_code()=0; virtual const char *error_str()=0; - + virtual void reset()=0; virtual int commit()=0; virtual int rollback()=0; - + virtual int savepoint_set(ulong sp)=0; virtual ulong savepoint_release(ulong sp)=0; virtual ulong savepoint_rollback(ulong sp)=0; virtual void savepoint_restrict(ulong sp)=0; - + virtual ulong last_savepoint() const=0; virtual ulong actual_savepoint() const=0; virtual bool is_autocommit() const=0; virtual bool table_metadata(ha_statistics *stats, const char *table_name, uint table_name_length, uint flag) = 0; - + /* resultset operations */ - + virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0; virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0; virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0; @@ -206,6 +206,13 @@ public: unsigned int column)=0; virtual bool is_column_null(const FEDERATEDX_IO_ROW *row, unsigned int column) const=0; + + virtual size_t get_ref_length() const=0; + virtual void mark_position(FEDERATEDX_IO_RESULT *io_result, + void *ref)=0; + virtual int seek_position(FEDERATEDX_IO_RESULT **io_result, + const void *ref)=0; + }; @@ -215,12 +222,12 @@ class federatedx_txn ulong savepoint_level; ulong savepoint_stmt; ulong savepoint_next; - + void release_scan(); public: federatedx_txn(); ~federatedx_txn(); - + bool has_connections() const { return txn_list != NULL; } bool in_transaction() const { return savepoint_next != 0; } int acquire(FEDERATEDX_SHARE *share, bool readonly, federatedx_io **io); @@ -254,8 +261,12 @@ class ha_federatedx: public handler federatedx_txn *txn; federatedx_io *io; FEDERATEDX_IO_RESULT *stored_result; + /** + Array of all stored results we get during a query execution. + */ + DYNAMIC_ARRAY results; + bool position_called; uint fetch_num; // stores the fetch num - FEDERATEDX_IO_OFFSET current_position; // Current position used by ::position() int remote_error_number; char remote_error_buf[FEDERATEDX_QUERY_BUFFER_SIZE]; bool ignore_duplicates, replace_duplicates; @@ -269,7 +280,7 @@ private: */ uint convert_row_to_internal_format(uchar *buf, FEDERATEDX_IO_ROW *row, FEDERATEDX_IO_RESULT *result); - bool create_where_from_key(String *to, KEY *key_info, + bool create_where_from_key(String *to, KEY *key_info, const key_range *start_key, const key_range *end_key, bool records_in_range, bool eq_range); @@ -348,18 +359,18 @@ public: Talk to Kostja about this - how to get the number of rows * ... disk scan time on other side (block size, size of the row) + network time ... - The reason for "records * 1000" is that such a large number forces + The reason for "records * 1000" is that such a large number forces this to use indexes " */ double scan_time() { DBUG_PRINT("info", ("records %lu", (ulong) stats.records)); - return (double)(stats.records*1000); + return (double)(stats.records*1000); } /* The next method will never be called if you do not implement indexes. */ - double read_time(uint index, uint ranges, ha_rows rows) + double read_time(uint index, uint ranges, ha_rows rows) { /* Per Brian, this number is bugus, but this method must be implemented, |