diff options
author | Will Korteland <will.korteland@mongodb.com> | 2022-02-10 04:19:58 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-10 05:45:13 +0000 |
commit | c4ba946b1ace9f91dd6def000ede23a673c8c589 (patch) | |
tree | 3b1ab03e8b3231d8d9eb353ab8fd2189ca8a0f07 /src/third_party/wiredtiger/ext/storage_sources/s3_store | |
parent | 986917d2d1d761cd0fec7a6ce9ed289730b84f1a (diff) | |
download | mongo-c4ba946b1ace9f91dd6def000ede23a673c8c589.tar.gz |
Import wiredtiger: 8b96413f6f8c6f024d232524275bd03903ca0d72 from branch mongodb-master
ref: 2bb45102b7..8b96413f6f
for: 5.3.0
WT-8660 Make bucket name and prefix as part of the S3Connection class
Diffstat (limited to 'src/third_party/wiredtiger/ext/storage_sources/s3_store')
6 files changed, 348 insertions, 317 deletions
diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp index 35b73fd945c..4cd71ce03c3 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp +++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp @@ -12,23 +12,13 @@ #include <vector> /* - * ListBuckets -- - * Builds a list of buckets from AWS account into a vector. Returns 0 if success, otherwise - * 1. + * S3Connection -- + * Constructor for AWS S3 bucket connection. */ -int -S3Connection::ListBuckets(std::vector<std::string> &buckets) const +S3Connection::S3Connection(const Aws::S3Crt::ClientConfiguration &config, + const std::string &bucketName, const std::string &objPrefix) + : _s3CrtClient(config), _bucketName(bucketName), _objectPrefix(objPrefix) { - auto outcome = s3CrtClient.ListBuckets(); - if (outcome.IsSuccess()) { - for (const auto &bucket : outcome.GetResult().GetBuckets()) - buckets.push_back(bucket.GetName()); - return (0); - } else { - std::cerr << "Error in ListBuckets: " << outcome.GetError().GetMessage() << std::endl - << std::endl; - return (1); - } } /* @@ -38,20 +28,15 @@ S3Connection::ListBuckets(std::vector<std::string> &buckets) const * to 1000. Returns 0 if success, otherwise 1. */ int -S3Connection::ListObjects(const std::string &bucketName, const std::string &prefix, - std::vector<std::string> &objects, uint32_t batchSize, bool listSingle) const +S3Connection::ListObjects(const std::string &prefix, std::vector<std::string> &objects, + uint32_t batchSize, bool listSingle) const { Aws::S3Crt::Model::ListObjectsV2Request request; - request.SetBucket(bucketName); - request.SetPrefix(prefix); - - if (listSingle) - request.SetMaxKeys(1); - else - request.SetMaxKeys(batchSize); - - Aws::S3Crt::Model::ListObjectsV2Outcome outcomes = s3CrtClient.ListObjectsV2(request); + request.SetBucket(_bucketName); + request.SetPrefix(_objectPrefix + prefix); + listSingle ? request.SetMaxKeys(1) : request.SetMaxKeys(batchSize); + Aws::S3Crt::Model::ListObjectsV2Outcome outcomes = _s3CrtClient.ListObjectsV2(request); if (!outcomes.IsSuccess()) return (1); auto result = outcomes.GetResult(); @@ -65,7 +50,7 @@ S3Connection::ListObjects(const std::string &bucketName, const std::string &pref std::string continuationToken = result.GetNextContinuationToken(); while (continuationToken != "") { request.SetContinuationToken(continuationToken); - outcomes = s3CrtClient.ListObjectsV2(request); + outcomes = _s3CrtClient.ListObjectsV2(request); if (!outcomes.IsSuccess()) return (1); result = outcomes.GetResult(); @@ -81,24 +66,23 @@ S3Connection::ListObjects(const std::string &bucketName, const std::string &pref * Puts an object into an S3 bucket. Returns 0 if success, otherwise 1. */ int -S3Connection::PutObject( - const std::string &bucketName, const std::string &objectKey, const std::string &fileName) const +S3Connection::PutObject(const std::string &objectKey, const std::string &fileName) const { std::shared_ptr<Aws::IOStream> inputData = Aws::MakeShared<Aws::FStream>( "s3-source", fileName.c_str(), std::ios_base::in | std::ios_base::binary); Aws::S3Crt::Model::PutObjectRequest request; - request.SetBucket(bucketName); - request.SetKey(objectKey); + request.SetBucket(_bucketName); + request.SetKey(_objectPrefix + objectKey); request.SetBody(inputData); - Aws::S3Crt::Model::PutObjectOutcome outcome = s3CrtClient.PutObject(request); + Aws::S3Crt::Model::PutObjectOutcome outcome = _s3CrtClient.PutObject(request); if (outcome.IsSuccess()) { return (0); - } else { - std::cerr << "Error in PutObject: " << outcome.GetError().GetMessage() << std::endl; - return (1); } + + std::cerr << "Error in PutObject: " << outcome.GetError().GetMessage() << std::endl; + return (1); } /* @@ -106,20 +90,18 @@ S3Connection::PutObject( * Deletes an object from S3 bucket. Returns 0 if success, otherwise 1. */ int -S3Connection::DeleteObject(const std::string &bucketName, const std::string &objectKey) const +S3Connection::DeleteObject(const std::string &objectKey) const { Aws::S3Crt::Model::DeleteObjectRequest request; - request.SetBucket(bucketName); - request.SetKey(objectKey); - - Aws::S3Crt::Model::DeleteObjectOutcome outcome = s3CrtClient.DeleteObject(request); + request.SetBucket(_bucketName); + request.SetKey(_objectPrefix + objectKey); - if (outcome.IsSuccess()) { + Aws::S3Crt::Model::DeleteObjectOutcome outcome = _s3CrtClient.DeleteObject(request); + if (outcome.IsSuccess()) return (0); - } else { - std::cerr << "Error in DeleteObject: " << outcome.GetError().GetMessage() << std::endl; - return (1); - } + + std::cerr << "Error in DeleteObject: " << outcome.GetError().GetMessage() << std::endl; + return (1); } /* @@ -127,15 +109,14 @@ S3Connection::DeleteObject(const std::string &bucketName, const std::string &obj * Checks whether an object with the given key exists in the S3 bucket. */ int -S3Connection::ObjectExists( - const std::string &bucketName, const std::string &objectKey, bool &exists) const +S3Connection::ObjectExists(const std::string &objectKey, bool &exists) const { exists = false; Aws::S3Crt::Model::HeadObjectRequest request; - request.SetBucket(bucketName); - request.SetKey(objectKey); - Aws::S3Crt::Model::HeadObjectOutcome outcome = s3CrtClient.HeadObject(request); + request.SetBucket(_bucketName); + request.SetKey(_objectPrefix + objectKey); + Aws::S3Crt::Model::HeadObjectOutcome outcome = _s3CrtClient.HeadObject(request); /* * If an object with the given key does not exist the HEAD request will return a 404. @@ -146,12 +127,11 @@ S3Connection::ObjectExists( return (0); } else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) return (0); - else - return (-1); -} -/* - * S3Connection -- - * Constructor for AWS S3 bucket connection. - */ -S3Connection::S3Connection(const Aws::S3Crt::ClientConfiguration &config) : s3CrtClient(config){}; + /* + * Fix later, return a proper error code. Not sure if we always have + * outcome.GetError().GetResponseCode() + */ + std::cerr << "Error in ObjectExists." << std::endl; + return (1); +} diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.h b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.h index 5024eb10d46..d748378b8da 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.h +++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.h @@ -14,18 +14,18 @@ */ class S3Connection { public: - explicit S3Connection(const Aws::S3Crt::ClientConfiguration &config); - int ListBuckets(std::vector<std::string> &buckets) const; - int ListObjects(const std::string &bucketName, const std::string &prefix, - std::vector<std::string> &objects, uint32_t batchSize = 1000, bool listSingle = false) const; - int PutObject(const std::string &bucketName, const std::string &objectKey, - const std::string &fileName) const; - int DeleteObject(const std::string &bucketName, const std::string &objectKey) const; - int ObjectExists( - const std::string &bucketName, const std::string &objectKey, bool &exists) const; + explicit S3Connection(const Aws::S3Crt::ClientConfiguration &config, + const std::string &bucketName, const std::string &objPrefix = ""); + int ListObjects(const std::string &prefix, std::vector<std::string> &objects, + uint32_t batchSize = 1000, bool listSingle = false) const; + int PutObject(const std::string &objectKey, const std::string &fileName) const; + int DeleteObject(const std::string &objectKey) const; + int ObjectExists(const std::string &objectKey, bool &exists) const; ~S3Connection() = default; private: - const Aws::S3Crt::S3CrtClient s3CrtClient; + const Aws::S3Crt::S3CrtClient _s3CrtClient; + const std::string _bucketName; + const std::string _objectPrefix; }; #endif diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.cpp b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.cpp index 4835adabbf5..f5aadbdd676 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.cpp +++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.cpp @@ -3,15 +3,13 @@ #include <cstdarg> S3LogSystem::S3LogSystem(WT_EXTENSION_API *wtApi, uint32_t wtVerbosityLevel) + : _wtApi(wtApi), _wtVerbosityLevel(wtVerbosityLevel) { // If the verbosity level is out of range it will default to AWS SDK Error level. - if (verbosityMapping.find(wtVerbosityLevel) != verbosityMapping.end()) { - awsLogLevel = verbosityMapping.at(wtVerbosityLevel); - } else { - awsLogLevel = Aws::Utils::Logging::LogLevel::Error; - } - this->wtApi = wtApi; - this->wtVerbosityLevel = wtVerbosityLevel; + if (verbosityMapping.find(wtVerbosityLevel) != verbosityMapping.end()) + _awsLogLevel = verbosityMapping.at(wtVerbosityLevel); + else + _awsLogLevel = Aws::Utils::Logging::LogLevel::Error; } void @@ -50,14 +48,14 @@ S3LogSystem::LogStream( void S3LogSystem::LogAwsMessage(const char *tag, const std::string &message) const { - wtApi->err_printf(wtApi, NULL, "%s : %s", tag, message.c_str()); + _wtApi->err_printf(_wtApi, NULL, "%s : %s", tag, message.c_str()); } void S3LogSystem::LogVerboseMessage(int32_t verbosityLevel, const std::string &message) { - if (verbosityLevel <= wtVerbosityLevel) - wtApi->err_printf(wtApi, NULL, "%s", message.c_str()); + if (verbosityLevel <= _wtVerbosityLevel) + _wtApi->err_printf(_wtApi, NULL, "%s", message.c_str()); } void diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.h b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.h index 36dabc69fc3..bf527d6fdff 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.h +++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.h @@ -6,6 +6,13 @@ #include <atomic> +// Mapping the desired WiredTiger extension verbosity level to a rough equivalent AWS +// SDK verbosity level. +static const std::map<int32_t, Aws::Utils::Logging::LogLevel> verbosityMapping = { + {-3, Aws::Utils::Logging::LogLevel::Error}, {-2, Aws::Utils::Logging::LogLevel::Warn}, + {-1, Aws::Utils::Logging::LogLevel::Info}, {0, Aws::Utils::Logging::LogLevel::Info}, + {1, Aws::Utils::Logging::LogLevel::Debug}}; + class S3LogSystem : public Aws::Utils::Logging::LogSystemInterface { public: @@ -13,7 +20,7 @@ class S3LogSystem : public Aws::Utils::Logging::LogSystemInterface { Aws::Utils::Logging::LogLevel GetLogLevel(void) const override { - return awsLogLevel; + return (_awsLogLevel); } void Log( Aws::Utils::Logging::LogLevel logLevel, const char *tag, const char *format, ...) override; @@ -24,13 +31,7 @@ class S3LogSystem : public Aws::Utils::Logging::LogSystemInterface { private: void LogAwsMessage(const char *tag, const std::string &message) const; void LogVerboseMessage(int32_t verbosityLevel, const std::string &message); - std::atomic<Aws::Utils::Logging::LogLevel> awsLogLevel; - WT_EXTENSION_API *wtApi; - int32_t wtVerbosityLevel; + std::atomic<Aws::Utils::Logging::LogLevel> _awsLogLevel; + WT_EXTENSION_API *_wtApi; + int32_t _wtVerbosityLevel; }; -// Mapping the desired WiredTiger extension verbosity level to a rough equivalent AWS -// SDK verbosity level. -static const std::map<int32_t, Aws::Utils::Logging::LogLevel> verbosityMapping = { - {-3, Aws::Utils::Logging::LogLevel::Error}, {-2, Aws::Utils::Logging::LogLevel::Warn}, - {-1, Aws::Utils::Logging::LogLevel::Info}, {0, Aws::Utils::Logging::LogLevel::Info}, - {1, Aws::Utils::Logging::LogLevel::Debug}}; diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp index 8fa6f5bcf88..e4d095d536a 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp +++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp @@ -25,11 +25,11 @@ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR * OTHER DEALINGS IN THE SOFTWARE. */ - #include <wiredtiger.h> #include <wiredtiger_ext.h> #include <sys/stat.h> #include <fstream> +#include <list> #include <errno.h> #include <unistd.h> @@ -42,23 +42,29 @@ #define UNUSED(x) (void)(x) #define FS2S3(fs) (((S3_FILE_SYSTEM *)(fs))->storage) +struct S3_FILE_SYSTEM; + /* S3 storage source structure. */ -typedef struct { +struct S3_STORAGE { WT_STORAGE_SOURCE storageSource; /* Must come first */ WT_EXTENSION_API *wtApi; /* Extension API */ + + std::mutex fsListMutex; /* Protect the file system list */ + std::list<S3_FILE_SYSTEM *> fsList; /* List of initiated file systems */ + + uint32_t referenceCount; /* Number of references to this storge source */ int32_t verbose; -} S3_STORAGE; +}; -typedef struct { +struct S3_FILE_SYSTEM { /* Must come first - this is the interface for the file system we are implementing. */ WT_FILE_SYSTEM fileSystem; S3Connection *connection; S3LogSystem *log; S3_STORAGE *storage; - std::string bucketName; std::string cacheDir; /* Directory for cached objects */ std::string homeDir; /* Owned by the connection */ -} S3_FILE_SYSTEM; +}; /* Configuration variables for connecting to S3CrtClient. */ const Aws::String region = Aws::Region::AP_SOUTHEAST_2; @@ -88,26 +94,6 @@ static int S3ObjectListSingle( static int S3ObjectListFree(WT_FILE_SYSTEM *, WT_SESSION *, char **, uint32_t); /* - * S3Exist-- - * Return if the file exists. First checks the cache, and then the S3 Bucket. - */ -static int -S3Exist(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *name, bool *exist) -{ - S3_STORAGE *s3; - int ret; - s3 = FS2S3(fileSystem); - S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem; - - /* It's not in the cache, try the S3 bucket. */ - *exist = S3CacheExists(fileSystem, name); - if (!*exist) - ret = fs->connection->ObjectExists(fs->bucketName, name, *exist); - - return (ret); -} - -/* * S3Path -- * Construct a pathname from the directory and the object name. */ @@ -128,6 +114,25 @@ S3Path(const std::string &dir, const std::string &name) } /* + * S3Exist-- + * Return if the file exists. First checks the cache, and then the S3 Bucket. + */ +static int +S3Exist(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *name, bool *exist) +{ + S3_STORAGE *s3; + s3 = FS2S3(fileSystem); + S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem; + + /* It's not in the cache, try the S3 bucket. */ + *exist = S3CacheExists(fileSystem, name); + if (!*exist) + return (fs->connection->ObjectExists(name, *exist)); + + return (0); +} + +/* * S3CacheExists -- * Checks whether the given file exists in the cache. */ @@ -181,106 +186,83 @@ static int S3CustomizeFileSystem(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, const char *bucketName, const char *authToken, const char *config, WT_FILE_SYSTEM **fileSystem) { - S3_STORAGE *s3; S3_FILE_SYSTEM *fs; + S3_STORAGE *s3; int ret; - WT_CONFIG_ITEM cacheDir; - std::string cacheStr; + std::string cacheDir, homeDir; s3 = (S3_STORAGE *)storageSource; /* Mark parameters as unused for now, until implemented. */ UNUSED(authToken); - Aws::S3Crt::ClientConfiguration awsConfig; - awsConfig.region = region; - awsConfig.throughputTargetGbps = throughputTargetGbps; - awsConfig.partSize = partSize; + /* We need to have a bucket to setup the file system. */ + if (bucketName == NULL || strlen(bucketName) == 0) { + std::cerr << "Error: Bucket not specified"; + return (EINVAL); + } - /* Parse configuration string. */ - ret = s3->wtApi->config_get_string(s3->wtApi, session, config, "cache_directory", &cacheDir); - if (ret == 0) - cacheStr = cacheDir.str; - else if (ret == WT_NOTFOUND) + /* + * Parse configuration string. + */ + + /* Get any prefix to be used for the object keys. */ + WT_CONFIG_ITEM objPrefixConf; + std::string objPrefix; + if ((ret = s3->wtApi->config_get_string( + s3->wtApi, session, config, "prefix", &objPrefixConf)) == 0) + objPrefix = objPrefixConf.str; + else if (ret != WT_NOTFOUND) { + std::cerr << "Error: customize_file_system: config parsing for object prefix"; + return 1; + } + + /* + * Get the directory to setup the cache, or use the default one. The default cache directory is + * named "cache-<name>", where name is the last component of the bucket name's path. We'll + * create it if it doesn't exist. + */ + WT_CONFIG_ITEM cacheDirConf; + std::string cacheStr; + if ((ret = s3->wtApi->config_get_string( + s3->wtApi, session, config, "cache_directory", &cacheDirConf)) == 0) + cacheStr = cacheDirConf.str; + else if (ret == WT_NOTFOUND) { + cacheStr = "cache-" + std::string(bucketName); ret = 0; - else + } else return (ret); - Aws::Utils::Logging::InitializeAWSLogging( - Aws::MakeShared<S3LogSystem>("storage", s3->wtApi, s3->verbose)); + /* Store a copy of the home directory in the file system. */ + homeDir = session->connection->get_home(session->connection); + if ((ret = S3GetDirectory(homeDir, cacheStr, true, cacheDir)) != 0) + return (ret); + + /* Create the file system. */ if ((fs = (S3_FILE_SYSTEM *)calloc(1, sizeof(S3_FILE_SYSTEM))) == NULL) return (errno); fs->storage = s3; + fs->homeDir = homeDir; + fs->cacheDir = cacheDir; - /* Store a copy of the home directory and bucket name in the file system. */ - fs->homeDir = session->connection->get_home(session->connection); - fs->bucketName = bucketName; - - /* - * The default cache directory is named "cache-<name>", where name is the last component of the - * bucket name's path. We'll create it if it doesn't exist. - */ - if (cacheStr.empty()) { - cacheStr = "cache-" + fs->bucketName; - fs->cacheDir = cacheStr; - } - if ((ret = S3GetDirectory(fs->homeDir, cacheStr, true, fs->cacheDir)) != 0) - return (ret); + Aws::S3Crt::ClientConfiguration awsConfig; + awsConfig.region = region; + awsConfig.throughputTargetGbps = throughputTargetGbps; + awsConfig.partSize = partSize; /* New can fail; will deal with this later. */ - fs->connection = new S3Connection(awsConfig); + fs->connection = new S3Connection(awsConfig, bucketName, objPrefix); fs->fileSystem.fs_directory_list = S3ObjectList; fs->fileSystem.fs_directory_list_single = S3ObjectListSingle; fs->fileSystem.fs_directory_list_free = S3ObjectListFree; fs->fileSystem.terminate = S3FileSystemTerminate; fs->fileSystem.fs_exist = S3Exist; - /* TODO: Move these into tests. Just testing here temporarily to show all functions work. */ + /* Add to the list of the active file systems. */ { - std::vector<std::string> buckets; - fs->connection->ListBuckets(buckets); - std::cout << "All buckets under my account:" << std::endl; - for (const std::string &bucket : buckets) - std::cout << " * " << bucket << std::endl; - std::cout << std::endl; - - /* Have at least one bucket to use. */ - if (!buckets.empty()) { - const std::string firstBucket = buckets.at(0); - - /* Put object. */ - fs->connection->PutObject(firstBucket, "WiredTiger.turtle", "WiredTiger.turtle"); - - /* Testing directory list. */ - WT_SESSION *session = NULL; - const char *prefix = "WiredTiger"; - char **objectList; - uint32_t count; - - fs->fileSystem.fs_directory_list( - &fs->fileSystem, session, firstBucket.c_str(), prefix, &objectList, &count); - std::cout << "Objects in bucket '" << firstBucket << "':" << std::endl; - for (int i = 0; i < count; i++) - std::cout << (objectList)[i] << std::endl; - - std::cout << "Number of objects retrieved: " << count << std::endl; - fs->fileSystem.fs_directory_list_free(&fs->fileSystem, session, objectList, count); - - fs->fileSystem.fs_directory_list_single( - &fs->fileSystem, session, firstBucket.c_str(), prefix, &objectList, &count); - - std::cout << "Objects in bucket '" << firstBucket << "':" << std::endl; - for (int i = 0; i < count; i++) - std::cout << (objectList)[i] << std::endl; - - std::cout << "Number of objects retrieved: " << count << std::endl; - fs->fileSystem.fs_directory_list_free(&fs->fileSystem, session, objectList, count); - - /* Delete object. */ - fs->connection->DeleteObject(firstBucket, "WiredTiger.turtle"); - } else - std::cout << "No buckets in AWS account." << std::endl; + std::lock_guard<std::mutex> lockGuard(s3->fsListMutex); + s3->fsList.push_back(fs); } *fileSystem = &fs->fileSystem; @@ -295,9 +277,15 @@ static int S3FileSystemTerminate(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session) { S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem; + S3_STORAGE *s3 = fs->storage; UNUSED(session); /* unused */ + /* Remove from the active filesystems list. */ + { + std::lock_guard<std::mutex> lockGuard(s3->fsListMutex); + s3->fsList.remove(fs); + } delete (fs->connection); free(fs); @@ -309,13 +297,24 @@ S3FileSystemTerminate(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session) * Return a list of object names for the given location. */ static int -S3ObjectList(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *bucket, +S3ObjectList(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *directory, const char *prefix, char ***objectList, uint32_t *count) { S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem; std::vector<std::string> objects; + std::string completePrefix; + + if (directory != NULL) { + completePrefix += directory; + /* Add a terminating '/' if one doesn't exist. */ + if (completePrefix.length() > 1 && completePrefix[completePrefix.length() - 1] != '/') + completePrefix += '/'; + } + if (prefix != NULL) + completePrefix += prefix; + int ret; - if (ret = fs->connection->ListObjects(bucket, prefix, objects) != 0) + if (ret = fs->connection->ListObjects(completePrefix, objects) != 0) return (ret); *count = objects.size(); @@ -329,13 +328,24 @@ S3ObjectList(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *bucket * Return a single object name for the given location. */ static int -S3ObjectListSingle(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *bucket, +S3ObjectListSingle(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *directory, const char *prefix, char ***objectList, uint32_t *count) { S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem; std::vector<std::string> objects; + std::string completePrefix; + + if (directory != NULL) { + completePrefix += directory; + /* Add a terminating '/' if one doesn't exist. */ + if (completePrefix.length() > 1 && completePrefix[completePrefix.length() - 1] != '/') + completePrefix += '/'; + } + if (prefix != NULL) + completePrefix += prefix; + int ret; - if (ret = fs->connection->ListObjects(bucket, prefix, objects, 1, true) != 0) + if (ret = fs->connection->ListObjects(completePrefix, objects, 1, true) != 0) return (ret); *count = objects.size(); @@ -388,7 +398,15 @@ S3ObjectListAdd( static int S3AddReference(WT_STORAGE_SOURCE *storageSource) { - UNUSED(storageSource); + S3_STORAGE *s3 = (S3_STORAGE *)storageSource; + + /* + * Missing reference or overflow? + */ + if (s3->referenceCount == 0 || s3->referenceCount + 1 == 0) + return (EINVAL); + + ++s3->referenceCount; return (0); } @@ -397,14 +415,27 @@ S3AddReference(WT_STORAGE_SOURCE *storageSource) * Discard any resources on termination. */ static int -S3Terminate(WT_STORAGE_SOURCE *storage, WT_SESSION *session) +S3Terminate(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session) { - S3_STORAGE *s3; - s3 = (S3_STORAGE *)storage; + S3_STORAGE *s3 = (S3_STORAGE *)storageSource; + if (--s3->referenceCount != 0) + return (0); + + /* + * Terminate any active filesystems. There are no references to the storage source, so it is + * safe to walk the active filesystem list without a lock. The removal from the list happens + * under a lock. Also, removal happens from the front and addition at the end, so we are safe. + */ + while (!s3->fsList.empty()) { + S3_FILE_SYSTEM *fs = s3->fsList.front(); + S3FileSystemTerminate(&fs->fileSystem, session); + } + + Aws::Utils::Logging::ShutdownAWSLogging(); Aws::ShutdownAPI(options); + delete (s3); - free(s3); return (0); } @@ -417,7 +448,7 @@ S3Flush(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, WT_FILE_SYSTEM *f const char *source, const char *object, const char *config) { S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem; - return (fs->connection->PutObject(fs->bucketName, object, source)); + return (fs->connection->PutObject(object, source)); } /* @@ -428,9 +459,10 @@ static int S3FlushFinish(WT_STORAGE_SOURCE *storage, WT_SESSION *session, WT_FILE_SYSTEM *fileSystem, const char *source, const char *object, const char *config) { + S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem; /* Constructing the pathname for source and cache from file system and local. */ - std::string srcPath = S3Path(((S3_FILE_SYSTEM *)fileSystem)->homeDir, source); - std::string destPath = S3Path(((S3_FILE_SYSTEM *)fileSystem)->cacheDir, source); + std::string srcPath = S3Path(fs->homeDir, source); + std::string destPath = S3Path(fs->cacheDir, source); /* Linking file with the local file. */ int ret = link(srcPath.c_str(), destPath.c_str()); @@ -452,8 +484,7 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) S3_FILE_SYSTEM *fs; WT_CONFIG_ITEM v; - if ((s3 = (S3_STORAGE *)calloc(1, sizeof(S3_STORAGE))) == NULL) - return (errno); + s3 = new S3_STORAGE; s3->wtApi = connection->get_extension_api(connection); @@ -469,6 +500,9 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) return (ret != 0 ? ret : EINVAL); } + /* Create a logger for this storage source, and then initialize the AWS SDK. */ + Aws::Utils::Logging::InitializeAWSLogging( + Aws::MakeShared<S3LogSystem>("storage", s3->wtApi, s3->verbose)); Aws::InitAPI(options); /* @@ -481,6 +515,11 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) s3->storageSource.ss_flush = S3Flush; s3->storageSource.ss_flush_finish = S3FlushFinish; + /* + * The first reference is implied by the call to add_storage_source. + */ + s3->referenceCount = 1; + /* Load the storage */ if ((ret = connection->add_storage_source(connection, "s3_store", &s3->storageSource, NULL)) != 0) diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/test/test_s3_connection.cpp b/src/third_party/wiredtiger/ext/storage_sources/s3_store/test/test_s3_connection.cpp index 93980ef1529..8b94f4a2b62 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/test/test_s3_connection.cpp +++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/test/test_s3_connection.cpp @@ -1,73 +1,104 @@ #include <s3_connection.h> #include <fstream> +#include <random> -#include <fstream> - -/* Default config settings for the S3CrtClient. */ +/* Default config settings for the Test environment. */ namespace TestDefaults { const Aws::String region = Aws::Region::AP_SOUTHEAST_2; const double throughputTargetGbps = 5; -const uint64_t partSize = 8 * 1024 * 1024; /* 8 MB. */ +const uint64_t partSize = 8 * 1024 * 1024; /* 8 MB. */ +static std::string bucketName("s3testext"); // Can be overridden with environment variables. +static std::string objPrefix("s3test_artefacts/unit_"); // To be concatenated with a random string. } // namespace TestDefaults -int TestListBuckets(const Aws::S3Crt::ClientConfiguration &config); +#define TEST_SUCCESS 0 +#define TEST_FAILURE 1 + int TestListObjects(const Aws::S3Crt::ClientConfiguration &config); int TestObjectExists(const Aws::S3Crt::ClientConfiguration &config); -int CleanupTestListObjects(const Aws::S3Crt::ClientConfiguration &config, - const std::string &bucketName, const int totalObjects, const std::string &prefix, - const std::string &fileName); - /* Wrapper for unit test functions. */ -#define TEST(func, config, expectedOutput) \ - do { \ - int __ret; \ - if ((__ret = (func(config))) != expectedOutput) \ - return (__ret); \ +#define TEST(func, config) \ + do { \ + int __ret; \ + if ((__ret = (func(config))) != TEST_SUCCESS) \ + return (__ret); \ } while (0) /* - * TestListBuckets -- - * Example of a unit test to list S3 buckets under the associated AWS account. + * randomizeTestPrefix -- + * Concatenates a random suffix to the prefix being used for the test object keys. Example of + * generated test prefix: "s3test_artefacts/unit_" 2022-31-01-16-34-10_623843294/" */ -int -TestListBuckets(const Aws::S3Crt::ClientConfiguration &config) +static int +randomizeTestPrefix() { - int ret = 0; - S3Connection conn(config); - std::vector<std::string> buckets; - if (ret = conn.ListBuckets(buckets) != 0) - return (ret); + char timeStr[100]; + std::time_t t = std::time(nullptr); + + if (std::strftime(timeStr, sizeof(timeStr), "%F-%H-%M-%S", std::localtime(&t)) == 0) + return (TEST_FAILURE); + + TestDefaults::objPrefix += timeStr; + + /* Create a random device and use it to generate a random seed to initialize the generator. */ + std::random_device myRandomDevice; + unsigned seed = myRandomDevice(); + std::default_random_engine myRandomEngine(seed); + + TestDefaults::objPrefix += '_' + std::to_string(myRandomEngine()); + TestDefaults::objPrefix += '/'; + + return (TEST_SUCCESS); +} + +/* + * setupTestDefaults -- + * Override the defaults with the ones specific for this test instance. + */ +static int +setupTestDefaults() +{ + /* Prefer to use the bucket provided through the environment variable. */ + const char *envBucket = std::getenv("WT_S3_EXT_BUCKET"); + if (envBucket != NULL) + TestDefaults::bucketName = envBucket; + std::cout << "Bucket to be used for testing: " << TestDefaults::bucketName << std::endl; + + /* Append the prefix to be used for object names by a unique string. */ + if (randomizeTestPrefix() != 0) + return (TEST_FAILURE); + std::cout << "Generated prefix: " << TestDefaults::objPrefix << std::endl; + + return (TEST_SUCCESS); +} - std::cout << "All buckets under my account:" << std::endl; - for (const auto &bucket : buckets) - std::cout << " * " << bucket << std::endl; +static int +CleanupTestListObjects(S3Connection &conn, const int totalObjects, const std::string &prefix, + const std::string &fileName) +{ + /* Delete objects and file at end of test. */ + int ret = 0; + for (int i = 0; i < totalObjects; i++) { + if (ret = conn.DeleteObject(prefix + std::to_string(i) + ".txt") != 0) + std::cerr << "Error in CleanupTestListBuckets: failed to remove " + << TestDefaults::objPrefix + prefix << std::to_string(i) << ".txt from " + << TestDefaults::bucketName << std::endl; + } + std::remove(fileName.c_str()); return (ret); } /* * TestListObjects -- - * Unit test for listing S3 objects under the first bucket in the associated AWS account. This - * test assumes there are initially no objects with the prefix of "test_list_objects_" in the - * bucket. + * Unit test for listing S3 objects under the test bucket. */ +/* Todo: Remove code duplication in this function. */ int TestListObjects(const Aws::S3Crt::ClientConfiguration &config) { - S3Connection conn(config); - - /* Temporary workaround to get a bucket to use. */ - std::vector<std::string> buckets; - int ret; - if (ret = conn.ListBuckets(buckets) != 0) - return (ret); - if (buckets.empty()) { - std::cout << "No buckets found in AWS account." << std::endl; - return (1); - } - - const std::string firstBucket = buckets.at(0); + S3Connection conn(config, TestDefaults::bucketName, TestDefaults::objPrefix); std::vector<std::string> objects; /* Name of file to insert in the test. */ @@ -83,136 +114,121 @@ TestListObjects(const Aws::S3Crt::ClientConfiguration &config) /* Expected number of matches. */ int32_t expectedResult = 0; + int ret; /* No matching objects. */ - if (ret = conn.ListObjects(firstBucket, prefix, objects) != 0) + if (ret = conn.ListObjects(prefix, objects) != 0) return (ret); if (objects.size() != expectedResult) - return (1); + return (TEST_FAILURE); /* No matching objects with listSingle. */ - if (ret = conn.ListObjects(firstBucket, prefix, objects, batchSize, listSingle) != 0) + if (ret = conn.ListObjects(prefix, objects, batchSize, listSingle) != 0) return (ret); if (objects.size() != expectedResult) - return (1); + return (TEST_FAILURE); /* Create file to prepare for test. */ if (!static_cast<bool>(std::ofstream(fileName).put('.'))) { std::cerr << "Error creating file." << std::endl; - return (1); + return (TEST_FAILURE); } /* Put objects to prepare for test. */ for (int i = 0; i < totalObjects; i++) { - if (ret = conn.PutObject(firstBucket, prefix + std::to_string(i) + ".txt", fileName) != 0) { - CleanupTestListObjects(config, firstBucket, i, prefix, fileName); + if (ret = conn.PutObject(prefix + std::to_string(i) + ".txt", fileName) != 0) { + CleanupTestListObjects(conn, i, prefix, fileName); return (ret); } } /* List all objects. */ expectedResult = totalObjects; - if (ret = conn.ListObjects(firstBucket, prefix, objects) != 0) { - CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName); + if (ret = conn.ListObjects(prefix, objects) != 0) { + CleanupTestListObjects(conn, totalObjects, prefix, fileName); return (ret); } if (objects.size() != expectedResult) { - CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName); - return (1); + CleanupTestListObjects(conn, totalObjects, prefix, fileName); + return (TEST_FAILURE); } /* List single. */ objects.clear(); expectedResult = 1; - if (ret = conn.ListObjects(firstBucket, prefix, objects, batchSize, listSingle) != 0) { - CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName); + if (ret = conn.ListObjects(prefix, objects, batchSize, listSingle) != 0) { + CleanupTestListObjects(conn, totalObjects, prefix, fileName); return (ret); } if (objects.size() != expectedResult) { - CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName); - return (1); + CleanupTestListObjects(conn, totalObjects, prefix, fileName); + return (TEST_FAILURE); } /* Expected number of matches with test_list_objects_1 prefix. */ objects.clear(); expectedResult = 11; - if (ret = conn.ListObjects(firstBucket, prefix + "1", objects) != 0) { - CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName); + if (ret = conn.ListObjects(prefix + "1", objects) != 0) { + CleanupTestListObjects(conn, totalObjects, prefix, fileName); return (ret); } if (objects.size() != expectedResult) { - CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName); - return (1); + CleanupTestListObjects(conn, totalObjects, prefix, fileName); + return (TEST_FAILURE); } /* List with 5 objects per AWS request. */ objects.clear(); batchSize = 5; expectedResult = totalObjects; - if (ret = conn.ListObjects(firstBucket, prefix, objects, batchSize) != 0) { - CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName); + if (ret = conn.ListObjects(prefix, objects, batchSize) != 0) { + CleanupTestListObjects(conn, totalObjects, prefix, fileName); return (ret); } if (objects.size() != expectedResult) { - CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName); - return (1); + CleanupTestListObjects(conn, totalObjects, prefix, fileName); + return (TEST_FAILURE); } /* ListSingle with 8 objects per AWS request. */ objects.clear(); expectedResult = 1; - if (ret = conn.ListObjects(firstBucket, prefix, objects, batchSize, listSingle) != 0) { - CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName); + if (ret = conn.ListObjects(prefix, objects, batchSize, listSingle) != 0) { + CleanupTestListObjects(conn, totalObjects, prefix, fileName); return (ret); } if (objects.size() != expectedResult) { - CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName); - return (1); + CleanupTestListObjects(conn, totalObjects, prefix, fileName); + return (TEST_FAILURE); } /* List with 8 objects per AWS request. */ objects.clear(); batchSize = 8; expectedResult = totalObjects; - if (ret = conn.ListObjects(firstBucket, prefix, objects, batchSize) != 0) { - CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName); + if (ret = conn.ListObjects(prefix, objects, batchSize) != 0) { + CleanupTestListObjects(conn, totalObjects, prefix, fileName); return (ret); } if (objects.size() != expectedResult) { - CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName); - return (1); + CleanupTestListObjects(conn, totalObjects, prefix, fileName); + return (TEST_FAILURE); } /* ListSingle with 8 objects per AWS request. */ objects.clear(); expectedResult = 1; - if (ret = conn.ListObjects(firstBucket, prefix, objects, batchSize, listSingle) != 0) { - CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName); + if (ret = conn.ListObjects(prefix, objects, batchSize, listSingle) != 0) { + CleanupTestListObjects(conn, totalObjects, prefix, fileName); return (ret); } if (objects.size() != expectedResult) { - CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName); - return (1); + CleanupTestListObjects(conn, totalObjects, prefix, fileName); + return (TEST_FAILURE); } - CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName); - return (0); -} - -int -CleanupTestListObjects(const Aws::S3Crt::ClientConfiguration &config, const std::string &bucketName, - const int totalObjects, const std::string &prefix, const std::string &fileName) -{ - /* Delete objects and file at end of test. */ - S3Connection conn(config); - int ret = 0; - for (int i = 0; i < totalObjects; i++) { - if (ret = conn.DeleteObject(bucketName, prefix + std::to_string(i) + ".txt") != 0) - std::cerr << "Error in CleanupTestListBuckets: failed to remove " << prefix - << std::to_string(i) << ".txt from " << bucketName << std::endl; - } - std::remove(fileName.c_str()); - - return (ret); + // CleanupTestListObjects(conn, totalObjects, prefix, fileName); + std::cout << "TestListObjects(): succeeded." << std::endl; + return (TEST_SUCCESS); } /* @@ -222,14 +238,10 @@ CleanupTestListObjects(const Aws::S3Crt::ClientConfiguration &config, const std: int TestObjectExists(const Aws::S3Crt::ClientConfiguration &config) { - S3Connection conn(config); - std::vector<std::string> buckets; + S3Connection conn(config, TestDefaults::bucketName, TestDefaults::objPrefix); bool exists = false; - int ret; + int ret = TEST_FAILURE; - if (ret = conn.ListBuckets(buckets) != 0) - return (ret); - const std::string bucketName = buckets.at(0); const std::string objectName = "test_object"; const std::string fileName = "test_object.txt"; @@ -238,19 +250,20 @@ TestObjectExists(const Aws::S3Crt::ClientConfiguration &config) File << "Test payload"; File.close(); - if (ret = conn.ObjectExists(bucketName, objectName, exists) != 0 || exists) + if (ret = conn.ObjectExists(objectName, exists) != 0 || exists) return (ret); - if (ret = conn.PutObject(bucketName, objectName, fileName) != 0) + if (ret = conn.PutObject(objectName, fileName) != 0) return (ret); - if (ret = conn.ObjectExists(bucketName, objectName, exists) != 0 || !exists) + if (ret = conn.ObjectExists(objectName, exists) != 0 || !exists) return (ret); - if (ret = conn.DeleteObject(bucketName, objectName) != 0) + if (ret = conn.DeleteObject(objectName) != 0) return (ret); - std::cout << "TestObjectExists(): succeeded.\n" << std::endl; - return (0); + + std::cout << "TestObjectExists(): succeeded." << std::endl; + return (ret); } /* @@ -260,6 +273,10 @@ TestObjectExists(const Aws::S3Crt::ClientConfiguration &config) int main() { + /* Setup the test environment. */ + if (setupTestDefaults() != 0) + return (TEST_FAILURE); + /* Set up the config to use the defaults specified. */ Aws::S3Crt::ClientConfiguration awsConfig; awsConfig.region = TestDefaults::region; @@ -270,14 +287,10 @@ main() Aws::SDKOptions options; Aws::InitAPI(options); - int expectedOutput = 0; - TEST(TestListBuckets, awsConfig, expectedOutput); - TEST(TestListObjects, awsConfig, expectedOutput); - - int objectExistsExpectedOutput = 0; - TEST(TestObjectExists, awsConfig, objectExistsExpectedOutput); + TEST(TestObjectExists, awsConfig); + TEST(TestListObjects, awsConfig); /* Shutdown the API at end of tests. */ Aws::ShutdownAPI(options); - return 0; + return (TEST_SUCCESS); } |