From 76e825b388c6c3b65ef238a245748f38d3f1c1fc Mon Sep 17 00:00:00 2001 From: David Wragg Date: Sun, 30 May 2010 23:31:40 +0100 Subject: Move all includes of popt.h into common.h, which depends on it anyway --- tools/common.c | 2 -- tools/common.h | 2 ++ tools/consume.c | 2 -- tools/get.c | 2 -- tools/publish.c | 2 -- 5 files changed, 2 insertions(+), 8 deletions(-) (limited to 'tools') diff --git a/tools/common.c b/tools/common.c index 6a38a95..6c0e871 100644 --- a/tools/common.c +++ b/tools/common.c @@ -61,8 +61,6 @@ #include #include -#include - #include "common.h" extern char **environ; diff --git a/tools/common.h b/tools/common.h index 09a9242..8ea754c 100644 --- a/tools/common.h +++ b/tools/common.h @@ -50,6 +50,8 @@ #include +#include + #include #include diff --git a/tools/consume.c b/tools/consume.c index 40b61d1..b6bd5e2 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -53,8 +53,6 @@ #include #include -#include - #include "common.h" /* Convert a amqp_bytes_t to an escaped string form for printing. We diff --git a/tools/get.c b/tools/get.c index f746fd1..8f8e0d0 100644 --- a/tools/get.c +++ b/tools/get.c @@ -52,8 +52,6 @@ #include -#include - #include "common.h" static int do_get(amqp_connection_state_t conn, char *queue) diff --git a/tools/publish.c b/tools/publish.c index 21314b2..15d2386 100644 --- a/tools/publish.c +++ b/tools/publish.c @@ -54,8 +54,6 @@ #include #include -#include - #include "common.h" static void do_publish(amqp_connection_state_t conn, -- cgit v1.2.1 From 2347dc9977d3bf0c9ed19f7ed3a905eb4e65fa46 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Sun, 30 May 2010 23:31:40 +0100 Subject: Add amqp_end_connection, which closes the socket and destroys the connection --- tools/common.c | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'tools') diff --git a/tools/common.c b/tools/common.c index 6c0e871..d4771ac 100644 --- a/tools/common.c +++ b/tools/common.c @@ -245,16 +245,15 @@ amqp_connection_state_t make_connection(void) void close_connection(amqp_connection_state_t conn) { - int s = amqp_get_sockfd(conn); - + int res; die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "closing channel"); die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "closing connection"); - amqp_destroy_connection(conn); - - if (close(s) < 0) - die_errno(errno, "closing socket"); + + res = amqp_end_connection(conn); + if (res < 0) + die_errno(-res, "closing connection"); } amqp_bytes_t read_all(int fd) -- cgit v1.2.1 From 66a0a987914626fc0ea86067a0ea1dd7a2bebdd2 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Sun, 30 May 2010 23:31:40 +0100 Subject: Make error codes returned by librabbitmq functions opaque Windows doesn't generally use POSIX error codes, which poses a problem for librabbitmq's approach of using those error codes in its API. So make the librabbitmq error codes opaque: They are still be integers, but client code is not supposed to assume anything about them, except that they can be passed to a new amqp_error_string() function which returns the corresponding error message Internally, the error codes are either taken from a set of librabbitmq-specific values, or correspond to an OS-specific (POSIX or win32) error code, with a simple encoding to indicate which is which. --- tools/common.c | 54 ++++++++++++++++++++++++------------------------------ tools/common.h | 6 ++++-- tools/consume.c | 8 ++++---- tools/publish.c | 3 +-- 4 files changed, 33 insertions(+), 38 deletions(-) (limited to 'tools') diff --git a/tools/common.c b/tools/common.c index d4771ac..ec24085 100644 --- a/tools/common.c +++ b/tools/common.c @@ -84,11 +84,24 @@ void die_errno(int err, const char *fmt, ...) va_start(ap, fmt); vfprintf(stderr, fmt, ap); va_end(ap); - fprintf(stderr, ": %s\n", strerror(err)); + fprintf(stderr, ": %s\n", strerror(errno)); exit(1); } -char *amqp_server_exception_string(amqp_rpc_reply_t r) +void die_amqp_error(int err, const char *fmt, ...) +{ + if (err <= 0) + return; + + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", amqp_error_string(err)); + exit(1); +} + +const char *amqp_server_exception_string(amqp_rpc_reply_t r) { int res; char *s; @@ -123,26 +136,17 @@ char *amqp_server_exception_string(amqp_rpc_reply_t r) return res >= 0 ? s : NULL; } -char *amqp_rpc_reply_string(amqp_rpc_reply_t r) +const char *amqp_rpc_reply_string(amqp_rpc_reply_t r) { - const char *s; - switch (r.reply_type) { case AMQP_RESPONSE_NORMAL: - s = "normal response"; - break; + return strdup("normal response"); case AMQP_RESPONSE_NONE: - s = "missing RPC reply type"; - break; + return strdup("missing RPC reply type"); case AMQP_RESPONSE_LIBRARY_EXCEPTION: - if (r.library_errno) - s = strerror(r.library_errno); - else - s = "end of stream"; - - break; + return amqp_error_string(r.library_error); case AMQP_RESPONSE_SERVER_EXCEPTION: return amqp_server_exception_string(r); @@ -150,8 +154,6 @@ char *amqp_rpc_reply_string(amqp_rpc_reply_t r) default: abort(); } - - return strdup(s); } void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) @@ -220,13 +222,8 @@ amqp_connection_state_t make_connection(void) } s = amqp_open_socket(host, port ? port : 5672); - if (s < 0) { - if (s == -ENOENT) - die("unknown host %s", host); - else - die_errno(-s, "opening socket to %s", amqp_server); - } - + die_amqp_error(-s, "opening socket to %s", amqp_server); + set_cloexec(s); conn = amqp_new_connection(); @@ -252,8 +249,7 @@ void close_connection(amqp_connection_state_t conn) "closing connection"); res = amqp_end_connection(conn); - if (res < 0) - die_errno(-res, "closing connection"); + die_amqp_error(-res, "closing connection"); } amqp_bytes_t read_all(int fd) @@ -304,8 +300,7 @@ void copy_body(amqp_connection_state_t conn, int fd) amqp_frame_t frame; int res = amqp_simple_wait_frame(conn, &frame); - if (res < 0) - die_errno(-res, "waiting for header frame"); + die_amqp_error(-res, "waiting for header frame"); if (frame.frame_type != AMQP_FRAME_HEADER) die("expected header, got frame type 0x%X", frame.frame_type); @@ -313,8 +308,7 @@ void copy_body(amqp_connection_state_t conn, int fd) body_remaining = frame.payload.properties.body_size; while (body_remaining) { res = amqp_simple_wait_frame(conn, &frame); - if (res < 0) - die_errno(-res, "waiting for body frame"); + die_amqp_error(-res, "waiting for body frame"); if (frame.frame_type != AMQP_FRAME_BODY) die("expected body, got frame type 0x%X", frame.frame_type); diff --git a/tools/common.h b/tools/common.h index 8ea754c..b3d8ab9 100644 --- a/tools/common.h +++ b/tools/common.h @@ -55,13 +55,15 @@ #include #include -extern char *amqp_server_exception_string(amqp_rpc_reply_t r); -extern char *amqp_rpc_reply_string(amqp_rpc_reply_t r); +extern const char *amqp_server_exception_string(amqp_rpc_reply_t r); +extern const char *amqp_rpc_reply_string(amqp_rpc_reply_t r); extern void die(const char *fmt, ...) __attribute__ ((format (printf, 1, 2))); extern void die_errno(int err, const char *fmt, ...) __attribute__ ((format (printf, 2, 3))); +extern void die_amqp_error(int err, const char *fmt, ...) + __attribute__ ((format (printf, 2, 3))); extern void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) __attribute__ ((format (printf, 2, 3))); diff --git a/tools/consume.c b/tools/consume.c index b6bd5e2..9999960 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -155,8 +155,7 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, struct pipeline pl; uint64_t delivery_tag; int res = amqp_simple_wait_frame(conn, &frame); - if (res < 0) - die_errno(-res, "waiting for header frame"); + die_amqp_error(res, "waiting for header frame"); if (frame.frame_type != AMQP_FRAME_METHOD || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) @@ -170,8 +169,9 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, copy_body(conn, pl.infd); if (finish_pipeline(&pl) && !no_ack) - die_errno(-amqp_basic_ack(conn, 1, delivery_tag, 0), - "basic.ack"); + die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag, + 0), + "basic.ack"); amqp_maybe_release_buffers(conn); } diff --git a/tools/publish.c b/tools/publish.c index 15d2386..0917dae 100644 --- a/tools/publish.c +++ b/tools/publish.c @@ -64,8 +64,7 @@ static void do_publish(amqp_connection_state_t conn, cstring_bytes(exchange), cstring_bytes(routing_key), 0, 0, props, body); - if (res != 0) - die_errno(-res, "basic.publish"); + die_amqp_error(res, "basic.publish"); } int main(int argc, const char **argv) -- cgit v1.2.1 From 1c198e88d1a0c74676f8d6fade99b2531ba815b8 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Sun, 30 May 2010 23:31:40 +0100 Subject: A Windows port, using MinGW/MSYS --- tools/Makefile.am | 16 ++-- tools/common.c | 59 +------------- tools/common.h | 8 -- tools/consume.c | 1 + tools/unix/process.c | 100 ++++++++++++++++++++++++ tools/unix/process.h | 57 ++++++++++++++ tools/windows/compat.c | 73 +++++++++++++++++ tools/windows/compat.h | 51 ++++++++++++ tools/windows/process.c | 204 ++++++++++++++++++++++++++++++++++++++++++++++++ tools/windows/process.h | 59 ++++++++++++++ 10 files changed, 559 insertions(+), 69 deletions(-) create mode 100644 tools/unix/process.c create mode 100644 tools/unix/process.h create mode 100644 tools/windows/compat.c create mode 100644 tools/windows/compat.h create mode 100644 tools/windows/process.c create mode 100644 tools/windows/process.h (limited to 'tools') diff --git a/tools/Makefile.am b/tools/Makefile.am index 307fbd2..3f9fd6a 100644 --- a/tools/Makefile.am +++ b/tools/Makefile.am @@ -1,15 +1,21 @@ bin_PROGRAMS = amqp-publish amqp-get amqp-consume -AM_CFLAGS = -I$(top_srcdir)/librabbitmq +AM_CFLAGS = -I$(top_srcdir)/librabbitmq -I$(PLATFORM_DIR) AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la LDADD=$(LIBPOPT) -noinst_HEADERS = common.h +noinst_HEADERS = common.h $(PLATFORM_DIR)/process.h -amqp_publish_SOURCES = publish.c common.c -amqp_get_SOURCES = get.c common.c -amqp_consume_SOURCES = consume.c common.c +COMMON_SOURCES = common.c + +if WINDOWS +COMMON_SOURCES += windows/compat.c +endif + +amqp_publish_SOURCES = publish.c $(COMMON_SOURCES) +amqp_get_SOURCES = get.c $(COMMON_SOURCES) +amqp_consume_SOURCES = consume.c $(PLATFORM_DIR)/process.c $(COMMON_SOURCES) if TOOLS_DOC man_MANS = doc/amqp-publish.1 doc/amqp-consume.1 doc/amqp-get.1 doc/librabbitmq-tools.7 diff --git a/tools/common.c b/tools/common.c index ec24085..39099c1 100644 --- a/tools/common.c +++ b/tools/common.c @@ -58,12 +58,12 @@ #include #include #include -#include -#include #include "common.h" -extern char **environ; +#ifdef WINDOWS +#include "compat.h" +#endif void die(const char *fmt, ...) { @@ -169,16 +169,6 @@ void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) exit(1); } -void set_cloexec(int fd) -{ - int flags; - - flags = fcntl(fd, F_GETFD); - if (flags == -1 - || fcntl(fd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) - die_errno(errno, "set_cloexec"); -} - static char *amqp_server = "localhost"; static char *amqp_vhost = "/"; static char *amqp_username = "guest"; @@ -224,8 +214,6 @@ amqp_connection_state_t make_connection(void) s = amqp_open_socket(host, port ? port : 5672); die_amqp_error(-s, "opening socket to %s", amqp_server); - set_cloexec(s); - conn = amqp_new_connection(); amqp_set_sockfd(conn, s); @@ -318,47 +306,6 @@ void copy_body(amqp_connection_state_t conn, int fd) } } -void pipeline(const char * const *argv, struct pipeline *pl) -{ - posix_spawn_file_actions_t file_acts; - - int pipefds[2]; - if (pipe(pipefds)) - die_errno(errno, "pipe"); - - die_errno(posix_spawn_file_actions_init(&file_acts), - "posix_spawn_file_actions_init"); - die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0), - "posix_spawn_file_actions_adddup2"); - die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]), - "posix_spawn_file_actions_addclose"); - die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]), - "posix_spawn_file_actions_addclose"); - - die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL, - (char * const *)argv, environ), - "posix_spawnp: %s", argv[0]); - - die_errno(posix_spawn_file_actions_destroy(&file_acts), - "posix_spawn_file_actions_destroy"); - - if (close(pipefds[0])) - die_errno(errno, "close"); - - pl->infd = pipefds[1]; -} - -int finish_pipeline(struct pipeline *pl) -{ - int status; - - if (close(pl->infd)) - die_errno(errno, "close"); - if (waitpid(pl->pid, &status, 0) < 0) - die_errno(errno, "waitpid"); - return WIFEXITED(status) && WEXITSTATUS(status) == 0; -} - poptContext process_options(int argc, const char **argv, struct poptOption *options, const char *help) diff --git a/tools/common.h b/tools/common.h index b3d8ab9..0caee98 100644 --- a/tools/common.h +++ b/tools/common.h @@ -77,14 +77,6 @@ extern void write_all(int fd, amqp_bytes_t data); extern void copy_body(amqp_connection_state_t conn, int fd); -struct pipeline { - int pid; - int infd; -}; - -extern void pipeline(const char * const *argv, struct pipeline *pl); -extern int finish_pipeline(struct pipeline *pl); - #define INCLUDE_OPTIONS(options) \ {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL} diff --git a/tools/consume.c b/tools/consume.c index 9999960..2117bba 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -54,6 +54,7 @@ #include #include "common.h" +#include "process.h" /* Convert a amqp_bytes_t to an escaped string form for printing. We use the same escaping conventions as rabbitmqctl. */ diff --git a/tools/unix/process.c b/tools/unix/process.c new file mode 100644 index 0000000..8a02afb --- /dev/null +++ b/tools/unix/process.c @@ -0,0 +1,100 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include +#include +#include +#include + +#include "common.h" +#include "process.h" + +extern char **environ; + +void pipeline(const char *const *argv, struct pipeline *pl) +{ + posix_spawn_file_actions_t file_acts; + + int pipefds[2]; + if (pipe(pipefds)) + die_errno(errno, "pipe"); + + die_errno(posix_spawn_file_actions_init(&file_acts), + "posix_spawn_file_actions_init"); + die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0), + "posix_spawn_file_actions_adddup2"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]), + "posix_spawn_file_actions_addclose"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]), + "posix_spawn_file_actions_addclose"); + + die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL, + (char * const *)argv, environ), + "posix_spawnp: %s", argv[0]); + + die_errno(posix_spawn_file_actions_destroy(&file_acts), + "posix_spawn_file_actions_destroy"); + + if (close(pipefds[0])) + die_errno(errno, "close"); + + pl->infd = pipefds[1]; +} + +int finish_pipeline(struct pipeline *pl) +{ + int status; + + if (close(pl->infd)) + die_errno(errno, "close"); + if (waitpid(pl->pid, &status, 0) < 0) + die_errno(errno, "waitpid"); + return WIFEXITED(status) && WEXITSTATUS(status) == 0; +} diff --git a/tools/unix/process.h b/tools/unix/process.h new file mode 100644 index 0000000..ac2939d --- /dev/null +++ b/tools/unix/process.h @@ -0,0 +1,57 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +struct pipeline { + int pid; + int infd; +}; + +extern void pipeline(const char *const *argv, struct pipeline *pl); +extern int finish_pipeline(struct pipeline *pl); diff --git a/tools/windows/compat.c b/tools/windows/compat.c new file mode 100644 index 0000000..f0508b2 --- /dev/null +++ b/tools/windows/compat.c @@ -0,0 +1,73 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include +#include +#include + +#include "compat.h" + +int asprintf(char **strp, const char *fmt, ...) +{ + va_list ap; + int len; + + va_start(ap, fmt); + len = _vscprintf(fmt, ap); + *strp = malloc(len+1); + if (!*strp) + return -1; + + len = vsprintf(*strp, fmt, ap); + *strp[len] = 0; + + va_end(ap); + return len; +} diff --git a/tools/windows/compat.h b/tools/windows/compat.h new file mode 100644 index 0000000..8211b37 --- /dev/null +++ b/tools/windows/compat.h @@ -0,0 +1,51 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +extern int asprintf(char **strp, const char *fmt, ...); diff --git a/tools/windows/process.c b/tools/windows/process.c new file mode 100644 index 0000000..9a0b893 --- /dev/null +++ b/tools/windows/process.c @@ -0,0 +1,204 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include +#include +#include + +#include "common.h" +#include "process.h" + +void die_windows_error(const char *fmt, ...) +{ + char *msg; + + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + + if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM + | FORMAT_MESSAGE_ALLOCATE_BUFFER, + NULL, GetLastError(), + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPSTR)&msg, 0, NULL)) + msg = "(failed to retrieving Windows error message)"; + + fprintf(stderr, ": %s\n", msg); + exit(1); +} + +static char *make_command_line(const char *const *argv) +{ + int i; + size_t len = 1; /* initial quotes */ + char *buf; + char *dest; + + /* calculate the length of the required buffer, making worst + case assumptions for simplicity */ + for (i = 0;;) { + len += strlen(argv[i]) * 2; + + if (!argv[++i]) + break; + + len += 3; /* quotes, space, quotes */ + } + + len += 2; /* final quotes and the terminating zero */ + + dest = buf = malloc(len); + if (!buf) + die("allocating memory for subprocess command line"); + + *dest++ = '\"'; + + for (i = 0;;) { + const char *src = argv[i]; + for (;;) { + switch (*src) { + case 0: + goto done; + + case '\"': + case '\\': + *dest++ = '\\'; + /* fall through */ + + default: + *dest++ = *src++; + break; + } + } + done: + + if (!argv[++i]) + break; + + *dest++ = '\"'; + *dest++ = ' '; + *dest++ = '\"'; + } + + *dest++ = '\"'; + *dest++ = 0; + return buf; +} + +void pipeline(const char *const *argv, struct pipeline *pl) +{ + HANDLE in_read_handle, in_write_handle; + SECURITY_ATTRIBUTES sec_attr; + PROCESS_INFORMATION proc_info; + STARTUPINFO start_info; + char *cmdline = make_command_line(argv); + + sec_attr.nLength = sizeof sec_attr; + sec_attr.bInheritHandle = TRUE; + sec_attr.lpSecurityDescriptor = NULL; + + if (!CreatePipe(&in_read_handle, &in_write_handle, &sec_attr, 0)) + die_windows_error("CreatePipe"); + + if (!SetHandleInformation(in_write_handle, HANDLE_FLAG_INHERIT, 0)) + die_windows_error("SetHandleInformation"); + + /* when in Rome... */ + ZeroMemory(&proc_info, sizeof proc_info); + ZeroMemory(&start_info, sizeof start_info); + + start_info.cb = sizeof start_info; + start_info.dwFlags |= STARTF_USESTDHANDLES; + + if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE)) + == INVALID_HANDLE_VALUE + || (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE)) + == INVALID_HANDLE_VALUE) + die_windows_error("GetStdHandle"); + + start_info.hStdInput = in_read_handle; + + if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0, + NULL, NULL, &start_info, &proc_info)) + die_windows_error("CreateProcess"); + + if (!CloseHandle(proc_info.hThread)) + die_windows_error("CloseHandle for thread"); + if (!CloseHandle(in_read_handle)) + die_windows_error("CloseHandle"); + + pl->proc_handle = proc_info.hProcess; + pl->infd = _open_osfhandle((intptr_t)in_write_handle, 0); +} + +int finish_pipeline(struct pipeline *pl) +{ + DWORD code; + + if (close(pl->infd)) + die_errno(errno, "close"); + + for (;;) { + if (!GetExitCodeProcess(pl->proc_handle, &code)) + die_windows_error("GetExitCodeProcess"); + if (code != STILL_ACTIVE) + break; + + if (WaitForSingleObject(pl->proc_handle, INFINITE) + == WAIT_FAILED) + die_windows_error("WaitForSingleObject"); + } + + if (!CloseHandle(pl->proc_handle)) + die_windows_error("CloseHandle for process"); + + return code; +} diff --git a/tools/windows/process.h b/tools/windows/process.h new file mode 100644 index 0000000..df276a7 --- /dev/null +++ b/tools/windows/process.h @@ -0,0 +1,59 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include + +struct pipeline { + HANDLE proc_handle; + int infd; +}; + +extern void pipeline(const char *const *argv, struct pipeline *pl); +extern int finish_pipeline(struct pipeline *pl); -- cgit v1.2.1 From 5e48c730f768e40663af19e01b143d95d1e11fef Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 6 Jun 2010 21:06:15 +1200 Subject: More unix and windows support files in distributions --- tools/Makefile.am | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'tools') diff --git a/tools/Makefile.am b/tools/Makefile.am index ad53d88..ef7a9bc 100644 --- a/tools/Makefile.am +++ b/tools/Makefile.am @@ -20,3 +20,8 @@ amqp_get_SOURCES = get.c $(COMMON_SOURCES) amqp_consume_SOURCES = consume.c $(PLATFORM_DIR)/process.c $(COMMON_SOURCES) amqp_declare_queue_SOURCES = declare_queue.c $(COMMON_SOURCES) amqp_delete_queue_SOURCES = delete_queue.c $(COMMON_SOURCES) + +EXTRA_DIST = \ + unix/process.c unix/process.h \ + windows/process.c windows/process.h \ + windows/compat.c windows/compat.h -- cgit v1.2.1 From 7290a2692cd0f597573de51599c9b50088c02f90 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Mon, 26 Jul 2010 03:09:38 +0100 Subject: Fold amqp_end_connection socket closing into amqp_destroy_connection --- tools/common.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'tools') diff --git a/tools/common.c b/tools/common.c index 39099c1..1d8fbd8 100644 --- a/tools/common.c +++ b/tools/common.c @@ -236,7 +236,7 @@ void close_connection(amqp_connection_state_t conn) die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "closing connection"); - res = amqp_end_connection(conn); + res = amqp_destroy_connection(conn); die_amqp_error(-res, "closing connection"); } -- cgit v1.2.1 From 7032fcd3e646d5e25cd8c40144a03ab869a9f5ff Mon Sep 17 00:00:00 2001 From: David Wragg Date: Tue, 27 Jul 2010 18:17:24 +0100 Subject: Remove redundant includes of popt.h --- tools/declare_queue.c | 2 -- tools/delete_queue.c | 2 -- 2 files changed, 4 deletions(-) (limited to 'tools') diff --git a/tools/declare_queue.c b/tools/declare_queue.c index 662531f..3536455 100644 --- a/tools/declare_queue.c +++ b/tools/declare_queue.c @@ -55,8 +55,6 @@ #include #include -#include - #include "common.h" int main(int argc, const char **argv) diff --git a/tools/delete_queue.c b/tools/delete_queue.c index 41d0d13..ccd157e 100644 --- a/tools/delete_queue.c +++ b/tools/delete_queue.c @@ -55,8 +55,6 @@ #include #include -#include - #include "common.h" int main(int argc, const char **argv) -- cgit v1.2.1 From 4f30d164910f88f181489f591c0957317142da01 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Tue, 27 Jul 2010 19:36:36 +0100 Subject: Free heap-allocated error strings Even though we are about to exit anyway. --- tools/common.c | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) (limited to 'tools') diff --git a/tools/common.c b/tools/common.c index 1d8fbd8..2a640be 100644 --- a/tools/common.c +++ b/tools/common.c @@ -77,10 +77,11 @@ void die(const char *fmt, ...) void die_errno(int err, const char *fmt, ...) { + va_list ap; + if (err == 0) return; - va_list ap; va_start(ap, fmt); vfprintf(stderr, fmt, ap); va_end(ap); @@ -90,14 +91,17 @@ void die_errno(int err, const char *fmt, ...) void die_amqp_error(int err, const char *fmt, ...) { + va_list ap; + char *errstr; + if (err <= 0) return; - - va_list ap; + va_start(ap, fmt); vfprintf(stderr, fmt, ap); va_end(ap); - fprintf(stderr, ": %s\n", amqp_error_string(err)); + fprintf(stderr, ": %s\n", errstr = amqp_error_string(err)); + free(errstr); exit(1); } @@ -158,14 +162,17 @@ const char *amqp_rpc_reply_string(amqp_rpc_reply_t r) void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) { + va_list ap; + char *errstr; + if (r.reply_type == AMQP_RESPONSE_NORMAL) return; - va_list ap; va_start(ap, fmt); vfprintf(stderr, fmt, ap); va_end(ap); - fprintf(stderr, ": %s\n", amqp_rpc_reply_string(r)); + fprintf(stderr, ": %s\n", errstr = amqp_rpc_reply_string(r)); + free(errstr); exit(1); } -- cgit v1.2.1 From 27235c0046dfc87b4f57c652681df4436fce3e0e Mon Sep 17 00:00:00 2001 From: David Wragg Date: Wed, 28 Jul 2010 01:08:49 +0100 Subject: Fix "const char *" to "void *" conversion warnings Functions returning a heap-allocated string should return a "char *", not a "const char *": Because the result is heap-allocated and becomes the responsibility of the caller, it is certainly modifiable. And the pointer will likely get passed to free(), triggering a conversion warning from gcc. So remove all the relevant consts. --- tools/common.c | 4 ++-- tools/common.h | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'tools') diff --git a/tools/common.c b/tools/common.c index 2a640be..c5bda77 100644 --- a/tools/common.c +++ b/tools/common.c @@ -105,7 +105,7 @@ void die_amqp_error(int err, const char *fmt, ...) exit(1); } -const char *amqp_server_exception_string(amqp_rpc_reply_t r) +char *amqp_server_exception_string(amqp_rpc_reply_t r) { int res; char *s; @@ -140,7 +140,7 @@ const char *amqp_server_exception_string(amqp_rpc_reply_t r) return res >= 0 ? s : NULL; } -const char *amqp_rpc_reply_string(amqp_rpc_reply_t r) +char *amqp_rpc_reply_string(amqp_rpc_reply_t r) { switch (r.reply_type) { case AMQP_RESPONSE_NORMAL: diff --git a/tools/common.h b/tools/common.h index 0caee98..84889d3 100644 --- a/tools/common.h +++ b/tools/common.h @@ -55,8 +55,8 @@ #include #include -extern const char *amqp_server_exception_string(amqp_rpc_reply_t r); -extern const char *amqp_rpc_reply_string(amqp_rpc_reply_t r); +extern char *amqp_server_exception_string(amqp_rpc_reply_t r); +extern char *amqp_rpc_reply_string(amqp_rpc_reply_t r); extern void die(const char *fmt, ...) __attribute__ ((format (printf, 1, 2))); -- cgit v1.2.1 From 8edd5fb6db1bd629e295e1932e69bfa3ee301e16 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Wed, 28 Jul 2010 23:03:59 +0100 Subject: We were neglecting to free the constructed command line And fix a tyop in an error message. --- tools/windows/process.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'tools') diff --git a/tools/windows/process.c b/tools/windows/process.c index 9a0b893..0a005bd 100644 --- a/tools/windows/process.c +++ b/tools/windows/process.c @@ -69,7 +69,7 @@ void die_windows_error(const char *fmt, ...) NULL, GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPSTR)&msg, 0, NULL)) - msg = "(failed to retrieving Windows error message)"; + msg = "(failed to retrieve Windows error message)"; fprintf(stderr, ": %s\n", msg); exit(1); @@ -170,6 +170,8 @@ void pipeline(const char *const *argv, struct pipeline *pl) NULL, NULL, &start_info, &proc_info)) die_windows_error("CreateProcess"); + free(cmdline); + if (!CloseHandle(proc_info.hThread)) die_windows_error("CloseHandle for thread"); if (!CloseHandle(in_read_handle)) -- cgit v1.2.1 From 5929e7cc940c2c43ae3b2e37ead5aa2145da05a4 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Sat, 7 Aug 2010 19:16:02 +0100 Subject: make distcheck was failing because of a missing $(srcdir) relative-reference --- tools/Makefile.am | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'tools') diff --git a/tools/Makefile.am b/tools/Makefile.am index ef7a9bc..ccd36ca 100644 --- a/tools/Makefile.am +++ b/tools/Makefile.am @@ -2,7 +2,7 @@ SUBDIRS=doc bin_PROGRAMS = amqp-publish amqp-get amqp-consume amqp-declare-queue amqp-delete-queue -AM_CFLAGS = -I$(top_srcdir)/librabbitmq -I$(PLATFORM_DIR) +AM_CFLAGS = -I$(top_srcdir)/librabbitmq -I$(srcdir)/$(PLATFORM_DIR) AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la LDADD=$(LIBPOPT) -- cgit v1.2.1 From 0a7540f14ce26b9c1f6fba4996c8096cfdce4890 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Sun, 8 Aug 2010 00:11:43 +0100 Subject: Refine amqp-consume options for 0-9-1 queue declaration semantics --- tools/consume.c | 90 +++++++++++++++++++++------------------------------ tools/doc/consume.xml | 58 +++++++++++++++++---------------- 2 files changed, 67 insertions(+), 81 deletions(-) (limited to 'tools') diff --git a/tools/consume.c b/tools/consume.c index 2117bba..a641708 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -84,61 +84,45 @@ static char *stringify_bytes(amqp_bytes_t bytes) static amqp_bytes_t setup_queue(amqp_connection_state_t conn, char *queue, char *exchange, - char *exchange_type, char *routing_key) + char *routing_key, int declare) { - amqp_bytes_t queue_bytes; - amqp_queue_declare_ok_t *res; + amqp_bytes_t queue_bytes = cstring_bytes(queue); /* if an exchange name wasn't provided, check that we don't have options that require it. */ - if (!exchange) { - char *opt = NULL; - if (routing_key) - opt = "--routing-key"; - else if (exchange_type) - opt = "--exchange-type"; - - if (opt) { - fprintf(stderr, - "%s option requires an exchange name to be " - "provided with --exchange\n", opt); - exit(1); - } + if (!exchange && routing_key) { + fprintf(stderr, "--routing-key option requires an exchange" + " name to be provided with --exchange\n"); + exit(1); } - /* Declare the queue as auto-delete. If the queue already - exists, this won't have any effect. */ - queue_bytes = cstring_bytes(queue); - res = amqp_queue_declare(conn, 1, queue_bytes, 0, 0, 0, 1, - AMQP_EMPTY_TABLE); - if (!res) - die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); - - if (!queue) { - /* the server should have provided a queue name */ - char *sq; - queue_bytes = amqp_bytes_malloc_dup(res->queue); - sq = stringify_bytes(queue_bytes); - fprintf(stderr, "Server provided queue name: %s\n", sq); - free(sq); - } + if (!queue || exchange || declare) { + /* Declare the queue as auto-delete. */ + amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1, + queue_bytes, 0, 0, 1, 1, + AMQP_EMPTY_TABLE); + if (!res) + die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); + + if (!queue) { + /* the server should have provided a queue name */ + char *sq; + queue_bytes = amqp_bytes_malloc_dup(res->queue); + sq = stringify_bytes(queue_bytes); + fprintf(stderr, "Server provided queue name: %s\n", + sq); + free(sq); + } - /* Bind to an exchange if requested */ - if (exchange) { - amqp_bytes_t eb = amqp_cstring_bytes(exchange); - - if (exchange_type) { - /* we should create the exchange */ - if (!amqp_exchange_declare(conn, 1, eb, - amqp_cstring_bytes(exchange_type), - 0, 0, 1, AMQP_EMPTY_TABLE)) - die_rpc(amqp_get_rpc_reply(conn), "exchange.declare"); + /* Bind to an exchange if requested */ + if (exchange) { + amqp_bytes_t eb = amqp_cstring_bytes(exchange); + if (!amqp_queue_bind(conn, 1, queue_bytes, eb, + cstring_bytes(routing_key), + AMQP_EMPTY_TABLE)) + die_rpc(amqp_get_rpc_reply(conn), + "queue.bind"); } - - if (!amqp_queue_bind(conn, 1, queue_bytes, eb, - cstring_bytes(routing_key), - AMQP_EMPTY_TABLE)) - die_rpc(amqp_get_rpc_reply(conn), "queue.bind"); } return queue_bytes; @@ -181,13 +165,13 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, int main(int argc, const char **argv) { poptContext opts; - int no_ack; amqp_connection_state_t conn; const char * const *cmd_argv; char *queue = NULL; char *exchange = NULL; - char *exchange_type = NULL; char *routing_key = NULL; + int declare; + int no_ack; amqp_bytes_t queue_bytes; struct poptOption options[] = { @@ -196,11 +180,10 @@ int main(int argc, const char **argv) "the queue to consume from", "queue"}, {"exchange", 'e', POPT_ARG_STRING, &exchange, 0, "bind the queue to this exchange", "exchange"}, - {"exchange-type", 't', POPT_ARG_STRING, &exchange_type, 0, - "create auto-delete exchange of this type for binding", - "type"}, {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, "the routing key to bind with", "routing key"}, + {"declare", 'd', POPT_ARG_NONE, &declare, 0, + "declare an exclusive queue", NULL}, {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0, "consume in no-ack mode", NULL}, POPT_AUTOHELP @@ -218,8 +201,7 @@ int main(int argc, const char **argv) } conn = make_connection(); - queue_bytes = setup_queue(conn, queue, exchange, exchange_type, - routing_key); + queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare); do_consume(conn, queue_bytes, no_ack, cmd_argv); close_connection(conn); return 0; diff --git a/tools/doc/consume.xml b/tools/doc/consume.xml index 448ade6..16d61ad 100644 --- a/tools/doc/consume.xml +++ b/tools/doc/consume.xml @@ -50,8 +50,7 @@ amqp-consume can consume from an existing queue, or it can create a new queue. It can - optionally bind the queue to an existing exchange, or to a - newly created exchange. + optionally bind the queue to an existing exchange. By default, messages will be consumed with explicit @@ -72,13 +71,16 @@ The name of the queue to consume messages - from. If the specified queue does not exist, - an auto-delete queue is created with the given - name. If this option is omitted, a new - auto-delete queue will be created, with a - unique name assigned to the queue by the AMQP - server; that unique name will be displayed on - stderr. + from. + + + + If the option is + omitted, the AMQP server will assign a unique + name to the queue, and that server-assigned + name will be dixsplayed on stderr; this case + implies that an exclusive queue should be + declared. @@ -87,34 +89,36 @@ =exchange name - The name of the exchange to bind the queue to. - If omitted, binding is not performed. The - specified exchange should already exist unless - the option is - used to request the creation of an exchange. + Specifies that an exclusive queue should + be declared, and bound to the given exchange. + The specified exchange should already exist + unless the + option is used to request the creation of an + exchange. - - =type + + =routing key - This option indicates that an auto-delete - exchange of the specified type should be - created. The name of the exchange should be - given by the - option. + The routing key for binding. If omitted, an + empty routing key is assumed. - - =routing key + + - The routing key for the binding. If omitted, - an empty routing key is assumed. + Forces an exclusive queue to be declared, + even when it otherwise would not. That is, + when a queue name is specified with the + option, but no + binding to an exchange is requested with the + option. @@ -138,7 +142,7 @@ Examples - Consume messages from the queue + Consume messages from an existing queue myqueue, and output the message bodies on standard output via @@ -149,7 +153,7 @@ - Bind a newly created auto-delete queue to an + Bind a new exclusive queue to an exchange myexch, and send each message body to the script -- cgit v1.2.1 From 55a5ffa8b12a5b1de877e820786be814131340e6 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Mon, 9 Aug 2010 17:30:09 +0100 Subject: add missing initializers --- tools/consume.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'tools') diff --git a/tools/consume.c b/tools/consume.c index a641708..943406b 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -170,8 +170,8 @@ int main(int argc, const char **argv) char *queue = NULL; char *exchange = NULL; char *routing_key = NULL; - int declare; - int no_ack; + int declare = 0; + int no_ack = 0; amqp_bytes_t queue_bytes; struct poptOption options[] = { -- cgit v1.2.1 From 10cefe79af380e0402907683a60f93ab9850fdca Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Mon, 16 Aug 2010 13:35:08 +0100 Subject: removed auto-delete parameter from exchange.declare --- tools/consume.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'tools') diff --git a/tools/consume.c b/tools/consume.c index 0d50228..146b0a7 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -131,7 +131,7 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, /* we should create the exchange */ if (!amqp_exchange_declare(conn, 1, eb, amqp_cstring_bytes(exchange_type), - 0, 0, 1, AMQP_EMPTY_TABLE)) + 0, 0, AMQP_EMPTY_TABLE)) die_rpc(amqp_get_rpc_reply(conn), "exchange.declare"); } -- cgit v1.2.1