diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-05-12 13:08:45 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-05-12 13:08:45 +0100 |
commit | 36a1423f8ad3626303e24ba34abb2ba7429e91bc (patch) | |
tree | 9bacec0e0273010e6f44d386ebc4fdfaaafda470 /librabbitmq | |
parent | 11a5577f7c6ee26ec832bc806567e1909449dfc4 (diff) | |
download | rabbitmq-c-github-ask-bug24079.tar.gz |
Pre-junk. rabbitmq-c is not in fact a plugin.bug24079
Diffstat (limited to 'librabbitmq')
-rw-r--r-- | librabbitmq/Makefile.am | 35 | ||||
-rw-r--r-- | librabbitmq/amqp.h | 478 | ||||
-rw-r--r-- | librabbitmq/amqp_api.c | 489 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 436 | ||||
-rw-r--r-- | librabbitmq/amqp_mem.c | 200 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 251 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 497 | ||||
-rw-r--r-- | librabbitmq/amqp_table.c | 441 | ||||
-rw-r--r-- | librabbitmq/codegen.py | 531 | ||||
-rw-r--r-- | librabbitmq/unix/socket.c | 92 | ||||
-rw-r--r-- | librabbitmq/unix/socket.h | 79 | ||||
-rw-r--r-- | librabbitmq/windows/socket.c | 98 | ||||
-rw-r--r-- | librabbitmq/windows/socket.h | 89 |
13 files changed, 0 insertions, 3716 deletions
diff --git a/librabbitmq/Makefile.am b/librabbitmq/Makefile.am deleted file mode 100644 index 1217c49..0000000 --- a/librabbitmq/Makefile.am +++ /dev/null @@ -1,35 +0,0 @@ -lib_LTLIBRARIES = librabbitmq.la - -AM_CFLAGS = -I$(srcdir)/$(PLATFORM_DIR) -DBUILDING_LIBRABBITMQ - -if GCC -# Because we want to build under Microsoft's C compiler (for which -# there is apparently no demand for C99 support), it's a good idea -# to have gcc tell us when we stray from the old standard. -AM_CFLAGS += -ansi -pedantic -endif - -if USE_MSINTTYPES -AM_CFLAGS += -I$(top_srcdir)/msinttypes -endif - -librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_api.c $(PLATFORM_DIR)/socket.c -librabbitmq_la_LDFLAGS = -no-undefined -librabbitmq_la_LIBADD = $(EXTRA_LIBS) -nodist_librabbitmq_la_SOURCES = amqp_framing.c -include_HEADERS = amqp_framing.h amqp.h -noinst_HEADERS = amqp_private.h $(PLATFORM_DIR)/socket.h -BUILT_SOURCES = amqp_framing.h amqp_framing.c -CLEANFILES = amqp_framing.h amqp_framing.c -EXTRA_DIST = \ - codegen.py \ - unix/socket.c unix/socket.h \ - windows/socket.c windows/socket.h - -CODEGEN_PY=$(srcdir)/codegen.py - -amqp_framing.h: $(top_srcdir)/$(AMQP_SPEC_JSON_PATH) $(CODEGEN_PY) - PYTHONPATH=$(top_srcdir)/$(AMQP_CODEGEN_DIR) $(PYTHON) $(CODEGEN_PY) header $< $@ - -amqp_framing.c: $(top_srcdir)/$(AMQP_SPEC_JSON_PATH) $(CODEGEN_PY) - PYTHONPATH=$(top_srcdir)/$(AMQP_CODEGEN_DIR) $(PYTHON) $(CODEGEN_PY) body $< $@ diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h deleted file mode 100644 index 1541583..0000000 --- a/librabbitmq/amqp.h +++ /dev/null @@ -1,478 +0,0 @@ -#ifndef librabbitmq_amqp_h -#define librabbitmq_amqp_h - -/* - * ***** BEGIN LICENSE BLOCK ***** - * Version: MPL 1.1/GPL 2.0 - * - * The contents of this file are subject to the Mozilla Public License - * Version 1.1 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * http://www.mozilla.org/MPL/ - * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and - * limitations under the License. - * - * The Original Code is librabbitmq. - * - * The Initial Developers of the Original Code are LShift Ltd, Cohesive - * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions - * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive - * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright - * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and - * Rabbit Technologies Ltd. - * - * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift - * Ltd. Portions created by Cohesive Financial Technologies LLC are - * Copyright (C) 2007-2009 Cohesive Financial Technologies - * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) - * 2007-2009 Rabbit Technologies Ltd. - * - * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 - * LShift Ltd and Tony Garnock-Jones. - * - * All Rights Reserved. - * - * Contributor(s): ______________________________________. - * - * Alternatively, the contents of this file may be used under the terms - * of the GNU General Public License Version 2 or later (the "GPL"), in - * which case the provisions of the GPL are applicable instead of those - * above. If you wish to allow use of your version of this file only - * under the terms of the GPL, and not to allow others to use your - * version of this file under the terms of the MPL, indicate your - * decision by deleting the provisions above and replace them with the - * notice and other provisions required by the GPL. If you do not - * delete the provisions above, a recipient may use your version of - * this file under the terms of any one of the MPL or the GPL. - * - * ***** END LICENSE BLOCK ***** - */ - -#ifdef __cplusplus -extern "C" { -#endif - -#ifdef _WIN32 -#ifdef BUILDING_LIBRABBITMQ -#define RABBITMQ_EXPORT extern __declspec(dllexport) -#else -#define RABBITMQ_EXPORT extern __declspec(dllimport) -#endif -#else -#define RABBITMQ_EXPORT extern -#endif - -typedef int amqp_boolean_t; -typedef uint32_t amqp_method_number_t; -typedef uint32_t amqp_flags_t; -typedef uint16_t amqp_channel_t; - -typedef struct amqp_bytes_t_ { - size_t len; - void *bytes; -} amqp_bytes_t; - -typedef struct amqp_decimal_t_ { - uint8_t decimals; - uint32_t value; -} amqp_decimal_t; - -typedef struct amqp_table_t_ { - int num_entries; - struct amqp_table_entry_t_ *entries; -} amqp_table_t; - -typedef struct amqp_array_t_ { - int num_entries; - struct amqp_field_value_t_ *entries; -} amqp_array_t; - -/* - 0-9 0-9-1 Qpid/Rabbit Type Remarks ---------------------------------------------------------------------------- - t t Boolean - b b Signed 8-bit - B Unsigned 8-bit - U s Signed 16-bit (A1) - u Unsigned 16-bit - I I I Signed 32-bit - i Unsigned 32-bit - L l Signed 64-bit (B) - l Unsigned 64-bit - f f 32-bit float - d d 64-bit float - D D D Decimal - s Short string (A2) - S S S Long string - A Nested Array - T T T Timestamp (u64) - F F F Nested Table - V V V Void - x Byte array - -Remarks: - - A1, A2: Notice how the types **CONFLICT** here. In Qpid and Rabbit, - 's' means a signed 16-bit integer; in 0-9-1, it means a - short string. - - B: Notice how the signednesses **CONFLICT** here. In Qpid and Rabbit, - 'l' means a signed 64-bit integer; in 0-9-1, it means an unsigned - 64-bit integer. - -I'm going with the Qpid/Rabbit types, where there's a conflict, and -the 0-9-1 types otherwise. 0-8 is a subset of 0-9, which is a subset -of the other two, so this will work for both 0-8 and 0-9-1 branches of -the code. -*/ - -typedef struct amqp_field_value_t_ { - uint8_t kind; - union { - amqp_boolean_t boolean; - int8_t i8; - uint8_t u8; - int16_t i16; - uint16_t u16; - int32_t i32; - uint32_t u32; - int64_t i64; - uint64_t u64; - float f32; - double f64; - amqp_decimal_t decimal; - amqp_bytes_t bytes; - amqp_table_t table; - amqp_array_t array; - } value; -} amqp_field_value_t; - -typedef struct amqp_table_entry_t_ { - amqp_bytes_t key; - amqp_field_value_t value; -} amqp_table_entry_t; - -typedef enum { - AMQP_FIELD_KIND_BOOLEAN = 't', - AMQP_FIELD_KIND_I8 = 'b', - AMQP_FIELD_KIND_U8 = 'B', - AMQP_FIELD_KIND_I16 = 's', - AMQP_FIELD_KIND_U16 = 'u', - AMQP_FIELD_KIND_I32 = 'I', - AMQP_FIELD_KIND_U32 = 'i', - AMQP_FIELD_KIND_I64 = 'l', - AMQP_FIELD_KIND_U64 = 'L', - AMQP_FIELD_KIND_F32 = 'f', - AMQP_FIELD_KIND_F64 = 'd', - AMQP_FIELD_KIND_DECIMAL = 'D', - AMQP_FIELD_KIND_UTF8 = 'S', - AMQP_FIELD_KIND_ARRAY = 'A', - AMQP_FIELD_KIND_TIMESTAMP = 'T', - AMQP_FIELD_KIND_TABLE = 'F', - AMQP_FIELD_KIND_VOID = 'V', - AMQP_FIELD_KIND_BYTES = 'x' -} amqp_field_value_kind_t; - -typedef struct amqp_pool_blocklist_t_ { - int num_blocks; - void **blocklist; -} amqp_pool_blocklist_t; - -typedef struct amqp_pool_t_ { - size_t pagesize; - - amqp_pool_blocklist_t pages; - amqp_pool_blocklist_t large_blocks; - - int next_page; - char *alloc_block; - size_t alloc_used; -} amqp_pool_t; - -typedef struct amqp_method_t_ { - amqp_method_number_t id; - void *decoded; -} amqp_method_t; - -typedef struct amqp_frame_t_ { - uint8_t frame_type; /* 0 means no event */ - amqp_channel_t channel; - union { - amqp_method_t method; - struct { - uint16_t class_id; - uint64_t body_size; - void *decoded; - amqp_bytes_t raw; - } properties; - amqp_bytes_t body_fragment; - struct { - uint8_t transport_high; - uint8_t transport_low; - uint8_t protocol_version_major; - uint8_t protocol_version_minor; - } protocol_header; - } payload; -} amqp_frame_t; - -typedef enum amqp_response_type_enum_ { - AMQP_RESPONSE_NONE = 0, - AMQP_RESPONSE_NORMAL, - AMQP_RESPONSE_LIBRARY_EXCEPTION, - AMQP_RESPONSE_SERVER_EXCEPTION -} amqp_response_type_enum; - -typedef struct amqp_rpc_reply_t_ { - amqp_response_type_enum reply_type; - amqp_method_t reply; - int library_error; /* if AMQP_RESPONSE_LIBRARY_EXCEPTION, then 0 here means socket EOF */ -} amqp_rpc_reply_t; - -typedef enum amqp_sasl_method_enum_ { - AMQP_SASL_METHOD_PLAIN = 0 -} amqp_sasl_method_enum; - -/* Opaque struct. */ -typedef struct amqp_connection_state_t_ *amqp_connection_state_t; - -RABBITMQ_EXPORT char const *amqp_version(void); - -/* Exported empty data structures */ -RABBITMQ_EXPORT const amqp_bytes_t amqp_empty_bytes; -RABBITMQ_EXPORT const amqp_table_t amqp_empty_table; -RABBITMQ_EXPORT const amqp_array_t amqp_empty_array; - -/* Compatibility macros for the above, to avoid the need to update - code written against earlier versions of librabbitmq. */ -#define AMQP_EMPTY_BYTES amqp_empty_bytes -#define AMQP_EMPTY_TABLE amqp_empty_table -#define AMQP_EMPTY_ARRAY amqp_empty_array - -RABBITMQ_EXPORT void init_amqp_pool(amqp_pool_t *pool, size_t pagesize); -RABBITMQ_EXPORT void recycle_amqp_pool(amqp_pool_t *pool); -RABBITMQ_EXPORT void empty_amqp_pool(amqp_pool_t *pool); - -RABBITMQ_EXPORT void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount); -RABBITMQ_EXPORT void amqp_pool_alloc_bytes(amqp_pool_t *pool, - size_t amount, amqp_bytes_t *output); - -RABBITMQ_EXPORT amqp_bytes_t amqp_cstring_bytes(char const *cstr); -RABBITMQ_EXPORT amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src); -RABBITMQ_EXPORT amqp_bytes_t amqp_bytes_malloc(size_t amount); -RABBITMQ_EXPORT void amqp_bytes_free(amqp_bytes_t bytes); - -RABBITMQ_EXPORT amqp_connection_state_t amqp_new_connection(void); -RABBITMQ_EXPORT int amqp_get_sockfd(amqp_connection_state_t state); -RABBITMQ_EXPORT void amqp_set_sockfd(amqp_connection_state_t state, - int sockfd); -RABBITMQ_EXPORT int amqp_tune_connection(amqp_connection_state_t state, - int channel_max, - int frame_max, - int heartbeat); -RABBITMQ_EXPORT int amqp_get_channel_max(amqp_connection_state_t state); -RABBITMQ_EXPORT int amqp_destroy_connection(amqp_connection_state_t state); - -RABBITMQ_EXPORT int amqp_handle_input(amqp_connection_state_t state, - amqp_bytes_t received_data, - amqp_frame_t *decoded_frame); - -RABBITMQ_EXPORT amqp_boolean_t amqp_release_buffers_ok( - amqp_connection_state_t state); - -RABBITMQ_EXPORT void amqp_release_buffers(amqp_connection_state_t state); - -RABBITMQ_EXPORT void amqp_maybe_release_buffers(amqp_connection_state_t state); - -RABBITMQ_EXPORT int amqp_send_frame(amqp_connection_state_t state, - amqp_frame_t const *frame); - -RABBITMQ_EXPORT int amqp_table_entry_cmp(void const *entry1, - void const *entry2); - -RABBITMQ_EXPORT int amqp_open_socket(char const *hostname, - int portnumber); - -RABBITMQ_EXPORT int amqp_send_header(amqp_connection_state_t state); - -RABBITMQ_EXPORT amqp_boolean_t amqp_frames_enqueued( - amqp_connection_state_t state); - -RABBITMQ_EXPORT int amqp_simple_wait_frame(amqp_connection_state_t state, - amqp_frame_t *decoded_frame); - -RABBITMQ_EXPORT int amqp_simple_wait_method(amqp_connection_state_t state, - amqp_channel_t expected_channel, - amqp_method_number_t expected_method, - amqp_method_t *output); - -RABBITMQ_EXPORT int amqp_send_method(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t id, - void *decoded); - -RABBITMQ_EXPORT amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t *expected_reply_ids, - void *decoded_request_method); - -RABBITMQ_EXPORT amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, - char const *vhost, - int channel_max, - int frame_max, - int heartbeat, - amqp_sasl_method_enum sasl_method, ...); - -RABBITMQ_EXPORT struct amqp_channel_open_ok_t_ *amqp_channel_open( - amqp_connection_state_t state, - amqp_channel_t channel); - -struct amqp_basic_properties_t_; -RABBITMQ_EXPORT int amqp_basic_publish(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_boolean_t mandatory, - amqp_boolean_t immediate, - struct amqp_basic_properties_t_ const *properties, - amqp_bytes_t body); - -RABBITMQ_EXPORT amqp_rpc_reply_t amqp_channel_close( - amqp_connection_state_t state, - amqp_channel_t channel, - int code); -RABBITMQ_EXPORT amqp_rpc_reply_t amqp_connection_close( - amqp_connection_state_t state, - int code); - -RABBITMQ_EXPORT struct amqp_exchange_declare_ok_t_ *amqp_exchange_declare( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t exchange, - amqp_bytes_t type, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_table_t arguments); - -RABBITMQ_EXPORT struct amqp_queue_declare_ok_t_ *amqp_queue_declare( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_boolean_t exclusive, - amqp_boolean_t auto_delete, - amqp_table_t arguments); - -RABBITMQ_EXPORT struct amqp_queue_delete_ok_t_ *amqp_queue_delete( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t if_unused, - amqp_boolean_t if_empty); - -RABBITMQ_EXPORT struct amqp_queue_bind_ok_t_ *amqp_queue_bind( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments); - -RABBITMQ_EXPORT struct amqp_queue_unbind_ok_t_ *amqp_queue_unbind( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments); - -RABBITMQ_EXPORT struct amqp_basic_consume_ok_t_ *amqp_basic_consume( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t consumer_tag, - amqp_boolean_t no_local, - amqp_boolean_t no_ack, - amqp_boolean_t exclusive, - amqp_table_t filter); - -RABBITMQ_EXPORT int amqp_basic_ack(amqp_connection_state_t state, - amqp_channel_t channel, - uint64_t delivery_tag, - amqp_boolean_t multiple); - -RABBITMQ_EXPORT amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t no_ack); - -RABBITMQ_EXPORT struct amqp_queue_purge_ok_t_ *amqp_queue_purge( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t no_wait); - -RABBITMQ_EXPORT struct amqp_tx_select_ok_t_ *amqp_tx_select( - amqp_connection_state_t state, - amqp_channel_t channel); - -RABBITMQ_EXPORT struct amqp_tx_commit_ok_t_ *amqp_tx_commit( - amqp_connection_state_t state, - amqp_channel_t channel); - -RABBITMQ_EXPORT struct amqp_tx_rollback_ok_t_ *amqp_tx_rollback( - amqp_connection_state_t state, - amqp_channel_t channel); - -/* - * Can be used to see if there is data still in the buffer, if so - * calling amqp_simple_wait_frame will not immediately enter a - * blocking read. - * - * Possibly amqp_frames_enqueued should be used for this? - */ -RABBITMQ_EXPORT amqp_boolean_t amqp_data_in_buffer( - amqp_connection_state_t state); - -/* - * For those API operations (such as amqp_basic_ack, - * amqp_queue_declare, and so on) that do not themselves return - * amqp_rpc_reply_t instances, we need some way of discovering what, - * if anything, went wrong. amqp_get_rpc_reply() returns the most - * recent amqp_rpc_reply_t instance corresponding to such an API - * operation for the given connection. - * - * Only use it for operations that do not themselves return - * amqp_rpc_reply_t; operations that do return amqp_rpc_reply_t - * generally do NOT update this per-connection-global amqp_rpc_reply_t - * instance. - */ -RABBITMQ_EXPORT amqp_rpc_reply_t amqp_get_rpc_reply( - amqp_connection_state_t state); - -/* - * Get the error string for the given error code. - * - * The returned string resides on the heap; the caller is responsible - * for freeing it. - */ -RABBITMQ_EXPORT char *amqp_error_string(int err); - -RABBITMQ_EXPORT int amqp_decode_table(amqp_bytes_t encoded, - amqp_pool_t *pool, - amqp_table_t *output, - size_t *offset); - -RABBITMQ_EXPORT int amqp_encode_table(amqp_bytes_t encoded, - amqp_table_t *input, - size_t *offset); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c deleted file mode 100644 index bf19761..0000000 --- a/librabbitmq/amqp_api.c +++ /dev/null @@ -1,489 +0,0 @@ -/* - * ***** BEGIN LICENSE BLOCK ***** - * Version: MPL 1.1/GPL 2.0 - * - * The contents of this file are subject to the Mozilla Public License - * Version 1.1 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * http://www.mozilla.org/MPL/ - * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and - * limitations under the License. - * - * The Original Code is librabbitmq. - * - * The Initial Developers of the Original Code are LShift Ltd, Cohesive - * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions - * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive - * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright - * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and - * Rabbit Technologies Ltd. - * - * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift - * Ltd. Portions created by Cohesive Financial Technologies LLC are - * Copyright (C) 2007-2009 Cohesive Financial Technologies - * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) - * 2007-2009 Rabbit Technologies Ltd. - * - * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 - * LShift Ltd and Tony Garnock-Jones. - * - * All Rights Reserved. - * - * Contributor(s): ______________________________________. - * - * Alternatively, the contents of this file may be used under the terms - * of the GNU General Public License Version 2 or later (the "GPL"), in - * which case the provisions of the GPL are applicable instead of those - * above. If you wish to allow use of your version of this file only - * under the terms of the GPL, and not to allow others to use your - * version of this file under the terms of the MPL, indicate your - * decision by deleting the provisions above and replace them with the - * notice and other provisions required by the GPL. If you do not - * delete the provisions above, a recipient may use your version of - * this file under the terms of any one of the MPL or the GPL. - * - * ***** END LICENSE BLOCK ***** - */ - -#include <stdlib.h> -#include <stdio.h> -#include <string.h> -#include <stdint.h> -#include <stdarg.h> - -#include "amqp.h" -#include "amqp_framing.h" -#include "amqp_private.h" - -#include <assert.h> - -static const char *client_error_strings[ERROR_MAX] = { - "could not allocate memory", /* ERROR_NO_MEMORY */ - "received bad AMQP data", /* ERROR_BAD_AQMP_DATA */ - "unknown AMQP class id", /* ERROR_UNKOWN_CLASS */ - "unknown AMQP method id", /* ERROR_UNKOWN_METHOD */ - "unknown host", /* ERROR_GETHOSTBYNAME_FAILED */ - "incompatible AMQP version", /* ERROR_INCOMPATIBLE_AMQP_VERSION */ - "connection closed unexpectedly", /* ERROR_CONNECTION_CLOSED */ -}; - -/* strdup is not in ISO C90! */ -static inline char *strdup(const char *str) -{ - return strcpy(malloc(strlen(str) + 1),str); -} - -char *amqp_error_string(int err) -{ - const char *str; - int category = (err & ERROR_CATEGORY_MASK); - err = (err & ~ERROR_CATEGORY_MASK); - - switch (category) { - case ERROR_CATEGORY_CLIENT: - if (err < 1 || err > ERROR_MAX) - str = "(undefined librabbitmq error)"; - else - str = client_error_strings[err - 1]; - break; - - case ERROR_CATEGORY_OS: - return amqp_os_error_string(err); - - default: - str = "(undefined error category)"; - } - - return strdup(str); -} - -void amqp_abort(const char *fmt, ...) -{ - va_list ap; - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fputc('\n', stderr); - abort(); -} - -const amqp_bytes_t amqp_empty_bytes = { 0, NULL }; -const amqp_table_t amqp_empty_table = { 0, NULL }; -const amqp_array_t amqp_empty_array = { 0, NULL }; - -#define RPC_REPLY(replytype) \ - (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL \ - ? (replytype *) state->most_recent_api_result.reply.decoded \ - : NULL) - -amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state, - amqp_channel_t channel) -{ - amqp_method_number_t replies[2] = { AMQP_CHANNEL_OPEN_OK_METHOD, 0}; - amqp_channel_open_t req; - req.out_of_band.bytes = NULL; - req.out_of_band.len = 0; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_CHANNEL_OPEN_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -int amqp_basic_publish(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_boolean_t mandatory, - amqp_boolean_t immediate, - amqp_basic_properties_t const *properties, - amqp_bytes_t body) -{ - amqp_frame_t f; - size_t body_offset; - size_t usable_body_payload_size = state->frame_max - (HEADER_SIZE + FOOTER_SIZE); - int res; - - amqp_basic_publish_t m; - amqp_basic_properties_t default_properties; - - m.exchange = exchange; - m.routing_key = routing_key; - m.mandatory = mandatory; - m.immediate = immediate; - - res = amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m); - if (res < 0) - return res; - - if (properties == NULL) { - memset(&default_properties, 0, sizeof(default_properties)); - properties = &default_properties; - } - - f.frame_type = AMQP_FRAME_HEADER; - f.channel = channel; - f.payload.properties.class_id = AMQP_BASIC_CLASS; - f.payload.properties.body_size = body.len; - f.payload.properties.decoded = (void *) properties; - - res = amqp_send_frame(state, &f); - if (res < 0) - return res; - - body_offset = 0; - while (1) { - int remaining = body.len - body_offset; - assert(remaining >= 0); - - if (remaining == 0) - break; - - f.frame_type = AMQP_FRAME_BODY; - f.channel = channel; - f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset); - if (remaining >= usable_body_payload_size) { - f.payload.body_fragment.len = usable_body_payload_size; - } else { - f.payload.body_fragment.len = remaining; - } - - body_offset += f.payload.body_fragment.len; - res = amqp_send_frame(state, &f); - if (res < 0) - return res; - } - - return 0; -} - -amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, - amqp_channel_t channel, - int code) -{ - char codestr[13]; - amqp_method_number_t replies[2] = { AMQP_CHANNEL_CLOSE_OK_METHOD, 0}; - amqp_channel_close_t req; - - req.reply_code = code; - req.reply_text.bytes = codestr; - req.reply_text.len = sprintf(codestr, "%d", code); - req.class_id = 0; - req.method_id = 0; - - return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD, - replies, &req); -} - -amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, - int code) -{ - char codestr[13]; - amqp_method_number_t replies[2] = { AMQP_CONNECTION_CLOSE_OK_METHOD, 0}; - amqp_channel_close_t req; - - req.reply_code = code; - req.reply_text.bytes = codestr; - req.reply_text.len = sprintf(codestr, "%d", code); - req.class_id = 0; - req.method_id = 0; - - return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD, - replies, &req); -} - -amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t exchange, - amqp_bytes_t type, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_table_t arguments) -{ - amqp_method_number_t replies[2] = { AMQP_EXCHANGE_DECLARE_OK_METHOD, 0}; - amqp_exchange_declare_t req; - req.exchange = exchange; - req.type = type; - req.passive = passive; - req.durable = durable; - req.auto_delete = 0; - req.internal = 0; - req.nowait = 0; - req.arguments = arguments; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_EXCHANGE_DECLARE_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_boolean_t exclusive, - amqp_boolean_t auto_delete, - amqp_table_t arguments) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_DECLARE_OK_METHOD, 0}; - amqp_queue_declare_t req; - req.queue = queue; - req.passive = passive; - req.durable = durable; - req.exclusive = exclusive; - req.auto_delete = auto_delete; - req.nowait = 0; - req.arguments = arguments; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_DECLARE_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_queue_delete_ok_t *amqp_queue_delete(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t if_unused, - amqp_boolean_t if_empty) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_DELETE_OK_METHOD, 0}; - amqp_queue_delete_t req; - req.queue = queue; - req.if_unused = if_unused; - req.if_empty = if_empty; - req.nowait = 0; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_DELETE_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_BIND_OK_METHOD, 0}; - amqp_queue_bind_t req; - req.ticket = 0; - req.queue = queue; - req.exchange = exchange; - req.routing_key = routing_key; - req.nowait = 0; - req.arguments = arguments; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_BIND_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_UNBIND_OK_METHOD, 0}; - amqp_queue_unbind_t req; - req.ticket = 0; - req.queue = queue; - req.exchange = exchange; - req.routing_key = routing_key; - req.arguments = arguments; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_UNBIND_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t consumer_tag, - amqp_boolean_t no_local, - amqp_boolean_t no_ack, - amqp_boolean_t exclusive, - amqp_table_t filter) -{ - amqp_method_number_t replies[2] = { AMQP_BASIC_CONSUME_OK_METHOD, 0}; - amqp_basic_consume_t req; - req.ticket = 0; - req.queue = queue; - req.consumer_tag = consumer_tag; - req.no_local = no_local; - req.no_ack = no_ack; - req.exclusive = exclusive; - req.nowait = 0; - req.arguments = filter; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_BASIC_CONSUME_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -int amqp_basic_ack(amqp_connection_state_t state, - amqp_channel_t channel, - uint64_t delivery_tag, - amqp_boolean_t multiple) -{ - amqp_basic_ack_t m; - m.delivery_tag = delivery_tag; - m.multiple = multiple; - return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m); -} - -amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t no_wait) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_PURGE_OK_METHOD, 0}; - amqp_queue_purge_t req; - req.ticket = 0; - req.queue = queue; - req.nowait = 0; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_PURGE_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t no_ack) -{ - amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD, - AMQP_BASIC_GET_EMPTY_METHOD, - 0 }; - amqp_basic_get_t req; - req.ticket = 0; - req.queue = queue; - req.no_ack = no_ack; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_BASIC_GET_METHOD, - replies, &req); - return state->most_recent_api_result; -} - -amqp_tx_select_ok_t *amqp_tx_select(amqp_connection_state_t state, - amqp_channel_t channel) -{ - amqp_method_number_t replies[2] = { AMQP_TX_SELECT_OK_METHOD, 0}; - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_TX_SELECT_METHOD, - replies, NULL); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_tx_commit_ok_t *amqp_tx_commit(amqp_connection_state_t state, - amqp_channel_t channel) -{ - amqp_method_number_t replies[2] = { AMQP_TX_COMMIT_OK_METHOD, 0}; - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_TX_COMMIT_METHOD, - replies, NULL); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_tx_rollback_ok_t *amqp_tx_rollback(amqp_connection_state_t state, - amqp_channel_t channel) -{ - amqp_method_number_t replies[2] = { AMQP_TX_ROLLBACK_OK_METHOD, 0}; - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_TX_ROLLBACK_METHOD, - replies, NULL); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) -{ - return state->most_recent_api_result; -} diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c deleted file mode 100644 index 34d12f8..0000000 --- a/librabbitmq/amqp_connection.c +++ /dev/null @@ -1,436 +0,0 @@ -/* - * ***** BEGIN LICENSE BLOCK ***** - * Version: MPL 1.1/GPL 2.0 - * - * The contents of this file are subject to the Mozilla Public License - * Version 1.1 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * http://www.mozilla.org/MPL/ - * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and - * limitations under the License. - * - * The Original Code is librabbitmq. - * - * The Initial Developers of the Original Code are LShift Ltd, Cohesive - * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions - * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive - * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright - * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and - * Rabbit Technologies Ltd. - * - * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift - * Ltd. Portions created by Cohesive Financial Technologies LLC are - * Copyright (C) 2007-2009 Cohesive Financial Technologies - * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) - * 2007-2009 Rabbit Technologies Ltd. - * - * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 - * LShift Ltd and Tony Garnock-Jones. - * - * All Rights Reserved. - * - * Contributor(s): ______________________________________. - * - * Alternatively, the contents of this file may be used under the terms - * of the GNU General Public License Version 2 or later (the "GPL"), in - * which case the provisions of the GPL are applicable instead of those - * above. If you wish to allow use of your version of this file only - * under the terms of the GPL, and not to allow others to use your - * version of this file under the terms of the MPL, indicate your - * decision by deleting the provisions above and replace them with the - * notice and other provisions required by the GPL. If you do not - * delete the provisions above, a recipient may use your version of - * this file under the terms of any one of the MPL or the GPL. - * - * ***** END LICENSE BLOCK ***** - */ - -#include <stdlib.h> -#include <stdio.h> -#include <string.h> -#include <stdint.h> -#include <assert.h> - -#include "amqp.h" -#include "amqp_framing.h" -#include "amqp_private.h" - -#define INITIAL_FRAME_POOL_PAGE_SIZE 65536 -#define INITIAL_DECODING_POOL_PAGE_SIZE 131072 -#define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072 - -#define ENFORCE_STATE(statevec, statenum) \ - { \ - amqp_connection_state_t _check_state = (statevec); \ - int _wanted_state = (statenum); \ - if (_check_state->state != _wanted_state) \ - amqp_abort("Programming error: invalid AMQP connection state: expected %d, got %d", \ - _wanted_state, \ - _check_state->state); \ - } - -amqp_connection_state_t amqp_new_connection(void) { - amqp_connection_state_t state = - (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_)); - - if (state == NULL) - return NULL; - - init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE); - init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE); - - if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0) - goto out_nomem; - - state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len); - if (state->inbound_buffer.bytes == NULL) - goto out_nomem; - - state->state = CONNECTION_STATE_INITIAL; - /* the server protocol version response is 8 bytes, which conveniently - is also the minimum frame size */ - state->target_size = 8; - - state->sockfd = -1; - state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE; - state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE); - if (state->sock_inbound_buffer.bytes == NULL) - goto out_nomem; - - return state; - - out_nomem: - free(state->sock_inbound_buffer.bytes); - empty_amqp_pool(&state->frame_pool); - empty_amqp_pool(&state->decoding_pool); - free(state); - return NULL; -} - -int amqp_get_sockfd(amqp_connection_state_t state) { - return state->sockfd; -} - -void amqp_set_sockfd(amqp_connection_state_t state, - int sockfd) -{ - state->sockfd = sockfd; -} - -int amqp_tune_connection(amqp_connection_state_t state, - int channel_max, - int frame_max, - int heartbeat) -{ - void *newbuf; - - ENFORCE_STATE(state, CONNECTION_STATE_IDLE); - - state->channel_max = channel_max; - state->frame_max = frame_max; - state->heartbeat = heartbeat; - - empty_amqp_pool(&state->frame_pool); - init_amqp_pool(&state->frame_pool, frame_max); - - state->inbound_buffer.len = frame_max; - state->outbound_buffer.len = frame_max; - newbuf = realloc(state->outbound_buffer.bytes, frame_max); - if (newbuf == NULL) { - amqp_destroy_connection(state); - return -ERROR_NO_MEMORY; - } - state->outbound_buffer.bytes = newbuf; - - return 0; -} - -int amqp_get_channel_max(amqp_connection_state_t state) { - return state->channel_max; -} - -int amqp_destroy_connection(amqp_connection_state_t state) { - int s = state->sockfd; - - empty_amqp_pool(&state->frame_pool); - empty_amqp_pool(&state->decoding_pool); - free(state->outbound_buffer.bytes); - free(state->sock_inbound_buffer.bytes); - free(state); - - if (s >= 0 && amqp_socket_close(s) < 0) - return -amqp_socket_error(); - else - return 0; -} - -static void return_to_idle(amqp_connection_state_t state) { - state->inbound_buffer.bytes = NULL; - state->inbound_offset = 0; - state->target_size = HEADER_SIZE; - state->state = CONNECTION_STATE_IDLE; -} - -static size_t consume_data(amqp_connection_state_t state, - amqp_bytes_t *received_data) -{ - /* how much data is available and will fit? */ - size_t bytes_consumed = state->target_size - state->inbound_offset; - if (received_data->len < bytes_consumed) - bytes_consumed = received_data->len; - - memcpy(amqp_offset(state->inbound_buffer.bytes, state->inbound_offset), - received_data->bytes, bytes_consumed); - state->inbound_offset += bytes_consumed; - received_data->bytes = amqp_offset(received_data->bytes, bytes_consumed); - received_data->len -= bytes_consumed; - - return bytes_consumed; -} - -int amqp_handle_input(amqp_connection_state_t state, - amqp_bytes_t received_data, - amqp_frame_t *decoded_frame) -{ - size_t bytes_consumed; - void *raw_frame; - - /* Returning frame_type of zero indicates either insufficient input, - or a complete, ignored frame was read. */ - decoded_frame->frame_type = 0; - - if (received_data.len == 0) - return 0; - - if (state->state == CONNECTION_STATE_IDLE) { - state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, - state->inbound_buffer.len); - if (state->inbound_buffer.bytes == NULL) - /* state->inbound_buffer.len is always nonzero, because it - corresponds to frame_max, which is not permitted to be less - than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */ - return -ERROR_NO_MEMORY; - - state->state = CONNECTION_STATE_HEADER; - } - - bytes_consumed = consume_data(state, &received_data); - - /* do we have target_size data yet? if not, return with the - expectation that more will arrive */ - if (state->inbound_offset < state->target_size) - return bytes_consumed; - - raw_frame = state->inbound_buffer.bytes; - - switch (state->state) { - case CONNECTION_STATE_INITIAL: - /* check for a protocol header from the server */ - if (memcmp(raw_frame, "AMQP", 4) == 0) { - decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER; - decoded_frame->channel = 0; - - decoded_frame->payload.protocol_header.transport_high - = amqp_d8(raw_frame, 4); - decoded_frame->payload.protocol_header.transport_low - = amqp_d8(raw_frame, 5); - decoded_frame->payload.protocol_header.protocol_version_major - = amqp_d8(raw_frame, 6); - decoded_frame->payload.protocol_header.protocol_version_minor - = amqp_d8(raw_frame, 7); - - return_to_idle(state); - return bytes_consumed; - } - - /* it's not a protocol header; fall through to process it as a - regular frame header */ - - case CONNECTION_STATE_HEADER: - /* frame length is 3 bytes in */ - state->target_size - = amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE; - state->state = CONNECTION_STATE_BODY; - - bytes_consumed += consume_data(state, &received_data); - - /* do we have target_size data yet? if not, return with the - expectation that more will arrive */ - if (state->inbound_offset < state->target_size) - return bytes_consumed; - - /* fall through to process body */ - - case CONNECTION_STATE_BODY: { - amqp_bytes_t encoded; - int res; - - /* Check frame end marker (footer) */ - if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END) - return -ERROR_BAD_AMQP_DATA; - - decoded_frame->frame_type = amqp_d8(raw_frame, 0); - decoded_frame->channel = amqp_d16(raw_frame, 1); - - switch (decoded_frame->frame_type) { - case AMQP_FRAME_METHOD: - decoded_frame->payload.method.id = amqp_d32(raw_frame, HEADER_SIZE); - encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 4); - encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE; - - res = amqp_decode_method(decoded_frame->payload.method.id, - &state->decoding_pool, encoded, - &decoded_frame->payload.method.decoded); - if (res < 0) - return res; - - break; - - case AMQP_FRAME_HEADER: - decoded_frame->payload.properties.class_id - = amqp_d16(raw_frame, HEADER_SIZE); - /* unused 2-byte weight field goes here */ - decoded_frame->payload.properties.body_size - = amqp_d64(raw_frame, HEADER_SIZE + 4); - encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 12); - encoded.len = state->target_size - HEADER_SIZE - 12 - FOOTER_SIZE; - decoded_frame->payload.properties.raw = encoded; - - res = amqp_decode_properties(decoded_frame->payload.properties.class_id, - &state->decoding_pool, encoded, - &decoded_frame->payload.properties.decoded); - if (res < 0) - return res; - - break; - - case AMQP_FRAME_BODY: - decoded_frame->payload.body_fragment.len - = state->target_size - HEADER_SIZE - FOOTER_SIZE; - decoded_frame->payload.body_fragment.bytes - = amqp_offset(raw_frame, HEADER_SIZE); - break; - - case AMQP_FRAME_HEARTBEAT: - break; - - default: - /* Ignore the frame */ - decoded_frame->frame_type = 0; - break; - } - - return_to_idle(state); - return bytes_consumed; - } - - default: - amqp_abort("Internal error: invalid amqp_connection_state_t->state %d", state->state); - return bytes_consumed; - } -} - -amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) { - return (state->state == CONNECTION_STATE_IDLE) && (state->first_queued_frame == NULL); -} - -void amqp_release_buffers(amqp_connection_state_t state) { - ENFORCE_STATE(state, CONNECTION_STATE_IDLE); - - if (state->first_queued_frame) - amqp_abort("Programming error: attempt to amqp_release_buffers while waiting events enqueued"); - - recycle_amqp_pool(&state->frame_pool); - recycle_amqp_pool(&state->decoding_pool); -} - -void amqp_maybe_release_buffers(amqp_connection_state_t state) { - if (amqp_release_buffers_ok(state)) { - amqp_release_buffers(state); - } -} - -int amqp_send_frame(amqp_connection_state_t state, - const amqp_frame_t *frame) -{ - void *out_frame = state->outbound_buffer.bytes; - int res; - - amqp_e8(out_frame, 0, frame->frame_type); - amqp_e16(out_frame, 1, frame->channel); - - if (frame->frame_type == AMQP_FRAME_BODY) { - /* For a body frame, rather than copying data around, we use - writev to compose the frame */ - struct iovec iov[3]; - uint8_t frame_end_byte = AMQP_FRAME_END; - const amqp_bytes_t *body = &frame->payload.body_fragment; - - amqp_e32(out_frame, 3, body->len); - - iov[0].iov_base = out_frame; - iov[0].iov_len = HEADER_SIZE; - iov[1].iov_base = body->bytes; - iov[1].iov_len = body->len; - iov[2].iov_base = &frame_end_byte; - iov[2].iov_len = FOOTER_SIZE; - - res = amqp_socket_writev(state->sockfd, iov, 3); - } - else { - size_t out_frame_len; - amqp_bytes_t encoded; - - switch (frame->frame_type) { - case AMQP_FRAME_METHOD: - amqp_e32(out_frame, HEADER_SIZE, frame->payload.method.id); - - encoded.bytes = amqp_offset(out_frame, HEADER_SIZE + 4); - encoded.len = state->outbound_buffer.len - HEADER_SIZE - 4 - FOOTER_SIZE; - - res = amqp_encode_method(frame->payload.method.id, - frame->payload.method.decoded, encoded); - if (res < 0) - return res; - - out_frame_len = res + 4; - break; - - case AMQP_FRAME_HEADER: - amqp_e16(out_frame, HEADER_SIZE, frame->payload.properties.class_id); - amqp_e16(out_frame, HEADER_SIZE+2, 0); /* "weight" */ - amqp_e64(out_frame, HEADER_SIZE+4, frame->payload.properties.body_size); - - encoded.bytes = amqp_offset(out_frame, HEADER_SIZE + 12); - encoded.len = state->outbound_buffer.len - HEADER_SIZE - 12 - FOOTER_SIZE; - - res = amqp_encode_properties(frame->payload.properties.class_id, - frame->payload.properties.decoded, encoded); - if (res < 0) - return res; - - out_frame_len = res + 12; - break; - - case AMQP_FRAME_HEARTBEAT: - out_frame_len = 0; - break; - - default: - abort(); - } - - amqp_e32(out_frame, 3, out_frame_len); - amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END); - res = send(state->sockfd, out_frame, - out_frame_len + HEADER_SIZE + FOOTER_SIZE, 0); - } - - if (res < 0) - return -amqp_socket_error(); - else - return 0; -} diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c deleted file mode 100644 index 3ad8c3e..0000000 --- a/librabbitmq/amqp_mem.c +++ /dev/null @@ -1,200 +0,0 @@ -/* - * ***** BEGIN LICENSE BLOCK ***** - * Version: MPL 1.1/GPL 2.0 - * - * The contents of this file are subject to the Mozilla Public License - * Version 1.1 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * http://www.mozilla.org/MPL/ - * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and - * limitations under the License. - * - * The Original Code is librabbitmq. - * - * The Initial Developers of the Original Code are LShift Ltd, Cohesive - * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions - * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive - * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright - * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and - * Rabbit Technologies Ltd. - * - * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift - * Ltd. Portions created by Cohesive Financial Technologies LLC are - * Copyright (C) 2007-2009 Cohesive Financial Technologies - * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) - * 2007-2009 Rabbit Technologies Ltd. - * - * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 - * LShift Ltd and Tony Garnock-Jones. - * - * All Rights Reserved. - * - * Contributor(s): ______________________________________. - * - * Alternatively, the contents of this file may be used under the terms - * of the GNU General Public License Version 2 or later (the "GPL"), in - * which case the provisions of the GPL are applicable instead of those - * above. If you wish to allow use of your version of this file only - * under the terms of the GPL, and not to allow others to use your - * version of this file under the terms of the MPL, indicate your - * decision by deleting the provisions above and replace them with the - * notice and other provisions required by the GPL. If you do not - * delete the provisions above, a recipient may use your version of - * this file under the terms of any one of the MPL or the GPL. - * - * ***** END LICENSE BLOCK ***** - */ - -#include <stdlib.h> -#include <stdio.h> -#include <string.h> -#include <stdint.h> -#include <sys/types.h> -#include <assert.h> - -#include "amqp.h" -#include "config.h" - -char const *amqp_version(void) { - return VERSION; /* defined in config.h */ -} - -void init_amqp_pool(amqp_pool_t *pool, size_t pagesize) { - pool->pagesize = pagesize ? pagesize : 4096; - - pool->pages.num_blocks = 0; - pool->pages.blocklist = NULL; - - pool->large_blocks.num_blocks = 0; - pool->large_blocks.blocklist = NULL; - - pool->next_page = 0; - pool->alloc_block = NULL; - pool->alloc_used = 0; -} - -static void empty_blocklist(amqp_pool_blocklist_t *x) { - int i; - - for (i = 0; i < x->num_blocks; i++) { - free(x->blocklist[i]); - } - if (x->blocklist != NULL) { - free(x->blocklist); - } - x->num_blocks = 0; - x->blocklist = NULL; -} - -void recycle_amqp_pool(amqp_pool_t *pool) { - empty_blocklist(&pool->large_blocks); - pool->next_page = 0; - pool->alloc_block = NULL; - pool->alloc_used = 0; -} - -void empty_amqp_pool(amqp_pool_t *pool) { - recycle_amqp_pool(pool); - empty_blocklist(&pool->pages); -} - -/* Returns 1 on success, 0 on failure */ -static int record_pool_block(amqp_pool_blocklist_t *x, void *block) { - size_t blocklistlength = sizeof(void *) * (x->num_blocks + 1); - - if (x->blocklist == NULL) { - x->blocklist = malloc(blocklistlength); - if (x->blocklist == NULL) - return 0; - } else { - void *newbl = realloc(x->blocklist, blocklistlength); - if (newbl == NULL) - return 0; - x->blocklist = newbl; - } - - x->blocklist[x->num_blocks] = block; - x->num_blocks++; - return 1; -} - -void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) { - if (amount == 0) { - return NULL; - } - - amount = (amount + 7) & (~7); /* round up to nearest 8-byte boundary */ - - if (amount > pool->pagesize) { - void *result = calloc(1, amount); - if (result == NULL) { - return NULL; - } - if (!record_pool_block(&pool->large_blocks, result)) - return NULL; - return result; - } - - if (pool->alloc_block != NULL) { - assert(pool->alloc_used <= pool->pagesize); - - if (pool->alloc_used + amount <= pool->pagesize) { - void *result = pool->alloc_block + pool->alloc_used; - pool->alloc_used += amount; - return result; - } - } - - if (pool->next_page >= pool->pages.num_blocks) { - pool->alloc_block = calloc(1, pool->pagesize); - if (pool->alloc_block == NULL) { - return NULL; - } - if (!record_pool_block(&pool->pages, pool->alloc_block)) - return NULL; - pool->next_page = pool->pages.num_blocks; - } else { - pool->alloc_block = pool->pages.blocklist[pool->next_page]; - pool->next_page++; - } - - pool->alloc_used = amount; - - return pool->alloc_block; -} - -void amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output) { - output->len = amount; - output->bytes = amqp_pool_alloc(pool, amount); -} - -amqp_bytes_t amqp_cstring_bytes(char const *cstr) { - amqp_bytes_t result; - result.len = strlen(cstr); - result.bytes = (void *) cstr; - return result; -} - -amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src) { - amqp_bytes_t result; - result.len = src.len; - result.bytes = malloc(src.len); - if (result.bytes != NULL) { - memcpy(result.bytes, src.bytes, src.len); - } - return result; -} - -amqp_bytes_t amqp_bytes_malloc(size_t amount) { - amqp_bytes_t result; - result.len = amount; - result.bytes = malloc(amount); /* will return NULL if it fails */ - return result; -} - -void amqp_bytes_free(amqp_bytes_t bytes) { - free(bytes.bytes); -} diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h deleted file mode 100644 index eaf6b2c..0000000 --- a/librabbitmq/amqp_private.h +++ /dev/null @@ -1,251 +0,0 @@ -#ifndef librabbitmq_amqp_private_h -#define librabbitmq_amqp_private_h - -/* - * ***** BEGIN LICENSE BLOCK ***** - * Version: MPL 1.1/GPL 2.0 - * - * The contents of this file are subject to the Mozilla Public License - * Version 1.1 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * http://www.mozilla.org/MPL/ - * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and - * limitations under the License. - * - * The Original Code is librabbitmq. - * - * The Initial Developers of the Original Code are LShift Ltd, Cohesive - * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions - * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive - * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright - * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and - * Rabbit Technologies Ltd. - * - * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift - * Ltd. Portions created by Cohesive Financial Technologies LLC are - * Copyright (C) 2007-2009 Cohesive Financial Technologies - * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) - * 2007-2009 Rabbit Technologies Ltd. - * - * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 - * LShift Ltd and Tony Garnock-Jones. - * - * All Rights Reserved. - * - * Contributor(s): ______________________________________. - * - * Alternatively, the contents of this file may be used under the terms - * of the GNU General Public License Version 2 or later (the "GPL"), in - * which case the provisions of the GPL are applicable instead of those - * above. If you wish to allow use of your version of this file only - * under the terms of the GPL, and not to allow others to use your - * version of this file under the terms of the MPL, indicate your - * decision by deleting the provisions above and replace them with the - * notice and other provisions required by the GPL. If you do not - * delete the provisions above, a recipient may use your version of - * this file under the terms of any one of the MPL or the GPL. - * - * ***** END LICENSE BLOCK ***** - */ - -#include "config.h" - -/* Error numbering: Because of differences in error numbering on - * different platforms, we want to keep error numbers opaque for - * client code. Internally, we encode the category of an error - * (i.e. where its number comes from) in the top bits of the number - * (assuming that an int has at least 32 bits). - */ -#define ERROR_CATEGORY_MASK (1 << 29) - -#define ERROR_CATEGORY_CLIENT (0 << 29) /* librabbitmq error codes */ -#define ERROR_CATEGORY_OS (1 << 29) /* OS-specific error codes */ - -/* librabbitmq error codes */ -#define ERROR_NO_MEMORY 1 -#define ERROR_BAD_AMQP_DATA 2 -#define ERROR_UNKNOWN_CLASS 3 -#define ERROR_UNKNOWN_METHOD 4 -#define ERROR_GETHOSTBYNAME_FAILED 5 -#define ERROR_INCOMPATIBLE_AMQP_VERSION 6 -#define ERROR_CONNECTION_CLOSED 7 -#define ERROR_MAX 7 - -extern char *amqp_os_error_string(int err); - -#include "socket.h" - -/* - * Connection states: XXX FIX THIS - * - * - CONNECTION_STATE_INITIAL: The initial state, when we cannot be - * sure if the next thing we will get is the first AMQP frame, or a - * protocol header from the server. - * - * - CONNECTION_STATE_IDLE: The normal state between - * frames. Connections may only be reconfigured, and the - * connection's pools recycled, when in this state. Whenever we're - * in this state, the inbound_buffer's bytes pointer must be NULL; - * any other state, and it must point to a block of memory allocated - * from the frame_pool. - * - * - CONNECTION_STATE_HEADER: Some bytes of an incoming frame have - * been seen, but not a complete frame header's worth. - * - * - CONNECTION_STATE_BODY: A complete frame header has been seen, but - * the frame is not yet complete. When it is completed, it will be - * returned, and the connection will return to IDLE state. - * - */ -typedef enum amqp_connection_state_enum_ { - CONNECTION_STATE_IDLE = 0, - CONNECTION_STATE_INITIAL, - CONNECTION_STATE_HEADER, - CONNECTION_STATE_BODY -} amqp_connection_state_enum; - -/* 7 bytes up front, then payload, then 1 byte footer */ -#define HEADER_SIZE 7 -#define FOOTER_SIZE 1 - -#define AMQP_PSEUDOFRAME_PROTOCOL_HEADER 'A' - -typedef struct amqp_link_t_ { - struct amqp_link_t_ *next; - void *data; -} amqp_link_t; - -struct amqp_connection_state_t_ { - amqp_pool_t frame_pool; - amqp_pool_t decoding_pool; - - amqp_connection_state_enum state; - - int channel_max; - int frame_max; - int heartbeat; - amqp_bytes_t inbound_buffer; - - size_t inbound_offset; - size_t target_size; - - amqp_bytes_t outbound_buffer; - - int sockfd; - amqp_bytes_t sock_inbound_buffer; - size_t sock_inbound_offset; - size_t sock_inbound_limit; - - amqp_link_t *first_queued_frame; - amqp_link_t *last_queued_frame; - - amqp_rpc_reply_t most_recent_api_result; -}; - -static inline void *amqp_offset(void *data, size_t offset) -{ - return (char *)data + offset; -} - -/* assuming a machine that supports unaligned accesses (for now) */ - -#define DECLARE_CODEC_BASE_TYPE(bits, htonx, ntohx) \ - \ -static inline void amqp_e##bits(void *data, size_t offset, \ - uint##bits##_t val) \ -{ \ - *(uint##bits##_t *)amqp_offset(data, offset) = htonx(val); \ -} \ - \ -static inline uint##bits##_t amqp_d##bits(void *data, size_t offset) \ -{ \ - return ntohx(*(uint##bits##_t *)amqp_offset(data, offset)); \ -} \ - \ -static inline int amqp_encode_##bits(amqp_bytes_t encoded, size_t *offset, \ - uint##bits##_t input) \ - \ -{ \ - size_t o = *offset; \ - if ((*offset = o + bits / 8) <= encoded.len) { \ - amqp_e##bits(encoded.bytes, o, input); \ - return 1; \ - } \ - else { \ - return 0; \ - } \ -} \ - \ -static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \ - uint##bits##_t *output) \ - \ -{ \ - size_t o = *offset; \ - if ((*offset = o + bits / 8) <= encoded.len) { \ - *output = amqp_d##bits(encoded.bytes, o); \ - *output = ntohx(*(uint##bits##_t *)((char *)encoded.bytes + o)); \ - return 1; \ - } \ - else { \ - return 0; \ - } \ -} - -/* assuming little endian (for now) */ - -#define DECLARE_XTOXLL(func) \ -static inline uint64_t func##ll(uint64_t val) \ -{ \ - union { \ - uint64_t whole; \ - uint32_t halves[2]; \ - } u; \ - uint32_t t; \ - u.whole = val; \ - t = u.halves[0]; \ - u.halves[0] = func##l(u.halves[1]); \ - u.halves[1] = func##l(t); \ - return u.whole; \ -} - -DECLARE_XTOXLL(hton) -DECLARE_XTOXLL(ntoh) - -DECLARE_CODEC_BASE_TYPE(8, (uint8_t), (uint8_t)) -DECLARE_CODEC_BASE_TYPE(16, htons, ntohs) -DECLARE_CODEC_BASE_TYPE(32, htonl, ntohl) -DECLARE_CODEC_BASE_TYPE(64, htonll, ntohll) - -static inline int amqp_encode_bytes(amqp_bytes_t encoded, size_t *offset, - amqp_bytes_t input) -{ - size_t o = *offset; - if ((*offset = o + input.len) <= encoded.len) { - memcpy(amqp_offset(encoded.bytes, o), input.bytes, input.len); - return 1; - } - else { - return 0; - } -} - -static inline int amqp_decode_bytes(amqp_bytes_t encoded, size_t *offset, - amqp_bytes_t *output, size_t len) -{ - size_t o = *offset; - if ((*offset = o + len) <= encoded.len) { - output->bytes = amqp_offset(encoded.bytes, o); - output->len = len; - return 1; - } - else { - return 0; - } -} - -extern void amqp_abort(const char *fmt, ...); - -#endif diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c deleted file mode 100644 index f23b42b..0000000 --- a/librabbitmq/amqp_socket.c +++ /dev/null @@ -1,497 +0,0 @@ -/* - * ***** BEGIN LICENSE BLOCK ***** - * Version: MPL 1.1/GPL 2.0 - * - * The contents of this file are subject to the Mozilla Public License - * Version 1.1 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * http://www.mozilla.org/MPL/ - * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and - * limitations under the License. - * - * The Original Code is librabbitmq. - * - * The Initial Developers of the Original Code are LShift Ltd, Cohesive - * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions - * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive - * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright - * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and - * Rabbit Technologies Ltd. - * - * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift - * Ltd. Portions created by Cohesive Financial Technologies LLC are - * Copyright (C) 2007-2009 Cohesive Financial Technologies - * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) - * 2007-2009 Rabbit Technologies Ltd. - * - * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 - * LShift Ltd and Tony Garnock-Jones. - * - * All Rights Reserved. - * - * Contributor(s): ______________________________________. - * - * Alternatively, the contents of this file may be used under the terms - * of the GNU General Public License Version 2 or later (the "GPL"), in - * which case the provisions of the GPL are applicable instead of those - * above. If you wish to allow use of your version of this file only - * under the terms of the GPL, and not to allow others to use your - * version of this file under the terms of the MPL, indicate your - * decision by deleting the provisions above and replace them with the - * notice and other provisions required by the GPL. If you do not - * delete the provisions above, a recipient may use your version of - * this file under the terms of any one of the MPL or the GPL. - * - * ***** END LICENSE BLOCK ***** - */ - -#include <stdlib.h> -#include <stdio.h> -#include <string.h> -#include <stdint.h> -#include <stdarg.h> -#include <assert.h> - -#include "amqp.h" -#include "amqp_framing.h" -#include "amqp_private.h" - - -int amqp_open_socket(char const *hostname, - int portnumber) -{ - int sockfd, res; - struct sockaddr_in addr; - struct hostent *he; - int one = 1; /* used as a buffer by setsockopt below */ - - res = amqp_socket_init(); - if (res) - return res; - - he = gethostbyname(hostname); - if (he == NULL) - return -ERROR_GETHOSTBYNAME_FAILED; - - addr.sin_family = AF_INET; - addr.sin_port = htons(portnumber); - addr.sin_addr.s_addr = * (uint32_t *) he->h_addr_list[0]; - - sockfd = socket(PF_INET, SOCK_STREAM, 0); - if (sockfd == -1) - return -amqp_socket_error(); - - if (amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, - sizeof(one)) < 0 - || connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) - { - res = -amqp_socket_error(); - amqp_socket_close(sockfd); - return res; - } - - return sockfd; -} - -int amqp_send_header(amqp_connection_state_t state) { - static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0, - AMQP_PROTOCOL_VERSION_MAJOR, - AMQP_PROTOCOL_VERSION_MINOR, - AMQP_PROTOCOL_VERSION_REVISION }; - return send(state->sockfd, (void *)header, 8, 0); -} - -static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) { - amqp_bytes_t res; - - switch (method) { - case AMQP_SASL_METHOD_PLAIN: - res.bytes = "PLAIN"; - res.len = 5; - break; - - default: - amqp_abort("Invalid SASL method: %d", (int) method); - } - - return res; -} - -static amqp_bytes_t sasl_response(amqp_pool_t *pool, - amqp_sasl_method_enum method, - va_list args) -{ - amqp_bytes_t response; - - switch (method) { - case AMQP_SASL_METHOD_PLAIN: { - char *username = va_arg(args, char *); - size_t username_len = strlen(username); - char *password = va_arg(args, char *); - size_t password_len = strlen(password); - char *response_buf; - - amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response); - if (response.bytes == NULL) - /* We never request a zero-length block, because of the +2 - above, so a NULL here really is ENOMEM. */ - return response; - - response_buf = response.bytes; - response_buf[0] = 0; - memcpy(response_buf + 1, username, username_len); - response_buf[username_len + 1] = 0; - memcpy(response_buf + username_len + 2, password, password_len); - break; - } - default: - amqp_abort("Invalid SASL method: %d", (int) method); - } - - return response; -} - -amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) { - return (state->first_queued_frame != NULL); -} - -/* - * Check to see if we have data in our buffer. If this returns 1, we - * will avoid an immediate blocking read in amqp_simple_wait_frame. - */ -amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) { - return (state->sock_inbound_offset < state->sock_inbound_limit); -} - -static int wait_frame_inner(amqp_connection_state_t state, - amqp_frame_t *decoded_frame) -{ - while (1) { - int res; - - while (amqp_data_in_buffer(state)) { - amqp_bytes_t buffer; - buffer.len = state->sock_inbound_limit - state->sock_inbound_offset; - buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset; - - res = amqp_handle_input(state, buffer, decoded_frame); - if (res < 0) - return res; - - state->sock_inbound_offset += res; - - if (decoded_frame->frame_type != 0) - /* Complete frame was read. Return it. */ - return 0; - - /* Incomplete or ignored frame. Keep processing input. */ - assert(res != 0); - } - - res = recv(state->sockfd, state->sock_inbound_buffer.bytes, - state->sock_inbound_buffer.len, 0); - if (res <= 0) { - if (res == 0) - return -ERROR_CONNECTION_CLOSED; - else - return -amqp_socket_error(); - } - - state->sock_inbound_limit = res; - state->sock_inbound_offset = 0; - } -} - -int amqp_simple_wait_frame(amqp_connection_state_t state, - amqp_frame_t *decoded_frame) -{ - if (state->first_queued_frame != NULL) { - amqp_frame_t *f = (amqp_frame_t *) state->first_queued_frame->data; - state->first_queued_frame = state->first_queued_frame->next; - if (state->first_queued_frame == NULL) { - state->last_queued_frame = NULL; - } - *decoded_frame = *f; - return 0; - } else { - return wait_frame_inner(state, decoded_frame); - } -} - -int amqp_simple_wait_method(amqp_connection_state_t state, - amqp_channel_t expected_channel, - amqp_method_number_t expected_method, - amqp_method_t *output) -{ - amqp_frame_t frame; - int res = amqp_simple_wait_frame(state, &frame); - if (res < 0) - return res; - - if (frame.channel != expected_channel) - amqp_abort("Expected 0x%08X method frame on channel %d, got frame on channel %d", - expected_method, - expected_channel, - frame.channel); - if (frame.frame_type != AMQP_FRAME_METHOD) - amqp_abort("Expected 0x%08X method frame on channel %d, got frame type %d", - expected_method, - expected_channel, - frame.frame_type); - if (frame.payload.method.id != expected_method) - amqp_abort("Expected method ID 0x%08X on channel %d, got ID 0x%08X", - expected_method, - expected_channel, - frame.payload.method.id); - *output = frame.payload.method; - return 0; -} - -int amqp_send_method(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t id, - void *decoded) -{ - amqp_frame_t frame; - - frame.frame_type = AMQP_FRAME_METHOD; - frame.channel = channel; - frame.payload.method.id = id; - frame.payload.method.decoded = decoded; - return amqp_send_frame(state, &frame); -} - -static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list ) -{ - while ( *list != 0 ) { - if ( *list == expected ) return 1; - list++; - } - return 0; -} - -amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t *expected_reply_ids, - void *decoded_request_method) -{ - int status; - amqp_rpc_reply_t result; - - memset(&result, 0, sizeof(result)); - - status = amqp_send_method(state, channel, request_id, decoded_request_method); - if (status < 0) { - result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.library_error = -status; - return result; - } - - { - amqp_frame_t frame; - - retry: - status = wait_frame_inner(state, &frame); - if (status < 0) { - result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.library_error = -status; - return result; - } - - /* - * We store the frame for later processing unless it's something - * that directly affects us here, namely a method frame that is - * either - * - on the channel we want, and of the expected type, or - * - on the channel we want, and a channel.close frame, or - * - on channel zero, and a connection.close frame. - */ - if (!( (frame.frame_type == AMQP_FRAME_METHOD) && - ( ((frame.channel == channel) && - ((amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) || - (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) - || - ((frame.channel == 0) && - (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD)) ) )) - { - amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t)); - amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t)); - - if (frame_copy == NULL || link == NULL) { - result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.library_error = ERROR_NO_MEMORY; - return result; - } - - *frame_copy = frame; - - link->next = NULL; - link->data = frame_copy; - - if (state->last_queued_frame == NULL) { - state->first_queued_frame = link; - } else { - state->last_queued_frame->next = link; - } - state->last_queued_frame = link; - - goto retry; - } - - result.reply_type = (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) - ? AMQP_RESPONSE_NORMAL - : AMQP_RESPONSE_SERVER_EXCEPTION; - - result.reply = frame.payload.method; - return result; - } -} - -static int amqp_login_inner(amqp_connection_state_t state, - int channel_max, - int frame_max, - int heartbeat, - amqp_sasl_method_enum sasl_method, - va_list vl) -{ - int res; - amqp_method_t method; - uint32_t server_frame_max; - uint16_t server_channel_max; - uint16_t server_heartbeat; - - amqp_send_header(state); - - res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, - &method); - if (res < 0) - return res; - - { - amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded; - if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) || - (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) { - return -ERROR_INCOMPATIBLE_AMQP_VERSION; - } - - /* TODO: check that our chosen SASL mechanism is in the list of - acceptable mechanisms. Or even let the application choose from - the list! */ - } - - { - amqp_connection_start_ok_t s; - amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, - sasl_method, vl); - - if (response_bytes.bytes == NULL) - return -ERROR_NO_MEMORY; - - s.client_properties.num_entries = 0; - s.client_properties.entries = NULL; - s.mechanism = sasl_method_name(sasl_method); - s.response = response_bytes; - s.locale.bytes = "en_US"; - s.locale.len = 5; - - res = amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s); - if (res < 0) - return res; - } - - amqp_release_buffers(state); - - res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD, - &method); - if (res < 0) - return res; - - { - amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded; - server_channel_max = s->channel_max; - server_frame_max = s->frame_max; - server_heartbeat = s->heartbeat; - } - - if (server_channel_max != 0 && server_channel_max < channel_max) - channel_max = server_channel_max; - - if (server_frame_max != 0 && server_frame_max < frame_max) - frame_max = server_frame_max; - - if (server_heartbeat != 0 && server_heartbeat < heartbeat) - heartbeat = server_heartbeat; - - res = amqp_tune_connection(state, channel_max, frame_max, heartbeat); - if (res < 0) - return res; - - { - amqp_connection_tune_ok_t s; - s.frame_max = frame_max; - s.channel_max = channel_max; - s.heartbeat = heartbeat; - - res = amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s); - if (res < 0) - return res; - } - - amqp_release_buffers(state); - - return 0; -} - -amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, - char const *vhost, - int channel_max, - int frame_max, - int heartbeat, - amqp_sasl_method_enum sasl_method, - ...) -{ - va_list vl; - amqp_rpc_reply_t result; - int status; - - va_start(vl, sasl_method); - - status = amqp_login_inner(state, channel_max, frame_max, heartbeat, sasl_method, vl); - if (status < 0) { - result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.reply.id = 0; - result.reply.decoded = NULL; - result.library_error = -status; - return result; - } - - { - amqp_method_number_t replies[] = { AMQP_CONNECTION_OPEN_OK_METHOD, 0 }; - amqp_connection_open_t s; - s.virtual_host = amqp_cstring_bytes(vhost); - s.capabilities.len = 0; - s.capabilities.bytes = NULL; - s.insist = 1; - - result = amqp_simple_rpc(state, - 0, - AMQP_CONNECTION_OPEN_METHOD, - (amqp_method_number_t *) &replies, - &s); - if (result.reply_type != AMQP_RESPONSE_NORMAL) - return result; - } - amqp_maybe_release_buffers(state); - - va_end(vl); - - result.reply_type = AMQP_RESPONSE_NORMAL; - result.reply.id = 0; - result.reply.decoded = NULL; - result.library_error = 0; - return result; -} diff --git a/librabbitmq/amqp_table.c b/librabbitmq/amqp_table.c deleted file mode 100644 index e85217f..0000000 --- a/librabbitmq/amqp_table.c +++ /dev/null @@ -1,441 +0,0 @@ -/* - * ***** BEGIN LICENSE BLOCK ***** - * Version: MPL 1.1/GPL 2.0 - * - * The contents of this file are subject to the Mozilla Public License - * Version 1.1 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * http://www.mozilla.org/MPL/ - * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and - * limitations under the License. - * - * The Original Code is librabbitmq. - * - * The Initial Developers of the Original Code are LShift Ltd, Cohesive - * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions - * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive - * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright - * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and - * Rabbit Technologies Ltd. - * - * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift - * Ltd. Portions created by Cohesive Financial Technologies LLC are - * Copyright (C) 2007-2009 Cohesive Financial Technologies - * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) - * 2007-2009 Rabbit Technologies Ltd. - * - * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 - * LShift Ltd and Tony Garnock-Jones. - * - * All Rights Reserved. - * - * Contributor(s): ______________________________________. - * - * Alternatively, the contents of this file may be used under the terms - * of the GNU General Public License Version 2 or later (the "GPL"), in - * which case the provisions of the GPL are applicable instead of those - * above. If you wish to allow use of your version of this file only - * under the terms of the GPL, and not to allow others to use your - * version of this file under the terms of the MPL, indicate your - * decision by deleting the provisions above and replace them with the - * notice and other provisions required by the GPL. If you do not - * delete the provisions above, a recipient may use your version of - * this file under the terms of any one of the MPL or the GPL. - * - * ***** END LICENSE BLOCK ***** - */ - -#include <stdlib.h> -#include <stdio.h> -#include <string.h> -#include <stdint.h> - -#include "amqp.h" -#include "amqp_private.h" - -#include <assert.h> - -#define INITIAL_ARRAY_SIZE 16 -#define INITIAL_TABLE_SIZE 16 - -static int amqp_decode_field_value(amqp_bytes_t encoded, - amqp_pool_t *pool, - amqp_field_value_t *entry, - size_t *offset); - -static int amqp_encode_field_value(amqp_bytes_t encoded, - amqp_field_value_t *entry, - size_t *offset); - -/*---------------------------------------------------------------------------*/ - -static int amqp_decode_array(amqp_bytes_t encoded, - amqp_pool_t *pool, - amqp_array_t *output, - size_t *offset) -{ - uint32_t arraysize; - int num_entries = 0; - int allocated_entries = INITIAL_ARRAY_SIZE; - amqp_field_value_t *entries; - size_t limit; - int res; - - if (!amqp_decode_32(encoded, offset, &arraysize)) - return -ERROR_BAD_AMQP_DATA; - - entries = malloc(allocated_entries * sizeof(amqp_field_value_t)); - if (entries == NULL) - return -ERROR_NO_MEMORY; - - limit = *offset + arraysize; - while (*offset < limit) { - if (num_entries >= allocated_entries) { - void *newentries; - allocated_entries = allocated_entries * 2; - newentries = realloc(entries, allocated_entries * sizeof(amqp_field_value_t)); - res = -ERROR_NO_MEMORY; - if (newentries == NULL) - goto out; - - entries = newentries; - } - - res = amqp_decode_field_value(encoded, pool, &entries[num_entries], - offset); - if (res < 0) - goto out; - - num_entries++; - } - - output->num_entries = num_entries; - output->entries = amqp_pool_alloc(pool, num_entries * sizeof(amqp_field_value_t)); - res = -ERROR_NO_MEMORY; - /* NULL is legitimate if we requested a zero-length block. */ - if (output->entries == NULL && num_entries > 0) - goto out; - - memcpy(output->entries, entries, num_entries * sizeof(amqp_field_value_t)); - res = 0; - - out: - free(entries); - return res; -} - -int amqp_decode_table(amqp_bytes_t encoded, - amqp_pool_t *pool, - amqp_table_t *output, - size_t *offset) -{ - uint32_t tablesize; - int num_entries = 0; - amqp_table_entry_t *entries; - int allocated_entries = INITIAL_TABLE_SIZE; - size_t limit; - int res; - - if (!amqp_decode_32(encoded, offset, &tablesize)) - return -ERROR_BAD_AMQP_DATA; - - entries = malloc(allocated_entries * sizeof(amqp_table_entry_t)); - if (entries == NULL) - return -ERROR_NO_MEMORY; - - limit = *offset + tablesize; - while (*offset < limit) { - uint8_t keylen; - - res = -ERROR_BAD_AMQP_DATA; - if (!amqp_decode_8(encoded, offset, &keylen)) - goto out; - - if (num_entries >= allocated_entries) { - void *newentries; - allocated_entries = allocated_entries * 2; - newentries = realloc(entries, allocated_entries * sizeof(amqp_table_entry_t)); - res = -ERROR_NO_MEMORY; - if (newentries == NULL) - goto out; - - entries = newentries; - } - - res = -ERROR_BAD_AMQP_DATA; - if (!amqp_decode_bytes(encoded, offset, &entries[num_entries].key, keylen)) - goto out; - - res = amqp_decode_field_value(encoded, pool, &entries[num_entries].value, - offset); - if (res < 0) - goto out; - - num_entries++; - } - - output->num_entries = num_entries; - output->entries = amqp_pool_alloc(pool, num_entries * sizeof(amqp_table_entry_t)); - res = -ERROR_NO_MEMORY; - /* NULL is legitimate if we requested a zero-length block. */ - if (output->entries == NULL && num_entries > 0) - goto out; - - memcpy(output->entries, entries, num_entries * sizeof(amqp_table_entry_t)); - res = 0; - - out: - free(entries); - return res; -} - -static int amqp_decode_field_value(amqp_bytes_t encoded, - amqp_pool_t *pool, - amqp_field_value_t *entry, - size_t *offset) -{ - int res = -ERROR_BAD_AMQP_DATA; - - if (!amqp_decode_8(encoded, offset, &entry->kind)) - goto out; - -#define TRIVIAL_FIELD_DECODER(bits) if (!amqp_decode_##bits(encoded, offset, &entry->value.u##bits)) goto out; break -#define SIMPLE_FIELD_DECODER(bits, dest, how) { uint##bits##_t val; if (!amqp_decode_##bits(encoded, offset, &val)) goto out; entry->value.dest = how; } break - - switch (entry->kind) { - case AMQP_FIELD_KIND_BOOLEAN: - SIMPLE_FIELD_DECODER(8, boolean, val ? 1 : 0); - - case AMQP_FIELD_KIND_I8: - SIMPLE_FIELD_DECODER(8, i8, (int8_t)val); - case AMQP_FIELD_KIND_U8: - TRIVIAL_FIELD_DECODER(8); - - case AMQP_FIELD_KIND_I16: - SIMPLE_FIELD_DECODER(16, i16, (int16_t)val); - case AMQP_FIELD_KIND_U16: - TRIVIAL_FIELD_DECODER(16); - - case AMQP_FIELD_KIND_I32: - SIMPLE_FIELD_DECODER(32, i32, (int32_t)val); - case AMQP_FIELD_KIND_U32: - TRIVIAL_FIELD_DECODER(32); - - case AMQP_FIELD_KIND_I64: - SIMPLE_FIELD_DECODER(64, i64, (int64_t)val); - case AMQP_FIELD_KIND_U64: - TRIVIAL_FIELD_DECODER(64); - - case AMQP_FIELD_KIND_F32: - TRIVIAL_FIELD_DECODER(32); - /* and by punning, f32 magically gets the right value...! */ - - case AMQP_FIELD_KIND_F64: - TRIVIAL_FIELD_DECODER(64); - /* and by punning, f64 magically gets the right value...! */ - - case AMQP_FIELD_KIND_DECIMAL: - if (!amqp_decode_8(encoded, offset, &entry->value.decimal.decimals) - || !amqp_decode_32(encoded, offset, &entry->value.decimal.value)) - goto out; - break; - - case AMQP_FIELD_KIND_UTF8: - /* AMQP_FIELD_KIND_UTF8 and AMQP_FIELD_KIND_BYTES have the - same implementation, but different interpretations. */ - /* fall through */ - case AMQP_FIELD_KIND_BYTES: { - uint32_t len; - if (!amqp_decode_32(encoded, offset, &len) - || !amqp_decode_bytes(encoded, offset, &entry->value.bytes, len)) - goto out; - break; - } - - case AMQP_FIELD_KIND_ARRAY: - res = amqp_decode_array(encoded, pool, &(entry->value.array), offset); - goto out; - - case AMQP_FIELD_KIND_TIMESTAMP: - TRIVIAL_FIELD_DECODER(64); - - case AMQP_FIELD_KIND_TABLE: - res = amqp_decode_table(encoded, pool, &(entry->value.table), offset); - goto out; - - case AMQP_FIELD_KIND_VOID: - break; - - default: - goto out; - } - - res = 0; - - out: - return res; -} - -/*---------------------------------------------------------------------------*/ - -static int amqp_encode_array(amqp_bytes_t encoded, - amqp_array_t *input, - size_t *offset) -{ - size_t start = *offset; - int i, res; - - *offset += 4; /* size of the array gets filled in later on */ - - for (i = 0; i < input->num_entries; i++) { - res = amqp_encode_field_value(encoded, &input->entries[i], offset); - if (res < 0) - goto out; - } - - if (amqp_encode_32(encoded, &start, *offset - start - 4)) - res = 0; - else - res = -ERROR_BAD_AMQP_DATA; - - out: - return res; -} - -int amqp_encode_table(amqp_bytes_t encoded, - amqp_table_t *input, - size_t *offset) -{ - size_t start = *offset; - int i, res; - - *offset += 4; /* size of the table gets filled in later on */ - - for (i = 0; i < input->num_entries; i++) { - res = amqp_encode_8(encoded, offset, input->entries[i].key.len); - if (res < 0) - goto out; - - res = amqp_encode_bytes(encoded, offset, input->entries[i].key); - if (res < 0) - goto out; - - res = amqp_encode_field_value(encoded, &input->entries[i].value, offset); - if (res < 0) - goto out; - } - - if (amqp_encode_32(encoded, &start, *offset - start - 4)) - res = 0; - else - res = -ERROR_BAD_AMQP_DATA; - - out: - return res; -} - -static int amqp_encode_field_value(amqp_bytes_t encoded, - amqp_field_value_t *entry, - size_t *offset) -{ - int res = -ERROR_BAD_AMQP_DATA; - - if (!amqp_encode_8(encoded, offset, entry->kind)) - goto out; - -#define FIELD_ENCODER(bits, val) if (!amqp_encode_##bits(encoded, offset, val)) goto out; break - - switch (entry->kind) { - case AMQP_FIELD_KIND_BOOLEAN: - FIELD_ENCODER(8, entry->value.boolean ? 1 : 0); - - case AMQP_FIELD_KIND_I8: - FIELD_ENCODER(8, entry->value.i8); - case AMQP_FIELD_KIND_U8: - FIELD_ENCODER(8, entry->value.u8); - - case AMQP_FIELD_KIND_I16: - FIELD_ENCODER(16, entry->value.i16); - case AMQP_FIELD_KIND_U16: - FIELD_ENCODER(16, entry->value.u16); - - case AMQP_FIELD_KIND_I32: - FIELD_ENCODER(32, entry->value.i32); - case AMQP_FIELD_KIND_U32: - FIELD_ENCODER(32, entry->value.u32); - - case AMQP_FIELD_KIND_I64: - FIELD_ENCODER(64, entry->value.i64); - case AMQP_FIELD_KIND_U64: - FIELD_ENCODER(64, entry->value.u64); - - case AMQP_FIELD_KIND_F32: - /* by punning, u32 magically gets the right value...! */ - FIELD_ENCODER(32, entry->value.u32); - - case AMQP_FIELD_KIND_F64: - /* by punning, u64 magically gets the right value...! */ - FIELD_ENCODER(64, entry->value.u64); - - case AMQP_FIELD_KIND_DECIMAL: - if (!amqp_encode_8(encoded, offset, entry->value.decimal.decimals) - || !amqp_encode_32(encoded, offset, entry->value.decimal.value)) - goto out; - break; - - case AMQP_FIELD_KIND_UTF8: - /* AMQP_FIELD_KIND_UTF8 and AMQP_FIELD_KIND_BYTES have the - same implementation, but different interpretations. */ - /* fall through */ - case AMQP_FIELD_KIND_BYTES: - if (!amqp_encode_32(encoded, offset, entry->value.bytes.len) - || !amqp_encode_bytes(encoded, offset, entry->value.bytes)) - goto out; - break; - - case AMQP_FIELD_KIND_ARRAY: - res = amqp_encode_array(encoded, &entry->value.array, offset); - goto out; - - case AMQP_FIELD_KIND_TIMESTAMP: - FIELD_ENCODER(64, entry->value.u64); - - case AMQP_FIELD_KIND_TABLE: - res = amqp_encode_table(encoded, &entry->value.table, offset); - goto out; - - case AMQP_FIELD_KIND_VOID: - break; - - default: - abort(); - } - - res = 0; - - out: - return res; -} - -/*---------------------------------------------------------------------------*/ - -int amqp_table_entry_cmp(void const *entry1, void const *entry2) { - amqp_table_entry_t const *p1 = (amqp_table_entry_t const *) entry1; - amqp_table_entry_t const *p2 = (amqp_table_entry_t const *) entry2; - - int d; - int minlen; - - minlen = p1->key.len; - if (p2->key.len < minlen) minlen = p2->key.len; - - d = memcmp(p1->key.bytes, p2->key.bytes, minlen); - if (d != 0) { - return d; - } - - return p1->key.len - p2->key.len; -} diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py deleted file mode 100644 index 6fd149e..0000000 --- a/librabbitmq/codegen.py +++ /dev/null @@ -1,531 +0,0 @@ -# ***** BEGIN LICENSE BLOCK ***** -# Version: MPL 1.1/GPL 2.0 -# -# The contents of this file are subject to the Mozilla Public License -# Version 1.1 (the "License"); you may not use this file except in -# compliance with the License. You may obtain a copy of the License at -# http://www.mozilla.org/MPL/ -# -# Software distributed under the License is distributed on an "AS IS" -# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -# the License for the specific language governing rights and -# limitations under the License. -# -# The Original Code is librabbitmq. -# -# The Initial Developers of the Original Code are LShift Ltd, Cohesive -# Financial Technologies LLC, and Rabbit Technologies Ltd. Portions -# created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive -# Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright -# (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and -# Rabbit Technologies Ltd. -# -# Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -# Ltd. Portions created by Cohesive Financial Technologies LLC are -# Copyright (C) 2007-2009 Cohesive Financial Technologies -# LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) -# 2007-2009 Rabbit Technologies Ltd. -# -# Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 -# LShift Ltd and Tony Garnock-Jones. -# -# All Rights Reserved. -# -# Contributor(s): ______________________________________. -# -# Alternatively, the contents of this file may be used under the terms -# of the GNU General Public License Version 2 or later (the "GPL"), in -# which case the provisions of the GPL are applicable instead of those -# above. If you wish to allow use of your version of this file only -# under the terms of the GPL, and not to allow others to use your -# version of this file under the terms of the MPL, indicate your -# decision by deleting the provisions above and replace them with the -# notice and other provisions required by the GPL. If you do not -# delete the provisions above, a recipient may use your version of -# this file under the terms of any one of the MPL or the GPL. -# -# ***** END LICENSE BLOCK ***** - -from __future__ import nested_scopes - -from amqp_codegen import * -import string -import re - - -class Emitter(object): - """An object the trivially emits generated code lines. - - This largely exists to be wrapped by more sophisticated emitter - classes. - """ - - def __init__(self, prefix): - self.prefix = prefix - - def emit(self, line): - """Emit a line of generated code.""" - print self.prefix + line - - -class BitDecoder(object): - """An emitter object that keeps track of the state involved in - decoding the AMQP bit type.""" - - def __init__(self, emitter): - self.emitter = emitter - self.bit = 0 - - def emit(self, line): - self.bit = 0 - self.emitter.emit(line) - - def decode_bit(self, lvalue): - """Generate code to decode a value of the AMQP bit type into - the given lvalue.""" - if self.bit == 0: - self.emitter.emit("if (!amqp_decode_8(encoded, &offset, &bit_buffer)) return -ERROR_BAD_AMQP_DATA;") - - self.emitter.emit("%s = (bit_buffer & (1 << %d)) ? 1 : 0;" - % (lvalue, self.bit)) - self.bit += 1 - if self.bit == 8: - self.bit = 0 - - -class BitEncoder(object): - """An emitter object that keeps track of the state involved in - encoding the AMQP bit type.""" - - def __init__(self, emitter): - self.emitter = emitter - self.bit = 0 - - def flush(self): - """Flush the state associated with AMQP bit types.""" - if self.bit: - self.emitter.emit("if (!amqp_encode_8(encoded, &offset, bit_buffer)) return -ERROR_BAD_AMQP_DATA;") - self.bit = 0 - - def emit(self, line): - self.flush() - self.emitter.emit(line) - - def encode_bit(self, value): - """Generate code to ebcode a value of the AMQP bit type from - the given value.""" - if self.bit == 0: - self.emitter.emit("bit_buffer = 0;") - - self.emitter.emit("if (%s) bit_buffer |= (1 << %d);" - % (value, self.bit)) - self.bit += 1 - if self.bit == 8: - self.flush() - - -class SimpleType(object): - """A AMQP type that corresponds to a simple scalar C value of a - certain width.""" - - def __init__(self, bits): - self.bits = bits - self.ctype = "uint%d_t" % (bits,) - - def decode(self, emitter, lvalue): - emitter.emit("if (!amqp_decode_%d(encoded, &offset, &%s)) return -ERROR_BAD_AMQP_DATA;" % (self.bits, lvalue)) - - def encode(self, emitter, value): - emitter.emit("if (!amqp_encode_%d(encoded, &offset, %s)) return -ERROR_BAD_AMQP_DATA;" % (self.bits, value)) - - -class StrType(object): - """The AMQP shortstr or longstr types.""" - - def __init__(self, lenbits): - self.lenbits = lenbits - self.ctype = "amqp_bytes_t" - - def decode(self, emitter, lvalue): - emitter.emit("{") - emitter.emit(" uint%d_t len;" % (self.lenbits,)) - emitter.emit(" if (!amqp_decode_%d(encoded, &offset, &len)" % (self.lenbits,)) - emitter.emit(" || !amqp_decode_bytes(encoded, &offset, &%s, len))" % (lvalue,)) - emitter.emit(" return -ERROR_BAD_AMQP_DATA;") - emitter.emit("}") - - def encode(self, emitter, value): - emitter.emit("if (!amqp_encode_%d(encoded, &offset, %s.len)" % (self.lenbits, value)) - emitter.emit(" || !amqp_encode_bytes(encoded, &offset, %s))" % (value,)) - emitter.emit(" return -ERROR_BAD_AMQP_DATA;") - - -class BitType(object): - """The AMQP bit type.""" - - def __init__(self): - self.ctype = "amqp_boolean_t" - - def decode(self, emitter, lvalue): - emitter.decode_bit(lvalue) - - def encode(self, emitter, value): - emitter.encode_bit(value) - - -class TableType(object): - """The AMQP table type.""" - - def __init__(self): - self.ctype = "amqp_table_t" - - def decode(self, emitter, lvalue): - emitter.emit("{") - emitter.emit(" int res = amqp_decode_table(encoded, pool, &(%s), &offset);" % (lvalue,)) - emitter.emit(" if (res < 0) return res;") - emitter.emit("}") - - def encode(self, emitter, value): - emitter.emit("{") - emitter.emit(" int res = amqp_encode_table(encoded, &(%s), &offset);" % (value,)) - emitter.emit(" if (res < 0) return res;") - emitter.emit("}") - - -types = { - 'octet': SimpleType(8), - 'short': SimpleType(16), - 'long': SimpleType(32), - 'longlong': SimpleType(64), - 'shortstr': StrType(8), - 'longstr': StrType(32), - 'bit': BitType(), - 'table': TableType(), - 'timestamp': SimpleType(64), -} - -def typeFor(spec, f): - """Get a representation of the AMQP type of a field.""" - return types[spec.resolveDomain(f.domain)] - -def c_ize(s): - s = s.replace('-', '_') - s = s.replace(' ', '_') - return s - -AmqpMethod.defName = lambda m: cConstantName(c_ize(m.klass.name) + '_' + c_ize(m.name) + "_method") -AmqpMethod.structName = lambda m: "amqp_" + c_ize(m.klass.name) + '_' + c_ize(m.name) + "_t" - -AmqpClass.structName = lambda c: "amqp_" + c_ize(c.name) + "_properties_t" - -def cConstantName(s): - return 'AMQP_' + '_'.join(re.split('[- ]', s.upper())) - -def cFlagName(c, f): - return cConstantName(c.name + '_' + f.name) + '_FLAG' - -def genErl(spec): - def fieldTempList(fields): - return '[' + ', '.join(['F' + str(f.index) for f in fields]) + ']' - - def fieldMapList(fields): - return ', '.join([c_ize(f.name) + " = F" + str(f.index) for f in fields]) - - def genLookupMethodName(m): - print ' case %s: return "%s";' % (m.defName(), m.defName()) - - def genDecodeMethodFields(m): - print " case %s: {" % (m.defName(),) - if m.arguments: - print " %s *m = (%s *) amqp_pool_alloc(pool, sizeof(%s));" % \ - (m.structName(), m.structName(), m.structName()) - print " if (m == NULL) { return -ERROR_NO_MEMORY; }" - else: - print " %s *m = NULL; /* no fields */" % (m.structName(),) - - emitter = BitDecoder(Emitter(" ")) - for f in m.arguments: - typeFor(spec, f).decode(emitter, "m->"+c_ize(f.name)) - - print " *decoded = m;" - print " return 0;" - print " }" - - def genDecodeProperties(c): - print " case %d: {" % (c.index,) - print " %s *p = (%s *) amqp_pool_alloc(pool, sizeof(%s));" % \ - (c.structName(), c.structName(), c.structName()) - print " if (p == NULL) { return -ERROR_NO_MEMORY; }" - print " p->_flags = flags;" - - emitter = Emitter(" ") - for f in c.fields: - emitter.emit("if (flags & %s) {" % (cFlagName(c, f),)) - typeFor(spec, f).decode(emitter, "p->"+c_ize(f.name)) - emitter.emit("}") - - print " *decoded = p;" - print " return 0;" - print " }" - - def genEncodeMethodFields(m): - print " case %s: {" % (m.defName(),) - if m.arguments: - print " %s *m = (%s *) decoded;" % (m.structName(), m.structName()) - - emitter = BitEncoder(Emitter(" ")) - for f in m.arguments: - typeFor(spec, f).encode(emitter, "m->"+c_ize(f.name)) - emitter.flush() - - print " return offset;" - print " }" - - def genEncodeProperties(c): - print " case %d: {" % (c.index,) - if c.fields: - print " %s *p = (%s *) decoded;" % (c.structName(), c.structName()) - - emitter = Emitter(" ") - for f in c.fields: - emitter.emit(" if (flags & %s) {" % (cFlagName(c, f),)) - typeFor(spec, f).encode(emitter, "p->"+c_ize(f.name)) - emitter.emit("}") - - print " return offset;" - print " }" - - methods = spec.allMethods() - - print '/* Autogenerated code. Do not edit. */' - print - print '#include <stdlib.h>' - print '#include <stdint.h>' - print '#include <string.h>' - print '#include <stdio.h>' - print - print '#include "amqp.h"' - print '#include "amqp_framing.h"' - print '#include "amqp_private.h"' - print '#include "socket.h"' - - print """ -char const *amqp_constant_name(int constantNumber) { - switch (constantNumber) {""" - for (c,v,cls) in spec.constants: - print " case %s: return \"%s\";" % (cConstantName(c), cConstantName(c)) - print """ default: return "(unknown)"; - } -}""" - - print """ -amqp_boolean_t amqp_constant_is_hard_error(int constantNumber) { - switch (constantNumber) {""" - for (c,v,cls) in spec.constants: - if cls == 'hard-error': - print " case %s: return 1;" % (cConstantName(c),) - print """ default: return 0; - } -}""" - - print """ -char const *amqp_method_name(amqp_method_number_t methodNumber) { - switch (methodNumber) {""" - for m in methods: genLookupMethodName(m) - print """ default: return NULL; - } -}""" - - print """ -amqp_boolean_t amqp_method_has_content(amqp_method_number_t methodNumber) { - switch (methodNumber) {""" - for m in methods: - if m.hasContent: - print ' case %s: return 1;' % (m.defName()) - print """ default: return 0; - } -}""" - - print """ -int amqp_decode_method(amqp_method_number_t methodNumber, - amqp_pool_t *pool, - amqp_bytes_t encoded, - void **decoded) -{ - size_t offset = 0; - uint8_t bit_buffer; - - switch (methodNumber) {""" - for m in methods: genDecodeMethodFields(m) - print """ default: return -ERROR_UNKNOWN_METHOD; - } -}""" - - print """ -int amqp_decode_properties(uint16_t class_id, - amqp_pool_t *pool, - amqp_bytes_t encoded, - void **decoded) -{ - size_t offset = 0; - - amqp_flags_t flags = 0; - int flagword_index = 0; - uint16_t partial_flags; - - do { - if (!amqp_decode_16(encoded, &offset, &partial_flags)) - return -ERROR_BAD_AMQP_DATA; - flags |= (partial_flags << (flagword_index * 16)); - flagword_index++; - } while (partial_flags & 1); - - switch (class_id) {""" - for c in spec.allClasses(): genDecodeProperties(c) - print """ default: return -ERROR_UNKNOWN_CLASS; - } -}""" - - print """ -int amqp_encode_method(amqp_method_number_t methodNumber, - void *decoded, - amqp_bytes_t encoded) -{ - size_t offset = 0; - uint8_t bit_buffer; - - switch (methodNumber) {""" - for m in methods: genEncodeMethodFields(m) - print """ default: return -ERROR_UNKNOWN_METHOD; - } -}""" - - print """ -int amqp_encode_properties(uint16_t class_id, - void *decoded, - amqp_bytes_t encoded) -{ - size_t offset = 0; - - /* Cheat, and get the flags out generically, relying on the - similarity of structure between classes */ - amqp_flags_t flags = * (amqp_flags_t *) decoded; /* cheating! */ - - { - /* We take a copy of flags to avoid destroying it, as it is used - in the autogenerated code below. */ - amqp_flags_t remaining_flags = flags; - do { - amqp_flags_t remainder = remaining_flags >> 16; - uint16_t partial_flags = remaining_flags & 0xFFFE; - if (remainder != 0) { partial_flags |= 1; } - if (!amqp_encode_16(encoded, &offset, partial_flags)) - return -ERROR_BAD_AMQP_DATA; - remaining_flags = remainder; - } while (remaining_flags != 0); - } - - switch (class_id) {""" - for c in spec.allClasses(): genEncodeProperties(c) - print """ default: return -ERROR_UNKNOWN_CLASS; - } -}""" - -def genHrl(spec): - def fieldDeclList(fields): - if fields: - return ''.join([" %s %s;\n" % (typeFor(spec, f).ctype, - c_ize(f.name)) - for f in fields]) - else: - return " char dummy; /* Dummy field to avoid empty struct */\n" - - def propDeclList(fields): - return ''.join([" %s %s;\n" % (typeFor(spec, f).ctype, c_ize(f.name)) - for f in fields - if spec.resolveDomain(f.domain) != 'bit']) - - methods = spec.allMethods() - - print """/* Autogenerated code. Do not edit. */ -#ifndef librabbitmq_amqp_framing_h -#define librabbitmq_amqp_framing_h - -#ifdef __cplusplus -extern "C" { -#endif -""" - print "#define AMQP_PROTOCOL_VERSION_MAJOR %d" % (spec.major) - print "#define AMQP_PROTOCOL_VERSION_MINOR %d" % (spec.minor) - print "#define AMQP_PROTOCOL_VERSION_REVISION %d" % (spec.revision) - print "#define AMQP_PROTOCOL_PORT %d" % (spec.port) - - for (c,v,cls) in spec.constants: - print "#define %s %s" % (cConstantName(c), v) - print - - print """/* Function prototypes. */ -extern char const *amqp_constant_name(int constantNumber); -extern amqp_boolean_t amqp_constant_is_hard_error(int constantNumber); -RABBITMQ_EXPORT char const *amqp_method_name(amqp_method_number_t methodNumber); -extern amqp_boolean_t amqp_method_has_content(amqp_method_number_t methodNumber); -extern int amqp_decode_method(amqp_method_number_t methodNumber, - amqp_pool_t *pool, - amqp_bytes_t encoded, - void **decoded); -extern int amqp_decode_properties(uint16_t class_id, - amqp_pool_t *pool, - amqp_bytes_t encoded, - void **decoded); -extern int amqp_encode_method(amqp_method_number_t methodNumber, - void *decoded, - amqp_bytes_t encoded); -extern int amqp_encode_properties(uint16_t class_id, - void *decoded, - amqp_bytes_t encoded); -""" - - print "/* Method field records. */" - for m in methods: - methodid = m.klass.index << 16 | m.index - print "#define %s ((amqp_method_number_t) 0x%.08X) /* %d, %d; %d */" % \ - (m.defName(), - methodid, - m.klass.index, - m.index, - methodid) - print "typedef struct %s_ {\n%s} %s;\n" % \ - (m.structName(), fieldDeclList(m.arguments), m.structName()) - - print "/* Class property records. */" - for c in spec.allClasses(): - print "#define %s (0x%.04X) /* %d */" % \ - (cConstantName(c.name + "_class"), c.index, c.index) - index = 0 - for f in c.fields: - if index % 16 == 15: - index = index + 1 - shortnum = index / 16 - partialindex = 15 - (index % 16) - bitindex = shortnum * 16 + partialindex - print '#define %s (1 << %d)' % (cFlagName(c, f), bitindex) - index = index + 1 - print "typedef struct %s_ {\n amqp_flags_t _flags;\n%s} %s;\n" % \ - (c.structName(), - fieldDeclList(c.fields), - c.structName()) - - print """#ifdef __cplusplus -} -#endif - -#endif""" - -def generateErl(specPath): - genErl(AmqpSpec(specPath)) - -def generateHrl(specPath): - genHrl(AmqpSpec(specPath)) - -if __name__ == "__main__": - do_main(generateHrl, generateErl) diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c deleted file mode 100644 index 4f5368e..0000000 --- a/librabbitmq/unix/socket.c +++ /dev/null @@ -1,92 +0,0 @@ -/* - * ***** BEGIN LICENSE BLOCK ***** - * Version: MPL 1.1/GPL 2.0 - * - * The contents of this file are subject to the Mozilla Public License - * Version 1.1 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * http://www.mozilla.org/MPL/ - * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and - * limitations under the License. - * - * The Original Code is librabbitmq. - * - * The Initial Developers of the Original Code are LShift Ltd, Cohesive - * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions - * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive - * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright - * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and - * Rabbit Technologies Ltd. - * - * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift - * Ltd. Portions created by Cohesive Financial Technologies LLC are - * Copyright (C) 2007-2010 Cohesive Financial Technologies - * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) - * 2007-2010 Rabbit Technologies Ltd. - * - * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 - * LShift Ltd and Tony Garnock-Jones. - * - * All Rights Reserved. - * - * Contributor(s): ______________________________________. - * - * Alternatively, the contents of this file may be used under the terms - * of the GNU General Public License Version 2 or later (the "GPL"), in - * which case the provisions of the GPL are applicable instead of those - * above. If you wish to allow use of your version of this file only - * under the terms of the GPL, and not to allow others to use your - * version of this file under the terms of the MPL, indicate your - * decision by deleting the provisions above and replace them with the - * notice and other provisions required by the GPL. If you do not - * delete the provisions above, a recipient may use your version of - * this file under the terms of any one of the MPL or the GPL. - * - * ***** END LICENSE BLOCK ***** - */ - -#include <sys/socket.h> -#include <unistd.h> -#include <fcntl.h> -#include <stdint.h> -#include <string.h> -#include <stdlib.h> - -#include "amqp.h" -#include "amqp_private.h" -#include "socket.h" - -int amqp_socket_socket(int domain, int type, int proto) -{ - int flags; - - int s = socket(domain, type, proto); - if (s < 0) - return s; - - /* Always enable CLOEXEC on the socket */ - flags = fcntl(s, F_GETFD); - if (flags == -1 - || fcntl(s, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) { - int e = errno; - close(s); - errno = e; - return -1; - } - - return s; -} - -/* strdup is not in ISO C90! */ -static inline char *strdup(const char *str) -{ - return strcpy(malloc(strlen(str) + 1),str); -} - -char *amqp_os_error_string(int err) -{ - return strdup(strerror(err)); -} diff --git a/librabbitmq/unix/socket.h b/librabbitmq/unix/socket.h deleted file mode 100644 index 5cb37f1..0000000 --- a/librabbitmq/unix/socket.h +++ /dev/null @@ -1,79 +0,0 @@ -#ifndef librabbitmq_unix_socket_h -#define librabbitmq_unix_socket_h - -/* - * ***** BEGIN LICENSE BLOCK ***** - * Version: MPL 1.1/GPL 2.0 - * - * The contents of this file are subject to the Mozilla Public License - * Version 1.1 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * http://www.mozilla.org/MPL/ - * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and - * limitations under the License. - * - * The Original Code is librabbitmq. - * - * The Initial Developers of the Original Code are LShift Ltd, Cohesive - * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions - * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive - * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright - * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and - * Rabbit Technologies Ltd. - * - * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift - * Ltd. Portions created by Cohesive Financial Technologies LLC are - * Copyright (C) 2007-2010 Cohesive Financial Technologies - * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) - * 2007-2010 Rabbit Technologies Ltd. - * - * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 - * LShift Ltd and Tony Garnock-Jones. - * - * All Rights Reserved. - * - * Contributor(s): ______________________________________. - * - * Alternatively, the contents of this file may be used under the terms - * of the GNU General Public License Version 2 or later (the "GPL"), in - * which case the provisions of the GPL are applicable instead of those - * above. If you wish to allow use of your version of this file only - * under the terms of the GPL, and not to allow others to use your - * version of this file under the terms of the MPL, indicate your - * decision by deleting the provisions above and replace them with the - * notice and other provisions required by the GPL. If you do not - * delete the provisions above, a recipient may use your version of - * this file under the terms of any one of the MPL or the GPL. - * - * ***** END LICENSE BLOCK ***** - */ - -#include <errno.h> -#include <sys/types.h> -#include <unistd.h> -#include <sys/uio.h> -#include <sys/socket.h> -#include <netdb.h> -#include <netinet/in.h> -#include <netinet/tcp.h> - -static inline int amqp_socket_init(void) -{ - return 0; -} - -extern int amqp_socket_socket(int domain, int type, int proto); - -#define amqp_socket_setsockopt setsockopt -#define amqp_socket_close close -#define amqp_socket_writev writev - -static inline int amqp_socket_error() -{ - return errno | ERROR_CATEGORY_OS; -} - -#endif diff --git a/librabbitmq/windows/socket.c b/librabbitmq/windows/socket.c deleted file mode 100644 index bef7b95..0000000 --- a/librabbitmq/windows/socket.c +++ /dev/null @@ -1,98 +0,0 @@ -/* - * ***** BEGIN LICENSE BLOCK ***** - * Version: MPL 1.1/GPL 2.0 - * - * The contents of this file are subject to the Mozilla Public License - * Version 1.1 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * http://www.mozilla.org/MPL/ - * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and - * limitations under the License. - * - * The Original Code is librabbitmq. - * - * The Initial Developers of the Original Code are LShift Ltd, Cohesive - * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions - * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive - * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright - * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and - * Rabbit Technologies Ltd. - * - * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift - * Ltd. Portions created by Cohesive Financial Technologies LLC are - * Copyright (C) 2007-2010 Cohesive Financial Technologies - * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) - * 2007-2010 Rabbit Technologies Ltd. - * - * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 - * LShift Ltd and Tony Garnock-Jones. - * - * All Rights Reserved. - * - * Contributor(s): ______________________________________. - * - * Alternatively, the contents of this file may be used under the terms - * of the GNU General Public License Version 2 or later (the "GPL"), in - * which case the provisions of the GPL are applicable instead of those - * above. If you wish to allow use of your version of this file only - * under the terms of the GPL, and not to allow others to use your - * version of this file under the terms of the MPL, indicate your - * decision by deleting the provisions above and replace them with the - * notice and other provisions required by the GPL. If you do not - * delete the provisions above, a recipient may use your version of - * this file under the terms of any one of the MPL or the GPL. - * - * ***** END LICENSE BLOCK ***** - */ - -/* See http://msdn.microsoft.com/en-us/library/ms737629%28VS.85%29.aspx */ -#define WIN32_LEAN_AND_MEAN - -#include <windows.h> -#include <stdint.h> -#include <stdlib.h> - -#include "amqp.h" -#include "amqp_private.h" -#include "socket.h" - -static int called_wsastartup; - -int amqp_socket_init(void) -{ - if (!called_wsastartup) { - WSADATA data; - int res = WSAStartup(0x0202, &data); - if (res) - return -res; - - called_wsastartup = 1; - } - - return 0; -} - -/* strdup is not in ISO C90! */ -static inline char *strdup(const char *str) -{ - return strcpy(malloc(strlen(str) + 1),str); -} - -char *amqp_os_error_string(int err) -{ - char *msg, *copy; - - if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM - | FORMAT_MESSAGE_ALLOCATE_BUFFER, - NULL, err, - MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), - (LPSTR)&msg, 0, NULL)) - return strdup("(error retrieving Windows error message)"); - - copy = strdup(msg); - LocalFree(msg); - return copy; -} diff --git a/librabbitmq/windows/socket.h b/librabbitmq/windows/socket.h deleted file mode 100644 index 38ca905..0000000 --- a/librabbitmq/windows/socket.h +++ /dev/null @@ -1,89 +0,0 @@ -#ifndef librabbitmq_windows_socket_h -#define librabbitmq_windows_socket_h - -/* - * ***** BEGIN LICENSE BLOCK ***** - * Version: MPL 1.1/GPL 2.0 - * - * The contents of this file are subject to the Mozilla Public License - * Version 1.1 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * http://www.mozilla.org/MPL/ - * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and - * limitations under the License. - * - * The Original Code is librabbitmq. - * - * The Initial Developers of the Original Code are LShift Ltd, Cohesive - * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions - * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive - * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright - * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and - * Rabbit Technologies Ltd. - * - * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift - * Ltd. Portions created by Cohesive Financial Technologies LLC are - * Copyright (C) 2007-2010 Cohesive Financial Technologies - * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) - * 2007-2010 Rabbit Technologies Ltd. - * - * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 - * LShift Ltd and Tony Garnock-Jones. - * - * All Rights Reserved. - * - * Contributor(s): ______________________________________. - * - * Alternatively, the contents of this file may be used under the terms - * of the GNU General Public License Version 2 or later (the "GPL"), in - * which case the provisions of the GPL are applicable instead of those - * above. If you wish to allow use of your version of this file only - * under the terms of the GPL, and not to allow others to use your - * version of this file under the terms of the MPL, indicate your - * decision by deleting the provisions above and replace them with the - * notice and other provisions required by the GPL. If you do not - * delete the provisions above, a recipient may use your version of - * this file under the terms of any one of the MPL or the GPL. - * - * ***** END LICENSE BLOCK ***** - */ - -#include <winsock2.h> - -extern int amqp_socket_init(void); - -#define amqp_socket_socket socket -#define amqp_socket_close closesocket - -static inline int amqp_socket_setsockopt(int sock, int level, int optname, - const void *optval, size_t optlen) -{ - /* the winsock setsockopt function has its 4th argument as a - const char * */ - return setsockopt(sock, level, optname, (const char *)optval, optlen); -} - -/* same as WSABUF */ -struct iovec { - u_long iov_len; - void *iov_base; -}; - -static inline int amqp_socket_writev(int sock, struct iovec *iov, int nvecs) -{ - DWORD ret; - if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0) - return ret; - else - return -1; -} - -static inline int amqp_socket_error() -{ - return WSAGetLastError() | ERROR_CATEGORY_OS; -} - -#endif |