/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ /* * Copyright 2012-2013 Michael Steinert * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), * to deal in the Software without restriction, including without limitation * the rights to use, copy, modify, merge, publish, distribute, sublicense, * and/or sell copies of the Software, and to permit persons to whom the * Software is furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * DEALINGS IN THE SOFTWARE. */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include "amqp_private.h" #include "amqp_tcp_socket.h" #include #include #include struct amqp_tcp_socket_t { const struct amqp_socket_class_t *klass; int sockfd; void *buffer; size_t buffer_length; int internal_error; }; static ssize_t amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; ssize_t res; const char *buf_left = buf; ssize_t len_left = len; #ifdef MSG_NOSIGNAL flags |= MSG_NOSIGNAL; #endif start: res = send(self->sockfd, buf_left, len_left, flags); if (res < 0) { self->internal_error = amqp_os_socket_error(); if (EINTR == self->internal_error) { goto start; } else { res = AMQP_STATUS_SOCKET_ERROR; } } else { if (res == len_left) { self->internal_error = 0; res = AMQP_STATUS_OK; } else { buf_left += res; len_left -= res; goto start; } } return res; } static ssize_t amqp_tcp_socket_send(void *base, const void *buf, size_t len) { return amqp_tcp_socket_send_inner(base, buf, len, 0); } static ssize_t amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; ssize_t ret; #if defined(_WIN32) DWORD res; /* Making the assumption here that WSAsend won't do a partial send * unless an error occured, in which case we're hosed so it doesn't matter */ if (WSASend(self->sockfd, (LPWSABUF)iov, iovcnt, &res, 0, NULL, NULL) == 0) { self->internal_error = 0; ret = AMQP_STATUS_OK; } else { self->internal_error = WSAGetLastError(); ret = AMQP_STATUS_SOCKET_ERROR; } return ret; #elif defined(MSG_MORE) int i; for (i = 0; i < iovcnt - 1; ++i) { ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len, MSG_MORE); if (ret != AMQP_STATUS_OK) { goto exit; } } ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len, 0); exit: return ret; #elif defined(SO_NOSIGPIPE) || !defined(MSG_NOSIGNAL) int i; ssize_t len_left = 0; struct iovec *iov_left = iov; int iovcnt_left = iovcnt; for (i = 0; i < iovcnt; ++i) { len_left += iov[i].iov_len; } start: ret = writev(self->sockfd, iov_left, iovcnt_left); if (ret < 0) { self->internal_error = amqp_os_socket_error(); if (EINTR == self->internal_error) { goto start; } else { self->internal_error = amqp_os_socket_error(); ret = AMQP_STATUS_SOCKET_ERROR; } } else { if (ret == len_left) { self->internal_error = 0; ret = AMQP_STATUS_OK; } else { len_left -= ret; for (i = 0; i < iovcnt_left; ++i) { if (ret < (ssize_t)iov_left[i].iov_len) { iov_left[i].iov_base = ((char*)iov_left[i].iov_base) + ret; iov_left[i].iov_len -= ret; iovcnt_left -= i; iov_left += i; break; } else { ret -= iov_left[i].iov_len; } } goto start; } } return ret; #else int i; size_t bytes = 0; void *bufferp; for (i = 0; i < iovcnt; ++i) { bytes += iov[i].iov_len; } if (self->buffer_length < bytes) { self->buffer = realloc(self->buffer, bytes); if (NULL == self->buffer) { self->buffer_length = 0; self->internal_error = 0; ret = AMQP_STATUS_NO_MEMORY; goto exit; } self->buffer_length = bytes; } bufferp = self->buffer; for (i = 0; i < iovcnt; ++i) { memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); bufferp += iov[i].iov_len; } ret = amqp_tcp_socket_send_inner(self, self->buffer, bytes, 0); exit: return ret; #endif } static ssize_t amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; ssize_t ret; start: ret = recv(self->sockfd, buf, len, flags); if (0 > ret) { self->internal_error = amqp_os_socket_error(); if (EINTR == self->internal_error) { goto start; } else { ret = AMQP_STATUS_SOCKET_ERROR; } } else if (0 == ret) { ret = AMQP_STATUS_CONNECTION_CLOSED; } return ret; } static int amqp_tcp_socket_open(void *base, const char *host, int port, struct timeval *timeout) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; self->sockfd = amqp_open_socket_noblock(host, port, timeout); if (0 > self->sockfd) { int err = self->sockfd; self->sockfd = -1; return err; } return AMQP_STATUS_OK; } static int amqp_tcp_socket_close(void *base) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; if (-1 != self->sockfd) { if (amqp_os_socket_close(self->sockfd)) { return AMQP_STATUS_SOCKET_ERROR; } self->sockfd = -1; } return AMQP_STATUS_OK; } static int amqp_tcp_socket_get_sockfd(void *base) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; return self->sockfd; } static void amqp_tcp_socket_delete(void *base) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; if (self) { amqp_tcp_socket_close(self); free(self->buffer); free(self); } } static const struct amqp_socket_class_t amqp_tcp_socket_class = { amqp_tcp_socket_writev, /* writev */ amqp_tcp_socket_send, /* send */ amqp_tcp_socket_recv, /* recv */ amqp_tcp_socket_open, /* open */ amqp_tcp_socket_close, /* close */ amqp_tcp_socket_get_sockfd, /* get_sockfd */ amqp_tcp_socket_delete /* delete */ }; amqp_socket_t * amqp_tcp_socket_new(amqp_connection_state_t state) { struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self)); if (!self) { return NULL; } self->klass = &amqp_tcp_socket_class; self->sockfd = -1; amqp_set_socket(state, (amqp_socket_t *)self); return (amqp_socket_t *)self; } void amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd) { struct amqp_tcp_socket_t *self; if (base->klass != &amqp_tcp_socket_class) { amqp_abort("<%p> is not of type amqp_tcp_socket_t", base); } self = (struct amqp_tcp_socket_t *)base; self->sockfd = sockfd; }