summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_connection.c')
-rw-r--r--librabbitmq/amqp_connection.c61
1 files changed, 19 insertions, 42 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 6468da7..4666100 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -38,8 +38,10 @@
#include "config.h"
#endif
+#include "amqp-tcp-socket.h"
#include "amqp_private.h"
#include <assert.h>
+#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
@@ -59,18 +61,6 @@
_check_state->state); \
}
-static ssize_t amqp_socket_send(int sockfd, const void *buf, size_t len,
- int flags, AMQP_UNUSED void *user_data)
-{
- return send(sockfd, buf, len, flags);
-}
-
-static ssize_t amqp_socket_recv(int sockfd, void *buf, size_t len, int flags,
- AMQP_UNUSED void *user_data)
-{
- return recv(sockfd, buf, len, flags);
-}
-
amqp_connection_state_t amqp_new_connection(void)
{
int res;
@@ -101,7 +91,6 @@ amqp_connection_state_t amqp_new_connection(void)
is also the minimum frame size */
state->target_size = 8;
- state->sockfd = -1;
state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE;
state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE);
if (state->sock_inbound_buffer.bytes == NULL) {
@@ -120,36 +109,24 @@ out_nomem:
int amqp_get_sockfd(amqp_connection_state_t state)
{
- return state->sockfd;
+ return amqp_socket_get_sockfd(state->socket);
}
void amqp_set_sockfd(amqp_connection_state_t state,
int sockfd)
{
- state->sockfd = sockfd;
- state->writev = amqp_socket_writev;
- state->send = amqp_socket_send;
- state->recv = amqp_socket_recv;
- state->close = amqp_socket_close;
- state->error = amqp_socket_error;
- state->user_data = NULL;
+ amqp_socket_t *socket = amqp_tcp_socket_new();
+ if (!socket) {
+ amqp_abort("%s", strerror(errno));
+ }
+ amqp_tcp_socket_set_sockfd(socket, sockfd);
+ amqp_set_socket(state, socket);
}
-void amqp_set_sockfd_full(amqp_connection_state_t state, int sockfd,
- amqp_socket_writev_fn writev_fn,
- amqp_socket_send_fn send_fn,
- amqp_socket_recv_fn recv_fn,
- amqp_socket_close_fn close_fn,
- amqp_socket_error_fn error_fn,
- void *user_data)
+void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket)
{
- state->sockfd = sockfd;
- state->writev = writev_fn;
- state->send = send_fn;
- state->recv = recv_fn;
- state->close = close_fn;
- state->error = error_fn;
- state->user_data = user_data;
+ amqp_socket_close(state->socket);
+ state->socket = socket;
}
int amqp_tune_connection(amqp_connection_state_t state,
@@ -193,8 +170,8 @@ int amqp_destroy_connection(amqp_connection_state_t state)
empty_amqp_pool(&state->decoding_pool);
free(state->outbound_buffer.bytes);
free(state->sock_inbound_buffer.bytes);
- if (state->sockfd >= 0 && state->close(state->sockfd, state->user_data) < 0)
- status = -state->error(state->user_data);
+ if (amqp_socket_close(state->socket) < 0)
+ status = -amqp_socket_error(state->socket);
free(state);
}
return status;
@@ -425,7 +402,7 @@ int amqp_send_frame(amqp_connection_state_t state,
iov[2].iov_base = &frame_end_byte;
iov[2].iov_len = FOOTER_SIZE;
- res = state->writev(state->sockfd, iov, 3, state->user_data);
+ res = amqp_socket_writev(state->socket, iov, 3);
} else {
size_t out_frame_len;
amqp_bytes_t encoded;
@@ -473,13 +450,13 @@ int amqp_send_frame(amqp_connection_state_t state,
amqp_e32(out_frame, 3, out_frame_len);
amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END);
- res = state->send(state->sockfd, out_frame,
- out_frame_len + HEADER_SIZE + FOOTER_SIZE,
- MSG_NOSIGNAL, state->user_data);
+ res = amqp_socket_send(state->socket, out_frame,
+ out_frame_len + HEADER_SIZE + FOOTER_SIZE,
+ MSG_NOSIGNAL);
}
if (res < 0) {
- return -state->error(state->user_data);
+ return -amqp_socket_error(state->socket);
} else {
return 0;
}