summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2011-03-30 17:03:52 -0400
committerEliot Horowitz <eliot@10gen.com>2011-03-31 03:12:43 -0400
commit134e8eb0e0913747ac648875514332fe9949d4cd (patch)
tree9d74bf3c35cf7920fd39fe18b890cc1d4774a63f
parent7c5b035e1bad0a4f1d84adde04df49b57dae7828 (diff)
downloadmongo-134e8eb0e0913747ac648875514332fe9949d4cd.tar.gz
remove all clientId related code and systems, and just use tls SERVER-2872
-rw-r--r--db/lasterror.cpp134
-rw-r--r--db/lasterror.h19
-rw-r--r--s/client.cpp107
-rw-r--r--s/client.h18
-rw-r--r--s/request.cpp5
-rw-r--r--s/request.h4
-rw-r--r--s/server.cpp17
-rw-r--r--s/writeback_listener.cpp2
-rw-r--r--util/message.cpp29
-rw-r--r--util/message.h5
-rw-r--r--util/message_server.h4
-rw-r--r--util/message_server_port.cpp10
12 files changed, 89 insertions, 265 deletions
diff --git a/db/lasterror.cpp b/db/lasterror.cpp
index ba52111c883..4643aa9bfea 100644
--- a/db/lasterror.cpp
+++ b/db/lasterror.cpp
@@ -28,7 +28,6 @@ namespace mongo {
LastError LastError::noError;
LastErrorHolder lastError;
- mongo::mutex LastErrorHolder::_idsmutex("LastErrorHolder");
bool isShell = false;
void raiseError(int code , const char *msg) {
@@ -79,22 +78,9 @@ namespace mongo {
}
LastErrorHolder::~LastErrorHolder() {
- for ( IDMap::iterator i = _ids.begin(); i != _ids.end(); ++i ) {
- delete i->second.lerr;
- i->second.lerr = 0;
- }
- _ids.clear();
}
- void LastErrorHolder::setID( int id ) {
- _id.set( id );
- }
-
- int LastErrorHolder::getID() {
- return _id.get();
- }
-
LastError * LastErrorHolder::disableForCommand() {
LastError *le = _get();
assert( le );
@@ -111,77 +97,31 @@ namespace mongo {
}
LastError * LastErrorHolder::_get( bool create ) {
- int id = _id.get();
- if ( id == 0 ) {
- LastError * le = _tl.get();
- if ( ! le && create ) {
- le = new LastError();
- _tl.reset( le );
- }
- return le;
- }
-
- scoped_lock lock(_idsmutex);
- map<int,Status>::iterator i = _ids.find( id );
- if ( i == _ids.end() ) {
- if ( ! create )
- return 0;
-
- LastError * le = new LastError();
- Status s;
- s.time = time(0);
- s.lerr = le;
- _ids[id] = s;
- return le;
+ LastError * le = _tl.get();
+ if ( ! le && create ) {
+ le = new LastError();
+ _tl.reset( le );
}
-
- Status &status = i->second;
- status.time = time(0);
- return status.lerr;
- }
-
- void LastErrorHolder::remove( int id ) {
- scoped_lock lock(_idsmutex);
- map<int,Status>::iterator i = _ids.find( id );
- if ( i == _ids.end() )
- return;
-
- delete i->second.lerr;
- _ids.erase( i );
+ return le;
}
void LastErrorHolder::release() {
- int id = _id.get();
- if ( id == 0 ) {
- _tl.release();
- return;
- }
-
- remove( id );
+ _tl.release();
}
/** ok to call more than once. */
void LastErrorHolder::initThread() {
- if( _tl.get() ) return;
- assert( _id.get() == 0 );
- _tl.reset( new LastError() );
+ if( ! _tl.get() )
+ _tl.reset( new LastError() );
}
void LastErrorHolder::reset( LastError * le ) {
- int id = _id.get();
- if ( id == 0 ) {
- _tl.reset( le );
- return;
- }
-
- scoped_lock lock(_idsmutex);
- Status & status = _ids[id];
- status.time = time(0);
- status.lerr = le;
+ _tl.reset( le );
}
void prepareErrForNewRequest( Message &m, LastError * err ) {
// a killCursors message shouldn't affect last error
+ assert( err );
if ( m.operation() == dbKillCursors ) {
err->disabled = true;
}
@@ -191,60 +131,10 @@ namespace mongo {
}
}
- LastError * LastErrorHolder::startRequest( Message& m , int clientId ) {
- assert( clientId );
- setID( clientId );
-
- LastError * le = _get( true );
+ LastError * LastErrorHolder::startRequest( Message& m , LastError * le ) {
+ assert( le );
prepareErrForNewRequest( m, le );
return le;
}
- void LastErrorHolder::startRequest( Message& m , LastError * connectionOwned ) {
- prepareErrForNewRequest( m, connectionOwned );
- }
-
- void LastErrorHolder::disconnect( int clientId ) {
- if ( clientId )
- remove(clientId);
- }
-
- struct LastErrorHolderTest : public UnitTest {
- public:
-
- void test( int i ) {
- _tl.set( i );
- assert( _tl.get() == i );
- }
-
- void tlmaptest() {
- test( 1 );
- test( 12123123 );
- test( -123123 );
- test( numeric_limits<int>::min() );
- test( numeric_limits<int>::max() );
- }
-
- void run() {
- tlmaptest();
-
- LastError * a = new LastError();
- LastError * b = new LastError();
-
- LastErrorHolder holder;
- holder.reset( a );
- assert( a == holder.get() );
- holder.setID( 1 );
- assert( 0 == holder.get() );
- holder.reset( b );
- assert( b == holder.get() );
- holder.setID( 0 );
- assert( a == holder.get() );
-
- holder.remove( 1 );
- }
-
- ThreadLocalValue<int> _tl;
- } lastErrorHolderTest;
-
} // namespace mongo
diff --git a/db/lasterror.h b/db/lasterror.h
index c77ec740f03..86250e496a8 100644
--- a/db/lasterror.h
+++ b/db/lasterror.h
@@ -100,14 +100,14 @@ namespace mongo {
extern class LastErrorHolder {
public:
- LastErrorHolder() : _id( 0 ) {}
+ LastErrorHolder(){}
~LastErrorHolder();
LastError * get( bool create = false );
LastError * getSafe() {
LastError * le = get(false);
if ( ! le ) {
- log( LL_ERROR ) << " no LastError! id: " << getID() << endl;
+ error() << " no LastError!" << endl;
assert( le );
}
return le;
@@ -120,18 +120,12 @@ namespace mongo {
/** ok to call more than once. */
void initThread();
- /**
- * id of 0 means should use thread local management
- */
- void setID( int id );
int getID();
-
- void remove( int id );
+
void release();
/** when db receives a message/request, call this */
- void startRequest( Message& m , LastError * connectionOwned );
- LastError * startRequest( Message& m , int clientId );
+ LastError * startRequest( Message& m , LastError * connectionOwned );
void disconnect( int clientId );
@@ -139,17 +133,12 @@ namespace mongo {
// disable causes get() to return 0.
LastError *disableForCommand(); // only call once per command invocation!
private:
- ThreadLocalValue<int> _id;
boost::thread_specific_ptr<LastError> _tl;
struct Status {
time_t time;
LastError *lerr;
};
- typedef map<int,Status> IDMap;
-
- static mongo::mutex _idsmutex;
- IDMap _ids;
} lastError;
void raiseError(int code , const char *msg);
diff --git a/s/client.cpp b/s/client.cpp
index 1289b79b46f..e1cb0229914 100644
--- a/s/client.cpp
+++ b/s/client.cpp
@@ -36,7 +36,7 @@
namespace mongo {
- ClientInfo::ClientInfo( int clientId ) : _id( clientId ) {
+ ClientInfo::ClientInfo() {
_cur = &_a;
_prev = &_b;
_autoSplitOk = true;
@@ -44,13 +44,6 @@ namespace mongo {
}
ClientInfo::~ClientInfo() {
- if ( _lastAccess ) {
- scoped_lock lk( _clientsLock );
- Cache::iterator i = _clients.find( _id );
- if ( i != _clients.end() ) {
- _clients.erase( i );
- }
- }
}
void ClientInfo::addShard( const string& shard ) {
@@ -79,49 +72,19 @@ namespace mongo {
_cur->clear();
}
- void ClientInfo::disconnect() {
- _lastAccess = 0;
- }
-
- ClientInfo * ClientInfo::get( int clientId , bool create ) {
-
- if ( ! clientId )
- clientId = getClientId();
-
- if ( ! clientId ) {
- ClientInfo * info = _tlInfo.get();
- if ( ! info ) {
- info = new ClientInfo( 0 );
- _tlInfo.reset( info );
- }
+ ClientInfo * ClientInfo::get() {
+ ClientInfo * info = _tlInfo.get();
+ if ( ! info ) {
+ info = new ClientInfo();
+ _tlInfo.reset( info );
info->newRequest();
- return info;
}
-
- scoped_lock lk( _clientsLock );
- Cache::iterator i = _clients.find( clientId );
- if ( i != _clients.end() )
- return i->second;
- if ( ! create )
- return 0;
- ClientInfo * info = new ClientInfo( clientId );
- _clients[clientId] = info;
return info;
}
- void ClientInfo::disconnect( int clientId ) {
- if ( ! clientId )
- return;
-
- scoped_lock lk( _clientsLock );
- Cache::iterator i = _clients.find( clientId );
- if ( i == _clients.end() )
- return;
-
- ClientInfo* ci = i->second;
- ci->disconnect();
- delete ci;
- _clients.erase( i );
+ void ClientInfo::disconnect() {
+ // should be handled by TL cleanup
+ _lastAccess = 0;
}
void ClientInfo::_addWriteBack( vector<WBInfo>& all , const BSONObj& gle ) {
@@ -142,14 +105,14 @@ namespace mongo {
vector<BSONObj> ClientInfo::_handleWriteBacks( vector<WBInfo>& all , bool fromWriteBackListener ) {
vector<BSONObj> res;
+
+ if ( all.size() == 0 )
+ return res;
if ( fromWriteBackListener ) {
LOG(1) << "not doing recusrive writeback" << endl;
return res;
}
-
- if ( all.size() == 0 )
- return res;
for ( unsigned i=0; i<all.size(); i++ ) {
res.push_back( WriteBackListener::waitFor( all[i].connectionId , all[i].id ) );
@@ -182,16 +145,16 @@ namespace mongo {
ok = conn->runCommand( "admin" , options , res );
}
catch( std::exception &e ){
-
- warning() << "Could not get last error." << m_error_message( e.what() ) << endl;
-
- // Catch everything that happens here, since we need to ensure we return our connection when we're
+
+ warning() << "Could not get last error." << m_error_message( e.what() ) << endl;
+
+ // Catch everything that happens here, since we need to ensure we return our connection when we're
// finished.
conn.done();
-
+
return false;
}
-
+
res = res.getOwned();
conn.done();
@@ -219,6 +182,7 @@ namespace mongo {
assert( v.size() == 1 );
result.appendElements( v[0] );
result.appendElementsUnique( res );
+ result.append( "writebackGLE" , v[0] );
result.append( "initialGLEHost" , theShard );
}
}
@@ -231,8 +195,11 @@ namespace mongo {
}
BSONArrayBuilder bbb( result.subarrayStart( "shards" ) );
+ BSONObjBuilder shardRawGLE;
long long n = 0;
+
+ int updatedExistingStat = 0; // 0 is none, -1 has but false, 1 has true
// hit each shard
vector<string> errors;
@@ -243,20 +210,21 @@ namespace mongo {
ShardConnection conn( theShard , "" );
BSONObj res;
bool ok = false;
- try{
- ok = conn->runCommand( "admin" , options , res );
+ try {
+ ok = conn->runCommand( "admin" , options , res );
+ shardRawGLE.append( theShard , res );
}
catch( std::exception &e ){
// Safe to return here, since we haven't started any extra processing yet, just collecting
// responses.
-
+
warning() << "Could not get last error." << m_error_message( e.what() ) << endl;
- conn.done();
-
- return false;
- }
-
+ conn.done();
+
+ return false;
+ }
+
_addWriteBack( writebacks, res );
string temp = DBClientWithCommands::getLastErrorString( res );
@@ -264,13 +232,24 @@ namespace mongo {
errors.push_back( temp );
errorObjects.push_back( res );
}
+
n += res["n"].numberLong();
+ if ( res["updatedExisting"].type() ) {
+ if ( res["updatedExisting"].trueValue() )
+ updatedExistingStat = 1;
+ else if ( updatedExistingStat == 0 )
+ updatedExistingStat = -1;
+ }
+
conn.done();
}
bbb.done();
+ result.append( "shardRawGLE" , shardRawGLE.obj() );
result.appendNumber( "n" , n );
+ if ( updatedExistingStat )
+ result.appendBool( "updatedExisting" , updatedExistingStat > 0 );
// hit other machines just to block
for ( set<string>::const_iterator i=sinceLastGetError().begin(); i!=sinceLastGetError().end(); ++i ) {
@@ -313,8 +292,6 @@ namespace mongo {
return true;
}
- ClientInfo::Cache& ClientInfo::_clients = *(new ClientInfo::Cache());
- mongo::mutex ClientInfo::_clientsLock("_clientsLock");
boost::thread_specific_ptr<ClientInfo> ClientInfo::_tlInfo;
} // namespace mongo
diff --git a/s/client.h b/s/client.h
index bd4295fe22b..2e9fefea6e3 100644
--- a/s/client.h
+++ b/s/client.h
@@ -26,11 +26,8 @@ namespace mongo {
* currently implemented with a thread local
*/
class ClientInfo {
-
- typedef map<int,ClientInfo*> Cache;
-
public:
- ClientInfo( int clientId );
+ ClientInfo();
~ClientInfo();
/** new request from client, adjusts internal state */
@@ -54,7 +51,7 @@ namespace mongo {
* gets shards used on the previous request
*/
set<string> * getPrev() const { return _prev; };
-
+
/**
* gets all shards we've accessed since the last time we called clearSinceLastGetError
*/
@@ -65,6 +62,12 @@ namespace mongo {
*/
void clearSinceLastGetError() { _sinceLastGetError.clear(); }
+
+ /**
+ * resets the list of shards using to process the current request
+ */
+ void clearCurrentShards(){ _cur->clear(); }
+
/**
* calls getLastError
* resets shards since get last error
@@ -77,8 +80,7 @@ namespace mongo {
void noAutoSplit() { _autoSplitOk = false; }
- static ClientInfo * get( int clientId = 0 , bool create = true );
- static void disconnect( int clientId );
+ static ClientInfo * get();
private:
@@ -111,8 +113,6 @@ namespace mongo {
int _lastAccess;
bool _autoSplitOk;
- static mongo::mutex _clientsLock;
- static Cache& _clients;
static boost::thread_specific_ptr<ClientInfo> _tlInfo;
};
diff --git a/s/request.cpp b/s/request.cpp
index 52f2e547bf9..32c17cc282c 100644
--- a/s/request.cpp
+++ b/s/request.cpp
@@ -41,8 +41,7 @@ namespace mongo {
assert( _d.getns() );
_id = _m.header()->id;
- _clientId = p ? p->getClientId() : 0;
- _clientInfo = ClientInfo::get( _clientId );
+ _clientInfo = ClientInfo::get();
_clientInfo->newRequest( p );
}
@@ -74,7 +73,7 @@ namespace mongo {
}
_m.header()->id = _id;
-
+ _clientInfo->clearCurrentShards();
}
Shard Request::primaryShard() const {
diff --git a/s/request.h b/s/request.h
index 5b4c228588b..7c51e5c9d9b 100644
--- a/s/request.h
+++ b/s/request.h
@@ -66,9 +66,6 @@ namespace mongo {
return _chunkManager;
}
- int getClientId() const {
- return _clientId;
- }
ClientInfo * getClientInfo() const {
return _clientInfo;
}
@@ -103,7 +100,6 @@ namespace mongo {
DBConfigPtr _config;
ChunkManagerPtr _chunkManager;
- int _clientId;
ClientInfo * _clientInfo;
OpCounters* _counter;
diff --git a/s/server.cpp b/s/server.cpp
index 9bdeedeec6d..51f30f16a2a 100644
--- a/s/server.cpp
+++ b/s/server.cpp
@@ -77,19 +77,19 @@ namespace mongo {
public:
virtual ~ShardedMessageHandler() {}
- virtual void process( Message& m , AbstractMessagingPort* p ) {
+ virtual void connected( AbstractMessagingPort* p ) {
+ assert( ClientInfo::get() );
+ }
+
+ virtual void process( Message& m , AbstractMessagingPort* p , LastError * le) {
assert( p );
Request r( m , p );
- LastError * le = lastError.startRequest( m , r.getClientId() );
- assert( le );
+ assert( le );
+ lastError.startRequest( m , le );
- if ( logLevel > 5 ) {
- log(5) << "client id: " << hex << r.getClientId() << "\t" << r.getns() << "\t" << dec << r.op() << endl;
- }
try {
r.init();
- setClientId( r.getClientId() );
r.process();
}
catch ( AssertionException & e ) {
@@ -119,8 +119,7 @@ namespace mongo {
}
virtual void disconnected( AbstractMessagingPort* p ) {
- ClientInfo::disconnect( p->getClientId() );
- lastError.disconnect( p->getClientId() );
+ // all things are thread local
}
};
diff --git a/s/writeback_listener.cpp b/s/writeback_listener.cpp
index 21d59d0bae6..3051013d747 100644
--- a/s/writeback_listener.cpp
+++ b/s/writeback_listener.cpp
@@ -159,7 +159,7 @@ namespace mongo {
DBConfigPtr db = grid.getDBConfig( ns );
ShardChunkVersion needVersion( data["version"] );
- log(1) << "connectionId: " << cid << " writebackId: " << wid << " needVersion : " << needVersion.toString()
+ LOG(1) << "connectionId: " << cid << " writebackId: " << wid << " needVersion : " << needVersion.toString()
<< " mine : " << db->getChunkManager( ns )->getVersion().toString() << endl;// TODO change to log(3)
if ( logLevel ) log(1) << debugString( m ) << endl;
diff --git a/util/message.cpp b/util/message.cpp
index 37099dcbbc5..916aa342ce9 100644
--- a/util/message.cpp
+++ b/util/message.cpp
@@ -703,7 +703,6 @@ again:
MSGID NextMsgId;
- ThreadLocalValue<int> clientId;
struct MsgStart {
MsgStart() {
@@ -721,14 +720,6 @@ again:
return op == dbQuery || op == dbGetMore;
}
- void setClientId( int id ) {
- clientId.set( id );
- }
-
- int getClientId() {
- return clientId.get();
- }
-
const int DEFAULT_MAX_CONN = 20000;
const int MAX_MAX_CONN = 20000;
@@ -773,24 +764,4 @@ again:
TicketHolder connTicketHolder(DEFAULT_MAX_CONN);
- int AbstractMessagingPort::getClientId() {
- if ( _clientId == 0 ) {
- /**
- * highest 2 bytes is port
- * this is unique except when there are multiple ips
- * or can be issue with high connection churn
- *
- * lowest 2 bytes is part of the address
- * space there is 128 * 2^16
- * so if there is 8gb of heap area for sockets, then this 100% unique
- */
- DEV assert( sizeof( MessagingPort ) > 128 );
- int x = remotePort();
- x = x << 16;
- x |= ( ( (long long)this >> 7 ) & 0xFFFF ); // lowest 7 bits isn't helpful
- _clientId = x;
- }
- return _clientId;
- }
-
} // namespace mongo
diff --git a/util/message.h b/util/message.h
index 04cdc968d15..f1144454a43 100644
--- a/util/message.h
+++ b/util/message.h
@@ -85,8 +85,6 @@ namespace mongo {
virtual HostAndPort remote() const = 0;
virtual unsigned remotePort() const = 0;
- virtual int getClientId();
-
private:
int _clientId;
};
@@ -469,9 +467,6 @@ namespace mongo {
MSGID nextMessageId();
- void setClientId( int id );
- int getClientId();
-
extern TicketHolder connTicketHolder;
class ElapsedTracker {
diff --git a/util/message_server.h b/util/message_server.h
index 39375c8b2ba..defae0b59ed 100644
--- a/util/message_server.h
+++ b/util/message_server.h
@@ -29,7 +29,9 @@ namespace mongo {
class MessageHandler {
public:
virtual ~MessageHandler() {}
- virtual void process( Message& m , AbstractMessagingPort* p ) = 0;
+
+ virtual void connected( AbstractMessagingPort* p ) = 0;
+ virtual void process( Message& m , AbstractMessagingPort* p , LastError * err ) = 0;
virtual void disconnected( AbstractMessagingPort* p ) = 0;
};
diff --git a/util/message_server_port.cpp b/util/message_server_port.cpp
index 6d00628c8ad..76bd78d516c 100644
--- a/util/message_server_port.cpp
+++ b/util/message_server_port.cpp
@@ -23,6 +23,7 @@
#include "message_server.h"
#include "../db/cmdline.h"
+#include "../db/lasterror.h"
#include "../db/stats/counters.h"
namespace mongo {
@@ -38,14 +39,19 @@ namespace mongo {
setThreadName( "conn" );
- auto_ptr<MessagingPort> p( inPort );
+ scoped_ptr<MessagingPort> p( inPort );
string otherSide;
Message m;
try {
+ LastError * le = new LastError();
+ lastError.reset( le ); // lastError now has ownership
+
otherSide = p->farEnd.toString();
+ handler->connected( p.get() );
+
while ( 1 ) {
m.reset();
p->clearCounters();
@@ -57,7 +63,7 @@ namespace mongo {
break;
}
- handler->process( m , p.get() );
+ handler->process( m , p.get() , le );
networkCounter.hit( p->getBytesIn() , p->getBytesOut() );
}
}