diff options
Diffstat (limited to 'libavformat/udp.c')
-rw-r--r-- | libavformat/udp.c | 301 |
1 files changed, 281 insertions, 20 deletions
diff --git a/libavformat/udp.c b/libavformat/udp.c index bfa8cf25e8..a8baa5bf9a 100644 --- a/libavformat/udp.c +++ b/libavformat/udp.c @@ -2,20 +2,20 @@ * UDP prototype streaming system * Copyright (c) 2000, 2001, 2002 Fabrice Bellard * - * This file is part of Libav. + * This file is part of FFmpeg. * - * Libav is free software; you can redistribute it and/or + * FFmpeg is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * - * Libav is distributed in the hope that it will be useful, + * FFmpeg is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public - * License along with Libav; if not, write to the Free Software + * License along with FFmpeg; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ @@ -29,31 +29,90 @@ #include "avformat.h" #include "avio_internal.h" #include "libavutil/parseutils.h" +#include "libavutil/fifo.h" +#include "libavutil/intreadwrite.h" #include "libavutil/avstring.h" +#include "libavutil/opt.h" +#include "libavutil/log.h" +#include "libavutil/time.h" #include "internal.h" #include "network.h" #include "os_support.h" #include "url.h" +#if HAVE_PTHREAD_CANCEL +#include <pthread.h> +#endif + +#ifndef HAVE_PTHREAD_CANCEL +#define HAVE_PTHREAD_CANCEL 0 +#endif + #ifndef IPV6_ADD_MEMBERSHIP #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP #endif +#define UDP_TX_BUF_SIZE 32768 +#define UDP_MAX_PKT_SIZE 65536 + typedef struct { + const AVClass *class; int udp_fd; int ttl; int buffer_size; int is_multicast; + int is_broadcast; int local_port; int reuse_socket; + int overrun_nonfatal; struct sockaddr_storage dest_addr; int dest_addr_len; int is_connected; + + /* Circular Buffer variables for use in UDP receive code */ + int circular_buffer_size; + AVFifoBuffer *fifo; + int circular_buffer_error; +#if HAVE_PTHREAD_CANCEL + pthread_t circular_buffer_thread; + pthread_mutex_t mutex; + pthread_cond_t cond; + int thread_started; +#endif + uint8_t tmp[UDP_MAX_PKT_SIZE+4]; + int remaining_in_dg; + char *local_addr; + int packet_size; + int timeout; + struct sockaddr_storage local_addr_storage; } UDPContext; -#define UDP_TX_BUF_SIZE 32768 -#define UDP_MAX_PKT_SIZE 65536 +#define OFFSET(x) offsetof(UDPContext, x) +#define D AV_OPT_FLAG_DECODING_PARAM +#define E AV_OPT_FLAG_ENCODING_PARAM +static const AVOption options[] = { +{"buffer_size", "set packet buffer size in bytes", OFFSET(buffer_size), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E }, +{"localport", "set local port to bind to", OFFSET(local_port), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E }, +{"localaddr", "choose local IP address", OFFSET(local_addr), AV_OPT_TYPE_STRING, {.str = ""}, 0, 0, D|E }, +{"pkt_size", "set size of UDP packets", OFFSET(packet_size), AV_OPT_TYPE_INT, {.i64 = 1472}, 0, INT_MAX, D|E }, +{"reuse", "explicitly allow or disallow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D|E }, +{"broadcast", "explicitly allow or disallow broadcast destination", OFFSET(is_broadcast), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, E }, +{"ttl", "set the time to live value (for multicast only)", OFFSET(ttl), AV_OPT_TYPE_INT, {.i64 = 16}, 0, INT_MAX, E }, +{"connect", "set if connect() should be called on socket", OFFSET(is_connected), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D|E }, +/* TODO 'sources', 'block' option */ +{"fifo_size", "set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D }, +{"overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D }, +{"timeout", "set raise error timeout (only in read mode)", OFFSET(timeout), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D }, +{NULL} +}; + +static const AVClass udp_context_class = { + .class_name = "udp", + .item_name = av_default_item_name, + .option = options, + .version = LIBAVUTIL_VERSION_INT, +}; static void log_net_error(void *ctx, int level, const char* prefix) { @@ -84,14 +143,17 @@ static int udp_set_multicast_ttl(int sockfd, int mcastTTL, return 0; } -static int udp_join_multicast_group(int sockfd, struct sockaddr *addr) +static int udp_join_multicast_group(int sockfd, struct sockaddr *addr,struct sockaddr *local_addr) { #ifdef IP_ADD_MEMBERSHIP if (addr->sa_family == AF_INET) { struct ip_mreq mreq; mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr; - mreq.imr_interface.s_addr= INADDR_ANY; + if (local_addr) + mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr; + else + mreq.imr_interface.s_addr= INADDR_ANY; if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) { log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)"); return -1; @@ -113,14 +175,17 @@ static int udp_join_multicast_group(int sockfd, struct sockaddr *addr) return 0; } -static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr) +static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr,struct sockaddr *local_addr) { #ifdef IP_DROP_MEMBERSHIP if (addr->sa_family == AF_INET) { struct ip_mreq mreq; mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr; - mreq.imr_interface.s_addr= INADDR_ANY; + if (local_addr) + mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr; + else + mreq.imr_interface.s_addr= INADDR_ANY; if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) { log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)"); return -1; @@ -317,6 +382,7 @@ static int udp_port(struct sockaddr_storage *addr, int addr_len) * 'localport=n' : set the local port * 'pkt_size=n' : set max packet size * 'reuse=1' : enable reusing the socket + * 'overrun_nonfatal=1': survive in case of circular buffer overrun * * @param h media file context * @param uri of the remote server @@ -378,6 +444,65 @@ static int udp_get_file_handle(URLContext *h) return s->udp_fd; } +#if HAVE_PTHREAD_CANCEL +static void *circular_buffer_task( void *_URLContext) +{ + URLContext *h = _URLContext; + UDPContext *s = h->priv_data; + int old_cancelstate; + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate); + pthread_mutex_lock(&s->mutex); + if (ff_socket_nonblock(s->udp_fd, 0) < 0) { + av_log(h, AV_LOG_ERROR, "Failed to set blocking mode"); + s->circular_buffer_error = AVERROR(EIO); + goto end; + } + while(1) { + int len; + + pthread_mutex_unlock(&s->mutex); + /* Blocking operations are always cancellation points; + see "General Information" / "Thread Cancelation Overview" + in Single Unix. */ + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate); + len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate); + pthread_mutex_lock(&s->mutex); + if (len < 0) { + if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) { + s->circular_buffer_error = ff_neterrno(); + goto end; + } + continue; + } + AV_WL32(s->tmp, len); + + if(av_fifo_space(s->fifo) < len + 4) { + /* No Space left */ + if (s->overrun_nonfatal) { + av_log(h, AV_LOG_WARNING, "Circular buffer overrun. " + "Surviving due to overrun_nonfatal option\n"); + continue; + } else { + av_log(h, AV_LOG_ERROR, "Circular buffer overrun. " + "To avoid, increase fifo_size URL option. " + "To survive in such case, use overrun_nonfatal option\n"); + s->circular_buffer_error = AVERROR(EIO); + goto end; + } + } + av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL); + pthread_cond_signal(&s->cond); + } + +end: + pthread_cond_signal(&s->cond); + pthread_mutex_unlock(&s->mutex); + return NULL; +} +#endif + static int parse_source_list(char *buf, char **sources, int *num_sources, int max_sources) { @@ -416,12 +541,10 @@ static int udp_open(URLContext *h, const char *uri, int flags) char *include_sources[32], *exclude_sources[32]; h->is_streamed = 1; - h->max_packet_size = 1472; is_output = !(flags & AVIO_FLAG_READ); - - s->ttl = 16; - s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE; + if (!s->buffer_size) /* if not set explicitly */ + s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE; p = strchr(uri, '?'); if (p) { @@ -433,6 +556,17 @@ static int udp_open(URLContext *h, const char *uri, int flags) s->reuse_socket = 1; reuse_specified = 1; } + if (av_find_info_tag(buf, sizeof(buf), "overrun_nonfatal", p)) { + char *endptr = NULL; + s->overrun_nonfatal = strtol(buf, &endptr, 10); + /* assume if no digits were found it is a request to enable it */ + if (buf == endptr) + s->overrun_nonfatal = 1; + if (!HAVE_PTHREAD_CANCEL) + av_log(h, AV_LOG_WARNING, + "'overrun_nonfatal' option was set but it is not supported " + "on this build (pthread support is required)\n"); + } if (av_find_info_tag(buf, sizeof(buf), "ttl", p)) { s->ttl = strtol(buf, NULL, 10); } @@ -440,7 +574,7 @@ static int udp_open(URLContext *h, const char *uri, int flags) s->local_port = strtol(buf, NULL, 10); } if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) { - h->max_packet_size = strtol(buf, NULL, 10); + s->packet_size = strtol(buf, NULL, 10); } if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) { s->buffer_size = strtol(buf, NULL, 10); @@ -448,6 +582,13 @@ static int udp_open(URLContext *h, const char *uri, int flags) if (av_find_info_tag(buf, sizeof(buf), "connect", p)) { s->is_connected = strtol(buf, NULL, 10); } + if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) { + s->circular_buffer_size = strtol(buf, NULL, 10); + if (!HAVE_PTHREAD_CANCEL) + av_log(h, AV_LOG_WARNING, + "'circular_buffer_size' option was set but it is not supported " + "on this build (pthread support is required)\n"); + } if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) { av_strlcpy(localaddr, buf, sizeof(localaddr)); } @@ -461,7 +602,19 @@ static int udp_open(URLContext *h, const char *uri, int flags) FF_ARRAY_ELEMS(exclude_sources))) goto fail; } + if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p)) + s->timeout = strtol(buf, NULL, 10); + if (is_output && av_find_info_tag(buf, sizeof(buf), "broadcast", p)) + s->is_broadcast = strtol(buf, NULL, 10); + } + /* handling needed to support options picking from both AVOption and URL */ + s->circular_buffer_size *= 188; + if (flags & AVIO_FLAG_WRITE) { + h->max_packet_size = s->packet_size; + } else { + h->max_packet_size = UDP_MAX_PKT_SIZE; } + h->rw_timeout = s->timeout; /* fill the dest addr */ av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri); @@ -478,10 +631,12 @@ static int udp_open(URLContext *h, const char *uri, int flags) if ((s->is_multicast || !s->local_port) && (h->flags & AVIO_FLAG_READ)) s->local_port = port; - udp_fd = udp_socket_create(s, &my_addr, &len, localaddr); + udp_fd = udp_socket_create(s, &my_addr, &len, localaddr[0] ? localaddr : s->local_addr); if (udp_fd < 0) goto fail; + s->local_addr_storage=my_addr; //store for future multicast join + /* Follow the requested reuse option, unless it's multicast in which * case enable reuse unless explicitly disabled. */ @@ -491,6 +646,13 @@ static int udp_open(URLContext *h, const char *uri, int flags) goto fail; } + if (s->is_broadcast) { +#ifdef SO_BROADCAST + if (setsockopt (udp_fd, SOL_SOCKET, SO_BROADCAST, &(s->is_broadcast), sizeof(s->is_broadcast)) != 0) +#endif + goto fail; + } + /* If multicast, try binding the multicast address first, to avoid * receiving UDP packets from other sources aimed at the same UDP * port. This fails on windows. This makes sending to the same address @@ -526,7 +688,7 @@ static int udp_open(URLContext *h, const char *uri, int flags) if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, include_sources, num_include_sources, 1) < 0) goto fail; } else { - if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr) < 0) + if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage) < 0) goto fail; } if (num_exclude_sources) { @@ -544,12 +706,20 @@ static int udp_open(URLContext *h, const char *uri, int flags) goto fail; } } else { - /* set udp recv buffer size to the largest possible udp packet size to - * avoid losing data on OSes that set this too low by default. */ + /* set udp recv buffer size to the requested value (default 64K) */ tmp = s->buffer_size; if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) { log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)"); } + len = sizeof(tmp); + if (getsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, &len) < 0) { + log_net_error(h, AV_LOG_WARNING, "getsockopt(SO_RCVBUF)"); + } else { + av_log(h, AV_LOG_DEBUG, "end receive buffer size reported is %d\n", tmp); + if(tmp < s->buffer_size) + av_log(h, AV_LOG_WARNING, "attempted to set receive buffer to size %d but it only ended up set as %d", s->buffer_size, tmp); + } + /* make the socket non-blocking */ ff_socket_nonblock(udp_fd, 1); } @@ -566,10 +736,43 @@ static int udp_open(URLContext *h, const char *uri, int flags) av_freep(&exclude_sources[i]); s->udp_fd = udp_fd; + +#if HAVE_PTHREAD_CANCEL + if (!is_output && s->circular_buffer_size) { + int ret; + + /* start the task going */ + s->fifo = av_fifo_alloc(s->circular_buffer_size); + ret = pthread_mutex_init(&s->mutex, NULL); + if (ret != 0) { + av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret)); + goto fail; + } + ret = pthread_cond_init(&s->cond, NULL); + if (ret != 0) { + av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret)); + goto cond_fail; + } + ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h); + if (ret != 0) { + av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret)); + goto thread_fail; + } + s->thread_started = 1; + } +#endif + return 0; +#if HAVE_PTHREAD_CANCEL + thread_fail: + pthread_cond_destroy(&s->cond); + cond_fail: + pthread_mutex_destroy(&s->mutex); +#endif fail: if (udp_fd >= 0) closesocket(udp_fd); + av_fifo_freep(&s->fifo); for (i = 0; i < num_include_sources; i++) av_freep(&include_sources[i]); for (i = 0; i < num_exclude_sources; i++) @@ -581,6 +784,50 @@ static int udp_read(URLContext *h, uint8_t *buf, int size) { UDPContext *s = h->priv_data; int ret; + int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK; + +#if HAVE_PTHREAD_CANCEL + if (s->fifo) { + pthread_mutex_lock(&s->mutex); + do { + avail = av_fifo_size(s->fifo); + if (avail) { // >=size) { + uint8_t tmp[4]; + + av_fifo_generic_read(s->fifo, tmp, 4, NULL); + avail= AV_RL32(tmp); + if(avail > size){ + av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n"); + avail= size; + } + + av_fifo_generic_read(s->fifo, buf, avail, NULL); + av_fifo_drain(s->fifo, AV_RL32(tmp) - avail); + pthread_mutex_unlock(&s->mutex); + return avail; + } else if(s->circular_buffer_error){ + int err = s->circular_buffer_error; + pthread_mutex_unlock(&s->mutex); + return err; + } else if(nonblock) { + pthread_mutex_unlock(&s->mutex); + return AVERROR(EAGAIN); + } + else { + /* FIXME: using the monotonic clock would be better, + but it does not exist on all supported platforms. */ + int64_t t = av_gettime() + 100000; + struct timespec tv = { .tv_sec = t / 1000000, + .tv_nsec = (t % 1000000) * 1000 }; + if (pthread_cond_timedwait(&s->cond, &s->mutex, &tv) < 0) { + pthread_mutex_unlock(&s->mutex); + return AVERROR(errno == ETIMEDOUT ? EAGAIN : errno); + } + nonblock = 1; + } + } while( 1); + } +#endif if (!(h->flags & AVIO_FLAG_NONBLOCK)) { ret = ff_network_wait_fd(s->udp_fd, 0); @@ -588,6 +835,7 @@ static int udp_read(URLContext *h, uint8_t *buf, int size) return ret; } ret = recv(s->udp_fd, buf, size, 0); + return ret < 0 ? ff_neterrno() : ret; } @@ -615,10 +863,22 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size) static int udp_close(URLContext *h) { UDPContext *s = h->priv_data; + int ret; if (s->is_multicast && (h->flags & AVIO_FLAG_READ)) - udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr); + udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage); closesocket(s->udp_fd); +#if HAVE_PTHREAD_CANCEL + if (s->thread_started) { + pthread_cancel(s->circular_buffer_thread); + ret = pthread_join(s->circular_buffer_thread, NULL); + if (ret != 0) + av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret)); + pthread_mutex_destroy(&s->mutex); + pthread_cond_destroy(&s->cond); + } +#endif + av_fifo_freep(&s->fifo); return 0; } @@ -630,5 +890,6 @@ URLProtocol ff_udp_protocol = { .url_close = udp_close, .url_get_file_handle = udp_get_file_handle, .priv_data_size = sizeof(UDPContext), + .priv_data_class = &udp_context_class, .flags = URL_PROTOCOL_FLAG_NETWORK, }; |