summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIlya Maximets <i.maximets@ovn.org>2021-04-15 19:05:40 +0200
committerIlya Maximets <i.maximets@ovn.org>2021-07-15 22:38:07 +0200
commit7964ffe7d2bfd35b08c221ad1c3c04dc4403b6f1 (patch)
tree58d6bcee6d997214646d64a08cf2c91cf3905856
parent026c77c58ddba12ad81082ca27564ab9c33986bd (diff)
downloadopenvswitch-7964ffe7d2bfd35b08c221ad1c3c04dc4403b6f1.tar.gz
ovsdb: relay: Add support for transaction forwarding.
Current version of ovsdb relay allows to scale out read-only access to the primary database. However, many clients are not read-only but read-mostly. For example, ovn-controller. In order to scale out database access for this case ovsdb-server need to process transactions that are not read-only. Relay is not allowed to do that, i.e. not allowed to modify the database, but it can act like a proxy and forward transactions that includes database modifications to the primary server and forward replies back to a client. At the same time it may serve read-only transactions and monitor requests by itself greatly reducing the load on primary server. This configuration will slightly increase transaction latency, but it's not very important for read-mostly use cases. Implementation details: With this change instead of creating a trigger to commit the transaction, ovsdb-server will create a trigger for transaction forwarding. Later, ovsdb_relay_run() will send all new transactions to the relay source. Once transaction reply received from the relay source, ovsdb-relay module will update the state of the transaction forwarding with the reply. After that, trigger_run() will complete the trigger and jsonrpc_server_run() will send the reply back to the client. Since transaction reply from the relay source will be received after all the updates, client will receive all the updates before receiving the transaction reply as it is in a normal scenario with other database models. Acked-by: Mark D. Gray <mark.d.gray@redhat.com> Acked-by: Dumitru Ceara <dceara@redhat.com> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
-rw-r--r--ovsdb/automake.mk2
-rw-r--r--ovsdb/execution.c18
-rw-r--r--ovsdb/ovsdb.c9
-rw-r--r--ovsdb/ovsdb.h8
-rw-r--r--ovsdb/relay.c12
-rw-r--r--ovsdb/transaction-forward.c182
-rw-r--r--ovsdb/transaction-forward.h44
-rw-r--r--ovsdb/trigger.c49
-rw-r--r--ovsdb/trigger.h41
-rw-r--r--tests/ovsdb-server.at85
10 files changed, 411 insertions, 39 deletions
diff --git a/ovsdb/automake.mk b/ovsdb/automake.mk
index 05c8ebbdf..62cc02686 100644
--- a/ovsdb/automake.mk
+++ b/ovsdb/automake.mk
@@ -48,6 +48,8 @@ ovsdb_libovsdb_la_SOURCES = \
ovsdb/trigger.h \
ovsdb/transaction.c \
ovsdb/transaction.h \
+ ovsdb/transaction-forward.c \
+ ovsdb/transaction-forward.h \
ovsdb/ovsdb-util.c \
ovsdb/ovsdb-util.h
ovsdb_libovsdb_la_CFLAGS = $(AM_CFLAGS)
diff --git a/ovsdb/execution.c b/ovsdb/execution.c
index dd2569055..f9b8067d0 100644
--- a/ovsdb/execution.c
+++ b/ovsdb/execution.c
@@ -99,7 +99,8 @@ lookup_executor(const char *name, bool *read_only)
}
/* On success, returns a transaction and stores the results to return to the
- * client in '*resultsp'.
+ * client in '*resultsp'. If 'forwarding_needed' is nonnull and transaction
+ * needs to be forwarded (in relay mode), sets '*forwarding_needed' to true.
*
* On failure, returns NULL. If '*resultsp' is nonnull, then it is the results
* to return to the client. If '*resultsp' is null, then the execution failed
@@ -111,7 +112,8 @@ ovsdb_execute_compose(struct ovsdb *db, const struct ovsdb_session *session,
const struct json *params, bool read_only,
const char *role, const char *id,
long long int elapsed_msec, long long int *timeout_msec,
- bool *durable, struct json **resultsp)
+ bool *durable, bool *forwarding_needed,
+ struct json **resultsp)
{
struct ovsdb_execution x;
struct ovsdb_error *error;
@@ -120,6 +122,9 @@ ovsdb_execute_compose(struct ovsdb *db, const struct ovsdb_session *session,
size_t i;
*durable = false;
+ if (forwarding_needed) {
+ *forwarding_needed = false;
+ }
if (params->type != JSON_ARRAY
|| !params->array.n
|| params->array.elems[0]->type != JSON_STRING
@@ -196,11 +201,8 @@ ovsdb_execute_compose(struct ovsdb *db, const struct ovsdb_session *session,
"%s operation not allowed on "
"table in reserved database %s",
op_name, db->schema->name);
- } else if (db->is_relay) {
- error = ovsdb_error("not allowed",
- "%s operation not allowed when "
- "database server is in relay mode",
- op_name);
+ } else if (db->is_relay && forwarding_needed) {
+ *forwarding_needed = true;
}
}
if (error) {
@@ -245,7 +247,7 @@ ovsdb_execute(struct ovsdb *db, const struct ovsdb_session *session,
struct json *results;
struct ovsdb_txn *txn = ovsdb_execute_compose(
db, session, params, read_only, role, id, elapsed_msec, timeout_msec,
- &durable, &results);
+ &durable, NULL, &results);
if (!txn) {
return results;
}
diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
index 999cd0d75..126d16a2f 100644
--- a/ovsdb/ovsdb.c
+++ b/ovsdb/ovsdb.c
@@ -33,6 +33,7 @@
#include "table.h"
#include "timeval.h"
#include "transaction.h"
+#include "transaction-forward.h"
#include "trigger.h"
#include "openvswitch/vlog.h"
@@ -422,6 +423,8 @@ ovsdb_create(struct ovsdb_schema *schema, struct ovsdb_storage *storage)
db->run_triggers_now = db->run_triggers = false;
db->is_relay = false;
+ ovs_list_init(&db->txn_forward_new);
+ hmap_init(&db->txn_forward_sent);
shash_init(&db->tables);
if (schema) {
@@ -465,6 +468,12 @@ ovsdb_destroy(struct ovsdb *db)
/* Destroy txn history. */
ovsdb_txn_history_destroy(db);
+ /* Cancell all the forwarded transactions. There should not be
+ * any as all triggers should be already cancelled. */
+ ovsdb_txn_forward_cancel_all(db, false);
+ ovs_assert(hmap_is_empty(&db->txn_forward_sent));
+ hmap_destroy(&db->txn_forward_sent);
+
/* The caller must ensure that no triggers remain. */
ovs_assert(ovs_list_is_empty(&db->triggers));
diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h
index 16bd5f5ec..4a7bd0f0e 100644
--- a/ovsdb/ovsdb.h
+++ b/ovsdb/ovsdb.h
@@ -93,7 +93,11 @@ struct ovsdb {
struct ovs_list txn_history; /* Contains "struct ovsdb_txn_history_node. */
/* Relay mode. */
- bool is_relay;
+ bool is_relay; /* True, if database is in relay mode. */
+ /* List that holds transactions waiting to be forwarded to the server. */
+ struct ovs_list txn_forward_new;
+ /* Hash map for transactions that are already sent and waits for reply. */
+ struct hmap txn_forward_sent;
};
struct ovsdb *ovsdb_create(struct ovsdb_schema *, struct ovsdb_storage *);
@@ -107,7 +111,7 @@ struct ovsdb_txn *ovsdb_execute_compose(
struct ovsdb *, const struct ovsdb_session *, const struct json *params,
bool read_only, const char *role, const char *id,
long long int elapsed_msec, long long int *timeout_msec,
- bool *durable, struct json **);
+ bool *durable, bool *forwarding_needed, struct json **);
struct json *ovsdb_execute(struct ovsdb *, const struct ovsdb_session *,
const struct json *params, bool read_only,
diff --git a/ovsdb/relay.c b/ovsdb/relay.c
index 247d32e00..f55cdbc6f 100644
--- a/ovsdb/relay.c
+++ b/ovsdb/relay.c
@@ -32,6 +32,7 @@
#include "row.h"
#include "table.h"
#include "transaction.h"
+#include "transaction-forward.h"
#include "util.h"
VLOG_DEFINE_THIS_MODULE(relay);
@@ -302,6 +303,7 @@ ovsdb_relay_run(void)
struct relay_ctx *ctx = node->data;
struct ovs_list events;
+ ovsdb_txn_forward_run(ctx->db, ctx->cs);
ovsdb_cs_run(ctx->cs, &events);
struct ovsdb_cs_event *event;
@@ -313,7 +315,9 @@ ovsdb_relay_run(void)
switch (event->type) {
case OVSDB_CS_EVENT_TYPE_RECONNECT:
- /* Nothing to do. */
+ /* Cancelling all the transactions that were already sent but
+ * not replied yet as they might be lost. */
+ ovsdb_txn_forward_cancel_all(ctx->db, true);
break;
case OVSDB_CS_EVENT_TYPE_UPDATE:
@@ -321,8 +325,11 @@ ovsdb_relay_run(void)
break;
case OVSDB_CS_EVENT_TYPE_TXN_REPLY:
+ ovsdb_txn_forward_complete(ctx->db, event->txn_reply);
+ break;
+
case OVSDB_CS_EVENT_TYPE_LOCKED:
- /* Not expected. */
+ VLOG_WARN("%s: Unexpected LOCKED event.", ctx->db->name);
break;
}
ovsdb_cs_event_destroy(event);
@@ -339,5 +346,6 @@ ovsdb_relay_wait(void)
struct relay_ctx *ctx = node->data;
ovsdb_cs_wait(ctx->cs);
+ ovsdb_txn_forward_wait(ctx->db, ctx->cs);
}
}
diff --git a/ovsdb/transaction-forward.c b/ovsdb/transaction-forward.c
new file mode 100644
index 000000000..8ff12ef4b
--- /dev/null
+++ b/ovsdb/transaction-forward.c
@@ -0,0 +1,182 @@
+/*
+ * Copyright (c) 2021, Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "transaction-forward.h"
+
+#include "coverage.h"
+#include "jsonrpc.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/json.h"
+#include "openvswitch/list.h"
+#include "openvswitch/poll-loop.h"
+#include "openvswitch/vlog.h"
+#include "ovsdb.h"
+#include "ovsdb-cs.h"
+#include "util.h"
+
+VLOG_DEFINE_THIS_MODULE(transaction_forward);
+
+COVERAGE_DEFINE(txn_forward_cancel);
+COVERAGE_DEFINE(txn_forward_complete);
+COVERAGE_DEFINE(txn_forward_create);
+COVERAGE_DEFINE(txn_forward_sent);
+
+struct ovsdb_txn_forward {
+ struct ovs_list new_node; /* In 'txn_forward_new' of struct ovsdb. */
+ struct hmap_node sent_node; /* In 'txn_forward_sent' of struct ovsdb. */
+ struct json *id; /* 'id' of the forwarded transaction. */
+ struct jsonrpc_msg *request; /* Original request. */
+ struct jsonrpc_msg *reply; /* Reply from the server. */
+};
+
+struct ovsdb_txn_forward *
+ovsdb_txn_forward_create(struct ovsdb *db, const struct jsonrpc_msg *request)
+{
+ struct ovsdb_txn_forward *txn_fwd = xzalloc(sizeof *txn_fwd);
+
+ COVERAGE_INC(txn_forward_create);
+ txn_fwd->request = jsonrpc_msg_clone(request);
+ ovs_list_push_back(&db->txn_forward_new, &txn_fwd->new_node);
+
+ return txn_fwd;
+}
+
+static void
+ovsdb_txn_forward_unlist(struct ovsdb *db, struct ovsdb_txn_forward *txn_fwd)
+{
+ if (!ovs_list_is_empty(&txn_fwd->new_node)) {
+ ovs_list_remove(&txn_fwd->new_node);
+ ovs_list_init(&txn_fwd->new_node);
+ }
+ if (!hmap_node_is_null(&txn_fwd->sent_node)) {
+ hmap_remove(&db->txn_forward_sent, &txn_fwd->sent_node);
+ hmap_node_nullify(&txn_fwd->sent_node);
+ }
+}
+
+void
+ovsdb_txn_forward_destroy(struct ovsdb *db, struct ovsdb_txn_forward *txn_fwd)
+{
+ if (!txn_fwd) {
+ return;
+ }
+
+ ovsdb_txn_forward_unlist(db, txn_fwd);
+ json_destroy(txn_fwd->id);
+ jsonrpc_msg_destroy(txn_fwd->request);
+ jsonrpc_msg_destroy(txn_fwd->reply);
+ free(txn_fwd);
+}
+
+bool
+ovsdb_txn_forward_is_complete(const struct ovsdb_txn_forward *txn_fwd)
+{
+ return txn_fwd->reply != NULL;
+}
+
+void
+ovsdb_txn_forward_complete(struct ovsdb *db, const struct jsonrpc_msg *reply)
+{
+ struct ovsdb_txn_forward *t;
+ size_t hash = json_hash(reply->id, 0);
+
+ HMAP_FOR_EACH_WITH_HASH (t, sent_node, hash, &db->txn_forward_sent) {
+ if (json_equal(reply->id, t->id)) {
+ COVERAGE_INC(txn_forward_complete);
+ t->reply = jsonrpc_msg_clone(reply);
+
+ /* Replacing id with the id of the original request. */
+ json_destroy(t->reply->id);
+ t->reply->id = json_clone(t->request->id);
+
+ hmap_remove(&db->txn_forward_sent, &t->sent_node);
+ hmap_node_nullify(&t->sent_node);
+
+ db->run_triggers_now = db->run_triggers = true;
+ return;
+ }
+ }
+}
+
+struct jsonrpc_msg *
+ovsdb_txn_forward_steal_reply(struct ovsdb_txn_forward *txn_fwd)
+{
+ struct jsonrpc_msg *reply = txn_fwd->reply;
+
+ txn_fwd->reply = NULL;
+ return reply;
+}
+
+void
+ovsdb_txn_forward_run(struct ovsdb *db, struct ovsdb_cs *cs)
+{
+ struct ovsdb_txn_forward *t, *next;
+
+ /* Send all transactions that needs to be forwarded. */
+ LIST_FOR_EACH_SAFE (t, next, new_node, &db->txn_forward_new) {
+ if (!ovsdb_cs_may_send_transaction(cs)) {
+ break;
+ }
+ ovs_assert(!strcmp(t->request->method, "transact"));
+ t->id = ovsdb_cs_send_transaction(cs, json_clone(t->request->params));
+ if (t->id) {
+ COVERAGE_INC(txn_forward_sent);
+ ovs_list_remove(&t->new_node);
+ ovs_list_init(&t->new_node);
+ hmap_insert(&db->txn_forward_sent, &t->sent_node,
+ json_hash(t->id, 0));
+ }
+ }
+}
+
+void
+ovsdb_txn_forward_wait(struct ovsdb *db, struct ovsdb_cs *cs)
+{
+ if (ovsdb_cs_may_send_transaction(cs)
+ && !ovs_list_is_empty(&db->txn_forward_new)) {
+ poll_immediate_wake();
+ }
+}
+
+void
+ovsdb_txn_forward_cancel(struct ovsdb *db, struct ovsdb_txn_forward *txn_fwd)
+{
+ COVERAGE_INC(txn_forward_cancel);
+ jsonrpc_msg_destroy(txn_fwd->reply);
+ txn_fwd->reply = jsonrpc_create_error(json_string_create("canceled"),
+ txn_fwd->request->id);
+ ovsdb_txn_forward_unlist(db, txn_fwd);
+}
+
+void
+ovsdb_txn_forward_cancel_all(struct ovsdb *db, bool sent_only)
+{
+ struct ovsdb_txn_forward *t, *next;
+
+ HMAP_FOR_EACH_SAFE (t, next, sent_node, &db->txn_forward_sent) {
+ ovsdb_txn_forward_cancel(db, t);
+ }
+
+ if (sent_only) {
+ return;
+ }
+
+ LIST_FOR_EACH_SAFE (t, next, new_node, &db->txn_forward_new) {
+ ovsdb_txn_forward_cancel(db, t);
+ }
+}
diff --git a/ovsdb/transaction-forward.h b/ovsdb/transaction-forward.h
new file mode 100644
index 000000000..6788d3824
--- /dev/null
+++ b/ovsdb/transaction-forward.h
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2021, Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef OVSDB_TXN_FORWARD_H
+#define OVSDB_TXN_FORWARD_H 1
+
+#include <stdbool.h>
+
+struct ovsdb;
+struct ovsdb_cs;
+struct ovsdb_txn_forward;
+struct jsonrpc_session;
+struct jsonrpc_msg;
+
+struct ovsdb_txn_forward *ovsdb_txn_forward_create(
+ struct ovsdb *, const struct jsonrpc_msg *request);
+void ovsdb_txn_forward_destroy(struct ovsdb *, struct ovsdb_txn_forward *);
+
+bool ovsdb_txn_forward_is_complete(const struct ovsdb_txn_forward *);
+void ovsdb_txn_forward_complete(struct ovsdb *,
+ const struct jsonrpc_msg *reply);
+
+struct jsonrpc_msg *ovsdb_txn_forward_steal_reply(struct ovsdb_txn_forward *);
+
+void ovsdb_txn_forward_run(struct ovsdb *, struct ovsdb_cs *);
+void ovsdb_txn_forward_wait(struct ovsdb *, struct ovsdb_cs *);
+
+void ovsdb_txn_forward_cancel(struct ovsdb *, struct ovsdb_txn_forward *);
+void ovsdb_txn_forward_cancel_all(struct ovsdb *, bool sent_only);
+
+#endif /* OVSDB_TXN_FORWARD_H */
diff --git a/ovsdb/trigger.c b/ovsdb/trigger.c
index 0372302af..726c138bf 100644
--- a/ovsdb/trigger.c
+++ b/ovsdb/trigger.c
@@ -28,6 +28,7 @@
#include "openvswitch/poll-loop.h"
#include "server.h"
#include "transaction.h"
+#include "transaction-forward.h"
#include "openvswitch/vlog.h"
#include "util.h"
@@ -53,6 +54,7 @@ ovsdb_trigger_init(struct ovsdb_session *session, struct ovsdb *db,
trigger->request = request;
trigger->reply = NULL;
trigger->progress = NULL;
+ trigger->txn_forward = NULL;
trigger->created = now;
trigger->timeout_msec = LLONG_MAX;
trigger->read_only = read_only;
@@ -65,6 +67,7 @@ void
ovsdb_trigger_destroy(struct ovsdb_trigger *trigger)
{
ovsdb_txn_progress_destroy(trigger->progress);
+ ovsdb_txn_forward_destroy(trigger->db, trigger->txn_forward);
ovs_list_remove(&trigger->node);
jsonrpc_msg_destroy(trigger->request);
jsonrpc_msg_destroy(trigger->reply);
@@ -75,7 +78,7 @@ ovsdb_trigger_destroy(struct ovsdb_trigger *trigger)
bool
ovsdb_trigger_is_complete(const struct ovsdb_trigger *trigger)
{
- return trigger->reply && !trigger->progress;
+ return trigger->reply && !trigger->progress && !trigger->txn_forward;
}
struct jsonrpc_msg *
@@ -98,6 +101,11 @@ ovsdb_trigger_cancel(struct ovsdb_trigger *trigger, const char *reason)
trigger->progress = NULL;
}
+ if (trigger->txn_forward) {
+ ovsdb_txn_forward_destroy(trigger->db, trigger->txn_forward);
+ trigger->txn_forward = NULL;
+ }
+
jsonrpc_msg_destroy(trigger->reply);
trigger->reply = NULL;
@@ -148,7 +156,7 @@ ovsdb_trigger_run(struct ovsdb *db, long long int now)
LIST_FOR_EACH_SAFE (t, next, node, &db->triggers) {
if (run_triggers
|| now - t->created >= t->timeout_msec
- || t->progress) {
+ || t->progress || t->txn_forward) {
if (ovsdb_trigger_try(t, now)) {
disconnect_all = true;
}
@@ -188,7 +196,7 @@ static bool
ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
{
/* Handle "initialized" state. */
- if (!t->reply) {
+ if (!t->reply && !t->txn_forward) {
ovs_assert(!t->progress);
struct ovsdb_txn *txn = NULL;
@@ -198,13 +206,14 @@ ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
return false;
}
- bool durable;
+ bool durable, forwarding_needed;
struct json *result;
+ /* Trying to compose transaction. */
txn = ovsdb_execute_compose(
t->db, t->session, t->request->params, t->read_only,
t->role, t->id, now - t->created, &t->timeout_msec,
- &durable, &result);
+ &durable, &forwarding_needed, &result);
if (!txn) {
if (result) {
/* Complete. There was an error but we still represent it
@@ -217,9 +226,20 @@ ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
return false;
}
- /* Transition to "committing" state. */
- t->reply = jsonrpc_create_reply(result, t->request->id);
- t->progress = ovsdb_txn_propose_commit(txn, durable);
+ if (forwarding_needed) {
+ /* Transaction is good, but we don't need it. */
+ ovsdb_txn_abort(txn);
+ json_destroy(result);
+ /* Transition to "forwarding" state. */
+ t->txn_forward = ovsdb_txn_forward_create(t->db, t->request);
+ /* Forward will not be completed immediately. Will check
+ * next time. */
+ return false;
+ } else {
+ /* Transition to "committing" state. */
+ t->reply = jsonrpc_create_reply(result, t->request->id);
+ t->progress = ovsdb_txn_propose_commit(txn, durable);
+ }
} else if (!strcmp(t->request->method, "convert")) {
/* Permission check. */
if (t->role && *t->role) {
@@ -349,6 +369,19 @@ ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
}
return false;
+ } else if (t->txn_forward) {
+ /* Handle "forwarding" state. */
+ if (!ovsdb_txn_forward_is_complete(t->txn_forward)) {
+ return false;
+ }
+
+ /* Transition to "complete". */
+ ovs_assert(!t->reply);
+ t->reply = ovsdb_txn_forward_steal_reply(t->txn_forward);
+ ovsdb_txn_forward_destroy(t->db, t->txn_forward);
+ t->txn_forward = NULL;
+ ovsdb_trigger_complete(t);
+ return false;
}
OVS_NOT_REACHED();
diff --git a/ovsdb/trigger.h b/ovsdb/trigger.h
index 79af7f6be..d060c72e5 100644
--- a/ovsdb/trigger.h
+++ b/ovsdb/trigger.h
@@ -22,26 +22,34 @@ struct ovsdb;
/* Triggers have the following states:
*
- * - Initialized (reply == NULL, progress == NULL): Executing the trigger
- * can keep it in the initialized state, if it has a "wait" condition that
- * isn't met. Executing the trigger can also yield an error, in which
- * case it transitions to "complete". Otherwise, execution yields a
- * transaction, which the database attempts to commit. If the transaction
- * completes immediately and synchronously, then the trigger transitions
- * to the "complete" state. If the transaction requires some time to
- * complete, it transitions to the "committing" state.
+ * - Initialized (reply == NULL, progress == NULL, txn_forward == NULL):
+ * Executing the trigger can keep it in the initialized state, if it has a
+ * "wait" condition that isn't met. Executing the trigger can also yield
+ * an error, in which case it transitions to "complete". Otherwise,
+ * execution yields a transaction, which the database attempts to commit.
+ * If the transaction completes immediately and synchronously, then the
+ * trigger transitions to the "complete" state. If the transaction
+ * requires some time to complete, it transitions to the "committing"
+ * state. If the transaction can not be completed locally due to
+ * read-only restrictions and transaction forwarding is enabled, starts
+ * forwarding and transitions to the "forwarding" state.
*
- * - Committing (reply != NULL, progress != NULL): The transaction is
- * committing. If it succeeds, or if it fails permanently, then the
- * trigger transitions to "complete". If it fails temporarily
- * (e.g. because someone else committed to cluster-based storage before we
- * did), then we transition back to "initialized" to try again.
+ * - Committing (reply != NULL, progress != NULL, txn_forward == NULL):
+ * The transaction is committing. If it succeeds, or if it fails
+ * permanently, then the trigger transitions to "complete". If it fails
+ * temporarily (e.g. because someone else committed to cluster-based
+ * storage before we did), then we transition back to "initialized" to
+ * try again.
*
- * - Complete (reply != NULL, progress == NULL): The transaction is done
- * and either succeeded or failed.
+ * - Forwarding (reply == NULL, progress == NULL, txn_forward != NULL):
+ * Transaction is forwarded. Either it succeeds or it fails, the trigger
+ * transitions to "complete".
+ *
+ * - Complete (reply != NULL, progress == NULL, txn_forward == NULL):
+ * The transaction is done and either succeeded or failed.
*/
struct ovsdb_trigger {
- /* In "initialized" or "committing" state, in db->triggers.
+ /* In "initialized", "committing" or "forwarding" state, in db->triggers.
* In "complete", in session->completions. */
struct ovs_list node;
struct ovsdb_session *session; /* Session that owns this trigger. */
@@ -49,6 +57,7 @@ struct ovsdb_trigger {
struct jsonrpc_msg *request; /* Database request. */
struct jsonrpc_msg *reply; /* Result (null if none yet). */
struct ovsdb_txn_progress *progress;
+ struct ovsdb_txn_forward *txn_forward; /* Tracks transaction forwarding. */
long long int created; /* Time created. */
long long int timeout_msec; /* Max wait duration. */
bool read_only; /* Database is in read only mode. */
diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at
index ba1b369c1..ac243d6a7 100644
--- a/tests/ovsdb-server.at
+++ b/tests/ovsdb-server.at
@@ -3,10 +3,13 @@ AT_BANNER([OVSDB -- ovsdb-server transactions (Unix sockets)])
m4_define([OVSDB_SERVER_SHUTDOWN],
[OVS_APP_EXIT_AND_WAIT_BY_TARGET([ovsdb-server], [ovsdb-server.pid])])
+m4_define([OVSDB_SERVER_SHUTDOWN_N],
+ [cp pid$1 savepid$1
+ AT_CHECK([ovs-appctl -t "`pwd`"/unixctl$1 -e exit], [0], [ignore], [ignore])
+ OVS_WAIT_WHILE([kill -0 `cat savepid$1`], [kill `cat savepid$1`])])
+
m4_define([OVSDB_SERVER_SHUTDOWN2],
- [cp pid2 savepid2
- AT_CHECK([ovs-appctl -t "`pwd`"/unixctl2 -e exit], [0], [ignore], [ignore])
- OVS_WAIT_WHILE([kill -0 `cat savepid2`], [kill `cat savepid2`])])
+ [OVSDB_SERVER_SHUTDOWN_N([2])])
# OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS])
#
@@ -1412,6 +1415,82 @@ m4_define([OVSDB_CHECK_EXECUTION],
EXECUTION_EXAMPLES
+AT_BANNER([OVSDB -- ovsdb-server relay])
+
+# OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS])
+#
+# Creates a database with the given SCHEMA and starts an ovsdb-server on
+# it. Also starts a daisy chain of ovsdb-servers in relay mode where the
+# first relay server is connected to the main non-relay ovsdb-server.
+#
+# Runs each of the TRANSACTIONS (which should be a quoted list of
+# quoted strings) against one of relay servers in the middle with
+# ovsdb-client one at a time. The server executes read-only transactions
+# and forwards rest of them to the previous ovsdb-server in a chain.
+# The main ovsdb-server executes 'write' transactions. Transaction
+# reply with data updates propagates back through the chain to all
+# the servers and the client.
+#
+# main relay relay relay relay relay
+# server1 <-- server2 <-- server3 <-- server4 <-- server5 <-- server6
+# ^
+# |
+# ovsdb-client
+#
+# Checks that the overall output is OUTPUT, but UUIDs in the output
+# are replaced by markers of the form <N> where N is a number. The
+# first unique UUID is replaced by <0>, the next by <1>, and so on.
+# If a given UUID appears more than once it is always replaced by the
+# same marker.
+#
+# Checks that the dump of all databases is the same.
+#
+# TITLE is provided to AT_SETUP and KEYWORDS to AT_KEYWORDS.
+m4_define([OVSDB_CHECK_EXECUTION],
+ [AT_SETUP([$1])
+ AT_KEYWORDS([ovsdb server tcp relay $5])
+ n_servers=6
+ target=4
+ $2 > schema
+ schema_name=`ovsdb-tool schema-name schema`
+ AT_CHECK([ovsdb-tool create db1 schema], [0], [stdout], [ignore])
+
+ on_exit 'kill `cat *.pid`'
+ AT_CHECK([ovsdb-server --detach --no-chdir --log-file=ovsdb-server1.log dnl
+ --pidfile --remote=punix:db1.sock db1
+ ], [0], [ignore], [ignore])
+
+ for i in $(seq 2 ${n_servers}); do
+ AT_CHECK([ovsdb-server --detach --no-chdir dnl
+ --log-file=ovsdb-server$i.log dnl
+ --pidfile=${i}.pid --remote=punix:db${i}.sock dnl
+ --unixctl=unixctl${i} -vjsonrpc:file:dbg dnl
+ relay:${schema_name}:unix:db$((i-1)).sock
+ ], [0], [ignore], [ignore])
+ done
+
+ m4_foreach([txn], [$3],
+ [AT_CHECK([ovsdb-client transact unix:db${target}.sock 'txn'], [0],
+ [stdout], [ignore])
+ cat stdout >> output
+ ])
+
+ AT_CHECK([uuidfilt output], [0], [$4], [ignore])
+
+ AT_CHECK([ovsdb-client dump unix:db1.sock], [0], [stdout], [ignore])
+ for i in $(seq 2 ${n_servers}); do
+ OVS_WAIT_UNTIL([ovsdb-client dump unix:db${i}.sock > dump${i}; dnl
+ diff stdout dump${i}])
+ done
+
+ OVSDB_SERVER_SHUTDOWN
+ for i in $(seq 2 ${n_servers}); do
+ OVSDB_SERVER_SHUTDOWN_N([$i])
+ done
+ AT_CLEANUP])
+
+EXECUTION_EXAMPLES
+
AT_BANNER([OVSDB -- ovsdb-server replication])
# OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS])