diff options
author | Ethan Jackson <ethan@nicira.com> | 2013-10-30 12:59:15 -0700 |
---|---|---|
committer | Ethan Jackson <ethan@nicira.com> | 2013-12-12 17:13:04 -0800 |
commit | 8e407f2744c0fad4fe4785c7be5849eb6ad1f903 (patch) | |
tree | 8813cc0d75b39a71ab9a6765e717fadf0064802b /ofproto/netflow.c | |
parent | 8bfaca5b9a63a4900c3c99fc6a97e2b821870f66 (diff) | |
download | openvswitch-8e407f2744c0fad4fe4785c7be5849eb6ad1f903.tar.gz |
netflow: Make thread safe.
In future patches upcall handler threads will need to update netflow.
Signed-off-by: Ethan Jackson <ethan@nicira.com>
Acked-by: Ben Pfaff <blp@nicira.com>
Diffstat (limited to 'ofproto/netflow.c')
-rw-r--r-- | ofproto/netflow.c | 78 |
1 files changed, 66 insertions, 12 deletions
diff --git a/ofproto/netflow.c b/ofproto/netflow.c index 4e8949324..c91ecf068 100644 --- a/ofproto/netflow.c +++ b/ofproto/netflow.c @@ -52,6 +52,8 @@ struct netflow { long long int reconfig_time; /* When we reconfigured the timeouts. */ struct hmap flows; /* Contains 'netflow_flows'. */ + + atomic_int ref_cnt; }; struct netflow_flow { @@ -76,10 +78,15 @@ struct netflow_flow { long long int used; /* Last-used time (0 if never used). */ }; +static struct ovs_mutex mutex = OVS_MUTEX_INITIALIZER; + static struct netflow_flow *netflow_flow_lookup(const struct netflow *, - const struct flow *); + const struct flow *) + OVS_REQUIRES(mutex); static uint32_t netflow_flow_hash(const struct flow *); -static void netflow_expire__(struct netflow *, struct netflow_flow *); +static void netflow_expire__(struct netflow *, struct netflow_flow *) + OVS_REQUIRES(mutex); +static void netflow_run__(struct netflow *) OVS_REQUIRES(mutex); void netflow_mask_wc(struct flow *flow, struct flow_wildcards *wc) @@ -98,6 +105,7 @@ netflow_mask_wc(struct flow *flow, struct flow_wildcards *wc) static void gen_netflow_rec(struct netflow *nf, struct netflow_flow *nf_flow, uint32_t packet_count, uint32_t byte_count) + OVS_REQUIRES(mutex) { struct netflow_v5_header *nf_hdr; struct netflow_v5_record *nf_rec; @@ -157,7 +165,7 @@ gen_netflow_rec(struct netflow *nf, struct netflow_flow *nf_flow, /* NetFlow messages are limited to 30 records. */ if (ntohs(nf_hdr->count) >= 30) { - netflow_run(nf); + netflow_run__(nf); } } @@ -165,6 +173,7 @@ void netflow_flow_update(struct netflow *nf, struct flow *flow, ofp_port_t output_iface, const struct dpif_flow_stats *stats) + OVS_EXCLUDED(mutex) { struct netflow_flow *nf_flow; long long int used; @@ -174,6 +183,7 @@ netflow_flow_update(struct netflow *nf, struct flow *flow, return; } + ovs_mutex_lock(&mutex); nf_flow = netflow_flow_lookup(nf, flow); if (!nf_flow) { nf_flow = xzalloc(sizeof *nf_flow); @@ -209,10 +219,13 @@ netflow_flow_update(struct netflow *nf, struct flow *flow, nf_flow->last_expired = time_msec(); } } + + ovs_mutex_unlock(&mutex); } static void netflow_expire__(struct netflow *nf, struct netflow_flow *nf_flow) + OVS_REQUIRES(mutex) { uint64_t pkts, bytes; @@ -264,32 +277,38 @@ netflow_expire__(struct netflow *nf, struct netflow_flow *nf_flow) } void -netflow_expire(struct netflow *nf, struct flow *flow) +netflow_expire(struct netflow *nf, struct flow *flow) OVS_EXCLUDED(mutex) { - struct netflow_flow *nf_flow = netflow_flow_lookup(nf, flow); + struct netflow_flow *nf_flow; + ovs_mutex_lock(&mutex); + nf_flow = netflow_flow_lookup(nf, flow); if (nf_flow) { netflow_expire__(nf, nf_flow); } + ovs_mutex_unlock(&mutex); } void -netflow_flow_clear(struct netflow *nf, struct flow *flow) +netflow_flow_clear(struct netflow *nf, struct flow *flow) OVS_EXCLUDED(mutex) { - struct netflow_flow *nf_flow = netflow_flow_lookup(nf, flow); + struct netflow_flow *nf_flow; + ovs_mutex_lock(&mutex); + nf_flow = netflow_flow_lookup(nf, flow); if (nf_flow) { ovs_assert(!nf_flow->packet_count); ovs_assert(!nf_flow->byte_count); hmap_remove(&nf->flows, &nf_flow->hmap_node); free(nf_flow); } + ovs_mutex_unlock(&mutex); } /* Returns true if it's time to send out a round of NetFlow active timeouts, * false otherwise. */ -void -netflow_run(struct netflow *nf) +static void +netflow_run__(struct netflow *nf) OVS_REQUIRES(mutex) { long long int now = time_msec(); struct netflow_flow *nf_flow, *next; @@ -321,23 +340,35 @@ netflow_run(struct netflow *nf) } void -netflow_wait(struct netflow *nf) +netflow_run(struct netflow *nf) { + ovs_mutex_lock(&mutex); + netflow_run__(nf); + ovs_mutex_unlock(&mutex); +} + +void +netflow_wait(struct netflow *nf) OVS_EXCLUDED(mutex) +{ + ovs_mutex_lock(&mutex); if (nf->active_timeout) { poll_timer_wait_until(nf->next_timeout); } if (nf->packet.size) { poll_immediate_wake(); } + ovs_mutex_unlock(&mutex); } int netflow_set_options(struct netflow *nf, const struct netflow_options *nf_options) + OVS_EXCLUDED(mutex) { int error = 0; long long int old_timeout; + ovs_mutex_lock(&mutex); nf->engine_type = nf_options->engine_type; nf->engine_id = nf_options->engine_id; nf->add_id_to_iface = nf_options->add_id_to_iface; @@ -356,6 +387,7 @@ netflow_set_options(struct netflow *nf, nf->reconfig_time = time_msec(); nf->next_timeout = time_msec(); } + ovs_mutex_unlock(&mutex); return error; } @@ -371,14 +403,35 @@ netflow_create(void) nf->add_id_to_iface = false; nf->netflow_cnt = 0; hmap_init(&nf->flows); + atomic_init(&nf->ref_cnt, 1); ofpbuf_init(&nf->packet, 1500); return nf; } -void -netflow_destroy(struct netflow *nf) +struct netflow * +netflow_ref(const struct netflow *nf_) { + struct netflow *nf = CONST_CAST(struct netflow *, nf_); if (nf) { + int orig; + atomic_add(&nf->ref_cnt, 1, &orig); + ovs_assert(orig > 0); + } + return nf; +} + +void +netflow_unref(struct netflow *nf) +{ + int orig; + + if (!nf) { + return; + } + + atomic_sub(&nf->ref_cnt, 1, &orig); + ovs_assert(orig > 0); + if (orig == 1) { ofpbuf_uninit(&nf->packet); collectors_destroy(nf->collectors); free(nf); @@ -389,6 +442,7 @@ netflow_destroy(struct netflow *nf) static struct netflow_flow * netflow_flow_lookup(const struct netflow *nf, const struct flow *flow) + OVS_REQUIRES(mutex) { struct netflow_flow *nf_flow; |