summaryrefslogtreecommitdiff
path: root/src/modules/bluetooth/module-bluez5-device.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/modules/bluetooth/module-bluez5-device.c')
-rw-r--r--src/modules/bluetooth/module-bluez5-device.c300
1 files changed, 202 insertions, 98 deletions
diff --git a/src/modules/bluetooth/module-bluez5-device.c b/src/modules/bluetooth/module-bluez5-device.c
index c3acc1dc6..edfa12fac 100644
--- a/src/modules/bluetooth/module-bluez5-device.c
+++ b/src/modules/bluetooth/module-bluez5-device.c
@@ -56,9 +56,8 @@ PA_MODULE_LOAD_ONCE(false);
PA_MODULE_USAGE("path=<device object path>"
"autodetect_mtu=<boolean>");
-#define MAX_PLAYBACK_CATCH_UP_USEC (100 * PA_USEC_PER_MSEC)
#define FIXED_LATENCY_PLAYBACK_A2DP (25 * PA_USEC_PER_MSEC)
-#define FIXED_LATENCY_PLAYBACK_SCO (125 * PA_USEC_PER_MSEC)
+#define FIXED_LATENCY_PLAYBACK_SCO (25 * PA_USEC_PER_MSEC)
#define FIXED_LATENCY_RECORD_A2DP (25 * PA_USEC_PER_MSEC)
#define FIXED_LATENCY_RECORD_SCO (25 * PA_USEC_PER_MSEC)
@@ -660,6 +659,38 @@ static int a2dp_process_push(struct userdata *u) {
return ret;
}
+static void update_buffer_size(struct userdata *u) {
+ int old_bufsize;
+ socklen_t len = sizeof(int);
+ int ret;
+
+ ret = getsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &old_bufsize, &len);
+ if (ret == -1) {
+ pa_log_warn("Changing bluetooth buffer size: Failed to getsockopt(SO_SNDBUF): %s", pa_cstrerror(errno));
+ } else {
+ int new_bufsize;
+
+ /* Set send buffer size as small as possible. The minimum value is 1024 according to the
+ * socket man page. The data is written to the socket in chunks of write_block_size, so
+ * there should at least be room for two chunks in the buffer. Generally, write_block_size
+ * is larger than 512. If not, use the next multiple of write_block_size which is larger
+ * than 1024. */
+ new_bufsize = 2 * u->write_block_size;
+ if (new_bufsize < 1024)
+ new_bufsize = (1024 / u->write_block_size + 1) * u->write_block_size;
+
+ /* The kernel internally doubles the buffer size that was set by setsockopt and getsockopt
+ * returns the doubled value. */
+ if (new_bufsize != old_bufsize / 2) {
+ ret = setsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &new_bufsize, len);
+ if (ret == -1)
+ pa_log_warn("Changing bluetooth buffer size: Failed to change from %d to %d: %s", old_bufsize / 2, new_bufsize, pa_cstrerror(errno));
+ else
+ pa_log_info("Changing bluetooth buffer size: Changed from %d to %d", old_bufsize / 2, new_bufsize);
+ }
+ }
+}
+
/* Run from I/O thread */
static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
struct sbc_info *sbc_info;
@@ -694,6 +725,15 @@ static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
pa_sink_set_max_request_within_thread(u->sink, u->write_block_size);
pa_sink_set_fixed_latency_within_thread(u->sink,
FIXED_LATENCY_PLAYBACK_A2DP + pa_bytes_to_usec(u->write_block_size, &u->sample_spec));
+
+ /* If there is still data in the memchunk, we have to discard it
+ * because the write_block_size may have changed. */
+ if (u->write_memchunk.memblock) {
+ pa_memblock_unref(u->write_memchunk.memblock);
+ pa_memchunk_reset(&u->write_memchunk);
+ }
+
+ update_buffer_size(u);
}
/* Run from I/O thread */
@@ -852,8 +892,10 @@ static void setup_stream(struct userdata *u) {
pa_log_debug("Stream properly set up, we're ready to roll!");
- if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
+ if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
a2dp_set_bitpool(u, u->sbc_info.max_bitpool);
+ update_buffer_size(u);
+ }
u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
@@ -1068,12 +1110,12 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
switch (code) {
case PA_SINK_MESSAGE_GET_LATENCY: {
- int64_t wi, ri;
+ int64_t wi = 0, ri = 0;
if (u->read_smoother) {
ri = pa_smoother_get(u->read_smoother, pa_rtclock_now());
wi = pa_bytes_to_usec(u->write_index + u->write_block_size, &u->sample_spec);
- } else {
+ } else if (u->started_at) {
ri = pa_rtclock_now() - u->started_at;
wi = pa_bytes_to_usec(u->write_index, &u->sample_spec);
}
@@ -1415,12 +1457,32 @@ static int init_profile(struct userdata *u) {
return r;
}
+static int write_block(struct userdata *u) {
+ int n_written;
+
+ if (u->write_index <= 0)
+ u->started_at = pa_rtclock_now();
+
+ if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
+ if ((n_written = a2dp_process_render(u)) < 0)
+ return -1;
+ } else {
+ if ((n_written = sco_process_render(u)) < 0)
+ return -1;
+ }
+
+ if (n_written == 0)
+ pa_log_debug("Got EAGAIN on write() after POLLOUT, probably there is a temporary connection loss.");
+
+ return n_written;
+}
+
+
/* I/O thread function */
static void thread_func(void *userdata) {
struct userdata *u = userdata;
- unsigned do_write = 0;
- unsigned pending_read_bytes = 0;
- bool writable = false;
+ unsigned blocks_to_write = 0;
+ unsigned bytes_to_write = 0;
pa_assert(u);
pa_assert(u->transport);
@@ -1440,9 +1502,13 @@ static void thread_func(void *userdata) {
struct pollfd *pollfd;
int ret;
bool disable_timer = true;
+ bool writable = false;
+ bool have_source = u->source ? PA_SOURCE_IS_LINKED(u->source->thread_info.state) : false;
+ bool have_sink = u->sink ? PA_SINK_IS_LINKED(u->sink->thread_info.state) : false;
pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL;
+ /* Check for stream error or close */
if (pollfd && (pollfd->revents & ~(POLLOUT|POLLIN))) {
pa_log_info("FD error: %s%s%s%s",
pollfd->revents & POLLERR ? "POLLERR " :"",
@@ -1453,147 +1519,185 @@ static void thread_func(void *userdata) {
if (pollfd->revents & POLLHUP) {
pollfd = NULL;
teardown_stream(u);
- do_write = 0;
- pending_read_bytes = 0;
- writable = false;
+ blocks_to_write = 0;
+ bytes_to_write = 0;
pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), BLUETOOTH_MESSAGE_STREAM_FD_HUP, NULL, 0, NULL, NULL);
} else
goto fail;
}
- if (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state)) {
+ /* If there is a pollfd, the stream is set up and we need to do something */
+ if (pollfd) {
- /* We should send two blocks to the device before we expect
- * a response. */
+ /* Handle source if present */
+ if (have_source) {
- if (u->write_index == 0 && u->read_index <= 0)
- do_write = 2;
+ /* We should send two blocks to the device before we expect a response. */
+ if (u->write_index == 0 && u->read_index <= 0)
+ blocks_to_write = 2;
- if (pollfd && (pollfd->revents & POLLIN)) {
- int n_read;
+ /* If we got woken up by POLLIN let's do some reading */
+ if (pollfd->revents & POLLIN) {
+ int n_read;
- if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
- n_read = a2dp_process_push(u);
- else
- n_read = sco_process_push(u);
+ if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
+ n_read = a2dp_process_push(u);
+ else
+ n_read = sco_process_push(u);
- if (n_read < 0)
- goto fail;
+ if (n_read < 0)
+ goto fail;
- if (n_read > 0) {
- /* We just read something, so we are supposed to write something, too */
- pending_read_bytes += n_read;
- do_write += pending_read_bytes / u->write_block_size;
- pending_read_bytes = pending_read_bytes % u->write_block_size;
+ if (n_read > 0) {
+ /* We just read something, so we are supposed to write something, too */
+ bytes_to_write += n_read;
+ blocks_to_write += bytes_to_write / u->write_block_size;
+ bytes_to_write = bytes_to_write % u->write_block_size;
+ }
}
}
- }
- if (u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
+ /* Handle sink if present */
+ if (have_sink) {
- if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
- pa_sink_process_rewind(u->sink, 0);
+ /* Process rewinds */
+ if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
+ pa_sink_process_rewind(u->sink, 0);
- if (pollfd) {
+ /* Test if the stream is writable */
if (pollfd->revents & POLLOUT)
writable = true;
- if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0 && writable) {
- pa_usec_t time_passed;
- pa_usec_t audio_sent;
+ /* If we have a source, we let the source determine the timing
+ * for the sink */
+ if (have_source) {
- /* Hmm, there is no input stream we could synchronize
- * to. So let's do things by time */
+ if (writable && blocks_to_write > 0) {
+ int result;
- time_passed = pa_rtclock_now() - u->started_at;
- audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec);
+ if ((result = write_block(u)) < 0)
+ goto fail;
+ blocks_to_write -= result;
+
+ /* writable controls whether we set POLLOUT when polling - we set it to
+ * false to enable POLLOUT. If there are more blocks to write, we want to
+ * be woken up immediately when the socket becomes writable. If there
+ * aren't currently any more blocks to write, then we'll have to wait
+ * until we've received more data, so in that case we only want to set
+ * POLLIN. Note that when we are woken up the next time, POLLOUT won't be
+ * set in revents even if the socket has meanwhile become writable, which
+ * may seem bad, but in that case we'll set POLLOUT in the subsequent
+ * poll, and the poll will return immediately, so our writes won't be
+ * delayed. */
+ if (blocks_to_write > 0)
+ writable = false;
+ }
+
+ /* There is no source, we have to use the system clock for timing */
+ } else {
+ bool have_written = false;
+ pa_usec_t time_passed = 0;
+ pa_usec_t audio_sent = 0;
+
+ if (u->started_at) {
+ time_passed = pa_rtclock_now() - u->started_at;
+ audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec);
+ }
+
+ /* A new block needs to be sent. */
if (audio_sent <= time_passed) {
- pa_usec_t audio_to_send = time_passed - audio_sent;
+ size_t bytes_to_send = pa_usec_to_bytes(time_passed - audio_sent, &u->sample_spec);
- /* Never try to catch up for more than 100ms */
- if (u->write_index > 0 && audio_to_send > MAX_PLAYBACK_CATCH_UP_USEC) {
- pa_usec_t skip_usec;
+ /* There are more than two blocks that need to be written. It seems that
+ * the socket has not been accepting data fast enough (could be due to
+ * hiccups in the wireless transmission). We need to discard everything
+ * older than two block sizes to keep the latency from growing. */
+ if (bytes_to_send > 2 * u->write_block_size) {
uint64_t skip_bytes;
+ pa_memchunk tmp;
+ size_t mempool_max_block_size = pa_mempool_block_size_max(u->core->mempool);
+ pa_usec_t skip_usec;
- skip_usec = audio_to_send - MAX_PLAYBACK_CATCH_UP_USEC;
- skip_bytes = pa_usec_to_bytes(skip_usec, &u->sample_spec);
+ skip_bytes = bytes_to_send - 2 * u->write_block_size;
+ skip_usec = pa_bytes_to_usec(skip_bytes, &u->sample_spec);
- if (skip_bytes > 0) {
- pa_memchunk tmp;
+ pa_log_debug("Skipping %llu us (= %llu bytes) in audio stream",
+ (unsigned long long) skip_usec,
+ (unsigned long long) skip_bytes);
- pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream",
- (unsigned long long) skip_usec,
- (unsigned long long) skip_bytes);
+ while (skip_bytes > 0) {
+ size_t bytes_to_render;
- pa_sink_render_full(u->sink, skip_bytes, &tmp);
- pa_memblock_unref(tmp.memblock);
- u->write_index += skip_bytes;
+ if (skip_bytes > mempool_max_block_size)
+ bytes_to_render = mempool_max_block_size;
+ else
+ bytes_to_render = skip_bytes;
- if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
- a2dp_reduce_bitpool(u);
+ pa_sink_render_full(u->sink, bytes_to_render, &tmp);
+ pa_memblock_unref(tmp.memblock);
+ u->write_index += bytes_to_render;
+ skip_bytes -= bytes_to_render;
}
+
+ if (u->write_index > 0 && u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
+ a2dp_reduce_bitpool(u);
}
- do_write = 1;
- pending_read_bytes = 0;
+ blocks_to_write = 1;
}
- }
- if (writable && do_write > 0) {
- int n_written;
+ /* If the stream is writable, send some data if necessary */
+ if (writable && blocks_to_write > 0) {
+ int result;
- if (u->write_index <= 0)
- u->started_at = pa_rtclock_now();
-
- if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
- if ((n_written = a2dp_process_render(u)) < 0)
- goto fail;
- } else {
- if ((n_written = sco_process_render(u)) < 0)
+ if ((result = write_block(u)) < 0)
goto fail;
- }
- if (n_written == 0)
- pa_log("Broken kernel: we got EAGAIN on write() after POLLOUT!");
+ blocks_to_write -= result;
+ writable = false;
+ if (result)
+ have_written = true;
+ }
- do_write -= n_written;
- writable = false;
+ /* If nothing was written during this iteration, either the stream
+ * is not writable or there was no write pending. Set up a timer that
+ * will wake up the thread when the next data needs to be written. */
+ if (!have_written) {
+ pa_usec_t sleep_for;
+ pa_usec_t next_write_at;
+
+ if (writable) {
+ /* There was no write pending on this iteration of the loop.
+ * Let's estimate when we need to wake up next */
+ next_write_at = pa_bytes_to_usec(u->write_index, &u->sample_spec);
+ sleep_for = time_passed < next_write_at ? next_write_at - time_passed : 0;
+ /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */
+ } else
+ /* We could not write because the stream was not ready. Let's try
+ * again in 500 ms and drop audio if we still can't write. The
+ * thread will also be woken up when we can write again. */
+ sleep_for = PA_USEC_PER_MSEC * 500;
+
+ pa_rtpoll_set_timer_relative(u->rtpoll, sleep_for);
+ disable_timer = false;
+ }
}
+ }
- if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0) {
- pa_usec_t sleep_for;
- pa_usec_t time_passed, next_write_at;
+ /* Set events to wake up the thread */
+ pollfd->events = (short) (((have_sink && !writable) ? POLLOUT : 0) | (have_source ? POLLIN : 0));
- if (writable) {
- /* Hmm, there is no input stream we could synchronize
- * to. So let's estimate when we need to wake up the latest */
- time_passed = pa_rtclock_now() - u->started_at;
- next_write_at = pa_bytes_to_usec(u->write_index, &u->sample_spec);
- sleep_for = time_passed < next_write_at ? next_write_at - time_passed : 0;
- /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */
- } else
- /* drop stream every 500 ms */
- sleep_for = PA_USEC_PER_MSEC * 500;
-
- pa_rtpoll_set_timer_relative(u->rtpoll, sleep_for);
- disable_timer = false;
- }
- }
}
if (disable_timer)
pa_rtpoll_set_timer_disabled(u->rtpoll);
- /* Hmm, nothing to do. Let's sleep */
- if (pollfd)
- pollfd->events = (short) (((u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state) && !writable) ? POLLOUT : 0) |
- (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state) ? POLLIN : 0));
-
if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) {
pa_log_debug("pa_rtpoll_run failed with: %d", ret);
goto fail;
}
+
if (ret == 0) {
pa_log_debug("IO thread shutdown requested, stopping cleanly");
transport_release(u);