From 89b2072b4fe32c5ad1ceaad9fb1e58b1d1e03bc6 Mon Sep 17 00:00:00 2001 From: Luiz Augusto von Dentz Date: Mon, 12 Dec 2022 15:34:50 -0800 Subject: client/player: Make transport.send non-blocking This makes transport.send command non-blocking by using timerfd callback to initiate the transfers. --- client/player.c | 193 ++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 132 insertions(+), 61 deletions(-) (limited to 'client') diff --git a/client/player.c b/client/player.c index 8b3785d6b..1f10387f8 100644 --- a/client/player.c +++ b/client/player.c @@ -24,6 +24,7 @@ #include #include #include +#include #include @@ -92,6 +93,7 @@ struct transport { int fd; struct io *io; uint32_t seq; + struct io *timer_io; }; static void endpoint_unregister(void *data) @@ -2959,6 +2961,8 @@ static void transport_close(struct transport *transport) return; close(transport->fd); + transport->fd = -1; + free(transport->filename); } @@ -2966,6 +2970,7 @@ static void transport_free(void *data) { struct transport *transport = data; + io_destroy(transport->timer_io); io_destroy(transport->io); free(transport); } @@ -3330,104 +3335,166 @@ static int open_file(const char *filename, int flags) return fd; } -#define NSEC_USEC(_t) (_t / 1000L) -#define SEC_USEC(_t) (_t * 1000000L) -#define TS_USEC(_ts) (SEC_USEC((_ts)->tv_sec) + NSEC_USEC((_ts)->tv_nsec)) - -static void send_wait(struct timespec *t_start, uint32_t us) +static int elapsed_time(bool reset, int *secs, int *nsecs) { - struct timespec t_now; - struct timespec t_diff; - int64_t delta_us; - - /* Skip sleep at start */ - if (!us) - return; + static struct timespec start; + struct timespec curr; - if (clock_gettime(CLOCK_MONOTONIC, &t_now) < 0) { - bt_shell_printf("clock_gettime: %s (%d)", strerror(errno), - errno); - return; + if (reset) { + if (clock_gettime(CLOCK_MONOTONIC, &start) < 0) { + bt_shell_printf("clock_gettime: %s (%d)", + strerror(errno), errno); + return -errno; + } } - t_diff.tv_sec = t_now.tv_sec - t_start->tv_sec; - if (t_start->tv_nsec > t_now.tv_nsec) { - t_diff.tv_sec--; - t_now.tv_nsec += 1000000000L; + if (clock_gettime(CLOCK_MONOTONIC, &curr) < 0) { + bt_shell_printf("clock_gettime: %s (%d)", strerror(errno), + errno); + return -errno; } - t_diff.tv_nsec = t_now.tv_nsec - t_start->tv_nsec; - delta_us = us - TS_USEC(&t_diff); - - if (delta_us < 0) { - bt_shell_printf("Send is behind: %" PRId64 " us - skip sleep", - delta_us); - delta_us = 1000; + *secs = curr.tv_sec - start.tv_sec; + *nsecs = curr.tv_nsec - start.tv_nsec; + if (*nsecs < 0) { + (*secs)--; + *nsecs += 1000000000; } - usleep(delta_us); - - if (clock_gettime(CLOCK_MONOTONIC, t_start) < 0) - bt_shell_printf("clock_gettime: %s (%d)", strerror(errno), - errno); + return 0; } -static int transport_send(struct transport *transport, int fd, - struct bt_iso_qos *qos) +static int transport_send_seq(struct transport *transport, int fd, uint32_t num) { - struct timespec t_start; uint8_t *buf; - uint32_t num = 0; + uint32_t i; - if (qos && clock_gettime(CLOCK_MONOTONIC, &t_start) < 0) { - bt_shell_printf("clock_gettime: %s (%d)", strerror(errno), - errno); - return -errno; - } + if (!num) + return 0; buf = malloc(transport->mtu[1]); - if (!buf) { - bt_shell_printf("malloc: %s (%d)", strerror(errno), errno); + if (!buf) return -ENOMEM; - } - - /* num of packets = latency (ms) / interval (us) */ - if (qos) - num = (qos->out.latency * 1000 / qos->out.interval); - for (transport->seq = 0; ; transport->seq++) { + for (i = 0; i < num; i++, transport->seq++) { ssize_t ret; int queued; + int secs = 0, nsecs = 0; ret = read(fd, buf, transport->mtu[1]); if (ret <= 0) { if (ret < 0) bt_shell_printf("read failed: %s (%d)", strerror(errno), errno); - close(fd); + free(buf); return ret; } ret = send(transport->sk, buf, ret, 0); if (ret <= 0) { - bt_shell_printf("Send failed: %s (%d)", + bt_shell_printf("send failed: %s (%d)", strerror(errno), errno); + free(buf); return -errno; } + elapsed_time(!transport->seq, &secs, &nsecs); + ioctl(transport->sk, TIOCOUTQ, &queued); - bt_shell_printf("[seq %d] send: %zd bytes " + bt_shell_printf("[seq %d %d.%03ds] send: %zd bytes " "(TIOCOUTQ %d bytes)\n", - transport->seq, ret, queued); - - if (qos) { - if (transport->seq && !((transport->seq + 1) % num)) - send_wait(&t_start, num * qos->out.interval); - } + transport->seq, secs, + (nsecs + 500000) / 1000000, + ret, queued); } free(buf); + + return i; +} + +static bool transport_timer_read(struct io *io, void *user_data) +{ + struct transport *transport = user_data; + struct bt_iso_qos qos; + socklen_t len; + int ret, fd; + uint32_t num; + uint64_t exp; + + if (transport->fd < 0) + return false; + + fd = io_get_fd(io); + ret = read(fd, &exp, sizeof(exp)); + if (ret < 0) { + bt_shell_printf("Failed to read: %s (%d)\n", strerror(errno), + -errno); + return false; + } + + /* Read QoS if available */ + memset(&qos, 0, sizeof(qos)); + len = sizeof(qos); + if (getsockopt(transport->sk, SOL_BLUETOOTH, BT_ISO_QOS, &qos, + &len) < 0) { + bt_shell_printf("Failed to getsockopt(BT_ISO_QOS): %s (%d)\n", + strerror(errno), -errno); + return false; + } + + /* num of packets = latency (ms) / interval (us) */ + num = (qos.out.latency * 1000 / qos.out.interval); + + ret = transport_send_seq(transport, transport->fd, num); + if (ret < 0) { + bt_shell_printf("Unable to send: %s (%d)\n", + strerror(-ret), ret); + return false; + } + + if (!ret) { + transport_close(transport); + return false; + } + + return true; +} + +static int transport_send(struct transport *transport, int fd, + struct bt_iso_qos *qos) +{ + struct itimerspec ts; + int timer_fd; + + transport->seq = 0; + + if (!qos) + return transport_send_seq(transport, fd, UINT32_MAX); + + if (transport->fd >= 0) + return -EALREADY; + + timer_fd = timerfd_create(CLOCK_MONOTONIC, 0); + if (timer_fd < 0) + return -errno; + + memset(&ts, 0, sizeof(ts)); + ts.it_value.tv_nsec = qos->out.latency * 1000000; + ts.it_interval.tv_nsec = qos->out.latency * 1000000; + + if (timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &ts, NULL) < 0) + return -errno; + + transport->fd = fd; + + transport->timer_io = io_new(timer_fd); + + io_set_read_handler(transport->timer_io, transport_timer_read, + transport, NULL); + + return transport_send_seq(transport, fd, 1); } static void cmd_send_transport(int argc, char *argv[]) @@ -3457,6 +3524,8 @@ static void cmd_send_transport(int argc, char *argv[]) } fd = open_file(argv[2], O_RDONLY); + if (fd < 0) + return bt_shell_noninteractive_quit(EXIT_FAILURE); bt_shell_printf("Sending ...\n"); @@ -3469,10 +3538,12 @@ static void cmd_send_transport(int argc, char *argv[]) else err = transport_send(transport, fd, &qos); - close(fd); - - if (err < 0) + if (err < 0) { + bt_shell_printf("Unable to send: %s (%d)", strerror(-err), + -err); + close(fd); return bt_shell_noninteractive_quit(EXIT_FAILURE); + } return bt_shell_noninteractive_quit(EXIT_SUCCESS); } -- cgit v1.2.1