diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2017-02-20 18:23:05 -0800 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2017-08-21 22:52:03 -0700 |
commit | 90e6f2e372e2bb502ce919563af58658214fbc2f (patch) | |
tree | 5dfe3d177e9d8e7b4943377f994931e04dfc211c | |
parent | 4b337fe0fa1f733b679014aa1e3b036ccbcd7635 (diff) | |
download | rabbitmq-c-tutorial-1.tar.gz |
Examples: add tutorial 1 example code.tutorial-1
Add a hello-world sender (send.c) and receiver (recv.c) examples that
matches what is done in tutorial #1 on the RabbitMQ website.
-rw-r--r-- | examples/CMakeLists.txt | 2 | ||||
-rw-r--r-- | examples/tutorial-1-hello-world/CMakeLists.txt | 6 | ||||
-rw-r--r-- | examples/tutorial-1-hello-world/recv.c | 141 | ||||
-rw-r--r-- | examples/tutorial-1-hello-world/send.c | 201 |
4 files changed, 350 insertions, 0 deletions
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 8dcdcf4..18eb850 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -71,3 +71,5 @@ target_link_libraries(amqps_bind ${RMQ_LIBRARY_TARGET}) add_executable(amqps_listenq amqps_listenq.c ${COMMON_SRCS}) target_link_libraries(amqps_listenq ${RMQ_LIBRARY_TARGET}) endif (ENABLE_SSL_SUPPORT) + +add_subdirectory(tutorial-1-hello-world) diff --git a/examples/tutorial-1-hello-world/CMakeLists.txt b/examples/tutorial-1-hello-world/CMakeLists.txt new file mode 100644 index 0000000..d4dc54c --- /dev/null +++ b/examples/tutorial-1-hello-world/CMakeLists.txt @@ -0,0 +1,6 @@ + +add_executable(send send.c) +target_link_libraries(send ${RMQ_LIBRARY_TARGET}) + +add_executable(recv recv.c) +target_link_libraries(recv ${RMQ_LIBRARY_TARGET}) diff --git a/examples/tutorial-1-hello-world/recv.c b/examples/tutorial-1-hello-world/recv.c new file mode 100644 index 0000000..73afbc1 --- /dev/null +++ b/examples/tutorial-1-hello-world/recv.c @@ -0,0 +1,141 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by Alan Antonuk are Copyright (c) 2017 Alan Antonuk. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include <amqp.h> +#include <amqp_tcp_socket.h> + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +int main(void) { + const char* QUEUE_NAME = "hello"; + + amqp_connection_state_t conn; + amqp_socket_t *socket; + int res; + amqp_rpc_reply_t reply; + amqp_channel_t chan; + + + conn = amqp_new_connection(); + if (!conn) { + fprintf(stderr, "failed to allocate connection object\n"); + return 1; + } + + /* socket is owned by the conn */ + socket = amqp_tcp_socket_new(conn); + if (!socket) { + fprintf(stderr, "failed to allocate socket object\n"); + res = 2; + goto cleanup1; + } + + res = amqp_socket_open(socket, "localhost", AMQP_PROTOCOL_PORT); + if (res != AMQP_STATUS_OK) { + fprintf(stderr, "failed to open socket to broker: %s\n", + amqp_error_string2(res)); + res = 3; + goto cleanup1; + } + + reply = + amqp_login(conn, AMQP_DEFAULT_VHOST, AMQP_DEFAULT_MAX_CHANNELS, + AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, + AMQP_SASL_METHOD_PLAIN, + "guest", /* default guest account */ + "guest" /* default guest password */); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) { + fprintf(stderr, "failed login with broker\n"); + goto cleanup1; + } + + chan = 1; + if (!amqp_channel_open(conn, chan)) { + fprintf(stderr, "failed to open channel\n"); + goto cleanup1; + } + + { + amqp_queue_declare_ok_t *queue = amqp_queue_declare( + conn, chan, amqp_cstring_bytes(QUEUE_NAME), + 0, /* Not passive, actually declare the queue */ + 0, /* Not durable, will not survive broker restart */ + 0, /* Not exclusive to connection, may be used by other connections */ + 0, /* Not automatically deleted upon disconnection */ + amqp_empty_table); + if (!queue) { + fprintf(stderr, "failed to declare queue: \n"); + abort(); + } + } + + { + const char* CONSUMER_NAME = "hello-consumer"; + amqp_basic_consume_ok_t *consumer = amqp_basic_consume( + conn, chan, amqp_cstring_bytes(QUEUE_NAME), + amqp_cstring_bytes(CONSUMER_NAME), + 0, /* non-local, don't deliver messages published by this connection */ + 1, /* exclusive: only this consumer may access the queue */ + 1, /* no-ack: consider message ack'd upon delivery */ + amqp_empty_table); + if (!consumer) { + fprintf(stderr, "failed to start consumer: \n"); + abort(); + } + + printf(" [*] Waiting for messages. To exit press CTRL+C\n"); + for (;;) { + amqp_envelope_t envelope; + + amqp_maybe_release_buffers(conn); + reply = amqp_consume_message(conn, &envelope, + NULL, /* no timeout waiting for a message */ + 0 /* flags are unused currently */); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) { + fprintf(stderr, "failed to read message\n"); + abort(); + } + printf(" [x] Received '%.*s'\n", (int)envelope.message.body.len, + (char*)envelope.message.body.bytes); + amqp_destroy_envelope(&envelope); + } + } + + /* The above loop goes forever, this is never hit, but for completeness */ + reply = amqp_connection_close(conn, AMQP_REPLY_SUCCESS); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) { + fprintf(stderr, "failed to close connection: \n"); + abort(); + } + +cleanup1: + amqp_destroy_connection(conn); + return res; +} diff --git a/examples/tutorial-1-hello-world/send.c b/examples/tutorial-1-hello-world/send.c new file mode 100644 index 0000000..3d67fc1 --- /dev/null +++ b/examples/tutorial-1-hello-world/send.c @@ -0,0 +1,201 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by Alan Antonuk are Copyright (c) 2017 Alan Antonuk. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include <amqp.h> +#include <amqp_tcp_socket.h> + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +static void print_rpc_reply(amqp_rpc_reply_t r, const char* context); + +int main(void) { + const char* QUEUE_NAME = "hello"; + const char* MESSAGE = "Hello World!"; + + amqp_connection_state_t conn; + amqp_socket_t *socket; + amqp_channel_t chan; + int res; + amqp_rpc_reply_t reply; + + conn = amqp_new_connection(); + if (!conn) { + fprintf(stderr, "failed to allocate connection object\n"); + return 1; + } + + /* socket is owned by the conn object, its lifetime is managed by conn */ + socket = amqp_tcp_socket_new(conn); + if (!socket) { + fprintf(stderr, "failed to allocate socket object\n"); + res = 1; + goto cleanup1; + } + + res = amqp_socket_open(socket, "localhost", AMQP_PROTOCOL_PORT); + if (res != AMQP_STATUS_OK) { + fprintf(stderr, "failed to connect to the broker: %s\n", + amqp_error_string2(res)); + res = 2; + goto cleanup1; + } + + reply = + amqp_login(conn, AMQP_DEFAULT_VHOST, AMQP_DEFAULT_MAX_CHANNELS, + AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, + AMQP_SASL_METHOD_PLAIN, + "guest", /* default guest account */ + "guest" /* default guest password */); + + if (reply.reply_type != AMQP_RESPONSE_NORMAL) { + print_rpc_reply(reply, "failed to complete handshake with broker"); + res = 3; + goto cleanup1; + } + + /* Channel numbers are any arbitrary number between 1 and the maximum + * number of channels requested. AMQP_DEFAULT_MAX_CHANNELS requests 2^16-1 + * channels. A good method to select channel numbers is to start at 1 and + * go up from there, once a channel has been close (amqp_channel_close) it can + * be reused again. + */ + chan = 1; + /* rabbitmq-c that return a pointer to a struct indicate failure by returning + * NULL. Details on the error can be retried by calling amqp_get_rpc_reply */ + if (!amqp_channel_open(conn, chan)) { + /* TODO: print error information */ + print_rpc_reply(amqp_get_rpc_reply(conn), "opening channel 1"); + res = 4; + goto cleanup1; + } + + { + /* Some functions return additional information in a struct. The queue + * struct below is owned by the conn object. It is released by calling + * amqp_maybe_release_buffers or when the whole connection object is + * released when amqp_destroy_connection is called. + */ + amqp_queue_declare_ok_t *queue = amqp_queue_declare( + conn, chan, amqp_cstring_bytes(QUEUE_NAME), + 0, /* Not passive, actually declare the queue */ + 0, /* Not durable, won't survive broker restart */ + 0, /* Not exclusive to this connection, maybe used by connections */ + 0, /* Not automatically deleted upon disconnection */ + amqp_empty_table); + if (!queue) { + /* TODO: AMQP_CHANNEL_CLOSE_METHOD can be recovered */ + print_rpc_reply(amqp_get_rpc_reply(conn), "declaring queue"); + res = 5; + goto cleanup1; + } + } + + { + amqp_basic_properties_t props; + memset(&props, 0, sizeof(props)); + + props.content_type = amqp_cstring_bytes("text/plain"); + /* props._flags is a bitmask indicating which header fields are set. */ + props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG; + + res = amqp_basic_publish( + conn, chan, amqp_empty_bytes, /* Use the default exchange */ + amqp_cstring_bytes(QUEUE_NAME), /* routing key */ + 0, /* Not mandatory, no error if the message is not routed */ + 0, /* Not immediate, RabbitMQ does not support this */ + &props, amqp_cstring_bytes(MESSAGE)); + if (res != AMQP_STATUS_OK) { + fprintf(stderr, "failed to publish message: %s\n", + amqp_error_string2(res)); + res = 6; + goto cleanup1; + } + } + + printf(" [x] Sent '%s'\n", MESSAGE); + + /* close the connection to the broker, this implicitly closes all channels, + * there is no need to explicitly close any channels when tearing down a + * connection */ + reply = amqp_connection_close(conn, AMQP_REPLY_SUCCESS); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) { + print_rpc_reply(reply, "closing connection"); + res = 7; + goto cleanup1; + } + + + res = 0; + +cleanup1: + amqp_destroy_connection(conn); + return res; +} + +static void print_rpc_reply(amqp_rpc_reply_t r, const char* context) { + switch (r.reply_type) { + case AMQP_RESPONSE_NORMAL: + break; + + case AMQP_RESPONSE_NONE: + fprintf(stderr, "%s: missing RPC reply type!\n", context); + break; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + fprintf(stderr, "%s: %s\n", context, amqp_error_string2(r.library_error)); + break; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + switch (r.reply.id) { + case AMQP_CONNECTION_CLOSE_METHOD: { + amqp_connection_close_t *m = + (amqp_connection_close_t *)r.reply.decoded; + fprintf(stderr, "%s: connection exception %uh, message: %.*s\n", + context, m->reply_code, (int)m->reply_text.len, + (char *)m->reply_text.bytes); + break; + } + case AMQP_CHANNEL_CLOSE_METHOD: { + amqp_channel_close_t *m = (amqp_channel_close_t *)r.reply.decoded; + fprintf(stderr, "%s: channel exception %uh, message: %.*s\n", context, + m->reply_code, (int)m->reply_text.len, + (char *)m->reply_text.bytes); + break; + } + default: + fprintf(stderr, "%s: unknown broker error, method 0x%08X, %s\n", + context, r.reply.id, amqp_method_name(r.reply.id)); + break; + } + break; + default: + fprintf(stderr, "%s: unknown reply-type: %d", context, r.reply_type); + } +} |