Diffstat (limited to 'src/mongo/tools/import.cpp')
-rw-r--r--  src/mongo/tools/import.cpp  589
1 files changed, 0 insertions, 589 deletions
diff --git a/src/mongo/tools/import.cpp b/src/mongo/tools/import.cpp
deleted file mode 100644
index 9d2f45ae472..00000000000
--- a/src/mongo/tools/import.cpp
+++ /dev/null
@@ -1,589 +0,0 @@
-/**
-* Copyright (C) 2008 10gen Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects
-* for all of the code used other than as permitted herein. If you modify
-* file(s) with this exception, you may extend this exception to your
-* version of the file(s), but you are not obligated to do so. If you do not
-* wish to do so, delete this exception statement from your version. If you
-* delete this exception statement from all source files in the program,
-* then also delete it in the license file.
-*/
-
-#include "mongo/pch.h"
-
-#include <boost/algorithm/string.hpp>
-#include <boost/filesystem/operations.hpp>
-#include <fstream>
-#include <iostream>
-
-#include "mongo/base/initializer.h"
-#include "mongo/db/json.h"
-#include "mongo/tools/mongoimport_options.h"
-#include "mongo/tools/tool.h"
-#include "mongo/util/log.h"
-#include "mongo/util/options_parser/option_section.h"
-#include "mongo/util/text.h"
-
-using namespace mongo;
-using std::string;
-using std::stringstream;
-
-class Import : public Tool {
-
- enum Type { JSON , CSV , TSV };
- Type _type;
-
- const char * _sep;
- static const int BUF_SIZE;
-
- void csvTokenizeRow(const string& row, vector<string>& tokens) {
- bool inQuotes = false;
- bool prevWasQuote = false;
- bool tokenQuoted = false;
- string curtoken = "";
- for (string::const_iterator it = row.begin(); it != row.end(); ++it) {
- char element = *it;
- if (element == '"') {
- if (!inQuotes) {
- inQuotes = true;
- tokenQuoted = true;
- curtoken = "";
- } else {
- if (prevWasQuote) {
- curtoken += "\"";
- prevWasQuote = false;
- } else {
- prevWasQuote = true;
- }
- }
- } else {
- if (inQuotes && prevWasQuote) {
- inQuotes = false;
- prevWasQuote = false;
- tokens.push_back(curtoken);
- }
-
- if (element == ',' && !inQuotes) {
- if (!tokenQuoted) { // If token was quoted, it's already been added
- boost::trim(curtoken);
- tokens.push_back(curtoken);
- }
- curtoken = "";
- tokenQuoted = false;
- } else {
- curtoken += element;
- }
- }
- }
- if (!tokenQuoted || (inQuotes && prevWasQuote)) {
- boost::trim(curtoken);
- tokens.push_back(curtoken);
- }
- }
-
- void _append( BSONObjBuilder& b , const string& fieldName , const string& data ) {
- if (mongoImportGlobalParams.ignoreBlanks && data.size() == 0)
- return;
-
- if ( b.appendAsNumber( fieldName , data ) )
- return;
-
- // TODO: other types?
- b.append ( fieldName , data );
- }
-
- /*
- * Reads one line from in into buf.
- * Returns the number of bytes that should be skipped - the caller should
- * increment buf by this amount.
- */
- int getLine(istream* in, char* buf) {
- in->getline( buf , BUF_SIZE );
- if ((in->rdstate() & ios_base::eofbit) && (in->rdstate() & ios_base::failbit)) {
- // this is the last line, and it's empty (not even a newline)
- buf[0] = '\0';
- return 0;
- }
-
- uassert(16329, str::stream() << "read error, or input line too long (max length: "
- << BUF_SIZE << ")", !(in->rdstate() & ios_base::failbit));
- if (logger::globalLogDomain()->shouldLog(logger::LogSeverity::Debug(1))) {
- toolInfoLog() << "got line:" << buf << std::endl;
- }
-
- uassert( 10263 , "unknown error reading file" ,
- (!(in->rdstate() & ios_base::badbit)) &&
- (!(in->rdstate() & ios_base::failbit) || (in->rdstate() & ios_base::eofbit)) );
-
- int numBytesSkipped = 0;
- if (strncmp("\xEF\xBB\xBF", buf, 3) == 0) { // UTF-8 BOM (notepad is stupid)
- buf += 3;
- numBytesSkipped += 3;
- }
-
- uassert(13289, "Invalid UTF8 character detected", isValidUTF8(buf));
- return numBytesSkipped;
- }
-
- /*
- * description:
- * given a pointer into a JSON array, returns the next valid JSON object
- * args:
- * buf - pointer into the JSON array
- * o - BSONObj to fill
- * numBytesRead - return parameter for how far we read
- * return:
- * true if an object was successfully parsed
- * false if there is no object left to parse
- * throws:
- * exception on parsing error
- */
- bool parseJSONArray(char* buf, BSONObj* o, int* numBytesRead) {
-
- // Skip extra characters since fromjson must be passed a character buffer that starts with a
- // valid JSON object, and does not accept JSON arrays.
- // (NOTE: this doesn't catch all invalid JSON arrays, but does fail on invalid characters)
- *numBytesRead = 0;
- while (buf[0] == '[' ||
- buf[0] == ']' ||
- buf[0] == ',' ||
- isspace(buf[0])) {
- (*numBytesRead)++;
- buf++;
- }
-
- if (buf[0] == '\0')
- return false;
-
- try {
- int len = 0;
- *o = fromjson(buf, &len);
- (*numBytesRead) += len;
- } catch ( MsgAssertionException& e ) {
- uasserted(13293, string("Invalid JSON passed to mongoimport: ") + e.what());
- }
-
- return true;
- }
-
- /*
- * Parses one object from the input file. This usually corresponds to one line in the input
- * file, unless the file is a CSV and contains a newline within a quoted string entry.
- * Returns true if a BSONObj was successfully created and false if not.
- */
- bool parseRow(istream* in, BSONObj& o, int& numBytesRead) {
- boost::scoped_array<char> buffer(new char[BUF_SIZE+2]);
- char* line = buffer.get();
-
- numBytesRead = getLine(in, line);
- line += numBytesRead;
-
- if (line[0] == '\0') {
- return false;
- }
- numBytesRead += strlen( line );
-
- if (_type == JSON) {
- // Strip out trailing whitespace
- char * end = ( line + strlen( line ) ) - 1;
- while ( end >= line && isspace(*end) ) {
- *end = 0;
- end--;
- }
- try {
- o = fromjson( line );
- } catch ( MsgAssertionException& e ) {
- uasserted(13504, string("BSON representation of supplied JSON is too large: ") + e.what());
- }
- return true;
- }
-
- vector<string> tokens;
- if (_type == CSV) {
- string row;
- bool inside_quotes = false;
- size_t last_quote = 0;
- while (true) {
- string lineStr(line);
- // Deal with line breaks in quoted strings
- last_quote = lineStr.find_first_of('"');
- while (last_quote != string::npos) {
- inside_quotes = !inside_quotes;
- last_quote = lineStr.find_first_of('"', last_quote+1);
- }
-
- row.append(lineStr);
-
- if (inside_quotes) {
- row.append("\n");
- int num = getLine(in, line);
- line += num;
- numBytesRead += num;
-
- uassert(15854, "CSV file ends while inside quoted field", line[0] != '\0');
- numBytesRead += strlen( line );
- } else {
- break;
- }
- }
- // now 'row' is string corresponding to one row of the CSV file
- // (which may span multiple lines) and represents one BSONObj
- csvTokenizeRow(row, tokens);
- }
- else { // _type == TSV
- while (line[0] != '\t' && isspace(line[0])) { // Strip leading whitespace, but not tabs
- line++;
- }
-
- boost::split(tokens, line, boost::is_any_of(_sep));
- }
-
- // Now that the row is tokenized, create a BSONObj out of it.
- BSONObjBuilder b;
- unsigned int pos=0;
- for (vector<string>::iterator it = tokens.begin(); it != tokens.end(); ++it) {
- string token = *it;
- if (mongoImportGlobalParams.headerLine) {
- toolGlobalParams.fields.push_back(token);
- }
- else {
- string name;
- if (pos < toolGlobalParams.fields.size()) {
- name = toolGlobalParams.fields[pos];
- }
- else {
- stringstream ss;
- ss << "field" << pos;
- name = ss.str();
- }
- pos++;
-
- _append( b , name , token );
- }
- }
- o = b.obj();
- return true;
- }
-
-public:
- Import() : Tool() {
- _type = JSON;
- }
-
- virtual void printHelp( ostream & out ) {
- printMongoImportHelp(&out);
- }
-
- unsigned long long lastErrorFailures;
-
- /** @return true if ok */
- bool checkLastError() {
- string s = conn().getLastError();
- if( !s.empty() ) {
- if( str::contains(s,"uplicate") ) {
- // we don't want to return an error from the mongoimport process for
- // dup key errors
- toolInfoLog() << s << endl;
- }
- else {
- lastErrorFailures++;
- toolInfoLog() << "error: " << s << endl;
- return false;
- }
- }
- return true;
- }
-
- void importDocument (const std::string &ns, const BSONObj& o) {
- bool doUpsert = mongoImportGlobalParams.upsert;
- BSONObjBuilder b;
- if (mongoImportGlobalParams.upsert) {
- for (vector<string>::const_iterator it = mongoImportGlobalParams.upsertFields.begin(),
- end = mongoImportGlobalParams.upsertFields.end(); it != end; ++it) {
- BSONElement e = o.getFieldDotted(it->c_str());
- if (e.eoo()) {
- doUpsert = false;
- break;
- }
- b.appendAs(e, *it);
- }
- }
-
- if (doUpsert) {
- conn().update(ns, Query(b.obj()), o, true);
- }
- else {
- conn().insert(ns.c_str(), o);
- }
- }
-
- int run() {
- long long fileSize = 0;
- int headerRows = 0;
-
- istream * in = &cin;
-
- ifstream file(mongoImportGlobalParams.filename.c_str(), ios_base::in);
-
- if (mongoImportGlobalParams.filename.size() > 0 &&
- mongoImportGlobalParams.filename != "-") {
- if ( ! boost::filesystem::exists(mongoImportGlobalParams.filename) ) {
- toolError() << "file doesn't exist: " << mongoImportGlobalParams.filename
- << std::endl;
- return -1;
- }
- in = &file;
- fileSize = boost::filesystem::file_size(mongoImportGlobalParams.filename);
- }
-
- // check if we're actually talking to a machine that can write
- if (!isMaster()) {
- return -1;
- }
-
- string ns;
-
- try {
- ns = getNS();
- }
- catch (...) {
- // The only time getNS throws is when the collection was not specified. In that case,
- // check if the user specified a file name and use that as the collection name.
- if (!mongoImportGlobalParams.filename.empty()) {
- string oldCollName =
- boost::filesystem::path(mongoImportGlobalParams.filename).leaf().string();
- oldCollName = oldCollName.substr( 0 , oldCollName.find_last_of( "." ) );
- cerr << "using filename '" << oldCollName << "' as collection." << endl;
- ns = toolGlobalParams.db + "." + oldCollName;
- }
- else {
- printHelp(cerr);
- return -1;
- }
- }
-
- if (logger::globalLogDomain()->shouldLog(logger::LogSeverity::Debug(1))) {
- toolInfoLog() << "ns: " << ns << endl;
- }
-
- if (mongoImportGlobalParams.drop) {
- toolInfoLog() << "dropping: " << ns << endl;
- conn().dropCollection(ns.c_str());
- }
-
- if (mongoImportGlobalParams.type == "json")
- _type = JSON;
- else if (mongoImportGlobalParams.type == "csv") {
- _type = CSV;
- _sep = ",";
- }
- else if (mongoImportGlobalParams.type == "tsv") {
- _type = TSV;
- _sep = "\t";
- }
- else {
- toolError() << "don't know what type [" << mongoImportGlobalParams.type << "] is"
- << std::endl;
- return -1;
- }
-
- if (_type == CSV || _type == TSV) {
- if (mongoImportGlobalParams.headerLine) {
- headerRows = 1;
- }
- else {
- if (!toolGlobalParams.fieldsSpecified) {
- throw UserException(9998, "You need to specify fields or have a headerline to "
- "import this file type");
- }
- }
- }
-
-
- time_t start = time(0);
- if (logger::globalLogDomain()->shouldLog(logger::LogSeverity::Debug(1))) {
- toolInfoLog() << "filesize: " << fileSize << endl;
- }
- ProgressMeter pm( fileSize );
- int num = 0;
- int lastNumChecked = num;
- int errors = 0;
- lastErrorFailures = 0;
- int len = 0;
-
- // We have to handle jsonArrays differently since we can't read line by line
- if (_type == JSON && mongoImportGlobalParams.jsonArray) {
-
- // We cycle through these buffers in order to continuously read from the stream
- boost::scoped_array<char> buffer1(new char[BUF_SIZE]);
- boost::scoped_array<char> buffer2(new char[BUF_SIZE]);
- char* current_buffer = buffer1.get();
- char* next_buffer = buffer2.get();
- char* temp_buffer;
-
- // buffer_base_offset is the offset into the stream where our buffer starts, while
- // input_stream_offset is the total number of bytes read from the stream
- uint64_t buffer_base_offset = 0;
- uint64_t input_stream_offset = 0;
-
- // Fill our buffer
- // NOTE: istream::get automatically appends '\0' at the end of what it reads
- in->get(current_buffer, BUF_SIZE, '\0');
- uassert(16808, str::stream() << "read error: " << strerror(errno), !in->fail());
-
- // Record how far we read into the stream.
- input_stream_offset += in->gcount();
-
- while (true) {
- try {
-
- BSONObj o;
-
- // Try to parse (parseJSONArray)
- if (!parseJSONArray(current_buffer, &o, &len)) {
- break;
- }
-
- // Import documents
- if (mongoImportGlobalParams.doimport) {
- importDocument(ns, o);
-
- if (num < 10) {
- // we absolutely want to check the first and last op of the batch; checking
- // a few more is cheap enough.
- checkLastError();
- lastNumChecked = num;
- }
- }
-
- // Copy over the part of buffer that was not parsed
- strcpy(next_buffer, current_buffer + len);
-
- // Advance our buffer base past what we've already parsed
- buffer_base_offset += len;
-
- // Fill up the end of our next buffer only if there is something in the stream
- if (!in->eof()) {
- // NOTE: istream::get automatically appends '\0' at the end of what it reads
- in->get(next_buffer + (input_stream_offset - buffer_base_offset),
- BUF_SIZE - (input_stream_offset - buffer_base_offset), '\0');
- uassert(16809, str::stream() << "read error: "
- << strerror(errno), !in->fail());
-
- // Record how far we read into the stream.
- input_stream_offset += in->gcount();
- }
-
- // Swap buffer pointers
- temp_buffer = current_buffer;
- current_buffer = next_buffer;
- next_buffer = temp_buffer;
-
- num++;
- }
- catch ( const std::exception& e ) {
- toolError() << "exception: " << e.what()
- << ", current buffer: " << current_buffer << std::endl;
- errors++;
-
- // Since we only support JSON arrays all on one line, we might as well stop now
- // because we can't read any more documents
- break;
- }
-
- if (!toolGlobalParams.quiet) {
- if (pm.hit(len + 1)) {
- log() << "\t\t\t" << num << "\t" << (num / (time(0) - start)) << "/second"
- << std::endl;
- }
- }
- }
- }
- else {
- while (in->rdstate() == 0) {
- try {
- BSONObj o;
-
- if (!parseRow(in, o, len)) {
- continue;
- }
-
- if (mongoImportGlobalParams.headerLine) {
- mongoImportGlobalParams.headerLine = false;
- }
- else if (mongoImportGlobalParams.doimport) {
- importDocument(ns, o);
-
- if (num < 10) {
- // we absolutely want to check the first and last op of the batch; checking
- // a few more is cheap enough.
- checkLastError();
- lastNumChecked = num;
- }
- }
-
- num++;
- }
- catch ( const std::exception& e ) {
- toolError() << "exception:" << e.what() << std::endl;
- errors++;
-
- if (mongoImportGlobalParams.stopOnError)
- break;
- }
-
- if (!toolGlobalParams.quiet) {
- if (pm.hit(len + 1)) {
- log() << "\t\t\t" << num << "\t" << (num / (time(0) - start)) << "/second"
- << std::endl;
- }
- }
- }
- }
-
- // This is done for two reasons: to wait until all operations have reached the server and been
- // processed, and to check whether the last operation produced an error.
- if( lastNumChecked+1 != num ) { // avoid redundant log message if already reported above
- toolInfoLog() << "check " << lastNumChecked << " " << num << endl;
- checkLastError();
- }
-
- bool hadErrors = lastErrorFailures || errors;
-
- // The message hedges on lastErrorFailures because getLastError is not called after every
- // operation, so if any lastError failures were seen there may be more than were counted.
- toolInfoLog() << (lastErrorFailures ? "tried to import " : "imported ")
- << (num - headerRows)
- << (((num - headerRows) == 1) ? " document" : " documents")
- << std::endl;
-
- if ( !hadErrors )
- return 0;
-
- toolError() << "encountered " << (lastErrorFailures?"at least ":"")
- << lastErrorFailures+errors << " error(s)"
- << (lastErrorFailures+errors == 1 ? "" : "s") << std::endl;
- return -1;
- }
-};
-
-const int Import::BUF_SIZE(1024 * 1024 * 16);
-
-REGISTER_MONGO_TOOL(Import);
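
For reference, a minimal, self-contained sketch of the quoted-CSV state machine that the deleted csvTokenizeRow() implemented: a double quote opens or closes a field, a doubled quote ("") inside a quoted field stands for a literal quote, and unquoted fields are whitespace-trimmed. The names used here (tokenizeCsvRow, trim) are illustrative only and are not part of the original tool.

#include <cctype>
#include <iostream>
#include <string>
#include <vector>

static std::string trim(const std::string& s) {
    size_t b = 0, e = s.size();
    while (b < e && std::isspace(static_cast<unsigned char>(s[b]))) ++b;
    while (e > b && std::isspace(static_cast<unsigned char>(s[e - 1]))) --e;
    return s.substr(b, e - b);
}

static std::vector<std::string> tokenizeCsvRow(const std::string& row) {
    std::vector<std::string> tokens;
    std::string cur;
    bool inQuotes = false, prevWasQuote = false, tokenQuoted = false;
    for (char c : row) {
        if (c == '"') {
            if (!inQuotes) {                // opening quote starts a fresh, quoted token
                inQuotes = true;
                tokenQuoted = true;
                cur.clear();
            } else if (prevWasQuote) {      // "" inside a quoted field is a literal quote
                cur += '"';
                prevWasQuote = false;
            } else {                        // might be the closing quote of the field
                prevWasQuote = true;
            }
        } else {
            if (inQuotes && prevWasQuote) { // the previous quote really closed the field
                inQuotes = false;
                prevWasQuote = false;
                tokens.push_back(cur);
            }
            if (c == ',' && !inQuotes) {
                if (!tokenQuoted)           // quoted tokens were already pushed above
                    tokens.push_back(trim(cur));
                cur.clear();
                tokenQuoted = false;
            } else {
                cur += c;
            }
        }
    }
    if (!tokenQuoted || (inQuotes && prevWasQuote)) // flush the final field
        tokens.push_back(trim(cur));
    return tokens;
}

int main() {
    for (const std::string& t : tokenizeCsvRow("a, \"hello, \"\"world\"\"\" ,c"))
        std::cout << "[" << t << "]\n";     // prints [a] [hello, "world"] [c]
    return 0;
}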
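
Likewise, a sketch of the double-buffered streaming used on the --jsonArray path above: parse what is already in the current buffer, copy the unparsed tail into a second buffer, top that buffer up from the stream, and swap. parseOneObject() below is a deliberately naive stand-in for fromjson() (it only balances braces and ignores strings and escapes), and the original's uassert stream-state checks are omitted, so this illustrates the buffering strategy rather than real JSON parsing. The carry-over works because the parser, like fromjson(buf, &len) in the original, reports how many bytes it consumed.

#include <cctype>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Skip array punctuation and whitespace, then grab one brace-balanced span.
static bool parseOneObject(const char* buf, std::string* out, int* bytesRead) {
    int i = 0;
    while (buf[i] == '[' || buf[i] == ']' || buf[i] == ',' ||
           std::isspace(static_cast<unsigned char>(buf[i])))
        ++i;
    if (buf[i] == '\0')
        return false;
    int depth = 0, start = i;
    do {
        if (buf[i] == '{') ++depth;
        else if (buf[i] == '}') --depth;
        ++i;
    } while (depth > 0 && buf[i] != '\0');
    *out = std::string(buf + start, buf + i);
    *bytesRead = i;                          // includes the skipped leading characters
    return true;
}

int main() {
    std::istringstream in("[{\"a\": 1}, {\"b\": 2}, {\"c\": 3}]");
    const int BUF_SIZE = 16;                 // deliberately small to force refills
    std::vector<char> b1(BUF_SIZE), b2(BUF_SIZE);
    char* current = b1.data();
    char* next = b2.data();
    uint64_t bufferBase = 0;                 // stream offset where 'current' begins
    uint64_t streamOffset = 0;               // total bytes consumed from the stream

    in.get(current, BUF_SIZE, '\0');         // istream::get NUL-terminates its output
    streamOffset += in.gcount();

    std::string obj;
    int len = 0;
    while (parseOneObject(current, &obj, &len)) {
        std::cout << "parsed: " << obj << "\n";

        std::strcpy(next, current + len);    // carry the unparsed tail forward
        bufferBase += len;
        if (!in.eof()) {                     // top the next buffer back up from the stream
            in.get(next + (streamOffset - bufferBase),
                   BUF_SIZE - (streamOffset - bufferBase), '\0');
            streamOffset += in.gcount();
        }
        std::swap(current, next);
    }
    return 0;
}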