summary | refs | log | tree | commit | diff
path: root/storage/tokudb/ft-index/src/ydb.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/tokudb/ft-index/src/ydb.cc')
-rw-r--r-- storage/tokudb/ft-index/src/ydb.cc 300
1 files changed, 264 insertions, 36 deletions
diff --git a/storage/tokudb/ft-index/src/ydb.cc b/storage/tokudb/ft-index/src/ydb.cc
index 7617f21d6fc..6a08b89c81a 100644
--- a/storage/tokudb/ft-index/src/ydb.cc
+++ b/storage/tokudb/ft-index/src/ydb.cc
@@ -49,6 +49,7 @@ UNIVERSITY PATENT NOTICE:
PATENT MARKING NOTICE:
This software is covered by US Patent No. 8,185,551.
+ This software is covered by US Patent No. 8,489,638.
PATENT RIGHTS GRANT:
@@ -1114,8 +1115,8 @@ env_close(DB_ENV * env, uint32_t flags) {
r = toku_ydb_do_error(env, EINVAL, "%s", err_msg);
goto panic_and_quit_early;
}
- if (env->i->open_dbs) { //Verify that there are no open dbs.
- if (toku_omt_size(env->i->open_dbs) > 0) {
+ if (env->i->open_dbs_by_dname) { //Verify that there are no open dbs.
+ if (toku_omt_size(env->i->open_dbs_by_dname) > 0) {
err_msg = "Cannot close environment due to open DBs\n";
r = toku_ydb_do_error(env, EINVAL, "%s", err_msg);
goto panic_and_quit_early;
@@ -1191,11 +1192,13 @@ env_close(DB_ENV * env, uint32_t flags) {
toku_free(env->i->real_log_dir);
if (env->i->real_tmp_dir)
toku_free(env->i->real_tmp_dir);
- if (env->i->open_dbs)
- toku_omt_destroy(&env->i->open_dbs);
+ if (env->i->open_dbs_by_dname)
+ toku_omt_destroy(&env->i->open_dbs_by_dname);
+ if (env->i->open_dbs_by_dict_id)
+ toku_omt_destroy(&env->i->open_dbs_by_dict_id);
if (env->i->dir)
toku_free(env->i->dir);
- toku_mutex_destroy(&env->i->open_dbs_lock);
+ toku_pthread_rwlock_destroy(&env->i->open_dbs_rwlock);
// Immediately before freeing internal environment unlock the directories
unlock_single_process(env);
@@ -1717,6 +1720,12 @@ env_set_lock_timeout(DB_ENV *env, uint64_t lock_timeout_msec) {
return 0;
}
+// Store `callback` in the environment's lock_wait_timeout_callback slot.
+// Performs no validation and always returns 0.
+static int env_set_lock_timeout_callback(DB_ENV *env,
+                                         lock_timeout_callback callback) {
+    (env->i)->lock_wait_timeout_callback = callback;
+    return 0;
+}
+
static void
format_time(const time_t *timer, char *buf) {
ctime_r(timer, buf);
@@ -1745,9 +1754,11 @@ typedef enum {
FS_ENOSPC_REDZONE_CTR, // number of operations rejected by enospc prevention (red zone)
FS_ENOSPC_MOST_RECENT, // most recent time that file system was completely full
FS_ENOSPC_COUNT, // total number of times ENOSPC was returned from an attempt to write
- FS_FSYNC_TIME ,
+ FS_FSYNC_TIME,
FS_FSYNC_COUNT,
- FS_STATUS_NUM_ROWS
+ FS_LONG_FSYNC_TIME,
+ FS_LONG_FSYNC_COUNT,
+ FS_STATUS_NUM_ROWS, // must be last
} fs_status_entry;
typedef struct {
@@ -1766,8 +1777,10 @@ fs_status_init(void) {
FS_STATUS_INIT(FS_ENOSPC_REDZONE_CTR, nullptr, UINT64, "number of operations rejected by enospc prevention (red zone)", TOKU_ENGINE_STATUS);
FS_STATUS_INIT(FS_ENOSPC_MOST_RECENT, nullptr, UNIXTIME, "most recent disk full", TOKU_ENGINE_STATUS);
FS_STATUS_INIT(FS_ENOSPC_COUNT, nullptr, UINT64, "number of write operations that returned ENOSPC", TOKU_ENGINE_STATUS);
- FS_STATUS_INIT(FS_FSYNC_TIME, FILESYSTEM_FSYNC_SECONDS, UINT64, "fsync time", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
+ FS_STATUS_INIT(FS_FSYNC_TIME, FILESYSTEM_FSYNC_TIME, UINT64, "fsync time", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
FS_STATUS_INIT(FS_FSYNC_COUNT, FILESYSTEM_FSYNC_NUM, UINT64, "fsync count", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
+ FS_STATUS_INIT(FS_LONG_FSYNC_TIME, FILESYSTEM_LONG_FSYNC_TIME, UINT64, "long fsync time", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
+ FS_STATUS_INIT(FS_LONG_FSYNC_COUNT, FILESYSTEM_LONG_FSYNC_NUM, UINT64, "long fsync count", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
fsstat.initialized = true;
}
#undef FS_STATUS_INIT
@@ -1792,10 +1805,12 @@ fs_get_status(DB_ENV * env, fs_redzone_state * redzone_state) {
FS_STATUS_VALUE(FS_ENOSPC_MOST_RECENT) = enospc_most_recent_timestamp;
FS_STATUS_VALUE(FS_ENOSPC_COUNT) = enospc_total;
- uint64_t fsync_count, fsync_time;
- toku_get_fsync_times(&fsync_count, &fsync_time);
+ uint64_t fsync_count, fsync_time, long_fsync_threshold, long_fsync_count, long_fsync_time;
+ toku_get_fsync_times(&fsync_count, &fsync_time, &long_fsync_threshold, &long_fsync_count, &long_fsync_time);
FS_STATUS_VALUE(FS_FSYNC_COUNT) = fsync_count;
FS_STATUS_VALUE(FS_FSYNC_TIME) = fsync_time;
+ FS_STATUS_VALUE(FS_LONG_FSYNC_COUNT) = long_fsync_count;
+ FS_STATUS_VALUE(FS_LONG_FSYNC_TIME) = long_fsync_time;
}
#undef FS_STATUS_VALUE
@@ -2238,6 +2253,177 @@ env_get_cursor_for_directory(DB_ENV* env, DB_TXN* txn, DBC** c) {
return toku_db_cursor(env->i->directory, txn, c, 0);
}
+// Context handed (as a void *) to ltm_iterate_requests_callback(): the
+// environment plus the user's callback and its opaque argument.
+struct ltm_iterate_requests_callback_extra {
+    DB_ENV *env;
+    iterate_requests_callback callback;
+    void *extra;
+
+    ltm_iterate_requests_callback_extra(DB_ENV *env_arg,
+                                        iterate_requests_callback callback_arg,
+                                        void *extra_arg)
+        : env(env_arg), callback(callback_arg), extra(extra_arg) {
+    }
+};
+
+// OMT search function: compares the dictionary id of the stored DB `v`
+// against the DICTIONARY_ID that `dict_id_v` points to.
+// Returns -1 if the stored id is smaller, 1 if it is larger, 0 if equal.
+static int
+find_db_by_dict_id(OMTVALUE v, void *dict_id_v) {
+    const DB *stored_db = (DB *) v;
+    const DICTIONARY_ID stored_id = stored_db->i->dict_id;
+    const DICTIONARY_ID wanted_id = *(DICTIONARY_ID *) dict_id_v;
+    if (stored_id.dictid == wanted_id.dictid) {
+        return 0;
+    }
+    return (stored_id.dictid < wanted_id.dictid) ? -1 : 1;
+}
+
+// Look up an open DB by dictionary id in env->i->open_dbs_by_dict_id.
+// Returns the DB found by toku_omt_find_zero(), or nullptr when the search
+// does not return 0. This function takes no lock itself; its callers in this
+// file hold open_dbs_rwlock around the call.
+static DB *
+locked_get_db_by_dict_id(DB_ENV *env, DICTIONARY_ID dict_id) {
+    OMTVALUE found;
+    const int r = toku_omt_find_zero(env->i->open_dbs_by_dict_id,
+                                     find_db_by_dict_id,
+                                     (void *) &dict_id,
+                                     &found,
+                                     nullptr);
+    if (r != 0) {
+        return nullptr;
+    }
+    return (DB *) found;
+}
+
+// Adapter passed to the locktree manager's iterate_pending_lock_requests():
+// translates `dict_id` into an open DB handle and forwards the pending lock
+// request to the user's callback stored in `extra`
+// (an ltm_iterate_requests_callback_extra *).
+//
+// open_dbs_rwlock is read-locked for the lookup and for the duration of the
+// user callback. If no open DB has this dict_id the user callback is not
+// invoked and 0 is returned; otherwise the user callback's result is returned.
+static int ltm_iterate_requests_callback(DICTIONARY_ID dict_id, TXNID txnid,
+                                         const DBT *left_key,
+                                         const DBT *right_key,
+                                         TXNID blocking_txnid,
+                                         uint64_t start_time,
+                                         void *extra) {
+    ltm_iterate_requests_callback_extra *const ctx =
+        reinterpret_cast<ltm_iterate_requests_callback_extra *>(extra);
+    DB_ENV *const env = ctx->env;
+
+    toku_pthread_rwlock_rdlock(&env->i->open_dbs_rwlock);
+    DB *const db = locked_get_db_by_dict_id(env, dict_id);
+    const int r = (db == nullptr)
+        ? 0
+        : ctx->callback(db, txnid, left_key, right_key,
+                        blocking_txnid, start_time, ctx->extra);
+    toku_pthread_rwlock_rdunlock(&env->i->open_dbs_rwlock);
+    return r;
+}
+
+// DB_ENV method: run `callback` (with `extra`) over the pending lock requests
+// reported by the environment's locktree manager.
+// Returns EINVAL if the environment is not open; otherwise returns whatever
+// the manager's iterate_pending_lock_requests() returns.
+static int
+env_iterate_pending_lock_requests(DB_ENV *env,
+                                  iterate_requests_callback callback,
+                                  void *extra) {
+    if (!env_opened(env)) {
+        return EINVAL;
+    }
+
+    ltm_iterate_requests_callback_extra ctx(env, callback, extra);
+    return env->i->ltm.iterate_pending_lock_requests(ltm_iterate_requests_callback, &ctx);
+}
+
+// Cursor state used by iter_txn_row_locks_callback() while it walks the
+// row-lock ranges recorded in a transaction's lt_map.
+//
+// For the lifetime of this object:
+// - open_dbs_rwlock must be read locked (or better)
+// - txn_mutex must be held
+struct iter_txn_row_locks_callback_extra {
+    DB_ENV *env;
+    DB *current_db;   // DB for lt_map entry `which_lt`; nullptr if the lookup finds none
+    size_t which_lt;  // index of the lt_map entry currently being iterated
+    toku::omt<txn_lt_key_ranges> *lt_map;
+    toku::range_buffer::iterator iter;
+    toku::range_buffer::iterator::record rec;
+
+    iter_txn_row_locks_callback_extra(DB_ENV *env_arg,
+                                      toku::omt<txn_lt_key_ranges> *map_arg)
+        : env(env_arg), current_db(nullptr), which_lt(0), lt_map(map_arg) {
+        // An empty map has no entry 0 to position the iterator on.
+        if (lt_map->size() != 0) {
+            set_iterator_and_current_db();
+        }
+    }
+
+    // Fetch lt_map entry `which_lt`, resolve its locktree's dictionary id to
+    // an open DB, and start `iter` on that entry's range buffer.
+    void set_iterator_and_current_db() {
+        txn_lt_key_ranges entry;
+        const int fetch_result = lt_map->fetch(which_lt, &entry);
+        invariant_zero(fetch_result);
+        current_db = locked_get_db_by_dict_id(env, entry.lt->get_dict_id());
+        iter.create(entry.buffer);
+    }
+};
+
+// Produce the next row-lock range held by a transaction.
+// `extra` is an iter_txn_row_locks_callback_extra *. On success, stores the
+// owning DB in *db, copies references to the range endpoints into
+// left_key/right_key, advances the iterator, and returns 0.
+// Returns DB_NOTFOUND once every entry of lt_map has been exhausted.
+static int iter_txn_row_locks_callback(DB **db, DBT *left_key, DBT *right_key, void *extra) {
+    iter_txn_row_locks_callback_extra *const state =
+        reinterpret_cast<iter_txn_row_locks_callback_extra *>(extra);
+
+    while (state->which_lt < state->lt_map->size()) {
+        if (!state->iter.current(&state->rec)) {
+            // This entry's buffer is drained: move on to the next entry, if any.
+            state->which_lt++;
+            if (state->which_lt < state->lt_map->size()) {
+                state->set_iterator_and_current_db();
+            }
+            continue;
+        }
+
+        *db = state->current_db;
+        // The caller should interpret data/size == 0 to mean infinity.
+        // Therefore, when we copyref pos/neg infinity into left/right_key,
+        // the caller knows what we're talking about.
+        toku_copyref_dbt(left_key, *state->rec.get_left_key());
+        toku_copyref_dbt(right_key, *state->rec.get_right_key());
+        state->iter.next();
+        return 0;
+    }
+    return DB_NOTFOUND;
+}
+
+// Context handed (as a void *) to iter_txns_callback(): the environment plus
+// the user's per-transaction callback and its opaque argument.
+struct iter_txns_callback_extra {
+    DB_ENV *env;
+    iterate_transactions_callback callback;
+    void *extra;
+
+    iter_txns_callback_extra(DB_ENV *env_arg,
+                             iterate_transactions_callback callback_arg,
+                             void *extra_arg)
+        : env(env_arg), callback(callback_arg), extra(extra_arg) {
+    }
+};
+
+// Per-transaction adapter passed to toku_txn_manager_iter_over_live_root_txns().
+// `extra` is an iter_txns_callback_extra *. Invokes the user's callback with
+// the transaction's parent_id64, its client id, and a row-lock iterator
+// (iter_txn_row_locks_callback plus its state), then returns the callback's
+// result.
+//
+// The transaction's txn_mutex is taken first, then open_dbs_rwlock is
+// read-locked; both are held across the user callback and released in the
+// reverse order.
+static int iter_txns_callback(TOKUTXN txn, void *extra) {
+    iter_txns_callback_extra *const ctx =
+        reinterpret_cast<iter_txns_callback_extra *>(extra);
+    DB_ENV *const env = ctx->env;
+
+    DB_TXN *const dbtxn = toku_txn_get_container_db_txn(txn);
+    invariant_notnull(dbtxn);
+
+    toku_mutex_lock(&db_txn_struct_i(dbtxn)->txn_mutex);
+    toku_pthread_rwlock_rdlock(&env->i->open_dbs_rwlock);
+
+    iter_txn_row_locks_callback_extra row_locks_state(env, &db_txn_struct_i(dbtxn)->lt_map);
+    const int r = ctx->callback(toku_txn_get_txnid(txn).parent_id64,
+                                toku_txn_get_client_id(txn),
+                                iter_txn_row_locks_callback,
+                                &row_locks_state,
+                                ctx->extra);
+
+    toku_pthread_rwlock_rdunlock(&env->i->open_dbs_rwlock);
+    toku_mutex_unlock(&db_txn_struct_i(dbtxn)->txn_mutex);
+
+    return r;
+}
+
+// DB_ENV method: run `callback` (with `extra`) over the live root
+// transactions known to the logger's transaction manager.
+// Returns EINVAL if the environment is not open; otherwise returns whatever
+// toku_txn_manager_iter_over_live_root_txns() returns.
+static int
+env_iterate_live_transactions(DB_ENV *env,
+                              iterate_transactions_callback callback,
+                              void *extra) {
+    if (!env_opened(env)) {
+        return EINVAL;
+    }
+
+    iter_txns_callback_extra ctx(env, callback, extra);
+    return toku_txn_manager_iter_over_live_root_txns(
+        toku_logger_get_txn_manager(env->i->logger), iter_txns_callback, &ctx);
+}
+
+// Record `loader_memory_size` in env->i->loader_memory_size.
+static void
+env_set_loader_memory_size(DB_ENV *env, uint64_t loader_memory_size) {
+    (env->i)->loader_memory_size = loader_memory_size;
+}
+
+// Return the value currently stored in env->i->loader_memory_size.
+static uint64_t
+env_get_loader_memory_size(DB_ENV *env) {
+    const uint64_t current_size = env->i->loader_memory_size;
+    return current_size;
+}
+
static int
toku_env_create(DB_ENV ** envp, uint32_t flags) {
int r = ENOSYS;
@@ -2301,13 +2487,18 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
USENV(txn_stat);
USENV(get_lock_timeout);
USENV(set_lock_timeout);
+ USENV(set_lock_timeout_callback);
USENV(set_redzone);
USENV(log_flush);
USENV(log_archive);
USENV(create_loader);
USENV(get_cursor_for_persistent_environment);
USENV(get_cursor_for_directory);
+ USENV(iterate_pending_lock_requests);
+ USENV(iterate_live_transactions);
USENV(change_fsync_log_period);
+ USENV(set_loader_memory_size);
+ USENV(get_loader_memory_size);
#undef USENV
// unlocked methods
@@ -2344,10 +2535,11 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
// The escalate callback will need it to translate txnids to DB_TXNs
result->i->ltm.create(toku_db_lt_on_create_callback, toku_db_lt_on_destroy_callback, toku_db_txn_escalate_callback, result);
- r = toku_omt_create(&result->i->open_dbs);
- toku_mutex_init(&result->i->open_dbs_lock, NULL);
+ r = toku_omt_create(&result->i->open_dbs_by_dname);
assert_zero(r);
- assert(result->i->open_dbs);
+ r = toku_omt_create(&result->i->open_dbs_by_dict_id);
+ assert_zero(r);
+ toku_pthread_rwlock_init(&result->i->open_dbs_rwlock, NULL);
*envp = result;
r = 0;
@@ -2372,7 +2564,7 @@ DB_ENV_CREATE_FUN (DB_ENV ** envp, uint32_t flags) {
// return <0 if v is earlier in omt than dbv
// return >0 if v is later in omt than dbv
static int
-find_db_by_db (OMTVALUE v, void *dbv) {
+find_db_by_db_dname(OMTVALUE v, void *dbv) {
DB *db = (DB *) v; // DB* that is stored in the omt
DB *dbfind = (DB *) dbv; // extra, to be compared to v
int cmp;
@@ -2385,42 +2577,78 @@ find_db_by_db (OMTVALUE v, void *dbv) {
return 0;
}
+// OMT search function for open_dbs_by_dict_id: orders the stored DB `v`
+// relative to the DB handle `dbv`, first by dictionary id and then, for
+// equal ids, by handle address.
+// Returns -1 if `v` sorts earlier, 1 if later, 0 only for the same handle.
+static int
+find_db_by_db_dict_id(OMTVALUE v, void *dbv) {
+    DB *const stored = (DB *) v;
+    DB *const wanted = (DB *) dbv;
+    const DICTIONARY_ID stored_id = stored->i->dict_id;
+    const DICTIONARY_ID wanted_id = wanted->i->dict_id;
+
+    if (stored_id.dictid != wanted_id.dictid) {
+        return (stored_id.dictid < wanted_id.dictid) ? -1 : 1;
+    }
+    // Same dictionary id: break the tie on the handle address.
+    if (stored < wanted) {
+        return -1;
+    }
+    if (stored > wanted) {
+        return 1;
+    }
+    return 0;
+}
+
// Tell env that there is a new db handle (with non-unique dname in db->i->dname)
// After this change the handle is inserted into both open_dbs_by_dname and
// open_dbs_by_dict_id while open_dbs_rwlock is write-locked (the removed lines
// used a single open_dbs omt guarded by the open_dbs_lock mutex). The
// YDB_LAYER open-db status values are refreshed before the lock is released.
void
env_note_db_opened(DB_ENV *env, DB *db) {
- toku_mutex_lock(&env->i->open_dbs_lock);
+ toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock);
assert(db->i->dname); // internal (non-user) dictionary has no dname
+
int r;
- OMTVALUE dbv;
+ OMTVALUE v;
uint32_t idx;
- STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = toku_omt_size(env->i->open_dbs);
// The handle must not already be present in either omt (asserted below);
// idx from the failed search is then used as the insertion position.
+ r = toku_omt_find_zero(env->i->open_dbs_by_dname, find_db_by_db_dname,
+ db, &v, &idx);
+ assert(r == DB_NOTFOUND);
+ r = toku_omt_insert_at(env->i->open_dbs_by_dname, db, idx);
+ assert_zero(r);
+ r = toku_omt_find_zero(env->i->open_dbs_by_dict_id, find_db_by_db_dict_id,
+ db, &v, &idx);
+ assert(r == DB_NOTFOUND);
+ r = toku_omt_insert_at(env->i->open_dbs_by_dict_id, db, idx);
+ assert_zero(r);
+
// Status is updated after the inserts, so NUM_OPEN_DBS includes this handle.
+ STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = toku_omt_size(env->i->open_dbs_by_dname);
STATUS_VALUE(YDB_LAYER_NUM_DB_OPEN)++;
- if (STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) > STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS))
+ if (STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) > STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS)) {
STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS) = STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS);
- r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx);
- assert(r==DB_NOTFOUND); //Must not already be there.
- r = toku_omt_insert_at(env->i->open_dbs, db, idx);
- assert_zero(r);
- toku_mutex_unlock(&env->i->open_dbs_lock);
+ }
+ toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock);
}
// Effect: Tell the DB_ENV that the DB is no longer in use by the user of the API. The DB may still be in use by the fractal tree internals.
// After this change the handle is removed from both open_dbs_by_dname and
// open_dbs_by_dict_id while open_dbs_rwlock is write-locked (the removed lines
// used a single open_dbs omt guarded by the open_dbs_lock mutex).
void
env_note_db_closed(DB_ENV *env, DB *db) {
- toku_mutex_lock(&env->i->open_dbs_lock);
+ toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock);
assert(db->i->dname); // internal (non-user) dictionary has no dname
- assert(toku_omt_size(env->i->open_dbs) > 0);
+ assert(toku_omt_size(env->i->open_dbs_by_dname) > 0);
+ assert(toku_omt_size(env->i->open_dbs_by_dict_id) > 0);
+
int r;
- OMTVALUE dbv;
+ OMTVALUE v;
uint32_t idx;
- STATUS_VALUE(YDB_LAYER_NUM_DB_CLOSE)++;
- r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx);
- assert_zero(r); //Must already be there.
- assert((DB*)dbv == db);
- r = toku_omt_delete_at(env->i->open_dbs, idx);
- STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = toku_omt_size(env->i->open_dbs);
// The handle must be present in both omts (each search is asserted to
// return 0); idx from each search is the position that gets deleted.
+ r = toku_omt_find_zero(env->i->open_dbs_by_dname, find_db_by_db_dname,
+ db, &v, &idx);
+ assert_zero(r);
+ r = toku_omt_delete_at(env->i->open_dbs_by_dname, idx);
assert_zero(r);
- toku_mutex_unlock(&env->i->open_dbs_lock);
+ r = toku_omt_find_zero(env->i->open_dbs_by_dict_id, find_db_by_db_dict_id,
+ db, &v, &idx);
+ assert_zero(r);
+ r = toku_omt_delete_at(env->i->open_dbs_by_dict_id, idx);
+ assert_zero(r);
+
// Status is updated after the deletes, so NUM_OPEN_DBS excludes this handle.
+ STATUS_VALUE(YDB_LAYER_NUM_DB_CLOSE)++;
+ STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = toku_omt_size(env->i->open_dbs_by_dname);
+ toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock);
}
static int
@@ -2440,8 +2668,8 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) {
bool rval;
OMTVALUE dbv;
uint32_t idx;
- toku_mutex_lock(&env->i->open_dbs_lock);
- r = toku_omt_find_zero(env->i->open_dbs, find_open_db_by_dname, (void*)dname, &dbv, &idx);
+ toku_pthread_rwlock_rdlock(&env->i->open_dbs_rwlock);
+ r = toku_omt_find_zero(env->i->open_dbs_by_dname, find_open_db_by_dname, (void*)dname, &dbv, &idx);
if (r==0) {
DB *db = (DB *) dbv;
assert(strcmp(dname, db->i->dname) == 0);
@@ -2451,7 +2679,7 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) {
assert(r==DB_NOTFOUND);
rval = false;
}
- toku_mutex_unlock(&env->i->open_dbs_lock);
+ toku_pthread_rwlock_rdunlock(&env->i->open_dbs_rwlock);
return rval;
}