diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2015-04-27 22:19:30 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2015-05-03 20:24:03 -0700 |
commit | 43b6fc5ea0f36866dd79e9697e9e6a0135362e10 (patch) | |
tree | ee7916d391cf49284226aa08f2f39e9200f98c9f | |
parent | 2b8a417b27e221a7bf0201cdc4023ac05de0f108 (diff) | |
download | rabbitmq-c-43b6fc5ea0f36866dd79e9697e9e6a0135362e10.tar.gz |
Remove writev specific code.
-rw-r--r-- | librabbitmq/amqp_connection.c | 5 | ||||
-rw-r--r-- | librabbitmq/amqp_openssl.c | 38 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 50 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.h | 32 | ||||
-rw-r--r-- | librabbitmq/amqp_tcp_socket.c | 108 |
5 files changed, 5 insertions, 228 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 6c30ce3..7168bb5 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -471,7 +471,8 @@ static int amqp_frame_to_bytes(const amqp_frame_t *frame, amqp_bytes_t buffer, case AMQP_FRAME_BODY: { const amqp_bytes_t *body = &frame->payload.body_fragment; - memcpy((char *)out_frame + HEADER_SIZE, body->bytes, body->len); + memcpy(amqp_offset(out_frame, HEADER_SIZE), body->bytes, body->len); + out_frame_len = body->len; break; } @@ -519,7 +520,7 @@ static int amqp_frame_to_bytes(const amqp_frame_t *frame, amqp_bytes_t buffer, } amqp_e32(out_frame, 3, out_frame_len); - amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END); + amqp_e8(out_frame, HEADER_SIZE + out_frame_len, AMQP_FRAME_END); encoded->bytes = out_frame; encoded->len = out_frame_len + HEADER_SIZE + FOOTER_SIZE; diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c index cd9707c..c225cdd 100644 --- a/librabbitmq/amqp_openssl.c +++ b/librabbitmq/amqp_openssl.c @@ -120,43 +120,6 @@ amqp_ssl_socket_send(void *base, } static ssize_t -amqp_ssl_socket_writev(void *base, - struct iovec *iov, - int iovcnt) -{ - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - ssize_t ret = -1; - char *bufferp; - size_t bytes; - int i; - if (-1 == self->sockfd) { - return AMQP_STATUS_SOCKET_CLOSED; - } - - bytes = 0; - for (i = 0; i < iovcnt; ++i) { - bytes += iov[i].iov_len; - } - if (self->length < bytes) { - self->buffer = realloc(self->buffer, bytes); - if (!self->buffer) { - self->length = 0; - ret = AMQP_STATUS_NO_MEMORY; - goto exit; - } - self->length = bytes; - } - bufferp = self->buffer; - for (i = 0; i < iovcnt; ++i) { - memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); - bufferp += iov[i].iov_len; - } - ret = amqp_ssl_socket_send(self, self->buffer, bytes); -exit: - return ret; -} - -static ssize_t amqp_ssl_socket_recv(void *base, void *buf, size_t len, @@ -450,7 +413,6 @@ amqp_ssl_socket_delete(void *base) } static const struct amqp_socket_class_t amqp_ssl_socket_class = { - amqp_ssl_socket_writev, /* writev */ amqp_ssl_socket_send, /* send */ amqp_ssl_socket_recv, /* recv */ amqp_ssl_socket_open, /* open */ diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 18399b4..d69f778 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -194,14 +194,6 @@ amqp_os_socket_close(int sockfd) } ssize_t -amqp_socket_writev(amqp_socket_t *self, struct iovec *iov, int iovcnt) -{ - assert(self); - assert(self->klass->writev); - return self->klass->writev(self, iov, iovcnt); -} - -ssize_t amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len) { assert(self); @@ -320,48 +312,6 @@ static int do_poll(amqp_connection_state_t state, int res, return res; } -ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov, - int iovcnt, amqp_time_t deadline) { - int i; - ssize_t res; - struct iovec *iov_left = iov; - int iovcnt_left = iovcnt; - ssize_t len_left; - - len_left = 0; - for (i = 0; i < iovcnt_left; ++i) { - len_left += iov_left[i].iov_len; - } - -start_send: - res = amqp_socket_writev(state->socket, iov_left, iovcnt_left); - - if (res > 0) { - len_left -= res; - if (0 == len_left) { - return AMQP_STATUS_OK; - } - for (i = 0; i < iovcnt_left; ++i) { - if (res < (ssize_t)iov_left[i].iov_len) { - iov_left[i].iov_base = ((char *)iov_left[i].iov_base) + res; - iov_left[i].iov_len -= res; - - iovcnt_left -= i; - iov_left += i; - break; - } else { - res -= iov_left[i].iov_len; - } - } - goto start_send; - } - res = do_poll(state, res, deadline); - if (AMQP_STATUS_OK == res) { - goto start_send; - } - return res; -} - ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf, size_t len, amqp_time_t deadline) { ssize_t res; diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index 813189c..fdf2b0f 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -44,7 +44,6 @@ int amqp_os_socket_close(int sockfd); /* Socket callbacks. */ -typedef ssize_t (*amqp_socket_writev_fn)(void *, struct iovec *, int); typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t); typedef ssize_t (*amqp_socket_recv_fn)(void *, void *, size_t, int); typedef int (*amqp_socket_open_fn)(void *, const char *, int, struct timeval *); @@ -54,7 +53,6 @@ typedef void (*amqp_socket_delete_fn)(void *); /** V-table for amqp_socket_t */ struct amqp_socket_class_t { - amqp_socket_writev_fn writev; amqp_socket_send_fn send; amqp_socket_recv_fn recv; amqp_socket_open_fn open; @@ -69,17 +67,6 @@ struct amqp_socket_t_ { }; -#ifdef _WIN32 -/* WinSock2 calls iovec WSABUF with different parameter names. - * this is really a WSABUF with different names - */ -struct iovec { - u_long iov_len; - char FAR *iov_base; -}; -#endif - - /** * Set set the socket object for a connection * @@ -92,25 +79,6 @@ struct iovec { void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket); -/** - * Write to a socket. - * - * This function wraps writev(2) functionality. - * - * This function will only return on error, or when all of the bytes referred - * to in iov have been sent. NOTE: this function may modify the iov struct. - * - * \param [in,out] self A socket object. - * \param [in] iov One or more data vecors. - * \param [in] iovcnt The number of vectors in \e iov. - * - * \return AMQP_STATUS_OK on success. amqp_status_enum value otherwise - */ -ssize_t -amqp_socket_writev(amqp_socket_t *self, struct iovec *iov, int iovcnt); - -ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov, - int iovcnt, amqp_time_t deadline); /** * Send a message from a socket. diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c index 6d4ef45..74caee6 100644 --- a/librabbitmq/amqp_tcp_socket.c +++ b/librabbitmq/amqp_tcp_socket.c @@ -46,10 +46,11 @@ struct amqp_tcp_socket_t { static ssize_t -amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags) +amqp_tcp_socket_send(void *base, const void *buf, size_t len) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; ssize_t res; + int flags = 0; if (-1 == self->sockfd) { return AMQP_STATUS_SOCKET_CLOSED; @@ -84,110 +85,6 @@ start: } static ssize_t -amqp_tcp_socket_send(void *base, const void *buf, size_t len) -{ - return amqp_tcp_socket_send_inner(base, buf, len, 0); -} - -static ssize_t -amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt) -{ - struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - ssize_t ret; - if (-1 == self->sockfd) { - return AMQP_STATUS_SOCKET_CLOSED; - } - -#if defined(_WIN32) - { - DWORD res; - /* Making the assumption here that WSAsend won't do a partial send - * unless an error occured, in which case we're hosed so it doesn't matter - */ - if (WSASend(self->sockfd, (LPWSABUF)iov, iovcnt, &res, 0, NULL, NULL) == - 0) { - self->internal_error = 0; - ret = res; - } else { - self->internal_error = WSAGetLastError(); - if (WSAEWOULDBLOCK == self->internal_error) { - ret = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE; - } else { - ret = AMQP_STATUS_SOCKET_ERROR; - } - } - return ret; - } - -#elif defined(SO_NOSIGPIPE) || !defined(MSG_NOSIGNAL) - { - int i; - ssize_t len_left = 0; - - struct iovec *iov_left = iov; - int iovcnt_left = iovcnt; - - for (i = 0; i < iovcnt; ++i) { - len_left += iov[i].iov_len; - } - - start: - ret = writev(self->sockfd, iov, iovcnt); - - if (ret < 0) { - self->internal_error = amqp_os_socket_error(); - switch (self->internal_error) { - case EINTR: - goto start; - case EWOULDBLOCK: -#if defined(EAGAIN) && EAGAIN != EWOULDBLOCK - case EAGAIN: -#endif - ret = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE; - break; - default: - ret = AMQP_STATUS_SOCKET_ERROR; - } - } - return ret; - } - -#else - { - int i; - size_t bytes = 0; - void *bufferp; - - for (i = 0; i < iovcnt; ++i) { - bytes += iov[i].iov_len; - } - - if (self->buffer_length < bytes) { - self->buffer = realloc(self->buffer, bytes); - if (NULL == self->buffer) { - self->buffer_length = 0; - self->internal_error = 0; - ret = AMQP_STATUS_NO_MEMORY; - goto exit; - } - self->buffer_length = bytes; - } - - bufferp = self->buffer; - for (i = 0; i < iovcnt; ++i) { - memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); - bufferp = (char*)bufferp + iov[i].iov_len; - } - - ret = amqp_tcp_socket_send_inner(self, self->buffer, bytes, 0); - - exit: - return ret; - } -#endif -} - -static ssize_t amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; @@ -272,7 +169,6 @@ amqp_tcp_socket_delete(void *base) } static const struct amqp_socket_class_t amqp_tcp_socket_class = { - amqp_tcp_socket_writev, /* writev */ amqp_tcp_socket_send, /* send */ amqp_tcp_socket_recv, /* recv */ amqp_tcp_socket_open, /* open */ |