summaryrefslogtreecommitdiff
path: root/libavformat/async.c
diff options
context:
space:
mode:
authorZhang Rui <bbcallen@gmail.com>2015-10-13 18:30:47 +0800
committerMichael Niedermayer <michael@niedermayer.cc>2015-10-14 20:40:09 +0200
commit6c7f289fab3429706cb47b81ea02963585f0dd05 (patch)
tree574e38dc4f08b2687db5f532f3910bf58acc2cbb /libavformat/async.c
parent87ff61b9abde99d8cdc2a4332b41f69a80eb3d56 (diff)
downloadffmpeg-6c7f289fab3429706cb47b81ea02963585f0dd05.tar.gz
avformat/async: cache some data for fast seek backward
Signed-off-by: Michael Niedermayer <michael@niedermayer.cc>
Diffstat (limited to 'libavformat/async.c')
-rw-r--r--libavformat/async.c123
1 files changed, 104 insertions, 19 deletions
diff --git a/libavformat/async.c b/libavformat/async.c
index a8c8f542c1..9fac84a01a 100644
--- a/libavformat/async.c
+++ b/libavformat/async.c
@@ -24,7 +24,6 @@
/**
* @TODO
* support timeout
- * support backward short seek
* support work with concatdec, hls
*/
@@ -43,8 +42,17 @@
#endif
#define BUFFER_CAPACITY (4 * 1024 * 1024)
+#define READ_BACK_CAPACITY (4 * 1024 * 1024)
#define SHORT_SEEK_THRESHOLD (256 * 1024)
+typedef struct RingBuffer
+{
+ AVFifoBuffer *fifo;
+ int read_back_capacity;
+
+ int read_pos;
+} RingBuffer;
+
typedef struct Context {
AVClass *class;
URLContext *inner;
@@ -61,7 +69,7 @@ typedef struct Context {
int64_t logical_pos;
int64_t logical_size;
- AVFifoBuffer *fifo;
+ RingBuffer ring;
pthread_cond_t cond_wakeup_main;
pthread_cond_t cond_wakeup_background;
@@ -72,6 +80,73 @@ typedef struct Context {
AVIOInterruptCB interrupt_callback;
} Context;
+static int ring_init(RingBuffer *ring, unsigned int capacity, int read_back_capacity)
+{
+ memset(ring, 0, sizeof(RingBuffer));
+ ring->fifo = av_fifo_alloc(capacity + read_back_capacity);
+ if (!ring->fifo)
+ return AVERROR(ENOMEM);
+
+ ring->read_back_capacity = read_back_capacity;
+ return 0;
+}
+
+static void ring_destroy(RingBuffer *ring)
+{
+ av_fifo_freep(&ring->fifo);
+}
+
+static void ring_reset(RingBuffer *ring)
+{
+ av_fifo_reset(ring->fifo);
+ ring->read_pos = 0;
+}
+
+static int ring_size(RingBuffer *ring)
+{
+ return av_fifo_size(ring->fifo) - ring->read_pos;
+}
+
+static int ring_space(RingBuffer *ring)
+{
+ return av_fifo_space(ring->fifo);
+}
+
+static int ring_generic_read(RingBuffer *ring, void *dest, int buf_size, void (*func)(void*, void*, int))
+{
+ int ret;
+
+ av_assert2(buf_size <= ring_size(ring));
+ ret = av_fifo_generic_peek_at(ring->fifo, dest, ring->read_pos, buf_size, func);
+ ring->read_pos += buf_size;
+
+ if (ring->read_pos > ring->read_back_capacity) {
+ av_fifo_drain(ring->fifo, ring->read_pos - ring->read_back_capacity);
+ ring->read_pos = ring->read_back_capacity;
+ }
+
+ return ret;
+}
+
+static int ring_generic_write(RingBuffer *ring, void *src, int size, int (*func)(void*, void*, int))
+{
+ av_assert2(size <= ring_space(ring));
+ return av_fifo_generic_write(ring->fifo, src, size, func);
+}
+
+static int ring_size_of_read_back(RingBuffer *ring)
+{
+ return ring->read_pos;
+}
+
+static int ring_drain(RingBuffer *ring, int offset)
+{
+ av_assert2(offset >= -ring_size_of_read_back(ring));
+ av_assert2(offset <= -ring_size(ring));
+ ring->read_pos += offset;
+ return 0;
+}
+
static int async_check_interrupt(void *arg)
{
URLContext *h = arg;
@@ -102,7 +177,7 @@ static void *async_buffer_task(void *arg)
{
URLContext *h = arg;
Context *c = h->priv_data;
- AVFifoBuffer *fifo = c->fifo;
+ RingBuffer *ring = &c->ring;
int ret = 0;
int64_t seek_ret;
@@ -132,14 +207,14 @@ static void *async_buffer_task(void *arg)
c->seek_ret = seek_ret;
c->seek_request = 0;
- av_fifo_reset(fifo);
+ ring_reset(ring);
pthread_cond_signal(&c->cond_wakeup_main);
pthread_mutex_unlock(&c->mutex);
continue;
}
- fifo_space = av_fifo_space(fifo);
+ fifo_space = ring_space(ring);
if (c->io_eof_reached || fifo_space <= 0) {
pthread_cond_signal(&c->cond_wakeup_main);
pthread_cond_wait(&c->cond_wakeup_background, &c->mutex);
@@ -149,7 +224,7 @@ static void *async_buffer_task(void *arg)
pthread_mutex_unlock(&c->mutex);
to_copy = FFMIN(4096, fifo_space);
- ret = av_fifo_generic_write(fifo, (void *)h, to_copy, (void *)wrapped_url_read);
+ ret = ring_generic_write(ring, (void *)h, to_copy, (void *)wrapped_url_read);
pthread_mutex_lock(&c->mutex);
if (ret <= 0) {
@@ -173,11 +248,9 @@ static int async_open(URLContext *h, const char *arg, int flags, AVDictionary **
av_strstart(arg, "async:", &arg);
- c->fifo = av_fifo_alloc(BUFFER_CAPACITY);
- if (!c->fifo) {
- ret = AVERROR(ENOMEM);
+ ret = ring_init(&c->ring, BUFFER_CAPACITY, READ_BACK_CAPACITY);
+ if (ret < 0)
goto fifo_fail;
- }
/* wrap interrupt callback */
c->interrupt_callback = h->interrupt_callback;
@@ -225,7 +298,7 @@ cond_wakeup_main_fail:
mutex_fail:
ffurl_close(c->inner);
url_fail:
- av_fifo_freep(&c->fifo);
+ ring_destroy(&c->ring);
fifo_fail:
return ret;
}
@@ -248,7 +321,7 @@ static int async_close(URLContext *h)
pthread_cond_destroy(&c->cond_wakeup_main);
pthread_mutex_destroy(&c->mutex);
ffurl_close(c->inner);
- av_fifo_freep(&c->fifo);
+ ring_destroy(&c->ring);
return 0;
}
@@ -257,7 +330,7 @@ static int async_read_internal(URLContext *h, void *dest, int size, int read_com
void (*func)(void*, void*, int))
{
Context *c = h->priv_data;
- AVFifoBuffer *fifo = c->fifo;
+ RingBuffer *ring = &c->ring;
int to_read = size;
int ret = 0;
@@ -269,10 +342,10 @@ static int async_read_internal(URLContext *h, void *dest, int size, int read_com
ret = AVERROR_EXIT;
break;
}
- fifo_size = av_fifo_size(fifo);
+ fifo_size = ring_size(ring);
to_copy = FFMIN(to_read, fifo_size);
if (to_copy > 0) {
- av_fifo_generic_read(fifo, dest, to_copy, func);
+ ring_generic_read(ring, dest, to_copy, func);
if (!func)
dest = (uint8_t *)dest + to_copy;
c->logical_pos += to_copy;
@@ -312,10 +385,11 @@ static void fifo_do_not_copy_func(void* dest, void* src, int size) {
static int64_t async_seek(URLContext *h, int64_t pos, int whence)
{
Context *c = h->priv_data;
- AVFifoBuffer *fifo = c->fifo;
+ RingBuffer *ring = &c->ring;
int64_t ret;
int64_t new_logical_pos;
int fifo_size;
+ int fifo_size_of_read_back;
if (whence == AVSEEK_SIZE) {
av_log(h, AV_LOG_TRACE, "async_seek: AVSEEK_SIZE: %"PRId64"\n", (int64_t)c->logical_size);
@@ -332,17 +406,28 @@ static int64_t async_seek(URLContext *h, int64_t pos, int whence)
if (new_logical_pos < 0)
return AVERROR(EINVAL);
- fifo_size = av_fifo_size(fifo);
+ fifo_size = ring_size(ring);
+ fifo_size_of_read_back = ring_size_of_read_back(ring);
if (new_logical_pos == c->logical_pos) {
/* current position */
return c->logical_pos;
- } else if ((new_logical_pos > c->logical_pos) &&
+ } else if ((new_logical_pos >= (c->logical_pos - fifo_size_of_read_back)) &&
(new_logical_pos < (c->logical_pos + fifo_size + SHORT_SEEK_THRESHOLD))) {
+ int pos_delta = (int)(new_logical_pos - c->logical_pos);
/* fast seek */
av_log(h, AV_LOG_TRACE, "async_seek: fask_seek %"PRId64" from %d dist:%d/%d\n",
new_logical_pos, (int)c->logical_pos,
(int)(new_logical_pos - c->logical_pos), fifo_size);
- async_read_internal(h, NULL, (int)(new_logical_pos - c->logical_pos), 1, fifo_do_not_copy_func);
+
+ if (pos_delta > 0) {
+ // fast seek forwards
+ async_read_internal(h, NULL, pos_delta, 1, fifo_do_not_copy_func);
+ } else {
+ // fast seek backwards
+ ring_drain(ring, pos_delta);
+ c->logical_pos = new_logical_pos;
+ }
+
return c->logical_pos;
} else if (c->logical_size <= 0) {
/* can not seek */