summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--db/db.cpp6
-rw-r--r--db/dbclient.cpp12
-rw-r--r--db/dbclient.h2
-rw-r--r--db/jsobj.cpp2
-rw-r--r--db/jsobj.h5
-rw-r--r--db/json.cpp7
-rw-r--r--db/pdfile.cpp9
-rw-r--r--db/query.cpp7
-rw-r--r--db/repl.cpp136
-rw-r--r--db/repl.h50
-rw-r--r--stdafx.cpp5
-rw-r--r--stdafx.h4
-rw-r--r--util/log.h12
-rw-r--r--util/sock.h1
14 files changed, 236 insertions, 22 deletions
diff --git a/db/db.cpp b/db/db.cpp
index b588058f90f..44b9b3e364c 100644
--- a/db/db.cpp
+++ b/db/db.cpp
@@ -39,6 +39,7 @@ bool useCursors = true;
boost::mutex dbMutex;
void closeAllSockets();
+void startReplication();
struct MyStartupTests {
MyStartupTests() {
@@ -367,12 +368,13 @@ public:
115 replay, opLogging
*/
void listen(int port) {
- const char *Version = "db version: 120 09jul2008 logging fix";
+ const char *Version = "db version: 121";
problem() << Version << endl;
pdfileInit();
- testTheDb();
+ //testTheDb();
cout << curTimeMillis() % 10000 << " waiting for connections on port " << port << " ...\n" << endl;
OurListener l(port);
+ startReplication();
l.listen();
}
diff --git a/db/dbclient.cpp b/db/dbclient.cpp
index 03b857ea3ad..55da39a98a5 100644
--- a/db/dbclient.cpp
+++ b/db/dbclient.cpp
@@ -21,6 +21,16 @@
#include "../util/builder.h"
#include "jsobj.h"
+JSObj DBClientConnection::findOne(const char *ns, JSObj query, JSObj *fieldsToReturn) {
+ auto_ptr<DBClientCursor> c =
+ this->query(ns, query, 1, 0, fieldsToReturn);
+
+ if( !c->more() )
+ return JSObj();
+
+ return c->next().copy();
+}
+
bool DBClientConnection::connect(const char *serverAddress, string& errmsg) {
/* not reentrant!
ok as used right now (we are in a big lock), but won't be later, so fix. */
@@ -95,7 +105,7 @@ bool DBClientCursor::more() {
if( cursorId == 0 )
return false;
- cout << "TEMP: requestMore" << endl;
+// cout << "TEMP: requestMore" << endl;
requestMore();
return pos < nReturned;
}
diff --git a/db/dbclient.h b/db/dbclient.h
index 812cc480f44..82354c2ae6a 100644
--- a/db/dbclient.h
+++ b/db/dbclient.h
@@ -74,4 +74,6 @@ public:
returns 0 if error
*/
auto_ptr<DBClientCursor> query(const char *ns, JSObj query, int nToReturn = 0, int nToSkip = 0, JSObj *fieldsToReturn = 0);
+
+ JSObj findOne(const char *ns, JSObj query, JSObj *fieldsToReturn = 0);
};
diff --git a/db/jsobj.cpp b/db/jsobj.cpp
index 423adb07131..49e435f45a0 100644
--- a/db/jsobj.cpp
+++ b/db/jsobj.cpp
@@ -737,7 +737,7 @@ JSObj JSObj::extractFields(JSObj pattern, JSObjBuilder& b) {
const char * JSObj::getStringField(const char *name) {
Element e = getField(name);
- return e.type() == String ? e.valuestr() : 0;
+ return e.type() == String ? e.valuestr() : "";
}
JSObj JSObj::getObjectField(const char *name) {
diff --git a/db/jsobj.h b/db/jsobj.h
index d36cb8d4e98..018be9359af 100644
--- a/db/jsobj.h
+++ b/db/jsobj.h
@@ -247,7 +247,9 @@ public:
Element getField(const char *name); /* return has eoo() true if no match */
+ // returns "" if DNE or wrong type
const char * getStringField(const char *name);
+
JSObj getObjectField(const char *name);
/* makes a new JSObj with the fields specified in pattern.
@@ -383,6 +385,9 @@ public:
b.append((int) strlen(str)+1);
b.append(str);
}
+ void append(const char *fieldName, string str) {
+ append(fieldName, str.c_str());
+ }
/* JSObj will free the buffer when it is finished. */
JSObj doneAndDecouple() {
diff --git a/db/json.cpp b/db/json.cpp
index cdf72b508c4..f92df2d248b 100644
--- a/db/json.cpp
+++ b/db/json.cpp
@@ -32,6 +32,13 @@ void value(JSObjBuilder& b, const char *&p, string& id) {
p += 7;
b.appendOID(id.c_str());
}
+ else if( *p == '1' ) {
+ b.append(id.c_str(), 1);
+ p++;
+ }
+ else {
+ assert(false);
+ }
}
void _fromjson(JSObjBuilder& b, const char *&p) {
diff --git a/db/pdfile.cpp b/db/pdfile.cpp
index 7167435ce1c..1841846a5f9 100644
--- a/db/pdfile.cpp
+++ b/db/pdfile.cpp
@@ -366,8 +366,9 @@ auto_ptr<Cursor> makeNamespaceCursor() {
}*/
/* add a new namespace to the system catalog (<dbname>.system.namespaces).
+ options: { capped : ..., size : ... }
*/
-void addNewNamespaceToCatalog(const char *ns) {
+void addNewNamespaceToCatalog(const char *ns, JSObj *options = 0) {
cout << "New namespace: " << ns << endl;
if( strstr(ns, "system.namespaces") ) {
// system.namespaces holds all the others, so it is not explicitly listed in the catalog.
@@ -378,6 +379,8 @@ void addNewNamespaceToCatalog(const char *ns) {
{
JSObjBuilder b;
b.append("name", ns);
+ if( options )
+ b.append("options", *options);
JSObj j = b.done();
char client[256];
nsToClient(ns, client);
@@ -411,7 +414,7 @@ bool userCreateNS(const char *ns, JSObj& j, string& err) {
/* todo: do this only when we have allocated space successfully? or we could insert with a { ok: 0 } field
and then go back and set to ok : 1 after we are done.
*/
- addNewNamespaceToCatalog(ns);
+ addNewNamespaceToCatalog(ns, &j);
int ies = initialExtentSize(128);
Element e = j.findElement("size");
@@ -1020,7 +1023,7 @@ DiskLoc DataFileMgr::insert(const char *ns, const void *buf, int len, bool god)
const char *name = io.getStringField("name"); // name of the index
tabletoidxns = io.getStringField("ns"); // table it indexes
JSObj key = io.getObjectField("key");
- if( name == 0 || *name == 0 || tabletoidxns == 0 || key.isEmpty() || key.objsize() > 2048 ) {
+ if( *name == 0 || *tabletoidxns == 0 || key.isEmpty() || key.objsize() > 2048 ) {
cout << "user warning: bad add index attempt name:" << (name?name:"") << "\n ns:" <<
(tabletoidxns?tabletoidxns:"") << "\n ourns:" << ns;
cout << "\n idxobj:" << io.toString() << endl;
diff --git a/db/query.cpp b/db/query.cpp
index 6a06e0b42c6..b29527dd145 100644
--- a/db/query.cpp
+++ b/db/query.cpp
@@ -595,7 +595,12 @@ inline bool _runCommands(const char *ns, JSObj& jsobj, stringstream& ss, BufBuil
ok = dbEval(jsobj, anObjBuilder);
}
else if( e.type() == Number ) {
- if( strcmp(e.fieldName(), "dropDatabase") == 0 ) {
+ if( strcmp(e.fieldName(), "getoptime") == 0 ) {
+ valid = true;
+ ok = true;
+ anObjBuilder.append("optime", OpTime::now().asDouble());
+ }
+ else if( strcmp(e.fieldName(), "dropDatabase") == 0 ) {
if( 1 ) {
cout << "dropDatabase " << ns << endl;
valid = true;
diff --git a/db/repl.cpp b/db/repl.cpp
index f025455de3b..dac95c852c8 100644
--- a/db/repl.cpp
+++ b/db/repl.cpp
@@ -23,8 +23,26 @@
#include "../grid/message.h"
#include "dbclient.h"
#include "pdfile.h"
+#include "query.h"
+#include "json.h"
extern JSObj emptyObj;
+extern boost::mutex dbMutex;
+auto_ptr<Cursor> findTableScan(const char *ns, JSObj& order);
+bool userCreateNS(const char *ns, JSObj& j, string& err);
+
+OpTime last((unsigned) time(0), 1);
+
+OpTime OpTime::now() {
+ unsigned t = (unsigned) time(0);
+ if( last.secs == t )
+ return OpTime(last.secs, last.i+1);
+ return OpTime(t, 1);
+}
+
+/* Cloner -----------------------------------------------------------
+ makes copy of existing database.
+*/
class Cloner: boost::noncopyable {
DBClientConnection conn;
@@ -69,6 +87,11 @@ bool Cloner::go(const char *masterHost, string& errmsg) {
const char *name = e.valuestr();
if( strstr(name, ".system.") || strchr(name, '$') )
continue;
+ JSObj options = collection.getObjectField("options");
+ if( !options.isEmpty() ) {
+ string err;
+ userCreateNS(name, options, err);
+ }
copy(name);
}
@@ -84,3 +107,116 @@ bool cloneFrom(const char *masterHost, string& errmsg)
Cloner c;
return c.go(masterHost, errmsg);
}
+
+/* --------------------------------------------------------------*/
+
+Source::Source(JSObj o) {
+ hostName = o.getStringField("host");
+ sourceName = o.getStringField("source");
+ uassert( !hostName.empty() );
+ uassert( !sourceName.empty() );
+ Element e = o.getField("syncedTo");
+ if( !e.eoo() ) {
+ uassert( e.type() == Number );
+ syncedTo.asDouble() = e.number();
+ }
+}
+
+JSObj Source::jsobj() {
+ JSObjBuilder b;
+ b.append("host", hostName);
+ b.append("source", sourceName);
+ b.append("syncedTo", syncedTo.asDouble());
+ return b.doneAndDecouple();
+}
+
+void Source::updateOnDisk() {
+ JSObjBuilder b;
+ b.append("host", hostName);
+ b.append("source", sourceName);
+ JSObj pattern = b.done();
+
+ JSObj o = jsobj();
+
+ stringstream ss;
+ setClient("local.sources");
+ updateObjects("local.sources", o, pattern, false, ss);
+}
+
+void Source::cleanup(vector<Source*>& v) {
+ for( vector<Source*>::iterator i = v.begin(); i != v.end(); i++ )
+ delete *i;
+}
+
+void Source::loadAll(vector<Source*>& v) {
+ setClient("local.sources");
+ auto_ptr<Cursor> c = findTableScan("local.sources", emptyObj);
+ while( c->ok() ) {
+ v.push_back( new Source(c->current()) );
+ c->advance();
+ }
+}
+
+JSObj opTimeQuery = fromjson("{getoptime:1}");
+
+/* note: not yet in mutex at this point. */
+void Source::pull() {
+ log() << "pull source " << sourceName << '@' << hostName << endl;
+
+// if( syncedTo.isNull() ) {
+// }
+
+ DBClientConnection conn;
+ string errmsg;
+ if( !conn.connect(hostName.c_str(), errmsg) ) {
+ cout << " pull: cantconn " << errmsg << endl;
+ return;
+ }
+
+ // get current mtime at the server.
+ JSObj o = conn.findOne("admin.$cmd", opTimeQuery);
+ Element e = o.findElement("optime");
+ if( e.eoo() ) {
+ cout << " pull: failed to get curtime from master" << endl;
+ cout << " " << o.toString() << endl;
+ return;
+ }
+ uassert( e.type() == Number );
+ OpTime serverCurTime;
+ serverCurTime.asDouble() = e.number();
+
+}
+
+/* --------------------------------------------------------------*/
+
+void replMain() {
+ vector<Source*> sources;
+
+ {
+ lock lk(dbMutex);
+ Source::loadAll(sources);
+ }
+
+ for( vector<Source*>::iterator i = sources.begin(); i != sources.end(); i++ ) {
+ (*i)->pull();
+ }
+
+ Source::cleanup(sources);
+}
+
+void replMainThread() {
+ while( 1 ) {
+ try {
+ replMain();
+ sleepsecs(5);
+ }
+ catch( AssertionException ) {
+ problem() << "Assertion in replMainThread(): sleeping 5 minutes before retry" << endl;
+ sleepsecs(300);
+ }
+ }
+}
+
+void startReplication() {
+// boost::thread repl_thread(replMainThread);
+}
diff --git a/db/repl.h b/db/repl.h
index 9a66bfbde41..0ea6b31e6f4 100644
--- a/db/repl.h
+++ b/db/repl.h
@@ -1,5 +1,5 @@
-// repl.h - replication
-
+// repl.h - replication
+
/**
* Copyright (C) 2008 10gen Inc.
*
@@ -15,9 +15,45 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+
+/* replication data overview
+
+ at the slave:
+ local.sources { host: ..., source: ..., syncedTo: }
+*/
+
+#pragma once
+
+bool cloneFrom(const char *masterHost, string& errmsg);
-#pragma once
-
-bool cloneFrom(const char *masterHost, string& errmsg);
-
-
+#pragma pack(push)
+#pragma pack(4)
+class OpTime {
+ unsigned secs;
+ unsigned i;
+public:
+ OpTime(unsigned a, unsigned b) { secs = a; i = b; }
+ OpTime() { secs = 0; i = 0; }
+ static OpTime now();
+ double& asDouble() { return *((double *) this); }
+ bool isNull() { return secs == 0; }
+};
+#pragma pack(pop)
+
+/* A Source is a source from which we can pull (replicate) data.
+ stored in collection local.sources.
+
+ Can be a group of things to replicate for several databases.
+*/
+class Source {
+public:
+ string hostName;
+ string sourceName;
+ OpTime syncedTo;
+ static void loadAll(vector<Source*>&);
+ static void cleanup(vector<Source*>&);
+ Source(JSObj);
+ void pull();
+ void updateOnDisk();
+ JSObj jsobj(); // { host: ..., source: ..., syncedTo: }
+};
diff --git a/stdafx.cpp b/stdafx.cpp
index 7d461b2e3cb..f499f8b9bc0 100644
--- a/stdafx.cpp
+++ b/stdafx.cpp
@@ -46,6 +46,11 @@ void asserted(const char *msg, const char *file, unsigned line) {
throw AssertionException();
}
+void uasserted(const char *msg, const char *file, unsigned line) {
+ problem() << "User Assertion failure " << msg << ' ' << file << ' ' << line << endl;
+ throw AssertionException();
+}
+
void msgasserted(const char *msg) {
cout << "Assertion: " << msg << '\n';
throw AssertionException();
diff --git a/stdafx.h b/stdafx.h
index 6e27ace022e..23257e08a44 100644
--- a/stdafx.h
+++ b/stdafx.h
@@ -58,6 +58,7 @@ public:
void asserted(const char *msg, const char *file, unsigned line);
void wasserted(const char *msg, const char *file, unsigned line);
+void uasserted(const char *msg, const char *file, unsigned line);
void msgasserted(const char *msg);
#ifdef assert
@@ -66,6 +67,9 @@ void msgasserted(const char *msg);
#define assert(_Expression) (void)( (!!(_Expression)) || (asserted(#_Expression, __FILE__, __LINE__), 0) )
+/* "user assert". if asserts, user did something wrong, not our code */
+#define uassert(_Expression) (void)( (!!(_Expression)) || (uasserted(#_Expression, __FILE__, __LINE__), 0) )
+
#define xassert(_Expression) (void)( (!!(_Expression)) || (asserted(#_Expression, __FILE__, __LINE__), 0) )
#define yassert 1
diff --git a/util/log.h b/util/log.h
index 286d956ebb3..78be6e28b1b 100644
--- a/util/log.h
+++ b/util/log.h
@@ -51,21 +51,21 @@ public:
Logstream& operator<<(const string& x) LOGIT
Logstream& operator<< (ostream& ( *_endl )(ostream&)) { lock lk(mutex); cout << _endl; return *this; }
Logstream& operator<< (ios_base& (*_hex)(ios_base&)) { lock lk(mutex); cout << _hex; return *this; }
- Logstream& prolog() {
+ Logstream& prolog(bool withNs = false) {
lock lk(mutex);
time_t t;
time(&t);
string now(ctime(&t),0,20);
- cout << "~ " << now;
- if( client )
+ cout << now;
+ if( withNs && client )
cout << curNs << ' ';
return *this;
}
};
-inline Logstream& endl ( Logstream& os ) { }
+inline Logstream& endl ( Logstream& os ) { return os; }
extern Logstream logstream;
-// not threadsafe
-inline Logstream& problem() { return logstream.prolog(); }
+inline Logstream& problem() { return logstream.prolog(true); }
+inline Logstream& log() { return logstream.prolog(); }
#define cout logstream
diff --git a/util/sock.h b/util/sock.h
index 46179449464..1e101b619ac 100644
--- a/util/sock.h
+++ b/util/sock.h
@@ -178,7 +178,6 @@ inline SockAddr::SockAddr(int sourcePort) {
}
inline SockAddr::SockAddr(const char *ip, int port) {
- cout << "TEMP port:" << port << endl;
memset(sa.sin_zero, 0, sizeof(sa.sin_zero));
sa.sin_family = AF_INET;
sa.sin_port = htons(port);