From 68a23c02a5aaa4193a79f309e020f3343e255302 Mon Sep 17 00:00:00 2001 From: David Golden Date: Fri, 29 Jun 2018 16:00:56 -0400 Subject: Import tools: bd441aa9f15a804220bb2f69835d627df90e7e30 from branch v3.6 ref: f3850cb98e..bd441aa9f1 for: 3.6.6 TOOLS-2058 mongoreplay does not show OP_MSG commands TOOLS-2062 Support zlib compression in mongoreplay --- src/mongo/gotools/import.data | 2 +- src/mongo/gotools/mongoreplay/message.go | 15 ------ src/mongo/gotools/mongoreplay/msg_op.go | 2 +- src/mongo/gotools/mongoreplay/opcode.go | 15 ++++++ src/mongo/gotools/mongoreplay/stat_collector.go | 18 ++++++++ src/mongo/gotools/mongoreplay/util.go | 18 ++++++++ .../vendor/src/github.com/10gen/llmgo/compress.go | 54 ++++++++++++++++++++++ 7 files changed, 107 insertions(+), 17 deletions(-) diff --git a/src/mongo/gotools/import.data b/src/mongo/gotools/import.data index fc402d4b529..6a9914ccce0 100644 --- a/src/mongo/gotools/import.data +++ b/src/mongo/gotools/import.data @@ -1,5 +1,5 @@ { - "commit": "f3850cb98e5e005c2de2c82b76b592060f14b9e9", + "commit": "bd441aa9f15a804220bb2f69835d627df90e7e30", "github": "mongodb/mongo-tools.git", "vendor": "tools", "branch": "v3.6" diff --git a/src/mongo/gotools/mongoreplay/message.go b/src/mongo/gotools/mongoreplay/message.go index dcdd9a05ad5..55ad3fd1d0e 100644 --- a/src/mongo/gotools/mongoreplay/message.go +++ b/src/mongo/gotools/mongoreplay/message.go @@ -76,21 +76,6 @@ func (m *MsgHeader) WriteTo(w io.Writer) (int64, error) { return n, nil } -var goodOpCode = map[int32]bool{ - 1: true, //OP_REPLY Reply to a client request. responseTo is set. - 1000: true, //OP_MSG Generic msg command followed by a string. - 2001: true, //OP_UPDATE Update document. - 2002: true, //OP_INSERT Insert new document. - 2003: true, //RESERVED Formerly used for OP_GET_BY_OID. - 2004: true, //OP_QUERY Query a collection. - 2005: true, //OP_GET_MORE Get more data from a query. See Cursors. - 2006: true, //OP_DELETE Delete documents. - 2007: true, //OP_KILL_CURSORS Notify database that the client has finished with the cursor. - 2010: true, //OP_COMMAND A new wire protocol message representing a command request - 2011: true, //OP_COMMANDREPLY A new wire protocol message representing a command - 2012: true, //OP_COMPRESSED Compressed op -} - // LooksReal does a best efffort to detect if a MsgHeadr is not invalid func (m *MsgHeader) LooksReal() bool { // AFAIK, the smallest wire protocol message possible is a 24 byte diff --git a/src/mongo/gotools/mongoreplay/msg_op.go b/src/mongo/gotools/mongoreplay/msg_op.go index 15c5c10744e..38d5beefe08 100644 --- a/src/mongo/gotools/mongoreplay/msg_op.go +++ b/src/mongo/gotools/mongoreplay/msg_op.go @@ -205,7 +205,7 @@ func (msgOp *MsgOp) Meta() OpMetadata { msgOp.Database, msgOp.CommandName, map[string]interface{}{ - "section": msgOp.Sections, + "sections": msgOp.Sections, }, } } diff --git a/src/mongo/gotools/mongoreplay/opcode.go b/src/mongo/gotools/mongoreplay/opcode.go index b305a605fbf..8fea6ff98d0 100644 --- a/src/mongo/gotools/mongoreplay/opcode.go +++ b/src/mongo/gotools/mongoreplay/opcode.go @@ -58,3 +58,18 @@ const ( OpCodeCompressed = OpCode(2012) OpCodeMessage = OpCode(2013) ) + +var goodOpCode = map[int32]bool{ + 1: true, //OP_REPLY Reply to a client request. responseTo is set. + 2001: true, //OP_UPDATE Update document. + 2002: true, //OP_INSERT Insert new document. + 2003: true, //RESERVED Formerly used for OP_GET_BY_OID. + 2004: true, //OP_QUERY Query a collection. + 2005: true, //OP_GET_MORE Get more data from a query. See Cursors. + 2006: true, //OP_DELETE Delete documents. + 2007: true, //OP_KILL_CURSORS Notify database that the client has finished with the cursor. + 2010: true, //OP_COMMAND A new wire protocol message representing a command request + 2011: true, //OP_COMMANDREPLY A new wire protocol message representing a command + 2012: true, //OP_COMPRESSED Compressed op + 2013: true, //OP_MSG New command/reply type +} diff --git a/src/mongo/gotools/mongoreplay/stat_collector.go b/src/mongo/gotools/mongoreplay/stat_collector.go index fa07a289b47..34657afec06 100644 --- a/src/mongo/gotools/mongoreplay/stat_collector.go +++ b/src/mongo/gotools/mongoreplay/stat_collector.go @@ -412,6 +412,24 @@ func (gen *RegularStatGenerator) GenerateOpStat(recordedOp *RecordedOp, parsedOp case *ReplyOp: return gen.ResolveOp(recordedOp, t, stat) } + case OpCodeMessage: + switch t := parsedOp.(type) { + case *MsgOp: + stat.RequestData = meta.Data + stat.RequestID = recordedOp.Header.RequestID + gen.AddUnresolvedOp(recordedOp, parsedOp, stat) + // In 'PairedMode', the stat is not considered completed at this point. + // We save the op as 'unresolved' and return nil. When the reply is seen + // we retrieve the saved stat and generate a completed pair stat, which + // is then returned. + if gen.PairedMode { + return nil + } + case *MsgOpReply: + stat.RequestID = recordedOp.Header.ResponseTo + stat.ReplyData = meta.Data + return gen.ResolveOp(recordedOp, t, stat) + } default: stat.RequestData = meta.Data } diff --git a/src/mongo/gotools/mongoreplay/util.go b/src/mongo/gotools/mongoreplay/util.go index e8b6d1fec0f..8f38bfbc136 100644 --- a/src/mongo/gotools/mongoreplay/util.go +++ b/src/mongo/gotools/mongoreplay/util.go @@ -416,6 +416,9 @@ func ConvertBSONValueToJSON(x interface{}) (interface{}, error) { case int32: // NumberInt return json.NumberInt(v), nil + case uint8: // NumberInt + return json.NumberInt(v), nil + case float64: return json.NumberFloat(v), nil @@ -457,6 +460,21 @@ func ConvertBSONValueToJSON(x interface{}) (interface{}, error) { } return json.JavaScript{v.Code, scope}, nil + case mgo.MsgSection: + out := map[string]interface{}{ + "payloadType": v.PayloadType, + "payload": v.Data, + } + return ConvertBSONValueToJSON(out) + + case mgo.PayloadType1: + out := map[string]interface{}{ + "size": v.Size, + "identifier": v.Identifier, + "documents": v.Docs, + } + return ConvertBSONValueToJSON(out) + default: switch x { case bson.MinKey: // MinKey diff --git a/src/mongo/gotools/vendor/src/github.com/10gen/llmgo/compress.go b/src/mongo/gotools/vendor/src/github.com/10gen/llmgo/compress.go index d73f3ae9bfe..d433f190478 100644 --- a/src/mongo/gotools/vendor/src/github.com/10gen/llmgo/compress.go +++ b/src/mongo/gotools/vendor/src/github.com/10gen/llmgo/compress.go @@ -1,6 +1,8 @@ package mgo import ( + "bytes" + "compress/zlib" "fmt" "io" @@ -19,6 +21,7 @@ type messageCompressor interface { const ( noopCompressorId = 0 snappyCompressorId = 1 + zlibCompressorId = 2 ) var ( @@ -27,6 +30,7 @@ var ( tbl: map[uint8]messageCompressor{ noopCompressorId: new(noopMessageCompressor), snappyCompressorId: new(snappyMessageCompressor), + zlibCompressorId: new(zlibMessageCompressor), }, } ) @@ -169,3 +173,53 @@ func (snappyMessageCompressor) decompressData(dst, src []byte) (n int, err error _, err = snappy.Decode(dst, src) return } + +type zlibMessageCompressor struct{} + +func (zlibMessageCompressor) getId() uint8 { return zlibCompressorId } +func (zlibMessageCompressor) getName() string { return "zlib" } +func (zlibMessageCompressor) getMaxCompressedSize(srcLen int) int { + return srcLen +} +func (zlibMessageCompressor) compressData(dst, src []byte) (n int, err error) { + var buf bytes.Buffer + wtr := zlib.NewWriter(&buf) + _, err = wtr.Write(src) + if err != nil { + return + } + err = wtr.Close() + if err != nil { + return + } + if buf.Len() > len(dst) { + err = io.ErrShortBuffer + return + } + copy(dst, buf.Bytes()) + n = buf.Len() + return +} + +func (zlibMessageCompressor) decompressData(dst, src []byte) (n int, err error) { + var buf bytes.Buffer + rdr, err := zlib.NewReader(bytes.NewReader(src)) + if err != nil { + return + } + _, err = buf.ReadFrom(rdr) + if err != nil { + return + } + err = rdr.Close() + if err != nil { + return + } + if buf.Len() > len(dst) { + err = io.ErrShortBuffer + return + } + copy(dst, buf.Bytes()) + n = buf.Len() + return +} -- cgit v1.2.1