diff options
Diffstat (limited to 'go')
-rw-r--r-- | go/swift-rpc-losf/Makefile | 49 | ||||
-rw-r--r-- | go/swift-rpc-losf/README.md | 20 | ||||
-rw-r--r-- | go/swift-rpc-losf/codes/codes.go | 17 | ||||
-rw-r--r-- | go/swift-rpc-losf/db.go | 121 | ||||
-rw-r--r-- | go/swift-rpc-losf/db_goleveldb.go | 230 | ||||
-rw-r--r-- | go/swift-rpc-losf/db_leveldb.go | 239 | ||||
-rw-r--r-- | go/swift-rpc-losf/encoding.go | 198 | ||||
-rw-r--r-- | go/swift-rpc-losf/encoding_test.go | 225 | ||||
-rw-r--r-- | go/swift-rpc-losf/go.mod | 27 | ||||
-rw-r--r-- | go/swift-rpc-losf/go.sum | 76 | ||||
m--------- | go/swift-rpc-losf/leveldb | 0 | ||||
-rw-r--r-- | go/swift-rpc-losf/logging.go | 43 | ||||
-rw-r--r-- | go/swift-rpc-losf/main.go | 174 | ||||
-rw-r--r-- | go/swift-rpc-losf/proto/fmgr.pb.go | 2213 | ||||
-rw-r--r-- | go/swift-rpc-losf/rpc.go | 1642 | ||||
-rw-r--r-- | go/swift-rpc-losf/rpc_test.go | 1014 | ||||
m--------- | go/swift-rpc-losf/snappy | 0 | ||||
-rw-r--r-- | go/swift-rpc-losf/stats.go | 41 | ||||
-rw-r--r-- | go/swift-rpc-losf/status/status.go | 27 | ||||
-rw-r--r-- | go/swift-rpc-losf/swift.go | 66 | ||||
-rw-r--r-- | go/swift-rpc-losf/swift_test.go | 130 | ||||
-rw-r--r-- | go/swift-rpc-losf/utils.go | 102 |
22 files changed, 6654 insertions, 0 deletions
diff --git a/go/swift-rpc-losf/Makefile b/go/swift-rpc-losf/Makefile new file mode 100644 index 000000000..b3c365c48 --- /dev/null +++ b/go/swift-rpc-losf/Makefile @@ -0,0 +1,49 @@ +# Copyright (c) 2010-2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +.PHONY: all build-snappy install-snappy build-leveldb install-leveldb install-go-leveldb build-losf +all: build-snappy install-snappy build-leveldb install-leveldb install-go-leveldb build-losf + +build-losf: + # this installs the protoc-gen-go in $HOME/go/bin, and requires the PATH be set accordingly + go get -u github.com/golang/protobuf/protoc-gen-go + protoc -I ../../swift/obj fmgr.proto --go_out=proto + go get + go build -o ../../bin/swift-rpc-losf + +# TODO: installation will be taken by setup.py when swift-rpc-losf is +# included bin/ directory + +build-snappy: + git submodule update --init snappy + sed -i 's/\(BUILD_SHARED_LIBS "Build.*\) OFF/\1 ON/' snappy/CMakeLists.txt + mkdir -p snappy/build + cmake -S snappy -B snappy/build + $(MAKE) -C snappy/build all + +install-snappy: + sudo $(MAKE) -C snappy/build install + +build-leveldb: + git submodule update --init leveldb + mkdir -p leveldb/build + cmake -DBUILD_SHARED_LIBS=ON -S leveldb -B leveldb/build + cmake --build leveldb/build + +install-leveldb: + sudo $(MAKE) -C leveldb/build install + +install-go-leveldb: + CGO_CFLAGS=/usr/local/include CGO_LDFLAGS="-L/usr/local/lib -Wl,-rpath=/usr/local/lib" go get github.com/jmhodges/levigo diff --git a/go/swift-rpc-losf/README.md b/go/swift-rpc-losf/README.md new file mode 100644 index 000000000..3cb8d5a28 --- /dev/null +++ b/go/swift-rpc-losf/README.md @@ -0,0 +1,20 @@ +This is the RPC server part of the "LOSF" (Lots Of Small Files) work. + +Setup +===== +You will need a working golang environment, gcc and tools (build-essential), and cmake >= 3.9 (ubuntu 16.04 has a version that is too old, get it from cmake.org) + +GO>=1.11 is required to support go modules (https://github.com/golang/go/wiki/Modules). + +Run `make` command in this directory +``` +make +sudo make install +``` + +Usage ****OUTDATED***** +===== +Currently it does not read the ring, you need to start one process per disk and policy on your object-server. +For example : swift-rpc-losf -diskPath=/srv/node/sda -policyIdx=0 -waitForMount=false + +Note that a new database is marked dirty, because there may already be data on disk. (original db may have been removed or corrupted) diff --git a/go/swift-rpc-losf/codes/codes.go b/go/swift-rpc-losf/codes/codes.go new file mode 100644 index 000000000..c7d6aaed9 --- /dev/null +++ b/go/swift-rpc-losf/codes/codes.go @@ -0,0 +1,17 @@ +package codes + +type StatusCode int + +//go:generate stringer -type=StatusCode +const ( + Ok StatusCode = 200 + Cancelled StatusCode = 299 + InvalidArgument StatusCode = 400 + NotFound StatusCode = 404 + AlreadyExists StatusCode = 409 + PermissionDenied StatusCode = 403 + FailedPrecondition StatusCode = 412 + Unimplemented StatusCode = 501 + Internal StatusCode = 500 + Unavailable StatusCode = 503 +) diff --git a/go/swift-rpc-losf/db.go b/go/swift-rpc-losf/db.go new file mode 100644 index 000000000..1a2832c06 --- /dev/null +++ b/go/swift-rpc-losf/db.go @@ -0,0 +1,121 @@ +// Copyright (c) 2010-2012 OpenStack Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This is the definition of the key-value interface. + +package main + +import ( + "bytes" + "fmt" +) + +// KV is the interface for operations that must be supported on the key-value store. +// the namespace is a single byte that is used as a key prefix for the different types of objects +// represented in the key-value; (volume, vfile..) +type KV interface { + Get(namespace byte, key []byte) ([]byte, error) + Put(namespace byte, key, value []byte) error + PutSync(namespace byte, key, value []byte) error + Delete(namespace byte, key []byte) error + NewWriteBatch() WriteBatch + NewIterator(namespace byte) Iterator + Close() +} + +// Iterator is the interface for operations that must be supported on the key-value iterator. +type Iterator interface { + SeekToFirst() + Seek(key []byte) + Next() + Key() []byte + Value() []byte + Valid() bool + Close() +} + +// WriteBatch is the interface for operations that must be supported on a "WriteBatch". +// The key-value used must support a write batch (atomic write of multiple entries) +type WriteBatch interface { + // Put places a key-value pair into the WriteBatch for writing later. + Put(namespace byte, key, value []byte) + Delete(namespace byte, key []byte) + + // Commit the WriteBatch atomically + Commit() error + Close() +} + +// Key for the state of the DB. If it has shut down cleanly, the value should be "closed" +const dbStateKey = "dbstate" +const closeState = "closed" +const openState = "opened" + +// setKvState will be called on startup and check whether the kv was closed cleanly. +// It will then mark the db as "opened". +// This is needed because we write asynchronously to the key-value. After a crash/power loss/OOM kill, the db +// may be not in sync with the actual state of the volumes. +func setKvState(kv KV) (isClean bool, err error) { + // Check if we stopped cleanly + isClean, err = IsDbClean(kv) + if err != nil { + log.Warn("Could not check if DB is clean") + return + } + + if isClean { + log.Info("DB is clean, set db state to open") + err = MarkDbOpened(kv) + if err != nil { + log.Warn("Failed to mark db as opened when starting") + return + } + } + + return +} + +// IsDbClean will return true if the db has been previously closed properly. +// This is determined from a specific key in the database that should be set before closing. +func IsDbClean(kv KV) (isClean bool, err error) { + value, err := kv.Get(statsPrefix, []byte(dbStateKey)) + if err != nil { + log.Warn("failed to check kv state") + return + } + + // if the state is "closed", consider it clean + // if the key is missing (new db) consider it dirty. It may have been deleted after a + // corruption and we want to rebuild the DB with the existing volumes, not let the cluster + // restart from scratch. If it's an actual new machine, the check will do nothing (no existing volumes) + if bytes.Equal(value, []byte(closeState)) { + isClean = true + } else { + log.Info(fmt.Sprintf("DB was not closed cleanly, state: %s", value)) + } + return +} + +// MarkDbClosed marks the DB as clean by setting the value of the db state key +func MarkDbClosed(kv KV) (err error) { + err = kv.PutSync(statsPrefix, []byte(dbStateKey), []byte(closeState)) + return +} + +// MarkDbOpened marks the DB as opened by setting the value of the db state key +func MarkDbOpened(kv KV) (err error) { + err = kv.PutSync(statsPrefix, []byte(dbStateKey), []byte(openState)) + return +} diff --git a/go/swift-rpc-losf/db_goleveldb.go b/go/swift-rpc-losf/db_goleveldb.go new file mode 100644 index 000000000..454e5b29c --- /dev/null +++ b/go/swift-rpc-losf/db_goleveldb.go @@ -0,0 +1,230 @@ +// Copyright (c) 2010-2012 OpenStack Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This implements the KV interface using goleveldb, a native golang leveldb package. +// Its behavior has been adapted to match the levigo behavior. + +package main + +import ( + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/iterator" +) + +type goLevelDB struct { + db *leveldb.DB + ro *opt.ReadOptions + wo *opt.WriteOptions +} + +type levelDBIterator struct { + it iterator.Iterator + namespace byte +} + +type levelDBWriteBatch struct { + wb *leveldb.Batch + ldb *goLevelDB +} + +// openGoLevelDB Opens or create the DB. +// (should use an interface?) +func openGoLevelDb(path string) (*goLevelDB, error) { + + // TODO check options + db, err := leveldb.OpenFile(path, nil) + if err != nil { + return nil, err + } + + ro := &opt.ReadOptions{} + wo := &opt.WriteOptions{} + + ldb := goLevelDB{db, ro, wo} + + return &ldb, nil +} + +// Key value operations +// +// All operations take a namespace byte to denote the type of object the entry refers to. +func (ldb *goLevelDB) Get(namespace byte, key []byte) (value []byte, err error) { + db := ldb.db + ro := ldb.ro + + // Prefix the key with a single byte (namespace) + buf := make([]byte, len(key)+1) + buf[0] = namespace + copy(buf[1:], key) + + value, err = db.Get(buf, ro) + // Behave similarly to levigo + if err == leveldb.ErrNotFound { + value = nil + err = nil + } + return +} + +func (ldb *goLevelDB) Put(namespace byte, key, value []byte) error { + db := ldb.db + wo := ldb.wo + + // Prefix the key with a single byte (namespace) + buf := make([]byte, len(key)+1) + buf[0] = namespace + copy(buf[1:], key) + + return db.Put(buf, value, wo) +} + +// PutSync will write an entry with the "Sync" option set +func (ldb *goLevelDB) PutSync(namespace byte, key, value []byte) error { + db := ldb.db + wo := &opt.WriteOptions{Sync: true} + + // Prefix the key with a single byte (namespace) + buf := make([]byte, len(key)+1) + buf[0] = namespace + copy(buf[1:], key) + + return db.Put(buf, value, wo) +} + +func (ldb *goLevelDB) Close() { + ldb.db.Close() +} + +func (ldb *goLevelDB) Delete(namespace byte, key []byte) error { + db := ldb.db + wo := ldb.wo + + // Prefix the key with a single byte (namespace) + buf := make([]byte, len(key)+1) + buf[0] = namespace + copy(buf[1:], key) + + return db.Delete(buf, wo) +} + +func (ldb *goLevelDB) NewWriteBatch() WriteBatch { + lwb := &levelDBWriteBatch{} + lwb.wb = new(leveldb.Batch) + lwb.ldb = ldb + return lwb +} + +// Put on a WriteBatch +func (lwb *levelDBWriteBatch) Put(namespace byte, key, value []byte) { + buf := make([]byte, len(key)+1) + buf[0] = namespace + copy(buf[1:], key) + + lwb.wb.Put(buf, value) + return +} + +// Delete on a WriteBatch +func (lwb *levelDBWriteBatch) Delete(namespace byte, key []byte) { + buf := make([]byte, len(key)+1) + buf[0] = namespace + copy(buf[1:], key) + + lwb.wb.Delete(buf) + return +} + +// Commit a WriteBatch +func (lwb *levelDBWriteBatch) Commit() (err error) { + db := lwb.ldb.db + wo := lwb.ldb.wo + wb := lwb.wb + + err = db.Write(wb, wo) + + return +} + +// Close a WriteBatch +func (lwb *levelDBWriteBatch) Close() { + // TODO: check if there really is nothing to do +} + +// Iterator functions +// +// NewIterator creates a new iterator for the given object type (namespace) +func (ldb *goLevelDB) NewIterator(namespace byte) Iterator { + db := ldb.db + ro := ldb.ro + + // Could use the "range" thing in this library + lit := &levelDBIterator{} + lit.it = db.NewIterator(nil, ro) + lit.namespace = namespace + return lit +} + +// SeekToFirst will seek to the first object of the given type +func (lit *levelDBIterator) SeekToFirst() { + // The "first key" is the first one in the iterator's namespace + buf := make([]byte, 1) + buf[0] = lit.namespace + + lit.it.Seek(buf) + return +} + +// Seek moves the iterator to the position of the key +func (lit *levelDBIterator) Seek(key []byte) { + // Prefix the key with a single byte (namespace) + buf := make([]byte, len(key)+1) + buf[0] = lit.namespace + copy(buf[1:], key) + + lit.it.Seek(buf) + return +} + +// Next moves the iterator to the next key +func (lit *levelDBIterator) Next() { + lit.it.Next() + return +} + +// Key returns the key (without the leading namespace byte) +func (lit *levelDBIterator) Key() (key []byte) { + return lit.it.Key()[1:] +} + +// Value returns the value at the current iterator position +func (lit *levelDBIterator) Value() (key []byte) { + return lit.it.Value() +} + +// Valid returns false if we are past the first or last key in the key-value +func (lit *levelDBIterator) Valid() bool { + if lit.it.Key() != nil && lit.it.Key()[0] == lit.namespace { + return true + } else { + return false + } +} + +// Close the iterator +func (lit *levelDBIterator) Close() { + lit.it.Release() + return +} diff --git a/go/swift-rpc-losf/db_leveldb.go b/go/swift-rpc-losf/db_leveldb.go new file mode 100644 index 000000000..4d3a35a82 --- /dev/null +++ b/go/swift-rpc-losf/db_leveldb.go @@ -0,0 +1,239 @@ +// Copyright (c) 2010-2012 OpenStack Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This implements the KV interface using levigo, which is a golang wrapper around the leveldb C++ library. + +package main + +import ( + "github.com/jmhodges/levigo" +) + +// levigoDB holds the leveldb handle and options +type levigoDB struct { + db *levigo.DB + ro *levigo.ReadOptions + wo *levigo.WriteOptions +} + +// levigoIterator wraps a levelDB iterator. The namespace byte is used to specify which type of +// entry (volume, vfile..) it will iterate on. +type levigoIterator struct { + it *levigo.Iterator + namespace byte +} + +// levigoWriteBatch wraps a levigoDB WriteBatch +type levigoWriteBatch struct { + wb *levigo.WriteBatch + ldb *levigoDB +} + +// openLevigoDB Opens or create the DB. +// (shoult use an interface?) +func openLevigoDB(path string) (*levigoDB, error) { + + opts := levigo.NewOptions() + // filter := levigo.NewBloomFilter(10) + // opts.SetFilterPolicy(filter) + // That may be useless, since we're supposed to fit in memory ? 10MB for now + opts.SetCache(levigo.NewLRUCache(10 * 1048576)) + opts.SetCreateIfMissing(true) + + // This will open or create the DB. A new DB is not marked as clean. It + // may have been lost or deleted, while there is data in volumes on-disk. + // A new DB will have to be checked and marked as clean. + db, err := levigo.Open(path, opts) + if err != nil { + return nil, err + } + + ro := levigo.NewReadOptions() + wo := levigo.NewWriteOptions() + + ldb := levigoDB{db, ro, wo} + + return &ldb, nil +} + +// Key value operations +// +// All operations take a namespace byte to denote the type of object the entry refers to. +// Get wraps levigoDB Get +func (ldb *levigoDB) Get(namespace byte, key []byte) (value []byte, err error) { + db := ldb.db + ro := ldb.ro + + // Prefix the key with a single byte (namespace) + buf := make([]byte, len(key)+1) + buf[0] = namespace + copy(buf[1:], key) + + value, err = db.Get(ro, buf) + return +} + +// Put wraps levigoDB Put +func (ldb *levigoDB) Put(namespace byte, key, value []byte) error { + db := ldb.db + wo := ldb.wo + + // Prefix the key with a single byte (namespace) + buf := make([]byte, len(key)+1) + buf[0] = namespace + copy(buf[1:], key) + + return db.Put(wo, buf, value) +} + +// PutSync will write an entry with the "Sync" option set +func (ldb *levigoDB) PutSync(namespace byte, key, value []byte) error { + db := ldb.db + wo := levigo.NewWriteOptions() + wo.SetSync(true) + + // Prefix the key with a single byte (namespace) + buf := make([]byte, len(key)+1) + buf[0] = namespace + copy(buf[1:], key) + + return db.Put(wo, buf, value) +} + +// Close wraps levigoDB Close +func (ldb *levigoDB) Close() { + ldb.db.Close() +} + +// Delete wraps levigoDB Delete +func (ldb *levigoDB) Delete(namespace byte, key []byte) error { + db := ldb.db + wo := ldb.wo + + // Prefix the key with a single byte (namespace) + buf := make([]byte, len(key)+1) + buf[0] = namespace + copy(buf[1:], key) + + return db.Delete(wo, buf) +} + +// NewWriteBatch creates a new WriteBatch +func (ldb *levigoDB) NewWriteBatch() WriteBatch { + lwb := &levigoWriteBatch{} + lwb.wb = levigo.NewWriteBatch() + lwb.ldb = ldb + return lwb +} + +// Put on a WriteBatch +func (lwb *levigoWriteBatch) Put(namespace byte, key, value []byte) { + buf := make([]byte, len(key)+1) + buf[0] = namespace + copy(buf[1:], key) + + lwb.wb.Put(buf, value) + return +} + +// Delete on a WriteBatch +func (lwb *levigoWriteBatch) Delete(namespace byte, key []byte) { + buf := make([]byte, len(key)+1) + buf[0] = namespace + copy(buf[1:], key) + + lwb.wb.Delete(buf) + return +} + +// Commit a WriteBatch +func (lwb *levigoWriteBatch) Commit() (err error) { + db := lwb.ldb.db + wo := lwb.ldb.wo + wb := lwb.wb + + err = db.Write(wo, wb) + + return +} + +// Close a WriteBatch +func (lwb *levigoWriteBatch) Close() { + wb := lwb.wb + + wb.Close() +} + +// Iterator functions +// +// NewIterator creates a new iterator for the given object type (namespace) +func (ldb *levigoDB) NewIterator(namespace byte) Iterator { + lit := &levigoIterator{} + lit.it = ldb.db.NewIterator(ldb.ro) + lit.namespace = namespace + return lit +} + +// SeekToFirst will seek to the first object of the given type +func (lit *levigoIterator) SeekToFirst() { + // The "first key" is the first one in the iterator's namespace + buf := make([]byte, 1) + buf[0] = lit.namespace + + lit.it.Seek(buf) + return +} + +// Seek moves the iterator to the position of the key +func (lit *levigoIterator) Seek(key []byte) { + // Prefix the key with a single byte (namespace) + buf := make([]byte, len(key)+1) + buf[0] = lit.namespace + copy(buf[1:], key) + + lit.it.Seek(buf) + return +} + +// Next moves the iterator to the next key +func (lit *levigoIterator) Next() { + lit.it.Next() + return +} + +// Key returns the key (without the leading namespace byte) +func (lit *levigoIterator) Key() (key []byte) { + return lit.it.Key()[1:] +} + +// Value returns the value at the current iterator position +func (lit *levigoIterator) Value() (key []byte) { + return lit.it.Value() +} + +// Valid returns false if we are past the first or last key in the key-value +func (lit *levigoIterator) Valid() bool { + if lit.it.Valid() && lit.it.Key()[0] == lit.namespace { + return true + } else { + return false + } +} + +// Close the iterator +func (lit *levigoIterator) Close() { + lit.it.Close() + return +} diff --git a/go/swift-rpc-losf/encoding.go b/go/swift-rpc-losf/encoding.go new file mode 100644 index 000000000..99c4384b1 --- /dev/null +++ b/go/swift-rpc-losf/encoding.go @@ -0,0 +1,198 @@ +// Copyright (c) 2010-2012 OpenStack Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "encoding/binary" + "encoding/hex" + "errors" + "github.com/sirupsen/logrus" +) + +// Encodes a volume key. +func EncodeVolumeKey(index uint32) (val []byte) { + buf := make([]byte, binary.MaxVarintLen32) + + n := binary.PutUvarint(buf, uint64(index)) + + val = buf[:n] + log.WithFields(logrus.Fields{"value": val}).Debug("encoded volume key") + return +} + +func DecodeVolumeKey(val []byte) (index uint32, err error) { + index32, n := binary.Uvarint(val) + if n <= 0 { + err = errors.New("failed to decode index") + return + } + + index = uint32(index32) + return +} + +// volumeType is an int32 to match the type generated by protobuf for enums +func EncodeVolumeValue(partition int64, volumeType int32, nextOffset, usedSpace, state int64) (val []byte) { + buf := make([]byte, binary.MaxVarintLen64*5) + bufLen := 0 + + n := binary.PutUvarint(buf, uint64(partition)) + bufLen += n + + n = binary.PutUvarint(buf[bufLen:], uint64(volumeType)) + bufLen += n + + n = binary.PutUvarint(buf[bufLen:], uint64(nextOffset)) + bufLen += n + + n = binary.PutUvarint(buf[bufLen:], uint64(usedSpace)) + bufLen += n + + n = binary.PutUvarint(buf[bufLen:], uint64(state)) + bufLen += n + + val = buf[:bufLen] + log.WithFields(logrus.Fields{"value": val}).Debug("encoded volume value") + return +} + +func DecodeVolumeValue(val []byte) (partition int64, volumeType int32, nextOffset, usedSpace, state int64, err error) { + position := 0 + + partition64, n := binary.Uvarint(val) + if n <= 0 { + err = errors.New("failed to decode partition") + } + position += n + + volumeType64, n := binary.Uvarint(val[position:]) + if n <= 0 { + err = errors.New("failed to decode nextOffset") + } + position += n + + nextOffset64, n := binary.Uvarint(val[position:]) + if n <= 0 { + err = errors.New("failed to decode nextOffset") + } + position += n + + usedSpace64, n := binary.Uvarint(val[position:]) + if n <= 0 { + err = errors.New("failed to decode usedSpace") + } + position += n + + state64, n := binary.Uvarint(val[position:]) + if n <= 0 { + err = errors.New("failed to decode state") + } + + partition = int64(partition64) + volumeType = int32(volumeType64) + nextOffset = int64(nextOffset64) + usedSpace = int64(usedSpace64) + state = int64(state64) + return +} + +// Encodes an object key. the key is the md5 hash string + the filename. +// Turn the 32 characters hash to a 16 byte array. Leave the filename as +// is for now. We could gain more space encoding the filename (varint timestamp + encoded file extension), +// but there are special cases to handle (the "delta") +func EncodeObjectKey(key []byte) ([]byte, error) { + var err error + + if len(key) < 32 { + err = errors.New("object key len < 32, cannot encode") + return nil, err + } + + dst := make([]byte, 16+len(key[32:])) + n, err := hex.Decode(dst, key[:32]) + if err != nil { + err = errors.New("failed to encode object hash") + return dst, err + } + + if n != 16 { + err = errors.New("encoded object hash is not 16 bytes long") + return dst, err + } + + // copy the filename + copy(dst[16:], key[32:]) + + return dst, err +} + +// Decodes object key +// This is the most called function of the project. The profiler showed that it did +// the most allocations on the heap (after cgo, which is something else to look at..) +// Now expect the buffer from the caller. +// decodedKey size must be 32+len(encodedKey[16:]), because we will decode the 16 bytes +// hash to a 32 bytes string, with the rest unchanged. +func DecodeObjectKey(encodedKey []byte, decodedKey []byte) error { + if len(encodedKey) < 16 { + err := errors.New("DecodeObjectKey called with encodedKey of len < 16") + return err + } + if len(decodedKey) < 32+len(encodedKey[16:]) { + err := errors.New("DecodeObjectKey called with decodedKey too small") + return err + } + + hex.Encode(decodedKey, encodedKey[:16]) + copy(decodedKey[32:], encodedKey[16:]) + + return nil +} + +// Encodes an object file value. +func EncodeObjectValue(volumeIndex uint32, offset uint64) (val []byte) { + buf := make([]byte, binary.MaxVarintLen64*2) + bufLen := 0 + + n := binary.PutUvarint(buf, uint64(volumeIndex)) + bufLen += n + + n = binary.PutUvarint(buf[n:], offset) + bufLen += n + + val = buf[:bufLen] + log.WithFields(logrus.Fields{"value": val}).Debug("encoded object value") + return +} + +func DecodeObjectValue(val []byte) (volumeIndex uint32, offset uint64, err error) { + log.WithFields(logrus.Fields{"value": val}).Debug("Decode object value") + volumeIndex64, n := binary.Uvarint(val) + if n <= 0 { + log.WithFields(logrus.Fields{"index": n}).Debug("failed to decode volumeIndex") + err = errors.New("failed to decode volumeIndex") + } + + offset64, n := binary.Uvarint(val[n:]) + if n <= 0 { + log.WithFields(logrus.Fields{"offset": n}).Debug("failed to decode offset") + err = errors.New("failed to decode offset") + return + } + + volumeIndex = uint32(volumeIndex64) + offset = uint64(offset64) + return +} diff --git a/go/swift-rpc-losf/encoding_test.go b/go/swift-rpc-losf/encoding_test.go new file mode 100644 index 000000000..986a1c903 --- /dev/null +++ b/go/swift-rpc-losf/encoding_test.go @@ -0,0 +1,225 @@ +// Copyright (c) 2010-2012 OpenStack Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bytes" + "testing" +) + +type dfKeyTest struct { + index uint32 + value []byte +} + +func TestVolumeKey(t *testing.T) { + var dFKeyTests = []dfKeyTest{ + {0, []byte("\x00")}, + {1, []byte("\x01")}, + {123, []byte("\x7b")}, + {863523, []byte("\xa3\xda\x34")}, + {1<<32 - 1, []byte("\xff\xff\xff\xff\x0f")}, + } + + for _, tt := range dFKeyTests { + // Test encoding + val := EncodeVolumeKey(tt.index) + if !bytes.Equal(val, tt.value) { + t.Errorf("For index: %d, got %x, expected %x", tt.index, val, tt.value) + } + + // Test decoding + index, err := DecodeVolumeKey(val) + if err != nil { + t.Fatal(err) + } + if index != tt.index { + t.Errorf("For value: %x, got %d, expected %d", val, index, tt.index) + } + + } + + // Test overflow + m1 := []byte{0x80, 0x80, 0x80, 0x80} + _, err := DecodeVolumeKey(m1) + if err == nil { + t.Errorf("We should fail to decode %x", m1) + } +} + +type dfValueTest struct { + partition int64 + volumeType int32 + nextOffset int64 + usedSpace int64 + state int64 + value []byte +} + +func TestVolumeValue(t *testing.T) { + var dfValueTests = []dfValueTest{ + {0, 0, 0, 0, 0, []byte("\x00\x00\x00\x00\x00")}, + {1343, 12, 3345314, 9821637, 2, []byte("\xbf\x0a\x0c\xa2\x97\xcc\x01\xc5\xbb\xd7\x04\x02")}, + {^int64(0), ^int32(0), ^int64(0), ^int64(0), ^int64(0), bytes.Repeat([]byte("\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01"), 5)}, + // any negative value does not make sense, and should be caught by the RPC. + // test anyway, they get cast to uint64. + {-3572, 12, -1977878, 66666, -999999, + []byte("\x8c\xe4\xff\xff\xff\xff\xff\xff\xff\x01\x0c\xea\xa3\x87\xff\xff\xff" + + "\xff\xff\xff\x01\xea\x88\x04\xc1\xfb\xc2\xff\xff\xff\xff\xff\xff\x01")}, + } + + for _, tt := range dfValueTests { + // Test encoding + val := EncodeVolumeValue(tt.partition, tt.volumeType, tt.nextOffset, tt.usedSpace, tt.state) + if !bytes.Equal(val, tt.value) { + t.Errorf("For partition: %d, volumeType: %d, nextOffset: %d, usedSpace: %d, state: %d "+ + "got: %x, expected: %x", + tt.partition, tt.volumeType, tt.nextOffset, tt.usedSpace, tt.state, val, tt.value) + } + + // Test decoding + partition, volumeType, nextOffset, usedSpace, state, err := DecodeVolumeValue(tt.value) + if err != nil { + t.Error(err) + } + if partition != tt.partition { + t.Errorf("Decoding value: %x, expected: %d, got: %d", tt.value, tt.partition, partition) + } + if volumeType != tt.volumeType { + t.Errorf("Decoding value: %x, expected: %d, got: %d", tt.value, tt.volumeType, volumeType) + } + if nextOffset != tt.nextOffset { + t.Errorf("Decoding value: %x, expected: %d, got: %d", tt.value, tt.nextOffset, nextOffset) + } + if usedSpace != tt.usedSpace { + t.Errorf("Decoding value: %x, expected: %d, got: %d", tt.value, tt.usedSpace, usedSpace) + } + if state != tt.state { + t.Errorf("Decoding value: %x, expected: %d, got: %d", tt.value, tt.state, state) + } + } + // Test overflow + m1 := []byte{0x80, 0x80, 0x80, 0x80} + _, _, _, _, _, err := DecodeVolumeValue(m1) + if err == nil { + t.Errorf("We should fail to decode %x", m1) + } +} + +type objectKeyTest struct { + key []byte + value []byte +} + +func TestObjectKey(t *testing.T) { + var objectKeyTests = []objectKeyTest{ + {[]byte("b80362143ac3221d15a75f4bd1af3fac1484213329.64315.data"), + []byte("\xb8\x03\x62\x14\x3a\xc3\x22\x1d\x15\xa7\x5f\x4b\xd1\xaf" + + "\x3f\xac\x31\x34\x38\x34\x32\x31\x33\x33\x32\x39\x2e\x36\x34" + + "\x33\x31\x35\x2e\x64\x61\x74\x61")}, + {[]byte("a2b98cd26a070c2e5200be1f950813d51494323929.64315.ts"), + []byte("\xa2\xb9\x8c\xd2\x6a\x07\x0c\x2e\x52\x00\xbe\x1f\x95\x08" + + "\x13\xd5\x31\x34\x39\x34\x33\x32\x33\x39\x32\x39\x2e\x36\x34" + + "\x33\x31\x35\x2e\x74\x73")}, + } + + for _, tt := range objectKeyTests { + // Test encoding + val, err := EncodeObjectKey(tt.key) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(val, tt.value) { + t.Errorf("For key: %x, got %x, expected %x", tt.key, val, tt.value) + } + + // Test decoding + key := make([]byte, 32+len(val[16:])) + err = DecodeObjectKey(val, key) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(key, tt.key) { + t.Errorf("For value: %x, got %d, expected %d", val, key, tt.key) + } + + } + + // Test encoding invalid object name, too short + invalidName := []byte("tooshort") + _, err := EncodeObjectKey(invalidName) + if err == nil { + t.Fatalf("should fail to encode object key: %x", invalidName) + } + + // Test encoding invalid object name, bad char + invalidName = []byte("badchar\xff6a070c2e5200be1f950813d51494323929.64315.ts") + _, err = EncodeObjectKey(invalidName) + if err == nil { + t.Fatalf("should fail to encode: %x as object key", invalidName) + } + + // Test decoding invalid data + invalidData := []byte("tooshort") + key := make([]byte, 12) + err = DecodeObjectKey(invalidData, key) + if err == nil { + t.Fatalf("should fail to decode: %x as object key", invalidData) + } + +} + +type objectValueTest struct { + volumeIndex uint32 + offset uint64 + value []byte +} + +func TestObjectValue(t *testing.T) { + var objectValueTests = []objectValueTest{ + {0, 0, []byte("\x00\x00")}, + {1, 16384, []byte("\x01\x80\x80\x01")}, + {823762, 61 * 1024 * 1024 * 1024, []byte("\xd2\xa3\x32\x80\x80\x80\x80\xf4\x01")}, + {1<<32 - 1, 1<<64 - 1, []byte("\xff\xff\xff\xff\x0f\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01")}, + } + + for _, tt := range objectValueTests { + // Test encoding + val := EncodeObjectValue(tt.volumeIndex, tt.offset) + if !bytes.Equal(val, tt.value) { + t.Errorf("For volumeType: %d, offset: %d, got %x, expected %x", tt.volumeIndex, tt.offset, val, tt.value) + } + + // Test decoding + volumeIndex, offset, err := DecodeObjectValue(val) + if err != nil { + t.Fatal(err) + } + if volumeIndex != tt.volumeIndex { + t.Errorf("Decoding value: %x, expected: %d, got: %d", tt.value, tt.volumeIndex, volumeIndex) + } + if offset != tt.offset { + t.Errorf("Decoding value: %x, expected: %d, got: %d", tt.value, tt.offset, offset) + } + } + + // Test decoding invalid data + invalidData := []byte("\xff") + _, _, err := DecodeObjectValue(invalidData) + if err == nil { + t.Fatalf("should fail to decode: %x as object value", invalidData) + } +} diff --git a/go/swift-rpc-losf/go.mod b/go/swift-rpc-losf/go.mod new file mode 100644 index 000000000..b76dcc723 --- /dev/null +++ b/go/swift-rpc-losf/go.mod @@ -0,0 +1,27 @@ +module github.com/openstack/swift-rpc-losf + +go 1.14 + +// This file is auto-generated with following commands +// GO111MODULE=on go mod init // require to run under GOPATH +// GO111MODULE=on go get // able to run anywhare + +// TODO: think of we need to pin the versions as required + +require ( + github.com/alecuyer/statsd/v2 v2.0.6 + github.com/golang/protobuf v1.3.5 + github.com/golang/snappy v0.0.1 // indirect + github.com/jmhodges/levigo v1.0.0 + github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect + github.com/kr/pretty v0.1.0 // indirect + github.com/onsi/ginkgo v1.10.1 // indirect + github.com/onsi/gomega v1.7.0 // indirect + github.com/sirupsen/logrus v1.4.2 + github.com/stretchr/testify v1.4.0 // indirect + github.com/syndtr/goleveldb v1.0.0 + golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297 // indirect + golang.org/x/sys v0.0.0-20190904005037-43c01164e931 + golang.org/x/text v0.3.2 // indirect + gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect +) diff --git a/go/swift-rpc-losf/go.sum b/go/swift-rpc-losf/go.sum new file mode 100644 index 000000000..b1a8747d6 --- /dev/null +++ b/go/swift-rpc-losf/go.sum @@ -0,0 +1,76 @@ +github.com/alecuyer/statsd/v2 v2.0.6 h1:Zw7MkTocpUgJiiyGK/4r99qV6rFcq3COJJYkyFVvKpo= +github.com/alecuyer/statsd/v2 v2.0.6/go.mod h1:qNFEkL4GN8jGsZpfrn9P6Q0FfdJf1b1FEGpWnesU4dc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/jmhodges/levigo v1.0.0 h1:q5EC36kV79HWeTBWsod3mG11EgStG3qArTKcvlksN1U= +github.com/jmhodges/levigo v1.0.0/go.mod h1:Q6Qx+uH3RAqyK4rFQroq9RL7mdkABMcfhEI+nNuzMJQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s= +github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo= +github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= +github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= +github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297 h1:k7pJ2yAPLPgbskkFdhRCsA77k2fySZ1zf2zCjvQCiIM= +golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904005037-43c01164e931 h1:+WYfosiOJzB4BjsISl1Rv4ZLUy+VYcF+u+0Y9jcerv8= +golang.org/x/sys v0.0.0-20190904005037-43c01164e931/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/go/swift-rpc-losf/leveldb b/go/swift-rpc-losf/leveldb new file mode 160000 +Subproject fe4494804f5e3a2e25485d32aeb0eb7d2f25732 diff --git a/go/swift-rpc-losf/logging.go b/go/swift-rpc-losf/logging.go new file mode 100644 index 000000000..d30e764db --- /dev/null +++ b/go/swift-rpc-losf/logging.go @@ -0,0 +1,43 @@ +// Copyright (c) 2010-2012 OpenStack Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "github.com/sirupsen/logrus" + lSyslog "github.com/sirupsen/logrus/hooks/syslog" + "io/ioutil" + "log/syslog" +) + +// global logger +var log = logrus.New() + +func setupLogging() { + formatter := new(logrus.TextFormatter) + formatter.DisableTimestamp = true + log.Formatter = formatter + + hook, err := lSyslog.NewSyslogHook("", "", syslog.LOG_INFO, "swift-kv") + + if err != nil { + panic("cannot create syslog hook") + } + + log.Hooks.Add(hook) + + // Disable default logging, we only want to log through the syslog hook + log.Out = ioutil.Discard +} diff --git a/go/swift-rpc-losf/main.go b/go/swift-rpc-losf/main.go new file mode 100644 index 000000000..9bf6489d3 --- /dev/null +++ b/go/swift-rpc-losf/main.go @@ -0,0 +1,174 @@ +// Copyright (c) 2010-2012 OpenStack Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// KV store for LOFS +package main + +import ( + "flag" + "fmt" + "github.com/sirupsen/logrus" + "net/http" + _ "net/http/pprof" + "os" + "os/signal" + "path" + "syscall" + "time" +) + +// Name of the base losf root directory, relative to the swift disk +const rootDirBase = "losf" + +// Will run checks and create rootDir if needed +// Returns the path to rootDir +func diskSetup(diskPath string, policyIdx int, waitForMount bool) (string, error) { + + if waitForMount { + log.Debugf("waitForMount is set, if %s is not mounted, will wait until it is", diskPath) + } + for waitForMount { + nowMounted, err := isMounted(diskPath) + if err != nil { + return "", err + } + if nowMounted { + break + } + time.Sleep(time.Second / 10) + } + + // OVH patch to match a similar mechanism in the python code. + // If a ".offline" file exists in srv node, do not start (/srv/node/disk-XX.offline) + offlineFile := fmt.Sprintf("%s%s", diskPath, ".offline") + offlineFileExists := false + if _, err := os.Stat(offlineFile); err == nil { + offlineFileExists = true + log.Debugf("offline file exists: %s", offlineFile) + } + for offlineFileExists { + if _, err := os.Stat(offlineFile); os.IsNotExist(err) { + offlineFileExists = false + } + time.Sleep(time.Second * 10) + } + + rootDir := path.Join(diskPath, getBaseDirName(rootDirBase, policyIdx)) + log.Debug(rootDir) + + rootDirExists, err := dirExists(rootDir) + if err != nil { + return "", err + } + if !rootDirExists { + err := os.Mkdir(rootDir, os.FileMode(0700)) + if err != nil { + return "", err + } + } + return rootDir, nil +} + +// Parse options and starts the rpc server. +func main() { + var dbDir, socketPath string + var kv KV + + setupLogging() + + debugLevels := map[string]logrus.Level{ + "panic": logrus.PanicLevel, + "fatal": logrus.FatalLevel, + "error": logrus.ErrorLevel, + "warn": logrus.WarnLevel, + "info": logrus.InfoLevel, + "debug": logrus.DebugLevel, + } + + var diskPath = flag.String("diskPath", "", "Swift disk path (/srv/node/disk-xyz)") + var policyIdx = flag.Int("policyIdx", 0, "Policy index") + var waitForMount = flag.Bool("waitForMount", true, "Wait for diskPath to be mounted. If diskPath exists but is not a mount, it will wait") + var profilerAddr = flag.String("profilerAddr", "", "Start profiler and make it available at this address (127.0.0.1:8081)") + var debugLevel = flag.String("debug", "info", "Debug level (error, warn, info, debug)") + var allowRoot = flag.Bool("allowRoot", false, "Allow process to run as root") + var useGoLevelDB = flag.Bool("useGoLevelDB", false, "Use native golang levelDB package") + + flag.Parse() + + log.SetLevel(debugLevels[*debugLevel]) + + if !*allowRoot && os.Getuid() == 0 { + log.Fatal("running as root, and allowRoot is false") + } + + if *diskPath == "" { + log.Fatal("diskPath not specified") + } + + rootDir, err := diskSetup(*diskPath, *policyIdx, *waitForMount) + if err != nil { + panic(err) + } + + dbDir = path.Join(rootDir, "db") + socketPath = path.Join(rootDir, "rpc.socket") + rlog := log.WithFields(logrus.Fields{"socket": socketPath}) + + // install signal handler + stopChan := make(chan os.Signal, 1) + signal.Notify(stopChan, os.Interrupt, syscall.SIGTERM) + + // start http server for profiling + if *profilerAddr != "" { + go func() { + rlog.Debug(http.ListenAndServe(*profilerAddr, nil)) + }() + } + + // Acquire lock to protect socket + rlog.Debug("Locking socket") + err = lockSocket(socketPath) + if err != nil { + rlog.Fatalf("Failed to lock RPC socket: %s", err) + } + os.Remove(socketPath) + + // Open the database + if *useGoLevelDB { + kv, err = openGoLevelDb(dbDir) + } else { + kv, err = openLevigoDB(dbDir) + } + + if err != nil { + rlog.Fatal(err) + } + + // Check the kv was stopped properly + isClean, err := setKvState(kv) + if err != nil { + rlog.Fatal(err) + } + log.Infof("kv is clean: %v", isClean) + + // Start the RPC server + rlog.Info("Starting RPC server") + err = runServer(kv, *diskPath, socketPath, stopChan, isClean) + if err != nil { + rlog.Fatal(err) + } + + return +} diff --git a/go/swift-rpc-losf/proto/fmgr.pb.go b/go/swift-rpc-losf/proto/fmgr.pb.go new file mode 100644 index 000000000..1cb26dc7b --- /dev/null +++ b/go/swift-rpc-losf/proto/fmgr.pb.go @@ -0,0 +1,2213 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: fmgr.proto + +package filemgr + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Enums +type VolumeType int32 + +const ( + VolumeType_VOLUME_DEFAULT VolumeType = 0 + VolumeType_VOLUME_TOMBSTONE VolumeType = 1 + VolumeType_VOLUME_X_DELETE_AT VolumeType = 2 +) + +var VolumeType_name = map[int32]string{ + 0: "VOLUME_DEFAULT", + 1: "VOLUME_TOMBSTONE", + 2: "VOLUME_X_DELETE_AT", +} + +var VolumeType_value = map[string]int32{ + "VOLUME_DEFAULT": 0, + "VOLUME_TOMBSTONE": 1, + "VOLUME_X_DELETE_AT": 2, +} + +func (x VolumeType) String() string { + return proto.EnumName(VolumeType_name, int32(x)) +} + +func (VolumeType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{0} +} + +type VolumeState int32 + +const ( + // Default state, volume can be read from and written to + VolumeState_STATE_RW VolumeState = 0 + // Volume is being compacted (source). New objects cannot be appended + VolumeState_STATE_COMPACTION_SRC VolumeState = 1 + // Volume is a compaction target. New objects cannot be appended + VolumeState_STATE_COMPACTION_TARGET VolumeState = 2 +) + +var VolumeState_name = map[int32]string{ + 0: "STATE_RW", + 1: "STATE_COMPACTION_SRC", + 2: "STATE_COMPACTION_TARGET", +} + +var VolumeState_value = map[string]int32{ + "STATE_RW": 0, + "STATE_COMPACTION_SRC": 1, + "STATE_COMPACTION_TARGET": 2, +} + +func (x VolumeState) String() string { + return proto.EnumName(VolumeState_name, int32(x)) +} + +func (VolumeState) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{1} +} + +type RegisterVolumeRequest struct { + Partition uint32 `protobuf:"varint,1,opt,name=partition,proto3" json:"partition,omitempty"` + Type VolumeType `protobuf:"varint,2,opt,name=type,proto3,enum=filemgr.VolumeType" json:"type,omitempty"` + VolumeIndex uint32 `protobuf:"varint,3,opt,name=volume_index,json=volumeIndex,proto3" json:"volume_index,omitempty"` + Offset uint64 `protobuf:"varint,4,opt,name=offset,proto3" json:"offset,omitempty"` + State VolumeState `protobuf:"varint,5,opt,name=state,proto3,enum=filemgr.VolumeState" json:"state,omitempty"` + RepairTool bool `protobuf:"varint,6,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RegisterVolumeRequest) Reset() { *m = RegisterVolumeRequest{} } +func (m *RegisterVolumeRequest) String() string { return proto.CompactTextString(m) } +func (*RegisterVolumeRequest) ProtoMessage() {} +func (*RegisterVolumeRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{0} +} + +func (m *RegisterVolumeRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RegisterVolumeRequest.Unmarshal(m, b) +} +func (m *RegisterVolumeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RegisterVolumeRequest.Marshal(b, m, deterministic) +} +func (m *RegisterVolumeRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_RegisterVolumeRequest.Merge(m, src) +} +func (m *RegisterVolumeRequest) XXX_Size() int { + return xxx_messageInfo_RegisterVolumeRequest.Size(m) +} +func (m *RegisterVolumeRequest) XXX_DiscardUnknown() { + xxx_messageInfo_RegisterVolumeRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_RegisterVolumeRequest proto.InternalMessageInfo + +func (m *RegisterVolumeRequest) GetPartition() uint32 { + if m != nil { + return m.Partition + } + return 0 +} + +func (m *RegisterVolumeRequest) GetType() VolumeType { + if m != nil { + return m.Type + } + return VolumeType_VOLUME_DEFAULT +} + +func (m *RegisterVolumeRequest) GetVolumeIndex() uint32 { + if m != nil { + return m.VolumeIndex + } + return 0 +} + +func (m *RegisterVolumeRequest) GetOffset() uint64 { + if m != nil { + return m.Offset + } + return 0 +} + +func (m *RegisterVolumeRequest) GetState() VolumeState { + if m != nil { + return m.State + } + return VolumeState_STATE_RW +} + +func (m *RegisterVolumeRequest) GetRepairTool() bool { + if m != nil { + return m.RepairTool + } + return false +} + +type RegisterVolumeReply struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RegisterVolumeReply) Reset() { *m = RegisterVolumeReply{} } +func (m *RegisterVolumeReply) String() string { return proto.CompactTextString(m) } +func (*RegisterVolumeReply) ProtoMessage() {} +func (*RegisterVolumeReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{1} +} + +func (m *RegisterVolumeReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RegisterVolumeReply.Unmarshal(m, b) +} +func (m *RegisterVolumeReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RegisterVolumeReply.Marshal(b, m, deterministic) +} +func (m *RegisterVolumeReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_RegisterVolumeReply.Merge(m, src) +} +func (m *RegisterVolumeReply) XXX_Size() int { + return xxx_messageInfo_RegisterVolumeReply.Size(m) +} +func (m *RegisterVolumeReply) XXX_DiscardUnknown() { + xxx_messageInfo_RegisterVolumeReply.DiscardUnknown(m) +} + +var xxx_messageInfo_RegisterVolumeReply proto.InternalMessageInfo + +type UnregisterVolumeRequest struct { + Index uint32 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"` + RepairTool bool `protobuf:"varint,2,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *UnregisterVolumeRequest) Reset() { *m = UnregisterVolumeRequest{} } +func (m *UnregisterVolumeRequest) String() string { return proto.CompactTextString(m) } +func (*UnregisterVolumeRequest) ProtoMessage() {} +func (*UnregisterVolumeRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{2} +} + +func (m *UnregisterVolumeRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_UnregisterVolumeRequest.Unmarshal(m, b) +} +func (m *UnregisterVolumeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_UnregisterVolumeRequest.Marshal(b, m, deterministic) +} +func (m *UnregisterVolumeRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_UnregisterVolumeRequest.Merge(m, src) +} +func (m *UnregisterVolumeRequest) XXX_Size() int { + return xxx_messageInfo_UnregisterVolumeRequest.Size(m) +} +func (m *UnregisterVolumeRequest) XXX_DiscardUnknown() { + xxx_messageInfo_UnregisterVolumeRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_UnregisterVolumeRequest proto.InternalMessageInfo + +func (m *UnregisterVolumeRequest) GetIndex() uint32 { + if m != nil { + return m.Index + } + return 0 +} + +func (m *UnregisterVolumeRequest) GetRepairTool() bool { + if m != nil { + return m.RepairTool + } + return false +} + +type UnregisterVolumeReply struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *UnregisterVolumeReply) Reset() { *m = UnregisterVolumeReply{} } +func (m *UnregisterVolumeReply) String() string { return proto.CompactTextString(m) } +func (*UnregisterVolumeReply) ProtoMessage() {} +func (*UnregisterVolumeReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{3} +} + +func (m *UnregisterVolumeReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_UnregisterVolumeReply.Unmarshal(m, b) +} +func (m *UnregisterVolumeReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_UnregisterVolumeReply.Marshal(b, m, deterministic) +} +func (m *UnregisterVolumeReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_UnregisterVolumeReply.Merge(m, src) +} +func (m *UnregisterVolumeReply) XXX_Size() int { + return xxx_messageInfo_UnregisterVolumeReply.Size(m) +} +func (m *UnregisterVolumeReply) XXX_DiscardUnknown() { + xxx_messageInfo_UnregisterVolumeReply.DiscardUnknown(m) +} + +var xxx_messageInfo_UnregisterVolumeReply proto.InternalMessageInfo + +type UpdateVolumeStateRequest struct { + VolumeIndex uint32 `protobuf:"varint,1,opt,name=volume_index,json=volumeIndex,proto3" json:"volume_index,omitempty"` + State VolumeState `protobuf:"varint,2,opt,name=state,proto3,enum=filemgr.VolumeState" json:"state,omitempty"` + RepairTool bool `protobuf:"varint,3,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *UpdateVolumeStateRequest) Reset() { *m = UpdateVolumeStateRequest{} } +func (m *UpdateVolumeStateRequest) String() string { return proto.CompactTextString(m) } +func (*UpdateVolumeStateRequest) ProtoMessage() {} +func (*UpdateVolumeStateRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{4} +} + +func (m *UpdateVolumeStateRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_UpdateVolumeStateRequest.Unmarshal(m, b) +} +func (m *UpdateVolumeStateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_UpdateVolumeStateRequest.Marshal(b, m, deterministic) +} +func (m *UpdateVolumeStateRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_UpdateVolumeStateRequest.Merge(m, src) +} +func (m *UpdateVolumeStateRequest) XXX_Size() int { + return xxx_messageInfo_UpdateVolumeStateRequest.Size(m) +} +func (m *UpdateVolumeStateRequest) XXX_DiscardUnknown() { + xxx_messageInfo_UpdateVolumeStateRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_UpdateVolumeStateRequest proto.InternalMessageInfo + +func (m *UpdateVolumeStateRequest) GetVolumeIndex() uint32 { + if m != nil { + return m.VolumeIndex + } + return 0 +} + +func (m *UpdateVolumeStateRequest) GetState() VolumeState { + if m != nil { + return m.State + } + return VolumeState_STATE_RW +} + +func (m *UpdateVolumeStateRequest) GetRepairTool() bool { + if m != nil { + return m.RepairTool + } + return false +} + +type UpdateVolumeStateReply struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *UpdateVolumeStateReply) Reset() { *m = UpdateVolumeStateReply{} } +func (m *UpdateVolumeStateReply) String() string { return proto.CompactTextString(m) } +func (*UpdateVolumeStateReply) ProtoMessage() {} +func (*UpdateVolumeStateReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{5} +} + +func (m *UpdateVolumeStateReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_UpdateVolumeStateReply.Unmarshal(m, b) +} +func (m *UpdateVolumeStateReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_UpdateVolumeStateReply.Marshal(b, m, deterministic) +} +func (m *UpdateVolumeStateReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_UpdateVolumeStateReply.Merge(m, src) +} +func (m *UpdateVolumeStateReply) XXX_Size() int { + return xxx_messageInfo_UpdateVolumeStateReply.Size(m) +} +func (m *UpdateVolumeStateReply) XXX_DiscardUnknown() { + xxx_messageInfo_UpdateVolumeStateReply.DiscardUnknown(m) +} + +var xxx_messageInfo_UpdateVolumeStateReply proto.InternalMessageInfo + +type GetVolumeRequest struct { + Index uint32 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"` + RepairTool bool `protobuf:"varint,2,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *GetVolumeRequest) Reset() { *m = GetVolumeRequest{} } +func (m *GetVolumeRequest) String() string { return proto.CompactTextString(m) } +func (*GetVolumeRequest) ProtoMessage() {} +func (*GetVolumeRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{6} +} + +func (m *GetVolumeRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_GetVolumeRequest.Unmarshal(m, b) +} +func (m *GetVolumeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_GetVolumeRequest.Marshal(b, m, deterministic) +} +func (m *GetVolumeRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetVolumeRequest.Merge(m, src) +} +func (m *GetVolumeRequest) XXX_Size() int { + return xxx_messageInfo_GetVolumeRequest.Size(m) +} +func (m *GetVolumeRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GetVolumeRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GetVolumeRequest proto.InternalMessageInfo + +func (m *GetVolumeRequest) GetIndex() uint32 { + if m != nil { + return m.Index + } + return 0 +} + +func (m *GetVolumeRequest) GetRepairTool() bool { + if m != nil { + return m.RepairTool + } + return false +} + +type GetVolumeReply struct { + VolumeIndex uint32 `protobuf:"varint,1,opt,name=volume_index,json=volumeIndex,proto3" json:"volume_index,omitempty"` + VolumeType VolumeType `protobuf:"varint,2,opt,name=volume_type,json=volumeType,proto3,enum=filemgr.VolumeType" json:"volume_type,omitempty"` + VolumeState uint32 `protobuf:"varint,3,opt,name=volume_state,json=volumeState,proto3" json:"volume_state,omitempty"` + Partition uint32 `protobuf:"varint,4,opt,name=partition,proto3" json:"partition,omitempty"` + NextOffset uint64 `protobuf:"varint,5,opt,name=next_offset,json=nextOffset,proto3" json:"next_offset,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *GetVolumeReply) Reset() { *m = GetVolumeReply{} } +func (m *GetVolumeReply) String() string { return proto.CompactTextString(m) } +func (*GetVolumeReply) ProtoMessage() {} +func (*GetVolumeReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{7} +} + +func (m *GetVolumeReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_GetVolumeReply.Unmarshal(m, b) +} +func (m *GetVolumeReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_GetVolumeReply.Marshal(b, m, deterministic) +} +func (m *GetVolumeReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetVolumeReply.Merge(m, src) +} +func (m *GetVolumeReply) XXX_Size() int { + return xxx_messageInfo_GetVolumeReply.Size(m) +} +func (m *GetVolumeReply) XXX_DiscardUnknown() { + xxx_messageInfo_GetVolumeReply.DiscardUnknown(m) +} + +var xxx_messageInfo_GetVolumeReply proto.InternalMessageInfo + +func (m *GetVolumeReply) GetVolumeIndex() uint32 { + if m != nil { + return m.VolumeIndex + } + return 0 +} + +func (m *GetVolumeReply) GetVolumeType() VolumeType { + if m != nil { + return m.VolumeType + } + return VolumeType_VOLUME_DEFAULT +} + +func (m *GetVolumeReply) GetVolumeState() uint32 { + if m != nil { + return m.VolumeState + } + return 0 +} + +func (m *GetVolumeReply) GetPartition() uint32 { + if m != nil { + return m.Partition + } + return 0 +} + +func (m *GetVolumeReply) GetNextOffset() uint64 { + if m != nil { + return m.NextOffset + } + return 0 +} + +type ListVolumesRequest struct { + Partition uint32 `protobuf:"varint,1,opt,name=partition,proto3" json:"partition,omitempty"` + Type VolumeType `protobuf:"varint,2,opt,name=type,proto3,enum=filemgr.VolumeType" json:"type,omitempty"` + RepairTool bool `protobuf:"varint,3,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ListVolumesRequest) Reset() { *m = ListVolumesRequest{} } +func (m *ListVolumesRequest) String() string { return proto.CompactTextString(m) } +func (*ListVolumesRequest) ProtoMessage() {} +func (*ListVolumesRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{8} +} + +func (m *ListVolumesRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ListVolumesRequest.Unmarshal(m, b) +} +func (m *ListVolumesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ListVolumesRequest.Marshal(b, m, deterministic) +} +func (m *ListVolumesRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ListVolumesRequest.Merge(m, src) +} +func (m *ListVolumesRequest) XXX_Size() int { + return xxx_messageInfo_ListVolumesRequest.Size(m) +} +func (m *ListVolumesRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ListVolumesRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ListVolumesRequest proto.InternalMessageInfo + +func (m *ListVolumesRequest) GetPartition() uint32 { + if m != nil { + return m.Partition + } + return 0 +} + +func (m *ListVolumesRequest) GetType() VolumeType { + if m != nil { + return m.Type + } + return VolumeType_VOLUME_DEFAULT +} + +func (m *ListVolumesRequest) GetRepairTool() bool { + if m != nil { + return m.RepairTool + } + return false +} + +type ListVolumesReply struct { + Volumes []*Volume `protobuf:"bytes,1,rep,name=volumes,proto3" json:"volumes,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ListVolumesReply) Reset() { *m = ListVolumesReply{} } +func (m *ListVolumesReply) String() string { return proto.CompactTextString(m) } +func (*ListVolumesReply) ProtoMessage() {} +func (*ListVolumesReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{9} +} + +func (m *ListVolumesReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ListVolumesReply.Unmarshal(m, b) +} +func (m *ListVolumesReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ListVolumesReply.Marshal(b, m, deterministic) +} +func (m *ListVolumesReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_ListVolumesReply.Merge(m, src) +} +func (m *ListVolumesReply) XXX_Size() int { + return xxx_messageInfo_ListVolumesReply.Size(m) +} +func (m *ListVolumesReply) XXX_DiscardUnknown() { + xxx_messageInfo_ListVolumesReply.DiscardUnknown(m) +} + +var xxx_messageInfo_ListVolumesReply proto.InternalMessageInfo + +func (m *ListVolumesReply) GetVolumes() []*Volume { + if m != nil { + return m.Volumes + } + return nil +} + +type RegisterObjectRequest struct { + Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + VolumeIndex uint32 `protobuf:"varint,2,opt,name=volume_index,json=volumeIndex,proto3" json:"volume_index,omitempty"` + Offset uint64 `protobuf:"varint,3,opt,name=offset,proto3" json:"offset,omitempty"` + NextOffset uint64 `protobuf:"varint,4,opt,name=next_offset,json=nextOffset,proto3" json:"next_offset,omitempty"` + RepairTool bool `protobuf:"varint,5,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RegisterObjectRequest) Reset() { *m = RegisterObjectRequest{} } +func (m *RegisterObjectRequest) String() string { return proto.CompactTextString(m) } +func (*RegisterObjectRequest) ProtoMessage() {} +func (*RegisterObjectRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{10} +} + +func (m *RegisterObjectRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RegisterObjectRequest.Unmarshal(m, b) +} +func (m *RegisterObjectRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RegisterObjectRequest.Marshal(b, m, deterministic) +} +func (m *RegisterObjectRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_RegisterObjectRequest.Merge(m, src) +} +func (m *RegisterObjectRequest) XXX_Size() int { + return xxx_messageInfo_RegisterObjectRequest.Size(m) +} +func (m *RegisterObjectRequest) XXX_DiscardUnknown() { + xxx_messageInfo_RegisterObjectRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_RegisterObjectRequest proto.InternalMessageInfo + +func (m *RegisterObjectRequest) GetName() []byte { + if m != nil { + return m.Name + } + return nil +} + +func (m *RegisterObjectRequest) GetVolumeIndex() uint32 { + if m != nil { + return m.VolumeIndex + } + return 0 +} + +func (m *RegisterObjectRequest) GetOffset() uint64 { + if m != nil { + return m.Offset + } + return 0 +} + +func (m *RegisterObjectRequest) GetNextOffset() uint64 { + if m != nil { + return m.NextOffset + } + return 0 +} + +func (m *RegisterObjectRequest) GetRepairTool() bool { + if m != nil { + return m.RepairTool + } + return false +} + +type RegisterObjectReply struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RegisterObjectReply) Reset() { *m = RegisterObjectReply{} } +func (m *RegisterObjectReply) String() string { return proto.CompactTextString(m) } +func (*RegisterObjectReply) ProtoMessage() {} +func (*RegisterObjectReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{11} +} + +func (m *RegisterObjectReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RegisterObjectReply.Unmarshal(m, b) +} +func (m *RegisterObjectReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RegisterObjectReply.Marshal(b, m, deterministic) +} +func (m *RegisterObjectReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_RegisterObjectReply.Merge(m, src) +} +func (m *RegisterObjectReply) XXX_Size() int { + return xxx_messageInfo_RegisterObjectReply.Size(m) +} +func (m *RegisterObjectReply) XXX_DiscardUnknown() { + xxx_messageInfo_RegisterObjectReply.DiscardUnknown(m) +} + +var xxx_messageInfo_RegisterObjectReply proto.InternalMessageInfo + +type UnregisterObjectRequest struct { + Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + RepairTool bool `protobuf:"varint,2,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *UnregisterObjectRequest) Reset() { *m = UnregisterObjectRequest{} } +func (m *UnregisterObjectRequest) String() string { return proto.CompactTextString(m) } +func (*UnregisterObjectRequest) ProtoMessage() {} +func (*UnregisterObjectRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{12} +} + +func (m *UnregisterObjectRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_UnregisterObjectRequest.Unmarshal(m, b) +} +func (m *UnregisterObjectRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_UnregisterObjectRequest.Marshal(b, m, deterministic) +} +func (m *UnregisterObjectRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_UnregisterObjectRequest.Merge(m, src) +} +func (m *UnregisterObjectRequest) XXX_Size() int { + return xxx_messageInfo_UnregisterObjectRequest.Size(m) +} +func (m *UnregisterObjectRequest) XXX_DiscardUnknown() { + xxx_messageInfo_UnregisterObjectRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_UnregisterObjectRequest proto.InternalMessageInfo + +func (m *UnregisterObjectRequest) GetName() []byte { + if m != nil { + return m.Name + } + return nil +} + +func (m *UnregisterObjectRequest) GetRepairTool() bool { + if m != nil { + return m.RepairTool + } + return false +} + +type UnregisterObjectReply struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *UnregisterObjectReply) Reset() { *m = UnregisterObjectReply{} } +func (m *UnregisterObjectReply) String() string { return proto.CompactTextString(m) } +func (*UnregisterObjectReply) ProtoMessage() {} +func (*UnregisterObjectReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{13} +} + +func (m *UnregisterObjectReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_UnregisterObjectReply.Unmarshal(m, b) +} +func (m *UnregisterObjectReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_UnregisterObjectReply.Marshal(b, m, deterministic) +} +func (m *UnregisterObjectReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_UnregisterObjectReply.Merge(m, src) +} +func (m *UnregisterObjectReply) XXX_Size() int { + return xxx_messageInfo_UnregisterObjectReply.Size(m) +} +func (m *UnregisterObjectReply) XXX_DiscardUnknown() { + xxx_messageInfo_UnregisterObjectReply.DiscardUnknown(m) +} + +var xxx_messageInfo_UnregisterObjectReply proto.InternalMessageInfo + +type RenameObjectRequest struct { + Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + NewName []byte `protobuf:"bytes,2,opt,name=new_name,json=newName,proto3" json:"new_name,omitempty"` + RepairTool bool `protobuf:"varint,3,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RenameObjectRequest) Reset() { *m = RenameObjectRequest{} } +func (m *RenameObjectRequest) String() string { return proto.CompactTextString(m) } +func (*RenameObjectRequest) ProtoMessage() {} +func (*RenameObjectRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{14} +} + +func (m *RenameObjectRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RenameObjectRequest.Unmarshal(m, b) +} +func (m *RenameObjectRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RenameObjectRequest.Marshal(b, m, deterministic) +} +func (m *RenameObjectRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_RenameObjectRequest.Merge(m, src) +} +func (m *RenameObjectRequest) XXX_Size() int { + return xxx_messageInfo_RenameObjectRequest.Size(m) +} +func (m *RenameObjectRequest) XXX_DiscardUnknown() { + xxx_messageInfo_RenameObjectRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_RenameObjectRequest proto.InternalMessageInfo + +func (m *RenameObjectRequest) GetName() []byte { + if m != nil { + return m.Name + } + return nil +} + +func (m *RenameObjectRequest) GetNewName() []byte { + if m != nil { + return m.NewName + } + return nil +} + +func (m *RenameObjectRequest) GetRepairTool() bool { + if m != nil { + return m.RepairTool + } + return false +} + +type RenameObjectReply struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RenameObjectReply) Reset() { *m = RenameObjectReply{} } +func (m *RenameObjectReply) String() string { return proto.CompactTextString(m) } +func (*RenameObjectReply) ProtoMessage() {} +func (*RenameObjectReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{15} +} + +func (m *RenameObjectReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RenameObjectReply.Unmarshal(m, b) +} +func (m *RenameObjectReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RenameObjectReply.Marshal(b, m, deterministic) +} +func (m *RenameObjectReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_RenameObjectReply.Merge(m, src) +} +func (m *RenameObjectReply) XXX_Size() int { + return xxx_messageInfo_RenameObjectReply.Size(m) +} +func (m *RenameObjectReply) XXX_DiscardUnknown() { + xxx_messageInfo_RenameObjectReply.DiscardUnknown(m) +} + +var xxx_messageInfo_RenameObjectReply proto.InternalMessageInfo + +type LoadObjectRequest struct { + Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + IsQuarantined bool `protobuf:"varint,2,opt,name=is_quarantined,json=isQuarantined,proto3" json:"is_quarantined,omitempty"` + RepairTool bool `protobuf:"varint,3,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *LoadObjectRequest) Reset() { *m = LoadObjectRequest{} } +func (m *LoadObjectRequest) String() string { return proto.CompactTextString(m) } +func (*LoadObjectRequest) ProtoMessage() {} +func (*LoadObjectRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{16} +} + +func (m *LoadObjectRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_LoadObjectRequest.Unmarshal(m, b) +} +func (m *LoadObjectRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_LoadObjectRequest.Marshal(b, m, deterministic) +} +func (m *LoadObjectRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_LoadObjectRequest.Merge(m, src) +} +func (m *LoadObjectRequest) XXX_Size() int { + return xxx_messageInfo_LoadObjectRequest.Size(m) +} +func (m *LoadObjectRequest) XXX_DiscardUnknown() { + xxx_messageInfo_LoadObjectRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_LoadObjectRequest proto.InternalMessageInfo + +func (m *LoadObjectRequest) GetName() []byte { + if m != nil { + return m.Name + } + return nil +} + +func (m *LoadObjectRequest) GetIsQuarantined() bool { + if m != nil { + return m.IsQuarantined + } + return false +} + +func (m *LoadObjectRequest) GetRepairTool() bool { + if m != nil { + return m.RepairTool + } + return false +} + +type LoadObjectReply struct { + Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + VolumeIndex uint32 `protobuf:"varint,2,opt,name=volume_index,json=volumeIndex,proto3" json:"volume_index,omitempty"` + Offset uint64 `protobuf:"varint,3,opt,name=offset,proto3" json:"offset,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *LoadObjectReply) Reset() { *m = LoadObjectReply{} } +func (m *LoadObjectReply) String() string { return proto.CompactTextString(m) } +func (*LoadObjectReply) ProtoMessage() {} +func (*LoadObjectReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{17} +} + +func (m *LoadObjectReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_LoadObjectReply.Unmarshal(m, b) +} +func (m *LoadObjectReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_LoadObjectReply.Marshal(b, m, deterministic) +} +func (m *LoadObjectReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_LoadObjectReply.Merge(m, src) +} +func (m *LoadObjectReply) XXX_Size() int { + return xxx_messageInfo_LoadObjectReply.Size(m) +} +func (m *LoadObjectReply) XXX_DiscardUnknown() { + xxx_messageInfo_LoadObjectReply.DiscardUnknown(m) +} + +var xxx_messageInfo_LoadObjectReply proto.InternalMessageInfo + +func (m *LoadObjectReply) GetName() []byte { + if m != nil { + return m.Name + } + return nil +} + +func (m *LoadObjectReply) GetVolumeIndex() uint32 { + if m != nil { + return m.VolumeIndex + } + return 0 +} + +func (m *LoadObjectReply) GetOffset() uint64 { + if m != nil { + return m.Offset + } + return 0 +} + +type QuarantineObjectRequest struct { + Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + RepairTool bool `protobuf:"varint,2,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *QuarantineObjectRequest) Reset() { *m = QuarantineObjectRequest{} } +func (m *QuarantineObjectRequest) String() string { return proto.CompactTextString(m) } +func (*QuarantineObjectRequest) ProtoMessage() {} +func (*QuarantineObjectRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{18} +} + +func (m *QuarantineObjectRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_QuarantineObjectRequest.Unmarshal(m, b) +} +func (m *QuarantineObjectRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_QuarantineObjectRequest.Marshal(b, m, deterministic) +} +func (m *QuarantineObjectRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_QuarantineObjectRequest.Merge(m, src) +} +func (m *QuarantineObjectRequest) XXX_Size() int { + return xxx_messageInfo_QuarantineObjectRequest.Size(m) +} +func (m *QuarantineObjectRequest) XXX_DiscardUnknown() { + xxx_messageInfo_QuarantineObjectRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_QuarantineObjectRequest proto.InternalMessageInfo + +func (m *QuarantineObjectRequest) GetName() []byte { + if m != nil { + return m.Name + } + return nil +} + +func (m *QuarantineObjectRequest) GetRepairTool() bool { + if m != nil { + return m.RepairTool + } + return false +} + +type QuarantineObjectReply struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *QuarantineObjectReply) Reset() { *m = QuarantineObjectReply{} } +func (m *QuarantineObjectReply) String() string { return proto.CompactTextString(m) } +func (*QuarantineObjectReply) ProtoMessage() {} +func (*QuarantineObjectReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{19} +} + +func (m *QuarantineObjectReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_QuarantineObjectReply.Unmarshal(m, b) +} +func (m *QuarantineObjectReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_QuarantineObjectReply.Marshal(b, m, deterministic) +} +func (m *QuarantineObjectReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_QuarantineObjectReply.Merge(m, src) +} +func (m *QuarantineObjectReply) XXX_Size() int { + return xxx_messageInfo_QuarantineObjectReply.Size(m) +} +func (m *QuarantineObjectReply) XXX_DiscardUnknown() { + xxx_messageInfo_QuarantineObjectReply.DiscardUnknown(m) +} + +var xxx_messageInfo_QuarantineObjectReply proto.InternalMessageInfo + +type UnquarantineObjectRequest struct { + Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + RepairTool bool `protobuf:"varint,2,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *UnquarantineObjectRequest) Reset() { *m = UnquarantineObjectRequest{} } +func (m *UnquarantineObjectRequest) String() string { return proto.CompactTextString(m) } +func (*UnquarantineObjectRequest) ProtoMessage() {} +func (*UnquarantineObjectRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{20} +} + +func (m *UnquarantineObjectRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_UnquarantineObjectRequest.Unmarshal(m, b) +} +func (m *UnquarantineObjectRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_UnquarantineObjectRequest.Marshal(b, m, deterministic) +} +func (m *UnquarantineObjectRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_UnquarantineObjectRequest.Merge(m, src) +} +func (m *UnquarantineObjectRequest) XXX_Size() int { + return xxx_messageInfo_UnquarantineObjectRequest.Size(m) +} +func (m *UnquarantineObjectRequest) XXX_DiscardUnknown() { + xxx_messageInfo_UnquarantineObjectRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_UnquarantineObjectRequest proto.InternalMessageInfo + +func (m *UnquarantineObjectRequest) GetName() []byte { + if m != nil { + return m.Name + } + return nil +} + +func (m *UnquarantineObjectRequest) GetRepairTool() bool { + if m != nil { + return m.RepairTool + } + return false +} + +type UnquarantineObjectReply struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *UnquarantineObjectReply) Reset() { *m = UnquarantineObjectReply{} } +func (m *UnquarantineObjectReply) String() string { return proto.CompactTextString(m) } +func (*UnquarantineObjectReply) ProtoMessage() {} +func (*UnquarantineObjectReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{21} +} + +func (m *UnquarantineObjectReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_UnquarantineObjectReply.Unmarshal(m, b) +} +func (m *UnquarantineObjectReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_UnquarantineObjectReply.Marshal(b, m, deterministic) +} +func (m *UnquarantineObjectReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_UnquarantineObjectReply.Merge(m, src) +} +func (m *UnquarantineObjectReply) XXX_Size() int { + return xxx_messageInfo_UnquarantineObjectReply.Size(m) +} +func (m *UnquarantineObjectReply) XXX_DiscardUnknown() { + xxx_messageInfo_UnquarantineObjectReply.DiscardUnknown(m) +} + +var xxx_messageInfo_UnquarantineObjectReply proto.InternalMessageInfo + +type LoadObjectsByPrefixRequest struct { + Prefix []byte `protobuf:"bytes,1,opt,name=prefix,proto3" json:"prefix,omitempty"` + RepairTool bool `protobuf:"varint,2,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *LoadObjectsByPrefixRequest) Reset() { *m = LoadObjectsByPrefixRequest{} } +func (m *LoadObjectsByPrefixRequest) String() string { return proto.CompactTextString(m) } +func (*LoadObjectsByPrefixRequest) ProtoMessage() {} +func (*LoadObjectsByPrefixRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{22} +} + +func (m *LoadObjectsByPrefixRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_LoadObjectsByPrefixRequest.Unmarshal(m, b) +} +func (m *LoadObjectsByPrefixRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_LoadObjectsByPrefixRequest.Marshal(b, m, deterministic) +} +func (m *LoadObjectsByPrefixRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_LoadObjectsByPrefixRequest.Merge(m, src) +} +func (m *LoadObjectsByPrefixRequest) XXX_Size() int { + return xxx_messageInfo_LoadObjectsByPrefixRequest.Size(m) +} +func (m *LoadObjectsByPrefixRequest) XXX_DiscardUnknown() { + xxx_messageInfo_LoadObjectsByPrefixRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_LoadObjectsByPrefixRequest proto.InternalMessageInfo + +func (m *LoadObjectsByPrefixRequest) GetPrefix() []byte { + if m != nil { + return m.Prefix + } + return nil +} + +func (m *LoadObjectsByPrefixRequest) GetRepairTool() bool { + if m != nil { + return m.RepairTool + } + return false +} + +type LoadObjectsByPrefixReply struct { + Objects []*Object `protobuf:"bytes,1,rep,name=objects,proto3" json:"objects,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *LoadObjectsByPrefixReply) Reset() { *m = LoadObjectsByPrefixReply{} } +func (m *LoadObjectsByPrefixReply) String() string { return proto.CompactTextString(m) } +func (*LoadObjectsByPrefixReply) ProtoMessage() {} +func (*LoadObjectsByPrefixReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{23} +} + +func (m *LoadObjectsByPrefixReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_LoadObjectsByPrefixReply.Unmarshal(m, b) +} +func (m *LoadObjectsByPrefixReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_LoadObjectsByPrefixReply.Marshal(b, m, deterministic) +} +func (m *LoadObjectsByPrefixReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_LoadObjectsByPrefixReply.Merge(m, src) +} +func (m *LoadObjectsByPrefixReply) XXX_Size() int { + return xxx_messageInfo_LoadObjectsByPrefixReply.Size(m) +} +func (m *LoadObjectsByPrefixReply) XXX_DiscardUnknown() { + xxx_messageInfo_LoadObjectsByPrefixReply.DiscardUnknown(m) +} + +var xxx_messageInfo_LoadObjectsByPrefixReply proto.InternalMessageInfo + +func (m *LoadObjectsByPrefixReply) GetObjects() []*Object { + if m != nil { + return m.Objects + } + return nil +} + +type LoadObjectsByVolumeRequest struct { + Index uint32 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"` + Quarantined bool `protobuf:"varint,2,opt,name=quarantined,proto3" json:"quarantined,omitempty"` + PageToken []byte `protobuf:"bytes,3,opt,name=page_token,json=pageToken,proto3" json:"page_token,omitempty"` + PageSize uint32 `protobuf:"varint,4,opt,name=page_size,json=pageSize,proto3" json:"page_size,omitempty"` + RepairTool bool `protobuf:"varint,5,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *LoadObjectsByVolumeRequest) Reset() { *m = LoadObjectsByVolumeRequest{} } +func (m *LoadObjectsByVolumeRequest) String() string { return proto.CompactTextString(m) } +func (*LoadObjectsByVolumeRequest) ProtoMessage() {} +func (*LoadObjectsByVolumeRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{24} +} + +func (m *LoadObjectsByVolumeRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_LoadObjectsByVolumeRequest.Unmarshal(m, b) +} +func (m *LoadObjectsByVolumeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_LoadObjectsByVolumeRequest.Marshal(b, m, deterministic) +} +func (m *LoadObjectsByVolumeRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_LoadObjectsByVolumeRequest.Merge(m, src) +} +func (m *LoadObjectsByVolumeRequest) XXX_Size() int { + return xxx_messageInfo_LoadObjectsByVolumeRequest.Size(m) +} +func (m *LoadObjectsByVolumeRequest) XXX_DiscardUnknown() { + xxx_messageInfo_LoadObjectsByVolumeRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_LoadObjectsByVolumeRequest proto.InternalMessageInfo + +func (m *LoadObjectsByVolumeRequest) GetIndex() uint32 { + if m != nil { + return m.Index + } + return 0 +} + +func (m *LoadObjectsByVolumeRequest) GetQuarantined() bool { + if m != nil { + return m.Quarantined + } + return false +} + +func (m *LoadObjectsByVolumeRequest) GetPageToken() []byte { + if m != nil { + return m.PageToken + } + return nil +} + +func (m *LoadObjectsByVolumeRequest) GetPageSize() uint32 { + if m != nil { + return m.PageSize + } + return 0 +} + +func (m *LoadObjectsByVolumeRequest) GetRepairTool() bool { + if m != nil { + return m.RepairTool + } + return false +} + +type LoadObjectsByVolumeReply struct { + Objects []*Object `protobuf:"bytes,1,rep,name=objects,proto3" json:"objects,omitempty"` + NextPageToken []byte `protobuf:"bytes,2,opt,name=next_page_token,json=nextPageToken,proto3" json:"next_page_token,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *LoadObjectsByVolumeReply) Reset() { *m = LoadObjectsByVolumeReply{} } +func (m *LoadObjectsByVolumeReply) String() string { return proto.CompactTextString(m) } +func (*LoadObjectsByVolumeReply) ProtoMessage() {} +func (*LoadObjectsByVolumeReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{25} +} + +func (m *LoadObjectsByVolumeReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_LoadObjectsByVolumeReply.Unmarshal(m, b) +} +func (m *LoadObjectsByVolumeReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_LoadObjectsByVolumeReply.Marshal(b, m, deterministic) +} +func (m *LoadObjectsByVolumeReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_LoadObjectsByVolumeReply.Merge(m, src) +} +func (m *LoadObjectsByVolumeReply) XXX_Size() int { + return xxx_messageInfo_LoadObjectsByVolumeReply.Size(m) +} +func (m *LoadObjectsByVolumeReply) XXX_DiscardUnknown() { + xxx_messageInfo_LoadObjectsByVolumeReply.DiscardUnknown(m) +} + +var xxx_messageInfo_LoadObjectsByVolumeReply proto.InternalMessageInfo + +func (m *LoadObjectsByVolumeReply) GetObjects() []*Object { + if m != nil { + return m.Objects + } + return nil +} + +func (m *LoadObjectsByVolumeReply) GetNextPageToken() []byte { + if m != nil { + return m.NextPageToken + } + return nil +} + +type ListPartitionsRequest struct { + PartitionBits uint32 `protobuf:"varint,1,opt,name=partition_bits,json=partitionBits,proto3" json:"partition_bits,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ListPartitionsRequest) Reset() { *m = ListPartitionsRequest{} } +func (m *ListPartitionsRequest) String() string { return proto.CompactTextString(m) } +func (*ListPartitionsRequest) ProtoMessage() {} +func (*ListPartitionsRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{26} +} + +func (m *ListPartitionsRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ListPartitionsRequest.Unmarshal(m, b) +} +func (m *ListPartitionsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ListPartitionsRequest.Marshal(b, m, deterministic) +} +func (m *ListPartitionsRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ListPartitionsRequest.Merge(m, src) +} +func (m *ListPartitionsRequest) XXX_Size() int { + return xxx_messageInfo_ListPartitionsRequest.Size(m) +} +func (m *ListPartitionsRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ListPartitionsRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ListPartitionsRequest proto.InternalMessageInfo + +func (m *ListPartitionsRequest) GetPartitionBits() uint32 { + if m != nil { + return m.PartitionBits + } + return 0 +} + +type ListPartitionRequest struct { + Partition uint32 `protobuf:"varint,1,opt,name=partition,proto3" json:"partition,omitempty"` + PartitionBits uint32 `protobuf:"varint,2,opt,name=partition_bits,json=partitionBits,proto3" json:"partition_bits,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ListPartitionRequest) Reset() { *m = ListPartitionRequest{} } +func (m *ListPartitionRequest) String() string { return proto.CompactTextString(m) } +func (*ListPartitionRequest) ProtoMessage() {} +func (*ListPartitionRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{27} +} + +func (m *ListPartitionRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ListPartitionRequest.Unmarshal(m, b) +} +func (m *ListPartitionRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ListPartitionRequest.Marshal(b, m, deterministic) +} +func (m *ListPartitionRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ListPartitionRequest.Merge(m, src) +} +func (m *ListPartitionRequest) XXX_Size() int { + return xxx_messageInfo_ListPartitionRequest.Size(m) +} +func (m *ListPartitionRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ListPartitionRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ListPartitionRequest proto.InternalMessageInfo + +func (m *ListPartitionRequest) GetPartition() uint32 { + if m != nil { + return m.Partition + } + return 0 +} + +func (m *ListPartitionRequest) GetPartitionBits() uint32 { + if m != nil { + return m.PartitionBits + } + return 0 +} + +type ListSuffixRequest struct { + Partition uint32 `protobuf:"varint,1,opt,name=partition,proto3" json:"partition,omitempty"` + Suffix []byte `protobuf:"bytes,2,opt,name=suffix,proto3" json:"suffix,omitempty"` + PartitionBits uint32 `protobuf:"varint,3,opt,name=partition_bits,json=partitionBits,proto3" json:"partition_bits,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ListSuffixRequest) Reset() { *m = ListSuffixRequest{} } +func (m *ListSuffixRequest) String() string { return proto.CompactTextString(m) } +func (*ListSuffixRequest) ProtoMessage() {} +func (*ListSuffixRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{28} +} + +func (m *ListSuffixRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ListSuffixRequest.Unmarshal(m, b) +} +func (m *ListSuffixRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ListSuffixRequest.Marshal(b, m, deterministic) +} +func (m *ListSuffixRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ListSuffixRequest.Merge(m, src) +} +func (m *ListSuffixRequest) XXX_Size() int { + return xxx_messageInfo_ListSuffixRequest.Size(m) +} +func (m *ListSuffixRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ListSuffixRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ListSuffixRequest proto.InternalMessageInfo + +func (m *ListSuffixRequest) GetPartition() uint32 { + if m != nil { + return m.Partition + } + return 0 +} + +func (m *ListSuffixRequest) GetSuffix() []byte { + if m != nil { + return m.Suffix + } + return nil +} + +func (m *ListSuffixRequest) GetPartitionBits() uint32 { + if m != nil { + return m.PartitionBits + } + return 0 +} + +type ListQuarantinedOHashesRequest struct { + PageToken []byte `protobuf:"bytes,1,opt,name=page_token,json=pageToken,proto3" json:"page_token,omitempty"` + PageSize uint32 `protobuf:"varint,2,opt,name=page_size,json=pageSize,proto3" json:"page_size,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ListQuarantinedOHashesRequest) Reset() { *m = ListQuarantinedOHashesRequest{} } +func (m *ListQuarantinedOHashesRequest) String() string { return proto.CompactTextString(m) } +func (*ListQuarantinedOHashesRequest) ProtoMessage() {} +func (*ListQuarantinedOHashesRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{29} +} + +func (m *ListQuarantinedOHashesRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ListQuarantinedOHashesRequest.Unmarshal(m, b) +} +func (m *ListQuarantinedOHashesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ListQuarantinedOHashesRequest.Marshal(b, m, deterministic) +} +func (m *ListQuarantinedOHashesRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ListQuarantinedOHashesRequest.Merge(m, src) +} +func (m *ListQuarantinedOHashesRequest) XXX_Size() int { + return xxx_messageInfo_ListQuarantinedOHashesRequest.Size(m) +} +func (m *ListQuarantinedOHashesRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ListQuarantinedOHashesRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ListQuarantinedOHashesRequest proto.InternalMessageInfo + +func (m *ListQuarantinedOHashesRequest) GetPageToken() []byte { + if m != nil { + return m.PageToken + } + return nil +} + +func (m *ListQuarantinedOHashesRequest) GetPageSize() uint32 { + if m != nil { + return m.PageSize + } + return 0 +} + +type ListQuarantinedOHashesReply struct { + Objects []*QuarantinedObjectName `protobuf:"bytes,1,rep,name=objects,proto3" json:"objects,omitempty"` + NextPageToken []byte `protobuf:"bytes,2,opt,name=next_page_token,json=nextPageToken,proto3" json:"next_page_token,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ListQuarantinedOHashesReply) Reset() { *m = ListQuarantinedOHashesReply{} } +func (m *ListQuarantinedOHashesReply) String() string { return proto.CompactTextString(m) } +func (*ListQuarantinedOHashesReply) ProtoMessage() {} +func (*ListQuarantinedOHashesReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{30} +} + +func (m *ListQuarantinedOHashesReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ListQuarantinedOHashesReply.Unmarshal(m, b) +} +func (m *ListQuarantinedOHashesReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ListQuarantinedOHashesReply.Marshal(b, m, deterministic) +} +func (m *ListQuarantinedOHashesReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_ListQuarantinedOHashesReply.Merge(m, src) +} +func (m *ListQuarantinedOHashesReply) XXX_Size() int { + return xxx_messageInfo_ListQuarantinedOHashesReply.Size(m) +} +func (m *ListQuarantinedOHashesReply) XXX_DiscardUnknown() { + xxx_messageInfo_ListQuarantinedOHashesReply.DiscardUnknown(m) +} + +var xxx_messageInfo_ListQuarantinedOHashesReply proto.InternalMessageInfo + +func (m *ListQuarantinedOHashesReply) GetObjects() []*QuarantinedObjectName { + if m != nil { + return m.Objects + } + return nil +} + +func (m *ListQuarantinedOHashesReply) GetNextPageToken() []byte { + if m != nil { + return m.NextPageToken + } + return nil +} + +type ListQuarantinedOHashRequest struct { + Prefix []byte `protobuf:"bytes,1,opt,name=prefix,proto3" json:"prefix,omitempty"` + RepairTool bool `protobuf:"varint,2,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ListQuarantinedOHashRequest) Reset() { *m = ListQuarantinedOHashRequest{} } +func (m *ListQuarantinedOHashRequest) String() string { return proto.CompactTextString(m) } +func (*ListQuarantinedOHashRequest) ProtoMessage() {} +func (*ListQuarantinedOHashRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{31} +} + +func (m *ListQuarantinedOHashRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ListQuarantinedOHashRequest.Unmarshal(m, b) +} +func (m *ListQuarantinedOHashRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ListQuarantinedOHashRequest.Marshal(b, m, deterministic) +} +func (m *ListQuarantinedOHashRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ListQuarantinedOHashRequest.Merge(m, src) +} +func (m *ListQuarantinedOHashRequest) XXX_Size() int { + return xxx_messageInfo_ListQuarantinedOHashRequest.Size(m) +} +func (m *ListQuarantinedOHashRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ListQuarantinedOHashRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ListQuarantinedOHashRequest proto.InternalMessageInfo + +func (m *ListQuarantinedOHashRequest) GetPrefix() []byte { + if m != nil { + return m.Prefix + } + return nil +} + +func (m *ListQuarantinedOHashRequest) GetRepairTool() bool { + if m != nil { + return m.RepairTool + } + return false +} + +type ListQuarantinedOHashReply struct { + Objects []*Object `protobuf:"bytes,1,rep,name=objects,proto3" json:"objects,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ListQuarantinedOHashReply) Reset() { *m = ListQuarantinedOHashReply{} } +func (m *ListQuarantinedOHashReply) String() string { return proto.CompactTextString(m) } +func (*ListQuarantinedOHashReply) ProtoMessage() {} +func (*ListQuarantinedOHashReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{32} +} + +func (m *ListQuarantinedOHashReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ListQuarantinedOHashReply.Unmarshal(m, b) +} +func (m *ListQuarantinedOHashReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ListQuarantinedOHashReply.Marshal(b, m, deterministic) +} +func (m *ListQuarantinedOHashReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_ListQuarantinedOHashReply.Merge(m, src) +} +func (m *ListQuarantinedOHashReply) XXX_Size() int { + return xxx_messageInfo_ListQuarantinedOHashReply.Size(m) +} +func (m *ListQuarantinedOHashReply) XXX_DiscardUnknown() { + xxx_messageInfo_ListQuarantinedOHashReply.DiscardUnknown(m) +} + +var xxx_messageInfo_ListQuarantinedOHashReply proto.InternalMessageInfo + +func (m *ListQuarantinedOHashReply) GetObjects() []*Object { + if m != nil { + return m.Objects + } + return nil +} + +type GetNextOffsetRequest struct { + VolumeIndex uint32 `protobuf:"varint,1,opt,name=volume_index,json=volumeIndex,proto3" json:"volume_index,omitempty"` + RepairTool bool `protobuf:"varint,2,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *GetNextOffsetRequest) Reset() { *m = GetNextOffsetRequest{} } +func (m *GetNextOffsetRequest) String() string { return proto.CompactTextString(m) } +func (*GetNextOffsetRequest) ProtoMessage() {} +func (*GetNextOffsetRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{33} +} + +func (m *GetNextOffsetRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_GetNextOffsetRequest.Unmarshal(m, b) +} +func (m *GetNextOffsetRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_GetNextOffsetRequest.Marshal(b, m, deterministic) +} +func (m *GetNextOffsetRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetNextOffsetRequest.Merge(m, src) +} +func (m *GetNextOffsetRequest) XXX_Size() int { + return xxx_messageInfo_GetNextOffsetRequest.Size(m) +} +func (m *GetNextOffsetRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GetNextOffsetRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GetNextOffsetRequest proto.InternalMessageInfo + +func (m *GetNextOffsetRequest) GetVolumeIndex() uint32 { + if m != nil { + return m.VolumeIndex + } + return 0 +} + +func (m *GetNextOffsetRequest) GetRepairTool() bool { + if m != nil { + return m.RepairTool + } + return false +} + +type GetNextOffsetReply struct { + Offset uint64 `protobuf:"varint,1,opt,name=offset,proto3" json:"offset,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *GetNextOffsetReply) Reset() { *m = GetNextOffsetReply{} } +func (m *GetNextOffsetReply) String() string { return proto.CompactTextString(m) } +func (*GetNextOffsetReply) ProtoMessage() {} +func (*GetNextOffsetReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{34} +} + +func (m *GetNextOffsetReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_GetNextOffsetReply.Unmarshal(m, b) +} +func (m *GetNextOffsetReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_GetNextOffsetReply.Marshal(b, m, deterministic) +} +func (m *GetNextOffsetReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetNextOffsetReply.Merge(m, src) +} +func (m *GetNextOffsetReply) XXX_Size() int { + return xxx_messageInfo_GetNextOffsetReply.Size(m) +} +func (m *GetNextOffsetReply) XXX_DiscardUnknown() { + xxx_messageInfo_GetNextOffsetReply.DiscardUnknown(m) +} + +var xxx_messageInfo_GetNextOffsetReply proto.InternalMessageInfo + +func (m *GetNextOffsetReply) GetOffset() uint64 { + if m != nil { + return m.Offset + } + return 0 +} + +type GetStatsRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *GetStatsRequest) Reset() { *m = GetStatsRequest{} } +func (m *GetStatsRequest) String() string { return proto.CompactTextString(m) } +func (*GetStatsRequest) ProtoMessage() {} +func (*GetStatsRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{35} +} + +func (m *GetStatsRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_GetStatsRequest.Unmarshal(m, b) +} +func (m *GetStatsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_GetStatsRequest.Marshal(b, m, deterministic) +} +func (m *GetStatsRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetStatsRequest.Merge(m, src) +} +func (m *GetStatsRequest) XXX_Size() int { + return xxx_messageInfo_GetStatsRequest.Size(m) +} +func (m *GetStatsRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GetStatsRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GetStatsRequest proto.InternalMessageInfo + +type GetStatsReply struct { + Stats map[string]uint64 `protobuf:"bytes,1,rep,name=stats,proto3" json:"stats,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *GetStatsReply) Reset() { *m = GetStatsReply{} } +func (m *GetStatsReply) String() string { return proto.CompactTextString(m) } +func (*GetStatsReply) ProtoMessage() {} +func (*GetStatsReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{36} +} + +func (m *GetStatsReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_GetStatsReply.Unmarshal(m, b) +} +func (m *GetStatsReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_GetStatsReply.Marshal(b, m, deterministic) +} +func (m *GetStatsReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetStatsReply.Merge(m, src) +} +func (m *GetStatsReply) XXX_Size() int { + return xxx_messageInfo_GetStatsReply.Size(m) +} +func (m *GetStatsReply) XXX_DiscardUnknown() { + xxx_messageInfo_GetStatsReply.DiscardUnknown(m) +} + +var xxx_messageInfo_GetStatsReply proto.InternalMessageInfo + +func (m *GetStatsReply) GetStats() map[string]uint64 { + if m != nil { + return m.Stats + } + return nil +} + +type SetKvStateReply struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SetKvStateReply) Reset() { *m = SetKvStateReply{} } +func (m *SetKvStateReply) String() string { return proto.CompactTextString(m) } +func (*SetKvStateReply) ProtoMessage() {} +func (*SetKvStateReply) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{37} +} + +func (m *SetKvStateReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SetKvStateReply.Unmarshal(m, b) +} +func (m *SetKvStateReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SetKvStateReply.Marshal(b, m, deterministic) +} +func (m *SetKvStateReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_SetKvStateReply.Merge(m, src) +} +func (m *SetKvStateReply) XXX_Size() int { + return xxx_messageInfo_SetKvStateReply.Size(m) +} +func (m *SetKvStateReply) XXX_DiscardUnknown() { + xxx_messageInfo_SetKvStateReply.DiscardUnknown(m) +} + +var xxx_messageInfo_SetKvStateReply proto.InternalMessageInfo + +type GetKvStateRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *GetKvStateRequest) Reset() { *m = GetKvStateRequest{} } +func (m *GetKvStateRequest) String() string { return proto.CompactTextString(m) } +func (*GetKvStateRequest) ProtoMessage() {} +func (*GetKvStateRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{38} +} + +func (m *GetKvStateRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_GetKvStateRequest.Unmarshal(m, b) +} +func (m *GetKvStateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_GetKvStateRequest.Marshal(b, m, deterministic) +} +func (m *GetKvStateRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetKvStateRequest.Merge(m, src) +} +func (m *GetKvStateRequest) XXX_Size() int { + return xxx_messageInfo_GetKvStateRequest.Size(m) +} +func (m *GetKvStateRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GetKvStateRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GetKvStateRequest proto.InternalMessageInfo + +type KvState struct { + IsClean bool `protobuf:"varint,1,opt,name=isClean,proto3" json:"isClean,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *KvState) Reset() { *m = KvState{} } +func (m *KvState) String() string { return proto.CompactTextString(m) } +func (*KvState) ProtoMessage() {} +func (*KvState) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{39} +} + +func (m *KvState) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_KvState.Unmarshal(m, b) +} +func (m *KvState) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_KvState.Marshal(b, m, deterministic) +} +func (m *KvState) XXX_Merge(src proto.Message) { + xxx_messageInfo_KvState.Merge(m, src) +} +func (m *KvState) XXX_Size() int { + return xxx_messageInfo_KvState.Size(m) +} +func (m *KvState) XXX_DiscardUnknown() { + xxx_messageInfo_KvState.DiscardUnknown(m) +} + +var xxx_messageInfo_KvState proto.InternalMessageInfo + +func (m *KvState) GetIsClean() bool { + if m != nil { + return m.IsClean + } + return false +} + +// Generic messages +type Volume struct { + VolumeIndex uint32 `protobuf:"varint,1,opt,name=volume_index,json=volumeIndex,proto3" json:"volume_index,omitempty"` + VolumeType VolumeType `protobuf:"varint,2,opt,name=volume_type,json=volumeType,proto3,enum=filemgr.VolumeType" json:"volume_type,omitempty"` + VolumeState uint32 `protobuf:"varint,3,opt,name=volume_state,json=volumeState,proto3" json:"volume_state,omitempty"` + Partition uint32 `protobuf:"varint,4,opt,name=partition,proto3" json:"partition,omitempty"` + NextOffset uint64 `protobuf:"varint,5,opt,name=next_offset,json=nextOffset,proto3" json:"next_offset,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Volume) Reset() { *m = Volume{} } +func (m *Volume) String() string { return proto.CompactTextString(m) } +func (*Volume) ProtoMessage() {} +func (*Volume) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{40} +} + +func (m *Volume) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Volume.Unmarshal(m, b) +} +func (m *Volume) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Volume.Marshal(b, m, deterministic) +} +func (m *Volume) XXX_Merge(src proto.Message) { + xxx_messageInfo_Volume.Merge(m, src) +} +func (m *Volume) XXX_Size() int { + return xxx_messageInfo_Volume.Size(m) +} +func (m *Volume) XXX_DiscardUnknown() { + xxx_messageInfo_Volume.DiscardUnknown(m) +} + +var xxx_messageInfo_Volume proto.InternalMessageInfo + +func (m *Volume) GetVolumeIndex() uint32 { + if m != nil { + return m.VolumeIndex + } + return 0 +} + +func (m *Volume) GetVolumeType() VolumeType { + if m != nil { + return m.VolumeType + } + return VolumeType_VOLUME_DEFAULT +} + +func (m *Volume) GetVolumeState() uint32 { + if m != nil { + return m.VolumeState + } + return 0 +} + +func (m *Volume) GetPartition() uint32 { + if m != nil { + return m.Partition + } + return 0 +} + +func (m *Volume) GetNextOffset() uint64 { + if m != nil { + return m.NextOffset + } + return 0 +} + +type Object struct { + Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + VolumeIndex uint32 `protobuf:"varint,2,opt,name=volume_index,json=volumeIndex,proto3" json:"volume_index,omitempty"` + Offset uint64 `protobuf:"varint,3,opt,name=offset,proto3" json:"offset,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Object) Reset() { *m = Object{} } +func (m *Object) String() string { return proto.CompactTextString(m) } +func (*Object) ProtoMessage() {} +func (*Object) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{41} +} + +func (m *Object) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Object.Unmarshal(m, b) +} +func (m *Object) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Object.Marshal(b, m, deterministic) +} +func (m *Object) XXX_Merge(src proto.Message) { + xxx_messageInfo_Object.Merge(m, src) +} +func (m *Object) XXX_Size() int { + return xxx_messageInfo_Object.Size(m) +} +func (m *Object) XXX_DiscardUnknown() { + xxx_messageInfo_Object.DiscardUnknown(m) +} + +var xxx_messageInfo_Object proto.InternalMessageInfo + +func (m *Object) GetName() []byte { + if m != nil { + return m.Name + } + return nil +} + +func (m *Object) GetVolumeIndex() uint32 { + if m != nil { + return m.VolumeIndex + } + return 0 +} + +func (m *Object) GetOffset() uint64 { + if m != nil { + return m.Offset + } + return 0 +} + +type QuarantinedObjectName struct { + Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *QuarantinedObjectName) Reset() { *m = QuarantinedObjectName{} } +func (m *QuarantinedObjectName) String() string { return proto.CompactTextString(m) } +func (*QuarantinedObjectName) ProtoMessage() {} +func (*QuarantinedObjectName) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{42} +} + +func (m *QuarantinedObjectName) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_QuarantinedObjectName.Unmarshal(m, b) +} +func (m *QuarantinedObjectName) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_QuarantinedObjectName.Marshal(b, m, deterministic) +} +func (m *QuarantinedObjectName) XXX_Merge(src proto.Message) { + xxx_messageInfo_QuarantinedObjectName.Merge(m, src) +} +func (m *QuarantinedObjectName) XXX_Size() int { + return xxx_messageInfo_QuarantinedObjectName.Size(m) +} +func (m *QuarantinedObjectName) XXX_DiscardUnknown() { + xxx_messageInfo_QuarantinedObjectName.DiscardUnknown(m) +} + +var xxx_messageInfo_QuarantinedObjectName proto.InternalMessageInfo + +func (m *QuarantinedObjectName) GetName() []byte { + if m != nil { + return m.Name + } + return nil +} + +// For listdir() like functions +type DirEntries struct { + Entry []string `protobuf:"bytes,1,rep,name=entry,proto3" json:"entry,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DirEntries) Reset() { *m = DirEntries{} } +func (m *DirEntries) String() string { return proto.CompactTextString(m) } +func (*DirEntries) ProtoMessage() {} +func (*DirEntries) Descriptor() ([]byte, []int) { + return fileDescriptor_1fcd0776e05e82a6, []int{43} +} + +func (m *DirEntries) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DirEntries.Unmarshal(m, b) +} +func (m *DirEntries) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DirEntries.Marshal(b, m, deterministic) +} +func (m *DirEntries) XXX_Merge(src proto.Message) { + xxx_messageInfo_DirEntries.Merge(m, src) +} +func (m *DirEntries) XXX_Size() int { + return xxx_messageInfo_DirEntries.Size(m) +} +func (m *DirEntries) XXX_DiscardUnknown() { + xxx_messageInfo_DirEntries.DiscardUnknown(m) +} + +var xxx_messageInfo_DirEntries proto.InternalMessageInfo + +func (m *DirEntries) GetEntry() []string { + if m != nil { + return m.Entry + } + return nil +} + +func init() { + proto.RegisterEnum("filemgr.VolumeType", VolumeType_name, VolumeType_value) + proto.RegisterEnum("filemgr.VolumeState", VolumeState_name, VolumeState_value) + proto.RegisterType((*RegisterVolumeRequest)(nil), "filemgr.RegisterVolumeRequest") + proto.RegisterType((*RegisterVolumeReply)(nil), "filemgr.RegisterVolumeReply") + proto.RegisterType((*UnregisterVolumeRequest)(nil), "filemgr.UnregisterVolumeRequest") + proto.RegisterType((*UnregisterVolumeReply)(nil), "filemgr.UnregisterVolumeReply") + proto.RegisterType((*UpdateVolumeStateRequest)(nil), "filemgr.UpdateVolumeStateRequest") + proto.RegisterType((*UpdateVolumeStateReply)(nil), "filemgr.UpdateVolumeStateReply") + proto.RegisterType((*GetVolumeRequest)(nil), "filemgr.GetVolumeRequest") + proto.RegisterType((*GetVolumeReply)(nil), "filemgr.GetVolumeReply") + proto.RegisterType((*ListVolumesRequest)(nil), "filemgr.ListVolumesRequest") + proto.RegisterType((*ListVolumesReply)(nil), "filemgr.ListVolumesReply") + proto.RegisterType((*RegisterObjectRequest)(nil), "filemgr.RegisterObjectRequest") + proto.RegisterType((*RegisterObjectReply)(nil), "filemgr.RegisterObjectReply") + proto.RegisterType((*UnregisterObjectRequest)(nil), "filemgr.UnregisterObjectRequest") + proto.RegisterType((*UnregisterObjectReply)(nil), "filemgr.UnregisterObjectReply") + proto.RegisterType((*RenameObjectRequest)(nil), "filemgr.RenameObjectRequest") + proto.RegisterType((*RenameObjectReply)(nil), "filemgr.RenameObjectReply") + proto.RegisterType((*LoadObjectRequest)(nil), "filemgr.LoadObjectRequest") + proto.RegisterType((*LoadObjectReply)(nil), "filemgr.LoadObjectReply") + proto.RegisterType((*QuarantineObjectRequest)(nil), "filemgr.QuarantineObjectRequest") + proto.RegisterType((*QuarantineObjectReply)(nil), "filemgr.QuarantineObjectReply") + proto.RegisterType((*UnquarantineObjectRequest)(nil), "filemgr.UnquarantineObjectRequest") + proto.RegisterType((*UnquarantineObjectReply)(nil), "filemgr.UnquarantineObjectReply") + proto.RegisterType((*LoadObjectsByPrefixRequest)(nil), "filemgr.LoadObjectsByPrefixRequest") + proto.RegisterType((*LoadObjectsByPrefixReply)(nil), "filemgr.LoadObjectsByPrefixReply") + proto.RegisterType((*LoadObjectsByVolumeRequest)(nil), "filemgr.LoadObjectsByVolumeRequest") + proto.RegisterType((*LoadObjectsByVolumeReply)(nil), "filemgr.LoadObjectsByVolumeReply") + proto.RegisterType((*ListPartitionsRequest)(nil), "filemgr.ListPartitionsRequest") + proto.RegisterType((*ListPartitionRequest)(nil), "filemgr.ListPartitionRequest") + proto.RegisterType((*ListSuffixRequest)(nil), "filemgr.ListSuffixRequest") + proto.RegisterType((*ListQuarantinedOHashesRequest)(nil), "filemgr.ListQuarantinedOHashesRequest") + proto.RegisterType((*ListQuarantinedOHashesReply)(nil), "filemgr.ListQuarantinedOHashesReply") + proto.RegisterType((*ListQuarantinedOHashRequest)(nil), "filemgr.ListQuarantinedOHashRequest") + proto.RegisterType((*ListQuarantinedOHashReply)(nil), "filemgr.ListQuarantinedOHashReply") + proto.RegisterType((*GetNextOffsetRequest)(nil), "filemgr.GetNextOffsetRequest") + proto.RegisterType((*GetNextOffsetReply)(nil), "filemgr.GetNextOffsetReply") + proto.RegisterType((*GetStatsRequest)(nil), "filemgr.GetStatsRequest") + proto.RegisterType((*GetStatsReply)(nil), "filemgr.GetStatsReply") + proto.RegisterMapType((map[string]uint64)(nil), "filemgr.GetStatsReply.StatsEntry") + proto.RegisterType((*SetKvStateReply)(nil), "filemgr.SetKvStateReply") + proto.RegisterType((*GetKvStateRequest)(nil), "filemgr.GetKvStateRequest") + proto.RegisterType((*KvState)(nil), "filemgr.KvState") + proto.RegisterType((*Volume)(nil), "filemgr.Volume") + proto.RegisterType((*Object)(nil), "filemgr.Object") + proto.RegisterType((*QuarantinedObjectName)(nil), "filemgr.QuarantinedObjectName") + proto.RegisterType((*DirEntries)(nil), "filemgr.DirEntries") +} + +func init() { + proto.RegisterFile("fmgr.proto", fileDescriptor_1fcd0776e05e82a6) +} + +var fileDescriptor_1fcd0776e05e82a6 = []byte{ + // 1083 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x57, 0xdd, 0x6e, 0xe3, 0x44, + 0x1b, 0x5e, 0x3b, 0x69, 0xda, 0xbe, 0xe9, 0x8f, 0x33, 0x4d, 0x53, 0x77, 0xfb, 0xed, 0xb7, 0x59, + 0xa3, 0x85, 0x50, 0x50, 0x0f, 0x16, 0x24, 0x2a, 0x24, 0x90, 0xd2, 0xd6, 0x1b, 0x2a, 0xd2, 0x24, + 0x38, 0x4e, 0x17, 0xc1, 0x81, 0x71, 0xe9, 0x24, 0x98, 0x4d, 0x6c, 0xd7, 0x9e, 0x74, 0x9b, 0x15, + 0x12, 0x12, 0x87, 0x5c, 0x09, 0xa7, 0x5c, 0x03, 0x87, 0xdc, 0x07, 0xd7, 0x81, 0x66, 0xc6, 0x4e, + 0xfc, 0x47, 0xd3, 0x88, 0x9c, 0x70, 0xe6, 0xf7, 0xb1, 0xf3, 0x3e, 0xcf, 0xfb, 0x3b, 0x13, 0x80, + 0xfe, 0x68, 0xe0, 0x1d, 0xb9, 0x9e, 0x43, 0x1c, 0xb4, 0xda, 0xb7, 0x86, 0x78, 0x34, 0xf0, 0x94, + 0xbf, 0x04, 0xd8, 0xd5, 0xf0, 0xc0, 0xf2, 0x09, 0xf6, 0x2e, 0x9d, 0xe1, 0x78, 0x84, 0x35, 0x7c, + 0x33, 0xc6, 0x3e, 0x41, 0xff, 0x83, 0x75, 0xd7, 0xf4, 0x88, 0x45, 0x2c, 0xc7, 0x96, 0x85, 0xaa, + 0x50, 0xdb, 0xd4, 0x66, 0x00, 0x7a, 0x0f, 0xf2, 0x64, 0xe2, 0x62, 0x59, 0xac, 0x0a, 0xb5, 0xad, + 0x17, 0x3b, 0x47, 0x81, 0xbf, 0x23, 0xee, 0x43, 0x9f, 0xb8, 0x58, 0x63, 0x1f, 0xa0, 0x67, 0xb0, + 0x71, 0xcb, 0x30, 0xc3, 0xb2, 0xaf, 0xf1, 0x9d, 0x9c, 0x63, 0x9e, 0x8a, 0x1c, 0x3b, 0xa7, 0x10, + 0xaa, 0x40, 0xc1, 0xe9, 0xf7, 0x7d, 0x4c, 0xe4, 0x7c, 0x55, 0xa8, 0xe5, 0xb5, 0xc0, 0x42, 0x87, + 0xb0, 0xe2, 0x13, 0x93, 0x60, 0x79, 0x85, 0x91, 0x94, 0x13, 0x24, 0x5d, 0xfa, 0x4e, 0xe3, 0x9f, + 0xa0, 0xa7, 0x50, 0xf4, 0xb0, 0x6b, 0x5a, 0x9e, 0x41, 0x1c, 0x67, 0x28, 0x17, 0xaa, 0x42, 0x6d, + 0x4d, 0x03, 0x0e, 0xe9, 0x8e, 0x33, 0x54, 0x76, 0x61, 0x27, 0x19, 0xa7, 0x3b, 0x9c, 0x28, 0x1d, + 0xd8, 0xeb, 0xd9, 0x5e, 0x66, 0x02, 0xca, 0xb0, 0xc2, 0x25, 0xf3, 0xe0, 0xb9, 0x91, 0x24, 0x12, + 0x53, 0x44, 0x7b, 0xb0, 0x9b, 0xf6, 0x48, 0xa9, 0x7e, 0x15, 0x40, 0xee, 0xb9, 0xd7, 0x26, 0xc1, + 0x51, 0xfd, 0x01, 0x59, 0x32, 0x4d, 0x42, 0x3a, 0x4d, 0xd3, 0x74, 0x88, 0x0b, 0xa7, 0x23, 0x97, + 0x52, 0x29, 0x43, 0x25, 0x43, 0x0b, 0x95, 0x79, 0x0e, 0x52, 0x03, 0x93, 0xa5, 0xa4, 0xe2, 0x4f, + 0x01, 0xb6, 0x22, 0xbe, 0xdc, 0xe1, 0xe4, 0x21, 0x71, 0x7e, 0x0c, 0x81, 0x69, 0xcc, 0xeb, 0x30, + 0xb8, 0x9d, 0x3e, 0x47, 0x1c, 0xf3, 0x24, 0xc5, 0xfa, 0x8c, 0x85, 0x17, 0xef, 0xe8, 0x7c, 0xb2, + 0xa3, 0x9f, 0x42, 0xd1, 0xc6, 0x77, 0xc4, 0x08, 0x5a, 0x71, 0x85, 0xb5, 0x22, 0x50, 0xa8, 0xcd, + 0x10, 0xe5, 0x27, 0x40, 0x4d, 0xcb, 0x0f, 0xa2, 0xf1, 0x97, 0x3c, 0x26, 0x73, 0x0b, 0xf6, 0x19, + 0x48, 0x31, 0x76, 0x9a, 0xcc, 0xf7, 0x61, 0x95, 0xc7, 0xe7, 0xcb, 0x42, 0x35, 0x57, 0x2b, 0xbe, + 0xd8, 0x4e, 0x10, 0x68, 0xe1, 0x7b, 0xe5, 0xb7, 0xc8, 0x9c, 0xb7, 0xaf, 0x7e, 0xc4, 0xdf, 0x93, + 0x30, 0x00, 0x04, 0x79, 0xdb, 0x1c, 0x61, 0xa6, 0x7d, 0x43, 0x63, 0xcf, 0xa9, 0x2a, 0x89, 0xf7, + 0x0d, 0x6d, 0x2e, 0x36, 0xb4, 0x89, 0x34, 0xe6, 0x93, 0x69, 0x4c, 0x46, 0xba, 0x72, 0xdf, 0xa4, + 0x86, 0x4a, 0x69, 0x5f, 0xb6, 0xa2, 0x93, 0x3a, 0x3f, 0x84, 0xc5, 0xe6, 0x34, 0x4a, 0x84, 0x29, + 0x3f, 0xf5, 0x31, 0x9f, 0x64, 0x1f, 0xd6, 0x6c, 0xfc, 0xc6, 0x60, 0xb8, 0xc8, 0xf0, 0x55, 0x1b, + 0xbf, 0x69, 0x65, 0xf0, 0xa7, 0x0b, 0xba, 0x03, 0xa5, 0x38, 0x0d, 0xe5, 0x76, 0xa0, 0xd4, 0x74, + 0xcc, 0xeb, 0xf9, 0xcc, 0xcf, 0x61, 0xcb, 0xf2, 0x8d, 0x9b, 0xb1, 0xe9, 0x99, 0x36, 0xb1, 0x6c, + 0x7c, 0x1d, 0x44, 0xb8, 0x69, 0xf9, 0x5f, 0xcd, 0xc0, 0xf9, 0x2a, 0xbe, 0x83, 0xed, 0x28, 0x21, + 0xed, 0xaa, 0xe5, 0x36, 0x04, 0xad, 0xdb, 0x4c, 0xd1, 0x72, 0xea, 0x96, 0xf6, 0xc7, 0x57, 0xf9, + 0x7e, 0xcf, 0xbe, 0x59, 0x26, 0xd5, 0x3e, 0x6d, 0xb9, 0x9b, 0x4c, 0xb2, 0x1e, 0x3c, 0x9e, 0xe5, + 0xcd, 0x3f, 0x99, 0x74, 0x3c, 0xdc, 0xb7, 0xee, 0x42, 0xb6, 0x0a, 0x14, 0x5c, 0x06, 0x04, 0x7c, + 0x81, 0x35, 0x9f, 0x51, 0x05, 0x39, 0xd3, 0x6d, 0x30, 0xed, 0x0e, 0xc7, 0x53, 0xd3, 0x1e, 0x28, + 0x0b, 0xdf, 0x2b, 0xbf, 0x0b, 0x09, 0x79, 0x0f, 0x59, 0xe7, 0x55, 0x28, 0xa6, 0xfb, 0x29, 0x0a, + 0xa1, 0x27, 0x00, 0xae, 0x39, 0xc0, 0x06, 0x71, 0x5e, 0x63, 0x9b, 0x95, 0x79, 0x83, 0x2e, 0xbb, + 0x01, 0xd6, 0x29, 0x80, 0x0e, 0x80, 0x19, 0x86, 0x6f, 0xbd, 0xc5, 0xc1, 0x7e, 0x5d, 0xa3, 0x40, + 0xd7, 0x7a, 0x8b, 0xe7, 0x8f, 0xfd, 0x28, 0x11, 0x7a, 0xf4, 0xd4, 0x78, 0x78, 0xe8, 0xe8, 0x5d, + 0xd8, 0x66, 0xfb, 0x27, 0x22, 0x94, 0x4f, 0xe6, 0x26, 0x85, 0x3b, 0xa1, 0x58, 0xe5, 0x73, 0xd8, + 0xa5, 0xfb, 0xb4, 0x13, 0xae, 0xea, 0xe9, 0x42, 0x7f, 0x0e, 0x5b, 0xd3, 0xfd, 0x6d, 0x5c, 0x59, + 0x8c, 0x92, 0x86, 0xb2, 0x39, 0x45, 0x4f, 0x2c, 0xe2, 0x2b, 0xdf, 0x42, 0x39, 0xf6, 0xfb, 0x87, + 0x9d, 0x07, 0x69, 0xe7, 0x62, 0x96, 0x73, 0x17, 0x4a, 0xd4, 0x79, 0x77, 0xdc, 0x8f, 0x34, 0xd5, + 0xfd, 0x9e, 0x2b, 0x50, 0xf0, 0xd9, 0xe7, 0x41, 0xb8, 0x81, 0x95, 0xc1, 0x98, 0xcb, 0x0e, 0xe7, + 0x09, 0x65, 0x8c, 0xec, 0x8e, 0xf6, 0x17, 0xa6, 0xff, 0xc3, 0xec, 0x9c, 0x8b, 0xd7, 0x5e, 0xb8, + 0xb7, 0xf6, 0x62, 0xbc, 0xf6, 0xca, 0xcf, 0x70, 0xf0, 0x4f, 0xce, 0x69, 0x75, 0x8f, 0x93, 0xd5, + 0xfd, 0xff, 0xb4, 0xba, 0xd1, 0x9f, 0xb0, 0x4f, 0xe8, 0x6e, 0x5d, 0xbc, 0xd8, 0x97, 0xd9, 0x02, + 0xfe, 0xf5, 0xb8, 0xbe, 0x84, 0xfd, 0x6c, 0xbf, 0x0b, 0xce, 0xeb, 0x37, 0x50, 0x6e, 0x60, 0xd2, + 0x9a, 0x1e, 0x92, 0x0b, 0xdc, 0x0a, 0xe7, 0x6a, 0xfc, 0x10, 0x50, 0xc2, 0x37, 0x15, 0x37, 0xdb, + 0xd6, 0x42, 0x6c, 0x5b, 0x97, 0x60, 0xbb, 0x81, 0x09, 0xbd, 0x2f, 0x85, 0x95, 0x57, 0x7e, 0x11, + 0x60, 0x73, 0x86, 0xd1, 0x1f, 0x7f, 0xc2, 0x6f, 0xa2, 0x61, 0x5c, 0xcf, 0xa6, 0x71, 0xc5, 0x3e, + 0x3b, 0x62, 0x8f, 0xaa, 0x4d, 0xbc, 0x09, 0xbf, 0x96, 0xfa, 0x8f, 0x8f, 0x01, 0x66, 0x20, 0x92, + 0x20, 0xf7, 0x1a, 0x4f, 0x98, 0x80, 0x75, 0x8d, 0x3e, 0xd2, 0xc5, 0x74, 0x6b, 0x0e, 0xc7, 0xbc, + 0x83, 0xf2, 0x1a, 0x37, 0x3e, 0x15, 0x8f, 0x05, 0xaa, 0xab, 0x8b, 0xc9, 0x97, 0xb7, 0x91, 0x8b, + 0xea, 0x0e, 0x94, 0x1a, 0x11, 0x88, 0x8b, 0x7d, 0x07, 0x56, 0x03, 0x04, 0xc9, 0xb0, 0x6a, 0xf9, + 0xa7, 0x43, 0x6c, 0xf2, 0x76, 0x5d, 0xd3, 0x42, 0x53, 0xf9, 0x43, 0x80, 0x02, 0x5f, 0x2f, 0xff, + 0xe5, 0xfb, 0xe8, 0x2b, 0x28, 0xf0, 0x3e, 0x5a, 0xf6, 0x89, 0xfd, 0x41, 0xf4, 0x84, 0x8d, 0xcc, + 0x5d, 0x16, 0x8f, 0xa2, 0x00, 0x9c, 0x59, 0x1e, 0x2d, 0xa8, 0x85, 0x7d, 0x5a, 0x40, 0x4c, 0x6b, + 0xcb, 0x3a, 0x63, 0x5d, 0xe3, 0xc6, 0x61, 0x0b, 0x60, 0x96, 0x25, 0x84, 0x60, 0xeb, 0xb2, 0xdd, + 0xec, 0x5d, 0xa8, 0xc6, 0x99, 0xfa, 0xb2, 0xde, 0x6b, 0xea, 0xd2, 0x23, 0x54, 0x06, 0x29, 0xc0, + 0xf4, 0xf6, 0xc5, 0x49, 0x57, 0x6f, 0xb7, 0x54, 0x49, 0x40, 0x15, 0x40, 0x01, 0xfa, 0xb5, 0x71, + 0xa6, 0x36, 0x55, 0x5d, 0x35, 0xea, 0xba, 0x24, 0x1e, 0x6a, 0x50, 0x8c, 0xfc, 0x6d, 0x41, 0x1b, + 0xb0, 0xd6, 0xd5, 0xeb, 0xba, 0x6a, 0x68, 0xaf, 0xa4, 0x47, 0x48, 0x86, 0x32, 0xb7, 0x4e, 0xdb, + 0x17, 0x9d, 0xfa, 0xa9, 0x7e, 0xde, 0x6e, 0x19, 0x5d, 0xed, 0x54, 0x12, 0xd0, 0x01, 0xec, 0xa5, + 0xde, 0xe8, 0x75, 0xad, 0xa1, 0xea, 0x92, 0x78, 0x55, 0x60, 0x7f, 0x8c, 0x3f, 0xfa, 0x3b, 0x00, + 0x00, 0xff, 0xff, 0x2f, 0xb4, 0x5f, 0x6a, 0x26, 0x0f, 0x00, 0x00, +} diff --git a/go/swift-rpc-losf/rpc.go b/go/swift-rpc-losf/rpc.go new file mode 100644 index 000000000..622ad8c0e --- /dev/null +++ b/go/swift-rpc-losf/rpc.go @@ -0,0 +1,1642 @@ +// Copyright (c) 2010-2012 OpenStack Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// RPC functions +// +// TODO: The naming of things is not consistent with the python code. + +package main + +import ( + "bytes" + "context" + "fmt" + "github.com/alecuyer/statsd/v2" + "github.com/golang/protobuf/proto" + "github.com/openstack/swift-rpc-losf/codes" + pb "github.com/openstack/swift-rpc-losf/proto" + "github.com/openstack/swift-rpc-losf/status" + "github.com/sirupsen/logrus" + "io/ioutil" + "net" + "net/http" + "os" + "path" + "strings" + "sync" + "time" +) + +type server struct { + kv KV + httpServer *http.Server + + // DB state (is it in sync with the volumes state) + isClean bool + + diskPath string // full path to mountpoint + diskName string // without the path + socketPath string // full path to the socket + + // statsd used as is done in swift + statsd_c *statsd.Client + + // channel to signal server should stop + stopChan chan os.Signal +} + +// The following consts are used as a key prefix for different types in the KV + +// prefix for "volumes" (large file to which we write objects) +const volumePrefix = 'd' + +// prefix for "objects" ("vfile" in the python code, would be a POSIX file on a regular backend) +const objectPrefix = 'o' + +// This is meant to be used when a new file is created with the same name as an existing file. +// Deprecate this. As discussed in https://review.openstack.org/#/c/162243, overwriting an existing file +// never seemed like a good idea, and was done to mimic the existing renamer() behavior. +// We have no way to know if the new file is "better" than the existing one. +const deleteQueuePrefix = 'q' + +// Quarantined objects +const quarantinePrefix = 'r' + +// stats stored in the KV +const statsPrefix = 's' + +// max key length in ascii format. +const maxObjKeyLen = 96 + +type rpcFunc func(*server, context.Context, *[]byte) (*[]byte, error) + +// RegisterVolume registers a new volume (volume) to the KV, given its index number and starting offset. +// Will return an error if the volume index already exists. +func RegisterVolume(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.RegisterVolumeRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("RegisterVolume failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{"Function": "RegisterVolume", "Partition": in.Partition, "Type": in.Type, + "VolumeIndex": in.VolumeIndex, "Offset": in.Offset, "State": in.State}) + reqlog.Debug("RPC Call") + + if !in.RepairTool && !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + key := EncodeVolumeKey(in.VolumeIndex) + + // Does the volume already exist ? + value, err := s.kv.Get(volumePrefix, key) + if err != nil { + reqlog.Errorf("unable to check for existing volume key: %v", err) + s.statsd_c.Increment("register_volume.fail") + return nil, status.Errorf(codes.Unavailable, "unable to check for existing volume key") + } + + if value != nil { + reqlog.Info("volume index already exists in db") + s.statsd_c.Increment("register_volume.ok") + return nil, status.Errorf(codes.AlreadyExists, "volume index already exists in db") + } + + // Register the volume + usedSpace := int64(0) + value = EncodeVolumeValue(int64(in.Partition), int32(in.Type), int64(in.Offset), usedSpace, int64(in.State)) + + err = s.kv.Put(volumePrefix, key, value) + if err != nil { + reqlog.Errorf("failed to Put new volume entry: %v", err) + s.statsd_c.Increment("register_volume.fail") + return nil, status.Errorf(codes.Unavailable, "unable to register new volume") + } + s.statsd_c.Increment("register_volume.ok") + + out, err := proto.Marshal(&pb.RegisterVolumeReply{}) + if err != nil { + reqlog.Errorf("failed to serialize reply for new volume entry: %v", err) + return nil, status.Errorf(codes.Unavailable, "unable to serialize reply for new volume entry: %v", err) + } + return &out, nil +} + +// UnregisterVolume will delete a volume entry from the kv. +func UnregisterVolume(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.UnregisterVolumeRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("UnregisterVolume failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{"Function": "UnregisterVolume", "VolumeIndex": in.Index}) + reqlog.Debug("RPC Call") + + if !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + key := EncodeVolumeKey(in.Index) + + // Check for key + value, err := s.kv.Get(volumePrefix, key) + if err != nil { + reqlog.Errorf("unable to check for volume key: %v", err) + s.statsd_c.Increment("unregister_volume.fail") + return nil, status.Errorf(codes.Unavailable, "unable to check for volume key") + } + + if value == nil { + reqlog.Info("volume index does not exist in db") + s.statsd_c.Increment("unregister_volume.ok") + return nil, status.Errorf(codes.NotFound, "volume index does not exist in db") + } + + // Key exists, delete it + err = s.kv.Delete(volumePrefix, key) + if err != nil { + reqlog.Errorf("failed to Delete volume entry: %v", err) + s.statsd_c.Increment("unregister_volume.fail") + return nil, status.Errorf(codes.Unavailable, "unable to delete volume entry") + } + + s.statsd_c.Increment("unregister_volume.ok") + return serializePb(&pb.UnregisterVolumeReply{}) +} + +// UpdateVolumeState will modify an existing volume state +func UpdateVolumeState(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.UpdateVolumeStateRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("UpdateVolumeState failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{"Function": "UpdateVolumeState", "VolumeIndex": in.VolumeIndex, "State": in.State}) + reqlog.Debug("RPC Call") + + if !in.RepairTool && !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + key := EncodeVolumeKey(in.VolumeIndex) + value, err := s.kv.Get(volumePrefix, key) + if err != nil { + reqlog.Errorf("unable to retrieve volume key: %v", err) + s.statsd_c.Increment("update_volume_state.fail") + return nil, status.Errorf(codes.Unavailable, "unable to retrieve volume key") + } + + if value == nil { + reqlog.Info("volume index does not exist in db") + s.statsd_c.Increment("update_volume_state.ok") + return nil, status.Errorf(codes.NotFound, "volume index does not exist in db") + } + + partition, dfType, offset, usedSpace, state, err := DecodeVolumeValue(value) + reqlog.WithFields(logrus.Fields{"current_state": state}).Info("updating state") + if err != nil { + reqlog.Errorf("failed to decode Volume value: %v", err) + s.statsd_c.Increment("update_volume_state.fail") + return nil, status.Errorf(codes.Internal, "failed to decode Volume value") + } + + value = EncodeVolumeValue(partition, dfType, offset, usedSpace, int64(in.State)) + err = s.kv.Put(volumePrefix, key, value) + if err != nil { + reqlog.Errorf("failed to Put updated volume entry: %v", err) + s.statsd_c.Increment("update_volume_state.fail") + return nil, status.Errorf(codes.Unavailable, "unable to update volume state") + } + s.statsd_c.Increment("update_volume_state.ok") + + out, err := proto.Marshal(&pb.UpdateVolumeStateReply{}) + if err != nil { + reqlog.Errorf("failed to serialize reply for update volume: %v", err) + return nil, status.Errorf(codes.Unavailable, "unable to serialize reply for update volume: %v", err) + } + return &out, nil +} + +// GetVolume will return a volume information +func GetVolume(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.GetVolumeRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("GetVolume failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{"Function": "GetVolume", "Volume index": in.Index}) + reqlog.Debug("RPC Call") + + if !in.RepairTool && !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + key := EncodeVolumeKey(in.Index) + value, err := s.kv.Get(volumePrefix, key) + if err != nil { + reqlog.Errorf("Failed to get volume key %d in KV: %v", key, err) + s.statsd_c.Increment("get_volume.fail") + return nil, status.Errorf(codes.Internal, "Failed to get volume key in KV") + } + + if value == nil { + reqlog.Info("No such Volume") + s.statsd_c.Increment("get_volume.ok") + return nil, status.Errorf(codes.NotFound, "No such Volume") + } + + partition, dfType, nextOffset, _, state, err := DecodeVolumeValue(value) + if err != nil { + reqlog.Errorf("Failed to decode Volume value: %v", err) + s.statsd_c.Increment("get_volume.fail") + return nil, status.Errorf(codes.Internal, "Failed to decode Volume value") + } + + s.statsd_c.Increment("get_volume.ok") + + pb_volume := pb.GetVolumeReply{VolumeIndex: in.Index, VolumeType: pb.VolumeType(dfType), VolumeState: uint32(state), + Partition: uint32(partition), NextOffset: uint64(nextOffset)} + out, err := proto.Marshal(&pb_volume) + if err != nil { + reqlog.Errorf("failed to serialize reply for get volume: %v", err) + return nil, status.Errorf(codes.Unavailable, "unable to serialize reply for get volume: %v", err) + } + return &out, nil +} + +// ListVolumes will return all volumes of the given type, for the given partition. +// If GetlAllVolumes is true, all volumes are listed (all types, all partitions) +// Currently this scans all volumes in the KV. Likely fast enough as long as the KV is cached. +// If it becomes a performance issue, we may want to add an in-memory cache indexed by partition. +func ListVolumes(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.ListVolumesRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("ListVolumes failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{"Function": "ListVolumes", "Partition": in.Partition, "Type": in.Type}) + reqlog.Debug("RPC Call") + + if !in.RepairTool && !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + response := &pb.ListVolumesReply{} + + // Iterate over volumes and return the ones that match the request + it := s.kv.NewIterator(volumePrefix) + defer it.Close() + + for it.SeekToFirst(); it.Valid(); it.Next() { + idx, err := DecodeVolumeKey(it.Key()) + if err != nil { + reqlog.Errorf("failed to decode volume key: %v", err) + s.statsd_c.Increment("list_volumes.fail") + return nil, status.Errorf(codes.Internal, "unable to decode volume value") + } + + partition, dfType, nextOffset, _, state, err := DecodeVolumeValue(it.Value()) + if err != nil { + reqlog.Errorf("failed to decode volume value: %v", err) + s.statsd_c.Increment("list_volumes.fail") + return nil, status.Errorf(codes.Internal, "unable to decode volume value") + } + if uint32(partition) == in.Partition && pb.VolumeType(dfType) == in.Type { + response.Volumes = append(response.Volumes, &pb.Volume{VolumeIndex: idx, + VolumeType: pb.VolumeType(in.Type), VolumeState: uint32(state), + Partition: uint32(partition), NextOffset: uint64(nextOffset)}) + } + } + + s.statsd_c.Increment("list_volumes.ok") + out, err := proto.Marshal(response) + if err != nil { + reqlog.Errorf("failed to serialize reply for list volumes: %v", err) + return nil, status.Errorf(codes.Unavailable, "unable to serialize reply for list volumes: %v", err) + } + return &out, nil +} + +// RegisterObject registers a new object to the kv. +func RegisterObject(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.RegisterObjectRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("RegisterObject failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{ + "Function": "RegisterObject", + "Name": fmt.Sprintf("%s", in.Name), + "DiskPath": s.diskPath, + "VolumeIndex": in.VolumeIndex, + "Offset": in.Offset, + "NextOffset": in.NextOffset, + "Length": in.NextOffset - in.Offset, // debug + }) + reqlog.Debug("RPC Call") + + if !in.RepairTool && !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + // Check if volume exists + volumeKey := EncodeVolumeKey(in.VolumeIndex) + volumeValue, err := s.kv.Get(volumePrefix, volumeKey) + if err != nil { + reqlog.Errorf("unable to check for existing volume key: %v", err) + s.statsd_c.Increment("register_object.fail") + return nil, status.Errorf(codes.Unavailable, "unable to check for existing volume key") + } + + if volumeValue == nil { + reqlog.Info("volume index does not exist in db") + s.statsd_c.Increment("register_object.ok") + return nil, status.Errorf(codes.FailedPrecondition, "volume index does not exist in db") + } + + partition, volumeType, _, currentUsedSpace, state, err := DecodeVolumeValue(volumeValue) + + objectKey, err := EncodeObjectKey(in.Name) + if err != nil { + reqlog.Errorf("unable to encode object key: %v", err) + s.statsd_c.Increment("register_object.fail") + return nil, status.Errorf(codes.Unavailable, "unable to encode object key") + } + + objectValue := EncodeObjectValue(in.VolumeIndex, in.Offset) + + // If an object exists with the same name, we need to move it to the delete queue before overwriting the key. + // On the regular file backend, this would happen automatically with the rename operation. In our case, + // we would leak space. (The space will be reclaimed on compaction, but it shouldn't happen). + + var objMutex = &sync.Mutex{} + objMutex.Lock() + + existingValue, err := s.kv.Get(objectPrefix, objectKey) + if err != nil { + reqlog.Errorf("unable to check for existing object: %v", err) + s.statsd_c.Increment("register_object.fail") + return nil, status.Errorf(codes.Unavailable, "unable to retrieve object") + } + + if existingValue != nil { + reqlog.Info("object already exists") + s.statsd_c.Increment("register_object.ok") + return nil, status.Errorf(codes.AlreadyExists, "object already exists") + } + + // Update volume offset + volumeNewValue := EncodeVolumeValue(int64(partition), volumeType, int64(in.NextOffset), int64(currentUsedSpace), state) + wb := s.kv.NewWriteBatch() + defer wb.Close() + wb.Put(volumePrefix, volumeKey, volumeNewValue) + wb.Put(objectPrefix, objectKey, objectValue) + + err = wb.Commit() + if err != nil { + reqlog.Errorf("failed to Put new volume value and new object entry: %v", err) + s.statsd_c.Increment("register_object.fail") + return nil, status.Errorf(codes.Unavailable, "unable to update volume and register new object") + } + objMutex.Unlock() + + s.statsd_c.Increment("register_object.ok") + + out, err := proto.Marshal(&pb.RegisterObjectReply{}) + if err != nil { + reqlog.Errorf("failed to serialize reply: %v", err) + return nil, status.Errorf(codes.Unavailable, "unable to serialize reply: %v", err) + } + return &out, nil +} + +// UnregisterObject removes an an object entry from the kv. +func UnregisterObject(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.UnregisterObjectRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("UnregisterObject failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + reqlog := log.WithFields(logrus.Fields{ + "Function": "UnregisterObject", + "Name": fmt.Sprintf("%s", in.Name), + "DiskPath": s.diskPath, + }) + reqlog.Debug("RPC Call") + + if !in.RepairTool && !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + objectKey, err := EncodeObjectKey(in.Name) + if err != nil { + reqlog.Errorf("unable to encode object key: %v", err) + s.statsd_c.Increment("unregister_object.fail") + return nil, status.Errorf(codes.Unavailable, "unable to encode object key") + } + + value, err := s.kv.Get(objectPrefix, objectKey) + if err != nil { + reqlog.Errorf("unable to retrieve object: %v", err) + s.statsd_c.Increment("unregister_object.fail") + return nil, status.Errorf(codes.Unavailable, "unable to retrieve object") + } + + if value == nil { + reqlog.Debug("object does not exist") + s.statsd_c.Increment("unregister_object.ok") + return nil, status.Errorf(codes.NotFound, "%s", in.Name) + } + + // Delete key + err = s.kv.Delete(objectPrefix, objectKey) + if err != nil { + reqlog.Errorf("failed to Delete key: %v", err) + s.statsd_c.Increment("unregister_object.fail") + return nil, status.Errorf(codes.Unavailable, "unable to unregister object") + } + + s.statsd_c.Increment("unregister_object.ok") + out, err := proto.Marshal(&pb.UnregisterObjectReply{}) + if err != nil { + reqlog.Errorf("failed to serialize reply for del object reply: %v", err) + return nil, status.Errorf(codes.Unavailable, "unable to serialize reply for del object reply: %v", err) + } + return &out, nil +} + +// RenameObject changes an object key in the kv. (used for erasure code) +func RenameObject(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.RenameObjectRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{ + "Function": "RenameObject", + "Name": fmt.Sprintf("%s", in.Name), + "NewName": fmt.Sprintf("%s", in.NewName), + }) + reqlog.Debug("RPC Call") + + if !in.RepairTool && !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + objectKey, err := EncodeObjectKey(in.Name) + if err != nil { + reqlog.Errorf("unable to encode object key: %v", err) + s.statsd_c.Increment("rename_object.fail") + return nil, status.Errorf(codes.Unavailable, "unable to encode object key") + } + + objectNewKey, err := EncodeObjectKey(in.NewName) + if err != nil { + reqlog.Errorf("unable to encode new object key: %v", err) + s.statsd_c.Increment("rename_object.fail") + return nil, status.Errorf(codes.Unavailable, "unable to encode object key") + } + + value, err := s.kv.Get(objectPrefix, objectKey) + if err != nil { + reqlog.Errorf("unable to retrieve object: %v", err) + s.statsd_c.Increment("rename_object.fail") + return nil, status.Errorf(codes.Unavailable, "unable to retrieve object") + } + + if value == nil { + reqlog.Debug("object does not exist") + s.statsd_c.Increment("rename_object.ok") + return nil, status.Errorf(codes.NotFound, "%s", in.Name) + } + + // Delete old entry and create a new one + wb := s.kv.NewWriteBatch() + defer wb.Close() + wb.Delete(objectPrefix, objectKey) + wb.Put(objectPrefix, objectNewKey, value) + + err = wb.Commit() + if err != nil { + reqlog.Errorf("failed to commit WriteBatch for rename: %v", err) + s.statsd_c.Increment("rename_object.fail") + return nil, status.Errorf(codes.Unavailable, "failed to commit WriteBatch for rename") + } + + s.statsd_c.Increment("rename_object.ok") + + out, err := proto.Marshal(&pb.RenameObjectReply{}) + if err != nil { + reqlog.Errorf("failed to serialize reply: %v", err) + return nil, status.Errorf(codes.Unavailable, "unable to serialize reply: %v", err) + } + return &out, nil +} + +// LoadObject returns an object information +func LoadObject(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.LoadObjectRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{ + "Function": "LoadObject", + "Name": fmt.Sprintf("%s", in.Name), + "IsQuarantined": fmt.Sprintf("%t", in.IsQuarantined), + }) + reqlog.Debug("RPC Call") + + var prefix byte + + if !in.RepairTool && !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + objectKey, err := EncodeObjectKey(in.Name) + if err != nil { + reqlog.Errorf("unable to encode object key: %v", err) + s.statsd_c.Increment("load_object.fail") + return nil, status.Errorf(codes.Unavailable, "unable to encode object key") + } + + if in.IsQuarantined { + prefix = quarantinePrefix + } else { + prefix = objectPrefix + } + reqlog.Debugf("is quarantined: %v", in.IsQuarantined) + value, err := s.kv.Get(prefix, objectKey) + if err != nil { + reqlog.Errorf("unable to retrieve object: %v", err) + s.statsd_c.Increment("load_object.fail") + return nil, status.Errorf(codes.Unavailable, "unable to retrieve object") + } + + if value == nil { + reqlog.Debug("object does not exist") + s.statsd_c.Increment("load_object.ok") + return nil, status.Errorf(codes.NotFound, "%s", in.Name) + } + + volumeIndex, offset, err := DecodeObjectValue(value) + if err != nil { + reqlog.Errorf("failed to decode object value: %v", err) + s.statsd_c.Increment("load_object.fail") + return nil, status.Errorf(codes.Internal, "unable to read object") + } + + s.statsd_c.Increment("load_object.ok") + + out, err := proto.Marshal(&pb.LoadObjectReply{Name: in.Name, VolumeIndex: volumeIndex, Offset: offset}) + if err != nil { + reqlog.Errorf("failed to serialize reply: %v", err) + return nil, status.Errorf(codes.Unavailable, "unable to serialize reply: %v", err) + } + return &out, nil +} + +// QuarantineObject +func QuarantineObject(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.QuarantineObjectRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{ + "Function": "QuarantineObject", + "Name": fmt.Sprintf("%s", in.Name), + }) + reqlog.Debug("RPC Call") + + if !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + objectKey, err := EncodeObjectKey(in.Name) + if err != nil { + reqlog.Errorf("unable to encode object key: %v", err) + s.statsd_c.Increment("quarantine_object.fail") + return nil, status.Errorf(codes.Unavailable, "unable to encode object key") + } + + value, err := s.kv.Get(objectPrefix, objectKey) + if err != nil { + reqlog.Errorf("unable to retrieve object: %v", err) + s.statsd_c.Increment("quarantine_object.fail") + return nil, status.Errorf(codes.Unavailable, "unable to retrieve object") + } + + if value == nil { + reqlog.Debug("object does not exist") + s.statsd_c.Increment("quarantine_object.ok") + return nil, status.Errorf(codes.NotFound, "%s", in.Name) + } + + // Add quarantine key, delete obj key + wb := s.kv.NewWriteBatch() + defer wb.Close() + // TODO: check here if an ohash already exists with the same name. Put files in the same dir, or make a new one ? (current swift code + // appears to add an extension in that case. This will require a new format (encode/decode) in the KV) + // Also check if full key already exists. + wb.Put(quarantinePrefix, objectKey, value) + wb.Delete(objectPrefix, objectKey) + err = wb.Commit() + if err != nil { + reqlog.Errorf("failed to quarantine object: %v", err) + s.statsd_c.Increment("quarantine_object.fail") + return nil, status.Error(codes.Unavailable, "unable to quarantine object") + } + + s.statsd_c.Increment("quarantine_object.ok") + + out, err := proto.Marshal(&pb.QuarantineObjectReply{}) + if err != nil { + reqlog.Errorf("failed to serialize reply: %v", err) + return nil, status.Errorf(codes.Unavailable, "unable to serialize reply: %v", err) + } + return &out, nil +} + +// UnquarantineObject +func UnquarantineObject(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.UnquarantineObjectRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{ + "Function": "UnquarantineObject", + "Name": fmt.Sprintf("%s", in.Name), + }) + reqlog.Debug("RPC Call") + + if !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + objectKey, err := EncodeObjectKey(in.Name) + if err != nil { + reqlog.Errorf("unable to encode object key: %v", err) + s.statsd_c.Increment("unquarantine_object.fail") + return nil, status.Errorf(codes.Unavailable, "unable to encode object key") + } + + value, err := s.kv.Get(quarantinePrefix, objectKey) + if err != nil { + reqlog.Errorf("unable to retrieve object: %v", err) + s.statsd_c.Increment("unquarantine_object.fail") + return nil, status.Errorf(codes.Unavailable, "unable to retrieve object") + } + + if value == nil { + reqlog.Debug("object does not exist") + s.statsd_c.Increment("unquarantine_object.ok") + return nil, status.Errorf(codes.NotFound, "%s", in.Name) + } + + // Add object key, delete quarantine key + wb := s.kv.NewWriteBatch() + defer wb.Close() + wb.Put(objectPrefix, objectKey, value) + wb.Delete(quarantinePrefix, objectKey) + err = wb.Commit() + if err != nil { + reqlog.Errorf("failed to unquarantine object: %v", err) + s.statsd_c.Increment("unquarantine_object.fail") + return nil, status.Error(codes.Unavailable, "unable to unquarantine object") + } + + s.statsd_c.Increment("unquarantine_object.ok") + + out, err := proto.Marshal(&pb.UnquarantineObjectReply{}) + if err != nil { + reqlog.Errorf("failed to serialize reply: %v", err) + return nil, status.Errorf(codes.Unavailable, "unable to serialize reply: %v", err) + } + return &out, nil +} + +// LoadObjectsByPrefix returns list of objects with the given prefix. +// In practice this is used to emulate the object hash directory that swift +// would create with the regular diskfile backend. +func LoadObjectsByPrefix(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.LoadObjectsByPrefixRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{ + "Function": "LoadObjectsByPrefix", + "Prefix": fmt.Sprintf("%s", in.Prefix), + }) + reqlog.Debug("RPC Call") + + if !in.RepairTool && !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + // prefix must be 32 characters for this to work (because we now encode the md5 hash, see + // EncodeObjectKey in encoding.go + if len(in.Prefix) != 32 { + reqlog.Error("prefix len != 32") + s.statsd_c.Increment("load_objects_by_prefix.fail") + return nil, status.Errorf(codes.Internal, "prefix len != 32") + } + + prefix, err := EncodeObjectKey(in.Prefix) + if err != nil { + reqlog.Errorf("unable to encode object prefix: %v", err) + s.statsd_c.Increment("load_objects_by_prefix.fail") + return nil, status.Errorf(codes.Unavailable, "unable to encode object prefix") + } + + it := s.kv.NewIterator(objectPrefix) + defer it.Close() + + response := &pb.LoadObjectsByPrefixReply{} + + // adds one byte because of prefix. Otherwise len(prefix) would be len(prefix)-1 + for it.Seek(prefix); it.Valid() && len(prefix) <= len(it.Key()) && bytes.Equal(prefix, it.Key()[:len(prefix)]); it.Next() { + + // Decode value + volumeIndex, offset, err := DecodeObjectValue(it.Value()) + if err != nil { + reqlog.Errorf("failed to decode object value: %v", err) + s.statsd_c.Increment("load_objects_by_prefix.fail") + return nil, status.Errorf(codes.Internal, "unable to read object") + } + + key := make([]byte, 32+len(it.Key()[16:])) + err = DecodeObjectKey(it.Key(), key) + if err != nil { + reqlog.Errorf("failed to decode object key: %v", err) + s.statsd_c.Increment("load_objects_by_prefix.fail") + return nil, status.Errorf(codes.Internal, "unable to decode object key") + } + response.Objects = append(response.Objects, &pb.Object{Name: key, VolumeIndex: volumeIndex, Offset: offset}) + } + + s.statsd_c.Increment("load_objects_by_prefix.ok") + + return serializePb(response) +} + +// LoadObjectsByVolume returns a list of all objects within a volume, with pagination. +// Quarantined, if true, will return only quarantined objects, if false, non-quarantined objects. +// PageToken is the object name to start from, as returned from a previous call in the +// NextPageToken field. If empty, the iterator will start from the first objects in the volume. +// PageSize is the maximum count of items to return. If zero, the server will pick a reasonnable limit. +// func (s *server) LoadObjectsByVolume(in *pb.VolumeIndex, stream pb.FileMgr_LoadObjectsByVolumeServer) error { +func LoadObjectsByVolume(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.LoadObjectsByVolumeRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{ + "Function": "LoadObjectsByVolume", + "VolumeIndex": in.Index, + "PageToken": in.PageToken, + "PageSize": in.PageSize, + }) + reqlog.Debug("RPC Call") + + if !in.RepairTool && !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + limit := in.PageSize + if limit == 0 { + reqlog.Debug("page_size was not specified, set it to 10000") + limit = 10000 + } + + pageToken := make([]byte, len(in.PageToken)) + pageToken = in.PageToken + if bytes.Equal(pageToken, []byte("")) { + pageToken = []byte(strings.Repeat("0", 32)) + } + + prefix, err := EncodeObjectKey(pageToken) + if err != nil { + reqlog.Errorf("unable to encode object prefix: %v", err) + s.statsd_c.Increment("load_objects_by_volume.fail") + return nil, status.Errorf(codes.Internal, "unable to encode object prefix") + } + + // Return either quarantined or "regular" objects + var it Iterator + if in.Quarantined { + it = s.kv.NewIterator(quarantinePrefix) + } else { + it = s.kv.NewIterator(objectPrefix) + } + defer it.Close() + + response := &pb.LoadObjectsByVolumeReply{} + + // Objects are not indexed by volume. We have to scan the whole KV and examine each value. + // It shouldn't matter as this is only used for compaction, and each object will have to be copied. + // Disk activity dwarfs CPU usage. (for spinning rust anyway, but SSDs?) + count := uint32(0) + for it.Seek(prefix); it.Valid() && count < limit; it.Next() { + volumeIndex, offset, err := DecodeObjectValue(it.Value()) + if err != nil { + reqlog.Errorf("failed to decode object value: %v", err) + s.statsd_c.Increment("load_objects_by_volume.fail") + return nil, status.Errorf(codes.Internal, "unable to read object") + } + + if volumeIndex == in.Index { + key := make([]byte, 32+len(it.Key()[16:])) + err = DecodeObjectKey(it.Key(), key) + if err != nil { + reqlog.Errorf("failed to decode object key: %v", err) + s.statsd_c.Increment("load_objects_by_prefix.fail") + return nil, status.Errorf(codes.Internal, "unable to decode object key") + } + response.Objects = append(response.Objects, &pb.Object{Name: key, VolumeIndex: volumeIndex, Offset: offset}) + count++ + } + } + + // Set NextPageToken if there is at least one ohash found in the same volume + for ; it.Valid(); it.Next() { + volumeIndex, _, err := DecodeObjectValue(it.Value()) + if err != nil { + reqlog.Errorf("failed to decode object value: %v", err) + s.statsd_c.Increment("load_objects_by_volume.fail") + return nil, status.Errorf(codes.Internal, "unable to read object") + } + + if volumeIndex == in.Index { + key := make([]byte, 32+len(it.Key()[16:])) + err = DecodeObjectKey(it.Key(), key) + if err != nil { + reqlog.Errorf("failed to decode object key: %v", err) + s.statsd_c.Increment("load_objects_by_prefix.fail") + return nil, status.Errorf(codes.Internal, "unable to decode object key") + } + nextPageToken := make([]byte, len(key)) + copy(nextPageToken, key) + response.NextPageToken = key + break + } + + } + s.statsd_c.Increment("load_objects_by_volume.ok") + return serializePb(response) +} + +// ListPartitions returns a list of partitions for which we have objects. +// This is used to emulate a listdir() of partitions below the "objects" directory. +func ListPartitions(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.ListPartitionsRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{ + "Function": "ListPartitions", + "PartitionBits": in.PartitionBits, + }) + reqlog.Debug("RPC Call") + + if !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + var currentPartition uint64 + var err error + var ohash []byte + + // Partition bits + pBits := int(in.PartitionBits) + + response := &pb.DirEntries{} + + // Seek to first object key + it := s.kv.NewIterator(objectPrefix) + defer it.Close() + it.SeekToFirst() + + // No object in the KV. + if !it.Valid() { + s.statsd_c.Increment("list_partitions.ok") + return serializePb(response) + } + + // Extract the md5 hash + if len(it.Key()) < 16 { + reqlog.WithFields(logrus.Fields{"key": it.Key()}).Error("object key < 16") + } else { + ohash = make([]byte, 32+len(it.Key()[16:])) + err = DecodeObjectKey(it.Key()[:16], ohash) + if err != nil { + reqlog.Errorf("failed to decode object key: %v", err) + s.statsd_c.Increment("load_objects_by_prefix.fail") + return nil, status.Errorf(codes.Internal, "unable to decode object key") + } + currentPartition, err = getPartitionFromOhash(ohash, pBits) + if err != nil { + s.statsd_c.Increment("list_partitions.fail") + return nil, err + } + } + + response.Entry = append(response.Entry, fmt.Sprintf("%d", currentPartition)) + if err != nil { + s.statsd_c.Increment("list_partitions.fail") + return nil, err + } + + maxPartition, err := getLastPartition(pBits) + + for currentPartition < maxPartition { + currentPartition++ + firstKey, err := getEncodedObjPrefixFromPartition(currentPartition, pBits) + if err != nil { + s.statsd_c.Increment("list_partitions.fail") + return nil, err + } + nextFirstKey, err := getEncodedObjPrefixFromPartition(currentPartition+1, pBits) + if err != nil { + s.statsd_c.Increment("list_partitions.fail") + return nil, err + } + + // key logging is now wrong, as it's not the ascii form + reqlog.WithFields(logrus.Fields{"currentPartition": currentPartition, + "maxPartition": maxPartition, + "firstKey": firstKey, + "ohash": ohash, + "nextFirstKey": nextFirstKey}).Debug("In loop") + + it.Seek(firstKey) + if !it.Valid() { + s.statsd_c.Increment("list_partitions.ok") + return serializePb(response) + } + + if len(it.Key()) < 16 { + reqlog.WithFields(logrus.Fields{"key": it.Key()}).Error("object key < 16") + } else { + ohash = make([]byte, 32+len(it.Key()[16:])) + err = DecodeObjectKey(it.Key()[:16], ohash) + if err != nil { + reqlog.Errorf("failed to decode object key: %v", err) + s.statsd_c.Increment("load_objects_by_prefix.fail") + return nil, status.Errorf(codes.Internal, "unable to decode object key") + } + // nextFirstKey is encoded, compare with encoded hash (16 first bits of the key) + if bytes.Compare(it.Key()[:16], nextFirstKey) > 0 { + // There was no key in currentPartition, find in which partition we are + currentPartition, err = getPartitionFromOhash(ohash, pBits) + if err != nil { + s.statsd_c.Increment("list_partitions.fail") + return nil, err + } + } + response.Entry = append(response.Entry, fmt.Sprintf("%d", currentPartition)) + } + } + + s.statsd_c.Increment("list_partitions.ok") + return serializePb(response) +} + +// ListPartition returns a list of suffixes within a partition +func ListPartition(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.ListPartitionRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{ + "Function": "ListPartition", + "Partition": in.Partition, + "PartitionBits": in.PartitionBits, + }) + reqlog.Debug("RPC Call") + + if !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + // Set to hold suffixes within partition + suffixSet := make(map[[3]byte]bool) + var suffix [3]byte + + // Partition bits + pBits := int(in.PartitionBits) + partition := uint64(in.Partition) + + response := &pb.DirEntries{} + + firstKey, err := getEncodedObjPrefixFromPartition(partition, pBits) + if err != nil { + s.statsd_c.Increment("list_partition.fail") + return nil, err + } + + // Seek to first key in partition, if any + it := s.kv.NewIterator(objectPrefix) + defer it.Close() + + it.Seek(firstKey) + // No object in the KV + if !it.Valid() { + s.statsd_c.Increment("list_partition.ok") + return serializePb(response) + } + + key := make([]byte, 32+len(it.Key()[16:])) + err = DecodeObjectKey(it.Key(), key) + if err != nil { + reqlog.Errorf("failed to decode object key: %v", err) + s.statsd_c.Increment("load_objects_by_prefix.fail") + return nil, status.Errorf(codes.Internal, "unable to decode object key") + } + currentPartition, err := getPartitionFromOhash(key, pBits) + if err != nil { + s.statsd_c.Increment("list_partition.fail") + return nil, err + } + + // Get all suffixes in the partition + for currentPartition == partition { + // Suffix is the last three bytes of the object hash + copy(suffix[:], key[29:32]) + suffixSet[suffix] = true + it.Next() + if !it.Valid() { + break + } + key = make([]byte, 32+len(it.Key()[16:])) + err = DecodeObjectKey(it.Key(), key) + if err != nil { + reqlog.Errorf("failed to decode object key: %v", err) + s.statsd_c.Increment("load_objects_by_prefix.fail") + return nil, status.Errorf(codes.Internal, "unable to decode object key") + } + currentPartition, err = getPartitionFromOhash(key, pBits) + } + + // Build the response from the hashmap + for suffix := range suffixSet { + response.Entry = append(response.Entry, fmt.Sprintf("%s", suffix)) + } + + s.statsd_c.Increment("list_partition.ok") + return serializePb(response) +} + +// ListSuffix returns a list of object hashes below the partition and suffix +func ListSuffix(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.ListSuffixRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{ + "Function": "ListSuffix", + "Partition": in.Partition, + "Suffix": fmt.Sprintf("%s", in.Suffix), + "PartitionBits": in.PartitionBits, + }) + + if !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + execTimeSerie := fmt.Sprintf("list_suffix.runtime.%s", s.diskName) + defer s.statsd_c.NewTiming().Send(execTimeSerie) + reqlog.Debug("RPC Call") + + lastOhash := make([]byte, 32) + + pBits := int(in.PartitionBits) + partition := uint64(in.Partition) + suffix := in.Suffix + + response := &pb.DirEntries{} + + failSerie := fmt.Sprintf("list_suffix.fail.%s", s.diskName) + successSerie := fmt.Sprintf("list_suffix.ok.%s", s.diskName) + firstKey, err := getEncodedObjPrefixFromPartition(partition, pBits) + if err != nil { + s.statsd_c.Increment(failSerie) + return nil, err + } + + // Seek to first key in partition, if any + it := s.kv.NewIterator(objectPrefix) + defer it.Close() + + it.Seek(firstKey) + // No object in the KV + if !it.Valid() { + s.statsd_c.Increment(successSerie) + return serializePb(response) + } + + // Allocate the slice with a capacity matching the length of the longest possible key + // We can then reuse it in the loop below. (avoid heap allocations, profiling showed it was an issue) + curKey := make([]byte, 32+len(firstKey[16:]), maxObjKeyLen) + err = DecodeObjectKey(firstKey, curKey) + if err != nil { + reqlog.Errorf("failed to decode object key: %v", err) + s.statsd_c.Increment("load_objects_by_prefix.fail") + return nil, status.Errorf(codes.Internal, "unable to decode object key") + } + currentPartition, err := getPartitionFromOhash(curKey, pBits) + if err != nil { + s.statsd_c.Increment(failSerie) + return nil, err + } + + for currentPartition == partition { + // Suffix is the last three bytes of the object hash + // key := make([]byte, 32+len(it.Key()[16:])) + curKey = curKey[:32+len(it.Key()[16:])] + err = DecodeObjectKey(it.Key(), curKey) + if err != nil { + reqlog.Errorf("failed to decode object key: %v", err) + s.statsd_c.Increment("load_objects_by_prefix.fail") + return nil, status.Errorf(codes.Internal, "unable to decode object key") + } + if bytes.Compare(curKey[29:32], suffix) == 0 { + ohash := make([]byte, 32) + ohash = curKey[:32] + // Only add to the list if we have not already done so + if !bytes.Equal(ohash, lastOhash) { + response.Entry = append(response.Entry, (fmt.Sprintf("%s", ohash))) + copy(lastOhash, ohash) + } + } + it.Next() + if !it.Valid() { + break + } + curKey = curKey[:32+len(it.Key()[16:])] + err = DecodeObjectKey(it.Key(), curKey) + if err != nil { + reqlog.Errorf("failed to decode object key: %v", err) + s.statsd_c.Increment("load_objects_by_prefix.fail") + return nil, status.Errorf(codes.Internal, "unable to decode object key") + } + currentPartition, err = getPartitionFromOhash(curKey, pBits) + } + + s.statsd_c.Increment(successSerie) + return serializePb(response) +} + +// Returns a list of quarantiened object hashes, with pagination. +// PageToken is the ohash to start from, as returned from a previous call in the +// NextPageToken field. If empty, the iterator will start from the first quarantined +// object hash. PageSize is the maximum count of items to return. If zero, +// the server will pick a reasonnable limit. +func ListQuarantinedOHashes(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.ListQuarantinedOHashesRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{ + "Function": "ListQuarantinedOhashes", + "PageToken": fmt.Sprintf("%s", in.PageToken), + "PageSize": fmt.Sprintf("%d", in.PageSize), + }) + reqlog.Debug("RPC Call") + + if !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + limit := in.PageSize + if limit == 0 { + reqlog.Debug("page_size was not specified, set it to 10000") + limit = 10000 + } + + pageToken := make([]byte, 32) + pageToken = in.PageToken + if bytes.Equal(pageToken, []byte("")) { + pageToken = []byte(strings.Repeat("0", 32)) + } + if len(pageToken) != 32 { + reqlog.Error("prefix len != 32") + s.statsd_c.Increment("list_quarantined_ohashes.fail") + return nil, status.Errorf(codes.InvalidArgument, "page token length != 32") + } + + prefix, err := EncodeObjectKey(pageToken) + if err != nil { + reqlog.Errorf("unable to encode object prefix: %v", err) + s.statsd_c.Increment("list_quarantined_ohashes.fail") + return nil, status.Errorf(codes.Unavailable, "unable to encode object prefix") + } + + it := s.kv.NewIterator(quarantinePrefix) + defer it.Close() + + response := &pb.ListQuarantinedOHashesReply{} + curKey := make([]byte, maxObjKeyLen) + lastOhash := make([]byte, 32) + + count := uint32(0) + for it.Seek(prefix); it.Valid() && count < limit; it.Next() { + curKey = curKey[:32+len(it.Key()[16:])] + err = DecodeObjectKey(it.Key(), curKey) + if err != nil { + reqlog.Errorf("failed to decode quarantined object key: %v", err) + s.statsd_c.Increment("list_quarantined_ohashes.fail") + return nil, status.Errorf(codes.Internal, "unable decode quarantined object key") + } + if !bytes.Equal(curKey[:32], lastOhash) { + ohash := make([]byte, 32) + copy(ohash, curKey[:32]) + response.Objects = append(response.Objects, &pb.QuarantinedObjectName{Name: ohash}) + copy(lastOhash, curKey[:32]) + count++ + } + } + + // Set NextPageToken if there is at least one ohash beyond what we have returned + for ; it.Valid(); it.Next() { + curKey = curKey[:32+len(it.Key()[16:])] + err = DecodeObjectKey(it.Key(), curKey) + if err != nil { + reqlog.Errorf("failed to decode quarantined object key: %v", err) + s.statsd_c.Increment("list_quarantined_ohashes.fail") + return nil, status.Errorf(codes.Internal, "unable decode quarantined object key") + } + if !bytes.Equal(curKey[:32], lastOhash) { + nextPageToken := make([]byte, 32) + copy(nextPageToken, curKey[:32]) + response.NextPageToken = nextPageToken + break + } + } + + s.statsd_c.Increment("list_quarantined_ohashes.ok") + return serializePb(response) +} + +func ListQuarantinedOHash(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.ListQuarantinedOHashRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{ + "Function": "ListQuarantineOHash", + "Prefix": fmt.Sprintf("%s", in.Prefix), + }) + reqlog.Debug("RPC Call") + + if !in.RepairTool && !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + if len(in.Prefix) != 32 { + reqlog.Error("prefix len != 32") + s.statsd_c.Increment("list_quarantined_ohash.fail") + return nil, status.Errorf(codes.Internal, "prefix len != 32") + } + + prefix, err := EncodeObjectKey(in.Prefix) + if err != nil { + reqlog.Errorf("unable to encode object prefix: %v", err) + s.statsd_c.Increment("list_quarantined_ohash.fail") + return nil, status.Errorf(codes.Unavailable, "unable to encode object prefix") + } + + it := s.kv.NewIterator(quarantinePrefix) + defer it.Close() + + response := &pb.ListQuarantinedOHashReply{} + + // adds one byte because of prefix. Otherwise len(prefix) would be len(prefix)-1 + for it.Seek(prefix); it.Valid() && len(prefix) <= len(it.Key()) && bytes.Equal(prefix, it.Key()[:len(prefix)]); it.Next() { + + // Decode value + volumeIndex, offset, err := DecodeObjectValue(it.Value()) + if err != nil { + reqlog.Errorf("failed to decode object value: %v", err) + s.statsd_c.Increment("list_quarantined_ohash.fail") + return nil, status.Errorf(codes.Internal, "unable to read object") + } + + key := make([]byte, 32+len(it.Key()[16:])) + err = DecodeObjectKey(it.Key(), key) + if err != nil { + reqlog.Errorf("failed to decode object key: %v", err) + s.statsd_c.Increment("list_quarantined_ohash.fail") + return nil, status.Errorf(codes.Internal, "unable to decode object key") + } + response.Objects = append(response.Objects, &pb.Object{Name: key, VolumeIndex: volumeIndex, Offset: offset}) + } + + s.statsd_c.Increment("list_quarantined_ohash.ok") + return serializePb(response) +} + +func GetNextOffset(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.GetNextOffsetRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{"Function": "GetNextOffset", "VolumeIndex": in.VolumeIndex}) + reqlog.Debug("RPC Call") + + if !in.RepairTool && !s.isClean { + reqlog.Debug("KV out of sync with volumes") + return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes") + } + + key := EncodeVolumeKey(in.VolumeIndex) + + value, err := s.kv.Get(volumePrefix, key) + if err != nil { + reqlog.Errorf("unable to retrieve volume key: %v", err) + s.statsd_c.Increment("get_next_offset.fail") + return nil, status.Errorf(codes.Unavailable, "unable to retrieve volume key") + } + + if value == nil { + reqlog.Info("volume index does not exist in db") + s.statsd_c.Increment("get_next_offset.fail") + return nil, status.Errorf(codes.FailedPrecondition, "volume index does not exist in db") + } + + _, _, nextOffset, _, _, err := DecodeVolumeValue(value) + if err != nil { + reqlog.WithFields(logrus.Fields{"value": value}).Errorf("failed to decode volume value: %v", err) + s.statsd_c.Increment("get_next_offset.fail") + return nil, status.Errorf(codes.Internal, "failed to decode volume value") + } + + s.statsd_c.Increment("get_next_offset.ok") + return serializePb(&pb.GetNextOffsetReply{Offset: uint64(nextOffset)}) +} + +// GetStats returns stats for the KV. used for initial debugging, remove? +func GetStats(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.GetStatsRequest{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + response := new(pb.GetStatsReply) + + m := CollectStats(s) + response.Stats = m + + return serializePb(response) +} + +// Sets KV state (is in sync with volumes, or not) +func SetKvState(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.KvState{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{"Function": "SetClean", "IsClean": in.IsClean}) + reqlog.Debug("RPC Call") + + s.isClean = in.IsClean + return serializePb(&pb.SetKvStateReply{}) +} + +// Gets KV state (is in sync with volumes, or not) +func GetKvState(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) { + in := &pb.KvState{} + if err := proto.Unmarshal(*pbIn, in); err != nil { + logrus.Errorf("failed to unmarshal input: %v", err) + return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf") + } + + reqlog := log.WithFields(logrus.Fields{"Function": "GetKvState"}) + reqlog.Debug("RPC Call") + state := new(pb.KvState) + state.IsClean = s.isClean + return serializePb(state) +} + +// Stops serving RPC requests and closes KV if we receive SIGTERM/SIGINT +func shutdownHandler(s *server, wg *sync.WaitGroup) { + <-s.stopChan + rlog := log.WithFields(logrus.Fields{"socket": s.socketPath}) + rlog.Info("Shutting down") + + // Stop serving RPC requests + // Give it a 5s delay to finish serving active requests, then force close + rlog.Debug("Stopping RPC") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := s.httpServer.Shutdown(ctx); err != nil { + // Error or timeout + rlog.Infof("HTTP server Shutdown: %v", err) + if err = s.httpServer.Close(); err != nil { + rlog.Infof("HTTP server Close: %v", err) + } + } + + // Mark DB as clean + if s.isClean == true { + rlog.Debug("Mark DB as closed") + err := MarkDbClosed(s.kv) + if err != nil { + rlog.Warn("Failed to mark db as clean when shutting down") + } + } else { + rlog.Warn("State is not clean, not marking DB as closed (still out of sync with volumes)") + } + + // Close KV + rlog.Debug("Closing KV") + s.kv.Close() + wg.Done() +} + +func runServer(kv KV, diskPath string, socketPath string, stopChan chan os.Signal, isClean bool) (err error) { + var wg sync.WaitGroup + + if err != nil { + return + } + os.Chmod(socketPath, 0660) + + _, diskName := path.Split(path.Clean(diskPath)) + fs := &server{kv: kv, diskPath: diskPath, diskName: diskName, socketPath: socketPath, + isClean: isClean, stopChan: stopChan} + + go func() { + unixListener, err := net.Listen("unix", fs.socketPath) + if err != nil { + log.Printf("Cannot serve") + } + server := http.Server{Handler: fs} + fs.httpServer = &server + server.Serve(unixListener) + }() + + // Initialize statsd client + statsdPrefix := "kv" + fs.statsd_c, err = statsd.New(statsd.Prefix(statsdPrefix)) + if err != nil { + return + } + + // Start shutdown handler + wg.Add(1) + go shutdownHandler(fs, &wg) + wg.Wait() + + return +} + +var strToFunc = map[string]rpcFunc{ + "/register_volume": RegisterVolume, + "/unregister_volume": UnregisterVolume, + "/update_volume_state": UpdateVolumeState, + "/get_volume": GetVolume, + "/list_volumes": ListVolumes, + "/register_object": RegisterObject, + "/unregister_object": UnregisterObject, + "/rename_object": RenameObject, + "/load_object": LoadObject, + "/quarantine_object": QuarantineObject, + "/unquarantine_object": UnquarantineObject, + "/load_objects_by_prefix": LoadObjectsByPrefix, + "/load_objects_by_volume": LoadObjectsByVolume, + "/list_partitions": ListPartitions, + "/list_partition": ListPartition, + "/list_suffix": ListSuffix, + "/list_quarantined_ohashes": ListQuarantinedOHashes, + "/list_quarantined_ohash": ListQuarantinedOHash, + "/get_next_offset": GetNextOffset, + "/get_stats": GetStats, + "/set_kv_state": SetKvState, + "/get_kv_state": GetKvState, +} + +func serializePb(msg proto.Message) (*[]byte, error) { + out, err := proto.Marshal(msg) + if err != nil { + log.Errorf("failed to serialize reply: %v", err) + return nil, status.Errorf(codes.Unavailable, "unable to serialize reply: %v", err) + } + return &out, nil +} + +func sendError(w http.ResponseWriter, rpcErr error) (err error) { + w.Header().Set("Content-Type", "Content-Type: text/plain; charset=utf-8") + w.WriteHeader(int(rpcErr.(*status.RpcError).Code())) + errorMsg := []byte(rpcErr.Error()) + _, err = w.Write(errorMsg) + return +} + +func sendReply(w http.ResponseWriter, serializedPb []byte) (err error) { + w.Header().Set("Content-Type", "application/octet-stream") + w.WriteHeader(200) + _, err = w.Write(serializedPb) + return +} + +func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + var err error + log.Debugf(r.URL.Path) + + // Match URL to RPC function + fn, ok := strToFunc[r.URL.Path] + if !ok { + log.Printf("No match for URL Path %s", r.URL.Path) + if err = sendError(w, status.Errorf(codes.Unimplemented, "Unimplemented RPC function")); err != nil { + log.Printf("Error sending reply: %v", err) + } + return + } + + // Read request (body should be serialized protobuf) + body, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Printf("Error reading body: %v", err) + if err = sendError(w, status.Errorf(codes.Internal, "Failed to read request body")); err != nil { + log.Printf("Error sending reply: %v", err) + } + return + } + + // Call RPC function and send reply + resp, err := fn(s, r.Context(), &body) + if err != nil { + log.Println(err) + if err = sendError(w, err); err != nil { + log.Printf("Error sending reply: %v", err) + } + return + } + + if err = sendReply(w, *resp); err != nil { + log.Printf("Error sending reply: %v", err) + } +} diff --git a/go/swift-rpc-losf/rpc_test.go b/go/swift-rpc-losf/rpc_test.go new file mode 100644 index 000000000..0da9b5e6a --- /dev/null +++ b/go/swift-rpc-losf/rpc_test.go @@ -0,0 +1,1014 @@ +// Copyright (c) 2010-2012 OpenStack Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bytes" + "context" + "fmt" + "github.com/alecuyer/statsd/v2" + "github.com/golang/protobuf/proto" + "github.com/openstack/swift-rpc-losf/codes" + pb "github.com/openstack/swift-rpc-losf/proto" + "github.com/sirupsen/logrus" + "io/ioutil" + "net" + "net/http" + "os" + "path" + "strings" + "testing" +) + +func runTestServer(kv KV, diskPath string, socketPath string, listening chan bool) (err error) { + _, diskName := path.Split(path.Clean(diskPath)) + fs := &server{kv: kv, diskPath: diskPath, diskName: diskName, socketPath: socketPath, isClean: true} + + statsdPrefix := "kv" + fs.statsd_c, err = statsd.New(statsd.Prefix(statsdPrefix)) + if err != nil { + return + } + + go func() { + os.Remove(fs.socketPath) + unixListener, err := net.Listen("unix", fs.socketPath) + if err != nil { + log.Fatalf("Cannot serve: %v", err) + } + listening <- true + server := http.Server{Handler: fs} + fs.httpServer = &server + log.Debug("Start serving") + server.Serve(unixListener) + + }() + return +} + +func teardown(tempdir string) { + if strings.HasPrefix(tempdir, "/tmp/") { + os.RemoveAll(tempdir) + } +} + +// var client pb.FileMgrClient +var client http.Client + +// Check that err == nil and HTTP's status code is 200. If err is not nil, return err, +// if status code is not 200, returns an error with the status code received, if err is nil +// and status code is 200, return nil) +func check_200(response *http.Response, err error) error { + if err != nil { + return err + } + + if response.StatusCode != 200 { + return fmt.Errorf("HTTP status code is not 200: %v", response.StatusCode) + } + + return nil +} + +func populateKV() (err error) { + volumes := []pb.RegisterVolumeRequest{ + {Partition: 9, Type: 0, VolumeIndex: 20, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 10, Type: 0, VolumeIndex: 35, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 40, Type: 0, VolumeIndex: 24, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 63, Type: 0, VolumeIndex: 27, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 65, Type: 0, VolumeIndex: 33, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 71, Type: 0, VolumeIndex: 19, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 111, Type: 0, VolumeIndex: 47, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 127, Type: 0, VolumeIndex: 43, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 139, Type: 0, VolumeIndex: 50, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 171, Type: 0, VolumeIndex: 49, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 195, Type: 0, VolumeIndex: 12, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 211, Type: 0, VolumeIndex: 16, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 213, Type: 0, VolumeIndex: 14, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 243, Type: 0, VolumeIndex: 17, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 271, Type: 0, VolumeIndex: 8, Offset: 24576, State: 0, RepairTool: false}, + {Partition: 295, Type: 0, VolumeIndex: 28, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 327, Type: 0, VolumeIndex: 48, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 360, Type: 0, VolumeIndex: 15, Offset: 12288, State: 0, RepairTool: false}, + {Partition: 379, Type: 0, VolumeIndex: 25, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 417, Type: 0, VolumeIndex: 22, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 420, Type: 0, VolumeIndex: 32, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 421, Type: 0, VolumeIndex: 46, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 428, Type: 0, VolumeIndex: 21, Offset: 12288, State: 0, RepairTool: false}, + {Partition: 439, Type: 0, VolumeIndex: 38, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 453, Type: 0, VolumeIndex: 44, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 466, Type: 0, VolumeIndex: 40, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 500, Type: 0, VolumeIndex: 39, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 513, Type: 0, VolumeIndex: 26, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 530, Type: 0, VolumeIndex: 4, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 530, Type: 1, VolumeIndex: 5, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 535, Type: 0, VolumeIndex: 1, Offset: 20480, State: 0, RepairTool: false}, + {Partition: 535, Type: 0, VolumeIndex: 2, Offset: 4096, State: 0, RepairTool: false}, + {Partition: 535, Type: 1, VolumeIndex: 3, Offset: 12288, State: 0, RepairTool: false}, + {Partition: 559, Type: 0, VolumeIndex: 30, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 602, Type: 0, VolumeIndex: 41, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 604, Type: 0, VolumeIndex: 29, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 673, Type: 0, VolumeIndex: 11, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 675, Type: 0, VolumeIndex: 42, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 710, Type: 0, VolumeIndex: 37, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 765, Type: 0, VolumeIndex: 36, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 766, Type: 0, VolumeIndex: 45, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 786, Type: 0, VolumeIndex: 23, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 809, Type: 0, VolumeIndex: 31, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 810, Type: 0, VolumeIndex: 13, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 855, Type: 0, VolumeIndex: 18, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 974, Type: 0, VolumeIndex: 9, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 977, Type: 0, VolumeIndex: 6, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 977, Type: 1, VolumeIndex: 7, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 1009, Type: 0, VolumeIndex: 34, Offset: 8192, State: 0, RepairTool: false}, + {Partition: 1019, Type: 0, VolumeIndex: 10, Offset: 8192, State: 0, RepairTool: false}, + } + + objects := []pb.RegisterObjectRequest{ + {Name: []byte("85fd12f8961e33cbf7229a94118524fa1515589781.45671.ts"), VolumeIndex: 3, Offset: 8192, NextOffset: 12288, RepairTool: false}, + {Name: []byte("84afc1659c7e8271951fe370d6eee0f81515590332.51834.ts"), VolumeIndex: 5, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("f45bf9000f39092b9de5a74256e3eebe1515590648.06511.ts"), VolumeIndex: 7, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("43c8adc53dbb40d27add4f614fc49e5e1515595691.35618#0#d.data"), VolumeIndex: 8, Offset: 20480, NextOffset: 24576, RepairTool: false}, + {Name: []byte("f3804523d91d294dab1500145b43395b1515596136.42189#4#d.data"), VolumeIndex: 9, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("fefe1ba1120cd6cd501927401d6b2ecc1515750800.13517#2#d.data"), VolumeIndex: 10, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("a8766d2608b77dc6cb0bfe3fe6782c731515750800.18975#0#d.data"), VolumeIndex: 11, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("30f12368ca25d11fb1a80d10e64b15431515750800.19224#4#d.data"), VolumeIndex: 12, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("ca9576ada218f74cb8f11648ecec439c1515750800.21553#2#d.data"), VolumeIndex: 13, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("3549df7ef11006af6852587bf16d82971515750800.22096#2#d.data"), VolumeIndex: 14, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("5a0a70e36a057a9982d1dc9188069b511515750803.50544#0#d.data"), VolumeIndex: 15, Offset: 8192, NextOffset: 12288, RepairTool: false}, + {Name: []byte("5a1801fea97614f8c5f58511905773d01515750800.40035#0#d.data"), VolumeIndex: 15, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("34c46ce96897a24374d126d7d7eab2fb1515750800.42545#0#d.data"), VolumeIndex: 16, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("3cf60143ea488c84da9e1603158203a11515750800.93160#0#d.data"), VolumeIndex: 17, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("d5c64e9cb0b093441fb6b500141aa0531515750800.94069#2#d.data"), VolumeIndex: 18, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("11f5db768b6f9a37cf894af99b15c0d11515750801.05135#4#d.data"), VolumeIndex: 19, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("02573d31b770cda8e0effd7762e8a0751515750801.09785#2#d.data"), VolumeIndex: 20, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("6b08eabf5667557c72dc6570aa1fb8451515750801.08639#4#d.data"), VolumeIndex: 21, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("6b08eabf5667557c72dc6570aa1fb8451515750856.77219.meta"), VolumeIndex: 21, Offset: 8192, NextOffset: 12288, RepairTool: false}, + {Name: []byte("6b08eabf5667557c72dc6570abcfb8451515643210.72429#4#d.data"), VolumeIndex: 22, Offset: 8192, NextOffset: 12288, RepairTool: false}, + {Name: []byte("687ba0410f4323c66397a85292077b101515750801.10244#0#d.data"), VolumeIndex: 22, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("c4aaea9b28c425f45eb64d4d5b0b3f621515750801.19478#2#d.data"), VolumeIndex: 23, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("0a0898eb861579d1240adbb1c9f0c92b1515750801.20636#2#d.data"), VolumeIndex: 24, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("5efd43142db5913180ba865ef529eccd1515750801.64704#4#d.data"), VolumeIndex: 25, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("806a35f1e974f93161b2da51760f22701515750801.68309#2#d.data"), VolumeIndex: 26, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("0fdceb7af49cdd0cb1262acbdc88ae881515750801.93565#0#d.data"), VolumeIndex: 27, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("49d4fa294d2c97f08596148bf4615bfa1515750801.93739#4#d.data"), VolumeIndex: 28, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("971b4d05733f475d447d7f8b050bb0071515750802.09721#2#d.data"), VolumeIndex: 29, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("8bc66b3ae033db15ceb3729d89a07ece1515750802.51062#0#d.data"), VolumeIndex: 30, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("ca53beae1aeb4deacd17409e32305a2c1515750802.63996#2#d.data"), VolumeIndex: 31, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("69375433763d9d511114e8ac869c916c1515750802.63846#0#d.data"), VolumeIndex: 32, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("105de5f388ab4b72e56bc93f36ad388a1515750802.73393#2#d.data"), VolumeIndex: 33, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("105de5f388ab4b72e56bc93f36ad388a1515873948.27383#2#d.meta"), VolumeIndex: 33, Offset: 8192, NextOffset: 12288, RepairTool: false}, + {Name: []byte("fc6916fd1e6a0267afac88c395b876ac1515750802.83459#2#d.data"), VolumeIndex: 34, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("02b10d6bfb205fe0f34f9bd82336dc711515750802.93662#2#d.data"), VolumeIndex: 35, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("bf43763a98208f15da803e76bf52e7d11515750803.01357#0#d.data"), VolumeIndex: 36, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("b1abadfed91b1cb4392dd2ec29e171ac1515750803.07767#4#d.data"), VolumeIndex: 37, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("6de30d74634d088f1f5923336af2b3ae1515750803.36199#4#d.data"), VolumeIndex: 38, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("7d234bbd1137d509105245ac78427b9f1515750803.49022#4#d.data"), VolumeIndex: 39, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("749057975c1bac830360530bdcd741591515750803.49647#0#d.data"), VolumeIndex: 40, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("9692991e77c9742cbc24469391d499981515750803.56295#0#d.data"), VolumeIndex: 41, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("a8dbd473e360787caff0b97aca33373f1515750803.68428#2#d.data"), VolumeIndex: 42, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("1ff88cb2b6b64f1fd3b6097f20203ee01515750803.73746#4#d.data"), VolumeIndex: 43, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("71572f46094d7ac440f5e2a3c72da17b1515750803.75628#2#d.data"), VolumeIndex: 44, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("bf8e83d954478d66ac1dba7eaa832c721515750803.81141#4#d.data"), VolumeIndex: 45, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("69724f682fe12b4a4306bceeb75825431515750804.10112#2#d.data"), VolumeIndex: 46, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("1bf38645ccc5f158c96480f1e0861a141515750804.31472#0#d.data"), VolumeIndex: 47, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("51fecf0e0bb30920fd0d83ee8fba29f71515750804.32492#2#d.data"), VolumeIndex: 48, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("2acbf85061e46b3bb3adb8930cb7414d1515750804.46622#2#d.data"), VolumeIndex: 49, Offset: 4096, NextOffset: 8192, RepairTool: false}, + {Name: []byte("22e4a97f1d4f2b6d4150bb9b481e4c971515750804.51987#0#d.data"), VolumeIndex: 50, Offset: 4096, NextOffset: 8192, RepairTool: false}, + } + + // Register volumes + for _, df := range volumes { + out, err := proto.Marshal(&df) + if err != nil { + log.Error("failed to marshal") + return err + } + body := bytes.NewReader(out) + resp, err := client.Post("http://unix/register_volume", "application/octet-stream", body) + if err = check_200(resp, err); err != nil { + return err + } + defer resp.Body.Close() + } + + // Register objects + for _, obj := range objects { + out, err := proto.Marshal(&obj) + if err != nil { + log.Error("failed to marshal") + return err + } + body := bytes.NewReader(out) + resp, err := client.Post("http://unix/register_object", "application/octet-stream", body) + if err = check_200(resp, err); err != nil { + return err + } + defer resp.Body.Close() + } + return +} + +func TestMain(m *testing.M) { + log.SetLevel(logrus.InfoLevel) + diskPath, err := ioutil.TempDir("/tmp", "losf-test") + if err != nil { + log.Fatal(err) + } + rootDir := path.Join(diskPath, "losf") + dbDir := path.Join(rootDir, "db") + + err = os.MkdirAll(rootDir, 0700) + if err != nil { + log.Fatal(err) + } + + kv, err := openLevigoDB(dbDir) + if err != nil { + log.Fatal("failed to create leveldb") + } + socket_path := "/tmp/rpc.socket" + listening := make(chan bool, 1) + go runTestServer(kv, diskPath, socket_path, listening) + // Wait for the socket + <-listening + + client = http.Client{Transport: &http.Transport{DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", "/tmp/rpc.socket") + }, + }, + } + + err = populateKV() + if err != nil { + log.Error(err) + log.Fatal("failed to populate test KV") + } + + ret := m.Run() + + teardown(diskPath) + + os.Exit(ret) +} + +// TODO, add more tests: +// - prefix with no objects +// - single object +// - first and last elements of the KV +func TestLoadObjectsByPrefix(t *testing.T) { + prefix := &pb.LoadObjectsByPrefixRequest{Prefix: []byte("105de5f388ab4b72e56bc93f36ad388a")} + + out, err := proto.Marshal(prefix) + if err != nil { + t.Error("failed to marshal") + } + body := bytes.NewReader(out) + + expectedObjects := []pb.Object{ + {Name: []byte("105de5f388ab4b72e56bc93f36ad388a1515750802.73393#2#d.data"), VolumeIndex: 33, Offset: 4096}, + {Name: []byte("105de5f388ab4b72e56bc93f36ad388a1515873948.27383#2#d.meta"), VolumeIndex: 33, Offset: 8192}, + } + + response, err := client.Post("http://unix/load_objects_by_prefix", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("RPC call failed: %v", err) + } + defer response.Body.Close() + + r := &pb.LoadObjectsByPrefixReply{} + buf := new(bytes.Buffer) + buf.ReadFrom(response.Body) + if err = proto.Unmarshal(buf.Bytes(), r); err != nil { + t.Error("failed to unmarshal") + } + + for i, obj := range r.Objects { + expected := expectedObjects[i] + if !bytes.Equal(obj.Name, expected.Name) { + t.Errorf("\ngot : %s\nexpected: %s", string(obj.Name), string(expected.Name)) + } + } +} + +func TestListPartitions(t *testing.T) { + partPower := uint32(10) + + lpInfo := &pb.ListPartitionsRequest{PartitionBits: partPower} + out, err := proto.Marshal(lpInfo) + if err != nil { + t.Error("failed to marshal") + } + body := bytes.NewReader(out) + + expectedPartitions := []string{"9", "10", "40", "63", "65", "71", "111", "127", "139", "171", "195", "211", "213", "243", "271", "295", "327", "360", "379", "417", "420", "421", "428", "439", "453", "466", "500", "513", "530", "535", "559", "602", "604", "673", "675", "710", "765", "766", "786", "809", "810", "855", "974", "977", "1009", "1019"} + + response, err := client.Post("http://unix/list_partitions", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("RPC call failed: %v", err) + } + defer response.Body.Close() + + r := &pb.DirEntries{} + buf := new(bytes.Buffer) + buf.ReadFrom(response.Body) + if err = proto.Unmarshal(buf.Bytes(), r); err != nil { + t.Error("failed to unmarshal") + } + + if len(r.Entry) != len(expectedPartitions) { + t.Fatalf("\ngot: %v\nwant: %v", r.Entry, expectedPartitions) + } + + for i, obj := range r.Entry { + if obj != expectedPartitions[i] { + t.Fatalf("checking individual elements\ngot: %v\nwant: %v", r.Entry, expectedPartitions) + } + } +} + +// TODO: add more tests, have a suffix with multiple entries +func TestListSuffix(t *testing.T) { + partition := uint32(428) + partPower := uint32(10) + suffix := []byte("845") + + lsInfo := &pb.ListSuffixRequest{Partition: partition, Suffix: suffix, PartitionBits: partPower} + out, err := proto.Marshal(lsInfo) + if err != nil { + t.Error(err) + } + body := bytes.NewReader(out) + + expectedHashes := []string{"6b08eabf5667557c72dc6570aa1fb845", "6b08eabf5667557c72dc6570abcfb845"} + + response, err := client.Post("http://unix/list_suffix", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("RPC call failed: %v", err) + } + defer response.Body.Close() + + r := &pb.DirEntries{} + buf := new(bytes.Buffer) + buf.ReadFrom(response.Body) + if err = proto.Unmarshal(buf.Bytes(), r); err != nil { + t.Error(err) + } + + if len(r.Entry) != len(expectedHashes) { + t.Fatalf("\ngot: %v\nwant: %v", r.Entry, expectedHashes) + } + + for i, obj := range r.Entry { + if obj != expectedHashes[i] { + t.Fatalf("checking individual elements\ngot: %v\nwant: %v", r.Entry, expectedHashes) + } + } +} + +func TestState(t *testing.T) { + // Mark dirty and check + kvstate := &pb.KvState{} + out, err := proto.Marshal(kvstate) + if err != nil { + t.Error(err) + } + body := bytes.NewReader(out) + + response, err := client.Post("http://unix/set_kv_state", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("Failed to change KV state: %v", err) + } + response.Body.Close() + + empty := &pb.GetKvStateRequest{} + empty_serialized, err := proto.Marshal(empty) + if err != nil { + t.Error(err) + } + body = bytes.NewReader(empty_serialized) + + response, err = client.Post("http://unix/get_kv_state", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("RPC call failed: %v", err) + } + r := &pb.KvState{} + buf := new(bytes.Buffer) + buf.ReadFrom(response.Body) + if err = proto.Unmarshal(buf.Bytes(), r); err != nil { + t.Error(err) + } + + if r.IsClean != false { + t.Fatal("isClean true, should be false") + } + + // Mark clean and check + kvstate = &pb.KvState{IsClean: true} + out, err = proto.Marshal(kvstate) + if err != nil { + t.Error(err) + } + body = bytes.NewReader(out) + + response, err = client.Post("http://unix/set_kv_state", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("Failed to change KV state: %v", err) + } + response.Body.Close() + + body = bytes.NewReader(empty_serialized) + response, err = client.Post("http://unix/get_kv_state", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("RPC call failed: %v", err) + } + defer response.Body.Close() + buf.Reset() + buf.ReadFrom(response.Body) + + if err = proto.Unmarshal(buf.Bytes(), r); err != nil { + t.Error(err) + } + + if r.IsClean != true { + t.Fatal("isClean false, should be true") + } + +} + +func TestRegisterObject(t *testing.T) { + // Register new non-existing object + name := []byte("33dea50d391ee52a8ead7cb562a9b4e2/1539791765.84449#5#d.data") + obj := &pb.RegisterObjectRequest{Name: name, VolumeIndex: 1, Offset: 4096, NextOffset: 8192, RepairTool: false} + out, err := proto.Marshal(obj) + if err != nil { + t.Fatal(err) + } + body := bytes.NewReader(out) + + response, err := client.Post("http://unix/register_object", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("failed to register object: %s", err) + } + response.Body.Close() + + objInfo := &pb.LoadObjectRequest{Name: name, IsQuarantined: false, RepairTool: false} + out, err = proto.Marshal(objInfo) + if err != nil { + t.Fatal(err) + } + body = bytes.NewReader(out) + response, err = client.Post("http://unix/load_object", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("error getting registered object: %s", err) + } + r := &pb.Object{} + buf := new(bytes.Buffer) + buf.ReadFrom(response.Body) + if err = proto.Unmarshal(buf.Bytes(), r); err != nil { + t.Fatal(err) + } + response.Body.Close() + + if !bytes.Equal(r.Name, name) || r.VolumeIndex != 1 || r.Offset != 4096 { + t.Fatalf("object found but name, volume index, or offset, is wrong: %v", r) + } + + // Register existing object, which should fail + obj = &pb.RegisterObjectRequest{Name: name, VolumeIndex: 1, Offset: 4096, NextOffset: 8192, RepairTool: false} + out, err = proto.Marshal(obj) + if err != nil { + t.Fatal(err) + } + body = bytes.NewReader(out) + response, err = client.Post("http://unix/register_object", "application/octet-stream", body) + if err != nil { + t.Fatal(err) + } + if response.StatusCode != int(codes.AlreadyExists) { + t.Fatalf("wrong status code, expected: %d, got: %d", codes.AlreadyExists, response.StatusCode) + } + response.Body.Close() + + // Remove object + unregInfo := &pb.UnregisterObjectRequest{Name: name} + out, err = proto.Marshal(unregInfo) + if err != nil { + t.Fatal(err) + } + body = bytes.NewReader(out) + response, err = client.Post("http://unix/unregister_object", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("failed to unregister object: %s", err) + } + response.Body.Close() + + // Attempt to remove again, should fail + body = bytes.NewReader(out) + response, err = client.Post("http://unix/unregister_object", "application/octet-stream", body) + if err != nil { + t.Fatal(err) + } + if response.StatusCode != int(codes.NotFound) { + t.Fatalf("wrong status code, expected: %d, got: %d", codes.NotFound, response.StatusCode) + } +} + +func TestQuarantineObject(t *testing.T) { + // Quarantine an existing object + name := []byte("bf43763a98208f15da803e76bf52e7d11515750803.01357#0#d.data") + objName := &pb.QuarantineObjectRequest{Name: name, RepairTool: false} + out, err := proto.Marshal(objName) + if err != nil { + t.Fatal(err) + } + body := bytes.NewReader(out) + response, err := client.Post("http://unix/quarantine_object", "applicable/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatal("Failed to quarantine object") + } + response.Body.Close() + + // We shouldn't be able to find it + objInfo := &pb.LoadObjectRequest{Name: name, IsQuarantined: false, RepairTool: false} + out, err = proto.Marshal(objInfo) + if err != nil { + t.Fatal(err) + } + body = bytes.NewReader(out) + response, err = client.Post("http://unix/quarantine_object", "applicable/octet-stream", body) + if err != nil { + t.Fatal(err) + } + if response.StatusCode != int(codes.NotFound) { + t.Fatalf("wrong status code, expected: %d, got: %d", codes.NotFound, response.StatusCode) + } + response.Body.Close() + + // TODO, need to test that the quarantined object exists + // then try to quarantine non existent object +} + +func TestUnquarantineObject(t *testing.T) { + // Unquarantine an existing quarantined object (check that) + name := []byte("bf43763a98208f15da803e76bf52e7d11515750803.01357#0#d.data") + objName := &pb.UnquarantineObjectRequest{Name: name, RepairTool: false} + out, err := proto.Marshal(objName) + if err != nil { + t.Fatal(err) + } + body := bytes.NewReader(out) + + response, err := client.Post("http://unix/unquarantine_object", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatal("Failed to quarantine object") + } + response.Body.Close() + + // We should be able to find it + objInfo := &pb.LoadObjectRequest{Name: name, IsQuarantined: false, RepairTool: false} + out, err = proto.Marshal(objInfo) + if err != nil { + t.Fatal(err) + } + body = bytes.NewReader(out) + + response, err = client.Post("http://unix/load_object", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatal("cannot find unquarantined object") + } + response.Body.Close() + + // TODO, need to test that the quarantined object exists + // then try to quarantine non existent object +} + +// This test modifies the DB +func TestListQuarantinedOHashes(t *testing.T) { + // We shouldn't find any quarantined object initially + lqInfo := &pb.ListQuarantinedOHashesRequest{PageSize: 100} + out, err := proto.Marshal(lqInfo) + if err != nil { + t.Fatal(err) + } + body := bytes.NewReader(out) + + response, err := client.Post("http://unix/list_quarantined_ohashes", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("failed to list quarantined ohashes: %v", err) + } + + r := &pb.ListQuarantinedOHashesReply{} + buf := new(bytes.Buffer) + buf.ReadFrom(response.Body) + if err = proto.Unmarshal(buf.Bytes(), r); err != nil { + t.Fatal(err) + } + + if r.Objects != nil { + t.Fatalf("Did not expect to find any quarantined objects. Found: %v", r.Objects) + } + response.Body.Close() + + // Quarantine a few objects and check we can find them + objectsToQuarantine := []pb.QuarantineObjectRequest{ + {Name: []byte("02573d31b770cda8e0effd7762e8a0751515750801.09785#2#d.data"), RepairTool: false}, + {Name: []byte("6b08eabf5667557c72dc6570aa1fb8451515750801.08639#4#d.data"), RepairTool: false}, + {Name: []byte("6b08eabf5667557c72dc6570aa1fb8451515750856.77219.meta"), RepairTool: false}, + {Name: []byte("6b08eabf5667557c72dc6570abcfb8451515643210.72429#4#d.data"), RepairTool: false}, + {Name: []byte("687ba0410f4323c66397a85292077b101515750801.10244#0#d.data"), RepairTool: false}, + {Name: []byte("c4aaea9b28c425f45eb64d4d5b0b3f621515750801.19478#2#d.data"), RepairTool: false}, + {Name: []byte("0a0898eb861579d1240adbb1c9f0c92b1515750801.20636#2#d.data"), RepairTool: false}, + } + + expectedOhashes := [][]byte{ + []byte("02573d31b770cda8e0effd7762e8a075"), + []byte("0a0898eb861579d1240adbb1c9f0c92b"), + []byte("687ba0410f4323c66397a85292077b10"), + []byte("6b08eabf5667557c72dc6570aa1fb845"), + []byte("6b08eabf5667557c72dc6570abcfb845"), + []byte("c4aaea9b28c425f45eb64d4d5b0b3f62"), + } + + for _, qObj := range objectsToQuarantine { + out, err = proto.Marshal(&qObj) + if err != nil { + t.Error(err) + } + body = bytes.NewReader(out) + + response, err = client.Post("http://unix/quarantine_object", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("failed to quarantine object: %s", err) + } + response.Body.Close() + } + + // List quarantined objects + lqInfo = &pb.ListQuarantinedOHashesRequest{PageSize: 100} + out, err = proto.Marshal(lqInfo) + if err != nil { + t.Fatal(err) + } + body = bytes.NewReader(out) + + response, err = client.Post("http://unix/list_quarantined_ohashes", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("failed to list quarantined ohashes: %v", err) + } + + r = &pb.ListQuarantinedOHashesReply{} + buf.Reset() + buf.ReadFrom(response.Body) + if err = proto.Unmarshal(buf.Bytes(), r); err != nil { + t.Fatal(err) + } + + response.Body.Close() + + receivedOhashes := [][]byte{} + for _, obj := range r.Objects { + receivedOhashes = append(receivedOhashes, obj.Name) + } + + if !testEqSliceBytes(receivedOhashes, expectedOhashes) { + t.Fatalf("\nexpected %v\ngot %v", expectedOhashes, receivedOhashes) + } + + // We got all quarantined objects, so NextPageToken shouldn't be set + if !bytes.Equal(r.NextPageToken, []byte("")) { + t.Fatalf("\nexpected %v got %v", []byte("foo"), r.NextPageToken) + } + + // List quarantined objects, with a PageSize of 1 + lqInfo = &pb.ListQuarantinedOHashesRequest{PageSize: 1} + out, err = proto.Marshal(lqInfo) + if err != nil { + t.Fatal(err) + } + body = bytes.NewReader(out) + + response, err = client.Post("http://unix/list_quarantined_ohashes", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("failed to list quarantined ohashes: %v", err) + } + + r = &pb.ListQuarantinedOHashesReply{} + buf.Reset() + buf.ReadFrom(response.Body) + if err = proto.Unmarshal(buf.Bytes(), r); err != nil { + t.Fatal(err) + } + + response.Body.Close() + + receivedOhashes = [][]byte{} + for _, obj := range r.Objects { + receivedOhashes = append(receivedOhashes, obj.Name) + } + + if !testEqSliceBytes(receivedOhashes, [][]byte{expectedOhashes[0]}) { + t.Fatalf("\nexpected %v\ngot %v", [][]byte{expectedOhashes[0]}, receivedOhashes) + } + + // We got the first object, expect NextPageToken to be the second quarantined object hash + if !bytes.Equal(r.NextPageToken, expectedOhashes[1]) { + t.Fatalf("\nexpected %v got %v", expectedOhashes[1], r.NextPageToken) + } + + // Get the next two entries + lqInfo = &pb.ListQuarantinedOHashesRequest{PageSize: 2, PageToken: r.NextPageToken} + out, err = proto.Marshal(lqInfo) + if err != nil { + t.Fatal(err) + } + body = bytes.NewReader(out) + + response, err = client.Post("http://unix/list_quarantined_ohashes", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("failed to list quarantined ohashes: %v", err) + } + + r = &pb.ListQuarantinedOHashesReply{} + buf.Reset() + buf.ReadFrom(response.Body) + if err = proto.Unmarshal(buf.Bytes(), r); err != nil { + t.Fatal(err) + } + + response.Body.Close() + + receivedOhashes = [][]byte{} + for _, obj := range r.Objects { + receivedOhashes = append(receivedOhashes, obj.Name) + } + + if !testEqSliceBytes(receivedOhashes, expectedOhashes[1:3]) { + t.Fatalf("\nexpected %v\ngot %v", expectedOhashes[1:3], receivedOhashes) + } + + // We've read 3, expecte NextPageToken to be the 4th quarantined object + if !bytes.Equal(r.NextPageToken, expectedOhashes[3]) { + t.Fatalf("\nexpected %v got %v", expectedOhashes[3], r.NextPageToken) + } + + // Get all remaining entries + lqInfo = &pb.ListQuarantinedOHashesRequest{PageSize: 100, PageToken: r.NextPageToken} + out, err = proto.Marshal(lqInfo) + if err != nil { + t.Fatal(err) + } + body = bytes.NewReader(out) + + response, err = client.Post("http://unix/list_quarantined_ohashes", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("failed to list quarantined ohashes: %v", err) + } + + r = &pb.ListQuarantinedOHashesReply{} + buf.Reset() + buf.ReadFrom(response.Body) + if err = proto.Unmarshal(buf.Bytes(), r); err != nil { + t.Fatal(err) + } + + response.Body.Close() + + receivedOhashes = [][]byte{} + for _, obj := range r.Objects { + receivedOhashes = append(receivedOhashes, obj.Name) + } + + if !testEqSliceBytes(receivedOhashes, expectedOhashes[3:]) { + t.Fatalf("\nexpected %v\ngot %v", expectedOhashes[1:3], receivedOhashes) + } + + // We've read all quarantined objects, NextPageToken should not be set + if !bytes.Equal(r.NextPageToken, []byte("")) { + t.Fatalf("\nexpected %v got %v", []byte(""), r.NextPageToken) + } + +} + +func TestLoadObjectsByVolume(t *testing.T) { + // List non quarantined objects from volume 22, we should not find any + volIndex := &pb.LoadObjectsByVolumeRequest{Index: 22} + out, err := proto.Marshal(volIndex) + if err != nil { + t.Fatal(err) + } + body := bytes.NewReader(out) + + response, err := client.Post("http://unix/load_objects_by_volume", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("failed to call LoadObjectsByVolume: %v", err) + } + + r := &pb.LoadObjectsByVolumeReply{} + buf := new(bytes.Buffer) + buf.ReadFrom(response.Body) + if err = proto.Unmarshal(buf.Bytes(), r); err != nil { + t.Fatal(err) + } + + if r.Objects != nil { + t.Fatalf("did not expect to find objects") + } + + // List quarantined objects from volume 22 + volIndex = &pb.LoadObjectsByVolumeRequest{Index: 22, Quarantined: true} + out, err = proto.Marshal(volIndex) + if err != nil { + t.Fatal(err) + } + body = bytes.NewReader(out) + + response, err = client.Post("http://unix/load_objects_by_volume", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("failed to call LoadObjectsByVolume: %v", err) + } + + r = &pb.LoadObjectsByVolumeReply{} + buf = new(bytes.Buffer) + buf.ReadFrom(response.Body) + if err = proto.Unmarshal(buf.Bytes(), r); err != nil { + t.Fatal(err) + } + + expectedObjects := []pb.Object{ + {Name: []byte("687ba0410f4323c66397a85292077b101515750801.10244#0#d.data"), VolumeIndex: 22, Offset: 4096}, + {Name: []byte("6b08eabf5667557c72dc6570abcfb8451515643210.72429#4#d.data"), VolumeIndex: 22, Offset: 8192}, + } + + // we should have all of them + if len(r.Objects) != len(expectedObjects) { + t.Fatalf("Expected %d objects, got %d", len(expectedObjects), len(r.Objects)) + } + if r.NextPageToken != nil { + t.Fatalf("Expected NextPageToken to be nil, but got: %s", string(r.NextPageToken)) + } + + for i, obj := range r.Objects { + if !bytes.Equal(obj.Name, expectedObjects[i].Name) { + // t.Fatalf("expected %v, got %v", expectedObjects[i].Name, obj.Name) + t.Fatalf("expected %s, got %s", string(expectedObjects[i].Name), string(obj.Name)) + } + if obj.VolumeIndex != expectedObjects[i].VolumeIndex { + t.Fatalf("expected %d, got %d", expectedObjects[i].VolumeIndex, obj.VolumeIndex) + } + if obj.Offset != expectedObjects[i].Offset { + t.Fatalf("expected %d, got %d", expectedObjects[i].Offset, obj.Offset) + } + } + + // List quarantined objects from volume 22 with pagination + volIndex = &pb.LoadObjectsByVolumeRequest{Index: 22, Quarantined: true, PageSize: 1} + out, err = proto.Marshal(volIndex) + if err != nil { + t.Fatal(err) + } + body = bytes.NewReader(out) + + response, err = client.Post("http://unix/load_objects_by_volume", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("failed to call LoadObjectsByVolume: %v", err) + } + + r = &pb.LoadObjectsByVolumeReply{} + buf = new(bytes.Buffer) + buf.ReadFrom(response.Body) + if err = proto.Unmarshal(buf.Bytes(), r); err != nil { + t.Fatal(err) + } + + // we should have one object + if len(r.Objects) != 1 { + t.Fatalf("Expected 1 objects, got %d", len(r.Objects)) + } + if !bytes.Equal(r.NextPageToken, expectedObjects[1].Name) { + t.Fatalf("Expected NextPageToken to be %s, but got: %s", string(expectedObjects[1].Name), string(r.NextPageToken)) + } + + if !bytes.Equal(r.Objects[0].Name, expectedObjects[0].Name) { + // t.Fatalf("expected %v, got %v", expectedObjects[i].Name, obj.Name) + t.Fatalf("expected %s, got %s", string(expectedObjects[0].Name), string(r.Objects[0].Name)) + } + if r.Objects[0].VolumeIndex != expectedObjects[0].VolumeIndex { + t.Fatalf("expected %d, got %d", expectedObjects[0].VolumeIndex, r.Objects[0].VolumeIndex) + } + if r.Objects[0].Offset != expectedObjects[0].Offset { + t.Fatalf("expected %d, got %d", expectedObjects[0].Offset, r.Objects[0].Offset) + } + + // Second call with pagination + volIndex = &pb.LoadObjectsByVolumeRequest{Index: 22, Quarantined: true, PageSize: 1, PageToken: r.NextPageToken} + out, err = proto.Marshal(volIndex) + if err != nil { + t.Fatal(err) + } + body = bytes.NewReader(out) + + response, err = client.Post("http://unix/load_objects_by_volume", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("failed to call LoadObjectsByVolume: %v", err) + } + + r = &pb.LoadObjectsByVolumeReply{} + buf = new(bytes.Buffer) + buf.ReadFrom(response.Body) + if err = proto.Unmarshal(buf.Bytes(), r); err != nil { + t.Fatal(err) + } + + // we should have one object + if len(r.Objects) != 1 { + t.Fatalf("Expected 1 objects, got %d", len(r.Objects)) + } + if r.NextPageToken != nil { + t.Fatalf("Expected NextPageToken to be nil, but got: %s", string(r.NextPageToken)) + } + + if !bytes.Equal(r.Objects[0].Name, expectedObjects[1].Name) { + t.Fatalf("expected %s, got %s", string(expectedObjects[0].Name), string(r.Objects[0].Name)) + } + if r.Objects[0].VolumeIndex != expectedObjects[1].VolumeIndex { + t.Fatalf("expected %d, got %d", expectedObjects[1].VolumeIndex, r.Objects[0].VolumeIndex) + } + if r.Objects[0].Offset != expectedObjects[1].Offset { + t.Fatalf("expected %d, got %d", expectedObjects[1].Offset, r.Objects[0].Offset) + } +} + +func TestListQuarantinedOHash(t *testing.T) { + ohash := &pb.ListQuarantinedOHashRequest{Prefix: []byte("6b08eabf5667557c72dc6570aa1fb845"), RepairTool: false} + out, err := proto.Marshal(ohash) + if err != nil { + t.Fatal(err) + } + body := bytes.NewReader(out) + + response, err := client.Post("http://unix/list_quarantined_ohash", "application/octet-stream", body) + if err = check_200(response, err); err != nil { + t.Fatalf("error listing quarantined object files: %s", err) + } + + qList := &pb.ListQuarantinedOHashReply{} + buf := new(bytes.Buffer) + buf.ReadFrom(response.Body) + if err = proto.Unmarshal(buf.Bytes(), qList); err != nil { + t.Fatal(err) + } + response.Body.Close() + + expectedFiles := [][]byte{ + []byte("6b08eabf5667557c72dc6570aa1fb8451515750801.08639#4#d.data"), + []byte("6b08eabf5667557c72dc6570aa1fb8451515750856.77219.meta"), + } + + if len(qList.Objects) != len(expectedFiles) { + t.Fatalf("got %d objects, expected %d", len(qList.Objects), len(expectedFiles)) + } + + receivedFiles := make([][]byte, len(qList.Objects)) + for i, obj := range qList.Objects { + receivedFiles[i] = obj.Name + } + + if !testEqSliceBytes(receivedFiles, expectedFiles) { + t.Fatalf("\nexpected %v\ngot %v", expectedFiles, receivedFiles) + } + + // Add test, non existent ohash + // Add test, unquarantine one file, list again +} + +func testEqSliceBytes(a, b [][]byte) bool { + if a == nil && b == nil { + return true + } + if a == nil || b == nil { + return false + } + if len(a) != len(b) { + return false + } + for i := range a { + if !bytes.Equal(a[i], b[i]) { + return false + } + } + return true +} diff --git a/go/swift-rpc-losf/snappy b/go/swift-rpc-losf/snappy new file mode 160000 +Subproject 3f194acb57e0487531c96b97af61dcbd025a78a diff --git a/go/swift-rpc-losf/stats.go b/go/swift-rpc-losf/stats.go new file mode 100644 index 000000000..cc8ebd740 --- /dev/null +++ b/go/swift-rpc-losf/stats.go @@ -0,0 +1,41 @@ +// Copyright (c) 2010-2012 OpenStack Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +// Returns the number of entries within the namespace +func countItems(kv KV, namespace byte) (itemCount uint64) { + it := kv.NewIterator(namespace) + defer it.Close() + + for it.SeekToFirst(); it.Valid(); it.Next() { + itemCount++ + } + + return +} + +// Returns KV stats in a map +// It will walk the whole KV to do so, used mostly for debugging, don't use for monitoring. +func CollectStats(s *server) (entriesCount map[string]uint64) { + entriesCount = make(map[string]uint64) + + entriesCount["volume_count"] = countItems(s.kv, volumePrefix) + entriesCount["object_count"] = countItems(s.kv, objectPrefix) + entriesCount["deletequeue_count"] = countItems(s.kv, deleteQueuePrefix) + entriesCount["quarantine_count"] = countItems(s.kv, quarantinePrefix) + + return +} diff --git a/go/swift-rpc-losf/status/status.go b/go/swift-rpc-losf/status/status.go new file mode 100644 index 000000000..66b21646c --- /dev/null +++ b/go/swift-rpc-losf/status/status.go @@ -0,0 +1,27 @@ +package status + +import ( + "fmt" + "github.com/openstack/swift-rpc-losf/codes" +) + +type RpcError struct { + code codes.StatusCode + msg string +} + +func (e *RpcError) Error() string { + return fmt.Sprintf("rpc error: %d. %s", e.code, e.msg) +} + +func Error(code codes.StatusCode, msg string) error { + return &RpcError{code, msg} +} + +func Errorf(code codes.StatusCode, format string, a ...interface{}) error { + return Error(code, fmt.Sprintf(format, a...)) +} + +func (e *RpcError) Code() codes.StatusCode { + return e.code +} diff --git a/go/swift-rpc-losf/swift.go b/go/swift-rpc-losf/swift.go new file mode 100644 index 000000000..c51362dac --- /dev/null +++ b/go/swift-rpc-losf/swift.go @@ -0,0 +1,66 @@ +// Copyright (c) 2010-2012 OpenStack Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "errors" + "fmt" + "strconv" +) + +// getPartitionFromOhash returns the partition, given an object hash and the partition bit count +func getPartitionFromOhash(ohash []byte, partitionBits int) (partition uint64, err error) { + if len(ohash) < 16 { + err = errors.New("ohash must be at least 16 bits long") + return + } + highHash, err := strconv.ParseUint(string(ohash[0:16]), 16, 64) + if err != nil { + return + } + + // shift to get the partition + partition = highHash >> uint64(64-partitionBits) + return +} + +// getLastPartition returns the last possible partition given the partition bit count +func getLastPartition(partitionBits int) (partition uint64, err error) { + for i := 0; i < partitionBits; i++ { + partition |= 1 << uint64(i) + } + return +} + +// Returns the first possible object prefix the KV for the given partition and bit count +// Example: 876, 18 bits -> 00db000000000000 +func getObjPrefixFromPartition(partition uint64, partitionBits int) (prefix []byte, err error) { + firstnum := partition << uint64(64-partitionBits) + prefix = []byte(fmt.Sprintf("%016x0000000000000000", firstnum)) + return +} + +// Returns the first possible object prefix the KV for the given partition and bit count, +// in its encoded form +func getEncodedObjPrefixFromPartition(partition uint64, partitionBits int) (prefix []byte, err error) { + key, err := getObjPrefixFromPartition(partition, partitionBits) + if err != nil { + return + } + + prefix, err = EncodeObjectKey(key) + return +} diff --git a/go/swift-rpc-losf/swift_test.go b/go/swift-rpc-losf/swift_test.go new file mode 100644 index 000000000..5e488558b --- /dev/null +++ b/go/swift-rpc-losf/swift_test.go @@ -0,0 +1,130 @@ +// Copyright (c) 2010-2012 OpenStack Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bytes" + "testing" +) + +type getPartitionTest struct { + ohash string + bitCount int + expectedPartition uint64 +} + +func TestGetPartitionFromOhash(t *testing.T) { + var getPartitionTests = []getPartitionTest{ + {"b80362143ac3221d15a75f4bd1af3fac", 18, 188429}, + {"00db344e979c8c8fa4376dc60ba8102e", 18, 876}, + {"01342cbbf02d9b27396ac937b0f049e1", 18, 1232}, + {"ffffc63ac2fa908fc137e7e0f1c4df97", 18, 262143}, + } + + for _, tt := range getPartitionTests { + partition, err := getPartitionFromOhash([]byte(tt.ohash), tt.bitCount) + if err != nil { + t.Error(err) + } + if partition != tt.expectedPartition { + t.Errorf("For ohash: %s, got partition %d, expected %d\n", tt.ohash, partition, tt.expectedPartition) + } + } + + // Test invalid data, too short + invalidHash := []byte("abcd") + bitCount := 18 + _, err := getPartitionFromOhash(invalidHash, bitCount) + if err == nil { + t.Fatalf("Should fail to getPartitionFromOhash for: %x, %d", invalidHash, bitCount) + } + + // invalid md5 + invalidHash = []byte("zzzz2cbbf02d9b27396ac937b0f049e1") + _, err = getPartitionFromOhash(invalidHash, bitCount) + if err == nil { + t.Fatalf("Should fail to getPartitionFromOhash for: %x, %d", invalidHash, bitCount) + } +} + +type getLastPartitionTest struct { + bitCount int + expectedPartition uint64 +} + +func TestGetLastPartition(t *testing.T) { + var getLastPartitionTests = []getLastPartitionTest{ + {18, 262143}, + {17, 131071}, + {16, 65535}, + } + + for _, tt := range getLastPartitionTests { + partition, err := getLastPartition(tt.bitCount) + if err != nil { + t.Error(err) + } + if partition != tt.expectedPartition { + t.Errorf("For bitcount: %d, got last partition: %d, expected %d\n", tt.bitCount, partition, tt.expectedPartition) + } + } +} + +type getObjTest struct { + partition uint64 + bitCount int + expectedPrefix []byte +} + +func TestGetObjPrefixFromPartition(t *testing.T) { + var getObjTests = []getObjTest{ + {876, 18, []byte("00db0000000000000000000000000000")}, + {209827, 18, []byte("cce8c000000000000000000000000000")}, + {260177, 18, []byte("fe144000000000000000000000000000")}, + {260179, 18, []byte("fe14c000000000000000000000000000")}, + {260180, 18, []byte("fe150000000000000000000000000000")}, + } + + for _, tt := range getObjTests { + prefix, err := getObjPrefixFromPartition(tt.partition, tt.bitCount) + if err != nil { + t.Error(err) + } + if bytes.Compare(prefix, tt.expectedPrefix) != 0 { + t.Errorf("For partition: %d, bitCount: %d, got prefix: %s, expected %s\n", tt.partition, tt.bitCount, prefix, tt.expectedPrefix) + } + } +} + +func TestGetEncodedObjPrefixFromPartition(t *testing.T) { + var getObjTests = []getObjTest{ + {876, 18, []byte("\x00\xdb\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")}, + {209827, 18, []byte("\xcc\xe8\xc0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")}, + {260177, 18, []byte("\xfe\x14\x40\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")}, + {260179, 18, []byte("\xfe\x14\xc0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")}, + {260180, 18, []byte("\xfe\x15\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")}, + } + + for _, tt := range getObjTests { + prefix, err := getEncodedObjPrefixFromPartition(tt.partition, tt.bitCount) + if err != nil { + t.Error(err) + } + if bytes.Compare(prefix, tt.expectedPrefix) != 0 { + t.Errorf("For partition: %d, bitCount: %d, got prefix: %x, expected %x\n", tt.partition, tt.bitCount, prefix, tt.expectedPrefix) + } + } +} diff --git a/go/swift-rpc-losf/utils.go b/go/swift-rpc-losf/utils.go new file mode 100644 index 000000000..c8b9f963a --- /dev/null +++ b/go/swift-rpc-losf/utils.go @@ -0,0 +1,102 @@ +// Copyright (c) 2010-2012 OpenStack Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bufio" + "fmt" + "golang.org/x/sys/unix" + "os" + "strings" +) + +// returns true is dirPath is mounted, false otherwise +func isMounted(dirPath string) (bool, error) { + f, err := os.Open("/proc/mounts") + if err != nil { + return false, err + } + defer f.Close() + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + elems := strings.Split(scanner.Text(), " ") + if dirPath == elems[1] { + return true, nil + } + } + + if err := scanner.Err(); err != nil { + return false, err + } + + return false, nil +} + +// Returns true if path exists and is a directory, false otherwise +func dirExists(dirPath string) (bool, error) { + stat, err := os.Stat(dirPath) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + + if stat.IsDir() { + return true, nil + } + return false, nil +} + +// Returns the baseDir string from rootDir +func getBaseDirName(rootDir string, policyIdx int) string { + if policyIdx == 0 { + return rootDir + } + + return fmt.Sprintf("%s-%d", rootDir, policyIdx) +} + +// Create a file on the filesystem that will signal to the object-server +// that the KV cannot be used. (done along with check_mount) +func CreateDirtyFile(dirtyFilePath string) (err error) { + f, err := os.Create(dirtyFilePath) + if err != nil { + return + } + f.Close() + return +} + +// global variable, lest the file will be autoclosed and the lock released when +// the lockSocket() function returns. +var lockFile *os.File + +// Acquire a lock on a file to protect the RPC socket. +// Does not block and will return an error if the lock cannot be acquired +// There is no explicit unlock, it will be unlocked when the process stops +func lockSocket(socketPath string) (err error) { + lockFilePath := fmt.Sprintf("%s.lock", socketPath) + + lockFile, err = os.OpenFile(lockFilePath, os.O_WRONLY|os.O_CREATE, 00600) + if err != nil { + return + } + + err = unix.Flock(int(lockFile.Fd()), unix.LOCK_EX|unix.LOCK_NB) + return +} |