diff options
author | Clarisse Cheah <clarisse.cheah@mongodb.com> | 2023-02-06 00:48:27 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-02-06 01:25:32 +0000 |
commit | 5e1084da11f60c12eda14feaddfc58da253e7c4c (patch) | |
tree | ed8484dc03c8ffa31dc9c2d1441bdc8aee83b619 | |
parent | 362494af33a0c84570024452176bb7daa1d665d6 (diff) | |
download | mongo-5e1084da11f60c12eda14feaddfc58da253e7c4c.tar.gz |
Import wiredtiger: 7cef0e9b3ea2ee63b54b2e0f592cb0a18e25720b from branch mongodb-master
ref: 82a6defada..7cef0e9b3e
for: 6.3.0-rc0
WT-10356 implement customize_file_system and add_reference for GCP storage sources
4 files changed, 288 insertions, 35 deletions
diff --git a/src/third_party/wiredtiger/ext/storage_sources/gcp_store/gcp_connection.cpp b/src/third_party/wiredtiger/ext/storage_sources/gcp_store/gcp_connection.cpp index beb09e3b665..f4f2852ce7b 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/gcp_store/gcp_connection.cpp +++ b/src/third_party/wiredtiger/ext/storage_sources/gcp_store/gcp_connection.cpp @@ -34,8 +34,7 @@ namespace gcs = google::cloud::storage; using namespace gcs; gcp_connection::gcp_connection(const std::string &bucket_name, const std::string &prefix) - : _gcp_client(google::cloud::storage::Client()), _bucket_name(bucket_name), - _object_prefix(prefix) + : _gcp_client(gcs::Client()), _bucket_name(bucket_name), _object_prefix(prefix) { // StatusOr either contains a usable BucketMetadata value or a Status object explaining why the // value is not present. The value's validity is checked by StatusOr::ok(). diff --git a/src/third_party/wiredtiger/ext/storage_sources/gcp_store/gcp_storage_source.cpp b/src/third_party/wiredtiger/ext/storage_sources/gcp_store/gcp_storage_source.cpp index 89021c656e1..30f22f01dbd 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/gcp_store/gcp_storage_source.cpp +++ b/src/third_party/wiredtiger/ext/storage_sources/gcp_store/gcp_storage_source.cpp @@ -13,23 +13,31 @@ struct gcp_file_system; struct gcp_file_handle; /* - * The first struct member must be the WT interface that is being implemented. + * The first struct member must be the WiredTiger interface that is being implemented. 
*/ struct gcp_store { - WT_STORAGE_SOURCE store; + WT_STORAGE_SOURCE storage_source; + WT_EXTENSION_API *wt_api; + std::mutex fs_list_mutex; + std::vector<gcp_file_system *> fs_list; std::vector<gcp_file_system> gcp_fs; + uint32_t reference_count; + WT_VERBOSE_LEVEL verbose; }; struct gcp_file_system { - WT_FILE_SYSTEM fs; - gcp_store *store; - std::vector<gcp_file_handle> gcp_fh; - gcp_connection *gcp_conn; + WT_FILE_SYSTEM file_system; + gcp_store *storage_source; + WT_FILE_SYSTEM *wt_file_system; + std::unique_ptr<gcp_connection> gcp_conn; + std::vector<gcp_file_handle *> fh_list; + std::string home_dir; }; struct gcp_file_handle { WT_FILE_HANDLE fh; - gcp_store *store; + gcp_store *storage_source; + WT_FILE_HANDLE *wt_file_handle; }; static int gcp_customize_file_system(WT_STORAGE_SOURCE *, WT_SESSION *, const char *, const char *, @@ -40,6 +48,7 @@ static int gcp_flush(WT_STORAGE_SOURCE *, WT_SESSION *, WT_FILE_SYSTEM *, const const char *, const char *) __attribute__((__unused__)); static int gcp_flush_finish(WT_STORAGE_SOURCE *, WT_SESSION *, WT_FILE_SYSTEM *, const char *, const char *, const char *) __attribute__((__unused__)); +static int gcp_file_close(WT_FILE_HANDLE *, WT_SESSION *) __attribute__((__unused__)); static int gcp_file_exists(WT_FILE_SYSTEM *, WT_SESSION *, const char *, bool *) __attribute__((__unused__)); static int gcp_file_open(WT_FILE_SYSTEM *, WT_SESSION *, const char *, WT_FS_OPEN_FILE_TYPE, @@ -48,32 +57,136 @@ static int gcp_remove(WT_FILE_SYSTEM *, WT_SESSION *, const char *, uint32_t) __attribute__((__unused__)); static int gcp_rename(WT_FILE_SYSTEM *, WT_SESSION *, const char *, const char *, uint32_t) __attribute__((__unused__)); -static int gcp_file_size(WT_FILE_HANDLE *, WT_SESSION *, wt_off_t *) __attribute__((__unused__)); +static int gcp_file_size(WT_FILE_SYSTEM *, WT_SESSION *, const char *, wt_off_t *) + __attribute__((__unused__)); +static int gcp_object_list( + WT_FILE_SYSTEM *, WT_SESSION *, const char *, const char *, 
char ***, uint32_t *); static int gcp_object_list_add(const gcp_store &, char ***, const std::vector<std::string> &, const uint32_t) __attribute__((__unused__)); static int gcp_object_list_single(WT_FILE_SYSTEM *, WT_SESSION *, const char *, const char *, char ***, uint32_t *) __attribute__((__unused__)); static int gcp_object_list_free(WT_FILE_SYSTEM *, WT_SESSION *, char **, uint32_t) __attribute__((__unused__)); +static int gcp_terminate(WT_STORAGE_SOURCE *, WT_SESSION *) __attribute__((__unused__)); static int -gcp_customize_file_system(WT_STORAGE_SOURCE *store, WT_SESSION *session, const char *bucket, - const char *auth_token, const char *config, WT_FILE_SYSTEM **file_system) +gcp_customize_file_system(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, + const char *bucket, const char *auth_file, const char *config, WT_FILE_SYSTEM **file_system) { - WT_UNUSED(store); - WT_UNUSED(session); - WT_UNUSED(bucket); - WT_UNUSED(auth_token); - WT_UNUSED(config); - WT_UNUSED(file_system); + // Check if bucket name is given + if (bucket == nullptr || strlen(bucket) == 0) { + std::cerr << "gcp_customize_file_system: bucket not specified." << std::endl; + return EINVAL; + } + + // Fail if there is no authentication provided. + if (auth_file == nullptr || strlen(auth_file) == 0) { + std::cerr << "gcp_customize_file_system: auth_file not specified." << std::endl; + return EINVAL; + } + + if (std::filesystem::path(auth_file).extension() != ".json") { + std::cerr << "gcp_customize_file_system: improper auth_file: " + std::string(auth_file) + + " should be a .json file." + << std::endl; + return EINVAL; + } + + gcp_store *gcp = reinterpret_cast<gcp_store *>(storage_source); + int ret; + + // Get any prefix to be used for the object keys. 
+ WT_CONFIG_ITEM obj_prefix_conf; + std::string obj_prefix; + if ((ret = gcp->wt_api->config_get_string( + gcp->wt_api, session, config, "prefix", &obj_prefix_conf)) == 0) + obj_prefix = std::string(obj_prefix_conf.str, obj_prefix_conf.len); + else if (ret != WT_NOTFOUND) { + std::cerr << "gcp_customize_file_system: error parsing config for object prefix." + << std::endl; + return ret; + } + + // Fetch the native WiredTiger file system. + WT_FILE_SYSTEM *wt_file_system; + if ((ret = gcp->wt_api->file_system_get(gcp->wt_api, session, &wt_file_system)) != 0) { + std::cerr << "gcp_customize_file_system: failed to fetch the native WireTiger file system" + << std::endl; + return ret; + } + + // Create the file system and allocate memory for file system. + gcp_file_system *fs; + try { + fs = new gcp_file_system; + } catch (std::bad_alloc &e) { + std::cerr << "gcp_customize_file_system: " << e.what() << std::endl; + return ENOMEM; + } + + // Set variables specific to GCP. + fs->storage_source = gcp; + fs->wt_file_system = wt_file_system; + fs->home_dir = session->connection->get_home(session->connection); + + // Create connection to google cloud. + try { + fs->gcp_conn = std::make_unique<gcp_connection>(bucket, obj_prefix); + } catch (std::invalid_argument &e) { + std::cerr << "gcp_customize_file_system: " << e.what() << std::endl; + return EINVAL; + } + + // Map google cloud functions to the file system. + fs->file_system.fs_directory_list = gcp_object_list; + fs->file_system.fs_directory_list_single = gcp_object_list_single; + fs->file_system.fs_directory_list_free = gcp_object_list_free; + fs->file_system.terminate = gcp_file_system_terminate; + fs->file_system.fs_exist = gcp_file_exists; + fs->file_system.fs_open_file = gcp_file_open; + fs->file_system.fs_remove = gcp_remove; + fs->file_system.fs_rename = gcp_rename; + fs->file_system.fs_size = gcp_file_size; + + // Add to the list of the active file systems. Lock will be freed when the scope is exited. 
+ { + std::lock_guard<std::mutex> lock_guard(gcp->fs_list_mutex); + gcp->fs_list.push_back(fs); + } + + *file_system = &fs->file_system; + + return 0; +} + +static int +gcp_add_reference(WT_STORAGE_SOURCE *storage_source) +{ + gcp_store *gcp = reinterpret_cast<gcp_store *>(storage_source); + + if (gcp->reference_count == 0) { + std::cerr << "gcp_add_reference: gcp storage source extension hasn't been initialized." + << std::endl; + return EINVAL; + } + + if (gcp->reference_count + 1 == 0) { + std::cerr << "gcp_add_reference: adding reference will overflow reference count." + << std::endl; + return EINVAL; + } + + ++gcp->reference_count; return 0; } +// File handle close. static int -gcp_add_reference(WT_STORAGE_SOURCE *store) +gcp_file_close(WT_FILE_HANDLE *file_handle, WT_SESSION *session) { - WT_UNUSED(store); + WT_UNUSED(file_handle); + WT_UNUSED(session); return 0; } @@ -81,9 +194,22 @@ gcp_add_reference(WT_STORAGE_SOURCE *store) static int gcp_file_system_terminate(WT_FILE_SYSTEM *file_system, WT_SESSION *session) { - WT_UNUSED(file_system); + gcp_file_system *fs = reinterpret_cast<gcp_file_system *>(file_system); + gcp_store *gcp = reinterpret_cast<gcp_file_system *>(fs)->storage_source; + WT_UNUSED(session); + // Remove the current filesystem from the active filesystems list. The lock will be freed when + // the scope is exited. + { + std::lock_guard<std::mutex> lock_guard(gcp->fs_list_mutex); + // Erase remove idiom is used here to remove specific file system. 
+ gcp->fs_list.erase( + std::remove(gcp->fs_list.begin(), gcp->fs_list.end(), fs), gcp->fs_list.end()); + } + + delete (fs); + return 0; } @@ -166,16 +292,31 @@ gcp_rename(WT_FILE_SYSTEM *file_system, WT_SESSION *session, const char *from, c } static int -gcp_file_size(WT_FILE_HANDLE *file_handle, WT_SESSION *session, wt_off_t *sizep) +gcp_file_size(WT_FILE_SYSTEM *file_system, WT_SESSION *session, const char *name, wt_off_t *sizep) { - WT_UNUSED(file_handle); + WT_UNUSED(file_system); WT_UNUSED(session); + WT_UNUSED(name); WT_UNUSED(sizep); return 0; } static int +gcp_object_list(WT_FILE_SYSTEM *file_system, WT_SESSION *session, const char *directory, + const char *prefix, char ***object_list, uint32_t *count) +{ + WT_UNUSED(file_system); + WT_UNUSED(session); + WT_UNUSED(directory); + WT_UNUSED(prefix); + WT_UNUSED(object_list); + WT_UNUSED(count); + + return 0; +} + +static int gcp_object_list_add(const gcp_store &gcp_, char ***object_list, const std::vector<std::string> &objects, const uint32_t count) { @@ -213,11 +354,68 @@ gcp_object_list_free( return 0; } -int -wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) +static int +gcp_terminate(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session) { - WT_UNUSED(connection); - WT_UNUSED(config); + gcp_store *gcp = reinterpret_cast<gcp_store *>(storage_source); + + if (--gcp->reference_count != 0) + return 0; + + /* + * Terminate any active filesystems. There are no references to the storage source, so it is + * safe to walk the active filesystem list without a lock. The removal from the list happens + * under a lock. Also, removal happens from the front and addition at the end, so we are safe. + */ + while (!gcp->fs_list.empty()) { + gcp_file_system *fs = gcp->fs_list.front(); + gcp_file_system_terminate(&fs->file_system, session); + } + + std::cout << "gcp_terminate: terminated GCP storage source." 
<< std::endl; + delete (gcp); return 0; } + +int +wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) +{ + gcp_store *gcp; + WT_CONFIG_ITEM v; + + gcp = new gcp_store; + gcp->wt_api = connection->get_extension_api(connection); + int ret = gcp->wt_api->config_get(gcp->wt_api, nullptr, config, "verbose.tiered", &v); + + if (ret == 0 && v.val >= WT_VERBOSE_ERROR && v.val <= WT_VERBOSE_DEBUG_5) { + gcp->verbose = (WT_VERBOSE_LEVEL)v.val; + } else if (ret != WT_NOTFOUND) { + std::cerr << "wiredtiger_extension_init: error parsing config for verbosity level." << v.val + << std::endl; + delete (gcp); + return (ret != 0 ? ret : EINVAL); + } + + // Allocate a gcp storage structure, with a WT_STORAGE structure as the first field. + // This allows us to treat references to either type of structure as a reference to the other + // type. + gcp->storage_source.ss_customize_file_system = gcp_customize_file_system; + gcp->storage_source.ss_add_reference = gcp_add_reference; + gcp->storage_source.terminate = gcp_terminate; + gcp->storage_source.ss_flush = gcp_flush; + gcp->storage_source.ss_flush_finish = gcp_flush_finish; + + // The first reference is implied by the call to add_storage_source. + gcp->reference_count = 1; + + // Load the storage. + if ((ret = connection->add_storage_source( + connection, "gcp_store", &gcp->storage_source, nullptr)) != 0) { + std::cerr << "wiredtiger_extension_init: could not load GCP storage source, shutting down." 
+ << std::endl; + delete (gcp); + } + + return ret; +} diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index 6fda752ac10..c4b91ee05bb 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-master", - "commit": "82a6defadabfe59093df8346ebce7200a2ac55ff" + "commit": "7cef0e9b3ea2ee63b54b2e0f592cb0a18e25720b" } diff --git a/src/third_party/wiredtiger/test/suite/test_tiered19.py b/src/third_party/wiredtiger/test/suite/test_tiered19.py index f995f55e803..8c2c026ac1c 100644 --- a/src/third_party/wiredtiger/test/suite/test_tiered19.py +++ b/src/third_party/wiredtiger/test/suite/test_tiered19.py @@ -30,10 +30,11 @@ import random, string, wiredtiger, wttest from helper_tiered import get_auth_token, TieredConfigMixin from wtscenario import make_scenarios +file_system = wiredtiger.FileSystem + # test_tiered19.py # Testing storage source functionality for the Azure Storage Store # and Google Cloud extensions. - class test_tiered19(wttest.WiredTigerTestCase, TieredConfigMixin): tiered_storage_sources = [ @@ -46,23 +47,78 @@ class test_tiered19(wttest.WiredTigerTestCase, TieredConfigMixin): ('gcp_store', dict(is_tiered = True, is_local_storage = False, auth_token = get_auth_token('gcp_store'), - bucket = 'pythontest', + bucket = 'test_tiered19', bucket_prefix = "pfx_", ss_name = 'gcp_store')), ] # Make scenarios for different cloud service providers scenarios = make_scenarios(tiered_storage_sources) + + def get_storage_source(self): + return self.conn.get_storage_source(self.ss_name) + + def get_fs_config(self, prefix = '', cache_dir = ''): + conf = '' + if prefix: + conf += ',prefix=' + prefix + if cache_dir: + conf += ',cache_directory=' + cache_dir + return conf # Load the storage source extensions. 
def conn_extensions(self, extlist): TieredConfigMixin.conn_extensions(self, extlist) - def get_storage_source(self): - return self.conn.get_storage_source(self.ss_name) - - def get_fs_config(self, prefix = ''): - return ",prefix=" + prefix + def test_gcp_filesystem(self): + # Test basic functionality of the storage source API, calling + # each supported method in the API at least once. + + session = self.session + ss = self.get_storage_source() + + if (self.ss_name != 'gcp_store'): + return + + # Since this class has multiple tests, append test name to the prefix to + # avoid namespace collision. 0th element on the stack is the current function. + prefix = self.bucket_prefix.join(random.choices(string.ascii_letters + string.digits, k=10)) + + # Success case: an existing accessible bucket has been provided with the correct credentials file. + fs = ss.ss_customize_file_system(session, self.bucket, self.auth_token, self.get_fs_config(prefix)) + + # Error cases. + err_msg = 'Exception: Invalid argument' + + # Do not provide bucket name and credentials. + self.assertRaisesHavingMessage(wiredtiger.WiredTigerError, + lambda: ss.ss_customize_file_system( + session, None, None, self.get_fs_config(prefix)), err_msg) + # Provide empty bucket string. + self.assertRaisesHavingMessage(wiredtiger.WiredTigerError, + lambda: ss.ss_customize_file_system( + session, "", None, self.get_fs_config(prefix)), err_msg) + # Provide credentials in incorrect form. + self.assertRaisesHavingMessage(wiredtiger.WiredTigerError, + lambda: ss.ss_customize_file_system( + session, self.bucket, "gcp_cred", self.get_fs_config(prefix)), err_msg) + # Provide empty credentials string. + self.assertRaisesHavingMessage(wiredtiger.WiredTigerError, + lambda: ss.ss_customize_file_system( + session, self.bucket, "", self.get_fs_config(prefix)), err_msg) + # Provide a bucket name that does not exist. 
+ non_exist_bucket = "non_exist" + self.assertRaisesHavingMessage(wiredtiger.WiredTigerError, + lambda: ss.ss_customize_file_system( + session, non_exist_bucket, None, self.get_fs_config(prefix)), err_msg) + # Provide a bucket name that exists but we do not have access to. + no_access_bucket = "test_cred" + self.assertRaisesHavingMessage(wiredtiger.WiredTigerError, + lambda: ss.ss_customize_file_system( + session, no_access_bucket, None, self.get_fs_config(prefix)), err_msg) + + fs.terminate(session) + ss.terminate(session) def test_ss_file_systems_gcp_and_azure(self): if self.ss_name != "azure_store": |