/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/db/ftdc/file_reader.h"
#include
#include
#include "mongo/base/data_range_cursor.h"
#include "mongo/base/data_type_validated.h"
#include "mongo/bson/bsonmisc.h"
#include "mongo/db/ftdc/config.h"
#include "mongo/db/ftdc/util.h"
#include "mongo/db/jsobj.h"
#include "mongo/rpc/object_check.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
FTDCFileReader::~FTDCFileReader() {
_stream.close();
}
StatusWith FTDCFileReader::hasNext() {
while (true) {
if (_state == State::kNeedsDoc) {
if (_stream.eof()) {
return {false};
}
auto swDoc = readDocument();
if (!swDoc.isOK()) {
return swDoc.getStatus();
}
if (swDoc.getValue().isEmpty()) {
return {false};
}
_parent = swDoc.getValue();
// metadata or metrics?
auto swType = FTDCBSONUtil::getBSONDocumentType(_parent);
if (!swType.isOK()) {
return swType.getStatus();
}
auto swId = FTDCBSONUtil::getBSONDocumentId(_parent);
if (!swId.isOK()) {
return swId.getStatus();
}
_dateId = swId.getValue();
FTDCBSONUtil::FTDCType type = swType.getValue();
if (type == FTDCBSONUtil::FTDCType::kMetadata) {
_state = State::kMetadataDoc;
auto swMetadata = FTDCBSONUtil::getBSONDocumentFromMetadataDoc(_parent);
if (!swMetadata.isOK()) {
return swMetadata.getStatus();
}
_metadata = swMetadata.getValue();
} else if (type == FTDCBSONUtil::FTDCType::kMetricChunk) {
_state = State::kMetricChunk;
auto swDocs = FTDCBSONUtil::getMetricsFromMetricDoc(_parent, &_decompressor);
if (!swDocs.isOK()) {
return swDocs.getStatus();
}
_docs = swDocs.getValue();
// There is always at least the reference document
_pos = 0;
}
return {true};
}
// We previously returned a metadata document, now we need another document from disk
if (_state == State::kMetadataDoc) {
_state = State::kNeedsDoc;
continue;
}
// If we have a metric chunk, return the next document in the chunk until the chunk is
// exhausted
if (_state == State::kMetricChunk) {
if (_pos + 1 == _docs.size()) {
_state = State::kNeedsDoc;
continue;
}
_pos++;
return {true};
}
}
}
std::tuple FTDCFileReader::next() {
dassert(_state == State::kMetricChunk || _state == State::kMetadataDoc);
if (_state == State::kMetadataDoc) {
return std::tuple(
FTDCBSONUtil::FTDCType::kMetadata, _metadata, _dateId);
}
if (_state == State::kMetricChunk) {
return std::tuple(
FTDCBSONUtil::FTDCType::kMetricChunk, _docs[_pos], _dateId);
}
MONGO_UNREACHABLE;
}
StatusWith FTDCFileReader::readDocument() {
if (!_stream.is_open()) {
return {ErrorCodes::FileNotOpen, "open() needs to be called first."};
}
char buf[sizeof(std::int32_t)];
_stream.read(buf, sizeof(buf));
if (sizeof(buf) != _stream.gcount()) {
// Did we read exactly zero bytes and hit the eof?
// Then return an empty document to indicate we are done reading the file.
if (0 == _stream.gcount() && _stream.eof()) {
return BSONObj();
}
return {ErrorCodes::FileStreamFailed,
str::stream() << "Failed to read 4 bytes from file \'" << _file.generic_string()
<< "\'"};
}
std::uint32_t bsonLength = ConstDataView(buf).read>();
// Reads past the end of the file will be caught below
// The interim file sentinel is 8 bytes of zero.
if (bsonLength == 0) {
return BSONObj();
}
// Reads past the end of the file will be caught below
if (bsonLength > _fileSize || bsonLength < BSONObj::kMinBSONLength) {
return {ErrorCodes::InvalidLength,
str::stream() << "Invalid BSON length found in file \'" << _file.generic_string()
<< "\'"};
}
// Read the BSON document
_buffer.resize(bsonLength);
// Stuff the length into the front
memcpy(_buffer.data(), buf, sizeof(std::int32_t));
// Read the length - 4 bytes from the file
std::int32_t readSize = bsonLength - sizeof(std::int32_t);
_stream.read(_buffer.data() + sizeof(std::int32_t), readSize);
if (readSize != _stream.gcount()) {
return {ErrorCodes::FileStreamFailed,
str::stream() << "Failed to read " << readSize << " bytes from file \'"
<< _file.generic_string()
<< "\'"};
}
ConstDataRange cdr(_buffer.data(), _buffer.data() + bsonLength);
// TODO: Validated only validates objects based on a flag which is the default at the moment
auto swl = cdr.read>();
if (!swl.isOK()) {
return swl.getStatus();
}
return {swl.getValue().val};
}
Status FTDCFileReader::open(const boost::filesystem::path& file) {
_stream.open(file.c_str(), std::ios_base::in | std::ios_base::binary);
if (!_stream.is_open()) {
return Status(ErrorCodes::FileStreamFailed, "Failed to open file " + file.generic_string());
}
_fileSize = boost::filesystem::file_size(file);
_file = file;
return Status::OK();
}
} // namespace mongo