/* * ***** BEGIN LICENSE BLOCK ***** * Version: MIT * * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. * All Rights Reserved. * * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. * * Permission is hereby granted, free of charge, to any person * obtaining a copy of this software and associated documentation * files (the "Software"), to deal in the Software without * restriction, including without limitation the rights to use, copy, * modify, merge, publish, distribute, sublicense, and/or sell copies * of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. * ***** END LICENSE BLOCK ***** */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include "amqp_private.h" #include #include #include #include #include #include static const char *client_error_strings[ERROR_MAX] = { "could not allocate memory", /* ERROR_NO_MEMORY */ "received bad AMQP data", /* ERROR_BAD_AQMP_DATA */ "unknown AMQP class id", /* ERROR_UNKOWN_CLASS */ "unknown AMQP method id", /* ERROR_UNKOWN_METHOD */ "unknown host", /* ERROR_GETHOSTBYNAME_FAILED */ "incompatible AMQP version", /* ERROR_INCOMPATIBLE_AMQP_VERSION */ "connection closed unexpectedly", /* ERROR_CONNECTION_CLOSED */ "could not parse AMQP URL", /* ERROR_BAD_AMQP_URL */ }; char *amqp_error_string(int err) { const char *str; int category = (err & ERROR_CATEGORY_MASK); err = (err & ~ERROR_CATEGORY_MASK); switch (category) { case ERROR_CATEGORY_CLIENT: if (err < 1 || err > ERROR_MAX) str = "(undefined librabbitmq error)"; else str = client_error_strings[err - 1]; break; case ERROR_CATEGORY_OS: return amqp_os_error_string(err); default: str = "(undefined error category)"; } return strdup(str); } void amqp_abort(const char *fmt, ...) { va_list ap; va_start(ap, fmt); vfprintf(stderr, fmt, ap); va_end(ap); fputc('\n', stderr); abort(); } const amqp_bytes_t amqp_empty_bytes = { 0, NULL }; const amqp_table_t amqp_empty_table = { 0, NULL }; const amqp_array_t amqp_empty_array = { 0, NULL }; #define RPC_REPLY(replytype) \ (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL \ ? (replytype *) state->most_recent_api_result.reply.decoded \ : NULL) int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_boolean_t mandatory, amqp_boolean_t immediate, amqp_basic_properties_t const *properties, amqp_bytes_t body) { amqp_frame_t f; size_t body_offset; size_t usable_body_payload_size = state->frame_max - (HEADER_SIZE + FOOTER_SIZE); int res; amqp_basic_publish_t m; amqp_basic_properties_t default_properties; m.exchange = exchange; m.routing_key = routing_key; m.mandatory = mandatory; m.immediate = immediate; m.ticket = 0; res = amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m); if (res < 0) return res; if (properties == NULL) { memset(&default_properties, 0, sizeof(default_properties)); properties = &default_properties; } f.frame_type = AMQP_FRAME_HEADER; f.channel = channel; f.payload.properties.class_id = AMQP_BASIC_CLASS; f.payload.properties.body_size = body.len; f.payload.properties.decoded = (void *) properties; res = amqp_send_frame(state, &f); if (res < 0) return res; body_offset = 0; while (body_offset < body.len) { size_t remaining = body.len - body_offset; if (remaining == 0) break; f.frame_type = AMQP_FRAME_BODY; f.channel = channel; f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset); if (remaining >= usable_body_payload_size) { f.payload.body_fragment.len = usable_body_payload_size; } else { f.payload.body_fragment.len = remaining; } body_offset += f.payload.body_fragment.len; res = amqp_send_frame(state, &f); if (res < 0) return res; } return 0; } amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, amqp_channel_t channel, int code) { char codestr[13]; amqp_method_number_t replies[2] = { AMQP_CHANNEL_CLOSE_OK_METHOD, 0}; amqp_channel_close_t req; req.reply_code = code; req.reply_text.bytes = codestr; req.reply_text.len = sprintf(codestr, "%d", code); req.class_id = 0; req.method_id = 0; return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD, replies, &req); } amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code) { char codestr[13]; amqp_method_number_t replies[2] = { AMQP_CONNECTION_CLOSE_OK_METHOD, 0}; amqp_channel_close_t req; req.reply_code = code; req.reply_text.bytes = codestr; req.reply_text.len = sprintf(codestr, "%d", code); req.class_id = 0; req.method_id = 0; return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD, replies, &req); } int amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel, uint64_t delivery_tag, amqp_boolean_t multiple) { amqp_basic_ack_t m; m.delivery_tag = delivery_tag; m.multiple = multiple; return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m); } amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t no_ack) { amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD, AMQP_BASIC_GET_EMPTY_METHOD, 0 }; amqp_basic_get_t req; req.ticket = 0; req.queue = queue; req.no_ack = no_ack; state->most_recent_api_result = amqp_simple_rpc(state, channel, AMQP_BASIC_GET_METHOD, replies, &req); return state->most_recent_api_result; } int amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel, uint64_t delivery_tag, amqp_boolean_t requeue) { amqp_basic_reject_t req; req.delivery_tag = delivery_tag; req.requeue = requeue; return amqp_send_method(state, channel, AMQP_BASIC_REJECT_METHOD, &req); }