summaryrefslogtreecommitdiff
path: root/ovsdb
diff options
context:
space:
mode:
authorBen Pfaff <blp@nicira.com>2009-11-16 10:38:14 -0800
committerBen Pfaff <blp@nicira.com>2009-11-17 16:02:46 -0800
commita8425c53c5785856cabe80295f0cea0135febdb6 (patch)
treeaf060d0fc8cb939e56087a3f90123bbeccac0a38 /ovsdb
parent1fd13cde12973420d573af8d161c612a9203b1cd (diff)
downloadopenvswitch-a8425c53c5785856cabe80295f0cea0135febdb6.tar.gz
ovsdb: Monitor support.
Diffstat (limited to 'ovsdb')
-rw-r--r--ovsdb/SPECS128
-rw-r--r--ovsdb/column.c13
-rw-r--r--ovsdb/column.h1
-rw-r--r--ovsdb/jsonrpc-server.c426
-rw-r--r--ovsdb/ovsdb-client.1.in24
-rw-r--r--ovsdb/ovsdb-client.c222
6 files changed, 808 insertions, 6 deletions
diff --git a/ovsdb/SPECS b/ovsdb/SPECS
index ae4d649b6..97c9a7806 100644
--- a/ovsdb/SPECS
+++ b/ovsdb/SPECS
@@ -281,6 +281,134 @@ form:
The "cancel" notification itself has no reply.
+monitor
+.......
+
+Request object members:
+
+ "method": "monitor" required
+ "params": [<value>, <monitor-requests>] required
+ "id": any JSON value except null required
+
+<monitor-requests> is an object that maps from a table name to a
+<monitor-request>.
+
+Each <monitor-request> is an object with the following members:
+
+ "columns": [<column>*] optional
+ "select": <monitor-select> optional
+
+<monitor-select> is an object with the following members:
+
+ "initial": <boolean> optional
+ "insert": <boolean> optional
+ "delete": <boolean> optional
+ "modify": <boolean> optional
+
+Response object members:
+
+ "result": <table-updates>
+ "error": null
+ "id": same "id" as request
+
+This JSON-RPC request enables a client to replicate tables or subsets
+of tables. Each <monitor-request> specifies a table to be replicated.
+The JSON-RPC response to the "monitor" includes the initial contents
+of each table. Afterward, when changes to those tables are committed,
+the changes are automatically sent to the client using the "update"
+monitor notification. This monitoring persists until the JSON-RPC
+session terminates or until the client sends a "monitor_cancel"
+JSON-RPC request.
+
+Each <monitor-request> describes how to monitor a table:
+
+ The circumstances in which an "update" notification is sent for a
+ row within the table are determined by <monitor-select>:
+
+ If "initial" is omitted or true, every row in the table is
+ sent as part of the reply to the "monitor" request.
+
+ If "insert" is omitted or true, "update" notifications are
+ sent for rows newly inserted into the table.
+
+ If "delete" is omitted or true, "update" notifications are
+ sent for rows deleted from the table.
+
+ If "modify" is omitted or true, "update" notifications are
+ sent whenever when a row in the table is modified.
+
+ The "columns" member specifies the columns whose values are
+ monitored. If "columns" is omitted, all columns in the table,
+ except for "_uuid", are monitored.
+
+The "result" in the JSON-RPC response to the "monitor" request is a
+<table-updates> object (see below) that contains the contents of the
+tables for which "initial" rows are selected. If no tables' initial
+contents are requested, then "result" is an empty object.
+
+update
+......
+
+Notification object members:
+
+ "method": "update"
+ "params": [<value>, <table-updates>]
+ "id": null
+
+The <value> in "params" is the same as the value passed as the <value>
+in "params" for the "monitor" request.
+
+<table-updates> is an object that maps from a table name to a
+<table-update>.
+
+A <table-update> is an object that maps from the row's UUID (as a
+36-byte string) to a <row-update> object.
+
+A <row-update> is an object with the following members:
+
+ "old": <row> present for "delete" and "modify" updates
+ "new": <row> present for "initial", "insert", and "modify" updates
+
+This JSON-RPC notification is sent from the server to the client to
+tell it about changes to a monitored table (or the initial state of a
+modified table). Each table in which one or more rows has changed (or
+whose initial view is being presented) is represented in "updates".
+Each row that has changed (or whose initial view is being presented)
+is represented in its <table-update> as a member with its name taken
+from the row's _uuid member. The corresponding value is a
+<row-update>:
+
+ The "old" member is present for "delete" and "modify" updates.
+ For "delete" updates, each monitored column is included. For
+ "modify" updates, the prior value of each monitored column whose
+ value has changed is included (monitored columns that have not
+ changed are represented in "new").
+
+ The "new" member is present for "initial", "insert", and "modify"
+ updates. For "initial" and "insert" updates, each monitored
+ column is included. For "modify" updates, the new value of each
+ monitored column is included.
+
+monitor_cancel
+..............
+
+Request object members:
+
+ "method": "monitor_cancel" required
+ "params": [<value>] required
+ "id": any JSON value except null required
+
+Response object members:
+
+ "result": {}
+ "error": null
+ "id": the request "id" member
+
+Cancels the ongoing table monitor request, identified by the <value>
+in "params" matching the <value> in "params" for an ongoing "monitor"
+request. No more "update" messages will be sent for this table
+monitor.
+
echo
....
diff --git a/ovsdb/column.c b/ovsdb/column.c
index 1e8a2d09d..fc21cdc98 100644
--- a/ovsdb/column.c
+++ b/ovsdb/column.c
@@ -174,6 +174,19 @@ error:
"array of distinct column names expected");
}
+struct json *
+ovsdb_column_set_to_json(const struct ovsdb_column_set *set)
+{
+ struct json *json;
+ size_t i;
+
+ json = json_array_create_empty();
+ for (i = 0; i < set->n_columns; i++) {
+ json_array_add(json, json_string_create(set->columns[i]->name));
+ }
+ return json;
+}
+
void
ovsdb_column_set_add(struct ovsdb_column_set *set,
const struct ovsdb_column *column)
diff --git a/ovsdb/column.h b/ovsdb/column.h
index 594215108..5fd39ae10 100644
--- a/ovsdb/column.h
+++ b/ovsdb/column.h
@@ -71,6 +71,7 @@ void ovsdb_column_set_clone(struct ovsdb_column_set *,
struct ovsdb_error *ovsdb_column_set_from_json(const struct json *,
const struct ovsdb_table *,
struct ovsdb_column_set *);
+struct json *ovsdb_column_set_to_json(const struct ovsdb_column_set *);
void ovsdb_column_set_add(struct ovsdb_column_set *,
const struct ovsdb_column *);
diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index 977d30246..fc8b194e9 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -22,10 +22,15 @@
#include "column.h"
#include "json.h"
#include "jsonrpc.h"
+#include "ovsdb-error.h"
+#include "ovsdb-parser.h"
#include "ovsdb.h"
#include "reconnect.h"
+#include "row.h"
#include "stream.h"
+#include "table.h"
#include "timeval.h"
+#include "transaction.h"
#include "trigger.h"
#define THIS_MODULE VLM_ovsdb_jsonrpc_server
@@ -33,6 +38,7 @@
struct ovsdb_jsonrpc_session;
+/* Sessions. */
static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *,
const char *name);
static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
@@ -40,6 +46,7 @@ static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *);
static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *);
+/* Triggers. */
static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
struct json *id, struct json *params);
static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
@@ -48,6 +55,15 @@ static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_trigger_complete_done(
struct ovsdb_jsonrpc_session *);
+
+/* Monitors. */
+static struct json *ovsdb_jsonrpc_monitor_create(
+ struct ovsdb_jsonrpc_session *, struct json *params);
+static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
+ struct ovsdb_jsonrpc_session *,
+ struct json_array *params,
+ const struct json *request_id);
+static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
/* JSON-RPC database server. */
@@ -151,6 +167,9 @@ struct ovsdb_jsonrpc_session {
struct hmap triggers; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
struct list completions; /* Completed triggers. */
+ /* Monitors. */
+ struct hmap monitors; /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
+
/* Connecting and reconnecting. */
struct reconnect *reconnect; /* For back-off. */
bool active; /* Active or passive connection? */
@@ -177,6 +196,7 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
s->server = svr;
list_push_back(&svr->sessions, &s->node);
hmap_init(&s->triggers);
+ hmap_init(&s->monitors);
list_init(&s->completions);
s->reconnect = reconnect_create(time_msec());
reconnect_set_name(s->reconnect, name);
@@ -221,6 +241,7 @@ ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s)
if (s->rpc) {
jsonrpc_error(s->rpc, EOF);
ovsdb_jsonrpc_trigger_complete_all(s);
+ ovsdb_jsonrpc_monitor_remove_all(s);
jsonrpc_close(s->rpc);
s->rpc = NULL;
} else if (s->stream) {
@@ -375,6 +396,12 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
if (!strcmp(request->method, "transact")) {
reply = execute_transaction(s, request);
+ } else if (!strcmp(request->method, "monitor")) {
+ reply = jsonrpc_create_reply(
+ ovsdb_jsonrpc_monitor_create(s, request->params), request->id);
+ } else if (!strcmp(request->method, "monitor_cancel")) {
+ reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
+ request->id);
} else if (!strcmp(request->method, "get_schema")) {
reply = jsonrpc_create_reply(
ovsdb_schema_to_json(s->server->db->schema), request->id);
@@ -522,3 +549,402 @@ ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
ovsdb_jsonrpc_trigger_complete(t);
}
}
+
+/* JSON-RPC database table monitors. */
+
+enum ovsdb_jsonrpc_monitor_selection {
+ OJMS_INITIAL = 1 << 0, /* All rows when monitor is created. */
+ OJMS_INSERT = 1 << 1, /* New rows. */
+ OJMS_DELETE = 1 << 2, /* Deleted rows. */
+ OJMS_MODIFY = 1 << 3 /* Modified rows. */
+};
+
+struct ovsdb_jsonrpc_monitor_table {
+ const struct ovsdb_table *table;
+ enum ovsdb_jsonrpc_monitor_selection select;
+ struct ovsdb_column_set columns;
+};
+
+struct ovsdb_jsonrpc_monitor {
+ struct ovsdb_replica replica;
+ struct ovsdb_jsonrpc_session *session;
+ struct hmap_node node; /* In ovsdb_jsonrpc_session's "monitors". */
+
+ struct json *monitor_id;
+ struct shash tables; /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
+};
+
+static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
+
+struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
+ struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
+static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
+static struct json *ovsdb_jsonrpc_monitor_get_initial(
+ const struct ovsdb_jsonrpc_monitor *);
+
+static bool
+parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
+{
+ const struct json *json;
+
+ json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
+ return json ? json_boolean(json) : default_value;
+}
+
+struct ovsdb_jsonrpc_monitor *
+ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
+ const struct json *monitor_id)
+{
+ struct ovsdb_jsonrpc_monitor *m;
+
+ HMAP_FOR_EACH_WITH_HASH (m, struct ovsdb_jsonrpc_monitor, node,
+ json_hash(monitor_id, 0), &s->monitors) {
+ if (json_equal(m->monitor_id, monitor_id)) {
+ return m;
+ }
+ }
+
+ return NULL;
+}
+
+static struct json *
+ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
+ struct json *params)
+{
+ struct ovsdb_jsonrpc_monitor *m = NULL;
+ struct json *monitor_id, *monitor_requests;
+ struct ovsdb_error *error = NULL;
+ struct shash_node *node;
+ struct json *json;
+
+ if (json_array(params)->n != 2) {
+ error = ovsdb_syntax_error(params, NULL, "invalid parameters");
+ goto error;
+ }
+ monitor_id = params->u.array.elems[0];
+ monitor_requests = params->u.array.elems[1];
+ if (monitor_requests->type != JSON_OBJECT) {
+ error = ovsdb_syntax_error(monitor_requests, NULL,
+ "monitor-requests must be object");
+ goto error;
+ }
+
+ if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
+ error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
+ goto error;
+ }
+
+ m = xzalloc(sizeof *m);
+ ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
+ ovsdb_add_replica(s->server->db, &m->replica);
+ m->session = s;
+ hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
+ m->monitor_id = json_clone(monitor_id);
+ shash_init(&m->tables);
+
+ SHASH_FOR_EACH (node, json_object(monitor_requests)) {
+ const struct ovsdb_table *table;
+ struct ovsdb_jsonrpc_monitor_table *mt;
+ const struct json *columns_json, *select_json;
+ struct ovsdb_parser parser;
+
+ table = ovsdb_get_table(s->server->db, node->name);
+ if (!table) {
+ error = ovsdb_syntax_error(NULL, NULL,
+ "no table named %s", node->name);
+ goto error;
+ }
+
+ mt = xzalloc(sizeof *mt);
+ mt->table = table;
+ mt->select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
+ ovsdb_column_set_init(&mt->columns);
+ shash_add(&m->tables, table->schema->name, mt);
+
+ ovsdb_parser_init(&parser, node->data, "table %s", node->name);
+ columns_json = ovsdb_parser_member(&parser, "columns",
+ OP_ARRAY | OP_OPTIONAL);
+ select_json = ovsdb_parser_member(&parser, "select",
+ OP_OBJECT | OP_OPTIONAL);
+ error = ovsdb_parser_finish(&parser);
+ if (error) {
+ goto error;
+ }
+
+ if (columns_json) {
+ error = ovsdb_column_set_from_json(columns_json, table,
+ &mt->columns);
+ if (error) {
+ goto error;
+ }
+ } else {
+ struct shash_node *node;
+
+ SHASH_FOR_EACH (node, &table->schema->columns) {
+ const struct ovsdb_column *column = node->data;
+ if (column->index != OVSDB_COL_UUID) {
+ ovsdb_column_set_add(&mt->columns, column);
+ }
+ }
+ }
+
+ if (select_json) {
+ mt->select = 0;
+ ovsdb_parser_init(&parser, select_json, "table %s select",
+ table->schema->name);
+ if (parse_bool(&parser, "initial", true)) {
+ mt->select |= OJMS_INITIAL;
+ }
+ if (parse_bool(&parser, "insert", true)) {
+ mt->select |= OJMS_INSERT;
+ }
+ if (parse_bool(&parser, "delete", true)) {
+ mt->select |= OJMS_DELETE;
+ }
+ if (parse_bool(&parser, "modify", true)) {
+ mt->select |= OJMS_MODIFY;
+ }
+ error = ovsdb_parser_finish(&parser);
+ if (error) {
+ goto error;
+ }
+ }
+ }
+
+ return ovsdb_jsonrpc_monitor_get_initial(m);
+
+error:
+ ovsdb_remove_replica(s->server->db, &m->replica);
+
+ json = ovsdb_error_to_json(error);
+ ovsdb_error_destroy(error);
+ return json;
+}
+
+static struct jsonrpc_msg *
+ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
+ struct json_array *params,
+ const struct json *request_id)
+{
+ if (params->n != 1) {
+ return jsonrpc_create_error(json_string_create("invalid parameters"),
+ request_id);
+ } else {
+ struct ovsdb_jsonrpc_monitor *m;
+
+ m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
+ if (!m) {
+ return jsonrpc_create_error(json_string_create("unknown monitor"),
+ request_id);
+ } else {
+ ovsdb_remove_replica(s->server->db, &m->replica);
+ return jsonrpc_create_reply(json_object_create(), request_id);
+ }
+ }
+}
+
+static void
+ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
+{
+ struct ovsdb_jsonrpc_monitor *m, *next;
+
+ HMAP_FOR_EACH_SAFE (m, next,
+ struct ovsdb_jsonrpc_monitor, node, &s->monitors) {
+ ovsdb_remove_replica(s->server->db, &m->replica);
+ }
+}
+
+static struct ovsdb_jsonrpc_monitor *
+ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
+{
+ assert(replica->class == &ovsdb_jsonrpc_replica_class);
+ return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
+}
+
+struct ovsdb_jsonrpc_monitor_aux {
+ bool initial; /* Sending initial contents of table? */
+ const struct ovsdb_jsonrpc_monitor *monitor;
+ struct json *json; /* JSON for the whole transaction. */
+
+ /* Current table. */
+ struct ovsdb_jsonrpc_monitor_table *mt;
+ struct json *table_json; /* JSON for table's transaction. */
+};
+
+static bool
+ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
+ const struct ovsdb_row *new,
+ void *aux_)
+{
+ struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
+ const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
+ struct ovsdb_table *table = new ? new->table : old->table;
+ enum ovsdb_jsonrpc_monitor_selection type;
+ struct json *old_json, *new_json;
+ struct json *row_json;
+ char uuid[UUID_LEN + 1];
+ int n_changed;
+ size_t i;
+
+ if (!aux->mt || table != aux->mt->table) {
+ aux->mt = shash_find_data(&m->tables, table->schema->name);
+ aux->table_json = NULL;
+ if (!aux->mt) {
+ /* We don't care about rows in this table at all. Tell the caller
+ * to skip it. */
+ return false;
+ }
+ }
+
+ type = (aux->initial ? OJMS_INITIAL
+ : !old ? OJMS_INSERT
+ : !new ? OJMS_DELETE
+ : OJMS_MODIFY);
+ if (!(aux->mt->select & type)) {
+ /* We don't care about this type of change (but do want to be called
+ * back for changes to other rows in the same table). */
+ return true;
+ }
+
+ old_json = new_json = NULL;
+ n_changed = 0;
+ for (i = 0; i < aux->mt->columns.n_columns; i++) {
+ const struct ovsdb_column *column = aux->mt->columns.columns[i];
+ unsigned int idx = column->index;
+ bool changed = false;
+
+ if (type == OJMS_MODIFY) {
+ changed = !ovsdb_datum_equals(&old->fields[idx],
+ &new->fields[idx], &column->type);
+ n_changed += changed;
+ }
+ if (changed || type == OJMS_DELETE) {
+ if (!old_json) {
+ old_json = json_object_create();
+ }
+ json_object_put(old_json, column->name,
+ ovsdb_datum_to_json(&old->fields[idx],
+ &column->type));
+ }
+ if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
+ if (!new_json) {
+ new_json = json_object_create();
+ }
+ json_object_put(new_json, column->name,
+ ovsdb_datum_to_json(&new->fields[idx],
+ &column->type));
+ }
+ }
+ if ((type == OJMS_MODIFY && !n_changed) || (!old_json && !new_json)) {
+ /* No reportable changes. */
+ json_destroy(old_json);
+ json_destroy(new_json);
+ return true;
+ }
+
+ /* Create JSON object for transaction overall. */
+ if (!aux->json) {
+ aux->json = json_object_create();
+ }
+
+ /* Create JSON object for transaction on this table. */
+ if (!aux->table_json) {
+ aux->table_json = json_object_create();
+ json_object_put(aux->json, aux->mt->table->schema->name,
+ aux->table_json);
+ }
+
+ /* Create JSON object for transaction on this row. */
+ row_json = json_object_create();
+ if (old_json) {
+ json_object_put(row_json, "old", old_json);
+ }
+ if (new_json) {
+ json_object_put(row_json, "new", new_json);
+ }
+
+ /* Add JSON row to JSON table. */
+ snprintf(uuid, sizeof uuid,
+ UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
+ json_object_put(aux->table_json, uuid, row_json);
+
+ return true;
+}
+
+static void
+ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
+ const struct ovsdb_jsonrpc_monitor *m,
+ bool initial)
+{
+ aux->initial = initial;
+ aux->monitor = m;
+ aux->json = NULL;
+ aux->mt = NULL;
+ aux->table_json = NULL;
+}
+
+static struct ovsdb_error *
+ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
+ const struct ovsdb_txn *txn, bool durable UNUSED)
+{
+ struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
+ struct ovsdb_jsonrpc_monitor_aux aux;
+
+ ovsdb_jsonrpc_monitor_init_aux(&aux, m, false);
+ ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
+ if (aux.json) {
+ struct jsonrpc_msg *msg;
+ struct json *params;
+
+ params = json_array_create_2(json_clone(aux.monitor->monitor_id),
+ aux.json);
+ msg = jsonrpc_create_notify("update", params);
+ jsonrpc_send(aux.monitor->session->rpc, msg);
+ }
+
+ return NULL;
+}
+
+static struct json *
+ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
+{
+ struct ovsdb_jsonrpc_monitor_aux aux;
+ struct shash_node *node;
+
+ ovsdb_jsonrpc_monitor_init_aux(&aux, m, true);
+ SHASH_FOR_EACH (node, &m->tables) {
+ struct ovsdb_jsonrpc_monitor_table *mt = node->data;
+
+ if (mt->select & OJMS_INITIAL) {
+ struct ovsdb_row *row;
+
+ HMAP_FOR_EACH (row, struct ovsdb_row, hmap_node,
+ &mt->table->rows) {
+ ovsdb_jsonrpc_monitor_change_cb(NULL, row, &aux);
+ }
+ }
+ }
+ return aux.json ? aux.json : json_object_create();
+}
+
+static void
+ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
+{
+ struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
+ struct shash_node *node;
+
+ json_destroy(m->monitor_id);
+ SHASH_FOR_EACH (node, &m->tables) {
+ struct ovsdb_jsonrpc_monitor_table *mt = node->data;
+ ovsdb_column_set_destroy(&mt->columns);
+ free(mt);
+ }
+ shash_destroy(&m->tables);
+ hmap_remove(&m->session->monitors, &m->node);
+ free(m);
+}
+
+static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
+ ovsdb_jsonrpc_monitor_commit,
+ ovsdb_jsonrpc_monitor_destroy
+};
diff --git a/ovsdb/ovsdb-client.1.in b/ovsdb/ovsdb-client.1.in
index 0337c3dc2..9825d327b 100644
--- a/ovsdb/ovsdb-client.1.in
+++ b/ovsdb/ovsdb-client.1.in
@@ -20,6 +20,10 @@ ovsdb\-client \- command-line interface to \fBovsdb-server\fR(1)
.br
\fBovsdb\-client \fR[\fIoptions\fR] \fBtransact\fI server transaction\fR
.br
+\fBovsdb\-client \fR[\fIoptions\fR] \fBmonitor\fI server table\fR
+[\fIcolumn\fR[\fB,\fIcolumn\fR]...]
+[\fIselect\fR[\fB,\fIselect\fR]...]
+.br
\fBovsdb\-client help\fR
.IP "Output formatting options:"
[\fB--format=\fIformat\fR]
@@ -62,10 +66,28 @@ Connects to \fIserver\fR, retrieves the database schema, and prints
a table listing the names, type, and comment (if any) on each column. If
\fItable\fR is specified, only columns in that table are listed;
otherwise, the tables include columns in all tables.
-.IP "\fBovsdb\-client \fR[\fIoptions\fR] \fBtransact\fI server transaction\fR"
+.
+.IP "\fBtransact\fI server transaction\fR"
Connects to \fIserver\fR, sends it the specified \fItransaction\fR,
which must be a JSON array containing one or more valid OVSDB
operations, and prints the received reply on stdout.
+.
+.IP "\fBmonitor\fI server table\fR [\fIcolumn\fR[\fB,\fIcolumn\fR]...] [\fIselect\fR[\fB,\fIselect\fR]...]"
+Connects to \fIserver\fR and monitors the contents of \fItable\fR. By
+default, the initial contents of \fItable\fR are printed, followed by
+each change as it occurs. If at least one \fIcolumn\fR is specified,
+only those columns are monitored. If at least one \fIselect\fR is
+specified, they are interpreted as follows:
+.RS
+.IP "\fBinitial\fR"
+Print the initial contents of the specified columns.
+.IP "\fBinsert\fR"
+Print newly inserted rows.
+.IP "\fBdelete\fR"
+Print deleted rows.
+.IP "\fBmodify\fR"
+Print old and new values of modified rows.
+.RE
.SH OPTIONS
.SS "Output Formatting Options"
Much of the output from \fBovsdb\-client\fR is in the form of tables.
diff --git a/ovsdb/ovsdb-client.c b/ovsdb/ovsdb-client.c
index 249cafe54..6e0068125 100644
--- a/ovsdb/ovsdb-client.c
+++ b/ovsdb/ovsdb-client.c
@@ -149,7 +149,10 @@ usage(void)
" list columns in TABLE (or all tables) on SERVER\n"
"\n transact SERVER TRANSACTION\n"
" run TRANSACTION (a JSON array of operations) on SERVER\n"
- " and print the results as JSON on stdout\n",
+ " and print the results as JSON on stdout\n"
+ "\n monitor SERVER TABLE [COLUMN,...] [SELECT,...]\n"
+ " monitor contents of (COLUMNs in) TABLE on SERVER\n"
+ " Valid SELECTs are: initial, insert, delete, modify\n",
program_name, program_name);
stream_usage("SERVER", true, true);
printf("\nOutput formatting options:\n"
@@ -227,14 +230,12 @@ check_ovsdb_error(struct ovsdb_error *error)
}
static struct ovsdb_schema *
-fetch_schema(const char *server)
+fetch_schema_from_rpc(struct jsonrpc *rpc)
{
struct jsonrpc_msg *request, *reply;
struct ovsdb_schema *schema;
- struct jsonrpc *rpc;
int error;
- rpc = open_jsonrpc(server);
request = jsonrpc_create_request("get_schema", json_array_create_empty());
error = jsonrpc_transact_block(rpc, request, &reply);
if (error) {
@@ -242,6 +243,18 @@ fetch_schema(const char *server)
}
check_ovsdb_error(ovsdb_schema_from_json(reply->result, &schema));
jsonrpc_msg_destroy(reply);
+
+ return schema;
+}
+
+static struct ovsdb_schema *
+fetch_schema(const char *server)
+{
+ struct ovsdb_schema *schema;
+ struct jsonrpc *rpc;
+
+ rpc = open_jsonrpc(server);
+ schema = fetch_schema_from_rpc(rpc);
jsonrpc_close(rpc);
return schema;
@@ -267,6 +280,22 @@ table_init(struct table *table)
}
static void
+table_destroy(struct table *table)
+{
+ size_t i;
+
+ for (i = 0; i < table->n_columns; i++) {
+ free(table->columns[i].heading);
+ }
+ free(table->columns);
+
+ for (i = 0; i < table->n_columns * table->n_rows; i++) {
+ free(table->cells[i]);
+ }
+ free(table->cells);
+}
+
+static void
table_add_column(struct table *table, const char *heading, ...)
PRINTF_FORMAT(2, 3);
@@ -590,7 +619,7 @@ do_list_columns(int argc UNUSED, char *argv[])
}
static void
-do_transact(int argc UNUSED, char *argv[] UNUSED)
+do_transact(int argc UNUSED, char *argv[])
{
struct jsonrpc_msg *request, *reply;
struct json *transaction;
@@ -616,6 +645,188 @@ do_transact(int argc UNUSED, char *argv[] UNUSED)
}
static void
+monitor_print_row(struct json *row, const char *type, const char *uuid,
+ const struct ovsdb_column_set *columns, struct table *t)
+{
+ size_t i;
+
+ if (!row) {
+ ovs_error(0, "missing %s row", type);
+ return;
+ } else if (row->type != JSON_OBJECT) {
+ ovs_error(0, "<row> is not object");
+ return;
+ }
+
+ table_add_row(t);
+ table_add_cell(t, uuid);
+ table_add_cell(t, type);
+ for (i = 0; i < columns->n_columns; i++) {
+ const struct ovsdb_column *column = columns->columns[i];
+ struct json *value = shash_find_data(json_object(row), column->name);
+ if (value) {
+ table_add_cell_nocopy(t, json_to_string(value, JSSF_SORT));
+ } else {
+ table_add_cell(t, "");
+ }
+ }
+}
+
+static void
+monitor_print(struct json *table_updates,
+ const struct ovsdb_table_schema *table,
+ const struct ovsdb_column_set *columns, bool initial)
+{
+ struct json *table_update;
+ struct shash_node *node;
+ struct table t;
+ size_t i;
+
+ table_init(&t);
+
+ if (table_updates->type != JSON_OBJECT) {
+ ovs_error(0, "<table-updates> is not object");
+ return;
+ }
+ table_update = shash_find_data(json_object(table_updates), table->name);
+ if (!table_update) {
+ return;
+ }
+ if (table_update->type != JSON_OBJECT) {
+ ovs_error(0, "<table-update> is not object");
+ return;
+ }
+
+ table_add_column(&t, "row");
+ table_add_column(&t, "action");
+ for (i = 0; i < columns->n_columns; i++) {
+ table_add_column(&t, "%s", columns->columns[i]->name);
+ }
+ SHASH_FOR_EACH (node, json_object(table_update)) {
+ struct json *row_update = node->data;
+ struct json *old, *new;
+
+ if (row_update->type != JSON_OBJECT) {
+ ovs_error(0, "<row-update> is not object");
+ continue;
+ }
+ old = shash_find_data(json_object(row_update), "old");
+ new = shash_find_data(json_object(row_update), "new");
+ if (initial) {
+ monitor_print_row(new, "initial", node->name, columns, &t);
+ } else if (!old) {
+ monitor_print_row(new, "insert", node->name, columns, &t);
+ } else if (!new) {
+ monitor_print_row(old, "delete", node->name, columns, &t);
+ } else {
+ monitor_print_row(old, "old", node->name, columns, &t);
+ monitor_print_row(new, "new", "", columns, &t);
+ }
+ }
+ table_print(&t);
+ table_destroy(&t);
+}
+
+static void
+do_monitor(int argc, char *argv[])
+{
+ struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER;
+ struct ovsdb_table_schema *table;
+ struct ovsdb_schema *schema;
+ struct jsonrpc_msg *request;
+ struct jsonrpc *rpc;
+ struct json *select, *monitor, *monitor_request, *monitor_requests,
+ *request_id;
+
+ rpc = open_jsonrpc(argv[1]);
+
+ schema = fetch_schema_from_rpc(rpc);
+ table = shash_find_data(&schema->tables, argv[2]);
+ if (!table) {
+ ovs_fatal(0, "%s: no table named \"%s\"", argv[1], argv[2]);
+ }
+
+ if (argc >= 4 && *argv[3] != '\0') {
+ char *save_ptr = NULL;
+ char *token;
+
+ for (token = strtok_r(argv[3], ",", &save_ptr); token != NULL;
+ token = strtok_r(NULL, ",", &save_ptr)) {
+ const struct ovsdb_column *column;
+ column = ovsdb_table_schema_get_column(table, token);
+ if (!column) {
+ ovs_fatal(0, "%s: table \"%s\" does not have a "
+ "column named \"%s\"", argv[1], argv[2], token);
+ }
+ ovsdb_column_set_add(&columns, column);
+ }
+ } else {
+ struct shash_node *node;
+
+ SHASH_FOR_EACH (node, &table->columns) {
+ const struct ovsdb_column *column = node->data;
+ if (column->index != OVSDB_COL_UUID) {
+ ovsdb_column_set_add(&columns, column);
+ }
+ }
+ }
+
+ if (argc >= 5 && *argv[4] != '\0') {
+ char *save_ptr = NULL;
+ char *token;
+
+ select = json_object_create();
+ for (token = strtok_r(argv[4], ",", &save_ptr); token != NULL;
+ token = strtok_r(NULL, ",", &save_ptr)) {
+ json_object_put(select, token, json_boolean_create(true));
+ }
+ } else {
+ select = NULL;
+ }
+
+ monitor_request = json_object_create();
+ json_object_put(monitor_request,
+ "columns", ovsdb_column_set_to_json(&columns));
+ if (select) {
+ json_object_put(monitor_request, "select", select);
+ }
+
+ monitor_requests = json_object_create();
+ json_object_put(monitor_requests, argv[2], monitor_request);
+
+ monitor = json_array_create_2(json_null_create(), monitor_requests);
+ request = jsonrpc_create_request("monitor", monitor);
+ request_id = json_clone(request->id);
+ jsonrpc_send(rpc, request);
+ for (;;) {
+ struct jsonrpc_msg *msg;
+ int error;
+
+ error = jsonrpc_recv_block(rpc, &msg);
+ if (error) {
+ ovs_fatal(error, "%s: receive failed", argv[1]);
+ }
+
+ if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
+ jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params),
+ msg->id));
+ } else if (msg->type == JSONRPC_REPLY
+ && json_equal(msg->id, request_id)) {
+ monitor_print(msg->result, table, &columns, true);
+ } else if (msg->type == JSONRPC_NOTIFY
+ && !strcmp(msg->method, "update")) {
+ struct json *params = msg->params;
+ if (params->type == JSON_ARRAY
+ && params->u.array.n == 2
+ && params->u.array.elems[0]->type == JSON_NULL) {
+ monitor_print(params->u.array.elems[1],
+ table, &columns, false);
+ }
+ }
+ }
+}
+
+static void
do_help(int argc UNUSED, char *argv[] UNUSED)
{
usage();
@@ -626,6 +837,7 @@ static const struct command all_commands[] = {
{ "list-tables", 1, 1, do_list_tables },
{ "list-columns", 1, 2, do_list_columns },
{ "transact", 2, 2, do_transact },
+ { "monitor", 2, 4, do_monitor },
{ "help", 0, INT_MAX, do_help },
{ NULL, 0, 0, NULL },
};