From 29563e721661063e48d0690d2ff6e03ad1da225f Mon Sep 17 00:00:00 2001 From: Michael Steinert Date: Sun, 27 May 2012 11:43:55 -0600 Subject: Add SSL/TLS examples Signed-off-by: Michael Steinert --- examples/CMakeLists.txt | 28 +++++- examples/amqps_bind.c | 88 +++++++++++++++++++ examples/amqps_consumer.c | 175 ++++++++++++++++++++++++++++++++++++++ examples/amqps_exchange_declare.c | 83 ++++++++++++++++++ examples/amqps_listen.c | 175 ++++++++++++++++++++++++++++++++++++++ examples/amqps_listenq.c | 158 ++++++++++++++++++++++++++++++++++ examples/amqps_producer.c | 145 +++++++++++++++++++++++++++++++ examples/amqps_sendstring.c | 97 +++++++++++++++++++++ examples/amqps_unbind.c | 88 +++++++++++++++++++ 9 files changed, 1036 insertions(+), 1 deletion(-) create mode 100644 examples/amqps_bind.c create mode 100644 examples/amqps_consumer.c create mode 100644 examples/amqps_exchange_declare.c create mode 100644 examples/amqps_listen.c create mode 100644 examples/amqps_listenq.c create mode 100644 examples/amqps_producer.c create mode 100644 examples/amqps_sendstring.c create mode 100644 examples/amqps_unbind.c (limited to 'examples') diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 5a610cb..6f45551 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -38,4 +38,30 @@ add_executable(amqp_bind amqp_bind.c ${COMMON_SRCS}) target_link_libraries(amqp_bind ${RMQ_LIBRARY_TARGET}) add_executable(amqp_listenq amqp_listenq.c ${COMMON_SRCS}) -target_link_libraries(amqp_listenq ${RMQ_LIBRARY_TARGET}) +target_link_libraries(amqp_listenq rabbitmq) + +if (ENABLE_SSL_SUPPORT) +add_executable(amqps_sendstring amqps_sendstring.c ${COMMON_SRCS}) +target_link_libraries(amqps_sendstring rabbitmq) + +add_executable(amqps_exchange_declare amqps_exchange_declare.c ${COMMON_SRCS}) +target_link_libraries(amqps_exchange_declare rabbitmq) + +add_executable(amqps_listen amqps_listen.c ${COMMON_SRCS}) +target_link_libraries(amqps_listen rabbitmq) + +add_executable(amqps_producer amqps_producer.c ${COMMON_SRCS}) +target_link_libraries(amqps_producer rabbitmq) + +add_executable(amqps_consumer amqps_consumer.c ${COMMON_SRCS}) +target_link_libraries(amqps_consumer rabbitmq) + +add_executable(amqps_unbind amqps_unbind.c ${COMMON_SRCS}) +target_link_libraries(amqps_unbind rabbitmq) + +add_executable(amqps_bind amqps_bind.c ${COMMON_SRCS}) +target_link_libraries(amqps_bind rabbitmq) + +add_executable(amqps_listenq amqps_listenq.c ${COMMON_SRCS}) +target_link_libraries(amqps_listenq rabbitmq) +endif (ENABLE_SSL_SUPPORT) diff --git a/examples/amqps_bind.c b/examples/amqps_bind.c new file mode 100644 index 0000000..1e2c84f --- /dev/null +++ b/examples/amqps_bind.c @@ -0,0 +1,88 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include +#include +#include + +#include +#include +#include + +#include "utils.h" + +int main(int argc, char const * const *argv) { + char const *hostname; + int port; + char const *exchange; + char const *bindingkey; + char const *queue; + + int sockfd; + amqp_connection_state_t conn; + + if (argc < 6) { + fprintf(stderr, "Usage: amqps_bind host port exchange bindingkey queue " + "[cacert.pem [key.pem cert.pem]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + exchange = argv[3]; + bindingkey = argv[4]; + queue = argv[5]; + + conn = amqp_new_connection(); + + die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port, + argc > 6 ? argv[6] : NULL, + argc > 8 ? argv[7] : NULL, + argc > 8 ? argv[8] : NULL), + "Opening SSL/TLS socket"); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + amqp_queue_bind(conn, 1, + amqp_cstring_bytes(queue), + amqp_cstring_bytes(exchange), + amqp_cstring_bytes(bindingkey), + amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding"); + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + return 0; +} diff --git a/examples/amqps_consumer.c b/examples/amqps_consumer.c new file mode 100644 index 0000000..31d251d --- /dev/null +++ b/examples/amqps_consumer.c @@ -0,0 +1,175 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include +#include +#include + +#include +#include +#include + +#include + +#include "utils.h" + +#define SUMMARY_EVERY_US 1000000 + +static void run(amqp_connection_state_t conn) +{ + uint64_t start_time = now_microseconds(); + int received = 0; + int previous_received = 0; + uint64_t previous_report_time = start_time; + uint64_t next_summary_time = start_time + SUMMARY_EVERY_US; + + amqp_frame_t frame; + int result; + size_t body_received; + size_t body_target; + + uint64_t now; + + while (1) { + now = now_microseconds(); + if (now > next_summary_time) { + int countOverInterval = received - previous_received; + double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); + printf("%d ms: Received %d - %d since last report (%d Hz)\n", + (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate); + + previous_received = received; + previous_report_time = now; + next_summary_time += SUMMARY_EVERY_US; + } + + amqp_maybe_release_buffers(conn); + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) + return; + + if (frame.frame_type != AMQP_FRAME_METHOD) + continue; + + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) + continue; + + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) + return; + + if (frame.frame_type != AMQP_FRAME_HEADER) { + fprintf(stderr, "Expected header!"); + abort(); + } + + body_target = frame.payload.properties.body_size; + body_received = 0; + + while (body_received < body_target) { + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) + return; + + if (frame.frame_type != AMQP_FRAME_BODY) { + fprintf(stderr, "Expected body!"); + abort(); + } + + body_received += frame.payload.body_fragment.len; + assert(body_received <= body_target); + } + + received++; + } +} + +int main(int argc, char const * const *argv) { + char const *hostname; + int port; + char const *exchange; + char const *bindingkey; + + int sockfd; + amqp_connection_state_t conn; + + amqp_bytes_t queuename; + + if (argc < 3) { + fprintf(stderr, "Usage: amqps_consumer host port " + "[cacert.pem [key.pem cert.pem]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + exchange = "amq.direct"; /* argv[3]; */ + bindingkey = "test queue"; /* argv[4]; */ + + conn = amqp_new_connection(); + + die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port, + argc > 3 ? argv[3] : NULL, + argc > 5 ? argv[4] : NULL, + argc > 5 ? argv[5] : NULL), + "Opening SSL/TLS socket"); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + { + amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, + amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); + queuename = amqp_bytes_malloc_dup(r->queue); + if (queuename.bytes == NULL) { + fprintf(stderr, "Out of memory while copying queue name"); + return 1; + } + } + + amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), + amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); + + amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); + + run(conn); + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + + return 0; +} diff --git a/examples/amqps_exchange_declare.c b/examples/amqps_exchange_declare.c new file mode 100644 index 0000000..6e81087 --- /dev/null +++ b/examples/amqps_exchange_declare.c @@ -0,0 +1,83 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include +#include +#include + +#include +#include +#include + +#include "utils.h" + +int main(int argc, char const * const *argv) { + char const *hostname; + int port; + char const *exchange; + char const *exchangetype; + + int sockfd; + amqp_connection_state_t conn; + + if (argc < 5) { + fprintf(stderr, "Usage: amqps_exchange_declare host port exchange " + "exchangetype [cacert.pem [key.pem cert.pem]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + exchange = argv[3]; + exchangetype = argv[4]; + + conn = amqp_new_connection(); + + die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port, + argc > 5 ? argv[5] : NULL, + argc > 7 ? argv[6] : NULL, + argc > 7 ? argv[7] : NULL), + "Opening SSL/TLS socket"); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(exchangetype), + 0, 0, amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange"); + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + return 0; +} diff --git a/examples/amqps_listen.c b/examples/amqps_listen.c new file mode 100644 index 0000000..306bbaa --- /dev/null +++ b/examples/amqps_listen.c @@ -0,0 +1,175 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include +#include +#include + +#include +#include +#include + +#include + +#include "utils.h" + +int main(int argc, char const * const *argv) { + char const *hostname; + int port; + char const *exchange; + char const *bindingkey; + + int sockfd; + amqp_connection_state_t conn; + + amqp_bytes_t queuename; + + if (argc < 5) { + fprintf(stderr, "Usage: amqps_listen host port exchange bindingkey " + "[cacert.pem [key.pem cert.pem]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + exchange = argv[3]; + bindingkey = argv[4]; + + conn = amqp_new_connection(); + + die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port, + argc > 5 ? argv[5] : NULL, + argc > 7 ? argv[6] : NULL, + argc > 7 ? argv[7] : NULL), + "Opening socket"); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + { + amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, + amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); + queuename = amqp_bytes_malloc_dup(r->queue); + if (queuename.bytes == NULL) { + fprintf(stderr, "Out of memory while copying queue name"); + return 1; + } + } + + amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), + amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); + + amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); + + { + amqp_frame_t frame; + int result; + + amqp_basic_deliver_t *d; + amqp_basic_properties_t *p; + size_t body_target; + size_t body_received; + + while (1) { + amqp_maybe_release_buffers(conn); + result = amqp_simple_wait_frame(conn, &frame); + printf("Result %d\n", result); + if (result < 0) + break; + + printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); + if (frame.frame_type != AMQP_FRAME_METHOD) + continue; + + printf("Method %s\n", amqp_method_name(frame.payload.method.id)); + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) + continue; + + d = (amqp_basic_deliver_t *) frame.payload.method.decoded; + printf("Delivery %u, exchange %.*s routingkey %.*s\n", + (unsigned) d->delivery_tag, + (int) d->exchange.len, (char *) d->exchange.bytes, + (int) d->routing_key.len, (char *) d->routing_key.bytes); + + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) + break; + + if (frame.frame_type != AMQP_FRAME_HEADER) { + fprintf(stderr, "Expected header!"); + abort(); + } + p = (amqp_basic_properties_t *) frame.payload.properties.decoded; + if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + printf("Content-type: %.*s\n", + (int) p->content_type.len, (char *) p->content_type.bytes); + } + printf("----\n"); + + body_target = frame.payload.properties.body_size; + body_received = 0; + + while (body_received < body_target) { + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) + break; + + if (frame.frame_type != AMQP_FRAME_BODY) { + fprintf(stderr, "Expected body!"); + abort(); + } + + body_received += frame.payload.body_fragment.len; + assert(body_received <= body_target); + + amqp_dump(frame.payload.body_fragment.bytes, + frame.payload.body_fragment.len); + } + + if (body_received != body_target) { + /* Can only happen when amqp_simple_wait_frame returns <= 0 */ + /* We break here to close the connection */ + break; + } + } + } + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + + return 0; +} diff --git a/examples/amqps_listenq.c b/examples/amqps_listenq.c new file mode 100644 index 0000000..90b608a --- /dev/null +++ b/examples/amqps_listenq.c @@ -0,0 +1,158 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include +#include +#include + +#include +#include +#include + +#include + +#include "utils.h" + +int main(int argc, char const * const *argv) { + char const *hostname; + int port; + char const *queuename; + + int sockfd; + amqp_connection_state_t conn; + + if (argc < 4) { + fprintf(stderr, "Usage: amqps_listenq host port queuename " + "[cacert.pem [key.pem cert.pem]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + queuename = argv[3]; + + conn = amqp_new_connection(); + + die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port, + argc > 4 ? argv[4] : NULL, + argc > 6 ? argv[5] : NULL, + argc > 6 ? argv[6] : NULL), + "Opening socket"); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + amqp_basic_consume(conn, 1, amqp_cstring_bytes(queuename), amqp_empty_bytes, 0, 0, 0, amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); + + { + amqp_frame_t frame; + int result; + + amqp_basic_deliver_t *d; + amqp_basic_properties_t *p; + size_t body_target; + size_t body_received; + + while (1) { + amqp_maybe_release_buffers(conn); + result = amqp_simple_wait_frame(conn, &frame); + printf("Result %d\n", result); + if (result < 0) + break; + + printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); + if (frame.frame_type != AMQP_FRAME_METHOD) + continue; + + printf("Method %s\n", amqp_method_name(frame.payload.method.id)); + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) + continue; + + d = (amqp_basic_deliver_t *) frame.payload.method.decoded; + printf("Delivery %u, exchange %.*s routingkey %.*s\n", + (unsigned) d->delivery_tag, + (int) d->exchange.len, (char *) d->exchange.bytes, + (int) d->routing_key.len, (char *) d->routing_key.bytes); + + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) + break; + + if (frame.frame_type != AMQP_FRAME_HEADER) { + fprintf(stderr, "Expected header!"); + abort(); + } + p = (amqp_basic_properties_t *) frame.payload.properties.decoded; + if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + printf("Content-type: %.*s\n", + (int) p->content_type.len, (char *) p->content_type.bytes); + } + printf("----\n"); + + body_target = frame.payload.properties.body_size; + body_received = 0; + + while (body_received < body_target) { + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) + break; + + if (frame.frame_type != AMQP_FRAME_BODY) { + fprintf(stderr, "Expected body!"); + abort(); + } + + body_received += frame.payload.body_fragment.len; + assert(body_received <= body_target); + + amqp_dump(frame.payload.body_fragment.bytes, + frame.payload.body_fragment.len); + } + + if (body_received != body_target) { + /* Can only happen when amqp_simple_wait_frame returns <= 0 */ + /* We break here to close the connection */ + break; + } + + amqp_basic_ack(conn, 1, d->delivery_tag, 0); + } + } + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + + return 0; +} diff --git a/examples/amqps_producer.c b/examples/amqps_producer.c new file mode 100644 index 0000000..6e00f29 --- /dev/null +++ b/examples/amqps_producer.c @@ -0,0 +1,145 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include +#include +#include + +#include +#include +#include + +#include "utils.h" + +#define SUMMARY_EVERY_US 1000000 + +static void send_batch(amqp_connection_state_t conn, + char const *queue_name, + int rate_limit, + int message_count) +{ + uint64_t start_time = now_microseconds(); + int i; + int sent = 0; + int previous_sent = 0; + uint64_t previous_report_time = start_time; + uint64_t next_summary_time = start_time + SUMMARY_EVERY_US; + + char message[256]; + amqp_bytes_t message_bytes; + + for (i = 0; i < (int)sizeof(message); i++) { + message[i] = i & 0xff; + } + + message_bytes.len = sizeof(message); + message_bytes.bytes = message; + + for (i = 0; i < message_count; i++) { + uint64_t now = now_microseconds(); + + die_on_error(amqp_basic_publish(conn, + 1, + amqp_cstring_bytes("amq.direct"), + amqp_cstring_bytes(queue_name), + 0, + 0, + NULL, + message_bytes), + "Publishing"); + sent++; + if (now > next_summary_time) { + int countOverInterval = sent - previous_sent; + double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); + printf("%d ms: Sent %d - %d since last report (%d Hz)\n", + (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate); + + previous_sent = sent; + previous_report_time = now; + next_summary_time += SUMMARY_EVERY_US; + } + + while (((i * 1000000.0) / (now - start_time)) > rate_limit) { + microsleep(2000); + now = now_microseconds(); + } + } + + { + uint64_t stop_time = now_microseconds(); + int total_delta = stop_time - start_time; + + printf("PRODUCER - Message count: %d\n", message_count); + printf("Total time, milliseconds: %d\n", total_delta / 1000); + printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0))); + } +} + +int main(int argc, char const * const *argv) { + char const *hostname; + int port; + int rate_limit; + int message_count; + + int sockfd; + amqp_connection_state_t conn; + + if (argc < 5) { + fprintf(stderr, "Usage: amqps_producer host port rate_limit message_count " + "[cacert.pem [key.pem cert.pem]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + rate_limit = atoi(argv[3]); + message_count = atoi(argv[4]); + + conn = amqp_new_connection(); + + die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port, + argc > 5 ? argv[5] : NULL, + argc > 7 ? argv[6] : NULL, + argc > 7 ? argv[7] : NULL), + "Opening SSL/TLS socket"); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + send_batch(conn, "test queue", rate_limit, message_count); + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + return 0; +} diff --git a/examples/amqps_sendstring.c b/examples/amqps_sendstring.c new file mode 100644 index 0000000..2315982 --- /dev/null +++ b/examples/amqps_sendstring.c @@ -0,0 +1,97 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include +#include +#include + +#include +#include +#include + +#include "utils.h" + +int main(int argc, char const * const *argv) { + char const *hostname; + int port; + char const *exchange; + char const *routingkey; + char const *messagebody; + + int sockfd; + amqp_connection_state_t conn; + + if (argc < 6) { + fprintf(stderr, "Usage: amqps_sendstring host port exchange routingkey " + "messagebody [cacert.pem [key.pem cert.pem]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + exchange = argv[3]; + routingkey = argv[4]; + messagebody = argv[5]; + + conn = amqp_new_connection(); + + die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port, + argc > 6 ? argv[6] : NULL, + argc > 8 ? argv[7] : NULL, + argc > 8 ? argv[8] : NULL), + "Opening socket"); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + { + amqp_basic_properties_t props; + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; + props.content_type = amqp_cstring_bytes("text/plain"); + props.delivery_mode = 2; /* persistent delivery mode */ + die_on_error(amqp_basic_publish(conn, + 1, + amqp_cstring_bytes(exchange), + amqp_cstring_bytes(routingkey), + 0, + 0, + &props, + amqp_cstring_bytes(messagebody)), + "Publishing"); + } + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + return 0; +} diff --git a/examples/amqps_unbind.c b/examples/amqps_unbind.c new file mode 100644 index 0000000..e429c6d --- /dev/null +++ b/examples/amqps_unbind.c @@ -0,0 +1,88 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include +#include +#include + +#include +#include +#include + +#include "utils.h" + +int main(int argc, char const * const *argv) { + char const *hostname; + int port; + char const *exchange; + char const *bindingkey; + char const *queue; + + int sockfd; + amqp_connection_state_t conn; + + if (argc < 6) { + fprintf(stderr, "Usage: amqps_unbind host port exchange bindingkey queue " + "[cacert.pem [key.pem cert.pem]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + exchange = argv[3]; + bindingkey = argv[4]; + queue = argv[5]; + + conn = amqp_new_connection(); + + die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port, + argc > 6 ? argv[6] : NULL, + argc > 8 ? argv[7] : NULL, + argc > 8 ? argv[8] : NULL), + "Opening socket"); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + amqp_queue_unbind(conn, 1, + amqp_cstring_bytes(queue), + amqp_cstring_bytes(exchange), + amqp_cstring_bytes(bindingkey), + amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding"); + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + return 0; +} -- cgit v1.2.1