summaryrefslogtreecommitdiff
path: root/go
diff options
context:
space:
mode:
Diffstat (limited to 'go')
-rw-r--r--go/swift-rpc-losf/Makefile49
-rw-r--r--go/swift-rpc-losf/README.md20
-rw-r--r--go/swift-rpc-losf/codes/codes.go17
-rw-r--r--go/swift-rpc-losf/db.go121
-rw-r--r--go/swift-rpc-losf/db_goleveldb.go230
-rw-r--r--go/swift-rpc-losf/db_leveldb.go239
-rw-r--r--go/swift-rpc-losf/encoding.go198
-rw-r--r--go/swift-rpc-losf/encoding_test.go225
-rw-r--r--go/swift-rpc-losf/go.mod27
-rw-r--r--go/swift-rpc-losf/go.sum76
m---------go/swift-rpc-losf/leveldb0
-rw-r--r--go/swift-rpc-losf/logging.go43
-rw-r--r--go/swift-rpc-losf/main.go174
-rw-r--r--go/swift-rpc-losf/proto/fmgr.pb.go2213
-rw-r--r--go/swift-rpc-losf/rpc.go1642
-rw-r--r--go/swift-rpc-losf/rpc_test.go1014
m---------go/swift-rpc-losf/snappy0
-rw-r--r--go/swift-rpc-losf/stats.go41
-rw-r--r--go/swift-rpc-losf/status/status.go27
-rw-r--r--go/swift-rpc-losf/swift.go66
-rw-r--r--go/swift-rpc-losf/swift_test.go130
-rw-r--r--go/swift-rpc-losf/utils.go102
22 files changed, 6654 insertions, 0 deletions
diff --git a/go/swift-rpc-losf/Makefile b/go/swift-rpc-losf/Makefile
new file mode 100644
index 000000000..b3c365c48
--- /dev/null
+++ b/go/swift-rpc-losf/Makefile
@@ -0,0 +1,49 @@
+# Copyright (c) 2010-2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+.PHONY: all build-snappy install-snappy build-leveldb install-leveldb install-go-leveldb build-losf
+all: build-snappy install-snappy build-leveldb install-leveldb install-go-leveldb build-losf
+
+build-losf:
+ # this installs the protoc-gen-go in $HOME/go/bin, and requires the PATH be set accordingly
+ go get -u github.com/golang/protobuf/protoc-gen-go
+ protoc -I ../../swift/obj fmgr.proto --go_out=proto
+ go get
+ go build -o ../../bin/swift-rpc-losf
+
+# TODO: installation will be taken care of by setup.py once swift-rpc-losf is
+# included in the bin/ directory
+
+build-snappy:
+ git submodule update --init snappy
+ sed -i 's/\(BUILD_SHARED_LIBS "Build.*\) OFF/\1 ON/' snappy/CMakeLists.txt
+ mkdir -p snappy/build
+ cmake -S snappy -B snappy/build
+ $(MAKE) -C snappy/build all
+
+install-snappy:
+ sudo $(MAKE) -C snappy/build install
+
+build-leveldb:
+ git submodule update --init leveldb
+ mkdir -p leveldb/build
+ cmake -DBUILD_SHARED_LIBS=ON -S leveldb -B leveldb/build
+ cmake --build leveldb/build
+
+install-leveldb:
+ sudo $(MAKE) -C leveldb/build install
+
+install-go-leveldb:
+ CGO_CFLAGS=/usr/local/include CGO_LDFLAGS="-L/usr/local/lib -Wl,-rpath=/usr/local/lib" go get github.com/jmhodges/levigo
diff --git a/go/swift-rpc-losf/README.md b/go/swift-rpc-losf/README.md
new file mode 100644
index 000000000..3cb8d5a28
--- /dev/null
+++ b/go/swift-rpc-losf/README.md
@@ -0,0 +1,20 @@
+This is the RPC server part of the "LOSF" (Lots Of Small Files) work.
+
+Setup
+=====
+You will need a working golang environment, gcc and tools (build-essential), and cmake >= 3.9 (ubuntu 16.04 has a version that is too old, get it from cmake.org)
+
+Go >= 1.11 is required to support Go modules (https://github.com/golang/go/wiki/Modules).
+
+Run `make` command in this directory
+```
+make
+sudo make install
+```
+
+Usage ****OUTDATED*****
+=====
+Currently it does not read the ring, you need to start one process per disk and policy on your object-server.
+For example : swift-rpc-losf -diskPath=/srv/node/sda -policyIdx=0 -waitForMount=false
+
+Note that a new database is marked dirty, because there may already be data on disk. (original db may have been removed or corrupted)
diff --git a/go/swift-rpc-losf/codes/codes.go b/go/swift-rpc-losf/codes/codes.go
new file mode 100644
index 000000000..c7d6aaed9
--- /dev/null
+++ b/go/swift-rpc-losf/codes/codes.go
@@ -0,0 +1,17 @@
+package codes
+
+type StatusCode int
+
+//go:generate stringer -type=StatusCode
+const (
+ Ok StatusCode = 200
+ Cancelled StatusCode = 299
+ InvalidArgument StatusCode = 400
+ NotFound StatusCode = 404
+ AlreadyExists StatusCode = 409
+ PermissionDenied StatusCode = 403
+ FailedPrecondition StatusCode = 412
+ Unimplemented StatusCode = 501
+ Internal StatusCode = 500
+ Unavailable StatusCode = 503
+)
diff --git a/go/swift-rpc-losf/db.go b/go/swift-rpc-losf/db.go
new file mode 100644
index 000000000..1a2832c06
--- /dev/null
+++ b/go/swift-rpc-losf/db.go
@@ -0,0 +1,121 @@
+// Copyright (c) 2010-2012 OpenStack Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This is the definition of the key-value interface.
+
+package main
+
+import (
+ "bytes"
+ "fmt"
+)
+
+// KV is the interface for operations that must be supported on the key-value store.
+// the namespace is a single byte that is used as a key prefix for the different types of objects
+// represented in the key-value; (volume, vfile..)
+type KV interface {
+ Get(namespace byte, key []byte) ([]byte, error)
+ Put(namespace byte, key, value []byte) error
+ PutSync(namespace byte, key, value []byte) error
+ Delete(namespace byte, key []byte) error
+ NewWriteBatch() WriteBatch
+ NewIterator(namespace byte) Iterator
+ Close()
+}
+
+// Iterator is the interface for operations that must be supported on the key-value iterator.
+type Iterator interface {
+ SeekToFirst()
+ Seek(key []byte)
+ Next()
+ Key() []byte
+ Value() []byte
+ Valid() bool
+ Close()
+}
+
+// WriteBatch is the interface for operations that must be supported on a "WriteBatch".
+// The key-value used must support a write batch (atomic write of multiple entries)
+type WriteBatch interface {
+ // Put places a key-value pair into the WriteBatch for writing later.
+ Put(namespace byte, key, value []byte)
+ Delete(namespace byte, key []byte)
+
+ // Commit the WriteBatch atomically
+ Commit() error
+ Close()
+}
+
+// Key for the state of the DB. If it has shut down cleanly, the value should be "closed"
+const dbStateKey = "dbstate"
+const closeState = "closed"
+const openState = "opened"
+
+// setKvState will be called on startup and check whether the kv was closed cleanly.
+// It will then mark the db as "opened".
+// This is needed because we write asynchronously to the key-value. After a crash/power loss/OOM kill, the db
+// may not be in sync with the actual state of the volumes.
+func setKvState(kv KV) (isClean bool, err error) {
+ // Check if we stopped cleanly
+ isClean, err = IsDbClean(kv)
+ if err != nil {
+ log.Warn("Could not check if DB is clean")
+ return
+ }
+
+ if isClean {
+ log.Info("DB is clean, set db state to open")
+ err = MarkDbOpened(kv)
+ if err != nil {
+ log.Warn("Failed to mark db as opened when starting")
+ return
+ }
+ }
+
+ return
+}
+
+// IsDbClean will return true if the db has been previously closed properly.
+// This is determined from a specific key in the database that should be set before closing.
+func IsDbClean(kv KV) (isClean bool, err error) {
+ value, err := kv.Get(statsPrefix, []byte(dbStateKey))
+ if err != nil {
+ log.Warn("failed to check kv state")
+ return
+ }
+
+ // if the state is "closed", consider it clean
+ // if the key is missing (new db) consider it dirty. It may have been deleted after a
+ // corruption and we want to rebuild the DB with the existing volumes, not let the cluster
+ // restart from scratch. If it's an actual new machine, the check will do nothing (no existing volumes)
+ if bytes.Equal(value, []byte(closeState)) {
+ isClean = true
+ } else {
+ log.Info(fmt.Sprintf("DB was not closed cleanly, state: %s", value))
+ }
+ return
+}
+
+// MarkDbClosed marks the DB as clean by setting the value of the db state key
+func MarkDbClosed(kv KV) (err error) {
+ err = kv.PutSync(statsPrefix, []byte(dbStateKey), []byte(closeState))
+ return
+}
+
+// MarkDbOpened marks the DB as opened by setting the value of the db state key
+func MarkDbOpened(kv KV) (err error) {
+ err = kv.PutSync(statsPrefix, []byte(dbStateKey), []byte(openState))
+ return
+}
diff --git a/go/swift-rpc-losf/db_goleveldb.go b/go/swift-rpc-losf/db_goleveldb.go
new file mode 100644
index 000000000..454e5b29c
--- /dev/null
+++ b/go/swift-rpc-losf/db_goleveldb.go
@@ -0,0 +1,230 @@
+// Copyright (c) 2010-2012 OpenStack Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This implements the KV interface using goleveldb, a native golang leveldb package.
+// Its behavior has been adapted to match the levigo behavior.
+
+package main
+
+import (
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+ "github.com/syndtr/goleveldb/leveldb/iterator"
+)
+
+type goLevelDB struct {
+ db *leveldb.DB
+ ro *opt.ReadOptions
+ wo *opt.WriteOptions
+}
+
+type levelDBIterator struct {
+ it iterator.Iterator
+ namespace byte
+}
+
+type levelDBWriteBatch struct {
+ wb *leveldb.Batch
+ ldb *goLevelDB
+}
+
+// openGoLevelDb opens or creates the DB.
+// (should use an interface?)
+func openGoLevelDb(path string) (*goLevelDB, error) {
+
+ // TODO check options
+ db, err := leveldb.OpenFile(path, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ ro := &opt.ReadOptions{}
+ wo := &opt.WriteOptions{}
+
+ ldb := goLevelDB{db, ro, wo}
+
+ return &ldb, nil
+}
+
+// Key value operations
+//
+// All operations take a namespace byte to denote the type of object the entry refers to.
+func (ldb *goLevelDB) Get(namespace byte, key []byte) (value []byte, err error) {
+ db := ldb.db
+ ro := ldb.ro
+
+ // Prefix the key with a single byte (namespace)
+ buf := make([]byte, len(key)+1)
+ buf[0] = namespace
+ copy(buf[1:], key)
+
+ value, err = db.Get(buf, ro)
+ // Behave similarly to levigo
+ if err == leveldb.ErrNotFound {
+ value = nil
+ err = nil
+ }
+ return
+}
+
+func (ldb *goLevelDB) Put(namespace byte, key, value []byte) error {
+ db := ldb.db
+ wo := ldb.wo
+
+ // Prefix the key with a single byte (namespace)
+ buf := make([]byte, len(key)+1)
+ buf[0] = namespace
+ copy(buf[1:], key)
+
+ return db.Put(buf, value, wo)
+}
+
+// PutSync will write an entry with the "Sync" option set
+func (ldb *goLevelDB) PutSync(namespace byte, key, value []byte) error {
+ db := ldb.db
+ wo := &opt.WriteOptions{Sync: true}
+
+ // Prefix the key with a single byte (namespace)
+ buf := make([]byte, len(key)+1)
+ buf[0] = namespace
+ copy(buf[1:], key)
+
+ return db.Put(buf, value, wo)
+}
+
+func (ldb *goLevelDB) Close() {
+ ldb.db.Close()
+}
+
+func (ldb *goLevelDB) Delete(namespace byte, key []byte) error {
+ db := ldb.db
+ wo := ldb.wo
+
+ // Prefix the key with a single byte (namespace)
+ buf := make([]byte, len(key)+1)
+ buf[0] = namespace
+ copy(buf[1:], key)
+
+ return db.Delete(buf, wo)
+}
+
+func (ldb *goLevelDB) NewWriteBatch() WriteBatch {
+ lwb := &levelDBWriteBatch{}
+ lwb.wb = new(leveldb.Batch)
+ lwb.ldb = ldb
+ return lwb
+}
+
+// Put on a WriteBatch
+func (lwb *levelDBWriteBatch) Put(namespace byte, key, value []byte) {
+ buf := make([]byte, len(key)+1)
+ buf[0] = namespace
+ copy(buf[1:], key)
+
+ lwb.wb.Put(buf, value)
+ return
+}
+
+// Delete on a WriteBatch
+func (lwb *levelDBWriteBatch) Delete(namespace byte, key []byte) {
+ buf := make([]byte, len(key)+1)
+ buf[0] = namespace
+ copy(buf[1:], key)
+
+ lwb.wb.Delete(buf)
+ return
+}
+
+// Commit a WriteBatch
+func (lwb *levelDBWriteBatch) Commit() (err error) {
+ db := lwb.ldb.db
+ wo := lwb.ldb.wo
+ wb := lwb.wb
+
+ err = db.Write(wb, wo)
+
+ return
+}
+
+// Close a WriteBatch
+func (lwb *levelDBWriteBatch) Close() {
+ // TODO: check if there really is nothing to do
+}
+
+// Iterator functions
+//
+// NewIterator creates a new iterator for the given object type (namespace)
+func (ldb *goLevelDB) NewIterator(namespace byte) Iterator {
+ db := ldb.db
+ ro := ldb.ro
+
+ // Could use the "range" thing in this library
+ lit := &levelDBIterator{}
+ lit.it = db.NewIterator(nil, ro)
+ lit.namespace = namespace
+ return lit
+}
+
+// SeekToFirst will seek to the first object of the given type
+func (lit *levelDBIterator) SeekToFirst() {
+ // The "first key" is the first one in the iterator's namespace
+ buf := make([]byte, 1)
+ buf[0] = lit.namespace
+
+ lit.it.Seek(buf)
+ return
+}
+
+// Seek moves the iterator to the position of the key
+func (lit *levelDBIterator) Seek(key []byte) {
+ // Prefix the key with a single byte (namespace)
+ buf := make([]byte, len(key)+1)
+ buf[0] = lit.namespace
+ copy(buf[1:], key)
+
+ lit.it.Seek(buf)
+ return
+}
+
+// Next moves the iterator to the next key
+func (lit *levelDBIterator) Next() {
+ lit.it.Next()
+ return
+}
+
+// Key returns the key (without the leading namespace byte)
+func (lit *levelDBIterator) Key() (key []byte) {
+ return lit.it.Key()[1:]
+}
+
+// Value returns the value at the current iterator position
+func (lit *levelDBIterator) Value() (key []byte) {
+ return lit.it.Value()
+}
+
+// Valid returns false if we are past the first or last key in the key-value
+func (lit *levelDBIterator) Valid() bool {
+ if lit.it.Key() != nil && lit.it.Key()[0] == lit.namespace {
+ return true
+ } else {
+ return false
+ }
+}
+
+// Close the iterator
+func (lit *levelDBIterator) Close() {
+ lit.it.Release()
+ return
+}
diff --git a/go/swift-rpc-losf/db_leveldb.go b/go/swift-rpc-losf/db_leveldb.go
new file mode 100644
index 000000000..4d3a35a82
--- /dev/null
+++ b/go/swift-rpc-losf/db_leveldb.go
@@ -0,0 +1,239 @@
+// Copyright (c) 2010-2012 OpenStack Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This implements the KV interface using levigo, which is a golang wrapper around the leveldb C++ library.
+
+package main
+
+import (
+ "github.com/jmhodges/levigo"
+)
+
+// levigoDB holds the leveldb handle and options
+type levigoDB struct {
+ db *levigo.DB
+ ro *levigo.ReadOptions
+ wo *levigo.WriteOptions
+}
+
+// levigoIterator wraps a levelDB iterator. The namespace byte is used to specify which type of
+// entry (volume, vfile..) it will iterate on.
+type levigoIterator struct {
+ it *levigo.Iterator
+ namespace byte
+}
+
+// levigoWriteBatch wraps a levigoDB WriteBatch
+type levigoWriteBatch struct {
+ wb *levigo.WriteBatch
+ ldb *levigoDB
+}
+
+// openLevigoDB opens or creates the DB.
+// (should use an interface?)
+func openLevigoDB(path string) (*levigoDB, error) {
+
+ opts := levigo.NewOptions()
+ // filter := levigo.NewBloomFilter(10)
+ // opts.SetFilterPolicy(filter)
+ // That may be useless, since we're supposed to fit in memory ? 10MB for now
+ opts.SetCache(levigo.NewLRUCache(10 * 1048576))
+ opts.SetCreateIfMissing(true)
+
+ // This will open or create the DB. A new DB is not marked as clean. It
+ // may have been lost or deleted, while there is data in volumes on-disk.
+ // A new DB will have to be checked and marked as clean.
+ db, err := levigo.Open(path, opts)
+ if err != nil {
+ return nil, err
+ }
+
+ ro := levigo.NewReadOptions()
+ wo := levigo.NewWriteOptions()
+
+ ldb := levigoDB{db, ro, wo}
+
+ return &ldb, nil
+}
+
+// Key value operations
+//
+// All operations take a namespace byte to denote the type of object the entry refers to.
+// Get wraps levigoDB Get
+func (ldb *levigoDB) Get(namespace byte, key []byte) (value []byte, err error) {
+ db := ldb.db
+ ro := ldb.ro
+
+ // Prefix the key with a single byte (namespace)
+ buf := make([]byte, len(key)+1)
+ buf[0] = namespace
+ copy(buf[1:], key)
+
+ value, err = db.Get(ro, buf)
+ return
+}
+
+// Put wraps levigoDB Put
+func (ldb *levigoDB) Put(namespace byte, key, value []byte) error {
+ db := ldb.db
+ wo := ldb.wo
+
+ // Prefix the key with a single byte (namespace)
+ buf := make([]byte, len(key)+1)
+ buf[0] = namespace
+ copy(buf[1:], key)
+
+ return db.Put(wo, buf, value)
+}
+
+// PutSync will write an entry with the "Sync" option set
+func (ldb *levigoDB) PutSync(namespace byte, key, value []byte) error {
+ db := ldb.db
+ wo := levigo.NewWriteOptions()
+ wo.SetSync(true)
+
+ // Prefix the key with a single byte (namespace)
+ buf := make([]byte, len(key)+1)
+ buf[0] = namespace
+ copy(buf[1:], key)
+
+ return db.Put(wo, buf, value)
+}
+
+// Close wraps levigoDB Close
+func (ldb *levigoDB) Close() {
+ ldb.db.Close()
+}
+
+// Delete wraps levigoDB Delete
+func (ldb *levigoDB) Delete(namespace byte, key []byte) error {
+ db := ldb.db
+ wo := ldb.wo
+
+ // Prefix the key with a single byte (namespace)
+ buf := make([]byte, len(key)+1)
+ buf[0] = namespace
+ copy(buf[1:], key)
+
+ return db.Delete(wo, buf)
+}
+
+// NewWriteBatch creates a new WriteBatch
+func (ldb *levigoDB) NewWriteBatch() WriteBatch {
+ lwb := &levigoWriteBatch{}
+ lwb.wb = levigo.NewWriteBatch()
+ lwb.ldb = ldb
+ return lwb
+}
+
+// Put on a WriteBatch
+func (lwb *levigoWriteBatch) Put(namespace byte, key, value []byte) {
+ buf := make([]byte, len(key)+1)
+ buf[0] = namespace
+ copy(buf[1:], key)
+
+ lwb.wb.Put(buf, value)
+ return
+}
+
+// Delete on a WriteBatch
+func (lwb *levigoWriteBatch) Delete(namespace byte, key []byte) {
+ buf := make([]byte, len(key)+1)
+ buf[0] = namespace
+ copy(buf[1:], key)
+
+ lwb.wb.Delete(buf)
+ return
+}
+
+// Commit a WriteBatch
+func (lwb *levigoWriteBatch) Commit() (err error) {
+ db := lwb.ldb.db
+ wo := lwb.ldb.wo
+ wb := lwb.wb
+
+ err = db.Write(wo, wb)
+
+ return
+}
+
+// Close a WriteBatch
+func (lwb *levigoWriteBatch) Close() {
+ wb := lwb.wb
+
+ wb.Close()
+}
+
+// Iterator functions
+//
+// NewIterator creates a new iterator for the given object type (namespace)
+func (ldb *levigoDB) NewIterator(namespace byte) Iterator {
+ lit := &levigoIterator{}
+ lit.it = ldb.db.NewIterator(ldb.ro)
+ lit.namespace = namespace
+ return lit
+}
+
+// SeekToFirst will seek to the first object of the given type
+func (lit *levigoIterator) SeekToFirst() {
+ // The "first key" is the first one in the iterator's namespace
+ buf := make([]byte, 1)
+ buf[0] = lit.namespace
+
+ lit.it.Seek(buf)
+ return
+}
+
+// Seek moves the iterator to the position of the key
+func (lit *levigoIterator) Seek(key []byte) {
+ // Prefix the key with a single byte (namespace)
+ buf := make([]byte, len(key)+1)
+ buf[0] = lit.namespace
+ copy(buf[1:], key)
+
+ lit.it.Seek(buf)
+ return
+}
+
+// Next moves the iterator to the next key
+func (lit *levigoIterator) Next() {
+ lit.it.Next()
+ return
+}
+
+// Key returns the key (without the leading namespace byte)
+func (lit *levigoIterator) Key() (key []byte) {
+ return lit.it.Key()[1:]
+}
+
+// Value returns the value at the current iterator position
+func (lit *levigoIterator) Value() (key []byte) {
+ return lit.it.Value()
+}
+
+// Valid returns false if we are past the first or last key in the key-value
+func (lit *levigoIterator) Valid() bool {
+ if lit.it.Valid() && lit.it.Key()[0] == lit.namespace {
+ return true
+ } else {
+ return false
+ }
+}
+
+// Close the iterator
+func (lit *levigoIterator) Close() {
+ lit.it.Close()
+ return
+}
diff --git a/go/swift-rpc-losf/encoding.go b/go/swift-rpc-losf/encoding.go
new file mode 100644
index 000000000..99c4384b1
--- /dev/null
+++ b/go/swift-rpc-losf/encoding.go
@@ -0,0 +1,198 @@
+// Copyright (c) 2010-2012 OpenStack Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "encoding/binary"
+ "encoding/hex"
+ "errors"
+ "github.com/sirupsen/logrus"
+)
+
+// Encodes a volume key.
+func EncodeVolumeKey(index uint32) (val []byte) {
+ buf := make([]byte, binary.MaxVarintLen32)
+
+ n := binary.PutUvarint(buf, uint64(index))
+
+ val = buf[:n]
+ log.WithFields(logrus.Fields{"value": val}).Debug("encoded volume key")
+ return
+}
+
+func DecodeVolumeKey(val []byte) (index uint32, err error) {
+ index32, n := binary.Uvarint(val)
+ if n <= 0 {
+ err = errors.New("failed to decode index")
+ return
+ }
+
+ index = uint32(index32)
+ return
+}
+
+// volumeType is an int32 to match the type generated by protobuf for enums
+func EncodeVolumeValue(partition int64, volumeType int32, nextOffset, usedSpace, state int64) (val []byte) {
+ buf := make([]byte, binary.MaxVarintLen64*5)
+ bufLen := 0
+
+ n := binary.PutUvarint(buf, uint64(partition))
+ bufLen += n
+
+ n = binary.PutUvarint(buf[bufLen:], uint64(volumeType))
+ bufLen += n
+
+ n = binary.PutUvarint(buf[bufLen:], uint64(nextOffset))
+ bufLen += n
+
+ n = binary.PutUvarint(buf[bufLen:], uint64(usedSpace))
+ bufLen += n
+
+ n = binary.PutUvarint(buf[bufLen:], uint64(state))
+ bufLen += n
+
+ val = buf[:bufLen]
+ log.WithFields(logrus.Fields{"value": val}).Debug("encoded volume value")
+ return
+}
+
+func DecodeVolumeValue(val []byte) (partition int64, volumeType int32, nextOffset, usedSpace, state int64, err error) {
+ position := 0
+
+ partition64, n := binary.Uvarint(val)
+ if n <= 0 {
+ err = errors.New("failed to decode partition")
+ }
+ position += n
+
+ volumeType64, n := binary.Uvarint(val[position:])
+ if n <= 0 {
+ err = errors.New("failed to decode nextOffset")
+ }
+ position += n
+
+ nextOffset64, n := binary.Uvarint(val[position:])
+ if n <= 0 {
+ err = errors.New("failed to decode nextOffset")
+ }
+ position += n
+
+ usedSpace64, n := binary.Uvarint(val[position:])
+ if n <= 0 {
+ err = errors.New("failed to decode usedSpace")
+ }
+ position += n
+
+ state64, n := binary.Uvarint(val[position:])
+ if n <= 0 {
+ err = errors.New("failed to decode state")
+ }
+
+ partition = int64(partition64)
+ volumeType = int32(volumeType64)
+ nextOffset = int64(nextOffset64)
+ usedSpace = int64(usedSpace64)
+ state = int64(state64)
+ return
+}
+
+// Encodes an object key. the key is the md5 hash string + the filename.
+// Turn the 32 characters hash to a 16 byte array. Leave the filename as
+// is for now. We could gain more space encoding the filename (varint timestamp + encoded file extension),
+// but there are special cases to handle (the "delta")
+func EncodeObjectKey(key []byte) ([]byte, error) {
+ var err error
+
+ if len(key) < 32 {
+ err = errors.New("object key len < 32, cannot encode")
+ return nil, err
+ }
+
+ dst := make([]byte, 16+len(key[32:]))
+ n, err := hex.Decode(dst, key[:32])
+ if err != nil {
+ err = errors.New("failed to encode object hash")
+ return dst, err
+ }
+
+ if n != 16 {
+ err = errors.New("encoded object hash is not 16 bytes long")
+ return dst, err
+ }
+
+ // copy the filename
+ copy(dst[16:], key[32:])
+
+ return dst, err
+}
+
+// Decodes object key
+// This is the most called function of the project. The profiler showed that it did
+// the most allocations on the heap (after cgo, which is something else to look at..)
+// Now expect the buffer from the caller.
+// decodedKey size must be 32+len(encodedKey[16:]), because we will decode the 16 bytes
+// hash to a 32 bytes string, with the rest unchanged.
+func DecodeObjectKey(encodedKey []byte, decodedKey []byte) error {
+ if len(encodedKey) < 16 {
+ err := errors.New("DecodeObjectKey called with encodedKey of len < 16")
+ return err
+ }
+ if len(decodedKey) < 32+len(encodedKey[16:]) {
+ err := errors.New("DecodeObjectKey called with decodedKey too small")
+ return err
+ }
+
+ hex.Encode(decodedKey, encodedKey[:16])
+ copy(decodedKey[32:], encodedKey[16:])
+
+ return nil
+}
+
+// Encodes an object file value.
+func EncodeObjectValue(volumeIndex uint32, offset uint64) (val []byte) {
+ buf := make([]byte, binary.MaxVarintLen64*2)
+ bufLen := 0
+
+ n := binary.PutUvarint(buf, uint64(volumeIndex))
+ bufLen += n
+
+ n = binary.PutUvarint(buf[n:], offset)
+ bufLen += n
+
+ val = buf[:bufLen]
+ log.WithFields(logrus.Fields{"value": val}).Debug("encoded object value")
+ return
+}
+
+func DecodeObjectValue(val []byte) (volumeIndex uint32, offset uint64, err error) {
+ log.WithFields(logrus.Fields{"value": val}).Debug("Decode object value")
+ volumeIndex64, n := binary.Uvarint(val)
+ if n <= 0 {
+ log.WithFields(logrus.Fields{"index": n}).Debug("failed to decode volumeIndex")
+ err = errors.New("failed to decode volumeIndex")
+ }
+
+ offset64, n := binary.Uvarint(val[n:])
+ if n <= 0 {
+ log.WithFields(logrus.Fields{"offset": n}).Debug("failed to decode offset")
+ err = errors.New("failed to decode offset")
+ return
+ }
+
+ volumeIndex = uint32(volumeIndex64)
+ offset = uint64(offset64)
+ return
+}
diff --git a/go/swift-rpc-losf/encoding_test.go b/go/swift-rpc-losf/encoding_test.go
new file mode 100644
index 000000000..986a1c903
--- /dev/null
+++ b/go/swift-rpc-losf/encoding_test.go
@@ -0,0 +1,225 @@
+// Copyright (c) 2010-2012 OpenStack Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "bytes"
+ "testing"
+)
+
+type dfKeyTest struct {
+ index uint32
+ value []byte
+}
+
+func TestVolumeKey(t *testing.T) {
+ var dFKeyTests = []dfKeyTest{
+ {0, []byte("\x00")},
+ {1, []byte("\x01")},
+ {123, []byte("\x7b")},
+ {863523, []byte("\xa3\xda\x34")},
+ {1<<32 - 1, []byte("\xff\xff\xff\xff\x0f")},
+ }
+
+ for _, tt := range dFKeyTests {
+ // Test encoding
+ val := EncodeVolumeKey(tt.index)
+ if !bytes.Equal(val, tt.value) {
+ t.Errorf("For index: %d, got %x, expected %x", tt.index, val, tt.value)
+ }
+
+ // Test decoding
+ index, err := DecodeVolumeKey(val)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if index != tt.index {
+ t.Errorf("For value: %x, got %d, expected %d", val, index, tt.index)
+ }
+
+ }
+
+ // Test overflow
+ m1 := []byte{0x80, 0x80, 0x80, 0x80}
+ _, err := DecodeVolumeKey(m1)
+ if err == nil {
+ t.Errorf("We should fail to decode %x", m1)
+ }
+}
+
+type dfValueTest struct {
+ partition int64
+ volumeType int32
+ nextOffset int64
+ usedSpace int64
+ state int64
+ value []byte
+}
+
+func TestVolumeValue(t *testing.T) {
+ var dfValueTests = []dfValueTest{
+ {0, 0, 0, 0, 0, []byte("\x00\x00\x00\x00\x00")},
+ {1343, 12, 3345314, 9821637, 2, []byte("\xbf\x0a\x0c\xa2\x97\xcc\x01\xc5\xbb\xd7\x04\x02")},
+ {^int64(0), ^int32(0), ^int64(0), ^int64(0), ^int64(0), bytes.Repeat([]byte("\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01"), 5)},
+ // any negative value does not make sense, and should be caught by the RPC.
+ // test anyway, they get cast to uint64.
+ {-3572, 12, -1977878, 66666, -999999,
+ []byte("\x8c\xe4\xff\xff\xff\xff\xff\xff\xff\x01\x0c\xea\xa3\x87\xff\xff\xff" +
+ "\xff\xff\xff\x01\xea\x88\x04\xc1\xfb\xc2\xff\xff\xff\xff\xff\xff\x01")},
+ }
+
+ for _, tt := range dfValueTests {
+ // Test encoding
+ val := EncodeVolumeValue(tt.partition, tt.volumeType, tt.nextOffset, tt.usedSpace, tt.state)
+ if !bytes.Equal(val, tt.value) {
+ t.Errorf("For partition: %d, volumeType: %d, nextOffset: %d, usedSpace: %d, state: %d "+
+ "got: %x, expected: %x",
+ tt.partition, tt.volumeType, tt.nextOffset, tt.usedSpace, tt.state, val, tt.value)
+ }
+
+ // Test decoding
+ partition, volumeType, nextOffset, usedSpace, state, err := DecodeVolumeValue(tt.value)
+ if err != nil {
+ t.Error(err)
+ }
+ if partition != tt.partition {
+ t.Errorf("Decoding value: %x, expected: %d, got: %d", tt.value, tt.partition, partition)
+ }
+ if volumeType != tt.volumeType {
+ t.Errorf("Decoding value: %x, expected: %d, got: %d", tt.value, tt.volumeType, volumeType)
+ }
+ if nextOffset != tt.nextOffset {
+ t.Errorf("Decoding value: %x, expected: %d, got: %d", tt.value, tt.nextOffset, nextOffset)
+ }
+ if usedSpace != tt.usedSpace {
+ t.Errorf("Decoding value: %x, expected: %d, got: %d", tt.value, tt.usedSpace, usedSpace)
+ }
+ if state != tt.state {
+ t.Errorf("Decoding value: %x, expected: %d, got: %d", tt.value, tt.state, state)
+ }
+ }
+ // Test overflow
+ m1 := []byte{0x80, 0x80, 0x80, 0x80}
+ _, _, _, _, _, err := DecodeVolumeValue(m1)
+ if err == nil {
+ t.Errorf("We should fail to decode %x", m1)
+ }
+}
+
+type objectKeyTest struct {
+ key []byte
+ value []byte
+}
+
+func TestObjectKey(t *testing.T) {
+ var objectKeyTests = []objectKeyTest{
+ {[]byte("b80362143ac3221d15a75f4bd1af3fac1484213329.64315.data"),
+ []byte("\xb8\x03\x62\x14\x3a\xc3\x22\x1d\x15\xa7\x5f\x4b\xd1\xaf" +
+ "\x3f\xac\x31\x34\x38\x34\x32\x31\x33\x33\x32\x39\x2e\x36\x34" +
+ "\x33\x31\x35\x2e\x64\x61\x74\x61")},
+ {[]byte("a2b98cd26a070c2e5200be1f950813d51494323929.64315.ts"),
+ []byte("\xa2\xb9\x8c\xd2\x6a\x07\x0c\x2e\x52\x00\xbe\x1f\x95\x08" +
+ "\x13\xd5\x31\x34\x39\x34\x33\x32\x33\x39\x32\x39\x2e\x36\x34" +
+ "\x33\x31\x35\x2e\x74\x73")},
+ }
+
+ for _, tt := range objectKeyTests {
+ // Test encoding
+ val, err := EncodeObjectKey(tt.key)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !bytes.Equal(val, tt.value) {
+ t.Errorf("For key: %x, got %x, expected %x", tt.key, val, tt.value)
+ }
+
+ // Test decoding
+ key := make([]byte, 32+len(val[16:]))
+ err = DecodeObjectKey(val, key)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !bytes.Equal(key, tt.key) {
+ t.Errorf("For value: %x, got %d, expected %d", val, key, tt.key)
+ }
+
+ }
+
+ // Test encoding invalid object name, too short
+ invalidName := []byte("tooshort")
+ _, err := EncodeObjectKey(invalidName)
+ if err == nil {
+ t.Fatalf("should fail to encode object key: %x", invalidName)
+ }
+
+ // Test encoding invalid object name, bad char
+ invalidName = []byte("badchar\xff6a070c2e5200be1f950813d51494323929.64315.ts")
+ _, err = EncodeObjectKey(invalidName)
+ if err == nil {
+ t.Fatalf("should fail to encode: %x as object key", invalidName)
+ }
+
+ // Test decoding invalid data
+ invalidData := []byte("tooshort")
+ key := make([]byte, 12)
+ err = DecodeObjectKey(invalidData, key)
+ if err == nil {
+ t.Fatalf("should fail to decode: %x as object key", invalidData)
+ }
+
+}
+
+type objectValueTest struct {
+ volumeIndex uint32
+ offset uint64
+ value []byte
+}
+
+func TestObjectValue(t *testing.T) {
+ var objectValueTests = []objectValueTest{
+ {0, 0, []byte("\x00\x00")},
+ {1, 16384, []byte("\x01\x80\x80\x01")},
+ {823762, 61 * 1024 * 1024 * 1024, []byte("\xd2\xa3\x32\x80\x80\x80\x80\xf4\x01")},
+ {1<<32 - 1, 1<<64 - 1, []byte("\xff\xff\xff\xff\x0f\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01")},
+ }
+
+ for _, tt := range objectValueTests {
+ // Test encoding
+ val := EncodeObjectValue(tt.volumeIndex, tt.offset)
+ if !bytes.Equal(val, tt.value) {
+ t.Errorf("For volumeType: %d, offset: %d, got %x, expected %x", tt.volumeIndex, tt.offset, val, tt.value)
+ }
+
+ // Test decoding
+ volumeIndex, offset, err := DecodeObjectValue(val)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if volumeIndex != tt.volumeIndex {
+ t.Errorf("Decoding value: %x, expected: %d, got: %d", tt.value, tt.volumeIndex, volumeIndex)
+ }
+ if offset != tt.offset {
+ t.Errorf("Decoding value: %x, expected: %d, got: %d", tt.value, tt.offset, offset)
+ }
+ }
+
+ // Test decoding invalid data
+ invalidData := []byte("\xff")
+ _, _, err := DecodeObjectValue(invalidData)
+ if err == nil {
+ t.Fatalf("should fail to decode: %x as object value", invalidData)
+ }
+}
diff --git a/go/swift-rpc-losf/go.mod b/go/swift-rpc-losf/go.mod
new file mode 100644
index 000000000..b76dcc723
--- /dev/null
+++ b/go/swift-rpc-losf/go.mod
@@ -0,0 +1,27 @@
+module github.com/openstack/swift-rpc-losf
+
+go 1.14
+
+// This file is auto-generated with the following commands
+// GO111MODULE=on go mod init // required to run under GOPATH
+// GO111MODULE=on go get // can be run anywhere
+
+// TODO: decide whether we need to pin these versions
+
+require (
+ github.com/alecuyer/statsd/v2 v2.0.6
+ github.com/golang/protobuf v1.3.5
+ github.com/golang/snappy v0.0.1 // indirect
+ github.com/jmhodges/levigo v1.0.0
+ github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
+ github.com/kr/pretty v0.1.0 // indirect
+ github.com/onsi/ginkgo v1.10.1 // indirect
+ github.com/onsi/gomega v1.7.0 // indirect
+ github.com/sirupsen/logrus v1.4.2
+ github.com/stretchr/testify v1.4.0 // indirect
+ github.com/syndtr/goleveldb v1.0.0
+ golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297 // indirect
+ golang.org/x/sys v0.0.0-20190904005037-43c01164e931
+ golang.org/x/text v0.3.2 // indirect
+ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
+)
diff --git a/go/swift-rpc-losf/go.sum b/go/swift-rpc-losf/go.sum
new file mode 100644
index 000000000..b1a8747d6
--- /dev/null
+++ b/go/swift-rpc-losf/go.sum
@@ -0,0 +1,76 @@
+github.com/alecuyer/statsd/v2 v2.0.6 h1:Zw7MkTocpUgJiiyGK/4r99qV6rFcq3COJJYkyFVvKpo=
+github.com/alecuyer/statsd/v2 v2.0.6/go.mod h1:qNFEkL4GN8jGsZpfrn9P6Q0FfdJf1b1FEGpWnesU4dc=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
+github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls=
+github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
+github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
+github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
+github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
+github.com/jmhodges/levigo v1.0.0 h1:q5EC36kV79HWeTBWsod3mG11EgStG3qArTKcvlksN1U=
+github.com/jmhodges/levigo v1.0.0/go.mod h1:Q6Qx+uH3RAqyK4rFQroq9RL7mdkABMcfhEI+nNuzMJQ=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
+github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
+github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
+github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
+github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
+github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
+github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
+github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
+github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
+github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
+github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297 h1:k7pJ2yAPLPgbskkFdhRCsA77k2fySZ1zf2zCjvQCiIM=
+golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904005037-43c01164e931 h1:+WYfosiOJzB4BjsISl1Rv4ZLUy+VYcF+u+0Y9jcerv8=
+golang.org/x/sys v0.0.0-20190904005037-43c01164e931/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
+gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
+gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
+gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/go/swift-rpc-losf/leveldb b/go/swift-rpc-losf/leveldb
new file mode 160000
+Subproject fe4494804f5e3a2e25485d32aeb0eb7d2f25732
diff --git a/go/swift-rpc-losf/logging.go b/go/swift-rpc-losf/logging.go
new file mode 100644
index 000000000..d30e764db
--- /dev/null
+++ b/go/swift-rpc-losf/logging.go
@@ -0,0 +1,43 @@
+// Copyright (c) 2010-2012 OpenStack Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "github.com/sirupsen/logrus"
+ lSyslog "github.com/sirupsen/logrus/hooks/syslog"
+ "io/ioutil"
+ "log/syslog"
+)
+
+// global logger
+var log = logrus.New()
+
+// setupLogging configures the global logrus logger: timestamps are
+// disabled (syslog supplies its own), all records are routed to a local
+// syslog hook tagged "swift-kv", and the default writer is discarded so
+// the hook is the only output. Panics if the syslog hook cannot be
+// created.
+func setupLogging() {
+	formatter := new(logrus.TextFormatter)
+	// Syslog already timestamps each line; avoid logging it twice.
+	formatter.DisableTimestamp = true
+	log.Formatter = formatter
+
+	hook, err := lSyslog.NewSyslogHook("", "", syslog.LOG_INFO, "swift-kv")
+
+	if err != nil {
+		panic("cannot create syslog hook")
+	}
+
+	log.Hooks.Add(hook)
+
+	// Disable default logging, we only want to log through the syslog hook
+	log.Out = ioutil.Discard
+}
diff --git a/go/swift-rpc-losf/main.go b/go/swift-rpc-losf/main.go
new file mode 100644
index 000000000..9bf6489d3
--- /dev/null
+++ b/go/swift-rpc-losf/main.go
@@ -0,0 +1,174 @@
+// Copyright (c) 2010-2012 OpenStack Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// KV store for LOSF
+package main
+
+import (
+ "flag"
+ "fmt"
+ "github.com/sirupsen/logrus"
+ "net/http"
+ _ "net/http/pprof"
+ "os"
+ "os/signal"
+ "path"
+ "syscall"
+ "time"
+)
+
+// Name of the base losf root directory, relative to the swift disk
+const rootDirBase = "losf"
+
+// diskSetup runs pre-start checks for a swift disk and creates the losf
+// root directory (mode 0700) under it if needed.
+//
+// If waitForMount is true, it blocks — polling every 100ms — until
+// diskPath is mounted. It also honors the OVH ".offline" marker file:
+// while "<diskPath>.offline" exists, it blocks, re-checking every 10
+// seconds.
+//
+// Returns the path to rootDir.
+func diskSetup(diskPath string, policyIdx int, waitForMount bool) (string, error) {
+
+	if waitForMount {
+		log.Debugf("waitForMount is set, if %s is not mounted, will wait until it is", diskPath)
+	}
+	for waitForMount {
+		nowMounted, err := isMounted(diskPath)
+		if err != nil {
+			return "", err
+		}
+		if nowMounted {
+			break
+		}
+		time.Sleep(time.Second / 10)
+	}
+
+	// OVH patch to match a similar mechanism in the python code.
+	// If a ".offline" file exists in srv node, do not start (/srv/node/disk-XX.offline)
+	offlineFile := fmt.Sprintf("%s%s", diskPath, ".offline")
+	offlineFileExists := false
+	if _, err := os.Stat(offlineFile); err == nil {
+		offlineFileExists = true
+		log.Debugf("offline file exists: %s", offlineFile)
+	}
+	// NOTE(review): the Sleep runs after the existence check, so we wait
+	// one extra 10s interval after the marker disappears; move the Sleep
+	// before the check to exit promptly — confirm intent.
+	for offlineFileExists {
+		if _, err := os.Stat(offlineFile); os.IsNotExist(err) {
+			offlineFileExists = false
+		}
+		time.Sleep(time.Second * 10)
+	}
+
+	rootDir := path.Join(diskPath, getBaseDirName(rootDirBase, policyIdx))
+	log.Debug(rootDir)
+
+	rootDirExists, err := dirExists(rootDir)
+	if err != nil {
+		return "", err
+	}
+	if !rootDirExists {
+		// Owner-only permissions: the KV directory is private to this process.
+		err := os.Mkdir(rootDir, os.FileMode(0700))
+		if err != nil {
+			return "", err
+		}
+	}
+	return rootDir, nil
+}
+
+// main parses command-line options, prepares the disk and the database
+// directory, locks and opens the unix RPC socket path, opens the
+// key-value store (levigo or native go-leveldb), and runs the RPC
+// server until a SIGINT/SIGTERM arrives.
+func main() {
+	var dbDir, socketPath string
+	var kv KV
+
+	setupLogging()
+
+	// Maps the -debug flag string to a logrus level.
+	debugLevels := map[string]logrus.Level{
+		"panic": logrus.PanicLevel,
+		"fatal": logrus.FatalLevel,
+		"error": logrus.ErrorLevel,
+		"warn":  logrus.WarnLevel,
+		"info":  logrus.InfoLevel,
+		"debug": logrus.DebugLevel,
+	}
+
+	var diskPath = flag.String("diskPath", "", "Swift disk path (/srv/node/disk-xyz)")
+	var policyIdx = flag.Int("policyIdx", 0, "Policy index")
+	var waitForMount = flag.Bool("waitForMount", true, "Wait for diskPath to be mounted. If diskPath exists but is not a mount, it will wait")
+	var profilerAddr = flag.String("profilerAddr", "", "Start profiler and make it available at this address (127.0.0.1:8081)")
+	var debugLevel = flag.String("debug", "info", "Debug level (error, warn, info, debug)")
+	var allowRoot = flag.Bool("allowRoot", false, "Allow process to run as root")
+	var useGoLevelDB = flag.Bool("useGoLevelDB", false, "Use native golang levelDB package")
+
+	flag.Parse()
+
+	// NOTE(review): an unknown -debug value yields the map zero value,
+	// logrus.PanicLevel, silently suppressing nearly all logging —
+	// consider validating the lookup with the ", ok" form.
+	log.SetLevel(debugLevels[*debugLevel])
+
+	if !*allowRoot && os.Getuid() == 0 {
+		log.Fatal("running as root, and allowRoot is false")
+	}
+
+	if *diskPath == "" {
+		log.Fatal("diskPath not specified")
+	}
+
+	rootDir, err := diskSetup(*diskPath, *policyIdx, *waitForMount)
+	if err != nil {
+		panic(err)
+	}
+
+	dbDir = path.Join(rootDir, "db")
+	socketPath = path.Join(rootDir, "rpc.socket")
+	// Logger pre-tagged with the socket path for all server messages.
+	rlog := log.WithFields(logrus.Fields{"socket": socketPath})
+
+	// install signal handler
+	stopChan := make(chan os.Signal, 1)
+	signal.Notify(stopChan, os.Interrupt, syscall.SIGTERM)
+
+	// start http server for profiling
+	if *profilerAddr != "" {
+		go func() {
+			rlog.Debug(http.ListenAndServe(*profilerAddr, nil))
+		}()
+	}
+
+	// Acquire lock to protect socket
+	rlog.Debug("Locking socket")
+	err = lockSocket(socketPath)
+	if err != nil {
+		rlog.Fatalf("Failed to lock RPC socket: %s", err)
+	}
+	// Safe to remove any stale socket file: the lock above guarantees we
+	// are the only live instance for this path.
+	os.Remove(socketPath)
+
+	// Open the database
+	if *useGoLevelDB {
+		kv, err = openGoLevelDb(dbDir)
+	} else {
+		kv, err = openLevigoDB(dbDir)
+	}
+
+	if err != nil {
+		rlog.Fatal(err)
+	}
+
+	// Check the kv was stopped properly
+	isClean, err := setKvState(kv)
+	if err != nil {
+		rlog.Fatal(err)
+	}
+	log.Infof("kv is clean: %v", isClean)
+
+	// Start the RPC server
+	rlog.Info("Starting RPC server")
+	err = runServer(kv, *diskPath, socketPath, stopChan, isClean)
+	if err != nil {
+		rlog.Fatal(err)
+	}
+
+	return
+}
diff --git a/go/swift-rpc-losf/proto/fmgr.pb.go b/go/swift-rpc-losf/proto/fmgr.pb.go
new file mode 100644
index 000000000..1cb26dc7b
--- /dev/null
+++ b/go/swift-rpc-losf/proto/fmgr.pb.go
@@ -0,0 +1,2213 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: fmgr.proto
+
+package filemgr
+
+import (
+ fmt "fmt"
+ proto "github.com/golang/protobuf/proto"
+ math "math"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+// Enums
+type VolumeType int32
+
+const (
+ VolumeType_VOLUME_DEFAULT VolumeType = 0
+ VolumeType_VOLUME_TOMBSTONE VolumeType = 1
+ VolumeType_VOLUME_X_DELETE_AT VolumeType = 2
+)
+
+var VolumeType_name = map[int32]string{
+ 0: "VOLUME_DEFAULT",
+ 1: "VOLUME_TOMBSTONE",
+ 2: "VOLUME_X_DELETE_AT",
+}
+
+var VolumeType_value = map[string]int32{
+ "VOLUME_DEFAULT": 0,
+ "VOLUME_TOMBSTONE": 1,
+ "VOLUME_X_DELETE_AT": 2,
+}
+
+func (x VolumeType) String() string {
+ return proto.EnumName(VolumeType_name, int32(x))
+}
+
+func (VolumeType) EnumDescriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{0}
+}
+
+type VolumeState int32
+
+const (
+ // Default state, volume can be read from and written to
+ VolumeState_STATE_RW VolumeState = 0
+ // Volume is being compacted (source). New objects cannot be appended
+ VolumeState_STATE_COMPACTION_SRC VolumeState = 1
+ // Volume is a compaction target. New objects cannot be appended
+ VolumeState_STATE_COMPACTION_TARGET VolumeState = 2
+)
+
+var VolumeState_name = map[int32]string{
+ 0: "STATE_RW",
+ 1: "STATE_COMPACTION_SRC",
+ 2: "STATE_COMPACTION_TARGET",
+}
+
+var VolumeState_value = map[string]int32{
+ "STATE_RW": 0,
+ "STATE_COMPACTION_SRC": 1,
+ "STATE_COMPACTION_TARGET": 2,
+}
+
+func (x VolumeState) String() string {
+ return proto.EnumName(VolumeState_name, int32(x))
+}
+
+func (VolumeState) EnumDescriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{1}
+}
+
+type RegisterVolumeRequest struct {
+ Partition uint32 `protobuf:"varint,1,opt,name=partition,proto3" json:"partition,omitempty"`
+ Type VolumeType `protobuf:"varint,2,opt,name=type,proto3,enum=filemgr.VolumeType" json:"type,omitempty"`
+ VolumeIndex uint32 `protobuf:"varint,3,opt,name=volume_index,json=volumeIndex,proto3" json:"volume_index,omitempty"`
+ Offset uint64 `protobuf:"varint,4,opt,name=offset,proto3" json:"offset,omitempty"`
+ State VolumeState `protobuf:"varint,5,opt,name=state,proto3,enum=filemgr.VolumeState" json:"state,omitempty"`
+ RepairTool bool `protobuf:"varint,6,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *RegisterVolumeRequest) Reset() { *m = RegisterVolumeRequest{} }
+func (m *RegisterVolumeRequest) String() string { return proto.CompactTextString(m) }
+func (*RegisterVolumeRequest) ProtoMessage() {}
+func (*RegisterVolumeRequest) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{0}
+}
+
+func (m *RegisterVolumeRequest) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_RegisterVolumeRequest.Unmarshal(m, b)
+}
+func (m *RegisterVolumeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_RegisterVolumeRequest.Marshal(b, m, deterministic)
+}
+func (m *RegisterVolumeRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_RegisterVolumeRequest.Merge(m, src)
+}
+func (m *RegisterVolumeRequest) XXX_Size() int {
+ return xxx_messageInfo_RegisterVolumeRequest.Size(m)
+}
+func (m *RegisterVolumeRequest) XXX_DiscardUnknown() {
+ xxx_messageInfo_RegisterVolumeRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_RegisterVolumeRequest proto.InternalMessageInfo
+
+func (m *RegisterVolumeRequest) GetPartition() uint32 {
+ if m != nil {
+ return m.Partition
+ }
+ return 0
+}
+
+func (m *RegisterVolumeRequest) GetType() VolumeType {
+ if m != nil {
+ return m.Type
+ }
+ return VolumeType_VOLUME_DEFAULT
+}
+
+func (m *RegisterVolumeRequest) GetVolumeIndex() uint32 {
+ if m != nil {
+ return m.VolumeIndex
+ }
+ return 0
+}
+
+func (m *RegisterVolumeRequest) GetOffset() uint64 {
+ if m != nil {
+ return m.Offset
+ }
+ return 0
+}
+
+func (m *RegisterVolumeRequest) GetState() VolumeState {
+ if m != nil {
+ return m.State
+ }
+ return VolumeState_STATE_RW
+}
+
+func (m *RegisterVolumeRequest) GetRepairTool() bool {
+ if m != nil {
+ return m.RepairTool
+ }
+ return false
+}
+
+type RegisterVolumeReply struct {
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *RegisterVolumeReply) Reset() { *m = RegisterVolumeReply{} }
+func (m *RegisterVolumeReply) String() string { return proto.CompactTextString(m) }
+func (*RegisterVolumeReply) ProtoMessage() {}
+func (*RegisterVolumeReply) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{1}
+}
+
+func (m *RegisterVolumeReply) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_RegisterVolumeReply.Unmarshal(m, b)
+}
+func (m *RegisterVolumeReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_RegisterVolumeReply.Marshal(b, m, deterministic)
+}
+func (m *RegisterVolumeReply) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_RegisterVolumeReply.Merge(m, src)
+}
+func (m *RegisterVolumeReply) XXX_Size() int {
+ return xxx_messageInfo_RegisterVolumeReply.Size(m)
+}
+func (m *RegisterVolumeReply) XXX_DiscardUnknown() {
+ xxx_messageInfo_RegisterVolumeReply.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_RegisterVolumeReply proto.InternalMessageInfo
+
+type UnregisterVolumeRequest struct {
+ Index uint32 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"`
+ RepairTool bool `protobuf:"varint,2,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *UnregisterVolumeRequest) Reset() { *m = UnregisterVolumeRequest{} }
+func (m *UnregisterVolumeRequest) String() string { return proto.CompactTextString(m) }
+func (*UnregisterVolumeRequest) ProtoMessage() {}
+func (*UnregisterVolumeRequest) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{2}
+}
+
+func (m *UnregisterVolumeRequest) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_UnregisterVolumeRequest.Unmarshal(m, b)
+}
+func (m *UnregisterVolumeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_UnregisterVolumeRequest.Marshal(b, m, deterministic)
+}
+func (m *UnregisterVolumeRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_UnregisterVolumeRequest.Merge(m, src)
+}
+func (m *UnregisterVolumeRequest) XXX_Size() int {
+ return xxx_messageInfo_UnregisterVolumeRequest.Size(m)
+}
+func (m *UnregisterVolumeRequest) XXX_DiscardUnknown() {
+ xxx_messageInfo_UnregisterVolumeRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_UnregisterVolumeRequest proto.InternalMessageInfo
+
+func (m *UnregisterVolumeRequest) GetIndex() uint32 {
+ if m != nil {
+ return m.Index
+ }
+ return 0
+}
+
+func (m *UnregisterVolumeRequest) GetRepairTool() bool {
+ if m != nil {
+ return m.RepairTool
+ }
+ return false
+}
+
+type UnregisterVolumeReply struct {
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *UnregisterVolumeReply) Reset() { *m = UnregisterVolumeReply{} }
+func (m *UnregisterVolumeReply) String() string { return proto.CompactTextString(m) }
+func (*UnregisterVolumeReply) ProtoMessage() {}
+func (*UnregisterVolumeReply) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{3}
+}
+
+func (m *UnregisterVolumeReply) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_UnregisterVolumeReply.Unmarshal(m, b)
+}
+func (m *UnregisterVolumeReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_UnregisterVolumeReply.Marshal(b, m, deterministic)
+}
+func (m *UnregisterVolumeReply) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_UnregisterVolumeReply.Merge(m, src)
+}
+func (m *UnregisterVolumeReply) XXX_Size() int {
+ return xxx_messageInfo_UnregisterVolumeReply.Size(m)
+}
+func (m *UnregisterVolumeReply) XXX_DiscardUnknown() {
+ xxx_messageInfo_UnregisterVolumeReply.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_UnregisterVolumeReply proto.InternalMessageInfo
+
+type UpdateVolumeStateRequest struct {
+ VolumeIndex uint32 `protobuf:"varint,1,opt,name=volume_index,json=volumeIndex,proto3" json:"volume_index,omitempty"`
+ State VolumeState `protobuf:"varint,2,opt,name=state,proto3,enum=filemgr.VolumeState" json:"state,omitempty"`
+ RepairTool bool `protobuf:"varint,3,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *UpdateVolumeStateRequest) Reset() { *m = UpdateVolumeStateRequest{} }
+func (m *UpdateVolumeStateRequest) String() string { return proto.CompactTextString(m) }
+func (*UpdateVolumeStateRequest) ProtoMessage() {}
+func (*UpdateVolumeStateRequest) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{4}
+}
+
+func (m *UpdateVolumeStateRequest) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_UpdateVolumeStateRequest.Unmarshal(m, b)
+}
+func (m *UpdateVolumeStateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_UpdateVolumeStateRequest.Marshal(b, m, deterministic)
+}
+func (m *UpdateVolumeStateRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_UpdateVolumeStateRequest.Merge(m, src)
+}
+func (m *UpdateVolumeStateRequest) XXX_Size() int {
+ return xxx_messageInfo_UpdateVolumeStateRequest.Size(m)
+}
+func (m *UpdateVolumeStateRequest) XXX_DiscardUnknown() {
+ xxx_messageInfo_UpdateVolumeStateRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_UpdateVolumeStateRequest proto.InternalMessageInfo
+
+func (m *UpdateVolumeStateRequest) GetVolumeIndex() uint32 {
+ if m != nil {
+ return m.VolumeIndex
+ }
+ return 0
+}
+
+func (m *UpdateVolumeStateRequest) GetState() VolumeState {
+ if m != nil {
+ return m.State
+ }
+ return VolumeState_STATE_RW
+}
+
+func (m *UpdateVolumeStateRequest) GetRepairTool() bool {
+ if m != nil {
+ return m.RepairTool
+ }
+ return false
+}
+
+type UpdateVolumeStateReply struct {
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *UpdateVolumeStateReply) Reset() { *m = UpdateVolumeStateReply{} }
+func (m *UpdateVolumeStateReply) String() string { return proto.CompactTextString(m) }
+func (*UpdateVolumeStateReply) ProtoMessage() {}
+func (*UpdateVolumeStateReply) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{5}
+}
+
+func (m *UpdateVolumeStateReply) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_UpdateVolumeStateReply.Unmarshal(m, b)
+}
+func (m *UpdateVolumeStateReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_UpdateVolumeStateReply.Marshal(b, m, deterministic)
+}
+func (m *UpdateVolumeStateReply) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_UpdateVolumeStateReply.Merge(m, src)
+}
+func (m *UpdateVolumeStateReply) XXX_Size() int {
+ return xxx_messageInfo_UpdateVolumeStateReply.Size(m)
+}
+func (m *UpdateVolumeStateReply) XXX_DiscardUnknown() {
+ xxx_messageInfo_UpdateVolumeStateReply.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_UpdateVolumeStateReply proto.InternalMessageInfo
+
+type GetVolumeRequest struct {
+ Index uint32 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"`
+ RepairTool bool `protobuf:"varint,2,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *GetVolumeRequest) Reset() { *m = GetVolumeRequest{} }
+func (m *GetVolumeRequest) String() string { return proto.CompactTextString(m) }
+func (*GetVolumeRequest) ProtoMessage() {}
+func (*GetVolumeRequest) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{6}
+}
+
+func (m *GetVolumeRequest) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_GetVolumeRequest.Unmarshal(m, b)
+}
+func (m *GetVolumeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_GetVolumeRequest.Marshal(b, m, deterministic)
+}
+func (m *GetVolumeRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_GetVolumeRequest.Merge(m, src)
+}
+func (m *GetVolumeRequest) XXX_Size() int {
+ return xxx_messageInfo_GetVolumeRequest.Size(m)
+}
+func (m *GetVolumeRequest) XXX_DiscardUnknown() {
+ xxx_messageInfo_GetVolumeRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_GetVolumeRequest proto.InternalMessageInfo
+
+func (m *GetVolumeRequest) GetIndex() uint32 {
+ if m != nil {
+ return m.Index
+ }
+ return 0
+}
+
+func (m *GetVolumeRequest) GetRepairTool() bool {
+ if m != nil {
+ return m.RepairTool
+ }
+ return false
+}
+
+type GetVolumeReply struct {
+ VolumeIndex uint32 `protobuf:"varint,1,opt,name=volume_index,json=volumeIndex,proto3" json:"volume_index,omitempty"`
+ VolumeType VolumeType `protobuf:"varint,2,opt,name=volume_type,json=volumeType,proto3,enum=filemgr.VolumeType" json:"volume_type,omitempty"`
+ VolumeState uint32 `protobuf:"varint,3,opt,name=volume_state,json=volumeState,proto3" json:"volume_state,omitempty"`
+ Partition uint32 `protobuf:"varint,4,opt,name=partition,proto3" json:"partition,omitempty"`
+ NextOffset uint64 `protobuf:"varint,5,opt,name=next_offset,json=nextOffset,proto3" json:"next_offset,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *GetVolumeReply) Reset() { *m = GetVolumeReply{} }
+func (m *GetVolumeReply) String() string { return proto.CompactTextString(m) }
+func (*GetVolumeReply) ProtoMessage() {}
+func (*GetVolumeReply) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{7}
+}
+
+func (m *GetVolumeReply) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_GetVolumeReply.Unmarshal(m, b)
+}
+func (m *GetVolumeReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_GetVolumeReply.Marshal(b, m, deterministic)
+}
+func (m *GetVolumeReply) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_GetVolumeReply.Merge(m, src)
+}
+func (m *GetVolumeReply) XXX_Size() int {
+ return xxx_messageInfo_GetVolumeReply.Size(m)
+}
+func (m *GetVolumeReply) XXX_DiscardUnknown() {
+ xxx_messageInfo_GetVolumeReply.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_GetVolumeReply proto.InternalMessageInfo
+
+func (m *GetVolumeReply) GetVolumeIndex() uint32 {
+ if m != nil {
+ return m.VolumeIndex
+ }
+ return 0
+}
+
+func (m *GetVolumeReply) GetVolumeType() VolumeType {
+ if m != nil {
+ return m.VolumeType
+ }
+ return VolumeType_VOLUME_DEFAULT
+}
+
+func (m *GetVolumeReply) GetVolumeState() uint32 {
+ if m != nil {
+ return m.VolumeState
+ }
+ return 0
+}
+
+func (m *GetVolumeReply) GetPartition() uint32 {
+ if m != nil {
+ return m.Partition
+ }
+ return 0
+}
+
+func (m *GetVolumeReply) GetNextOffset() uint64 {
+ if m != nil {
+ return m.NextOffset
+ }
+ return 0
+}
+
// ListVolumesRequest is the protobuf message filemgr.ListVolumesRequest.
// It carries a partition number, a volume type and a repair_tool flag
// (wire numbering per the struct tags). Generated code — regenerate from
// the .proto file rather than editing by hand.
type ListVolumesRequest struct {
	Partition uint32 `protobuf:"varint,1,opt,name=partition,proto3" json:"partition,omitempty"`
	Type VolumeType `protobuf:"varint,2,opt,name=type,proto3,enum=filemgr.VolumeType" json:"type,omitempty"`
	RepairTool bool `protobuf:"varint,3,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"`
	// XXX_* fields are proto-runtime bookkeeping (unknown fields, size cache).
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *ListVolumesRequest) Reset() { *m = ListVolumesRequest{} }

// String renders the message in the proto compact text format.
func (m *ListVolumesRequest) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags ListVolumesRequest as a protobuf message.
func (*ListVolumesRequest) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*ListVolumesRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{8}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime via xxx_messageInfo_ListVolumesRequest.
func (m *ListVolumesRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_ListVolumesRequest.Unmarshal(m, b)
}
func (m *ListVolumesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_ListVolumesRequest.Marshal(b, m, deterministic)
}
func (m *ListVolumesRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_ListVolumesRequest.Merge(m, src)
}
func (m *ListVolumesRequest) XXX_Size() int {
	return xxx_messageInfo_ListVolumesRequest.Size(m)
}
func (m *ListVolumesRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_ListVolumesRequest.DiscardUnknown(m)
}

// xxx_messageInfo_ListVolumesRequest caches reflection-based marshal state.
var xxx_messageInfo_ListVolumesRequest proto.InternalMessageInfo

// GetPartition returns the Partition field, or 0 if the receiver is nil.
func (m *ListVolumesRequest) GetPartition() uint32 {
	if m != nil {
		return m.Partition
	}
	return 0
}

// GetType returns the Type field, or VolumeType_VOLUME_DEFAULT if the receiver is nil.
func (m *ListVolumesRequest) GetType() VolumeType {
	if m != nil {
		return m.Type
	}
	return VolumeType_VOLUME_DEFAULT
}

// GetRepairTool returns the RepairTool field, or false if the receiver is nil.
func (m *ListVolumesRequest) GetRepairTool() bool {
	if m != nil {
		return m.RepairTool
	}
	return false
}
+
// ListVolumesReply is the protobuf message filemgr.ListVolumesReply.
// It carries a repeated list of Volume messages. Generated code — regenerate
// from the .proto file rather than editing by hand.
type ListVolumesReply struct {
	Volumes []*Volume `protobuf:"bytes,1,rep,name=volumes,proto3" json:"volumes,omitempty"`
	// XXX_* fields are proto-runtime bookkeeping (unknown fields, size cache).
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *ListVolumesReply) Reset() { *m = ListVolumesReply{} }

// String renders the message in the proto compact text format.
func (m *ListVolumesReply) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags ListVolumesReply as a protobuf message.
func (*ListVolumesReply) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*ListVolumesReply) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{9}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *ListVolumesReply) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_ListVolumesReply.Unmarshal(m, b)
}
func (m *ListVolumesReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_ListVolumesReply.Marshal(b, m, deterministic)
}
func (m *ListVolumesReply) XXX_Merge(src proto.Message) {
	xxx_messageInfo_ListVolumesReply.Merge(m, src)
}
func (m *ListVolumesReply) XXX_Size() int {
	return xxx_messageInfo_ListVolumesReply.Size(m)
}
func (m *ListVolumesReply) XXX_DiscardUnknown() {
	xxx_messageInfo_ListVolumesReply.DiscardUnknown(m)
}

