summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xbin/swift-account-auditor31
-rwxr-xr-xbin/swift-account-reaper31
-rwxr-xr-xbin/swift-account-replicator31
-rwxr-xr-xbin/swift-container-auditor31
-rwxr-xr-xbin/swift-container-replicator30
-rwxr-xr-xbin/swift-container-updater24
-rwxr-xr-xbin/swift-object-auditor28
-rwxr-xr-xbin/swift-object-replicator49
-rwxr-xr-xbin/swift-object-updater25
-rw-r--r--swift/account/auditor.py9
-rw-r--r--swift/account/reaper.py8
-rw-r--r--swift/account/replicator.py26
-rw-r--r--swift/common/daemon.py60
-rw-r--r--swift/common/db_replicator.py11
-rw-r--r--swift/common/utils.py2
-rw-r--r--swift/container/auditor.py8
-rw-r--r--swift/container/replicator.py25
-rw-r--r--swift/container/updater.py8
-rw-r--r--swift/obj/auditor.py10
-rw-r--r--swift/obj/replicator.py42
-rw-r--r--swift/obj/updater.py10
-rw-r--r--test/unit/common/test_db_replicator.py4
-rw-r--r--test/unit/container/test_updater.py14
-rw-r--r--test/unit/obj/test_replicator.py6
-rw-r--r--test/unit/obj/test_updater.py12
25 files changed, 228 insertions, 307 deletions
diff --git a/bin/swift-account-auditor b/bin/swift-account-auditor
index 681adc6a2..a71597959 100755
--- a/bin/swift-account-auditor
+++ b/bin/swift-account-auditor
@@ -14,10 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
-import signal
import sys
-from ConfigParser import ConfigParser
from swift.account.auditor import AccountAuditor
from swift.common import utils
@@ -26,32 +23,6 @@ if __name__ == '__main__':
if len(sys.argv) < 2:
print "Usage: swift-account-auditor CONFIG_FILE [once]"
sys.exit()
-
once = len(sys.argv) > 2 and sys.argv[2] == 'once'
-
conf = utils.readconf(sys.argv[1], 'account-auditor')
- logger = utils.get_logger(conf)
- # log uncaught exceptions
- sys.excepthook = lambda *exc_info: \
- logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
- sys.stdout = sys.stderr = utils.LoggerFileObject(logger)
-
- utils.drop_privileges(conf.get('user', 'swift'))
-
- try:
- os.setsid()
- except OSError:
- pass
-
- def kill_children(*args):
- signal.signal(signal.SIGTERM, signal.SIG_IGN)
- os.killpg(0, signal.SIGTERM)
- sys.exit()
-
- signal.signal(signal.SIGTERM, kill_children)
-
- auditor = AccountAuditor(conf)
- if once:
- auditor.audit_once()
- else:
- auditor.audit_forever()
+ auditor = AccountAuditor(conf).run(once)
diff --git a/bin/swift-account-reaper b/bin/swift-account-reaper
index 444d19a09..90496c64e 100755
--- a/bin/swift-account-reaper
+++ b/bin/swift-account-reaper
@@ -14,10 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
-import signal
import sys
-from ConfigParser import ConfigParser
from swift.account.reaper import AccountReaper
from swift.common import utils
@@ -26,32 +23,6 @@ if __name__ == '__main__':
if len(sys.argv) < 2:
print "Usage: account-reaper CONFIG_FILE [once]"
sys.exit()
-
once = len(sys.argv) > 2 and sys.argv[2] == 'once'
-
conf = utils.readconf(sys.argv[1], 'account-reaper')
- logger = utils.get_logger(conf)
- # log uncaught exceptions
- sys.excepthook = lambda *exc_info: \
- logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
- sys.stdout = sys.stderr = utils.LoggerFileObject(logger)
-
- utils.drop_privileges(conf.get('user', 'swift'))
-
- try:
- os.setsid()
- except OSError:
- pass
-
- def kill_children(*args):
- signal.signal(signal.SIGTERM, signal.SIG_IGN)
- os.killpg(0, signal.SIGTERM)
- sys.exit()
-
- signal.signal(signal.SIGTERM, kill_children)
-
- reaper = AccountReaper(conf)
- if once:
- reaper.reap_once()
- else:
- reaper.reap_forever()
+ reaper = AccountReaper(conf).run(once)
diff --git a/bin/swift-account-replicator b/bin/swift-account-replicator
index 18bea931a..c71c326b8 100755
--- a/bin/swift-account-replicator
+++ b/bin/swift-account-replicator
@@ -15,31 +15,14 @@
# limitations under the License.
import sys
-from ConfigParser import ConfigParser
-import getopt
-from swift.account import server as account_server
-from swift.common import db, db_replicator, utils
-
-class AccountReplicator(db_replicator.Replicator):
- server_type = 'account'
- ring_file = 'account.ring.gz'
- brokerclass = db.AccountBroker
- datadir = account_server.DATADIR
- default_port = 6002
+from swift.common import utils
+from swift.account.replicator import AccountReplicator
if __name__ == '__main__':
- optlist, args = getopt.getopt(sys.argv[1:], '', ['once'])
-
- if not args:
- print "Usage: swift-account-replicator <--once> CONFIG_FILE [once]"
- sys.exit()
-
- once = len(args) > 1 and args[1] == 'once'
+ if len(sys.argv) < 2:
+ print "Usage: swift-account-replicator CONFIG_FILE [once]"
+ sys.exit(1)
+ once = len(sys.argv) > 2 and sys.argv[2] == 'once'
conf = utils.readconf(sys.argv[1], 'account-replicator')
- utils.drop_privileges(conf.get('user', 'swift'))
- if once or '--once' in [opt[0] for opt in optlist]:
- AccountReplicator(conf).replicate_once()
- else:
- AccountReplicator(conf).replicate_forever()
-
+ AccountReplicator(conf).run(once)
diff --git a/bin/swift-container-auditor b/bin/swift-container-auditor
index 3f22fbf69..b3472c54a 100755
--- a/bin/swift-container-auditor
+++ b/bin/swift-container-auditor
@@ -14,10 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
-import signal
import sys
-from ConfigParser import ConfigParser
from swift.container.auditor import ContainerAuditor
from swift.common import utils
@@ -26,32 +23,6 @@ if __name__ == '__main__':
if len(sys.argv) < 2:
print "Usage: swift-container-auditor CONFIG_FILE [once]"
sys.exit()
-
once = len(sys.argv) > 2 and sys.argv[2] == 'once'
-
conf = utils.readconf(sys.argv[1], 'container-auditor')
- logger = utils.get_logger(conf)
- # log uncaught exceptions
- sys.excepthook = lambda *exc_info: \
- logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
- sys.stdout = sys.stderr = utils.LoggerFileObject(logger)
-
- utils.drop_privileges(conf.get('user', 'swift'))
-
- try:
- os.setsid()
- except OSError:
- pass
-
- def kill_children(*args):
- signal.signal(signal.SIGTERM, signal.SIG_IGN)
- os.killpg(0, signal.SIGTERM)
- sys.exit()
-
- signal.signal(signal.SIGTERM, kill_children)
-
- auditor = ContainerAuditor(conf)
- if once:
- auditor.audit_once()
- else:
- auditor.audit_forever()
+ ContainerAuditor(conf).run(once)
diff --git a/bin/swift-container-replicator b/bin/swift-container-replicator
index 6e8ae0f41..a0594142d 100755
--- a/bin/swift-container-replicator
+++ b/bin/swift-container-replicator
@@ -15,31 +15,15 @@
# limitations under the License.
import sys
-from ConfigParser import ConfigParser
-import getopt
-from swift.container import server as container_server
-from swift.common import db, db_replicator, utils
-
-class ContainerReplicator(db_replicator.Replicator):
- server_type = 'container'
- ring_file = 'container.ring.gz'
- brokerclass = db.ContainerBroker
- datadir = container_server.DATADIR
- default_port = 6001
+from swift.common import db, utils
+from swift.container.replicator import ContainerReplicator
if __name__ == '__main__':
- optlist, args = getopt.getopt(sys.argv[1:], '', ['once'])
-
- if not args:
- print "Usage: swift-container-replicator <--once> CONFIG_FILE [once]"
+ if len(sys.argv) < 2:
+ print "Usage: swift-container-replicator CONFIG_FILE [once]"
sys.exit(1)
-
- once = len(args) > 1 and args[1] == 'once'
- conf = utils.readconf(args[0], 'container-replicator')
- utils.drop_privileges(conf.get('user', 'swift'))
- if once or '--once' in [opt[0] for opt in optlist]:
- ContainerReplicator(conf).replicate_once()
- else:
- ContainerReplicator(conf).replicate_forever()
+ once = len(sys.argv) > 2 and sys.argv[2] == 'once'
+ conf = utils.readconf(sys.argv[1], 'container-replicator')
+ ContainerReplicator(conf).run(once)
diff --git a/bin/swift-container-updater b/bin/swift-container-updater
index 92b7017fa..ed22d2990 100755
--- a/bin/swift-container-updater
+++ b/bin/swift-container-updater
@@ -14,10 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
-import signal
import sys
-from ConfigParser import ConfigParser
from swift.container.updater import ContainerUpdater
from swift.common import utils
@@ -26,25 +23,6 @@ if __name__ == '__main__':
if len(sys.argv) < 2:
print "Usage: swift-container-updater CONFIG_FILE [once]"
sys.exit()
-
once = len(sys.argv) > 2 and sys.argv[2] == 'once'
conf = utils.readconf(sys.argv[1], 'container-updater')
- utils.drop_privileges(conf.get('user', 'swift'))
-
- try:
- os.setsid()
- except OSError:
- pass
-
- def kill_children(*args):
- signal.signal(signal.SIGTERM, signal.SIG_IGN)
- os.killpg(0, signal.SIGTERM)
- sys.exit()
-
- signal.signal(signal.SIGTERM, kill_children)
-
- updater = ContainerUpdater(conf)
- if once:
- updater.update_once_single_threaded()
- else:
- updater.update_forever()
+ ContainerUpdater(conf).run(once)
diff --git a/bin/swift-object-auditor b/bin/swift-object-auditor
index a80065414..d60bcb614 100755
--- a/bin/swift-object-auditor
+++ b/bin/swift-object-auditor
@@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
-import signal
import sys
from swift.obj.auditor import ObjectAuditor
@@ -28,28 +26,4 @@ if __name__ == '__main__':
once = len(sys.argv) > 2 and sys.argv[2] == 'once'
conf = utils.readconf(sys.argv[1], 'object-auditor')
- logger = utils.get_logger(conf)
- # log uncaught exceptions
- sys.excepthook = lambda *exc_info: \
- logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
- sys.stdout = sys.stderr = utils.LoggerFileObject(logger)
-
- utils.drop_privileges(conf.get('user', 'swift'))
-
- try:
- os.setsid()
- except OSError:
- pass
-
- def kill_children(*args):
- signal.signal(signal.SIGTERM, signal.SIG_IGN)
- os.killpg(0, signal.SIGTERM)
- sys.exit()
-
- signal.signal(signal.SIGTERM, kill_children)
-
- auditor = ObjectAuditor(conf)
- if once:
- auditor.audit_once()
- else:
- auditor.audit_forever()
+ ObjectAuditor(conf).run(once)
diff --git a/bin/swift-object-replicator b/bin/swift-object-replicator
index c01c96aff..cc8c32fdd 100755
--- a/bin/swift-object-replicator
+++ b/bin/swift-object-replicator
@@ -15,54 +15,15 @@
# limitations under the License.
import sys
-import logging
-import time
-
-from eventlet import sleep, hubs
-hubs.use_hub('poll')
from swift.obj.replicator import ObjectReplicator
-from swift.common.utils import get_logger, drop_privileges, LoggerFileObject, \
- readconf
-
-TRUE_VALUES = set(('true', '1', 'yes', 'True', 'Yes'))
+from swift.common import utils
if __name__ == '__main__':
if len(sys.argv) < 2:
print "Usage: swift-object-replicator CONFIG_FILE [once]"
sys.exit()
- conf = readconf(sys.argv[1], "object-replicator")
- once = len(sys.argv) > 2 and sys.argv[2] == 'once'
- logger = get_logger(conf)
- # log uncaught exceptions
- sys.excepthook = lambda *exc_info: \
- logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
- sys.stdout = sys.stderr = LoggerFileObject(logger)
- drop_privileges(conf.get('user', 'swift'))
- if not once and conf.get('daemonize', 'true') in TRUE_VALUES:
- logger.info("Starting object replicator in daemon mode.")
- # Run the replicator continually
- while True:
- start = time.time()
- logger.info("Starting object replication pass.")
- # Run the replicator
- replicator = ObjectReplicator(conf, logger)
- replicator.run()
- total = (time.time() - start)/60
- # Reload the config
- logger.info("Object replication complete. (%.02f minutes)" % total)
- conf = read_configs(sys.argv[1])
- if conf.get('daemonize', 'true') not in TRUE_VALUES:
- # Stop running
- logger.info("Daemon mode turned off in config, stopping.")
- break
- logger.debug('Replication sleeping for %s seconds.' %
- conf['run_pause'])
- sleep(int(conf['run_pause']))
- else:
- start = time.time()
- logger.info("Running object replicator in script mode.")
- replicator = ObjectReplicator(conf, logger)
- replicator.run()
- total = (time.time() - start)/60
- logger.info("Object replication complete. (%.02f minutes)" % total)
+ conf = utils.readconf(sys.argv[1], "object-replicator")
+ once = (len(sys.argv) > 2 and sys.argv[2] == 'once') or \
+ conf.get('daemonize', 'true') not in utils.TRUE_VALUES
+ ObjectReplicator(conf).run(once)
diff --git a/bin/swift-object-updater b/bin/swift-object-updater
index 375e63ec8..d24cdbcfe 100755
--- a/bin/swift-object-updater
+++ b/bin/swift-object-updater
@@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
-import signal
import sys
from swift.obj.updater import ObjectUpdater
@@ -25,27 +23,6 @@ if __name__ == '__main__':
if len(sys.argv) < 2:
print "Usage: swift-object-updater CONFIG_FILE [once]"
sys.exit(1)
-
once = len(sys.argv) > 2 and sys.argv[2] == 'once'
-
conf = utils.readconf(sys.argv[1], 'object-updater')
- utils.drop_privileges(conf.get('user', 'swift'))
-
- try:
- os.setsid()
- except OSError:
- pass
-
- def kill_children(*args):
- signal.signal(signal.SIGTERM, signal.SIG_IGN)
- os.killpg(0, signal.SIGTERM)
- sys.exit()
-
- signal.signal(signal.SIGTERM, kill_children)
-
- updater = ObjectUpdater(conf)
- if once:
- updater.update_once_single_threaded()
- else:
- updater.update_forever()
-
+ ObjectUpdater(conf).run(once)
diff --git a/swift/account/auditor.py b/swift/account/auditor.py
index 249d9f436..b145f663a 100644
--- a/swift/account/auditor.py
+++ b/swift/account/auditor.py
@@ -26,16 +26,18 @@ from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger
+from swift.common.daemon import Daemon
class AuditException(Exception):
pass
-class AccountAuditor(object):
+class AccountAuditor(Daemon):
"""Audit accounts."""
def __init__(self, conf):
+ self.conf = conf
self.logger = get_logger(conf, 'account-auditor')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
@@ -60,12 +62,11 @@ class AccountAuditor(object):
"""
if not self.container_ring:
self.logger.debug(
-
'Loading container ring from %s' % self.container_ring_path)
self.container_ring = Ring(self.container_ring_path)
return self.container_ring
- def audit_forever(self): # pragma: no cover
+ def run_forever(self): # pragma: no cover
"""Run the account audit until stopped."""
reported = time.time()
time.sleep(random() * self.interval)
@@ -92,7 +93,7 @@ class AccountAuditor(object):
if elapsed < self.interval:
time.sleep(self.interval - elapsed)
- def audit_once(self):
+ def run_once(self):
"""Run the account audit once."""
self.logger.info('Begin account audit "once" mode')
begin = time.time()
diff --git a/swift/account/reaper.py b/swift/account/reaper.py
index f747e3ce0..d93b36371 100644
--- a/swift/account/reaper.py
+++ b/swift/account/reaper.py
@@ -27,9 +27,10 @@ from swift.common.direct_client import ClientException, \
direct_delete_container, direct_delete_object, direct_get_container
from swift.common.ring import Ring
from swift.common.utils import get_logger, whataremyips
+from swift.common.daemon import Daemon
-class AccountReaper(object):
+class AccountReaper(Daemon):
"""
Removes data from status=DELETED accounts. These are accounts that have
been asked to be removed by the reseller via services
@@ -51,6 +52,7 @@ class AccountReaper(object):
"""
def __init__(self, conf):
+ self.conf = conf
self.logger = get_logger(conf)
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
@@ -95,7 +97,7 @@ class AccountReaper(object):
self.object_ring = Ring(self.object_ring_path)
return self.object_ring
- def reap_forever(self):
+ def run_forever(self):
"""
Main entry point when running the reaper in its normal daemon mode.
This repeatedly calls :func:`reap_once` no quicker than the
@@ -110,7 +112,7 @@ class AccountReaper(object):
if elapsed < self.interval:
sleep(self.interval - elapsed)
- def reap_once(self):
+ def run_once(self):
"""
Main entry point when running the reaper in 'once' mode, where it will
do a single pass over all accounts on the server. This is called
diff --git a/swift/account/replicator.py b/swift/account/replicator.py
new file mode 100644
index 000000000..6d0ffb9ce
--- /dev/null
+++ b/swift/account/replicator.py
@@ -0,0 +1,26 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from swift.account import server as account_server
+from swift.common import db, db_replicator
+
+
+class AccountReplicator(db_replicator.Replicator):
+ server_type = 'account'
+ ring_file = 'account.ring.gz'
+ brokerclass = db.AccountBroker
+ datadir = account_server.DATADIR
+ default_port = 6002
+
diff --git a/swift/common/daemon.py b/swift/common/daemon.py
new file mode 100644
index 000000000..80d3eff4c
--- /dev/null
+++ b/swift/common/daemon.py
@@ -0,0 +1,60 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+import signal
+from swift.common import utils
+
+class Daemon(object):
+ """Daemon base class"""
+
+ def __init__(self, conf):
+ self.conf = conf
+ self.logger = utils.get_logger(conf, 'swift-daemon')
+
+ def run_once(self):
+ """Override this to run the script once"""
+ raise NotImplementedError('run_once not implemented')
+
+ def run_forever(self):
+ """Override this to run forever"""
+ raise NotImplementedError('run_forever not implemented')
+
+ def run(self, once=False):
+ """Run the daemon"""
+ # log uncaught exceptions
+ sys.excepthook = lambda *exc_info: \
+ self.logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
+ sys.stdout = sys.stderr = utils.LoggerFileObject(self.logger)
+
+ utils.drop_privileges(self.conf.get('user', 'swift'))
+
+ try:
+ os.setsid()
+ except OSError:
+ pass
+
+ def kill_children(*args):
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ os.killpg(0, signal.SIGTERM)
+ sys.exit()
+
+ signal.signal(signal.SIGTERM, kill_children)
+
+ if once:
+ self.run_once()
+ else:
+ self.run_forever()
diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py
index 3e7433fd3..c777930fc 100644
--- a/swift/common/db_replicator.py
+++ b/swift/common/db_replicator.py
@@ -33,6 +33,7 @@ from swift.common.utils import get_logger, whataremyips, storage_directory, \
from swift.common import ring
from swift.common.bufferedhttp import BufferedHTTPConnection
from swift.common.exceptions import DriveNotMounted, ConnectionTimeout
+from swift.common.daemon import Daemon
def quarantine_db(object_file, server_type):
@@ -84,14 +85,14 @@ class ReplConnection(BufferedHTTPConnection):
return None
-class Replicator(object):
+class Replicator(Daemon):
"""
Implements the logic for directing db replication.
"""
def __init__(self, conf):
- self.logger = \
- get_logger(conf)
+ self.conf = conf
+ self.logger = get_logger(conf)
# log uncaught exceptions
sys.excepthook = lambda * exc_info: \
self.logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
@@ -396,7 +397,7 @@ class Replicator(object):
except StopIteration:
its.remove(it)
- def replicate_once(self):
+ def run_once(self):
"""Run a replication pass once."""
self._zero_stats()
dirs = []
@@ -425,7 +426,7 @@ class Replicator(object):
self.logger.info('Replication run OVER')
self._report_stats()
- def replicate_forever(self):
+ def run_forever(self):
"""
Replicate dbs under the given root in an infinite loop.
"""
diff --git a/swift/common/utils.py b/swift/common/utils.py
index 920d54d64..0028581cb 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -55,6 +55,8 @@ _posix_fadvise = None
# will end up with would also require knowing this suffix.
HASH_PATH_SUFFIX = os.environ.get('SWIFT_HASH_PATH_SUFFIX', 'endcap')
+# Used when reading config values
+TRUE_VALUES = set(('true', '1', 'yes', 'True', 'Yes'))
def load_libc_function(func_name):
"""
diff --git a/swift/container/auditor.py b/swift/container/auditor.py
index a41bf5358..592f4b7cd 100644
--- a/swift/container/auditor.py
+++ b/swift/container/auditor.py
@@ -27,16 +27,18 @@ from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger
+from swift.common.daemon import Daemon
class AuditException(Exception):
pass
-class ContainerAuditor(object):
+class ContainerAuditor(Daemon):
"""Audit containers."""
def __init__(self, conf):
+ self.conf = conf
self.logger = get_logger(conf)
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
@@ -81,7 +83,7 @@ class ContainerAuditor(object):
self.object_ring = Ring(self.object_ring_path)
return self.object_ring
- def audit_forever(self): # pragma: no cover
+ def run_forever(self): # pragma: no cover
"""Run the container audit until stopped."""
reported = time.time()
time.sleep(random() * self.interval)
@@ -114,7 +116,7 @@ class ContainerAuditor(object):
if elapsed < self.interval:
time.sleep(self.interval - elapsed)
- def audit_once(self):
+ def run_once(self):
"""Run the container audit once."""
self.logger.info('Begin container audit "once" mode')
begin = time.time()
diff --git a/swift/container/replicator.py b/swift/container/replicator.py
new file mode 100644
index 000000000..c264ce680
--- /dev/null
+++ b/swift/container/replicator.py
@@ -0,0 +1,25 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from swift.container import server as container_server
+from swift.common import db, db_replicator
+
+class ContainerReplicator(db_replicator.Replicator):
+ server_type = 'container'
+ ring_file = 'container.ring.gz'
+ brokerclass = db.ContainerBroker
+ datadir = container_server.DATADIR
+ default_port = 6001
+
diff --git a/swift/container/updater.py b/swift/container/updater.py
index 003d06b13..646815257 100644
--- a/swift/container/updater.py
+++ b/swift/container/updater.py
@@ -28,12 +28,14 @@ from swift.common.db import ContainerBroker
from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, whataremyips
+from swift.common.daemon import Daemon
-class ContainerUpdater(object):
+class ContainerUpdater(Daemon):
"""Update container information in account listings."""
def __init__(self, conf):
+ self.conf = conf
self.logger = get_logger(conf, 'container-updater')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
@@ -78,7 +80,7 @@ class ContainerUpdater(object):
shuffle(paths)
return paths
- def update_forever(self): # pragma: no cover
+ def run_forever(self): # pragma: no cover
"""
Run the updator continuously.
"""
@@ -118,7 +120,7 @@ class ContainerUpdater(object):
if elapsed < self.interval:
time.sleep(self.interval - elapsed)
- def update_once_single_threaded(self):
+ def run_once(self):
"""
Run the updater once.
"""
diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py
index d9e2145d9..2e41a54f3 100644
--- a/swift/obj/auditor.py
+++ b/swift/obj/auditor.py
@@ -28,13 +28,15 @@ from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer
from swift.common.exceptions import AuditException
+from swift.common.daemon import Daemon
-class ObjectAuditor(object):
+class ObjectAuditor(Daemon):
"""Audit objects."""
def __init__(self, conf):
- self.logger = get_logger(conf)
+ self.conf = conf
+ self.logger = get_logger(conf, 'object-auditor')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
@@ -63,7 +65,7 @@ class ObjectAuditor(object):
self.container_ring = Ring(self.container_ring_path)
return self.container_ring
- def audit_forever(self): # pragma: no cover
+ def run_forever(self): # pragma: no cover
"""Run the object audit until stopped."""
reported = time.time()
time.sleep(random() * self.interval)
@@ -97,7 +99,7 @@ class ObjectAuditor(object):
if elapsed < self.interval:
time.sleep(self.interval - elapsed)
- def audit_once(self):
+ def run_once(self):
"""Run the object audit once."""
self.logger.info('Begin object audit "once" mode')
begin = time.time()
diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index 9567c3b7a..45a77980d 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -24,15 +24,17 @@ import itertools
import cPickle as pickle
import eventlet
-from eventlet import GreenPool, tpool, Timeout, sleep
+from eventlet import GreenPool, tpool, Timeout, sleep, hubs
from eventlet.green import subprocess
from eventlet.support.greenlets import GreenletExit
from swift.common.ring import Ring
from swift.common.utils import whataremyips, unlink_older_than, lock_path, \
- renamer, compute_eta
+ renamer, compute_eta, get_logger
from swift.common.bufferedhttp import http_connect
+from swift.common.daemon import Daemon
+hubs.use_hub('poll')
PICKLE_PROTOCOL = 2
ONE_WEEK = 604800
@@ -190,22 +192,22 @@ def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK):
return hashed, hashes
-class ObjectReplicator(object):
+class ObjectReplicator(Daemon):
"""
Replicate objects.
Encapsulates most logic and data needed by the object replication process.
- Each call to .run() performs one replication pass. It's up to the caller
- to do this in a loop.
+ Each call to .replicate() performs one replication pass. It's up to the
+ caller to do this in a loop.
"""
- def __init__(self, conf, logger):
+ def __init__(self, conf):
"""
:param conf: configuration object obtained from ConfigParser
:param logger: logging object
"""
self.conf = conf
- self.logger = logger
+ self.logger = get_logger(conf, 'object-replicator')
self.devices_dir = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
@@ -221,6 +223,7 @@ class ObjectReplicator(object):
self.next_check = time.time() + self.ring_check_interval
self.reclaim_age = int(conf.get('reclaim_age', 86400 * 7))
self.partition_times = []
+ self.run_pause = int(conf.get('run_pause', 30))
def _rsync(self, args):
"""
@@ -450,7 +453,7 @@ class ObjectReplicator(object):
eventlet.sleep(300)
self.stats_line()
- def run(self):
+ def replicate(self):
"""Run a replication pass"""
self.start = time.time()
self.suffix_count = 0
@@ -506,3 +509,26 @@ class ObjectReplicator(object):
self.kill_coros()
self.stats_line()
stats.kill()
+
+ def run_once(self):
+ start = time.time()
+ self.logger.info("Running object replicator in script mode.")
+ self.replicate()
+ total = (time.time() - start)/60
+ self.logger.info(
+ "Object replication complete. (%.02f minutes)" % total)
+
+ def run_forever(self):
+ self.logger.info("Starting object replicator in daemon mode.")
+ # Run the replicator continually
+ while True:
+ start = time.time()
+ self.logger.info("Starting object replication pass.")
+ # Run the replicator
+ self.replicate()
+ total = (time.time() - start)/60
+ self.logger.info(
+ "Object replication complete. (%.02f minutes)" % total)
+ self.logger.debug('Replication sleeping for %s seconds.' %
+ self.run_pause)
+ sleep(self.run_pause)
diff --git a/swift/obj/updater.py b/swift/obj/updater.py
index 7fa918282..40121e4b9 100644
--- a/swift/obj/updater.py
+++ b/swift/obj/updater.py
@@ -26,14 +26,16 @@ from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer
+from swift.common.daemon import Daemon
from swift.obj.server import ASYNCDIR
-class ObjectUpdater(object):
+class ObjectUpdater(Daemon):
"""Update object information in container listings."""
def __init__(self, conf):
- self.logger = get_logger(conf)
+ self.conf = conf
+ self.logger = get_logger(conf, 'object-updater')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
@@ -56,7 +58,7 @@ class ObjectUpdater(object):
self.container_ring = Ring(self.container_ring_path)
return self.container_ring
- def update_forever(self): # pragma: no cover
+ def run_forever(self): # pragma: no cover
"""Run the updater continuously."""
time.sleep(random() * self.interval)
while True:
@@ -95,7 +97,7 @@ class ObjectUpdater(object):
if elapsed < self.interval:
time.sleep(self.interval - elapsed)
- def update_once_single_threaded(self):
+ def run_once(self):
"""Run the updater once"""
self.logger.info('Begin object update single threaded sweep')
begin = time.time()
diff --git a/test/unit/common/test_db_replicator.py b/test/unit/common/test_db_replicator.py
index 65fe149ff..1ffe1e923 100644
--- a/test/unit/common/test_db_replicator.py
+++ b/test/unit/common/test_db_replicator.py
@@ -170,9 +170,9 @@ class TestDBReplicator(unittest.TestCase):
{'id': 'a', 'point': -1, 'max_row': 10, 'hash': 'd'},
FakeBroker(), -1)), False)
- def test_replicate_once(self):
+ def test_run_once(self):
replicator = TestReplicator({})
- replicator.replicate_once()
+ replicator.run_once()
def test_usync(self):
fake_http = ReplHttp()
diff --git a/test/unit/container/test_updater.py b/test/unit/container/test_updater.py
index 108334363..35e762916 100644
--- a/test/unit/container/test_updater.py
+++ b/test/unit/container/test_updater.py
@@ -77,7 +77,7 @@ class TestContainerUpdater(unittest.TestCase):
self.assertEquals(cu.node_timeout, 5)
self.assert_(cu.get_account_ring() is not None)
- def test_update_once_single_threaded(self):
+ def test_run_once(self):
cu = container_updater.ContainerUpdater({
'devices': self.devices_dir,
'mount_check': 'false',
@@ -86,17 +86,17 @@ class TestContainerUpdater(unittest.TestCase):
'concurrency': '1',
'node_timeout': '15',
})
- cu.update_once_single_threaded()
+ cu.run_once()
containers_dir = os.path.join(self.sda1, container_server.DATADIR)
os.mkdir(containers_dir)
- cu.update_once_single_threaded()
+ cu.run_once()
self.assert_(os.path.exists(containers_dir))
subdir = os.path.join(containers_dir, 'subdir')
os.mkdir(subdir)
cb = ContainerBroker(os.path.join(subdir, 'hash.db'), account='a',
container='c')
cb.initialize(normalize_timestamp(1))
- cu.update_once_single_threaded()
+ cu.run_once()
info = cb.get_info()
self.assertEquals(info['object_count'], 0)
self.assertEquals(info['bytes_used'], 0)
@@ -105,7 +105,7 @@ class TestContainerUpdater(unittest.TestCase):
cb.put_object('o', normalize_timestamp(2), 3, 'text/plain',
'68b329da9893e34099c7d8ad5cb9c940')
- cu.update_once_single_threaded()
+ cu.run_once()
info = cb.get_info()
self.assertEquals(info['object_count'], 1)
self.assertEquals(info['bytes_used'], 3)
@@ -148,7 +148,7 @@ class TestContainerUpdater(unittest.TestCase):
for dev in cu.get_account_ring().devs:
if dev is not None:
dev['port'] = bindsock.getsockname()[1]
- cu.update_once_single_threaded()
+ cu.run_once()
for event in spawned.wait():
err = event.wait()
if err:
@@ -202,7 +202,7 @@ class TestContainerUpdater(unittest.TestCase):
for dev in cu.get_account_ring().devs:
if dev is not None:
dev['port'] = bindsock.getsockname()[1]
- cu.update_once_single_threaded()
+ cu.run_once()
for event in spawned.wait():
err = event.wait()
if err:
diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py
index be04f9ee0..06f7d7458 100644
--- a/test/unit/obj/test_replicator.py
+++ b/test/unit/obj/test_replicator.py
@@ -105,7 +105,7 @@ class TestObjectReplicator(unittest.TestCase):
swift_dir=self.testdir, devices=self.devices, mount_check='false',
timeout='300', stats_interval='1')
self.replicator = object_replicator.ObjectReplicator(
- self.conf, null_logger)
+ self.conf)
# def test_check_ring(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
@@ -184,11 +184,11 @@ class TestObjectReplicator(unittest.TestCase):
def test_run(self):
with _mock_process([(0,'')]*100):
- self.replicator.run()
+ self.replicator.replicate()
def test_run_withlog(self):
with _mock_process([(0,"stuff in log")]*100):
- self.replicator.run()
+ self.replicator.replicate()
if __name__ == '__main__':
unittest.main()
diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py
index 0d71dc966..0064e2cbd 100644
--- a/test/unit/obj/test_updater.py
+++ b/test/unit/obj/test_updater.py
@@ -69,7 +69,7 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEquals(cu.node_timeout, 5)
self.assert_(cu.get_container_ring() is not None)
- def test_update_once_single_threaded(self):
+ def test_run_once(self):
cu = object_updater.ObjectUpdater({
'devices': self.devices_dir,
'mount_check': 'false',
@@ -78,15 +78,15 @@ class TestObjectUpdater(unittest.TestCase):
'concurrency': '1',
'node_timeout': '15',
})
- cu.update_once_single_threaded()
+ cu.run_once()
async_dir = os.path.join(self.sda1, object_server.ASYNCDIR)
os.mkdir(async_dir)
- cu.update_once_single_threaded()
+ cu.run_once()
self.assert_(os.path.exists(async_dir))
odd_dir = os.path.join(async_dir, 'not really supposed to be here')
os.mkdir(odd_dir)
- cu.update_once_single_threaded()
+ cu.run_once()
self.assert_(os.path.exists(async_dir))
self.assert_(not os.path.exists(odd_dir))
@@ -98,7 +98,7 @@ class TestObjectUpdater(unittest.TestCase):
pickle.dump({'op': 'PUT', 'account': 'a', 'container': 'c', 'obj': 'o',
'headers': {'X-Container-Timestamp': normalize_timestamp(0)}},
open(op_path, 'wb'))
- cu.update_once_single_threaded()
+ cu.run_once()
self.assert_(os.path.exists(op_path))
bindsock = listen(('127.0.0.1', 0))
@@ -140,7 +140,7 @@ class TestObjectUpdater(unittest.TestCase):
for dev in cu.get_container_ring().devs:
if dev is not None:
dev['port'] = bindsock.getsockname()[1]
- cu.update_once_single_threaded()
+ cu.run_once()
err = event.wait()
if err:
raise err