diff options
author | Keith Bostic <keith.bostic@mongodb.com> | 2015-03-03 10:08:59 -0500 |
---|---|---|
committer | Keith Bostic <keith.bostic@mongodb.com> | 2015-03-03 10:08:59 -0500 |
commit | e215abd7868a9f9cecaa85490c8a226f8d4cefa2 (patch) | |
tree | 53655d01565e9289904b30621c4d90faa1373f4c | |
parent | dc0dd9ff2c23b284801a35b4e2e38e3412086fe4 (diff) | |
parent | 0f8b927574885b475c71afd783a69894e5acef6d (diff) | |
download | mongo-e215abd7868a9f9cecaa85490c8a226f8d4cefa2.tar.gz |
Merge pull request #1713 from wiredtiger/shared-cache-reconfigure-bug
Fix a bug in the reconfigure API related to shared cache quotas.
-rw-r--r-- | src/conn/conn_api.c | 3 | ||||
-rw-r--r-- | src/conn/conn_cache.c | 82 | ||||
-rw-r--r-- | src/conn/conn_cache_pool.c | 25 | ||||
-rw-r--r-- | src/conn/conn_open.c | 3 | ||||
-rw-r--r-- | src/evict/evict_lru.c | 4 | ||||
-rw-r--r-- | src/include/extern.h | 2 | ||||
-rw-r--r-- | test/suite/test_shared_cache01.py (renamed from test/suite/test_shared_cache.py) | 39 | ||||
-rw-r--r-- | test/suite/test_shared_cache02.py | 169 |
8 files changed, 261 insertions, 66 deletions
diff --git a/src/conn/conn_api.c b/src/conn/conn_api.c index 0562f9cfc34..6b9824fc415 100644 --- a/src/conn/conn_api.c +++ b/src/conn/conn_api.c @@ -762,8 +762,7 @@ __conn_reconfigure(WT_CONNECTION *wt_conn, const char *config) WT_ERR(__conn_statistics_config(session, config_cfg)); WT_ERR(__wt_async_reconfig(session, config_cfg)); - WT_ERR(__wt_cache_config(session, config_cfg)); - WT_ERR(__wt_cache_pool_config(session, config_cfg)); + WT_ERR(__wt_cache_config(session, 1, config_cfg)); WT_ERR(__wt_checkpoint_server_create(session, config_cfg)); WT_ERR(__wt_lsm_manager_reconfig(session, config_cfg)); WT_ERR(__wt_statlog_create(session, config_cfg)); diff --git a/src/conn/conn_cache.c b/src/conn/conn_cache.c index c513d46137c..4a7e15044de 100644 --- a/src/conn/conn_cache.c +++ b/src/conn/conn_cache.c @@ -9,33 +9,28 @@ #include "wt_internal.h" /* - * __wt_cache_config -- + * __cache_config_local -- * Configure the underlying cache. */ -int -__wt_cache_config(WT_SESSION_IMPL *session, const char *cfg[]) +static int +__cache_config_local(WT_SESSION_IMPL *session, int shared, const char *cfg[]) { WT_CACHE *cache; WT_CONFIG_ITEM cval; WT_CONNECTION_IMPL *conn; + uint32_t evict_workers_max, evict_workers_min; conn = S2C(session); cache = conn->cache; /* * If not using a shared cache configure the cache size, otherwise - * check for a reserved size. + * check for a reserved size. All other settings are independent of + * whether we are using a shared cache or not. */ - if (!F_ISSET(conn, WT_CONN_CACHE_POOL)) { + if (!shared) { WT_RET(__wt_config_gets(session, cfg, "cache_size", &cval)); conn->cache_size = (uint64_t)cval.val; - } else { - WT_RET(__wt_config_gets( - session, cfg, "shared_cache.reserve", &cval)); - if (cval.val == 0) - WT_RET(__wt_config_gets( - session, cfg, "shared_cache.chunk", &cval)); - cache->cp_reserved = (uint64_t)cval.val; } WT_RET(__wt_config_gets(session, cfg, "cache_overhead", &cval)); @@ -57,16 +52,64 @@ __wt_cache_config(WT_SESSION_IMPL *session, const char *cfg[]) */ WT_RET(__wt_config_gets(session, cfg, "eviction.threads_max", &cval)); WT_ASSERT(session, cval.val > 0); - conn->evict_workers_max = (u_int)cval.val - 1; + evict_workers_max = (uint32_t)cval.val - 1; WT_RET(__wt_config_gets(session, cfg, "eviction.threads_min", &cval)); WT_ASSERT(session, cval.val > 0); - conn->evict_workers_min = (u_int)cval.val - 1; + evict_workers_min = (uint32_t)cval.val - 1; - if (conn->evict_workers_min > conn->evict_workers_max) + if (evict_workers_min > evict_workers_max) WT_RET_MSG(session, EINVAL, "eviction=(threads_min) cannot be greater than " "eviction=(threads_max)"); + conn->evict_workers_max = evict_workers_max; + conn->evict_workers_min = evict_workers_min; + + return (0); +} + +/* + * __wt_cache_config -- + * Configure or reconfigure the current cache and shared cache. + */ +int +__wt_cache_config(WT_SESSION_IMPL *session, int reconfigure, const char *cfg[]) +{ + WT_CONFIG_ITEM cval; + WT_CONNECTION_IMPL *conn; + int now_shared, was_shared; + + conn = S2C(session); + + WT_ASSERT(session, conn->cache != NULL); + + WT_RET(__wt_config_gets_none(session, cfg, "shared_cache.name", &cval)); + now_shared = cval.len != 0; + was_shared = F_ISSET(conn, WT_CONN_CACHE_POOL); + + /* Cleanup if reconfiguring */ + if (reconfigure && was_shared && !now_shared) + /* Remove ourselves from the pool if necessary */ + WT_RET(__wt_conn_cache_pool_destroy(session)); + else if (reconfigure && !was_shared && now_shared) + /* + * Cache size will now be managed by the cache pool - the + * start size always needs to be zero to allow the pool to + * manage how much memory is in-use. + */ + conn->cache_size = 0; + + /* + * Always setup the local cache - it's used even if we are + * participating in a shared cache. + */ + WT_RET(__cache_config_local(session, now_shared, cfg)); + if (now_shared) { + WT_RET(__wt_cache_pool_config(session, cfg)); + WT_ASSERT(session, F_ISSET(conn, WT_CONN_CACHE_POOL)); + if (!was_shared) + WT_RET(__wt_conn_cache_pool_open(session)); + } return (0); } @@ -84,19 +127,14 @@ __wt_cache_create(WT_SESSION_IMPL *session, const char *cfg[]) conn = S2C(session); - WT_ASSERT(session, conn->cache == NULL || - (F_ISSET(conn, WT_CONN_CACHE_POOL) && conn->cache != NULL)); + WT_ASSERT(session, conn->cache == NULL); WT_RET(__wt_calloc_one(session, &conn->cache)); cache = conn->cache; /* Use a common routine for run-time configuration options. */ - WT_RET(__wt_cache_config(session, cfg)); - - /* Add the configured cache to the cache pool. */ - if (F_ISSET(conn, WT_CONN_CACHE_POOL)) - WT_RET(__wt_conn_cache_pool_open(session)); + WT_RET(__wt_cache_config(session, 0, cfg)); /* * The target size must be lower than the trigger size or we will never diff --git a/src/conn/conn_cache_pool.c b/src/conn/conn_cache_pool.c index f5b78e33b04..7bf090496a8 100644 --- a/src/conn/conn_cache_pool.c +++ b/src/conn/conn_cache_pool.c @@ -36,17 +36,17 @@ __wt_cache_pool_config(WT_SESSION_IMPL *session, const char **cfg) WT_CONNECTION_IMPL *conn, *entry; WT_DECL_RET; char *pool_name; - int created, reconfiguring; + int created, updating; uint64_t chunk, reserve, size, used_cache; conn = S2C(session); - created = reconfiguring = 0; + created = updating = 0; pool_name = NULL; cp = NULL; size = 0; if (F_ISSET(conn, WT_CONN_CACHE_POOL)) - reconfiguring = 1; + updating = 1; else { WT_RET(__wt_config_gets_none( session, cfg, "shared_cache.name", &cval)); @@ -81,7 +81,7 @@ __wt_cache_pool_config(WT_SESSION_IMPL *session, const char **cfg) __wt_spin_lock(session, &__wt_process.spinlock); if (__wt_process.cache_pool == NULL) { - WT_ASSERT(session, !reconfiguring); + WT_ASSERT(session, !updating); /* Create a cache pool. */ WT_ERR(__wt_calloc_one(session, &cp)); created = 1; @@ -96,7 +96,7 @@ __wt_cache_pool_config(WT_SESSION_IMPL *session, const char **cfg) __wt_process.cache_pool = cp; WT_ERR(__wt_verbose(session, WT_VERB_SHARED_CACHE, "Created cache pool %s", cp->name)); - } else if (!reconfiguring && !WT_STRING_MATCH( + } else if (!updating && !WT_STRING_MATCH( __wt_process.cache_pool->name, pool_name, strlen(pool_name))) /* Only a single cache pool is supported. */ WT_ERR_MSG(session, WT_ERROR, @@ -109,7 +109,7 @@ __wt_cache_pool_config(WT_SESSION_IMPL *session, const char **cfg) * The cache pool requires a reference count to avoid a race between * configuration/open and destroy. */ - if (!reconfiguring) + if (!updating) ++cp->refs; /* @@ -157,7 +157,7 @@ __wt_cache_pool_config(WT_SESSION_IMPL *session, const char **cfg) if (__wt_config_gets(session, &cfg[1], "shared_cache.reserve", &cval) == 0 && cval.val != 0) reserve = (uint64_t)cval.val; - else if (reconfiguring) + else if (updating) reserve = conn->cache->cp_reserved; else reserve = chunk; @@ -171,18 +171,23 @@ __wt_cache_pool_config(WT_SESSION_IMPL *session, const char **cfg) TAILQ_FOREACH(entry, &cp->cache_pool_qh, cpq) used_cache += entry->cache->cp_reserved; } + /* Ignore our old allocation if reconfiguring */ + if (updating) + used_cache -= conn->cache->cp_reserved; if (used_cache + reserve > size) WT_ERR_MSG(session, EINVAL, "Shared cache unable to accommodate this configuration. " - "Shared cache size: %" PRIu64 ", reserved: %" PRIu64, + "Shared cache size: %" PRIu64 ", requested min: %" PRIu64, size, used_cache + reserve); /* The configuration is verified - it's safe to update the pool. */ cp->size = size; cp->chunk = chunk; + conn->cache->cp_reserved = reserve; + /* Wake up the cache pool server so any changes are noticed. */ - if (reconfiguring) + if (updating) WT_ERR(__wt_cond_signal( session, __wt_process.cache_pool->cache_pool_cond)); @@ -192,7 +197,7 @@ __wt_cache_pool_config(WT_SESSION_IMPL *session, const char **cfg) F_SET(conn, WT_CONN_CACHE_POOL); err: __wt_spin_unlock(session, &__wt_process.spinlock); - if (!reconfiguring) + if (!updating) __wt_free(session, pool_name); if (ret != 0 && created) { __wt_free(session, cp->name); diff --git a/src/conn/conn_open.c b/src/conn/conn_open.c index bc1d9eaf01a..0a3d35ac0b1 100644 --- a/src/conn/conn_open.c +++ b/src/conn/conn_open.c @@ -55,9 +55,6 @@ __wt_connection_open(WT_CONNECTION_IMPL *conn, const char *cfg[]) */ WT_WRITE_BARRIER(); - /* Connect to a cache pool. */ - WT_RET(__wt_cache_pool_config(session, cfg)); - /* Create the cache. */ WT_RET(__wt_cache_create(session, cfg)); diff --git a/src/evict/evict_lru.c b/src/evict/evict_lru.c index bc479159cd7..640c9b0541d 100644 --- a/src/evict/evict_lru.c +++ b/src/evict/evict_lru.c @@ -248,7 +248,7 @@ __evict_workers_resize(WT_SESSION_IMPL *session) WT_DECL_RET; WT_EVICT_WORKER *workers; size_t alloc; - u_int i; + uint32_t i; conn = S2C(session); @@ -332,7 +332,7 @@ __wt_evict_destroy(WT_SESSION_IMPL *session) WT_DECL_RET; WT_EVICT_WORKER *workers; WT_SESSION *wt_session; - u_int i; + uint32_t i; conn = S2C(session); cache = conn->cache; diff --git a/src/include/extern.h b/src/include/extern.h index c6628ffdc1c..0ef055e1162 100644 --- a/src/include/extern.h +++ b/src/include/extern.h @@ -207,7 +207,7 @@ extern int __wt_conn_remove_data_source(WT_SESSION_IMPL *session); extern int __wt_extractor_config(WT_SESSION_IMPL *session, const char *config, WT_EXTRACTOR **extractorp, int *ownp); extern int __wt_conn_remove_extractor(WT_SESSION_IMPL *session); extern int __wt_verbose_config(WT_SESSION_IMPL *session, const char *cfg[]); -extern int __wt_cache_config(WT_SESSION_IMPL *session, const char *cfg[]); +extern int __wt_cache_config(WT_SESSION_IMPL *session, int reconfigure, const char *cfg[]); extern int __wt_cache_create(WT_SESSION_IMPL *session, const char *cfg[]); extern void __wt_cache_stats_update(WT_SESSION_IMPL *session); extern int __wt_cache_destroy(WT_SESSION_IMPL *session); diff --git a/test/suite/test_shared_cache.py b/test/suite/test_shared_cache01.py index ff40d31e6df..e6d712e61bc 100644 --- a/test/suite/test_shared_cache.py +++ b/test/suite/test_shared_cache01.py @@ -33,12 +33,12 @@ import wiredtiger, wttest from wttest import unittest from helper import key_populate, simple_populate -# test_shared_cache.py +# test_shared_cache01.py # Checkpoint tests # Test shared cache shared amongst multiple connections. -class test_shared_cache(wttest.WiredTigerTestCase): +class test_shared_cache01(wttest.WiredTigerTestCase): - uri = 'table:test_shared_cache' + uri = 'table:test_shared_cache01' # Setup fairly large items to use up cache data_str = 'abcdefghijklmnopqrstuvwxyz' * 20 @@ -89,7 +89,7 @@ class test_shared_cache(wttest.WiredTigerTestCase): self.sessions = [] # Implicitly closed when closing sessions. # Basic test of shared cache - def test_shared_cache01(self): + def test_shared_cache_basic(self): nops = 1000 self.openConnections(['WT_TEST1', 'WT_TEST2']) @@ -99,7 +99,7 @@ class test_shared_cache(wttest.WiredTigerTestCase): self.closeConnections() # Test of shared cache with more connections - def test_shared_cache02(self): + def test_shared_cache_more_connections(self): nops = 1000 self.openConnections(['WT_TEST1', 'WT_TEST2', 'WT_TEST3', 'WT_TEST4']) @@ -109,7 +109,7 @@ class test_shared_cache(wttest.WiredTigerTestCase): self.closeConnections() # Do enough work for the shared cache to be fully allocated. - def test_shared_cache03(self): + def test_shared_cache_full(self): nops = 10000 self.openConnections(['WT_TEST1', 'WT_TEST2']) for sess in self.sessions: @@ -121,7 +121,7 @@ class test_shared_cache(wttest.WiredTigerTestCase): self.closeConnections() # Switch the work between connections, to test rebalancing. - def test_shared_cache04(self): + def test_shared_cache_rebalance(self): # About 100 MB of data with ~250 byte values. nops = 200000 self.openConnections(['WT_TEST1', 'WT_TEST2']) @@ -132,7 +132,7 @@ class test_shared_cache(wttest.WiredTigerTestCase): self.closeConnections() # Add a new connection once the shared cache is already established. - def test_shared_cache05(self): + def test_shared_cache_late_join(self): nops = 1000 self.openConnections(['WT_TEST1', 'WT_TEST2']) @@ -147,7 +147,7 @@ class test_shared_cache(wttest.WiredTigerTestCase): self.closeConnections() # Close a connection and keep using other connections. - def test_shared_cache06(self): + def test_shared_cache_leaving(self): nops = 10000 self.openConnections(['WT_TEST1', 'WT_TEST2', 'WT_TEST3']) @@ -163,7 +163,7 @@ class test_shared_cache(wttest.WiredTigerTestCase): # Test verbose output @unittest.skip("Verbose output handling") - def test_shared_cache07(self): + def test_shared_cache_verbose(self): nops = 1000 self.openConnections( ['WT_TEST1', 'WT_TEST2'], extra_opts="verbose=[shared_cache]") @@ -174,7 +174,7 @@ class test_shared_cache(wttest.WiredTigerTestCase): self.closeConnections() # Test opening a connection outside of the shared cache - def test_shared_cache08(self): + def test_shared_cache_mixed(self): nops = 1000 self.openConnections(['WT_TEST1', 'WT_TEST2']) @@ -185,7 +185,7 @@ class test_shared_cache(wttest.WiredTigerTestCase): self.closeConnections() # Test default config values - def test_shared_cache09(self): + def test_shared_cache_defaults(self): nops = 1000 self.openConnections(['WT_TEST1', 'WT_TEST2'], pool_opts=',shared_cache=(name=pool,size=200M)') @@ -194,21 +194,8 @@ class test_shared_cache(wttest.WiredTigerTestCase): self.add_records(sess, 0, nops) self.closeConnections() - # Test reconfigure API - def test_shared_cache10(self): - nops = 1000 - self.openConnections(['WT_TEST1', 'WT_TEST2']) - - for sess in self.sessions: - sess.create(self.uri, "key_format=S,value_format=S") - self.add_records(sess, 0, nops) - - connection = self.conns[0] - connection.reconfigure("shared_cache=(name=pool,size=300M)") - self.closeConnections() - # Test default config values - def test_shared_cache11(self): + def test_shared_cache_defaults2(self): nops = 1000 self.openConnections(['WT_TEST1', 'WT_TEST2'], pool_opts=',shared_cache=(name=pool)') diff --git a/test/suite/test_shared_cache02.py b/test/suite/test_shared_cache02.py new file mode 100644 index 00000000000..3806e9d0cda --- /dev/null +++ b/test/suite/test_shared_cache02.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python +# +# Public Domain 2014-2015 MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# If unittest2 is available, use it in preference to (the old) unittest + +import os +import shutil +import wiredtiger, wttest +from wttest import unittest +from helper import key_populate, simple_populate + +# test_shared_cache02.py +# Shared cache tests +# Test shared cache shared amongst multiple connections. +class test_shared_cache02(wttest.WiredTigerTestCase): + + uri = 'table:test_shared_cache02' + # Setup fairly large items to use up cache + data_str = 'abcdefghijklmnopqrstuvwxyz' * 20 + + # Add a set of records + def add_records(self, session, start, stop): + cursor = session.open_cursor(self.uri, None, "overwrite") + for i in range(start, stop+1): + cursor.set_key("%010d KEY------" % i) + cursor.set_value("%010d VALUE "% i + self.data_str) + self.assertEqual(cursor.insert(), 0) + cursor.close() + + # Disable default setup/shutdown steps - connections are managed manually. + def setUpSessionOpen(self, conn): + return None + + def close_conn(self): + return None + + def setUpConnectionOpen(self, dir): + return None + + def openConnections( + self, + connections, + pool_opts = ',shared_cache=(name=pool,size=200M,chunk=10M,reserve=30M),', + extra_opts = '', + add=0): + if add == 0: + self.conns = [] + self.sessions = [] + # Open the set of connections. + for name in connections: + shutil.rmtree(name, True) + os.mkdir(name) + next_conn = wiredtiger.wiredtiger_open( + name, + 'create,error_prefix="' + self.shortid() + ': "' + + pool_opts + extra_opts) + self.conns.append(next_conn) + self.sessions.append(next_conn.open_session(None)) + return None + + def closeConnections(self): + for tmp_conn in self.conns: + tmp_conn.close() + self.conns = [] + self.sessions = [] # Implicitly closed when closing sessions. + + # Test reconfigure API + def test_shared_cache_reconfig01(self): + nops = 1000 + self.openConnections(['WT_TEST1', 'WT_TEST2']) + + for sess in self.sessions: + sess.create(self.uri, "key_format=S,value_format=S") + self.add_records(sess, 0, nops) + + connection = self.conns[0] + connection.reconfigure("shared_cache=(name=pool,size=300M)") + self.closeConnections() + + # Test reconfigure that grows the usage over quota fails + def test_shared_cache_reconfig02(self): + nops = 1000 + self.openConnections(['WT_TEST1', 'WT_TEST2'], + pool_opts = ',shared_cache=(name=pool,size=50M,reserve=20M),') + + for sess in self.sessions: + sess.create(self.uri, "key_format=S,value_format=S") + self.add_records(sess, 0, nops) + + connection = self.conns[0] + # Reconfigure to over-subscribe, call should fail with an error + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: connection.reconfigure("shared_cache=(name=pool,reserve=40M)"), + '/Shared cache unable to accommodate this configuration/') + # TODO: Ensure that the reserve size wasn't updated. + # cursor = self.sessions[0].open_cursor('config:', None, None) + # value = cursor['connection'] + # self.assertTrue(value.find('reserve') != -1) + + self.closeConnections() + + # Test reconfigure that would grow the usage over quota if the + # previous reserve size isn't taken into account + def test_shared_cache_reconfig03(self): + nops = 1000 + self.openConnections(['WT_TEST1', 'WT_TEST2'], + pool_opts = ',shared_cache=(name=pool,size=50M,reserve=20M),') + + for sess in self.sessions: + sess.create(self.uri, "key_format=S,value_format=S") + self.add_records(sess, 0, nops) + + connection = self.conns[0] + + connection.reconfigure("shared_cache=(name=pool,reserve=30M)"), + + # TODO: Ensure that the reserve size was updated. + # cursor = self.sessions[0].open_cursor('config:', None, None) + # value = cursor['connection'] + # self.assertTrue(value.find('reserve') != -1) + + self.closeConnections() + + # Test reconfigure that switches to using a shared cache + # previous reserve size isn't taken into account + def test_shared_cache_reconfig03(self): + nops = 1000 + self.openConnections(['WT_TEST1', 'WT_TEST2'], pool_opts = ',') + + for sess in self.sessions: + sess.create(self.uri, "key_format=S,value_format=S") + self.add_records(sess, 0, nops) + + self.conns[0].reconfigure("shared_cache=(name=pool,reserve=20M)"), + self.conns[1].reconfigure("shared_cache=(name=pool,reserve=20M)"), + + # TODO: Ensure that the reserve size was updated. + # cursor = self.sessions[0].open_cursor('config:', None, None) + # value = cursor['connection'] + # self.assertTrue(value.find('reserve') != -1) + + self.closeConnections() + +if __name__ == '__main__': + wttest.run() |