summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIgor Degtiarov <idegtiarov@mirantis.com>2014-10-23 14:05:38 +0300
committerBilly Olsen <billy.olsen@canonical.com>2015-05-28 11:04:22 -0700
commitb3307b1d6cd38572a78ca9f072b2e7ccd8cbd398 (patch)
treeeb1dcb2567777674292b64ab18ba36dc39428e8b
parentff882bebf20deb04880c7aef37a8565a10f742c2 (diff)
downloadceilometer-b3307b1d6cd38572a78ca9f072b2e7ccd8cbd398.tar.gz
[MongoDB] Fix bug with reconnection to new master node
Fixes bug with raising AutoReconnect exception when MongoDB ReplicaSet loses connection to primary node. Closes-Bug: #1309555 Conflicts: ceilometer/event/storage/impl_db2.py ceilometer/event/storage/impl_mongodb.py ceilometer/storage/__init__.py ceilometer/storage/mongo/utils.py ceilometer/tests/storage/test_pymongo_base.py Conflicts are due to refactoring of the storage drivers and this patch has been modified to account for the refactor. The test case within the file test_pymongo_base.py was removed since equivalent test coverage was included in the cherry picked commit in the test_storage_scenarios.py file. Change-Id: Id0e81ba60b28d09adff6a10d04b412f25257d8ce (cherry-picked from commit 21d882c96cbbaeb8b78ff91e06e3615be97bff07)
-rw-r--r--ceilometer/alarm/storage/impl_db2.py2
-rw-r--r--ceilometer/alarm/storage/impl_mongodb.py2
-rw-r--r--ceilometer/storage/__init__.py4
-rw-r--r--ceilometer/storage/impl_db2.py2
-rw-r--r--ceilometer/storage/impl_mongodb.py2
-rw-r--r--ceilometer/storage/mongo/utils.py119
-rw-r--r--ceilometer/tests/storage/test_pymongo_base.py42
-rw-r--r--ceilometer/tests/storage/test_storage_scenarios.py88
8 files changed, 196 insertions, 65 deletions
diff --git a/ceilometer/alarm/storage/impl_db2.py b/ceilometer/alarm/storage/impl_db2.py
index 9ca37f20..92db547f 100644
--- a/ceilometer/alarm/storage/impl_db2.py
+++ b/ceilometer/alarm/storage/impl_db2.py
@@ -73,5 +73,5 @@ class Connection(pymongo_base.Connection):
# not been implemented. However calling this method is important for
# removal of all the empty dbs created during the test runs since
# test run is against mongodb on Jenkins
- self.conn.drop_database(self.db)
+ self.conn.drop_database(self.db.name)
self.conn.close()
diff --git a/ceilometer/alarm/storage/impl_mongodb.py b/ceilometer/alarm/storage/impl_mongodb.py
index 19fff006..60c0ca4f 100644
--- a/ceilometer/alarm/storage/impl_mongodb.py
+++ b/ceilometer/alarm/storage/impl_mongodb.py
@@ -63,6 +63,6 @@ class Connection(pymongo_base.Connection):
self.upgrade()
def clear(self):
- self.conn.drop_database(self.db)
+ self.conn.drop_database(self.db.name)
# Connection will be reopened automatically if needed
self.conn.close()
diff --git a/ceilometer/storage/__init__.py b/ceilometer/storage/__init__.py
index 36d91cc7..b3383631 100644
--- a/ceilometer/storage/__init__.py
+++ b/ceilometer/storage/__init__.py
@@ -53,6 +53,10 @@ STORAGE_OPTS = [
default=None,
help='The connection string used to connect to the alarm '
'database. (if unset, connection is used)'),
+ cfg.StrOpt('mongodb_replica_set',
+ default='',
+ help="The connection string used to connect to mongo database, "
+ "if mongodb replica set was chosen."),
]
cfg.CONF.register_opts(STORAGE_OPTS, group='database')
diff --git a/ceilometer/storage/impl_db2.py b/ceilometer/storage/impl_db2.py
index 43261625..e5c819e2 100644
--- a/ceilometer/storage/impl_db2.py
+++ b/ceilometer/storage/impl_db2.py
@@ -200,7 +200,7 @@ class Connection(pymongo_base.Connection):
# not been implemented. However calling this method is important for
# removal of all the empty dbs created during the test runs since
# test run is against mongodb on Jenkins
- self.conn.drop_database(self.db)
+ self.conn.drop_database(self.db.name)
self.conn.close()
def record_metering_data(self, data):
diff --git a/ceilometer/storage/impl_mongodb.py b/ceilometer/storage/impl_mongodb.py
index b9ce7a96..490eb52a 100644
--- a/ceilometer/storage/impl_mongodb.py
+++ b/ceilometer/storage/impl_mongodb.py
@@ -470,7 +470,7 @@ class Connection(pymongo_base.Connection):
)
def clear(self):
- self.conn.drop_database(self.db)
+ self.conn.drop_database(self.db.name)
# Connection will be reopened automatically if needed
self.conn.close()
diff --git a/ceilometer/storage/mongo/utils.py b/ceilometer/storage/mongo/utils.py
index 5aa8832f..3d851e0d 100644
--- a/ceilometer/storage/mongo/utils.py
+++ b/ceilometer/storage/mongo/utils.py
@@ -179,26 +179,21 @@ class ConnectionPool(object):
@staticmethod
def _mongo_connect(url):
- max_retries = cfg.CONF.database.max_retries
- retry_interval = cfg.CONF.database.retry_interval
- attempts = 0
- while True:
- try:
- client = pymongo.MongoClient(url, safe=True)
- except pymongo.errors.ConnectionFailure as e:
- if 0 <= max_retries <= attempts:
- LOG.error(_('Unable to connect to the database after '
- '%(retries)d retries. Giving up.') %
- {'retries': max_retries})
- raise
- LOG.warn(_('Unable to connect to the database server: '
- '%(errmsg)s. Trying again in %(retry_interval)d '
- 'seconds.') %
- {'errmsg': e, 'retry_interval': retry_interval})
- attempts += 1
- time.sleep(retry_interval)
+ try:
+ if cfg.CONF.database.mongodb_replica_set:
+ client = MongoProxy(
+ Prefection(
+ pymongo.MongoReplicaSetClient(
+ url,
+ replicaSet=cfg.CONF.database.mongodb_replica_set)))
else:
- return client
+ client = MongoProxy(
+ Prefection(pymongo.MongoClient(url, safe=True)))
+ return client
+ except pymongo.errors.ConnectionFailure as e:
+ LOG.warn(_('Unable to connect to the database server: '
+ '%(errmsg)s.') % {'errmsg': e})
+ raise
class QueryTransformer(object):
@@ -321,3 +316,89 @@ class QueryTransformer(object):
return self._handle_not_op(negated_tree)
return self._handle_simple_op(operator_node, nodes)
+
+
+def safe_mongo_call(call):
+ def closure(*args, **kwargs):
+ max_retries = cfg.CONF.database.max_retries
+ retry_interval = cfg.CONF.database.retry_interval
+ attempts = 0
+ while True:
+ try:
+ return call(*args, **kwargs)
+ except pymongo.errors.AutoReconnect as err:
+ if 0 <= max_retries <= attempts:
+ LOG.error(_('Unable to reconnect to the primary mongodb '
+ 'after %(retries)d retries. Giving up.') %
+ {'retries': max_retries})
+ raise
+ LOG.warn(_('Unable to reconnect to the primary mongodb: '
+ '%(errmsg)s. Trying again in %(retry_interval)d '
+ 'seconds.') %
+ {'errmsg': err, 'retry_interval': retry_interval})
+ attempts += 1
+ time.sleep(retry_interval)
+ return closure
+
+
+class MongoConn(object):
+ def __init__(self, method):
+ self.method = method
+
+ @safe_mongo_call
+ def __call__(self, *args, **kwargs):
+ return self.method(*args, **kwargs)
+
+MONGO_METHODS = set([typ for typ in dir(pymongo.collection.Collection)
+ if not typ.startswith('_')])
+MONGO_METHODS.update(set([typ for typ in dir(pymongo.MongoClient)
+ if not typ.startswith('_')]))
+MONGO_METHODS.update(set([typ for typ in dir(pymongo)
+ if not typ.startswith('_')]))
+
+
+class MongoProxy(object):
+ def __init__(self, conn):
+ self.conn = conn
+
+ def __getitem__(self, item):
+ """Create and return proxy around the method in the connection.
+
+ :param item: name of the connection
+ """
+ return MongoProxy(self.conn[item])
+
+ def __getattr__(self, item):
+ """Wrap MongoDB connection.
+
+ If item is the name of an executable method, for example find or
+ insert, wrap this method in the MongoConn.
+ Else wrap getting attribute with MongoProxy.
+ """
+ if item == 'name':
+ return getattr(self.conn, item)
+ if item in MONGO_METHODS:
+ return MongoConn(getattr(self.conn, item))
+ return MongoProxy(getattr(self.conn, item))
+
+ def __call__(self, *args, **kwargs):
+ return self.conn(*args, **kwargs)
+
+
+class Prefection(pymongo.collection.Collection):
+ def __init__(self, conn):
+ self.conn = conn
+
+ def find(self, *args, **kwargs):
+ # We need this modifying method to check a connection for MongoDB
+ # in context of MongoProxy approach. Initially 'find' returns Cursor
+ # object and doesn't connect to db while Cursor is not used.
+ found = self.find(*args, **kwargs)
+ try:
+ found[0]
+ except IndexError:
+ pass
+ return found
+
+ def __getattr__(self, item):
+ return getattr(self.conn, item)
diff --git a/ceilometer/tests/storage/test_pymongo_base.py b/ceilometer/tests/storage/test_pymongo_base.py
index c40bc7c3..28c40a97 100644
--- a/ceilometer/tests/storage/test_pymongo_base.py
+++ b/ceilometer/tests/storage/test_pymongo_base.py
@@ -12,17 +12,13 @@
"""Tests the mongodb and db2 common functionality
"""
-import contextlib
import copy
import datetime
import mock
-import pymongo
-from ceilometer.openstack.common.gettextutils import _
from ceilometer.publisher import utils
from ceilometer import sample
-from ceilometer.storage.mongo import utils as pymongo_utils
from ceilometer.tests import db as tests_db
from ceilometer.tests.storage import test_storage_scenarios
@@ -168,41 +164,3 @@ class CompatibilityTest(test_storage_scenarios.DBTestBase,
def test_counter_unit(self):
meters = list(self.conn.get_meters())
self.assertEqual(1, len(meters))
-
- def test_mongodb_connect_raises_after_custom_number_of_attempts(self):
- retry_interval = 13
- max_retries = 37
- self.CONF.set_override(
- 'retry_interval', retry_interval, group='database')
- self.CONF.set_override(
- 'max_retries', max_retries, group='database')
- # PyMongo is being used to connect even to DB2, but it only
- # accepts URLs with the 'mongodb' scheme. This replacement is
- # usually done in the DB2 connection implementation, but since
- # we don't call that, we have to do it here.
- self.CONF.set_override(
- 'connection', self.db_manager.url.replace('db2:', 'mongodb:', 1),
- group='database')
-
- pool = pymongo_utils.ConnectionPool()
- with contextlib.nested(
- mock.patch(
- 'pymongo.MongoClient',
- side_effect=pymongo.errors.ConnectionFailure('foo')),
- mock.patch.object(pymongo_utils.LOG, 'error'),
- mock.patch.object(pymongo_utils.LOG, 'warn'),
- mock.patch.object(pymongo_utils.time, 'sleep')
- ) as (MockMongo, MockLOGerror, MockLOGwarn, Mocksleep):
- self.assertRaises(pymongo.errors.ConnectionFailure,
- pool.connect, self.CONF.database.connection)
- Mocksleep.assert_has_calls([mock.call(retry_interval)
- for i in range(max_retries)])
- MockLOGwarn.assert_any_call(
- _('Unable to connect to the database server: %(errmsg)s.'
- ' Trying again in %(retry_interval)d seconds.') %
- {'errmsg': 'foo',
- 'retry_interval': retry_interval})
- MockLOGerror.assert_called_with(
- _('Unable to connect to the database after '
- '%(retries)d retries. Giving up.') %
- {'retries': max_retries})
diff --git a/ceilometer/tests/storage/test_storage_scenarios.py b/ceilometer/tests/storage/test_storage_scenarios.py
index 5318e999..80243784 100644
--- a/ceilometer/tests/storage/test_storage_scenarios.py
+++ b/ceilometer/tests/storage/test_storage_scenarios.py
@@ -23,7 +23,9 @@ import datetime
import operator
import mock
+from oslo.config import cfg
from oslo.utils import timeutils
+import pymongo
import ceilometer
from ceilometer.alarm.storage import models as alarm_models
@@ -3099,3 +3101,89 @@ class BigIntegerTest(tests_db.TestBase,
msg = utils.meter_message_from_counter(
s, self.CONF.publisher.metering_secret)
self.conn.record_metering_data(msg)
+
+
+class MongoAutoReconnectTest(DBTestBase,
+ tests_db.MixinTestsWithBackendScenarios):
+ cfg.CONF.set_override('retry_interval', 1, group='database')
+
+ @tests_db.run_with('mongodb')
+ def test_mongo_client(self):
+ if cfg.CONF.database.mongodb_replica_set:
+ self.assertIsInstance(self.conn.conn.conn.conn,
+ pymongo.MongoReplicaSetClient)
+ else:
+ self.assertIsInstance(self.conn.conn.conn.conn,
+ pymongo.MongoClient)
+
+ @staticmethod
+ def create_side_effect(method, test_exception):
+ def side_effect(*args, **kwargs):
+ if test_exception.pop():
+ raise pymongo.errors.AutoReconnect
+ else:
+ return method(*args, **kwargs)
+ return side_effect
+
+ @tests_db.run_with('mongodb')
+ def test_mongo_find(self):
+ raise_exc = [False, True]
+ method = self.conn.db.resource.find
+
+ with mock.patch('pymongo.collection.Collection.find',
+ mock.Mock()) as mock_find:
+ mock_find.side_effect = self.create_side_effect(method, raise_exc)
+ mock_find.__name__ = 'find'
+ resources = list(self.conn.get_resources())
+ self.assertEqual(9, len(resources))
+
+ @tests_db.run_with('mongodb')
+ def test_mongo_insert(self):
+ raise_exc = [False, True]
+ method = self.conn.db.meter.insert
+
+ with mock.patch('pymongo.collection.Collection.insert',
+ mock.Mock(return_value=method)) as mock_insert:
+ mock_insert.side_effect = self.create_side_effect(method,
+ raise_exc)
+ mock_insert.__name__ = 'insert'
+ self.create_and_store_sample(
+ timestamp=datetime.datetime(2014, 10, 15, 14, 39),
+ source='test-proxy')
+ meters = list(self.conn.db.meter.find())
+ self.assertEqual(12, len(meters))
+
+ @tests_db.run_with('mongodb')
+ def test_mongo_find_and_modify(self):
+ raise_exc = [False, True]
+ method = self.conn.db.resource.find_and_modify
+
+ with mock.patch('pymongo.collection.Collection.find_and_modify',
+ mock.Mock()) as mock_fam:
+ mock_fam.side_effect = self.create_side_effect(method, raise_exc)
+ mock_fam.__name__ = 'find_and_modify'
+ self.create_and_store_sample(
+ timestamp=datetime.datetime(2014, 10, 15, 14, 39),
+ source='test-proxy')
+ data = self.conn.db.resource.find(
+ {'last_sample_timestamp':
+ datetime.datetime(2014, 10, 15, 14, 39)})[0]['source']
+ self.assertEqual('test-proxy', data)
+
+ @tests_db.run_with('mongodb')
+ def test_mongo_update(self):
+ raise_exc = [False, True]
+ method = self.conn.db.resource.update
+
+ with mock.patch('pymongo.collection.Collection.update',
+ mock.Mock()) as mock_update:
+ mock_update.side_effect = self.create_side_effect(method,
+ raise_exc)
+ mock_update.__name__ = 'update'
+ self.create_and_store_sample(
+ timestamp=datetime.datetime(2014, 10, 15, 17, 39),
+ source='test-proxy-update')
+ data = self.conn.db.resource.find(
+ {'last_sample_timestamp':
+ datetime.datetime(2014, 10, 15, 17, 39)})[0]['source']
+ self.assertEqual('test-proxy-update', data)