summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r--librabbitmq/amqp_socket.c28
1 files changed, 16 insertions, 12 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 6b9486c..2025549 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -35,12 +35,16 @@
#endif
#include "amqp_private.h"
-#include <stdlib.h>
+#include <assert.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <stdarg.h>
+#include <stdint.h>
#include <stdio.h>
+#include <stdlib.h>
#include <string.h>
-#include <stdint.h>
-#include <stdarg.h>
-#include <assert.h>
+#include <sys/socket.h>
int amqp_open_socket(char const *hostname,
int portnumber)
@@ -62,16 +66,16 @@ int amqp_open_socket(char const *hostname,
addr.sin_port = htons(portnumber);
addr.sin_addr.s_addr = * (uint32_t *) he->h_addr_list[0];
- sockfd = socket(PF_INET, SOCK_STREAM, 0);
+ sockfd = amqp_socket_socket(PF_INET, SOCK_STREAM, 0);
if (sockfd == -1)
- return -amqp_socket_error();
+ return -amqp_socket_error(NULL);
if (amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one,
sizeof(one)) < 0
|| connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0)
{
- res = -amqp_socket_error();
- amqp_socket_close(sockfd);
+ res = -amqp_socket_error(NULL);
+ amqp_socket_close(sockfd, NULL);
return res;
}
@@ -83,7 +87,7 @@ int amqp_send_header(amqp_connection_state_t state) {
AMQP_PROTOCOL_VERSION_MAJOR,
AMQP_PROTOCOL_VERSION_MINOR,
AMQP_PROTOCOL_VERSION_REVISION };
- return send(state->sockfd, (void *)header, 8, 0);
+ return state->send(state->sockfd, (void *)header, 8, 0, state->user_data);
}
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
@@ -173,13 +177,13 @@ static int wait_frame_inner(amqp_connection_state_t state,
assert(res != 0);
}
- res = recv(state->sockfd, state->sock_inbound_buffer.bytes,
- state->sock_inbound_buffer.len, 0);
+ res = state->recv(state->sockfd, state->sock_inbound_buffer.bytes,
+ state->sock_inbound_buffer.len, 0, state->user_data);
if (res <= 0) {
if (res == 0)
return -ERROR_CONNECTION_CLOSED;
else
- return -amqp_socket_error();
+ return -state->error(state->user_data);
}
state->sock_inbound_limit = res;