diff options
Diffstat (limited to 'librabbitmq/amqp_tcp_socket.c')
-rw-r--r-- | librabbitmq/amqp_tcp_socket.c | 47 |
1 files changed, 26 insertions, 21 deletions
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c index e43a596..ed38c06 100644 --- a/librabbitmq/amqp_tcp_socket.c +++ b/librabbitmq/amqp_tcp_socket.c @@ -55,7 +55,7 @@ amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags) #endif start: - res = send(self->sockfd, buf, len, flags); + res = send(self->sockfd, buf_left, len_left, flags); if (res < 0) { self->internal_error = amqp_os_socket_error(); @@ -220,10 +220,10 @@ start: } static int -amqp_tcp_socket_open(void *base, const char *host, int port) +amqp_tcp_socket_open(void *base, const char *host, int port, struct timeval *timeout) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - self->sockfd = amqp_open_socket(host, port); + self->sockfd = amqp_open_socket_noblock(host, port, timeout); if (0 > self->sockfd) { int err = self->sockfd; self->sockfd = -1; @@ -236,32 +236,34 @@ static int amqp_tcp_socket_close(void *base) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - int status = -1; - if (self) { - status = amqp_os_socket_close(self->sockfd); - free(self->buffer); - free(self); - } - if (0 == status) { - return AMQP_STATUS_OK; - } else { - return AMQP_STATUS_SOCKET_ERROR; + if (-1 != self->sockfd) { + if (amqp_os_socket_close(self->sockfd)) { + return AMQP_STATUS_SOCKET_ERROR; + } + self->sockfd = -1; } + + return AMQP_STATUS_OK; } static int -amqp_tcp_socket_error(AMQP_UNUSED void *base) +amqp_tcp_socket_get_sockfd(void *base) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - return self->internal_error; + return self->sockfd; } -static int -amqp_tcp_socket_get_sockfd(void *base) +static void +amqp_tcp_socket_delete(void *base) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - return self->sockfd; + + if (self) { + amqp_tcp_socket_close(self); + free(self->buffer); + free(self); + } } static const struct amqp_socket_class_t amqp_tcp_socket_class = { @@ -270,12 +272,12 @@ static const struct amqp_socket_class_t amqp_tcp_socket_class = { amqp_tcp_socket_recv, /* recv */ amqp_tcp_socket_open, /* open */ amqp_tcp_socket_close, /* close */ - amqp_tcp_socket_error, /* error */ - amqp_tcp_socket_get_sockfd /* get_sockfd */ + amqp_tcp_socket_get_sockfd, /* get_sockfd */ + amqp_tcp_socket_delete /* delete */ }; amqp_socket_t * -amqp_tcp_socket_new(void) +amqp_tcp_socket_new(amqp_connection_state_t state) { struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self)); if (!self) { @@ -283,6 +285,9 @@ amqp_tcp_socket_new(void) } self->klass = &amqp_tcp_socket_class; self->sockfd = -1; + + amqp_set_socket(state, (amqp_socket_t *)self); + return (amqp_socket_t *)self; } |