diff options
author | Luke Chen <luke.chen@mongodb.com> | 2023-02-13 09:36:00 +1100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-02-12 23:23:59 +0000 |
commit | 6fd87f49a0f0a2a02d1962c2ab4198b1e71f8df3 (patch) | |
tree | 2a13f8a8cf18e92dffdd5749f949e271a6d09c5b /src | |
parent | 9a3f1ee9fade3be3e3334b625000530c098371c0 (diff) | |
download | mongo-6fd87f49a0f0a2a02d1962c2ab4198b1e71f8df3.tar.gz |
Import wiredtiger: 89a336be04c4f934524963880c014ae3ec6b1064 from branch mongodb-master
ref: cb3b25c307..89a336be04
for: 7.0.0-rc0
WT-10366 Implement open(), read and close() in file system class for Azure
Diffstat (limited to 'src')
6 files changed, 229 insertions, 44 deletions
diff --git a/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_connection.cpp b/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_connection.cpp index 6c81aa1fb6c..21d0267be98 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_connection.cpp +++ b/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_connection.cpp @@ -162,10 +162,13 @@ azure_connection::read_object( return 0; } +// Check if an object exists in the bucket and find the object size. int -azure_connection::object_exists(const std::string &object_key, bool &exists) const +azure_connection::object_exists( + const std::string &object_key, bool &exists, size_t &object_size) const { exists = false; + object_size = 0; std::string obj = _object_prefix + object_key; auto list_blob_response = _azure_client.ListBlobs(); @@ -177,6 +180,7 @@ azure_connection::object_exists(const std::string &object_key, bool &exists) con if (blob_item.IsDeleted) { return -1; } + object_size = blob_item.BlobSize; exists = true; break; } @@ -184,6 +188,7 @@ azure_connection::object_exists(const std::string &object_key, bool &exists) con return 0; } +// Check if a bucket exists in the Azure storage source. int azure_connection::bucket_exists(bool &exists) const { diff --git a/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_connection.h b/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_connection.h index 75abd76aad8..d03c1c96f06 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_connection.h +++ b/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_connection.h @@ -60,7 +60,7 @@ class azure_connection { int put_object(const std::string &object_key, const std::string &file_path) const; int delete_object(const std::string &object_key) const; int read_object(const std::string &object_key, int64_t offset, size_t len, void *buf) const; - int object_exists(const std::string &object_key, bool &exists) const; + int object_exists(const std::string &object_key, bool &exists, size_t &object_size) const; private: const std::string _bucket_name; diff --git a/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_storage_source.cpp b/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_storage_source.cpp index 908e0d06003..f7583b1c923 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_storage_source.cpp +++ b/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_storage_source.cpp @@ -28,6 +28,7 @@ #include <wiredtiger.h> #include <wiredtiger_ext.h> #include <algorithm> +#include <fstream> #include <filesystem> #include <iostream> #include <memory> @@ -54,14 +55,16 @@ struct azure_file_system { WT_FILE_SYSTEM *wt_fs; std::mutex fh_mutex; - std::vector<azure_file_handle> azure_fh; + std::vector<azure_file_handle *> azure_fh; std::unique_ptr<azure_connection> azure_conn; std::string home_dir; }; struct azure_file_handle { WT_FILE_HANDLE fh; - azure_store *store; + azure_file_system *fs; + std::string name; + uint32_t reference_count; }; // WT_STORAGE_SOURCE Interface @@ -82,23 +85,21 @@ static int azure_object_list_single(WT_FILE_SYSTEM *, WT_SESSION *, const char * static int azure_object_list_free(WT_FILE_SYSTEM *, WT_SESSION *, char **, uint32_t) __attribute__((__unused__)); static int azure_file_system_terminate(WT_FILE_SYSTEM *, WT_SESSION *); -static int azure_file_exists(WT_FILE_SYSTEM *, WT_SESSION *, const char *, bool *) - __attribute__((__unused__)); +static int azure_file_exists(WT_FILE_SYSTEM *, WT_SESSION *, const char *, bool *); static int azure_remove(WT_FILE_SYSTEM *, WT_SESSION *, const char *, uint32_t) __attribute__((__unused__)); static int azure_rename(WT_FILE_SYSTEM *, WT_SESSION *, const char *, const char *, uint32_t) __attribute__((__unused__)); static int azure_object_size(WT_FILE_SYSTEM *, WT_SESSION *, const char *, wt_off_t *) __attribute__((__unused__)); -static int azure_file_open(WT_FILE_SYSTEM *, WT_SESSION *, const char *, WT_FS_OPEN_FILE_TYPE, - uint32_t, WT_FILE_HANDLE **) __attribute__((__unused__)); +static int azure_file_open( + WT_FILE_SYSTEM *, WT_SESSION *, const char *, WT_FS_OPEN_FILE_TYPE, uint32_t, WT_FILE_HANDLE **); // WT_FILE_HANDLE Interface -static int azure_file_close(WT_FILE_HANDLE *, WT_SESSION *) __attribute__((__unused__)); -static int azure_file_lock(WT_FILE_HANDLE *, WT_SESSION *, bool) __attribute__((__unused__)); -static int azure_file_read(WT_FILE_HANDLE *, WT_SESSION *, wt_off_t, size_t, void *) - __attribute__((__unused__)); -static int azure_file_size(WT_FILE_HANDLE *, WT_SESSION *, wt_off_t *) __attribute__((__unused__)); +static int azure_file_close(WT_FILE_HANDLE *, WT_SESSION *); +static int azure_file_lock(WT_FILE_HANDLE *, WT_SESSION *, bool); +static int azure_file_read(WT_FILE_HANDLE *, WT_SESSION *, wt_off_t, size_t, void *); +static int azure_file_size(WT_FILE_HANDLE *, WT_SESSION *, wt_off_t *); // Return a customised file system to access the Azure storage source. static int @@ -117,6 +118,7 @@ azure_customize_file_system(WT_STORAGE_SOURCE *storage_source, WT_SESSION *sessi // Get the value of the config key from the string azure_store *azure_storage = reinterpret_cast<azure_store *>(storage_source); int ret; + if ((ret = azure_storage->wt_api->config_get_string( azure_storage->wt_api, session, config, "prefix", &obj_prefix_config)) == 0) obj_prefix = std::string(obj_prefix_config.str, obj_prefix_config.len); @@ -175,6 +177,7 @@ static int azure_add_reference(WT_STORAGE_SOURCE *storage_source) { azure_store *azure_storage = reinterpret_cast<azure_store *>(storage_source); + if (azure_storage->reference_count == 0 || azure_storage->reference_count + 1 == 0) { std::cerr << "azure_add_reference: missing reference or overflow." << std::endl; return EINVAL; @@ -188,11 +191,12 @@ static int azure_flush(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, WT_FILE_SYSTEM *file_system, const char *source, const char *object, const char *config) { + azure_file_system *azure_fs = reinterpret_cast<azure_file_system *>(file_system); + WT_FILE_SYSTEM *wtFileSystem = azure_fs->wt_fs; + WT_UNUSED(storage_source); WT_UNUSED(source); WT_UNUSED(config); - azure_file_system *azure_fs = reinterpret_cast<azure_file_system *>(file_system); - WT_FILE_SYSTEM *wtFileSystem = azure_fs->wt_fs; // std::filesystem::canonical will throw an exception if object does not exist so // check if the object exists. @@ -230,24 +234,29 @@ static int azure_flush_finish(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, WT_FILE_SYSTEM *file_system, const char *source, const char *object, const char *config) { + azure_file_system *azure_fs = reinterpret_cast<azure_file_system *>(file_system); + WT_UNUSED(storage_source); WT_UNUSED(session); WT_UNUSED(config); WT_UNUSED(source); - azure_file_system *azure_fs = reinterpret_cast<azure_file_system *>(file_system); std::cout << "azure_flush_finish: Checking object: " << object << " exists in Azure." << std::endl; // Check whether the object exists in the cloud. bool exists_cloud = false; - azure_fs->azure_conn->object_exists(object, exists_cloud); + size_t size = 0; + azure_fs->azure_conn->object_exists(object, exists_cloud, size); if (!exists_cloud) { std::cerr << "azure_flush_finish: Object: " << object << " does not exist in Azure." << std::endl; return ENOENT; } std::cout << "azure_flush_finish: Object: " << object << " exists in Azure." << std::endl; + + WT_UNUSED(size); + return 0; } @@ -271,6 +280,7 @@ azure_terminate(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session) } delete azure_storage; + return 0; } @@ -332,16 +342,25 @@ azure_file_system_terminate(WT_FILE_SYSTEM *file_system, WT_SESSION *session) } azure_fs->azure_conn.reset(); free(azure_fs); + return 0; } +// Check if the object (file) exists in the Azure storage source. static int azure_file_exists(WT_FILE_SYSTEM *file_system, WT_SESSION *session, const char *name, bool *existp) { - WT_UNUSED(file_system); + azure_file_system *azure_fs = reinterpret_cast<azure_file_system *>(file_system); + int ret; + size_t size = 0; + WT_UNUSED(session); - WT_UNUSED(name); - WT_UNUSED(existp); + WT_UNUSED(size); + + if ((ret = azure_fs->azure_conn->object_exists(std::string(name), *existp, size)) != 0) { + std::cerr << "azure_file_open: object_exists request to Azure failed." << std::endl; + return ret; + } return 0; } @@ -382,32 +401,125 @@ azure_object_size( return 0; } +// File open for the Azure storage source. static int azure_file_open(WT_FILE_SYSTEM *file_system, WT_SESSION *session, const char *name, WT_FS_OPEN_FILE_TYPE file_type, uint32_t flags, WT_FILE_HANDLE **file_handlep) { - WT_UNUSED(file_system); + azure_file_system *azure_fs = reinterpret_cast<azure_file_system *>(file_system); + WT_UNUSED(session); - WT_UNUSED(name); - WT_UNUSED(file_type); - WT_UNUSED(flags); - WT_UNUSED(file_handlep); + + // Azure only supports opening the file in read only mode. + if ((flags & WT_FS_OPEN_READONLY) == 0 || (flags & WT_FS_OPEN_CREATE) != 0) { + std::cerr << "azure_file_open: read-only access required." << std::endl; + return EINVAL; + } + + // Only data files and regular files should be opened. + if (file_type != WT_FS_OPEN_FILE_TYPE_DATA && file_type != WT_FS_OPEN_FILE_TYPE_REGULAR) { + std::cerr << "azure_file_open: only data file and regular types supported." << std::endl; + return EINVAL; + } + + // Check if object exists. + bool exists; + int ret; + size_t size = 0; + + WT_UNUSED(size); + + if ((ret = azure_fs->azure_conn->object_exists(std::string(name), exists, size)) != 0) { + std::cerr << "azure_file_open: object_exists request to Azure failed." << std::endl; + return ret; + } + if (!exists) { + std::cerr << "azure_file_open: no such file named " << name << "." << std::endl; + return EINVAL; + } + + // Check if there is already an existing file handle open. + auto fh_iterator = std::find_if(azure_fs->azure_fh.begin(), azure_fs->azure_fh.end(), + [name](azure_file_handle *fh) { return fh->name.compare(name) == 0; }); + + // Active file handle for file exists, increment reference count. + if (fh_iterator != azure_fs->azure_fh.end()) { + (*fh_iterator)->reference_count++; + *file_handlep = reinterpret_cast<WT_FILE_HANDLE *>(*fh_iterator); + return 0; + } + + // No active file handle, create a new file handle. + azure_file_handle *azure_fh; + try { + azure_fh = new azure_file_handle; + } catch (std::bad_alloc &e) { + std::cerr << std::string("azure_file_open: ") + e.what() << std::endl; + return ENOMEM; + } + azure_fh->name = name; + azure_fh->reference_count = 1; + azure_fh->fs = azure_fs; + + // Define functions needed for Azure with read-only privilleges. + azure_fh->fh.close = azure_file_close; + azure_fh->fh.fh_advise = nullptr; + azure_fh->fh.fh_extend = nullptr; + azure_fh->fh.fh_extend_nolock = nullptr; + azure_fh->fh.fh_lock = azure_file_lock; + azure_fh->fh.fh_map = nullptr; + azure_fh->fh.fh_map_discard = nullptr; + azure_fh->fh.fh_unmap = nullptr; + azure_fh->fh.fh_read = azure_file_read; + azure_fh->fh.fh_size = azure_file_size; + azure_fh->fh.fh_sync = nullptr; + azure_fh->fh.fh_sync_nowait = nullptr; + azure_fh->fh.fh_truncate = nullptr; + azure_fh->fh.fh_write = nullptr; + azure_fh->fh.name = strdup(name); + + // Exclusive Access is required when adding file handles to list of file handles. + // lock_guard will unlock automatically when the scope is exited. + { + std::lock_guard<std::mutex> lock_guard(azure_fs->fh_mutex); + azure_fs->azure_fh.push_back(azure_fh); + } + *file_handlep = &azure_fh->fh; return 0; } +// File handle close. static int azure_file_close(WT_FILE_HANDLE *file_handle, WT_SESSION *session) { - WT_UNUSED(file_handle); + azure_file_handle *azure_fh = reinterpret_cast<azure_file_handle *>(file_handle); + WT_UNUSED(session); + // If there are other active instances of the file being open, do not close file handle. + if (--azure_fh->reference_count != 0) + return 0; + + // No more active instances of open file, close the file handle. + azure_file_system *azure_fs = azure_fh->fs; + { + std::lock_guard<std::mutex> lock_guard(azure_fs->fh_mutex); + // Erase-remove idiom to eliminate specific file handle + azure_fs->azure_fh.erase( + std::remove(azure_fs->azure_fh.begin(), azure_fs->azure_fh.end(), azure_fh), + azure_fs->azure_fh.end()); + } + return 0; } +// Lock/unlock a file. static int azure_file_lock(WT_FILE_HANDLE *file_handle, WT_SESSION *session, bool lock) { + // Since the file is in the cloud, locks are always granted because concurrent reads do not + // require a lock. WT_UNUSED(file_handle); WT_UNUSED(session); WT_UNUSED(lock); @@ -415,25 +527,43 @@ azure_file_lock(WT_FILE_HANDLE *file_handle, WT_SESSION *session, bool lock) return 0; } +// Read a file using Azure connection class read object functionality. static int azure_file_read( WT_FILE_HANDLE *file_handle, WT_SESSION *session, wt_off_t offset, size_t len, void *buf) { - WT_UNUSED(file_handle); + azure_file_handle *azure_fh = reinterpret_cast<azure_file_handle *>(file_handle); + azure_file_system *azure_fs = azure_fh->fs; + WT_UNUSED(session); - WT_UNUSED(offset); - WT_UNUSED(len); - WT_UNUSED(buf); + + int ret; + if ((ret = azure_fs->azure_conn->read_object(azure_fh->name, offset, len, buf) != 0)) { + std::cerr << "azure_file_read: read_object request to Azure failed." << std::endl; + return ret; + } return 0; } +// Get the size of a file in bytes. static int azure_file_size(WT_FILE_HANDLE *file_handle, WT_SESSION *session, wt_off_t *sizep) { - WT_UNUSED(file_handle); - WT_UNUSED(session); - WT_UNUSED(sizep); + azure_file_handle *azure_fh = reinterpret_cast<azure_file_handle *>(file_handle); + int ret; + bool exists; + size_t size = 0; + *sizep = 0; + + WT_UNUSED(exists); + + if ((ret = azure_fh->fs->azure_conn->object_exists(azure_fh->name, exists, size)) != 0) { + std::cerr << "azure_file_open: object_exists request to Azure failed." << std::endl; + return ret; + } + + *sizep = size; return 0; } diff --git a/src/third_party/wiredtiger/ext/storage_sources/azure_store/test/test_azure_connection.cpp b/src/third_party/wiredtiger/ext/storage_sources/azure_store/test/test_azure_connection.cpp index 14da1f3e9c7..f020cc787d0 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/azure_store/test/test_azure_connection.cpp +++ b/src/third_party/wiredtiger/ext/storage_sources/azure_store/test/test_azure_connection.cpp @@ -72,6 +72,7 @@ TEST_CASE("Testing Azure Connection Class", "azure-connection") auto azure_client = Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString( std::getenv("AZURE_STORAGE_CONNECTION_STRING"), "myblobcontainer1"); bool exists = false; + size_t object_size = 0; std::string obj_prefix = randomize_test_prefix(); @@ -104,11 +105,15 @@ TEST_CASE("Testing Azure Connection Class", "azure-connection") SECTION("Check object exists in Azure.", "[azure-connection]") { // Object exists so there should be 1 object. - REQUIRE(conn.object_exists(object_name, exists) == 0); + REQUIRE(conn.object_exists(object_name, exists, object_size) == 0); REQUIRE(exists == true); + auto blob_client = azure_client.GetBlockBlobClient(obj_prefix + object_name); + auto blob_properties = blob_client.GetProperties(); + size_t blob_size = blob_properties.Value.BlobSize; + REQUIRE(object_size == blob_size); // Object does not exist so there should be 0 objects. - REQUIRE(conn.object_exists(non_exist_object_key, exists) == 0); + REQUIRE(conn.object_exists(non_exist_object_key, exists, object_size) == 0); REQUIRE(exists == false); } diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index 29e0420cd34..a9c12587639 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-master", - "commit": "cb3b25c3072b5802a30674e600432e073c0a9396" + "commit": "89a336be04c4f934524963880c014ae3ec6b1064" } diff --git a/src/third_party/wiredtiger/test/suite/test_tiered19.py b/src/third_party/wiredtiger/test/suite/test_tiered19.py index 8bc7bebd05b..b138ab332a6 100644 --- a/src/third_party/wiredtiger/test/suite/test_tiered19.py +++ b/src/third_party/wiredtiger/test/suite/test_tiered19.py @@ -144,13 +144,16 @@ class test_tiered19(wttest.WiredTigerTestCase, TieredConfigMixin): session, bad_bucket, None, self.get_fs_config(prefix_1)), err_msg) # Test the customize file system function works when there is a valid bucket. - azure_fs_1 = ss.ss_customize_file_system( + azure_fs = ss.ss_customize_file_system( session, self.bucket, None, self.get_fs_config(prefix_1)) # Create another file systems to make sure that terminate works. ss.ss_customize_file_system( session, self.bucket, None, self.get_fs_config(prefix_2)) + # Check fs exist for non-existing object. + self.assertFalse(azure_fs.fs_exist(session, 'foobar')) + # We cannot use the file system to create files, it is readonly. # So use python I/O to build up the file. with open('foobar', 'wb') as f: @@ -158,19 +161,61 @@ class test_tiered19(wttest.WiredTigerTestCase, TieredConfigMixin): f.write(outbytes) # Flush valid file into Azure. - self.assertEqual(ss.ss_flush(session, azure_fs_1, 'foobar', 'foobar', None), 0) + self.assertEqual(ss.ss_flush(session, azure_fs, 'foobar', 'foobar', None), 0) # Check that file exists in Azure. - self.assertEqual(ss.ss_flush_finish(session, azure_fs_1, 'foobar', 'foobar', None), 0) + self.assertEqual(ss.ss_flush_finish(session, azure_fs, 'foobar', 'foobar', None), 0) + # Check file system exists for an existing object. + self.assertTrue(azure_fs.fs_exist(session, 'foobar')) + + # Open existing file in the cloud. Only one active file handle exists for each open file. + # A reference count keeps track of open file instances so we can get a pointer to the same + # file handle as long as there are more open file calls than close file calls (i.e. reference + # count is greater than 0). + fh_1 = azure_fs.fs_open_file(session, 'foobar', file_system.open_file_type_data, file_system.open_readonly) + assert(fh_1 != None) + fh_2 = azure_fs.fs_open_file(session, 'foobar', file_system.open_file_type_data, file_system.open_readonly) + assert(fh_2 != None) + + # File handle lock call not used in Azure implementation. + self.assertEqual(fh_1.fh_lock(session, True), 0) + self.assertEqual(fh_1.fh_lock(session, False), 0) + + # Read using a valid file handle. + inbytes_1 = bytes(1000000) + self.assertEqual(fh_1.fh_read(session, 0, inbytes_1), 0) + self.assertEquals(outbytes[0:1000000], inbytes_1) + + # Close a valid file handle. + self.assertEqual(fh_1.close(session), 0) + + # Read using a valid file handle. + inbytes_2 = bytes(1000000) + self.assertEqual(fh_2.fh_read(session, 0, inbytes_2), 0) + self.assertEquals(outbytes[0:1000000], inbytes_2) + + # File size succeeds. + self.assertEqual(fh_1.fh_size(session), 2200000) + + # Close a valid file handle. + self.assertEquals(fh_2.close(session), 0) + + # Test that opening invalid file fails. + bad_file = 'bad_file' + err_msg = '/Exception: Invalid argument/' + self.assertRaisesHavingMessage(wiredtiger.WiredTigerError, + lambda: azure_fs.fs_open_file(session, bad_file, + file_system.open_file_type_data,file_system.open_readonly), err_msg) + err_msg = '/Exception: No such file or directory/' # Flush non valid file into Azure will result in an exception. self.assertRaisesHavingMessage(wiredtiger.WiredTigerError, - lambda: ss.ss_flush(session, azure_fs_1, 'non_existing_file', 'non_existing_file', None), err_msg) - # Check that file does not exist in Azure. + lambda: ss.ss_flush(session, azure_fs, 'non_existing_file', 'non_existing_file', None), err_msg) + # Check that file does not exist in Azure. self.assertRaisesHavingMessage(wiredtiger.WiredTigerError, - lambda: ss.ss_flush_finish(session, azure_fs_1, 'non_existing_file', 'non_existing_file', None), err_msg) - + lambda: ss.ss_flush_finish(session, azure_fs, 'non_existing_file', 'non_existing_file', None), err_msg) + # Test that azure file system terminate succeeds. - self.assertEqual(azure_fs_1.terminate(session), 0) + self.assertEqual(azure_fs.terminate(session), 0) # Test that azure storage source terminate succeeds. self.assertEqual(ss.terminate(session), 0) |