diff options
-rw-r--r-- | ovsdb/raft.c | 133 | ||||
-rw-r--r-- | ovsdb/raft.h | 6 | ||||
-rw-r--r-- | tests/test-raft.c | 59 |
3 files changed, 163 insertions, 35 deletions
diff --git a/ovsdb/raft.c b/ovsdb/raft.c index 171121212..858fe6885 100644 --- a/ovsdb/raft.c +++ b/ovsdb/raft.c @@ -116,10 +116,20 @@ struct raft_conn { }; struct raft_command { - struct ovs_refcount refcnt; + struct hmap_node hmap_node; /* In struct raft's 'commands' hmap. */ + uint64_t index; /* Index in log. */ + + unsigned int n_refs; enum raft_command_status status; }; +static void raft_command_complete(struct raft *, struct raft_command *, + enum raft_command_status); + +static void raft_complete_all_commands(struct raft *, + enum raft_command_status); +static struct raft_command *raft_find_command(struct raft *, uint64_t index); + enum raft_waiter_type { RAFT_W_COMMAND, RAFT_W_APPEND @@ -234,6 +244,7 @@ struct raft { /* Leaders only. Reinitialized after becoming leader. */ struct hmap add_servers; /* Contains "struct raft_server"s to add. */ struct raft_server *remove_server; /* Server being removed. */ + struct hmap commands; /* Contains "struct raft_command"s. */ /* Candidates only. Reinitialized at start of election. */ int n_votes; /* Number of votes for me. */ @@ -644,6 +655,7 @@ raft_alloc(const char *file_name) hmap_init(&raft->servers); hmap_init(&raft->prev_servers); hmap_init(&raft->add_servers); + hmap_init(&raft->commands); raft->listen_backoff = LLONG_MIN; ovs_list_init(&raft->conns); ovs_mutex_init(&raft->fsync_mutex); @@ -1422,6 +1434,8 @@ raft_close(struct raft *raft) /* XXX if we're leader then invoke the leadership transfer procedure? */ + raft_complete_all_commands(raft, RAFT_CMD_SHUTDOWN); + ovs_mutex_lock(&raft->fsync_mutex); raft->fsync_next = UINT64_MAX; ovs_mutex_unlock(&raft->fsync_mutex); @@ -1563,14 +1577,6 @@ raft_waiter_complete(struct raft *raft, struct raft_waiter *w) } static void -raft_command_unref(struct raft_command *cmd) -{ - if (cmd && ovs_refcount_unref(&cmd->refcnt) == 1) { - free(cmd); - } -} - -static void raft_waiter_destroy(struct raft_waiter *w) { if (!w) { @@ -1666,6 +1672,7 @@ raft_start_election(struct raft *raft) static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); ovs_assert(raft->role != RAFT_LEADER); + ovs_assert(hmap_is_empty(&raft->commands)); raft->role = RAFT_CANDIDATE; /* XXX what if we're not part of the server set? */ @@ -1709,7 +1716,8 @@ raft_start_election(struct raft *raft) raft_send(raft, &rq); } - /* Vote for ourselves. */ + /* Vote for ourselves. + * XXX only if we're not being removed? */ raft_accept_vote(raft, raft->me, true); /* XXX how do we handle outstanding waiters? */ @@ -1829,33 +1837,50 @@ raft_waiter_create(struct raft *raft, enum raft_waiter_type type) return w; } +const char * +raft_command_status_to_string(enum raft_command_status status) +{ + switch (status) { + case RAFT_CMD_INCOMPLETE: + return "operation still in progress"; + case RAFT_CMD_SUCCESS: + return "success"; + case RAFT_CMD_NOT_LEADER: + return "not leader"; + case RAFT_CMD_LOST_LEADERSHIP: + return "lost leadership"; + case RAFT_CMD_SHUTDOWN: + return "server shutdown"; + default: + OVS_NOT_REACHED(); + } +} + static struct raft_command * OVS_WARN_UNUSED_RESULT raft_command_execute__(struct raft *raft, enum raft_entry_type type, const char *data) { struct raft_command *cmd = xzalloc(sizeof *cmd); + cmd->n_refs = 2; /* One for client, one for raft->commands. */ + cmd->index = raft->log_end; + cmd->status = RAFT_CMD_INCOMPLETE; + hmap_insert(&raft->commands, &cmd->hmap_node, cmd->index); + if (raft->role != RAFT_LEADER) { - cmd->status = RAFT_CMD_NOT_LEADER; + raft_command_complete(raft, cmd, RAFT_CMD_NOT_LEADER); return cmd; } - uint64_t index = raft->log_end; - ovs_refcount_init(&cmd->refcnt); - cmd->status = RAFT_CMD_INCOMPLETE; - /* Write to local log. * * XXX If this server is being removed from the configuration then we - * should not writ to the local log; see section 4.2.2. Or we could + * should not write to the local log; see section 4.2.2. Or we could * implement leadership transfer. */ struct ovsdb_error *error = raft_write_entry(raft, raft->current_term, type, xstrdup(data)); if (!error) { - ovs_refcount_ref(&cmd->refcnt); - struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_COMMAND); - w->command.cmd = cmd; - w->command.index = index; + w->command.index = cmd->index; } else { static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5); char *s = ovsdb_error_to_string(error); @@ -1883,6 +1908,66 @@ raft_command_execute(struct raft *raft, const char *data) { return raft_command_execute__(raft, RAFT_DATA, data); } + +enum raft_command_status +raft_command_get_status(const struct raft_command *cmd) +{ + ovs_assert(cmd->n_refs > 0); + return cmd->status; +} + +void +raft_command_unref(struct raft_command *cmd) +{ + if (cmd) { + ovs_assert(cmd->n_refs > 0); + if (!--cmd->n_refs) { + free(cmd); + } + } +} + +void +raft_command_wait(const struct raft_command *cmd) +{ + if (cmd->status != RAFT_CMD_INCOMPLETE) { + poll_immediate_wake(); + } +} + +static void +raft_command_complete(struct raft *raft, + struct raft_command *cmd, + enum raft_command_status status) +{ + ovs_assert(cmd->status == RAFT_CMD_INCOMPLETE); + ovs_assert(cmd->n_refs > 0); + hmap_remove(&raft->commands, &cmd->hmap_node); + cmd->status = status; + raft_command_unref(cmd); +} + +static void +raft_complete_all_commands(struct raft *raft, enum raft_command_status status) +{ + struct raft_command *cmd, *next; + HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) { + raft_command_complete(raft, cmd, status); + } +} + +static struct raft_command * +raft_find_command(struct raft *raft, uint64_t index) +{ + struct raft_command *cmd; + + HMAP_FOR_EACH_IN_BUCKET (cmd, hmap_node, index, &raft->commands) { + if (cmd->index == index) { + return cmd; + } + } + return NULL; +} static void raft_rpc_destroy(union raft_rpc *rpc) @@ -2356,6 +2441,7 @@ raft_become_follower(struct raft *raft) } /* XXX how do we handle outstanding waiters? */ + raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP); } static void @@ -2765,6 +2851,13 @@ raft_update_commit_index(struct raft *raft, uint64_t new_commit_index) } } else { /* XXX apply log[lastApplied]. */ + if (raft->role == RAFT_LEADER) { + struct raft_command *cmd + = raft_find_command(raft, raft->last_applied); + if (cmd) { + raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS); + } + } } } } diff --git a/ovsdb/raft.h b/ovsdb/raft.h index 4c012f9d1..4f68579c3 100644 --- a/ovsdb/raft.h +++ b/ovsdb/raft.h @@ -73,11 +73,15 @@ enum raft_command_status { RAFT_CMD_INCOMPLETE, /* In progress, please wait. */ RAFT_CMD_SUCCESS, /* Committed. */ RAFT_CMD_NOT_LEADER, /* Failed because we are not the leader. */ - RAFT_CMD_FAILURE, /* Other failure. */ + RAFT_CMD_LOST_LEADERSHIP, /* Leadership lost after command initiation. */ + RAFT_CMD_SHUTDOWN, /* Raft server shut down. */ }; +const char *raft_command_status_to_string(enum raft_command_status); + struct raft_command *raft_command_execute(struct raft *, const char *data) OVS_WARN_UNUSED_RESULT; enum raft_command_status raft_command_get_status(const struct raft_command *); +void raft_command_unref(struct raft_command *); void raft_command_wait(const struct raft_command *); #endif /* lib/raft.h */ diff --git a/tests/test-raft.c b/tests/test-raft.c index dbf726e91..92e5cefe4 100644 --- a/tests/test-raft.c +++ b/tests/test-raft.c @@ -28,8 +28,14 @@ #include "uuid.h" #include "openvswitch/vlog.h" +struct execute_ctx { + struct raft *raft; + struct raft_command *cmd; + struct unixctl_conn *conn; +}; + OVS_NO_RETURN static void usage(void); -static void parse_options(int argc, char *argv[]); +static void parse_options(int argc, char *argv[], char **unixctl_pathp); static unixctl_cb_func test_raft_exit; static unixctl_cb_func test_raft_execute; @@ -51,10 +57,11 @@ check_ovsdb_error(struct ovsdb_error *error) int main(int argc, char *argv[]) { + char *unixctl_pathp = NULL; set_program_name(argv[0]); service_start(&argc, &argv); fatal_signal_init(); - parse_options(argc, argv); + parse_options(argc, argv, &unixctl_pathp); argc -= optind; argv += optind; @@ -65,7 +72,6 @@ main(int argc, char *argv[]) daemonize_start(false); - struct raft *raft; const char *file_name = argv[0]; if (argc == 1) { @@ -83,16 +89,19 @@ main(int argc, char *argv[]) } struct unixctl_server *server; - int error = unixctl_server_create(NULL, &server); + int error = unixctl_server_create(unixctl_pathp, &server); if (error) { ovs_fatal(error, "failed to create unixctl server"); } bool exiting = false; unixctl_command_register("exit", "", 0, 0, test_raft_exit, &exiting); - unixctl_command_register("execute", "DATA", 1, 1, test_raft_execute, raft); + + struct execute_ctx ec = { .raft = raft }; + unixctl_command_register("execute", "DATA", 1, 1, test_raft_execute, &ec); + unixctl_command_register("take-leadership", "", 0, 0, - test_raft_take_leadership, raft); + test_raft_take_leadership, NULL); daemonize_complete(); @@ -104,6 +113,18 @@ main(int argc, char *argv[]) break; } + if (ec.cmd) { + enum raft_command_status status = raft_command_get_status(ec.cmd); + if (status != RAFT_CMD_INCOMPLETE) { + unixctl_command_reply(ec.conn, + raft_command_status_to_string(status)); + raft_command_unref(ec.cmd); + + ec.cmd = NULL; + ec.conn = NULL; + } + } + unixctl_server_wait(server); raft_wait(raft); poll_block(); @@ -115,15 +136,17 @@ main(int argc, char *argv[]) } static void -parse_options(int argc, char *argv[]) +parse_options(int argc, char *argv[], char **unixctl_pathp) { enum { OPT_CLUSTER = UCHAR_MAX + 1, + OPT_UNIXCTL, DAEMON_OPTION_ENUMS, VLOG_OPTION_ENUMS }; static const struct option long_options[] = { {"cluster", required_argument, NULL, OPT_CLUSTER}, + {"unixctl", required_argument, NULL, OPT_UNIXCTL}, {"help", no_argument, NULL, 'h'}, DAEMON_LONG_OPTIONS, VLOG_LONG_OPTIONS, @@ -144,6 +167,10 @@ parse_options(int argc, char *argv[]) } break; + case OPT_UNIXCTL: + *unixctl_pathp = optarg; + break; + case 'h': usage(); @@ -170,7 +197,9 @@ usage(void) daemon_usage(); vlog_usage(); printf("\nOther options:\n" - " -h, --help display this help message\n"); + " --cluster=UUID force cluster ID\n" + " --unixctl=SOCKET override default control socket name\n" + " -h, --help display this help message\n"); exit(EXIT_SUCCESS); } @@ -187,14 +216,16 @@ test_raft_exit(struct unixctl_conn *conn, static void test_raft_execute(struct unixctl_conn *conn, int argc OVS_UNUSED, const char *argv[], - void *raft_) + void *ctx_) { - struct raft *raft = raft_; - struct raft_command *cmd = raft_command_execute(raft, argv[1]); - if (cmd) { - /* XXX handle error */ + struct execute_ctx *ctx = ctx_; + if (ctx->cmd) { + unixctl_command_reply_error(conn, "command already in progress"); + return; } - unixctl_command_reply(conn, NULL); + + ctx->cmd = raft_command_execute(ctx->raft, argv[1]); + ctx->conn = conn; } |