diff options
author | Raimo Niskanen <raimo@erlang.org> | 2021-02-26 16:17:39 +0100 |
---|---|---|
committer | Raimo Niskanen <raimo@erlang.org> | 2021-04-23 13:15:21 +0200 |
commit | 78ce348efffc0d562994c68711e04b8587fbe5c6 (patch) | |
tree | bf8fcfce91d2b91e5b57f5a0810b7b9134421e63 | |
parent | 141fd67c2337245994f38d0259a4b79f041603ab (diff) | |
download | erlang-78ce348efffc0d562994c68711e04b8587fbe5c6.tar.gz |
Implement sendfile over NIF handshake
-rw-r--r-- | erts/emulator/nifs/common/prim_file_nif.c | 82 | ||||
-rw-r--r-- | erts/emulator/nifs/common/prim_file_nif.h | 4 | ||||
-rw-r--r-- | erts/emulator/nifs/common/prim_file_nif_dyncall.h | 36 | ||||
-rw-r--r-- | erts/emulator/nifs/common/prim_socket_nif.c | 1235 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_util.c | 4 | ||||
-rw-r--r-- | erts/emulator/nifs/unix/unix_prim_file.c | 18 | ||||
-rw-r--r-- | erts/emulator/nifs/win32/win_prim_file.c | 13 | ||||
-rw-r--r-- | erts/preloaded/ebin/prim_file.beam | bin | 32640 -> 32652 bytes | |||
-rw-r--r-- | erts/preloaded/ebin/prim_socket.beam | bin | 30856 -> 31540 bytes | |||
-rw-r--r-- | erts/preloaded/src/prim_file.erl | 5 | ||||
-rw-r--r-- | erts/preloaded/src/prim_socket.erl | 61 | ||||
-rw-r--r-- | lib/kernel/doc/src/file.xml | 10 | ||||
-rw-r--r-- | lib/kernel/doc/src/socket.xml | 371 | ||||
-rw-r--r-- | lib/kernel/src/file.erl | 91 | ||||
-rw-r--r-- | lib/kernel/src/gen_tcp.erl | 23 | ||||
-rw-r--r-- | lib/kernel/src/gen_tcp_socket.erl | 35 | ||||
-rw-r--r-- | lib/kernel/src/inet.erl | 15 | ||||
-rw-r--r-- | lib/kernel/src/socket.erl | 464 | ||||
-rw-r--r-- | lib/kernel/test/sendfile_SUITE.erl | 84 |
19 files changed, 2186 insertions, 365 deletions
diff --git a/erts/emulator/nifs/common/prim_file_nif.c b/erts/emulator/nifs/common/prim_file_nif.c index c01ad3616c..7d46035eb3 100644 --- a/erts/emulator/nifs/common/prim_file_nif.c +++ b/erts/emulator/nifs/common/prim_file_nif.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson 2017-2020. All Rights Reserved. + * Copyright Ericsson 2017-2021. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ #include "erl_driver.h" #include "prim_file_nif.h" +#include "prim_file_nif_dyncall.h" /* NIF interface declarations */ static int load(ErlNifEnv *env, void** priv_data, ERL_NIF_TERM load_info); @@ -220,6 +221,8 @@ static ErlNifPid erts_prim_file_pid; static void owner_death_callback(ErlNifEnv* env, void* obj, ErlNifPid* pid, ErlNifMonitor* mon); +static ErlNifResourceDynCall dyncall; + static int load(ErlNifEnv *env, void** priv_data, ERL_NIF_TERM prim_file_pid) { ErlNifResourceTypeInit callbacks; @@ -264,11 +267,13 @@ static int load(ErlNifEnv *env, void** priv_data, ERL_NIF_TERM prim_file_pid) am_cur = enif_make_atom(env, "cur"); am_eof = enif_make_atom(env, "eof"); - callbacks.down = owner_death_callback; - callbacks.dtor = NULL; - callbacks.stop = NULL; + callbacks.down = owner_death_callback; + callbacks.dtor = NULL; + callbacks.stop = NULL; + callbacks.dyncall = dyncall; + callbacks.members = 4; - efile_resource_type = enif_open_resource_type_x(env, "efile", &callbacks, + efile_resource_type = enif_init_resource_type(env, "efile", &callbacks, ERL_NIF_RT_CREATE, NULL); *priv_data = NULL; @@ -414,6 +419,66 @@ static void owner_death_callback(ErlNifEnv* env, void* obj, ErlNifPid* pid, ErlN } } +static void dyncall_dup(ErlNifEnv* env, efile_data_t* d, struct prim_file_nif_dyncall_dup *dc_dup) { + + enum efile_state_t previous_state; + + previous_state = erts_atomic32_cmpxchg_acqb(&d->state, + EFILE_STATE_BUSY, EFILE_STATE_IDLE); + + if (previous_state == EFILE_STATE_IDLE) { + int do_dup = !0; + + dc_dup->error = efile_get_handle(env, d, do_dup, &dc_dup->handle); + + previous_state = erts_atomic32_cmpxchg_relb(&d->state, + EFILE_STATE_IDLE, EFILE_STATE_BUSY); + + ASSERT(previous_state != EFILE_STATE_IDLE); + + if(previous_state == EFILE_STATE_CLOSE_PENDING) { + /* This is the only point where a change from CLOSE_PENDING is + * possible, and we're running synchronously, so we can't race with + * anything else here. */ + posix_errno_t ignored; + + erts_atomic32_set_acqb(&d->state, EFILE_STATE_CLOSED); + efile_close(d, &ignored); + } + } else { + /* CLOSE_PENDING should be impossible at this point since it requires + * a transition from BUSY; the only valid state here is CLOSED. */ + ASSERT(previous_state == EFILE_STATE_CLOSED); + + dc_dup->error = EINVAL; + } +} + +static void dyncall(ErlNifEnv* env, void* obj, void* data) { + efile_data_t *d = (efile_data_t*)obj; + struct prim_file_nif_dyncall *dc; + + for (dc = (struct prim_file_nif_dyncall *)data; + dc->size > 0; + dc = (struct prim_file_nif_dyncall *) + ((char *)dc + dc->size)) { + + switch (dc->op) { + + case prim_file_nif_dyncall_dup: { + struct prim_file_nif_dyncall_dup *dc_dup = + (struct prim_file_nif_dyncall_dup *)dc; + ASSERT(dc->size >= sizeof(*dc_dup)); + + dyncall_dup(env, d, dc_dup); + dc->completed = !0; + + return; + } + } // switch (dc->op) + } +} + static ERL_NIF_TERM efile_filetype_to_atom(enum efile_filetype_t type) { switch(type) { case EFILE_FILETYPE_DEVICE: return am_device; @@ -928,9 +993,14 @@ static ERL_NIF_TERM ipread_s32bu_p32bu_nif_impl(efile_data_t *d, ErlNifEnv *env, } static ERL_NIF_TERM get_handle_nif_impl(efile_data_t *d, ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + ERL_NIF_TERM result; + posix_errno_t error; ASSERT(argc == 0); - return efile_get_handle(env, d); + error = efile_get_handle(env, d, 0, &result); + ASSERT(error == 0); (void)error; + + return result; } static ERL_NIF_TERM build_file_info(ErlNifEnv *env, efile_fileinfo_t *info) { diff --git a/erts/emulator/nifs/common/prim_file_nif.h b/erts/emulator/nifs/common/prim_file_nif.h index d4f18fd494..111323b47c 100644 --- a/erts/emulator/nifs/common/prim_file_nif.h +++ b/erts/emulator/nifs/common/prim_file_nif.h @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson 2017-2018. All Rights Reserved. + * Copyright Ericsson 2017-2021. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -129,7 +129,7 @@ posix_errno_t efile_marshal_path(ErlNifEnv *env, ERL_NIF_TERM path, efile_path_t * * This is an internal function intended to support tests and tricky * operations like sendfile(2). */ -ERL_NIF_TERM efile_get_handle(ErlNifEnv *env, efile_data_t *d); +posix_errno_t efile_get_handle(ErlNifEnv *env, efile_data_t *d, int do_dup, ERL_NIF_TERM *handle); /** @brief Read until EOF or the given iovec has been filled. * diff --git a/erts/emulator/nifs/common/prim_file_nif_dyncall.h b/erts/emulator/nifs/common/prim_file_nif_dyncall.h new file mode 100644 index 0000000000..b7881fb0f4 --- /dev/null +++ b/erts/emulator/nifs/common/prim_file_nif_dyncall.h @@ -0,0 +1,36 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson 2021. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * %CopyrightEnd% + */ + +enum prim_file_nif_dyncall_op { + prim_file_nif_dyncall_dup, +}; + +struct prim_file_nif_dyncall { + size_t size; + enum prim_file_nif_dyncall_op op; + int completed; +}; + +struct prim_file_nif_dyncall_dup { + struct prim_file_nif_dyncall common; + + int error; + ERL_NIF_TERM handle; +}; diff --git a/erts/emulator/nifs/common/prim_socket_nif.c b/erts/emulator/nifs/common/prim_socket_nif.c index 459f75d540..3440ee3dce 100644 --- a/erts/emulator/nifs/common/prim_socket_nif.c +++ b/erts/emulator/nifs/common/prim_socket_nif.c @@ -81,7 +81,16 @@ #include <netpacket/packet.h> #endif -/* SENDFILE STUFF HERE IF WE NEED IT... */ +#ifdef HAVE_SENDFILE +#if defined(__linux__) || (defined(__sun) && defined(__SVR4)) + #include <sys/sendfile.h> +#elif defined(__FreeBSD__) || defined(__DragonFly__) + /* Need to define __BSD_VISIBLE in order to expose prototype + * of sendfile in sys/socket.h + */ + #define __BSD_VISIBLE 1 +#endif +#endif #if defined(__APPLE__) && defined(__MACH__) && !defined(__DARWIN__) #define __DARWIN__ 1 @@ -355,7 +364,7 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL; #include "socket_tarray.h" #include "socket_int.h" #include "socket_util.h" - +#include "prim_file_nif_dyncall.h" #if defined(ERTS_INLINE) # define ESOCK_INLINE ERTS_INLINE @@ -449,12 +458,17 @@ typedef ErlNifEvent HANDLE; #define ESOCK_STATE_ACCEPTING 0x0004 /* readState */ #define ESOCK_STATE_CONNECTING 0x0010 /* writeState */ #define ESOCK_STATE_CONNECTED 0x0020 /* writeState */ + /* This is set in either readState or writeState - * so it has to be read from both */ + * so it has to be read from both. + * Means that the socket has been used in select, + * so select_stop is required. */ #define ESOCK_STATE_SELECTED 0x0100 /* readState or writeState */ + /* These are set in both readState and writeState * so they can be read from either. */ #define ESOCK_STATE_CLOSING 0x0200 /* readState and writeState */ + #define ESOCK_STATE_CLOSED 0x0400 /* readState and writeState */ // #define ESOCK_STATE_DTOR 0x8000 @@ -799,6 +813,7 @@ typedef struct { ESockRequestQueueElement* last; } ESockRequestQueue; + /*** The point of this is primarily testing ***/ /* #if defined(ESOCK_COUNTER_SIZE) @@ -860,6 +875,25 @@ typedef Uint64 ESockCounter; // static const ESockCounter esock_counter_max = ESOCK_COUNTER_MAX; +#ifdef HAVE_SENDFILE + +typedef struct { + ESockCounter cnt; // Calls to OS sendfile() + ESockCounter byteCnt; // Bytes sent with sendfile + ESockCounter fails; // Failed sendfile operations + ESockCounter max; // Largest sendfile operation + ESockCounter maxCnt; // Counter for ="= + ESockCounter pkg; // Sendfile chunks + ESockCounter pkgMax; // Largest sendfile chunk + ESockCounter tries; // Started sendfile operations + ESockCounter waits; // Select's during sendfile +} ESockSendfileCounters; +static ESockSendfileCounters initESockSendfileCounters = + {0, 0, 0, 0, 0, 0, 0, 0, 0}; + +#endif + + typedef struct { /* * +++ This is a way to, possibly, detect memory overrides "and stuff" +++ @@ -888,7 +922,7 @@ typedef struct { /**/ unsigned int writeState; // For debugging ESockRequestor currentWriter; - ESockRequestor* currentWriterP; // NULL or points to currentWriter + ESockRequestor* currentWriterP; // NULL or ¤tWriter ESockRequestQueue writersQ; ESockCounter writePkgCnt; ESockCounter writePkgMax; @@ -897,9 +931,13 @@ typedef struct { ESockCounter writeTries; ESockCounter writeWaits; ESockCounter writeFails; +#ifdef HAVE_SENDFILE + SOCKET sendfileSock; + ESockSendfileCounters* sendfileCountersP; +#endif /* +++ Connector +++ */ ESockRequestor connector; - ESockRequestor* connectorP; + ESockRequestor* connectorP; // NULL or &connector /* +++ Config stuff +++ */ size_t wCtrlSz; // Write control buffer size ESockMeta meta; // Level 'otp' option 'meta' term @@ -909,7 +947,7 @@ typedef struct { /**/ unsigned int readState; // For debugging ESockRequestor currentReader; - ESockRequestor* currentReaderP; // NULL or points to currentReader + ESockRequestor* currentReaderP; // NULL or ¤tReader ESockRequestQueue readersQ; ErlNifBinary rbuffer; // DO WE NEED THIS Uint32 readCapacity; // DO WE NEED THIS @@ -922,7 +960,7 @@ typedef struct { ESockCounter readFails; /* +++ Accept stuff +++ */ ESockRequestor currentAcceptor; - ESockRequestor* currentAcceptorP; // NULL or points to currentAcceptor + ESockRequestor* currentAcceptorP; // NULL or ¤tAcceptor ESockRequestQueue acceptorsQ; ESockCounter accSuccess; ESockCounter accTries; @@ -1047,6 +1085,7 @@ extern char* erl_errno_id(int error); /* THIS IS JUST TEMPORARY??? */ * nif_send * nif_sendto * nif_sendmsg + * nif_sendfile * nif_recv * nif_recvfrom * nif_recvmsg @@ -1143,7 +1182,7 @@ ESOCK_SOCKET_INFO_REQ_FUNCS static ERL_NIF_TERM socket_info_reqs(ErlNifEnv* env, ESockDescriptor* descP, - ESockRequestor* crp, + ESockRequestor* currentRequestorP, ESockRequestQueue* q); static ERL_NIF_TERM esock_supports_0(ErlNifEnv* env); @@ -1257,6 +1296,56 @@ static ERL_NIF_TERM esock_sendmsg(ErlNifEnv* env, ERL_NIF_TERM eMsg, int flags, ERL_NIF_TERM eIOV); + +#ifdef HAVE_SENDFILE +static ERL_NIF_TERM +esock_sendfile_start(ErlNifEnv *env, + ESockDescriptor *descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM sendRef, + off_t offset, + size_t count, + ERL_NIF_TERM fRef); +static ERL_NIF_TERM +esock_sendfile_cont(ErlNifEnv *env, + ESockDescriptor *descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM sendRef, + off_t offset, + size_t count); +static ERL_NIF_TERM +esock_sendfile_deferred_close(ErlNifEnv *env, + ESockDescriptor *descP); +static int +esock_sendfile(ErlNifEnv *env, + ESockDescriptor *descP, + ERL_NIF_TERM sockRef, + off_t offset, + size_t *count, + int *errP); +static ERL_NIF_TERM +esock_sendfile_error(ErlNifEnv *env, + ESockDescriptor *descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM reason); +static ERL_NIF_TERM +esock_sendfile_errno(ErlNifEnv *env, + ESockDescriptor *descP, + ERL_NIF_TERM sockRef, + int err); +static ERL_NIF_TERM +esock_sendfile_ok(ErlNifEnv *env, + ESockDescriptor *descP, + ERL_NIF_TERM sockRef, + size_t count); +static ERL_NIF_TERM +esock_sendfile_select(ErlNifEnv *env, + ESockDescriptor *descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM sendRef, + size_t count); +#endif // #ifdef HAVE_SENDFILE + static ERL_NIF_TERM esock_recv(ErlNifEnv* env, ESockDescriptor* descP, ERL_NIF_TERM sendRef, @@ -2868,9 +2957,9 @@ static BOOLEAN_T recv_check_reader(ErlNifEnv* env, static void recv_init_current_reader(ErlNifEnv* env, ESockDescriptor* descP, ERL_NIF_TERM ref); -static ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef); +static void recv_update_current_reader(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef); static void recv_error_current_reader(ErlNifEnv* env, ESockDescriptor* descP, ERL_NIF_TERM sockRef, @@ -3172,7 +3261,7 @@ static void requestor_init(ESockRequestor* reqP); static int requestor_release(const char* slogan, ErlNifEnv* env, ESockDescriptor* descP, - ESockRequestor* reqP); + ESockRequestor* reqP); static BOOLEAN_T qsearch4pid(ErlNifEnv* env, ESockRequestQueue* q, @@ -3236,6 +3325,11 @@ static void esock_send_wrap_msg(ErlNifEnv* env, static void esock_send_close_msg(ErlNifEnv* env, ESockDescriptor* descP, ErlNifPid* pid); +#ifdef HAVE_SENDFILE +static void +esock_send_sendfile_deferred_close_msg(ErlNifEnv* env, + ESockDescriptor* descP); +#endif static void esock_send_abort_msg(ErlNifEnv* env, ESockDescriptor* descP, ERL_NIF_TERM sockRef, @@ -3626,6 +3720,7 @@ ERL_NIF_TERM esock_atom_socket_tag; // This has a "special" name ('$socket') LOCAL_ATOM_DECL(dont); \ LOCAL_ATOM_DECL(dtor); \ LOCAL_ATOM_DECL(dup); \ + LOCAL_ATOM_DECL(efile); \ LOCAL_ATOM_DECL(exclude); \ LOCAL_ATOM_DECL(false); \ LOCAL_ATOM_DECL(frag_needed); \ @@ -3690,6 +3785,7 @@ ERL_NIF_TERM esock_atom_socket_tag; // This has a "special" name ('$socket') LOCAL_ATOM_DECL(pkt_toobig); \ LOCAL_ATOM_DECL(policy_fail); \ LOCAL_ATOM_DECL(port_unreach); \ + LOCAL_ATOM_DECL(prim_file); \ LOCAL_ATOM_DECL(probe); \ LOCAL_ATOM_DECL(protocols); \ LOCAL_ATOM_DECL(rcvctrlbuf); \ @@ -3709,6 +3805,15 @@ ERL_NIF_TERM esock_atom_socket_tag; // This has a "special" name ('$socket') LOCAL_ATOM_DECL(selected); \ LOCAL_ATOM_DECL(sender_dry); \ LOCAL_ATOM_DECL(send_failure); \ + LOCAL_ATOM_DECL(sendfile); \ + LOCAL_ATOM_DECL(sendfile_byte); \ + LOCAL_ATOM_DECL(sendfile_deferred_close); \ + LOCAL_ATOM_DECL(sendfile_fails); \ + LOCAL_ATOM_DECL(sendfile_max); \ + LOCAL_ATOM_DECL(sendfile_pkg); \ + LOCAL_ATOM_DECL(sendfile_pkg_max); \ + LOCAL_ATOM_DECL(sendfile_tries); \ + LOCAL_ATOM_DECL(sendfile_waits); \ LOCAL_ATOM_DECL(shutdown); \ LOCAL_ATOM_DECL(slist); \ LOCAL_ATOM_DECL(sndctrlbuf); \ @@ -3806,6 +3911,9 @@ static ESOCK_INLINE ErlNifEnv* esock_alloc_env(const char* slogan) * nif_send(Sock, SendRef, Data, Flags) * nif_sendto(Sock, SendRef, Data, Dest, Flags) * nif_sendmsg(Sock, SendRef, Msg, Flags) + * nif_sendfile(Sock, SendRef, Offset, Count, InFileRef) + * nif_sendfile(Sock, SendRef, Offset, Count) + * nif_sendfile(Sock) * nif_recv(Sock, Length, Flags, RecvRef) * nif_recvfrom(Sock, RecvRef, BufSz, Flags) * nif_recvmsg(Sock, RecvRef, BufSz, CtrlSz, Flags) @@ -4261,6 +4369,40 @@ ERL_NIF_TERM esock_socket_info_counters(ErlNifEnv* env, ESOCK_ASSERT( numKeys == numVals ); ESOCK_ASSERT( MKMA(env, keys, vals, numKeys, &cnts) ); +#ifdef HAVE_SENDFILE + if (descP->sendfileCountersP != NULL) { + ESockSendfileCounters *cP = descP->sendfileCountersP; + ERL_NIF_TERM m, + sfKeys[] = + {atom_sendfile, + atom_sendfile_byte, + atom_sendfile_fails, + atom_sendfile_max, + atom_sendfile_pkg, + atom_sendfile_pkg_max, + atom_sendfile_tries, + atom_sendfile_waits}, + sfVals[] = + {MKUI(env, cP->cnt), + MKUI(env, cP->byteCnt), + MKUI(env, cP->fails), + MKUI(env, cP->max), + MKUI(env, cP->pkg), + MKUI(env, cP->pkgMax), + MKUI(env, cP->tries), + MKUI(env, cP->waits)}; + size_t n, numSfKeys = NUM(sfKeys); + + ESOCK_ASSERT( numSfKeys == NUM(sfVals) ); + for (n = 0; n < numSfKeys; n++) { + ESOCK_ASSERT( enif_make_map_put(env, cnts, + sfKeys[n], sfVals[n], + &m) ); + cnts = m; + } + } +#endif + SSDBG( descP, ("SOCKET", "esock_socket_info_counters -> done with" "\r\n cnts: %T" "\r\n", cnts) ); @@ -4439,7 +4581,7 @@ ERL_NIF_TERM esock_command_use_socket_registry(ErlNifEnv* env, #define ESOCK_INFO_REQ_FUNCS \ ESOCK_INFO_REQ_FUNC_DECL(readers, currentReaderP, readersQ) \ - ESOCK_INFO_REQ_FUNC_DECL(writers, currentWriterP, writersQ) \ + ESOCK_INFO_REQ_FUNC_DECL(writers, currentWriterP, writersQ) \ ESOCK_INFO_REQ_FUNC_DECL(acceptors, currentAcceptorP, acceptorsQ) #define ESOCK_INFO_REQ_FUNC_DECL(F, CRP, Q) \ @@ -4455,14 +4597,14 @@ ESOCK_INFO_REQ_FUNCS static ERL_NIF_TERM socket_info_reqs(ErlNifEnv* env, ESockDescriptor* descP, - ESockRequestor* crp, + ESockRequestor* currentRequestorP, ESockRequestQueue* q) { ESockRequestQueueElement* tmp; ERL_NIF_TERM info; unsigned int cnt = 0; - if (crp != NULL) { + if (currentRequestorP != NULL) { // We have an active requestor! cnt++; @@ -5588,8 +5730,8 @@ ERL_NIF_TERM esock_connect(ErlNifEnv* env, ESockAddress* addrP, SOCKLEN_T addrLen) { - int sres, save_errno; - ErlNifPid self; + int save_errno; + ErlNifPid self; ESOCK_ASSERT( enif_self(env, &self) != NULL ); @@ -5625,9 +5767,8 @@ ERL_NIF_TERM esock_connect(ErlNifEnv* env, /* Finalize after received select message */ requestor_release("esock_connect finalize -> connected", - env, descP, descP->connectorP); + env, descP, &descP->connector); descP->connectorP = NULL; - descP->writeState &= ~ESOCK_STATE_CONNECTING; if (! verify_is_connected(descP, &save_errno)) { @@ -5671,31 +5812,27 @@ ERL_NIF_TERM esock_connect(ErlNifEnv* env, ("SOCKET", "esock_connect {%d} -> would block => select\r\n", descP->sock) ); { - /* Initiate connector */ + int sres; + if ((sres = + esock_select_write(env, descP->sock, descP, NULL, + sockRef, connRef)) < 0) + return + enif_raise_exception(env, + MKT2(env, atom_select_write, + MKI(env, sres))); + /* Initiate connector */ descP->connector.pid = self; ESOCK_ASSERT( MONP("esock_connect -> conn", env, descP, &self, &descP->connector.mon) == 0 ); descP->connector.env = esock_alloc_env("connector"); - descP->connectorP = &descP->connector; descP->connector.ref = CP_TERM(descP->connector.env, connRef); + descP->connectorP = &descP->connector; + descP->writeState |= + (ESOCK_STATE_CONNECTING | ESOCK_STATE_SELECTED); - if ((sres = - esock_select_write(env, descP->sock, descP, NULL, - sockRef, connRef)) < 0) { - - requestor_release("esock_connect -> select failed", - env, descP, descP->connectorP); - descP->connectorP = NULL; - return enif_raise_exception(env, - MKT2(env, atom_select_write, - MKI(env, sres))); - } else { - descP->writeState |= - (ESOCK_STATE_CONNECTING | ESOCK_STATE_SELECTED); - return atom_select; - } + return atom_select; } break; @@ -5898,11 +6035,11 @@ ERL_NIF_TERM nif_accept(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "nif_accept%T), {%d,0x%X} ->" "\r\n ReqRef: %T" - "\r\n Current Acceptor Addr: %p" - "\r\n Current Acceptor pid: %T" - "\r\n Current Acceptor mon: %T" - "\r\n Current Acceptor env: 0x%lX" - "\r\n Current Acceptor ref: %T" + "\r\n Current Acceptor addr: %p" + "\r\n Current Acceptor pid: %T" + "\r\n Current Acceptor mon: %T" + "\r\n Current Acceptor env: 0x%lX" + "\r\n Current Acceptor ref: %T" "\r\n", sockRef, descP->sock, descP->readState, ref, @@ -6156,7 +6293,7 @@ ERL_NIF_TERM esock_accept_accepting_current_accept(ErlNifEnv* env, descP->readState &= ~ESOCK_STATE_ACCEPTING; - descP->currentAcceptorP = NULL; + descP->currentAcceptorP = NULL; } } @@ -6210,7 +6347,7 @@ ERL_NIF_TERM esock_accept_accepting_current_error(ErlNifEnv* env, ESOCK_CNT_INC(env, descP, sockRef, atom_acc_fails, &descP->accFails, 1); requestor_release("esock_accept_accepting_current_error", - env, descP, descP->currentAcceptorP); + env, descP, &descP->currentAcceptor); reason = MKA(env, erl_errno_id(save_errno)); res = esock_make_error(env, reason); @@ -6227,7 +6364,7 @@ ERL_NIF_TERM esock_accept_accepting_current_error(ErlNifEnv* env, DEMONP("esock_accept_accepting_current_error -> pop'ed writer", env, descP, &req.mon); } - descP->currentAcceptorP = NULL; + descP->currentAcceptorP = NULL; } return res; @@ -6289,7 +6426,7 @@ ERL_NIF_TERM esock_accept_busy_retry(ErlNifEnv* env, descP->readState &= ~ESOCK_STATE_ACCEPTING; - descP->currentAcceptorP = NULL; + descP->currentAcceptorP = NULL; } res = @@ -6514,9 +6651,6 @@ ERL_NIF_TERM esock_send(ErlNifEnv* env, return writerCheck; } - /* We ignore the wrap for the moment. - * Maybe we should issue a wrap-message to controlling process... - */ ESOCK_CNT_INC(env, descP, sockRef, atom_write_tries, &descP->writeTries, 1); send_result = @@ -6662,9 +6796,6 @@ ERL_NIF_TERM esock_sendto(ErlNifEnv* env, return writerCheck; } - /* We ignore the wrap for the moment. - * Maybe we should issue a wrap-message to controlling process... - */ ESOCK_CNT_INC(env, descP, sockRef, atom_write_tries, &descP->writeTries, 1); if (toAddrP != NULL) { @@ -6963,9 +7094,6 @@ ERL_NIF_TERM esock_sendmsg(ErlNifEnv* env, * but zero it just in case */ msgHdr.msg_flags = 0; - /* We ignore the wrap for the moment. - * Maybe we should issue a wrap-message to controlling process... - */ ESOCK_CNT_INC(env, descP, sockRef, atom_write_tries, &descP->writeTries, 1); /* And now, try to send the message */ @@ -7059,6 +7187,676 @@ ERL_NIF_TERM nwritev(ErlNifEnv* env, /* ---------------------------------------------------------------------- + * nif_sendfile/1,4,5 + * + * Description: + * Send a file on a socket + * + * Arguments: + * Socket (ref) - Points to the socket descriptor. + * + * SendRef - A unique id reference() for this (send) request. + * + * Offset - File offset to start from. + * Count - The number of bytes to send. + * + * InFileRef - A file NIF resource. + */ + +static ERL_NIF_TERM +nif_sendfile(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ +#if defined(__WIN32__) || !defined(HAVE_SENDFILE) + return enif_raise_exception(env, MKA(env, "notsup")); +#else + ESockDescriptor *descP; + ERL_NIF_TERM sockRef, res; + + SGDBG( ("SOCKET", "nif_sendfile -> entry with argc: %d\r\n", argc) ); + + if (argc < 1) { + SGDBG( ("SOCKET", "nif_sendfile -> argc < 1\r\n") ); + return enif_make_badarg(env); + } + sockRef = argv[0]; + if (! ESOCK_GET_RESOURCE(env, sockRef, (void**) (&descP))) { + SSDBG( descP, ("SOCKET", "nif_sendfile -> get resource failed\r\n") ); + return enif_make_badarg(env); + } + + if (argc < 2) { // argc == 1 + + MLOCK(descP->writeMtx); + + SSDBG( descP, + ("SOCKET", "nif_sendfile(%T), {%d,0x%X} ->" + "\r\n", + sockRef, descP->sock, descP->writeState) ); + + res = esock_sendfile_deferred_close(env, descP); + + } else { + ERL_NIF_TERM sendRef; + ErlNifSInt64 offset64; + ErlNifUInt64 count64u; + off_t offset; + size_t count; + BOOLEAN_T a2ok; + + ESOCK_ASSERT( argc >= 4 ); + + sendRef = argv[1]; + if ((! enif_is_ref(env, sendRef))) { + SGDBG( ("SOCKET", "nif_sendfile -> argv[1] decode failed\r\n") ); + return enif_make_badarg(env); + } + + if ((! (a2ok = GET_INT64(env, argv[2], &offset64))) || + (! GET_UINT64(env, argv[3], &count64u))) { + if ((! IS_INTEGER(env, argv[3])) || + (! IS_INTEGER(env, argv[3]))) + return enif_make_badarg(env); + if (! a2ok) + return esock_make_error_integer_range(env, argv[2]); + else + return esock_make_error_integer_range(env, argv[3]); + } + offset = (off_t) offset64; + if (offset64 != (ErlNifSInt64) offset) + return esock_make_error_integer_range(env, argv[2]); + count = (size_t) count64u; + if (count64u != (ErlNifUInt64) count) + return esock_make_error_integer_range(env, argv[3]); + + if (argc == 4) { + + MLOCK(descP->writeMtx); + + SSDBG( descP, + ("SOCKET", "nif_sendfile(%T), {%d,0x%X} ->" + "\r\n sendRef: %T" + "\r\n offset: %ld" + "\r\n count: %ld" + "\r\n", + sockRef, descP->sock, descP->readState, + sendRef, (long) offset, (long) count) ); + + res = + esock_sendfile_cont(env, descP, sockRef, sendRef, + offset, count); + } else { + ERL_NIF_TERM fRef; + + ESOCK_ASSERT( argc == 5 ); + + fRef = argv[4]; + if ((! enif_is_ref(env, fRef))) + return enif_make_badarg(env); + + MLOCK(descP->writeMtx); + + SSDBG( descP, + ("SOCKET", "nif_sendfile(%T), {%d,0x%X} ->" + "\r\n sendRef: %T" + "\r\n offset: %ld" + "\r\n count: %ld" + "\r\n fRef: %T" + "\r\n", + sockRef, descP->sock, descP->readState, + sendRef, (long) offset, (long) count, fRef) ); + + res = + esock_sendfile_start(env, descP, sockRef, sendRef, + offset, count, fRef); + } + } + + SSDBG( descP, ("SOCKET", "nif_sendfile(%T) -> done with" + "\r\n res: %T" + "\r\n", sockRef, res) ); + + MUNLOCK(descP->writeMtx); + + return res; + +#endif // #if defined(__WIN32__) || !defined(HAVE_SENDFILE) +} + +#ifndef __WIN32__ +#ifdef HAVE_SENDFILE + +/* Start a sendfile() operation + */ +static ERL_NIF_TERM +esock_sendfile_start(ErlNifEnv *env, + ESockDescriptor *descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM sendRef, + off_t offset, + size_t count, + ERL_NIF_TERM fRef) { + ERL_NIF_TERM writerCheck; + ssize_t res; + int err; + + SSDBG( descP, ("SOCKET", + "esock_sendfile_start(%T) {%d} -> sendRef: %T\r\n" + " fRef: %T\r\n" + " offset: %lu\r\n" + " count: %lu\r\n", + sockRef, descP->sock, sendRef, + fRef, (unsigned long) offset, (unsigned long) count) ); + + if (! IS_OPEN(descP->writeState)) { + return esock_make_error(env, atom_closed); + } + + /* Connect and Write uses the same select flag + * so they can not be simultaneous + */ + if (descP->connectorP != NULL) { + return esock_make_error_invalid(env, atom_state); + } + + /* Ensure that we either have no current writer or we are it, + * or enqueue this process if there is a current writer + */ + if (! send_check_writer(env, descP, sendRef, &writerCheck)) { + SSDBG( descP, ("SOCKET", + "esock_sendfile_start {%d} -> writer check failed: " + "\r\n %T\r\n", descP->sock, writerCheck) ); + + /* Returns 'select' if current process got enqueued, + * or exception invalid state if current process already + * was enqueued + */ + return writerCheck; + } + + if (descP->sendfileSock != INVALID_SOCKET) + return esock_make_error_invalid(env, atom_state); + + /* Get a dup:ed file handle from prim_file_nif + * through a NIF dyncall + */ + { + struct { + struct prim_file_nif_dyncall_dup dup; + struct prim_file_nif_dyncall end; + } dc; + ErlNifBinary dc_bin[1]; + + dc.dup.common.size = + CHARP(&dc.end) - CHARP(&dc.dup); + dc.dup.common.op = prim_file_nif_dyncall_dup; + dc.dup.common.completed = 0; + dc.end.size = 0; + + /* Request the handle */ + if ((enif_dynamic_resource_call(env, + atom_prim_file, atom_efile, fRef, + &dc) + != 0) || + (! dc.dup.common.completed)) { + // + goto error_invalid_efile; + } + if (dc.dup.error != 0) { + return + esock_sendfile_errno(env, descP, sockRef, + dc.dup.error); + } + if (! enif_inspect_binary(env, dc.dup.handle, dc_bin)) + goto error_invalid_efile; + if (dc_bin->size != sizeof(descP->sendfileSock)) { + enif_release_binary(dc_bin); + goto error_invalid_efile; + } + sys_memcpy(&descP->sendfileSock, dc_bin->data, + sizeof(descP->sendfileSock)); + enif_release_binary(dc_bin); + goto sendfile; + + error_invalid_efile: + return esock_sendfile_error(env, descP, sockRef, + MKT2(env, esock_atom_invalid, + atom_efile)); + } + + sendfile: + SSDBG( descP, ("SOCKET", + "esock_sendfile_start(%T) {%d} -> sendRef: %T\r\n" + " sendfileSock: %d\r\n", + sockRef, descP->sock, sendRef, + descP->sendfileSock) ); + + if (descP->sendfileCountersP == NULL) { + descP->sendfileCountersP = MALLOC(sizeof(ESockSendfileCounters)); + *descP->sendfileCountersP = initESockSendfileCounters; + } + + ESOCK_CNT_INC(env, descP, sockRef, + atom_sendfile_tries, &descP->sendfileCountersP->tries, 1); + descP->sendfileCountersP->maxCnt = 0; + + res = esock_sendfile(env, descP, sockRef, offset, &count, &err); + + if (res < 0) { // Terminal error + + (void) close(descP->sendfileSock); + descP->sendfileSock = INVALID_SOCKET; + + return esock_sendfile_errno(env, descP, sockRef, err); + + } else if (res > 0) { // Retry by select + + if (descP->currentWriterP == NULL) { + int mon_res; + + /* Register writer as current */ + ESOCK_ASSERT( enif_self(env, &descP->currentWriter.pid) != NULL ); + mon_res = + MONP("sendfile -> current writer", + env, descP, + &descP->currentWriter.pid, + &descP->currentWriter.mon); + ESOCK_ASSERT( mon_res >= 0 ); + + if (mon_res > 0) { + /* Caller died already, can happen for dirty NIFs */ + + (void) close(descP->sendfileSock); + descP->sendfileSock = INVALID_SOCKET; + + return + esock_sendfile_error(env, descP, sockRef, + MKT2(env, + esock_atom_invalid, + esock_atom_not_owner)); + } + ESOCK_ASSERT( descP->currentWriter.env == NULL ); + descP->currentWriter.env = esock_alloc_env("current-writer"); + descP->currentWriter.ref = + CP_TERM(descP->currentWriter.env, sendRef); + descP->currentWriterP = &descP->currentWriter; + } + // else current writer is already registered by requestor_pop() + + return esock_sendfile_select(env, descP, sockRef, sendRef, count); + + } else { // res == 0: Done + return esock_sendfile_ok(env, descP, sockRef, count); + } +} + +/* Continue an ongoing sendfile operation + */ +static ERL_NIF_TERM +esock_sendfile_cont(ErlNifEnv *env, + ESockDescriptor *descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM sendRef, + off_t offset, + size_t count) { + ErlNifPid caller; + ssize_t res; + int err; + + SSDBG( descP, ("SOCKET", + "esock_sendfile_cont(%T) {%d} -> sendRef: %T\r\n", + sockRef, descP->sock, sendRef) ); + + if (! IS_OPEN(descP->writeState)) + return esock_make_error(env, atom_closed); + + /* Connect and Write uses the same select flag + * so they can not be simultaneous + */ + if (descP->connectorP != NULL) + return esock_make_error_invalid(env, atom_state); + + /* Verify that this process has a sendfile operation in progress */ + ESOCK_ASSERT( enif_self(env, &caller) != NULL ); + if ((descP->currentWriterP == NULL) || + (descP->sendfileSock == INVALID_SOCKET) || + (COMPARE_PIDS(&descP->currentWriter.pid, &caller) != 0)) { + // + return esock_raise_invalid(env, atom_state); + } + + res = esock_sendfile(env, descP, sockRef, offset, &count, &err); + + if (res < 0) { // Terminal error + + (void) close(descP->sendfileSock); + descP->sendfileSock = INVALID_SOCKET; + + return esock_sendfile_errno(env, descP, sockRef, err); + + } else if (res > 0) { // Retry by select + + /* Overwrite current writer registration */ + enif_clear_env(descP->currentWriter.env); + descP->currentWriter.ref = + CP_TERM(descP->currentWriter.env, sendRef); + + return esock_sendfile_select(env, descP, sockRef, sendRef, count); + + } else { // res == 0: Done + return esock_sendfile_ok(env, descP, sockRef, count); + } +} + +/* Deferred close of the dup:ed file descriptor + */ +static ERL_NIF_TERM +esock_sendfile_deferred_close(ErlNifEnv *env, + ESockDescriptor *descP) { + if (descP->sendfileSock == INVALID_SOCKET) + return esock_make_error_invalid(env, atom_state); + + (void) close(descP->sendfileSock); + descP->sendfileSock = INVALID_SOCKET; + + return esock_atom_ok; +} + +/* Platform independent sendfile() function + * + * Return < 0 for terminal error + * 0 for done + * > 0 for retry with select + */ +static int +esock_sendfile(ErlNifEnv *env, + ESockDescriptor *descP, + ERL_NIF_TERM sockRef, + off_t offset, + size_t *countP, + int *errP) { + + size_t pkgSize = 0; // Total sent in this call + + SSDBG( descP, ("SOCKET", + "esock_sendfile(%T) {%d}\r\n", + sockRef, descP->sock) ); + + for (;;) { + size_t chunk_size = (size_t) 0x20000000UL; // 0.5 GB + size_t bytes_sent; + ssize_t res; + int error; + + /* *countP == 0 means send the whole file - use chunk size */ + if ((*countP > 0) && (*countP < chunk_size)) + chunk_size = *countP; + + { + /* Platform dependent code: + * set and check bytes_sent, or + * set res to >= 0, or + * set res to < 0 and error to sock_errno() + */ +#if defined (__linux__) + + off_t prev_offset; + + prev_offset = offset; + res = + sendfile(descP->sock, descP->sendfileSock, + &offset, chunk_size); + if (res < 0) + error = sock_errno(); + ESOCK_ASSERT( offset >= prev_offset ); + ESOCK_ASSERT( (off_t) chunk_size >= (offset - prev_offset) ); + bytes_sent = (size_t) (offset - prev_offset); + +#elif defined(__FreeBSD__) || defined(__DragonFly__) || defined(__DARWIN__) + + off_t sbytes; + +#if defined(__DARWIN__) + sbytes = (off_t) chunk_size; + res = (ssize_t) + sendfile(descP->sendfileSock, descP->sock, offset, + &sbytes, NULL, 0); +#else + res = (ssize_t) + sendfile(descP->sendfileSock, descP->sock, offset, + chunk_size, NULL, &sbytes, 0); +#endif + if (res < 0) + error = sock_errno(); + ESOCK_ASSERT( sbytes >= 0 ); + ESOCK_ASSERT( (off_t) chunk_size >= sbytes ); + bytes_sent = (size_t) sbytes; + +#elif defined(__sun) && defined(__SVR4) && defined(HAVE_SENDFILEV) + + sendfilevec_t sfvec[1]; + + sfvec[0].sfv_fd = descP->sendfileSock; + sfvec[0].sfv_flag = 0; + sfvec[0].sfv_off = offset; + sfvec[0].sfv_len = chunk_size; + + res = sendfilev(descP->sock, sfvec, NUM(sfvec), &bytes_sent); + if (res < 0) { + if ((error = sock_errno()) == EINVAL) { + /* On e.b SunOS 5.10 using sfv_len > file size + * lands here - we regard this as a succesful send + * by pretending it is an EINTR causing the loop + * to continue with more data up to end of file + */ + error = EINTR; + } + } + ESOCK_ASSERT( chunk_size >= bytes_sent ); + +#else +#error "Unsupported sendfile syscall; update configure test." +#endif + + ESOCK_CNT_INC(env, descP, sockRef, + atom_sendfile, &descP->sendfileCountersP->cnt, 1); + + if (bytes_sent != 0) { + + pkgSize += bytes_sent; + + ESOCK_CNT_INC(env, descP, sockRef, + atom_sendfile_pkg, + &descP->sendfileCountersP->pkg, + 1); + ESOCK_CNT_INC(env, descP, sockRef, + atom_sendfile_byte, + &descP->sendfileCountersP->byteCnt, + bytes_sent); + + if (pkgSize > descP->sendfileCountersP->pkgMax) + descP->sendfileCountersP->pkgMax = pkgSize; + if ((descP->sendfileCountersP->maxCnt += bytes_sent) + > descP->sendfileCountersP->max) + descP->sendfileCountersP->max = + descP->sendfileCountersP->maxCnt; + } + + /* *countP == 0 means send whole file */ + if ((*countP > 0) && + ((*countP -= bytes_sent) == 0)) { // All sent + *countP = pkgSize; + return 0; + } + + if (res < 0) { + if (error == ERRNO_BLOCK) { + *countP = pkgSize; + return 1; + } + if (error == EINTR) + continue; + *errP = error; + return INVALID_SOCKET; + } + + if (bytes_sent == 0) { // End of input file + *countP = pkgSize; + return 0; + } + } + } + return 0; +} + +static ERL_NIF_TERM +esock_sendfile_errno(ErlNifEnv *env, + ESockDescriptor *descP, + ERL_NIF_TERM sockRef, + int err) { + ERL_NIF_TERM reason; + + reason = MKA(env, erl_errno_id(err)); + return esock_sendfile_error(env, descP, sockRef, reason); +} + +static ERL_NIF_TERM +esock_sendfile_error(ErlNifEnv *env, + ESockDescriptor *descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM reason) { + + if (descP->sendfileCountersP == NULL) { + descP->sendfileCountersP = MALLOC(sizeof(ESockSendfileCounters)); + *descP->sendfileCountersP = initESockSendfileCounters; + } + + ESOCK_CNT_INC(env, descP, sockRef, + atom_sendfile_fails, + &descP->sendfileCountersP->fails, 1); + + SSDBG( descP, ("SOCKET", + "esock_sendfile_error(%T) {%d} -> error: %T\r\n", + sockRef, descP->sock, reason) ); + + /* XXX Should we have special treatment for EINVAL, + * such as to only fail current operation and activate + * the next from the queue? + */ + + if (descP->currentWriterP != NULL) { + + DEMONP("esock_sendfile_error", + env, descP, &descP->currentWriter.mon); + + /* Fail all queued writers */ + requestor_release("esock_sendfile_error", + env, descP, &descP->currentWriter); + send_error_waiting_writers(env, descP, sockRef, reason); + descP->currentWriterP = NULL; + + } + + return esock_make_error(env, reason); +} + +static ERL_NIF_TERM +esock_sendfile_select(ErlNifEnv *env, + ESockDescriptor *descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM sendRef, + size_t count) { + int sres; + + /* Select write for this process */ + sres = + esock_select_write(env, descP->sock, descP, NULL, sockRef, sendRef); + if (sres < 0) { + ERL_NIF_TERM reason; + + /* Internal select error */ + DEMONP("esock_sendfile_select - failed", + env, descP, &descP->currentWriter.mon); + + /* Fail all queued writers */ + reason = MKT2(env, atom_select_write, MKI(env, sres)); + requestor_release("esock_sendfile_select_fail", + env, descP, &descP->currentWriter); + send_error_waiting_writers(env, descP, sockRef, reason); + descP->currentWriterP = NULL; + + (void) close(descP->sendfileSock); + descP->sendfileSock = INVALID_SOCKET; + + return enif_raise_exception(env, reason); + + } else { + ErlNifUInt64 bytes_sent; + + SSDBG( descP, + ("SOCKET", "esock_sendfile_select(%T) {%d} -> " + "sendRef (%T)\r\n" + "count: %lu\r\n", + sockRef, descP->sock, sendRef, (unsigned long) count) ); + + ESOCK_CNT_INC(env, descP, sockRef, + atom_sendfile_waits, + &descP->sendfileCountersP->waits, + 1); + descP->writeState |= ESOCK_STATE_SELECTED; + bytes_sent = (ErlNifUInt64) count; + + return MKT2(env, atom_select, MKUI64(env, bytes_sent)); + } +} + +static ERL_NIF_TERM +esock_sendfile_ok(ErlNifEnv *env, + ESockDescriptor *descP, + ERL_NIF_TERM sockRef, + size_t count) { + ErlNifUInt64 bytes_sent64u; + + SSDBG( descP, + ("SOCKET", "esock_sendfile_ok(%T) {%d} -> " + "everything written (%lu) - done\r\n", + sockRef, descP->sock, (unsigned long) count) ); + + if (descP->currentWriterP != NULL) { + + DEMONP("esock_sendfile_ok -> current writer", + env, descP, &descP->currentWriter.mon); + + /* + * Ok, this write is done maybe activate the next (if any) + */ + if (! activate_next_writer(env, descP, sockRef)) { + + SSDBG( descP, + ("SOCKET", + "esock_sendfile_ok(%T) {%d} -> no more writers\r\n", + sockRef, descP->sock) ); + + descP->currentWriterP = NULL; + } + } + + descP->writePkgMaxCnt = 0; + bytes_sent64u = (ErlNifUInt64) count; + + (void) close(descP->sendfileSock); + descP->sendfileSock = INVALID_SOCKET; + + return esock_make_ok2(env, MKUI64(env, bytes_sent64u)); +} + +#endif // #ifdef HAVE_SENDFILE +#endif // #ifndef __WIN32__ + + + +/* ---------------------------------------------------------------------- * nif_recv * * Description: @@ -7089,6 +7887,7 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env, ssize_t len; /* ssize_t due to the return type of recv() */ int flags; ERL_NIF_TERM res; + BOOLEAN_T a1ok; ESOCK_ASSERT( argc == 4 ); @@ -7103,15 +7902,13 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env, (COMPARE(recvRef, atom_zero) != 0)) { return enif_make_badarg(env); } - if ((! GET_UINT64(env, argv[1], &elen)) || + if ((! (a1ok = GET_UINT64(env, argv[1], &elen))) || (! GET_INT(env, argv[2], &flags))) { - BOOLEAN_T argv1_is_integer = IS_INTEGER(env, argv[1]); - - if ((! argv1_is_integer) || + if ((! IS_INTEGER(env, argv[1])) || (! IS_INTEGER(env, argv[2]))) return enif_make_badarg(env); - if (argv1_is_integer) + if (! a1ok) return esock_make_error_integer_range(env, argv[1]); return esock_make_error_integer_range(env, argv[2]); @@ -7264,6 +8061,7 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, ssize_t len; /* ssize_t due to the return type of recvfrom() */ int flags; ERL_NIF_TERM res; + BOOLEAN_T a1ok; ESOCK_ASSERT( argc == 4 ); @@ -7283,15 +8081,13 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, return enif_make_badarg(env); } - if ((! GET_UINT64(env, argv[1], &elen)) || + if ((! (a1ok = GET_UINT64(env, argv[1], &elen))) || (! GET_INT(env, argv[2], &flags))) { - BOOLEAN_T argv1_is_integer = IS_INTEGER(env, argv[1]); - - if ((! argv1_is_integer) || + if ((! IS_INTEGER(env, argv[1])) || (! IS_INTEGER(env, argv[2]))) return enif_make_badarg(env); - if (argv1_is_integer) + if (! a1ok) return esock_make_error_integer_range(env, argv[1]); return esock_make_error_integer_range(env, argv[2]); @@ -7451,6 +8247,7 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, ssize_t bufSz, ctrlSz; int flags; ERL_NIF_TERM res; + BOOLEAN_T a1ok, a2ok; ESOCK_ASSERT( argc == 5 ); @@ -7470,21 +8267,17 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, return enif_make_badarg(env); } - if ((! GET_UINT64(env, argv[1], &eBufSz)) || - (! GET_UINT64(env, argv[2], &eCtrlSz)) || + if ((! (a1ok = GET_UINT64(env, argv[1], &eBufSz))) || + (! (a2ok = GET_UINT64(env, argv[2], &eCtrlSz))) || (! GET_INT(env, argv[3], &flags))) { - BOOLEAN_T - argv1_is_integer = IS_INTEGER(env, argv[1]), - argv2_is_integer; - - if ((! argv1_is_integer) || - (! (argv2_is_integer = IS_INTEGER(env, argv[2]))) || + if ((! IS_INTEGER(env, argv[1])) || + (! IS_INTEGER(env, argv[2])) || (! IS_INTEGER(env, argv[3]))) return enif_make_badarg(env); - if (argv1_is_integer) + if (! a1ok) return esock_make_error_integer_range(env, argv[1]); - if (argv2_is_integer) + if (! a2ok) return esock_make_error_integer_range(env, argv[2]); return esock_make_error_integer_range(env, argv[3]); @@ -7795,7 +8588,7 @@ BOOLEAN_T esock_do_stop(ErlNifEnv* env, esock_stop_handle_current(env, "writer", - descP, sockRef, descP->currentWriterP); + descP, sockRef, &descP->currentWriter); } /* Inform the waiting Writers (in the same way) */ @@ -7831,7 +8624,7 @@ BOOLEAN_T esock_do_stop(ErlNifEnv* env, esock_stop_handle_current(env, "connector", - descP, sockRef, descP->connectorP); + descP, sockRef, &descP->connector); } descP->connectorP = NULL; @@ -7852,7 +8645,7 @@ BOOLEAN_T esock_do_stop(ErlNifEnv* env, esock_stop_handle_current(env, "reader", - descP, sockRef, descP->currentReaderP); + descP, sockRef, &descP->currentReader); } /* Inform the Readers (in the same way) */ @@ -7889,7 +8682,7 @@ BOOLEAN_T esock_do_stop(ErlNifEnv* env, esock_stop_handle_current(env, "acceptor", - descP, sockRef, descP->currentAcceptorP); + descP, sockRef, &descP->currentAcceptor); } /* Inform the waiting Acceptor (in the same way) */ @@ -7995,17 +8788,19 @@ ERL_NIF_TERM esock_finalize_close(ErlNifEnv* env, // Close the socket /* Stop monitoring the closer. - * since it is the caller, demonitoring must succeed + * Demonitoring may fail since this is a dirty NIF + * - the caller may have died already. */ enif_set_pid_undefined(&descP->closerPid); if (descP->closerMon.isActive) { - ESOCK_ASSERT( DEMONP("esock_finalize_close -> closer", - env, descP, &descP->closerMon) == 0 ); + (void) DEMONP("esock_finalize_close -> closer", + env, descP, &descP->closerMon); } /* Stop monitoring the owner */ enif_set_pid_undefined(&descP->ctrlPid); - DEMONP("esock_finalize_close -> ctrl", env, descP, &descP->ctrlMon); + (void) DEMONP("esock_finalize_close -> ctrl", + env, descP, &descP->ctrlMon); /* Not impossible to still get a esock_down() call from a * just triggered owner monitor down */ @@ -8020,6 +8815,13 @@ ERL_NIF_TERM esock_finalize_close(ErlNifEnv* env, err = esock_close_socket(env, descP, TRUE); +#ifdef HAVE_SENDFILE + if (descP->sendfileSock != INVALID_SOCKET) { + (void) close(descP->sendfileSock); + descP->sendfileSock = INVALID_SOCKET; + } +#endif + if (err != 0) { if (err == ERRNO_BLOCK) { /* Not all data in the buffers where sent, @@ -11599,6 +12401,8 @@ ERL_NIF_TERM esock_cancel(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM opRef) { + int cmp; + /* <KOLLA> * * Do we really need all these variants? Should it not be enough with: @@ -11607,23 +12411,37 @@ ERL_NIF_TERM esock_cancel(ErlNifEnv* env, * * </KOLLA> */ - if (COMPARE(op, esock_atom_connect) == 0) { - return esock_cancel_connect(env, descP, opRef); - } else if (COMPARE(op, esock_atom_accept) == 0) { - return esock_cancel_accept(env, descP, sockRef, opRef); - } else if (COMPARE(op, esock_atom_send) == 0) { - return esock_cancel_send(env, descP, sockRef, opRef); - } else if (COMPARE(op, esock_atom_sendto) == 0) { - return esock_cancel_send(env, descP, sockRef, opRef); - } else if (COMPARE(op, esock_atom_sendmsg) == 0) { - return esock_cancel_send(env, descP, sockRef, opRef); - } else if (COMPARE(op, esock_atom_recv) == 0) { - return esock_cancel_recv(env, descP, sockRef, opRef); - } else if (COMPARE(op, esock_atom_recvfrom) == 0) { - return esock_cancel_recv(env, descP, sockRef, opRef); - } else if (COMPARE(op, esock_atom_recvmsg) == 0) { + + /* Hand crafted binary search */ + if ((cmp = COMPARE(op, esock_atom_recvmsg)) == 0) return esock_cancel_recv(env, descP, sockRef, opRef); + if (cmp < 0) { + if ((cmp = COMPARE(op, esock_atom_recv)) == 0) + return esock_cancel_recv(env, descP, sockRef, opRef); + if (cmp < 0) { + if (COMPARE(op, esock_atom_connect) == 0) + return esock_cancel_connect(env, descP, opRef); + if (COMPARE(op, esock_atom_accept) == 0) + return esock_cancel_accept(env, descP, sockRef, opRef); + } else { + if (COMPARE(op, esock_atom_recvfrom) == 0) + return esock_cancel_recv(env, descP, sockRef, opRef); + } } else { + if ((cmp = COMPARE(op, esock_atom_sendmsg)) == 0) + return esock_cancel_send(env, descP, sockRef, opRef); + if (cmp < 0) { + if (COMPARE(op, esock_atom_send) == 0) + return esock_cancel_send(env, descP, sockRef, opRef); + if (COMPARE(op, atom_sendfile) == 0) + return esock_cancel_send(env, descP, sockRef, opRef); + } else { + if (COMPARE(op, esock_atom_sendto) == 0) + return esock_cancel_send(env, descP, sockRef, opRef); + } + } + + { ERL_NIF_TERM result; const char *reason; @@ -11718,8 +12536,8 @@ ERL_NIF_TERM esock_cancel_accept(ErlNifEnv* env, "\r\n", sockRef, descP->sock, descP->readState, opRef, - ((descP->currentAcceptorP == NULL) ? - "without acceptor" : "with acceptor")) ); + ((descP->currentAcceptorP == NULL) + ? "without acceptor" : "with acceptor")) ); if (! IS_OPEN(descP->readState)) { @@ -11780,7 +12598,7 @@ ERL_NIF_TERM esock_cancel_accept_current(ErlNifEnv* env, descP->readState &= ~ESOCK_STATE_ACCEPTING; - descP->currentAcceptorP = NULL; + descP->currentAcceptorP = NULL; } return res; @@ -11837,7 +12655,8 @@ ERL_NIF_TERM esock_cancel_send(ErlNifEnv* env, "\r\n", sockRef, descP->sock, descP->writeState, opRef, - ((descP->currentWriterP == NULL) ? "without writer" : "with writer")) ); + ((descP->currentWriterP == NULL) + ? "without writer" : "with writer")) ); if (! IS_OPEN(descP->writeState)) { @@ -11952,7 +12771,8 @@ ERL_NIF_TERM esock_cancel_recv(ErlNifEnv* env, "\r\n", sockRef, descP->sock, descP->readState, opRef, - ((descP->currentReaderP == NULL) ? "without reader" : "with reader")) ); + ((descP->currentReaderP == NULL) + ? "without reader" : "with reader")) ); if (! IS_OPEN(descP->readState)) { @@ -12335,7 +13155,7 @@ ERL_NIF_TERM send_check_fail(ErlNifEnv* env, if (descP->currentWriterP != NULL) { requestor_release("send_check_fail", - env, descP, descP->currentWriterP); + env, descP, &descP->currentWriter); send_error_waiting_writers(env, descP, sockRef, reason); @@ -12457,7 +13277,7 @@ ERL_NIF_TERM send_check_retry(ErlNifEnv* env, descP->currentWriter.env = esock_alloc_env("current-writer"); descP->currentWriter.ref = CP_TERM(descP->currentWriter.env, sendRef); - descP->currentWriterP = &descP->currentWriter; + descP->currentWriterP = &descP->currentWriter; } else { /* Overwrite current writer registration */ enif_clear_env(descP->currentWriter.env); @@ -12475,7 +13295,19 @@ ERL_NIF_TERM send_check_retry(ErlNifEnv* env, sres = esock_select_write(env, descP->sock, descP, NULL, sockRef, sendRef); if (sres < 0) { + ERL_NIF_TERM reason; + /* Internal select error */ + DEMONP("send_check_retry - select error", + env, descP, &descP->currentWriter.mon); + + /* Fail all queued writers */ + reason = MKT2(env, atom_select_write, MKI(env, sres)); + requestor_release("send_check_retry - select error", + env, descP, &descP->currentWriter); + send_error_waiting_writers(env, descP, sockRef, reason); + descP->currentWriterP = NULL; + res = enif_raise_exception(env, MKT2(env, atom_select_write, @@ -12607,13 +13439,11 @@ void recv_init_current_reader(ErlNifEnv* env, * as the new current reader and a new (read) select will be done. */ #ifndef __WIN32__ -static -ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef) +static void +recv_update_current_reader(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef) { - ERL_NIF_TERM res = esock_atom_ok; - if (descP->currentReaderP != NULL) { DEMONP("recv_update_current_reader", @@ -12628,10 +13458,7 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, descP->currentReaderP = NULL; } - } - - return res; } #endif // #ifndef __WIN32__ @@ -12654,7 +13481,7 @@ void recv_error_current_reader(ErlNifEnv* env, ESockRequestor req; requestor_release("recv_error_current_reader", - env, descP, descP->currentReaderP); + env, descP, &descP->currentReader); req.env = NULL; /* read by reader_pop before free */ while (reader_pop(env, descP, &req)) { @@ -13077,9 +13904,11 @@ ERL_NIF_TERM recv_check_retry(ErlNifEnv* env, if ((sres = esock_select_read(env, descP->sock, descP, NULL, sockRef, recvRef)) < 0) { - /* Ouch - * Now what? We have copied ref into *its own* environment! + /* Unlikely that any next reader will have better luck, + * but why not give them a shot - the queue will be cleared */ + recv_update_current_reader(env, descP, sockRef); + res = enif_raise_exception(env, MKT2(env, atom_select_read, @@ -13240,6 +14069,10 @@ ERL_NIF_TERM recv_check_partial_part(ErlNifEnv* env, sres = esock_select_read(env, descP->sock, descP, NULL, sockRef, recvRef); if (sres < 0) { + /* Unlikely that any next reader will have better luck, + * but why not give them a shot - the queue will be cleared + */ + recv_update_current_reader(env, descP, sockRef); res = enif_raise_exception(env, @@ -15242,7 +16075,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) enif_alloc_resource(esocks, sizeof(ESockDescriptor))) != NULL ); - descP->pattern = ESOCK_DESC_PATTERN_CREATED; + descP->pattern = ESOCK_DESC_PATTERN_CREATED; requestor_init(&descP->connector); descP->connectorP = NULL; @@ -15254,13 +16087,19 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->currentWriterP = NULL; // currentWriter not used descP->writersQ.first = NULL; descP->writersQ.last = NULL; - descP->writePkgCnt = 0; - descP->writePkgMax = 0; - descP->writePkgMaxCnt = 0; - descP->writeByteCnt = 0; - descP->writeTries = 0; - descP->writeWaits = 0; - descP->writeFails = 0; + + descP->writePkgCnt = 0; + descP->writePkgMax = 0; + descP->writePkgMaxCnt = 0; + descP->writeByteCnt = 0; + descP->writeTries = 0; + descP->writeWaits = 0; + descP->writeFails = 0; + +#ifdef HAVE_SENDFILE + descP->sendfileSock = INVALID_SOCKET; + descP->sendfileCountersP = NULL; +#endif sprintf(buf, "esock.r[%d]", sock); descP->readMtx = MCREATE(buf); @@ -15269,6 +16108,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->currentReaderP = NULL; // currentReader not used descP->readersQ.first = NULL; descP->readersQ.last = NULL; + descP->readPkgCnt = 0; descP->readPkgMax = 0; descP->readPkgMaxCnt = 0; @@ -15276,6 +16116,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->readTries = 0; descP->readWaits = 0; descP->readFails = 0; + sprintf(buf, "esock.acc[%d]", sock); requestor_init(&descP->currentAcceptor); descP->currentAcceptorP = NULL; // currentAcceptor not used @@ -15679,6 +16520,27 @@ void esock_send_close_msg(ErlNifEnv* env, sockRef, descP->sock, MKPID(env, pid), msg) ); } } +#ifdef HAVE_SENDFILE +static void +esock_send_sendfile_deferred_close_msg(ErlNifEnv* env, + ESockDescriptor* descP) +{ + ERL_NIF_TERM sockRef, msg; + ErlNifPid *pid; + + pid = &data.regPid; + sockRef = enif_make_resource(env, descP); + msg = mk_reg_msg(env, atom_sendfile_deferred_close, sockRef); + + /* If this send should fail we have leaked a file descriptor + * (intolerable), and if we try to close it here, on a regular + * scheduler, it might hang "forever" due to e.g NFS + * (out of the question), so terminating the VM + * is the only viable option + */ + ESOCK_ASSERT( esock_send_msg(env, pid, msg, NULL) ); +} +#endif // #ifdef HAVE_SENDFILE #endif // #ifndef __WIN32__ @@ -16293,7 +17155,7 @@ static void requestor_init(ESockRequestor* reqP) { static int requestor_release(const char* slogan, ErlNifEnv* env, ESockDescriptor* descP, - ESockRequestor* reqP) { + ESockRequestor* reqP) { int res; enif_set_pid_undefined(&reqP->pid); @@ -16626,6 +17488,13 @@ void esock_dtor(ErlNifEnv* env, void* obj) SGDBG( ("SOCKET", "dtor -> try free acceptors request queue\r\n") ); free_request_queue(&descP->acceptorsQ); +#ifdef HAVE_SENDFILE + ESOCK_ASSERT( descP->sendfileSock == INVALID_SOCKET ); + if (descP->sendfileCountersP != NULL) + FREE(descP->sendfileCountersP); + descP->sendfileCountersP = NULL; +#endif + esock_free_env("dtor close env", descP->closeEnv); descP->closeEnv = NULL; @@ -16676,44 +17545,70 @@ void esock_stop(ErlNifEnv* env, void* obj, ErlNifEvent fd, int is_direct_call) "\r\n ctrlPid: %T" "\r\n closerPid: %T" "\r\ncounters:" - "\r\n writePkgCnt: %u" - "\r\n writePkgMax: %u" - "\r\n writeByteCnt: %u" - "\r\n writeTries: %u" - "\r\n writeWaits: %u" - "\r\n writeFails: %u" - "\r\n readPkgCnt: %u" - "\r\n readPkgMax: %u" - "\r\n readByteCnt: %u" - "\r\n readTries: %u" - "\r\n readWaits: %u" - "\r\n accSuccess: %u" - "\r\n accTries: %u" - "\r\n accWaits: %u" - "\r\n accFails: %u" + "\r\n writePkgCnt: %lu" + "\r\n writePkgMax: %lu" + "\r\n writeByteCnt: %lu" + "\r\n writeTries: %lu" + "\r\n writeWaits: %lu" + "\r\n writeFails: %lu" + "\r\n readPkgCnt: %lu" + "\r\n readPkgMax: %lu" + "\r\n readByteCnt: %lu" + "\r\n readTries: %lu" + "\r\n readWaits: %lu" + "\r\n accSuccess: %lu" + "\r\n accTries: %lu" + "\r\n accWaits: %lu" + "\r\n accFails: %lu" "\r\n", descP->sock, fd, (is_direct_call) ? "called" : "scheduled", descP->ctrlPid, descP->closerPid, // - descP->writePkgCnt, - descP->writePkgMax, - descP->writeByteCnt, - descP->writeTries, - descP->writeWaits, - descP->writeFails, + (unsigned long) descP->writePkgCnt, + (unsigned long) descP->writePkgMax, + (unsigned long) descP->writeByteCnt, + (unsigned long) descP->writeTries, + (unsigned long) descP->writeWaits, + (unsigned long) descP->writeFails, + (unsigned long) descP->readPkgCnt, + (unsigned long) descP->readPkgMax, + (unsigned long) descP->readByteCnt, + (unsigned long) descP->readTries, + (unsigned long) descP->readWaits, // - descP->readPkgCnt, - descP->readPkgMax, - descP->readByteCnt, - descP->readTries, - descP->readWaits, - // - descP->accSuccess, - descP->accTries, - descP->accWaits, - descP->accFails) ); + (unsigned long) descP->accSuccess, + (unsigned long) descP->accTries, + (unsigned long) descP->accWaits, + (unsigned long) descP->accFails) ); + +#ifdef HAVE_SENDFILE + if (descP->sendfileCountersP != NULL) { + ESockSendfileCounters *cP = descP->sendfileCountersP; + + SSDBG( descP, ("SOCKET", "esock_stop {%d/%d} ->" + "\r\nsendfileCounters:" + "\r\n cnt: %lu" + "\r\n byteCnt: %lu" + "\r\n fails: %lu" + "\r\n max: %lu" + "\r\n pkg: %lu" + "\r\n pkgMax %lu" + "\r\n tries: %lu" + "\r\n waits: %lu" + "\r\n", + descP->sock, fd, + (unsigned long) cP->cnt, + (unsigned long) cP->byteCnt, + (unsigned long) cP->fails, + (unsigned long) cP->max, + (unsigned long) cP->pkg, + (unsigned long) cP->pkgMax, + (unsigned long) cP->tries, + (unsigned long) cP->waits) ); + } +#endif if (is_direct_call) return; // Nothing to do, caller gets ERL_NIF_SELECT_STOP_SCHEDULED @@ -16747,6 +17642,11 @@ void esock_stop(ErlNifEnv* env, void* obj, ErlNifEvent fd, int is_direct_call) /* We do not have a closer process * - have to do an unclean (non blocking) close */ +#ifdef HAVE_SENDFILE + if (descP->sendfileSock != INVALID_SOCKET) + esock_send_sendfile_deferred_close_msg(env, descP); +#endif + err = esock_close_socket(env, descP, FALSE); if (err != 0) @@ -16924,6 +17824,11 @@ void esock_down(ErlNifEnv* env, * - we have to do an unclean (non blocking) socket close here */ +#ifdef HAVE_SENDFILE + if (descP->sendfileSock != INVALID_SOCKET) + esock_send_sendfile_deferred_close_msg(env, descP); +#endif + err = esock_close_socket(env, descP, FALSE); if (err != 0) esock_warning_msg("Failed closing socket for terminating " @@ -16985,11 +17890,9 @@ void esock_down(ErlNifEnv* env, */ requestor_release("esock_down->connector", - env, descP, descP->connectorP); - - descP->writeState &= ~ESOCK_STATE_CONNECTING; - + env, descP, &descP->connector); descP->connectorP = NULL; + descP->writeState &= ~ESOCK_STATE_CONNECTING; } else { ERL_NIF_TERM sockRef; @@ -17064,6 +17967,11 @@ void esock_down_ctrl(ErlNifEnv* env, * - we have to do an unclean (non blocking) socket close here */ +#ifdef HAVE_SENDFILE + if (descP->sendfileSock != INVALID_SOCKET) + esock_send_sendfile_deferred_close_msg(env, descP); +#endif + err = esock_close_socket(env, descP, FALSE); if (err != 0) esock_warning_msg("Failed closing socket for terminating " @@ -17159,7 +18067,7 @@ void esock_down_writer(ErlNifEnv* env, "esock_down_writer(%T) {%d} -> no active writer\r\n", sockRef, descP->sock) ); - descP->currentWriterP = NULL; + descP->currentWriterP = NULL; } } else { @@ -17209,7 +18117,7 @@ void esock_down_reader(ErlNifEnv* env, "esock_down_reader(%T) {%d} -> no more readers\r\n", sockRef, descP->sock) ); - descP->currentReaderP = NULL; + descP->currentReaderP = NULL; } } else { @@ -17255,6 +18163,9 @@ ErlNifFunc esock_funcs[] = {"nif_send", 4, nif_send, 0}, {"nif_sendto", 5, nif_sendto, 0}, {"nif_sendmsg", 5, nif_sendmsg, 0}, + {"nif_sendfile", 5, nif_sendfile, ERL_NIF_DIRTY_JOB_IO_BOUND}, + {"nif_sendfile", 4, nif_sendfile, ERL_NIF_DIRTY_JOB_IO_BOUND}, + {"nif_sendfile", 1, nif_sendfile, ERL_NIF_DIRTY_JOB_IO_BOUND}, {"nif_recv", 4, nif_recv, 0}, {"nif_recvfrom", 4, nif_recvfrom, 0}, {"nif_recvmsg", 5, nif_recvmsg, 0}, diff --git a/erts/emulator/nifs/common/socket_util.c b/erts/emulator/nifs/common/socket_util.c index 61ea615de8..d03a593238 100644 --- a/erts/emulator/nifs/common/socket_util.c +++ b/erts/emulator/nifs/common/socket_util.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 2018-2020. All Rights Reserved. + * Copyright Ericsson AB 2018-2021. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -1754,7 +1754,7 @@ ERL_NIF_TERM esock_make_error_errno(ErlNifEnv* env, int err) /* Create an error two (2) tuple in the form: * - * {error, {invalid, {What, Info}}} + * {error, {invalid, What}}} */ extern ERL_NIF_TERM esock_make_error_invalid(ErlNifEnv* env, ERL_NIF_TERM what) diff --git a/erts/emulator/nifs/unix/unix_prim_file.c b/erts/emulator/nifs/unix/unix_prim_file.c index 1efde4c0cf..770dcf91ab 100644 --- a/erts/emulator/nifs/unix/unix_prim_file.c +++ b/erts/emulator/nifs/unix/unix_prim_file.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson 2017-2020. All Rights Reserved. + * Copyright Ericsson 2017-2021. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -95,16 +95,22 @@ posix_errno_t efile_marshal_path(ErlNifEnv *env, ERL_NIF_TERM path, efile_path_t return 0; } -ERL_NIF_TERM efile_get_handle(ErlNifEnv *env, efile_data_t *d) { +posix_errno_t efile_get_handle(ErlNifEnv *env, efile_data_t *d, int do_dup, ERL_NIF_TERM *handle) { efile_unix_t *u = (efile_unix_t*)d; - ERL_NIF_TERM result; unsigned char *bits; + int fd; - bits = enif_make_new_binary(env, sizeof(u->fd), &result); - memcpy(bits, &u->fd, sizeof(u->fd)); + if (do_dup) { + if ((fd = dup(u->fd)) < 0) + return errno; + } else { + fd = u->fd; + } + bits = enif_make_new_binary(env, sizeof(fd), handle); + memcpy(bits, &fd, sizeof(fd)); - return result; + return 0; } static int open_file_is_dir(const efile_path_t *path, int fd) { diff --git a/erts/emulator/nifs/win32/win_prim_file.c b/erts/emulator/nifs/win32/win_prim_file.c index 72e1e30543..8307e03f8a 100644 --- a/erts/emulator/nifs/win32/win_prim_file.c +++ b/erts/emulator/nifs/win32/win_prim_file.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson 2017-2020. All Rights Reserved. + * Copyright Ericsson 2017-2021. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -216,16 +216,19 @@ posix_errno_t efile_marshal_path(ErlNifEnv *env, ERL_NIF_TERM path, efile_path_t return get_full_path(env, (WCHAR*)raw_path.data, result); } -ERL_NIF_TERM efile_get_handle(ErlNifEnv *env, efile_data_t *d) { +posix_errno_t efile_get_handle(ErlNifEnv *env, efile_data_t *d, int do_dup, ERL_NIF_TERM *handle) { efile_win_t *w = (efile_win_t*)d; - ERL_NIF_TERM result; unsigned char *bits; - bits = enif_make_new_binary(env, sizeof(w->handle), &result); + if (do_dup) { + /* XXX not implemented yet */ + return ENOTSUP; + } + bits = enif_make_new_binary(env, sizeof(w->handle), handle); memcpy(bits, &w->handle, sizeof(w->handle)); - return result; + return 0; } /** @brief Converts a native path to the preferred form in "erlang space," diff --git a/erts/preloaded/ebin/prim_file.beam b/erts/preloaded/ebin/prim_file.beam Binary files differindex fc6e1ece14..9597015bb6 100644 --- a/erts/preloaded/ebin/prim_file.beam +++ b/erts/preloaded/ebin/prim_file.beam diff --git a/erts/preloaded/ebin/prim_socket.beam b/erts/preloaded/ebin/prim_socket.beam Binary files differindex 5f74928eee..a31c87efd3 100644 --- a/erts/preloaded/ebin/prim_socket.beam +++ b/erts/preloaded/ebin/prim_socket.beam diff --git a/erts/preloaded/src/prim_file.erl b/erts/preloaded/src/prim_file.erl index 3a6a573b89..9e9b2ef5c2 100644 --- a/erts/preloaded/src/prim_file.erl +++ b/erts/preloaded/src/prim_file.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2000-2020. All Rights Reserved. +%% Copyright Ericsson AB 2000-2021. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -30,7 +30,8 @@ -export([file_desc_to_ref/2]). --export([ipread_s32bu_p32bu/3, sendfile/8, altname/1, get_handle/1]). +-export([ipread_s32bu_p32bu/3, sendfile/8, altname/1, + get_handle/1, get_fd_data/1]). -export([read_file/1, write_file/2]). diff --git a/erts/preloaded/src/prim_socket.erl b/erts/preloaded/src/prim_socket.erl index d815eea84e..45c2e79478 100644 --- a/erts/preloaded/src/prim_socket.erl +++ b/erts/preloaded/src/prim_socket.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2018-2020. All Rights Reserved. +%% Copyright Ericsson AB 2018-2021. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ listen/2, accept/2, send/4, sendto/4, sendto/5, sendmsg/4, sendmsg/5, + sendfile/4, sendfile/5, sendfile_deferred_close/1, recv/4, recvfrom/4, recvmsg/5, close/1, finalize_close/1, shutdown/2, @@ -631,6 +632,15 @@ invalid_iov([H|IOV], N) -> invalid_iov(_, N) -> {improper_list, N}. +sendfile(SockRef, Offset, Count, SendRef) -> + nif_sendfile(SockRef, SendRef, Offset, Count). + +sendfile(SockRef, FileRef, Offset, Count, SendRef) -> + nif_sendfile(SockRef, SendRef, Offset, Count, FileRef). + +sendfile_deferred_close(SockRef) -> + nif_sendfile(SockRef). + %% ---------------------------------- recv(SockRef, Length, Flags, RecvRef) -> @@ -971,7 +981,7 @@ p_get(Name) -> %% nif_info() -> erlang:nif_error(undef). -nif_info(_SRef) -> erlang:nif_error(undef). +nif_info(_SockRef) -> erlang:nif_error(undef). nif_command(_Command) -> erlang:nif_error(undef). @@ -981,35 +991,42 @@ nif_supports(_Key) -> erlang:nif_error(undef). nif_open(_FD, _Opts) -> erlang:nif_error(undef). nif_open(_Domain, _Type, _Protocol, _Opts) -> erlang:nif_error(undef). -nif_bind(_SRef, _SockAddr) -> erlang:nif_error(undef). -nif_bind(_SRef, _SockAddrs, _Action) -> erlang:nif_error(undef). +nif_bind(_SockRef, _SockAddr) -> erlang:nif_error(undef). +nif_bind(_SockRef, _SockAddrs, _Action) -> erlang:nif_error(undef). -nif_connect(_SRef) -> erlang:nif_error(undef). -nif_connect(_SRef, _ConnectRef, _SockAddr) -> erlang:nif_error(undef). +nif_connect(_SockRef) -> erlang:nif_error(undef). +nif_connect(_SockRef, _ConnectRef, _SockAddr) -> erlang:nif_error(undef). -nif_listen(_SRef, _Backlog) -> erlang:nif_error(undef). +nif_listen(_SockRef, _Backlog) -> erlang:nif_error(undef). -nif_accept(_SRef, _Ref) -> erlang:nif_error(undef). +nif_accept(_SockRef, _Ref) -> erlang:nif_error(undef). nif_send(_SockRef, _Bin, _Flags, _SendRef) -> erlang:nif_error(undef). -nif_sendto(_SRef, _Bin, _Dest, _Flags, _SendRef) -> erlang:nif_error(undef). -nif_sendmsg(_SRef, _Msg, _Flags, _SendRef, _IOV) -> erlang:nif_error(undef). +nif_sendto(_SockRef, _Bin, _Dest, _Flags, _SendRef) -> erlang:nif_error(undef). +nif_sendmsg(_SockRef, _Msg, _Flags, _SendRef, _IOV) -> erlang:nif_error(undef). + +nif_sendfile(_SockRef, _SendRef, _Offset, _Count, _InFileRef) -> + erlang:nif_error(undef). +nif_sendfile(_SockRef, _SendRef, _Offset, _Count) -> + erlang:nif_error(undef). +nif_sendfile(_SockRef) -> erlang:nif_error(undef). -nif_recv(_SRef, _Length, _Flags, _RecvRef) -> erlang:nif_error(undef). -nif_recvfrom(_SRef, _Length, _Flags, _RecvRef) -> erlang:nif_error(undef). -nif_recvmsg(_SRef, _BufSz, _CtrlSz, _Flags, _RecvRef) -> erlang:nif_error(undef). +nif_recv(_SockRef, _Length, _Flags, _RecvRef) -> erlang:nif_error(undef). +nif_recvfrom(_SockRef, _Length, _Flags, _RecvRef) -> erlang:nif_error(undef). +nif_recvmsg(_SockRef, _BufSz, _CtrlSz, _Flags, _RecvRef) -> + erlang:nif_error(undef). -nif_close(_SRef) -> erlang:nif_error(undef). -nif_finalize_close(_SRef) -> erlang:nif_error(undef). -nif_shutdown(_SRef, _How) -> erlang:nif_error(undef). +nif_close(_SockRef) -> erlang:nif_error(undef). +nif_finalize_close(_SockRef) -> erlang:nif_error(undef). +nif_shutdown(_SockRef, _How) -> erlang:nif_error(undef). -nif_setopt(_Ref, _Lev, _Opt, _Val, _NativeVal) -> erlang:nif_error(undef). -nif_getopt(_Ref, _Lev, _Opt) -> erlang:nif_error(undef). -nif_getopt(_Ref, _Lev, _Opt, _ValSpec) -> erlang:nif_error(undef). +nif_setopt(_SockRef, _Lev, _Opt, _Val, _NativeVal) -> erlang:nif_error(undef). +nif_getopt(_SockRef, _Lev, _Opt) -> erlang:nif_error(undef). +nif_getopt(_SockRef, _Lev, _Opt, _ValSpec) -> erlang:nif_error(undef). -nif_sockname(_Ref) -> erlang:nif_error(undef). -nif_peername(_Ref) -> erlang:nif_error(undef). +nif_sockname(_SockRef) -> erlang:nif_error(undef). +nif_peername(_SockRef) -> erlang:nif_error(undef). -nif_cancel(_SRef, _Op, _Ref) -> erlang:nif_error(undef). +nif_cancel(_SockRef, _Op, _SelectRef) -> erlang:nif_error(undef). %% =========================================================================== diff --git a/lib/kernel/doc/src/file.xml b/lib/kernel/doc/src/file.xml index 7332f0ca3b..f6ef1892c5 100644 --- a/lib/kernel/doc/src/file.xml +++ b/lib/kernel/doc/src/file.xml @@ -4,7 +4,7 @@ <erlref> <header> <copyright> - <year>1996</year><year>2020</year> + <year>1996</year><year>2021</year> <holder>Ericsson AB. All Rights Reserved.</holder> </copyright> <legalnotice> @@ -1857,7 +1857,13 @@ f.txt: {person, "kalle", 25}. <c>0</c> all data after the specified <c>Offset</c> is sent.</p> <p>The file used must be opened using the <c>raw</c> flag, and the process calling <c>sendfile</c> must be the controlling process of the socket. - See <seemfa marker="gen_tcp#controlling_process/2"><c>gen_tcp:controlling_process/2</c></seemfa>.</p> + See <seemfa marker="gen_tcp#controlling_process/2"><c>gen_tcp:controlling_process/2</c></seemfa> + or module + <seemfa marker="socket#setopt/3"><c>socket</c>'s</seemfa> + <seetype marker="socket#otp_socket_option"> + level <c>otp</c> socket option + </seetype> + <c>controlling_process</c>.</p> <p>If the OS used does not support non-blocking <c>sendfile</c>, an Erlang fallback using <seemfa marker="#read/2"><c>read/2</c></seemfa> and <seemfa marker="gen_tcp#send/2"><c>gen_tcp:send/2</c></seemfa> is diff --git a/lib/kernel/doc/src/socket.xml b/lib/kernel/doc/src/socket.xml index 96bcfa32fa..57752fb912 100644 --- a/lib/kernel/doc/src/socket.xml +++ b/lib/kernel/doc/src/socket.xml @@ -63,12 +63,16 @@ <item><c>{'$socket', socket(), select, SelectHandle}</c></item> </taglist> <p> - The caller can now make new call - to the recv function and expect data. + The caller can now call the <c>recv</c> function again + and probably expect data + (it is really up to the OS network protocol implementation). </p> <p> Note that all other users are <em>locked out</em> until the - 'current user' has called the function (recv in this case). + 'current user' has called the function (<c>recv</c> in this case) + and its return value shows that the operation has completed. + An operation can also be cancelled with + <seemfa marker="#cancel/2"><c>cancel/2</c></seemfa>. </p> <p> Instead of <c>Timeout = nowait</c> it is equivalent to create a @@ -160,13 +164,17 @@ </desc> </datatype> <datatype> - <name>socket()</name> + <name name="socket"/> <desc><p>As returned by <seemfa marker="#open/1"><c>open/1,2,3,4</c></seemfa> and <seemfa marker="#accept/1"><c>accept/1,2</c></seemfa>.</p> </desc> </datatype> <datatype> + <name name="socket_handle"/> + <desc><p>An opaque socket handle unique for the socket.</p></desc> + </datatype> + <datatype> <name name="select_tag"/> <desc> <p> @@ -1374,6 +1382,12 @@ <c>select_handle()</c> </seetype> generated by the call. </p> + <p> + If the caller doesn't want to wait for a connection, + it must immediately call + <seemfa marker="#cancel/2"><c>cancel/2</c></seemfa> + to cancel the operation. + </p> </desc> </func> @@ -1415,6 +1429,19 @@ asynchronous call to, e.g. <seemfa marker="#recv/3"><c>recv/3</c></seemfa>. </p> + <p> + An ongoing asynchronous operation blocks the socket + until the operation has been finished in good order, + or until it has been cancelled by this function. + </p> + <p> + Any other process that tries an operation + of the same basic type (accept / send / recv) will be + enqueued and notified with the regular <c>select</c> + mechanism for asynchronous operations + when the current operation and all enqueued before it + has been completed. + </p> </desc> </func> @@ -1534,6 +1561,12 @@ <c>select_handle()</c> </seetype> generated by the call. </p> + <p> + If the caller doesn't want to wait for + the connection to complete, it must immediately call + <seemfa marker="#cancel/2"><c>cancel/2</c></seemfa> + to cancel the operation. + </p> </desc> </func> @@ -2106,7 +2139,7 @@ <c>{ok, {<anno>Data</anno>, <anno>SelectInfo</anno></c> </seetype> with partial data. If the caller doesn't want to wait - for more data, it must call + for more data, it must immediately call <seemfa marker="#cancel/2"><c>cancel/2</c></seemfa> to cancel the operation. </p> @@ -2236,6 +2269,12 @@ <c>select_handle()</c> </seetype> generated by the call. </p> + <p> + If the caller doesn't want to wait for the data, + it must immediately call + <seemfa marker="#cancel/2"><c>cancel/2</c></seemfa> + to cancel the operation. + </p> </desc> </func> @@ -2370,6 +2409,12 @@ <c>select_handle()</c> </seetype> generated by the call. </p> + <p> + If the caller doesn't want to wait for the data, + it must immediately call + <seemfa marker="#cancel/2"><c>cancel/2</c></seemfa> + to cancel the operation. + </p> </desc> </func> @@ -2512,7 +2557,7 @@ <desc> <p> Sends data on a connected socket, - but return a select continuation if the data + but returns a select continuation if the data could not be sent immediately. </p> <p> @@ -2561,7 +2606,7 @@ which can only happen for a socket of <seetype marker="#type">type <c>stream</c></seetype>. If the caller does not want to wait to send the rest of the data, - it should cancel the operation with + it should immediately cancel the operation with <seemfa marker="#cancel/2"><c>cancel/2</c></seemfa>. </p> </desc> @@ -2709,7 +2754,7 @@ <desc> <p> Sends a message on a socket, - but return a select continuation if the data + but returns a select continuation if the data could not be sent immediately. </p> <p> @@ -2758,7 +2803,7 @@ which can only happen for a socket of <seetype marker="#type">type <c>stream</c></seetype>. If the caller does not want to wait to send the rest of the data, - it should cancel the operation with + it should immediately cancel the operation with <seemfa marker="#cancel/2"><c>cancel/2</c></seemfa>. </p> </desc> @@ -2876,7 +2921,7 @@ <desc> <p> Sends data on a socket, - but return a select continuation if the data + but returns a select continuation if the data could not be sent immediately. </p> <p> @@ -2884,7 +2929,7 @@ <seeerl marker="#sendto-infinity"> infinity time-out <c>sendto/3,4</c> </seeerl> - but if the data is not be immediately accepted + but if the data is not immediately accepted by the platform network layer, the function returns <seetype marker="#select_info"><c>{select, <anno>SelectInfo</anno>}</c></seetype>, @@ -2926,7 +2971,7 @@ which can only happen for a socket of <seetype marker="#type">type <c>stream</c></seetype>. If the caller does not want to wait to send the rest of the data, - it should cancel the operation with + it should immediately cancel the operation with <seemfa marker="#cancel/2"><c>cancel/2</c></seemfa>. </p> </desc> @@ -2980,6 +3025,308 @@ </func> <func> + <name name="sendfile" arity="5" clause_i="7" + since="OTP @OTP-17154@" + anchor="sendfile-infinity"/> + <fsummary> + Send file data on a socket, with "infinite" time-out. + </fsummary> + <desc> + <p> + Sends file data on a socket, to the specified destination, + waiting for it to be sent (<em>"infinite" time-out</em>). + </p> + <p> + The <c><anno>FileHandle</anno></c> must refer to + an open raw file as described in + <seemfa marker="file#open/2"><c>file:open/2</c></seemfa>. + </p> + <p> + This call will not return until the data has been accepted + by the platform's network layer, or it reports an error. + </p> + <p> + The <c>Offset</c> argument is the file offset + to start reading from. The default value is <c>0</c>. + </p> + <p> + The <c>Count</c> argument is the number of bytes + to transfer from <c><anno>FileHandle</anno></c> + to <c><anno>Socket</anno></c>. + If <c>Count =:= 0</c> (the default) + the transfer stops at the end of file. + </p> + <p> + The return value indicates the result from + the platform's network layer: + </p> + <taglist> + <tag><c>{ok, <anno>BytesSent</anno>}</c></tag> + <item> + <p> + The transfer completed succesfully after + <c><anno>BytesSent</anno></c> bytes of data. + </p> + </item> + <tag><c>{error, <anno>Reason</anno>}</c></tag> + <item> + <p> + An error has been reported and no data has been transferred. + The <seetype marker="#posix"><c>posix()</c></seetype> + <c><anno>Reason</anno>s</c> are from the + platform's network layer. + <c>closed</c> means that this socket library + knows that the socket is closed, and + <seetype marker="#invalid"><c>invalid()</c></seetype> + means that something about an argument is invalid. + </p> + </item> + <tag> + <c> + {error, {<anno>Reason</anno>, <anno>BytesSent</anno>}} + </c> + </tag> + <item> + <p> + An error has been reported + but before that some data was transferred. + See <c>{error, <anno>Reason</anno>}</c> + and <c>{ok, <anno>BytesSent</anno>}</c> above. + </p> + </item> + </taglist> + </desc> + </func> + + <func> + <name name="sendfile" arity="5" clause_i="8" + since="OTP @OTP-17154@" + anchor="sendfile-timeout"/> + <fsummary>Send file data on a socket, with time-out.</fsummary> + <desc> + <p> + Sends file data on a socket, waiting at most + <c><anno>Timeout</anno></c> milliseconds for it to be sent + (<em>limited time-out</em>). + </p> + <p> + The same as + <seeerl marker="#sendfile-infinity"> + "infinite" time-out <c>sendfile/5</c> + </seeerl> + but returns <c>{error, timeout}</c> + or <c>{error, {timeout, <anno>BytesSent</anno>}}</c> + after <c><anno>Timeout</anno></c> milliseconds, + if not all file data was transferred + by the platform's network layer. + </p> + </desc> + </func> + + <func> + <name name="sendfile" arity="5" clause_i="5" + since="OTP @OTP-17154@" + anchor="sendfile-nowait"/> + <name name="sendfile" arity="5" clause_i="6" + since="OTP @OTP-17154@"/> + <fsummary>Send file data on a socket, but do not wait.</fsummary> + <desc> + <p> + Sends file data on a socket, + but returns a select continuation if the data + could not be sent immediately (<em>nowait</em>). + </p> + <p> + The same as + <seeerl marker="#sendfile-infinity"> + "infinite" time-out <c>sendfile/5</c> + </seeerl> + but if the data is not immediately accepted + by the platform network layer, + the function returns + <seetype marker="#select_info"><c>{select, <anno>SelectInfo</anno>}</c></seetype>, + and the caller will then receive a select message, + <c>{'$socket', Socket, select, SelectHandle}</c> ( + with the + <seetype marker="socket#select_handle"><c>SelectHandle</c></seetype> + that was contained in the + <seetype marker="#select_info"> + <c><anno>SelectInfo</anno></c> + </seetype> + ) when there is room for more data. + Then a call to + <seeerl marker="#sendfile-cont"><c>sendfile/3</c></seeerl> + with <c><anno>SelectInfo</anno></c> as the second argument + will continue the data transfer. + </p> + <p> + If <c>SelectHandle</c> is a + <seetype marker="#select_handle"><c>select_handle()</c></seetype>, + that term will be contained in a returned + <c><anno>SelectInfo</anno></c> + and the corresponding select message. + The <c>SelectHandle</c> is presumed to be + unique to this call. + </p> + <p> + If <c>SelectHandle</c> is <c>nowait</c>, and a + <c><anno>SelectInfo</anno></c> is returned, + it will contain a + <seetype marker="socket#select_handle"> + <c>select_handle()</c> + </seetype> generated by the call. + </p> + <p> + If some file data was sent, the function will return + <seetype marker="#select_info"> + <c> + {ok, {<anno>BytesSent</anno>, <anno>SelectInfo</anno>}. + </c> + </seetype> + If the caller does not want to wait to send the rest of the data, + it should immediately cancel the operation with + <seemfa marker="#cancel/2"><c>cancel/2</c></seemfa>. + </p> + </desc> + </func> + + <func> + <name name="sendfile" arity="5" clause_i="3" + since="OTP @OTP-17154@" + anchor="sendfile-cont"/> + <name name="sendfile" arity="5" clause_i="4" + since="OTP @OTP-17154@"/> + <name name="sendfile" arity="5" clause_i="1" + since="OTP @OTP-17154@"/> + <name name="sendfile" arity="5" clause_i="2" + since="OTP @OTP-17154@"/> + <fsummary>Send file data on a socket, continuation.</fsummary> + <desc> + <p> + Continues sending file data on a socket, where the + send operation was initiated by + <seeerl marker="#sendfile-nowait"><c>sendfile/3,5</c></seeerl> + that returned a <c>SelectInfo</c> continuation. + Otherwise like + <seeerl marker="#sendfile-infinity"> + "infinite" time-out <c>sendfile/5</c> + </seeerl> + , + <seeerl marker="#sendfile-timeout"> + limited time-out <c>sendfile/5</c> + </seeerl> + or + <seeerl marker="#sendfile-nowait"> + nowait <c>sendfile/5</c> + </seeerl> + respectively. + </p> + <p> + <c><anno>Cont</anno></c> is the <c>SelectInfo</c> + that was returned from the previous <c>sendfile()</c> call. + </p> + <p> + The return value indicates the result from + the platform's network layer. + See + <seeerl marker="#sendfile-infinity"> + "infinite" time-out <c>sendfile/5</c>. + </seeerl> + </p> + </desc> + </func> + + <func> + <name since="OTP @OTP-17154@">sendfile(Socket, FileHandle, Offset, Count) + -> Result</name> + <fsummary> + Send file data on a socket, with "infinite" time-out. + </fsummary> + <type> + <v>Socket = <seetype marker="#socket">socket()</seetype></v> + <v>FileHandle = <seetype marker="file#fd">file:fd()</seetype></v> + <v>Offset = integer()</v> + <v>Count = integer() >= 0</v> + </type> + <desc> + <p> + The same as + <seeerl marker="#sendfile-infinity"> + <c> + sendfile(Socket, FileHandle, Offset, Count, infinity), + </c> + </seeerl> + that is: send the file data at <c>Offset</c> and <c>Count</c> + to the socket, without time-out + other than from the platform's network stack. + </p> + </desc> + </func> + + <func> + <name since="OTP @OTP-17154@">sendfile(Socket, FileHandle, Timeout) + -> Result</name> + <fsummary> + Send a file's data on a socket. + </fsummary> + <type> + <v>Socket = <seetype marker="#socket">socket()</seetype></v> + <v>FileHandle = <seetype marker="file#fd">file:fd()</seetype></v> + <v> + Timeout = timeout() | 'nowait' | + <seetype marker="#select_handle">select_handle()</seetype> + </v> + </type> + <desc> + <p> + Depending on the <c>Timeout</c> argument; the same as + <seeerl marker="#sendfile-infinity"> + <c> + sendfile(Socket, FileHandle, 0, 0, infinity), + </c> + </seeerl> + <seeerl marker="#sendfile-timeout"> + <c> + sendfile(Socket, FileHandle, 0, 0, Timeout), + </c> + </seeerl> + or + <seeerl marker="#sendfile-nowait"> + <c> + sendfile(Socket, FileHandle, 0, 0, SelectHandle), + </c> + </seeerl> + that is: send all data in the file to the socket, + with the given <c>Timeout</c>. + </p> + </desc> + </func> + + <func> + <name since="OTP @OTP-17154@">sendfile(Socket, FileHandle) + -> Result</name> + <fsummary> + Send a file's data on a socket, with "infinite" time-out. + </fsummary> + <type> + <v>Socket = <seetype marker="#socket">socket()</seetype></v> + <v>FileHandle = <seetype marker="file#fd">file:fd()</seetype></v> + </type> + <desc> + <p> + The same as + <seeerl marker="#sendfile-infinity"> + <c> + sendfile(Socket, FileHandle, 0, 0, infinity), + </c> + </seeerl> + that is: send all data in the file to the socket, + without time-out other than from the platform's network stack. + </p> + </desc> + </func> + + <func> <name name="setopt" arity="3" clause_i="1" since="OTP 22.0"/> <fsummary> Set a socket option in the protocol level <c>otp</c>. diff --git a/lib/kernel/src/file.erl b/lib/kernel/src/file.erl index d77f214d32..57d3add4cb 100644 --- a/lib/kernel/src/file.erl +++ b/lib/kernel/src/file.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 1996-2019. All Rights Reserved. +%% Copyright Ericsson AB 1996-2021. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -1290,7 +1290,8 @@ change_time(Name, {{AY, AM, AD}, {AH, AMin, ASec}}=Atime, {'ok', non_neg_integer()} | {'error', inet:posix() | closed | badarg | not_owner} when RawFile :: fd(), - Socket :: inet:socket(), + Socket :: inet:socket() | socket:socket() | + fun ((iolist()) -> ok | {error, inet:posix() | closed}), Offset :: non_neg_integer(), Bytes :: non_neg_integer(), Opts :: [sendfile_option()]. @@ -1315,9 +1316,10 @@ sendfile(File, Sock, Offset, Bytes, Opts) -> %% sendfile/2 -spec sendfile(Filename, Socket) -> {'ok', non_neg_integer()} | {'error', inet:posix() | - closed | badarg | not_owner} - when Filename :: name_all(), - Socket :: inet:socket(). + closed | badarg | not_owner} when + Filename :: name_all(), + Socket :: inet:socket() | socket:socket() | + fun ((iolist()) -> ok | {error, inet:posix() | closed}). sendfile(Filename, Sock) -> case file:open(Filename, [read, raw, binary]) of {error, Reason} -> @@ -1328,14 +1330,41 @@ sendfile(Filename, Sock) -> Res end. +-define(module_socket(Handler, Handle), + {'$inet', (Handler), (Handle)}). +-define(socket(Handle), + {'$socket', (Handle)}). + %% Internal sendfile functions sendfile(#file_descriptor{ module = Mod } = Fd, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, Opts) when is_integer(Offset), is_integer(Bytes) -> case Sock of - {'$inet', _, _} -> + ?socket(_) when Headers =:= [], Trailers =:= [] -> + try socket:sendfile(Sock, Fd, Offset, Bytes, infinity) + catch error : notsup -> + sendfile_fallback( + Fd, socket_send(Sock), Offset, Bytes, ChunkSize, + Headers, Trailers) + end; + ?socket(_) -> sendfile_fallback( - Fd, Sock, Offset, Bytes, ChunkSize, + Fd, socket_send(Sock), Offset, Bytes, ChunkSize, + Headers, Trailers); + ?module_socket(GenTcpMod, _) when Headers =:= [], Trailers =:= [] -> + case + GenTcpMod:sendfile(Sock, Fd, Offset, Bytes) + of + {error, enotsup} -> + sendfile_fallback( + Fd, gen_tcp_send(Sock), Offset, Bytes, ChunkSize, + Headers, Trailers); + Else -> + Else + end; + ?module_socket(_, _) -> + sendfile_fallback( + Fd, gen_tcp_send(Sock), Offset, Bytes, ChunkSize, Headers, Trailers); _ when is_port(Sock) -> case Mod:sendfile( @@ -1343,49 +1372,63 @@ sendfile(#file_descriptor{ module = Mod } = Fd, Sock, Offset, Bytes, Headers, Trailers, Opts) of {error, enotsup} -> sendfile_fallback( - Fd, Sock, Offset, Bytes, ChunkSize, + Fd, gen_tcp_send(Sock), Offset, Bytes, ChunkSize, Headers, Trailers); Else -> Else - end + end; + _ when is_function(Sock, 1) -> + sendfile_fallback( + Fd, Sock, Offset, Bytes, ChunkSize, + Headers, Trailers) end; sendfile(_,_,_,_,_,_,_,_) -> {error, badarg}. +socket_send(Sock) -> + fun (Data) -> + socket:send(Sock, Data) + end. + +gen_tcp_send(Sock) -> + fun (Data) -> + gen_tcp:send(Sock, Data) + end. + %%% %% Sendfile Fallback %%% -sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, +sendfile_fallback(File, Send, Offset, Bytes, ChunkSize, Headers, Trailers) when Headers == []; is_integer(Headers) -> - case sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) of + case sendfile_fallback(File, Send, Offset, Bytes, ChunkSize) of {ok, BytesSent} when is_list(Trailers), Trailers =/= [], is_integer(Headers) -> - sendfile_send(Sock, Trailers, BytesSent+Headers); + sendfile_send(Send, Trailers, BytesSent+Headers); {ok, BytesSent} when is_list(Trailers), Trailers =/= [] -> - sendfile_send(Sock, Trailers, BytesSent); + sendfile_send(Send, Trailers, BytesSent); {ok, BytesSent} when is_integer(Headers) -> {ok, BytesSent + Headers}; Else -> Else end; -sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers) -> - case sendfile_send(Sock, Headers, 0) of +sendfile_fallback(File, Send, Offset, Bytes, ChunkSize, Headers, Trailers) -> + case sendfile_send(Send, Headers, 0) of {ok, BytesSent} -> - sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, BytesSent, + sendfile_fallback(File, Send, Offset, Bytes, ChunkSize, BytesSent, Trailers); Else -> Else end. -sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) +sendfile_fallback(File, Send, Offset, Bytes, ChunkSize) when 0 =< Bytes -> {ok, CurrPos} = file:position(File, {cur, 0}), case file:position(File, {bof, Offset}) of {ok, _NewPos} -> - Res = sendfile_fallback_int(File, Sock, Bytes, ChunkSize, 0), + Res = sendfile_fallback_int(File, Send, Bytes, ChunkSize, 0), _ = file:position(File, {bof, CurrPos}), Res; Error -> @@ -1395,7 +1438,7 @@ sendfile_fallback(_, _, _, _, _) -> {error, einval}. -sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent) +sendfile_fallback_int(File, Send, Bytes, ChunkSize, BytesSent) when Bytes > BytesSent; Bytes == 0 -> Size = if Bytes == 0 -> ChunkSize; @@ -1406,10 +1449,10 @@ sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent) end, case file:read(File, Size) of {ok, Data} -> - case sendfile_send(Sock, Data, BytesSent) of + case sendfile_send(Send, Data, BytesSent) of {ok,NewBytesSent} -> sendfile_fallback_int( - File, Sock, Bytes, ChunkSize, + File, Send, Bytes, ChunkSize, NewBytesSent); Error -> Error @@ -1419,12 +1462,12 @@ sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent) Error -> Error end; -sendfile_fallback_int(_File, _Sock, BytesSent, _ChunkSize, BytesSent) -> +sendfile_fallback_int(_File, _Send, BytesSent, _ChunkSize, BytesSent) -> {ok, BytesSent}. -sendfile_send(Sock, Data, Old) -> +sendfile_send(Send, Data, Old) -> Len = iolist_size(Data), - case gen_tcp:send(Sock, Data) of + case Send(Data) of ok -> {ok, Len+Old}; Else -> diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl index f0204802e3..c69524daf6 100644 --- a/lib/kernel/src/gen_tcp.erl +++ b/lib/kernel/src/gen_tcp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 1997-2020. All Rights Reserved. +%% Copyright Ericsson AB 1997-2021. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -30,6 +30,9 @@ -include("inet_int.hrl"). -include("file.hrl"). +-define(module_socket(Handler, Handle), + {'$inet', (Handler), (Handle)}). + -type option() :: {active, true | false | once | -32768..32767} | {buffer, non_neg_integer()} | @@ -231,7 +234,7 @@ listen(Port, Opts0) -> Socket :: socket(), Reason :: closed | system_limit | inet:posix(). -accept({'$inet', GenTcpMod, _} = S) when is_atom(GenTcpMod) -> +accept(?module_socket(GenTcpMod, _) = S) when is_atom(GenTcpMod) -> GenTcpMod:?FUNCTION_NAME(S, infinity); accept(S) when is_port(S) -> case inet_db:lookup_socket(S) of @@ -247,7 +250,7 @@ accept(S) when is_port(S) -> Socket :: socket(), Reason :: closed | timeout | system_limit | inet:posix(). -accept({'$inet', GenTcpMod, _} = S, Time) when is_atom(GenTcpMod) -> +accept(?module_socket(GenTcpMod, _) = S, Time) when is_atom(GenTcpMod) -> GenTcpMod:?FUNCTION_NAME(S, Time); accept(S, Time) when is_port(S) -> case inet_db:lookup_socket(S) of @@ -266,7 +269,7 @@ accept(S, Time) when is_port(S) -> How :: read | write | read_write, Reason :: inet:posix(). -shutdown({'$inet', GenTcpMod, _} = S, How) when is_atom(GenTcpMod) -> +shutdown(?module_socket(GenTcpMod, _) = S, How) when is_atom(GenTcpMod) -> GenTcpMod:?FUNCTION_NAME(S, How); shutdown(S, How) when is_port(S) -> case inet_db:lookup_socket(S) of @@ -283,7 +286,7 @@ shutdown(S, How) when is_port(S) -> -spec close(Socket) -> ok when Socket :: socket(). -close({'$inet', GenTcpMod, _} = S) when is_atom(GenTcpMod) -> +close(?module_socket(GenTcpMod, _) = S) when is_atom(GenTcpMod) -> GenTcpMod:?FUNCTION_NAME(S); close(S) -> inet:tcp_close(S). @@ -298,7 +301,7 @@ close(S) -> Reason :: closed | {timeout, RestData} | inet:posix(), RestData :: binary(). -send({'$inet', GenTcpMod, _} = S, Packet) when is_atom(GenTcpMod) -> +send(?module_socket(GenTcpMod, _) = S, Packet) when is_atom(GenTcpMod) -> GenTcpMod:?FUNCTION_NAME(S, Packet); send(S, Packet) when is_port(S) -> case inet_db:lookup_socket(S) of @@ -319,7 +322,7 @@ send(S, Packet) when is_port(S) -> Reason :: closed | inet:posix(), HttpPacket :: term(). -recv({'$inet', GenTcpMod, _} = S, Length) when is_atom(GenTcpMod) -> +recv(?module_socket(GenTcpMod, _) = S, Length) when is_atom(GenTcpMod) -> GenTcpMod:?FUNCTION_NAME(S, Length, infinity); recv(S, Length) when is_port(S) -> case inet_db:lookup_socket(S) of @@ -337,7 +340,7 @@ recv(S, Length) when is_port(S) -> Reason :: closed | timeout | inet:posix(), HttpPacket :: term(). -recv({'$inet', GenTcpMod, _} = S, Length, Time) when is_atom(GenTcpMod) -> +recv(?module_socket(GenTcpMod, _) = S, Length, Time) when is_atom(GenTcpMod) -> GenTcpMod:?FUNCTION_NAME(S, Length, Time); recv(S, Length, Time) when is_port(S) -> case inet_db:lookup_socket(S) of @@ -347,7 +350,7 @@ recv(S, Length, Time) when is_port(S) -> Error end. -unrecv({'$inet', GenTcpMod, _} = S, Data) when is_atom(GenTcpMod) -> +unrecv(?module_socket(GenTcpMod, _) = S, Data) when is_atom(GenTcpMod) -> GenTcpMod:?FUNCTION_NAME(S, Data); unrecv(S, Data) when is_port(S) -> case inet_db:lookup_socket(S) of @@ -366,7 +369,7 @@ unrecv(S, Data) when is_port(S) -> Pid :: pid(), Reason :: closed | not_owner | badarg | inet:posix(). -controlling_process({'$inet', GenTcpMod, _} = S, NewOwner) +controlling_process(?module_socket(GenTcpMod, _) = S, NewOwner) when is_atom(GenTcpMod) -> GenTcpMod:?FUNCTION_NAME(S, NewOwner); controlling_process(S, NewOwner) -> diff --git a/lib/kernel/src/gen_tcp_socket.erl b/lib/kernel/src/gen_tcp_socket.erl index 53e449c47f..dad5b06200 100644 --- a/lib/kernel/src/gen_tcp_socket.erl +++ b/lib/kernel/src/gen_tcp_socket.erl @@ -24,6 +24,7 @@ %% gen_tcp -export([connect/4, listen/2, accept/2, send/2, recv/3, + sendfile/4, shutdown/2, close/1, controlling_process/2]). %% inet -export([setopts/2, getopts/2, @@ -400,6 +401,40 @@ send_result(Server, Meta, Result) -> end. %% ------------------------------------------------------------------------- +%% Handler called by file:sendfile/5 to handle ?module_socket()s +%% as a sibling of prim_file:sendfile/8 + +sendfile( + ?module_socket(_Server, Socket), + FileHandle, Offset, Count) -> + %% + case socket:getopt(Socket, {otp,meta}) of + {ok, #{packet := _}} -> + try + %% XXX should we do cork/uncork here, like in prim_inet? + %% And, maybe file:advise too, like prim_file + socket:sendfile(Socket, FileHandle, Offset, Count, infinity) + catch + Class : Reason : Stacktrace + when Class =:= error, Reason =:= badarg -> + %% Convert badarg exception into return value + %% to look like file:sendfile + case Stacktrace of + [{socket, sendfile, _, _} | _] -> + {Class, Reason}; + _ -> + erlang:raise(Class, Reason, Stacktrace) + end; + Class : notsup when Class =:= error -> + {Class, enotsup} + end; + {ok, _BadMeta} -> + {error, badarg}; + {error, _} = Error -> + Error + end. + +%% ------------------------------------------------------------------------- recv(?module_socket(Server, _Socket), Length, Timeout) -> ?badarg_exit(call(Server, {recv, Length, Timeout})). diff --git a/lib/kernel/src/inet.erl b/lib/kernel/src/inet.erl index 4af31b86cc..c41e57345f 100644 --- a/lib/kernel/src/inet.erl +++ b/lib/kernel/src/inet.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 1997-2020. All Rights Reserved. +%% Copyright Ericsson AB 1997-2021. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -130,6 +130,7 @@ 'ewouldblock' | 'exbadport' | 'exbadseq' | file:posix(). -type module_socket() :: {'$inet', Handler :: module(), Handle :: term()}. +-define(module_socket(Handler, Handle), {'$inet', (Handler), (Handle)}). -type socket() :: port() | module_socket(). -type socket_setopt() :: @@ -213,7 +214,7 @@ close(Socket) -> returned_non_ip_address()} | {error, posix()}. -peername({'$inet', GenSocketMod, _} = Socket) when is_atom(GenSocketMod) -> +peername(?module_socket(GenSocketMod, _) = Socket) when is_atom(GenSocketMod) -> GenSocketMod:?FUNCTION_NAME(Socket); peername(Socket) -> prim_inet:peername(Socket). @@ -257,7 +258,7 @@ peernames(Socket, Assoc) -> returned_non_ip_address()} | {error, posix()}. -sockname({'$inet', GenSocketMod, _} = Socket) when is_atom(GenSocketMod) -> +sockname(?module_socket(GenSocketMod, _) = Socket) when is_atom(GenSocketMod) -> GenSocketMod:?FUNCTION_NAME(Socket); sockname(Socket) -> prim_inet:sockname(Socket). @@ -299,7 +300,7 @@ socknames(Socket, Assoc) -> Socket :: socket(), Port :: port_number(). -port({'$inet', GenSocketMod, _} = Socket) when is_atom(GenSocketMod) -> +port(?module_socket(GenSocketMod, _) = Socket) when is_atom(GenSocketMod) -> case GenSocketMod:sockname(Socket) of {ok, {_, Port}} -> {ok, Port}; {error, _} = Error -> Error @@ -320,7 +321,7 @@ send(Socket, Packet) -> Socket :: socket(), Options :: [socket_setopt()]. -setopts({'$inet', GenSocketMod, _} = Socket, Opts) when is_atom(GenSocketMod) -> +setopts(?module_socket(GenSocketMod, _) = Socket, Opts) when is_atom(GenSocketMod) -> GenSocketMod:?FUNCTION_NAME(Socket, Opts); setopts(Socket, Opts) -> SocketOpts = @@ -338,7 +339,7 @@ setopts(Socket, Opts) -> Options :: [socket_getopt()], OptionValues :: [socket_setopt() | gen_tcp:pktoptions_value()]. -getopts({'$inet', GenSocketMod, _} = Socket, Opts) +getopts(?module_socket(GenSocketMod, _) = Socket, Opts) when is_atom(GenSocketMod) -> GenSocketMod:?FUNCTION_NAME(Socket, Opts); getopts(Socket, Opts) -> @@ -531,7 +532,7 @@ getstat(Socket) -> Options :: [stat_option()], OptionValues :: [{stat_option(), integer()}]. -getstat({'$inet', GenSocketMod, _} = Socket, What) +getstat(?module_socket(GenSocketMod, _) = Socket, What) when is_atom(GenSocketMod) -> GenSocketMod:?FUNCTION_NAME(Socket, What); getstat(Socket, What) -> diff --git a/lib/kernel/src/socket.erl b/lib/kernel/src/socket.erl index 63e4ceea1d..c4bee76aa4 100644 --- a/lib/kernel/src/socket.erl +++ b/lib/kernel/src/socket.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2020. All Rights Reserved. +%% Copyright Ericsson AB 2021. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -44,6 +44,8 @@ sendto/3, sendto/4, sendto/5, sendmsg/2, sendmsg/3, sendmsg/4, + sendfile/2, sendfile/3, sendfile/4, sendfile/5, + recv/1, recv/2, recv/3, recv/4, recvfrom/1, recvfrom/2, recvfrom/3, recvfrom/4, recvmsg/1, recvmsg/2, recvmsg/3, recvmsg/4, recvmsg/5, @@ -62,6 +64,7 @@ -export_type([ socket/0, + socket_handle/0, select_tag/0, select_handle/0, @@ -120,6 +123,9 @@ extended_err/0 ]). +%% We need #file_descriptor{} for sendfile/2,3,4,5 +-include("file_int.hrl"). + %% Also in prim_socket -define(REGISTRY, socket_registry). @@ -130,22 +136,30 @@ iov_max := non_neg_integer(), use_registry := boolean()}. --type socket_counters() :: #{read_byte := non_neg_integer(), - read_fails := non_neg_integer(), - read_pkg := non_neg_integer(), - read_pkg_max := non_neg_integer(), - read_tries := non_neg_integer(), - read_waits := non_neg_integer(), - write_byte := non_neg_integer(), - write_fails := non_neg_integer(), - write_pkg := non_neg_integer(), - write_pkg_max := non_neg_integer(), - write_tries := non_neg_integer(), - write_waits := non_neg_integer(), - acc_success := non_neg_integer(), - acc_fails := non_neg_integer(), - acc_tries := non_neg_integer(), - acc_waits := non_neg_integer()}. +-type socket_counters() :: #{read_byte := non_neg_integer(), + read_fails := non_neg_integer(), + read_pkg := non_neg_integer(), + read_pkg_max := non_neg_integer(), + read_tries := non_neg_integer(), + read_waits := non_neg_integer(), + write_byte := non_neg_integer(), + write_fails := non_neg_integer(), + write_pkg := non_neg_integer(), + write_pkg_max := non_neg_integer(), + write_tries := non_neg_integer(), + write_waits := non_neg_integer(), + sendfile => non_neg_integer(), + sendfile_byte => non_neg_integer(), + sendfile_fails => non_neg_integer(), + sendfile_max => non_neg_integer(), + sendfile_pkg => non_neg_integer(), + sendfile_pkg_max => non_neg_integer(), + sendfile_tries => non_neg_integer(), + sendfile_waits => non_neg_integer(), + acc_success := non_neg_integer(), + acc_fails := non_neg_integer(), + acc_tries := non_neg_integer(), + acc_waits := non_neg_integer()}. -type socket_info() :: #{domain := domain() | integer(), type := type() | integer(), @@ -500,7 +514,8 @@ %% Messages sent from the nif-code to erlang processes: -define(socket_msg(Socket, Tag, Info), {?socket_tag, (Socket), (Tag), (Info)}). --opaque socket() :: ?socket(reference()). +-type socket() :: ?socket(socket_handle()). +-opaque socket_handle() :: reference(). %% Some flags are used for send, others for recv, and yet again %% others are found in a cmsg(). They may occur in multiple locations.. @@ -1128,7 +1143,7 @@ connect_deadline(SockRef, SockAddr, Deadline) -> ?socket_msg(_Socket, abort, {Ref, Reason}) -> {error, Reason} after Timeout -> - cancel(SockRef, connect, Ref), + _ = cancel(SockRef, connect, Ref), {error, timeout} end; Result -> @@ -1257,7 +1272,7 @@ accept_deadline(LSockRef, Deadline) -> ?socket_msg(_Socket, abort, {AccRef, Reason}) -> {error, Reason} after Timeout -> - cancel(LSockRef, accept, AccRef), + _ = cancel(LSockRef, accept, AccRef), {error, timeout} end; Result -> @@ -1270,7 +1285,8 @@ accept_result(LSockRef, AccRef, Result) -> Socket = ?socket(SockRef), {ok, Socket}; {error, _} = ERROR -> - cancel(LSockRef, accept, AccRef), % Just to be on the safe side... + %% Just to be on the safe side... + _ = cancel(LSockRef, accept, AccRef), ERROR end. @@ -1479,20 +1495,26 @@ send(Socket, Data, Timeout) -> RestData :: binary(), Reason :: posix() | 'closed' | invalid(). -send(?socket(SockRef), Data, ?SELECT_INFO({send, Cont}, _), Timeout) +send(?socket(SockRef), Data, ?SELECT_INFO(SelectTag, _) = Cont, Timeout) when is_reference(SockRef), is_binary(Data) -> - case deadline(Timeout) of - invalid -> - erlang:error({invalid, {timeout, Timeout}}); - nowait -> - SelectHandle = make_ref(), - send_nowait_cont(SockRef, Data, Cont, SelectHandle); - select_handle -> - SelectHandle = Timeout, - send_nowait_cont(SockRef, Data, Cont, SelectHandle); - Deadline -> - HasWritten = false, - send_deadline_cont(SockRef, Data, Cont, Deadline, HasWritten) + case SelectTag of + {send, ContData} -> + case deadline(Timeout) of + invalid -> + erlang:error({invalid, {timeout, Timeout}}); + nowait -> + SelectHandle = make_ref(), + send_nowait_cont(SockRef, Data, ContData, SelectHandle); + select_handle -> + SelectHandle = Timeout, + send_nowait_cont(SockRef, Data, ContData, SelectHandle); + Deadline -> + HasWritten = false, + send_deadline_cont( + SockRef, Data, ContData, Deadline, HasWritten) + end; + _ -> + {error, {invalid, Cont}} end; send(?socket(SockRef), Data, Flags, Timeout) when is_reference(SockRef), is_binary(Data), is_list(Flags) -> @@ -1553,10 +1575,10 @@ send_deadline_cont(SockRef, Bin, Cont, Deadline, HasWritten) -> -compile({inline, [send_common_nowait_result/3]}). send_common_nowait_result(SelectHandle, Op, Result) -> case Result of - {select, Cont} -> - {select, ?SELECT_INFO({Op, Cont}, SelectHandle)}; - {select, Data, Cont} -> - {ok, {Data, ?SELECT_INFO({Op, Cont}, SelectHandle)}}; + {select, ContData} -> + {select, ?SELECT_INFO({Op, ContData}, SelectHandle)}; + {select, Data, ContData} -> + {ok, {Data, ?SELECT_INFO({Op, ContData}, SelectHandle)}}; %% Result -> Result @@ -1772,23 +1794,29 @@ sendto(Socket, Data, Dest, Flags) when is_list(Flags) -> sendto(Socket, Data, Dest, Flags, ?ESOCK_SENDTO_TIMEOUT_DEFAULT); sendto( ?socket(SockRef) = Socket, Data, - ?SELECT_INFO({sendto, Cont}, _) = SI, Timeout) + ?SELECT_INFO(SelectTag, _) = Cont, Timeout) when is_reference(SockRef) -> - case Data of - Bin when is_binary(Bin) -> - sendto_timeout_cont(SockRef, Bin, Cont, Timeout); - [Bin] when is_binary(Bin) -> - sendto_timeout_cont(SockRef, Bin, Cont, Timeout); - IOV when is_list(IOV) -> - try erlang:list_to_binary(IOV) of - Bin -> - sendto_timeout_cont(SockRef, Bin, Cont, Timeout) - catch - error : badarg -> - erlang:error({invalid, {data, Data}}) + case SelectTag of + {sendto, ContData} -> + case Data of + Bin when is_binary(Bin) -> + sendto_timeout_cont(SockRef, Bin, ContData, Timeout); + [Bin] when is_binary(Bin) -> + sendto_timeout_cont(SockRef, Bin, ContData, Timeout); + IOV when is_list(IOV) -> + try erlang:list_to_binary(IOV) of + Bin -> + sendto_timeout_cont( + SockRef, Bin, ContData, Timeout) + catch + error : badarg -> + erlang:error({invalid, {data, Data}}) + end; + _ -> + erlang:error(badarg, [Socket, Data, Cont, Timeout]) end; _ -> - erlang:error(badarg, [Socket, Data, SI, Timeout]) + {error, {invalid, Cont}} end; sendto(Socket, Data, Dest, Timeout) -> sendto(Socket, Data, Dest, ?ESOCK_SENDTO_FLAGS_DEFAULT, Timeout). @@ -2136,15 +2164,20 @@ sendmsg(Socket, Msg, Timeout) -> sendmsg( ?socket(SockRef) = Socket, RestData, - ?SELECT_INFO({sendmsg, Cont}, _) = SI, Timeout) -> + ?SELECT_INFO(SelectTag, _) = Cont, Timeout) -> %% - case RestData of - #{iov := IOV} -> - sendmsg_timeout_cont(SockRef, IOV, Cont, Timeout); - IOV when is_list(IOV) -> - sendmsg_timeout_cont(SockRef, IOV, Cont, Timeout); + case SelectTag of + {sendmsg, ContData} -> + case RestData of + #{iov := IOV} -> + sendmsg_timeout_cont(SockRef, IOV, ContData, Timeout); + IOV when is_list(IOV) -> + sendmsg_timeout_cont(SockRef, IOV, ContData, Timeout); + _ -> + erlang:error(badarg, [Socket, RestData, Cont, Timeout]) + end; _ -> - erlang:error(badarg, [Socket, RestData, SI, Timeout]) + {error, {invalid, Cont}} end; sendmsg(?socket(SockRef), #{iov := IOV} = Msg, Flags, Timeout) when is_reference(SockRef), is_list(Flags) -> @@ -2206,6 +2239,275 @@ sendmsg_deadline_cont(SockRef, Data, Cont, Deadline, HasWritten) -> %% =========================================================================== %% +%% sendfile - send a file on a socket +%% + +sendfile(Socket, FileHandle) -> + sendfile(Socket, FileHandle, 0, 0, infinity). + +sendfile(Socket, FileHandle, Timeout) -> + sendfile(Socket, FileHandle, 0, 0, Timeout). + +sendfile(Socket, FileHandle_Cont, Offset, Count) -> + sendfile(Socket, FileHandle_Cont, Offset, Count, infinity). + + +-spec sendfile(Socket, Cont, Offset, Count, + SelectHandle :: 'nowait') -> + {'ok', BytesSent} | + {'ok', {BytesSent, SelectInfo}} | + {'error', Reason} + when + Socket :: socket(), + Cont :: select_info(), + Offset :: integer(), + Count :: non_neg_integer(), + BytesSent :: non_neg_integer(), + SelectInfo :: select_info(), + Reason :: posix() | 'closed' | invalid(); + + (Socket, Cont, Offset, Count, + SelectHandle :: select_handle()) -> + {'ok', BytesSent} | + {'ok', {BytesSent, SelectInfo}} | + {'error', {Reason, BytesSent}} + when + Socket :: socket(), + Cont :: select_info(), + Offset :: integer(), + Count :: non_neg_integer(), + BytesSent :: non_neg_integer(), + SelectInfo :: select_info(), + Reason :: posix() | 'closed' | invalid(); + + (Socket, Cont, Offset, Count, + Timeout :: 'infinity') -> + {'ok', BytesSent} | + {'error', Reason} | + {'error', {Reason, BytesSent}} + when + Socket :: socket(), + Cont :: select_info(), + Offset :: integer(), + Count :: non_neg_integer(), + BytesSent :: non_neg_integer(), + Reason :: posix() | 'closed' | invalid(); + + (Socket, Cont, Offset, Count, + Timeout :: non_neg_integer()) -> + {'ok', BytesSent} | + {'error', Reason | 'timeout'} | + {'error', {Reason | 'timeout', BytesSent}} + when + Socket :: socket(), + Cont :: select_info(), + Offset :: integer(), + Count :: non_neg_integer(), + BytesSent :: non_neg_integer(), + Reason :: posix() | 'closed' | invalid(); + + + (Socket, FileHandle, Offset, Count, + SelectHandle :: 'nowait') -> + {'ok', BytesSent} | + {'select', SelectInfo} | + {'ok', {BytesSent, SelectInfo}} | + {'error', Reason} + when + Socket :: socket(), + FileHandle :: file:fd(), + Offset :: integer(), + Count :: non_neg_integer(), + BytesSent :: non_neg_integer(), + SelectInfo :: select_info(), + Reason :: posix() | 'closed' | invalid(); + + (Socket, FileHandle, Offset, Count, + SelectHandle :: select_handle()) -> + {'ok', BytesSent} | + {'select', SelectInfo} | + {'ok', {BytesSent, SelectInfo}} | + {'error', Reason} + when + Socket :: socket(), + FileHandle :: file:fd(), + Offset :: integer(), + Count :: non_neg_integer(), + BytesSent :: non_neg_integer(), + SelectInfo :: select_info(), + Reason :: posix() | 'closed' | invalid(); + + (Socket, FileHandle, Offset, Count, + Timeout :: 'infinity') -> + {'ok', BytesSent} | + {'error', Reason} | + {'error', {Reason, BytesSent}} + when + Socket :: socket(), + FileHandle :: file:fd(), + Offset :: integer(), + Count :: non_neg_integer(), + BytesSent :: non_neg_integer(), + Reason :: posix() | 'closed' | invalid(); + + (Socket, FileHandle, Offset, Count, + Timeout :: non_neg_integer()) -> + {'ok', BytesSent} | + {'error', Reason | 'timeout'} | + {'error', {Reason | 'timeout', BytesSent}} + when + Socket :: socket(), + FileHandle :: file:fd(), + Offset :: integer(), + Count :: non_neg_integer(), + BytesSent :: non_neg_integer(), + Reason :: posix() | 'closed' | invalid(). + +sendfile( + ?socket(SockRef) = Socket, FileHandle_Cont, Offset, Count, Timeout) + when is_integer(Offset), is_integer(Count), 0 =< Count -> + %% + case FileHandle_Cont of + #file_descriptor{module = Module} = FileHandle -> + GetFdData = get_fd_data, + try Module:GetFdData(FileHandle) of + #{handle := FRef} -> + State = {FRef, Offset, Count}, + sendfile_int(SockRef, State, Timeout); + #{} -> + erlang:error( + badarg, + [Socket, FileHandle_Cont, Offset, Count, Timeout]) + catch + Class : Reason : Stacktrace + when Class =:= error, Reason =:= undef -> + case Stacktrace of + [{Module, GetFdData, _, _} | _] -> + erlang:error( + badarg, + [Socket, FileHandle_Cont, + Offset, Count, Timeout]); + _ -> % Re-raise + erlang:raise(Class, Reason, Stacktrace) + end + end; + ?SELECT_INFO(SelectTag, _) = Cont -> + case SelectTag of + {sendfile, FRef} -> + State = {FRef, Offset, Count}, + sendfile_int(SockRef, State, Timeout); + sendfile -> + State = {Offset, Count}, + sendfile_int(SockRef, State, Timeout); + _ -> + {error, {invalid, Cont}} + end; + _ -> + erlang:error( + badarg, [Socket, FileHandle_Cont, Offset, Count, Timeout]) + end; +sendfile(Socket, FileHandle_Cont, Offset, Count, Timeout) -> + erlang:error( + badarg, [Socket, FileHandle_Cont, Offset, Count, Timeout]). + +sendfile_int(SockRef, State, Timeout) -> + case deadline(Timeout) of + invalid -> + erlang:error({invalid, {timeout, Timeout}}); + nowait -> + SelectHandle = make_ref(), + sendfile_nowait(SockRef, State, SelectHandle); + select_handle -> + SelectHandle = Timeout, + sendfile_nowait(SockRef, State, SelectHandle); + Deadline -> + BytesSent = 0, + sendfile_deadline(SockRef, State, BytesSent, Deadline) + end. + + +-compile({inline, [prim_socket_sendfile/3]}). +prim_socket_sendfile(SockRef, {FRef, Offset, Count}, SelectHandle) -> + %% Start call + prim_socket:sendfile(SockRef, FRef, Offset, Count, SelectHandle); +prim_socket_sendfile(SockRef, {Offset, Count}, SelectHandle) -> + %% Continuation call + prim_socket:sendfile(SockRef, Offset, Count, SelectHandle). + +sendfile_nowait(SockRef, State, SelectHandle) -> + case prim_socket_sendfile(SockRef, State, SelectHandle) of + select -> + %% Can only happen when we are enqueued after + %% a send in progress so BytesSent is 0; + %% wait for continuation and later repeat start call + {FRef, _Offset, _Count} = State, + {select, ?SELECT_INFO({sendfile, FRef}, SelectHandle)}; + {select, BytesSent} -> + {ok, + {BytesSent, ?SELECT_INFO(sendfile, SelectHandle)}}; + %% + Result -> + Result + end. + +sendfile_deadline(SockRef, State, BytesSent_0, Deadline) -> + SelectHandle = make_ref(), + case prim_socket_sendfile(SockRef, State, SelectHandle) of + select -> + %% Can only happen when we are enqueued after + %% a send in progress so BytesSent is 0; + %% wait for continuation and repeat start call + Timeout = timeout(Deadline), + receive + ?socket_msg(_Socket, select, SelectHandle) -> + sendfile_deadline( + SockRef, State, BytesSent_0, Deadline); + ?socket_msg(_Socket, abort, {SelectHandle, Reason}) -> + {error, Reason} + after Timeout -> + _ = cancel(SockRef, sendfile, SelectHandle), + {error, timeout} + end; + {select, BytesSent} -> + %% Partial send success; wait for continuation + Timeout = timeout(Deadline), + BytesSent_1 = BytesSent_0 + BytesSent, + receive + ?socket_msg(_Socket, select, SelectHandle) -> + sendfile_deadline( + SockRef, + sendfile_next(BytesSent, State), + BytesSent_1, Deadline); + ?socket_msg(_Socket, abort, {SelectHandle, Reason}) -> + {error, {Reason, BytesSent_1}} + after Timeout -> + _ = cancel(SockRef, sendfile, SelectHandle), + {error, {timeout, BytesSent_1}} + end; + {error, _} = Result when tuple_size(State) =:= 3 -> + Result; + {error, Reason} when tuple_size(State) =:= 2 -> + {error, {Reason, BytesSent_0}}; + {ok, BytesSent} -> + {ok, BytesSent_0 + BytesSent} + end. + +sendfile_next(BytesSent, {_FRef, Offset, Count}) -> + sendfile_next(BytesSent, Offset, Count); +sendfile_next(BytesSent, {Offset, Count}) -> + sendfile_next(BytesSent, Offset, Count). +%% +sendfile_next(BytesSent, Offset, Count) -> + {Offset + BytesSent, + if + Count =:= 0 -> + 0; + BytesSent < Count -> + Count - BytesSent + end}. + +%% =========================================================================== +%% %% recv, recvfrom, recvmsg - receive a message from a socket %% %% Description: @@ -2495,7 +2797,7 @@ recv_deadline(SockRef, Length, Flags, Deadline, Acc) -> ?socket_msg(_Socket, abort, {SelectHandle, Reason}) -> {error, {Reason, bincat(Acc, Bin)}} after Timeout -> - cancel(SockRef, recv, SelectHandle), + _ = cancel(SockRef, recv, SelectHandle), {error, {timeout, bincat(Acc, Bin)}} end; %% @@ -2503,7 +2805,7 @@ recv_deadline(SockRef, Length, Flags, Deadline, Acc) -> %% We first got some data and are then asked to wait, %% but we only want the first that comes %% - cancel and return what we have - cancel(SockRef, recv, SelectHandle), + _ = cancel(SockRef, recv, SelectHandle), {ok, Acc}; select -> %% There is nothing just now, but we will be notified when there @@ -2522,7 +2824,7 @@ recv_deadline(SockRef, Length, Flags, Deadline, Acc) -> ?socket_msg(_Socket, abort, {SelectHandle, Reason}) -> recv_error(Acc, Reason) after Timeout -> - cancel(SockRef, recv, SelectHandle), + _ = cancel(SockRef, recv, SelectHandle), recv_error(Acc, timeout) end; Result -> @@ -2785,7 +3087,7 @@ recvfrom_deadline(SockRef, BufSz, Flags, Deadline) -> ?socket_msg(_Socket, abort, {SelectHandle, Reason}) -> {error, Reason} after Timeout -> - cancel(SockRef, recvfrom, SelectHandle), + _ = cancel(SockRef, recvfrom, SelectHandle), {error, timeout} end; Result -> @@ -3051,7 +3353,7 @@ recvmsg_deadline(SockRef, BufSz, CtrlSz, Flags, Deadline) -> ?socket_msg(_Socket, abort, {SelectHandle, Reason}) -> {error, Reason} after Timeout -> - cancel(SockRef, recvmsg, SelectHandle), + _ = cancel(SockRef, recvmsg, SelectHandle), {error, timeout} end; Result -> @@ -3309,29 +3611,37 @@ peername(Socket) -> SelectInfo :: select_info(), Reason :: 'closed' | invalid(). -cancel(?socket(SockRef), ?SELECT_INFO(Tag, Ref)) +cancel(?socket(SockRef), ?SELECT_INFO(SelectTag, SelectHandle) = SelectInfo) when is_reference(SockRef) -> - case Tag of - {OpName, _} when is_atom(OpName) -> - cancel(SockRef, OpName, Ref); - OpName when is_atom(OpName) -> - cancel(SockRef, OpName, Ref) + case SelectTag of + {Op, _} when is_atom(Op) -> + ok; + Op when is_atom(Op) -> + ok + end, + case cancel(SockRef, Op, SelectHandle) of + ok -> + ok; + invalid -> + {error, {invalid, SelectInfo}}; + Result -> + Result end; cancel(Socket, SelectInfo) -> erlang:error(badarg, [Socket, SelectInfo]). -cancel(SockRef, Op, OpRef) -> - case prim_socket:cancel(SockRef, Op, OpRef) of +cancel(SockRef, Op, SelectHandle) -> + case prim_socket:cancel(SockRef, Op, SelectHandle) of select_sent -> - flush_select_msg(SockRef, OpRef), - _ = flush_abort_msg(SockRef, OpRef), + flush_select_msg(SockRef, SelectHandle), + _ = flush_abort_msg(SockRef, SelectHandle), ok; not_found -> - _ = flush_abort_msg(SockRef, OpRef), - {error, {invalid, ?SELECT_INFO(Op, OpRef)}}; + _ = flush_abort_msg(SockRef, SelectHandle), + invalid; Result -> - _ = flush_abort_msg(SockRef, OpRef), + _ = flush_abort_msg(SockRef, SelectHandle), Result end. diff --git a/lib/kernel/test/sendfile_SUITE.erl b/lib/kernel/test/sendfile_SUITE.erl index a2a19caf35..7a734f056b 100644 --- a/lib/kernel/test/sendfile_SUITE.erl +++ b/lib/kernel/test/sendfile_SUITE.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2011-2020. All Rights Reserved. +%% Copyright Ericsson AB 2011-2021. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -90,21 +90,45 @@ init_per_testcase(TC,Config) when TC == t_sendfile_recvduring; Send = fun(Sock) -> {_Size, Data} = sendfile_file_info(Filename), {ok,Fd} = file:open(Filename, [raw,binary,read]), - %% Determine whether the driver has native support by - %% hitting the raw module directly; file:sendfile/5 will - %% land in the fallback if it doesn't. - RawModule = Fd#file_descriptor.module, - {ok, _Ignored} = RawModule:sendfile(Fd,Sock,0,0,0,[],[],[]), - Data + case Sock of + {'$inet', Handler, _} -> + case + Handler:sendfile(Sock, Fd, 0, 0) + of + {ok, _BytesSent} -> + Data; + {error, enotsup} -> + error(notsup) + end; + _ when is_port(Sock) -> + %% Determine whether the driver + %% has native support by calling + %% the raw module directly; file:sendfile/5 + %% would just use the fallback + %% and we would not be any wiser + RawModule = Fd#file_descriptor.module, + case + RawModule:sendfile(Fd,Sock,0,0,0,[],[],[]) + of + {ok, _Ignored} -> + Data; + {error, enotsup} -> + error(notsup) + end + end end, %% Check if sendfile is supported on this platform - case catch sendfile_send(Send) of - ok -> + try sendfile_send(Send) of + ok -> init_per_testcase(t_sendfile, Config); - Error -> - ct:log("Error: ~p",[Error]), - {skip,"Not supported"} + Other -> + {fail,{not_ok,Other}} + catch + error : notsup -> + {skip,"Not supported"}; + Class : Reason : Stacktrace -> + {fail,{Class, Reason, Stacktrace}} end; init_per_testcase(_TC,Config) -> case read_fd_info() of @@ -306,7 +330,7 @@ t_sendfile_sendduring(Config) -> {ok, #file_info{size = Size}} = file:read_file_info(Filename), spawn_link(fun() -> - timer:sleep(50), + erlang:yield(), ok = gen_tcp:send(Sock, <<2>>) end), {ok, Size} = sendfile(Filename, Sock, SendfileOpts), @@ -323,7 +347,7 @@ t_sendfile_recvduring(Config) -> {ok, #file_info{size = Size}} = file:read_file_info(Filename), spawn_link(fun() -> - timer:sleep(50), + timer:sleep(1), ok = gen_tcp:send(Sock, <<1>>), {ok,<<1>>} = gen_tcp:recv(Sock, 1) end), @@ -340,7 +364,7 @@ t_sendfile_closeduring(Config) -> Send = fun(Sock,SFServPid) -> spawn_link(fun() -> - timer:sleep(50), + timer:sleep(1), SFServPid ! stop end), case erlang:system_info(thread_pool_size) of @@ -355,6 +379,12 @@ t_sendfile_closeduring(Config) -> case sendfile(Filename, Sock, SendfileOpts) of {error, closed} -> ok; + {error, {closed, Size}} + when is_integer(Size) -> + ok; + {error, {epipe, Size}} + when is_integer(Size) -> + ok; {ok, Size} when is_integer(Size) -> ok end @@ -386,7 +416,7 @@ t_sendfile_crashduring(Config) -> Send = fun(Sock) -> spawn_link(fun() -> - timer:sleep(50), + timer:sleep(1), exit(die) end), {error, closed} = sendfile(Filename, Sock, SendfileOpts), @@ -416,23 +446,24 @@ t_sendfile_arguments(Config) -> {ok, Port} = inet:port(Listener), ErrorCheck = - fun(Reason, Offset, Length, Opts) -> + fun(Reasons, Offset, Length, Opts) -> {ok, Sender} = gen_tcp:connect({127, 0, 0, 1}, Port, [{packet, 0}, {active, false}]), {ok, Receiver} = gen_tcp:accept(Listener), {ok, Fd} = file:open(Filename, [read, raw]), {error, Reason} = file:sendfile(Fd, Sender, Offset, Length, Opts), + true = lists:member(Reason, Reasons), gen_tcp:close(Receiver), gen_tcp:close(Sender), file:close(Fd) end, - ErrorCheck(einval, -1, 0, []), - ErrorCheck(einval, 0, -1, []), - ErrorCheck(badarg, gurka, 0, []), - ErrorCheck(badarg, 0, gurka, []), - ErrorCheck(badarg, 0, 0, gurka), - ErrorCheck(badarg, 0, 0, [{chunk_size, gurka}]), + ErrorCheck([einval,badarg], -1, 0, []), + ErrorCheck([einval,badarg], 0, -1, []), + ErrorCheck([badarg], gurka, 0, []), + ErrorCheck([badarg], 0, gurka, []), + ErrorCheck([badarg], 0, 0, gurka), + ErrorCheck([badarg], 0, 0, [{chunk_size, gurka}]), gen_tcp:close(Listener), @@ -523,9 +554,10 @@ sendfile(Filename,Sock,Opts) -> {error, Reason} -> {error, Reason}; {ok, Fd} -> - Res = file:sendfile(Fd, Sock, 0, 0, Opts), - _ = file:close(Fd), - Res + try file:sendfile(Fd, Sock, 0, 0, Opts) + after + _ = file:close(Fd) + end end. %% This function returns the number of open fds on a system |