summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-10-05 21:27:14 -0700
committerGarrett D'Amore <garrett@damore.org>2016-10-19 20:29:59 -0700
commit4459720e0c413fd856eb4960c6bce86ea59dbed3 (patch)
tree6d0d5127616630c5fb2bf0dbda151c0f069e6254
parent08cb6aa9d27a1239ffbc0680a833184ad8d7666f (diff)
downloadnanomsg-4459720e0c413fd856eb4960c6bce86ea59dbed3.tar.gz
fixes #811 Memory leak when closing a PUSH socket before the PULL
-rw-r--r--src/transports/inproc/cinproc.c103
-rw-r--r--src/transports/inproc/cinproc.h5
2 files changed, 58 insertions, 50 deletions
diff --git a/src/transports/inproc/cinproc.c b/src/transports/inproc/cinproc.c
index 997588b..00c1bb7 100644
--- a/src/transports/inproc/cinproc.c
+++ b/src/transports/inproc/cinproc.c
@@ -33,9 +33,8 @@
#include <stddef.h>
#define NN_CINPROC_STATE_IDLE 1
-#define NN_CINPROC_STATE_DISCONNECTED 2
-#define NN_CINPROC_STATE_ACTIVE 3
-#define NN_CINPROC_STATE_STOPPING 4
+#define NN_CINPROC_STATE_ACTIVE 2
+#define NN_CINPROC_STATE_STOPPING 3
#define NN_CINPROC_ACTION_CONNECT 1
@@ -68,8 +67,10 @@ int nn_cinproc_create (void *hint, struct nn_epbase **epbase)
nn_fsm_init_root (&self->fsm, nn_cinproc_handler, nn_cinproc_shutdown,
nn_epbase_getctx (&self->item.epbase));
self->state = NN_CINPROC_STATE_IDLE;
- nn_sinproc_init (&self->sinproc, NN_CINPROC_SRC_SINPROC,
- &self->item.epbase, &self->fsm);
+ nn_list_init (&self->sinprocs);
+
+ nn_epbase_stat_increment (&self->item.epbase,
+ NN_STAT_INPROGRESS_CONNECTIONS, 1);
/* Start the state machine. */
nn_fsm_start (&self->fsm);
@@ -96,7 +97,7 @@ static void nn_cinproc_destroy (struct nn_epbase *self)
cinproc = nn_cont (self, struct nn_cinproc, item.epbase);
- nn_sinproc_term (&cinproc->sinproc);
+ nn_list_term (&cinproc->sinprocs);
nn_fsm_term (&cinproc->fsm);
nn_ins_item_term (&cinproc->item);
@@ -108,19 +109,35 @@ static void nn_cinproc_connect (struct nn_ins_item *self,
{
struct nn_cinproc *cinproc;
struct nn_binproc *binproc;
+ struct nn_sinproc *sinproc;
cinproc = nn_cont (self, struct nn_cinproc, item);
binproc = nn_cont (peer, struct nn_binproc, item);
- nn_assert_state (cinproc, NN_CINPROC_STATE_DISCONNECTED);
- nn_sinproc_connect (&cinproc->sinproc, &binproc->fsm);
- nn_fsm_action (&cinproc->fsm, NN_CINPROC_ACTION_CONNECT);
+ nn_assert_state (cinproc, NN_CINPROC_STATE_ACTIVE);
+
+ sinproc = nn_alloc (sizeof (struct nn_sinproc), "sinproc");
+ alloc_assert (sinproc);
+ nn_sinproc_init (sinproc, NN_CINPROC_SRC_SINPROC,
+ &cinproc->item.epbase, &cinproc->fsm);
+
+ nn_list_insert (&cinproc->sinprocs, &sinproc->item,
+ nn_list_end (&cinproc->sinprocs));
+
+ nn_sinproc_connect (sinproc, &binproc->fsm);
+
+ nn_epbase_stat_increment (&cinproc->item.epbase,
+ NN_STAT_INPROGRESS_CONNECTIONS, -1);
+ nn_epbase_stat_increment (&cinproc->item.epbase,
+ NN_STAT_ESTABLISHED_CONNECTIONS, 1);
}
static void nn_cinproc_shutdown (struct nn_fsm *self, int src, int type,
NN_UNUSED void *srcptr)
{
struct nn_cinproc *cinproc;
+ struct nn_sinproc *sinproc;
+ struct nn_list_item *it;
cinproc = nn_cont (self, struct nn_cinproc, fsm);
@@ -131,11 +148,23 @@ static void nn_cinproc_shutdown (struct nn_fsm *self, int src, int type,
nn_ins_disconnect (&cinproc->item);
/* Stop the existing connection. */
- nn_sinproc_stop (&cinproc->sinproc);
+ for (it = nn_list_begin (&cinproc->sinprocs);
+ it != nn_list_end (&cinproc->sinprocs);
+ it = nn_list_next (&cinproc->sinprocs, it)) {
+ sinproc = nn_cont (it, struct nn_sinproc, item);
+ nn_sinproc_stop (sinproc);
+ }
cinproc->state = NN_CINPROC_STATE_STOPPING;
+ goto finish;
}
if (nn_slow (cinproc->state == NN_CINPROC_STATE_STOPPING)) {
- if (!nn_sinproc_isidle (&cinproc->sinproc))
+ sinproc = (struct nn_sinproc *) srcptr;
+ nn_list_erase (&cinproc->sinprocs, &sinproc->item);
+ nn_sinproc_term (sinproc);
+ nn_free (sinproc);
+
+finish:
+ if (!nn_list_empty (&cinproc->sinprocs))
return;
cinproc->state = NN_CINPROC_STATE_IDLE;
nn_fsm_stopped_noevent (&cinproc->fsm);
@@ -151,6 +180,7 @@ static void nn_cinproc_handler (struct nn_fsm *self, int src, int type,
{
struct nn_cinproc *cinproc;
struct nn_sinproc *sinproc;
+ struct nn_sinproc *peer;
cinproc = nn_cont (self, struct nn_cinproc, fsm);
@@ -166,9 +196,7 @@ static void nn_cinproc_handler (struct nn_fsm *self, int src, int type,
case NN_FSM_ACTION:
switch (type) {
case NN_FSM_START:
- cinproc->state = NN_CINPROC_STATE_DISCONNECTED;
- nn_epbase_stat_increment (&cinproc->item.epbase,
- NN_STAT_INPROGRESS_CONNECTIONS, 1);
+ cinproc->state = NN_CINPROC_STATE_ACTIVE;
return;
default:
nn_fsm_bad_action (cinproc->state, src, type);
@@ -179,30 +207,22 @@ static void nn_cinproc_handler (struct nn_fsm *self, int src, int type,
}
/******************************************************************************/
-/* DISCONNECTED state. */
+/* ACTIVE state. */
/******************************************************************************/
- case NN_CINPROC_STATE_DISCONNECTED:
+ case NN_CINPROC_STATE_ACTIVE:
switch (src) {
-
- case NN_FSM_ACTION:
- switch (type) {
- case NN_CINPROC_ACTION_CONNECT:
- cinproc->state = NN_CINPROC_STATE_ACTIVE;
- nn_epbase_stat_increment (&cinproc->item.epbase,
- NN_STAT_INPROGRESS_CONNECTIONS, -1);
- nn_epbase_stat_increment (&cinproc->item.epbase,
- NN_STAT_ESTABLISHED_CONNECTIONS, 1);
- return;
- default:
- nn_fsm_bad_action (cinproc->state, src, type);
- }
-
case NN_SINPROC_SRC_PEER:
- sinproc = (struct nn_sinproc*) srcptr;
+ peer = (struct nn_sinproc*) srcptr;
+
switch (type) {
case NN_SINPROC_CONNECT:
- nn_sinproc_accept (&cinproc->sinproc, sinproc);
- cinproc->state = NN_CINPROC_STATE_ACTIVE;
+ sinproc = nn_alloc (sizeof (struct nn_sinproc), "sinproc");
+ alloc_assert (sinproc);
+ nn_sinproc_init (sinproc, NN_CINPROC_SRC_SINPROC,
+ &cinproc->item.epbase, &cinproc->fsm);
+ nn_list_insert (&cinproc->sinprocs, &sinproc->item,
+ nn_list_end (&cinproc->sinprocs));
+ nn_sinproc_accept (sinproc, peer);
nn_epbase_stat_increment (&cinproc->item.epbase,
NN_STAT_INPROGRESS_CONNECTIONS, -1);
nn_epbase_stat_increment (&cinproc->item.epbase,
@@ -212,29 +232,14 @@ static void nn_cinproc_handler (struct nn_fsm *self, int src, int type,
nn_fsm_bad_action (cinproc->state, src, type);
}
- default:
- nn_fsm_bad_source (cinproc->state, src, type);
- }
-
-/******************************************************************************/
-/* ACTIVE state. */
-/******************************************************************************/
- case NN_CINPROC_STATE_ACTIVE:
- switch (src) {
case NN_CINPROC_SRC_SINPROC:
switch (type) {
case NN_SINPROC_DISCONNECT:
- cinproc->state = NN_CINPROC_STATE_DISCONNECTED;
nn_epbase_stat_increment (&cinproc->item.epbase,
NN_STAT_INPROGRESS_CONNECTIONS, 1);
-
- nn_sinproc_init (&cinproc->sinproc, NN_CINPROC_SRC_SINPROC,
- &cinproc->item.epbase, &cinproc->fsm);
return;
-
- default:
- nn_fsm_bad_action (cinproc->state, src, type);
}
+ return;
default:
nn_fsm_bad_source (cinproc->state, src, type);
diff --git a/src/transports/inproc/cinproc.h b/src/transports/inproc/cinproc.h
index bccaed0..8a6355e 100644
--- a/src/transports/inproc/cinproc.h
+++ b/src/transports/inproc/cinproc.h
@@ -1,5 +1,6 @@
/*
Copyright (c) 2012-2013 Martin Sustrik All rights reserved.
+ Copyright 2016 Garrett D'Amore <garrett@damore.org>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"),
@@ -30,6 +31,8 @@
#include "../../aio/fsm.h"
+#include "../../utils/list.h"
+
struct nn_cinproc {
/* The state machine. */
@@ -40,7 +43,7 @@ struct nn_cinproc {
struct nn_ins_item item;
/* The actual inproc session. */
- struct nn_sinproc sinproc;
+ struct nn_list sinprocs;
};
int nn_cinproc_create (void *hint, struct nn_epbase **epbase);