Diffstat (limited to 'src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/github.com/10gen/llmgo/socket.go')
-rw-r--r--  src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/github.com/10gen/llmgo/socket.go  |  1039
1 file changed, 0 insertions(+), 1039 deletions(-)
diff --git a/src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/github.com/10gen/llmgo/socket.go b/src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/github.com/10gen/llmgo/socket.go
deleted file mode 100644
index 87e546cc8ec..00000000000
--- a/src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/github.com/10gen/llmgo/socket.go
+++ /dev/null
@@ -1,1039 +0,0 @@
-// Copyright (C) MongoDB, Inc. 2015-present.
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License. You may obtain
-// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
-//
-// Based on gopkg.in/mgo.v2 by Gustavo Niemeyer.
-// See THIRD-PARTY-NOTICES for original license terms.
-
-package mgo
-
-import (
- "bytes"
- "errors"
- "fmt"
- "io"
- "net"
- "sync"
- "time"
-
- "github.com/10gen/llmgo/bson"
-)
-
-const (
- opInvalid = 0
- opReply = 1
- dbMsg = 1000
- dbUpdate = 2001
- dbInsert = 2002
- dbQuery = 2004
- dbGetMore = 2005
- dbDelete = 2006
- dbKillCursors = 2007
- dbCommand = 2010
- dbCommandReply = 2011
- dbCompressed = 2012
- dbMessage = 2013
-)
-
-const (
- MsgFlagChecksumPresent = 1
- MsgFlagMoreToCome = (1 << 1)
- MsgFlagExhaustAllowed = (1 << 16)
-)
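As an illustration (not from the deleted file): these flag bits occupy the first four bytes of an OP_MSG body, and readLoop below tests them with a bitwise AND to decide whether a trailing 4-byte checksum follows the sections. A minimal stdlib-only sketch of that test, with hypothetical flag values:

package main

import "fmt"

const (
	msgFlagChecksumPresent = 1
	msgFlagMoreToCome      = 1 << 1
	msgFlagExhaustAllowed  = 1 << 16
)

func main() {
	// Hypothetical flags word as it would arrive on the wire.
	flags := uint32(msgFlagChecksumPresent | msgFlagMoreToCome)
	fmt.Println(flags&msgFlagChecksumPresent != 0) // true: a 4-byte checksum trails the sections
	fmt.Println(flags&msgFlagExhaustAllowed != 0)  // false
}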
-
-const (
- MsgPayload0 = uint8(0)
- MsgPayload1 = uint8(1)
-)
-
-type replyFunc func(err error, rfl *replyFuncLegacyArgs, rfc *replyFuncCommandArgs, rfm *replyFuncMsgArgs)
-
-type MongoSocket struct {
- sync.Mutex
- server *MongoServer // nil when cached
- Conn net.Conn
- timeout time.Duration
- addr string // For debugging only.
- nextRequestId uint32
- replyFuncs map[uint32]replyFunc
- references int
- creds []Credential
- logout []Credential
- cachedNonce string
- gotNonce sync.Cond
- dead error
- serverInfo *mongoServerInfo
-}
-
-type QueryOpFlags uint32
-
-const (
- _ QueryOpFlags = 1 << iota
- flagTailable
- flagSlaveOk
- flagLogReplay
- flagNoCursorTimeout
- flagAwaitData
-)
-
-type QueryOp struct {
- Collection string
- Query interface{}
- Skip int32
- Limit int32
- Selector interface{}
- Flags QueryOpFlags
- replyFunc replyFunc
-
- mode Mode
- Options QueryWrapper
- HasOptions bool
- ServerTags []bson.D
-}
-
-func (op *QueryOp) SetReplyFunc(reply replyFunc) {
- op.replyFunc = reply
-}
-
-type QueryWrapper struct {
- Query interface{} "$query"
- OrderBy interface{} "$orderby,omitempty"
- Hint interface{} "$hint,omitempty"
- Explain bool "$explain,omitempty"
- Snapshot bool "$snapshot,omitempty"
- ReadPreference bson.D "$readPreference,omitempty"
- MaxScan int "$maxScan,omitempty"
- MaxTimeMS int "$maxTimeMS,omitempty"
- Comment string "$comment,omitempty"
-}
-
-func (op *QueryOp) finalQuery(socket *MongoSocket) interface{} {
- if op.Flags&flagSlaveOk != 0 && socket.ServerInfo().Mongos {
- var modeName string
- switch op.mode {
- case Strong:
- modeName = "primary"
- case Monotonic, Eventual:
- modeName = "secondaryPreferred"
- case PrimaryPreferred:
- modeName = "primaryPreferred"
- case Secondary:
- modeName = "secondary"
- case SecondaryPreferred:
- modeName = "secondaryPreferred"
- case Nearest:
- modeName = "nearest"
- default:
- panic(fmt.Sprintf("unsupported read mode: %d", op.mode))
- }
- op.HasOptions = true
- op.Options.ReadPreference = make(bson.D, 0, 2)
- op.Options.ReadPreference = append(op.Options.ReadPreference, bson.DocElem{"mode", modeName})
- if len(op.ServerTags) > 0 {
- op.Options.ReadPreference = append(op.Options.ReadPreference, bson.DocElem{"tags", op.ServerTags})
- }
- }
- if op.HasOptions {
- if op.Query == nil {
- var empty bson.D
- op.Options.Query = empty
- } else {
- op.Options.Query = op.Query
- }
- debugf("final query is %#v\n", &op.Options)
- return &op.Options
- }
- return op.Query
-}
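As an illustration (not from the deleted file): when flagSlaveOk is set and the server is a mongos, finalQuery wraps the user query so the read preference rides along under $readPreference next to $query. A rough sketch of the resulting document shape, with a hypothetical filter and JSON used purely for display (the real code builds an ordered bson.D):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Shape of the wrapped query finalQuery would produce for, e.g., Monotonic mode.
	wrapped := map[string]interface{}{
		"$query":          map[string]interface{}{"status": "A"}, // hypothetical user filter
		"$readPreference": map[string]string{"mode": "secondaryPreferred"},
	}
	out, _ := json.Marshal(wrapped)
	fmt.Println(string(out))
}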
-
-type GetMoreOp struct {
- Collection string
- Limit int32
- CursorId int64
- replyFunc replyFunc
-}
-
-type ReplyOp struct {
- Flags uint32
- CursorId int64
- FirstDoc int32
- ReplyDocs int32
-}
-
-type InsertOp struct {
- Collection string // "database.collection"
- Documents []interface{} // One or more documents to insert
- Flags uint32
-}
-
-type UpdateOp struct {
- Collection string `bson:"-"` // "database.collection"
- Selector interface{} `bson:"q"`
- Update interface{} `bson:"u"`
- Flags uint32 `bson:"-"`
- Multi bool `bson:"multi,omitempty"`
- Upsert bool `bson:"upsert,omitempty"`
-}
-
-type DeleteOp struct {
- Collection string `bson:"-"` // "database.collection"
- Selector interface{} `bson:"q"`
- Flags uint32 `bson:"-"`
- Limit int `bson:"limit"`
-}
-
-type KillCursorsOp struct {
- CursorIds []int64
-}
-
-type CommandOp struct {
- Database string
- CommandName string
- Metadata interface{}
- CommandArgs interface{}
- InputDocs []interface{}
- replyFunc replyFunc
-}
-
-type CommandReplyOp struct {
- Metadata interface{}
- CommandReply interface{}
- OutputDocs []interface{}
-}
-
-type MsgOp struct {
- Flags uint32
- Sections []MsgSection
- Checksum uint32
- replyFunc replyFunc
-}
-
-type MsgSection struct {
- PayloadType uint8
- Data interface{}
-}
-
-// PayloadType1 is a container for the OP_MSG payload data of type 1.
-// There is no definition of the type 0 payload because that is simply a
-// bson document.
-type PayloadType1 struct {
- Size int32
- Identifier string
- Docs []interface{}
-}
-
-func (p *PayloadType1) CalculateSize() (int32, error) {
- docsLen := 0
- for _, d := range p.Docs {
- docAsSlice, err := bson.Marshal(d)
- if err != nil {
- return 0, err
- }
- docsLen += len(docAsSlice)
- }
-
- // This math is as follows:
- // (4 bytes for int32 length) +
- // (length of the identifier string) +
- // (1 for the null byte added to c strings) +
- // (length of the marshalled documents)
- return int32(4 + len(p.Identifier) + 1 + docsLen), nil
-}
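As an illustration (not from the deleted file): the arithmetic in the comment above can be checked by hand. An empty BSON document marshals to 5 bytes (a 4-byte length holding 5, then a terminating zero byte), so a hypothetical section with identifier "documents" and two empty documents comes to 4 + 9 + 1 + 10 = 24 bytes:

package main

import "fmt"

// sectionSize mirrors PayloadType1.CalculateSize, but takes pre-computed document lengths.
func sectionSize(identifier string, docLens []int) int32 {
	docsLen := 0
	for _, n := range docLens {
		docsLen += n
	}
	// 4 (int32 size field) + identifier bytes + 1 (C string NUL) + documents
	return int32(4 + len(identifier) + 1 + docsLen)
}

func main() {
	fmt.Println(sectionSize("documents", []int{5, 5})) // 24
}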
-
-// replyFuncCommandArgs contains the arguments needed by the replyFunc to complete a CommandReplyOp.
-type replyFuncCommandArgs struct {
- // op is the newly generated CommandReplyOp
- op *CommandReplyOp
-
- // bytesLeft is the number of bytes that remain to be read by the readLoop.
- // This indicates if there is more data or not so that the reply can decide
- // whether or not to release its lock.
- bytesLeft int
-
- // metadata is a slice containing the unread bson of the CommandReplyOp's metadata field.
- metadata []byte
-
- // commandReply is a slice containing the unread bson of the CommandReplyOp's commandReply field.
- commandReply []byte
-
- // outputDoc is a slice of bytes containing the unread bson of a reply document being handed to the
- // replyFunc
- outputDoc []byte
-}
-
-// replyFuncLegacyArgs contains the arguments needed by the replyFunc to complete a ReplyOp.
-type replyFuncLegacyArgs struct {
- // op is the newly generated ReplyOp
- op *ReplyOp
-
-	// docNum is the number of the current document being handed to the replyFunc.
- // This indicates how many docs have been read so the replyFunc can determine if
- // it can release its lock.
- docNum int
-
- // docData is a slice of bytes containing the unread bson of reply document being handed to the
- // replyFunc
- docData []byte
-}
-
-// replyFuncMsgArgs contains the arguments needed by the replyFunc to complete a MsgOp
-// as a reply.
-type replyFuncMsgArgs struct {
- // op is the newly generated MsgOp. It will contain the Flags and Checksum but
- // not the data of the parsed ops.
- op *MsgOp
-
-	// sectionsData is a slice of bytes containing the unread bson of the reply sections being
-	// handed to the replyFunc.
- sectionsData []byte
-}
-
-func (op *GetMoreOp) SetReplyFunc(reply replyFunc) {
- op.replyFunc = reply
-}
-func (op *CommandOp) SetReplyFunc(reply replyFunc) {
- op.replyFunc = reply
-}
-func (op *MsgOp) SetReplyFunc(reply replyFunc) {
- op.replyFunc = reply
-}
-
-type OpWithReply interface {
- SetReplyFunc(reply replyFunc)
-}
-
-type requestInfo struct {
- bufferPos int
- replyFunc replyFunc
-}
-
-func NewSocket(server *MongoServer, conn net.Conn, timeout time.Duration) *MongoSocket {
- socket := &MongoSocket{
- Conn: conn,
- addr: server.Addr,
- server: server,
- replyFuncs: make(map[uint32]replyFunc),
- }
- socket.gotNonce.L = &socket.Mutex
- if err := socket.InitialAcquire(server.Info(), timeout); err != nil {
- panic("newSocket: InitialAcquire returned error: " + err.Error())
- }
- stats.socketsAlive(+1)
- debugf("Socket %p to %s: initialized", socket, socket.addr)
- go socket.readLoop()
- return socket
-}
-
-func NewDumbSocket(conn net.Conn) *MongoSocket {
- server := &MongoServer{}
- return &MongoSocket{
- server: server,
- addr: server.Addr,
- Conn: conn,
- replyFuncs: make(map[uint32]replyFunc),
- }
-}
-
-// Server returns the server that the socket is associated with.
-// It returns nil while the socket is cached in its respective server.
-func (socket *MongoSocket) Server() *MongoServer {
- socket.Lock()
- server := socket.server
- socket.Unlock()
- return server
-}
-
-// ServerInfo returns details for the server at the time the socket
-// was initially acquired.
-func (socket *MongoSocket) ServerInfo() *mongoServerInfo {
- socket.Lock()
- serverInfo := socket.serverInfo
- socket.Unlock()
- return serverInfo
-}
-
-// InitialAcquire obtains the first reference to the socket, either
-// right after the connection is made or once a recycled socket is
-// being put back in use.
-func (socket *MongoSocket) InitialAcquire(serverInfo *mongoServerInfo, timeout time.Duration) error {
- socket.Lock()
- if socket.references > 0 {
- panic("Socket acquired out of cache with references")
- }
- if socket.dead != nil {
- dead := socket.dead
- socket.Unlock()
- return dead
- }
- socket.references++
- socket.serverInfo = serverInfo
- socket.timeout = timeout
- stats.socketsInUse(+1)
- stats.socketRefs(+1)
- socket.Unlock()
- return nil
-}
-
-// Acquire obtains an additional reference to the socket.
-// The socket will only be recycled when it's released as many
-// times as it's been acquired.
-func (socket *MongoSocket) Acquire() (info *mongoServerInfo) {
- socket.Lock()
- if socket.references == 0 {
- panic("Socket got non-initial acquire with references == 0")
- }
- // We'll track references to dead sockets as well.
- // Caller is still supposed to release the socket.
- socket.references++
- stats.socketRefs(+1)
- serverInfo := socket.serverInfo
- socket.Unlock()
- return serverInfo
-}
-
-// Release decrements a socket reference. The socket will be
-// recycled once it's released as many times as it's been acquired.
-func (socket *MongoSocket) Release() {
- socket.Lock()
- if socket.references == 0 {
- panic("socket.Release() with references == 0")
- }
- socket.references--
- stats.socketRefs(-1)
- if socket.references == 0 {
- stats.socketsInUse(-1)
- server := socket.server
- socket.Unlock()
- socket.LogoutAll()
- // If the socket is dead server is nil.
- if server != nil {
- server.RecycleSocket(socket)
- }
- } else {
- socket.Unlock()
- }
-}
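As an illustration (not from the deleted file): Acquire and Release implement plain reference counting, and the socket is recycled only when the count returns to zero. A minimal sketch of the same discipline, with hypothetical names and only the standard library:

package main

import (
	"fmt"
	"sync"
)

type refCounted struct {
	mu   sync.Mutex
	refs int
}

func (r *refCounted) Acquire() {
	r.mu.Lock()
	r.refs++
	r.mu.Unlock()
}

func (r *refCounted) Release() {
	r.mu.Lock()
	r.refs--
	recycle := r.refs == 0
	r.mu.Unlock()
	if recycle {
		fmt.Println("recycled") // the real code hands the socket back via server.RecycleSocket here
	}
}

func main() {
	s := &refCounted{}
	s.Acquire()
	s.Acquire()
	s.Release()
	s.Release() // only the final Release triggers recycling
}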
-
-// SetTimeout changes the timeout used on socket operations.
-func (socket *MongoSocket) SetTimeout(d time.Duration) {
- socket.Lock()
- socket.timeout = d
- socket.Unlock()
-}
-
-type deadlineType int
-
-const (
- readDeadline deadlineType = 1
- writeDeadline deadlineType = 2
-)
-
-func (socket *MongoSocket) updateDeadline(which deadlineType) {
- var when time.Time
- if socket.timeout > 0 {
- when = time.Now().Add(socket.timeout)
- }
- whichstr := ""
- switch which {
- case readDeadline | writeDeadline:
- whichstr = "read/write"
- socket.Conn.SetDeadline(when)
- case readDeadline:
- whichstr = "read"
- socket.Conn.SetReadDeadline(when)
- case writeDeadline:
- whichstr = "write"
- socket.Conn.SetWriteDeadline(when)
- default:
- panic("invalid parameter to updateDeadline")
- }
- debugf("Socket %p to %s: updated %s deadline to %s ahead (%s)", socket, socket.addr, whichstr, socket.timeout, when)
-}
-
-// Close terminates the socket use.
-func (socket *MongoSocket) Close() {
- socket.kill(errors.New("Closed explicitly"), false)
-}
-
-func (socket *MongoSocket) kill(err error, abend bool) {
- socket.Lock()
- if socket.dead != nil {
- debugf("Socket %p to %s: killed again: %s (previously: %s)", socket, socket.addr, err.Error(), socket.dead.Error())
- socket.Unlock()
- return
- }
- logf("Socket %p to %s: closing: %s (abend=%v)", socket, socket.addr, err.Error(), abend)
- socket.dead = err
- socket.Conn.Close()
- stats.socketsAlive(-1)
- replyFuncs := socket.replyFuncs
- socket.replyFuncs = make(map[uint32]replyFunc)
- server := socket.server
- socket.server = nil
- socket.gotNonce.Broadcast()
- socket.Unlock()
- for _, replyFunc := range replyFuncs {
- logf("Socket %p to %s: notifying replyFunc of closed socket: %s", socket, socket.addr, err.Error())
- replyFunc(err, nil, nil, nil)
- }
- if abend {
- server.AbendSocket(socket)
- }
-}
-
-func (socket *MongoSocket) SimpleQuery(op *QueryOp) (data []byte, replyOp *ReplyOp, err error) {
- var wait, change sync.Mutex
- var replyDone bool
- var replyData []byte
- var replyErr error
- wait.Lock()
- op.replyFunc = func(err error, rfl *replyFuncLegacyArgs, rfc *replyFuncCommandArgs, rfm *replyFuncMsgArgs) {
- change.Lock()
- if !replyDone {
- replyDone = true
- replyErr = err
- if rfl != nil {
- replyOp = rfl.op
- if err == nil {
- replyData = rfl.docData
- }
- }
- }
- change.Unlock()
- wait.Unlock()
- }
- err = socket.Query(op)
- if err != nil {
- return nil, nil, err
- }
- wait.Lock()
- change.Lock()
- data = replyData
- err = replyErr
- change.Unlock()
- return data, replyOp, err
-}
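As an illustration (not from the deleted file): SimpleQuery turns the asynchronous replyFunc callback into a blocking call by locking a mutex up front and letting the callback unlock it, so the caller parks on the second Lock until the reply arrives. The trick in isolation, with a goroutine standing in for the read loop:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wait sync.Mutex
	wait.Lock() // held until the "reply" shows up

	var result string
	go func() { // stands in for readLoop invoking the replyFunc
		result = "reply"
		wait.Unlock()
	}()

	wait.Lock() // blocks here until the goroutine unlocks
	fmt.Println(result)
}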
-
-func (socket *MongoSocket) Query(ops ...interface{}) (err error) {
- if lops := socket.flushLogout(); len(lops) > 0 {
- ops = append(lops, ops...)
- }
-
- buf := make([]byte, 0, 256)
-
- // Serialize operations synchronously to avoid interrupting
- // other goroutines while we can't really be sending data.
- // Also, record id positions so that we can compute request
- // ids at once later with the lock already held.
- requests := make([]requestInfo, len(ops))
- requestCount := 0
-
- for _, op := range ops {
- debugf("Socket %p to %s: serializing op: %#v", socket, socket.addr, op)
- start := len(buf)
- var replyFunc replyFunc
- switch op := op.(type) {
-
- case *UpdateOp:
- buf = addHeader(buf, dbUpdate)
- buf = addInt32(buf, 0) // Reserved
- buf = addCString(buf, op.Collection)
- buf = addInt32(buf, int32(op.Flags))
- debugf("Socket %p to %s: serializing selector document: %#v", socket, socket.addr, op.Selector)
- buf, err = addBSON(buf, op.Selector)
- if err != nil {
- return err
- }
- debugf("Socket %p to %s: serializing update document: %#v", socket, socket.addr, op.Update)
- buf, err = addBSON(buf, op.Update)
- if err != nil {
- return err
- }
-
- case *InsertOp:
- buf = addHeader(buf, dbInsert)
- buf = addInt32(buf, int32(op.Flags))
- buf = addCString(buf, op.Collection)
- for _, doc := range op.Documents {
- debugf("Socket %p to %s: serializing document for insertion: %#v", socket, socket.addr, doc)
- buf, err = addBSON(buf, doc)
- if err != nil {
- return err
- }
- }
-
- case *QueryOp:
- buf = addHeader(buf, dbQuery)
- buf = addInt32(buf, int32(op.Flags))
- buf = addCString(buf, op.Collection)
- buf = addInt32(buf, op.Skip)
- buf = addInt32(buf, op.Limit)
- buf, err = addBSON(buf, op.finalQuery(socket))
- if err != nil {
- return err
- }
- if op.Selector != nil {
- buf, err = addBSON(buf, op.Selector)
- if err != nil {
- return err
- }
- }
- replyFunc = op.replyFunc
-
- case *GetMoreOp:
- buf = addHeader(buf, dbGetMore)
- buf = addInt32(buf, 0) // Reserved
- buf = addCString(buf, op.Collection)
- buf = addInt32(buf, op.Limit)
- buf = addInt64(buf, op.CursorId)
- replyFunc = op.replyFunc
-
- case *ReplyOp:
- buf = addHeader(buf, opReply)
- buf = addInt32(buf, int32(op.Flags))
- buf = addInt64(buf, op.CursorId)
- buf = addInt32(buf, op.FirstDoc)
- buf = addInt32(buf, op.ReplyDocs)
-
- case *DeleteOp:
- buf = addHeader(buf, dbDelete)
- buf = addInt32(buf, 0) // Reserved
- buf = addCString(buf, op.Collection)
- buf = addInt32(buf, int32(op.Flags))
- debugf("Socket %p to %s: serializing selector document: %#v", socket, socket.addr, op.Selector)
- buf, err = addBSON(buf, op.Selector)
- if err != nil {
- return err
- }
-
- case *KillCursorsOp:
- buf = addHeader(buf, dbKillCursors)
- buf = addInt32(buf, 0) // Reserved
- buf = addInt32(buf, int32(len(op.CursorIds)))
- for _, cursorId := range op.CursorIds {
- buf = addInt64(buf, cursorId)
- }
- case *CommandOp:
- buf = addHeader(buf, dbCommand)
- buf = addCString(buf, op.Database)
- buf = addCString(buf, op.CommandName)
- buf, err = addBSON(buf, op.CommandArgs)
- if err != nil {
- return err
- }
- buf, err = addBSON(buf, op.Metadata)
- if err != nil {
- return err
- }
- for _, doc := range op.InputDocs {
- debugf("Socket %p to %s: serializing document for opcommand: %#v", socket, socket.addr, doc)
- buf, err = addBSON(buf, doc)
- if err != nil {
- return err
- }
- }
- replyFunc = op.replyFunc
- case *CommandReplyOp:
- buf = addHeader(buf, dbCommandReply)
- buf, err = addBSON(buf, op.CommandReply)
- if err != nil {
- return err
- }
- buf, err = addBSON(buf, op.Metadata)
- if err != nil {
- return err
- }
- for _, doc := range op.OutputDocs {
- debugf("Socket %p to %s: serializing document for opcommand: %#v", socket, socket.addr, doc)
- buf, err = addBSON(buf, doc)
- if err != nil {
- return err
- }
- }
- case *MsgOp:
- buf = addHeader(buf, dbMessage)
- buf = addInt32(buf, int32(op.Flags))
- for _, section := range op.Sections {
- // Read the payload type
- switch section.PayloadType {
- // If it's the simpler type (type 0) write out its type and BSON
- case MsgPayload0:
- buf = append(buf, byte(0))
- buf, err = addBSON(buf, section.Data)
- if err != nil {
- return err
- }
- case MsgPayload1:
- // Write the payload type
- buf = append(buf, byte(1))
- payload, ok := section.Data.(PayloadType1)
- if !ok {
- panic("incorrect type given for payload")
- }
-
- //Write out the size
- buf = addInt32(buf, payload.Size)
- //Write out the identifier
- buf = addCString(buf, payload.Identifier)
- //Write out the docs
- for _, d := range payload.Docs {
- buf, err = addBSON(buf, d)
- if err != nil {
- return err
- }
- }
- default:
- return fmt.Errorf("unknown payload type: %d", section.PayloadType)
- }
- }
- // check if the checksum should be present
- if op.Flags&MsgFlagChecksumPresent != 0 {
- buf = addInt32(buf, int32(op.Checksum))
- }
- replyFunc = op.replyFunc
-
- default:
- panic("internal error: unknown operation type")
- }
-
- setInt32(buf, start, int32(len(buf)-start))
-
- if replyFunc != nil {
- request := &requests[requestCount]
- request.replyFunc = replyFunc
- request.bufferPos = start
- requestCount++
- }
- }
-
- // Buffer is ready for the pipe. Lock, allocate ids, and enqueue.
-
- socket.Lock()
- if socket.dead != nil {
- dead := socket.dead
- socket.Unlock()
- debugf("Socket %p to %s: failing query, already closed: %s", socket, socket.addr, socket.dead.Error())
- // XXX This seems necessary in case the session is closed concurrently
- // with a query being performed, but it's not yet tested:
- for i := 0; i != requestCount; i++ {
- request := &requests[i]
- if request.replyFunc != nil {
- request.replyFunc(dead, nil, nil, nil)
- }
- }
- return dead
- }
-
- wasWaiting := len(socket.replyFuncs) > 0
-
- // Reserve id 0 for requests which should have no responses.
- requestId := socket.nextRequestId + 1
- if requestId == 0 {
- requestId++
- }
- socket.nextRequestId = requestId + uint32(requestCount)
- for i := 0; i != requestCount; i++ {
- request := &requests[i]
- setInt32(buf, request.bufferPos+4, int32(requestId))
- socket.replyFuncs[requestId] = request.replyFunc
- requestId++
- }
-
- debugf("Socket %p to %s: sending %d op(s) (%d bytes)", socket, socket.addr, len(ops), len(buf))
- stats.sentOps(len(ops))
-
- socket.updateDeadline(writeDeadline)
- _, err = socket.Conn.Write(buf)
-
- if !wasWaiting && requestCount > 0 {
- socket.updateDeadline(readDeadline)
- }
- socket.Unlock()
- return err
-}
-
-// Estimated minimum cost per socket: 1 goroutine + memory for the largest
-// document ever seen.
-func (socket *MongoSocket) readLoop() {
- headerBuf := make([]byte, 16) // 16 from header
- bodyBuf := make([]byte, 20) // 20 from OP_REPLY fixed fields or other general uses
- for {
- var r io.Reader = socket.Conn
-
-		// XXX Handle timeouts, etc.
- _, err := io.ReadFull(r, headerBuf)
- if err != nil {
- socket.kill(err, true)
- return
- }
-
- totalLen := getInt32(headerBuf, 0)
- responseTo := getInt32(headerBuf, 8)
- opCode := getInt32(headerBuf, 12)
-
- if opCode == dbCompressed {
- buf := bytes.NewBuffer(headerBuf)
- io.CopyN(buf, r, int64(totalLen-16))
- msg, err := DecompressMessage(buf.Bytes())
- if err != nil {
- socket.kill(err, true)
- return
- }
- r = bytes.NewBuffer(msg)
-
- _, err = io.ReadFull(r, headerBuf)
- if err != nil {
- socket.kill(err, true)
- return
- }
- opCode = getInt32(headerBuf, 12)
- if opCode == dbCompressed {
- err = fmt.Errorf("cannot recursively decompress messages")
- socket.kill(err, true)
- return
- }
- }
-
- // Don't use socket.server.Addr here. socket is not
- // locked and socket.server may go away.
- debugf("Socket %p to %s: got reply (%d bytes)", socket, socket.addr, totalLen)
-
- socket.Lock()
- replyFunc, ok := socket.replyFuncs[uint32(responseTo)]
- if ok {
- delete(socket.replyFuncs, uint32(responseTo))
- }
- socket.Unlock()
-
- switch opCode {
- case opReply:
- _, err := io.ReadFull(r, bodyBuf)
- if err != nil {
- socket.kill(err, true)
- return
- }
- reply := ReplyOp{
- Flags: uint32(getInt32(bodyBuf, 0)),
- CursorId: getInt64(bodyBuf, 4),
- FirstDoc: getInt32(bodyBuf, 12),
- ReplyDocs: getInt32(bodyBuf, 16),
- }
- stats.receivedOps(+1)
- stats.receivedDocs(int(reply.ReplyDocs))
- if replyFunc != nil && reply.ReplyDocs == 0 {
- rfl := replyFuncLegacyArgs{
- op: &reply,
- docNum: -1,
- }
- replyFunc(nil, &rfl, nil, nil)
- } else {
- for i := 0; i != int(reply.ReplyDocs); i++ {
- b, err := readDocument(r)
- if err != nil {
- if replyFunc != nil {
- replyFunc(err, &replyFuncLegacyArgs{docNum: -1}, nil, nil)
- }
- socket.kill(err, true)
- return
- }
-
- if replyFunc != nil {
- rfl := replyFuncLegacyArgs{
- op: &reply,
- docNum: i,
- docData: b,
- }
- replyFunc(nil, &rfl, nil, nil)
- }
- // XXX Do bound checking against totalLen.
- }
- }
- case dbCommandReply:
- commandReplyAsSlice, err := readDocument(r)
- if err != nil {
- socket.kill(err, true)
- return
- }
- metadataAsSlice, err := readDocument(r)
- if err != nil {
- socket.kill(err, true)
- return
- }
- rfc := replyFuncCommandArgs{
- op: &CommandReplyOp{},
- metadata: metadataAsSlice,
- commandReply: commandReplyAsSlice,
- }
- lengthRead := len(commandReplyAsSlice) + len(metadataAsSlice)
- if replyFunc != nil && lengthRead+16 >= int(totalLen) {
- replyFunc(nil, nil, &rfc, nil)
- }
-
- docLen := 0
- for lengthRead+docLen < int(totalLen)-16 {
- documentBuf, err := readDocument(r)
- if err != nil {
- rfc.bytesLeft = 0
- if replyFunc != nil {
- replyFunc(err, nil, &rfc, nil)
- }
- socket.kill(err, true)
- return
- }
- rfc.outputDoc = documentBuf
- if replyFunc != nil {
- replyFunc(nil, nil, &rfc, nil)
- }
- docLen += len(documentBuf)
- }
- case dbMessage:
- rfm := replyFuncMsgArgs{}
- // READ THE FLAGS
- _, err := io.ReadFull(r, bodyBuf[:4])
- if err != nil {
- socket.kill(err, true)
- return
- }
-
- reply := &MsgOp{
- Flags: uint32(getInt32(bodyBuf, 0)),
- }
- rfm.op = reply
- lengthRead := 4
- checksumLength := 0
- checksumPresent := false
- if (reply.Flags & MsgFlagChecksumPresent) != 0 {
- checksumPresent = true
- checksumLength = 4
- }
-
- // read the sections into a big buffer.
- // 16 is used here for the length of the message header.
- sectionsLength := int(totalLen) - (checksumLength + lengthRead + 16)
- rfm.sectionsData = make([]byte, sectionsLength)
- _, err = io.ReadFull(r, rfm.sectionsData)
- if err != nil {
- socket.kill(err, true)
- return
- }
- if checksumPresent {
- //read the checksum
- _, err := io.ReadFull(r, bodyBuf[:4])
- if err != nil {
- socket.kill(err, true)
- return
- }
- rfm.op.Checksum = uint32(getInt32(bodyBuf, 0))
- }
-
- if replyFunc != nil {
- replyFunc(nil, nil, nil, &rfm)
- }
-
- default:
- socket.kill(errors.New("opcode != 1, 2011, or 2013, corrupted data?"), true)
- return
- }
-
- socket.Lock()
- if len(socket.replyFuncs) == 0 {
- // Nothing else to read for now. Disable deadline.
- socket.Conn.SetReadDeadline(time.Time{})
- } else {
- socket.updateDeadline(readDeadline)
- }
- socket.Unlock()
-
- // XXX Do bound checking against totalLen.
- }
-}
-
-func readDocument(r io.Reader) (docBuf []byte, err error) {
- sizeBuf := make([]byte, 4)
- _, err = io.ReadFull(r, sizeBuf)
- if err != nil {
- return
- }
- size := getInt32(sizeBuf, 0)
- docBuf = make([]byte, int(size))
-
- copy(docBuf, sizeBuf)
-
- _, err = io.ReadFull(r, docBuf[4:])
- if err != nil {
- return
- }
- if globalDebug && globalLogger != nil {
- m := bson.M{}
- if err := bson.Unmarshal(docBuf, m); err == nil {
- if conn, ok := r.(net.Conn); ok {
- debugf("Socket with addr '%s' received document: %#v", conn.RemoteAddr(), m)
- }
- }
- }
- return
-}
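As an illustration (not from the deleted file): readDocument performs a length-prefixed read, pulling the document's 4-byte little-endian size first and then the remainder of its bytes. The same pattern with encoding/binary and a hypothetical 5-byte empty document as input:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

func readSized(r io.Reader) ([]byte, error) {
	sizeBuf := make([]byte, 4)
	if _, err := io.ReadFull(r, sizeBuf); err != nil {
		return nil, err
	}
	size := binary.LittleEndian.Uint32(sizeBuf)
	doc := make([]byte, size)
	copy(doc, sizeBuf) // the size prefix is part of the document
	if _, err := io.ReadFull(r, doc[4:]); err != nil {
		return nil, err
	}
	return doc, nil
}

func main() {
	doc, err := readSized(bytes.NewReader([]byte{5, 0, 0, 0, 0}))
	fmt.Println(len(doc), err) // 5 <nil>
}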
-
-var emptyHeader = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
-
-func addHeader(b []byte, opcode int) []byte {
- i := len(b)
- b = append(b, emptyHeader...)
- // Enough for current opcodes.
- b[i+12] = byte(opcode)
- b[i+13] = byte(opcode >> 8)
- return b
-}
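As an illustration (not from the deleted file): addHeader reserves the standard 16-byte wire header (messageLength, requestID, responseTo, opCode, each a little-endian int32) and writes only the low bytes of the opCode; Query later patches in the length and requestID via setInt32. The same layout expressed with encoding/binary and hypothetical values:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	header := make([]byte, 16)
	binary.LittleEndian.PutUint32(header[0:], 36)    // messageLength (hypothetical total)
	binary.LittleEndian.PutUint32(header[4:], 7)     // requestID
	binary.LittleEndian.PutUint32(header[8:], 0)     // responseTo (0 for a request)
	binary.LittleEndian.PutUint32(header[12:], 2004) // opCode, here dbQuery
	fmt.Println(binary.LittleEndian.Uint32(header[12:])) // prints 2004
}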
-
-func addInt32(b []byte, i int32) []byte {
- return append(b, byte(i), byte(i>>8), byte(i>>16), byte(i>>24))
-}
-
-func addInt64(b []byte, i int64) []byte {
- return append(b, byte(i), byte(i>>8), byte(i>>16), byte(i>>24),
- byte(i>>32), byte(i>>40), byte(i>>48), byte(i>>56))
-}
-
-func addCString(b []byte, s string) []byte {
- b = append(b, []byte(s)...)
- b = append(b, 0)
- return b
-}
-
-func addBSON(b []byte, doc interface{}) ([]byte, error) {
- if doc == nil {
- return append(b, 5, 0, 0, 0, 0), nil
- }
- data, err := bson.Marshal(doc)
- if err != nil {
- return b, err
- }
- return append(b, data...), nil
-}
-
-func setInt32(b []byte, pos int, i int32) {
- b[pos] = byte(i)
- b[pos+1] = byte(i >> 8)
- b[pos+2] = byte(i >> 16)
- b[pos+3] = byte(i >> 24)
-}
-
-func getInt32(b []byte, pos int) int32 {
- return (int32(b[pos+0])) |
- (int32(b[pos+1]) << 8) |
- (int32(b[pos+2]) << 16) |
- (int32(b[pos+3]) << 24)
-}
-
-func getInt64(b []byte, pos int) int64 {
- return (int64(b[pos+0])) |
- (int64(b[pos+1]) << 8) |
- (int64(b[pos+2]) << 16) |
- (int64(b[pos+3]) << 24) |
- (int64(b[pos+4]) << 32) |
- (int64(b[pos+5]) << 40) |
- (int64(b[pos+6]) << 48) |
- (int64(b[pos+7]) << 56)
-}