summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Pfaff <blp@ovn.org>2016-05-14 22:55:16 -0700
committerBen Pfaff <blp@ovn.org>2016-05-14 22:55:16 -0700
commit1c66728fab51d9d33f9935e18ced9993be233b1f (patch)
tree9d787b9e7a8b85b1fe9813868fa111dc9307f29b
parent8108d230ce87b7a6cd0d0365cd19713865aa2b43 (diff)
downloadopenvswitch-1c66728fab51d9d33f9935e18ced9993be233b1f.tar.gz
Add command status tracking to raft implementation and test-raft.
-rw-r--r--ovsdb/raft.c133
-rw-r--r--ovsdb/raft.h6
-rw-r--r--tests/test-raft.c59
3 files changed, 163 insertions, 35 deletions
diff --git a/ovsdb/raft.c b/ovsdb/raft.c
index 171121212..858fe6885 100644
--- a/ovsdb/raft.c
+++ b/ovsdb/raft.c
@@ -116,10 +116,20 @@ struct raft_conn {
};
struct raft_command {
- struct ovs_refcount refcnt;
+ struct hmap_node hmap_node; /* In struct raft's 'commands' hmap. */
+ uint64_t index; /* Index in log. */
+
+ unsigned int n_refs;
enum raft_command_status status;
};
+static void raft_command_complete(struct raft *, struct raft_command *,
+ enum raft_command_status);
+
+static void raft_complete_all_commands(struct raft *,
+ enum raft_command_status);
+static struct raft_command *raft_find_command(struct raft *, uint64_t index);
+
enum raft_waiter_type {
RAFT_W_COMMAND,
RAFT_W_APPEND
@@ -234,6 +244,7 @@ struct raft {
/* Leaders only. Reinitialized after becoming leader. */
struct hmap add_servers; /* Contains "struct raft_server"s to add. */
struct raft_server *remove_server; /* Server being removed. */
+ struct hmap commands; /* Contains "struct raft_command"s. */
/* Candidates only. Reinitialized at start of election. */
int n_votes; /* Number of votes for me. */
@@ -644,6 +655,7 @@ raft_alloc(const char *file_name)
hmap_init(&raft->servers);
hmap_init(&raft->prev_servers);
hmap_init(&raft->add_servers);
+ hmap_init(&raft->commands);
raft->listen_backoff = LLONG_MIN;
ovs_list_init(&raft->conns);
ovs_mutex_init(&raft->fsync_mutex);
@@ -1422,6 +1434,8 @@ raft_close(struct raft *raft)
/* XXX if we're leader then invoke the leadership transfer procedure? */
+ raft_complete_all_commands(raft, RAFT_CMD_SHUTDOWN);
+
ovs_mutex_lock(&raft->fsync_mutex);
raft->fsync_next = UINT64_MAX;
ovs_mutex_unlock(&raft->fsync_mutex);
@@ -1563,14 +1577,6 @@ raft_waiter_complete(struct raft *raft, struct raft_waiter *w)
}
static void
-raft_command_unref(struct raft_command *cmd)
-{
- if (cmd && ovs_refcount_unref(&cmd->refcnt) == 1) {
- free(cmd);
- }
-}
-
-static void
raft_waiter_destroy(struct raft_waiter *w)
{
if (!w) {
@@ -1666,6 +1672,7 @@ raft_start_election(struct raft *raft)
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
ovs_assert(raft->role != RAFT_LEADER);
+ ovs_assert(hmap_is_empty(&raft->commands));
raft->role = RAFT_CANDIDATE;
/* XXX what if we're not part of the server set? */
@@ -1709,7 +1716,8 @@ raft_start_election(struct raft *raft)
raft_send(raft, &rq);
}
- /* Vote for ourselves. */
+ /* Vote for ourselves.
+ * XXX only if we're not being removed? */
raft_accept_vote(raft, raft->me, true);
/* XXX how do we handle outstanding waiters? */
@@ -1829,33 +1837,50 @@ raft_waiter_create(struct raft *raft, enum raft_waiter_type type)
return w;
}
+const char *
+raft_command_status_to_string(enum raft_command_status status)
+{
+ switch (status) {
+ case RAFT_CMD_INCOMPLETE:
+ return "operation still in progress";
+ case RAFT_CMD_SUCCESS:
+ return "success";
+ case RAFT_CMD_NOT_LEADER:
+ return "not leader";
+ case RAFT_CMD_LOST_LEADERSHIP:
+ return "lost leadership";
+ case RAFT_CMD_SHUTDOWN:
+ return "server shutdown";
+ default:
+ OVS_NOT_REACHED();
+ }
+}
+
static struct raft_command * OVS_WARN_UNUSED_RESULT
raft_command_execute__(struct raft *raft, enum raft_entry_type type,
const char *data)
{
struct raft_command *cmd = xzalloc(sizeof *cmd);
+ cmd->n_refs = 2; /* One for client, one for raft->commands. */
+ cmd->index = raft->log_end;
+ cmd->status = RAFT_CMD_INCOMPLETE;
+ hmap_insert(&raft->commands, &cmd->hmap_node, cmd->index);
+
if (raft->role != RAFT_LEADER) {
- cmd->status = RAFT_CMD_NOT_LEADER;
+ raft_command_complete(raft, cmd, RAFT_CMD_NOT_LEADER);
return cmd;
}
- uint64_t index = raft->log_end;
- ovs_refcount_init(&cmd->refcnt);
- cmd->status = RAFT_CMD_INCOMPLETE;
-
/* Write to local log.
*
* XXX If this server is being removed from the configuration then we
- * should not writ to the local log; see section 4.2.2. Or we could
+ * should not write to the local log; see section 4.2.2. Or we could
* implement leadership transfer. */
struct ovsdb_error *error = raft_write_entry(raft, raft->current_term,
type, xstrdup(data));
if (!error) {
- ovs_refcount_ref(&cmd->refcnt);
-
struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_COMMAND);
- w->command.cmd = cmd;
- w->command.index = index;
+ w->command.index = cmd->index;
} else {
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
char *s = ovsdb_error_to_string(error);
@@ -1883,6 +1908,66 @@ raft_command_execute(struct raft *raft, const char *data)
{
return raft_command_execute__(raft, RAFT_DATA, data);
}
+
+enum raft_command_status
+raft_command_get_status(const struct raft_command *cmd)
+{
+ ovs_assert(cmd->n_refs > 0);
+ return cmd->status;
+}
+
+void
+raft_command_unref(struct raft_command *cmd)
+{
+ if (cmd) {
+ ovs_assert(cmd->n_refs > 0);
+ if (!--cmd->n_refs) {
+ free(cmd);
+ }
+ }
+}
+
+void
+raft_command_wait(const struct raft_command *cmd)
+{
+ if (cmd->status != RAFT_CMD_INCOMPLETE) {
+ poll_immediate_wake();
+ }
+}
+
+static void
+raft_command_complete(struct raft *raft,
+ struct raft_command *cmd,
+ enum raft_command_status status)
+{
+ ovs_assert(cmd->status == RAFT_CMD_INCOMPLETE);
+ ovs_assert(cmd->n_refs > 0);
+ hmap_remove(&raft->commands, &cmd->hmap_node);
+ cmd->status = status;
+ raft_command_unref(cmd);
+}
+
+static void
+raft_complete_all_commands(struct raft *raft, enum raft_command_status status)
+{
+ struct raft_command *cmd, *next;
+ HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
+ raft_command_complete(raft, cmd, status);
+ }
+}
+
+static struct raft_command *
+raft_find_command(struct raft *raft, uint64_t index)
+{
+ struct raft_command *cmd;
+
+ HMAP_FOR_EACH_IN_BUCKET (cmd, hmap_node, index, &raft->commands) {
+ if (cmd->index == index) {
+ return cmd;
+ }
+ }
+ return NULL;
+}
static void
raft_rpc_destroy(union raft_rpc *rpc)
@@ -2356,6 +2441,7 @@ raft_become_follower(struct raft *raft)
}
/* XXX how do we handle outstanding waiters? */
+ raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
}
static void
@@ -2765,6 +2851,13 @@ raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
}
} else {
/* XXX apply log[lastApplied]. */
+ if (raft->role == RAFT_LEADER) {
+ struct raft_command *cmd
+ = raft_find_command(raft, raft->last_applied);
+ if (cmd) {
+ raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
+ }
+ }
}
}
}
diff --git a/ovsdb/raft.h b/ovsdb/raft.h
index 4c012f9d1..4f68579c3 100644
--- a/ovsdb/raft.h
+++ b/ovsdb/raft.h
@@ -73,11 +73,15 @@ enum raft_command_status {
RAFT_CMD_INCOMPLETE, /* In progress, please wait. */
RAFT_CMD_SUCCESS, /* Committed. */
RAFT_CMD_NOT_LEADER, /* Failed because we are not the leader. */
- RAFT_CMD_FAILURE, /* Other failure. */
+ RAFT_CMD_LOST_LEADERSHIP, /* Leadership lost after command initiation. */
+ RAFT_CMD_SHUTDOWN, /* Raft server shut down. */
};
+const char *raft_command_status_to_string(enum raft_command_status);
+
struct raft_command *raft_command_execute(struct raft *, const char *data)
OVS_WARN_UNUSED_RESULT;
enum raft_command_status raft_command_get_status(const struct raft_command *);
+void raft_command_unref(struct raft_command *);
void raft_command_wait(const struct raft_command *);
#endif /* lib/raft.h */
diff --git a/tests/test-raft.c b/tests/test-raft.c
index dbf726e91..92e5cefe4 100644
--- a/tests/test-raft.c
+++ b/tests/test-raft.c
@@ -28,8 +28,14 @@
#include "uuid.h"
#include "openvswitch/vlog.h"
+struct execute_ctx {
+ struct raft *raft;
+ struct raft_command *cmd;
+ struct unixctl_conn *conn;
+};
+
OVS_NO_RETURN static void usage(void);
-static void parse_options(int argc, char *argv[]);
+static void parse_options(int argc, char *argv[], char **unixctl_pathp);
static unixctl_cb_func test_raft_exit;
static unixctl_cb_func test_raft_execute;
@@ -51,10 +57,11 @@ check_ovsdb_error(struct ovsdb_error *error)
int
main(int argc, char *argv[])
{
+ char *unixctl_pathp = NULL;
set_program_name(argv[0]);
service_start(&argc, &argv);
fatal_signal_init();
- parse_options(argc, argv);
+ parse_options(argc, argv, &unixctl_pathp);
argc -= optind;
argv += optind;
@@ -65,7 +72,6 @@ main(int argc, char *argv[])
daemonize_start(false);
-
struct raft *raft;
const char *file_name = argv[0];
if (argc == 1) {
@@ -83,16 +89,19 @@ main(int argc, char *argv[])
}
struct unixctl_server *server;
- int error = unixctl_server_create(NULL, &server);
+ int error = unixctl_server_create(unixctl_pathp, &server);
if (error) {
ovs_fatal(error, "failed to create unixctl server");
}
bool exiting = false;
unixctl_command_register("exit", "", 0, 0, test_raft_exit, &exiting);
- unixctl_command_register("execute", "DATA", 1, 1, test_raft_execute, raft);
+
+ struct execute_ctx ec = { .raft = raft };
+ unixctl_command_register("execute", "DATA", 1, 1, test_raft_execute, &ec);
+
unixctl_command_register("take-leadership", "", 0, 0,
- test_raft_take_leadership, raft);
+ test_raft_take_leadership, NULL);
daemonize_complete();
@@ -104,6 +113,18 @@ main(int argc, char *argv[])
break;
}
+ if (ec.cmd) {
+ enum raft_command_status status = raft_command_get_status(ec.cmd);
+ if (status != RAFT_CMD_INCOMPLETE) {
+ unixctl_command_reply(ec.conn,
+ raft_command_status_to_string(status));
+ raft_command_unref(ec.cmd);
+
+ ec.cmd = NULL;
+ ec.conn = NULL;
+ }
+ }
+
unixctl_server_wait(server);
raft_wait(raft);
poll_block();
@@ -115,15 +136,17 @@ main(int argc, char *argv[])
}
static void
-parse_options(int argc, char *argv[])
+parse_options(int argc, char *argv[], char **unixctl_pathp)
{
enum {
OPT_CLUSTER = UCHAR_MAX + 1,
+ OPT_UNIXCTL,
DAEMON_OPTION_ENUMS,
VLOG_OPTION_ENUMS
};
static const struct option long_options[] = {
{"cluster", required_argument, NULL, OPT_CLUSTER},
+ {"unixctl", required_argument, NULL, OPT_UNIXCTL},
{"help", no_argument, NULL, 'h'},
DAEMON_LONG_OPTIONS,
VLOG_LONG_OPTIONS,
@@ -144,6 +167,10 @@ parse_options(int argc, char *argv[])
}
break;
+ case OPT_UNIXCTL:
+ *unixctl_pathp = optarg;
+ break;
+
case 'h':
usage();
@@ -170,7 +197,9 @@ usage(void)
daemon_usage();
vlog_usage();
printf("\nOther options:\n"
- " -h, --help display this help message\n");
+ " --cluster=UUID force cluster ID\n"
+ " --unixctl=SOCKET override default control socket name\n"
+ " -h, --help display this help message\n");
exit(EXIT_SUCCESS);
}
@@ -187,14 +216,16 @@ test_raft_exit(struct unixctl_conn *conn,
static void
test_raft_execute(struct unixctl_conn *conn,
int argc OVS_UNUSED, const char *argv[],
- void *raft_)
+ void *ctx_)
{
- struct raft *raft = raft_;
- struct raft_command *cmd = raft_command_execute(raft, argv[1]);
- if (cmd) {
- /* XXX handle error */
+ struct execute_ctx *ctx = ctx_;
+ if (ctx->cmd) {
+ unixctl_command_reply_error(conn, "command already in progress");
+ return;
}
- unixctl_command_reply(conn, NULL);
+
+ ctx->cmd = raft_command_execute(ctx->raft, argv[1]);
+ ctx->conn = conn;
}