// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors. // SPDX-License-Identifier: mit #ifdef HAVE_CONFIG_H #include "config.h" #endif #ifdef _MSC_VER /* MSVC complains about sprintf being deprecated in favor of sprintf_s */ #define _CRT_SECURE_NO_WARNINGS /* MSVC complains about strdup being deprecated in favor of _strdup */ #define _CRT_NONSTDC_NO_DEPRECATE #endif #include "amqp_private.h" #include "amqp_time.h" #include #include #include #include #include #define ERROR_MASK (0x00FF) #define ERROR_CATEGORY_MASK (0xFF00) enum error_category_enum_ { EC_base = 0, EC_tcp = 1, EC_ssl = 2 }; static const char *base_error_strings[] = { /* AMQP_STATUS_OK 0x0 */ "operation completed successfully", /* AMQP_STATUS_NO_MEMORY -0x0001 */ "could not allocate memory", /* AMQP_STATUS_BAD_AQMP_DATA -0x0002 */ "invalid AMQP data", /* AMQP_STATUS_UNKNOWN_CLASS -0x0003 */ "unknown AMQP class id", /* AMQP_STATUS_UNKNOWN_METHOD -0x0004 */ "unknown AMQP method id", /* AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED -0x0005 */ "hostname lookup failed", /* AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION -0x0006 */ "incompatible AMQP version", /* AMQP_STATUS_CONNECTION_CLOSED -0x0007 */ "connection closed unexpectedly", /* AMQP_STATUS_BAD_AMQP_URL -0x0008 */ "could not parse AMQP URL", /* AMQP_STATUS_SOCKET_ERROR -0x0009 */ "a socket error occurred", /* AMQP_STATUS_INVALID_PARAMETER -0x000A */ "invalid parameter", /* AMQP_STATUS_TABLE_TOO_BIG -0x000B */ "table too large for buffer", /* AMQP_STATUS_WRONG_METHOD -0x000C */ "unexpected method received", /* AMQP_STATUS_TIMEOUT -0x000D */ "request timed out", /* AMQP_STATUS_TIMER_FAILED -0x000E */ "system timer has failed", /* AMQP_STATUS_HEARTBEAT_TIMEOUT -0x000F */ "heartbeat timeout, connection closed", /* AMQP_STATUS_UNEXPECTED STATE -0x0010 */ "unexpected protocol state", /* AMQP_STATUS_SOCKET_CLOSED -0x0011 */ "socket is closed", /* AMQP_STATUS_SOCKET_INUSE -0x0012 */ "socket already open", /* AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD -0x00013 */ "unsupported sasl method requested", /* AMQP_STATUS_UNSUPPORTED -0x0014 */ "parameter value is unsupported"}; static const char *tcp_error_strings[] = { /* AMQP_STATUS_TCP_ERROR -0x0100 */ "a socket error occurred", /* AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR -0x0101 */ "socket library initialization failed"}; static const char *ssl_error_strings[] = { /* AMQP_STATUS_SSL_ERROR -0x0200 */ "a SSL error occurred", /* AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED -0x0201 */ "SSL hostname verification failed", /* AMQP_STATUS_SSL_PEER_VERIFY_FAILED -0x0202 */ "SSL peer cert verification failed", /* AMQP_STATUS_SSL_CONNECTION_FAILED -0x0203 */ "SSL handshake failed", /* AMQP_STATUS_SSL_SET_ENGINE_FAILED -0x0204 */ "SSL setting engine failed"}; static const char *unknown_error_string = "(unknown error)"; const char *amqp_error_string2(int code) { const char *error_string; size_t category = (((-code) & ERROR_CATEGORY_MASK) >> 8); size_t error = (-code) & ERROR_MASK; switch (category) { case EC_base: if (error < (sizeof(base_error_strings) / sizeof(char *))) { error_string = base_error_strings[error]; } else { error_string = unknown_error_string; } break; case EC_tcp: if (error < (sizeof(tcp_error_strings) / sizeof(char *))) { error_string = tcp_error_strings[error]; } else { error_string = unknown_error_string; } break; case EC_ssl: if (error < (sizeof(ssl_error_strings) / sizeof(char *))) { error_string = ssl_error_strings[error]; } else { error_string = unknown_error_string; } break; default: error_string = unknown_error_string; break; } return error_string; } char *amqp_error_string(int code) { /* Previously sometimes clients had to flip the sign on a return value from a * function to get the correct error code. Now, all error codes are negative. * To keep people's legacy code running correctly, we map all error codes to * negative values. * * This is only done with this deprecated function. */ if (code > 0) { code = -code; } return strdup(amqp_error_string2(code)); } void amqp_abort(const char *fmt, ...) { va_list ap; va_start(ap, fmt); vfprintf(stderr, fmt, ap); va_end(ap); fputc('\n', stderr); abort(); } const amqp_bytes_t amqp_empty_bytes = {0, NULL}; const amqp_table_t amqp_empty_table = {0, NULL}; const amqp_array_t amqp_empty_array = {0, NULL}; int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_boolean_t mandatory, amqp_boolean_t immediate, amqp_basic_properties_t const *properties, amqp_bytes_t body) { amqp_frame_t f; size_t body_offset; size_t usable_body_payload_size = state->frame_max - (HEADER_SIZE + FOOTER_SIZE); int res; int flagz; amqp_basic_publish_t m; amqp_basic_properties_t default_properties; m.exchange = exchange; m.routing_key = routing_key; m.mandatory = mandatory; m.immediate = immediate; m.ticket = 0; /* TODO(alanxz): this heartbeat check is happening in the wrong place, it * should really be done in amqp_try_send/writev */ res = amqp_time_has_past(state->next_recv_heartbeat); if (AMQP_STATUS_TIMER_FAILURE == res) { return res; } else if (AMQP_STATUS_TIMEOUT == res) { res = amqp_try_recv(state); if (AMQP_STATUS_TIMEOUT == res) { return AMQP_STATUS_HEARTBEAT_TIMEOUT; } else if (AMQP_STATUS_OK != res) { return res; } } res = amqp_send_method_inner(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m, AMQP_SF_MORE, amqp_time_infinite()); if (res < 0) { return res; } if (properties == NULL) { memset(&default_properties, 0, sizeof(default_properties)); properties = &default_properties; } f.frame_type = AMQP_FRAME_HEADER; f.channel = channel; f.payload.properties.class_id = AMQP_BASIC_CLASS; f.payload.properties.body_size = body.len; f.payload.properties.decoded = (void *)properties; if (body.len > 0) { flagz = AMQP_SF_MORE; } else { flagz = AMQP_SF_NONE; } res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite()); if (res < 0) { return res; } body_offset = 0; while (body_offset < body.len) { size_t remaining = body.len - body_offset; if (remaining == 0) { break; } f.frame_type = AMQP_FRAME_BODY; f.channel = channel; f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset); if (remaining >= usable_body_payload_size) { f.payload.body_fragment.len = usable_body_payload_size; flagz = AMQP_SF_MORE; } else { f.payload.body_fragment.len = remaining; flagz = AMQP_SF_NONE; } body_offset += f.payload.body_fragment.len; res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite()); if (res < 0) { return res; } } return AMQP_STATUS_OK; } amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, amqp_channel_t channel, int code) { char codestr[13]; amqp_method_number_t replies[2] = {AMQP_CHANNEL_CLOSE_OK_METHOD, 0}; amqp_channel_close_t req; if (code < 0 || code > UINT16_MAX) { return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER); } req.reply_code = (uint16_t)code; req.reply_text.bytes = codestr; req.reply_text.len = sprintf(codestr, "%d", code); req.class_id = 0; req.method_id = 0; return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD, replies, &req); } amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code) { char codestr[13]; amqp_method_number_t replies[2] = {AMQP_CONNECTION_CLOSE_OK_METHOD, 0}; amqp_channel_close_t req; if (code < 0 || code > UINT16_MAX) { return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER); } req.reply_code = (uint16_t)code; req.reply_text.bytes = codestr; req.reply_text.len = sprintf(codestr, "%d", code); req.class_id = 0; req.method_id = 0; return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD, replies, &req); } int amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel, uint64_t delivery_tag, amqp_boolean_t multiple) { amqp_basic_ack_t m; m.delivery_tag = delivery_tag; m.multiple = multiple; return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m); } amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t no_ack) { amqp_method_number_t replies[] = {AMQP_BASIC_GET_OK_METHOD, AMQP_BASIC_GET_EMPTY_METHOD, 0}; amqp_basic_get_t req; req.ticket = 0; req.queue = queue; req.no_ack = no_ack; state->most_recent_api_result = amqp_simple_rpc(state, channel, AMQP_BASIC_GET_METHOD, replies, &req); return state->most_recent_api_result; } int amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel, uint64_t delivery_tag, amqp_boolean_t requeue) { amqp_basic_reject_t req; req.delivery_tag = delivery_tag; req.requeue = requeue; return amqp_send_method(state, channel, AMQP_BASIC_REJECT_METHOD, &req); } int amqp_basic_nack(amqp_connection_state_t state, amqp_channel_t channel, uint64_t delivery_tag, amqp_boolean_t multiple, amqp_boolean_t requeue) { amqp_basic_nack_t req; req.delivery_tag = delivery_tag; req.multiple = multiple; req.requeue = requeue; return amqp_send_method(state, channel, AMQP_BASIC_NACK_METHOD, &req); } struct timeval *amqp_get_handshake_timeout(amqp_connection_state_t state) { return state->handshake_timeout; } int amqp_set_handshake_timeout(amqp_connection_state_t state, const struct timeval *timeout) { if (timeout) { if (timeout->tv_sec < 0 || timeout->tv_usec < 0) { return AMQP_STATUS_INVALID_PARAMETER; } state->internal_handshake_timeout = *timeout; state->handshake_timeout = &state->internal_handshake_timeout; } else { state->handshake_timeout = NULL; } return AMQP_STATUS_OK; } struct timeval *amqp_get_rpc_timeout(amqp_connection_state_t state) { return state->rpc_timeout; } int amqp_set_rpc_timeout(amqp_connection_state_t state, const struct timeval *timeout) { if (timeout) { if (timeout->tv_sec < 0 || timeout->tv_usec < 0) { return AMQP_STATUS_INVALID_PARAMETER; } state->rpc_timeout = &state->internal_rpc_timeout; *state->rpc_timeout = *timeout; } else { state->rpc_timeout = NULL; } return AMQP_STATUS_OK; }