summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <msvensson@neptunus.homeip.net>2005-02-01 15:43:08 +0100
committerunknown <msvensson@neptunus.homeip.net>2005-02-01 15:43:08 +0100
commit38e395aa325eb418cf92a6ce62646959ee2ed477 (patch)
treefd2990524cb48181687b9d9990b0bb57d73c8579
parentd6747f963e13d87c1a7bc952b95ceeba8ba2aada (diff)
downloadmariadb-git-38e395aa325eb418cf92a6ce62646959ee2ed477.tar.gz
WL#2269 Enable query cache for NDB
- Added a thread that fetches commit_count for open tables. This will mean that NDB will not have to be contacted for every use of a cached query. sql/ha_ndbcluster.cc: Added a thread that periodically will fetch commit_count for open tables and store that value in share. The commit count value is then used when query cache asks if a cached query can be used. The thread activation interval is regulated by the config variable ndb_cache_check_time, its default value is 0 which means that NDB is contacted every time a cached query is reused. sql/ha_ndbcluster.h: Added commit_count to share Added ndb_cache_check_time sql/mysqld.cc: Added config variable ndb_cache_check_time sql/set_var.cc: Added config variable ndb_cache_check_time
-rw-r--r--mysql-test/r/ndb_cache2.result193
-rw-r--r--mysql-test/r/ndb_cache_multi2.result74
-rw-r--r--mysql-test/t/ndb_cache2.test126
-rw-r--r--mysql-test/t/ndb_cache_multi2.test71
-rw-r--r--sql/ha_ndbcluster.cc306
-rw-r--r--sql/ha_ndbcluster.h2
-rw-r--r--sql/mysqld.cc7
-rw-r--r--sql/set_var.cc3
8 files changed, 708 insertions, 74 deletions
diff --git a/mysql-test/r/ndb_cache2.result b/mysql-test/r/ndb_cache2.result
new file mode 100644
index 00000000000..ce10e9dab00
--- /dev/null
+++ b/mysql-test/r/ndb_cache2.result
@@ -0,0 +1,193 @@
+drop table if exists t1;
+set GLOBAL query_cache_type=on;
+set GLOBAL query_cache_size=1355776;
+set GLOBAL ndb_cache_check_time=5;
+reset query cache;
+flush status;
+CREATE TABLE t1 ( pk int not null primary key,
+a int, b int not null, c varchar(20)) ENGINE=ndbcluster;
+insert into t1 value (1, 2, 3, 'First row');
+select * from t1;
+pk a b c
+1 2 3 First row
+show status like "Qcache_queries_in_cache";
+Variable_name Value
+Qcache_queries_in_cache 1
+show status like "Qcache_inserts";
+Variable_name Value
+Qcache_inserts 1
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 0
+select * from t1;
+pk a b c
+1 2 3 First row
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 1
+update t1 set a=3 where pk=1;
+select * from t1;
+pk a b c
+1 3 3 First row
+show status like "Qcache_inserts";
+Variable_name Value
+Qcache_inserts 2
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 1
+insert into t1 value (2, 7, 8, 'Second row');
+insert into t1 value (4, 5, 6, 'Fourth row');
+select * from t1;
+pk a b c
+2 7 8 Second row
+4 5 6 Fourth row
+1 3 3 First row
+show status like "Qcache_inserts";
+Variable_name Value
+Qcache_inserts 3
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 1
+select * from t1;
+pk a b c
+2 7 8 Second row
+4 5 6 Fourth row
+1 3 3 First row
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 2
+select * from t1 where b=3;
+pk a b c
+1 3 3 First row
+show status like "Qcache_queries_in_cache";
+Variable_name Value
+Qcache_queries_in_cache 2
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 2
+select * from t1 where b=3;
+pk a b c
+1 3 3 First row
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 3
+delete from t1 where c='Fourth row';
+show status like "Qcache_queries_in_cache";
+Variable_name Value
+Qcache_queries_in_cache 0
+select * from t1 where b=3;
+pk a b c
+1 3 3 First row
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 3
+use test;
+select * from t1;
+pk a b c
+2 7 8 Second row
+1 3 3 First row
+select * from t1 where b=3;
+pk a b c
+1 3 3 First row
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 4
+update t1 set a=4 where b=3;
+use test;
+show status like "Qcache_queries_in_cache";
+Variable_name Value
+Qcache_queries_in_cache 0
+select * from t1;
+pk a b c
+2 7 8 Second row
+1 4 3 First row
+select * from t1;
+pk a b c
+2 7 8 Second row
+1 4 3 First row
+show status like "Qcache_inserts";
+Variable_name Value
+Qcache_inserts 7
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 5
+select * from t1;
+pk a b c
+2 7 8 Second row
+1 4 3 First row
+select * from t1;
+pk a b c
+2 7 8 Second row
+1 4 3 First row
+show status like "Qcache_queries_in_cache";
+Variable_name Value
+Qcache_queries_in_cache 1
+show status like "Qcache_inserts";
+Variable_name Value
+Qcache_inserts 7
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 7
+begin;
+update t1 set a=5 where pk=1;
+show status like "Qcache_queries_in_cache";
+Variable_name Value
+Qcache_queries_in_cache 0
+show status like "Qcache_inserts";
+Variable_name Value
+Qcache_inserts 7
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 7
+select * from t1;
+pk a b c
+2 7 8 Second row
+1 4 3 First row
+show status like "Qcache_queries_in_cache";
+Variable_name Value
+Qcache_queries_in_cache 1
+show status like "Qcache_inserts";
+Variable_name Value
+Qcache_inserts 8
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 7
+commit;
+show status like "Qcache_queries_in_cache";
+Variable_name Value
+Qcache_queries_in_cache 1
+show status like "Qcache_inserts";
+Variable_name Value
+Qcache_inserts 8
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 7
+select * from t1;
+pk a b c
+2 7 8 Second row
+1 5 3 First row
+show status like "Qcache_inserts";
+Variable_name Value
+Qcache_inserts 9
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 7
+select * from t1;
+pk a b c
+2 7 8 Second row
+1 5 3 First row
+show status like "Qcache_queries_in_cache";
+Variable_name Value
+Qcache_queries_in_cache 1
+show status like "Qcache_inserts";
+Variable_name Value
+Qcache_inserts 9
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 8
+drop table t1;
+show status like "Qcache_queries_in_cache";
+Variable_name Value
+Qcache_queries_in_cache 0
+SET GLOBAL query_cache_size=0;
+SET GLOBAL ndb_cache_check_time=0;
diff --git a/mysql-test/r/ndb_cache_multi2.result b/mysql-test/r/ndb_cache_multi2.result
new file mode 100644
index 00000000000..6e435c071b5
--- /dev/null
+++ b/mysql-test/r/ndb_cache_multi2.result
@@ -0,0 +1,74 @@
+drop table if exists t1, t2;
+set GLOBAL query_cache_type=on;
+set GLOBAL query_cache_size=1355776;
+set GLOBAL ndb_cache_check_time=1;
+reset query cache;
+flush status;
+set GLOBAL query_cache_type=on;
+set GLOBAL query_cache_size=1355776;
+set GLOBAL ndb_cache_check_time=1;
+reset query cache;
+flush status;
+create table t1 (a int) engine=ndbcluster;
+create table t2 (a int) engine=ndbcluster;
+insert into t1 value (2);
+insert into t2 value (3);
+select * from t1;
+a
+2
+select * from t2;
+a
+3
+show status like "Qcache_queries_in_cache";
+Variable_name Value
+Qcache_queries_in_cache 2
+show status like "Qcache_inserts";
+Variable_name Value
+Qcache_inserts 2
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 0
+show status like "Qcache_queries_in_cache";
+Variable_name Value
+Qcache_queries_in_cache 0
+show status like "Qcache_inserts";
+Variable_name Value
+Qcache_inserts 0
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 0
+select * from t1;
+a
+2
+show status like "Qcache_queries_in_cache";
+Variable_name Value
+Qcache_queries_in_cache 1
+show status like "Qcache_inserts";
+Variable_name Value
+Qcache_inserts 1
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 0
+update t1 set a=3 where a=2;
+show status like "Qcache_queries_in_cache";
+Variable_name Value
+Qcache_queries_in_cache 2
+show status like "Qcache_inserts";
+Variable_name Value
+Qcache_inserts 2
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 0
+select * from t1;
+a
+3
+show status like "Qcache_queries_in_cache";
+Variable_name Value
+Qcache_queries_in_cache 2
+show status like "Qcache_inserts";
+Variable_name Value
+Qcache_inserts 3
+show status like "Qcache_hits";
+Variable_name Value
+Qcache_hits 0
+drop table t1, t2;
diff --git a/mysql-test/t/ndb_cache2.test b/mysql-test/t/ndb_cache2.test
new file mode 100644
index 00000000000..5c1674a7021
--- /dev/null
+++ b/mysql-test/t/ndb_cache2.test
@@ -0,0 +1,126 @@
+-- source include/have_query_cache.inc
+-- source include/have_ndb.inc
+
+--disable_warnings
+drop table if exists t1;
+--enable_warnings
+
+
+# Turn on and reset query cache
+set GLOBAL query_cache_type=on;
+set GLOBAL query_cache_size=1355776;
+# Turn on thread that will fetch commit count for open tables
+set GLOBAL ndb_cache_check_time=5;
+reset query cache;
+flush status;
+
+# Wait for thread to wake up and start "working"
+sleep 20;
+
+# Create test table in NDB
+CREATE TABLE t1 ( pk int not null primary key,
+ a int, b int not null, c varchar(20)) ENGINE=ndbcluster;
+insert into t1 value (1, 2, 3, 'First row');
+
+# Perform one query which should be inserted into the query cache
+select * from t1;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+
+# Perform the same query and make sure the query cache is hit
+select * from t1;
+show status like "Qcache_hits";
+
+# Update the table and make sure the correct data is returned
+update t1 set a=3 where pk=1;
+select * from t1;
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+
+# Insert a new record and make sure the correct data is returned
+insert into t1 value (2, 7, 8, 'Second row');
+insert into t1 value (4, 5, 6, 'Fourth row');
+select * from t1;
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+select * from t1;
+show status like "Qcache_hits";
+
+# Perform a "new" query and make sure the query cache is not hit
+select * from t1 where b=3;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_hits";
+
+# Same query again...
+select * from t1 where b=3;
+show status like "Qcache_hits";
+
+# Delete from the table
+delete from t1 where c='Fourth row';
+show status like "Qcache_queries_in_cache";
+select * from t1 where b=3;
+show status like "Qcache_hits";
+
+# Start another connection and check that the query cache is hit
+connect (con1,localhost,root,,);
+connection con1;
+use test;
+select * from t1;
+select * from t1 where b=3;
+show status like "Qcache_hits";
+
+# Update the table and switch to other connection
+update t1 set a=4 where b=3;
+connect (con2,localhost,root,,);
+connection con2;
+use test;
+show status like "Qcache_queries_in_cache";
+select * from t1;
+select * from t1;
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+connection con1;
+select * from t1;
+select * from t1;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+
+# Use transactions and make sure the query cache is not updated until
+# transaction is committed
+begin;
+update t1 set a=5 where pk=1;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+connection con2;
+select * from t1;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+connection con1;
+commit;
+# Sleep to let the query cache thread update commit count
+sleep 10;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+connection con2;
+select * from t1;
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+connection con1;
+select * from t1;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+
+drop table t1;
+
+show status like "Qcache_queries_in_cache";
+
+SET GLOBAL query_cache_size=0;
+SET GLOBAL ndb_cache_check_time=0;
+
+
diff --git a/mysql-test/t/ndb_cache_multi2.test b/mysql-test/t/ndb_cache_multi2.test
new file mode 100644
index 00000000000..a9d008dba7c
--- /dev/null
+++ b/mysql-test/t/ndb_cache_multi2.test
@@ -0,0 +1,71 @@
+-- source include/have_query_cache.inc
+-- source include/have_ndb.inc
+-- source include/have_multi_ndb.inc
+
+--disable_warnings
+drop table if exists t1, t2;
+--enable_warnings
+
+
+# Turn on and reset query cache on server1
+connection server1;
+set GLOBAL query_cache_type=on;
+set GLOBAL query_cache_size=1355776;
+set GLOBAL ndb_cache_check_time=1;
+reset query cache;
+flush status;
+
+# Turn on and reset query cache on server2
+connection server2;
+set GLOBAL query_cache_type=on;
+set GLOBAL query_cache_size=1355776;
+set GLOBAL ndb_cache_check_time=1;
+reset query cache;
+flush status;
+
+# Sleep so that the query cache check thread has time to start
+sleep 15;
+
+
+# Create test tables in NDB and load them into cache
+# on server1
+connection server1;
+create table t1 (a int) engine=ndbcluster;
+create table t2 (a int) engine=ndbcluster;
+insert into t1 value (2);
+insert into t2 value (3);
+select * from t1;
+select * from t2;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+
+
+# Connect server2, load table in to cache, then update the table
+connection server2;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+select * from t1;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+update t1 set a=3 where a=2;
+
+# Sleep so that the query cache check thread has time to run
+sleep 5;
+
+# Connect to server1 and check that cache is invalidated
+# and correct data is returned
+connection server1;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+select * from t1;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+
+drop table t1, t2;
+
+
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 0d83955a335..4f6e243db93 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -86,6 +86,12 @@ static int unpackfrm(const void **data, uint *len,
static int ndb_get_table_statistics(Ndb*, const char *,
Uint64* rows, Uint64* commits);
+// Util thread variables
+static pthread_t ndb_util_thread;
+pthread_mutex_t LOCK_ndb_util_thread;
+pthread_cond_t COND_ndb_util_thread;
+extern "C" pthread_handler_decl(ndb_util_thread_func, arg);
+ulong ndb_cache_check_time;
/*
Dummy buffer to read zero pack_length fields
@@ -3865,6 +3871,7 @@ ha_ndbcluster::~ha_ndbcluster()
}
+
/*
Open a table for further use
- fetch metadata for this table from NDB
@@ -3963,16 +3970,14 @@ void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb)
Ndb* check_ndb_in_thd(THD* thd)
{
- DBUG_ENTER("check_ndb_in_thd");
- Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
-
+ Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
if (!thd_ndb)
{
if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
- DBUG_RETURN(NULL);
+ return NULL;
thd->transaction.thd_ndb= thd_ndb;
}
- DBUG_RETURN(thd_ndb->ndb);
+ return thd_ndb->ndb;
}
@@ -4310,13 +4315,21 @@ bool ndbcluster_init()
(void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
(hash_get_key) ndbcluster_get_key,0,0);
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
+ pthread_mutex_init(&LOCK_ndb_util_thread,MY_MUTEX_INIT_FAST);
+ pthread_cond_init(&COND_ndb_util_thread,NULL);
+
+ // Create utility thread
+ pthread_t tmp;
+ if (pthread_create(&tmp,&connection_attrib,ndb_util_thread_func,0))
+ {
+ DBUG_PRINT("error", ("Could not create ndb utility thread"));
+ goto ndbcluster_init_error;
+ }
+
ndbcluster_inited= 1;
-#ifdef USE_DISCOVER_ON_STARTUP
- if (ndb_discover_tables() != 0)
- goto ndbcluster_init_error;
-#endif
DBUG_RETURN(FALSE);
+
ndbcluster_init_error:
ndbcluster_end();
DBUG_RETURN(TRUE);
@@ -4326,12 +4339,19 @@ bool ndbcluster_init()
/*
End use of the NDB Cluster table handler
- free all global variables allocated by
- ndcluster_init()
+ ndbcluster_init()
*/
bool ndbcluster_end()
{
DBUG_ENTER("ndbcluster_end");
+
+ // Kill ndb utility thread
+ (void) pthread_mutex_lock(&LOCK_ndb_util_thread);
+ DBUG_PRINT("exit",("killing ndb util thread: %lx",ndb_util_thread));
+ (void) pthread_cond_signal(&COND_ndb_util_thread);
+ (void) pthread_mutex_unlock(&LOCK_ndb_util_thread);
+
if(g_ndb)
delete g_ndb;
g_ndb= NULL;
@@ -4342,6 +4362,8 @@ bool ndbcluster_end()
DBUG_RETURN(0);
hash_free(&ndbcluster_open_tables);
pthread_mutex_destroy(&ndbcluster_mutex);
+ pthread_mutex_destroy(&LOCK_ndb_util_thread);
+ pthread_cond_destroy(&COND_ndb_util_thread);
ndbcluster_inited= 0;
DBUG_RETURN(0);
}
@@ -4534,12 +4556,53 @@ const char* ha_ndbcluster::index_type(uint key_number)
return "HASH";
}
}
+
uint8 ha_ndbcluster::table_cache_type()
{
DBUG_ENTER("ha_ndbcluster::table_cache_type=HA_CACHE_TBL_ASKTRANSACT");
DBUG_RETURN(HA_CACHE_TBL_ASKTRANSACT);
}
+
+uint ndb_get_commitcount(THD* thd, char* dbname, char* tabname,
+ Uint64* commit_count)
+{
+ DBUG_ENTER("ndb_get_commitcount");
+
+ if (ndb_cache_check_time > 0)
+ {
+ // Use cached commit_count from share
+ char name[FN_REFLEN];
+ NDB_SHARE* share;
+ (void)strxnmov(name, FN_REFLEN,
+ "./",dbname,"/",tabname,NullS);
+ DBUG_PRINT("info", ("name: %s", name));
+ pthread_mutex_lock(&ndbcluster_mutex);
+ if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+ (byte*) name,
+ strlen(name))))
+ {
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ DBUG_RETURN(1);
+ }
+ *commit_count= share->commit_count;
+ DBUG_PRINT("info", ("commit_count: %d", *commit_count));
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ DBUG_RETURN(0);
+ }
+
+ // Get commit_count from NDB
+ Ndb *ndb;
+ if (!(ndb= check_ndb_in_thd(thd)))
+ DBUG_RETURN(1);
+ ndb->setDatabaseName(dbname);
+
+ if (ndb_get_table_statistics(ndb, tabname, 0, commit_count))
+ DBUG_RETURN(1);
+ DBUG_RETURN(0);
+}
+
+
static
my_bool
ndbcluster_cache_retrieval_allowed(
@@ -4561,51 +4624,33 @@ ndbcluster_cache_retrieval_allowed(
all cached queries with this table*/
{
DBUG_ENTER("ndbcluster_cache_retrieval_allowed");
- char tabname[128];
- char *dbname= full_name;
- my_bool is_autocommit;
- {
- int dbname_len= strlen(full_name);
- int tabname_len= full_name_len-dbname_len-1;
- memcpy(tabname, full_name+dbname_len+1, tabname_len);
- tabname[tabname_len]= '\0';
- }
- if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
- is_autocommit = FALSE;
- else
- is_autocommit = TRUE;
+
+ Uint64 commit_count;
+ bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN));
+ char* dbname= full_name;
+ char* tabname= dbname+strlen(dbname)+1;
+
DBUG_PRINT("enter",("dbname=%s, tabname=%s, autocommit=%d",
- dbname,tabname,is_autocommit));
+ dbname, tabname, is_autocommit));
+
if (!is_autocommit)
+ DBUG_RETURN(FALSE);
+
+ if (ndb_get_commitcount(thd, dbname, tabname, &commit_count))
{
- DBUG_PRINT("info",("OPTION_NOT_AUTOCOMMIT=%d OPTION_BEGIN=%d",
- thd->options & OPTION_NOT_AUTOCOMMIT,
- thd->options & OPTION_BEGIN));
- // ToDo enable cache inside a transaction
- // no need to invalidate though so leave *engine_data
+ *engine_data= *engine_data+1; // invalidate
DBUG_RETURN(FALSE);
}
+ DBUG_PRINT("info", ("*engine_data=%llu, commit_count=%llu",
+ *engine_data, commit_count));
+ if (*engine_data != commit_count)
{
- Ndb *ndb;
- Uint64 commit_count;
- if (!(ndb= check_ndb_in_thd(thd)))
- {
- *engine_data= *engine_data+1; // invalidate
- DBUG_RETURN(FALSE);
- }
- ndb->setDatabaseName(dbname);
- if (ndb_get_table_statistics(ndb, tabname, 0, &commit_count))
- {
- *engine_data= *engine_data+1; // invalidate
- DBUG_RETURN(FALSE);
- }
- if (*engine_data != commit_count)
- {
- *engine_data= commit_count; // invalidate
- DBUG_RETURN(FALSE);
- }
+ *engine_data= commit_count; // invalidate
+ DBUG_PRINT("exit",("Do not use cache, commit_count has changed"));
+ DBUG_RETURN(FALSE);
}
- DBUG_PRINT("exit",("*engine_data=%d ok, use cache",*engine_data));
+
+ DBUG_PRINT("exit",("OK to use cache, *engine_data=%llu",*engine_data));
DBUG_RETURN(TRUE);
}
@@ -4630,35 +4675,24 @@ ha_ndbcluster::cached_table_registration(
invalidate all cached queries with this table*/
{
DBUG_ENTER("ha_ndbcluster::cached_table_registration");
- my_bool is_autocommit;
- if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
- is_autocommit = FALSE;
- else
- is_autocommit = TRUE;
+
+ bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN));
DBUG_PRINT("enter",("dbname=%s, tabname=%s, is_autocommit=%d",
m_dbname,m_tabname,is_autocommit));
if (!is_autocommit)
- {
- DBUG_PRINT("info",("OPTION_NOT_AUTOCOMMIT=%d OPTION_BEGIN=%d",
- thd->options & OPTION_NOT_AUTOCOMMIT,
- thd->options & OPTION_BEGIN));
- // ToDo enable cache inside a transaction
- // no need to invalidate though so leave *engine_data
DBUG_RETURN(FALSE);
- }
+
+
+ Uint64 commit_count;
+ if (ndb_get_commitcount(thd, m_dbname, m_tabname, &commit_count))
{
- Uint64 commit_count;
- Ndb *ndb= get_ndb();
- ndb->setDatabaseName(m_dbname);
- if (ndb_get_table_statistics(ndb, m_tabname, 0, &commit_count))
- {
- *engine_data= 0;
- DBUG_RETURN(FALSE);
- }
- *engine_data= commit_count;
+ *engine_data= 0;
+ DBUG_PRINT("error", ("Could not get commitcount"))
+ DBUG_RETURN(FALSE);
}
+ *engine_data= commit_count;
*engine_callback= ndbcluster_cache_retrieval_allowed;
- DBUG_PRINT("exit",("*engine_data=%d", *engine_data));
+ DBUG_PRINT("exit",("*engine_data=%llu", *engine_data));
DBUG_RETURN(TRUE);
}
@@ -4700,8 +4734,14 @@ static NDB_SHARE* get_share(const char *table_name)
}
thr_lock_init(&share->lock);
pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
+ share->commit_count= 0;
}
}
+ DBUG_PRINT("share",
+ ("table_name: %s, length: %d, use_count: %d, commit_count: %d",
+ share->table_name, share->table_name_length, share->use_count,
+ share->commit_count));
+
share->use_count++;
pthread_mutex_unlock(&ndbcluster_mutex);
return share;
@@ -4868,10 +4908,10 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
ndb->closeTransaction(pTrans);
if(row_count)
- * row_count= sum_rows;
+ *row_count= sum_rows;
if(commit_count)
- * commit_count= sum_commits;
- DBUG_PRINT("exit", ("records: %u commits: %u", sum_rows, sum_commits));
+ *commit_count= sum_commits;
+ DBUG_PRINT("exit", ("records: %llu commits: %llu", sum_rows, sum_commits));
DBUG_RETURN(0);
} while(0);
@@ -4906,4 +4946,124 @@ int ha_ndbcluster::write_ndb_file()
DBUG_RETURN(error);
}
+
+// Utility thread main loop
+extern "C" pthread_handler_decl(ndb_util_thread_func,arg __attribute__((unused)))
+{
+ THD *thd; // needs to be first for thread_stack
+ int error = 0;
+ struct timespec abstime;
+
+ my_thread_init();
+ DBUG_ENTER("ndb_util_thread");
+ DBUG_PRINT("enter", ("ndb_cache_check_time: %d", ndb_cache_check_time));
+
+ thd= new THD; // note that contructor of THD uses DBUG_ !
+ THD_CHECK_SENTRY(thd);
+
+ pthread_detach_this_thread();
+ ndb_util_thread = pthread_self();
+
+ thd->thread_stack = (char*)&thd; // remember where our stack is
+ if (thd->store_globals())
+ {
+ thd->cleanup();
+ delete thd;
+ DBUG_RETURN(NULL);
+ }
+
+ List<NDB_SHARE> util_open_tables;
+ set_timespec(abstime, ndb_cache_check_time);
+ for (;;)
+ {
+
+ pthread_mutex_lock(&LOCK_ndb_util_thread);
+ error= pthread_cond_timedwait(&COND_ndb_util_thread,
+ &LOCK_ndb_util_thread,
+ &abstime);
+ pthread_mutex_unlock(&LOCK_ndb_util_thread);
+
+ DBUG_PRINT("ndb_util_thread", ("Started, ndb_cache_check_time: %d",
+ ndb_cache_check_time));
+
+ if (abort_loop)
+ break; // Shutting down server
+
+ if (ndb_cache_check_time == 0)
+ {
+ set_timespec(abstime, 10);
+ continue;
+ }
+
+ // Set new time to wake up
+ set_timespec(abstime, ndb_cache_check_time);
+
+ // Lock mutex and fill list with pointers to all open tables
+ NDB_SHARE *share;
+ pthread_mutex_lock(&ndbcluster_mutex);
+ for (uint i= 0; i < ndbcluster_open_tables.records; i++)
+ {
+ share= (NDB_SHARE *)hash_element(&ndbcluster_open_tables, i);
+ share->use_count++; // Make sure the table can't be closed
+
+ DBUG_PRINT("ndb_util_thread",
+ ("Found open table[%d]: %s, use_count: %d",
+ i, share->table_name, share->use_count));
+
+ // Store pointer to table
+ util_open_tables.push_back(share);
+ }
+ pthread_mutex_unlock(&ndbcluster_mutex);
+
+
+  // Iterate through the list of open tables collected above
+ List_iterator_fast<NDB_SHARE> it(util_open_tables);
+ while (share=it++)
+ {
+ // Split tab- and dbname
+ char buf[FN_REFLEN];
+ char *tabname, *db;
+ uint length= dirname_length(share->table_name);
+ tabname= share->table_name+length;
+ memcpy(buf, share->table_name, length-1);
+ buf[length-1]= 0;
+ db= buf+dirname_length(buf);
+ DBUG_PRINT("ndb_util_thread",
+ ("Fetching commit count for: %s, db: %s, tab: %s",
+ share->table_name, db, tabname));
+
+ // Contact NDB to get commit count for table
+ g_ndb->setDatabaseName(db);
+ Uint64 rows, commit_count;
+ if(ndb_get_table_statistics(g_ndb, tabname,
+ &rows, &commit_count) == 0){
+ DBUG_PRINT("ndb_util_thread",
+ ("Table: %s, rows: %llu, commit_count: %llu",
+ share->table_name, rows, commit_count));
+ share->commit_count= commit_count;
+ }
+ else
+ {
+ DBUG_PRINT("ndb_util_thread",
+ ("Error: Could not get commit count for table %s",
+ share->table_name));
+ share->commit_count++; // Invalidate
+ }
+ // Decrease the use count and possibly free share
+ free_share(share);
+ }
+
+ // Clear the list of open tables
+ util_open_tables.empty();
+
+ }
+
+ thd->cleanup();
+ delete thd;
+ DBUG_PRINT("exit", ("ndb_util_thread"));
+ my_thread_end();
+ DBUG_RETURN(NULL);
+}
+
+
#endif /* HAVE_NDBCLUSTER_DB */
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
index b5cf727ead7..df88afa678a 100644
--- a/sql/ha_ndbcluster.h
+++ b/sql/ha_ndbcluster.h
@@ -38,6 +38,7 @@ class NdbBlob;
// connectstring to cluster if given by mysqld
extern const char *ndbcluster_connectstring;
+extern ulong ndb_cache_check_time;
typedef enum ndb_index_type {
UNDEFINED_INDEX = 0,
@@ -59,6 +60,7 @@ typedef struct st_ndbcluster_share {
pthread_mutex_t mutex;
char *table_name;
uint table_name_length,use_count;
+ uint commit_count;
} NDB_SHARE;
/*
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index d1fef3519bf..671f38898c1 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -284,6 +284,7 @@ my_bool opt_console= 0, opt_bdb, opt_innodb, opt_isam, opt_ndbcluster;
#ifdef HAVE_NDBCLUSTER_DB
const char *opt_ndbcluster_connectstring= 0;
my_bool opt_ndb_shm, opt_ndb_optimized_node_selection;
+ulong opt_ndb_cache_check_time= 0;
#endif
my_bool opt_readonly, use_temp_pool, relay_log_purge;
my_bool opt_sync_bdb_logs, opt_sync_frm;
@@ -4016,7 +4017,7 @@ enum options_mysqld
OPT_INNODB, OPT_ISAM,
OPT_NDBCLUSTER, OPT_NDB_CONNECTSTRING, OPT_NDB_USE_EXACT_COUNT,
OPT_NDB_FORCE_SEND, OPT_NDB_AUTOINCREMENT_PREFETCH_SZ,
- OPT_NDB_SHM, OPT_NDB_OPTIMIZED_NODE_SELECTION,
+ OPT_NDB_SHM, OPT_NDB_OPTIMIZED_NODE_SELECTION, OPT_NDB_CACHE_CHECK_TIME,
OPT_SKIP_SAFEMALLOC,
OPT_TEMP_POOL, OPT_TX_ISOLATION,
OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS,
@@ -4498,6 +4499,10 @@ Disable with --skip-ndbcluster (will save memory).",
(gptr*) &opt_ndb_optimized_node_selection,
(gptr*) &opt_ndb_optimized_node_selection,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
+ { "ndb_cache_check_time", OPT_NDB_CACHE_CHECK_TIME,
+ "A dedicated thread is created to update cached commit count value at the given interval.",
+ (gptr*) &opt_ndb_cache_check_time, (gptr*) &opt_ndb_cache_check_time, 0, GET_ULONG, REQUIRED_ARG,
+ 0, 0, LONG_TIMEOUT, 0, 1, 0},
#endif
{"new", 'n', "Use very new possible 'unsafe' functions.",
(gptr*) &global_system_variables.new_mode,
diff --git a/sql/set_var.cc b/sql/set_var.cc
index 082c55db188..58c30c8e9bc 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -370,6 +370,7 @@ sys_var_thd_bool
sys_ndb_use_exact_count("ndb_use_exact_count", &SV::ndb_use_exact_count);
sys_var_thd_bool
sys_ndb_use_transactions("ndb_use_transactions", &SV::ndb_use_transactions);
+sys_var_long_ptr sys_ndb_cache_check_time("ndb_cache_check_time", &ndb_cache_check_time);
#endif
/* Time/date/datetime formats */
@@ -630,6 +631,7 @@ sys_var *sys_variables[]=
&sys_ndb_force_send,
&sys_ndb_use_exact_count,
&sys_ndb_use_transactions,
+ &sys_ndb_cache_check_time,
#endif
&sys_unique_checks,
&sys_warning_count
@@ -797,6 +799,7 @@ struct show_var_st init_vars[]= {
{sys_ndb_force_send.name, (char*) &sys_ndb_force_send, SHOW_SYS},
{sys_ndb_use_exact_count.name,(char*) &sys_ndb_use_exact_count, SHOW_SYS},
{sys_ndb_use_transactions.name,(char*) &sys_ndb_use_transactions, SHOW_SYS},
+ {sys_ndb_cache_check_time.name,(char*) &sys_ndb_cache_check_time, SHOW_SYS},
#endif
{sys_net_buffer_length.name,(char*) &sys_net_buffer_length, SHOW_SYS},
{sys_net_read_timeout.name, (char*) &sys_net_read_timeout, SHOW_SYS},