summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_tcp_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_tcp_socket.c')
-rw-r--r--librabbitmq/amqp_tcp_socket.c47
1 files changed, 26 insertions, 21 deletions
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c
index e43a596..ed38c06 100644
--- a/librabbitmq/amqp_tcp_socket.c
+++ b/librabbitmq/amqp_tcp_socket.c
@@ -55,7 +55,7 @@ amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags)
#endif
start:
- res = send(self->sockfd, buf, len, flags);
+ res = send(self->sockfd, buf_left, len_left, flags);
if (res < 0) {
self->internal_error = amqp_os_socket_error();
@@ -220,10 +220,10 @@ start:
}
static int
-amqp_tcp_socket_open(void *base, const char *host, int port)
+amqp_tcp_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- self->sockfd = amqp_open_socket(host, port);
+ self->sockfd = amqp_open_socket_noblock(host, port, timeout);
if (0 > self->sockfd) {
int err = self->sockfd;
self->sockfd = -1;
@@ -236,32 +236,34 @@ static int
amqp_tcp_socket_close(void *base)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- int status = -1;
- if (self) {
- status = amqp_os_socket_close(self->sockfd);
- free(self->buffer);
- free(self);
- }
- if (0 == status) {
- return AMQP_STATUS_OK;
- } else {
- return AMQP_STATUS_SOCKET_ERROR;
+ if (-1 != self->sockfd) {
+ if (amqp_os_socket_close(self->sockfd)) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+ self->sockfd = -1;
}
+
+ return AMQP_STATUS_OK;
}
static int
-amqp_tcp_socket_error(AMQP_UNUSED void *base)
+amqp_tcp_socket_get_sockfd(void *base)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- return self->internal_error;
+ return self->sockfd;
}
-static int
-amqp_tcp_socket_get_sockfd(void *base)
+static void
+amqp_tcp_socket_delete(void *base)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- return self->sockfd;
+
+ if (self) {
+ amqp_tcp_socket_close(self);
+ free(self->buffer);
+ free(self);
+ }
}
static const struct amqp_socket_class_t amqp_tcp_socket_class = {
@@ -270,12 +272,12 @@ static const struct amqp_socket_class_t amqp_tcp_socket_class = {
amqp_tcp_socket_recv, /* recv */
amqp_tcp_socket_open, /* open */
amqp_tcp_socket_close, /* close */
- amqp_tcp_socket_error, /* error */
- amqp_tcp_socket_get_sockfd /* get_sockfd */
+ amqp_tcp_socket_get_sockfd, /* get_sockfd */
+ amqp_tcp_socket_delete /* delete */
};
amqp_socket_t *
-amqp_tcp_socket_new(void)
+amqp_tcp_socket_new(amqp_connection_state_t state)
{
struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self));
if (!self) {
@@ -283,6 +285,9 @@ amqp_tcp_socket_new(void)
}
self->klass = &amqp_tcp_socket_class;
self->sockfd = -1;
+
+ amqp_set_socket(state, (amqp_socket_t *)self);
+
return (amqp_socket_t *)self;
}