diff options
author | Spencer T Brody <spencer@10gen.com> | 2011-06-30 12:07:43 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@10gen.com> | 2011-07-26 10:33:00 -0400 |
commit | eaf7961e83afb62e65e9f6993127700f677c7dff (patch) | |
tree | 8087b10a7dd780aac2673eb21702c055ca449fa7 /tools | |
parent | 7ab832da996558f05cfbeea69c40d2852cf6d68a (diff) | |
download | mongo-eaf7961e83afb62e65e9f6993127700f677c7dff.tar.gz |
Refactor import code to make it easier to plug in new CSV parser.
Diffstat (limited to 'tools')
-rw-r--r-- | tools/import.cpp | 232 |
1 files changed, 141 insertions, 91 deletions
diff --git a/tools/import.cpp b/tools/import.cpp index c7a18b940ec..f631ab46dc4 100644 --- a/tools/import.cpp +++ b/tools/import.cpp @@ -44,6 +44,13 @@ class Import : public Tool { bool _doimport; bool _jsonArray; vector<string> _upsertFields; + static const int BUF_SIZE = 1024 * 1024 * 4; + + string stripLeadingWhitespace(string str) { + int i = 0; + while (isspace(str[i])) { ++i; }; // Finds index of first non-whitespace character + return str.substr(i, str.size() - i); + } void _append( BSONObjBuilder& b , const string& fieldName , const string& data ) { if ( b.appendAsNumber( fieldName , data ) ) @@ -56,88 +63,162 @@ class Import : public Tool { b.append( fieldName , data ); } - BSONObj parseLine( char * line ) { - uassert(13289, "Invalid UTF8 character detected", isValidUTF8(line)); + /* + * Reads one line from in into buf. + * Returns the number of bytes that should be skipped - the caller should + * increment buf by this amount. + */ + int getLine(istream* in, char* buf) { + if (_jsonArray) { + in->read(buf, BUF_SIZE); + uassert(13295, "JSONArray file too large", (in->rdstate() & ios_base::eofbit)); + buf[ in->gcount() ] = '\0'; + } + else { + in->getline( buf , BUF_SIZE ); + log(1) << "got line:" << buf << endl; + } + uassert( 10263 , "unknown error reading file" , + (!(in->rdstate() & ios_base::badbit)) && + (!(in->rdstate() & ios_base::failbit) || (in->rdstate() & ios_base::eofbit)) ); + + int numBytesSkipped = 0; + if (strncmp("\xEF\xBB\xBF", buf, 3) == 0) { // UTF-8 BOM (notepad is stupid) + buf += 3; + numBytesSkipped += 3; + } + + uassert(13289, "Invalid UTF8 character detected", isValidUTF8(buf)); + return numBytesSkipped; + } + + /* + * Parses a BSON object out of a JSON array. + * Returns number of bytes processed on success and -1 on failure. + */ + int parseJSONArray(char* buf, BSONObj& o) { + int len = 0; + while (buf[0] != '{' && buf[0] != '\0') { + len++; + buf++; + } + if (buf[0] == '\0') + return -1; + + int jslen; + o = fromjson(buf, &jslen); + len += jslen; + + return len; + } + + /* + * Parses one object from the input file. This usually corresponds to one line in the input + * file, unless the file is a CSV and contains a newline within a quoted string entry. + * Returns a true if a BSONObj was successfully created and false if not. + */ + bool parseRow(istream* in, BSONObj& o, int& numBytesRead) { + boost::scoped_array<char> line(new char[BUF_SIZE+2]); + char* buf = line.get(); + + numBytesRead = getLine(in, buf); + buf += numBytesRead; + + while ((_type != TSV || buf[0] != '\t') && isspace( buf[0] )) { + numBytesRead++; + buf++; + } + if (buf[0] == '\0') { + return false; + } + numBytesRead += strlen( buf ); - if ( _type == JSON ) { - char * end = ( line + strlen( line ) ) - 1; + if (_type == JSON) { + // Strip out trailing whitespace + char * end = ( buf + strlen( buf ) ) - 1; while ( isspace(*end) ) { *end = 0; end--; } - return fromjson( line ); + o = fromjson( buf ); + return true; } BSONObjBuilder b; + vector<string> tokens; unsigned int pos=0; - while ( line[0] ) { - string name; - if ( pos < _fields.size() ) { - name = _fields[pos]; - } - else { - stringstream ss; - ss << "field" << pos; - name = ss.str(); - } - pos++; - + while ( buf[0] ) { bool done = false; string data; char * end; - if ( _type == CSV && line[0] == '"' ) { - line++; //skip first '"' + if ( _type == CSV && buf[0] == '"' ) { + buf++; //skip first '"' while (true) { - end = strchr( line , '"' ); + end = strchr( buf , '"' ); if (!end) { - data += line; + data += buf; done = true; break; } else if (end[1] == '"') { // two '"'s get appended as one - data.append(line, end-line+1); //include '"' - line = end+2; //skip both '"'s + data.append(buf, end-buf+1); //include '"' + buf = end+2; //skip both '"'s } else if (end[-1] == '\\') { // "\\\"" gets appended as '"' - data.append(line, end-line-1); //exclude '\\' + data.append(buf, end-buf-1); //exclude '\\' data.append("\""); - line = end+1; //skip the '"' + buf = end+1; //skip the '"' } else { - data.append(line, end-line); - line = end+2; //skip '"' and ',' + data.append(buf, end-buf); + buf = end+2; //skip '"' and ',' break; } } } else { - end = strstr( line , _sep ); + end = strstr( buf , _sep ); if ( ! end ) { done = true; - data = string( line ); + data = string( buf ); } else { - data = string( line , end - line ); - line = end+1; + data = string( buf , end - buf ); + buf = end+1; } } + tokens.push_back(data); + if ( done ) + break; + } + + for (vector<string>::iterator token = tokens.begin(); token != tokens.end(); ++token) { if ( _headerLine ) { - while ( isspace( data[0] ) ) - data = data.substr( 1 ); - _fields.push_back( data ); + _fields.push_back(stripLeadingWhitespace(*token)); } - else - _append( b , name , data ); + else { + string name; + if ( pos < _fields.size() ) { + name = _fields[pos]; + } + else { + stringstream ss; + ss << "field" << pos; + name = ss.str(); + } + pos++; - if ( done ) - break; + _append( b , name , *token ); + } } - return b.obj(); + + o = b.obj(); + return true; } public: @@ -255,68 +336,37 @@ public: _jsonArray = true; } - int errors = 0; - - int num = 0; - time_t start = time(0); - log(1) << "filesize: " << fileSize << endl; ProgressMeter pm( fileSize ); - const int BUF_SIZE = 1024 * 1024 * 4; + int num = 0; + int errors = 0; + int len = 0; + // buf and line are only used when parsing a jsonArray boost::scoped_array<char> line(new char[BUF_SIZE+2]); - char * buf = line.get(); - while ( _jsonArray || in->rdstate() == 0 ) { - if (_jsonArray) { - if (buf == line.get()) { //first pass - in->read(buf, BUF_SIZE); - uassert(13295, "JSONArray file too large", (in->rdstate() & ios_base::eofbit)); - buf[ in->gcount() ] = '\0'; - } - } - else { - buf = line.get(); - in->getline( buf , BUF_SIZE ); - log(1) << "got line:" << buf << endl; - } - uassert( 10263 , "unknown error reading file" , - (!(in->rdstate() & ios_base::badbit)) && - (!(in->rdstate() & ios_base::failbit) || (in->rdstate() & ios_base::eofbit)) ); - - int len = 0; - if (strncmp("\xEF\xBB\xBF", buf, 3) == 0) { // UTF-8 BOM (notepad is stupid) - buf += 3; - len += 3; - } - - if (_jsonArray) { - while (buf[0] != '{' && buf[0] != '\0') { - len++; - buf++; - } - if (buf[0] == '\0') - break; - } - else { - while ((_type != TSV || buf[0] != '\t') && isspace( buf[0] )) { - len++; - buf++; - } - if (buf[0] == '\0') - continue; - len += strlen( buf ); - } + char* buf = line.get(); + while ( _jsonArray || in->rdstate() == 0 ) { try { BSONObj o; if (_jsonArray) { - int jslen; - o = fromjson(buf, &jslen); - len += jslen; - buf += jslen; + int bytesProcessed = 0; + if (buf == line.get()) { // Only read on first pass - the whole array must be on one line. + bytesProcessed = getLine(in, buf); + buf += bytesProcessed; + len += bytesProcessed; + } + if ((bytesProcessed = parseJSONArray(buf, o)) < 0) { + len += bytesProcessed; + break; + } + len += bytesProcessed; + buf += len; } else { - o = parseLine( buf ); + if (!parseRow(in, o, len)) { + continue; + } } if ( _headerLine ) { |