diff options
Diffstat (limited to 'src/mongo/gotools/vendor/src/gopkg.in/mgo.v2/txn/txn.go')
-rw-r--r-- | src/mongo/gotools/vendor/src/gopkg.in/mgo.v2/txn/txn.go | 611 |
1 files changed, 611 insertions, 0 deletions
diff --git a/src/mongo/gotools/vendor/src/gopkg.in/mgo.v2/txn/txn.go b/src/mongo/gotools/vendor/src/gopkg.in/mgo.v2/txn/txn.go new file mode 100644 index 00000000000..204b3cf1d8d --- /dev/null +++ b/src/mongo/gotools/vendor/src/gopkg.in/mgo.v2/txn/txn.go @@ -0,0 +1,611 @@ +// The txn package implements support for multi-document transactions. +// +// For details check the following blog post: +// +// http://blog.labix.org/2012/08/22/multi-doc-transactions-for-mongodb +// +package txn + +import ( + "encoding/binary" + "fmt" + "reflect" + "sort" + "strings" + "sync" + + "gopkg.in/mgo.v2" + "gopkg.in/mgo.v2/bson" + + crand "crypto/rand" + mrand "math/rand" +) + +type state int + +const ( + tpreparing state = 1 // One or more documents not prepared + tprepared state = 2 // Prepared but not yet ready to run + taborting state = 3 // Assertions failed, cleaning up + tapplying state = 4 // Changes are in progress + taborted state = 5 // Pre-conditions failed, nothing done + tapplied state = 6 // All changes applied +) + +func (s state) String() string { + switch s { + case tpreparing: + return "preparing" + case tprepared: + return "prepared" + case taborting: + return "aborting" + case tapplying: + return "applying" + case taborted: + return "aborted" + case tapplied: + return "applied" + } + panic(fmt.Errorf("unknown state: %d", s)) +} + +var rand *mrand.Rand +var randmu sync.Mutex + +func init() { + var seed int64 + err := binary.Read(crand.Reader, binary.BigEndian, &seed) + if err != nil { + panic(err) + } + rand = mrand.New(mrand.NewSource(seed)) +} + +type transaction struct { + Id bson.ObjectId `bson:"_id"` + State state `bson:"s"` + Info interface{} `bson:"i,omitempty"` + Ops []Op `bson:"o"` + Nonce string `bson:"n,omitempty"` + Revnos []int64 `bson:"r,omitempty"` + + docKeysCached docKeys +} + +func (t *transaction) String() string { + if t.Nonce == "" { + return t.Id.Hex() + } + return string(t.token()) +} + +func (t *transaction) done() bool { + return t.State == tapplied || t.State == taborted +} + +func (t *transaction) token() token { + if t.Nonce == "" { + panic("transaction has no nonce") + } + return tokenFor(t) +} + +func (t *transaction) docKeys() docKeys { + if t.docKeysCached != nil { + return t.docKeysCached + } + dkeys := make(docKeys, 0, len(t.Ops)) +NextOp: + for _, op := range t.Ops { + dkey := op.docKey() + for i := range dkeys { + if dkey == dkeys[i] { + continue NextOp + } + } + dkeys = append(dkeys, dkey) + } + sort.Sort(dkeys) + t.docKeysCached = dkeys + return dkeys +} + +// tokenFor returns a unique transaction token that +// is composed by t's id and a nonce. If t already has +// a nonce assigned to it, it will be used, otherwise +// a new nonce will be generated. +func tokenFor(t *transaction) token { + nonce := t.Nonce + if nonce == "" { + nonce = newNonce() + } + return token(t.Id.Hex() + "_" + nonce) +} + +func newNonce() string { + randmu.Lock() + r := rand.Uint32() + randmu.Unlock() + n := make([]byte, 8) + for i := uint(0); i < 8; i++ { + n[i] = "0123456789abcdef"[(r>>(4*i))&0xf] + } + return string(n) +} + +type token string + +func (tt token) id() bson.ObjectId { return bson.ObjectIdHex(string(tt[:24])) } +func (tt token) nonce() string { return string(tt[25:]) } + +// Op represents an operation to a single document that may be +// applied as part of a transaction with other operations. +type Op struct { + // C and Id identify the collection and document this operation + // refers to. Id is matched against the "_id" document field. + C string `bson:"c"` + Id interface{} `bson:"d"` + + // Assert optionally holds a query document that is used to + // test the operation document at the time the transaction is + // going to be applied. The assertions for all operations in + // a transaction are tested before any changes take place, + // and the transaction is entirely aborted if any of them + // fails. This is also the only way to prevent a transaction + // from being being applied (the transaction continues despite + // the outcome of Insert, Update, and Remove). + Assert interface{} `bson:"a,omitempty"` + + // The Insert, Update and Remove fields describe the mutation + // intended by the operation. At most one of them may be set + // per operation. If none are set, Assert must be set and the + // operation becomes a read-only test. + // + // Insert holds the document to be inserted at the time the + // transaction is applied. The Id field will be inserted + // into the document automatically as its _id field. The + // transaction will continue even if the document already + // exists. Use Assert with txn.DocMissing if the insertion is + // required. + // + // Update holds the update document to be applied at the time + // the transaction is applied. The transaction will continue + // even if a document with Id is missing. Use Assert to + // test for the document presence or its contents. + // + // Remove indicates whether to remove the document with Id. + // The transaction continues even if the document doesn't yet + // exist at the time the transaction is applied. Use Assert + // with txn.DocExists to make sure it will be removed. + Insert interface{} `bson:"i,omitempty"` + Update interface{} `bson:"u,omitempty"` + Remove bool `bson:"r,omitempty"` +} + +func (op *Op) isChange() bool { + return op.Update != nil || op.Insert != nil || op.Remove +} + +func (op *Op) docKey() docKey { + return docKey{op.C, op.Id} +} + +func (op *Op) name() string { + switch { + case op.Update != nil: + return "update" + case op.Insert != nil: + return "insert" + case op.Remove: + return "remove" + case op.Assert != nil: + return "assert" + } + return "none" +} + +const ( + // DocExists and DocMissing may be used on an operation's + // Assert value to assert that the document with the given + // Id exists or does not exist, respectively. + DocExists = "d+" + DocMissing = "d-" +) + +// A Runner applies operations as part of a transaction onto any number +// of collections within a database. See the Run method for details. +type Runner struct { + tc *mgo.Collection // txns + sc *mgo.Collection // stash + lc *mgo.Collection // log +} + +// NewRunner returns a new transaction runner that uses tc to hold its +// transactions. +// +// Multiple transaction collections may exist in a single database, but +// all collections that are touched by operations in a given transaction +// collection must be handled exclusively by it. +// +// A second collection with the same name of tc but suffixed by ".stash" +// will be used for implementing the transactional behavior of insert +// and remove operations. +func NewRunner(tc *mgo.Collection) *Runner { + return &Runner{tc, tc.Database.C(tc.Name + ".stash"), nil} +} + +var ErrAborted = fmt.Errorf("transaction aborted") + +// Run creates a new transaction with ops and runs it immediately. +// The id parameter specifies the transaction id, and may be written +// down ahead of time to later verify the success of the change and +// resume it, when the procedure is interrupted for any reason. If +// empty, a random id will be generated. +// The info parameter, if not nil, is included under the "i" +// field of the transaction document. +// +// Operations across documents are not atomically applied, but are +// guaranteed to be eventually all applied in the order provided or +// all aborted, as long as the affected documents are only modified +// through transactions. If documents are simultaneously modified +// by transactions and out of transactions the behavior is undefined. +// +// If Run returns no errors, all operations were applied successfully. +// If it returns ErrAborted, one or more operations can't be applied +// and the transaction was entirely aborted with no changes performed. +// Otherwise, if the transaction is interrupted while running for any +// reason, it may be resumed explicitly or by attempting to apply +// another transaction on any of the documents targeted by ops, as +// long as the interruption was made after the transaction document +// itself was inserted. Run Resume with the obtained transaction id +// to confirm whether the transaction was applied or not. +// +// Any number of transactions may be run concurrently, with one +// runner or many. +func (r *Runner) Run(ops []Op, id bson.ObjectId, info interface{}) (err error) { + const efmt = "error in transaction op %d: %s" + for i := range ops { + op := &ops[i] + if op.C == "" || op.Id == nil { + return fmt.Errorf(efmt, i, "C or Id missing") + } + changes := 0 + if op.Insert != nil { + changes++ + } + if op.Update != nil { + changes++ + } + if op.Remove { + changes++ + } + if changes > 1 { + return fmt.Errorf(efmt, i, "more than one of Insert/Update/Remove set") + } + if changes == 0 && op.Assert == nil { + return fmt.Errorf(efmt, i, "none of Assert/Insert/Update/Remove set") + } + } + if id == "" { + id = bson.NewObjectId() + } + + // Insert transaction sooner rather than later, to stay on the safer side. + t := transaction{ + Id: id, + Ops: ops, + State: tpreparing, + Info: info, + } + if err = r.tc.Insert(&t); err != nil { + return err + } + if err = flush(r, &t); err != nil { + return err + } + if t.State == taborted { + return ErrAborted + } else if t.State != tapplied { + panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State)) + } + return nil +} + +// ResumeAll resumes all pending transactions. All ErrAborted errors +// from individual transactions are ignored. +func (r *Runner) ResumeAll() (err error) { + debugf("Resuming all unfinished transactions") + iter := r.tc.Find(bson.D{{"s", bson.D{{"$in", []state{tpreparing, tprepared, tapplying}}}}}).Iter() + var t transaction + for iter.Next(&t) { + if t.State == tapplied || t.State == taborted { + continue + } + debugf("Resuming %s from %q", t.Id, t.State) + if err := flush(r, &t); err != nil { + return err + } + if !t.done() { + panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State)) + } + } + return nil +} + +// Resume resumes the transaction with id. It returns mgo.ErrNotFound +// if the transaction is not found. Otherwise, it has the same semantics +// of the Run method after the transaction is inserted. +func (r *Runner) Resume(id bson.ObjectId) (err error) { + t, err := r.load(id) + if err != nil { + return err + } + if !t.done() { + debugf("Resuming %s from %q", t, t.State) + if err := flush(r, t); err != nil { + return err + } + } + if t.State == taborted { + return ErrAborted + } else if t.State != tapplied { + panic(fmt.Errorf("invalid state for %s after flush: %q", t, t.State)) + } + return nil +} + +// ChangeLog enables logging of changes to the given collection +// every time a transaction that modifies content is done being +// applied. +// +// Saved documents are in the format: +// +// {"_id": <txn id>, <collection>: {"d": [<doc id>, ...], "r": [<doc revno>, ...]}} +// +// The document revision is the value of the txn-revno field after +// the change has been applied. Negative values indicate the document +// was not present in the collection. Revisions will not change when +// updates or removes are applied to missing documents or inserts are +// attempted when the document isn't present. +func (r *Runner) ChangeLog(logc *mgo.Collection) { + r.lc = logc +} + +// PurgeMissing removes from collections any state that refers to transaction +// documents that for whatever reason have been lost from the system (removed +// by accident or lost in a hard crash, for example). +// +// This method should very rarely be needed, if at all, and should never be +// used during the normal operation of an application. Its purpose is to put +// a system that has seen unavoidable corruption back in a working state. +func (r *Runner) PurgeMissing(collections ...string) error { + type M map[string]interface{} + type S []interface{} + + type TDoc struct { + Id interface{} "_id" + TxnQueue []string "txn-queue" + } + + found := make(map[bson.ObjectId]bool) + + sort.Strings(collections) + for _, collection := range collections { + c := r.tc.Database.C(collection) + iter := c.Find(nil).Select(bson.M{"_id": 1, "txn-queue": 1}).Iter() + var tdoc TDoc + for iter.Next(&tdoc) { + for _, txnToken := range tdoc.TxnQueue { + txnId := bson.ObjectIdHex(txnToken[:24]) + if found[txnId] { + continue + } + if r.tc.FindId(txnId).One(nil) == nil { + found[txnId] = true + continue + } + logf("WARNING: purging from document %s/%v the missing transaction id %s", collection, tdoc.Id, txnId) + err := c.UpdateId(tdoc.Id, M{"$pull": M{"txn-queue": M{"$regex": "^" + txnId.Hex() + "_*"}}}) + if err != nil { + return fmt.Errorf("error purging missing transaction %s: %v", txnId.Hex(), err) + } + } + } + if err := iter.Close(); err != nil { + return fmt.Errorf("transaction queue iteration error for %s: %v", collection, err) + } + } + + type StashTDoc struct { + Id docKey "_id" + TxnQueue []string "txn-queue" + } + + iter := r.sc.Find(nil).Select(bson.M{"_id": 1, "txn-queue": 1}).Iter() + var stdoc StashTDoc + for iter.Next(&stdoc) { + for _, txnToken := range stdoc.TxnQueue { + txnId := bson.ObjectIdHex(txnToken[:24]) + if found[txnId] { + continue + } + if r.tc.FindId(txnId).One(nil) == nil { + found[txnId] = true + continue + } + logf("WARNING: purging from stash document %s/%v the missing transaction id %s", stdoc.Id.C, stdoc.Id.Id, txnId) + err := r.sc.UpdateId(stdoc.Id, M{"$pull": M{"txn-queue": M{"$regex": "^" + txnId.Hex() + "_*"}}}) + if err != nil { + return fmt.Errorf("error purging missing transaction %s: %v", txnId.Hex(), err) + } + } + } + if err := iter.Close(); err != nil { + return fmt.Errorf("transaction stash iteration error: %v", err) + } + + return nil +} + +func (r *Runner) load(id bson.ObjectId) (*transaction, error) { + var t transaction + err := r.tc.FindId(id).One(&t) + if err == mgo.ErrNotFound { + return nil, fmt.Errorf("cannot find transaction %s", id) + } else if err != nil { + return nil, err + } + return &t, nil +} + +type typeNature int + +const ( + // The order of these values matters. Transactions + // from applications using different ordering will + // be incompatible with each other. + _ typeNature = iota + natureString + natureInt + natureFloat + natureBool + natureStruct +) + +func valueNature(v interface{}) (value interface{}, nature typeNature) { + rv := reflect.ValueOf(v) + switch rv.Kind() { + case reflect.String: + return rv.String(), natureString + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + return rv.Int(), natureInt + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + return int64(rv.Uint()), natureInt + case reflect.Float32, reflect.Float64: + return rv.Float(), natureFloat + case reflect.Bool: + return rv.Bool(), natureBool + case reflect.Struct: + return v, natureStruct + } + panic("document id type unsupported by txn: " + rv.Kind().String()) +} + +type docKey struct { + C string + Id interface{} +} + +type docKeys []docKey + +func (ks docKeys) Len() int { return len(ks) } +func (ks docKeys) Swap(i, j int) { ks[i], ks[j] = ks[j], ks[i] } +func (ks docKeys) Less(i, j int) bool { + a, b := ks[i], ks[j] + if a.C != b.C { + return a.C < b.C + } + return valuecmp(a.Id, b.Id) == -1 +} + +func valuecmp(a, b interface{}) int { + av, an := valueNature(a) + bv, bn := valueNature(b) + if an < bn { + return -1 + } + if an > bn { + return 1 + } + + if av == bv { + return 0 + } + var less bool + switch an { + case natureString: + less = av.(string) < bv.(string) + case natureInt: + less = av.(int64) < bv.(int64) + case natureFloat: + less = av.(float64) < bv.(float64) + case natureBool: + less = !av.(bool) && bv.(bool) + case natureStruct: + less = structcmp(av, bv) == -1 + default: + panic("unreachable") + } + if less { + return -1 + } + return 1 +} + +func structcmp(a, b interface{}) int { + av := reflect.ValueOf(a) + bv := reflect.ValueOf(b) + + var ai, bi = 0, 0 + var an, bn = av.NumField(), bv.NumField() + var avi, bvi interface{} + var af, bf reflect.StructField + for { + for ai < an { + af = av.Type().Field(ai) + if isExported(af.Name) { + avi = av.Field(ai).Interface() + ai++ + break + } + ai++ + } + for bi < bn { + bf = bv.Type().Field(bi) + if isExported(bf.Name) { + bvi = bv.Field(bi).Interface() + bi++ + break + } + bi++ + } + if n := valuecmp(avi, bvi); n != 0 { + return n + } + nameA := getFieldName(af) + nameB := getFieldName(bf) + if nameA < nameB { + return -1 + } + if nameA > nameB { + return 1 + } + if ai == an && bi == bn { + return 0 + } + if ai == an || bi == bn { + if ai == bn { + return -1 + } + return 1 + } + } + panic("unreachable") +} + +func isExported(name string) bool { + a := name[0] + return a >= 'A' && a <= 'Z' +} + +func getFieldName(f reflect.StructField) string { + name := f.Tag.Get("bson") + if i := strings.Index(name, ","); i >= 0 { + name = name[:i] + } + if name == "" { + name = strings.ToLower(f.Name) + } + return name +} |