diff options
Diffstat (limited to 'src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp')
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp | 60 |
1 files changed, 11 insertions, 49 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp index 69c422c27d5..16870b4c7d7 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp @@ -692,54 +692,16 @@ RecordId WiredTigerIndex::_decodeRecordIdAtEnd(const void* buffer, size_t size) class WiredTigerIndex::BulkBuilder : public SortedDataBuilderInterface { public: BulkBuilder(WiredTigerIndex* idx, OperationContext* opCtx) - : _ordering(idx->_ordering), - _opCtx(opCtx), - _session(WiredTigerRecoveryUnit::get(_opCtx)->getSessionCache()->getSession()), - _cursor(openBulkCursor(idx)) {} - - ~BulkBuilder() { - _cursor->close(_cursor); - } + : _ordering(idx->_ordering), _opCtx(opCtx), _cursor(idx->uri(), _opCtx) {} protected: - WT_CURSOR* openBulkCursor(WiredTigerIndex* idx) { - // Open cursors can cause bulk open_cursor to fail with EBUSY. - // TODO any other cases that could cause EBUSY? - WiredTigerSession* outerSession = WiredTigerRecoveryUnit::get(_opCtx)->getSession(); - outerSession->closeAllCursors(idx->uri()); - - // Not using cursor cache since we need to set "bulk". - WT_CURSOR* cursor; - // Use a different session to ensure we don't hijack an existing transaction. - // Configure the bulk cursor open to fail quickly if it would wait on a checkpoint - // completing - since checkpoints can take a long time, and waiting can result in - // an unexpected pause in building an index. - WT_SESSION* session = _session->getSession(); - int err = session->open_cursor( - session, idx->uri().c_str(), nullptr, "bulk,checkpoint_wait=false", &cursor); - if (!err) - return cursor; - - LOGV2_WARNING(51783, - "failed to create WiredTiger bulk cursor: {error} falling back to non-bulk " - "cursor for index {index}", - "Failed to create WiredTiger bulk cursor, falling back to non-bulk", - "error"_attr = wiredtiger_strerror(err), - "index"_attr = idx->uri()); - - invariantWTOK(session->open_cursor(session, idx->uri().c_str(), nullptr, nullptr, &cursor), - session); - return cursor; - } - void setKey(WT_CURSOR* cursor, const WT_ITEM* item) { cursor->set_key(cursor, item); } const Ordering _ordering; OperationContext* const _opCtx; - UniqueWiredTigerSession const _session; - WT_CURSOR* const _cursor; + WiredTigerBulkLoadCursor _cursor; }; @@ -756,16 +718,16 @@ public: // Can't use WiredTigerCursor since we aren't using the cache. WiredTigerItem item(keyString.getBuffer(), keyString.getSize()); - setKey(_cursor, item.Get()); + setKey(_cursor.get(), item.Get()); const KeyString::TypeBits typeBits = keyString.getTypeBits(); WiredTigerItem valueItem = typeBits.isAllZeros() ? emptyItem : WiredTigerItem(typeBits.getBuffer(), typeBits.getSize()); - _cursor->set_value(_cursor, valueItem.Get()); + _cursor->set_value(_cursor.get(), valueItem.Get()); - invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor), _cursor->session); + invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session); auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx); metricsCollector.incrementOneIdxEntryWritten(std::string(_cursor->uri), item.size); @@ -824,16 +786,16 @@ public: // Can't use WiredTigerCursor since we aren't using the cache. WiredTigerItem keyItem(newKeyString.getBuffer(), newKeyString.getSize()); - setKey(_cursor, keyItem.Get()); + setKey(_cursor.get(), keyItem.Get()); const KeyString::TypeBits typeBits = newKeyString.getTypeBits(); WiredTigerItem valueItem = typeBits.isAllZeros() ? emptyItem : WiredTigerItem(typeBits.getBuffer(), typeBits.getSize()); - _cursor->set_value(_cursor, valueItem.Get()); + _cursor->set_value(_cursor.get(), valueItem.Get()); - invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor), _cursor->session); + invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session); auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx); metricsCollector.incrementOneIdxEntryWritten(std::string(_cursor->uri), keyItem.size); @@ -882,10 +844,10 @@ public: WiredTigerItem keyItem(newKeyString.getBuffer(), sizeWithoutRecordId); WiredTigerItem valueItem(value.getBuffer(), value.getSize()); - setKey(_cursor, keyItem.Get()); - _cursor->set_value(_cursor, valueItem.Get()); + setKey(_cursor.get(), keyItem.Get()); + _cursor->set_value(_cursor.get(), valueItem.Get()); - invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor), _cursor->session); + invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session); auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx); metricsCollector.incrementOneIdxEntryWritten(std::string(_cursor->uri), keyItem.size); |