/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ /* * ***** BEGIN LICENSE BLOCK ***** * Version: MIT * * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. * All Rights Reserved. * * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. * * Permission is hereby granted, free of charge, to any person * obtaining a copy of this software and associated documentation * files (the "Software"), to deal in the Software without * restriction, including without limitation the rights to use, copy, * modify, merge, publish, distribute, sublicense, and/or sell copies * of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. * ***** END LICENSE BLOCK ***** */ #include #include #include #include #include #include #include #include "utils.h" #define SUMMARY_EVERY_US 1000000 static void run(amqp_connection_state_t conn) { uint64_t start_time = now_microseconds(); int received = 0; int previous_received = 0; uint64_t previous_report_time = start_time; uint64_t next_summary_time = start_time + SUMMARY_EVERY_US; amqp_frame_t frame; int result; size_t body_received; size_t body_target; uint64_t now; while (1) { now = now_microseconds(); if (now > next_summary_time) { int countOverInterval = received - previous_received; double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); printf("%d ms: Received %d - %d since last report (%d Hz)\n", (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate); previous_received = received; previous_report_time = now; next_summary_time += SUMMARY_EVERY_US; } amqp_maybe_release_buffers(conn); result = amqp_simple_wait_frame(conn, &frame); if (result < 0) return; if (frame.frame_type != AMQP_FRAME_METHOD) continue; if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) continue; result = amqp_simple_wait_frame(conn, &frame); if (result < 0) return; if (frame.frame_type != AMQP_FRAME_HEADER) { fprintf(stderr, "Expected header!"); abort(); } body_target = frame.payload.properties.body_size; body_received = 0; while (body_received < body_target) { result = amqp_simple_wait_frame(conn, &frame); if (result < 0) return; if (frame.frame_type != AMQP_FRAME_BODY) { fprintf(stderr, "Expected body!"); abort(); } body_received += frame.payload.body_fragment.len; assert(body_received <= body_target); } received++; } } int main(int argc, char const * const *argv) { char const *hostname; int port; char const *exchange; char const *bindingkey; int sockfd; amqp_connection_state_t conn; amqp_bytes_t queuename; if (argc < 3) { fprintf(stderr, "Usage: amqp_consumer host port\n"); return 1; } hostname = argv[1]; port = atoi(argv[2]); exchange = "amq.direct"; /* argv[3]; */ bindingkey = "test queue"; /* argv[4]; */ conn = amqp_new_connection(); die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); amqp_set_sockfd(conn, sockfd); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); { amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); queuename = amqp_bytes_malloc_dup(r->queue); if (queuename.bytes == NULL) { fprintf(stderr, "Out of memory while copying queue name"); return 1; } } amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); run(conn); die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); die_on_error(amqp_destroy_connection(conn), "Ending connection"); return 0; }