summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_api.c')
-rw-r--r--librabbitmq/amqp_api.c183
1 files changed, 90 insertions, 93 deletions
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index f42201f..28b2384 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -39,9 +39,9 @@
#ifdef _MSC_VER
/* MSVC complains about sprintf being deprecated in favor of sprintf_s */
-# define _CRT_SECURE_NO_WARNINGS
+#define _CRT_SECURE_NO_WARNINGS
/* MSVC complains about strdup being deprecated in favor of _strdup */
-# define _CRT_NONSTDC_NO_DEPRECATE
+#define _CRT_NONSTDC_NO_DEPRECATE
#endif
#include "amqp_private.h"
@@ -55,52 +55,71 @@
#define ERROR_MASK (0x00FF)
#define ERROR_CATEGORY_MASK (0xFF00)
-enum error_category_enum_ {
- EC_base = 0,
- EC_tcp = 1,
- EC_ssl = 2
-};
+enum error_category_enum_ { EC_base = 0, EC_tcp = 1, EC_ssl = 2 };
static const char *base_error_strings[] = {
- "operation completed successfully", /* AMQP_STATUS_OK 0x0 */
- "could not allocate memory", /* AMQP_STATUS_NO_MEMORY -0x0001 */
- "invalid AMQP data", /* AMQP_STATUS_BAD_AQMP_DATA -0x0002 */
- "unknown AMQP class id", /* AMQP_STATUS_UNKNOWN_CLASS -0x0003 */
- "unknown AMQP method id", /* AMQP_STATUS_UNKNOWN_METHOD -0x0004 */
- "hostname lookup failed", /* AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED -0x0005 */
- "incompatible AMQP version", /* AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION -0x0006 */
- "connection closed unexpectedly", /* AMQP_STATUS_CONNECTION_CLOSED -0x0007 */
- "could not parse AMQP URL", /* AMQP_STATUS_BAD_AMQP_URL -0x0008 */
- "a socket error occurred", /* AMQP_STATUS_SOCKET_ERROR -0x0009 */
- "invalid parameter", /* AMQP_STATUS_INVALID_PARAMETER -0x000A */
- "table too large for buffer", /* AMQP_STATUS_TABLE_TOO_BIG -0x000B */
- "unexpected method received", /* AMQP_STATUS_WRONG_METHOD -0x000C */
- "request timed out", /* AMQP_STATUS_TIMEOUT -0x000D */
- "system timer has failed", /* AMQP_STATUS_TIMER_FAILED -0x000E */
- "heartbeat timeout, connection closed",/* AMQP_STATUS_HEARTBEAT_TIMEOUT -0x000F */
- "unexpected protocol state", /* AMQP_STATUS_UNEXPECTED STATE -0x0010 */
- "socket is closed", /* AMQP_STATUS_SOCKET_CLOSED -0x0011 */
- "socket already open", /* AMQP_STATUS_SOCKET_INUSE -0x0012 */
- "unsupported sasl method requested", /* AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD -0x0013 */
- "parameter value is unsupported" /* AMQP_STATUS_UNSUPPORTED -0x0014 */
-};
+ /* AMQP_STATUS_OK 0x0 */
+ "operation completed successfully",
+ /* AMQP_STATUS_NO_MEMORY -0x0001 */
+ "could not allocate memory",
+ /* AMQP_STATUS_BAD_AQMP_DATA -0x0002 */
+ "invalid AMQP data",
+ /* AMQP_STATUS_UNKNOWN_CLASS -0x0003 */
+ "unknown AMQP class id",
+ /* AMQP_STATUS_UNKNOWN_METHOD -0x0004 */
+ "unknown AMQP method id",
+ /* AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED -0x0005 */
+ "hostname lookup failed",
+ /* AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION -0x0006 */
+ "incompatible AMQP version",
+ /* AMQP_STATUS_CONNECTION_CLOSED -0x0007 */
+ "connection closed unexpectedly",
+ /* AMQP_STATUS_BAD_AMQP_URL -0x0008 */
+ "could not parse AMQP URL",
+ /* AMQP_STATUS_SOCKET_ERROR -0x0009 */
+ "a socket error occurred",
+ /* AMQP_STATUS_INVALID_PARAMETER -0x000A */
+ "invalid parameter",
+ /* AMQP_STATUS_TABLE_TOO_BIG -0x000B */
+ "table too large for buffer",
+ /* AMQP_STATUS_WRONG_METHOD -0x000C */
+ "unexpected method received",
+ /* AMQP_STATUS_TIMEOUT -0x000D */
+ "request timed out",
+ /* AMQP_STATUS_TIMER_FAILED -0x000E */
+ "system timer has failed",
+ /* AMQP_STATUS_HEARTBEAT_TIMEOUT -0x000F */
+ "heartbeat timeout, connection closed",
+ /* AMQP_STATUS_UNEXPECTED STATE -0x0010 */
+ "unexpected protocol state",
+ /* AMQP_STATUS_SOCKET_CLOSED -0x0011 */
+ "socket is closed",
+ /* AMQP_STATUS_SOCKET_INUSE -0x0012 */
+ "socket already open",
+ /* AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD -0x00013 */
+ "unsupported sasl method requested",
+ /* AMQP_STATUS_UNSUPPORTED -0x0014 */
+ "parameter value is unsupported"};
static const char *tcp_error_strings[] = {
- "a socket error occurred", /* AMQP_STATUS_TCP_ERROR -0x0100 */
- "socket library initialization failed" /* AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR -0x0101 */
-};
+ /* AMQP_STATUS_TCP_ERROR -0x0100 */
+ "a socket error occurred",
+ /* AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR -0x0101 */
+ "socket library initialization failed"};
static const char *ssl_error_strings[] = {
- "a SSL error occurred", /* AMQP_STATUS_SSL_ERROR -0x0200 */
- "SSL hostname verification failed", /* AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED -0x0201 */
- "SSL peer cert verification failed", /* AMQP_STATUS_SSL_PEER_VERIFY_FAILED -0x0202 */
- "SSL handshake failed" /* AMQP_STATUS_SSL_CONNECTION_FAILED -0x0203 */
-};
+ /* AMQP_STATUS_SSL_ERRO R -0x0200 */
+ "a SSL error occurred",
+ /* AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED -0x0201 */
+ "SSL hostname verification failed",
+ /* AMQP_STATUS_SSL_PEER_VERIFY_FAILED -0x0202 */
+ "SSL peer cert verification failed",
+ /* AMQP_STATUS_SSL_CONNECTION_FAILED -0x0203 */
+ "SSL handshake failed"};
static const char *unknown_error_string = "(unknown error)";
-const char *amqp_error_string2(int code)
-{
+const char *amqp_error_string2(int code) {
const char *error_string;
size_t category = (((-code) & ERROR_CATEGORY_MASK) >> 8);
size_t error = (-code) & ERROR_MASK;
@@ -134,14 +153,12 @@ const char *amqp_error_string2(int code)
default:
error_string = unknown_error_string;
break;
-
}
return error_string;
}
-char *amqp_error_string(int code)
-{
+char *amqp_error_string(int code) {
/* Previously sometimes clients had to flip the sign on a return value from a
* function to get the correct error code. Now, all error codes are negative.
* To keep people's legacy code running correctly, we map all error codes to
@@ -155,8 +172,7 @@ char *amqp_error_string(int code)
return strdup(amqp_error_string2(code));
}
-void amqp_abort(const char *fmt, ...)
-{
+void amqp_abort(const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
@@ -165,22 +181,19 @@ void amqp_abort(const char *fmt, ...)
abort();
}
-const amqp_bytes_t amqp_empty_bytes = { 0, NULL };
-const amqp_table_t amqp_empty_table = { 0, NULL };
-const amqp_array_t amqp_empty_array = { 0, NULL };
+const amqp_bytes_t amqp_empty_bytes = {0, NULL};
+const amqp_table_t amqp_empty_table = {0, NULL};
+const amqp_array_t amqp_empty_array = {0, NULL};
-int amqp_basic_publish(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t exchange,
- amqp_bytes_t routing_key,
- amqp_boolean_t mandatory,
- amqp_boolean_t immediate,
+int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel,
+ amqp_bytes_t exchange, amqp_bytes_t routing_key,
+ amqp_boolean_t mandatory, amqp_boolean_t immediate,
amqp_basic_properties_t const *properties,
- amqp_bytes_t body)
-{
+ amqp_bytes_t body) {
amqp_frame_t f;
size_t body_offset;
- size_t usable_body_payload_size = state->frame_max - (HEADER_SIZE + FOOTER_SIZE);
+ size_t usable_body_payload_size =
+ state->frame_max - (HEADER_SIZE + FOOTER_SIZE);
int res;
int flagz;
@@ -222,7 +235,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
f.channel = channel;
f.payload.properties.class_id = AMQP_BASIC_CLASS;
f.payload.properties.body_size = body.len;
- f.payload.properties.decoded = (void *) properties;
+ f.payload.properties.decoded = (void *)properties;
if (body.len > 0) {
flagz = AMQP_SF_MORE;
@@ -264,11 +277,9 @@ int amqp_basic_publish(amqp_connection_state_t state,
}
amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
- amqp_channel_t channel,
- int code)
-{
+ amqp_channel_t channel, int code) {
char codestr[13];
- amqp_method_number_t replies[2] = { AMQP_CHANNEL_CLOSE_OK_METHOD, 0};
+ amqp_method_number_t replies[2] = {AMQP_CHANNEL_CLOSE_OK_METHOD, 0};
amqp_channel_close_t req;
if (code < 0 || code > UINT16_MAX) {
@@ -281,15 +292,14 @@ amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
req.class_id = 0;
req.method_id = 0;
- return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD,
- replies, &req);
+ return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD, replies,
+ &req);
}
amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
- int code)
-{
+ int code) {
char codestr[13];
- amqp_method_number_t replies[2] = { AMQP_CONNECTION_CLOSE_OK_METHOD, 0};
+ amqp_method_number_t replies[2] = {AMQP_CONNECTION_CLOSE_OK_METHOD, 0};
amqp_channel_close_t req;
if (code < 0 || code > UINT16_MAX) {
@@ -302,15 +312,11 @@ amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
req.class_id = 0;
req.method_id = 0;
- return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD,
- replies, &req);
+ return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD, replies, &req);
}
-int amqp_basic_ack(amqp_connection_state_t state,
- amqp_channel_t channel,
- uint64_t delivery_tag,
- amqp_boolean_t multiple)
-{
+int amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel,
+ uint64_t delivery_tag, amqp_boolean_t multiple) {
amqp_basic_ack_t m;
m.delivery_tag = delivery_tag;
m.multiple = multiple;
@@ -318,30 +324,22 @@ int amqp_basic_ack(amqp_connection_state_t state,
}
amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_boolean_t no_ack)
-{
- amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD,
- AMQP_BASIC_GET_EMPTY_METHOD,
- 0
- };
+ amqp_channel_t channel, amqp_bytes_t queue,
+ amqp_boolean_t no_ack) {
+ amqp_method_number_t replies[] = {AMQP_BASIC_GET_OK_METHOD,
+ AMQP_BASIC_GET_EMPTY_METHOD, 0};
amqp_basic_get_t req;
req.ticket = 0;
req.queue = queue;
req.no_ack = no_ack;
- state->most_recent_api_result = amqp_simple_rpc(state, channel,
- AMQP_BASIC_GET_METHOD,
- replies, &req);
+ state->most_recent_api_result =
+ amqp_simple_rpc(state, channel, AMQP_BASIC_GET_METHOD, replies, &req);
return state->most_recent_api_result;
}
-int amqp_basic_reject(amqp_connection_state_t state,
- amqp_channel_t channel,
- uint64_t delivery_tag,
- amqp_boolean_t requeue)
-{
+int amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel,
+ uint64_t delivery_tag, amqp_boolean_t requeue) {
amqp_basic_reject_t req;
req.delivery_tag = delivery_tag;
req.requeue = requeue;
@@ -349,9 +347,8 @@ int amqp_basic_reject(amqp_connection_state_t state,
}
int amqp_basic_nack(amqp_connection_state_t state, amqp_channel_t channel,
- uint64_t delivery_tag, amqp_boolean_t multiple,
- amqp_boolean_t requeue)
-{
+ uint64_t delivery_tag, amqp_boolean_t multiple,
+ amqp_boolean_t requeue) {
amqp_basic_nack_t req;
req.delivery_tag = delivery_tag;
req.multiple = multiple;
@@ -378,7 +375,7 @@ int amqp_set_handshake_timeout(amqp_connection_state_t state,
return AMQP_STATUS_OK;
}
-struct timeval * amqp_get_rpc_timeout(amqp_connection_state_t state) {
+struct timeval *amqp_get_rpc_timeout(amqp_connection_state_t state) {
return state->rpc_timeout;
}