diff options
6 files changed, 378 insertions, 20 deletions
diff --git a/src/third_party/wiredtiger/ext/storage_sources/local_store/local_store.c b/src/third_party/wiredtiger/ext/storage_sources/local_store/local_store.c index ee5720a1844..fc1984b2547 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/local_store/local_store.c +++ b/src/third_party/wiredtiger/ext/storage_sources/local_store/local_store.c @@ -123,6 +123,7 @@ typedef struct local_file_handle { */ static int local_bucket_path(WT_FILE_SYSTEM *, const char *, char **); static int local_cache_path(WT_FILE_SYSTEM *, const char *, char **); +static int local_home_path(WT_FILE_SYSTEM *, const char *, char **); static int local_configure(LOCAL_STORAGE *, WT_CONFIG_ARG *); static int local_configure_int(LOCAL_STORAGE *, WT_CONFIG_ARG *, const char *, uint32_t *); static int local_delay(LOCAL_STORAGE *); @@ -356,6 +357,16 @@ local_cache_path(WT_FILE_SYSTEM *file_system, const char *name, char **pathp) } /* + * local_home_path -- + * Construct the source pathname from the file system and local name. + */ +static int +local_home_path(WT_FILE_SYSTEM *file_system, const char *name, char **pathp) +{ + return (local_path(file_system, ((LOCAL_FILE_SYSTEM *)file_system)->home_dir, name, pathp)); +} + +/* * local_path -- * Construct a pathname from the file system and local name. 
*/ @@ -667,15 +678,18 @@ local_flush(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, WT_FILE_SYST { LOCAL_STORAGE *local; int ret; - char *dest_path; + char *dest_path, *src_path; (void)config; /* unused */ - dest_path = NULL; + dest_path = src_path = NULL; local = (LOCAL_STORAGE *)storage_source; ret = 0; if (file_system == NULL || source == NULL || object == NULL) - return local_err(local, session, EINVAL, "ss_flush_finish: required arguments missing"); + return local_err(local, session, EINVAL, "ss_flush: required arguments missing"); + + if ((ret = local_home_path(file_system, source, &src_path)) != 0) + goto err; if ((ret = local_bucket_path(file_system, object, &dest_path)) != 0) goto err; @@ -683,13 +697,15 @@ local_flush(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, WT_FILE_SYST if ((ret = local_delay(local)) != 0) goto err; - if ((ret = local_file_copy(local, session, source, dest_path, WT_FS_OPEN_FILE_TYPE_DATA)) != 0) + if ((ret = local_file_copy(local, session, src_path, dest_path, WT_FS_OPEN_FILE_TYPE_DATA)) != + 0) goto err; local->object_writes++; err: free(dest_path); + free(src_path); return (ret); } @@ -703,16 +719,19 @@ local_flush_finish(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, { LOCAL_STORAGE *local; int ret; - char *dest_path; + char *dest_path, *src_path; (void)config; /* unused */ - dest_path = NULL; + dest_path = src_path = NULL; local = (LOCAL_STORAGE *)storage_source; ret = 0; if (file_system == NULL || source == NULL || object == NULL) return local_err(local, session, EINVAL, "ss_flush_finish: required arguments missing"); + if ((ret = local_home_path(file_system, source, &src_path)) != 0) + goto err; + if ((ret = local_cache_path(file_system, object, &dest_path)) != 0) goto err; @@ -721,7 +740,7 @@ local_flush_finish(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, * Link the object with the original local object. The could be replaced by a file copy if * portability is an issue. 
*/ - if ((ret = link(source, dest_path)) != 0) { + if ((ret = link(src_path, dest_path)) != 0) { ret = local_err( local, session, errno, "ss_flush_finish link %s to %s failed", source, dest_path); goto err; @@ -731,6 +750,7 @@ local_flush_finish(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, ret = local_err(local, session, errno, "%s: ss_flush_finish chmod failed", dest_path); err: free(dest_path); + free(src_path); return (ret); } diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index 68b76c95f80..993d4e1ba51 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-master", - "commit": "5b3f8910769d52ecb14d5b0326207c21b8de09be" + "commit": "5c24bfac132a01fe9c561f353bf8b0420e2ec1dd" } diff --git a/src/third_party/wiredtiger/src/conn/conn_tiered.c b/src/third_party/wiredtiger/src/conn/conn_tiered.c index 6f34064ca7e..2d4995dedd1 100644 --- a/src/third_party/wiredtiger/src/conn/conn_tiered.c +++ b/src/third_party/wiredtiger/src/conn/conn_tiered.c @@ -212,7 +212,7 @@ __tier_flush_meta( WT_RET(__wt_scr_alloc(session, 512, &buf)); dhandle = &tiered->iface; - newconfig = NULL; + newconfig = obj_value = NULL; WT_ERR(__wt_meta_track_on(session)); tracking = true; @@ -235,6 +235,7 @@ __tier_flush_meta( err: __wt_free(session, newconfig); + __wt_free(session, obj_value); if (release) WT_TRET(__wt_session_release_dhandle(session)); __wt_scr_free(session, &buf); @@ -251,11 +252,15 @@ int __wt_tier_do_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint32_t id, const char *local_uri, const char *obj_uri) { + WT_CONFIG_ITEM pfx; WT_DECL_RET; WT_FILE_SYSTEM *bucket_fs; WT_STORAGE_SOURCE *storage_source; - const char *local_name, *obj_name; + size_t len; + char *tmp; + const char *cfg[2], *local_name, *obj_name; + tmp = NULL; storage_source = tiered->bstorage->storage_source; bucket_fs = 
tiered->bstorage->file_system; @@ -263,27 +268,36 @@ __wt_tier_do_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint32_t id, con WT_PREFIX_SKIP_REQUIRED(session, local_name, "file:"); obj_name = obj_uri; WT_PREFIX_SKIP_REQUIRED(session, obj_name, "object:"); + cfg[0] = tiered->obj_config; + cfg[1] = NULL; + WT_RET(__wt_config_gets(session, cfg, "tiered_storage.bucket_prefix", &pfx)); + WT_ASSERT(session, pfx.len != 0); + len = strlen(obj_name) + pfx.len + 1; + WT_RET(__wt_calloc_def(session, len, &tmp)); + WT_ERR(__wt_snprintf(tmp, len, "%.*s%s", (int)pfx.len, pfx.str, obj_name)); /* This call make take a while, and may fail due to network timeout. */ - WT_RET(storage_source->ss_flush( - storage_source, &session->iface, bucket_fs, local_name, obj_name, NULL)); + WT_ERR( + storage_source->ss_flush(storage_source, &session->iface, bucket_fs, local_name, tmp, NULL)); WT_WITH_CHECKPOINT_LOCK(session, WT_WITH_SCHEMA_LOCK(session, ret = __tier_flush_meta(session, tiered, local_uri, obj_uri))); - WT_RET(ret); + WT_ERR(ret); /* * We may need a way to cleanup flushes for those not completed (after a crash), or failed (due * to previous network outage). */ - WT_RET(storage_source->ss_flush_finish( - storage_source, &session->iface, bucket_fs, local_name, obj_name, NULL)); + WT_ERR(storage_source->ss_flush_finish( + storage_source, &session->iface, bucket_fs, local_name, tmp, NULL)); /* * After successful flushing, push a work unit to drop the local object in the future. The * object will be removed locally after the local retention period expires. 
*/ - WT_RET(__wt_tiered_put_drop_local(session, tiered, id)); - return (0); + WT_ERR(__wt_tiered_put_drop_local(session, tiered, id)); +err: + __wt_free(session, tmp); + return (ret); } /* diff --git a/src/third_party/wiredtiger/src/tiered/tiered_opener.c b/src/third_party/wiredtiger/src/tiered/tiered_opener.c index b6f3df90fa7..f786ef30eff 100644 --- a/src/third_party/wiredtiger/src/tiered/tiered_opener.c +++ b/src/third_party/wiredtiger/src/tiered/tiered_opener.c @@ -17,13 +17,18 @@ __tiered_opener_open(WT_BLOCK_FILE_OPENER *opener, WT_SESSION_IMPL *session, uin WT_FS_OPEN_FILE_TYPE type, u_int flags, WT_FH **fhp) { WT_BUCKET_STORAGE *bstorage; + WT_CONFIG_ITEM pfx; WT_DECL_RET; WT_TIERED *tiered; - const char *object_name, *object_uri; + size_t len; + char *tmp; + const char *cfg[2], *object_name, *object_uri, *object_val; bool local_only; tiered = opener->cookie; object_uri = NULL; + object_val = NULL; + tmp = NULL; local_only = false; WT_ASSERT(session, @@ -54,13 +59,25 @@ __tiered_opener_open(WT_BLOCK_FILE_OPENER *opener, WT_SESSION_IMPL *session, uin * This can be called at any time, because we are opening the objects lazily. */ if (!local_only && ret != 0) { + /* Get the prefix from the object's metadata, not the connection. */ + WT_ERR(__wt_metadata_search(session, object_uri, (char **)&object_val)); + cfg[0] = object_val; + cfg[1] = NULL; + WT_ERR(__wt_config_gets(session, cfg, "tiered_storage.bucket_prefix", &pfx)); + /* We expect a prefix. 
*/ + WT_ASSERT(session, pfx.len != 0); + len = strlen(object_name) + pfx.len + 1; + WT_ERR(__wt_calloc_def(session, len, &tmp)); + WT_ERR(__wt_snprintf(tmp, len, "%.*s%s", (int)pfx.len, pfx.str, object_name)); bstorage = tiered->bstorage; - LF_SET(WT_FS_OPEN_READONLY); + LF_SET(WT_FS_OPEN_FIXED | WT_FS_OPEN_READONLY); WT_WITH_BUCKET_STORAGE( - bstorage, session, { ret = __wt_open(session, object_name, type, flags, fhp); }); + bstorage, session, { ret = __wt_open(session, tmp, type, flags, fhp); }); } err: __wt_free(session, object_uri); + __wt_free(session, object_val); + __wt_free(session, tmp); return (ret); } diff --git a/src/third_party/wiredtiger/test/suite/test_tiered09.py b/src/third_party/wiredtiger/test/suite/test_tiered09.py new file mode 100755 index 00000000000..e44b828bfd9 --- /dev/null +++ b/src/third_party/wiredtiger/test/suite/test_tiered09.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python +# +# Public Domain 2014-present MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +import os, time, wiredtiger, wttest +from wiredtiger import stat +StorageSource = wiredtiger.StorageSource # easy access to constants + +# test_tiered09.py +# Test tiered storage with sequential connections with different prefixes. +class test_tiered09(wttest.WiredTigerTestCase): + + # If the 'uri' changes all the other names must change with it. + base = 'test_tiered09-000000000' + base2 = 'test_second09-000000000' + fileuri_base = 'file:' + base + obj1file = base + '1.wtobj' + obj1second = base2 + '1.wtobj' + obj2file = base + '2.wtobj' + obj3file = base + '3.wtobj' + objuri = 'object:' + base + '1.wtobj' + tiereduri = "tiered:test_tiered09" + uri = "table:test_tiered09" + uri2 = "table:test_second09" + + auth_token = "test_token" + bucket = "mybucket" + extension_name = "local_store" + prefix1 = "1_" + prefix2 = "2_" + prefix3 = "3_" + retention = 1 + saved_conn = '' + def conn_config(self): + os.mkdir(self.bucket) + self.saved_conn = \ + 'statistics=(all),' + \ + 'tiered_storage=(auth_token=%s,' % self.auth_token + \ + 'bucket=%s,' % self.bucket + \ + 'bucket_prefix=%s,' % self.prefix1 + \ + 'local_retention=%d,' % self.retention + \ + 'name=%s)' % self.extension_name + return self.saved_conn + + # Load the local store extension. + def conn_extensions(self, extlist): + # Windows doesn't support dynamically loaded extension libraries. + if os.name == 'nt': + extlist.skip_if_missing = True + extlist.extension('storage_sources', self.extension_name) + + def check(self, tc, n): + for i in range(0, n): + self.assertEqual(tc[str(i)], str(i)) + tc.set_key(str(n)) + self.assertEqual(tc.search(), wiredtiger.WT_NOTFOUND) + + # Test calling the flush_tier API. + def test_tiered(self): + # Create a table. Add some data.
Checkpoint and flush tier. + # Close the connection. Then reopen the connection with a + # different bucket prefix, add a second table and repeat. Then + # reopen the connection with a third prefix and verify + # we can read all the data. + # + # Verify the files are as we expect also. In the bucket we expect: + # 1_test_tiered09-0000000001.wtobj + # 1_test_tiered09-0000000002.wtobj + # 2_test_second09-0000000001.wtobj + # but we can read and access all data in all objects. + self.session.create(self.uri, 'key_format=S,value_format=S,') + # Add first data. Checkpoint, flush and close the connection. + c = self.session.open_cursor(self.uri) + c["0"] = "0" + self.check(c, 1) + c.close() + self.session.checkpoint() + self.session.flush_tier(None) + self.close_conn() + self.assertTrue(os.path.exists(self.obj1file)) + self.assertTrue(os.path.exists(self.obj2file)) + bucket_obj = self.bucket + '/' + self.prefix1 + self.obj1file + self.assertTrue(os.path.exists(bucket_obj)) + # Since we've closed and reopened the connection we lost the work units + # to drop the local objects. Clean them up now to make sure we can open + # the correct object in the bucket. + localobj = './' + self.obj1file + os.remove(localobj) + + # Reopen the connection with a different prefix this time. + conn_params = self.saved_conn + ',tiered_storage=(bucket_prefix=%s)' % self.prefix2 + self.conn = self.wiredtiger_open('.', conn_params) + self.session = self.conn.open_session() + # Add a second table created while the second prefix is used for the connection. + self.session.create(self.uri2, 'key_format=S,value_format=S,') + # Add first data. Checkpoint, flush and close the connection. + c = self.session.open_cursor(self.uri2) + c["0"] = "0" + self.check(c, 1) + c.close() + # Add more data to original table. + # Checkpoint, flush and close the connection.
+ c = self.session.open_cursor(self.uri) + c["1"] = "1" + self.check(c, 2) + c.close() + self.session.checkpoint() + self.session.flush_tier(None) + self.close_conn() + # Check each table was created with the correct prefix. + bucket_obj = self.bucket + '/' + self.prefix2 + self.obj1second + self.assertTrue(os.path.exists(bucket_obj)) + bucket_obj = self.bucket + '/' + self.prefix1 + self.obj2file + self.assertTrue(os.path.exists(bucket_obj)) + # Since we've closed and reopened the connection we lost the work units + # to drop the local objects. Clean them up now to make sure we can open + # the correct object in the bucket. + localobj = './' + self.obj2file + os.remove(localobj) + os.remove('./' + self.obj1second) + + # Reopen with the other prefix and check all data. Even though we're using the + # other prefix, we should find all the data in the object with the original + # prefix. + conn_params = self.saved_conn + ',tiered_storage=(bucket_prefix=%s)' % self.prefix3 + self.conn = self.wiredtiger_open('.', conn_params) + self.session = self.conn.open_session() + c = self.session.open_cursor(self.uri) + self.check(c, 2) + c.close() + c = self.session.open_cursor(self.uri2) + self.check(c, 1) + c.close() + +if __name__ == '__main__': + wttest.run() diff --git a/src/third_party/wiredtiger/test/suite/test_tiered10.py b/src/third_party/wiredtiger/test/suite/test_tiered10.py new file mode 100755 index 00000000000..a6b3e5dc56c --- /dev/null +++ b/src/third_party/wiredtiger/test/suite/test_tiered10.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python +# +# Public Domain 2014-present MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means.
+# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +import os, time, wiredtiger, wttest +from wiredtiger import stat +StorageSource = wiredtiger.StorageSource # easy access to constants + +# test_tiered10.py +# Test tiered storage with simultaneous connections using different +# prefixes to the same bucket directory but different local databases. +class test_tiered10(wttest.WiredTigerTestCase): + + # If the 'uri' changes all the other names must change with it. + base = 'test_tiered10-000000000' + fileuri_base = 'file:' + base + obj1file = base + '1.wtobj' + objuri = 'object:' + base + '1.wtobj' + tiereduri = "tiered:test_tiered10" + uri = "table:test_tiered10" + + auth_token = "test_token" + bucket = "mybucket" + bucket1 = "../" + bucket + bucket2 = "../" + bucket + conn1_dir = "first_dir" + conn2_dir = "second_dir" + extension_name = "local_store" + prefix1 = "1_" + prefix2 = "2_" + retention = 1 + saved_conn = '' + def conn_config(self): + os.mkdir(self.bucket) + os.mkdir(self.conn1_dir) + os.mkdir(self.conn2_dir) + # Use this to create the directories and set up for the others. 
+ dummy_conn = 'create,statistics=(all),' + self.saved_conn = \ + 'create,statistics=(all),' + \ + 'tiered_storage=(auth_token=%s,' % self.auth_token + \ + 'bucket=../%s,' % self.bucket + \ + 'local_retention=%d,' % self.retention + \ + 'name=%s),' % self.extension_name + return dummy_conn + + # Load the local store extension. + def conn_extensions(self, extlist): + # Windows doesn't support dynamically loaded extension libraries. + if os.name == 'nt': + extlist.skip_if_missing = True + extlist.extension('storage_sources', self.extension_name) + + def check(self, tc, base, n): # verify the n keys starting at base + for i in range(base, base + n): + self.assertEqual(tc[str(i)], str(i)) + tc.set_key(str(base + n)) + self.assertEqual(tc.search(), wiredtiger.WT_NOTFOUND) + + # Test calling the flush_tier API. + def test_tiered(self): + # Have two connections running in different directories, but sharing + # the same bucket directory with different prefixes. Each database + # creates an identically named table with different data. Each then + # does a flush tier testing that both databases can coexist in the + # same bucket without conflict. + # + # Then reopen the connections and make sure we can read data correctly. + # + # We open two connections manually so that they both have the same relative + # pathnames. The standard connection is just a dummy for this test. + ext = self.extensionsConfig() + conn1_params = self.saved_conn + ext + ',tiered_storage=(bucket_prefix=%s)' % self.prefix1 + conn1 = self.wiredtiger_open(self.conn1_dir, conn1_params) + session1 = conn1.open_session() + conn2_params = self.saved_conn + ext + ',tiered_storage=(bucket_prefix=%s)' % self.prefix2 + conn2 = self.wiredtiger_open(self.conn2_dir, conn2_params) + session2 = conn2.open_session() + + session1.create(self.uri, 'key_format=S,value_format=S,') + session2.create(self.uri, 'key_format=S,value_format=S,') + + # Add first data. Checkpoint, flush and close the connection.
+ c1 = session1.open_cursor(self.uri) + c2 = session2.open_cursor(self.uri) + c1["0"] = "0" + c2["20"] = "20" + self.check(c1, 0, 1) + self.check(c2, 20, 1) + c1.close() + c2.close() + session1.checkpoint() + session1.flush_tier(None) + session2.checkpoint() + session2.flush_tier(None) + conn1_obj1 = self.bucket + '/' + self.prefix1 + self.obj1file + conn2_obj1 = self.bucket + '/' + self.prefix2 + self.obj1file + self.assertTrue(os.path.exists(conn1_obj1)) + self.assertTrue(os.path.exists(conn2_obj1)) + conn1.close() + conn2.close() + + # Remove the local copies of the objects before we reopen so that we force + # the system to read from the bucket or bucket cache. + local = self.conn1_dir + '/' + self.obj1file + os.remove(local) + local = self.conn2_dir + '/' + self.obj1file + os.remove(local) + + conn1 = self.wiredtiger_open(self.conn1_dir, conn1_params) + session1 = conn1.open_session() + conn2 = self.wiredtiger_open(self.conn2_dir, conn2_params) + session2 = conn2.open_session() + + c1 = session1.open_cursor(self.uri) + c2 = session2.open_cursor(self.uri) + self.check(c1, 0, 1) + self.check(c2, 20, 1) + c1.close() + c2.close() + +if __name__ == '__main__': + wttest.run() |