// xxx_messageInfo_ListVolumesReply caches reflection-based marshal state.
var xxx_messageInfo_ListVolumesReply proto.InternalMessageInfo

// GetVolumes returns the Volumes slice, or nil if the receiver is nil.
func (m *ListVolumesReply) GetVolumes() []*Volume {
	if m != nil {
		return m.Volumes
	}
	return nil
}
+
// RegisterObjectRequest is the protobuf message filemgr.RegisterObjectRequest.
// It carries an object name, its volume index, its offset and next offset
// within the volume, and a repair_tool flag. Generated code — regenerate from
// the .proto file rather than editing by hand.
type RegisterObjectRequest struct {
	Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	VolumeIndex uint32 `protobuf:"varint,2,opt,name=volume_index,json=volumeIndex,proto3" json:"volume_index,omitempty"`
	Offset uint64 `protobuf:"varint,3,opt,name=offset,proto3" json:"offset,omitempty"`
	NextOffset uint64 `protobuf:"varint,4,opt,name=next_offset,json=nextOffset,proto3" json:"next_offset,omitempty"`
	RepairTool bool `protobuf:"varint,5,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"`
	// XXX_* fields are proto-runtime bookkeeping (unknown fields, size cache).
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *RegisterObjectRequest) Reset() { *m = RegisterObjectRequest{} }

