diff options
Diffstat (limited to 'librabbitmq/amqp_connection.c')
-rw-r--r-- | librabbitmq/amqp_connection.c | 61 |
1 files changed, 19 insertions, 42 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 6468da7..4666100 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -38,8 +38,10 @@ #include "config.h" #endif +#include "amqp-tcp-socket.h" #include "amqp_private.h" #include <assert.h> +#include <errno.h> #include <stdint.h> #include <stdio.h> #include <stdlib.h> @@ -59,18 +61,6 @@ _check_state->state); \ } -static ssize_t amqp_socket_send(int sockfd, const void *buf, size_t len, - int flags, AMQP_UNUSED void *user_data) -{ - return send(sockfd, buf, len, flags); -} - -static ssize_t amqp_socket_recv(int sockfd, void *buf, size_t len, int flags, - AMQP_UNUSED void *user_data) -{ - return recv(sockfd, buf, len, flags); -} - amqp_connection_state_t amqp_new_connection(void) { int res; @@ -101,7 +91,6 @@ amqp_connection_state_t amqp_new_connection(void) is also the minimum frame size */ state->target_size = 8; - state->sockfd = -1; state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE; state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE); if (state->sock_inbound_buffer.bytes == NULL) { @@ -120,36 +109,24 @@ out_nomem: int amqp_get_sockfd(amqp_connection_state_t state) { - return state->sockfd; + return amqp_socket_get_sockfd(state->socket); } void amqp_set_sockfd(amqp_connection_state_t state, int sockfd) { - state->sockfd = sockfd; - state->writev = amqp_socket_writev; - state->send = amqp_socket_send; - state->recv = amqp_socket_recv; - state->close = amqp_socket_close; - state->error = amqp_socket_error; - state->user_data = NULL; + amqp_socket_t *socket = amqp_tcp_socket_new(); + if (!socket) { + amqp_abort("%s", strerror(errno)); + } + amqp_tcp_socket_set_sockfd(socket, sockfd); + amqp_set_socket(state, socket); } -void amqp_set_sockfd_full(amqp_connection_state_t state, int sockfd, - amqp_socket_writev_fn writev_fn, - amqp_socket_send_fn send_fn, - amqp_socket_recv_fn recv_fn, - amqp_socket_close_fn close_fn, - amqp_socket_error_fn error_fn, - void *user_data) +void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket) { - state->sockfd = sockfd; - state->writev = writev_fn; - state->send = send_fn; - state->recv = recv_fn; - state->close = close_fn; - state->error = error_fn; - state->user_data = user_data; + amqp_socket_close(state->socket); + state->socket = socket; } int amqp_tune_connection(amqp_connection_state_t state, @@ -193,8 +170,8 @@ int amqp_destroy_connection(amqp_connection_state_t state) empty_amqp_pool(&state->decoding_pool); free(state->outbound_buffer.bytes); free(state->sock_inbound_buffer.bytes); - if (state->sockfd >= 0 && state->close(state->sockfd, state->user_data) < 0) - status = -state->error(state->user_data); + if (amqp_socket_close(state->socket) < 0) + status = -amqp_socket_error(state->socket); free(state); } return status; @@ -425,7 +402,7 @@ int amqp_send_frame(amqp_connection_state_t state, iov[2].iov_base = &frame_end_byte; iov[2].iov_len = FOOTER_SIZE; - res = state->writev(state->sockfd, iov, 3, state->user_data); + res = amqp_socket_writev(state->socket, iov, 3); } else { size_t out_frame_len; amqp_bytes_t encoded; @@ -473,13 +450,13 @@ int amqp_send_frame(amqp_connection_state_t state, amqp_e32(out_frame, 3, out_frame_len); amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END); - res = state->send(state->sockfd, out_frame, - out_frame_len + HEADER_SIZE + FOOTER_SIZE, - MSG_NOSIGNAL, state->user_data); + res = amqp_socket_send(state->socket, out_frame, + out_frame_len + HEADER_SIZE + FOOTER_SIZE, + MSG_NOSIGNAL); } if (res < 0) { - return -state->error(state->user_data); + return -amqp_socket_error(state->socket); } else { return 0; } |