diff options
author | Etienne Petrel <etienne.petrel@mongodb.com> | 2022-02-08 02:56:05 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-08 03:46:37 +0000 |
commit | 43f1a02384bd3c75d9ed4d76dad3401af519f930 (patch) | |
tree | a8b92002400e888095f1b7ace386c7a0198d55d1 | |
parent | 24d29e06e2009d88a6e0e7962dfb22116ca12d95 (diff) | |
download | mongo-43f1a02384bd3c75d9ed4d76dad3401af519f930.tar.gz |
Import wiredtiger: 8b3ed1efc59a88ace0d50f9e40f1b5e5a092b05a from branch mongodb-master
ref: 79c9e71494..8b3ed1efc5
for: 5.3.0
WT-8717 Flush files to S3 store and cache them locally to a local directory
4 files changed, 67 insertions, 10 deletions
diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp index cebae784d2f..35b73fd945c 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp +++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp @@ -84,17 +84,15 @@ int S3Connection::PutObject( const std::string &bucketName, const std::string &objectKey, const std::string &fileName) const { - Aws::S3Crt::Model::PutObjectRequest request; - request.SetBucket(bucketName); - request.SetKey(objectKey); - std::shared_ptr<Aws::IOStream> inputData = Aws::MakeShared<Aws::FStream>( "s3-source", fileName.c_str(), std::ios_base::in | std::ios_base::binary); + Aws::S3Crt::Model::PutObjectRequest request; + request.SetBucket(bucketName); + request.SetKey(objectKey); request.SetBody(inputData); Aws::S3Crt::Model::PutObjectOutcome outcome = s3CrtClient.PutObject(request); - if (outcome.IsSuccess()) { return (0); } else { diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp index f1baaefbd3f..8fa6f5bcf88 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp +++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp @@ -31,6 +31,7 @@ #include <sys/stat.h> #include <fstream> #include <errno.h> +#include <unistd.h> #include "s3_connection.h" #include "s3_log_system.h" @@ -70,6 +71,8 @@ Aws::SDKOptions options; static int S3GetDirectory(const std::string &, const std::string &, bool, std::string &); static bool S3CacheExists(WT_FILE_SYSTEM *, const std::string &); static std::string S3Path(const std::string &, const std::string &); +static std::string S3HomePath(WT_FILE_SYSTEM *, const char *); +static std::string S3CachePath(WT_FILE_SYSTEM *, const char *); static int S3Exist(WT_FILE_SYSTEM *, WT_SESSION *, const char *, bool *); static int S3CustomizeFileSystem( WT_STORAGE_SOURCE *, WT_SESSION *, const char *, const char *, const char *, WT_FILE_SYSTEM **); @@ -85,7 +88,7 @@ static int S3ObjectListSingle( static int S3ObjectListFree(WT_FILE_SYSTEM *, WT_SESSION *, char **, uint32_t); /* - * S3Exist -- + * S3Exist-- * Return if the file exists. First checks the cache, and then the S3 Bucket. */ static int @@ -406,6 +409,39 @@ S3Terminate(WT_STORAGE_SOURCE *storage, WT_SESSION *session) } /* + * S3Flush -- + * Flush file to S3 Store using AWS SDK C++ PutObject. + */ +static int +S3Flush(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, WT_FILE_SYSTEM *fileSystem, + const char *source, const char *object, const char *config) +{ + S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem; + return (fs->connection->PutObject(fs->bucketName, object, source)); +} + +/* + * S3FlushFinish -- + * Flush local file to cache. + */ +static int +S3FlushFinish(WT_STORAGE_SOURCE *storage, WT_SESSION *session, WT_FILE_SYSTEM *fileSystem, + const char *source, const char *object, const char *config) +{ + /* Constructing the pathname for source and cache from file system and local. */ + std::string srcPath = S3Path(((S3_FILE_SYSTEM *)fileSystem)->homeDir, source); + std::string destPath = S3Path(((S3_FILE_SYSTEM *)fileSystem)->cacheDir, source); + + /* Linking file with the local file. */ + int ret = link(srcPath.c_str(), destPath.c_str()); + + /* Linking file with the local file. */ + if (ret == 0) + ret = chmod(destPath.c_str(), 0444); + return ret; +} + +/* * wiredtiger_extension_init -- * A S3 storage source library. */ @@ -442,6 +478,8 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) s3->storageSource.ss_customize_file_system = S3CustomizeFileSystem; s3->storageSource.ss_add_reference = S3AddReference; s3->storageSource.terminate = S3Terminate; + s3->storageSource.ss_flush = S3Flush; + s3->storageSource.ss_flush_finish = S3FlushFinish; /* Load the storage */ if ((ret = connection->add_storage_source(connection, "s3_store", &s3->storageSource, NULL)) != diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index fa9b18d4a07..90a47ade371 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-master", - "commit": "79c9e7149493ad043fffdb3626fe018568f6892a" + "commit": "8b3ed1efc59a88ace0d50f9e40f1b5e5a092b05a" } diff --git a/src/third_party/wiredtiger/test/suite/test_s3_store01.py b/src/third_party/wiredtiger/test/suite/test_s3_store01.py index 30a67e6f865..99a9ddde27c 100644 --- a/src/third_party/wiredtiger/test/suite/test_s3_store01.py +++ b/src/third_party/wiredtiger/test/suite/test_s3_store01.py @@ -26,7 +26,7 @@ # ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR # OTHER DEALINGS IN THE SOFTWARE. -import wiredtiger, wttest +import os, wttest # test_s3_store01.py # Test minimal S3 extension with basic interactions with AWS S3CrtClient. @@ -45,13 +45,34 @@ class test_s3_store01(wttest.WiredTigerTestCase): def test_local_basic(self): # Test some basic functionality of the storage source API, calling # each supported method in the API at least once. + bucket_name = "rubysfirstbucket" + cache_prefix = "cache-" + filename = "foobar" + object_name = "foobar" + session = self.session s3_store = self.get_s3_storage_source() + fs = s3_store.ss_customize_file_system(session, bucket_name, "Secret", None) + + # Test flush functionality and flushing to cache and checking if file exists. + f = open(filename, 'wb') + outbytes = ('Ruby\n'*100).encode() + f.write(outbytes) + f.close() - fs = s3_store.ss_customize_file_system(session, "wt-bucket", "Secret", None) - _ = fs.fs_directory_list(session, self.bucket_name, '') + s3_store.ss_flush(session, fs, filename, object_name) + s3_store.ss_flush_finish(session, fs, filename, object_name) + self.assertTrue(fs.fs_exist(session, filename)) + + # Checking that the file still exists in S3 after removing it from the cache. + os.remove(cache_prefix + bucket_name + '/' + filename) + self.assertTrue(fs.fs_exist(session, filename)) + + fs2 = s3_store.ss_customize_file_system(session, "wt-bucket", "Secret", None) + _ = fs2.fs_directory_list(session, self.bucket_name, '') fs.terminate(session) + fs2.terminate(session) if __name__ == '__main__': wttest.run() |