summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r--librabbitmq/amqp_socket.c185
1 files changed, 92 insertions, 93 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 13f6376..f23b42b 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -59,8 +59,6 @@
#include "amqp_framing.h"
#include "amqp_private.h"
-#include "socket.h"
-
int amqp_open_socket(char const *hostname,
int portnumber)
@@ -98,37 +96,28 @@ int amqp_open_socket(char const *hostname,
return sockfd;
}
-static char *header() {
- static char header[8];
- header[0] = 'A';
- header[1] = 'M';
- header[2] = 'Q';
- header[3] = 'P';
- header[4] = 0;
- header[5] = AMQP_PROTOCOL_VERSION_MAJOR;
- header[6] = AMQP_PROTOCOL_VERSION_MINOR;
- header[7] = AMQP_PROTOCOL_VERSION_REVISION;
- return header;
-}
-
int amqp_send_header(amqp_connection_state_t state) {
- return send(state->sockfd, header(), 8, 0);
-}
-
-int amqp_send_header_to(amqp_connection_state_t state,
- amqp_output_fn_t fn,
- void *context)
-{
- return fn(context, header(), 8);
+ static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0,
+ AMQP_PROTOCOL_VERSION_MAJOR,
+ AMQP_PROTOCOL_VERSION_MINOR,
+ AMQP_PROTOCOL_VERSION_REVISION };
+ return send(state->sockfd, (void *)header, 8, 0);
}
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
+ amqp_bytes_t res;
+
switch (method) {
- case AMQP_SASL_METHOD_PLAIN: return (amqp_bytes_t) {.len = 5, .bytes = "PLAIN"};
- default:
- amqp_assert(0, "Invalid SASL method: %d", (int) method);
+ case AMQP_SASL_METHOD_PLAIN:
+ res.bytes = "PLAIN";
+ res.len = 5;
+ break;
+
+ default:
+ amqp_abort("Invalid SASL method: %d", (int) method);
}
- abort(); /* unreachable */
+
+ return res;
}
static amqp_bytes_t sasl_response(amqp_pool_t *pool,
@@ -143,20 +132,23 @@ static amqp_bytes_t sasl_response(amqp_pool_t *pool,
size_t username_len = strlen(username);
char *password = va_arg(args, char *);
size_t password_len = strlen(password);
+ char *response_buf;
+
amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response);
- if (response.bytes == NULL) {
+ if (response.bytes == NULL)
/* We never request a zero-length block, because of the +2
above, so a NULL here really is ENOMEM. */
return response;
- }
- *BUF_AT(response, 0) = 0;
- memcpy(((char *) response.bytes) + 1, username, username_len);
- *BUF_AT(response, username_len + 1) = 0;
- memcpy(((char *) response.bytes) + username_len + 2, password, password_len);
+
+ response_buf = response.bytes;
+ response_buf[0] = 0;
+ memcpy(response_buf + 1, username, username_len);
+ response_buf[username_len + 1] = 0;
+ memcpy(response_buf + username_len + 2, password, password_len);
break;
}
default:
- amqp_assert(0, "Invalid SASL method: %d", (int) method);
+ amqp_abort("Invalid SASL method: %d", (int) method);
}
return response;
@@ -178,33 +170,37 @@ static int wait_frame_inner(amqp_connection_state_t state,
amqp_frame_t *decoded_frame)
{
while (1) {
- int result;
+ int res;
while (amqp_data_in_buffer(state)) {
amqp_bytes_t buffer;
buffer.len = state->sock_inbound_limit - state->sock_inbound_offset;
buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset;
- AMQP_CHECK_RESULT((result = amqp_handle_input(state, buffer, decoded_frame)));
- state->sock_inbound_offset += result;
+
+ res = amqp_handle_input(state, buffer, decoded_frame);
+ if (res < 0)
+ return res;
+
+ state->sock_inbound_offset += res;
if (decoded_frame->frame_type != 0)
/* Complete frame was read. Return it. */
return 0;
/* Incomplete or ignored frame. Keep processing input. */
- assert(result != 0);
- }
+ assert(res != 0);
+ }
- result = recv(state->sockfd, state->sock_inbound_buffer.bytes,
+ res = recv(state->sockfd, state->sock_inbound_buffer.bytes,
state->sock_inbound_buffer.len, 0);
- if (result <= 0) {
- if (result == 0)
+ if (res <= 0) {
+ if (res == 0)
return -ERROR_CONNECTION_CLOSED;
else
return -amqp_socket_error();
}
- state->sock_inbound_limit = result;
+ state->sock_inbound_limit = res;
state->sock_inbound_offset = 0;
}
}
@@ -234,22 +230,22 @@ int amqp_simple_wait_method(amqp_connection_state_t state,
int res = amqp_simple_wait_frame(state, &frame);
if (res < 0)
return res;
-
- amqp_assert(frame.channel == expected_channel,
- "Expected 0x%08X method frame on channel %d, got frame on channel %d",
- expected_method,
- expected_channel,
- frame.channel);
- amqp_assert(frame.frame_type == AMQP_FRAME_METHOD,
- "Expected 0x%08X method frame on channel %d, got frame type %d",
- expected_method,
- expected_channel,
- frame.frame_type);
- amqp_assert(frame.payload.method.id == expected_method,
- "Expected method ID 0x%08X on channel %d, got ID 0x%08X",
- expected_method,
- expected_channel,
- frame.payload.method.id);
+
+ if (frame.channel != expected_channel)
+ amqp_abort("Expected 0x%08X method frame on channel %d, got frame on channel %d",
+ expected_method,
+ expected_channel,
+ frame.channel);
+ if (frame.frame_type != AMQP_FRAME_METHOD)
+ amqp_abort("Expected 0x%08X method frame on channel %d, got frame type %d",
+ expected_method,
+ expected_channel,
+ frame.frame_type);
+ if (frame.payload.method.id != expected_method)
+ amqp_abort("Expected method ID 0x%08X on channel %d, got ID 0x%08X",
+ expected_method,
+ expected_channel,
+ frame.payload.method.id);
*output = frame.payload.method;
return 0;
}
@@ -388,19 +384,23 @@ static int amqp_login_inner(amqp_connection_state_t state,
}
{
- amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, sasl_method, vl);
amqp_connection_start_ok_t s;
- if (response_bytes.bytes == NULL) {
+ amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool,
+ sasl_method, vl);
+
+ if (response_bytes.bytes == NULL)
return -ERROR_NO_MEMORY;
- }
- s =
- (amqp_connection_start_ok_t) {
- .client_properties = {.num_entries = 0, .entries = NULL},
- .mechanism = sasl_method_name(sasl_method),
- .response = response_bytes,
- .locale = {.len = 5, .bytes = "en_US"}
- };
- AMQP_CHECK_RESULT(amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s));
+
+ s.client_properties.num_entries = 0;
+ s.client_properties.entries = NULL;
+ s.mechanism = sasl_method_name(sasl_method);
+ s.response = response_bytes;
+ s.locale.bytes = "en_US";
+ s.locale.len = 5;
+
+ res = amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s);
+ if (res < 0)
+ return res;
}
amqp_release_buffers(state);
@@ -409,7 +409,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
&method);
if (res < 0)
return res;
-
+
{
amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded;
server_channel_max = s->channel_max;
@@ -417,28 +417,28 @@ static int amqp_login_inner(amqp_connection_state_t state,
server_heartbeat = s->heartbeat;
}
- if (server_channel_max != 0 && server_channel_max < channel_max) {
+ if (server_channel_max != 0 && server_channel_max < channel_max)
channel_max = server_channel_max;
- }
- if (server_frame_max != 0 && server_frame_max < frame_max) {
+ if (server_frame_max != 0 && server_frame_max < frame_max)
frame_max = server_frame_max;
- }
- if (server_heartbeat != 0 && server_heartbeat < heartbeat) {
+ if (server_heartbeat != 0 && server_heartbeat < heartbeat)
heartbeat = server_heartbeat;
- }
- AMQP_CHECK_RESULT(amqp_tune_connection(state, channel_max, frame_max, heartbeat));
+ res = amqp_tune_connection(state, channel_max, frame_max, heartbeat);
+ if (res < 0)
+ return res;
{
- amqp_connection_tune_ok_t s =
- (amqp_connection_tune_ok_t) {
- .channel_max = channel_max,
- .frame_max = frame_max,
- .heartbeat = heartbeat
- };
- AMQP_CHECK_RESULT(amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s));
+ amqp_connection_tune_ok_t s;
+ s.frame_max = frame_max;
+ s.channel_max = channel_max;
+ s.heartbeat = heartbeat;
+
+ res = amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s);
+ if (res < 0)
+ return res;
}
amqp_release_buffers(state);
@@ -470,21 +470,20 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
}
{
- amqp_connection_open_t s =
- (amqp_connection_open_t) {
- .virtual_host = amqp_cstring_bytes(vhost),
- .capabilities = {.len = 0, .bytes = NULL},
- .insist = 1
- };
amqp_method_number_t replies[] = { AMQP_CONNECTION_OPEN_OK_METHOD, 0 };
+ amqp_connection_open_t s;
+ s.virtual_host = amqp_cstring_bytes(vhost);
+ s.capabilities.len = 0;
+ s.capabilities.bytes = NULL;
+ s.insist = 1;
+
result = amqp_simple_rpc(state,
0,
AMQP_CONNECTION_OPEN_METHOD,
(amqp_method_number_t *) &replies,
&s);
- if (result.reply_type != AMQP_RESPONSE_NORMAL) {
+ if (result.reply_type != AMQP_RESPONSE_NORMAL)
return result;
- }
}
amqp_maybe_release_buffers(state);