summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Clarisse Cheah <clarisse.cheah@mongodb.com> 2023-02-06 00:48:22 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2023-02-06 01:25:32 +0000
commit bca80a1ea9507681baee65d9257dee976936b9f3 (patch)
tree 52e3a44dc6fbd42c7828f5fef571b1d2e9d1a42d
parent 50975d9996e0f36dcebcd186f9b56b092478815f (diff)
download mongo-bca80a1ea9507681baee65d9257dee976936b9f3.tar.gz
Import wiredtiger: 32c207223a1841bb3a40c893c57025a79e20a2a1 from branch mongodb-master
ref: 79615a8905..32c207223a for: 6.3.0-rc0 WT-10357 Implement customize_file_system() and add_reference() in storage source class for Azure
-rw-r--r-- src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_storage_source.cpp 170
-rw-r--r-- src/third_party/wiredtiger/import.data 2
-rw-r--r-- src/third_party/wiredtiger/test/suite/test_tiered19.py 51
3 files changed, 192 insertions, 31 deletions
diff --git a/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_storage_source.cpp b/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_storage_source.cpp
index 9da0cb48a88..989450ff712 100644
--- a/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_storage_source.cpp
+++ b/src/third_party/wiredtiger/ext/storage_sources/azure_store/azure_storage_source.cpp
@@ -27,6 +27,9 @@
*/
#include <wiredtiger.h>
#include <wiredtiger_ext.h>
+#include <algorithm>
+#include <memory>
+#include <mutex>
#include <vector>
#include "azure_connection.h"
@@ -36,14 +39,22 @@ struct azure_file_system;
struct azure_file_handle;
+// State of the Azure storage source; one instance is created by wiredtiger_extension_init().
struct azure_store {
+ // Kept as the first member: the storage source callbacks cast the WT_STORAGE_SOURCE pointer
+ // they receive directly to azure_store *.
WT_STORAGE_SOURCE store;
+ // Extension API handle obtained from the connection in wiredtiger_extension_init().
+ WT_EXTENSION_API *wt_api;
+
+ // Held while a file system is added to, or erased from, azure_fs.
+ std::mutex fs_mutex;
std::vector<azure_file_system *> azure_fs;
+ // Set to 1 by wiredtiger_extension_init(), incremented by azure_add_reference() and
+ // decremented by azure_terminate(), which frees the store when it reaches zero.
+ uint32_t reference_count;
};
+// A file system handed out by azure_customize_file_system().
struct azure_file_system {
+ // Kept as the first member: azure_file_system_terminate() casts the WT_FILE_SYSTEM pointer it
+ // receives directly to azure_file_system *.
WT_FILE_SYSTEM fs;
+ // The storage source this file system was created from.
azure_store *store;
+ // File system returned by the extension API's file_system_get().
+ WT_FILE_SYSTEM *wt_fs;
+
+ // NOTE(review): presumably guards azure_fh; no use is visible in this change - confirm.
+ std::mutex fh_mutex;
std::vector<azure_file_handle> azure_fh;
- azure_connection *azure_conn;
+ // Connection built from the bucket name and object prefix; owned by this file system.
+ std::unique_ptr<azure_connection> azure_conn;
+ // Value returned by the connection's get_home() when the file system was created.
+ std::string home_dir;
};
struct azure_file_handle {
@@ -52,10 +63,10 @@ struct azure_file_handle {
};
// WT_STORAGE_SOURCE Interface
-static int azure_customize_file_system(WT_STORAGE_SOURCE *, WT_SESSION *, const char *,
- const char *, const char *, WT_FILE_SYSTEM **) __attribute__((__unused__));
-static int azure_add_reference(WT_STORAGE_SOURCE *) __attribute__((__unused__));
-static int azure_terminate(WT_FILE_SYSTEM *, WT_SESSION *) __attribute__((__unused__));
+static int azure_customize_file_system(
+ WT_STORAGE_SOURCE *, WT_SESSION *, const char *, const char *, const char *, WT_FILE_SYSTEM **);
+static int azure_add_reference(WT_STORAGE_SOURCE *);
+static int azure_terminate(WT_STORAGE_SOURCE *, WT_SESSION *);
static int azure_flush(WT_STORAGE_SOURCE *, WT_SESSION *, WT_FILE_SYSTEM *, const char *,
const char *, const char *) __attribute__((__unused__));
static int azure_flush_finish(WT_STORAGE_SOURCE *, WT_SESSION *, WT_FILE_SYSTEM *, const char *,
@@ -68,7 +79,7 @@ static int azure_object_list_single(WT_FILE_SYSTEM *, WT_SESSION *, const char *
char ***, uint32_t *) __attribute__((__unused__));
static int azure_object_list_free(WT_FILE_SYSTEM *, WT_SESSION *, char **, uint32_t)
__attribute__((__unused__));
-static int azure_file_system_terminate(WT_FILE_SYSTEM *, WT_SESSION *) __attribute__((__unused__));
+static int azure_file_system_terminate(WT_FILE_SYSTEM *, WT_SESSION *);
static int azure_file_exists(WT_FILE_SYSTEM *, WT_SESSION *, const char *, bool *)
__attribute__((__unused__));
static int azure_remove(WT_FILE_SYSTEM *, WT_SESSION *, const char *, uint32_t)
@@ -87,26 +98,86 @@ static int azure_file_read(WT_FILE_HANDLE *, WT_SESSION *, wt_off_t, size_t, voi
__attribute__((__unused__));
static int azure_file_size(WT_FILE_HANDLE *, WT_SESSION *, wt_off_t *) __attribute__((__unused__));
+// Return a customised file system to access the Azure storage source.
+//
+// storage_source: the Azure storage source (an azure_store).
+// session: session used to parse the configuration, fetch the WiredTiger file system and look up
+// the connection home directory.
+// bucket: bucket name passed to azure_connection; must be a non-empty string.
+// auth_token: not used by this function.
+// config: configuration string; an optional "prefix" key supplies the object key prefix.
+// file_system: on success, set to the WT_FILE_SYSTEM embedded in the new azure_file_system.
+//
+// Returns 0 on success, EINVAL when no bucket is given, ENOMEM when an allocation fails, ENOENT
+// when constructing the azure_connection throws std::runtime_error, or the error returned by the
+// extension API.
static int
azure_customize_file_system(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session,
- const char *bucket_name, const char *auth_token, const char *config,
- WT_FILE_SYSTEM **file_systemp)
+ const char *bucket, const char *auth_token, const char *config, WT_FILE_SYSTEM **file_system)
{
- WT_UNUSED(storage_source);
- WT_UNUSED(session);
- WT_UNUSED(bucket_name);
- WT_UNUSED(auth_token);
- WT_UNUSED(config);
- WT_UNUSED(file_systemp);
-
- return 0;
+ if (bucket == nullptr || strlen(bucket) == 0) {
+ std::cerr << "azure_customize_file_system: Bucket not specified." << std::endl;
+ return EINVAL;
+ }
+
+ // Get any prefix to be used for the object keys.
+ WT_CONFIG_ITEM obj_prefix_config;
+ std::string obj_prefix;
+
+ // Get the value of the config key from the string. A missing "prefix" key (WT_NOTFOUND) is not
+ // an error: the prefix simply stays empty.
+ azure_store *azure_storage = reinterpret_cast<azure_store *>(storage_source);
+ int ret;
+ if ((ret = azure_storage->wt_api->config_get_string(
+ azure_storage->wt_api, session, config, "prefix", &obj_prefix_config)) == 0)
+ obj_prefix = std::string(obj_prefix_config.str, obj_prefix_config.len);
+ else if (ret != WT_NOTFOUND) {
+ std::cerr << "azure_customize_file_system: error parsing config for object prefix."
+ << std::endl;
+ return ret;
+ }
+
+ // Fetch the native WT file system.
+ WT_FILE_SYSTEM *wt_file_system;
+ if ((ret = azure_storage->wt_api->file_system_get(
+ azure_storage->wt_api, session, &wt_file_system)) != 0)
+ return ret;
+
+ // Create file system and allocate memory for the file system.
+ azure_file_system *azure_fs;
+ try {
+ azure_fs = new azure_file_system;
+ } catch (std::bad_alloc &e) {
+ std::cerr << std::string("azure_customize_file_system: ") + e.what() << std::endl;
+ return ENOMEM;
+ }
+
+ // Initialise references to azure storage source, wt fs and home directory.
+ azure_fs->store = azure_storage;
+ azure_fs->wt_fs = wt_file_system;
+ azure_fs->home_dir = session->connection->get_home(session->connection);
+ try {
+ azure_fs->azure_conn = std::make_unique<azure_connection>(bucket, obj_prefix);
+ } catch (std::runtime_error &e) {
+ std::cerr << std::string("azure_customize_file_system: ") + e.what() << std::endl;
+ // The file system has not been added to the active list yet, so nothing else will free it.
+ delete azure_fs;
+ return ENOENT;
+ } catch (std::bad_alloc &e) {
+ std::cerr << std::string("azure_customize_file_system: ") + e.what() << std::endl;
+ delete azure_fs;
+ return ENOMEM;
+ }
+ azure_fs->fs.fs_directory_list = azure_object_list;
+ azure_fs->fs.fs_directory_list_single = azure_object_list_single;
+ azure_fs->fs.fs_directory_list_free = azure_object_list_free;
+ azure_fs->fs.terminate = azure_file_system_terminate;
+ azure_fs->fs.fs_exist = azure_file_exists;
+ azure_fs->fs.fs_open_file = azure_file_open;
+ azure_fs->fs.fs_remove = azure_remove;
+ azure_fs->fs.fs_rename = azure_rename;
+ azure_fs->fs.fs_size = azure_object_size;
+
+ // Add to the list of the active file systems. Lock will be freed when the scope is exited.
+ {
+ std::lock_guard<std::mutex> lock_guard(azure_storage->fs_mutex);
+ azure_storage->azure_fs.push_back(azure_fs);
+ }
+ *file_system = &azure_fs->fs;
+ // ret is 0 here: it was last assigned by the successful file_system_get() call.
+ return ret;
}
+// Add a reference to the storage source so we can reference count to know when to terminate.
+// Returns 0 on success, or EINVAL when the count is already zero or incrementing it would wrap
+// around to zero.
static int
azure_add_reference(WT_STORAGE_SOURCE *storage_source)
{
- WT_UNUSED(storage_source);
-
+ azure_store *azure_storage = reinterpret_cast<azure_store *>(storage_source);
+ // reference_count is a uint32_t, so "+ 1 == 0" is true only at its maximum value, i.e. when
+ // one more reference would overflow the counter.
+ if (azure_storage->reference_count == 0 || azure_storage->reference_count + 1 == 0) {
+ std::cerr << "azure_add_reference: missing reference or overflow." << std::endl;
+ return EINVAL;
+ }
+ ++azure_storage->reference_count;
return 0;
}
@@ -138,12 +209,26 @@ azure_flush_finish(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session,
return 0;
}
+// Discard any resources on termination.
+// Drops one reference to the storage source. When the last reference is dropped, every file
+// system still on the active list is terminated and the storage source itself is deleted.
+// Always returns 0.
static int
-azure_terminate(WT_FILE_SYSTEM *file_system, WT_SESSION *session)
+azure_terminate(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session)
{
- WT_UNUSED(file_system);
- WT_UNUSED(session);
-
+ azure_store *azure_storage = reinterpret_cast<azure_store *>(storage_source);
+
+ // Other references remain, so the storage source must stay alive.
+ if (--azure_storage->reference_count != 0)
+ return 0;
+
+ /*
+ * Terminate any active filesystems. There are no references to the storage source, so it is
+ * safe to walk the active filesystem list without a lock. The removal from the list happens
+ * under a lock. Also, removal happens from the front and addition at the end, so we are safe.
+ */
+ // The loop makes progress because azure_file_system_terminate() erases the file system it is
+ // given from azure_storage->azure_fs.
+ while (!azure_storage->azure_fs.empty()) {
+ WT_FILE_SYSTEM *fs = reinterpret_cast<WT_FILE_SYSTEM *>(azure_storage->azure_fs.front());
+ azure_file_system_terminate(fs, session);
+ }
+
+ delete azure_storage;
return 0;
}
@@ -182,17 +267,29 @@ azure_object_list_free(
WT_UNUSED(file_system);
WT_UNUSED(session);
WT_UNUSED(dirlist);
- WT_UNUSED(count);
return 0;
}
+// Discard any resources on termination of the file system.
+// Removes the file system from its storage source's active list, releases its Azure connection
+// and destroys it. The file_system pointer must not be used after this call. Always returns 0.
static int
azure_file_system_terminate(WT_FILE_SYSTEM *file_system, WT_SESSION *session)
{
- WT_UNUSED(file_system);
+ azure_file_system *azure_fs = reinterpret_cast<azure_file_system *>(file_system);
+ azure_store *azure_storage = azure_fs->store;
+
WT_UNUSED(session);
+ // Remove from the active file system list. The lock will be freed when the scope is exited.
+ {
+ std::lock_guard<std::mutex> lock_guard(azure_storage->fs_mutex);
+ // Erase-remove idiom used to eliminate specific file system.
+ azure_storage->azure_fs.erase(
+ std::remove(azure_storage->azure_fs.begin(), azure_storage->azure_fs.end(), azure_fs),
+ azure_storage->azure_fs.end());
+ }
+ azure_fs->azure_conn.reset();
+ // The file system was allocated with new in azure_customize_file_system(), so it must be
+ // released with delete (not free) for its member destructors to run.
+ delete azure_fs;
return 0;
}
@@ -299,11 +396,30 @@ azure_file_size(WT_FILE_HANDLE *file_handle, WT_SESSION *session, wt_off_t *size
return 0;
}
+// An Azure storage source library - creates an entry point to the Azure extension.
+//
+// connection: connection the storage source is registered with under the name "azure_store".
+// config: not used.
+//
+// Returns 0 on success, ENOMEM when the storage source cannot be allocated, or -1 when
+// add_storage_source() fails.
int
wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
{
- WT_UNUSED(connection);
- WT_UNUSED(config);
-
+ // Report allocation failure as an error code, as azure_customize_file_system() does, rather
+ // than letting std::bad_alloc propagate to the caller.
+ azure_store *azure_storage;
+ try {
+ azure_storage = new azure_store;
+ } catch (std::bad_alloc &e) {
+ std::cerr << std::string("wiredtiger_extension_init: ") + e.what() << std::endl;
+ return ENOMEM;
+ }
+ azure_storage->wt_api = connection->get_extension_api(connection);
+
+ azure_storage->store.ss_customize_file_system = azure_customize_file_system;
+ azure_storage->store.ss_add_reference = azure_add_reference;
+ azure_storage->store.terminate = azure_terminate;
+ azure_storage->store.ss_flush = azure_flush;
+ azure_storage->store.ss_flush_finish = azure_flush_finish;
+
+ // The first reference is implied by the call to add_storage_source.
+ azure_storage->reference_count = 1;
+
+ // Load the storage.
+ if ((connection->add_storage_source(
+ connection, "azure_store", &azure_storage->store, nullptr)) != 0) {
+ std::cerr
+ << "wiredtiger_extension_init: Could not load Azure storage source, shutting down."
+ << std::endl;
+ // Registration failed, so the storage source is still owned here and must be freed.
+ delete azure_storage;
+ return -1;
+ }
return 0;
}
diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data
index b04da175cf1..8294b3902dc 100644
--- a/src/third_party/wiredtiger/import.data
+++ b/src/third_party/wiredtiger/import.data
@@ -2,5 +2,5 @@
"vendor": "wiredtiger",
"github": "wiredtiger/wiredtiger.git",
"branch": "mongodb-master",
- "commit": "79615a890553eced88b10b115a3e2419331d9ae6"
+ "commit": "32c207223a1841bb3a40c893c57025a79e20a2a1"
}
diff --git a/src/third_party/wiredtiger/test/suite/test_tiered19.py b/src/third_party/wiredtiger/test/suite/test_tiered19.py
index 514b1c8701e..f995f55e803 100644
--- a/src/third_party/wiredtiger/test/suite/test_tiered19.py
+++ b/src/third_party/wiredtiger/test/suite/test_tiered19.py
@@ -26,10 +26,14 @@
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
-import wttest
+import random, string, wiredtiger, wttest
from helper_tiered import get_auth_token, TieredConfigMixin
from wtscenario import make_scenarios
+# test_tiered19.py
+# Testing storage source functionality for the Azure Storage Store
+# and Google Cloud extensions.
+
class test_tiered19(wttest.WiredTigerTestCase, TieredConfigMixin):
tiered_storage_sources = [
@@ -50,8 +54,49 @@ class test_tiered19(wttest.WiredTigerTestCase, TieredConfigMixin):
# Make scenarios for different cloud service providers
scenarios = make_scenarios(tiered_storage_sources)
+ # Load the storage source extensions by delegating to TieredConfigMixin.conn_extensions.
def conn_extensions(self, extlist):
TieredConfigMixin.conn_extensions(self, extlist)
- def test_gcp_and_azure(self):
- pass
\ No newline at end of file
+ # Return the storage source registered on the connection under the scenario's ss_name.
+ def get_storage_source(self):
+ return self.conn.get_storage_source(self.ss_name)
+
+ # Build the configuration string passed to ss_customize_file_system: ",prefix=<prefix>".
+ # With the default empty prefix the result is ",prefix=".
+ def get_fs_config(self, prefix = ''):
+ return ",prefix=" + prefix
+
+ # Exercise ss_customize_file_system and the terminate calls of the storage source.
+ # Only the "azure_store" scenario is tested; every other scenario returns immediately.
+ def test_ss_file_systems_gcp_and_azure(self):
+ if self.ss_name != "azure_store":
+ return
+ session = self.session
+ ss = self.get_storage_source()
+
+ # Each prefix is the bucket prefix followed by ten random characters. str.join() is called
+ # on an empty separator so the bucket prefix is not inserted between every character.
+ prefix_1 = self.bucket_prefix + ''.join(
+ random.choices(string.ascii_letters + string.digits, k=10))
+ prefix_2 = self.bucket_prefix + ''.join(
+ random.choices(string.ascii_letters + string.digits, k=10))
+
+ # Test the customize file system function errors when there is an invalid bucket.
+ err_msg = '/Exception: Invalid argument/'
+ self.assertRaisesHavingMessage(wiredtiger.WiredTigerError,
+ lambda: ss.ss_customize_file_system(
+ session, "", None, self.get_fs_config(prefix_1)), err_msg)
+
+ bad_bucket = "./bucket_BAD"
+ err_msg = '/Exception: No such file or directory/'
+ self.assertRaisesHavingMessage(wiredtiger.WiredTigerError,
+ lambda: ss.ss_customize_file_system(
+ session, bad_bucket, None, self.get_fs_config(prefix_1)), err_msg)
+
+ # Test the customize file system function works when there is a valid bucket.
+ azure_fs_1 = ss.ss_customize_file_system(
+ session, self.bucket, None, self.get_fs_config(prefix_1))
+
+ # Create another file system, left open, to make sure that terminate works.
+ ss.ss_customize_file_system(
+ session, self.bucket, None, self.get_fs_config(prefix_2))
+
+ # Test that azure file system terminate succeeds.
+ self.assertEqual(azure_fs_1.terminate(session), 0)
+
+ # Test that azure storage source terminate succeeds.
+ self.assertEqual(ss.terminate(session), 0)