diff options
-rw-r--r-- | db/db.cpp | 6 | ||||
-rw-r--r-- | db/dbclient.cpp | 12 | ||||
-rw-r--r-- | db/dbclient.h | 2 | ||||
-rw-r--r-- | db/jsobj.cpp | 2 | ||||
-rw-r--r-- | db/jsobj.h | 5 | ||||
-rw-r--r-- | db/json.cpp | 7 | ||||
-rw-r--r-- | db/pdfile.cpp | 9 | ||||
-rw-r--r-- | db/query.cpp | 7 | ||||
-rw-r--r-- | db/repl.cpp | 136 | ||||
-rw-r--r-- | db/repl.h | 50 | ||||
-rw-r--r-- | stdafx.cpp | 5 | ||||
-rw-r--r-- | stdafx.h | 4 | ||||
-rw-r--r-- | util/log.h | 12 | ||||
-rw-r--r-- | util/sock.h | 1 |
14 files changed, 236 insertions, 22 deletions
diff --git a/db/db.cpp b/db/db.cpp index b588058f90f..44b9b3e364c 100644 --- a/db/db.cpp +++ b/db/db.cpp @@ -39,6 +39,7 @@ bool useCursors = true; boost::mutex dbMutex; void closeAllSockets(); +void startReplication(); struct MyStartupTests { MyStartupTests() { @@ -367,12 +368,13 @@ public: 115 replay, opLogging */ void listen(int port) { - const char *Version = "db version: 120 09jul2008 logging fix"; + const char *Version = "db version: 121"; problem() << Version << endl; pdfileInit(); - testTheDb(); + //testTheDb(); cout << curTimeMillis() % 10000 << " waiting for connections on port " << port << " ...\n" << endl; OurListener l(port); + startReplication(); l.listen(); } diff --git a/db/dbclient.cpp b/db/dbclient.cpp index 03b857ea3ad..55da39a98a5 100644 --- a/db/dbclient.cpp +++ b/db/dbclient.cpp @@ -21,6 +21,16 @@ #include "../util/builder.h" #include "jsobj.h" +JSObj DBClientConnection::findOne(const char *ns, JSObj query, JSObj *fieldsToReturn) { + auto_ptr<DBClientCursor> c = + this->query(ns, query, 1, 0, fieldsToReturn); + + if( !c->more() ) + return JSObj(); + + return c->next().copy(); +} + bool DBClientConnection::connect(const char *serverAddress, string& errmsg) { /* not reentrant! ok as used right now (we are in a big lock), but won't be later, so fix. */ @@ -95,7 +105,7 @@ bool DBClientCursor::more() { if( cursorId == 0 )
return false;
- cout << "TEMP: requestMore" << endl;
+// cout << "TEMP: requestMore" << endl;
requestMore();
return pos < nReturned;
}
diff --git a/db/dbclient.h b/db/dbclient.h index 812cc480f44..82354c2ae6a 100644 --- a/db/dbclient.h +++ b/db/dbclient.h @@ -74,4 +74,6 @@ public: returns 0 if error */ auto_ptr<DBClientCursor> query(const char *ns, JSObj query, int nToReturn = 0, int nToSkip = 0, JSObj *fieldsToReturn = 0); + + JSObj findOne(const char *ns, JSObj query, JSObj *fieldsToReturn = 0); }; diff --git a/db/jsobj.cpp b/db/jsobj.cpp index 423adb07131..49e435f45a0 100644 --- a/db/jsobj.cpp +++ b/db/jsobj.cpp @@ -737,7 +737,7 @@ JSObj JSObj::extractFields(JSObj pattern, JSObjBuilder& b) { const char * JSObj::getStringField(const char *name) { Element e = getField(name); - return e.type() == String ? e.valuestr() : 0; + return e.type() == String ? e.valuestr() : ""; } JSObj JSObj::getObjectField(const char *name) { diff --git a/db/jsobj.h b/db/jsobj.h index d36cb8d4e98..018be9359af 100644 --- a/db/jsobj.h +++ b/db/jsobj.h @@ -247,7 +247,9 @@ public: Element getField(const char *name); /* return has eoo() true if no match */ + // returns "" if DNE or wrong type const char * getStringField(const char *name); + JSObj getObjectField(const char *name); /* makes a new JSObj with the fields specified in pattern. @@ -383,6 +385,9 @@ public: b.append((int) strlen(str)+1); b.append(str); } + void append(const char *fieldName, string str) { + append(fieldName, str.c_str()); + } /* JSObj will free the buffer when it is finished. 
*/ JSObj doneAndDecouple() { diff --git a/db/json.cpp b/db/json.cpp index cdf72b508c4..f92df2d248b 100644 --- a/db/json.cpp +++ b/db/json.cpp @@ -32,6 +32,13 @@ void value(JSObjBuilder& b, const char *&p, string& id) { p += 7; b.appendOID(id.c_str()); } + else if( *p == '1' ) { + b.append(id.c_str(), 1); + p++; + } + else { + assert(false); + } } void _fromjson(JSObjBuilder& b, const char *&p) { diff --git a/db/pdfile.cpp b/db/pdfile.cpp index 7167435ce1c..1841846a5f9 100644 --- a/db/pdfile.cpp +++ b/db/pdfile.cpp @@ -366,8 +366,9 @@ auto_ptr<Cursor> makeNamespaceCursor() { }*/ /* add a new namespace to the system catalog (<dbname>.system.namespaces). + options: { capped : ..., size : ... } */ -void addNewNamespaceToCatalog(const char *ns) { +void addNewNamespaceToCatalog(const char *ns, JSObj *options = 0) { cout << "New namespace: " << ns << endl; if( strstr(ns, "system.namespaces") ) { // system.namespaces holds all the others, so it is not explicitly listed in the catalog. @@ -378,6 +379,8 @@ void addNewNamespaceToCatalog(const char *ns) { { JSObjBuilder b; b.append("name", ns); + if( options ) + b.append("options", *options); JSObj j = b.done(); char client[256]; nsToClient(ns, client); @@ -411,7 +414,7 @@ bool userCreateNS(const char *ns, JSObj& j, string& err) { /* todo: do this only when we have allocated space successfully? or we could insert with a { ok: 0 } field and then go back and set to ok : 1 after we are done. 
*/ - addNewNamespaceToCatalog(ns); + addNewNamespaceToCatalog(ns, &j); int ies = initialExtentSize(128); Element e = j.findElement("size"); @@ -1020,7 +1023,7 @@ DiskLoc DataFileMgr::insert(const char *ns, const void *buf, int len, bool god) const char *name = io.getStringField("name"); // name of the index tabletoidxns = io.getStringField("ns"); // table it indexes JSObj key = io.getObjectField("key"); - if( name == 0 || *name == 0 || tabletoidxns == 0 || key.isEmpty() || key.objsize() > 2048 ) { + if( *name == 0 || *tabletoidxns == 0 || key.isEmpty() || key.objsize() > 2048 ) { cout << "user warning: bad add index attempt name:" << (name?name:"") << "\n ns:" << (tabletoidxns?tabletoidxns:"") << "\n ourns:" << ns; cout << "\n idxobj:" << io.toString() << endl; diff --git a/db/query.cpp b/db/query.cpp index 6a06e0b42c6..b29527dd145 100644 --- a/db/query.cpp +++ b/db/query.cpp @@ -595,7 +595,12 @@ inline bool _runCommands(const char *ns, JSObj& jsobj, stringstream& ss, BufBuil ok = dbEval(jsobj, anObjBuilder); } else if( e.type() == Number ) { - if( strcmp(e.fieldName(), "dropDatabase") == 0 ) { + if( strcmp(e.fieldName(), "getoptime") == 0 ) { + valid = true; + ok = true; + anObjBuilder.append("optime", OpTime::now().asDouble()); + } + else if( strcmp(e.fieldName(), "dropDatabase") == 0 ) { if( 1 ) { cout << "dropDatabase " << ns << endl; valid = true; diff --git a/db/repl.cpp b/db/repl.cpp index f025455de3b..dac95c852c8 100644 --- a/db/repl.cpp +++ b/db/repl.cpp @@ -23,8 +23,26 @@ #include "../grid/message.h" #include "dbclient.h" #include "pdfile.h" +#include "query.h" +#include "json.h" extern JSObj emptyObj; +extern boost::mutex dbMutex; +auto_ptr<Cursor> findTableScan(const char *ns, JSObj& order); +bool userCreateNS(const char *ns, JSObj& j, string& err); + +OpTime last((unsigned) time(0), 1); + +OpTime OpTime::now() { + unsigned t = (unsigned) time(0); + if( last.secs == t ) + return OpTime(last.secs, last.i+1); + return OpTime(t, 1); +} + +/* Cloner 
----------------------------------------------------------- + makes copy of existing database. +*/ class Cloner: boost::noncopyable { DBClientConnection conn; @@ -69,6 +87,11 @@ bool Cloner::go(const char *masterHost, string& errmsg) { const char *name = e.valuestr(); if( strstr(name, ".system.") || strchr(name, '$') ) continue; + JSObj options = collection.getObjectField("options"); + if( !options.isEmpty() ) { + string err; + userCreateNS(name, options, err); + } copy(name); } @@ -84,3 +107,116 @@ bool cloneFrom(const char *masterHost, string& errmsg) Cloner c;
return c.go(masterHost, errmsg);
}
+
+/* --------------------------------------------------------------*/
+
+/* Build a Source from a document shaped like
+     { host: ..., source: ..., syncedTo: ... }
+   host and source must be non-empty strings; a violation raises a user
+   assertion (uassert). syncedTo is optional, but when present it must be
+   a Number.
+*/
+Source::Source(JSObj o) {
+    hostName = o.getStringField("host");
+    sourceName = o.getStringField("source");
+    uassert( !hostName.empty() );
+    uassert( !sourceName.empty() );
+
+    Element e = o.getField("syncedTo");
+    if( e.eoo() )
+        return; // field absent: syncedTo is left as default-constructed
+
+    uassert( e.type() == Number );
+    syncedTo.asDouble() = e.number();
+}
+
+/* Serialize this source as { host: ..., source: ..., syncedTo: ... }.
+   syncedTo is written as the double view of the OpTime (asDouble()).
+   The builder's buffer is handed over to the returned object
+   (doneAndDecouple).
+*/
+JSObj Source::jsobj() {
+    JSObjBuilder builder;
+    builder.append("host", hostName);
+    builder.append("source", sourceName);
+    builder.append("syncedTo", syncedTo.asDouble());
+    return builder.doneAndDecouple();
+}
+
+/* Write this source's current state back to the local.sources collection.
+   The record is located with the pattern { host: ..., source: ... } and
+   updated with the object produced by jsobj().
+*/
+void Source::updateOnDisk() {
+    // selector that identifies this source's record
+    JSObjBuilder selector;
+    selector.append("host", hostName);
+    selector.append("source", sourceName);
+    JSObj pattern = selector.done();
+
+    JSObj updated = jsobj();
+
+    setClient("local.sources");
+    stringstream ss; // required by updateObjects; not read afterwards here
+    // NOTE(review): the 'false' argument is presumably the upsert flag -- confirm.
+    updateObjects("local.sources", updated, pattern, false, ss);
+}
+
+/* Delete every Source pointed to by v and leave v empty.
+   The vector is cleared after the deletes so it never keeps dangling
+   pointers: calling cleanup() twice, or touching v afterwards, would
+   otherwise be a double delete / use-after-free.
+*/
+void Source::cleanup(vector<Source*>& v) {
+    for( vector<Source*>::iterator i = v.begin(); i != v.end(); ++i )
+        delete *i;
+    v.clear();
+}
+
+/* Append one heap-allocated Source to v for every document found by a
+   table scan of local.sources. The caller owns the new objects
+   (Source::cleanup deletes them).
+*/
+void Source::loadAll(vector<Source*>& v) {
+    setClient("local.sources");
+    auto_ptr<Cursor> scan = findTableScan("local.sources", emptyObj);
+    for( ; scan->ok(); scan->advance() )
+        v.push_back( new Source(scan->current()) );
+}
+
+JSObj opTimeQuery = fromjson("{getoptime:1}"); // command object { getoptime: 1 }; Source::pull() sends it to admin.$cmd on the master
+
+/* Contact this source's host and read its current optime.
+   note: not yet in mutex at this point.
+
+   Steps:
+     1. connect to hostName; on failure log and return.
+     2. run the getoptime command against admin.$cmd.
+     3. if the reply has no "optime" field, log the reply and return.
+     4. store the (Number) optime in a local OpTime.
+   NOTE(review): the fetched server time is not used further yet, and the
+   handling of a never-synced source (syncedTo.isNull()) is still stubbed out.
+*/
+void Source::pull() {
+    log() << "pull source " << sourceName << '@' << hostName << endl;
+
+//    if( syncedTo.isNull() ) {
+//    }
+
+    DBClientConnection conn;
+    string errmsg;
+    bool connected = conn.connect(hostName.c_str(), errmsg);
+    if( !connected ) {
+        cout << " pull: cantconn " << errmsg << endl;
+        return;
+    }
+
+    // ask the server for its current optime
+    JSObj reply = conn.findOne("admin.$cmd", opTimeQuery);
+    Element e = reply.findElement("optime");
+    if( e.eoo() ) {
+        cout << " pull: failed to get curtime from master" << endl;
+        cout << " " << reply.toString() << endl;
+        return;
+    }
+
+    uassert( e.type() == Number );
+    OpTime serverCurTime;
+    serverCurTime.asDouble() = e.number();
+
+}
+
+/* --------------------------------------------------------------*/
+
+/* One replication pass: load every configured source from local.sources
+   (under dbMutex), then pull from each one in turn.
+
+   The loaded Source objects are released by a scope guard, not by a
+   trailing call: pull() and loadAll() can throw AssertionException (via
+   uassert), and replMainThread() retries after such a throw, so a plain
+   call at the end of the function would leak every Source on each failure.
+*/
+void replMain() {
+    vector<Source*> sources;
+
+    // Runs Source::cleanup(sources) on every exit path, including exceptions.
+    struct SourcesGuard {
+        vector<Source*>& v;
+        SourcesGuard(vector<Source*>& _v) : v(_v) { }
+        ~SourcesGuard() { Source::cleanup(v); }
+    } guard(sources);
+
+    {
+        lock lk(dbMutex);
+        Source::loadAll(sources);
+    }
+
+    // pulls run outside the mutex
+    for( vector<Source*>::iterator i = sources.begin(); i != sources.end(); ++i ) {
+        (*i)->pull();
+    }
+}
+
+/* Replication loop; never returns.
+   Runs replMain() and then sleeps 5 seconds. If the pass throws an
+   AssertionException, the problem is logged and the loop backs off for
+   5 minutes (300 seconds) before trying again.
+   The exception is caught by const reference so that no copy is made and
+   a derived exception type is not sliced.
+*/
+void replMainThread() {
+    while( 1 ) {
+        try {
+            replMain();
+            sleepsecs(5);
+        }
+        catch( const AssertionException& ) {
+            problem() << "Assertion in replMainThread(): sleeping 5 minutes before retry" << endl;
+            sleepsecs(300);
+        }
+    }
+}
+
+void startReplication() { // currently a no-op: the launch of the replMainThread thread below is commented out
+// boost::thread repl_thread(replMainThread);
+}
diff --git a/db/repl.h b/db/repl.h index 9a66bfbde41..0ea6b31e6f4 100644 --- a/db/repl.h +++ b/db/repl.h @@ -1,5 +1,5 @@ -// repl.h - replication
-
+// repl.h - replication + /** * Copyright (C) 2008 10gen Inc. * @@ -15,9 +15,45 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. */ + +/* replication data overview + + at the slave: + local.sources { host: ..., source: ..., syncedTo: } +*/ + +#pragma once + +bool cloneFrom(const char *masterHost, string& errmsg); -#pragma once
-
-bool cloneFrom(const char *masterHost, string& errmsg);
-
-
+#pragma pack(push)
+#pragma pack(4)
+class OpTime { // operation timestamp: a (secs, i) pair of unsigned values, declared under 4-byte packing
+ unsigned secs; // seconds part; OpTime::now() takes it from time(0)
+ unsigned i; // counter part; now() hands out last.i+1 when the second equals last.secs, else 1
+public:
+ OpTime(unsigned a, unsigned b) { secs = a; i = b; } // a -> secs, b -> i
+ OpTime() { secs = 0; i = 0; } // null timestamp: isNull() is true
+ static OpTime now(); // defined in repl.cpp
+ double& asDouble() { return *((double *) this); } // NOTE(review): reinterprets this object's bytes as a double; assumes sizeof(OpTime) == sizeof(double) -- confirm
+ bool isNull() { return secs == 0; } // only secs is tested; i is ignored
+};
+#pragma pack(pop)
+ +/* A Source is a source from which we can pull (replicate) data. + stored in collection local.sources. + + Can be a group of things to replicate for several databases. +*/ +class Source { +public: + string hostName; + string sourceName; + OpTime syncedTo; + static void loadAll(vector<Source*>&); + static void cleanup(vector<Source*>&); + Source(JSObj); + void pull(); + void updateOnDisk(); + JSObj jsobj(); // { host: ..., source: ..., syncedTo: } +}; diff --git a/stdafx.cpp b/stdafx.cpp index 7d461b2e3cb..f499f8b9bc0 100644 --- a/stdafx.cpp +++ b/stdafx.cpp @@ -46,6 +46,11 @@ void asserted(const char *msg, const char *file, unsigned line) { throw AssertionException(); } +void uasserted(const char *msg, const char *file, unsigned line) { + problem() << "User Assertion failure " << msg << ' ' << file << ' ' << line << endl; + throw AssertionException(); +} + void msgasserted(const char *msg) { cout << "Assertion: " << msg << '\n'; throw AssertionException(); @@ -58,6 +58,7 @@ public: void asserted(const char *msg, const char *file, unsigned line); void wasserted(const char *msg, const char *file, unsigned line); +void uasserted(const char *msg, const char *file, unsigned line); void msgasserted(const char *msg); #ifdef assert @@ -66,6 +67,9 @@ void msgasserted(const char *msg); #define assert(_Expression) (void)( (!!(_Expression)) || (asserted(#_Expression, __FILE__, __LINE__), 0) ) +/* "user assert". 
if asserts, user did something wrong, not our code */ +#define uassert(_Expression) (void)( (!!(_Expression)) || (uasserted(#_Expression, __FILE__, __LINE__), 0) ) + #define xassert(_Expression) (void)( (!!(_Expression)) || (asserted(#_Expression, __FILE__, __LINE__), 0) ) #define yassert 1 diff --git a/util/log.h b/util/log.h index 286d956ebb3..78be6e28b1b 100644 --- a/util/log.h +++ b/util/log.h @@ -51,21 +51,21 @@ public: Logstream& operator<<(const string& x) LOGIT Logstream& operator<< (ostream& ( *_endl )(ostream&)) { lock lk(mutex); cout << _endl; return *this; } Logstream& operator<< (ios_base& (*_hex)(ios_base&)) { lock lk(mutex); cout << _hex; return *this; } - Logstream& prolog() { + Logstream& prolog(bool withNs = false) { lock lk(mutex); time_t t; time(&t); string now(ctime(&t),0,20); - cout << "~ " << now; - if( client ) + cout << now; + if( withNs && client ) cout << curNs << ' '; return *this; } }; -inline Logstream& endl ( Logstream& os ) { } +inline Logstream& endl ( Logstream& os ) { return os; } extern Logstream logstream; -// not threadsafe -inline Logstream& problem() { return logstream.prolog(); } +inline Logstream& problem() { return logstream.prolog(true); } +inline Logstream& log() { return logstream.prolog(); } #define cout logstream diff --git a/util/sock.h b/util/sock.h index 46179449464..1e101b619ac 100644 --- a/util/sock.h +++ b/util/sock.h @@ -178,7 +178,6 @@ inline SockAddr::SockAddr(int sourcePort) { } inline SockAddr::SockAddr(const char *ip, int port) { - cout << "TEMP port:" << port << endl; memset(sa.sin_zero, 0, sizeof(sa.sin_zero)); sa.sin_family = AF_INET; sa.sin_port = htons(port); |