// String renders the message in the proto compact text format.
func (m *RegisterObjectRequest) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags RegisterObjectRequest as a protobuf message.
func (*RegisterObjectRequest) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*RegisterObjectRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{10}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *RegisterObjectRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_RegisterObjectRequest.Unmarshal(m, b)
}
func (m *RegisterObjectRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_RegisterObjectRequest.Marshal(b, m, deterministic)
}
func (m *RegisterObjectRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_RegisterObjectRequest.Merge(m, src)
}
func (m *RegisterObjectRequest) XXX_Size() int {
	return xxx_messageInfo_RegisterObjectRequest.Size(m)
}
func (m *RegisterObjectRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_RegisterObjectRequest.DiscardUnknown(m)
}

// xxx_messageInfo_RegisterObjectRequest caches reflection-based marshal state.
var xxx_messageInfo_RegisterObjectRequest proto.InternalMessageInfo

// GetName returns the Name field, or nil if the receiver is nil.
func (m *RegisterObjectRequest) GetName() []byte {
	if m != nil {
		return m.Name
	}
	return nil
}

// GetVolumeIndex returns the VolumeIndex field, or 0 if the receiver is nil.
func (m *RegisterObjectRequest) GetVolumeIndex() uint32 {
	if m != nil {
		return m.VolumeIndex
	}
	return 0
}

// GetOffset returns the Offset field, or 0 if the receiver is nil.
func (m *RegisterObjectRequest) GetOffset() uint64 {
	if m != nil {
		return m.Offset
	}
	return 0
}

// GetNextOffset returns the NextOffset field, or 0 if the receiver is nil.
func (m *RegisterObjectRequest) GetNextOffset() uint64 {
	if m != nil {
		return m.NextOffset
	}
	return 0
}

// GetRepairTool returns the RepairTool field, or false if the receiver is nil.
func (m *RegisterObjectRequest) GetRepairTool() bool {
	if m != nil {
		return m.RepairTool
	}
	return false
}
+
// RegisterObjectReply is the protobuf message filemgr.RegisterObjectReply.
// It declares no fields of its own — only proto-runtime bookkeeping.
// Generated code — regenerate from the .proto file rather than editing by hand.
type RegisterObjectReply struct {
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *RegisterObjectReply) Reset() { *m = RegisterObjectReply{} }

// String renders the message in the proto compact text format.
func (m *RegisterObjectReply) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags RegisterObjectReply as a protobuf message.
func (*RegisterObjectReply) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*RegisterObjectReply) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{11}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *RegisterObjectReply) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_RegisterObjectReply.Unmarshal(m, b)
}
func (m *RegisterObjectReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_RegisterObjectReply.Marshal(b, m, deterministic)
}
func (m *RegisterObjectReply) XXX_Merge(src proto.Message) {
	xxx_messageInfo_RegisterObjectReply.Merge(m, src)
}
func (m *RegisterObjectReply) XXX_Size() int {
	return xxx_messageInfo_RegisterObjectReply.Size(m)
}
func (m *RegisterObjectReply) XXX_DiscardUnknown() {
	xxx_messageInfo_RegisterObjectReply.DiscardUnknown(m)
}

// xxx_messageInfo_RegisterObjectReply caches reflection-based marshal state.
var xxx_messageInfo_RegisterObjectReply proto.InternalMessageInfo
+
// UnregisterObjectRequest is the protobuf message filemgr.UnregisterObjectRequest.
// It carries an object name and a repair_tool flag. Generated code — regenerate
// from the .proto file rather than editing by hand.
type UnregisterObjectRequest struct {
	Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	RepairTool bool `protobuf:"varint,2,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"`
	// XXX_* fields are proto-runtime bookkeeping (unknown fields, size cache).
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *UnregisterObjectRequest) Reset() { *m = UnregisterObjectRequest{} }

// String renders the message in the proto compact text format.
func (m *UnregisterObjectRequest) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags UnregisterObjectRequest as a protobuf message.
func (*UnregisterObjectRequest) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*UnregisterObjectRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{12}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *UnregisterObjectRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_UnregisterObjectRequest.Unmarshal(m, b)
}
func (m *UnregisterObjectRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_UnregisterObjectRequest.Marshal(b, m, deterministic)
}
func (m *UnregisterObjectRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_UnregisterObjectRequest.Merge(m, src)
}
func (m *UnregisterObjectRequest) XXX_Size() int {
	return xxx_messageInfo_UnregisterObjectRequest.Size(m)
}
func (m *UnregisterObjectRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_UnregisterObjectRequest.DiscardUnknown(m)
}

// xxx_messageInfo_UnregisterObjectRequest caches reflection-based marshal state.
var xxx_messageInfo_UnregisterObjectRequest proto.InternalMessageInfo

// GetName returns the Name field, or nil if the receiver is nil.
func (m *UnregisterObjectRequest) GetName() []byte {
	if m != nil {
		return m.Name
	}
	return nil
}

// GetRepairTool returns the RepairTool field, or false if the receiver is nil.
func (m *UnregisterObjectRequest) GetRepairTool() bool {
	if m != nil {
		return m.RepairTool
	}
	return false
}
+
// UnregisterObjectReply is the protobuf message filemgr.UnregisterObjectReply.
// It declares no fields of its own — only proto-runtime bookkeeping.
// Generated code — regenerate from the .proto file rather than editing by hand.
type UnregisterObjectReply struct {
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *UnregisterObjectReply) Reset() { *m = UnregisterObjectReply{} }

// String renders the message in the proto compact text format.
func (m *UnregisterObjectReply) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags UnregisterObjectReply as a protobuf message.
func (*UnregisterObjectReply) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*UnregisterObjectReply) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{13}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *UnregisterObjectReply) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_UnregisterObjectReply.Unmarshal(m, b)
}
func (m *UnregisterObjectReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_UnregisterObjectReply.Marshal(b, m, deterministic)
}
func (m *UnregisterObjectReply) XXX_Merge(src proto.Message) {
	xxx_messageInfo_UnregisterObjectReply.Merge(m, src)
}
func (m *UnregisterObjectReply) XXX_Size() int {
	return xxx_messageInfo_UnregisterObjectReply.Size(m)
}
func (m *UnregisterObjectReply) XXX_DiscardUnknown() {
	xxx_messageInfo_UnregisterObjectReply.DiscardUnknown(m)
}

// xxx_messageInfo_UnregisterObjectReply caches reflection-based marshal state.
var xxx_messageInfo_UnregisterObjectReply proto.InternalMessageInfo
+
// RenameObjectRequest is the protobuf message filemgr.RenameObjectRequest.
// It carries the current object name, the new name, and a repair_tool flag.
// Generated code — regenerate from the .proto file rather than editing by hand.
type RenameObjectRequest struct {
	Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	NewName []byte `protobuf:"bytes,2,opt,name=new_name,json=newName,proto3" json:"new_name,omitempty"`
	RepairTool bool `protobuf:"varint,3,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"`
	// XXX_* fields are proto-runtime bookkeeping (unknown fields, size cache).
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *RenameObjectRequest) Reset() { *m = RenameObjectRequest{} }

// String renders the message in the proto compact text format.
func (m *RenameObjectRequest) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags RenameObjectRequest as a protobuf message.
func (*RenameObjectRequest) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*RenameObjectRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{14}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *RenameObjectRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_RenameObjectRequest.Unmarshal(m, b)
}
func (m *RenameObjectRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_RenameObjectRequest.Marshal(b, m, deterministic)
}
func (m *RenameObjectRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_RenameObjectRequest.Merge(m, src)
}
func (m *RenameObjectRequest) XXX_Size() int {
	return xxx_messageInfo_RenameObjectRequest.Size(m)
}
func (m *RenameObjectRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_RenameObjectRequest.DiscardUnknown(m)
}

// xxx_messageInfo_RenameObjectRequest caches reflection-based marshal state.
var xxx_messageInfo_RenameObjectRequest proto.InternalMessageInfo

// GetName returns the Name field, or nil if the receiver is nil.
func (m *RenameObjectRequest) GetName() []byte {
	if m != nil {
		return m.Name
	}
	return nil
}

// GetNewName returns the NewName field, or nil if the receiver is nil.
func (m *RenameObjectRequest) GetNewName() []byte {
	if m != nil {
		return m.NewName
	}
	return nil
}

// GetRepairTool returns the RepairTool field, or false if the receiver is nil.
func (m *RenameObjectRequest) GetRepairTool() bool {
	if m != nil {
		return m.RepairTool
	}
	return false
}
+
// RenameObjectReply is the protobuf message filemgr.RenameObjectReply.
// It declares no fields of its own — only proto-runtime bookkeeping.
// Generated code — regenerate from the .proto file rather than editing by hand.
type RenameObjectReply struct {
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *RenameObjectReply) Reset() { *m = RenameObjectReply{} }

// String renders the message in the proto compact text format.
func (m *RenameObjectReply) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags RenameObjectReply as a protobuf message.
func (*RenameObjectReply) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*RenameObjectReply) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{15}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *RenameObjectReply) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_RenameObjectReply.Unmarshal(m, b)
}
func (m *RenameObjectReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_RenameObjectReply.Marshal(b, m, deterministic)
}
func (m *RenameObjectReply) XXX_Merge(src proto.Message) {
	xxx_messageInfo_RenameObjectReply.Merge(m, src)
}
func (m *RenameObjectReply) XXX_Size() int {
	return xxx_messageInfo_RenameObjectReply.Size(m)
}
func (m *RenameObjectReply) XXX_DiscardUnknown() {
	xxx_messageInfo_RenameObjectReply.DiscardUnknown(m)
}

// xxx_messageInfo_RenameObjectReply caches reflection-based marshal state.
var xxx_messageInfo_RenameObjectReply proto.InternalMessageInfo
+
// LoadObjectRequest is the protobuf message filemgr.LoadObjectRequest.
// It carries an object name, an is_quarantined flag, and a repair_tool flag.
// Generated code — regenerate from the .proto file rather than editing by hand.
type LoadObjectRequest struct {
	Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	IsQuarantined bool `protobuf:"varint,2,opt,name=is_quarantined,json=isQuarantined,proto3" json:"is_quarantined,omitempty"`
	RepairTool bool `protobuf:"varint,3,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"`
	// XXX_* fields are proto-runtime bookkeeping (unknown fields, size cache).
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *LoadObjectRequest) Reset() { *m = LoadObjectRequest{} }

// String renders the message in the proto compact text format.
func (m *LoadObjectRequest) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags LoadObjectRequest as a protobuf message.
func (*LoadObjectRequest) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*LoadObjectRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{16}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *LoadObjectRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_LoadObjectRequest.Unmarshal(m, b)
}
func (m *LoadObjectRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_LoadObjectRequest.Marshal(b, m, deterministic)
}
func (m *LoadObjectRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_LoadObjectRequest.Merge(m, src)
}
func (m *LoadObjectRequest) XXX_Size() int {
	return xxx_messageInfo_LoadObjectRequest.Size(m)
}
func (m *LoadObjectRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_LoadObjectRequest.DiscardUnknown(m)
}

// xxx_messageInfo_LoadObjectRequest caches reflection-based marshal state.
var xxx_messageInfo_LoadObjectRequest proto.InternalMessageInfo

// GetName returns the Name field, or nil if the receiver is nil.
func (m *LoadObjectRequest) GetName() []byte {
	if m != nil {
		return m.Name
	}
	return nil
}

// GetIsQuarantined returns the IsQuarantined field, or false if the receiver is nil.
func (m *LoadObjectRequest) GetIsQuarantined() bool {
	if m != nil {
		return m.IsQuarantined
	}
	return false
}

// GetRepairTool returns the RepairTool field, or false if the receiver is nil.
func (m *LoadObjectRequest) GetRepairTool() bool {
	if m != nil {
		return m.RepairTool
	}
	return false
}
+
// LoadObjectReply is the protobuf message filemgr.LoadObjectReply.
// It carries an object name, its volume index and its offset within the
// volume. Generated code — regenerate from the .proto file rather than
// editing by hand.
type LoadObjectReply struct {
	Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	VolumeIndex uint32 `protobuf:"varint,2,opt,name=volume_index,json=volumeIndex,proto3" json:"volume_index,omitempty"`
	Offset uint64 `protobuf:"varint,3,opt,name=offset,proto3" json:"offset,omitempty"`
	// XXX_* fields are proto-runtime bookkeeping (unknown fields, size cache).
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *LoadObjectReply) Reset() { *m = LoadObjectReply{} }

// String renders the message in the proto compact text format.
func (m *LoadObjectReply) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags LoadObjectReply as a protobuf message.
func (*LoadObjectReply) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*LoadObjectReply) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{17}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *LoadObjectReply) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_LoadObjectReply.Unmarshal(m, b)
}
func (m *LoadObjectReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_LoadObjectReply.Marshal(b, m, deterministic)
}
func (m *LoadObjectReply) XXX_Merge(src proto.Message) {
	xxx_messageInfo_LoadObjectReply.Merge(m, src)
}
func (m *LoadObjectReply) XXX_Size() int {
	return xxx_messageInfo_LoadObjectReply.Size(m)
}
func (m *LoadObjectReply) XXX_DiscardUnknown() {
	xxx_messageInfo_LoadObjectReply.DiscardUnknown(m)
}

// xxx_messageInfo_LoadObjectReply caches reflection-based marshal state.
var xxx_messageInfo_LoadObjectReply proto.InternalMessageInfo

// GetName returns the Name field, or nil if the receiver is nil.
func (m *LoadObjectReply) GetName() []byte {
	if m != nil {
		return m.Name
	}
	return nil
}

// GetVolumeIndex returns the VolumeIndex field, or 0 if the receiver is nil.
func (m *LoadObjectReply) GetVolumeIndex() uint32 {
	if m != nil {
		return m.VolumeIndex
	}
	return 0
}

// GetOffset returns the Offset field, or 0 if the receiver is nil.
func (m *LoadObjectReply) GetOffset() uint64 {
	if m != nil {
		return m.Offset
	}
	return 0
}
+
// QuarantineObjectRequest is the protobuf message filemgr.QuarantineObjectRequest.
// It carries an object name and a repair_tool flag. Generated code — regenerate
// from the .proto file rather than editing by hand.
type QuarantineObjectRequest struct {
	Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	RepairTool bool `protobuf:"varint,2,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"`
	// XXX_* fields are proto-runtime bookkeeping (unknown fields, size cache).
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *QuarantineObjectRequest) Reset() { *m = QuarantineObjectRequest{} }

// String renders the message in the proto compact text format.
func (m *QuarantineObjectRequest) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags QuarantineObjectRequest as a protobuf message.
func (*QuarantineObjectRequest) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*QuarantineObjectRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{18}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *QuarantineObjectRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_QuarantineObjectRequest.Unmarshal(m, b)
}
func (m *QuarantineObjectRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_QuarantineObjectRequest.Marshal(b, m, deterministic)
}
func (m *QuarantineObjectRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_QuarantineObjectRequest.Merge(m, src)
}
func (m *QuarantineObjectRequest) XXX_Size() int {
	return xxx_messageInfo_QuarantineObjectRequest.Size(m)
}
func (m *QuarantineObjectRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_QuarantineObjectRequest.DiscardUnknown(m)
}

// xxx_messageInfo_QuarantineObjectRequest caches reflection-based marshal state.
var xxx_messageInfo_QuarantineObjectRequest proto.InternalMessageInfo

// GetName returns the Name field, or nil if the receiver is nil.
func (m *QuarantineObjectRequest) GetName() []byte {
	if m != nil {
		return m.Name
	}
	return nil
}

// GetRepairTool returns the RepairTool field, or false if the receiver is nil.
func (m *QuarantineObjectRequest) GetRepairTool() bool {
	if m != nil {
		return m.RepairTool
	}
	return false
}
+
// QuarantineObjectReply is the protobuf message filemgr.QuarantineObjectReply.
// It declares no fields of its own — only proto-runtime bookkeeping.
// Generated code — regenerate from the .proto file rather than editing by hand.
type QuarantineObjectReply struct {
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *QuarantineObjectReply) Reset() { *m = QuarantineObjectReply{} }

// String renders the message in the proto compact text format.
func (m *QuarantineObjectReply) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags QuarantineObjectReply as a protobuf message.
func (*QuarantineObjectReply) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*QuarantineObjectReply) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{19}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *QuarantineObjectReply) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_QuarantineObjectReply.Unmarshal(m, b)
}
func (m *QuarantineObjectReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_QuarantineObjectReply.Marshal(b, m, deterministic)
}
func (m *QuarantineObjectReply) XXX_Merge(src proto.Message) {
	xxx_messageInfo_QuarantineObjectReply.Merge(m, src)
}
func (m *QuarantineObjectReply) XXX_Size() int {
	return xxx_messageInfo_QuarantineObjectReply.Size(m)
}
func (m *QuarantineObjectReply) XXX_DiscardUnknown() {
	xxx_messageInfo_QuarantineObjectReply.DiscardUnknown(m)
}

// xxx_messageInfo_QuarantineObjectReply caches reflection-based marshal state.
var xxx_messageInfo_QuarantineObjectReply proto.InternalMessageInfo
+
// UnquarantineObjectRequest is the protobuf message filemgr.UnquarantineObjectRequest.
// It carries an object name and a repair_tool flag. Generated code — regenerate
// from the .proto file rather than editing by hand.
type UnquarantineObjectRequest struct {
	Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	RepairTool bool `protobuf:"varint,2,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"`
	// XXX_* fields are proto-runtime bookkeeping (unknown fields, size cache).
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *UnquarantineObjectRequest) Reset() { *m = UnquarantineObjectRequest{} }

// String renders the message in the proto compact text format.
func (m *UnquarantineObjectRequest) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags UnquarantineObjectRequest as a protobuf message.
func (*UnquarantineObjectRequest) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*UnquarantineObjectRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{20}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *UnquarantineObjectRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_UnquarantineObjectRequest.Unmarshal(m, b)
}
func (m *UnquarantineObjectRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_UnquarantineObjectRequest.Marshal(b, m, deterministic)
}
func (m *UnquarantineObjectRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_UnquarantineObjectRequest.Merge(m, src)
}
func (m *UnquarantineObjectRequest) XXX_Size() int {
	return xxx_messageInfo_UnquarantineObjectRequest.Size(m)
}
func (m *UnquarantineObjectRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_UnquarantineObjectRequest.DiscardUnknown(m)
}

// xxx_messageInfo_UnquarantineObjectRequest caches reflection-based marshal state.
var xxx_messageInfo_UnquarantineObjectRequest proto.InternalMessageInfo

// GetName returns the Name field, or nil if the receiver is nil.
func (m *UnquarantineObjectRequest) GetName() []byte {
	if m != nil {
		return m.Name
	}
	return nil
}

// GetRepairTool returns the RepairTool field, or false if the receiver is nil.
func (m *UnquarantineObjectRequest) GetRepairTool() bool {
	if m != nil {
		return m.RepairTool
	}
	return false
}
+
// UnquarantineObjectReply is the protobuf message filemgr.UnquarantineObjectReply.
// It declares no fields of its own — only proto-runtime bookkeeping.
// Generated code — regenerate from the .proto file rather than editing by hand.
type UnquarantineObjectReply struct {
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *UnquarantineObjectReply) Reset() { *m = UnquarantineObjectReply{} }

// String renders the message in the proto compact text format.
func (m *UnquarantineObjectReply) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags UnquarantineObjectReply as a protobuf message.
func (*UnquarantineObjectReply) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*UnquarantineObjectReply) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{21}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *UnquarantineObjectReply) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_UnquarantineObjectReply.Unmarshal(m, b)
}
func (m *UnquarantineObjectReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_UnquarantineObjectReply.Marshal(b, m, deterministic)
}
func (m *UnquarantineObjectReply) XXX_Merge(src proto.Message) {
	xxx_messageInfo_UnquarantineObjectReply.Merge(m, src)
}
func (m *UnquarantineObjectReply) XXX_Size() int {
	return xxx_messageInfo_UnquarantineObjectReply.Size(m)
}
func (m *UnquarantineObjectReply) XXX_DiscardUnknown() {
	xxx_messageInfo_UnquarantineObjectReply.DiscardUnknown(m)
}

// xxx_messageInfo_UnquarantineObjectReply caches reflection-based marshal state.
var xxx_messageInfo_UnquarantineObjectReply proto.InternalMessageInfo
+
// LoadObjectsByPrefixRequest is the protobuf message filemgr.LoadObjectsByPrefixRequest.
// It carries a key prefix and a repair_tool flag. Generated code — regenerate
// from the .proto file rather than editing by hand.
type LoadObjectsByPrefixRequest struct {
	Prefix []byte `protobuf:"bytes,1,opt,name=prefix,proto3" json:"prefix,omitempty"`
	RepairTool bool `protobuf:"varint,2,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"`
	// XXX_* fields are proto-runtime bookkeeping (unknown fields, size cache).
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *LoadObjectsByPrefixRequest) Reset() { *m = LoadObjectsByPrefixRequest{} }

// String renders the message in the proto compact text format.
func (m *LoadObjectsByPrefixRequest) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags LoadObjectsByPrefixRequest as a protobuf message.
func (*LoadObjectsByPrefixRequest) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*LoadObjectsByPrefixRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{22}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *LoadObjectsByPrefixRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_LoadObjectsByPrefixRequest.Unmarshal(m, b)
}
func (m *LoadObjectsByPrefixRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_LoadObjectsByPrefixRequest.Marshal(b, m, deterministic)
}
func (m *LoadObjectsByPrefixRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_LoadObjectsByPrefixRequest.Merge(m, src)
}
func (m *LoadObjectsByPrefixRequest) XXX_Size() int {
	return xxx_messageInfo_LoadObjectsByPrefixRequest.Size(m)
}
func (m *LoadObjectsByPrefixRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_LoadObjectsByPrefixRequest.DiscardUnknown(m)
}

// xxx_messageInfo_LoadObjectsByPrefixRequest caches reflection-based marshal state.
var xxx_messageInfo_LoadObjectsByPrefixRequest proto.InternalMessageInfo

// GetPrefix returns the Prefix field, or nil if the receiver is nil.
func (m *LoadObjectsByPrefixRequest) GetPrefix() []byte {
	if m != nil {
		return m.Prefix
	}
	return nil
}

// GetRepairTool returns the RepairTool field, or false if the receiver is nil.
func (m *LoadObjectsByPrefixRequest) GetRepairTool() bool {
	if m != nil {
		return m.RepairTool
	}
	return false
}
+
// LoadObjectsByPrefixReply is the protobuf message filemgr.LoadObjectsByPrefixReply.
// It carries a repeated list of Object messages. Generated code — regenerate
// from the .proto file rather than editing by hand.
type LoadObjectsByPrefixReply struct {
	Objects []*Object `protobuf:"bytes,1,rep,name=objects,proto3" json:"objects,omitempty"`
	// XXX_* fields are proto-runtime bookkeeping (unknown fields, size cache).
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *LoadObjectsByPrefixReply) Reset() { *m = LoadObjectsByPrefixReply{} }

// String renders the message in the proto compact text format.
func (m *LoadObjectsByPrefixReply) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags LoadObjectsByPrefixReply as a protobuf message.
func (*LoadObjectsByPrefixReply) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*LoadObjectsByPrefixReply) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{23}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *LoadObjectsByPrefixReply) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_LoadObjectsByPrefixReply.Unmarshal(m, b)
}
func (m *LoadObjectsByPrefixReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_LoadObjectsByPrefixReply.Marshal(b, m, deterministic)
}
func (m *LoadObjectsByPrefixReply) XXX_Merge(src proto.Message) {
	xxx_messageInfo_LoadObjectsByPrefixReply.Merge(m, src)
}
func (m *LoadObjectsByPrefixReply) XXX_Size() int {
	return xxx_messageInfo_LoadObjectsByPrefixReply.Size(m)
}
func (m *LoadObjectsByPrefixReply) XXX_DiscardUnknown() {
	xxx_messageInfo_LoadObjectsByPrefixReply.DiscardUnknown(m)
}

// xxx_messageInfo_LoadObjectsByPrefixReply caches reflection-based marshal state.
var xxx_messageInfo_LoadObjectsByPrefixReply proto.InternalMessageInfo

// GetObjects returns the Objects slice, or nil if the receiver is nil.
func (m *LoadObjectsByPrefixReply) GetObjects() []*Object {
	if m != nil {
		return m.Objects
	}
	return nil
}
+
// LoadObjectsByVolumeRequest is the protobuf message filemgr.LoadObjectsByVolumeRequest.
// It carries a volume index, a quarantined flag, pagination parameters
// (page_token/page_size), and a repair_tool flag. Generated code — regenerate
// from the .proto file rather than editing by hand.
type LoadObjectsByVolumeRequest struct {
	Index uint32 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"`
	Quarantined bool `protobuf:"varint,2,opt,name=quarantined,proto3" json:"quarantined,omitempty"`
	PageToken []byte `protobuf:"bytes,3,opt,name=page_token,json=pageToken,proto3" json:"page_token,omitempty"`
	PageSize uint32 `protobuf:"varint,4,opt,name=page_size,json=pageSize,proto3" json:"page_size,omitempty"`
	RepairTool bool `protobuf:"varint,5,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"`
	// XXX_* fields are proto-runtime bookkeeping (unknown fields, size cache).
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *LoadObjectsByVolumeRequest) Reset() { *m = LoadObjectsByVolumeRequest{} }

// String renders the message in the proto compact text format.
func (m *LoadObjectsByVolumeRequest) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags LoadObjectsByVolumeRequest as a protobuf message.
func (*LoadObjectsByVolumeRequest) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*LoadObjectsByVolumeRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{24}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *LoadObjectsByVolumeRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_LoadObjectsByVolumeRequest.Unmarshal(m, b)
}
func (m *LoadObjectsByVolumeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_LoadObjectsByVolumeRequest.Marshal(b, m, deterministic)
}
func (m *LoadObjectsByVolumeRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_LoadObjectsByVolumeRequest.Merge(m, src)
}
func (m *LoadObjectsByVolumeRequest) XXX_Size() int {
	return xxx_messageInfo_LoadObjectsByVolumeRequest.Size(m)
}
func (m *LoadObjectsByVolumeRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_LoadObjectsByVolumeRequest.DiscardUnknown(m)
}

// xxx_messageInfo_LoadObjectsByVolumeRequest caches reflection-based marshal state.
var xxx_messageInfo_LoadObjectsByVolumeRequest proto.InternalMessageInfo

// GetIndex returns the Index field, or 0 if the receiver is nil.
func (m *LoadObjectsByVolumeRequest) GetIndex() uint32 {
	if m != nil {
		return m.Index
	}
	return 0
}

// GetQuarantined returns the Quarantined field, or false if the receiver is nil.
func (m *LoadObjectsByVolumeRequest) GetQuarantined() bool {
	if m != nil {
		return m.Quarantined
	}
	return false
}

// GetPageToken returns the PageToken field, or nil if the receiver is nil.
func (m *LoadObjectsByVolumeRequest) GetPageToken() []byte {
	if m != nil {
		return m.PageToken
	}
	return nil
}

// GetPageSize returns the PageSize field, or 0 if the receiver is nil.
func (m *LoadObjectsByVolumeRequest) GetPageSize() uint32 {
	if m != nil {
		return m.PageSize
	}
	return 0
}

// GetRepairTool returns the RepairTool field, or false if the receiver is nil.
func (m *LoadObjectsByVolumeRequest) GetRepairTool() bool {
	if m != nil {
		return m.RepairTool
	}
	return false
}
+
// LoadObjectsByVolumeReply is the protobuf message filemgr.LoadObjectsByVolumeReply.
// It carries a page of Object messages plus a next_page_token for pagination.
// Generated code — regenerate from the .proto file rather than editing by hand.
type LoadObjectsByVolumeReply struct {
	Objects []*Object `protobuf:"bytes,1,rep,name=objects,proto3" json:"objects,omitempty"`
	NextPageToken []byte `protobuf:"bytes,2,opt,name=next_page_token,json=nextPageToken,proto3" json:"next_page_token,omitempty"`
	// XXX_* fields are proto-runtime bookkeeping (unknown fields, size cache).
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *LoadObjectsByVolumeReply) Reset() { *m = LoadObjectsByVolumeReply{} }

// String renders the message in the proto compact text format.
func (m *LoadObjectsByVolumeReply) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags LoadObjectsByVolumeReply as a protobuf message.
func (*LoadObjectsByVolumeReply) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*LoadObjectsByVolumeReply) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{25}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *LoadObjectsByVolumeReply) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_LoadObjectsByVolumeReply.Unmarshal(m, b)
}
func (m *LoadObjectsByVolumeReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_LoadObjectsByVolumeReply.Marshal(b, m, deterministic)
}
func (m *LoadObjectsByVolumeReply) XXX_Merge(src proto.Message) {
	xxx_messageInfo_LoadObjectsByVolumeReply.Merge(m, src)
}
func (m *LoadObjectsByVolumeReply) XXX_Size() int {
	return xxx_messageInfo_LoadObjectsByVolumeReply.Size(m)
}
func (m *LoadObjectsByVolumeReply) XXX_DiscardUnknown() {
	xxx_messageInfo_LoadObjectsByVolumeReply.DiscardUnknown(m)
}

