diff options
Diffstat (limited to 'erts/emulator/nifs/common/socket_nif.c')
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 140 |
1 files changed, 135 insertions, 5 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 84b07252d8..64b183c38e 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -936,6 +936,9 @@ typedef struct { // ERL_NIF_TERM buildDate; BOOLEAN_T dbg; + /* Registry stuff */ + ErlNifPid regPid; + BOOLEAN_T iow; // Where do we send this? Subscription? ErlNifMutex* cntMtx; Uint32 numSockets; @@ -2658,6 +2661,11 @@ static void esock_down_reader(ErlNifEnv* env, ERL_NIF_TERM sockRef, const ErlNifPid* pid); +static void esock_send_reg_add_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef); +static void esock_send_reg_del_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef); + static char* esock_send_wrap_msg(ErlNifEnv* env, ESockDescriptor* descP, ERL_NIF_TERM sockRef, @@ -2676,6 +2684,13 @@ static char* esock_send_msg(ErlNifEnv* env, ERL_NIF_TERM msg, ErlNifEnv* msgEnv); +static ERL_NIF_TERM mk_reg_add_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef); +static ERL_NIF_TERM mk_reg_del_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef); +static ERL_NIF_TERM mk_reg_msg(ErlNifEnv* env, + ERL_NIF_TERM tag, + ERL_NIF_TERM sockRef); static ERL_NIF_TERM mk_abort_msg(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM opRef, @@ -2994,6 +3009,7 @@ ERL_NIF_TERM esock_atom_socket_tag; // This has a "special" name ('$socket') /* *** Local atoms *** */ #define LOCAL_ATOMS \ LOCAL_ATOM_DECL(adaptation_layer); \ + LOCAL_ATOM_DECL(add); \ LOCAL_ATOM_DECL(addr_unreach); \ LOCAL_ATOM_DECL(address); \ LOCAL_ATOM_DECL(adm_prohibited); \ @@ -3009,6 +3025,7 @@ ERL_NIF_TERM esock_atom_socket_tag; // This has a "special" name ('$socket') LOCAL_ATOM_DECL(counter_wrap); \ LOCAL_ATOM_DECL(counters); \ LOCAL_ATOM_DECL(data_in); \ + LOCAL_ATOM_DECL(del); \ LOCAL_ATOM_DECL(dest_unreach); \ LOCAL_ATOM_DECL(do); \ LOCAL_ATOM_DECL(dont); \ @@ -3280,7 +3297,7 @@ ERL_NIF_TERM esock_global_info(ErlNifEnv* env) * domain: The domain of the socket * type: The type of the socket * protocol: The protocol of the socket - * (ctrl: Controlling process of the socket) + * ctrl: Controlling process of the socket) * (readable: Is the socket readable) * (writable: Is the socket writable) * (connected: Is the socket connected) @@ -3297,7 +3314,7 @@ ERL_NIF_TERM esock_socket_info(ErlNifEnv* env, ERL_NIF_TERM domain = esock_socket_info_domain(env, descP); ERL_NIF_TERM type = esock_socket_info_type(env, descP); ERL_NIF_TERM protocol = esock_socket_info_protocol(env, descP); - // ERL_NIF_TERM ctrlPid = MKPID(env, &descP->ctrlPid); + ERL_NIF_TERM ctrlPid = MKPID(env, &descP->ctrlPid); ERL_NIF_TERM readable = BOOL2ATOM(descP->isReadable); ERL_NIF_TERM writable = BOOL2ATOM(descP->isWritable); // ERL_NIF_TERM connected = BOOL2ATOM(descP->isConnected); @@ -3308,6 +3325,7 @@ ERL_NIF_TERM esock_socket_info(ErlNifEnv* env, ERL_NIF_TERM keys[] = {esock_atom_domain, esock_atom_type, esock_atom_protocol, + esock_atom_ctrl, atom_readable, atom_writable, atom_counters, @@ -3317,6 +3335,7 @@ ERL_NIF_TERM esock_socket_info(ErlNifEnv* env, ERL_NIF_TERM vals[] = {domain, type, protocol, + ctrlPid, readable, writable, counters, @@ -5334,6 +5353,9 @@ ERL_NIF_TERM esock_open(ErlNifEnv* env, inc_socket(domain, type, protocol); + /* And finally update the registry */ + esock_send_reg_add_msg(env, res); + return esock_make_ok2(env, res); } @@ -5622,8 +5644,8 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env, * safe side we do the best we can to avoid complications... */ - MLOCK(descP->readMtx); MLOCK(descP->writeMtx); + MLOCK(descP->readMtx); MLOCK(descP->cfgMtx); res = esock_connect(env, descP, sockRef); @@ -6552,6 +6574,9 @@ BOOLEAN_T esock_accept_accepted(ErlNifEnv* env, accDescP->isReadable = TRUE; accDescP->isWritable = TRUE; + /* And finally update the registry */ + esock_send_reg_add_msg(env, accRef); + *result = esock_make_ok2(env, accRef); return TRUE; @@ -19319,6 +19344,55 @@ size_t my_strnlen(const char *s, size_t maxlen) #endif + + +/* =========================================================================== + * + * Socket Registry message functions + * + * =========================================================================== + */ + +/* Send a (socket) add message to the socket registry process. + * We know that this process *is* alive since the VM would + * terminate otherwise, so there is no need to test if + * the sending fails. + */ +static +void esock_send_reg_add_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef) +{ + ERL_NIF_TERM msg = mk_reg_add_msg(env, sockRef); + + esock_send_msg(env, &data.regPid, msg, NULL); +} + + + +/* Send a (socket) del message to the socket registry process. + * We know that this process *is* alive since the VM would + * terminate otherwise, so there is no need to test if + * the sending fails. + */ +static +void esock_send_reg_del_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef) +{ + ERL_NIF_TERM msg = mk_reg_del_msg(env, sockRef); + + esock_send_msg(env, &data.regPid, msg, NULL); +} + + + + +/* =========================================================================== + * + * Socket user message functions + * + * =========================================================================== + */ + /* Send an counter wrap message to the controlling process: * A message in the form: * @@ -19414,6 +19488,55 @@ char* esock_send_msg(ErlNifEnv* env, +/* *** mk_reg_add_msg *** + * + * Construct a socket add message for the socket registry. + * + * {'$socket', add, Socket} + * + */ +static +ERL_NIF_TERM mk_reg_add_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef) +{ + return mk_reg_msg(env, atom_add, sockRef); +} + + +/* *** mk_reg_del_msg *** + * + * Construct a socket del message for the socket registry. + * + * {'$socket', del, Socket} + * + */ +static +ERL_NIF_TERM mk_reg_del_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef) +{ + return mk_reg_msg(env, atom_del, sockRef); +} + + +/* *** mk_reg_msg *** + * + * Construct a general message for the socket registry. + * Tag is (at this time) either the atom 'add' or the atom 'del'. + * + * {'$socket', Tag, Socket} + * + */ +static +ERL_NIF_TERM mk_reg_msg(ErlNifEnv* env, + ERL_NIF_TERM tag, + ERL_NIF_TERM sockRef) +{ + ERL_NIF_TERM socket = mk_socket(env, sockRef); + + return MKT3(env, esock_atom_socket_tag, tag, socket); +} + + /* *** mk_abort_msg *** * * Create the abort message, which has the following form: @@ -20357,6 +20480,9 @@ void esock_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) MUNLOCK(descP->readMtx); MUNLOCK(descP->writeMtx); + /* And finally update the registry */ + esock_send_reg_del_msg(env, sockRef); + SSDBG( descP, ("SOCKET", "esock_stop -> done (%d, %d)\r\n", descP->sock, fd) ); @@ -20434,8 +20560,9 @@ void inform_waiting_procs(ErlNifEnv* env, SSDBG( descP, ("SOCKET", - "inform_waiting_procs -> abort request %T (from %T)\r\n", - currentP->data.ref, currentP->data.pid) ); + "inform_waiting_procs -> " + "send abort message to waiting %s %T\r\n", + role, currentP->data.pid) ); if (esock_send_abort_msg(env, sockRef, @@ -20860,6 +20987,9 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) data.dbg = extract_debug(env, load_info); data.iow = extract_iow(env, load_info); + esock_extract_pid_from_map(env, load_info, + MKA(env, "registry"), &data.regPid); + /* +++ Global Counters +++ */ data.cntMtx = MCREATE("esock[gcnt]"); data.numSockets = 0; |