diff options
author | sigiesec <simon.giesecke@btc-ag.com> | 2017-09-21 13:44:07 +0200 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2017-10-30 23:20:54 -0700 |
commit | d4f59af61719cded7e1a50fb8419b0517bc44162 (patch) | |
tree | 2fec461a521aa0f0e7700beaf389f23534ebb504 /tests | |
parent | 550f4c56562baf2dc1c6c6b6285b4fd13fa0097c (diff) | |
download | rabbitmq-c-d4f59af61719cded7e1a50fb8419b0517bc44162.tar.gz |
test: add tests for basic.get, basic.consume & basic.publish
Add integration tests that run against a locally running broker.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/CMakeLists.txt | 5 | ||||
-rw-r--r-- | tests/test_basic.c | 206 |
2 files changed, 211 insertions, 0 deletions
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index a9a41a6..db572af 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -30,6 +30,11 @@ add_executable(test_status_enum target_link_libraries(test_status_enum rabbitmq-static) add_test(status_enum test_status_enum) +add_executable(test_basic + test_basic.c) +target_link_libraries(test_basic rabbitmq-static) +add_test(basic test_basic) + add_executable(test_sasl_mechanism test_sasl_mechanism.c) target_link_libraries(test_sasl_mechanism rabbitmq-static) add_test(sasl_mechanism test_sasl_mechanism) diff --git a/tests/test_basic.c b/tests/test_basic.c new file mode 100644 index 0000000..588ab5d --- /dev/null +++ b/tests/test_basic.c @@ -0,0 +1,206 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * Copyright 2017 Simon Giesecke + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include "amqp.h" +#include "amqp_tcp_socket.h" +#include "amqp_time.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#ifdef _WIN32 +#include <WinSock2.h> +#else +#include <sys/time.h> +#endif + +#ifdef NDEBUG +# undef NDEBUG +#endif +#include <assert.h> + +static const int fixed_channel_id = 1; +static const char test_queue_name[] = "test_queue"; + +amqp_connection_state_t setup_connection_and_channel(void) { + amqp_connection_state_t connection_state_ = amqp_new_connection(); + + amqp_socket_t *socket = amqp_tcp_socket_new(connection_state_); + assert(socket); + + int rc = amqp_socket_open(socket, "localhost", AMQP_PROTOCOL_PORT); + assert(rc == AMQP_STATUS_OK); + + amqp_rpc_reply_t rpc_reply = amqp_login( + connection_state_, "/", 1, AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, + AMQP_SASL_METHOD_PLAIN, "guest", "guest"); + assert(rpc_reply.reply_type == AMQP_RESPONSE_NORMAL); + + amqp_channel_open_ok_t *res = + amqp_channel_open(connection_state_, fixed_channel_id); + assert(res != NULL); + + return connection_state_; +} + +void close_and_destroy_connection(amqp_connection_state_t connection_state_) { + amqp_rpc_reply_t rpc_reply = amqp_connection_close(connection_state_, AMQP_REPLY_SUCCESS); + assert(rpc_reply.reply_type == AMQP_RESPONSE_NORMAL); + + int rc = amqp_destroy_connection(connection_state_); + assert(rc == AMQP_STATUS_OK); +} + +void basic_publish(amqp_connection_state_t connectionState_, + const char *message_) { + amqp_bytes_t message_bytes = amqp_cstring_bytes(message_); + + amqp_basic_properties_t properties; + properties._flags = 0; + + properties._flags |= AMQP_BASIC_DELIVERY_MODE_FLAG; + properties.delivery_mode = AMQP_DELIVERY_NONPERSISTENT; + + int retval = amqp_basic_publish( + connectionState_, fixed_channel_id, amqp_cstring_bytes(""), + amqp_cstring_bytes(test_queue_name), + /* mandatory=*/1, + /* immediate=*/0, /* RabbitMQ 3.x does not support the "immediate" flag + according to + https://www.rabbitmq.com/specification.html */ + &properties, message_bytes); + + assert(retval == 0); +} + +void queue_declare(amqp_connection_state_t connection_state_, + const char *queue_name_) { + amqp_queue_declare_ok_t *res = amqp_queue_declare( + connection_state_, fixed_channel_id, amqp_cstring_bytes(queue_name_), + /*passive*/ 0, + /*durable*/ 0, + /*exclusive*/ 0, + /*auto_delete*/ 1, amqp_empty_table); + assert(res != NULL); +} + +char *basic_get(amqp_connection_state_t connection_state_, + const char *queue_name_, uint64_t *out_body_size_) { + amqp_rpc_reply_t rpc_reply; + amqp_time_t deadline; + struct timeval timeout = { 0, 500 }; + int time_rc = amqp_time_from_now(&deadline, &timeout); + assert(time_rc == AMQP_STATUS_OK); + + do { + rpc_reply = amqp_basic_get(connection_state_, fixed_channel_id, + amqp_cstring_bytes(queue_name_), /*no_ack*/ 1); + } while (rpc_reply.reply_type == AMQP_RESPONSE_NORMAL && + rpc_reply.reply.id == AMQP_BASIC_GET_EMPTY_METHOD && + amqp_time_has_past(deadline) == AMQP_STATUS_OK); + + assert(rpc_reply.reply_type == AMQP_RESPONSE_NORMAL); + assert(rpc_reply.reply.id == AMQP_BASIC_GET_OK_METHOD); + + amqp_message_t message; + rpc_reply = amqp_read_message(connection_state_, fixed_channel_id, &message, 0); + assert(rpc_reply.reply_type == AMQP_RESPONSE_NORMAL); + + char *body = malloc(message.body.len); + memcpy(body, message.body.bytes, message.body.len); + *out_body_size_ = message.body.len; + amqp_destroy_message(&message); + + return body; +} + +void publish_and_basic_get_message(const char *msg_to_publish) { + amqp_connection_state_t connection_state = setup_connection_and_channel(); + + queue_declare(connection_state, test_queue_name); + basic_publish(connection_state, msg_to_publish); + + uint64_t body_size; + char *msg = basic_get(connection_state, test_queue_name, &body_size); + + assert(body_size == strlen(msg_to_publish)); + assert(strncmp(msg_to_publish, msg, body_size) == 0); + free(msg); + + close_and_destroy_connection(connection_state); +} + +char *consume_message(amqp_connection_state_t connection_state_, + const char *queue_name_, uint64_t *out_body_size_) { + amqp_basic_consume_ok_t *result = + amqp_basic_consume(connection_state_, fixed_channel_id, + amqp_cstring_bytes(queue_name_), amqp_empty_bytes, + /*no_local*/ 0, + /*no_ack*/ 1, + /*exclusive*/ 0, amqp_empty_table); + assert(result != NULL); + + amqp_envelope_t envelope; + struct timeval timeout = {0, 500}; + amqp_rpc_reply_t rpc_reply = + amqp_consume_message(connection_state_, &envelope, &timeout, 0); + assert(rpc_reply.reply_type == AMQP_RESPONSE_NORMAL); + + *out_body_size_ = envelope.message.body.len; + char *body = malloc(*out_body_size_); + if (*out_body_size_) { + memcpy(body, envelope.message.body.bytes, *out_body_size_); + } + + amqp_destroy_envelope(&envelope); + return body; +} + +void publish_and_consume_message(const char *msg_to_publish) { + amqp_connection_state_t connection_state = setup_connection_and_channel(); + + queue_declare(connection_state, test_queue_name); + basic_publish(connection_state, msg_to_publish); + + uint64_t body_size; + char *msg = consume_message(connection_state, test_queue_name, &body_size); + + assert(body_size == strlen(msg_to_publish)); + assert(strncmp(msg_to_publish, msg, body_size) == 0); + free(msg); + + close_and_destroy_connection(connection_state); +} + +int main(void) { + publish_and_basic_get_message(""); + publish_and_basic_get_message("TEST"); + + publish_and_consume_message(""); + publish_and_consume_message("TEST"); + + return 0; +} |