summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonygarnockjones@gmail.com>2010-06-06 15:34:01 +1200
committerTony Garnock-Jones <tonygarnockjones@gmail.com>2010-06-06 15:34:01 +1200
commit80615f70c699603872c5e49c154e7af04dd83498 (patch)
tree1a235190a46ed7133b7ec7626e1f87929d1c1c90
parent489cfea427b6bb8a8d0a367285d53f9285fdb2b0 (diff)
parent872d469c5917e80b9e52e7eac7e5357c1bfcf428 (diff)
downloadrabbitmq-c-github-ask-80615f70c699603872c5e49c154e7af04dd83498.tar.gz
Merge default into bug22825
-rw-r--r--README.windows77
-rw-r--r--configure.ac30
-rwxr-xr-xetc/install-mingw.sh69
-rw-r--r--examples/amqp_bind.c3
-rw-r--r--examples/amqp_consumer.c17
-rw-r--r--examples/amqp_exchange_declare.c3
-rw-r--r--examples/amqp_listen.c9
-rw-r--r--examples/amqp_listenq.c9
-rw-r--r--examples/amqp_producer.c11
-rw-r--r--examples/amqp_sendstring.c3
-rw-r--r--examples/amqp_unbind.c3
-rw-r--r--examples/example_utils.c5
-rw-r--r--librabbitmq/Makefile.am7
-rw-r--r--librabbitmq/amqp.h11
-rw-r--r--librabbitmq/amqp_api.c35
-rw-r--r--librabbitmq/amqp_connection.c50
-rw-r--r--librabbitmq/amqp_mem.c20
-rw-r--r--librabbitmq/amqp_private.h32
-rw-r--r--librabbitmq/amqp_socket.c98
-rw-r--r--librabbitmq/amqp_table.c18
-rw-r--r--librabbitmq/codegen.py15
-rw-r--r--librabbitmq/unix/socket.c85
-rw-r--r--librabbitmq/unix/socket.h80
-rw-r--r--librabbitmq/windows/socket.c88
-rw-r--r--librabbitmq/windows/socket.h92
-rw-r--r--tests/Makefile.am2
-rw-r--r--tests/test_tables.c34
-rw-r--r--tools/Makefile.am20
-rw-r--r--tools/common.c120
-rw-r--r--tools/common.h16
-rw-r--r--tools/consume.c11
-rw-r--r--tools/get.c2
-rw-r--r--tools/publish.c5
-rw-r--r--tools/unix/process.c100
-rw-r--r--tools/unix/process.h57
-rw-r--r--tools/windows/compat.c73
-rw-r--r--tools/windows/compat.h51
-rw-r--r--tools/windows/process.c204
-rw-r--r--tools/windows/process.h59
39 files changed, 1342 insertions, 282 deletions
diff --git a/README.windows b/README.windows
new file mode 100644
index 0000000..723c25e
--- /dev/null
+++ b/README.windows
@@ -0,0 +1,77 @@
+# rabbitmq-c and Windows
+
+rabbitmq-c can now be built on Windows using the MinGW/MSYS ports of
+the GNU toolchain and miscellaneous utilities. This includes the
+example programs and tools.
+
+The results are native Windows DLLs and EXEs, and can be used without
+having MinGW installed. But the librabbitmq header files currently
+use GCC extensions, and for this reason it is still not possible to
+use Microsoft's C/C++ to build applications against the librabbitmq
+DLL. Hopefully this will get fixed before long.
+
+
+# Building rabbitmq-c
+
+rabbitmq-c is built on Windows using MinGW and MSYS. In brief, MinGW
+is a native port of the GNU toolchain to Windows; MSYS is a set of
+ports of common GNU utilities to run under Windows, so that typical
+autotools-based builds will work there. MinGW/MSYS can be used to
+build native Windows applications and DLLs, which do not depend on
+MinGW/MSYS to run.
+
+So to build rabbitmq-c on Windows, you need to download and install
+the relevant parts of MinGW/MSYS. This can be a fairly time consuming
+process - there are about 20 files to be downloaded and unpacked. To
+make it easier, we provide a bash script that automates this process,
+in rabbitmq-c/etc/install-mingw.sh. You can run this cygwin, or under
+Linux and copy the results over or put them on a shared drive. Some
+MinGW packages are .tar.lzma files, so it requires a system with xz
+and a tar that supports the -J option, which probably rules out OSX.
+
+Run install-mingw.sh specifying the destination directory, e.g.
+
+ $ etc/install-mingw.sh /tmp/mingw
+
+Python is needed for the rabbitmq-c build, so you will also need to
+install python under Windows. The Windows installer from python.org
+will do fine.
+
+You will need to get the rabbitmq-c and rabbitmq-codegen source code.
+If you are getting it from Mercurial, you will need to run "autoreconf
+-i" within rabbitmq-c somewhere other than under MinGW first (perhaps
+this can be done under MinGW/MSYS, but the packages installed by
+install-mingw.sh are not sufficient).
+
+Open a cmd window, and ensure that both the MinGW bin directory and the python install directory are in the path, e.g.
+
+ C:\>set PATH=%PATH%;C:\mingw\bin;C:\Python26
+
+Then start bash, and run the following mount command (substituting the
+Windows path of your MinGW install):
+
+ C:\>bash
+ bash-3.1$ mount 'C:\mingw' /mingw
+
+Then go to the rabbitmq-c directory, and configure and make as normal:
+
+ bash-3.1$ ./configure && make
+ [...]
+
+
+# Running the tools without mingw
+
+You can run the resulting tools EXEs without the rest of MinGW. To do
+this, copy the following files into a directory:
+
+- rabbitmq-c/tools/.libs/*.exe
+
+- rabbitmq-c/librabbitmq/.libs/librabbitmq-0.dll
+
+- /bin/libpopt-0.dll
+
+- /bin/libiconv-2.dll
+
+- /bin/libintl-8.dll
+
+
diff --git a/configure.ac b/configure.ac
index 0269bd3..c516eb8 100644
--- a/configure.ac
+++ b/configure.ac
@@ -9,6 +9,7 @@ AC_GNU_SOURCE
AC_PROG_CC
dnl Library checks
+AC_LIBTOOL_WIN32_DLL
AM_PROG_LIBTOOL
dnl Header-file checks
@@ -21,6 +22,26 @@ if test "x$GCC" = "xyes"; then
fi
fi
+dnl Detect the kind of host we're building for
+AC_CANONICAL_HOST
+windows=no
+case "${host}" in
+*-*-mingw*)
+ windows=yes
+ ;;
+esac
+AM_CONDITIONAL(WINDOWS, test "x$windows" = xyes)
+AS_IF([test "x$windows" = xyes],
+ [AC_DEFINE([WINDOWS], [1], [Define to 1 if on Windows.])]
+)
+
+dnl Decide which API abstraction layer to use
+PLATFORM_DIR=unix
+if test "x$windows" = xyes ; then
+ PLATFORM_DIR=windows
+fi
+AC_SUBST(PLATFORM_DIR)
+
dnl Enable -m64 if we were asked to do so
AC_ARG_ENABLE(64-bit,
[ --enable-64-bit produce 64-bit library],
@@ -46,7 +67,8 @@ checkPython() {
return
fi
PYTHON=$1
- if $PYTHON -c 'import simplejson' 2>/dev/null
+ if $PYTHON -c 'import json' 2>/dev/null \
+ || $PYTHON -c 'import simplejson' 2>/dev/null
then
found_python=yes
AC_MSG_RESULT($PYTHON)
@@ -64,8 +86,12 @@ AC_SUBST(AMQP_CODEGEN_DIR)
AC_SUBST(AMQP_SPEC_JSON_PATH)
AC_SUBST(PYTHON)
-# Check for libpopt, which we need to build the tools
+dnl Decide which extra win32 libs we need
+EXTRA_LIBS=
+AS_IF([test "x$windows" = xyes], [EXTRA_LIBS="-lws2_32 $EXTRA_LIBS"])
+AC_SUBST(EXTRA_LIBS)
+dnl Check for libpopt, which we need to build the tools
AC_ARG_WITH([popt],
[AS_HELP_STRING([--with-popt], [use the popt library. Needed for tools.])],
[],
diff --git a/etc/install-mingw.sh b/etc/install-mingw.sh
new file mode 100755
index 0000000..7fdd339
--- /dev/null
+++ b/etc/install-mingw.sh
@@ -0,0 +1,69 @@
+#!/bin/bash
+
+if [ $# -ne 1 ] ; then
+ echo "usage: install-mingw.sh <destination directory>" 1>&2
+ exit 1
+fi
+
+unpack_dir=$1
+
+if [ -eb "$unpack_dir" ] ; then
+ echo "Destination directory already exists; please delete it if you are sure" 1>&2
+ exit 1
+fi
+
+set -e
+
+download_dir=/tmp/install-mingw.$$
+mkdir -p $download_dir $unpack_dir
+
+while read f ; do
+ wget -P $download_dir -N http://switch.dl.sourceforge.net/project/mingw/$f
+done <<EOF
+MinGW/mpc/mpc-0.8.1-1/libmpc-0.8.1-1-mingw32-dll-2.tar.lzma
+MinGW/BaseSystem/GCC/Version4/gcc-4.5.0-1/gcc-core-4.5.0-1-mingw32-bin.tar.lzma
+MinGW/BaseSystem/GCC/Version4/gcc-4.5.0-1/libgcc-4.5.0-1-mingw32-dll-1.tar.lzma
+MSYS/BaseSystem/msys-1.0.14-1/msysCORE-1.0.14-1-msys-1.0.14-bin.tar.lzma
+MinGW/BaseSystem/GNU-Binutils/binutils-2.20.1/binutils-2.20.1-2-mingw32-bin.tar.gz
+MinGW/BaseSystem/RuntimeLibrary/MinGW-RT/mingwrt-3.18/mingwrt-3.18-mingw32-dll.tar.gz
+MinGW/BaseSystem/RuntimeLibrary/MinGW-RT/mingwrt-3.18/mingwrt-3.18-mingw32-dev.tar.gz
+MinGW/pthreads-w32/pthreads-w32-2.8.0-3/libpthread-2.8.0-3-mingw32-dll-2.tar.lzma
+MinGW/mpfr/mpfr-2.4.1-1/libmpfr-2.4.1-1-mingw32-dll-1.tar.lzma
+MinGW/gmp/gmp-5.0.1-1/libgmpxx-5.0.1-1-mingw32-dll-4.tar.lzma
+MinGW/gmp/gmp-5.0.1-1/libgmp-5.0.1-1-mingw32-dll-10.tar.lzma
+MinGW/BaseSystem/RuntimeLibrary/Win32-API/w32api-3.14/w32api-3.14-mingw32-dev.tar.gz
+MSYS/make/make-3.81-2/make-3.81-2-msys-1.0.11-bin.tar.lzma
+MSYS/bash/bash-3.1.17-2/bash-3.1.17-2-msys-1.0.11-bin.tar.lzma
+MSYS/coreutils/coreutils-5.97-2/coreutils-5.97-2-msys-1.0.11-bin.tar.lzma
+MinGW/popt/popt-1.15-1/libpopt-1.15-1-mingw32-dll-0.tar.lzma
+MinGW/popt/popt-1.15-1/libpopt-1.15-1-mingw32-dev.tar.lzma
+MSYS/diffutils/diffutils-2.8.7.20071206cvs-2/diffutils-2.8.7.20071206cvs-2-msys-1.0.11-bin.tar.lzma
+MSYS/gawk/gawk-3.1.7-1/gawk-3.1.7-1-msys-1.0.11-bin.tar.lzma
+MSYS/grep/grep-2.5.4-1/grep-2.5.4-1-msys-1.0.11-bin.tar.lzma
+MSYS/sed/sed-4.2.1-1/sed-4.2.1-1-msys-1.0.11-bin.tar.lzma
+MSYS/libtool/libtool-2.2.7a-1/libtool-2.2.7a-1-msys-1.0.11-bin.tar.lzma
+MinGW/gettext/gettext-0.17-1/libintl-0.17-1-mingw32-dll-8.tar.lzma
+MinGW/gettext/gettext-0.17-1/gettext-0.17-1-mingw32-dev.tar.lzma
+MinGW/libiconv/libiconv-1.13.1-1/libiconv-1.13.1-1-mingw32-dll-2.tar.lzma
+MinGW/libiconv/libiconv-1.13.1-1/libiconv-1.13.1-1-mingw32-dev.tar.lzma
+MinGW/libiconv/libiconv-1.13.1-1/libcharset-1.13.1-1-mingw32-dll-1.tar.lzma
+EOF
+
+for f in $download_dir/* ; do
+ case $f in
+ *.tar.gz)
+ tar -C $unpack_dir -xzf $f
+ ;;
+
+ *.tar.lzma)
+ tar -C $unpack_dir -xJf $f
+ ;;
+
+ *)
+ echo "Don't know how to unpack $f" 1>&2
+ exit 1
+ ;;
+ esac
+done
+
+rm -rf $download_dir
diff --git a/examples/amqp_bind.c b/examples/amqp_bind.c
index 697df2a..cf7c377 100644
--- a/examples/amqp_bind.c
+++ b/examples/amqp_bind.c
@@ -99,7 +99,6 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
- amqp_destroy_connection(conn);
- die_on_error(close(sockfd), "Closing socket");
+ die_on_error(amqp_end_connection(conn), "Ending connection");
return 0;
}
diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c
index aa33639..5dfbb33 100644
--- a/examples/amqp_consumer.c
+++ b/examples/amqp_consumer.c
@@ -84,8 +84,8 @@ static void run(amqp_connection_state_t conn)
if (now > next_summary_time) {
int countOverInterval = received - previous_received;
double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
- printf("%lld ms: Received %d - %d since last report (%d Hz)\n",
- (now - start_time) / 1000, received, countOverInterval, (int) intervalRate);
+ printf("%d ms: Received %d - %d since last report (%d Hz)\n",
+ (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate);
previous_received = received;
previous_report_time = now;
@@ -94,7 +94,8 @@ static void run(amqp_connection_state_t conn)
amqp_maybe_release_buffers(conn);
result = amqp_simple_wait_frame(conn, &frame);
- if (result <= 0) return;
+ if (result < 0)
+ return;
if (frame.frame_type != AMQP_FRAME_METHOD)
continue;
@@ -103,7 +104,9 @@ static void run(amqp_connection_state_t conn)
continue;
result = amqp_simple_wait_frame(conn, &frame);
- if (result <= 0) return;
+ if (result < 0)
+ return;
+
if (frame.frame_type != AMQP_FRAME_HEADER) {
fprintf(stderr, "Expected header!");
abort();
@@ -114,7 +117,8 @@ static void run(amqp_connection_state_t conn)
while (body_received < body_target) {
result = amqp_simple_wait_frame(conn, &frame);
- if (result <= 0) return;
+ if (result < 0)
+ return;
if (frame.frame_type != AMQP_FRAME_BODY) {
fprintf(stderr, "Expected body!");
@@ -180,8 +184,7 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
- amqp_destroy_connection(conn);
- die_on_error(close(sockfd), "Closing socket");
+ die_on_error(amqp_end_connection(conn), "Ending connection");
return 0;
}
diff --git a/examples/amqp_exchange_declare.c b/examples/amqp_exchange_declare.c
index 14bc163..32e71b0 100644
--- a/examples/amqp_exchange_declare.c
+++ b/examples/amqp_exchange_declare.c
@@ -94,7 +94,6 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
- amqp_destroy_connection(conn);
- die_on_error(close(sockfd), "Closing socket");
+ die_on_error(amqp_end_connection(conn), "Ending connection");
return 0;
}
diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c
index 0162ce6..412dcd5 100644
--- a/examples/amqp_listen.c
+++ b/examples/amqp_listen.c
@@ -125,7 +125,7 @@ int main(int argc, char const * const *argv) {
amqp_maybe_release_buffers(conn);
result = amqp_simple_wait_frame(conn, &frame);
printf("Result %d\n", result);
- if (result <= 0)
+ if (result < 0)
break;
printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
@@ -143,7 +143,7 @@ int main(int argc, char const * const *argv) {
(int) d->routing_key.len, (char *) d->routing_key.bytes);
result = amqp_simple_wait_frame(conn, &frame);
- if (result <= 0)
+ if (result < 0)
break;
if (frame.frame_type != AMQP_FRAME_HEADER) {
@@ -162,7 +162,7 @@ int main(int argc, char const * const *argv) {
while (body_received < body_target) {
result = amqp_simple_wait_frame(conn, &frame);
- if (result <= 0)
+ if (result < 0)
break;
if (frame.frame_type != AMQP_FRAME_BODY) {
@@ -187,8 +187,7 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
- amqp_destroy_connection(conn);
- die_on_error(close(sockfd), "Closing socket");
+ die_on_error(amqp_end_connection(conn), "Ending connection");
return 0;
}
diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c
index 9100dec..1fc7db0 100644
--- a/examples/amqp_listenq.c
+++ b/examples/amqp_listenq.c
@@ -107,7 +107,7 @@ int main(int argc, char const * const *argv) {
amqp_maybe_release_buffers(conn);
result = amqp_simple_wait_frame(conn, &frame);
printf("Result %d\n", result);
- if (result <= 0)
+ if (result < 0)
break;
printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
@@ -125,7 +125,7 @@ int main(int argc, char const * const *argv) {
(int) d->routing_key.len, (char *) d->routing_key.bytes);
result = amqp_simple_wait_frame(conn, &frame);
- if (result <= 0)
+ if (result < 0)
break;
if (frame.frame_type != AMQP_FRAME_HEADER) {
@@ -144,7 +144,7 @@ int main(int argc, char const * const *argv) {
while (body_received < body_target) {
result = amqp_simple_wait_frame(conn, &frame);
- if (result <= 0)
+ if (result < 0)
break;
if (frame.frame_type != AMQP_FRAME_BODY) {
@@ -171,8 +171,7 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
- amqp_destroy_connection(conn);
- die_on_error(close(sockfd), "Closing socket");
+ die_on_error(amqp_end_connection(conn), "Ending connection");
return 0;
}
diff --git a/examples/amqp_producer.c b/examples/amqp_producer.c
index ac6eebc..61cc925 100644
--- a/examples/amqp_producer.c
+++ b/examples/amqp_producer.c
@@ -95,8 +95,8 @@ static void send_batch(amqp_connection_state_t conn,
if (now > next_summary_time) {
int countOverInterval = sent - previous_sent;
double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
- printf("%lld ms: Sent %d - %d since last report (%d Hz)\n",
- (now - start_time) / 1000, sent, countOverInterval, (int) intervalRate);
+ printf("%d ms: Sent %d - %d since last report (%d Hz)\n",
+ (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate);
previous_sent = sent;
previous_report_time = now;
@@ -111,10 +111,10 @@ static void send_batch(amqp_connection_state_t conn,
{
long long stop_time = now_microseconds();
- long long total_delta = stop_time - start_time;
+ int total_delta = stop_time - start_time;
printf("PRODUCER - Message count: %d\n", message_count);
- printf("Total time, milliseconds: %lld\n", total_delta / 1000);
+ printf("Total time, milliseconds: %d\n", total_delta / 1000);
printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0)));
}
}
@@ -151,7 +151,6 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
- amqp_destroy_connection(conn);
- die_on_error(close(sockfd), "Closing socket");
+ die_on_error(amqp_end_connection(conn), "Ending connection");
return 0;
}
diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c
index 6e8e0b6..e669505 100644
--- a/examples/amqp_sendstring.c
+++ b/examples/amqp_sendstring.c
@@ -108,7 +108,6 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
- amqp_destroy_connection(conn);
- die_on_error(close(sockfd), "Closing socket");
+ die_on_error(amqp_end_connection(conn), "Ending connection");
return 0;
}
diff --git a/examples/amqp_unbind.c b/examples/amqp_unbind.c
index 27df916..bc92efe 100644
--- a/examples/amqp_unbind.c
+++ b/examples/amqp_unbind.c
@@ -99,7 +99,6 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
- amqp_destroy_connection(conn);
- die_on_error(close(sockfd), "Closing socket");
+ die_on_error(amqp_end_connection(conn), "Ending connection");
return 0;
}
diff --git a/examples/example_utils.c b/examples/example_utils.c
index 628572c..ae8f093 100644
--- a/examples/example_utils.c
+++ b/examples/example_utils.c
@@ -61,7 +61,7 @@
void die_on_error(int x, char const *context) {
if (x < 0) {
- fprintf(stderr, "%s: %s\n", context, strerror(-x));
+ fprintf(stderr, "%s: %s\n", context, amqp_error_string(-x));
exit(1);
}
}
@@ -76,8 +76,7 @@ void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
- fprintf(stderr, "%s: %s\n", context,
- x.library_errno ? strerror(x.library_errno) : "(end-of-stream)");
+ fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error));
break;
case AMQP_RESPONSE_SERVER_EXCEPTION:
diff --git a/librabbitmq/Makefile.am b/librabbitmq/Makefile.am
index b4c8843..53c9ea1 100644
--- a/librabbitmq/Makefile.am
+++ b/librabbitmq/Makefile.am
@@ -1,9 +1,12 @@
lib_LTLIBRARIES = librabbitmq.la
-librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_debug.c amqp_api.c
+AM_CFLAGS = -I$(PLATFORM_DIR)
+librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_debug.c amqp_api.c $(PLATFORM_DIR)/socket.c
+librabbitmq_la_LDFLAGS = -no-undefined
+librabbitmq_la_LIBADD = $(EXTRA_LIBS)
nodist_librabbitmq_la_SOURCES = amqp_framing.c
include_HEADERS = amqp_framing.h amqp.h
-noinst_HEADERS = amqp_private.h
+noinst_HEADERS = amqp_private.h $(PLATFORM_DIR)/socket.h
BUILT_SOURCES = amqp_framing.h amqp_framing.c
CLEANFILES = amqp_framing.h amqp_framing.c
EXTRA_DIST = codegen.py
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 017b86e..fa76637 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -263,7 +263,7 @@ typedef enum amqp_response_type_enum_ {
typedef struct amqp_rpc_reply_t_ {
amqp_response_type_enum reply_type;
amqp_method_t reply;
- int library_errno; /* if AMQP_RESPONSE_LIBRARY_EXCEPTION, then 0 here means socket EOF */
+ int library_error; /* if AMQP_RESPONSE_LIBRARY_EXCEPTION, then 0 here means socket EOF */
} amqp_rpc_reply_t;
typedef enum amqp_sasl_method_enum_ {
@@ -310,6 +310,7 @@ extern int amqp_tune_connection(amqp_connection_state_t state,
int heartbeat);
int amqp_get_channel_max(amqp_connection_state_t state);
extern void amqp_destroy_connection(amqp_connection_state_t state);
+extern int amqp_end_connection(amqp_connection_state_t state);
extern int amqp_handle_input(amqp_connection_state_t state,
amqp_bytes_t received_data,
@@ -500,6 +501,14 @@ extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state);
*/
extern amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state);
+/*
+ * Get the error string for the given error code.
+ *
+ * The returned string resides on the heap; the caller is responsible
+ * for freeing it.
+ */
+extern const char *amqp_error_string(int err);
+
#ifdef __cplusplus
}
#endif
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index a748f90..3724a37 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -52,7 +52,6 @@
#include <stdio.h>
#include <string.h>
#include <stdint.h>
-#include <errno.h>
#include "amqp.h"
#include "amqp_framing.h"
@@ -60,6 +59,40 @@
#include <assert.h>
+static const char *client_error_strings[ERROR_MAX] = {
+ "could not allocate memory", /* ERROR_NO_MEMORY */
+ "received bad AMQP data", /* ERROR_BAD_AQMP_DATA */
+ "unknown AMQP class id", /* ERROR_UNKOWN_CLASS */
+ "unknown AMQP method id", /* ERROR_UNKOWN_METHOD */
+ "unknown host", /* ERROR_HOST_NOT_FOUND */
+ "incompatible AMQP version", /* ERROR_INCOMPATIBLE_AMQP_VERSION */
+ "connection closed unexpectedly", /* ERROR_CONNECTION_CLOSED */
+};
+
+const char *amqp_error_string(int err)
+{
+ const char *str;
+ int category = (err & ERROR_CATEGORY_MASK);
+ err = (err & ~ERROR_CATEGORY_MASK);
+
+ switch (category) {
+ case ERROR_CATEGORY_CLIENT:
+ if (err < 1 || err > ERROR_MAX)
+ str = "(undefined librabbitmq error)";
+ else
+ str = client_error_strings[err - 1];
+ break;
+
+ case ERROR_CATEGORY_OS:
+ return amqp_os_error_string(err);
+
+ default:
+ str = "(undefined error category)";
+ }
+
+ return strdup(str);
+}
+
#define RPC_REPLY(replytype) \
(state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL \
? (replytype *) state->most_recent_api_result.reply.decoded \
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 8623eed..63af96a 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -52,17 +52,13 @@
#include <stdio.h>
#include <string.h>
#include <stdint.h>
-#include <errno.h>
-
-#include <unistd.h>
-#include <sys/uio.h>
-#include <sys/types.h>
+#include <assert.h>
#include "amqp.h"
#include "amqp_framing.h"
#include "amqp_private.h"
-#include <assert.h>
+#include "socket.h"
#define INITIAL_FRAME_POOL_PAGE_SIZE 65536
#define INITIAL_DECODING_POOL_PAGE_SIZE 131072
@@ -151,7 +147,7 @@ int amqp_tune_connection(amqp_connection_state_t state,
newbuf = realloc(state->outbound_buffer.bytes, frame_max);
if (newbuf == NULL) {
amqp_destroy_connection(state);
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
state->outbound_buffer.bytes = newbuf;
@@ -170,6 +166,15 @@ void amqp_destroy_connection(amqp_connection_state_t state) {
free(state);
}
+int amqp_end_connection(amqp_connection_state_t state) {
+ int s = state->sockfd;
+ amqp_destroy_connection(state);
+ if (socket_close(s) < 0)
+ return -encoded_socket_errno();
+ else
+ return 0;
+}
+
static void return_to_idle(amqp_connection_state_t state) {
state->inbound_buffer.bytes = NULL;
state->inbound_offset = 0;
@@ -199,7 +204,7 @@ int amqp_handle_input(amqp_connection_state_t state,
/* state->inbound_buffer.len is always nonzero, because it
corresponds to frame_max, which is not permitted to be less
than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
state->state = CONNECTION_STATE_WAITING_FOR_HEADER;
}
@@ -246,7 +251,7 @@ int amqp_handle_input(amqp_connection_state_t state,
/* Check frame end marker (footer) */
if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) {
- return -EINVAL;
+ return -ERROR_BAD_AMQP_DATA;
}
decoded_frame->channel = D_16(state->inbound_buffer, 1);
@@ -392,7 +397,7 @@ static int inner_send_frame(amqp_connection_state_t state,
break;
default:
- return -EINVAL;
+ abort();
}
E_32(state->outbound_buffer, 3, *payload_len);
@@ -419,16 +424,14 @@ int amqp_send_frame(amqp_connection_state_t state,
amqp_frame_t const *frame)
{
amqp_bytes_t encoded;
- int payload_len;
- int separate_body;
+ int payload_len, res;
- separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
- switch (separate_body) {
+ res = inner_send_frame(state, frame, &encoded, &payload_len);
+ switch (res) {
case 0:
- AMQP_CHECK_RESULT(write(state->sockfd,
- state->outbound_buffer.bytes,
- payload_len + (HEADER_SIZE + FOOTER_SIZE)));
- return 0;
+ res = socket_write(state->sockfd, state->outbound_buffer.bytes,
+ payload_len + (HEADER_SIZE + FOOTER_SIZE));
+ break;
case 1: {
struct iovec iov[3];
@@ -440,13 +443,18 @@ int amqp_send_frame(amqp_connection_state_t state,
iov[2].iov_base = &frame_end_byte;
assert(FOOTER_SIZE == 1);
iov[2].iov_len = FOOTER_SIZE;
- AMQP_CHECK_RESULT(writev(state->sockfd, &iov[0], 3));
- return 0;
+ res = socket_writev(state->sockfd, &iov[0], 3);
+ break;
}
default:
- return separate_body;
+ return res;
}
+
+ if (res < 0)
+ return -encoded_socket_errno();
+ else
+ return 0;
}
int amqp_send_frame_to(amqp_connection_state_t state,
diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c
index 6e52dc8..021151a 100644
--- a/librabbitmq/amqp_mem.c
+++ b/librabbitmq/amqp_mem.c
@@ -53,7 +53,6 @@
#include <string.h>
#include <stdint.h>
#include <sys/types.h>
-#include <errno.h>
#include <assert.h>
#include "amqp.h"
@@ -102,25 +101,24 @@ void empty_amqp_pool(amqp_pool_t *pool) {
empty_blocklist(&pool->pages);
}
+/* Returns 1 on success, 0 on failure */
static int record_pool_block(amqp_pool_blocklist_t *x, void *block) {
size_t blocklistlength = sizeof(void *) * (x->num_blocks + 1);
if (x->blocklist == NULL) {
x->blocklist = malloc(blocklistlength);
- if (x->blocklist == NULL) {
- return -ENOMEM;
- }
+ if (x->blocklist == NULL)
+ return 0;
} else {
void *newbl = realloc(x->blocklist, blocklistlength);
- if (newbl == NULL) {
- return -ENOMEM;
- }
+ if (newbl == NULL)
+ return 0;
x->blocklist = newbl;
}
x->blocklist[x->num_blocks] = block;
x->num_blocks++;
- return 0;
+ return 1;
}
void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) {
@@ -135,9 +133,8 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) {
if (result == NULL) {
return NULL;
}
- if (record_pool_block(&pool->large_blocks, result) != 0) {
+ if (!record_pool_block(&pool->large_blocks, result))
return NULL;
- }
return result;
}
@@ -156,9 +153,8 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) {
if (pool->alloc_block == NULL) {
return NULL;
}
- if (record_pool_block(&pool->pages, pool->alloc_block) != 0) {
+ if (!record_pool_block(&pool->pages, pool->alloc_block))
return NULL;
- }
pool->next_page = pool->pages.num_blocks;
} else {
pool->alloc_block = pool->pages.blocklist[pool->next_page];
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 3985619..7206ae5 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -55,7 +55,28 @@
extern "C" {
#endif
-#include <arpa/inet.h> /* ntohl, htonl, ntohs, htons */
+/* Error numbering: Because of differences in error numbering on
+ * different platforms, we want to keep error numbers opaque for
+ * client code. Internally, we encode the category of an error
+ * (i.e. where its number comes from) in the top bits of the number
+ * (assuming that an int has at least 32 bits).
+ */
+#define ERROR_CATEGORY_MASK (1 << 29)
+
+#define ERROR_CATEGORY_CLIENT (0 << 29) /* librabbitmq error codes */
+#define ERROR_CATEGORY_OS (1 << 29) /* OS-specific error codes */
+
+/* librabbitmq error codes */
+#define ERROR_NO_MEMORY 1
+#define ERROR_BAD_AMQP_DATA 2
+#define ERROR_UNKNOWN_CLASS 3
+#define ERROR_UNKNOWN_METHOD 4
+#define ERROR_HOST_NOT_FOUND 5
+#define ERROR_INCOMPATIBLE_AMQP_VERSION 6
+#define ERROR_CONNECTION_CLOSED 7
+#define ERROR_MAX 7
+
+extern const char *amqp_os_error_string(int err);
/*
* Connection states:
@@ -125,7 +146,7 @@ struct amqp_connection_state_t_ {
amqp_rpc_reply_t most_recent_api_result;
};
-#define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -EFAULT; } (v); })
+#define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -ERROR_BAD_AMQP_DATA; } (v); })
#define BUF_AT(b, o) (&(((uint8_t *) (b).bytes)[o]))
#define D_8(b, o) CHECK_LIMIT(b, o, 1, * (uint8_t *) BUF_AT(b, o))
@@ -176,13 +197,6 @@ extern int amqp_encode_table(amqp_bytes_t encoded,
#define AMQP_CHECK_RESULT(expr) AMQP_CHECK_RESULT_CLEANUP(expr, )
-#define AMQP_CHECK_EOF_RESULT(expr) \
- ({ \
- int _result = (expr); \
- if (_result <= 0) return _result; \
- _result; \
- })
-
#ifndef NDEBUG
extern void amqp_dump(void const *buffer, size_t len);
#else
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index d16c319..6425c34 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -52,43 +52,40 @@
#include <stdio.h>
#include <string.h>
#include <stdint.h>
-#include <errno.h>
#include <stdarg.h>
+#include <assert.h>
#include "amqp.h"
#include "amqp_framing.h"
#include "amqp_private.h"
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <unistd.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <netinet/in.h>
+#include "socket.h"
-#include <assert.h>
int amqp_open_socket(char const *hostname,
int portnumber)
{
- int sockfd;
+ int sockfd, res;
struct sockaddr_in addr;
struct hostent *he;
+ res = socket_init();
+ if (res)
+ return res;
+
he = gethostbyname(hostname);
- if (he == NULL) {
- return -ENOENT;
- }
+ if (he == NULL)
+ return -ERROR_HOST_NOT_FOUND;
addr.sin_family = AF_INET;
addr.sin_port = htons(portnumber);
addr.sin_addr.s_addr = * (uint32_t *) he->h_addr_list[0];
- sockfd = socket(PF_INET, SOCK_STREAM, 0);
- if (connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
- int result = -errno;
- close(sockfd);
- return result;
+ sockfd = socket_socket(PF_INET, SOCK_STREAM, 0);
+ if (socket_connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
+ res = -encoded_socket_errno();
+ socket_close(sockfd);
+ return res;
}
return sockfd;
@@ -108,7 +105,7 @@ static char *header() {
}
int amqp_send_header(amqp_connection_state_t state) {
- return write(state->sockfd, header(), 8);
+ return socket_write(state->sockfd, header(), 8);
}
int amqp_send_header_to(amqp_connection_state_t state,
@@ -183,24 +180,22 @@ static int wait_frame_inner(amqp_connection_state_t state,
AMQP_CHECK_RESULT((result = amqp_handle_input(state, buffer, decoded_frame)));
state->sock_inbound_offset += result;
- if (decoded_frame->frame_type != 0) {
+ if (decoded_frame->frame_type != 0)
/* Complete frame was read. Return it. */
- return 1;
- }
+ return 0;
/* Incomplete or ignored frame. Keep processing input. */
assert(result != 0);
}
- result = read(state->sockfd,
- state->sock_inbound_buffer.bytes,
- state->sock_inbound_buffer.len);
- if (result < 0) {
- return -errno;
- }
- if (result == 0) {
- /* EOF. */
- return 0;
+ result = socket_read(state->sockfd,
+ state->sock_inbound_buffer.bytes,
+ state->sock_inbound_buffer.len);
+ if (result <= 0) {
+ if (result == 0)
+ return -ERROR_CONNECTION_CLOSED;
+ else
+ return -encoded_socket_errno();
}
state->sock_inbound_limit = result;
@@ -218,7 +213,7 @@ int amqp_simple_wait_frame(amqp_connection_state_t state,
state->last_queued_frame = NULL;
}
*decoded_frame = *f;
- return 1;
+ return 0;
} else {
return wait_frame_inner(state, decoded_frame);
}
@@ -230,8 +225,10 @@ int amqp_simple_wait_method(amqp_connection_state_t state,
amqp_method_t *output)
{
amqp_frame_t frame;
-
- AMQP_CHECK_EOF_RESULT(amqp_simple_wait_frame(state, &frame));
+ int res = amqp_simple_wait_frame(state, &frame);
+ if (res < 0)
+ return res;
+
amqp_assert(frame.channel == expected_channel,
"Expected 0x%08X method frame on channel %d, got frame on channel %d",
expected_method,
@@ -248,7 +245,7 @@ int amqp_simple_wait_method(amqp_connection_state_t state,
expected_channel,
frame.payload.method.id);
*output = frame.payload.method;
- return 1;
+ return 0;
}
int amqp_send_method(amqp_connection_state_t state,
@@ -288,7 +285,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
status = amqp_send_method(state, channel, request_id, decoded_request_method);
if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_errno = -status;
+ result.library_error = -status;
return result;
}
@@ -297,9 +294,9 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
retry:
status = wait_frame_inner(state, &frame);
- if (status <= 0) {
+ if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_errno = -status;
+ result.library_error = -status;
return result;
}
@@ -324,7 +321,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
if (frame_copy == NULL || link == NULL) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_errno = ENOMEM;
+ result.library_error = ERROR_NO_MEMORY;
return result;
}
@@ -359,6 +356,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_sasl_method_enum sasl_method,
va_list vl)
{
+ int res;
amqp_method_t method;
uint32_t server_frame_max;
uint16_t server_channel_max;
@@ -366,12 +364,16 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_send_header(state);
- AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, &method));
+ res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD,
+ &method);
+ if (res < 0)
+ return res;
+
{
amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded;
if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) ||
(s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) {
- return -EPROTOTYPE;
+ return -ERROR_INCOMPATIBLE_AMQP_VERSION;
}
/* TODO: check that our chosen SASL mechanism is in the list of
@@ -383,7 +385,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, sasl_method, vl);
amqp_connection_start_ok_t s;
if (response_bytes.bytes == NULL) {
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
s =
(amqp_connection_start_ok_t) {
@@ -397,7 +399,11 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_release_buffers(state);
- AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD, &method));
+ res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD,
+ &method);
+ if (res < 0)
+ return res;
+
{
amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded;
server_channel_max = s->channel_max;
@@ -431,7 +437,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_release_buffers(state);
- return 1;
+ return 0;
}
amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
@@ -449,11 +455,11 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
va_start(vl, sasl_method);
status = amqp_login_inner(state, channel_max, frame_max, heartbeat, sasl_method, vl);
- if (status <= 0) {
+ if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
result.reply.id = 0;
result.reply.decoded = NULL;
- result.library_errno = -status;
+ result.library_error = -status;
return result;
}
@@ -481,6 +487,6 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
result.reply_type = AMQP_RESPONSE_NORMAL;
result.reply.id = 0;
result.reply.decoded = NULL;
- result.library_errno = 0;
+ result.library_error = 0;
return result;
}
diff --git a/librabbitmq/amqp_table.c b/librabbitmq/amqp_table.c
index 25c5932..3f5eb61 100644
--- a/librabbitmq/amqp_table.c
+++ b/librabbitmq/amqp_table.c
@@ -52,10 +52,10 @@
#include <stdio.h>
#include <string.h>
#include <stdint.h>
-#include <errno.h>
#include "amqp.h"
#include "amqp_private.h"
+#include "socket.h"
#include <assert.h>
@@ -86,7 +86,7 @@ static int amqp_decode_array(amqp_bytes_t encoded,
int limit;
if (entries == NULL) {
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
offset += 4;
@@ -99,7 +99,7 @@ static int amqp_decode_array(amqp_bytes_t encoded,
newentries = realloc(entries, allocated_entries * sizeof(amqp_field_value_t));
if (newentries == NULL) {
free(entries);
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
entries = newentries;
}
@@ -117,7 +117,7 @@ static int amqp_decode_array(amqp_bytes_t encoded,
if (output->entries == NULL && num_entries > 0) {
/* NULL is legitimate if we requested a zero-length block. */
free(entries);
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
memcpy(output->entries, entries, num_entries * sizeof(amqp_field_value_t));
@@ -140,7 +140,7 @@ int amqp_decode_table(amqp_bytes_t encoded,
int limit;
if (entries == NULL) {
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
offset += 4;
@@ -159,7 +159,7 @@ int amqp_decode_table(amqp_bytes_t encoded,
newentries = realloc(entries, allocated_entries * sizeof(amqp_table_entry_t));
if (newentries == NULL) {
free(entries);
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
entries = newentries;
}
@@ -182,7 +182,7 @@ int amqp_decode_table(amqp_bytes_t encoded,
if (output->entries == NULL && num_entries > 0) {
/* NULL is legitimate if we requested a zero-length block. */
free(entries);
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
memcpy(output->entries, entries, num_entries * sizeof(amqp_table_entry_t));
@@ -274,7 +274,7 @@ static int amqp_decode_field_value(amqp_bytes_t encoded,
case AMQP_FIELD_KIND_VOID:
break;
default:
- return -EINVAL;
+ return -ERROR_BAD_AMQP_DATA;
}
*offsetptr = offset;
@@ -410,7 +410,7 @@ static int amqp_encode_field_value(amqp_bytes_t encoded,
case AMQP_FIELD_KIND_VOID:
break;
default:
- return -EINVAL;
+ abort();
}
*offsetptr = offset;
diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py
index 7aea9f4..6b38666 100644
--- a/librabbitmq/codegen.py
+++ b/librabbitmq/codegen.py
@@ -170,7 +170,7 @@ def genErl(spec):
if m.arguments:
print " %s *m = (%s *) amqp_pool_alloc(pool, sizeof(%s));" % \
(m.structName(), m.structName(), m.structName())
- print " if (m == NULL) { return -ENOMEM; }"
+ print " if (m == NULL) { return -ERROR_NO_MEMORY; }"
else:
print " %s *m = NULL; /* no fields */" % (m.structName(),)
bitindex = None
@@ -197,7 +197,7 @@ def genErl(spec):
print " case %d: {" % (c.index,)
print " %s *p = (%s *) amqp_pool_alloc(pool, sizeof(%s));" % \
(c.structName(), c.structName(), c.structName())
- print " if (p == NULL) { return -ENOMEM; }"
+ print " if (p == NULL) { return -ERROR_NO_MEMORY; }"
print " p->_flags = flags;"
for f in c.fields:
if spec.resolveDomain(f.domain) == 'bit':
@@ -261,12 +261,11 @@ def genErl(spec):
print '#include <stdint.h>'
print '#include <string.h>'
print '#include <stdio.h>'
- print '#include <errno.h>'
- print '#include <arpa/inet.h> /* ntohl, htonl, ntohs, htons */'
print
print '#include "amqp.h"'
print '#include "amqp_framing.h"'
print '#include "amqp_private.h"'
+ print '#include "socket.h"'
print """
char const *amqp_constant_name(int constantNumber) {
@@ -317,7 +316,7 @@ int amqp_decode_method(amqp_method_number_t methodNumber,
switch (methodNumber) {"""
for m in methods: genDecodeMethodFields(m)
- print """ default: return -ENOENT;
+ print """ default: return -ERROR_UNKNOWN_METHOD;
}
}"""
@@ -343,7 +342,7 @@ int amqp_decode_properties(uint16_t class_id,
switch (class_id) {"""
for c in spec.allClasses(): genDecodeProperties(c)
- print """ default: return -ENOENT;
+ print """ default: return -ERROR_UNKNOWN_CLASS;
}
}"""
@@ -358,7 +357,7 @@ int amqp_encode_method(amqp_method_number_t methodNumber,
switch (methodNumber) {"""
for m in methods: genEncodeMethodFields(m)
- print """ default: return -ENOENT;
+ print """ default: return -ERROR_UNKNOWN_METHOD;
}
}"""
@@ -390,7 +389,7 @@ int amqp_encode_properties(uint16_t class_id,
switch (class_id) {"""
for c in spec.allClasses(): genEncodeProperties(c)
- print """ default: return -ENOENT;
+ print """ default: return -ERROR_UNKNOWN_CLASS;
}
}"""
diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c
new file mode 100644
index 0000000..51db413
--- /dev/null
+++ b/librabbitmq/unix/socket.c
@@ -0,0 +1,85 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <sys/socket.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <stdint.h>
+#include <string.h>
+
+#include "amqp.h"
+#include "amqp_private.h"
+#include "socket.h"
+
+int socket_socket(int domain, int type, int proto)
+{
+ int flags;
+
+ int s = socket(domain, type, proto);
+ if (s < 0)
+ return s;
+
+ /* Always enable CLOEXEC on the socket */
+ flags = fcntl(s, F_GETFD);
+ if (flags == -1
+ || fcntl(s, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) {
+ int e = errno;
+ close(s);
+ errno = e;
+ return -1;
+ }
+
+ return s;
+}
+
+const char *amqp_os_error_string(int err)
+{
+ return strdup(strerror(err));
+}
diff --git a/librabbitmq/unix/socket.h b/librabbitmq/unix/socket.h
new file mode 100644
index 0000000..d7295c3
--- /dev/null
+++ b/librabbitmq/unix/socket.h
@@ -0,0 +1,80 @@
+#ifndef librabbitmq_unix_socket_h
+#define librabbitmq_unix_socket_h
+
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <errno.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <sys/uio.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/in.h>
+
+static inline int socket_init(void)
+{
+ return 0;
+}
+
+extern int socket_socket(int domain, int type, int proto);
+
+#define socket_connect connect
+#define socket_close close
+#define socket_read read
+#define socket_write write
+#define socket_writev writev
+
+static inline int encoded_socket_errno()
+{
+ return errno | ERROR_CATEGORY_OS;
+}
+
+#endif
diff --git a/librabbitmq/windows/socket.c b/librabbitmq/windows/socket.c
new file mode 100644
index 0000000..f809f62
--- /dev/null
+++ b/librabbitmq/windows/socket.c
@@ -0,0 +1,88 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <windows.h>
+#include <stdint.h>
+
+#include "amqp.h"
+#include "amqp_private.h"
+#include "socket.h"
+
+static int called_wsastartup;
+
+int socket_init(void)
+{
+ if (!called_wsastartup) {
+ WSADATA data;
+ int res = WSAStartup(0x0202, &data);
+ if (res)
+ return -res;
+
+ called_wsastartup = 1;
+ }
+
+ return 0;
+}
+
+const char *amqp_os_error_string(int err)
+{
+ char *msg, *copy;
+
+ if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM
+ | FORMAT_MESSAGE_ALLOCATE_BUFFER,
+ NULL, err,
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPSTR)&msg, 0, NULL))
+ return strdup("(error retrieving Windows error message)");
+
+ copy = strdup(msg);
+ LocalFree(msg);
+ return copy;
+}
diff --git a/librabbitmq/windows/socket.h b/librabbitmq/windows/socket.h
new file mode 100644
index 0000000..bff6efc
--- /dev/null
+++ b/librabbitmq/windows/socket.h
@@ -0,0 +1,92 @@
+#ifndef librabbitmq_windows_socket_h
+#define librabbitmq_windows_socket_h
+
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <winsock2.h>
+
+extern int socket_init(void);
+
+#define socket_socket socket
+#define socket_connect connect
+#define socket_close closesocket
+
+static inline int socket_read(int sock, void *buf, size_t count)
+{
+ return recv(sock, buf, count, 0);
+}
+
+static inline int socket_write(int sock, void *buf, size_t count)
+{
+ return send(sock, buf, count, 0);
+}
+
+/* same as WSABUF */
+struct iovec {
+ u_long iov_len;
+ char *iov_base;
+};
+
+static inline int socket_writev(int sock, struct iovec *iov, int nvecs)
+{
+ DWORD ret;
+ if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0)
+ return ret;
+ else
+ return -1;
+}
+
+static inline int encoded_socket_errno()
+{
+ return WSAGetLastError() | ERROR_CATEGORY_OS;
+}
+
+#endif
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 1ac6faf..7c8a4fe 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -1,4 +1,4 @@
noinst_PROGRAMS = test_tables
-AM_CFLAGS = -I$(top_srcdir)/librabbitmq
+AM_CFLAGS = -I$(top_srcdir)/librabbitmq -I$(top_srcdir)/librabbitmq/$(PLATFORM_DIR)
AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la
diff --git a/tests/test_tables.c b/tests/test_tables.c
index e620443..1a0652a 100644
--- a/tests/test_tables.c
+++ b/tests/test_tables.c
@@ -54,7 +54,8 @@
#include <time.h>
#include <errno.h>
-#include <stdint.h>
+#include <inttypes.h>
+
#include <amqp.h>
#include <amqp_framing.h>
#include <amqp_private.h>
@@ -75,13 +76,13 @@ static void dump_value(int indent, amqp_field_value_t v) {
putchar(' ');
switch (v.kind) {
case AMQP_FIELD_KIND_BOOLEAN: puts(v.value.boolean ? "true" : "false"); break;
- case AMQP_FIELD_KIND_I8: printf("%d\n", v.value.i8); break;
- case AMQP_FIELD_KIND_U8: printf("%d\n", v.value.u8); break;
- case AMQP_FIELD_KIND_I16: printf("%d\n", v.value.i16); break;
- case AMQP_FIELD_KIND_U16: printf("%d\n", v.value.u16); break;
- case AMQP_FIELD_KIND_I32: printf("%ld\n", (long) v.value.i32); break;
- case AMQP_FIELD_KIND_U32: printf("%lu\n", (unsigned long) v.value.u32); break;
- case AMQP_FIELD_KIND_I64: printf("%lld\n", (long long) v.value.i64); break;
+ case AMQP_FIELD_KIND_I8: printf("%"PRId8"\n", v.value.i8); break;
+ case AMQP_FIELD_KIND_U8: printf("%"PRIu8"\n", v.value.u8); break;
+ case AMQP_FIELD_KIND_I16: printf("%"PRId16"\n", v.value.i16); break;
+ case AMQP_FIELD_KIND_U16: printf("%"PRIu16"\n", v.value.u16); break;
+ case AMQP_FIELD_KIND_I32: printf("%"PRId32"\n", v.value.i32); break;
+ case AMQP_FIELD_KIND_U32: printf("%"PRIu32"\n", v.value.u32); break;
+ case AMQP_FIELD_KIND_I64: printf("%"PRId64"\n", v.value.i64); break;
case AMQP_FIELD_KIND_F32: printf("%g\n", (double) v.value.f32); break;
case AMQP_FIELD_KIND_F64: printf("%g\n", v.value.f64); break;
case AMQP_FIELD_KIND_DECIMAL:
@@ -106,7 +107,7 @@ static void dump_value(int indent, amqp_field_value_t v) {
}
}
break;
- case AMQP_FIELD_KIND_TIMESTAMP: printf("%llu\n", (unsigned long long) v.value.u64); break;
+ case AMQP_FIELD_KIND_TIMESTAMP: printf("%"PRIu64"\n", v.value.u64); break;
case AMQP_FIELD_KIND_TABLE:
putchar('\n');
{
@@ -209,7 +210,8 @@ static void test_table_codec(void) {
int decoding_offset = 0;
result = amqp_decode_table(decoding_bytes, &pool, &decoded, &decoding_offset);
if (result < 0) {
- printf("Table decoding failed: %d (%s)\n", result, strerror(-result));
+ printf("Table decoding failed: %d (%s)\n", result,
+ amqp_error_string(-result));
abort();
}
printf("BBBBBBBBBB\n");
@@ -227,7 +229,8 @@ static void test_table_codec(void) {
result = amqp_encode_table(encoding_result, &table, &offset);
if (result < 0) {
- printf("Table encoding failed: %d (%s)\n", result, strerror(-result));
+ printf("Table encoding failed: %d (%s)\n", result,
+ amqp_error_string(-result));
abort();
}
@@ -272,7 +275,7 @@ int main(int argc, char const * const *argv) {
if ((sizeof(float) != 4) || (vi.i != 0x40490fdb)) {
printf("*** ERROR: single floating point encoding does not work as expected\n");
printf("sizeof float is %lu, float is %g, u32 is 0x%08lx\n",
- sizeof(float),
+ (unsigned long)sizeof(float),
vi.f,
(unsigned long) vi.i);
}
@@ -280,10 +283,9 @@ int main(int argc, char const * const *argv) {
vl.d = M_PI;
if ((sizeof(double) != 8) || (vl.l != 0x400921fb54442d18L)) {
printf("*** ERROR: double floating point encoding does not work as expected\n");
- printf("sizeof double is %lu, double is %g, u64 is 0x%16llx\n",
- sizeof(double),
- vl.d,
- (unsigned long long) vl.l);
+ printf("sizeof double is %lu, double is %g, u64 is 0x%16"PRIx64"\n",
+ (unsigned long)sizeof(double),
+ vl.d, vl.l);
}
test_table_codec();
diff --git a/tools/Makefile.am b/tools/Makefile.am
index 2c47385..ad53d88 100644
--- a/tools/Makefile.am
+++ b/tools/Makefile.am
@@ -2,15 +2,21 @@ SUBDIRS=doc
bin_PROGRAMS = amqp-publish amqp-get amqp-consume amqp-declare-queue amqp-delete-queue
-AM_CFLAGS = -I$(top_srcdir)/librabbitmq
+AM_CFLAGS = -I$(top_srcdir)/librabbitmq -I$(PLATFORM_DIR)
AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la
LDADD=$(LIBPOPT)
-noinst_HEADERS = common.h
+noinst_HEADERS = common.h $(PLATFORM_DIR)/process.h
-amqp_publish_SOURCES = publish.c common.c
-amqp_get_SOURCES = get.c common.c
-amqp_consume_SOURCES = consume.c common.c
-amqp_declare_queue_SOURCES = declare_queue.c common.c
-amqp_delete_queue_SOURCES = delete_queue.c common.c
+COMMON_SOURCES = common.c
+
+if WINDOWS
+COMMON_SOURCES += windows/compat.c
+endif
+
+amqp_publish_SOURCES = publish.c $(COMMON_SOURCES)
+amqp_get_SOURCES = get.c $(COMMON_SOURCES)
+amqp_consume_SOURCES = consume.c $(PLATFORM_DIR)/process.c $(COMMON_SOURCES)
+amqp_declare_queue_SOURCES = declare_queue.c $(COMMON_SOURCES)
+amqp_delete_queue_SOURCES = delete_queue.c $(COMMON_SOURCES)
diff --git a/tools/common.c b/tools/common.c
index 6a38a95..39099c1 100644
--- a/tools/common.c
+++ b/tools/common.c
@@ -58,14 +58,12 @@
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
-#include <spawn.h>
-#include <sys/wait.h>
-
-#include <popt.h>
#include "common.h"
-extern char **environ;
+#ifdef WINDOWS
+#include "compat.h"
+#endif
void die(const char *fmt, ...)
{
@@ -86,11 +84,24 @@ void die_errno(int err, const char *fmt, ...)
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
- fprintf(stderr, ": %s\n", strerror(err));
+ fprintf(stderr, ": %s\n", strerror(errno));
exit(1);
}
-char *amqp_server_exception_string(amqp_rpc_reply_t r)
+void die_amqp_error(int err, const char *fmt, ...)
+{
+ if (err <= 0)
+ return;
+
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, ": %s\n", amqp_error_string(err));
+ exit(1);
+}
+
+const char *amqp_server_exception_string(amqp_rpc_reply_t r)
{
int res;
char *s;
@@ -125,26 +136,17 @@ char *amqp_server_exception_string(amqp_rpc_reply_t r)
return res >= 0 ? s : NULL;
}
-char *amqp_rpc_reply_string(amqp_rpc_reply_t r)
+const char *amqp_rpc_reply_string(amqp_rpc_reply_t r)
{
- const char *s;
-
switch (r.reply_type) {
case AMQP_RESPONSE_NORMAL:
- s = "normal response";
- break;
+ return strdup("normal response");
case AMQP_RESPONSE_NONE:
- s = "missing RPC reply type";
- break;
+ return strdup("missing RPC reply type");
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
- if (r.library_errno)
- s = strerror(r.library_errno);
- else
- s = "end of stream";
-
- break;
+ return amqp_error_string(r.library_error);
case AMQP_RESPONSE_SERVER_EXCEPTION:
return amqp_server_exception_string(r);
@@ -152,8 +154,6 @@ char *amqp_rpc_reply_string(amqp_rpc_reply_t r)
default:
abort();
}
-
- return strdup(s);
}
void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
@@ -169,16 +169,6 @@ void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
exit(1);
}
-void set_cloexec(int fd)
-{
- int flags;
-
- flags = fcntl(fd, F_GETFD);
- if (flags == -1
- || fcntl(fd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1)
- die_errno(errno, "set_cloexec");
-}
-
static char *amqp_server = "localhost";
static char *amqp_vhost = "/";
static char *amqp_username = "guest";
@@ -222,14 +212,7 @@ amqp_connection_state_t make_connection(void)
}
s = amqp_open_socket(host, port ? port : 5672);
- if (s < 0) {
- if (s == -ENOENT)
- die("unknown host %s", host);
- else
- die_errno(-s, "opening socket to %s", amqp_server);
- }
-
- set_cloexec(s);
+ die_amqp_error(-s, "opening socket to %s", amqp_server);
conn = amqp_new_connection();
amqp_set_sockfd(conn, s);
@@ -247,16 +230,14 @@ amqp_connection_state_t make_connection(void)
void close_connection(amqp_connection_state_t conn)
{
- int s = amqp_get_sockfd(conn);
-
+ int res;
die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"closing channel");
die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"closing connection");
- amqp_destroy_connection(conn);
-
- if (close(s) < 0)
- die_errno(errno, "closing socket");
+
+ res = amqp_end_connection(conn);
+ die_amqp_error(-res, "closing connection");
}
amqp_bytes_t read_all(int fd)
@@ -307,8 +288,7 @@ void copy_body(amqp_connection_state_t conn, int fd)
amqp_frame_t frame;
int res = amqp_simple_wait_frame(conn, &frame);
- if (res < 0)
- die_errno(-res, "waiting for header frame");
+ die_amqp_error(-res, "waiting for header frame");
if (frame.frame_type != AMQP_FRAME_HEADER)
die("expected header, got frame type 0x%X",
frame.frame_type);
@@ -316,8 +296,7 @@ void copy_body(amqp_connection_state_t conn, int fd)
body_remaining = frame.payload.properties.body_size;
while (body_remaining) {
res = amqp_simple_wait_frame(conn, &frame);
- if (res < 0)
- die_errno(-res, "waiting for body frame");
+ die_amqp_error(-res, "waiting for body frame");
if (frame.frame_type != AMQP_FRAME_BODY)
die("expected body, got frame type 0x%X",
frame.frame_type);
@@ -327,47 +306,6 @@ void copy_body(amqp_connection_state_t conn, int fd)
}
}
-void pipeline(const char * const *argv, struct pipeline *pl)
-{
- posix_spawn_file_actions_t file_acts;
-
- int pipefds[2];
- if (pipe(pipefds))
- die_errno(errno, "pipe");
-
- die_errno(posix_spawn_file_actions_init(&file_acts),
- "posix_spawn_file_actions_init");
- die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0),
- "posix_spawn_file_actions_adddup2");
- die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]),
- "posix_spawn_file_actions_addclose");
- die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]),
- "posix_spawn_file_actions_addclose");
-
- die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL,
- (char * const *)argv, environ),
- "posix_spawnp: %s", argv[0]);
-
- die_errno(posix_spawn_file_actions_destroy(&file_acts),
- "posix_spawn_file_actions_destroy");
-
- if (close(pipefds[0]))
- die_errno(errno, "close");
-
- pl->infd = pipefds[1];
-}
-
-int finish_pipeline(struct pipeline *pl)
-{
- int status;
-
- if (close(pl->infd))
- die_errno(errno, "close");
- if (waitpid(pl->pid, &status, 0) < 0)
- die_errno(errno, "waitpid");
- return WIFEXITED(status) && WEXITSTATUS(status) == 0;
-}
-
poptContext process_options(int argc, const char **argv,
struct poptOption *options,
const char *help)
diff --git a/tools/common.h b/tools/common.h
index 09a9242..0caee98 100644
--- a/tools/common.h
+++ b/tools/common.h
@@ -50,16 +50,20 @@
#include <stdint.h>
+#include <popt.h>
+
#include <amqp.h>
#include <amqp_framing.h>
-extern char *amqp_server_exception_string(amqp_rpc_reply_t r);
-extern char *amqp_rpc_reply_string(amqp_rpc_reply_t r);
+extern const char *amqp_server_exception_string(amqp_rpc_reply_t r);
+extern const char *amqp_rpc_reply_string(amqp_rpc_reply_t r);
extern void die(const char *fmt, ...)
__attribute__ ((format (printf, 1, 2)));
extern void die_errno(int err, const char *fmt, ...)
__attribute__ ((format (printf, 2, 3)));
+extern void die_amqp_error(int err, const char *fmt, ...)
+ __attribute__ ((format (printf, 2, 3)));
extern void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
__attribute__ ((format (printf, 2, 3)));
@@ -73,14 +77,6 @@ extern void write_all(int fd, amqp_bytes_t data);
extern void copy_body(amqp_connection_state_t conn, int fd);
-struct pipeline {
- int pid;
- int infd;
-};
-
-extern void pipeline(const char * const *argv, struct pipeline *pl);
-extern int finish_pipeline(struct pipeline *pl);
-
#define INCLUDE_OPTIONS(options) \
{NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL}
diff --git a/tools/consume.c b/tools/consume.c
index 40b61d1..2117bba 100644
--- a/tools/consume.c
+++ b/tools/consume.c
@@ -53,9 +53,8 @@
#include <stdio.h>
#include <stdlib.h>
-#include <popt.h>
-
#include "common.h"
+#include "process.h"
/* Convert a amqp_bytes_t to an escaped string form for printing. We
use the same escaping conventions as rabbitmqctl. */
@@ -157,8 +156,7 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
struct pipeline pl;
uint64_t delivery_tag;
int res = amqp_simple_wait_frame(conn, &frame);
- if (res < 0)
- die_errno(-res, "waiting for header frame");
+ die_amqp_error(res, "waiting for header frame");
if (frame.frame_type != AMQP_FRAME_METHOD
|| frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
@@ -172,8 +170,9 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
copy_body(conn, pl.infd);
if (finish_pipeline(&pl) && !no_ack)
- die_errno(-amqp_basic_ack(conn, 1, delivery_tag, 0),
- "basic.ack");
+ die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag,
+ 0),
+ "basic.ack");
amqp_maybe_release_buffers(conn);
}
diff --git a/tools/get.c b/tools/get.c
index f746fd1..8f8e0d0 100644
--- a/tools/get.c
+++ b/tools/get.c
@@ -52,8 +52,6 @@
#include <stdio.h>
-#include <popt.h>
-
#include "common.h"
static int do_get(amqp_connection_state_t conn, char *queue)
diff --git a/tools/publish.c b/tools/publish.c
index 21314b2..0917dae 100644
--- a/tools/publish.c
+++ b/tools/publish.c
@@ -54,8 +54,6 @@
#include <stdlib.h>
#include <string.h>
-#include <popt.h>
-
#include "common.h"
static void do_publish(amqp_connection_state_t conn,
@@ -66,8 +64,7 @@ static void do_publish(amqp_connection_state_t conn,
cstring_bytes(exchange),
cstring_bytes(routing_key),
0, 0, props, body);
- if (res != 0)
- die_errno(-res, "basic.publish");
+ die_amqp_error(res, "basic.publish");
}
int main(int argc, const char **argv)
diff --git a/tools/unix/process.c b/tools/unix/process.c
new file mode 100644
index 0000000..8a02afb
--- /dev/null
+++ b/tools/unix/process.c
@@ -0,0 +1,100 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <unistd.h>
+#include <errno.h>
+#include <spawn.h>
+#include <sys/wait.h>
+
+#include "common.h"
+#include "process.h"
+
+extern char **environ;
+
+void pipeline(const char *const *argv, struct pipeline *pl)
+{
+ posix_spawn_file_actions_t file_acts;
+
+ int pipefds[2];
+ if (pipe(pipefds))
+ die_errno(errno, "pipe");
+
+ die_errno(posix_spawn_file_actions_init(&file_acts),
+ "posix_spawn_file_actions_init");
+ die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0),
+ "posix_spawn_file_actions_adddup2");
+ die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]),
+ "posix_spawn_file_actions_addclose");
+ die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]),
+ "posix_spawn_file_actions_addclose");
+
+ die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL,
+ (char * const *)argv, environ),
+ "posix_spawnp: %s", argv[0]);
+
+ die_errno(posix_spawn_file_actions_destroy(&file_acts),
+ "posix_spawn_file_actions_destroy");
+
+ if (close(pipefds[0]))
+ die_errno(errno, "close");
+
+ pl->infd = pipefds[1];
+}
+
+int finish_pipeline(struct pipeline *pl)
+{
+ int status;
+
+ if (close(pl->infd))
+ die_errno(errno, "close");
+ if (waitpid(pl->pid, &status, 0) < 0)
+ die_errno(errno, "waitpid");
+ return WIFEXITED(status) && WEXITSTATUS(status) == 0;
+}
diff --git a/tools/unix/process.h b/tools/unix/process.h
new file mode 100644
index 0000000..ac2939d
--- /dev/null
+++ b/tools/unix/process.h
@@ -0,0 +1,57 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+struct pipeline {
+ int pid;
+ int infd;
+};
+
+extern void pipeline(const char *const *argv, struct pipeline *pl);
+extern int finish_pipeline(struct pipeline *pl);
diff --git a/tools/windows/compat.c b/tools/windows/compat.c
new file mode 100644
index 0000000..f0508b2
--- /dev/null
+++ b/tools/windows/compat.c
@@ -0,0 +1,73 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+
+#include "compat.h"
+
+int asprintf(char **strp, const char *fmt, ...)
+{
+ va_list ap;
+ int len;
+
+ va_start(ap, fmt);
+ len = _vscprintf(fmt, ap);
+ *strp = malloc(len+1);
+ if (!*strp)
+ return -1;
+
+ len = vsprintf(*strp, fmt, ap);
+ *strp[len] = 0;
+
+ va_end(ap);
+ return len;
+}
diff --git a/tools/windows/compat.h b/tools/windows/compat.h
new file mode 100644
index 0000000..8211b37
--- /dev/null
+++ b/tools/windows/compat.h
@@ -0,0 +1,51 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+extern int asprintf(char **strp, const char *fmt, ...);
diff --git a/tools/windows/process.c b/tools/windows/process.c
new file mode 100644
index 0000000..9a0b893
--- /dev/null
+++ b/tools/windows/process.c
@@ -0,0 +1,204 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <stdio.h>
+#include <io.h>
+#include <windows.h>
+
+#include "common.h"
+#include "process.h"
+
+void die_windows_error(const char *fmt, ...)
+{
+ char *msg;
+
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+
+ if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM
+ | FORMAT_MESSAGE_ALLOCATE_BUFFER,
+ NULL, GetLastError(),
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPSTR)&msg, 0, NULL))
+ msg = "(failed to retrieving Windows error message)";
+
+ fprintf(stderr, ": %s\n", msg);
+ exit(1);
+}
+
+static char *make_command_line(const char *const *argv)
+{
+ int i;
+ size_t len = 1; /* initial quotes */
+ char *buf;
+ char *dest;
+
+ /* calculate the length of the required buffer, making worst
+ case assumptions for simplicity */
+ for (i = 0;;) {
+ len += strlen(argv[i]) * 2;
+
+ if (!argv[++i])
+ break;
+
+ len += 3; /* quotes, space, quotes */
+ }
+
+ len += 2; /* final quotes and the terminating zero */
+
+ dest = buf = malloc(len);
+ if (!buf)
+ die("allocating memory for subprocess command line");
+
+ *dest++ = '\"';
+
+ for (i = 0;;) {
+ const char *src = argv[i];
+ for (;;) {
+ switch (*src) {
+ case 0:
+ goto done;
+
+ case '\"':
+ case '\\':
+ *dest++ = '\\';
+ /* fall through */
+
+ default:
+ *dest++ = *src++;
+ break;
+ }
+ }
+ done:
+
+ if (!argv[++i])
+ break;
+
+ *dest++ = '\"';
+ *dest++ = ' ';
+ *dest++ = '\"';
+ }
+
+ *dest++ = '\"';
+ *dest++ = 0;
+ return buf;
+}
+
+void pipeline(const char *const *argv, struct pipeline *pl)
+{
+ HANDLE in_read_handle, in_write_handle;
+ SECURITY_ATTRIBUTES sec_attr;
+ PROCESS_INFORMATION proc_info;
+ STARTUPINFO start_info;
+ char *cmdline = make_command_line(argv);
+
+ sec_attr.nLength = sizeof sec_attr;
+ sec_attr.bInheritHandle = TRUE;
+ sec_attr.lpSecurityDescriptor = NULL;
+
+ if (!CreatePipe(&in_read_handle, &in_write_handle, &sec_attr, 0))
+ die_windows_error("CreatePipe");
+
+ if (!SetHandleInformation(in_write_handle, HANDLE_FLAG_INHERIT, 0))
+ die_windows_error("SetHandleInformation");
+
+ /* when in Rome... */
+ ZeroMemory(&proc_info, sizeof proc_info);
+ ZeroMemory(&start_info, sizeof start_info);
+
+ start_info.cb = sizeof start_info;
+ start_info.dwFlags |= STARTF_USESTDHANDLES;
+
+ if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE))
+ == INVALID_HANDLE_VALUE
+ || (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE))
+ == INVALID_HANDLE_VALUE)
+ die_windows_error("GetStdHandle");
+
+ start_info.hStdInput = in_read_handle;
+
+ if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0,
+ NULL, NULL, &start_info, &proc_info))
+ die_windows_error("CreateProcess");
+
+ if (!CloseHandle(proc_info.hThread))
+ die_windows_error("CloseHandle for thread");
+ if (!CloseHandle(in_read_handle))
+ die_windows_error("CloseHandle");
+
+ pl->proc_handle = proc_info.hProcess;
+ pl->infd = _open_osfhandle((intptr_t)in_write_handle, 0);
+}
+
+int finish_pipeline(struct pipeline *pl)
+{
+ DWORD code;
+
+ if (close(pl->infd))
+ die_errno(errno, "close");
+
+ for (;;) {
+ if (!GetExitCodeProcess(pl->proc_handle, &code))
+ die_windows_error("GetExitCodeProcess");
+ if (code != STILL_ACTIVE)
+ break;
+
+ if (WaitForSingleObject(pl->proc_handle, INFINITE)
+ == WAIT_FAILED)
+ die_windows_error("WaitForSingleObject");
+ }
+
+ if (!CloseHandle(pl->proc_handle))
+ die_windows_error("CloseHandle for process");
+
+ return code;
+}
diff --git a/tools/windows/process.h b/tools/windows/process.h
new file mode 100644
index 0000000..df276a7
--- /dev/null
+++ b/tools/windows/process.h
@@ -0,0 +1,59 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <windef.h>
+
+struct pipeline {
+ HANDLE proc_handle;
+ int infd;
+};
+
+extern void pipeline(const char *const *argv, struct pipeline *pl);
+extern int finish_pipeline(struct pipeline *pl);