From 9913aa61de739e3efe067a2d186021c20bcd65e2 Mon Sep 17 00:00:00 2001 From: Daniel Danzberger Date: Wed, 8 Jun 2022 13:12:29 +0200 Subject: ubusd: add lookup command queuing support Defers and continues a client's lookup command to avoid unnecessary buffering under load. Signed-off-by: Daniel Danzberger Signed-off-by: Felix Fietkau --- ubusd.h | 8 ++++++ ubusd_main.c | 35 +++++++++++++++++++++--- ubusd_proto.c | 87 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 122 insertions(+), 8 deletions(-) diff --git a/ubusd.h b/ubusd.h index c5d6d2a..f43b936 100644 --- a/ubusd.h +++ b/ubusd.h @@ -41,6 +41,12 @@ struct ubus_msg_buf_list { struct ubus_msg_buf *msg; }; +struct ubus_client_cmd { + struct list_head list; + struct ubus_msg_buf *msg; + struct ubus_object *obj; +}; + struct ubus_client { struct ubus_id id; struct uloop_fd sock; @@ -53,6 +59,7 @@ struct ubus_client { struct list_head objects; + struct list_head cmd_queue; struct list_head tx_queue; unsigned int txq_ofs; unsigned int txq_len; @@ -86,6 +93,7 @@ void ubusd_proto_receive_message(struct ubus_client *cl, struct ubus_msg_buf *ub void ubusd_proto_free_client(struct ubus_client *cl); void ubus_proto_send_msg_from_blob(struct ubus_client *cl, struct ubus_msg_buf *ub, uint8_t type); +int ubusd_cmd_lookup(struct ubus_client *cl, struct ubus_client_cmd *cmd); typedef struct ubus_msg_buf *(*event_fill_cb)(void *priv, const char *id); void ubusd_event_init(void); diff --git a/ubusd_main.c b/ubusd_main.c index 6b132ce..adbd293 100644 --- a/ubusd_main.c +++ b/ubusd_main.c @@ -32,6 +32,28 @@ static void handle_client_disconnect(struct ubus_client *cl) free(cl); } +static void ubus_client_cmd_free(struct ubus_client_cmd *cmd) +{ + list_del(&cmd->list); + ubus_msg_free(cmd->msg); + free(cmd); +} + +static void ubus_client_cmd_queue_process(struct ubus_client *cl) +{ + struct ubus_client_cmd *cmd, *tmp; + + list_for_each_entry_safe(cmd, tmp, &cl->cmd_queue, list) { + int ret = ubusd_cmd_lookup(cl, cmd); + + /* Stop if the last command caused buffering again */ + if (ret == -2) + break; + + ubus_client_cmd_free(cmd); + } +} + static void client_cb(struct uloop_fd *sock, unsigned int events) { struct ubus_client *cl = container_of(sock, struct ubus_client, sock); @@ -82,10 +104,15 @@ static void client_cb(struct uloop_fd *sock, unsigned int events) ubus_msg_list_free(ubl); } - /* prevent further ULOOP_WRITE events if we don't have data - * to send anymore */ - if (list_empty(&cl->tx_queue) && (events & ULOOP_WRITE)) - uloop_fd_add(sock, ULOOP_READ | ULOOP_EDGE_TRIGGER); + if (list_empty(&cl->tx_queue) && (events & ULOOP_WRITE)) { + /* Process queued commands */ + ubus_client_cmd_queue_process(cl); + + /* prevent further ULOOP_WRITE events if we don't have data + * to send anymore */ + if (list_empty(&cl->tx_queue)) + uloop_fd_add(sock, ULOOP_READ | ULOOP_EDGE_TRIGGER); + } retry: if (!sock->eof && cl->pending_msg_offset < (int) sizeof(cl->hdrbuf)) { diff --git a/ubusd_proto.c b/ubusd_proto.c index b20f91c..48de9b9 100644 --- a/ubusd_proto.c +++ b/ubusd_proto.c @@ -186,16 +186,56 @@ static void ubusd_send_obj(struct ubus_client *cl, struct ubus_msg_buf *ub, stru ubus_proto_send_msg_from_blob(cl, ub, UBUS_MSG_DATA); } -static int ubusd_handle_lookup(struct ubus_client *cl, struct ubus_msg_buf *ub, struct blob_attr **attr) +static int ubus_client_cmd_queue_add(struct ubus_client *cl, + struct ubus_msg_buf *msg, + struct ubus_object *obj) { - struct ubus_object *obj; + struct ubus_client_cmd *cmd = malloc(sizeof(*cmd)); + + if (cmd) { + cmd->msg = msg; + cmd->obj = obj; + list_add_tail(&cmd->list, &cl->cmd_queue); + return -2; + } + return UBUS_STATUS_UNKNOWN_ERROR; +} + +static int __ubusd_handle_lookup(struct ubus_client *cl, + struct ubus_msg_buf *ub, + struct blob_attr **attr, + struct ubus_client_cmd *cmd) +{ + struct ubus_object *obj = NULL; char *objpath; bool found = false; int len; if (!attr[UBUS_ATTR_OBJPATH]) { - avl_for_each_element(&path, obj, path) - ubusd_send_obj(cl, ub, obj); + if (cmd) + obj = cmd->obj; + + /* Start from beginning or continue from the last object */ + if (obj == NULL) + obj = avl_first_element(&path, obj, path); + + avl_for_element_range(obj, avl_last_element(&path, obj, path), obj, path) { + /* Keep sending objects until buffering starts */ + if (list_empty(&cl->tx_queue)) { + ubusd_send_obj(cl, ub, obj); + } else { + /* Queue command and continue on the next call */ + int ret; + + if (cmd == NULL) { + ret = ubus_client_cmd_queue_add(cl, ub, obj); + } else { + cmd->obj = obj; + ret = -2; + } + return ret; + } + } return 0; } @@ -230,6 +270,40 @@ static int ubusd_handle_lookup(struct ubus_client *cl, struct ubus_msg_buf *ub, return 0; } +static int ubusd_handle_lookup(struct ubus_client *cl, struct ubus_msg_buf *ub, struct blob_attr **attr) +{ + int rc; + + if (list_empty(&cl->tx_queue)) + rc = __ubusd_handle_lookup(cl, ub, attr, NULL); + else + rc = ubus_client_cmd_queue_add(cl, ub, NULL); + + return rc; +} + +int ubusd_cmd_lookup(struct ubus_client *cl, struct ubus_client_cmd *cmd) +{ + struct ubus_msg_buf *ub = cmd->msg; + struct blob_attr **attr; + int ret; + + attr = ubus_parse_msg(ub->data, blob_raw_len(ub->data)); + ret = __ubusd_handle_lookup(cl, ub, attr, cmd); + + if (ret != -2) { + struct ubus_msg_buf *retmsg = cl->retmsg; + int *retmsg_data = blob_data(blob_data(retmsg->data)); + + retmsg->hdr.seq = ub->hdr.seq; + retmsg->hdr.peer = ub->hdr.peer; + + *retmsg_data = htonl(ret); + ubus_msg_send(cl, retmsg); + } + return ret; +} + static void ubusd_forward_invoke(struct ubus_client *cl, struct ubus_object *obj, const char *method, struct ubus_msg_buf *ub, @@ -458,6 +532,10 @@ void ubusd_proto_receive_message(struct ubus_client *cl, struct ubus_msg_buf *ub else ret = UBUS_STATUS_INVALID_COMMAND; + /* Command has not been completed yet and got queued */ + if (ret == -2) + return; + ubus_msg_free(ub); if (ret == -1) @@ -495,6 +573,7 @@ struct ubus_client *ubusd_proto_new_client(int fd, uloop_fd_handler cb) goto free; INIT_LIST_HEAD(&cl->objects); + INIT_LIST_HEAD(&cl->cmd_queue); INIT_LIST_HEAD(&cl->tx_queue); cl->sock.fd = fd; cl->sock.cb = cb; -- cgit v1.2.1