diff options
author | Clarisse Cheah <clarisse.cheah@mongodb.com> | 2023-02-06 00:48:22 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-02-06 01:25:32 +0000 |
commit | bca80a1ea9507681baee65d9257dee976936b9f3 (patch) | |
tree | 52e3a44dc6fbd42c7828f5fef571b1d2e9d1a42d /src | |
parent | 50975d9996e0f36dcebcd186f9b56b092478815f (diff) | |
download | mongo-bca80a1ea9507681baee65d9257dee976936b9f3.tar.gz |
Import wiredtiger: 32c207223a1841bb3a40c893c57025a79e20a2a1 from branch mongodb-master
ref: 79615a8905..32c207223a
for: 6.3.0-rc0
WT-10357 Implement customize_file_system() and add_reference() in storage source class for Azure
Diffstat (limited to 'src')
3 files changed, 192 insertions, 31 deletions
diff --git a/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_storage_source.cpp b/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_storage_source.cpp index 9da0cb48a88..989450ff712 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_storage_source.cpp +++ b/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_storage_source.cpp @@ -27,6 +27,9 @@ */ #include <wiredtiger.h> #include <wiredtiger_ext.h> +#include <algorithm> +#include <memory> +#include <mutex> #include <vector> #include "azure_connection.h" @@ -36,14 +39,22 @@ struct azure_file_system; struct azure_file_handle; struct azure_store { WT_STORAGE_SOURCE store; + WT_EXTENSION_API *wt_api; + + std::mutex fs_mutex; std::vector<azure_file_system *> azure_fs; + uint32_t reference_count; }; struct azure_file_system { WT_FILE_SYSTEM fs; azure_store *store; + WT_FILE_SYSTEM *wt_fs; + + std::mutex fh_mutex; std::vector<azure_file_handle> azure_fh; - azure_connection *azure_conn; + std::unique_ptr<azure_connection> azure_conn; + std::string home_dir; }; struct azure_file_handle { @@ -52,10 +63,10 @@ struct azure_file_handle { }; // WT_STORAGE_SOURCE Interface -static int azure_customize_file_system(WT_STORAGE_SOURCE *, WT_SESSION *, const char *, - const char *, const char *, WT_FILE_SYSTEM **) __attribute__((__unused__)); -static int azure_add_reference(WT_STORAGE_SOURCE *) __attribute__((__unused__)); -static int azure_terminate(WT_FILE_SYSTEM *, WT_SESSION *) __attribute__((__unused__)); +static int azure_customize_file_system( + WT_STORAGE_SOURCE *, WT_SESSION *, const char *, const char *, const char *, WT_FILE_SYSTEM **); +static int azure_add_reference(WT_STORAGE_SOURCE *); +static int azure_terminate(WT_STORAGE_SOURCE *, WT_SESSION *); static int azure_flush(WT_STORAGE_SOURCE *, WT_SESSION *, WT_FILE_SYSTEM *, const char *, const char *, const char *) __attribute__((__unused__)); static int azure_flush_finish(WT_STORAGE_SOURCE *, WT_SESSION *, WT_FILE_SYSTEM *, const char *, @@ -68,7 +79,7 @@ static int azure_object_list_single(WT_FILE_SYSTEM *, WT_SESSION *, const char * char ***, uint32_t *) __attribute__((__unused__)); static int azure_object_list_free(WT_FILE_SYSTEM *, WT_SESSION *, char **, uint32_t) __attribute__((__unused__)); -static int azure_file_system_terminate(WT_FILE_SYSTEM *, WT_SESSION *) __attribute__((__unused__)); +static int azure_file_system_terminate(WT_FILE_SYSTEM *, WT_SESSION *); static int azure_file_exists(WT_FILE_SYSTEM *, WT_SESSION *, const char *, bool *) __attribute__((__unused__)); static int azure_remove(WT_FILE_SYSTEM *, WT_SESSION *, const char *, uint32_t) @@ -87,26 +98,86 @@ static int azure_file_read(WT_FILE_HANDLE *, WT_SESSION *, wt_off_t, size_t, voi __attribute__((__unused__)); static int azure_file_size(WT_FILE_HANDLE *, WT_SESSION *, wt_off_t *) __attribute__((__unused__)); +// Return a customised file system to access the Azure storage source. static int azure_customize_file_system(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, - const char *bucket_name, const char *auth_token, const char *config, - WT_FILE_SYSTEM **file_systemp) + const char *bucket, const char *auth_token, const char *config, WT_FILE_SYSTEM **file_system) { - WT_UNUSED(storage_source); - WT_UNUSED(session); - WT_UNUSED(bucket_name); - WT_UNUSED(auth_token); - WT_UNUSED(config); - WT_UNUSED(file_systemp); - - return 0; + if (bucket == nullptr || strlen(bucket) == 0) { + std::cerr << "azure_customize_file_system: Bucket not specified." << std::endl; + return EINVAL; + } + + // Get any prefix to be used for the object keys. + WT_CONFIG_ITEM obj_prefix_config; + std::string obj_prefix; + + // Get the value of the config key from the string + azure_store *azure_storage = reinterpret_cast<azure_store *>(storage_source); + int ret; + if ((ret = azure_storage->wt_api->config_get_string( + azure_storage->wt_api, session, config, "prefix", &obj_prefix_config)) == 0) + obj_prefix = std::string(obj_prefix_config.str, obj_prefix_config.len); + else if (ret != WT_NOTFOUND) { + std::cerr << "azure_customize_file_system: error parsing config for object prefix." + << std::endl; + return ret; + } + + // Fetch the native WT file system. + WT_FILE_SYSTEM *wt_file_system; + if ((ret = azure_storage->wt_api->file_system_get( + azure_storage->wt_api, session, &wt_file_system)) != 0) + return ret; + + // Create file system and allocate memory for the file system. + azure_file_system *azure_fs; + try { + azure_fs = new azure_file_system; + } catch (std::bad_alloc &e) { + std::cerr << std::string("azure_customize_file_system: ") + e.what() << std::endl; + return ENOMEM; + } + + // Initialise references to azure storage source, wt fs and home directory. + azure_fs->store = azure_storage; + azure_fs->wt_fs = wt_file_system; + azure_fs->home_dir = session->connection->get_home(session->connection); + try { + azure_fs->azure_conn = std::make_unique<azure_connection>(bucket, obj_prefix); + } catch (std::runtime_error &e) { + std::cerr << std::string("azure_customize_file_system: ") + e.what() << std::endl; + return ENOENT; + } + azure_fs->fs.fs_directory_list = azure_object_list; + azure_fs->fs.fs_directory_list_single = azure_object_list_single; + azure_fs->fs.fs_directory_list_free = azure_object_list_free; + azure_fs->fs.terminate = azure_file_system_terminate; + azure_fs->fs.fs_exist = azure_file_exists; + azure_fs->fs.fs_open_file = azure_file_open; + azure_fs->fs.fs_remove = azure_remove; + azure_fs->fs.fs_rename = azure_rename; + azure_fs->fs.fs_size = azure_object_size; + + // Add to the list of the active file systems. Lock will be freed when the scope is exited. + { + std::lock_guard<std::mutex> lock_guard(azure_storage->fs_mutex); + azure_storage->azure_fs.push_back(azure_fs); + } + *file_system = &azure_fs->fs; + return ret; } +// Add a reference to the storage source so we can reference count to know when to terminate. static int azure_add_reference(WT_STORAGE_SOURCE *storage_source) { - WT_UNUSED(storage_source); - + azure_store *azure_storage = reinterpret_cast<azure_store *>(storage_source); + if (azure_storage->reference_count == 0 || azure_storage->reference_count + 1 == 0) { + std::cerr << "azure_add_reference: missing reference or overflow." << std::endl; + return EINVAL; + } + ++azure_storage->reference_count; return 0; } @@ -138,12 +209,26 @@ azure_flush_finish(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, return 0; } +// Discard any resources on termination. static int -azure_terminate(WT_FILE_SYSTEM *file_system, WT_SESSION *session) +azure_terminate(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session) { - WT_UNUSED(file_system); - WT_UNUSED(session); - + azure_store *azure_storage = reinterpret_cast<azure_store *>(storage_source); + + if (--azure_storage->reference_count != 0) + return 0; + + /* + * Terminate any active filesystems. There are no references to the storage source, so it is + * safe to walk the active filesystem list without a lock. The removal from the list happens + * under a lock. Also, removal happens from the front and addition at the end, so we are safe. + */ + while (!azure_storage->azure_fs.empty()) { + WT_FILE_SYSTEM *fs = reinterpret_cast<WT_FILE_SYSTEM *>(azure_storage->azure_fs.front()); + azure_file_system_terminate(fs, session); + } + + delete azure_storage; return 0; } @@ -182,17 +267,29 @@ azure_object_list_free( WT_UNUSED(file_system); WT_UNUSED(session); WT_UNUSED(dirlist); - WT_UNUSED(count); return 0; } +// Discard any resources on termination of the file system. static int azure_file_system_terminate(WT_FILE_SYSTEM *file_system, WT_SESSION *session) { - WT_UNUSED(file_system); + azure_file_system *azure_fs = reinterpret_cast<azure_file_system *>(file_system); + azure_store *azure_storage = azure_fs->store; + WT_UNUSED(session); + // Remove from the active file system list. The lock will be freed when the scope is exited. + { + std::lock_guard<std::mutex> lock_guard(azure_storage->fs_mutex); + // Erase-remove idiom used to eliminate specific file system. + azure_storage->azure_fs.erase( + std::remove(azure_storage->azure_fs.begin(), azure_storage->azure_fs.end(), azure_fs), + azure_storage->azure_fs.end()); + } + azure_fs->azure_conn.reset(); + free(azure_fs); return 0; } @@ -299,11 +396,30 @@ azure_file_size(WT_FILE_HANDLE *file_handle, WT_SESSION *session, wt_off_t *size return 0; } +// An Azure storage source library - creates an entry point to the Azure extension. int wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) { - WT_UNUSED(connection); - WT_UNUSED(config); - + azure_store *azure_storage = new azure_store; + azure_storage->wt_api = connection->get_extension_api(connection); + + azure_storage->store.ss_customize_file_system = azure_customize_file_system; + azure_storage->store.ss_add_reference = azure_add_reference; + azure_storage->store.terminate = azure_terminate; + azure_storage->store.ss_flush = azure_flush; + azure_storage->store.ss_flush_finish = azure_flush_finish; + + // The first reference is implied by the call to add_storage_source. + azure_storage->reference_count = 1; + + // Load the storage. + if ((connection->add_storage_source( + connection, "azure_store", &azure_storage->store, nullptr)) != 0) { + std::cerr + << "wiredtiger_extension_init: Could not load Azure storage source, shutting down." + << std::endl; + delete azure_storage; + return -1; + } return 0; } diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index b04da175cf1..8294b3902dc 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-master", - "commit": "79615a890553eced88b10b115a3e2419331d9ae6" + "commit": "32c207223a1841bb3a40c893c57025a79e20a2a1" } diff --git a/src/third_party/wiredtiger/test/suite/test_tiered19.py b/src/third_party/wiredtiger/test/suite/test_tiered19.py index 514b1c8701e..f995f55e803 100644 --- a/src/third_party/wiredtiger/test/suite/test_tiered19.py +++ b/src/third_party/wiredtiger/test/suite/test_tiered19.py @@ -26,10 +26,14 @@ # ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR # OTHER DEALINGS IN THE SOFTWARE. -import wttest +import random, string, wiredtiger, wttest from helper_tiered import get_auth_token, TieredConfigMixin from wtscenario import make_scenarios +# test_tiered19.py +# Testing storage source functionality for the Azure Storage Store +# and Google Cloud extensions. + class test_tiered19(wttest.WiredTigerTestCase, TieredConfigMixin): tiered_storage_sources = [ @@ -50,8 +54,49 @@ class test_tiered19(wttest.WiredTigerTestCase, TieredConfigMixin): # Make scenarios for different cloud service providers scenarios = make_scenarios(tiered_storage_sources) + # Load the storage source extensions. def conn_extensions(self, extlist): TieredConfigMixin.conn_extensions(self, extlist) - def test_gcp_and_azure(self): - pass
\ No newline at end of file + def get_storage_source(self): + return self.conn.get_storage_source(self.ss_name) + + def get_fs_config(self, prefix = ''): + return ",prefix=" + prefix + + def test_ss_file_systems_gcp_and_azure(self): + if self.ss_name != "azure_store": + return + session = self.session + ss = self.get_storage_source() + + prefix_1 = self.bucket_prefix.join( + random.choices(string.ascii_letters + string.digits, k=10)) + prefix_2 = self.bucket_prefix.join( + random.choices(string.ascii_letters + string.digits, k=10)) + + # Test the customize file system function errors when there is an invalid bucket. + err_msg = '/Exception: Invalid argument/' + self.assertRaisesHavingMessage(wiredtiger.WiredTigerError, + lambda: ss.ss_customize_file_system( + session, "", None, self.get_fs_config(prefix_1)), err_msg) + + bad_bucket = "./bucket_BAD" + err_msg = '/Exception: No such file or directory/' + self.assertRaisesHavingMessage(wiredtiger.WiredTigerError, + lambda: ss.ss_customize_file_system( + session, bad_bucket, None, self.get_fs_config(prefix_1)), err_msg) + + # Test the customize file system function works when there is a valid bucket. + azure_fs_1 = ss.ss_customize_file_system( + session, self.bucket, None, self.get_fs_config(prefix_1)) + + # Create another file systems to make sure that terminate works. + ss.ss_customize_file_system( + session, self.bucket, None, self.get_fs_config(prefix_2)) + + # Test that azure file system terminate succeeds. + self.assertEqual(azure_fs_1.terminate(session), 0) + + # Test that azure storage source terminate succeeds. + self.assertEqual(ss.terminate(session), 0) |