summaryrefslogtreecommitdiff
path: root/src/mongo/db/storage
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/storage')
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp28
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp28
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h27
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp60
4 files changed, 79 insertions, 64 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp
index e9bc642dfb8..e9000044126 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp
@@ -393,28 +393,26 @@ std::unique_ptr<ColumnStore::Cursor> WiredTigerColumnStore::newCursor(
class WiredTigerColumnStore::BulkBuilder final : public ColumnStore::BulkBuilder {
public:
BulkBuilder(WiredTigerColumnStore* idx, OperationContext* opCtx)
- : _opCtx(opCtx),
- _session(WiredTigerRecoveryUnit::get(_opCtx)->getSessionCache()->getSession()),
- _cursor(openBulkCursor(idx)) {}
-
- ~BulkBuilder() {
- _cursor->close(_cursor);
- }
+ : _opCtx(opCtx), _cursor(idx->uri(), opCtx) {}
void addCell(PathView path, const RecordId& rid, CellView cell) override {
- uasserted(ErrorCodes::NotImplemented, "WiredTigerColumnStore bulk builder");
- }
+ const std::string& key = makeKey(_buffer, path, rid);
+ WiredTigerItem keyItem(key.c_str(), key.size());
+ _cursor->set_key(_cursor.get(), keyItem.Get());
-private:
- WT_CURSOR* openBulkCursor(WiredTigerColumnStore* idx) {
- // TODO SERVER-65484: Much of this logic can be shared with standard WT index.
- uasserted(ErrorCodes::NotImplemented, "WiredTigerColumnStore bulk builder");
+ WiredTigerItem cellItem(cell.rawData(), cell.size());
+ _cursor->set_value(_cursor.get(), cellItem.Get());
+
+ invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session);
+
+ ResourceConsumption::MetricsCollector::get(_opCtx).incrementOneIdxEntryWritten(
+ std::string(_cursor->uri), keyItem.size);
}
+private:
std::string _buffer;
OperationContext* const _opCtx;
- UniqueWiredTigerSession const _session;
- WT_CURSOR* const _cursor;
+ WiredTigerBulkLoadCursor _cursor;
};
std::unique_ptr<ColumnStore::BulkBuilder> WiredTigerColumnStore::makeBulkBuilder(
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp
index 188983bf071..d5704989b20 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp
@@ -82,4 +82,32 @@ WiredTigerCursor::~WiredTigerCursor() {
void WiredTigerCursor::reset() {
invariantWTOK(_cursor->reset(_cursor), _cursor->session);
}
+
+WiredTigerBulkLoadCursor::WiredTigerBulkLoadCursor(const std::string& indexUri,
+ OperationContext* opCtx)
+ : _session(WiredTigerRecoveryUnit::get(opCtx)->getSessionCache()->getSession()) {
+ // Open cursors can cause bulk open_cursor to fail with EBUSY.
+ // TODO any other cases that could cause EBUSY?
+ WiredTigerSession* outerSession = WiredTigerRecoveryUnit::get(opCtx)->getSession();
+ outerSession->closeAllCursors(indexUri);
+
+ // The 'checkpoint_wait=false' option is set to prefer falling back on the "non-bulk" cursor
+ // over waiting a potentially long time for a checkpoint.
+ WT_SESSION* sessionPtr = _session->getSession();
+ int err = sessionPtr->open_cursor(
+ sessionPtr, indexUri.c_str(), nullptr, "bulk,checkpoint_wait=false", &_cursor);
+ if (!err) {
+ return; // Success
+ }
+
+ LOGV2_WARNING(51783,
+ "failed to create WiredTiger bulk cursor: {error} falling back to non-bulk "
+ "cursor for index {index}",
+ "Failed to create WiredTiger bulk cursor, falling back to non-bulk",
+ "error"_attr = wiredtiger_strerror(err),
+ "index"_attr = indexUri);
+
+ invariantWTOK(sessionPtr->open_cursor(sessionPtr, indexUri.c_str(), nullptr, nullptr, &_cursor),
+ sessionPtr);
+}
} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h
index b31f91465cf..716226e45a9 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h
@@ -81,4 +81,31 @@ protected:
WT_CURSOR* _cursor = nullptr; // Owned
};
+
+/**
+ * An owning object wrapper for a WT_SESSION and WT_CURSOR configured for bulk loading when
+ * possible. The cursor is created and closed independently of the cursor cache, which does not
+ * store bulk cursors. It uses its own session to avoid hijacking an existing transaction in the
+ * current session.
+ */
+class WiredTigerBulkLoadCursor {
+public:
+ WiredTigerBulkLoadCursor(const std::string& indexUri, OperationContext* opCtx);
+
+ ~WiredTigerBulkLoadCursor() {
+ _cursor->close(_cursor);
+ }
+
+ WT_CURSOR* get() const {
+ return _cursor;
+ }
+
+ WT_CURSOR* operator->() const {
+ return get();
+ }
+
+private:
+ UniqueWiredTigerSession const _session;
+ WT_CURSOR* _cursor = nullptr; // Owned
+};
} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
index 69c422c27d5..16870b4c7d7 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
@@ -692,54 +692,16 @@ RecordId WiredTigerIndex::_decodeRecordIdAtEnd(const void* buffer, size_t size)
class WiredTigerIndex::BulkBuilder : public SortedDataBuilderInterface {
public:
BulkBuilder(WiredTigerIndex* idx, OperationContext* opCtx)
- : _ordering(idx->_ordering),
- _opCtx(opCtx),
- _session(WiredTigerRecoveryUnit::get(_opCtx)->getSessionCache()->getSession()),
- _cursor(openBulkCursor(idx)) {}
-
- ~BulkBuilder() {
- _cursor->close(_cursor);
- }
+ : _ordering(idx->_ordering), _opCtx(opCtx), _cursor(idx->uri(), _opCtx) {}
protected:
- WT_CURSOR* openBulkCursor(WiredTigerIndex* idx) {
- // Open cursors can cause bulk open_cursor to fail with EBUSY.
- // TODO any other cases that could cause EBUSY?
- WiredTigerSession* outerSession = WiredTigerRecoveryUnit::get(_opCtx)->getSession();
- outerSession->closeAllCursors(idx->uri());
-
- // Not using cursor cache since we need to set "bulk".
- WT_CURSOR* cursor;
- // Use a different session to ensure we don't hijack an existing transaction.
- // Configure the bulk cursor open to fail quickly if it would wait on a checkpoint
- // completing - since checkpoints can take a long time, and waiting can result in
- // an unexpected pause in building an index.
- WT_SESSION* session = _session->getSession();
- int err = session->open_cursor(
- session, idx->uri().c_str(), nullptr, "bulk,checkpoint_wait=false", &cursor);
- if (!err)
- return cursor;
-
- LOGV2_WARNING(51783,
- "failed to create WiredTiger bulk cursor: {error} falling back to non-bulk "
- "cursor for index {index}",
- "Failed to create WiredTiger bulk cursor, falling back to non-bulk",
- "error"_attr = wiredtiger_strerror(err),
- "index"_attr = idx->uri());
-
- invariantWTOK(session->open_cursor(session, idx->uri().c_str(), nullptr, nullptr, &cursor),
- session);
- return cursor;
- }
-
void setKey(WT_CURSOR* cursor, const WT_ITEM* item) {
cursor->set_key(cursor, item);
}
const Ordering _ordering;
OperationContext* const _opCtx;
- UniqueWiredTigerSession const _session;
- WT_CURSOR* const _cursor;
+ WiredTigerBulkLoadCursor _cursor;
};
@@ -756,16 +718,16 @@ public:
// Can't use WiredTigerCursor since we aren't using the cache.
WiredTigerItem item(keyString.getBuffer(), keyString.getSize());
- setKey(_cursor, item.Get());
+ setKey(_cursor.get(), item.Get());
const KeyString::TypeBits typeBits = keyString.getTypeBits();
WiredTigerItem valueItem = typeBits.isAllZeros()
? emptyItem
: WiredTigerItem(typeBits.getBuffer(), typeBits.getSize());
- _cursor->set_value(_cursor, valueItem.Get());
+ _cursor->set_value(_cursor.get(), valueItem.Get());
- invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor), _cursor->session);
+ invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session);
auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx);
metricsCollector.incrementOneIdxEntryWritten(std::string(_cursor->uri), item.size);
@@ -824,16 +786,16 @@ public:
// Can't use WiredTigerCursor since we aren't using the cache.
WiredTigerItem keyItem(newKeyString.getBuffer(), newKeyString.getSize());
- setKey(_cursor, keyItem.Get());
+ setKey(_cursor.get(), keyItem.Get());
const KeyString::TypeBits typeBits = newKeyString.getTypeBits();
WiredTigerItem valueItem = typeBits.isAllZeros()
? emptyItem
: WiredTigerItem(typeBits.getBuffer(), typeBits.getSize());
- _cursor->set_value(_cursor, valueItem.Get());
+ _cursor->set_value(_cursor.get(), valueItem.Get());
- invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor), _cursor->session);
+ invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session);
auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx);
metricsCollector.incrementOneIdxEntryWritten(std::string(_cursor->uri), keyItem.size);
@@ -882,10 +844,10 @@ public:
WiredTigerItem keyItem(newKeyString.getBuffer(), sizeWithoutRecordId);
WiredTigerItem valueItem(value.getBuffer(), value.getSize());
- setKey(_cursor, keyItem.Get());
- _cursor->set_value(_cursor, valueItem.Get());
+ setKey(_cursor.get(), keyItem.Get());
+ _cursor->set_value(_cursor.get(), valueItem.Get());
- invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor), _cursor->session);
+ invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session);
auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx);
metricsCollector.incrementOneIdxEntryWritten(std::string(_cursor->uri), keyItem.size);