diff options
Diffstat (limited to 'librabbitmq/amqp_api.c')
-rw-r--r-- | librabbitmq/amqp_api.c | 183 |
1 files changed, 90 insertions, 93 deletions
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index f42201f..28b2384 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -39,9 +39,9 @@ #ifdef _MSC_VER /* MSVC complains about sprintf being deprecated in favor of sprintf_s */ -# define _CRT_SECURE_NO_WARNINGS +#define _CRT_SECURE_NO_WARNINGS /* MSVC complains about strdup being deprecated in favor of _strdup */ -# define _CRT_NONSTDC_NO_DEPRECATE +#define _CRT_NONSTDC_NO_DEPRECATE #endif #include "amqp_private.h" @@ -55,52 +55,71 @@ #define ERROR_MASK (0x00FF) #define ERROR_CATEGORY_MASK (0xFF00) -enum error_category_enum_ { - EC_base = 0, - EC_tcp = 1, - EC_ssl = 2 -}; +enum error_category_enum_ { EC_base = 0, EC_tcp = 1, EC_ssl = 2 }; static const char *base_error_strings[] = { - "operation completed successfully", /* AMQP_STATUS_OK 0x0 */ - "could not allocate memory", /* AMQP_STATUS_NO_MEMORY -0x0001 */ - "invalid AMQP data", /* AMQP_STATUS_BAD_AQMP_DATA -0x0002 */ - "unknown AMQP class id", /* AMQP_STATUS_UNKNOWN_CLASS -0x0003 */ - "unknown AMQP method id", /* AMQP_STATUS_UNKNOWN_METHOD -0x0004 */ - "hostname lookup failed", /* AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED -0x0005 */ - "incompatible AMQP version", /* AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION -0x0006 */ - "connection closed unexpectedly", /* AMQP_STATUS_CONNECTION_CLOSED -0x0007 */ - "could not parse AMQP URL", /* AMQP_STATUS_BAD_AMQP_URL -0x0008 */ - "a socket error occurred", /* AMQP_STATUS_SOCKET_ERROR -0x0009 */ - "invalid parameter", /* AMQP_STATUS_INVALID_PARAMETER -0x000A */ - "table too large for buffer", /* AMQP_STATUS_TABLE_TOO_BIG -0x000B */ - "unexpected method received", /* AMQP_STATUS_WRONG_METHOD -0x000C */ - "request timed out", /* AMQP_STATUS_TIMEOUT -0x000D */ - "system timer has failed", /* AMQP_STATUS_TIMER_FAILED -0x000E */ - "heartbeat timeout, connection closed",/* AMQP_STATUS_HEARTBEAT_TIMEOUT -0x000F */ - "unexpected protocol state", /* AMQP_STATUS_UNEXPECTED STATE -0x0010 */ - "socket is closed", /* AMQP_STATUS_SOCKET_CLOSED -0x0011 */ - "socket already open", /* AMQP_STATUS_SOCKET_INUSE -0x0012 */ - "unsupported sasl method requested", /* AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD -0x0013 */ - "parameter value is unsupported" /* AMQP_STATUS_UNSUPPORTED -0x0014 */ -}; + /* AMQP_STATUS_OK 0x0 */ + "operation completed successfully", + /* AMQP_STATUS_NO_MEMORY -0x0001 */ + "could not allocate memory", + /* AMQP_STATUS_BAD_AQMP_DATA -0x0002 */ + "invalid AMQP data", + /* AMQP_STATUS_UNKNOWN_CLASS -0x0003 */ + "unknown AMQP class id", + /* AMQP_STATUS_UNKNOWN_METHOD -0x0004 */ + "unknown AMQP method id", + /* AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED -0x0005 */ + "hostname lookup failed", + /* AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION -0x0006 */ + "incompatible AMQP version", + /* AMQP_STATUS_CONNECTION_CLOSED -0x0007 */ + "connection closed unexpectedly", + /* AMQP_STATUS_BAD_AMQP_URL -0x0008 */ + "could not parse AMQP URL", + /* AMQP_STATUS_SOCKET_ERROR -0x0009 */ + "a socket error occurred", + /* AMQP_STATUS_INVALID_PARAMETER -0x000A */ + "invalid parameter", + /* AMQP_STATUS_TABLE_TOO_BIG -0x000B */ + "table too large for buffer", + /* AMQP_STATUS_WRONG_METHOD -0x000C */ + "unexpected method received", + /* AMQP_STATUS_TIMEOUT -0x000D */ + "request timed out", + /* AMQP_STATUS_TIMER_FAILED -0x000E */ + "system timer has failed", + /* AMQP_STATUS_HEARTBEAT_TIMEOUT -0x000F */ + "heartbeat timeout, connection closed", + /* AMQP_STATUS_UNEXPECTED STATE -0x0010 */ + "unexpected protocol state", + /* AMQP_STATUS_SOCKET_CLOSED -0x0011 */ + "socket is closed", + /* AMQP_STATUS_SOCKET_INUSE -0x0012 */ + "socket already open", + /* AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD -0x00013 */ + "unsupported sasl method requested", + /* AMQP_STATUS_UNSUPPORTED -0x0014 */ + "parameter value is unsupported"}; static const char *tcp_error_strings[] = { - "a socket error occurred", /* AMQP_STATUS_TCP_ERROR -0x0100 */ - "socket library initialization failed" /* AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR -0x0101 */ -}; + /* AMQP_STATUS_TCP_ERROR -0x0100 */ + "a socket error occurred", + /* AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR -0x0101 */ + "socket library initialization failed"}; static const char *ssl_error_strings[] = { - "a SSL error occurred", /* AMQP_STATUS_SSL_ERROR -0x0200 */ - "SSL hostname verification failed", /* AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED -0x0201 */ - "SSL peer cert verification failed", /* AMQP_STATUS_SSL_PEER_VERIFY_FAILED -0x0202 */ - "SSL handshake failed" /* AMQP_STATUS_SSL_CONNECTION_FAILED -0x0203 */ -}; + /* AMQP_STATUS_SSL_ERRO R -0x0200 */ + "a SSL error occurred", + /* AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED -0x0201 */ + "SSL hostname verification failed", + /* AMQP_STATUS_SSL_PEER_VERIFY_FAILED -0x0202 */ + "SSL peer cert verification failed", + /* AMQP_STATUS_SSL_CONNECTION_FAILED -0x0203 */ + "SSL handshake failed"}; static const char *unknown_error_string = "(unknown error)"; -const char *amqp_error_string2(int code) -{ +const char *amqp_error_string2(int code) { const char *error_string; size_t category = (((-code) & ERROR_CATEGORY_MASK) >> 8); size_t error = (-code) & ERROR_MASK; @@ -134,14 +153,12 @@ const char *amqp_error_string2(int code) default: error_string = unknown_error_string; break; - } return error_string; } -char *amqp_error_string(int code) -{ +char *amqp_error_string(int code) { /* Previously sometimes clients had to flip the sign on a return value from a * function to get the correct error code. Now, all error codes are negative. * To keep people's legacy code running correctly, we map all error codes to @@ -155,8 +172,7 @@ char *amqp_error_string(int code) return strdup(amqp_error_string2(code)); } -void amqp_abort(const char *fmt, ...) -{ +void amqp_abort(const char *fmt, ...) { va_list ap; va_start(ap, fmt); vfprintf(stderr, fmt, ap); @@ -165,22 +181,19 @@ void amqp_abort(const char *fmt, ...) abort(); } -const amqp_bytes_t amqp_empty_bytes = { 0, NULL }; -const amqp_table_t amqp_empty_table = { 0, NULL }; -const amqp_array_t amqp_empty_array = { 0, NULL }; +const amqp_bytes_t amqp_empty_bytes = {0, NULL}; +const amqp_table_t amqp_empty_table = {0, NULL}; +const amqp_array_t amqp_empty_array = {0, NULL}; -int amqp_basic_publish(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_boolean_t mandatory, - amqp_boolean_t immediate, +int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, + amqp_bytes_t exchange, amqp_bytes_t routing_key, + amqp_boolean_t mandatory, amqp_boolean_t immediate, amqp_basic_properties_t const *properties, - amqp_bytes_t body) -{ + amqp_bytes_t body) { amqp_frame_t f; size_t body_offset; - size_t usable_body_payload_size = state->frame_max - (HEADER_SIZE + FOOTER_SIZE); + size_t usable_body_payload_size = + state->frame_max - (HEADER_SIZE + FOOTER_SIZE); int res; int flagz; @@ -222,7 +235,7 @@ int amqp_basic_publish(amqp_connection_state_t state, f.channel = channel; f.payload.properties.class_id = AMQP_BASIC_CLASS; f.payload.properties.body_size = body.len; - f.payload.properties.decoded = (void *) properties; + f.payload.properties.decoded = (void *)properties; if (body.len > 0) { flagz = AMQP_SF_MORE; @@ -264,11 +277,9 @@ int amqp_basic_publish(amqp_connection_state_t state, } amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, - amqp_channel_t channel, - int code) -{ + amqp_channel_t channel, int code) { char codestr[13]; - amqp_method_number_t replies[2] = { AMQP_CHANNEL_CLOSE_OK_METHOD, 0}; + amqp_method_number_t replies[2] = {AMQP_CHANNEL_CLOSE_OK_METHOD, 0}; amqp_channel_close_t req; if (code < 0 || code > UINT16_MAX) { @@ -281,15 +292,14 @@ amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, req.class_id = 0; req.method_id = 0; - return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD, - replies, &req); + return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD, replies, + &req); } amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, - int code) -{ + int code) { char codestr[13]; - amqp_method_number_t replies[2] = { AMQP_CONNECTION_CLOSE_OK_METHOD, 0}; + amqp_method_number_t replies[2] = {AMQP_CONNECTION_CLOSE_OK_METHOD, 0}; amqp_channel_close_t req; if (code < 0 || code > UINT16_MAX) { @@ -302,15 +312,11 @@ amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, req.class_id = 0; req.method_id = 0; - return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD, - replies, &req); + return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD, replies, &req); } -int amqp_basic_ack(amqp_connection_state_t state, - amqp_channel_t channel, - uint64_t delivery_tag, - amqp_boolean_t multiple) -{ +int amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel, + uint64_t delivery_tag, amqp_boolean_t multiple) { amqp_basic_ack_t m; m.delivery_tag = delivery_tag; m.multiple = multiple; @@ -318,30 +324,22 @@ int amqp_basic_ack(amqp_connection_state_t state, } amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t no_ack) -{ - amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD, - AMQP_BASIC_GET_EMPTY_METHOD, - 0 - }; + amqp_channel_t channel, amqp_bytes_t queue, + amqp_boolean_t no_ack) { + amqp_method_number_t replies[] = {AMQP_BASIC_GET_OK_METHOD, + AMQP_BASIC_GET_EMPTY_METHOD, 0}; amqp_basic_get_t req; req.ticket = 0; req.queue = queue; req.no_ack = no_ack; - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_BASIC_GET_METHOD, - replies, &req); + state->most_recent_api_result = + amqp_simple_rpc(state, channel, AMQP_BASIC_GET_METHOD, replies, &req); return state->most_recent_api_result; } -int amqp_basic_reject(amqp_connection_state_t state, - amqp_channel_t channel, - uint64_t delivery_tag, - amqp_boolean_t requeue) -{ +int amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel, + uint64_t delivery_tag, amqp_boolean_t requeue) { amqp_basic_reject_t req; req.delivery_tag = delivery_tag; req.requeue = requeue; @@ -349,9 +347,8 @@ int amqp_basic_reject(amqp_connection_state_t state, } int amqp_basic_nack(amqp_connection_state_t state, amqp_channel_t channel, - uint64_t delivery_tag, amqp_boolean_t multiple, - amqp_boolean_t requeue) -{ + uint64_t delivery_tag, amqp_boolean_t multiple, + amqp_boolean_t requeue) { amqp_basic_nack_t req; req.delivery_tag = delivery_tag; req.multiple = multiple; @@ -378,7 +375,7 @@ int amqp_set_handshake_timeout(amqp_connection_state_t state, return AMQP_STATUS_OK; } -struct timeval * amqp_get_rpc_timeout(amqp_connection_state_t state) { +struct timeval *amqp_get_rpc_timeout(amqp_connection_state_t state) { return state->rpc_timeout; } |