diff options
author | Ask Solem <ask@celeryproject.org> | 2014-04-14 17:29:03 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2014-04-14 17:29:03 +0100 |
commit | be3000b4c84d7503f5ef4067de44ff16d060d158 (patch) | |
tree | fecacb0f149b067202c443b59aad3cc027a0ff1c /librabbitmq | |
parent | dcb8edaccd6e164d624edfab0f3120d96f707f0a (diff) | |
parent | fe844e41ffad5691607982cbfe4054aacdcb81e0 (diff) | |
download | rabbitmq-c-github-ask-be3000b4c84d7503f5ef4067de44ff16d060d158.tar.gz |
Merge branch 'alanxz/master'
Conflicts:
Makefile.am
codegen
Diffstat (limited to 'librabbitmq')
-rw-r--r-- | librabbitmq/CMakeLists.txt | 17 | ||||
-rw-r--r-- | librabbitmq/amqp.h | 1932 | ||||
-rw-r--r-- | librabbitmq/amqp_api.c | 11 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 21 | ||||
-rw-r--r-- | librabbitmq/amqp_consumer.c | 301 | ||||
-rw-r--r-- | librabbitmq/amqp_cyassl.c | 4 | ||||
-rw-r--r-- | librabbitmq/amqp_framing.c | 29 | ||||
-rw-r--r-- | librabbitmq/amqp_framing.h | 10 | ||||
-rw-r--r-- | librabbitmq/amqp_gnutls.c | 4 | ||||
-rw-r--r-- | librabbitmq/amqp_hostcheck.c | 201 | ||||
-rw-r--r-- | librabbitmq/amqp_hostcheck.h | 36 | ||||
-rw-r--r-- | librabbitmq/amqp_mem.c | 7 | ||||
-rw-r--r-- | librabbitmq/amqp_openssl.c | 86 | ||||
-rw-r--r-- | librabbitmq/amqp_polarssl.c | 10 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 11 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 333 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.h | 75 | ||||
-rw-r--r-- | librabbitmq/amqp_ssl_socket.h | 53 | ||||
-rw-r--r-- | librabbitmq/amqp_table.c | 150 | ||||
-rw-r--r-- | librabbitmq/amqp_tcp_socket.c | 47 | ||||
-rw-r--r-- | librabbitmq/amqp_tcp_socket.h | 11 | ||||
-rw-r--r-- | librabbitmq/amqp_timer.c | 43 | ||||
-rw-r--r-- | librabbitmq/amqp_timer.h | 29 | ||||
-rw-r--r-- | librabbitmq/codegen.py | 116 | ||||
-rw-r--r-- | librabbitmq/win32/threads.h | 6 |
25 files changed, 3246 insertions, 297 deletions
diff --git a/librabbitmq/CMakeLists.txt b/librabbitmq/CMakeLists.txt index 8ab8bff..d3623b3 100644 --- a/librabbitmq/CMakeLists.txt +++ b/librabbitmq/CMakeLists.txt @@ -81,7 +81,11 @@ if (ENABLE_SSL_SUPPORT) set(AMQP_SSL_SOCKET_H_PATH amqp_ssl_socket.h) if (SSL_ENGINE STREQUAL "OpenSSL") - set(AMQP_SSL_SRCS ${AMQP_SSL_SOCKET_H_PATH} amqp_openssl.c) + set(AMQP_SSL_SRCS ${AMQP_SSL_SOCKET_H_PATH} + amqp_openssl.c + amqp_hostcheck.c + amqp_hostcheck.h + ) include_directories(${OPENSSL_INCLUDE_DIR}) set(AMQP_SSL_LIBS ${OPENSSL_LIBRARIES}) @@ -121,6 +125,7 @@ set(RABBITMQ_SOURCES amqp_api.c amqp.h amqp_connection.c amqp_mem.c amqp_private.h amqp_socket.c amqp_table.c amqp_url.c amqp_socket.h amqp_tcp_socket.c amqp_tcp_socket.h amqp_timer.c amqp_timer.h + amqp_consumer.c ${AMQP_SSL_SRCS} ) @@ -142,9 +147,9 @@ if (BUILD_SHARED_LIBS) endif (WIN32) install(TARGETS rabbitmq - RUNTIME DESTINATION bin - LIBRARY DESTINATION lib - ARCHIVE DESTINATION lib + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} ) install_pdb(rabbitmq) @@ -164,7 +169,7 @@ if (BUILD_STATIC_LIBS) endif (WIN32) install(TARGETS rabbitmq-static - ARCHIVE DESTINATION lib + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} ) install_pdb(rabbitmq-static) @@ -179,7 +184,7 @@ install(FILES amqp_tcp_socket.h ${AMQP_SSL_SOCKET_H_PATH} ${STDINT_H_INSTALL_FILE} - DESTINATION include + DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} ) set(RMQ_LIBRARY_TARGET ${RMQ_LIBRARY_TARGET} PARENT_SCOPE) diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 7f479c8..82b7c02 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -1,4 +1,5 @@ /* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/** \file */ /* * ***** BEGIN LICENSE BLOCK ***** * Version: MIT @@ -37,6 +38,8 @@ #ifndef AMQP_H #define AMQP_H +/** \cond HIDE_FROM_DOXYGEN */ + #ifdef __cplusplus #define AMQP_BEGIN_DECLS extern "C" { #define AMQP_END_DECLS } @@ -45,11 +48,13 @@ #define AMQP_END_DECLS #endif -/** Important API Decorators - * AMQP_PUBLIC_FUNCTION - Declares an exportable function - * AMQP_PUBLIC_VARIABLE - Declares an exportable variable - * AMQP_CALL - Declares the calling convention - */ +/* + * \internal + * Important API decorators: + * AMQP_PUBLIC_FUNCTION - a public API function + * AMQP_PUBLIC_VARIABLE - a public API external variable + * AMQP_CALL - calling convension (used on Win32) + */ #if defined(_WIN32) && defined(_MSC_VER) # if defined(AMQP_BUILD) && !defined(AMQP_STATIC) @@ -149,6 +154,12 @@ typedef _W64 int ssize_t; #endif #endif +#if defined(_WIN32) && defined(__MINGW32__) +#include <sys/types.h> +#endif + +/** \endcond */ + #include <stddef.h> #include <stdint.h> @@ -156,29 +167,257 @@ struct timeval; AMQP_BEGIN_DECLS +/** + * \def AMQP_VERSION_MAJOR + * + * Major library version number compile-time constant + * + * The major version is incremented when backwards incompatible API changes + * are made. + * + * \sa AMQP_VERSION, AMQP_VERSION_STRING + * + * \since v0.4.0 + */ + +/** + * \def AMQP_VERSION_MINOR + * + * Minor library version number compile-time constant + * + * The minor version is incremented when new APIs are added. Existing APIs + * are left alone. + * + * \sa AMQP_VERSION, AMQP_VERSION_STRING + * + * \since v0.4.0 + */ + +/** + * \def AMQP_VERSION_PATCH + * + * Patch library version number compile-time constant + * + * The patch version is incremented when library code changes, but the API + * is not changed. + * + * \sa AMQP_VERSION, AMQP_VERSION_STRING + * + * \since v0.4.0 + */ + +/** + * \def AMQP_VERSION_IS_RELEASE + * + * Version constant set to 1 for tagged release, 0 otherwise + * + * NOTE: versions that are not tagged releases are not guaranteed to be API/ABI + * compatible with older releases, and may change commit-to-commit. + * + * \sa AMQP_VERSION, AMQP_VERSION_STRING + * + * \since v0.4.0 + */ +/* + * Developer note: when changing these, be sure to update SOVERSION constants + * in CMakeLists.txt and configure.ac + */ + +#define AMQP_VERSION_MAJOR 0 +#define AMQP_VERSION_MINOR 5 +#define AMQP_VERSION_PATCH 1 +#define AMQP_VERSION_IS_RELEASE 0 + + +/** + * \def AMQP_VERSION + * + * Packed version number + * + * AMQP_VERSION is a 4-byte unsigned integer with the most significant byte + * set to AMQP_VERSION_MAJOR, the second most significant byte set to + * AMQP_VERSION_MINOR, third most significant byte set to AMQP_VERSION_PATCH, + * and the lowest byte set to AMQP_VERSION_IS_RELEASE. + * + * For example version 2.3.4 which is released version would be encoded as + * 0x02030401 + * + * \sa amqp_version_number() AMQP_VERSION_MAJOR, AMQP_VERSION_MINOR, + * AMQP_VERSION_PATCH, AMQP_VERSION_IS_RELEASE + * + * \since v0.4.0 + */ +#define AMQP_VERSION ((AMQP_VERSION_MAJOR << 24) | \ + (AMQP_VERSION_MINOR << 16) | \ + (AMQP_VERSION_PATCH << 8) | \ + (AMQP_VERSION_IS_RELEASE)) + +/** \cond HIDE_FROM_DOXYGEN */ +#define AMQ_STRINGIFY(s) AMQ_STRINGIFY_HELPER(s) +#define AMQ_STRINGIFY_HELPER(s) #s + +#define AMQ_VERSION_STRING AMQ_STRINGIFY(AMQP_VERSION_MAJOR) "." \ + AMQ_STRINGIFY(AMQP_VERSION_MINOR) "." \ + AMQ_STRINGIFY(AMQP_VERSION_PATCH) +/** \endcond */ + +/** + * \def AMQP_VERSION_STRING + * + * Version string compile-time constant + * + * Non-released versions of the library will have "-pre" appended to the + * version string + * + * \sa amqp_version() + * + * \since v0.4.0 + */ +#if AMQP_VERSION_IS_RELEASE +# define AMQP_VERSION_STRING AMQ_VERSION_STRING +#else +# define AMQP_VERSION_STRING AMQ_VERSION_STRING "-pre" +#endif + + +/** + * Returns the rabbitmq-c version as a packed integer. + * + * See \ref AMQP_VERSION + * + * \return packed 32-bit integer representing version of library at runtime + * + * \sa AMQP_VERSION, amqp_version() + * + * \since v0.4.0 + */ +AMQP_PUBLIC_FUNCTION +uint32_t +AMQP_CALL amqp_version_number(void); + +/** + * Returns the rabbitmq-c version as a string. + * + * See \ref AMQP_VERSION_STRING + * + * \return a statically allocated string describing the version of rabbitmq-c. + * + * \sa amqp_version_number(), AMQP_VERSION_STRING, AMQP_VERSION + * + * \since v0.1 + */ +AMQP_PUBLIC_FUNCTION +char const * +AMQP_CALL amqp_version(void); + +/** + * \def AMQP_DEFAULT_FRAME_SIZE + * + * Default frame size (128Kb) + * + * \sa amqp_login(), amqp_login_with_properties() + * + * \since v0.4.0 + */ +#define AMQP_DEFAULT_FRAME_SIZE 131072 + +/** + * \def AMQP_DEFAULT_MAX_CHANNELS + * + * Default maximum number of channels (0, no limit) + * + * \sa amqp_login(), amqp_login_with_properties() + * + * \since v0.4.0 + */ +#define AMQP_DEFAULT_MAX_CHANNELS 0 + +/** + * \def AMQP_DEFAULT_HEARTBEAT + * + * Default heartbeat interval (0, heartbeat disabled) + * + * \sa amqp_login(), amqp_login_with_properties() + * + * \since v0.4.0 + */ +#define AMQP_DEFAULT_HEARTBEAT 0 + +/** + * boolean type 0 = false, true otherwise + * + * \since v0.1 + */ typedef int amqp_boolean_t; + +/** + * Method number + * + * \since v0.1 + */ typedef uint32_t amqp_method_number_t; + +/** + * Bitmask for flags + * + * \since v0.1 + */ typedef uint32_t amqp_flags_t; + +/** + * Channel type + * + * \since v0.1 + */ typedef uint16_t amqp_channel_t; +/** + * Buffer descriptor + * + * \since v0.1 + */ typedef struct amqp_bytes_t_ { - size_t len; - void *bytes; + size_t len; /**< length of the buffer in bytes */ + void *bytes; /**< pointer to the beginning of the buffer */ } amqp_bytes_t; +/** + * Decimal data type + * + * \since v0.1 + */ typedef struct amqp_decimal_t_ { - uint8_t decimals; - uint32_t value; + uint8_t decimals; /**< the location of the decimal point */ + uint32_t value; /**< the value before the decimal point is applied */ } amqp_decimal_t; +/** + * AMQP field table + * + * An AMQP field table is a set of key-value pairs. + * A key is a UTF-8 encoded string up to 128 bytes long, and are not null + * terminated. + * A value can be one of several different datatypes. \sa amqp_field_value_kind_t + * + * \sa amqp_table_entry_t + * + * \since v0.1 + */ typedef struct amqp_table_t_ { - int num_entries; - struct amqp_table_entry_t_ *entries; + int num_entries; /**< length of entries array */ + struct amqp_table_entry_t_ *entries; /**< an array of table entries */ } amqp_table_t; +/** + * An AMQP Field Array + * + * A repeated set of field values, all must be of the same type + * + * \since v0.1 + */ typedef struct amqp_array_t_ { - int num_entries; - struct amqp_field_value_t_ *entries; + int num_entries; /**< Number of entries in the table */ + struct amqp_field_value_t_ *entries; /**< linked list of field values */ } amqp_array_t; /* @@ -220,214 +459,615 @@ of the other two, so this will work for both 0-8 and 0-9-1 branches of the code. */ +/** + * A field table value + * + * \since v0.1 + */ typedef struct amqp_field_value_t_ { - uint8_t kind; + uint8_t kind; /**< the type of the entry /sa amqp_field_value_kind_t */ union { - amqp_boolean_t boolean; - int8_t i8; - uint8_t u8; - int16_t i16; - uint16_t u16; - int32_t i32; - uint32_t u32; - int64_t i64; - uint64_t u64; - float f32; - double f64; - amqp_decimal_t decimal; - amqp_bytes_t bytes; - amqp_table_t table; - amqp_array_t array; - } value; + amqp_boolean_t boolean; /**< boolean type AMQP_FIELD_KIND_BOOLEAN */ + int8_t i8; /**< int8_t type AMQP_FIELD_KIND_I8 */ + uint8_t u8; /**< uint8_t type AMQP_FIELD_KIND_U8 */ + int16_t i16; /**< int16_t type AMQP_FIELD_KIND_I16 */ + uint16_t u16; /**< uint16_t type AMQP_FIELD_KIND_U16 */ + int32_t i32; /**< int32_t type AMQP_FIELD_KIND_I32 */ + uint32_t u32; /**< uint32_t type AMQP_FIELD_KIND_U32 */ + int64_t i64; /**< int64_t type AMQP_FIELD_KIND_I64 */ + uint64_t u64; /**< uint64_t type AMQP_FIELD_KIND_U64, AMQP_FIELD_KIND_TIMESTAMP */ + float f32; /**< float type AMQP_FIELD_KIND_F32 */ + double f64; /**< double type AMQP_FIELD_KIND_F64 */ + amqp_decimal_t decimal; /**< amqp_decimal_t AMQP_FIELD_KIND_DECIMAL */ + amqp_bytes_t bytes; /**< amqp_bytes_t type AMQP_FIELD_KIND_UTF8, AMQP_FIELD_KIND_BYTES */ + amqp_table_t table; /**< amqp_table_t type AMQP_FIELD_KIND_TABLE */ + amqp_array_t array; /**< amqp_array_t type AMQP_FIELD_KIND_ARRAY */ + } value; /**< a union of the value */ } amqp_field_value_t; +/** + * An entry in a field-table + * + * \sa amqp_table_encode(), amqp_table_decode(), amqp_table_clone() + * + * \since v0.1 + */ typedef struct amqp_table_entry_t_ { - amqp_bytes_t key; - amqp_field_value_t value; + amqp_bytes_t key; /**< the table entry key. Its a null-terminated UTF-8 string, + * with a maximum size of 128 bytes */ + amqp_field_value_t value; /**< the table entry values */ } amqp_table_entry_t; +/** + * Field value types + * + * \since v0.1 + */ typedef enum { - AMQP_FIELD_KIND_BOOLEAN = 't', - AMQP_FIELD_KIND_I8 = 'b', - AMQP_FIELD_KIND_U8 = 'B', - AMQP_FIELD_KIND_I16 = 's', - AMQP_FIELD_KIND_U16 = 'u', - AMQP_FIELD_KIND_I32 = 'I', - AMQP_FIELD_KIND_U32 = 'i', - AMQP_FIELD_KIND_I64 = 'l', - AMQP_FIELD_KIND_U64 = 'L', - AMQP_FIELD_KIND_F32 = 'f', - AMQP_FIELD_KIND_F64 = 'd', - AMQP_FIELD_KIND_DECIMAL = 'D', - AMQP_FIELD_KIND_UTF8 = 'S', - AMQP_FIELD_KIND_ARRAY = 'A', - AMQP_FIELD_KIND_TIMESTAMP = 'T', - AMQP_FIELD_KIND_TABLE = 'F', - AMQP_FIELD_KIND_VOID = 'V', - AMQP_FIELD_KIND_BYTES = 'x' + AMQP_FIELD_KIND_BOOLEAN = 't', /**< boolean type. 0 = false, 1 = true @see amqp_boolean_t */ + AMQP_FIELD_KIND_I8 = 'b', /**< 8-bit signed integer, datatype: int8_t */ + AMQP_FIELD_KIND_U8 = 'B', /**< 8-bit unsigned integer, datatype: uint8_t */ + AMQP_FIELD_KIND_I16 = 's', /**< 16-bit signed integer, datatype: int16_t */ + AMQP_FIELD_KIND_U16 = 'u', /**< 16-bit unsigned integer, datatype: uint16_t */ + AMQP_FIELD_KIND_I32 = 'I', /**< 32-bit signed integer, datatype: int32_t */ + AMQP_FIELD_KIND_U32 = 'i', /**< 32-bit unsigned integer, datatype: uint32_t */ + AMQP_FIELD_KIND_I64 = 'l', /**< 64-bit signed integer, datatype: int64_t */ + AMQP_FIELD_KIND_U64 = 'L', /**< 64-bit unsigned integer, datatype: uint64_t */ + AMQP_FIELD_KIND_F32 = 'f', /**< single-precision floating point value, datatype: float */ + AMQP_FIELD_KIND_F64 = 'd', /**< double-precision floating point value, datatype: double */ + AMQP_FIELD_KIND_DECIMAL = 'D', /**< amqp-decimal value, datatype: amqp_decimal_t */ + AMQP_FIELD_KIND_UTF8 = 'S', /**< UTF-8 null-terminated character string, datatype: amqp_bytes_t */ + AMQP_FIELD_KIND_ARRAY = 'A', /**< field array (repeated values of another datatype. datatype: amqp_array_t */ + AMQP_FIELD_KIND_TIMESTAMP = 'T',/**< 64-bit timestamp. datatype uint64_t */ + AMQP_FIELD_KIND_TABLE = 'F', /**< field table. encapsulates a table inside a table entry. datatype: amqp_table_t */ + AMQP_FIELD_KIND_VOID = 'V', /**< empty entry */ + AMQP_FIELD_KIND_BYTES = 'x' /**< unformatted byte string, datatype: amqp_bytes_t */ } amqp_field_value_kind_t; +/** + * A list of allocation blocks + * + * \since v0.1 + */ typedef struct amqp_pool_blocklist_t_ { - int num_blocks; - void **blocklist; + int num_blocks; /**< Number of blocks in the block list */ + void **blocklist; /**< Array of memory blocks */ } amqp_pool_blocklist_t; +/** + * A memory pool + * + * \since v0.1 + */ typedef struct amqp_pool_t_ { - size_t pagesize; - - amqp_pool_blocklist_t pages; - amqp_pool_blocklist_t large_blocks; - - int next_page; - char *alloc_block; - size_t alloc_used; + size_t pagesize; /**< the size of the page in bytes. + * allocations less than or equal to this size are + * allocated in the pages block list + * allocations greater than this are allocated in their + * own block in the large_blocks block list */ + + amqp_pool_blocklist_t pages; /**< blocks that are the size of pagesize */ + amqp_pool_blocklist_t large_blocks; /**< allocations larger than the pagesize */ + + int next_page; /**< an index to the next unused page block */ + char *alloc_block; /**< pointer to the current allocation block */ + size_t alloc_used; /**< number of bytes in the current allocation block that has been used */ } amqp_pool_t; +/** + * An amqp method + * + * \since v0.1 + */ typedef struct amqp_method_t_ { - amqp_method_number_t id; - void *decoded; + amqp_method_number_t id; /**< the method id number */ + void *decoded; /**< pointer to the decoded method, + * cast to the appropriate type to use */ } amqp_method_t; +/** + * An AMQP frame + * + * \since v0.1 + */ typedef struct amqp_frame_t_ { - uint8_t frame_type; /* 0 means no event */ - amqp_channel_t channel; + uint8_t frame_type; /**< frame type. The types: + * - AMQP_FRAME_METHOD - use the method union member + * - AMQP_FRAME_HEADER - use the properties union member + * - AMQP_FRAME_BODY - use the body_fragment union member + */ + amqp_channel_t channel; /**< the channel the frame was received on */ union { - amqp_method_t method; + amqp_method_t method; /**< a method, use if frame_type == AMQP_FRAME_METHOD */ struct { - uint16_t class_id; - uint64_t body_size; - void *decoded; - amqp_bytes_t raw; - } properties; - amqp_bytes_t body_fragment; + uint16_t class_id; /**< the class for the properties */ + uint64_t body_size; /**< size of the body in bytes */ + void *decoded; /**< the decoded properties */ + amqp_bytes_t raw; /**< amqp-encoded properties structure */ + } properties; /**< message header, a.k.a., properties, + use if frame_type == AMQP_FRAME_HEADER */ + amqp_bytes_t body_fragment; /**< a body fragment, use if frame_type == AMQP_FRAME_BODY */ struct { - uint8_t transport_high; - uint8_t transport_low; - uint8_t protocol_version_major; - uint8_t protocol_version_minor; - } protocol_header; - } payload; + uint8_t transport_high; /**< @internal first byte of handshake */ + uint8_t transport_low; /**< @internal second byte of handshake */ + uint8_t protocol_version_major; /**< @internal third byte of handshake */ + uint8_t protocol_version_minor; /**< @internal fourth byte of handshake */ + } protocol_header; /**< Used only when doing the initial handshake with the broker, + don't use otherwise */ + } payload; /**< the payload of the frame */ } amqp_frame_t; +/** + * Response type + * + * \since v0.1 + */ typedef enum amqp_response_type_enum_ { - AMQP_RESPONSE_NONE = 0, - AMQP_RESPONSE_NORMAL, - AMQP_RESPONSE_LIBRARY_EXCEPTION, - AMQP_RESPONSE_SERVER_EXCEPTION + AMQP_RESPONSE_NONE = 0, /**< the library got an EOF from the socket */ + AMQP_RESPONSE_NORMAL, /**< response normal, the RPC completed successfully */ + AMQP_RESPONSE_LIBRARY_EXCEPTION,/**< library error, an error occurred in the library, examine the library_error */ + AMQP_RESPONSE_SERVER_EXCEPTION /**< server exception, the broker returned an error, check replay */ } amqp_response_type_enum; +/** + * Reply from a RPC method on the broker + * + * \since v0.1 + */ typedef struct amqp_rpc_reply_t_ { - amqp_response_type_enum reply_type; - amqp_method_t reply; - int library_error; /* if AMQP_RESPONSE_LIBRARY_EXCEPTION, then 0 here means socket EOF */ + amqp_response_type_enum reply_type; /**< the reply type: + * - AMQP_RESPONSE_NORMAL - the RPC completed successfully + * - AMQP_RESPONSE_SERVER_EXCEPTION - the broker returned + * an exception, check the reply field + * - AMQP_RESPONSE_LIBRARY_EXCEPTION - the library + * encountered an error, check the library_error field + */ + amqp_method_t reply; /**< in case of AMQP_RESPONSE_SERVER_EXCEPTION this + * field will be set to the method returned from the broker */ + int library_error; /**< in case of AMQP_RESPONSE_LIBRARY_EXCEPTION this + * field will be set to an error code. An error + * string can be retrieved using amqp_error_string */ } amqp_rpc_reply_t; +/** + * SASL method type + * + * \since v0.1 + */ typedef enum amqp_sasl_method_enum_ { - AMQP_SASL_METHOD_PLAIN = 0 + AMQP_SASL_METHOD_PLAIN = 0 /**< the PLAIN SASL method for authentication to the broker */ } amqp_sasl_method_enum; -/* Opaque struct. */ +/** + * connection state object + * + * \since v0.1 + */ typedef struct amqp_connection_state_t_ *amqp_connection_state_t; +/** + * Socket object + * + * \since v0.4.0 + */ typedef struct amqp_socket_t_ amqp_socket_t; +/** + * Status codes + * + * \since v0.4.0 + */ typedef enum amqp_status_enum_ { - AMQP_STATUS_OK = 0x0, - AMQP_STATUS_NO_MEMORY = -0x0001, - AMQP_STATUS_BAD_AMQP_DATA = -0x0002, - AMQP_STATUS_UNKNOWN_CLASS = -0x0003, - AMQP_STATUS_UNKNOWN_METHOD = -0x0004, - AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED= -0x0005, - AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION = -0x0006, - AMQP_STATUS_CONNECTION_CLOSED = -0x0007, - AMQP_STATUS_BAD_URL = -0x0008, - AMQP_STATUS_SOCKET_ERROR = -0x0009, - AMQP_STATUS_INVALID_PARAMETER = -0x000A, - AMQP_STATUS_TABLE_TOO_BIG = -0x000B, - AMQP_STATUS_WRONG_METHOD = -0x000C, - AMQP_STATUS_TIMEOUT = -0x000D, - AMQP_STATUS_TIMER_FAILURE = -0x000E, - AMQP_STATUS_HEARTBEAT_TIMEOUT = -0x000F, - - AMQP_STATUS_TCP_ERROR = -0x0100, - AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR = -0x0101, - - AMQP_STATUS_SSL_ERROR = -0x0200, - AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED= -0x0201, - AMQP_STATUS_SSL_PEER_VERIFY_FAILED = -0x0202, - AMQP_STATUS_SSL_CONNECTION_FAILED = -0x0203 + AMQP_STATUS_OK = 0x0, /**< Operation successful */ + AMQP_STATUS_NO_MEMORY = -0x0001, /**< Memory allocation + failed */ + AMQP_STATUS_BAD_AMQP_DATA = -0x0002, /**< Incorrect or corrupt + data was received from + the broker. This is a + protocol error. */ + AMQP_STATUS_UNKNOWN_CLASS = -0x0003, /**< An unknown AMQP class + was received. This is + a protocol error. */ + AMQP_STATUS_UNKNOWN_METHOD = -0x0004, /**< An unknown AMQP method + was received. This is + a protocol error. */ + AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED= -0x0005, /**< Unable to resolve the + * hostname */ + AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION = -0x0006, /**< The broker advertised + an incompaible AMQP + version */ + AMQP_STATUS_CONNECTION_CLOSED = -0x0007, /**< The connection to the + broker has been closed + */ + AMQP_STATUS_BAD_URL = -0x0008, /**< malformed AMQP URL */ + AMQP_STATUS_SOCKET_ERROR = -0x0009, /**< A socket error + occurred */ + AMQP_STATUS_INVALID_PARAMETER = -0x000A, /**< An invalid parameter + was passed into the + function */ + AMQP_STATUS_TABLE_TOO_BIG = -0x000B, /**< The amqp_table_t object + cannot be serialized + because the output + buffer is too small */ + AMQP_STATUS_WRONG_METHOD = -0x000C, /**< The wrong method was + received */ + AMQP_STATUS_TIMEOUT = -0x000D, /**< Operation timed out */ + AMQP_STATUS_TIMER_FAILURE = -0x000E, /**< The underlying system + timer facility failed */ + AMQP_STATUS_HEARTBEAT_TIMEOUT = -0x000F, /**< Timed out waiting for + heartbeat */ + AMQP_STATUS_UNEXPECTED_STATE = -0x0010, /**< Unexpected protocol + state */ + + AMQP_STATUS_TCP_ERROR = -0x0100, /**< A generic TCP error + occurred */ + AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR = -0x0101, /**< An error occurred trying + to initialize the + socket library*/ + + AMQP_STATUS_SSL_ERROR = -0x0200, /**< A generic SSL error + occurred. */ + AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED= -0x0201, /**< SSL validation of + hostname against + peer certificate + failed */ + AMQP_STATUS_SSL_PEER_VERIFY_FAILED = -0x0202, /**< SSL validation of peer + certificate failed. */ + AMQP_STATUS_SSL_CONNECTION_FAILED = -0x0203 /**< SSL handshake failed. */ } amqp_status_enum; -AMQP_PUBLIC_FUNCTION -char const * -AMQP_CALL amqp_version(void); +/** + * AMQP delivery modes. + * Use these values for the #amqp_basic_properties_t::delivery_mode field. + * + * \since v0.5 + */ +typedef enum { + AMQP_DELIVERY_NONPERSISTENT = 1, /**< Non-persistent message */ + AMQP_DELIVERY_PERSISTENT = 2 /**< Persistent message */ +} amqp_delivery_mode_enum; + +AMQP_END_DECLS -/* Exported empty data structures */ +#include <amqp_framing.h> + +AMQP_BEGIN_DECLS + +/** + * Empty bytes structure + * + * \since v0.2 + */ AMQP_PUBLIC_VARIABLE const amqp_bytes_t amqp_empty_bytes; + +/** + * Empty table structure + * + * \since v0.2 + */ AMQP_PUBLIC_VARIABLE const amqp_table_t amqp_empty_table; + +/** + * Empty table array structure + * + * \since v0.2 + */ AMQP_PUBLIC_VARIABLE const amqp_array_t amqp_empty_array; /* Compatibility macros for the above, to avoid the need to update code written against earlier versions of librabbitmq. */ + +/** + * \def AMQP_EMPTY_BYTES + * + * Deprecated, use \ref amqp_empty_bytes instead + * + * \deprecated use \ref amqp_empty_bytes instead + * + * \since v0.1 + */ #define AMQP_EMPTY_BYTES amqp_empty_bytes + +/** + * \def AMQP_EMPTY_TABLE + * + * Deprecated, use \ref amqp_empty_table instead + * + * \deprecated use \ref amqp_empty_table instead + * + * \since v0.1 + */ #define AMQP_EMPTY_TABLE amqp_empty_table + +/** + * \def AMQP_EMPTY_ARRAY + * + * Deprecated, use \ref amqp_empty_array instead + * + * \deprecated use \ref amqp_empty_array instead + * + * \since v0.1 + */ #define AMQP_EMPTY_ARRAY amqp_empty_array +/** + * Initializes an amqp_pool_t memory allocation pool for use + * + * Readies an allocation pool for use. An amqp_pool_t + * must be initialized before use + * + * \param [in] pool the amqp_pool_t structure to initialize. + * Calling this function on a pool a pool that has + * already been initialized will result in undefined + * behavior + * \param [in] pagesize the unit size that the pool will allocate + * memory chunks in. Anything allocated against the pool + * with a requested size will be carved out of a block + * this size. Allocations larger than this will be + * allocated individually + * + * \sa recycle_amqp_pool(), empty_amqp_pool(), amqp_pool_alloc(), + * amqp_pool_alloc_bytes(), amqp_pool_t + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION void AMQP_CALL init_amqp_pool(amqp_pool_t *pool, size_t pagesize); +/** + * Recycles an amqp_pool_t memory allocation pool + * + * Recycles the space allocate by the pool + * + * This invalidates all allocations made against the pool before this call is + * made, any use of any allocations made before recycle_amqp_pool() is called + * will result in undefined behavior. + * + * Note: this may or may not release memory, to force memory to be released + * call empty_amqp_pool(). + * + * \param [in] pool the amqp_pool_t to recycle + * + * \sa recycle_amqp_pool(), empty_amqp_pool(), amqp_pool_alloc(), + * amqp_pool_alloc_bytes() + * + * \since v0.1 + * + */ AMQP_PUBLIC_FUNCTION void AMQP_CALL recycle_amqp_pool(amqp_pool_t *pool); +/** + * Empties an amqp memory pool + * + * Releases all memory associated with an allocation pool + * + * \param [in] pool the amqp_pool_t to empty + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION void AMQP_CALL empty_amqp_pool(amqp_pool_t *pool); +/** + * Allocates a block of memory from an amqp_pool_t memory pool + * + * Memory will be aligned on a 8-byte boundary. If a 0-length allocation is + * requested, a NULL pointer will be returned. + * + * \param [in] pool the allocation pool to allocate the memory from + * \param [in] amount the size of the allocation in bytes. + * \return a pointer to the memory block, or NULL if the allocation cannot + * be satisfied. + * + * \sa init_amqp_pool(), recycle_amqp_pool(), empty_amqp_pool(), + * amqp_pool_alloc_bytes() + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION void * AMQP_CALL amqp_pool_alloc(amqp_pool_t *pool, size_t amount); +/** + * Allocates a block of memory from an amqp_pool_t to an amqp_bytes_t + * + * Memory will be aligned on a 8-byte boundary. If a 0-length allocation is + * requested, output.bytes = NULL. + * + * \param [in] pool the allocation pool to allocate the memory from + * \param [in] amount the size of the allocation in bytes + * \param [in] output the location to store the pointer. On success + * output.bytes will be set to the beginning of the buffer + * output.len will be set to amount + * On error output.bytes will be set to NULL and output.len + * set to 0 + * + * \sa init_amqp_pool(), recycle_amqp_pool(), empty_amqp_pool(), + * amqp_pool_alloc() + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION void AMQP_CALL amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output); +/** + * Wraps a c string in an amqp_bytes_t + * + * Takes a string, calculates its length and creates an + * amqp_bytes_t that points to it. The string is not duplicated. + * + * For a given input cstr, The amqp_bytes_t output.bytes is the + * same as cstr, output.len is the length of the string not including + * the \0 terminator + * + * This function uses strlen() internally so cstr must be properly + * terminated + * + * \param [in] cstr the c string to wrap + * \return an amqp_bytes_t that describes the string + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION amqp_bytes_t AMQP_CALL amqp_cstring_bytes(char const *cstr); +/** + * Duplicates an amqp_bytes_t buffer. + * + * The buffer is cloned and the contents copied. + * + * The memory associated with the output is allocated + * with amqp_bytes_malloc() and should be freed with + * amqp_bytes_free() + * + * \param [in] src + * \return a clone of the src + * + * \sa amqp_bytes_free(), amqp_bytes_malloc() + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION amqp_bytes_t AMQP_CALL amqp_bytes_malloc_dup(amqp_bytes_t src); +/** + * Allocates a amqp_bytes_t buffer + * + * Creates an amqp_bytes_t buffer of the specified amount, the buffer should be + * freed using amqp_bytes_free() + * + * \param [in] amount the size of the buffer in bytes + * \returns an amqp_bytes_t with amount bytes allocated. + * output.bytes will be set to NULL on error + * + * \sa amqp_bytes_free(), amqp_bytes_malloc_dup() + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION amqp_bytes_t AMQP_CALL amqp_bytes_malloc(size_t amount); +/** + * Frees an amqp_bytes_t buffer + * + * Frees a buffer allocated with amqp_bytes_malloc() or amqp_bytes_malloc_dup() + * + * Calling amqp_bytes_free on buffers not allocated with one + * of those two functions will result in undefined behavior + * + * \param [in] bytes the buffer to free + * + * \sa amqp_bytes_malloc(), amqp_bytes_malloc_dup() + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION void AMQP_CALL amqp_bytes_free(amqp_bytes_t bytes); +/** + * Allocate and initialize a new amqp_connection_state_t object + * + * amqp_connection_state_t objects created with this function + * should be freed with amqp_destroy_connection() + * + * \returns an opaque pointer on success, NULL or 0 on failure. + * + * \sa amqp_destroy_connection() + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION amqp_connection_state_t AMQP_CALL amqp_new_connection(void); +/** + * Get the underlying socket descriptor for the connection + * + * \warning Use the socket returned from this function carefully, incorrect use + * of the socket outside of the library will lead to undefined behavior. + * Additionally rabbitmq-c may use the socket differently version-to-version, + * what may work in one version, may break in the next version. Be sure to + * throughly test any applications that use the socket returned by this + * function especially when using a newer version of rabbitmq-c + * + * \param [in] state the connection object + * \returns the socket descriptor if one has been set, -1 otherwise + * + * \sa amqp_tcp_socket_new(), amqp_ssl_socket_new(), amqp_socket_open() + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_get_sockfd(amqp_connection_state_t state); + +/** + * Deprecated, use amqp_tcp_socket_new() or amqp_ssl_socket_new() + * + * \deprecated Use amqp_tcp_socket_new() or amqp_ssl_socket_new() + * + * Sets the socket descriptor associated with the connection. The socket + * should be connected to a broker, and should not be read to or written from + * before calling this function. A socket descriptor can be created and opened + * using amqp_open_socket() + * + * \param [in] state the connection object + * \param [in] sockfd the socket + * + * \sa amqp_open_socket(), amqp_tcp_socket_new(), amqp_ssl_socket_new() + * + * \since v0.1 + */ AMQP_DEPRECATED( AMQP_PUBLIC_FUNCTION void AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd) ); -AMQP_PUBLIC_FUNCTION -void -AMQP_CALL amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket); +/** + * Tune client side parameters + * + * \warning This function may call abort() if the connection is in a certain + * state. As such it should probably not be called code outside the library. + * connection parameters should be specified when calling amqp_login() or + * amqp_login_with_properties() + * + * This function changes channel_max, frame_max, and heartbeat parameters, on + * the client side only. It does not try to renegotiate these parameters with + * the broker. Using this function will lead to unexpected results. + * + * \param [in] state the connection object + * \param [in] channel_max the maximum number of channels. + * The largest this can be is 65535 + * \param [in] frame_max the maximum size of an frame. + * The smallest this can be is 4096 + * The largest this can be is 2147483647 + * Unless you know what you're doing the recommended + * size is 131072 or 128KB + * \param [in] heartbeat the number of seconds between heartbeats + * + * \return AMQP_STATUS_OK on success, an amqp_status_enum value otherwise. + * Possible error codes include: + * - AMQP_STATUS_NO_MEMORY memory allocation failed. + * - AMQP_STATUS_TIMER_FAILURE the underlying system timer indicated it + * failed. + * + * \sa amqp_login(), amqp_login_with_properties() + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_tune_connection(amqp_connection_state_t state, @@ -435,67 +1075,463 @@ AMQP_CALL amqp_tune_connection(amqp_connection_state_t state, int frame_max, int heartbeat); +/** + * Get the maximum number of channels the connection can handle + * + * The maximum number of channels is set when connection negotiation takes + * place in amqp_login() or amqp_login_with_properties(). + * + * \param [in] state the connection object + * \return the maximum number of channels. 0 if there is no limit + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_get_channel_max(amqp_connection_state_t state); +/** + * Destroys an amqp_connection_state_t object + * + * Destroys a amqp_connection_state_t object that was created with + * amqp_new_connection(). If the connection with the broker is open, it will be + * implicitly closed with a reply code of 200 (success). Any memory that + * would be freed with amqp_maybe_release_buffers() or + * amqp_maybe_release_buffers_on_channel() will be freed, and use of that + * memory will caused undefined behavior. + * + * \param [in] state the connection object + * \return AMQP_STATUS_OK on success. amqp_status_enum value failure + * + * \sa amqp_new_connection() + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_destroy_connection(amqp_connection_state_t state); +/** + * Process incoming data + * + * \warning This is a low-level function intended for those who want to + * have greater control over input and output over the socket from the + * broker. Correctly using this function requires in-depth knowledge of AMQP + * and rabbitmq-c. + * + * For a given buffer of data received from the broker, decode the first + * frame in the buffer. If more than one frame is contained in the input buffer + * the return value will be less than the received_data size, the caller should + * adjust received_data buffer descriptor to point to the beginning of the + * buffer + the return value. + * + * \param [in] state the connection object + * \param [in] received_data a buffer of data received from the broker. The + * function will return the number of bytes of the buffer it used. The + * function copies these bytes to an internal buffer: this part of the buffer + * may be reused after this function successfully completes. + * \param [in,out] decoded_frame caller should pass in a pointer to an + * amqp_frame_t struct. If there is enough data in received_data for a + * complete frame, decoded_frame->frame_type will be set to something OTHER + * than 0. decoded_frame may contain members pointing to memory owned by + * the state object. This memory can be recycled with amqp_maybe_release_buffers() + * or amqp_maybe_release_buffers_on_channel() + * \return number of bytes consumed from received_data or 0 if a 0-length + * buffer was passed. A negative return value indicates failure. Possible errors: + * - AMQP_STATUS_NO_MEMORY failure in allocating memory. The library is likely in + * an indeterminate state making recovery unlikely. Client should note the error + * and terminate the application + * - AMQP_STATUS_BAD_AMQP_DATA bad AMQP data was received. The connection + * should be shutdown immediately + * - AMQP_STATUS_UNKNOWN_METHOD: an unknown method was received from the + * broker. This is likely a protocol error and the connection should be + * shutdown immediately + * - AMQP_STATUS_UNKNOWN_CLASS: a properties frame with an unknown class + * was received from the broker. This is likely a protocol error and the + * connection should be shutdown immediately + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_handle_input(amqp_connection_state_t state, amqp_bytes_t received_data, amqp_frame_t *decoded_frame); +/** + * Check to see if connection memory can be released + * + * \deprecated This function is deprecated in favor of + * amqp_maybe_release_buffers() or amqp_maybe_release_buffers_on_channel() + * + * Checks the state of an amqp_connection_state_t object to see if + * amqp_release_buffers() can be called successfully. + * + * \param [in] state the connection object + * \returns TRUE if the buffers can be released FALSE otherwise + * + * \sa amqp_release_buffers() amqp_maybe_release_buffers() + * amqp_maybe_release_buffers_on_channel() + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION amqp_boolean_t AMQP_CALL amqp_release_buffers_ok(amqp_connection_state_t state); +/** + * Release amqp_connection_state_t owned memory + * + * \deprecated This function is deprecated in favor of + * amqp_maybe_release_buffers() or amqp_maybe_release_buffers_on_channel() + * + * \warning caller should ensure amqp_release_buffers_ok() returns true before + * calling this function. Failure to do so may result in abort() being called. + * + * Release memory owned by the amqp_connection_state_t for reuse by the + * library. Use of any memory returned by the library before this function is + * called will result in undefined behavior. + * + * \note internally rabbitmq-c tries to reuse memory when possible. As a result + * its possible calling this function may not have a noticeable effect on + * memory usage. + * + * \param [in] state the connection object + * + * \sa amqp_release_buffers_ok() amqp_maybe_release_buffers() + * amqp_maybe_release_buffers_on_channel() + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION void AMQP_CALL amqp_release_buffers(amqp_connection_state_t state); +/** + * Release amqp_connection_state_t owned memory + * + * Release memory owned by the amqp_connection_state_t object related to any + * channel, allowing reuse by the library. Use of any memory returned by the + * library before this function is called with result in undefined behavior. + * + * \note internally rabbitmq-c tries to reuse memory when possible. As a result + * its possible calling this function may not have a noticeable effect on + * memory usage. + * + * \param [in] state the connection object + * + * \sa amqp_maybe_release_buffers_on_channel() + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION void AMQP_CALL amqp_maybe_release_buffers(amqp_connection_state_t state); +/** + * Release amqp_connection_state_t owned memory related to a channel + * + * Release memory owned by the amqp_connection_state_t object related to the + * specified channel, allowing reuse by the library. Use of any memory returned + * the library for a specific channel will result in undefined behavior. + * + * \note internally rabbitmq-c tries to reuse memory when possible. As a result + * its possible calling this function may not have a noticeable effect on + * memory usage. + * + * \param [in] state the connection object + * \param [in] channel the channel specifier for which memory should be + * released. Note that the library does not care about the state of the + * channel when calling this function + * + * \sa amqp_maybe_release_buffers() + * + * \since v0.4.0 + */ AMQP_PUBLIC_FUNCTION void -AMQP_CALL amqp_maybe_release_buffers_on_channel(amqp_connection_state_t state, amqp_channel_t channel); +AMQP_CALL amqp_maybe_release_buffers_on_channel(amqp_connection_state_t state, + amqp_channel_t channel); +/** + * Send a frame to the broker + * + * \param [in] state the connection object + * \param [in] frame the frame to send to the broker + * \return AMQP_STATUS_OK on success, an amqp_status_enum value on error. + * Possible error codes: + * - AMQP_STATUS_BAD_AMQP_DATA the serialized form of the method or + * properties was too large to fit in a single AMQP frame, or the + * method contains an invalid value. The frame was not sent. + * - AMQP_STATUS_TABLE_TOO_BIG the serialized form of an amqp_table_t is + * too large to fit in a single AMQP frame. Frame was not sent. + * - AMQP_STATUS_UNKNOWN_METHOD an invalid method type was passed in + * - AMQP_STATUS_UNKNOWN_CLASS an invalid properties type was passed in + * - AMQP_STATUS_TIMER_FAILURE system timer indicated failure. The frame + * was sent + * - AMQP_STATUS_SOCKET_ERROR + * - AMQP_STATUS_SSL_ERROR + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_send_frame(amqp_connection_state_t state, amqp_frame_t const *frame); +/** + * Compare two table entries + * + * Works just like strcmp(), comparing two the table keys, datatype, then values + * + * \param [in] entry1 the entry on the left + * \param [in] entry2 the entry on the right + * \return 0 if entries are equal, 0 < if left is greater, 0 > if right is greater + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_table_entry_cmp(void const *entry1, void const *entry2); +/** + * Open a socket to a remote host + * + * \deprecated This function is deprecated in favor of amqp_socket_open() + * + * Looks up the hostname, then attempts to open a socket to the host using + * the specified portnumber. It also sets various options on the socket to + * improve performance and correctness. + * + * \param [in] hostname this can be a hostname or IP address. + * Both IPv4 and IPv6 are acceptable + * \param [in] portnumber the port to connect on. RabbitMQ brokers + * listen on port 5672, and 5671 for SSL + * \return a positive value indicates success and is the sockfd. A negative + * value (see amqp_status_enum)is returned on failure. Possible error codes: + * - AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR Initialization of underlying socket + * library failed. + * - AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED hostname lookup failed. + * - AMQP_STATUS_SOCKET_ERROR a socket error occurred. errno or WSAGetLastError() + * may return more useful information. + * + * \note IPv6 support was added in v0.3 + * + * \sa amqp_socket_open() amqp_set_sockfd() + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_open_socket(char const *hostname, int portnumber); +/** + * Send initial AMQP header to the broker + * + * \warning this is a low level function intended for those who want to + * interact with the broker at a very low level. Use of this function without + * understanding what it does will result in AMQP protocol errors. + * + * This function sends the AMQP protocol header to the broker. + * + * \param [in] state the connection object + * \return AMQP_STATUS_OK on success, a negative value on failure. Possible + * error codes: + * - AMQP_STATUS_CONNECTION_CLOSED the connection to the broker was closed. + * - AMQP_STATUS_SOCKET_ERROR a socket error occurred. It is likely the + * underlying socket has been closed. errno or WSAGetLastError() may provide + * further information. + * - AMQP_STATUS_SSL_ERROR a SSL error occurred. The connection to the broker + * was closed. + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_send_header(amqp_connection_state_t state); +/** + * Checks to see if there are any incoming frames ready to be read + * + * Checks to see if there are any amqp_frame_t objects buffered by the + * amqp_connection_state_t object. Having one or more frames buffered means + * that amqp_simple_wait_frame() or amqp_simple_wait_frame_noblock() will + * return a frame without potentially blocking on a read() call. + * + * \param [in] state the connection object + * \return TRUE if there are frames enqueued, FALSE otherwise + * + * \sa amqp_simple_wait_frame() amqp_simple_wait_frame_noblock() + * amqp_data_in_buffer() + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION amqp_boolean_t AMQP_CALL amqp_frames_enqueued(amqp_connection_state_t state); +/** + * Read a single amqp_frame_t + * + * Waits for the next amqp_frame_t frame to be read from the broker. + * This function has the potential to block for a long time in the case of + * waiting for a basic.deliver method frame from the broker. + * + * The library may buffer frames. When an amqp_connection_state_t object + * has frames buffered calling amqp_simple_wait_frame() will return an + * amqp_frame_t without entering a blocking read(). You can test to see if + * an amqp_connection_state_t object has frames buffered by calling the + * amqp_frames_enqueued() function. + * + * The library has a socket read buffer. When there is data in an + * amqp_connection_state_t read buffer, amqp_simple_wait_frame() may return an + * amqp_frame_t without entering a blocking read(). You can test to see if an + * amqp_connection_state_t object has data in its read buffer by calling the + * amqp_data_in_buffer() function. + * + * \param [in] state the connection object + * \param [out] decoded_frame the frame + * \return AMQP_STATUS_OK on success, an amqp_status_enum value + * is returned otherwise. Possible errors include: + * - AMQP_STATUS_NO_MEMORY failure in allocating memory. The library is likely in + * an indeterminate state making recovery unlikely. Client should note the error + * and terminate the application + * - AMQP_STATUS_BAD_AMQP_DATA bad AMQP data was received. The connection + * should be shutdown immediately + * - AMQP_STATUS_UNKNOWN_METHOD: an unknown method was received from the + * broker. This is likely a protocol error and the connection should be + * shutdown immediately + * - AMQP_STATUS_UNKNOWN_CLASS: a properties frame with an unknown class + * was received from the broker. This is likely a protocol error and the + * connection should be shutdown immediately + * - AMQP_STATUS_HEARTBEAT_TIMEOUT timed out while waiting for heartbeat + * from the broker. The connection has been closed. + * - AMQP_STATUS_TIMER_FAILURE system timer indicated failure. + * - AMQP_STATUS_SOCKET_ERROR a socket error occurred. The connection has + * been closed + * - AMQP_STATUS_SSL_ERROR a SSL socket error occurred. The connection has + * been closed. + * + * \sa amqp_simple_wait_frame_noblock() amqp_frames_enqueued() + * amqp_data_in_buffer() + * + * \note as of v0.4.0 this function will no longer return heartbeat frames + * when enabled by specifying a non-zero heartbeat value in amqp_login(). + * Heartbeating is handled internally by the library. + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_simple_wait_frame(amqp_connection_state_t state, amqp_frame_t *decoded_frame); +/** + * Read a single amqp_frame_t with a timeout. + * + * Waits for the next amqp_frame_t frame to be read from the broker, up to + * a timespan specified by tv. The function will return AMQP_STATUS_TIMEOUT + * if the timeout is reached. The tv value is not modified by the function. + * + * If a 0 timeval is specified, the function behaves as if its non-blocking: it + * will test to see if a frame can be read from the broker, and return immediately. + * + * If NULL is passed in for tv, the function will behave like + * amqp_simple_wait_frame() and block until a frame is received from the broker + * + * The library may buffer frames. When an amqp_connection_state_t object + * has frames buffered calling amqp_simple_wait_frame_noblock() will return an + * amqp_frame_t without entering a blocking read(). You can test to see if an + * amqp_connection_state_t object has frames buffered by calling the + * amqp_frames_enqueued() function. + * + * The library has a socket read buffer. When there is data in an + * amqp_connection_state_t read buffer, amqp_simple_wait_frame_noblock() may return + * an amqp_frame_t without entering a blocking read(). You can test to see if an + * amqp_connection_state_t object has data in its read buffer by calling the + * amqp_data_in_buffer() function. + * + * \note This function does not return heartbeat frames. When enabled, heartbeating + * is handed internally internally by the library + * + * \param [in,out] state the connection object + * \param [out] decoded_frame the frame + * \param [in] tv the maximum time to wait for a frame to be read. Setting + * tv->tv_sec = 0 and tv->tv_usec = 0 will do a non-blocking read. Specifying + * NULL for tv will make the function block until a frame is read. + * \return AMQP_STATUS_OK on success. An amqp_status_enum value is returned + * otherwise. Possible errors include: + * - AMQP_STATUS_TIMEOUT the timeout was reached while waiting for a frame + * from the broker. + * - AMQP_STATUS_INVALID_PARAMETER the tv parameter contains an invalid value. + * - AMQP_STATUS_NO_MEMORY failure in allocating memory. The library is likely in + * an indeterminate state making recovery unlikely. Client should note the error + * and terminate the application + * - AMQP_STATUS_BAD_AMQP_DATA bad AMQP data was received. The connection + * should be shutdown immediately + * - AMQP_STATUS_UNKNOWN_METHOD: an unknown method was received from the + * broker. This is likely a protocol error and the connection should be + * shutdown immediately + * - AMQP_STATUS_UNKNOWN_CLASS: a properties frame with an unknown class + * was received from the broker. This is likely a protocol error and the + * connection should be shutdown immediately + * - AMQP_STATUS_HEARTBEAT_TIMEOUT timed out while waiting for heartbeat + * from the broker. The connection has been closed. + * - AMQP_STATUS_TIMER_FAILURE system timer indicated failure. + * - AMQP_STATUS_SOCKET_ERROR a socket error occurred. The connection has + * been closed + * - AMQP_STATUS_SSL_ERROR a SSL socket error occurred. The connection has + * been closed. + * + * \sa amqp_simple_wait_frame() amqp_frames_enqueued() amqp_data_in_buffer() + * + * \since v0.4.0 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_simple_wait_frame_noblock(amqp_connection_state_t state, amqp_frame_t *decoded_frame, struct timeval *tv); +/** + * Waits for a specific method from the broker + * + * \warning You probably don't want to use this function. If this function + * doesn't receive exactly the frame requested it closes the whole connection. + * + * Waits for a single method on a channel from the broker. + * If a frame is received that does not match expected_channel + * or expected_method the program will abort + * + * \param [in] state the connection object + * \param [in] expected_channel the channel that the method should be delivered on + * \param [in] expected_method the method to wait for + * \param [out] output the method + * \returns AMQP_STATUS_OK on success. An amqp_status_enum value is returned + * otherwise. Possible errors include: + * - AMQP_STATUS_WRONG_METHOD a frame containing the wrong method, wrong frame + * type or wrong channel was received. The connection is closed. + * - AMQP_STATUS_NO_MEMORY failure in allocating memory. The library is likely in + * an indeterminate state making recovery unlikely. Client should note the error + * and terminate the application + * - AMQP_STATUS_BAD_AMQP_DATA bad AMQP data was received. The connection + * should be shutdown immediately + * - AMQP_STATUS_UNKNOWN_METHOD: an unknown method was received from the + * broker. This is likely a protocol error and the connection should be + * shutdown immediately + * - AMQP_STATUS_UNKNOWN_CLASS: a properties frame with an unknown class + * was received from the broker. This is likely a protocol error and the + * connection should be shutdown immediately + * - AMQP_STATUS_HEARTBEAT_TIMEOUT timed out while waiting for heartbeat + * from the broker. The connection has been closed. + * - AMQP_STATUS_TIMER_FAILURE system timer indicated failure. + * - AMQP_STATUS_SOCKET_ERROR a socket error occurred. The connection has + * been closed + * - AMQP_STATUS_SSL_ERROR a SSL socket error occurred. The connection has + * been closed. + * + * \since v0.1 + */ + AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_simple_wait_method(amqp_connection_state_t state, @@ -503,6 +1539,32 @@ AMQP_CALL amqp_simple_wait_method(amqp_connection_state_t state, amqp_method_number_t expected_method, amqp_method_t *output); +/** + * Sends a method to the broker + * + * This is a thin wrapper around amqp_send_frame(), providing a way to send + * a method to the broker on a specified channel. + * + * \param [in] state the connection object + * \param [in] channel the channel object + * \param [in] id the method number + * \param [in] decoded the method object + * \returns AMQP_STATUS_OK on success, an amqp_status_enum value otherwise. + * Possible errors include: + * - AMQP_STATUS_BAD_AMQP_DATA the serialized form of the method or + * properties was too large to fit in a single AMQP frame, or the + * method contains an invalid value. The frame was not sent. + * - AMQP_STATUS_TABLE_TOO_BIG the serialized form of an amqp_table_t is + * too large to fit in a single AMQP frame. Frame was not sent. + * - AMQP_STATUS_UNKNOWN_METHOD an invalid method type was passed in + * - AMQP_STATUS_UNKNOWN_CLASS an invalid properties type was passed in + * - AMQP_STATUS_TIMER_FAILURE system timer indicated failure. The frame + * was sent + * - AMQP_STATUS_SOCKET_ERROR + * - AMQP_STATUS_SSL_ERROR + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_send_method(amqp_connection_state_t state, @@ -510,6 +1572,38 @@ AMQP_CALL amqp_send_method(amqp_connection_state_t state, amqp_method_number_t id, void *decoded); +/** + * Sends a method to the broker and waits for a method response + * + * \param [in] state the connection object + * \param [in] channel the channel object + * \param [in] request_id the method number of the request + * \param [in] expected_reply_ids a 0 terminated array of expected response + * method numbers + * \param [in] decoded_request_method the method to be sent to the broker + * \return a amqp_rpc_reply_t: + * - r.reply_type == AMQP_RESPONSE_NORMAL. RPC completed successfully + * - r.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION. The broker returned an + * exception: + * - If r.reply.id == AMQP_CHANNEL_CLOSE_METHOD a channel exception + * occurred, cast r.reply.decoded to amqp_channel_close_t* to see details + * of the exception. The client should amqp_send_method() a + * amqp_channel_close_ok_t. The channel must be re-opened before it + * can be used again. Any resources associated with the channel + * (auto-delete exchanges, auto-delete queues, consumers) are invalid + * and must be recreated before attempting to use them again. + * - If r.reply.id == AMQP_CONNECTION_CLOSE_METHOD a connection exception + * occurred, cast r.reply.decoded to amqp_connection_close_t* to see + * details of the exception. The client amqp_send_method() a + * amqp_connection_close_ok_t and disconnect from the broker. + * - r.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION. An exception occurred + * within the library. Examine r.library_error and compare it against + * amqp_status_enum values to determine the error. + * + * \sa amqp_simple_rpc_decoded() + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_simple_rpc(amqp_connection_state_t state, @@ -518,6 +1612,20 @@ AMQP_CALL amqp_simple_rpc(amqp_connection_state_t state, amqp_method_number_t *expected_reply_ids, void *decoded_request_method); +/** + * Sends a method to the broker and waits for a method response + * + * \param [in] state the connection object + * \param [in] channel the channel object + * \param [in] request_id the method number of the request + * \param [in] reply_id the method number expected in response + * \param [in] decoded_request_method the request method + * \return a pointer to the method returned from the broker, or NULL on error. + * On error amqp_get_rpc_reply() will return an amqp_rpc_reply_t with + * details on the error that occurred. + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION void * AMQP_CALL amqp_simple_rpc_decoded(amqp_connection_state_t state, @@ -526,7 +1634,9 @@ AMQP_CALL amqp_simple_rpc_decoded(amqp_connection_state_t state, amqp_method_number_t reply_id, void *decoded_request_method); -/* +/** + * Get the last global amqp_rpc_reply + * * The API methods corresponding to most synchronous AMQP methods * return a pointer to the decoded method result. Upon error, they * return NULL, and we need some way of discovering what, if anything, @@ -538,17 +1648,143 @@ AMQP_CALL amqp_simple_rpc_decoded(amqp_connection_state_t state, * amqp_rpc_reply_t; operations that do return amqp_rpc_reply_t * generally do NOT update this per-connection-global amqp_rpc_reply_t * instance. + * + * \param [in] state the connection object + * \return the most recent amqp_rpc_reply_t: + * - r.reply_type == AMQP_RESPONSE_NORMAL. RPC completed successfully + * - r.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION. The broker returned an + * exception: + * - If r.reply.id == AMQP_CHANNEL_CLOSE_METHOD a channel exception + * occurred, cast r.reply.decoded to amqp_channel_close_t* to see details + * of the exception. The client should amqp_send_method() a + * amqp_channel_close_ok_t. The channel must be re-opened before it + * can be used again. Any resources associated with the channel + * (auto-delete exchanges, auto-delete queues, consumers) are invalid + * and must be recreated before attempting to use them again. + * - If r.reply.id == AMQP_CONNECTION_CLOSE_METHOD a connection exception + * occurred, cast r.reply.decoded to amqp_connection_close_t* to see + * details of the exception. The client amqp_send_method() a + * amqp_connection_close_ok_t and disconnect from the broker. + * - r.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION. An exception occurred + * within the library. Examine r.library_error and compare it against + * amqp_status_enum values to determine the error. + * + * \sa amqp_simple_rpc_decoded() + * + * \since v0.1 */ AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_get_rpc_reply(amqp_connection_state_t state); +/** + * Login to the broker + * + * After using amqp_open_socket and amqp_set_sockfd, call + * amqp_login to complete connecting to the broker + * + * \param [in] state the connection object + * \param [in] vhost the virtual host to connect to on the broker. The default + * on most brokers is "/" + * \param [in] channel_max the limit for number of channels for the connection. + * 0 means no limit, and is a good default (AMQP_DEFAULT_MAX_CHANNELS) + * Note that the maximum number of channels the protocol supports + * is 65535 (2^16, with the 0-channel reserved) + * \param [in] frame_max the maximum size of an AMQP frame on the wire to + * request of the broker for this connection. 4096 is the minimum + * size, 2^31-1 is the maximum, a good default is 131072 (128KB), or + * AMQP_DEFAULT_FRAME_SIZE + * \param [in] heartbeat the number of seconds between heartbeat frames to + * request of the broker. A value of 0 disables heartbeats. + * Note rabbitmq-c only has partial support for heartbeats, as of + * v0.4.0 they are only serviced during amqp_basic_publish() and + * amqp_simple_wait_frame()/amqp_simple_wait_frame_noblock() + * \param [in] sasl_method the SASL method to authenticate with the broker. + * followed by the authentication information. + * For AMQP_SASL_METHOD_PLAIN, the AMQP_SASL_METHOD_PLAIN + * should be followed by two arguments in this order: + * const char* username, and const char* password. + * \return amqp_rpc_reply_t indicating success or failure. + * - r.reply_type == AMQP_RESPONSE_NORMAL. Login completed successfully + * - r.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION. In most cases errors + * from the broker when logging in will be represented by the broker closing + * the socket. In this case r.library_error will be set to + * AMQP_STATUS_CONNECTION_CLOSED. This error can represent a number of + * error conditions including: invalid vhost, authentication failure. + * - r.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION. The broker returned an + * exception: + * - If r.reply.id == AMQP_CHANNEL_CLOSE_METHOD a channel exception + * occurred, cast r.reply.decoded to amqp_channel_close_t* to see details + * of the exception. The client should amqp_send_method() a + * amqp_channel_close_ok_t. The channel must be re-opened before it + * can be used again. Any resources associated with the channel + * (auto-delete exchanges, auto-delete queues, consumers) are invalid + * and must be recreated before attempting to use them again. + * - If r.reply.id == AMQP_CONNECTION_CLOSE_METHOD a connection exception + * occurred, cast r.reply.decoded to amqp_connection_close_t* to see + * details of the exception. The client amqp_send_method() a + * amqp_connection_close_ok_t and disconnect from the broker. + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_login(amqp_connection_state_t state, char const *vhost, int channel_max, int frame_max, int heartbeat, amqp_sasl_method_enum sasl_method, ...); +/** + * Login to the broker passing a properties table + * + * This function is similar to amqp_login() and differs in that it provides a + * way to pass client properties to the broker. This is commonly used to + * negotiate newer protocol features as they are supported by the broker. + * + * \param [in] state the connection object + * \param [in] vhost the virtual host to connect to on the broker. The default + * on most brokers is "/" + * \param [in] channel_max the limit for the number of channels for the connection. + * 0 means no limit, and is a good default (AMQP_DEFAULT_MAX_CHANNELS) + * Note that the maximum number of channels the protocol supports + * is 65535 (2^16, with the 0-channel reserved) + * \param [in] frame_max the maximum size of an AMQP frame ont he wire to + * request of the broker for this connection. 4096 is the minimum + * size, 2^31-1 is the maximum, a good default is 131072 (128KB), or + * AMQP_DEFAULT_FRAME_SIZE + * \param [in] heartbeat the number of seconds between heartbeat frame to + * request of the broker. A value of 0 disables heartbeats. + * Note rabbitmq-c only has partial support for hearts, as of + * v0.4.0 heartbeats are only serviced during amqp_basic_publish(), + * and amqp_simple_wait_frame()/amqp_simple_wait_frame_noblock() + * \param [in] properties a table of properties to send the broker. + * \param [in] sasl_method the SASL method to authenticate with the broker + * followed by the authentication information. + * For AMQP_SASL_METHOD_PLAN, the AMQP_SASL_METHOD_PLAIN parameter + * should be followed by two arguments in this order: + * const char* username, and const char* password. + * \return amqp_rpc_reply_t indicating success or failure. + * - r.reply_type == AMQP_RESPONSE_NORMAL. Login completed successfully + * - r.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION. In most cases errors + * from the broker when logging in will be represented by the broker closing + * the socket. In this case r.library_error will be set to + * AMQP_STATUS_CONNECTION_CLOSED. This error can represent a number of + * error conditions including: invalid vhost, authentication failure. + * - r.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION. The broker returned an + * exception: + * - If r.reply.id == AMQP_CHANNEL_CLOSE_METHOD a channel exception + * occurred, cast r.reply.decoded to amqp_channel_close_t* to see details + * of the exception. The client should amqp_send_method() a + * amqp_channel_close_ok_t. The channel must be re-opened before it + * can be used again. Any resources associated with the channel + * (auto-delete exchanges, auto-delete queues, consumers) are invalid + * and must be recreated before attempting to use them again. + * - If r.reply.id == AMQP_CONNECTION_CLOSE_METHOD a connection exception + * occurred, cast r.reply.decoded to amqp_connection_close_t* to see + * details of the exception. The client amqp_send_method() a + * amqp_connection_close_ok_t and disconnect from the broker. + * + * \since v0.4.0 + */ AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_login_with_properties(amqp_connection_state_t state, char const *vhost, @@ -557,6 +1793,52 @@ AMQP_CALL amqp_login_with_properties(amqp_connection_state_t state, char const * struct amqp_basic_properties_t_; +/** + * Publish a message to the broker + * + * Publish a message on an exchange with a routing key. + * + * Note that at the AMQ protocol level basic.publish is an async method: + * this means error conditions that occur on the broker (such as publishing to + * a non-existent exchange) will not be reflected in the return value of this + * function. + * + * in the return value from this function. + * \param [in] state the connection object + * \param [in] channel the channel identifier + * \param [in] exchange the exchange on the broker to publish to + * \param [in] routing_key the routing key to use when publishing the message + * \param [in] mandatory indicate to the broker that the message MUST be routed + * to a queue. If the broker cannot do this it should respond with + * a basic.reject method. + * \param [in] immediate indicate to the broker that the message MUST be delivered + * to a consumer immediately. If the broker cannot do this it should + * response with a basic.reject method. + * \param [in] properties the properties associated with the message + * \param [in] body the message body + * \return AMQP_STATUS_OK on success, amqp_status_enum value on failure. Note + * that basic.publish is an async method, the return value from this + * function only indicates that the message data was successfully + * transmitted to the broker. It does not indicate failures that occur + * on the broker, such as publishing to a non-existent exchange. + * Possible error values: + * - AMQP_STATUS_TIMER_FAILURE: system timer facility returned an error + * the message was not sent. + * - AMQP_STATUS_HEARTBEAT_TIMEOUT: connection timed out waiting for a + * heartbeat from the broker. The message was not sent. + * - AMQP_STATUS_NO_MEMORY: memory allocation failed. The message was + * not sent. + * - AMQP_STATUS_TABLE_TOO_BIG: a table in the properties was too large + * to fit in a single frame. Message was not sent. + * - AMQP_STATUS_CONNECTION_CLOSED: the connection was closed. + * - AMQP_STATUS_SSL_ERROR: a SSL error occurred. + * - AMQP_STATUS_TCP_ERROR: a TCP error occurred. errno or + * WSAGetLastError() may provide more information + * + * Note: this function does heartbeat processing as of v0.4.0 + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, @@ -565,51 +1847,153 @@ AMQP_CALL amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t chann struct amqp_basic_properties_t_ const *properties, amqp_bytes_t body); +/** + * Closes an channel + * + * \param [in] state the connection object + * \param [in] channel the channel identifier + * \param [in] code the reason for closing the channel, AMQP_REPLY_SUCCESS is a good default + * \return amqp_rpc_reply_t indicating success or failure + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_channel_close(amqp_connection_state_t state, amqp_channel_t channel, int code); +/** + * Closes the entire connection + * + * Implicitly closes all channels and informs the broker the connection + * is being closed, after receiving acknowldgement from the broker it closes + * the socket. + * + * \param [in] state the connection object + * \param [in] code the reason code for closing the connection. AMQP_REPLY_SUCCESS is a good default. + * \return amqp_rpc_reply_t indicating the result + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_connection_close(amqp_connection_state_t state, int code); +/** + * Acknowledges a message + * + * Does a basic.ack on a received message + * + * \param [in] state the connection object + * \param [in] channel the channel identifier + * \param [in] delivery_tag the delivery tag of the message to be ack'd + * \param [in] multiple if true, ack all messages up to this delivery tag, if + * false ack only this delivery tag + * \return 0 on success, 0 > on failing to send the ack to the broker. + * this will not indicate failure if something goes wrong on the broker + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel, uint64_t delivery_tag, amqp_boolean_t multiple); +/** + * Do a basic.get + * + * Synchonously polls the broker for a message in a queue, and + * retrieves the message if a message is in the queue. + * + * \param [in] state the connection object + * \param [in] channel the channel identifier to use + * \param [in] queue the queue name to retrieve from + * \param [in] no_ack if true the message is automatically ack'ed + * if false amqp_basic_ack should be called once the message + * retrieved has been processed + * \return amqp_rpc_reply indicating success or failure + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_basic_get(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t no_ack); +/** + * Do a basic.reject + * + * Actively reject a message that has been delivered + * + * \param [in] state the connection object + * \param [in] channel the channel identifier + * \param [in] delivery_tag the delivery tag of the message to reject + * \param [in] requeue indicate to the broker whether it should requeue the + * message or just discard it. + * \return 0 on success, 0 > on failing to send the reject method to the broker. + * This will not indicate failure if something goes wrong on the broker. + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel, uint64_t delivery_tag, amqp_boolean_t requeue); -/* +/** + * Do a basic.nack + * + * Actively reject a message, this has the same effect as amqp_basic_reject() + * however, amqp_basic_nack() can negatively acknowledge multiple messages with + * one call much like amqp_basic_ack() can acknowledge mutliple messages with + * one call. + * + * \param [in] state the connection object + * \param [in] channel the channel identifier + * \param [in] delivery_tag the delivery tag of the message to reject + * \param [in] multiple if set to 1 negatively acknowledge all unacknowledged + * messages on this channel. + * \param [in] requeue indicate to the broker whether it should requeue the + * message or dead-letter it. + * \return AMQP_STATUS_OK on success, an amqp_status_enum value otherwise. + * + * \since v0.5.0 + */ +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL amqp_basic_nack(amqp_connection_state_t state, amqp_channel_t channel, + uint64_t delivery_tag, amqp_boolean_t multiple, + amqp_boolean_t requeue); +/** + * Check to see if there is data left in the receive buffer + * * Can be used to see if there is data still in the buffer, if so * calling amqp_simple_wait_frame will not immediately enter a * blocking read. * - * Possibly amqp_frames_enqueued should be used for this? + * \param [in] state the connection object + * \return true if there is data in the recieve buffer, false otherwise + * + * \since v0.1 */ AMQP_PUBLIC_FUNCTION amqp_boolean_t AMQP_CALL amqp_data_in_buffer(amqp_connection_state_t state); -/* +/** * Get the error string for the given error code. * - * @deprecated This function has been deprecated in favor of + * \deprecated This function has been deprecated in favor of * \ref amqp_error_string2() which returns statically allocated * string which do not need to be freed by the caller. * * The returned string resides on the heap; the caller is responsible - * for freeing it + * for freeing it. + * + * \param [in] err return error code + * \return the error string * + * \since v0.1 */ AMQP_DEPRECATED( AMQP_PUBLIC_FUNCTION @@ -617,33 +2001,257 @@ AMQP_DEPRECATED( AMQP_CALL amqp_error_string(int err) ); + +/** + * Get the error string for the given error code. + * + * Get an error string associated with an error code. The string is statically + * allocated and does not need to be freed + * + * \param [in] err the error code + * \return the error string + * + * \since v0.4.0 + */ AMQP_PUBLIC_FUNCTION const char * AMQP_CALL amqp_error_string2(int err); +/** + * Deserialize an amqp_table_t from AMQP wireformat + * + * This is an internal function and is not typically used by + * client applications + * + * \param [in] encoded the buffer containing the serialized data + * \param [in] pool memory pool used to allocate the table entries from + * \param [in] output the amqp_table_t structure to fill in. Any existing + * entries will be erased + * \param [in,out] offset The offset into the encoded buffer to start + * reading the serialized table. It will be updated + * by this function to end of the table + * \return AMQP_STATUS_OK on success, an amqp_status_enum value on failure + * Possible error codes: + * - AMQP_STATUS_NO_MEMORY out of memory + * - AMQP_STATUS_BAD_AMQP_DATA invalid wireformat + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_decode_table(amqp_bytes_t encoded, amqp_pool_t *pool, amqp_table_t *output, size_t *offset); +/** + * Serializes an amqp_table_t to the AMQP wireformat + * + * This is an internal function and is not typically used by + * client applications + * + * \param [in] encoded the buffer where to serialize the table to + * \param [in] input the amqp_table_t to serialize + * \param [in,out] offset The offset into the encoded buffer to start + * writing the serialized table. It will be updated + * by this function to where writing left off + * \return AMQP_STATUS_OK on success, an amqp_status_enum value on failure + * Possible error codes: + * - AMQP_STATUS_TABLE_TOO_BIG the serialized form is too large for the + * buffer + * - AMQP_STATUS_BAD_AMQP_DATA invalid table + * + * \since v0.1 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_encode_table(amqp_bytes_t encoded, amqp_table_t *input, size_t *offset); + +/** + * Create a deep-copy of an amqp_table_t object + * + * Creates a deep-copy of an amqp_table_t object, using the provided pool + * object to allocate the necessary memory. This memory can be freed later by + * call recycle_amqp_pool(), or empty_amqp_pool() + * + * \param [in] original the table to copy + * \param [in,out] clone the table to copy to + * \param [in] pool the initialized memory pool to do allocations for the table + * from + * \return AMQP_STATUS_OK on success, amqp_status_enum value on failure. + * Possible error values: + * - AMQP_STATUS_NO_MEMORY - memory allocation failure. + * - AMQP_STATUS_INVALID_PARAMETER - invalid table (e.g., no key name) + * + * \since v0.4.0 + */ +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL amqp_table_clone(amqp_table_t *original, amqp_table_t *clone, amqp_pool_t *pool); + +/** + * A message object + * + * \since v0.4.0 + */ +typedef struct amqp_message_t_ { + amqp_basic_properties_t properties; /**< message properties */ + amqp_bytes_t body; /**< message body */ + amqp_pool_t pool; /**< pool used to allocate properties */ +} amqp_message_t; + +/** + * Reads the next message on a channel + * + * Reads a complete message (header + body) on a specified channel. This + * function is intended to be used with amqp_basic_get() or when an + * AMQP_BASIC_DELIVERY_METHOD method is received. + * + * \param [in,out] state the connection object + * \param [in] channel the channel on which to read the message from + * \param [in,out] message a pointer to a amqp_message_t object. Caller should + * call amqp_message_destroy() when it is done using the + * fields in the message object. The caller is responsible for + * allocating/destroying the amqp_message_t object itself. + * \param [in] flags pass in 0. Currently unused. + * \returns a amqp_rpc_reply_t object. ret.reply_type == AMQP_RESPONSE_NORMAL on success. + * + * \since v0.4.0 + */ +AMQP_PUBLIC_FUNCTION +amqp_rpc_reply_t +AMQP_CALL amqp_read_message(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_message_t *message, int flags); + +/** + * Frees memory associated with a amqp_message_t allocated in amqp_read_message + * + * \param [in] message + * + * \since v0.4.0 + */ +AMQP_PUBLIC_FUNCTION +void +AMQP_CALL amqp_destroy_message(amqp_message_t *message); + +/** + * Envelope object + * + * \since v0.4.0 + */ +typedef struct amqp_envelope_t_ { + amqp_channel_t channel; /**< channel message was delivered on */ + amqp_bytes_t consumer_tag; /**< the consumer tag the message was delivered to */ + uint64_t delivery_tag; /**< the messages delivery tag */ + amqp_boolean_t redelivered; /**< flag indicating whether this message is being redelivered */ + amqp_bytes_t exchange; /**< exchange this message was published to */ + amqp_bytes_t routing_key; /**< the routing key this message was published with */ + amqp_message_t message; /**< the message */ +} amqp_envelope_t; + +/** + * Wait for and consume a message + * + * Waits for a basic.deliver method on any channel, upon receipt of + * basic.deliver it reads that message, and returns. If any other method is + * received before basic.deliver, this function will return an amqp_rpc_reply_t + * with ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and + * ret.library_error == AMQP_STATUS_UNEXPECTED_FRAME. The caller should then + * call amqp_simple_wait_frame() to read this frame and take appropriate action. + * + * This function should be used after starting a consumer with the + * amqp_basic_consume() function + * + * \param [in,out] state the connection object + * \param [in,out] envelope a pointer to a amqp_envelope_t object. Caller + * should call #amqp_destroy_envelope() when it is done using + * the fields in the envelope object. The caller is responsible + * for allocating/destroying the amqp_envelope_t object itself. + * \param [in] timeout a timeout to wait for a message delivery. Passing in + * NULL will result in blocking behavior. + * \param [in] flags pass in 0. Currently unused. + * \returns a amqp_rpc_reply_t object. ret.reply_type == AMQP_RESPONSE_NORMAL + * on success. If ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and + * ret.library_error == AMQP_STATUS_UNEXPECTED_FRAME, a frame other + * than AMQP_BASIC_DELIVER_METHOD was received, the caller should call + * amqp_simple_wait_frame() to read this frame and take appropriate + * action. + * + * \since v0.4.0 + */ +AMQP_PUBLIC_FUNCTION +amqp_rpc_reply_t +AMQP_CALL amqp_consume_message(amqp_connection_state_t state, + amqp_envelope_t *envelope, + struct timeval *timeout, int flags); + +/** + * Frees memory associated with a amqp_envelope_t allocated in amqp_consume_message() + * + * \param [in] envelope + * + * \since v0.4.0 + */ +AMQP_PUBLIC_FUNCTION +void +AMQP_CALL amqp_destroy_envelope(amqp_envelope_t *envelope); + + +/** + * Parameters used to connect to the RabbitMQ broker + * + * \since v0.2 + */ struct amqp_connection_info { - char *user; - char *password; - char *host; - char *vhost; - int port; + char *user; /**< the username to authenticate with the broker, default on most broker is 'guest' */ + char *password; /**< the password to authenticate with the broker, default on most brokers is 'guest' */ + char *host; /**< the hostname of the broker */ + char *vhost; /**< the virtual host on the broker to connect to, a good default is "/" */ + int port; /**< the port that the broker is listening on, default on most brokers is 5672 */ amqp_boolean_t ssl; }; +/** + * Initialze an amqp_connection_info to default values + * + * The default values are: + * - user: "guest" + * - password: "guest" + * - host: "localhost" + * - vhost: "/" + * - port: 5672 + * + * \param [out] parsed the connection info to set defaults on + * + * \since v0.2 + */ AMQP_PUBLIC_FUNCTION void AMQP_CALL amqp_default_connection_info(struct amqp_connection_info *parsed); +/** + * Parse a connection URL + * + * An amqp connection url takes the form: + * + * amqp://[$USERNAME[:$PASSWORD]\@]$HOST[:$PORT]/[$VHOST] + * + * Examples: + * amqp://guest:guest\@localhost:5672// + * amqp://guest:guest\@localhost/myvhost + * + * \note This function modifies url parameter. + * + * \param [in] url URI to parse, note that this parameter is modified by the + * function. + * \param [out] parsed the connection info gleaned from the URI. The char* + * members will point to parts of the url input parameter. + * Memory management will depend on how the url is allocated. + * \returns AMQP_STATUS_OK on success, AMQP_STATUS_BAD_URL on failure + * + * \since v0.2 + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_parse_url(char *url, struct amqp_connection_info *parsed); @@ -662,7 +2270,9 @@ AMQP_CALL amqp_parse_url(char *url, struct amqp_connection_info *parsed); * \param [in] host Connect to this host. * \param [in] port Connect on this remote port. * - * \return Zero upon success, non-zero otherwise. + * \return AMQP_STATUS_OK on success, an amqp_status_enum on failure + * + * \since v0.4.0 */ AMQP_PUBLIC_FUNCTION int @@ -670,54 +2280,74 @@ AMQP_CALL amqp_socket_open(amqp_socket_t *self, const char *host, int port); /** - * Close a socket connection and free resources. + * Open a socket connection. * - * This function closes a socket connection and releases any resources used by - * the object. After calling this function the specified socket should no - * longer be referenced. + * This function opens a socket connection returned from amqp_tcp_socket_new() + * or amqp_ssl_socket_new(). This function should be called after setting + * socket options and prior to assigning the socket to an AMQP connection with + * amqp_set_socket(). * * \param [in,out] self A socket object. + * \param [in] host Connect to this host. + * \param [in] port Connect on this remote port. + * \param [in] timeout Max allowed time to spent on opening. If NULL - run in blocking mode + * + * \return AMQP_STATUS_OK on success, an amqp_status_enum on failure. * - * \return Zero upon success, non-zero otherwise. + * \since v0.4.0 */ AMQP_PUBLIC_FUNCTION int AMQP_CALL -amqp_socket_close(amqp_socket_t *self); +amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout); /** - * Retrieve an error code for the last socket operation. + * Get the socket descriptor in use by a socket object. * - * At the time of writing, this interface is not well supported and is subject - * to changes! + * Retrieve the underlying socket descriptor. This function can be used to + * perform low-level socket operations that aren't supported by the socket + * interface. Use with caution! * * \param [in,out] self A socket object. * - * \return Zero upon success, an opaque error code otherwise + * \return The underlying socket descriptor, or -1 if there is no socket descriptor + * associated with + * with + * + * \since v0.4.0 */ AMQP_PUBLIC_FUNCTION int AMQP_CALL -amqp_socket_error(amqp_socket_t *self); +amqp_socket_get_sockfd(amqp_socket_t *self); /** - * Get the socket descriptor in use by a socket object. + * Get the socket object associated with a amqp_connection_state_t * - * Retrieve the underlying socket descriptor. This function can be used to - * perform low-level socket operations that aren't supported by the socket - * interface. Use with caution! + * \param [in] state the connection object to get the socket from + * \return a pointer to the socket object, or NULL if one has not been assigned * - * \param [in,out] self A socket object. + * \since v0.4.0 + */ +AMQP_PUBLIC_FUNCTION +amqp_socket_t * +amqp_get_socket(amqp_connection_state_t state); + +/** + * Get the broker properties table + * + * \param [in] state the connection object + * \return a pointer to an amqp_table_t containing the properties advertised + * by the broker on connection. The connection object owns the table, it + * should not be modified. * - * \return The underlying socket descriptor. + * \since v0.5.0 */ AMQP_PUBLIC_FUNCTION -int -AMQP_CALL -amqp_socket_get_sockfd(amqp_socket_t *self); +amqp_table_t * +amqp_get_server_properties(amqp_connection_state_t state); AMQP_END_DECLS -#include <amqp_framing.h> #endif /* AMQP_H */ diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index 2f40681..1dd303e 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -327,3 +327,14 @@ int amqp_basic_reject(amqp_connection_state_t state, req.requeue = requeue; return amqp_send_method(state, channel, AMQP_BASIC_REJECT_METHOD, &req); } + +int amqp_basic_nack(amqp_connection_state_t state, amqp_channel_t channel, + uint64_t delivery_tag, amqp_boolean_t multiple, + amqp_boolean_t requeue) +{ + amqp_basic_nack_t req; + req.delivery_tag = delivery_tag; + req.multiple = multiple; + req.requeue = requeue; + return amqp_send_method(state, channel, AMQP_BASIC_NACK_METHOD, &req); +} diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index d5c29b0..5d70b07 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -91,6 +91,8 @@ amqp_connection_state_t amqp_new_connection(void) goto out_nomem; } + init_amqp_pool(&state->properties_pool, 512); + return state; out_nomem: @@ -107,20 +109,25 @@ int amqp_get_sockfd(amqp_connection_state_t state) void amqp_set_sockfd(amqp_connection_state_t state, int sockfd) { - amqp_socket_t *socket = amqp_tcp_socket_new(); + amqp_socket_t *socket = amqp_tcp_socket_new(state); if (!socket) { amqp_abort("%s", strerror(errno)); } amqp_tcp_socket_set_sockfd(socket, sockfd); - amqp_set_socket(state, socket); } void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket) { - amqp_socket_close(state->socket); + amqp_socket_delete(state->socket); state->socket = socket; } +amqp_socket_t * +amqp_get_socket(amqp_connection_state_t state) +{ + return state->socket; +} + int amqp_tune_connection(amqp_connection_state_t state, int channel_max, int frame_max, @@ -175,7 +182,8 @@ int amqp_destroy_connection(amqp_connection_state_t state) free(state->outbound_buffer.bytes); free(state->sock_inbound_buffer.bytes); - status = amqp_socket_close(state->socket); + amqp_socket_delete(state->socket); + empty_amqp_pool(&state->properties_pool); free(state); } return status; @@ -511,3 +519,8 @@ int amqp_send_frame(amqp_connection_state_t state, return res; } +amqp_table_t * +amqp_get_server_properties(amqp_connection_state_t state) +{ + return &state->server_properties; +} diff --git a/librabbitmq/amqp_consumer.c b/librabbitmq/amqp_consumer.c new file mode 100644 index 0000000..6c6c1c9 --- /dev/null +++ b/librabbitmq/amqp_consumer.c @@ -0,0 +1,301 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by Alan Antonuk are Copyright (c) 2013 + * Alan Antonuk. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ +#include "amqp.h" +#include "amqp_private.h" +#include "amqp_socket.h" + +#include <stdlib.h> +#include <string.h> + +static +int amqp_basic_properties_clone(amqp_basic_properties_t *original, + amqp_basic_properties_t *clone, + amqp_pool_t *pool) +{ + memset(clone, 0, sizeof(amqp_basic_properties_t)); + clone->_flags = original->_flags; + +#define CLONE_BYTES_POOL(original, clone, pool) \ + if (0 == original.len) { \ + clone = amqp_empty_bytes; \ + } else { \ + amqp_pool_alloc_bytes(pool, original.len, &clone); \ + if (NULL == clone.bytes) { \ + return AMQP_STATUS_NO_MEMORY; \ + } \ + memcpy(clone.bytes, original.bytes, clone.len); \ + } + + if (clone->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + CLONE_BYTES_POOL(original->content_type, clone->content_type, pool) + } + + if (clone->_flags & AMQP_BASIC_CONTENT_ENCODING_FLAG) { + CLONE_BYTES_POOL(original->content_encoding, clone->content_encoding, pool) + } + + if (clone->_flags & AMQP_BASIC_HEADERS_FLAG) { + int res = amqp_table_clone(&original->headers, &clone->headers, pool); + if (AMQP_STATUS_OK != res) { + return res; + } + } + + if (clone->_flags & AMQP_BASIC_DELIVERY_MODE_FLAG) { + clone->delivery_mode = original->delivery_mode; + } + + if (clone->_flags & AMQP_BASIC_PRIORITY_FLAG) { + clone->priority = original->priority; + } + + if (clone->_flags & AMQP_BASIC_CORRELATION_ID_FLAG) { + CLONE_BYTES_POOL(original->correlation_id, clone->correlation_id, pool) + } + + if (clone->_flags & AMQP_BASIC_REPLY_TO_FLAG) { + CLONE_BYTES_POOL(original->reply_to, clone->reply_to, pool) + } + + if (clone->_flags & AMQP_BASIC_EXPIRATION_FLAG) { + CLONE_BYTES_POOL(original->expiration, clone->expiration, pool) + } + + if (clone->_flags & AMQP_BASIC_MESSAGE_ID_FLAG) { + CLONE_BYTES_POOL(original->message_id, clone->message_id, pool) + } + + if (clone->_flags & AMQP_BASIC_TIMESTAMP_FLAG) { + clone->timestamp = original->timestamp; + } + + if (clone->_flags & AMQP_BASIC_TYPE_FLAG) { + CLONE_BYTES_POOL(original->type, clone->type, pool) + } + + if (clone->_flags & AMQP_BASIC_USER_ID_FLAG) { + CLONE_BYTES_POOL(original->user_id, clone->user_id, pool) + } + + if (clone->_flags & AMQP_BASIC_APP_ID_FLAG) { + CLONE_BYTES_POOL(original->app_id, clone->app_id, pool) + } + + if (clone->_flags & AMQP_BASIC_CLUSTER_ID_FLAG) { + CLONE_BYTES_POOL(original->cluster_id, clone->cluster_id, pool) + } + + return AMQP_STATUS_OK; +#undef CLONE_BYTES_POOL +} + + +void amqp_destroy_message(amqp_message_t *message) +{ + empty_amqp_pool(&message->pool); + amqp_bytes_free(message->body); +} + +void amqp_destroy_envelope(amqp_envelope_t *envelope) +{ + amqp_destroy_message(&envelope->message); + amqp_bytes_free(envelope->routing_key); + amqp_bytes_free(envelope->exchange); + amqp_bytes_free(envelope->consumer_tag); +} + + +amqp_rpc_reply_t +amqp_consume_message(amqp_connection_state_t state, amqp_envelope_t *envelope, + struct timeval *timeout, AMQP_UNUSED int flags) +{ + int res; + amqp_frame_t frame; + amqp_basic_deliver_t *delivery_method; + amqp_rpc_reply_t ret; + + memset(&ret, 0, sizeof(amqp_rpc_reply_t)); + memset(envelope, 0, sizeof(amqp_envelope_t)); + + res = amqp_simple_wait_frame_noblock(state, &frame, timeout); + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + goto error_out1; + } + + if (AMQP_FRAME_METHOD != frame.frame_type + || AMQP_BASIC_DELIVER_METHOD != frame.payload.method.id) { + amqp_put_back_frame(state, &frame); + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_UNEXPECTED_STATE; + goto error_out1; + } + + delivery_method = frame.payload.method.decoded; + + envelope->channel = frame.channel; + envelope->consumer_tag = amqp_bytes_malloc_dup(delivery_method->consumer_tag); + envelope->delivery_tag = delivery_method->delivery_tag; + envelope->redelivered = delivery_method->redelivered; + envelope->exchange = amqp_bytes_malloc_dup(delivery_method->exchange); + envelope->routing_key = amqp_bytes_malloc_dup(delivery_method->routing_key); + + if (NULL == envelope->consumer_tag.bytes || + NULL == envelope->exchange.bytes || + NULL == envelope->routing_key.bytes) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_NO_MEMORY; + goto error_out2; + } + + ret = amqp_read_message(state, envelope->channel, &envelope->message, 0); + if (AMQP_RESPONSE_NORMAL != ret.reply_type) { + goto error_out2; + } + + ret.reply_type = AMQP_RESPONSE_NORMAL; + return ret; + +error_out2: + amqp_bytes_free(envelope->routing_key); + amqp_bytes_free(envelope->exchange); + amqp_bytes_free(envelope->consumer_tag); +error_out1: + return ret; +} + +amqp_rpc_reply_t amqp_read_message(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_message_t *message, + AMQP_UNUSED int flags) +{ + amqp_frame_t frame; + amqp_rpc_reply_t ret; + + size_t body_read; + char *body_read_ptr; + int res; + + memset(&ret, 0, sizeof(amqp_rpc_reply_t)); + memset(message, 0, sizeof(amqp_message_t)); + + res = amqp_simple_wait_frame_on_channel(state, channel, &frame); + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + + goto error_out1; + } + + if (AMQP_FRAME_HEADER != frame.frame_type) { + if (AMQP_FRAME_METHOD == frame.frame_type && + (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id || + AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) { + + ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION; + ret.reply = frame.payload.method; + + } else { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_UNEXPECTED_STATE; + + amqp_put_back_frame(state, &frame); + } + goto error_out1; + } + + init_amqp_pool(&message->pool, 4096); + res = amqp_basic_properties_clone(frame.payload.properties.decoded, + &message->properties, &message->pool); + + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + goto error_out3; + } + + if (0 == frame.payload.properties.body_size) { + message->body = amqp_empty_bytes; + } else { + message->body = amqp_bytes_malloc(frame.payload.properties.body_size); + if (NULL == message->body.bytes) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_NO_MEMORY; + goto error_out1; + } + } + + body_read = 0; + body_read_ptr = message->body.bytes; + + while (body_read < message->body.len) { + res = amqp_simple_wait_frame_on_channel(state, channel, &frame); + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + goto error_out2; + } + if (AMQP_FRAME_BODY != frame.frame_type) { + if (AMQP_FRAME_METHOD == frame.frame_type && + (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id || + AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) { + + ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION; + ret.reply = frame.payload.method; + } else { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_BAD_AMQP_DATA; + } + goto error_out2; + } + + if (body_read + frame.payload.body_fragment.len > message->body.len) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_BAD_AMQP_DATA; + goto error_out2; + } + + memcpy(body_read_ptr, frame.payload.body_fragment.bytes, frame.payload.body_fragment.len); + + body_read += frame.payload.body_fragment.len; + body_read_ptr += frame.payload.body_fragment.len; + } + + ret.reply_type = AMQP_RESPONSE_NORMAL; + return ret; + +error_out2: + amqp_bytes_free(message->body); +error_out3: + empty_amqp_pool(&message->pool); +error_out1: + return ret; +} diff --git a/librabbitmq/amqp_cyassl.c b/librabbitmq/amqp_cyassl.c index 89047d9..05ce12e 100644 --- a/librabbitmq/amqp_cyassl.c +++ b/librabbitmq/amqp_cyassl.c @@ -155,7 +155,7 @@ amqp_ssl_error_string(AMQP_UNUSED int err) } static int -amqp_ssl_socket_open(void *base, const char *host, int port) +amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout) { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; int status; @@ -167,7 +167,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port) return -1; } - self->sockfd = amqp_open_socket(host, port); + self->sockfd = amqp_open_socket_noblock(host, port, timeout); if (0 > self->sockfd) { self->last_error = - self->sockfd; return -1; diff --git a/librabbitmq/amqp_framing.c b/librabbitmq/amqp_framing.c index e065e48..b2f0a83 100644 --- a/librabbitmq/amqp_framing.c +++ b/librabbitmq/amqp_framing.c @@ -105,6 +105,8 @@ char const *amqp_method_name(amqp_method_number_t methodNumber) { case AMQP_CONNECTION_OPEN_OK_METHOD: return "AMQP_CONNECTION_OPEN_OK_METHOD"; case AMQP_CONNECTION_CLOSE_METHOD: return "AMQP_CONNECTION_CLOSE_METHOD"; case AMQP_CONNECTION_CLOSE_OK_METHOD: return "AMQP_CONNECTION_CLOSE_OK_METHOD"; + case AMQP_CONNECTION_BLOCKED_METHOD: return "AMQP_CONNECTION_BLOCKED_METHOD"; + case AMQP_CONNECTION_UNBLOCKED_METHOD: return "AMQP_CONNECTION_UNBLOCKED_METHOD"; case AMQP_CHANNEL_OPEN_METHOD: return "AMQP_CHANNEL_OPEN_METHOD"; case AMQP_CHANNEL_OPEN_OK_METHOD: return "AMQP_CHANNEL_OPEN_OK_METHOD"; case AMQP_CHANNEL_FLOW_METHOD: return "AMQP_CHANNEL_FLOW_METHOD"; @@ -326,6 +328,23 @@ int amqp_decode_method(amqp_method_number_t methodNumber, *decoded = m; return 0; } + case AMQP_CONNECTION_BLOCKED_METHOD: { + amqp_connection_blocked_t *m = (amqp_connection_blocked_t *) amqp_pool_alloc(pool, sizeof(amqp_connection_blocked_t)); + if (m == NULL) { return AMQP_STATUS_NO_MEMORY; } + { + uint8_t len; + if (!amqp_decode_8(encoded, &offset, &len) + || !amqp_decode_bytes(encoded, &offset, &m->reason, len)) + return AMQP_STATUS_BAD_AMQP_DATA; + } + *decoded = m; + return 0; + } + case AMQP_CONNECTION_UNBLOCKED_METHOD: { + amqp_connection_unblocked_t *m = NULL; /* no fields */ + *decoded = m; + return 0; + } case AMQP_CHANNEL_OPEN_METHOD: { amqp_channel_open_t *m = (amqp_channel_open_t *) amqp_pool_alloc(pool, sizeof(amqp_channel_open_t)); if (m == NULL) { return AMQP_STATUS_NO_MEMORY; } @@ -1267,6 +1286,16 @@ int amqp_encode_method(amqp_method_number_t methodNumber, case AMQP_CONNECTION_CLOSE_OK_METHOD: { return offset; } + case AMQP_CONNECTION_BLOCKED_METHOD: { + amqp_connection_blocked_t *m = (amqp_connection_blocked_t *) decoded; + if (!amqp_encode_8(encoded, &offset, m->reason.len) + || !amqp_encode_bytes(encoded, &offset, m->reason)) + return AMQP_STATUS_BAD_AMQP_DATA; + return offset; + } + case AMQP_CONNECTION_UNBLOCKED_METHOD: { + return offset; + } case AMQP_CHANNEL_OPEN_METHOD: { amqp_channel_open_t *m = (amqp_channel_open_t *) decoded; if (!amqp_encode_8(encoded, &offset, m->out_of_band.len) diff --git a/librabbitmq/amqp_framing.h b/librabbitmq/amqp_framing.h index f6dafe4..0c04cb8 100644 --- a/librabbitmq/amqp_framing.h +++ b/librabbitmq/amqp_framing.h @@ -183,6 +183,16 @@ typedef struct amqp_connection_close_ok_t_ { char dummy; /* Dummy field to avoid empty struct */ } amqp_connection_close_ok_t; +#define AMQP_CONNECTION_BLOCKED_METHOD ((amqp_method_number_t) 0x000A003C) /* 10, 60; 655420 */ +typedef struct amqp_connection_blocked_t_ { + amqp_bytes_t reason; +} amqp_connection_blocked_t; + +#define AMQP_CONNECTION_UNBLOCKED_METHOD ((amqp_method_number_t) 0x000A003D) /* 10, 61; 655421 */ +typedef struct amqp_connection_unblocked_t_ { + char dummy; /* Dummy field to avoid empty struct */ +} amqp_connection_unblocked_t; + #define AMQP_CHANNEL_OPEN_METHOD ((amqp_method_number_t) 0x0014000A) /* 20, 10; 1310730 */ typedef struct amqp_channel_open_t_ { amqp_bytes_t out_of_band; diff --git a/librabbitmq/amqp_gnutls.c b/librabbitmq/amqp_gnutls.c index 734643c..f18d427 100644 --- a/librabbitmq/amqp_gnutls.c +++ b/librabbitmq/amqp_gnutls.c @@ -119,7 +119,7 @@ amqp_ssl_socket_recv(void *base, } static int -amqp_ssl_socket_open(void *base, const char *host, int port) +amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout) { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; int status; @@ -132,7 +132,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port) return -1; } - self->sockfd = amqp_open_socket(host, port); + self->sockfd = amqp_open_socket_noblock(host, port, timeout); if (0 > self->sockfd) { self->last_error = -self->sockfd; return -1; diff --git a/librabbitmq/amqp_hostcheck.c b/librabbitmq/amqp_hostcheck.c new file mode 100644 index 0000000..8ad2cf7 --- /dev/null +++ b/librabbitmq/amqp_hostcheck.c @@ -0,0 +1,201 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * Copyright 1996-2014 Daniel Stenberg <daniel@haxx.se>. + * Copyright 2014 Michael Steinert + * + * All rights reserved. + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT OF THIRD PARTY RIGHTS. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE + * USE OR OTHER DEALINGS IN THE SOFTWARE. + * + * Except as contained in this notice, the name of a copyright holder shall + * not be used in advertising or otherwise to promote the sale, use or other + * dealings in this Software without prior written authorization of the + * copyright holder. + */ + +#include "amqp_private.h" + +#include <string.h> + +/* Portable, consistent toupper (remember EBCDIC). Do not use toupper() + * because its behavior is altered by the current locale. + */ + +static char +amqp_raw_toupper(char in) +{ + switch (in) { + case 'a': + return 'A'; + case 'b': + return 'B'; + case 'c': + return 'C'; + case 'd': + return 'D'; + case 'e': + return 'E'; + case 'f': + return 'F'; + case 'g': + return 'G'; + case 'h': + return 'H'; + case 'i': + return 'I'; + case 'j': + return 'J'; + case 'k': + return 'K'; + case 'l': + return 'L'; + case 'm': + return 'M'; + case 'n': + return 'N'; + case 'o': + return 'O'; + case 'p': + return 'P'; + case 'q': + return 'Q'; + case 'r': + return 'R'; + case 's': + return 'S'; + case 't': + return 'T'; + case 'u': + return 'U'; + case 'v': + return 'V'; + case 'w': + return 'W'; + case 'x': + return 'X'; + case 'y': + return 'Y'; + case 'z': + return 'Z'; + } + return in; +} + +/* + * amqp_raw_equal() is for doing "raw" case insensitive strings. This is meant + * to be locale independent and only compare strings we know are safe for + * this. See http://daniel.haxx.se/blog/2008/10/15/strcasecmp-in-turkish/ for + * some further explanation to why this function is necessary. + * + * The function is capable of comparing a-z case insensitively even for + * non-ascii. + */ + +static int +amqp_raw_equal(const char *first, const char *second) +{ + while (*first && *second) { + if (amqp_raw_toupper(*first) != amqp_raw_toupper(*second)) { + /* get out of the loop as soon as they don't match */ + break; + } + first++; + second++; + } + /* we do the comparison here (possibly again), just to make sure that if + * the loop above is skipped because one of the strings reached zero, we + * must not return this as a successful match + */ + return (amqp_raw_toupper(*first) == amqp_raw_toupper(*second)); +} + +static int +amqp_raw_nequal(const char *first, const char *second, size_t max) +{ + while (*first && *second && max) { + if (amqp_raw_toupper(*first) != amqp_raw_toupper(*second)) { + break; + } + max--; + first++; + second++; + } + if (0 == max) { + return 1; /* they are equal this far */ + } + return amqp_raw_toupper(*first) == amqp_raw_toupper(*second); +} + +/* + * Match a hostname against a wildcard pattern. + * E.g. + * "foo.host.com" matches "*.host.com". + * + * We use the matching rule described in RFC6125, section 6.4.3. + * http://tools.ietf.org/html/rfc6125#section-6.4.3 + */ + +static int +amqp_hostmatch(const char *hostname, const char *pattern) +{ + const char *pattern_label_end, *pattern_wildcard, *hostname_label_end; + int wildcard_enabled; + size_t prefixlen, suffixlen; + pattern_wildcard = strchr(pattern, '*'); + if (pattern_wildcard == NULL) { + return amqp_raw_equal(pattern, hostname) ? 1 : 0; + } + /* We require at least 2 dots in pattern to avoid too wide wildcard match. */ + wildcard_enabled = 1; + pattern_label_end = strchr(pattern, '.'); + if (pattern_label_end == NULL || + strchr(pattern_label_end + 1, '.') == NULL || + pattern_wildcard > pattern_label_end || + amqp_raw_nequal(pattern, "xn--", 4)) { + wildcard_enabled = 0; + } + if (!wildcard_enabled) { + return amqp_raw_equal(pattern, hostname) ? 1 : 0; + } + hostname_label_end = strchr(hostname, '.'); + if (hostname_label_end == NULL || + !amqp_raw_equal(pattern_label_end, hostname_label_end)) { + return 0; + } + /* The wildcard must match at least one character, so the left-most + * label of the hostname is at least as large as the left-most label + * of the pattern. + */ + if (hostname_label_end - hostname < pattern_label_end - pattern) { + return 0; + } + prefixlen = pattern_wildcard - pattern; + suffixlen = pattern_label_end - (pattern_wildcard + 1); + return amqp_raw_nequal(pattern, hostname, prefixlen) && + amqp_raw_nequal(pattern_wildcard + 1, hostname_label_end - suffixlen, + suffixlen) ? 1 : 0; +} + +int +amqp_hostcheck(const char *match_pattern, const char *hostname) +{ + /* sanity check */ + if (!match_pattern || !*match_pattern || !hostname || !*hostname) { + return 0; + } + /* trivial case */ + if (amqp_raw_equal(hostname, match_pattern)) { + return 1; + } + return amqp_hostmatch(hostname, match_pattern); +} diff --git a/librabbitmq/amqp_hostcheck.h b/librabbitmq/amqp_hostcheck.h new file mode 100644 index 0000000..2832933 --- /dev/null +++ b/librabbitmq/amqp_hostcheck.h @@ -0,0 +1,36 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +#ifndef librabbitmq_amqp_hostcheck_h +#define librabbitmq_amqp_hostcheck_h + +/* + * Copyright 1996-2014 Daniel Stenberg <daniel@haxx.se>. + * Copyright 2014 Michael Steinert + * + * All rights reserved. + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT OF THIRD PARTY RIGHTS. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE + * USE OR OTHER DEALINGS IN THE SOFTWARE. + * + * Except as contained in this notice, the name of a copyright holder shall + * not be used in advertising or otherwise to promote the sale, use or other + * dealings in this Software without prior written authorization of the + * copyright holder. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +int +amqp_hostcheck(const char *match_pattern, const char *hostname); + +#endif diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c index 88b1e9f..586117e 100644 --- a/librabbitmq/amqp_mem.c +++ b/librabbitmq/amqp_mem.c @@ -48,7 +48,12 @@ char const *amqp_version(void) { - return VERSION; /* defined in config.h */ + return AMQP_VERSION_STRING; +} + +uint32_t amqp_version_number(void) +{ + return AMQP_VERSION; } void init_amqp_pool(amqp_pool_t *pool, size_t pagesize) diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c index 0f6c12c..ab8a94e 100644 --- a/librabbitmq/amqp_openssl.c +++ b/librabbitmq/amqp_openssl.c @@ -25,8 +25,13 @@ #include "config.h" #endif +#if defined(__APPLE__) && defined(__MACH__) +# define MAC_OS_X_VERSION_MIN_REQUIRED MAC_OS_X_VERSION_10_6 +#endif + #include "amqp_ssl_socket.h" #include "amqp_socket.h" +#include "amqp_hostcheck.h" #include "amqp_private.h" #include "threads.h" @@ -210,15 +215,9 @@ amqp_ssl_socket_verify_hostname(void *base, const char *host) goto error; } } -#ifdef _MSC_VER -#define strcasecmp _stricmp -#endif - if (strcasecmp(host, (char *)utf8_value)) { + if (!amqp_hostcheck((char *)utf8_value, host)) { goto error; } -#ifdef _MSC_VER -#undef strcasecmp -#endif exit: OPENSSL_free(utf8_value); return status; @@ -228,7 +227,7 @@ error: } static int -amqp_ssl_socket_open(void *base, const char *host, int port) +amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout) { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; long result; @@ -243,7 +242,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port) } SSL_set_mode(self->ssl, SSL_MODE_AUTO_RETRY); - self->sockfd = amqp_open_socket(host, port); + self->sockfd = amqp_open_socket_noblock(host, port, timeout); if (0 > self->sockfd) { status = self->sockfd; self->internal_error = amqp_os_socket_error(); @@ -293,6 +292,7 @@ error_out2: self->sockfd = -1; error_out1: SSL_free(self->ssl); + self->ssl = NULL; goto exit; } @@ -300,28 +300,22 @@ static int amqp_ssl_socket_close(void *base) { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - if (self) { + + if (self->ssl) { + SSL_shutdown(self->ssl); SSL_free(self->ssl); - amqp_os_socket_close(self->sockfd); - SSL_CTX_free(self->ctx); - free(self->buffer); - free(self); + self->ssl = NULL; } - destroy_openssl(); - return 0; -} -static int -amqp_ssl_socket_error(void *base) -{ - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - return self->internal_error; -} + if (-1 != self->sockfd) { + if (amqp_os_socket_close(self->sockfd)) { + return AMQP_STATUS_SOCKET_ERROR; + } -char * -amqp_ssl_error_string(AMQP_UNUSED int err) -{ - return strdup("A ssl socket error occurred."); + self->sockfd = -1; + } + + return AMQP_STATUS_OK; } static int @@ -331,37 +325,59 @@ amqp_ssl_socket_get_sockfd(void *base) return self->sockfd; } +static void +amqp_ssl_socket_delete(void *base) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + + if (self) { + amqp_ssl_socket_close(self); + + SSL_CTX_free(self->ctx); + free(self->buffer); + free(self); + } + destroy_openssl(); +} + static const struct amqp_socket_class_t amqp_ssl_socket_class = { amqp_ssl_socket_writev, /* writev */ amqp_ssl_socket_send, /* send */ amqp_ssl_socket_recv, /* recv */ amqp_ssl_socket_open, /* open */ amqp_ssl_socket_close, /* close */ - amqp_ssl_socket_error, /* error */ - amqp_ssl_socket_get_sockfd /* get_sockfd */ + amqp_ssl_socket_get_sockfd, /* get_sockfd */ + amqp_ssl_socket_delete /* delete */ }; amqp_socket_t * -amqp_ssl_socket_new(void) +amqp_ssl_socket_new(amqp_connection_state_t state) { struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self)); int status; if (!self) { - goto error; + return NULL; } + + self->sockfd = -1; + self->klass = &amqp_ssl_socket_class; + self->verify = 1; + status = initialize_openssl(); if (status) { goto error; } + self->ctx = SSL_CTX_new(SSLv23_client_method()); if (!self->ctx) { goto error; } - self->klass = &amqp_ssl_socket_class; - self->verify = 1; + + amqp_set_socket(state, (amqp_socket_t *)self); + return (amqp_socket_t *)self; error: - amqp_socket_close((amqp_socket_t *)self); + amqp_ssl_socket_delete((amqp_socket_t *)self); return NULL; } @@ -518,6 +534,7 @@ amqp_ssl_locking_callback(int mode, int n, static int initialize_openssl(void) { +#ifdef ENABLE_THREAD_SAFETY #ifdef _WIN32 /* No such thing as PTHREAD_INITIALIZE_MUTEX macro on Win32, so we use this */ if (NULL == openssl_init_mutex) { @@ -533,7 +550,6 @@ initialize_openssl(void) } #endif /* _WIN32 */ -#ifdef ENABLE_THREAD_SAFETY if (pthread_mutex_lock(&openssl_init_mutex)) { return -1; } diff --git a/librabbitmq/amqp_polarssl.c b/librabbitmq/amqp_polarssl.c index 770fdbe..bae3141 100644 --- a/librabbitmq/amqp_polarssl.c +++ b/librabbitmq/amqp_polarssl.c @@ -128,12 +128,20 @@ amqp_ssl_socket_recv(void *base, } static int -amqp_ssl_socket_open(void *base, const char *host, int port) +amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout) { int status; struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; self->last_error = 0; + if (timeout && (timeout->tv_sec != 0 || timeout->tv_usec != 0)) { + /* We don't support PolarSSL for now because it uses its own connect() wrapper + * It is not too hard to implement net_connect() with noblock support, + * but then we will have to maintain that piece of code and keep it synced with main PolarSSL code base + */ + return AMQP_STATUS_INVALID_PARAMETER; + } + status = net_connect(&self->sockfd, host, port); if (status) { /* This isn't quite right. We should probably translate between diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 7192283..afe182d 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -60,6 +60,14 @@ #endif #ifdef _WIN32 +# ifndef WINVER +/* WINVER 0x0502 is WinXP SP2+, Windows Server 2003 SP1+ + * See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa383745(v=vs.85).aspx#macros_for_conditional_declarations */ +# define WINVER 0x0502 +# endif +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif # include <Winsock2.h> #else # include <arpa/inet.h> @@ -175,6 +183,9 @@ struct amqp_connection_state_t_ { uint64_t next_recv_heartbeat; uint64_t next_send_heartbeat; + + amqp_table_t server_properties; + amqp_pool_t properties_pool; }; amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel); diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 441192a..79a7696 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -133,6 +133,39 @@ amqp_os_socket_setsockopt(int sock, int level, int optname, #endif } +static int +amqp_os_socket_setsockblock(int sock, int block) +{ + +#ifdef _WIN32 + int nonblock = !block; + if (NO_ERROR != ioctlsocket(sock, FIONBIO, &nonblock)) { + return AMQP_STATUS_SOCKET_ERROR; + } else { + return AMQP_STATUS_OK; + } +#else + long arg; + + if ((arg = fcntl(sock, F_GETFL, NULL)) < 0) { + return AMQP_STATUS_SOCKET_ERROR; + } + + if (block) { + arg &= (~O_NONBLOCK); + } else { + arg |= O_NONBLOCK; + } + + if (fcntl(sock, F_SETFL, arg) < 0) { + return AMQP_STATUS_SOCKET_ERROR; + } + + return AMQP_STATUS_OK; +#endif +} + + int amqp_os_socket_error(void) { @@ -182,25 +215,32 @@ amqp_socket_open(amqp_socket_t *self, const char *host, int port) { assert(self); assert(self->klass->open); - return self->klass->open(self, host, port); + return self->klass->open(self, host, port, NULL); } int -amqp_socket_close(amqp_socket_t *self) +amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout) { - if (self) { - assert(self->klass->close); - return self->klass->close(self); - } - return AMQP_STATUS_OK; + assert(self); + assert(self->klass->open); + return self->klass->open(self, host, port, timeout); } int -amqp_socket_error(amqp_socket_t *self) +amqp_socket_close(amqp_socket_t *self) { assert(self); - assert(self->klass->error); - return self->klass->error(self); + assert(self->klass->close); + return self->klass->close(self); +} + +void +amqp_socket_delete(amqp_socket_t *self) +{ + if (self) { + assert(self->klass->delete); + self->klass->delete(self); + } } int @@ -211,8 +251,16 @@ amqp_socket_get_sockfd(amqp_socket_t *self) return self->klass->get_sockfd(self); } -int amqp_open_socket(char const *hostname, - int portnumber) +int +amqp_open_socket(char const *hostname, + int portnumber) +{ + return amqp_open_socket_noblock(hostname, portnumber, NULL); +} + +int amqp_open_socket_noblock(char const *hostname, + int portnumber, + struct timeval *timeout) { struct addrinfo hint; struct addrinfo *address_list; @@ -221,6 +269,15 @@ int amqp_open_socket(char const *hostname, int sockfd = -1; int last_error = AMQP_STATUS_OK; int one = 1; /* for setsockopt */ + int res; + int timer_error; + amqp_timer_t timer; + + AMQP_INIT_TIMER(timer) + + if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) { + return AMQP_STATUS_INVALID_PARAMETER; + } last_error = amqp_os_socket_init(); if (AMQP_STATUS_OK != last_error) { @@ -241,31 +298,147 @@ int amqp_open_socket(char const *hostname, } for (addr = address_list; addr; addr = addr->ai_next) { + if (-1 != sockfd) { + amqp_os_socket_close(sockfd); + sockfd = -1; + } + sockfd = amqp_os_socket_socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + if (-1 == sockfd) { last_error = AMQP_STATUS_SOCKET_ERROR; continue; } + #ifdef SO_NOSIGPIPE if (0 != amqp_os_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) { last_error = AMQP_STATUS_SOCKET_ERROR; - amqp_os_socket_close(sockfd); continue; } #endif /* SO_NOSIGPIPE */ - if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) - || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { + + if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) { last_error = AMQP_STATUS_SOCKET_ERROR; - amqp_os_socket_close(sockfd); continue; + } + + if (timeout) { + /* Trying to connect with timeout, set socket to non-blocking mode */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } + + res = connect(sockfd, addr->ai_addr, addr->ai_addrlen); + + if (0 == res) { + /* Connected immediately, set to blocking mode again */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } + + last_error = AMQP_STATUS_OK; + break; + } + +#ifdef _WIN32 + if (WSAEWOULDBLOCK == amqp_os_socket_error()) { +#else + if (EINPROGRESS == amqp_os_socket_error()) { +#endif + + while(1) { + fd_set write_fd; + fd_set except_fd; + + FD_ZERO(&write_fd); + FD_SET(sockfd, &write_fd); + + FD_ZERO(&except_fd); + FD_SET(sockfd, &except_fd); + + timer_error = amqp_timer_update(&timer, timeout); + + if (timer_error < 0) { + last_error = timer_error; + break; + } + + /* Win32 requires except_fds to be passed to detect connection + * failure. Other platforms only need write_fds, passing except_fds + * seems to be harmless otherwise + */ + res = select(sockfd+1, NULL, &write_fd, &except_fd, &timer.tv); + + if (res > 0) { + int result; + socklen_t result_len = sizeof(result); + + if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) { + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + } + + if (result != 0) { + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + } + + /* socket is ready to be written to, set to blocking mode again */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } + + last_error = AMQP_STATUS_OK; + break; + } else if (0 == res) { + /* Timed out - return */ + last_error = AMQP_STATUS_TIMEOUT; + break; + } else if (errno == EINTR) { + /* Try again */ + continue; + } else { + /* Error connecting */ + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + } + } /* end while(1) loop */ + + if (last_error == AMQP_STATUS_OK + || last_error == AMQP_STATUS_TIMEOUT + || last_error == AMQP_STATUS_TIMER_FAILURE) { + /* Exit for loop on timer errors or when connection established */ + break; + } + + } else { + /* Error connecting */ + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + + } + } else { - last_error = AMQP_STATUS_OK; - break; + /* Connect in blocking mode */ + if (0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } else { + last_error = AMQP_STATUS_OK; + break; + } } } freeaddrinfo(address_list); if (last_error != AMQP_STATUS_OK) { + if (-1 != sockfd) { + amqp_os_socket_close(sockfd); + } + return last_error; } @@ -404,8 +577,9 @@ static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, stru if (0 == current_timestamp) { return AMQP_STATUS_TIMER_FAILURE; } - end_timestamp = start + timeout->tv_sec * AMQP_NS_PER_S + - timeout->tv_usec * AMQP_NS_PER_US; + end_timestamp = start + + (uint64_t)timeout->tv_sec * AMQP_NS_PER_S + + (uint64_t)timeout->tv_usec * AMQP_NS_PER_US; if (current_timestamp > end_timestamp) { return AMQP_STATUS_TIMEOUT; } @@ -526,9 +700,6 @@ static int wait_frame_inner(amqp_connection_state_t state, /* Complete frame was read. Return it. */ return AMQP_STATUS_OK; } - - /* Incomplete or ignored frame. Keep processing input. */ - assert(res != 0); } beginrecv: @@ -559,8 +730,8 @@ beginrecv: if (timeout) { if (0 == timeout_timestamp) { timeout_timestamp = current_timestamp + - timeout->tv_sec * AMQP_NS_PER_S + - timeout->tv_usec * AMQP_NS_PER_US; + (uint64_t)timeout->tv_sec * AMQP_NS_PER_S + + (uint64_t)timeout->tv_usec * AMQP_NS_PER_US; } if (current_timestamp > timeout_timestamp) { @@ -600,7 +771,6 @@ beginrecv: if (AMQP_STATUS_TIMEOUT == res) { if (next_timestamp == state->next_recv_heartbeat) { amqp_socket_close(state->socket); - state->socket = NULL; return AMQP_STATUS_HEARTBEAT_TIMEOUT; } else if (next_timestamp == timeout_timestamp) { return AMQP_STATUS_TIMEOUT; @@ -616,6 +786,109 @@ beginrecv: } } +static amqp_link_t * amqp_create_link_for_frame(amqp_connection_state_t state, amqp_frame_t *frame) +{ + amqp_link_t *link; + amqp_frame_t *frame_copy; + + amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(state, frame->channel); + + if (NULL == channel_pool) { + return NULL; + } + + link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t)); + frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t)); + + if (NULL == link || NULL == frame_copy) { + return NULL; + } + + *frame_copy = *frame; + link->data = frame_copy; + + return link; +} + +int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame) +{ + amqp_link_t *link = amqp_create_link_for_frame(state, frame); + if (NULL == link) { + return AMQP_STATUS_NO_MEMORY; + } + + if (NULL == state->first_queued_frame) { + state->first_queued_frame = link; + } else { + state->last_queued_frame->next = link; + } + + link->next = NULL; + state->last_queued_frame = link; + + return AMQP_STATUS_OK; +} + +int amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame) +{ + amqp_link_t *link = amqp_create_link_for_frame(state, frame); + if (NULL == link) { + return AMQP_STATUS_NO_MEMORY; + } + + if (NULL == state->first_queued_frame) { + state->first_queued_frame = link; + state->last_queued_frame = link; + link->next = NULL; + } else { + link->next = state->first_queued_frame; + state->first_queued_frame = link; + } + + return AMQP_STATUS_OK; +} + +int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_frame_t *decoded_frame) +{ + amqp_frame_t *frame_ptr; + amqp_link_t *cur; + int res; + + for (cur = state->first_queued_frame; NULL != cur; cur = cur->next) { + frame_ptr = cur->data; + + if (channel == frame_ptr->channel) { + state->first_queued_frame = cur->next; + if (NULL == state->first_queued_frame) { + state->last_queued_frame = NULL; + } + + *decoded_frame = *frame_ptr; + + return AMQP_STATUS_OK; + } + } + + while (1) { + res = wait_frame_inner(state, decoded_frame, NULL); + + if (AMQP_STATUS_OK != res) { + return res; + } + + if (channel == decoded_frame->channel) { + return AMQP_STATUS_OK; + } else { + res = amqp_queue_frame(state, decoded_frame); + if (res != AMQP_STATUS_OK) { + return res; + } + } + } +} + int amqp_simple_wait_frame(amqp_connection_state_t state, amqp_frame_t *decoded_frame) { @@ -654,7 +927,6 @@ int amqp_simple_wait_method(amqp_connection_state_t state, || frame.frame_type != AMQP_FRAME_METHOD || frame.payload.method.id != expected_method) { amqp_socket_close(state->socket); - state->socket = NULL; return AMQP_STATUS_WRONG_METHOD; } *output = frame.payload.method; @@ -859,6 +1131,13 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, goto error_res; } + res = amqp_table_clone(&s->server_properties, &state->server_properties, + &state->properties_pool); + + if (AMQP_STATUS_OK != res) { + goto error_res; + } + /* TODO: check that our chosen SASL mechanism is in the list of acceptable mechanisms. Or even let the application choose from the list! */ diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index 52815fe..bdeea63 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -28,11 +28,7 @@ #ifndef AMQP_SOCKET_H #define AMQP_SOCKET_H -#include "amqp.h" - -#ifdef _WIN32 -# include <WinSock2.h> -#endif +#include "amqp_private.h" AMQP_BEGIN_DECLS @@ -46,10 +42,10 @@ amqp_os_socket_close(int sockfd); typedef ssize_t (*amqp_socket_writev_fn)(void *, struct iovec *, int); typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t); typedef ssize_t (*amqp_socket_recv_fn)(void *, void *, size_t, int); -typedef int (*amqp_socket_open_fn)(void *, const char *, int); +typedef int (*amqp_socket_open_fn)(void *, const char *, int, struct timeval *); typedef int (*amqp_socket_close_fn)(void *); -typedef int (*amqp_socket_error_fn)(void *); typedef int (*amqp_socket_get_sockfd_fn)(void *); +typedef void (*amqp_socket_delete_fn)(void *); /** V-table for amqp_socket_t */ struct amqp_socket_class_t { @@ -58,8 +54,8 @@ struct amqp_socket_class_t { amqp_socket_recv_fn recv; amqp_socket_open_fn open; amqp_socket_close_fn close; - amqp_socket_error_fn error; amqp_socket_get_sockfd_fn get_sockfd; + amqp_socket_delete_fn delete; }; /** Abstract base class for amqp_socket_t */ @@ -78,6 +74,19 @@ struct iovec { }; #endif + +/** + * Set set the socket object for a connection + * + * This assigns a socket object to the connection, closing and deleting any + * existing socket + * + * \param [in] state The connection object to add the socket to + * \param [in] socket The socket object to assign to the connection + */ +void +amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket); + /** * Write to a socket. * @@ -127,6 +136,56 @@ amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len); ssize_t amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags); +/** + * Close a socket connection and free resources. + * + * This function closes a socket connection and releases any resources used by + * the object. After calling this function the specified socket should no + * longer be referenced. + * + * \param [in,out] self A socket object. + * + * \return Zero upon success, non-zero otherwise. + */ +int +amqp_socket_close(amqp_socket_t *self); + +/** + * Destroy a socket object + * + * \param [in] self the socket object to delete + */ +void +amqp_socket_delete(amqp_socket_t *self); + +/** + * Open a socket connection. + * + * This function opens a socket connection returned from amqp_tcp_socket_new() + * or amqp_ssl_socket_new(). This function should be called after setting + * socket options and prior to assigning the socket to an AMQP connection with + * amqp_set_socket(). + * + * \param [in] host Connect to this host. + * \param [in] port Connect on this remote port. + * \param [in] timeout Max allowed time to spent on opening. If NULL - run in blocking mode + * + * \return File descriptor upon success, non-zero negative error code otherwise. + */ +int +amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *timeout); + +int +amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame); + +int +amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame); + +int +amqp_simple_wait_frame_on_channel(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_frame_t *decoded_frame); + AMQP_END_DECLS #endif /* AMQP_SOCKET_H */ diff --git a/librabbitmq/amqp_ssl_socket.h b/librabbitmq/amqp_ssl_socket.h index 3bfce51..7f8ea64 100644 --- a/librabbitmq/amqp_ssl_socket.h +++ b/librabbitmq/amqp_ssl_socket.h @@ -1,4 +1,5 @@ /* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/** \file */ /* * Copyright 2012-2013 Michael Steinert * @@ -21,10 +22,6 @@ * DEALINGS IN THE SOFTWARE. */ -/** - * An SSL socket connection. - */ - #ifndef AMQP_SSL_H #define AMQP_SSL_H @@ -35,14 +32,28 @@ AMQP_BEGIN_DECLS /** * Create a new SSL/TLS socket object. * - * Call amqp_socket_close() to release socket resources. + * The returned socket object is owned by the \ref amqp_connection_state_t object + * and will be destroyed when the state object is destroyed or a new socket + * object is created. + * + * If the socket object creation fails, the \ref amqp_connection_state_t object + * will not be changed. + * + * The object returned by this function can be retrieved from the + * amqp_connection_state_t object later using the amqp_get_socket() function. + * + * Calling this function may result in the underlying SSL library being initialized. + * \sa amqp_set_initialize_ssl_library() * + * \param [in,out] state The connection object that owns the SSL/TLS socket * \return A new socket object or NULL if an error occurred. + * + * \since v0.4.0 */ AMQP_PUBLIC_FUNCTION amqp_socket_t * AMQP_CALL -amqp_ssl_socket_new(void); +amqp_ssl_socket_new(amqp_connection_state_t state); /** * Set the CA certificate. @@ -50,7 +61,10 @@ amqp_ssl_socket_new(void); * \param [in,out] self An SSL/TLS socket object. * \param [in] cacert Path to the CA cert file in PEM format. * - * \return Zero if successful, -1 otherwise. + * \return \ref AMQP_STATUS_OK on success an \ref amqp_status_enum value on + * failure. + * + * \since v0.4.0 */ AMQP_PUBLIC_FUNCTION int @@ -65,7 +79,10 @@ amqp_ssl_socket_set_cacert(amqp_socket_t *self, * \param [in] cert Path to the client certificate in PEM foramt. * \param [in] key Path to the client key in PEM format. * - * \return Zero if successful, -1 otherwise. + * \return \ref AMQP_STATUS_OK on success an \ref amqp_status_enum value on + * failure. + * + * \since v0.4.0 */ AMQP_PUBLIC_FUNCTION int @@ -82,7 +99,10 @@ amqp_ssl_socket_set_key(amqp_socket_t *self, * \param [in] key A buffer containing client key in PEM format. * \param [in] n The length of the buffer. * - * \return Zero if successful, -1 otherwise. + * \return \ref AMQP_STATUS_OK on success an \ref amqp_status_enum value on + * failure. + * + * \since v0.4.0 */ AMQP_PUBLIC_FUNCTION int @@ -101,6 +121,8 @@ amqp_ssl_socket_set_key_buffer(amqp_socket_t *self, * * \param [in,out] self An SSL/TLS socket object. * \param [in] verify Enable or disable peer verification. + * + * \since v0.4.0 */ AMQP_PUBLIC_FUNCTION void @@ -114,8 +136,8 @@ amqp_ssl_socket_set_verify(amqp_socket_t *self, * For SSL libraries that require a one-time initialization across * a whole program (e.g., OpenSSL) this sets whether or not rabbitmq-c * will initialize the SSL library when the first call to - * amqp_open_ssl_socket() is made. You should call this function with - * do_init = 0 if the underlying SSL library is intialized somewhere else + * amqp_open_socket() is made. You should call this function with + * do_init = 0 if the underlying SSL library is initialized somewhere else * the program. * * Failing to initialize or double initialization of the SSL library will @@ -124,12 +146,13 @@ amqp_ssl_socket_set_verify(amqp_socket_t *self, * By default rabbitmq-c will initialize the underlying SSL library * * NOTE: calling this function after the first socket has been opened with - * amqp_open_ssl_socket() will not have any effect. + * amqp_open_socket() will not have any effect. * - * \param [in] do_initalize If 0 rabbitmq-c will not initialize the SSL - * library, otherwise rabbitmq-c will initialize the - * SL library + * \param [in] do_initialize If 0 rabbitmq-c will not initialize the SSL + * library, otherwise rabbitmq-c will initialize the + * SSL library * + * \since v0.4.0 */ AMQP_PUBLIC_FUNCTION void diff --git a/librabbitmq/amqp_table.c b/librabbitmq/amqp_table.c index 6071ece..6fa6fd3 100644 --- a/librabbitmq/amqp_table.c +++ b/librabbitmq/amqp_table.c @@ -462,3 +462,153 @@ int amqp_table_entry_cmp(void const *entry1, void const *entry2) return p1->key.len - p2->key.len; } + +static int +amqp_field_value_clone(amqp_field_value_t *original, amqp_field_value_t *clone, amqp_pool_t *pool) +{ + int i; + int res; + clone->kind = original->kind; + + switch (clone->kind) { + case AMQP_FIELD_KIND_BOOLEAN: + clone->value.boolean = original->value.boolean; + break; + + case AMQP_FIELD_KIND_I8: + clone->value.i8 = original->value.i8; + break; + + case AMQP_FIELD_KIND_U8: + clone->value.u8 = original->value.u8; + break; + + case AMQP_FIELD_KIND_I16: + clone->value.i16 = original->value.i16; + break; + + case AMQP_FIELD_KIND_U16: + clone->value.u16 = original->value.u16; + break; + + case AMQP_FIELD_KIND_I32: + clone->value.i32 = original->value.i32; + break; + + case AMQP_FIELD_KIND_U32: + clone->value.u32 = original->value.u32; + break; + + case AMQP_FIELD_KIND_I64: + clone->value.i64 = original->value.i64; + break; + + case AMQP_FIELD_KIND_U64: + case AMQP_FIELD_KIND_TIMESTAMP: + clone->value.u64 = original->value.u64; + break; + + case AMQP_FIELD_KIND_F32: + clone->value.f32 = original->value.f32; + break; + + case AMQP_FIELD_KIND_F64: + clone->value.f64 = original->value.f64; + break; + + case AMQP_FIELD_KIND_DECIMAL: + clone->value.decimal = original->value.decimal; + break; + + case AMQP_FIELD_KIND_UTF8: + case AMQP_FIELD_KIND_BYTES: + if (0 == original->value.bytes.len) { + clone->value.bytes = amqp_empty_bytes; + } else { + amqp_pool_alloc_bytes(pool, original->value.bytes.len, &clone->value.bytes); + if (NULL == clone->value.bytes.bytes) { + return AMQP_STATUS_NO_MEMORY; + } + memcpy(clone->value.bytes.bytes, original->value.bytes.bytes, clone->value.bytes.len); + } + break; + + case AMQP_FIELD_KIND_ARRAY: + if (0 == original->value.array.entries) { + clone->value.array = amqp_empty_array; + } else { + clone->value.array.num_entries = original->value.array.num_entries; + clone->value.array.entries = amqp_pool_alloc(pool, clone->value.array.num_entries * sizeof(amqp_field_value_t)); + if (NULL == clone->value.array.entries) { + return AMQP_STATUS_NO_MEMORY; + } + + for (i = 0; i < clone->value.array.num_entries; ++i) { + res = amqp_field_value_clone(&original->value.array.entries[i], &clone->value.array.entries[i], pool); + if (AMQP_STATUS_OK != res) { + return res; + } + } + } + break; + + case AMQP_FIELD_KIND_TABLE: + return amqp_table_clone(&original->value.table, &clone->value.table, pool); + + case AMQP_FIELD_KIND_VOID: + break; + + default: + return AMQP_STATUS_INVALID_PARAMETER; + } + + return AMQP_STATUS_OK; +} + + +static int +amqp_table_entry_clone(amqp_table_entry_t *original, amqp_table_entry_t *clone, amqp_pool_t *pool) +{ + if (0 == original->key.len) { + return AMQP_STATUS_INVALID_PARAMETER; + } + + amqp_pool_alloc_bytes(pool, original->key.len, &clone->key); + if (NULL == clone->key.bytes) { + return AMQP_STATUS_NO_MEMORY; + } + + memcpy(clone->key.bytes, original->key.bytes, clone->key.len); + + return amqp_field_value_clone(&original->value, &clone->value, pool); +} + +int +amqp_table_clone(amqp_table_t *original, amqp_table_t *clone, amqp_pool_t *pool) +{ + int i; + int res; + clone->num_entries = original->num_entries; + if (0 == clone->num_entries) { + *clone = amqp_empty_table; + return AMQP_STATUS_OK; + } + + clone->entries = amqp_pool_alloc(pool, clone->num_entries * sizeof(amqp_table_entry_t)); + + if (NULL == clone->entries) { + return AMQP_STATUS_NO_MEMORY; + } + + for (i = 0; i < clone->num_entries; ++i) { + res = amqp_table_entry_clone(&original->entries[i], &clone->entries[i], pool); + if (AMQP_STATUS_OK != res) { + goto error_out1; + } + } + + return AMQP_STATUS_OK; + +error_out1: + return res; +} diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c index e43a596..ed38c06 100644 --- a/librabbitmq/amqp_tcp_socket.c +++ b/librabbitmq/amqp_tcp_socket.c @@ -55,7 +55,7 @@ amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags) #endif start: - res = send(self->sockfd, buf, len, flags); + res = send(self->sockfd, buf_left, len_left, flags); if (res < 0) { self->internal_error = amqp_os_socket_error(); @@ -220,10 +220,10 @@ start: } static int -amqp_tcp_socket_open(void *base, const char *host, int port) +amqp_tcp_socket_open(void *base, const char *host, int port, struct timeval *timeout) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - self->sockfd = amqp_open_socket(host, port); + self->sockfd = amqp_open_socket_noblock(host, port, timeout); if (0 > self->sockfd) { int err = self->sockfd; self->sockfd = -1; @@ -236,32 +236,34 @@ static int amqp_tcp_socket_close(void *base) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - int status = -1; - if (self) { - status = amqp_os_socket_close(self->sockfd); - free(self->buffer); - free(self); - } - if (0 == status) { - return AMQP_STATUS_OK; - } else { - return AMQP_STATUS_SOCKET_ERROR; + if (-1 != self->sockfd) { + if (amqp_os_socket_close(self->sockfd)) { + return AMQP_STATUS_SOCKET_ERROR; + } + self->sockfd = -1; } + + return AMQP_STATUS_OK; } static int -amqp_tcp_socket_error(AMQP_UNUSED void *base) +amqp_tcp_socket_get_sockfd(void *base) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - return self->internal_error; + return self->sockfd; } -static int -amqp_tcp_socket_get_sockfd(void *base) +static void +amqp_tcp_socket_delete(void *base) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - return self->sockfd; + + if (self) { + amqp_tcp_socket_close(self); + free(self->buffer); + free(self); + } } static const struct amqp_socket_class_t amqp_tcp_socket_class = { @@ -270,12 +272,12 @@ static const struct amqp_socket_class_t amqp_tcp_socket_class = { amqp_tcp_socket_recv, /* recv */ amqp_tcp_socket_open, /* open */ amqp_tcp_socket_close, /* close */ - amqp_tcp_socket_error, /* error */ - amqp_tcp_socket_get_sockfd /* get_sockfd */ + amqp_tcp_socket_get_sockfd, /* get_sockfd */ + amqp_tcp_socket_delete /* delete */ }; amqp_socket_t * -amqp_tcp_socket_new(void) +amqp_tcp_socket_new(amqp_connection_state_t state) { struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self)); if (!self) { @@ -283,6 +285,9 @@ amqp_tcp_socket_new(void) } self->klass = &amqp_tcp_socket_class; self->sockfd = -1; + + amqp_set_socket(state, (amqp_socket_t *)self); + return (amqp_socket_t *)self; } diff --git a/librabbitmq/amqp_tcp_socket.h b/librabbitmq/amqp_tcp_socket.h index 4c8ba54..95ed206 100644 --- a/librabbitmq/amqp_tcp_socket.h +++ b/librabbitmq/amqp_tcp_socket.h @@ -1,4 +1,5 @@ /* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/** \file */ /* * Copyright 2012-2013 Michael Steinert * @@ -35,14 +36,16 @@ AMQP_BEGIN_DECLS /** * Create a new TCP socket. * - * Call amqp_socket_close() to release socket resources. + * Call amqp_connection_close() to release socket resources. * * \return A new socket object or NULL if an error occurred. + * + * \since v0.4.0 */ AMQP_PUBLIC_FUNCTION amqp_socket_t * AMQP_CALL -amqp_tcp_socket_new(void); +amqp_tcp_socket_new(amqp_connection_state_t state); /** * Assign an open file descriptor to a socket object. @@ -53,11 +56,13 @@ amqp_tcp_socket_new(void); * * \param [in,out] self A TCP socket object. * \param [in] sockfd An open socket descriptor. + * + * \since v0.4.0 */ AMQP_PUBLIC_FUNCTION void AMQP_CALL -amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd); +amqp_tcp_socket_set_sockfd(amqp_socket_t *self, int sockfd); AMQP_END_DECLS diff --git a/librabbitmq/amqp_timer.c b/librabbitmq/amqp_timer.c index 7066358..95606b8 100644 --- a/librabbitmq/amqp_timer.c +++ b/librabbitmq/amqp_timer.c @@ -20,7 +20,9 @@ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * DEALINGS IN THE SOFTWARE. */ +#include "amqp.h" #include "amqp_timer.h" +#include <string.h> #if (defined(_WIN32) || defined(__WIN32__) || defined(WIN32)) # define AMQP_WIN_TIMER_API @@ -38,7 +40,7 @@ uint64_t amqp_get_monotonic_timestamp(void) { - static uint64_t NS_PER_COUNT = 0; + static double NS_PER_COUNT = 0; LARGE_INTEGER perf_count; if (0 == NS_PER_COUNT) { @@ -46,14 +48,14 @@ amqp_get_monotonic_timestamp(void) if (!QueryPerformanceFrequency(&perf_frequency)) { return 0; } - NS_PER_COUNT = AMQP_NS_PER_S / perf_frequency.QuadPart; + NS_PER_COUNT = (double)AMQP_NS_PER_S / perf_frequency.QuadPart; } if (!QueryPerformanceCounter(&perf_count)) { return 0; } - return perf_count.QuadPart * NS_PER_COUNT; + return (uint64_t)(perf_count.QuadPart * NS_PER_COUNT); } #endif /* AMQP_WIN_TIMER_API */ @@ -96,3 +98,38 @@ amqp_get_monotonic_timestamp(void) return ((uint64_t)tp.tv_sec * AMQP_NS_PER_S + (uint64_t)tp.tv_nsec); } #endif /* AMQP_POSIX_TIMER_API */ + +int +amqp_timer_update(amqp_timer_t *timer, struct timeval *timeout) +{ + if (0 == timer->current_timestamp) { + timer->current_timestamp = amqp_get_monotonic_timestamp(); + + if (0 == timer->current_timestamp) { + return AMQP_STATUS_TIMER_FAILURE; + } + + timer->timeout_timestamp = timer->current_timestamp + + (uint64_t)timeout->tv_sec * AMQP_NS_PER_S + + (uint64_t)timeout->tv_usec * AMQP_NS_PER_US; + + } else { + timer->current_timestamp = amqp_get_monotonic_timestamp(); + + if (0 == timer->current_timestamp) { + return AMQP_STATUS_TIMER_FAILURE; + } + } + + if (timer->current_timestamp > timer->timeout_timestamp) { + return AMQP_STATUS_TIMEOUT; + } + + timer->ns_until_next_timeout = timer->timeout_timestamp - timer->current_timestamp; + + memset(&timer->tv, 0, sizeof(struct timeval)); + timer->tv.tv_sec = timer->ns_until_next_timeout / AMQP_NS_PER_S; + timer->tv.tv_usec = (timer->ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US; + + return AMQP_STATUS_OK; +} diff --git a/librabbitmq/amqp_timer.h b/librabbitmq/amqp_timer.h index d1718af..8ac3de9 100644 --- a/librabbitmq/amqp_timer.h +++ b/librabbitmq/amqp_timer.h @@ -25,12 +25,39 @@ #include <stdint.h> +#ifdef _WIN32 +# ifndef WINVER +# define WINVER 0x0502 +# endif +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif +# include <Winsock2.h> +#else +# include <sys/time.h> +#endif + #define AMQP_NS_PER_S 1000000000 #define AMQP_NS_PER_US 1000 +#define AMQP_INIT_TIMER(structure) { \ + structure.current_timestamp = 0; \ + structure.timeout_timestamp = 0; \ +} + +typedef struct amqp_timer_t_ { + uint64_t current_timestamp; + uint64_t timeout_timestamp; + uint64_t ns_until_next_timeout; + struct timeval tv; +} amqp_timer_t; + /* Gets a monotonic timestamp in ns */ uint64_t amqp_get_monotonic_timestamp(void); -#endif /* AMQP_TIMER_H */ +/* Prepare timeout value and modify timer state based on timer state. */ +int +amqp_timer_update(amqp_timer_t *timer, struct timeval *timeout); +#endif /* AMQP_TIMER_H */ diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py index c8e6d7a..6611716 100644 --- a/librabbitmq/codegen.py +++ b/librabbitmq/codegen.py @@ -244,6 +244,10 @@ def methodApiPrototype(m): fn = m.fullName() info = apiMethodInfo.get(fn, []) + docs = "/**\n * %s\n *\n" % (fn) + docs += " * @param [in] state connection state\n" + docs += " * @param [in] channel the channel to do the RPC on\n" + args = [] for f in m.arguments: n = c_ize(f.name) @@ -254,8 +258,12 @@ def methodApiPrototype(m): args.append(typeFor(m.klass.spec, f).ctype) args.append(" ") args.append(n) + docs += " * @param [in] %s %s\n" % (n, n) + + docs += " * @returns %s_ok_t\n" % (fn) + docs += " */\n" - return "AMQP_PUBLIC_FUNCTION %s_ok_t * AMQP_CALL %s(amqp_connection_state_t state, amqp_channel_t channel%s)" % (fn, fn, ''.join(args)) + return "%sAMQP_PUBLIC_FUNCTION\n%s_ok_t *\nAMQP_CALL %s(amqp_connection_state_t state, amqp_channel_t channel%s)" % (docs, fn, fn, ''.join(args)) AmqpMethod.apiPrototype = methodApiPrototype @@ -545,11 +553,11 @@ int amqp_encode_properties(uint16_t class_id, def genHrl(spec): def fieldDeclList(fields): if fields: - return ''.join([" %s %s;\n" % (typeFor(spec, f).ctype, - c_ize(f.name)) + return ''.join([" %s %s; /**< %s */\n" % (typeFor(spec, f).ctype, + c_ize(f.name), f.name) for f in fields]) else: - return " char dummy; /* Dummy field to avoid empty struct */\n" + return " char dummy; /**< Dummy field to avoid empty struct */\n" def propDeclList(fields): return ''.join([" %s %s;\n" % (typeFor(spec, f).ctype, c_ize(f.name)) @@ -594,6 +602,7 @@ def genHrl(spec): * ***** END LICENSE BLOCK ***** */ +/** @file amqp_framing.h */ #ifndef AMQP_FRAMING_H #define AMQP_FRAMING_H @@ -601,33 +610,74 @@ def genHrl(spec): AMQP_BEGIN_DECLS """ - print "#define AMQP_PROTOCOL_VERSION_MAJOR %d" % (spec.major) - print "#define AMQP_PROTOCOL_VERSION_MINOR %d" % (spec.minor) - print "#define AMQP_PROTOCOL_VERSION_REVISION %d" % (spec.revision) - print "#define AMQP_PROTOCOL_PORT %d" % (spec.port) + print "#define AMQP_PROTOCOL_VERSION_MAJOR %d /**< AMQP protocol version major */" % (spec.major) + print "#define AMQP_PROTOCOL_VERSION_MINOR %d /**< AMQP protocol version minor */" % (spec.minor) + print "#define AMQP_PROTOCOL_VERSION_REVISION %d /**< AMQP protocol version revision */" % (spec.revision) + print "#define AMQP_PROTOCOL_PORT %d /**< Default AMQP Port */" % (spec.port) for (c,v,cls) in spec.constants: - print "#define %s %s" % (cConstantName(c), v) + print "#define %s %s /**< Constant: %s */" % (cConstantName(c), v, c) print print """/* Function prototypes. */ +/** + * Get constant name string from constant + * + * @param [in] constantNumber constant to get the name of + * @returns string describing the constant. String is managed by + * the library and should not be free()'d by the program + */ AMQP_PUBLIC_FUNCTION char const * AMQP_CALL amqp_constant_name(int constantNumber); +/** + * Checks to see if a constant is a hard error + * + * A hard error occurs when something severe enough + * happens that the connection must be closed. + * + * @param [in] constantNumber the error constant + * @returns true if its a hard error, false otherwise + */ AMQP_PUBLIC_FUNCTION amqp_boolean_t AMQP_CALL amqp_constant_is_hard_error(int constantNumber); +/** + * Get method name string from method number + * + * @param [in] methodNumber the method number + * @returns method name string. String is managed by the library + * and should not be freed()'d by the program + */ AMQP_PUBLIC_FUNCTION char const * AMQP_CALL amqp_method_name(amqp_method_number_t methodNumber); +/** + * Check whether a method has content + * + * A method that has content will receive the method frame + * a properties frame, then 1 to N body frames + * + * @param [in] methodNumber the method number + * @returns true if method has content, false otherwise + */ AMQP_PUBLIC_FUNCTION amqp_boolean_t AMQP_CALL amqp_method_has_content(amqp_method_number_t methodNumber); +/** + * Decodes a method from AMQP wireformat + * + * @param [in] methodNumber the method number for the decoded parameter + * @param [in] pool the memory pool to allocate the decoded method from + * @param [in] encoded the encoded byte string buffer + * @param [out] decoded pointer to the decoded method struct + * @returns 0 on success, an error code otherwise + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_decode_method(amqp_method_number_t methodNumber, @@ -635,6 +685,15 @@ AMQP_CALL amqp_decode_method(amqp_method_number_t methodNumber, amqp_bytes_t encoded, void **decoded); +/** + * Decodes a header frame properties structure from AMQP wireformat + * + * @param [in] class_id the class id for the decoded parameter + * @param [in] pool the memory pool to allocate the decoded properties from + * @param [in] encoded the encoded byte string buffer + * @param [out] decoded pointer to the decoded properties struct + * @returns 0 on success, an error code otherwise + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_decode_properties(uint16_t class_id, @@ -642,12 +701,32 @@ AMQP_CALL amqp_decode_properties(uint16_t class_id, amqp_bytes_t encoded, void **decoded); +/** + * Encodes a method structure in AMQP wireformat + * + * @param [in] methodNumber the method number for the decoded parameter + * @param [in] decoded the method structure (e.g., amqp_connection_start_t) + * @param [in] encoded an allocated byte buffer for the encoded method + * structure to be written to. If the buffer isn't large enough + * to hold the encoded method, an error code will be returned. + * @returns 0 on success, an error code otherwise. + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_encode_method(amqp_method_number_t methodNumber, void *decoded, amqp_bytes_t encoded); +/** + * Encodes a properties structure in AMQP wireformat + * + * @param [in] class_id the class id for the decoded parameter + * @param [in] decoded the properties structure (e.g., amqp_basic_properties_t) + * @param [in] encoded an allocated byte buffer for the encoded properties to written to. + * If the buffer isn't large enough to hold the encoded method, an + * an error code will be returned + * @returns 0 on success, an error code otherwise. + */ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_encode_properties(uint16_t class_id, @@ -658,19 +737,21 @@ AMQP_CALL amqp_encode_properties(uint16_t class_id, print "/* Method field records. */\n" for m in methods: methodid = m.klass.index << 16 | m.index - print "#define %s ((amqp_method_number_t) 0x%.08X) /* %d, %d; %d */" % \ + print "#define %s ((amqp_method_number_t) 0x%.08X) /**< %s.%s method id @internal %d, %d; %d */" % \ (m.defName(), methodid, + m.klass.name, + m.name, m.klass.index, m.index, methodid) - print "typedef struct %s_ {\n%s} %s;\n" % \ - (m.structName(), fieldDeclList(m.arguments), m.structName()) + print "/** %s.%s method fields */\ntypedef struct %s_ {\n%s} %s;\n" % \ + (m.klass.name, m.name, m.structName(), fieldDeclList(m.arguments), m.structName()) print "/* Class property records. */" for c in spec.allClasses(): - print "#define %s (0x%.04X) /* %d */" % \ - (cConstantName(c.name + "_class"), c.index, c.index) + print "#define %s (0x%.04X) /**< %s class id @internal %d */" % \ + (cConstantName(c.name + "_class"), c.index, c.name, c.index) index = 0 for f in c.fields: if index % 16 == 15: @@ -678,10 +759,11 @@ AMQP_CALL amqp_encode_properties(uint16_t class_id, shortnum = index // 16 partialindex = 15 - (index % 16) bitindex = shortnum * 16 + partialindex - print '#define %s (1 << %d)' % (cFlagName(c, f), bitindex) + print '#define %s (1 << %d) /**< %s.%s property flag */' % (cFlagName(c, f), bitindex, c.name, f.name) index = index + 1 - print "typedef struct %s_ {\n amqp_flags_t _flags;\n%s} %s;\n" % \ - (c.structName(), + print "/** %s class properties */\ntypedef struct %s_ {\n amqp_flags_t _flags; /**< bit-mask of set fields */\n%s} %s;\n" % \ + (c.name, + c.structName(), fieldDeclList(c.fields), c.structName()) diff --git a/librabbitmq/win32/threads.h b/librabbitmq/win32/threads.h index 668b2a3..d1de854 100644 --- a/librabbitmq/win32/threads.h +++ b/librabbitmq/win32/threads.h @@ -24,6 +24,12 @@ #ifndef AMQP_THREAD_H #define AMQP_THREAD_H +#ifndef WINVER +# define WINVER 0x0502 +#endif +#ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +#endif #include <Windows.h> typedef CRITICAL_SECTION *pthread_mutex_t; |