diff options
author | David Howells <dhowells@redhat.com> | 2016-06-17 15:42:35 +0100 |
---|---|---|
committer | David Howells <dhowells@redhat.com> | 2016-06-22 09:20:55 +0100 |
commit | 999b69f89241c9384c104b84329c13350fd696ef (patch) | |
tree | 636bb30d72eac731d0dc11055479948d52989d49 /net/rxrpc/conn_object.c | |
parent | 5627cc8b961e4b07d5d649d9bd01ac929dcc1a95 (diff) | |
download | linux-next-999b69f89241c9384c104b84329c13350fd696ef.tar.gz |
rxrpc: Kill the client connection bundle concept
Kill off the concept of maintaining a bundle of connections to a particular
target service to increase the number of call slots available for any
beyond four for that service (there are four call slots per connection).
This will make cleaning up the connection handling code easier and
facilitate removal of the rxrpc_transport struct. Bundling can be
reintroduced later if necessary.
Signed-off-by: David Howells <dhowells@redhat.com>
Diffstat (limited to 'net/rxrpc/conn_object.c')
-rw-r--r-- | net/rxrpc/conn_object.c | 552 |
1 files changed, 197 insertions, 355 deletions
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c index 1754f2e2e16b..276ff505394f 100644 --- a/net/rxrpc/conn_object.c +++ b/net/rxrpc/conn_object.c @@ -32,152 +32,6 @@ DEFINE_RWLOCK(rxrpc_connection_lock); static DECLARE_DELAYED_WORK(rxrpc_connection_reap, rxrpc_connection_reaper); /* - * allocate a new client connection bundle - */ -static struct rxrpc_conn_bundle *rxrpc_alloc_bundle(gfp_t gfp) -{ - struct rxrpc_conn_bundle *bundle; - - _enter(""); - - bundle = kzalloc(sizeof(struct rxrpc_conn_bundle), gfp); - if (bundle) { - INIT_LIST_HEAD(&bundle->unused_conns); - INIT_LIST_HEAD(&bundle->avail_conns); - INIT_LIST_HEAD(&bundle->busy_conns); - init_waitqueue_head(&bundle->chanwait); - atomic_set(&bundle->usage, 1); - } - - _leave(" = %p", bundle); - return bundle; -} - -/* - * compare bundle parameters with what we're looking for - * - return -ve, 0 or +ve - */ -static inline -int rxrpc_cmp_bundle(const struct rxrpc_conn_bundle *bundle, - struct key *key, u16 service_id) -{ - return (bundle->service_id - service_id) ?: - ((unsigned long)bundle->key - (unsigned long)key); -} - -/* - * get bundle of client connections that a client socket can make use of - */ -struct rxrpc_conn_bundle *rxrpc_get_bundle(struct rxrpc_sock *rx, - struct rxrpc_transport *trans, - struct key *key, - u16 service_id, - gfp_t gfp) -{ - struct rxrpc_conn_bundle *bundle, *candidate; - struct rb_node *p, *parent, **pp; - - _enter("%p{%x},%x,%hx,", - rx, key_serial(key), trans->debug_id, service_id); - - /* search the extant bundles first for one that matches the specified - * user ID */ - spin_lock(&trans->client_lock); - - p = trans->bundles.rb_node; - while (p) { - bundle = rb_entry(p, struct rxrpc_conn_bundle, node); - - if (rxrpc_cmp_bundle(bundle, key, service_id) < 0) - p = p->rb_left; - else if (rxrpc_cmp_bundle(bundle, key, service_id) > 0) - p = p->rb_right; - else - goto found_extant_bundle; - } - - spin_unlock(&trans->client_lock); - - /* not yet present - create a candidate for a new record and then - * redo the search */ - candidate = rxrpc_alloc_bundle(gfp); - if (!candidate) { - _leave(" = -ENOMEM"); - return ERR_PTR(-ENOMEM); - } - - candidate->key = key_get(key); - candidate->service_id = service_id; - - spin_lock(&trans->client_lock); - - pp = &trans->bundles.rb_node; - parent = NULL; - while (*pp) { - parent = *pp; - bundle = rb_entry(parent, struct rxrpc_conn_bundle, node); - - if (rxrpc_cmp_bundle(bundle, key, service_id) < 0) - pp = &(*pp)->rb_left; - else if (rxrpc_cmp_bundle(bundle, key, service_id) > 0) - pp = &(*pp)->rb_right; - else - goto found_extant_second; - } - - /* second search also failed; add the new bundle */ - bundle = candidate; - candidate = NULL; - - rb_link_node(&bundle->node, parent, pp); - rb_insert_color(&bundle->node, &trans->bundles); - spin_unlock(&trans->client_lock); - _net("BUNDLE new on trans %d", trans->debug_id); - _leave(" = %p [new]", bundle); - return bundle; - - /* we found the bundle in the list immediately */ -found_extant_bundle: - atomic_inc(&bundle->usage); - spin_unlock(&trans->client_lock); - _net("BUNDLE old on trans %d", trans->debug_id); - _leave(" = %p [extant %d]", bundle, atomic_read(&bundle->usage)); - return bundle; - - /* we found the bundle on the second time through the list */ -found_extant_second: - atomic_inc(&bundle->usage); - spin_unlock(&trans->client_lock); - kfree(candidate); - _net("BUNDLE old2 on trans %d", trans->debug_id); - _leave(" = %p [second %d]", bundle, atomic_read(&bundle->usage)); - return bundle; -} - -/* - * release a bundle - */ -void rxrpc_put_bundle(struct rxrpc_transport *trans, - struct rxrpc_conn_bundle *bundle) -{ - _enter("%p,%p{%d}",trans, bundle, atomic_read(&bundle->usage)); - - if (atomic_dec_and_lock(&bundle->usage, &trans->client_lock)) { - _debug("Destroy bundle"); - rb_erase(&bundle->node, &trans->bundles); - spin_unlock(&trans->client_lock); - ASSERT(list_empty(&bundle->unused_conns)); - ASSERT(list_empty(&bundle->avail_conns)); - ASSERT(list_empty(&bundle->busy_conns)); - ASSERTCMP(bundle->num_conns, ==, 0); - key_put(bundle->key); - kfree(bundle); - } - - _leave(""); -} - -/* * allocate a new connection */ static struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp) @@ -188,8 +42,10 @@ static struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp) conn = kzalloc(sizeof(struct rxrpc_connection), gfp); if (conn) { + spin_lock_init(&conn->channel_lock); + init_waitqueue_head(&conn->channel_wq); INIT_WORK(&conn->processor, &rxrpc_process_connection); - INIT_LIST_HEAD(&conn->bundle_link); + INIT_LIST_HEAD(&conn->link); conn->calls = RB_ROOT; skb_queue_head_init(&conn->rx_queue); conn->security = &rxrpc_no_security; @@ -197,7 +53,7 @@ static struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp) spin_lock_init(&conn->state_lock); atomic_set(&conn->usage, 1); conn->debug_id = atomic_inc_return(&rxrpc_debug_id); - conn->avail_calls = RXRPC_MAXCALLS; + atomic_set(&conn->avail_chans, RXRPC_MAXCALLS); conn->size_align = 4; conn->header_size = sizeof(struct rxrpc_wire_header); } @@ -240,7 +96,8 @@ static void rxrpc_add_call_ID_to_conn(struct rxrpc_connection *conn, } /* - * Allocate a client connection. + * Allocate a client connection. The caller must take care to clear any + * padding bytes in *cp. */ static struct rxrpc_connection * rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp, @@ -275,7 +132,7 @@ rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp, break; } - ret = rxrpc_get_client_connection_id(conn, trans, gfp); + ret = rxrpc_get_client_connection_id(conn, gfp); if (ret < 0) goto error_0; @@ -290,6 +147,8 @@ rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp, write_unlock(&rxrpc_connection_lock); key_get(conn->params.key); + conn->trans = trans; + atomic_inc(&trans->usage); _leave(" = %p", conn); return conn; @@ -303,217 +162,173 @@ error_0: } /* - * connect a call on an exclusive connection - */ -static int rxrpc_connect_exclusive(struct rxrpc_sock *rx, - struct rxrpc_conn_parameters *cp, - struct rxrpc_transport *trans, - struct rxrpc_call *call, - gfp_t gfp) -{ - struct rxrpc_connection *conn; - int chan; - - _enter(""); - - conn = rxrpc_alloc_client_connection(cp, trans, gfp); - if (IS_ERR(conn)) { - _leave(" = %ld", PTR_ERR(conn)); - return PTR_ERR(conn); - } - - atomic_inc(&trans->usage); - conn->trans = trans; - conn->bundle = NULL; - - _net("CONNECT EXCL new %d on TRANS %d", - conn->debug_id, conn->trans->debug_id); - - /* Since no one else can use the connection, we just use the first - * channel. - */ - chan = 0; - rxrpc_get_connection(conn); - conn->avail_calls = RXRPC_MAXCALLS - 1; - conn->channels[chan] = call; - conn->call_counter = 1; - call->conn = conn; - call->channel = chan; - call->cid = conn->proto.cid | chan; - call->call_id = 1; - - _net("CONNECT client on conn %d chan %d as call %x", - conn->debug_id, chan, call->call_id); - - rxrpc_add_call_ID_to_conn(conn, call); - _leave(" = 0"); - return 0; -} - -/* * find a connection for a call * - called in process context with IRQs enabled */ -int rxrpc_connect_call(struct rxrpc_sock *rx, +int rxrpc_connect_call(struct rxrpc_call *call, struct rxrpc_conn_parameters *cp, struct rxrpc_transport *trans, - struct rxrpc_conn_bundle *bundle, - struct rxrpc_call *call, + struct sockaddr_rxrpc *srx, gfp_t gfp) { - struct rxrpc_connection *conn, *candidate; + struct rxrpc_connection *conn, *candidate = NULL; + struct rxrpc_local *local = cp->local; + struct rb_node *p, **pp, *parent; + long diff; int chan; DECLARE_WAITQUEUE(myself, current); - _enter("%p,%lx,", rx, call->user_call_ID); - - if (cp->exclusive) - return rxrpc_connect_exclusive(rx, cp, trans, call, gfp); - - spin_lock(&trans->client_lock); - for (;;) { - /* see if the bundle has a call slot available */ - if (!list_empty(&bundle->avail_conns)) { - _debug("avail"); - conn = list_entry(bundle->avail_conns.next, - struct rxrpc_connection, - bundle_link); - if (conn->state >= RXRPC_CONN_REMOTELY_ABORTED) { - list_del_init(&conn->bundle_link); - bundle->num_conns--; - continue; - } - if (--conn->avail_calls == 0) - list_move(&conn->bundle_link, - &bundle->busy_conns); - ASSERTCMP(conn->avail_calls, <, RXRPC_MAXCALLS); - ASSERT(conn->channels[0] == NULL || - conn->channels[1] == NULL || - conn->channels[2] == NULL || - conn->channels[3] == NULL); - rxrpc_get_connection(conn); - break; - } + _enter("{%d,%lx},", call->debug_id, call->user_call_ID); - if (!list_empty(&bundle->unused_conns)) { - _debug("unused"); - conn = list_entry(bundle->unused_conns.next, - struct rxrpc_connection, - bundle_link); - if (conn->state >= RXRPC_CONN_REMOTELY_ABORTED) { - list_del_init(&conn->bundle_link); - bundle->num_conns--; - continue; - } - ASSERTCMP(conn->avail_calls, ==, RXRPC_MAXCALLS); - conn->avail_calls = RXRPC_MAXCALLS - 1; - ASSERT(conn->channels[0] == NULL && - conn->channels[1] == NULL && - conn->channels[2] == NULL && - conn->channels[3] == NULL); - rxrpc_get_connection(conn); - list_move(&conn->bundle_link, &bundle->avail_conns); - break; + cp->peer = trans->peer; + rxrpc_get_peer(cp->peer); + + if (!cp->exclusive) { + /* Search for a existing client connection unless this is going + * to be a connection that's used exclusively for a single call. + */ + _debug("search 1"); + spin_lock(&local->client_conns_lock); + p = local->client_conns.rb_node; + while (p) { + conn = rb_entry(p, struct rxrpc_connection, client_node); + +#define cmp(X) ((long)conn->params.X - (long)cp->X) + diff = (cmp(peer) ?: + cmp(key) ?: + cmp(security_level)); + if (diff < 0) + p = p->rb_left; + else if (diff > 0) + p = p->rb_right; + else + goto found_extant_conn; } + spin_unlock(&local->client_conns_lock); + } - /* need to allocate a new connection */ - _debug("get new conn [%d]", bundle->num_conns); + /* We didn't find a connection or we want an exclusive one. */ + _debug("get new conn"); + candidate = rxrpc_alloc_client_connection(cp, trans, gfp); + if (!candidate) { + _leave(" = -ENOMEM"); + return -ENOMEM; + } - spin_unlock(&trans->client_lock); + if (cp->exclusive) { + /* Assign the call on an exclusive connection to channel 0 and + * don't add the connection to the endpoint's shareable conn + * lookup tree. + */ + _debug("exclusive chan 0"); + conn = candidate; + atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1); + spin_lock(&conn->channel_lock); + chan = 0; + goto found_channel; + } - if (signal_pending(current)) - goto interrupted; + /* We need to redo the search before attempting to add a new connection + * lest we race with someone else adding a conflicting instance. + */ + _debug("search 2"); + spin_lock(&local->client_conns_lock); - if (bundle->num_conns >= 20) { - _debug("too many conns"); + pp = &local->client_conns.rb_node; + parent = NULL; + while (*pp) { + parent = *pp; + conn = rb_entry(parent, struct rxrpc_connection, client_node); - if (!gfpflags_allow_blocking(gfp)) { - _leave(" = -EAGAIN"); - return -EAGAIN; - } + diff = (cmp(peer) ?: + cmp(key) ?: + cmp(security_level)); + if (diff < 0) + pp = &(*pp)->rb_left; + else if (diff > 0) + pp = &(*pp)->rb_right; + else + goto found_extant_conn; + } - add_wait_queue(&bundle->chanwait, &myself); - for (;;) { - set_current_state(TASK_INTERRUPTIBLE); - if (bundle->num_conns < 20 || - !list_empty(&bundle->unused_conns) || - !list_empty(&bundle->avail_conns)) - break; - if (signal_pending(current)) - goto interrupted_dequeue; - schedule(); - } - remove_wait_queue(&bundle->chanwait, &myself); - __set_current_state(TASK_RUNNING); - spin_lock(&trans->client_lock); - continue; - } + /* The second search also failed; simply add the new connection with + * the new call in channel 0. Note that we need to take the channel + * lock before dropping the client conn lock. + */ + _debug("new conn"); + conn = candidate; + candidate = NULL; - /* not yet present - create a candidate for a new connection and then - * redo the check */ - candidate = rxrpc_alloc_client_connection(cp, trans, gfp); - if (!candidate) { - _leave(" = -ENOMEM"); - return -ENOMEM; - } + rb_link_node(&conn->client_node, parent, pp); + rb_insert_color(&conn->client_node, &local->client_conns); + + atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1); + spin_lock(&conn->channel_lock); + spin_unlock(&local->client_conns_lock); + chan = 0; + +found_channel: + _debug("found chan"); + call->conn = conn; + call->channel = chan; + call->epoch = conn->proto.epoch; + call->cid = conn->proto.cid | chan; + call->call_id = ++conn->call_counter; + rcu_assign_pointer(conn->channels[chan], call); - atomic_inc(&bundle->usage); - atomic_inc(&trans->usage); - candidate->trans = trans; - candidate->bundle = bundle; + _net("CONNECT call %d on conn %d", call->debug_id, conn->debug_id); - spin_lock(&trans->client_lock); + rxrpc_add_call_ID_to_conn(conn, call); + spin_unlock(&conn->channel_lock); + _leave(" = %p {u=%d}", conn, atomic_read(&conn->usage)); + return 0; + + /* We found a suitable connection already in existence. Discard any + * candidate we may have allocated, and try to get a channel on this + * one. + */ +found_extant_conn: + _debug("found conn"); + rxrpc_get_connection(conn); + spin_unlock(&local->client_conns_lock); - list_add(&candidate->bundle_link, &bundle->unused_conns); - bundle->num_conns++; + rxrpc_put_connection(candidate); - _net("CONNECT new %d on TRANS %d", - candidate->debug_id, candidate->trans->debug_id); + if (!atomic_add_unless(&conn->avail_chans, -1, 0)) { + if (!gfpflags_allow_blocking(gfp)) { + rxrpc_put_connection(conn); + _leave(" = -EAGAIN"); + return -EAGAIN; + } - /* leave the candidate lurking in zombie mode attached to the - * bundle until we're ready for it */ - rxrpc_put_connection(candidate); - candidate = NULL; + add_wait_queue(&conn->channel_wq, &myself); + for (;;) { + set_current_state(TASK_INTERRUPTIBLE); + if (atomic_add_unless(&conn->avail_chans, -1, 0)) + break; + if (signal_pending(current)) + goto interrupted; + schedule(); + } + remove_wait_queue(&conn->channel_wq, &myself); + __set_current_state(TASK_RUNNING); } - /* we've got a connection with a free channel and we can now attach the - * call to it - * - we're holding the transport's client lock - * - we're holding a reference on the connection - * - we're holding a reference on the bundle + /* The connection allegedly now has a free channel and we can now + * attach the call to it. */ + spin_lock(&conn->channel_lock); + for (chan = 0; chan < RXRPC_MAXCALLS; chan++) if (!conn->channels[chan]) goto found_channel; - ASSERT(conn->channels[0] == NULL || - conn->channels[1] == NULL || - conn->channels[2] == NULL || - conn->channels[3] == NULL); BUG(); -found_channel: - conn->channels[chan] = call; - call->conn = conn; - call->channel = chan; - call->cid = conn->proto.cid | chan; - call->call_id = ++conn->call_counter; - - _net("CONNECT client on conn %d chan %d as call %x", - conn->debug_id, chan, call->call_id); - - ASSERTCMP(conn->avail_calls, <, RXRPC_MAXCALLS); - spin_unlock(&trans->client_lock); - - rxrpc_add_call_ID_to_conn(conn, call); - - _leave(" = 0"); - return 0; - -interrupted_dequeue: - remove_wait_queue(&bundle->chanwait, &myself); - __set_current_state(TASK_RUNNING); interrupted: + remove_wait_queue(&conn->channel_wq, &myself); + __set_current_state(TASK_RUNNING); + rxrpc_put_connection(conn); _leave(" = -ERESTARTSYS"); return -ERESTARTSYS; } @@ -521,8 +336,8 @@ interrupted: /* * get a record of an incoming connection */ -struct rxrpc_connection * -rxrpc_incoming_connection(struct rxrpc_transport *trans, struct sk_buff *skb) +struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_transport *trans, + struct sk_buff *skb) { struct rxrpc_connection *conn, *candidate = NULL; struct rxrpc_skb_priv *sp = rxrpc_skb(skb); @@ -543,7 +358,7 @@ rxrpc_incoming_connection(struct rxrpc_transport *trans, struct sk_buff *skb) p = trans->server_conns.rb_node; while (p) { - conn = rb_entry(p, struct rxrpc_connection, node); + conn = rb_entry(p, struct rxrpc_connection, service_node); _debug("maybe %x", conn->proto.cid); @@ -588,7 +403,7 @@ rxrpc_incoming_connection(struct rxrpc_transport *trans, struct sk_buff *skb) p = NULL; while (*pp) { p = *pp; - conn = rb_entry(p, struct rxrpc_connection, node); + conn = rb_entry(p, struct rxrpc_connection, service_node); if (epoch < conn->proto.epoch) pp = &(*pp)->rb_left; @@ -605,8 +420,8 @@ rxrpc_incoming_connection(struct rxrpc_transport *trans, struct sk_buff *skb) /* we can now add the new candidate to the list */ conn = candidate; candidate = NULL; - rb_link_node(&conn->node, p, pp); - rb_insert_color(&conn->node, &trans->server_conns); + rb_link_node(&conn->service_node, p, pp); + rb_insert_color(&conn->service_node, &trans->server_conns); atomic_inc(&conn->trans->usage); write_unlock_bh(&trans->conn_lock); @@ -672,7 +487,7 @@ struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_transport *trans, if (sp->hdr.flags & RXRPC_CLIENT_INITIATED) { p = trans->server_conns.rb_node; while (p) { - conn = rb_entry(p, struct rxrpc_connection, node); + conn = rb_entry(p, struct rxrpc_connection, service_node); _debug("maybe %x", conn->proto.cid); @@ -705,10 +520,31 @@ found: } /* + * Disconnect a call and clear any channel it occupies when that call + * terminates. + */ +void rxrpc_disconnect_call(struct rxrpc_call *call) +{ + struct rxrpc_connection *conn = call->conn; + unsigned chan = call->channel; + + _enter("%d,%d", conn->debug_id, call->channel); + + if (conn->channels[chan] == call) { + rcu_assign_pointer(conn->channels[chan], NULL); + atomic_inc(&conn->avail_chans); + wake_up(&conn->channel_wq); + } +} + +/* * release a virtual connection */ void rxrpc_put_connection(struct rxrpc_connection *conn) { + if (!conn) + return; + _enter("%p{u=%d,d=%d}", conn, atomic_read(&conn->usage), conn->debug_id); @@ -734,9 +570,6 @@ static void rxrpc_destroy_connection(struct rxrpc_connection *conn) _net("DESTROY CONN %d", conn->debug_id); - if (conn->bundle) - rxrpc_put_bundle(conn->trans, conn->bundle); - ASSERT(RB_EMPTY_ROOT(&conn->calls)); rxrpc_purge_queue(&conn->rx_queue); @@ -773,30 +606,39 @@ static void rxrpc_connection_reaper(struct work_struct *work) if (likely(atomic_read(&conn->usage) > 0)) continue; - spin_lock(&conn->trans->client_lock); - write_lock_bh(&conn->trans->conn_lock); - reap_time = conn->put_time + rxrpc_connection_expiry; + if (rxrpc_conn_is_client(conn)) { + struct rxrpc_local *local = conn->params.local; + spin_lock(&local->client_conns_lock); + reap_time = conn->put_time + rxrpc_connection_expiry; - if (atomic_read(&conn->usage) > 0) { - ; - } else if (reap_time <= now) { - list_move_tail(&conn->link, &graveyard); - if (conn->out_clientflag) + if (atomic_read(&conn->usage) > 0) { + ; + } else if (reap_time <= now) { + list_move_tail(&conn->link, &graveyard); rxrpc_put_client_connection_id(conn); - else - rb_erase(&conn->node, + rb_erase(&conn->client_node, + &local->client_conns); + } else if (reap_time < earliest) { + earliest = reap_time; + } + + spin_unlock(&local->client_conns_lock); + } else { + write_lock_bh(&conn->trans->conn_lock); + reap_time = conn->put_time + rxrpc_connection_expiry; + + if (atomic_read(&conn->usage) > 0) { + ; + } else if (reap_time <= now) { + list_move_tail(&conn->link, &graveyard); + rb_erase(&conn->service_node, &conn->trans->server_conns); - if (conn->bundle) { - list_del_init(&conn->bundle_link); - conn->bundle->num_conns--; + } else if (reap_time < earliest) { + earliest = reap_time; } - } else if (reap_time < earliest) { - earliest = reap_time; + write_unlock_bh(&conn->trans->conn_lock); } - - write_unlock_bh(&conn->trans->conn_lock); - spin_unlock(&conn->trans->client_lock); } write_unlock(&rxrpc_connection_lock); |