summaryrefslogtreecommitdiff
path: root/go/swift-rpc-losf/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'go/swift-rpc-losf/main.go')
-rw-r--r--go/swift-rpc-losf/main.go174
1 files changed, 174 insertions, 0 deletions
diff --git a/go/swift-rpc-losf/main.go b/go/swift-rpc-losf/main.go
new file mode 100644
index 000000000..9bf6489d3
--- /dev/null
+++ b/go/swift-rpc-losf/main.go
@@ -0,0 +1,174 @@
+// Copyright (c) 2010-2012 OpenStack Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// KV store for LOFS
+package main
+
+import (
+ "flag"
+ "fmt"
+ "github.com/sirupsen/logrus"
+ "net/http"
+ _ "net/http/pprof"
+ "os"
+ "os/signal"
+ "path"
+ "syscall"
+ "time"
+)
+
+// Name of the base losf root directory, relative to the swift disk
+const rootDirBase = "losf"
+
+// Will run checks and create rootDir if needed
+// Returns the path to rootDir
+func diskSetup(diskPath string, policyIdx int, waitForMount bool) (string, error) {
+
+ if waitForMount {
+ log.Debugf("waitForMount is set, if %s is not mounted, will wait until it is", diskPath)
+ }
+ for waitForMount {
+ nowMounted, err := isMounted(diskPath)
+ if err != nil {
+ return "", err
+ }
+ if nowMounted {
+ break
+ }
+ time.Sleep(time.Second / 10)
+ }
+
+ // OVH patch to match a similar mechanism in the python code.
+ // If a ".offline" file exists in srv node, do not start (/srv/node/disk-XX.offline)
+ offlineFile := fmt.Sprintf("%s%s", diskPath, ".offline")
+ offlineFileExists := false
+ if _, err := os.Stat(offlineFile); err == nil {
+ offlineFileExists = true
+ log.Debugf("offline file exists: %s", offlineFile)
+ }
+ for offlineFileExists {
+ if _, err := os.Stat(offlineFile); os.IsNotExist(err) {
+ offlineFileExists = false
+ }
+ time.Sleep(time.Second * 10)
+ }
+
+ rootDir := path.Join(diskPath, getBaseDirName(rootDirBase, policyIdx))
+ log.Debug(rootDir)
+
+ rootDirExists, err := dirExists(rootDir)
+ if err != nil {
+ return "", err
+ }
+ if !rootDirExists {
+ err := os.Mkdir(rootDir, os.FileMode(0700))
+ if err != nil {
+ return "", err
+ }
+ }
+ return rootDir, nil
+}
+
+// Parse options and starts the rpc server.
+func main() {
+ var dbDir, socketPath string
+ var kv KV
+
+ setupLogging()
+
+ debugLevels := map[string]logrus.Level{
+ "panic": logrus.PanicLevel,
+ "fatal": logrus.FatalLevel,
+ "error": logrus.ErrorLevel,
+ "warn": logrus.WarnLevel,
+ "info": logrus.InfoLevel,
+ "debug": logrus.DebugLevel,
+ }
+
+ var diskPath = flag.String("diskPath", "", "Swift disk path (/srv/node/disk-xyz)")
+ var policyIdx = flag.Int("policyIdx", 0, "Policy index")
+ var waitForMount = flag.Bool("waitForMount", true, "Wait for diskPath to be mounted. If diskPath exists but is not a mount, it will wait")
+ var profilerAddr = flag.String("profilerAddr", "", "Start profiler and make it available at this address (127.0.0.1:8081)")
+ var debugLevel = flag.String("debug", "info", "Debug level (error, warn, info, debug)")
+ var allowRoot = flag.Bool("allowRoot", false, "Allow process to run as root")
+ var useGoLevelDB = flag.Bool("useGoLevelDB", false, "Use native golang levelDB package")
+
+ flag.Parse()
+
+ log.SetLevel(debugLevels[*debugLevel])
+
+ if !*allowRoot && os.Getuid() == 0 {
+ log.Fatal("running as root, and allowRoot is false")
+ }
+
+ if *diskPath == "" {
+ log.Fatal("diskPath not specified")
+ }
+
+ rootDir, err := diskSetup(*diskPath, *policyIdx, *waitForMount)
+ if err != nil {
+ panic(err)
+ }
+
+ dbDir = path.Join(rootDir, "db")
+ socketPath = path.Join(rootDir, "rpc.socket")
+ rlog := log.WithFields(logrus.Fields{"socket": socketPath})
+
+ // install signal handler
+ stopChan := make(chan os.Signal, 1)
+ signal.Notify(stopChan, os.Interrupt, syscall.SIGTERM)
+
+ // start http server for profiling
+ if *profilerAddr != "" {
+ go func() {
+ rlog.Debug(http.ListenAndServe(*profilerAddr, nil))
+ }()
+ }
+
+ // Acquire lock to protect socket
+ rlog.Debug("Locking socket")
+ err = lockSocket(socketPath)
+ if err != nil {
+ rlog.Fatalf("Failed to lock RPC socket: %s", err)
+ }
+ os.Remove(socketPath)
+
+ // Open the database
+ if *useGoLevelDB {
+ kv, err = openGoLevelDb(dbDir)
+ } else {
+ kv, err = openLevigoDB(dbDir)
+ }
+
+ if err != nil {
+ rlog.Fatal(err)
+ }
+
+ // Check the kv was stopped properly
+ isClean, err := setKvState(kv)
+ if err != nil {
+ rlog.Fatal(err)
+ }
+ log.Infof("kv is clean: %v", isClean)
+
+ // Start the RPC server
+ rlog.Info("Starting RPC server")
+ err = runServer(kv, *diskPath, socketPath, stopChan, isClean)
+ if err != nil {
+ rlog.Fatal(err)
+ }
+
+ return
+}