diff options
Diffstat (limited to 'examples/amqp_sendstring.c')
-rw-r--r-- | examples/amqp_sendstring.c | 65 |
1 files changed, 62 insertions, 3 deletions
diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c index bdf4285..6fe540d 100644 --- a/examples/amqp_sendstring.c +++ b/examples/amqp_sendstring.c @@ -4,6 +4,7 @@ #include <stdint.h> #include <amqp.h> +#include <amqp_framing.h> #include <unistd.h> @@ -14,6 +15,49 @@ static void die_on_error(int x, char const *context) { } } +static void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) { + switch (x.reply_type) { + case AMQP_RESPONSE_NORMAL: + return; + + case AMQP_RESPONSE_NONE: + fprintf(stderr, "%s: missing RPC reply type!", context); + break; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + fprintf(stderr, "%s: %s\n", context, strerror(x.library_errno)); + break; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + switch (x.reply.id) { + case AMQP_CONNECTION_CLOSE_METHOD: { + amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded; + fprintf(stderr, "%s: server connection error %d, message: %*s", + context, + m->reply_code, + (int) m->reply_text.len, + (char *) m->reply_text.bytes); + break; + } + case AMQP_CHANNEL_CLOSE_METHOD: { + amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded; + fprintf(stderr, "%s: server channel error %d, message: %*s", + context, + m->reply_code, + (int) m->reply_text.len, + (char *) m->reply_text.bytes); + break; + } + default: + fprintf(stderr, "%s: unknown server error, method id 0x%08X", context, x.reply.id); + break; + } + break; + } + + exit(1); +} + int main(int argc, char const * const *argv) { char const *hostname; int port; @@ -35,15 +79,30 @@ int main(int argc, char const * const *argv) { routingkey = argv[4]; messagebody = argv[5]; + conn = amqp_new_connection(); + die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); - amqp_send_header((amqp_writer_fun_t) write, sockfd); + amqp_set_sockfd(conn, sockfd); + die_on_amqp_error(amqp_login(conn, "/", 131072, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); - conn = amqp_new_connection(); + die_on_error(amqp_basic_publish(conn, + amqp_cstring_bytes(exchange), + amqp_cstring_bytes(routingkey), + 0, + 0, + NULL, + amqp_cstring_bytes(messagebody)), + "Publishing"); + printf("Waiting for frames...\n"); while (1) { amqp_frame_t frame; - printf("Result %d\n", amqp_simple_wait_frame(conn, sockfd, &frame)); + int result = amqp_simple_wait_frame(conn, &frame); + printf("Result %d\n", result); printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); + if (result == 0) break; + amqp_maybe_release_buffers(conn); } amqp_destroy_connection(conn); |