diff options
author | Tony Garnock-Jones <tonygarnockjones@gmail.com> | 2010-06-06 15:34:01 +1200 |
---|---|---|
committer | Tony Garnock-Jones <tonygarnockjones@gmail.com> | 2010-06-06 15:34:01 +1200 |
commit | 80615f70c699603872c5e49c154e7af04dd83498 (patch) | |
tree | 1a235190a46ed7133b7ec7626e1f87929d1c1c90 | |
parent | 489cfea427b6bb8a8d0a367285d53f9285fdb2b0 (diff) | |
parent | 872d469c5917e80b9e52e7eac7e5357c1bfcf428 (diff) | |
download | rabbitmq-c-github-ask-80615f70c699603872c5e49c154e7af04dd83498.tar.gz |
Merge default into bug22825
39 files changed, 1342 insertions, 282 deletions
diff --git a/README.windows b/README.windows new file mode 100644 index 0000000..723c25e --- /dev/null +++ b/README.windows @@ -0,0 +1,77 @@ +# rabbitmq-c and Windows + +rabbitmq-c can now be built on Windows using the MinGW/MSYS ports of +the GNU toolchain and miscellaneous utilities. This includes the +example programs and tools. + +The results are native Windows DLLs and EXEs, and can be used without +having MinGW installed. But the librabbitmq header files currently +use GCC extensions, and for this reason it is still not possible to +use Microsoft's C/C++ to build applications against the librabbitmq +DLL. Hopefully this will get fixed before long. + + +# Building rabbitmq-c + +rabbitmq-c is built on Windows using MinGW and MSYS. In brief, MinGW +is a native port of the GNU toolchain to Windows; MSYS is a set of +ports of common GNU utilities to run under Windows, so that typical +autotools-based builds will work there. MinGW/MSYS can be used to +build native Windows applications and DLLs, which do not depend on +MinGW/MSYS to run. + +So to build rabbitmq-c on Windows, you need to download and install +the relevant parts of MinGW/MSYS. This can be a fairly time consuming +process - there are about 20 files to be downloaded and unpacked. To +make it easier, we provide a bash script that automates this process, +in rabbitmq-c/etc/install-mingw.sh. You can run this cygwin, or under +Linux and copy the results over or put them on a shared drive. Some +MinGW packages are .tar.lzma files, so it requires a system with xz +and a tar that supports the -J option, which probably rules out OSX. + +Run install-mingw.sh specifying the destination directory, e.g. + + $ etc/install-mingw.sh /tmp/mingw + +Python is needed for the rabbitmq-c build, so you will also need to +install python under Windows. The Windows installer from python.org +will do fine. + +You will need to get the rabbitmq-c and rabbitmq-codegen source code. +If you are getting it from Mercurial, you will need to run "autoreconf +-i" within rabbitmq-c somewhere other than under MinGW first (perhaps +this can be done under MinGW/MSYS, but the packages installed by +install-mingw.sh are not sufficient). + +Open a cmd window, and ensure that both the MinGW bin directory and the python install directory are in the path, e.g. + + C:\>set PATH=%PATH%;C:\mingw\bin;C:\Python26 + +Then start bash, and run the following mount command (substituting the +Windows path of your MinGW install): + + C:\>bash + bash-3.1$ mount 'C:\mingw' /mingw + +Then go to the rabbitmq-c directory, and configure and make as normal: + + bash-3.1$ ./configure && make + [...] + + +# Running the tools without mingw + +You can run the resulting tools EXEs without the rest of MinGW. To do +this, copy the following files into a directory: + +- rabbitmq-c/tools/.libs/*.exe + +- rabbitmq-c/librabbitmq/.libs/librabbitmq-0.dll + +- /bin/libpopt-0.dll + +- /bin/libiconv-2.dll + +- /bin/libintl-8.dll + + diff --git a/configure.ac b/configure.ac index 0269bd3..c516eb8 100644 --- a/configure.ac +++ b/configure.ac @@ -9,6 +9,7 @@ AC_GNU_SOURCE AC_PROG_CC dnl Library checks +AC_LIBTOOL_WIN32_DLL AM_PROG_LIBTOOL dnl Header-file checks @@ -21,6 +22,26 @@ if test "x$GCC" = "xyes"; then fi fi +dnl Detect the kind of host we're building for +AC_CANONICAL_HOST +windows=no +case "${host}" in +*-*-mingw*) + windows=yes + ;; +esac +AM_CONDITIONAL(WINDOWS, test "x$windows" = xyes) +AS_IF([test "x$windows" = xyes], + [AC_DEFINE([WINDOWS], [1], [Define to 1 if on Windows.])] +) + +dnl Decide which API abstraction layer to use +PLATFORM_DIR=unix +if test "x$windows" = xyes ; then + PLATFORM_DIR=windows +fi +AC_SUBST(PLATFORM_DIR) + dnl Enable -m64 if we were asked to do so AC_ARG_ENABLE(64-bit, [ --enable-64-bit produce 64-bit library], @@ -46,7 +67,8 @@ checkPython() { return fi PYTHON=$1 - if $PYTHON -c 'import simplejson' 2>/dev/null + if $PYTHON -c 'import json' 2>/dev/null \ + || $PYTHON -c 'import simplejson' 2>/dev/null then found_python=yes AC_MSG_RESULT($PYTHON) @@ -64,8 +86,12 @@ AC_SUBST(AMQP_CODEGEN_DIR) AC_SUBST(AMQP_SPEC_JSON_PATH) AC_SUBST(PYTHON) -# Check for libpopt, which we need to build the tools +dnl Decide which extra win32 libs we need +EXTRA_LIBS= +AS_IF([test "x$windows" = xyes], [EXTRA_LIBS="-lws2_32 $EXTRA_LIBS"]) +AC_SUBST(EXTRA_LIBS) +dnl Check for libpopt, which we need to build the tools AC_ARG_WITH([popt], [AS_HELP_STRING([--with-popt], [use the popt library. Needed for tools.])], [], diff --git a/etc/install-mingw.sh b/etc/install-mingw.sh new file mode 100755 index 0000000..7fdd339 --- /dev/null +++ b/etc/install-mingw.sh @@ -0,0 +1,69 @@ +#!/bin/bash + +if [ $# -ne 1 ] ; then + echo "usage: install-mingw.sh <destination directory>" 1>&2 + exit 1 +fi + +unpack_dir=$1 + +if [ -eb "$unpack_dir" ] ; then + echo "Destination directory already exists; please delete it if you are sure" 1>&2 + exit 1 +fi + +set -e + +download_dir=/tmp/install-mingw.$$ +mkdir -p $download_dir $unpack_dir + +while read f ; do + wget -P $download_dir -N http://switch.dl.sourceforge.net/project/mingw/$f +done <<EOF +MinGW/mpc/mpc-0.8.1-1/libmpc-0.8.1-1-mingw32-dll-2.tar.lzma +MinGW/BaseSystem/GCC/Version4/gcc-4.5.0-1/gcc-core-4.5.0-1-mingw32-bin.tar.lzma +MinGW/BaseSystem/GCC/Version4/gcc-4.5.0-1/libgcc-4.5.0-1-mingw32-dll-1.tar.lzma +MSYS/BaseSystem/msys-1.0.14-1/msysCORE-1.0.14-1-msys-1.0.14-bin.tar.lzma +MinGW/BaseSystem/GNU-Binutils/binutils-2.20.1/binutils-2.20.1-2-mingw32-bin.tar.gz +MinGW/BaseSystem/RuntimeLibrary/MinGW-RT/mingwrt-3.18/mingwrt-3.18-mingw32-dll.tar.gz +MinGW/BaseSystem/RuntimeLibrary/MinGW-RT/mingwrt-3.18/mingwrt-3.18-mingw32-dev.tar.gz +MinGW/pthreads-w32/pthreads-w32-2.8.0-3/libpthread-2.8.0-3-mingw32-dll-2.tar.lzma +MinGW/mpfr/mpfr-2.4.1-1/libmpfr-2.4.1-1-mingw32-dll-1.tar.lzma +MinGW/gmp/gmp-5.0.1-1/libgmpxx-5.0.1-1-mingw32-dll-4.tar.lzma +MinGW/gmp/gmp-5.0.1-1/libgmp-5.0.1-1-mingw32-dll-10.tar.lzma +MinGW/BaseSystem/RuntimeLibrary/Win32-API/w32api-3.14/w32api-3.14-mingw32-dev.tar.gz +MSYS/make/make-3.81-2/make-3.81-2-msys-1.0.11-bin.tar.lzma +MSYS/bash/bash-3.1.17-2/bash-3.1.17-2-msys-1.0.11-bin.tar.lzma +MSYS/coreutils/coreutils-5.97-2/coreutils-5.97-2-msys-1.0.11-bin.tar.lzma +MinGW/popt/popt-1.15-1/libpopt-1.15-1-mingw32-dll-0.tar.lzma +MinGW/popt/popt-1.15-1/libpopt-1.15-1-mingw32-dev.tar.lzma +MSYS/diffutils/diffutils-2.8.7.20071206cvs-2/diffutils-2.8.7.20071206cvs-2-msys-1.0.11-bin.tar.lzma +MSYS/gawk/gawk-3.1.7-1/gawk-3.1.7-1-msys-1.0.11-bin.tar.lzma +MSYS/grep/grep-2.5.4-1/grep-2.5.4-1-msys-1.0.11-bin.tar.lzma +MSYS/sed/sed-4.2.1-1/sed-4.2.1-1-msys-1.0.11-bin.tar.lzma +MSYS/libtool/libtool-2.2.7a-1/libtool-2.2.7a-1-msys-1.0.11-bin.tar.lzma +MinGW/gettext/gettext-0.17-1/libintl-0.17-1-mingw32-dll-8.tar.lzma +MinGW/gettext/gettext-0.17-1/gettext-0.17-1-mingw32-dev.tar.lzma +MinGW/libiconv/libiconv-1.13.1-1/libiconv-1.13.1-1-mingw32-dll-2.tar.lzma +MinGW/libiconv/libiconv-1.13.1-1/libiconv-1.13.1-1-mingw32-dev.tar.lzma +MinGW/libiconv/libiconv-1.13.1-1/libcharset-1.13.1-1-mingw32-dll-1.tar.lzma +EOF + +for f in $download_dir/* ; do + case $f in + *.tar.gz) + tar -C $unpack_dir -xzf $f + ;; + + *.tar.lzma) + tar -C $unpack_dir -xJf $f + ;; + + *) + echo "Don't know how to unpack $f" 1>&2 + exit 1 + ;; + esac +done + +rm -rf $download_dir diff --git a/examples/amqp_bind.c b/examples/amqp_bind.c index 697df2a..cf7c377 100644 --- a/examples/amqp_bind.c +++ b/examples/amqp_bind.c @@ -99,7 +99,6 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - amqp_destroy_connection(conn); - die_on_error(close(sockfd), "Closing socket"); + die_on_error(amqp_end_connection(conn), "Ending connection"); return 0; } diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c index aa33639..5dfbb33 100644 --- a/examples/amqp_consumer.c +++ b/examples/amqp_consumer.c @@ -84,8 +84,8 @@ static void run(amqp_connection_state_t conn) if (now > next_summary_time) { int countOverInterval = received - previous_received; double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); - printf("%lld ms: Received %d - %d since last report (%d Hz)\n", - (now - start_time) / 1000, received, countOverInterval, (int) intervalRate); + printf("%d ms: Received %d - %d since last report (%d Hz)\n", + (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate); previous_received = received; previous_report_time = now; @@ -94,7 +94,8 @@ static void run(amqp_connection_state_t conn) amqp_maybe_release_buffers(conn); result = amqp_simple_wait_frame(conn, &frame); - if (result <= 0) return; + if (result < 0) + return; if (frame.frame_type != AMQP_FRAME_METHOD) continue; @@ -103,7 +104,9 @@ static void run(amqp_connection_state_t conn) continue; result = amqp_simple_wait_frame(conn, &frame); - if (result <= 0) return; + if (result < 0) + return; + if (frame.frame_type != AMQP_FRAME_HEADER) { fprintf(stderr, "Expected header!"); abort(); @@ -114,7 +117,8 @@ static void run(amqp_connection_state_t conn) while (body_received < body_target) { result = amqp_simple_wait_frame(conn, &frame); - if (result <= 0) return; + if (result < 0) + return; if (frame.frame_type != AMQP_FRAME_BODY) { fprintf(stderr, "Expected body!"); @@ -180,8 +184,7 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - amqp_destroy_connection(conn); - die_on_error(close(sockfd), "Closing socket"); + die_on_error(amqp_end_connection(conn), "Ending connection"); return 0; } diff --git a/examples/amqp_exchange_declare.c b/examples/amqp_exchange_declare.c index 14bc163..32e71b0 100644 --- a/examples/amqp_exchange_declare.c +++ b/examples/amqp_exchange_declare.c @@ -94,7 +94,6 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - amqp_destroy_connection(conn); - die_on_error(close(sockfd), "Closing socket"); + die_on_error(amqp_end_connection(conn), "Ending connection"); return 0; } diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c index 0162ce6..412dcd5 100644 --- a/examples/amqp_listen.c +++ b/examples/amqp_listen.c @@ -125,7 +125,7 @@ int main(int argc, char const * const *argv) { amqp_maybe_release_buffers(conn); result = amqp_simple_wait_frame(conn, &frame); printf("Result %d\n", result); - if (result <= 0) + if (result < 0) break; printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); @@ -143,7 +143,7 @@ int main(int argc, char const * const *argv) { (int) d->routing_key.len, (char *) d->routing_key.bytes); result = amqp_simple_wait_frame(conn, &frame); - if (result <= 0) + if (result < 0) break; if (frame.frame_type != AMQP_FRAME_HEADER) { @@ -162,7 +162,7 @@ int main(int argc, char const * const *argv) { while (body_received < body_target) { result = amqp_simple_wait_frame(conn, &frame); - if (result <= 0) + if (result < 0) break; if (frame.frame_type != AMQP_FRAME_BODY) { @@ -187,8 +187,7 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - amqp_destroy_connection(conn); - die_on_error(close(sockfd), "Closing socket"); + die_on_error(amqp_end_connection(conn), "Ending connection"); return 0; } diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c index 9100dec..1fc7db0 100644 --- a/examples/amqp_listenq.c +++ b/examples/amqp_listenq.c @@ -107,7 +107,7 @@ int main(int argc, char const * const *argv) { amqp_maybe_release_buffers(conn); result = amqp_simple_wait_frame(conn, &frame); printf("Result %d\n", result); - if (result <= 0) + if (result < 0) break; printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); @@ -125,7 +125,7 @@ int main(int argc, char const * const *argv) { (int) d->routing_key.len, (char *) d->routing_key.bytes); result = amqp_simple_wait_frame(conn, &frame); - if (result <= 0) + if (result < 0) break; if (frame.frame_type != AMQP_FRAME_HEADER) { @@ -144,7 +144,7 @@ int main(int argc, char const * const *argv) { while (body_received < body_target) { result = amqp_simple_wait_frame(conn, &frame); - if (result <= 0) + if (result < 0) break; if (frame.frame_type != AMQP_FRAME_BODY) { @@ -171,8 +171,7 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - amqp_destroy_connection(conn); - die_on_error(close(sockfd), "Closing socket"); + die_on_error(amqp_end_connection(conn), "Ending connection"); return 0; } diff --git a/examples/amqp_producer.c b/examples/amqp_producer.c index ac6eebc..61cc925 100644 --- a/examples/amqp_producer.c +++ b/examples/amqp_producer.c @@ -95,8 +95,8 @@ static void send_batch(amqp_connection_state_t conn, if (now > next_summary_time) { int countOverInterval = sent - previous_sent; double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); - printf("%lld ms: Sent %d - %d since last report (%d Hz)\n", - (now - start_time) / 1000, sent, countOverInterval, (int) intervalRate); + printf("%d ms: Sent %d - %d since last report (%d Hz)\n", + (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate); previous_sent = sent; previous_report_time = now; @@ -111,10 +111,10 @@ static void send_batch(amqp_connection_state_t conn, { long long stop_time = now_microseconds(); - long long total_delta = stop_time - start_time; + int total_delta = stop_time - start_time; printf("PRODUCER - Message count: %d\n", message_count); - printf("Total time, milliseconds: %lld\n", total_delta / 1000); + printf("Total time, milliseconds: %d\n", total_delta / 1000); printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0))); } } @@ -151,7 +151,6 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - amqp_destroy_connection(conn); - die_on_error(close(sockfd), "Closing socket"); + die_on_error(amqp_end_connection(conn), "Ending connection"); return 0; } diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c index 6e8e0b6..e669505 100644 --- a/examples/amqp_sendstring.c +++ b/examples/amqp_sendstring.c @@ -108,7 +108,6 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - amqp_destroy_connection(conn); - die_on_error(close(sockfd), "Closing socket"); + die_on_error(amqp_end_connection(conn), "Ending connection"); return 0; } diff --git a/examples/amqp_unbind.c b/examples/amqp_unbind.c index 27df916..bc92efe 100644 --- a/examples/amqp_unbind.c +++ b/examples/amqp_unbind.c @@ -99,7 +99,6 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - amqp_destroy_connection(conn); - die_on_error(close(sockfd), "Closing socket"); + die_on_error(amqp_end_connection(conn), "Ending connection"); return 0; } diff --git a/examples/example_utils.c b/examples/example_utils.c index 628572c..ae8f093 100644 --- a/examples/example_utils.c +++ b/examples/example_utils.c @@ -61,7 +61,7 @@ void die_on_error(int x, char const *context) { if (x < 0) { - fprintf(stderr, "%s: %s\n", context, strerror(-x)); + fprintf(stderr, "%s: %s\n", context, amqp_error_string(-x)); exit(1); } } @@ -76,8 +76,7 @@ void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) { break; case AMQP_RESPONSE_LIBRARY_EXCEPTION: - fprintf(stderr, "%s: %s\n", context, - x.library_errno ? strerror(x.library_errno) : "(end-of-stream)"); + fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error)); break; case AMQP_RESPONSE_SERVER_EXCEPTION: diff --git a/librabbitmq/Makefile.am b/librabbitmq/Makefile.am index b4c8843..53c9ea1 100644 --- a/librabbitmq/Makefile.am +++ b/librabbitmq/Makefile.am @@ -1,9 +1,12 @@ lib_LTLIBRARIES = librabbitmq.la -librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_debug.c amqp_api.c +AM_CFLAGS = -I$(PLATFORM_DIR) +librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_debug.c amqp_api.c $(PLATFORM_DIR)/socket.c +librabbitmq_la_LDFLAGS = -no-undefined +librabbitmq_la_LIBADD = $(EXTRA_LIBS) nodist_librabbitmq_la_SOURCES = amqp_framing.c include_HEADERS = amqp_framing.h amqp.h -noinst_HEADERS = amqp_private.h +noinst_HEADERS = amqp_private.h $(PLATFORM_DIR)/socket.h BUILT_SOURCES = amqp_framing.h amqp_framing.c CLEANFILES = amqp_framing.h amqp_framing.c EXTRA_DIST = codegen.py diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 017b86e..fa76637 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -263,7 +263,7 @@ typedef enum amqp_response_type_enum_ { typedef struct amqp_rpc_reply_t_ { amqp_response_type_enum reply_type; amqp_method_t reply; - int library_errno; /* if AMQP_RESPONSE_LIBRARY_EXCEPTION, then 0 here means socket EOF */ + int library_error; /* if AMQP_RESPONSE_LIBRARY_EXCEPTION, then 0 here means socket EOF */ } amqp_rpc_reply_t; typedef enum amqp_sasl_method_enum_ { @@ -310,6 +310,7 @@ extern int amqp_tune_connection(amqp_connection_state_t state, int heartbeat); int amqp_get_channel_max(amqp_connection_state_t state); extern void amqp_destroy_connection(amqp_connection_state_t state); +extern int amqp_end_connection(amqp_connection_state_t state); extern int amqp_handle_input(amqp_connection_state_t state, amqp_bytes_t received_data, @@ -500,6 +501,14 @@ extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state); */ extern amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state); +/* + * Get the error string for the given error code. + * + * The returned string resides on the heap; the caller is responsible + * for freeing it. + */ +extern const char *amqp_error_string(int err); + #ifdef __cplusplus } #endif diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index a748f90..3724a37 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -52,7 +52,6 @@ #include <stdio.h> #include <string.h> #include <stdint.h> -#include <errno.h> #include "amqp.h" #include "amqp_framing.h" @@ -60,6 +59,40 @@ #include <assert.h> +static const char *client_error_strings[ERROR_MAX] = { + "could not allocate memory", /* ERROR_NO_MEMORY */ + "received bad AMQP data", /* ERROR_BAD_AQMP_DATA */ + "unknown AMQP class id", /* ERROR_UNKOWN_CLASS */ + "unknown AMQP method id", /* ERROR_UNKOWN_METHOD */ + "unknown host", /* ERROR_HOST_NOT_FOUND */ + "incompatible AMQP version", /* ERROR_INCOMPATIBLE_AMQP_VERSION */ + "connection closed unexpectedly", /* ERROR_CONNECTION_CLOSED */ +}; + +const char *amqp_error_string(int err) +{ + const char *str; + int category = (err & ERROR_CATEGORY_MASK); + err = (err & ~ERROR_CATEGORY_MASK); + + switch (category) { + case ERROR_CATEGORY_CLIENT: + if (err < 1 || err > ERROR_MAX) + str = "(undefined librabbitmq error)"; + else + str = client_error_strings[err - 1]; + break; + + case ERROR_CATEGORY_OS: + return amqp_os_error_string(err); + + default: + str = "(undefined error category)"; + } + + return strdup(str); +} + #define RPC_REPLY(replytype) \ (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL \ ? (replytype *) state->most_recent_api_result.reply.decoded \ diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 8623eed..63af96a 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -52,17 +52,13 @@ #include <stdio.h> #include <string.h> #include <stdint.h> -#include <errno.h> - -#include <unistd.h> -#include <sys/uio.h> -#include <sys/types.h> +#include <assert.h> #include "amqp.h" #include "amqp_framing.h" #include "amqp_private.h" -#include <assert.h> +#include "socket.h" #define INITIAL_FRAME_POOL_PAGE_SIZE 65536 #define INITIAL_DECODING_POOL_PAGE_SIZE 131072 @@ -151,7 +147,7 @@ int amqp_tune_connection(amqp_connection_state_t state, newbuf = realloc(state->outbound_buffer.bytes, frame_max); if (newbuf == NULL) { amqp_destroy_connection(state); - return -ENOMEM; + return -ERROR_NO_MEMORY; } state->outbound_buffer.bytes = newbuf; @@ -170,6 +166,15 @@ void amqp_destroy_connection(amqp_connection_state_t state) { free(state); } +int amqp_end_connection(amqp_connection_state_t state) { + int s = state->sockfd; + amqp_destroy_connection(state); + if (socket_close(s) < 0) + return -encoded_socket_errno(); + else + return 0; +} + static void return_to_idle(amqp_connection_state_t state) { state->inbound_buffer.bytes = NULL; state->inbound_offset = 0; @@ -199,7 +204,7 @@ int amqp_handle_input(amqp_connection_state_t state, /* state->inbound_buffer.len is always nonzero, because it corresponds to frame_max, which is not permitted to be less than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */ - return -ENOMEM; + return -ERROR_NO_MEMORY; } state->state = CONNECTION_STATE_WAITING_FOR_HEADER; } @@ -246,7 +251,7 @@ int amqp_handle_input(amqp_connection_state_t state, /* Check frame end marker (footer) */ if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) { - return -EINVAL; + return -ERROR_BAD_AMQP_DATA; } decoded_frame->channel = D_16(state->inbound_buffer, 1); @@ -392,7 +397,7 @@ static int inner_send_frame(amqp_connection_state_t state, break; default: - return -EINVAL; + abort(); } E_32(state->outbound_buffer, 3, *payload_len); @@ -419,16 +424,14 @@ int amqp_send_frame(amqp_connection_state_t state, amqp_frame_t const *frame) { amqp_bytes_t encoded; - int payload_len; - int separate_body; + int payload_len, res; - separate_body = inner_send_frame(state, frame, &encoded, &payload_len); - switch (separate_body) { + res = inner_send_frame(state, frame, &encoded, &payload_len); + switch (res) { case 0: - AMQP_CHECK_RESULT(write(state->sockfd, - state->outbound_buffer.bytes, - payload_len + (HEADER_SIZE + FOOTER_SIZE))); - return 0; + res = socket_write(state->sockfd, state->outbound_buffer.bytes, + payload_len + (HEADER_SIZE + FOOTER_SIZE)); + break; case 1: { struct iovec iov[3]; @@ -440,13 +443,18 @@ int amqp_send_frame(amqp_connection_state_t state, iov[2].iov_base = &frame_end_byte; assert(FOOTER_SIZE == 1); iov[2].iov_len = FOOTER_SIZE; - AMQP_CHECK_RESULT(writev(state->sockfd, &iov[0], 3)); - return 0; + res = socket_writev(state->sockfd, &iov[0], 3); + break; } default: - return separate_body; + return res; } + + if (res < 0) + return -encoded_socket_errno(); + else + return 0; } int amqp_send_frame_to(amqp_connection_state_t state, diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c index 6e52dc8..021151a 100644 --- a/librabbitmq/amqp_mem.c +++ b/librabbitmq/amqp_mem.c @@ -53,7 +53,6 @@ #include <string.h> #include <stdint.h> #include <sys/types.h> -#include <errno.h> #include <assert.h> #include "amqp.h" @@ -102,25 +101,24 @@ void empty_amqp_pool(amqp_pool_t *pool) { empty_blocklist(&pool->pages); } +/* Returns 1 on success, 0 on failure */ static int record_pool_block(amqp_pool_blocklist_t *x, void *block) { size_t blocklistlength = sizeof(void *) * (x->num_blocks + 1); if (x->blocklist == NULL) { x->blocklist = malloc(blocklistlength); - if (x->blocklist == NULL) { - return -ENOMEM; - } + if (x->blocklist == NULL) + return 0; } else { void *newbl = realloc(x->blocklist, blocklistlength); - if (newbl == NULL) { - return -ENOMEM; - } + if (newbl == NULL) + return 0; x->blocklist = newbl; } x->blocklist[x->num_blocks] = block; x->num_blocks++; - return 0; + return 1; } void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) { @@ -135,9 +133,8 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) { if (result == NULL) { return NULL; } - if (record_pool_block(&pool->large_blocks, result) != 0) { + if (!record_pool_block(&pool->large_blocks, result)) return NULL; - } return result; } @@ -156,9 +153,8 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) { if (pool->alloc_block == NULL) { return NULL; } - if (record_pool_block(&pool->pages, pool->alloc_block) != 0) { + if (!record_pool_block(&pool->pages, pool->alloc_block)) return NULL; - } pool->next_page = pool->pages.num_blocks; } else { pool->alloc_block = pool->pages.blocklist[pool->next_page]; diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 3985619..7206ae5 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -55,7 +55,28 @@ extern "C" { #endif -#include <arpa/inet.h> /* ntohl, htonl, ntohs, htons */ +/* Error numbering: Because of differences in error numbering on + * different platforms, we want to keep error numbers opaque for + * client code. Internally, we encode the category of an error + * (i.e. where its number comes from) in the top bits of the number + * (assuming that an int has at least 32 bits). + */ +#define ERROR_CATEGORY_MASK (1 << 29) + +#define ERROR_CATEGORY_CLIENT (0 << 29) /* librabbitmq error codes */ +#define ERROR_CATEGORY_OS (1 << 29) /* OS-specific error codes */ + +/* librabbitmq error codes */ +#define ERROR_NO_MEMORY 1 +#define ERROR_BAD_AMQP_DATA 2 +#define ERROR_UNKNOWN_CLASS 3 +#define ERROR_UNKNOWN_METHOD 4 +#define ERROR_HOST_NOT_FOUND 5 +#define ERROR_INCOMPATIBLE_AMQP_VERSION 6 +#define ERROR_CONNECTION_CLOSED 7 +#define ERROR_MAX 7 + +extern const char *amqp_os_error_string(int err); /* * Connection states: @@ -125,7 +146,7 @@ struct amqp_connection_state_t_ { amqp_rpc_reply_t most_recent_api_result; }; -#define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -EFAULT; } (v); }) +#define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -ERROR_BAD_AMQP_DATA; } (v); }) #define BUF_AT(b, o) (&(((uint8_t *) (b).bytes)[o])) #define D_8(b, o) CHECK_LIMIT(b, o, 1, * (uint8_t *) BUF_AT(b, o)) @@ -176,13 +197,6 @@ extern int amqp_encode_table(amqp_bytes_t encoded, #define AMQP_CHECK_RESULT(expr) AMQP_CHECK_RESULT_CLEANUP(expr, ) -#define AMQP_CHECK_EOF_RESULT(expr) \ - ({ \ - int _result = (expr); \ - if (_result <= 0) return _result; \ - _result; \ - }) - #ifndef NDEBUG extern void amqp_dump(void const *buffer, size_t len); #else diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index d16c319..6425c34 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -52,43 +52,40 @@ #include <stdio.h> #include <string.h> #include <stdint.h> -#include <errno.h> #include <stdarg.h> +#include <assert.h> #include "amqp.h" #include "amqp_framing.h" #include "amqp_private.h" -#include <sys/types.h> -#include <sys/uio.h> -#include <unistd.h> -#include <sys/socket.h> -#include <netdb.h> -#include <netinet/in.h> +#include "socket.h" -#include <assert.h> int amqp_open_socket(char const *hostname, int portnumber) { - int sockfd; + int sockfd, res; struct sockaddr_in addr; struct hostent *he; + res = socket_init(); + if (res) + return res; + he = gethostbyname(hostname); - if (he == NULL) { - return -ENOENT; - } + if (he == NULL) + return -ERROR_HOST_NOT_FOUND; addr.sin_family = AF_INET; addr.sin_port = htons(portnumber); addr.sin_addr.s_addr = * (uint32_t *) he->h_addr_list[0]; - sockfd = socket(PF_INET, SOCK_STREAM, 0); - if (connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) { - int result = -errno; - close(sockfd); - return result; + sockfd = socket_socket(PF_INET, SOCK_STREAM, 0); + if (socket_connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) { + res = -encoded_socket_errno(); + socket_close(sockfd); + return res; } return sockfd; @@ -108,7 +105,7 @@ static char *header() { } int amqp_send_header(amqp_connection_state_t state) { - return write(state->sockfd, header(), 8); + return socket_write(state->sockfd, header(), 8); } int amqp_send_header_to(amqp_connection_state_t state, @@ -183,24 +180,22 @@ static int wait_frame_inner(amqp_connection_state_t state, AMQP_CHECK_RESULT((result = amqp_handle_input(state, buffer, decoded_frame))); state->sock_inbound_offset += result; - if (decoded_frame->frame_type != 0) { + if (decoded_frame->frame_type != 0) /* Complete frame was read. Return it. */ - return 1; - } + return 0; /* Incomplete or ignored frame. Keep processing input. */ assert(result != 0); } - result = read(state->sockfd, - state->sock_inbound_buffer.bytes, - state->sock_inbound_buffer.len); - if (result < 0) { - return -errno; - } - if (result == 0) { - /* EOF. */ - return 0; + result = socket_read(state->sockfd, + state->sock_inbound_buffer.bytes, + state->sock_inbound_buffer.len); + if (result <= 0) { + if (result == 0) + return -ERROR_CONNECTION_CLOSED; + else + return -encoded_socket_errno(); } state->sock_inbound_limit = result; @@ -218,7 +213,7 @@ int amqp_simple_wait_frame(amqp_connection_state_t state, state->last_queued_frame = NULL; } *decoded_frame = *f; - return 1; + return 0; } else { return wait_frame_inner(state, decoded_frame); } @@ -230,8 +225,10 @@ int amqp_simple_wait_method(amqp_connection_state_t state, amqp_method_t *output) { amqp_frame_t frame; - - AMQP_CHECK_EOF_RESULT(amqp_simple_wait_frame(state, &frame)); + int res = amqp_simple_wait_frame(state, &frame); + if (res < 0) + return res; + amqp_assert(frame.channel == expected_channel, "Expected 0x%08X method frame on channel %d, got frame on channel %d", expected_method, @@ -248,7 +245,7 @@ int amqp_simple_wait_method(amqp_connection_state_t state, expected_channel, frame.payload.method.id); *output = frame.payload.method; - return 1; + return 0; } int amqp_send_method(amqp_connection_state_t state, @@ -288,7 +285,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, status = amqp_send_method(state, channel, request_id, decoded_request_method); if (status < 0) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.library_errno = -status; + result.library_error = -status; return result; } @@ -297,9 +294,9 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, retry: status = wait_frame_inner(state, &frame); - if (status <= 0) { + if (status < 0) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.library_errno = -status; + result.library_error = -status; return result; } @@ -324,7 +321,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, if (frame_copy == NULL || link == NULL) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.library_errno = ENOMEM; + result.library_error = ERROR_NO_MEMORY; return result; } @@ -359,6 +356,7 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_sasl_method_enum sasl_method, va_list vl) { + int res; amqp_method_t method; uint32_t server_frame_max; uint16_t server_channel_max; @@ -366,12 +364,16 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_send_header(state); - AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, &method)); + res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, + &method); + if (res < 0) + return res; + { amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded; if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) || (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) { - return -EPROTOTYPE; + return -ERROR_INCOMPATIBLE_AMQP_VERSION; } /* TODO: check that our chosen SASL mechanism is in the list of @@ -383,7 +385,7 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, sasl_method, vl); amqp_connection_start_ok_t s; if (response_bytes.bytes == NULL) { - return -ENOMEM; + return -ERROR_NO_MEMORY; } s = (amqp_connection_start_ok_t) { @@ -397,7 +399,11 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_release_buffers(state); - AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD, &method)); + res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD, + &method); + if (res < 0) + return res; + { amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded; server_channel_max = s->channel_max; @@ -431,7 +437,7 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_release_buffers(state); - return 1; + return 0; } amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, @@ -449,11 +455,11 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, va_start(vl, sasl_method); status = amqp_login_inner(state, channel_max, frame_max, heartbeat, sasl_method, vl); - if (status <= 0) { + if (status < 0) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; result.reply.id = 0; result.reply.decoded = NULL; - result.library_errno = -status; + result.library_error = -status; return result; } @@ -481,6 +487,6 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, result.reply_type = AMQP_RESPONSE_NORMAL; result.reply.id = 0; result.reply.decoded = NULL; - result.library_errno = 0; + result.library_error = 0; return result; } diff --git a/librabbitmq/amqp_table.c b/librabbitmq/amqp_table.c index 25c5932..3f5eb61 100644 --- a/librabbitmq/amqp_table.c +++ b/librabbitmq/amqp_table.c @@ -52,10 +52,10 @@ #include <stdio.h> #include <string.h> #include <stdint.h> -#include <errno.h> #include "amqp.h" #include "amqp_private.h" +#include "socket.h" #include <assert.h> @@ -86,7 +86,7 @@ static int amqp_decode_array(amqp_bytes_t encoded, int limit; if (entries == NULL) { - return -ENOMEM; + return -ERROR_NO_MEMORY; } offset += 4; @@ -99,7 +99,7 @@ static int amqp_decode_array(amqp_bytes_t encoded, newentries = realloc(entries, allocated_entries * sizeof(amqp_field_value_t)); if (newentries == NULL) { free(entries); - return -ENOMEM; + return -ERROR_NO_MEMORY; } entries = newentries; } @@ -117,7 +117,7 @@ static int amqp_decode_array(amqp_bytes_t encoded, if (output->entries == NULL && num_entries > 0) { /* NULL is legitimate if we requested a zero-length block. */ free(entries); - return -ENOMEM; + return -ERROR_NO_MEMORY; } memcpy(output->entries, entries, num_entries * sizeof(amqp_field_value_t)); @@ -140,7 +140,7 @@ int amqp_decode_table(amqp_bytes_t encoded, int limit; if (entries == NULL) { - return -ENOMEM; + return -ERROR_NO_MEMORY; } offset += 4; @@ -159,7 +159,7 @@ int amqp_decode_table(amqp_bytes_t encoded, newentries = realloc(entries, allocated_entries * sizeof(amqp_table_entry_t)); if (newentries == NULL) { free(entries); - return -ENOMEM; + return -ERROR_NO_MEMORY; } entries = newentries; } @@ -182,7 +182,7 @@ int amqp_decode_table(amqp_bytes_t encoded, if (output->entries == NULL && num_entries > 0) { /* NULL is legitimate if we requested a zero-length block. */ free(entries); - return -ENOMEM; + return -ERROR_NO_MEMORY; } memcpy(output->entries, entries, num_entries * sizeof(amqp_table_entry_t)); @@ -274,7 +274,7 @@ static int amqp_decode_field_value(amqp_bytes_t encoded, case AMQP_FIELD_KIND_VOID: break; default: - return -EINVAL; + return -ERROR_BAD_AMQP_DATA; } *offsetptr = offset; @@ -410,7 +410,7 @@ static int amqp_encode_field_value(amqp_bytes_t encoded, case AMQP_FIELD_KIND_VOID: break; default: - return -EINVAL; + abort(); } *offsetptr = offset; diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py index 7aea9f4..6b38666 100644 --- a/librabbitmq/codegen.py +++ b/librabbitmq/codegen.py @@ -170,7 +170,7 @@ def genErl(spec): if m.arguments: print " %s *m = (%s *) amqp_pool_alloc(pool, sizeof(%s));" % \ (m.structName(), m.structName(), m.structName()) - print " if (m == NULL) { return -ENOMEM; }" + print " if (m == NULL) { return -ERROR_NO_MEMORY; }" else: print " %s *m = NULL; /* no fields */" % (m.structName(),) bitindex = None @@ -197,7 +197,7 @@ def genErl(spec): print " case %d: {" % (c.index,) print " %s *p = (%s *) amqp_pool_alloc(pool, sizeof(%s));" % \ (c.structName(), c.structName(), c.structName()) - print " if (p == NULL) { return -ENOMEM; }" + print " if (p == NULL) { return -ERROR_NO_MEMORY; }" print " p->_flags = flags;" for f in c.fields: if spec.resolveDomain(f.domain) == 'bit': @@ -261,12 +261,11 @@ def genErl(spec): print '#include <stdint.h>' print '#include <string.h>' print '#include <stdio.h>' - print '#include <errno.h>' - print '#include <arpa/inet.h> /* ntohl, htonl, ntohs, htons */' print print '#include "amqp.h"' print '#include "amqp_framing.h"' print '#include "amqp_private.h"' + print '#include "socket.h"' print """ char const *amqp_constant_name(int constantNumber) { @@ -317,7 +316,7 @@ int amqp_decode_method(amqp_method_number_t methodNumber, switch (methodNumber) {""" for m in methods: genDecodeMethodFields(m) - print """ default: return -ENOENT; + print """ default: return -ERROR_UNKNOWN_METHOD; } }""" @@ -343,7 +342,7 @@ int amqp_decode_properties(uint16_t class_id, switch (class_id) {""" for c in spec.allClasses(): genDecodeProperties(c) - print """ default: return -ENOENT; + print """ default: return -ERROR_UNKNOWN_CLASS; } }""" @@ -358,7 +357,7 @@ int amqp_encode_method(amqp_method_number_t methodNumber, switch (methodNumber) {""" for m in methods: genEncodeMethodFields(m) - print """ default: return -ENOENT; + print """ default: return -ERROR_UNKNOWN_METHOD; } }""" @@ -390,7 +389,7 @@ int amqp_encode_properties(uint16_t class_id, switch (class_id) {""" for c in spec.allClasses(): genEncodeProperties(c) - print """ default: return -ENOENT; + print """ default: return -ERROR_UNKNOWN_CLASS; } }""" diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c new file mode 100644 index 0000000..51db413 --- /dev/null +++ b/librabbitmq/unix/socket.c @@ -0,0 +1,85 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <sys/socket.h> +#include <unistd.h> +#include <fcntl.h> +#include <stdint.h> +#include <string.h> + +#include "amqp.h" +#include "amqp_private.h" +#include "socket.h" + +int socket_socket(int domain, int type, int proto) +{ + int flags; + + int s = socket(domain, type, proto); + if (s < 0) + return s; + + /* Always enable CLOEXEC on the socket */ + flags = fcntl(s, F_GETFD); + if (flags == -1 + || fcntl(s, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) { + int e = errno; + close(s); + errno = e; + return -1; + } + + return s; +} + +const char *amqp_os_error_string(int err) +{ + return strdup(strerror(err)); +} diff --git a/librabbitmq/unix/socket.h b/librabbitmq/unix/socket.h new file mode 100644 index 0000000..d7295c3 --- /dev/null +++ b/librabbitmq/unix/socket.h @@ -0,0 +1,80 @@ +#ifndef librabbitmq_unix_socket_h +#define librabbitmq_unix_socket_h + +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <errno.h> +#include <sys/types.h> +#include <unistd.h> +#include <sys/uio.h> +#include <sys/socket.h> +#include <netdb.h> +#include <netinet/in.h> + +static inline int socket_init(void) +{ + return 0; +} + +extern int socket_socket(int domain, int type, int proto); + +#define socket_connect connect +#define socket_close close +#define socket_read read +#define socket_write write +#define socket_writev writev + +static inline int encoded_socket_errno() +{ + return errno | ERROR_CATEGORY_OS; +} + +#endif diff --git a/librabbitmq/windows/socket.c b/librabbitmq/windows/socket.c new file mode 100644 index 0000000..f809f62 --- /dev/null +++ b/librabbitmq/windows/socket.c @@ -0,0 +1,88 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <windows.h> +#include <stdint.h> + +#include "amqp.h" +#include "amqp_private.h" +#include "socket.h" + +static int called_wsastartup; + +int socket_init(void) +{ + if (!called_wsastartup) { + WSADATA data; + int res = WSAStartup(0x0202, &data); + if (res) + return -res; + + called_wsastartup = 1; + } + + return 0; +} + +const char *amqp_os_error_string(int err) +{ + char *msg, *copy; + + if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM + | FORMAT_MESSAGE_ALLOCATE_BUFFER, + NULL, err, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPSTR)&msg, 0, NULL)) + return strdup("(error retrieving Windows error message)"); + + copy = strdup(msg); + LocalFree(msg); + return copy; +} diff --git a/librabbitmq/windows/socket.h b/librabbitmq/windows/socket.h new file mode 100644 index 0000000..bff6efc --- /dev/null +++ b/librabbitmq/windows/socket.h @@ -0,0 +1,92 @@ +#ifndef librabbitmq_windows_socket_h +#define librabbitmq_windows_socket_h + +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <winsock2.h> + +extern int socket_init(void); + +#define socket_socket socket +#define socket_connect connect +#define socket_close closesocket + +static inline int socket_read(int sock, void *buf, size_t count) +{ + return recv(sock, buf, count, 0); +} + +static inline int socket_write(int sock, void *buf, size_t count) +{ + return send(sock, buf, count, 0); +} + +/* same as WSABUF */ +struct iovec { + u_long iov_len; + char *iov_base; +}; + +static inline int socket_writev(int sock, struct iovec *iov, int nvecs) +{ + DWORD ret; + if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0) + return ret; + else + return -1; +} + +static inline int encoded_socket_errno() +{ + return WSAGetLastError() | ERROR_CATEGORY_OS; +} + +#endif diff --git a/tests/Makefile.am b/tests/Makefile.am index 1ac6faf..7c8a4fe 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -1,4 +1,4 @@ noinst_PROGRAMS = test_tables -AM_CFLAGS = -I$(top_srcdir)/librabbitmq +AM_CFLAGS = -I$(top_srcdir)/librabbitmq -I$(top_srcdir)/librabbitmq/$(PLATFORM_DIR) AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la diff --git a/tests/test_tables.c b/tests/test_tables.c index e620443..1a0652a 100644 --- a/tests/test_tables.c +++ b/tests/test_tables.c @@ -54,7 +54,8 @@ #include <time.h> #include <errno.h> -#include <stdint.h> +#include <inttypes.h> + #include <amqp.h> #include <amqp_framing.h> #include <amqp_private.h> @@ -75,13 +76,13 @@ static void dump_value(int indent, amqp_field_value_t v) { putchar(' '); switch (v.kind) { case AMQP_FIELD_KIND_BOOLEAN: puts(v.value.boolean ? "true" : "false"); break; - case AMQP_FIELD_KIND_I8: printf("%d\n", v.value.i8); break; - case AMQP_FIELD_KIND_U8: printf("%d\n", v.value.u8); break; - case AMQP_FIELD_KIND_I16: printf("%d\n", v.value.i16); break; - case AMQP_FIELD_KIND_U16: printf("%d\n", v.value.u16); break; - case AMQP_FIELD_KIND_I32: printf("%ld\n", (long) v.value.i32); break; - case AMQP_FIELD_KIND_U32: printf("%lu\n", (unsigned long) v.value.u32); break; - case AMQP_FIELD_KIND_I64: printf("%lld\n", (long long) v.value.i64); break; + case AMQP_FIELD_KIND_I8: printf("%"PRId8"\n", v.value.i8); break; + case AMQP_FIELD_KIND_U8: printf("%"PRIu8"\n", v.value.u8); break; + case AMQP_FIELD_KIND_I16: printf("%"PRId16"\n", v.value.i16); break; + case AMQP_FIELD_KIND_U16: printf("%"PRIu16"\n", v.value.u16); break; + case AMQP_FIELD_KIND_I32: printf("%"PRId32"\n", v.value.i32); break; + case AMQP_FIELD_KIND_U32: printf("%"PRIu32"\n", v.value.u32); break; + case AMQP_FIELD_KIND_I64: printf("%"PRId64"\n", v.value.i64); break; case AMQP_FIELD_KIND_F32: printf("%g\n", (double) v.value.f32); break; case AMQP_FIELD_KIND_F64: printf("%g\n", v.value.f64); break; case AMQP_FIELD_KIND_DECIMAL: @@ -106,7 +107,7 @@ static void dump_value(int indent, amqp_field_value_t v) { } } break; - case AMQP_FIELD_KIND_TIMESTAMP: printf("%llu\n", (unsigned long long) v.value.u64); break; + case AMQP_FIELD_KIND_TIMESTAMP: printf("%"PRIu64"\n", v.value.u64); break; case AMQP_FIELD_KIND_TABLE: putchar('\n'); { @@ -209,7 +210,8 @@ static void test_table_codec(void) { int decoding_offset = 0; result = amqp_decode_table(decoding_bytes, &pool, &decoded, &decoding_offset); if (result < 0) { - printf("Table decoding failed: %d (%s)\n", result, strerror(-result)); + printf("Table decoding failed: %d (%s)\n", result, + amqp_error_string(-result)); abort(); } printf("BBBBBBBBBB\n"); @@ -227,7 +229,8 @@ static void test_table_codec(void) { result = amqp_encode_table(encoding_result, &table, &offset); if (result < 0) { - printf("Table encoding failed: %d (%s)\n", result, strerror(-result)); + printf("Table encoding failed: %d (%s)\n", result, + amqp_error_string(-result)); abort(); } @@ -272,7 +275,7 @@ int main(int argc, char const * const *argv) { if ((sizeof(float) != 4) || (vi.i != 0x40490fdb)) { printf("*** ERROR: single floating point encoding does not work as expected\n"); printf("sizeof float is %lu, float is %g, u32 is 0x%08lx\n", - sizeof(float), + (unsigned long)sizeof(float), vi.f, (unsigned long) vi.i); } @@ -280,10 +283,9 @@ int main(int argc, char const * const *argv) { vl.d = M_PI; if ((sizeof(double) != 8) || (vl.l != 0x400921fb54442d18L)) { printf("*** ERROR: double floating point encoding does not work as expected\n"); - printf("sizeof double is %lu, double is %g, u64 is 0x%16llx\n", - sizeof(double), - vl.d, - (unsigned long long) vl.l); + printf("sizeof double is %lu, double is %g, u64 is 0x%16"PRIx64"\n", + (unsigned long)sizeof(double), + vl.d, vl.l); } test_table_codec(); diff --git a/tools/Makefile.am b/tools/Makefile.am index 2c47385..ad53d88 100644 --- a/tools/Makefile.am +++ b/tools/Makefile.am @@ -2,15 +2,21 @@ SUBDIRS=doc bin_PROGRAMS = amqp-publish amqp-get amqp-consume amqp-declare-queue amqp-delete-queue -AM_CFLAGS = -I$(top_srcdir)/librabbitmq +AM_CFLAGS = -I$(top_srcdir)/librabbitmq -I$(PLATFORM_DIR) AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la LDADD=$(LIBPOPT) -noinst_HEADERS = common.h +noinst_HEADERS = common.h $(PLATFORM_DIR)/process.h -amqp_publish_SOURCES = publish.c common.c -amqp_get_SOURCES = get.c common.c -amqp_consume_SOURCES = consume.c common.c -amqp_declare_queue_SOURCES = declare_queue.c common.c -amqp_delete_queue_SOURCES = delete_queue.c common.c +COMMON_SOURCES = common.c + +if WINDOWS +COMMON_SOURCES += windows/compat.c +endif + +amqp_publish_SOURCES = publish.c $(COMMON_SOURCES) +amqp_get_SOURCES = get.c $(COMMON_SOURCES) +amqp_consume_SOURCES = consume.c $(PLATFORM_DIR)/process.c $(COMMON_SOURCES) +amqp_declare_queue_SOURCES = declare_queue.c $(COMMON_SOURCES) +amqp_delete_queue_SOURCES = delete_queue.c $(COMMON_SOURCES) diff --git a/tools/common.c b/tools/common.c index 6a38a95..39099c1 100644 --- a/tools/common.c +++ b/tools/common.c @@ -58,14 +58,12 @@ #include <unistd.h> #include <fcntl.h> #include <errno.h> -#include <spawn.h> -#include <sys/wait.h> - -#include <popt.h> #include "common.h" -extern char **environ; +#ifdef WINDOWS +#include "compat.h" +#endif void die(const char *fmt, ...) { @@ -86,11 +84,24 @@ void die_errno(int err, const char *fmt, ...) va_start(ap, fmt); vfprintf(stderr, fmt, ap); va_end(ap); - fprintf(stderr, ": %s\n", strerror(err)); + fprintf(stderr, ": %s\n", strerror(errno)); exit(1); } -char *amqp_server_exception_string(amqp_rpc_reply_t r) +void die_amqp_error(int err, const char *fmt, ...) +{ + if (err <= 0) + return; + + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", amqp_error_string(err)); + exit(1); +} + +const char *amqp_server_exception_string(amqp_rpc_reply_t r) { int res; char *s; @@ -125,26 +136,17 @@ char *amqp_server_exception_string(amqp_rpc_reply_t r) return res >= 0 ? s : NULL; } -char *amqp_rpc_reply_string(amqp_rpc_reply_t r) +const char *amqp_rpc_reply_string(amqp_rpc_reply_t r) { - const char *s; - switch (r.reply_type) { case AMQP_RESPONSE_NORMAL: - s = "normal response"; - break; + return strdup("normal response"); case AMQP_RESPONSE_NONE: - s = "missing RPC reply type"; - break; + return strdup("missing RPC reply type"); case AMQP_RESPONSE_LIBRARY_EXCEPTION: - if (r.library_errno) - s = strerror(r.library_errno); - else - s = "end of stream"; - - break; + return amqp_error_string(r.library_error); case AMQP_RESPONSE_SERVER_EXCEPTION: return amqp_server_exception_string(r); @@ -152,8 +154,6 @@ char *amqp_rpc_reply_string(amqp_rpc_reply_t r) default: abort(); } - - return strdup(s); } void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) @@ -169,16 +169,6 @@ void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) exit(1); } -void set_cloexec(int fd) -{ - int flags; - - flags = fcntl(fd, F_GETFD); - if (flags == -1 - || fcntl(fd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) - die_errno(errno, "set_cloexec"); -} - static char *amqp_server = "localhost"; static char *amqp_vhost = "/"; static char *amqp_username = "guest"; @@ -222,14 +212,7 @@ amqp_connection_state_t make_connection(void) } s = amqp_open_socket(host, port ? port : 5672); - if (s < 0) { - if (s == -ENOENT) - die("unknown host %s", host); - else - die_errno(-s, "opening socket to %s", amqp_server); - } - - set_cloexec(s); + die_amqp_error(-s, "opening socket to %s", amqp_server); conn = amqp_new_connection(); amqp_set_sockfd(conn, s); @@ -247,16 +230,14 @@ amqp_connection_state_t make_connection(void) void close_connection(amqp_connection_state_t conn) { - int s = amqp_get_sockfd(conn); - + int res; die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "closing channel"); die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "closing connection"); - amqp_destroy_connection(conn); - - if (close(s) < 0) - die_errno(errno, "closing socket"); + + res = amqp_end_connection(conn); + die_amqp_error(-res, "closing connection"); } amqp_bytes_t read_all(int fd) @@ -307,8 +288,7 @@ void copy_body(amqp_connection_state_t conn, int fd) amqp_frame_t frame; int res = amqp_simple_wait_frame(conn, &frame); - if (res < 0) - die_errno(-res, "waiting for header frame"); + die_amqp_error(-res, "waiting for header frame"); if (frame.frame_type != AMQP_FRAME_HEADER) die("expected header, got frame type 0x%X", frame.frame_type); @@ -316,8 +296,7 @@ void copy_body(amqp_connection_state_t conn, int fd) body_remaining = frame.payload.properties.body_size; while (body_remaining) { res = amqp_simple_wait_frame(conn, &frame); - if (res < 0) - die_errno(-res, "waiting for body frame"); + die_amqp_error(-res, "waiting for body frame"); if (frame.frame_type != AMQP_FRAME_BODY) die("expected body, got frame type 0x%X", frame.frame_type); @@ -327,47 +306,6 @@ void copy_body(amqp_connection_state_t conn, int fd) } } -void pipeline(const char * const *argv, struct pipeline *pl) -{ - posix_spawn_file_actions_t file_acts; - - int pipefds[2]; - if (pipe(pipefds)) - die_errno(errno, "pipe"); - - die_errno(posix_spawn_file_actions_init(&file_acts), - "posix_spawn_file_actions_init"); - die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0), - "posix_spawn_file_actions_adddup2"); - die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]), - "posix_spawn_file_actions_addclose"); - die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]), - "posix_spawn_file_actions_addclose"); - - die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL, - (char * const *)argv, environ), - "posix_spawnp: %s", argv[0]); - - die_errno(posix_spawn_file_actions_destroy(&file_acts), - "posix_spawn_file_actions_destroy"); - - if (close(pipefds[0])) - die_errno(errno, "close"); - - pl->infd = pipefds[1]; -} - -int finish_pipeline(struct pipeline *pl) -{ - int status; - - if (close(pl->infd)) - die_errno(errno, "close"); - if (waitpid(pl->pid, &status, 0) < 0) - die_errno(errno, "waitpid"); - return WIFEXITED(status) && WEXITSTATUS(status) == 0; -} - poptContext process_options(int argc, const char **argv, struct poptOption *options, const char *help) diff --git a/tools/common.h b/tools/common.h index 09a9242..0caee98 100644 --- a/tools/common.h +++ b/tools/common.h @@ -50,16 +50,20 @@ #include <stdint.h> +#include <popt.h> + #include <amqp.h> #include <amqp_framing.h> -extern char *amqp_server_exception_string(amqp_rpc_reply_t r); -extern char *amqp_rpc_reply_string(amqp_rpc_reply_t r); +extern const char *amqp_server_exception_string(amqp_rpc_reply_t r); +extern const char *amqp_rpc_reply_string(amqp_rpc_reply_t r); extern void die(const char *fmt, ...) __attribute__ ((format (printf, 1, 2))); extern void die_errno(int err, const char *fmt, ...) __attribute__ ((format (printf, 2, 3))); +extern void die_amqp_error(int err, const char *fmt, ...) + __attribute__ ((format (printf, 2, 3))); extern void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) __attribute__ ((format (printf, 2, 3))); @@ -73,14 +77,6 @@ extern void write_all(int fd, amqp_bytes_t data); extern void copy_body(amqp_connection_state_t conn, int fd); -struct pipeline { - int pid; - int infd; -}; - -extern void pipeline(const char * const *argv, struct pipeline *pl); -extern int finish_pipeline(struct pipeline *pl); - #define INCLUDE_OPTIONS(options) \ {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL} diff --git a/tools/consume.c b/tools/consume.c index 40b61d1..2117bba 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -53,9 +53,8 @@ #include <stdio.h> #include <stdlib.h> -#include <popt.h> - #include "common.h" +#include "process.h" /* Convert a amqp_bytes_t to an escaped string form for printing. We use the same escaping conventions as rabbitmqctl. */ @@ -157,8 +156,7 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, struct pipeline pl; uint64_t delivery_tag; int res = amqp_simple_wait_frame(conn, &frame); - if (res < 0) - die_errno(-res, "waiting for header frame"); + die_amqp_error(res, "waiting for header frame"); if (frame.frame_type != AMQP_FRAME_METHOD || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) @@ -172,8 +170,9 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, copy_body(conn, pl.infd); if (finish_pipeline(&pl) && !no_ack) - die_errno(-amqp_basic_ack(conn, 1, delivery_tag, 0), - "basic.ack"); + die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag, + 0), + "basic.ack"); amqp_maybe_release_buffers(conn); } diff --git a/tools/get.c b/tools/get.c index f746fd1..8f8e0d0 100644 --- a/tools/get.c +++ b/tools/get.c @@ -52,8 +52,6 @@ #include <stdio.h> -#include <popt.h> - #include "common.h" static int do_get(amqp_connection_state_t conn, char *queue) diff --git a/tools/publish.c b/tools/publish.c index 21314b2..0917dae 100644 --- a/tools/publish.c +++ b/tools/publish.c @@ -54,8 +54,6 @@ #include <stdlib.h> #include <string.h> -#include <popt.h> - #include "common.h" static void do_publish(amqp_connection_state_t conn, @@ -66,8 +64,7 @@ static void do_publish(amqp_connection_state_t conn, cstring_bytes(exchange), cstring_bytes(routing_key), 0, 0, props, body); - if (res != 0) - die_errno(-res, "basic.publish"); + die_amqp_error(res, "basic.publish"); } int main(int argc, const char **argv) diff --git a/tools/unix/process.c b/tools/unix/process.c new file mode 100644 index 0000000..8a02afb --- /dev/null +++ b/tools/unix/process.c @@ -0,0 +1,100 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <unistd.h> +#include <errno.h> +#include <spawn.h> +#include <sys/wait.h> + +#include "common.h" +#include "process.h" + +extern char **environ; + +void pipeline(const char *const *argv, struct pipeline *pl) +{ + posix_spawn_file_actions_t file_acts; + + int pipefds[2]; + if (pipe(pipefds)) + die_errno(errno, "pipe"); + + die_errno(posix_spawn_file_actions_init(&file_acts), + "posix_spawn_file_actions_init"); + die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0), + "posix_spawn_file_actions_adddup2"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]), + "posix_spawn_file_actions_addclose"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]), + "posix_spawn_file_actions_addclose"); + + die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL, + (char * const *)argv, environ), + "posix_spawnp: %s", argv[0]); + + die_errno(posix_spawn_file_actions_destroy(&file_acts), + "posix_spawn_file_actions_destroy"); + + if (close(pipefds[0])) + die_errno(errno, "close"); + + pl->infd = pipefds[1]; +} + +int finish_pipeline(struct pipeline *pl) +{ + int status; + + if (close(pl->infd)) + die_errno(errno, "close"); + if (waitpid(pl->pid, &status, 0) < 0) + die_errno(errno, "waitpid"); + return WIFEXITED(status) && WEXITSTATUS(status) == 0; +} diff --git a/tools/unix/process.h b/tools/unix/process.h new file mode 100644 index 0000000..ac2939d --- /dev/null +++ b/tools/unix/process.h @@ -0,0 +1,57 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +struct pipeline { + int pid; + int infd; +}; + +extern void pipeline(const char *const *argv, struct pipeline *pl); +extern int finish_pipeline(struct pipeline *pl); diff --git a/tools/windows/compat.c b/tools/windows/compat.c new file mode 100644 index 0000000..f0508b2 --- /dev/null +++ b/tools/windows/compat.c @@ -0,0 +1,73 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> + +#include "compat.h" + +int asprintf(char **strp, const char *fmt, ...) +{ + va_list ap; + int len; + + va_start(ap, fmt); + len = _vscprintf(fmt, ap); + *strp = malloc(len+1); + if (!*strp) + return -1; + + len = vsprintf(*strp, fmt, ap); + *strp[len] = 0; + + va_end(ap); + return len; +} diff --git a/tools/windows/compat.h b/tools/windows/compat.h new file mode 100644 index 0000000..8211b37 --- /dev/null +++ b/tools/windows/compat.h @@ -0,0 +1,51 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +extern int asprintf(char **strp, const char *fmt, ...); diff --git a/tools/windows/process.c b/tools/windows/process.c new file mode 100644 index 0000000..9a0b893 --- /dev/null +++ b/tools/windows/process.c @@ -0,0 +1,204 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <stdio.h> +#include <io.h> +#include <windows.h> + +#include "common.h" +#include "process.h" + +void die_windows_error(const char *fmt, ...) +{ + char *msg; + + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + + if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM + | FORMAT_MESSAGE_ALLOCATE_BUFFER, + NULL, GetLastError(), + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPSTR)&msg, 0, NULL)) + msg = "(failed to retrieving Windows error message)"; + + fprintf(stderr, ": %s\n", msg); + exit(1); +} + +static char *make_command_line(const char *const *argv) +{ + int i; + size_t len = 1; /* initial quotes */ + char *buf; + char *dest; + + /* calculate the length of the required buffer, making worst + case assumptions for simplicity */ + for (i = 0;;) { + len += strlen(argv[i]) * 2; + + if (!argv[++i]) + break; + + len += 3; /* quotes, space, quotes */ + } + + len += 2; /* final quotes and the terminating zero */ + + dest = buf = malloc(len); + if (!buf) + die("allocating memory for subprocess command line"); + + *dest++ = '\"'; + + for (i = 0;;) { + const char *src = argv[i]; + for (;;) { + switch (*src) { + case 0: + goto done; + + case '\"': + case '\\': + *dest++ = '\\'; + /* fall through */ + + default: + *dest++ = *src++; + break; + } + } + done: + + if (!argv[++i]) + break; + + *dest++ = '\"'; + *dest++ = ' '; + *dest++ = '\"'; + } + + *dest++ = '\"'; + *dest++ = 0; + return buf; +} + +void pipeline(const char *const *argv, struct pipeline *pl) +{ + HANDLE in_read_handle, in_write_handle; + SECURITY_ATTRIBUTES sec_attr; + PROCESS_INFORMATION proc_info; + STARTUPINFO start_info; + char *cmdline = make_command_line(argv); + + sec_attr.nLength = sizeof sec_attr; + sec_attr.bInheritHandle = TRUE; + sec_attr.lpSecurityDescriptor = NULL; + + if (!CreatePipe(&in_read_handle, &in_write_handle, &sec_attr, 0)) + die_windows_error("CreatePipe"); + + if (!SetHandleInformation(in_write_handle, HANDLE_FLAG_INHERIT, 0)) + die_windows_error("SetHandleInformation"); + + /* when in Rome... */ + ZeroMemory(&proc_info, sizeof proc_info); + ZeroMemory(&start_info, sizeof start_info); + + start_info.cb = sizeof start_info; + start_info.dwFlags |= STARTF_USESTDHANDLES; + + if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE)) + == INVALID_HANDLE_VALUE + || (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE)) + == INVALID_HANDLE_VALUE) + die_windows_error("GetStdHandle"); + + start_info.hStdInput = in_read_handle; + + if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0, + NULL, NULL, &start_info, &proc_info)) + die_windows_error("CreateProcess"); + + if (!CloseHandle(proc_info.hThread)) + die_windows_error("CloseHandle for thread"); + if (!CloseHandle(in_read_handle)) + die_windows_error("CloseHandle"); + + pl->proc_handle = proc_info.hProcess; + pl->infd = _open_osfhandle((intptr_t)in_write_handle, 0); +} + +int finish_pipeline(struct pipeline *pl) +{ + DWORD code; + + if (close(pl->infd)) + die_errno(errno, "close"); + + for (;;) { + if (!GetExitCodeProcess(pl->proc_handle, &code)) + die_windows_error("GetExitCodeProcess"); + if (code != STILL_ACTIVE) + break; + + if (WaitForSingleObject(pl->proc_handle, INFINITE) + == WAIT_FAILED) + die_windows_error("WaitForSingleObject"); + } + + if (!CloseHandle(pl->proc_handle)) + die_windows_error("CloseHandle for process"); + + return code; +} diff --git a/tools/windows/process.h b/tools/windows/process.h new file mode 100644 index 0000000..df276a7 --- /dev/null +++ b/tools/windows/process.h @@ -0,0 +1,59 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <windef.h> + +struct pipeline { + HANDLE proc_handle; + int infd; +}; + +extern void pipeline(const char *const *argv, struct pipeline *pl); +extern int finish_pipeline(struct pipeline *pl); |