summaryrefslogtreecommitdiff
path: root/src/mongo/db/storage/multi_bson_stream_cursor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/storage/multi_bson_stream_cursor.cpp')
-rw-r--r--src/mongo/db/storage/multi_bson_stream_cursor.cpp27
1 files changed, 23 insertions, 4 deletions
diff --git a/src/mongo/db/storage/multi_bson_stream_cursor.cpp b/src/mongo/db/storage/multi_bson_stream_cursor.cpp
index 189131b0208..749a21e2aff 100644
--- a/src/mongo/db/storage/multi_bson_stream_cursor.cpp
+++ b/src/mongo/db/storage/multi_bson_stream_cursor.cpp
@@ -28,7 +28,10 @@
*/
#include "mongo/db/storage/multi_bson_stream_cursor.h"
+
+#include "mongo/db/catalog/virtual_collection_options.h"
#include "mongo/db/storage/record_store.h"
+
namespace mongo {
/**
@@ -37,10 +40,10 @@ namespace mongo {
* and updates bookkeeping. This can never expand the buffer larger than (2 * BSONObjMaxUserSize).
*/
void MultiBsonStreamCursor::expandBuffer(int32_t bsonSize) {
- tassert(6968308,
+ uassert(6968308,
"bsonSize {} > BSONObjMaxUserSize {}"_format(bsonSize, BSONObjMaxUserSize),
(bsonSize <= BSONObjMaxUserSize));
- tassert(6968309, "bsonSize {} < 0"_format(bsonSize), (bsonSize >= 0));
+ uassert(6968309, "bsonSize {} < 0"_format(bsonSize), (bsonSize >= 0));
int newSizeTarget = 2 * bsonSize;
do {
@@ -143,6 +146,23 @@ boost::optional<Record> MultiBsonStreamCursor::nextFromCurrentStream() {
}
/**
+ * Returns an input stream for a named pipe mapped from 'url'.
+ *
+ * While creating an input stream, it strips off the file protocol part from the 'url'.
+ */
+std::unique_ptr<InputStream<NamedPipeInput>> MultiBsonStreamCursor::getInputStream(
+ const std::string& url) {
+ auto filePathPos = url.find(ExternalDataSourceMetadata::kUrlProtocolFile.toString());
+ tassert(
+ ErrorCodes::BadValue, "Invalid file url: {}"_format(url), filePathPos != std::string::npos);
+
+ auto filePathStr =
+ url.substr(filePathPos + ExternalDataSourceMetadata::kUrlProtocolFile.size());
+
+ return std::make_unique<InputStream<NamedPipeInput>>(filePathStr);
+}
+
+/**
* Returns the next record from the vector of streams or boost::none if exhausted or error.
* '_streamReader' is initialized to the first stream, if there is one, in the constructor.
*/
@@ -154,8 +174,7 @@ boost::optional<Record> MultiBsonStreamCursor::next() {
}
++_streamIdx;
if (_streamIdx < _numStreams) {
- _streamReader = std::make_unique<InputStream<NamedPipeInput>>(
- _vopts.dataSources[_streamIdx].url.c_str());
+ _streamReader = getInputStream(_vopts.dataSources[_streamIdx].url);
}
}
return boost::none;