diff options
Diffstat (limited to 'src/mongo/db/storage')
4 files changed, 79 insertions, 64 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp index e9bc642dfb8..e9000044126 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp @@ -393,28 +393,26 @@ std::unique_ptr<ColumnStore::Cursor> WiredTigerColumnStore::newCursor( class WiredTigerColumnStore::BulkBuilder final : public ColumnStore::BulkBuilder { public: BulkBuilder(WiredTigerColumnStore* idx, OperationContext* opCtx) - : _opCtx(opCtx), - _session(WiredTigerRecoveryUnit::get(_opCtx)->getSessionCache()->getSession()), - _cursor(openBulkCursor(idx)) {} - - ~BulkBuilder() { - _cursor->close(_cursor); - } + : _opCtx(opCtx), _cursor(idx->uri(), opCtx) {} void addCell(PathView path, const RecordId& rid, CellView cell) override { - uasserted(ErrorCodes::NotImplemented, "WiredTigerColumnStore bulk builder"); - } + const std::string& key = makeKey(_buffer, path, rid); + WiredTigerItem keyItem(key.c_str(), key.size()); + _cursor->set_key(_cursor.get(), keyItem.Get()); -private: - WT_CURSOR* openBulkCursor(WiredTigerColumnStore* idx) { - // TODO SERVER-65484: Much of this logic can be shared with standard WT index. - uasserted(ErrorCodes::NotImplemented, "WiredTigerColumnStore bulk builder"); + WiredTigerItem cellItem(cell.rawData(), cell.size()); + _cursor->set_value(_cursor.get(), cellItem.Get()); + + invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session); + + ResourceConsumption::MetricsCollector::get(_opCtx).incrementOneIdxEntryWritten( + std::string(_cursor->uri), keyItem.size); } +private: std::string _buffer; OperationContext* const _opCtx; - UniqueWiredTigerSession const _session; - WT_CURSOR* const _cursor; + WiredTigerBulkLoadCursor _cursor; }; std::unique_ptr<ColumnStore::BulkBuilder> WiredTigerColumnStore::makeBulkBuilder( diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp index 188983bf071..d5704989b20 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp @@ -82,4 +82,32 @@ WiredTigerCursor::~WiredTigerCursor() { void WiredTigerCursor::reset() { invariantWTOK(_cursor->reset(_cursor), _cursor->session); } + +WiredTigerBulkLoadCursor::WiredTigerBulkLoadCursor(const std::string& indexUri, + OperationContext* opCtx) + : _session(WiredTigerRecoveryUnit::get(opCtx)->getSessionCache()->getSession()) { + // Open cursors can cause bulk open_cursor to fail with EBUSY. + // TODO any other cases that could cause EBUSY? + WiredTigerSession* outerSession = WiredTigerRecoveryUnit::get(opCtx)->getSession(); + outerSession->closeAllCursors(indexUri); + + // The 'checkpoint_wait=false' option is set to prefer falling back on the "non-bulk" cursor + // over waiting a potentially long time for a checkpoint. + WT_SESSION* sessionPtr = _session->getSession(); + int err = sessionPtr->open_cursor( + sessionPtr, indexUri.c_str(), nullptr, "bulk,checkpoint_wait=false", &_cursor); + if (!err) { + return; // Success + } + + LOGV2_WARNING(51783, + "failed to create WiredTiger bulk cursor: {error} falling back to non-bulk " + "cursor for index {index}", + "Failed to create WiredTiger bulk cursor, falling back to non-bulk", + "error"_attr = wiredtiger_strerror(err), + "index"_attr = indexUri); + + invariantWTOK(sessionPtr->open_cursor(sessionPtr, indexUri.c_str(), nullptr, nullptr, &_cursor), + sessionPtr); +} } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h index b31f91465cf..716226e45a9 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h @@ -81,4 +81,31 @@ protected: WT_CURSOR* _cursor = nullptr; // Owned }; + +/** + * An owning object wrapper for a WT_SESSION and WT_CURSOR configured for bulk loading when + * possible. The cursor is created and closed independently of the cursor cache, which does not + * store bulk cursors. It uses its own session to avoid hijacking an existing transaction in the + * current session. + */ +class WiredTigerBulkLoadCursor { +public: + WiredTigerBulkLoadCursor(const std::string& indexUri, OperationContext* opCtx); + + ~WiredTigerBulkLoadCursor() { + _cursor->close(_cursor); + } + + WT_CURSOR* get() const { + return _cursor; + } + + WT_CURSOR* operator->() const { + return get(); + } + +private: + UniqueWiredTigerSession const _session; + WT_CURSOR* _cursor = nullptr; // Owned +}; } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp index 69c422c27d5..16870b4c7d7 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp @@ -692,54 +692,16 @@ RecordId WiredTigerIndex::_decodeRecordIdAtEnd(const void* buffer, size_t size) class WiredTigerIndex::BulkBuilder : public SortedDataBuilderInterface { public: BulkBuilder(WiredTigerIndex* idx, OperationContext* opCtx) - : _ordering(idx->_ordering), - _opCtx(opCtx), - _session(WiredTigerRecoveryUnit::get(_opCtx)->getSessionCache()->getSession()), - _cursor(openBulkCursor(idx)) {} - - ~BulkBuilder() { - _cursor->close(_cursor); - } + : _ordering(idx->_ordering), _opCtx(opCtx), _cursor(idx->uri(), _opCtx) {} protected: - WT_CURSOR* openBulkCursor(WiredTigerIndex* idx) { - // Open cursors can cause bulk open_cursor to fail with EBUSY. - // TODO any other cases that could cause EBUSY? - WiredTigerSession* outerSession = WiredTigerRecoveryUnit::get(_opCtx)->getSession(); - outerSession->closeAllCursors(idx->uri()); - - // Not using cursor cache since we need to set "bulk". - WT_CURSOR* cursor; - // Use a different session to ensure we don't hijack an existing transaction. - // Configure the bulk cursor open to fail quickly if it would wait on a checkpoint - // completing - since checkpoints can take a long time, and waiting can result in - // an unexpected pause in building an index. - WT_SESSION* session = _session->getSession(); - int err = session->open_cursor( - session, idx->uri().c_str(), nullptr, "bulk,checkpoint_wait=false", &cursor); - if (!err) - return cursor; - - LOGV2_WARNING(51783, - "failed to create WiredTiger bulk cursor: {error} falling back to non-bulk " - "cursor for index {index}", - "Failed to create WiredTiger bulk cursor, falling back to non-bulk", - "error"_attr = wiredtiger_strerror(err), - "index"_attr = idx->uri()); - - invariantWTOK(session->open_cursor(session, idx->uri().c_str(), nullptr, nullptr, &cursor), - session); - return cursor; - } - void setKey(WT_CURSOR* cursor, const WT_ITEM* item) { cursor->set_key(cursor, item); } const Ordering _ordering; OperationContext* const _opCtx; - UniqueWiredTigerSession const _session; - WT_CURSOR* const _cursor; + WiredTigerBulkLoadCursor _cursor; }; @@ -756,16 +718,16 @@ public: // Can't use WiredTigerCursor since we aren't using the cache. WiredTigerItem item(keyString.getBuffer(), keyString.getSize()); - setKey(_cursor, item.Get()); + setKey(_cursor.get(), item.Get()); const KeyString::TypeBits typeBits = keyString.getTypeBits(); WiredTigerItem valueItem = typeBits.isAllZeros() ? emptyItem : WiredTigerItem(typeBits.getBuffer(), typeBits.getSize()); - _cursor->set_value(_cursor, valueItem.Get()); + _cursor->set_value(_cursor.get(), valueItem.Get()); - invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor), _cursor->session); + invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session); auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx); metricsCollector.incrementOneIdxEntryWritten(std::string(_cursor->uri), item.size); @@ -824,16 +786,16 @@ public: // Can't use WiredTigerCursor since we aren't using the cache. WiredTigerItem keyItem(newKeyString.getBuffer(), newKeyString.getSize()); - setKey(_cursor, keyItem.Get()); + setKey(_cursor.get(), keyItem.Get()); const KeyString::TypeBits typeBits = newKeyString.getTypeBits(); WiredTigerItem valueItem = typeBits.isAllZeros() ? emptyItem : WiredTigerItem(typeBits.getBuffer(), typeBits.getSize()); - _cursor->set_value(_cursor, valueItem.Get()); + _cursor->set_value(_cursor.get(), valueItem.Get()); - invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor), _cursor->session); + invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session); auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx); metricsCollector.incrementOneIdxEntryWritten(std::string(_cursor->uri), keyItem.size); @@ -882,10 +844,10 @@ public: WiredTigerItem keyItem(newKeyString.getBuffer(), sizeWithoutRecordId); WiredTigerItem valueItem(value.getBuffer(), value.getSize()); - setKey(_cursor, keyItem.Get()); - _cursor->set_value(_cursor, valueItem.Get()); + setKey(_cursor.get(), keyItem.Get()); + _cursor->set_value(_cursor.get(), valueItem.Get()); - invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor), _cursor->session); + invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session); auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx); metricsCollector.incrementOneIdxEntryWritten(std::string(_cursor->uri), keyItem.size); |