summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2015-04-27 22:19:30 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2015-05-03 20:24:03 -0700
commit43b6fc5ea0f36866dd79e9697e9e6a0135362e10 (patch)
treeee7916d391cf49284226aa08f2f39e9200f98c9f
parent2b8a417b27e221a7bf0201cdc4023ac05de0f108 (diff)
downloadrabbitmq-c-43b6fc5ea0f36866dd79e9697e9e6a0135362e10.tar.gz
Remove writev specific code.
-rw-r--r--librabbitmq/amqp_connection.c5
-rw-r--r--librabbitmq/amqp_openssl.c38
-rw-r--r--librabbitmq/amqp_socket.c50
-rw-r--r--librabbitmq/amqp_socket.h32
-rw-r--r--librabbitmq/amqp_tcp_socket.c108
5 files changed, 5 insertions, 228 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 6c30ce3..7168bb5 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -471,7 +471,8 @@ static int amqp_frame_to_bytes(const amqp_frame_t *frame, amqp_bytes_t buffer,
case AMQP_FRAME_BODY: {
const amqp_bytes_t *body = &frame->payload.body_fragment;
- memcpy((char *)out_frame + HEADER_SIZE, body->bytes, body->len);
+ memcpy(amqp_offset(out_frame, HEADER_SIZE), body->bytes, body->len);
+
out_frame_len = body->len;
break;
}
@@ -519,7 +520,7 @@ static int amqp_frame_to_bytes(const amqp_frame_t *frame, amqp_bytes_t buffer,
}
amqp_e32(out_frame, 3, out_frame_len);
- amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END);
+ amqp_e8(out_frame, HEADER_SIZE + out_frame_len, AMQP_FRAME_END);
encoded->bytes = out_frame;
encoded->len = out_frame_len + HEADER_SIZE + FOOTER_SIZE;
diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c
index cd9707c..c225cdd 100644
--- a/librabbitmq/amqp_openssl.c
+++ b/librabbitmq/amqp_openssl.c
@@ -120,43 +120,6 @@ amqp_ssl_socket_send(void *base,
}
static ssize_t
-amqp_ssl_socket_writev(void *base,
- struct iovec *iov,
- int iovcnt)
-{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- ssize_t ret = -1;
- char *bufferp;
- size_t bytes;
- int i;
- if (-1 == self->sockfd) {
- return AMQP_STATUS_SOCKET_CLOSED;
- }
-
- bytes = 0;
- for (i = 0; i < iovcnt; ++i) {
- bytes += iov[i].iov_len;
- }
- if (self->length < bytes) {
- self->buffer = realloc(self->buffer, bytes);
- if (!self->buffer) {
- self->length = 0;
- ret = AMQP_STATUS_NO_MEMORY;
- goto exit;
- }
- self->length = bytes;
- }
- bufferp = self->buffer;
- for (i = 0; i < iovcnt; ++i) {
- memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
- bufferp += iov[i].iov_len;
- }
- ret = amqp_ssl_socket_send(self, self->buffer, bytes);
-exit:
- return ret;
-}
-
-static ssize_t
amqp_ssl_socket_recv(void *base,
void *buf,
size_t len,
@@ -450,7 +413,6 @@ amqp_ssl_socket_delete(void *base)
}
static const struct amqp_socket_class_t amqp_ssl_socket_class = {
- amqp_ssl_socket_writev, /* writev */
amqp_ssl_socket_send, /* send */
amqp_ssl_socket_recv, /* recv */
amqp_ssl_socket_open, /* open */
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 18399b4..d69f778 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -194,14 +194,6 @@ amqp_os_socket_close(int sockfd)
}
ssize_t
-amqp_socket_writev(amqp_socket_t *self, struct iovec *iov, int iovcnt)
-{
- assert(self);
- assert(self->klass->writev);
- return self->klass->writev(self, iov, iovcnt);
-}
-
-ssize_t
amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len)
{
assert(self);
@@ -320,48 +312,6 @@ static int do_poll(amqp_connection_state_t state, int res,
return res;
}
-ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov,
- int iovcnt, amqp_time_t deadline) {
- int i;
- ssize_t res;
- struct iovec *iov_left = iov;
- int iovcnt_left = iovcnt;
- ssize_t len_left;
-
- len_left = 0;
- for (i = 0; i < iovcnt_left; ++i) {
- len_left += iov_left[i].iov_len;
- }
-
-start_send:
- res = amqp_socket_writev(state->socket, iov_left, iovcnt_left);
-
- if (res > 0) {
- len_left -= res;
- if (0 == len_left) {
- return AMQP_STATUS_OK;
- }
- for (i = 0; i < iovcnt_left; ++i) {
- if (res < (ssize_t)iov_left[i].iov_len) {
- iov_left[i].iov_base = ((char *)iov_left[i].iov_base) + res;
- iov_left[i].iov_len -= res;
-
- iovcnt_left -= i;
- iov_left += i;
- break;
- } else {
- res -= iov_left[i].iov_len;
- }
- }
- goto start_send;
- }
- res = do_poll(state, res, deadline);
- if (AMQP_STATUS_OK == res) {
- goto start_send;
- }
- return res;
-}
-
ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
size_t len, amqp_time_t deadline) {
ssize_t res;
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index 813189c..fdf2b0f 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -44,7 +44,6 @@ int
amqp_os_socket_close(int sockfd);
/* Socket callbacks. */
-typedef ssize_t (*amqp_socket_writev_fn)(void *, struct iovec *, int);
typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t);
typedef ssize_t (*amqp_socket_recv_fn)(void *, void *, size_t, int);
typedef int (*amqp_socket_open_fn)(void *, const char *, int, struct timeval *);
@@ -54,7 +53,6 @@ typedef void (*amqp_socket_delete_fn)(void *);
/** V-table for amqp_socket_t */
struct amqp_socket_class_t {
- amqp_socket_writev_fn writev;
amqp_socket_send_fn send;
amqp_socket_recv_fn recv;
amqp_socket_open_fn open;
@@ -69,17 +67,6 @@ struct amqp_socket_t_ {
};
-#ifdef _WIN32
-/* WinSock2 calls iovec WSABUF with different parameter names.
- * this is really a WSABUF with different names
- */
-struct iovec {
- u_long iov_len;
- char FAR *iov_base;
-};
-#endif
-
-
/**
* Set set the socket object for a connection
*
@@ -92,25 +79,6 @@ struct iovec {
void
amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket);
-/**
- * Write to a socket.
- *
- * This function wraps writev(2) functionality.
- *
- * This function will only return on error, or when all of the bytes referred
- * to in iov have been sent. NOTE: this function may modify the iov struct.
- *
- * \param [in,out] self A socket object.
- * \param [in] iov One or more data vecors.
- * \param [in] iovcnt The number of vectors in \e iov.
- *
- * \return AMQP_STATUS_OK on success. amqp_status_enum value otherwise
- */
-ssize_t
-amqp_socket_writev(amqp_socket_t *self, struct iovec *iov, int iovcnt);
-
-ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov,
- int iovcnt, amqp_time_t deadline);
/**
* Send a message from a socket.
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c
index 6d4ef45..74caee6 100644
--- a/librabbitmq/amqp_tcp_socket.c
+++ b/librabbitmq/amqp_tcp_socket.c
@@ -46,10 +46,11 @@ struct amqp_tcp_socket_t {
static ssize_t
-amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags)
+amqp_tcp_socket_send(void *base, const void *buf, size_t len)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
ssize_t res;
+ int flags = 0;
if (-1 == self->sockfd) {
return AMQP_STATUS_SOCKET_CLOSED;
@@ -84,110 +85,6 @@ start:
}
static ssize_t
-amqp_tcp_socket_send(void *base, const void *buf, size_t len)
-{
- return amqp_tcp_socket_send_inner(base, buf, len, 0);
-}
-
-static ssize_t
-amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt)
-{
- struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- ssize_t ret;
- if (-1 == self->sockfd) {
- return AMQP_STATUS_SOCKET_CLOSED;
- }
-
-#if defined(_WIN32)
- {
- DWORD res;
- /* Making the assumption here that WSAsend won't do a partial send
- * unless an error occured, in which case we're hosed so it doesn't matter
- */
- if (WSASend(self->sockfd, (LPWSABUF)iov, iovcnt, &res, 0, NULL, NULL) ==
- 0) {
- self->internal_error = 0;
- ret = res;
- } else {
- self->internal_error = WSAGetLastError();
- if (WSAEWOULDBLOCK == self->internal_error) {
- ret = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE;
- } else {
- ret = AMQP_STATUS_SOCKET_ERROR;
- }
- }
- return ret;
- }
-
-#elif defined(SO_NOSIGPIPE) || !defined(MSG_NOSIGNAL)
- {
- int i;
- ssize_t len_left = 0;
-
- struct iovec *iov_left = iov;
- int iovcnt_left = iovcnt;
-
- for (i = 0; i < iovcnt; ++i) {
- len_left += iov[i].iov_len;
- }
-
- start:
- ret = writev(self->sockfd, iov, iovcnt);
-
- if (ret < 0) {
- self->internal_error = amqp_os_socket_error();
- switch (self->internal_error) {
- case EINTR:
- goto start;
- case EWOULDBLOCK:
-#if defined(EAGAIN) && EAGAIN != EWOULDBLOCK
- case EAGAIN:
-#endif
- ret = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE;
- break;
- default:
- ret = AMQP_STATUS_SOCKET_ERROR;
- }
- }
- return ret;
- }
-
-#else
- {
- int i;
- size_t bytes = 0;
- void *bufferp;
-
- for (i = 0; i < iovcnt; ++i) {
- bytes += iov[i].iov_len;
- }
-
- if (self->buffer_length < bytes) {
- self->buffer = realloc(self->buffer, bytes);
- if (NULL == self->buffer) {
- self->buffer_length = 0;
- self->internal_error = 0;
- ret = AMQP_STATUS_NO_MEMORY;
- goto exit;
- }
- self->buffer_length = bytes;
- }
-
- bufferp = self->buffer;
- for (i = 0; i < iovcnt; ++i) {
- memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
- bufferp = (char*)bufferp + iov[i].iov_len;
- }
-
- ret = amqp_tcp_socket_send_inner(self, self->buffer, bytes, 0);
-
- exit:
- return ret;
- }
-#endif
-}
-
-static ssize_t
amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
@@ -272,7 +169,6 @@ amqp_tcp_socket_delete(void *base)
}
static const struct amqp_socket_class_t amqp_tcp_socket_class = {
- amqp_tcp_socket_writev, /* writev */
amqp_tcp_socket_send, /* send */
amqp_tcp_socket_recv, /* recv */
amqp_tcp_socket_open, /* open */