summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRaimo Niskanen <raimo@erlang.org>2021-02-26 16:17:39 +0100
committerRaimo Niskanen <raimo@erlang.org>2021-04-23 13:15:21 +0200
commit78ce348efffc0d562994c68711e04b8587fbe5c6 (patch)
treebf8fcfce91d2b91e5b57f5a0810b7b9134421e63
parent141fd67c2337245994f38d0259a4b79f041603ab (diff)
downloaderlang-78ce348efffc0d562994c68711e04b8587fbe5c6.tar.gz
Implement sendfile over NIF handshake
-rw-r--r--erts/emulator/nifs/common/prim_file_nif.c82
-rw-r--r--erts/emulator/nifs/common/prim_file_nif.h4
-rw-r--r--erts/emulator/nifs/common/prim_file_nif_dyncall.h36
-rw-r--r--erts/emulator/nifs/common/prim_socket_nif.c1235
-rw-r--r--erts/emulator/nifs/common/socket_util.c4
-rw-r--r--erts/emulator/nifs/unix/unix_prim_file.c18
-rw-r--r--erts/emulator/nifs/win32/win_prim_file.c13
-rw-r--r--erts/preloaded/ebin/prim_file.beambin32640 -> 32652 bytes
-rw-r--r--erts/preloaded/ebin/prim_socket.beambin30856 -> 31540 bytes
-rw-r--r--erts/preloaded/src/prim_file.erl5
-rw-r--r--erts/preloaded/src/prim_socket.erl61
-rw-r--r--lib/kernel/doc/src/file.xml10
-rw-r--r--lib/kernel/doc/src/socket.xml371
-rw-r--r--lib/kernel/src/file.erl91
-rw-r--r--lib/kernel/src/gen_tcp.erl23
-rw-r--r--lib/kernel/src/gen_tcp_socket.erl35
-rw-r--r--lib/kernel/src/inet.erl15
-rw-r--r--lib/kernel/src/socket.erl464
-rw-r--r--lib/kernel/test/sendfile_SUITE.erl84
19 files changed, 2186 insertions, 365 deletions
diff --git a/erts/emulator/nifs/common/prim_file_nif.c b/erts/emulator/nifs/common/prim_file_nif.c
index c01ad3616c..7d46035eb3 100644
--- a/erts/emulator/nifs/common/prim_file_nif.c
+++ b/erts/emulator/nifs/common/prim_file_nif.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson 2017-2020. All Rights Reserved.
+ * Copyright Ericsson 2017-2021. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@
#include "erl_driver.h"
#include "prim_file_nif.h"
+#include "prim_file_nif_dyncall.h"
/* NIF interface declarations */
static int load(ErlNifEnv *env, void** priv_data, ERL_NIF_TERM load_info);
@@ -220,6 +221,8 @@ static ErlNifPid erts_prim_file_pid;
static void owner_death_callback(ErlNifEnv* env, void* obj, ErlNifPid* pid, ErlNifMonitor* mon);
+static ErlNifResourceDynCall dyncall;
+
static int load(ErlNifEnv *env, void** priv_data, ERL_NIF_TERM prim_file_pid)
{
ErlNifResourceTypeInit callbacks;
@@ -264,11 +267,13 @@ static int load(ErlNifEnv *env, void** priv_data, ERL_NIF_TERM prim_file_pid)
am_cur = enif_make_atom(env, "cur");
am_eof = enif_make_atom(env, "eof");
- callbacks.down = owner_death_callback;
- callbacks.dtor = NULL;
- callbacks.stop = NULL;
+ callbacks.down = owner_death_callback;
+ callbacks.dtor = NULL;
+ callbacks.stop = NULL;
+ callbacks.dyncall = dyncall;
+ callbacks.members = 4;
- efile_resource_type = enif_open_resource_type_x(env, "efile", &callbacks,
+ efile_resource_type = enif_init_resource_type(env, "efile", &callbacks,
ERL_NIF_RT_CREATE, NULL);
*priv_data = NULL;
@@ -414,6 +419,66 @@ static void owner_death_callback(ErlNifEnv* env, void* obj, ErlNifPid* pid, ErlN
}
}
+static void dyncall_dup(ErlNifEnv* env, efile_data_t* d, struct prim_file_nif_dyncall_dup *dc_dup) {
+
+ enum efile_state_t previous_state;
+
+ previous_state = erts_atomic32_cmpxchg_acqb(&d->state,
+ EFILE_STATE_BUSY, EFILE_STATE_IDLE);
+
+ if (previous_state == EFILE_STATE_IDLE) {
+ int do_dup = !0;
+
+ dc_dup->error = efile_get_handle(env, d, do_dup, &dc_dup->handle);
+
+ previous_state = erts_atomic32_cmpxchg_relb(&d->state,
+ EFILE_STATE_IDLE, EFILE_STATE_BUSY);
+
+ ASSERT(previous_state != EFILE_STATE_IDLE);
+
+ if(previous_state == EFILE_STATE_CLOSE_PENDING) {
+ /* This is the only point where a change from CLOSE_PENDING is
+ * possible, and we're running synchronously, so we can't race with
+ * anything else here. */
+ posix_errno_t ignored;
+
+ erts_atomic32_set_acqb(&d->state, EFILE_STATE_CLOSED);
+ efile_close(d, &ignored);
+ }
+ } else {
+ /* CLOSE_PENDING should be impossible at this point since it requires
+ * a transition from BUSY; the only valid state here is CLOSED. */
+ ASSERT(previous_state == EFILE_STATE_CLOSED);
+
+ dc_dup->error = EINVAL;
+ }
+}
+
+static void dyncall(ErlNifEnv* env, void* obj, void* data) {
+ efile_data_t *d = (efile_data_t*)obj;
+ struct prim_file_nif_dyncall *dc;
+
+ for (dc = (struct prim_file_nif_dyncall *)data;
+ dc->size > 0;
+ dc = (struct prim_file_nif_dyncall *)
+ ((char *)dc + dc->size)) {
+
+ switch (dc->op) {
+
+ case prim_file_nif_dyncall_dup: {
+ struct prim_file_nif_dyncall_dup *dc_dup =
+ (struct prim_file_nif_dyncall_dup *)dc;
+ ASSERT(dc->size >= sizeof(*dc_dup));
+
+ dyncall_dup(env, d, dc_dup);
+ dc->completed = !0;
+
+ return;
+ }
+ } // switch (dc->op)
+ }
+}
+
static ERL_NIF_TERM efile_filetype_to_atom(enum efile_filetype_t type) {
switch(type) {
case EFILE_FILETYPE_DEVICE: return am_device;
@@ -928,9 +993,14 @@ static ERL_NIF_TERM ipread_s32bu_p32bu_nif_impl(efile_data_t *d, ErlNifEnv *env,
}
static ERL_NIF_TERM get_handle_nif_impl(efile_data_t *d, ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
+ ERL_NIF_TERM result;
+ posix_errno_t error;
ASSERT(argc == 0);
- return efile_get_handle(env, d);
+ error = efile_get_handle(env, d, 0, &result);
+ ASSERT(error == 0); (void)error;
+
+ return result;
}
static ERL_NIF_TERM build_file_info(ErlNifEnv *env, efile_fileinfo_t *info) {
diff --git a/erts/emulator/nifs/common/prim_file_nif.h b/erts/emulator/nifs/common/prim_file_nif.h
index d4f18fd494..111323b47c 100644
--- a/erts/emulator/nifs/common/prim_file_nif.h
+++ b/erts/emulator/nifs/common/prim_file_nif.h
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson 2017-2018. All Rights Reserved.
+ * Copyright Ericsson 2017-2021. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -129,7 +129,7 @@ posix_errno_t efile_marshal_path(ErlNifEnv *env, ERL_NIF_TERM path, efile_path_t
*
* This is an internal function intended to support tests and tricky
* operations like sendfile(2). */
-ERL_NIF_TERM efile_get_handle(ErlNifEnv *env, efile_data_t *d);
+posix_errno_t efile_get_handle(ErlNifEnv *env, efile_data_t *d, int do_dup, ERL_NIF_TERM *handle);
/** @brief Read until EOF or the given iovec has been filled.
*
diff --git a/erts/emulator/nifs/common/prim_file_nif_dyncall.h b/erts/emulator/nifs/common/prim_file_nif_dyncall.h
new file mode 100644
index 0000000000..b7881fb0f4
--- /dev/null
+++ b/erts/emulator/nifs/common/prim_file_nif_dyncall.h
@@ -0,0 +1,36 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson 2021. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+enum prim_file_nif_dyncall_op {
+ prim_file_nif_dyncall_dup,
+};
+
+struct prim_file_nif_dyncall {
+ size_t size;
+ enum prim_file_nif_dyncall_op op;
+ int completed;
+};
+
+struct prim_file_nif_dyncall_dup {
+ struct prim_file_nif_dyncall common;
+
+ int error;
+ ERL_NIF_TERM handle;
+};
diff --git a/erts/emulator/nifs/common/prim_socket_nif.c b/erts/emulator/nifs/common/prim_socket_nif.c
index 459f75d540..3440ee3dce 100644
--- a/erts/emulator/nifs/common/prim_socket_nif.c
+++ b/erts/emulator/nifs/common/prim_socket_nif.c
@@ -81,7 +81,16 @@
#include <netpacket/packet.h>
#endif
-/* SENDFILE STUFF HERE IF WE NEED IT... */
+#ifdef HAVE_SENDFILE
+#if defined(__linux__) || (defined(__sun) && defined(__SVR4))
+ #include <sys/sendfile.h>
+#elif defined(__FreeBSD__) || defined(__DragonFly__)
+ /* Need to define __BSD_VISIBLE in order to expose prototype
+ * of sendfile in sys/socket.h
+ */
+ #define __BSD_VISIBLE 1
+#endif
+#endif
#if defined(__APPLE__) && defined(__MACH__) && !defined(__DARWIN__)
#define __DARWIN__ 1
@@ -355,7 +364,7 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL;
#include "socket_tarray.h"
#include "socket_int.h"
#include "socket_util.h"
-
+#include "prim_file_nif_dyncall.h"
#if defined(ERTS_INLINE)
# define ESOCK_INLINE ERTS_INLINE
@@ -449,12 +458,17 @@ typedef ErlNifEvent HANDLE;
#define ESOCK_STATE_ACCEPTING 0x0004 /* readState */
#define ESOCK_STATE_CONNECTING 0x0010 /* writeState */
#define ESOCK_STATE_CONNECTED 0x0020 /* writeState */
+
/* This is set in either readState or writeState
- * so it has to be read from both */
+ * so it has to be read from both.
+ * Means that the socket has been used in select,
+ * so select_stop is required. */
#define ESOCK_STATE_SELECTED 0x0100 /* readState or writeState */
+
/* These are set in both readState and writeState
* so they can be read from either. */
#define ESOCK_STATE_CLOSING 0x0200 /* readState and writeState */
+
#define ESOCK_STATE_CLOSED 0x0400 /* readState and writeState */
//
#define ESOCK_STATE_DTOR 0x8000
@@ -799,6 +813,7 @@ typedef struct {
ESockRequestQueueElement* last;
} ESockRequestQueue;
+
/*** The point of this is primarily testing ***/
/*
#if defined(ESOCK_COUNTER_SIZE)
@@ -860,6 +875,25 @@ typedef Uint64 ESockCounter;
// static const ESockCounter esock_counter_max = ESOCK_COUNTER_MAX;
+#ifdef HAVE_SENDFILE
+
+typedef struct {
+ ESockCounter cnt; // Calls to OS sendfile()
+ ESockCounter byteCnt; // Bytes sent with sendfile
+ ESockCounter fails; // Failed sendfile operations
+ ESockCounter max; // Largest sendfile operation
+ ESockCounter maxCnt; // Counter for ="=
+ ESockCounter pkg; // Sendfile chunks
+ ESockCounter pkgMax; // Largest sendfile chunk
+ ESockCounter tries; // Started sendfile operations
+ ESockCounter waits; // Select's during sendfile
+} ESockSendfileCounters;
+static ESockSendfileCounters initESockSendfileCounters =
+ {0, 0, 0, 0, 0, 0, 0, 0, 0};
+
+#endif
+
+
typedef struct {
/*
* +++ This is a way to, possibly, detect memory overrides "and stuff" +++
@@ -888,7 +922,7 @@ typedef struct {
/**/
unsigned int writeState; // For debugging
ESockRequestor currentWriter;
- ESockRequestor* currentWriterP; // NULL or points to currentWriter
+ ESockRequestor* currentWriterP; // NULL or &currentWriter
ESockRequestQueue writersQ;
ESockCounter writePkgCnt;
ESockCounter writePkgMax;
@@ -897,9 +931,13 @@ typedef struct {
ESockCounter writeTries;
ESockCounter writeWaits;
ESockCounter writeFails;
+#ifdef HAVE_SENDFILE
+ SOCKET sendfileSock;
+ ESockSendfileCounters* sendfileCountersP;
+#endif
/* +++ Connector +++ */
ESockRequestor connector;
- ESockRequestor* connectorP;
+ ESockRequestor* connectorP; // NULL or &connector
/* +++ Config stuff +++ */
size_t wCtrlSz; // Write control buffer size
ESockMeta meta; // Level 'otp' option 'meta' term
@@ -909,7 +947,7 @@ typedef struct {
/**/
unsigned int readState; // For debugging
ESockRequestor currentReader;
- ESockRequestor* currentReaderP; // NULL or points to currentReader
+ ESockRequestor* currentReaderP; // NULL or &currentReader
ESockRequestQueue readersQ;
ErlNifBinary rbuffer; // DO WE NEED THIS
Uint32 readCapacity; // DO WE NEED THIS
@@ -922,7 +960,7 @@ typedef struct {
ESockCounter readFails;
/* +++ Accept stuff +++ */
ESockRequestor currentAcceptor;
- ESockRequestor* currentAcceptorP; // NULL or points to currentAcceptor
+ ESockRequestor* currentAcceptorP; // NULL or &currentAcceptor
ESockRequestQueue acceptorsQ;
ESockCounter accSuccess;
ESockCounter accTries;
@@ -1047,6 +1085,7 @@ extern char* erl_errno_id(int error); /* THIS IS JUST TEMPORARY??? */
* nif_send
* nif_sendto
* nif_sendmsg
+ * nif_sendfile
* nif_recv
* nif_recvfrom
* nif_recvmsg
@@ -1143,7 +1182,7 @@ ESOCK_SOCKET_INFO_REQ_FUNCS
static ERL_NIF_TERM socket_info_reqs(ErlNifEnv* env,
ESockDescriptor* descP,
- ESockRequestor* crp,
+ ESockRequestor* currentRequestorP,
ESockRequestQueue* q);
static ERL_NIF_TERM esock_supports_0(ErlNifEnv* env);
@@ -1257,6 +1296,56 @@ static ERL_NIF_TERM esock_sendmsg(ErlNifEnv* env,
ERL_NIF_TERM eMsg,
int flags,
ERL_NIF_TERM eIOV);
+
+#ifdef HAVE_SENDFILE
+static ERL_NIF_TERM
+esock_sendfile_start(ErlNifEnv *env,
+ ESockDescriptor *descP,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM sendRef,
+ off_t offset,
+ size_t count,
+ ERL_NIF_TERM fRef);
+static ERL_NIF_TERM
+esock_sendfile_cont(ErlNifEnv *env,
+ ESockDescriptor *descP,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM sendRef,
+ off_t offset,
+ size_t count);
+static ERL_NIF_TERM
+esock_sendfile_deferred_close(ErlNifEnv *env,
+ ESockDescriptor *descP);
+static int
+esock_sendfile(ErlNifEnv *env,
+ ESockDescriptor *descP,
+ ERL_NIF_TERM sockRef,
+ off_t offset,
+ size_t *count,
+ int *errP);
+static ERL_NIF_TERM
+esock_sendfile_error(ErlNifEnv *env,
+ ESockDescriptor *descP,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM reason);
+static ERL_NIF_TERM
+esock_sendfile_errno(ErlNifEnv *env,
+ ESockDescriptor *descP,
+ ERL_NIF_TERM sockRef,
+ int err);
+static ERL_NIF_TERM
+esock_sendfile_ok(ErlNifEnv *env,
+ ESockDescriptor *descP,
+ ERL_NIF_TERM sockRef,
+ size_t count);
+static ERL_NIF_TERM
+esock_sendfile_select(ErlNifEnv *env,
+ ESockDescriptor *descP,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM sendRef,
+ size_t count);
+#endif // #ifdef HAVE_SENDFILE
+
static ERL_NIF_TERM esock_recv(ErlNifEnv* env,
ESockDescriptor* descP,
ERL_NIF_TERM sendRef,
@@ -2868,9 +2957,9 @@ static BOOLEAN_T recv_check_reader(ErlNifEnv* env,
static void recv_init_current_reader(ErlNifEnv* env,
ESockDescriptor* descP,
ERL_NIF_TERM ref);
-static ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
- ESockDescriptor* descP,
- ERL_NIF_TERM sockRef);
+static void recv_update_current_reader(ErlNifEnv* env,
+ ESockDescriptor* descP,
+ ERL_NIF_TERM sockRef);
static void recv_error_current_reader(ErlNifEnv* env,
ESockDescriptor* descP,
ERL_NIF_TERM sockRef,
@@ -3172,7 +3261,7 @@ static void requestor_init(ESockRequestor* reqP);
static int requestor_release(const char* slogan,
ErlNifEnv* env,
ESockDescriptor* descP,
- ESockRequestor* reqP);
+ ESockRequestor* reqP);
static BOOLEAN_T qsearch4pid(ErlNifEnv* env,
ESockRequestQueue* q,
@@ -3236,6 +3325,11 @@ static void esock_send_wrap_msg(ErlNifEnv* env,
static void esock_send_close_msg(ErlNifEnv* env,
ESockDescriptor* descP,
ErlNifPid* pid);
+#ifdef HAVE_SENDFILE
+static void
+esock_send_sendfile_deferred_close_msg(ErlNifEnv* env,
+ ESockDescriptor* descP);
+#endif
static void esock_send_abort_msg(ErlNifEnv* env,
ESockDescriptor* descP,
ERL_NIF_TERM sockRef,
@@ -3626,6 +3720,7 @@ ERL_NIF_TERM esock_atom_socket_tag; // This has a "special" name ('$socket')
LOCAL_ATOM_DECL(dont); \
LOCAL_ATOM_DECL(dtor); \
LOCAL_ATOM_DECL(dup); \
+ LOCAL_ATOM_DECL(efile); \
LOCAL_ATOM_DECL(exclude); \
LOCAL_ATOM_DECL(false); \
LOCAL_ATOM_DECL(frag_needed); \
@@ -3690,6 +3785,7 @@ ERL_NIF_TERM esock_atom_socket_tag; // This has a "special" name ('$socket')
LOCAL_ATOM_DECL(pkt_toobig); \
LOCAL_ATOM_DECL(policy_fail); \
LOCAL_ATOM_DECL(port_unreach); \
+ LOCAL_ATOM_DECL(prim_file); \
LOCAL_ATOM_DECL(probe); \
LOCAL_ATOM_DECL(protocols); \
LOCAL_ATOM_DECL(rcvctrlbuf); \
@@ -3709,6 +3805,15 @@ ERL_NIF_TERM esock_atom_socket_tag; // This has a "special" name ('$socket')
LOCAL_ATOM_DECL(selected); \
LOCAL_ATOM_DECL(sender_dry); \
LOCAL_ATOM_DECL(send_failure); \
+ LOCAL_ATOM_DECL(sendfile); \
+ LOCAL_ATOM_DECL(sendfile_byte); \
+ LOCAL_ATOM_DECL(sendfile_deferred_close); \
+ LOCAL_ATOM_DECL(sendfile_fails); \
+ LOCAL_ATOM_DECL(sendfile_max); \
+ LOCAL_ATOM_DECL(sendfile_pkg); \
+ LOCAL_ATOM_DECL(sendfile_pkg_max); \
+ LOCAL_ATOM_DECL(sendfile_tries); \
+ LOCAL_ATOM_DECL(sendfile_waits); \
LOCAL_ATOM_DECL(shutdown); \
LOCAL_ATOM_DECL(slist); \
LOCAL_ATOM_DECL(sndctrlbuf); \
@@ -3806,6 +3911,9 @@ static ESOCK_INLINE ErlNifEnv* esock_alloc_env(const char* slogan)
* nif_send(Sock, SendRef, Data, Flags)
* nif_sendto(Sock, SendRef, Data, Dest, Flags)
* nif_sendmsg(Sock, SendRef, Msg, Flags)
+ * nif_sendfile(Sock, SendRef, Offset, Count, InFileRef)
+ * nif_sendfile(Sock, SendRef, Offset, Count)
+ * nif_sendfile(Sock)
* nif_recv(Sock, Length, Flags, RecvRef)
* nif_recvfrom(Sock, RecvRef, BufSz, Flags)
* nif_recvmsg(Sock, RecvRef, BufSz, CtrlSz, Flags)
@@ -4261,6 +4369,40 @@ ERL_NIF_TERM esock_socket_info_counters(ErlNifEnv* env,
ESOCK_ASSERT( numKeys == numVals );
ESOCK_ASSERT( MKMA(env, keys, vals, numKeys, &cnts) );
+#ifdef HAVE_SENDFILE
+ if (descP->sendfileCountersP != NULL) {
+ ESockSendfileCounters *cP = descP->sendfileCountersP;
+ ERL_NIF_TERM m,
+ sfKeys[] =
+ {atom_sendfile,
+ atom_sendfile_byte,
+ atom_sendfile_fails,
+ atom_sendfile_max,
+ atom_sendfile_pkg,
+ atom_sendfile_pkg_max,
+ atom_sendfile_tries,
+ atom_sendfile_waits},
+ sfVals[] =
+ {MKUI(env, cP->cnt),
+ MKUI(env, cP->byteCnt),
+ MKUI(env, cP->fails),
+ MKUI(env, cP->max),
+ MKUI(env, cP->pkg),
+ MKUI(env, cP->pkgMax),
+ MKUI(env, cP->tries),
+ MKUI(env, cP->waits)};
+ size_t n, numSfKeys = NUM(sfKeys);
+
+ ESOCK_ASSERT( numSfKeys == NUM(sfVals) );
+ for (n = 0; n < numSfKeys; n++) {
+ ESOCK_ASSERT( enif_make_map_put(env, cnts,
+ sfKeys[n], sfVals[n],
+ &m) );
+ cnts = m;
+ }
+ }
+#endif
+
SSDBG( descP, ("SOCKET", "esock_socket_info_counters -> done with"
"\r\n cnts: %T"
"\r\n", cnts) );
@@ -4439,7 +4581,7 @@ ERL_NIF_TERM esock_command_use_socket_registry(ErlNifEnv* env,
#define ESOCK_INFO_REQ_FUNCS \
ESOCK_INFO_REQ_FUNC_DECL(readers, currentReaderP, readersQ) \
- ESOCK_INFO_REQ_FUNC_DECL(writers, currentWriterP, writersQ) \
+ ESOCK_INFO_REQ_FUNC_DECL(writers, currentWriterP, writersQ) \
ESOCK_INFO_REQ_FUNC_DECL(acceptors, currentAcceptorP, acceptorsQ)
#define ESOCK_INFO_REQ_FUNC_DECL(F, CRP, Q) \
@@ -4455,14 +4597,14 @@ ESOCK_INFO_REQ_FUNCS
static
ERL_NIF_TERM socket_info_reqs(ErlNifEnv* env,
ESockDescriptor* descP,
- ESockRequestor* crp,
+ ESockRequestor* currentRequestorP,
ESockRequestQueue* q)
{
ESockRequestQueueElement* tmp;
ERL_NIF_TERM info;
unsigned int cnt = 0;
- if (crp != NULL) {
+ if (currentRequestorP != NULL) {
// We have an active requestor!
cnt++;
@@ -5588,8 +5730,8 @@ ERL_NIF_TERM esock_connect(ErlNifEnv* env,
ESockAddress* addrP,
SOCKLEN_T addrLen)
{
- int sres, save_errno;
- ErlNifPid self;
+ int save_errno;
+ ErlNifPid self;
ESOCK_ASSERT( enif_self(env, &self) != NULL );
@@ -5625,9 +5767,8 @@ ERL_NIF_TERM esock_connect(ErlNifEnv* env,
/* Finalize after received select message */
requestor_release("esock_connect finalize -> connected",
- env, descP, descP->connectorP);
+ env, descP, &descP->connector);
descP->connectorP = NULL;
-
descP->writeState &= ~ESOCK_STATE_CONNECTING;
if (! verify_is_connected(descP, &save_errno)) {
@@ -5671,31 +5812,27 @@ ERL_NIF_TERM esock_connect(ErlNifEnv* env,
("SOCKET", "esock_connect {%d} -> would block => select\r\n",
descP->sock) );
{
- /* Initiate connector */
+ int sres;
+ if ((sres =
+ esock_select_write(env, descP->sock, descP, NULL,
+ sockRef, connRef)) < 0)
+ return
+ enif_raise_exception(env,
+ MKT2(env, atom_select_write,
+ MKI(env, sres)));
+ /* Initiate connector */
descP->connector.pid = self;
ESOCK_ASSERT( MONP("esock_connect -> conn",
env, descP,
&self, &descP->connector.mon) == 0 );
descP->connector.env = esock_alloc_env("connector");
- descP->connectorP = &descP->connector;
descP->connector.ref = CP_TERM(descP->connector.env, connRef);
+ descP->connectorP = &descP->connector;
+ descP->writeState |=
+ (ESOCK_STATE_CONNECTING | ESOCK_STATE_SELECTED);
- if ((sres =
- esock_select_write(env, descP->sock, descP, NULL,
- sockRef, connRef)) < 0) {
-
- requestor_release("esock_connect -> select failed",
- env, descP, descP->connectorP);
- descP->connectorP = NULL;
- return enif_raise_exception(env,
- MKT2(env, atom_select_write,
- MKI(env, sres)));
- } else {
- descP->writeState |=
- (ESOCK_STATE_CONNECTING | ESOCK_STATE_SELECTED);
- return atom_select;
- }
+ return atom_select;
}
break;
@@ -5898,11 +6035,11 @@ ERL_NIF_TERM nif_accept(ErlNifEnv* env,
SSDBG( descP,
("SOCKET", "nif_accept%T), {%d,0x%X} ->"
"\r\n ReqRef: %T"
- "\r\n Current Acceptor Addr: %p"
- "\r\n Current Acceptor pid: %T"
- "\r\n Current Acceptor mon: %T"
- "\r\n Current Acceptor env: 0x%lX"
- "\r\n Current Acceptor ref: %T"
+ "\r\n Current Acceptor addr: %p"
+ "\r\n Current Acceptor pid: %T"
+ "\r\n Current Acceptor mon: %T"
+ "\r\n Current Acceptor env: 0x%lX"
+ "\r\n Current Acceptor ref: %T"
"\r\n",
sockRef, descP->sock, descP->readState,
ref,
@@ -6156,7 +6293,7 @@ ERL_NIF_TERM esock_accept_accepting_current_accept(ErlNifEnv* env,
descP->readState &= ~ESOCK_STATE_ACCEPTING;
- descP->currentAcceptorP = NULL;
+ descP->currentAcceptorP = NULL;
}
}
@@ -6210,7 +6347,7 @@ ERL_NIF_TERM esock_accept_accepting_current_error(ErlNifEnv* env,
ESOCK_CNT_INC(env, descP, sockRef, atom_acc_fails, &descP->accFails, 1);
requestor_release("esock_accept_accepting_current_error",
- env, descP, descP->currentAcceptorP);
+ env, descP, &descP->currentAcceptor);
reason = MKA(env, erl_errno_id(save_errno));
res = esock_make_error(env, reason);
@@ -6227,7 +6364,7 @@ ERL_NIF_TERM esock_accept_accepting_current_error(ErlNifEnv* env,
DEMONP("esock_accept_accepting_current_error -> pop'ed writer",
env, descP, &req.mon);
}
- descP->currentAcceptorP = NULL;
+ descP->currentAcceptorP = NULL;
}
return res;
@@ -6289,7 +6426,7 @@ ERL_NIF_TERM esock_accept_busy_retry(ErlNifEnv* env,
descP->readState &= ~ESOCK_STATE_ACCEPTING;
- descP->currentAcceptorP = NULL;
+ descP->currentAcceptorP = NULL;
}
res =
@@ -6514,9 +6651,6 @@ ERL_NIF_TERM esock_send(ErlNifEnv* env,
return writerCheck;
}
- /* We ignore the wrap for the moment.
- * Maybe we should issue a wrap-message to controlling process...
- */
ESOCK_CNT_INC(env, descP, sockRef, atom_write_tries, &descP->writeTries, 1);
send_result =
@@ -6662,9 +6796,6 @@ ERL_NIF_TERM esock_sendto(ErlNifEnv* env,
return writerCheck;
}
- /* We ignore the wrap for the moment.
- * Maybe we should issue a wrap-message to controlling process...
- */
ESOCK_CNT_INC(env, descP, sockRef, atom_write_tries, &descP->writeTries, 1);
if (toAddrP != NULL) {
@@ -6963,9 +7094,6 @@ ERL_NIF_TERM esock_sendmsg(ErlNifEnv* env,
* but zero it just in case */
msgHdr.msg_flags = 0;
- /* We ignore the wrap for the moment.
- * Maybe we should issue a wrap-message to controlling process...
- */
ESOCK_CNT_INC(env, descP, sockRef, atom_write_tries, &descP->writeTries, 1);
/* And now, try to send the message */
@@ -7059,6 +7187,676 @@ ERL_NIF_TERM nwritev(ErlNifEnv* env,
/* ----------------------------------------------------------------------
+ * nif_sendfile/1,4,5
+ *
+ * Description:
+ * Send a file on a socket
+ *
+ * Arguments:
+ * Socket (ref) - Points to the socket descriptor.
+ *
+ * SendRef - A unique id reference() for this (send) request.
+ *
+ * Offset - File offset to start from.
+ * Count - The number of bytes to send.
+ *
+ * InFileRef - A file NIF resource.
+ */
+
+static ERL_NIF_TERM
+nif_sendfile(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[])
+{
+#if defined(__WIN32__) || !defined(HAVE_SENDFILE)
+ return enif_raise_exception(env, MKA(env, "notsup"));
+#else
+ ESockDescriptor *descP;
+ ERL_NIF_TERM sockRef, res;
+
+ SGDBG( ("SOCKET", "nif_sendfile -> entry with argc: %d\r\n", argc) );
+
+ if (argc < 1) {
+ SGDBG( ("SOCKET", "nif_sendfile -> argc < 1\r\n") );
+ return enif_make_badarg(env);
+ }
+ sockRef = argv[0];
+ if (! ESOCK_GET_RESOURCE(env, sockRef, (void**) (&descP))) {
+ SSDBG( descP, ("SOCKET", "nif_sendfile -> get resource failed\r\n") );
+ return enif_make_badarg(env);
+ }
+
+ if (argc < 2) { // argc == 1
+
+ MLOCK(descP->writeMtx);
+
+ SSDBG( descP,
+ ("SOCKET", "nif_sendfile(%T), {%d,0x%X} ->"
+ "\r\n",
+ sockRef, descP->sock, descP->writeState) );
+
+ res = esock_sendfile_deferred_close(env, descP);
+
+ } else {
+ ERL_NIF_TERM sendRef;
+ ErlNifSInt64 offset64;
+ ErlNifUInt64 count64u;
+ off_t offset;
+ size_t count;
+ BOOLEAN_T a2ok;
+
+ ESOCK_ASSERT( argc >= 4 );
+
+ sendRef = argv[1];
+ if ((! enif_is_ref(env, sendRef))) {
+ SGDBG( ("SOCKET", "nif_sendfile -> argv[1] decode failed\r\n") );
+ return enif_make_badarg(env);
+ }
+
+ if ((! (a2ok = GET_INT64(env, argv[2], &offset64))) ||
+ (! GET_UINT64(env, argv[3], &count64u))) {
+ if ((! IS_INTEGER(env, argv[3])) ||
+ (! IS_INTEGER(env, argv[3])))
+ return enif_make_badarg(env);
+ if (! a2ok)
+ return esock_make_error_integer_range(env, argv[2]);
+ else
+ return esock_make_error_integer_range(env, argv[3]);
+ }
+ offset = (off_t) offset64;
+ if (offset64 != (ErlNifSInt64) offset)
+ return esock_make_error_integer_range(env, argv[2]);
+ count = (size_t) count64u;
+ if (count64u != (ErlNifUInt64) count)
+ return esock_make_error_integer_range(env, argv[3]);
+
+ if (argc == 4) {
+
+ MLOCK(descP->writeMtx);
+
+ SSDBG( descP,
+ ("SOCKET", "nif_sendfile(%T), {%d,0x%X} ->"
+ "\r\n sendRef: %T"
+ "\r\n offset: %ld"
+ "\r\n count: %ld"
+ "\r\n",
+ sockRef, descP->sock, descP->readState,
+ sendRef, (long) offset, (long) count) );
+
+ res =
+ esock_sendfile_cont(env, descP, sockRef, sendRef,
+ offset, count);
+ } else {
+ ERL_NIF_TERM fRef;
+
+ ESOCK_ASSERT( argc == 5 );
+
+ fRef = argv[4];
+ if ((! enif_is_ref(env, fRef)))
+ return enif_make_badarg(env);
+
+ MLOCK(descP->writeMtx);
+
+ SSDBG( descP,
+ ("SOCKET", "nif_sendfile(%T), {%d,0x%X} ->"
+ "\r\n sendRef: %T"
+ "\r\n offset: %ld"
+ "\r\n count: %ld"
+ "\r\n fRef: %T"
+ "\r\n",
+ sockRef, descP->sock, descP->readState,
+ sendRef, (long) offset, (long) count, fRef) );
+
+ res =
+ esock_sendfile_start(env, descP, sockRef, sendRef,
+ offset, count, fRef);
+ }
+ }
+
+ SSDBG( descP, ("SOCKET", "nif_sendfile(%T) -> done with"
+ "\r\n res: %T"
+ "\r\n", sockRef, res) );
+
+ MUNLOCK(descP->writeMtx);
+
+ return res;
+
+#endif // #if defined(__WIN32__) || !defined(HAVE_SENDFILE)
+}
+
+#ifndef __WIN32__
+#ifdef HAVE_SENDFILE
+
+/* Start a sendfile() operation
+ */
+static ERL_NIF_TERM
+esock_sendfile_start(ErlNifEnv *env,
+ ESockDescriptor *descP,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM sendRef,
+ off_t offset,
+ size_t count,
+ ERL_NIF_TERM fRef) {
+ ERL_NIF_TERM writerCheck;
+ ssize_t res;
+ int err;
+
+ SSDBG( descP, ("SOCKET",
+ "esock_sendfile_start(%T) {%d} -> sendRef: %T\r\n"
+ " fRef: %T\r\n"
+ " offset: %lu\r\n"
+ " count: %lu\r\n",
+ sockRef, descP->sock, sendRef,
+ fRef, (unsigned long) offset, (unsigned long) count) );
+
+ if (! IS_OPEN(descP->writeState)) {
+ return esock_make_error(env, atom_closed);
+ }
+
+ /* Connect and Write uses the same select flag
+ * so they can not be simultaneous
+ */
+ if (descP->connectorP != NULL) {
+ return esock_make_error_invalid(env, atom_state);
+ }
+
+ /* Ensure that we either have no current writer or we are it,
+ * or enqueue this process if there is a current writer
+ */
+ if (! send_check_writer(env, descP, sendRef, &writerCheck)) {
+ SSDBG( descP, ("SOCKET",
+ "esock_sendfile_start {%d} -> writer check failed: "
+ "\r\n %T\r\n", descP->sock, writerCheck) );
+
+ /* Returns 'select' if current process got enqueued,
+ * or exception invalid state if current process already
+ * was enqueued
+ */
+ return writerCheck;
+ }
+
+ if (descP->sendfileSock != INVALID_SOCKET)
+ return esock_make_error_invalid(env, atom_state);
+
+ /* Get a dup:ed file handle from prim_file_nif
+ * through a NIF dyncall
+ */
+ {
+ struct {
+ struct prim_file_nif_dyncall_dup dup;
+ struct prim_file_nif_dyncall end;
+ } dc;
+ ErlNifBinary dc_bin[1];
+
+ dc.dup.common.size =
+ CHARP(&dc.end) - CHARP(&dc.dup);
+ dc.dup.common.op = prim_file_nif_dyncall_dup;
+ dc.dup.common.completed = 0;
+ dc.end.size = 0;
+
+ /* Request the handle */
+ if ((enif_dynamic_resource_call(env,
+ atom_prim_file, atom_efile, fRef,
+ &dc)
+ != 0) ||
+ (! dc.dup.common.completed)) {
+ //
+ goto error_invalid_efile;
+ }
+ if (dc.dup.error != 0) {
+ return
+ esock_sendfile_errno(env, descP, sockRef,
+ dc.dup.error);
+ }
+ if (! enif_inspect_binary(env, dc.dup.handle, dc_bin))
+ goto error_invalid_efile;
+ if (dc_bin->size != sizeof(descP->sendfileSock)) {
+ enif_release_binary(dc_bin);
+ goto error_invalid_efile;
+ }
+ sys_memcpy(&descP->sendfileSock, dc_bin->data,
+ sizeof(descP->sendfileSock));
+ enif_release_binary(dc_bin);
+ goto sendfile;
+
+ error_invalid_efile:
+ return esock_sendfile_error(env, descP, sockRef,
+ MKT2(env, esock_atom_invalid,
+ atom_efile));
+ }
+
+ sendfile:
+ SSDBG( descP, ("SOCKET",
+ "esock_sendfile_start(%T) {%d} -> sendRef: %T\r\n"
+ " sendfileSock: %d\r\n",
+ sockRef, descP->sock, sendRef,
+ descP->sendfileSock) );
+
+ if (descP->sendfileCountersP == NULL) {
+ descP->sendfileCountersP = MALLOC(sizeof(ESockSendfileCounters));
+ *descP->sendfileCountersP = initESockSendfileCounters;
+ }
+
+ ESOCK_CNT_INC(env, descP, sockRef,
+ atom_sendfile_tries, &descP->sendfileCountersP->tries, 1);
+ descP->sendfileCountersP->maxCnt = 0;
+
+ res = esock_sendfile(env, descP, sockRef, offset, &count, &err);
+
+ if (res < 0) { // Terminal error
+
+ (void) close(descP->sendfileSock);
+ descP->sendfileSock = INVALID_SOCKET;
+
+ return esock_sendfile_errno(env, descP, sockRef, err);
+
+ } else if (res > 0) { // Retry by select
+
+ if (descP->currentWriterP == NULL) {
+ int mon_res;
+
+ /* Register writer as current */
+ ESOCK_ASSERT( enif_self(env, &descP->currentWriter.pid) != NULL );
+ mon_res =
+ MONP("sendfile -> current writer",
+ env, descP,
+ &descP->currentWriter.pid,
+ &descP->currentWriter.mon);
+ ESOCK_ASSERT( mon_res >= 0 );
+
+ if (mon_res > 0) {
+ /* Caller died already, can happen for dirty NIFs */
+
+ (void) close(descP->sendfileSock);
+ descP->sendfileSock = INVALID_SOCKET;
+
+ return
+ esock_sendfile_error(env, descP, sockRef,
+ MKT2(env,
+ esock_atom_invalid,
+ esock_atom_not_owner));
+ }
+ ESOCK_ASSERT( descP->currentWriter.env == NULL );
+ descP->currentWriter.env = esock_alloc_env("current-writer");
+ descP->currentWriter.ref =
+ CP_TERM(descP->currentWriter.env, sendRef);
+ descP->currentWriterP = &descP->currentWriter;
+ }
+ // else current writer is already registered by requestor_pop()
+
+ return esock_sendfile_select(env, descP, sockRef, sendRef, count);
+
+ } else { // res == 0: Done
+ return esock_sendfile_ok(env, descP, sockRef, count);
+ }
+}
+
+/* Continue an ongoing sendfile operation
+ */
+static ERL_NIF_TERM
+esock_sendfile_cont(ErlNifEnv *env,
+ ESockDescriptor *descP,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM sendRef,
+ off_t offset,
+ size_t count) {
+ ErlNifPid caller;
+ ssize_t res;
+ int err;
+
+ SSDBG( descP, ("SOCKET",
+ "esock_sendfile_cont(%T) {%d} -> sendRef: %T\r\n",
+ sockRef, descP->sock, sendRef) );
+
+ if (! IS_OPEN(descP->writeState))
+ return esock_make_error(env, atom_closed);
+
+ /* Connect and Write uses the same select flag
+ * so they can not be simultaneous
+ */
+ if (descP->connectorP != NULL)
+ return esock_make_error_invalid(env, atom_state);
+
+ /* Verify that this process has a sendfile operation in progress */
+ ESOCK_ASSERT( enif_self(env, &caller) != NULL );
+ if ((descP->currentWriterP == NULL) ||
+ (descP->sendfileSock == INVALID_SOCKET) ||
+ (COMPARE_PIDS(&descP->currentWriter.pid, &caller) != 0)) {
+ //
+ return esock_raise_invalid(env, atom_state);
+ }
+
+ res = esock_sendfile(env, descP, sockRef, offset, &count, &err);
+
+ if (res < 0) { // Terminal error
+
+ (void) close(descP->sendfileSock);
+ descP->sendfileSock = INVALID_SOCKET;
+
+ return esock_sendfile_errno(env, descP, sockRef, err);
+
+ } else if (res > 0) { // Retry by select
+
+ /* Overwrite current writer registration */
+ enif_clear_env(descP->currentWriter.env);
+ descP->currentWriter.ref =
+ CP_TERM(descP->currentWriter.env, sendRef);
+
+ return esock_sendfile_select(env, descP, sockRef, sendRef, count);
+
+ } else { // res == 0: Done
+ return esock_sendfile_ok(env, descP, sockRef, count);
+ }
+}
+
+/* Deferred close of the dup:ed file descriptor
+ */
+static ERL_NIF_TERM
+esock_sendfile_deferred_close(ErlNifEnv *env,
+ ESockDescriptor *descP) {
+ if (descP->sendfileSock == INVALID_SOCKET)
+ return esock_make_error_invalid(env, atom_state);
+
+ (void) close(descP->sendfileSock);
+ descP->sendfileSock = INVALID_SOCKET;
+
+ return esock_atom_ok;
+}
+
+/* Platform independent sendfile() function
+ *
+ * Return < 0 for terminal error
+ * 0 for done
+ * > 0 for retry with select
+ */
+static int
+esock_sendfile(ErlNifEnv *env,
+ ESockDescriptor *descP,
+ ERL_NIF_TERM sockRef,
+ off_t offset,
+ size_t *countP,
+ int *errP) {
+
+ size_t pkgSize = 0; // Total sent in this call
+
+ SSDBG( descP, ("SOCKET",
+ "esock_sendfile(%T) {%d}\r\n",
+ sockRef, descP->sock) );
+
+ for (;;) {
+ size_t chunk_size = (size_t) 0x20000000UL; // 0.5 GB
+ size_t bytes_sent;
+ ssize_t res;
+ int error;
+
+ /* *countP == 0 means send the whole file - use chunk size */
+ if ((*countP > 0) && (*countP < chunk_size))
+ chunk_size = *countP;
+
+ {
+ /* Platform dependent code:
+ * set and check bytes_sent, or
+ * set res to >= 0, or
+ * set res to < 0 and error to sock_errno()
+ */
+#if defined (__linux__)
+
+ off_t prev_offset;
+
+ prev_offset = offset;
+ res =
+ sendfile(descP->sock, descP->sendfileSock,
+ &offset, chunk_size);
+ if (res < 0)
+ error = sock_errno();
+ ESOCK_ASSERT( offset >= prev_offset );
+ ESOCK_ASSERT( (off_t) chunk_size >= (offset - prev_offset) );
+ bytes_sent = (size_t) (offset - prev_offset);
+
+#elif defined(__FreeBSD__) || defined(__DragonFly__) || defined(__DARWIN__)
+
+ off_t sbytes;
+
+#if defined(__DARWIN__)
+ sbytes = (off_t) chunk_size;
+ res = (ssize_t)
+ sendfile(descP->sendfileSock, descP->sock, offset,
+ &sbytes, NULL, 0);
+#else
+ res = (ssize_t)
+ sendfile(descP->sendfileSock, descP->sock, offset,
+ chunk_size, NULL, &sbytes, 0);
+#endif
+ if (res < 0)
+ error = sock_errno();
+ ESOCK_ASSERT( sbytes >= 0 );
+ ESOCK_ASSERT( (off_t) chunk_size >= sbytes );
+ bytes_sent = (size_t) sbytes;
+
+#elif defined(__sun) && defined(__SVR4) && defined(HAVE_SENDFILEV)
+
+ sendfilevec_t sfvec[1];
+
+ sfvec[0].sfv_fd = descP->sendfileSock;
+ sfvec[0].sfv_flag = 0;
+ sfvec[0].sfv_off = offset;
+ sfvec[0].sfv_len = chunk_size;
+
+ res = sendfilev(descP->sock, sfvec, NUM(sfvec), &bytes_sent);
+ if (res < 0) {
+ if ((error = sock_errno()) == EINVAL) {
+ /* On e.b SunOS 5.10 using sfv_len > file size
+ * lands here - we regard this as a succesful send
+ * by pretending it is an EINTR causing the loop
+ * to continue with more data up to end of file
+ */
+ error = EINTR;
+ }
+ }
+ ESOCK_ASSERT( chunk_size >= bytes_sent );
+
+#else
+#error "Unsupported sendfile syscall; update configure test."
+#endif
+
+ ESOCK_CNT_INC(env, descP, sockRef,
+ atom_sendfile, &descP->sendfileCountersP->cnt, 1);
+
+ if (bytes_sent != 0) {
+
+ pkgSize += bytes_sent;
+
+ ESOCK_CNT_INC(env, descP, sockRef,
+ atom_sendfile_pkg,
+ &descP->sendfileCountersP->pkg,
+ 1);
+ ESOCK_CNT_INC(env, descP, sockRef,
+ atom_sendfile_byte,
+ &descP->sendfileCountersP->byteCnt,
+ bytes_sent);
+
+ if (pkgSize > descP->sendfileCountersP->pkgMax)
+ descP->sendfileCountersP->pkgMax = pkgSize;
+ if ((descP->sendfileCountersP->maxCnt += bytes_sent)
+ > descP->sendfileCountersP->max)
+ descP->sendfileCountersP->max =
+ descP->sendfileCountersP->maxCnt;
+ }
+
+ /* *countP == 0 means send whole file */
+ if ((*countP > 0) &&
+ ((*countP -= bytes_sent) == 0)) { // All sent
+ *countP = pkgSize;
+ return 0;
+ }
+
+ if (res < 0) {
+ if (error == ERRNO_BLOCK) {
+ *countP = pkgSize;
+ return 1;
+ }
+ if (error == EINTR)
+ continue;
+ *errP = error;
+ return INVALID_SOCKET;
+ }
+
+ if (bytes_sent == 0) { // End of input file
+ *countP = pkgSize;
+ return 0;
+ }
+ }
+ }
+ return 0;
+}
+
+static ERL_NIF_TERM
+esock_sendfile_errno(ErlNifEnv *env,
+ ESockDescriptor *descP,
+ ERL_NIF_TERM sockRef,
+ int err) {
+ ERL_NIF_TERM reason;
+
+ reason = MKA(env, erl_errno_id(err));
+ return esock_sendfile_error(env, descP, sockRef, reason);
+}
+
+static ERL_NIF_TERM
+esock_sendfile_error(ErlNifEnv *env,
+ ESockDescriptor *descP,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM reason) {
+
+ if (descP->sendfileCountersP == NULL) {
+ descP->sendfileCountersP = MALLOC(sizeof(ESockSendfileCounters));
+ *descP->sendfileCountersP = initESockSendfileCounters;
+ }
+
+ ESOCK_CNT_INC(env, descP, sockRef,
+ atom_sendfile_fails,
+ &descP->sendfileCountersP->fails, 1);
+
+ SSDBG( descP, ("SOCKET",
+ "esock_sendfile_error(%T) {%d} -> error: %T\r\n",
+ sockRef, descP->sock, reason) );
+
+ /* XXX Should we have special treatment for EINVAL,
+ * such as to only fail current operation and activate
+ * the next from the queue?
+ */
+
+ if (descP->currentWriterP != NULL) {
+
+ DEMONP("esock_sendfile_error",
+ env, descP, &descP->currentWriter.mon);
+
+ /* Fail all queued writers */
+ requestor_release("esock_sendfile_error",
+ env, descP, &descP->currentWriter);
+ send_error_waiting_writers(env, descP, sockRef, reason);
+ descP->currentWriterP = NULL;
+
+ }
+
+ return esock_make_error(env, reason);
+}
+
+static ERL_NIF_TERM
+esock_sendfile_select(ErlNifEnv *env,
+ ESockDescriptor *descP,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM sendRef,
+ size_t count) {
+ int sres;
+
+ /* Select write for this process */
+ sres =
+ esock_select_write(env, descP->sock, descP, NULL, sockRef, sendRef);
+ if (sres < 0) {
+ ERL_NIF_TERM reason;
+
+ /* Internal select error */
+ DEMONP("esock_sendfile_select - failed",
+ env, descP, &descP->currentWriter.mon);
+
+ /* Fail all queued writers */
+ reason = MKT2(env, atom_select_write, MKI(env, sres));
+ requestor_release("esock_sendfile_select_fail",
+ env, descP, &descP->currentWriter);
+ send_error_waiting_writers(env, descP, sockRef, reason);
+ descP->currentWriterP = NULL;
+
+ (void) close(descP->sendfileSock);
+ descP->sendfileSock = INVALID_SOCKET;
+
+ return enif_raise_exception(env, reason);
+
+ } else {
+ ErlNifUInt64 bytes_sent;
+
+ SSDBG( descP,
+ ("SOCKET", "esock_sendfile_select(%T) {%d} -> "
+ "sendRef (%T)\r\n"
+ "count: %lu\r\n",
+ sockRef, descP->sock, sendRef, (unsigned long) count) );
+
+ ESOCK_CNT_INC(env, descP, sockRef,
+ atom_sendfile_waits,
+ &descP->sendfileCountersP->waits,
+ 1);
+ descP->writeState |= ESOCK_STATE_SELECTED;
+ bytes_sent = (ErlNifUInt64) count;
+
+ return MKT2(env, atom_select, MKUI64(env, bytes_sent));
+ }
+}
+
+static ERL_NIF_TERM
+esock_sendfile_ok(ErlNifEnv *env,
+ ESockDescriptor *descP,
+ ERL_NIF_TERM sockRef,
+ size_t count) {
+ ErlNifUInt64 bytes_sent64u;
+
+ SSDBG( descP,
+ ("SOCKET", "esock_sendfile_ok(%T) {%d} -> "
+ "everything written (%lu) - done\r\n",
+ sockRef, descP->sock, (unsigned long) count) );
+
+ if (descP->currentWriterP != NULL) {
+
+ DEMONP("esock_sendfile_ok -> current writer",
+ env, descP, &descP->currentWriter.mon);
+
+ /*
+ * Ok, this write is done maybe activate the next (if any)
+ */
+ if (! activate_next_writer(env, descP, sockRef)) {
+
+ SSDBG( descP,
+ ("SOCKET",
+ "esock_sendfile_ok(%T) {%d} -> no more writers\r\n",
+ sockRef, descP->sock) );
+
+ descP->currentWriterP = NULL;
+ }
+ }
+
+ descP->writePkgMaxCnt = 0;
+ bytes_sent64u = (ErlNifUInt64) count;
+
+ (void) close(descP->sendfileSock);
+ descP->sendfileSock = INVALID_SOCKET;
+
+ return esock_make_ok2(env, MKUI64(env, bytes_sent64u));
+}
+
+#endif // #ifdef HAVE_SENDFILE
+#endif // #ifndef __WIN32__
+
+
+
+/* ----------------------------------------------------------------------
* nif_recv
*
* Description:
@@ -7089,6 +7887,7 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env,
ssize_t len; /* ssize_t due to the return type of recv() */
int flags;
ERL_NIF_TERM res;
+ BOOLEAN_T a1ok;
ESOCK_ASSERT( argc == 4 );
@@ -7103,15 +7902,13 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env,
(COMPARE(recvRef, atom_zero) != 0)) {
return enif_make_badarg(env);
}
- if ((! GET_UINT64(env, argv[1], &elen)) ||
+ if ((! (a1ok = GET_UINT64(env, argv[1], &elen))) ||
(! GET_INT(env, argv[2], &flags))) {
- BOOLEAN_T argv1_is_integer = IS_INTEGER(env, argv[1]);
-
- if ((! argv1_is_integer) ||
+ if ((! IS_INTEGER(env, argv[1])) ||
(! IS_INTEGER(env, argv[2])))
return enif_make_badarg(env);
- if (argv1_is_integer)
+ if (! a1ok)
return esock_make_error_integer_range(env, argv[1]);
return
esock_make_error_integer_range(env, argv[2]);
@@ -7264,6 +8061,7 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env,
ssize_t len; /* ssize_t due to the return type of recvfrom() */
int flags;
ERL_NIF_TERM res;
+ BOOLEAN_T a1ok;
ESOCK_ASSERT( argc == 4 );
@@ -7283,15 +8081,13 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env,
return enif_make_badarg(env);
}
- if ((! GET_UINT64(env, argv[1], &elen)) ||
+ if ((! (a1ok = GET_UINT64(env, argv[1], &elen))) ||
(! GET_INT(env, argv[2], &flags))) {
- BOOLEAN_T argv1_is_integer = IS_INTEGER(env, argv[1]);
-
- if ((! argv1_is_integer) ||
+ if ((! IS_INTEGER(env, argv[1])) ||
(! IS_INTEGER(env, argv[2])))
return enif_make_badarg(env);
- if (argv1_is_integer)
+ if (! a1ok)
return esock_make_error_integer_range(env, argv[1]);
return
esock_make_error_integer_range(env, argv[2]);
@@ -7451,6 +8247,7 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env,
ssize_t bufSz, ctrlSz;
int flags;
ERL_NIF_TERM res;
+ BOOLEAN_T a1ok, a2ok;
ESOCK_ASSERT( argc == 5 );
@@ -7470,21 +8267,17 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env,
return enif_make_badarg(env);
}
- if ((! GET_UINT64(env, argv[1], &eBufSz)) ||
- (! GET_UINT64(env, argv[2], &eCtrlSz)) ||
+ if ((! (a1ok = GET_UINT64(env, argv[1], &eBufSz))) ||
+ (! (a2ok = GET_UINT64(env, argv[2], &eCtrlSz))) ||
(! GET_INT(env, argv[3], &flags))) {
- BOOLEAN_T
- argv1_is_integer = IS_INTEGER(env, argv[1]),
- argv2_is_integer;
-
- if ((! argv1_is_integer) ||
- (! (argv2_is_integer = IS_INTEGER(env, argv[2]))) ||
+ if ((! IS_INTEGER(env, argv[1])) ||
+ (! IS_INTEGER(env, argv[2])) ||
(! IS_INTEGER(env, argv[3])))
return enif_make_badarg(env);
- if (argv1_is_integer)
+ if (! a1ok)
return esock_make_error_integer_range(env, argv[1]);
- if (argv2_is_integer)
+ if (! a2ok)
return esock_make_error_integer_range(env, argv[2]);
return
esock_make_error_integer_range(env, argv[3]);
@@ -7795,7 +8588,7 @@ BOOLEAN_T esock_do_stop(ErlNifEnv* env,
esock_stop_handle_current(env,
"writer",
- descP, sockRef, descP->currentWriterP);
+ descP, sockRef, &descP->currentWriter);
}
/* Inform the waiting Writers (in the same way) */
@@ -7831,7 +8624,7 @@ BOOLEAN_T esock_do_stop(ErlNifEnv* env,
esock_stop_handle_current(env,
"connector",
- descP, sockRef, descP->connectorP);
+ descP, sockRef, &descP->connector);
}
descP->connectorP = NULL;
@@ -7852,7 +8645,7 @@ BOOLEAN_T esock_do_stop(ErlNifEnv* env,
esock_stop_handle_current(env,
"reader",
- descP, sockRef, descP->currentReaderP);
+ descP, sockRef, &descP->currentReader);
}
/* Inform the Readers (in the same way) */
@@ -7889,7 +8682,7 @@ BOOLEAN_T esock_do_stop(ErlNifEnv* env,
esock_stop_handle_current(env,
"acceptor",
- descP, sockRef, descP->currentAcceptorP);
+ descP, sockRef, &descP->currentAcceptor);
}
/* Inform the waiting Acceptor (in the same way) */
@@ -7995,17 +8788,19 @@ ERL_NIF_TERM esock_finalize_close(ErlNifEnv* env,
// Close the socket
/* Stop monitoring the closer.
- * since it is the caller, demonitoring must succeed
+ * Demonitoring may fail since this is a dirty NIF
+ * - the caller may have died already.
*/
enif_set_pid_undefined(&descP->closerPid);
if (descP->closerMon.isActive) {
- ESOCK_ASSERT( DEMONP("esock_finalize_close -> closer",
- env, descP, &descP->closerMon) == 0 );
+ (void) DEMONP("esock_finalize_close -> closer",
+ env, descP, &descP->closerMon);
}
/* Stop monitoring the owner */
enif_set_pid_undefined(&descP->ctrlPid);
- DEMONP("esock_finalize_close -> ctrl", env, descP, &descP->ctrlMon);
+ (void) DEMONP("esock_finalize_close -> ctrl",
+ env, descP, &descP->ctrlMon);
/* Not impossible to still get a esock_down() call from a
* just triggered owner monitor down
*/
@@ -8020,6 +8815,13 @@ ERL_NIF_TERM esock_finalize_close(ErlNifEnv* env,
err = esock_close_socket(env, descP, TRUE);
+#ifdef HAVE_SENDFILE
+ if (descP->sendfileSock != INVALID_SOCKET) {
+ (void) close(descP->sendfileSock);
+ descP->sendfileSock = INVALID_SOCKET;
+ }
+#endif
+
if (err != 0) {
if (err == ERRNO_BLOCK) {
/* Not all data in the buffers where sent,
@@ -11599,6 +12401,8 @@ ERL_NIF_TERM esock_cancel(ErlNifEnv* env,
ERL_NIF_TERM sockRef,
ERL_NIF_TERM opRef)
{
+ int cmp;
+
/* <KOLLA>
*
* Do we really need all these variants? Should it not be enough with:
@@ -11607,23 +12411,37 @@ ERL_NIF_TERM esock_cancel(ErlNifEnv* env,
*
* </KOLLA>
*/
- if (COMPARE(op, esock_atom_connect) == 0) {
- return esock_cancel_connect(env, descP, opRef);
- } else if (COMPARE(op, esock_atom_accept) == 0) {
- return esock_cancel_accept(env, descP, sockRef, opRef);
- } else if (COMPARE(op, esock_atom_send) == 0) {
- return esock_cancel_send(env, descP, sockRef, opRef);
- } else if (COMPARE(op, esock_atom_sendto) == 0) {
- return esock_cancel_send(env, descP, sockRef, opRef);
- } else if (COMPARE(op, esock_atom_sendmsg) == 0) {
- return esock_cancel_send(env, descP, sockRef, opRef);
- } else if (COMPARE(op, esock_atom_recv) == 0) {
- return esock_cancel_recv(env, descP, sockRef, opRef);
- } else if (COMPARE(op, esock_atom_recvfrom) == 0) {
- return esock_cancel_recv(env, descP, sockRef, opRef);
- } else if (COMPARE(op, esock_atom_recvmsg) == 0) {
+
+ /* Hand crafted binary search */
+ if ((cmp = COMPARE(op, esock_atom_recvmsg)) == 0)
return esock_cancel_recv(env, descP, sockRef, opRef);
+ if (cmp < 0) {
+ if ((cmp = COMPARE(op, esock_atom_recv)) == 0)
+ return esock_cancel_recv(env, descP, sockRef, opRef);
+ if (cmp < 0) {
+ if (COMPARE(op, esock_atom_connect) == 0)
+ return esock_cancel_connect(env, descP, opRef);
+ if (COMPARE(op, esock_atom_accept) == 0)
+ return esock_cancel_accept(env, descP, sockRef, opRef);
+ } else {
+ if (COMPARE(op, esock_atom_recvfrom) == 0)
+ return esock_cancel_recv(env, descP, sockRef, opRef);
+ }
} else {
+ if ((cmp = COMPARE(op, esock_atom_sendmsg)) == 0)
+ return esock_cancel_send(env, descP, sockRef, opRef);
+ if (cmp < 0) {
+ if (COMPARE(op, esock_atom_send) == 0)
+ return esock_cancel_send(env, descP, sockRef, opRef);
+ if (COMPARE(op, atom_sendfile) == 0)
+ return esock_cancel_send(env, descP, sockRef, opRef);
+ } else {
+ if (COMPARE(op, esock_atom_sendto) == 0)
+ return esock_cancel_send(env, descP, sockRef, opRef);
+ }
+ }
+
+ {
ERL_NIF_TERM result;
const char *reason;
@@ -11718,8 +12536,8 @@ ERL_NIF_TERM esock_cancel_accept(ErlNifEnv* env,
"\r\n",
sockRef, descP->sock, descP->readState,
opRef,
- ((descP->currentAcceptorP == NULL) ?
- "without acceptor" : "with acceptor")) );
+ ((descP->currentAcceptorP == NULL)
+ ? "without acceptor" : "with acceptor")) );
if (! IS_OPEN(descP->readState)) {
@@ -11780,7 +12598,7 @@ ERL_NIF_TERM esock_cancel_accept_current(ErlNifEnv* env,
descP->readState &= ~ESOCK_STATE_ACCEPTING;
- descP->currentAcceptorP = NULL;
+ descP->currentAcceptorP = NULL;
}
return res;
@@ -11837,7 +12655,8 @@ ERL_NIF_TERM esock_cancel_send(ErlNifEnv* env,
"\r\n",
sockRef, descP->sock, descP->writeState,
opRef,
- ((descP->currentWriterP == NULL) ? "without writer" : "with writer")) );
+ ((descP->currentWriterP == NULL)
+ ? "without writer" : "with writer")) );
if (! IS_OPEN(descP->writeState)) {
@@ -11952,7 +12771,8 @@ ERL_NIF_TERM esock_cancel_recv(ErlNifEnv* env,
"\r\n",
sockRef, descP->sock, descP->readState,
opRef,
- ((descP->currentReaderP == NULL) ? "without reader" : "with reader")) );
+ ((descP->currentReaderP == NULL)
+ ? "without reader" : "with reader")) );
if (! IS_OPEN(descP->readState)) {
@@ -12335,7 +13155,7 @@ ERL_NIF_TERM send_check_fail(ErlNifEnv* env,
if (descP->currentWriterP != NULL) {
requestor_release("send_check_fail",
- env, descP, descP->currentWriterP);
+ env, descP, &descP->currentWriter);
send_error_waiting_writers(env, descP, sockRef, reason);
@@ -12457,7 +13277,7 @@ ERL_NIF_TERM send_check_retry(ErlNifEnv* env,
descP->currentWriter.env = esock_alloc_env("current-writer");
descP->currentWriter.ref =
CP_TERM(descP->currentWriter.env, sendRef);
- descP->currentWriterP = &descP->currentWriter;
+ descP->currentWriterP = &descP->currentWriter;
} else {
/* Overwrite current writer registration */
enif_clear_env(descP->currentWriter.env);
@@ -12475,7 +13295,19 @@ ERL_NIF_TERM send_check_retry(ErlNifEnv* env,
sres = esock_select_write(env, descP->sock, descP, NULL, sockRef, sendRef);
if (sres < 0) {
+ ERL_NIF_TERM reason;
+
/* Internal select error */
+ DEMONP("send_check_retry - select error",
+ env, descP, &descP->currentWriter.mon);
+
+ /* Fail all queued writers */
+ reason = MKT2(env, atom_select_write, MKI(env, sres));
+ requestor_release("send_check_retry - select error",
+ env, descP, &descP->currentWriter);
+ send_error_waiting_writers(env, descP, sockRef, reason);
+ descP->currentWriterP = NULL;
+
res =
enif_raise_exception(env,
MKT2(env, atom_select_write,
@@ -12607,13 +13439,11 @@ void recv_init_current_reader(ErlNifEnv* env,
* as the new current reader and a new (read) select will be done.
*/
#ifndef __WIN32__
-static
-ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
- ESockDescriptor* descP,
- ERL_NIF_TERM sockRef)
+static void
+recv_update_current_reader(ErlNifEnv* env,
+ ESockDescriptor* descP,
+ ERL_NIF_TERM sockRef)
{
- ERL_NIF_TERM res = esock_atom_ok;
-
if (descP->currentReaderP != NULL) {
DEMONP("recv_update_current_reader",
@@ -12628,10 +13458,7 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
descP->currentReaderP = NULL;
}
-
}
-
- return res;
}
#endif // #ifndef __WIN32__
@@ -12654,7 +13481,7 @@ void recv_error_current_reader(ErlNifEnv* env,
ESockRequestor req;
requestor_release("recv_error_current_reader",
- env, descP, descP->currentReaderP);
+ env, descP, &descP->currentReader);
req.env = NULL; /* read by reader_pop before free */
while (reader_pop(env, descP, &req)) {
@@ -13077,9 +13904,11 @@ ERL_NIF_TERM recv_check_retry(ErlNifEnv* env,
if ((sres = esock_select_read(env, descP->sock, descP, NULL,
sockRef, recvRef)) < 0) {
- /* Ouch
- * Now what? We have copied ref into *its own* environment!
+ /* Unlikely that any next reader will have better luck,
+ * but why not give them a shot - the queue will be cleared
*/
+ recv_update_current_reader(env, descP, sockRef);
+
res =
enif_raise_exception(env,
MKT2(env, atom_select_read,
@@ -13240,6 +14069,10 @@ ERL_NIF_TERM recv_check_partial_part(ErlNifEnv* env,
sres = esock_select_read(env, descP->sock, descP, NULL,
sockRef, recvRef);
if (sres < 0) {
+ /* Unlikely that any next reader will have better luck,
+ * but why not give them a shot - the queue will be cleared
+ */
+ recv_update_current_reader(env, descP, sockRef);
res =
enif_raise_exception(env,
@@ -15242,7 +16075,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
enif_alloc_resource(esocks, sizeof(ESockDescriptor)))
!= NULL );
- descP->pattern = ESOCK_DESC_PATTERN_CREATED;
+ descP->pattern = ESOCK_DESC_PATTERN_CREATED;
requestor_init(&descP->connector);
descP->connectorP = NULL;
@@ -15254,13 +16087,19 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->currentWriterP = NULL; // currentWriter not used
descP->writersQ.first = NULL;
descP->writersQ.last = NULL;
- descP->writePkgCnt = 0;
- descP->writePkgMax = 0;
- descP->writePkgMaxCnt = 0;
- descP->writeByteCnt = 0;
- descP->writeTries = 0;
- descP->writeWaits = 0;
- descP->writeFails = 0;
+
+ descP->writePkgCnt = 0;
+ descP->writePkgMax = 0;
+ descP->writePkgMaxCnt = 0;
+ descP->writeByteCnt = 0;
+ descP->writeTries = 0;
+ descP->writeWaits = 0;
+ descP->writeFails = 0;
+
+#ifdef HAVE_SENDFILE
+ descP->sendfileSock = INVALID_SOCKET;
+ descP->sendfileCountersP = NULL;
+#endif
sprintf(buf, "esock.r[%d]", sock);
descP->readMtx = MCREATE(buf);
@@ -15269,6 +16108,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->currentReaderP = NULL; // currentReader not used
descP->readersQ.first = NULL;
descP->readersQ.last = NULL;
+
descP->readPkgCnt = 0;
descP->readPkgMax = 0;
descP->readPkgMaxCnt = 0;
@@ -15276,6 +16116,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->readTries = 0;
descP->readWaits = 0;
descP->readFails = 0;
+
sprintf(buf, "esock.acc[%d]", sock);
requestor_init(&descP->currentAcceptor);
descP->currentAcceptorP = NULL; // currentAcceptor not used
@@ -15679,6 +16520,27 @@ void esock_send_close_msg(ErlNifEnv* env,
sockRef, descP->sock, MKPID(env, pid), msg) );
}
}
+#ifdef HAVE_SENDFILE
+static void
+esock_send_sendfile_deferred_close_msg(ErlNifEnv* env,
+ ESockDescriptor* descP)
+{
+ ERL_NIF_TERM sockRef, msg;
+ ErlNifPid *pid;
+
+ pid = &data.regPid;
+ sockRef = enif_make_resource(env, descP);
+ msg = mk_reg_msg(env, atom_sendfile_deferred_close, sockRef);
+
+ /* If this send should fail we have leaked a file descriptor
+ * (intolerable), and if we try to close it here, on a regular
+ * scheduler, it might hang "forever" due to e.g NFS
+ * (out of the question), so terminating the VM
+ * is the only viable option
+ */
+ ESOCK_ASSERT( esock_send_msg(env, pid, msg, NULL) );
+}
+#endif // #ifdef HAVE_SENDFILE
#endif // #ifndef __WIN32__
@@ -16293,7 +17155,7 @@ static void requestor_init(ESockRequestor* reqP) {
static int requestor_release(const char* slogan,
ErlNifEnv* env,
ESockDescriptor* descP,
- ESockRequestor* reqP) {
+ ESockRequestor* reqP) {
int res;
enif_set_pid_undefined(&reqP->pid);
@@ -16626,6 +17488,13 @@ void esock_dtor(ErlNifEnv* env, void* obj)
SGDBG( ("SOCKET", "dtor -> try free acceptors request queue\r\n") );
free_request_queue(&descP->acceptorsQ);
+#ifdef HAVE_SENDFILE
+ ESOCK_ASSERT( descP->sendfileSock == INVALID_SOCKET );
+ if (descP->sendfileCountersP != NULL)
+ FREE(descP->sendfileCountersP);
+ descP->sendfileCountersP = NULL;
+#endif
+
esock_free_env("dtor close env", descP->closeEnv);
descP->closeEnv = NULL;
@@ -16676,44 +17545,70 @@ void esock_stop(ErlNifEnv* env, void* obj, ErlNifEvent fd, int is_direct_call)
"\r\n ctrlPid: %T"
"\r\n closerPid: %T"
"\r\ncounters:"
- "\r\n writePkgCnt: %u"
- "\r\n writePkgMax: %u"
- "\r\n writeByteCnt: %u"
- "\r\n writeTries: %u"
- "\r\n writeWaits: %u"
- "\r\n writeFails: %u"
- "\r\n readPkgCnt: %u"
- "\r\n readPkgMax: %u"
- "\r\n readByteCnt: %u"
- "\r\n readTries: %u"
- "\r\n readWaits: %u"
- "\r\n accSuccess: %u"
- "\r\n accTries: %u"
- "\r\n accWaits: %u"
- "\r\n accFails: %u"
+ "\r\n writePkgCnt: %lu"
+ "\r\n writePkgMax: %lu"
+ "\r\n writeByteCnt: %lu"
+ "\r\n writeTries: %lu"
+ "\r\n writeWaits: %lu"
+ "\r\n writeFails: %lu"
+ "\r\n readPkgCnt: %lu"
+ "\r\n readPkgMax: %lu"
+ "\r\n readByteCnt: %lu"
+ "\r\n readTries: %lu"
+ "\r\n readWaits: %lu"
+ "\r\n accSuccess: %lu"
+ "\r\n accTries: %lu"
+ "\r\n accWaits: %lu"
+ "\r\n accFails: %lu"
"\r\n",
descP->sock, fd,
(is_direct_call) ? "called" : "scheduled",
descP->ctrlPid,
descP->closerPid,
//
- descP->writePkgCnt,
- descP->writePkgMax,
- descP->writeByteCnt,
- descP->writeTries,
- descP->writeWaits,
- descP->writeFails,
+ (unsigned long) descP->writePkgCnt,
+ (unsigned long) descP->writePkgMax,
+ (unsigned long) descP->writeByteCnt,
+ (unsigned long) descP->writeTries,
+ (unsigned long) descP->writeWaits,
+ (unsigned long) descP->writeFails,
+ (unsigned long) descP->readPkgCnt,
+ (unsigned long) descP->readPkgMax,
+ (unsigned long) descP->readByteCnt,
+ (unsigned long) descP->readTries,
+ (unsigned long) descP->readWaits,
//
- descP->readPkgCnt,
- descP->readPkgMax,
- descP->readByteCnt,
- descP->readTries,
- descP->readWaits,
- //
- descP->accSuccess,
- descP->accTries,
- descP->accWaits,
- descP->accFails) );
+ (unsigned long) descP->accSuccess,
+ (unsigned long) descP->accTries,
+ (unsigned long) descP->accWaits,
+ (unsigned long) descP->accFails) );
+
+#ifdef HAVE_SENDFILE
+ if (descP->sendfileCountersP != NULL) {
+ ESockSendfileCounters *cP = descP->sendfileCountersP;
+
+ SSDBG( descP, ("SOCKET", "esock_stop {%d/%d} ->"
+ "\r\nsendfileCounters:"
+ "\r\n cnt: %lu"
+ "\r\n byteCnt: %lu"
+ "\r\n fails: %lu"
+ "\r\n max: %lu"
+ "\r\n pkg: %lu"
+ "\r\n pkgMax %lu"
+ "\r\n tries: %lu"
+ "\r\n waits: %lu"
+ "\r\n",
+ descP->sock, fd,
+ (unsigned long) cP->cnt,
+ (unsigned long) cP->byteCnt,
+ (unsigned long) cP->fails,
+ (unsigned long) cP->max,
+ (unsigned long) cP->pkg,
+ (unsigned long) cP->pkgMax,
+ (unsigned long) cP->tries,
+ (unsigned long) cP->waits) );
+ }
+#endif
if (is_direct_call)
return; // Nothing to do, caller gets ERL_NIF_SELECT_STOP_SCHEDULED
@@ -16747,6 +17642,11 @@ void esock_stop(ErlNifEnv* env, void* obj, ErlNifEvent fd, int is_direct_call)
/* We do not have a closer process
* - have to do an unclean (non blocking) close */
+#ifdef HAVE_SENDFILE
+ if (descP->sendfileSock != INVALID_SOCKET)
+ esock_send_sendfile_deferred_close_msg(env, descP);
+#endif
+
err = esock_close_socket(env, descP, FALSE);
if (err != 0)
@@ -16924,6 +17824,11 @@ void esock_down(ErlNifEnv* env,
* - we have to do an unclean (non blocking) socket close here
*/
+#ifdef HAVE_SENDFILE
+ if (descP->sendfileSock != INVALID_SOCKET)
+ esock_send_sendfile_deferred_close_msg(env, descP);
+#endif
+
err = esock_close_socket(env, descP, FALSE);
if (err != 0)
esock_warning_msg("Failed closing socket for terminating "
@@ -16985,11 +17890,9 @@ void esock_down(ErlNifEnv* env,
*/
requestor_release("esock_down->connector",
- env, descP, descP->connectorP);
-
- descP->writeState &= ~ESOCK_STATE_CONNECTING;
-
+ env, descP, &descP->connector);
descP->connectorP = NULL;
+ descP->writeState &= ~ESOCK_STATE_CONNECTING;
} else {
ERL_NIF_TERM sockRef;
@@ -17064,6 +17967,11 @@ void esock_down_ctrl(ErlNifEnv* env,
* - we have to do an unclean (non blocking) socket close here
*/
+#ifdef HAVE_SENDFILE
+ if (descP->sendfileSock != INVALID_SOCKET)
+ esock_send_sendfile_deferred_close_msg(env, descP);
+#endif
+
err = esock_close_socket(env, descP, FALSE);
if (err != 0)
esock_warning_msg("Failed closing socket for terminating "
@@ -17159,7 +18067,7 @@ void esock_down_writer(ErlNifEnv* env,
"esock_down_writer(%T) {%d} -> no active writer\r\n",
sockRef, descP->sock) );
- descP->currentWriterP = NULL;
+ descP->currentWriterP = NULL;
}
} else {
@@ -17209,7 +18117,7 @@ void esock_down_reader(ErlNifEnv* env,
"esock_down_reader(%T) {%d} -> no more readers\r\n",
sockRef, descP->sock) );
- descP->currentReaderP = NULL;
+ descP->currentReaderP = NULL;
}
} else {
@@ -17255,6 +18163,9 @@ ErlNifFunc esock_funcs[] =
{"nif_send", 4, nif_send, 0},
{"nif_sendto", 5, nif_sendto, 0},
{"nif_sendmsg", 5, nif_sendmsg, 0},
+ {"nif_sendfile", 5, nif_sendfile, ERL_NIF_DIRTY_JOB_IO_BOUND},
+ {"nif_sendfile", 4, nif_sendfile, ERL_NIF_DIRTY_JOB_IO_BOUND},
+ {"nif_sendfile", 1, nif_sendfile, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"nif_recv", 4, nif_recv, 0},
{"nif_recvfrom", 4, nif_recvfrom, 0},
{"nif_recvmsg", 5, nif_recvmsg, 0},
diff --git a/erts/emulator/nifs/common/socket_util.c b/erts/emulator/nifs/common/socket_util.c
index 61ea615de8..d03a593238 100644
--- a/erts/emulator/nifs/common/socket_util.c
+++ b/erts/emulator/nifs/common/socket_util.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 2018-2020. All Rights Reserved.
+ * Copyright Ericsson AB 2018-2021. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -1754,7 +1754,7 @@ ERL_NIF_TERM esock_make_error_errno(ErlNifEnv* env, int err)
/* Create an error two (2) tuple in the form:
*
- * {error, {invalid, {What, Info}}}
+ * {error, {invalid, What}}}
*/
extern
ERL_NIF_TERM esock_make_error_invalid(ErlNifEnv* env, ERL_NIF_TERM what)
diff --git a/erts/emulator/nifs/unix/unix_prim_file.c b/erts/emulator/nifs/unix/unix_prim_file.c
index 1efde4c0cf..770dcf91ab 100644
--- a/erts/emulator/nifs/unix/unix_prim_file.c
+++ b/erts/emulator/nifs/unix/unix_prim_file.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson 2017-2020. All Rights Reserved.
+ * Copyright Ericsson 2017-2021. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -95,16 +95,22 @@ posix_errno_t efile_marshal_path(ErlNifEnv *env, ERL_NIF_TERM path, efile_path_t
return 0;
}
-ERL_NIF_TERM efile_get_handle(ErlNifEnv *env, efile_data_t *d) {
+posix_errno_t efile_get_handle(ErlNifEnv *env, efile_data_t *d, int do_dup, ERL_NIF_TERM *handle) {
efile_unix_t *u = (efile_unix_t*)d;
- ERL_NIF_TERM result;
unsigned char *bits;
+ int fd;
- bits = enif_make_new_binary(env, sizeof(u->fd), &result);
- memcpy(bits, &u->fd, sizeof(u->fd));
+ if (do_dup) {
+ if ((fd = dup(u->fd)) < 0)
+ return errno;
+ } else {
+ fd = u->fd;
+ }
+ bits = enif_make_new_binary(env, sizeof(fd), handle);
+ memcpy(bits, &fd, sizeof(fd));
- return result;
+ return 0;
}
static int open_file_is_dir(const efile_path_t *path, int fd) {
diff --git a/erts/emulator/nifs/win32/win_prim_file.c b/erts/emulator/nifs/win32/win_prim_file.c
index 72e1e30543..8307e03f8a 100644
--- a/erts/emulator/nifs/win32/win_prim_file.c
+++ b/erts/emulator/nifs/win32/win_prim_file.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson 2017-2020. All Rights Reserved.
+ * Copyright Ericsson 2017-2021. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -216,16 +216,19 @@ posix_errno_t efile_marshal_path(ErlNifEnv *env, ERL_NIF_TERM path, efile_path_t
return get_full_path(env, (WCHAR*)raw_path.data, result);
}
-ERL_NIF_TERM efile_get_handle(ErlNifEnv *env, efile_data_t *d) {
+posix_errno_t efile_get_handle(ErlNifEnv *env, efile_data_t *d, int do_dup, ERL_NIF_TERM *handle) {
efile_win_t *w = (efile_win_t*)d;
- ERL_NIF_TERM result;
unsigned char *bits;
- bits = enif_make_new_binary(env, sizeof(w->handle), &result);
+ if (do_dup) {
+ /* XXX not implemented yet */
+ return ENOTSUP;
+ }
+ bits = enif_make_new_binary(env, sizeof(w->handle), handle);
memcpy(bits, &w->handle, sizeof(w->handle));
- return result;
+ return 0;
}
/** @brief Converts a native path to the preferred form in "erlang space,"
diff --git a/erts/preloaded/ebin/prim_file.beam b/erts/preloaded/ebin/prim_file.beam
index fc6e1ece14..9597015bb6 100644
--- a/erts/preloaded/ebin/prim_file.beam
+++ b/erts/preloaded/ebin/prim_file.beam
Binary files differ
diff --git a/erts/preloaded/ebin/prim_socket.beam b/erts/preloaded/ebin/prim_socket.beam
index 5f74928eee..a31c87efd3 100644
--- a/erts/preloaded/ebin/prim_socket.beam
+++ b/erts/preloaded/ebin/prim_socket.beam
Binary files differ
diff --git a/erts/preloaded/src/prim_file.erl b/erts/preloaded/src/prim_file.erl
index 3a6a573b89..9e9b2ef5c2 100644
--- a/erts/preloaded/src/prim_file.erl
+++ b/erts/preloaded/src/prim_file.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2000-2020. All Rights Reserved.
+%% Copyright Ericsson AB 2000-2021. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -30,7 +30,8 @@
-export([file_desc_to_ref/2]).
--export([ipread_s32bu_p32bu/3, sendfile/8, altname/1, get_handle/1]).
+-export([ipread_s32bu_p32bu/3, sendfile/8, altname/1,
+ get_handle/1, get_fd_data/1]).
-export([read_file/1, write_file/2]).
diff --git a/erts/preloaded/src/prim_socket.erl b/erts/preloaded/src/prim_socket.erl
index d815eea84e..45c2e79478 100644
--- a/erts/preloaded/src/prim_socket.erl
+++ b/erts/preloaded/src/prim_socket.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2018-2020. All Rights Reserved.
+%% Copyright Ericsson AB 2018-2021. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@
listen/2,
accept/2,
send/4, sendto/4, sendto/5, sendmsg/4, sendmsg/5,
+ sendfile/4, sendfile/5, sendfile_deferred_close/1,
recv/4, recvfrom/4, recvmsg/5,
close/1, finalize_close/1,
shutdown/2,
@@ -631,6 +632,15 @@ invalid_iov([H|IOV], N) ->
invalid_iov(_, N) ->
{improper_list, N}.
+sendfile(SockRef, Offset, Count, SendRef) ->
+ nif_sendfile(SockRef, SendRef, Offset, Count).
+
+sendfile(SockRef, FileRef, Offset, Count, SendRef) ->
+ nif_sendfile(SockRef, SendRef, Offset, Count, FileRef).
+
+sendfile_deferred_close(SockRef) ->
+ nif_sendfile(SockRef).
+
%% ----------------------------------
recv(SockRef, Length, Flags, RecvRef) ->
@@ -971,7 +981,7 @@ p_get(Name) ->
%%
nif_info() -> erlang:nif_error(undef).
-nif_info(_SRef) -> erlang:nif_error(undef).
+nif_info(_SockRef) -> erlang:nif_error(undef).
nif_command(_Command) -> erlang:nif_error(undef).
@@ -981,35 +991,42 @@ nif_supports(_Key) -> erlang:nif_error(undef).
nif_open(_FD, _Opts) -> erlang:nif_error(undef).
nif_open(_Domain, _Type, _Protocol, _Opts) -> erlang:nif_error(undef).
-nif_bind(_SRef, _SockAddr) -> erlang:nif_error(undef).
-nif_bind(_SRef, _SockAddrs, _Action) -> erlang:nif_error(undef).
+nif_bind(_SockRef, _SockAddr) -> erlang:nif_error(undef).
+nif_bind(_SockRef, _SockAddrs, _Action) -> erlang:nif_error(undef).
-nif_connect(_SRef) -> erlang:nif_error(undef).
-nif_connect(_SRef, _ConnectRef, _SockAddr) -> erlang:nif_error(undef).
+nif_connect(_SockRef) -> erlang:nif_error(undef).
+nif_connect(_SockRef, _ConnectRef, _SockAddr) -> erlang:nif_error(undef).
-nif_listen(_SRef, _Backlog) -> erlang:nif_error(undef).
+nif_listen(_SockRef, _Backlog) -> erlang:nif_error(undef).
-nif_accept(_SRef, _Ref) -> erlang:nif_error(undef).
+nif_accept(_SockRef, _Ref) -> erlang:nif_error(undef).
nif_send(_SockRef, _Bin, _Flags, _SendRef) -> erlang:nif_error(undef).
-nif_sendto(_SRef, _Bin, _Dest, _Flags, _SendRef) -> erlang:nif_error(undef).
-nif_sendmsg(_SRef, _Msg, _Flags, _SendRef, _IOV) -> erlang:nif_error(undef).
+nif_sendto(_SockRef, _Bin, _Dest, _Flags, _SendRef) -> erlang:nif_error(undef).
+nif_sendmsg(_SockRef, _Msg, _Flags, _SendRef, _IOV) -> erlang:nif_error(undef).
+
+nif_sendfile(_SockRef, _SendRef, _Offset, _Count, _InFileRef) ->
+ erlang:nif_error(undef).
+nif_sendfile(_SockRef, _SendRef, _Offset, _Count) ->
+ erlang:nif_error(undef).
+nif_sendfile(_SockRef) -> erlang:nif_error(undef).
-nif_recv(_SRef, _Length, _Flags, _RecvRef) -> erlang:nif_error(undef).
-nif_recvfrom(_SRef, _Length, _Flags, _RecvRef) -> erlang:nif_error(undef).
-nif_recvmsg(_SRef, _BufSz, _CtrlSz, _Flags, _RecvRef) -> erlang:nif_error(undef).
+nif_recv(_SockRef, _Length, _Flags, _RecvRef) -> erlang:nif_error(undef).
+nif_recvfrom(_SockRef, _Length, _Flags, _RecvRef) -> erlang:nif_error(undef).
+nif_recvmsg(_SockRef, _BufSz, _CtrlSz, _Flags, _RecvRef) ->
+ erlang:nif_error(undef).
-nif_close(_SRef) -> erlang:nif_error(undef).
-nif_finalize_close(_SRef) -> erlang:nif_error(undef).
-nif_shutdown(_SRef, _How) -> erlang:nif_error(undef).
+nif_close(_SockRef) -> erlang:nif_error(undef).
+nif_finalize_close(_SockRef) -> erlang:nif_error(undef).
+nif_shutdown(_SockRef, _How) -> erlang:nif_error(undef).
-nif_setopt(_Ref, _Lev, _Opt, _Val, _NativeVal) -> erlang:nif_error(undef).
-nif_getopt(_Ref, _Lev, _Opt) -> erlang:nif_error(undef).
-nif_getopt(_Ref, _Lev, _Opt, _ValSpec) -> erlang:nif_error(undef).
+nif_setopt(_SockRef, _Lev, _Opt, _Val, _NativeVal) -> erlang:nif_error(undef).
+nif_getopt(_SockRef, _Lev, _Opt) -> erlang:nif_error(undef).
+nif_getopt(_SockRef, _Lev, _Opt, _ValSpec) -> erlang:nif_error(undef).
-nif_sockname(_Ref) -> erlang:nif_error(undef).
-nif_peername(_Ref) -> erlang:nif_error(undef).
+nif_sockname(_SockRef) -> erlang:nif_error(undef).
+nif_peername(_SockRef) -> erlang:nif_error(undef).
-nif_cancel(_SRef, _Op, _Ref) -> erlang:nif_error(undef).
+nif_cancel(_SockRef, _Op, _SelectRef) -> erlang:nif_error(undef).
%% ===========================================================================
diff --git a/lib/kernel/doc/src/file.xml b/lib/kernel/doc/src/file.xml
index 7332f0ca3b..f6ef1892c5 100644
--- a/lib/kernel/doc/src/file.xml
+++ b/lib/kernel/doc/src/file.xml
@@ -4,7 +4,7 @@
<erlref>
<header>
<copyright>
- <year>1996</year><year>2020</year>
+ <year>1996</year><year>2021</year>
<holder>Ericsson AB. All Rights Reserved.</holder>
</copyright>
<legalnotice>
@@ -1857,7 +1857,13 @@ f.txt: {person, "kalle", 25}.
<c>0</c> all data after the specified <c>Offset</c> is sent.</p>
<p>The file used must be opened using the <c>raw</c> flag, and the process
calling <c>sendfile</c> must be the controlling process of the socket.
- See <seemfa marker="gen_tcp#controlling_process/2"><c>gen_tcp:controlling_process/2</c></seemfa>.</p>
+ See <seemfa marker="gen_tcp#controlling_process/2"><c>gen_tcp:controlling_process/2</c></seemfa>
+ or module
+ <seemfa marker="socket#setopt/3"><c>socket</c>'s</seemfa>
+ <seetype marker="socket#otp_socket_option">
+ level <c>otp</c> socket option
+ </seetype>
+ <c>controlling_process</c>.</p>
<p>If the OS used does not support non-blocking <c>sendfile</c>, an
Erlang fallback using <seemfa marker="#read/2"><c>read/2</c></seemfa>
and <seemfa marker="gen_tcp#send/2"><c>gen_tcp:send/2</c></seemfa> is
diff --git a/lib/kernel/doc/src/socket.xml b/lib/kernel/doc/src/socket.xml
index 96bcfa32fa..57752fb912 100644
--- a/lib/kernel/doc/src/socket.xml
+++ b/lib/kernel/doc/src/socket.xml
@@ -63,12 +63,16 @@
<item><c>{'$socket', socket(), select, SelectHandle}</c></item>
</taglist>
<p>
- The caller can now make new call
- to the recv function and expect data.
+ The caller can now call the <c>recv</c> function again
+ and probably expect data
+ (it is really up to the OS network protocol implementation).
</p>
<p>
Note that all other users are <em>locked out</em> until the
- 'current user' has called the function (recv in this case).
+ 'current user' has called the function (<c>recv</c> in this case)
+ and its return value shows that the operation has completed.
+ An operation can also be cancelled with
+ <seemfa marker="#cancel/2"><c>cancel/2</c></seemfa>.
</p>
<p>
Instead of <c>Timeout = nowait</c> it is equivalent to create a
@@ -160,13 +164,17 @@
</desc>
</datatype>
<datatype>
- <name>socket()</name>
+ <name name="socket"/>
<desc><p>As returned by
<seemfa marker="#open/1"><c>open/1,2,3,4</c></seemfa> and
<seemfa marker="#accept/1"><c>accept/1,2</c></seemfa>.</p>
</desc>
</datatype>
<datatype>
+ <name name="socket_handle"/>
+ <desc><p>An opaque socket handle unique for the socket.</p></desc>
+ </datatype>
+ <datatype>
<name name="select_tag"/>
<desc>
<p>
@@ -1374,6 +1382,12 @@
<c>select_handle()</c>
</seetype> generated by the call.
</p>
+ <p>
+ If the caller doesn't want to wait for a connection,
+ it must immediately call
+ <seemfa marker="#cancel/2"><c>cancel/2</c></seemfa>
+ to cancel the operation.
+ </p>
</desc>
</func>
@@ -1415,6 +1429,19 @@
asynchronous call to, e.g.
<seemfa marker="#recv/3"><c>recv/3</c></seemfa>.
</p>
+ <p>
+ An ongoing asynchronous operation blocks the socket
+ until the operation has been finished in good order,
+ or until it has been cancelled by this function.
+ </p>
+ <p>
+ Any other process that tries an operation
+ of the same basic type (accept / send / recv) will be
+ enqueued and notified with the regular <c>select</c>
+ mechanism for asynchronous operations
+ when the current operation and all enqueued before it
+ has been completed.
+ </p>
</desc>
</func>
@@ -1534,6 +1561,12 @@
<c>select_handle()</c>
</seetype> generated by the call.
</p>
+ <p>
+ If the caller doesn't want to wait for
+ the connection to complete, it must immediately call
+ <seemfa marker="#cancel/2"><c>cancel/2</c></seemfa>
+ to cancel the operation.
+ </p>
</desc>
</func>
@@ -2106,7 +2139,7 @@
<c>{ok, {<anno>Data</anno>, <anno>SelectInfo</anno></c>
</seetype>
with partial data. If the caller doesn't want to wait
- for more data, it must call
+ for more data, it must immediately call
<seemfa marker="#cancel/2"><c>cancel/2</c></seemfa>
to cancel the operation.
</p>
@@ -2236,6 +2269,12 @@
<c>select_handle()</c>
</seetype> generated by the call.
</p>
+ <p>
+ If the caller doesn't want to wait for the data,
+ it must immediately call
+ <seemfa marker="#cancel/2"><c>cancel/2</c></seemfa>
+ to cancel the operation.
+ </p>
</desc>
</func>
@@ -2370,6 +2409,12 @@
<c>select_handle()</c>
</seetype> generated by the call.
</p>
+ <p>
+ If the caller doesn't want to wait for the data,
+ it must immediately call
+ <seemfa marker="#cancel/2"><c>cancel/2</c></seemfa>
+ to cancel the operation.
+ </p>
</desc>
</func>
@@ -2512,7 +2557,7 @@
<desc>
<p>
Sends data on a connected socket,
- but return a select continuation if the data
+ but returns a select continuation if the data
could not be sent immediately.
</p>
<p>
@@ -2561,7 +2606,7 @@
which can only happen for a socket of
<seetype marker="#type">type <c>stream</c></seetype>.
If the caller does not want to wait to send the rest of the data,
- it should cancel the operation with
+ it should immediately cancel the operation with
<seemfa marker="#cancel/2"><c>cancel/2</c></seemfa>.
</p>
</desc>
@@ -2709,7 +2754,7 @@
<desc>
<p>
Sends a message on a socket,
- but return a select continuation if the data
+ but returns a select continuation if the data
could not be sent immediately.
</p>
<p>
@@ -2758,7 +2803,7 @@
which can only happen for a socket of
<seetype marker="#type">type <c>stream</c></seetype>.
If the caller does not want to wait to send the rest of the data,
- it should cancel the operation with
+ it should immediately cancel the operation with
<seemfa marker="#cancel/2"><c>cancel/2</c></seemfa>.
</p>
</desc>
@@ -2876,7 +2921,7 @@
<desc>
<p>
Sends data on a socket,
- but return a select continuation if the data
+ but returns a select continuation if the data
could not be sent immediately.
</p>
<p>
@@ -2884,7 +2929,7 @@
<seeerl marker="#sendto-infinity">
infinity time-out <c>sendto/3,4</c>
</seeerl>
- but if the data is not be immediately accepted
+ but if the data is not immediately accepted
by the platform network layer,
the function returns
<seetype marker="#select_info"><c>{select,&nbsp;<anno>SelectInfo</anno>}</c></seetype>,
@@ -2926,7 +2971,7 @@
which can only happen for a socket of
<seetype marker="#type">type <c>stream</c></seetype>.
If the caller does not want to wait to send the rest of the data,
- it should cancel the operation with
+ it should immediately cancel the operation with
<seemfa marker="#cancel/2"><c>cancel/2</c></seemfa>.
</p>
</desc>
@@ -2980,6 +3025,308 @@
</func>
<func>
+ <name name="sendfile" arity="5" clause_i="7"
+ since="OTP @OTP-17154@"
+ anchor="sendfile-infinity"/>
+ <fsummary>
+ Send file data on a socket, with "infinite" time-out.
+ </fsummary>
+ <desc>
+ <p>
+ Sends file data on a socket, to the specified destination,
+ waiting for it to be sent (<em>"infinite" time-out</em>).
+ </p>
+ <p>
+ The <c><anno>FileHandle</anno></c> must refer to
+ an open raw file as described in
+ <seemfa marker="file#open/2"><c>file:open/2</c></seemfa>.
+ </p>
+ <p>
+ This call will not return until the data has been accepted
+ by the platform's network layer, or it reports an error.
+ </p>
+ <p>
+ The <c>Offset</c> argument is the file offset
+ to start reading from. The default value is <c>0</c>.
+ </p>
+ <p>
+ The <c>Count</c> argument is the number of bytes
+ to transfer from <c><anno>FileHandle</anno></c>
+ to <c><anno>Socket</anno></c>.
+ If <c>Count&nbsp;=:=&nbsp;0</c> (the default)
+ the transfer stops at the end of file.
+ </p>
+ <p>
+ The return value indicates the result from
+ the platform's network layer:
+ </p>
+ <taglist>
+ <tag><c>{ok,&nbsp;<anno>BytesSent</anno>}</c></tag>
+ <item>
+ <p>
+ The transfer completed succesfully after
+ <c><anno>BytesSent</anno></c> bytes of data.
+ </p>
+ </item>
+ <tag><c>{error,&nbsp;<anno>Reason</anno>}</c></tag>
+ <item>
+ <p>
+ An error has been reported and no data has been transferred.
+ The <seetype marker="#posix"><c>posix()</c></seetype>
+ <c><anno>Reason</anno>s</c> are from the
+ platform's network layer.
+ <c>closed</c> means that this socket library
+ knows that the socket is closed, and
+ <seetype marker="#invalid"><c>invalid()</c></seetype>
+ means that something about an argument is invalid.
+ </p>
+ </item>
+ <tag>
+ <c>
+ {error,&nbsp;{<anno>Reason</anno>,&nbsp;<anno>BytesSent</anno>}}
+ </c>
+ </tag>
+ <item>
+ <p>
+ An error has been reported
+ but before that some data was transferred.
+ See <c>{error,&nbsp;<anno>Reason</anno>}</c>
+ and <c>{ok,&nbsp;<anno>BytesSent</anno>}</c> above.
+ </p>
+ </item>
+ </taglist>
+ </desc>
+ </func>
+
+ <func>
+ <name name="sendfile" arity="5" clause_i="8"
+ since="OTP @OTP-17154@"
+ anchor="sendfile-timeout"/>
+ <fsummary>Send file data on a socket, with time-out.</fsummary>
+ <desc>
+ <p>
+ Sends file data on a socket, waiting at most
+ <c><anno>Timeout</anno></c> milliseconds for it to be sent
+ (<em>limited time-out</em>).
+ </p>
+ <p>
+ The same as
+ <seeerl marker="#sendfile-infinity">
+ "infinite" time-out <c>sendfile/5</c>
+ </seeerl>
+ but returns <c>{error,&nbsp;timeout}</c>
+ or <c>{error,&nbsp;{timeout,&nbsp;<anno>BytesSent</anno>}}</c>
+ after <c><anno>Timeout</anno></c> milliseconds,
+ if not all file data was transferred
+ by the platform's network layer.
+ </p>
+ </desc>
+ </func>
+
+ <func>
+ <name name="sendfile" arity="5" clause_i="5"
+ since="OTP @OTP-17154@"
+ anchor="sendfile-nowait"/>
+ <name name="sendfile" arity="5" clause_i="6"
+ since="OTP @OTP-17154@"/>
+ <fsummary>Send file data on a socket, but do not wait.</fsummary>
+ <desc>
+ <p>
+ Sends file data on a socket,
+ but returns a select continuation if the data
+ could not be sent immediately (<em>nowait</em>).
+ </p>
+ <p>
+ The same as
+ <seeerl marker="#sendfile-infinity">
+ "infinite" time-out <c>sendfile/5</c>
+ </seeerl>
+ but if the data is not immediately accepted
+ by the platform network layer,
+ the function returns
+ <seetype marker="#select_info"><c>{select,&nbsp;<anno>SelectInfo</anno>}</c></seetype>,
+ and the caller will then receive a select message,
+ <c>{'$socket',&nbsp;Socket,&nbsp;select,&nbsp;SelectHandle}</c> (
+ with the
+ <seetype marker="socket#select_handle"><c>SelectHandle</c></seetype>
+ that was contained in the
+ <seetype marker="#select_info">
+ <c><anno>SelectInfo</anno></c>
+ </seetype>
+ ) when there is room for more data.
+ Then a call to
+ <seeerl marker="#sendfile-cont"><c>sendfile/3</c></seeerl>
+ with <c><anno>SelectInfo</anno></c> as the second argument
+ will continue the data transfer.
+ </p>
+ <p>
+ If <c>SelectHandle</c> is a
+ <seetype marker="#select_handle"><c>select_handle()</c></seetype>,
+ that term will be contained in a returned
+ <c><anno>SelectInfo</anno></c>
+ and the corresponding select message.
+ The <c>SelectHandle</c> is presumed to be
+ unique to this call.
+ </p>
+ <p>
+ If <c>SelectHandle</c> is <c>nowait</c>, and a
+ <c><anno>SelectInfo</anno></c> is returned,
+ it will contain a
+ <seetype marker="socket#select_handle">
+ <c>select_handle()</c>
+ </seetype> generated by the call.
+ </p>
+ <p>
+ If some file data was sent, the function will return
+ <seetype marker="#select_info">
+ <c>
+ {ok,&nbsp;{<anno>BytesSent</anno>,&nbsp;<anno>SelectInfo</anno>}.
+ </c>
+ </seetype>
+ If the caller does not want to wait to send the rest of the data,
+ it should immediately cancel the operation with
+ <seemfa marker="#cancel/2"><c>cancel/2</c></seemfa>.
+ </p>
+ </desc>
+ </func>
+
+ <func>
+ <name name="sendfile" arity="5" clause_i="3"
+ since="OTP @OTP-17154@"
+ anchor="sendfile-cont"/>
+ <name name="sendfile" arity="5" clause_i="4"
+ since="OTP @OTP-17154@"/>
+ <name name="sendfile" arity="5" clause_i="1"
+ since="OTP @OTP-17154@"/>
+ <name name="sendfile" arity="5" clause_i="2"
+ since="OTP @OTP-17154@"/>
+ <fsummary>Send file data on a socket, continuation.</fsummary>
+ <desc>
+ <p>
+ Continues sending file data on a socket, where the
+ send operation was initiated by
+ <seeerl marker="#sendfile-nowait"><c>sendfile/3,5</c></seeerl>
+ that returned a <c>SelectInfo</c> continuation.
+ Otherwise like
+ <seeerl marker="#sendfile-infinity">
+ "infinite" time-out <c>sendfile/5</c>
+ </seeerl>
+ ,
+ <seeerl marker="#sendfile-timeout">
+ limited time-out <c>sendfile/5</c>
+ </seeerl>
+ or
+ <seeerl marker="#sendfile-nowait">
+ nowait <c>sendfile/5</c>
+ </seeerl>
+ respectively.
+ </p>
+ <p>
+ <c><anno>Cont</anno></c> is the <c>SelectInfo</c>
+ that was returned from the previous <c>sendfile()</c> call.
+ </p>
+ <p>
+ The return value indicates the result from
+ the platform's network layer.
+ See
+ <seeerl marker="#sendfile-infinity">
+ "infinite" time-out <c>sendfile/5</c>.
+ </seeerl>
+ </p>
+ </desc>
+ </func>
+
+ <func>
+ <name since="OTP @OTP-17154@">sendfile(Socket, FileHandle, Offset, Count)
+ -> Result</name>
+ <fsummary>
+ Send file data on a socket, with "infinite" time-out.
+ </fsummary>
+ <type>
+ <v>Socket = <seetype marker="#socket">socket()</seetype></v>
+ <v>FileHandle = <seetype marker="file#fd">file:fd()</seetype></v>
+ <v>Offset = integer()</v>
+ <v>Count = integer() >= 0</v>
+ </type>
+ <desc>
+ <p>
+ The same as
+ <seeerl marker="#sendfile-infinity">
+ <c>
+ sendfile(Socket,&nbsp;FileHandle,&nbsp;Offset,&nbsp;Count,&nbsp;infinity),
+ </c>
+ </seeerl>
+ that is: send the file data at <c>Offset</c> and <c>Count</c>
+ to the socket, without time-out
+ other than from the platform's network stack.
+ </p>
+ </desc>
+ </func>
+
+ <func>
+ <name since="OTP @OTP-17154@">sendfile(Socket, FileHandle, Timeout)
+ -> Result</name>
+ <fsummary>
+ Send a file's data on a socket.
+ </fsummary>
+ <type>
+ <v>Socket = <seetype marker="#socket">socket()</seetype></v>
+ <v>FileHandle = <seetype marker="file#fd">file:fd()</seetype></v>
+ <v>
+ Timeout = timeout() | 'nowait' |
+ <seetype marker="#select_handle">select_handle()</seetype>
+ </v>
+ </type>
+ <desc>
+ <p>
+ Depending on the <c>Timeout</c> argument; the same as
+ <seeerl marker="#sendfile-infinity">
+ <c>
+ sendfile(Socket,&nbsp;FileHandle,&nbsp;0,&nbsp;0,&nbsp;infinity),
+ </c>
+ </seeerl>
+ <seeerl marker="#sendfile-timeout">
+ <c>
+ sendfile(Socket,&nbsp;FileHandle,&nbsp;0,&nbsp;0,&nbsp;Timeout),
+ </c>
+ </seeerl>
+ or
+ <seeerl marker="#sendfile-nowait">
+ <c>
+ sendfile(Socket,&nbsp;FileHandle,&nbsp;0,&nbsp;0,&nbsp;SelectHandle),
+ </c>
+ </seeerl>
+ that is: send all data in the file to the socket,
+ with the given <c>Timeout</c>.
+ </p>
+ </desc>
+ </func>
+
+ <func>
+ <name since="OTP @OTP-17154@">sendfile(Socket, FileHandle)
+ -> Result</name>
+ <fsummary>
+ Send a file's data on a socket, with "infinite" time-out.
+ </fsummary>
+ <type>
+ <v>Socket = <seetype marker="#socket">socket()</seetype></v>
+ <v>FileHandle = <seetype marker="file#fd">file:fd()</seetype></v>
+ </type>
+ <desc>
+ <p>
+ The same as
+ <seeerl marker="#sendfile-infinity">
+ <c>
+ sendfile(Socket,&nbsp;FileHandle,&nbsp;0,&nbsp;0,&nbsp;infinity),
+ </c>
+ </seeerl>
+ that is: send all data in the file to the socket,
+ without time-out other than from the platform's network stack.
+ </p>
+ </desc>
+ </func>
+
+ <func>
<name name="setopt" arity="3" clause_i="1" since="OTP 22.0"/>
<fsummary>
Set a socket option in the protocol level <c>otp</c>.
diff --git a/lib/kernel/src/file.erl b/lib/kernel/src/file.erl
index d77f214d32..57d3add4cb 100644
--- a/lib/kernel/src/file.erl
+++ b/lib/kernel/src/file.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1996-2019. All Rights Reserved.
+%% Copyright Ericsson AB 1996-2021. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -1290,7 +1290,8 @@ change_time(Name, {{AY, AM, AD}, {AH, AMin, ASec}}=Atime,
{'ok', non_neg_integer()} | {'error', inet:posix() |
closed | badarg | not_owner} when
RawFile :: fd(),
- Socket :: inet:socket(),
+ Socket :: inet:socket() | socket:socket() |
+ fun ((iolist()) -> ok | {error, inet:posix() | closed}),
Offset :: non_neg_integer(),
Bytes :: non_neg_integer(),
Opts :: [sendfile_option()].
@@ -1315,9 +1316,10 @@ sendfile(File, Sock, Offset, Bytes, Opts) ->
%% sendfile/2
-spec sendfile(Filename, Socket) ->
{'ok', non_neg_integer()} | {'error', inet:posix() |
- closed | badarg | not_owner}
- when Filename :: name_all(),
- Socket :: inet:socket().
+ closed | badarg | not_owner} when
+ Filename :: name_all(),
+ Socket :: inet:socket() | socket:socket() |
+ fun ((iolist()) -> ok | {error, inet:posix() | closed}).
sendfile(Filename, Sock) ->
case file:open(Filename, [read, raw, binary]) of
{error, Reason} ->
@@ -1328,14 +1330,41 @@ sendfile(Filename, Sock) ->
Res
end.
+-define(module_socket(Handler, Handle),
+ {'$inet', (Handler), (Handle)}).
+-define(socket(Handle),
+ {'$socket', (Handle)}).
+
%% Internal sendfile functions
sendfile(#file_descriptor{ module = Mod } = Fd, Sock, Offset, Bytes,
ChunkSize, Headers, Trailers, Opts)
when is_integer(Offset), is_integer(Bytes) ->
case Sock of
- {'$inet', _, _} ->
+ ?socket(_) when Headers =:= [], Trailers =:= [] ->
+ try socket:sendfile(Sock, Fd, Offset, Bytes, infinity)
+ catch error : notsup ->
+ sendfile_fallback(
+ Fd, socket_send(Sock), Offset, Bytes, ChunkSize,
+ Headers, Trailers)
+ end;
+ ?socket(_) ->
sendfile_fallback(
- Fd, Sock, Offset, Bytes, ChunkSize,
+ Fd, socket_send(Sock), Offset, Bytes, ChunkSize,
+ Headers, Trailers);
+ ?module_socket(GenTcpMod, _) when Headers =:= [], Trailers =:= [] ->
+ case
+ GenTcpMod:sendfile(Sock, Fd, Offset, Bytes)
+ of
+ {error, enotsup} ->
+ sendfile_fallback(
+ Fd, gen_tcp_send(Sock), Offset, Bytes, ChunkSize,
+ Headers, Trailers);
+ Else ->
+ Else
+ end;
+ ?module_socket(_, _) ->
+ sendfile_fallback(
+ Fd, gen_tcp_send(Sock), Offset, Bytes, ChunkSize,
Headers, Trailers);
_ when is_port(Sock) ->
case Mod:sendfile(
@@ -1343,49 +1372,63 @@ sendfile(#file_descriptor{ module = Mod } = Fd, Sock, Offset, Bytes,
Headers, Trailers, Opts) of
{error, enotsup} ->
sendfile_fallback(
- Fd, Sock, Offset, Bytes, ChunkSize,
+ Fd, gen_tcp_send(Sock), Offset, Bytes, ChunkSize,
Headers, Trailers);
Else ->
Else
- end
+ end;
+ _ when is_function(Sock, 1) ->
+ sendfile_fallback(
+ Fd, Sock, Offset, Bytes, ChunkSize,
+ Headers, Trailers)
end;
sendfile(_,_,_,_,_,_,_,_) ->
{error, badarg}.
+socket_send(Sock) ->
+ fun (Data) ->
+ socket:send(Sock, Data)
+ end.
+
+gen_tcp_send(Sock) ->
+ fun (Data) ->
+ gen_tcp:send(Sock, Data)
+ end.
+
%%%
%% Sendfile Fallback
%%%
-sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize,
+sendfile_fallback(File, Send, Offset, Bytes, ChunkSize,
Headers, Trailers)
when Headers == []; is_integer(Headers) ->
- case sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) of
+ case sendfile_fallback(File, Send, Offset, Bytes, ChunkSize) of
{ok, BytesSent} when is_list(Trailers),
Trailers =/= [],
is_integer(Headers) ->
- sendfile_send(Sock, Trailers, BytesSent+Headers);
+ sendfile_send(Send, Trailers, BytesSent+Headers);
{ok, BytesSent} when is_list(Trailers), Trailers =/= [] ->
- sendfile_send(Sock, Trailers, BytesSent);
+ sendfile_send(Send, Trailers, BytesSent);
{ok, BytesSent} when is_integer(Headers) ->
{ok, BytesSent + Headers};
Else ->
Else
end;
-sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers) ->
- case sendfile_send(Sock, Headers, 0) of
+sendfile_fallback(File, Send, Offset, Bytes, ChunkSize, Headers, Trailers) ->
+ case sendfile_send(Send, Headers, 0) of
{ok, BytesSent} ->
- sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, BytesSent,
+ sendfile_fallback(File, Send, Offset, Bytes, ChunkSize, BytesSent,
Trailers);
Else ->
Else
end.
-sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize)
+sendfile_fallback(File, Send, Offset, Bytes, ChunkSize)
when 0 =< Bytes ->
{ok, CurrPos} = file:position(File, {cur, 0}),
case file:position(File, {bof, Offset}) of
{ok, _NewPos} ->
- Res = sendfile_fallback_int(File, Sock, Bytes, ChunkSize, 0),
+ Res = sendfile_fallback_int(File, Send, Bytes, ChunkSize, 0),
_ = file:position(File, {bof, CurrPos}),
Res;
Error ->
@@ -1395,7 +1438,7 @@ sendfile_fallback(_, _, _, _, _) ->
{error, einval}.
-sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent)
+sendfile_fallback_int(File, Send, Bytes, ChunkSize, BytesSent)
when Bytes > BytesSent; Bytes == 0 ->
Size = if Bytes == 0 ->
ChunkSize;
@@ -1406,10 +1449,10 @@ sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent)
end,
case file:read(File, Size) of
{ok, Data} ->
- case sendfile_send(Sock, Data, BytesSent) of
+ case sendfile_send(Send, Data, BytesSent) of
{ok,NewBytesSent} ->
sendfile_fallback_int(
- File, Sock, Bytes, ChunkSize,
+ File, Send, Bytes, ChunkSize,
NewBytesSent);
Error ->
Error
@@ -1419,12 +1462,12 @@ sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent)
Error ->
Error
end;
-sendfile_fallback_int(_File, _Sock, BytesSent, _ChunkSize, BytesSent) ->
+sendfile_fallback_int(_File, _Send, BytesSent, _ChunkSize, BytesSent) ->
{ok, BytesSent}.
-sendfile_send(Sock, Data, Old) ->
+sendfile_send(Send, Data, Old) ->
Len = iolist_size(Data),
- case gen_tcp:send(Sock, Data) of
+ case Send(Data) of
ok ->
{ok, Len+Old};
Else ->
diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl
index f0204802e3..c69524daf6 100644
--- a/lib/kernel/src/gen_tcp.erl
+++ b/lib/kernel/src/gen_tcp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1997-2020. All Rights Reserved.
+%% Copyright Ericsson AB 1997-2021. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -30,6 +30,9 @@
-include("inet_int.hrl").
-include("file.hrl").
+-define(module_socket(Handler, Handle),
+ {'$inet', (Handler), (Handle)}).
+
-type option() ::
{active, true | false | once | -32768..32767} |
{buffer, non_neg_integer()} |
@@ -231,7 +234,7 @@ listen(Port, Opts0) ->
Socket :: socket(),
Reason :: closed | system_limit | inet:posix().
-accept({'$inet', GenTcpMod, _} = S) when is_atom(GenTcpMod) ->
+accept(?module_socket(GenTcpMod, _) = S) when is_atom(GenTcpMod) ->
GenTcpMod:?FUNCTION_NAME(S, infinity);
accept(S) when is_port(S) ->
case inet_db:lookup_socket(S) of
@@ -247,7 +250,7 @@ accept(S) when is_port(S) ->
Socket :: socket(),
Reason :: closed | timeout | system_limit | inet:posix().
-accept({'$inet', GenTcpMod, _} = S, Time) when is_atom(GenTcpMod) ->
+accept(?module_socket(GenTcpMod, _) = S, Time) when is_atom(GenTcpMod) ->
GenTcpMod:?FUNCTION_NAME(S, Time);
accept(S, Time) when is_port(S) ->
case inet_db:lookup_socket(S) of
@@ -266,7 +269,7 @@ accept(S, Time) when is_port(S) ->
How :: read | write | read_write,
Reason :: inet:posix().
-shutdown({'$inet', GenTcpMod, _} = S, How) when is_atom(GenTcpMod) ->
+shutdown(?module_socket(GenTcpMod, _) = S, How) when is_atom(GenTcpMod) ->
GenTcpMod:?FUNCTION_NAME(S, How);
shutdown(S, How) when is_port(S) ->
case inet_db:lookup_socket(S) of
@@ -283,7 +286,7 @@ shutdown(S, How) when is_port(S) ->
-spec close(Socket) -> ok when
Socket :: socket().
-close({'$inet', GenTcpMod, _} = S) when is_atom(GenTcpMod) ->
+close(?module_socket(GenTcpMod, _) = S) when is_atom(GenTcpMod) ->
GenTcpMod:?FUNCTION_NAME(S);
close(S) ->
inet:tcp_close(S).
@@ -298,7 +301,7 @@ close(S) ->
Reason :: closed | {timeout, RestData} | inet:posix(),
RestData :: binary().
-send({'$inet', GenTcpMod, _} = S, Packet) when is_atom(GenTcpMod) ->
+send(?module_socket(GenTcpMod, _) = S, Packet) when is_atom(GenTcpMod) ->
GenTcpMod:?FUNCTION_NAME(S, Packet);
send(S, Packet) when is_port(S) ->
case inet_db:lookup_socket(S) of
@@ -319,7 +322,7 @@ send(S, Packet) when is_port(S) ->
Reason :: closed | inet:posix(),
HttpPacket :: term().
-recv({'$inet', GenTcpMod, _} = S, Length) when is_atom(GenTcpMod) ->
+recv(?module_socket(GenTcpMod, _) = S, Length) when is_atom(GenTcpMod) ->
GenTcpMod:?FUNCTION_NAME(S, Length, infinity);
recv(S, Length) when is_port(S) ->
case inet_db:lookup_socket(S) of
@@ -337,7 +340,7 @@ recv(S, Length) when is_port(S) ->
Reason :: closed | timeout | inet:posix(),
HttpPacket :: term().
-recv({'$inet', GenTcpMod, _} = S, Length, Time) when is_atom(GenTcpMod) ->
+recv(?module_socket(GenTcpMod, _) = S, Length, Time) when is_atom(GenTcpMod) ->
GenTcpMod:?FUNCTION_NAME(S, Length, Time);
recv(S, Length, Time) when is_port(S) ->
case inet_db:lookup_socket(S) of
@@ -347,7 +350,7 @@ recv(S, Length, Time) when is_port(S) ->
Error
end.
-unrecv({'$inet', GenTcpMod, _} = S, Data) when is_atom(GenTcpMod) ->
+unrecv(?module_socket(GenTcpMod, _) = S, Data) when is_atom(GenTcpMod) ->
GenTcpMod:?FUNCTION_NAME(S, Data);
unrecv(S, Data) when is_port(S) ->
case inet_db:lookup_socket(S) of
@@ -366,7 +369,7 @@ unrecv(S, Data) when is_port(S) ->
Pid :: pid(),
Reason :: closed | not_owner | badarg | inet:posix().
-controlling_process({'$inet', GenTcpMod, _} = S, NewOwner)
+controlling_process(?module_socket(GenTcpMod, _) = S, NewOwner)
when is_atom(GenTcpMod) ->
GenTcpMod:?FUNCTION_NAME(S, NewOwner);
controlling_process(S, NewOwner) ->
diff --git a/lib/kernel/src/gen_tcp_socket.erl b/lib/kernel/src/gen_tcp_socket.erl
index 53e449c47f..dad5b06200 100644
--- a/lib/kernel/src/gen_tcp_socket.erl
+++ b/lib/kernel/src/gen_tcp_socket.erl
@@ -24,6 +24,7 @@
%% gen_tcp
-export([connect/4, listen/2, accept/2,
send/2, recv/3,
+ sendfile/4,
shutdown/2, close/1, controlling_process/2]).
%% inet
-export([setopts/2, getopts/2,
@@ -400,6 +401,40 @@ send_result(Server, Meta, Result) ->
end.
%% -------------------------------------------------------------------------
+%% Handler called by file:sendfile/5 to handle ?module_socket()s
+%% as a sibling of prim_file:sendfile/8
+
+sendfile(
+ ?module_socket(_Server, Socket),
+ FileHandle, Offset, Count) ->
+ %%
+ case socket:getopt(Socket, {otp,meta}) of
+ {ok, #{packet := _}} ->
+ try
+ %% XXX should we do cork/uncork here, like in prim_inet?
+ %% And, maybe file:advise too, like prim_file
+ socket:sendfile(Socket, FileHandle, Offset, Count, infinity)
+ catch
+ Class : Reason : Stacktrace
+ when Class =:= error, Reason =:= badarg ->
+ %% Convert badarg exception into return value
+ %% to look like file:sendfile
+ case Stacktrace of
+ [{socket, sendfile, _, _} | _] ->
+ {Class, Reason};
+ _ ->
+ erlang:raise(Class, Reason, Stacktrace)
+ end;
+ Class : notsup when Class =:= error ->
+ {Class, enotsup}
+ end;
+ {ok, _BadMeta} ->
+ {error, badarg};
+ {error, _} = Error ->
+ Error
+ end.
+
+%% -------------------------------------------------------------------------
recv(?module_socket(Server, _Socket), Length, Timeout) ->
?badarg_exit(call(Server, {recv, Length, Timeout})).
diff --git a/lib/kernel/src/inet.erl b/lib/kernel/src/inet.erl
index 4af31b86cc..c41e57345f 100644
--- a/lib/kernel/src/inet.erl
+++ b/lib/kernel/src/inet.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1997-2020. All Rights Reserved.
+%% Copyright Ericsson AB 1997-2021. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -130,6 +130,7 @@
'ewouldblock' |
'exbadport' | 'exbadseq' | file:posix().
-type module_socket() :: {'$inet', Handler :: module(), Handle :: term()}.
+-define(module_socket(Handler, Handle), {'$inet', (Handler), (Handle)}).
-type socket() :: port() | module_socket().
-type socket_setopt() ::
@@ -213,7 +214,7 @@ close(Socket) ->
returned_non_ip_address()} |
{error, posix()}.
-peername({'$inet', GenSocketMod, _} = Socket) when is_atom(GenSocketMod) ->
+peername(?module_socket(GenSocketMod, _) = Socket) when is_atom(GenSocketMod) ->
GenSocketMod:?FUNCTION_NAME(Socket);
peername(Socket) ->
prim_inet:peername(Socket).
@@ -257,7 +258,7 @@ peernames(Socket, Assoc) ->
returned_non_ip_address()} |
{error, posix()}.
-sockname({'$inet', GenSocketMod, _} = Socket) when is_atom(GenSocketMod) ->
+sockname(?module_socket(GenSocketMod, _) = Socket) when is_atom(GenSocketMod) ->
GenSocketMod:?FUNCTION_NAME(Socket);
sockname(Socket) ->
prim_inet:sockname(Socket).
@@ -299,7 +300,7 @@ socknames(Socket, Assoc) ->
Socket :: socket(),
Port :: port_number().
-port({'$inet', GenSocketMod, _} = Socket) when is_atom(GenSocketMod) ->
+port(?module_socket(GenSocketMod, _) = Socket) when is_atom(GenSocketMod) ->
case GenSocketMod:sockname(Socket) of
{ok, {_, Port}} -> {ok, Port};
{error, _} = Error -> Error
@@ -320,7 +321,7 @@ send(Socket, Packet) ->
Socket :: socket(),
Options :: [socket_setopt()].
-setopts({'$inet', GenSocketMod, _} = Socket, Opts) when is_atom(GenSocketMod) ->
+setopts(?module_socket(GenSocketMod, _) = Socket, Opts) when is_atom(GenSocketMod) ->
GenSocketMod:?FUNCTION_NAME(Socket, Opts);
setopts(Socket, Opts) ->
SocketOpts =
@@ -338,7 +339,7 @@ setopts(Socket, Opts) ->
Options :: [socket_getopt()],
OptionValues :: [socket_setopt() | gen_tcp:pktoptions_value()].
-getopts({'$inet', GenSocketMod, _} = Socket, Opts)
+getopts(?module_socket(GenSocketMod, _) = Socket, Opts)
when is_atom(GenSocketMod) ->
GenSocketMod:?FUNCTION_NAME(Socket, Opts);
getopts(Socket, Opts) ->
@@ -531,7 +532,7 @@ getstat(Socket) ->
Options :: [stat_option()],
OptionValues :: [{stat_option(), integer()}].
-getstat({'$inet', GenSocketMod, _} = Socket, What)
+getstat(?module_socket(GenSocketMod, _) = Socket, What)
when is_atom(GenSocketMod) ->
GenSocketMod:?FUNCTION_NAME(Socket, What);
getstat(Socket, What) ->
diff --git a/lib/kernel/src/socket.erl b/lib/kernel/src/socket.erl
index 63e4ceea1d..c4bee76aa4 100644
--- a/lib/kernel/src/socket.erl
+++ b/lib/kernel/src/socket.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2020. All Rights Reserved.
+%% Copyright Ericsson AB 2021. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -44,6 +44,8 @@
sendto/3, sendto/4, sendto/5,
sendmsg/2, sendmsg/3, sendmsg/4,
+ sendfile/2, sendfile/3, sendfile/4, sendfile/5,
+
recv/1, recv/2, recv/3, recv/4,
recvfrom/1, recvfrom/2, recvfrom/3, recvfrom/4,
recvmsg/1, recvmsg/2, recvmsg/3, recvmsg/4, recvmsg/5,
@@ -62,6 +64,7 @@
-export_type([
socket/0,
+ socket_handle/0,
select_tag/0,
select_handle/0,
@@ -120,6 +123,9 @@
extended_err/0
]).
+%% We need #file_descriptor{} for sendfile/2,3,4,5
+-include("file_int.hrl").
+
%% Also in prim_socket
-define(REGISTRY, socket_registry).
@@ -130,22 +136,30 @@
iov_max := non_neg_integer(),
use_registry := boolean()}.
--type socket_counters() :: #{read_byte := non_neg_integer(),
- read_fails := non_neg_integer(),
- read_pkg := non_neg_integer(),
- read_pkg_max := non_neg_integer(),
- read_tries := non_neg_integer(),
- read_waits := non_neg_integer(),
- write_byte := non_neg_integer(),
- write_fails := non_neg_integer(),
- write_pkg := non_neg_integer(),
- write_pkg_max := non_neg_integer(),
- write_tries := non_neg_integer(),
- write_waits := non_neg_integer(),
- acc_success := non_neg_integer(),
- acc_fails := non_neg_integer(),
- acc_tries := non_neg_integer(),
- acc_waits := non_neg_integer()}.
+-type socket_counters() :: #{read_byte := non_neg_integer(),
+ read_fails := non_neg_integer(),
+ read_pkg := non_neg_integer(),
+ read_pkg_max := non_neg_integer(),
+ read_tries := non_neg_integer(),
+ read_waits := non_neg_integer(),
+ write_byte := non_neg_integer(),
+ write_fails := non_neg_integer(),
+ write_pkg := non_neg_integer(),
+ write_pkg_max := non_neg_integer(),
+ write_tries := non_neg_integer(),
+ write_waits := non_neg_integer(),
+ sendfile => non_neg_integer(),
+ sendfile_byte => non_neg_integer(),
+ sendfile_fails => non_neg_integer(),
+ sendfile_max => non_neg_integer(),
+ sendfile_pkg => non_neg_integer(),
+ sendfile_pkg_max => non_neg_integer(),
+ sendfile_tries => non_neg_integer(),
+ sendfile_waits => non_neg_integer(),
+ acc_success := non_neg_integer(),
+ acc_fails := non_neg_integer(),
+ acc_tries := non_neg_integer(),
+ acc_waits := non_neg_integer()}.
-type socket_info() :: #{domain := domain() | integer(),
type := type() | integer(),
@@ -500,7 +514,8 @@
%% Messages sent from the nif-code to erlang processes:
-define(socket_msg(Socket, Tag, Info), {?socket_tag, (Socket), (Tag), (Info)}).
--opaque socket() :: ?socket(reference()).
+-type socket() :: ?socket(socket_handle()).
+-opaque socket_handle() :: reference().
%% Some flags are used for send, others for recv, and yet again
%% others are found in a cmsg(). They may occur in multiple locations..
@@ -1128,7 +1143,7 @@ connect_deadline(SockRef, SockAddr, Deadline) ->
?socket_msg(_Socket, abort, {Ref, Reason}) ->
{error, Reason}
after Timeout ->
- cancel(SockRef, connect, Ref),
+ _ = cancel(SockRef, connect, Ref),
{error, timeout}
end;
Result ->
@@ -1257,7 +1272,7 @@ accept_deadline(LSockRef, Deadline) ->
?socket_msg(_Socket, abort, {AccRef, Reason}) ->
{error, Reason}
after Timeout ->
- cancel(LSockRef, accept, AccRef),
+ _ = cancel(LSockRef, accept, AccRef),
{error, timeout}
end;
Result ->
@@ -1270,7 +1285,8 @@ accept_result(LSockRef, AccRef, Result) ->
Socket = ?socket(SockRef),
{ok, Socket};
{error, _} = ERROR ->
- cancel(LSockRef, accept, AccRef), % Just to be on the safe side...
+ %% Just to be on the safe side...
+ _ = cancel(LSockRef, accept, AccRef),
ERROR
end.
@@ -1479,20 +1495,26 @@ send(Socket, Data, Timeout) ->
RestData :: binary(),
Reason :: posix() | 'closed' | invalid().
-send(?socket(SockRef), Data, ?SELECT_INFO({send, Cont}, _), Timeout)
+send(?socket(SockRef), Data, ?SELECT_INFO(SelectTag, _) = Cont, Timeout)
when is_reference(SockRef), is_binary(Data) ->
- case deadline(Timeout) of
- invalid ->
- erlang:error({invalid, {timeout, Timeout}});
- nowait ->
- SelectHandle = make_ref(),
- send_nowait_cont(SockRef, Data, Cont, SelectHandle);
- select_handle ->
- SelectHandle = Timeout,
- send_nowait_cont(SockRef, Data, Cont, SelectHandle);
- Deadline ->
- HasWritten = false,
- send_deadline_cont(SockRef, Data, Cont, Deadline, HasWritten)
+ case SelectTag of
+ {send, ContData} ->
+ case deadline(Timeout) of
+ invalid ->
+ erlang:error({invalid, {timeout, Timeout}});
+ nowait ->
+ SelectHandle = make_ref(),
+ send_nowait_cont(SockRef, Data, ContData, SelectHandle);
+ select_handle ->
+ SelectHandle = Timeout,
+ send_nowait_cont(SockRef, Data, ContData, SelectHandle);
+ Deadline ->
+ HasWritten = false,
+ send_deadline_cont(
+ SockRef, Data, ContData, Deadline, HasWritten)
+ end;
+ _ ->
+ {error, {invalid, Cont}}
end;
send(?socket(SockRef), Data, Flags, Timeout)
when is_reference(SockRef), is_binary(Data), is_list(Flags) ->
@@ -1553,10 +1575,10 @@ send_deadline_cont(SockRef, Bin, Cont, Deadline, HasWritten) ->
-compile({inline, [send_common_nowait_result/3]}).
send_common_nowait_result(SelectHandle, Op, Result) ->
case Result of
- {select, Cont} ->
- {select, ?SELECT_INFO({Op, Cont}, SelectHandle)};
- {select, Data, Cont} ->
- {ok, {Data, ?SELECT_INFO({Op, Cont}, SelectHandle)}};
+ {select, ContData} ->
+ {select, ?SELECT_INFO({Op, ContData}, SelectHandle)};
+ {select, Data, ContData} ->
+ {ok, {Data, ?SELECT_INFO({Op, ContData}, SelectHandle)}};
%%
Result ->
Result
@@ -1772,23 +1794,29 @@ sendto(Socket, Data, Dest, Flags) when is_list(Flags) ->
sendto(Socket, Data, Dest, Flags, ?ESOCK_SENDTO_TIMEOUT_DEFAULT);
sendto(
?socket(SockRef) = Socket, Data,
- ?SELECT_INFO({sendto, Cont}, _) = SI, Timeout)
+ ?SELECT_INFO(SelectTag, _) = Cont, Timeout)
when is_reference(SockRef) ->
- case Data of
- Bin when is_binary(Bin) ->
- sendto_timeout_cont(SockRef, Bin, Cont, Timeout);
- [Bin] when is_binary(Bin) ->
- sendto_timeout_cont(SockRef, Bin, Cont, Timeout);
- IOV when is_list(IOV) ->
- try erlang:list_to_binary(IOV) of
- Bin ->
- sendto_timeout_cont(SockRef, Bin, Cont, Timeout)
- catch
- error : badarg ->
- erlang:error({invalid, {data, Data}})
+ case SelectTag of
+ {sendto, ContData} ->
+ case Data of
+ Bin when is_binary(Bin) ->
+ sendto_timeout_cont(SockRef, Bin, ContData, Timeout);
+ [Bin] when is_binary(Bin) ->
+ sendto_timeout_cont(SockRef, Bin, ContData, Timeout);
+ IOV when is_list(IOV) ->
+ try erlang:list_to_binary(IOV) of
+ Bin ->
+ sendto_timeout_cont(
+ SockRef, Bin, ContData, Timeout)
+ catch
+ error : badarg ->
+ erlang:error({invalid, {data, Data}})
+ end;
+ _ ->
+ erlang:error(badarg, [Socket, Data, Cont, Timeout])
end;
_ ->
- erlang:error(badarg, [Socket, Data, SI, Timeout])
+ {error, {invalid, Cont}}
end;
sendto(Socket, Data, Dest, Timeout) ->
sendto(Socket, Data, Dest, ?ESOCK_SENDTO_FLAGS_DEFAULT, Timeout).
@@ -2136,15 +2164,20 @@ sendmsg(Socket, Msg, Timeout) ->
sendmsg(
?socket(SockRef) = Socket, RestData,
- ?SELECT_INFO({sendmsg, Cont}, _) = SI, Timeout) ->
+ ?SELECT_INFO(SelectTag, _) = Cont, Timeout) ->
%%
- case RestData of
- #{iov := IOV} ->
- sendmsg_timeout_cont(SockRef, IOV, Cont, Timeout);
- IOV when is_list(IOV) ->
- sendmsg_timeout_cont(SockRef, IOV, Cont, Timeout);
+ case SelectTag of
+ {sendmsg, ContData} ->
+ case RestData of
+ #{iov := IOV} ->
+ sendmsg_timeout_cont(SockRef, IOV, ContData, Timeout);
+ IOV when is_list(IOV) ->
+ sendmsg_timeout_cont(SockRef, IOV, ContData, Timeout);
+ _ ->
+ erlang:error(badarg, [Socket, RestData, Cont, Timeout])
+ end;
_ ->
- erlang:error(badarg, [Socket, RestData, SI, Timeout])
+ {error, {invalid, Cont}}
end;
sendmsg(?socket(SockRef), #{iov := IOV} = Msg, Flags, Timeout)
when is_reference(SockRef), is_list(Flags) ->
@@ -2206,6 +2239,275 @@ sendmsg_deadline_cont(SockRef, Data, Cont, Deadline, HasWritten) ->
%% ===========================================================================
%%
+%% sendfile - send a file on a socket
+%%
+
+sendfile(Socket, FileHandle) ->
+ sendfile(Socket, FileHandle, 0, 0, infinity).
+
+sendfile(Socket, FileHandle, Timeout) ->
+ sendfile(Socket, FileHandle, 0, 0, Timeout).
+
+sendfile(Socket, FileHandle_Cont, Offset, Count) ->
+ sendfile(Socket, FileHandle_Cont, Offset, Count, infinity).
+
+
+-spec sendfile(Socket, Cont, Offset, Count,
+ SelectHandle :: 'nowait') ->
+ {'ok', BytesSent} |
+ {'ok', {BytesSent, SelectInfo}} |
+ {'error', Reason}
+ when
+ Socket :: socket(),
+ Cont :: select_info(),
+ Offset :: integer(),
+ Count :: non_neg_integer(),
+ BytesSent :: non_neg_integer(),
+ SelectInfo :: select_info(),
+ Reason :: posix() | 'closed' | invalid();
+
+ (Socket, Cont, Offset, Count,
+ SelectHandle :: select_handle()) ->
+ {'ok', BytesSent} |
+ {'ok', {BytesSent, SelectInfo}} |
+ {'error', {Reason, BytesSent}}
+ when
+ Socket :: socket(),
+ Cont :: select_info(),
+ Offset :: integer(),
+ Count :: non_neg_integer(),
+ BytesSent :: non_neg_integer(),
+ SelectInfo :: select_info(),
+ Reason :: posix() | 'closed' | invalid();
+
+ (Socket, Cont, Offset, Count,
+ Timeout :: 'infinity') ->
+ {'ok', BytesSent} |
+ {'error', Reason} |
+ {'error', {Reason, BytesSent}}
+ when
+ Socket :: socket(),
+ Cont :: select_info(),
+ Offset :: integer(),
+ Count :: non_neg_integer(),
+ BytesSent :: non_neg_integer(),
+ Reason :: posix() | 'closed' | invalid();
+
+ (Socket, Cont, Offset, Count,
+ Timeout :: non_neg_integer()) ->
+ {'ok', BytesSent} |
+ {'error', Reason | 'timeout'} |
+ {'error', {Reason | 'timeout', BytesSent}}
+ when
+ Socket :: socket(),
+ Cont :: select_info(),
+ Offset :: integer(),
+ Count :: non_neg_integer(),
+ BytesSent :: non_neg_integer(),
+ Reason :: posix() | 'closed' | invalid();
+
+
+ (Socket, FileHandle, Offset, Count,
+ SelectHandle :: 'nowait') ->
+ {'ok', BytesSent} |
+ {'select', SelectInfo} |
+ {'ok', {BytesSent, SelectInfo}} |
+ {'error', Reason}
+ when
+ Socket :: socket(),
+ FileHandle :: file:fd(),
+ Offset :: integer(),
+ Count :: non_neg_integer(),
+ BytesSent :: non_neg_integer(),
+ SelectInfo :: select_info(),
+ Reason :: posix() | 'closed' | invalid();
+
+ (Socket, FileHandle, Offset, Count,
+ SelectHandle :: select_handle()) ->
+ {'ok', BytesSent} |
+ {'select', SelectInfo} |
+ {'ok', {BytesSent, SelectInfo}} |
+ {'error', Reason}
+ when
+ Socket :: socket(),
+ FileHandle :: file:fd(),
+ Offset :: integer(),
+ Count :: non_neg_integer(),
+ BytesSent :: non_neg_integer(),
+ SelectInfo :: select_info(),
+ Reason :: posix() | 'closed' | invalid();
+
+ (Socket, FileHandle, Offset, Count,
+ Timeout :: 'infinity') ->
+ {'ok', BytesSent} |
+ {'error', Reason} |
+ {'error', {Reason, BytesSent}}
+ when
+ Socket :: socket(),
+ FileHandle :: file:fd(),
+ Offset :: integer(),
+ Count :: non_neg_integer(),
+ BytesSent :: non_neg_integer(),
+ Reason :: posix() | 'closed' | invalid();
+
+ (Socket, FileHandle, Offset, Count,
+ Timeout :: non_neg_integer()) ->
+ {'ok', BytesSent} |
+ {'error', Reason | 'timeout'} |
+ {'error', {Reason | 'timeout', BytesSent}}
+ when
+ Socket :: socket(),
+ FileHandle :: file:fd(),
+ Offset :: integer(),
+ Count :: non_neg_integer(),
+ BytesSent :: non_neg_integer(),
+ Reason :: posix() | 'closed' | invalid().
+
+sendfile(
+ ?socket(SockRef) = Socket, FileHandle_Cont, Offset, Count, Timeout)
+ when is_integer(Offset), is_integer(Count), 0 =< Count ->
+ %%
+ case FileHandle_Cont of
+ #file_descriptor{module = Module} = FileHandle ->
+ GetFdData = get_fd_data,
+ try Module:GetFdData(FileHandle) of
+ #{handle := FRef} ->
+ State = {FRef, Offset, Count},
+ sendfile_int(SockRef, State, Timeout);
+ #{} ->
+ erlang:error(
+ badarg,
+ [Socket, FileHandle_Cont, Offset, Count, Timeout])
+ catch
+ Class : Reason : Stacktrace
+ when Class =:= error, Reason =:= undef ->
+ case Stacktrace of
+ [{Module, GetFdData, _, _} | _] ->
+ erlang:error(
+ badarg,
+ [Socket, FileHandle_Cont,
+ Offset, Count, Timeout]);
+ _ -> % Re-raise
+ erlang:raise(Class, Reason, Stacktrace)
+ end
+ end;
+ ?SELECT_INFO(SelectTag, _) = Cont ->
+ case SelectTag of
+ {sendfile, FRef} ->
+ State = {FRef, Offset, Count},
+ sendfile_int(SockRef, State, Timeout);
+ sendfile ->
+ State = {Offset, Count},
+ sendfile_int(SockRef, State, Timeout);
+ _ ->
+ {error, {invalid, Cont}}
+ end;
+ _ ->
+ erlang:error(
+ badarg, [Socket, FileHandle_Cont, Offset, Count, Timeout])
+ end;
+sendfile(Socket, FileHandle_Cont, Offset, Count, Timeout) ->
+ erlang:error(
+ badarg, [Socket, FileHandle_Cont, Offset, Count, Timeout]).
+
+sendfile_int(SockRef, State, Timeout) ->
+ case deadline(Timeout) of
+ invalid ->
+ erlang:error({invalid, {timeout, Timeout}});
+ nowait ->
+ SelectHandle = make_ref(),
+ sendfile_nowait(SockRef, State, SelectHandle);
+ select_handle ->
+ SelectHandle = Timeout,
+ sendfile_nowait(SockRef, State, SelectHandle);
+ Deadline ->
+ BytesSent = 0,
+ sendfile_deadline(SockRef, State, BytesSent, Deadline)
+ end.
+
+
+-compile({inline, [prim_socket_sendfile/3]}).
+prim_socket_sendfile(SockRef, {FRef, Offset, Count}, SelectHandle) ->
+ %% Start call
+ prim_socket:sendfile(SockRef, FRef, Offset, Count, SelectHandle);
+prim_socket_sendfile(SockRef, {Offset, Count}, SelectHandle) ->
+ %% Continuation call
+ prim_socket:sendfile(SockRef, Offset, Count, SelectHandle).
+
+sendfile_nowait(SockRef, State, SelectHandle) ->
+ case prim_socket_sendfile(SockRef, State, SelectHandle) of
+ select ->
+ %% Can only happen when we are enqueued after
+ %% a send in progress so BytesSent is 0;
+ %% wait for continuation and later repeat start call
+ {FRef, _Offset, _Count} = State,
+ {select, ?SELECT_INFO({sendfile, FRef}, SelectHandle)};
+ {select, BytesSent} ->
+ {ok,
+ {BytesSent, ?SELECT_INFO(sendfile, SelectHandle)}};
+ %%
+ Result ->
+ Result
+ end.
+
+sendfile_deadline(SockRef, State, BytesSent_0, Deadline) ->
+ SelectHandle = make_ref(),
+ case prim_socket_sendfile(SockRef, State, SelectHandle) of
+ select ->
+ %% Can only happen when we are enqueued after
+ %% a send in progress so BytesSent is 0;
+ %% wait for continuation and repeat start call
+ Timeout = timeout(Deadline),
+ receive
+ ?socket_msg(_Socket, select, SelectHandle) ->
+ sendfile_deadline(
+ SockRef, State, BytesSent_0, Deadline);
+ ?socket_msg(_Socket, abort, {SelectHandle, Reason}) ->
+ {error, Reason}
+ after Timeout ->
+ _ = cancel(SockRef, sendfile, SelectHandle),
+ {error, timeout}
+ end;
+ {select, BytesSent} ->
+ %% Partial send success; wait for continuation
+ Timeout = timeout(Deadline),
+ BytesSent_1 = BytesSent_0 + BytesSent,
+ receive
+ ?socket_msg(_Socket, select, SelectHandle) ->
+ sendfile_deadline(
+ SockRef,
+ sendfile_next(BytesSent, State),
+ BytesSent_1, Deadline);
+ ?socket_msg(_Socket, abort, {SelectHandle, Reason}) ->
+ {error, {Reason, BytesSent_1}}
+ after Timeout ->
+ _ = cancel(SockRef, sendfile, SelectHandle),
+ {error, {timeout, BytesSent_1}}
+ end;
+ {error, _} = Result when tuple_size(State) =:= 3 ->
+ Result;
+ {error, Reason} when tuple_size(State) =:= 2 ->
+ {error, {Reason, BytesSent_0}};
+ {ok, BytesSent} ->
+ {ok, BytesSent_0 + BytesSent}
+ end.
+
+sendfile_next(BytesSent, {_FRef, Offset, Count}) ->
+ sendfile_next(BytesSent, Offset, Count);
+sendfile_next(BytesSent, {Offset, Count}) ->
+ sendfile_next(BytesSent, Offset, Count).
+%%
+sendfile_next(BytesSent, Offset, Count) ->
+ {Offset + BytesSent,
+ if
+ Count =:= 0 ->
+ 0;
+ BytesSent < Count ->
+ Count - BytesSent
+ end}.
+
+%% ===========================================================================
+%%
%% recv, recvfrom, recvmsg - receive a message from a socket
%%
%% Description:
@@ -2495,7 +2797,7 @@ recv_deadline(SockRef, Length, Flags, Deadline, Acc) ->
?socket_msg(_Socket, abort, {SelectHandle, Reason}) ->
{error, {Reason, bincat(Acc, Bin)}}
after Timeout ->
- cancel(SockRef, recv, SelectHandle),
+ _ = cancel(SockRef, recv, SelectHandle),
{error, {timeout, bincat(Acc, Bin)}}
end;
%%
@@ -2503,7 +2805,7 @@ recv_deadline(SockRef, Length, Flags, Deadline, Acc) ->
%% We first got some data and are then asked to wait,
%% but we only want the first that comes
%% - cancel and return what we have
- cancel(SockRef, recv, SelectHandle),
+ _ = cancel(SockRef, recv, SelectHandle),
{ok, Acc};
select ->
%% There is nothing just now, but we will be notified when there
@@ -2522,7 +2824,7 @@ recv_deadline(SockRef, Length, Flags, Deadline, Acc) ->
?socket_msg(_Socket, abort, {SelectHandle, Reason}) ->
recv_error(Acc, Reason)
after Timeout ->
- cancel(SockRef, recv, SelectHandle),
+ _ = cancel(SockRef, recv, SelectHandle),
recv_error(Acc, timeout)
end;
Result ->
@@ -2785,7 +3087,7 @@ recvfrom_deadline(SockRef, BufSz, Flags, Deadline) ->
?socket_msg(_Socket, abort, {SelectHandle, Reason}) ->
{error, Reason}
after Timeout ->
- cancel(SockRef, recvfrom, SelectHandle),
+ _ = cancel(SockRef, recvfrom, SelectHandle),
{error, timeout}
end;
Result ->
@@ -3051,7 +3353,7 @@ recvmsg_deadline(SockRef, BufSz, CtrlSz, Flags, Deadline) ->
?socket_msg(_Socket, abort, {SelectHandle, Reason}) ->
{error, Reason}
after Timeout ->
- cancel(SockRef, recvmsg, SelectHandle),
+ _ = cancel(SockRef, recvmsg, SelectHandle),
{error, timeout}
end;
Result ->
@@ -3309,29 +3611,37 @@ peername(Socket) ->
SelectInfo :: select_info(),
Reason :: 'closed' | invalid().
-cancel(?socket(SockRef), ?SELECT_INFO(Tag, Ref))
+cancel(?socket(SockRef), ?SELECT_INFO(SelectTag, SelectHandle) = SelectInfo)
when is_reference(SockRef) ->
- case Tag of
- {OpName, _} when is_atom(OpName) ->
- cancel(SockRef, OpName, Ref);
- OpName when is_atom(OpName) ->
- cancel(SockRef, OpName, Ref)
+ case SelectTag of
+ {Op, _} when is_atom(Op) ->
+ ok;
+ Op when is_atom(Op) ->
+ ok
+ end,
+ case cancel(SockRef, Op, SelectHandle) of
+ ok ->
+ ok;
+ invalid ->
+ {error, {invalid, SelectInfo}};
+ Result ->
+ Result
end;
cancel(Socket, SelectInfo) ->
erlang:error(badarg, [Socket, SelectInfo]).
-cancel(SockRef, Op, OpRef) ->
- case prim_socket:cancel(SockRef, Op, OpRef) of
+cancel(SockRef, Op, SelectHandle) ->
+ case prim_socket:cancel(SockRef, Op, SelectHandle) of
select_sent ->
- flush_select_msg(SockRef, OpRef),
- _ = flush_abort_msg(SockRef, OpRef),
+ flush_select_msg(SockRef, SelectHandle),
+ _ = flush_abort_msg(SockRef, SelectHandle),
ok;
not_found ->
- _ = flush_abort_msg(SockRef, OpRef),
- {error, {invalid, ?SELECT_INFO(Op, OpRef)}};
+ _ = flush_abort_msg(SockRef, SelectHandle),
+ invalid;
Result ->
- _ = flush_abort_msg(SockRef, OpRef),
+ _ = flush_abort_msg(SockRef, SelectHandle),
Result
end.
diff --git a/lib/kernel/test/sendfile_SUITE.erl b/lib/kernel/test/sendfile_SUITE.erl
index a2a19caf35..7a734f056b 100644
--- a/lib/kernel/test/sendfile_SUITE.erl
+++ b/lib/kernel/test/sendfile_SUITE.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2011-2020. All Rights Reserved.
+%% Copyright Ericsson AB 2011-2021. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -90,21 +90,45 @@ init_per_testcase(TC,Config) when TC == t_sendfile_recvduring;
Send = fun(Sock) ->
{_Size, Data} = sendfile_file_info(Filename),
{ok,Fd} = file:open(Filename, [raw,binary,read]),
- %% Determine whether the driver has native support by
- %% hitting the raw module directly; file:sendfile/5 will
- %% land in the fallback if it doesn't.
- RawModule = Fd#file_descriptor.module,
- {ok, _Ignored} = RawModule:sendfile(Fd,Sock,0,0,0,[],[],[]),
- Data
+ case Sock of
+ {'$inet', Handler, _} ->
+ case
+ Handler:sendfile(Sock, Fd, 0, 0)
+ of
+ {ok, _BytesSent} ->
+ Data;
+ {error, enotsup} ->
+ error(notsup)
+ end;
+ _ when is_port(Sock) ->
+ %% Determine whether the driver
+ %% has native support by calling
+ %% the raw module directly; file:sendfile/5
+ %% would just use the fallback
+ %% and we would not be any wiser
+ RawModule = Fd#file_descriptor.module,
+ case
+ RawModule:sendfile(Fd,Sock,0,0,0,[],[],[])
+ of
+ {ok, _Ignored} ->
+ Data;
+ {error, enotsup} ->
+ error(notsup)
+ end
+ end
end,
%% Check if sendfile is supported on this platform
- case catch sendfile_send(Send) of
- ok ->
+ try sendfile_send(Send) of
+ ok ->
init_per_testcase(t_sendfile, Config);
- Error ->
- ct:log("Error: ~p",[Error]),
- {skip,"Not supported"}
+ Other ->
+ {fail,{not_ok,Other}}
+ catch
+ error : notsup ->
+ {skip,"Not supported"};
+ Class : Reason : Stacktrace ->
+ {fail,{Class, Reason, Stacktrace}}
end;
init_per_testcase(_TC,Config) ->
case read_fd_info() of
@@ -306,7 +330,7 @@ t_sendfile_sendduring(Config) ->
{ok, #file_info{size = Size}} =
file:read_file_info(Filename),
spawn_link(fun() ->
- timer:sleep(50),
+ erlang:yield(),
ok = gen_tcp:send(Sock, <<2>>)
end),
{ok, Size} = sendfile(Filename, Sock, SendfileOpts),
@@ -323,7 +347,7 @@ t_sendfile_recvduring(Config) ->
{ok, #file_info{size = Size}} =
file:read_file_info(Filename),
spawn_link(fun() ->
- timer:sleep(50),
+ timer:sleep(1),
ok = gen_tcp:send(Sock, <<1>>),
{ok,<<1>>} = gen_tcp:recv(Sock, 1)
end),
@@ -340,7 +364,7 @@ t_sendfile_closeduring(Config) ->
Send = fun(Sock,SFServPid) ->
spawn_link(fun() ->
- timer:sleep(50),
+ timer:sleep(1),
SFServPid ! stop
end),
case erlang:system_info(thread_pool_size) of
@@ -355,6 +379,12 @@ t_sendfile_closeduring(Config) ->
case sendfile(Filename, Sock, SendfileOpts) of
{error, closed} ->
ok;
+ {error, {closed, Size}}
+ when is_integer(Size) ->
+ ok;
+ {error, {epipe, Size}}
+ when is_integer(Size) ->
+ ok;
{ok, Size} when is_integer(Size) ->
ok
end
@@ -386,7 +416,7 @@ t_sendfile_crashduring(Config) ->
Send = fun(Sock) ->
spawn_link(fun() ->
- timer:sleep(50),
+ timer:sleep(1),
exit(die)
end),
{error, closed} = sendfile(Filename, Sock, SendfileOpts),
@@ -416,23 +446,24 @@ t_sendfile_arguments(Config) ->
{ok, Port} = inet:port(Listener),
ErrorCheck =
- fun(Reason, Offset, Length, Opts) ->
+ fun(Reasons, Offset, Length, Opts) ->
{ok, Sender} = gen_tcp:connect({127, 0, 0, 1}, Port,
[{packet, 0}, {active, false}]),
{ok, Receiver} = gen_tcp:accept(Listener),
{ok, Fd} = file:open(Filename, [read, raw]),
{error, Reason} = file:sendfile(Fd, Sender, Offset, Length, Opts),
+ true = lists:member(Reason, Reasons),
gen_tcp:close(Receiver),
gen_tcp:close(Sender),
file:close(Fd)
end,
- ErrorCheck(einval, -1, 0, []),
- ErrorCheck(einval, 0, -1, []),
- ErrorCheck(badarg, gurka, 0, []),
- ErrorCheck(badarg, 0, gurka, []),
- ErrorCheck(badarg, 0, 0, gurka),
- ErrorCheck(badarg, 0, 0, [{chunk_size, gurka}]),
+ ErrorCheck([einval,badarg], -1, 0, []),
+ ErrorCheck([einval,badarg], 0, -1, []),
+ ErrorCheck([badarg], gurka, 0, []),
+ ErrorCheck([badarg], 0, gurka, []),
+ ErrorCheck([badarg], 0, 0, gurka),
+ ErrorCheck([badarg], 0, 0, [{chunk_size, gurka}]),
gen_tcp:close(Listener),
@@ -523,9 +554,10 @@ sendfile(Filename,Sock,Opts) ->
{error, Reason} ->
{error, Reason};
{ok, Fd} ->
- Res = file:sendfile(Fd, Sock, 0, 0, Opts),
- _ = file:close(Fd),
- Res
+ try file:sendfile(Fd, Sock, 0, 0, Opts)
+ after
+ _ = file:close(Fd)
+ end
end.
%% This function returns the number of open fds on a system