/* Copyright (c) 2009, 2010, 2011, 2012 Nicira, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include "trigger.h" #include #include #include "file.h" #include "openvswitch/json.h" #include "jsonrpc.h" #include "ovsdb.h" #include "ovsdb-error.h" #include "openvswitch/poll-loop.h" #include "server.h" #include "transaction.h" #include "transaction-forward.h" #include "openvswitch/vlog.h" #include "util.h" #include "uuid.h" VLOG_DEFINE_THIS_MODULE(trigger); static bool ovsdb_trigger_try(struct ovsdb_trigger *, long long int now); static void ovsdb_trigger_complete(struct ovsdb_trigger *); static void trigger_convert_error(struct ovsdb_trigger *, struct ovsdb_error *); static void trigger_success(struct ovsdb_trigger *, struct json *result); bool ovsdb_trigger_init(struct ovsdb_session *session, struct ovsdb *db, struct ovsdb_trigger *trigger, struct jsonrpc_msg *request, long long int now, bool read_only, const char *role, const char *id) { ovs_assert(!strcmp(request->method, "transact") || !strcmp(request->method, "convert")); trigger->session = session; trigger->db = db; ovs_list_push_back(&trigger->db->triggers, &trigger->node); trigger->request = request; trigger->converted_db = NULL; trigger->reply = NULL; trigger->progress = NULL; trigger->txn_forward = NULL; trigger->created = now; trigger->timeout_msec = LLONG_MAX; trigger->read_only = read_only; trigger->role = nullable_xstrdup(role); trigger->id = nullable_xstrdup(id); return ovsdb_trigger_try(trigger, now); } void ovsdb_trigger_destroy(struct ovsdb_trigger *trigger) { ovsdb_txn_progress_destroy(trigger->progress); ovsdb_txn_forward_destroy(trigger->db, trigger->txn_forward); ovs_list_remove(&trigger->node); ovsdb_destroy(trigger->converted_db); jsonrpc_msg_destroy(trigger->request); jsonrpc_msg_destroy(trigger->reply); free(trigger->role); free(trigger->id); } bool ovsdb_trigger_is_complete(const struct ovsdb_trigger *trigger) { return trigger->reply && !trigger->progress && !trigger->txn_forward; } struct jsonrpc_msg * ovsdb_trigger_steal_reply(struct ovsdb_trigger *trigger) { struct jsonrpc_msg *reply = trigger->reply; trigger->reply = NULL; return reply; } /* Cancels 'trigger'. 'reason' should be a human-readable reason for log * messages etc. */ void ovsdb_trigger_cancel(struct ovsdb_trigger *trigger, const char *reason) { if (trigger->progress) { /* The transaction still might complete asynchronously, but we can stop * tracking it. */ ovsdb_txn_progress_destroy(trigger->progress); trigger->progress = NULL; } if (trigger->txn_forward) { ovsdb_txn_forward_destroy(trigger->db, trigger->txn_forward); trigger->txn_forward = NULL; } jsonrpc_msg_destroy(trigger->reply); trigger->reply = NULL; if (!strcmp(trigger->request->method, "transact")) { /* There's no place to stick 'reason' into the error reply because RFC * 7047 prescribes a fix form for these messages, see section 4.1.4. */ trigger->reply = jsonrpc_create_error(json_string_create("canceled"), trigger->request->id); ovsdb_trigger_complete(trigger); } else if (!strcmp(trigger->request->method, "convert")) { trigger_convert_error( trigger, ovsdb_error("canceled", "database conversion canceled because %s", reason)); } } void ovsdb_trigger_prereplace_db(struct ovsdb_trigger *trigger) { if (!ovsdb_trigger_is_complete(trigger)) { if (!strcmp(trigger->request->method, "transact")) { ovsdb_trigger_cancel(trigger, "database schema is changing"); } else if (!strcmp(trigger->request->method, "convert")) { /* We don't cancel "convert" requests when a database is being * replaced for two reasons. First, we expect the administrator to * do some kind of sensible synchronization on conversion requests, * that is, it only really makes sense for the admin to do a single * conversion at a time at a scheduled point. Second, if we did * then every "convert" request would end up getting canceled since * "convert" itself causes the database to be replaced. */ } else { OVS_NOT_REACHED(); } } } /* Find among incomplete triggers one that caused database conversion * with specified transaction ID. */ struct ovsdb * ovsdb_trigger_find_and_steal_converted_db(const struct ovsdb *db, const struct uuid *txnid) { struct ovsdb *converted_db = NULL; struct ovsdb_trigger *t; if (uuid_is_zero(txnid)) { return NULL; } LIST_FOR_EACH_SAFE (t, node, &db->triggers) { if (t->db == db && t->converted_db && uuid_equals(&t->conversion_txnid, txnid)) { converted_db = t->converted_db; t->converted_db = NULL; break; } } return converted_db; } bool ovsdb_trigger_run(struct ovsdb *db, long long int now) { struct ovsdb_trigger *t; bool run_triggers = db->run_triggers; db->run_triggers_now = db->run_triggers = false; bool disconnect_all = false; LIST_FOR_EACH_SAFE (t, node, &db->triggers) { if (run_triggers || now - t->created >= t->timeout_msec || t->progress || t->txn_forward) { if (ovsdb_trigger_try(t, now)) { disconnect_all = true; } } } return disconnect_all; } void ovsdb_trigger_wait(struct ovsdb *db, long long int now) { if (db->run_triggers_now) { poll_immediate_wake(); } else { long long int deadline = LLONG_MAX; struct ovsdb_trigger *t; LIST_FOR_EACH (t, node, &db->triggers) { if (t->created < LLONG_MAX - t->timeout_msec) { long long int t_deadline = t->created + t->timeout_msec; if (deadline > t_deadline) { deadline = t_deadline; if (now >= deadline) { break; } } } } if (deadline < LLONG_MAX) { poll_timer_wait_until(deadline); } } } static bool ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now) { /* Handle "initialized" state. */ if (!t->reply && !t->txn_forward) { ovs_assert(!t->progress); struct ovsdb_txn *txn = NULL; if (!strcmp(t->request->method, "transact")) { if (!ovsdb_txn_precheck_prereq(t->db)) { return false; } bool durable, forwarding_needed; struct json *result; /* Trying to compose transaction. */ txn = ovsdb_execute_compose( t->db, t->session, t->request->params, t->read_only, t->role, t->id, now - t->created, &t->timeout_msec, &durable, &forwarding_needed, &result); if (!txn) { if (result) { /* Complete. There was an error but we still represent it * in JSON-RPC as a successful result. */ trigger_success(t, result); } else { /* Unsatisfied "wait" condition. Take no action now, retry * later. */ } return false; } if (forwarding_needed) { /* Transaction is good, but we don't need it. */ ovsdb_txn_abort(txn); json_destroy(result); /* Transition to "forwarding" state. */ t->txn_forward = ovsdb_txn_forward_create(t->db, t->request); /* Forward will not be completed immediately. Will check * next time. */ return false; } else { /* Transition to "committing" state. */ t->reply = jsonrpc_create_reply(result, t->request->id); t->progress = ovsdb_txn_propose_commit(txn, durable); } } else if (!strcmp(t->request->method, "convert")) { /* Permission check. */ if (t->role && *t->role) { trigger_convert_error( t, ovsdb_perm_error( "RBAC rules for client \"%s\" role \"%s\" prohibit " "\"convert\" of database %s " "(only the root role may convert databases)", t->id, t->role, t->db->schema->name)); return false; } /* Validate parameters. */ const struct json *params = t->request->params; if (params->type != JSON_ARRAY || params->array.n != 2) { trigger_convert_error(t, ovsdb_syntax_error(params, NULL, "array expected")); return false; } /* Parse new schema and make a converted copy. */ const struct json *new_schema_json = params->array.elems[1]; struct ovsdb_schema *new_schema; struct ovsdb_error *error = ovsdb_schema_from_json(new_schema_json, &new_schema); if (!error && strcmp(new_schema->name, t->db->schema->name)) { error = ovsdb_error("invalid parameters", "new schema name (%s) does not match " "database name (%s)", new_schema->name, t->db->schema->name); } if (!error) { ovsdb_destroy(t->converted_db); error = ovsdb_convert(t->db, new_schema, &t->converted_db); } if (error) { ovsdb_schema_destroy(new_schema); trigger_convert_error(t, error); return false; } struct json *txn_json; if (ovsdb_conversion_with_no_data_supported(t->db)) { txn_json = json_null_create(); } else { /* Make the new copy into a transaction log record. */ txn_json = ovsdb_to_txn_json( t->converted_db, "converted by ovsdb-server", true); } /* Propose the change. */ t->progress = ovsdb_txn_propose_schema_change( t->db, new_schema, txn_json, &t->conversion_txnid); ovsdb_schema_destroy(new_schema); json_destroy(txn_json); t->reply = jsonrpc_create_reply(json_object_create(), t->request->id); } else { OVS_NOT_REACHED(); } /* If the transaction committed synchronously, complete it and * transition to "complete". This is more than an optimization because * the file-based storage isn't implemented to read back the * transactions that we write (which is an ugly broken abstraction but * it's what we have). */ if (ovsdb_txn_progress_is_complete(t->progress) && !ovsdb_txn_progress_get_error(t->progress)) { if (txn) { ovsdb_txn_complete(txn); } ovsdb_txn_progress_destroy(t->progress); t->progress = NULL; ovsdb_trigger_complete(t); if (t->converted_db) { ovsdb_replace(t->db, t->converted_db); t->converted_db = NULL; return true; } return false; } /* Fall through to the general handling for the "committing" state. We * abort the transaction--if and when it eventually commits, we'll read * it back from storage and replay it locally. */ if (txn) { ovsdb_txn_abort(txn); } } /* Handle "committing" state. */ if (t->progress) { if (!ovsdb_txn_progress_is_complete(t->progress)) { return false; } /* Transition to "complete". */ struct ovsdb_error *error = ovsdb_error_clone(ovsdb_txn_progress_get_error(t->progress)); ovsdb_txn_progress_destroy(t->progress); t->progress = NULL; if (error) { if (!strcmp(ovsdb_error_get_tag(error), "cluster error")) { /* Temporary error. Transition back to "initialized" state to * try again. */ char *err_s = ovsdb_error_to_string(error); VLOG_DBG("cluster error %s", err_s); jsonrpc_msg_destroy(t->reply); t->reply = NULL; t->db->run_triggers = true; if (!strstr(err_s, "not leader")) { t->db->run_triggers_now = true; } free(err_s); ovsdb_error_destroy(error); } else { /* Permanent error. Transition to "completed" state to report * it. */ if (!strcmp(t->request->method, "transact")) { json_array_add(t->reply->result, ovsdb_error_to_json_free(error)); ovsdb_trigger_complete(t); } else if (!strcmp(t->request->method, "convert")) { jsonrpc_msg_destroy(t->reply); t->reply = NULL; trigger_convert_error(t, error); } } } else { /* Success. */ ovsdb_trigger_complete(t); } return false; } else if (t->txn_forward) { /* Handle "forwarding" state. */ if (!ovsdb_txn_forward_is_complete(t->txn_forward)) { return false; } /* Transition to "complete". */ ovs_assert(!t->reply); t->reply = ovsdb_txn_forward_steal_reply(t->txn_forward); ovsdb_txn_forward_destroy(t->db, t->txn_forward); t->txn_forward = NULL; ovsdb_trigger_complete(t); return false; } OVS_NOT_REACHED(); } static void ovsdb_trigger_complete(struct ovsdb_trigger *t) { ovs_assert(t->reply); ovs_list_remove(&t->node); ovs_list_push_back(&t->session->completions, &t->node); } /* Makes a "convert" request into an error. * * This is not suitable for "transact" requests because their replies should * never be bare ovsdb_errors: RFC 7047 says that their replies must either be * a JSON-RPC reply that contains an array of operation replies (which can be * errors), or a JSON-RPC error whose "error" member is simply "canceled". */ static void trigger_convert_error(struct ovsdb_trigger *t, struct ovsdb_error *error) { ovs_assert(!strcmp(t->request->method, "convert")); ovs_assert(error && !t->reply); t->reply = jsonrpc_create_error( ovsdb_error_to_json_free(error), t->request->id); ovsdb_trigger_complete(t); } static void trigger_success(struct ovsdb_trigger *t, struct json *result) { ovs_assert(result && !t->reply); t->reply = jsonrpc_create_reply(result, t->request->id); ovsdb_trigger_complete(t); }