summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/ext/storage_sources/s3_store
diff options
context:
space:
mode:
authorWill Korteland <will.korteland@mongodb.com>2022-02-10 04:19:58 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-10 05:45:13 +0000
commitc4ba946b1ace9f91dd6def000ede23a673c8c589 (patch)
tree3b1ab03e8b3231d8d9eb353ab8fd2189ca8a0f07 /src/third_party/wiredtiger/ext/storage_sources/s3_store
parent986917d2d1d761cd0fec7a6ce9ed289730b84f1a (diff)
downloadmongo-c4ba946b1ace9f91dd6def000ede23a673c8c589.tar.gz
Import wiredtiger: 8b96413f6f8c6f024d232524275bd03903ca0d72 from branch mongodb-master
ref: 2bb45102b7..8b96413f6f for: 5.3.0 WT-8660 Make bucket name and prefix as part of the S3Connection class
Diffstat (limited to 'src/third_party/wiredtiger/ext/storage_sources/s3_store')
-rw-r--r--src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp96
-rw-r--r--src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.h20
-rw-r--r--src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.cpp18
-rw-r--r--src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.h21
-rw-r--r--src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp267
-rw-r--r--src/third_party/wiredtiger/ext/storage_sources/s3_store/test/test_s3_connection.cpp243
6 files changed, 348 insertions, 317 deletions
diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp
index 35b73fd945c..4cd71ce03c3 100644
--- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp
+++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp
@@ -12,23 +12,13 @@
#include <vector>
/*
- * ListBuckets --
- * Builds a list of buckets from AWS account into a vector. Returns 0 if success, otherwise
- * 1.
+ * S3Connection --
+ * Constructor for AWS S3 bucket connection.
*/
-int
-S3Connection::ListBuckets(std::vector<std::string> &buckets) const
+S3Connection::S3Connection(const Aws::S3Crt::ClientConfiguration &config,
+ const std::string &bucketName, const std::string &objPrefix)
+ : _s3CrtClient(config), _bucketName(bucketName), _objectPrefix(objPrefix)
{
- auto outcome = s3CrtClient.ListBuckets();
- if (outcome.IsSuccess()) {
- for (const auto &bucket : outcome.GetResult().GetBuckets())
- buckets.push_back(bucket.GetName());
- return (0);
- } else {
- std::cerr << "Error in ListBuckets: " << outcome.GetError().GetMessage() << std::endl
- << std::endl;
- return (1);
- }
}
/*
@@ -38,20 +28,15 @@ S3Connection::ListBuckets(std::vector<std::string> &buckets) const
* to 1000. Returns 0 if success, otherwise 1.
*/
int
-S3Connection::ListObjects(const std::string &bucketName, const std::string &prefix,
- std::vector<std::string> &objects, uint32_t batchSize, bool listSingle) const
+S3Connection::ListObjects(const std::string &prefix, std::vector<std::string> &objects,
+ uint32_t batchSize, bool listSingle) const
{
Aws::S3Crt::Model::ListObjectsV2Request request;
- request.SetBucket(bucketName);
- request.SetPrefix(prefix);
-
- if (listSingle)
- request.SetMaxKeys(1);
- else
- request.SetMaxKeys(batchSize);
-
- Aws::S3Crt::Model::ListObjectsV2Outcome outcomes = s3CrtClient.ListObjectsV2(request);
+ request.SetBucket(_bucketName);
+ request.SetPrefix(_objectPrefix + prefix);
+ listSingle ? request.SetMaxKeys(1) : request.SetMaxKeys(batchSize);
+ Aws::S3Crt::Model::ListObjectsV2Outcome outcomes = _s3CrtClient.ListObjectsV2(request);
if (!outcomes.IsSuccess())
return (1);
auto result = outcomes.GetResult();
@@ -65,7 +50,7 @@ S3Connection::ListObjects(const std::string &bucketName, const std::string &pref
std::string continuationToken = result.GetNextContinuationToken();
while (continuationToken != "") {
request.SetContinuationToken(continuationToken);
- outcomes = s3CrtClient.ListObjectsV2(request);
+ outcomes = _s3CrtClient.ListObjectsV2(request);
if (!outcomes.IsSuccess())
return (1);
result = outcomes.GetResult();
@@ -81,24 +66,23 @@ S3Connection::ListObjects(const std::string &bucketName, const std::string &pref
* Puts an object into an S3 bucket. Returns 0 if success, otherwise 1.
*/
int
-S3Connection::PutObject(
- const std::string &bucketName, const std::string &objectKey, const std::string &fileName) const
+S3Connection::PutObject(const std::string &objectKey, const std::string &fileName) const
{
std::shared_ptr<Aws::IOStream> inputData = Aws::MakeShared<Aws::FStream>(
"s3-source", fileName.c_str(), std::ios_base::in | std::ios_base::binary);
Aws::S3Crt::Model::PutObjectRequest request;
- request.SetBucket(bucketName);
- request.SetKey(objectKey);
+ request.SetBucket(_bucketName);
+ request.SetKey(_objectPrefix + objectKey);
request.SetBody(inputData);
- Aws::S3Crt::Model::PutObjectOutcome outcome = s3CrtClient.PutObject(request);
+ Aws::S3Crt::Model::PutObjectOutcome outcome = _s3CrtClient.PutObject(request);
if (outcome.IsSuccess()) {
return (0);
- } else {
- std::cerr << "Error in PutObject: " << outcome.GetError().GetMessage() << std::endl;
- return (1);
}
+
+ std::cerr << "Error in PutObject: " << outcome.GetError().GetMessage() << std::endl;
+ return (1);
}
/*
@@ -106,20 +90,18 @@ S3Connection::PutObject(
* Deletes an object from S3 bucket. Returns 0 if success, otherwise 1.
*/
int
-S3Connection::DeleteObject(const std::string &bucketName, const std::string &objectKey) const
+S3Connection::DeleteObject(const std::string &objectKey) const
{
Aws::S3Crt::Model::DeleteObjectRequest request;
- request.SetBucket(bucketName);
- request.SetKey(objectKey);
-
- Aws::S3Crt::Model::DeleteObjectOutcome outcome = s3CrtClient.DeleteObject(request);
+ request.SetBucket(_bucketName);
+ request.SetKey(_objectPrefix + objectKey);
- if (outcome.IsSuccess()) {
+ Aws::S3Crt::Model::DeleteObjectOutcome outcome = _s3CrtClient.DeleteObject(request);
+ if (outcome.IsSuccess())
return (0);
- } else {
- std::cerr << "Error in DeleteObject: " << outcome.GetError().GetMessage() << std::endl;
- return (1);
- }
+
+ std::cerr << "Error in DeleteObject: " << outcome.GetError().GetMessage() << std::endl;
+ return (1);
}
/*
@@ -127,15 +109,14 @@ S3Connection::DeleteObject(const std::string &bucketName, const std::string &obj
* Checks whether an object with the given key exists in the S3 bucket.
*/
int
-S3Connection::ObjectExists(
- const std::string &bucketName, const std::string &objectKey, bool &exists) const
+S3Connection::ObjectExists(const std::string &objectKey, bool &exists) const
{
exists = false;
Aws::S3Crt::Model::HeadObjectRequest request;
- request.SetBucket(bucketName);
- request.SetKey(objectKey);
- Aws::S3Crt::Model::HeadObjectOutcome outcome = s3CrtClient.HeadObject(request);
+ request.SetBucket(_bucketName);
+ request.SetKey(_objectPrefix + objectKey);
+ Aws::S3Crt::Model::HeadObjectOutcome outcome = _s3CrtClient.HeadObject(request);
/*
* If an object with the given key does not exist the HEAD request will return a 404.
@@ -146,12 +127,11 @@ S3Connection::ObjectExists(
return (0);
} else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND)
return (0);
- else
- return (-1);
-}
-/*
- * S3Connection --
- * Constructor for AWS S3 bucket connection.
- */
-S3Connection::S3Connection(const Aws::S3Crt::ClientConfiguration &config) : s3CrtClient(config){};
+ /*
+ * Fix later, return a proper error code. Not sure if we always have
+ * outcome.GetError().GetResponseCode()
+ */
+ std::cerr << "Error in ObjectExists." << std::endl;
+ return (1);
+}
diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.h b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.h
index 5024eb10d46..d748378b8da 100644
--- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.h
+++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.h
@@ -14,18 +14,18 @@
*/
class S3Connection {
public:
- explicit S3Connection(const Aws::S3Crt::ClientConfiguration &config);
- int ListBuckets(std::vector<std::string> &buckets) const;
- int ListObjects(const std::string &bucketName, const std::string &prefix,
- std::vector<std::string> &objects, uint32_t batchSize = 1000, bool listSingle = false) const;
- int PutObject(const std::string &bucketName, const std::string &objectKey,
- const std::string &fileName) const;
- int DeleteObject(const std::string &bucketName, const std::string &objectKey) const;
- int ObjectExists(
- const std::string &bucketName, const std::string &objectKey, bool &exists) const;
+ explicit S3Connection(const Aws::S3Crt::ClientConfiguration &config,
+ const std::string &bucketName, const std::string &objPrefix = "");
+ int ListObjects(const std::string &prefix, std::vector<std::string> &objects,
+ uint32_t batchSize = 1000, bool listSingle = false) const;
+ int PutObject(const std::string &objectKey, const std::string &fileName) const;
+ int DeleteObject(const std::string &objectKey) const;
+ int ObjectExists(const std::string &objectKey, bool &exists) const;
~S3Connection() = default;
private:
- const Aws::S3Crt::S3CrtClient s3CrtClient;
+ const Aws::S3Crt::S3CrtClient _s3CrtClient;
+ const std::string _bucketName;
+ const std::string _objectPrefix;
};
#endif
diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.cpp b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.cpp
index 4835adabbf5..f5aadbdd676 100644
--- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.cpp
+++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.cpp
@@ -3,15 +3,13 @@
#include <cstdarg>
S3LogSystem::S3LogSystem(WT_EXTENSION_API *wtApi, uint32_t wtVerbosityLevel)
+ : _wtApi(wtApi), _wtVerbosityLevel(wtVerbosityLevel)
{
// If the verbosity level is out of range it will default to AWS SDK Error level.
- if (verbosityMapping.find(wtVerbosityLevel) != verbosityMapping.end()) {
- awsLogLevel = verbosityMapping.at(wtVerbosityLevel);
- } else {
- awsLogLevel = Aws::Utils::Logging::LogLevel::Error;
- }
- this->wtApi = wtApi;
- this->wtVerbosityLevel = wtVerbosityLevel;
+ if (verbosityMapping.find(wtVerbosityLevel) != verbosityMapping.end())
+ _awsLogLevel = verbosityMapping.at(wtVerbosityLevel);
+ else
+ _awsLogLevel = Aws::Utils::Logging::LogLevel::Error;
}
void
@@ -50,14 +48,14 @@ S3LogSystem::LogStream(
void
S3LogSystem::LogAwsMessage(const char *tag, const std::string &message) const
{
- wtApi->err_printf(wtApi, NULL, "%s : %s", tag, message.c_str());
+ _wtApi->err_printf(_wtApi, NULL, "%s : %s", tag, message.c_str());
}
void
S3LogSystem::LogVerboseMessage(int32_t verbosityLevel, const std::string &message)
{
- if (verbosityLevel <= wtVerbosityLevel)
- wtApi->err_printf(wtApi, NULL, "%s", message.c_str());
+ if (verbosityLevel <= _wtVerbosityLevel)
+ _wtApi->err_printf(_wtApi, NULL, "%s", message.c_str());
}
void
diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.h b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.h
index 36dabc69fc3..bf527d6fdff 100644
--- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.h
+++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_log_system.h
@@ -6,6 +6,13 @@
#include <atomic>
+// Mapping the desired WiredTiger extension verbosity level to a rough equivalent AWS
+// SDK verbosity level.
+static const std::map<int32_t, Aws::Utils::Logging::LogLevel> verbosityMapping = {
+ {-3, Aws::Utils::Logging::LogLevel::Error}, {-2, Aws::Utils::Logging::LogLevel::Warn},
+ {-1, Aws::Utils::Logging::LogLevel::Info}, {0, Aws::Utils::Logging::LogLevel::Info},
+ {1, Aws::Utils::Logging::LogLevel::Debug}};
+
class S3LogSystem : public Aws::Utils::Logging::LogSystemInterface {
public:
@@ -13,7 +20,7 @@ class S3LogSystem : public Aws::Utils::Logging::LogSystemInterface {
Aws::Utils::Logging::LogLevel
GetLogLevel(void) const override
{
- return awsLogLevel;
+ return (_awsLogLevel);
}
void Log(
Aws::Utils::Logging::LogLevel logLevel, const char *tag, const char *format, ...) override;
@@ -24,13 +31,7 @@ class S3LogSystem : public Aws::Utils::Logging::LogSystemInterface {
private:
void LogAwsMessage(const char *tag, const std::string &message) const;
void LogVerboseMessage(int32_t verbosityLevel, const std::string &message);
- std::atomic<Aws::Utils::Logging::LogLevel> awsLogLevel;
- WT_EXTENSION_API *wtApi;
- int32_t wtVerbosityLevel;
+ std::atomic<Aws::Utils::Logging::LogLevel> _awsLogLevel;
+ WT_EXTENSION_API *_wtApi;
+ int32_t _wtVerbosityLevel;
};
-// Mapping the desired WiredTiger extension verbosity level to a rough equivalent AWS
-// SDK verbosity level.
-static const std::map<int32_t, Aws::Utils::Logging::LogLevel> verbosityMapping = {
- {-3, Aws::Utils::Logging::LogLevel::Error}, {-2, Aws::Utils::Logging::LogLevel::Warn},
- {-1, Aws::Utils::Logging::LogLevel::Info}, {0, Aws::Utils::Logging::LogLevel::Info},
- {1, Aws::Utils::Logging::LogLevel::Debug}};
diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp
index 8fa6f5bcf88..e4d095d536a 100644
--- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp
+++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp
@@ -25,11 +25,11 @@
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
-
#include <wiredtiger.h>
#include <wiredtiger_ext.h>
#include <sys/stat.h>
#include <fstream>
+#include <list>
#include <errno.h>
#include <unistd.h>
@@ -42,23 +42,29 @@
#define UNUSED(x) (void)(x)
#define FS2S3(fs) (((S3_FILE_SYSTEM *)(fs))->storage)
+struct S3_FILE_SYSTEM;
+
/* S3 storage source structure. */
-typedef struct {
+struct S3_STORAGE {
WT_STORAGE_SOURCE storageSource; /* Must come first */
WT_EXTENSION_API *wtApi; /* Extension API */
+
+ std::mutex fsListMutex; /* Protect the file system list */
+ std::list<S3_FILE_SYSTEM *> fsList; /* List of initiated file systems */
+
+ uint32_t referenceCount; /* Number of references to this storage source */
int32_t verbose;
-} S3_STORAGE;
+};
-typedef struct {
+struct S3_FILE_SYSTEM {
/* Must come first - this is the interface for the file system we are implementing. */
WT_FILE_SYSTEM fileSystem;
S3Connection *connection;
S3LogSystem *log;
S3_STORAGE *storage;
- std::string bucketName;
std::string cacheDir; /* Directory for cached objects */
std::string homeDir; /* Owned by the connection */
-} S3_FILE_SYSTEM;
+};
/* Configuration variables for connecting to S3CrtClient. */
const Aws::String region = Aws::Region::AP_SOUTHEAST_2;
@@ -88,26 +94,6 @@ static int S3ObjectListSingle(
static int S3ObjectListFree(WT_FILE_SYSTEM *, WT_SESSION *, char **, uint32_t);
/*
- * S3Exist--
- * Return if the file exists. First checks the cache, and then the S3 Bucket.
- */
-static int
-S3Exist(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *name, bool *exist)
-{
- S3_STORAGE *s3;
- int ret;
- s3 = FS2S3(fileSystem);
- S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem;
-
- /* It's not in the cache, try the S3 bucket. */
- *exist = S3CacheExists(fileSystem, name);
- if (!*exist)
- ret = fs->connection->ObjectExists(fs->bucketName, name, *exist);
-
- return (ret);
-}
-
-/*
* S3Path --
* Construct a pathname from the directory and the object name.
*/
@@ -128,6 +114,25 @@ S3Path(const std::string &dir, const std::string &name)
}
/*
+ * S3Exist --
+ * Return whether the file exists. First checks the cache, and then the S3 bucket.
+ */
+static int
+S3Exist(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *name, bool *exist)
+{
+ S3_STORAGE *s3;
+ s3 = FS2S3(fileSystem);
+ S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem;
+
+ /* Check the cache first; if it's not there, try the S3 bucket. */
+ *exist = S3CacheExists(fileSystem, name);
+ if (!*exist)
+ return (fs->connection->ObjectExists(name, *exist));
+
+ return (0);
+}
+
+/*
* S3CacheExists --
* Checks whether the given file exists in the cache.
*/
@@ -181,106 +186,83 @@ static int
S3CustomizeFileSystem(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, const char *bucketName,
const char *authToken, const char *config, WT_FILE_SYSTEM **fileSystem)
{
- S3_STORAGE *s3;
S3_FILE_SYSTEM *fs;
+ S3_STORAGE *s3;
int ret;
- WT_CONFIG_ITEM cacheDir;
- std::string cacheStr;
+ std::string cacheDir, homeDir;
s3 = (S3_STORAGE *)storageSource;
/* Mark parameters as unused for now, until implemented. */
UNUSED(authToken);
- Aws::S3Crt::ClientConfiguration awsConfig;
- awsConfig.region = region;
- awsConfig.throughputTargetGbps = throughputTargetGbps;
- awsConfig.partSize = partSize;
+ /* We need to have a bucket to setup the file system. */
+ if (bucketName == NULL || strlen(bucketName) == 0) {
+ std::cerr << "Error: Bucket not specified";
+ return (EINVAL);
+ }
- /* Parse configuration string. */
- ret = s3->wtApi->config_get_string(s3->wtApi, session, config, "cache_directory", &cacheDir);
- if (ret == 0)
- cacheStr = cacheDir.str;
- else if (ret == WT_NOTFOUND)
+ /*
+ * Parse configuration string.
+ */
+
+ /* Get any prefix to be used for the object keys. */
+ WT_CONFIG_ITEM objPrefixConf;
+ std::string objPrefix;
+ if ((ret = s3->wtApi->config_get_string(
+ s3->wtApi, session, config, "prefix", &objPrefixConf)) == 0)
+ objPrefix = objPrefixConf.str;
+ else if (ret != WT_NOTFOUND) {
+ std::cerr << "Error: customize_file_system: config parsing for object prefix";
+ return 1;
+ }
+
+ /*
+ * Get the directory to setup the cache, or use the default one. The default cache directory is
+ * named "cache-<name>", where name is the last component of the bucket name's path. We'll
+ * create it if it doesn't exist.
+ */
+ WT_CONFIG_ITEM cacheDirConf;
+ std::string cacheStr;
+ if ((ret = s3->wtApi->config_get_string(
+ s3->wtApi, session, config, "cache_directory", &cacheDirConf)) == 0)
+ cacheStr = cacheDirConf.str;
+ else if (ret == WT_NOTFOUND) {
+ cacheStr = "cache-" + std::string(bucketName);
ret = 0;
- else
+ } else
return (ret);
- Aws::Utils::Logging::InitializeAWSLogging(
- Aws::MakeShared<S3LogSystem>("storage", s3->wtApi, s3->verbose));
+ /* Store a copy of the home directory in the file system. */
+ homeDir = session->connection->get_home(session->connection);
+ if ((ret = S3GetDirectory(homeDir, cacheStr, true, cacheDir)) != 0)
+ return (ret);
+
+ /* Create the file system. */
if ((fs = (S3_FILE_SYSTEM *)calloc(1, sizeof(S3_FILE_SYSTEM))) == NULL)
return (errno);
fs->storage = s3;
+ fs->homeDir = homeDir;
+ fs->cacheDir = cacheDir;
- /* Store a copy of the home directory and bucket name in the file system. */
- fs->homeDir = session->connection->get_home(session->connection);
- fs->bucketName = bucketName;
-
- /*
- * The default cache directory is named "cache-<name>", where name is the last component of the
- * bucket name's path. We'll create it if it doesn't exist.
- */
- if (cacheStr.empty()) {
- cacheStr = "cache-" + fs->bucketName;
- fs->cacheDir = cacheStr;
- }
- if ((ret = S3GetDirectory(fs->homeDir, cacheStr, true, fs->cacheDir)) != 0)
- return (ret);
+ Aws::S3Crt::ClientConfiguration awsConfig;
+ awsConfig.region = region;
+ awsConfig.throughputTargetGbps = throughputTargetGbps;
+ awsConfig.partSize = partSize;
/* New can fail; will deal with this later. */
- fs->connection = new S3Connection(awsConfig);
+ fs->connection = new S3Connection(awsConfig, bucketName, objPrefix);
fs->fileSystem.fs_directory_list = S3ObjectList;
fs->fileSystem.fs_directory_list_single = S3ObjectListSingle;
fs->fileSystem.fs_directory_list_free = S3ObjectListFree;
fs->fileSystem.terminate = S3FileSystemTerminate;
fs->fileSystem.fs_exist = S3Exist;
- /* TODO: Move these into tests. Just testing here temporarily to show all functions work. */
+ /* Add to the list of the active file systems. */
{
- std::vector<std::string> buckets;
- fs->connection->ListBuckets(buckets);
- std::cout << "All buckets under my account:" << std::endl;
- for (const std::string &bucket : buckets)
- std::cout << " * " << bucket << std::endl;
- std::cout << std::endl;
-
- /* Have at least one bucket to use. */
- if (!buckets.empty()) {
- const std::string firstBucket = buckets.at(0);
-
- /* Put object. */
- fs->connection->PutObject(firstBucket, "WiredTiger.turtle", "WiredTiger.turtle");
-
- /* Testing directory list. */
- WT_SESSION *session = NULL;
- const char *prefix = "WiredTiger";
- char **objectList;
- uint32_t count;
-
- fs->fileSystem.fs_directory_list(
- &fs->fileSystem, session, firstBucket.c_str(), prefix, &objectList, &count);
- std::cout << "Objects in bucket '" << firstBucket << "':" << std::endl;
- for (int i = 0; i < count; i++)
- std::cout << (objectList)[i] << std::endl;
-
- std::cout << "Number of objects retrieved: " << count << std::endl;
- fs->fileSystem.fs_directory_list_free(&fs->fileSystem, session, objectList, count);
-
- fs->fileSystem.fs_directory_list_single(
- &fs->fileSystem, session, firstBucket.c_str(), prefix, &objectList, &count);
-
- std::cout << "Objects in bucket '" << firstBucket << "':" << std::endl;
- for (int i = 0; i < count; i++)
- std::cout << (objectList)[i] << std::endl;
-
- std::cout << "Number of objects retrieved: " << count << std::endl;
- fs->fileSystem.fs_directory_list_free(&fs->fileSystem, session, objectList, count);
-
- /* Delete object. */
- fs->connection->DeleteObject(firstBucket, "WiredTiger.turtle");
- } else
- std::cout << "No buckets in AWS account." << std::endl;
+ std::lock_guard<std::mutex> lockGuard(s3->fsListMutex);
+ s3->fsList.push_back(fs);
}
*fileSystem = &fs->fileSystem;
@@ -295,9 +277,15 @@ static int
S3FileSystemTerminate(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session)
{
S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem;
+ S3_STORAGE *s3 = fs->storage;
UNUSED(session); /* unused */
+ /* Remove from the active filesystems list. */
+ {
+ std::lock_guard<std::mutex> lockGuard(s3->fsListMutex);
+ s3->fsList.remove(fs);
+ }
delete (fs->connection);
free(fs);
@@ -309,13 +297,24 @@ S3FileSystemTerminate(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session)
* Return a list of object names for the given location.
*/
static int
-S3ObjectList(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *bucket,
+S3ObjectList(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *directory,
const char *prefix, char ***objectList, uint32_t *count)
{
S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem;
std::vector<std::string> objects;
+ std::string completePrefix;
+
+ if (directory != NULL) {
+ completePrefix += directory;
+ /* Add a terminating '/' if one doesn't exist. */
+ if (completePrefix.length() > 1 && completePrefix[completePrefix.length() - 1] != '/')
+ completePrefix += '/';
+ }
+ if (prefix != NULL)
+ completePrefix += prefix;
+
int ret;
- if (ret = fs->connection->ListObjects(bucket, prefix, objects) != 0)
+ if (ret = fs->connection->ListObjects(completePrefix, objects) != 0)
return (ret);
*count = objects.size();
@@ -329,13 +328,24 @@ S3ObjectList(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *bucket
* Return a single object name for the given location.
*/
static int
-S3ObjectListSingle(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *bucket,
+S3ObjectListSingle(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *directory,
const char *prefix, char ***objectList, uint32_t *count)
{
S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem;
std::vector<std::string> objects;
+ std::string completePrefix;
+
+ if (directory != NULL) {
+ completePrefix += directory;
+ /* Add a terminating '/' if one doesn't exist. */
+ if (completePrefix.length() > 1 && completePrefix[completePrefix.length() - 1] != '/')
+ completePrefix += '/';
+ }
+ if (prefix != NULL)
+ completePrefix += prefix;
+
int ret;
- if (ret = fs->connection->ListObjects(bucket, prefix, objects, 1, true) != 0)
+ if (ret = fs->connection->ListObjects(completePrefix, objects, 1, true) != 0)
return (ret);
*count = objects.size();
@@ -388,7 +398,15 @@ S3ObjectListAdd(
static int
S3AddReference(WT_STORAGE_SOURCE *storageSource)
{
- UNUSED(storageSource);
+ S3_STORAGE *s3 = (S3_STORAGE *)storageSource;
+
+ /*
+ * Missing reference or overflow?
+ */
+ if (s3->referenceCount == 0 || s3->referenceCount + 1 == 0)
+ return (EINVAL);
+
+ ++s3->referenceCount;
return (0);
}
@@ -397,14 +415,27 @@ S3AddReference(WT_STORAGE_SOURCE *storageSource)
* Discard any resources on termination.
*/
static int
-S3Terminate(WT_STORAGE_SOURCE *storage, WT_SESSION *session)
+S3Terminate(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session)
{
- S3_STORAGE *s3;
- s3 = (S3_STORAGE *)storage;
+ S3_STORAGE *s3 = (S3_STORAGE *)storageSource;
+ if (--s3->referenceCount != 0)
+ return (0);
+
+ /*
+ * Terminate any active filesystems. There are no references to the storage source, so it is
+ * safe to walk the active filesystem list without a lock. The removal from the list happens
+ * under a lock. Also, removal happens from the front and addition at the end, so we are safe.
+ */
+ while (!s3->fsList.empty()) {
+ S3_FILE_SYSTEM *fs = s3->fsList.front();
+ S3FileSystemTerminate(&fs->fileSystem, session);
+ }
+
+ Aws::Utils::Logging::ShutdownAWSLogging();
Aws::ShutdownAPI(options);
+ delete (s3);
- free(s3);
return (0);
}
@@ -417,7 +448,7 @@ S3Flush(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, WT_FILE_SYSTEM *f
const char *source, const char *object, const char *config)
{
S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem;
- return (fs->connection->PutObject(fs->bucketName, object, source));
+ return (fs->connection->PutObject(object, source));
}
/*
@@ -428,9 +459,10 @@ static int
S3FlushFinish(WT_STORAGE_SOURCE *storage, WT_SESSION *session, WT_FILE_SYSTEM *fileSystem,
const char *source, const char *object, const char *config)
{
+ S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem;
/* Constructing the pathname for source and cache from file system and local. */
- std::string srcPath = S3Path(((S3_FILE_SYSTEM *)fileSystem)->homeDir, source);
- std::string destPath = S3Path(((S3_FILE_SYSTEM *)fileSystem)->cacheDir, source);
+ std::string srcPath = S3Path(fs->homeDir, source);
+ std::string destPath = S3Path(fs->cacheDir, source);
/* Linking file with the local file. */
int ret = link(srcPath.c_str(), destPath.c_str());
@@ -452,8 +484,7 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
S3_FILE_SYSTEM *fs;
WT_CONFIG_ITEM v;
- if ((s3 = (S3_STORAGE *)calloc(1, sizeof(S3_STORAGE))) == NULL)
- return (errno);
+ s3 = new S3_STORAGE;
s3->wtApi = connection->get_extension_api(connection);
@@ -469,6 +500,9 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
return (ret != 0 ? ret : EINVAL);
}
+ /* Create a logger for this storage source, and then initialize the AWS SDK. */
+ Aws::Utils::Logging::InitializeAWSLogging(
+ Aws::MakeShared<S3LogSystem>("storage", s3->wtApi, s3->verbose));
Aws::InitAPI(options);
/*
@@ -481,6 +515,11 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
s3->storageSource.ss_flush = S3Flush;
s3->storageSource.ss_flush_finish = S3FlushFinish;
+ /*
+ * The first reference is implied by the call to add_storage_source.
+ */
+ s3->referenceCount = 1;
+
/* Load the storage */
if ((ret = connection->add_storage_source(connection, "s3_store", &s3->storageSource, NULL)) !=
0)
diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/test/test_s3_connection.cpp b/src/third_party/wiredtiger/ext/storage_sources/s3_store/test/test_s3_connection.cpp
index 93980ef1529..8b94f4a2b62 100644
--- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/test/test_s3_connection.cpp
+++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/test/test_s3_connection.cpp
@@ -1,73 +1,104 @@
#include <s3_connection.h>
#include <fstream>
+#include <random>
-#include <fstream>
-
-/* Default config settings for the S3CrtClient. */
+/* Default config settings for the Test environment. */
namespace TestDefaults {
const Aws::String region = Aws::Region::AP_SOUTHEAST_2;
const double throughputTargetGbps = 5;
-const uint64_t partSize = 8 * 1024 * 1024; /* 8 MB. */
+const uint64_t partSize = 8 * 1024 * 1024; /* 8 MB. */
+static std::string bucketName("s3testext"); // Can be overridden with environment variables.
+static std::string objPrefix("s3test_artefacts/unit_"); // To be concatenated with a random string.
} // namespace TestDefaults
-int TestListBuckets(const Aws::S3Crt::ClientConfiguration &config);
+#define TEST_SUCCESS 0
+#define TEST_FAILURE 1
+
int TestListObjects(const Aws::S3Crt::ClientConfiguration &config);
int TestObjectExists(const Aws::S3Crt::ClientConfiguration &config);
-int CleanupTestListObjects(const Aws::S3Crt::ClientConfiguration &config,
- const std::string &bucketName, const int totalObjects, const std::string &prefix,
- const std::string &fileName);
-
/* Wrapper for unit test functions. */
-#define TEST(func, config, expectedOutput) \
- do { \
- int __ret; \
- if ((__ret = (func(config))) != expectedOutput) \
- return (__ret); \
+#define TEST(func, config) \
+ do { \
+ int __ret; \
+ if ((__ret = (func(config))) != TEST_SUCCESS) \
+ return (__ret); \
} while (0)
/*
- * TestListBuckets --
- * Example of a unit test to list S3 buckets under the associated AWS account.
+ * randomizeTestPrefix --
+ * Concatenates a timestamp and a random suffix to the prefix being used for the test object
+ * keys. Example of generated test prefix: "s3test_artefacts/unit_2022-01-31-16-34-10_623843294/"
*/
-int
-TestListBuckets(const Aws::S3Crt::ClientConfiguration &config)
+static int
+randomizeTestPrefix()
{
- int ret = 0;
- S3Connection conn(config);
- std::vector<std::string> buckets;
- if (ret = conn.ListBuckets(buckets) != 0)
- return (ret);
+ char timeStr[100];
+ std::time_t t = std::time(nullptr);
+
+ if (std::strftime(timeStr, sizeof(timeStr), "%F-%H-%M-%S", std::localtime(&t)) == 0)
+ return (TEST_FAILURE);
+
+ TestDefaults::objPrefix += timeStr;
+
+ /* Create a random device and use it to generate a random seed to initialize the generator. */
+ std::random_device myRandomDevice;
+ unsigned seed = myRandomDevice();
+ std::default_random_engine myRandomEngine(seed);
+
+ TestDefaults::objPrefix += '_' + std::to_string(myRandomEngine());
+ TestDefaults::objPrefix += '/';
+
+ return (TEST_SUCCESS);
+}
+
+/*
+ * setupTestDefaults --
+ * Override the defaults with the ones specific for this test instance.
+ */
+static int
+setupTestDefaults()
+{
+ /* Prefer to use the bucket provided through the environment variable. */
+ const char *envBucket = std::getenv("WT_S3_EXT_BUCKET");
+ if (envBucket != NULL)
+ TestDefaults::bucketName = envBucket;
+ std::cout << "Bucket to be used for testing: " << TestDefaults::bucketName << std::endl;
+
+ /* Append a unique string to the prefix used for object names. */
+ if (randomizeTestPrefix() != 0)
+ return (TEST_FAILURE);
+ std::cout << "Generated prefix: " << TestDefaults::objPrefix << std::endl;
+
+ return (TEST_SUCCESS);
+}
- std::cout << "All buckets under my account:" << std::endl;
- for (const auto &bucket : buckets)
- std::cout << " * " << bucket << std::endl;
+static int
+CleanupTestListObjects(S3Connection &conn, const int totalObjects, const std::string &prefix,
+ const std::string &fileName)
+{
+ /* Delete objects and file at end of test. */
+ int ret = 0;
+ for (int i = 0; i < totalObjects; i++) {
+ if (ret = conn.DeleteObject(prefix + std::to_string(i) + ".txt") != 0)
+ std::cerr << "Error in CleanupTestListBuckets: failed to remove "
+ << TestDefaults::objPrefix + prefix << std::to_string(i) << ".txt from "
+ << TestDefaults::bucketName << std::endl;
+ }
+ std::remove(fileName.c_str());
return (ret);
}
/*
* TestListObjects --
- * Unit test for listing S3 objects under the first bucket in the associated AWS account. This
- * test assumes there are initially no objects with the prefix of "test_list_objects_" in the
- * bucket.
+ * Unit test for listing S3 objects under the test bucket.
*/
+/* Todo: Remove code duplication in this function. */
int
TestListObjects(const Aws::S3Crt::ClientConfiguration &config)
{
- S3Connection conn(config);
-
- /* Temporary workaround to get a bucket to use. */
- std::vector<std::string> buckets;
- int ret;
- if (ret = conn.ListBuckets(buckets) != 0)
- return (ret);
- if (buckets.empty()) {
- std::cout << "No buckets found in AWS account." << std::endl;
- return (1);
- }
-
- const std::string firstBucket = buckets.at(0);
+ S3Connection conn(config, TestDefaults::bucketName, TestDefaults::objPrefix);
std::vector<std::string> objects;
/* Name of file to insert in the test. */
@@ -83,136 +114,121 @@ TestListObjects(const Aws::S3Crt::ClientConfiguration &config)
/* Expected number of matches. */
int32_t expectedResult = 0;
+ int ret;
/* No matching objects. */
- if (ret = conn.ListObjects(firstBucket, prefix, objects) != 0)
+ if (ret = conn.ListObjects(prefix, objects) != 0)
return (ret);
if (objects.size() != expectedResult)
- return (1);
+ return (TEST_FAILURE);
/* No matching objects with listSingle. */
- if (ret = conn.ListObjects(firstBucket, prefix, objects, batchSize, listSingle) != 0)
+ if (ret = conn.ListObjects(prefix, objects, batchSize, listSingle) != 0)
return (ret);
if (objects.size() != expectedResult)
- return (1);
+ return (TEST_FAILURE);
/* Create file to prepare for test. */
if (!static_cast<bool>(std::ofstream(fileName).put('.'))) {
std::cerr << "Error creating file." << std::endl;
- return (1);
+ return (TEST_FAILURE);
}
/* Put objects to prepare for test. */
for (int i = 0; i < totalObjects; i++) {
- if (ret = conn.PutObject(firstBucket, prefix + std::to_string(i) + ".txt", fileName) != 0) {
- CleanupTestListObjects(config, firstBucket, i, prefix, fileName);
+ if (ret = conn.PutObject(prefix + std::to_string(i) + ".txt", fileName) != 0) {
+ CleanupTestListObjects(conn, i, prefix, fileName);
return (ret);
}
}
/* List all objects. */
expectedResult = totalObjects;
- if (ret = conn.ListObjects(firstBucket, prefix, objects) != 0) {
- CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName);
+ if (ret = conn.ListObjects(prefix, objects) != 0) {
+ CleanupTestListObjects(conn, totalObjects, prefix, fileName);
return (ret);
}
if (objects.size() != expectedResult) {
- CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName);
- return (1);
+ CleanupTestListObjects(conn, totalObjects, prefix, fileName);
+ return (TEST_FAILURE);
}
/* List single. */
objects.clear();
expectedResult = 1;
- if (ret = conn.ListObjects(firstBucket, prefix, objects, batchSize, listSingle) != 0) {
- CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName);
+ if (ret = conn.ListObjects(prefix, objects, batchSize, listSingle) != 0) {
+ CleanupTestListObjects(conn, totalObjects, prefix, fileName);
return (ret);
}
if (objects.size() != expectedResult) {
- CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName);
- return (1);
+ CleanupTestListObjects(conn, totalObjects, prefix, fileName);
+ return (TEST_FAILURE);
}
/* Expected number of matches with test_list_objects_1 prefix. */
objects.clear();
expectedResult = 11;
- if (ret = conn.ListObjects(firstBucket, prefix + "1", objects) != 0) {
- CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName);
+ if (ret = conn.ListObjects(prefix + "1", objects) != 0) {
+ CleanupTestListObjects(conn, totalObjects, prefix, fileName);
return (ret);
}
if (objects.size() != expectedResult) {
- CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName);
- return (1);
+ CleanupTestListObjects(conn, totalObjects, prefix, fileName);
+ return (TEST_FAILURE);
}
/* List with 5 objects per AWS request. */
objects.clear();
batchSize = 5;
expectedResult = totalObjects;
- if (ret = conn.ListObjects(firstBucket, prefix, objects, batchSize) != 0) {
- CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName);
+ if (ret = conn.ListObjects(prefix, objects, batchSize) != 0) {
+ CleanupTestListObjects(conn, totalObjects, prefix, fileName);
return (ret);
}
if (objects.size() != expectedResult) {
- CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName);
- return (1);
+ CleanupTestListObjects(conn, totalObjects, prefix, fileName);
+ return (TEST_FAILURE);
}
/* ListSingle with 5 objects per AWS request. */
objects.clear();
expectedResult = 1;
- if (ret = conn.ListObjects(firstBucket, prefix, objects, batchSize, listSingle) != 0) {
- CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName);
+ if (ret = conn.ListObjects(prefix, objects, batchSize, listSingle) != 0) {
+ CleanupTestListObjects(conn, totalObjects, prefix, fileName);
return (ret);
}
if (objects.size() != expectedResult) {
- CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName);
- return (1);
+ CleanupTestListObjects(conn, totalObjects, prefix, fileName);
+ return (TEST_FAILURE);
}
/* List with 8 objects per AWS request. */
objects.clear();
batchSize = 8;
expectedResult = totalObjects;
- if (ret = conn.ListObjects(firstBucket, prefix, objects, batchSize) != 0) {
- CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName);
+ if (ret = conn.ListObjects(prefix, objects, batchSize) != 0) {
+ CleanupTestListObjects(conn, totalObjects, prefix, fileName);
return (ret);
}
if (objects.size() != expectedResult) {
- CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName);
- return (1);
+ CleanupTestListObjects(conn, totalObjects, prefix, fileName);
+ return (TEST_FAILURE);
}
/* ListSingle with 8 objects per AWS request. */
objects.clear();
expectedResult = 1;
- if (ret = conn.ListObjects(firstBucket, prefix, objects, batchSize, listSingle) != 0) {
- CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName);
+ if (ret = conn.ListObjects(prefix, objects, batchSize, listSingle) != 0) {
+ CleanupTestListObjects(conn, totalObjects, prefix, fileName);
return (ret);
}
if (objects.size() != expectedResult) {
- CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName);
- return (1);
+ CleanupTestListObjects(conn, totalObjects, prefix, fileName);
+ return (TEST_FAILURE);
}
- CleanupTestListObjects(config, firstBucket, totalObjects, prefix, fileName);
- return (0);
-}
-
-int
-CleanupTestListObjects(const Aws::S3Crt::ClientConfiguration &config, const std::string &bucketName,
- const int totalObjects, const std::string &prefix, const std::string &fileName)
-{
- /* Delete objects and file at end of test. */
- S3Connection conn(config);
- int ret = 0;
- for (int i = 0; i < totalObjects; i++) {
- if (ret = conn.DeleteObject(bucketName, prefix + std::to_string(i) + ".txt") != 0)
- std::cerr << "Error in CleanupTestListBuckets: failed to remove " << prefix
- << std::to_string(i) << ".txt from " << bucketName << std::endl;
- }
- std::remove(fileName.c_str());
-
- return (ret);
+ // CleanupTestListObjects(conn, totalObjects, prefix, fileName);
+ std::cout << "TestListObjects(): succeeded." << std::endl;
+ return (TEST_SUCCESS);
}
/*
@@ -222,14 +238,10 @@ CleanupTestListObjects(const Aws::S3Crt::ClientConfiguration &config, const std:
int
TestObjectExists(const Aws::S3Crt::ClientConfiguration &config)
{
- S3Connection conn(config);
- std::vector<std::string> buckets;
+ S3Connection conn(config, TestDefaults::bucketName, TestDefaults::objPrefix);
bool exists = false;
- int ret;
+ int ret = TEST_FAILURE;
- if (ret = conn.ListBuckets(buckets) != 0)
- return (ret);
- const std::string bucketName = buckets.at(0);
const std::string objectName = "test_object";
const std::string fileName = "test_object.txt";
@@ -238,19 +250,20 @@ TestObjectExists(const Aws::S3Crt::ClientConfiguration &config)
File << "Test payload";
File.close();
- if (ret = conn.ObjectExists(bucketName, objectName, exists) != 0 || exists)
+ if (ret = conn.ObjectExists(objectName, exists) != 0 || exists)
return (ret);
- if (ret = conn.PutObject(bucketName, objectName, fileName) != 0)
+ if (ret = conn.PutObject(objectName, fileName) != 0)
return (ret);
- if (ret = conn.ObjectExists(bucketName, objectName, exists) != 0 || !exists)
+ if (ret = conn.ObjectExists(objectName, exists) != 0 || !exists)
return (ret);
- if (ret = conn.DeleteObject(bucketName, objectName) != 0)
+ if (ret = conn.DeleteObject(objectName) != 0)
return (ret);
- std::cout << "TestObjectExists(): succeeded.\n" << std::endl;
- return (0);
+
+ std::cout << "TestObjectExists(): succeeded." << std::endl;
+ return (ret);
}
/*
@@ -260,6 +273,10 @@ TestObjectExists(const Aws::S3Crt::ClientConfiguration &config)
int
main()
{
+ /* Setup the test environment. */
+ if (setupTestDefaults() != 0)
+ return (TEST_FAILURE);
+
/* Set up the config to use the defaults specified. */
Aws::S3Crt::ClientConfiguration awsConfig;
awsConfig.region = TestDefaults::region;
@@ -270,14 +287,10 @@ main()
Aws::SDKOptions options;
Aws::InitAPI(options);
- int expectedOutput = 0;
- TEST(TestListBuckets, awsConfig, expectedOutput);
- TEST(TestListObjects, awsConfig, expectedOutput);
-
- int objectExistsExpectedOutput = 0;
- TEST(TestObjectExists, awsConfig, objectExistsExpectedOutput);
+ TEST(TestObjectExists, awsConfig);
+ TEST(TestListObjects, awsConfig);
/* Shutdown the API at end of tests. */
Aws::ShutdownAPI(options);
- return 0;
+ return (TEST_SUCCESS);
}