summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabe Villasana <villagab4@gmail.com>2019-07-09 14:48:25 -0400
committerGabe Villasana <villagab4@gmail.com>2019-07-24 16:08:28 -0400
commitf50faf0265a66ee7811d01701ce1c9fb91bdce90 (patch)
treea62fba2bc3225cfa6ad7219fee8b1ba1667b00b7
parent1a0d2933c9c26cb7770bf7e4282fa573cdc574c3 (diff)
downloadmongo-f50faf0265a66ee7811d01701ce1c9fb91bdce90.tar.gz
SERVER-41905 Add in-memory checksumming to external sorter
-rw-r--r--src/mongo/db/sorter/SConscript2
-rw-r--r--src/mongo/db/sorter/sorter.cpp65
-rw-r--r--src/mongo/db/sorter/sorter.h6
3 files changed, 69 insertions, 4 deletions
diff --git a/src/mongo/db/sorter/SConscript b/src/mongo/db/sorter/SConscript
index 5c3508586a3..083d4e0fdad 100644
--- a/src/mongo/db/sorter/SConscript
+++ b/src/mongo/db/sorter/SConscript
@@ -16,6 +16,6 @@ sorterEnv.CppUnitTest(
'$BUILD_DIR/mongo/db/storage/encryption_hooks',
'$BUILD_DIR/mongo/db/storage/storage_options',
'$BUILD_DIR/mongo/s/is_mongos',
- '$BUILD_DIR/third_party/shim_snappy'
+ '$BUILD_DIR/third_party/shim_snappy',
],
)
diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp
index 4efa4a79840..25d1b9e77f1 100644
--- a/src/mongo/db/sorter/sorter.cpp
+++ b/src/mongo/db/sorter/sorter.cpp
@@ -69,6 +69,20 @@
namespace mongo {
+namespace {
+
+/**
+ * Calculates and returns a new murmur hash value based on the prior murmur hash and a new piece
+ * of data.
+ */
+uint32_t addDataToChecksum(const void* startOfData, size_t sizeOfData, uint32_t checksum) {
+ unsigned newChecksum;
+ MurmurHash3_x86_32(startOfData, sizeOfData, checksum, &newChecksum);
+ return newChecksum;
+}
+
+} // namespace
+
namespace sorter {
using std::shared_ptr;
@@ -158,12 +172,14 @@ public:
FileIterator(const std::string& fileName,
std::streampos fileStartOffset,
std::streampos fileEndOffset,
- const Settings& settings)
+ const Settings& settings,
+ const uint32_t checksum)
: _settings(settings),
_done(false),
_fileName(fileName),
_fileStartOffset(fileStartOffset),
- _fileEndOffset(fileEndOffset) {
+ _fileEndOffset(fileEndOffset),
+ _originalChecksum(checksum) {
uassert(16815,
str::stream() << "unexpected empty file: " << _fileName,
boost::filesystem::file_size(_fileName) != 0);
@@ -191,6 +207,18 @@ public:
str::stream() << "error closing file \"" << _fileName << "\": "
<< myErrnoWithDescription(),
!_file.fail());
+
+ // If the file iterator reads through all data objects, we can ensure non-corrupt data
+ // by comparing the newly calculated checksum with the original checksum from the data
+ // written to disk. Some iterators do not read back all data from the file, which prohibits
+ // the _afterReadChecksum from obtaining all the information needed. Thus, we only fassert
+ // if all data that was written to disk is read back and the checksums are not equivalent.
+ if (_done && _bufferReader->atEof() && (_originalChecksum != _afterReadChecksum)) {
+ fassert(31182,
+ Status(ErrorCodes::Error::ChecksumMismatch,
+ "Data read from disk does not match what was written to disk. Possible "
+ "corruption of data."));
+ }
}
bool more() {
@@ -203,12 +231,22 @@ public:
verify(!_done);
fillBufferIfNeeded();
+ const char* startOfNewData = static_cast<const char*>(_bufferReader->pos());
+
// Note: calling read() on the _bufferReader buffer in the deserialize function advances the
// buffer. Since Key comes before Value in the _bufferReader, and C++ makes no function
// parameter evaluation order guarantees, we cannot deserialize Key and Value straight into
// the Data constructor
auto first = Key::deserializeForSorter(*_bufferReader, _settings.first);
auto second = Value::deserializeForSorter(*_bufferReader, _settings.second);
+
+ // The difference of _bufferReader's position before and after reading the data
+ // will provide the length of the data that was just read.
+ const char* endOfNewData = static_cast<const char*>(_bufferReader->pos());
+
+ _afterReadChecksum =
+ addDataToChecksum(startOfNewData, endOfNewData - startOfNewData, _afterReadChecksum);
+
return Data(std::move(first), std::move(second));
}
@@ -310,12 +348,23 @@ private:
const Settings _settings;
bool _done;
+
std::unique_ptr<char[]> _buffer;
std::unique_ptr<BufReader> _bufferReader;
std::string _fileName; // File containing the sorted data range.
std::streampos _fileStartOffset; // File offset at which the sorted data range starts.
std::streampos _fileEndOffset; // File offset at which the sorted data range ends.
std::ifstream _file;
+
+ // Checksum value that is updated with each read of a data object from disk. We can compare
+ // this value with _originalChecksum to check for data corruption if and only if the
+ // FileIterator is exhausted.
+ uint32_t _afterReadChecksum = 0;
+
+ // Checksum value retrieved from SortedFileWriter that was calculated as data was spilled
+ // to disk. This is not modified, and is only used for comparison against _afterReadChecksum
+ // when the FileIterator is exhausted to ensure no data corruption.
+ const uint32_t _originalChecksum;
};
/**
@@ -935,9 +984,19 @@ SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts,
template <typename Key, typename Value>
void SortedFileWriter<Key, Value>::addAlreadySorted(const Key& key, const Value& val) {
+
+ // Offset that points to the place in the buffer where a new data object will be stored.
+ int _nextObjPos = _buffer.len();
+
+ // Add serialized key and value to the buffer.
key.serializeForSorter(_buffer);
val.serializeForSorter(_buffer);
+ // Serializing the key and value grows the buffer, but _buffer.buf() still points to the
+ // beginning. Use _buffer.len() to determine portion of buffer containing new datum.
+ _checksum =
+ addDataToChecksum(_buffer.buf() + _nextObjPos, _buffer.len() - _nextObjPos, _checksum);
+
if (_buffer.len() > 64 * 1024)
spill();
}
@@ -1008,7 +1067,7 @@ SortIteratorInterface<Key, Value>* SortedFileWriter<Key, Value>::done() {
_file.close();
return new sorter::FileIterator<Key, Value>(
- _fileName, _fileStartOffset, _fileEndOffset, _settings);
+ _fileName, _fileStartOffset, _fileEndOffset, _settings, _checksum);
}
//
diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h
index f504d466ac3..87d80a45f92 100644
--- a/src/mongo/db/sorter/sorter.h
+++ b/src/mongo/db/sorter/sorter.h
@@ -29,6 +29,8 @@
#pragma once
+#include <third_party/murmurhash3/MurmurHash3.h>
+
#include <deque>
#include <fstream>
#include <memory>
@@ -260,6 +262,10 @@ private:
std::ofstream _file;
BufBuilder _buffer;
+ // Keeps track of the hash of all data objects spilled to disk. Passed to the FileIterator
+ // to ensure data has not been corrupted after reading from disk.
+ uint32_t _checksum = 0;
+
// Tracks where in the file we started and finished writing the sorted data range so that the
// information can be given to the Iterator in done(), and to the user via getFileEndOffset()
// for the next SortedFileWriter instance using the same file.