summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Zhou <azhou@nicira.com>2015-03-13 16:35:49 -0700
committerAndy Zhou <azhou@nicira.com>2015-05-29 17:45:15 -0700
commit1158f320622954f4027a35260916f6a950529c27 (patch)
tree9721ac018aeb699da51527a59877a37fe9abf34c
parentf1de87bb2f568ad126e77e85746ce63376ff0bd5 (diff)
downloadopenvswitch-1158f320622954f4027a35260916f6a950529c27.tar.gz
ovsdb-monitor: add ovsdb_monitor_changes
Currently, each monitor table contains a single hmap 'changes' to track updates. This patch introduces a new data structure 'ovsdb_monitor_changes' that stores the updates 'rows' tagged by its first commit transaction id. Each 'ovsdb_monitor_changes' is refenece counted allowing multiple jsonrpc_monitors to share them. The next patch will allow each ovsdb monitor table to store a list of 'ovsdb_monitor_changes'. This patch stores only one, same as before. Signed-off-by: Andy Zhou <azhou@nicira.com> Acked-by: Ben Pfaff <blp@nicira.com>
-rw-r--r--ovsdb/jsonrpc-server.c7
-rw-r--r--ovsdb/monitor.c130
2 files changed, 114 insertions, 23 deletions
diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index 05aaf8718..efd83b8f0 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -1282,11 +1282,10 @@ ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
}
static struct json *
-ovsdb_jsonrpc_monitor_compose_update(
- struct ovsdb_jsonrpc_monitor *monitor, bool initial)
+ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
+ bool initial)
{
- return ovsdb_monitor_compose_update(monitor->dbmon, initial,
- &monitor->unflushed);
+ return ovsdb_monitor_compose_update(m->dbmon, initial, &m->unflushed);
}
static bool
diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
index 0898808b4..b82941b38 100644
--- a/ovsdb/monitor.c
+++ b/ovsdb/monitor.c
@@ -71,6 +71,23 @@ struct ovsdb_monitor_row {
struct ovsdb_datum *new; /* New data, NULL for a deleted row. */
};
+/* Contains 'struct ovsdb_monitor_row's for rows that have been
+ * updated but not yet flushed to all the jsonrpc connection.
+ *
+ * 'n_refs' represent the number of jsonrpc connections that have
+ * not received updates. Generate the update for the last jsonprc
+ * connection will also destroy the whole "struct ovsdb_monitor_changes"
+ * object.
+ *
+ * 'transaction' stores the first update's transaction id.
+ * */
+struct ovsdb_monitor_changes {
+ struct ovsdb_monitor_table *mt;
+ struct hmap rows;
+ int n_refs;
+ uint64_t transaction;
+};
+
/* A particular table being monitored. */
struct ovsdb_monitor_table {
const struct ovsdb_table *table;
@@ -85,10 +102,16 @@ struct ovsdb_monitor_table {
/* Contains 'struct ovsdb_monitor_row's for rows that have been
* updated but not yet flushed to the jsonrpc connection. */
- struct hmap changes;
+ struct ovsdb_monitor_changes *changes;
};
static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
+static void ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
+ uint64_t next_txn);
+static void ovsdb_monitor_changes_destroy(
+ struct ovsdb_monitor_changes *changes);
+static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
+ uint64_t transaction);
static int
compare_ovsdb_monitor_column(const void *a_, const void *b_)
@@ -106,7 +129,7 @@ ovsdb_monitor_cast(struct ovsdb_replica *replica)
return CONTAINER_OF(replica, struct ovsdb_monitor, replica);
}
-/* Finds and returns the ovsdb_monitor_row in 'mt->changes' for the
+/* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the
* given 'uuid', or NULL if there is no such row. */
static struct ovsdb_monitor_row *
ovsdb_monitor_row_find(const struct ovsdb_monitor_table *mt,
@@ -114,7 +137,8 @@ ovsdb_monitor_row_find(const struct ovsdb_monitor_table *mt,
{
struct ovsdb_monitor_row *row;
- HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &mt->changes) {
+ HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid),
+ &mt->changes->rows) {
if (uuid_equals(uuid, &row->uuid)) {
return row;
}
@@ -233,7 +257,7 @@ ovsdb_monitor_add_table(struct ovsdb_monitor *m,
mt = xzalloc(sizeof *mt);
mt->table = table;
- hmap_init(&mt->changes);
+ mt->changes = NULL;
shash_add(&m->tables, table->schema->name, mt);
}
@@ -286,6 +310,61 @@ ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *m,
return NULL;
}
+static void
+ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
+ uint64_t next_txn)
+{
+ struct ovsdb_monitor_changes *changes;
+
+ changes = xzalloc(sizeof *changes);
+
+ changes->transaction = next_txn;
+ changes->mt = mt;
+ changes->n_refs = 1;
+ hmap_init(&changes->rows);
+ mt->changes = changes;
+}
+
+/* Stop currently tracking changes to table 'mt' since 'transaction'.
+ *
+ * Return 'true' if the 'transaction' is being tracked. 'false' otherwise. */
+static void
+ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
+ uint64_t transaction)
+{
+ struct ovsdb_monitor_changes *changes = mt->changes;
+ if (changes) {
+ ovs_assert(changes->transaction == transaction);
+ if (--changes->n_refs == 0) {
+ ovsdb_monitor_changes_destroy(changes);
+ mt->changes = NULL;
+ }
+ }
+}
+
+/* Start tracking changes to table 'mt' begins from 'transaction' inclusive.
+ */
+static void
+ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
+ uint64_t transaction)
+{
+ ovs_assert(!mt->changes);
+ ovsdb_monitor_table_add_changes(mt, transaction);
+}
+
+static void
+ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
+{
+ struct ovsdb_monitor_row *row, *next;
+
+ HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
+ hmap_remove(&changes->rows, &row->hmap_node);
+ ovsdb_monitor_row_destroy(changes->mt, row);
+ }
+ hmap_destroy(&changes->rows);
+ free(changes);
+}
+
/* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
* 'mt', or NULL if no row update should be sent.
*
@@ -376,7 +455,13 @@ ovsdb_monitor_compose_row_update(
*
* The caller should specify 'initial' as true if the returned JSON is going to
* be used as part of the initial reply to a "monitor" request, false if it is
- * going to be used as part of an "update" notification. */
+ * going to be used as part of an "update" notification.
+ *
+ * 'unflushed' should point to value that is the transaction ID that did
+ * was not updated. The update contains changes between
+ * ['unflushed, ovsdb->n_transcations]. Before the function returns, this
+ * value will be updated to ovsdb->n_transactions + 1, ready for the next
+ * update. */
struct json *
ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon,
bool initial, uint64_t *unflushed)
@@ -385,8 +470,8 @@ ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon,
unsigned long int *changed;
struct json *json;
size_t max_columns;
-
- *unflushed = dbmon->n_transactions + 1;
+ uint64_t prev_txn = *unflushed;
+ uint64_t next_txn = dbmon->n_transactions + 1;
max_columns = 0;
SHASH_FOR_EACH (node, &dbmon->tables) {
@@ -402,7 +487,12 @@ ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon,
struct ovsdb_monitor_row *row, *next;
struct json *table_json = NULL;
- HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) {
+ if (!mt->changes) {
+ ovsdb_monitor_table_track_changes(mt, next_txn);
+ continue;
+ }
+
+ HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes->rows) {
struct json *row_json;
row_json = ovsdb_monitor_compose_row_update(
@@ -426,11 +516,15 @@ ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon,
json_object_put(table_json, uuid, row_json);
}
- hmap_remove(&mt->changes, &row->hmap_node);
+ hmap_remove(&mt->changes->rows, &row->hmap_node);
ovsdb_monitor_row_destroy(mt, row);
}
+
+ ovsdb_monitor_table_untrack_changes(mt, prev_txn);
+ ovsdb_monitor_table_track_changes(mt, next_txn);
}
+ *unflushed = next_txn;
free(changed);
return json;
}
@@ -492,8 +586,8 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
change = ovsdb_monitor_row_find(mt, uuid);
if (!change) {
- change = xmalloc(sizeof *change);
- hmap_insert(&mt->changes, &change->hmap_node, uuid_hash(uuid));
+ change = xzalloc(sizeof *change);
+ hmap_insert(&mt->changes->rows, &change->hmap_node, uuid_hash(uuid));
change->uuid = *uuid;
change->old = clone_monitor_row_data(mt, old);
change->new = clone_monitor_row_data(mt, new);
@@ -506,7 +600,7 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
if (!change->old) {
/* This row was added then deleted. Forget about it. */
- hmap_remove(&mt->changes, &change->hmap_node);
+ hmap_remove(&mt->changes->rows, &change->hmap_node);
free(change);
}
}
@@ -527,6 +621,10 @@ ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
if (mt->select & OJMS_INITIAL) {
struct ovsdb_row *row;
+ if (!mt->changes) {
+ ovsdb_monitor_table_add_changes(mt, 0);
+ }
+
HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
ovsdb_monitor_change_cb(NULL, row, NULL, &aux);
}
@@ -568,14 +666,8 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
SHASH_FOR_EACH (node, &dbmon->tables) {
struct ovsdb_monitor_table *mt = node->data;
- struct ovsdb_monitor_row *row, *next;
-
- HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) {
- hmap_remove(&mt->changes, &row->hmap_node);
- ovsdb_monitor_row_destroy(mt, row);
- }
- hmap_destroy(&mt->changes);
+ ovsdb_monitor_changes_destroy(mt->changes);
free(mt->columns);
free(mt);
}