path: root/src/mongo/tools/sniffer.cpp
diff options
Diffstat (limited to 'src/mongo/tools/sniffer.cpp')
1 files changed, 566 insertions, 0 deletions
diff --git a/src/mongo/tools/sniffer.cpp b/src/mongo/tools/sniffer.cpp
new file mode 100644
index 00000000000..aeab808cfed
--- /dev/null
+++ b/src/mongo/tools/sniffer.cpp
@@ -0,0 +1,566 @@
+// sniffer.cpp
+ * Copyright (C) 2010 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <>.
+ */
+ large messages - need to track what's left and ingore
+ single object over packet size - can only display begging of object
+ getmore
+ delete
+ killcursors
+ */
+#include "../pch.h"
+#include <pcap.h>
+#ifdef _WIN32
+#undef min
+#undef max
+#include "../bson/util/builder.h"
+#include "../util/net/message.h"
+#include "../util/mmap.h"
+#include "../db/dbmessage.h"
+#include "../client/dbclient.h"
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <ctype.h>
+#include <errno.h>
+#include <sys/types.h>
+#ifndef _WIN32
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <iostream>
+#include <map>
+#include <string>
+#include <boost/shared_ptr.hpp>
+using namespace std;
+using mongo::asserted;
+using mongo::Message;
+using mongo::MsgData;
+using mongo::DbMessage;
+using mongo::BSONObj;
+using mongo::BufBuilder;
+using mongo::DBClientConnection;
+using mongo::QueryResult;
+using mongo::MemoryMappedFile;
+mongo::CmdLine mongo::cmdLine;
+namespace mongo {
+ void setupSignals( bool inFork ){}
+#define SNAP_LEN 65535
+int captureHeaderSize;
+set<int> serverPorts;
+string forwardAddress;
+bool objcheck = false;
+ostream *outPtr = &cout;
+ostream &out() { return *outPtr; }
+/* IP header */
+struct sniff_ip {
+ u_char ip_vhl; /* version << 4 | header length >> 2 */
+ u_char ip_tos; /* type of service */
+ u_short ip_len; /* total length */
+ u_short ip_id; /* identification */
+ u_short ip_off; /* fragment offset field */
+#define IP_RF 0x8000 /* reserved fragment flag */
+#define IP_DF 0x4000 /* dont fragment flag */
+#define IP_MF 0x2000 /* more fragments flag */
+#define IP_OFFMASK 0x1fff /* mask for fragmenting bits */
+ u_char ip_ttl; /* time to live */
+ u_char ip_p; /* protocol */
+ u_short ip_sum; /* checksum */
+ struct in_addr ip_src,ip_dst; /* source and dest address */
+#define IP_HL(ip) (((ip)->ip_vhl) & 0x0f)
+#define IP_V(ip) (((ip)->ip_vhl) >> 4)
+/* TCP header */
+#ifdef _WIN32
+typedef unsigned __int32 uint32_t;
+typedef uint32_t tcp_seq;
+struct sniff_tcp {
+ u_short th_sport; /* source port */
+ u_short th_dport; /* destination port */
+ tcp_seq th_seq; /* sequence number */
+ tcp_seq th_ack; /* acknowledgement number */
+ u_char th_offx2; /* data offset, rsvd */
+#define TH_OFF(th) (((th)->th_offx2 & 0xf0) >> 4)
+ u_char th_flags;
+#define TH_FIN 0x01
+#define TH_SYN 0x02
+#define TH_RST 0x04
+#define TH_PUSH 0x08
+#define TH_ACK 0x10
+#define TH_URG 0x20
+#define TH_ECE 0x40
+#define TH_CWR 0x80
+#ifndef TH_FLAGS
+ u_short th_win; /* window */
+ u_short th_sum; /* checksum */
+ u_short th_urp; /* urgent pointer */
+#pragma pack( 1 )
+struct Connection {
+ struct in_addr srcAddr;
+ u_short srcPort;
+ struct in_addr dstAddr;
+ u_short dstPort;
+ bool operator<( const Connection &other ) const {
+ return memcmp( this, &other, sizeof( Connection ) ) < 0;
+ }
+ Connection reverse() const {
+ Connection c;
+ c.srcAddr = dstAddr;
+ c.srcPort = dstPort;
+ c.dstAddr = srcAddr;
+ c.dstPort = srcPort;
+ return c;
+ }
+#pragma pack()
+map< Connection, bool > seen;
+map< Connection, int > bytesRemainingInMessage;
+map< Connection, boost::shared_ptr< BufBuilder > > messageBuilder;
+map< Connection, unsigned > expectedSeq;
+map< Connection, boost::shared_ptr<DBClientConnection> > forwarder;
+map< Connection, long long > lastCursor;
+map< Connection, map< long long, long long > > mapCursor;
+void processMessage( Connection& c , Message& d );
+void got_packet(u_char *args, const struct pcap_pkthdr *header, const u_char *packet) {
+ const struct sniff_ip* ip = (struct sniff_ip*)(packet + captureHeaderSize);
+ int size_ip = IP_HL(ip)*4;
+ if ( size_ip < 20 ) {
+ cerr << "*** Invalid IP header length: " << size_ip << " bytes" << endl;
+ return;
+ }
+ assert( ip->ip_p == IPPROTO_TCP );
+ const struct sniff_tcp* tcp = (struct sniff_tcp*)(packet + captureHeaderSize + size_ip);
+ int size_tcp = TH_OFF(tcp)*4;
+ if (size_tcp < 20) {
+ cerr << "*** Invalid TCP header length: " << size_tcp << " bytes" << endl;
+ return;
+ }
+ if ( ! ( serverPorts.count( ntohs( tcp->th_sport ) ) ||
+ serverPorts.count( ntohs( tcp->th_dport ) ) ) ) {
+ return;
+ }
+ const u_char * payload = (const u_char*)(packet + captureHeaderSize + size_ip + size_tcp);
+ unsigned totalSize = ntohs(ip->ip_len);
+ assert( totalSize <= header->caplen );
+ int size_payload = totalSize - (size_ip + size_tcp);
+ if (size_payload <= 0 )
+ return;
+ Connection c;
+ c.srcAddr = ip->ip_src;
+ c.srcPort = tcp->th_sport;
+ c.dstAddr = ip->ip_dst;
+ c.dstPort = tcp->th_dport;
+ if ( seen[ c ] ) {
+ if ( expectedSeq[ c ] != ntohl( tcp->th_seq ) ) {
+ cerr << "Warning: sequence # mismatch, there may be dropped packets" << endl;
+ }
+ }
+ else {
+ seen[ c ] = true;
+ }
+ expectedSeq[ c ] = ntohl( tcp->th_seq ) + size_payload;
+ Message m;
+ if ( bytesRemainingInMessage[ c ] == 0 ) {
+ m.setData( (MsgData*)payload , false );
+ if ( !m.header()->valid() ) {
+ cerr << "Invalid message start, skipping packet." << endl;
+ return;
+ }
+ if ( size_payload > m.header()->len ) {
+ cerr << "Multiple messages in packet, skipping packet." << endl;
+ return;
+ }
+ if ( size_payload < m.header()->len ) {
+ bytesRemainingInMessage[ c ] = m.header()->len - size_payload;
+ messageBuilder[ c ].reset( new BufBuilder() );
+ messageBuilder[ c ]->appendBuf( (void*)payload, size_payload );
+ return;
+ }
+ }
+ else {
+ bytesRemainingInMessage[ c ] -= size_payload;
+ messageBuilder[ c ]->appendBuf( (void*)payload, size_payload );
+ if ( bytesRemainingInMessage[ c ] < 0 ) {
+ cerr << "Received too many bytes to complete message, resetting buffer" << endl;
+ bytesRemainingInMessage[ c ] = 0;
+ messageBuilder[ c ].reset();
+ return;
+ }
+ if ( bytesRemainingInMessage[ c ] > 0 )
+ return;
+ m.setData( (MsgData*)messageBuilder[ c ]->buf(), true );
+ messageBuilder[ c ]->decouple();
+ messageBuilder[ c ].reset();
+ }
+ DbMessage d( m );
+ out() << inet_ntoa(ip->ip_src) << ":" << ntohs( tcp->th_sport )
+ << ( serverPorts.count( ntohs( tcp->th_dport ) ) ? " -->> " : " <<-- " )
+ << inet_ntoa(ip->ip_dst) << ":" << ntohs( tcp->th_dport )
+ << " " << d.getns()
+ << " " << m.header()->len << " bytes "
+ << " id:" << hex << m.header()->id << dec << "\t" << m.header()->id;
+ processMessage( c , m );
+class AuditingDbMessage : public DbMessage {
+ AuditingDbMessage( const Message &m ) : DbMessage( m ) {}
+ BSONObj nextJsObj( const char *context ) {
+ BSONObj ret = DbMessage::nextJsObj();
+ if ( objcheck && !ret.valid() ) {
+ // TODO provide more debugging info
+ cout << "invalid object in " << context << ": " << ret.hexDump() << endl;
+ }
+ return ret;
+ }
+void processMessage( Connection& c , Message& m ) {
+ AuditingDbMessage d(m);
+ if ( m.operation() == mongo::opReply )
+ out() << " - " << (unsigned)m.header()->responseTo;
+ out() << '\n';
+ try {
+ switch( m.operation() ) {
+ case mongo::opReply: {
+ mongo::QueryResult* r = (mongo::QueryResult*)m.singleData();
+ out() << "\treply" << " n:" << r->nReturned << " cursorId: " << r->cursorId << endl;
+ if ( r->nReturned ) {
+ mongo::BSONObj o( r->data() );
+ out() << "\t" << o << endl;
+ }
+ break;
+ }
+ case mongo::dbQuery: {
+ mongo::QueryMessage q(d);
+ out() << "\tquery: " << q.query << " ntoreturn: " << q.ntoreturn << " ntoskip: " << q.ntoskip;
+ if( !q.fields.isEmpty() )
+ out() << " hasfields";
+ if( q.queryOptions & mongo::QueryOption_SlaveOk )
+ out() << " SlaveOk";
+ if( q.queryOptions & mongo::QueryOption_NoCursorTimeout )
+ out() << " NoCursorTimeout";
+ if( q.queryOptions & ~(mongo::QueryOption_SlaveOk | mongo::QueryOption_NoCursorTimeout) )
+ out() << " queryOptions:" << hex << q.queryOptions;
+ out() << endl;
+ break;
+ }
+ case mongo::dbUpdate: {
+ int flags = d.pullInt();
+ BSONObj q = d.nextJsObj( "update" );
+ BSONObj o = d.nextJsObj( "update" );
+ out() << "\tupdate flags:" << flags << " q:" << q << " o:" << o << endl;
+ break;
+ }
+ case mongo::dbInsert: {
+ out() << "\tinsert: " << d.nextJsObj( "insert" ) << endl;
+ while ( d.moreJSObjs() ) {
+ out() << "\t\t" << d.nextJsObj( "insert" ) << endl;
+ }
+ break;
+ }
+ case mongo::dbGetMore: {
+ int nToReturn = d.pullInt();
+ long long cursorId = d.pullInt64();
+ out() << "\tgetMore nToReturn: " << nToReturn << " cursorId: " << cursorId << endl;
+ break;
+ }
+ case mongo::dbDelete: {
+ int flags = d.pullInt();
+ BSONObj q = d.nextJsObj( "delete" );
+ out() << "\tdelete flags: " << flags << " q: " << q << endl;
+ break;
+ }
+ case mongo::dbKillCursors: {
+ int *x = (int *) m.singleData()->_data;
+ x++; // reserved
+ int n = *x;
+ out() << "\tkillCursors n: " << n << endl;
+ break;
+ }
+ default:
+ out() << "\tunknown opcode " << m.operation() << endl;
+ cerr << "*** CANNOT HANDLE TYPE: " << m.operation() << endl;
+ }
+ }
+ catch ( ... ) {
+ cerr << "Error parsing message for operation: " << m.operation() << endl;
+ }
+ if ( !forwardAddress.empty() ) {
+ if ( m.operation() != mongo::opReply ) {
+ boost::shared_ptr<DBClientConnection> conn = forwarder[ c ];
+ if ( !conn ) {
+ conn.reset(new DBClientConnection( true ));
+ conn->connect( forwardAddress );
+ forwarder[ c ] = conn;
+ }
+ if ( m.operation() == mongo::dbQuery || m.operation() == mongo::dbGetMore ) {
+ if ( m.operation() == mongo::dbGetMore ) {
+ DbMessage d( m );
+ d.pullInt();
+ long long &cId = d.pullInt64();
+ cId = mapCursor[ c ][ cId ];
+ }
+ Message response;
+ conn->port().call( m, response );
+ QueryResult *qr = (QueryResult *) response.singleData();
+ if ( !( qr->resultFlags() & mongo::ResultFlag_CursorNotFound ) ) {
+ if ( qr->cursorId != 0 ) {
+ lastCursor[ c ] = qr->cursorId;
+ return;
+ }
+ }
+ lastCursor[ c ] = 0;
+ }
+ else {
+ conn->port().say( m );
+ }
+ }
+ else {
+ Connection r = c.reverse();
+ long long myCursor = lastCursor[ r ];
+ QueryResult *qr = (QueryResult *) m.singleData();
+ long long yourCursor = qr->cursorId;
+ if ( ( qr->resultFlags() & mongo::ResultFlag_CursorNotFound ) )
+ yourCursor = 0;
+ if ( myCursor && !yourCursor )
+ cerr << "Expected valid cursor in sniffed response, found none" << endl;
+ if ( !myCursor && yourCursor )
+ cerr << "Sniffed valid cursor when none expected" << endl;
+ if ( myCursor && yourCursor ) {
+ mapCursor[ r ][ qr->cursorId ] = lastCursor[ r ];
+ lastCursor[ r ] = 0;
+ }
+ }
+ }
+void processDiagLog( const char * file ) {
+ Connection c;
+ MemoryMappedFile f;
+ long length;
+ unsigned long long L = 0;
+ char * root = (char*) file , L, MemoryMappedFile::SEQUENTIAL );
+ assert( L < 0x80000000 );
+ length = (long) L;
+ assert( root );
+ assert( length > 0 );
+ char * pos = root;
+ long read = 0;
+ while ( read < length ) {
+ Message m(pos,false);
+ int len = m.header()->len;
+ DbMessage d(m);
+ cout << len << " " << d.getns() << endl;
+ processMessage( c , m );
+ read += len;
+ pos += len;
+ }
+ f.close();
+void usage() {
+ cout <<
+ "Usage: mongosniff [--help] [--forward host:port] [--source (NET <interface> | (FILE | DIAGLOG) <filename>)] [<port0> <port1> ... ]\n"
+ "--forward Forward all parsed request messages to mongod instance at \n"
+ " specified host:port\n"
+ "--source Source of traffic to sniff, either a network interface or a\n"
+ " file containing previously captured packets in pcap format,\n"
+ " or a file containing output from mongod's --diaglog option.\n"
+ " If no source is specified, mongosniff will attempt to sniff\n"
+ " from one of the machine's network interfaces.\n"
+ "--objcheck Log hex representation of invalid BSON objects and nothing\n"
+ " else. Spurious messages about invalid objects may result\n"
+ " when there are dropped tcp packets.\n"
+ "<port0>... These parameters are used to filter sniffing. By default, \n"
+ " only port 27017 is sniffed.\n"
+ "--help Print this help message.\n"
+ << endl;
+int main(int argc, char **argv) {
+ stringstream nullStream;
+ nullStream.clear(ios::failbit);
+ const char *dev = NULL;
+ char errbuf[PCAP_ERRBUF_SIZE];
+ pcap_t *handle;
+ struct bpf_program fp;
+ bpf_u_int32 mask;
+ bpf_u_int32 net;
+ bool source = false;
+ bool replay = false;
+ bool diaglog = false;
+ const char *file = 0;
+ vector< const char * > args;
+ for( int i = 1; i < argc; ++i )
+ args.push_back( argv[ i ] );
+ try {
+ for( unsigned i = 0; i < args.size(); ++i ) {
+ const char *arg = args[ i ];
+ if ( arg == string( "--help" ) ) {
+ usage();
+ return 0;
+ }
+ else if ( arg == string( "--forward" ) ) {
+ forwardAddress = args[ ++i ];
+ }
+ else if ( arg == string( "--source" ) ) {
+ uassert( 10266 , "can't use --source twice" , source == false );
+ uassert( 10267 , "source needs more args" , args.size() > i + 2);
+ source = true;
+ replay = ( args[ ++i ] == string( "FILE" ) );
+ diaglog = ( args[ i ] == string( "DIAGLOG" ) );
+ if ( replay || diaglog )
+ file = args[ ++i ];
+ else
+ dev = args[ ++i ];
+ }
+ else if ( arg == string( "--objcheck" ) ) {
+ objcheck = true;
+ outPtr = &nullStream;
+ }
+ else {
+ serverPorts.insert( atoi( args[ i ] ) );
+ }
+ }
+ }
+ catch ( ... ) {
+ usage();
+ return -1;
+ }
+ if ( !serverPorts.size() )
+ serverPorts.insert( 27017 );
+ if ( diaglog ) {
+ processDiagLog( file );
+ return 0;
+ }
+ else if ( replay ) {
+ handle = pcap_open_offline(file, errbuf);
+ if ( ! handle ) {
+ cerr << "error opening capture file!" << endl;
+ return -1;
+ }
+ }
+ else {
+ if ( !dev ) {
+ dev = pcap_lookupdev(errbuf);
+ if ( ! dev ) {
+ cerr << "error finding device: " << errbuf << endl;
+ return -1;
+ }
+ cout << "found device: " << dev << endl;
+ }
+ if (pcap_lookupnet(dev, &net, &mask, errbuf) == -1) {
+ cerr << "can't get netmask: " << errbuf << endl;
+ return -1;
+ }
+ handle = pcap_open_live(dev, SNAP_LEN, 1, 1000, errbuf);
+ if ( ! handle ) {
+ cerr << "error opening device: " << errbuf << endl;
+ return -1;
+ }
+ }
+ switch ( pcap_datalink( handle ) ) {
+ case DLT_EN10MB:
+ captureHeaderSize = 14;
+ break;
+ case DLT_NULL:
+ captureHeaderSize = 4;
+ break;
+ default:
+ cerr << "don't know how to handle datalink type: " << pcap_datalink( handle ) << endl;
+ }
+ assert( pcap_compile(handle, &fp, const_cast< char * >( "tcp" ) , 0, net) != -1 );
+ assert( pcap_setfilter(handle, &fp) != -1 );
+ cout << "sniffing... ";
+ for ( set<int>::iterator i = serverPorts.begin(); i != serverPorts.end(); i++ )
+ cout << *i << " ";
+ cout << endl;
+ pcap_loop(handle, 0 , got_packet, NULL);
+ pcap_freecode(&fp);
+ pcap_close(handle);
+ return 0;