summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r--librabbitmq/amqp_socket.c497
1 files changed, 0 insertions, 497 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
deleted file mode 100644
index f23b42b..0000000
--- a/librabbitmq/amqp_socket.c
+++ /dev/null
@@ -1,497 +0,0 @@
-/*
- * ***** BEGIN LICENSE BLOCK *****
- * Version: MPL 1.1/GPL 2.0
- *
- * The contents of this file are subject to the Mozilla Public License
- * Version 1.1 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- * http://www.mozilla.org/MPL/
- *
- * Software distributed under the License is distributed on an "AS IS"
- * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
- * the License for the specific language governing rights and
- * limitations under the License.
- *
- * The Original Code is librabbitmq.
- *
- * The Initial Developers of the Original Code are LShift Ltd, Cohesive
- * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
- * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
- * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
- * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
- * Rabbit Technologies Ltd.
- *
- * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
- * Ltd. Portions created by Cohesive Financial Technologies LLC are
- * Copyright (C) 2007-2009 Cohesive Financial Technologies
- * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
- * 2007-2009 Rabbit Technologies Ltd.
- *
- * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
- * LShift Ltd and Tony Garnock-Jones.
- *
- * All Rights Reserved.
- *
- * Contributor(s): ______________________________________.
- *
- * Alternatively, the contents of this file may be used under the terms
- * of the GNU General Public License Version 2 or later (the "GPL"), in
- * which case the provisions of the GPL are applicable instead of those
- * above. If you wish to allow use of your version of this file only
- * under the terms of the GPL, and not to allow others to use your
- * version of this file under the terms of the MPL, indicate your
- * decision by deleting the provisions above and replace them with the
- * notice and other provisions required by the GPL. If you do not
- * delete the provisions above, a recipient may use your version of
- * this file under the terms of any one of the MPL or the GPL.
- *
- * ***** END LICENSE BLOCK *****
- */
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <stdint.h>
-#include <stdarg.h>
-#include <assert.h>
-
-#include "amqp.h"
-#include "amqp_framing.h"
-#include "amqp_private.h"
-
-
-int amqp_open_socket(char const *hostname,
- int portnumber)
-{
- int sockfd, res;
- struct sockaddr_in addr;
- struct hostent *he;
- int one = 1; /* used as a buffer by setsockopt below */
-
- res = amqp_socket_init();
- if (res)
- return res;
-
- he = gethostbyname(hostname);
- if (he == NULL)
- return -ERROR_GETHOSTBYNAME_FAILED;
-
- addr.sin_family = AF_INET;
- addr.sin_port = htons(portnumber);
- addr.sin_addr.s_addr = * (uint32_t *) he->h_addr_list[0];
-
- sockfd = socket(PF_INET, SOCK_STREAM, 0);
- if (sockfd == -1)
- return -amqp_socket_error();
-
- if (amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one,
- sizeof(one)) < 0
- || connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0)
- {
- res = -amqp_socket_error();
- amqp_socket_close(sockfd);
- return res;
- }
-
- return sockfd;
-}
-
-int amqp_send_header(amqp_connection_state_t state) {
- static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0,
- AMQP_PROTOCOL_VERSION_MAJOR,
- AMQP_PROTOCOL_VERSION_MINOR,
- AMQP_PROTOCOL_VERSION_REVISION };
- return send(state->sockfd, (void *)header, 8, 0);
-}
-
-static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
- amqp_bytes_t res;
-
- switch (method) {
- case AMQP_SASL_METHOD_PLAIN:
- res.bytes = "PLAIN";
- res.len = 5;
- break;
-
- default:
- amqp_abort("Invalid SASL method: %d", (int) method);
- }
-
- return res;
-}
-
-static amqp_bytes_t sasl_response(amqp_pool_t *pool,
- amqp_sasl_method_enum method,
- va_list args)
-{
- amqp_bytes_t response;
-
- switch (method) {
- case AMQP_SASL_METHOD_PLAIN: {
- char *username = va_arg(args, char *);
- size_t username_len = strlen(username);
- char *password = va_arg(args, char *);
- size_t password_len = strlen(password);
- char *response_buf;
-
- amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response);
- if (response.bytes == NULL)
- /* We never request a zero-length block, because of the +2
- above, so a NULL here really is ENOMEM. */
- return response;
-
- response_buf = response.bytes;
- response_buf[0] = 0;
- memcpy(response_buf + 1, username, username_len);
- response_buf[username_len + 1] = 0;
- memcpy(response_buf + username_len + 2, password, password_len);
- break;
- }
- default:
- amqp_abort("Invalid SASL method: %d", (int) method);
- }
-
- return response;
-}
-
-amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) {
- return (state->first_queued_frame != NULL);
-}
-
-/*
- * Check to see if we have data in our buffer. If this returns 1, we
- * will avoid an immediate blocking read in amqp_simple_wait_frame.
- */
-amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) {
- return (state->sock_inbound_offset < state->sock_inbound_limit);
-}
-
-static int wait_frame_inner(amqp_connection_state_t state,
- amqp_frame_t *decoded_frame)
-{
- while (1) {
- int res;
-
- while (amqp_data_in_buffer(state)) {
- amqp_bytes_t buffer;
- buffer.len = state->sock_inbound_limit - state->sock_inbound_offset;
- buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset;
-
- res = amqp_handle_input(state, buffer, decoded_frame);
- if (res < 0)
- return res;
-
- state->sock_inbound_offset += res;
-
- if (decoded_frame->frame_type != 0)
- /* Complete frame was read. Return it. */
- return 0;
-
- /* Incomplete or ignored frame. Keep processing input. */
- assert(res != 0);
- }
-
- res = recv(state->sockfd, state->sock_inbound_buffer.bytes,
- state->sock_inbound_buffer.len, 0);
- if (res <= 0) {
- if (res == 0)
- return -ERROR_CONNECTION_CLOSED;
- else
- return -amqp_socket_error();
- }
-
- state->sock_inbound_limit = res;
- state->sock_inbound_offset = 0;
- }
-}
-
-int amqp_simple_wait_frame(amqp_connection_state_t state,
- amqp_frame_t *decoded_frame)
-{
- if (state->first_queued_frame != NULL) {
- amqp_frame_t *f = (amqp_frame_t *) state->first_queued_frame->data;
- state->first_queued_frame = state->first_queued_frame->next;
- if (state->first_queued_frame == NULL) {
- state->last_queued_frame = NULL;
- }
- *decoded_frame = *f;
- return 0;
- } else {
- return wait_frame_inner(state, decoded_frame);
- }
-}
-
-int amqp_simple_wait_method(amqp_connection_state_t state,
- amqp_channel_t expected_channel,
- amqp_method_number_t expected_method,
- amqp_method_t *output)
-{
- amqp_frame_t frame;
- int res = amqp_simple_wait_frame(state, &frame);
- if (res < 0)
- return res;
-
- if (frame.channel != expected_channel)
- amqp_abort("Expected 0x%08X method frame on channel %d, got frame on channel %d",
- expected_method,
- expected_channel,
- frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD)
- amqp_abort("Expected 0x%08X method frame on channel %d, got frame type %d",
- expected_method,
- expected_channel,
- frame.frame_type);
- if (frame.payload.method.id != expected_method)
- amqp_abort("Expected method ID 0x%08X on channel %d, got ID 0x%08X",
- expected_method,
- expected_channel,
- frame.payload.method.id);
- *output = frame.payload.method;
- return 0;
-}
-
-int amqp_send_method(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t id,
- void *decoded)
-{
- amqp_frame_t frame;
-
- frame.frame_type = AMQP_FRAME_METHOD;
- frame.channel = channel;
- frame.payload.method.id = id;
- frame.payload.method.decoded = decoded;
- return amqp_send_frame(state, &frame);
-}
-
-static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list )
-{
- while ( *list != 0 ) {
- if ( *list == expected ) return 1;
- list++;
- }
- return 0;
-}
-
-amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t *expected_reply_ids,
- void *decoded_request_method)
-{
- int status;
- amqp_rpc_reply_t result;
-
- memset(&result, 0, sizeof(result));
-
- status = amqp_send_method(state, channel, request_id, decoded_request_method);
- if (status < 0) {
- result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_error = -status;
- return result;
- }
-
- {
- amqp_frame_t frame;
-
- retry:
- status = wait_frame_inner(state, &frame);
- if (status < 0) {
- result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_error = -status;
- return result;
- }
-
- /*
- * We store the frame for later processing unless it's something
- * that directly affects us here, namely a method frame that is
- * either
- * - on the channel we want, and of the expected type, or
- * - on the channel we want, and a channel.close frame, or
- * - on channel zero, and a connection.close frame.
- */
- if (!( (frame.frame_type == AMQP_FRAME_METHOD) &&
- ( ((frame.channel == channel) &&
- ((amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) ||
- (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD)))
- ||
- ((frame.channel == 0) &&
- (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD)) ) ))
- {
- amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t));
- amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t));
-
- if (frame_copy == NULL || link == NULL) {
- result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_error = ERROR_NO_MEMORY;
- return result;
- }
-
- *frame_copy = frame;
-
- link->next = NULL;
- link->data = frame_copy;
-
- if (state->last_queued_frame == NULL) {
- state->first_queued_frame = link;
- } else {
- state->last_queued_frame->next = link;
- }
- state->last_queued_frame = link;
-
- goto retry;
- }
-
- result.reply_type = (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids))
- ? AMQP_RESPONSE_NORMAL
- : AMQP_RESPONSE_SERVER_EXCEPTION;
-
- result.reply = frame.payload.method;
- return result;
- }
-}
-
-static int amqp_login_inner(amqp_connection_state_t state,
- int channel_max,
- int frame_max,
- int heartbeat,
- amqp_sasl_method_enum sasl_method,
- va_list vl)
-{
- int res;
- amqp_method_t method;
- uint32_t server_frame_max;
- uint16_t server_channel_max;
- uint16_t server_heartbeat;
-
- amqp_send_header(state);
-
- res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD,
- &method);
- if (res < 0)
- return res;
-
- {
- amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded;
- if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) ||
- (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) {
- return -ERROR_INCOMPATIBLE_AMQP_VERSION;
- }
-
- /* TODO: check that our chosen SASL mechanism is in the list of
- acceptable mechanisms. Or even let the application choose from
- the list! */
- }
-
- {
- amqp_connection_start_ok_t s;
- amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool,
- sasl_method, vl);
-
- if (response_bytes.bytes == NULL)
- return -ERROR_NO_MEMORY;
-
- s.client_properties.num_entries = 0;
- s.client_properties.entries = NULL;
- s.mechanism = sasl_method_name(sasl_method);
- s.response = response_bytes;
- s.locale.bytes = "en_US";
- s.locale.len = 5;
-
- res = amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s);
- if (res < 0)
- return res;
- }
-
- amqp_release_buffers(state);
-
- res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD,
- &method);
- if (res < 0)
- return res;
-
- {
- amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded;
- server_channel_max = s->channel_max;
- server_frame_max = s->frame_max;
- server_heartbeat = s->heartbeat;
- }
-
- if (server_channel_max != 0 && server_channel_max < channel_max)
- channel_max = server_channel_max;
-
- if (server_frame_max != 0 && server_frame_max < frame_max)
- frame_max = server_frame_max;
-
- if (server_heartbeat != 0 && server_heartbeat < heartbeat)
- heartbeat = server_heartbeat;
-
- res = amqp_tune_connection(state, channel_max, frame_max, heartbeat);
- if (res < 0)
- return res;
-
- {
- amqp_connection_tune_ok_t s;
- s.frame_max = frame_max;
- s.channel_max = channel_max;
- s.heartbeat = heartbeat;
-
- res = amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s);
- if (res < 0)
- return res;
- }
-
- amqp_release_buffers(state);
-
- return 0;
-}
-
-amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
- char const *vhost,
- int channel_max,
- int frame_max,
- int heartbeat,
- amqp_sasl_method_enum sasl_method,
- ...)
-{
- va_list vl;
- amqp_rpc_reply_t result;
- int status;
-
- va_start(vl, sasl_method);
-
- status = amqp_login_inner(state, channel_max, frame_max, heartbeat, sasl_method, vl);
- if (status < 0) {
- result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.reply.id = 0;
- result.reply.decoded = NULL;
- result.library_error = -status;
- return result;
- }
-
- {
- amqp_method_number_t replies[] = { AMQP_CONNECTION_OPEN_OK_METHOD, 0 };
- amqp_connection_open_t s;
- s.virtual_host = amqp_cstring_bytes(vhost);
- s.capabilities.len = 0;
- s.capabilities.bytes = NULL;
- s.insist = 1;
-
- result = amqp_simple_rpc(state,
- 0,
- AMQP_CONNECTION_OPEN_METHOD,
- (amqp_method_number_t *) &replies,
- &s);
- if (result.reply_type != AMQP_RESPONSE_NORMAL)
- return result;
- }
- amqp_maybe_release_buffers(state);
-
- va_end(vl);
-
- result.reply_type = AMQP_RESPONSE_NORMAL;
- result.reply.id = 0;
- result.reply.decoded = NULL;
- result.library_error = 0;
- return result;
-}