From a24e184fa0a58d4ad1a2c20c60383011c70911d2 Mon Sep 17 00:00:00 2001 From: Michael Erickson Date: Thu, 13 Feb 2020 12:51:24 -0800 Subject: rebrand plugin from "clustrixdb" to "xpand" --- mysql-test/suite/clustrixdb/basics.result | 49 - mysql-test/suite/clustrixdb/basics.test | 38 - mysql-test/suite/clustrixdb/my.cnf | 4 - mysql-test/suite/clustrixdb/suite.opt | 4 - mysql-test/suite/clustrixdb/update.result | 26 - mysql-test/suite/clustrixdb/update.test | 19 - mysql-test/suite/clustrixdb/upsert.result | 72 -- mysql-test/suite/clustrixdb/upsert.test | 49 - mysql-test/suite/xpand/basics.result | 49 + mysql-test/suite/xpand/basics.test | 38 + mysql-test/suite/xpand/my.cnf | 4 + mysql-test/suite/xpand/suite.opt | 4 + mysql-test/suite/xpand/update.result | 26 + mysql-test/suite/xpand/update.test | 19 + mysql-test/suite/xpand/upsert.result | 72 ++ mysql-test/suite/xpand/upsert.test | 49 + storage/clustrixdb/CMakeLists.txt | 24 - storage/clustrixdb/clustrix_connection.cc | 1190 ---------------------- storage/clustrixdb/clustrix_connection.h | 123 --- storage/clustrixdb/ha_clustrixdb.cc | 1372 -------------------------- storage/clustrixdb/ha_clustrixdb.h | 130 --- storage/clustrixdb/ha_clustrixdb_pushdown.cc | 478 --------- storage/clustrixdb/ha_clustrixdb_pushdown.h | 87 -- storage/xpand/CMakeLists.txt | 24 + storage/xpand/ha_xpand.cc | 1372 ++++++++++++++++++++++++++ storage/xpand/ha_xpand.h | 130 +++ storage/xpand/ha_xpand_pushdown.cc | 478 +++++++++ storage/xpand/ha_xpand_pushdown.h | 87 ++ storage/xpand/xpand_connection.cc | 1190 ++++++++++++++++++++++ storage/xpand/xpand_connection.h | 123 +++ 30 files changed, 3665 insertions(+), 3665 deletions(-) delete mode 100644 mysql-test/suite/clustrixdb/basics.result delete mode 100644 mysql-test/suite/clustrixdb/basics.test delete mode 100644 mysql-test/suite/clustrixdb/my.cnf delete mode 100644 mysql-test/suite/clustrixdb/suite.opt delete mode 100644 mysql-test/suite/clustrixdb/update.result delete mode 100644 mysql-test/suite/clustrixdb/update.test delete mode 100644 mysql-test/suite/clustrixdb/upsert.result delete mode 100644 mysql-test/suite/clustrixdb/upsert.test create mode 100644 mysql-test/suite/xpand/basics.result create mode 100644 mysql-test/suite/xpand/basics.test create mode 100644 mysql-test/suite/xpand/my.cnf create mode 100644 mysql-test/suite/xpand/suite.opt create mode 100644 mysql-test/suite/xpand/update.result create mode 100644 mysql-test/suite/xpand/update.test create mode 100644 mysql-test/suite/xpand/upsert.result create mode 100644 mysql-test/suite/xpand/upsert.test delete mode 100644 storage/clustrixdb/CMakeLists.txt delete mode 100644 storage/clustrixdb/clustrix_connection.cc delete mode 100644 storage/clustrixdb/clustrix_connection.h delete mode 100644 storage/clustrixdb/ha_clustrixdb.cc delete mode 100644 storage/clustrixdb/ha_clustrixdb.h delete mode 100644 storage/clustrixdb/ha_clustrixdb_pushdown.cc delete mode 100644 storage/clustrixdb/ha_clustrixdb_pushdown.h create mode 100644 storage/xpand/CMakeLists.txt create mode 100644 storage/xpand/ha_xpand.cc create mode 100644 storage/xpand/ha_xpand.h create mode 100644 storage/xpand/ha_xpand_pushdown.cc create mode 100644 storage/xpand/ha_xpand_pushdown.h create mode 100644 storage/xpand/xpand_connection.cc create mode 100644 storage/xpand/xpand_connection.h diff --git a/mysql-test/suite/clustrixdb/basics.result b/mysql-test/suite/clustrixdb/basics.result deleted file mode 100644 index 2af500cf9f1..00000000000 --- a/mysql-test/suite/clustrixdb/basics.result +++ /dev/null @@ -1,49 +0,0 @@ -CREATE DATABASE clx; -USE clx; -DROP TABLE IF EXISTS cx1; -Warnings: -Note 1051 Unknown table 'clx.cx1' -CREATE TABLE cx1(i BIGINT)ENGINE=clustrixdb; -CREATE TABLE cx1(i BIGINT)ENGINE=clustrixdb; -ERROR 42S01: Table 'cx1' already exists -INSERT INTO cx1 VALUES (42); -SELECT * FROM cx1; -i -42 -DROP TABLE cx1; -SHOW CREATE TABLE cx1; -ERROR 42S02: Table 'clx.cx1' doesn't exist -DROP TABLE IF EXISTS intandtext; -Warnings: -Note 1051 Unknown table 'clx.intandtext' -CREATE TABLE intandtext(i bigint, t text)ENGINE=clustrixdb; -INSERT INTO intandtext VALUES(10, 'someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq'); -SELECT i,t FROM intandtext; -i t -10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq -EXPLAIN SELECT i,t FROM intandtext; -id select_type table type possible_keys key key_len ref rows Extra -1 PUSHED SELECT NULL NULL NULL NULL NULL NULL NULL NULL -SET @@optimizer_switch='derived_merge=OFF'; -SET clustrixdb_select_handler=OFF; -SELECT i,t FROM (SELECT i,t FROM intandtext) t; -i t -10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq -EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t; -id select_type table type possible_keys key key_len ref rows Extra -1 PRIMARY ALL NULL NULL NULL NULL 10000 -2 PUSHED DERIVED NULL NULL NULL NULL NULL NULL NULL NULL -SET clustrixdb_derived_handler=OFF; -SELECT i,t FROM intandtext; -i t -10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq -SELECT i,t FROM (SELECT i,t FROM intandtext) t; -i t -10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq -EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t; -id select_type table type possible_keys key key_len ref rows Extra -1 PRIMARY ALL NULL NULL NULL NULL 10000 -2 DERIVED intandtext ALL NULL NULL NULL NULL 10000 -DROP TABLE intandtext; -USE test; -DROP DATABASE clx; diff --git a/mysql-test/suite/clustrixdb/basics.test b/mysql-test/suite/clustrixdb/basics.test deleted file mode 100644 index 81a81983bf3..00000000000 --- a/mysql-test/suite/clustrixdb/basics.test +++ /dev/null @@ -1,38 +0,0 @@ -CREATE DATABASE clx; -USE clx; - -DROP TABLE IF EXISTS cx1; -CREATE TABLE cx1(i BIGINT)ENGINE=clustrixdb; ---error ER_TABLE_EXISTS_ERROR -CREATE TABLE cx1(i BIGINT)ENGINE=clustrixdb; - -INSERT INTO cx1 VALUES (42); - -SELECT * FROM cx1; - -DROP TABLE cx1; - ---error ER_NO_SUCH_TABLE -SHOW CREATE TABLE cx1; - -DROP TABLE IF EXISTS intandtext; -CREATE TABLE intandtext(i bigint, t text)ENGINE=clustrixdb; -INSERT INTO intandtext VALUES(10, 'someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq'); - -SELECT i,t FROM intandtext; -EXPLAIN SELECT i,t FROM intandtext; - -SET @@optimizer_switch='derived_merge=OFF'; -SET clustrixdb_select_handler=OFF; -SELECT i,t FROM (SELECT i,t FROM intandtext) t; -EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t; - -SET clustrixdb_derived_handler=OFF; -SELECT i,t FROM intandtext; -SELECT i,t FROM (SELECT i,t FROM intandtext) t; -EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t; - -DROP TABLE intandtext; - -USE test; -DROP DATABASE clx; diff --git a/mysql-test/suite/clustrixdb/my.cnf b/mysql-test/suite/clustrixdb/my.cnf deleted file mode 100644 index 8105041b85c..00000000000 --- a/mysql-test/suite/clustrixdb/my.cnf +++ /dev/null @@ -1,4 +0,0 @@ -!include include/default_my.cnf - -[mysqld.1] -socket= /tmp/mysqld42.sock diff --git a/mysql-test/suite/clustrixdb/suite.opt b/mysql-test/suite/clustrixdb/suite.opt deleted file mode 100644 index 481f692cfdd..00000000000 --- a/mysql-test/suite/clustrixdb/suite.opt +++ /dev/null @@ -1,4 +0,0 @@ ---plugin-load=clustrixdb=ha_clustrixdb.so ---core-file ---clustrixdb_port=3306 ---plugin-maturity=unknown diff --git a/mysql-test/suite/clustrixdb/update.result b/mysql-test/suite/clustrixdb/update.result deleted file mode 100644 index fec6a409a7b..00000000000 --- a/mysql-test/suite/clustrixdb/update.result +++ /dev/null @@ -1,26 +0,0 @@ -CREATE DATABASE IF NOT EXISTS `db1`; -USE `db1`; -DROP TABLE IF EXISTS `t1`; -Warnings: -Note 1051 Unknown table 'db1.t1' -CREATE TABLE `t1`(i BIGINT, t TEXT)ENGINE=clustrixdb; -INSERT INTO `t1` (i, t) VALUES (42, 'один'); -INSERT INTO `t1` (i, t) VALUES (42, 'ноль'); -SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC; -i t -42 один -42 ноль -UPDATE `t1` SET i=i+1,t='два' WHERE t='один'; -SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC; -i t -42 один -42 ноль -USE test; -UPDATE `db1`.`t1` SET i=i+1,t='три' WHERE t='два'; -SELECT * FROM `db1`.`t1` ORDER BY `i` DESC, `t` DESC; -i t -42 один -42 ноль -DROP TABLE `db1`.`t1`; -USE test; -DROP DATABASE `db1`; diff --git a/mysql-test/suite/clustrixdb/update.test b/mysql-test/suite/clustrixdb/update.test deleted file mode 100644 index 599d7f348ac..00000000000 --- a/mysql-test/suite/clustrixdb/update.test +++ /dev/null @@ -1,19 +0,0 @@ -CREATE DATABASE IF NOT EXISTS `db1`; -USE `db1`; -DROP TABLE IF EXISTS `t1`; -CREATE TABLE `t1`(i BIGINT, t TEXT)ENGINE=clustrixdb; -INSERT INTO `t1` (i, t) VALUES (42, 'один'); -INSERT INTO `t1` (i, t) VALUES (42, 'ноль'); -SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC; - -UPDATE `t1` SET i=i+1,t='два' WHERE t='один'; -SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC; - -USE test; -UPDATE `db1`.`t1` SET i=i+1,t='три' WHERE t='два'; -SELECT * FROM `db1`.`t1` ORDER BY `i` DESC, `t` DESC; - -DROP TABLE `db1`.`t1`; - -USE test; -DROP DATABASE `db1`; diff --git a/mysql-test/suite/clustrixdb/upsert.result b/mysql-test/suite/clustrixdb/upsert.result deleted file mode 100644 index f30cfe95314..00000000000 --- a/mysql-test/suite/clustrixdb/upsert.result +++ /dev/null @@ -1,72 +0,0 @@ -CREATE DATABASE IF NOT EXISTS `db1`; -USE `db1`; -DROP TABLE IF EXISTS `ins_duplicate`; -Warnings: -Note 1051 Unknown table 'db1.ins_duplicate' -CREATE TABLE `ins_duplicate`(`id` INT PRIMARY KEY, `animal` VARCHAR(30)) ENGINE=clustrixdb; -INSERT INTO `ins_duplicate` VALUES (1,'Aardvark'), (2,'Cheetah'), (3,'Zebra'); -SELECT * FROM `ins_duplicate` ORDER BY `id`; -id animal -1 Aardvark -2 Cheetah -3 Zebra -INSERT INTO ins_duplicate VALUES (1,'Antelope'); -ERROR 23000: Clustrix error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1) -INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Banana'; -SELECT * FROM `ins_duplicate` ORDER BY `id`; -id animal -1 Banana -2 Cheetah -3 Zebra -INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); -ERROR 23000: Clustrix error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1) -INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid'; -SELECT * FROM `ins_duplicate` ORDER BY `id`; -id animal -1 hybrid -2 hybrid -3 Zebra -BEGIN; -SELECT * FROM `ins_duplicate` ORDER BY `id`; -id animal -1 hybrid -2 hybrid -3 Zebra -INSERT INTO ins_duplicate VALUES (1,'Antelope'); -ERROR 23000: Clustrix error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1) -INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable'; -SELECT * FROM `ins_duplicate` ORDER BY `id`; -id animal -1 Vegetable -2 hybrid -3 Zebra -INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); -ERROR 23000: Clustrix error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1) -INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid2'; -COMMIT; -BEGIN; -SELECT * FROM `ins_duplicate` ORDER BY `id`; -id animal -1 hybrid2 -2 hybrid2 -3 Zebra -INSERT INTO ins_duplicate VALUES (1,'Antelope'); -ERROR 23000: Clustrix error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1) -INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable'; -SELECT * FROM `ins_duplicate` ORDER BY `id`; -id animal -1 Vegetable -2 hybrid2 -3 Zebra -INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); -ERROR 23000: Clustrix error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1) -INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid3'; -ROLLBACK; -SELECT * FROM `ins_duplicate` ORDER BY `id`; -id animal -1 hybrid2 -2 hybrid2 -3 Zebra -DROP TABLE `db1`.`ins_duplicate`; -USE test; -DROP DATABASE `db1`; diff --git a/mysql-test/suite/clustrixdb/upsert.test b/mysql-test/suite/clustrixdb/upsert.test deleted file mode 100644 index badd6cb50d4..00000000000 --- a/mysql-test/suite/clustrixdb/upsert.test +++ /dev/null @@ -1,49 +0,0 @@ -CREATE DATABASE IF NOT EXISTS `db1`; -USE `db1`; -DROP TABLE IF EXISTS `ins_duplicate`; -CREATE TABLE `ins_duplicate`(`id` INT PRIMARY KEY, `animal` VARCHAR(30)) ENGINE=clustrixdb; -INSERT INTO `ins_duplicate` VALUES (1,'Aardvark'), (2,'Cheetah'), (3,'Zebra'); -SELECT * FROM `ins_duplicate` ORDER BY `id`; - ---error ER_DUP_ENTRY -INSERT INTO ins_duplicate VALUES (1,'Antelope'); -INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Banana'; -SELECT * FROM `ins_duplicate` ORDER BY `id`; - ---error ER_DUP_ENTRY -INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); -INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid'; -SELECT * FROM `ins_duplicate` ORDER BY `id`; - -BEGIN; -SELECT * FROM `ins_duplicate` ORDER BY `id`; - ---error ER_DUP_ENTRY -INSERT INTO ins_duplicate VALUES (1,'Antelope'); -INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable'; -SELECT * FROM `ins_duplicate` ORDER BY `id`; - ---error ER_DUP_ENTRY -INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); -INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid2'; -COMMIT; - -BEGIN; -SELECT * FROM `ins_duplicate` ORDER BY `id`; - ---error ER_DUP_ENTRY -INSERT INTO ins_duplicate VALUES (1,'Antelope'); -INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable'; -SELECT * FROM `ins_duplicate` ORDER BY `id`; - ---error ER_DUP_ENTRY -INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); -INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid3'; -ROLLBACK; - -SELECT * FROM `ins_duplicate` ORDER BY `id`; - -DROP TABLE `db1`.`ins_duplicate`; - -USE test; -DROP DATABASE `db1`; diff --git a/mysql-test/suite/xpand/basics.result b/mysql-test/suite/xpand/basics.result new file mode 100644 index 00000000000..07fb56dc072 --- /dev/null +++ b/mysql-test/suite/xpand/basics.result @@ -0,0 +1,49 @@ +CREATE DATABASE xpd; +USE xpd; +DROP TABLE IF EXISTS cx1; +Warnings: +Note 1051 Unknown table 'xpd.cx1' +CREATE TABLE cx1(i BIGINT)ENGINE=xpand; +CREATE TABLE cx1(i BIGINT)ENGINE=xpand; +ERROR 42S01: Table 'cx1' already exists +INSERT INTO cx1 VALUES (42); +SELECT * FROM cx1; +i +42 +DROP TABLE cx1; +SHOW CREATE TABLE cx1; +ERROR 42S02: Table 'xpd.cx1' doesn't exist +DROP TABLE IF EXISTS intandtext; +Warnings: +Note 1051 Unknown table 'xpd.intandtext' +CREATE TABLE intandtext(i bigint, t text)ENGINE=xpand; +INSERT INTO intandtext VALUES(10, 'someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq'); +SELECT i,t FROM intandtext; +i t +10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq +EXPLAIN SELECT i,t FROM intandtext; +id select_type table type possible_keys key key_len ref rows Extra +1 PUSHED SELECT NULL NULL NULL NULL NULL NULL NULL NULL +SET @@optimizer_switch='derived_merge=OFF'; +SET xpand_select_handler=OFF; +SELECT i,t FROM (SELECT i,t FROM intandtext) t; +i t +10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq +EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t; +id select_type table type possible_keys key key_len ref rows Extra +1 PRIMARY ALL NULL NULL NULL NULL 10000 +2 PUSHED DERIVED NULL NULL NULL NULL NULL NULL NULL NULL +SET xpand_derived_handler=OFF; +SELECT i,t FROM intandtext; +i t +10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq +SELECT i,t FROM (SELECT i,t FROM intandtext) t; +i t +10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq +EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t; +id select_type table type possible_keys key key_len ref rows Extra +1 PRIMARY ALL NULL NULL NULL NULL 10000 +2 DERIVED intandtext ALL NULL NULL NULL NULL 10000 +DROP TABLE intandtext; +USE test; +DROP DATABASE xpd; diff --git a/mysql-test/suite/xpand/basics.test b/mysql-test/suite/xpand/basics.test new file mode 100644 index 00000000000..0284086bfbd --- /dev/null +++ b/mysql-test/suite/xpand/basics.test @@ -0,0 +1,38 @@ +CREATE DATABASE xpd; +USE xpd; + +DROP TABLE IF EXISTS cx1; +CREATE TABLE cx1(i BIGINT)ENGINE=xpand; +--error ER_TABLE_EXISTS_ERROR +CREATE TABLE cx1(i BIGINT)ENGINE=xpand; + +INSERT INTO cx1 VALUES (42); + +SELECT * FROM cx1; + +DROP TABLE cx1; + +--error ER_NO_SUCH_TABLE +SHOW CREATE TABLE cx1; + +DROP TABLE IF EXISTS intandtext; +CREATE TABLE intandtext(i bigint, t text)ENGINE=xpand; +INSERT INTO intandtext VALUES(10, 'someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq'); + +SELECT i,t FROM intandtext; +EXPLAIN SELECT i,t FROM intandtext; + +SET @@optimizer_switch='derived_merge=OFF'; +SET xpand_select_handler=OFF; +SELECT i,t FROM (SELECT i,t FROM intandtext) t; +EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t; + +SET xpand_derived_handler=OFF; +SELECT i,t FROM intandtext; +SELECT i,t FROM (SELECT i,t FROM intandtext) t; +EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t; + +DROP TABLE intandtext; + +USE test; +DROP DATABASE xpd; diff --git a/mysql-test/suite/xpand/my.cnf b/mysql-test/suite/xpand/my.cnf new file mode 100644 index 00000000000..8105041b85c --- /dev/null +++ b/mysql-test/suite/xpand/my.cnf @@ -0,0 +1,4 @@ +!include include/default_my.cnf + +[mysqld.1] +socket= /tmp/mysqld42.sock diff --git a/mysql-test/suite/xpand/suite.opt b/mysql-test/suite/xpand/suite.opt new file mode 100644 index 00000000000..1e15cb158ae --- /dev/null +++ b/mysql-test/suite/xpand/suite.opt @@ -0,0 +1,4 @@ +--plugin-load=xpand=ha_xpand.so +--core-file +--xpand_port=3306 +--plugin-maturity=unknown diff --git a/mysql-test/suite/xpand/update.result b/mysql-test/suite/xpand/update.result new file mode 100644 index 00000000000..9b39f318ca1 --- /dev/null +++ b/mysql-test/suite/xpand/update.result @@ -0,0 +1,26 @@ +CREATE DATABASE IF NOT EXISTS `db1`; +USE `db1`; +DROP TABLE IF EXISTS `t1`; +Warnings: +Note 1051 Unknown table 'db1.t1' +CREATE TABLE `t1`(i BIGINT, t TEXT)ENGINE=xpand; +INSERT INTO `t1` (i, t) VALUES (42, 'один'); +INSERT INTO `t1` (i, t) VALUES (42, 'ноль'); +SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC; +i t +42 один +42 ноль +UPDATE `t1` SET i=i+1,t='два' WHERE t='один'; +SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC; +i t +42 один +42 ноль +USE test; +UPDATE `db1`.`t1` SET i=i+1,t='три' WHERE t='два'; +SELECT * FROM `db1`.`t1` ORDER BY `i` DESC, `t` DESC; +i t +42 один +42 ноль +DROP TABLE `db1`.`t1`; +USE test; +DROP DATABASE `db1`; diff --git a/mysql-test/suite/xpand/update.test b/mysql-test/suite/xpand/update.test new file mode 100644 index 00000000000..eb40a19d7ac --- /dev/null +++ b/mysql-test/suite/xpand/update.test @@ -0,0 +1,19 @@ +CREATE DATABASE IF NOT EXISTS `db1`; +USE `db1`; +DROP TABLE IF EXISTS `t1`; +CREATE TABLE `t1`(i BIGINT, t TEXT)ENGINE=xpand; +INSERT INTO `t1` (i, t) VALUES (42, 'один'); +INSERT INTO `t1` (i, t) VALUES (42, 'ноль'); +SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC; + +UPDATE `t1` SET i=i+1,t='два' WHERE t='один'; +SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC; + +USE test; +UPDATE `db1`.`t1` SET i=i+1,t='три' WHERE t='два'; +SELECT * FROM `db1`.`t1` ORDER BY `i` DESC, `t` DESC; + +DROP TABLE `db1`.`t1`; + +USE test; +DROP DATABASE `db1`; diff --git a/mysql-test/suite/xpand/upsert.result b/mysql-test/suite/xpand/upsert.result new file mode 100644 index 00000000000..13c948c0a60 --- /dev/null +++ b/mysql-test/suite/xpand/upsert.result @@ -0,0 +1,72 @@ +CREATE DATABASE IF NOT EXISTS `db1`; +USE `db1`; +DROP TABLE IF EXISTS `ins_duplicate`; +Warnings: +Note 1051 Unknown table 'db1.ins_duplicate' +CREATE TABLE `ins_duplicate`(`id` INT PRIMARY KEY, `animal` VARCHAR(30)) ENGINE=xpand; +INSERT INTO `ins_duplicate` VALUES (1,'Aardvark'), (2,'Cheetah'), (3,'Zebra'); +SELECT * FROM `ins_duplicate` ORDER BY `id`; +id animal +1 Aardvark +2 Cheetah +3 Zebra +INSERT INTO ins_duplicate VALUES (1,'Antelope'); +ERROR 23000: Xpand error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1) +INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Banana'; +SELECT * FROM `ins_duplicate` ORDER BY `id`; +id animal +1 Banana +2 Cheetah +3 Zebra +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); +ERROR 23000: Xpand error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1) +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid'; +SELECT * FROM `ins_duplicate` ORDER BY `id`; +id animal +1 hybrid +2 hybrid +3 Zebra +BEGIN; +SELECT * FROM `ins_duplicate` ORDER BY `id`; +id animal +1 hybrid +2 hybrid +3 Zebra +INSERT INTO ins_duplicate VALUES (1,'Antelope'); +ERROR 23000: Xpand error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1) +INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable'; +SELECT * FROM `ins_duplicate` ORDER BY `id`; +id animal +1 Vegetable +2 hybrid +3 Zebra +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); +ERROR 23000: Xpand error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1) +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid2'; +COMMIT; +BEGIN; +SELECT * FROM `ins_duplicate` ORDER BY `id`; +id animal +1 hybrid2 +2 hybrid2 +3 Zebra +INSERT INTO ins_duplicate VALUES (1,'Antelope'); +ERROR 23000: Xpand error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1) +INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable'; +SELECT * FROM `ins_duplicate` ORDER BY `id`; +id animal +1 Vegetable +2 hybrid2 +3 Zebra +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); +ERROR 23000: Xpand error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1) +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid3'; +ROLLBACK; +SELECT * FROM `ins_duplicate` ORDER BY `id`; +id animal +1 hybrid2 +2 hybrid2 +3 Zebra +DROP TABLE `db1`.`ins_duplicate`; +USE test; +DROP DATABASE `db1`; diff --git a/mysql-test/suite/xpand/upsert.test b/mysql-test/suite/xpand/upsert.test new file mode 100644 index 00000000000..7763cb27d61 --- /dev/null +++ b/mysql-test/suite/xpand/upsert.test @@ -0,0 +1,49 @@ +CREATE DATABASE IF NOT EXISTS `db1`; +USE `db1`; +DROP TABLE IF EXISTS `ins_duplicate`; +CREATE TABLE `ins_duplicate`(`id` INT PRIMARY KEY, `animal` VARCHAR(30)) ENGINE=xpand; +INSERT INTO `ins_duplicate` VALUES (1,'Aardvark'), (2,'Cheetah'), (3,'Zebra'); +SELECT * FROM `ins_duplicate` ORDER BY `id`; + +--error ER_DUP_ENTRY +INSERT INTO ins_duplicate VALUES (1,'Antelope'); +INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Banana'; +SELECT * FROM `ins_duplicate` ORDER BY `id`; + +--error ER_DUP_ENTRY +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid'; +SELECT * FROM `ins_duplicate` ORDER BY `id`; + +BEGIN; +SELECT * FROM `ins_duplicate` ORDER BY `id`; + +--error ER_DUP_ENTRY +INSERT INTO ins_duplicate VALUES (1,'Antelope'); +INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable'; +SELECT * FROM `ins_duplicate` ORDER BY `id`; + +--error ER_DUP_ENTRY +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid2'; +COMMIT; + +BEGIN; +SELECT * FROM `ins_duplicate` ORDER BY `id`; + +--error ER_DUP_ENTRY +INSERT INTO ins_duplicate VALUES (1,'Antelope'); +INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable'; +SELECT * FROM `ins_duplicate` ORDER BY `id`; + +--error ER_DUP_ENTRY +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah'); +INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid3'; +ROLLBACK; + +SELECT * FROM `ins_duplicate` ORDER BY `id`; + +DROP TABLE `db1`.`ins_duplicate`; + +USE test; +DROP DATABASE `db1`; diff --git a/storage/clustrixdb/CMakeLists.txt b/storage/clustrixdb/CMakeLists.txt deleted file mode 100644 index 42237eae6e9..00000000000 --- a/storage/clustrixdb/CMakeLists.txt +++ /dev/null @@ -1,24 +0,0 @@ -#***************************************************************************** -# Copyright (c) 2019, MariaDB Corporation. -#****************************************************************************/ - -IF(MSVC) - # Temporarily disable "conversion from size_t .." - IF(CMAKE_SIZEOF_VOID_P EQUAL 8) - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4267") - ENDIF() -ENDIF() - -SET(CLUSTRIXDB_PLUGIN_STATIC "clustrixdb") -SET(CLUSTRIXDB_PLUGIN_DYNAMIC "ha_clustrixdb") -SET(CLUSTRIXDB_SOURCES ha_clustrixdb.cc clustrix_connection.cc ha_clustrixdb_pushdown.cc) -MYSQL_ADD_PLUGIN(clustrixdb ${CLUSTRIXDB_SOURCES} STORAGE_ENGINE) - -IF(MSVC) - IF (CMAKE_BUILD_TYPE STREQUAL "Debug") - ADD_CUSTOM_COMMAND(TARGET clustrixdb - POST_BUILD - COMMAND if not exist ..\\..\\sql\\lib mkdir ..\\..\\sql\\lib\\plugin - COMMAND copy Debug\\ha_clustrixdb.dll ..\\..\\sql\\lib\\plugin\\ha_clustrixdb.dll) - ENDIF() -ENDIF() diff --git a/storage/clustrixdb/clustrix_connection.cc b/storage/clustrixdb/clustrix_connection.cc deleted file mode 100644 index f03f43acaeb..00000000000 --- a/storage/clustrixdb/clustrix_connection.cc +++ /dev/null @@ -1,1190 +0,0 @@ -/***************************************************************************** -Copyright (c) 2019, MariaDB Corporation. -*****************************************************************************/ - -/** @file clustrix_connection.cc */ - -#include "clustrix_connection.h" -#include -#include "errmsg.h" -#include "handler.h" -#include "table.h" - -extern int clustrix_connect_timeout; -extern int clustrix_read_timeout; -extern int clustrix_write_timeout; -extern char *clustrix_host; -extern char *clustrix_username; -extern char *clustrix_password; -extern uint clustrix_port; -extern char *clustrix_socket; - -/* - This class implements the commands that can be sent to the cluster by the - Xpand engine. All of these commands return a status to the caller, but some - commands also create open invocations on the cluster, which must be closed by - sending additional commands. - - Transactions on the cluster are started using flags attached to commands, and - transactions are committed or rolled back using separate commands. - - Methods ending with _next affect the transaction state after the next command - is sent to the cluster. Other transaction commands are sent to the cluster - immediately, and the state is changed before they return. - - _____________________ _______________________ - | | | | | | - V | | V | | - NONE --> REQUESTED --> STARTED --> NEW_STMT | - | | - `----> ROLLBACK_STMT ---` - - The commit and rollback commands will change any other state to NONE. This - includes the REQUESTED state, for which nothing will be sent to the cluster. - The rollback statement command can likewise change the state from NEW_STMT to - STARTED without sending anything to the cluster. - - In addition, the CLUSTRIX_TRANS_AUTOCOMMIT flag will cause the transactions - for commands that complete without leaving open invocations on the cluster to - be committed if successful or rolled back if there was an error. If - auto-commit is enabled, only one open invocation may be in progress at a - time. -*/ - -enum clustrix_trans_state { - CLUSTRIX_TRANS_STARTED = 0, - CLUSTRIX_TRANS_REQUESTED = 1, - CLUSTRIX_TRANS_NEW_STMT = 2, - CLUSTRIX_TRANS_ROLLBACK_STMT = 4, - CLUSTRIX_TRANS_NONE = 32, -}; - -enum clustrix_trans_post_flags { - CLUSTRIX_TRANS_AUTOCOMMIT = 8, - CLUSTRIX_TRANS_NO_POST_FLAGS = 0, -}; - -enum clustrix_commands { - CLUSTRIX_WRITE_ROW = 1, - CLUSTRIX_SCAN_TABLE, - CLUSTRIX_SCAN_NEXT, - CLUSTRIX_SCAN_STOP, - CLUSTRIX_KEY_READ, - CLUSTRIX_KEY_DELETE, - CLUSTRIX_SCAN_QUERY, - CLUSTRIX_KEY_UPDATE, - CLUSTRIX_SCAN_FROM_KEY, - CLUSTRIX_UPDATE_QUERY, - CLUSTRIX_COMMIT, - CLUSTRIX_ROLLBACK, -}; - -/**************************************************************************** -** Class clustrix_connection -****************************************************************************/ -clustrix_connection::clustrix_connection() - : command_buffer(NULL), command_buffer_length(0), command_length(0), - trans_state(CLUSTRIX_TRANS_NONE), trans_flags(CLUSTRIX_TRANS_NO_POST_FLAGS) -{ - DBUG_ENTER("clustrix_connection::clustrix_connection"); - memset(&clustrix_net, 0, sizeof(MYSQL)); - DBUG_VOID_RETURN; -} - -clustrix_connection::~clustrix_connection() -{ - DBUG_ENTER("clustrix_connection::~clustrix_connection"); - if (is_connected()) - disconnect(TRUE); - - if (command_buffer) - my_free(command_buffer); - DBUG_VOID_RETURN; -} - -void clustrix_connection::disconnect(bool is_destructor) -{ - DBUG_ENTER("clustrix_connection::disconnect"); - if (is_destructor) - { - /* - Connection object destruction occurs after the destruction of - the thread used by the network has begun, so usage of that - thread object now is not reliable - */ - clustrix_net.net.thd = NULL; - } - mysql_close(&clustrix_net); - DBUG_VOID_RETURN; -} - -int host_list_next; -extern int host_list_cnt; -extern char **host_list; - -int clustrix_connection::connect() -{ - int error_code = 0; - my_bool my_true = 1; - DBUG_ENTER("clustrix_connection::connect"); - - // cpu concurrency by damned! - int host_num = host_list_next; - host_num = host_num % host_list_cnt; - char *host = host_list[host_num]; - host_list_next = host_num + 1; - DBUG_PRINT("host", ("%s", host)); - - /* Validate the connection parameters */ - if (!strcmp(clustrix_socket, "")) - if (!strcmp(host, "127.0.0.1")) - if (clustrix_port == MYSQL_PORT_DEFAULT) - DBUG_RETURN(ER_CONNECT_TO_FOREIGN_DATA_SOURCE); - - //clustrix_net.methods = &connection_methods; - - if (!mysql_init(&clustrix_net)) - DBUG_RETURN(HA_ERR_OUT_OF_MEM); - - mysql_options(&clustrix_net, MYSQL_OPT_READ_TIMEOUT, - &clustrix_read_timeout); - mysql_options(&clustrix_net, MYSQL_OPT_WRITE_TIMEOUT, - &clustrix_write_timeout); - mysql_options(&clustrix_net, MYSQL_OPT_CONNECT_TIMEOUT, - &clustrix_connect_timeout); - mysql_options(&clustrix_net, MYSQL_OPT_USE_REMOTE_CONNECTION, - NULL); - mysql_options(&clustrix_net, MYSQL_SET_CHARSET_NAME, "utf8mb4"); - mysql_options(&clustrix_net, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, - (char *) &my_true); - mysql_options(&clustrix_net, MYSQL_INIT_COMMAND,"SET autocommit=0"); - -#ifdef CLUSTRIX_CONNECTION_SSL - if (opt_ssl_ca_length | conn->tgt_ssl_capath_length | - conn->tgt_ssl_cert_length | conn->tgt_ssl_key_length) - { - mysql_ssl_set(&clustrix_net, conn->tgt_ssl_key, conn->tgt_ssl_cert, - conn->tgt_ssl_ca, conn->tgt_ssl_capath, conn->tgt_ssl_cipher); - if (conn->tgt_ssl_vsc) - { - my_bool verify_flg = TRUE; - mysql_options(&clustrix_net, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, - &verify_flg); - } - } -#endif - - if (!mysql_real_connect(&clustrix_net, host, - clustrix_username, clustrix_password, - NULL, clustrix_port, clustrix_socket, - CLIENT_MULTI_STATEMENTS)) - { - error_code = mysql_errno(&clustrix_net); - disconnect(); - - if (error_code != CR_CONN_HOST_ERROR && - error_code != CR_CONNECTION_ERROR) - { - if (error_code == ER_CON_COUNT_ERROR) - { - my_error(ER_CON_COUNT_ERROR, MYF(0)); - DBUG_RETURN(ER_CON_COUNT_ERROR); - } - my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), host); - DBUG_RETURN(ER_CONNECT_TO_FOREIGN_DATA_SOURCE); - } - } - - clustrix_net.reconnect = 1; - - DBUG_RETURN(0); -} - -int clustrix_connection::begin_command(uchar command) -{ - if (trans_state == CLUSTRIX_TRANS_NONE) - return HA_ERR_INTERNAL_ERROR; - - command_length = 0; - int error_code = 0; - if ((error_code = add_command_operand_uchar(command))) - return error_code; - - if ((error_code = add_command_operand_uchar(trans_state | trans_flags))) - return error_code; - - return error_code; -} - -int clustrix_connection::send_command() -{ - my_bool com_error; - - /* - Please note: - * The transaction state is set before the command is sent because rolling - back a nonexistent transaction is better than leaving a tranaction open - on the cluster. - * The state may have alreadly been STARTED. - * Commit and rollback commands update the transaction state after calling - this function. - * If auto-commit is enabled, the state may also updated after the - response has been processed. We do not clear the auto-commit flag here - because it needs to be sent with each command until the transaction is - committed or rolled back. - */ - trans_state = CLUSTRIX_TRANS_STARTED; - - com_error = simple_command(&clustrix_net, - (enum_server_command)CLUSTRIX_SERVER_REQUEST, - command_buffer, command_length, TRUE); - - if (com_error) - { - int error_code = mysql_errno(&clustrix_net); - my_printf_error(error_code, - "Clustrix error: %s", MYF(0), - mysql_error(&clustrix_net)); - return error_code; - } - - return 0; -} - -int clustrix_connection::read_query_response() -{ - my_bool comerr = clustrix_net.methods->read_query_result(&clustrix_net); - int error_code = 0; - if (comerr) - { - error_code = mysql_errno(&clustrix_net); - my_printf_error(error_code, - "Clustrix error: %s", MYF(0), - mysql_error(&clustrix_net)); - } - - auto_commit_closed(); - return error_code; -} - -bool clustrix_connection::has_open_transaction() -{ - return trans_state != CLUSTRIX_TRANS_NONE; -} - -int clustrix_connection::commit_transaction() -{ - DBUG_ENTER("clustrix_connection::commit_transaction"); - if (trans_state == CLUSTRIX_TRANS_NONE) - DBUG_RETURN(HA_ERR_INTERNAL_ERROR); - - if (trans_state == CLUSTRIX_TRANS_REQUESTED) { - trans_state = CLUSTRIX_TRANS_NONE; - trans_flags = CLUSTRIX_TRANS_NO_POST_FLAGS; - DBUG_RETURN(0); - } - - int error_code; - if ((error_code = begin_command(CLUSTRIX_COMMIT))) - DBUG_RETURN(error_code); - - if ((error_code = send_command())) - DBUG_RETURN(error_code); - - if ((error_code = read_query_response())) - DBUG_RETURN(error_code); - - trans_state = CLUSTRIX_TRANS_NONE; - trans_flags = CLUSTRIX_TRANS_NO_POST_FLAGS; - DBUG_RETURN(error_code); -} - -int clustrix_connection::rollback_transaction() -{ - DBUG_ENTER("clustrix_connection::rollback_transaction"); - if (trans_state == CLUSTRIX_TRANS_NONE || - trans_state == CLUSTRIX_TRANS_REQUESTED) { - trans_state = CLUSTRIX_TRANS_NONE; - DBUG_RETURN(0); - } - - int error_code; - if ((error_code = begin_command(CLUSTRIX_ROLLBACK))) - DBUG_RETURN(error_code); - - if ((error_code = send_command())) - DBUG_RETURN(error_code); - - if ((error_code = read_query_response())) - DBUG_RETURN(error_code); - - trans_state = CLUSTRIX_TRANS_NONE; - trans_flags = CLUSTRIX_TRANS_NO_POST_FLAGS; - DBUG_RETURN(error_code); -} - -int clustrix_connection::begin_transaction_next() -{ - DBUG_ENTER("clustrix_connection::begin_transaction_next"); - if (trans_state != CLUSTRIX_TRANS_NONE || - trans_flags != CLUSTRIX_TRANS_NO_POST_FLAGS) - DBUG_RETURN(HA_ERR_INTERNAL_ERROR); - - trans_state = CLUSTRIX_TRANS_REQUESTED; - DBUG_RETURN(0); -} - -int clustrix_connection::new_statement_next() -{ - DBUG_ENTER("clustrix_connection::new_statement_next"); - if (trans_state != CLUSTRIX_TRANS_STARTED || - trans_flags != CLUSTRIX_TRANS_NO_POST_FLAGS) - DBUG_RETURN(HA_ERR_INTERNAL_ERROR); - - trans_state = CLUSTRIX_TRANS_NEW_STMT; - DBUG_RETURN(0); -} - -int clustrix_connection::rollback_statement_next() -{ - DBUG_ENTER("clustrix_connection::rollback_statement_next"); - if (trans_state != CLUSTRIX_TRANS_STARTED || - trans_flags != CLUSTRIX_TRANS_NO_POST_FLAGS) - DBUG_RETURN(HA_ERR_INTERNAL_ERROR); - - trans_state = CLUSTRIX_TRANS_ROLLBACK_STMT; - DBUG_RETURN(0); -} - -void clustrix_connection::auto_commit_next() -{ - trans_flags |= CLUSTRIX_TRANS_AUTOCOMMIT; -} - -void clustrix_connection::auto_commit_closed() -{ - if (trans_flags & CLUSTRIX_TRANS_AUTOCOMMIT) { - trans_flags &= ~CLUSTRIX_TRANS_AUTOCOMMIT; - trans_state = CLUSTRIX_TRANS_NONE; - } -} - -int clustrix_connection::run_query(String &stmt) -{ - int error_code = mysql_real_query(&clustrix_net, stmt.ptr(), stmt.length()); - if (error_code) - return mysql_errno(&clustrix_net); - return error_code; -} - -int clustrix_connection::write_row(ulonglong clustrix_table_oid, - uchar *packed_row, size_t packed_size, - ulonglong *last_insert_id) -{ - int error_code; - command_length = 0; - - // row based commands should not be called with auto commit. - if (trans_flags & CLUSTRIX_TRANS_AUTOCOMMIT) - return HA_ERR_INTERNAL_ERROR; - - if ((error_code = begin_command(CLUSTRIX_WRITE_ROW))) - return error_code; - - if ((error_code = add_command_operand_ulonglong(clustrix_table_oid))) - return error_code; - - if ((error_code = add_command_operand_str(packed_row, packed_size))) - return error_code; - - if ((error_code = send_command())) - return error_code; - - if ((error_code = read_query_response())) - return error_code; - - *last_insert_id = clustrix_net.insert_id; - return error_code; -} - -int clustrix_connection::key_update(ulonglong clustrix_table_oid, - uchar *packed_key, size_t packed_key_length, - MY_BITMAP *update_set, - uchar *packed_new_data, - size_t packed_new_length) -{ - int error_code; - command_length = 0; - - // row based commands should not be called with auto commit. - if (trans_flags & CLUSTRIX_TRANS_AUTOCOMMIT) - return HA_ERR_INTERNAL_ERROR; - - if ((error_code = begin_command(CLUSTRIX_KEY_UPDATE))) - return error_code; - - if ((error_code = add_command_operand_ulonglong(clustrix_table_oid))) - return error_code; - - if ((error_code = add_command_operand_str(packed_key, packed_key_length))) - return error_code; - - if ((error_code = add_command_operand_bitmap(update_set))) - return error_code; - - if ((error_code = add_command_operand_str(packed_new_data, - packed_new_length))) - return error_code; - - if ((error_code = send_command())) - return error_code; - - if ((error_code = read_query_response())) - return error_code; - - return error_code; -} - -int clustrix_connection::key_delete(ulonglong clustrix_table_oid, - uchar *packed_key, size_t packed_key_length) -{ - int error_code; - command_length = 0; - - // row based commands should not be called with auto commit. - if (trans_flags & CLUSTRIX_TRANS_AUTOCOMMIT) - return HA_ERR_INTERNAL_ERROR; - - if ((error_code = begin_command(CLUSTRIX_KEY_DELETE))) - return error_code; - - if ((error_code = add_command_operand_ulonglong(clustrix_table_oid))) - return error_code; - - if ((error_code = add_command_operand_str(packed_key, packed_key_length))) - return error_code; - - if ((error_code = send_command())) - return error_code; - - if ((error_code = read_query_response())) - return error_code; - - return error_code; -} - -int clustrix_connection::key_read(ulonglong clustrix_table_oid, uint index, - clustrix_lock_mode_t lock_mode, - MY_BITMAP *read_set, uchar *packed_key, - ulong packed_key_length, uchar **rowdata, - ulong *rowdata_length) -{ - int error_code; - command_length = 0; - - // row based commands should not be called with auto commit. - if (trans_flags & CLUSTRIX_TRANS_AUTOCOMMIT) - return HA_ERR_INTERNAL_ERROR; - - if ((error_code = begin_command(CLUSTRIX_KEY_READ))) - return error_code; - - if ((error_code = add_command_operand_ulonglong(clustrix_table_oid))) - return error_code; - - if ((error_code = add_command_operand_uint(index))) - return error_code; - - if ((error_code = add_command_operand_uchar((uchar)lock_mode))) - return error_code; - - if ((error_code = add_command_operand_bitmap(read_set))) - return error_code; - - if ((error_code = add_command_operand_str(packed_key, packed_key_length))) - return error_code; - - if ((error_code = send_command())) - return error_code; - - ulong packet_length = cli_safe_read(&clustrix_net); - if (packet_length == packet_error) - return mysql_errno(&clustrix_net); - - uchar *data = clustrix_net.net.read_pos; - *rowdata_length = safe_net_field_length_ll(&data, packet_length); - *rowdata = (uchar *)my_malloc(*rowdata_length, MYF(MY_WME)); - memcpy(*rowdata, data, *rowdata_length); - - packet_length = cli_safe_read(&clustrix_net); - if (packet_length == packet_error) { - my_free(*rowdata); - *rowdata = NULL; - *rowdata_length = 0; - return mysql_errno(&clustrix_net); - } - - return 0; -} - -class clustrix_connection_cursor { - struct rowdata { - ulong length; - uchar *data; - }; - - ulong current_row; - ulong last_row; - struct rowdata *rows; - uchar *outstanding_row; // to be freed on next request. - MYSQL *clustrix_net; - -public: - ulong buffer_size; - ulonglong scan_refid; - bool eof_reached; - -private: - int cache_row(uchar *rowdata, ulong rowdata_length) - { - DBUG_ENTER("clustrix_connection_cursor::cache_row"); - rows[last_row].length = rowdata_length; - rows[last_row].data = (uchar *)my_malloc(rowdata_length, MYF(MY_WME)); - if (!rows[last_row].data) - DBUG_RETURN(HA_ERR_OUT_OF_MEM); - memcpy(rows[last_row].data, rowdata, rowdata_length); - last_row++; - DBUG_RETURN(0); - } - - int load_rows_impl(bool *stmt_completed) - { - DBUG_ENTER("clustrix_connection_cursor::load_rows_impl"); - int error_code = 0; - ulong packet_length = cli_safe_read(clustrix_net); - if (packet_length == packet_error) { - error_code = mysql_errno(clustrix_net); - *stmt_completed = TRUE; - if (error_code == HA_ERR_END_OF_FILE) { - // We have read all rows for query. - eof_reached = TRUE; - DBUG_RETURN(0); - } - DBUG_RETURN(error_code); - } - - uchar *rowdata = clustrix_net->net.read_pos; - ulong rowdata_length = safe_net_field_length_ll(&rowdata, packet_length); - if (!rowdata_length) { - // We have read all rows in this batch. - DBUG_RETURN(0); - } - - if ((error_code = cache_row(rowdata, rowdata_length))) - DBUG_RETURN(error_code); - - DBUG_RETURN(load_rows_impl(stmt_completed)); - } - -public: - clustrix_connection_cursor(MYSQL *clustrix_net_, ulong bufsize) - { - DBUG_ENTER("clustrix_connection_cursor::clustrix_connection_cursor"); - clustrix_net = clustrix_net_; - eof_reached = FALSE; - current_row = 0; - last_row = 0; - outstanding_row = NULL; - buffer_size = bufsize; - rows = NULL; - DBUG_VOID_RETURN; - } - - ~clustrix_connection_cursor() - { - DBUG_ENTER("clustrix_connection_cursor::~clustrix_connection_cursor"); - if (outstanding_row) - my_free(outstanding_row); - if (rows) { - while (current_row < last_row) - my_free(rows[current_row++].data); - my_free(rows); - } - DBUG_VOID_RETURN; - } - - int load_rows(bool *stmt_completed) - { - DBUG_ENTER("clustrix_connection_cursor::load_rows"); - current_row = 0; - last_row = 0; - DBUG_RETURN(load_rows_impl(stmt_completed)); - } - - int initialize(bool *stmt_completed) - { - DBUG_ENTER("clustrix_connection_cursor::initialize"); - ulong packet_length = cli_safe_read(clustrix_net); - if (packet_length == packet_error) { - *stmt_completed = TRUE; - DBUG_RETURN(mysql_errno(clustrix_net)); - } - - unsigned char *pos = clustrix_net->net.read_pos; - scan_refid = safe_net_field_length_ll(&pos, packet_length); - - rows = (struct rowdata *)my_malloc(buffer_size * sizeof(struct rowdata), - MYF(MY_WME)); - if (!rows) - DBUG_RETURN(HA_ERR_OUT_OF_MEM); - - DBUG_RETURN(load_rows(stmt_completed)); - } - - uchar *retrieve_row(ulong *rowdata_length) - { - DBUG_ENTER("clustrix_connection_cursor::retrieve_row"); - if (outstanding_row) { - my_free(outstanding_row); - outstanding_row = NULL; - } - if (current_row == last_row) - DBUG_RETURN(NULL); - *rowdata_length = rows[current_row].length; - outstanding_row = rows[current_row].data; - current_row++; - DBUG_RETURN(outstanding_row); - } -}; - -int clustrix_connection::allocate_cursor(MYSQL *clustrix_net, ulong buffer_size, - clustrix_connection_cursor **scan) -{ - DBUG_ENTER("clustrix_connection::allocate_cursor"); - *scan = new clustrix_connection_cursor(clustrix_net, buffer_size); - if (!*scan) - DBUG_RETURN(HA_ERR_OUT_OF_MEM); - - bool stmt_completed = FALSE; - int error_code = (*scan)->initialize(&stmt_completed); - if (error_code) { - delete *scan; - *scan = NULL; - } - - if (stmt_completed) - auto_commit_closed(); - - DBUG_RETURN(error_code); -} - -int clustrix_connection::scan_table(ulonglong clustrix_table_oid, - clustrix_lock_mode_t lock_mode, - MY_BITMAP *read_set, ushort row_req, - clustrix_connection_cursor **scan) -{ - int error_code; - command_length = 0; - - // row based commands should not be called with auto commit. - if (trans_flags & CLUSTRIX_TRANS_AUTOCOMMIT) - return HA_ERR_INTERNAL_ERROR; - - if ((error_code = begin_command(CLUSTRIX_SCAN_TABLE))) - return error_code; - - if ((error_code = add_command_operand_ushort(row_req))) - return error_code; - - if ((error_code = add_command_operand_ulonglong(clustrix_table_oid))) - return error_code; - - if ((error_code = add_command_operand_uchar((uchar)lock_mode))) - return error_code; - - if ((error_code = add_command_operand_bitmap(read_set))) - return error_code; - - if ((error_code = send_command())) - return error_code; - - return allocate_cursor(&clustrix_net, row_req, scan); -} - -/** - * @brief - * Sends a command to initiate query scan. - * @details - * Sends a command over mysql protocol connection to initiate an - * arbitrary query using a query text. - * Uses field types, field metadata and nullability to explicitly - * cast result to expected data type. Exploits RBR TABLE_MAP_EVENT - * format + sends SQL text. - * @args - * stmt& Query text to send - * fieldtype* array of byte wide field types of result projection - * null_bits* fields nullability bitmap of result projection - * field_metadata* Field metadata of result projection - * scan_refid id used to reference this scan later - * Used in pushdowns to initiate query scan. - **/ -int clustrix_connection::scan_query(String &stmt, uchar *fieldtype, uint fields, - uchar *null_bits, uint null_bits_size, - uchar *field_metadata, - uint field_metadata_size, - ushort row_req, - clustrix_connection_cursor **scan) -{ - int error_code; - command_length = 0; - - if ((error_code = begin_command(CLUSTRIX_SCAN_QUERY))) - return error_code; - - if ((error_code = add_command_operand_ushort(row_req))) - return error_code; - - if ((error_code = add_command_operand_str((uchar*)stmt.ptr(), stmt.length()))) - return error_code; - - if ((error_code = add_command_operand_str(fieldtype, fields))) - return error_code; - - if ((error_code = add_command_operand_str(field_metadata, field_metadata_size))) - return error_code; - - // This variable length string calls for an additional store w/o lcb lenth prefix. - if ((error_code = add_command_operand_vlstr(null_bits, null_bits_size))) - return error_code; - - if ((error_code = send_command())) - return error_code; - - return allocate_cursor(&clustrix_net, row_req, scan); -} - -/** - * @brief - * Sends a command to initiate UPDATE. - * @details - * Sends a command over mysql protocol connection to initiate an - * UPDATE query using a query text. - * @args - * stmt& Query text to send - * dbname current working database - * dbname ¤t database name - **/ -int clustrix_connection::update_query(String &stmt, LEX_CSTRING &dbname, - ulonglong *affected_rows) -{ - int error_code; - command_length = 0; - - if ((error_code = begin_command(CLUSTRIX_UPDATE_QUERY))) - return error_code; - - if ((error_code = add_command_operand_str((uchar*)dbname.str, dbname.length))) - return error_code; - - if ((error_code = add_command_operand_str((uchar*)stmt.ptr(), stmt.length()))) - return error_code; - - if ((error_code = send_command())) - return error_code; - - error_code = read_query_response(); - if (!error_code) - *affected_rows = clustrix_net.affected_rows; - - return error_code; -} - -int clustrix_connection::scan_from_key(ulonglong clustrix_table_oid, uint index, - clustrix_lock_mode_t lock_mode, - enum scan_type scan_dir, - int no_key_cols, bool sorted_scan, - MY_BITMAP *read_set, uchar *packed_key, - ulong packed_key_length, ushort row_req, - clustrix_connection_cursor **scan) -{ - int error_code; - command_length = 0; - - // row based commands should not be called with auto commit. - if (trans_flags & CLUSTRIX_TRANS_AUTOCOMMIT) - return HA_ERR_INTERNAL_ERROR; - - if ((error_code = begin_command(CLUSTRIX_SCAN_FROM_KEY))) - return error_code; - - if ((error_code = add_command_operand_ushort(row_req))) - return error_code; - - if ((error_code = add_command_operand_ulonglong(clustrix_table_oid))) - return error_code; - - if ((error_code = add_command_operand_uint(index))) - return error_code; - - if ((error_code = add_command_operand_uchar((uchar)lock_mode))) - return error_code; - - if ((error_code = add_command_operand_uchar(scan_dir))) - return error_code; - - if ((error_code = add_command_operand_uint(no_key_cols))) - return error_code; - - if ((error_code = add_command_operand_uchar(sorted_scan))) - return error_code; - - if ((error_code = add_command_operand_str(packed_key, packed_key_length))) - return error_code; - - if ((error_code = add_command_operand_bitmap(read_set))) - return error_code; - - if ((error_code = send_command())) - return error_code; - - return allocate_cursor(&clustrix_net, row_req, scan); -} - -int clustrix_connection::scan_next(clustrix_connection_cursor *scan, - uchar **rowdata, ulong *rowdata_length) -{ - *rowdata = scan->retrieve_row(rowdata_length); - if (*rowdata) - return 0; - - if (scan->eof_reached) - return HA_ERR_END_OF_FILE; - - int error_code; - command_length = 0; - - if ((error_code = begin_command(CLUSTRIX_SCAN_NEXT))) - return error_code; - - if ((error_code = add_command_operand_ushort(scan->buffer_size))) - return error_code; - - if ((error_code = add_command_operand_lcb(scan->scan_refid))) - return error_code; - - if ((error_code = send_command())) - return error_code; - - bool stmt_completed = FALSE; - error_code = scan->load_rows(&stmt_completed); - if (stmt_completed) - auto_commit_closed(); - if (error_code) - return error_code; - - *rowdata = scan->retrieve_row(rowdata_length); - if (!*rowdata) - return HA_ERR_END_OF_FILE; - - return 0; -} - -int clustrix_connection::scan_end(clustrix_connection_cursor *scan) -{ - int error_code; - command_length = 0; - ulonglong scan_refid = scan->scan_refid; - bool eof_reached = scan->eof_reached; - delete scan; - - if (eof_reached) - return 0; - - if ((error_code = begin_command(CLUSTRIX_SCAN_STOP))) - return error_code; - - if ((error_code = add_command_operand_lcb(scan_refid))) - return error_code; - - if ((error_code = send_command())) - return error_code; - - return read_query_response(); -} - -int clustrix_connection::populate_table_list(LEX_CSTRING *db, - handlerton::discovered_list *result) -{ - int error_code = 0; - String stmt; - stmt.append("SHOW FULL TABLES FROM "); - stmt.append(db); - stmt.append(" WHERE table_type = 'BASE TABLE'"); - - if (mysql_real_query(&clustrix_net, stmt.c_ptr(), stmt.length())) { - int error_code = mysql_errno(&clustrix_net); - if (error_code == ER_BAD_DB_ERROR) - return 0; - else - return error_code; - } - - MYSQL_RES *results = mysql_store_result(&clustrix_net); - if (mysql_num_fields(results) != 2) { - error_code = HA_ERR_CORRUPT_EVENT; - goto error; - } - - MYSQL_ROW row; - while((row = mysql_fetch_row(results))) - result->add_table(row[0], strlen(row[0])); - -error: - mysql_free_result(results); - return error_code; -} - -int clustrix_connection::discover_table_details(LEX_CSTRING *db, - LEX_CSTRING *name, THD *thd, - TABLE_SHARE *share) -{ - DBUG_ENTER("clustrix_connection::discover_table_details"); - int error_code = 0; - MYSQL_RES *results_oid = NULL; - MYSQL_RES *results_create = NULL; - MYSQL_ROW row; - String get_oid, show; - - /* get oid */ - get_oid.append("select r.table " - "from system.databases d " - " inner join ""system.relations r on d.db = r.db " - "where d.name = '"); - get_oid.append(db); - get_oid.append("' and r.name = '"); - get_oid.append(name); - get_oid.append("'"); - - if (mysql_real_query(&clustrix_net, get_oid.c_ptr(), get_oid.length())) { - if ((error_code = mysql_errno(&clustrix_net))) { - DBUG_PRINT("mysql_real_query returns ", ("%d", error_code)); - error_code = HA_ERR_NO_SUCH_TABLE; - goto error; - } - } - - results_oid = mysql_store_result(&clustrix_net); - DBUG_PRINT("oid results", - ("rows: %llu, fields: %u", mysql_num_rows(results_oid), - mysql_num_fields(results_oid))); - - if (mysql_num_rows(results_oid) != 1) { - error_code = HA_ERR_NO_SUCH_TABLE; - goto error; - } - - while((row = mysql_fetch_row(results_oid))) { - DBUG_PRINT("row", ("%s", row[0])); - uchar *to = (uchar*)alloc_root(&share->mem_root, strlen(row[0]) + 1); - if (!to) { - error_code = HA_ERR_OUT_OF_MEM; - goto error; - } - - strcpy((char *)to, (char *)row[0]); - share->tabledef_version.str = to; - share->tabledef_version.length = strlen(row[0]); - } - - /* get show create statement */ - show.append("show simple create table "); - show.append(db); - show.append("."); - show.append(name); - if (mysql_real_query(&clustrix_net, show.c_ptr(), show.length())) { - if ((error_code = mysql_errno(&clustrix_net))) { - DBUG_PRINT("mysql_real_query returns ", ("%d", error_code)); - error_code = HA_ERR_NO_SUCH_TABLE; - goto error; - } - } - - results_create = mysql_store_result(&clustrix_net); - DBUG_PRINT("show table results", - ("rows: %llu, fields: %u", mysql_num_rows(results_create), - mysql_num_fields(results_create))); - - if (mysql_num_rows(results_create) != 1) { - error_code = HA_ERR_NO_SUCH_TABLE; - goto error; - } - - if (mysql_num_fields(results_create) != 2) { - error_code = HA_ERR_CORRUPT_EVENT; - goto error; - } - - while((row = mysql_fetch_row(results_create))) { - DBUG_PRINT("row", ("%s - %s", row[0], row[1])); - error_code = share->init_from_sql_statement_string(thd, false, row[1], - strlen(row[1])); - } - -error: - if (results_oid) - mysql_free_result(results_oid); - - if (results_create) - mysql_free_result(results_create); - DBUG_RETURN(error_code); -} - -#define COMMAND_BUFFER_SIZE_INCREMENT 1024 -#define COMMAND_BUFFER_SIZE_INCREMENT_BITS 10 -int clustrix_connection::expand_command_buffer(size_t add_length) -{ - size_t expanded_length; - - if (command_buffer_length >= command_length + add_length) - return 0; - - expanded_length = command_buffer_length + - ((add_length >> COMMAND_BUFFER_SIZE_INCREMENT_BITS) - << COMMAND_BUFFER_SIZE_INCREMENT_BITS) + - COMMAND_BUFFER_SIZE_INCREMENT; - - if (!command_buffer_length) - command_buffer = (uchar *) my_malloc(expanded_length, MYF(MY_WME)); - else - command_buffer = (uchar *) my_realloc(command_buffer, expanded_length, - MYF(MY_WME)); - if (!command_buffer) - return HA_ERR_OUT_OF_MEM; - - command_buffer_length = expanded_length; - - return 0; -} - -int clustrix_connection::add_command_operand_uchar(uchar value) -{ - int error_code = expand_command_buffer(sizeof(value)); - if (error_code) - return error_code; - - memcpy(command_buffer + command_length, &value, sizeof(value)); - command_length += sizeof(value); - - return 0; -} - -int clustrix_connection::add_command_operand_ushort(ushort value) -{ - ushort be_value = htobe16(value); - int error_code = expand_command_buffer(sizeof(be_value)); - if (error_code) - return error_code; - - memcpy(command_buffer + command_length, &be_value, sizeof(be_value)); - command_length += sizeof(be_value); - return 0; -} - -int clustrix_connection::add_command_operand_uint(uint value) -{ - uint be_value = htobe32(value); - int error_code = expand_command_buffer(sizeof(be_value)); - if (error_code) - return error_code; - - memcpy(command_buffer + command_length, &be_value, sizeof(be_value)); - command_length += sizeof(be_value); - return 0; -} - -int clustrix_connection::add_command_operand_ulonglong(ulonglong value) -{ - ulonglong be_value = htobe64(value); - int error_code = expand_command_buffer(sizeof(be_value)); - if (error_code) - return error_code; - - memcpy(command_buffer + command_length, &be_value, sizeof(be_value)); - command_length += sizeof(be_value); - return 0; -} - -int clustrix_connection::add_command_operand_lcb(ulonglong value) -{ - int len = net_length_size(value); - int error_code = expand_command_buffer(len); - if (error_code) - return error_code; - - net_store_length(command_buffer + command_length, value); - command_length += len; - return 0; -} - -int clustrix_connection::add_command_operand_str(const uchar *str, - size_t str_length) -{ - int error_code = add_command_operand_lcb(str_length); - if (error_code) - return error_code; - - if (!str_length) - return 0; - - error_code = expand_command_buffer(str_length); - if (error_code) - return error_code; - - memcpy(command_buffer + command_length, str, str_length); - command_length += str_length; - return 0; -} - -/** - * @brief - * Puts variable length string into the buffer. - * @details - * Puts into the buffer variable length string the size - * of which is send by other means. For details see - * MDB Client/Server Protocol. - * @args - * str - string to send - * str_length - size - **/ -int clustrix_connection::add_command_operand_vlstr(const uchar *str, - size_t str_length) -{ - int error_code = expand_command_buffer(str_length); - if (error_code) - return error_code; - - memcpy(command_buffer + command_length, str, str_length); - command_length += str_length; - return 0; -} - -int clustrix_connection::add_command_operand_lex_string(LEX_CSTRING str) -{ - return add_command_operand_str((const uchar *)str.str, str.length); -} - -int clustrix_connection::add_command_operand_bitmap(MY_BITMAP *bitmap) -{ - int error_code = add_command_operand_lcb(bitmap->n_bits); - if (error_code) - return error_code; - - int no_bytes = no_bytes_in_map(bitmap); - error_code = expand_command_buffer(no_bytes); - if (error_code) - return error_code; - - memcpy(command_buffer + command_length, bitmap->bitmap, no_bytes); - command_length += no_bytes; - return 0; -} diff --git a/storage/clustrixdb/clustrix_connection.h b/storage/clustrixdb/clustrix_connection.h deleted file mode 100644 index f2a48f3da6c..00000000000 --- a/storage/clustrixdb/clustrix_connection.h +++ /dev/null @@ -1,123 +0,0 @@ -/***************************************************************************** -Copyright (c) 2019, MariaDB Corporation. -*****************************************************************************/ - -#ifndef _clustrix_connection_h -#define _clustrix_connection_h - -#ifdef USE_PRAGMA_INTERFACE -#pragma interface /* gcc class implementation */ -#endif - -#define MYSQL_SERVER 1 -#include "my_global.h" -#include "m_string.h" -#include "mysql.h" -#include "sql_common.h" -#include "my_base.h" -#include "mysqld_error.h" -#include "my_bitmap.h" -#include "handler.h" - -#define CLUSTRIX_SERVER_REQUEST 30 - -typedef enum clustrix_lock_mode { - CLUSTRIX_NO_LOCKS, - CLUSTRIX_SHARED, - CLUSTRIX_EXCLUSIVE, -} clustrix_lock_mode_t; - -class clustrix_connection_cursor; -class clustrix_connection -{ -private: - MYSQL clustrix_net; - uchar *command_buffer; - size_t command_buffer_length; - size_t command_length; - - int trans_state; - int trans_flags; - int allocate_cursor(MYSQL *clustrix_net, ulong buffer_size, - clustrix_connection_cursor **scan); -public: - clustrix_connection(); - ~clustrix_connection(); - - inline bool is_connected() - { - return clustrix_net.net.vio; - } - int connect(); - void disconnect(bool is_destructor = FALSE); - - bool has_open_transaction(); - int commit_transaction(); - int rollback_transaction(); - int begin_transaction_next(); - int new_statement_next(); - int rollback_statement_next(); // also starts new statement - void auto_commit_next(); - void auto_commit_closed(); - - int run_query(String &stmt); - int write_row(ulonglong clustrix_table_oid, uchar *packed_row, - size_t packed_size, ulonglong *last_insert_id); - int key_update(ulonglong clustrix_table_oid, - uchar *packed_key, size_t packed_key_length, - MY_BITMAP *update_set, - uchar *packed_new_data, size_t packed_new_length); - int key_delete(ulonglong clustrix_table_oid, - uchar *packed_key, size_t packed_key_length); - int key_read(ulonglong clustrix_table_oid, uint index, - clustrix_lock_mode_t lock_mode, MY_BITMAP *read_set, - uchar *packed_key, ulong packed_key_length, uchar **rowdata, - ulong *rowdata_length); - enum sort_order {SORT_NONE = 0, SORT_ASC = 1, SORT_DESC = 2}; - enum scan_type { - READ_KEY_OR_NEXT, /* rows with key and greater */ - READ_KEY_OR_PREV, /* rows with key and less. */ - READ_AFTER_KEY, /* rows with keys greater than key */ - READ_BEFORE_KEY, /* rows with keys less than key */ - READ_FROM_START, /* rows with forwards from first key. */ - READ_FROM_LAST, /* rows with backwards from last key. */ - }; - int scan_table(ulonglong clustrix_table_oid, - clustrix_lock_mode_t lock_mode, - MY_BITMAP *read_set, ushort row_req, - clustrix_connection_cursor **scan); - int scan_query(String &stmt, uchar *fieldtype, uint fields, uchar *null_bits, - uint null_bits_size, uchar *field_metadata, - uint field_metadata_size, ushort row_req, - clustrix_connection_cursor **scan); - int update_query(String &stmt, LEX_CSTRING &dbname, ulonglong *affected_rows); - int scan_from_key(ulonglong clustrix_table_oid, uint index, - clustrix_lock_mode_t lock_mode, - enum scan_type scan_dir, int no_key_cols, bool sorted_scan, - MY_BITMAP *read_set, uchar *packed_key, - ulong packed_key_length, ushort row_req, - clustrix_connection_cursor **scan); - int scan_next(clustrix_connection_cursor *scan, uchar **rowdata, - ulong *rowdata_length); - int scan_end(clustrix_connection_cursor *scan); - - int populate_table_list(LEX_CSTRING *db, handlerton::discovered_list *result); - int discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name, THD *thd, - TABLE_SHARE *share); - -private: - int expand_command_buffer(size_t add_length); - int add_command_operand_uchar(uchar value); - int add_command_operand_ushort(ushort value); - int add_command_operand_uint(uint value); - int add_command_operand_ulonglong(ulonglong value); - int add_command_operand_lcb(ulonglong value); - int add_command_operand_str(const uchar *str, size_t length); - int add_command_operand_vlstr(const uchar *str, size_t length); - int add_command_operand_lex_string(LEX_CSTRING str); - int add_command_operand_bitmap(MY_BITMAP *bitmap); - int begin_command(uchar command); - int send_command(); - int read_query_response(); -}; -#endif // _clustrix_connection_h diff --git a/storage/clustrixdb/ha_clustrixdb.cc b/storage/clustrixdb/ha_clustrixdb.cc deleted file mode 100644 index d80ab91653b..00000000000 --- a/storage/clustrixdb/ha_clustrixdb.cc +++ /dev/null @@ -1,1372 +0,0 @@ -/***************************************************************************** -Copyright (c) 2019, MariaDB Corporation. -*****************************************************************************/ - -/** @file ha_clustrixdb.cc */ - -#include "ha_clustrixdb.h" -#include "ha_clustrixdb_pushdown.h" -#include "key.h" - -handlerton *clustrixdb_hton = NULL; - -int clustrix_connect_timeout; -static MYSQL_SYSVAR_INT -( - connect_timeout, - clustrix_connect_timeout, - PLUGIN_VAR_OPCMDARG, - "Timeout for connecting to Clustrix", - NULL, NULL, -1, -1, 2147483647, 0 -); - -int clustrix_read_timeout; -static MYSQL_SYSVAR_INT -( - read_timeout, - clustrix_read_timeout, - PLUGIN_VAR_OPCMDARG, - "Timeout for receiving data from Clustrix", - NULL, NULL, -1, -1, 2147483647, 0 -); - -int clustrix_write_timeout; -static MYSQL_SYSVAR_INT -( - write_timeout, - clustrix_write_timeout, - PLUGIN_VAR_OPCMDARG, - "Timeout for sending data to Clustrix", - NULL, NULL, -1, -1, 2147483647, 0 -); - -char *clustrix_host; -static MYSQL_SYSVAR_STR -( - host, - clustrix_host, - PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, - "Clustrix host", - NULL, NULL, "127.0.0.1" -); - -int host_list_cnt; -char **host_list; - -static void free_host_list() -{ - if (host_list) { - for (int i = 0; host_list[i]; i++) - my_free(host_list[i]); - my_free(host_list); - host_list = NULL; - } -} - -static void update_host_list(char *clustrix_host) -{ - free_host_list(); - - int cnt = 0; - for (char *p = clustrix_host, *s = clustrix_host; ; p++) { - if (*p == ',' || *p == '\0') { - if (p > s) { - cnt++; - } - if (!*p) - break; - s = p + 1; - } - } - - DBUG_PRINT("host_cnt", ("%d", cnt)); - host_list = (char **)my_malloc(sizeof(char *) * cnt+1, MYF(MY_WME)); - host_list[cnt] = 0; - host_list_cnt = cnt; - - int i = 0; - for (char *p = clustrix_host, *s = clustrix_host; ; p++) { - if (*p == ',' || *p == '\0') { - if (p > s) { - char *host = (char *)my_malloc(p - s + 1, MYF(MY_WME)); - host[p-s] = '\0'; - memcpy(host, s, p-s); - DBUG_PRINT("host", ("%s", host)); - host_list[i++] = host; - } - if (!*p) - break; - s = p + 1; - } - } - - DBUG_PRINT("clustrix_host", ("%s", clustrix_host)); -} - -char *clustrix_username; -static MYSQL_SYSVAR_STR -( - username, - clustrix_username, - PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, - "Clustrix user name", - NULL, NULL, "root" -); - -char *clustrix_password; -static MYSQL_SYSVAR_STR -( - password, - clustrix_password, - PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, - "Clustrix password", - NULL, NULL, "" -); - -uint clustrix_port; -static MYSQL_SYSVAR_UINT -( - port, - clustrix_port, - PLUGIN_VAR_RQCMDARG, - "Clustrix port", - NULL, NULL, MYSQL_PORT_DEFAULT, MYSQL_PORT_DEFAULT, 65535, 0 -); - -char *clustrix_socket; -static MYSQL_SYSVAR_STR -( - socket, - clustrix_socket, - PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, - "Clustrix socket", - NULL, NULL, "" -); - -static MYSQL_THDVAR_UINT -( - row_buffer, - PLUGIN_VAR_RQCMDARG, - "Clustrix rowstore row buffer size", - NULL, NULL, 20, 1, 65535, 0 -); - -// Per thread select handler knob -static MYSQL_THDVAR_BOOL( - select_handler, - PLUGIN_VAR_NOCMDARG, - "", - NULL, - NULL, - 1 -); - -// Per thread derived handler knob -static MYSQL_THDVAR_BOOL( - derived_handler, - PLUGIN_VAR_NOCMDARG, - "", - NULL, - NULL, - 1 -); - -static MYSQL_THDVAR_BOOL( - enable_direct_update, - PLUGIN_VAR_NOCMDARG, - "", - NULL, - NULL, - 1 -); - -bool select_handler_setting(THD* thd) -{ - return ( thd == NULL ) ? false : THDVAR(thd, select_handler); -} - -bool derived_handler_setting(THD* thd) -{ - return ( thd == NULL ) ? false : THDVAR(thd, derived_handler); -} - -uint row_buffer_setting(THD* thd) -{ - return THDVAR(thd, row_buffer); -} - -/**************************************************************************** -** Utility functions -****************************************************************************/ -// This is a wastefull aproach but better then fixed sized buffer. -size_t estimate_row_size(TABLE *table) -{ - size_t row_size = 0; - size_t null_byte_count = (bitmap_bits_set(table->write_set) + 7) / 8; - row_size += null_byte_count; - Field **p_field= table->field, *field; - for ( ; (field= *p_field) ; p_field++) { - row_size += field->max_data_length(); - } - return row_size; -} - -/** - * @brief - * Decodes object name. - * - * @details - * Replaces the encoded object name in the path with a decoded variant, - * e.g if path contains ./test/d@0024. This f() makes it ./test/d$ - * - * Used in delete and rename DDL processing. - **/ -static void decode_objectname(char *buf, const char *path, size_t buf_size) -{ - size_t new_path_len = filename_to_tablename(path, buf, buf_size); - buf[new_path_len] = '\0'; -} - -static void decode_file_path(const char *path, char *decoded_dbname, - char *decoded_tbname) -{ - // The format cont ains './' in the beginning of a path. - char *dbname_start = (char*) path + 2; - char *dbname_end = dbname_start; - while (*dbname_end != '/') - dbname_end++; - - int cnt = dbname_end - dbname_start; - char *dbname = (char *)my_alloca(cnt + 1); - memcpy(dbname, dbname_start, cnt); - dbname[cnt] = '\0'; - decode_objectname(decoded_dbname, dbname, FN_REFLEN); - my_afree(dbname); - - char *tbname_start = dbname_end + 1; - decode_objectname(decoded_tbname, tbname_start, FN_REFLEN); -} - -clustrix_connection *get_trx(THD *thd, int *error_code) -{ - *error_code = 0; - clustrix_connection *trx; - if (!(trx = (clustrix_connection *)thd_get_ha_data(thd, clustrixdb_hton))) - { - if (!(trx = new clustrix_connection())) { - *error_code = HA_ERR_OUT_OF_MEM; - return NULL; - } - - *error_code = trx->connect(); - if (*error_code) { - delete trx; - return NULL; - } - - thd_set_ha_data(thd, clustrixdb_hton, trx); - } - - return trx; -} -/**************************************************************************** -** Class ha_clustrixdb -****************************************************************************/ - -ha_clustrixdb::ha_clustrixdb(handlerton *hton, TABLE_SHARE *table_arg) - : handler(hton, table_arg) -{ - DBUG_ENTER("ha_clustrixdb::ha_clustrixdb"); - rgi = NULL; - scan_cur = NULL; - clustrix_table_oid = 0; - upsert_flag = 0; - DBUG_VOID_RETURN; -} - -ha_clustrixdb::~ha_clustrixdb() -{ - if (rgi) - remove_current_table_from_rpl_table_list(rgi); -} - -int ha_clustrixdb::create(const char *name, TABLE *form, HA_CREATE_INFO *info) -{ - int error_code; - THD *thd = ha_thd(); - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) - return error_code; - - enum tmp_table_type saved_tmp_table_type = form->s->tmp_table; - Table_specification_st *create_info = &thd->lex->create_info; - const bool is_tmp_table = info->options & HA_LEX_CREATE_TMP_TABLE; - String create_table_stmt; - - /* Create a copy of the CREATE TABLE statement */ - if (!is_tmp_table) - form->s->tmp_table = NO_TMP_TABLE; - const char *old_dbstr = thd->db.str; - thd->db.str = NULL; - ulong old = create_info->used_fields; - create_info->used_fields &= ~HA_CREATE_USED_ENGINE; - - TABLE_LIST table_list; - memset(&table_list, 0, sizeof(table_list)); - table_list.table = form; - error_code = show_create_table(thd, &table_list, &create_table_stmt, - create_info, WITH_DB_NAME); - - if (!is_tmp_table) - form->s->tmp_table = saved_tmp_table_type; - create_info->used_fields = old; - thd->db.str = old_dbstr; - if (error_code) - return error_code; - - // To syncronize the schemas of MDB FE and CLX BE. - if (form->s && form->s->db.length) { - String createdb_stmt; - createdb_stmt.append("CREATE DATABASE IF NOT EXISTS `"); - createdb_stmt.append(form->s->db.str, form->s->db.length); - createdb_stmt.append("`"); - trx->run_query(createdb_stmt); - } - - error_code = trx->run_query(create_table_stmt); - return error_code; -} - -int ha_clustrixdb::delete_table(const char *path) -{ - int error_code; - THD *thd = ha_thd(); - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) - return error_code; - - char decoded_dbname[FN_REFLEN]; - char decoded_tbname[FN_REFLEN]; - decode_file_path(path, decoded_dbname, decoded_tbname); - - String delete_cmd; - delete_cmd.append("DROP TABLE `"); - delete_cmd.append(decoded_dbname); - delete_cmd.append("`.`"); - delete_cmd.append(decoded_tbname); - delete_cmd.append("`"); - - return trx->run_query(delete_cmd); -} - -int ha_clustrixdb::rename_table(const char* from, const char* to) -{ - int error_code; - THD *thd = ha_thd(); - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) - return error_code; - - char decoded_from_dbname[FN_REFLEN]; - char decoded_from_tbname[FN_REFLEN]; - decode_file_path(from, decoded_from_dbname, decoded_from_tbname); - - char decoded_to_dbname[FN_REFLEN]; - char decoded_to_tbname[FN_REFLEN]; - decode_file_path(to, decoded_to_dbname, decoded_to_tbname); - - String rename_cmd; - rename_cmd.append("RENAME TABLE `"); - rename_cmd.append(decoded_from_dbname); - rename_cmd.append("`.`"); - rename_cmd.append(decoded_from_tbname); - rename_cmd.append("` TO `"); - rename_cmd.append(decoded_to_dbname); - rename_cmd.append("`.`"); - rename_cmd.append(decoded_to_tbname); - rename_cmd.append("`;"); - - return trx->run_query(rename_cmd); -} - -static void -clustrixdb_mark_table_for_discovery(TABLE *table) -{ - table->s->tabledef_version.str = NULL; - table->s->tabledef_version.length = 0; - table->m_needs_reopen = TRUE; -} - -int ha_clustrixdb::open(const char *name, int mode, uint test_if_locked) -{ - DBUG_ENTER("ha_clustrixdb::open"); - DBUG_PRINT("oid", - ("%s", table->s->tabledef_version.str)); - - if (!table->s->tabledef_version.str) - DBUG_RETURN(HA_ERR_TABLE_DEF_CHANGED); - if (!clustrix_table_oid) - clustrix_table_oid = atoll((const char *)table->s->tabledef_version.str); - - // Surrogate key marker - has_hidden_key = table->s->primary_key == MAX_KEY; - if (has_hidden_key) { - ref_length = 8; - } else { - KEY* key_info = table->key_info + table->s->primary_key; - ref_length = key_info->key_length; - } - - DBUG_PRINT("open finished", - ("oid: %llu, ref_length: %u", clustrix_table_oid, ref_length)); - DBUG_RETURN(0); -} - -int ha_clustrixdb::close(void) -{ - return 0; -} - -int ha_clustrixdb::reset() -{ - upsert_flag &= ~CLUSTRIX_BULK_UPSERT; - upsert_flag &= ~CLUSTRIX_HAS_UPSERT; - upsert_flag &= ~CLUSTRIX_UPSERT_SENT; - clx_lock_type = CLUSTRIX_NO_LOCKS; - return 0; -} - -int ha_clustrixdb::extra(enum ha_extra_function operation) -{ - DBUG_ENTER("ha_clustrixdb::extra"); - if (operation == HA_EXTRA_INSERT_WITH_UPDATE) - upsert_flag |= CLUSTRIX_HAS_UPSERT; - DBUG_RETURN(0); -} - -/*@brief UPSERT State Machine*/ -/************************************************************* - * DESCRIPTION: - * Fasttrack for UPSERT sends queries down to a CLX backend. - * UPSERT could be of two kinds: singular and bulk. The plugin - * re-/sets CLUSTRIX_BULK_UPSERT in end|start_bulk_insert - * methods. CLUSTRIX_UPSERT_SENT is used to avoid multiple - * execution at CLX backend. - * Generic CLUSTRIX_HAS_UPSERT is set for bulk UPSERT only b/c - * MDB calls write_row only once. - ************************************************************/ -int ha_clustrixdb::write_row(const uchar *buf) -{ - int error_code = 0; - THD *thd = ha_thd(); - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) - return error_code; - - if (upsert_flag & CLUSTRIX_HAS_UPSERT) { - if (!(upsert_flag & CLUSTRIX_UPSERT_SENT)) { - ha_rows update_rows; - String update_stmt; - update_stmt.append(thd->query_string.str()); - - if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) - trx->auto_commit_next(); - - error_code= trx->update_query(update_stmt, table->s->db, &update_rows); - if (upsert_flag & CLUSTRIX_BULK_UPSERT) - upsert_flag |= CLUSTRIX_UPSERT_SENT; - else - upsert_flag &= ~CLUSTRIX_HAS_UPSERT; - } - - return error_code; - } - - /* Convert the row format to binlog (packed) format */ - uchar *packed_new_row = (uchar*) my_alloca(estimate_row_size(table)); - size_t packed_size = pack_row(table, table->write_set, packed_new_row, buf); - - /* XXX: Clustrix may needs to return HA_ERR_AUTOINC_ERANGE if we hit that - error. */ - ulonglong last_insert_id = 0; - if ((error_code = trx->write_row(clustrix_table_oid, - packed_new_row, packed_size, - &last_insert_id))) - goto err; - - if (table->next_number_field) - insert_id_for_cur_row = last_insert_id; - -err: - if (error_code == HA_ERR_TABLE_DEF_CHANGED) - clustrixdb_mark_table_for_discovery(table); - - if (packed_size) - my_afree(packed_new_row); - - return error_code; -} - -int ha_clustrixdb::update_row(const uchar *old_data, const uchar *new_data) -{ - DBUG_ENTER("ha_clustrixdb::update_row"); - int error_code; - THD *thd = ha_thd(); - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) - DBUG_RETURN(error_code); - - size_t row_size = estimate_row_size(table); - size_t packed_key_len; - uchar *packed_key = (uchar*) my_alloca(row_size); - build_key_packed_row(table->s->primary_key, old_data, - packed_key, &packed_key_len); - - uchar *packed_new_row = (uchar*) my_alloca(row_size); - size_t packed_new_size = pack_row(table, table->write_set, packed_new_row, - new_data); - - /* Send the packed rows to Clustrix */ - error_code = trx->key_update(clustrix_table_oid, packed_key, packed_key_len, - table->write_set, - packed_new_row, packed_new_size); - - if(packed_key) - my_afree(packed_key); - - if(packed_new_row) - my_afree(packed_new_row); - - if (error_code == HA_ERR_TABLE_DEF_CHANGED) - clustrixdb_mark_table_for_discovery(table); - - DBUG_RETURN(error_code); -} - -int ha_clustrixdb::direct_update_rows_init(List *update_fields) -{ - DBUG_ENTER("ha_clustrixdb::direct_update_rows_init"); - THD *thd= ha_thd(); - if (!THDVAR(thd, enable_direct_update)) - DBUG_RETURN(HA_ERR_WRONG_COMMAND); - DBUG_RETURN(0); -} - -int ha_clustrixdb::direct_update_rows(ha_rows *update_rows, ha_rows *found_rows) -{ - DBUG_ENTER("ha_clustrixdb::direct_update_rows"); - int error_code= 0; - THD *thd= ha_thd(); - clustrix_connection *trx= get_trx(thd, &error_code); - if (!trx) - return error_code; - - String update_stmt; - update_stmt.append(thd->query_string.str()); - - if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) - trx->auto_commit_next(); - - error_code = trx->update_query(update_stmt, table->s->db, update_rows); - *found_rows = *update_rows; - DBUG_RETURN(error_code); -} - -void ha_clustrixdb::start_bulk_insert(ha_rows rows, uint flags) -{ - DBUG_ENTER("ha_clustrixdb::start_bulk_insert"); - int error_code= 0; - THD *thd= ha_thd(); - clustrix_connection *trx= get_trx(thd, &error_code); - if (!trx) { - // TBD log this - DBUG_VOID_RETURN; - } - - upsert_flag |= CLUSTRIX_BULK_UPSERT; - - DBUG_VOID_RETURN; -} - -int ha_clustrixdb::end_bulk_insert() -{ - DBUG_ENTER("ha_clustrixdb::end_bulk_insert"); - upsert_flag &= ~CLUSTRIX_BULK_UPSERT; - upsert_flag &= ~CLUSTRIX_HAS_UPSERT; - upsert_flag &= ~CLUSTRIX_UPSERT_SENT; - DBUG_RETURN(0); -} - -int ha_clustrixdb::delete_row(const uchar *buf) -{ - int error_code; - THD *thd = ha_thd(); - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) - return error_code; - - // The estimate should consider only key fields widths. - size_t packed_key_len; - uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table)); - build_key_packed_row(table->s->primary_key, buf, packed_key, &packed_key_len); - - error_code = trx->key_delete(clustrix_table_oid, packed_key, packed_key_len); - - if (error_code == HA_ERR_TABLE_DEF_CHANGED) - clustrixdb_mark_table_for_discovery(table); - - if (packed_key) - my_afree(packed_key); - - return error_code; -} - -ha_clustrixdb::Table_flags ha_clustrixdb::table_flags(void) const -{ - Table_flags flags = HA_PARTIAL_COLUMN_READ | - HA_REC_NOT_IN_SEQ | - HA_FAST_KEY_READ | - HA_NULL_IN_KEY | - HA_CAN_INDEX_BLOBS | - HA_AUTO_PART_KEY | - HA_CAN_SQL_HANDLER | - HA_BINLOG_STMT_CAPABLE | - HA_CAN_TABLE_CONDITION_PUSHDOWN | - HA_CAN_DIRECT_UPDATE_AND_DELETE; - - return flags; -} - -ulong ha_clustrixdb::index_flags(uint idx, uint part, bool all_parts) const -{ - ulong flags = HA_READ_NEXT | - HA_READ_PREV | - HA_READ_ORDER | - HA_READ_RANGE; - - return flags; -} - -ha_rows ha_clustrixdb::records() -{ - return 10000; -} - -ha_rows ha_clustrixdb::records_in_range(uint inx, key_range *min_key, - key_range *max_key) -{ - return 2; -} - -int ha_clustrixdb::info(uint flag) -{ - //THD *thd = ha_thd(); - if (flag & HA_STATUS_TIME) - { - /* Retrieve the time of the most recent update to the table */ - // stats.update_time = - } - - if (flag & HA_STATUS_AUTO) - { - /* Retrieve the latest auto_increment value */ - stats.auto_increment_value = next_insert_id; - } - - if (flag & HA_STATUS_VARIABLE) - { - /* Retrieve variable info, such as row counts and file lengths */ - stats.records = records(); - stats.deleted = 0; - // stats.data_file_length = - // stats.index_file_length = - // stats.delete_length = - stats.check_time = 0; - // stats.mrr_length_per_rec = - - if (stats.records == 0) - stats.mean_rec_length = 0; - else - stats.mean_rec_length = (ulong) - (stats.data_file_length / stats.records); - } - - if (flag & HA_STATUS_CONST) - { - /* - Retrieve constant info, such as file names, max file lengths, - create time, block size - */ - // stats.max_data_file_length = - // stats.create_time = - // stats.block_size = - } - - return 0; -} - -int ha_clustrixdb::index_init(uint idx, bool sorted) -{ - int error_code = 0; - THD *thd = ha_thd(); - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) - return error_code; - - active_index = idx; - add_current_table_to_rpl_table_list(&rgi, thd, table); - scan_cur = NULL; - - /* Return all columns until there is a better understanding of - requirements. */ - if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false)) - return ER_OUTOFMEMORY; - bitmap_set_all(&scan_fields); - sorted_scan = sorted; - - return 0; -} - -int ha_clustrixdb::index_read(uchar * buf, const uchar * key, uint key_len, - enum ha_rkey_function find_flag) -{ - DBUG_ENTER("ha_clustrixdb::index_read"); - int error_code = 0; - THD *thd = ha_thd(); - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) - DBUG_RETURN(error_code); - - key_restore(buf, key, &table->key_info[active_index], key_len); - // The estimate should consider only key fields widths. - size_t packed_key_len; - uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table)); - build_key_packed_row(active_index, buf, packed_key, &packed_key_len); - - bool exact = false; - clustrix_connection::scan_type st; - switch (find_flag) { - case HA_READ_KEY_EXACT: - exact = true; - break; - case HA_READ_KEY_OR_NEXT: - st = clustrix_connection::READ_KEY_OR_NEXT; - break; - case HA_READ_KEY_OR_PREV: - st = clustrix_connection::READ_KEY_OR_PREV; - break; - case HA_READ_AFTER_KEY: - st = clustrix_connection::READ_AFTER_KEY; - break; - case HA_READ_BEFORE_KEY: - st = clustrix_connection::READ_BEFORE_KEY; - break; - case HA_READ_PREFIX: - case HA_READ_PREFIX_LAST: - case HA_READ_PREFIX_LAST_OR_PREV: - case HA_READ_MBR_CONTAIN: - case HA_READ_MBR_INTERSECT: - case HA_READ_MBR_WITHIN: - case HA_READ_MBR_DISJOINT: - case HA_READ_MBR_EQUAL: - DBUG_RETURN(ER_NOT_SUPPORTED_YET); - } - - uchar *rowdata = NULL; - if (exact) { - is_scan = false; - ulong rowdata_length; - error_code = trx->key_read(clustrix_table_oid, 0, clx_lock_type, - table->read_set, packed_key, packed_key_len, - &rowdata, &rowdata_length); - if (!error_code) - error_code = unpack_row_to_buf(rgi, table, buf, rowdata, - table->read_set, - rowdata + rowdata_length); - } else { - is_scan = true; - error_code = trx->scan_from_key(clustrix_table_oid, active_index, - clx_lock_type, st, -1, sorted_scan, - &scan_fields, packed_key, packed_key_len, - THDVAR(thd, row_buffer), &scan_cur); - if (!error_code) - error_code = rnd_next(buf); - } - - if (rowdata) - my_free(rowdata); - - if (packed_key) - my_afree(packed_key); - - if (error_code == HA_ERR_TABLE_DEF_CHANGED) - clustrixdb_mark_table_for_discovery(table); - - DBUG_RETURN(error_code); -} - -int ha_clustrixdb::index_first(uchar *buf) -{ - DBUG_ENTER("ha_clustrixdb::index_first"); - int error_code = 0; - THD *thd = ha_thd(); - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) - DBUG_RETURN(error_code); - - error_code = trx->scan_from_key(clustrix_table_oid, active_index, - clx_lock_type, - clustrix_connection::READ_FROM_START, - -1, sorted_scan, &scan_fields, NULL, 0, - THDVAR(thd, row_buffer), &scan_cur); - - if (error_code == HA_ERR_TABLE_DEF_CHANGED) - clustrixdb_mark_table_for_discovery(table); - - if (error_code) - DBUG_RETURN(error_code); - - DBUG_RETURN(rnd_next(buf)); -} - -int ha_clustrixdb::index_last(uchar *buf) -{ - DBUG_ENTER("ha_clustrixdb::index_last"); - int error_code = 0; - THD *thd = ha_thd(); - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) - DBUG_RETURN(error_code); - - error_code = trx->scan_from_key(clustrix_table_oid, active_index, - clx_lock_type, - clustrix_connection::READ_FROM_LAST, - -1, sorted_scan, &scan_fields, NULL, 0, - THDVAR(thd, row_buffer), &scan_cur); - - if (error_code == HA_ERR_TABLE_DEF_CHANGED) - clustrixdb_mark_table_for_discovery(table); - - if (error_code) - DBUG_RETURN(error_code); - - DBUG_RETURN(rnd_next(buf)); -} - -int ha_clustrixdb::index_next(uchar *buf) -{ - DBUG_ENTER("index_next"); - DBUG_RETURN(rnd_next(buf)); -} - -#if 0 -int ha_clustrixdb::index_next_same(uchar *buf, const uchar *key, uint keylen) -{ - DBUG_ENTER("index_next_same"); - DBUG_RETURN(rnd_next(buf)); -} -#endif - -int ha_clustrixdb::index_prev(uchar *buf) -{ - DBUG_ENTER("index_prev"); - DBUG_RETURN(rnd_next(buf)); -} - -int ha_clustrixdb::index_end() -{ - DBUG_ENTER("index_prev"); - if (scan_cur) - DBUG_RETURN(rnd_end()); - else - DBUG_RETURN(0); -} - -int ha_clustrixdb::rnd_init(bool scan) -{ - DBUG_ENTER("ha_clustrixdb::rnd_init"); - int error_code = 0; - THD *thd = ha_thd(); - if (thd->lex->sql_command == SQLCOM_UPDATE) - DBUG_RETURN(error_code); - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) - DBUG_RETURN(error_code); - - add_current_table_to_rpl_table_list(&rgi, thd, table); - is_scan = scan; - scan_cur = NULL; - - if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false)) - DBUG_RETURN(ER_OUTOFMEMORY); - -#if 0 - if (table->s->keys) - table->mark_columns_used_by_index(table->s->primary_key, &scan_fields); - else - bitmap_clear_all(&scan_fields); - - bitmap_union(&scan_fields, table->read_set); -#else - /* Why is read_set not setup correctly? */ - bitmap_set_all(&scan_fields); -#endif - - error_code = trx->scan_table(clustrix_table_oid, clx_lock_type, - &scan_fields, THDVAR(thd, row_buffer), - &scan_cur); - - if (error_code == HA_ERR_TABLE_DEF_CHANGED) - clustrixdb_mark_table_for_discovery(table); - - if (error_code) - DBUG_RETURN(error_code); - - DBUG_RETURN(0); -} - -int ha_clustrixdb::rnd_next(uchar *buf) -{ - int error_code = 0; - THD *thd = ha_thd(); - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) - return error_code; - - assert(is_scan); - assert(scan_cur); - - uchar *rowdata; - ulong rowdata_length; - if ((error_code = trx->scan_next(scan_cur, &rowdata, &rowdata_length))) - return error_code; - - if (has_hidden_key) { - last_hidden_key = *(ulonglong *)rowdata; - rowdata += 8; - rowdata_length -= 8; - } - - error_code = unpack_row_to_buf(rgi, table, buf, rowdata, &scan_fields, - rowdata + rowdata_length); - - if (error_code) - return error_code; - - return 0; -} - -int ha_clustrixdb::rnd_pos(uchar * buf, uchar *pos) -{ - DBUG_ENTER("clx_rnd_pos"); - DBUG_DUMP("pos", pos, ref_length); - - int error_code = 0; - THD *thd = ha_thd(); - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) - DBUG_RETURN(error_code); - - /* WDD: We need a way to convert key buffers directy to rbr buffers. */ - - if (has_hidden_key) { - memcpy(&last_hidden_key, pos, sizeof(ulonglong)); - } else { - uint keyno = table->s->primary_key; - uint len = calculate_key_len(table, keyno, pos, - table->const_key_parts[keyno]); - key_restore(buf, pos, &table->key_info[keyno], len); - } - - // The estimate should consider only key fields widths. - uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table)); - size_t packed_key_len; - build_key_packed_row(table->s->primary_key, buf, packed_key, &packed_key_len); - - uchar *rowdata = NULL; - ulong rowdata_length; - if ((error_code = trx->key_read(clustrix_table_oid, 0, clx_lock_type, - table->read_set, packed_key, packed_key_len, - &rowdata, &rowdata_length))) - goto err; - - if ((error_code = unpack_row_to_buf(rgi, table, buf, rowdata, table->read_set, - rowdata + rowdata_length))) - goto err; - -err: - if (rowdata) - my_free(rowdata); - - if (packed_key) - my_afree(packed_key); - - if (error_code == HA_ERR_TABLE_DEF_CHANGED) - clustrixdb_mark_table_for_discovery(table); - - DBUG_RETURN(error_code); -} - -int ha_clustrixdb::rnd_end() -{ - DBUG_ENTER("ha_clustrixdb::rnd_end()"); - int error_code = 0; - THD *thd = ha_thd(); - if (thd->lex->sql_command == SQLCOM_UPDATE) - DBUG_RETURN(error_code); - - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) - DBUG_RETURN(error_code); - - my_bitmap_free(&scan_fields); - if (scan_cur && (error_code = trx->scan_end(scan_cur))) - DBUG_RETURN(error_code); - scan_cur = NULL; - - DBUG_RETURN(0); -} - -void ha_clustrixdb::position(const uchar *record) -{ - DBUG_ENTER("clx_position"); - if (has_hidden_key) { - memcpy(ref, &last_hidden_key, sizeof(ulonglong)); - } else { - KEY* key_info = table->key_info + table->s->primary_key; - key_copy(ref, record, key_info, key_info->key_length); - } - DBUG_DUMP("key", ref, ref_length); - DBUG_VOID_RETURN; -} - -uint ha_clustrixdb::lock_count(void) const -{ - /* Hopefully, we don't need to use thread locks */ - return 0; -} - -THR_LOCK_DATA **ha_clustrixdb::store_lock(THD *thd, - THR_LOCK_DATA **to, - enum thr_lock_type lock_type) -{ - /* Hopefully, we don't need to use thread locks */ - return to; -} - -int ha_clustrixdb::external_lock(THD *thd, int lock_type) -{ - DBUG_ENTER("ha_clustrixdb::external_lock()"); - int error_code; - clustrix_connection *trx = get_trx(thd, &error_code); - if (error_code) - DBUG_RETURN(error_code); - - if (lock_type == F_WRLCK) - clx_lock_type = CLUSTRIX_EXCLUSIVE; - else if (lock_type == F_RDLCK) - clx_lock_type = CLUSTRIX_SHARED; - else if (lock_type == F_UNLCK) - clx_lock_type = CLUSTRIX_NO_LOCKS; - - if (lock_type != F_UNLCK) { - if (!trx->has_open_transaction()) { - error_code = trx->begin_transaction_next(); - if (error_code) - DBUG_RETURN(error_code); - } - - trans_register_ha(thd, FALSE, clustrixdb_hton); - if (thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) - trans_register_ha(thd, TRUE, clustrixdb_hton); - } - - DBUG_RETURN(error_code); -} - -/**************************************************************************** - Engine Condition Pushdown -****************************************************************************/ - -const COND *ha_clustrixdb::cond_push(const COND *cond) -{ - return cond; -} - -void ha_clustrixdb::cond_pop() -{ -} - -int ha_clustrixdb::info_push(uint info_type, void *info) -{ - return 0; -} - -/**************************************************************************** -** Row encoding functions -****************************************************************************/ - -void add_current_table_to_rpl_table_list(rpl_group_info **_rgi, THD *thd, - TABLE *table) -{ - if (*_rgi) - return; - - Relay_log_info *rli = new Relay_log_info(FALSE); - rli->sql_driver_thd = thd; - - rpl_group_info *rgi = new rpl_group_info(rli); - *_rgi = rgi; - rgi->thd = thd; - rgi->tables_to_lock_count = 0; - rgi->tables_to_lock = NULL; - if (rgi->tables_to_lock_count) - return; - - rgi->tables_to_lock = (RPL_TABLE_LIST *)my_malloc(sizeof(RPL_TABLE_LIST), - MYF(MY_WME)); - rgi->tables_to_lock->init_one_table(&table->s->db, &table->s->table_name, 0, - TL_READ); - rgi->tables_to_lock->table = table; - rgi->tables_to_lock->table_id = table->tablenr; - rgi->tables_to_lock->m_conv_table = NULL; - rgi->tables_to_lock->master_had_triggers = FALSE; - rgi->tables_to_lock->m_tabledef_valid = TRUE; - // We need one byte per column to save a column's binlog type. - uchar *col_type = (uchar*) my_alloca(table->s->fields); - for (uint i = 0 ; i < table->s->fields ; ++i) - col_type[i] = table->field[i]->binlog_type(); - - table_def *tabledef = &rgi->tables_to_lock->m_tabledef; - new (tabledef) table_def(col_type, table->s->fields, NULL, 0, NULL, 0); - rgi->tables_to_lock_count++; - if (col_type) - my_afree(col_type); -} - -void remove_current_table_from_rpl_table_list(rpl_group_info *rgi) -{ - if (!rgi->tables_to_lock) - return; - - rgi->tables_to_lock->m_tabledef.table_def::~table_def(); - rgi->tables_to_lock->m_tabledef_valid = FALSE; - my_free(rgi->tables_to_lock); - rgi->tables_to_lock_count--; - rgi->tables_to_lock = NULL; - delete rgi->rli; - delete rgi; -} - -void ha_clustrixdb::build_key_packed_row(uint index, const uchar *buf, - uchar *packed_key, - size_t *packed_key_len) -{ - if (index == table->s->primary_key && has_hidden_key) { - memcpy(packed_key, &last_hidden_key, sizeof(ulonglong)); - *packed_key_len = sizeof(ulonglong); - } else { - // make a row from the table - table->mark_columns_used_by_index(index, &table->tmp_set); - *packed_key_len = pack_row(table, &table->tmp_set, packed_key, buf); - } -} - -int unpack_row_to_buf(rpl_group_info *rgi, TABLE *table, uchar *data, - uchar const *const row_data, MY_BITMAP const *cols, - uchar const *const row_end) -{ - /* Since unpack_row can only write to record[0], if 'data' does not point to - table->record[0], we must back it up and then restore it afterwards. */ - uchar const *current_row_end; - ulong master_reclength; - uchar *backup_row = NULL; - if (data != table->record[0]) { - /* See Update_rows_log_event::do_exec_row(rpl_group_info *rgi) - and the definitions of store_record and restore_record. */ - backup_row = (uchar*) my_alloca(table->s->reclength); - memcpy(backup_row, table->record[0], table->s->reclength); - restore_record(table, record[data == table->record[1] ? 1 : 2]); - } - - int error_code = unpack_row(rgi, table, table->s->fields, row_data, cols, - ¤t_row_end, &master_reclength, row_end); - - if (backup_row) { - store_record(table, record[data == table->record[1] ? 1 : 2]); - memcpy(table->record[0], backup_row, table->s->reclength); - my_afree(backup_row); - } - - return error_code; -} - -/**************************************************************************** -** Plugin Functions -****************************************************************************/ - -static int clustrixdb_commit(handlerton *hton, THD *thd, bool all) -{ - clustrix_connection* trx = (clustrix_connection *) thd_get_ha_data(thd, hton); - assert(trx); - - int error_code = 0; - if (trx->has_open_transaction()) { - if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) - error_code = trx->commit_transaction(); - else - error_code = trx->new_statement_next(); - } - - return error_code; -} - -static int clustrixdb_rollback(handlerton *hton, THD *thd, bool all) -{ - clustrix_connection* trx = (clustrix_connection *) thd_get_ha_data(thd, hton); - assert(trx); - - int error_code = 0; - if (trx->has_open_transaction()) { - if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) - error_code = trx->rollback_transaction(); - else - error_code = trx->rollback_statement_next(); - } - - return error_code; -} - -static handler* clustrixdb_create_handler(handlerton *hton, TABLE_SHARE *table, - MEM_ROOT *mem_root) -{ - return new (mem_root) ha_clustrixdb(hton, table); -} - -static int clustrixdb_close_connection(handlerton* hton, THD* thd) -{ - clustrix_connection* trx = (clustrix_connection *) thd_get_ha_data(thd, hton); - if (!trx) - return 0; /* Transaction is not started */ - - int error_code = clustrixdb_rollback(clustrixdb_hton, thd, TRUE); - - delete trx; - - return error_code; -} - -static int clustrixdb_panic(handlerton *hton, ha_panic_function type) -{ - return 0; -} - -static bool clustrixdb_show_status(handlerton *hton, THD *thd, - stat_print_fn *stat_print, - enum ha_stat_type stat_type) -{ - return FALSE; -} - -static int clustrixdb_discover_table_names(handlerton *hton, LEX_CSTRING *db, - MY_DIR *dir, - handlerton::discovered_list *result) -{ - clustrix_connection *clustrix_net = new clustrix_connection(); - int error_code = clustrix_net->connect(); - if (error_code) - goto err; - - clustrix_net->populate_table_list(db, result); - -err: - delete clustrix_net; - return error_code; -} - -int clustrixdb_discover_table(handlerton *hton, THD *thd, TABLE_SHARE *share) -{ - clustrix_connection *clustrix_net = new clustrix_connection(); - int error_code = clustrix_net->connect(); - if (error_code) - goto err; - - error_code = clustrix_net->discover_table_details(&share->db, - &share->table_name, - thd, share); - -err: - delete clustrix_net; - return error_code; -} - -static int clustrixdb_init(void *p) -{ - DBUG_ENTER("clustrixdb_init"); - clustrixdb_hton = (handlerton *) p; - clustrixdb_hton->flags = HTON_NO_FLAGS; - clustrixdb_hton->panic = clustrixdb_panic; - clustrixdb_hton->close_connection = clustrixdb_close_connection; - clustrixdb_hton->commit = clustrixdb_commit; - clustrixdb_hton->rollback = clustrixdb_rollback; - clustrixdb_hton->create = clustrixdb_create_handler; - clustrixdb_hton->show_status = clustrixdb_show_status; - clustrixdb_hton->discover_table_names = clustrixdb_discover_table_names; - clustrixdb_hton->discover_table = clustrixdb_discover_table; - clustrixdb_hton->create_select = create_clustrixdb_select_handler; - clustrixdb_hton->create_derived = create_clustrixdb_derived_handler; - - update_host_list(clustrix_host); - - DBUG_RETURN(0); -} - -static int clustrixdb_deinit(void *p) -{ - DBUG_ENTER("clustrixdb_deinit"); - free_host_list(); - DBUG_RETURN(0); -} - -struct st_mysql_show_var clustrixdb_status_vars[] = -{ - {NullS, NullS, SHOW_LONG} -}; - -static struct st_mysql_sys_var* clustrixdb_system_variables[] = -{ - MYSQL_SYSVAR(connect_timeout), - MYSQL_SYSVAR(read_timeout), - MYSQL_SYSVAR(write_timeout), - MYSQL_SYSVAR(host), - MYSQL_SYSVAR(username), - MYSQL_SYSVAR(password), - MYSQL_SYSVAR(port), - MYSQL_SYSVAR(socket), - MYSQL_SYSVAR(row_buffer), - MYSQL_SYSVAR(select_handler), - MYSQL_SYSVAR(derived_handler), - MYSQL_SYSVAR(enable_direct_update), - NULL -}; - -static struct st_mysql_storage_engine clustrixdb_storage_engine = - {MYSQL_HANDLERTON_INTERFACE_VERSION}; - -maria_declare_plugin(clustrixdb) -{ - MYSQL_STORAGE_ENGINE_PLUGIN, /* Plugin Type */ - &clustrixdb_storage_engine, /* Plugin Descriptor */ - "CLUSTRIXDB", /* Plugin Name */ - "MariaDB", /* Plugin Author */ - "ClustrixDB storage engine", /* Plugin Description */ - PLUGIN_LICENSE_GPL, /* Plugin Licence */ - clustrixdb_init, /* Plugin Entry Point */ - clustrixdb_deinit, /* Plugin Deinitializer */ - 0x0001, /* Hex Version Number (0.1) */ - NULL /* clustrixdb_status_vars */, /* Status Variables */ - clustrixdb_system_variables, /* System Variables */ - "0.1", /* String Version */ - MariaDB_PLUGIN_MATURITY_EXPERIMENTAL /* Maturity Level */ -} -maria_declare_plugin_end; diff --git a/storage/clustrixdb/ha_clustrixdb.h b/storage/clustrixdb/ha_clustrixdb.h deleted file mode 100644 index 461b17e4438..00000000000 --- a/storage/clustrixdb/ha_clustrixdb.h +++ /dev/null @@ -1,130 +0,0 @@ -/***************************************************************************** -Copyright (c) 2019, MariaDB Corporation. -*****************************************************************************/ - -#ifndef _ha_clustrixdb_h -#define _ha_clustrixdb_h - -#ifdef USE_PRAGMA_INTERFACE -#pragma interface /* gcc class implementation */ -#endif - -#define MYSQL_SERVER 1 -#include "clustrix_connection.h" -#include "my_bitmap.h" -#include "table.h" -#include "rpl_rli.h" -#include "handler.h" -#include "sql_class.h" -#include "sql_show.h" -#include "mysql.h" -#include "../../sql/rpl_record.h" - -size_t estimate_row_size(TABLE *table); -clustrix_connection *get_trx(THD *thd, int *error_code); -bool get_enable_sh(THD* thd); -void add_current_table_to_rpl_table_list(rpl_group_info **_rgi, THD *thd, - TABLE *table); -void remove_current_table_from_rpl_table_list(rpl_group_info *rgi); -int unpack_row_to_buf(rpl_group_info *rgi, TABLE *table, uchar *data, - uchar const *const row_data, MY_BITMAP const *cols, - uchar const *const row_end); - -class ha_clustrixdb : public handler -{ -private: - ulonglong clustrix_table_oid; - rpl_group_info *rgi; - - Field *auto_inc_field; - ulonglong auto_inc_value; - - bool has_hidden_key; - ulonglong last_hidden_key; - clustrix_connection_cursor *scan_cur; - bool is_scan; - MY_BITMAP scan_fields; - bool sorted_scan; - clustrix_lock_mode_t clx_lock_type; - - uint last_dup_errkey; - - typedef enum clustrix_upsert_flags { - CLUSTRIX_HAS_UPSERT= 1, - CLUSTRIX_BULK_UPSERT= 2, - CLUSTRIX_UPSERT_SENT= 4 - } clx_upsert_flags_t; - int upsert_flag; - -public: - ha_clustrixdb(handlerton *hton, TABLE_SHARE *table_arg); - ~ha_clustrixdb(); - int create(const char *name, TABLE *form, HA_CREATE_INFO *info); - int delete_table(const char *name); - int rename_table(const char* from, const char* to); - int open(const char *name, int mode, uint test_if_locked); - int close(void); - int reset(); - int extra(enum ha_extra_function operation); - int write_row(const uchar *buf); - // start_bulk_update exec_bulk_update - int update_row(const uchar *old_data, const uchar *new_data); - // start_bulk_delete exec_bulk_delete - int delete_row(const uchar *buf); - int direct_update_rows_init(List *update_fields); - int direct_update_rows(ha_rows *update_rows, ha_rows *found_rows); - void start_bulk_insert(ha_rows rows, uint flags = 0); - int end_bulk_insert(); - - Table_flags table_flags(void) const; - ulong index_flags(uint idx, uint part, bool all_parts) const; - uint max_supported_keys() const { return MAX_KEY; } - - ha_rows records(); - ha_rows records_in_range(uint inx, key_range *min_key, - key_range *max_key); - - int info(uint flag); // see my_base.h for full description - - // multi_read_range - // read_range - int index_init(uint idx, bool sorted); - int index_read(uchar * buf, const uchar * key, uint key_len, - enum ha_rkey_function find_flag); - int index_first(uchar *buf); - int index_prev(uchar *buf); - int index_last(uchar *buf); - int index_next(uchar *buf); - //int index_next_same(uchar *buf, const uchar *key, uint keylen); - int index_end(); - - int rnd_init(bool scan); - int rnd_next(uchar *buf); - int rnd_pos(uchar * buf, uchar *pos); - int rnd_end(); - - void position(const uchar *record); - uint lock_count(void) const; - THR_LOCK_DATA **store_lock(THD *thd, - THR_LOCK_DATA **to, - enum thr_lock_type lock_type); - int external_lock(THD *thd, int lock_type); - - uint8 table_cache_type() - { - return(HA_CACHE_TBL_NOCACHE); - } - - const COND *cond_push(const COND *cond); - void cond_pop(); - int info_push(uint info_type, void *info); - -private: - void build_key_packed_row(uint index, const uchar *buf, - uchar *packed_key, size_t *packed_key_len); -}; - -bool select_handler_setting(THD* thd); -bool derived_handler_setting(THD* thd); -uint row_buffer_setting(THD* thd); -#endif // _ha_clustrixdb_h diff --git a/storage/clustrixdb/ha_clustrixdb_pushdown.cc b/storage/clustrixdb/ha_clustrixdb_pushdown.cc deleted file mode 100644 index 58cf01cce44..00000000000 --- a/storage/clustrixdb/ha_clustrixdb_pushdown.cc +++ /dev/null @@ -1,478 +0,0 @@ -/***************************************************************************** -Copyright (c) 2019, MariaDB Corporation. -*****************************************************************************/ - -#include "ha_clustrixdb.h" -#include "ha_clustrixdb_pushdown.h" - -extern handlerton *clustrixdb_hton; -extern uint clustrix_row_buffer; - -/*@brief Fills up array data types, metadata and nullability*/ -/************************************************************ - * DESCRIPTION: - * Fills up three arrays with: field binlog data types, field - * metadata and nullability bitmask as in Table_map_log_event - * ctor. Internally creates a temporary table as does - * Pushdown_select. DH uses the actual temp table w/o - * b/c create_DH is called later compared to create_SH. - * More details in server/sql/log_event_server.cc - * PARAMETERS: - * thd - THD* - * table__ - TABLE* temp table for the results - * sl - SELECT_LEX* - * fieldtype - uchar* - * field_metadata - uchar* - * null_bits - uchar* - * num_null_bytes - null bit size - * fields_count - a number of fields - * RETURN: - * metadata_size int or -1 in case of error - ************************************************************/ -int get_field_types(THD *thd, TABLE *table__, SELECT_LEX *sl, uchar *fieldtype, - uchar *field_metadata, uchar *null_bits, const int num_null_bytes, const uint fields_count) -{ - int field_metadata_size = 0; - int metadata_index = 0; - TABLE *tmp_table= table__; - - if (!tmp_table) { - // Construct a tmp table with fields to find out result DTs. - // This should be reconsidered if it worths the effort. - List types; - TMP_TABLE_PARAM tmp_table_param; - sl->master_unit()->join_union_item_types(thd, types, 1); - tmp_table_param.init(); - tmp_table_param.field_count= types.elements; - - tmp_table = create_tmp_table(thd, &tmp_table_param, types, - (ORDER *) 0, false, 0, - TMP_TABLE_ALL_COLUMNS, 1, - &empty_clex_str, true, false); - if (!tmp_table) { - field_metadata_size = -1; - goto err; - } - } - - for (unsigned int i = 0 ; i < fields_count; ++i) { - fieldtype[i]= tmp_table->field[i]->binlog_type(); - } - - bzero(field_metadata, (fields_count * 2)); - for (unsigned int i= 0 ; i < fields_count ; i++) - { - Binlog_type_info bti= tmp_table->field[i]->binlog_type_info(); - uchar *ptr = reinterpret_cast(&bti.m_metadata); - memcpy(&field_metadata[metadata_index], ptr, bti.m_metadata_size); - metadata_index+= bti.m_metadata_size; - } - - if (metadata_index < 251) - field_metadata_size += metadata_index + 1; - else - field_metadata_size += metadata_index + 3; - - bzero(null_bits, num_null_bytes); - for (unsigned int i= 0 ; i < fields_count ; ++i) { - if (tmp_table->field[i]->maybe_null()) { - null_bits[(i / 8)]+= 1 << (i % 8); - } - } - - if (!table__) - free_tmp_table(thd, tmp_table); -err: - return field_metadata_size; -} - - -/*@brief create_clustrixdb_select_handler- Creates handler*/ -/************************************************************ - * DESCRIPTION: - * Creates a select handler - * More details in server/sql/select_handler.h - * PARAMETERS: - * thd - THD pointer. - * sel - SELECT_LEX* that describes the query. - * RETURN: - * select_handler if possible - * NULL otherwise - ************************************************************/ -select_handler* -create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex) -{ - ha_clustrixdb_select_handler *sh = NULL; - if (!select_handler_setting(thd)) { - return sh; - } - - // TODO Return early for EXPLAIN before we run the actual scan. - // We can send compile request when we separate compilation - // and execution. - clustrix_connection_cursor *scan = NULL; - if (thd->lex->describe) { - sh = new ha_clustrixdb_select_handler(thd, select_lex, scan); - return sh; - } - - // Multi-update runs an implicit query to collect constraints. - // SH couldn't be used for this. - if (thd->lex->sql_command == SQLCOM_UPDATE_MULTI) { - return sh; - } - - String query; - // Print the query into a string provided - select_lex->print(thd, &query, QT_ORDINARY); - int error_code = 0; - int field_metadata_size = 0; - clustrix_connection *trx = NULL; - - // We presume this number is equal to types.elements in get_field_types - uint items_number = select_lex->get_item_list()->elements; - uint num_null_bytes = (items_number + 7) / 8; - uchar *fieldtype = NULL; - uchar *null_bits = NULL; - uchar *field_metadata = NULL; - uchar *meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME), &fieldtype, items_number, - &null_bits, num_null_bytes, &field_metadata, (items_number * 2), NULL); - - if (!meta_memory) { - // The only way to say something here is to raise warning - // b/c we will fallback to other access methods: derived handler or rowstore. - goto err; - } - - if((field_metadata_size = - get_field_types(thd, NULL, select_lex, fieldtype, field_metadata, null_bits, num_null_bytes, items_number)) < 0) { - goto err; - } - - trx = get_trx(thd, &error_code); - if (!trx) - goto err; - - if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) - trx->auto_commit_next(); - - if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits, - num_null_bytes, field_metadata, - field_metadata_size, - row_buffer_setting(thd), &scan))) { - goto err; - } - - sh = new ha_clustrixdb_select_handler(thd, select_lex, scan); - -err: - // deallocate buffers - if (meta_memory) - my_free(meta_memory); - - return sh; -} - -/*********************************************************** - * DESCRIPTION: - * select_handler constructor - * PARAMETERS: - * thd - THD pointer. - * select_lex - sematic tree for the query. - **********************************************************/ -ha_clustrixdb_select_handler::ha_clustrixdb_select_handler( - THD *thd, - SELECT_LEX* select_lex, - clustrix_connection_cursor *scan_) - : select_handler(thd, clustrixdb_hton) -{ - thd__ = thd; - scan = scan_; - select = select_lex; - rgi = NULL; -} - -/*********************************************************** - * DESCRIPTION: - * select_handler constructor - * This frees dynamic memory allocated for bitmap - * and disables replication to SH temp table. - **********************************************************/ -ha_clustrixdb_select_handler::~ha_clustrixdb_select_handler() -{ - int error_code; - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) { - // TBD Log this - } - if (trx && scan) - trx->scan_end(scan); - - // If the ::init_scan has been executed - if (table__) - my_bitmap_free(&scan_fields); - - if (rgi) - remove_current_table_from_rpl_table_list(rgi); -} - -/*@brief Initiate the query for select_handler */ -/*********************************************************** - * DESCRIPTION: - * Initializes dynamic structures and sets SH temp table - * as RBR replication destination to unpack rows. - * * PARAMETERS: - * RETURN: - * rc as int - * ********************************************************/ -int ha_clustrixdb_select_handler::init_scan() -{ - // Save this into the base handler class attribute - table__ = table; - // need this bitmap future in next_row() - if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false)) - return ER_OUTOFMEMORY; - bitmap_set_all(&scan_fields); - - add_current_table_to_rpl_table_list(&rgi, thd__, table__); - - return 0; -} - -/*@brief Fetch next row for select_handler */ -/*********************************************************** - * DESCRIPTION: - * Fetch next row for select_handler. - * PARAMETERS: - * RETURN: - * rc as int - * ********************************************************/ -int ha_clustrixdb_select_handler::next_row() -{ - int error_code = 0; - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) - return error_code; - - assert(scan); - - uchar *rowdata; - ulong rowdata_length; - if ((error_code = trx->scan_next(scan, &rowdata, &rowdata_length))) - return error_code; - - uchar const *current_row_end; - ulong master_reclength; - - error_code = unpack_row(rgi, table, table->s->fields, rowdata, - &scan_fields, ¤t_row_end, - &master_reclength, rowdata + rowdata_length); - - if (error_code) - return error_code; - - return 0; -} - -/*@brief Finishes the scan and clean it up */ -/*********************************************************** - * DESCRIPTION: - * Finishes the scan for select handler - * PARAMETERS: - * RETURN: - * rc as int - ***********************************************************/ -int ha_clustrixdb_select_handler::end_scan() -{ - return 0; -} - -/*@brief create_clustrixdb_derived_handler- Creates handler*/ -/************************************************************ - * DESCRIPTION: - * Creates a derived handler - * More details in server/sql/derived_handler.h - * PARAMETERS: - * thd - THD pointer. - * derived - TABLE_LIST* that describes the tables involved - * RETURN: - * derived_handler if possible - * NULL otherwise - ************************************************************/ -derived_handler* -create_clustrixdb_derived_handler(THD* thd, TABLE_LIST *derived) -{ - ha_clustrixdb_derived_handler *dh = NULL; - if (!derived_handler_setting(thd)) { - return dh; - } - - SELECT_LEX_UNIT *unit= derived->derived; - SELECT_LEX *select_lex = unit->first_select(); - String query; - - dh = new ha_clustrixdb_derived_handler(thd, select_lex, NULL); - - return dh; -} - -/*********************************************************** - * DESCRIPTION: - * derived_handler constructor - * PARAMETERS: - * thd - THD pointer. - * select_lex - sematic tree for the query. - **********************************************************/ -ha_clustrixdb_derived_handler::ha_clustrixdb_derived_handler( - THD *thd, - SELECT_LEX* select_lex, - clustrix_connection_cursor *scan_) - : derived_handler(thd, clustrixdb_hton) -{ - thd__ = thd; - scan = scan_; - select = select_lex; - rgi = NULL; -} - -/*********************************************************** - * DESCRIPTION: - * derived_handler constructor - * This frees dynamic memory allocated for bitmap - * and disables replication to SH temp table. - **********************************************************/ -ha_clustrixdb_derived_handler::~ha_clustrixdb_derived_handler() -{ - int error_code; - - - - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) { - // TBD Log this. - } - if (trx && scan) - trx->scan_end(scan); - - // If the ::init_scan has been executed - if (table__) - my_bitmap_free(&scan_fields); - - if (rgi) - remove_current_table_from_rpl_table_list(rgi); -} - -/*@brief Initiate the query for derived_handler */ -/*********************************************************** - * DESCRIPTION: - * Initializes dynamic structures and sets SH temp table - * as RBR replication destination to unpack rows. - * * PARAMETERS: - * RETURN: - * rc as int - * ********************************************************/ -int ha_clustrixdb_derived_handler::init_scan() -{ - String query; - // Print the query into a string provided - select->print(thd__, &query, QT_ORDINARY); - int error_code = 0; - int field_metadata_size = 0; - clustrix_connection *trx = NULL; - - // We presume this number is equal to types.elements in get_field_types - uint items_number= select->get_item_list()->elements; - uint num_null_bytes = (items_number + 7) / 8; - uchar *fieldtype = NULL; - uchar *null_bits = NULL; - uchar *field_metadata = NULL; - uchar *meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME), &fieldtype, items_number, - &null_bits, num_null_bytes, &field_metadata, (items_number * 2), NULL); - - if (!meta_memory) { - // The only way to say something here is to raise warning - // b/c we will fallback to other access methods: derived handler or rowstore. - goto err; - } - - if((field_metadata_size= - get_field_types(thd__, table, select, fieldtype, field_metadata, null_bits, num_null_bytes, items_number)) < 0) { - goto err; - } - - trx = get_trx(thd__, &error_code); - if (!trx) - goto err; - - if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits, - num_null_bytes, field_metadata, - field_metadata_size, - row_buffer_setting(thd), &scan))) { - goto err; - } - - // Save this into the base handler class attribute - table__ = table; - - // need this bitmap future in next_row() - if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false)) - return ER_OUTOFMEMORY; - bitmap_set_all(&scan_fields); - - add_current_table_to_rpl_table_list(&rgi, thd__, table__); - -err: - // deallocate buffers - if (meta_memory) - my_free(meta_memory); - - return error_code; -} - -/*@brief Fetch next row for derived_handler */ -/*********************************************************** - * DESCRIPTION: - * Fetch next row for derived_handler. - * PARAMETERS: - * RETURN: - * rc as int - * ********************************************************/ -int ha_clustrixdb_derived_handler::next_row() -{ - int error_code = 0; - clustrix_connection *trx = get_trx(thd, &error_code); - if (!trx) - return error_code; - - assert(scan); - - uchar *rowdata; - ulong rowdata_length; - if ((error_code = trx->scan_next(scan, &rowdata, &rowdata_length))) - return error_code; - - uchar const *current_row_end; - ulong master_reclength; - - error_code = unpack_row(rgi, table, table->s->fields, rowdata, - &scan_fields, ¤t_row_end, - &master_reclength, rowdata + rowdata_length); - - if (error_code) - return error_code; - - return 0; -} - -/*@brief Finishes the scan and clean it up */ -/*********************************************************** - * DESCRIPTION: - * Finishes the scan for derived handler - * PARAMETERS: - * RETURN: - * rc as int - ***********************************************************/ -int ha_clustrixdb_derived_handler::end_scan() -{ - return 0; -} diff --git a/storage/clustrixdb/ha_clustrixdb_pushdown.h b/storage/clustrixdb/ha_clustrixdb_pushdown.h deleted file mode 100644 index 2f08bd427b0..00000000000 --- a/storage/clustrixdb/ha_clustrixdb_pushdown.h +++ /dev/null @@ -1,87 +0,0 @@ -/***************************************************************************** -Copyright (c) 2019, MariaDB Corporation. -*****************************************************************************/ -#ifndef _ha_clustrixdb_pushdown_h -#define _ha_clustrixdb_pushdown_h - -#include "select_handler.h" -#include "derived_handler.h" -#include "sql_select.h" - -/*@brief base_handler class*/ -/*********************************************************** - * DESCRIPTION: - * To be described - ************************************************************/ -class ha_clustrixdb_base_handler -{ - // To simulate abstract class - protected: - ha_clustrixdb_base_handler(): thd__(0),table__(0) {} - ~ha_clustrixdb_base_handler() {} - - // Copies of pushdown handlers attributes - // to use them in shared methods. - THD *thd__; - TABLE *table__; - // The bitmap used to sent - MY_BITMAP scan_fields; - // Structures to unpack RBR rows from CLX BE - rpl_group_info *rgi; - // CLX BE scan operation reference - clustrix_connection_cursor *scan; -}; - -/*@brief select_handler class*/ -/*********************************************************** - * DESCRIPTION: - * select_handler API methods. Could be used by the server - * tp pushdown the whole query described by SELECT_LEX. - * More details in server/sql/select_handler.h - * sel semantic tree for the query in SELECT_LEX. - ************************************************************/ -class ha_clustrixdb_select_handler: - private ha_clustrixdb_base_handler, - public select_handler -{ - public: - ha_clustrixdb_select_handler(THD* thd_arg, SELECT_LEX* sel, - clustrix_connection_cursor *scan); - ~ha_clustrixdb_select_handler(); - - int init_scan(); - int next_row(); - int end_scan(); - void print_error(int, unsigned long) {} -}; - -/*@brief derived_handler class*/ -/*********************************************************** - * DESCRIPTION: - * derived_handler API methods. Could be used by the server - * tp pushdown the whole query described by SELECT_LEX. - * More details in server/sql/derived_handler.h - * sel semantic tree for the query in SELECT_LEX. - ************************************************************/ -class ha_clustrixdb_derived_handler: - private ha_clustrixdb_base_handler, - public derived_handler -{ - public: - ha_clustrixdb_derived_handler(THD* thd_arg, SELECT_LEX* sel, - clustrix_connection_cursor *scan); - ~ha_clustrixdb_derived_handler(); - - int init_scan(); - int next_row(); - int end_scan(); - void print_error(int, unsigned long) {} -}; - - -select_handler *create_clustrixdb_select_handler(THD* thd, - SELECT_LEX* select_lex); -derived_handler *create_clustrixdb_derived_handler(THD* thd, - TABLE_LIST *derived); - -#endif diff --git a/storage/xpand/CMakeLists.txt b/storage/xpand/CMakeLists.txt new file mode 100644 index 00000000000..78fe16c0525 --- /dev/null +++ b/storage/xpand/CMakeLists.txt @@ -0,0 +1,24 @@ +#***************************************************************************** +# Copyright (c) 2019, MariaDB Corporation. +#****************************************************************************/ + +IF(MSVC) + # Temporarily disable "conversion from size_t .." + IF(CMAKE_SIZEOF_VOID_P EQUAL 8) + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4267") + ENDIF() +ENDIF() + +SET(XPAND_PLUGIN_STATIC "xpand") +SET(XPAND_PLUGIN_DYNAMIC "ha_xpand") +SET(XPAND_SOURCES ha_xpand.cc xpand_connection.cc ha_xpand_pushdown.cc) +MYSQL_ADD_PLUGIN(xpand ${XPAND_SOURCES} STORAGE_ENGINE) + +IF(MSVC) + IF (CMAKE_BUILD_TYPE STREQUAL "Debug") + ADD_CUSTOM_COMMAND(TARGET xpand + POST_BUILD + COMMAND if not exist ..\\..\\sql\\lib mkdir ..\\..\\sql\\lib\\plugin + COMMAND copy Debug\\ha_xpand.dll ..\\..\\sql\\lib\\plugin\\ha_xpand.dll) + ENDIF() +ENDIF() diff --git a/storage/xpand/ha_xpand.cc b/storage/xpand/ha_xpand.cc new file mode 100644 index 00000000000..8a8366fa9f7 --- /dev/null +++ b/storage/xpand/ha_xpand.cc @@ -0,0 +1,1372 @@ +/***************************************************************************** +Copyright (c) 2019, MariaDB Corporation. +*****************************************************************************/ + +/** @file ha_xpand.cc */ + +#include "ha_xpand.h" +#include "ha_xpand_pushdown.h" +#include "key.h" + +handlerton *xpand_hton = NULL; + +int xpand_connect_timeout; +static MYSQL_SYSVAR_INT +( + connect_timeout, + xpand_connect_timeout, + PLUGIN_VAR_OPCMDARG, + "Timeout for connecting to Xpand", + NULL, NULL, -1, -1, 2147483647, 0 +); + +int xpand_read_timeout; +static MYSQL_SYSVAR_INT +( + read_timeout, + xpand_read_timeout, + PLUGIN_VAR_OPCMDARG, + "Timeout for receiving data from Xpand", + NULL, NULL, -1, -1, 2147483647, 0 +); + +int xpand_write_timeout; +static MYSQL_SYSVAR_INT +( + write_timeout, + xpand_write_timeout, + PLUGIN_VAR_OPCMDARG, + "Timeout for sending data to Xpand", + NULL, NULL, -1, -1, 2147483647, 0 +); + +char *xpand_host; +static MYSQL_SYSVAR_STR +( + host, + xpand_host, + PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, + "Xpand host", + NULL, NULL, "127.0.0.1" +); + +int host_list_cnt; +char **host_list; + +static void free_host_list() +{ + if (host_list) { + for (int i = 0; host_list[i]; i++) + my_free(host_list[i]); + my_free(host_list); + host_list = NULL; + } +} + +static void update_host_list(char *xpand_host) +{ + free_host_list(); + + int cnt = 0; + for (char *p = xpand_host, *s = xpand_host; ; p++) { + if (*p == ',' || *p == '\0') { + if (p > s) { + cnt++; + } + if (!*p) + break; + s = p + 1; + } + } + + DBUG_PRINT("host_cnt", ("%d", cnt)); + host_list = (char **)my_malloc(sizeof(char *) * cnt+1, MYF(MY_WME)); + host_list[cnt] = 0; + host_list_cnt = cnt; + + int i = 0; + for (char *p = xpand_host, *s = xpand_host; ; p++) { + if (*p == ',' || *p == '\0') { + if (p > s) { + char *host = (char *)my_malloc(p - s + 1, MYF(MY_WME)); + host[p-s] = '\0'; + memcpy(host, s, p-s); + DBUG_PRINT("host", ("%s", host)); + host_list[i++] = host; + } + if (!*p) + break; + s = p + 1; + } + } + + DBUG_PRINT("xpand_host", ("%s", xpand_host)); +} + +char *xpand_username; +static MYSQL_SYSVAR_STR +( + username, + xpand_username, + PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, + "Xpand user name", + NULL, NULL, "root" +); + +char *xpand_password; +static MYSQL_SYSVAR_STR +( + password, + xpand_password, + PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, + "Xpand password", + NULL, NULL, "" +); + +uint xpand_port; +static MYSQL_SYSVAR_UINT +( + port, + xpand_port, + PLUGIN_VAR_RQCMDARG, + "Xpand port", + NULL, NULL, MYSQL_PORT_DEFAULT, MYSQL_PORT_DEFAULT, 65535, 0 +); + +char *xpand_socket; +static MYSQL_SYSVAR_STR +( + socket, + xpand_socket, + PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, + "Xpand socket", + NULL, NULL, "" +); + +static MYSQL_THDVAR_UINT +( + row_buffer, + PLUGIN_VAR_RQCMDARG, + "Xpand rowstore row buffer size", + NULL, NULL, 20, 1, 65535, 0 +); + +// Per thread select handler knob +static MYSQL_THDVAR_BOOL( + select_handler, + PLUGIN_VAR_NOCMDARG, + "", + NULL, + NULL, + 1 +); + +// Per thread derived handler knob +static MYSQL_THDVAR_BOOL( + derived_handler, + PLUGIN_VAR_NOCMDARG, + "", + NULL, + NULL, + 1 +); + +static MYSQL_THDVAR_BOOL( + enable_direct_update, + PLUGIN_VAR_NOCMDARG, + "", + NULL, + NULL, + 1 +); + +bool select_handler_setting(THD* thd) +{ + return ( thd == NULL ) ? false : THDVAR(thd, select_handler); +} + +bool derived_handler_setting(THD* thd) +{ + return ( thd == NULL ) ? false : THDVAR(thd, derived_handler); +} + +uint row_buffer_setting(THD* thd) +{ + return THDVAR(thd, row_buffer); +} + +/**************************************************************************** +** Utility functions +****************************************************************************/ +// This is a wastefull aproach but better then fixed sized buffer. +size_t estimate_row_size(TABLE *table) +{ + size_t row_size = 0; + size_t null_byte_count = (bitmap_bits_set(table->write_set) + 7) / 8; + row_size += null_byte_count; + Field **p_field= table->field, *field; + for ( ; (field= *p_field) ; p_field++) { + row_size += field->max_data_length(); + } + return row_size; +} + +/** + * @brief + * Decodes object name. + * + * @details + * Replaces the encoded object name in the path with a decoded variant, + * e.g if path contains ./test/d@0024. This f() makes it ./test/d$ + * + * Used in delete and rename DDL processing. + **/ +static void decode_objectname(char *buf, const char *path, size_t buf_size) +{ + size_t new_path_len = filename_to_tablename(path, buf, buf_size); + buf[new_path_len] = '\0'; +} + +static void decode_file_path(const char *path, char *decoded_dbname, + char *decoded_tbname) +{ + // The format cont ains './' in the beginning of a path. + char *dbname_start = (char*) path + 2; + char *dbname_end = dbname_start; + while (*dbname_end != '/') + dbname_end++; + + int cnt = dbname_end - dbname_start; + char *dbname = (char *)my_alloca(cnt + 1); + memcpy(dbname, dbname_start, cnt); + dbname[cnt] = '\0'; + decode_objectname(decoded_dbname, dbname, FN_REFLEN); + my_afree(dbname); + + char *tbname_start = dbname_end + 1; + decode_objectname(decoded_tbname, tbname_start, FN_REFLEN); +} + +xpand_connection *get_trx(THD *thd, int *error_code) +{ + *error_code = 0; + xpand_connection *trx; + if (!(trx = (xpand_connection *)thd_get_ha_data(thd, xpand_hton))) + { + if (!(trx = new xpand_connection())) { + *error_code = HA_ERR_OUT_OF_MEM; + return NULL; + } + + *error_code = trx->connect(); + if (*error_code) { + delete trx; + return NULL; + } + + thd_set_ha_data(thd, xpand_hton, trx); + } + + return trx; +} +/**************************************************************************** +** Class ha_xpand +****************************************************************************/ + +ha_xpand::ha_xpand(handlerton *hton, TABLE_SHARE *table_arg) + : handler(hton, table_arg) +{ + DBUG_ENTER("ha_xpand::ha_xpand"); + rgi = NULL; + scan_cur = NULL; + xpand_table_oid = 0; + upsert_flag = 0; + DBUG_VOID_RETURN; +} + +ha_xpand::~ha_xpand() +{ + if (rgi) + remove_current_table_from_rpl_table_list(rgi); +} + +int ha_xpand::create(const char *name, TABLE *form, HA_CREATE_INFO *info) +{ + int error_code; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + enum tmp_table_type saved_tmp_table_type = form->s->tmp_table; + Table_specification_st *create_info = &thd->lex->create_info; + const bool is_tmp_table = info->options & HA_LEX_CREATE_TMP_TABLE; + String create_table_stmt; + + /* Create a copy of the CREATE TABLE statement */ + if (!is_tmp_table) + form->s->tmp_table = NO_TMP_TABLE; + const char *old_dbstr = thd->db.str; + thd->db.str = NULL; + ulong old = create_info->used_fields; + create_info->used_fields &= ~HA_CREATE_USED_ENGINE; + + TABLE_LIST table_list; + memset(&table_list, 0, sizeof(table_list)); + table_list.table = form; + error_code = show_create_table(thd, &table_list, &create_table_stmt, + create_info, WITH_DB_NAME); + + if (!is_tmp_table) + form->s->tmp_table = saved_tmp_table_type; + create_info->used_fields = old; + thd->db.str = old_dbstr; + if (error_code) + return error_code; + + // To syncronize the schemas of MDB FE and XPD BE. + if (form->s && form->s->db.length) { + String createdb_stmt; + createdb_stmt.append("CREATE DATABASE IF NOT EXISTS `"); + createdb_stmt.append(form->s->db.str, form->s->db.length); + createdb_stmt.append("`"); + trx->run_query(createdb_stmt); + } + + error_code = trx->run_query(create_table_stmt); + return error_code; +} + +int ha_xpand::delete_table(const char *path) +{ + int error_code; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + char decoded_dbname[FN_REFLEN]; + char decoded_tbname[FN_REFLEN]; + decode_file_path(path, decoded_dbname, decoded_tbname); + + String delete_cmd; + delete_cmd.append("DROP TABLE `"); + delete_cmd.append(decoded_dbname); + delete_cmd.append("`.`"); + delete_cmd.append(decoded_tbname); + delete_cmd.append("`"); + + return trx->run_query(delete_cmd); +} + +int ha_xpand::rename_table(const char* from, const char* to) +{ + int error_code; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + char decoded_from_dbname[FN_REFLEN]; + char decoded_from_tbname[FN_REFLEN]; + decode_file_path(from, decoded_from_dbname, decoded_from_tbname); + + char decoded_to_dbname[FN_REFLEN]; + char decoded_to_tbname[FN_REFLEN]; + decode_file_path(to, decoded_to_dbname, decoded_to_tbname); + + String rename_cmd; + rename_cmd.append("RENAME TABLE `"); + rename_cmd.append(decoded_from_dbname); + rename_cmd.append("`.`"); + rename_cmd.append(decoded_from_tbname); + rename_cmd.append("` TO `"); + rename_cmd.append(decoded_to_dbname); + rename_cmd.append("`.`"); + rename_cmd.append(decoded_to_tbname); + rename_cmd.append("`;"); + + return trx->run_query(rename_cmd); +} + +static void +xpand_mark_table_for_discovery(TABLE *table) +{ + table->s->tabledef_version.str = NULL; + table->s->tabledef_version.length = 0; + table->m_needs_reopen = TRUE; +} + +int ha_xpand::open(const char *name, int mode, uint test_if_locked) +{ + DBUG_ENTER("ha_xpand::open"); + DBUG_PRINT("oid", + ("%s", table->s->tabledef_version.str)); + + if (!table->s->tabledef_version.str) + DBUG_RETURN(HA_ERR_TABLE_DEF_CHANGED); + if (!xpand_table_oid) + xpand_table_oid = atoll((const char *)table->s->tabledef_version.str); + + // Surrogate key marker + has_hidden_key = table->s->primary_key == MAX_KEY; + if (has_hidden_key) { + ref_length = 8; + } else { + KEY* key_info = table->key_info + table->s->primary_key; + ref_length = key_info->key_length; + } + + DBUG_PRINT("open finished", + ("oid: %llu, ref_length: %u", xpand_table_oid, ref_length)); + DBUG_RETURN(0); +} + +int ha_xpand::close(void) +{ + return 0; +} + +int ha_xpand::reset() +{ + upsert_flag &= ~XPAND_BULK_UPSERT; + upsert_flag &= ~XPAND_HAS_UPSERT; + upsert_flag &= ~XPAND_UPSERT_SENT; + xpd_lock_type = XPAND_NO_LOCKS; + return 0; +} + +int ha_xpand::extra(enum ha_extra_function operation) +{ + DBUG_ENTER("ha_xpand::extra"); + if (operation == HA_EXTRA_INSERT_WITH_UPDATE) + upsert_flag |= XPAND_HAS_UPSERT; + DBUG_RETURN(0); +} + +/*@brief UPSERT State Machine*/ +/************************************************************* + * DESCRIPTION: + * Fasttrack for UPSERT sends queries down to a XPD backend. + * UPSERT could be of two kinds: singular and bulk. The plugin + * re-/sets XPAND_BULK_UPSERT in end|start_bulk_insert + * methods. XPAND_UPSERT_SENT is used to avoid multiple + * execution at XPD backend. + * Generic XPAND_HAS_UPSERT is set for bulk UPSERT only b/c + * MDB calls write_row only once. + ************************************************************/ +int ha_xpand::write_row(const uchar *buf) +{ + int error_code = 0; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + if (upsert_flag & XPAND_HAS_UPSERT) { + if (!(upsert_flag & XPAND_UPSERT_SENT)) { + ha_rows update_rows; + String update_stmt; + update_stmt.append(thd->query_string.str()); + + if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + trx->auto_commit_next(); + + error_code= trx->update_query(update_stmt, table->s->db, &update_rows); + if (upsert_flag & XPAND_BULK_UPSERT) + upsert_flag |= XPAND_UPSERT_SENT; + else + upsert_flag &= ~XPAND_HAS_UPSERT; + } + + return error_code; + } + + /* Convert the row format to binlog (packed) format */ + uchar *packed_new_row = (uchar*) my_alloca(estimate_row_size(table)); + size_t packed_size = pack_row(table, table->write_set, packed_new_row, buf); + + /* XXX: Xpand may needs to return HA_ERR_AUTOINC_ERANGE if we hit that + error. */ + ulonglong last_insert_id = 0; + if ((error_code = trx->write_row(xpand_table_oid, + packed_new_row, packed_size, + &last_insert_id))) + goto err; + + if (table->next_number_field) + insert_id_for_cur_row = last_insert_id; + +err: + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_table_for_discovery(table); + + if (packed_size) + my_afree(packed_new_row); + + return error_code; +} + +int ha_xpand::update_row(const uchar *old_data, const uchar *new_data) +{ + DBUG_ENTER("ha_xpand::update_row"); + int error_code; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + DBUG_RETURN(error_code); + + size_t row_size = estimate_row_size(table); + size_t packed_key_len; + uchar *packed_key = (uchar*) my_alloca(row_size); + build_key_packed_row(table->s->primary_key, old_data, + packed_key, &packed_key_len); + + uchar *packed_new_row = (uchar*) my_alloca(row_size); + size_t packed_new_size = pack_row(table, table->write_set, packed_new_row, + new_data); + + /* Send the packed rows to Xpand */ + error_code = trx->key_update(xpand_table_oid, packed_key, packed_key_len, + table->write_set, + packed_new_row, packed_new_size); + + if(packed_key) + my_afree(packed_key); + + if(packed_new_row) + my_afree(packed_new_row); + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_table_for_discovery(table); + + DBUG_RETURN(error_code); +} + +int ha_xpand::direct_update_rows_init(List *update_fields) +{ + DBUG_ENTER("ha_xpand::direct_update_rows_init"); + THD *thd= ha_thd(); + if (!THDVAR(thd, enable_direct_update)) + DBUG_RETURN(HA_ERR_WRONG_COMMAND); + DBUG_RETURN(0); +} + +int ha_xpand::direct_update_rows(ha_rows *update_rows, ha_rows *found_rows) +{ + DBUG_ENTER("ha_xpand::direct_update_rows"); + int error_code= 0; + THD *thd= ha_thd(); + xpand_connection *trx= get_trx(thd, &error_code); + if (!trx) + return error_code; + + String update_stmt; + update_stmt.append(thd->query_string.str()); + + if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + trx->auto_commit_next(); + + error_code = trx->update_query(update_stmt, table->s->db, update_rows); + *found_rows = *update_rows; + DBUG_RETURN(error_code); +} + +void ha_xpand::start_bulk_insert(ha_rows rows, uint flags) +{ + DBUG_ENTER("ha_xpand::start_bulk_insert"); + int error_code= 0; + THD *thd= ha_thd(); + xpand_connection *trx= get_trx(thd, &error_code); + if (!trx) { + // TBD log this + DBUG_VOID_RETURN; + } + + upsert_flag |= XPAND_BULK_UPSERT; + + DBUG_VOID_RETURN; +} + +int ha_xpand::end_bulk_insert() +{ + DBUG_ENTER("ha_xpand::end_bulk_insert"); + upsert_flag &= ~XPAND_BULK_UPSERT; + upsert_flag &= ~XPAND_HAS_UPSERT; + upsert_flag &= ~XPAND_UPSERT_SENT; + DBUG_RETURN(0); +} + +int ha_xpand::delete_row(const uchar *buf) +{ + int error_code; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + // The estimate should consider only key fields widths. + size_t packed_key_len; + uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table)); + build_key_packed_row(table->s->primary_key, buf, packed_key, &packed_key_len); + + error_code = trx->key_delete(xpand_table_oid, packed_key, packed_key_len); + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_table_for_discovery(table); + + if (packed_key) + my_afree(packed_key); + + return error_code; +} + +ha_xpand::Table_flags ha_xpand::table_flags(void) const +{ + Table_flags flags = HA_PARTIAL_COLUMN_READ | + HA_REC_NOT_IN_SEQ | + HA_FAST_KEY_READ | + HA_NULL_IN_KEY | + HA_CAN_INDEX_BLOBS | + HA_AUTO_PART_KEY | + HA_CAN_SQL_HANDLER | + HA_BINLOG_STMT_CAPABLE | + HA_CAN_TABLE_CONDITION_PUSHDOWN | + HA_CAN_DIRECT_UPDATE_AND_DELETE; + + return flags; +} + +ulong ha_xpand::index_flags(uint idx, uint part, bool all_parts) const +{ + ulong flags = HA_READ_NEXT | + HA_READ_PREV | + HA_READ_ORDER | + HA_READ_RANGE; + + return flags; +} + +ha_rows ha_xpand::records() +{ + return 10000; +} + +ha_rows ha_xpand::records_in_range(uint inx, key_range *min_key, + key_range *max_key) +{ + return 2; +} + +int ha_xpand::info(uint flag) +{ + //THD *thd = ha_thd(); + if (flag & HA_STATUS_TIME) + { + /* Retrieve the time of the most recent update to the table */ + // stats.update_time = + } + + if (flag & HA_STATUS_AUTO) + { + /* Retrieve the latest auto_increment value */ + stats.auto_increment_value = next_insert_id; + } + + if (flag & HA_STATUS_VARIABLE) + { + /* Retrieve variable info, such as row counts and file lengths */ + stats.records = records(); + stats.deleted = 0; + // stats.data_file_length = + // stats.index_file_length = + // stats.delete_length = + stats.check_time = 0; + // stats.mrr_length_per_rec = + + if (stats.records == 0) + stats.mean_rec_length = 0; + else + stats.mean_rec_length = (ulong) + (stats.data_file_length / stats.records); + } + + if (flag & HA_STATUS_CONST) + { + /* + Retrieve constant info, such as file names, max file lengths, + create time, block size + */ + // stats.max_data_file_length = + // stats.create_time = + // stats.block_size = + } + + return 0; +} + +int ha_xpand::index_init(uint idx, bool sorted) +{ + int error_code = 0; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + active_index = idx; + add_current_table_to_rpl_table_list(&rgi, thd, table); + scan_cur = NULL; + + /* Return all columns until there is a better understanding of + requirements. */ + if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false)) + return ER_OUTOFMEMORY; + bitmap_set_all(&scan_fields); + sorted_scan = sorted; + + return 0; +} + +int ha_xpand::index_read(uchar * buf, const uchar * key, uint key_len, + enum ha_rkey_function find_flag) +{ + DBUG_ENTER("ha_xpand::index_read"); + int error_code = 0; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + DBUG_RETURN(error_code); + + key_restore(buf, key, &table->key_info[active_index], key_len); + // The estimate should consider only key fields widths. + size_t packed_key_len; + uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table)); + build_key_packed_row(active_index, buf, packed_key, &packed_key_len); + + bool exact = false; + xpand_connection::scan_type st; + switch (find_flag) { + case HA_READ_KEY_EXACT: + exact = true; + break; + case HA_READ_KEY_OR_NEXT: + st = xpand_connection::READ_KEY_OR_NEXT; + break; + case HA_READ_KEY_OR_PREV: + st = xpand_connection::READ_KEY_OR_PREV; + break; + case HA_READ_AFTER_KEY: + st = xpand_connection::READ_AFTER_KEY; + break; + case HA_READ_BEFORE_KEY: + st = xpand_connection::READ_BEFORE_KEY; + break; + case HA_READ_PREFIX: + case HA_READ_PREFIX_LAST: + case HA_READ_PREFIX_LAST_OR_PREV: + case HA_READ_MBR_CONTAIN: + case HA_READ_MBR_INTERSECT: + case HA_READ_MBR_WITHIN: + case HA_READ_MBR_DISJOINT: + case HA_READ_MBR_EQUAL: + DBUG_RETURN(ER_NOT_SUPPORTED_YET); + } + + uchar *rowdata = NULL; + if (exact) { + is_scan = false; + ulong rowdata_length; + error_code = trx->key_read(xpand_table_oid, 0, xpd_lock_type, + table->read_set, packed_key, packed_key_len, + &rowdata, &rowdata_length); + if (!error_code) + error_code = unpack_row_to_buf(rgi, table, buf, rowdata, + table->read_set, + rowdata + rowdata_length); + } else { + is_scan = true; + error_code = trx->scan_from_key(xpand_table_oid, active_index, + xpd_lock_type, st, -1, sorted_scan, + &scan_fields, packed_key, packed_key_len, + THDVAR(thd, row_buffer), &scan_cur); + if (!error_code) + error_code = rnd_next(buf); + } + + if (rowdata) + my_free(rowdata); + + if (packed_key) + my_afree(packed_key); + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_table_for_discovery(table); + + DBUG_RETURN(error_code); +} + +int ha_xpand::index_first(uchar *buf) +{ + DBUG_ENTER("ha_xpand::index_first"); + int error_code = 0; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + DBUG_RETURN(error_code); + + error_code = trx->scan_from_key(xpand_table_oid, active_index, + xpd_lock_type, + xpand_connection::READ_FROM_START, + -1, sorted_scan, &scan_fields, NULL, 0, + THDVAR(thd, row_buffer), &scan_cur); + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_table_for_discovery(table); + + if (error_code) + DBUG_RETURN(error_code); + + DBUG_RETURN(rnd_next(buf)); +} + +int ha_xpand::index_last(uchar *buf) +{ + DBUG_ENTER("ha_xpand::index_last"); + int error_code = 0; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + DBUG_RETURN(error_code); + + error_code = trx->scan_from_key(xpand_table_oid, active_index, + xpd_lock_type, + xpand_connection::READ_FROM_LAST, + -1, sorted_scan, &scan_fields, NULL, 0, + THDVAR(thd, row_buffer), &scan_cur); + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_table_for_discovery(table); + + if (error_code) + DBUG_RETURN(error_code); + + DBUG_RETURN(rnd_next(buf)); +} + +int ha_xpand::index_next(uchar *buf) +{ + DBUG_ENTER("index_next"); + DBUG_RETURN(rnd_next(buf)); +} + +#if 0 +int ha_xpand::index_next_same(uchar *buf, const uchar *key, uint keylen) +{ + DBUG_ENTER("index_next_same"); + DBUG_RETURN(rnd_next(buf)); +} +#endif + +int ha_xpand::index_prev(uchar *buf) +{ + DBUG_ENTER("index_prev"); + DBUG_RETURN(rnd_next(buf)); +} + +int ha_xpand::index_end() +{ + DBUG_ENTER("index_prev"); + if (scan_cur) + DBUG_RETURN(rnd_end()); + else + DBUG_RETURN(0); +} + +int ha_xpand::rnd_init(bool scan) +{ + DBUG_ENTER("ha_xpand::rnd_init"); + int error_code = 0; + THD *thd = ha_thd(); + if (thd->lex->sql_command == SQLCOM_UPDATE) + DBUG_RETURN(error_code); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + DBUG_RETURN(error_code); + + add_current_table_to_rpl_table_list(&rgi, thd, table); + is_scan = scan; + scan_cur = NULL; + + if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false)) + DBUG_RETURN(ER_OUTOFMEMORY); + +#if 0 + if (table->s->keys) + table->mark_columns_used_by_index(table->s->primary_key, &scan_fields); + else + bitmap_clear_all(&scan_fields); + + bitmap_union(&scan_fields, table->read_set); +#else + /* Why is read_set not setup correctly? */ + bitmap_set_all(&scan_fields); +#endif + + error_code = trx->scan_table(xpand_table_oid, xpd_lock_type, + &scan_fields, THDVAR(thd, row_buffer), + &scan_cur); + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_table_for_discovery(table); + + if (error_code) + DBUG_RETURN(error_code); + + DBUG_RETURN(0); +} + +int ha_xpand::rnd_next(uchar *buf) +{ + int error_code = 0; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + assert(is_scan); + assert(scan_cur); + + uchar *rowdata; + ulong rowdata_length; + if ((error_code = trx->scan_next(scan_cur, &rowdata, &rowdata_length))) + return error_code; + + if (has_hidden_key) { + last_hidden_key = *(ulonglong *)rowdata; + rowdata += 8; + rowdata_length -= 8; + } + + error_code = unpack_row_to_buf(rgi, table, buf, rowdata, &scan_fields, + rowdata + rowdata_length); + + if (error_code) + return error_code; + + return 0; +} + +int ha_xpand::rnd_pos(uchar * buf, uchar *pos) +{ + DBUG_ENTER("xpd_rnd_pos"); + DBUG_DUMP("pos", pos, ref_length); + + int error_code = 0; + THD *thd = ha_thd(); + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + DBUG_RETURN(error_code); + + /* WDD: We need a way to convert key buffers directy to rbr buffers. */ + + if (has_hidden_key) { + memcpy(&last_hidden_key, pos, sizeof(ulonglong)); + } else { + uint keyno = table->s->primary_key; + uint len = calculate_key_len(table, keyno, pos, + table->const_key_parts[keyno]); + key_restore(buf, pos, &table->key_info[keyno], len); + } + + // The estimate should consider only key fields widths. + uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table)); + size_t packed_key_len; + build_key_packed_row(table->s->primary_key, buf, packed_key, &packed_key_len); + + uchar *rowdata = NULL; + ulong rowdata_length; + if ((error_code = trx->key_read(xpand_table_oid, 0, xpd_lock_type, + table->read_set, packed_key, packed_key_len, + &rowdata, &rowdata_length))) + goto err; + + if ((error_code = unpack_row_to_buf(rgi, table, buf, rowdata, table->read_set, + rowdata + rowdata_length))) + goto err; + +err: + if (rowdata) + my_free(rowdata); + + if (packed_key) + my_afree(packed_key); + + if (error_code == HA_ERR_TABLE_DEF_CHANGED) + xpand_mark_table_for_discovery(table); + + DBUG_RETURN(error_code); +} + +int ha_xpand::rnd_end() +{ + DBUG_ENTER("ha_xpand::rnd_end()"); + int error_code = 0; + THD *thd = ha_thd(); + if (thd->lex->sql_command == SQLCOM_UPDATE) + DBUG_RETURN(error_code); + + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + DBUG_RETURN(error_code); + + my_bitmap_free(&scan_fields); + if (scan_cur && (error_code = trx->scan_end(scan_cur))) + DBUG_RETURN(error_code); + scan_cur = NULL; + + DBUG_RETURN(0); +} + +void ha_xpand::position(const uchar *record) +{ + DBUG_ENTER("xpd_position"); + if (has_hidden_key) { + memcpy(ref, &last_hidden_key, sizeof(ulonglong)); + } else { + KEY* key_info = table->key_info + table->s->primary_key; + key_copy(ref, record, key_info, key_info->key_length); + } + DBUG_DUMP("key", ref, ref_length); + DBUG_VOID_RETURN; +} + +uint ha_xpand::lock_count(void) const +{ + /* Hopefully, we don't need to use thread locks */ + return 0; +} + +THR_LOCK_DATA **ha_xpand::store_lock(THD *thd, + THR_LOCK_DATA **to, + enum thr_lock_type lock_type) +{ + /* Hopefully, we don't need to use thread locks */ + return to; +} + +int ha_xpand::external_lock(THD *thd, int lock_type) +{ + DBUG_ENTER("ha_xpand::external_lock()"); + int error_code; + xpand_connection *trx = get_trx(thd, &error_code); + if (error_code) + DBUG_RETURN(error_code); + + if (lock_type == F_WRLCK) + xpd_lock_type = XPAND_EXCLUSIVE; + else if (lock_type == F_RDLCK) + xpd_lock_type = XPAND_SHARED; + else if (lock_type == F_UNLCK) + xpd_lock_type = XPAND_NO_LOCKS; + + if (lock_type != F_UNLCK) { + if (!trx->has_open_transaction()) { + error_code = trx->begin_transaction_next(); + if (error_code) + DBUG_RETURN(error_code); + } + + trans_register_ha(thd, FALSE, xpand_hton); + if (thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + trans_register_ha(thd, TRUE, xpand_hton); + } + + DBUG_RETURN(error_code); +} + +/**************************************************************************** + Engine Condition Pushdown +****************************************************************************/ + +const COND *ha_xpand::cond_push(const COND *cond) +{ + return cond; +} + +void ha_xpand::cond_pop() +{ +} + +int ha_xpand::info_push(uint info_type, void *info) +{ + return 0; +} + +/**************************************************************************** +** Row encoding functions +****************************************************************************/ + +void add_current_table_to_rpl_table_list(rpl_group_info **_rgi, THD *thd, + TABLE *table) +{ + if (*_rgi) + return; + + Relay_log_info *rli = new Relay_log_info(FALSE); + rli->sql_driver_thd = thd; + + rpl_group_info *rgi = new rpl_group_info(rli); + *_rgi = rgi; + rgi->thd = thd; + rgi->tables_to_lock_count = 0; + rgi->tables_to_lock = NULL; + if (rgi->tables_to_lock_count) + return; + + rgi->tables_to_lock = (RPL_TABLE_LIST *)my_malloc(sizeof(RPL_TABLE_LIST), + MYF(MY_WME)); + rgi->tables_to_lock->init_one_table(&table->s->db, &table->s->table_name, 0, + TL_READ); + rgi->tables_to_lock->table = table; + rgi->tables_to_lock->table_id = table->tablenr; + rgi->tables_to_lock->m_conv_table = NULL; + rgi->tables_to_lock->master_had_triggers = FALSE; + rgi->tables_to_lock->m_tabledef_valid = TRUE; + // We need one byte per column to save a column's binlog type. + uchar *col_type = (uchar*) my_alloca(table->s->fields); + for (uint i = 0 ; i < table->s->fields ; ++i) + col_type[i] = table->field[i]->binlog_type(); + + table_def *tabledef = &rgi->tables_to_lock->m_tabledef; + new (tabledef) table_def(col_type, table->s->fields, NULL, 0, NULL, 0); + rgi->tables_to_lock_count++; + if (col_type) + my_afree(col_type); +} + +void remove_current_table_from_rpl_table_list(rpl_group_info *rgi) +{ + if (!rgi->tables_to_lock) + return; + + rgi->tables_to_lock->m_tabledef.table_def::~table_def(); + rgi->tables_to_lock->m_tabledef_valid = FALSE; + my_free(rgi->tables_to_lock); + rgi->tables_to_lock_count--; + rgi->tables_to_lock = NULL; + delete rgi->rli; + delete rgi; +} + +void ha_xpand::build_key_packed_row(uint index, const uchar *buf, + uchar *packed_key, + size_t *packed_key_len) +{ + if (index == table->s->primary_key && has_hidden_key) { + memcpy(packed_key, &last_hidden_key, sizeof(ulonglong)); + *packed_key_len = sizeof(ulonglong); + } else { + // make a row from the table + table->mark_columns_used_by_index(index, &table->tmp_set); + *packed_key_len = pack_row(table, &table->tmp_set, packed_key, buf); + } +} + +int unpack_row_to_buf(rpl_group_info *rgi, TABLE *table, uchar *data, + uchar const *const row_data, MY_BITMAP const *cols, + uchar const *const row_end) +{ + /* Since unpack_row can only write to record[0], if 'data' does not point to + table->record[0], we must back it up and then restore it afterwards. */ + uchar const *current_row_end; + ulong master_reclength; + uchar *backup_row = NULL; + if (data != table->record[0]) { + /* See Update_rows_log_event::do_exec_row(rpl_group_info *rgi) + and the definitions of store_record and restore_record. */ + backup_row = (uchar*) my_alloca(table->s->reclength); + memcpy(backup_row, table->record[0], table->s->reclength); + restore_record(table, record[data == table->record[1] ? 1 : 2]); + } + + int error_code = unpack_row(rgi, table, table->s->fields, row_data, cols, + ¤t_row_end, &master_reclength, row_end); + + if (backup_row) { + store_record(table, record[data == table->record[1] ? 1 : 2]); + memcpy(table->record[0], backup_row, table->s->reclength); + my_afree(backup_row); + } + + return error_code; +} + +/**************************************************************************** +** Plugin Functions +****************************************************************************/ + +static int xpand_commit(handlerton *hton, THD *thd, bool all) +{ + xpand_connection* trx = (xpand_connection *) thd_get_ha_data(thd, hton); + assert(trx); + + int error_code = 0; + if (trx->has_open_transaction()) { + if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + error_code = trx->commit_transaction(); + else + error_code = trx->new_statement_next(); + } + + return error_code; +} + +static int xpand_rollback(handlerton *hton, THD *thd, bool all) +{ + xpand_connection* trx = (xpand_connection *) thd_get_ha_data(thd, hton); + assert(trx); + + int error_code = 0; + if (trx->has_open_transaction()) { + if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + error_code = trx->rollback_transaction(); + else + error_code = trx->rollback_statement_next(); + } + + return error_code; +} + +static handler* xpand_create_handler(handlerton *hton, TABLE_SHARE *table, + MEM_ROOT *mem_root) +{ + return new (mem_root) ha_xpand(hton, table); +} + +static int xpand_close_connection(handlerton* hton, THD* thd) +{ + xpand_connection* trx = (xpand_connection *) thd_get_ha_data(thd, hton); + if (!trx) + return 0; /* Transaction is not started */ + + int error_code = xpand_rollback(xpand_hton, thd, TRUE); + + delete trx; + + return error_code; +} + +static int xpand_panic(handlerton *hton, ha_panic_function type) +{ + return 0; +} + +static bool xpand_show_status(handlerton *hton, THD *thd, + stat_print_fn *stat_print, + enum ha_stat_type stat_type) +{ + return FALSE; +} + +static int xpand_discover_table_names(handlerton *hton, LEX_CSTRING *db, + MY_DIR *dir, + handlerton::discovered_list *result) +{ + xpand_connection *xpand_net = new xpand_connection(); + int error_code = xpand_net->connect(); + if (error_code) + goto err; + + xpand_net->populate_table_list(db, result); + +err: + delete xpand_net; + return error_code; +} + +int xpand_discover_table(handlerton *hton, THD *thd, TABLE_SHARE *share) +{ + xpand_connection *xpand_net = new xpand_connection(); + int error_code = xpand_net->connect(); + if (error_code) + goto err; + + error_code = xpand_net->discover_table_details(&share->db, + &share->table_name, + thd, share); + +err: + delete xpand_net; + return error_code; +} + +static int xpand_init(void *p) +{ + DBUG_ENTER("xpand_init"); + xpand_hton = (handlerton *) p; + xpand_hton->flags = HTON_NO_FLAGS; + xpand_hton->panic = xpand_panic; + xpand_hton->close_connection = xpand_close_connection; + xpand_hton->commit = xpand_commit; + xpand_hton->rollback = xpand_rollback; + xpand_hton->create = xpand_create_handler; + xpand_hton->show_status = xpand_show_status; + xpand_hton->discover_table_names = xpand_discover_table_names; + xpand_hton->discover_table = xpand_discover_table; + xpand_hton->create_select = create_xpand_select_handler; + xpand_hton->create_derived = create_xpand_derived_handler; + + update_host_list(xpand_host); + + DBUG_RETURN(0); +} + +static int xpand_deinit(void *p) +{ + DBUG_ENTER("xpand_deinit"); + free_host_list(); + DBUG_RETURN(0); +} + +struct st_mysql_show_var xpand_status_vars[] = +{ + {NullS, NullS, SHOW_LONG} +}; + +static struct st_mysql_sys_var* xpand_system_variables[] = +{ + MYSQL_SYSVAR(connect_timeout), + MYSQL_SYSVAR(read_timeout), + MYSQL_SYSVAR(write_timeout), + MYSQL_SYSVAR(host), + MYSQL_SYSVAR(username), + MYSQL_SYSVAR(password), + MYSQL_SYSVAR(port), + MYSQL_SYSVAR(socket), + MYSQL_SYSVAR(row_buffer), + MYSQL_SYSVAR(select_handler), + MYSQL_SYSVAR(derived_handler), + MYSQL_SYSVAR(enable_direct_update), + NULL +}; + +static struct st_mysql_storage_engine xpand_storage_engine = + {MYSQL_HANDLERTON_INTERFACE_VERSION}; + +maria_declare_plugin(xpand) +{ + MYSQL_STORAGE_ENGINE_PLUGIN, /* Plugin Type */ + &xpand_storage_engine, /* Plugin Descriptor */ + "XPAND", /* Plugin Name */ + "MariaDB", /* Plugin Author */ + "Xpand storage engine", /* Plugin Description */ + PLUGIN_LICENSE_GPL, /* Plugin Licence */ + xpand_init, /* Plugin Entry Point */ + xpand_deinit, /* Plugin Deinitializer */ + 0x0001, /* Hex Version Number (0.1) */ + NULL /* xpand_status_vars */, /* Status Variables */ + xpand_system_variables, /* System Variables */ + "0.1", /* String Version */ + MariaDB_PLUGIN_MATURITY_EXPERIMENTAL /* Maturity Level */ +} +maria_declare_plugin_end; diff --git a/storage/xpand/ha_xpand.h b/storage/xpand/ha_xpand.h new file mode 100644 index 00000000000..48359af6470 --- /dev/null +++ b/storage/xpand/ha_xpand.h @@ -0,0 +1,130 @@ +/***************************************************************************** +Copyright (c) 2019, MariaDB Corporation. +*****************************************************************************/ + +#ifndef _ha_xpand_h +#define _ha_xpand_h + +#ifdef USE_PRAGMA_INTERFACE +#pragma interface /* gcc class implementation */ +#endif + +#define MYSQL_SERVER 1 +#include "xpand_connection.h" +#include "my_bitmap.h" +#include "table.h" +#include "rpl_rli.h" +#include "handler.h" +#include "sql_class.h" +#include "sql_show.h" +#include "mysql.h" +#include "../../sql/rpl_record.h" + +size_t estimate_row_size(TABLE *table); +xpand_connection *get_trx(THD *thd, int *error_code); +bool get_enable_sh(THD* thd); +void add_current_table_to_rpl_table_list(rpl_group_info **_rgi, THD *thd, + TABLE *table); +void remove_current_table_from_rpl_table_list(rpl_group_info *rgi); +int unpack_row_to_buf(rpl_group_info *rgi, TABLE *table, uchar *data, + uchar const *const row_data, MY_BITMAP const *cols, + uchar const *const row_end); + +class ha_xpand : public handler +{ +private: + ulonglong xpand_table_oid; + rpl_group_info *rgi; + + Field *auto_inc_field; + ulonglong auto_inc_value; + + bool has_hidden_key; + ulonglong last_hidden_key; + xpand_connection_cursor *scan_cur; + bool is_scan; + MY_BITMAP scan_fields; + bool sorted_scan; + xpand_lock_mode_t xpd_lock_type; + + uint last_dup_errkey; + + typedef enum xpand_upsert_flags { + XPAND_HAS_UPSERT= 1, + XPAND_BULK_UPSERT= 2, + XPAND_UPSERT_SENT= 4 + } xpd_upsert_flags_t; + int upsert_flag; + +public: + ha_xpand(handlerton *hton, TABLE_SHARE *table_arg); + ~ha_xpand(); + int create(const char *name, TABLE *form, HA_CREATE_INFO *info); + int delete_table(const char *name); + int rename_table(const char* from, const char* to); + int open(const char *name, int mode, uint test_if_locked); + int close(void); + int reset(); + int extra(enum ha_extra_function operation); + int write_row(const uchar *buf); + // start_bulk_update exec_bulk_update + int update_row(const uchar *old_data, const uchar *new_data); + // start_bulk_delete exec_bulk_delete + int delete_row(const uchar *buf); + int direct_update_rows_init(List *update_fields); + int direct_update_rows(ha_rows *update_rows, ha_rows *found_rows); + void start_bulk_insert(ha_rows rows, uint flags = 0); + int end_bulk_insert(); + + Table_flags table_flags(void) const; + ulong index_flags(uint idx, uint part, bool all_parts) const; + uint max_supported_keys() const { return MAX_KEY; } + + ha_rows records(); + ha_rows records_in_range(uint inx, key_range *min_key, + key_range *max_key); + + int info(uint flag); // see my_base.h for full description + + // multi_read_range + // read_range + int index_init(uint idx, bool sorted); + int index_read(uchar * buf, const uchar * key, uint key_len, + enum ha_rkey_function find_flag); + int index_first(uchar *buf); + int index_prev(uchar *buf); + int index_last(uchar *buf); + int index_next(uchar *buf); + //int index_next_same(uchar *buf, const uchar *key, uint keylen); + int index_end(); + + int rnd_init(bool scan); + int rnd_next(uchar *buf); + int rnd_pos(uchar * buf, uchar *pos); + int rnd_end(); + + void position(const uchar *record); + uint lock_count(void) const; + THR_LOCK_DATA **store_lock(THD *thd, + THR_LOCK_DATA **to, + enum thr_lock_type lock_type); + int external_lock(THD *thd, int lock_type); + + uint8 table_cache_type() + { + return(HA_CACHE_TBL_NOCACHE); + } + + const COND *cond_push(const COND *cond); + void cond_pop(); + int info_push(uint info_type, void *info); + +private: + void build_key_packed_row(uint index, const uchar *buf, + uchar *packed_key, size_t *packed_key_len); +}; + +bool select_handler_setting(THD* thd); +bool derived_handler_setting(THD* thd); +uint row_buffer_setting(THD* thd); +#endif // _ha_xpand_h diff --git a/storage/xpand/ha_xpand_pushdown.cc b/storage/xpand/ha_xpand_pushdown.cc new file mode 100644 index 00000000000..e53c4f445fa --- /dev/null +++ b/storage/xpand/ha_xpand_pushdown.cc @@ -0,0 +1,478 @@ +/***************************************************************************** +Copyright (c) 2019, MariaDB Corporation. +*****************************************************************************/ + +#include "ha_xpand.h" +#include "ha_xpand_pushdown.h" + +extern handlerton *xpand_hton; +extern uint xpand_row_buffer; + +/*@brief Fills up array data types, metadata and nullability*/ +/************************************************************ + * DESCRIPTION: + * Fills up three arrays with: field binlog data types, field + * metadata and nullability bitmask as in Table_map_log_event + * ctor. Internally creates a temporary table as does + * Pushdown_select. DH uses the actual temp table w/o + * b/c create_DH is called later compared to create_SH. + * More details in server/sql/log_event_server.cc + * PARAMETERS: + * thd - THD* + * table__ - TABLE* temp table for the results + * sl - SELECT_LEX* + * fieldtype - uchar* + * field_metadata - uchar* + * null_bits - uchar* + * num_null_bytes - null bit size + * fields_count - a number of fields + * RETURN: + * metadata_size int or -1 in case of error + ************************************************************/ +int get_field_types(THD *thd, TABLE *table__, SELECT_LEX *sl, uchar *fieldtype, + uchar *field_metadata, uchar *null_bits, const int num_null_bytes, const uint fields_count) +{ + int field_metadata_size = 0; + int metadata_index = 0; + TABLE *tmp_table= table__; + + if (!tmp_table) { + // Construct a tmp table with fields to find out result DTs. + // This should be reconsidered if it worths the effort. + List types; + TMP_TABLE_PARAM tmp_table_param; + sl->master_unit()->join_union_item_types(thd, types, 1); + tmp_table_param.init(); + tmp_table_param.field_count= types.elements; + + tmp_table = create_tmp_table(thd, &tmp_table_param, types, + (ORDER *) 0, false, 0, + TMP_TABLE_ALL_COLUMNS, 1, + &empty_clex_str, true, false); + if (!tmp_table) { + field_metadata_size = -1; + goto err; + } + } + + for (unsigned int i = 0 ; i < fields_count; ++i) { + fieldtype[i]= tmp_table->field[i]->binlog_type(); + } + + bzero(field_metadata, (fields_count * 2)); + for (unsigned int i= 0 ; i < fields_count ; i++) + { + Binlog_type_info bti= tmp_table->field[i]->binlog_type_info(); + uchar *ptr = reinterpret_cast(&bti.m_metadata); + memcpy(&field_metadata[metadata_index], ptr, bti.m_metadata_size); + metadata_index+= bti.m_metadata_size; + } + + if (metadata_index < 251) + field_metadata_size += metadata_index + 1; + else + field_metadata_size += metadata_index + 3; + + bzero(null_bits, num_null_bytes); + for (unsigned int i= 0 ; i < fields_count ; ++i) { + if (tmp_table->field[i]->maybe_null()) { + null_bits[(i / 8)]+= 1 << (i % 8); + } + } + + if (!table__) + free_tmp_table(thd, tmp_table); +err: + return field_metadata_size; +} + + +/*@brief create_xpand_select_handler- Creates handler*/ +/************************************************************ + * DESCRIPTION: + * Creates a select handler + * More details in server/sql/select_handler.h + * PARAMETERS: + * thd - THD pointer. + * sel - SELECT_LEX* that describes the query. + * RETURN: + * select_handler if possible + * NULL otherwise + ************************************************************/ +select_handler* +create_xpand_select_handler(THD* thd, SELECT_LEX* select_lex) +{ + ha_xpand_select_handler *sh = NULL; + if (!select_handler_setting(thd)) { + return sh; + } + + // TODO Return early for EXPLAIN before we run the actual scan. + // We can send compile request when we separate compilation + // and execution. + xpand_connection_cursor *scan = NULL; + if (thd->lex->describe) { + sh = new ha_xpand_select_handler(thd, select_lex, scan); + return sh; + } + + // Multi-update runs an implicit query to collect constraints. + // SH couldn't be used for this. + if (thd->lex->sql_command == SQLCOM_UPDATE_MULTI) { + return sh; + } + + String query; + // Print the query into a string provided + select_lex->print(thd, &query, QT_ORDINARY); + int error_code = 0; + int field_metadata_size = 0; + xpand_connection *trx = NULL; + + // We presume this number is equal to types.elements in get_field_types + uint items_number = select_lex->get_item_list()->elements; + uint num_null_bytes = (items_number + 7) / 8; + uchar *fieldtype = NULL; + uchar *null_bits = NULL; + uchar *field_metadata = NULL; + uchar *meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME), &fieldtype, items_number, + &null_bits, num_null_bytes, &field_metadata, (items_number * 2), NULL); + + if (!meta_memory) { + // The only way to say something here is to raise warning + // b/c we will fallback to other access methods: derived handler or rowstore. + goto err; + } + + if((field_metadata_size = + get_field_types(thd, NULL, select_lex, fieldtype, field_metadata, null_bits, num_null_bytes, items_number)) < 0) { + goto err; + } + + trx = get_trx(thd, &error_code); + if (!trx) + goto err; + + if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + trx->auto_commit_next(); + + if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits, + num_null_bytes, field_metadata, + field_metadata_size, + row_buffer_setting(thd), &scan))) { + goto err; + } + + sh = new ha_xpand_select_handler(thd, select_lex, scan); + +err: + // deallocate buffers + if (meta_memory) + my_free(meta_memory); + + return sh; +} + +/*********************************************************** + * DESCRIPTION: + * select_handler constructor + * PARAMETERS: + * thd - THD pointer. + * select_lex - sematic tree for the query. + **********************************************************/ +ha_xpand_select_handler::ha_xpand_select_handler( + THD *thd, + SELECT_LEX* select_lex, + xpand_connection_cursor *scan_) + : select_handler(thd, xpand_hton) +{ + thd__ = thd; + scan = scan_; + select = select_lex; + rgi = NULL; +} + +/*********************************************************** + * DESCRIPTION: + * select_handler constructor + * This frees dynamic memory allocated for bitmap + * and disables replication to SH temp table. + **********************************************************/ +ha_xpand_select_handler::~ha_xpand_select_handler() +{ + int error_code; + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) { + // TBD Log this + } + if (trx && scan) + trx->scan_end(scan); + + // If the ::init_scan has been executed + if (table__) + my_bitmap_free(&scan_fields); + + if (rgi) + remove_current_table_from_rpl_table_list(rgi); +} + +/*@brief Initiate the query for select_handler */ +/*********************************************************** + * DESCRIPTION: + * Initializes dynamic structures and sets SH temp table + * as RBR replication destination to unpack rows. + * * PARAMETERS: + * RETURN: + * rc as int + * ********************************************************/ +int ha_xpand_select_handler::init_scan() +{ + // Save this into the base handler class attribute + table__ = table; + // need this bitmap future in next_row() + if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false)) + return ER_OUTOFMEMORY; + bitmap_set_all(&scan_fields); + + add_current_table_to_rpl_table_list(&rgi, thd__, table__); + + return 0; +} + +/*@brief Fetch next row for select_handler */ +/*********************************************************** + * DESCRIPTION: + * Fetch next row for select_handler. + * PARAMETERS: + * RETURN: + * rc as int + * ********************************************************/ +int ha_xpand_select_handler::next_row() +{ + int error_code = 0; + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + assert(scan); + + uchar *rowdata; + ulong rowdata_length; + if ((error_code = trx->scan_next(scan, &rowdata, &rowdata_length))) + return error_code; + + uchar const *current_row_end; + ulong master_reclength; + + error_code = unpack_row(rgi, table, table->s->fields, rowdata, + &scan_fields, ¤t_row_end, + &master_reclength, rowdata + rowdata_length); + + if (error_code) + return error_code; + + return 0; +} + +/*@brief Finishes the scan and clean it up */ +/*********************************************************** + * DESCRIPTION: + * Finishes the scan for select handler + * PARAMETERS: + * RETURN: + * rc as int + ***********************************************************/ +int ha_xpand_select_handler::end_scan() +{ + return 0; +} + +/*@brief create_xpand_derived_handler- Creates handler*/ +/************************************************************ + * DESCRIPTION: + * Creates a derived handler + * More details in server/sql/derived_handler.h + * PARAMETERS: + * thd - THD pointer. + * derived - TABLE_LIST* that describes the tables involved + * RETURN: + * derived_handler if possible + * NULL otherwise + ************************************************************/ +derived_handler* +create_xpand_derived_handler(THD* thd, TABLE_LIST *derived) +{ + ha_xpand_derived_handler *dh = NULL; + if (!derived_handler_setting(thd)) { + return dh; + } + + SELECT_LEX_UNIT *unit= derived->derived; + SELECT_LEX *select_lex = unit->first_select(); + String query; + + dh = new ha_xpand_derived_handler(thd, select_lex, NULL); + + return dh; +} + +/*********************************************************** + * DESCRIPTION: + * derived_handler constructor + * PARAMETERS: + * thd - THD pointer. + * select_lex - sematic tree for the query. + **********************************************************/ +ha_xpand_derived_handler::ha_xpand_derived_handler( + THD *thd, + SELECT_LEX* select_lex, + xpand_connection_cursor *scan_) + : derived_handler(thd, xpand_hton) +{ + thd__ = thd; + scan = scan_; + select = select_lex; + rgi = NULL; +} + +/*********************************************************** + * DESCRIPTION: + * derived_handler constructor + * This frees dynamic memory allocated for bitmap + * and disables replication to SH temp table. + **********************************************************/ +ha_xpand_derived_handler::~ha_xpand_derived_handler() +{ + int error_code; + + + + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) { + // TBD Log this. + } + if (trx && scan) + trx->scan_end(scan); + + // If the ::init_scan has been executed + if (table__) + my_bitmap_free(&scan_fields); + + if (rgi) + remove_current_table_from_rpl_table_list(rgi); +} + +/*@brief Initiate the query for derived_handler */ +/*********************************************************** + * DESCRIPTION: + * Initializes dynamic structures and sets SH temp table + * as RBR replication destination to unpack rows. + * * PARAMETERS: + * RETURN: + * rc as int + * ********************************************************/ +int ha_xpand_derived_handler::init_scan() +{ + String query; + // Print the query into a string provided + select->print(thd__, &query, QT_ORDINARY); + int error_code = 0; + int field_metadata_size = 0; + xpand_connection *trx = NULL; + + // We presume this number is equal to types.elements in get_field_types + uint items_number= select->get_item_list()->elements; + uint num_null_bytes = (items_number + 7) / 8; + uchar *fieldtype = NULL; + uchar *null_bits = NULL; + uchar *field_metadata = NULL; + uchar *meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME), &fieldtype, items_number, + &null_bits, num_null_bytes, &field_metadata, (items_number * 2), NULL); + + if (!meta_memory) { + // The only way to say something here is to raise warning + // b/c we will fallback to other access methods: derived handler or rowstore. + goto err; + } + + if((field_metadata_size= + get_field_types(thd__, table, select, fieldtype, field_metadata, null_bits, num_null_bytes, items_number)) < 0) { + goto err; + } + + trx = get_trx(thd__, &error_code); + if (!trx) + goto err; + + if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits, + num_null_bytes, field_metadata, + field_metadata_size, + row_buffer_setting(thd), &scan))) { + goto err; + } + + // Save this into the base handler class attribute + table__ = table; + + // need this bitmap future in next_row() + if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false)) + return ER_OUTOFMEMORY; + bitmap_set_all(&scan_fields); + + add_current_table_to_rpl_table_list(&rgi, thd__, table__); + +err: + // deallocate buffers + if (meta_memory) + my_free(meta_memory); + + return error_code; +} + +/*@brief Fetch next row for derived_handler */ +/*********************************************************** + * DESCRIPTION: + * Fetch next row for derived_handler. + * PARAMETERS: + * RETURN: + * rc as int + * ********************************************************/ +int ha_xpand_derived_handler::next_row() +{ + int error_code = 0; + xpand_connection *trx = get_trx(thd, &error_code); + if (!trx) + return error_code; + + assert(scan); + + uchar *rowdata; + ulong rowdata_length; + if ((error_code = trx->scan_next(scan, &rowdata, &rowdata_length))) + return error_code; + + uchar const *current_row_end; + ulong master_reclength; + + error_code = unpack_row(rgi, table, table->s->fields, rowdata, + &scan_fields, ¤t_row_end, + &master_reclength, rowdata + rowdata_length); + + if (error_code) + return error_code; + + return 0; +} + +/*@brief Finishes the scan and clean it up */ +/*********************************************************** + * DESCRIPTION: + * Finishes the scan for derived handler + * PARAMETERS: + * RETURN: + * rc as int + ***********************************************************/ +int ha_xpand_derived_handler::end_scan() +{ + return 0; +} diff --git a/storage/xpand/ha_xpand_pushdown.h b/storage/xpand/ha_xpand_pushdown.h new file mode 100644 index 00000000000..3fd234131c9 --- /dev/null +++ b/storage/xpand/ha_xpand_pushdown.h @@ -0,0 +1,87 @@ +/***************************************************************************** +Copyright (c) 2019, MariaDB Corporation. +*****************************************************************************/ +#ifndef _ha_xpand_pushdown_h +#define _ha_xpand_pushdown_h + +#include "select_handler.h" +#include "derived_handler.h" +#include "sql_select.h" + +/*@brief base_handler class*/ +/*********************************************************** + * DESCRIPTION: + * To be described + ************************************************************/ +class ha_xpand_base_handler +{ + // To simulate abstract class + protected: + ha_xpand_base_handler(): thd__(0),table__(0) {} + ~ha_xpand_base_handler() {} + + // Copies of pushdown handlers attributes + // to use them in shared methods. + THD *thd__; + TABLE *table__; + // The bitmap used to sent + MY_BITMAP scan_fields; + // Structures to unpack RBR rows from XPD BE + rpl_group_info *rgi; + // XPD BE scan operation reference + xpand_connection_cursor *scan; +}; + +/*@brief select_handler class*/ +/*********************************************************** + * DESCRIPTION: + * select_handler API methods. Could be used by the server + * tp pushdown the whole query described by SELECT_LEX. + * More details in server/sql/select_handler.h + * sel semantic tree for the query in SELECT_LEX. + ************************************************************/ +class ha_xpand_select_handler: + private ha_xpand_base_handler, + public select_handler +{ + public: + ha_xpand_select_handler(THD* thd_arg, SELECT_LEX* sel, + xpand_connection_cursor *scan); + ~ha_xpand_select_handler(); + + int init_scan(); + int next_row(); + int end_scan(); + void print_error(int, unsigned long) {} +}; + +/*@brief derived_handler class*/ +/*********************************************************** + * DESCRIPTION: + * derived_handler API methods. Could be used by the server + * tp pushdown the whole query described by SELECT_LEX. + * More details in server/sql/derived_handler.h + * sel semantic tree for the query in SELECT_LEX. + ************************************************************/ +class ha_xpand_derived_handler: + private ha_xpand_base_handler, + public derived_handler +{ + public: + ha_xpand_derived_handler(THD* thd_arg, SELECT_LEX* sel, + xpand_connection_cursor *scan); + ~ha_xpand_derived_handler(); + + int init_scan(); + int next_row(); + int end_scan(); + void print_error(int, unsigned long) {} +}; + + +select_handler *create_xpand_select_handler(THD* thd, + SELECT_LEX* select_lex); +derived_handler *create_xpand_derived_handler(THD* thd, + TABLE_LIST *derived); + +#endif diff --git a/storage/xpand/xpand_connection.cc b/storage/xpand/xpand_connection.cc new file mode 100644 index 00000000000..fac728f291c --- /dev/null +++ b/storage/xpand/xpand_connection.cc @@ -0,0 +1,1190 @@ +/***************************************************************************** +Copyright (c) 2019, MariaDB Corporation. +*****************************************************************************/ + +/** @file xpand_connection.cc */ + +#include "xpand_connection.h" +#include +#include "errmsg.h" +#include "handler.h" +#include "table.h" + +extern int xpand_connect_timeout; +extern int xpand_read_timeout; +extern int xpand_write_timeout; +extern char *xpand_host; +extern char *xpand_username; +extern char *xpand_password; +extern uint xpand_port; +extern char *xpand_socket; + +/* + This class implements the commands that can be sent to the cluster by the + Xpand engine. All of these commands return a status to the caller, but some + commands also create open invocations on the cluster, which must be closed by + sending additional commands. + + Transactions on the cluster are started using flags attached to commands, and + transactions are committed or rolled back using separate commands. + + Methods ending with _next affect the transaction state after the next command + is sent to the cluster. Other transaction commands are sent to the cluster + immediately, and the state is changed before they return. + + _____________________ _______________________ + | | | | | | + V | | V | | + NONE --> REQUESTED --> STARTED --> NEW_STMT | + | | + `----> ROLLBACK_STMT ---` + + The commit and rollback commands will change any other state to NONE. This + includes the REQUESTED state, for which nothing will be sent to the cluster. + The rollback statement command can likewise change the state from NEW_STMT to + STARTED without sending anything to the cluster. + + In addition, the XPAND_TRANS_AUTOCOMMIT flag will cause the transactions + for commands that complete without leaving open invocations on the cluster to + be committed if successful or rolled back if there was an error. If + auto-commit is enabled, only one open invocation may be in progress at a + time. +*/ + +enum xpand_trans_state { + XPAND_TRANS_STARTED = 0, + XPAND_TRANS_REQUESTED = 1, + XPAND_TRANS_NEW_STMT = 2, + XPAND_TRANS_ROLLBACK_STMT = 4, + XPAND_TRANS_NONE = 32, +}; + +enum xpand_trans_post_flags { + XPAND_TRANS_AUTOCOMMIT = 8, + XPAND_TRANS_NO_POST_FLAGS = 0, +}; + +enum xpand_commands { + XPAND_WRITE_ROW = 1, + XPAND_SCAN_TABLE, + XPAND_SCAN_NEXT, + XPAND_SCAN_STOP, + XPAND_KEY_READ, + XPAND_KEY_DELETE, + XPAND_SCAN_QUERY, + XPAND_KEY_UPDATE, + XPAND_SCAN_FROM_KEY, + XPAND_UPDATE_QUERY, + XPAND_COMMIT, + XPAND_ROLLBACK, +}; + +/**************************************************************************** +** Class xpand_connection +****************************************************************************/ +xpand_connection::xpand_connection() + : command_buffer(NULL), command_buffer_length(0), command_length(0), + trans_state(XPAND_TRANS_NONE), trans_flags(XPAND_TRANS_NO_POST_FLAGS) +{ + DBUG_ENTER("xpand_connection::xpand_connection"); + memset(&xpand_net, 0, sizeof(MYSQL)); + DBUG_VOID_RETURN; +} + +xpand_connection::~xpand_connection() +{ + DBUG_ENTER("xpand_connection::~xpand_connection"); + if (is_connected()) + disconnect(TRUE); + + if (command_buffer) + my_free(command_buffer); + DBUG_VOID_RETURN; +} + +void xpand_connection::disconnect(bool is_destructor) +{ + DBUG_ENTER("xpand_connection::disconnect"); + if (is_destructor) + { + /* + Connection object destruction occurs after the destruction of + the thread used by the network has begun, so usage of that + thread object now is not reliable + */ + xpand_net.net.thd = NULL; + } + mysql_close(&xpand_net); + DBUG_VOID_RETURN; +} + +int host_list_next; +extern int host_list_cnt; +extern char **host_list; + +int xpand_connection::connect() +{ + int error_code = 0; + my_bool my_true = 1; + DBUG_ENTER("xpand_connection::connect"); + + // cpu concurrency by damned! + int host_num = host_list_next; + host_num = host_num % host_list_cnt; + char *host = host_list[host_num]; + host_list_next = host_num + 1; + DBUG_PRINT("host", ("%s", host)); + + /* Validate the connection parameters */ + if (!strcmp(xpand_socket, "")) + if (!strcmp(host, "127.0.0.1")) + if (xpand_port == MYSQL_PORT_DEFAULT) + DBUG_RETURN(ER_CONNECT_TO_FOREIGN_DATA_SOURCE); + + //xpand_net.methods = &connection_methods; + + if (!mysql_init(&xpand_net)) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + mysql_options(&xpand_net, MYSQL_OPT_READ_TIMEOUT, + &xpand_read_timeout); + mysql_options(&xpand_net, MYSQL_OPT_WRITE_TIMEOUT, + &xpand_write_timeout); + mysql_options(&xpand_net, MYSQL_OPT_CONNECT_TIMEOUT, + &xpand_connect_timeout); + mysql_options(&xpand_net, MYSQL_OPT_USE_REMOTE_CONNECTION, + NULL); + mysql_options(&xpand_net, MYSQL_SET_CHARSET_NAME, "utf8mb4"); + mysql_options(&xpand_net, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, + (char *) &my_true); + mysql_options(&xpand_net, MYSQL_INIT_COMMAND,"SET autocommit=0"); + +#ifdef XPAND_CONNECTION_SSL + if (opt_ssl_ca_length | conn->tgt_ssl_capath_length | + conn->tgt_ssl_cert_length | conn->tgt_ssl_key_length) + { + mysql_ssl_set(&xpand_net, conn->tgt_ssl_key, conn->tgt_ssl_cert, + conn->tgt_ssl_ca, conn->tgt_ssl_capath, conn->tgt_ssl_cipher); + if (conn->tgt_ssl_vsc) + { + my_bool verify_flg = TRUE; + mysql_options(&xpand_net, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, + &verify_flg); + } + } +#endif + + if (!mysql_real_connect(&xpand_net, host, + xpand_username, xpand_password, + NULL, xpand_port, xpand_socket, + CLIENT_MULTI_STATEMENTS)) + { + error_code = mysql_errno(&xpand_net); + disconnect(); + + if (error_code != CR_CONN_HOST_ERROR && + error_code != CR_CONNECTION_ERROR) + { + if (error_code == ER_CON_COUNT_ERROR) + { + my_error(ER_CON_COUNT_ERROR, MYF(0)); + DBUG_RETURN(ER_CON_COUNT_ERROR); + } + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), host); + DBUG_RETURN(ER_CONNECT_TO_FOREIGN_DATA_SOURCE); + } + } + + xpand_net.reconnect = 1; + + DBUG_RETURN(0); +} + +int xpand_connection::begin_command(uchar command) +{ + if (trans_state == XPAND_TRANS_NONE) + return HA_ERR_INTERNAL_ERROR; + + command_length = 0; + int error_code = 0; + if ((error_code = add_command_operand_uchar(command))) + return error_code; + + if ((error_code = add_command_operand_uchar(trans_state | trans_flags))) + return error_code; + + return error_code; +} + +int xpand_connection::send_command() +{ + my_bool com_error; + + /* + Please note: + * The transaction state is set before the command is sent because rolling + back a nonexistent transaction is better than leaving a tranaction open + on the cluster. + * The state may have alreadly been STARTED. + * Commit and rollback commands update the transaction state after calling + this function. + * If auto-commit is enabled, the state may also updated after the + response has been processed. We do not clear the auto-commit flag here + because it needs to be sent with each command until the transaction is + committed or rolled back. + */ + trans_state = XPAND_TRANS_STARTED; + + com_error = simple_command(&xpand_net, + (enum_server_command)XPAND_SERVER_REQUEST, + command_buffer, command_length, TRUE); + + if (com_error) + { + int error_code = mysql_errno(&xpand_net); + my_printf_error(error_code, + "Xpand error: %s", MYF(0), + mysql_error(&xpand_net)); + return error_code; + } + + return 0; +} + +int xpand_connection::read_query_response() +{ + my_bool comerr = xpand_net.methods->read_query_result(&xpand_net); + int error_code = 0; + if (comerr) + { + error_code = mysql_errno(&xpand_net); + my_printf_error(error_code, + "Xpand error: %s", MYF(0), + mysql_error(&xpand_net)); + } + + auto_commit_closed(); + return error_code; +} + +bool xpand_connection::has_open_transaction() +{ + return trans_state != XPAND_TRANS_NONE; +} + +int xpand_connection::commit_transaction() +{ + DBUG_ENTER("xpand_connection::commit_transaction"); + if (trans_state == XPAND_TRANS_NONE) + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + + if (trans_state == XPAND_TRANS_REQUESTED) { + trans_state = XPAND_TRANS_NONE; + trans_flags = XPAND_TRANS_NO_POST_FLAGS; + DBUG_RETURN(0); + } + + int error_code; + if ((error_code = begin_command(XPAND_COMMIT))) + DBUG_RETURN(error_code); + + if ((error_code = send_command())) + DBUG_RETURN(error_code); + + if ((error_code = read_query_response())) + DBUG_RETURN(error_code); + + trans_state = XPAND_TRANS_NONE; + trans_flags = XPAND_TRANS_NO_POST_FLAGS; + DBUG_RETURN(error_code); +} + +int xpand_connection::rollback_transaction() +{ + DBUG_ENTER("xpand_connection::rollback_transaction"); + if (trans_state == XPAND_TRANS_NONE || + trans_state == XPAND_TRANS_REQUESTED) { + trans_state = XPAND_TRANS_NONE; + DBUG_RETURN(0); + } + + int error_code; + if ((error_code = begin_command(XPAND_ROLLBACK))) + DBUG_RETURN(error_code); + + if ((error_code = send_command())) + DBUG_RETURN(error_code); + + if ((error_code = read_query_response())) + DBUG_RETURN(error_code); + + trans_state = XPAND_TRANS_NONE; + trans_flags = XPAND_TRANS_NO_POST_FLAGS; + DBUG_RETURN(error_code); +} + +int xpand_connection::begin_transaction_next() +{ + DBUG_ENTER("xpand_connection::begin_transaction_next"); + if (trans_state != XPAND_TRANS_NONE || + trans_flags != XPAND_TRANS_NO_POST_FLAGS) + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + + trans_state = XPAND_TRANS_REQUESTED; + DBUG_RETURN(0); +} + +int xpand_connection::new_statement_next() +{ + DBUG_ENTER("xpand_connection::new_statement_next"); + if (trans_state != XPAND_TRANS_STARTED || + trans_flags != XPAND_TRANS_NO_POST_FLAGS) + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + + trans_state = XPAND_TRANS_NEW_STMT; + DBUG_RETURN(0); +} + +int xpand_connection::rollback_statement_next() +{ + DBUG_ENTER("xpand_connection::rollback_statement_next"); + if (trans_state != XPAND_TRANS_STARTED || + trans_flags != XPAND_TRANS_NO_POST_FLAGS) + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + + trans_state = XPAND_TRANS_ROLLBACK_STMT; + DBUG_RETURN(0); +} + +void xpand_connection::auto_commit_next() +{ + trans_flags |= XPAND_TRANS_AUTOCOMMIT; +} + +void xpand_connection::auto_commit_closed() +{ + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) { + trans_flags &= ~XPAND_TRANS_AUTOCOMMIT; + trans_state = XPAND_TRANS_NONE; + } +} + +int xpand_connection::run_query(String &stmt) +{ + int error_code = mysql_real_query(&xpand_net, stmt.ptr(), stmt.length()); + if (error_code) + return mysql_errno(&xpand_net); + return error_code; +} + +int xpand_connection::write_row(ulonglong xpand_table_oid, + uchar *packed_row, size_t packed_size, + ulonglong *last_insert_id) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = begin_command(XPAND_WRITE_ROW))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_str(packed_row, packed_size))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + if ((error_code = read_query_response())) + return error_code; + + *last_insert_id = xpand_net.insert_id; + return error_code; +} + +int xpand_connection::key_update(ulonglong xpand_table_oid, + uchar *packed_key, size_t packed_key_length, + MY_BITMAP *update_set, + uchar *packed_new_data, + size_t packed_new_length) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = begin_command(XPAND_KEY_UPDATE))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_str(packed_key, packed_key_length))) + return error_code; + + if ((error_code = add_command_operand_bitmap(update_set))) + return error_code; + + if ((error_code = add_command_operand_str(packed_new_data, + packed_new_length))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + if ((error_code = read_query_response())) + return error_code; + + return error_code; +} + +int xpand_connection::key_delete(ulonglong xpand_table_oid, + uchar *packed_key, size_t packed_key_length) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = begin_command(XPAND_KEY_DELETE))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_str(packed_key, packed_key_length))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + if ((error_code = read_query_response())) + return error_code; + + return error_code; +} + +int xpand_connection::key_read(ulonglong xpand_table_oid, uint index, + xpand_lock_mode_t lock_mode, + MY_BITMAP *read_set, uchar *packed_key, + ulong packed_key_length, uchar **rowdata, + ulong *rowdata_length) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = begin_command(XPAND_KEY_READ))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_uint(index))) + return error_code; + + if ((error_code = add_command_operand_uchar((uchar)lock_mode))) + return error_code; + + if ((error_code = add_command_operand_bitmap(read_set))) + return error_code; + + if ((error_code = add_command_operand_str(packed_key, packed_key_length))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + ulong packet_length = cli_safe_read(&xpand_net); + if (packet_length == packet_error) + return mysql_errno(&xpand_net); + + uchar *data = xpand_net.net.read_pos; + *rowdata_length = safe_net_field_length_ll(&data, packet_length); + *rowdata = (uchar *)my_malloc(*rowdata_length, MYF(MY_WME)); + memcpy(*rowdata, data, *rowdata_length); + + packet_length = cli_safe_read(&xpand_net); + if (packet_length == packet_error) { + my_free(*rowdata); + *rowdata = NULL; + *rowdata_length = 0; + return mysql_errno(&xpand_net); + } + + return 0; +} + +class xpand_connection_cursor { + struct rowdata { + ulong length; + uchar *data; + }; + + ulong current_row; + ulong last_row; + struct rowdata *rows; + uchar *outstanding_row; // to be freed on next request. + MYSQL *xpand_net; + +public: + ulong buffer_size; + ulonglong scan_refid; + bool eof_reached; + +private: + int cache_row(uchar *rowdata, ulong rowdata_length) + { + DBUG_ENTER("xpand_connection_cursor::cache_row"); + rows[last_row].length = rowdata_length; + rows[last_row].data = (uchar *)my_malloc(rowdata_length, MYF(MY_WME)); + if (!rows[last_row].data) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + memcpy(rows[last_row].data, rowdata, rowdata_length); + last_row++; + DBUG_RETURN(0); + } + + int load_rows_impl(bool *stmt_completed) + { + DBUG_ENTER("xpand_connection_cursor::load_rows_impl"); + int error_code = 0; + ulong packet_length = cli_safe_read(xpand_net); + if (packet_length == packet_error) { + error_code = mysql_errno(xpand_net); + *stmt_completed = TRUE; + if (error_code == HA_ERR_END_OF_FILE) { + // We have read all rows for query. + eof_reached = TRUE; + DBUG_RETURN(0); + } + DBUG_RETURN(error_code); + } + + uchar *rowdata = xpand_net->net.read_pos; + ulong rowdata_length = safe_net_field_length_ll(&rowdata, packet_length); + if (!rowdata_length) { + // We have read all rows in this batch. + DBUG_RETURN(0); + } + + if ((error_code = cache_row(rowdata, rowdata_length))) + DBUG_RETURN(error_code); + + DBUG_RETURN(load_rows_impl(stmt_completed)); + } + +public: + xpand_connection_cursor(MYSQL *xpand_net_, ulong bufsize) + { + DBUG_ENTER("xpand_connection_cursor::xpand_connection_cursor"); + xpand_net = xpand_net_; + eof_reached = FALSE; + current_row = 0; + last_row = 0; + outstanding_row = NULL; + buffer_size = bufsize; + rows = NULL; + DBUG_VOID_RETURN; + } + + ~xpand_connection_cursor() + { + DBUG_ENTER("xpand_connection_cursor::~xpand_connection_cursor"); + if (outstanding_row) + my_free(outstanding_row); + if (rows) { + while (current_row < last_row) + my_free(rows[current_row++].data); + my_free(rows); + } + DBUG_VOID_RETURN; + } + + int load_rows(bool *stmt_completed) + { + DBUG_ENTER("xpand_connection_cursor::load_rows"); + current_row = 0; + last_row = 0; + DBUG_RETURN(load_rows_impl(stmt_completed)); + } + + int initialize(bool *stmt_completed) + { + DBUG_ENTER("xpand_connection_cursor::initialize"); + ulong packet_length = cli_safe_read(xpand_net); + if (packet_length == packet_error) { + *stmt_completed = TRUE; + DBUG_RETURN(mysql_errno(xpand_net)); + } + + unsigned char *pos = xpand_net->net.read_pos; + scan_refid = safe_net_field_length_ll(&pos, packet_length); + + rows = (struct rowdata *)my_malloc(buffer_size * sizeof(struct rowdata), + MYF(MY_WME)); + if (!rows) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + DBUG_RETURN(load_rows(stmt_completed)); + } + + uchar *retrieve_row(ulong *rowdata_length) + { + DBUG_ENTER("xpand_connection_cursor::retrieve_row"); + if (outstanding_row) { + my_free(outstanding_row); + outstanding_row = NULL; + } + if (current_row == last_row) + DBUG_RETURN(NULL); + *rowdata_length = rows[current_row].length; + outstanding_row = rows[current_row].data; + current_row++; + DBUG_RETURN(outstanding_row); + } +}; + +int xpand_connection::allocate_cursor(MYSQL *xpand_net, ulong buffer_size, + xpand_connection_cursor **scan) +{ + DBUG_ENTER("xpand_connection::allocate_cursor"); + *scan = new xpand_connection_cursor(xpand_net, buffer_size); + if (!*scan) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + bool stmt_completed = FALSE; + int error_code = (*scan)->initialize(&stmt_completed); + if (error_code) { + delete *scan; + *scan = NULL; + } + + if (stmt_completed) + auto_commit_closed(); + + DBUG_RETURN(error_code); +} + +int xpand_connection::scan_table(ulonglong xpand_table_oid, + xpand_lock_mode_t lock_mode, + MY_BITMAP *read_set, ushort row_req, + xpand_connection_cursor **scan) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = begin_command(XPAND_SCAN_TABLE))) + return error_code; + + if ((error_code = add_command_operand_ushort(row_req))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_uchar((uchar)lock_mode))) + return error_code; + + if ((error_code = add_command_operand_bitmap(read_set))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + return allocate_cursor(&xpand_net, row_req, scan); +} + +/** + * @brief + * Sends a command to initiate query scan. + * @details + * Sends a command over mysql protocol connection to initiate an + * arbitrary query using a query text. + * Uses field types, field metadata and nullability to explicitly + * cast result to expected data type. Exploits RBR TABLE_MAP_EVENT + * format + sends SQL text. + * @args + * stmt& Query text to send + * fieldtype* array of byte wide field types of result projection + * null_bits* fields nullability bitmap of result projection + * field_metadata* Field metadata of result projection + * scan_refid id used to reference this scan later + * Used in pushdowns to initiate query scan. + **/ +int xpand_connection::scan_query(String &stmt, uchar *fieldtype, uint fields, + uchar *null_bits, uint null_bits_size, + uchar *field_metadata, + uint field_metadata_size, + ushort row_req, + xpand_connection_cursor **scan) +{ + int error_code; + command_length = 0; + + if ((error_code = begin_command(XPAND_SCAN_QUERY))) + return error_code; + + if ((error_code = add_command_operand_ushort(row_req))) + return error_code; + + if ((error_code = add_command_operand_str((uchar*)stmt.ptr(), stmt.length()))) + return error_code; + + if ((error_code = add_command_operand_str(fieldtype, fields))) + return error_code; + + if ((error_code = add_command_operand_str(field_metadata, field_metadata_size))) + return error_code; + + // This variable length string calls for an additional store w/o lcb lenth prefix. + if ((error_code = add_command_operand_vlstr(null_bits, null_bits_size))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + return allocate_cursor(&xpand_net, row_req, scan); +} + +/** + * @brief + * Sends a command to initiate UPDATE. + * @details + * Sends a command over mysql protocol connection to initiate an + * UPDATE query using a query text. + * @args + * stmt& Query text to send + * dbname current working database + * dbname ¤t database name + **/ +int xpand_connection::update_query(String &stmt, LEX_CSTRING &dbname, + ulonglong *affected_rows) +{ + int error_code; + command_length = 0; + + if ((error_code = begin_command(XPAND_UPDATE_QUERY))) + return error_code; + + if ((error_code = add_command_operand_str((uchar*)dbname.str, dbname.length))) + return error_code; + + if ((error_code = add_command_operand_str((uchar*)stmt.ptr(), stmt.length()))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + error_code = read_query_response(); + if (!error_code) + *affected_rows = xpand_net.affected_rows; + + return error_code; +} + +int xpand_connection::scan_from_key(ulonglong xpand_table_oid, uint index, + xpand_lock_mode_t lock_mode, + enum scan_type scan_dir, + int no_key_cols, bool sorted_scan, + MY_BITMAP *read_set, uchar *packed_key, + ulong packed_key_length, ushort row_req, + xpand_connection_cursor **scan) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = begin_command(XPAND_SCAN_FROM_KEY))) + return error_code; + + if ((error_code = add_command_operand_ushort(row_req))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_uint(index))) + return error_code; + + if ((error_code = add_command_operand_uchar((uchar)lock_mode))) + return error_code; + + if ((error_code = add_command_operand_uchar(scan_dir))) + return error_code; + + if ((error_code = add_command_operand_uint(no_key_cols))) + return error_code; + + if ((error_code = add_command_operand_uchar(sorted_scan))) + return error_code; + + if ((error_code = add_command_operand_str(packed_key, packed_key_length))) + return error_code; + + if ((error_code = add_command_operand_bitmap(read_set))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + return allocate_cursor(&xpand_net, row_req, scan); +} + +int xpand_connection::scan_next(xpand_connection_cursor *scan, + uchar **rowdata, ulong *rowdata_length) +{ + *rowdata = scan->retrieve_row(rowdata_length); + if (*rowdata) + return 0; + + if (scan->eof_reached) + return HA_ERR_END_OF_FILE; + + int error_code; + command_length = 0; + + if ((error_code = begin_command(XPAND_SCAN_NEXT))) + return error_code; + + if ((error_code = add_command_operand_ushort(scan->buffer_size))) + return error_code; + + if ((error_code = add_command_operand_lcb(scan->scan_refid))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + bool stmt_completed = FALSE; + error_code = scan->load_rows(&stmt_completed); + if (stmt_completed) + auto_commit_closed(); + if (error_code) + return error_code; + + *rowdata = scan->retrieve_row(rowdata_length); + if (!*rowdata) + return HA_ERR_END_OF_FILE; + + return 0; +} + +int xpand_connection::scan_end(xpand_connection_cursor *scan) +{ + int error_code; + command_length = 0; + ulonglong scan_refid = scan->scan_refid; + bool eof_reached = scan->eof_reached; + delete scan; + + if (eof_reached) + return 0; + + if ((error_code = begin_command(XPAND_SCAN_STOP))) + return error_code; + + if ((error_code = add_command_operand_lcb(scan_refid))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + return read_query_response(); +} + +int xpand_connection::populate_table_list(LEX_CSTRING *db, + handlerton::discovered_list *result) +{ + int error_code = 0; + String stmt; + stmt.append("SHOW FULL TABLES FROM "); + stmt.append(db); + stmt.append(" WHERE table_type = 'BASE TABLE'"); + + if (mysql_real_query(&xpand_net, stmt.c_ptr(), stmt.length())) { + int error_code = mysql_errno(&xpand_net); + if (error_code == ER_BAD_DB_ERROR) + return 0; + else + return error_code; + } + + MYSQL_RES *results = mysql_store_result(&xpand_net); + if (mysql_num_fields(results) != 2) { + error_code = HA_ERR_CORRUPT_EVENT; + goto error; + } + + MYSQL_ROW row; + while((row = mysql_fetch_row(results))) + result->add_table(row[0], strlen(row[0])); + +error: + mysql_free_result(results); + return error_code; +} + +int xpand_connection::discover_table_details(LEX_CSTRING *db, + LEX_CSTRING *name, THD *thd, + TABLE_SHARE *share) +{ + DBUG_ENTER("xpand_connection::discover_table_details"); + int error_code = 0; + MYSQL_RES *results_oid = NULL; + MYSQL_RES *results_create = NULL; + MYSQL_ROW row; + String get_oid, show; + + /* get oid */ + get_oid.append("select r.table " + "from system.databases d " + " inner join ""system.relations r on d.db = r.db " + "where d.name = '"); + get_oid.append(db); + get_oid.append("' and r.name = '"); + get_oid.append(name); + get_oid.append("'"); + + if (mysql_real_query(&xpand_net, get_oid.c_ptr(), get_oid.length())) { + if ((error_code = mysql_errno(&xpand_net))) { + DBUG_PRINT("mysql_real_query returns ", ("%d", error_code)); + error_code = HA_ERR_NO_SUCH_TABLE; + goto error; + } + } + + results_oid = mysql_store_result(&xpand_net); + DBUG_PRINT("oid results", + ("rows: %llu, fields: %u", mysql_num_rows(results_oid), + mysql_num_fields(results_oid))); + + if (mysql_num_rows(results_oid) != 1) { + error_code = HA_ERR_NO_SUCH_TABLE; + goto error; + } + + while((row = mysql_fetch_row(results_oid))) { + DBUG_PRINT("row", ("%s", row[0])); + uchar *to = (uchar*)alloc_root(&share->mem_root, strlen(row[0]) + 1); + if (!to) { + error_code = HA_ERR_OUT_OF_MEM; + goto error; + } + + strcpy((char *)to, (char *)row[0]); + share->tabledef_version.str = to; + share->tabledef_version.length = strlen(row[0]); + } + + /* get show create statement */ + show.append("show simple create table "); + show.append(db); + show.append("."); + show.append(name); + if (mysql_real_query(&xpand_net, show.c_ptr(), show.length())) { + if ((error_code = mysql_errno(&xpand_net))) { + DBUG_PRINT("mysql_real_query returns ", ("%d", error_code)); + error_code = HA_ERR_NO_SUCH_TABLE; + goto error; + } + } + + results_create = mysql_store_result(&xpand_net); + DBUG_PRINT("show table results", + ("rows: %llu, fields: %u", mysql_num_rows(results_create), + mysql_num_fields(results_create))); + + if (mysql_num_rows(results_create) != 1) { + error_code = HA_ERR_NO_SUCH_TABLE; + goto error; + } + + if (mysql_num_fields(results_create) != 2) { + error_code = HA_ERR_CORRUPT_EVENT; + goto error; + } + + while((row = mysql_fetch_row(results_create))) { + DBUG_PRINT("row", ("%s - %s", row[0], row[1])); + error_code = share->init_from_sql_statement_string(thd, false, row[1], + strlen(row[1])); + } + +error: + if (results_oid) + mysql_free_result(results_oid); + + if (results_create) + mysql_free_result(results_create); + DBUG_RETURN(error_code); +} + +#define COMMAND_BUFFER_SIZE_INCREMENT 1024 +#define COMMAND_BUFFER_SIZE_INCREMENT_BITS 10 +int xpand_connection::expand_command_buffer(size_t add_length) +{ + size_t expanded_length; + + if (command_buffer_length >= command_length + add_length) + return 0; + + expanded_length = command_buffer_length + + ((add_length >> COMMAND_BUFFER_SIZE_INCREMENT_BITS) + << COMMAND_BUFFER_SIZE_INCREMENT_BITS) + + COMMAND_BUFFER_SIZE_INCREMENT; + + if (!command_buffer_length) + command_buffer = (uchar *) my_malloc(expanded_length, MYF(MY_WME)); + else + command_buffer = (uchar *) my_realloc(command_buffer, expanded_length, + MYF(MY_WME)); + if (!command_buffer) + return HA_ERR_OUT_OF_MEM; + + command_buffer_length = expanded_length; + + return 0; +} + +int xpand_connection::add_command_operand_uchar(uchar value) +{ + int error_code = expand_command_buffer(sizeof(value)); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, &value, sizeof(value)); + command_length += sizeof(value); + + return 0; +} + +int xpand_connection::add_command_operand_ushort(ushort value) +{ + ushort be_value = htobe16(value); + int error_code = expand_command_buffer(sizeof(be_value)); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, &be_value, sizeof(be_value)); + command_length += sizeof(be_value); + return 0; +} + +int xpand_connection::add_command_operand_uint(uint value) +{ + uint be_value = htobe32(value); + int error_code = expand_command_buffer(sizeof(be_value)); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, &be_value, sizeof(be_value)); + command_length += sizeof(be_value); + return 0; +} + +int xpand_connection::add_command_operand_ulonglong(ulonglong value) +{ + ulonglong be_value = htobe64(value); + int error_code = expand_command_buffer(sizeof(be_value)); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, &be_value, sizeof(be_value)); + command_length += sizeof(be_value); + return 0; +} + +int xpand_connection::add_command_operand_lcb(ulonglong value) +{ + int len = net_length_size(value); + int error_code = expand_command_buffer(len); + if (error_code) + return error_code; + + net_store_length(command_buffer + command_length, value); + command_length += len; + return 0; +} + +int xpand_connection::add_command_operand_str(const uchar *str, + size_t str_length) +{ + int error_code = add_command_operand_lcb(str_length); + if (error_code) + return error_code; + + if (!str_length) + return 0; + + error_code = expand_command_buffer(str_length); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, str, str_length); + command_length += str_length; + return 0; +} + +/** + * @brief + * Puts variable length string into the buffer. + * @details + * Puts into the buffer variable length string the size + * of which is send by other means. For details see + * MDB Client/Server Protocol. + * @args + * str - string to send + * str_length - size + **/ +int xpand_connection::add_command_operand_vlstr(const uchar *str, + size_t str_length) +{ + int error_code = expand_command_buffer(str_length); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, str, str_length); + command_length += str_length; + return 0; +} + +int xpand_connection::add_command_operand_lex_string(LEX_CSTRING str) +{ + return add_command_operand_str((const uchar *)str.str, str.length); +} + +int xpand_connection::add_command_operand_bitmap(MY_BITMAP *bitmap) +{ + int error_code = add_command_operand_lcb(bitmap->n_bits); + if (error_code) + return error_code; + + int no_bytes = no_bytes_in_map(bitmap); + error_code = expand_command_buffer(no_bytes); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, bitmap->bitmap, no_bytes); + command_length += no_bytes; + return 0; +} diff --git a/storage/xpand/xpand_connection.h b/storage/xpand/xpand_connection.h new file mode 100644 index 00000000000..c07d8da5f5b --- /dev/null +++ b/storage/xpand/xpand_connection.h @@ -0,0 +1,123 @@ +/***************************************************************************** +Copyright (c) 2019, MariaDB Corporation. +*****************************************************************************/ + +#ifndef _xpand_connection_h +#define _xpand_connection_h + +#ifdef USE_PRAGMA_INTERFACE +#pragma interface /* gcc class implementation */ +#endif + +#define MYSQL_SERVER 1 +#include "my_global.h" +#include "m_string.h" +#include "mysql.h" +#include "sql_common.h" +#include "my_base.h" +#include "mysqld_error.h" +#include "my_bitmap.h" +#include "handler.h" + +#define XPAND_SERVER_REQUEST 30 + +typedef enum xpand_lock_mode { + XPAND_NO_LOCKS, + XPAND_SHARED, + XPAND_EXCLUSIVE, +} xpand_lock_mode_t; + +class xpand_connection_cursor; +class xpand_connection +{ +private: + MYSQL xpand_net; + uchar *command_buffer; + size_t command_buffer_length; + size_t command_length; + + int trans_state; + int trans_flags; + int allocate_cursor(MYSQL *xpand_net, ulong buffer_size, + xpand_connection_cursor **scan); +public: + xpand_connection(); + ~xpand_connection(); + + inline bool is_connected() + { + return xpand_net.net.vio; + } + int connect(); + void disconnect(bool is_destructor = FALSE); + + bool has_open_transaction(); + int commit_transaction(); + int rollback_transaction(); + int begin_transaction_next(); + int new_statement_next(); + int rollback_statement_next(); // also starts new statement + void auto_commit_next(); + void auto_commit_closed(); + + int run_query(String &stmt); + int write_row(ulonglong xpand_table_oid, uchar *packed_row, + size_t packed_size, ulonglong *last_insert_id); + int key_update(ulonglong xpand_table_oid, + uchar *packed_key, size_t packed_key_length, + MY_BITMAP *update_set, + uchar *packed_new_data, size_t packed_new_length); + int key_delete(ulonglong xpand_table_oid, + uchar *packed_key, size_t packed_key_length); + int key_read(ulonglong xpand_table_oid, uint index, + xpand_lock_mode_t lock_mode, MY_BITMAP *read_set, + uchar *packed_key, ulong packed_key_length, uchar **rowdata, + ulong *rowdata_length); + enum sort_order {SORT_NONE = 0, SORT_ASC = 1, SORT_DESC = 2}; + enum scan_type { + READ_KEY_OR_NEXT, /* rows with key and greater */ + READ_KEY_OR_PREV, /* rows with key and less. */ + READ_AFTER_KEY, /* rows with keys greater than key */ + READ_BEFORE_KEY, /* rows with keys less than key */ + READ_FROM_START, /* rows with forwards from first key. */ + READ_FROM_LAST, /* rows with backwards from last key. */ + }; + int scan_table(ulonglong xpand_table_oid, + xpand_lock_mode_t lock_mode, + MY_BITMAP *read_set, ushort row_req, + xpand_connection_cursor **scan); + int scan_query(String &stmt, uchar *fieldtype, uint fields, uchar *null_bits, + uint null_bits_size, uchar *field_metadata, + uint field_metadata_size, ushort row_req, + xpand_connection_cursor **scan); + int update_query(String &stmt, LEX_CSTRING &dbname, ulonglong *affected_rows); + int scan_from_key(ulonglong xpand_table_oid, uint index, + xpand_lock_mode_t lock_mode, + enum scan_type scan_dir, int no_key_cols, bool sorted_scan, + MY_BITMAP *read_set, uchar *packed_key, + ulong packed_key_length, ushort row_req, + xpand_connection_cursor **scan); + int scan_next(xpand_connection_cursor *scan, uchar **rowdata, + ulong *rowdata_length); + int scan_end(xpand_connection_cursor *scan); + + int populate_table_list(LEX_CSTRING *db, handlerton::discovered_list *result); + int discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name, THD *thd, + TABLE_SHARE *share); + +private: + int expand_command_buffer(size_t add_length); + int add_command_operand_uchar(uchar value); + int add_command_operand_ushort(ushort value); + int add_command_operand_uint(uint value); + int add_command_operand_ulonglong(ulonglong value); + int add_command_operand_lcb(ulonglong value); + int add_command_operand_str(const uchar *str, size_t length); + int add_command_operand_vlstr(const uchar *str, size_t length); + int add_command_operand_lex_string(LEX_CSTRING str); + int add_command_operand_bitmap(MY_BITMAP *bitmap); + int begin_command(uchar command); + int send_command(); + int read_query_response(); +}; +#endif // _xpand_connection_h -- cgit v1.2.1