summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEtienne Petrel <etienne.petrel@mongodb.com>2022-02-08 02:56:05 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-08 03:46:37 +0000
commit43f1a02384bd3c75d9ed4d76dad3401af519f930 (patch)
treea8b92002400e888095f1b7ace386c7a0198d55d1
parent24d29e06e2009d88a6e0e7962dfb22116ca12d95 (diff)
downloadmongo-43f1a02384bd3c75d9ed4d76dad3401af519f930.tar.gz
Import wiredtiger: 8b3ed1efc59a88ace0d50f9e40f1b5e5a092b05a from branch mongodb-master
ref: 79c9e71494..8b3ed1efc5 for: 5.3.0 WT-8717 Flush files to S3 store and cache them locally to a local directory
-rw-r--r--src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp8
-rw-r--r--src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp40
-rw-r--r--src/third_party/wiredtiger/import.data2
-rw-r--r--src/third_party/wiredtiger/test/suite/test_s3_store01.py27
4 files changed, 67 insertions, 10 deletions
diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp
index cebae784d2f..35b73fd945c 100644
--- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp
+++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp
@@ -84,17 +84,15 @@ int
S3Connection::PutObject(
const std::string &bucketName, const std::string &objectKey, const std::string &fileName) const
{
- Aws::S3Crt::Model::PutObjectRequest request;
- request.SetBucket(bucketName);
- request.SetKey(objectKey);
-
std::shared_ptr<Aws::IOStream> inputData = Aws::MakeShared<Aws::FStream>(
"s3-source", fileName.c_str(), std::ios_base::in | std::ios_base::binary);
+ Aws::S3Crt::Model::PutObjectRequest request;
+ request.SetBucket(bucketName);
+ request.SetKey(objectKey);
request.SetBody(inputData);
Aws::S3Crt::Model::PutObjectOutcome outcome = s3CrtClient.PutObject(request);
-
if (outcome.IsSuccess()) {
return (0);
} else {
diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp
index f1baaefbd3f..8fa6f5bcf88 100644
--- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp
+++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp
@@ -31,6 +31,7 @@
#include <sys/stat.h>
#include <fstream>
#include <errno.h>
+#include <unistd.h>
#include "s3_connection.h"
#include "s3_log_system.h"
@@ -70,6 +71,8 @@ Aws::SDKOptions options;
static int S3GetDirectory(const std::string &, const std::string &, bool, std::string &);
static bool S3CacheExists(WT_FILE_SYSTEM *, const std::string &);
static std::string S3Path(const std::string &, const std::string &);
+static std::string S3HomePath(WT_FILE_SYSTEM *, const char *);
+static std::string S3CachePath(WT_FILE_SYSTEM *, const char *);
static int S3Exist(WT_FILE_SYSTEM *, WT_SESSION *, const char *, bool *);
static int S3CustomizeFileSystem(
WT_STORAGE_SOURCE *, WT_SESSION *, const char *, const char *, const char *, WT_FILE_SYSTEM **);
@@ -85,7 +88,7 @@ static int S3ObjectListSingle(
static int S3ObjectListFree(WT_FILE_SYSTEM *, WT_SESSION *, char **, uint32_t);
/*
- * S3Exist --
+ * S3Exist--
* Return if the file exists. First checks the cache, and then the S3 Bucket.
*/
static int
@@ -406,6 +409,39 @@ S3Terminate(WT_STORAGE_SOURCE *storage, WT_SESSION *session)
}
/*
+ * S3Flush --
+ * Flush file to S3 Store using AWS SDK C++ PutObject.
+ */
+static int
+S3Flush(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, WT_FILE_SYSTEM *fileSystem,
+ const char *source, const char *object, const char *config)
+{
+ S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem;
+ return (fs->connection->PutObject(fs->bucketName, object, source));
+}
+
+/*
+ * S3FlushFinish --
+ * Flush local file to cache.
+ */
+static int
+S3FlushFinish(WT_STORAGE_SOURCE *storage, WT_SESSION *session, WT_FILE_SYSTEM *fileSystem,
+ const char *source, const char *object, const char *config)
+{
+ /* Constructing the pathname for source and cache from file system and local. */
+ std::string srcPath = S3Path(((S3_FILE_SYSTEM *)fileSystem)->homeDir, source);
+ std::string destPath = S3Path(((S3_FILE_SYSTEM *)fileSystem)->cacheDir, source);
+
+ /* Linking file with the local file. */
+ int ret = link(srcPath.c_str(), destPath.c_str());
+
+ /* Linking file with the local file. */
+ if (ret == 0)
+ ret = chmod(destPath.c_str(), 0444);
+ return ret;
+}
+
+/*
* wiredtiger_extension_init --
* A S3 storage source library.
*/
@@ -442,6 +478,8 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
s3->storageSource.ss_customize_file_system = S3CustomizeFileSystem;
s3->storageSource.ss_add_reference = S3AddReference;
s3->storageSource.terminate = S3Terminate;
+ s3->storageSource.ss_flush = S3Flush;
+ s3->storageSource.ss_flush_finish = S3FlushFinish;
/* Load the storage */
if ((ret = connection->add_storage_source(connection, "s3_store", &s3->storageSource, NULL)) !=
diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data
index fa9b18d4a07..90a47ade371 100644
--- a/src/third_party/wiredtiger/import.data
+++ b/src/third_party/wiredtiger/import.data
@@ -2,5 +2,5 @@
"vendor": "wiredtiger",
"github": "wiredtiger/wiredtiger.git",
"branch": "mongodb-master",
- "commit": "79c9e7149493ad043fffdb3626fe018568f6892a"
+ "commit": "8b3ed1efc59a88ace0d50f9e40f1b5e5a092b05a"
}
diff --git a/src/third_party/wiredtiger/test/suite/test_s3_store01.py b/src/third_party/wiredtiger/test/suite/test_s3_store01.py
index 30a67e6f865..99a9ddde27c 100644
--- a/src/third_party/wiredtiger/test/suite/test_s3_store01.py
+++ b/src/third_party/wiredtiger/test/suite/test_s3_store01.py
@@ -26,7 +26,7 @@
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
-import wiredtiger, wttest
+import os, wttest
# test_s3_store01.py
# Test minimal S3 extension with basic interactions with AWS S3CrtClient.
@@ -45,13 +45,34 @@ class test_s3_store01(wttest.WiredTigerTestCase):
def test_local_basic(self):
# Test some basic functionality of the storage source API, calling
# each supported method in the API at least once.
+ bucket_name = "rubysfirstbucket"
+ cache_prefix = "cache-"
+ filename = "foobar"
+ object_name = "foobar"
+
session = self.session
s3_store = self.get_s3_storage_source()
+ fs = s3_store.ss_customize_file_system(session, bucket_name, "Secret", None)
+
+ # Test flush functionality and flushing to cache and checking if file exists.
+ f = open(filename, 'wb')
+ outbytes = ('Ruby\n'*100).encode()
+ f.write(outbytes)
+ f.close()
- fs = s3_store.ss_customize_file_system(session, "wt-bucket", "Secret", None)
- _ = fs.fs_directory_list(session, self.bucket_name, '')
+ s3_store.ss_flush(session, fs, filename, object_name)
+ s3_store.ss_flush_finish(session, fs, filename, object_name)
+ self.assertTrue(fs.fs_exist(session, filename))
+
+ # Checking that the file still exists in S3 after removing it from the cache.
+ os.remove(cache_prefix + bucket_name + '/' + filename)
+ self.assertTrue(fs.fs_exist(session, filename))
+
+ fs2 = s3_store.ss_customize_file_system(session, "wt-bucket", "Secret", None)
+ _ = fs2.fs_directory_list(session, self.bucket_name, '')
fs.terminate(session)
+ fs2.terminate(session)
if __name__ == '__main__':
wttest.run()