summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Danzberger <daniel@dd-wrt.com>2022-06-08 13:12:29 +0200
committerFelix Fietkau <nbd@nbd.name>2022-06-15 19:59:40 +0200
commit9913aa61de739e3efe067a2d186021c20bcd65e2 (patch)
tree33c326db6b1b5d3424eeba57bd405d446b4f8eb4
parent2bebf93cd3343fe49f22a05ef935e460d2d44f67 (diff)
downloadubus-9913aa61de739e3efe067a2d186021c20bcd65e2.tar.gz
ubusd: add lookup command queuing supportHEADmaster
Defers and continues a client's lookup command to avoid unnecessary buffering under load. Signed-off-by: Daniel Danzberger <daniel@dd-wrt.com> Signed-off-by: Felix Fietkau <nbd@nbd.name>
-rw-r--r--ubusd.h8
-rw-r--r--ubusd_main.c35
-rw-r--r--ubusd_proto.c87
3 files changed, 122 insertions, 8 deletions
diff --git a/ubusd.h b/ubusd.h
index c5d6d2a..f43b936 100644
--- a/ubusd.h
+++ b/ubusd.h
@@ -41,6 +41,12 @@ struct ubus_msg_buf_list {
struct ubus_msg_buf *msg;
};
+struct ubus_client_cmd {
+ struct list_head list;
+ struct ubus_msg_buf *msg;
+ struct ubus_object *obj;
+};
+
struct ubus_client {
struct ubus_id id;
struct uloop_fd sock;
@@ -53,6 +59,7 @@ struct ubus_client {
struct list_head objects;
+ struct list_head cmd_queue;
struct list_head tx_queue;
unsigned int txq_ofs;
unsigned int txq_len;
@@ -86,6 +93,7 @@ void ubusd_proto_receive_message(struct ubus_client *cl, struct ubus_msg_buf *ub
void ubusd_proto_free_client(struct ubus_client *cl);
void ubus_proto_send_msg_from_blob(struct ubus_client *cl, struct ubus_msg_buf *ub,
uint8_t type);
+int ubusd_cmd_lookup(struct ubus_client *cl, struct ubus_client_cmd *cmd);
typedef struct ubus_msg_buf *(*event_fill_cb)(void *priv, const char *id);
void ubusd_event_init(void);
diff --git a/ubusd_main.c b/ubusd_main.c
index 6b132ce..adbd293 100644
--- a/ubusd_main.c
+++ b/ubusd_main.c
@@ -32,6 +32,28 @@ static void handle_client_disconnect(struct ubus_client *cl)
free(cl);
}
+static void ubus_client_cmd_free(struct ubus_client_cmd *cmd)
+{
+ list_del(&cmd->list);
+ ubus_msg_free(cmd->msg);
+ free(cmd);
+}
+
+static void ubus_client_cmd_queue_process(struct ubus_client *cl)
+{
+ struct ubus_client_cmd *cmd, *tmp;
+
+ list_for_each_entry_safe(cmd, tmp, &cl->cmd_queue, list) {
+ int ret = ubusd_cmd_lookup(cl, cmd);
+
+ /* Stop if the last command caused buffering again */
+ if (ret == -2)
+ break;
+
+ ubus_client_cmd_free(cmd);
+ }
+}
+
static void client_cb(struct uloop_fd *sock, unsigned int events)
{
struct ubus_client *cl = container_of(sock, struct ubus_client, sock);
@@ -82,10 +104,15 @@ static void client_cb(struct uloop_fd *sock, unsigned int events)
ubus_msg_list_free(ubl);
}
- /* prevent further ULOOP_WRITE events if we don't have data
- * to send anymore */
- if (list_empty(&cl->tx_queue) && (events & ULOOP_WRITE))
- uloop_fd_add(sock, ULOOP_READ | ULOOP_EDGE_TRIGGER);
+ if (list_empty(&cl->tx_queue) && (events & ULOOP_WRITE)) {
+ /* Process queued commands */
+ ubus_client_cmd_queue_process(cl);
+
+ /* prevent further ULOOP_WRITE events if we don't have data
+ * to send anymore */
+ if (list_empty(&cl->tx_queue))
+ uloop_fd_add(sock, ULOOP_READ | ULOOP_EDGE_TRIGGER);
+ }
retry:
if (!sock->eof && cl->pending_msg_offset < (int) sizeof(cl->hdrbuf)) {
diff --git a/ubusd_proto.c b/ubusd_proto.c
index b20f91c..48de9b9 100644
--- a/ubusd_proto.c
+++ b/ubusd_proto.c
@@ -186,16 +186,56 @@ static void ubusd_send_obj(struct ubus_client *cl, struct ubus_msg_buf *ub, stru
ubus_proto_send_msg_from_blob(cl, ub, UBUS_MSG_DATA);
}
-static int ubusd_handle_lookup(struct ubus_client *cl, struct ubus_msg_buf *ub, struct blob_attr **attr)
+static int ubus_client_cmd_queue_add(struct ubus_client *cl,
+ struct ubus_msg_buf *msg,
+ struct ubus_object *obj)
{
- struct ubus_object *obj;
+ struct ubus_client_cmd *cmd = malloc(sizeof(*cmd));
+
+ if (cmd) {
+ cmd->msg = msg;
+ cmd->obj = obj;
+ list_add_tail(&cmd->list, &cl->cmd_queue);
+ return -2;
+ }
+ return UBUS_STATUS_UNKNOWN_ERROR;
+}
+
+static int __ubusd_handle_lookup(struct ubus_client *cl,
+ struct ubus_msg_buf *ub,
+ struct blob_attr **attr,
+ struct ubus_client_cmd *cmd)
+{
+ struct ubus_object *obj = NULL;
char *objpath;
bool found = false;
int len;
if (!attr[UBUS_ATTR_OBJPATH]) {
- avl_for_each_element(&path, obj, path)
- ubusd_send_obj(cl, ub, obj);
+ if (cmd)
+ obj = cmd->obj;
+
+ /* Start from beginning or continue from the last object */
+ if (obj == NULL)
+ obj = avl_first_element(&path, obj, path);
+
+ avl_for_element_range(obj, avl_last_element(&path, obj, path), obj, path) {
+ /* Keep sending objects until buffering starts */
+ if (list_empty(&cl->tx_queue)) {
+ ubusd_send_obj(cl, ub, obj);
+ } else {
+ /* Queue command and continue on the next call */
+ int ret;
+
+ if (cmd == NULL) {
+ ret = ubus_client_cmd_queue_add(cl, ub, obj);
+ } else {
+ cmd->obj = obj;
+ ret = -2;
+ }
+ return ret;
+ }
+ }
return 0;
}
@@ -230,6 +270,40 @@ static int ubusd_handle_lookup(struct ubus_client *cl, struct ubus_msg_buf *ub,
return 0;
}
+static int ubusd_handle_lookup(struct ubus_client *cl, struct ubus_msg_buf *ub, struct blob_attr **attr)
+{
+ int rc;
+
+ if (list_empty(&cl->tx_queue))
+ rc = __ubusd_handle_lookup(cl, ub, attr, NULL);
+ else
+ rc = ubus_client_cmd_queue_add(cl, ub, NULL);
+
+ return rc;
+}
+
+int ubusd_cmd_lookup(struct ubus_client *cl, struct ubus_client_cmd *cmd)
+{
+ struct ubus_msg_buf *ub = cmd->msg;
+ struct blob_attr **attr;
+ int ret;
+
+ attr = ubus_parse_msg(ub->data, blob_raw_len(ub->data));
+ ret = __ubusd_handle_lookup(cl, ub, attr, cmd);
+
+ if (ret != -2) {
+ struct ubus_msg_buf *retmsg = cl->retmsg;
+ int *retmsg_data = blob_data(blob_data(retmsg->data));
+
+ retmsg->hdr.seq = ub->hdr.seq;
+ retmsg->hdr.peer = ub->hdr.peer;
+
+ *retmsg_data = htonl(ret);
+ ubus_msg_send(cl, retmsg);
+ }
+ return ret;
+}
+
static void
ubusd_forward_invoke(struct ubus_client *cl, struct ubus_object *obj,
const char *method, struct ubus_msg_buf *ub,
@@ -458,6 +532,10 @@ void ubusd_proto_receive_message(struct ubus_client *cl, struct ubus_msg_buf *ub
else
ret = UBUS_STATUS_INVALID_COMMAND;
+ /* Command has not been completed yet and got queued */
+ if (ret == -2)
+ return;
+
ubus_msg_free(ub);
if (ret == -1)
@@ -495,6 +573,7 @@ struct ubus_client *ubusd_proto_new_client(int fd, uloop_fd_handler cb)
goto free;
INIT_LIST_HEAD(&cl->objects);
+ INIT_LIST_HEAD(&cl->cmd_queue);
INIT_LIST_HEAD(&cl->tx_queue);
cl->sock.fd = fd;
cl->sock.cb = cb;