// xxx_messageInfo_LoadObjectsByVolumeReply caches reflection-based marshal state.
var xxx_messageInfo_LoadObjectsByVolumeReply proto.InternalMessageInfo

// GetObjects returns the Objects slice, or nil if the receiver is nil.
func (m *LoadObjectsByVolumeReply) GetObjects() []*Object {
	if m != nil {
		return m.Objects
	}
	return nil
}

// GetNextPageToken returns the NextPageToken field, or nil if the receiver is nil.
func (m *LoadObjectsByVolumeReply) GetNextPageToken() []byte {
	if m != nil {
		return m.NextPageToken
	}
	return nil
}
+
// ListPartitionsRequest is the protobuf message filemgr.ListPartitionsRequest.
// It carries the partition_bits value. Generated code — regenerate from the
// .proto file rather than editing by hand.
type ListPartitionsRequest struct {
	PartitionBits uint32 `protobuf:"varint,1,opt,name=partition_bits,json=partitionBits,proto3" json:"partition_bits,omitempty"`
	// XXX_* fields are proto-runtime bookkeeping (unknown fields, size cache).
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *ListPartitionsRequest) Reset() { *m = ListPartitionsRequest{} }

// String renders the message in the proto compact text format.
func (m *ListPartitionsRequest) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags ListPartitionsRequest as a protobuf message.
func (*ListPartitionsRequest) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*ListPartitionsRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{26}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *ListPartitionsRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_ListPartitionsRequest.Unmarshal(m, b)
}
func (m *ListPartitionsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_ListPartitionsRequest.Marshal(b, m, deterministic)
}
func (m *ListPartitionsRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_ListPartitionsRequest.Merge(m, src)
}
func (m *ListPartitionsRequest) XXX_Size() int {
	return xxx_messageInfo_ListPartitionsRequest.Size(m)
}
func (m *ListPartitionsRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_ListPartitionsRequest.DiscardUnknown(m)
}

// xxx_messageInfo_ListPartitionsRequest caches reflection-based marshal state.
var xxx_messageInfo_ListPartitionsRequest proto.InternalMessageInfo

// GetPartitionBits returns the PartitionBits field, or 0 if the receiver is nil.
func (m *ListPartitionsRequest) GetPartitionBits() uint32 {
	if m != nil {
		return m.PartitionBits
	}
	return 0
}
+
// ListPartitionRequest is the protobuf message filemgr.ListPartitionRequest.
// It carries a partition number and the partition_bits value. Generated
// code — regenerate from the .proto file rather than editing by hand.
type ListPartitionRequest struct {
	Partition uint32 `protobuf:"varint,1,opt,name=partition,proto3" json:"partition,omitempty"`
	PartitionBits uint32 `protobuf:"varint,2,opt,name=partition_bits,json=partitionBits,proto3" json:"partition_bits,omitempty"`
	// XXX_* fields are proto-runtime bookkeeping (unknown fields, size cache).
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *ListPartitionRequest) Reset() { *m = ListPartitionRequest{} }

// String renders the message in the proto compact text format.
func (m *ListPartitionRequest) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags ListPartitionRequest as a protobuf message.
func (*ListPartitionRequest) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*ListPartitionRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{27}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *ListPartitionRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_ListPartitionRequest.Unmarshal(m, b)
}
func (m *ListPartitionRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_ListPartitionRequest.Marshal(b, m, deterministic)
}
func (m *ListPartitionRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_ListPartitionRequest.Merge(m, src)
}
func (m *ListPartitionRequest) XXX_Size() int {
	return xxx_messageInfo_ListPartitionRequest.Size(m)
}
func (m *ListPartitionRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_ListPartitionRequest.DiscardUnknown(m)
}

// xxx_messageInfo_ListPartitionRequest caches reflection-based marshal state.
var xxx_messageInfo_ListPartitionRequest proto.InternalMessageInfo

// GetPartition returns the Partition field, or 0 if the receiver is nil.
func (m *ListPartitionRequest) GetPartition() uint32 {
	if m != nil {
		return m.Partition
	}
	return 0
}

// GetPartitionBits returns the PartitionBits field, or 0 if the receiver is nil.
func (m *ListPartitionRequest) GetPartitionBits() uint32 {
	if m != nil {
		return m.PartitionBits
	}
	return 0
}
+
// ListSuffixRequest is the protobuf message filemgr.ListSuffixRequest.
// It carries a partition number, a suffix, and the partition_bits value.
// Generated code — regenerate from the .proto file rather than editing by hand.
type ListSuffixRequest struct {
	Partition uint32 `protobuf:"varint,1,opt,name=partition,proto3" json:"partition,omitempty"`
	Suffix []byte `protobuf:"bytes,2,opt,name=suffix,proto3" json:"suffix,omitempty"`
	PartitionBits uint32 `protobuf:"varint,3,opt,name=partition_bits,json=partitionBits,proto3" json:"partition_bits,omitempty"`
	// XXX_* fields are proto-runtime bookkeeping (unknown fields, size cache).
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized []byte `json:"-"`
	XXX_sizecache int32 `json:"-"`
}

// Reset restores the message to its zero value.
func (m *ListSuffixRequest) Reset() { *m = ListSuffixRequest{} }

// String renders the message in the proto compact text format.
func (m *ListSuffixRequest) String() string { return proto.CompactTextString(m) }

// ProtoMessage tags ListSuffixRequest as a protobuf message.
func (*ListSuffixRequest) ProtoMessage() {}

// Descriptor returns the file descriptor bytes and this message's index path within it.
func (*ListSuffixRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_1fcd0776e05e82a6, []int{28}
}

// The XXX_* methods below delegate (un)marshalling, merging, sizing and
// unknown-field handling to the proto runtime.
func (m *ListSuffixRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_ListSuffixRequest.Unmarshal(m, b)
}
func (m *ListSuffixRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_ListSuffixRequest.Marshal(b, m, deterministic)
}
func (m *ListSuffixRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_ListSuffixRequest.Merge(m, src)
}
func (m *ListSuffixRequest) XXX_Size() int {
	return xxx_messageInfo_ListSuffixRequest.Size(m)
}
func (m *ListSuffixRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_ListSuffixRequest.DiscardUnknown(m)
}

// xxx_messageInfo_ListSuffixRequest caches reflection-based marshal state.
var xxx_messageInfo_ListSuffixRequest proto.InternalMessageInfo

// GetPartition returns the Partition field, or 0 if the receiver is nil.
func (m *ListSuffixRequest) GetPartition() uint32 {
	if m != nil {
		return m.Partition
	}
	return 0
}

// GetSuffix returns the Suffix field, or nil if the receiver is nil.
func (m *ListSuffixRequest) GetSuffix() []byte {
	if m != nil {
		return m.Suffix
	}
	return nil
}

