summaryrefslogtreecommitdiff
path: root/librabbitmq
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2014-04-14 17:29:03 +0100
committerAsk Solem <ask@celeryproject.org>2014-04-14 17:29:03 +0100
commitbe3000b4c84d7503f5ef4067de44ff16d060d158 (patch)
treefecacb0f149b067202c443b59aad3cc027a0ff1c /librabbitmq
parentdcb8edaccd6e164d624edfab0f3120d96f707f0a (diff)
parentfe844e41ffad5691607982cbfe4054aacdcb81e0 (diff)
downloadrabbitmq-c-github-ask-be3000b4c84d7503f5ef4067de44ff16d060d158.tar.gz
Merge branch 'alanxz/master'
Conflicts: Makefile.am codegen
Diffstat (limited to 'librabbitmq')
-rw-r--r--librabbitmq/CMakeLists.txt17
-rw-r--r--librabbitmq/amqp.h1932
-rw-r--r--librabbitmq/amqp_api.c11
-rw-r--r--librabbitmq/amqp_connection.c21
-rw-r--r--librabbitmq/amqp_consumer.c301
-rw-r--r--librabbitmq/amqp_cyassl.c4
-rw-r--r--librabbitmq/amqp_framing.c29
-rw-r--r--librabbitmq/amqp_framing.h10
-rw-r--r--librabbitmq/amqp_gnutls.c4
-rw-r--r--librabbitmq/amqp_hostcheck.c201
-rw-r--r--librabbitmq/amqp_hostcheck.h36
-rw-r--r--librabbitmq/amqp_mem.c7
-rw-r--r--librabbitmq/amqp_openssl.c86
-rw-r--r--librabbitmq/amqp_polarssl.c10
-rw-r--r--librabbitmq/amqp_private.h11
-rw-r--r--librabbitmq/amqp_socket.c333
-rw-r--r--librabbitmq/amqp_socket.h75
-rw-r--r--librabbitmq/amqp_ssl_socket.h53
-rw-r--r--librabbitmq/amqp_table.c150
-rw-r--r--librabbitmq/amqp_tcp_socket.c47
-rw-r--r--librabbitmq/amqp_tcp_socket.h11
-rw-r--r--librabbitmq/amqp_timer.c43
-rw-r--r--librabbitmq/amqp_timer.h29
-rw-r--r--librabbitmq/codegen.py116
-rw-r--r--librabbitmq/win32/threads.h6
25 files changed, 3246 insertions, 297 deletions
diff --git a/librabbitmq/CMakeLists.txt b/librabbitmq/CMakeLists.txt
index 8ab8bff..d3623b3 100644
--- a/librabbitmq/CMakeLists.txt
+++ b/librabbitmq/CMakeLists.txt
@@ -81,7 +81,11 @@ if (ENABLE_SSL_SUPPORT)
set(AMQP_SSL_SOCKET_H_PATH amqp_ssl_socket.h)
if (SSL_ENGINE STREQUAL "OpenSSL")
- set(AMQP_SSL_SRCS ${AMQP_SSL_SOCKET_H_PATH} amqp_openssl.c)
+ set(AMQP_SSL_SRCS ${AMQP_SSL_SOCKET_H_PATH}
+ amqp_openssl.c
+ amqp_hostcheck.c
+ amqp_hostcheck.h
+ )
include_directories(${OPENSSL_INCLUDE_DIR})
set(AMQP_SSL_LIBS ${OPENSSL_LIBRARIES})
@@ -121,6 +125,7 @@ set(RABBITMQ_SOURCES
amqp_api.c amqp.h amqp_connection.c amqp_mem.c amqp_private.h amqp_socket.c
amqp_table.c amqp_url.c amqp_socket.h amqp_tcp_socket.c amqp_tcp_socket.h
amqp_timer.c amqp_timer.h
+ amqp_consumer.c
${AMQP_SSL_SRCS}
)
@@ -142,9 +147,9 @@ if (BUILD_SHARED_LIBS)
endif (WIN32)
install(TARGETS rabbitmq
- RUNTIME DESTINATION bin
- LIBRARY DESTINATION lib
- ARCHIVE DESTINATION lib
+ RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
+ LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
+ ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
)
install_pdb(rabbitmq)
@@ -164,7 +169,7 @@ if (BUILD_STATIC_LIBS)
endif (WIN32)
install(TARGETS rabbitmq-static
- ARCHIVE DESTINATION lib
+ ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
)
install_pdb(rabbitmq-static)
@@ -179,7 +184,7 @@ install(FILES
amqp_tcp_socket.h
${AMQP_SSL_SOCKET_H_PATH}
${STDINT_H_INSTALL_FILE}
- DESTINATION include
+ DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}
)
set(RMQ_LIBRARY_TARGET ${RMQ_LIBRARY_TARGET} PARENT_SCOPE)
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 7f479c8..82b7c02 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -1,4 +1,5 @@
/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/** \file */
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
@@ -37,6 +38,8 @@
#ifndef AMQP_H
#define AMQP_H
+/** \cond HIDE_FROM_DOXYGEN */
+
#ifdef __cplusplus
#define AMQP_BEGIN_DECLS extern "C" {
#define AMQP_END_DECLS }
@@ -45,11 +48,13 @@
#define AMQP_END_DECLS
#endif
-/** Important API Decorators
- * AMQP_PUBLIC_FUNCTION - Declares an exportable function
- * AMQP_PUBLIC_VARIABLE - Declares an exportable variable
- * AMQP_CALL - Declares the calling convention
- */
+/*
+ * \internal
+ * Important API decorators:
+ * AMQP_PUBLIC_FUNCTION - a public API function
+ * AMQP_PUBLIC_VARIABLE - a public API external variable
+ * AMQP_CALL - calling convension (used on Win32)
+ */
#if defined(_WIN32) && defined(_MSC_VER)
# if defined(AMQP_BUILD) && !defined(AMQP_STATIC)
@@ -149,6 +154,12 @@ typedef _W64 int ssize_t;
#endif
#endif
+#if defined(_WIN32) && defined(__MINGW32__)
+#include <sys/types.h>
+#endif
+
+/** \endcond */
+
#include <stddef.h>
#include <stdint.h>
@@ -156,29 +167,257 @@ struct timeval;
AMQP_BEGIN_DECLS
+/**
+ * \def AMQP_VERSION_MAJOR
+ *
+ * Major library version number compile-time constant
+ *
+ * The major version is incremented when backwards incompatible API changes
+ * are made.
+ *
+ * \sa AMQP_VERSION, AMQP_VERSION_STRING
+ *
+ * \since v0.4.0
+ */
+
+/**
+ * \def AMQP_VERSION_MINOR
+ *
+ * Minor library version number compile-time constant
+ *
+ * The minor version is incremented when new APIs are added. Existing APIs
+ * are left alone.
+ *
+ * \sa AMQP_VERSION, AMQP_VERSION_STRING
+ *
+ * \since v0.4.0
+ */
+
+/**
+ * \def AMQP_VERSION_PATCH
+ *
+ * Patch library version number compile-time constant
+ *
+ * The patch version is incremented when library code changes, but the API
+ * is not changed.
+ *
+ * \sa AMQP_VERSION, AMQP_VERSION_STRING
+ *
+ * \since v0.4.0
+ */
+
+/**
+ * \def AMQP_VERSION_IS_RELEASE
+ *
+ * Version constant set to 1 for tagged release, 0 otherwise
+ *
+ * NOTE: versions that are not tagged releases are not guaranteed to be API/ABI
+ * compatible with older releases, and may change commit-to-commit.
+ *
+ * \sa AMQP_VERSION, AMQP_VERSION_STRING
+ *
+ * \since v0.4.0
+ */
+/*
+ * Developer note: when changing these, be sure to update SOVERSION constants
+ * in CMakeLists.txt and configure.ac
+ */
+
+#define AMQP_VERSION_MAJOR 0
+#define AMQP_VERSION_MINOR 5
+#define AMQP_VERSION_PATCH 1
+#define AMQP_VERSION_IS_RELEASE 0
+
+
+/**
+ * \def AMQP_VERSION
+ *
+ * Packed version number
+ *
+ * AMQP_VERSION is a 4-byte unsigned integer with the most significant byte
+ * set to AMQP_VERSION_MAJOR, the second most significant byte set to
+ * AMQP_VERSION_MINOR, third most significant byte set to AMQP_VERSION_PATCH,
+ * and the lowest byte set to AMQP_VERSION_IS_RELEASE.
+ *
+ * For example version 2.3.4 which is released version would be encoded as
+ * 0x02030401
+ *
+ * \sa amqp_version_number() AMQP_VERSION_MAJOR, AMQP_VERSION_MINOR,
+ * AMQP_VERSION_PATCH, AMQP_VERSION_IS_RELEASE
+ *
+ * \since v0.4.0
+ */
+#define AMQP_VERSION ((AMQP_VERSION_MAJOR << 24) | \
+ (AMQP_VERSION_MINOR << 16) | \
+ (AMQP_VERSION_PATCH << 8) | \
+ (AMQP_VERSION_IS_RELEASE))
+
+/** \cond HIDE_FROM_DOXYGEN */
+#define AMQ_STRINGIFY(s) AMQ_STRINGIFY_HELPER(s)
+#define AMQ_STRINGIFY_HELPER(s) #s
+
+#define AMQ_VERSION_STRING AMQ_STRINGIFY(AMQP_VERSION_MAJOR) "." \
+ AMQ_STRINGIFY(AMQP_VERSION_MINOR) "." \
+ AMQ_STRINGIFY(AMQP_VERSION_PATCH)
+/** \endcond */
+
+/**
+ * \def AMQP_VERSION_STRING
+ *
+ * Version string compile-time constant
+ *
+ * Non-released versions of the library will have "-pre" appended to the
+ * version string
+ *
+ * \sa amqp_version()
+ *
+ * \since v0.4.0
+ */
+#if AMQP_VERSION_IS_RELEASE
+# define AMQP_VERSION_STRING AMQ_VERSION_STRING
+#else
+# define AMQP_VERSION_STRING AMQ_VERSION_STRING "-pre"
+#endif
+
+
+/**
+ * Returns the rabbitmq-c version as a packed integer.
+ *
+ * See \ref AMQP_VERSION
+ *
+ * \return packed 32-bit integer representing version of library at runtime
+ *
+ * \sa AMQP_VERSION, amqp_version()
+ *
+ * \since v0.4.0
+ */
+AMQP_PUBLIC_FUNCTION
+uint32_t
+AMQP_CALL amqp_version_number(void);
+
+/**
+ * Returns the rabbitmq-c version as a string.
+ *
+ * See \ref AMQP_VERSION_STRING
+ *
+ * \return a statically allocated string describing the version of rabbitmq-c.
+ *
+ * \sa amqp_version_number(), AMQP_VERSION_STRING, AMQP_VERSION
+ *
+ * \since v0.1
+ */
+AMQP_PUBLIC_FUNCTION
+char const *
+AMQP_CALL amqp_version(void);
+
+/**
+ * \def AMQP_DEFAULT_FRAME_SIZE
+ *
+ * Default frame size (128Kb)
+ *
+ * \sa amqp_login(), amqp_login_with_properties()
+ *
+ * \since v0.4.0
+ */
+#define AMQP_DEFAULT_FRAME_SIZE 131072
+
+/**
+ * \def AMQP_DEFAULT_MAX_CHANNELS
+ *
+ * Default maximum number of channels (0, no limit)
+ *
+ * \sa amqp_login(), amqp_login_with_properties()
+ *
+ * \since v0.4.0
+ */
+#define AMQP_DEFAULT_MAX_CHANNELS 0
+
+/**
+ * \def AMQP_DEFAULT_HEARTBEAT
+ *
+ * Default heartbeat interval (0, heartbeat disabled)
+ *
+ * \sa amqp_login(), amqp_login_with_properties()
+ *
+ * \since v0.4.0
+ */
+#define AMQP_DEFAULT_HEARTBEAT 0
+
+/**
+ * boolean type 0 = false, true otherwise
+ *
+ * \since v0.1
+ */
typedef int amqp_boolean_t;
+
+/**
+ * Method number
+ *
+ * \since v0.1
+ */
typedef uint32_t amqp_method_number_t;
+
+/**
+ * Bitmask for flags
+ *
+ * \since v0.1
+ */
typedef uint32_t amqp_flags_t;
+
+/**
+ * Channel type
+ *
+ * \since v0.1
+ */
typedef uint16_t amqp_channel_t;
+/**
+ * Buffer descriptor
+ *
+ * \since v0.1
+ */
typedef struct amqp_bytes_t_ {
- size_t len;
- void *bytes;
+ size_t len; /**< length of the buffer in bytes */
+ void *bytes; /**< pointer to the beginning of the buffer */
} amqp_bytes_t;
+/**
+ * Decimal data type
+ *
+ * \since v0.1
+ */
typedef struct amqp_decimal_t_ {
- uint8_t decimals;
- uint32_t value;
+ uint8_t decimals; /**< the location of the decimal point */
+ uint32_t value; /**< the value before the decimal point is applied */
} amqp_decimal_t;
+/**
+ * AMQP field table
+ *
+ * An AMQP field table is a set of key-value pairs.
+ * A key is a UTF-8 encoded string up to 128 bytes long, and are not null
+ * terminated.
+ * A value can be one of several different datatypes. \sa amqp_field_value_kind_t
+ *
+ * \sa amqp_table_entry_t
+ *
+ * \since v0.1
+ */
typedef struct amqp_table_t_ {
- int num_entries;
- struct amqp_table_entry_t_ *entries;
+ int num_entries; /**< length of entries array */
+ struct amqp_table_entry_t_ *entries; /**< an array of table entries */
} amqp_table_t;
+/**
+ * An AMQP Field Array
+ *
+ * A repeated set of field values, all must be of the same type
+ *
+ * \since v0.1
+ */
typedef struct amqp_array_t_ {
- int num_entries;
- struct amqp_field_value_t_ *entries;
+ int num_entries; /**< Number of entries in the table */
+ struct amqp_field_value_t_ *entries; /**< linked list of field values */
} amqp_array_t;
/*
@@ -220,214 +459,615 @@ of the other two, so this will work for both 0-8 and 0-9-1 branches of
the code.
*/
+/**
+ * A field table value
+ *
+ * \since v0.1
+ */
typedef struct amqp_field_value_t_ {
- uint8_t kind;
+ uint8_t kind; /**< the type of the entry /sa amqp_field_value_kind_t */
union {
- amqp_boolean_t boolean;
- int8_t i8;
- uint8_t u8;
- int16_t i16;
- uint16_t u16;
- int32_t i32;
- uint32_t u32;
- int64_t i64;
- uint64_t u64;
- float f32;
- double f64;
- amqp_decimal_t decimal;
- amqp_bytes_t bytes;
- amqp_table_t table;
- amqp_array_t array;
- } value;
+ amqp_boolean_t boolean; /**< boolean type AMQP_FIELD_KIND_BOOLEAN */
+ int8_t i8; /**< int8_t type AMQP_FIELD_KIND_I8 */
+ uint8_t u8; /**< uint8_t type AMQP_FIELD_KIND_U8 */
+ int16_t i16; /**< int16_t type AMQP_FIELD_KIND_I16 */
+ uint16_t u16; /**< uint16_t type AMQP_FIELD_KIND_U16 */
+ int32_t i32; /**< int32_t type AMQP_FIELD_KIND_I32 */
+ uint32_t u32; /**< uint32_t type AMQP_FIELD_KIND_U32 */
+ int64_t i64; /**< int64_t type AMQP_FIELD_KIND_I64 */
+ uint64_t u64; /**< uint64_t type AMQP_FIELD_KIND_U64, AMQP_FIELD_KIND_TIMESTAMP */
+ float f32; /**< float type AMQP_FIELD_KIND_F32 */
+ double f64; /**< double type AMQP_FIELD_KIND_F64 */
+ amqp_decimal_t decimal; /**< amqp_decimal_t AMQP_FIELD_KIND_DECIMAL */
+ amqp_bytes_t bytes; /**< amqp_bytes_t type AMQP_FIELD_KIND_UTF8, AMQP_FIELD_KIND_BYTES */
+ amqp_table_t table; /**< amqp_table_t type AMQP_FIELD_KIND_TABLE */
+ amqp_array_t array; /**< amqp_array_t type AMQP_FIELD_KIND_ARRAY */
+ } value; /**< a union of the value */
} amqp_field_value_t;
+/**
+ * An entry in a field-table
+ *
+ * \sa amqp_table_encode(), amqp_table_decode(), amqp_table_clone()
+ *
+ * \since v0.1
+ */
typedef struct amqp_table_entry_t_ {
- amqp_bytes_t key;
- amqp_field_value_t value;
+ amqp_bytes_t key; /**< the table entry key. Its a null-terminated UTF-8 string,
+ * with a maximum size of 128 bytes */
+ amqp_field_value_t value; /**< the table entry values */
} amqp_table_entry_t;
+/**
+ * Field value types
+ *
+ * \since v0.1
+ */
typedef enum {
- AMQP_FIELD_KIND_BOOLEAN = 't',
- AMQP_FIELD_KIND_I8 = 'b',
- AMQP_FIELD_KIND_U8 = 'B',
- AMQP_FIELD_KIND_I16 = 's',
- AMQP_FIELD_KIND_U16 = 'u',
- AMQP_FIELD_KIND_I32 = 'I',
- AMQP_FIELD_KIND_U32 = 'i',
- AMQP_FIELD_KIND_I64 = 'l',
- AMQP_FIELD_KIND_U64 = 'L',
- AMQP_FIELD_KIND_F32 = 'f',
- AMQP_FIELD_KIND_F64 = 'd',
- AMQP_FIELD_KIND_DECIMAL = 'D',
- AMQP_FIELD_KIND_UTF8 = 'S',
- AMQP_FIELD_KIND_ARRAY = 'A',
- AMQP_FIELD_KIND_TIMESTAMP = 'T',
- AMQP_FIELD_KIND_TABLE = 'F',
- AMQP_FIELD_KIND_VOID = 'V',
- AMQP_FIELD_KIND_BYTES = 'x'
+ AMQP_FIELD_KIND_BOOLEAN = 't', /**< boolean type. 0 = false, 1 = true @see amqp_boolean_t */
+ AMQP_FIELD_KIND_I8 = 'b', /**< 8-bit signed integer, datatype: int8_t */
+ AMQP_FIELD_KIND_U8 = 'B', /**< 8-bit unsigned integer, datatype: uint8_t */
+ AMQP_FIELD_KIND_I16 = 's', /**< 16-bit signed integer, datatype: int16_t */
+ AMQP_FIELD_KIND_U16 = 'u', /**< 16-bit unsigned integer, datatype: uint16_t */
+ AMQP_FIELD_KIND_I32 = 'I', /**< 32-bit signed integer, datatype: int32_t */
+ AMQP_FIELD_KIND_U32 = 'i', /**< 32-bit unsigned integer, datatype: uint32_t */
+ AMQP_FIELD_KIND_I64 = 'l', /**< 64-bit signed integer, datatype: int64_t */
+ AMQP_FIELD_KIND_U64 = 'L', /**< 64-bit unsigned integer, datatype: uint64_t */
+ AMQP_FIELD_KIND_F32 = 'f', /**< single-precision floating point value, datatype: float */
+ AMQP_FIELD_KIND_F64 = 'd', /**< double-precision floating point value, datatype: double */
+ AMQP_FIELD_KIND_DECIMAL = 'D', /**< amqp-decimal value, datatype: amqp_decimal_t */
+ AMQP_FIELD_KIND_UTF8 = 'S', /**< UTF-8 null-terminated character string, datatype: amqp_bytes_t */
+ AMQP_FIELD_KIND_ARRAY = 'A', /**< field array (repeated values of another datatype. datatype: amqp_array_t */
+ AMQP_FIELD_KIND_TIMESTAMP = 'T',/**< 64-bit timestamp. datatype uint64_t */
+ AMQP_FIELD_KIND_TABLE = 'F', /**< field table. encapsulates a table inside a table entry. datatype: amqp_table_t */
+ AMQP_FIELD_KIND_VOID = 'V', /**< empty entry */
+ AMQP_FIELD_KIND_BYTES = 'x' /**< unformatted byte string, datatype: amqp_bytes_t */
} amqp_field_value_kind_t;
+/**
+ * A list of allocation blocks
+ *
+ * \since v0.1
+ */
typedef struct amqp_pool_blocklist_t_ {
- int num_blocks;
- void **blocklist;
+ int num_blocks; /**< Number of blocks in the block list */
+ void **blocklist; /**< Array of memory blocks */
} amqp_pool_blocklist_t;
+/**
+ * A memory pool
+ *
+ * \since v0.1
+ */
typedef struct amqp_pool_t_ {
- size_t pagesize;
-
- amqp_pool_blocklist_t pages;
- amqp_pool_blocklist_t large_blocks;
-
- int next_page;
- char *alloc_block;
- size_t alloc_used;
+ size_t pagesize; /**< the size of the page in bytes.
+ * allocations less than or equal to this size are
+ * allocated in the pages block list
+ * allocations greater than this are allocated in their
+ * own block in the large_blocks block list */
+
+ amqp_pool_blocklist_t pages; /**< blocks that are the size of pagesize */
+ amqp_pool_blocklist_t large_blocks; /**< allocations larger than the pagesize */
+
+ int next_page; /**< an index to the next unused page block */
+ char *alloc_block; /**< pointer to the current allocation block */
+ size_t alloc_used; /**< number of bytes in the current allocation block that has been used */
} amqp_pool_t;
+/**
+ * An amqp method
+ *
+ * \since v0.1
+ */
typedef struct amqp_method_t_ {
- amqp_method_number_t id;
- void *decoded;
+ amqp_method_number_t id; /**< the method id number */
+ void *decoded; /**< pointer to the decoded method,
+ * cast to the appropriate type to use */
} amqp_method_t;
+/**
+ * An AMQP frame
+ *
+ * \since v0.1
+ */
typedef struct amqp_frame_t_ {
- uint8_t frame_type; /* 0 means no event */
- amqp_channel_t channel;
+ uint8_t frame_type; /**< frame type. The types:
+ * - AMQP_FRAME_METHOD - use the method union member
+ * - AMQP_FRAME_HEADER - use the properties union member
+ * - AMQP_FRAME_BODY - use the body_fragment union member
+ */
+ amqp_channel_t channel; /**< the channel the frame was received on */
union {
- amqp_method_t method;
+ amqp_method_t method; /**< a method, use if frame_type == AMQP_FRAME_METHOD */
struct {
- uint16_t class_id;
- uint64_t body_size;
- void *decoded;
- amqp_bytes_t raw;
- } properties;
- amqp_bytes_t body_fragment;
+ uint16_t class_id; /**< the class for the properties */
+ uint64_t body_size; /**< size of the body in bytes */
+ void *decoded; /**< the decoded properties */
+ amqp_bytes_t raw; /**< amqp-encoded properties structure */
+ } properties; /**< message header, a.k.a., properties,
+ use if frame_type == AMQP_FRAME_HEADER */
+ amqp_bytes_t body_fragment; /**< a body fragment, use if frame_type == AMQP_FRAME_BODY */
struct {
- uint8_t transport_high;
- uint8_t transport_low;
- uint8_t protocol_version_major;
- uint8_t protocol_version_minor;
- } protocol_header;
- } payload;
+ uint8_t transport_high; /**< @internal first byte of handshake */
+ uint8_t transport_low; /**< @internal second byte of handshake */
+ uint8_t protocol_version_major; /**< @internal third byte of handshake */
+ uint8_t protocol_version_minor; /**< @internal fourth byte of handshake */
+ } protocol_header; /**< Used only when doing the initial handshake with the broker,
+ don't use otherwise */
+ } payload; /**< the payload of the frame */
} amqp_frame_t;
+/**
+ * Response type
+ *
+ * \since v0.1
+ */
typedef enum amqp_response_type_enum_ {
- AMQP_RESPONSE_NONE = 0,
- AMQP_RESPONSE_NORMAL,
- AMQP_RESPONSE_LIBRARY_EXCEPTION,
- AMQP_RESPONSE_SERVER_EXCEPTION
+ AMQP_RESPONSE_NONE = 0, /**< the library got an EOF from the socket */
+ AMQP_RESPONSE_NORMAL, /**< response normal, the RPC completed successfully */
+ AMQP_RESPONSE_LIBRARY_EXCEPTION,/**< library error, an error occurred in the library, examine the library_error */
+ AMQP_RESPONSE_SERVER_EXCEPTION /**< server exception, the broker returned an error, check replay */
} amqp_response_type_enum;
+/**
+ * Reply from a RPC method on the broker
+ *
+ * \since v0.1
+ */
typedef struct amqp_rpc_reply_t_ {
- amqp_response_type_enum reply_type;
- amqp_method_t reply;
- int library_error; /* if AMQP_RESPONSE_LIBRARY_EXCEPTION, then 0 here means socket EOF */
+ amqp_response_type_enum reply_type; /**< the reply type:
+ * - AMQP_RESPONSE_NORMAL - the RPC completed successfully
+ * - AMQP_RESPONSE_SERVER_EXCEPTION - the broker returned
+ * an exception, check the reply field
+ * - AMQP_RESPONSE_LIBRARY_EXCEPTION - the library
+ * encountered an error, check the library_error field
+ */
+ amqp_method_t reply; /**< in case of AMQP_RESPONSE_SERVER_EXCEPTION this
+ * field will be set to the method returned from the broker */
+ int library_error; /**< in case of AMQP_RESPONSE_LIBRARY_EXCEPTION this
+ * field will be set to an error code. An error
+ * string can be retrieved using amqp_error_string */
} amqp_rpc_reply_t;
+/**
+ * SASL method type
+ *
+ * \since v0.1
+ */
typedef enum amqp_sasl_method_enum_ {
- AMQP_SASL_METHOD_PLAIN = 0
+ AMQP_SASL_METHOD_PLAIN = 0 /**< the PLAIN SASL method for authentication to the broker */
} amqp_sasl_method_enum;
-/* Opaque struct. */
+/**
+ * connection state object
+ *
+ * \since v0.1
+ */
typedef struct amqp_connection_state_t_ *amqp_connection_state_t;
+/**
+ * Socket object
+ *
+ * \since v0.4.0
+ */
typedef struct amqp_socket_t_ amqp_socket_t;
+/**
+ * Status codes
+ *
+ * \since v0.4.0
+ */
typedef enum amqp_status_enum_
{
- AMQP_STATUS_OK = 0x0,
- AMQP_STATUS_NO_MEMORY = -0x0001,
- AMQP_STATUS_BAD_AMQP_DATA = -0x0002,
- AMQP_STATUS_UNKNOWN_CLASS = -0x0003,
- AMQP_STATUS_UNKNOWN_METHOD = -0x0004,
- AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED= -0x0005,
- AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION = -0x0006,
- AMQP_STATUS_CONNECTION_CLOSED = -0x0007,
- AMQP_STATUS_BAD_URL = -0x0008,
- AMQP_STATUS_SOCKET_ERROR = -0x0009,
- AMQP_STATUS_INVALID_PARAMETER = -0x000A,
- AMQP_STATUS_TABLE_TOO_BIG = -0x000B,
- AMQP_STATUS_WRONG_METHOD = -0x000C,
- AMQP_STATUS_TIMEOUT = -0x000D,
- AMQP_STATUS_TIMER_FAILURE = -0x000E,
- AMQP_STATUS_HEARTBEAT_TIMEOUT = -0x000F,
-
- AMQP_STATUS_TCP_ERROR = -0x0100,
- AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR = -0x0101,
-
- AMQP_STATUS_SSL_ERROR = -0x0200,
- AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED= -0x0201,
- AMQP_STATUS_SSL_PEER_VERIFY_FAILED = -0x0202,
- AMQP_STATUS_SSL_CONNECTION_FAILED = -0x0203
+ AMQP_STATUS_OK = 0x0, /**< Operation successful */
+ AMQP_STATUS_NO_MEMORY = -0x0001, /**< Memory allocation
+ failed */
+ AMQP_STATUS_BAD_AMQP_DATA = -0x0002, /**< Incorrect or corrupt
+ data was received from
+ the broker. This is a
+ protocol error. */
+ AMQP_STATUS_UNKNOWN_CLASS = -0x0003, /**< An unknown AMQP class
+ was received. This is
+ a protocol error. */
+ AMQP_STATUS_UNKNOWN_METHOD = -0x0004, /**< An unknown AMQP method
+ was received. This is
+ a protocol error. */
+ AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED= -0x0005, /**< Unable to resolve the
+ * hostname */
+ AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION = -0x0006, /**< The broker advertised
+ an incompaible AMQP
+ version */
+ AMQP_STATUS_CONNECTION_CLOSED = -0x0007, /**< The connection to the
+ broker has been closed
+ */
+ AMQP_STATUS_BAD_URL = -0x0008, /**< malformed AMQP URL */
+ AMQP_STATUS_SOCKET_ERROR = -0x0009, /**< A socket error
+ occurred */
+ AMQP_STATUS_INVALID_PARAMETER = -0x000A, /**< An invalid parameter
+ was passed into the
+ function */
+ AMQP_STATUS_TABLE_TOO_BIG = -0x000B, /**< The amqp_table_t object
+ cannot be serialized
+ because the output
+ buffer is too small */
+ AMQP_STATUS_WRONG_METHOD = -0x000C, /**< The wrong method was
+ received */
+ AMQP_STATUS_TIMEOUT = -0x000D, /**< Operation timed out */
+ AMQP_STATUS_TIMER_FAILURE = -0x000E, /**< The underlying system
+ timer facility failed */
+ AMQP_STATUS_HEARTBEAT_TIMEOUT = -0x000F, /**< Timed out waiting for
+ heartbeat */
+ AMQP_STATUS_UNEXPECTED_STATE = -0x0010, /**< Unexpected protocol
+ state */
+
+ AMQP_STATUS_TCP_ERROR = -0x0100, /**< A generic TCP error
+ occurred */
+ AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR = -0x0101, /**< An error occurred trying
+ to initialize the
+ socket library*/
+
+ AMQP_STATUS_SSL_ERROR = -0x0200, /**< A generic SSL error
+ occurred. */
+ AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED= -0x0201, /**< SSL validation of
+ hostname against
+ peer certificate
+ failed */
+ AMQP_STATUS_SSL_PEER_VERIFY_FAILED = -0x0202, /**< SSL validation of peer
+ certificate failed. */
+ AMQP_STATUS_SSL_CONNECTION_FAILED = -0x0203 /**< SSL handshake failed. */
} amqp_status_enum;
-AMQP_PUBLIC_FUNCTION
-char const *
-AMQP_CALL amqp_version(void);
+/**
+ * AMQP delivery modes.
+ * Use these values for the #amqp_basic_properties_t::delivery_mode field.
+ *
+ * \since v0.5
+ */
+typedef enum {
+ AMQP_DELIVERY_NONPERSISTENT = 1, /**< Non-persistent message */
+ AMQP_DELIVERY_PERSISTENT = 2 /**< Persistent message */
+} amqp_delivery_mode_enum;
+
+AMQP_END_DECLS
-/* Exported empty data structures */
+#include <amqp_framing.h>
+
+AMQP_BEGIN_DECLS
+
+/**
+ * Empty bytes structure
+ *
+ * \since v0.2
+ */
AMQP_PUBLIC_VARIABLE const amqp_bytes_t amqp_empty_bytes;
+
+/**
+ * Empty table structure
+ *
+ * \since v0.2
+ */
AMQP_PUBLIC_VARIABLE const amqp_table_t amqp_empty_table;
+
+/**
+ * Empty table array structure
+ *
+ * \since v0.2
+ */
AMQP_PUBLIC_VARIABLE const amqp_array_t amqp_empty_array;
/* Compatibility macros for the above, to avoid the need to update
code written against earlier versions of librabbitmq. */
+
+/**
+ * \def AMQP_EMPTY_BYTES
+ *
+ * Deprecated, use \ref amqp_empty_bytes instead
+ *
+ * \deprecated use \ref amqp_empty_bytes instead
+ *
+ * \since v0.1
+ */
#define AMQP_EMPTY_BYTES amqp_empty_bytes
+
+/**
+ * \def AMQP_EMPTY_TABLE
+ *
+ * Deprecated, use \ref amqp_empty_table instead
+ *
+ * \deprecated use \ref amqp_empty_table instead
+ *
+ * \since v0.1
+ */
#define AMQP_EMPTY_TABLE amqp_empty_table
+
+/**
+ * \def AMQP_EMPTY_ARRAY
+ *
+ * Deprecated, use \ref amqp_empty_array instead
+ *
+ * \deprecated use \ref amqp_empty_array instead
+ *
+ * \since v0.1
+ */
#define AMQP_EMPTY_ARRAY amqp_empty_array
+/**
+ * Initializes an amqp_pool_t memory allocation pool for use
+ *
+ * Readies an allocation pool for use. An amqp_pool_t
+ * must be initialized before use
+ *
+ * \param [in] pool the amqp_pool_t structure to initialize.
+ * Calling this function on a pool a pool that has
+ * already been initialized will result in undefined
+ * behavior
+ * \param [in] pagesize the unit size that the pool will allocate
+ * memory chunks in. Anything allocated against the pool
+ * with a requested size will be carved out of a block
+ * this size. Allocations larger than this will be
+ * allocated individually
+ *
+ * \sa recycle_amqp_pool(), empty_amqp_pool(), amqp_pool_alloc(),
+ * amqp_pool_alloc_bytes(), amqp_pool_t
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
void
AMQP_CALL init_amqp_pool(amqp_pool_t *pool, size_t pagesize);
+/**
+ * Recycles an amqp_pool_t memory allocation pool
+ *
+ * Recycles the space allocate by the pool
+ *
+ * This invalidates all allocations made against the pool before this call is
+ * made, any use of any allocations made before recycle_amqp_pool() is called
+ * will result in undefined behavior.
+ *
+ * Note: this may or may not release memory, to force memory to be released
+ * call empty_amqp_pool().
+ *
+ * \param [in] pool the amqp_pool_t to recycle
+ *
+ * \sa recycle_amqp_pool(), empty_amqp_pool(), amqp_pool_alloc(),
+ * amqp_pool_alloc_bytes()
+ *
+ * \since v0.1
+ *
+ */
AMQP_PUBLIC_FUNCTION
void
AMQP_CALL recycle_amqp_pool(amqp_pool_t *pool);
+/**
+ * Empties an amqp memory pool
+ *
+ * Releases all memory associated with an allocation pool
+ *
+ * \param [in] pool the amqp_pool_t to empty
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
void
AMQP_CALL empty_amqp_pool(amqp_pool_t *pool);
+/**
+ * Allocates a block of memory from an amqp_pool_t memory pool
+ *
+ * Memory will be aligned on a 8-byte boundary. If a 0-length allocation is
+ * requested, a NULL pointer will be returned.
+ *
+ * \param [in] pool the allocation pool to allocate the memory from
+ * \param [in] amount the size of the allocation in bytes.
+ * \return a pointer to the memory block, or NULL if the allocation cannot
+ * be satisfied.
+ *
+ * \sa init_amqp_pool(), recycle_amqp_pool(), empty_amqp_pool(),
+ * amqp_pool_alloc_bytes()
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
void *
AMQP_CALL amqp_pool_alloc(amqp_pool_t *pool, size_t amount);
+/**
+ * Allocates a block of memory from an amqp_pool_t to an amqp_bytes_t
+ *
+ * Memory will be aligned on a 8-byte boundary. If a 0-length allocation is
+ * requested, output.bytes = NULL.
+ *
+ * \param [in] pool the allocation pool to allocate the memory from
+ * \param [in] amount the size of the allocation in bytes
+ * \param [in] output the location to store the pointer. On success
+ * output.bytes will be set to the beginning of the buffer
+ * output.len will be set to amount
+ * On error output.bytes will be set to NULL and output.len
+ * set to 0
+ *
+ * \sa init_amqp_pool(), recycle_amqp_pool(), empty_amqp_pool(),
+ * amqp_pool_alloc()
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
void
AMQP_CALL amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output);
+/**
+ * Wraps a c string in an amqp_bytes_t
+ *
+ * Takes a string, calculates its length and creates an
+ * amqp_bytes_t that points to it. The string is not duplicated.
+ *
+ * For a given input cstr, The amqp_bytes_t output.bytes is the
+ * same as cstr, output.len is the length of the string not including
+ * the \0 terminator
+ *
+ * This function uses strlen() internally so cstr must be properly
+ * terminated
+ *
+ * \param [in] cstr the c string to wrap
+ * \return an amqp_bytes_t that describes the string
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
amqp_bytes_t
AMQP_CALL amqp_cstring_bytes(char const *cstr);
+/**
+ * Duplicates an amqp_bytes_t buffer.
+ *
+ * The buffer is cloned and the contents copied.
+ *
+ * The memory associated with the output is allocated
+ * with amqp_bytes_malloc() and should be freed with
+ * amqp_bytes_free()
+ *
+ * \param [in] src
+ * \return a clone of the src
+ *
+ * \sa amqp_bytes_free(), amqp_bytes_malloc()
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
amqp_bytes_t
AMQP_CALL amqp_bytes_malloc_dup(amqp_bytes_t src);
+/**
+ * Allocates a amqp_bytes_t buffer
+ *
+ * Creates an amqp_bytes_t buffer of the specified amount, the buffer should be
+ * freed using amqp_bytes_free()
+ *
+ * \param [in] amount the size of the buffer in bytes
+ * \returns an amqp_bytes_t with amount bytes allocated.
+ * output.bytes will be set to NULL on error
+ *
+ * \sa amqp_bytes_free(), amqp_bytes_malloc_dup()
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
amqp_bytes_t
AMQP_CALL amqp_bytes_malloc(size_t amount);
+/**
+ * Frees an amqp_bytes_t buffer
+ *
+ * Frees a buffer allocated with amqp_bytes_malloc() or amqp_bytes_malloc_dup()
+ *
+ * Calling amqp_bytes_free on buffers not allocated with one
+ * of those two functions will result in undefined behavior
+ *
+ * \param [in] bytes the buffer to free
+ *
+ * \sa amqp_bytes_malloc(), amqp_bytes_malloc_dup()
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
void
AMQP_CALL amqp_bytes_free(amqp_bytes_t bytes);
+/**
+ * Allocate and initialize a new amqp_connection_state_t object
+ *
+ * amqp_connection_state_t objects created with this function
+ * should be freed with amqp_destroy_connection()
+ *
+ * \returns an opaque pointer on success, NULL or 0 on failure.
+ *
+ * \sa amqp_destroy_connection()
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
amqp_connection_state_t
AMQP_CALL amqp_new_connection(void);
+/**
+ * Get the underlying socket descriptor for the connection
+ *
+ * \warning Use the socket returned from this function carefully, incorrect use
+ * of the socket outside of the library will lead to undefined behavior.
+ * Additionally rabbitmq-c may use the socket differently version-to-version,
+ * what may work in one version, may break in the next version. Be sure to
+ * throughly test any applications that use the socket returned by this
+ * function especially when using a newer version of rabbitmq-c
+ *
+ * \param [in] state the connection object
+ * \returns the socket descriptor if one has been set, -1 otherwise
+ *
+ * \sa amqp_tcp_socket_new(), amqp_ssl_socket_new(), amqp_socket_open()
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_get_sockfd(amqp_connection_state_t state);
+
+/**
+ * Deprecated, use amqp_tcp_socket_new() or amqp_ssl_socket_new()
+ *
+ * \deprecated Use amqp_tcp_socket_new() or amqp_ssl_socket_new()
+ *
+ * Sets the socket descriptor associated with the connection. The socket
+ * should be connected to a broker, and should not be read to or written from
+ * before calling this function. A socket descriptor can be created and opened
+ * using amqp_open_socket()
+ *
+ * \param [in] state the connection object
+ * \param [in] sockfd the socket
+ *
+ * \sa amqp_open_socket(), amqp_tcp_socket_new(), amqp_ssl_socket_new()
+ *
+ * \since v0.1
+ */
AMQP_DEPRECATED(
AMQP_PUBLIC_FUNCTION
void
AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd)
);
-AMQP_PUBLIC_FUNCTION
-void
-AMQP_CALL amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket);
+/**
+ * Tune client side parameters
+ *
+ * \warning This function may call abort() if the connection is in a certain
+ * state. As such it should probably not be called code outside the library.
+ * connection parameters should be specified when calling amqp_login() or
+ * amqp_login_with_properties()
+ *
+ * This function changes channel_max, frame_max, and heartbeat parameters, on
+ * the client side only. It does not try to renegotiate these parameters with
+ * the broker. Using this function will lead to unexpected results.
+ *
+ * \param [in] state the connection object
+ * \param [in] channel_max the maximum number of channels.
+ * The largest this can be is 65535
+ * \param [in] frame_max the maximum size of an frame.
+ * The smallest this can be is 4096
+ * The largest this can be is 2147483647
+ * Unless you know what you're doing the recommended
+ * size is 131072 or 128KB
+ * \param [in] heartbeat the number of seconds between heartbeats
+ *
+ * \return AMQP_STATUS_OK on success, an amqp_status_enum value otherwise.
+ * Possible error codes include:
+ * - AMQP_STATUS_NO_MEMORY memory allocation failed.
+ * - AMQP_STATUS_TIMER_FAILURE the underlying system timer indicated it
+ * failed.
+ *
+ * \sa amqp_login(), amqp_login_with_properties()
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_tune_connection(amqp_connection_state_t state,
@@ -435,67 +1075,463 @@ AMQP_CALL amqp_tune_connection(amqp_connection_state_t state,
int frame_max,
int heartbeat);
+/**
+ * Get the maximum number of channels the connection can handle
+ *
+ * The maximum number of channels is set when connection negotiation takes
+ * place in amqp_login() or amqp_login_with_properties().
+ *
+ * \param [in] state the connection object
+ * \return the maximum number of channels. 0 if there is no limit
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_get_channel_max(amqp_connection_state_t state);
+/**
+ * Destroys an amqp_connection_state_t object
+ *
+ * Destroys a amqp_connection_state_t object that was created with
+ * amqp_new_connection(). If the connection with the broker is open, it will be
+ * implicitly closed with a reply code of 200 (success). Any memory that
+ * would be freed with amqp_maybe_release_buffers() or
+ * amqp_maybe_release_buffers_on_channel() will be freed, and use of that
+ * memory will caused undefined behavior.
+ *
+ * \param [in] state the connection object
+ * \return AMQP_STATUS_OK on success. amqp_status_enum value failure
+ *
+ * \sa amqp_new_connection()
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_destroy_connection(amqp_connection_state_t state);
+/**
+ * Process incoming data
+ *
+ * \warning This is a low-level function intended for those who want to
+ * have greater control over input and output over the socket from the
+ * broker. Correctly using this function requires in-depth knowledge of AMQP
+ * and rabbitmq-c.
+ *
+ * For a given buffer of data received from the broker, decode the first
+ * frame in the buffer. If more than one frame is contained in the input buffer
+ * the return value will be less than the received_data size, the caller should
+ * adjust received_data buffer descriptor to point to the beginning of the
+ * buffer + the return value.
+ *
+ * \param [in] state the connection object
+ * \param [in] received_data a buffer of data received from the broker. The
+ * function will return the number of bytes of the buffer it used. The
+ * function copies these bytes to an internal buffer: this part of the buffer
+ * may be reused after this function successfully completes.
+ * \param [in,out] decoded_frame caller should pass in a pointer to an
+ * amqp_frame_t struct. If there is enough data in received_data for a
+ * complete frame, decoded_frame->frame_type will be set to something OTHER
+ * than 0. decoded_frame may contain members pointing to memory owned by
+ * the state object. This memory can be recycled with amqp_maybe_release_buffers()
+ * or amqp_maybe_release_buffers_on_channel()
+ * \return number of bytes consumed from received_data or 0 if a 0-length
+ * buffer was passed. A negative return value indicates failure. Possible errors:
+ * - AMQP_STATUS_NO_MEMORY failure in allocating memory. The library is likely in
+ * an indeterminate state making recovery unlikely. Client should note the error
+ * and terminate the application
+ * - AMQP_STATUS_BAD_AMQP_DATA bad AMQP data was received. The connection
+ * should be shutdown immediately
+ * - AMQP_STATUS_UNKNOWN_METHOD: an unknown method was received from the
+ * broker. This is likely a protocol error and the connection should be
+ * shutdown immediately
+ * - AMQP_STATUS_UNKNOWN_CLASS: a properties frame with an unknown class
+ * was received from the broker. This is likely a protocol error and the
+ * connection should be shutdown immediately
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_handle_input(amqp_connection_state_t state,
amqp_bytes_t received_data,
amqp_frame_t *decoded_frame);
+/**
+ * Check to see if connection memory can be released
+ *
+ * \deprecated This function is deprecated in favor of
+ * amqp_maybe_release_buffers() or amqp_maybe_release_buffers_on_channel()
+ *
+ * Checks the state of an amqp_connection_state_t object to see if
+ * amqp_release_buffers() can be called successfully.
+ *
+ * \param [in] state the connection object
+ * \returns TRUE if the buffers can be released FALSE otherwise
+ *
+ * \sa amqp_release_buffers() amqp_maybe_release_buffers()
+ * amqp_maybe_release_buffers_on_channel()
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
amqp_boolean_t
AMQP_CALL amqp_release_buffers_ok(amqp_connection_state_t state);
+/**
+ * Release amqp_connection_state_t owned memory
+ *
+ * \deprecated This function is deprecated in favor of
+ * amqp_maybe_release_buffers() or amqp_maybe_release_buffers_on_channel()
+ *
+ * \warning caller should ensure amqp_release_buffers_ok() returns true before
+ * calling this function. Failure to do so may result in abort() being called.
+ *
+ * Release memory owned by the amqp_connection_state_t for reuse by the
+ * library. Use of any memory returned by the library before this function is
+ * called will result in undefined behavior.
+ *
+ * \note internally rabbitmq-c tries to reuse memory when possible. As a result
+ * its possible calling this function may not have a noticeable effect on
+ * memory usage.
+ *
+ * \param [in] state the connection object
+ *
+ * \sa amqp_release_buffers_ok() amqp_maybe_release_buffers()
+ * amqp_maybe_release_buffers_on_channel()
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
void
AMQP_CALL amqp_release_buffers(amqp_connection_state_t state);
+/**
+ * Release amqp_connection_state_t owned memory
+ *
+ * Release memory owned by the amqp_connection_state_t object related to any
+ * channel, allowing reuse by the library. Use of any memory returned by the
+ * library before this function is called with result in undefined behavior.
+ *
+ * \note internally rabbitmq-c tries to reuse memory when possible. As a result
+ * its possible calling this function may not have a noticeable effect on
+ * memory usage.
+ *
+ * \param [in] state the connection object
+ *
+ * \sa amqp_maybe_release_buffers_on_channel()
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
void
AMQP_CALL amqp_maybe_release_buffers(amqp_connection_state_t state);
+/**
+ * Release amqp_connection_state_t owned memory related to a channel
+ *
+ * Release memory owned by the amqp_connection_state_t object related to the
+ * specified channel, allowing reuse by the library. Use of any memory returned
+ * the library for a specific channel will result in undefined behavior.
+ *
+ * \note internally rabbitmq-c tries to reuse memory when possible. As a result
+ * its possible calling this function may not have a noticeable effect on
+ * memory usage.
+ *
+ * \param [in] state the connection object
+ * \param [in] channel the channel specifier for which memory should be
+ * released. Note that the library does not care about the state of the
+ * channel when calling this function
+ *
+ * \sa amqp_maybe_release_buffers()
+ *
+ * \since v0.4.0
+ */
AMQP_PUBLIC_FUNCTION
void
-AMQP_CALL amqp_maybe_release_buffers_on_channel(amqp_connection_state_t state, amqp_channel_t channel);
+AMQP_CALL amqp_maybe_release_buffers_on_channel(amqp_connection_state_t state,
+ amqp_channel_t channel);
+/**
+ * Send a frame to the broker
+ *
+ * \param [in] state the connection object
+ * \param [in] frame the frame to send to the broker
+ * \return AMQP_STATUS_OK on success, an amqp_status_enum value on error.
+ * Possible error codes:
+ * - AMQP_STATUS_BAD_AMQP_DATA the serialized form of the method or
+ * properties was too large to fit in a single AMQP frame, or the
+ * method contains an invalid value. The frame was not sent.
+ * - AMQP_STATUS_TABLE_TOO_BIG the serialized form of an amqp_table_t is
+ * too large to fit in a single AMQP frame. Frame was not sent.
+ * - AMQP_STATUS_UNKNOWN_METHOD an invalid method type was passed in
+ * - AMQP_STATUS_UNKNOWN_CLASS an invalid properties type was passed in
+ * - AMQP_STATUS_TIMER_FAILURE system timer indicated failure. The frame
+ * was sent
+ * - AMQP_STATUS_SOCKET_ERROR
+ * - AMQP_STATUS_SSL_ERROR
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_send_frame(amqp_connection_state_t state, amqp_frame_t const *frame);
+/**
+ * Compare two table entries
+ *
+ * Works just like strcmp(), comparing two the table keys, datatype, then values
+ *
+ * \param [in] entry1 the entry on the left
+ * \param [in] entry2 the entry on the right
+ * \return 0 if entries are equal, 0 < if left is greater, 0 > if right is greater
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_table_entry_cmp(void const *entry1, void const *entry2);
+/**
+ * Open a socket to a remote host
+ *
+ * \deprecated This function is deprecated in favor of amqp_socket_open()
+ *
+ * Looks up the hostname, then attempts to open a socket to the host using
+ * the specified portnumber. It also sets various options on the socket to
+ * improve performance and correctness.
+ *
+ * \param [in] hostname this can be a hostname or IP address.
+ * Both IPv4 and IPv6 are acceptable
+ * \param [in] portnumber the port to connect on. RabbitMQ brokers
+ * listen on port 5672, and 5671 for SSL
+ * \return a positive value indicates success and is the sockfd. A negative
+ * value (see amqp_status_enum)is returned on failure. Possible error codes:
+ * - AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR Initialization of underlying socket
+ * library failed.
+ * - AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED hostname lookup failed.
+ * - AMQP_STATUS_SOCKET_ERROR a socket error occurred. errno or WSAGetLastError()
+ * may return more useful information.
+ *
+ * \note IPv6 support was added in v0.3
+ *
+ * \sa amqp_socket_open() amqp_set_sockfd()
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_open_socket(char const *hostname, int portnumber);
+/**
+ * Send initial AMQP header to the broker
+ *
+ * \warning this is a low level function intended for those who want to
+ * interact with the broker at a very low level. Use of this function without
+ * understanding what it does will result in AMQP protocol errors.
+ *
+ * This function sends the AMQP protocol header to the broker.
+ *
+ * \param [in] state the connection object
+ * \return AMQP_STATUS_OK on success, a negative value on failure. Possible
+ * error codes:
+ * - AMQP_STATUS_CONNECTION_CLOSED the connection to the broker was closed.
+ * - AMQP_STATUS_SOCKET_ERROR a socket error occurred. It is likely the
+ * underlying socket has been closed. errno or WSAGetLastError() may provide
+ * further information.
+ * - AMQP_STATUS_SSL_ERROR a SSL error occurred. The connection to the broker
+ * was closed.
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_send_header(amqp_connection_state_t state);
+/**
+ * Checks to see if there are any incoming frames ready to be read
+ *
+ * Checks to see if there are any amqp_frame_t objects buffered by the
+ * amqp_connection_state_t object. Having one or more frames buffered means
+ * that amqp_simple_wait_frame() or amqp_simple_wait_frame_noblock() will
+ * return a frame without potentially blocking on a read() call.
+ *
+ * \param [in] state the connection object
+ * \return TRUE if there are frames enqueued, FALSE otherwise
+ *
+ * \sa amqp_simple_wait_frame() amqp_simple_wait_frame_noblock()
+ * amqp_data_in_buffer()
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
amqp_boolean_t
AMQP_CALL amqp_frames_enqueued(amqp_connection_state_t state);
+/**
+ * Read a single amqp_frame_t
+ *
+ * Waits for the next amqp_frame_t frame to be read from the broker.
+ * This function has the potential to block for a long time in the case of
+ * waiting for a basic.deliver method frame from the broker.
+ *
+ * The library may buffer frames. When an amqp_connection_state_t object
+ * has frames buffered calling amqp_simple_wait_frame() will return an
+ * amqp_frame_t without entering a blocking read(). You can test to see if
+ * an amqp_connection_state_t object has frames buffered by calling the
+ * amqp_frames_enqueued() function.
+ *
+ * The library has a socket read buffer. When there is data in an
+ * amqp_connection_state_t read buffer, amqp_simple_wait_frame() may return an
+ * amqp_frame_t without entering a blocking read(). You can test to see if an
+ * amqp_connection_state_t object has data in its read buffer by calling the
+ * amqp_data_in_buffer() function.
+ *
+ * \param [in] state the connection object
+ * \param [out] decoded_frame the frame
+ * \return AMQP_STATUS_OK on success, an amqp_status_enum value
+ * is returned otherwise. Possible errors include:
+ * - AMQP_STATUS_NO_MEMORY failure in allocating memory. The library is likely in
+ * an indeterminate state making recovery unlikely. Client should note the error
+ * and terminate the application
+ * - AMQP_STATUS_BAD_AMQP_DATA bad AMQP data was received. The connection
+ * should be shutdown immediately
+ * - AMQP_STATUS_UNKNOWN_METHOD: an unknown method was received from the
+ * broker. This is likely a protocol error and the connection should be
+ * shutdown immediately
+ * - AMQP_STATUS_UNKNOWN_CLASS: a properties frame with an unknown class
+ * was received from the broker. This is likely a protocol error and the
+ * connection should be shutdown immediately
+ * - AMQP_STATUS_HEARTBEAT_TIMEOUT timed out while waiting for heartbeat
+ * from the broker. The connection has been closed.
+ * - AMQP_STATUS_TIMER_FAILURE system timer indicated failure.
+ * - AMQP_STATUS_SOCKET_ERROR a socket error occurred. The connection has
+ * been closed
+ * - AMQP_STATUS_SSL_ERROR a SSL socket error occurred. The connection has
+ * been closed.
+ *
+ * \sa amqp_simple_wait_frame_noblock() amqp_frames_enqueued()
+ * amqp_data_in_buffer()
+ *
+ * \note as of v0.4.0 this function will no longer return heartbeat frames
+ * when enabled by specifying a non-zero heartbeat value in amqp_login().
+ * Heartbeating is handled internally by the library.
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_simple_wait_frame(amqp_connection_state_t state,
amqp_frame_t *decoded_frame);
+/**
+ * Read a single amqp_frame_t with a timeout.
+ *
+ * Waits for the next amqp_frame_t frame to be read from the broker, up to
+ * a timespan specified by tv. The function will return AMQP_STATUS_TIMEOUT
+ * if the timeout is reached. The tv value is not modified by the function.
+ *
+ * If a 0 timeval is specified, the function behaves as if its non-blocking: it
+ * will test to see if a frame can be read from the broker, and return immediately.
+ *
+ * If NULL is passed in for tv, the function will behave like
+ * amqp_simple_wait_frame() and block until a frame is received from the broker
+ *
+ * The library may buffer frames. When an amqp_connection_state_t object
+ * has frames buffered calling amqp_simple_wait_frame_noblock() will return an
+ * amqp_frame_t without entering a blocking read(). You can test to see if an
+ * amqp_connection_state_t object has frames buffered by calling the
+ * amqp_frames_enqueued() function.
+ *
+ * The library has a socket read buffer. When there is data in an
+ * amqp_connection_state_t read buffer, amqp_simple_wait_frame_noblock() may return
+ * an amqp_frame_t without entering a blocking read(). You can test to see if an
+ * amqp_connection_state_t object has data in its read buffer by calling the
+ * amqp_data_in_buffer() function.
+ *
+ * \note This function does not return heartbeat frames. When enabled, heartbeating
+ * is handed internally internally by the library
+ *
+ * \param [in,out] state the connection object
+ * \param [out] decoded_frame the frame
+ * \param [in] tv the maximum time to wait for a frame to be read. Setting
+ * tv->tv_sec = 0 and tv->tv_usec = 0 will do a non-blocking read. Specifying
+ * NULL for tv will make the function block until a frame is read.
+ * \return AMQP_STATUS_OK on success. An amqp_status_enum value is returned
+ * otherwise. Possible errors include:
+ * - AMQP_STATUS_TIMEOUT the timeout was reached while waiting for a frame
+ * from the broker.
+ * - AMQP_STATUS_INVALID_PARAMETER the tv parameter contains an invalid value.
+ * - AMQP_STATUS_NO_MEMORY failure in allocating memory. The library is likely in
+ * an indeterminate state making recovery unlikely. Client should note the error
+ * and terminate the application
+ * - AMQP_STATUS_BAD_AMQP_DATA bad AMQP data was received. The connection
+ * should be shutdown immediately
+ * - AMQP_STATUS_UNKNOWN_METHOD: an unknown method was received from the
+ * broker. This is likely a protocol error and the connection should be
+ * shutdown immediately
+ * - AMQP_STATUS_UNKNOWN_CLASS: a properties frame with an unknown class
+ * was received from the broker. This is likely a protocol error and the
+ * connection should be shutdown immediately
+ * - AMQP_STATUS_HEARTBEAT_TIMEOUT timed out while waiting for heartbeat
+ * from the broker. The connection has been closed.
+ * - AMQP_STATUS_TIMER_FAILURE system timer indicated failure.
+ * - AMQP_STATUS_SOCKET_ERROR a socket error occurred. The connection has
+ * been closed
+ * - AMQP_STATUS_SSL_ERROR a SSL socket error occurred. The connection has
+ * been closed.
+ *
+ * \sa amqp_simple_wait_frame() amqp_frames_enqueued() amqp_data_in_buffer()
+ *
+ * \since v0.4.0
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_simple_wait_frame_noblock(amqp_connection_state_t state,
amqp_frame_t *decoded_frame,
struct timeval *tv);
+/**
+ * Waits for a specific method from the broker
+ *
+ * \warning You probably don't want to use this function. If this function
+ * doesn't receive exactly the frame requested it closes the whole connection.
+ *
+ * Waits for a single method on a channel from the broker.
+ * If a frame is received that does not match expected_channel
+ * or expected_method the program will abort
+ *
+ * \param [in] state the connection object
+ * \param [in] expected_channel the channel that the method should be delivered on
+ * \param [in] expected_method the method to wait for
+ * \param [out] output the method
+ * \returns AMQP_STATUS_OK on success. An amqp_status_enum value is returned
+ * otherwise. Possible errors include:
+ * - AMQP_STATUS_WRONG_METHOD a frame containing the wrong method, wrong frame
+ * type or wrong channel was received. The connection is closed.
+ * - AMQP_STATUS_NO_MEMORY failure in allocating memory. The library is likely in
+ * an indeterminate state making recovery unlikely. Client should note the error
+ * and terminate the application
+ * - AMQP_STATUS_BAD_AMQP_DATA bad AMQP data was received. The connection
+ * should be shutdown immediately
+ * - AMQP_STATUS_UNKNOWN_METHOD: an unknown method was received from the
+ * broker. This is likely a protocol error and the connection should be
+ * shutdown immediately
+ * - AMQP_STATUS_UNKNOWN_CLASS: a properties frame with an unknown class
+ * was received from the broker. This is likely a protocol error and the
+ * connection should be shutdown immediately
+ * - AMQP_STATUS_HEARTBEAT_TIMEOUT timed out while waiting for heartbeat
+ * from the broker. The connection has been closed.
+ * - AMQP_STATUS_TIMER_FAILURE system timer indicated failure.
+ * - AMQP_STATUS_SOCKET_ERROR a socket error occurred. The connection has
+ * been closed
+ * - AMQP_STATUS_SSL_ERROR a SSL socket error occurred. The connection has
+ * been closed.
+ *
+ * \since v0.1
+ */
+
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_simple_wait_method(amqp_connection_state_t state,
@@ -503,6 +1539,32 @@ AMQP_CALL amqp_simple_wait_method(amqp_connection_state_t state,
amqp_method_number_t expected_method,
amqp_method_t *output);
+/**
+ * Sends a method to the broker
+ *
+ * This is a thin wrapper around amqp_send_frame(), providing a way to send
+ * a method to the broker on a specified channel.
+ *
+ * \param [in] state the connection object
+ * \param [in] channel the channel object
+ * \param [in] id the method number
+ * \param [in] decoded the method object
+ * \returns AMQP_STATUS_OK on success, an amqp_status_enum value otherwise.
+ * Possible errors include:
+ * - AMQP_STATUS_BAD_AMQP_DATA the serialized form of the method or
+ * properties was too large to fit in a single AMQP frame, or the
+ * method contains an invalid value. The frame was not sent.
+ * - AMQP_STATUS_TABLE_TOO_BIG the serialized form of an amqp_table_t is
+ * too large to fit in a single AMQP frame. Frame was not sent.
+ * - AMQP_STATUS_UNKNOWN_METHOD an invalid method type was passed in
+ * - AMQP_STATUS_UNKNOWN_CLASS an invalid properties type was passed in
+ * - AMQP_STATUS_TIMER_FAILURE system timer indicated failure. The frame
+ * was sent
+ * - AMQP_STATUS_SOCKET_ERROR
+ * - AMQP_STATUS_SSL_ERROR
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_send_method(amqp_connection_state_t state,
@@ -510,6 +1572,38 @@ AMQP_CALL amqp_send_method(amqp_connection_state_t state,
amqp_method_number_t id,
void *decoded);
+/**
+ * Sends a method to the broker and waits for a method response
+ *
+ * \param [in] state the connection object
+ * \param [in] channel the channel object
+ * \param [in] request_id the method number of the request
+ * \param [in] expected_reply_ids a 0 terminated array of expected response
+ * method numbers
+ * \param [in] decoded_request_method the method to be sent to the broker
+ * \return a amqp_rpc_reply_t:
+ * - r.reply_type == AMQP_RESPONSE_NORMAL. RPC completed successfully
+ * - r.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION. The broker returned an
+ * exception:
+ * - If r.reply.id == AMQP_CHANNEL_CLOSE_METHOD a channel exception
+ * occurred, cast r.reply.decoded to amqp_channel_close_t* to see details
+ * of the exception. The client should amqp_send_method() a
+ * amqp_channel_close_ok_t. The channel must be re-opened before it
+ * can be used again. Any resources associated with the channel
+ * (auto-delete exchanges, auto-delete queues, consumers) are invalid
+ * and must be recreated before attempting to use them again.
+ * - If r.reply.id == AMQP_CONNECTION_CLOSE_METHOD a connection exception
+ * occurred, cast r.reply.decoded to amqp_connection_close_t* to see
+ * details of the exception. The client amqp_send_method() a
+ * amqp_connection_close_ok_t and disconnect from the broker.
+ * - r.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION. An exception occurred
+ * within the library. Examine r.library_error and compare it against
+ * amqp_status_enum values to determine the error.
+ *
+ * \sa amqp_simple_rpc_decoded()
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_simple_rpc(amqp_connection_state_t state,
@@ -518,6 +1612,20 @@ AMQP_CALL amqp_simple_rpc(amqp_connection_state_t state,
amqp_method_number_t *expected_reply_ids,
void *decoded_request_method);
+/**
+ * Sends a method to the broker and waits for a method response
+ *
+ * \param [in] state the connection object
+ * \param [in] channel the channel object
+ * \param [in] request_id the method number of the request
+ * \param [in] reply_id the method number expected in response
+ * \param [in] decoded_request_method the request method
+ * \return a pointer to the method returned from the broker, or NULL on error.
+ * On error amqp_get_rpc_reply() will return an amqp_rpc_reply_t with
+ * details on the error that occurred.
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
void *
AMQP_CALL amqp_simple_rpc_decoded(amqp_connection_state_t state,
@@ -526,7 +1634,9 @@ AMQP_CALL amqp_simple_rpc_decoded(amqp_connection_state_t state,
amqp_method_number_t reply_id,
void *decoded_request_method);
-/*
+/**
+ * Get the last global amqp_rpc_reply
+ *
* The API methods corresponding to most synchronous AMQP methods
* return a pointer to the decoded method result. Upon error, they
* return NULL, and we need some way of discovering what, if anything,
@@ -538,17 +1648,143 @@ AMQP_CALL amqp_simple_rpc_decoded(amqp_connection_state_t state,
* amqp_rpc_reply_t; operations that do return amqp_rpc_reply_t
* generally do NOT update this per-connection-global amqp_rpc_reply_t
* instance.
+ *
+ * \param [in] state the connection object
+ * \return the most recent amqp_rpc_reply_t:
+ * - r.reply_type == AMQP_RESPONSE_NORMAL. RPC completed successfully
+ * - r.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION. The broker returned an
+ * exception:
+ * - If r.reply.id == AMQP_CHANNEL_CLOSE_METHOD a channel exception
+ * occurred, cast r.reply.decoded to amqp_channel_close_t* to see details
+ * of the exception. The client should amqp_send_method() a
+ * amqp_channel_close_ok_t. The channel must be re-opened before it
+ * can be used again. Any resources associated with the channel
+ * (auto-delete exchanges, auto-delete queues, consumers) are invalid
+ * and must be recreated before attempting to use them again.
+ * - If r.reply.id == AMQP_CONNECTION_CLOSE_METHOD a connection exception
+ * occurred, cast r.reply.decoded to amqp_connection_close_t* to see
+ * details of the exception. The client amqp_send_method() a
+ * amqp_connection_close_ok_t and disconnect from the broker.
+ * - r.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION. An exception occurred
+ * within the library. Examine r.library_error and compare it against
+ * amqp_status_enum values to determine the error.
+ *
+ * \sa amqp_simple_rpc_decoded()
+ *
+ * \since v0.1
*/
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_get_rpc_reply(amqp_connection_state_t state);
+/**
+ * Login to the broker
+ *
+ * After using amqp_open_socket and amqp_set_sockfd, call
+ * amqp_login to complete connecting to the broker
+ *
+ * \param [in] state the connection object
+ * \param [in] vhost the virtual host to connect to on the broker. The default
+ * on most brokers is "/"
+ * \param [in] channel_max the limit for number of channels for the connection.
+ * 0 means no limit, and is a good default (AMQP_DEFAULT_MAX_CHANNELS)
+ * Note that the maximum number of channels the protocol supports
+ * is 65535 (2^16, with the 0-channel reserved)
+ * \param [in] frame_max the maximum size of an AMQP frame on the wire to
+ * request of the broker for this connection. 4096 is the minimum
+ * size, 2^31-1 is the maximum, a good default is 131072 (128KB), or
+ * AMQP_DEFAULT_FRAME_SIZE
+ * \param [in] heartbeat the number of seconds between heartbeat frames to
+ * request of the broker. A value of 0 disables heartbeats.
+ * Note rabbitmq-c only has partial support for heartbeats, as of
+ * v0.4.0 they are only serviced during amqp_basic_publish() and
+ * amqp_simple_wait_frame()/amqp_simple_wait_frame_noblock()
+ * \param [in] sasl_method the SASL method to authenticate with the broker.
+ * followed by the authentication information.
+ * For AMQP_SASL_METHOD_PLAIN, the AMQP_SASL_METHOD_PLAIN
+ * should be followed by two arguments in this order:
+ * const char* username, and const char* password.
+ * \return amqp_rpc_reply_t indicating success or failure.
+ * - r.reply_type == AMQP_RESPONSE_NORMAL. Login completed successfully
+ * - r.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION. In most cases errors
+ * from the broker when logging in will be represented by the broker closing
+ * the socket. In this case r.library_error will be set to
+ * AMQP_STATUS_CONNECTION_CLOSED. This error can represent a number of
+ * error conditions including: invalid vhost, authentication failure.
+ * - r.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION. The broker returned an
+ * exception:
+ * - If r.reply.id == AMQP_CHANNEL_CLOSE_METHOD a channel exception
+ * occurred, cast r.reply.decoded to amqp_channel_close_t* to see details
+ * of the exception. The client should amqp_send_method() a
+ * amqp_channel_close_ok_t. The channel must be re-opened before it
+ * can be used again. Any resources associated with the channel
+ * (auto-delete exchanges, auto-delete queues, consumers) are invalid
+ * and must be recreated before attempting to use them again.
+ * - If r.reply.id == AMQP_CONNECTION_CLOSE_METHOD a connection exception
+ * occurred, cast r.reply.decoded to amqp_connection_close_t* to see
+ * details of the exception. The client amqp_send_method() a
+ * amqp_connection_close_ok_t and disconnect from the broker.
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_login(amqp_connection_state_t state, char const *vhost,
int channel_max, int frame_max, int heartbeat,
amqp_sasl_method_enum sasl_method, ...);
+/**
+ * Login to the broker passing a properties table
+ *
+ * This function is similar to amqp_login() and differs in that it provides a
+ * way to pass client properties to the broker. This is commonly used to
+ * negotiate newer protocol features as they are supported by the broker.
+ *
+ * \param [in] state the connection object
+ * \param [in] vhost the virtual host to connect to on the broker. The default
+ * on most brokers is "/"
+ * \param [in] channel_max the limit for the number of channels for the connection.
+ * 0 means no limit, and is a good default (AMQP_DEFAULT_MAX_CHANNELS)
+ * Note that the maximum number of channels the protocol supports
+ * is 65535 (2^16, with the 0-channel reserved)
+ * \param [in] frame_max the maximum size of an AMQP frame ont he wire to
+ * request of the broker for this connection. 4096 is the minimum
+ * size, 2^31-1 is the maximum, a good default is 131072 (128KB), or
+ * AMQP_DEFAULT_FRAME_SIZE
+ * \param [in] heartbeat the number of seconds between heartbeat frame to
+ * request of the broker. A value of 0 disables heartbeats.
+ * Note rabbitmq-c only has partial support for hearts, as of
+ * v0.4.0 heartbeats are only serviced during amqp_basic_publish(),
+ * and amqp_simple_wait_frame()/amqp_simple_wait_frame_noblock()
+ * \param [in] properties a table of properties to send the broker.
+ * \param [in] sasl_method the SASL method to authenticate with the broker
+ * followed by the authentication information.
+ * For AMQP_SASL_METHOD_PLAN, the AMQP_SASL_METHOD_PLAIN parameter
+ * should be followed by two arguments in this order:
+ * const char* username, and const char* password.
+ * \return amqp_rpc_reply_t indicating success or failure.
+ * - r.reply_type == AMQP_RESPONSE_NORMAL. Login completed successfully
+ * - r.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION. In most cases errors
+ * from the broker when logging in will be represented by the broker closing
+ * the socket. In this case r.library_error will be set to
+ * AMQP_STATUS_CONNECTION_CLOSED. This error can represent a number of
+ * error conditions including: invalid vhost, authentication failure.
+ * - r.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION. The broker returned an
+ * exception:
+ * - If r.reply.id == AMQP_CHANNEL_CLOSE_METHOD a channel exception
+ * occurred, cast r.reply.decoded to amqp_channel_close_t* to see details
+ * of the exception. The client should amqp_send_method() a
+ * amqp_channel_close_ok_t. The channel must be re-opened before it
+ * can be used again. Any resources associated with the channel
+ * (auto-delete exchanges, auto-delete queues, consumers) are invalid
+ * and must be recreated before attempting to use them again.
+ * - If r.reply.id == AMQP_CONNECTION_CLOSE_METHOD a connection exception
+ * occurred, cast r.reply.decoded to amqp_connection_close_t* to see
+ * details of the exception. The client amqp_send_method() a
+ * amqp_connection_close_ok_t and disconnect from the broker.
+ *
+ * \since v0.4.0
+ */
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_login_with_properties(amqp_connection_state_t state, char const *vhost,
@@ -557,6 +1793,52 @@ AMQP_CALL amqp_login_with_properties(amqp_connection_state_t state, char const *
struct amqp_basic_properties_t_;
+/**
+ * Publish a message to the broker
+ *
+ * Publish a message on an exchange with a routing key.
+ *
+ * Note that at the AMQ protocol level basic.publish is an async method:
+ * this means error conditions that occur on the broker (such as publishing to
+ * a non-existent exchange) will not be reflected in the return value of this
+ * function.
+ *
+ * in the return value from this function.
+ * \param [in] state the connection object
+ * \param [in] channel the channel identifier
+ * \param [in] exchange the exchange on the broker to publish to
+ * \param [in] routing_key the routing key to use when publishing the message
+ * \param [in] mandatory indicate to the broker that the message MUST be routed
+ * to a queue. If the broker cannot do this it should respond with
+ * a basic.reject method.
+ * \param [in] immediate indicate to the broker that the message MUST be delivered
+ * to a consumer immediately. If the broker cannot do this it should
+ * response with a basic.reject method.
+ * \param [in] properties the properties associated with the message
+ * \param [in] body the message body
+ * \return AMQP_STATUS_OK on success, amqp_status_enum value on failure. Note
+ * that basic.publish is an async method, the return value from this
+ * function only indicates that the message data was successfully
+ * transmitted to the broker. It does not indicate failures that occur
+ * on the broker, such as publishing to a non-existent exchange.
+ * Possible error values:
+ * - AMQP_STATUS_TIMER_FAILURE: system timer facility returned an error
+ * the message was not sent.
+ * - AMQP_STATUS_HEARTBEAT_TIMEOUT: connection timed out waiting for a
+ * heartbeat from the broker. The message was not sent.
+ * - AMQP_STATUS_NO_MEMORY: memory allocation failed. The message was
+ * not sent.
+ * - AMQP_STATUS_TABLE_TOO_BIG: a table in the properties was too large
+ * to fit in a single frame. Message was not sent.
+ * - AMQP_STATUS_CONNECTION_CLOSED: the connection was closed.
+ * - AMQP_STATUS_SSL_ERROR: a SSL error occurred.
+ * - AMQP_STATUS_TCP_ERROR: a TCP error occurred. errno or
+ * WSAGetLastError() may provide more information
+ *
+ * Note: this function does heartbeat processing as of v0.4.0
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel,
@@ -565,51 +1847,153 @@ AMQP_CALL amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t chann
struct amqp_basic_properties_t_ const *properties,
amqp_bytes_t body);
+/**
+ * Closes an channel
+ *
+ * \param [in] state the connection object
+ * \param [in] channel the channel identifier
+ * \param [in] code the reason for closing the channel, AMQP_REPLY_SUCCESS is a good default
+ * \return amqp_rpc_reply_t indicating success or failure
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_channel_close(amqp_connection_state_t state, amqp_channel_t channel,
int code);
+/**
+ * Closes the entire connection
+ *
+ * Implicitly closes all channels and informs the broker the connection
+ * is being closed, after receiving acknowldgement from the broker it closes
+ * the socket.
+ *
+ * \param [in] state the connection object
+ * \param [in] code the reason code for closing the connection. AMQP_REPLY_SUCCESS is a good default.
+ * \return amqp_rpc_reply_t indicating the result
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_connection_close(amqp_connection_state_t state, int code);
+/**
+ * Acknowledges a message
+ *
+ * Does a basic.ack on a received message
+ *
+ * \param [in] state the connection object
+ * \param [in] channel the channel identifier
+ * \param [in] delivery_tag the delivery tag of the message to be ack'd
+ * \param [in] multiple if true, ack all messages up to this delivery tag, if
+ * false ack only this delivery tag
+ * \return 0 on success, 0 > on failing to send the ack to the broker.
+ * this will not indicate failure if something goes wrong on the broker
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel,
uint64_t delivery_tag, amqp_boolean_t multiple);
+/**
+ * Do a basic.get
+ *
+ * Synchonously polls the broker for a message in a queue, and
+ * retrieves the message if a message is in the queue.
+ *
+ * \param [in] state the connection object
+ * \param [in] channel the channel identifier to use
+ * \param [in] queue the queue name to retrieve from
+ * \param [in] no_ack if true the message is automatically ack'ed
+ * if false amqp_basic_ack should be called once the message
+ * retrieved has been processed
+ * \return amqp_rpc_reply indicating success or failure
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_basic_get(amqp_connection_state_t state, amqp_channel_t channel,
amqp_bytes_t queue, amqp_boolean_t no_ack);
+/**
+ * Do a basic.reject
+ *
+ * Actively reject a message that has been delivered
+ *
+ * \param [in] state the connection object
+ * \param [in] channel the channel identifier
+ * \param [in] delivery_tag the delivery tag of the message to reject
+ * \param [in] requeue indicate to the broker whether it should requeue the
+ * message or just discard it.
+ * \return 0 on success, 0 > on failing to send the reject method to the broker.
+ * This will not indicate failure if something goes wrong on the broker.
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel,
uint64_t delivery_tag, amqp_boolean_t requeue);
-/*
+/**
+ * Do a basic.nack
+ *
+ * Actively reject a message, this has the same effect as amqp_basic_reject()
+ * however, amqp_basic_nack() can negatively acknowledge multiple messages with
+ * one call much like amqp_basic_ack() can acknowledge mutliple messages with
+ * one call.
+ *
+ * \param [in] state the connection object
+ * \param [in] channel the channel identifier
+ * \param [in] delivery_tag the delivery tag of the message to reject
+ * \param [in] multiple if set to 1 negatively acknowledge all unacknowledged
+ * messages on this channel.
+ * \param [in] requeue indicate to the broker whether it should requeue the
+ * message or dead-letter it.
+ * \return AMQP_STATUS_OK on success, an amqp_status_enum value otherwise.
+ *
+ * \since v0.5.0
+ */
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL amqp_basic_nack(amqp_connection_state_t state, amqp_channel_t channel,
+ uint64_t delivery_tag, amqp_boolean_t multiple,
+ amqp_boolean_t requeue);
+/**
+ * Check to see if there is data left in the receive buffer
+ *
* Can be used to see if there is data still in the buffer, if so
* calling amqp_simple_wait_frame will not immediately enter a
* blocking read.
*
- * Possibly amqp_frames_enqueued should be used for this?
+ * \param [in] state the connection object
+ * \return true if there is data in the recieve buffer, false otherwise
+ *
+ * \since v0.1
*/
AMQP_PUBLIC_FUNCTION
amqp_boolean_t
AMQP_CALL amqp_data_in_buffer(amqp_connection_state_t state);
-/*
+/**
* Get the error string for the given error code.
*
- * @deprecated This function has been deprecated in favor of
+ * \deprecated This function has been deprecated in favor of
* \ref amqp_error_string2() which returns statically allocated
* string which do not need to be freed by the caller.
*
* The returned string resides on the heap; the caller is responsible
- * for freeing it
+ * for freeing it.
+ *
+ * \param [in] err return error code
+ * \return the error string
*
+ * \since v0.1
*/
AMQP_DEPRECATED(
AMQP_PUBLIC_FUNCTION
@@ -617,33 +2001,257 @@ AMQP_DEPRECATED(
AMQP_CALL amqp_error_string(int err)
);
+
+/**
+ * Get the error string for the given error code.
+ *
+ * Get an error string associated with an error code. The string is statically
+ * allocated and does not need to be freed
+ *
+ * \param [in] err the error code
+ * \return the error string
+ *
+ * \since v0.4.0
+ */
AMQP_PUBLIC_FUNCTION
const char *
AMQP_CALL amqp_error_string2(int err);
+/**
+ * Deserialize an amqp_table_t from AMQP wireformat
+ *
+ * This is an internal function and is not typically used by
+ * client applications
+ *
+ * \param [in] encoded the buffer containing the serialized data
+ * \param [in] pool memory pool used to allocate the table entries from
+ * \param [in] output the amqp_table_t structure to fill in. Any existing
+ * entries will be erased
+ * \param [in,out] offset The offset into the encoded buffer to start
+ * reading the serialized table. It will be updated
+ * by this function to end of the table
+ * \return AMQP_STATUS_OK on success, an amqp_status_enum value on failure
+ * Possible error codes:
+ * - AMQP_STATUS_NO_MEMORY out of memory
+ * - AMQP_STATUS_BAD_AMQP_DATA invalid wireformat
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_decode_table(amqp_bytes_t encoded, amqp_pool_t *pool,
amqp_table_t *output, size_t *offset);
+/**
+ * Serializes an amqp_table_t to the AMQP wireformat
+ *
+ * This is an internal function and is not typically used by
+ * client applications
+ *
+ * \param [in] encoded the buffer where to serialize the table to
+ * \param [in] input the amqp_table_t to serialize
+ * \param [in,out] offset The offset into the encoded buffer to start
+ * writing the serialized table. It will be updated
+ * by this function to where writing left off
+ * \return AMQP_STATUS_OK on success, an amqp_status_enum value on failure
+ * Possible error codes:
+ * - AMQP_STATUS_TABLE_TOO_BIG the serialized form is too large for the
+ * buffer
+ * - AMQP_STATUS_BAD_AMQP_DATA invalid table
+ *
+ * \since v0.1
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_encode_table(amqp_bytes_t encoded, amqp_table_t *input, size_t *offset);
+
+/**
+ * Create a deep-copy of an amqp_table_t object
+ *
+ * Creates a deep-copy of an amqp_table_t object, using the provided pool
+ * object to allocate the necessary memory. This memory can be freed later by
+ * call recycle_amqp_pool(), or empty_amqp_pool()
+ *
+ * \param [in] original the table to copy
+ * \param [in,out] clone the table to copy to
+ * \param [in] pool the initialized memory pool to do allocations for the table
+ * from
+ * \return AMQP_STATUS_OK on success, amqp_status_enum value on failure.
+ * Possible error values:
+ * - AMQP_STATUS_NO_MEMORY - memory allocation failure.
+ * - AMQP_STATUS_INVALID_PARAMETER - invalid table (e.g., no key name)
+ *
+ * \since v0.4.0
+ */
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL amqp_table_clone(amqp_table_t *original, amqp_table_t *clone, amqp_pool_t *pool);
+
+/**
+ * A message object
+ *
+ * \since v0.4.0
+ */
+typedef struct amqp_message_t_ {
+ amqp_basic_properties_t properties; /**< message properties */
+ amqp_bytes_t body; /**< message body */
+ amqp_pool_t pool; /**< pool used to allocate properties */
+} amqp_message_t;
+
+/**
+ * Reads the next message on a channel
+ *
+ * Reads a complete message (header + body) on a specified channel. This
+ * function is intended to be used with amqp_basic_get() or when an
+ * AMQP_BASIC_DELIVERY_METHOD method is received.
+ *
+ * \param [in,out] state the connection object
+ * \param [in] channel the channel on which to read the message from
+ * \param [in,out] message a pointer to a amqp_message_t object. Caller should
+ * call amqp_message_destroy() when it is done using the
+ * fields in the message object. The caller is responsible for
+ * allocating/destroying the amqp_message_t object itself.
+ * \param [in] flags pass in 0. Currently unused.
+ * \returns a amqp_rpc_reply_t object. ret.reply_type == AMQP_RESPONSE_NORMAL on success.
+ *
+ * \since v0.4.0
+ */
+AMQP_PUBLIC_FUNCTION
+amqp_rpc_reply_t
+AMQP_CALL amqp_read_message(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_message_t *message, int flags);
+
+/**
+ * Frees memory associated with a amqp_message_t allocated in amqp_read_message
+ *
+ * \param [in] message
+ *
+ * \since v0.4.0
+ */
+AMQP_PUBLIC_FUNCTION
+void
+AMQP_CALL amqp_destroy_message(amqp_message_t *message);
+
+/**
+ * Envelope object
+ *
+ * \since v0.4.0
+ */
+typedef struct amqp_envelope_t_ {
+ amqp_channel_t channel; /**< channel message was delivered on */
+ amqp_bytes_t consumer_tag; /**< the consumer tag the message was delivered to */
+ uint64_t delivery_tag; /**< the messages delivery tag */
+ amqp_boolean_t redelivered; /**< flag indicating whether this message is being redelivered */
+ amqp_bytes_t exchange; /**< exchange this message was published to */
+ amqp_bytes_t routing_key; /**< the routing key this message was published with */
+ amqp_message_t message; /**< the message */
+} amqp_envelope_t;
+
+/**
+ * Wait for and consume a message
+ *
+ * Waits for a basic.deliver method on any channel, upon receipt of
+ * basic.deliver it reads that message, and returns. If any other method is
+ * received before basic.deliver, this function will return an amqp_rpc_reply_t
+ * with ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and
+ * ret.library_error == AMQP_STATUS_UNEXPECTED_FRAME. The caller should then
+ * call amqp_simple_wait_frame() to read this frame and take appropriate action.
+ *
+ * This function should be used after starting a consumer with the
+ * amqp_basic_consume() function
+ *
+ * \param [in,out] state the connection object
+ * \param [in,out] envelope a pointer to a amqp_envelope_t object. Caller
+ * should call #amqp_destroy_envelope() when it is done using
+ * the fields in the envelope object. The caller is responsible
+ * for allocating/destroying the amqp_envelope_t object itself.
+ * \param [in] timeout a timeout to wait for a message delivery. Passing in
+ * NULL will result in blocking behavior.
+ * \param [in] flags pass in 0. Currently unused.
+ * \returns a amqp_rpc_reply_t object. ret.reply_type == AMQP_RESPONSE_NORMAL
+ * on success. If ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and
+ * ret.library_error == AMQP_STATUS_UNEXPECTED_FRAME, a frame other
+ * than AMQP_BASIC_DELIVER_METHOD was received, the caller should call
+ * amqp_simple_wait_frame() to read this frame and take appropriate
+ * action.
+ *
+ * \since v0.4.0
+ */
+AMQP_PUBLIC_FUNCTION
+amqp_rpc_reply_t
+AMQP_CALL amqp_consume_message(amqp_connection_state_t state,
+ amqp_envelope_t *envelope,
+ struct timeval *timeout, int flags);
+
+/**
+ * Frees memory associated with a amqp_envelope_t allocated in amqp_consume_message()
+ *
+ * \param [in] envelope
+ *
+ * \since v0.4.0
+ */
+AMQP_PUBLIC_FUNCTION
+void
+AMQP_CALL amqp_destroy_envelope(amqp_envelope_t *envelope);
+
+
+/**
+ * Parameters used to connect to the RabbitMQ broker
+ *
+ * \since v0.2
+ */
struct amqp_connection_info {
- char *user;
- char *password;
- char *host;
- char *vhost;
- int port;
+ char *user; /**< the username to authenticate with the broker, default on most broker is 'guest' */
+ char *password; /**< the password to authenticate with the broker, default on most brokers is 'guest' */
+ char *host; /**< the hostname of the broker */
+ char *vhost; /**< the virtual host on the broker to connect to, a good default is "/" */
+ int port; /**< the port that the broker is listening on, default on most brokers is 5672 */
amqp_boolean_t ssl;
};
+/**
+ * Initialze an amqp_connection_info to default values
+ *
+ * The default values are:
+ * - user: "guest"
+ * - password: "guest"
+ * - host: "localhost"
+ * - vhost: "/"
+ * - port: 5672
+ *
+ * \param [out] parsed the connection info to set defaults on
+ *
+ * \since v0.2
+ */
AMQP_PUBLIC_FUNCTION
void
AMQP_CALL amqp_default_connection_info(struct amqp_connection_info *parsed);
+/**
+ * Parse a connection URL
+ *
+ * An amqp connection url takes the form:
+ *
+ * amqp://[$USERNAME[:$PASSWORD]\@]$HOST[:$PORT]/[$VHOST]
+ *
+ * Examples:
+ * amqp://guest:guest\@localhost:5672//
+ * amqp://guest:guest\@localhost/myvhost
+ *
+ * \note This function modifies url parameter.
+ *
+ * \param [in] url URI to parse, note that this parameter is modified by the
+ * function.
+ * \param [out] parsed the connection info gleaned from the URI. The char*
+ * members will point to parts of the url input parameter.
+ * Memory management will depend on how the url is allocated.
+ * \returns AMQP_STATUS_OK on success, AMQP_STATUS_BAD_URL on failure
+ *
+ * \since v0.2
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_parse_url(char *url, struct amqp_connection_info *parsed);
@@ -662,7 +2270,9 @@ AMQP_CALL amqp_parse_url(char *url, struct amqp_connection_info *parsed);
* \param [in] host Connect to this host.
* \param [in] port Connect on this remote port.
*
- * \return Zero upon success, non-zero otherwise.
+ * \return AMQP_STATUS_OK on success, an amqp_status_enum on failure
+ *
+ * \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
int
@@ -670,54 +2280,74 @@ AMQP_CALL
amqp_socket_open(amqp_socket_t *self, const char *host, int port);
/**
- * Close a socket connection and free resources.
+ * Open a socket connection.
*
- * This function closes a socket connection and releases any resources used by
- * the object. After calling this function the specified socket should no
- * longer be referenced.
+ * This function opens a socket connection returned from amqp_tcp_socket_new()
+ * or amqp_ssl_socket_new(). This function should be called after setting
+ * socket options and prior to assigning the socket to an AMQP connection with
+ * amqp_set_socket().
*
* \param [in,out] self A socket object.
+ * \param [in] host Connect to this host.
+ * \param [in] port Connect on this remote port.
+ * \param [in] timeout Max allowed time to spent on opening. If NULL - run in blocking mode
+ *
+ * \return AMQP_STATUS_OK on success, an amqp_status_enum on failure.
*
- * \return Zero upon success, non-zero otherwise.
+ * \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL
-amqp_socket_close(amqp_socket_t *self);
+amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout);
/**
- * Retrieve an error code for the last socket operation.
+ * Get the socket descriptor in use by a socket object.
*
- * At the time of writing, this interface is not well supported and is subject
- * to changes!
+ * Retrieve the underlying socket descriptor. This function can be used to
+ * perform low-level socket operations that aren't supported by the socket
+ * interface. Use with caution!
*
* \param [in,out] self A socket object.
*
- * \return Zero upon success, an opaque error code otherwise
+ * \return The underlying socket descriptor, or -1 if there is no socket descriptor
+ * associated with
+ * with
+ *
+ * \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL
-amqp_socket_error(amqp_socket_t *self);
+amqp_socket_get_sockfd(amqp_socket_t *self);
/**
- * Get the socket descriptor in use by a socket object.
+ * Get the socket object associated with a amqp_connection_state_t
*
- * Retrieve the underlying socket descriptor. This function can be used to
- * perform low-level socket operations that aren't supported by the socket
- * interface. Use with caution!
+ * \param [in] state the connection object to get the socket from
+ * \return a pointer to the socket object, or NULL if one has not been assigned
*
- * \param [in,out] self A socket object.
+ * \since v0.4.0
+ */
+AMQP_PUBLIC_FUNCTION
+amqp_socket_t *
+amqp_get_socket(amqp_connection_state_t state);
+
+/**
+ * Get the broker properties table
+ *
+ * \param [in] state the connection object
+ * \return a pointer to an amqp_table_t containing the properties advertised
+ * by the broker on connection. The connection object owns the table, it
+ * should not be modified.
*
- * \return The underlying socket descriptor.
+ * \since v0.5.0
*/
AMQP_PUBLIC_FUNCTION
-int
-AMQP_CALL
-amqp_socket_get_sockfd(amqp_socket_t *self);
+amqp_table_t *
+amqp_get_server_properties(amqp_connection_state_t state);
AMQP_END_DECLS
-#include <amqp_framing.h>
#endif /* AMQP_H */
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index 2f40681..1dd303e 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -327,3 +327,14 @@ int amqp_basic_reject(amqp_connection_state_t state,
req.requeue = requeue;
return amqp_send_method(state, channel, AMQP_BASIC_REJECT_METHOD, &req);
}
+
+int amqp_basic_nack(amqp_connection_state_t state, amqp_channel_t channel,
+ uint64_t delivery_tag, amqp_boolean_t multiple,
+ amqp_boolean_t requeue)
+{
+ amqp_basic_nack_t req;
+ req.delivery_tag = delivery_tag;
+ req.multiple = multiple;
+ req.requeue = requeue;
+ return amqp_send_method(state, channel, AMQP_BASIC_NACK_METHOD, &req);
+}
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index d5c29b0..5d70b07 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -91,6 +91,8 @@ amqp_connection_state_t amqp_new_connection(void)
goto out_nomem;
}
+ init_amqp_pool(&state->properties_pool, 512);
+
return state;
out_nomem:
@@ -107,20 +109,25 @@ int amqp_get_sockfd(amqp_connection_state_t state)
void amqp_set_sockfd(amqp_connection_state_t state,
int sockfd)
{
- amqp_socket_t *socket = amqp_tcp_socket_new();
+ amqp_socket_t *socket = amqp_tcp_socket_new(state);
if (!socket) {
amqp_abort("%s", strerror(errno));
}
amqp_tcp_socket_set_sockfd(socket, sockfd);
- amqp_set_socket(state, socket);
}
void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket)
{
- amqp_socket_close(state->socket);
+ amqp_socket_delete(state->socket);
state->socket = socket;
}
+amqp_socket_t *
+amqp_get_socket(amqp_connection_state_t state)
+{
+ return state->socket;
+}
+
int amqp_tune_connection(amqp_connection_state_t state,
int channel_max,
int frame_max,
@@ -175,7 +182,8 @@ int amqp_destroy_connection(amqp_connection_state_t state)
free(state->outbound_buffer.bytes);
free(state->sock_inbound_buffer.bytes);
- status = amqp_socket_close(state->socket);
+ amqp_socket_delete(state->socket);
+ empty_amqp_pool(&state->properties_pool);
free(state);
}
return status;
@@ -511,3 +519,8 @@ int amqp_send_frame(amqp_connection_state_t state,
return res;
}
+amqp_table_t *
+amqp_get_server_properties(amqp_connection_state_t state)
+{
+ return &state->server_properties;
+}
diff --git a/librabbitmq/amqp_consumer.c b/librabbitmq/amqp_consumer.c
new file mode 100644
index 0000000..6c6c1c9
--- /dev/null
+++ b/librabbitmq/amqp_consumer.c
@@ -0,0 +1,301 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MIT
+ *
+ * Portions created by Alan Antonuk are Copyright (c) 2013
+ * Alan Antonuk. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use, copy,
+ * modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ * ***** END LICENSE BLOCK *****
+ */
+#include "amqp.h"
+#include "amqp_private.h"
+#include "amqp_socket.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+static
+int amqp_basic_properties_clone(amqp_basic_properties_t *original,
+ amqp_basic_properties_t *clone,
+ amqp_pool_t *pool)
+{
+ memset(clone, 0, sizeof(amqp_basic_properties_t));
+ clone->_flags = original->_flags;
+
+#define CLONE_BYTES_POOL(original, clone, pool) \
+ if (0 == original.len) { \
+ clone = amqp_empty_bytes; \
+ } else { \
+ amqp_pool_alloc_bytes(pool, original.len, &clone); \
+ if (NULL == clone.bytes) { \
+ return AMQP_STATUS_NO_MEMORY; \
+ } \
+ memcpy(clone.bytes, original.bytes, clone.len); \
+ }
+
+ if (clone->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
+ CLONE_BYTES_POOL(original->content_type, clone->content_type, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_CONTENT_ENCODING_FLAG) {
+ CLONE_BYTES_POOL(original->content_encoding, clone->content_encoding, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_HEADERS_FLAG) {
+ int res = amqp_table_clone(&original->headers, &clone->headers, pool);
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+ }
+
+ if (clone->_flags & AMQP_BASIC_DELIVERY_MODE_FLAG) {
+ clone->delivery_mode = original->delivery_mode;
+ }
+
+ if (clone->_flags & AMQP_BASIC_PRIORITY_FLAG) {
+ clone->priority = original->priority;
+ }
+
+ if (clone->_flags & AMQP_BASIC_CORRELATION_ID_FLAG) {
+ CLONE_BYTES_POOL(original->correlation_id, clone->correlation_id, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_REPLY_TO_FLAG) {
+ CLONE_BYTES_POOL(original->reply_to, clone->reply_to, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_EXPIRATION_FLAG) {
+ CLONE_BYTES_POOL(original->expiration, clone->expiration, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_MESSAGE_ID_FLAG) {
+ CLONE_BYTES_POOL(original->message_id, clone->message_id, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_TIMESTAMP_FLAG) {
+ clone->timestamp = original->timestamp;
+ }
+
+ if (clone->_flags & AMQP_BASIC_TYPE_FLAG) {
+ CLONE_BYTES_POOL(original->type, clone->type, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_USER_ID_FLAG) {
+ CLONE_BYTES_POOL(original->user_id, clone->user_id, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_APP_ID_FLAG) {
+ CLONE_BYTES_POOL(original->app_id, clone->app_id, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_CLUSTER_ID_FLAG) {
+ CLONE_BYTES_POOL(original->cluster_id, clone->cluster_id, pool)
+ }
+
+ return AMQP_STATUS_OK;
+#undef CLONE_BYTES_POOL
+}
+
+
+void amqp_destroy_message(amqp_message_t *message)
+{
+ empty_amqp_pool(&message->pool);
+ amqp_bytes_free(message->body);
+}
+
+void amqp_destroy_envelope(amqp_envelope_t *envelope)
+{
+ amqp_destroy_message(&envelope->message);
+ amqp_bytes_free(envelope->routing_key);
+ amqp_bytes_free(envelope->exchange);
+ amqp_bytes_free(envelope->consumer_tag);
+}
+
+
+amqp_rpc_reply_t
+amqp_consume_message(amqp_connection_state_t state, amqp_envelope_t *envelope,
+ struct timeval *timeout, AMQP_UNUSED int flags)
+{
+ int res;
+ amqp_frame_t frame;
+ amqp_basic_deliver_t *delivery_method;
+ amqp_rpc_reply_t ret;
+
+ memset(&ret, 0, sizeof(amqp_rpc_reply_t));
+ memset(envelope, 0, sizeof(amqp_envelope_t));
+
+ res = amqp_simple_wait_frame_noblock(state, &frame, timeout);
+ if (AMQP_STATUS_OK != res) {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = res;
+ goto error_out1;
+ }
+
+ if (AMQP_FRAME_METHOD != frame.frame_type
+ || AMQP_BASIC_DELIVER_METHOD != frame.payload.method.id) {
+ amqp_put_back_frame(state, &frame);
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
+ goto error_out1;
+ }
+
+ delivery_method = frame.payload.method.decoded;
+
+ envelope->channel = frame.channel;
+ envelope->consumer_tag = amqp_bytes_malloc_dup(delivery_method->consumer_tag);
+ envelope->delivery_tag = delivery_method->delivery_tag;
+ envelope->redelivered = delivery_method->redelivered;
+ envelope->exchange = amqp_bytes_malloc_dup(delivery_method->exchange);
+ envelope->routing_key = amqp_bytes_malloc_dup(delivery_method->routing_key);
+
+ if (NULL == envelope->consumer_tag.bytes ||
+ NULL == envelope->exchange.bytes ||
+ NULL == envelope->routing_key.bytes) {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = AMQP_STATUS_NO_MEMORY;
+ goto error_out2;
+ }
+
+ ret = amqp_read_message(state, envelope->channel, &envelope->message, 0);
+ if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
+ goto error_out2;
+ }
+
+ ret.reply_type = AMQP_RESPONSE_NORMAL;
+ return ret;
+
+error_out2:
+ amqp_bytes_free(envelope->routing_key);
+ amqp_bytes_free(envelope->exchange);
+ amqp_bytes_free(envelope->consumer_tag);
+error_out1:
+ return ret;
+}
+
+amqp_rpc_reply_t amqp_read_message(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_message_t *message,
+ AMQP_UNUSED int flags)
+{
+ amqp_frame_t frame;
+ amqp_rpc_reply_t ret;
+
+ size_t body_read;
+ char *body_read_ptr;
+ int res;
+
+ memset(&ret, 0, sizeof(amqp_rpc_reply_t));
+ memset(message, 0, sizeof(amqp_message_t));
+
+ res = amqp_simple_wait_frame_on_channel(state, channel, &frame);
+ if (AMQP_STATUS_OK != res) {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = res;
+
+ goto error_out1;
+ }
+
+ if (AMQP_FRAME_HEADER != frame.frame_type) {
+ if (AMQP_FRAME_METHOD == frame.frame_type &&
+ (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
+ AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
+
+ ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
+ ret.reply = frame.payload.method;
+
+ } else {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
+
+ amqp_put_back_frame(state, &frame);
+ }
+ goto error_out1;
+ }
+
+ init_amqp_pool(&message->pool, 4096);
+ res = amqp_basic_properties_clone(frame.payload.properties.decoded,
+ &message->properties, &message->pool);
+
+ if (AMQP_STATUS_OK != res) {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = res;
+ goto error_out3;
+ }
+
+ if (0 == frame.payload.properties.body_size) {
+ message->body = amqp_empty_bytes;
+ } else {
+ message->body = amqp_bytes_malloc(frame.payload.properties.body_size);
+ if (NULL == message->body.bytes) {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = AMQP_STATUS_NO_MEMORY;
+ goto error_out1;
+ }
+ }
+
+ body_read = 0;
+ body_read_ptr = message->body.bytes;
+
+ while (body_read < message->body.len) {
+ res = amqp_simple_wait_frame_on_channel(state, channel, &frame);
+ if (AMQP_STATUS_OK != res) {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = res;
+ goto error_out2;
+ }
+ if (AMQP_FRAME_BODY != frame.frame_type) {
+ if (AMQP_FRAME_METHOD == frame.frame_type &&
+ (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
+ AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
+
+ ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
+ ret.reply = frame.payload.method;
+ } else {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = AMQP_STATUS_BAD_AMQP_DATA;
+ }
+ goto error_out2;
+ }
+
+ if (body_read + frame.payload.body_fragment.len > message->body.len) {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = AMQP_STATUS_BAD_AMQP_DATA;
+ goto error_out2;
+ }
+
+ memcpy(body_read_ptr, frame.payload.body_fragment.bytes, frame.payload.body_fragment.len);
+
+ body_read += frame.payload.body_fragment.len;
+ body_read_ptr += frame.payload.body_fragment.len;
+ }
+
+ ret.reply_type = AMQP_RESPONSE_NORMAL;
+ return ret;
+
+error_out2:
+ amqp_bytes_free(message->body);
+error_out3:
+ empty_amqp_pool(&message->pool);
+error_out1:
+ return ret;
+}
diff --git a/librabbitmq/amqp_cyassl.c b/librabbitmq/amqp_cyassl.c
index 89047d9..05ce12e 100644
--- a/librabbitmq/amqp_cyassl.c
+++ b/librabbitmq/amqp_cyassl.c
@@ -155,7 +155,7 @@ amqp_ssl_error_string(AMQP_UNUSED int err)
}
static int
-amqp_ssl_socket_open(void *base, const char *host, int port)
+amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
int status;
@@ -167,7 +167,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port)
return -1;
}
- self->sockfd = amqp_open_socket(host, port);
+ self->sockfd = amqp_open_socket_noblock(host, port, timeout);
if (0 > self->sockfd) {
self->last_error = - self->sockfd;
return -1;
diff --git a/librabbitmq/amqp_framing.c b/librabbitmq/amqp_framing.c
index e065e48..b2f0a83 100644
--- a/librabbitmq/amqp_framing.c
+++ b/librabbitmq/amqp_framing.c
@@ -105,6 +105,8 @@ char const *amqp_method_name(amqp_method_number_t methodNumber) {
case AMQP_CONNECTION_OPEN_OK_METHOD: return "AMQP_CONNECTION_OPEN_OK_METHOD";
case AMQP_CONNECTION_CLOSE_METHOD: return "AMQP_CONNECTION_CLOSE_METHOD";
case AMQP_CONNECTION_CLOSE_OK_METHOD: return "AMQP_CONNECTION_CLOSE_OK_METHOD";
+ case AMQP_CONNECTION_BLOCKED_METHOD: return "AMQP_CONNECTION_BLOCKED_METHOD";
+ case AMQP_CONNECTION_UNBLOCKED_METHOD: return "AMQP_CONNECTION_UNBLOCKED_METHOD";
case AMQP_CHANNEL_OPEN_METHOD: return "AMQP_CHANNEL_OPEN_METHOD";
case AMQP_CHANNEL_OPEN_OK_METHOD: return "AMQP_CHANNEL_OPEN_OK_METHOD";
case AMQP_CHANNEL_FLOW_METHOD: return "AMQP_CHANNEL_FLOW_METHOD";
@@ -326,6 +328,23 @@ int amqp_decode_method(amqp_method_number_t methodNumber,
*decoded = m;
return 0;
}
+ case AMQP_CONNECTION_BLOCKED_METHOD: {
+ amqp_connection_blocked_t *m = (amqp_connection_blocked_t *) amqp_pool_alloc(pool, sizeof(amqp_connection_blocked_t));
+ if (m == NULL) { return AMQP_STATUS_NO_MEMORY; }
+ {
+ uint8_t len;
+ if (!amqp_decode_8(encoded, &offset, &len)
+ || !amqp_decode_bytes(encoded, &offset, &m->reason, len))
+ return AMQP_STATUS_BAD_AMQP_DATA;
+ }
+ *decoded = m;
+ return 0;
+ }
+ case AMQP_CONNECTION_UNBLOCKED_METHOD: {
+ amqp_connection_unblocked_t *m = NULL; /* no fields */
+ *decoded = m;
+ return 0;
+ }
case AMQP_CHANNEL_OPEN_METHOD: {
amqp_channel_open_t *m = (amqp_channel_open_t *) amqp_pool_alloc(pool, sizeof(amqp_channel_open_t));
if (m == NULL) { return AMQP_STATUS_NO_MEMORY; }
@@ -1267,6 +1286,16 @@ int amqp_encode_method(amqp_method_number_t methodNumber,
case AMQP_CONNECTION_CLOSE_OK_METHOD: {
return offset;
}
+ case AMQP_CONNECTION_BLOCKED_METHOD: {
+ amqp_connection_blocked_t *m = (amqp_connection_blocked_t *) decoded;
+ if (!amqp_encode_8(encoded, &offset, m->reason.len)
+ || !amqp_encode_bytes(encoded, &offset, m->reason))
+ return AMQP_STATUS_BAD_AMQP_DATA;
+ return offset;
+ }
+ case AMQP_CONNECTION_UNBLOCKED_METHOD: {
+ return offset;
+ }
case AMQP_CHANNEL_OPEN_METHOD: {
amqp_channel_open_t *m = (amqp_channel_open_t *) decoded;
if (!amqp_encode_8(encoded, &offset, m->out_of_band.len)
diff --git a/librabbitmq/amqp_framing.h b/librabbitmq/amqp_framing.h
index f6dafe4..0c04cb8 100644
--- a/librabbitmq/amqp_framing.h
+++ b/librabbitmq/amqp_framing.h
@@ -183,6 +183,16 @@ typedef struct amqp_connection_close_ok_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_connection_close_ok_t;
+#define AMQP_CONNECTION_BLOCKED_METHOD ((amqp_method_number_t) 0x000A003C) /* 10, 60; 655420 */
+typedef struct amqp_connection_blocked_t_ {
+ amqp_bytes_t reason;
+} amqp_connection_blocked_t;
+
+#define AMQP_CONNECTION_UNBLOCKED_METHOD ((amqp_method_number_t) 0x000A003D) /* 10, 61; 655421 */
+typedef struct amqp_connection_unblocked_t_ {
+ char dummy; /* Dummy field to avoid empty struct */
+} amqp_connection_unblocked_t;
+
#define AMQP_CHANNEL_OPEN_METHOD ((amqp_method_number_t) 0x0014000A) /* 20, 10; 1310730 */
typedef struct amqp_channel_open_t_ {
amqp_bytes_t out_of_band;
diff --git a/librabbitmq/amqp_gnutls.c b/librabbitmq/amqp_gnutls.c
index 734643c..f18d427 100644
--- a/librabbitmq/amqp_gnutls.c
+++ b/librabbitmq/amqp_gnutls.c
@@ -119,7 +119,7 @@ amqp_ssl_socket_recv(void *base,
}
static int
-amqp_ssl_socket_open(void *base, const char *host, int port)
+amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
int status;
@@ -132,7 +132,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port)
return -1;
}
- self->sockfd = amqp_open_socket(host, port);
+ self->sockfd = amqp_open_socket_noblock(host, port, timeout);
if (0 > self->sockfd) {
self->last_error = -self->sockfd;
return -1;
diff --git a/librabbitmq/amqp_hostcheck.c b/librabbitmq/amqp_hostcheck.c
new file mode 100644
index 0000000..8ad2cf7
--- /dev/null
+++ b/librabbitmq/amqp_hostcheck.c
@@ -0,0 +1,201 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * Copyright 1996-2014 Daniel Stenberg <daniel@haxx.se>.
+ * Copyright 2014 Michael Steinert
+ *
+ * All rights reserved.
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT OF THIRD PARTY RIGHTS.
+ * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+ * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+ * USE OR OTHER DEALINGS IN THE SOFTWARE.
+ *
+ * Except as contained in this notice, the name of a copyright holder shall
+ * not be used in advertising or otherwise to promote the sale, use or other
+ * dealings in this Software without prior written authorization of the
+ * copyright holder.
+ */
+
+#include "amqp_private.h"
+
+#include <string.h>
+
+/* Portable, consistent toupper (remember EBCDIC). Do not use toupper()
+ * because its behavior is altered by the current locale.
+ */
+
+static char
+amqp_raw_toupper(char in)
+{
+ switch (in) {
+ case 'a':
+ return 'A';
+ case 'b':
+ return 'B';
+ case 'c':
+ return 'C';
+ case 'd':
+ return 'D';
+ case 'e':
+ return 'E';
+ case 'f':
+ return 'F';
+ case 'g':
+ return 'G';
+ case 'h':
+ return 'H';
+ case 'i':
+ return 'I';
+ case 'j':
+ return 'J';
+ case 'k':
+ return 'K';
+ case 'l':
+ return 'L';
+ case 'm':
+ return 'M';
+ case 'n':
+ return 'N';
+ case 'o':
+ return 'O';
+ case 'p':
+ return 'P';
+ case 'q':
+ return 'Q';
+ case 'r':
+ return 'R';
+ case 's':
+ return 'S';
+ case 't':
+ return 'T';
+ case 'u':
+ return 'U';
+ case 'v':
+ return 'V';
+ case 'w':
+ return 'W';
+ case 'x':
+ return 'X';
+ case 'y':
+ return 'Y';
+ case 'z':
+ return 'Z';
+ }
+ return in;
+}
+
+/*
+ * amqp_raw_equal() is for doing "raw" case insensitive strings. This is meant
+ * to be locale independent and only compare strings we know are safe for
+ * this. See http://daniel.haxx.se/blog/2008/10/15/strcasecmp-in-turkish/ for
+ * some further explanation to why this function is necessary.
+ *
+ * The function is capable of comparing a-z case insensitively even for
+ * non-ascii.
+ */
+
+static int
+amqp_raw_equal(const char *first, const char *second)
+{
+ while (*first && *second) {
+ if (amqp_raw_toupper(*first) != amqp_raw_toupper(*second)) {
+ /* get out of the loop as soon as they don't match */
+ break;
+ }
+ first++;
+ second++;
+ }
+ /* we do the comparison here (possibly again), just to make sure that if
+ * the loop above is skipped because one of the strings reached zero, we
+ * must not return this as a successful match
+ */
+ return (amqp_raw_toupper(*first) == amqp_raw_toupper(*second));
+}
+
+static int
+amqp_raw_nequal(const char *first, const char *second, size_t max)
+{
+ while (*first && *second && max) {
+ if (amqp_raw_toupper(*first) != amqp_raw_toupper(*second)) {
+ break;
+ }
+ max--;
+ first++;
+ second++;
+ }
+ if (0 == max) {
+ return 1; /* they are equal this far */
+ }
+ return amqp_raw_toupper(*first) == amqp_raw_toupper(*second);
+}
+
+/*
+ * Match a hostname against a wildcard pattern.
+ * E.g.
+ * "foo.host.com" matches "*.host.com".
+ *
+ * We use the matching rule described in RFC6125, section 6.4.3.
+ * http://tools.ietf.org/html/rfc6125#section-6.4.3
+ */
+
+static int
+amqp_hostmatch(const char *hostname, const char *pattern)
+{
+ const char *pattern_label_end, *pattern_wildcard, *hostname_label_end;
+ int wildcard_enabled;
+ size_t prefixlen, suffixlen;
+ pattern_wildcard = strchr(pattern, '*');
+ if (pattern_wildcard == NULL) {
+ return amqp_raw_equal(pattern, hostname) ? 1 : 0;
+ }
+ /* We require at least 2 dots in pattern to avoid too wide wildcard match. */
+ wildcard_enabled = 1;
+ pattern_label_end = strchr(pattern, '.');
+ if (pattern_label_end == NULL ||
+ strchr(pattern_label_end + 1, '.') == NULL ||
+ pattern_wildcard > pattern_label_end ||
+ amqp_raw_nequal(pattern, "xn--", 4)) {
+ wildcard_enabled = 0;
+ }
+ if (!wildcard_enabled) {
+ return amqp_raw_equal(pattern, hostname) ? 1 : 0;
+ }
+ hostname_label_end = strchr(hostname, '.');
+ if (hostname_label_end == NULL ||
+ !amqp_raw_equal(pattern_label_end, hostname_label_end)) {
+ return 0;
+ }
+ /* The wildcard must match at least one character, so the left-most
+ * label of the hostname is at least as large as the left-most label
+ * of the pattern.
+ */
+ if (hostname_label_end - hostname < pattern_label_end - pattern) {
+ return 0;
+ }
+ prefixlen = pattern_wildcard - pattern;
+ suffixlen = pattern_label_end - (pattern_wildcard + 1);
+ return amqp_raw_nequal(pattern, hostname, prefixlen) &&
+ amqp_raw_nequal(pattern_wildcard + 1, hostname_label_end - suffixlen,
+ suffixlen) ? 1 : 0;
+}
+
+int
+amqp_hostcheck(const char *match_pattern, const char *hostname)
+{
+ /* sanity check */
+ if (!match_pattern || !*match_pattern || !hostname || !*hostname) {
+ return 0;
+ }
+ /* trivial case */
+ if (amqp_raw_equal(hostname, match_pattern)) {
+ return 1;
+ }
+ return amqp_hostmatch(hostname, match_pattern);
+}
diff --git a/librabbitmq/amqp_hostcheck.h b/librabbitmq/amqp_hostcheck.h
new file mode 100644
index 0000000..2832933
--- /dev/null
+++ b/librabbitmq/amqp_hostcheck.h
@@ -0,0 +1,36 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+#ifndef librabbitmq_amqp_hostcheck_h
+#define librabbitmq_amqp_hostcheck_h
+
+/*
+ * Copyright 1996-2014 Daniel Stenberg <daniel@haxx.se>.
+ * Copyright 2014 Michael Steinert
+ *
+ * All rights reserved.
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT OF THIRD PARTY RIGHTS.
+ * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+ * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+ * USE OR OTHER DEALINGS IN THE SOFTWARE.
+ *
+ * Except as contained in this notice, the name of a copyright holder shall
+ * not be used in advertising or otherwise to promote the sale, use or other
+ * dealings in this Software without prior written authorization of the
+ * copyright holder.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+int
+amqp_hostcheck(const char *match_pattern, const char *hostname);
+
+#endif
diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c
index 88b1e9f..586117e 100644
--- a/librabbitmq/amqp_mem.c
+++ b/librabbitmq/amqp_mem.c
@@ -48,7 +48,12 @@
char const *amqp_version(void)
{
- return VERSION; /* defined in config.h */
+ return AMQP_VERSION_STRING;
+}
+
+uint32_t amqp_version_number(void)
+{
+ return AMQP_VERSION;
}
void init_amqp_pool(amqp_pool_t *pool, size_t pagesize)
diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c
index 0f6c12c..ab8a94e 100644
--- a/librabbitmq/amqp_openssl.c
+++ b/librabbitmq/amqp_openssl.c
@@ -25,8 +25,13 @@
#include "config.h"
#endif
+#if defined(__APPLE__) && defined(__MACH__)
+# define MAC_OS_X_VERSION_MIN_REQUIRED MAC_OS_X_VERSION_10_6
+#endif
+
#include "amqp_ssl_socket.h"
#include "amqp_socket.h"
+#include "amqp_hostcheck.h"
#include "amqp_private.h"
#include "threads.h"
@@ -210,15 +215,9 @@ amqp_ssl_socket_verify_hostname(void *base, const char *host)
goto error;
}
}
-#ifdef _MSC_VER
-#define strcasecmp _stricmp
-#endif
- if (strcasecmp(host, (char *)utf8_value)) {
+ if (!amqp_hostcheck((char *)utf8_value, host)) {
goto error;
}
-#ifdef _MSC_VER
-#undef strcasecmp
-#endif
exit:
OPENSSL_free(utf8_value);
return status;
@@ -228,7 +227,7 @@ error:
}
static int
-amqp_ssl_socket_open(void *base, const char *host, int port)
+amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
long result;
@@ -243,7 +242,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port)
}
SSL_set_mode(self->ssl, SSL_MODE_AUTO_RETRY);
- self->sockfd = amqp_open_socket(host, port);
+ self->sockfd = amqp_open_socket_noblock(host, port, timeout);
if (0 > self->sockfd) {
status = self->sockfd;
self->internal_error = amqp_os_socket_error();
@@ -293,6 +292,7 @@ error_out2:
self->sockfd = -1;
error_out1:
SSL_free(self->ssl);
+ self->ssl = NULL;
goto exit;
}
@@ -300,28 +300,22 @@ static int
amqp_ssl_socket_close(void *base)
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- if (self) {
+
+ if (self->ssl) {
+ SSL_shutdown(self->ssl);
SSL_free(self->ssl);
- amqp_os_socket_close(self->sockfd);
- SSL_CTX_free(self->ctx);
- free(self->buffer);
- free(self);
+ self->ssl = NULL;
}
- destroy_openssl();
- return 0;
-}
-static int
-amqp_ssl_socket_error(void *base)
-{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- return self->internal_error;
-}
+ if (-1 != self->sockfd) {
+ if (amqp_os_socket_close(self->sockfd)) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
-char *
-amqp_ssl_error_string(AMQP_UNUSED int err)
-{
- return strdup("A ssl socket error occurred.");
+ self->sockfd = -1;
+ }
+
+ return AMQP_STATUS_OK;
}
static int
@@ -331,37 +325,59 @@ amqp_ssl_socket_get_sockfd(void *base)
return self->sockfd;
}
+static void
+amqp_ssl_socket_delete(void *base)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+
+ if (self) {
+ amqp_ssl_socket_close(self);
+
+ SSL_CTX_free(self->ctx);
+ free(self->buffer);
+ free(self);
+ }
+ destroy_openssl();
+}
+
static const struct amqp_socket_class_t amqp_ssl_socket_class = {
amqp_ssl_socket_writev, /* writev */
amqp_ssl_socket_send, /* send */
amqp_ssl_socket_recv, /* recv */
amqp_ssl_socket_open, /* open */
amqp_ssl_socket_close, /* close */
- amqp_ssl_socket_error, /* error */
- amqp_ssl_socket_get_sockfd /* get_sockfd */
+ amqp_ssl_socket_get_sockfd, /* get_sockfd */
+ amqp_ssl_socket_delete /* delete */
};
amqp_socket_t *
-amqp_ssl_socket_new(void)
+amqp_ssl_socket_new(amqp_connection_state_t state)
{
struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self));
int status;
if (!self) {
- goto error;
+ return NULL;
}
+
+ self->sockfd = -1;
+ self->klass = &amqp_ssl_socket_class;
+ self->verify = 1;
+
status = initialize_openssl();
if (status) {
goto error;
}
+
self->ctx = SSL_CTX_new(SSLv23_client_method());
if (!self->ctx) {
goto error;
}
- self->klass = &amqp_ssl_socket_class;
- self->verify = 1;
+
+ amqp_set_socket(state, (amqp_socket_t *)self);
+
return (amqp_socket_t *)self;
error:
- amqp_socket_close((amqp_socket_t *)self);
+ amqp_ssl_socket_delete((amqp_socket_t *)self);
return NULL;
}
@@ -518,6 +534,7 @@ amqp_ssl_locking_callback(int mode, int n,
static int
initialize_openssl(void)
{
+#ifdef ENABLE_THREAD_SAFETY
#ifdef _WIN32
/* No such thing as PTHREAD_INITIALIZE_MUTEX macro on Win32, so we use this */
if (NULL == openssl_init_mutex) {
@@ -533,7 +550,6 @@ initialize_openssl(void)
}
#endif /* _WIN32 */
-#ifdef ENABLE_THREAD_SAFETY
if (pthread_mutex_lock(&openssl_init_mutex)) {
return -1;
}
diff --git a/librabbitmq/amqp_polarssl.c b/librabbitmq/amqp_polarssl.c
index 770fdbe..bae3141 100644
--- a/librabbitmq/amqp_polarssl.c
+++ b/librabbitmq/amqp_polarssl.c
@@ -128,12 +128,20 @@ amqp_ssl_socket_recv(void *base,
}
static int
-amqp_ssl_socket_open(void *base, const char *host, int port)
+amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
int status;
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
self->last_error = 0;
+ if (timeout && (timeout->tv_sec != 0 || timeout->tv_usec != 0)) {
+ /* We don't support PolarSSL for now because it uses its own connect() wrapper
+ * It is not too hard to implement net_connect() with noblock support,
+ * but then we will have to maintain that piece of code and keep it synced with main PolarSSL code base
+ */
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
+
status = net_connect(&self->sockfd, host, port);
if (status) {
/* This isn't quite right. We should probably translate between
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 7192283..afe182d 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -60,6 +60,14 @@
#endif
#ifdef _WIN32
+# ifndef WINVER
+/* WINVER 0x0502 is WinXP SP2+, Windows Server 2003 SP1+
+ * See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa383745(v=vs.85).aspx#macros_for_conditional_declarations */
+# define WINVER 0x0502
+# endif
+# ifndef WIN32_LEAN_AND_MEAN
+# define WIN32_LEAN_AND_MEAN
+# endif
# include <Winsock2.h>
#else
# include <arpa/inet.h>
@@ -175,6 +183,9 @@ struct amqp_connection_state_t_ {
uint64_t next_recv_heartbeat;
uint64_t next_send_heartbeat;
+
+ amqp_table_t server_properties;
+ amqp_pool_t properties_pool;
};
amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel);
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 441192a..79a7696 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -133,6 +133,39 @@ amqp_os_socket_setsockopt(int sock, int level, int optname,
#endif
}
+static int
+amqp_os_socket_setsockblock(int sock, int block)
+{
+
+#ifdef _WIN32
+ int nonblock = !block;
+ if (NO_ERROR != ioctlsocket(sock, FIONBIO, &nonblock)) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ } else {
+ return AMQP_STATUS_OK;
+ }
+#else
+ long arg;
+
+ if ((arg = fcntl(sock, F_GETFL, NULL)) < 0) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+
+ if (block) {
+ arg &= (~O_NONBLOCK);
+ } else {
+ arg |= O_NONBLOCK;
+ }
+
+ if (fcntl(sock, F_SETFL, arg) < 0) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+
+ return AMQP_STATUS_OK;
+#endif
+}
+
+
int
amqp_os_socket_error(void)
{
@@ -182,25 +215,32 @@ amqp_socket_open(amqp_socket_t *self, const char *host, int port)
{
assert(self);
assert(self->klass->open);
- return self->klass->open(self, host, port);
+ return self->klass->open(self, host, port, NULL);
}
int
-amqp_socket_close(amqp_socket_t *self)
+amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout)
{
- if (self) {
- assert(self->klass->close);
- return self->klass->close(self);
- }
- return AMQP_STATUS_OK;
+ assert(self);
+ assert(self->klass->open);
+ return self->klass->open(self, host, port, timeout);
}
int
-amqp_socket_error(amqp_socket_t *self)
+amqp_socket_close(amqp_socket_t *self)
{
assert(self);
- assert(self->klass->error);
- return self->klass->error(self);
+ assert(self->klass->close);
+ return self->klass->close(self);
+}
+
+void
+amqp_socket_delete(amqp_socket_t *self)
+{
+ if (self) {
+ assert(self->klass->delete);
+ self->klass->delete(self);
+ }
}
int
@@ -211,8 +251,16 @@ amqp_socket_get_sockfd(amqp_socket_t *self)
return self->klass->get_sockfd(self);
}
-int amqp_open_socket(char const *hostname,
- int portnumber)
+int
+amqp_open_socket(char const *hostname,
+ int portnumber)
+{
+ return amqp_open_socket_noblock(hostname, portnumber, NULL);
+}
+
+int amqp_open_socket_noblock(char const *hostname,
+ int portnumber,
+ struct timeval *timeout)
{
struct addrinfo hint;
struct addrinfo *address_list;
@@ -221,6 +269,15 @@ int amqp_open_socket(char const *hostname,
int sockfd = -1;
int last_error = AMQP_STATUS_OK;
int one = 1; /* for setsockopt */
+ int res;
+ int timer_error;
+ amqp_timer_t timer;
+
+ AMQP_INIT_TIMER(timer)
+
+ if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) {
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
last_error = amqp_os_socket_init();
if (AMQP_STATUS_OK != last_error) {
@@ -241,31 +298,147 @@ int amqp_open_socket(char const *hostname,
}
for (addr = address_list; addr; addr = addr->ai_next) {
+ if (-1 != sockfd) {
+ amqp_os_socket_close(sockfd);
+ sockfd = -1;
+ }
+
sockfd = amqp_os_socket_socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
+
if (-1 == sockfd) {
last_error = AMQP_STATUS_SOCKET_ERROR;
continue;
}
+
#ifdef SO_NOSIGPIPE
if (0 != amqp_os_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) {
last_error = AMQP_STATUS_SOCKET_ERROR;
- amqp_os_socket_close(sockfd);
continue;
}
#endif /* SO_NOSIGPIPE */
- if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))
- || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
+
+ if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) {
last_error = AMQP_STATUS_SOCKET_ERROR;
- amqp_os_socket_close(sockfd);
continue;
+ }
+
+ if (timeout) {
+ /* Trying to connect with timeout, set socket to non-blocking mode */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
+
+ res = connect(sockfd, addr->ai_addr, addr->ai_addrlen);
+
+ if (0 == res) {
+ /* Connected immediately, set to blocking mode again */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
+
+ last_error = AMQP_STATUS_OK;
+ break;
+ }
+
+#ifdef _WIN32
+ if (WSAEWOULDBLOCK == amqp_os_socket_error()) {
+#else
+ if (EINPROGRESS == amqp_os_socket_error()) {
+#endif
+
+ while(1) {
+ fd_set write_fd;
+ fd_set except_fd;
+
+ FD_ZERO(&write_fd);
+ FD_SET(sockfd, &write_fd);
+
+ FD_ZERO(&except_fd);
+ FD_SET(sockfd, &except_fd);
+
+ timer_error = amqp_timer_update(&timer, timeout);
+
+ if (timer_error < 0) {
+ last_error = timer_error;
+ break;
+ }
+
+ /* Win32 requires except_fds to be passed to detect connection
+ * failure. Other platforms only need write_fds, passing except_fds
+ * seems to be harmless otherwise
+ */
+ res = select(sockfd+1, NULL, &write_fd, &except_fd, &timer.tv);
+
+ if (res > 0) {
+ int result;
+ socklen_t result_len = sizeof(result);
+
+ if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+ }
+
+ if (result != 0) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+ }
+
+ /* socket is ready to be written to, set to blocking mode again */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
+
+ last_error = AMQP_STATUS_OK;
+ break;
+ } else if (0 == res) {
+ /* Timed out - return */
+ last_error = AMQP_STATUS_TIMEOUT;
+ break;
+ } else if (errno == EINTR) {
+ /* Try again */
+ continue;
+ } else {
+ /* Error connecting */
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+ }
+ } /* end while(1) loop */
+
+ if (last_error == AMQP_STATUS_OK
+ || last_error == AMQP_STATUS_TIMEOUT
+ || last_error == AMQP_STATUS_TIMER_FAILURE) {
+ /* Exit for loop on timer errors or when connection established */
+ break;
+ }
+
+ } else {
+ /* Error connecting */
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+
+ }
+
} else {
- last_error = AMQP_STATUS_OK;
- break;
+ /* Connect in blocking mode */
+ if (0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ } else {
+ last_error = AMQP_STATUS_OK;
+ break;
+ }
}
}
freeaddrinfo(address_list);
if (last_error != AMQP_STATUS_OK) {
+ if (-1 != sockfd) {
+ amqp_os_socket_close(sockfd);
+ }
+
return last_error;
}
@@ -404,8 +577,9 @@ static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, stru
if (0 == current_timestamp) {
return AMQP_STATUS_TIMER_FAILURE;
}
- end_timestamp = start + timeout->tv_sec * AMQP_NS_PER_S +
- timeout->tv_usec * AMQP_NS_PER_US;
+ end_timestamp = start +
+ (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
+ (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
if (current_timestamp > end_timestamp) {
return AMQP_STATUS_TIMEOUT;
}
@@ -526,9 +700,6 @@ static int wait_frame_inner(amqp_connection_state_t state,
/* Complete frame was read. Return it. */
return AMQP_STATUS_OK;
}
-
- /* Incomplete or ignored frame. Keep processing input. */
- assert(res != 0);
}
beginrecv:
@@ -559,8 +730,8 @@ beginrecv:
if (timeout) {
if (0 == timeout_timestamp) {
timeout_timestamp = current_timestamp +
- timeout->tv_sec * AMQP_NS_PER_S +
- timeout->tv_usec * AMQP_NS_PER_US;
+ (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
+ (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
}
if (current_timestamp > timeout_timestamp) {
@@ -600,7 +771,6 @@ beginrecv:
if (AMQP_STATUS_TIMEOUT == res) {
if (next_timestamp == state->next_recv_heartbeat) {
amqp_socket_close(state->socket);
- state->socket = NULL;
return AMQP_STATUS_HEARTBEAT_TIMEOUT;
} else if (next_timestamp == timeout_timestamp) {
return AMQP_STATUS_TIMEOUT;
@@ -616,6 +786,109 @@ beginrecv:
}
}
+static amqp_link_t * amqp_create_link_for_frame(amqp_connection_state_t state, amqp_frame_t *frame)
+{
+ amqp_link_t *link;
+ amqp_frame_t *frame_copy;
+
+ amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(state, frame->channel);
+
+ if (NULL == channel_pool) {
+ return NULL;
+ }
+
+ link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
+ frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
+
+ if (NULL == link || NULL == frame_copy) {
+ return NULL;
+ }
+
+ *frame_copy = *frame;
+ link->data = frame_copy;
+
+ return link;
+}
+
+int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame)
+{
+ amqp_link_t *link = amqp_create_link_for_frame(state, frame);
+ if (NULL == link) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
+ if (NULL == state->first_queued_frame) {
+ state->first_queued_frame = link;
+ } else {
+ state->last_queued_frame->next = link;
+ }
+
+ link->next = NULL;
+ state->last_queued_frame = link;
+
+ return AMQP_STATUS_OK;
+}
+
+int amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame)
+{
+ amqp_link_t *link = amqp_create_link_for_frame(state, frame);
+ if (NULL == link) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
+ if (NULL == state->first_queued_frame) {
+ state->first_queued_frame = link;
+ state->last_queued_frame = link;
+ link->next = NULL;
+ } else {
+ link->next = state->first_queued_frame;
+ state->first_queued_frame = link;
+ }
+
+ return AMQP_STATUS_OK;
+}
+
+int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_frame_t *decoded_frame)
+{
+ amqp_frame_t *frame_ptr;
+ amqp_link_t *cur;
+ int res;
+
+ for (cur = state->first_queued_frame; NULL != cur; cur = cur->next) {
+ frame_ptr = cur->data;
+
+ if (channel == frame_ptr->channel) {
+ state->first_queued_frame = cur->next;
+ if (NULL == state->first_queued_frame) {
+ state->last_queued_frame = NULL;
+ }
+
+ *decoded_frame = *frame_ptr;
+
+ return AMQP_STATUS_OK;
+ }
+ }
+
+ while (1) {
+ res = wait_frame_inner(state, decoded_frame, NULL);
+
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+
+ if (channel == decoded_frame->channel) {
+ return AMQP_STATUS_OK;
+ } else {
+ res = amqp_queue_frame(state, decoded_frame);
+ if (res != AMQP_STATUS_OK) {
+ return res;
+ }
+ }
+ }
+}
+
int amqp_simple_wait_frame(amqp_connection_state_t state,
amqp_frame_t *decoded_frame)
{
@@ -654,7 +927,6 @@ int amqp_simple_wait_method(amqp_connection_state_t state,
|| frame.frame_type != AMQP_FRAME_METHOD
|| frame.payload.method.id != expected_method) {
amqp_socket_close(state->socket);
- state->socket = NULL;
return AMQP_STATUS_WRONG_METHOD;
}
*output = frame.payload.method;
@@ -859,6 +1131,13 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
goto error_res;
}
+ res = amqp_table_clone(&s->server_properties, &state->server_properties,
+ &state->properties_pool);
+
+ if (AMQP_STATUS_OK != res) {
+ goto error_res;
+ }
+
/* TODO: check that our chosen SASL mechanism is in the list of
acceptable mechanisms. Or even let the application choose from
the list! */
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index 52815fe..bdeea63 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -28,11 +28,7 @@
#ifndef AMQP_SOCKET_H
#define AMQP_SOCKET_H
-#include "amqp.h"
-
-#ifdef _WIN32
-# include <WinSock2.h>
-#endif
+#include "amqp_private.h"
AMQP_BEGIN_DECLS
@@ -46,10 +42,10 @@ amqp_os_socket_close(int sockfd);
typedef ssize_t (*amqp_socket_writev_fn)(void *, struct iovec *, int);
typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t);
typedef ssize_t (*amqp_socket_recv_fn)(void *, void *, size_t, int);
-typedef int (*amqp_socket_open_fn)(void *, const char *, int);
+typedef int (*amqp_socket_open_fn)(void *, const char *, int, struct timeval *);
typedef int (*amqp_socket_close_fn)(void *);
-typedef int (*amqp_socket_error_fn)(void *);
typedef int (*amqp_socket_get_sockfd_fn)(void *);
+typedef void (*amqp_socket_delete_fn)(void *);
/** V-table for amqp_socket_t */
struct amqp_socket_class_t {
@@ -58,8 +54,8 @@ struct amqp_socket_class_t {
amqp_socket_recv_fn recv;
amqp_socket_open_fn open;
amqp_socket_close_fn close;
- amqp_socket_error_fn error;
amqp_socket_get_sockfd_fn get_sockfd;
+ amqp_socket_delete_fn delete;
};
/** Abstract base class for amqp_socket_t */
@@ -78,6 +74,19 @@ struct iovec {
};
#endif
+
+/**
+ * Set set the socket object for a connection
+ *
+ * This assigns a socket object to the connection, closing and deleting any
+ * existing socket
+ *
+ * \param [in] state The connection object to add the socket to
+ * \param [in] socket The socket object to assign to the connection
+ */
+void
+amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket);
+
/**
* Write to a socket.
*
@@ -127,6 +136,56 @@ amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len);
ssize_t
amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags);
+/**
+ * Close a socket connection and free resources.
+ *
+ * This function closes a socket connection and releases any resources used by
+ * the object. After calling this function the specified socket should no
+ * longer be referenced.
+ *
+ * \param [in,out] self A socket object.
+ *
+ * \return Zero upon success, non-zero otherwise.
+ */
+int
+amqp_socket_close(amqp_socket_t *self);
+
+/**
+ * Destroy a socket object
+ *
+ * \param [in] self the socket object to delete
+ */
+void
+amqp_socket_delete(amqp_socket_t *self);
+
+/**
+ * Open a socket connection.
+ *
+ * This function opens a socket connection returned from amqp_tcp_socket_new()
+ * or amqp_ssl_socket_new(). This function should be called after setting
+ * socket options and prior to assigning the socket to an AMQP connection with
+ * amqp_set_socket().
+ *
+ * \param [in] host Connect to this host.
+ * \param [in] port Connect on this remote port.
+ * \param [in] timeout Max allowed time to spent on opening. If NULL - run in blocking mode
+ *
+ * \return File descriptor upon success, non-zero negative error code otherwise.
+ */
+int
+amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *timeout);
+
+int
+amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame);
+
+int
+amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame);
+
+int
+amqp_simple_wait_frame_on_channel(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_frame_t *decoded_frame);
+
AMQP_END_DECLS
#endif /* AMQP_SOCKET_H */
diff --git a/librabbitmq/amqp_ssl_socket.h b/librabbitmq/amqp_ssl_socket.h
index 3bfce51..7f8ea64 100644
--- a/librabbitmq/amqp_ssl_socket.h
+++ b/librabbitmq/amqp_ssl_socket.h
@@ -1,4 +1,5 @@
/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/** \file */
/*
* Copyright 2012-2013 Michael Steinert
*
@@ -21,10 +22,6 @@
* DEALINGS IN THE SOFTWARE.
*/
-/**
- * An SSL socket connection.
- */
-
#ifndef AMQP_SSL_H
#define AMQP_SSL_H
@@ -35,14 +32,28 @@ AMQP_BEGIN_DECLS
/**
* Create a new SSL/TLS socket object.
*
- * Call amqp_socket_close() to release socket resources.
+ * The returned socket object is owned by the \ref amqp_connection_state_t object
+ * and will be destroyed when the state object is destroyed or a new socket
+ * object is created.
+ *
+ * If the socket object creation fails, the \ref amqp_connection_state_t object
+ * will not be changed.
+ *
+ * The object returned by this function can be retrieved from the
+ * amqp_connection_state_t object later using the amqp_get_socket() function.
+ *
+ * Calling this function may result in the underlying SSL library being initialized.
+ * \sa amqp_set_initialize_ssl_library()
*
+ * \param [in,out] state The connection object that owns the SSL/TLS socket
* \return A new socket object or NULL if an error occurred.
+ *
+ * \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
amqp_socket_t *
AMQP_CALL
-amqp_ssl_socket_new(void);
+amqp_ssl_socket_new(amqp_connection_state_t state);
/**
* Set the CA certificate.
@@ -50,7 +61,10 @@ amqp_ssl_socket_new(void);
* \param [in,out] self An SSL/TLS socket object.
* \param [in] cacert Path to the CA cert file in PEM format.
*
- * \return Zero if successful, -1 otherwise.
+ * \return \ref AMQP_STATUS_OK on success an \ref amqp_status_enum value on
+ * failure.
+ *
+ * \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
int
@@ -65,7 +79,10 @@ amqp_ssl_socket_set_cacert(amqp_socket_t *self,
* \param [in] cert Path to the client certificate in PEM foramt.
* \param [in] key Path to the client key in PEM format.
*
- * \return Zero if successful, -1 otherwise.
+ * \return \ref AMQP_STATUS_OK on success an \ref amqp_status_enum value on
+ * failure.
+ *
+ * \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
int
@@ -82,7 +99,10 @@ amqp_ssl_socket_set_key(amqp_socket_t *self,
* \param [in] key A buffer containing client key in PEM format.
* \param [in] n The length of the buffer.
*
- * \return Zero if successful, -1 otherwise.
+ * \return \ref AMQP_STATUS_OK on success an \ref amqp_status_enum value on
+ * failure.
+ *
+ * \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
int
@@ -101,6 +121,8 @@ amqp_ssl_socket_set_key_buffer(amqp_socket_t *self,
*
* \param [in,out] self An SSL/TLS socket object.
* \param [in] verify Enable or disable peer verification.
+ *
+ * \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
void
@@ -114,8 +136,8 @@ amqp_ssl_socket_set_verify(amqp_socket_t *self,
* For SSL libraries that require a one-time initialization across
* a whole program (e.g., OpenSSL) this sets whether or not rabbitmq-c
* will initialize the SSL library when the first call to
- * amqp_open_ssl_socket() is made. You should call this function with
- * do_init = 0 if the underlying SSL library is intialized somewhere else
+ * amqp_open_socket() is made. You should call this function with
+ * do_init = 0 if the underlying SSL library is initialized somewhere else
* the program.
*
* Failing to initialize or double initialization of the SSL library will
@@ -124,12 +146,13 @@ amqp_ssl_socket_set_verify(amqp_socket_t *self,
* By default rabbitmq-c will initialize the underlying SSL library
*
* NOTE: calling this function after the first socket has been opened with
- * amqp_open_ssl_socket() will not have any effect.
+ * amqp_open_socket() will not have any effect.
*
- * \param [in] do_initalize If 0 rabbitmq-c will not initialize the SSL
- * library, otherwise rabbitmq-c will initialize the
- * SL library
+ * \param [in] do_initialize If 0 rabbitmq-c will not initialize the SSL
+ * library, otherwise rabbitmq-c will initialize the
+ * SSL library
*
+ * \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
void
diff --git a/librabbitmq/amqp_table.c b/librabbitmq/amqp_table.c
index 6071ece..6fa6fd3 100644
--- a/librabbitmq/amqp_table.c
+++ b/librabbitmq/amqp_table.c
@@ -462,3 +462,153 @@ int amqp_table_entry_cmp(void const *entry1, void const *entry2)
return p1->key.len - p2->key.len;
}
+
+static int
+amqp_field_value_clone(amqp_field_value_t *original, amqp_field_value_t *clone, amqp_pool_t *pool)
+{
+ int i;
+ int res;
+ clone->kind = original->kind;
+
+ switch (clone->kind) {
+ case AMQP_FIELD_KIND_BOOLEAN:
+ clone->value.boolean = original->value.boolean;
+ break;
+
+ case AMQP_FIELD_KIND_I8:
+ clone->value.i8 = original->value.i8;
+ break;
+
+ case AMQP_FIELD_KIND_U8:
+ clone->value.u8 = original->value.u8;
+ break;
+
+ case AMQP_FIELD_KIND_I16:
+ clone->value.i16 = original->value.i16;
+ break;
+
+ case AMQP_FIELD_KIND_U16:
+ clone->value.u16 = original->value.u16;
+ break;
+
+ case AMQP_FIELD_KIND_I32:
+ clone->value.i32 = original->value.i32;
+ break;
+
+ case AMQP_FIELD_KIND_U32:
+ clone->value.u32 = original->value.u32;
+ break;
+
+ case AMQP_FIELD_KIND_I64:
+ clone->value.i64 = original->value.i64;
+ break;
+
+ case AMQP_FIELD_KIND_U64:
+ case AMQP_FIELD_KIND_TIMESTAMP:
+ clone->value.u64 = original->value.u64;
+ break;
+
+ case AMQP_FIELD_KIND_F32:
+ clone->value.f32 = original->value.f32;
+ break;
+
+ case AMQP_FIELD_KIND_F64:
+ clone->value.f64 = original->value.f64;
+ break;
+
+ case AMQP_FIELD_KIND_DECIMAL:
+ clone->value.decimal = original->value.decimal;
+ break;
+
+ case AMQP_FIELD_KIND_UTF8:
+ case AMQP_FIELD_KIND_BYTES:
+ if (0 == original->value.bytes.len) {
+ clone->value.bytes = amqp_empty_bytes;
+ } else {
+ amqp_pool_alloc_bytes(pool, original->value.bytes.len, &clone->value.bytes);
+ if (NULL == clone->value.bytes.bytes) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+ memcpy(clone->value.bytes.bytes, original->value.bytes.bytes, clone->value.bytes.len);
+ }
+ break;
+
+ case AMQP_FIELD_KIND_ARRAY:
+ if (0 == original->value.array.entries) {
+ clone->value.array = amqp_empty_array;
+ } else {
+ clone->value.array.num_entries = original->value.array.num_entries;
+ clone->value.array.entries = amqp_pool_alloc(pool, clone->value.array.num_entries * sizeof(amqp_field_value_t));
+ if (NULL == clone->value.array.entries) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
+ for (i = 0; i < clone->value.array.num_entries; ++i) {
+ res = amqp_field_value_clone(&original->value.array.entries[i], &clone->value.array.entries[i], pool);
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+ }
+ }
+ break;
+
+ case AMQP_FIELD_KIND_TABLE:
+ return amqp_table_clone(&original->value.table, &clone->value.table, pool);
+
+ case AMQP_FIELD_KIND_VOID:
+ break;
+
+ default:
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
+
+ return AMQP_STATUS_OK;
+}
+
+
+static int
+amqp_table_entry_clone(amqp_table_entry_t *original, amqp_table_entry_t *clone, amqp_pool_t *pool)
+{
+ if (0 == original->key.len) {
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
+
+ amqp_pool_alloc_bytes(pool, original->key.len, &clone->key);
+ if (NULL == clone->key.bytes) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
+ memcpy(clone->key.bytes, original->key.bytes, clone->key.len);
+
+ return amqp_field_value_clone(&original->value, &clone->value, pool);
+}
+
+int
+amqp_table_clone(amqp_table_t *original, amqp_table_t *clone, amqp_pool_t *pool)
+{
+ int i;
+ int res;
+ clone->num_entries = original->num_entries;
+ if (0 == clone->num_entries) {
+ *clone = amqp_empty_table;
+ return AMQP_STATUS_OK;
+ }
+
+ clone->entries = amqp_pool_alloc(pool, clone->num_entries * sizeof(amqp_table_entry_t));
+
+ if (NULL == clone->entries) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
+ for (i = 0; i < clone->num_entries; ++i) {
+ res = amqp_table_entry_clone(&original->entries[i], &clone->entries[i], pool);
+ if (AMQP_STATUS_OK != res) {
+ goto error_out1;
+ }
+ }
+
+ return AMQP_STATUS_OK;
+
+error_out1:
+ return res;
+}
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c
index e43a596..ed38c06 100644
--- a/librabbitmq/amqp_tcp_socket.c
+++ b/librabbitmq/amqp_tcp_socket.c
@@ -55,7 +55,7 @@ amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags)
#endif
start:
- res = send(self->sockfd, buf, len, flags);
+ res = send(self->sockfd, buf_left, len_left, flags);
if (res < 0) {
self->internal_error = amqp_os_socket_error();
@@ -220,10 +220,10 @@ start:
}
static int
-amqp_tcp_socket_open(void *base, const char *host, int port)
+amqp_tcp_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- self->sockfd = amqp_open_socket(host, port);
+ self->sockfd = amqp_open_socket_noblock(host, port, timeout);
if (0 > self->sockfd) {
int err = self->sockfd;
self->sockfd = -1;
@@ -236,32 +236,34 @@ static int
amqp_tcp_socket_close(void *base)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- int status = -1;
- if (self) {
- status = amqp_os_socket_close(self->sockfd);
- free(self->buffer);
- free(self);
- }
- if (0 == status) {
- return AMQP_STATUS_OK;
- } else {
- return AMQP_STATUS_SOCKET_ERROR;
+ if (-1 != self->sockfd) {
+ if (amqp_os_socket_close(self->sockfd)) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+ self->sockfd = -1;
}
+
+ return AMQP_STATUS_OK;
}
static int
-amqp_tcp_socket_error(AMQP_UNUSED void *base)
+amqp_tcp_socket_get_sockfd(void *base)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- return self->internal_error;
+ return self->sockfd;
}
-static int
-amqp_tcp_socket_get_sockfd(void *base)
+static void
+amqp_tcp_socket_delete(void *base)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- return self->sockfd;
+
+ if (self) {
+ amqp_tcp_socket_close(self);
+ free(self->buffer);
+ free(self);
+ }
}
static const struct amqp_socket_class_t amqp_tcp_socket_class = {
@@ -270,12 +272,12 @@ static const struct amqp_socket_class_t amqp_tcp_socket_class = {
amqp_tcp_socket_recv, /* recv */
amqp_tcp_socket_open, /* open */
amqp_tcp_socket_close, /* close */
- amqp_tcp_socket_error, /* error */
- amqp_tcp_socket_get_sockfd /* get_sockfd */
+ amqp_tcp_socket_get_sockfd, /* get_sockfd */
+ amqp_tcp_socket_delete /* delete */
};
amqp_socket_t *
-amqp_tcp_socket_new(void)
+amqp_tcp_socket_new(amqp_connection_state_t state)
{
struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self));
if (!self) {
@@ -283,6 +285,9 @@ amqp_tcp_socket_new(void)
}
self->klass = &amqp_tcp_socket_class;
self->sockfd = -1;
+
+ amqp_set_socket(state, (amqp_socket_t *)self);
+
return (amqp_socket_t *)self;
}
diff --git a/librabbitmq/amqp_tcp_socket.h b/librabbitmq/amqp_tcp_socket.h
index 4c8ba54..95ed206 100644
--- a/librabbitmq/amqp_tcp_socket.h
+++ b/librabbitmq/amqp_tcp_socket.h
@@ -1,4 +1,5 @@
/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/** \file */
/*
* Copyright 2012-2013 Michael Steinert
*
@@ -35,14 +36,16 @@ AMQP_BEGIN_DECLS
/**
* Create a new TCP socket.
*
- * Call amqp_socket_close() to release socket resources.
+ * Call amqp_connection_close() to release socket resources.
*
* \return A new socket object or NULL if an error occurred.
+ *
+ * \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
amqp_socket_t *
AMQP_CALL
-amqp_tcp_socket_new(void);
+amqp_tcp_socket_new(amqp_connection_state_t state);
/**
* Assign an open file descriptor to a socket object.
@@ -53,11 +56,13 @@ amqp_tcp_socket_new(void);
*
* \param [in,out] self A TCP socket object.
* \param [in] sockfd An open socket descriptor.
+ *
+ * \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
void
AMQP_CALL
-amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd);
+amqp_tcp_socket_set_sockfd(amqp_socket_t *self, int sockfd);
AMQP_END_DECLS
diff --git a/librabbitmq/amqp_timer.c b/librabbitmq/amqp_timer.c
index 7066358..95606b8 100644
--- a/librabbitmq/amqp_timer.c
+++ b/librabbitmq/amqp_timer.c
@@ -20,7 +20,9 @@
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*/
+#include "amqp.h"
#include "amqp_timer.h"
+#include <string.h>
#if (defined(_WIN32) || defined(__WIN32__) || defined(WIN32))
# define AMQP_WIN_TIMER_API
@@ -38,7 +40,7 @@
uint64_t
amqp_get_monotonic_timestamp(void)
{
- static uint64_t NS_PER_COUNT = 0;
+ static double NS_PER_COUNT = 0;
LARGE_INTEGER perf_count;
if (0 == NS_PER_COUNT) {
@@ -46,14 +48,14 @@ amqp_get_monotonic_timestamp(void)
if (!QueryPerformanceFrequency(&perf_frequency)) {
return 0;
}
- NS_PER_COUNT = AMQP_NS_PER_S / perf_frequency.QuadPart;
+ NS_PER_COUNT = (double)AMQP_NS_PER_S / perf_frequency.QuadPart;
}
if (!QueryPerformanceCounter(&perf_count)) {
return 0;
}
- return perf_count.QuadPart * NS_PER_COUNT;
+ return (uint64_t)(perf_count.QuadPart * NS_PER_COUNT);
}
#endif /* AMQP_WIN_TIMER_API */
@@ -96,3 +98,38 @@ amqp_get_monotonic_timestamp(void)
return ((uint64_t)tp.tv_sec * AMQP_NS_PER_S + (uint64_t)tp.tv_nsec);
}
#endif /* AMQP_POSIX_TIMER_API */
+
+int
+amqp_timer_update(amqp_timer_t *timer, struct timeval *timeout)
+{
+ if (0 == timer->current_timestamp) {
+ timer->current_timestamp = amqp_get_monotonic_timestamp();
+
+ if (0 == timer->current_timestamp) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+
+ timer->timeout_timestamp = timer->current_timestamp +
+ (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
+ (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
+
+ } else {
+ timer->current_timestamp = amqp_get_monotonic_timestamp();
+
+ if (0 == timer->current_timestamp) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+ }
+
+ if (timer->current_timestamp > timer->timeout_timestamp) {
+ return AMQP_STATUS_TIMEOUT;
+ }
+
+ timer->ns_until_next_timeout = timer->timeout_timestamp - timer->current_timestamp;
+
+ memset(&timer->tv, 0, sizeof(struct timeval));
+ timer->tv.tv_sec = timer->ns_until_next_timeout / AMQP_NS_PER_S;
+ timer->tv.tv_usec = (timer->ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US;
+
+ return AMQP_STATUS_OK;
+}
diff --git a/librabbitmq/amqp_timer.h b/librabbitmq/amqp_timer.h
index d1718af..8ac3de9 100644
--- a/librabbitmq/amqp_timer.h
+++ b/librabbitmq/amqp_timer.h
@@ -25,12 +25,39 @@
#include <stdint.h>
+#ifdef _WIN32
+# ifndef WINVER
+# define WINVER 0x0502
+# endif
+# ifndef WIN32_LEAN_AND_MEAN
+# define WIN32_LEAN_AND_MEAN
+# endif
+# include <Winsock2.h>
+#else
+# include <sys/time.h>
+#endif
+
#define AMQP_NS_PER_S 1000000000
#define AMQP_NS_PER_US 1000
+#define AMQP_INIT_TIMER(structure) { \
+ structure.current_timestamp = 0; \
+ structure.timeout_timestamp = 0; \
+}
+
+typedef struct amqp_timer_t_ {
+ uint64_t current_timestamp;
+ uint64_t timeout_timestamp;
+ uint64_t ns_until_next_timeout;
+ struct timeval tv;
+} amqp_timer_t;
+
/* Gets a monotonic timestamp in ns */
uint64_t
amqp_get_monotonic_timestamp(void);
-#endif /* AMQP_TIMER_H */
+/* Prepare timeout value and modify timer state based on timer state. */
+int
+amqp_timer_update(amqp_timer_t *timer, struct timeval *timeout);
+#endif /* AMQP_TIMER_H */
diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py
index c8e6d7a..6611716 100644
--- a/librabbitmq/codegen.py
+++ b/librabbitmq/codegen.py
@@ -244,6 +244,10 @@ def methodApiPrototype(m):
fn = m.fullName()
info = apiMethodInfo.get(fn, [])
+ docs = "/**\n * %s\n *\n" % (fn)
+ docs += " * @param [in] state connection state\n"
+ docs += " * @param [in] channel the channel to do the RPC on\n"
+
args = []
for f in m.arguments:
n = c_ize(f.name)
@@ -254,8 +258,12 @@ def methodApiPrototype(m):
args.append(typeFor(m.klass.spec, f).ctype)
args.append(" ")
args.append(n)
+ docs += " * @param [in] %s %s\n" % (n, n)
+
+ docs += " * @returns %s_ok_t\n" % (fn)
+ docs += " */\n"
- return "AMQP_PUBLIC_FUNCTION %s_ok_t * AMQP_CALL %s(amqp_connection_state_t state, amqp_channel_t channel%s)" % (fn, fn, ''.join(args))
+ return "%sAMQP_PUBLIC_FUNCTION\n%s_ok_t *\nAMQP_CALL %s(amqp_connection_state_t state, amqp_channel_t channel%s)" % (docs, fn, fn, ''.join(args))
AmqpMethod.apiPrototype = methodApiPrototype
@@ -545,11 +553,11 @@ int amqp_encode_properties(uint16_t class_id,
def genHrl(spec):
def fieldDeclList(fields):
if fields:
- return ''.join([" %s %s;\n" % (typeFor(spec, f).ctype,
- c_ize(f.name))
+ return ''.join([" %s %s; /**< %s */\n" % (typeFor(spec, f).ctype,
+ c_ize(f.name), f.name)
for f in fields])
else:
- return " char dummy; /* Dummy field to avoid empty struct */\n"
+ return " char dummy; /**< Dummy field to avoid empty struct */\n"
def propDeclList(fields):
return ''.join([" %s %s;\n" % (typeFor(spec, f).ctype, c_ize(f.name))
@@ -594,6 +602,7 @@ def genHrl(spec):
* ***** END LICENSE BLOCK *****
*/
+/** @file amqp_framing.h */
#ifndef AMQP_FRAMING_H
#define AMQP_FRAMING_H
@@ -601,33 +610,74 @@ def genHrl(spec):
AMQP_BEGIN_DECLS
"""
- print "#define AMQP_PROTOCOL_VERSION_MAJOR %d" % (spec.major)
- print "#define AMQP_PROTOCOL_VERSION_MINOR %d" % (spec.minor)
- print "#define AMQP_PROTOCOL_VERSION_REVISION %d" % (spec.revision)
- print "#define AMQP_PROTOCOL_PORT %d" % (spec.port)
+ print "#define AMQP_PROTOCOL_VERSION_MAJOR %d /**< AMQP protocol version major */" % (spec.major)
+ print "#define AMQP_PROTOCOL_VERSION_MINOR %d /**< AMQP protocol version minor */" % (spec.minor)
+ print "#define AMQP_PROTOCOL_VERSION_REVISION %d /**< AMQP protocol version revision */" % (spec.revision)
+ print "#define AMQP_PROTOCOL_PORT %d /**< Default AMQP Port */" % (spec.port)
for (c,v,cls) in spec.constants:
- print "#define %s %s" % (cConstantName(c), v)
+ print "#define %s %s /**< Constant: %s */" % (cConstantName(c), v, c)
print
print """/* Function prototypes. */
+/**
+ * Get constant name string from constant
+ *
+ * @param [in] constantNumber constant to get the name of
+ * @returns string describing the constant. String is managed by
+ * the library and should not be free()'d by the program
+ */
AMQP_PUBLIC_FUNCTION
char const *
AMQP_CALL amqp_constant_name(int constantNumber);
+/**
+ * Checks to see if a constant is a hard error
+ *
+ * A hard error occurs when something severe enough
+ * happens that the connection must be closed.
+ *
+ * @param [in] constantNumber the error constant
+ * @returns true if its a hard error, false otherwise
+ */
AMQP_PUBLIC_FUNCTION
amqp_boolean_t
AMQP_CALL amqp_constant_is_hard_error(int constantNumber);
+/**
+ * Get method name string from method number
+ *
+ * @param [in] methodNumber the method number
+ * @returns method name string. String is managed by the library
+ * and should not be freed()'d by the program
+ */
AMQP_PUBLIC_FUNCTION
char const *
AMQP_CALL amqp_method_name(amqp_method_number_t methodNumber);
+/**
+ * Check whether a method has content
+ *
+ * A method that has content will receive the method frame
+ * a properties frame, then 1 to N body frames
+ *
+ * @param [in] methodNumber the method number
+ * @returns true if method has content, false otherwise
+ */
AMQP_PUBLIC_FUNCTION
amqp_boolean_t
AMQP_CALL amqp_method_has_content(amqp_method_number_t methodNumber);
+/**
+ * Decodes a method from AMQP wireformat
+ *
+ * @param [in] methodNumber the method number for the decoded parameter
+ * @param [in] pool the memory pool to allocate the decoded method from
+ * @param [in] encoded the encoded byte string buffer
+ * @param [out] decoded pointer to the decoded method struct
+ * @returns 0 on success, an error code otherwise
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_decode_method(amqp_method_number_t methodNumber,
@@ -635,6 +685,15 @@ AMQP_CALL amqp_decode_method(amqp_method_number_t methodNumber,
amqp_bytes_t encoded,
void **decoded);
+/**
+ * Decodes a header frame properties structure from AMQP wireformat
+ *
+ * @param [in] class_id the class id for the decoded parameter
+ * @param [in] pool the memory pool to allocate the decoded properties from
+ * @param [in] encoded the encoded byte string buffer
+ * @param [out] decoded pointer to the decoded properties struct
+ * @returns 0 on success, an error code otherwise
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_decode_properties(uint16_t class_id,
@@ -642,12 +701,32 @@ AMQP_CALL amqp_decode_properties(uint16_t class_id,
amqp_bytes_t encoded,
void **decoded);
+/**
+ * Encodes a method structure in AMQP wireformat
+ *
+ * @param [in] methodNumber the method number for the decoded parameter
+ * @param [in] decoded the method structure (e.g., amqp_connection_start_t)
+ * @param [in] encoded an allocated byte buffer for the encoded method
+ * structure to be written to. If the buffer isn't large enough
+ * to hold the encoded method, an error code will be returned.
+ * @returns 0 on success, an error code otherwise.
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_encode_method(amqp_method_number_t methodNumber,
void *decoded,
amqp_bytes_t encoded);
+/**
+ * Encodes a properties structure in AMQP wireformat
+ *
+ * @param [in] class_id the class id for the decoded parameter
+ * @param [in] decoded the properties structure (e.g., amqp_basic_properties_t)
+ * @param [in] encoded an allocated byte buffer for the encoded properties to written to.
+ * If the buffer isn't large enough to hold the encoded method, an
+ * an error code will be returned
+ * @returns 0 on success, an error code otherwise.
+ */
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_encode_properties(uint16_t class_id,
@@ -658,19 +737,21 @@ AMQP_CALL amqp_encode_properties(uint16_t class_id,
print "/* Method field records. */\n"
for m in methods:
methodid = m.klass.index << 16 | m.index
- print "#define %s ((amqp_method_number_t) 0x%.08X) /* %d, %d; %d */" % \
+ print "#define %s ((amqp_method_number_t) 0x%.08X) /**< %s.%s method id @internal %d, %d; %d */" % \
(m.defName(),
methodid,
+ m.klass.name,
+ m.name,
m.klass.index,
m.index,
methodid)
- print "typedef struct %s_ {\n%s} %s;\n" % \
- (m.structName(), fieldDeclList(m.arguments), m.structName())
+ print "/** %s.%s method fields */\ntypedef struct %s_ {\n%s} %s;\n" % \
+ (m.klass.name, m.name, m.structName(), fieldDeclList(m.arguments), m.structName())
print "/* Class property records. */"
for c in spec.allClasses():
- print "#define %s (0x%.04X) /* %d */" % \
- (cConstantName(c.name + "_class"), c.index, c.index)
+ print "#define %s (0x%.04X) /**< %s class id @internal %d */" % \
+ (cConstantName(c.name + "_class"), c.index, c.name, c.index)
index = 0
for f in c.fields:
if index % 16 == 15:
@@ -678,10 +759,11 @@ AMQP_CALL amqp_encode_properties(uint16_t class_id,
shortnum = index // 16
partialindex = 15 - (index % 16)
bitindex = shortnum * 16 + partialindex
- print '#define %s (1 << %d)' % (cFlagName(c, f), bitindex)
+ print '#define %s (1 << %d) /**< %s.%s property flag */' % (cFlagName(c, f), bitindex, c.name, f.name)
index = index + 1
- print "typedef struct %s_ {\n amqp_flags_t _flags;\n%s} %s;\n" % \
- (c.structName(),
+ print "/** %s class properties */\ntypedef struct %s_ {\n amqp_flags_t _flags; /**< bit-mask of set fields */\n%s} %s;\n" % \
+ (c.name,
+ c.structName(),
fieldDeclList(c.fields),
c.structName())
diff --git a/librabbitmq/win32/threads.h b/librabbitmq/win32/threads.h
index 668b2a3..d1de854 100644
--- a/librabbitmq/win32/threads.h
+++ b/librabbitmq/win32/threads.h
@@ -24,6 +24,12 @@
#ifndef AMQP_THREAD_H
#define AMQP_THREAD_H
+#ifndef WINVER
+# define WINVER 0x0502
+#endif
+#ifndef WIN32_LEAN_AND_MEAN
+# define WIN32_LEAN_AND_MEAN
+#endif
#include <Windows.h>
typedef CRITICAL_SECTION *pthread_mutex_t;