diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-06-12 14:12:30 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-06-12 14:12:30 +0000 |
commit | 25cd2ffe2c50aacea40e207def4e49aa5b8f32a8 (patch) | |
tree | 82d20d9b6316fa0697ccac73508b2f5db2bc5e26 | |
parent | 06f66e490b3de403784fb30bb22b0dab1a1c4e78 (diff) | |
parent | b3307b1d6cd38572a78ca9f072b2e7ccd8cbd398 (diff) | |
download | ceilometer-25cd2ffe2c50aacea40e207def4e49aa5b8f32a8.tar.gz |
Merge "[MongoDB] Fix bug with reconnection to new master node" into stable/juno
-rw-r--r-- | ceilometer/alarm/storage/impl_db2.py | 2 | ||||
-rw-r--r-- | ceilometer/alarm/storage/impl_mongodb.py | 2 | ||||
-rw-r--r-- | ceilometer/storage/__init__.py | 4 | ||||
-rw-r--r-- | ceilometer/storage/impl_db2.py | 2 | ||||
-rw-r--r-- | ceilometer/storage/impl_mongodb.py | 2 | ||||
-rw-r--r-- | ceilometer/storage/mongo/utils.py | 119 | ||||
-rw-r--r-- | ceilometer/tests/storage/test_pymongo_base.py | 42 | ||||
-rw-r--r-- | ceilometer/tests/storage/test_storage_scenarios.py | 88 |
8 files changed, 196 insertions, 65 deletions
diff --git a/ceilometer/alarm/storage/impl_db2.py b/ceilometer/alarm/storage/impl_db2.py index 9ca37f20..92db547f 100644 --- a/ceilometer/alarm/storage/impl_db2.py +++ b/ceilometer/alarm/storage/impl_db2.py @@ -73,5 +73,5 @@ class Connection(pymongo_base.Connection): # not been implemented. However calling this method is important for # removal of all the empty dbs created during the test runs since # test run is against mongodb on Jenkins - self.conn.drop_database(self.db) + self.conn.drop_database(self.db.name) self.conn.close() diff --git a/ceilometer/alarm/storage/impl_mongodb.py b/ceilometer/alarm/storage/impl_mongodb.py index 19fff006..60c0ca4f 100644 --- a/ceilometer/alarm/storage/impl_mongodb.py +++ b/ceilometer/alarm/storage/impl_mongodb.py @@ -63,6 +63,6 @@ class Connection(pymongo_base.Connection): self.upgrade() def clear(self): - self.conn.drop_database(self.db) + self.conn.drop_database(self.db.name) # Connection will be reopened automatically if needed self.conn.close() diff --git a/ceilometer/storage/__init__.py b/ceilometer/storage/__init__.py index 36d91cc7..b3383631 100644 --- a/ceilometer/storage/__init__.py +++ b/ceilometer/storage/__init__.py @@ -53,6 +53,10 @@ STORAGE_OPTS = [ default=None, help='The connection string used to connect to the alarm ' 'database. (if unset, connection is used)'), + cfg.StrOpt('mongodb_replica_set', + default='', + help="The connection string used to connect to mongo database, " + "if mongodb replica set was chosen."), ] cfg.CONF.register_opts(STORAGE_OPTS, group='database') diff --git a/ceilometer/storage/impl_db2.py b/ceilometer/storage/impl_db2.py index 43261625..e5c819e2 100644 --- a/ceilometer/storage/impl_db2.py +++ b/ceilometer/storage/impl_db2.py @@ -200,7 +200,7 @@ class Connection(pymongo_base.Connection): # not been implemented. However calling this method is important for # removal of all the empty dbs created during the test runs since # test run is against mongodb on Jenkins - self.conn.drop_database(self.db) + self.conn.drop_database(self.db.name) self.conn.close() def record_metering_data(self, data): diff --git a/ceilometer/storage/impl_mongodb.py b/ceilometer/storage/impl_mongodb.py index b9ce7a96..490eb52a 100644 --- a/ceilometer/storage/impl_mongodb.py +++ b/ceilometer/storage/impl_mongodb.py @@ -470,7 +470,7 @@ class Connection(pymongo_base.Connection): ) def clear(self): - self.conn.drop_database(self.db) + self.conn.drop_database(self.db.name) # Connection will be reopened automatically if needed self.conn.close() diff --git a/ceilometer/storage/mongo/utils.py b/ceilometer/storage/mongo/utils.py index 5aa8832f..3d851e0d 100644 --- a/ceilometer/storage/mongo/utils.py +++ b/ceilometer/storage/mongo/utils.py @@ -179,26 +179,21 @@ class ConnectionPool(object): @staticmethod def _mongo_connect(url): - max_retries = cfg.CONF.database.max_retries - retry_interval = cfg.CONF.database.retry_interval - attempts = 0 - while True: - try: - client = pymongo.MongoClient(url, safe=True) - except pymongo.errors.ConnectionFailure as e: - if 0 <= max_retries <= attempts: - LOG.error(_('Unable to connect to the database after ' - '%(retries)d retries. Giving up.') % - {'retries': max_retries}) - raise - LOG.warn(_('Unable to connect to the database server: ' - '%(errmsg)s. Trying again in %(retry_interval)d ' - 'seconds.') % - {'errmsg': e, 'retry_interval': retry_interval}) - attempts += 1 - time.sleep(retry_interval) + try: + if cfg.CONF.database.mongodb_replica_set: + client = MongoProxy( + Prefection( + pymongo.MongoReplicaSetClient( + url, + replicaSet=cfg.CONF.database.mongodb_replica_set))) else: - return client + client = MongoProxy( + Prefection(pymongo.MongoClient(url, safe=True))) + return client + except pymongo.errors.ConnectionFailure as e: + LOG.warn(_('Unable to connect to the database server: ' + '%(errmsg)s.') % {'errmsg': e}) + raise class QueryTransformer(object): @@ -321,3 +316,89 @@ class QueryTransformer(object): return self._handle_not_op(negated_tree) return self._handle_simple_op(operator_node, nodes) + + +def safe_mongo_call(call): + def closure(*args, **kwargs): + max_retries = cfg.CONF.database.max_retries + retry_interval = cfg.CONF.database.retry_interval + attempts = 0 + while True: + try: + return call(*args, **kwargs) + except pymongo.errors.AutoReconnect as err: + if 0 <= max_retries <= attempts: + LOG.error(_('Unable to reconnect to the primary mongodb ' + 'after %(retries)d retries. Giving up.') % + {'retries': max_retries}) + raise + LOG.warn(_('Unable to reconnect to the primary mongodb: ' + '%(errmsg)s. Trying again in %(retry_interval)d ' + 'seconds.') % + {'errmsg': err, 'retry_interval': retry_interval}) + attempts += 1 + time.sleep(retry_interval) + return closure + + +class MongoConn(object): + def __init__(self, method): + self.method = method + + @safe_mongo_call + def __call__(self, *args, **kwargs): + return self.method(*args, **kwargs) + +MONGO_METHODS = set([typ for typ in dir(pymongo.collection.Collection) + if not typ.startswith('_')]) +MONGO_METHODS.update(set([typ for typ in dir(pymongo.MongoClient) + if not typ.startswith('_')])) +MONGO_METHODS.update(set([typ for typ in dir(pymongo) + if not typ.startswith('_')])) + + +class MongoProxy(object): + def __init__(self, conn): + self.conn = conn + + def __getitem__(self, item): + """Create and return proxy around the method in the connection. + + :param item: name of the connection + """ + return MongoProxy(self.conn[item]) + + def __getattr__(self, item): + """Wrap MongoDB connection. + + If item is the name of an executable method, for example find or + insert, wrap this method in the MongoConn. + Else wrap getting attribute with MongoProxy. + """ + if item == 'name': + return getattr(self.conn, item) + if item in MONGO_METHODS: + return MongoConn(getattr(self.conn, item)) + return MongoProxy(getattr(self.conn, item)) + + def __call__(self, *args, **kwargs): + return self.conn(*args, **kwargs) + + +class Prefection(pymongo.collection.Collection): + def __init__(self, conn): + self.conn = conn + + def find(self, *args, **kwargs): + # We need this modifying method to check a connection for MongoDB + # in context of MongoProxy approach. Initially 'find' returns Cursor + # object and doesn't connect to db while Cursor is not used. + found = self.find(*args, **kwargs) + try: + found[0] + except IndexError: + pass + return found + + def __getattr__(self, item): + return getattr(self.conn, item) diff --git a/ceilometer/tests/storage/test_pymongo_base.py b/ceilometer/tests/storage/test_pymongo_base.py index c40bc7c3..28c40a97 100644 --- a/ceilometer/tests/storage/test_pymongo_base.py +++ b/ceilometer/tests/storage/test_pymongo_base.py @@ -12,17 +12,13 @@ """Tests the mongodb and db2 common functionality """ -import contextlib import copy import datetime import mock -import pymongo -from ceilometer.openstack.common.gettextutils import _ from ceilometer.publisher import utils from ceilometer import sample -from ceilometer.storage.mongo import utils as pymongo_utils from ceilometer.tests import db as tests_db from ceilometer.tests.storage import test_storage_scenarios @@ -168,41 +164,3 @@ class CompatibilityTest(test_storage_scenarios.DBTestBase, def test_counter_unit(self): meters = list(self.conn.get_meters()) self.assertEqual(1, len(meters)) - - def test_mongodb_connect_raises_after_custom_number_of_attempts(self): - retry_interval = 13 - max_retries = 37 - self.CONF.set_override( - 'retry_interval', retry_interval, group='database') - self.CONF.set_override( - 'max_retries', max_retries, group='database') - # PyMongo is being used to connect even to DB2, but it only - # accepts URLs with the 'mongodb' scheme. This replacement is - # usually done in the DB2 connection implementation, but since - # we don't call that, we have to do it here. - self.CONF.set_override( - 'connection', self.db_manager.url.replace('db2:', 'mongodb:', 1), - group='database') - - pool = pymongo_utils.ConnectionPool() - with contextlib.nested( - mock.patch( - 'pymongo.MongoClient', - side_effect=pymongo.errors.ConnectionFailure('foo')), - mock.patch.object(pymongo_utils.LOG, 'error'), - mock.patch.object(pymongo_utils.LOG, 'warn'), - mock.patch.object(pymongo_utils.time, 'sleep') - ) as (MockMongo, MockLOGerror, MockLOGwarn, Mocksleep): - self.assertRaises(pymongo.errors.ConnectionFailure, - pool.connect, self.CONF.database.connection) - Mocksleep.assert_has_calls([mock.call(retry_interval) - for i in range(max_retries)]) - MockLOGwarn.assert_any_call( - _('Unable to connect to the database server: %(errmsg)s.' - ' Trying again in %(retry_interval)d seconds.') % - {'errmsg': 'foo', - 'retry_interval': retry_interval}) - MockLOGerror.assert_called_with( - _('Unable to connect to the database after ' - '%(retries)d retries. Giving up.') % - {'retries': max_retries}) diff --git a/ceilometer/tests/storage/test_storage_scenarios.py b/ceilometer/tests/storage/test_storage_scenarios.py index 5318e999..80243784 100644 --- a/ceilometer/tests/storage/test_storage_scenarios.py +++ b/ceilometer/tests/storage/test_storage_scenarios.py @@ -23,7 +23,9 @@ import datetime import operator import mock +from oslo.config import cfg from oslo.utils import timeutils +import pymongo import ceilometer from ceilometer.alarm.storage import models as alarm_models @@ -3099,3 +3101,89 @@ class BigIntegerTest(tests_db.TestBase, msg = utils.meter_message_from_counter( s, self.CONF.publisher.metering_secret) self.conn.record_metering_data(msg) + + +class MongoAutoReconnectTest(DBTestBase, + tests_db.MixinTestsWithBackendScenarios): + cfg.CONF.set_override('retry_interval', 1, group='database') + + @tests_db.run_with('mongodb') + def test_mongo_client(self): + if cfg.CONF.database.mongodb_replica_set: + self.assertIsInstance(self.conn.conn.conn.conn, + pymongo.MongoReplicaSetClient) + else: + self.assertIsInstance(self.conn.conn.conn.conn, + pymongo.MongoClient) + + @staticmethod + def create_side_effect(method, test_exception): + def side_effect(*args, **kwargs): + if test_exception.pop(): + raise pymongo.errors.AutoReconnect + else: + return method(*args, **kwargs) + return side_effect + + @tests_db.run_with('mongodb') + def test_mongo_find(self): + raise_exc = [False, True] + method = self.conn.db.resource.find + + with mock.patch('pymongo.collection.Collection.find', + mock.Mock()) as mock_find: + mock_find.side_effect = self.create_side_effect(method, raise_exc) + mock_find.__name__ = 'find' + resources = list(self.conn.get_resources()) + self.assertEqual(9, len(resources)) + + @tests_db.run_with('mongodb') + def test_mongo_insert(self): + raise_exc = [False, True] + method = self.conn.db.meter.insert + + with mock.patch('pymongo.collection.Collection.insert', + mock.Mock(return_value=method)) as mock_insert: + mock_insert.side_effect = self.create_side_effect(method, + raise_exc) + mock_insert.__name__ = 'insert' + self.create_and_store_sample( + timestamp=datetime.datetime(2014, 10, 15, 14, 39), + source='test-proxy') + meters = list(self.conn.db.meter.find()) + self.assertEqual(12, len(meters)) + + @tests_db.run_with('mongodb') + def test_mongo_find_and_modify(self): + raise_exc = [False, True] + method = self.conn.db.resource.find_and_modify + + with mock.patch('pymongo.collection.Collection.find_and_modify', + mock.Mock()) as mock_fam: + mock_fam.side_effect = self.create_side_effect(method, raise_exc) + mock_fam.__name__ = 'find_and_modify' + self.create_and_store_sample( + timestamp=datetime.datetime(2014, 10, 15, 14, 39), + source='test-proxy') + data = self.conn.db.resource.find( + {'last_sample_timestamp': + datetime.datetime(2014, 10, 15, 14, 39)})[0]['source'] + self.assertEqual('test-proxy', data) + + @tests_db.run_with('mongodb') + def test_mongo_update(self): + raise_exc = [False, True] + method = self.conn.db.resource.update + + with mock.patch('pymongo.collection.Collection.update', + mock.Mock()) as mock_update: + mock_update.side_effect = self.create_side_effect(method, + raise_exc) + mock_update.__name__ = 'update' + self.create_and_store_sample( + timestamp=datetime.datetime(2014, 10, 15, 17, 39), + source='test-proxy-update') + data = self.conn.db.resource.find( + {'last_sample_timestamp': + datetime.datetime(2014, 10, 15, 17, 39)})[0]['source'] + self.assertEqual('test-proxy-update', data) |