diff options
author | Garrett D'Amore <garrett@damore.org> | 2016-10-05 21:27:14 -0700 |
---|---|---|
committer | Garrett D'Amore <garrett@damore.org> | 2016-10-19 20:29:59 -0700 |
commit | 4459720e0c413fd856eb4960c6bce86ea59dbed3 (patch) | |
tree | 6d0d5127616630c5fb2bf0dbda151c0f069e6254 | |
parent | 08cb6aa9d27a1239ffbc0680a833184ad8d7666f (diff) | |
download | nanomsg-4459720e0c413fd856eb4960c6bce86ea59dbed3.tar.gz |
fixes #811 Memory leak when closing a PUSH socket before the PULL
-rw-r--r-- | src/transports/inproc/cinproc.c | 103 | ||||
-rw-r--r-- | src/transports/inproc/cinproc.h | 5 |
2 files changed, 58 insertions, 50 deletions
diff --git a/src/transports/inproc/cinproc.c b/src/transports/inproc/cinproc.c index 997588b..00c1bb7 100644 --- a/src/transports/inproc/cinproc.c +++ b/src/transports/inproc/cinproc.c @@ -33,9 +33,8 @@ #include <stddef.h> #define NN_CINPROC_STATE_IDLE 1 -#define NN_CINPROC_STATE_DISCONNECTED 2 -#define NN_CINPROC_STATE_ACTIVE 3 -#define NN_CINPROC_STATE_STOPPING 4 +#define NN_CINPROC_STATE_ACTIVE 2 +#define NN_CINPROC_STATE_STOPPING 3 #define NN_CINPROC_ACTION_CONNECT 1 @@ -68,8 +67,10 @@ int nn_cinproc_create (void *hint, struct nn_epbase **epbase) nn_fsm_init_root (&self->fsm, nn_cinproc_handler, nn_cinproc_shutdown, nn_epbase_getctx (&self->item.epbase)); self->state = NN_CINPROC_STATE_IDLE; - nn_sinproc_init (&self->sinproc, NN_CINPROC_SRC_SINPROC, - &self->item.epbase, &self->fsm); + nn_list_init (&self->sinprocs); + + nn_epbase_stat_increment (&self->item.epbase, + NN_STAT_INPROGRESS_CONNECTIONS, 1); /* Start the state machine. */ nn_fsm_start (&self->fsm); @@ -96,7 +97,7 @@ static void nn_cinproc_destroy (struct nn_epbase *self) cinproc = nn_cont (self, struct nn_cinproc, item.epbase); - nn_sinproc_term (&cinproc->sinproc); + nn_list_term (&cinproc->sinprocs); nn_fsm_term (&cinproc->fsm); nn_ins_item_term (&cinproc->item); @@ -108,19 +109,35 @@ static void nn_cinproc_connect (struct nn_ins_item *self, { struct nn_cinproc *cinproc; struct nn_binproc *binproc; + struct nn_sinproc *sinproc; cinproc = nn_cont (self, struct nn_cinproc, item); binproc = nn_cont (peer, struct nn_binproc, item); - nn_assert_state (cinproc, NN_CINPROC_STATE_DISCONNECTED); - nn_sinproc_connect (&cinproc->sinproc, &binproc->fsm); - nn_fsm_action (&cinproc->fsm, NN_CINPROC_ACTION_CONNECT); + nn_assert_state (cinproc, NN_CINPROC_STATE_ACTIVE); + + sinproc = nn_alloc (sizeof (struct nn_sinproc), "sinproc"); + alloc_assert (sinproc); + nn_sinproc_init (sinproc, NN_CINPROC_SRC_SINPROC, + &cinproc->item.epbase, &cinproc->fsm); + + nn_list_insert (&cinproc->sinprocs, &sinproc->item, + nn_list_end (&cinproc->sinprocs)); + + nn_sinproc_connect (sinproc, &binproc->fsm); + + nn_epbase_stat_increment (&cinproc->item.epbase, + NN_STAT_INPROGRESS_CONNECTIONS, -1); + nn_epbase_stat_increment (&cinproc->item.epbase, + NN_STAT_ESTABLISHED_CONNECTIONS, 1); } static void nn_cinproc_shutdown (struct nn_fsm *self, int src, int type, NN_UNUSED void *srcptr) { struct nn_cinproc *cinproc; + struct nn_sinproc *sinproc; + struct nn_list_item *it; cinproc = nn_cont (self, struct nn_cinproc, fsm); @@ -131,11 +148,23 @@ static void nn_cinproc_shutdown (struct nn_fsm *self, int src, int type, nn_ins_disconnect (&cinproc->item); /* Stop the existing connection. */ - nn_sinproc_stop (&cinproc->sinproc); + for (it = nn_list_begin (&cinproc->sinprocs); + it != nn_list_end (&cinproc->sinprocs); + it = nn_list_next (&cinproc->sinprocs, it)) { + sinproc = nn_cont (it, struct nn_sinproc, item); + nn_sinproc_stop (sinproc); + } cinproc->state = NN_CINPROC_STATE_STOPPING; + goto finish; } if (nn_slow (cinproc->state == NN_CINPROC_STATE_STOPPING)) { - if (!nn_sinproc_isidle (&cinproc->sinproc)) + sinproc = (struct nn_sinproc *) srcptr; + nn_list_erase (&cinproc->sinprocs, &sinproc->item); + nn_sinproc_term (sinproc); + nn_free (sinproc); + +finish: + if (!nn_list_empty (&cinproc->sinprocs)) return; cinproc->state = NN_CINPROC_STATE_IDLE; nn_fsm_stopped_noevent (&cinproc->fsm); @@ -151,6 +180,7 @@ static void nn_cinproc_handler (struct nn_fsm *self, int src, int type, { struct nn_cinproc *cinproc; struct nn_sinproc *sinproc; + struct nn_sinproc *peer; cinproc = nn_cont (self, struct nn_cinproc, fsm); @@ -166,9 +196,7 @@ static void nn_cinproc_handler (struct nn_fsm *self, int src, int type, case NN_FSM_ACTION: switch (type) { case NN_FSM_START: - cinproc->state = NN_CINPROC_STATE_DISCONNECTED; - nn_epbase_stat_increment (&cinproc->item.epbase, - NN_STAT_INPROGRESS_CONNECTIONS, 1); + cinproc->state = NN_CINPROC_STATE_ACTIVE; return; default: nn_fsm_bad_action (cinproc->state, src, type); @@ -179,30 +207,22 @@ static void nn_cinproc_handler (struct nn_fsm *self, int src, int type, } /******************************************************************************/ -/* DISCONNECTED state. */ +/* ACTIVE state. */ /******************************************************************************/ - case NN_CINPROC_STATE_DISCONNECTED: + case NN_CINPROC_STATE_ACTIVE: switch (src) { - - case NN_FSM_ACTION: - switch (type) { - case NN_CINPROC_ACTION_CONNECT: - cinproc->state = NN_CINPROC_STATE_ACTIVE; - nn_epbase_stat_increment (&cinproc->item.epbase, - NN_STAT_INPROGRESS_CONNECTIONS, -1); - nn_epbase_stat_increment (&cinproc->item.epbase, - NN_STAT_ESTABLISHED_CONNECTIONS, 1); - return; - default: - nn_fsm_bad_action (cinproc->state, src, type); - } - case NN_SINPROC_SRC_PEER: - sinproc = (struct nn_sinproc*) srcptr; + peer = (struct nn_sinproc*) srcptr; + switch (type) { case NN_SINPROC_CONNECT: - nn_sinproc_accept (&cinproc->sinproc, sinproc); - cinproc->state = NN_CINPROC_STATE_ACTIVE; + sinproc = nn_alloc (sizeof (struct nn_sinproc), "sinproc"); + alloc_assert (sinproc); + nn_sinproc_init (sinproc, NN_CINPROC_SRC_SINPROC, + &cinproc->item.epbase, &cinproc->fsm); + nn_list_insert (&cinproc->sinprocs, &sinproc->item, + nn_list_end (&cinproc->sinprocs)); + nn_sinproc_accept (sinproc, peer); nn_epbase_stat_increment (&cinproc->item.epbase, NN_STAT_INPROGRESS_CONNECTIONS, -1); nn_epbase_stat_increment (&cinproc->item.epbase, @@ -212,29 +232,14 @@ static void nn_cinproc_handler (struct nn_fsm *self, int src, int type, nn_fsm_bad_action (cinproc->state, src, type); } - default: - nn_fsm_bad_source (cinproc->state, src, type); - } - -/******************************************************************************/ -/* ACTIVE state. */ -/******************************************************************************/ - case NN_CINPROC_STATE_ACTIVE: - switch (src) { case NN_CINPROC_SRC_SINPROC: switch (type) { case NN_SINPROC_DISCONNECT: - cinproc->state = NN_CINPROC_STATE_DISCONNECTED; nn_epbase_stat_increment (&cinproc->item.epbase, NN_STAT_INPROGRESS_CONNECTIONS, 1); - - nn_sinproc_init (&cinproc->sinproc, NN_CINPROC_SRC_SINPROC, - &cinproc->item.epbase, &cinproc->fsm); return; - - default: - nn_fsm_bad_action (cinproc->state, src, type); } + return; default: nn_fsm_bad_source (cinproc->state, src, type); diff --git a/src/transports/inproc/cinproc.h b/src/transports/inproc/cinproc.h index bccaed0..8a6355e 100644 --- a/src/transports/inproc/cinproc.h +++ b/src/transports/inproc/cinproc.h @@ -1,5 +1,6 @@ /* Copyright (c) 2012-2013 Martin Sustrik All rights reserved. + Copyright 2016 Garrett D'Amore <garrett@damore.org> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), @@ -30,6 +31,8 @@ #include "../../aio/fsm.h" +#include "../../utils/list.h" + struct nn_cinproc { /* The state machine. */ @@ -40,7 +43,7 @@ struct nn_cinproc { struct nn_ins_item item; /* The actual inproc session. */ - struct nn_sinproc sinproc; + struct nn_list sinprocs; }; int nn_cinproc_create (void *hint, struct nn_epbase **epbase); |