diff options
author | Micael Karlberg <bmk@erlang.org> | 2022-12-22 16:56:03 +0100 |
---|---|---|
committer | Micael Karlberg <bmk@erlang.org> | 2022-12-22 16:56:03 +0100 |
commit | 024dd020d1970565931af6eddebab7c59f13eda3 (patch) | |
tree | 1de99ed2264fddec159e7d30d252811ce89d8161 /erts | |
parent | 949baa2afb23d3a3b1c45b24ebb4b51071b50c23 (diff) | |
download | erlang-024dd020d1970565931af6eddebab7c59f13eda3.tar.gz |
[erts|esock] Moved the accept function to unix file
Moved the content of the function esock_accept to the
unix essio file (essio_accept).
OTP-18029
Diffstat (limited to 'erts')
-rw-r--r-- | erts/emulator/nifs/common/prim_socket_int.h | 102 | ||||
-rw-r--r-- | erts/emulator/nifs/common/prim_socket_nif.c | 846 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_int.h | 4 | ||||
-rw-r--r-- | erts/emulator/nifs/unix/unix_socket_syncio.c | 508 |
4 files changed, 752 insertions, 708 deletions
diff --git a/erts/emulator/nifs/common/prim_socket_int.h b/erts/emulator/nifs/common/prim_socket_int.h index 2c7c58759e..c15dfbe190 100644 --- a/erts/emulator/nifs/common/prim_socket_int.h +++ b/erts/emulator/nifs/common/prim_socket_int.h @@ -108,6 +108,17 @@ typedef int SOCKET; /* A subset of HANDLE */ #endif +/* ********************************************************************* * + * Misc * + * ********************************************************************* * + */ + +#define ESOCK_GET_RESOURCE(ENV, REF, RES) \ + enif_get_resource((ENV), (REF), esocks, (RES)) + +#define ESOCK_MON2TERM(E, M) \ + esock_make_monitor_term((E), (M)) + /* ********************************************************************* * * Counter type and related "things" * @@ -160,6 +171,14 @@ typedef Uint64 ESockCounter; #endif +#define ESOCK_CNT_INC( __E__, __D__, SF, ACNT, CNT, INC) \ + do { \ + if (esock_cnt_inc((CNT), (INC))) { \ + esock_send_wrap_msg((__E__), (__D__), (SF), (ACNT)); \ + } \ + } while (0) + + /* ********************************************************************* * * (Socket) Debug macros * @@ -435,6 +454,14 @@ typedef struct { /* ======================================================================== * + * What to do about this? * + * ======================================================================== * + */ + +extern char* erl_errno_id(int error); /* THIS IS JUST TEMPORARY??? */ + + +/* ======================================================================== * * Functions * * ======================================================================== * */ @@ -454,6 +481,19 @@ extern void esock_send_reg_del_msg(ErlNifEnv* env, ESockDescriptor* descP, ERL_NIF_TERM sockRef); +/* *** Message sending functions *** */ +extern void esock_send_abort_msg(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ESockRequestor* reqP, + ERL_NIF_TERM reason); +extern void esock_send_close_msg(ErlNifEnv* env, + ESockDescriptor* descP, + ErlNifPid* pid); +extern void esock_send_wrap_msg(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM cnt); /* ** Monitor functions *** */ extern int esock_monitor(const char* slogan, @@ -472,8 +512,10 @@ extern BOOLEAN_T esock_monitor_eq(const ESockMonitor* monP, const ErlNifMonitor* mon); /* *** Counter functions *** */ -extern void esock_inc_socket(int domain, int type, int protocol); -extern void esock_dec_socket(int domain, int type, int protocol); +extern BOOLEAN_T esock_cnt_inc(ESockCounter* cnt, ESockCounter inc); +extern void esock_cnt_dec(ESockCounter* cnt, ESockCounter dec); +extern void esock_inc_socket(int domain, int type, int protocol); +extern void esock_dec_socket(int domain, int type, int protocol); /* *** Select functions *** */ extern int esock_select_read(ErlNifEnv* env, @@ -506,10 +548,66 @@ extern void esock_requestor_release(const char* slogan, ESockDescriptor* descP, ESockRequestor* reqP); +/* *** esock_activate_next_acceptor *** + * *** esock_activate_next_writer *** + * *** esock_activate_next_reader *** + * + * All the activate-next functions for acceptor, writer and reader + * have exactly the same API, so we apply some macro magic to simplify. + * They simply operates on dufferent data structures. + * + */ + +#define ACTIVATE_NEXT_FUNCS_DEFS \ + ACTIVATE_NEXT_FUNC_DEF(acceptor) \ + ACTIVATE_NEXT_FUNC_DEF(writer) \ + ACTIVATE_NEXT_FUNC_DEF(reader) + +#define ACTIVATE_NEXT_FUNC_DEF(F) \ + extern BOOLEAN_T esock_activate_next_##F(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ERL_NIF_TERM sockRef); +ACTIVATE_NEXT_FUNCS_DEFS +#undef ACTIVATE_NEXT_FUNC_DEF + +/* esock_acceptor_search4pid | esock_writer_search4pid | esock_reader_search4pid + * esock_acceptor_push | esock_writer_push | esock_reader_push + * esock_acceptor_pop | esock_writer_pop | esock_reader_pop + * esock_acceptor_unqueue | esock_writer_unqueue | esock_reader_unqueue + * + * All the queue operator functions (search4pid, push, pop + * and unqueue) for acceptor, writer and reader has exactly + * the same API, so we apply some macro magic to simplify. + */ + +#define ESOCK_OPERATOR_FUNCS_DEFS \ + ESOCK_OPERATOR_FUNCS_DEF(acceptor) \ + ESOCK_OPERATOR_FUNCS_DEF(writer) \ + ESOCK_OPERATOR_FUNCS_DEF(reader) + +#define ESOCK_OPERATOR_FUNCS_DEF(O) \ + extern BOOLEAN_T esock_##O##_search4pid(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ErlNifPid* pid); \ + extern void esock_##O##_push(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ErlNifPid pid, \ + ERL_NIF_TERM ref); \ + extern BOOLEAN_T esock_##O##_pop(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ESockRequestor* reqP); \ + extern BOOLEAN_T esock_##O##_unqueue(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ERL_NIF_TERM* refP, \ + const ErlNifPid* pidP); +ESOCK_OPERATOR_FUNCS_DEFS +#undef ESOCK_OPERATOR_FUNCS_DEF + /* *** Environment wrapper functions *** * These hould really be inline, but for now... */ extern void esock_free_env(const char* slogan, ErlNifEnv* env); extern ErlNifEnv* esock_alloc_env(const char* slogan); + #endif // PRIM_SOCKET_INT_H__ diff --git a/erts/emulator/nifs/common/prim_socket_nif.c b/erts/emulator/nifs/common/prim_socket_nif.c index 9675b38aa9..348eafc0ee 100644 --- a/erts/emulator/nifs/common/prim_socket_nif.c +++ b/erts/emulator/nifs/common/prim_socket_nif.c @@ -442,14 +442,6 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL; - - -#define ESOCK_GET_RESOURCE(ENV, REF, RES) \ - enif_get_resource((ENV), (REF), esocks, (RES)) - -#define ESOCK_MON2TERM(E, M) \ - esock_make_monitor_term((E), (M)) - #define ESOCK_RECV_BUFFER_COUNT_DEFAULT 0 #define ESOCK_RECV_BUFFER_SIZE_DEFAULT 8192 #define ESOCK_RECV_CTRL_BUFFER_SIZE_DEFAULT 1024 @@ -874,13 +866,6 @@ static const struct ioctl_flag { /* Global socket debug */ #define SGDBG( proto ) ESOCK_DBG_PRINTF( data.dbg , proto ) -#define ESOCK_CNT_INC( __E__, __D__, SF, ACNT, CNT, INC) \ - do { \ - if (cnt_inc((CNT), (INC))) { \ - esock_send_wrap_msg((__E__), (__D__), (SF), (ACNT)); \ - } \ - } while (0) - /* =================================================================== * * * @@ -943,12 +928,12 @@ static unsigned long one_value = 1; * vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv */ -#ifdef HAS_ACCEPT4 +// #ifdef HAS_ACCEPT4 // We have to figure out what the flags are... -#define sock_accept(s, addr, len) accept4((s), (addr), (len), (SOCK_CLOEXEC)) -#else -#define sock_accept(s, addr, len) accept((s), (addr), (len)) -#endif +// #define sock_accept(s, addr, len) accept4((s), (addr), (len), (SOCK_CLOEXEC)) +// #else +// #define sock_accept(s, addr, len) accept((s), (addr), (len)) +// #endif // #define sock_bind(s, addr, len) bind((s), (addr), (len)) #define sock_close(s) close((s)) #define sock_close_event(e) /* do nothing */ @@ -1005,7 +990,6 @@ static ESockSendfileCounters initESockSendfileCounters = */ -extern char* erl_errno_id(int error); /* THIS IS JUST TEMPORARY??? */ /* All the nif "callback" functions for the socket API has @@ -1140,50 +1124,6 @@ static ERL_NIF_TERM esock_supports_ioctl_requests(ErlNifEnv* env); static ERL_NIF_TERM esock_supports_ioctl_flags(ErlNifEnv* env); static ERL_NIF_TERM esock_supports_options(ErlNifEnv* env); -static ERL_NIF_TERM esock_accept(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM ref); -static ERL_NIF_TERM esock_accept_listening_error(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM accRef, - ErlNifPid caller, - int save_errno); -static ERL_NIF_TERM esock_accept_listening_accept(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - SOCKET accSock, - ErlNifPid caller); -static ERL_NIF_TERM esock_accept_accepting_current(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM ref); -static ERL_NIF_TERM -esock_accept_accepting_current_accept(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - SOCKET accSock); -static ERL_NIF_TERM esock_accept_accepting_current_error(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM opRef, - int save_errno); -static ERL_NIF_TERM esock_accept_accepting_other(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM ref, - ErlNifPid caller); -static ERL_NIF_TERM esock_accept_busy_retry(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM accRef, - ErlNifPid* pidP); -static BOOLEAN_T esock_accept_accepted(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - SOCKET accSock, - ErlNifPid pid, - ERL_NIF_TERM* result); static ERL_NIF_TERM esock_send(ErlNifEnv* env, ESockDescriptor* descP, ERL_NIF_TERM sockRef, @@ -3370,14 +3310,11 @@ static int socket_setopt(int sock, static BOOLEAN_T ehow2how(ERL_NIF_TERM ehow, int* how); -static BOOLEAN_T cnt_inc(ESockCounter* cnt, ESockCounter inc); -static void cnt_dec(ESockCounter* cnt, ESockCounter dec); - -/* *** activate_next_acceptor *** - * *** activate_next_writer *** - * *** activate_next_reader *** +/* *** esock_activate_next_acceptor *** + * *** esock_activate_next_writer *** + * *** esock_activate_next_reader *** * * All the activate-next functions for acceptor, writer and reader * have exactly the same API, so we apply some macro magic to simplify. @@ -3390,17 +3327,17 @@ static void cnt_dec(ESockCounter* cnt, ESockCounter dec); ACTIVATE_NEXT_FUNC_DEF(writer) \ ACTIVATE_NEXT_FUNC_DEF(reader) -#define ACTIVATE_NEXT_FUNC_DEF(F) \ - static BOOLEAN_T activate_next_##F(ErlNifEnv* env, \ - ESockDescriptor* descP, \ - ERL_NIF_TERM sockRef); +#define ACTIVATE_NEXT_FUNC_DEF(F) \ + extern BOOLEAN_T esock_activate_next_##F(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ERL_NIF_TERM sockRef); ACTIVATE_NEXT_FUNCS_DEFS #undef ACTIVATE_NEXT_FUNC_DEF -/* *** acceptor_search4pid | writer_search4pid | reader_search4pid *** - * *** acceptor_push | writer_push | reader_push *** - * *** acceptor_pop | writer_pop | reader_pop *** - * *** acceptor_unqueue | writer_unqueue | reader_unqueue *** +/* esock_acceptor_search4pid | esock_writer_search4pid | esock_reader_search4pid + * esock_acceptor_push | esock_writer_push | esock_reader_push + * esock_acceptor_pop | esock_writer_pop | esock_reader_pop + * esock_acceptor_unqueue | esock_writer_unqueue | esock_reader_unqueue * * All the queue operator functions (search4pid, push, pop * and unqueue) for acceptor, writer and reader has exactly @@ -3412,21 +3349,21 @@ ACTIVATE_NEXT_FUNCS_DEFS ESOCK_OPERATOR_FUNCS_DEF(writer) \ ESOCK_OPERATOR_FUNCS_DEF(reader) -#define ESOCK_OPERATOR_FUNCS_DEF(O) \ - static BOOLEAN_T O##_search4pid(ErlNifEnv* env, \ - ESockDescriptor* descP, \ - ErlNifPid* pid); \ - static void O##_push(ErlNifEnv* env, \ - ESockDescriptor* descP, \ - ErlNifPid pid, \ - ERL_NIF_TERM ref); \ - static BOOLEAN_T O##_pop(ErlNifEnv* env, \ - ESockDescriptor* descP, \ - ESockRequestor* reqP); \ - static BOOLEAN_T O##_unqueue(ErlNifEnv* env, \ - ESockDescriptor* descP, \ - ERL_NIF_TERM* refP, \ - const ErlNifPid* pidP); +#define ESOCK_OPERATOR_FUNCS_DEF(O) \ + extern BOOLEAN_T esock_##O##_search4pid(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ErlNifPid* pid); \ + extern void esock_##O##_push(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ErlNifPid pid, \ + ERL_NIF_TERM ref); \ + extern BOOLEAN_T esock_##O##_pop(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ESockRequestor* reqP); \ + extern BOOLEAN_T esock_##O##_unqueue(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ERL_NIF_TERM* refP, \ + const ErlNifPid* pidP); ESOCK_OPERATOR_FUNCS_DEFS #undef ESOCK_OPERATOR_FUNCS_DEF @@ -3462,23 +3399,11 @@ static void esock_down_reader(ErlNifEnv* env, const ErlNifPid* pidP, const ErlNifMonitor* monP); -static void esock_send_wrap_msg(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM cnt); -static void esock_send_close_msg(ErlNifEnv* env, - ESockDescriptor* descP, - ErlNifPid* pid); #ifdef HAVE_SENDFILE static void esock_send_sendfile_deferred_close_msg(ErlNifEnv* env, ESockDescriptor* descP); #endif -static void esock_send_abort_msg(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - ESockRequestor* reqP, - ERL_NIF_TERM reason); static BOOLEAN_T esock_send_msg(ErlNifEnv* env, ErlNifPid* pid, ERL_NIF_TERM msg, @@ -3599,6 +3524,10 @@ static const struct in6_addr in6addr_loopback = GLOBAL_ATOM_DECL(accept); \ GLOBAL_ATOM_DECL(acceptconn); \ GLOBAL_ATOM_DECL(acceptfilter); \ + GLOBAL_ATOM_DECL(acc_success); \ + GLOBAL_ATOM_DECL(acc_fails); \ + GLOBAL_ATOM_DECL(acc_tries); \ + GLOBAL_ATOM_DECL(acc_waits); \ GLOBAL_ATOM_DECL(adaption_layer); \ GLOBAL_ATOM_DECL(addr); \ GLOBAL_ATOM_DECL(addrform); \ @@ -3900,10 +3829,6 @@ ERL_NIF_TERM esock_atom_socket_tag; // This has a "special" name ('$socket') /* *** Local atoms *** */ #define LOCAL_ATOMS \ LOCAL_ATOM_DECL(accepting); \ - LOCAL_ATOM_DECL(acc_success); \ - LOCAL_ATOM_DECL(acc_fails); \ - LOCAL_ATOM_DECL(acc_tries); \ - LOCAL_ATOM_DECL(acc_waits); \ LOCAL_ATOM_DECL(adaptation_layer); \ LOCAL_ATOM_DECL(add); \ LOCAL_ATOM_DECL(addr_unreach); \ @@ -4683,10 +4608,10 @@ ERL_NIF_TERM esock_socket_info_counters(ErlNifEnv* env, atom_write_pkg_max, atom_write_tries, atom_write_waits, - atom_acc_success, - atom_acc_fails, - atom_acc_tries, - atom_acc_waits}; + esock_atom_acc_success, + esock_atom_acc_fails, + esock_atom_acc_tries, + esock_atom_acc_waits}; unsigned int numKeys = NUM(keys); ERL_NIF_TERM vals[] = {MKCNT(env, descP->readByteCnt), MKCNT(env, descP->readFails), @@ -5770,7 +5695,7 @@ ERL_NIF_TERM nif_accept(ErlNifEnv* env, descP->currentAcceptor.env, descP->currentAcceptor.ref) ); - res = esock_accept(env, descP, sockRef, ref); + res = ESOCK_IO_ACCEPT(env, descP, sockRef, ref); SSDBG( descP, ("SOCKET", "nif_accept(%T) -> done with" "\r\n res: %T" @@ -5784,491 +5709,6 @@ ERL_NIF_TERM nif_accept(ErlNifEnv* env, } -#ifndef __WIN32__ -static -ERL_NIF_TERM esock_accept(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM accRef) -{ - ErlNifPid caller; - - ESOCK_ASSERT( enif_self(env, &caller) != NULL ); - - if (! IS_OPEN(descP->readState)) - return esock_make_error_closed(env); - - /* Accept and Read uses the same select flag - * so they can not be simultaneous - */ - if (descP->currentReaderP != NULL) - return esock_make_error_invalid(env, esock_atom_state); - - if (descP->currentAcceptorP == NULL) { - SOCKET accSock; - - /* We have no active acceptor (and therefore no acceptors in queue) - */ - - SSDBG( descP, ("SOCKET", "esock_accept {%d} -> try accept\r\n", - descP->sock) ); - - ESOCK_CNT_INC(env, descP, sockRef, atom_acc_tries, &descP->accTries, 1); - - accSock = sock_accept(descP->sock, NULL, NULL); - - if (ESOCK_IS_ERROR(accSock)) { - int save_errno; - - save_errno = sock_errno(); - - return esock_accept_listening_error(env, descP, sockRef, - accRef, caller, save_errno); - } else { - /* We got an incoming connection */ - return esock_accept_listening_accept(env, descP, sockRef, - accSock, caller); - } - } else { - - /* We have an active acceptor and possibly acceptors waiting in queue. - * If the pid of the calling process is not the pid of the - * "current process", push the requester onto the (acceptor) queue. - */ - - SSDBG( descP, ("SOCKET", "esock_accept_accepting -> check: " - "is caller current acceptor:" - "\r\n Caller: %T" - "\r\n Current: %T" - "\r\n Current Mon: %T" - "\r\n", - caller, - descP->currentAcceptor.pid, - ESOCK_MON2TERM(env, &descP->currentAcceptor.mon)) ); - - if (COMPARE_PIDS(&descP->currentAcceptor.pid, &caller) == 0) { - - SSDBG( descP, - ("SOCKET", - "esock_accept_accepting {%d} -> current acceptor" - "\r\n", descP->sock) ); - - return esock_accept_accepting_current(env, descP, sockRef, accRef); - - } else { - - /* Not the "current acceptor", so (maybe) push onto queue */ - - SSDBG( descP, - ("SOCKET", - "esock_accept_accepting {%d} -> *not* current acceptor\r\n", - descP->sock) ); - - return esock_accept_accepting_other(env, descP, accRef, caller); - } - } -} -#endif // #ifndef __WIN32__ - -/* *** esock_accept_listening_error *** - * - * The accept call resultet in an error - handle it. - * There are only two cases: - * 1) BLOCK => Attempt a "retry" - * 2) Other => Return the value (converted to an atom) - */ -#ifndef __WIN32__ -static -ERL_NIF_TERM esock_accept_listening_error(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM accRef, - ErlNifPid caller, - int save_errno) -{ - ERL_NIF_TERM res; - - if (save_errno == ERRNO_BLOCK || - save_errno == EAGAIN) { - - /* *** Try again later *** */ - - SSDBG( descP, - ("SOCKET", - "esock_accept_listening_error {%d} -> would block - retry\r\n", - descP->sock) ); - - descP->currentAcceptor.pid = caller; - ESOCK_ASSERT( MONP("esock_accept_listening -> current acceptor", - env, descP, - &descP->currentAcceptor.pid, - &descP->currentAcceptor.mon) == 0 ); - ESOCK_ASSERT( descP->currentAcceptor.env == NULL ); - descP->currentAcceptor.env = esock_alloc_env("current acceptor"); - descP->currentAcceptor.ref = - CP_TERM(descP->currentAcceptor.env, accRef); - descP->currentAcceptorP = &descP->currentAcceptor; - - SSDBG( descP, - ("SOCKET", - "esock_accept_listening_error {%d} -> retry for: " - "\r\n Current Pid: %T" - "\r\n Current Mon: %T" - "\r\n", - descP->sock, - descP->currentAcceptor.pid, - ESOCK_MON2TERM(env, &descP->currentAcceptor.mon)) ); - - res = esock_accept_busy_retry(env, descP, sockRef, accRef, NULL); - } else { - - SSDBG( descP, - ("SOCKET", - "esock_accept_listening {%d} -> errno: %d\r\n", - descP->sock, save_errno) ); - - ESOCK_CNT_INC(env, descP, sockRef, atom_acc_fails, &descP->accFails, 1); - - res = esock_make_error_errno(env, save_errno); - } - - return res; -} -#endif // #ifndef __WIN32__ - - -/* *** esock_accept_listening_accept *** - * - * The accept call was successful (accepted) - handle the new connection. - */ -#ifndef __WIN32__ -static -ERL_NIF_TERM esock_accept_listening_accept(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - SOCKET accSock, - ErlNifPid caller) -{ - ERL_NIF_TERM res; - - esock_accept_accepted(env, descP, sockRef, accSock, caller, &res); - - return res; -} -#endif // #ifndef __WIN32__ - - -/* *** esock_accept_accepting_current *** - * Handles when the current acceptor makes another attempt. - */ -#ifndef __WIN32__ -static -ERL_NIF_TERM esock_accept_accepting_current(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM accRef) -{ - SOCKET accSock; - int save_errno; - ERL_NIF_TERM res; - - SSDBG( descP, - ("SOCKET", - "esock_accept_accepting_current {%d} -> try accept\r\n", - descP->sock) ); - - ESOCK_CNT_INC(env, descP, sockRef, atom_acc_tries, &descP->accTries, 1); - - accSock = sock_accept(descP->sock, NULL, NULL); - - if (ESOCK_IS_ERROR(accSock)) { - - save_errno = sock_errno(); - - res = esock_accept_accepting_current_error(env, descP, sockRef, - accRef, save_errno); - } else { - - res = esock_accept_accepting_current_accept(env, descP, sockRef, - accSock); - } - - return res; -} -#endif // #ifndef __WIN32__ - - -/* *** esock_accept_accepting_current_accept *** - * - * Handles when the current acceptor succeeded in its accept call - - * handle the new connection. - */ -#ifndef __WIN32__ -static -ERL_NIF_TERM esock_accept_accepting_current_accept(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - SOCKET accSock) -{ - ERL_NIF_TERM res; - - SSDBG( descP, - ("SOCKET", - "esock_accept_accepting_current_accept {%d}" - "\r\n", descP->sock) ); - - if (esock_accept_accepted(env, descP, sockRef, accSock, - descP->currentAcceptor.pid, &res)) { - - ESOCK_ASSERT( DEMONP("esock_accept_accepting_current_accept -> " - "current acceptor", - env, descP, &descP->currentAcceptor.mon) == 0); - - MON_INIT(&descP->currentAcceptor.mon); - - if (!activate_next_acceptor(env, descP, sockRef)) { - - SSDBG( descP, - ("SOCKET", - "esock_accept_accepting_current_accept {%d} ->" - " no more acceptors" - "\r\n", descP->sock) ); - - descP->readState &= ~ESOCK_STATE_ACCEPTING; - - descP->currentAcceptorP = NULL; - } - - } - - return res; -} -#endif // #ifndef __WIN32__ - - -/* *** esock_accept_accepting_current_error *** - * The accept call of current acceptor resultet in an error - handle it. - * There are only two cases: - * 1) BLOCK => Attempt a "retry" - * 2) Other => Return the value (converted to an atom) - */ -#ifndef __WIN32__ -static -ERL_NIF_TERM esock_accept_accepting_current_error(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM opRef, - int save_errno) -{ - ERL_NIF_TERM res, reason; - - if (save_errno == ERRNO_BLOCK || - save_errno == EAGAIN) { - - /* - * Just try again, no real error, just a ghost trigger from poll, - */ - - SSDBG( descP, - ("SOCKET", - "esock_accept_accepting_current_error {%d} -> " - "would block: try again\r\n", descP->sock) ); - - ESOCK_CNT_INC(env, descP, sockRef, atom_acc_waits, &descP->accWaits, 1); - - res = esock_accept_busy_retry(env, descP, sockRef, opRef, - &descP->currentAcceptor.pid); - - } else { - ESockRequestor req; - - SSDBG( descP, - ("SOCKET", - "esock_accept_accepting_current_error {%d} -> " - "error: %d\r\n", descP->sock, save_errno) ); - - ESOCK_CNT_INC(env, descP, sockRef, atom_acc_fails, &descP->accFails, 1); - - esock_requestor_release("esock_accept_accepting_current_error", - env, descP, &descP->currentAcceptor); - - reason = MKA(env, erl_errno_id(save_errno)); - res = esock_make_error(env, reason); - - req.env = NULL; - while (acceptor_pop(env, descP, &req)) { - SSDBG( descP, - ("SOCKET", - "esock_accept_accepting_current_error {%d} -> abort %T\r\n", - descP->sock, req.pid) ); - - esock_send_abort_msg(env, descP, sockRef, &req, reason); - - (void) DEMONP("esock_accept_accepting_current_error -> " - "pop'ed writer", - env, descP, &req.mon); - } - descP->currentAcceptorP = NULL; - } - - return res; -} -#endif // #ifndef __WIN32__ - - -/* *** esock_accept_accepting_other *** - * Handles when the another acceptor makes an attempt, which - * results (maybe) in the request being pushed onto the - * acceptor queue. - */ -#ifndef __WIN32__ -ERL_NIF_TERM -esock_accept_accepting_other(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM ref, - ErlNifPid caller) -{ - if (! acceptor_search4pid(env, descP, &caller)) { - acceptor_push(env, descP, caller, ref); - return esock_atom_select; - } else { - /* Acceptor already in queue */ - return esock_raise_invalid(env, esock_atom_state); - } -} -#endif // #ifndef __WIN32__ - - -/* *** esock_accept_busy_retry *** - * - * Perform a retry select. If successful, set nextState. - */ -#ifndef __WIN32__ -static -ERL_NIF_TERM esock_accept_busy_retry(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM accRef, - ErlNifPid* pidP) -{ - int sres; - ERL_NIF_TERM res; - - if ((sres = esock_select_read(env, descP->sock, descP, pidP, - sockRef, accRef)) < 0) { - - ESOCK_ASSERT( DEMONP("esock_accept_busy_retry - select failed", - env, descP, &descP->currentAcceptor.mon) == 0); - - MON_INIT(&descP->currentAcceptor.mon); - - /* It is very unlikely that a next acceptor will be able - * to do anything successful, but we will clean the queue - */ - - if (!activate_next_acceptor(env, descP, sockRef)) { - SSDBG( descP, - ("SOCKET", - "esock_accept_busy_retry {%d} -> no more acceptors\r\n", - descP->sock) ); - - descP->readState &= ~ESOCK_STATE_ACCEPTING; - - descP->currentAcceptorP = NULL; - } - - res = - enif_raise_exception(env, - MKT2(env, esock_atom_select_read, - MKI(env, sres))); - } else { - descP->readState |= - (ESOCK_STATE_ACCEPTING | ESOCK_STATE_SELECTED); - res = esock_atom_select; - } - - return res; -} -#endif // #ifndef __WIN32__ - - -/* *** esock_accept_accepted *** - * - * Generic function handling a successful accept. - */ -#ifndef __WIN32__ -static -BOOLEAN_T esock_accept_accepted(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - SOCKET accSock, - ErlNifPid pid, - ERL_NIF_TERM* result) -{ - ESockDescriptor* accDescP; - ErlNifEvent accEvent; - ERL_NIF_TERM accRef; - int save_errno; - - /* - * We got one - */ - - ESOCK_CNT_INC(env, descP, sockRef, atom_acc_success, &descP->accSuccess, 1); - - if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) { - save_errno = sock_errno(); - (void) sock_close(accSock); - *result = esock_make_error_errno(env, save_errno); - return FALSE; - } - - accDescP = esock_alloc_descriptor(accSock, accEvent); - accDescP->domain = descP->domain; - accDescP->type = descP->type; - accDescP->protocol = descP->protocol; - - MLOCK(descP->writeMtx); - - accDescP->rBufSz = descP->rBufSz; // Inherit buffer size - accDescP->rNum = descP->rNum; // Inherit buffer uses - accDescP->rNumCnt = 0; - accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer size - accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size - accDescP->iow = descP->iow; // Inherit iow - accDescP->dbg = descP->dbg; // Inherit debug flag - accDescP->useReg = descP->useReg; // Inherit useReg flag - esock_inc_socket(accDescP->domain, accDescP->type, accDescP->protocol); - - accRef = enif_make_resource(env, accDescP); - enif_release_resource(accDescP); - - accDescP->ctrlPid = pid; - /* pid has actually been compared equal to self() - * in this code path just a little while ago - */ - ESOCK_ASSERT( MONP("esock_accept_accepted -> ctrl", - env, accDescP, - &accDescP->ctrlPid, - &accDescP->ctrlMon) == 0 ); - - SET_NONBLOCKING(accDescP->sock); - - descP->writeState |= ESOCK_STATE_CONNECTED; - - MUNLOCK(descP->writeMtx); - - /* And finally (maybe) update the registry */ - if (descP->useReg) esock_send_reg_add_msg(env, descP, accRef); - - *result = esock_make_ok2(env, accRef); - - return TRUE; -} -#endif // #ifndef __WIN32__ - - - /* ---------------------------------------------------------------------- * nif_send * @@ -7607,7 +7047,7 @@ esock_sendfile_ok(ErlNifEnv *env, /* * Ok, this write is done maybe activate the next (if any) */ - if (! activate_next_writer(env, descP, sockRef)) { + if (! esock_activate_next_writer(env, descP, sockRef)) { SSDBG( descP, ("SOCKET", @@ -13730,7 +13170,7 @@ ERL_NIF_TERM esock_cancel_accept_current(ErlNifEnv* env, "esock_cancel_accept_current(%T) {%d} -> cancel res: %T" "\r\n", sockRef, descP->sock, res) ); - if (!activate_next_acceptor(env, descP, sockRef)) { + if (!esock_activate_next_acceptor(env, descP, sockRef)) { SSDBG( descP, ("SOCKET", @@ -13760,7 +13200,7 @@ ERL_NIF_TERM esock_cancel_accept_waiting(ErlNifEnv* env, { /* unqueue request from (acceptor) queue */ - if (acceptor_unqueue(env, descP, &opRef, selfP)) { + if (esock_acceptor_unqueue(env, descP, &opRef, selfP)) { return esock_atom_ok; } else { return esock_atom_not_found; @@ -13853,7 +13293,7 @@ ERL_NIF_TERM esock_cancel_send_current(ErlNifEnv* env, ("SOCKET", "esock_cancel_send_current(%T) {%d} -> cancel res: %T" "\r\n", sockRef, descP->sock, res) ); - if (!activate_next_writer(env, descP, sockRef)) { + if (!esock_activate_next_writer(env, descP, sockRef)) { SSDBG( descP, ("SOCKET", "esock_cancel_send_current(%T) {%d} -> no more writers" @@ -13879,7 +13319,7 @@ ERL_NIF_TERM esock_cancel_send_waiting(ErlNifEnv* env, { /* unqueue request from (writer) queue */ - if (writer_unqueue(env, descP, &opRef, selfP)) { + if (esock_writer_unqueue(env, descP, &opRef, selfP)) { return esock_atom_ok; } else { return esock_atom_not_found; @@ -13971,7 +13411,7 @@ ERL_NIF_TERM esock_cancel_recv_current(ErlNifEnv* env, ("SOCKET", "esock_cancel_recv_current(%T) {%d} -> cancel res: %T" "\r\n", sockRef, descP->sock, res) ); - if (! activate_next_reader(env, descP, sockRef)) { + if (!esock_activate_next_reader(env, descP, sockRef)) { SSDBG( descP, ("SOCKET", "esock_cancel_recv_current(%T) {%d} -> no more readers" @@ -13997,7 +13437,7 @@ ERL_NIF_TERM esock_cancel_recv_waiting(ErlNifEnv* env, { /* unqueue request from (reader) queue */ - if (reader_unqueue(env, descP, &opRef, selfP)) { + if (esock_reader_unqueue(env, descP, &opRef, selfP)) { return esock_atom_ok; } else { return esock_atom_not_found; @@ -14100,8 +13540,8 @@ BOOLEAN_T send_check_writer(ErlNifEnv* env, "\r\n ref: %T" "\r\n", descP->sock, ref) ); - if (! writer_search4pid(env, descP, &caller)) { - writer_push(env, descP, caller, ref); + if (! esock_writer_search4pid(env, descP, &caller)) { + esock_writer_push(env, descP, caller, ref); *checkResult = esock_atom_select; } else { /* Writer already in queue */ @@ -14255,7 +13695,7 @@ ERL_NIF_TERM send_check_ok(ErlNifEnv* env, /* * Ok, this write is done maybe activate the next (if any) */ - if (!activate_next_writer(env, descP, sockRef)) { + if (!esock_activate_next_writer(env, descP, sockRef)) { SSDBG( descP, ("SOCKET", "send_check_ok(%T) {%d} -> no more writers\r\n", @@ -14328,7 +13768,7 @@ void send_error_waiting_writers(ErlNifEnv* env, ESockRequestor req; req.env = NULL; /* read by writer_pop before free */ - while (writer_pop(env, descP, &req)) { + while (esock_writer_pop(env, descP, &req)) { SSDBG( descP, ("SOCKET", "send_error_waiting_writers(%T) {%d} -> abort" @@ -14394,7 +13834,7 @@ ERL_NIF_TERM send_check_retry(ErlNifEnv* env, /* * Ok, this write is done maybe activate the next (if any) */ - if (!activate_next_writer(env, descP, sockRef)) { + if (!esock_activate_next_writer(env, descP, sockRef)) { SSDBG( descP, ("SOCKET", @@ -14509,10 +13949,10 @@ BOOLEAN_T recv_check_reader(ErlNifEnv* env, "\r\n ref: %T" "\r\n", descP->sock, ref) ); - if (! reader_search4pid(env, descP, &caller)) { + if (! esock_reader_search4pid(env, descP, &caller)) { if (COMPARE(ref, atom_zero) == 0) goto done_ok; - reader_push(env, descP, caller, ref); + esock_reader_push(env, descP, caller, ref); *checkResult = esock_atom_select; } else { /* Reader already in queue */ @@ -14595,7 +14035,7 @@ recv_update_current_reader(ErlNifEnv* env, ESOCK_ASSERT( DEMONP("recv_update_current_reader", env, descP, &descP->currentReader.mon) == 0); - if (! activate_next_reader(env, descP, sockRef)) { + if (! esock_activate_next_reader(env, descP, sockRef)) { SSDBG( descP, ("SOCKET", @@ -14630,7 +14070,7 @@ void recv_error_current_reader(ErlNifEnv* env, env, descP, &descP->currentReader); req.env = NULL; /* read by reader_pop before free */ - while (reader_pop(env, descP, &req)) { + while (esock_reader_pop(env, descP, &req)) { SSDBG( descP, ("SOCKET", "recv_error_current_reader(%T) {%d} -> abort" @@ -17312,40 +16752,40 @@ void esock_dec_socket(int domain, int type, int protocol) { MLOCK(data.cntMtx); - cnt_dec(&data.numSockets, 1); + esock_cnt_dec(&data.numSockets, 1); /* *** Domain counter *** */ if (domain == AF_INET) - cnt_dec(&data.numDomainInet, 1); + esock_cnt_dec(&data.numDomainInet, 1); #if defined(HAVE_IN6) && defined(AF_INET6) else if (domain == AF_INET6) - cnt_dec(&data.numDomainInet6, 1); + esock_cnt_dec(&data.numDomainInet6, 1); #endif #if defined(HAS_AF_LOCAL) else if (domain == AF_LOCAL) - cnt_dec(&data.numDomainInet6, 1); + esock_cnt_dec(&data.numDomainInet6, 1); #endif /* *** Type counter *** */ if (type == SOCK_STREAM) - cnt_dec(&data.numTypeStreams, 1); + esock_cnt_dec(&data.numTypeStreams, 1); else if (type == SOCK_DGRAM) - cnt_dec(&data.numTypeDGrams, 1); + esock_cnt_dec(&data.numTypeDGrams, 1); #if defined(SOCK_SEQPACKET) else if (type == SOCK_SEQPACKET) - cnt_dec(&data.numTypeSeqPkgs, 1); + esock_cnt_dec(&data.numTypeSeqPkgs, 1); #endif /* *** Protocol counter *** */ if (protocol == IPPROTO_IP) - cnt_dec(&data.numProtoIP, 1); + esock_cnt_dec(&data.numProtoIP, 1); else if (protocol == IPPROTO_TCP) - cnt_dec(&data.numProtoTCP, 1); + esock_cnt_dec(&data.numProtoTCP, 1); else if (protocol == IPPROTO_UDP) - cnt_dec(&data.numProtoUDP, 1); + esock_cnt_dec(&data.numProtoUDP, 1); #if defined(HAVE_SCTP) else if (protocol == IPPROTO_SCTP) - cnt_dec(&data.numProtoSCTP, 1); + esock_cnt_dec(&data.numProtoSCTP, 1); #endif MUNLOCK(data.cntMtx); @@ -17357,40 +16797,40 @@ void esock_dec_socket(int domain, int type, int protocol) extern void esock_inc_socket(int domain, int type, int protocol) { - cnt_inc(&data.numSockets, 1); + esock_cnt_inc(&data.numSockets, 1); /* *** Domain counter *** */ if (domain == AF_INET) - cnt_inc(&data.numDomainInet, 1); + esock_cnt_inc(&data.numDomainInet, 1); #if defined(HAVE_IN6) && defined(AF_INET6) else if (domain == AF_INET6) - cnt_inc(&data.numDomainInet6, 1); + esock_cnt_inc(&data.numDomainInet6, 1); #endif #if defined(HAS_AF_LOCAL) else if (domain == AF_LOCAL) - cnt_inc(&data.numDomainInet6, 1); + esock_cnt_inc(&data.numDomainInet6, 1); #endif /* *** Type counter *** */ if (type == SOCK_STREAM) - cnt_inc(&data.numTypeStreams, 1); + esock_cnt_inc(&data.numTypeStreams, 1); else if (type == SOCK_DGRAM) - cnt_inc(&data.numTypeDGrams, 1); + esock_cnt_inc(&data.numTypeDGrams, 1); #if defined(SOCK_SEQPACKET) else if (type == SOCK_SEQPACKET) - cnt_inc(&data.numTypeSeqPkgs, 1); + esock_cnt_inc(&data.numTypeSeqPkgs, 1); #endif /* *** Protocol counter *** */ if (protocol == IPPROTO_IP) - cnt_inc(&data.numProtoIP, 1); + esock_cnt_inc(&data.numProtoIP, 1); else if (protocol == IPPROTO_TCP) - cnt_inc(&data.numProtoTCP, 1); + esock_cnt_inc(&data.numProtoTCP, 1); else if (protocol == IPPROTO_UDP) - cnt_inc(&data.numProtoUDP, 1); + esock_cnt_inc(&data.numProtoUDP, 1); #if defined(HAVE_SCTP) else if (protocol == IPPROTO_SCTP) - cnt_inc(&data.numProtoSCTP, 1); + esock_cnt_inc(&data.numProtoSCTP, 1); #endif } @@ -17526,7 +16966,7 @@ void esock_send_reg_del_msg(ErlNifEnv* env, * This message will only be sent if the iow (Inform On Wrap) is TRUE. */ #ifndef __WIN32__ -static +extern void esock_send_wrap_msg(ErlNifEnv* env, ESockDescriptor* descP, ERL_NIF_TERM sockRef, @@ -17557,7 +16997,7 @@ void esock_send_wrap_msg(ErlNifEnv* env, * (actually that the 'stop' callback function has been called). */ #ifndef __WIN32__ -static +extern void esock_send_close_msg(ErlNifEnv* env, ESockDescriptor* descP, ErlNifPid* pid) @@ -17577,6 +17017,7 @@ void esock_send_close_msg(ErlNifEnv* env, sockRef, descP->sock, MKPID(env, pid), descP->closeRef) ); } } + #ifdef HAVE_SENDFILE static void esock_send_sendfile_deferred_close_msg(ErlNifEnv* env, @@ -17610,7 +17051,7 @@ esock_send_sendfile_deferred_close_msg(ErlNifEnv* env, * erlang API functions for a select message. */ #ifndef __WIN32__ -static +extern void esock_send_abort_msg(ErlNifEnv* env, ESockDescriptor* descP, ERL_NIF_TERM sockRef, @@ -17929,9 +17370,9 @@ int esock_select_cancel(ErlNifEnv* env, * ---------------------------------------------------------------------- */ -/* *** activate_next_acceptor *** - * *** activate_next_writer *** - * *** activate_next_reader *** +/* *** esock_activate_next_acceptor *** + * *** esock_activate_next_writer *** + * *** esock_activate_next_reader *** * * This functions pops the requestors queue and then selects until it * manages to successfully activate a requestor or the queue is empty. @@ -17945,11 +17386,11 @@ int esock_select_cancel(ErlNifEnv* env, ACTIVATE_NEXT_FUNC_DECL(writer, write, currentWriter, writersQ) \ ACTIVATE_NEXT_FUNC_DECL(reader, read, currentReader, readersQ) -#define ACTIVATE_NEXT_FUNC_DECL(F, S, R, Q) \ - static \ - BOOLEAN_T activate_next_##F(ErlNifEnv* env, \ - ESockDescriptor* descP, \ - ERL_NIF_TERM sockRef) \ +#define ACTIVATE_NEXT_FUNC_DECL(F, S, R, Q) \ + extern \ + BOOLEAN_T esock_activate_next_##F(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ERL_NIF_TERM sockRef) \ { \ BOOLEAN_T popped, activated; \ int sres; \ @@ -17966,7 +17407,7 @@ int esock_select_cancel(ErlNifEnv* env, \ SSDBG( descP, \ ("SOCKET", \ - "activate_next_" #F "(%T) {%d} ->" \ + "esock_activate_next_" #F "(%T) {%d} ->" \ " new (active) requestor: " \ "\r\n pid: %T" \ "\r\n ref: %T" \ @@ -18004,7 +17445,7 @@ int esock_select_cancel(ErlNifEnv* env, \ SSDBG( descP, \ ("SOCKET", \ - "activate_next_" #F "(%T) {%d} ->" \ + "esock_activate_next_" #F "(%T) {%d} ->" \ " no more requestors\r\n", \ sockRef, descP->sock) ); \ \ @@ -18015,7 +17456,7 @@ int esock_select_cancel(ErlNifEnv* env, } while (!popped); \ \ SSDBG( descP, \ - ("SOCKET", "activate_next_" #F "(%T) {%d} -> " \ + ("SOCKET", "esock_activate_next_" #F "(%T) {%d} -> " \ "done with %s\r\n", \ sockRef, descP->sock, B2S(activated)) ); \ \ @@ -18038,9 +17479,9 @@ ACTIVATE_NEXT_FUNCS #ifndef __WIN32__ -/* *** acceptor_search4pid *** - * *** writer_search4pid *** - * *** reader_search4pid *** +/* *** esock_acceptor_search4pid *** + * *** esock_writer_search4pid *** + * *** esock_reader_search4pid *** * * Search for a pid in the requestor (acceptor, writer, or reader) queue. * @@ -18051,13 +17492,13 @@ ACTIVATE_NEXT_FUNCS REQ_SEARCH4PID_FUNC_DECL(writer, writersQ) \ REQ_SEARCH4PID_FUNC_DECL(reader, readersQ) -#define REQ_SEARCH4PID_FUNC_DECL(F, Q) \ - static \ - BOOLEAN_T F##_search4pid(ErlNifEnv* env, \ - ESockDescriptor* descP, \ - ErlNifPid* pid) \ - { \ - return qsearch4pid(env, &descP->Q, pid); \ +#define REQ_SEARCH4PID_FUNC_DECL(F, Q) \ + extern \ + BOOLEAN_T esock_##F##_search4pid(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ErlNifPid* pid) \ + { \ + return qsearch4pid(env, &descP->Q, pid); \ } REQ_SEARCH4PID_FUNCS #undef REQ_SEARCH4PID_FUNC_DECL @@ -18066,9 +17507,9 @@ REQ_SEARCH4PID_FUNCS -/* *** acceptor_push *** - * *** writer_push *** - * *** reader_push *** +/* *** esock_acceptor_push *** + * *** esock_writer_push *** + * *** esock_reader_push *** * * Push a requestor (acceptor, writer, or reader) onto its queue. * This happens when we already have a current request (of its type). @@ -18083,11 +17524,11 @@ REQ_SEARCH4PID_FUNCS REQ_PUSH_FUNC_DECL(reader, readersQ) #define REQ_PUSH_FUNC_DECL(F, Q) \ - static \ - void F##_push(ErlNifEnv* env, \ - ESockDescriptor* descP, \ - ErlNifPid pid, /* self() */ \ - ERL_NIF_TERM ref) \ + extern \ + void esock_##F##_push(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ErlNifPid pid, /* self() */ \ + ERL_NIF_TERM ref) \ { \ ESockRequestQueueElement *e; \ ESockRequestor *reqP; \ @@ -18096,9 +17537,9 @@ REQ_SEARCH4PID_FUNCS != NULL ); \ reqP = &e->data; \ reqP->pid = pid; \ - ESOCK_ASSERT( MONP(#F "_push -> " #F " request", \ + ESOCK_ASSERT( MONP("esock_" #F "_push -> " #F " request", \ env, descP, &pid, &reqP->mon) == 0 ); \ - reqP->env = esock_alloc_env(#F "_push"); \ + reqP->env = esock_alloc_env("esock_" #F "_push"); \ reqP->ref = CP_TERM(reqP->env, ref); \ \ qpush(&descP->Q, e); \ @@ -18110,9 +17551,9 @@ REQ_PUSH_FUNCS -/* *** acceptor_pop *** - * *** writer_pop *** - * *** reader_pop *** +/* *** esock_acceptor_pop *** + * *** esock_writer_pop *** + * *** esock_reader_pop *** * * Pop a requestor (acceptor, writer, or reader) from its queue. * @@ -18125,13 +17566,13 @@ REQ_PUSH_FUNCS REQ_POP_FUNC_DECL(writer, writersQ) \ REQ_POP_FUNC_DECL(reader, readersQ) -#define REQ_POP_FUNC_DECL(F, Q) \ - static \ - BOOLEAN_T F##_pop(ErlNifEnv* env, \ - ESockDescriptor* descP, \ - ESockRequestor* reqP) \ - { \ - return esock_requestor_pop(&descP->Q, reqP); \ +#define REQ_POP_FUNC_DECL(F, Q) \ + extern \ + BOOLEAN_T esock_##F##_pop(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ESockRequestor* reqP) \ + { \ + return esock_requestor_pop(&descP->Q, reqP); \ } REQ_POP_FUNCS #undef REQ_POP_FUNC_DECL @@ -18140,9 +17581,9 @@ REQ_POP_FUNCS -/* *** acceptor_unqueue *** - * *** writer_unqueue *** - * *** reader_unqueue *** +/* *** esock_acceptor_unqueue *** + * *** esock_writer_unqueue *** + * *** esock_reader_unqueue *** * * Remove a requestor (acceptor, writer, or reader) from its queue. * @@ -18156,11 +17597,11 @@ REQ_POP_FUNCS REQ_UNQUEUE_FUNC_DECL(reader, readersQ) #define REQ_UNQUEUE_FUNC_DECL(F, Q) \ - static \ - BOOLEAN_T F##_unqueue(ErlNifEnv* env, \ - ESockDescriptor* descP, \ - ERL_NIF_TERM* refP, \ - const ErlNifPid* pidP) \ + extern \ + BOOLEAN_T esock_##F##_unqueue(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ERL_NIF_TERM* refP, \ + const ErlNifPid* pidP) \ { \ return qunqueue(env, descP, "qunqueue -> waiting " #F, \ &descP->Q, refP, pidP); \ @@ -18346,10 +17787,8 @@ BOOLEAN_T qunqueue(ErlNifEnv* env, * ---------------------------------------------------------------------- */ -#ifndef __WIN32__ - -static -BOOLEAN_T cnt_inc(ESockCounter* cnt, ESockCounter inc) +extern +BOOLEAN_T esock_cnt_inc(ESockCounter* cnt, ESockCounter inc) { BOOLEAN_T wrap; ESockCounter max = ESOCK_COUNTER_MAX; @@ -18366,8 +17805,8 @@ BOOLEAN_T cnt_inc(ESockCounter* cnt, ESockCounter inc) return (wrap); } -static -void cnt_dec(ESockCounter* cnt, ESockCounter dec) +extern +void esock_cnt_dec(ESockCounter* cnt, ESockCounter dec) { ESockCounter current = *cnt; @@ -18379,9 +17818,6 @@ void cnt_dec(ESockCounter* cnt, ESockCounter dec) return; } -#endif // #ifndef __WIN32__ - - /* ---------------------------------------------------------------------- @@ -19073,7 +18509,7 @@ void esock_down_acceptor(ErlNifEnv* env, "current acceptor - try activate next\r\n", sockRef, descP->sock) ); - if (!activate_next_acceptor(env, descP, sockRef)) { + if (!esock_activate_next_acceptor(env, descP, sockRef)) { SSDBG( descP, ("SOCKET", @@ -19095,7 +18531,7 @@ void esock_down_acceptor(ErlNifEnv* env, "not current acceptor - maybe a waiting acceptor\r\n", sockRef, descP->sock) ); - acceptor_unqueue(env, descP, NULL, pidP); + esock_acceptor_unqueue(env, descP, NULL, pidP); } } #endif // #ifndef __WIN32__ @@ -19123,7 +18559,7 @@ void esock_down_writer(ErlNifEnv* env, "current writer - try activate next\r\n", sockRef, descP->sock) ); - if (!activate_next_writer(env, descP, sockRef)) { + if (!esock_activate_next_writer(env, descP, sockRef)) { SSDBG( descP, ("SOCKET", @@ -19143,7 +18579,7 @@ void esock_down_writer(ErlNifEnv* env, "not current writer - maybe a waiting writer\r\n", sockRef, descP->sock) ); - writer_unqueue(env, descP, NULL, pidP); + esock_writer_unqueue(env, descP, NULL, pidP); } } #endif // #ifndef __WIN32__ @@ -19173,7 +18609,7 @@ void esock_down_reader(ErlNifEnv* env, "current reader - try activate next\r\n", sockRef, descP->sock) ); - if (! activate_next_reader(env, descP, sockRef)) { + if (! esock_activate_next_reader(env, descP, sockRef)) { SSDBG( descP, ("SOCKET", @@ -19193,7 +18629,7 @@ void esock_down_reader(ErlNifEnv* env, "not current reader - maybe a waiting reader\r\n", sockRef, descP->sock) ); - reader_unqueue(env, descP, NULL, pidP); + esock_reader_unqueue(env, descP, NULL, pidP); } } #endif // #ifndef __WIN32__ diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index 9740d0b9f1..dbf173dd1d 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -191,6 +191,10 @@ typedef long ssize_t; GLOBAL_ATOM_DEF(accept); \ GLOBAL_ATOM_DEF(acceptconn); \ GLOBAL_ATOM_DEF(acceptfilter); \ + GLOBAL_ATOM_DEF(acc_success); \ + GLOBAL_ATOM_DEF(acc_fails); \ + GLOBAL_ATOM_DEF(acc_tries); \ + GLOBAL_ATOM_DEF(acc_waits); \ GLOBAL_ATOM_DEF(adaption_layer); \ GLOBAL_ATOM_DEF(addr); \ GLOBAL_ATOM_DEF(addrform); \ diff --git a/erts/emulator/nifs/unix/unix_socket_syncio.c b/erts/emulator/nifs/unix/unix_socket_syncio.c index 909aeb2f1c..b4198d98bb 100644 --- a/erts/emulator/nifs/unix/unix_socket_syncio.c +++ b/erts/emulator/nifs/unix/unix_socket_syncio.c @@ -45,6 +45,13 @@ * ======================================================================== * */ +#ifdef HAS_ACCEPT4 +// We have to figure out what the flags are... +#define sock_accept(s, addr, len) \ + accept4((s), (addr), (len), (SOCK_CLOEXEC)) +#else +#define sock_accept(s, addr, len) accept((s), (addr), (len)) +#endif #define sock_bind(s, addr, len) bind((s), (addr), (len)) #define sock_connect(s, addr, len) connect((s), (addr), (len)) #define sock_errno() errno @@ -77,6 +84,7 @@ static BOOLEAN_T open_get_type(ErlNifEnv* env, static BOOLEAN_T open_get_protocol(ErlNifEnv* env, ERL_NIF_TERM eopts, int* protocol); + #ifdef HAVE_SETNS static BOOLEAN_T open_get_netns(ErlNifEnv* env, ERL_NIF_TERM opts, @@ -86,8 +94,50 @@ static BOOLEAN_T change_network_namespace(BOOLEAN_T dbg, static BOOLEAN_T restore_network_namespace(BOOLEAN_T dbg, int ns, SOCKET sock, int* err); #endif + static BOOLEAN_T verify_is_connected(ESockDescriptor* descP, int* err); +static ERL_NIF_TERM esock_accept_listening_error(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM accRef, + ErlNifPid caller, + int save_errno); +static ERL_NIF_TERM esock_accept_listening_accept(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + SOCKET accSock, + ErlNifPid caller); +static ERL_NIF_TERM esock_accept_accepting_current(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM ref); +static +ERL_NIF_TERM esock_accept_accepting_current_accept(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + SOCKET accSock); +static +ERL_NIF_TERM esock_accept_accepting_current_error(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM opRef, + int save_errno); +static ERL_NIF_TERM esock_accept_accepting_other(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM ref, + ErlNifPid caller); +static ERL_NIF_TERM esock_accept_busy_retry(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM accRef, + ErlNifPid* pidP); +static BOOLEAN_T esock_accept_accepted(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + SOCKET accSock, + ErlNifPid pid, + ERL_NIF_TERM* result); /* ======================================================================== * @@ -843,10 +893,466 @@ ERL_NIF_TERM essio_accept(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM accRef) { - return enif_raise_exception(env, MKA(env, "notsup")); + ErlNifPid caller; + + ESOCK_ASSERT( enif_self(env, &caller) != NULL ); + + if (! IS_OPEN(descP->readState)) + return esock_make_error_closed(env); + + /* Accept and Read uses the same select flag + * so they can not be simultaneous + */ + if (descP->currentReaderP != NULL) + return esock_make_error_invalid(env, esock_atom_state); + + if (descP->currentAcceptorP == NULL) { + SOCKET accSock; + + /* We have no active acceptor (and therefore no acceptors in queue) + */ + + SSDBG( descP, ("SOCKET", "esock_accept {%d} -> try accept\r\n", + descP->sock) ); + + ESOCK_CNT_INC(env, descP, sockRef, + esock_atom_acc_tries, &descP->accTries, 1); + + accSock = sock_accept(descP->sock, NULL, NULL); + + if (ESOCK_IS_ERROR(accSock)) { + int save_errno; + + save_errno = sock_errno(); + + return esock_accept_listening_error(env, descP, sockRef, + accRef, caller, save_errno); + } else { + /* We got an incoming connection */ + return esock_accept_listening_accept(env, descP, sockRef, + accSock, caller); + } + } else { + + /* We have an active acceptor and possibly acceptors waiting in queue. + * If the pid of the calling process is not the pid of the + * "current process", push the requester onto the (acceptor) queue. + */ + + SSDBG( descP, ("SOCKET", "esock_accept_accepting -> check: " + "is caller current acceptor:" + "\r\n Caller: %T" + "\r\n Current: %T" + "\r\n Current Mon: %T" + "\r\n", + caller, + descP->currentAcceptor.pid, + ESOCK_MON2TERM(env, &descP->currentAcceptor.mon)) ); + + if (COMPARE_PIDS(&descP->currentAcceptor.pid, &caller) == 0) { + + SSDBG( descP, + ("SOCKET", + "esock_accept_accepting {%d} -> current acceptor" + "\r\n", descP->sock) ); + + return esock_accept_accepting_current(env, descP, sockRef, accRef); + + } else { + + /* Not the "current acceptor", so (maybe) push onto queue */ + + SSDBG( descP, + ("SOCKET", + "esock_accept_accepting {%d} -> *not* current acceptor\r\n", + descP->sock) ); + + return esock_accept_accepting_other(env, descP, accRef, caller); + } + } +} + + +/* *** esock_accept_listening_error *** + * + * The accept call resultet in an error - handle it. + * There are only two cases: + * 1) BLOCK => Attempt a "retry" + * 2) Other => Return the value (converted to an atom) + */ +static +ERL_NIF_TERM esock_accept_listening_error(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM accRef, + ErlNifPid caller, + int save_errno) +{ + ERL_NIF_TERM res; + + if (save_errno == ERRNO_BLOCK || + save_errno == EAGAIN) { + + /* *** Try again later *** */ + + SSDBG( descP, + ("SOCKET", + "esock_accept_listening_error {%d} -> would block - retry\r\n", + descP->sock) ); + + descP->currentAcceptor.pid = caller; + ESOCK_ASSERT( MONP("esock_accept_listening -> current acceptor", + env, descP, + &descP->currentAcceptor.pid, + &descP->currentAcceptor.mon) == 0 ); + ESOCK_ASSERT( descP->currentAcceptor.env == NULL ); + descP->currentAcceptor.env = esock_alloc_env("current acceptor"); + descP->currentAcceptor.ref = + CP_TERM(descP->currentAcceptor.env, accRef); + descP->currentAcceptorP = &descP->currentAcceptor; + + SSDBG( descP, + ("SOCKET", + "esock_accept_listening_error {%d} -> retry for: " + "\r\n Current Pid: %T" + "\r\n Current Mon: %T" + "\r\n", + descP->sock, + descP->currentAcceptor.pid, + ESOCK_MON2TERM(env, &descP->currentAcceptor.mon)) ); + + res = esock_accept_busy_retry(env, descP, sockRef, accRef, NULL); + + } else { + + SSDBG( descP, + ("SOCKET", + "esock_accept_listening {%d} -> errno: %d\r\n", + descP->sock, save_errno) ); + + ESOCK_CNT_INC(env, descP, sockRef, + esock_atom_acc_fails, &descP->accFails, 1); + + res = esock_make_error_errno(env, save_errno); + } + + return res; +} + + +/* *** esock_accept_listening_accept *** + * + * The accept call was successful (accepted) - handle the new connection. + */ +static +ERL_NIF_TERM esock_accept_listening_accept(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + SOCKET accSock, + ErlNifPid caller) +{ + ERL_NIF_TERM res; + + esock_accept_accepted(env, descP, sockRef, accSock, caller, &res); + + return res; +} + + +/* *** esock_accept_accepting_current *** + * Handles when the current acceptor makes another attempt. + */ +static +ERL_NIF_TERM esock_accept_accepting_current(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM accRef) +{ + SOCKET accSock; + int save_errno; + ERL_NIF_TERM res; + + SSDBG( descP, + ("SOCKET", + "esock_accept_accepting_current {%d} -> try accept\r\n", + descP->sock) ); + + ESOCK_CNT_INC(env, descP, sockRef, + esock_atom_acc_tries, &descP->accTries, 1); + + accSock = sock_accept(descP->sock, NULL, NULL); + + if (ESOCK_IS_ERROR(accSock)) { + + save_errno = sock_errno(); + + res = esock_accept_accepting_current_error(env, descP, sockRef, + accRef, save_errno); + } else { + + res = esock_accept_accepting_current_accept(env, descP, sockRef, + accSock); + } + + return res; +} + + +/* *** esock_accept_accepting_current_accept *** + * + * Handles when the current acceptor succeeded in its accept call - + * handle the new connection. + */ +static +ERL_NIF_TERM esock_accept_accepting_current_accept(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + SOCKET accSock) +{ + ERL_NIF_TERM res; + + SSDBG( descP, + ("SOCKET", + "esock_accept_accepting_current_accept {%d}" + "\r\n", descP->sock) ); + + if (esock_accept_accepted(env, descP, sockRef, accSock, + descP->currentAcceptor.pid, &res)) { + + ESOCK_ASSERT( DEMONP("esock_accept_accepting_current_accept -> " + "current acceptor", + env, descP, &descP->currentAcceptor.mon) == 0); + + MON_INIT(&descP->currentAcceptor.mon); + + if (!esock_activate_next_acceptor(env, descP, sockRef)) { + + SSDBG( descP, + ("SOCKET", + "esock_accept_accepting_current_accept {%d} ->" + " no more acceptors" + "\r\n", descP->sock) ); + + descP->readState &= ~ESOCK_STATE_ACCEPTING; + + descP->currentAcceptorP = NULL; + } + + } + + return res; +} + + +/* *** esock_accept_accepting_current_error *** + * The accept call of current acceptor resultet in an error - handle it. + * There are only two cases: + * 1) BLOCK => Attempt a "retry" + * 2) Other => Return the value (converted to an atom) + */ +static +ERL_NIF_TERM esock_accept_accepting_current_error(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM opRef, + int save_errno) +{ + ERL_NIF_TERM res, reason; + + if (save_errno == ERRNO_BLOCK || + save_errno == EAGAIN) { + + /* + * Just try again, no real error, just a ghost trigger from poll, + */ + + SSDBG( descP, + ("SOCKET", + "esock_accept_accepting_current_error {%d} -> " + "would block: try again\r\n", descP->sock) ); + + ESOCK_CNT_INC(env, descP, sockRef, + esock_atom_acc_waits, &descP->accWaits, 1); + + res = esock_accept_busy_retry(env, descP, sockRef, opRef, + &descP->currentAcceptor.pid); + + } else { + ESockRequestor req; + + SSDBG( descP, + ("SOCKET", + "esock_accept_accepting_current_error {%d} -> " + "error: %d\r\n", descP->sock, save_errno) ); + + ESOCK_CNT_INC(env, descP, sockRef, + esock_atom_acc_fails, &descP->accFails, 1); + + esock_requestor_release("esock_accept_accepting_current_error", + env, descP, &descP->currentAcceptor); + + reason = MKA(env, erl_errno_id(save_errno)); + res = esock_make_error(env, reason); + + req.env = NULL; + while (esock_acceptor_pop(env, descP, &req)) { + SSDBG( descP, + ("SOCKET", + "esock_accept_accepting_current_error {%d} -> abort %T\r\n", + descP->sock, req.pid) ); + + esock_send_abort_msg(env, descP, sockRef, &req, reason); + + (void) DEMONP("esock_accept_accepting_current_error -> " + "pop'ed writer", + env, descP, &req.mon); + } + descP->currentAcceptorP = NULL; + } + + return res; } +/* *** esock_accept_accepting_other *** + * Handles when the another acceptor makes an attempt, which + * results (maybe) in the request being pushed onto the + * acceptor queue. + */ +ERL_NIF_TERM +esock_accept_accepting_other(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM ref, + ErlNifPid caller) +{ + if (! esock_acceptor_search4pid(env, descP, &caller)) { + esock_acceptor_push(env, descP, caller, ref); + return esock_atom_select; + } else { + /* Acceptor already in queue */ + return esock_raise_invalid(env, esock_atom_state); + } +} + + +/* *** esock_accept_busy_retry *** + * + * Perform a retry select. If successful, set nextState. + */ +static +ERL_NIF_TERM esock_accept_busy_retry(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM accRef, + ErlNifPid* pidP) +{ + int sres; + ERL_NIF_TERM res; + + if ((sres = esock_select_read(env, descP->sock, descP, pidP, + sockRef, accRef)) < 0) { + + ESOCK_ASSERT( DEMONP("esock_accept_busy_retry - select failed", + env, descP, &descP->currentAcceptor.mon) == 0); + + MON_INIT(&descP->currentAcceptor.mon); + + /* It is very unlikely that a next acceptor will be able + * to do anything successful, but we will clean the queue + */ + + if (!esock_activate_next_acceptor(env, descP, sockRef)) { + SSDBG( descP, + ("SOCKET", + "esock_accept_busy_retry {%d} -> no more acceptors\r\n", + descP->sock) ); + + descP->readState &= ~ESOCK_STATE_ACCEPTING; + + descP->currentAcceptorP = NULL; + } + + res = + enif_raise_exception(env, + MKT2(env, esock_atom_select_read, + MKI(env, sres))); + } else { + descP->readState |= + (ESOCK_STATE_ACCEPTING | ESOCK_STATE_SELECTED); + res = esock_atom_select; + } + + return res; +} + + +/* *** esock_accept_accepted *** + * + * Generic function handling a successful accept. + */ +static +BOOLEAN_T esock_accept_accepted(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + SOCKET accSock, + ErlNifPid pid, + ERL_NIF_TERM* result) +{ + ESockDescriptor* accDescP; + ERL_NIF_TERM accRef; + + /* + * We got one + */ + + ESOCK_CNT_INC(env, descP, sockRef, + esock_atom_acc_success, &descP->accSuccess, 1); + + accDescP = esock_alloc_descriptor(accSock, accSock); + accDescP->domain = descP->domain; + accDescP->type = descP->type; + accDescP->protocol = descP->protocol; + + MLOCK(descP->writeMtx); + + accDescP->rBufSz = descP->rBufSz; // Inherit buffer size + accDescP->rNum = descP->rNum; // Inherit buffer uses + accDescP->rNumCnt = 0; + accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer size + accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size + accDescP->iow = descP->iow; // Inherit iow + accDescP->dbg = descP->dbg; // Inherit debug flag + accDescP->useReg = descP->useReg; // Inherit useReg flag + esock_inc_socket(accDescP->domain, accDescP->type, accDescP->protocol); + + accRef = enif_make_resource(env, accDescP); + enif_release_resource(accDescP); + + accDescP->ctrlPid = pid; + /* pid has actually been compared equal to self() + * in this code path just a little while ago + */ + ESOCK_ASSERT( MONP("esock_accept_accepted -> ctrl", + env, accDescP, + &accDescP->ctrlPid, + &accDescP->ctrlMon) == 0 ); + + SET_NONBLOCKING(accDescP->sock); + + descP->writeState |= ESOCK_STATE_CONNECTED; + + MUNLOCK(descP->writeMtx); + + /* And finally (maybe) update the registry */ + if (descP->useReg) esock_send_reg_add_msg(env, descP, accRef); + + *result = esock_make_ok2(env, accRef); + + return TRUE; +} + + + /* ======================================================================== */ extern |