From 526d76892105cdcf8702246f058af22bfb6f3abb Mon Sep 17 00:00:00 2001 From: Magnus Feuer Date: Tue, 23 Jun 2015 15:10:02 -0700 Subject: Fix for parameters --- deps/bt/c_src/bt_linux_drv.c | 123 +++++++++++++++++++++++++++++++------------ deps/bt/src/bt_drv.erl | 3 +- 2 files changed, 92 insertions(+), 34 deletions(-) (limited to 'deps') diff --git a/deps/bt/c_src/bt_linux_drv.c b/deps/bt/c_src/bt_linux_drv.c index 9dc54b9..e46e24c 100644 --- a/deps/bt/c_src/bt_linux_drv.c +++ b/deps/bt/c_src/bt_linux_drv.c @@ -58,6 +58,11 @@ int set_nonblock(int fd) #define ERR_SHORT 1 #define ERR_LONG 2 +typedef struct { + bt_ctx_t* ctx; // access to subscription list + int pending_accepts; + subscription_list_t acceptors; +} linux_listen_queue_t; static int mgmt_open(void) @@ -379,6 +384,8 @@ static void rfcomm_running(struct pollfd* pfd, void* arg) if (pfd->revents & POLLIN) { // input ready DEBUGF("rfcomm_running: %d has input", PTR2INT(s->handle)); + DEBUGF("rfcomm_running: sub %p", s); + DEBUGF("rfcomm_running: id %d", s->id); bt_data_len = read(pfd->fd, bt_data, sizeof(bt_data)); @@ -404,17 +411,22 @@ static void rfcomm_running(struct pollfd* pfd, void* arg) ddata_send(&data, 1); ddata_final(&data); - release_subscription(s); + // Redefine the subscriber as waiting again + s->accept = s->opaque; + s->handle = -1; + bt_poll_del(pfd->fd); shutdown(pfd->fd, 2); close(pfd->fd); } + if (pfd->revents & POLLOUT) { // output ready DEBUGF("rfcomm_running: %d may output", PTR2INT(s->handle)); // FIXME: Send additional pending data. } } + // CALLBACK static void rfcomm_connected(struct pollfd* pfd, void* arg) { @@ -430,24 +442,58 @@ static void rfcomm_connected(struct pollfd* pfd, void* arg) -// CALLBACK static void rfcomm_accept(struct pollfd* pfd, void* arg) { - subscription_t *accept_s = (subscription_t*) arg; - // We have a pending client connection + subscription_t *listen_s = (subscription_t*) arg; + linux_listen_queue_t* lq = (linux_listen_queue_t*) listen_s->opaque; struct sockaddr_rc rem_addr = { 0 }; - int client; + struct sockaddr_rc loc_addr = { 0 }; + int client_des = 0; socklen_t alen = sizeof(rem_addr); uint8_t buf[64]; ddata_t data; + subscription_link_t* link = 0; + subscription_t* accept_s = 0; - client = accept(pfd->fd, (struct sockaddr *)&rem_addr, &alen); - /* send EVENT id {accept,Address,Channel} */ - - bt_poll_add(client, POLLIN | POLLHUP, rfcomm_running, accept_s); - accept_s->handle = INT2PTR(client); + if (listen_s->type != RFCOMM_LISTEN) { + DEBUGF("RFCOMM: not a listen subscription", 0); + return; + } + + // + // Find a waiting process that is currently accepting traffic + // + link = lq->acceptors.first; + while(link) { + if (link->s->accept == listen_s) + break; + link = link->next; + } + + if (!link) { // peek + DEBUGF("RFCOMM: no accepting processes. Will add to pending accepts."); + lq->pending_accepts++; + return; + } + + // Retrieve the accepting subscriber process + accept_s = link->s; + + client_des = accept(pfd->fd, (struct sockaddr *)&rem_addr, &alen); + + bt_poll_add(client_des, POLLIN | POLLHUP, rfcomm_running, accept_s); + + accept_s->handle = INT2PTR(client_des); + accept_s->accept = NULL; // This one is no longer handling accepts. + + getsockname(client_des, (struct sockaddr *)&loc_addr, &alen); + + DEBUGF("RFCOMM: accept on %X", &loc_addr.rc_channel); + DEBUGF("RFCOMM: desc %d", client_des); + DEBUGF("RFCOMM: ptr %p", accept_s); + DEBUGF("RFCOMM: id %p", accept_s->id); ddata_init(&data, buf, sizeof(buf), 0); ddata_put_UINT32(&data, 0); @@ -456,7 +502,7 @@ static void rfcomm_accept(struct pollfd* pfd, void* arg) ddata_put_tag(&data, TUPLE); ddata_put_atom(&data, "accept"); ddata_put_addr(&data, &rem_addr.rc_bdaddr); - ddata_put_uint8(&data, PTR2INT(accept_s->handle)); + ddata_put_uint8(&data, loc_addr.rc_channel); ddata_put_tag(&data, TUPLE_END); ddata_send(&data, 1); ddata_final(&data); @@ -621,7 +667,7 @@ void bt_command(bt_ctx_t* ctx, const uint8_t* src, uint32_t src_len) uint32_t sid = 0; uint8_t channel = 0; subscription_t* listen_sub = 0; -// listen_queue_t* lq = 0; + linux_listen_queue_t* lq = 0; int listen_desc = 0; struct sockaddr_rc loc_addr = { 0 }; int dev_id; @@ -651,6 +697,11 @@ void bt_command(bt_ctx_t* ctx, const uint8_t* src, uint32_t src_len) cleanup)) == NULL) goto mem_error; + if ((lq = alloc_type(linux_listen_queue_t)) == NULL) { + release_subscription(listen_sub); + goto mem_error; + } + dev_id = hci_get_route(NULL); hci_devinfo(dev_id, &dev_info); ba2str( &dev_info.bdaddr, buf ); @@ -680,9 +731,13 @@ void bt_command(bt_ctx_t* ctx, const uint8_t* src, uint32_t src_len) } listen_sub->handle = INT2PTR(listen_desc); - listen_sub->opaque = INT2PTR(channel); + listen_sub->opaque = (void* ) lq; insert_last(&ctx->list, listen_sub); + // Call rfcomm_accept with listen subscriber + // when someone connects to us. + bt_poll_add(listen_desc, POLLIN, rfcomm_accept, listen_sub); + ddata_put_tag(&data_out, REPLY_OK); ddata_put_UINT32(&data_out, cmdid); goto reply; @@ -692,7 +747,7 @@ void bt_command(bt_ctx_t* ctx, const uint8_t* src, uint32_t src_len) case CMD_RFCOMM_ACCEPT: { /* id:32 listen_id:32 */ uint32_t sid = 0; uint32_t listen_id = 0; -// listen_queue_t* lq = 0; + linux_listen_queue_t* lq = 0; subscription_t* listen = 0; subscription_t* s = 0; @@ -722,15 +777,17 @@ void bt_command(bt_ctx_t* ctx, const uint8_t* src, uint32_t src_len) if ((s = new_subscription(RFCOMM,sid,cmdid,NULL,cleanup)) == NULL) goto mem_error; - // s->accept = listen; // mark that we are accepting - - bt_poll_add(PTR2INT(listen->handle), POLLIN, rfcomm_accept, s); - s->handle = listen->opaque; + s->accept = listen; // mark that we are accepting + s->opaque = listen; + + lq = (linux_listen_queue_t*) listen->opaque; + insert_last(&lq->acceptors, s); insert_last(&ctx->list, s); ddata_put_tag(&data_out, REPLY_OK); ddata_put_UINT32(&data_out, cmdid); ddata_send(&data_out, 1); + goto done; } @@ -757,23 +814,23 @@ void bt_command(bt_ctx_t* ctx, const uint8_t* src, uint32_t src_len) unlink_subscription(link); goto done; } - /* else if (s->accept != NULL) { */ - /* listen_queue_t* lq = (listen_queue_t*)((s->accept)->opaque); */ - /* remove_subscription(&lq->wait,RFCOMM,sid); */ - /* unlink_subscription(link); */ - /* goto ok; */ - /* } */ + else if (s->accept != NULL) { + linux_listen_queue_t* lq = (linux_listen_queue_t*)((s->accept)->opaque); + remove_subscription(&lq->acceptors,RFCOMM,sid); + unlink_subscription(link); + goto ok; + } } else if ((link = find_subscription_link(&ctx->list,RFCOMM_LISTEN,sid)) != NULL) { - /* subscription_t* listen = link->s; */ - /* listen_queue_t* lq = (listen_queue_t*)listen->opaque; */ - /* subscription_link_t* link1; */ - /* /\* remove all waiters *\/ */ - /* while((link1=lq->wait.first) != NULL) { */ - /* send_event(link1->s->id, "closed"); */ - /* unlink_subscription(link1); */ - /* } */ - /* unlink_subscription(link); */ + subscription_t* listen = link->s; + linux_listen_queue_t* lq = (linux_listen_queue_t*)listen->opaque; + subscription_link_t* link1; + /* remove all waiters */ + while((link1=lq->acceptors.first) != NULL) { + send_event(link1->s->id, "closed"); + unlink_subscription(link1); + } + unlink_subscription(link); goto ok; } goto error; diff --git a/deps/bt/src/bt_drv.erl b/deps/bt/src/bt_drv.erl index 6f44ac9..9d049b5 100644 --- a/deps/bt/src/bt_drv.erl +++ b/deps/bt/src/bt_drv.erl @@ -79,7 +79,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%% -define(debug, true). +-define(debug, true). -ifdef(debug). -define(dbg(Fmt,As), io:format("~s:~w:" Fmt "\n", [?FILE,?LINE | As])). @@ -1062,6 +1062,7 @@ handle_info({Port,{data,Data}},State) when Port == State#state.bt_port -> S#subscription.subscriber ! {S#subscription.tag,S#subscription.ref,Decoded}, if Decoded == closed -> + ?dbg("closed",[]), unmon(S#subscription.monitor), SList = State#state.subscription -- [S], State1 = State#state { subscription = SList }, -- cgit v1.2.1