diff options
author | Luke Chen <luke.chen@mongodb.com> | 2022-02-15 08:33:23 +1100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-14 22:27:59 +0000 |
commit | 85b78395ff7a5e559162891ac1c18f33ba303474 (patch) | |
tree | ae585ccad062ce26745b91848183161b76b127d1 | |
parent | c05b9d22da8e01fe7e9fce8ae25da25e95fd448c (diff) | |
download | mongo-85b78395ff7a5e559162891ac1c18f33ba303474.tar.gz |
Import wiredtiger: fc3f4b5a754a8abf30e920afb0d2769ff88b04c0 from branch mongodb-master
ref: 460513f9f4..fc3f4b5a75
for: 5.3.0
WT-8718 Return S3 file handle for the open file call, allow reading and close.
6 files changed, 301 insertions, 28 deletions
diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp index 4cd71ce03c3..d2e37a6ecd8 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp +++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.cpp @@ -2,6 +2,7 @@ #include <aws/s3-crt/model/DeleteObjectRequest.h> #include <aws/s3-crt/model/ListObjectsV2Request.h> #include <aws/s3-crt/model/PutObjectRequest.h> +#include <aws/s3-crt/model/GetObjectRequest.h> #include <aws/s3-crt/model/HeadObjectRequest.h> #include "s3_connection.h" @@ -11,6 +12,7 @@ #include <string> #include <vector> +#define S3_ALLOCATION_TAG "" /* * S3Connection -- * Constructor for AWS S3 bucket connection. @@ -105,6 +107,32 @@ S3Connection::DeleteObject(const std::string &objectKey) const } /* + * GetObject -- + * Retrieves an object from S3. The object is downloaded to disk at the specified location. + */ +int +S3Connection::GetObject(const std::string &objectKey, const std::string &path) const +{ + Aws::S3Crt::Model::GetObjectRequest request; + request.SetBucket(_bucketName); + request.SetKey(_objectPrefix + objectKey); + /* + * The S3 Object should be downloaded to disk rather than into an in-memory buffer. Use a custom + * response stream factory to specify how the response should be downloaded. + * https://sdk.amazonaws.com/cpp/api/0.14.3/class_aws_1_1_utils_1_1_stream_1_1_response_stream.html + */ + request.SetResponseStreamFactory([=]() { + return (Aws::New<Aws::FStream>( + S3_ALLOCATION_TAG, path, std::ios_base::out | std::ios_base::binary)); + }); + + if (!_s3CrtClient.GetObject(request).IsSuccess()) + return (1); + + return (0); +} + +/* * ObjectExists -- * Checks whether an object with the given key exists in the S3 bucket. */ diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.h b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.h index d748378b8da..297f5674ba5 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.h +++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_connection.h @@ -21,6 +21,8 @@ class S3Connection { int PutObject(const std::string &objectKey, const std::string &fileName) const; int DeleteObject(const std::string &objectKey) const; int ObjectExists(const std::string &objectKey, bool &exists) const; + int GetObject(const std::string &objectKey, const std::string &path) const; + ~S3Connection() = default; private: diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp index e4d095d536a..eed71eb99a1 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp +++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/s3_storage_source.cpp @@ -42,6 +42,7 @@ #define UNUSED(x) (void)(x) #define FS2S3(fs) (((S3_FILE_SYSTEM *)(fs))->storage) +struct S3_FILE_HANDLE; struct S3_FILE_SYSTEM; /* S3 storage source structure. */ @@ -51,6 +52,8 @@ struct S3_STORAGE { std::mutex fsListMutex; /* Protect the file system list */ std::list<S3_FILE_SYSTEM *> fsList; /* List of initiated file systems */ + std::mutex fhMutex; /* Protect the file handle list*/ + std::list<S3_FILE_HANDLE *> fhList; /* List of open file handles */ uint32_t referenceCount; /* Number of references to this storge source */ int32_t verbose; @@ -59,13 +62,30 @@ struct S3_STORAGE { struct S3_FILE_SYSTEM { /* Must come first - this is the interface for the file system we are implementing. */ WT_FILE_SYSTEM fileSystem; + S3_STORAGE *storage; + /* + * The S3_FILE_SYSTEM is built on top of the WT_FILE_SYSTEM. We require an instance of the + * WT_FILE_SYSTEM in order to access the native WiredTiger filesystem functionality, such as the + * native WT file handle open. + */ + WT_FILE_SYSTEM *wtFileSystem; S3Connection *connection; S3LogSystem *log; - S3_STORAGE *storage; std::string cacheDir; /* Directory for cached objects */ std::string homeDir; /* Owned by the connection */ }; +struct S3_FILE_HANDLE { + WT_FILE_HANDLE iface; /* Must come first */ + S3_STORAGE *storage; /* Enclosing storage source */ + /* + * Similarly, The S3_FILE_HANDLE is built on top of the WT_FILE_HANDLE. We require an instance + * of the WT_FILE_HANDLE in order to access the native WiredTiger filehandle functionality, such + * as the native WT file handle read and close. + */ + WT_FILE_HANDLE *wtFileHandle; +}; + /* Configuration variables for connecting to S3CrtClient. */ const Aws::String region = Aws::Region::AP_SOUTHEAST_2; const double throughputTargetGbps = 5; @@ -84,7 +104,10 @@ static int S3CustomizeFileSystem( WT_STORAGE_SOURCE *, WT_SESSION *, const char *, const char *, const char *, WT_FILE_SYSTEM **); static int S3AddReference(WT_STORAGE_SOURCE *); static int S3FileSystemTerminate(WT_FILE_SYSTEM *, WT_SESSION *); - +static int S3Open( + WT_FILE_SYSTEM *, WT_SESSION *, const char *, WT_FS_OPEN_FILE_TYPE, uint32_t, WT_FILE_HANDLE **); +static bool LocalFileExists(const std::string &); +static int S3FileRead(WT_FILE_HANDLE *, WT_SESSION *, wt_off_t, size_t, void *); static int S3ObjectList( WT_FILE_SYSTEM *, WT_SESSION *, const char *, const char *, char ***, uint32_t *); static int S3ObjectListAdd( @@ -92,7 +115,7 @@ static int S3ObjectListAdd( static int S3ObjectListSingle( WT_FILE_SYSTEM *, WT_SESSION *, const char *, const char *, char ***, uint32_t *); static int S3ObjectListFree(WT_FILE_SYSTEM *, WT_SESSION *, char **, uint32_t); - +static int S3FileClose(WT_FILE_HANDLE *, WT_SESSION *); /* * S3Path -- * Construct a pathname from the directory and the object name. @@ -139,7 +162,17 @@ S3Exist(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *name, bool static bool S3CacheExists(WT_FILE_SYSTEM *fileSystem, const std::string &name) { - std::string path = S3Path(((S3_FILE_SYSTEM *)fileSystem)->cacheDir, name); + const std::string path = S3Path(((S3_FILE_SYSTEM *)fileSystem)->cacheDir, name); + return (LocalFileExists(path)); +} + +/* + * LocalFileExists -- + * Checks whether a file corresponding to the provided path exists locally. + */ +static bool +LocalFileExists(const std::string &path) +{ std::ifstream f(path); return (f.good()); } @@ -179,6 +212,143 @@ S3GetDirectory(const std::string &home, const std::string &name, bool create, st } /* + * S3FileClose -- + * File handle close. + */ +static int +S3FileClose(WT_FILE_HANDLE *fileHandle, WT_SESSION *session) +{ + int ret = 0; + S3_FILE_HANDLE *s3FileHandle = (S3_FILE_HANDLE *)fileHandle; + S3_STORAGE *storage = s3FileHandle->storage; + WT_FILE_HANDLE *wtFileHandle = s3FileHandle->wtFileHandle; + + /* + * We require exclusive access to the list of file handles when removing file handles. The + * lock_guard will be unlocked automatically once the scope is exited. + */ + { + std::lock_guard<std::mutex> lock(storage->fhMutex); + storage->fhList.remove(s3FileHandle); + } + if (wtFileHandle != NULL) { + ret = wtFileHandle->close(wtFileHandle, session); + if (ret != 0) { + free(s3FileHandle->iface.name); + free(s3FileHandle); + return (ret); + } + } + + free(s3FileHandle->iface.name); + free(s3FileHandle); + + return (ret); +} + +/* + * S3Open -- + * File open for the s3 storage source. + */ +static int +S3Open(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *name, + WT_FS_OPEN_FILE_TYPE fileType, uint32_t flags, WT_FILE_HANDLE **fileHandlePtr) +{ + S3_FILE_HANDLE *s3FileHandle; + S3_FILE_SYSTEM *s3Fs = (S3_FILE_SYSTEM *)fileSystem; + S3_STORAGE *s3 = s3Fs->storage; + WT_FILE_SYSTEM *wtFileSystem = s3Fs->wtFileSystem; + WT_FILE_HANDLE *wtFileHandle; + int ret; + + *fileHandlePtr = NULL; + + /* We only support opening the file in read only mode. */ + if ((flags & WT_FS_OPEN_READONLY) == 0 || (flags & WT_FS_OPEN_CREATE) != 0) { + std::cerr << "ss_open_object: readonly access required: " << name << std::endl; + return (EINVAL); + } + + /* + * Currently, only data files should be being opened; although this constraint can be relaxed in + * the future. + */ + if (fileType != WT_FS_OPEN_FILE_TYPE_DATA && fileType != WT_FS_OPEN_FILE_TYPE_REGULAR) { + std::cerr << name << ": open: only data file and regular types supported" << std::endl; + return (EINVAL); + } + + if ((s3FileHandle = (S3_FILE_HANDLE *)calloc(1, sizeof(S3_FILE_HANDLE))) == NULL) + return (ENOMEM); + + /* Make a copy from S3 if the file is not in the cache. */ + const std::string cachePath = S3Path(s3Fs->cacheDir, name); + if (!LocalFileExists(cachePath)) { + if ((ret = s3Fs->connection->GetObject(name, cachePath)) != 0) { + std::cerr << "ss_open_object: GetObject request to s3 failed" << std::endl; + return (ret); + } + } + + /* Use WiredTiger's native file handle open. */ + ret = wtFileSystem->fs_open_file( + wtFileSystem, session, cachePath.c_str(), fileType, flags, &wtFileHandle); + if (ret != 0) { + std::cerr << "ss_open_objet: open " << name << std::endl; + return (ret); + } + + s3FileHandle->wtFileHandle = wtFileHandle; + s3FileHandle->storage = s3; + + WT_FILE_HANDLE *fileHandle = (WT_FILE_HANDLE *)s3FileHandle; + fileHandle->close = S3FileClose; + fileHandle->fh_advise = NULL; + fileHandle->fh_extend = NULL; + fileHandle->fh_extend_nolock = NULL; + fileHandle->fh_lock = NULL; + fileHandle->fh_map = NULL; + fileHandle->fh_map_discard = NULL; + fileHandle->fh_map_preload = NULL; + fileHandle->fh_unmap = NULL; + fileHandle->fh_read = S3FileRead; + fileHandle->fh_size = NULL; + fileHandle->fh_sync = NULL; + fileHandle->fh_sync_nowait = NULL; + fileHandle->fh_truncate = NULL; + fileHandle->fh_write = NULL; + + fileHandle->name = strdup(name); + if (fileHandle->name == NULL) { + std::cout << "ss_open_object: unable to allocate memory for object name" << std::endl; + return (ENOMEM); + } + + /* + * We require exclusive access to the list of file handles when adding file handles to it. The + * lock_guard will be unlocked automatically when the scope is exited. + */ + { + std::lock_guard<std::mutex> lock(s3->fhMutex); + s3FileHandle->storage->fhList.push_back(s3FileHandle); + } + + *fileHandlePtr = fileHandle; + return (0); +} + +/* + * S3FileRead -- + * Read a file using WiredTiger's native file handle read. + */ +static int +S3FileRead(WT_FILE_HANDLE *file_handle, WT_SESSION *session, wt_off_t offset, size_t len, void *buf) +{ + WT_FILE_HANDLE *wtFileHandle = ((S3_FILE_HANDLE *)file_handle)->wtFileHandle; + return (wtFileHandle->fh_read(wtFileHandle, session, offset, len, buf)); +} + +/* * S3CustomizeFileSystem -- * Return a customized file system to access the s3 storage source objects. */ @@ -187,9 +357,10 @@ S3CustomizeFileSystem(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, con const char *authToken, const char *config, WT_FILE_SYSTEM **fileSystem) { S3_FILE_SYSTEM *fs; + WT_FILE_SYSTEM *wtFileSystem; S3_STORAGE *s3; int ret; - std::string cacheDir, homeDir; + std::string cacheDir; s3 = (S3_STORAGE *)storageSource; @@ -214,7 +385,7 @@ S3CustomizeFileSystem(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, con objPrefix = objPrefixConf.str; else if (ret != WT_NOTFOUND) { std::cerr << "Error: customize_file_system: config parsing for object prefix"; - return 1; + return (1); } /* @@ -233,9 +404,12 @@ S3CustomizeFileSystem(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, con } else return (ret); - /* Store a copy of the home directory in the file system. */ - homeDir = session->connection->get_home(session->connection); + /* Fetch the native WT file system. */ + if ((ret = s3->wtApi->file_system_get(s3->wtApi, session, &wtFileSystem)) != 0) + return (ret); + /* Get a copy of the home and cache directory. */ + const std::string homeDir = session->connection->get_home(session->connection); if ((ret = S3GetDirectory(homeDir, cacheStr, true, cacheDir)) != 0) return (ret); @@ -243,6 +417,7 @@ S3CustomizeFileSystem(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, con if ((fs = (S3_FILE_SYSTEM *)calloc(1, sizeof(S3_FILE_SYSTEM))) == NULL) return (errno); fs->storage = s3; + fs->wtFileSystem = wtFileSystem; fs->homeDir = homeDir; fs->cacheDir = cacheDir; @@ -258,15 +433,16 @@ S3CustomizeFileSystem(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, con fs->fileSystem.fs_directory_list_free = S3ObjectListFree; fs->fileSystem.terminate = S3FileSystemTerminate; fs->fileSystem.fs_exist = S3Exist; + fs->fileSystem.fs_open_file = S3Open; - /* Add to the list of the active file systems. */ + /* Add to the list of the active file systems. Lock will be freed when the scope is exited. */ { std::lock_guard<std::mutex> lockGuard(s3->fsListMutex); s3->fsList.push_back(fs); } *fileSystem = &fs->fileSystem; - return (0); + return (ret); } /* @@ -281,7 +457,7 @@ S3FileSystemTerminate(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session) UNUSED(session); /* unused */ - /* Remove from the active filesystems list. */ + /* Remove from the active filesystems list. The lock will be freed when the scope is exited. */ { std::lock_guard<std::mutex> lockGuard(s3->fsListMutex); s3->fsList.remove(fs); @@ -423,6 +599,15 @@ S3Terminate(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session) return (0); /* + * Is it currently unclear at the moment what the multi-threading will look like in the + * extension. The current implementation is NOT thread-safe, and needs to be addressed in the + * future, as mulitple threads could call terminate leading to a race condition. + */ + while (!s3->fhList.empty()) { + S3_FILE_HANDLE *fs = s3->fhList.front(); + S3FileClose((WT_FILE_HANDLE *)fs, session); + } + /* * Terminate any active filesystems. There are no references to the storage source, so it is * safe to walk the active filesystem list without a lock. The removal from the list happens * under a lock. Also, removal happens from the front and addition at the end, so we are safe. @@ -434,8 +619,8 @@ S3Terminate(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session) Aws::Utils::Logging::ShutdownAWSLogging(); Aws::ShutdownAPI(options); - delete (s3); + delete (s3); return (0); } @@ -467,10 +652,10 @@ S3FlushFinish(WT_STORAGE_SOURCE *storage, WT_SESSION *session, WT_FILE_SYSTEM *f /* Linking file with the local file. */ int ret = link(srcPath.c_str(), destPath.c_str()); - /* Linking file with the local file. */ + /* The file should be read-only. */ if (ret == 0) ret = chmod(destPath.c_str(), 0444); - return ret; + return (ret); } /* diff --git a/src/third_party/wiredtiger/ext/storage_sources/s3_store/test/test_s3_connection.cpp b/src/third_party/wiredtiger/ext/storage_sources/s3_store/test/test_s3_connection.cpp index 8b94f4a2b62..1b39f5c44ef 100644 --- a/src/third_party/wiredtiger/ext/storage_sources/s3_store/test/test_s3_connection.cpp +++ b/src/third_party/wiredtiger/ext/storage_sources/s3_store/test/test_s3_connection.cpp @@ -15,6 +15,7 @@ static std::string objPrefix("s3test_artefacts/unit_"); // To be concatenated wi #define TEST_FAILURE 1 int TestListObjects(const Aws::S3Crt::ClientConfiguration &config); +int TestGetObject(const Aws::S3Crt::ClientConfiguration &config); int TestObjectExists(const Aws::S3Crt::ClientConfiguration &config); /* Wrapper for unit test functions. */ @@ -63,12 +64,12 @@ setupTestDefaults() const char *envBucket = std::getenv("WT_S3_EXT_BUCKET"); if (envBucket != NULL) TestDefaults::bucketName = envBucket; - std::cout << "Bucket to be used for testing: " << TestDefaults::bucketName << std::endl; + std::cerr << "Bucket to be used for testing: " << TestDefaults::bucketName << std::endl; /* Append the prefix to be used for object names by a unique string. */ if (randomizeTestPrefix() != 0) return (TEST_FAILURE); - std::cout << "Generated prefix: " << TestDefaults::objPrefix << std::endl; + std::cerr << "Generated prefix: " << TestDefaults::objPrefix << std::endl; return (TEST_SUCCESS); } @@ -232,6 +233,54 @@ TestListObjects(const Aws::S3Crt::ClientConfiguration &config) } /* + * TestGetObject -- + * Unit test to get an object from an S3 Bucket. + */ +int +TestGetObject(const Aws::S3Crt::ClientConfiguration &config) +{ + S3Connection conn(config, TestDefaults::bucketName, TestDefaults::objPrefix); + int ret = TEST_FAILURE; + + const std::string objectName = "permanent_object"; + const std::string path = "./" + objectName; + + /* Create a file and upload to the bucket. */ + std::ofstream File(objectName); + File << "Test payload"; + File.close(); + if ((ret = conn.PutObject(objectName, objectName)) != 0) + return (ret); + + /* Delete the local copy of the file. */ + if (std::remove(path.c_str()) != 0) + return (TEST_FAILURE); + + /* Download the file from S3 */ + if ((ret = conn.GetObject(objectName, path)) != 0) { + std::cerr << "TestGetObject: call to S3Connection:GetObject has failed." << std::endl; + return (ret); + } + + /* The file should now be in the current directory. */ + std::ifstream f(path); + if (!f.good()) { + std::cerr << "TestGetObject: target " << objectName + << " has not been successfully downloaded." << std::endl; + return (TEST_FAILURE); + } + + /* Clean up test artifacts. */ + if (std::remove(path.c_str()) != 0) + return (TEST_FAILURE); + + if ((ret = conn.DeleteObject(objectName)) != 0) + return (ret); + + std::cout << "TestGetObject() succeeded." << std::endl; + return (TEST_SUCCESS); +} +/* * TestObjectExists -- * Unit test to check if an object exists in an AWS bucket. */ @@ -250,20 +299,21 @@ TestObjectExists(const Aws::S3Crt::ClientConfiguration &config) File << "Test payload"; File.close(); - if (ret = conn.ObjectExists(objectName, exists) != 0 || exists) - return (ret); + ret = conn.ObjectExists(objectName, exists); + if (ret != 0 || exists) + return (TEST_FAILURE); - if (ret = conn.PutObject(objectName, fileName) != 0) + if ((ret = conn.PutObject(objectName, fileName)) != 0) return (ret); - if (ret = conn.ObjectExists(objectName, exists) != 0 || !exists) - return (ret); + ret = conn.ObjectExists(objectName, exists); + if (ret != 0 || !exists) + return (TEST_FAILURE); - if (ret = conn.DeleteObject(objectName) != 0) + if ((ret = conn.DeleteObject(objectName)) != 0) return (ret); - - std::cout << "TestObjectExists(): succeeded." << std::endl; - return (ret); + std::cout << "TestObjectExists() succeeded." << std::endl; + return (TEST_SUCCESS); } /* @@ -289,6 +339,7 @@ main() TEST(TestObjectExists, awsConfig); TEST(TestListObjects, awsConfig); + TEST(TestGetObject, awsConfig); /* Shutdown the API at end of tests. */ Aws::ShutdownAPI(options); diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index 0b5df664156..a40c814a496 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-master", - "commit": "460513f9f4bc3948a85ef4b772dfbb9ab7949778" + "commit": "fc3f4b5a754a8abf30e920afb0d2769ff88b04c0" } diff --git a/src/third_party/wiredtiger/test/suite/test_s3_store01.py b/src/third_party/wiredtiger/test/suite/test_s3_store01.py index cce79b21df0..ed4a98b1f2e 100644 --- a/src/third_party/wiredtiger/test/suite/test_s3_store01.py +++ b/src/third_party/wiredtiger/test/suite/test_s3_store01.py @@ -26,7 +26,8 @@ # ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR # OTHER DEALINGS IN THE SOFTWARE. -import datetime, os, wttest, random +import datetime, random, os, wiredtiger, wttest +FileSystem = wiredtiger.FileSystem # easy access to constants # test_s3_store01.py # Test minimal S3 extension with basic interactions with AWS S3CrtClient. @@ -67,7 +68,7 @@ class test_s3_store01(wttest.WiredTigerTestCase): # Test flush functionality and flushing to cache and checking if file exists. f = open(filename, 'wb') - outbytes = ('Ruby\n'*100).encode() + outbytes = ('MORE THAN ENOUGH DATA\n'*100000).encode() f.write(outbytes) f.close() @@ -75,6 +76,12 @@ class test_s3_store01(wttest.WiredTigerTestCase): s3_store.ss_flush_finish(session, fs, filename, object_name) self.assertTrue(fs.fs_exist(session, filename)) + fh = fs.fs_open_file(session, filename, FileSystem.open_file_type_data, FileSystem.open_readonly) + inbytes = bytes(1000000) # An empty buffer with a million zero bytes. + fh.fh_read(session, 0, inbytes) # Read into the buffer. + self.assertEquals(outbytes[0:1000000], inbytes) + fh.close(session) + # Checking that the file still exists in S3 after removing it from the cache. os.remove(cache_prefix + self.bucket_name + '/' + filename) self.assertTrue(fs.fs_exist(session, filename)) |