// GetPartitionBits returns the PartitionBits field, or 0 if the receiver is nil.
func (m *ListSuffixRequest) GetPartitionBits() uint32 {
	if m != nil {
		return m.PartitionBits
	}
	return 0
}
+
+type ListQuarantinedOHashesRequest struct {
+ PageToken []byte `protobuf:"bytes,1,opt,name=page_token,json=pageToken,proto3" json:"page_token,omitempty"`
+ PageSize uint32 `protobuf:"varint,2,opt,name=page_size,json=pageSize,proto3" json:"page_size,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *ListQuarantinedOHashesRequest) Reset() { *m = ListQuarantinedOHashesRequest{} }
+func (m *ListQuarantinedOHashesRequest) String() string { return proto.CompactTextString(m) }
+func (*ListQuarantinedOHashesRequest) ProtoMessage() {}
+func (*ListQuarantinedOHashesRequest) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{29}
+}
+
+func (m *ListQuarantinedOHashesRequest) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_ListQuarantinedOHashesRequest.Unmarshal(m, b)
+}
+func (m *ListQuarantinedOHashesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_ListQuarantinedOHashesRequest.Marshal(b, m, deterministic)
+}
+func (m *ListQuarantinedOHashesRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ListQuarantinedOHashesRequest.Merge(m, src)
+}
+func (m *ListQuarantinedOHashesRequest) XXX_Size() int {
+ return xxx_messageInfo_ListQuarantinedOHashesRequest.Size(m)
+}
+func (m *ListQuarantinedOHashesRequest) XXX_DiscardUnknown() {
+ xxx_messageInfo_ListQuarantinedOHashesRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ListQuarantinedOHashesRequest proto.InternalMessageInfo
+
+func (m *ListQuarantinedOHashesRequest) GetPageToken() []byte {
+ if m != nil {
+ return m.PageToken
+ }
+ return nil
+}
+
+func (m *ListQuarantinedOHashesRequest) GetPageSize() uint32 {
+ if m != nil {
+ return m.PageSize
+ }
+ return 0
+}
+
+type ListQuarantinedOHashesReply struct {
+ Objects []*QuarantinedObjectName `protobuf:"bytes,1,rep,name=objects,proto3" json:"objects,omitempty"`
+ NextPageToken []byte `protobuf:"bytes,2,opt,name=next_page_token,json=nextPageToken,proto3" json:"next_page_token,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *ListQuarantinedOHashesReply) Reset() { *m = ListQuarantinedOHashesReply{} }
+func (m *ListQuarantinedOHashesReply) String() string { return proto.CompactTextString(m) }
+func (*ListQuarantinedOHashesReply) ProtoMessage() {}
+func (*ListQuarantinedOHashesReply) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{30}
+}
+
+func (m *ListQuarantinedOHashesReply) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_ListQuarantinedOHashesReply.Unmarshal(m, b)
+}
+func (m *ListQuarantinedOHashesReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_ListQuarantinedOHashesReply.Marshal(b, m, deterministic)
+}
+func (m *ListQuarantinedOHashesReply) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ListQuarantinedOHashesReply.Merge(m, src)
+}
+func (m *ListQuarantinedOHashesReply) XXX_Size() int {
+ return xxx_messageInfo_ListQuarantinedOHashesReply.Size(m)
+}
+func (m *ListQuarantinedOHashesReply) XXX_DiscardUnknown() {
+ xxx_messageInfo_ListQuarantinedOHashesReply.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ListQuarantinedOHashesReply proto.InternalMessageInfo
+
+func (m *ListQuarantinedOHashesReply) GetObjects() []*QuarantinedObjectName {
+ if m != nil {
+ return m.Objects
+ }
+ return nil
+}
+
+func (m *ListQuarantinedOHashesReply) GetNextPageToken() []byte {
+ if m != nil {
+ return m.NextPageToken
+ }
+ return nil
+}
+
+type ListQuarantinedOHashRequest struct {
+ Prefix []byte `protobuf:"bytes,1,opt,name=prefix,proto3" json:"prefix,omitempty"`
+ RepairTool bool `protobuf:"varint,2,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *ListQuarantinedOHashRequest) Reset() { *m = ListQuarantinedOHashRequest{} }
+func (m *ListQuarantinedOHashRequest) String() string { return proto.CompactTextString(m) }
+func (*ListQuarantinedOHashRequest) ProtoMessage() {}
+func (*ListQuarantinedOHashRequest) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{31}
+}
+
+func (m *ListQuarantinedOHashRequest) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_ListQuarantinedOHashRequest.Unmarshal(m, b)
+}
+func (m *ListQuarantinedOHashRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_ListQuarantinedOHashRequest.Marshal(b, m, deterministic)
+}
+func (m *ListQuarantinedOHashRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ListQuarantinedOHashRequest.Merge(m, src)
+}
+func (m *ListQuarantinedOHashRequest) XXX_Size() int {
+ return xxx_messageInfo_ListQuarantinedOHashRequest.Size(m)
+}
+func (m *ListQuarantinedOHashRequest) XXX_DiscardUnknown() {
+ xxx_messageInfo_ListQuarantinedOHashRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ListQuarantinedOHashRequest proto.InternalMessageInfo
+
+func (m *ListQuarantinedOHashRequest) GetPrefix() []byte {
+ if m != nil {
+ return m.Prefix
+ }
+ return nil
+}
+
+func (m *ListQuarantinedOHashRequest) GetRepairTool() bool {
+ if m != nil {
+ return m.RepairTool
+ }
+ return false
+}
+
+type ListQuarantinedOHashReply struct {
+ Objects []*Object `protobuf:"bytes,1,rep,name=objects,proto3" json:"objects,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *ListQuarantinedOHashReply) Reset() { *m = ListQuarantinedOHashReply{} }
+func (m *ListQuarantinedOHashReply) String() string { return proto.CompactTextString(m) }
+func (*ListQuarantinedOHashReply) ProtoMessage() {}
+func (*ListQuarantinedOHashReply) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{32}
+}
+
+func (m *ListQuarantinedOHashReply) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_ListQuarantinedOHashReply.Unmarshal(m, b)
+}
+func (m *ListQuarantinedOHashReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_ListQuarantinedOHashReply.Marshal(b, m, deterministic)
+}
+func (m *ListQuarantinedOHashReply) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ListQuarantinedOHashReply.Merge(m, src)
+}
+func (m *ListQuarantinedOHashReply) XXX_Size() int {
+ return xxx_messageInfo_ListQuarantinedOHashReply.Size(m)
+}
+func (m *ListQuarantinedOHashReply) XXX_DiscardUnknown() {
+ xxx_messageInfo_ListQuarantinedOHashReply.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ListQuarantinedOHashReply proto.InternalMessageInfo
+
+func (m *ListQuarantinedOHashReply) GetObjects() []*Object {
+ if m != nil {
+ return m.Objects
+ }
+ return nil
+}
+
+type GetNextOffsetRequest struct {
+ VolumeIndex uint32 `protobuf:"varint,1,opt,name=volume_index,json=volumeIndex,proto3" json:"volume_index,omitempty"`
+ RepairTool bool `protobuf:"varint,2,opt,name=repair_tool,json=repairTool,proto3" json:"repair_tool,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *GetNextOffsetRequest) Reset() { *m = GetNextOffsetRequest{} }
+func (m *GetNextOffsetRequest) String() string { return proto.CompactTextString(m) }
+func (*GetNextOffsetRequest) ProtoMessage() {}
+func (*GetNextOffsetRequest) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{33}
+}
+
+func (m *GetNextOffsetRequest) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_GetNextOffsetRequest.Unmarshal(m, b)
+}
+func (m *GetNextOffsetRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_GetNextOffsetRequest.Marshal(b, m, deterministic)
+}
+func (m *GetNextOffsetRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_GetNextOffsetRequest.Merge(m, src)
+}
+func (m *GetNextOffsetRequest) XXX_Size() int {
+ return xxx_messageInfo_GetNextOffsetRequest.Size(m)
+}
+func (m *GetNextOffsetRequest) XXX_DiscardUnknown() {
+ xxx_messageInfo_GetNextOffsetRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_GetNextOffsetRequest proto.InternalMessageInfo
+
+func (m *GetNextOffsetRequest) GetVolumeIndex() uint32 {
+ if m != nil {
+ return m.VolumeIndex
+ }
+ return 0
+}
+
+func (m *GetNextOffsetRequest) GetRepairTool() bool {
+ if m != nil {
+ return m.RepairTool
+ }
+ return false
+}
+
+type GetNextOffsetReply struct {
+ Offset uint64 `protobuf:"varint,1,opt,name=offset,proto3" json:"offset,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *GetNextOffsetReply) Reset() { *m = GetNextOffsetReply{} }
+func (m *GetNextOffsetReply) String() string { return proto.CompactTextString(m) }
+func (*GetNextOffsetReply) ProtoMessage() {}
+func (*GetNextOffsetReply) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{34}
+}
+
+func (m *GetNextOffsetReply) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_GetNextOffsetReply.Unmarshal(m, b)
+}
+func (m *GetNextOffsetReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_GetNextOffsetReply.Marshal(b, m, deterministic)
+}
+func (m *GetNextOffsetReply) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_GetNextOffsetReply.Merge(m, src)
+}
+func (m *GetNextOffsetReply) XXX_Size() int {
+ return xxx_messageInfo_GetNextOffsetReply.Size(m)
+}
+func (m *GetNextOffsetReply) XXX_DiscardUnknown() {
+ xxx_messageInfo_GetNextOffsetReply.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_GetNextOffsetReply proto.InternalMessageInfo
+
+func (m *GetNextOffsetReply) GetOffset() uint64 {
+ if m != nil {
+ return m.Offset
+ }
+ return 0
+}
+
+type GetStatsRequest struct {
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *GetStatsRequest) Reset() { *m = GetStatsRequest{} }
+func (m *GetStatsRequest) String() string { return proto.CompactTextString(m) }
+func (*GetStatsRequest) ProtoMessage() {}
+func (*GetStatsRequest) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{35}
+}
+
+func (m *GetStatsRequest) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_GetStatsRequest.Unmarshal(m, b)
+}
+func (m *GetStatsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_GetStatsRequest.Marshal(b, m, deterministic)
+}
+func (m *GetStatsRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_GetStatsRequest.Merge(m, src)
+}
+func (m *GetStatsRequest) XXX_Size() int {
+ return xxx_messageInfo_GetStatsRequest.Size(m)
+}
+func (m *GetStatsRequest) XXX_DiscardUnknown() {
+ xxx_messageInfo_GetStatsRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_GetStatsRequest proto.InternalMessageInfo
+
+type GetStatsReply struct {
+ Stats map[string]uint64 `protobuf:"bytes,1,rep,name=stats,proto3" json:"stats,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *GetStatsReply) Reset() { *m = GetStatsReply{} }
+func (m *GetStatsReply) String() string { return proto.CompactTextString(m) }
+func (*GetStatsReply) ProtoMessage() {}
+func (*GetStatsReply) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{36}
+}
+
+func (m *GetStatsReply) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_GetStatsReply.Unmarshal(m, b)
+}
+func (m *GetStatsReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_GetStatsReply.Marshal(b, m, deterministic)
+}
+func (m *GetStatsReply) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_GetStatsReply.Merge(m, src)
+}
+func (m *GetStatsReply) XXX_Size() int {
+ return xxx_messageInfo_GetStatsReply.Size(m)
+}
+func (m *GetStatsReply) XXX_DiscardUnknown() {
+ xxx_messageInfo_GetStatsReply.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_GetStatsReply proto.InternalMessageInfo
+
+func (m *GetStatsReply) GetStats() map[string]uint64 {
+ if m != nil {
+ return m.Stats
+ }
+ return nil
+}
+
+type SetKvStateReply struct {
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *SetKvStateReply) Reset() { *m = SetKvStateReply{} }
+func (m *SetKvStateReply) String() string { return proto.CompactTextString(m) }
+func (*SetKvStateReply) ProtoMessage() {}
+func (*SetKvStateReply) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{37}
+}
+
+func (m *SetKvStateReply) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_SetKvStateReply.Unmarshal(m, b)
+}
+func (m *SetKvStateReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_SetKvStateReply.Marshal(b, m, deterministic)
+}
+func (m *SetKvStateReply) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_SetKvStateReply.Merge(m, src)
+}
+func (m *SetKvStateReply) XXX_Size() int {
+ return xxx_messageInfo_SetKvStateReply.Size(m)
+}
+func (m *SetKvStateReply) XXX_DiscardUnknown() {
+ xxx_messageInfo_SetKvStateReply.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_SetKvStateReply proto.InternalMessageInfo
+
+type GetKvStateRequest struct {
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *GetKvStateRequest) Reset() { *m = GetKvStateRequest{} }
+func (m *GetKvStateRequest) String() string { return proto.CompactTextString(m) }
+func (*GetKvStateRequest) ProtoMessage() {}
+func (*GetKvStateRequest) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{38}
+}
+
+func (m *GetKvStateRequest) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_GetKvStateRequest.Unmarshal(m, b)
+}
+func (m *GetKvStateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_GetKvStateRequest.Marshal(b, m, deterministic)
+}
+func (m *GetKvStateRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_GetKvStateRequest.Merge(m, src)
+}
+func (m *GetKvStateRequest) XXX_Size() int {
+ return xxx_messageInfo_GetKvStateRequest.Size(m)
+}
+func (m *GetKvStateRequest) XXX_DiscardUnknown() {
+ xxx_messageInfo_GetKvStateRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_GetKvStateRequest proto.InternalMessageInfo
+
+type KvState struct {
+ IsClean bool `protobuf:"varint,1,opt,name=isClean,proto3" json:"isClean,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *KvState) Reset() { *m = KvState{} }
+func (m *KvState) String() string { return proto.CompactTextString(m) }
+func (*KvState) ProtoMessage() {}
+func (*KvState) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{39}
+}
+
+func (m *KvState) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_KvState.Unmarshal(m, b)
+}
+func (m *KvState) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_KvState.Marshal(b, m, deterministic)
+}
+func (m *KvState) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_KvState.Merge(m, src)
+}
+func (m *KvState) XXX_Size() int {
+ return xxx_messageInfo_KvState.Size(m)
+}
+func (m *KvState) XXX_DiscardUnknown() {
+ xxx_messageInfo_KvState.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_KvState proto.InternalMessageInfo
+
+func (m *KvState) GetIsClean() bool {
+ if m != nil {
+ return m.IsClean
+ }
+ return false
+}
+
+// Generic messages
+type Volume struct {
+ VolumeIndex uint32 `protobuf:"varint,1,opt,name=volume_index,json=volumeIndex,proto3" json:"volume_index,omitempty"`
+ VolumeType VolumeType `protobuf:"varint,2,opt,name=volume_type,json=volumeType,proto3,enum=filemgr.VolumeType" json:"volume_type,omitempty"`
+ VolumeState uint32 `protobuf:"varint,3,opt,name=volume_state,json=volumeState,proto3" json:"volume_state,omitempty"`
+ Partition uint32 `protobuf:"varint,4,opt,name=partition,proto3" json:"partition,omitempty"`
+ NextOffset uint64 `protobuf:"varint,5,opt,name=next_offset,json=nextOffset,proto3" json:"next_offset,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Volume) Reset() { *m = Volume{} }
+func (m *Volume) String() string { return proto.CompactTextString(m) }
+func (*Volume) ProtoMessage() {}
+func (*Volume) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{40}
+}
+
+func (m *Volume) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Volume.Unmarshal(m, b)
+}
+func (m *Volume) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Volume.Marshal(b, m, deterministic)
+}
+func (m *Volume) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Volume.Merge(m, src)
+}
+func (m *Volume) XXX_Size() int {
+ return xxx_messageInfo_Volume.Size(m)
+}
+func (m *Volume) XXX_DiscardUnknown() {
+ xxx_messageInfo_Volume.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Volume proto.InternalMessageInfo
+
+func (m *Volume) GetVolumeIndex() uint32 {
+ if m != nil {
+ return m.VolumeIndex
+ }
+ return 0
+}
+
+func (m *Volume) GetVolumeType() VolumeType {
+ if m != nil {
+ return m.VolumeType
+ }
+ return VolumeType_VOLUME_DEFAULT
+}
+
+func (m *Volume) GetVolumeState() uint32 {
+ if m != nil {
+ return m.VolumeState
+ }
+ return 0
+}
+
+func (m *Volume) GetPartition() uint32 {
+ if m != nil {
+ return m.Partition
+ }
+ return 0
+}
+
+func (m *Volume) GetNextOffset() uint64 {
+ if m != nil {
+ return m.NextOffset
+ }
+ return 0
+}
+
+type Object struct {
+ Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+ VolumeIndex uint32 `protobuf:"varint,2,opt,name=volume_index,json=volumeIndex,proto3" json:"volume_index,omitempty"`
+ Offset uint64 `protobuf:"varint,3,opt,name=offset,proto3" json:"offset,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Object) Reset() { *m = Object{} }
+func (m *Object) String() string { return proto.CompactTextString(m) }
+func (*Object) ProtoMessage() {}
+func (*Object) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{41}
+}
+
+func (m *Object) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Object.Unmarshal(m, b)
+}
+func (m *Object) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Object.Marshal(b, m, deterministic)
+}
+func (m *Object) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Object.Merge(m, src)
+}
+func (m *Object) XXX_Size() int {
+ return xxx_messageInfo_Object.Size(m)
+}
+func (m *Object) XXX_DiscardUnknown() {
+ xxx_messageInfo_Object.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Object proto.InternalMessageInfo
+
+func (m *Object) GetName() []byte {
+ if m != nil {
+ return m.Name
+ }
+ return nil
+}
+
+func (m *Object) GetVolumeIndex() uint32 {
+ if m != nil {
+ return m.VolumeIndex
+ }
+ return 0
+}
+
+func (m *Object) GetOffset() uint64 {
+ if m != nil {
+ return m.Offset
+ }
+ return 0
+}
+
+type QuarantinedObjectName struct {
+ Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *QuarantinedObjectName) Reset() { *m = QuarantinedObjectName{} }
+func (m *QuarantinedObjectName) String() string { return proto.CompactTextString(m) }
+func (*QuarantinedObjectName) ProtoMessage() {}
+func (*QuarantinedObjectName) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{42}
+}
+
+func (m *QuarantinedObjectName) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_QuarantinedObjectName.Unmarshal(m, b)
+}
+func (m *QuarantinedObjectName) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_QuarantinedObjectName.Marshal(b, m, deterministic)
+}
+func (m *QuarantinedObjectName) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_QuarantinedObjectName.Merge(m, src)
+}
+func (m *QuarantinedObjectName) XXX_Size() int {
+ return xxx_messageInfo_QuarantinedObjectName.Size(m)
+}
+func (m *QuarantinedObjectName) XXX_DiscardUnknown() {
+ xxx_messageInfo_QuarantinedObjectName.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_QuarantinedObjectName proto.InternalMessageInfo
+
+func (m *QuarantinedObjectName) GetName() []byte {
+ if m != nil {
+ return m.Name
+ }
+ return nil
+}
+
+// For listdir() like functions
+type DirEntries struct {
+ Entry []string `protobuf:"bytes,1,rep,name=entry,proto3" json:"entry,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *DirEntries) Reset() { *m = DirEntries{} }
+func (m *DirEntries) String() string { return proto.CompactTextString(m) }
+func (*DirEntries) ProtoMessage() {}
+func (*DirEntries) Descriptor() ([]byte, []int) {
+ return fileDescriptor_1fcd0776e05e82a6, []int{43}
+}
+
+func (m *DirEntries) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_DirEntries.Unmarshal(m, b)
+}
+func (m *DirEntries) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_DirEntries.Marshal(b, m, deterministic)
+}
+func (m *DirEntries) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_DirEntries.Merge(m, src)
+}
+func (m *DirEntries) XXX_Size() int {
+ return xxx_messageInfo_DirEntries.Size(m)
+}
+func (m *DirEntries) XXX_DiscardUnknown() {
+ xxx_messageInfo_DirEntries.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_DirEntries proto.InternalMessageInfo
+
+func (m *DirEntries) GetEntry() []string {
+ if m != nil {
+ return m.Entry
+ }
+ return nil
+}
+
+func init() {
+ proto.RegisterEnum("filemgr.VolumeType", VolumeType_name, VolumeType_value)
+ proto.RegisterEnum("filemgr.VolumeState", VolumeState_name, VolumeState_value)
+ proto.RegisterType((*RegisterVolumeRequest)(nil), "filemgr.RegisterVolumeRequest")
+ proto.RegisterType((*RegisterVolumeReply)(nil), "filemgr.RegisterVolumeReply")
+ proto.RegisterType((*UnregisterVolumeRequest)(nil), "filemgr.UnregisterVolumeRequest")
+ proto.RegisterType((*UnregisterVolumeReply)(nil), "filemgr.UnregisterVolumeReply")
+ proto.RegisterType((*UpdateVolumeStateRequest)(nil), "filemgr.UpdateVolumeStateRequest")
+ proto.RegisterType((*UpdateVolumeStateReply)(nil), "filemgr.UpdateVolumeStateReply")
+ proto.RegisterType((*GetVolumeRequest)(nil), "filemgr.GetVolumeRequest")
+ proto.RegisterType((*GetVolumeReply)(nil), "filemgr.GetVolumeReply")
+ proto.RegisterType((*ListVolumesRequest)(nil), "filemgr.ListVolumesRequest")
+ proto.RegisterType((*ListVolumesReply)(nil), "filemgr.ListVolumesReply")
+ proto.RegisterType((*RegisterObjectRequest)(nil), "filemgr.RegisterObjectRequest")
+ proto.RegisterType((*RegisterObjectReply)(nil), "filemgr.RegisterObjectReply")
+ proto.RegisterType((*UnregisterObjectRequest)(nil), "filemgr.UnregisterObjectRequest")
+ proto.RegisterType((*UnregisterObjectReply)(nil), "filemgr.UnregisterObjectReply")
+ proto.RegisterType((*RenameObjectRequest)(nil), "filemgr.RenameObjectRequest")
+ proto.RegisterType((*RenameObjectReply)(nil), "filemgr.RenameObjectReply")
+ proto.RegisterType((*LoadObjectRequest)(nil), "filemgr.LoadObjectRequest")
+ proto.RegisterType((*LoadObjectReply)(nil), "filemgr.LoadObjectReply")
+ proto.RegisterType((*QuarantineObjectRequest)(nil), "filemgr.QuarantineObjectRequest")
+ proto.RegisterType((*QuarantineObjectReply)(nil), "filemgr.QuarantineObjectReply")
+ proto.RegisterType((*UnquarantineObjectRequest)(nil), "filemgr.UnquarantineObjectRequest")
+ proto.RegisterType((*UnquarantineObjectReply)(nil), "filemgr.UnquarantineObjectReply")
+ proto.RegisterType((*LoadObjectsByPrefixRequest)(nil), "filemgr.LoadObjectsByPrefixRequest")
+ proto.RegisterType((*LoadObjectsByPrefixReply)(nil), "filemgr.LoadObjectsByPrefixReply")
+ proto.RegisterType((*LoadObjectsByVolumeRequest)(nil), "filemgr.LoadObjectsByVolumeRequest")
+ proto.RegisterType((*LoadObjectsByVolumeReply)(nil), "filemgr.LoadObjectsByVolumeReply")
+ proto.RegisterType((*ListPartitionsRequest)(nil), "filemgr.ListPartitionsRequest")
+ proto.RegisterType((*ListPartitionRequest)(nil), "filemgr.ListPartitionRequest")
+ proto.RegisterType((*ListSuffixRequest)(nil), "filemgr.ListSuffixRequest")
+ proto.RegisterType((*ListQuarantinedOHashesRequest)(nil), "filemgr.ListQuarantinedOHashesRequest")
+ proto.RegisterType((*ListQuarantinedOHashesReply)(nil), "filemgr.ListQuarantinedOHashesReply")
+ proto.RegisterType((*ListQuarantinedOHashRequest)(nil), "filemgr.ListQuarantinedOHashRequest")
+ proto.RegisterType((*ListQuarantinedOHashReply)(nil), "filemgr.ListQuarantinedOHashReply")
+ proto.RegisterType((*GetNextOffsetRequest)(nil), "filemgr.GetNextOffsetRequest")
+ proto.RegisterType((*GetNextOffsetReply)(nil), "filemgr.GetNextOffsetReply")
+ proto.RegisterType((*GetStatsRequest)(nil), "filemgr.GetStatsRequest")
+ proto.RegisterType((*GetStatsReply)(nil), "filemgr.GetStatsReply")
+ proto.RegisterMapType((map[string]uint64)(nil), "filemgr.GetStatsReply.StatsEntry")
+ proto.RegisterType((*SetKvStateReply)(nil), "filemgr.SetKvStateReply")
+ proto.RegisterType((*GetKvStateRequest)(nil), "filemgr.GetKvStateRequest")
+ proto.RegisterType((*KvState)(nil), "filemgr.KvState")
+ proto.RegisterType((*Volume)(nil), "filemgr.Volume")
+ proto.RegisterType((*Object)(nil), "filemgr.Object")
+ proto.RegisterType((*QuarantinedObjectName)(nil), "filemgr.QuarantinedObjectName")
+ proto.RegisterType((*DirEntries)(nil), "filemgr.DirEntries")
+}
+
+func init() {
+ proto.RegisterFile("fmgr.proto", fileDescriptor_1fcd0776e05e82a6)
+}
+
+var fileDescriptor_1fcd0776e05e82a6 = []byte{
+ // 1083 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x57, 0xdd, 0x6e, 0xe3, 0x44,
+ 0x1b, 0x5e, 0x3b, 0x69, 0xda, 0xbe, 0xe9, 0x8f, 0x33, 0x4d, 0x53, 0x77, 0xfb, 0xed, 0xb7, 0x59,
+ 0xa3, 0x85, 0x50, 0x50, 0x0f, 0x16, 0x24, 0x2a, 0x24, 0x90, 0xd2, 0xd6, 0x1b, 0x2a, 0xd2, 0x24,
+ 0x38, 0x4e, 0x17, 0xc1, 0x81, 0x71, 0xe9, 0x24, 0x98, 0x4d, 0x6c, 0xd7, 0x9e, 0x74, 0x9b, 0x15,
+ 0x12, 0x12, 0x87, 0x5c, 0x09, 0xa7, 0x5c, 0x03, 0x87, 0xdc, 0x07, 0xd7, 0x81, 0x66, 0xc6, 0x4e,
+ 0xfc, 0x47, 0xd3, 0x88, 0x9c, 0x70, 0xe6, 0xf7, 0xb1, 0xf3, 0x3e, 0xcf, 0xfb, 0x3b, 0x13, 0x80,
+ 0xfe, 0x68, 0xe0, 0x1d, 0xb9, 0x9e, 0x43, 0x1c, 0xb4, 0xda, 0xb7, 0x86, 0x78, 0x34, 0xf0, 0x94,
+ 0xbf, 0x04, 0xd8, 0xd5, 0xf0, 0xc0, 0xf2, 0x09, 0xf6, 0x2e, 0x9d, 0xe1, 0x78, 0x84, 0x35, 0x7c,
+ 0x33, 0xc6, 0x3e, 0x41, 0xff, 0x83, 0x75, 0xd7, 0xf4, 0x88, 0x45, 0x2c, 0xc7, 0x96, 0x85, 0xaa,
+ 0x50, 0xdb, 0xd4, 0x66, 0x00, 0x7a, 0x0f, 0xf2, 0x64, 0xe2, 0x62, 0x59, 0xac, 0x0a, 0xb5, 0xad,
+ 0x17, 0x3b, 0x47, 0x81, 0xbf, 0x23, 0xee, 0x43, 0x9f, 0xb8, 0x58, 0x63, 0x1f, 0xa0, 0x67, 0xb0,
+ 0x71, 0xcb, 0x30, 0xc3, 0xb2, 0xaf, 0xf1, 0x9d, 0x9c, 0x63, 0x9e, 0x8a, 0x1c, 0x3b, 0xa7, 0x10,
+ 0xaa, 0x40, 0xc1, 0xe9, 0xf7, 0x7d, 0x4c, 0xe4, 0x7c, 0x55, 0xa8, 0xe5, 0xb5, 0xc0, 0x42, 0x87,
+ 0xb0, 0xe2, 0x13, 0x93, 0x60, 0x79, 0x85, 0x91, 0x94, 0x13, 0x24, 0x5d, 0xfa, 0x4e, 0xe3, 0x9f,
+ 0xa0, 0xa7, 0x50, 0xf4, 0xb0, 0x6b, 0x5a, 0x9e, 0x41, 0x1c, 0x67, 0x28, 0x17, 0xaa, 0x42, 0x6d,
+ 0x4d, 0x03, 0x0e, 0xe9, 0x8e, 0x33, 0x54, 0x76, 0x61, 0x27, 0x19, 0xa7, 0x3b, 0x9c, 0x28, 0x1d,
+ 0xd8, 0xeb, 0xd9, 0x5e, 0x66, 0x02, 0xca, 0xb0, 0xc2, 0x25, 0xf3, 0xe0, 0xb9, 0x91, 0x24, 0x12,
+ 0x53, 0x44, 0x7b, 0xb0, 0x9b, 0xf6, 0x48, 0xa9, 0x7e, 0x15, 0x40, 0xee, 0xb9, 0xd7, 0x26, 0xc1,
+ 0x51, 0xfd, 0x01, 0x59, 0x32, 0x4d, 0x42, 0x3a, 0x4d, 0xd3, 0x74, 0x88, 0x0b, 0xa7, 0x23, 0x97,
+ 0x52, 0x29, 0x43, 0x25, 0x43, 0x0b, 0x95, 0x79, 0x0e, 0x52, 0x03, 0x93, 0xa5, 0xa4, 0xe2, 0x4f,
+ 0x01, 0xb6, 0x22, 0xbe, 0xdc, 0xe1, 0xe4, 0x21, 0x71, 0x7e, 0x0c, 0x81, 0x69, 0xcc, 0xeb, 0x30,
+ 0xb8, 0x9d, 0x3e, 0x47, 0x1c, 0xf3, 0x24, 0xc5, 0xfa, 0x8c, 0x85, 0x17, 0xef, 0xe8, 0x7c, 0xb2,
+ 0xa3, 0x9f, 0x42, 0xd1, 0xc6, 0x77, 0xc4, 0x08, 0x5a, 0x71, 0x85, 0xb5, 0x22, 0x50, 0xa8, 0xcd,
+ 0x10, 0xe5, 0x27, 0x40, 0x4d, 0xcb, 0x0f, 0xa2, 0xf1, 0x97, 0x3c, 0x26, 0x73, 0x0b, 0xf6, 0x19,
+ 0x48, 0x31, 0x76, 0x9a, 0xcc, 0xf7, 0x61, 0x95, 0xc7, 0xe7, 0xcb, 0x42, 0x35, 0x57, 0x2b, 0xbe,
+ 0xd8, 0x4e, 0x10, 0x68, 0xe1, 0x7b, 0xe5, 0xb7, 0xc8, 0x9c, 0xb7, 0xaf, 0x7e, 0xc4, 0xdf, 0x93,
+ 0x30, 0x00, 0x04, 0x79, 0xdb, 0x1c, 0x61, 0xa6, 0x7d, 0x43, 0x63, 0xcf, 0xa9, 0x2a, 0x89, 0xf7,
+ 0x0d, 0x6d, 0x2e, 0x36, 0xb4, 0x89, 0x34, 0xe6, 0x93, 0x69, 0x4c, 0x46, 0xba, 0x72, 0xdf, 0xa4,
+ 0x86, 0x4a, 0x69, 0x5f, 0xb6, 0xa2, 0x93, 0x3a, 0x3f, 0x84, 0xc5, 0xe6, 0x34, 0x4a, 0x84, 0x29,
+ 0x3f, 0xf5, 0x31, 0x9f, 0x64, 0x1f, 0xd6, 0x6c, 0xfc, 0xc6, 0x60, 0xb8, 0xc8, 0xf0, 0x55, 0x1b,
+ 0xbf, 0x69, 0x65, 0xf0, 0xa7, 0x0b, 0xba, 0x03, 0xa5, 0x38, 0x0d, 0xe5, 0x76, 0xa0, 0xd4, 0x74,
+ 0xcc, 0xeb, 0xf9, 0xcc, 0xcf, 0x61, 0xcb, 0xf2, 0x8d, 0x9b, 0xb1, 0xe9, 0x99, 0x36, 0xb1, 0x6c,
+ 0x7c, 0x1d, 0x44, 0xb8, 0x69, 0xf9, 0x5f, 0xcd, 0xc0, 0xf9, 0x2a, 0xbe, 0x83, 0xed, 0x28, 0x21,
+ 0xed, 0xaa, 0xe5, 0x36, 0x04, 0xad, 0xdb, 0x4c, 0xd1, 0x72, 0xea, 0x96, 0xf6, 0xc7, 0x57, 0xf9,
+ 0x7e, 0xcf, 0xbe, 0x59, 0x26, 0xd5, 0x3e, 0x6d, 0xb9, 0x9b, 0x4c, 0xb2, 0x1e, 0x3c, 0x9e, 0xe5,
+ 0xcd, 0x3f, 0x99, 0x74, 0x3c, 0xdc, 0xb7, 0xee, 0x42, 0xb6, 0x0a, 0x14, 0x5c, 0x06, 0x04, 0x7c,
+ 0x81, 0x35, 0x9f, 0x51, 0x05, 0x39, 0xd3, 0x6d, 0x30, 0xed, 0x0e, 0xc7, 0x53, 0xd3, 0x1e, 0x28,
+ 0x0b, 0xdf, 0x2b, 0xbf, 0x0b, 0x09, 0x79, 0x0f, 0x59, 0xe7, 0x55, 0x28, 0xa6, 0xfb, 0x29, 0x0a,
+ 0xa1, 0x27, 0x00, 0xae, 0x39, 0xc0, 0x06, 0x71, 0x5e, 0x63, 0x9b, 0x95, 0x79, 0x83, 0x2e, 0xbb,
+ 0x01, 0xd6, 0x29, 0x80, 0x0e, 0x80, 0x19, 0x86, 0x6f, 0xbd, 0xc5, 0xc1, 0x7e, 0x5d, 0xa3, 0x40,
+ 0xd7, 0x7a, 0x8b, 0xe7, 0x8f, 0xfd, 0x28, 0x11, 0x7a, 0xf4, 0xd4, 0x78, 0x78, 0xe8, 0xe8, 0x5d,
+ 0xd8, 0x66, 0xfb, 0x27, 0x22, 0x94, 0x4f, 0xe6, 0x26, 0x85, 0x3b, 0xa1, 0x58, 0xe5, 0x73, 0xd8,
+ 0xa5, 0xfb, 0xb4, 0x13, 0xae, 0xea, 0xe9, 0x42, 0x7f, 0x0e, 0x5b, 0xd3, 0xfd, 0x6d, 0x5c, 0x59,
+ 0x8c, 0x92, 0x86, 0xb2, 0x39, 0x45, 0x4f, 0x2c, 0xe2, 0x2b, 0xdf, 0x42, 0x39, 0xf6, 0xfb, 0x87,
+ 0x9d, 0x07, 0x69, 0xe7, 0x62, 0x96, 0x73, 0x17, 0x4a, 0xd4, 0x79, 0x77, 0xdc, 0x8f, 0x34, 0xd5,
+ 0xfd, 0x9e, 0x2b, 0x50, 0xf0, 0xd9, 0xe7, 0x41, 0xb8, 0x81, 0x95, 0xc1, 0x98, 0xcb, 0x0e, 0xe7,
+ 0x09, 0x65, 0x8c, 0xec, 0x8e, 0xf6, 0x17, 0xa6, 0xff, 0xc3, 0xec, 0x9c, 0x8b, 0xd7, 0x5e, 0xb8,
+ 0xb7, 0xf6, 0x62, 0xbc, 0xf6, 0xca, 0xcf, 0x70, 0xf0, 0x4f, 0xce, 0x69, 0x75, 0x8f, 0x93, 0xd5,
+ 0xfd, 0xff, 0xb4, 0xba, 0xd1, 0x9f, 0xb0, 0x4f, 0xe8, 0x6e, 0x5d, 0xbc, 0xd8, 0x97, 0xd9, 0x02,
+ 0xfe, 0xf5, 0xb8, 0xbe, 0x84, 0xfd, 0x6c, 0xbf, 0x0b, 0xce, 0xeb, 0x37, 0x50, 0x6e, 0x60, 0xd2,
+ 0x9a, 0x1e, 0x92, 0x0b, 0xdc, 0x0a, 0xe7, 0x6a, 0xfc, 0x10, 0x50, 0xc2, 0x37, 0x15, 0x37, 0xdb,
+ 0xd6, 0x42, 0x6c, 0x5b, 0x97, 0x60, 0xbb, 0x81, 0x09, 0xbd, 0x2f, 0x85, 0x95, 0x57, 0x7e, 0x11,
+ 0x60, 0x73, 0x86, 0xd1, 0x1f, 0x7f, 0xc2, 0x6f, 0xa2, 0x61, 0x5c, 0xcf, 0xa6, 0x71, 0xc5, 0x3e,
+ 0x3b, 0x62, 0x8f, 0xaa, 0x4d, 0xbc, 0x09, 0xbf, 0x96, 0xfa, 0x8f, 0x8f, 0x01, 0x66, 0x20, 0x92,
+ 0x20, 0xf7, 0x1a, 0x4f, 0x98, 0x80, 0x75, 0x8d, 0x3e, 0xd2, 0xc5, 0x74, 0x6b, 0x0e, 0xc7, 0xbc,
+ 0x83, 0xf2, 0x1a, 0x37, 0x3e, 0x15, 0x8f, 0x05, 0xaa, 0xab, 0x8b, 0xc9, 0x97, 0xb7, 0x91, 0x8b,
+ 0xea, 0x0e, 0x94, 0x1a, 0x11, 0x88, 0x8b, 0x7d, 0x07, 0x56, 0x03, 0x04, 0xc9, 0xb0, 0x6a, 0xf9,
+ 0xa7, 0x43, 0x6c, 0xf2, 0x76, 0x5d, 0xd3, 0x42, 0x53, 0xf9, 0x43, 0x80, 0x02, 0x5f, 0x2f, 0xff,
+ 0xe5, 0xfb, 0xe8, 0x2b, 0x28, 0xf0, 0x3e, 0x5a, 0xf6, 0x89, 0xfd, 0x41, 0xf4, 0x84, 0x8d, 0xcc,
+ 0x5d, 0x16, 0x8f, 0xa2, 0x00, 0x9c, 0x59, 0x1e, 0x2d, 0xa8, 0x85, 0x7d, 0x5a, 0x40, 0x4c, 0x6b,
+ 0xcb, 0x3a, 0x63, 0x5d, 0xe3, 0xc6, 0x61, 0x0b, 0x60, 0x96, 0x25, 0x84, 0x60, 0xeb, 0xb2, 0xdd,
+ 0xec, 0x5d, 0xa8, 0xc6, 0x99, 0xfa, 0xb2, 0xde, 0x6b, 0xea, 0xd2, 0x23, 0x54, 0x06, 0x29, 0xc0,
+ 0xf4, 0xf6, 0xc5, 0x49, 0x57, 0x6f, 0xb7, 0x54, 0x49, 0x40, 0x15, 0x40, 0x01, 0xfa, 0xb5, 0x71,
+ 0xa6, 0x36, 0x55, 0x5d, 0x35, 0xea, 0xba, 0x24, 0x1e, 0x6a, 0x50, 0x8c, 0xfc, 0x6d, 0x41, 0x1b,
+ 0xb0, 0xd6, 0xd5, 0xeb, 0xba, 0x6a, 0x68, 0xaf, 0xa4, 0x47, 0x48, 0x86, 0x32, 0xb7, 0x4e, 0xdb,
+ 0x17, 0x9d, 0xfa, 0xa9, 0x7e, 0xde, 0x6e, 0x19, 0x5d, 0xed, 0x54, 0x12, 0xd0, 0x01, 0xec, 0xa5,
+ 0xde, 0xe8, 0x75, 0xad, 0xa1, 0xea, 0x92, 0x78, 0x55, 0x60, 0x7f, 0x8c, 0x3f, 0xfa, 0x3b, 0x00,
+ 0x00, 0xff, 0xff, 0x2f, 0xb4, 0x5f, 0x6a, 0x26, 0x0f, 0x00, 0x00,
+}
diff --git a/go/swift-rpc-losf/rpc.go b/go/swift-rpc-losf/rpc.go
new file mode 100644
index 000000000..622ad8c0e
--- /dev/null
+++ b/go/swift-rpc-losf/rpc.go
@@ -0,0 +1,1642 @@
+// Copyright (c) 2010-2012 OpenStack Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// RPC functions
+//
+// TODO: The naming of things is not consistent with the python code.
+
+package main
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "github.com/alecuyer/statsd/v2"
+ "github.com/golang/protobuf/proto"
+ "github.com/openstack/swift-rpc-losf/codes"
+ pb "github.com/openstack/swift-rpc-losf/proto"
+ "github.com/openstack/swift-rpc-losf/status"
+ "github.com/sirupsen/logrus"
+ "io/ioutil"
+ "net"
+ "net/http"
+ "os"
+ "path"
+ "strings"
+ "sync"
+ "time"
+)
+
// server holds the state shared by all RPC handlers in this file: the
// key-value store with volume/object metadata, the HTTP server through
// which RPC requests arrive, and the statsd client used for metrics.
// Handlers receive it as their first argument (see rpcFunc).
type server struct {
	kv         KV // key-value store holding volume and object entries
	httpServer *http.Server

	// DB state (is it in sync with the volumes state)
	// Most handlers refuse to run when false, unless called from the repair tool.
	isClean bool

	diskPath   string // full path to mountpoint
	diskName   string // without the path
	socketPath string // full path to the socket

	// statsd used as is done in swift
	// (name keeps the existing statsd_c spelling; referenced throughout this file)
	statsd_c *statsd.Client

	// channel to signal server should stop
	stopChan chan os.Signal
}
+
// The following consts are used as a key prefix for different types in the KV

// prefix for "volumes" (large file to which we write objects)
const volumePrefix = 'd'

// prefix for "objects" ("vfile" in the python code, would be a POSIX file on a regular backend)
const objectPrefix = 'o'

// This is meant to be used when a new file is created with the same name as an existing file.
// Deprecate this. As discussed in https://review.openstack.org/#/c/162243, overwriting an existing file
// never seemed like a good idea, and was done to mimic the existing renamer() behavior.
// We have no way to know if the new file is "better" than the existing one.
const deleteQueuePrefix = 'q'

// Quarantined objects
const quarantinePrefix = 'r'

// stats stored in the KV
const statsPrefix = 's'

// max key length in ascii format.
const maxObjKeyLen = 96

// rpcFunc is the signature shared by every RPC handler in this file:
// it receives the server state, a context, and the serialized protobuf
// request, and returns the serialized protobuf reply (or an error built
// with status.Errorf).
type rpcFunc func(*server, context.Context, *[]byte) (*[]byte, error)
+
+// RegisterVolume registers a new volume (volume) to the KV, given its index number and starting offset.
+// Will return an error if the volume index already exists.
+func RegisterVolume(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.RegisterVolumeRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("RegisterVolume failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{"Function": "RegisterVolume", "Partition": in.Partition, "Type": in.Type,
+ "VolumeIndex": in.VolumeIndex, "Offset": in.Offset, "State": in.State})
+ reqlog.Debug("RPC Call")
+
+ if !in.RepairTool && !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ key := EncodeVolumeKey(in.VolumeIndex)
+
+ // Does the volume already exist ?
+ value, err := s.kv.Get(volumePrefix, key)
+ if err != nil {
+ reqlog.Errorf("unable to check for existing volume key: %v", err)
+ s.statsd_c.Increment("register_volume.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to check for existing volume key")
+ }
+
+ if value != nil {
+ reqlog.Info("volume index already exists in db")
+ s.statsd_c.Increment("register_volume.ok")
+ return nil, status.Errorf(codes.AlreadyExists, "volume index already exists in db")
+ }
+
+ // Register the volume
+ usedSpace := int64(0)
+ value = EncodeVolumeValue(int64(in.Partition), int32(in.Type), int64(in.Offset), usedSpace, int64(in.State))
+
+ err = s.kv.Put(volumePrefix, key, value)
+ if err != nil {
+ reqlog.Errorf("failed to Put new volume entry: %v", err)
+ s.statsd_c.Increment("register_volume.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to register new volume")
+ }
+ s.statsd_c.Increment("register_volume.ok")
+
+ out, err := proto.Marshal(&pb.RegisterVolumeReply{})
+ if err != nil {
+ reqlog.Errorf("failed to serialize reply for new volume entry: %v", err)
+ return nil, status.Errorf(codes.Unavailable, "unable to serialize reply for new volume entry: %v", err)
+ }
+ return &out, nil
+}
+
+// UnregisterVolume will delete a volume entry from the kv.
+func UnregisterVolume(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.UnregisterVolumeRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("UnregisterVolume failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{"Function": "UnregisterVolume", "VolumeIndex": in.Index})
+ reqlog.Debug("RPC Call")
+
+ if !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ key := EncodeVolumeKey(in.Index)
+
+ // Check for key
+ value, err := s.kv.Get(volumePrefix, key)
+ if err != nil {
+ reqlog.Errorf("unable to check for volume key: %v", err)
+ s.statsd_c.Increment("unregister_volume.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to check for volume key")
+ }
+
+ if value == nil {
+ reqlog.Info("volume index does not exist in db")
+ s.statsd_c.Increment("unregister_volume.ok")
+ return nil, status.Errorf(codes.NotFound, "volume index does not exist in db")
+ }
+
+ // Key exists, delete it
+ err = s.kv.Delete(volumePrefix, key)
+ if err != nil {
+ reqlog.Errorf("failed to Delete volume entry: %v", err)
+ s.statsd_c.Increment("unregister_volume.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to delete volume entry")
+ }
+
+ s.statsd_c.Increment("unregister_volume.ok")
+ return serializePb(&pb.UnregisterVolumeReply{})
+}
+
+// UpdateVolumeState will modify an existing volume state
+func UpdateVolumeState(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.UpdateVolumeStateRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("UpdateVolumeState failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{"Function": "UpdateVolumeState", "VolumeIndex": in.VolumeIndex, "State": in.State})
+ reqlog.Debug("RPC Call")
+
+ if !in.RepairTool && !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ key := EncodeVolumeKey(in.VolumeIndex)
+ value, err := s.kv.Get(volumePrefix, key)
+ if err != nil {
+ reqlog.Errorf("unable to retrieve volume key: %v", err)
+ s.statsd_c.Increment("update_volume_state.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to retrieve volume key")
+ }
+
+ if value == nil {
+ reqlog.Info("volume index does not exist in db")
+ s.statsd_c.Increment("update_volume_state.ok")
+ return nil, status.Errorf(codes.NotFound, "volume index does not exist in db")
+ }
+
+ partition, dfType, offset, usedSpace, state, err := DecodeVolumeValue(value)
+ reqlog.WithFields(logrus.Fields{"current_state": state}).Info("updating state")
+ if err != nil {
+ reqlog.Errorf("failed to decode Volume value: %v", err)
+ s.statsd_c.Increment("update_volume_state.fail")
+ return nil, status.Errorf(codes.Internal, "failed to decode Volume value")
+ }
+
+ value = EncodeVolumeValue(partition, dfType, offset, usedSpace, int64(in.State))
+ err = s.kv.Put(volumePrefix, key, value)
+ if err != nil {
+ reqlog.Errorf("failed to Put updated volume entry: %v", err)
+ s.statsd_c.Increment("update_volume_state.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to update volume state")
+ }
+ s.statsd_c.Increment("update_volume_state.ok")
+
+ out, err := proto.Marshal(&pb.UpdateVolumeStateReply{})
+ if err != nil {
+ reqlog.Errorf("failed to serialize reply for update volume: %v", err)
+ return nil, status.Errorf(codes.Unavailable, "unable to serialize reply for update volume: %v", err)
+ }
+ return &out, nil
+}
+
+// GetVolume will return a volume information
+func GetVolume(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.GetVolumeRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("GetVolume failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{"Function": "GetVolume", "Volume index": in.Index})
+ reqlog.Debug("RPC Call")
+
+ if !in.RepairTool && !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ key := EncodeVolumeKey(in.Index)
+ value, err := s.kv.Get(volumePrefix, key)
+ if err != nil {
+ reqlog.Errorf("Failed to get volume key %d in KV: %v", key, err)
+ s.statsd_c.Increment("get_volume.fail")
+ return nil, status.Errorf(codes.Internal, "Failed to get volume key in KV")
+ }
+
+ if value == nil {
+ reqlog.Info("No such Volume")
+ s.statsd_c.Increment("get_volume.ok")
+ return nil, status.Errorf(codes.NotFound, "No such Volume")
+ }
+
+ partition, dfType, nextOffset, _, state, err := DecodeVolumeValue(value)
+ if err != nil {
+ reqlog.Errorf("Failed to decode Volume value: %v", err)
+ s.statsd_c.Increment("get_volume.fail")
+ return nil, status.Errorf(codes.Internal, "Failed to decode Volume value")
+ }
+
+ s.statsd_c.Increment("get_volume.ok")
+
+ pb_volume := pb.GetVolumeReply{VolumeIndex: in.Index, VolumeType: pb.VolumeType(dfType), VolumeState: uint32(state),
+ Partition: uint32(partition), NextOffset: uint64(nextOffset)}
+ out, err := proto.Marshal(&pb_volume)
+ if err != nil {
+ reqlog.Errorf("failed to serialize reply for get volume: %v", err)
+ return nil, status.Errorf(codes.Unavailable, "unable to serialize reply for get volume: %v", err)
+ }
+ return &out, nil
+}
+
+// ListVolumes will return all volumes of the given type, for the given partition.
+// If GetlAllVolumes is true, all volumes are listed (all types, all partitions)
+// Currently this scans all volumes in the KV. Likely fast enough as long as the KV is cached.
+// If it becomes a performance issue, we may want to add an in-memory cache indexed by partition.
+func ListVolumes(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.ListVolumesRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("ListVolumes failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{"Function": "ListVolumes", "Partition": in.Partition, "Type": in.Type})
+ reqlog.Debug("RPC Call")
+
+ if !in.RepairTool && !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ response := &pb.ListVolumesReply{}
+
+ // Iterate over volumes and return the ones that match the request
+ it := s.kv.NewIterator(volumePrefix)
+ defer it.Close()
+
+ for it.SeekToFirst(); it.Valid(); it.Next() {
+ idx, err := DecodeVolumeKey(it.Key())
+ if err != nil {
+ reqlog.Errorf("failed to decode volume key: %v", err)
+ s.statsd_c.Increment("list_volumes.fail")
+ return nil, status.Errorf(codes.Internal, "unable to decode volume value")
+ }
+
+ partition, dfType, nextOffset, _, state, err := DecodeVolumeValue(it.Value())
+ if err != nil {
+ reqlog.Errorf("failed to decode volume value: %v", err)
+ s.statsd_c.Increment("list_volumes.fail")
+ return nil, status.Errorf(codes.Internal, "unable to decode volume value")
+ }
+ if uint32(partition) == in.Partition && pb.VolumeType(dfType) == in.Type {
+ response.Volumes = append(response.Volumes, &pb.Volume{VolumeIndex: idx,
+ VolumeType: pb.VolumeType(in.Type), VolumeState: uint32(state),
+ Partition: uint32(partition), NextOffset: uint64(nextOffset)})
+ }
+ }
+
+ s.statsd_c.Increment("list_volumes.ok")
+ out, err := proto.Marshal(response)
+ if err != nil {
+ reqlog.Errorf("failed to serialize reply for list volumes: %v", err)
+ return nil, status.Errorf(codes.Unavailable, "unable to serialize reply for list volumes: %v", err)
+ }
+ return &out, nil
+}
+
+// RegisterObject registers a new object to the kv.
+func RegisterObject(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.RegisterObjectRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("RegisterObject failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{
+ "Function": "RegisterObject",
+ "Name": fmt.Sprintf("%s", in.Name),
+ "DiskPath": s.diskPath,
+ "VolumeIndex": in.VolumeIndex,
+ "Offset": in.Offset,
+ "NextOffset": in.NextOffset,
+ "Length": in.NextOffset - in.Offset, // debug
+ })
+ reqlog.Debug("RPC Call")
+
+ if !in.RepairTool && !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ // Check if volume exists
+ volumeKey := EncodeVolumeKey(in.VolumeIndex)
+ volumeValue, err := s.kv.Get(volumePrefix, volumeKey)
+ if err != nil {
+ reqlog.Errorf("unable to check for existing volume key: %v", err)
+ s.statsd_c.Increment("register_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to check for existing volume key")
+ }
+
+ if volumeValue == nil {
+ reqlog.Info("volume index does not exist in db")
+ s.statsd_c.Increment("register_object.ok")
+ return nil, status.Errorf(codes.FailedPrecondition, "volume index does not exist in db")
+ }
+
+ partition, volumeType, _, currentUsedSpace, state, err := DecodeVolumeValue(volumeValue)
+
+ objectKey, err := EncodeObjectKey(in.Name)
+ if err != nil {
+ reqlog.Errorf("unable to encode object key: %v", err)
+ s.statsd_c.Increment("register_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to encode object key")
+ }
+
+ objectValue := EncodeObjectValue(in.VolumeIndex, in.Offset)
+
+ // If an object exists with the same name, we need to move it to the delete queue before overwriting the key.
+ // On the regular file backend, this would happen automatically with the rename operation. In our case,
+ // we would leak space. (The space will be reclaimed on compaction, but it shouldn't happen).
+
+ var objMutex = &sync.Mutex{}
+ objMutex.Lock()
+
+ existingValue, err := s.kv.Get(objectPrefix, objectKey)
+ if err != nil {
+ reqlog.Errorf("unable to check for existing object: %v", err)
+ s.statsd_c.Increment("register_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to retrieve object")
+ }
+
+ if existingValue != nil {
+ reqlog.Info("object already exists")
+ s.statsd_c.Increment("register_object.ok")
+ return nil, status.Errorf(codes.AlreadyExists, "object already exists")
+ }
+
+ // Update volume offset
+ volumeNewValue := EncodeVolumeValue(int64(partition), volumeType, int64(in.NextOffset), int64(currentUsedSpace), state)
+ wb := s.kv.NewWriteBatch()
+ defer wb.Close()
+ wb.Put(volumePrefix, volumeKey, volumeNewValue)
+ wb.Put(objectPrefix, objectKey, objectValue)
+
+ err = wb.Commit()
+ if err != nil {
+ reqlog.Errorf("failed to Put new volume value and new object entry: %v", err)
+ s.statsd_c.Increment("register_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to update volume and register new object")
+ }
+ objMutex.Unlock()
+
+ s.statsd_c.Increment("register_object.ok")
+
+ out, err := proto.Marshal(&pb.RegisterObjectReply{})
+ if err != nil {
+ reqlog.Errorf("failed to serialize reply: %v", err)
+ return nil, status.Errorf(codes.Unavailable, "unable to serialize reply: %v", err)
+ }
+ return &out, nil
+}
+
+// UnregisterObject removes an an object entry from the kv.
+func UnregisterObject(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.UnregisterObjectRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("UnregisterObject failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+ reqlog := log.WithFields(logrus.Fields{
+ "Function": "UnregisterObject",
+ "Name": fmt.Sprintf("%s", in.Name),
+ "DiskPath": s.diskPath,
+ })
+ reqlog.Debug("RPC Call")
+
+ if !in.RepairTool && !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ objectKey, err := EncodeObjectKey(in.Name)
+ if err != nil {
+ reqlog.Errorf("unable to encode object key: %v", err)
+ s.statsd_c.Increment("unregister_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to encode object key")
+ }
+
+ value, err := s.kv.Get(objectPrefix, objectKey)
+ if err != nil {
+ reqlog.Errorf("unable to retrieve object: %v", err)
+ s.statsd_c.Increment("unregister_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to retrieve object")
+ }
+
+ if value == nil {
+ reqlog.Debug("object does not exist")
+ s.statsd_c.Increment("unregister_object.ok")
+ return nil, status.Errorf(codes.NotFound, "%s", in.Name)
+ }
+
+ // Delete key
+ err = s.kv.Delete(objectPrefix, objectKey)
+ if err != nil {
+ reqlog.Errorf("failed to Delete key: %v", err)
+ s.statsd_c.Increment("unregister_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to unregister object")
+ }
+
+ s.statsd_c.Increment("unregister_object.ok")
+ out, err := proto.Marshal(&pb.UnregisterObjectReply{})
+ if err != nil {
+ reqlog.Errorf("failed to serialize reply for del object reply: %v", err)
+ return nil, status.Errorf(codes.Unavailable, "unable to serialize reply for del object reply: %v", err)
+ }
+ return &out, nil
+}
+
+// RenameObject changes an object key in the kv. (used for erasure code)
+func RenameObject(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.RenameObjectRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{
+ "Function": "RenameObject",
+ "Name": fmt.Sprintf("%s", in.Name),
+ "NewName": fmt.Sprintf("%s", in.NewName),
+ })
+ reqlog.Debug("RPC Call")
+
+ if !in.RepairTool && !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ objectKey, err := EncodeObjectKey(in.Name)
+ if err != nil {
+ reqlog.Errorf("unable to encode object key: %v", err)
+ s.statsd_c.Increment("rename_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to encode object key")
+ }
+
+ objectNewKey, err := EncodeObjectKey(in.NewName)
+ if err != nil {
+ reqlog.Errorf("unable to encode new object key: %v", err)
+ s.statsd_c.Increment("rename_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to encode object key")
+ }
+
+ value, err := s.kv.Get(objectPrefix, objectKey)
+ if err != nil {
+ reqlog.Errorf("unable to retrieve object: %v", err)
+ s.statsd_c.Increment("rename_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to retrieve object")
+ }
+
+ if value == nil {
+ reqlog.Debug("object does not exist")
+ s.statsd_c.Increment("rename_object.ok")
+ return nil, status.Errorf(codes.NotFound, "%s", in.Name)
+ }
+
+ // Delete old entry and create a new one
+ wb := s.kv.NewWriteBatch()
+ defer wb.Close()
+ wb.Delete(objectPrefix, objectKey)
+ wb.Put(objectPrefix, objectNewKey, value)
+
+ err = wb.Commit()
+ if err != nil {
+ reqlog.Errorf("failed to commit WriteBatch for rename: %v", err)
+ s.statsd_c.Increment("rename_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "failed to commit WriteBatch for rename")
+ }
+
+ s.statsd_c.Increment("rename_object.ok")
+
+ out, err := proto.Marshal(&pb.RenameObjectReply{})
+ if err != nil {
+ reqlog.Errorf("failed to serialize reply: %v", err)
+ return nil, status.Errorf(codes.Unavailable, "unable to serialize reply: %v", err)
+ }
+ return &out, nil
+}
+
+// LoadObject returns an object information
+func LoadObject(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.LoadObjectRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{
+ "Function": "LoadObject",
+ "Name": fmt.Sprintf("%s", in.Name),
+ "IsQuarantined": fmt.Sprintf("%t", in.IsQuarantined),
+ })
+ reqlog.Debug("RPC Call")
+
+ var prefix byte
+
+ if !in.RepairTool && !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ objectKey, err := EncodeObjectKey(in.Name)
+ if err != nil {
+ reqlog.Errorf("unable to encode object key: %v", err)
+ s.statsd_c.Increment("load_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to encode object key")
+ }
+
+ if in.IsQuarantined {
+ prefix = quarantinePrefix
+ } else {
+ prefix = objectPrefix
+ }
+ reqlog.Debugf("is quarantined: %v", in.IsQuarantined)
+ value, err := s.kv.Get(prefix, objectKey)
+ if err != nil {
+ reqlog.Errorf("unable to retrieve object: %v", err)
+ s.statsd_c.Increment("load_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to retrieve object")
+ }
+
+ if value == nil {
+ reqlog.Debug("object does not exist")
+ s.statsd_c.Increment("load_object.ok")
+ return nil, status.Errorf(codes.NotFound, "%s", in.Name)
+ }
+
+ volumeIndex, offset, err := DecodeObjectValue(value)
+ if err != nil {
+ reqlog.Errorf("failed to decode object value: %v", err)
+ s.statsd_c.Increment("load_object.fail")
+ return nil, status.Errorf(codes.Internal, "unable to read object")
+ }
+
+ s.statsd_c.Increment("load_object.ok")
+
+ out, err := proto.Marshal(&pb.LoadObjectReply{Name: in.Name, VolumeIndex: volumeIndex, Offset: offset})
+ if err != nil {
+ reqlog.Errorf("failed to serialize reply: %v", err)
+ return nil, status.Errorf(codes.Unavailable, "unable to serialize reply: %v", err)
+ }
+ return &out, nil
+}
+
+// QuarantineObject
+func QuarantineObject(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.QuarantineObjectRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{
+ "Function": "QuarantineObject",
+ "Name": fmt.Sprintf("%s", in.Name),
+ })
+ reqlog.Debug("RPC Call")
+
+ if !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ objectKey, err := EncodeObjectKey(in.Name)
+ if err != nil {
+ reqlog.Errorf("unable to encode object key: %v", err)
+ s.statsd_c.Increment("quarantine_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to encode object key")
+ }
+
+ value, err := s.kv.Get(objectPrefix, objectKey)
+ if err != nil {
+ reqlog.Errorf("unable to retrieve object: %v", err)
+ s.statsd_c.Increment("quarantine_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to retrieve object")
+ }
+
+ if value == nil {
+ reqlog.Debug("object does not exist")
+ s.statsd_c.Increment("quarantine_object.ok")
+ return nil, status.Errorf(codes.NotFound, "%s", in.Name)
+ }
+
+ // Add quarantine key, delete obj key
+ wb := s.kv.NewWriteBatch()
+ defer wb.Close()
+ // TODO: check here if an ohash already exists with the same name. Put files in the same dir, or make a new one ? (current swift code
+ // appears to add an extension in that case. This will require a new format (encode/decode) in the KV)
+ // Also check if full key already exists.
+ wb.Put(quarantinePrefix, objectKey, value)
+ wb.Delete(objectPrefix, objectKey)
+ err = wb.Commit()
+ if err != nil {
+ reqlog.Errorf("failed to quarantine object: %v", err)
+ s.statsd_c.Increment("quarantine_object.fail")
+ return nil, status.Error(codes.Unavailable, "unable to quarantine object")
+ }
+
+ s.statsd_c.Increment("quarantine_object.ok")
+
+ out, err := proto.Marshal(&pb.QuarantineObjectReply{})
+ if err != nil {
+ reqlog.Errorf("failed to serialize reply: %v", err)
+ return nil, status.Errorf(codes.Unavailable, "unable to serialize reply: %v", err)
+ }
+ return &out, nil
+}
+
+// UnquarantineObject
+func UnquarantineObject(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.UnquarantineObjectRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{
+ "Function": "UnquarantineObject",
+ "Name": fmt.Sprintf("%s", in.Name),
+ })
+ reqlog.Debug("RPC Call")
+
+ if !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ objectKey, err := EncodeObjectKey(in.Name)
+ if err != nil {
+ reqlog.Errorf("unable to encode object key: %v", err)
+ s.statsd_c.Increment("unquarantine_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to encode object key")
+ }
+
+ value, err := s.kv.Get(quarantinePrefix, objectKey)
+ if err != nil {
+ reqlog.Errorf("unable to retrieve object: %v", err)
+ s.statsd_c.Increment("unquarantine_object.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to retrieve object")
+ }
+
+ if value == nil {
+ reqlog.Debug("object does not exist")
+ s.statsd_c.Increment("unquarantine_object.ok")
+ return nil, status.Errorf(codes.NotFound, "%s", in.Name)
+ }
+
+ // Add object key, delete quarantine key
+ wb := s.kv.NewWriteBatch()
+ defer wb.Close()
+ wb.Put(objectPrefix, objectKey, value)
+ wb.Delete(quarantinePrefix, objectKey)
+ err = wb.Commit()
+ if err != nil {
+ reqlog.Errorf("failed to unquarantine object: %v", err)
+ s.statsd_c.Increment("unquarantine_object.fail")
+ return nil, status.Error(codes.Unavailable, "unable to unquarantine object")
+ }
+
+ s.statsd_c.Increment("unquarantine_object.ok")
+
+ out, err := proto.Marshal(&pb.UnquarantineObjectReply{})
+ if err != nil {
+ reqlog.Errorf("failed to serialize reply: %v", err)
+ return nil, status.Errorf(codes.Unavailable, "unable to serialize reply: %v", err)
+ }
+ return &out, nil
+}
+
+// LoadObjectsByPrefix returns list of objects with the given prefix.
+// In practice this is used to emulate the object hash directory that swift
+// would create with the regular diskfile backend.
+func LoadObjectsByPrefix(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.LoadObjectsByPrefixRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{
+ "Function": "LoadObjectsByPrefix",
+ "Prefix": fmt.Sprintf("%s", in.Prefix),
+ })
+ reqlog.Debug("RPC Call")
+
+ if !in.RepairTool && !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ // prefix must be 32 characters for this to work (because we now encode the md5 hash, see
+ // EncodeObjectKey in encoding.go
+ if len(in.Prefix) != 32 {
+ reqlog.Error("prefix len != 32")
+ s.statsd_c.Increment("load_objects_by_prefix.fail")
+ return nil, status.Errorf(codes.Internal, "prefix len != 32")
+ }
+
+ prefix, err := EncodeObjectKey(in.Prefix)
+ if err != nil {
+ reqlog.Errorf("unable to encode object prefix: %v", err)
+ s.statsd_c.Increment("load_objects_by_prefix.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to encode object prefix")
+ }
+
+ it := s.kv.NewIterator(objectPrefix)
+ defer it.Close()
+
+ response := &pb.LoadObjectsByPrefixReply{}
+
+ // adds one byte because of prefix. Otherwise len(prefix) would be len(prefix)-1
+ for it.Seek(prefix); it.Valid() && len(prefix) <= len(it.Key()) && bytes.Equal(prefix, it.Key()[:len(prefix)]); it.Next() {
+
+ // Decode value
+ volumeIndex, offset, err := DecodeObjectValue(it.Value())
+ if err != nil {
+ reqlog.Errorf("failed to decode object value: %v", err)
+ s.statsd_c.Increment("load_objects_by_prefix.fail")
+ return nil, status.Errorf(codes.Internal, "unable to read object")
+ }
+
+ key := make([]byte, 32+len(it.Key()[16:]))
+ err = DecodeObjectKey(it.Key(), key)
+ if err != nil {
+ reqlog.Errorf("failed to decode object key: %v", err)
+ s.statsd_c.Increment("load_objects_by_prefix.fail")
+ return nil, status.Errorf(codes.Internal, "unable to decode object key")
+ }
+ response.Objects = append(response.Objects, &pb.Object{Name: key, VolumeIndex: volumeIndex, Offset: offset})
+ }
+
+ s.statsd_c.Increment("load_objects_by_prefix.ok")
+
+ return serializePb(response)
+}
+
+// LoadObjectsByVolume returns a list of all objects within a volume, with pagination.
+// Quarantined, if true, will return only quarantined objects, if false, non-quarantined objects.
+// PageToken is the object name to start from, as returned from a previous call in the
+// NextPageToken field. If empty, the iterator will start from the first objects in the volume.
+// PageSize is the maximum count of items to return. If zero, the server will pick a reasonnable limit.
+// func (s *server) LoadObjectsByVolume(in *pb.VolumeIndex, stream pb.FileMgr_LoadObjectsByVolumeServer) error {
+func LoadObjectsByVolume(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.LoadObjectsByVolumeRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{
+ "Function": "LoadObjectsByVolume",
+ "VolumeIndex": in.Index,
+ "PageToken": in.PageToken,
+ "PageSize": in.PageSize,
+ })
+ reqlog.Debug("RPC Call")
+
+ if !in.RepairTool && !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ limit := in.PageSize
+ if limit == 0 {
+ reqlog.Debug("page_size was not specified, set it to 10000")
+ limit = 10000
+ }
+
+ pageToken := make([]byte, len(in.PageToken))
+ pageToken = in.PageToken
+ if bytes.Equal(pageToken, []byte("")) {
+ pageToken = []byte(strings.Repeat("0", 32))
+ }
+
+ prefix, err := EncodeObjectKey(pageToken)
+ if err != nil {
+ reqlog.Errorf("unable to encode object prefix: %v", err)
+ s.statsd_c.Increment("load_objects_by_volume.fail")
+ return nil, status.Errorf(codes.Internal, "unable to encode object prefix")
+ }
+
+ // Return either quarantined or "regular" objects
+ var it Iterator
+ if in.Quarantined {
+ it = s.kv.NewIterator(quarantinePrefix)
+ } else {
+ it = s.kv.NewIterator(objectPrefix)
+ }
+ defer it.Close()
+
+ response := &pb.LoadObjectsByVolumeReply{}
+
+ // Objects are not indexed by volume. We have to scan the whole KV and examine each value.
+ // It shouldn't matter as this is only used for compaction, and each object will have to be copied.
+ // Disk activity dwarfs CPU usage. (for spinning rust anyway, but SSDs?)
+ count := uint32(0)
+ for it.Seek(prefix); it.Valid() && count < limit; it.Next() {
+ volumeIndex, offset, err := DecodeObjectValue(it.Value())
+ if err != nil {
+ reqlog.Errorf("failed to decode object value: %v", err)
+ s.statsd_c.Increment("load_objects_by_volume.fail")
+ return nil, status.Errorf(codes.Internal, "unable to read object")
+ }
+
+ if volumeIndex == in.Index {
+ key := make([]byte, 32+len(it.Key()[16:]))
+ err = DecodeObjectKey(it.Key(), key)
+ if err != nil {
+ reqlog.Errorf("failed to decode object key: %v", err)
+ s.statsd_c.Increment("load_objects_by_prefix.fail")
+ return nil, status.Errorf(codes.Internal, "unable to decode object key")
+ }
+ response.Objects = append(response.Objects, &pb.Object{Name: key, VolumeIndex: volumeIndex, Offset: offset})
+ count++
+ }
+ }
+
+ // Set NextPageToken if there is at least one ohash found in the same volume
+ for ; it.Valid(); it.Next() {
+ volumeIndex, _, err := DecodeObjectValue(it.Value())
+ if err != nil {
+ reqlog.Errorf("failed to decode object value: %v", err)
+ s.statsd_c.Increment("load_objects_by_volume.fail")
+ return nil, status.Errorf(codes.Internal, "unable to read object")
+ }
+
+ if volumeIndex == in.Index {
+ key := make([]byte, 32+len(it.Key()[16:]))
+ err = DecodeObjectKey(it.Key(), key)
+ if err != nil {
+ reqlog.Errorf("failed to decode object key: %v", err)
+ s.statsd_c.Increment("load_objects_by_prefix.fail")
+ return nil, status.Errorf(codes.Internal, "unable to decode object key")
+ }
+ nextPageToken := make([]byte, len(key))
+ copy(nextPageToken, key)
+ response.NextPageToken = key
+ break
+ }
+
+ }
+ s.statsd_c.Increment("load_objects_by_volume.ok")
+ return serializePb(response)
+}
+
+// ListPartitions returns a list of partitions for which we have objects.
+// This is used to emulate a listdir() of partitions below the "objects" directory.
+func ListPartitions(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.ListPartitionsRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{
+ "Function": "ListPartitions",
+ "PartitionBits": in.PartitionBits,
+ })
+ reqlog.Debug("RPC Call")
+
+ if !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ var currentPartition uint64
+ var err error
+ var ohash []byte
+
+ // Partition bits
+ pBits := int(in.PartitionBits)
+
+ response := &pb.DirEntries{}
+
+ // Seek to first object key
+ it := s.kv.NewIterator(objectPrefix)
+ defer it.Close()
+ it.SeekToFirst()
+
+ // No object in the KV.
+ if !it.Valid() {
+ s.statsd_c.Increment("list_partitions.ok")
+ return serializePb(response)
+ }
+
+ // Extract the md5 hash
+ if len(it.Key()) < 16 {
+ reqlog.WithFields(logrus.Fields{"key": it.Key()}).Error("object key < 16")
+ } else {
+ ohash = make([]byte, 32+len(it.Key()[16:]))
+ err = DecodeObjectKey(it.Key()[:16], ohash)
+ if err != nil {
+ reqlog.Errorf("failed to decode object key: %v", err)
+ s.statsd_c.Increment("load_objects_by_prefix.fail")
+ return nil, status.Errorf(codes.Internal, "unable to decode object key")
+ }
+ currentPartition, err = getPartitionFromOhash(ohash, pBits)
+ if err != nil {
+ s.statsd_c.Increment("list_partitions.fail")
+ return nil, err
+ }
+ }
+
+ response.Entry = append(response.Entry, fmt.Sprintf("%d", currentPartition))
+ if err != nil {
+ s.statsd_c.Increment("list_partitions.fail")
+ return nil, err
+ }
+
+ maxPartition, err := getLastPartition(pBits)
+
+ for currentPartition < maxPartition {
+ currentPartition++
+ firstKey, err := getEncodedObjPrefixFromPartition(currentPartition, pBits)
+ if err != nil {
+ s.statsd_c.Increment("list_partitions.fail")
+ return nil, err
+ }
+ nextFirstKey, err := getEncodedObjPrefixFromPartition(currentPartition+1, pBits)
+ if err != nil {
+ s.statsd_c.Increment("list_partitions.fail")
+ return nil, err
+ }
+
+ // key logging is now wrong, as it's not the ascii form
+ reqlog.WithFields(logrus.Fields{"currentPartition": currentPartition,
+ "maxPartition": maxPartition,
+ "firstKey": firstKey,
+ "ohash": ohash,
+ "nextFirstKey": nextFirstKey}).Debug("In loop")
+
+ it.Seek(firstKey)
+ if !it.Valid() {
+ s.statsd_c.Increment("list_partitions.ok")
+ return serializePb(response)
+ }
+
+ if len(it.Key()) < 16 {
+ reqlog.WithFields(logrus.Fields{"key": it.Key()}).Error("object key < 16")
+ } else {
+ ohash = make([]byte, 32+len(it.Key()[16:]))
+ err = DecodeObjectKey(it.Key()[:16], ohash)
+ if err != nil {
+ reqlog.Errorf("failed to decode object key: %v", err)
+ s.statsd_c.Increment("load_objects_by_prefix.fail")
+ return nil, status.Errorf(codes.Internal, "unable to decode object key")
+ }
+ // nextFirstKey is encoded, compare with encoded hash (16 first bits of the key)
+ if bytes.Compare(it.Key()[:16], nextFirstKey) > 0 {
+ // There was no key in currentPartition, find in which partition we are
+ currentPartition, err = getPartitionFromOhash(ohash, pBits)
+ if err != nil {
+ s.statsd_c.Increment("list_partitions.fail")
+ return nil, err
+ }
+ }
+ response.Entry = append(response.Entry, fmt.Sprintf("%d", currentPartition))
+ }
+ }
+
+ s.statsd_c.Increment("list_partitions.ok")
+ return serializePb(response)
+}
+
+// ListPartition returns a list of suffixes within a partition
+func ListPartition(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.ListPartitionRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{
+ "Function": "ListPartition",
+ "Partition": in.Partition,
+ "PartitionBits": in.PartitionBits,
+ })
+ reqlog.Debug("RPC Call")
+
+ if !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ // Set to hold suffixes within partition
+ suffixSet := make(map[[3]byte]bool)
+ var suffix [3]byte
+
+ // Partition bits
+ pBits := int(in.PartitionBits)
+ partition := uint64(in.Partition)
+
+ response := &pb.DirEntries{}
+
+ firstKey, err := getEncodedObjPrefixFromPartition(partition, pBits)
+ if err != nil {
+ s.statsd_c.Increment("list_partition.fail")
+ return nil, err
+ }
+
+ // Seek to first key in partition, if any
+ it := s.kv.NewIterator(objectPrefix)
+ defer it.Close()
+
+ it.Seek(firstKey)
+ // No object in the KV
+ if !it.Valid() {
+ s.statsd_c.Increment("list_partition.ok")
+ return serializePb(response)
+ }
+
+ key := make([]byte, 32+len(it.Key()[16:]))
+ err = DecodeObjectKey(it.Key(), key)
+ if err != nil {
+ reqlog.Errorf("failed to decode object key: %v", err)
+ s.statsd_c.Increment("load_objects_by_prefix.fail")
+ return nil, status.Errorf(codes.Internal, "unable to decode object key")
+ }
+ currentPartition, err := getPartitionFromOhash(key, pBits)
+ if err != nil {
+ s.statsd_c.Increment("list_partition.fail")
+ return nil, err
+ }
+
+ // Get all suffixes in the partition
+ for currentPartition == partition {
+ // Suffix is the last three bytes of the object hash
+ copy(suffix[:], key[29:32])
+ suffixSet[suffix] = true
+ it.Next()
+ if !it.Valid() {
+ break
+ }
+ key = make([]byte, 32+len(it.Key()[16:]))
+ err = DecodeObjectKey(it.Key(), key)
+ if err != nil {
+ reqlog.Errorf("failed to decode object key: %v", err)
+ s.statsd_c.Increment("load_objects_by_prefix.fail")
+ return nil, status.Errorf(codes.Internal, "unable to decode object key")
+ }
+ currentPartition, err = getPartitionFromOhash(key, pBits)
+ }
+
+ // Build the response from the hashmap
+ for suffix := range suffixSet {
+ response.Entry = append(response.Entry, fmt.Sprintf("%s", suffix))
+ }
+
+ s.statsd_c.Increment("list_partition.ok")
+ return serializePb(response)
+}
+
+// ListSuffix returns a list of object hashes below the partition and suffix
+func ListSuffix(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.ListSuffixRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{
+ "Function": "ListSuffix",
+ "Partition": in.Partition,
+ "Suffix": fmt.Sprintf("%s", in.Suffix),
+ "PartitionBits": in.PartitionBits,
+ })
+
+ if !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ execTimeSerie := fmt.Sprintf("list_suffix.runtime.%s", s.diskName)
+ defer s.statsd_c.NewTiming().Send(execTimeSerie)
+ reqlog.Debug("RPC Call")
+
+ lastOhash := make([]byte, 32)
+
+ pBits := int(in.PartitionBits)
+ partition := uint64(in.Partition)
+ suffix := in.Suffix
+
+ response := &pb.DirEntries{}
+
+ failSerie := fmt.Sprintf("list_suffix.fail.%s", s.diskName)
+ successSerie := fmt.Sprintf("list_suffix.ok.%s", s.diskName)
+ firstKey, err := getEncodedObjPrefixFromPartition(partition, pBits)
+ if err != nil {
+ s.statsd_c.Increment(failSerie)
+ return nil, err
+ }
+
+ // Seek to first key in partition, if any
+ it := s.kv.NewIterator(objectPrefix)
+ defer it.Close()
+
+ it.Seek(firstKey)
+ // No object in the KV
+ if !it.Valid() {
+ s.statsd_c.Increment(successSerie)
+ return serializePb(response)
+ }
+
+ // Allocate the slice with a capacity matching the length of the longest possible key
+ // We can then reuse it in the loop below. (avoid heap allocations, profiling showed it was an issue)
+ curKey := make([]byte, 32+len(firstKey[16:]), maxObjKeyLen)
+ err = DecodeObjectKey(firstKey, curKey)
+ if err != nil {
+ reqlog.Errorf("failed to decode object key: %v", err)
+ s.statsd_c.Increment("load_objects_by_prefix.fail")
+ return nil, status.Errorf(codes.Internal, "unable to decode object key")
+ }
+ currentPartition, err := getPartitionFromOhash(curKey, pBits)
+ if err != nil {
+ s.statsd_c.Increment(failSerie)
+ return nil, err
+ }
+
+ for currentPartition == partition {
+ // Suffix is the last three bytes of the object hash
+ // key := make([]byte, 32+len(it.Key()[16:]))
+ curKey = curKey[:32+len(it.Key()[16:])]
+ err = DecodeObjectKey(it.Key(), curKey)
+ if err != nil {
+ reqlog.Errorf("failed to decode object key: %v", err)
+ s.statsd_c.Increment("load_objects_by_prefix.fail")
+ return nil, status.Errorf(codes.Internal, "unable to decode object key")
+ }
+ if bytes.Compare(curKey[29:32], suffix) == 0 {
+ ohash := make([]byte, 32)
+ ohash = curKey[:32]
+ // Only add to the list if we have not already done so
+ if !bytes.Equal(ohash, lastOhash) {
+ response.Entry = append(response.Entry, (fmt.Sprintf("%s", ohash)))
+ copy(lastOhash, ohash)
+ }
+ }
+ it.Next()
+ if !it.Valid() {
+ break
+ }
+ curKey = curKey[:32+len(it.Key()[16:])]
+ err = DecodeObjectKey(it.Key(), curKey)
+ if err != nil {
+ reqlog.Errorf("failed to decode object key: %v", err)
+ s.statsd_c.Increment("load_objects_by_prefix.fail")
+ return nil, status.Errorf(codes.Internal, "unable to decode object key")
+ }
+ currentPartition, err = getPartitionFromOhash(curKey, pBits)
+ }
+
+ s.statsd_c.Increment(successSerie)
+ return serializePb(response)
+}
+
+// Returns a list of quarantiened object hashes, with pagination.
+// PageToken is the ohash to start from, as returned from a previous call in the
+// NextPageToken field. If empty, the iterator will start from the first quarantined
+// object hash. PageSize is the maximum count of items to return. If zero,
+// the server will pick a reasonnable limit.
+func ListQuarantinedOHashes(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.ListQuarantinedOHashesRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{
+ "Function": "ListQuarantinedOhashes",
+ "PageToken": fmt.Sprintf("%s", in.PageToken),
+ "PageSize": fmt.Sprintf("%d", in.PageSize),
+ })
+ reqlog.Debug("RPC Call")
+
+ if !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ limit := in.PageSize
+ if limit == 0 {
+ reqlog.Debug("page_size was not specified, set it to 10000")
+ limit = 10000
+ }
+
+ pageToken := make([]byte, 32)
+ pageToken = in.PageToken
+ if bytes.Equal(pageToken, []byte("")) {
+ pageToken = []byte(strings.Repeat("0", 32))
+ }
+ if len(pageToken) != 32 {
+ reqlog.Error("prefix len != 32")
+ s.statsd_c.Increment("list_quarantined_ohashes.fail")
+ return nil, status.Errorf(codes.InvalidArgument, "page token length != 32")
+ }
+
+ prefix, err := EncodeObjectKey(pageToken)
+ if err != nil {
+ reqlog.Errorf("unable to encode object prefix: %v", err)
+ s.statsd_c.Increment("list_quarantined_ohashes.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to encode object prefix")
+ }
+
+ it := s.kv.NewIterator(quarantinePrefix)
+ defer it.Close()
+
+ response := &pb.ListQuarantinedOHashesReply{}
+ curKey := make([]byte, maxObjKeyLen)
+ lastOhash := make([]byte, 32)
+
+ count := uint32(0)
+ for it.Seek(prefix); it.Valid() && count < limit; it.Next() {
+ curKey = curKey[:32+len(it.Key()[16:])]
+ err = DecodeObjectKey(it.Key(), curKey)
+ if err != nil {
+ reqlog.Errorf("failed to decode quarantined object key: %v", err)
+ s.statsd_c.Increment("list_quarantined_ohashes.fail")
+ return nil, status.Errorf(codes.Internal, "unable decode quarantined object key")
+ }
+ if !bytes.Equal(curKey[:32], lastOhash) {
+ ohash := make([]byte, 32)
+ copy(ohash, curKey[:32])
+ response.Objects = append(response.Objects, &pb.QuarantinedObjectName{Name: ohash})
+ copy(lastOhash, curKey[:32])
+ count++
+ }
+ }
+
+ // Set NextPageToken if there is at least one ohash beyond what we have returned
+ for ; it.Valid(); it.Next() {
+ curKey = curKey[:32+len(it.Key()[16:])]
+ err = DecodeObjectKey(it.Key(), curKey)
+ if err != nil {
+ reqlog.Errorf("failed to decode quarantined object key: %v", err)
+ s.statsd_c.Increment("list_quarantined_ohashes.fail")
+ return nil, status.Errorf(codes.Internal, "unable decode quarantined object key")
+ }
+ if !bytes.Equal(curKey[:32], lastOhash) {
+ nextPageToken := make([]byte, 32)
+ copy(nextPageToken, curKey[:32])
+ response.NextPageToken = nextPageToken
+ break
+ }
+ }
+
+ s.statsd_c.Increment("list_quarantined_ohashes.ok")
+ return serializePb(response)
+}
+
+func ListQuarantinedOHash(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.ListQuarantinedOHashRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{
+ "Function": "ListQuarantineOHash",
+ "Prefix": fmt.Sprintf("%s", in.Prefix),
+ })
+ reqlog.Debug("RPC Call")
+
+ if !in.RepairTool && !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ if len(in.Prefix) != 32 {
+ reqlog.Error("prefix len != 32")
+ s.statsd_c.Increment("list_quarantined_ohash.fail")
+ return nil, status.Errorf(codes.Internal, "prefix len != 32")
+ }
+
+ prefix, err := EncodeObjectKey(in.Prefix)
+ if err != nil {
+ reqlog.Errorf("unable to encode object prefix: %v", err)
+ s.statsd_c.Increment("list_quarantined_ohash.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to encode object prefix")
+ }
+
+ it := s.kv.NewIterator(quarantinePrefix)
+ defer it.Close()
+
+ response := &pb.ListQuarantinedOHashReply{}
+
+ // adds one byte because of prefix. Otherwise len(prefix) would be len(prefix)-1
+ for it.Seek(prefix); it.Valid() && len(prefix) <= len(it.Key()) && bytes.Equal(prefix, it.Key()[:len(prefix)]); it.Next() {
+
+ // Decode value
+ volumeIndex, offset, err := DecodeObjectValue(it.Value())
+ if err != nil {
+ reqlog.Errorf("failed to decode object value: %v", err)
+ s.statsd_c.Increment("list_quarantined_ohash.fail")
+ return nil, status.Errorf(codes.Internal, "unable to read object")
+ }
+
+ key := make([]byte, 32+len(it.Key()[16:]))
+ err = DecodeObjectKey(it.Key(), key)
+ if err != nil {
+ reqlog.Errorf("failed to decode object key: %v", err)
+ s.statsd_c.Increment("list_quarantined_ohash.fail")
+ return nil, status.Errorf(codes.Internal, "unable to decode object key")
+ }
+ response.Objects = append(response.Objects, &pb.Object{Name: key, VolumeIndex: volumeIndex, Offset: offset})
+ }
+
+ s.statsd_c.Increment("list_quarantined_ohash.ok")
+ return serializePb(response)
+}
+
+func GetNextOffset(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.GetNextOffsetRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{"Function": "GetNextOffset", "VolumeIndex": in.VolumeIndex})
+ reqlog.Debug("RPC Call")
+
+ if !in.RepairTool && !s.isClean {
+ reqlog.Debug("KV out of sync with volumes")
+ return nil, status.Errorf(codes.FailedPrecondition, "KV out of sync with volumes")
+ }
+
+ key := EncodeVolumeKey(in.VolumeIndex)
+
+ value, err := s.kv.Get(volumePrefix, key)
+ if err != nil {
+ reqlog.Errorf("unable to retrieve volume key: %v", err)
+ s.statsd_c.Increment("get_next_offset.fail")
+ return nil, status.Errorf(codes.Unavailable, "unable to retrieve volume key")
+ }
+
+ if value == nil {
+ reqlog.Info("volume index does not exist in db")
+ s.statsd_c.Increment("get_next_offset.fail")
+ return nil, status.Errorf(codes.FailedPrecondition, "volume index does not exist in db")
+ }
+
+ _, _, nextOffset, _, _, err := DecodeVolumeValue(value)
+ if err != nil {
+ reqlog.WithFields(logrus.Fields{"value": value}).Errorf("failed to decode volume value: %v", err)
+ s.statsd_c.Increment("get_next_offset.fail")
+ return nil, status.Errorf(codes.Internal, "failed to decode volume value")
+ }
+
+ s.statsd_c.Increment("get_next_offset.ok")
+ return serializePb(&pb.GetNextOffsetReply{Offset: uint64(nextOffset)})
+}
+
+// GetStats returns stats for the KV. used for initial debugging, remove?
+func GetStats(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.GetStatsRequest{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ response := new(pb.GetStatsReply)
+
+ m := CollectStats(s)
+ response.Stats = m
+
+ return serializePb(response)
+}
+
+// Sets KV state (is in sync with volumes, or not)
+func SetKvState(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.KvState{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{"Function": "SetClean", "IsClean": in.IsClean})
+ reqlog.Debug("RPC Call")
+
+ s.isClean = in.IsClean
+ return serializePb(&pb.SetKvStateReply{})
+}
+
+// Gets KV state (is in sync with volumes, or not)
+func GetKvState(s *server, ctx context.Context, pbIn *[]byte) (*[]byte, error) {
+ in := &pb.KvState{}
+ if err := proto.Unmarshal(*pbIn, in); err != nil {
+ logrus.Errorf("failed to unmarshal input: %v", err)
+ return nil, status.Errorf(codes.InvalidArgument, "unable to deserialize protobuf")
+ }
+
+ reqlog := log.WithFields(logrus.Fields{"Function": "GetKvState"})
+ reqlog.Debug("RPC Call")
+ state := new(pb.KvState)
+ state.IsClean = s.isClean
+ return serializePb(state)
+}
+
+// Stops serving RPC requests and closes KV if we receive SIGTERM/SIGINT
+func shutdownHandler(s *server, wg *sync.WaitGroup) {
+ <-s.stopChan
+ rlog := log.WithFields(logrus.Fields{"socket": s.socketPath})
+ rlog.Info("Shutting down")
+
+ // Stop serving RPC requests
+ // Give it a 5s delay to finish serving active requests, then force close
+ rlog.Debug("Stopping RPC")
+
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ if err := s.httpServer.Shutdown(ctx); err != nil {
+ // Error or timeout
+ rlog.Infof("HTTP server Shutdown: %v", err)
+ if err = s.httpServer.Close(); err != nil {
+ rlog.Infof("HTTP server Close: %v", err)
+ }
+ }
+
+ // Mark DB as clean
+ if s.isClean == true {
+ rlog.Debug("Mark DB as closed")
+ err := MarkDbClosed(s.kv)
+ if err != nil {
+ rlog.Warn("Failed to mark db as clean when shutting down")
+ }
+ } else {
+ rlog.Warn("State is not clean, not marking DB as closed (still out of sync with volumes)")
+ }
+
+ // Close KV
+ rlog.Debug("Closing KV")
+ s.kv.Close()
+ wg.Done()
+}
+
+func runServer(kv KV, diskPath string, socketPath string, stopChan chan os.Signal, isClean bool) (err error) {
+ var wg sync.WaitGroup
+
+ if err != nil {
+ return
+ }
+ os.Chmod(socketPath, 0660)
+
+ _, diskName := path.Split(path.Clean(diskPath))
+ fs := &server{kv: kv, diskPath: diskPath, diskName: diskName, socketPath: socketPath,
+ isClean: isClean, stopChan: stopChan}
+
+ go func() {
+ unixListener, err := net.Listen("unix", fs.socketPath)
+ if err != nil {
+ log.Printf("Cannot serve")
+ }
+ server := http.Server{Handler: fs}
+ fs.httpServer = &server
+ server.Serve(unixListener)
+ }()
+
+ // Initialize statsd client
+ statsdPrefix := "kv"
+ fs.statsd_c, err = statsd.New(statsd.Prefix(statsdPrefix))
+ if err != nil {
+ return
+ }
+
+ // Start shutdown handler
+ wg.Add(1)
+ go shutdownHandler(fs, &wg)
+ wg.Wait()
+
+ return
+}
+
// strToFunc maps RPC URL paths to their handler functions.
// ServeHTTP uses this table to dispatch incoming requests.
var strToFunc = map[string]rpcFunc{
	"/register_volume":          RegisterVolume,
	"/unregister_volume":        UnregisterVolume,
	"/update_volume_state":      UpdateVolumeState,
	"/get_volume":               GetVolume,
	"/list_volumes":             ListVolumes,
	"/register_object":          RegisterObject,
	"/unregister_object":        UnregisterObject,
	"/rename_object":            RenameObject,
	"/load_object":              LoadObject,
	"/quarantine_object":        QuarantineObject,
	"/unquarantine_object":      UnquarantineObject,
	"/load_objects_by_prefix":   LoadObjectsByPrefix,
	"/load_objects_by_volume":   LoadObjectsByVolume,
	"/list_partitions":          ListPartitions,
	"/list_partition":           ListPartition,
	"/list_suffix":              ListSuffix,
	"/list_quarantined_ohashes": ListQuarantinedOHashes,
	"/list_quarantined_ohash":   ListQuarantinedOHash,
	"/get_next_offset":          GetNextOffset,
	"/get_stats":                GetStats,
	"/set_kv_state":             SetKvState,
	"/get_kv_state":             GetKvState,
}
+
+func serializePb(msg proto.Message) (*[]byte, error) {
+ out, err := proto.Marshal(msg)
+ if err != nil {
+ log.Errorf("failed to serialize reply: %v", err)
+ return nil, status.Errorf(codes.Unavailable, "unable to serialize reply: %v", err)
+ }
+ return &out, nil
+}
+
+func sendError(w http.ResponseWriter, rpcErr error) (err error) {
+ w.Header().Set("Content-Type", "Content-Type: text/plain; charset=utf-8")
+ w.WriteHeader(int(rpcErr.(*status.RpcError).Code()))
+ errorMsg := []byte(rpcErr.Error())
+ _, err = w.Write(errorMsg)
+ return
+}
+
+func sendReply(w http.ResponseWriter, serializedPb []byte) (err error) {
+ w.Header().Set("Content-Type", "application/octet-stream")
+ w.WriteHeader(200)
+ _, err = w.Write(serializedPb)
+ return
+}
+
+func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ var err error
+ log.Debugf(r.URL.Path)
+
+ // Match URL to RPC function
+ fn, ok := strToFunc[r.URL.Path]
+ if !ok {
+ log.Printf("No match for URL Path %s", r.URL.Path)
+ if err = sendError(w, status.Errorf(codes.Unimplemented, "Unimplemented RPC function")); err != nil {
+ log.Printf("Error sending reply: %v", err)
+ }
+ return
+ }
+
+ // Read request (body should be serialized protobuf)
+ body, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ log.Printf("Error reading body: %v", err)
+ if err = sendError(w, status.Errorf(codes.Internal, "Failed to read request body")); err != nil {
+ log.Printf("Error sending reply: %v", err)
+ }
+ return
+ }
+
+ // Call RPC function and send reply
+ resp, err := fn(s, r.Context(), &body)
+ if err != nil {
+ log.Println(err)
+ if err = sendError(w, err); err != nil {
+ log.Printf("Error sending reply: %v", err)
+ }
+ return
+ }
+
+ if err = sendReply(w, *resp); err != nil {
+ log.Printf("Error sending reply: %v", err)
+ }
+}
diff --git a/go/swift-rpc-losf/rpc_test.go b/go/swift-rpc-losf/rpc_test.go
new file mode 100644
index 000000000..0da9b5e6a
--- /dev/null
+++ b/go/swift-rpc-losf/rpc_test.go
@@ -0,0 +1,1014 @@
+// Copyright (c) 2010-2012 OpenStack Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "github.com/alecuyer/statsd/v2"
+ "github.com/golang/protobuf/proto"
+ "github.com/openstack/swift-rpc-losf/codes"
+ pb "github.com/openstack/swift-rpc-losf/proto"
+ "github.com/sirupsen/logrus"
+ "io/ioutil"
+ "net"
+ "net/http"
+ "os"
+ "path"
+ "strings"
+ "testing"
+)
+
+// runTestServer builds a *server around the given KV and serves it over a
+// unix socket in a background goroutine. It signals on `listening` once the
+// socket is bound (after net.Listen, before Serve), so callers can wait
+// before issuing requests. Returns an error only if statsd setup fails;
+// listen/serve failures inside the goroutine are fatal via log.Fatalf.
+func runTestServer(kv KV, diskPath string, socketPath string, listening chan bool) (err error) {
+	// diskName is the last element of the cleaned disk path.
+	_, diskName := path.Split(path.Clean(diskPath))
+	fs := &server{kv: kv, diskPath: diskPath, diskName: diskName, socketPath: socketPath, isClean: true}
+
+	statsdPrefix := "kv"
+	fs.statsd_c, err = statsd.New(statsd.Prefix(statsdPrefix))
+	if err != nil {
+		return
+	}
+
+	go func() {
+		// Remove any stale socket from a previous run before binding.
+		os.Remove(fs.socketPath)
+		unixListener, err := net.Listen("unix", fs.socketPath)
+		if err != nil {
+			log.Fatalf("Cannot serve: %v", err)
+		}
+		// The socket exists at this point; let the caller proceed.
+		listening <- true
+		server := http.Server{Handler: fs}
+		fs.httpServer = &server
+		log.Debug("Start serving")
+		server.Serve(unixListener)
+
+	}()
+	return
+}
+
+// teardown deletes the test directory tree. As a safety guard against ever
+// removing an arbitrary path, it only acts on directories under /tmp/.
+func teardown(tempdir string) {
+	if !strings.HasPrefix(tempdir, "/tmp/") {
+		return
+	}
+	os.RemoveAll(tempdir)
+}
+
+// client is the HTTP client shared by all tests to reach the RPC server over
+// its unix socket; its transport is configured in TestMain.
+var client http.Client
+
+// check_200 folds a transport error and a non-200 HTTP status into a single
+// error value: the transport error wins if set, otherwise a non-200 status
+// yields a descriptive error, and a plain 200 with no error yields nil.
+func check_200(response *http.Response, err error) error {
+	switch {
+	case err != nil:
+		return err
+	case response.StatusCode != 200:
+		return fmt.Errorf("HTTP status code is not 200: %v", response.StatusCode)
+	default:
+		return nil
+	}
+}
+
+// populateKV registers a fixed set of volumes and objects through the RPC
+// server so tests start from a known KV state. It returns the first error
+// encountered, or nil.
+func populateKV() (err error) {
+	volumes := []pb.RegisterVolumeRequest{
+		{Partition: 9, Type: 0, VolumeIndex: 20, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 10, Type: 0, VolumeIndex: 35, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 40, Type: 0, VolumeIndex: 24, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 63, Type: 0, VolumeIndex: 27, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 65, Type: 0, VolumeIndex: 33, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 71, Type: 0, VolumeIndex: 19, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 111, Type: 0, VolumeIndex: 47, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 127, Type: 0, VolumeIndex: 43, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 139, Type: 0, VolumeIndex: 50, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 171, Type: 0, VolumeIndex: 49, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 195, Type: 0, VolumeIndex: 12, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 211, Type: 0, VolumeIndex: 16, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 213, Type: 0, VolumeIndex: 14, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 243, Type: 0, VolumeIndex: 17, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 271, Type: 0, VolumeIndex: 8, Offset: 24576, State: 0, RepairTool: false},
+		{Partition: 295, Type: 0, VolumeIndex: 28, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 327, Type: 0, VolumeIndex: 48, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 360, Type: 0, VolumeIndex: 15, Offset: 12288, State: 0, RepairTool: false},
+		{Partition: 379, Type: 0, VolumeIndex: 25, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 417, Type: 0, VolumeIndex: 22, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 420, Type: 0, VolumeIndex: 32, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 421, Type: 0, VolumeIndex: 46, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 428, Type: 0, VolumeIndex: 21, Offset: 12288, State: 0, RepairTool: false},
+		{Partition: 439, Type: 0, VolumeIndex: 38, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 453, Type: 0, VolumeIndex: 44, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 466, Type: 0, VolumeIndex: 40, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 500, Type: 0, VolumeIndex: 39, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 513, Type: 0, VolumeIndex: 26, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 530, Type: 0, VolumeIndex: 4, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 530, Type: 1, VolumeIndex: 5, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 535, Type: 0, VolumeIndex: 1, Offset: 20480, State: 0, RepairTool: false},
+		{Partition: 535, Type: 0, VolumeIndex: 2, Offset: 4096, State: 0, RepairTool: false},
+		{Partition: 535, Type: 1, VolumeIndex: 3, Offset: 12288, State: 0, RepairTool: false},
+		{Partition: 559, Type: 0, VolumeIndex: 30, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 602, Type: 0, VolumeIndex: 41, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 604, Type: 0, VolumeIndex: 29, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 673, Type: 0, VolumeIndex: 11, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 675, Type: 0, VolumeIndex: 42, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 710, Type: 0, VolumeIndex: 37, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 765, Type: 0, VolumeIndex: 36, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 766, Type: 0, VolumeIndex: 45, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 786, Type: 0, VolumeIndex: 23, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 809, Type: 0, VolumeIndex: 31, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 810, Type: 0, VolumeIndex: 13, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 855, Type: 0, VolumeIndex: 18, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 974, Type: 0, VolumeIndex: 9, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 977, Type: 0, VolumeIndex: 6, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 977, Type: 1, VolumeIndex: 7, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 1009, Type: 0, VolumeIndex: 34, Offset: 8192, State: 0, RepairTool: false},
+		{Partition: 1019, Type: 0, VolumeIndex: 10, Offset: 8192, State: 0, RepairTool: false},
+	}
+
+	objects := []pb.RegisterObjectRequest{
+		{Name: []byte("85fd12f8961e33cbf7229a94118524fa1515589781.45671.ts"), VolumeIndex: 3, Offset: 8192, NextOffset: 12288, RepairTool: false},
+		{Name: []byte("84afc1659c7e8271951fe370d6eee0f81515590332.51834.ts"), VolumeIndex: 5, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("f45bf9000f39092b9de5a74256e3eebe1515590648.06511.ts"), VolumeIndex: 7, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("43c8adc53dbb40d27add4f614fc49e5e1515595691.35618#0#d.data"), VolumeIndex: 8, Offset: 20480, NextOffset: 24576, RepairTool: false},
+		{Name: []byte("f3804523d91d294dab1500145b43395b1515596136.42189#4#d.data"), VolumeIndex: 9, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("fefe1ba1120cd6cd501927401d6b2ecc1515750800.13517#2#d.data"), VolumeIndex: 10, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("a8766d2608b77dc6cb0bfe3fe6782c731515750800.18975#0#d.data"), VolumeIndex: 11, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("30f12368ca25d11fb1a80d10e64b15431515750800.19224#4#d.data"), VolumeIndex: 12, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("ca9576ada218f74cb8f11648ecec439c1515750800.21553#2#d.data"), VolumeIndex: 13, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("3549df7ef11006af6852587bf16d82971515750800.22096#2#d.data"), VolumeIndex: 14, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("5a0a70e36a057a9982d1dc9188069b511515750803.50544#0#d.data"), VolumeIndex: 15, Offset: 8192, NextOffset: 12288, RepairTool: false},
+		{Name: []byte("5a1801fea97614f8c5f58511905773d01515750800.40035#0#d.data"), VolumeIndex: 15, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("34c46ce96897a24374d126d7d7eab2fb1515750800.42545#0#d.data"), VolumeIndex: 16, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("3cf60143ea488c84da9e1603158203a11515750800.93160#0#d.data"), VolumeIndex: 17, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("d5c64e9cb0b093441fb6b500141aa0531515750800.94069#2#d.data"), VolumeIndex: 18, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("11f5db768b6f9a37cf894af99b15c0d11515750801.05135#4#d.data"), VolumeIndex: 19, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("02573d31b770cda8e0effd7762e8a0751515750801.09785#2#d.data"), VolumeIndex: 20, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("6b08eabf5667557c72dc6570aa1fb8451515750801.08639#4#d.data"), VolumeIndex: 21, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("6b08eabf5667557c72dc6570aa1fb8451515750856.77219.meta"), VolumeIndex: 21, Offset: 8192, NextOffset: 12288, RepairTool: false},
+		{Name: []byte("6b08eabf5667557c72dc6570abcfb8451515643210.72429#4#d.data"), VolumeIndex: 22, Offset: 8192, NextOffset: 12288, RepairTool: false},
+		{Name: []byte("687ba0410f4323c66397a85292077b101515750801.10244#0#d.data"), VolumeIndex: 22, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("c4aaea9b28c425f45eb64d4d5b0b3f621515750801.19478#2#d.data"), VolumeIndex: 23, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("0a0898eb861579d1240adbb1c9f0c92b1515750801.20636#2#d.data"), VolumeIndex: 24, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("5efd43142db5913180ba865ef529eccd1515750801.64704#4#d.data"), VolumeIndex: 25, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("806a35f1e974f93161b2da51760f22701515750801.68309#2#d.data"), VolumeIndex: 26, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("0fdceb7af49cdd0cb1262acbdc88ae881515750801.93565#0#d.data"), VolumeIndex: 27, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("49d4fa294d2c97f08596148bf4615bfa1515750801.93739#4#d.data"), VolumeIndex: 28, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("971b4d05733f475d447d7f8b050bb0071515750802.09721#2#d.data"), VolumeIndex: 29, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("8bc66b3ae033db15ceb3729d89a07ece1515750802.51062#0#d.data"), VolumeIndex: 30, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("ca53beae1aeb4deacd17409e32305a2c1515750802.63996#2#d.data"), VolumeIndex: 31, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("69375433763d9d511114e8ac869c916c1515750802.63846#0#d.data"), VolumeIndex: 32, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("105de5f388ab4b72e56bc93f36ad388a1515750802.73393#2#d.data"), VolumeIndex: 33, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("105de5f388ab4b72e56bc93f36ad388a1515873948.27383#2#d.meta"), VolumeIndex: 33, Offset: 8192, NextOffset: 12288, RepairTool: false},
+		{Name: []byte("fc6916fd1e6a0267afac88c395b876ac1515750802.83459#2#d.data"), VolumeIndex: 34, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("02b10d6bfb205fe0f34f9bd82336dc711515750802.93662#2#d.data"), VolumeIndex: 35, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("bf43763a98208f15da803e76bf52e7d11515750803.01357#0#d.data"), VolumeIndex: 36, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("b1abadfed91b1cb4392dd2ec29e171ac1515750803.07767#4#d.data"), VolumeIndex: 37, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("6de30d74634d088f1f5923336af2b3ae1515750803.36199#4#d.data"), VolumeIndex: 38, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("7d234bbd1137d509105245ac78427b9f1515750803.49022#4#d.data"), VolumeIndex: 39, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("749057975c1bac830360530bdcd741591515750803.49647#0#d.data"), VolumeIndex: 40, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("9692991e77c9742cbc24469391d499981515750803.56295#0#d.data"), VolumeIndex: 41, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("a8dbd473e360787caff0b97aca33373f1515750803.68428#2#d.data"), VolumeIndex: 42, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("1ff88cb2b6b64f1fd3b6097f20203ee01515750803.73746#4#d.data"), VolumeIndex: 43, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("71572f46094d7ac440f5e2a3c72da17b1515750803.75628#2#d.data"), VolumeIndex: 44, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("bf8e83d954478d66ac1dba7eaa832c721515750803.81141#4#d.data"), VolumeIndex: 45, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("69724f682fe12b4a4306bceeb75825431515750804.10112#2#d.data"), VolumeIndex: 46, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("1bf38645ccc5f158c96480f1e0861a141515750804.31472#0#d.data"), VolumeIndex: 47, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("51fecf0e0bb30920fd0d83ee8fba29f71515750804.32492#2#d.data"), VolumeIndex: 48, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("2acbf85061e46b3bb3adb8930cb7414d1515750804.46622#2#d.data"), VolumeIndex: 49, Offset: 4096, NextOffset: 8192, RepairTool: false},
+		{Name: []byte("22e4a97f1d4f2b6d4150bb9b481e4c971515750804.51987#0#d.data"), VolumeIndex: 50, Offset: 4096, NextOffset: 8192, RepairTool: false},
+	}
+
+	// Register volumes
+	for _, df := range volumes {
+		out, err := proto.Marshal(&df)
+		if err != nil {
+			log.Error("failed to marshal")
+			return err
+		}
+		body := bytes.NewReader(out)
+		resp, err := client.Post("http://unix/register_volume", "application/octet-stream", body)
+		if err = check_200(resp, err); err != nil {
+			// On a non-200 status the response still carries a body.
+			if resp != nil {
+				resp.Body.Close()
+			}
+			return err
+		}
+		// Close right away: a defer here would pile up one open body per
+		// iteration until populateKV returns.
+		resp.Body.Close()
+	}
+
+	// Register objects
+	for _, obj := range objects {
+		out, err := proto.Marshal(&obj)
+		if err != nil {
+			log.Error("failed to marshal")
+			return err
+		}
+		body := bytes.NewReader(out)
+		resp, err := client.Post("http://unix/register_object", "application/octet-stream", body)
+		if err = check_200(resp, err); err != nil {
+			if resp != nil {
+				resp.Body.Close()
+			}
+			return err
+		}
+		// Same as above: close per iteration, not deferred.
+		resp.Body.Close()
+	}
+	return
+}
+
+// TestMain sets up a temporary disk directory and leveldb, starts the RPC
+// server on a unix socket, populates the KV with fixtures, runs the tests,
+// then tears the directory down.
+func TestMain(m *testing.M) {
+	log.SetLevel(logrus.InfoLevel)
+	diskPath, err := ioutil.TempDir("/tmp", "losf-test")
+	if err != nil {
+		log.Fatal(err)
+	}
+	rootDir := path.Join(diskPath, "losf")
+	dbDir := path.Join(rootDir, "db")
+
+	err = os.MkdirAll(rootDir, 0700)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	kv, err := openLevigoDB(dbDir)
+	if err != nil {
+		log.Fatal("failed to create leveldb")
+	}
+	socketPath := "/tmp/rpc.socket"
+	listening := make(chan bool, 1)
+	go runTestServer(kv, diskPath, socketPath, listening)
+	// Wait for the socket
+	<-listening
+
+	// Dial the same socket the server listens on (single source of truth,
+	// instead of repeating the path literal here).
+	client = http.Client{Transport: &http.Transport{DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
+		return net.Dial("unix", socketPath)
+	},
+	},
+	}
+
+	err = populateKV()
+	if err != nil {
+		log.Error(err)
+		log.Fatal("failed to populate test KV")
+	}
+
+	ret := m.Run()
+
+	teardown(diskPath)
+
+	os.Exit(ret)
+}
+
+// TODO, add more tests:
+// - prefix with no objects
+// - single object
+// - first and last elements of the KV
+// TestLoadObjectsByPrefix checks that all objects sharing an object hash
+// prefix are returned, in order, by load_objects_by_prefix.
+func TestLoadObjectsByPrefix(t *testing.T) {
+	prefix := &pb.LoadObjectsByPrefixRequest{Prefix: []byte("105de5f388ab4b72e56bc93f36ad388a")}
+
+	out, err := proto.Marshal(prefix)
+	if err != nil {
+		t.Error("failed to marshal")
+	}
+	body := bytes.NewReader(out)
+
+	expectedObjects := []pb.Object{
+		{Name: []byte("105de5f388ab4b72e56bc93f36ad388a1515750802.73393#2#d.data"), VolumeIndex: 33, Offset: 4096},
+		{Name: []byte("105de5f388ab4b72e56bc93f36ad388a1515873948.27383#2#d.meta"), VolumeIndex: 33, Offset: 8192},
+	}
+
+	response, err := client.Post("http://unix/load_objects_by_prefix", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("RPC call failed: %v", err)
+	}
+	defer response.Body.Close()
+
+	r := &pb.LoadObjectsByPrefixReply{}
+	buf := new(bytes.Buffer)
+	buf.ReadFrom(response.Body)
+	if err = proto.Unmarshal(buf.Bytes(), r); err != nil {
+		t.Error("failed to unmarshal")
+	}
+
+	// Compare counts first: without this, a missing object would go
+	// unnoticed and an extra one would index out of range below.
+	if len(r.Objects) != len(expectedObjects) {
+		t.Fatalf("wrong object count\ngot: %v\nwant: %v", r.Objects, expectedObjects)
+	}
+
+	for i, obj := range r.Objects {
+		expected := expectedObjects[i]
+		if !bytes.Equal(obj.Name, expected.Name) {
+			t.Errorf("\ngot : %s\nexpected: %s", string(obj.Name), string(expected.Name))
+		}
+	}
+}
+
+// TestListPartitions verifies that list_partitions returns exactly the
+// partitions registered by populateKV, in order.
+func TestListPartitions(t *testing.T) {
+	partPower := uint32(10)
+
+	req := &pb.ListPartitionsRequest{PartitionBits: partPower}
+	serialized, err := proto.Marshal(req)
+	if err != nil {
+		t.Error("failed to marshal")
+	}
+
+	expectedPartitions := []string{"9", "10", "40", "63", "65", "71", "111", "127", "139", "171", "195", "211", "213", "243", "271", "295", "327", "360", "379", "417", "420", "421", "428", "439", "453", "466", "500", "513", "530", "535", "559", "602", "604", "673", "675", "710", "765", "766", "786", "809", "810", "855", "974", "977", "1009", "1019"}
+
+	response, err := client.Post("http://unix/list_partitions", "application/octet-stream", bytes.NewReader(serialized))
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("RPC call failed: %v", err)
+	}
+	defer response.Body.Close()
+
+	reply := &pb.DirEntries{}
+	respBuf := new(bytes.Buffer)
+	respBuf.ReadFrom(response.Body)
+	if err = proto.Unmarshal(respBuf.Bytes(), reply); err != nil {
+		t.Error("failed to unmarshal")
+	}
+
+	if len(reply.Entry) != len(expectedPartitions) {
+		t.Fatalf("\ngot: %v\nwant: %v", reply.Entry, expectedPartitions)
+	}
+
+	for i, entry := range reply.Entry {
+		if entry != expectedPartitions[i] {
+			t.Fatalf("checking individual elements\ngot: %v\nwant: %v", reply.Entry, expectedPartitions)
+		}
+	}
+}
+
+// TODO: add more tests, have a suffix with multiple entries
+// TestListSuffix verifies that list_suffix returns the object hashes of a
+// given partition/suffix pair, in order.
+func TestListSuffix(t *testing.T) {
+	req := &pb.ListSuffixRequest{Partition: uint32(428), Suffix: []byte("845"), PartitionBits: uint32(10)}
+	serialized, err := proto.Marshal(req)
+	if err != nil {
+		t.Error(err)
+	}
+
+	expectedHashes := []string{"6b08eabf5667557c72dc6570aa1fb845", "6b08eabf5667557c72dc6570abcfb845"}
+
+	response, err := client.Post("http://unix/list_suffix", "application/octet-stream", bytes.NewReader(serialized))
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("RPC call failed: %v", err)
+	}
+	defer response.Body.Close()
+
+	reply := &pb.DirEntries{}
+	respBuf := new(bytes.Buffer)
+	respBuf.ReadFrom(response.Body)
+	if err = proto.Unmarshal(respBuf.Bytes(), reply); err != nil {
+		t.Error(err)
+	}
+
+	if len(reply.Entry) != len(expectedHashes) {
+		t.Fatalf("\ngot: %v\nwant: %v", reply.Entry, expectedHashes)
+	}
+
+	for i, entry := range reply.Entry {
+		if entry != expectedHashes[i] {
+			t.Fatalf("checking individual elements\ngot: %v\nwant: %v", reply.Entry, expectedHashes)
+		}
+	}
+}
+
+// TestState flips the KV clean/dirty flag through set_kv_state and verifies
+// each transition through get_kv_state.
+func TestState(t *testing.T) {
+	// Mark dirty and check
+	kvstate := &pb.KvState{}
+	out, err := proto.Marshal(kvstate)
+	if err != nil {
+		t.Error(err)
+	}
+	body := bytes.NewReader(out)
+
+	response, err := client.Post("http://unix/set_kv_state", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("Failed to change KV state: %v", err)
+	}
+	response.Body.Close()
+
+	empty := &pb.GetKvStateRequest{}
+	emptySerialized, err := proto.Marshal(empty)
+	if err != nil {
+		t.Error(err)
+	}
+	body = bytes.NewReader(emptySerialized)
+
+	response, err = client.Post("http://unix/get_kv_state", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("RPC call failed: %v", err)
+	}
+	r := &pb.KvState{}
+	buf := new(bytes.Buffer)
+	buf.ReadFrom(response.Body)
+	// Close this response as well; previously only the set_kv_state
+	// responses and the final get were closed, leaking this body.
+	response.Body.Close()
+	if err = proto.Unmarshal(buf.Bytes(), r); err != nil {
+		t.Error(err)
+	}
+
+	if r.IsClean != false {
+		t.Fatal("isClean true, should be false")
+	}
+
+	// Mark clean and check
+	kvstate = &pb.KvState{IsClean: true}
+	out, err = proto.Marshal(kvstate)
+	if err != nil {
+		t.Error(err)
+	}
+	body = bytes.NewReader(out)
+
+	response, err = client.Post("http://unix/set_kv_state", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("Failed to change KV state: %v", err)
+	}
+	response.Body.Close()
+
+	body = bytes.NewReader(emptySerialized)
+	response, err = client.Post("http://unix/get_kv_state", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("RPC call failed: %v", err)
+	}
+	defer response.Body.Close()
+	buf.Reset()
+	buf.ReadFrom(response.Body)
+
+	if err = proto.Unmarshal(buf.Bytes(), r); err != nil {
+		t.Error(err)
+	}
+
+	if r.IsClean != true {
+		t.Fatal("isClean false, should be true")
+	}
+
+}
+
+// TestRegisterObject exercises the register/load/unregister cycle: a new
+// object registers and loads back, a duplicate registration returns
+// AlreadyExists, and a double unregister returns NotFound.
+func TestRegisterObject(t *testing.T) {
+	// Register new non-existing object
+	name := []byte("33dea50d391ee52a8ead7cb562a9b4e2/1539791765.84449#5#d.data")
+	obj := &pb.RegisterObjectRequest{Name: name, VolumeIndex: 1, Offset: 4096, NextOffset: 8192, RepairTool: false}
+	out, err := proto.Marshal(obj)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body := bytes.NewReader(out)
+
+	response, err := client.Post("http://unix/register_object", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("failed to register object: %s", err)
+	}
+	response.Body.Close()
+
+	objInfo := &pb.LoadObjectRequest{Name: name, IsQuarantined: false, RepairTool: false}
+	out, err = proto.Marshal(objInfo)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body = bytes.NewReader(out)
+	response, err = client.Post("http://unix/load_object", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("error getting registered object: %s", err)
+	}
+	r := &pb.Object{}
+	buf := new(bytes.Buffer)
+	buf.ReadFrom(response.Body)
+	if err = proto.Unmarshal(buf.Bytes(), r); err != nil {
+		t.Fatal(err)
+	}
+	response.Body.Close()
+
+	if !bytes.Equal(r.Name, name) || r.VolumeIndex != 1 || r.Offset != 4096 {
+		t.Fatalf("object found but name, volume index, or offset, is wrong: %v", r)
+	}
+
+	// Register existing object, which should fail
+	obj = &pb.RegisterObjectRequest{Name: name, VolumeIndex: 1, Offset: 4096, NextOffset: 8192, RepairTool: false}
+	out, err = proto.Marshal(obj)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body = bytes.NewReader(out)
+	response, err = client.Post("http://unix/register_object", "application/octet-stream", body)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if response.StatusCode != int(codes.AlreadyExists) {
+		t.Fatalf("wrong status code, expected: %d, got: %d", codes.AlreadyExists, response.StatusCode)
+	}
+	response.Body.Close()
+
+	// Remove object
+	unregInfo := &pb.UnregisterObjectRequest{Name: name}
+	out, err = proto.Marshal(unregInfo)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body = bytes.NewReader(out)
+	response, err = client.Post("http://unix/unregister_object", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("failed to unregister object: %s", err)
+	}
+	response.Body.Close()
+
+	// Attempt to remove again, should fail
+	body = bytes.NewReader(out)
+	response, err = client.Post("http://unix/unregister_object", "application/octet-stream", body)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if response.StatusCode != int(codes.NotFound) {
+		t.Fatalf("wrong status code, expected: %d, got: %d", codes.NotFound, response.StatusCode)
+	}
+	// Close the last response too; it was previously leaked.
+	response.Body.Close()
+}
+
+// TestQuarantineObject quarantines an existing object, then verifies it is
+// no longer visible through load_object.
+func TestQuarantineObject(t *testing.T) {
+	// Quarantine an existing object
+	name := []byte("bf43763a98208f15da803e76bf52e7d11515750803.01357#0#d.data")
+	objName := &pb.QuarantineObjectRequest{Name: name, RepairTool: false}
+	out, err := proto.Marshal(objName)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body := bytes.NewReader(out)
+	// Content-type fixed: was "applicable/octet-stream".
+	response, err := client.Post("http://unix/quarantine_object", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatal("Failed to quarantine object")
+	}
+	response.Body.Close()
+
+	// We shouldn't be able to find it
+	objInfo := &pb.LoadObjectRequest{Name: name, IsQuarantined: false, RepairTool: false}
+	out, err = proto.Marshal(objInfo)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body = bytes.NewReader(out)
+	// This must hit load_object (the request is a LoadObjectRequest): the
+	// previous version posted to quarantine_object again and only passed by
+	// accident.
+	response, err = client.Post("http://unix/load_object", "application/octet-stream", body)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if response.StatusCode != int(codes.NotFound) {
+		t.Fatalf("wrong status code, expected: %d, got: %d", codes.NotFound, response.StatusCode)
+	}
+	response.Body.Close()
+
+	// TODO, need to test that the quarantined object exists
+	// then try to quarantine non existent object
+}
+
+// TestUnquarantineObject unquarantines the object quarantined by
+// TestQuarantineObject and verifies it is visible again via load_object.
+func TestUnquarantineObject(t *testing.T) {
+	// Unquarantine an existing quarantined object (check that)
+	name := []byte("bf43763a98208f15da803e76bf52e7d11515750803.01357#0#d.data")
+	objName := &pb.UnquarantineObjectRequest{Name: name, RepairTool: false}
+	out, err := proto.Marshal(objName)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body := bytes.NewReader(out)
+
+	response, err := client.Post("http://unix/unquarantine_object", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		// Message fixed: this step unquarantines, it does not quarantine.
+		t.Fatal("Failed to unquarantine object")
+	}
+	response.Body.Close()
+
+	// We should be able to find it
+	objInfo := &pb.LoadObjectRequest{Name: name, IsQuarantined: false, RepairTool: false}
+	out, err = proto.Marshal(objInfo)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body = bytes.NewReader(out)
+
+	response, err = client.Post("http://unix/load_object", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatal("cannot find unquarantined object")
+	}
+	response.Body.Close()
+
+	// TODO, need to test that the quarantined object exists
+	// then try to quarantine non existent object
+}
+
+// This test modifies the DB
+// TestListQuarantinedOHashes quarantines a set of objects, then walks the
+// quarantined object hashes with several page sizes, checking contents and
+// pagination tokens at each step.
+func TestListQuarantinedOHashes(t *testing.T) {
+	// We shouldn't find any quarantined object initially
+	lqInfo := &pb.ListQuarantinedOHashesRequest{PageSize: 100}
+	out, err := proto.Marshal(lqInfo)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body := bytes.NewReader(out)
+
+	response, err := client.Post("http://unix/list_quarantined_ohashes", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("failed to list quarantined ohashes: %v", err)
+	}
+
+	r := &pb.ListQuarantinedOHashesReply{}
+	buf := new(bytes.Buffer)
+	buf.ReadFrom(response.Body)
+	if err = proto.Unmarshal(buf.Bytes(), r); err != nil {
+		t.Fatal(err)
+	}
+
+	if r.Objects != nil {
+		t.Fatalf("Did not expect to find any quarantined objects. Found: %v", r.Objects)
+	}
+	response.Body.Close()
+
+	// Quarantine a few objects and check we can find them
+	objectsToQuarantine := []pb.QuarantineObjectRequest{
+		{Name: []byte("02573d31b770cda8e0effd7762e8a0751515750801.09785#2#d.data"), RepairTool: false},
+		{Name: []byte("6b08eabf5667557c72dc6570aa1fb8451515750801.08639#4#d.data"), RepairTool: false},
+		{Name: []byte("6b08eabf5667557c72dc6570aa1fb8451515750856.77219.meta"), RepairTool: false},
+		{Name: []byte("6b08eabf5667557c72dc6570abcfb8451515643210.72429#4#d.data"), RepairTool: false},
+		{Name: []byte("687ba0410f4323c66397a85292077b101515750801.10244#0#d.data"), RepairTool: false},
+		{Name: []byte("c4aaea9b28c425f45eb64d4d5b0b3f621515750801.19478#2#d.data"), RepairTool: false},
+		{Name: []byte("0a0898eb861579d1240adbb1c9f0c92b1515750801.20636#2#d.data"), RepairTool: false},
+	}
+
+	expectedOhashes := [][]byte{
+		[]byte("02573d31b770cda8e0effd7762e8a075"),
+		[]byte("0a0898eb861579d1240adbb1c9f0c92b"),
+		[]byte("687ba0410f4323c66397a85292077b10"),
+		[]byte("6b08eabf5667557c72dc6570aa1fb845"),
+		[]byte("6b08eabf5667557c72dc6570abcfb845"),
+		[]byte("c4aaea9b28c425f45eb64d4d5b0b3f62"),
+	}
+
+	for _, qObj := range objectsToQuarantine {
+		out, err = proto.Marshal(&qObj)
+		if err != nil {
+			t.Error(err)
+		}
+		body = bytes.NewReader(out)
+
+		response, err = client.Post("http://unix/quarantine_object", "application/octet-stream", body)
+		if err = check_200(response, err); err != nil {
+			t.Fatalf("failed to quarantine object: %s", err)
+		}
+		response.Body.Close()
+	}
+
+	// List quarantined objects
+	lqInfo = &pb.ListQuarantinedOHashesRequest{PageSize: 100}
+	out, err = proto.Marshal(lqInfo)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body = bytes.NewReader(out)
+
+	response, err = client.Post("http://unix/list_quarantined_ohashes", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("failed to list quarantined ohashes: %v", err)
+	}
+
+	r = &pb.ListQuarantinedOHashesReply{}
+	buf.Reset()
+	buf.ReadFrom(response.Body)
+	if err = proto.Unmarshal(buf.Bytes(), r); err != nil {
+		t.Fatal(err)
+	}
+
+	response.Body.Close()
+
+	receivedOhashes := [][]byte{}
+	for _, obj := range r.Objects {
+		receivedOhashes = append(receivedOhashes, obj.Name)
+	}
+
+	if !testEqSliceBytes(receivedOhashes, expectedOhashes) {
+		t.Fatalf("\nexpected %v\ngot %v", expectedOhashes, receivedOhashes)
+	}
+
+	// We got all quarantined objects, so NextPageToken shouldn't be set
+	if !bytes.Equal(r.NextPageToken, []byte("")) {
+		// Message fixed: the expected value is the empty token, not "foo".
+		t.Fatalf("\nexpected %v got %v", []byte(""), r.NextPageToken)
+	}
+
+	// List quarantined objects, with a PageSize of 1
+	lqInfo = &pb.ListQuarantinedOHashesRequest{PageSize: 1}
+	out, err = proto.Marshal(lqInfo)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body = bytes.NewReader(out)
+
+	response, err = client.Post("http://unix/list_quarantined_ohashes", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("failed to list quarantined ohashes: %v", err)
+	}
+
+	r = &pb.ListQuarantinedOHashesReply{}
+	buf.Reset()
+	buf.ReadFrom(response.Body)
+	if err = proto.Unmarshal(buf.Bytes(), r); err != nil {
+		t.Fatal(err)
+	}
+
+	response.Body.Close()
+
+	receivedOhashes = [][]byte{}
+	for _, obj := range r.Objects {
+		receivedOhashes = append(receivedOhashes, obj.Name)
+	}
+
+	if !testEqSliceBytes(receivedOhashes, [][]byte{expectedOhashes[0]}) {
+		t.Fatalf("\nexpected %v\ngot %v", [][]byte{expectedOhashes[0]}, receivedOhashes)
+	}
+
+	// We got the first object, expect NextPageToken to be the second quarantined object hash
+	if !bytes.Equal(r.NextPageToken, expectedOhashes[1]) {
+		t.Fatalf("\nexpected %v got %v", expectedOhashes[1], r.NextPageToken)
+	}
+
+	// Get the next two entries
+	lqInfo = &pb.ListQuarantinedOHashesRequest{PageSize: 2, PageToken: r.NextPageToken}
+	out, err = proto.Marshal(lqInfo)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body = bytes.NewReader(out)
+
+	response, err = client.Post("http://unix/list_quarantined_ohashes", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("failed to list quarantined ohashes: %v", err)
+	}
+
+	r = &pb.ListQuarantinedOHashesReply{}
+	buf.Reset()
+	buf.ReadFrom(response.Body)
+	if err = proto.Unmarshal(buf.Bytes(), r); err != nil {
+		t.Fatal(err)
+	}
+
+	response.Body.Close()
+
+	receivedOhashes = [][]byte{}
+	for _, obj := range r.Objects {
+		receivedOhashes = append(receivedOhashes, obj.Name)
+	}
+
+	if !testEqSliceBytes(receivedOhashes, expectedOhashes[1:3]) {
+		t.Fatalf("\nexpected %v\ngot %v", expectedOhashes[1:3], receivedOhashes)
+	}
+
+	// We've read 3, expect NextPageToken to be the 4th quarantined object
+	if !bytes.Equal(r.NextPageToken, expectedOhashes[3]) {
+		t.Fatalf("\nexpected %v got %v", expectedOhashes[3], r.NextPageToken)
+	}
+
+	// Get all remaining entries
+	lqInfo = &pb.ListQuarantinedOHashesRequest{PageSize: 100, PageToken: r.NextPageToken}
+	out, err = proto.Marshal(lqInfo)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body = bytes.NewReader(out)
+
+	response, err = client.Post("http://unix/list_quarantined_ohashes", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("failed to list quarantined ohashes: %v", err)
+	}
+
+	r = &pb.ListQuarantinedOHashesReply{}
+	buf.Reset()
+	buf.ReadFrom(response.Body)
+	if err = proto.Unmarshal(buf.Bytes(), r); err != nil {
+		t.Fatal(err)
+	}
+
+	response.Body.Close()
+
+	receivedOhashes = [][]byte{}
+	for _, obj := range r.Objects {
+		receivedOhashes = append(receivedOhashes, obj.Name)
+	}
+
+	if !testEqSliceBytes(receivedOhashes, expectedOhashes[3:]) {
+		// Message fixed: this comparison is against expectedOhashes[3:].
+		t.Fatalf("\nexpected %v\ngot %v", expectedOhashes[3:], receivedOhashes)
+	}
+
+	// We've read all quarantined objects, NextPageToken should not be set
+	if !bytes.Equal(r.NextPageToken, []byte("")) {
+		t.Fatalf("\nexpected %v got %v", []byte(""), r.NextPageToken)
+	}
+
+}
+
+// TestLoadObjectsByVolume exercises the load_objects_by_volume RPC:
+// non-quarantined listing (empty), quarantined listing, and pagination.
+func TestLoadObjectsByVolume(t *testing.T) {
+	// List non quarantined objects from volume 22, we should not find any
+	volIndex := &pb.LoadObjectsByVolumeRequest{Index: 22}
+	out, err := proto.Marshal(volIndex)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body := bytes.NewReader(out)
+
+	response, err := client.Post("http://unix/load_objects_by_volume", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("failed to call LoadObjectsByVolume: %v", err)
+	}
+
+	r := &pb.LoadObjectsByVolumeReply{}
+	buf := new(bytes.Buffer)
+	buf.ReadFrom(response.Body)
+	// Close the body so the connection can be reused by later requests
+	response.Body.Close()
+	if err = proto.Unmarshal(buf.Bytes(), r); err != nil {
+		t.Fatal(err)
+	}
+
+	if r.Objects != nil {
+		t.Fatalf("did not expect to find objects")
+	}
+
+	// List quarantined objects from volume 22
+	volIndex = &pb.LoadObjectsByVolumeRequest{Index: 22, Quarantined: true}
+	out, err = proto.Marshal(volIndex)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body = bytes.NewReader(out)
+
+	response, err = client.Post("http://unix/load_objects_by_volume", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("failed to call LoadObjectsByVolume: %v", err)
+	}
+
+	r = &pb.LoadObjectsByVolumeReply{}
+	buf = new(bytes.Buffer)
+	buf.ReadFrom(response.Body)
+	response.Body.Close()
+	if err = proto.Unmarshal(buf.Bytes(), r); err != nil {
+		t.Fatal(err)
+	}
+
+	expectedObjects := []pb.Object{
+		{Name: []byte("687ba0410f4323c66397a85292077b101515750801.10244#0#d.data"), VolumeIndex: 22, Offset: 4096},
+		{Name: []byte("6b08eabf5667557c72dc6570abcfb8451515643210.72429#4#d.data"), VolumeIndex: 22, Offset: 8192},
+	}
+
+	// we should have all of them
+	if len(r.Objects) != len(expectedObjects) {
+		t.Fatalf("Expected %d objects, got %d", len(expectedObjects), len(r.Objects))
+	}
+	if r.NextPageToken != nil {
+		t.Fatalf("Expected NextPageToken to be nil, but got: %s", string(r.NextPageToken))
+	}
+
+	for i, obj := range r.Objects {
+		if !bytes.Equal(obj.Name, expectedObjects[i].Name) {
+			t.Fatalf("expected %s, got %s", string(expectedObjects[i].Name), string(obj.Name))
+		}
+		if obj.VolumeIndex != expectedObjects[i].VolumeIndex {
+			t.Fatalf("expected %d, got %d", expectedObjects[i].VolumeIndex, obj.VolumeIndex)
+		}
+		if obj.Offset != expectedObjects[i].Offset {
+			t.Fatalf("expected %d, got %d", expectedObjects[i].Offset, obj.Offset)
+		}
+	}
+
+	// List quarantined objects from volume 22 with pagination
+	volIndex = &pb.LoadObjectsByVolumeRequest{Index: 22, Quarantined: true, PageSize: 1}
+	out, err = proto.Marshal(volIndex)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body = bytes.NewReader(out)
+
+	response, err = client.Post("http://unix/load_objects_by_volume", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("failed to call LoadObjectsByVolume: %v", err)
+	}
+
+	r = &pb.LoadObjectsByVolumeReply{}
+	buf = new(bytes.Buffer)
+	buf.ReadFrom(response.Body)
+	response.Body.Close()
+	if err = proto.Unmarshal(buf.Bytes(), r); err != nil {
+		t.Fatal(err)
+	}
+
+	// we should have one object, and the token must point at the next one
+	if len(r.Objects) != 1 {
+		t.Fatalf("Expected 1 objects, got %d", len(r.Objects))
+	}
+	if !bytes.Equal(r.NextPageToken, expectedObjects[1].Name) {
+		t.Fatalf("Expected NextPageToken to be %s, but got: %s", string(expectedObjects[1].Name), string(r.NextPageToken))
+	}
+
+	if !bytes.Equal(r.Objects[0].Name, expectedObjects[0].Name) {
+		t.Fatalf("expected %s, got %s", string(expectedObjects[0].Name), string(r.Objects[0].Name))
+	}
+	if r.Objects[0].VolumeIndex != expectedObjects[0].VolumeIndex {
+		t.Fatalf("expected %d, got %d", expectedObjects[0].VolumeIndex, r.Objects[0].VolumeIndex)
+	}
+	if r.Objects[0].Offset != expectedObjects[0].Offset {
+		t.Fatalf("expected %d, got %d", expectedObjects[0].Offset, r.Objects[0].Offset)
+	}
+
+	// Second call with pagination
+	volIndex = &pb.LoadObjectsByVolumeRequest{Index: 22, Quarantined: true, PageSize: 1, PageToken: r.NextPageToken}
+	out, err = proto.Marshal(volIndex)
+	if err != nil {
+		t.Fatal(err)
+	}
+	body = bytes.NewReader(out)
+
+	response, err = client.Post("http://unix/load_objects_by_volume", "application/octet-stream", body)
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("failed to call LoadObjectsByVolume: %v", err)
+	}
+
+	r = &pb.LoadObjectsByVolumeReply{}
+	buf = new(bytes.Buffer)
+	buf.ReadFrom(response.Body)
+	response.Body.Close()
+	if err = proto.Unmarshal(buf.Bytes(), r); err != nil {
+		t.Fatal(err)
+	}
+
+	// we should have one object and no further pages
+	if len(r.Objects) != 1 {
+		t.Fatalf("Expected 1 objects, got %d", len(r.Objects))
+	}
+	if r.NextPageToken != nil {
+		t.Fatalf("Expected NextPageToken to be nil, but got: %s", string(r.NextPageToken))
+	}
+
+	if !bytes.Equal(r.Objects[0].Name, expectedObjects[1].Name) {
+		// Report the entry we actually compared against (index 1, not 0)
+		t.Fatalf("expected %s, got %s", string(expectedObjects[1].Name), string(r.Objects[0].Name))
+	}
+	if r.Objects[0].VolumeIndex != expectedObjects[1].VolumeIndex {
+		t.Fatalf("expected %d, got %d", expectedObjects[1].VolumeIndex, r.Objects[0].VolumeIndex)
+	}
+	if r.Objects[0].Offset != expectedObjects[1].Offset {
+		t.Fatalf("expected %d, got %d", expectedObjects[1].Offset, r.Objects[0].Offset)
+	}
+}
+
+// TestListQuarantinedOHash lists the quarantined files belonging to a
+// single object hash and checks them against the expected fixtures.
+func TestListQuarantinedOHash(t *testing.T) {
+	req := &pb.ListQuarantinedOHashRequest{Prefix: []byte("6b08eabf5667557c72dc6570aa1fb845"), RepairTool: false}
+	serialized, err := proto.Marshal(req)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	response, err := client.Post("http://unix/list_quarantined_ohash", "application/octet-stream", bytes.NewReader(serialized))
+	if err = check_200(response, err); err != nil {
+		t.Fatalf("error listing quarantined object files: %s", err)
+	}
+
+	qList := &pb.ListQuarantinedOHashReply{}
+	buf := new(bytes.Buffer)
+	buf.ReadFrom(response.Body)
+	response.Body.Close()
+	if err = proto.Unmarshal(buf.Bytes(), qList); err != nil {
+		t.Fatal(err)
+	}
+
+	expectedFiles := [][]byte{
+		[]byte("6b08eabf5667557c72dc6570aa1fb8451515750801.08639#4#d.data"),
+		[]byte("6b08eabf5667557c72dc6570aa1fb8451515750856.77219.meta"),
+	}
+
+	if len(qList.Objects) != len(expectedFiles) {
+		t.Fatalf("got %d objects, expected %d", len(qList.Objects), len(expectedFiles))
+	}
+
+	receivedFiles := make([][]byte, 0, len(qList.Objects))
+	for _, obj := range qList.Objects {
+		receivedFiles = append(receivedFiles, obj.Name)
+	}
+
+	if !testEqSliceBytes(receivedFiles, expectedFiles) {
+		t.Fatalf("\nexpected %v\ngot %v", expectedFiles, receivedFiles)
+	}
+
+	// Add test, non existent ohash
+	// Add test, unquarantine one file, list again
+}
+
+// testEqSliceBytes reports whether a and b contain the same byte slices
+// in the same order. Two nil slices compare equal; a nil slice never
+// equals a non-nil one.
+func testEqSliceBytes(a, b [][]byte) bool {
+	if a == nil || b == nil {
+		return a == nil && b == nil
+	}
+	if len(a) != len(b) {
+		return false
+	}
+	for i, elem := range a {
+		if !bytes.Equal(elem, b[i]) {
+			return false
+		}
+	}
+	return true
+}
diff --git a/go/swift-rpc-losf/snappy b/go/swift-rpc-losf/snappy
new file mode 160000
+Subproject 3f194acb57e0487531c96b97af61dcbd025a78a
diff --git a/go/swift-rpc-losf/stats.go b/go/swift-rpc-losf/stats.go
new file mode 100644
index 000000000..cc8ebd740
--- /dev/null
+++ b/go/swift-rpc-losf/stats.go
@@ -0,0 +1,41 @@
+// Copyright (c) 2010-2012 OpenStack Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+// Returns the number of entries within the namespace
+func countItems(kv KV, namespace byte) (itemCount uint64) {
+ it := kv.NewIterator(namespace)
+ defer it.Close()
+
+ for it.SeekToFirst(); it.Valid(); it.Next() {
+ itemCount++
+ }
+
+ return
+}
+
+// CollectStats returns per-namespace entry counts for the KV in a map.
+// It walks the whole KV to do so: intended for debugging, too expensive
+// for regular monitoring.
+func CollectStats(s *server) map[string]uint64 {
+	return map[string]uint64{
+		"volume_count":      countItems(s.kv, volumePrefix),
+		"object_count":      countItems(s.kv, objectPrefix),
+		"deletequeue_count": countItems(s.kv, deleteQueuePrefix),
+		"quarantine_count":  countItems(s.kv, quarantinePrefix),
+	}
+}
diff --git a/go/swift-rpc-losf/status/status.go b/go/swift-rpc-losf/status/status.go
new file mode 100644
index 000000000..66b21646c
--- /dev/null
+++ b/go/swift-rpc-losf/status/status.go
@@ -0,0 +1,27 @@
+package status
+
+import (
+ "fmt"
+ "github.com/openstack/swift-rpc-losf/codes"
+)
+
+// RpcError is an error that carries an RPC status code along with a
+// human-readable message.
+type RpcError struct {
+	code codes.StatusCode
+	msg  string
+}
+
+// Error implements the error interface; it renders the numeric status
+// code followed by the message.
+func (e *RpcError) Error() string {
+	return fmt.Sprintf("rpc error: %d. %s", e.code, e.msg)
+}
+
+// Error returns a new RpcError with the given status code and message.
+func Error(code codes.StatusCode, msg string) error {
+	e := RpcError{code: code, msg: msg}
+	return &e
+}
+
+// Errorf returns a new RpcError whose message is built from the
+// Printf-style format and arguments.
+func Errorf(code codes.StatusCode, format string, a ...interface{}) error {
+	msg := fmt.Sprintf(format, a...)
+	return Error(code, msg)
+}
+
+// Code returns the status code associated with the error.
+func (e *RpcError) Code() codes.StatusCode {
+	return e.code
+}
diff --git a/go/swift-rpc-losf/swift.go b/go/swift-rpc-losf/swift.go
new file mode 100644
index 000000000..c51362dac
--- /dev/null
+++ b/go/swift-rpc-losf/swift.go
@@ -0,0 +1,66 @@
+// Copyright (c) 2010-2012 OpenStack Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "errors"
+ "fmt"
+ "strconv"
+)
+
+// getPartitionFromOhash returns the partition, given an object hash and the partition bit count
+func getPartitionFromOhash(ohash []byte, partitionBits int) (partition uint64, err error) {
+	// The partition is taken from the 64 most significant bits of the
+	// hash, so we need at least 16 hex characters to parse.
+	if len(ohash) < 16 {
+		// Fixed message: the requirement is 16 characters, not 16 bits
+		err = errors.New("ohash must be at least 16 characters long")
+		return
+	}
+	highHash, err := strconv.ParseUint(string(ohash[0:16]), 16, 64)
+	if err != nil {
+		return
+	}
+
+	// shift to get the partition
+	partition = highHash >> uint64(64-partitionBits)
+	return
+}
+
+// getLastPartition returns the last possible partition given the partition bit count
+func getLastPartition(partitionBits int) (partition uint64, err error) {
+	if partitionBits <= 0 {
+		return 0, nil
+	}
+	// The highest partition has all partitionBits low bits set:
+	// 2^partitionBits - 1. Shifts of 64 or more yield 0 in Go, so the
+	// subtraction still produces an all-ones mask in that case, matching
+	// the bit-by-bit construction this replaces.
+	partition = uint64(1)<<uint(partitionBits) - 1
+	return partition, nil
+}
+
+// getObjPrefixFromPartition returns the first possible object prefix in
+// the KV for the given partition and bit count.
+// Example: 876, 18 bits -> 00db000000000000...
+func getObjPrefixFromPartition(partition uint64, partitionBits int) ([]byte, error) {
+	// Place the partition in the high bits, hex-encode to 16 characters,
+	// then pad with the 16 zero characters of the low hash half.
+	first := partition << uint(64-partitionBits)
+	return []byte(fmt.Sprintf("%016x", first) + "0000000000000000"), nil
+}
+
+// getEncodedObjPrefixFromPartition returns the first possible object
+// prefix in the KV for the given partition and bit count, in its
+// encoded form.
+func getEncodedObjPrefixFromPartition(partition uint64, partitionBits int) ([]byte, error) {
+	key, err := getObjPrefixFromPartition(partition, partitionBits)
+	if err != nil {
+		return nil, err
+	}
+	return EncodeObjectKey(key)
+}
diff --git a/go/swift-rpc-losf/swift_test.go b/go/swift-rpc-losf/swift_test.go
new file mode 100644
index 000000000..5e488558b
--- /dev/null
+++ b/go/swift-rpc-losf/swift_test.go
@@ -0,0 +1,130 @@
+// Copyright (c) 2010-2012 OpenStack Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "bytes"
+ "testing"
+)
+
+// getPartitionTest is one table entry for TestGetPartitionFromOhash: an
+// object hash, the partition bit count, and the expected partition.
+type getPartitionTest struct {
+	ohash             string
+	bitCount          int
+	expectedPartition uint64
+}
+
+func TestGetPartitionFromOhash(t *testing.T) {
+ var getPartitionTests = []getPartitionTest{
+ {"b80362143ac3221d15a75f4bd1af3fac", 18, 188429},
+ {"00db344e979c8c8fa4376dc60ba8102e", 18, 876},
+ {"01342cbbf02d9b27396ac937b0f049e1", 18, 1232},
+ {"ffffc63ac2fa908fc137e7e0f1c4df97", 18, 262143},
+ }
+
+ for _, tt := range getPartitionTests {
+ partition, err := getPartitionFromOhash([]byte(tt.ohash), tt.bitCount)
+ if err != nil {
+ t.Error(err)
+ }
+ if partition != tt.expectedPartition {
+ t.Errorf("For ohash: %s, got partition %d, expected %d\n", tt.ohash, partition, tt.expectedPartition)
+ }
+ }
+
+ // Test invalid data, too short
+ invalidHash := []byte("abcd")
+ bitCount := 18
+ _, err := getPartitionFromOhash(invalidHash, bitCount)
+ if err == nil {
+ t.Fatalf("Should fail to getPartitionFromOhash for: %x, %d", invalidHash, bitCount)
+ }
+
+ // invalid md5
+ invalidHash = []byte("zzzz2cbbf02d9b27396ac937b0f049e1")
+ _, err = getPartitionFromOhash(invalidHash, bitCount)
+ if err == nil {
+ t.Fatalf("Should fail to getPartitionFromOhash for: %x, %d", invalidHash, bitCount)
+ }
+}
+
+// getLastPartitionTest is one table entry for TestGetLastPartition: a
+// partition bit count and the last partition it implies.
+type getLastPartitionTest struct {
+	bitCount          int
+	expectedPartition uint64
+}
+
+func TestGetLastPartition(t *testing.T) {
+ var getLastPartitionTests = []getLastPartitionTest{
+ {18, 262143},
+ {17, 131071},
+ {16, 65535},
+ }
+
+ for _, tt := range getLastPartitionTests {
+ partition, err := getLastPartition(tt.bitCount)
+ if err != nil {
+ t.Error(err)
+ }
+ if partition != tt.expectedPartition {
+ t.Errorf("For bitcount: %d, got last partition: %d, expected %d\n", tt.bitCount, partition, tt.expectedPartition)
+ }
+ }
+}
+
+// getObjTest is one table entry for the object-prefix tests: a
+// partition, the partition bit count, and the expected KV prefix.
+type getObjTest struct {
+	partition      uint64
+	bitCount       int
+	expectedPrefix []byte
+}
+
+// TestGetObjPrefixFromPartition checks the plain-text object prefix
+// computed for a few known partitions.
+func TestGetObjPrefixFromPartition(t *testing.T) {
+	var getObjTests = []getObjTest{
+		{876, 18, []byte("00db0000000000000000000000000000")},
+		{209827, 18, []byte("cce8c000000000000000000000000000")},
+		{260177, 18, []byte("fe144000000000000000000000000000")},
+		{260179, 18, []byte("fe14c000000000000000000000000000")},
+		{260180, 18, []byte("fe150000000000000000000000000000")},
+	}
+
+	for _, tt := range getObjTests {
+		prefix, err := getObjPrefixFromPartition(tt.partition, tt.bitCount)
+		if err != nil {
+			t.Error(err)
+		}
+		// bytes.Equal is the idiomatic equality check (staticcheck S1004)
+		if !bytes.Equal(prefix, tt.expectedPrefix) {
+			t.Errorf("For partition: %d, bitCount: %d, got prefix: %s, expected %s\n", tt.partition, tt.bitCount, prefix, tt.expectedPrefix)
+		}
+	}
+}
+
+// TestGetEncodedObjPrefixFromPartition checks the encoded object prefix
+// computed for a few known partitions.
+func TestGetEncodedObjPrefixFromPartition(t *testing.T) {
+	var getObjTests = []getObjTest{
+		{876, 18, []byte("\x00\xdb\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")},
+		{209827, 18, []byte("\xcc\xe8\xc0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")},
+		{260177, 18, []byte("\xfe\x14\x40\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")},
+		{260179, 18, []byte("\xfe\x14\xc0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")},
+		{260180, 18, []byte("\xfe\x15\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")},
+	}
+
+	for _, tt := range getObjTests {
+		prefix, err := getEncodedObjPrefixFromPartition(tt.partition, tt.bitCount)
+		if err != nil {
+			t.Error(err)
+		}
+		// bytes.Equal is the idiomatic equality check (staticcheck S1004)
+		if !bytes.Equal(prefix, tt.expectedPrefix) {
+			t.Errorf("For partition: %d, bitCount: %d, got prefix: %x, expected %x\n", tt.partition, tt.bitCount, prefix, tt.expectedPrefix)
+		}
+	}
+}
diff --git a/go/swift-rpc-losf/utils.go b/go/swift-rpc-losf/utils.go
new file mode 100644
index 000000000..c8b9f963a
--- /dev/null
+++ b/go/swift-rpc-losf/utils.go
@@ -0,0 +1,102 @@
+// Copyright (c) 2010-2012 OpenStack Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "bufio"
+ "fmt"
+ "golang.org/x/sys/unix"
+ "os"
+ "strings"
+)
+
+// isMounted returns true if dirPath is a mount point, false otherwise.
+// It scans /proc/mounts, whose lines look like:
+// "<dev> <mountpoint> <fstype> <opts> <dump> <pass>".
+// NOTE(review): mount points containing spaces appear octal-escaped
+// (e.g. "\040") in /proc/mounts and would not match a literal dirPath —
+// confirm such paths cannot occur in this deployment.
+func isMounted(dirPath string) (bool, error) {
+	f, err := os.Open("/proc/mounts")
+	if err != nil {
+		return false, err
+	}
+	defer f.Close()
+
+	scanner := bufio.NewScanner(f)
+	for scanner.Scan() {
+		elems := strings.Split(scanner.Text(), " ")
+		// Guard against malformed lines to avoid an index panic
+		if len(elems) < 2 {
+			continue
+		}
+		if dirPath == elems[1] {
+			return true, nil
+		}
+	}
+
+	if err := scanner.Err(); err != nil {
+		return false, err
+	}
+
+	return false, nil
+}
+
+// dirExists returns true if dirPath exists and is a directory, false
+// otherwise. A missing path is not an error; any other Stat failure is
+// reported to the caller.
+func dirExists(dirPath string) (bool, error) {
+	info, err := os.Stat(dirPath)
+	if os.IsNotExist(err) {
+		return false, nil
+	}
+	if err != nil {
+		return false, err
+	}
+	return info.IsDir(), nil
+}
+
+// getBaseDirName returns the base directory name derived from rootDir.
+// Storage policy 0 uses rootDir unchanged; any other policy index is
+// appended as a "-<index>" suffix.
+func getBaseDirName(rootDir string, policyIdx int) string {
+	if policyIdx != 0 {
+		return fmt.Sprintf("%s-%d", rootDir, policyIdx)
+	}
+	return rootDir
+}
+
+// CreateDirtyFile creates a file on the filesystem that will signal to
+// the object-server that the KV cannot be used. (done along with
+// check_mount)
+func CreateDirtyFile(dirtyFilePath string) error {
+	f, err := os.Create(dirtyFilePath)
+	if err != nil {
+		return err
+	}
+	// Propagate Close errors: some filesystems only surface write
+	// failures at close time, and the marker file must really exist.
+	return f.Close()
+}
+
+// Keep the lock file in a package-level variable: if it were a local,
+// it could be garbage collected (and closed, dropping the flock) once
+// lockSocket() returns.
+var lockFile *os.File
+
+// lockSocket acquires a lock on a file protecting the RPC socket.
+// It does not block and returns an error if the lock is already held.
+// There is no explicit unlock; the lock is released when the process
+// stops.
+func lockSocket(socketPath string) error {
+	var err error
+	lockFile, err = os.OpenFile(socketPath+".lock", os.O_WRONLY|os.O_CREATE, 0600)
+	if err != nil {
+		return err
+	}
+	return unix.Flock(int(lockFile.Fd()), unix.LOCK_EX|unix.LOCK_NB)
+}