summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorDavid Wragg <david@rabbitmq.com>2010-09-03 10:18:55 +0100
committerDavid Wragg <david@rabbitmq.com>2010-09-03 10:18:55 +0100
commitb85fa81a4076536048ea374094dccbd580a5fe6e (patch)
tree7985515d5dd7925706eaa82623449b6eaa053b7a /tools
parentb339e621a8a85fbd749fdb499161320abed5ebb3 (diff)
parent1b1340ad50e18edc194f26a7156cab44b8a1bba0 (diff)
downloadrabbitmq-c-github-ask-b85fa81a4076536048ea374094dccbd580a5fe6e.tar.gz
Merge amqp_0_9_1 into bug22951 to remove headbug22951
Diffstat (limited to 'tools')
-rw-r--r--tools/Makefile.am25
-rw-r--r--tools/common.c127
-rw-r--r--tools/common.h12
-rw-r--r--tools/consume.c101
-rw-r--r--tools/declare_queue.c2
-rw-r--r--tools/delete_queue.c2
-rw-r--r--tools/doc/consume.xml58
-rw-r--r--tools/get.c2
-rw-r--r--tools/publish.c5
-rw-r--r--tools/unix/process.c100
-rw-r--r--tools/unix/process.h57
-rw-r--r--tools/windows/compat.c73
-rw-r--r--tools/windows/compat.h51
-rw-r--r--tools/windows/process.c206
-rw-r--r--tools/windows/process.h59
15 files changed, 677 insertions, 203 deletions
diff --git a/tools/Makefile.am b/tools/Makefile.am
index 2c47385..ccd36ca 100644
--- a/tools/Makefile.am
+++ b/tools/Makefile.am
@@ -2,15 +2,26 @@ SUBDIRS=doc
bin_PROGRAMS = amqp-publish amqp-get amqp-consume amqp-declare-queue amqp-delete-queue
-AM_CFLAGS = -I$(top_srcdir)/librabbitmq
+AM_CFLAGS = -I$(top_srcdir)/librabbitmq -I$(srcdir)/$(PLATFORM_DIR)
AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la
LDADD=$(LIBPOPT)
-noinst_HEADERS = common.h
+noinst_HEADERS = common.h $(PLATFORM_DIR)/process.h
-amqp_publish_SOURCES = publish.c common.c
-amqp_get_SOURCES = get.c common.c
-amqp_consume_SOURCES = consume.c common.c
-amqp_declare_queue_SOURCES = declare_queue.c common.c
-amqp_delete_queue_SOURCES = delete_queue.c common.c
+COMMON_SOURCES = common.c
+
+if WINDOWS
+COMMON_SOURCES += windows/compat.c
+endif
+
+amqp_publish_SOURCES = publish.c $(COMMON_SOURCES)
+amqp_get_SOURCES = get.c $(COMMON_SOURCES)
+amqp_consume_SOURCES = consume.c $(PLATFORM_DIR)/process.c $(COMMON_SOURCES)
+amqp_declare_queue_SOURCES = declare_queue.c $(COMMON_SOURCES)
+amqp_delete_queue_SOURCES = delete_queue.c $(COMMON_SOURCES)
+
+EXTRA_DIST = \
+ unix/process.c unix/process.h \
+ windows/process.c windows/process.h \
+ windows/compat.c windows/compat.h
diff --git a/tools/common.c b/tools/common.c
index 6a38a95..c5bda77 100644
--- a/tools/common.c
+++ b/tools/common.c
@@ -58,14 +58,12 @@
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
-#include <spawn.h>
-#include <sys/wait.h>
-
-#include <popt.h>
#include "common.h"
-extern char **environ;
+#ifdef WINDOWS
+#include "compat.h"
+#endif
void die(const char *fmt, ...)
{
@@ -79,14 +77,31 @@ void die(const char *fmt, ...)
void die_errno(int err, const char *fmt, ...)
{
+ va_list ap;
+
if (err == 0)
return;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, ": %s\n", strerror(errno));
+ exit(1);
+}
+
+void die_amqp_error(int err, const char *fmt, ...)
+{
va_list ap;
+ char *errstr;
+
+ if (err <= 0)
+ return;
+
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
- fprintf(stderr, ": %s\n", strerror(err));
+ fprintf(stderr, ": %s\n", errstr = amqp_error_string(err));
+ free(errstr);
exit(1);
}
@@ -127,24 +142,15 @@ char *amqp_server_exception_string(amqp_rpc_reply_t r)
char *amqp_rpc_reply_string(amqp_rpc_reply_t r)
{
- const char *s;
-
switch (r.reply_type) {
case AMQP_RESPONSE_NORMAL:
- s = "normal response";
- break;
+ return strdup("normal response");
case AMQP_RESPONSE_NONE:
- s = "missing RPC reply type";
- break;
+ return strdup("missing RPC reply type");
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
- if (r.library_errno)
- s = strerror(r.library_errno);
- else
- s = "end of stream";
-
- break;
+ return amqp_error_string(r.library_error);
case AMQP_RESPONSE_SERVER_EXCEPTION:
return amqp_server_exception_string(r);
@@ -152,33 +158,24 @@ char *amqp_rpc_reply_string(amqp_rpc_reply_t r)
default:
abort();
}
-
- return strdup(s);
}
void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
{
+ va_list ap;
+ char *errstr;
+
if (r.reply_type == AMQP_RESPONSE_NORMAL)
return;
- va_list ap;
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
- fprintf(stderr, ": %s\n", amqp_rpc_reply_string(r));
+ fprintf(stderr, ": %s\n", errstr = amqp_rpc_reply_string(r));
+ free(errstr);
exit(1);
}
-void set_cloexec(int fd)
-{
- int flags;
-
- flags = fcntl(fd, F_GETFD);
- if (flags == -1
- || fcntl(fd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1)
- die_errno(errno, "set_cloexec");
-}
-
static char *amqp_server = "localhost";
static char *amqp_vhost = "/";
static char *amqp_username = "guest";
@@ -222,14 +219,7 @@ amqp_connection_state_t make_connection(void)
}
s = amqp_open_socket(host, port ? port : 5672);
- if (s < 0) {
- if (s == -ENOENT)
- die("unknown host %s", host);
- else
- die_errno(-s, "opening socket to %s", amqp_server);
- }
-
- set_cloexec(s);
+ die_amqp_error(-s, "opening socket to %s", amqp_server);
conn = amqp_new_connection();
amqp_set_sockfd(conn, s);
@@ -247,16 +237,14 @@ amqp_connection_state_t make_connection(void)
void close_connection(amqp_connection_state_t conn)
{
- int s = amqp_get_sockfd(conn);
-
+ int res;
die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"closing channel");
die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"closing connection");
- amqp_destroy_connection(conn);
-
- if (close(s) < 0)
- die_errno(errno, "closing socket");
+
+ res = amqp_destroy_connection(conn);
+ die_amqp_error(-res, "closing connection");
}
amqp_bytes_t read_all(int fd)
@@ -307,8 +295,7 @@ void copy_body(amqp_connection_state_t conn, int fd)
amqp_frame_t frame;
int res = amqp_simple_wait_frame(conn, &frame);
- if (res < 0)
- die_errno(-res, "waiting for header frame");
+ die_amqp_error(-res, "waiting for header frame");
if (frame.frame_type != AMQP_FRAME_HEADER)
die("expected header, got frame type 0x%X",
frame.frame_type);
@@ -316,8 +303,7 @@ void copy_body(amqp_connection_state_t conn, int fd)
body_remaining = frame.payload.properties.body_size;
while (body_remaining) {
res = amqp_simple_wait_frame(conn, &frame);
- if (res < 0)
- die_errno(-res, "waiting for body frame");
+ die_amqp_error(-res, "waiting for body frame");
if (frame.frame_type != AMQP_FRAME_BODY)
die("expected body, got frame type 0x%X",
frame.frame_type);
@@ -327,47 +313,6 @@ void copy_body(amqp_connection_state_t conn, int fd)
}
}
-void pipeline(const char * const *argv, struct pipeline *pl)
-{
- posix_spawn_file_actions_t file_acts;
-
- int pipefds[2];
- if (pipe(pipefds))
- die_errno(errno, "pipe");
-
- die_errno(posix_spawn_file_actions_init(&file_acts),
- "posix_spawn_file_actions_init");
- die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0),
- "posix_spawn_file_actions_adddup2");
- die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]),
- "posix_spawn_file_actions_addclose");
- die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]),
- "posix_spawn_file_actions_addclose");
-
- die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL,
- (char * const *)argv, environ),
- "posix_spawnp: %s", argv[0]);
-
- die_errno(posix_spawn_file_actions_destroy(&file_acts),
- "posix_spawn_file_actions_destroy");
-
- if (close(pipefds[0]))
- die_errno(errno, "close");
-
- pl->infd = pipefds[1];
-}
-
-int finish_pipeline(struct pipeline *pl)
-{
- int status;
-
- if (close(pl->infd))
- die_errno(errno, "close");
- if (waitpid(pl->pid, &status, 0) < 0)
- die_errno(errno, "waitpid");
- return WIFEXITED(status) && WEXITSTATUS(status) == 0;
-}
-
poptContext process_options(int argc, const char **argv,
struct poptOption *options,
const char *help)
diff --git a/tools/common.h b/tools/common.h
index 09a9242..84889d3 100644
--- a/tools/common.h
+++ b/tools/common.h
@@ -50,6 +50,8 @@
#include <stdint.h>
+#include <popt.h>
+
#include <amqp.h>
#include <amqp_framing.h>
@@ -60,6 +62,8 @@ extern void die(const char *fmt, ...)
__attribute__ ((format (printf, 1, 2)));
extern void die_errno(int err, const char *fmt, ...)
__attribute__ ((format (printf, 2, 3)));
+extern void die_amqp_error(int err, const char *fmt, ...)
+ __attribute__ ((format (printf, 2, 3)));
extern void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
__attribute__ ((format (printf, 2, 3)));
@@ -73,14 +77,6 @@ extern void write_all(int fd, amqp_bytes_t data);
extern void copy_body(amqp_connection_state_t conn, int fd);
-struct pipeline {
- int pid;
- int infd;
-};
-
-extern void pipeline(const char * const *argv, struct pipeline *pl);
-extern int finish_pipeline(struct pipeline *pl);
-
#define INCLUDE_OPTIONS(options) \
{NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL}
diff --git a/tools/consume.c b/tools/consume.c
index 7725ba8..34037d9 100644
--- a/tools/consume.c
+++ b/tools/consume.c
@@ -53,9 +53,8 @@
#include <stdio.h>
#include <stdlib.h>
-#include <popt.h>
-
#include "common.h"
+#include "process.h"
/* Convert a amqp_bytes_t to an escaped string form for printing. We
use the same escaping conventions as rabbitmqctl. */
@@ -85,61 +84,45 @@ static char *stringify_bytes(amqp_bytes_t bytes)
static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
char *queue, char *exchange,
- char *exchange_type, char *routing_key)
+ char *routing_key, int declare)
{
- amqp_bytes_t queue_bytes;
- amqp_queue_declare_ok_t *res;
+ amqp_bytes_t queue_bytes = cstring_bytes(queue);
/* if an exchange name wasn't provided, check that we don't
have options that require it. */
- if (!exchange) {
- char *opt = NULL;
- if (routing_key)
- opt = "--routing-key";
- else if (exchange_type)
- opt = "--exchange-type";
-
- if (opt) {
- fprintf(stderr,
- "%s option requires an exchange name to be "
- "provided with --exchange\n", opt);
- exit(1);
- }
+ if (!exchange && routing_key) {
+ fprintf(stderr, "--routing-key option requires an exchange"
+ " name to be provided with --exchange\n");
+ exit(1);
}
- /* Declare the queue as auto-delete. If the queue already
- exists, this won't have any effect. */
- queue_bytes = cstring_bytes(queue);
- res = amqp_queue_declare(conn, 1, queue_bytes, 0, 0, 0, 1,
- AMQP_EMPTY_TABLE);
- if (!res)
- die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
-
- if (!queue) {
- /* the server should have provided a queue name */
- char *sq;
- queue_bytes = amqp_bytes_malloc_dup(res->queue);
- sq = stringify_bytes(queue_bytes);
- fprintf(stderr, "Server provided queue name: %s\n", sq);
- free(sq);
- }
+ if (!queue || exchange || declare) {
+ /* Declare the queue as auto-delete. */
+ amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1,
+ queue_bytes, 0, 0, 1, 1,
+ AMQP_EMPTY_TABLE);
+ if (!res)
+ die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
+
+ if (!queue) {
+ /* the server should have provided a queue name */
+ char *sq;
+ queue_bytes = amqp_bytes_malloc_dup(res->queue);
+ sq = stringify_bytes(queue_bytes);
+ fprintf(stderr, "Server provided queue name: %s\n",
+ sq);
+ free(sq);
+ }
- /* Bind to an exchange if requested */
- if (exchange) {
- amqp_bytes_t eb = amqp_cstring_bytes(exchange);
-
- if (exchange_type) {
- /* we should create the exchange */
- if (!amqp_exchange_declare(conn, 1, eb,
- amqp_cstring_bytes(exchange_type),
- 0, 0, 1, AMQP_EMPTY_TABLE))
- die_rpc(amqp_get_rpc_reply(conn), "exchange.declare");
+ /* Bind to an exchange if requested */
+ if (exchange) {
+ amqp_bytes_t eb = amqp_cstring_bytes(exchange);
+ if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
+ cstring_bytes(routing_key),
+ AMQP_EMPTY_TABLE))
+ die_rpc(amqp_get_rpc_reply(conn),
+ "queue.bind");
}
-
- if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
- cstring_bytes(routing_key),
- AMQP_EMPTY_TABLE))
- die_rpc(amqp_get_rpc_reply(conn), "queue.bind");
}
return queue_bytes;
@@ -157,8 +140,7 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
struct pipeline pl;
uint64_t delivery_tag;
int res = amqp_simple_wait_frame(conn, &frame);
- if (res < 0)
- die_errno(-res, "waiting for header frame");
+ die_amqp_error(res, "waiting for header frame");
if (frame.frame_type != AMQP_FRAME_METHOD
|| frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
@@ -172,8 +154,9 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
copy_body(conn, pl.infd);
if (finish_pipeline(&pl) && !no_ack)
- die_errno(-amqp_basic_ack(conn, 1, delivery_tag, 0),
- "basic.ack");
+ die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag,
+ 0),
+ "basic.ack");
amqp_maybe_release_buffers(conn);
}
@@ -182,13 +165,13 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
int main(int argc, const char **argv)
{
poptContext opts;
- int no_ack;
amqp_connection_state_t conn;
const char * const *cmd_argv;
char *queue = NULL;
char *exchange = NULL;
- char *exchange_type = NULL;
char *routing_key = NULL;
+ int declare = 0;
+ int no_ack = 0;
amqp_bytes_t queue_bytes;
struct poptOption options[] = {
@@ -197,11 +180,10 @@ int main(int argc, const char **argv)
"the queue to consume from", "queue"},
{"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
"bind the queue to this exchange", "exchange"},
- {"exchange-type", 't', POPT_ARG_STRING, &exchange_type, 0,
- "create auto-delete exchange of this type for binding",
- "type"},
{"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
"the routing key to bind with", "routing key"},
+ {"declare", 'd', POPT_ARG_NONE, &declare, 0,
+ "declare an exclusive queue", NULL},
{"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0,
"consume in no-ack mode", NULL},
POPT_AUTOHELP
@@ -219,8 +201,7 @@ int main(int argc, const char **argv)
}
conn = make_connection();
- queue_bytes = setup_queue(conn, queue, exchange, exchange_type,
- routing_key);
+ queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare);
do_consume(conn, queue_bytes, no_ack, cmd_argv);
close_connection(conn);
return 0;
diff --git a/tools/declare_queue.c b/tools/declare_queue.c
index 662531f..3536455 100644
--- a/tools/declare_queue.c
+++ b/tools/declare_queue.c
@@ -55,8 +55,6 @@
#include <string.h>
#include <unistd.h>
-#include <popt.h>
-
#include "common.h"
int main(int argc, const char **argv)
diff --git a/tools/delete_queue.c b/tools/delete_queue.c
index 41d0d13..ccd157e 100644
--- a/tools/delete_queue.c
+++ b/tools/delete_queue.c
@@ -55,8 +55,6 @@
#include <string.h>
#include <unistd.h>
-#include <popt.h>
-
#include "common.h"
int main(int argc, const char **argv)
diff --git a/tools/doc/consume.xml b/tools/doc/consume.xml
index 448ade6..16d61ad 100644
--- a/tools/doc/consume.xml
+++ b/tools/doc/consume.xml
@@ -50,8 +50,7 @@
<para>
<command>amqp-consume</command> can consume from an
existing queue, or it can create a new queue. It can
- optionally bind the queue to an existing exchange, or to a
- newly created exchange.
+ optionally bind the queue to an existing exchange.
</para>
<para>
By default, messages will be consumed with explicit
@@ -72,13 +71,16 @@
<listitem>
<para>
The name of the queue to consume messages
- from. If the specified queue does not exist,
- an auto-delete queue is created with the given
- name. If this option is omitted, a new
- auto-delete queue will be created, with a
- unique name assigned to the queue by the AMQP
- server; that unique name will be displayed on
- stderr.
+ from.
+ </para>
+
+ <para>
+ If the <option>--queue</option> option is
+ omitted, the AMQP server will assign a unique
+ name to the queue, and that server-assigned
+ name will be dixsplayed on stderr; this case
+ implies that an exclusive queue should be
+ declared.
</para>
</listitem>
</varlistentry>
@@ -87,34 +89,36 @@
<term><option>--exchange</option>=<replaceable class="parameter">exchange name</replaceable></term>
<listitem>
<para>
- The name of the exchange to bind the queue to.
- If omitted, binding is not performed. The
- specified exchange should already exist unless
- the <option>--exchange-type</option> option is
- used to request the creation of an exchange.
+ Specifies that an exclusive queue should
+ be declared, and bound to the given exchange.
+ The specified exchange should already exist
+ unless the <option>--exchange-type</option>
+ option is used to request the creation of an
+ exchange.
</para>
</listitem>
</varlistentry>
<varlistentry>
- <term><option>-t</option></term>
- <term><option>--exchange-type</option>=<replaceable class="parameter">type</replaceable></term>
+ <term><option>-r</option></term>
+ <term><option>--routing-key</option>=<replaceable class="parameter">routing key</replaceable></term>
<listitem>
<para>
- This option indicates that an auto-delete
- exchange of the specified type should be
- created. The name of the exchange should be
- given by the <option>--exchange</option>
- option.
+ The routing key for binding. If omitted, an
+ empty routing key is assumed.
</para>
</listitem>
</varlistentry>
<varlistentry>
- <term><option>-r</option></term>
- <term><option>--routing-key</option>=<replaceable class="parameter">routing key</replaceable></term>
+ <term><option>-d</option></term>
+ <term><option>--declare</option></term>
<listitem>
<para>
- The routing key for the binding. If omitted,
- an empty routing key is assumed.
+ Forces an exclusive queue to be declared,
+ even when it otherwise would not. That is,
+ when a queue name is specified with the
+ <option>--queue</option> option, but no
+ binding to an exchange is requested with the
+ <option>--exchange</option> option.
</para>
</listitem>
</varlistentry>
@@ -138,7 +142,7 @@
<title>Examples</title>
<variablelist>
<varlistentry>
- <term>Consume messages from the queue
+ <term>Consume messages from an existing queue
<quote><systemitem
class="resource">myqueue</systemitem></quote>, and
output the message bodies on standard output via
@@ -149,7 +153,7 @@
</varlistentry>
<varlistentry>
- <term>Bind a newly created auto-delete queue to an
+ <term>Bind a new exclusive queue to an
exchange <quote><systemitem
class="resource">myexch</systemitem></quote>, and send
each message body to the script
diff --git a/tools/get.c b/tools/get.c
index f746fd1..8f8e0d0 100644
--- a/tools/get.c
+++ b/tools/get.c
@@ -52,8 +52,6 @@
#include <stdio.h>
-#include <popt.h>
-
#include "common.h"
static int do_get(amqp_connection_state_t conn, char *queue)
diff --git a/tools/publish.c b/tools/publish.c
index 21314b2..0917dae 100644
--- a/tools/publish.c
+++ b/tools/publish.c
@@ -54,8 +54,6 @@
#include <stdlib.h>
#include <string.h>
-#include <popt.h>
-
#include "common.h"
static void do_publish(amqp_connection_state_t conn,
@@ -66,8 +64,7 @@ static void do_publish(amqp_connection_state_t conn,
cstring_bytes(exchange),
cstring_bytes(routing_key),
0, 0, props, body);
- if (res != 0)
- die_errno(-res, "basic.publish");
+ die_amqp_error(res, "basic.publish");
}
int main(int argc, const char **argv)
diff --git a/tools/unix/process.c b/tools/unix/process.c
new file mode 100644
index 0000000..8a02afb
--- /dev/null
+++ b/tools/unix/process.c
@@ -0,0 +1,100 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <unistd.h>
+#include <errno.h>
+#include <spawn.h>
+#include <sys/wait.h>
+
+#include "common.h"
+#include "process.h"
+
+extern char **environ;
+
+void pipeline(const char *const *argv, struct pipeline *pl)
+{
+ posix_spawn_file_actions_t file_acts;
+
+ int pipefds[2];
+ if (pipe(pipefds))
+ die_errno(errno, "pipe");
+
+ die_errno(posix_spawn_file_actions_init(&file_acts),
+ "posix_spawn_file_actions_init");
+ die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0),
+ "posix_spawn_file_actions_adddup2");
+ die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]),
+ "posix_spawn_file_actions_addclose");
+ die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]),
+ "posix_spawn_file_actions_addclose");
+
+ die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL,
+ (char * const *)argv, environ),
+ "posix_spawnp: %s", argv[0]);
+
+ die_errno(posix_spawn_file_actions_destroy(&file_acts),
+ "posix_spawn_file_actions_destroy");
+
+ if (close(pipefds[0]))
+ die_errno(errno, "close");
+
+ pl->infd = pipefds[1];
+}
+
+int finish_pipeline(struct pipeline *pl)
+{
+ int status;
+
+ if (close(pl->infd))
+ die_errno(errno, "close");
+ if (waitpid(pl->pid, &status, 0) < 0)
+ die_errno(errno, "waitpid");
+ return WIFEXITED(status) && WEXITSTATUS(status) == 0;
+}
diff --git a/tools/unix/process.h b/tools/unix/process.h
new file mode 100644
index 0000000..ac2939d
--- /dev/null
+++ b/tools/unix/process.h
@@ -0,0 +1,57 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+struct pipeline {
+ int pid;
+ int infd;
+};
+
+extern void pipeline(const char *const *argv, struct pipeline *pl);
+extern int finish_pipeline(struct pipeline *pl);
diff --git a/tools/windows/compat.c b/tools/windows/compat.c
new file mode 100644
index 0000000..f0508b2
--- /dev/null
+++ b/tools/windows/compat.c
@@ -0,0 +1,73 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+
+#include "compat.h"
+
+int asprintf(char **strp, const char *fmt, ...)
+{
+ va_list ap;
+ int len;
+
+ va_start(ap, fmt);
+ len = _vscprintf(fmt, ap);
+ *strp = malloc(len+1);
+ if (!*strp)
+ return -1;
+
+ len = vsprintf(*strp, fmt, ap);
+ *strp[len] = 0;
+
+ va_end(ap);
+ return len;
+}
diff --git a/tools/windows/compat.h b/tools/windows/compat.h
new file mode 100644
index 0000000..8211b37
--- /dev/null
+++ b/tools/windows/compat.h
@@ -0,0 +1,51 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+extern int asprintf(char **strp, const char *fmt, ...);
diff --git a/tools/windows/process.c b/tools/windows/process.c
new file mode 100644
index 0000000..0a005bd
--- /dev/null
+++ b/tools/windows/process.c
@@ -0,0 +1,206 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <stdio.h>
+#include <io.h>
+#include <windows.h>
+
+#include "common.h"
+#include "process.h"
+
+void die_windows_error(const char *fmt, ...)
+{
+ char *msg;
+
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+
+ if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM
+ | FORMAT_MESSAGE_ALLOCATE_BUFFER,
+ NULL, GetLastError(),
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPSTR)&msg, 0, NULL))
+ msg = "(failed to retrieve Windows error message)";
+
+ fprintf(stderr, ": %s\n", msg);
+ exit(1);
+}
+
+static char *make_command_line(const char *const *argv)
+{
+ int i;
+ size_t len = 1; /* initial quotes */
+ char *buf;
+ char *dest;
+
+ /* calculate the length of the required buffer, making worst
+ case assumptions for simplicity */
+ for (i = 0;;) {
+ len += strlen(argv[i]) * 2;
+
+ if (!argv[++i])
+ break;
+
+ len += 3; /* quotes, space, quotes */
+ }
+
+ len += 2; /* final quotes and the terminating zero */
+
+ dest = buf = malloc(len);
+ if (!buf)
+ die("allocating memory for subprocess command line");
+
+ *dest++ = '\"';
+
+ for (i = 0;;) {
+ const char *src = argv[i];
+ for (;;) {
+ switch (*src) {
+ case 0:
+ goto done;
+
+ case '\"':
+ case '\\':
+ *dest++ = '\\';
+ /* fall through */
+
+ default:
+ *dest++ = *src++;
+ break;
+ }
+ }
+ done:
+
+ if (!argv[++i])
+ break;
+
+ *dest++ = '\"';
+ *dest++ = ' ';
+ *dest++ = '\"';
+ }
+
+ *dest++ = '\"';
+ *dest++ = 0;
+ return buf;
+}
+
+void pipeline(const char *const *argv, struct pipeline *pl)
+{
+ HANDLE in_read_handle, in_write_handle;
+ SECURITY_ATTRIBUTES sec_attr;
+ PROCESS_INFORMATION proc_info;
+ STARTUPINFO start_info;
+ char *cmdline = make_command_line(argv);
+
+ sec_attr.nLength = sizeof sec_attr;
+ sec_attr.bInheritHandle = TRUE;
+ sec_attr.lpSecurityDescriptor = NULL;
+
+ if (!CreatePipe(&in_read_handle, &in_write_handle, &sec_attr, 0))
+ die_windows_error("CreatePipe");
+
+ if (!SetHandleInformation(in_write_handle, HANDLE_FLAG_INHERIT, 0))
+ die_windows_error("SetHandleInformation");
+
+ /* when in Rome... */
+ ZeroMemory(&proc_info, sizeof proc_info);
+ ZeroMemory(&start_info, sizeof start_info);
+
+ start_info.cb = sizeof start_info;
+ start_info.dwFlags |= STARTF_USESTDHANDLES;
+
+ if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE))
+ == INVALID_HANDLE_VALUE
+ || (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE))
+ == INVALID_HANDLE_VALUE)
+ die_windows_error("GetStdHandle");
+
+ start_info.hStdInput = in_read_handle;
+
+ if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0,
+ NULL, NULL, &start_info, &proc_info))
+ die_windows_error("CreateProcess");
+
+ free(cmdline);
+
+ if (!CloseHandle(proc_info.hThread))
+ die_windows_error("CloseHandle for thread");
+ if (!CloseHandle(in_read_handle))
+ die_windows_error("CloseHandle");
+
+ pl->proc_handle = proc_info.hProcess;
+ pl->infd = _open_osfhandle((intptr_t)in_write_handle, 0);
+}
+
+int finish_pipeline(struct pipeline *pl)
+{
+ DWORD code;
+
+ if (close(pl->infd))
+ die_errno(errno, "close");
+
+ for (;;) {
+ if (!GetExitCodeProcess(pl->proc_handle, &code))
+ die_windows_error("GetExitCodeProcess");
+ if (code != STILL_ACTIVE)
+ break;
+
+ if (WaitForSingleObject(pl->proc_handle, INFINITE)
+ == WAIT_FAILED)
+ die_windows_error("WaitForSingleObject");
+ }
+
+ if (!CloseHandle(pl->proc_handle))
+ die_windows_error("CloseHandle for process");
+
+ return code;
+}
diff --git a/tools/windows/process.h b/tools/windows/process.h
new file mode 100644
index 0000000..df276a7
--- /dev/null
+++ b/tools/windows/process.h
@@ -0,0 +1,59 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <windef.h>
+
+struct pipeline {
+ HANDLE proc_handle;
+ int infd;
+};
+
+extern void pipeline(const char *const *argv, struct pipeline *pl);
+extern int finish_pipeline(struct pipeline *pl);