1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
# Copyright (c) 2010-2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import subprocess
import six
from eventlet import sleep
from swift.common.daemon import Daemon
from swift.common.ring.utils import is_local_device
from swift.common.storage_policy import POLICIES, get_policy_string
from swift.common.utils import PrefixLoggerAdapter, get_logger, \
config_true_value, whataremyips
from swift.obj import rpc_http as rpc
class ObjectRpcManager(Daemon):
    """
    Daemon managing the LOSF RPC servers of the local KV backed disks.

    For every local device found in the ring of a KV backed storage policy,
    two sets of worker arguments are produced (see get_worker_args()): one
    for a worker that replaces itself with the LOSF RPC server binary, and
    one for a worker that polls the KV state through the server's socket and
    runs the volume checker when that state is not clean.

    :param conf: configuration dict of the daemon
    :param logger: optional logger; when omitted one is created with the
                   'object-rpcmanager' log route
    :raises AttributeError: if the ``volcheck`` or ``losf_bin`` option does
                            not point to an existing path
    """

    # seconds to wait before the first state check, so that the RPC server
    # has a chance to start
    RPC_STARTUP_DELAY = 2
    # seconds between two state checks; also used as the nap duration of a
    # worker that has no disk to manage
    STATE_CHECK_INTERVAL = 10

    def __init__(self, conf, logger=None):
        self.conf = conf
        self.logger = PrefixLoggerAdapter(
            logger or get_logger(conf, log_route='object-rpcmanager'), {})
        self.devices_dir = conf.get('devices', '/srv/node')
        self.mount_check = config_true_value(conf.get('mount_check', 'true'))
        # use native golang leveldb implementation
        self.use_go_leveldb = config_true_value(
            conf.get('use_go_leveldb', 'false'))
        self.swift_dir = conf.get('swift_dir', '/etc/swift')
        self.bind_ip = conf.get('bind_ip', '0.0.0.0')
        self.servers_per_port = int(conf.get('servers_per_port', '0') or 0)
        # no single port can be matched when servers_per_port is enabled
        self.port = None if self.servers_per_port else \
            int(conf.get('bind_port', 6200))
        self.volcheck = conf.get('volcheck', '')
        self.losf_bin = conf.get('losf_bin', '')
        self.healthcheck_interval = int(conf.get('healthcheck_interval', 10))

        # check if the path to LOSF binary and volume checker exist
        if not os.path.exists(self.volcheck):
            raise AttributeError(
                "Invalid or missing volcheck in your config file")
        if not os.path.exists(self.losf_bin):
            raise AttributeError(
                "Invalid or missing losf_bin in your config file")

        # this should select only kv enabled policies
        # (requires loading policies?)
        self.policies = POLICIES
        self.ring_check_interval = int(conf.get('ring_check_interval', 15))
        # TODO: add RPC state check interval
        self.kv_disks = self.get_kv_disks()

    def load_object_ring(self, policy):
        """
        Make sure the policy's rings are loaded.

        :param policy: the StoragePolicy instance
        :returns: appropriate ring object
        """
        policy.load_ring(self.swift_dir)
        return policy.object_ring

    # add filter for KV only
    def get_policy2devices(self):
        """
        Map every configured policy to the ring devices local to this node.

        A device is kept when is_local_device() matches its replication ip
        and port against this node's ips and configured port.

        :returns: dict of policy to list of ring device dicts
        """
        ips = whataremyips(self.bind_ip)
        policy2devices = {}
        for policy in self.policies:
            self.load_object_ring(policy)
            # ring device lists may contain holes (falsy entries), hence the
            # "dev and ..." guard
            local_devices = list(six.moves.filter(
                lambda dev: dev and is_local_device(
                    ips, self.port,
                    dev['replication_ip'], dev['replication_port']),
                policy.object_ring.devs))
            policy2devices[policy] = local_devices
        return policy2devices

    def get_kv_disks(self):
        """
        Returns a dict of KV backed policies to list of devices

        A policy is considered KV backed when the name of its diskfile
        module ends with '.kv' or '.hybrid'.

        :return: dict of policy index to list of device names
        """
        policy2devices = self.get_policy2devices()
        kv_disks = {}
        for policy, devs in policy2devices.items():
            if policy.diskfile_module.endswith(('.kv', '.hybrid')):
                kv_disks[policy.idx] = [d['device'] for d in devs]
        return kv_disks

    def get_worker_args(self, once=False, **kwargs):
        """
        Take the set of all local devices for this node from all the KV
        backed policies rings.

        Two dicts of run_forever() keyword arguments are yielded per device:
        the first one for the RPC server worker, the second one (with
        ``statecheck=True``) for the matching state checker. When there is
        no KV disk at all, a single placeholder dict is yielded so that one
        idle worker exists.

        :param once: False if the worker(s) will be daemonized, True if the
                     worker(s) will be run once
        :param kwargs: optional overrides from the command line
        :returns: generator of keyword argument dicts
        """
        # Note that this get re-used in is_healthy
        self.kv_disks = self.get_kv_disks()

        if not self.kv_disks:
            # we only need a single worker to do nothing until a ring change
            yield dict(multiprocess_worker_index=0)
            return

        # .items() rather than the Python 2 only .iteritems()
        for policy_idx, devs in self.kv_disks.items():
            losf_dir = get_policy_string('losf', policy_idx)
            for dev in devs:
                disk_path = os.path.join(self.devices_dir, dev)
                socket_path = os.path.join(disk_path, losf_dir, 'rpc.socket')
                yield dict(policy_idx=policy_idx, disk_path=disk_path)
                yield dict(policy_idx=policy_idx, disk_path=disk_path,
                           socket_path=socket_path, statecheck=True)

    def is_healthy(self):
        """
        Tell whether the set of local KV disks is unchanged.

        The freshly computed KV disks are compared with the ones recorded by
        the last call to get_worker_args() (or by the constructor).
        NOTE(review): presumably the daemon strategy restarts the workers
        when this returns False -- confirm against swift.common.daemon.

        :returns: True if the KV disks did not change, False otherwise
        """
        return self.get_kv_disks() == self.kv_disks

    def run_forever(self, policy_idx=None, disk_path=None, socket_path=None,
                    statecheck=None, *args, **kwargs):
        """
        Run one worker with the arguments produced by get_worker_args().

        :param policy_idx: index of the storage policy of the disk
        :param disk_path: path of the disk to manage, None for the
                          placeholder worker created when there is no KV disk
        :param socket_path: path of the RPC server socket (state checker
                            only)
        :param statecheck: if true, run the state checker instead of the
                           RPC server
        """
        if disk_path is None:
            # Placeholder worker: there is no disk to serve, so starting the
            # RPC server (it would be given the literal path "None") makes
            # no sense. Just stay alive doing nothing.
            self.logger.debug("no KV disk to manage, idling")
            while True:
                sleep(self.STATE_CHECK_INTERVAL)

        if statecheck:
            self._check_state_forever(policy_idx, disk_path, socket_path)
        else:
            self._exec_rpc_server(policy_idx, disk_path)

    def _check_state_forever(self, policy_idx, disk_path, socket_path):
        """
        Poll the KV state forever, run the volume checker when not clean.
        """
        volcheck_args = [self.volcheck, '--disk_path', str(disk_path),
                         '--policy_idx', str(policy_idx),
                         '--keepuser', '--repair', '--no_prompt']
        # sleep a bit to let the RPC server start. Otherwise it will
        # timeout and take longer to get the checks started.
        sleep(self.RPC_STARTUP_DELAY)
        while True:
            try:
                state = rpc.get_kv_state(socket_path)
                if not state.isClean:
                    self.logger.debug(volcheck_args)
                    subprocess.call(volcheck_args)
            except Exception:
                # best effort: a failed check must not kill the worker, it
                # is simply retried on the next round
                self.logger.exception("state check failed, continue")
            sleep(self.STATE_CHECK_INTERVAL)

    def _exec_rpc_server(self, policy_idx, disk_path):
        """
        Replace the current process with the LOSF RPC server for a disk.
        """
        losf_args = ['swift-losf-rpc', '-diskPath', str(disk_path),
                     '-debug', 'info',
                     '-policyIdx', str(policy_idx),
                     '-waitForMount={}'.format(str(self.mount_check))]
        if self.use_go_leveldb:
            losf_args.append('-useGoLevelDB')
        # execv() does not return on success
        os.execv(self.losf_bin, losf_args)
|