diff options
author | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-04-30 00:08:51 +0100 |
---|---|---|
committer | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-04-30 00:08:51 +0100 |
commit | 107d08b387e3f43aed3e11465c6c0cca64f89b4a (patch) | |
tree | c3b03096e23ed1932c04ddb1312c94a15159a1cf /examples/amqp_producer.c | |
parent | 5ad7fc306e8a6fb18222a502b8881f39384c4076 (diff) | |
download | rabbitmq-c-github-ask-107d08b387e3f43aed3e11465c6c0cca64f89b4a.tar.gz |
Simple producer/consumer examples
Diffstat (limited to 'examples/amqp_producer.c')
-rw-r--r-- | examples/amqp_producer.c | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/examples/amqp_producer.c b/examples/amqp_producer.c new file mode 100644 index 0000000..ef270b1 --- /dev/null +++ b/examples/amqp_producer.c @@ -0,0 +1,104 @@ +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include <stdint.h> +#include <amqp.h> +#include <amqp_framing.h> + +#include <unistd.h> + +#include "example_utils.h" + +#define SUMMARY_EVERY_US 1000000 + +static void send_batch(amqp_connection_state_t conn, + char const *queue_name, + int rate_limit, + int message_count) +{ + long long start_time = now_microseconds(); + int i; + int sent = 0; + int previous_sent = 0; + long long previous_report_time = start_time; + long long next_summary_time = start_time + SUMMARY_EVERY_US; + + char message[256]; + + for (i = 0; i < sizeof(message); i++) { + message[i] = i & 0xff; + } + + for (i = 0; i < message_count; i++) { + long long now = now_microseconds(); + die_on_error(amqp_basic_publish(conn, + (amqp_bytes_t) {.len = 0}, + amqp_cstring_bytes(queue_name), + 0, + 0, + NULL, + (amqp_bytes_t) {.len = sizeof(message), .bytes = message}), + "Publishing"); + sent++; + if (now > next_summary_time) { + int countOverInterval = sent - previous_sent; + double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); + printf("%lld ms: Sent %d - %d since last report (%d Hz)\n", + (now - start_time) / 1000, sent, countOverInterval, (int) intervalRate); + + previous_sent = sent; + previous_report_time = now; + next_summary_time += SUMMARY_EVERY_US; + } + + while (((i * 1000000.0) / (now - start_time)) > rate_limit) { + usleep(2000); + now = now_microseconds(); + } + } + + { + long long stop_time = now_microseconds(); + long long total_delta = stop_time - start_time; + + printf("PRODUCER - Message count: %d\n", message_count); + printf("Total time, milliseconds: %lld\n", total_delta / 1000); + printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0))); + } +} + +int main(int argc, char const * const *argv) { + char const *hostname; + int port; + int rate_limit; + int message_count; + + int sockfd; + amqp_connection_state_t conn; + + if (argc < 5) { + fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + rate_limit = atoi(argv[3]); + message_count = atoi(argv[4]); + + conn = amqp_new_connection(); + + die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); + amqp_set_sockfd(conn, sockfd); + die_on_amqp_error(amqp_login(conn, "/", 131072, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + + send_batch(conn, "test queue", rate_limit, message_count); + + die_on_amqp_error(amqp_channel_close(conn, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + amqp_destroy_connection(conn); + die_on_error(close(sockfd), "Closing socket"); + return 0; +} |