author | Monty <monty@mariadb.org> | 2019-04-15 18:16:02 +0300
committer | Monty <monty@mariadb.org> | 2019-05-13 14:01:45 +0300
commit | a9499a3850f617529526848d9e9da86ee3f9f2e9 (patch)
tree | 31f7851d536d121197ce151a71c06813f0ddd7ff
parent | 59e1525d744a4588e4778117e157fe9871accc77 (diff)
download | mariadb-git-bb-maria-s3.tar.gz
MDEV-17841 S3 storage engine (branch bb-maria-s3)
A read-only storage engine that stores its data in (AWS) S3.
To store data in S3, use ALTER TABLE:
ALTER TABLE table_name ENGINE=S3
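For illustration only, a minimal sketch based on the alter.test and basic.test cases added by this commit, assuming the s3 plugin is enabled and s3_access_key, s3_secret_key, s3_region and s3_bucket are configured; the table name and sequence-engine data are just test fixtures:

create table t1 (a int, b int) engine=aria;
insert into t1 select seq, seq+10 from seq_1_to_1000;  -- uses the sequence plugin
alter table t1 engine=s3;    -- table is copied to S3 and becomes read-only
select count(*), sum(a), sum(b) from t1;
alter table t1 engine=aria;  -- copy it back from S3 to make it writable again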
libmarias3 integration done by Sergei Golubchik
libmarias3 created by Andrew Hutchings
61 files changed, 4286 insertions, 175 deletions
diff --git a/.gitignore b/.gitignore index 0fb30cc3184..1739c9361d6 100644 --- a/.gitignore +++ b/.gitignore @@ -177,6 +177,7 @@ storage/maria/aria_dump_log storage/maria/aria_ftdump storage/maria/aria_pack storage/maria/aria_read_log +storage/maria/aria_s3_copy storage/maria/ma_rt_test storage/maria/ma_sp_test storage/maria/ma_test1 diff --git a/.gitmodules b/.gitmodules index 61d4c06dd4e..bccc2dbde1a 100644 --- a/.gitmodules +++ b/.gitmodules @@ -8,3 +8,6 @@ path = wsrep-lib url = https://github.com/codership/wsrep-lib.git branch = master +[submodule "storage/maria/libmarias3"] + path = storage/maria/libmarias3 + url = https://github.com/mariadb-corporation/libmarias3 diff --git a/BUILD/FINISH.sh b/BUILD/FINISH.sh index 447eae0a65f..c58e9effa4c 100644 --- a/BUILD/FINISH.sh +++ b/BUILD/FINISH.sh @@ -42,6 +42,8 @@ cd ./libmariadb git submodule update cd ../storage/rocksdb/rocksdb git submodule update +cd ../../maria/libmarias3 +git submodule update cd ../../.." fi commands="$commands diff --git a/BUILD/SETUP.sh b/BUILD/SETUP.sh index 6cf2a18404c..79290f20015 100755 --- a/BUILD/SETUP.sh +++ b/BUILD/SETUP.sh @@ -194,7 +194,7 @@ base_configs="--prefix=$prefix --enable-assembler " base_configs="$base_configs --with-extra-charsets=complex " base_configs="$base_configs --enable-thread-safe-client " base_configs="$base_configs --with-big-tables $maintainer_mode" -base_configs="$base_configs --with-plugin-aria --with-aria-tmp-tables" +base_configs="$base_configs --with-plugin-aria --with-aria-tmp-tables --with-plugin-s3=STATIC" # Following is to get tokudb to work base_configs="$base_configs --with-jemalloc=NO" diff --git a/include/aria_backup.h b/include/aria_backup.h index 1a1c437d0b9..5cc5f43d9b6 100644 --- a/include/aria_backup.h +++ b/include/aria_backup.h @@ -23,10 +23,14 @@ typedef struct st_aria_table_capabilities ulong bitmap_pages_covered; uint block_size; uint keypage_header; + enum data_file_type data_file_type; my_bool checksum; my_bool transactional; /* This is true if the table can be copied without any locks */ my_bool online_backup_safe; + /* s3 capabilities */ + ulong s3_block_size; + uint8 compression; } ARIA_TABLE_CAPABILITIES; int aria_get_capabilities(File kfile, ARIA_TABLE_CAPABILITIES *cap); diff --git a/include/maria.h b/include/maria.h index 13783426e35..b3d4cedec57 100644 --- a/include/maria.h +++ b/include/maria.h @@ -145,9 +145,11 @@ typedef struct st_maria_create_info ulonglong auto_increment; ulonglong data_file_length; ulonglong key_file_length; + ulong s3_block_size; /* Size of null bitmap at start of row */ uint null_bytes; uint old_options; + uint compression_algorithm; enum data_file_type org_data_file_type; uint16 language; my_bool with_auto_increment, transactional; @@ -229,6 +231,7 @@ typedef struct st_maria_decode_tree /* Decode huff-table */ struct st_maria_bit_buff; +typedef struct s3_info S3_INFO; /* Note that null markers should always be first in a row ! 
@@ -285,7 +288,7 @@ extern my_bool maria_upgrade(void); extern int maria_close(MARIA_HA *file); extern int maria_delete(MARIA_HA *file, const uchar *buff); extern MARIA_HA *maria_open(const char *name, int mode, - uint wait_if_locked); + uint wait_if_locked, S3_INFO *s3); extern int maria_panic(enum ha_panic_function function); extern int maria_rfirst(MARIA_HA *file, uchar *buf, int inx); extern int maria_rkey(MARIA_HA *file, uchar *buf, int inx, diff --git a/include/my_base.h b/include/my_base.h index 8a8237ce8b2..40b852bc6cc 100644 --- a/include/my_base.h +++ b/include/my_base.h @@ -53,7 +53,7 @@ Allow opening even if table is incompatible as this is for ALTER TABLE which will fix the table structure. */ -#define HA_OPEN_FOR_ALTER 4096U +#define HA_OPEN_FOR_ALTER 8192U /* The following is parameter to ha_rkey() how to use key */ diff --git a/include/my_pthread.h b/include/my_pthread.h index 4d33d1abdd4..75e016169df 100644 --- a/include/my_pthread.h +++ b/include/my_pthread.h @@ -717,22 +717,34 @@ extern void my_mutex_end(void); #define INSTRUMENT_ME 0 +/* + Thread specific variables + + Aria key cache is using the following variables for keeping track of + state: + suspend, next, prev, keycache_link, keycache_file, suspend, lock_type + + MariaDB uses the following to + mutex, current_mutex, current_cond, abort +*/ + struct st_my_thread_var { int thr_errno; mysql_cond_t suspend; mysql_mutex_t mutex; + struct st_my_thread_var *next,**prev; mysql_mutex_t * volatile current_mutex; mysql_cond_t * volatile current_cond; + void *keycache_link; + void *keycache_file; + void *stack_ends_here; + safe_mutex_t *mutex_in_use; pthread_t pthread_self; my_thread_id id, dbug_id; int volatile abort; + uint lock_type; /* used by conditional release the queue */ my_bool init; - struct st_my_thread_var *next,**prev; - void *keycache_link; - uint lock_type; /* used by conditional release the queue */ - void *stack_ends_here; - safe_mutex_t *mutex_in_use; #ifndef DBUG_OFF void *dbug; char name[THREAD_NAME_SIZE+1]; diff --git a/libmysqld/libmysql.c b/libmysqld/libmysql.c index cd170b42b42..d08eaa8b28a 100644 --- a/libmysqld/libmysql.c +++ b/libmysqld/libmysql.c @@ -4920,3 +4920,114 @@ ulong STDCALL mysql_net_field_length(uchar **packet) { return net_field_length(packet); } + +/******************************************************************** + Dummy functions to avoid linking with libmarias3 / libcurl +*********************************************************************/ + +#if defined(WITH_S3_STORAGE_ENGINE) || !defined(FIX_BEFORE_RELESE) +C_MODE_START + +struct ms3_st; +typedef struct ms3_st ms3_st; +struct ms3_list_st; +typedef struct ms3_list_st ms3_list_st; +struct ms3_status_st; +typedef struct ms3_status_st ms3_status_st; +enum ms3_set_option_t +{ + SOME_OPTIONS +}; +typedef enum ms3_set_option_t ms3_set_option_t; +typedef void *(*ms3_malloc_callback)(size_t size); +typedef void (*ms3_free_callback)(void *ptr); +typedef void *(*ms3_realloc_callback)(void *ptr, size_t size); +typedef char *(*ms3_strdup_callback)(const char *str); +typedef void *(*ms3_calloc_callback)(size_t nmemb, size_t size); + + +uint8_t ms3_library_init_malloc(ms3_malloc_callback m, + ms3_free_callback f, ms3_realloc_callback r, + ms3_strdup_callback s, ms3_calloc_callback c) +{ + return 1; +} +void ms3_library_deinit(void) +{ +} + +ms3_st *ms3_init(const char *s3key, const char *s3secret, + const char *region, + const char *base_domain) +{ + return 0; +} + +uint8_t ms3_set_option(ms3_st *ms3, ms3_set_option_t option, void *value) 
+{ + return 0; +} + +void ms3_deinit(ms3_st *ms3) +{} + +const char *ms3_server_error(ms3_st *ms3) +{ + return 0; +} +const char *ms3_error(uint8_t errcode) +{ + return 0; +} + +uint8_t ms3_list(ms3_st *ms3, const char *bucket, const char *prefix, + ms3_list_st **list) +{ + return 0; +} + +uint8_t ms3_list_dir(ms3_st *ms3, const char *bucket, const char *prefix, + ms3_list_st **list) +{ + return 0; +} + +void ms3_list_free(ms3_list_st *list) +{} + +uint8_t ms3_put(ms3_st *ms3, const char *bucket, const char *key, + const uint8_t *data, size_t length) +{ + return 1; +} + +uint8_t ms3_get(ms3_st *ms3, const char *bucket, const char *key, + uint8_t **data, size_t *length) +{ + return 1; +} + + +void ms3_free(uint8_t *data) +{} + +uint8_t ms3_delete(ms3_st *ms3, const char *bucket, const char *key) +{ + return 1; +} + + +uint8_t ms3_status(ms3_st *ms3, const char *bucket, const char *key, + ms3_status_st *status) +{ + return 1; +} + +uint8_t ms3_move(ms3_st *ms3, const char *source_bucket, const char *source_key, + const char *dest_bucket, const char *dest_key) +{ + return 1; +} + +C_MODE_END +#endif /* WITH_S3_STORAGE_ENGINE */ diff --git a/mysql-test/include/have_s3.inc b/mysql-test/include/have_s3.inc new file mode 100644 index 00000000000..d81778cd157 --- /dev/null +++ b/mysql-test/include/have_s3.inc @@ -0,0 +1,10 @@ +if (!`SELECT count(*) FROM information_schema.engines WHERE + (support = 'YES' OR support = 'DEFAULT') AND + engine = 's3'`) +{ + skip Need s3 engine; +} +if (`select @@global.s3_secret_key = "" or @@global.s3_access_key = ""`) +{ + skip S3 engine not configured; +} diff --git a/mysql-test/main/mysqld--help.test b/mysql-test/main/mysqld--help.test index c2b6424599a..4c55c99929f 100644 --- a/mysql-test/main/mysqld--help.test +++ b/mysql-test/main/mysqld--help.test @@ -26,7 +26,7 @@ perl; collation-server character-set-server log-tc-size version.*/; # Plugins which may or may not be there: - @plugins=qw/innodb archive blackhole federated partition + @plugins=qw/innodb archive blackhole federated partition s3 feedback debug temp-pool ssl des-key-file xtradb sequence thread-concurrency super-large-pages mutex-deadlock-detector connect null-audit aria oqgraph sphinx thread-handling diff --git a/mysql-test/std_data/s3_unique_table.frm b/mysql-test/std_data/s3_unique_table.frm Binary files differnew file mode 100644 index 00000000000..23bb5215783 --- /dev/null +++ b/mysql-test/std_data/s3_unique_table.frm diff --git a/mysql-test/suite/s3/alter.result b/mysql-test/suite/s3/alter.result new file mode 100644 index 00000000000..f8faa5d4eee --- /dev/null +++ b/mysql-test/suite/s3/alter.result @@ -0,0 +1,103 @@ +drop table if exists t1,t2,t3; +# +# Test ALTER TABLE to and from s3 +# +create table t1 (a int, b int) engine=aria; +insert into t1 select seq,seq+10 from seq_1_to_1000; +alter table t1 engine=s3; +show create table t1; +Table Create Table +t1 CREATE TABLE `t1` ( + `a` int(11) DEFAULT NULL, + `b` int(11) DEFAULT NULL +) ENGINE=S3 DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1 +alter table t1 comment="hello"; +show create table t1; +Table Create Table +t1 CREATE TABLE `t1` ( + `a` int(11) DEFAULT NULL, + `b` int(11) DEFAULT NULL +) ENGINE=S3 DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1 COMMENT='hello' +alter table t1 engine=aria; +show create table t1; +Table Create Table +t1 CREATE TABLE `t1` ( + `a` int(11) DEFAULT NULL, + `b` int(11) DEFAULT NULL +) ENGINE=Aria DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1 COMMENT='hello' +select count(*), sum(a), sum(b) from t1; +count(*) sum(a) sum(b) +1000 
500500 510500 +drop table t1; +# +# Test ALTER TABLE to and from s3 with rename +# +create table t1 (a int, b int) engine=aria select seq as a,seq+10 as b from seq_1_to_10; +alter table t1 rename to t2, engine=s3; +select count(*), sum(a), sum(b) from t2; +count(*) sum(a) sum(b) +10 55 155 +show create table t2; +Table Create Table +t2 CREATE TABLE `t2` ( + `a` int(11) DEFAULT NULL, + `b` int(11) DEFAULT NULL +) ENGINE=S3 DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1 +alter table t2 rename to t3, engine=aria; +show create table t3; +Table Create Table +t3 CREATE TABLE `t3` ( + `a` int(11) DEFAULT NULL, + `b` int(11) DEFAULT NULL +) ENGINE=Aria DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1 +select count(*), sum(a), sum(b) from t3; +count(*) sum(a) sum(b) +10 55 155 +drop table t3; +# +# Test changing options for a s3 table +# +create table t1 (a int, b int) engine=aria select seq as a,seq+10 as b from seq_1_to_1000; +alter table t1 engine=s3; +alter table t1 engine=s3, compression_algorithm="zlib"; +show create table t1; +Table Create Table +t1 CREATE TABLE `t1` ( + `a` int(11) DEFAULT NULL, + `b` int(11) DEFAULT NULL +) ENGINE=S3 DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1 `compression_algorithm`='zlib' +select count(*), sum(a), sum(b) from t1; +count(*) sum(a) sum(b) +1000 500500 510500 +drop table t1; +# +# Test ALTER TABLE for S3 +# +create table t1 (a int, b int) engine=aria select seq as a,seq+10 as b from seq_1_to_10; +alter table t1 add column c int, engine=s3; +alter table t1 add column d int; +show create table t1; +Table Create Table +t1 CREATE TABLE `t1` ( + `a` int(11) DEFAULT NULL, + `b` int(11) DEFAULT NULL, + `c` int(11) DEFAULT NULL, + `d` int(11) DEFAULT NULL +) ENGINE=S3 DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1 +select count(*), sum(a), sum(b), sum(c), sum(d) from t1; +count(*) sum(a) sum(b) sum(c) sum(d) +10 55 155 NULL NULL +drop table t1; +# +# Test RENAME TABLE +# +create table t1 (a int, b int) engine=aria select seq as a, seq+10 as b from seq_1_to_10; +alter table t1 engine=s3; +rename table t1 to t3; +alter table t3 rename t2; +select count(*), sum(a), sum(b) from t2; +count(*) sum(a) sum(b) +10 55 155 +select count(*), sum(a), sum(b) from t1; +ERROR 42S02: Table 'database.t1' doesn't exist +drop table t2; diff --git a/mysql-test/suite/s3/alter.test b/mysql-test/suite/s3/alter.test new file mode 100644 index 00000000000..b14eb2cb52a --- /dev/null +++ b/mysql-test/suite/s3/alter.test @@ -0,0 +1,79 @@ +--source include/have_s3.inc +--source include/have_sequence.inc + +# +# Create unique database for running the tests +# +--source create_database.inc +--disable_warnings +drop table if exists t1,t2,t3; +--enable_warnings + +--echo # +--echo # Test ALTER TABLE to and from s3 +--echo # + +create table t1 (a int, b int) engine=aria; +insert into t1 select seq,seq+10 from seq_1_to_1000; +alter table t1 engine=s3; +show create table t1; +alter table t1 comment="hello"; +show create table t1; +alter table t1 engine=aria; +show create table t1; +select count(*), sum(a), sum(b) from t1; +drop table t1; + +--echo # +--echo # Test ALTER TABLE to and from s3 with rename +--echo # + +create table t1 (a int, b int) engine=aria select seq as a,seq+10 as b from seq_1_to_10; +alter table t1 rename to t2, engine=s3; +select count(*), sum(a), sum(b) from t2; +show create table t2; +alter table t2 rename to t3, engine=aria; +show create table t3; +select count(*), sum(a), sum(b) from t3; +drop table t3; + +--echo # +--echo # Test changing options for a s3 table +--echo # + +create table t1 (a int, b int) 
engine=aria select seq as a,seq+10 as b from seq_1_to_1000; +alter table t1 engine=s3; +alter table t1 engine=s3, compression_algorithm="zlib"; +show create table t1; +select count(*), sum(a), sum(b) from t1; +drop table t1; + +--echo # +--echo # Test ALTER TABLE for S3 +--echo # + +create table t1 (a int, b int) engine=aria select seq as a,seq+10 as b from seq_1_to_10; +alter table t1 add column c int, engine=s3; +alter table t1 add column d int; +show create table t1; +select count(*), sum(a), sum(b), sum(c), sum(d) from t1; +drop table t1; + +--echo # +--echo # Test RENAME TABLE +--echo # + +create table t1 (a int, b int) engine=aria select seq as a, seq+10 as b from seq_1_to_10; +alter table t1 engine=s3; +rename table t1 to t3; +alter table t3 rename t2; +select count(*), sum(a), sum(b) from t2; +--replace_result $database database +--error ER_NO_SUCH_TABLE +select count(*), sum(a), sum(b) from t1; +drop table t2; + +# +# clean up +# +--source drop_database.inc diff --git a/mysql-test/suite/s3/arguments.result b/mysql-test/suite/s3/arguments.result new file mode 100644 index 00000000000..4a371aabc9b --- /dev/null +++ b/mysql-test/suite/s3/arguments.result @@ -0,0 +1,58 @@ +drop table if exists t1; +# +# Test options +# +create or replace table t1 (a int, b int, key(a)) engine=aria; +insert into t1 select seq,seq+10 from seq_1_to_10; +alter table t1 engine=s3, s3_block_size=819200, compression_algorithm="zlib"; +show create table t1; +Table Create Table +t1 CREATE TABLE `t1` ( + `a` int(11) DEFAULT NULL, + `b` int(11) DEFAULT NULL, + KEY `a` (`a`) +) ENGINE=S3 DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1 `s3_block_size`=819200 `compression_algorithm`='zlib' +alter table t1 engine=s3, s3_block_size=8192; +ERROR HY000: Incorrect value '8192' for option 's3_block_size' +alter table t1 engine=s3, s3_block_size=65536; +show create table t1; +Table Create Table +t1 CREATE TABLE `t1` ( + `a` int(11) DEFAULT NULL, + `b` int(11) DEFAULT NULL, + KEY `a` (`a`) +) ENGINE=S3 DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1 `compression_algorithm`='zlib' `s3_block_size`=65536 +alter table t1 engine=s3, s3_block_size=100000; +ERROR HY000: Incorrect value '100000' for option 's3_block_size' +show create table t1; +Table Create Table +t1 CREATE TABLE `t1` ( + `a` int(11) DEFAULT NULL, + `b` int(11) DEFAULT NULL, + KEY `a` (`a`) +) ENGINE=S3 DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1 `compression_algorithm`='zlib' `s3_block_size`=65536 +alter table t1 engine=s3, compression_algorithm="wss"; +ERROR HY000: Incorrect value 'wss' for option 'compression_algorithm' +drop table t1; +# Check that key variables are not shown to the end user +show variables like "s3%key"; +Variable_name Value +s3_access_key ***** +s3_secret_key ***** +# Show some "static" s3 variables +set @tmp= @@global.s3_block_size; +show variables like "s3_block_size"; +Variable_name Value +s3_block_size 4194304 +set @@global.s3_block_size=65536; +show variables like "s3_block_size"; +Variable_name Value +s3_block_size 65536 +set @@global.s3_block_size= @tmp; +set @@s3_block_size=65536; +ERROR HY000: Variable 's3_block_size' is a GLOBAL variable and should be set with SET GLOBAL +# Check s3 variables that can't be changed by end user +set @@s3_access_key="abc"; +ERROR HY000: Variable 's3_access_key' is a read only variable +set @@s3_secret_key="abc"; +ERROR HY000: Variable 's3_secret_key' is a read only variable diff --git a/mysql-test/suite/s3/arguments.test b/mysql-test/suite/s3/arguments.test new file mode 100644 index 00000000000..76ef4c960dd --- /dev/null 
+++ b/mysql-test/suite/s3/arguments.test @@ -0,0 +1,54 @@ +--source include/have_s3.inc +--source include/have_sequence.inc + +# +# Create unique database for running the tests +# +--source create_database.inc +--disable_warnings +drop table if exists t1; +--enable_warnings + +--echo # +--echo # Test options +--echo # + +create or replace table t1 (a int, b int, key(a)) engine=aria; +insert into t1 select seq,seq+10 from seq_1_to_10; +alter table t1 engine=s3, s3_block_size=819200, compression_algorithm="zlib"; +show create table t1; +--error ER_BAD_OPTION_VALUE +alter table t1 engine=s3, s3_block_size=8192; +alter table t1 engine=s3, s3_block_size=65536; +show create table t1; +--error ER_BAD_OPTION_VALUE +alter table t1 engine=s3, s3_block_size=100000; +show create table t1; +--error ER_BAD_OPTION_VALUE +alter table t1 engine=s3, compression_algorithm="wss"; +drop table t1; + +--echo # Check that key variables are not shown to the end user + +show variables like "s3%key"; + +--echo # Show some "static" s3 variables +set @tmp= @@global.s3_block_size; +show variables like "s3_block_size"; +set @@global.s3_block_size=65536; +show variables like "s3_block_size"; +set @@global.s3_block_size= @tmp; +--error ER_GLOBAL_VARIABLE +set @@s3_block_size=65536; + +--echo # Check s3 variables that can't be changed by end user + +--error ER_INCORRECT_GLOBAL_LOCAL_VAR +set @@s3_access_key="abc"; +--error ER_INCORRECT_GLOBAL_LOCAL_VAR +set @@s3_secret_key="abc"; + +# +# clean up +# +--source drop_database.inc diff --git a/mysql-test/suite/s3/basic.result b/mysql-test/suite/s3/basic.result new file mode 100644 index 00000000000..b491c32d75c --- /dev/null +++ b/mysql-test/suite/s3/basic.result @@ -0,0 +1,103 @@ +drop table if exists t1; +# +# Test simple create of s3 table +# +create or replace table t1 (a int, b int, key (a)) engine=aria; +insert into t1 select seq,seq+10 from seq_1_to_10000; +alter table t1 engine=s3; +show create table t1; +Table Create Table +t1 CREATE TABLE `t1` ( + `a` int(11) DEFAULT NULL, + `b` int(11) DEFAULT NULL, + KEY `a` (`a`) +) ENGINE=S3 DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1 +select * from information_schema.tables where table_schema="database" and table_name="t1";; +TABLE_CATALOG TABLE_SCHEMA TABLE_NAME TABLE_TYPE ENGINE VERSION ROW_FORMAT TABLE_ROWS AVG_ROW_LENGTH DATA_LENGTH MAX_DATA_LENGTH INDEX_LENGTH DATA_FREE AUTO_INCREMENT CREATE_TIME UPDATE_TIME CHECK_TIME TABLE_COLLATION CHECKSUM CREATE_OPTIONS TABLE_COMMENT MAX_INDEX_LENGTH TEMPORARY +def # t1 BASE TABLE S3 10 Page 10000 33 335872 # 122880 0 NULL # # # latin1_swedish_ci NULL page_checksum=1 9007199254732800 # +show table status like "t1"; +Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment Max_index_length Temporary +t1 S3 10 Page 10000 33 335872 # 122880 0 NULL # # # latin1_swedish_ci NULL page_checksum=1 # N +select * from t1 limit 10; +a b +1 11 +2 12 +3 13 +4 14 +5 15 +6 16 +7 17 +8 18 +9 19 +10 20 +select count(*) from t1; +count(*) +10000 +select * from t1 where a between 10 and 20; +a b +10 20 +11 21 +12 22 +13 23 +14 24 +15 25 +16 26 +17 27 +18 28 +19 29 +20 30 +explain select * from t1 where a between 10 and 20; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 range a a 5 NULL # Using index condition +insert into t1 values (1,1); +ERROR HY000: Table 't1' is read only +update t1 set b=100 where a=1; +ERROR HY000: Table 't1' is read 
only +delete from t1 where a>10; +ERROR HY000: Table 't1' is read only +alter table t1 engine=aria; +show create table t1; +Table Create Table +t1 CREATE TABLE `t1` ( + `a` int(11) DEFAULT NULL, + `b` int(11) DEFAULT NULL, + KEY `a` (`a`) +) ENGINE=Aria DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1 +select * from t1 limit 10; +a b +1 11 +2 12 +3 13 +4 14 +5 15 +6 16 +7 17 +8 18 +9 19 +10 20 +select count(*) from t1; +count(*) +10000 +delete from t1 where a=1; +drop table t1; +# +# status +# +show variables like "s3%"; +Variable_name Value +s3_access_key X +s3_block_size X +s3_bucket X +s3_pagecache_age_threshold X +s3_pagecache_buffer_size X +s3_pagecache_division_limit X +s3_pagecache_file_hash_size X +s3_region X +s3_secret_key X +show status like "s3%"; +Variable_name Value +S3_pagecache_blocks_not_flushed X +S3_pagecache_blocks_unused X +S3_pagecache_blocks_used X +S3_pagecache_read_requests X +S3_pagecache_reads X diff --git a/mysql-test/suite/s3/basic.test b/mysql-test/suite/s3/basic.test new file mode 100644 index 00000000000..f3f53a55a1c --- /dev/null +++ b/mysql-test/suite/s3/basic.test @@ -0,0 +1,55 @@ +--source include/have_s3.inc +--source include/have_sequence.inc + +# +# Create unique database for running the tests +# +--source create_database.inc +--disable_warnings +drop table if exists t1; +--enable_warnings + +--echo # +--echo # Test simple create of s3 table +--echo # + +create or replace table t1 (a int, b int, key (a)) engine=aria; +insert into t1 select seq,seq+10 from seq_1_to_10000; +alter table t1 engine=s3; +show create table t1; + +--replace_column 2 # 11 # 15 # 16 # 17 # 23 # +--replace_result $database database +--eval select * from information_schema.tables where table_schema="$database" and table_name="t1"; +--replace_column 8 # 12 # 13 # 14 # 19 # +show table status like "t1"; +select * from t1 limit 10; +select count(*) from t1; +select * from t1 where a between 10 and 20; +--replace_column 9 # +explain select * from t1 where a between 10 and 20; +--error ER_OPEN_AS_READONLY +insert into t1 values (1,1); +--error ER_OPEN_AS_READONLY +update t1 set b=100 where a=1; +--error ER_OPEN_AS_READONLY +delete from t1 where a>10; +alter table t1 engine=aria; +show create table t1; +select * from t1 limit 10; +select count(*) from t1; +delete from t1 where a=1; +drop table t1; + +--echo # +--echo # status +--echo # + +--replace_column 2 X +show variables like "s3%"; +--replace_column 2 X +show status like "s3%"; +# +# clean up +# +--source drop_database.inc diff --git a/mysql-test/suite/s3/create_database.inc b/mysql-test/suite/s3/create_database.inc new file mode 100644 index 00000000000..880cdd3a8d5 --- /dev/null +++ b/mysql-test/suite/s3/create_database.inc @@ -0,0 +1,10 @@ +# +# Create unique database to not conflict with concurrently running tests as +# the s3 database is shared +# + +let $database=`select concat("s3_test_",replace(uuid(),"-",""))`; +--disable_query_log +--eval create database $database; +--eval use $database; +--enable_query_log diff --git a/mysql-test/suite/s3/discovery.result b/mysql-test/suite/s3/discovery.result new file mode 100644 index 00000000000..abc97867e89 --- /dev/null +++ b/mysql-test/suite/s3/discovery.result @@ -0,0 +1,57 @@ +drop table if exists t1,t2; +# +# Test discovery of s3 +# +create table t1 (a int, b int) engine=aria select seq as a, seq+10 as b from seq_1_to_10; +alter table t1 engine=s3; +# +# Check discovery by select +# +flush tables; +select * from t1 limit 1; +a b +1 11 +# +# Check if changes to .frm is copied to S3 +# 
+alter table t1 change column b c int not null; +flush tables; +select * from t1 limit 1; +a c +1 11 +# +# Check if SHOW TABLES finds the S3 tables +# +create table t2 (a int, b int) engine=aria select seq as a, seq+10 as b from seq_1_to_10; +alter table t2 engine=s3; +flush tables; +SHOW TABLES; +Tables_in_database +t1 +t2 +drop table t2; +# +# Check if DROP TABLE works with discovery +# +select count(*) from t1; +count(*) +10 +flush tables; +drop table t1; +select count(*), sum(a) from t1; +ERROR 42S02: Table 'database.t1' doesn't exist +# +# Check if S3 detects that the .frm is too old +# +create table t1 (a int, b int) engine=aria select seq as a, seq+10 as b from seq_1_to_10; +alter table t1 engine=s3; +alter table t1 add column c int, engine=s3; +flush tables; +select * from t1 limit 1; +a b c +1 11 NULL +flush tables; +select * from t1 limit 1; +a b c +1 11 NULL +drop table t1; diff --git a/mysql-test/suite/s3/discovery.test b/mysql-test/suite/s3/discovery.test new file mode 100644 index 00000000000..b85776acac5 --- /dev/null +++ b/mysql-test/suite/s3/discovery.test @@ -0,0 +1,84 @@ +--source include/have_s3.inc +--source include/have_sequence.inc + +# +# Create unique database for running the tests +# +--source create_database.inc +--disable_warnings +drop table if exists t1,t2; +--enable_warnings + +let $datadir=`select @@datadir`; + +--echo # +--echo # Test discovery of s3 +--echo # + +create table t1 (a int, b int) engine=aria select seq as a, seq+10 as b from seq_1_to_10; +alter table t1 engine=s3; + +--echo # +--echo # Check discovery by select +--echo # + +--remove_file $datadir/$database/t1.frm +flush tables; +select * from t1 limit 1; + +--echo # +--echo # Check if changes to .frm is copied to S3 +--echo # + +alter table t1 change column b c int not null; +flush tables; +--remove_file $datadir/$database/t1.frm +select * from t1 limit 1; + +--echo # +--echo # Check if SHOW TABLES finds the S3 tables +--echo # + +create table t2 (a int, b int) engine=aria select seq as a, seq+10 as b from seq_1_to_10; +alter table t2 engine=s3; + +flush tables; +--remove_file $datadir/$database/t1.frm +--replace_result $database database +SHOW TABLES; +drop table t2; + +--echo # +--echo # Check if DROP TABLE works with discovery +--echo # + +select count(*) from t1; +flush tables; +--remove_file $datadir/$database/t1.frm +drop table t1; +--replace_result $database database +--error ER_NO_SUCH_TABLE +select count(*), sum(a) from t1; + +--echo # +--echo # Check if S3 detects that the .frm is too old +--echo # + +create table t1 (a int, b int) engine=aria select seq as a, seq+10 as b from seq_1_to_10; +alter table t1 engine=s3; +--copy_file $datadir/$database/t1.frm $datadir/$database/t1.frm-old +alter table t1 add column c int, engine=s3; +flush tables; +--remove_file $datadir/$database/t1.frm +--copy_file $datadir/$database/t1.frm-old $datadir/$database/t1.frm +--remove_file $datadir/$database/t1.frm-old +select * from t1 limit 1; +flush tables; +--remove_file $datadir/$database/t1.frm +select * from t1 limit 1; +drop table t1; + +# +# clean up +# +--source drop_database.inc diff --git a/mysql-test/suite/s3/drop_database.inc b/mysql-test/suite/s3/drop_database.inc new file mode 100644 index 00000000000..a5425f4ed47 --- /dev/null +++ b/mysql-test/suite/s3/drop_database.inc @@ -0,0 +1,9 @@ + +# +# Drop database created by the s3 tests +# + +--disable_query_log +use test; +--eval drop database $database; +--enable_query_log diff --git a/mysql-test/suite/s3/my.cnf b/mysql-test/suite/s3/my.cnf new 
file mode 100644 index 00000000000..d4e748dc488 --- /dev/null +++ b/mysql-test/suite/s3/my.cnf @@ -0,0 +1,5 @@ +!include include/default_mysqld.cnf +!include include/default_client.cnf + +[mysqld.1] +s3=ON diff --git a/mysql-test/suite/s3/no_s3.result b/mysql-test/suite/s3/no_s3.result new file mode 100644 index 00000000000..89ab3ea97a1 --- /dev/null +++ b/mysql-test/suite/s3/no_s3.result @@ -0,0 +1,13 @@ +create table t1 (a int, b int) engine=aria select seq,seq+10 from seq_1_to_2; +alter table t1 engine=s3; +ERROR HY000: Can't create table `test`.`t1` (errno: 138 "Unsupported extension used for table") +drop table t1; +select * from s3_unique_table; +ERROR 42000: Table 's3_unique_table' uses an extension that doesn't exist in this MariaDB version +truncate table s3_unique_table; +ERROR 42000: Table 's3_unique_table' uses an extension that doesn't exist in this MariaDB version +rename table s3_unique_table to t1; +ERROR HY000: Error on rename of './test/s3_unique_table' to './test/t1' (errno: 138 "Unsupported extension used for table") +drop table s3_unique_table; +Warnings: +Warning 1112 Table 's3_unique_table' uses an extension that doesn't exist in this MariaDB version diff --git a/mysql-test/suite/s3/no_s3.test b/mysql-test/suite/s3/no_s3.test new file mode 100644 index 00000000000..6c5df76bfa3 --- /dev/null +++ b/mysql-test/suite/s3/no_s3.test @@ -0,0 +1,25 @@ +--source include/have_sequence.inc + +let $datadir=`select @@datadir`; + +if (`select @@global.s3_secret_key <> "" or @@global.s3_access_key <> ""`) +{ + skip S3 engine options given (probably from command line); +} + +# +# Test what happens when we don't have s3 enabled +# +create table t1 (a int, b int) engine=aria select seq,seq+10 from seq_1_to_2; +--error ER_CANT_CREATE_TABLE +alter table t1 engine=s3; +drop table t1; + +--copy_file std_data/s3_unique_table.frm $datadir/test/s3_unique_table.frm +--error ER_UNSUPPORTED_EXTENSION +select * from s3_unique_table; +--error ER_UNSUPPORTED_EXTENSION +truncate table s3_unique_table; +--error ER_ERROR_ON_RENAME +rename table s3_unique_table to t1; +drop table s3_unique_table; diff --git a/mysql-test/suite/s3/suite.pm b/mysql-test/suite/s3/suite.pm new file mode 100644 index 00000000000..5bf1559ae97 --- /dev/null +++ b/mysql-test/suite/s3/suite.pm @@ -0,0 +1,8 @@ +package My::Suite::S3; + +@ISA = qw(My::Suite); + +return "Need S3 engine" unless $::mysqld_variables{'s3'} eq "ON"; + +bless { }; + diff --git a/mysql-test/valgrind.supp b/mysql-test/valgrind.supp index 2ad9eb7532f..cd7bf73ab10 100644 --- a/mysql-test/valgrind.supp +++ b/mysql-test/valgrind.supp @@ -1799,3 +1799,37 @@ fun:FIPS_mode_set obj:/usr/lib64/libcrypto.so* } + +# +# libmarias3 problems +# +{ + libmarias3 crypto + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + obj:/usr/lib64/libcrypto.so* +} + +# +# libmarias3 problems +# +{ + libmarias3 curl + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + obj:/usr/lib64/libcrypto.so* +} + +{ + libmarias3 libxml2 + Memcheck:Leak + match-leak-kinds: reachable + fun:calloc + fun:xmlGetGlobalState + ... + fun:s3_deinit_library +} diff --git a/sql/handler.cc b/sql/handler.cc index d26a5730e81..629d995d247 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -2572,9 +2572,10 @@ int ha_delete_table(THD *thd, handlerton *table_type, const char *path, it's not an error if the table doesn't exist in the engine. 
warn the user, but still report DROP being a success */ - bool intercept= error == ENOENT || error == HA_ERR_NO_SUCH_TABLE; + bool intercept= (error == ENOENT || error == HA_ERR_NO_SUCH_TABLE || + error == HA_ERR_UNSUPPORTED); - if (!intercept || generate_warning) + if ((!intercept || generate_warning) && ! thd->is_error()) { /* Fill up strucutures that print_error may need */ dummy_share.path.str= (char*) path; @@ -2587,7 +2588,10 @@ int ha_delete_table(THD *thd, handlerton *table_type, const char *path, file->print_error(error, MYF(intercept ? ME_WARNING : 0)); } if (intercept) + { + thd->clear_error(); error= 0; + } } delete file; diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index e61a9675e65..7ceb336ab78 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -6338,6 +6338,8 @@ end_with_restore_list: case SQLCOM_CALL: DBUG_ASSERT(lex->m_sql_cmd != NULL); res= lex->m_sql_cmd->execute(thd); + DBUG_PRINT("result", ("res: %d killed: %d is_error: %d", + res, thd->killed, thd->is_error())); break; default: diff --git a/sql/table.cc b/sql/table.cc index 54854f35d0d..7641c9a023f 100644 --- a/sql/table.cc +++ b/sql/table.cc @@ -1768,7 +1768,8 @@ int TABLE_SHARE::init_from_binary_frm_image(THD *thd, bool write, name.length= str_db_type_length; plugin_ref tmp_plugin= ha_resolve_by_name(thd, &name, false); - if (tmp_plugin != NULL && !plugin_equals(tmp_plugin, se_plugin)) + if (tmp_plugin != NULL && !plugin_equals(tmp_plugin, se_plugin) && + legacy_db_type != DB_TYPE_S3) { if (se_plugin) { diff --git a/storage/maria/CMakeLists.txt b/storage/maria/CMakeLists.txt index 0ecbbae3f04..bf980e3383d 100644 --- a/storage/maria/CMakeLists.txt +++ b/storage/maria/CMakeLists.txt @@ -13,12 +13,10 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA -INCLUDE_DIRECTORIES( -${SSL_INCLUDE_DIRS} -) +INCLUDE_DIRECTORIES(${SSL_INCLUDE_DIRS}) IF(SSL_DEFINES) -SET_SOURCE_FILES_PROPERTIES(ma_crypt.c PROPERTIES COMPILE_FLAGS ${SSL_DEFINES}) + SET_SOURCE_FILES_PROPERTIES(ma_crypt.c PROPERTIES COMPILE_FLAGS ${SSL_DEFINES}) ENDIF() SET(ARIA_SOURCES ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c @@ -28,14 +26,14 @@ SET(ARIA_SOURCES ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c ma_rrnd.c ma_scan.c ma_cache.c ma_statrec.c ma_packrec.c ma_dynrec.c ma_blockrec.c ma_bitmap.c - ma_update.c ma_write.c ma_unique.c + ma_update.c ma_write.c ma_unique.c ma_delete.c ma_rprev.c ma_rfirst.c ma_rlast.c ma_rsame.c ma_rsamepos.c ma_panic.c ma_close.c ma_create.c ma_range.c ma_dbug.c ma_checksum.c ma_changed.c ma_static.c ma_delete_all.c ma_delete_table.c ma_rename.c ma_check.c - ma_keycache.c ma_preload.c ma_ft_parser.c + ma_keycache.c ma_preload.c ma_ft_parser.c ma_ft_update.c ma_ft_boolean_search.c ma_ft_nlq_search.c ft_maria.c ma_sort.c ha_maria.cc trnman.c lockman.c @@ -53,17 +51,9 @@ IF(APPLE) ADD_DEFINITIONS(-fno-common) ENDIF() -MYSQL_ADD_PLUGIN(aria ${ARIA_SOURCES} - STORAGE_ENGINE - MANDATORY - RECOMPILE_FOR_EMBEDDED) - -IF(NOT WITH_ARIA_STORAGE_ENGINE) - RETURN() -ENDIF() - -TARGET_LINK_LIBRARIES(aria myisam - mysys mysys_ssl) +MYSQL_ADD_PLUGIN(aria ${ARIA_SOURCES} STORAGE_ENGINE MANDATORY + LINK_LIBRARIES myisam mysys mysys_ssl + RECOMPILE_FOR_EMBEDDED) MYSQL_ADD_EXECUTABLE(aria_ftdump maria_ftdump.c COMPONENT Server) TARGET_LINK_LIBRARIES(aria_ftdump aria) @@ -110,3 +100,33 @@ ENDIF() OPTION(USE_ARIA_FOR_TMP_TABLES "Use Aria for temporary tables" ON) +# +# S3 +# +INCLUDE (CheckIncludeFiles) + +SET(S3_SOURCES ha_s3.cc s3_func.c + 
libmarias3/src/debug.c libmarias3/src/error.c libmarias3/src/marias3.c + libmarias3/src/request.c libmarias3/src/response.c) + +IF(NOT PLUGIN_S3 STREQUAL NO) + FIND_PACKAGE(LibXml2) + FIND_PACKAGE(CURL) + CHECK_INCLUDE_FILES (mhash.h HAVE_MHASH_H) +ENDIF() + +IF (LIBXML2_FOUND AND CURL_FOUND AND HAVE_MHASH_H) + MYSQL_ADD_PLUGIN(s3 ${S3_SOURCES} STORAGE_ENGINE STATIC_ONLY + LINK_LIBRARIES aria myisam mysys mysys_ssl xml2 curl mhash + RECOMPILE_FOR_EMBEDDED) +ENDIF() + +IF(TARGET s3) + MYSQL_ADD_EXECUTABLE(aria_s3_copy aria_s3_copy.cc COMPONENT Server) + TARGET_LINK_LIBRARIES(aria_s3_copy s3) + + INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}/libmarias3 ${LIBXML2_INCLUDE_DIR}) + ADD_DEFINITIONS(-DWITH_S3_STORAGE_ENGINE) + + TARGET_LINK_LIBRARIES(aria s3) +ENDIF() diff --git a/storage/maria/aria_s3_copy.cc b/storage/maria/aria_s3_copy.cc new file mode 100644 index 00000000000..e1d394f65ac --- /dev/null +++ b/storage/maria/aria_s3_copy.cc @@ -0,0 +1,315 @@ +/* Copyright (C) 2019 MariaDB corporation + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software Foundation, + Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */ + +/* + Allow copying of Aria tables to and from S3 and also delete them from S3 +*/ + +#include "maria_def.h" +#include <aria_backup.h> +#include <my_getopt.h> +#include <my_check_opt.h> +#include <mysys_err.h> +#include <mysqld_error.h> +#include <zlib.h> +#include <libmarias3/marias3.h> +#include "s3_func.h" + +static const char *load_default_groups[]= { "aria_s3_copy", 0 }; +static const char *opt_s3_access_key, *opt_s3_secret_key; +static const char *opt_s3_region="eu-north-1"; +static const char *opt_database; +static const char *opt_s3_bucket="MariaDB"; +static my_bool opt_compression, opt_verbose, opt_force, opt_s3_debug; +static int opt_operation= -1; +static ulong opt_block_size; +static char **default_argv=0; +static const char *op_types[]= {"to_s3", "from_s3", "delete_from_s3", NullS}; +static TYPELIB op_typelib= {array_elements(op_types)-1,"", op_types, NULL}; +static ms3_st *global_s3_client= 0; + +static struct my_option my_long_options[] = +{ + {"help", '?', "Display this help and exit.", 0, 0, 0, GET_NO_ARG, NO_ARG, 0, + 0, 0, 0, 0, 0}, + {"s3_access_key", 'k', "AWS access key ID", + (char**) &opt_s3_access_key, (char**) &opt_s3_access_key, 0, + GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"s3_region", 'r', "AWS region", + (char**) &opt_s3_region, (char**) &opt_s3_region, 0, + GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"s3_secret_key", 'K', "AWS secret access key ID", + (char**) &opt_s3_secret_key, (char**) &opt_s3_secret_key, 0, + GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"s3_bucket", 'b', "AWS prefix for tables", + (char**) &opt_s3_bucket, (char**) &opt_s3_bucket, 0, + GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"compress", 'c', "Use compression", &opt_compression, &opt_compression, + 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"op", 'o', "Operation to excecute. 
One of 'from_s3', 'to_s3' or " + "'delete_from_s3'", + &opt_operation, &opt_operation, &op_typelib, + GET_ENUM, REQUIRED_ARG, -1, 0, 0, 0, 0, 0}, + {"database", 'd', + "Database for copied table (second prefix). " + "If not given, the directory of the table file is used", + &opt_database, &opt_database, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"s3_block_size", 'B', "Block size for data/index blocks in s3", + &opt_block_size, &opt_block_size, 0, GET_ULONG, REQUIRED_ARG, + 4*1024*1024, 64*1024, 16*1024*1024, MALLOC_OVERHEAD, 1024, 0 }, + {"force", 'f', "Force copy even if target exists", + &opt_force, &opt_force, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"verbose", 'v', "Write more information", &opt_verbose, &opt_verbose, + 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"version", 'V', "Print version and exit.", + 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, +#ifndef DBUG_OFF + {"debug", '#', "Output debug log. Often this is 'd:t:o,filename'.", + 0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0}, +#endif + {"s3_debug",0, "Output debug log from marias3 to stdout", + &opt_s3_debug, &opt_s3_debug, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, + { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, +}; + + +static bool get_database_from_path(char *to, size_t to_length, const char *path); + + +static void print_version(void) +{ + printf("%s Ver 1.0 for %s on %s\n", my_progname, SYSTEM_TYPE, + MACHINE_TYPE); +} + +static void usage(void) +{ + print_version(); + puts("\nThis software comes with NO WARRANTY: " + " see the PUBLIC for details.\n"); + puts("Copy an Aria table to and from s3"); + printf("Usage: %s --aws-access-key=# --aws-secret-access-key=# --aws-region # " + "--op=(from|to) [OPTIONS] tables[.MAI]\n", + my_progname_short); + print_defaults("my", load_default_groups); + puts(""); + my_print_help(my_long_options); + my_print_variables(my_long_options); +} + + +ATTRIBUTE_NORETURN static void my_exit(int exit_code) +{ + if (global_s3_client) + { + ms3_deinit(global_s3_client); + global_s3_client= 0; + } + free_defaults(default_argv); + s3_deinit_library(); + my_end(MY_CHECK_ERROR); + exit(exit_code); +} + + +static my_bool get_one_option(int optid, + const struct my_option *opt + __attribute__((unused)), + char *argument) +{ + switch (optid) { + case 'V': + print_version(); + my_exit(0); + case '?': + usage(); + my_exit(0); + case '#': + DBUG_SET_INITIAL(argument ? 
argument : "d:t:o,/tmp/aria_s3_copy.trace"); + break; + } + return 0; +} + + +static void get_options(register int *argc,register char ***argv) +{ + int ho_error; + + load_defaults_or_exit("my", load_default_groups, argc, argv); + default_argv= *argv; + + if ((ho_error=handle_options(argc, argv, my_long_options, get_one_option))) + my_exit(ho_error); + + if (*argc == 0) + { + usage(); + my_exit(-1); + } + + if (!opt_s3_access_key) + { + fprintf(stderr, "--aws-access-key was not given\n"); + my_exit(-1); + } + if (!opt_s3_secret_key) + { + fprintf(stderr, "--aws-secret-access-key was not given\n"); + my_exit(-1); + } + if ((int) opt_operation == -1) + { + fprintf(stderr, "You must specify an operation with --op=[from|to]\n"); + my_exit(-1); + } + if (opt_s3_debug) + ms3_debug(TRUE); + +} /* get_options */ + + +int main(int argc, char** argv) +{ + MY_INIT(argv[0]); + get_options(&argc,(char***) &argv); + + s3_init_library(); + if (!(global_s3_client= ms3_init(opt_s3_access_key, + opt_s3_secret_key, + opt_s3_region, NULL))) + { + fprintf(stderr, "Can't open connection to S3, error: %d %s", errno, + ms3_error(errno)); + my_exit(1); + } + + { + size_t block_size= opt_block_size; + ms3_set_option(global_s3_client, MS3_OPT_BUFFER_CHUNK_SIZE, &block_size); + } + + for (; *argv ; argv++) + { + char database[FN_REFLEN], table_name[FN_REFLEN], *path; + const char *db; + + path= *argv; + + fn_format(table_name, path, "", "", MY_REPLACE_DIR | MY_REPLACE_EXT); + + /* Get database from option, path or current directory */ + if (!(db= opt_database)) + { + if (get_database_from_path(database, sizeof(database), path)) + { + fprintf(stderr, "Aborting copying of %s\n", path); + my_exit(-1); + } + db= database; + } + + switch (opt_operation) { + case 0: + if (aria_copy_to_s3(global_s3_client, opt_s3_bucket, path, + db, table_name, opt_block_size, opt_compression, + opt_force, opt_verbose)) + { + fprintf(stderr, "Aborting copying of %s\n", path); + my_exit(-1); + } + break; + case 1: + if (aria_copy_from_s3(global_s3_client, opt_s3_bucket, path, + db, opt_compression, opt_force, opt_verbose)) + { + fprintf(stderr, "Aborting copying of %s\n", path); + my_exit(-1); + } + break; + case 2: + if (aria_delete_from_s3(global_s3_client, opt_s3_bucket, db, + table_name, opt_verbose)) + { + fprintf(stderr, "Aborting copying of %s\n", path); + my_exit(-1); + } + break; + } + } + my_exit(0); + return 0; +} + + +/** + Calculate database name base on path of Aria file + + @return 0 ok + @return 1 error +*/ + +static bool get_database_from_path(char *to, size_t to_length, + const char *path) +{ + S3_INFO s3; + if (!set_database_and_table_from_path(&s3, path)) + { + strmake(to, s3.database.str, MY_MIN(s3.database.length, to_length-1)); + return 0; + } + + if (my_getwd(to, to_length-1, MYF(MY_WME))) + return 1; + return get_database_from_path(to, to_length, to); +} + + +#include "ma_check_standalone.h" + +/* + Declare all symbols from libmyisam.a, to ensure that we don't have + to include the library as it pulls in ha_myisam.cc +*/ + +const char *ft_boolean_syntax= 0; +ulong ft_min_word_len=0, ft_max_word_len=0; +const HA_KEYSEG ft_keysegs[FT_SEGS]= { +{ + 0, /* charset */ + HA_FT_WLEN, /* start */ + 0, /* null_pos */ + 0, /* Bit pos */ + HA_VAR_LENGTH_PART | HA_PACK_KEY, /* flag */ + HA_FT_MAXBYTELEN, /* length */ + 63, /* language (will be overwritten +) */ + HA_KEYTYPE_VARTEXT2, /* type */ + 0, /* null_bit */ + 2, 0 /* bit_start, bit_length */ +}, +{ + 0, 0, 0, 0, HA_NO_SORT, HA_FT_WLEN, 63, HA_FT_WTYPE, 0, 0, 0 +} +}; + 
+struct st_mysql_ftparser ft_default_parser= +{ + MYSQL_FTPARSER_INTERFACE_VERSION, 0, 0, 0 +}; + +C_MODE_START +int is_stopword(const char *word, size_t len) { return 0; } +C_MODE_END diff --git a/storage/maria/ha_maria.cc b/storage/maria/ha_maria.cc index 30f8724aebd..cbcabda1843 100644 --- a/storage/maria/ha_maria.cc +++ b/storage/maria/ha_maria.cc @@ -286,7 +286,7 @@ static MYSQL_SYSVAR_ENUM(sync_log_dir, sync_log_dir, PLUGIN_VAR_RQCMDARG, #endif my_bool use_maria_for_temp_tables= USE_ARIA_FOR_TMP_TABLES_VAL; -static MYSQL_SYSVAR_BOOL(used_for_temp_tables, +static MYSQL_SYSVAR_BOOL(used_for_temp_tables, use_maria_for_temp_tables, PLUGIN_VAR_READONLY | PLUGIN_VAR_NOCMDOPT, "Whether temporary tables should be MyISAM or Aria", 0, 0, 1); @@ -978,7 +978,7 @@ static int maria_create_trn_for_mysql(MARIA_HA *info) DBUG_PRINT("info", ("lock_type: %d trnman_flags: %u", info->lock_type, trnman_get_flags(trn))); } - + #endif DBUG_RETURN(0); } @@ -1060,7 +1060,7 @@ ulong ha_maria::index_flags(uint inx, uint part, bool all_parts) const ulong flags; if (table_share->key_info[inx].algorithm == HA_KEY_ALG_FULLTEXT) flags= 0; - else + else if ((table_share->key_info[inx].flags & HA_SPATIAL || table_share->key_info[inx].algorithm == HA_KEY_ALG_RTREE)) { @@ -1068,7 +1068,7 @@ ulong ha_maria::index_flags(uint inx, uint part, bool all_parts) const flags= HA_READ_NEXT | HA_READ_PREV | HA_READ_RANGE | HA_READ_ORDER | HA_KEYREAD_ONLY | HA_KEY_SCAN_NOT_ROR; } - else + else { flags= HA_READ_NEXT | HA_READ_PREV | HA_READ_RANGE | HA_READ_ORDER | HA_KEYREAD_ONLY | HA_DO_INDEX_COND_PUSHDOWN; @@ -1223,7 +1223,8 @@ int ha_maria::open(const char *name, int mode, uint test_if_locked) test_if_locked|= HA_OPEN_ABORT_IF_CRASHED; } - if (!(file= maria_open(name, mode, test_if_locked | HA_OPEN_FROM_SQL_LAYER))) + if (!(file= maria_open(name, mode, test_if_locked | HA_OPEN_FROM_SQL_LAYER, + s3_open_args()))) { if (my_errno == HA_ERR_OLD_FILE) { @@ -1253,7 +1254,7 @@ int ha_maria::open(const char *name, int mode, uint test_if_locked) stand up to "when client gets ok the data is safe on disk": the record may not even be inserted). In the future, we could enable it back (as a client doing INSERT DELAYED knows the specificities; but we then should - make sure to regularly commit in the delayed_insert thread). + make sure to regularly commit in the delayed_insert thread). 
*/ int_table_flags|= HA_CAN_INSERT_DELAYED; } @@ -1723,11 +1724,11 @@ int ha_maria::repair(THD *thd, HA_CHECK *param, bool do_optimize) error= maria_repair_by_sort(param, file, fixed_name, MY_TEST(param->testflag & T_QUICK)); } - if (error && file->create_unique_index_by_sort && + if (error && file->create_unique_index_by_sort && share->state.dupp_key != MAX_KEY) { my_errno= HA_ERR_FOUND_DUPP_KEY; - print_keydup_error(table, &table->key_info[share->state.dupp_key], + print_keydup_error(table, &table->key_info[share->state.dupp_key], MYF(0)); } } @@ -2406,6 +2407,7 @@ int ha_maria::index_read_map(uchar * buf, const uchar * key, enum ha_rkey_function find_flag) { DBUG_ASSERT(inited == INDEX); + register_handler(file); int error= maria_rkey(file, buf, active_index, key, keypart_map, find_flag); return error; } @@ -2416,13 +2418,15 @@ int ha_maria::index_read_idx_map(uchar * buf, uint index, const uchar * key, enum ha_rkey_function find_flag) { int error; + register_handler(file); + /* Use the pushed index condition if it matches the index we're scanning */ end_range= NULL; if (index == pushed_idx_cond_keyno) ma_set_index_cond_func(file, handler_index_cond_check, this); - + error= maria_rkey(file, buf, index, key, keypart_map, find_flag); - + ma_set_index_cond_func(file, NULL, 0); return error; } @@ -2433,6 +2437,7 @@ int ha_maria::index_read_last_map(uchar * buf, const uchar * key, { DBUG_ENTER("ha_maria::index_read_last_map"); DBUG_ASSERT(inited == INDEX); + register_handler(file); int error= maria_rkey(file, buf, active_index, key, keypart_map, HA_READ_PREFIX_LAST); DBUG_RETURN(error); @@ -2442,6 +2447,7 @@ int ha_maria::index_read_last_map(uchar * buf, const uchar * key, int ha_maria::index_next(uchar * buf) { DBUG_ASSERT(inited == INDEX); + register_handler(file); int error= maria_rnext(file, buf, active_index); return error; } @@ -2450,6 +2456,7 @@ int ha_maria::index_next(uchar * buf) int ha_maria::index_prev(uchar * buf) { DBUG_ASSERT(inited == INDEX); + register_handler(file); int error= maria_rprev(file, buf, active_index); return error; } @@ -2458,6 +2465,7 @@ int ha_maria::index_prev(uchar * buf) int ha_maria::index_first(uchar * buf) { DBUG_ASSERT(inited == INDEX); + register_handler(file); int error= maria_rfirst(file, buf, active_index); return error; } @@ -2466,6 +2474,7 @@ int ha_maria::index_first(uchar * buf) int ha_maria::index_last(uchar * buf) { DBUG_ASSERT(inited == INDEX); + register_handler(file); int error= maria_rlast(file, buf, active_index); return error; } @@ -2477,6 +2486,7 @@ int ha_maria::index_next_same(uchar * buf, { int error; DBUG_ASSERT(inited == INDEX); + register_handler(file); /* TODO: Delete this loop in Maria 1.5 as versioning will ensure this never happens @@ -2490,11 +2500,11 @@ int ha_maria::index_next_same(uchar * buf, int ha_maria::index_init(uint idx, bool sorted) -{ +{ active_index=idx; if (pushed_idx_cond_keyno == idx) ma_set_index_cond_func(file, handler_index_cond_check, this); - return 0; + return 0; } @@ -2504,7 +2514,7 @@ int ha_maria::index_end() ma_set_index_cond_func(file, NULL, 0); in_range_check_pushed_down= FALSE; ds_mrr.dsmrr_close(); - return 0; + return 0; } @@ -2527,13 +2537,14 @@ int ha_maria::rnd_end() int ha_maria::rnd_next(uchar *buf) { - int error= maria_scan(file, buf); - return error; + register_handler(file); + return maria_scan(file, buf); } int ha_maria::remember_rnd_pos() { + register_handler(file); return (*file->s->scan_remember_pos)(file, &remember_pos); } @@ -2541,6 +2552,7 @@ int ha_maria::remember_rnd_pos() int 
ha_maria::restart_rnd_next(uchar *buf) { int error; + register_handler(file); if ((error= (*file->s->scan_restore_pos)(file, remember_pos))) return error; return rnd_next(buf); @@ -2549,6 +2561,7 @@ int ha_maria::restart_rnd_next(uchar *buf) int ha_maria::rnd_pos(uchar *buf, uchar *pos) { + register_handler(file); int error= maria_rrnd(file, buf, my_get_ptr(pos, ref_length)); return error; } @@ -2608,11 +2621,13 @@ int ha_maria::info(uint flag) data_file_name= index_file_name= 0; fn_format(name_buff, file->s->open_file_name.str, "", MARIA_NAME_DEXT, MY_APPEND_EXT | MY_UNPACK_FILENAME); - if (strcmp(name_buff, maria_info.data_file_name)) - data_file_name =maria_info.data_file_name; + if (strcmp(name_buff, maria_info.data_file_name) && + maria_info.data_file_name[0]) + data_file_name= maria_info.data_file_name; fn_format(name_buff, file->s->open_file_name.str, "", MARIA_NAME_IEXT, MY_APPEND_EXT | MY_UNPACK_FILENAME); - if (strcmp(name_buff, maria_info.index_file_name)) + if (strcmp(name_buff, maria_info.index_file_name) && + maria_info.index_file_name[0]) index_file_name=maria_info.index_file_name; } if (flag & HA_STATUS_ERRKEY) @@ -3138,6 +3153,7 @@ int ha_maria::create(const char *name, TABLE *table_arg, MARIA_CREATE_INFO create_info; TABLE_SHARE *share= table_arg->s; uint options= share->db_options_in_use; + ha_table_option_struct *table_options= table_arg->s->option_struct; enum data_file_type row_type; THD *thd= current_thd; DBUG_ENTER("ha_maria::create"); @@ -3182,6 +3198,12 @@ int ha_maria::create(const char *name, TABLE *table_arg, create_info.data_file_name= ha_create_info->data_file_name; create_info.index_file_name= ha_create_info->index_file_name; create_info.language= share->table_charset->number; + if (ht != maria_hton) + { + /* S3 engine */ + create_info.s3_block_size= table_options->s3_block_size; + create_info.compression_algorithm= table_options->compression_algorithm; + } /* Table is transactional: @@ -3780,7 +3802,7 @@ my_bool ha_maria::register_query_cache_table(THD *thd, const char *table_name, } #endif -struct st_mysql_sys_var* system_variables[]= { +static struct st_mysql_sys_var *system_variables[]= { MYSQL_SYSVAR(block_size), MYSQL_SYSVAR(checkpoint_interval), MYSQL_SYSVAR(checkpoint_log_activity), @@ -3920,7 +3942,7 @@ static void update_log_file_size(MYSQL_THD thd, } -SHOW_VAR status_variables[]= { +static SHOW_VAR status_variables[]= { {"pagecache_blocks_not_flushed", (char*) &maria_pagecache_var.global_blocks_changed, SHOW_LONG}, {"pagecache_blocks_unused", (char*) &maria_pagecache_var.blocks_unused, SHOW_LONG}, {"pagecache_blocks_used", (char*) &maria_pagecache_var.blocks_used, SHOW_LONG}, @@ -3937,7 +3959,7 @@ SHOW_VAR status_variables[]= { ***************************************************************************/ int ha_maria::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param, - uint n_ranges, uint mode, + uint n_ranges, uint mode, HANDLER_BUFFER *buf) { return ds_mrr.dsmrr_init(this, seq, seq_init_param, n_ranges, mode, buf); @@ -3949,7 +3971,7 @@ int ha_maria::multi_range_read_next(range_id_t *range_info) } ha_rows ha_maria::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq, - void *seq_init_param, + void *seq_init_param, uint n_ranges, uint *bufsz, uint *flags, Cost_estimate *cost) { @@ -3964,14 +3986,14 @@ ha_rows ha_maria::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq, } ha_rows ha_maria::multi_range_read_info(uint keyno, uint n_ranges, uint keys, - uint key_parts, uint *bufsz, + uint key_parts, uint *bufsz, uint *flags, 
Cost_estimate *cost) { ds_mrr.init(this, table); return ds_mrr.dsmrr_info(keyno, n_ranges, keys, key_parts, bufsz, flags, cost); } -int ha_maria::multi_range_read_explain_info(uint mrr_mode, char *str, +int ha_maria::multi_range_read_explain_info(uint mrr_mode, char *str, size_t size) { return ds_mrr.dsmrr_explain_info(mrr_mode, str, size); @@ -4028,6 +4050,7 @@ Item *ha_maria::idx_cond_push(uint keyno_arg, Item* idx_cond_arg) int ha_maria::find_unique_row(uchar *record, uint constrain_no) { int rc; + register_handler(file); if (file->s->state.header.uniques) { DBUG_ASSERT(file->s->state.header.uniques > constrain_no); diff --git a/storage/maria/ha_maria.h b/storage/maria/ha_maria.h index e67907039a1..e7acdac92f4 100644 --- a/storage/maria/ha_maria.h +++ b/storage/maria/ha_maria.h @@ -48,7 +48,7 @@ class ha_maria :public handler bool can_enable_indexes; /** If a transactional table is doing bulk insert with a single - UNDO_BULK_INSERT with/without repair. + UNDO_BULK_INSERT with/without repair. */ uint8 bulk_insert_single_undo; int repair(THD * thd, HA_CHECK *param, bool optimize); @@ -180,22 +180,28 @@ public: uint n_ranges, uint mode, HANDLER_BUFFER *buf); int multi_range_read_next(range_id_t *range_info); ha_rows multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq, - void *seq_init_param, + void *seq_init_param, uint n_ranges, uint *bufsz, uint *flags, Cost_estimate *cost); ha_rows multi_range_read_info(uint keyno, uint n_ranges, uint keys, - uint key_parts, uint *bufsz, + uint key_parts, uint *bufsz, uint *flags, Cost_estimate *cost); int multi_range_read_explain_info(uint mrr_mode, char *str, size_t size); - + /* Index condition pushdown implementation */ Item *idx_cond_push(uint keyno, Item* idx_cond); int find_unique_row(uchar *record, uint unique_idx); + + /* Following functions are needed by the S3 handler */ + virtual S3_INFO *s3_open_args() { return 0; } + virtual void register_handler(MARIA_HA *file) {} + private: DsMrr_impl ds_mrr; friend ICP_RESULT index_cond_func_maria(void *arg); friend void reset_thd_trn(THD *thd); + friend class ha_s3; }; #endif /* HA_MARIA_INCLUDED */ diff --git a/storage/maria/ha_s3.cc b/storage/maria/ha_s3.cc new file mode 100644 index 00000000000..0fd2c40dc05 --- /dev/null +++ b/storage/maria/ha_s3.cc @@ -0,0 +1,729 @@ +/* Copyright (C) 2019 MariaDB Corppration AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the + Free Software Foundation, Inc. + 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA +*/ + +/* + Implementation of S3 storage engine. + + Storage format: + + The S3 engine is read only storage engine. The data is stored in + same format as a non transactional Aria table in BLOCK_RECORD format. + This makes it easy to cache both index and rows in the page cache. + Data and index file are split into blocks of 's3_block_size', default + 4M. 
+ + The table and its associated files are stored in S3 in the following + locations: + + frm file (for discovery): + aws_bucket/database/table/frm + + First index block (contains the description of the Aria file): + aws_bucket/database/table/aria + + Rest of the index file: + aws_bucket/database/table/index/block_number + + Data file: + aws_bucket/database/table/data/block_number + + block_number is a 6 digit decimal number, prefixed with 0 + (it can be longer than 6 digits; the prefix is just for nice output) + + frm and base blocks are small (just the needed data). + index and data blocks are of size 's3_block_size' + + If compression is used, then the original block size is s3_block_size + but the stored block will be the size of the compressed block. + + Implementation: + The s3 engine inherits from the ha_maria handler + + s3 will use its own page cache to not interfere with normal Aria + usage, but also to ensure that the S3 page cache is large enough + (with a 4M s3_block_size the engine will need a large cache to work, + at least s3_block_size * 32). The default cache is 512M. +*/ + +#include "maria_def.h" +#include "sql_class.h" +#include <mysys_err.h> +#include <libmarias3/marias3.h> +#include <discover.h> +#include "ha_s3.h" +#include "s3_func.h" +#include "aria_backup.h" + +static PAGECACHE s3_pagecache; +static ulong s3_block_size; +static ulong s3_pagecache_division_limit, s3_pagecache_age_threshold; +static ulong s3_pagecache_file_hash_size; +static ulonglong s3_pagecache_buffer_size; +static char *s3_bucket, *s3_access_key=0, *s3_secret_key=0, *s3_region; +static char *s3_tmp_access_key=0, *s3_tmp_secret_key=0; +handlerton *s3_hton= 0; + + +/* Don't show access or secret keys to users if they exist */ + +static void update_access_key(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save) +{ + my_free(s3_access_key); + s3_access_key= 0; + /* Don't show real key to user in SHOW VARIABLES */ + if (s3_tmp_access_key[0]) + { + s3_access_key= s3_tmp_access_key; + s3_tmp_access_key= my_strdup("*****", MYF(MY_WME)); + } +} + +static void update_secret_key(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save) +{ + my_free(s3_secret_key); + s3_secret_key= 0; + /* Don't show real key to user in SHOW VARIABLES */ + if (s3_tmp_secret_key[0]) + { + s3_secret_key= s3_tmp_secret_key; + s3_tmp_secret_key= my_strdup("*****", MYF(MY_WME)); + } +} + +/* Define system variables for S3 */ + +static MYSQL_SYSVAR_ULONG(block_size, s3_block_size, + PLUGIN_VAR_RQCMDARG, + "Block size for S3", 0, 0, + 4*1024*1024, 65536, 16*1024*1024, 8192); + +static MYSQL_SYSVAR_ULONG(pagecache_age_threshold, + s3_pagecache_age_threshold, PLUGIN_VAR_RQCMDARG, + "This characterizes the number of hits a hot block has to be untouched " + "until it is considered aged enough to be downgraded to a warm block. " + "This specifies the percentage ratio of that number of hits to the " + "total number of blocks in the page cache.", 0, 0, + 300, 100, ~ (ulong) 0L, 100); + +static MYSQL_SYSVAR_ULONGLONG(pagecache_buffer_size, s3_pagecache_buffer_size, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, + "The size of the buffer used for index blocks for S3 tables.
" + "Increase this to get better index handling (for all reads and " + "multiple writes) to as much as you can afford.", 0, 0, + 128*1024*1024, 1024*1024*32, ~(ulonglong) 0, 8192); + +static MYSQL_SYSVAR_ULONG(pagecache_division_limit, + s3_pagecache_division_limit, + PLUGIN_VAR_RQCMDARG, + "The minimum percentage of warm blocks in key cache", 0, 0, + 100, 1, 100, 1); + +static MYSQL_SYSVAR_ULONG(pagecache_file_hash_size, + s3_pagecache_file_hash_size, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, + "Number of hash buckets for open files. If you have a lot " + "of S3 files open you should increase this for faster flush of " + "changes. A good value is probably 1/10 of number of possible open " + "S3 files.", 0,0, 512, 32, 16384, 1); + +static MYSQL_SYSVAR_STR(bucket, s3_bucket, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, + "AWS bucket", + 0, 0, "MariaDB"); +static MYSQL_SYSVAR_STR(access_key, s3_tmp_access_key, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC, + "AWS access key", + 0, update_access_key, ""); +static MYSQL_SYSVAR_STR(secret_key, s3_tmp_secret_key, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC, + "AWS secret key", + 0, update_secret_key, ""); +static MYSQL_SYSVAR_STR(region, s3_region, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, + "AWS region", + 0, 0, ""); + + +ha_create_table_option s3_table_option_list[]= +{ + /* + one numeric option, with the default of UINT_MAX32, valid + range of values 0..UINT_MAX32, and a "block size" of 10 + (any value must be divisible by 10). + */ + HA_TOPTION_SYSVAR("s3_block_size", s3_block_size, block_size), + HA_TOPTION_ENUM("compression_algorithm", compression_algorithm, "none,zlib", + 0), + HA_TOPTION_END +}; + + +/***************************************************************************** + S3 handler code +******************************************************************************/ + +/** + Create S3 handler +*/ + + +ha_s3::ha_s3(handlerton *hton, TABLE_SHARE *table_arg) + :ha_maria(hton, table_arg), in_alter_table(0) +{ + /* Remove things that S3 doesn't support */ + int_table_flags&= ~(HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE | + HA_CAN_EXPORT); + can_enable_indexes= 0; +} + + +/** + Remember the handler to use for s3_block_read() + + @note + In the future the ms3_st objects could be stored in + a list in share. In this case we would however need a mutex + to access the next free onw. By using st_my_thread_var we + can avoid the mutex with the small cost of having to call + register handler in all handler functions that will access + the page cache +*/ + +void ha_s3::register_handler(MARIA_HA *file) +{ + struct st_my_thread_var *thread= my_thread_var; + thread->keycache_file= (void*) file; +} + + +/** + Write a row + + When generating the table as part of ALTER TABLE, writes are allowed. + When table is moved to S3, writes are not allowed. 
+*/ + +int ha_s3::write_row(uchar *buf) +{ + if (in_alter_table) + return ha_maria::write_row(buf); + return HA_ERR_WRONG_COMMAND; +} + +/* Return true if S3 can be used */ + +static my_bool s3_usable() +{ + return (s3_access_key != 0 && s3_secret_key != 0 && s3_region != 0 && + s3_bucket != 0); +} + + +static my_bool s3_info_init(S3_INFO *info) +{ + if (!s3_usable()) + return 1; + lex_string_set(&info->access_key, s3_access_key); + lex_string_set(&info->secret_key, s3_secret_key); + lex_string_set(&info->region, s3_region); + lex_string_set(&info->bucket, s3_bucket); + return 0; +} + +/** + Fill information in S3_INFO including paths to table and database + + Notes: + Database and table name are set even if s3 variables are not + initialized. This is needed by s3::drop_table +*/ + +static my_bool s3_info_init(S3_INFO *s3_info, const char *path, + char *database_buff, size_t database_length) +{ + set_database_and_table_from_path(s3_info, path); + /* Fix database as it's not \0 terminated */ + strmake(database_buff, s3_info->database.str, + MY_MIN(database_length, s3_info->database.length)); + s3_info->database.str= database_buff; + return s3_info_init(s3_info); +} + + +/** + Drop S3 table +*/ + +int ha_s3::delete_table(const char *name) +{ + ms3_st *s3_client; + S3_INFO s3_info; + int error; + char database[NAME_LEN+1]; + DBUG_ENTER("ha_s3::delete_table"); + + error= s3_info_init(&s3_info, name, database, sizeof(database)-1); + + /* If internal on disk temporary table, let Aria take care of it */ + if (!strncmp(s3_info.table.str, "#sql-", 5)) + DBUG_RETURN(ha_maria::delete_table(name)); + + if (error) + DBUG_RETURN(HA_ERR_UNSUPPORTED); + + if (!(s3_client= s3_open_connection(&s3_info))) + DBUG_RETURN(HA_ERR_NO_SUCH_TABLE); + error= aria_delete_from_s3(s3_client, s3_info.bucket.str, + s3_info.database.str, + s3_info.table.str,0); + ms3_deinit(s3_client); + DBUG_RETURN(error); +} + +/** + Copy an Aria table to S3 or rename a table in S3 + + The copy happens as part of the rename in ALTER TABLE when all data + is in an Aria table and we now have to copy it to S3. + + If the table is an old table already in S3, we should just rename it. +*/ + +int ha_s3::rename_table(const char *from, const char *to) +{ + S3_INFO to_s3_info, from_s3_info; + char to_name[FN_REFLEN], from_name[FN_REFLEN], frm_name[FN_REFLEN]; + ms3_st *s3_client; + MY_STAT stat_info; + int error; + DBUG_ENTER("ha_s3::rename_table"); + + if (s3_info_init(&to_s3_info, to, to_name, NAME_LEN)) + DBUG_RETURN(HA_ERR_UNSUPPORTED); + if (!(s3_client= s3_open_connection(&to_s3_info))) + DBUG_RETURN(HA_ERR_NO_SUCH_TABLE); + + /* + Check if this is a on disk table created by ALTER TABLE that should be + copied to S3. We know this is the case if the table is a temporary table + and the .MAI file for the table is on disk + */ + fn_format(frm_name, from, "", reg_ext, MYF(0)); + if (!strncmp(from + dirname_length(from), "#sql-", 5) && + my_stat(frm_name, &stat_info, MYF(0))) + { + /* + The table is a temporary table as part of ALTER TABLE. + Copy the on disk temporary Aria table to S3. 
+ */ + error= aria_copy_to_s3(s3_client, to_s3_info.bucket.str, from, + to_s3_info.database.str, + to_s3_info.table.str, + 0, 0, 0, 0); + if (!error) + { + /* Remove original files table files, keep .frm */ + fn_format(from_name, from, "", MARIA_NAME_DEXT, + MY_APPEND_EXT|MY_UNPACK_FILENAME); + my_delete(from_name, MYF(MY_WME | ME_WARNING)); + fn_format(from_name, from, "", MARIA_NAME_IEXT, + MY_APPEND_EXT|MY_UNPACK_FILENAME); + my_delete(from_name, MYF(MY_WME | ME_WARNING)); + } + } + else + { + /* The table is an internal S3 table. Do the renames */ + s3_info_init(&from_s3_info, from, from_name, NAME_LEN); + + error= aria_rename_s3(s3_client, to_s3_info.bucket.str, + from_s3_info.database.str, + from_s3_info.table.str, + to_s3_info.database.str, + to_s3_info.table.str); + } + ms3_deinit(s3_client); + DBUG_RETURN(error); +} + + +/** + Create a s3 table. + + @notes + One can only create an s3 table as part of ALTER TABLE + The table is created as a non transactional Aria table with + BLOCK_RECORD format +*/ + +int ha_s3::create(const char *name, TABLE *table_arg, + HA_CREATE_INFO *ha_create_info) +{ + uchar *frm_ptr; + size_t frm_len; + int error; + DBUG_ENTER("ha_s3::create"); + + if (!(ha_create_info->options & HA_CREATE_TMP_ALTER)) + DBUG_RETURN(HA_ERR_WRONG_COMMAND); + + if (!s3_usable()) + DBUG_RETURN(HA_ERR_UNSUPPORTED); + + /* Force the table to a format suitable for S3 */ + ha_create_info->row_type= ROW_TYPE_PAGE; + ha_create_info->transactional= HA_CHOICE_NO; + error= ha_maria::create(name, table_arg, ha_create_info); + if (error) + DBUG_RETURN(error); + + /* Create the .frm file. Needed for ha_s3::rename_table() later */ + if (!table_arg->s->read_frm_image((const uchar**) &frm_ptr, &frm_len)) + { + table_arg->s->write_frm_image(frm_ptr, frm_len); + table_arg->s->free_frm_image(frm_ptr); + } + + DBUG_RETURN(0); +} + +/** + Open table + + + @notes + Table is read only, except if opened by ALTER as in this case we + are creating the S3 table. 
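The read-only contract described above comes down to one gate at the start of ha_s3::open() below. A minimal restatement of that check, using the same names as the patch:

  /* Illustrative restatement of the gate in ha_s3::open(): anything but a
     read-only open is refused unless the table is being built by ALTER
     TABLE (HA_OPEN_FOR_CREATE). */
  static my_bool s3_open_is_allowed(int mode, uint open_flags)
  {
    return mode == O_RDONLY || (open_flags & HA_OPEN_FOR_CREATE) != 0;
  }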
+*/ + +int ha_s3::open(const char *name, int mode, uint open_flags) +{ + int res; + S3_INFO s3_info; + DBUG_ENTER("ha_s3:open"); + + if (!s3_usable()) + DBUG_RETURN(HA_ERR_UNSUPPORTED); + + if (mode != O_RDONLY && !(open_flags & HA_OPEN_FOR_CREATE)) + DBUG_RETURN(EACCES); + + open_args= 0; + if (!(open_flags & HA_OPEN_FOR_CREATE)) + { + (void) s3_info_init(&s3_info); + s3_info.tabledef_version= table->s->tabledef_version; + + /* Pass the above arguments to maria_open() */ + open_args= &s3_info; + } + + if (!(res= ha_maria::open(name, mode, open_flags))) + { + if ((open_flags & HA_OPEN_FOR_CREATE)) + in_alter_table= 1; + else + { + /* + We have to modify the pagecache callbacks for the data file, + index file and for bitmap handling + */ + file->s->pagecache= &s3_pagecache; + file->dfile.big_block_size= file->s->kfile.big_block_size= + file->s->bitmap.file.big_block_size= file->s->base.s3_block_size; + file->s->kfile.head_blocks= file->s->base.keystart / file->s->block_size; + } + } + open_args= 0; + DBUG_RETURN(res); +} + + +/****************************************************************************** + Storage engine handler definitions +******************************************************************************/ + +/** + Free all resources for s3 +*/ + +static handler *s3_create_handler(handlerton *hton, + TABLE_SHARE * table, + MEM_ROOT *mem_root) +{ + return new (mem_root) ha_s3(hton, table); +} + + +static int s3_hton_panic(handlerton *hton, ha_panic_function flag) +{ + if (flag == HA_PANIC_CLOSE && s3_hton) + { + end_pagecache(&s3_pagecache, TRUE); + s3_deinit_library(); + my_free(s3_access_key); + my_free(s3_secret_key); + s3_access_key= s3_secret_key= 0; + s3_hton= 0; + } + return 0; +} + + +/** + Check if a table is in S3 as part of discovery +*/ + +static int s3_discover_table(handlerton *hton, THD* thd, TABLE_SHARE *share) +{ + S3_INFO s3_info; + S3_BLOCK block; + ms3_st *s3_client; + int error; + DBUG_ENTER("s3_discover_table"); + + if (s3_info_init(&s3_info)) + DBUG_RETURN(HA_ERR_NO_SUCH_TABLE); + if (!(s3_client= s3_open_connection(&s3_info))) + DBUG_RETURN(HA_ERR_NO_SUCH_TABLE); + + s3_info.database= share->db; + s3_info.table= share->table_name; + + if (s3_get_frm(s3_client, &s3_info, &block)) + { + s3_free(&block); + ms3_deinit(s3_client); + DBUG_RETURN(HA_ERR_NO_SUCH_TABLE); + } + error= share->init_from_binary_frm_image(thd, 1, + block.str, block.length); + s3_free(&block); + ms3_deinit(s3_client); + DBUG_RETURN((my_errno= error)); +} + + +/** + Check if a table exists + + @return 0 frm doesn't exists + @return 1 frm exists +*/ + +static int s3_discover_table_existance(handlerton *hton, const char *db, + const char *table_name) +{ + S3_INFO s3_info; + ms3_st *s3_client; + int res; + DBUG_ENTER("s3_discover_table_existance"); + + if (s3_info_init(&s3_info)) + DBUG_RETURN(0); + if (!(s3_client= s3_open_connection(&s3_info))) + DBUG_RETURN(0); + + s3_info.database.str= db; + s3_info.database.length= strlen(db); + s3_info.table.str= table_name; + s3_info.table.length= strlen(table_name); + + res= s3_frm_exists(s3_client, &s3_info); + ms3_deinit(s3_client); + DBUG_RETURN(res == 0); // Return 1 if exists +} + + +/** + Return a list of all S3 tables in a database +*/ + +static int s3_discover_table_names(handlerton *hton __attribute__((unused)), + LEX_CSTRING *db, + MY_DIR *dir __attribute__((unused)), + handlerton::discovered_list *result) +{ + char aws_path[AWS_PATH_LENGTH]; + S3_INFO s3_info; + ms3_st *s3_client; + ms3_list_st *list, *org_list= 0; + int error; + 
DBUG_ENTER("s3_discover_table_names"); + + if (s3_info_init(&s3_info)) + DBUG_RETURN(0); + if (!(s3_client= s3_open_connection(&s3_info))) + DBUG_RETURN(0); + + strxnmov(aws_path, sizeof(aws_path)-1, db->str, "/", NullS); + + if ((error= ms3_list_dir(s3_client, s3_info.bucket.str, aws_path, &org_list))) + goto end; + + for (list= org_list ; list ; list= list->next) + { + const char *name= list->key + db->length + 1; // Skip database and / + size_t name_length= strlen(name)-1; // Remove end / + result->add_table(name, name_length); + } + if (org_list) + ms3_list_free(org_list); +end: + ms3_deinit(s3_client); + DBUG_RETURN(0); +} + +/** + Update the .frm file in S3 +*/ + +static int s3_notify_tabledef_changed(handlerton *hton __attribute__((unused)), + LEX_CSTRING *db, LEX_CSTRING *table, + LEX_CUSTRING *frm, + LEX_CUSTRING *org_tabledef_version) +{ + char aws_path[AWS_PATH_LENGTH]; + S3_INFO s3_info; + ms3_st *s3_client; + int error= 0; + DBUG_ENTER("s3_notify_tabledef_changed"); + + if (s3_info_init(&s3_info)) + DBUG_RETURN(0); + if (!(s3_client= s3_open_connection(&s3_info))) + DBUG_RETURN(0); + + s3_info.database= *db; + s3_info.table= *table; + s3_info.tabledef_version= *org_tabledef_version; + if (s3_check_frm_version(s3_client, &s3_info)) + { + error= 1; + goto err; + } + + strxnmov(aws_path, sizeof(aws_path)-1, db->str, "/", table->str, "/frm", + NullS); + + if (s3_put_object(s3_client, s3_info.bucket.str, aws_path, (uchar*) frm->str, + frm->length, 0)) + error= 2; + +err: + ms3_deinit(s3_client); + DBUG_RETURN(error); +} + + +static int ha_s3_init(void *p) +{ + bool res; + static const char *no_exts[]= { 0 }; + DBUG_ASSERT(maria_hton); + + s3_hton= (handlerton *)p; + + /* Use Aria engine as a base */ + memcpy(s3_hton, maria_hton, sizeof(*s3_hton)); + s3_hton->db_type= DB_TYPE_S3; + s3_hton->create= s3_create_handler; + s3_hton->panic= s3_hton_panic; + s3_hton->table_options= s3_table_option_list; + s3_hton->discover_table= s3_discover_table; + s3_hton->discover_table_names= s3_discover_table_names; + s3_hton->discover_table_existence= s3_discover_table_existance; + s3_hton->notify_tabledef_changed= s3_notify_tabledef_changed; + s3_hton->tablefile_extensions= no_exts; + s3_hton->commit= 0; + s3_hton->rollback= 0; + s3_hton->checkpoint_state= 0; + s3_hton->flush_logs= 0; + s3_hton->show_status= 0; + s3_hton->prepare_for_backup= 0; + s3_hton->end_backup= 0; + s3_hton->flags= 0; + /* Copy global arguments to s3_access_key and s3_secret_key */ + update_access_key(0,0,0,0); + update_secret_key(0,0,0,0); + + if ((res= !init_pagecache(&s3_pagecache, + (size_t) s3_pagecache_buffer_size, + s3_pagecache_division_limit, + s3_pagecache_age_threshold, maria_block_size, + s3_pagecache_file_hash_size, 0))) + s3_hton= 0; + s3_pagecache.big_block_read= s3_block_read; + s3_pagecache.big_block_free= s3_free; + s3_init_library(); + return res ? 
HA_ERR_INITIALIZATION : 0; +} + +static SHOW_VAR status_variables[]= { + {"pagecache_blocks_not_flushed", + (char*) &s3_pagecache.global_blocks_changed, SHOW_LONG}, + {"pagecache_blocks_unused", + (char*) &s3_pagecache.blocks_unused, SHOW_LONG}, + {"pagecache_blocks_used", + (char*) &s3_pagecache.blocks_used, SHOW_LONG}, + {"pagecache_read_requests", + (char*) &s3_pagecache.global_cache_r_requests, SHOW_LONGLONG}, + {"pagecache_reads", + (char*) &s3_pagecache.global_cache_read, SHOW_LONGLONG}, + {NullS, NullS, SHOW_LONG} +}; + + +static struct st_mysql_sys_var* system_variables[]= { + MYSQL_SYSVAR(block_size), + MYSQL_SYSVAR(pagecache_age_threshold), + MYSQL_SYSVAR(pagecache_buffer_size), + MYSQL_SYSVAR(pagecache_division_limit), + MYSQL_SYSVAR(pagecache_file_hash_size), + MYSQL_SYSVAR(bucket), + MYSQL_SYSVAR(access_key), + MYSQL_SYSVAR(secret_key), + MYSQL_SYSVAR(region), + + NULL +}; + +struct st_mysql_storage_engine s3_storage_engine= +{ MYSQL_HANDLERTON_INTERFACE_VERSION }; + +maria_declare_plugin(s3) +{ + MYSQL_STORAGE_ENGINE_PLUGIN, + &s3_storage_engine, + "S3", + "MariaDB Corporation Ab", + "Read only table stored in S3. Created by running " + "ALTER TABLE table_name ENGINE=s3", + PLUGIN_LICENSE_GPL, + ha_s3_init, /* Plugin Init */ + NULL, /* Plugin Deinit */ + 0x0100, /* 1.0 */ + status_variables, /* status variables */ + system_variables, /* system variables */ + "1.0", /* string version */ + MariaDB_PLUGIN_MATURITY_ALPHA /* maturity */ +} +maria_declare_plugin_end; diff --git a/storage/maria/ha_s3.h b/storage/maria/ha_s3.h new file mode 100644 index 00000000000..701a6ea3458 --- /dev/null +++ b/storage/maria/ha_s3.h @@ -0,0 +1,70 @@ +#ifndef HA_S3_INCLUDED +#define HA_S3_INCLUDED +/* Copyright (C) 2019 MariaDB Corppration AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the + Free Software Foundation, Inc. 
+ 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA +*/ + +#include "ha_maria.h" + +class ha_s3 :public ha_maria +{ + bool in_alter_table; + S3_INFO *open_args; + +public: + ha_s3(handlerton *hton, TABLE_SHARE * table_arg); + ~ha_s3() {} + + int create(const char *name, TABLE *table_arg, HA_CREATE_INFO *ha_create_info); + int open(const char *name, int mode, uint open_flags); + int write_row(uchar *buf); + int update_row(const uchar * old_data, const uchar * new_data) + { + return HA_ERR_WRONG_COMMAND; + } + int delete_row(const uchar * buf) + { + return HA_ERR_WRONG_COMMAND; + } + int check(THD * thd, HA_CHECK_OPT * check_opt) + { + return HA_ERR_WRONG_COMMAND; + } + int analyze(THD * thd, HA_CHECK_OPT * check_opt) + { + return HA_ERR_WRONG_COMMAND; + } + int repair(THD * thd, HA_CHECK_OPT * check_opt) + { + return HA_ERR_WRONG_COMMAND; + } + int preload_keys(THD * thd, HA_CHECK_OPT * check_opt) + { + return HA_ERR_WRONG_COMMAND; + } + /* + drop_table() is only used for internal temporary tables, + not applicable for s3 + */ + void drop_table(const char *name) + { + } + int delete_table(const char *name); + int rename_table(const char *from, const char *to); + S3_INFO *s3_open_args() { return open_args; } + void register_handler(MARIA_HA *file); +}; +#endif /* HA_S3_INCLUDED */ diff --git a/storage/maria/libmarias3 b/storage/maria/libmarias3 new file mode 160000 +Subproject f102fdfa26888c3859901f75f6c17178453d2eb diff --git a/storage/maria/ma_backup.c b/storage/maria/ma_backup.c index 8f20209c48a..ca9cbdc95ba 100644 --- a/storage/maria/ma_backup.c +++ b/storage/maria/ma_backup.c @@ -77,6 +77,9 @@ int aria_get_capabilities(File kfile, ARIA_TABLE_CAPABILITIES *cap) 0) + KEYPAGE_KEYID_SIZE + KEYPAGE_FLAG_SIZE + KEYPAGE_USED_SIZE); cap->block_size= share.base.block_size; + cap->data_file_type= share.state.header.data_file_type; + cap->s3_block_size= share.base.s3_block_size; + cap->compression= share.base.compression_algorithm; if (share.state.header.data_file_type == BLOCK_RECORD) { @@ -110,7 +113,6 @@ err: because maria_backup uses maria_get_capabilities() */ - static uchar *_ma_base_info_read(uchar *ptr, MARIA_BASE_INFO *base) { bmove(base->uuid, ptr, MY_UUID_SIZE); ptr+= MY_UUID_SIZE; @@ -142,14 +144,15 @@ static uchar *_ma_base_info_read(uchar *ptr, MARIA_BASE_INFO *base) base->keys= *ptr++; base->auto_key= *ptr++; base->born_transactional= *ptr++; - ptr++; + base->compression_algorithm= *ptr++; base->pack_bytes= mi_uint2korr(ptr); ptr+= 2; base->blobs= mi_uint2korr(ptr); ptr+= 2; base->max_key_block_length= mi_uint2korr(ptr); ptr+= 2; base->max_key_length= mi_uint2korr(ptr); ptr+= 2; base->extra_alloc_bytes= mi_uint2korr(ptr); ptr+= 2; base->extra_alloc_procent= *ptr++; - ptr+= 16; + base->s3_block_size= mi_uint3korr(ptr); ptr+= 3; + ptr+= 13; return ptr; } diff --git a/storage/maria/ma_blockrec.c b/storage/maria/ma_blockrec.c index e148e33b9f6..ae3c56db1f9 100644 --- a/storage/maria/ma_blockrec.c +++ b/storage/maria/ma_blockrec.c @@ -455,11 +455,14 @@ my_bool _ma_once_end_block_record(MARIA_SHARE *share) File must be synced as it is going out of the maria_open_list and so becoming unknown to Checkpoint. 
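Returning to the aria_backup capability fields added further up (data_file_type, s3_block_size, compression): a hedged usage sketch for a backup tool, assuming the usual 0-on-success convention of aria_get_capabilities():

  /* Illustrative only: report whether an Aria index file carries the S3
     block layout and which compression it was created with. */
  static void print_s3_capabilities(File kfile)
  {
    ARIA_TABLE_CAPABILITIES cap;
    if (!aria_get_capabilities(kfile, &cap))
      printf("block_size=%u s3_block_size=%lu compression=%u\n",
             cap.block_size, cap.s3_block_size, (uint) cap.compression);
  }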
*/ - if (share->now_transactional && - mysql_file_sync(share->bitmap.file.file, MYF(MY_WME))) - res= 1; - if (mysql_file_close(share->bitmap.file.file, MYF(MY_WME))) - res= 1; + if (!share->s3_path) + { + if (share->now_transactional && + mysql_file_sync(share->bitmap.file.file, MYF(MY_WME))) + res= 1; + if (mysql_file_close(share->bitmap.file.file, MYF(MY_WME))) + res= 1; + } /* Trivial assignment to guard against multiple invocations (May happen if file are closed but we want to keep the maria object diff --git a/storage/maria/ma_check.c b/storage/maria/ma_check.c index 998bb984452..6569d8e182e 100644 --- a/storage/maria/ma_check.c +++ b/storage/maria/ma_check.c @@ -6151,7 +6151,7 @@ int maria_recreate_table(HA_CHECK *param, MARIA_HA **org_info, char *filename) HA_OPEN_WAIT_IF_LOCKED : (param->testflag & T_DESCRIPT) ? HA_OPEN_IGNORE_IF_LOCKED : - HA_OPEN_ABORT_IF_LOCKED))); + HA_OPEN_ABORT_IF_LOCKED)), 0); if (!*org_info) { _ma_check_print_error(param, @@ -6532,7 +6532,7 @@ static my_bool create_new_data_handle(MARIA_SORT_PARAM *param, File new_file) if (!(sort_info->new_info= maria_open(info->s->open_file_name.str, O_RDWR, HA_OPEN_COPY | HA_OPEN_FOR_REPAIR | - HA_OPEN_INTERNAL_TABLE))) + HA_OPEN_INTERNAL_TABLE, 0))) DBUG_RETURN(1); new_info= sort_info->new_info; diff --git a/storage/maria/ma_close.c b/storage/maria/ma_close.c index 08bb7cee138..593c60382fa 100644 --- a/storage/maria/ma_close.c +++ b/storage/maria/ma_close.c @@ -22,6 +22,9 @@ #include "maria_def.h" #include "ma_crypt.h" +#ifdef WITH_S3_STORAGE_ENGINE +#include "s3_func.h" +#endif /* WITH_S3_STORAGE_ENGINE */ int maria_close(register MARIA_HA *info) { @@ -154,9 +157,10 @@ int maria_close(register MARIA_HA *info) File must be synced as it is going out of the maria_open_list and so becoming unknown to future Checkpoints. */ - if (share->now_transactional && mysql_file_sync(share->kfile.file, MYF(MY_WME))) + if (share->now_transactional && + mysql_file_sync(share->kfile.file, MYF(MY_WME))) error= my_errno; - if (mysql_file_close(share->kfile.file, MYF(0))) + if (!share->s3_path && mysql_file_close(share->kfile.file, MYF(0))) error= my_errno; } thr_lock_delete(&share->lock); @@ -233,6 +237,7 @@ int maria_close(register MARIA_HA *info) if (share_can_be_freed) { ma_crypt_free(share); + my_free(share->s3_path); (void) mysql_mutex_destroy(&share->intern_lock); (void) mysql_mutex_destroy(&share->close_lock); (void) mysql_cond_destroy(&share->key_del_cond); @@ -244,7 +249,7 @@ int maria_close(register MARIA_HA *info) */ } my_free(info->ftparser_param); - if (info->dfile.file >= 0) + if (info->dfile.file >= 0 && ! 
info->s3) { /* This is outside of mutex so would confuse a concurrent @@ -255,6 +260,10 @@ int maria_close(register MARIA_HA *info) } delete_dynamic(&info->pinned_pages); +#ifdef WITH_S3_STORAGE_ENGINE + if (info->s3) + ms3_deinit(info->s3); +#endif /* WITH_S3_STORAGE_ENGINE */ my_free(info); if (error) diff --git a/storage/maria/ma_create.c b/storage/maria/ma_create.c index 24aa892d212..8d374d4f89b 100644 --- a/storage/maria/ma_create.c +++ b/storage/maria/ma_create.c @@ -328,6 +328,8 @@ int maria_create(const char *name, enum data_file_type datafile_type, share.base.born_transactional= ci->transactional; share.base.max_field_lengths= max_field_lengths; share.base.field_offsets= 0; /* for future */ + share.base.compression_algorithm= ci->compression_algorithm; + share.base.s3_block_size= ci->s3_block_size; if (flags & HA_CREATE_CHECKSUM || (options & HA_OPTION_CHECKSUM)) { diff --git a/storage/maria/ma_delete_table.c b/storage/maria/ma_delete_table.c index 067ab280fdc..0c06091281e 100644 --- a/storage/maria/ma_delete_table.c +++ b/storage/maria/ma_delete_table.c @@ -41,7 +41,7 @@ int maria_delete_table(const char *name) Unfortunately it is necessary to open the table just to check this. We use 'open_for_repair' to be able to open even a crashed table. */ - if (!(info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR))) + if (!(info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR, 0))) { sync_dir= 0; } diff --git a/storage/maria/ma_open.c b/storage/maria/ma_open.c index 670588e4dd2..33a96a0f77c 100644 --- a/storage/maria/ma_open.c +++ b/storage/maria/ma_open.c @@ -23,6 +23,7 @@ #include "ma_trnman.h" #include <m_ctype.h> #include "ma_crypt.h" +#include "s3_func.h" #if defined(MSDOS) || defined(__WIN__) #ifdef __WIN__ @@ -91,7 +92,8 @@ MARIA_HA *_ma_test_if_reopen(const char *filename) static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode, File data_file, - uint internal_table) + uint internal_table, + struct ms3_st *s3) { int save_errno; uint errpos; @@ -129,6 +131,7 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, goto err; errpos= 6; + info.s3= s3; memcpy(info.blobs,share->blobs,sizeof(MARIA_BLOB)*share->base.blobs); info.lastkey_buff2= info.lastkey_buff + share->base.max_key_length; info.last_key.data= info.lastkey_buff; @@ -237,6 +240,7 @@ err: case 6: (*share->end)(&info); delete_dynamic(&info.pinned_pages); + my_free(m_info->s3); my_free(m_info); /* fall through */ case 5: @@ -258,9 +262,10 @@ err: have an open count of 0. 
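The maria_open() change that follows adds an S3_INFO pointer as a fourth argument; every existing caller touched by this patch simply passes 0. A hedged sketch of the two call styles (the path and the s3_info_init() step are illustrative):

  /* Local Aria table: behaviour unchanged, pass 0 for the new argument. */
  MARIA_HA *local= maria_open("./test/t1", O_RDONLY, HA_OPEN_FOR_REPAIR, 0);

  /* S3 table: the handler fills an S3_INFO (credentials, bucket, database,
     table) and maria_open() then reads the index header from S3 instead of
     from a local file. */
  S3_INFO s3_info;                      /* filled by s3_info_init() */
  MARIA_HA *remote= maria_open("./test/t1", O_RDONLY, 0, &s3_info);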
******************************************************************************/ -MARIA_HA *maria_open(const char *name, int mode, uint open_flags) +MARIA_HA *maria_open(const char *name, int mode, uint open_flags, + S3_INFO *s3) { - int kfile,open_mode,save_errno; + int open_mode,save_errno; uint i,j,len,errpos,head_length,base_pos,keys, realpath_err, key_parts,base_key_parts,unique_key_parts,fulltext_keys,uniques; uint internal_table= MY_TEST(open_flags & HA_OPEN_INTERNAL_TABLE); @@ -276,28 +281,49 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) my_off_t key_root[HA_MAX_POSSIBLE_KEY]; ulonglong max_key_file_length, max_data_file_length; my_bool versioning= 1; - File data_file= -1; + File data_file= -1, kfile= -1; + struct ms3_st *s3_client= 0; + S3_INFO *share_s3= 0; + S3_BLOCK index_header; DBUG_ENTER("maria_open"); - kfile= -1; errpos= 0; head_length=sizeof(share_buff.state.header); bzero((uchar*) &info,sizeof(info)); + bzero((uchar*) &index_header, sizeof(index_header)); - realpath_err= my_realpath(name_buff, fn_format(org_name, name, "", - MARIA_NAME_IEXT, - MY_UNPACK_FILENAME),MYF(0)); - if (realpath_err > 0) /* File not found, no point in looking further. */ +#ifndef WITH_S3_STORAGE_ENGINE + DBUG_ASSERT(!s3); +#endif /* WITH_S3_STORAGE_ENGINE */ + + if (!s3) { - DBUG_RETURN(NULL); - } + realpath_err= my_realpath(name_buff, fn_format(org_name, name, "", + MARIA_NAME_IEXT, + MY_UNPACK_FILENAME),MYF(0)); + if (realpath_err > 0) /* File not found, no point in looking further. */ + { + DBUG_RETURN(NULL); + } - if (my_is_symlink(org_name) && - (realpath_err || mysys_test_invalid_symlink(name_buff))) + if (my_is_symlink(org_name) && + (realpath_err || mysys_test_invalid_symlink(name_buff))) + { + my_errno= HA_WRONG_CREATE_OPTION; + DBUG_RETURN(0); + } + } +#ifdef WITH_S3_STORAGE_ENGINE + else { - my_errno= HA_WRONG_CREATE_OPTION; - DBUG_RETURN(0); + strmake(name_buff, name, sizeof(name_buff)-1); /* test_if_reopen() */ + if (!(s3_client= s3_open_connection(s3))) + { + internal_table= 1; /* Avoid unlock on error */ + goto err; + } } +#endif /* WITH_S3_STORAGE_ENGINE */ old_info= 0; if (!internal_table) @@ -312,32 +338,70 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) (uint) strlen(name_buff), maria_pagecache); - DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_open", - if (strstr(name, "/t1")) - { - my_errno= HA_ERR_CRASHED; - goto err; - }); - DEBUG_SYNC_C("mi_open_kfile"); - if ((kfile=mysql_file_open(key_file_kfile, name_buff, - (open_mode=O_RDWR) | O_SHARE | O_NOFOLLOW | O_CLOEXEC, - MYF(MY_NOSYMLINKS))) < 0) + if (!s3) { - if ((errno != EROFS && errno != EACCES) || - mode != O_RDONLY || - (kfile=mysql_file_open(key_file_kfile, name_buff, - (open_mode=O_RDONLY) | O_SHARE | O_NOFOLLOW | O_CLOEXEC, + DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_open", + if (strstr(name, "/t1")) + { + my_errno= HA_ERR_CRASHED; + goto err; + }); + DEBUG_SYNC_C("mi_open_kfile"); + if ((kfile=mysql_file_open(key_file_kfile, name_buff, + (open_mode=O_RDWR) | O_SHARE | O_NOFOLLOW | O_CLOEXEC, MYF(MY_NOSYMLINKS))) < 0) - goto err; + { + if ((errno != EROFS && errno != EACCES) || + mode != O_RDONLY || + (kfile=mysql_file_open(key_file_kfile, name_buff, + (open_mode=O_RDONLY) | O_SHARE | O_NOFOLLOW | O_CLOEXEC, + MYF(MY_NOSYMLINKS))) < 0) + goto err; + } + errpos= 1; + if (mysql_file_pread(kfile,share->state.header.file_version, head_length, + 0, MYF(MY_NABP))) + { + my_errno= HA_ERR_NOT_A_TABLE; + goto err; + } } - share->mode=open_mode; - errpos= 1; - if 
(mysql_file_pread(kfile,share->state.header.file_version, head_length, - 0, MYF(MY_NABP))) +#ifdef WITH_S3_STORAGE_ENGINE + else { - my_errno= HA_ERR_NOT_A_TABLE; - goto err; + errpos= 1; + if (set_database_and_table_from_path(s3, name_buff)) + { + my_printf_error(HA_ERR_NO_SUCH_TABLE, + "Can't find database and path from %s", MYF(0), + name_buff); + my_errno= HA_ERR_NO_SUCH_TABLE; + goto err; + } + if (!(share_s3= share->s3_path= s3_info_copy(s3))) + goto err; /* EiOM */ + + /* Check if table has changed in S3 */ + if (s3_check_frm_version(s3_client, share_s3) == 1) + { + my_errno= HA_ERR_TABLE_DEF_CHANGED; + goto err; + } + + if (read_index_header(s3_client, share_s3, &index_header)) + goto err; + if (index_header.length < head_length) + { + my_errno=HA_ERR_NOT_A_TABLE; + goto err; + } + memcpy(share->state.header.file_version, index_header.str, + head_length); + kfile= s3_unique_file_number(); } +#endif /* WITH_S3_STORAGE_ENGINE */ + + share->mode=open_mode; if (memcmp(share->state.header.file_version, maria_file_magic, 4)) { DBUG_PRINT("error",("Wrong header in %s",name_buff)); @@ -366,23 +430,31 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) my_errno= HA_ERR_UNSUPPORTED; goto err; } - /* Don't call realpath() if the name can't be a link */ - if (!strcmp(name_buff, org_name) || - my_readlink(index_name, org_name, MYF(0)) == -1) - (void) strmov(index_name, org_name); - *strrchr(org_name, FN_EXTCHAR)= '\0'; - (void) fn_format(data_name,org_name,"",MARIA_NAME_DEXT, - MY_APPEND_EXT|MY_UNPACK_FILENAME); - if (my_is_symlink(data_name)) + if (!s3) { - if (my_realpath(data_name, data_name, MYF(0))) - goto err; - if (mysys_test_invalid_symlink(data_name)) + /* Don't call realpath() if the name can't be a link */ + if (!strcmp(name_buff, org_name) || + my_readlink(index_name, org_name, MYF(0)) == -1) + (void) strmov(index_name, org_name); + *strrchr(org_name, FN_EXTCHAR)= '\0'; + (void) fn_format(data_name,org_name,"",MARIA_NAME_DEXT, + MY_APPEND_EXT|MY_UNPACK_FILENAME); + if (my_is_symlink(data_name)) { - my_errno= HA_WRONG_CREATE_OPTION; - goto err; + if (my_realpath(data_name, data_name, MYF(0))) + goto err; + if (mysys_test_invalid_symlink(data_name)) + { + my_errno= HA_WRONG_CREATE_OPTION; + goto err; + } + share->mode|= O_NOFOLLOW; /* all symlinks are resolved by realpath() */ } - share->mode|= O_NOFOLLOW; /* all symlinks are resolved by realpath() */ + } + else + { + /* Don't show DIRECTORY in show create table */ + index_name[0]= data_name[0]= 0; } info_length=mi_uint2korr(share->state.header.header_length); @@ -400,11 +472,26 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) end_pos=disk_cache+info_length; errpos= 3; - if (mysql_file_pread(kfile, disk_cache, info_length, 0L, MYF(MY_NABP))) + if (!s3) { - _ma_set_fatal_error(share, HA_ERR_CRASHED); - goto err; + if (mysql_file_pread(kfile, disk_cache, info_length, 0L, MYF(MY_NABP))) + { + _ma_set_fatal_error(share, HA_ERR_CRASHED); + goto err; + } } +#ifdef WITH_S3_STORAGE_ENGINE + else + { + if (index_header.length < info_length) + { + my_errno=HA_ERR_NOT_A_TABLE; + goto err; + } + memcpy(disk_cache, index_header.str, info_length); + } +#endif /* WITH_S3_STORAGE_ENGINE */ + len=mi_uint2korr(share->state.header.state_info_length); keys= (uint) share->state.header.keys; uniques= (uint) share->state.header.uniques; @@ -870,9 +957,16 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) if ((share->data_file_type == BLOCK_RECORD || share->data_file_type == COMPRESSED_RECORD)) { - if 
(_ma_open_datafile(&info, share)) - goto err; - data_file= info.dfile.file; + if (!s3) + { + if (_ma_open_datafile(&info, share)) + goto err; + data_file= info.dfile.file; + } +#ifdef WITH_S3_STORAGE_ENGINE + else + data_file= info.dfile.file= s3_unique_file_number(); +#endif /* WITH_S3_STORAGE_ENGINE */ } errpos= 5; @@ -914,6 +1008,7 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) max_data_file_length= share->base.max_data_file_length; if ((*share->once_init)(share, info.dfile.file)) goto err; + errpos= 6; if (internal_table) set_if_smaller(share->base.max_data_file_length, max_data_file_length); @@ -1042,6 +1137,13 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) info.s= share; maria_extra(&info, HA_EXTRA_MMAP, 0); } +#ifdef WITH_S3_STORAGE_ENGINE + if (s3_client) + { + size_t block_size= share->base.s3_block_size; + ms3_set_option(s3_client, MS3_OPT_BUFFER_CHUNK_SIZE, &block_size); + } +#endif /* WITH_S3_STORAGE_ENGINE */ } else { @@ -1050,8 +1152,13 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) data_file= share->bitmap.file.file; /* Only opened once */ } +#ifdef WITH_S3_STORAGE_ENGINE + if (index_header.alloc_ptr) + s3_free(&index_header); +#endif /* WITH_S3_STORAGE_ENGINE */ + if (!(m_info= maria_clone_internal(share, mode, data_file, - internal_table))) + internal_table, s3_client))) goto err; if (maria_is_crashed(m_info)) @@ -1078,12 +1185,16 @@ err: _ma_report_error(save_errno, &tmp_name); } switch (errpos) { + case 6: + /* Avoid mutex test in _ma_bitmap_end() */ + share->internal_table= 1; + (*share->once_end)(share); + /* fall through */ case 5: - if (data_file >= 0) + if (data_file >= 0 && !s3_client) mysql_file_close(data_file, MYF(0)); if (old_info) break; /* Don't remove open table */ - (*share->once_end)(share); /* fall through */ case 4: ma_crypt_free(share); @@ -1094,12 +1205,20 @@ err: my_free(share_buff.state.rec_per_key_part); /* fall through */ case 1: - mysql_file_close(kfile,MYF(0)); + if (!s3) + mysql_file_close(kfile,MYF(0)); + my_free(share_s3); /* fall through */ case 0: default: break; } +#ifdef WITH_S3_STORAGE_ENGINE + if (s3_client) + ms3_deinit(s3_client); + if (index_header.alloc_ptr) + s3_free(&index_header); +#endif /* WITH_S3_STORAGE_ENGINE */ if (!internal_table) mysql_mutex_unlock(&THR_LOCK_maria); my_errno= save_errno; @@ -1633,14 +1752,15 @@ uint _ma_base_info_write(File file, MARIA_BASE_INFO *base) *ptr++= base->keys; *ptr++= base->auto_key; *ptr++= base->born_transactional; - *ptr++= 0; /* Reserved */ + *ptr++= base->compression_algorithm; mi_int2store(ptr,base->pack_bytes); ptr+= 2; mi_int2store(ptr,base->blobs); ptr+= 2; mi_int2store(ptr,base->max_key_block_length); ptr+= 2; mi_int2store(ptr,base->max_key_length); ptr+= 2; mi_int2store(ptr,base->extra_alloc_bytes); ptr+= 2; *ptr++= base->extra_alloc_procent; - bzero(ptr,16); ptr+= 16; /* extra */ + mi_int3store(ptr, base->s3_block_size); ptr+= 3; + bzero(ptr,13); ptr+= 13; /* extra */ DBUG_ASSERT((ptr - buff) == MARIA_BASE_INFO_SIZE); return mysql_file_write(file, buff, (size_t) (ptr-buff), MYF(MY_NABP)) != 0; } @@ -1677,14 +1797,15 @@ static uchar *_ma_base_info_read(uchar *ptr, MARIA_BASE_INFO *base) base->keys= *ptr++; base->auto_key= *ptr++; base->born_transactional= *ptr++; - ptr++; + base->compression_algorithm= *ptr++; base->pack_bytes= mi_uint2korr(ptr); ptr+= 2; base->blobs= mi_uint2korr(ptr); ptr+= 2; base->max_key_block_length= mi_uint2korr(ptr); ptr+= 2; base->max_key_length= mi_uint2korr(ptr); ptr+= 2; 
base->extra_alloc_bytes= mi_uint2korr(ptr); ptr+= 2; base->extra_alloc_procent= *ptr++; - ptr+= 16; + base->s3_block_size= mi_uint3korr(ptr); ptr+= 3; + ptr+= 13; return ptr; } diff --git a/storage/maria/ma_pagecache.c b/storage/maria/ma_pagecache.c index d10595fffd9..18497740bd0 100644 --- a/storage/maria/ma_pagecache.c +++ b/storage/maria/ma_pagecache.c @@ -85,6 +85,9 @@ #define PAGECACHE_DEBUG #define PAGECACHE_DEBUG_LOG "my_pagecache_debug.log" */ +#define PAGECACHE_DEBUG +#define PAGECACHE_DEBUG_LOG "my_pagecache_debug.log" +#define _VARARGS(X) X /* In key cache we have external raw locking here we use @@ -127,7 +130,8 @@ my_bool my_disable_flush_pagecache_blocks= 0; #define COND_FOR_REQUESTED 0 /* queue of thread waiting for read operation */ #define COND_FOR_SAVED 1 /* queue of thread waiting for flush */ #define COND_FOR_WRLOCK 2 /* queue of write lock */ -#define COND_SIZE 3 /* number of COND_* queues */ +#define COND_FOR_BIG_BLOCK 3 /* queue of waiting fo big block read */ +#define COND_SIZE 4 /* number of COND_* queues */ typedef mysql_cond_t KEYCACHE_CONDVAR; @@ -146,7 +150,7 @@ struct st_pagecache_hash_link struct st_pagecache_block_link *block; /* reference to the block for the page: */ PAGECACHE_FILE file; /* from such a file */ - pgcache_page_no_t pageno; /* this page */ + pgcache_page_no_t pageno; /* this page */ uint requests; /* number of requests for the page */ }; @@ -174,6 +178,7 @@ struct st_pagecache_hash_link #define PCBLOCK_CHANGED 32 /* block buffer contains a dirty page */ #define PCBLOCK_DIRECT_W 64 /* possible direct write to the block */ #define PCBLOCK_DEL_WRITE 128 /* should be written on delete */ +#define PCBLOCK_BIG_READ 256 /* the first block of the big read in progress */ /* page status, returned by find_block */ #define PAGE_READ 0 @@ -534,10 +539,22 @@ static void pagecache_debug_print _VARARGS((const char *fmt, ...)); #if defined(PAGECACHE_DEBUG_LOG) && defined(PAGECACHE_DEBUG) #define KEYCACHE_PRINT(l, m) KEYCACHE_DBUG_PRINT(l,m) + +#ifdef PAGECACHE_DEBUG_DLOG #define KEYCACHE_DBUG_PRINT(l, m) \ { if (pagecache_debug_log) \ + { \ fprintf(pagecache_debug_log, "%s: ", l); \ + DBUG_PRINT("PCDEBUG", ("%s: ", l)); \ + } \ pagecache_debug_print m; } +#else +#define KEYCACHE_DBUG_PRINT(l, m) \ + { if (pagecache_debug_log) \ + fprintf(pagecache_debug_log, "%s: ", l); \ + pagecache_debug_print m; } +#endif + #define KEYCACHE_DBUG_ASSERT(a) \ { if (! 
(a) && pagecache_debug_log) \ @@ -748,7 +765,8 @@ static inline uint next_power(uint value) size_t init_pagecache(PAGECACHE *pagecache, size_t use_mem, uint division_limit, uint age_threshold, - uint block_size, uint changed_blocks_hash_size, + uint block_size, + uint changed_blocks_hash_size, myf my_readwrite_flags) { size_t blocks, hash_links, length; @@ -756,6 +774,10 @@ size_t init_pagecache(PAGECACHE *pagecache, size_t use_mem, DBUG_ENTER("init_pagecache"); DBUG_ASSERT(block_size >= 512); + // By default we init usual cache (variables will be assigned to switch to s3) + pagecache->big_block_read= NULL; + pagecache->big_block_free= NULL; + PAGECACHE_DEBUG_OPEN; if (pagecache->inited && pagecache->disk_blocks > 0) { @@ -1350,6 +1372,8 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, } } while (thread != last_thread); + DBUG_PRINT("XXX", ("hash_link (link block): %p, hash_link: %p -> %p", + hash_link, hash_link->block, block)); hash_link->block= block; /* Ensure that no other thread tries to use this block */ block->status|= PCBLOCK_REASSIGNED; @@ -1646,6 +1670,9 @@ static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link) if ((*hash_link->prev= hash_link->next)) hash_link->next->prev= hash_link->prev; + + DBUG_PRINT("XXX", ("hash_link (unlink): %p, hash_link: %p -> NULL", + hash_link, hash_link->block)); hash_link->block= NULL; if (pagecache->waiting_for_hash_link.last_thread) { @@ -1893,6 +1920,7 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, my_bool wrmode, my_bool block_is_copied, my_bool reg_req, + my_bool fast, int *page_st) { PAGECACHE_HASH_LINK *hash_link; @@ -1909,6 +1937,7 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, DBUG_EXECUTE("check_pagecache", test_key_cache(pagecache, "start of find_block", 0);); #endif + DBUG_ASSERT(!fast || !wrmode); restart: /* Find the hash link for the requested page (file, pageno) */ @@ -2018,9 +2047,11 @@ restart: /* This is a request for a new page or for a page not to be removed */ if (! block) { + DBUG_PRINT("XXX", ("request for a new page")); /* No block is assigned for the page yet */ if (pagecache->blocks_unused) { + DBUG_PRINT("XXX", ("there is never used blocks")); if (pagecache->free_block_list) { /* There is a block in the free list. */ @@ -2054,7 +2085,11 @@ restart: block->last_hit_time= 0; block->rec_lsn= LSN_MAX; link_to_file_list(pagecache, block, file, 0); + DBUG_PRINT("XXX", ("block (no block assigned): %p, hash_link: %p -> %p", + block, block->hash_link, hash_link)); block->hash_link= hash_link; + DBUG_PRINT("XXX", ("hash_link (no block assignment): %p, hash_link: %p -> %p", + hash_link, hash_link->block, block)); hash_link->block= block; page_status= PAGE_TO_BE_READ; DBUG_PRINT("info", ("page to be read set for page %p (%u)", @@ -2065,6 +2100,7 @@ restart: } else { + DBUG_PRINT("XXX", ("there is NOT never used blocks")); /* There are no never used blocks, use a block from the LRU chain */ /* @@ -2076,6 +2112,8 @@ restart: if (! pagecache->used_last) { + struct st_my_thread_var *thread; + DBUG_PRINT("XXX", ("there is NOT UNUSED blocks")); /* Wait until a new block is added to the LRU chain; several threads might wait here for the same page, @@ -2084,8 +2122,18 @@ restart: The block is given to us by the next thread executing link_block(). 
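The new 'fast' argument to find_block() changes this waiting behaviour: a fast caller never blocks on the LRU queue; it gets a NULL return and gives up. A sketch of the caller-side pattern, matching how read_big_block() further down uses it while opportunistically pre-filling pages:

  /* Never wait for a free block; NULL means the cache has no easily
     reusable block right now, so stop caching extra pages. */
  bl= find_block(pagecache, &block->hash_link->file, page, 1,
                 FALSE /* wrmode */, TRUE /* block_is_copied */,
                 TRUE /* reg_req */, TRUE /* fast */, &page_st);
  if (!bl)
    break;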
*/ + if (fast) + { + DBUG_ASSERT(hash_link->requests == 0); + unlink_hash(pagecache, hash_link); + DBUG_PRINT("info", ("fast and no blocks in LRU")); - struct st_my_thread_var *thread= my_thread_var; + KEYCACHE_DBUG_PRINT("find_block", + ("fast and no blocks in LRU")); + DBUG_RETURN(0); + } + + thread= my_thread_var; thread->keycache_link= (void *) hash_link; wqueue_link_into_queue(&pagecache->waiting_for_block, thread); do @@ -2104,13 +2152,30 @@ restart: } else { + DBUG_PRINT("XXX", ("take a block from LRU")); /* Take the first block from the LRU chain unlinking it from the chain */ block= pagecache->used_last->next_used; + if (fast && + ((block->status & (PCBLOCK_IN_FLUSH | PCBLOCK_CHANGED)) || + (block->hash_link && block->hash_link != hash_link && + block->hash_link->requests))) + { + DBUG_ASSERT(hash_link->requests == 0); + unlink_hash(pagecache, hash_link); + DBUG_PRINT("info", ("fast and LRU block is in switch or has " + "readers")); + KEYCACHE_DBUG_PRINT("find_block", + ("fast and LRU block is in switch or has " + "readers")); + DBUG_RETURN (0); + } if (reg_req) reg_requests(pagecache, block, 1); + DBUG_PRINT("XXX", ("hash_link (LRU): %p, hash_link: %p -> %p", + hash_link, hash_link->block, block)); hash_link->block= block; DBUG_ASSERT(block->requests == 1); } @@ -2181,6 +2246,8 @@ restart: link_to_file_list(pagecache, block, file, (my_bool)(block->hash_link ? 1 : 0)); + DBUG_PRINT("XXX", ("block (LRU): %p, hash_link: %p -> %p", + block, block->hash_link, hash_link)); block->hash_link= hash_link; PCBLOCK_INFO(block); block->hits_left= init_hits_left; @@ -2665,8 +2732,221 @@ retry: DBUG_ASSERT(block->hash_link->requests > 0); block->hash_link->requests--; DBUG_RETURN(1); +} + + +/** + @brief Reading of a big block in the S3 storage engine. + + @param pagecache Page cache + @param block Block to read + + @note + + The page cache is segmented into logical blocks of size 'block_size'. All + read requests are for blocks of 'block_size'. + + When using a file with 'big blocks', the file is split into a + header of 'head_blocks' pages (for index information) and then blocks of + big_block_size. The last block may be smaller than big_block_size. + All 'big blocks' are a multiple of block_size. + The header is never read into the page cache. It's used to store + the table definition and status and is only read by open(). + + When wanting to read a block, we register a read request for that + block and for the first block that is part of the big block read. We + also put a special flag on the first block so that if another thread + would want to do a big block read, it will wait on a signal, and then + check if the block it requested is now in the page cache. If it's + not in the cache it will retry. + + After the big block is read, we put into the page cache all the read + blocks that were not already there. Blocks that were already in the page + cache will not be touched and will not be added first in the FIFO. + + The block for which we had a read request is added first in the FIFO and + returned.
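A worked example of the mapping described above, using illustrative numbers (8K page cache blocks, the default 4M big block, two header pages): a request for page 700 belongs to the big block whose first page is ((700 - 2) / 512) * 512 + 2 = 514, and that first page is the one read_big_block() reads and then fans out from:

  /* Illustrative arithmetic, mirroring the code below. */
  size_t pages_per_big_block= file->big_block_size / pagecache->block_size;
  pgcache_page_no_t first_page=
    ((pageno - file->head_blocks) / pages_per_big_block) *
    pages_per_big_block + file->head_blocks;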
+*/ + +#ifdef WITH_S3_STORAGE_ENGINE +static my_bool read_big_block(PAGECACHE *pagecache, + PAGECACHE_BLOCK_LINK *block) +{ + int page_st; + size_t big_block_size_in_pages; + size_t offset; + pgcache_page_no_t page, our_page; + pgcache_page_no_t page_to_read; + PAGECACHE_BLOCK_LINK *block_to_read= NULL; + PAGECACHE_IO_HOOK_ARGS args; + S3_BLOCK data; + DBUG_ENTER("read_big_block"); + DBUG_PRINT("enter", ("read BIG block: %p", block)); + bzero((void*) &data, sizeof(data)); + + DBUG_ASSERT(block->hash_link->file.big_block_size % + pagecache->block_size == 0); + big_block_size_in_pages= + block->hash_link->file.big_block_size / pagecache->block_size; + + our_page= block->hash_link->pageno; + + /* find first page of the big block (page_to_read) */ + page_to_read= ((block->hash_link->pageno - + block->hash_link->file.head_blocks) / + big_block_size_in_pages); + page_to_read= (page_to_read * big_block_size_in_pages + + block->hash_link->file.head_blocks); + if (page_to_read != our_page) + { + block_to_read= find_block(pagecache, &block->hash_link->file, + page_to_read, 1, + FALSE, TRUE /* copy under protection (?)*/, + TRUE /*register*/, FALSE, &page_st); + DBUG_ASSERT(block_to_read == block_to_read->hash_link->block); + + if (block_to_read->status & PCBLOCK_ERROR) + { + /* We get first block with an error so all operation failed */ + block->status|= PCBLOCK_ERROR; + block->error= block_to_read->error; + DBUG_RETURN(FALSE); // no retry + } + // only primary request here, PAGE_WAIT_TO_BE_READ is impossible + DBUG_ASSERT(page_st != PAGE_WAIT_TO_BE_READ); + if (block_to_read->status & PCBLOCK_BIG_READ) + { + struct st_my_thread_var *thread; + DBUG_ASSERT(page_st != PAGE_TO_BE_READ); + /* + Block read failed because somebody else is reading the first block + (and all other blocks part of this one). + Wait until block is available. + */ + unreg_request(pagecache, block, 1); + thread= my_thread_var; + /* Put the request into a queue and wait until it can be processed */ + wqueue_add_to_queue(&block->wqueue[COND_FOR_BIG_BLOCK], thread); + do + { + DBUG_PRINT("wait", + ("suspend thread %s %ld", thread->name, + (ulong) thread->id)); + pagecache_pthread_cond_wait(&thread->suspend, + &pagecache->cache_lock); + } + while (thread->next); + DBUG_RETURN(TRUE); + } + } + else + { + block_to_read= block; + page_st= PAGE_TO_BE_READ; + } + + DBUG_ASSERT(!(block_to_read->status & PCBLOCK_BIG_READ)); + // Mark the first page of a big block + block_to_read->status|= PCBLOCK_BIG_READ; + // Don't keep cache locked during the possible slow read from s3 + pagecache_pthread_mutex_unlock(&pagecache->cache_lock); + + // perform read of big block + args.page= NULL; + args.pageno= page_to_read; + args.data= block->hash_link->file.callback_data; + + if (pagecache->big_block_read(pagecache, &args, &block->hash_link->file, + &data)) + { + pagecache_pthread_mutex_lock(&pagecache->cache_lock); + block_to_read->status|= PCBLOCK_ERROR; + block->status|= PCBLOCK_ERROR; + block_to_read->error= block->error= (int16) my_errno; + pagecache->big_block_free(&data); + if (block_to_read != block) + { + remove_reader(block_to_read); + unreg_request(pagecache, block_to_read, 1); + } + DBUG_RETURN(FALSE); // no retry + } + + /* + We need to keep the mutex locked while filling pages. 
+ As there is no changed blocks to flush, this operation should + be reasonable fast + */ + pagecache_pthread_mutex_lock(&pagecache->cache_lock); + + /* Copy the first page to the cache */ + if (page_st != PAGE_READ) + { + DBUG_ASSERT(page_st != PAGE_WAIT_TO_BE_READ); + memcpy(block_to_read->buffer, data.str, pagecache->block_size); + block_to_read->status|= PCBLOCK_READ; + } + else + DBUG_ASSERT(block_to_read->status & PCBLOCK_READ); + + /* Copy the rest of the pages */ + for (offset= pagecache->block_size, page= page_to_read + 1; + offset < data.length; + offset+= pagecache->block_size, page++) + { + DBUG_ASSERT(offset + pagecache->block_size <= data.length); + if (page == our_page) + { + DBUG_ASSERT(!(block->status & PCBLOCK_READ)); + memcpy(block->buffer, data.str + offset, pagecache->block_size); + block->status|= PCBLOCK_READ; + } + else + { + PAGECACHE_BLOCK_LINK *bl; + bl= find_block(pagecache, &block->hash_link->file, page, 1, + FALSE, TRUE /* copy under protection (?)*/, + TRUE /*register*/, TRUE /*fast*/, &page_st); + if (!bl) + { + // we run out of easy avaliable pages in the cache + break; + } + DBUG_ASSERT(bl == bl->hash_link->block); + if ((bl->status & PCBLOCK_ERROR) == 0 && + page_st == PAGE_TO_BE_READ) + { + memcpy(bl->buffer, data.str + offset, pagecache->block_size); + bl->status|= PCBLOCK_READ; + } + remove_reader(bl); + unreg_request(pagecache, bl, 1); + } + } + if (page < our_page) + { + /* we break earlier, but still have to fill page what was requested */ + DBUG_ASSERT(!(block->status & PCBLOCK_READ)); + memcpy(block->buffer, + data.str + ((our_page - page_to_read) * pagecache->block_size), + pagecache->block_size); + block->status|= PCBLOCK_READ; + } + pagecache->big_block_free(&data); + + block_to_read->status&= ~PCBLOCK_BIG_READ; + if (block_to_read != block) + { + remove_reader(block_to_read); + unreg_request(pagecache, block_to_read, 1); + } + if (block->wqueue[COND_FOR_BIG_BLOCK].last_thread) + wqueue_release_queue(&block->wqueue[COND_FOR_BIG_BLOCK]); + + DBUG_RETURN(FALSE); } +#endif /* WITH_S3_STORAGE_ENGINE */ /* @@ -2861,7 +3141,7 @@ void pagecache_unlock(PAGECACHE *pagecache, inc_counter_for_resize_op(pagecache); /* See NOTE for pagecache_unlock about registering requests */ block= find_block(pagecache, file, pageno, 0, 0, 0, - pin == PAGECACHE_PIN_LEFT_UNPINNED, &page_st); + pin == PAGECACHE_PIN_LEFT_UNPINNED, FALSE, &page_st); PCBLOCK_INFO(block); DBUG_ASSERT(block != 0 && page_st == PAGE_READ); if (first_REDO_LSN_for_page) @@ -2948,7 +3228,7 @@ void pagecache_unpin(PAGECACHE *pagecache, inc_counter_for_resize_op(pagecache); /* See NOTE for pagecache_unlock about registering requests */ - block= find_block(pagecache, file, pageno, 0, 0, 0, 0, &page_st); + block= find_block(pagecache, file, pageno, 0, 0, 0, 0, FALSE, &page_st); DBUG_ASSERT(block != 0); DBUG_ASSERT(page_st == PAGE_READ); /* we can't unpin such page without unlock */ @@ -3349,7 +3629,7 @@ uchar *pagecache_read(PAGECACHE *pagecache, char llbuf[22]; DBUG_ENTER("pagecache_read"); DBUG_PRINT("enter", ("fd: %u page: %s buffer: %p level: %u " - "t:%s (%d)%s->%s %s->%s", + "t:%s (%d)%s->%s %s->%s big block: %d", (uint) file->file, ullstr(pageno, llbuf), buff, level, page_cache_page_type_str[type], @@ -3357,7 +3637,8 @@ uchar *pagecache_read(PAGECACHE *pagecache, page_cache_page_lock_str[lock_to_read[lock].new_lock], page_cache_page_lock_str[lock_to_read[lock].unlock_lock], page_cache_page_pin_str[new_pin], - page_cache_page_pin_str[unlock_pin])); + page_cache_page_pin_str[unlock_pin], + 
MY_TEST(pagecache->big_block_read))); DBUG_ASSERT(buff != 0 || (buff == 0 && (unlock_pin == PAGECACHE_PIN || unlock_pin == PAGECACHE_PIN_LEFT_PINNED))); DBUG_ASSERT(pageno < ((1ULL) << 40)); @@ -3369,6 +3650,14 @@ uchar *pagecache_read(PAGECACHE *pagecache, restart: + /* + If we use big block than the big block is multiple of blocks and we + have enouch blocks in cache + */ + DBUG_ASSERT(!pagecache->big_block_read || + (file->big_block_size != 0 && + file->big_block_size % pagecache->block_size == 0)); + if (pagecache->can_be_used) { /* Key cache is used */ @@ -3387,19 +3676,45 @@ restart: pagecache->global_cache_r_requests++; /* See NOTE for pagecache_unlock about registering requests. */ reg_request= ((new_pin == PAGECACHE_PIN_LEFT_UNPINNED) || - (new_pin == PAGECACHE_PIN)); + (new_pin == PAGECACHE_PIN) || + pagecache->big_block_read); block= find_block(pagecache, file, pageno, level, lock == PAGECACHE_LOCK_WRITE, buff != 0, - reg_request, &page_st); + reg_request, FALSE, &page_st); DBUG_PRINT("info", ("Block type: %s current type %s", page_cache_page_type_str[block->type], page_cache_page_type_str[type])); if (((block->status & PCBLOCK_ERROR) == 0) && (page_st != PAGE_READ)) { - /* The requested page is to be read into the block buffer */ - read_block(pagecache, block, - (my_bool)(page_st == PAGE_TO_BE_READ)); - DBUG_PRINT("info", ("read is done")); +#ifdef WITH_S3_STORAGE_ENGINE + if (!pagecache->big_block_read) +#endif /* WITH_S3_STORAGE_ENGINE */ + { + /* The requested page is to be read into the block buffer */ + read_block(pagecache, block, page_st == PAGE_TO_BE_READ); + DBUG_PRINT("info", ("read is done")); + } +#ifdef WITH_S3_STORAGE_ENGINE + else + { + /* It is big read and this thread should read */ + DBUG_ASSERT(page_st == PAGE_TO_BE_READ); + + if (read_big_block(pagecache, block)) + { + /* block is unregistered in read_big_block */ + pagecache_pthread_mutex_unlock(&pagecache->cache_lock); + DBUG_PRINT("restart", ("big block fail, restarting...")); + goto restart; + } + if (!((new_pin == PAGECACHE_PIN_LEFT_UNPINNED) || + (new_pin == PAGECACHE_PIN))) + { + /* we registered request only for big_block_read */ + unreg_request(pagecache, block, 1); + } + } +#endif /* WITH_S3_STORAGE_ENGINE */ } /* Assert after block is read. Imagine two concurrent SELECTs on same @@ -3990,6 +4305,7 @@ my_bool pagecache_write_part(PAGECACHE *pagecache, DBUG_ASSERT(lock != PAGECACHE_LOCK_READ_UNLOCK); DBUG_ASSERT(offset + size <= pagecache->block_size); DBUG_ASSERT(pageno < ((1ULL) << 40)); + DBUG_ASSERT(pagecache->big_block_read == 0); #endif if (!page_link) @@ -4026,7 +4342,7 @@ restart: (pin == PAGECACHE_PIN)); block= find_block(pagecache, file, pageno, level, TRUE, FALSE, - reg_request, &page_st); + reg_request, FALSE, &page_st); if (!block) { DBUG_ASSERT(write_mode != PAGECACHE_WRITE_DONE); @@ -4278,6 +4594,8 @@ static my_bool free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, block->type= PAGECACHE_EMPTY_PAGE; #endif block->rec_lsn= LSN_MAX; + DBUG_PRINT("XXX", ("block (Free): %p, hash_link: %p -> NULL", + block, block->hash_link)); block->hash_link= NULL; if (block->temperature == PCBLOCK_WARM) pagecache->warm_blocks--; @@ -5265,8 +5583,11 @@ static void pagecache_debug_print(const char * fmt, ...) 
va_start(args,fmt); if (pagecache_debug_log) { - VOID(vfprintf(pagecache_debug_log, fmt, args)); - VOID(fputc('\n',pagecache_debug_log)); + vfprintf(pagecache_debug_log, fmt, args); + fputc('\n',pagecache_debug_log); +#ifdef PAGECACHE_DEBUG_DLOG + _db_doprnt_(fmt, args); +#endif } va_end(args); } @@ -5307,8 +5628,7 @@ static void null_post_write_hook(int res __attribute__((unused)), return; } -void -pagecache_file_set_null_hooks(PAGECACHE_FILE *file) +void pagecache_file_set_null_hooks(PAGECACHE_FILE *file) { file->pre_read_hook= null_pre_hook; file->post_read_hook= null_post_read_hook; @@ -5316,4 +5636,5 @@ pagecache_file_set_null_hooks(PAGECACHE_FILE *file) file->post_write_hook= null_post_write_hook; file->flush_log_callback= null_pre_hook; file->callback_data= NULL; + file->head_blocks= file->big_block_size= 0; } diff --git a/storage/maria/ma_pagecache.h b/storage/maria/ma_pagecache.h index 1183f9d57e0..30fffbe54d3 100644 --- a/storage/maria/ma_pagecache.h +++ b/storage/maria/ma_pagecache.h @@ -86,9 +86,25 @@ typedef struct st_pagecache_io_hook_args uchar *crypt_buf; /* when using encryption */ } PAGECACHE_IO_HOOK_ARGS; +struct st_pagecache; + +/* Structure to store things from get_object */ + +typedef struct st_S3_BLOCK +{ + uchar *str, *alloc_ptr; + size_t length; +} S3_BLOCK; + + /* file descriptor for Maria */ typedef struct st_pagecache_file { + /* Number of pages in the header which are not read with big blocks */ + size_t head_blocks; + /* size of a big block for S3 or 0 */ + size_t big_block_size; + /* File number */ File file; /** Cannot be NULL */ @@ -99,9 +115,9 @@ typedef struct st_pagecache_file my_bool (*pre_write_hook)(PAGECACHE_IO_HOOK_ARGS *args); void (*post_write_hook)(int error, PAGECACHE_IO_HOOK_ARGS *args); - /** Cannot be NULL */ my_bool (*flush_log_callback)(PAGECACHE_IO_HOOK_ARGS *args); + /** Cannot be NULL */ uchar *callback_data; } PAGECACHE_FILE; @@ -164,6 +180,17 @@ typedef struct st_pagecache /* hash for other file bl.*/ PAGECACHE_BLOCK_LINK **file_blocks; + /** + Function for reading file in big hunks from S3 + Data will be filled with pointer and length to data read + start_page will be contain first page read. + */ + my_bool (*big_block_read)(struct st_pagecache *pagecache, + PAGECACHE_IO_HOOK_ARGS *args, + struct st_pagecache_file *file, S3_BLOCK *data); + void (*big_block_free)(S3_BLOCK *data); + + /* The following variables are and variables used to hold parameters for initializing the key cache. diff --git a/storage/maria/ma_recovery.c b/storage/maria/ma_recovery.c index 3ddf2d91f16..b9f164449ba 100644 --- a/storage/maria/ma_recovery.c +++ b/storage/maria/ma_recovery.c @@ -812,7 +812,7 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE) goto end; } /* we try hard to get create_rename_lsn, to avoid mistakes if possible */ - info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR); + info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR, 0); if (info) { MARIA_SHARE *share= info->s; @@ -933,7 +933,7 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE) correctly filled. So we just open the table (fortunately, an empty data file does not preclude this). */ - if (((info= maria_open(name, O_RDONLY, 0)) == NULL) || + if (((info= maria_open(name, O_RDONLY, 0, 0)) == NULL) || _ma_initialize_data_file(info->s, info->dfile.file)) { eprint(tracef, "Failed to open new table or write to data file"); @@ -1003,7 +1003,7 @@ prototype_redo_exec_hook(REDO_RENAME_TABLE) log insertions of records into the temporary table, so replaying may fail (grep for INCOMPLETE_LOG in files). 
*/ - info= maria_open(old_name, O_RDONLY, HA_OPEN_FOR_REPAIR); + info= maria_open(old_name, O_RDONLY, HA_OPEN_FOR_REPAIR, 0); if (info) { MARIA_SHARE *share= info->s; @@ -1052,7 +1052,7 @@ prototype_redo_exec_hook(REDO_RENAME_TABLE) t, renames it to u (if not testing create_rename_lsn) thus overwriting old-named v, drops u, and we are stuck, we have lost data. */ - info= maria_open(new_name, O_RDONLY, HA_OPEN_FOR_REPAIR); + info= maria_open(new_name, O_RDONLY, HA_OPEN_FOR_REPAIR, 0); if (info) { MARIA_SHARE *share= info->s; @@ -1108,7 +1108,7 @@ prototype_redo_exec_hook(REDO_RENAME_TABLE) eprint(tracef, "Failed to rename table"); goto end; } - info= maria_open(new_name, O_RDONLY, 0); + info= maria_open(new_name, O_RDONLY, 0, 0); if (info == NULL) { eprint(tracef, "Failed to open renamed table"); @@ -1227,7 +1227,7 @@ prototype_redo_exec_hook(REDO_DROP_TABLE) } name= (char *)log_record_buffer.str; tprint(tracef, "Table '%s'", name); - info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR); + info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR, 0); if (info) { MARIA_SHARE *share= info->s; @@ -1369,7 +1369,7 @@ static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id) goto end; } tprint(tracef, "Table '%s', id %u", name, sid); - info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR); + info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR, 0); if (info == NULL) { tprint(tracef, ", is absent (must have been dropped later?)" diff --git a/storage/maria/ma_rename.c b/storage/maria/ma_rename.c index 0650d9f6a56..afa60d87186 100644 --- a/storage/maria/ma_rename.c +++ b/storage/maria/ma_rename.c @@ -48,7 +48,7 @@ int maria_rename(const char *old_name, const char *new_name) _ma_check_table_is_closed(new_name,"rename new table2"); #endif /** @todo LOCK take X-lock on table */ - if (!(info= maria_open(old_name, O_RDWR, HA_OPEN_FOR_REPAIR))) + if (!(info= maria_open(old_name, O_RDWR, HA_OPEN_FOR_REPAIR, 0))) DBUG_RETURN(my_errno); share= info->s; #ifdef USE_RAID diff --git a/storage/maria/ma_rt_test.c b/storage/maria/ma_rt_test.c index 88e4d7089e0..6cdd63e64d4 100644 --- a/storage/maria/ma_rt_test.c +++ b/storage/maria/ma_rt_test.c @@ -199,7 +199,7 @@ static int run_test(const char *filename) if (!silent) printf("- Open isam-file\n"); - if (!(file=maria_open(filename,2,HA_OPEN_ABORT_IF_LOCKED))) + if (!(file=maria_open(filename,2,HA_OPEN_ABORT_IF_LOCKED,0))) goto err; maria_begin(file); if (opt_versioning) diff --git a/storage/maria/ma_sp_test.c b/storage/maria/ma_sp_test.c index d3b540e8f7c..1b69defe2e2 100644 --- a/storage/maria/ma_sp_test.c +++ b/storage/maria/ma_sp_test.c @@ -119,7 +119,7 @@ int run_test(const char *filename) if (!silent) printf("- Open isam-file\n"); - if (!(file=maria_open(filename,2,HA_OPEN_ABORT_IF_LOCKED))) + if (!(file=maria_open(filename,2,HA_OPEN_ABORT_IF_LOCKED, 0))) goto err; if (!silent) diff --git a/storage/maria/ma_test1.c b/storage/maria/ma_test1.c index d7ee7136fb2..afe3b120b41 100644 --- a/storage/maria/ma_test1.c +++ b/storage/maria/ma_test1.c @@ -209,7 +209,7 @@ static int run_test(const char *filename) uniques, &uniquedef, &create_info, create_flag)) goto err; - if (!(file=maria_open(filename,2,HA_OPEN_ABORT_IF_LOCKED))) + if (!(file=maria_open(filename,2,HA_OPEN_ABORT_IF_LOCKED, 0))) goto err; if (!silent) printf("- Writing key:s\n"); @@ -343,7 +343,7 @@ static int run_test(const char *filename) goto err; if (maria_close(file)) goto err; - if (!(file=maria_open(filename,2,HA_OPEN_ABORT_IF_LOCKED))) + if (!(file=maria_open(filename,2,HA_OPEN_ABORT_IF_LOCKED, 
0))) goto err; if (maria_begin(file)) goto err; diff --git a/storage/maria/ma_test2.c b/storage/maria/ma_test2.c index b6442c2be91..a6133ca6f28 100644 --- a/storage/maria/ma_test2.c +++ b/storage/maria/ma_test2.c @@ -235,7 +235,7 @@ int main(int argc, char *argv[]) 0,(MARIA_UNIQUEDEF*) 0, &create_info,create_flag)) goto err; - if (!(file=maria_open(filename,2,HA_OPEN_ABORT_IF_LOCKED))) + if (!(file=maria_open(filename,2,HA_OPEN_ABORT_IF_LOCKED, 0))) goto err; maria_begin(file); if (opt_versioning) diff --git a/storage/maria/ma_test3.c b/storage/maria/ma_test3.c index 604c2b676a4..b8a6c2585ac 100644 --- a/storage/maria/ma_test3.c +++ b/storage/maria/ma_test3.c @@ -171,8 +171,8 @@ void start_test(int id) MARIA_INFO isam_info; MARIA_HA *file,*file1,*file2=0,*lock; - if (!(file1=maria_open(filename,O_RDWR,HA_OPEN_WAIT_IF_LOCKED)) || - !(file2=maria_open(filename,O_RDWR,HA_OPEN_WAIT_IF_LOCKED))) + if (!(file1=maria_open(filename,O_RDWR,HA_OPEN_WAIT_IF_LOCKED,0)) || + !(file2=maria_open(filename,O_RDWR,HA_OPEN_WAIT_IF_LOCKED,0))) { fprintf(stderr,"Can't open isam-file: %s\n",filename); exit(1); diff --git a/storage/maria/maria_chk.c b/storage/maria/maria_chk.c index 43798fe8c07..67f7befc342 100644 --- a/storage/maria/maria_chk.c +++ b/storage/maria/maria_chk.c @@ -1025,7 +1025,8 @@ static int maria_chk(HA_CHECK *param, char *filename) ((param->testflag & T_WAIT_FOREVER) ? HA_OPEN_WAIT_IF_LOCKED : (param->testflag & T_DESCRIPT) ? - HA_OPEN_IGNORE_IF_LOCKED : HA_OPEN_ABORT_IF_LOCKED)))) + HA_OPEN_IGNORE_IF_LOCKED : HA_OPEN_ABORT_IF_LOCKED), + 0))) { /* Avoid twice printing of isam file name */ param->error_printed=1; @@ -2101,7 +2102,7 @@ static my_bool write_log_record(HA_CHECK *param) Now that all operations including O_NEW_DATA|INDEX are successfully done, we can write a log record. */ - MARIA_HA *info= maria_open(param->isam_file_name, O_RDWR, 0); + MARIA_HA *info= maria_open(param->isam_file_name, O_RDWR, 0, 0); if (info == NULL) _ma_check_print_error(param, default_open_errmsg, my_errno, param->isam_file_name); diff --git a/storage/maria/maria_def.h b/storage/maria/maria_def.h index 7da53f788cf..9bee940eea4 100644 --- a/storage/maria/maria_def.h +++ b/storage/maria/maria_def.h @@ -263,6 +263,7 @@ typedef struct st_ma_base_info ulong min_pack_length; ulong max_pack_length; /* Max possibly length of packed rec */ ulong min_block_length; + ulong s3_block_size; /* Block length for S3 files */ uint fields; /* fields in table */ uint fixed_not_null_fields; uint fixed_not_null_fields_length; @@ -298,6 +299,8 @@ typedef struct st_ma_base_info uint extra_options; /* default language, not really used but displayed by maria_chk */ uint language; + /* Compression library used. 
0 for no compression */ + uint compression_algorithm; /* The following are from the header */ uint key_parts, all_key_parts; @@ -362,6 +365,7 @@ typedef struct st_maria_file_bitmap #define MARIA_CHECKPOINT_SEEN_IN_LOOP 4 typedef struct st_maria_crypt_data MARIA_CRYPT_DATA; +struct ms3_st; typedef struct st_maria_share { /* Shared between opens */ @@ -456,6 +460,7 @@ typedef struct st_maria_share uint32 ftkeys; /* Number of distinct full-text keys + 1 */ PAGECACHE_FILE kfile; /* Shared keyfile */ + S3_INFO *s3_path; /* Connection and path in s3 */ File data_file; /* Shared data file */ int mode; /* mode of file on open */ uint reopen; /* How many times opened */ @@ -609,6 +614,7 @@ struct st_maria_handler MARIA_STATUS_INFO *state, state_save; MARIA_STATUS_INFO *state_start; /* State at start of transaction */ MARIA_USED_TABLES *used_tables; + struct ms3_st *s3; MARIA_ROW cur_row; /* The active row that we just read */ MARIA_ROW new_row; /* Storage for a row during update */ MARIA_KEY last_key; /* Last found key */ @@ -714,6 +720,14 @@ struct st_maria_handler void *index_cond_func_arg; /* parameter for the func */ }; +/* Table options for the Aria and S3 storage engine */ + +struct ha_table_option_struct +{ + ulonglong s3_block_size; + uint compression_algorithm; +}; + /* Some defines used by maria-functions */ #define USE_WHOLE_KEY 65535 /* Use whole key in _search() */ diff --git a/storage/maria/maria_ftdump.c b/storage/maria/maria_ftdump.c index 4a1b610ff48..56dec094e86 100644 --- a/storage/maria/maria_ftdump.c +++ b/storage/maria/maria_ftdump.c @@ -88,7 +88,7 @@ int main(int argc,char *argv[]) MARIA_KEY_BLOCK_LENGTH, 0, MY_WME); if (!(info=maria_open(argv[0], O_RDONLY, - HA_OPEN_ABORT_IF_LOCKED|HA_OPEN_FROM_SQL_LAYER))) + HA_OPEN_ABORT_IF_LOCKED|HA_OPEN_FROM_SQL_LAYER, 0))) { error=my_errno; goto err; diff --git a/storage/maria/maria_pack.c b/storage/maria/maria_pack.c index c36543eb231..c1ea6d19613 100644 --- a/storage/maria/maria_pack.c +++ b/storage/maria/maria_pack.c @@ -404,7 +404,7 @@ static MARIA_HA *open_maria_file(char *name,int mode) if (!(isam_file=maria_open(name, mode, HA_OPEN_IGNORE_MOVED_STATE | (opt_wait ? HA_OPEN_WAIT_IF_LOCKED : - HA_OPEN_ABORT_IF_LOCKED)))) + HA_OPEN_ABORT_IF_LOCKED), 0))) { fprintf(stderr, "%s gave error %d on open\n", name, my_errno); DBUG_RETURN(0); diff --git a/storage/maria/s3_func.c b/storage/maria/s3_func.c new file mode 100644 index 00000000000..a7d47411cf2 --- /dev/null +++ b/storage/maria/s3_func.c @@ -0,0 +1,1431 @@ +/* Copyright (C) 2019 MariaDB Corporation Ab + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software Foundation, + Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */ + +/* + Interface function used by S3 storage engine and aria_copy_for_s3 +*/ + +#include "maria_def.h" +#include "s3_func.h" +#include <aria_backup.h> +#include <mysqld_error.h> +#include <sql_const.h> +#include <mysys_err.h> +#include <mysql_com.h> +#include <zlib.h> + +/* number of '.' 
to print during a copy in verbose mode */ +#define DISPLAY_WITH 79 + +static void convert_index_to_s3_format(uchar *header, ulong block_size, + int compression); +static void convert_index_to_disk_format(uchar *header); +static void convert_frm_to_s3_format(uchar *header); +static void convert_frm_to_disk_format(uchar *header); +static int s3_read_frm_from_disk(const char *filename, uchar **to, + size_t *to_size); + + +/****************************************************************************** + Allocations handler for libmarias3 + To be removed when we do the init allocation in mysqld.cc +******************************************************************************/ + +static void *s3_wrap_malloc(size_t size) +{ + return my_malloc(size, MYF(MY_WME)); +} + +static void *s3_wrap_calloc(size_t nmemb, size_t size) +{ + return my_malloc(nmemb * size, MYF(MY_WME | MY_ZEROFILL)); +} + +static void *s3_wrap_realloc(void *ptr, size_t size) +{ + return my_realloc(ptr, size, MYF(MY_WME | MY_ALLOW_ZERO_PTR)); +} + +static char *s3_wrap_strdup(const char *str) +{ + return my_strdup(str, MYF(MY_WME)); +} + +static void s3_wrap_free(void *ptr) +{ + my_free(ptr); +} + +void s3_init_library() +{ + ms3_library_init_malloc(s3_wrap_malloc, s3_wrap_free, s3_wrap_realloc, + s3_wrap_strdup, s3_wrap_calloc); +} + +void s3_deinit_library() +{ + ms3_library_deinit(); +} + +/****************************************************************************** + Functions on S3_INFO and S3_BLOCK +******************************************************************************/ + +/* + Free memory allocated by s3_get_object +*/ + +void s3_free(S3_BLOCK *data) +{ + my_free(data->alloc_ptr); + data->alloc_ptr= 0; +} + + +/* + Copy a S3_INFO structure +*/ + +S3_INFO *s3_info_copy(S3_INFO *old) +{ + S3_INFO *to, tmp; + + /* Copy lengths */ + memcpy(&tmp, old, sizeof(tmp)); + /* Allocate new buffers */ + if (!my_multi_malloc(MY_WME, &to, sizeof(S3_INFO), + &tmp.access_key.str, old->access_key.length+1, + &tmp.secret_key.str, old->secret_key.length+1, + &tmp.region.str, old->region.length+1, + &tmp.bucket.str, old->bucket.length+1, + &tmp.database.str, old->database.length+1, + &tmp.table.str, old->table.length+1, + NullS)) + return 0; + /* Copy lengths and new pointers to to */ + memcpy(to, &tmp, sizeof(tmp)); + /* Copy data */ + strmov((char*) to->access_key.str, old->access_key.str); + strmov((char*) to->secret_key.str, old->secret_key.str); + strmov((char*) to->region.str, old->region.str); + strmov((char*) to->bucket.str, old->bucket.str); + /* Database may not be null terminated */ + strmake((char*) to->database.str, old->database.str, old->database.length); + strmov((char*) to->table.str, old->table.str); + return to; +} + +/** + Open a connection to s3 +*/ + +ms3_st *s3_open_connection(S3_INFO *s3) +{ + ms3_st *s3_client; + if (!(s3_client= ms3_init(s3->access_key.str, + s3->secret_key.str, + s3->region.str, + NULL))) + { + my_printf_error(HA_ERR_NO_SUCH_TABLE, + "Can't open connection to S3, error: %d %s", MYF(0), + errno, ms3_error(errno)); + my_errno= HA_ERR_NO_SUCH_TABLE; + } + return s3_client; +} + + +/****************************************************************************** + High level functions to copy tables to and from S3 +******************************************************************************/ + +/** + Create suffix for object name + @param to_end end of suffix (from previous call or 000000 at start) + + The suffix is a 6 length '0' prefixed number. 
If the number + needs more than 6 digits, it is extended to 7 or more digits. +*/ + +static void fix_suffix(char *to_end, ulong nr) +{ + char buff[11]; + uint length= (uint) (int10_to_str(nr, buff, 10) - buff); + set_if_smaller(length, 6); + strmov(to_end - length, buff); +} + +/** + Copy file to 'aws_path' in blocks of block_size + + @return 0 ok + @return 1 error. Error message is printed to stderr + + Notes: + file is always closed before return +*/ + +static my_bool copy_from_file(ms3_st *s3_client, const char *aws_bucket, + const char *aws_path, + File file, my_off_t start, my_off_t file_end, + uchar *block, size_t block_size, + my_bool compression, my_bool display) +{ + my_off_t pos; + char *path_end= strend(aws_path); + ulong bnr; + my_bool print_done= 0; + size_t length; + + for (pos= start, bnr=1 ; pos < file_end ; pos+= length, bnr++) + { + if ((length= my_pread(file, block, block_size, pos, MYF(MY_WME))) == + MY_FILE_ERROR) + goto err; + if (length == 0) + { + my_error(EE_EOFERR, MYF(0), my_filename(file), my_errno); + goto err; + } + + fix_suffix(path_end, bnr); + if (s3_put_object(s3_client, aws_bucket, aws_path, block, length, + compression)) + goto err; + + /* Write up to DISPLAY_WITH number of '.' during copy */ + if (display && + ((pos + block_size) * DISPLAY_WITH / file_end) > + (pos * DISPLAY_WITH/file_end)) + { + fputc('.', stdout); fflush(stdout); + print_done= 1; + } + } + if (print_done) + { + fputc('\n', stdout); fflush(stdout); + } + my_close(file, MYF(MY_WME)); + return 0; + +err: + my_close(file, MYF(MY_WME)); + if (print_done) + { + fputc('\n', stdout); fflush(stdout); + } + return 1; +} + + +/** + Copy an Aria table to S3 + @param s3_client connection to S3 + @param aws_bucket AWS bucket + @param path Path for Aria table (can be temp table) + @param database database name + @param table_name table name + @param block_size Block size in s3. If 0 then use the block size + and compression as specified in the .MAI file + given as part of open. + @param compression Compression algorithm (0 = none, 1 = zip) + If block size is 0 then use .MAI file. + @return 0 ok + @return 1 error + + The table will be copied in S3 into the following locations: + + frm file (for discovery): + aws_bucket/database/table/frm + + First index block (contains the description of the Aria file): + aws_bucket/database/table/aria + + Rest of the index file: + aws_bucket/database/table/index/block_number + + Data file: + aws_bucket/database/table/data/block_number + + block_number is a 6 digit decimal number, prefixed with 0 + (it can be longer than 6 digits; the prefix is just for nice output) + + frm and base blocks are small (just the needed data). + index and data blocks are of size 's3_block_size' + + If compression is used, then the original block size is s3_block_size + but the stored block will be the size of the compressed block. 
+*/ + +int aria_copy_to_s3(ms3_st *s3_client, const char *aws_bucket, + const char *path, + const char *database, const char *table_name, + ulong block_size, my_bool compression, + my_bool force, my_bool display) +{ + ARIA_TABLE_CAPABILITIES cap; + char aws_path[FN_REFLEN+100]; + char filename[FN_REFLEN]; + char *aws_path_end, *end; + uchar *alloc_block= 0, *block; + File file= -1; + my_off_t file_size; + size_t frm_length; + int error; + ms3_status_st status; + DBUG_ENTER("aria_copy_to_s3"); + + aws_path_end= strxmov(aws_path, database, "/", table_name, NullS); + strmov(aws_path_end, "/aria"); + + if (!ms3_status(s3_client, aws_bucket, aws_path, &status)) + { + if (!force) + { + my_printf_error(EE_CANTCREATEFILE, "File %s exists in s3", MYF(0), + aws_path); + DBUG_RETURN(EE_CANTCREATEFILE); + } + if ((error= aria_delete_from_s3(s3_client, aws_bucket, database, + table_name, display))) + DBUG_RETURN(error); + } + + /* + Copy frm file if it exists + We do this first to ensure that .frm always exists. This is needed to + ensure that discovery of the table will work. + */ + fn_format(filename, path, "", ".frm", MY_REPLACE_EXT); + if (!s3_read_frm_from_disk(filename, &alloc_block, &frm_length)) + { + if (display) + printf("Copying frm file %s\n", filename); + + end= strmov(aws_path_end,"/frm"); + convert_frm_to_s3_format(alloc_block); + + /* Note that frm is not compressed! */ + if (s3_put_object(s3_client, aws_bucket, aws_path, alloc_block, frm_length, + 0)) + goto err; + + my_free(alloc_block); + alloc_block= 0; + } + + if (display) + printf("Copying aria table: %s.%s to s3\n", database, table_name); + + /* Index file name */ + fn_format(filename, path, "", ".MAI", MY_REPLACE_EXT); + if ((file= my_open(filename, + O_RDONLY | O_SHARE | O_NOFOLLOW | O_CLOEXEC, + MYF(MY_WME))) < 0) + DBUG_RETURN(1); + if ((error= aria_get_capabilities(file, &cap))) + { + fprintf(stderr, "Got error %d when reading Aria header from %s\n", + error, path); + goto err; + } + if (cap.transactional || cap.data_file_type != BLOCK_RECORD) + { + fprintf(stderr, + "Aria table %s doesn't match criteria to be copied to S3.\n" + "It should be non-transactional and should have row_format page", + path); + goto err; + } + /* + If block size is not specified, use the values specified as part of + create + */ + if (block_size == 0) + { + block_size= cap.s3_block_size; + compression= cap.compression; + } + + /* Align S3_BLOCK size with table block size */ + block_size= (block_size/cap.block_size)*cap.block_size; + + /* Allocate block for data + flag for compress header */ + if (!(alloc_block= (uchar*) my_malloc(block_size+ALIGN_SIZE(1), + MYF(MY_WME)))) + goto err; + /* Read/write data here, but with prefix space for compression flag */ + block= alloc_block+ ALIGN_SIZE(1); + + if (my_pread(file, block, cap.header_size, 0, MYF(MY_WME | MY_FNABP))) + goto err; + + strmov(aws_path_end, "/aria"); + + if (display) + printf("Creating aria table information %s\n", aws_path); + + convert_index_to_s3_format(block, block_size, compression); + + /* + The first page is not compressed as we need it to know if the rest is + compressed + */ + if (s3_put_object(s3_client, aws_bucket, aws_path, block, cap.header_size, + 0 /* no compression */ )) + goto err; + + file_size= my_seek(file, 0L, MY_SEEK_END, MYF(0)); + + end= strmov(aws_path_end,"/index"); + + if (display) + printf("Copying index information %s\n", aws_path); + + /* The 000000 will be update with block number by fix_suffix() */ + end= strmov(end, "/000000"); + + error= 
copy_from_file(s3_client, aws_bucket, aws_path, file, cap.header_size, + file_size, block, block_size, compression, display); + file= -1; + if (error) + goto err; + + /* Copy data file */ + fn_format(filename, path, "", ".MAD", MY_REPLACE_EXT); + if ((file= my_open(filename, + O_RDONLY | O_SHARE | O_NOFOLLOW | O_CLOEXEC, + MYF(MY_WME))) < 0) + DBUG_RETURN(1); + + file_size= my_seek(file, 0L, MY_SEEK_END, MYF(0)); + + end= strmov(aws_path_end, "/data"); + + if (display) + printf("Copying data information %s\n", aws_path); + + /* The 000000 will be update with block number by fix_suffix() */ + end= strmov(end, "/000000"); + + error= copy_from_file(s3_client, aws_bucket, aws_path, file, 0, file_size, + block, block_size, compression, display); + file= -1; + if (error) + goto err; + + my_free(alloc_block); + DBUG_RETURN(0); + +err: + if (file >= 0) + my_close(file, MYF(0)); + my_free(alloc_block); + DBUG_RETURN(1); +} + + +/** + Copy file to 'aws_path' in blocks of block_size + + @return 0 ok + @return 1 error. Error message is printed to stderr + + Notes: + file is always closed before return +*/ + +static my_bool copy_to_file(ms3_st *s3_client, const char *aws_bucket, + char *aws_path, File file, my_off_t start, + my_off_t file_end, my_bool compression, + my_bool display) +{ + my_off_t pos; + char *path_end= strend(aws_path); + size_t error; + ulong bnr; + my_bool print_done= 0; + S3_BLOCK block; + DBUG_ENTER("copy_to_file"); + DBUG_PRINT("enter", ("path: %s start: %llu end: %llu", + aws_path, (ulonglong) start, (ulonglong) file_end)); + + for (pos= start, bnr=1 ; pos < file_end ; pos+= block.length, bnr++) + { + fix_suffix(path_end, bnr); + if (s3_get_object(s3_client, aws_bucket, aws_path, &block, compression, 1)) + goto err; + + error= my_write(file, block.str, block.length, MYF(MY_WME | MY_WME)); + s3_free(&block); + if (error == MY_FILE_ERROR) + goto err; + + /* Write up to DISPLAY_WITH number of '.' 
during copy */ + if (display && + ((pos + block.length) * DISPLAY_WITH /file_end) > + (pos * DISPLAY_WITH/file_end)) + { + fputc('.', stdout); fflush(stdout); + print_done= 1; + } + } + if (print_done) + { + fputc('\n', stdout); fflush(stdout); + } + my_close(file, MYF(MY_WME)); + DBUG_RETURN(0); + +err: + my_close(file, MYF(MY_WME)); + if (print_done) + { + fputc('\n', stdout); fflush(stdout); + } + DBUG_RETURN(1); +} + + +/** + Copy a table from S3 to current directory +*/ + +int aria_copy_from_s3(ms3_st *s3_client, const char *aws_bucket, + const char *path, const char *database, + my_bool compression, my_bool force, my_bool display) + +{ + MARIA_STATE_INFO state; + MY_STAT stat_info; + char table_name[FN_REFLEN], aws_path[FN_REFLEN+100]; + char filename[FN_REFLEN]; + char *aws_path_end, *end; + File file; + S3_BLOCK block; + my_off_t index_file_size, data_file_size; + uint offset; + int error; + DBUG_ENTER("aria_copy_from_s3"); + + /* Check if index file exists */ + fn_format(filename, path, "", ".MAI", MY_REPLACE_EXT); + if (!force && my_stat(filename, &stat_info, MYF(0))) + { + my_printf_error(EE_CANTCREATEFILE, "Table %s already exists on disk", + MYF(0), filename); + DBUG_RETURN(EE_CANTCREATEFILE); + } + + fn_format(table_name, path, "", "", MY_REPLACE_DIR | MY_REPLACE_EXT); + block.str= 0; + + aws_path_end= strxmov(aws_path, database, "/", table_name, NullS); + strmov(aws_path_end, "/aria"); + + if (s3_get_object(s3_client, aws_bucket, aws_path, &block, 0, 0)) + { + my_printf_error(EE_FILENOTFOUND, "Table %s doesn't exist in s3", MYF(0), + filename); + goto err; + } + if (block.length < MARIA_STATE_INFO_SIZE) + { + fprintf(stderr, "Wrong block length for first block: %lu\n", + (ulong) block.length); + goto err_with_free; + } + + if (display) + printf("Copying aria table: %s.%s from s3\n", database, table_name); + + /* For offset positions, check _ma_state_info_readlength() */ + offset= sizeof(state.header) + 4+ LSN_STORE_SIZE*3 + 8*5; + index_file_size= mi_sizekorr(block.str + offset); + data_file_size= mi_sizekorr(block.str + offset+8); + + if ((file= my_create(filename, 0, + O_WRONLY | O_TRUNC | O_NOFOLLOW, MYF(MY_WME))) < 0) + goto err_with_free; + + convert_index_to_disk_format(block.str); + + if (my_write(file, block.str, block.length, MYF(MY_WME | MY_FNABP))) + goto err_with_free; + + if (display) + printf("Copying index information %s\n", aws_path); + + end= strmov(aws_path_end,"/index/000000"); + + error= copy_to_file(s3_client, aws_bucket, aws_path, file, block.length, + index_file_size, compression, display); + file= -1; + if (error) + goto err_with_free; + + /* Copy data file */ + fn_format(filename, path, "", ".MAD", MY_REPLACE_EXT); + if ((file= my_create(filename, 0, + O_WRONLY | O_TRUNC | O_NOFOLLOW, MYF(MY_WME))) < 0) + DBUG_RETURN(1); + + end= strmov(aws_path_end, "/data"); + + if (display) + printf("Copying data information %s\n", aws_path); + + /* The 000000 will be update with block number by fix_suffix() */ + strmov(end, "/000000"); + + error= copy_to_file(s3_client, aws_bucket, aws_path, file, 0, data_file_size, + compression, display); + file= -1; + s3_free(&block); + block.str= 0; + if (error) + goto err; + + /* Copy frm file if it exists */ + strmov(aws_path_end, "/frm"); + if (!s3_get_object(s3_client, aws_bucket, aws_path, &block, 0, 0)) + { + fn_format(filename, path, "", ".frm", MY_REPLACE_EXT); + if ((file= my_create(filename, 0, + O_WRONLY | O_SHARE | O_NOFOLLOW | O_CLOEXEC, + MYF(0))) >= 0) + { + if (display) + printf("Copying frm file %s\n", 
filename); + + convert_frm_to_disk_format(block.str); + + if (my_write(file, block.str, block.length, MYF(MY_WME | MY_FNABP))) + goto err_with_free; + } + s3_free(&block); + my_close(file, MYF(MY_WME)); + file= -1; + } + + DBUG_RETURN(0); + +err_with_free: + s3_free(&block); +err: + if (file >= 0) + my_close(file, MYF(0)); + DBUG_RETURN(1); +} + + +/** + Drop all files related to a table from S3 +*/ + +int aria_delete_from_s3(ms3_st *s3_client, const char *aws_bucket, + const char *database, const char *table, + my_bool display) +{ + ms3_status_st status; + char aws_path[FN_REFLEN+100]; + char *aws_path_end; + int error; + DBUG_ENTER("aria_delete_from_s3"); + + aws_path_end= strxmov(aws_path, database, "/", table, NullS); + strmov(aws_path_end, "/aria"); + + /* Check if either /aria or /frm exists */ + + if (ms3_status(s3_client, aws_bucket, aws_path, &status)) + { + strmov(aws_path_end, "/frm"); + if (ms3_status(s3_client, aws_bucket, aws_path, &status)) + { + my_printf_error(HA_ERR_NO_SUCH_TABLE, + "Table %s.%s doesn't exist in s3", MYF(0), + database, table); + my_errno= HA_ERR_NO_SUCH_TABLE; + DBUG_RETURN(HA_ERR_NO_SUCH_TABLE); + } + } + + if (display) + printf("Delete of aria table: %s.%s\n", database, table); + + strmov(aws_path_end,"/index"); + + if (display) + printf("Delete of index information %s\n", aws_path); + + error= s3_delete_directory(s3_client, aws_bucket, aws_path); + + strmov(aws_path_end,"/data"); + if (display) + printf("Delete of data information %s\n", aws_path); + + error|= s3_delete_directory(s3_client, aws_bucket, aws_path); + + if (display) + printf("Delete of base information and frm\n"); + + strmov(aws_path_end,"/aria"); + if (s3_delete_object(s3_client, aws_bucket, aws_path, 1)) + error= 1; + + /* + Delete .frm last as this is used by discovery to check if a s3 table + exists + */ + strmov(aws_path_end,"/frm"); + /* Ignore error if .frm file doesn't exist */ + s3_delete_object(s3_client, aws_bucket, aws_path, 0); + + DBUG_RETURN(error); +} + + +/** + Rename a table in s3 +*/ + + +int aria_rename_s3(ms3_st *s3_client, const char *aws_bucket, + const char *from_database, const char *from_table, + const char *to_database, const char *to_table) +{ + ms3_status_st status; + char to_aws_path[FN_REFLEN+100], from_aws_path[FN_REFLEN+100]; + char *to_aws_path_end, *from_aws_path_end; + int error; + DBUG_ENTER("aria_rename_s3"); + + from_aws_path_end= strxmov(from_aws_path, from_database, "/", from_table, + NullS); + to_aws_path_end= strxmov(to_aws_path, to_database, "/", to_table, NullS); + strmov(from_aws_path_end, "/aria"); + + if (ms3_status(s3_client, aws_bucket, from_aws_path, &status)) + { + my_printf_error(HA_ERR_NO_SUCH_TABLE, + "Table %s.%s doesn't exist in s3", MYF(0), from_database, + from_table); + my_errno= HA_ERR_NO_SUCH_TABLE; + DBUG_RETURN(HA_ERR_NO_SUCH_TABLE); + } + + strmov(from_aws_path_end,"/index"); + strmov(to_aws_path_end,"/index"); + + error= s3_rename_directory(s3_client, aws_bucket, from_aws_path, to_aws_path, + 1); + + strmov(from_aws_path_end,"/data"); + strmov(to_aws_path_end,"/data"); + + error|= s3_rename_directory(s3_client, aws_bucket, from_aws_path, + to_aws_path, 1); + + strmov(from_aws_path_end, "/frm"); + strmov(to_aws_path_end, "/frm"); + + s3_rename_object(s3_client, aws_bucket, from_aws_path, to_aws_path, 1); + + strmov(from_aws_path_end,"/aria"); + strmov(to_aws_path_end,"/aria"); + if (s3_rename_object(s3_client, aws_bucket, from_aws_path, to_aws_path, 1)) + error= 1; + DBUG_RETURN(error); +} + + 
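/*
  For illustration: with the layout described above for aria_copy_to_s3(),
  a table 'db1'.'t1' (placeholder names) ends up as the following objects
  in the bucket, one object per s3_block_size chunk for index and data:

    db1/t1/frm            .frm image, used for discovery (never compressed)
    db1/t1/aria           first index block with the table description
    db1/t1/index/000001   rest of the index file
    db1/t1/index/000002   ...
    db1/t1/data/000001    data file
    db1/t1/data/000002    ...

  aria_delete_from_s3() and aria_rename_s3() above operate on these same
  prefixes; the frm object is deleted last because discovery uses it to
  check whether an S3 table exists.
*/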
+/****************************************************************************** + Low level functions interfacing with libmarias3 +******************************************************************************/ + +/** + Create an object for index or data information + + Note that if compression is used, the data may be overwritten and + there must be COMPRESS_HEADER length of free space before the data! + +*/ + +my_bool s3_put_object(ms3_st *s3_client, const char *aws_bucket, + const char *name, uchar *data, size_t length, + my_bool compression) +{ + uint8_t error; + const char *errmsg; + DBUG_ENTER("s3_put_object"); + DBUG_PRINT("enter", ("name: %s", name)); + + if (compression) + { + size_t comp_len; + + data[-COMPRESS_HEADER]= 0; // No compression + if (!my_compress(data, &length, &comp_len)) + data[-COMPRESS_HEADER]= 1; // Compressed package + data-= COMPRESS_HEADER; + length+= COMPRESS_HEADER; + int3store(data+1, comp_len); // Original length or 0 + } + + if (likely(!(error= ms3_put(s3_client, aws_bucket, name, data, length)))) + DBUG_RETURN(FALSE); + + if (!(errmsg= ms3_server_error(s3_client))) + errmsg= ms3_error(error); + + my_printf_error(EE_WRITE, "Got error from put_object(%s): %d %s", MYF(0), + name, error, errmsg); + DBUG_RETURN(TRUE); +} + + +/** + Read an object for index or data information +*/ + +my_bool s3_get_object(ms3_st *s3_client, const char *aws_bucket, + const char *name, S3_BLOCK *block, + my_bool compression, my_bool print_error) +{ + uint8_t error; + uchar *data; + DBUG_ENTER("s3_get_object"); + DBUG_PRINT("enter", ("name: %s compression: %d", name, compression)); + + block->str= block->alloc_ptr= 0; + if (likely(!(error= ms3_get(s3_client, aws_bucket, name, + (uint8_t**) &block->alloc_ptr, + &block->length)))) + { + block->str= block->alloc_ptr; + if (compression) + { + size_t length; + + /* If not compressed */ + if (!block->str[0]) + { + block->length-= COMPRESS_HEADER; + block->str+= COMPRESS_HEADER; + + /* Simple check to ensure that it's a correct block */ + if (block->length % 1024) + { + s3_free(block); + my_printf_error(HA_ERR_NOT_A_TABLE, + "Block '%s' is not compressed", MYF(0), name); + DBUG_RETURN(TRUE); + } + DBUG_RETURN(FALSE); + } + + if (((uchar*)block->str)[0] > 1) + { + s3_free(block); + my_printf_error(HA_ERR_NOT_A_TABLE, + "Block '%s' is not compressed", MYF(0), name); + DBUG_RETURN(TRUE); + } + + length= uint3korr(block->str+1); + + if (!(data= (uchar*) my_malloc(length, MYF(MY_WME | MY_THREAD_SPECIFIC)))) + { + s3_free(block); + DBUG_RETURN(TRUE); + } + if (uncompress(data, &length, block->str + COMPRESS_HEADER, + block->length - COMPRESS_HEADER)) + { + my_printf_error(ER_NET_UNCOMPRESS_ERROR, + "Got error uncompressing s3 packet", MYF(0)); + s3_free(block); + my_free(data); + DBUG_RETURN(TRUE); + } + s3_free(block); + block->str= block->alloc_ptr= data; + block->length= length; + } + DBUG_RETURN(FALSE); + } + if (print_error) + { + if (error == 9) + { + my_printf_error(EE_FILENOTFOUND, "Expected object '%s' didn't exists", + MYF(0), name); + my_errno= EE_FILENOTFOUND; + } + else + { + const char *errmsg; + if (!(errmsg= ms3_server_error(s3_client))) + errmsg= ms3_error(error); + + my_printf_error(EE_READ, "Got error from get_object(%s): %d %s", MYF(0), + name, error, errmsg); + my_errno= EE_READ; + } + } + s3_free(block); + DBUG_RETURN(TRUE); +} + + +my_bool s3_delete_object(ms3_st *s3_client, const char *aws_bucket, + const char *name, my_bool print_error) +{ + uint8_t error; + DBUG_ENTER("s3_delete_object"); + DBUG_PRINT("enter", 
("name: %s", name)); + + if (likely(!(error= ms3_delete(s3_client, aws_bucket, name)))) + DBUG_RETURN(FALSE); + + if (print_error) + { + if (error == 9) + my_printf_error(EE_FILENOTFOUND, "Expected object '%s' didn't exists", + MYF(0), name); + else + { + const char *errmsg; + if (!(errmsg= ms3_server_error(s3_client))) + errmsg= ms3_error(error); + + my_printf_error(EE_READ, "Got error from delete_object(%s): %d %s", + MYF(0), name, error, errmsg); + } + } + DBUG_RETURN(TRUE); +} + + +/* + Drop all files in a 'directory' in s3 +*/ + +int s3_delete_directory(ms3_st *s3_client, const char *aws_bucket, + const char *path) +{ + ms3_list_st *list, *org_list= 0; + my_bool error; + DBUG_ENTER("delete_directory"); + DBUG_PRINT("enter", ("path: %s", path)); + + if ((error= ms3_list(s3_client, aws_bucket, path, &org_list))) + { + const char *errmsg; + if (!(errmsg= ms3_server_error(s3_client))) + errmsg= ms3_error(error); + + my_printf_error(EE_FILENOTFOUND, + "Can't get list of files from %s. Error: %d %s", MYF(0), + path, error, errmsg); + DBUG_RETURN(EE_FILENOTFOUND); + } + + for (list= org_list ; list ; list= list->next) + if (s3_delete_object(s3_client, aws_bucket, list->key, 1)) + error= 1; + if (org_list) + ms3_list_free(org_list); + DBUG_RETURN(error); +} + + +my_bool s3_rename_object(ms3_st *s3_client, const char *aws_bucket, + const char *from_name, const char *to_name, + my_bool print_error) +{ + uint8_t error; + DBUG_ENTER("s3_rename_object"); + DBUG_PRINT("enter", ("from: %s to: %s", from_name, to_name)); + + if (likely(!(error= ms3_move(s3_client, + aws_bucket, from_name, + aws_bucket, to_name)))) + DBUG_RETURN(FALSE); + + if (print_error) + { + if (error == 9) + { + my_printf_error(EE_FILENOTFOUND, "Expected object '%s' didn't exists", + MYF(0), from_name); + } + else + { + const char *errmsg; + if (!(errmsg= ms3_server_error(s3_client))) + errmsg= ms3_error(error); + + my_printf_error(EE_READ, "Got error from move_object(%s -> %s): %d %", + MYF(0), + from_name, to_name, error, errmsg); + } + } + DBUG_RETURN(TRUE); +} + + +int s3_rename_directory(ms3_st *s3_client, const char *aws_bucket, + const char *from_name, const char *to_name, + my_bool print_error) +{ + ms3_list_st *list, *org_list= 0; + my_bool error= 0; + char name[AWS_PATH_LENGTH], *end; + DBUG_ENTER("s3_delete_directory"); + + if ((error= ms3_list(s3_client, aws_bucket, from_name, &org_list))) + { + const char *errmsg; + if (!(errmsg= ms3_server_error(s3_client))) + errmsg= ms3_error(error); + + my_printf_error(EE_FILENOTFOUND, + "Can't get list of files from %s. 
Error: %d %s", MYF(0), + from_name, error, errmsg); + DBUG_RETURN(EE_FILENOTFOUND); + } + + end= strmov(name, to_name); + for (list= org_list ; list ; list= list->next) + { + const char *sep= strrchr(list->key, '/'); + if (sep) /* Safety */ + { + strmake(end, sep, (sizeof(name) - (end-name) - 1)); + if (s3_rename_object(s3_client, aws_bucket, list->key, name, + print_error)) + error= 1; + } + } + if (org_list) + ms3_list_free(org_list); + DBUG_RETURN(error); +} + + +/****************************************************************************** + Converting index and frm files to from S3 storage engine +******************************************************************************/ + +/** + Change index information to be of type s3 + + @param header Copy of header in index file + @param block_size S3 block size + @param compression Compression algorithm to use + + The position are from _ma_base_info_write() +*/ + +static void convert_index_to_s3_format(uchar *header, ulong block_size, + int compression) +{ + MARIA_STATE_INFO state; + uchar *base_pos; + uint base_offset; + + memcpy(state.header.file_version, header, sizeof(state.header)); + base_offset= mi_uint2korr(state.header.base_pos); + base_pos= header + base_offset; + + base_pos[107]= (uchar) compression; + mi_int3store(base_pos+119, block_size); +} + + +/** + Change index information to be a normal disk based table +*/ + +static void convert_index_to_disk_format(uchar *header) +{ + MARIA_STATE_INFO state; + uchar *base_pos; + uint base_offset; + + memcpy(state.header.file_version, header, sizeof(state.header)); + base_offset= mi_uint2korr(state.header.base_pos); + base_pos= header + base_offset; + + base_pos[107]= 0; + mi_int3store(base_pos+119, 0); +} + +/** + Change storage engine in the .frm file from Aria to s3 + + For information about engine types, see legacy_db_type +*/ + +static void convert_frm_to_s3_format(uchar *header) +{ + DBUG_ASSERT(header[3] == 42 || header[3] == 41); /* Aria or S3 */ + header[3]= 41; /* S3 */ +} + +/** + Change storage engine in the .frm file from S3 to Aria + + For information about engine types, see legacy_db_type +*/ + +static void convert_frm_to_disk_format(uchar *header) +{ + DBUG_ASSERT(header[3] == 41); /* S3 */ + header[3]= 42; /* Aria */ +} + + +/****************************************************************************** + Helper functions +******************************************************************************/ + +/** + Set database and table name from path + + s3->database and s3->table_name will be pointed into path + Note that s3->database will not be null terminated! 
+*/ + +my_bool set_database_and_table_from_path(S3_INFO *s3, const char *path) +{ + size_t org_length= dirname_length(path); + size_t length= 0; + + if (!org_length) + return 1; + + s3->table.str= path+org_length; + s3->table.length= strlen(s3->table.str); + for (length= --org_length; length > 0 ; length --) + { + if (path[length-1] == FN_LIBCHAR || path[length-1] == '/') + break; +#ifdef FN_DEVCHAR + if (path[length-1] == FN_DEVCHAR) + break; +#endif + } + if (length && + (path[length] != FN_CURLIB || org_length - length != 1)) + { + s3->database.str= path + length; + s3->database.length= org_length - length; + return 0; + } + return 1; /* Can't find database */ +} + + +/** + Read frm from the disk +*/ + +static int s3_read_frm_from_disk(const char *filename, uchar **to, + size_t *to_size) +{ + File file; + uchar *alloc_block; + size_t file_size; + + *to= 0; + if ((file= my_open(filename, + O_RDONLY | O_SHARE | O_NOFOLLOW | O_CLOEXEC, + MYF(MY_WME))) < 0) + return(1); + + file_size= (size_t) my_seek(file, 0L, MY_SEEK_END, MYF(0)); + if (!(alloc_block= my_malloc(file_size, MYF(MY_WME)))) + goto err; + + if (my_pread(file, alloc_block, file_size, 0, MYF(MY_WME | MY_FNABP))) + goto err; + + *to= alloc_block; + *to_size= file_size; + my_close(file, MYF(0)); + return 0; + +err: + my_free(alloc_block); + my_close(file, MYF(0)); + return 1; +} + + +/** + Get .frm from S3 + + @return 0 ok + @return 1 error +*/ + +my_bool s3_get_frm(ms3_st *s3_client, S3_INFO *s3_info, S3_BLOCK *block) +{ + char aws_path[AWS_PATH_LENGTH]; + + strxnmov(aws_path, sizeof(aws_path)-1, s3_info->database.str, "/", + s3_info->table.str, "/frm", NullS); + + return s3_get_object(s3_client, s3_info->bucket.str, aws_path, block, + 0, 0); +} + +/** + Check if .frm exists in S3 + + @return 0 frm exists + @return 1 error +*/ + +my_bool s3_frm_exists(ms3_st *s3_client, S3_INFO *s3_info) +{ + char aws_path[AWS_PATH_LENGTH]; + ms3_status_st status; + + strxnmov(aws_path, sizeof(aws_path)-1, s3_info->database.str, "/", + s3_info->table.str, "/frm", NullS); + + return ms3_status(s3_client, s3_info->bucket.str, aws_path, &status); +} + + +/** + Get version from frm file + + @param out Store the table version here. It's of size MY_UUID_SIZE + @param frm_image Frm image + @param frm_length size of image + + @return 0 Was able to read table version + @return 1 Wrong information in frm file +*/ + +#define FRM_HEADER_SIZE 64 +#define EXTRA2_TABLEDEF_VERSION 0 + +static inline bool is_binary_frm_header(const uchar *head) +{ + return head[0] == 254 + && head[1] == 1 + && head[2] >= FRM_VER + && head[2] <= FRM_VER_CURRENT; +} + +static my_bool get_tabledef_version_from_frm(char *out, const uchar *frm_image, + size_t frm_length) +{ + uint segment_len; + const uchar *extra, *extra_end; + if (!is_binary_frm_header(frm_image) || frm_length <= FRM_HEADER_SIZE) + return 1; + + /* Length of the MariaDB extra2 segment in the form file. 
*/ + segment_len= uint2korr(frm_image + 4); + if (frm_length < FRM_HEADER_SIZE + segment_len) + return 1; + + extra= frm_image + FRM_HEADER_SIZE; + if (*extra == '/') // old frm had '/' there + return 1; + + extra_end= extra + segment_len; + while (extra + 4 < extra_end) + { + uchar type= *extra++; + size_t length= *extra++; + if (!length) + { + length= uint2korr(extra); + extra+= 2; + if (length < 256) + return 1; /* Something is wrong */ + } + if (extra + length > extra_end) + return 1; + if (type == EXTRA2_TABLEDEF_VERSION) + { + if (length != MY_UUID_SIZE) + return 1; + memcpy(out, extra, length); + return 0; /* Found it */ + } + extra+= length; + } + return 1; +} + + +/** + Check if version in frm file matches what the server expects + + @return 0 table definitions matches + @return 1 table definitions doesn't match + @return 2 Can't find the frm version + @return 3 Can't read the frm version +*/ + +int s3_check_frm_version(ms3_st *s3_client, S3_INFO *s3_info) +{ + my_bool res= 0; + char aws_path[AWS_PATH_LENGTH]; + char uuid[MY_UUID_SIZE]; + S3_BLOCK block; + + strxnmov(aws_path, sizeof(aws_path)-1, s3_info->database.str, "/", + s3_info->table.str, "/frm", NullS); + + if (s3_get_object(s3_client, s3_info->bucket.str, aws_path, &block, 0, 0)) + return 2; /* Ignore check, use old frm */ + + if (get_tabledef_version_from_frm(uuid, (uchar*) block.str, block.length) || + s3_info->tabledef_version.length != MY_UUID_SIZE) + { + s3_free(&block); + return 3; /* Wrong definition */ + } + /* res is set to 1 if versions numbers doesn't match */ + res= bcmp(s3_info->tabledef_version.str, uuid, MY_UUID_SIZE) != 0; + s3_free(&block); + return res; +} + + +/****************************************************************************** + Reading blocks from index or data from S3 +******************************************************************************/ + +/* + Read the index header (first page) from the index file + + In case of error, my_error() is called +*/ + +my_bool read_index_header(ms3_st *client, S3_INFO *s3, S3_BLOCK *block) +{ + char aws_path[AWS_PATH_LENGTH]; + DBUG_ENTER("read_index_header"); + strxnmov(aws_path, sizeof(aws_path)-1, s3->database.str, "/", s3->table.str, + "/aria", NullS); + DBUG_RETURN(s3_get_object(client, s3->bucket.str, aws_path, block, 0, 1)); +} + + +#ifdef FOR_FUTURE_IF_NEEDED_FOR_DEBUGGING_WITHOUT_S3 +/** + Read a big block from disk +*/ + +my_bool s3_block_read(struct st_pagecache *pagecache, + PAGECACHE_IO_HOOK_ARGS *args, + struct st_pagecache_file *file, + LEX_STRING *data) +{ + MARIA_SHARE *share= (MARIA_SHARE*) file->callback_data; + my_bool datafile= file != &share->kfile; + + DBUG_ASSERT(file->big_block_size > 0); + DBUG_ASSERT(((((my_off_t) args->pageno - file->head_blocks) << + pagecache->shift) % + file->big_block_size) == 0); + + if (!(data->str= (char *) my_malloc(file->big_block_size, MYF(MY_WME)))) + return TRUE; + + data->length= mysql_file_pread(file->file, + (unsigned char *)data->str, + file->big_block_size, + ((my_off_t) args->pageno << pagecache->shift), + MYF(MY_WME)); + if (data->length == 0 || data->length == MY_FILE_ERROR) + { + if (data->length == 0) + { + LEX_STRING *file_name= (datafile ? 
+ &share->data_file_name : + &share->index_file_name); + my_error(EE_EOFERR, MYF(0), file_name->str, my_errno); + } + my_free(data->str); + data->length= 0; + data->str= 0; + return TRUE; + } + return FALSE; +} +#endif + + +/** + Read a block from S3 to page cache +*/ + +my_bool s3_block_read(struct st_pagecache *pagecache, + PAGECACHE_IO_HOOK_ARGS *args, + struct st_pagecache_file *file, + S3_BLOCK *block) +{ + char aws_path[AWS_PATH_LENGTH]; + MARIA_SHARE *share= (MARIA_SHARE*) file->callback_data; + my_bool datafile= file->file != share->kfile.file; + MARIA_HA *info= (MARIA_HA*) my_thread_var->keycache_file; + ms3_st *client= info->s3; + const char *path_suffix= datafile ? "/data/" : "/index/"; + char *end; + S3_INFO *s3= share->s3_path; + ulong block_number; + DBUG_ENTER("s3_block_read"); + + DBUG_ASSERT(file->big_block_size > 0); + DBUG_ASSERT(((((my_off_t) args->pageno - file->head_blocks) << + pagecache->shift) % + file->big_block_size) == 0); + + block_number= (((args->pageno - file->head_blocks) << pagecache->shift) / + file->big_block_size) + 1; + + end= strxnmov(aws_path, sizeof(aws_path)-12, s3->database.str, "/", + s3->table.str, path_suffix, "000000", NullS); + fix_suffix(end, block_number); + + DBUG_RETURN(s3_get_object(client, s3->bucket.str, aws_path, block, + share->base.compression_algorithm, 1)); +} + +/* + Start file numbers from 1000 to more easily find bugs when the file number + could be mistaken for a real file +*/ +static volatile int32 unique_file_number= 1000; + +int32 s3_unique_file_number() +{ + return my_atomic_add32_explicit(&unique_file_number, 1, + MY_MEMORY_ORDER_RELAXED); +} diff --git a/storage/maria/s3_func.h b/storage/maria/s3_func.h new file mode 100644 index 00000000000..a99d9fe423f --- /dev/null +++ b/storage/maria/s3_func.h @@ -0,0 +1,110 @@ +#ifndef S3_FUNC_INCLUDED +#define S3_FUNC_INCLUDED +/* Copyright (C) 2019 MariaDB Corporation Ab + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software Foundation, + Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */ + +/* + Interface function used by S3 storage engine and aria_copy_for_s3 +*/ + +#ifdef WITH_S3_STORAGE_ENGINE +C_MODE_START +#include <libmarias3/marias3.h> + +/* Store information about a s3 connection */ + +typedef struct s3_info +{ + LEX_CSTRING access_key, secret_key, region, bucket; + + /* The following will be filled in by maria_open() */ + LEX_CSTRING database, table; + + /* Sent to open to verify version */ + LEX_CUSTRING tabledef_version; +} S3_INFO; + + +/* flag + length is stored in this header */ +#define COMPRESS_HEADER 4 + +/* Max length of an AWS PATH */ +#define AWS_PATH_LENGTH ((NAME_LEN)*3+3+10+6+11) + +void s3_init_library(void); +void s3_deinit_library(void); +int aria_copy_to_s3(ms3_st *s3_client, const char *aws_bucket, + const char *path, + const char *database, const char *table_name, + ulong block_size, my_bool compression, + my_bool force, my_bool display); +int aria_copy_from_s3(ms3_st *s3_client, const char *aws_bucket, + const char *path,const char *database, + my_bool compression, my_bool force, my_bool display); +int aria_delete_from_s3(ms3_st *s3_client, const char *aws_bucket, + const char *database, const char *table, + my_bool display); +int aria_rename_s3(ms3_st *s3_client, const char *aws_bucket, + const char *from_database, const char *from_table, + const char *to_database, const char *to_table); +ms3_st *s3_open_connection(S3_INFO *s3); +my_bool s3_put_object(ms3_st *s3_client, const char *aws_bucket, + const char *name, uchar *data, size_t length, + my_bool compression); +my_bool s3_get_object(ms3_st *s3_client, const char *aws_bucket, + const char *name, S3_BLOCK *block, my_bool compression, + my_bool print_error); +my_bool s3_delete_object(ms3_st *s3_client, const char *aws_bucket, + const char *name, my_bool print_error); +my_bool s3_rename_object(ms3_st *s3_client, const char *aws_bucket, + const char *from_name, const char *to_name, + my_bool print_error); +void s3_free(S3_BLOCK *data); +my_bool s3_copy_from_file(ms3_st *s3_client, const char *aws_bucket, + char *aws_path, File file, my_off_t start, + my_off_t file_end, uchar *block, size_t block_size, + my_bool compression, my_bool display); +my_bool s3_copy_to_file(ms3_st *s3_client, const char *aws_bucket, + char *aws_path, File file, my_off_t start, + my_off_t file_end, my_bool compression, + my_bool display); +int s3_delete_directory(ms3_st *s3_client, const char *aws_bucket, + const char *path); +int s3_rename_directory(ms3_st *s3_client, const char *aws_bucket, + const char *from_name, const char *to_name, + my_bool print_error); + +S3_INFO *s3_info_copy(S3_INFO *old); +my_bool set_database_and_table_from_path(S3_INFO *s3, const char *path); +my_bool s3_get_frm(ms3_st *s3_client, S3_INFO *S3_info, S3_BLOCK *block); +my_bool s3_frm_exists(ms3_st *s3_client, S3_INFO *s3_info); +int s3_check_frm_version(ms3_st *s3_client, S3_INFO *s3_info); +my_bool read_index_header(ms3_st *client, S3_INFO *s3, S3_BLOCK *block); +int32 s3_unique_file_number(void); +my_bool s3_block_read(struct st_pagecache *pagecache, + PAGECACHE_IO_HOOK_ARGS *args, + struct st_pagecache_file *file, + S3_BLOCK *block); +C_MODE_END +#else + +C_MODE_START +/* Dummy structures and interfaces to be used when compiling without S3 */ +struct s3_info; +typedef struct s3_info S3_INFO; +struct ms3_st; +C_MODE_END +#endif 
/* WITH_S3_STORAGE_ENGINE */ +#endif /* S3_FUNC_INCLUDED */ diff --git a/storage/maria/test_aria_s3_copy.sh b/storage/maria/test_aria_s3_copy.sh new file mode 100755 index 00000000000..ad39df69de2 --- /dev/null +++ b/storage/maria/test_aria_s3_copy.sh @@ -0,0 +1,56 @@ +#!/bin/bash + +# +# Note that this test expects that there are tables test1 and test2 in +# the current directory, where test2 also has a .frm file +# + +TMPDIR=tmpdir +LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib64/ + +my_cmp() +{ + if ! cmp $1 $TMPDIR/$1 + then + echo "aborting" + exit 1; + fi +} + +run_test() +{ + OPT=$1; + echo "******* Running test with options '$OPT' **********" + rm -rf $TMPDIR + mkdir $TMPDIR + cp test?.* $TMPDIR + if ! ./aria_s3_copy --op=to --force $OPT test1 test2 + then + echo Got error $? + exit 1; + fi + rm test?.* + if ! ./aria_s3_copy --op=from $OPT test1 test2 + then + echo Got error $? + exit 1; + fi + if ! ./aria_s3_copy --op=delete $OPT test1 test2 + then + echo Got error $? + exit 1; + fi + my_cmp test1.MAI + my_cmp test1.MAD + my_cmp test2.MAI + my_cmp test2.MAD + my_cmp test2.frm + rm test?.* + cp $TMPDIR/* . + rm -r $TMPDIR +} + +run_test "" +run_test "--s3_block_size=64K --compress" +run_test "--s3_block_size=4M" +echo "ok" diff --git a/storage/maria/test_ma_backup.c b/storage/maria/test_ma_backup.c index 2a9a6704ecb..4d0599dfc46 100644 --- a/storage/maria/test_ma_backup.c +++ b/storage/maria/test_ma_backup.c @@ -315,7 +315,7 @@ static int create_test_table(const char *table_name, int type_of_table) uniques, &uniquedef, &create_info, create_flag)) goto err; - if (!(file=maria_open(table_name,2,HA_OPEN_ABORT_IF_LOCKED))) + if (!(file=maria_open(table_name,2,HA_OPEN_ABORT_IF_LOCKED, 0))) goto err; if (!silent) printf("- Writing key:s\n");
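As a usage sketch, the copy API declared in s3_func.h above can also be driven from a small standalone program, much like aria_s3_copy and the test script do from the command line. The following is a minimal, hypothetical example: the credentials, region, bucket, database and table names are placeholders, ms3_deinit() is assumed from libmarias3 for connection teardown, and error handling is reduced to a single return code; it only illustrates the call sequence s3_init_library(), s3_open_connection(), aria_copy_to_s3(), s3_deinit_library().

#include "maria_def.h"
#include "s3_func.h"

/* Sketch only: placeholder credentials and names, minimal error handling */
static int copy_one_table_to_s3(void)
{
  S3_INFO info;
  ms3_st *client;
  int error;

  memset(&info, 0, sizeof(info));
  info.access_key.str= "ACCESS_KEY";  info.access_key.length= 10;
  info.secret_key.str= "SECRET_KEY";  info.secret_key.length= 10;
  info.region.str=     "eu-north-1";  info.region.length= 10;
  info.bucket.str=     "my-bucket";   info.bucket.length= 9;

  s3_init_library();                   /* install my_malloc wrappers */
  if (!(client= s3_open_connection(&info)))
    return 1;

  /*
    Copy ./test1.{MAI,MAD,frm} to my-bucket/test_db/test1/...
    block_size=0 means "use what the .MAI file specifies", no compression,
    overwrite any existing copy in S3, print progress to stdout.
  */
  error= aria_copy_to_s3(client, info.bucket.str, "./test1",
                         "test_db", "test1",
                         0 /* block_size */, FALSE /* compression */,
                         TRUE /* force */, TRUE /* display */);

  ms3_deinit(client);                  /* assumed libmarias3 cleanup call */
  s3_deinit_library();
  return error != 0;
}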