summaryrefslogtreecommitdiff
path: root/s
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2011-01-03 14:12:32 -0500
committerEliot Horowitz <eliot@10gen.com>2011-01-03 14:13:07 -0500
commitb99c10320e1fcb28e36167ed0022fcbf55e02cbf (patch)
tree797a7fbda356fa8994ef2fe1ca28d5a55d9b1cfb /s
parent17621a52d3cb1bce3042011f538535b34bfde2f5 (diff)
downloadmongo-b99c10320e1fcb28e36167ed0022fcbf55e02cbf.tar.gz
new writeback hookup with per connection info SERVER-1467 SERVER-1855
Diffstat (limited to 's')
-rw-r--r--s/client.cpp24
-rw-r--r--s/client.h10
-rw-r--r--s/d_logic.cpp1
-rw-r--r--s/writeback_listener.cpp63
-rw-r--r--s/writeback_listener.h12
5 files changed, 78 insertions, 32 deletions
diff --git a/s/client.cpp b/s/client.cpp
index 0ce5cc8f1ca..2e803afbbe0 100644
--- a/s/client.cpp
+++ b/s/client.cpp
@@ -123,19 +123,29 @@ namespace mongo {
_clients.erase( i );
}
- void ClientInfo::_addWriteBack( vector<OID>& all , const BSONObj& o ){
- BSONElement e = o["writeback"];
+ void ClientInfo::_addWriteBack( vector<WBInfo>& all , const BSONObj& o ){
+ BSONElement w = o["writeback"];
- if ( e.type() == jstOID )
- all.push_back( e.OID() );
+ if ( w.type() != jstOID )
+ return;
+
+ BSONElement cid = o["connectionId"];
+ cout << "ELIOT : " << cid << endl;
+
+ if ( cid.eoo() ){
+ error() << "getLastError writeback can't work because of version mis-match" << endl;
+ return;
+ }
+
+ all.push_back( WBInfo( cid.numberLong() , w.OID() ) );
}
- void ClientInfo::_handleWriteBacks( vector<OID>& all ){
+ void ClientInfo::_handleWriteBacks( vector<WBInfo>& all ){
if ( all.size() == 0 )
return;
for ( unsigned i=0; i<all.size(); i++ ){
- WriteBackListener::waitFor( all[i] );
+ WriteBackListener::waitFor( all[i].connectionId , all[i].id );
}
}
@@ -149,7 +159,7 @@ namespace mongo {
return true;
}
- vector<OID> writebacks;
+ vector<WBInfo> writebacks;
// handle single server
if ( shards->size() == 1 ){
diff --git a/s/client.h b/s/client.h
index 60a05d92aad..050661b1876 100644
--- a/s/client.h
+++ b/s/client.h
@@ -77,9 +77,15 @@ namespace mongo {
private:
+ struct WBInfo {
+ WBInfo( ConnectionId c , OID o ) : connectionId( c ) , id( o ){}
+ ConnectionId connectionId;
+ OID id;
+ };
+
// for getLastError
- void _addWriteBack( vector<OID>& all , const BSONObj& o );
- void _handleWriteBacks( vector<OID>& all );
+ void _addWriteBack( vector<WBInfo>& all , const BSONObj& o );
+ void _handleWriteBacks( vector<WBInfo>& all );
int _id; // unique client id
diff --git a/s/d_logic.cpp b/s/d_logic.cpp
index 40a8c91f3c5..7de7a1ef7e6 100644
--- a/s/d_logic.cpp
+++ b/s/d_logic.cpp
@@ -103,6 +103,7 @@ namespace mongo {
b.appendBool( "writeBack" , true );
b.append( "ns" , ns );
b.append( "id" , writebackID );
+ b.append( "connectionId" , cc().getConnectionId() );
b.appendTimestamp( "version" , shardingState.getVersion( ns ) );
b.appendTimestamp( "yourVersion" , ShardedConnectionInfo::get( true )->getVersion( ns ) );
b.appendBinData( "msg" , m.header()->len , bdtCustom , (char*)(m.singleData()) );
diff --git a/s/writeback_listener.cpp b/s/writeback_listener.cpp
index ebef9c7c83f..2e1bc10af25 100644
--- a/s/writeback_listener.cpp
+++ b/s/writeback_listener.cpp
@@ -34,7 +34,7 @@ namespace mongo {
map<string,WriteBackListener*> WriteBackListener::_cache;
mongo::mutex WriteBackListener::_cacheLock("WriteBackListener");
- set<OID> WriteBackListener::_seenWritebacks;
+ map<ConnectionId,WriteBackListener::WBStatus> WriteBackListener::_seenWritebacks;
mongo::mutex WriteBackListener::_seenWritebacksLock("WriteBackListener::seen");
WriteBackListener::WriteBackListener( const string& addr ) : _addr( addr ){
@@ -52,13 +52,16 @@ namespace mongo {
}
/* static */
- void WriteBackListener::waitFor( const OID& oid ){
+ void WriteBackListener::waitFor( ConnectionId connectionId, const OID& oid ){
Timer t;
for ( int i=0; i<5000; i++ ){
{
scoped_lock lk( _seenWritebacksLock );
- if ( _seenWritebacks.count( oid ) )
+ WBStatus s = _seenWritebacks[connectionId];
+ if ( oid <= s.id ){
+ // TODO return gle
return;
+ }
}
sleepmillis( 10 );
}
@@ -68,16 +71,9 @@ namespace mongo {
}
void WriteBackListener::run(){
- OID lastID;
- lastID.clear();
int secsToSleep = 0;
while ( ! inShutdown() && Shard::isMember( _addr ) ){
- if ( lastID.isSet() ){
- scoped_lock lk( _seenWritebacksLock );
- _seenWritebacks.insert( lastID );
- lastID.clear();
- }
try {
ScopedDbConnection conn( _addr );
@@ -100,20 +96,25 @@ namespace mongo {
BSONObj data = result.getObjectField( "data" );
if ( data.getBoolField( "writeBack" ) ){
string ns = data["ns"].valuestrsafe();
- {
- BSONElement e = data["id"];
- if ( e.type() == jstOID )
- lastID = e.OID();
+
+ ConnectionId cid = 0;
+ OID wid;
+ if ( data["connectionId"].isNumber() && data["id"].type() == jstOID ){
+ cid = data["connectionId"].numberLong();
+ wid = data["id"].OID();
+ }
+ else {
+ warning() << "mongos/mongod version mismatch (1.7.5 is the split)" << endl;
}
- int len;
+ int len; // not used, but needed for next call
Message m( (void*)data["msg"].binData( len ) , false );
massert( 10427 , "invalid writeback message" , m.header()->valid() );
DBConfigPtr db = grid.getDBConfig( ns );
ShardChunkVersion needVersion( data["version"] );
- log(1) << "writeback id: " << lastID << " needVersion : " << needVersion.toString()
+ log(1) << "connectionId: " << cid << " writebackId: " << wid << " needVersion : " << needVersion.toString()
<< " mine : " << db->getChunkManager( ns )->getVersion().toString() << endl;// TODO change to log(3)
if ( logLevel ) log(1) << debugString( m ) << endl;
@@ -129,10 +130,32 @@ namespace mongo {
// we need to reload the chunk manager and get the new shard versions
db->getChunkManager( ns , true );
}
-
- Request r( m , 0 );
- r.init();
- r.process();
+
+ // do reequest and then call getLastError
+ // we have to call getLastError so we can return the right fields to the user if they decide to call getLastError
+
+ BSONObj gle;
+ try {
+ Request r( m , 0 );
+ r.init();
+ r.process();
+
+ gle = BSONObj(); // TODO
+ }
+ catch ( DBException& e ){
+ error() << "error processing writeback: " << e << endl;
+ BSONObjBuilder b;
+ b.append( "err" , e.toString() );
+ e.getInfo().append( b );
+ gle = b.obj();
+ }
+
+ {
+ scoped_lock lk( _seenWritebacksLock );
+ WBStatus& s = _seenWritebacks[cid];
+ s.id = wid;
+ s.gle = gle;
+ }
}
else if ( result["noop"].trueValue() ){
// no-op
diff --git a/s/writeback_listener.h b/s/writeback_listener.h
index f567a073e2f..bd2664d9332 100644
--- a/s/writeback_listener.h
+++ b/s/writeback_listener.h
@@ -22,6 +22,7 @@
#include "../client/connpool.h"
#include "../util/background.h"
+#include "../db/client.h"
namespace mongo {
@@ -36,7 +37,7 @@ namespace mongo {
public:
static void init( DBClientBase& conn );
- static void waitFor( const OID& oid );
+ static void waitFor( ConnectionId connectionId, const OID& oid );
protected:
WriteBackListener( const string& addr );
@@ -48,10 +49,15 @@ namespace mongo {
string _addr;
static mongo::mutex _cacheLock; // protects _cache
- static map<string,WriteBackListener*> _cache;
+ static map<string,WriteBackListener*> _cache; // server to listener
+ struct WBStatus {
+ OID id;
+ BSONObj gle;
+ };
+
static mongo::mutex _seenWritebacksLock; // protects _seenWritbacks
- static set<OID> _seenWritebacks; // TODO: this can grow unbounded
+ static map<ConnectionId,WBStatus> _seenWritebacks; // connectionId -> last write back GLE
};
void waitForWriteback( const OID& oid );