diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-11-17 13:24:07 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-11-17 13:24:07 +0000 |
commit | 024a50f566496535cdfe5af0e70916a052845822 (patch) | |
tree | 1c3b07f7932f81d46a2117dbc732a92e1a1931de | |
parent | 5b41470ca661d528dbafadb5865eaaa2a4342c21 (diff) | |
parent | 56a9a6779917bf0dc4432a39dc9a533af65d12a2 (diff) | |
download | rabbitmq-c-github-ask-024a50f566496535cdfe5af0e70916a052845822.tar.gz |
merge bug23423 into default
37 files changed, 2507 insertions, 1468 deletions
diff --git a/README.windows b/README.windows index 33c8f06..836a3be 100644 --- a/README.windows +++ b/README.windows @@ -1,76 +1,106 @@ -# rabbitmq-c and Windows +# Using rabbitmq-c on Windows -rabbitmq-c can now be built on Windows using the MinGW/MSYS ports of -the GNU toolchain and miscellaneous utilities. This includes the -example programs and tools. +There are two approaches to building rabbitmq-c under Windows: -The results are native Windows DLLs and EXEs, and can be used without -having MinGW installed. But the librabbitmq header files currently -use GCC extensions, and for this reason it is still not possible to -use Microsoft's C/C++ to build applications against the librabbitmq -DLL. Hopefully this will get fixed before long. +- Build using the MinGW/MSYS (MinGW/MSYS is a port of the GNU + toolchain and utilities to Windows, including the gcc compiler). + The results of building in this way are native Windows DLLs and + EXEs, and can be used without having MinGW installed. The drawback + to this approach is that you cannot safely call the resulting + librabbitmq DLL from code compiled with Microsoft's C compiler. The + advantage is that the whole of rabbitmq-c can be built under + Windows, including the tools. +- Build using Microsoft's C compiler. You will still need to install + MinGW/MSYS in order to run the rabbitmq-c build scripts, but + Microsoft's compiler is used to compile the code. The resulting + librabbitmq DLL can be used from code compiled with Microsoft's C + compiler (i.e. code developed in Visual Studio). The downside to + this approach is that the rabbitmq-c tools cannot be built, due to + dependencies on other libraries. -# Building rabbitmq-c -rabbitmq-c is built on Windows using MinGW and MSYS. In brief, MinGW -is a native port of the GNU toolchain to Windows; MSYS is a set of -ports of common GNU utilities to run under Windows, so that typical -autotools-based builds will work there. MinGW/MSYS can be used to -build native Windows applications and DLLs, which do not depend on -MinGW/MSYS to run. +## Common steps -So to build rabbitmq-c on Windows, you need to download and install -the relevant parts of MinGW/MSYS. This can be fairly time consuming - -there are dozens of files to be downloaded and unpacked. To make it -easier, we provide a bash script that automates this process, in -rabbitmq-c/etc/install-mingw.sh. You can run this under cygwin, or -under Linux and copy the results over or put them on a shared drive. -Some MinGW packages are .tar.lzma files, so it requires a system with -the xz compression utility and a tar that supports the -J option. +With either of the approaches, the initial steps are the same: You +should download and install MinGW/MSYS and Python. -Run install-mingw.sh specifying the destination directory, e.g. +Installing installing the relevant parts of MinGW/MSYS can be fairly +time consuming - there are dozens of files to be downloaded and +unpacked. To make it easier, we provide a bash script that automates +this process, in `rabbitmq-c/etc/install-mingw.sh`. You can run this +script under cygwin or Linux (obviously if you use Linux you'll need +to transfer the resulting files over to the Windows machine). + +Note that some MinGW packages are .tar.lzma files, so it requires a +system with the xz compression utility and a tar that supports the -J +option. Recent cygwin and Linux distros should be fine here. + +Run the install-mingw.sh script specifying the destination directory, +e.g. $ etc/install-mingw.sh mingw -Python is needed for the rabbitmq-c build, so you will also need to -install python under Windows. The Windows installer from python.org +This will download all the required MinGW/MSYS packages, and unpack +them into the `mingw` directory. + +The other prerequisite for the rabbitmq-c build is Python. The +Windows installer from python.org for the latest 2.x version of Python will do fine. -You will need to copy the source code for rabbitmq-c and -rabbitmq-codegen somewhere under your mingw directory. +You will also need to copy the source code for rabbitmq-c and +rabbitmq-codegen somewhere under your `mingw` directory. -Open a cmd window, and ensure that both the MinGW bin directory and -the python install directory are in the path, e.g. +Then to start the MSYS bash shell, open a `cmd` window, and ensure +that both the MinGW bin directory and the python install directory are +in the path, e.g. - C:\>set PATH=%PATH%;C:\mingw\bin;C:\Python26 + C:\>set PATH=%PATH%;C:\mingw\bin;C:\Python27 Then start bash, and run the following mount command (substituting the -Windows path of your MinGW install): +Windows path of your MinGW install if it isn't `C:\mingw`): C:\>bash bash-3.1$ mount 'C:\mingw' /mingw -Then go to the rabbitmq-c directory. If you got the rabbitmq-c -directory from Mercurial (which is the only way to get it at the -moment), you will need to run autoreconf to produce the configuration -scripts: +Finally, go to wherever you copied the rabbitmq-c source. + + bash-3.1$ cd /rabbitmq-c + + +## Building rabbitmq-c with Microsoft's C compiler + +The Microsoft C/C++ compiler is part of MS Visual Studio, including +the gratis Visual Studio Express. Visual Studio 2005 and higher are +known to work. + +Start by following the steps in the previous section. The GNU build +tools have limited support for Microsoft toolchain, but the +install-mingw.sh script will install versions of the packages that are +known to be suitable. In particular, only libtool version 2.2.7a is +known to work; later versions have been reported to introduce +problems. + +Once you are at the bash prompt, build rabbitmq-c by running the +script in `rabbitmq-c/etc/build-ms.sh`: - bash-3.1$ autoreconf -i + bash-3.1$ etc/build-ms.sh -This will produce a few lines of informational output while it runs, -but as long as it doesn't mention any errors, you are ok. +You should end up with a directory `build` containing the librabbitmq +DLL, the corresponding .lib file, and header files. These are +sufficient to create applications using librabbitmq within Visual +Studio. -Finally, configure and make: - bash-3.1$ ./configure && make - [...] +## Building rabbitmq-c with gcc +There is no script to build rabbitmq-c with gcc, but it is as +documented in the README file: -# Running the tools without mingw + bash-3.1$ autoreconf -i && ./configure && make -You can run the resulting tools EXEs without the rest of MinGW. To do -this, copy the following files into a directory: +You can run the resulting tool EXEs without needing the rest of MinGW. To do +this, copy the following files into a single directory: - rabbitmq-c/tools/.libs/*.exe diff --git a/configure.ac b/configure.ac index be7e695..4077362 100644 --- a/configure.ac +++ b/configure.ac @@ -15,9 +15,9 @@ AM_PROG_LIBTOOL dnl Header-file checks AC_HEADER_STDC -dnl Only use -Wall if we have gcc if test "x$GCC" = "xyes"; then - if test -z "`echo "$CFLAGS" | grep "\-Wall" 2> /dev/null`" ; then + dnl Only use -Wall if we have gcc + if ! echo "$CFLAGS" | grep "\-Wall" 2> /dev/null ; then CFLAGS="$CFLAGS -Wall" fi fi @@ -35,6 +35,16 @@ AS_IF([test "x$windows" = xyes], [AC_DEFINE([WINDOWS], [1], [Define to 1 if on Windows.])] ) +AM_CONDITIONAL(GCC, test "x$GCC" = xyes) + +# Detect how to declare inline functions. Because we will sometimes +# use "-ansi -pedantic" with gcc, we need to make sure the result will +# work in that context. +orig_cflags="$CFLAGS" +AS_IF([test "x$GCC" = "xyes"], [CFLAGS="$CFLAGS -ansi -pedantic"]) +AC_C_INLINE +CFLAGS="$orig_cflags" + dnl Decide which API abstraction layer to use PLATFORM_DIR=unix if test "x$windows" = xyes ; then @@ -87,10 +97,22 @@ AC_SUBST(AMQP_CODEGEN_DIR) AC_SUBST(AMQP_SPEC_JSON_PATH) AC_SUBST(PYTHON) -dnl Decide which extra win32 libs we need +dnl Decide which extra win32 libs we need, and handle other special +dnl cases when building with the Microsoft compiler EXTRA_LIBS= -AS_IF([test "x$windows" = xyes], [EXTRA_LIBS="-lws2_32 $EXTRA_LIBS"]) +USE_MISINTTYPES= +AS_IF([test "x$windows" = xyes], + [ + AS_IF([test "x$GCC" = xyes], + [EXTRA_LIBS="-lws2_32 $EXTRA_LIBS"], + [ + EXTRA_LIBS="ws2_32.lib $EXTRA_LIBS" + USE_MSINTTYPES=yes + ]) + ]) + AC_SUBST(EXTRA_LIBS) +AM_CONDITIONAL(USE_MSINTTYPES, test "x$USE_MSINTTYPES" != "x") dnl Check for libpopt, which we need to build the tools AC_ARG_WITH([popt], diff --git a/etc/build-ms.sh b/etc/build-ms.sh new file mode 100755 index 0000000..4a70b35 --- /dev/null +++ b/etc/build-ms.sh @@ -0,0 +1,62 @@ +#!/bin/bash + +# Build rabbitmq-c using Microsoft's C compiler + +set -e + +# Locate the necessary lib and include directories + +drive=$(echo "$SYSTEMDRIVE" | sed 's|^\([A-Za-z]\):$|/\1|') + +for vsvers in 10.0 9.0 8 ; do + vsdir="$drive/Program Files/Microsoft Visual Studio $vsvers" + [ -x "$vsdir/VC/bin/cl.exe" ] && break + + vsdir="$drive/Program Files (x86)/Microsoft Visual Studio $vsvers" + [ -x "$vsdir/VC/bin/cl.exe" ] && break + + vsdir= +done + +if [ -z "$vsdir" ] ; then + echo "Couldn't find suitable Visual Studio installation" + exit 1 +fi + +echo "Using Visual Studio install at $vsdir" + +for sdkpath in "Microsoft SDKs/Windows/"{v7.0A,v6.0A} "Microsoft Visual Studio 8/VC/PlatformSDK" ; do + sdkdir="$drive/Program Files/$sdkpath" + [ -d "$sdkdir/lib" -a -d "$sdkdir/include" ] && break + + sdkdir="$drive/Program Files (x86)/$sdkpath" + [ -d "$sdkdir/lib" -a -d "$sdkdir/include" ] && break + + sdkdir= +done + +if [ -z "$sdkdir" ] ; then + echo "Couldn't find suitable Windows SDK installation" + exit 1 +fi + +echo "Using Windows SDK install at $sdkdir" + +PATH="$PATH:$vsdir/VC/bin:$vsdir/Common7/IDE" +LIB="$vsdir/VC/lib:$sdkdir/lib" +INCLUDE="$vsdir/VC/include:$sdkdir/include" +export PATH LIB INCLUDE + +# Do the build +set -x +autoreconf -i +./configure CC=cl.exe LD=link.exe CFLAGS='-nologo' +sed -i -e 's/^fix_srcfile_path=.*$/fix_srcfile_path=""/;s/^deplibs_check_method=.*$/deplibs_check_method=pass_all/;/^archive_cmds=/s/-link -dll/& -implib:\\$libname.\\$libext/' libtool +make + +# Copy the results of the build into one place, as "make install" +# isn't too useful here. +mkdir -p build/lib build/include build/bin +cp -a librabbitmq/.libs/*.dll examples/.libs/*.exe build/bin +cp -a msinttypes/*.h librabbitmq/amqp.h librabbitmq/amqp_framing.h build/include +cp -a librabbitmq/*.exp librabbitmq/*.lib build/lib diff --git a/examples/Makefile.am b/examples/Makefile.am index 9655021..ea10f77 100644 --- a/examples/Makefile.am +++ b/examples/Makefile.am @@ -1,16 +1,29 @@ -noinst_PROGRAMS = amqp_sendstring amqp_exchange_declare amqp_listen amqp_producer amqp_consumer \ - amqp_unbind amqp_bind amqp_listenq +noinst_PROGRAMS = amqp_sendstring amqp_exchange_declare amqp_listen amqp_producer amqp_consumer amqp_unbind amqp_bind amqp_listenq AM_CFLAGS = -I$(top_srcdir)/librabbitmq + +if GCC +# Because we want to build under Microsoft's C compiler (for which +# there is apparently no demand for C99 support), it's a good idea +# to have gcc tell us when we stray from the old standard. +AM_CFLAGS += -ansi -pedantic +endif + +if USE_MSINTTYPES +AM_CFLAGS += -I$(top_srcdir)/msinttypes +endif + AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la -noinst_HEADERS = example_utils.h +noinst_HEADERS = utils.h + +COMMON_SOURCES = utils.c $(PLATFORM_DIR)/platform_utils.c -amqp_sendstring_SOURCES = amqp_sendstring.c example_utils.c -amqp_exchange_declare_SOURCES = amqp_exchange_declare.c example_utils.c -amqp_listen_SOURCES = amqp_listen.c example_utils.c -amqp_producer_SOURCES = amqp_producer.c example_utils.c -amqp_consumer_SOURCES = amqp_consumer.c example_utils.c -amqp_unbind_SOURCES = amqp_unbind.c example_utils.c -amqp_bind_SOURCES = amqp_bind.c example_utils.c -amqp_listenq_SOURCES = amqp_listenq.c example_utils.c +amqp_sendstring_SOURCES = amqp_sendstring.c $(COMMON_SOURCES) +amqp_exchange_declare_SOURCES = amqp_exchange_declare.c $(COMMON_SOURCES) +amqp_listen_SOURCES = amqp_listen.c $(COMMON_SOURCES) +amqp_producer_SOURCES = amqp_producer.c $(COMMON_SOURCES) +amqp_consumer_SOURCES = amqp_consumer.c $(COMMON_SOURCES) +amqp_unbind_SOURCES = amqp_unbind.c $(COMMON_SOURCES) +amqp_bind_SOURCES = amqp_bind.c $(COMMON_SOURCES) +amqp_listenq_SOURCES = amqp_listenq.c $(COMMON_SOURCES) diff --git a/examples/amqp_bind.c b/examples/amqp_bind.c index 1f183a5..561b84d 100644 --- a/examples/amqp_bind.c +++ b/examples/amqp_bind.c @@ -56,9 +56,7 @@ #include <amqp.h> #include <amqp_framing.h> -#include <unistd.h> - -#include "example_utils.h" +#include "utils.h" int main(int argc, char const * const *argv) { char const *hostname; @@ -94,7 +92,7 @@ int main(int argc, char const * const *argv) { amqp_cstring_bytes(queue), amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), - AMQP_EMPTY_TABLE); + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding"); die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c index b1754f5..2710643 100644 --- a/examples/amqp_consumer.c +++ b/examples/amqp_consumer.c @@ -56,27 +56,26 @@ #include <amqp.h> #include <amqp_framing.h> -#include <unistd.h> #include <assert.h> -#include "example_utils.h" +#include "utils.h" #define SUMMARY_EVERY_US 1000000 static void run(amqp_connection_state_t conn) { - long long start_time = now_microseconds(); + uint64_t start_time = now_microseconds(); int received = 0; int previous_received = 0; - long long previous_report_time = start_time; - long long next_summary_time = start_time + SUMMARY_EVERY_US; + uint64_t previous_report_time = start_time; + uint64_t next_summary_time = start_time + SUMMARY_EVERY_US; amqp_frame_t frame; int result; size_t body_received; size_t body_target; - long long now; + uint64_t now; while (1) { now = now_microseconds(); @@ -150,8 +149,8 @@ int main(int argc, char const * const *argv) { hostname = argv[1]; port = atoi(argv[2]); - exchange = "amq.direct"; //argv[3]; - bindingkey = "test queue"; //argv[4]; + exchange = "amq.direct"; /* argv[3]; */ + bindingkey = "test queue"; /* argv[4]; */ conn = amqp_new_connection(); @@ -163,8 +162,8 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); { - amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES, 0, 0, 0, 1, - AMQP_EMPTY_TABLE); + amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); queuename = amqp_bytes_malloc_dup(r->queue); if (queuename.bytes == NULL) { @@ -174,10 +173,10 @@ int main(int argc, char const * const *argv) { } amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), - AMQP_EMPTY_TABLE); + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); - amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0, AMQP_EMPTY_TABLE); + amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); run(conn); diff --git a/examples/amqp_exchange_declare.c b/examples/amqp_exchange_declare.c index e77ac52..9b410ca 100644 --- a/examples/amqp_exchange_declare.c +++ b/examples/amqp_exchange_declare.c @@ -56,9 +56,7 @@ #include <amqp.h> #include <amqp_framing.h> -#include <unistd.h> - -#include "example_utils.h" +#include "utils.h" int main(int argc, char const * const *argv) { char const *hostname; @@ -89,7 +87,7 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(exchangetype), - 0, 0, AMQP_EMPTY_TABLE); + 0, 0, amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange"); die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c index f208652..86c4555 100644 --- a/examples/amqp_listen.c +++ b/examples/amqp_listen.c @@ -56,13 +56,9 @@ #include <amqp.h> #include <amqp_framing.h> -#include <unistd.h> #include <assert.h> -#include "example_utils.h" - -/* Private: compiled out in NDEBUG mode */ -extern void amqp_dump(void const *buffer, size_t len); +#include "utils.h" int main(int argc, char const * const *argv) { char const *hostname; @@ -95,8 +91,8 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); { - amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES, 0, 0, 0, 1, - AMQP_EMPTY_TABLE); + amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); queuename = amqp_bytes_malloc_dup(r->queue); if (queuename.bytes == NULL) { @@ -106,10 +102,10 @@ int main(int argc, char const * const *argv) { } amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), - AMQP_EMPTY_TABLE); + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); - amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0, AMQP_EMPTY_TABLE); + amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); { diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c index 98c389f..c7c2a1e 100644 --- a/examples/amqp_listenq.c +++ b/examples/amqp_listenq.c @@ -56,13 +56,9 @@ #include <amqp.h> #include <amqp_framing.h> -#include <unistd.h> #include <assert.h> -#include "example_utils.h" - -/* Private: compiled out in NDEBUG mode */ -extern void amqp_dump(void const *buffer, size_t len); +#include "utils.h" int main(int argc, char const * const *argv) { char const *hostname; @@ -90,7 +86,7 @@ int main(int argc, char const * const *argv) { amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); - amqp_basic_consume(conn, 1, amqp_cstring_bytes(queuename), AMQP_EMPTY_BYTES, 0, 0, 0, AMQP_EMPTY_TABLE); + amqp_basic_consume(conn, 1, amqp_cstring_bytes(queuename), amqp_empty_bytes, 0, 0, 0, amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); { diff --git a/examples/amqp_producer.c b/examples/amqp_producer.c index b83e030..77c3e92 100644 --- a/examples/amqp_producer.c +++ b/examples/amqp_producer.c @@ -56,9 +56,7 @@ #include <amqp.h> #include <amqp_framing.h> -#include <unistd.h> - -#include "example_utils.h" +#include "utils.h" #define SUMMARY_EVERY_US 1000000 @@ -67,21 +65,26 @@ static void send_batch(amqp_connection_state_t conn, int rate_limit, int message_count) { - long long start_time = now_microseconds(); + uint64_t start_time = now_microseconds(); int i; int sent = 0; int previous_sent = 0; - long long previous_report_time = start_time; - long long next_summary_time = start_time + SUMMARY_EVERY_US; + uint64_t previous_report_time = start_time; + uint64_t next_summary_time = start_time + SUMMARY_EVERY_US; char message[256]; + amqp_bytes_t message_bytes; for (i = 0; i < sizeof(message); i++) { message[i] = i & 0xff; } + message_bytes.len = sizeof(message); + message_bytes.bytes = message; + for (i = 0; i < message_count; i++) { - long long now = now_microseconds(); + uint64_t now = now_microseconds(); + die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), @@ -89,7 +92,7 @@ static void send_batch(amqp_connection_state_t conn, 0, 0, NULL, - (amqp_bytes_t) {.len = sizeof(message), .bytes = message}), + message_bytes), "Publishing"); sent++; if (now > next_summary_time) { @@ -104,13 +107,13 @@ static void send_batch(amqp_connection_state_t conn, } while (((i * 1000000.0) / (now - start_time)) > rate_limit) { - usleep(2000); + microsleep(2000); now = now_microseconds(); } } { - long long stop_time = now_microseconds(); + uint64_t stop_time = now_microseconds(); int total_delta = stop_time - start_time; printf("PRODUCER - Message count: %d\n", message_count); diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c index ccd3866..3c94e41 100644 --- a/examples/amqp_sendstring.c +++ b/examples/amqp_sendstring.c @@ -56,9 +56,7 @@ #include <amqp.h> #include <amqp_framing.h> -#include <unistd.h> - -#include "example_utils.h" +#include "utils.h" int main(int argc, char const * const *argv) { char const *hostname; @@ -94,7 +92,7 @@ int main(int argc, char const * const *argv) { amqp_basic_properties_t props; props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; props.content_type = amqp_cstring_bytes("text/plain"); - props.delivery_mode = 2; // persistent delivery mode + props.delivery_mode = 2; /* persistent delivery mode */ die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange), diff --git a/examples/amqp_unbind.c b/examples/amqp_unbind.c index 4b92e12..fad3e2b 100644 --- a/examples/amqp_unbind.c +++ b/examples/amqp_unbind.c @@ -56,9 +56,7 @@ #include <amqp.h> #include <amqp_framing.h> -#include <unistd.h> - -#include "example_utils.h" +#include "utils.h" int main(int argc, char const * const *argv) { char const *hostname; @@ -94,7 +92,7 @@ int main(int argc, char const * const *argv) { amqp_cstring_bytes(queue), amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), - AMQP_EMPTY_TABLE); + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding"); die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); diff --git a/examples/example_utils.c b/examples/unix/platform_utils.c index 48f21f9..b039c31 100644 --- a/examples/example_utils.c +++ b/examples/unix/platform_utils.c @@ -48,69 +48,22 @@ * ***** END LICENSE BLOCK ***** */ -#include <stdlib.h> -#include <stdio.h> -#include <string.h> +/* For usleep */ +#define _BSD_SOURCE #include <stdint.h> -#include <amqp.h> -#include <amqp_framing.h> #include <sys/time.h> #include <unistd.h> -void die_on_error(int x, char const *context) { - if (x < 0) { - char *errstr = amqp_error_string(-x); - fprintf(stderr, "%s: %s\n", context, errstr); - free(errstr); - exit(1); - } -} - -void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) { - switch (x.reply_type) { - case AMQP_RESPONSE_NORMAL: - return; - - case AMQP_RESPONSE_NONE: - fprintf(stderr, "%s: missing RPC reply type!", context); - break; - - case AMQP_RESPONSE_LIBRARY_EXCEPTION: - fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error)); - break; - - case AMQP_RESPONSE_SERVER_EXCEPTION: - switch (x.reply.id) { - case AMQP_CONNECTION_CLOSE_METHOD: { - amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded; - fprintf(stderr, "%s: server connection error %d, message: %.*s", - context, - m->reply_code, - (int) m->reply_text.len, (char *) m->reply_text.bytes); - break; - } - case AMQP_CHANNEL_CLOSE_METHOD: { - amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded; - fprintf(stderr, "%s: server channel error %d, message: %.*s", - context, - m->reply_code, - (int) m->reply_text.len, (char *) m->reply_text.bytes); - break; - } - default: - fprintf(stderr, "%s: unknown server error, method id 0x%08X", context, x.reply.id); - break; - } - break; - } - - exit(1); -} - -long long now_microseconds(void) { +uint64_t now_microseconds(void) +{ struct timeval tv; gettimeofday(&tv, NULL); - return (long long) tv.tv_sec * 1000000 + (long long) tv.tv_usec; + return (uint64_t) tv.tv_sec * 1000000 + (uint64_t) tv.tv_usec; +} + +void microsleep(int usec) +{ + usleep(usec); } diff --git a/librabbitmq/amqp_debug.c b/examples/utils.c index 7258696..1837be8 100644 --- a/librabbitmq/amqp_debug.c +++ b/examples/utils.c @@ -48,12 +48,67 @@ * ***** END LICENSE BLOCK ***** */ -#include <stdio.h> #include <stdlib.h> +#include <stdio.h> #include <string.h> - #include <ctype.h> +#include <stdint.h> +#include <amqp.h> +#include <amqp_framing.h> + +#include "utils.h" + +void die_on_error(int x, char const *context) { + if (x < 0) { + char *errstr = amqp_error_string(-x); + fprintf(stderr, "%s: %s\n", context, errstr); + free(errstr); + exit(1); + } +} + +void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) { + switch (x.reply_type) { + case AMQP_RESPONSE_NORMAL: + return; + + case AMQP_RESPONSE_NONE: + fprintf(stderr, "%s: missing RPC reply type!\n", context); + break; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error)); + break; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + switch (x.reply.id) { + case AMQP_CONNECTION_CLOSE_METHOD: { + amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded; + fprintf(stderr, "%s: server connection error %d, message: %.*s\n", + context, + m->reply_code, + (int) m->reply_text.len, (char *) m->reply_text.bytes); + break; + } + case AMQP_CHANNEL_CLOSE_METHOD: { + amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded; + fprintf(stderr, "%s: server channel error %d, message: %.*s\n", + context, + m->reply_code, + (int) m->reply_text.len, (char *) m->reply_text.bytes); + break; + } + default: + fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id); + break; + } + break; + } + + exit(1); +} + static void dump_row(long count, int numinrow, int *chs) { int i; diff --git a/examples/example_utils.h b/examples/utils.h index 3dd8ec1..e5ac9a3 100644 --- a/examples/example_utils.h +++ b/examples/utils.h @@ -1,5 +1,5 @@ -#ifndef librabbitmq_examples_example_utils_h -#define librabbitmq_examples_example_utils_h +#ifndef librabbitmq_examples_utils_h +#define librabbitmq_examples_utils_h /* * ***** BEGIN LICENSE BLOCK ***** @@ -54,6 +54,9 @@ extern void die_on_error(int x, char const *context); extern void die_on_amqp_error(amqp_rpc_reply_t x, char const *context); -extern long long now_microseconds(void); +extern void amqp_dump(void const *buffer, size_t len); + +extern uint64_t now_microseconds(void); +extern void microsleep(int usec); #endif diff --git a/examples/windows/platform_utils.c b/examples/windows/platform_utils.c new file mode 100644 index 0000000..915dd27 --- /dev/null +++ b/examples/windows/platform_utils.c @@ -0,0 +1,66 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2009 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2009 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <stdint.h> + +#include <windows.h> + +uint64_t now_microseconds(void) +{ + FILETIME ft; + GetSystemTimeAsFileTime(&ft); + return (((uint64_t)ft.dwHighDateTime << 32) | (uint64_t)ft.dwLowDateTime) + / 10; +} + +void microsleep(int usec) +{ + Sleep(usec / 1000); +} diff --git a/librabbitmq/Makefile.am b/librabbitmq/Makefile.am index 82b9f30..a0f8987 100644 --- a/librabbitmq/Makefile.am +++ b/librabbitmq/Makefile.am @@ -1,7 +1,19 @@ lib_LTLIBRARIES = librabbitmq.la -AM_CFLAGS = -I$(srcdir)/$(PLATFORM_DIR) -librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_debug.c amqp_api.c $(PLATFORM_DIR)/socket.c +AM_CFLAGS = -I$(srcdir)/$(PLATFORM_DIR) -DBUILDING_LIBRABBITMQ + +if GCC +# Because we want to build under Microsoft's C compiler (for which +# there is apparently no demand for C99 support), it's a good idea +# to have gcc tell us when we stray from the old standard. +AM_CFLAGS += -ansi -pedantic +endif + +if USE_MSINTTYPES +AM_CFLAGS += -I$(top_srcdir)/msinttypes +endif + +librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_api.c $(PLATFORM_DIR)/socket.c librabbitmq_la_LDFLAGS = -no-undefined librabbitmq_la_LIBADD = $(EXTRA_LIBS) nodist_librabbitmq_la_SOURCES = amqp_framing.c diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 40c8292..1541583 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -55,6 +55,16 @@ extern "C" { #endif +#ifdef _WIN32 +#ifdef BUILDING_LIBRABBITMQ +#define RABBITMQ_EXPORT extern __declspec(dllexport) +#else +#define RABBITMQ_EXPORT extern __declspec(dllimport) +#endif +#else +#define RABBITMQ_EXPORT extern +#endif + typedef int amqp_boolean_t; typedef uint32_t amqp_method_number_t; typedef uint32_t amqp_flags_t; @@ -65,29 +75,21 @@ typedef struct amqp_bytes_t_ { void *bytes; } amqp_bytes_t; -#define AMQP_EMPTY_BYTES ((amqp_bytes_t) { .len = 0, .bytes = NULL }) - typedef struct amqp_decimal_t_ { - int decimals; + uint8_t decimals; uint32_t value; } amqp_decimal_t; -#define AMQP_DECIMAL(d,v) ((amqp_decimal_t) { .decimals = (d), .value = (v) }) - typedef struct amqp_table_t_ { int num_entries; struct amqp_table_entry_t_ *entries; } amqp_table_t; -#define AMQP_EMPTY_TABLE ((amqp_table_t) { .num_entries = 0, .entries = NULL }) - typedef struct amqp_array_t_ { int num_entries; struct amqp_field_value_t_ *entries; } amqp_array_t; -#define AMQP_EMPTY_ARRAY ((amqp_array_t) { .num_entries = 0, .entries = NULL }) - /* 0-9 0-9-1 Qpid/Rabbit Type Remarks --------------------------------------------------------------------------- @@ -128,7 +130,7 @@ the code. */ typedef struct amqp_field_value_t_ { - char kind; + uint8_t kind; union { amqp_boolean_t boolean; int8_t i8; @@ -162,6 +164,7 @@ typedef enum { AMQP_FIELD_KIND_I32 = 'I', AMQP_FIELD_KIND_U32 = 'i', AMQP_FIELD_KIND_I64 = 'l', + AMQP_FIELD_KIND_U64 = 'L', AMQP_FIELD_KIND_F32 = 'f', AMQP_FIELD_KIND_F64 = 'd', AMQP_FIELD_KIND_DECIMAL = 'D', @@ -170,47 +173,9 @@ typedef enum { AMQP_FIELD_KIND_TIMESTAMP = 'T', AMQP_FIELD_KIND_TABLE = 'F', AMQP_FIELD_KIND_VOID = 'V', - AMQP_FIELD_KIND_BYTES = 'x', + AMQP_FIELD_KIND_BYTES = 'x' } amqp_field_value_kind_t; -#define _AMQP_TEINIT(ke,ki,v) {.key = (ke), .value = {.kind = AMQP_FIELD_KIND_##ki, .value = {v}}} -#define AMQP_TABLE_ENTRY_BOOLEAN(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), BOOLEAN, .boolean = (v)) -#define AMQP_TABLE_ENTRY_I8(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), I8, .i8 = (v)) -#define AMQP_TABLE_ENTRY_U8(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), U8, .u8 = (v)) -#define AMQP_TABLE_ENTRY_I16(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), I16, .i16 = (v)) -#define AMQP_TABLE_ENTRY_U16(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), U16, .u16 = (v)) -#define AMQP_TABLE_ENTRY_I32(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), I32, .i32 = (v)) -#define AMQP_TABLE_ENTRY_U32(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), U32, .u32 = (v)) -#define AMQP_TABLE_ENTRY_I64(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), I64, .i64 = (v)) -#define AMQP_TABLE_ENTRY_F32(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), F32, .f32 = (v)) -#define AMQP_TABLE_ENTRY_F64(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), F64, .f64 = (v)) -#define AMQP_TABLE_ENTRY_DECIMAL(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), DECIMAL, .decimal = (v)) -#define AMQP_TABLE_ENTRY_UTF8(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), UTF8, .bytes = (v)) -#define AMQP_TABLE_ENTRY_ARRAY(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), ARRAY, .array = (v)) -#define AMQP_TABLE_ENTRY_TIMESTAMP(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), TIMESTAMP, .u64 = (v)) -#define AMQP_TABLE_ENTRY_TABLE(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), TABLE, .table = (v)) -#define AMQP_TABLE_ENTRY_VOID(k) _AMQP_TEINIT(amqp_cstring_bytes(k), VOID, .u8 = 0) -#define AMQP_TABLE_ENTRY_BYTES(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), BYTES, .bytes = (v)) - -#define _AMQP_FVINIT(ki,v) {.kind = AMQP_FIELD_KIND_##ki, .value = {v}} -#define AMQP_FIELD_VALUE_BOOLEAN(v) _AMQP_FVINIT(BOOLEAN, .boolean = (v)) -#define AMQP_FIELD_VALUE_I8(v) _AMQP_FVINIT(I8, .i8 = (v)) -#define AMQP_FIELD_VALUE_U8(v) _AMQP_FVINIT(U8, .u8 = (v)) -#define AMQP_FIELD_VALUE_I16(v) _AMQP_FVINIT(I16, .i16 = (v)) -#define AMQP_FIELD_VALUE_U16(v) _AMQP_FVINIT(U16, .u16 = (v)) -#define AMQP_FIELD_VALUE_I32(v) _AMQP_FVINIT(I32, .i32 = (v)) -#define AMQP_FIELD_VALUE_U32(v) _AMQP_FVINIT(U32, .u32 = (v)) -#define AMQP_FIELD_VALUE_I64(v) _AMQP_FVINIT(I64, .i64 = (v)) -#define AMQP_FIELD_VALUE_F32(v) _AMQP_FVINIT(F32, .f32 = (v)) -#define AMQP_FIELD_VALUE_F64(v) _AMQP_FVINIT(F64, .f64 = (v)) -#define AMQP_FIELD_VALUE_DECIMAL(v) _AMQP_FVINIT(DECIMAL, .decimal = (v)) -#define AMQP_FIELD_VALUE_UTF8(v) _AMQP_FVINIT(UTF8, .bytes = (v)) -#define AMQP_FIELD_VALUE_ARRAY(v) _AMQP_FVINIT(ARRAY, .array = (v)) -#define AMQP_FIELD_VALUE_TIMESTAMP(v) _AMQP_FVINIT(TIMESTAMP, .u64 = (v)) -#define AMQP_FIELD_VALUE_TABLE(v) _AMQP_FVINIT(TABLE, .table = (v)) -#define AMQP_FIELD_VALUE_VOID(k) _AMQP_FVINIT(VOID, .u8 = 0) -#define AMQP_FIELD_VALUE_BYTES(v) _AMQP_FVINIT(BYTES, .bytes = (v)) - typedef struct amqp_pool_blocklist_t_ { int num_blocks; void **blocklist; @@ -270,211 +235,198 @@ typedef enum amqp_sasl_method_enum_ { AMQP_SASL_METHOD_PLAIN = 0 } amqp_sasl_method_enum; -#define AMQP_PSEUDOFRAME_PROTOCOL_HEADER ((uint8_t) 'A') -#define AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL ((amqp_channel_t) ((((int) 'M') << 8) | ((int) 'Q'))) - -typedef int (*amqp_output_fn_t)(void *context, void *buffer, size_t count); - /* Opaque struct. */ typedef struct amqp_connection_state_t_ *amqp_connection_state_t; -extern char const *amqp_version(void); - -extern void init_amqp_pool(amqp_pool_t *pool, size_t pagesize); -extern void recycle_amqp_pool(amqp_pool_t *pool); -extern void empty_amqp_pool(amqp_pool_t *pool); - -extern void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount); -extern void amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output); - -extern amqp_bytes_t amqp_cstring_bytes(char const *cstr); -extern amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src); -extern amqp_bytes_t amqp_bytes_malloc(size_t amount); -extern void amqp_bytes_free(amqp_bytes_t bytes); - -#define AMQP_BYTES_FREE(b) \ - ({ \ - if ((b).bytes != NULL) { \ - free((b).bytes); \ - (b).bytes = NULL; \ - } \ - }) - -extern amqp_connection_state_t amqp_new_connection(void); -extern int amqp_get_sockfd(amqp_connection_state_t state); -extern void amqp_set_sockfd(amqp_connection_state_t state, - int sockfd); -extern int amqp_tune_connection(amqp_connection_state_t state, - int channel_max, - int frame_max, - int heartbeat); -extern int amqp_get_channel_max(amqp_connection_state_t state); -extern int amqp_destroy_connection(amqp_connection_state_t state); - -extern int amqp_handle_input(amqp_connection_state_t state, - amqp_bytes_t received_data, - amqp_frame_t *decoded_frame); - -extern amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state); - -extern void amqp_release_buffers(amqp_connection_state_t state); - -extern void amqp_maybe_release_buffers(amqp_connection_state_t state); - -extern int amqp_send_frame(amqp_connection_state_t state, - amqp_frame_t const *frame); -extern int amqp_send_frame_to(amqp_connection_state_t state, - amqp_frame_t const *frame, - amqp_output_fn_t fn, - void *context); - -extern int amqp_table_entry_cmp(void const *entry1, void const *entry2); - -extern int amqp_open_socket(char const *hostname, int portnumber); - -extern int amqp_send_header(amqp_connection_state_t state); -extern int amqp_send_header_to(amqp_connection_state_t state, - amqp_output_fn_t fn, - void *context); - -extern amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state); - -extern int amqp_simple_wait_frame(amqp_connection_state_t state, - amqp_frame_t *decoded_frame); - -extern int amqp_simple_wait_method(amqp_connection_state_t state, - amqp_channel_t expected_channel, - amqp_method_number_t expected_method, - amqp_method_t *output); - -extern int amqp_send_method(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t id, - void *decoded); - -extern amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t *expected_reply_ids, - void *decoded_request_method); - -#define AMQP_EXPAND_METHOD(classname, methodname) (AMQP_ ## classname ## _ ## methodname ## _METHOD) - -#define AMQP_SIMPLE_RPC(state, channel, classname, requestname, replyname, structname, ...) \ - ({ \ - structname _simple_rpc_request___ = (structname) { __VA_ARGS__ }; \ - amqp_method_number_t _replies__[2] = { AMQP_EXPAND_METHOD(classname, replyname), 0}; \ - amqp_simple_rpc(state, channel, \ - AMQP_EXPAND_METHOD(classname, requestname), \ - (amqp_method_number_t *)&_replies__, \ - &_simple_rpc_request___); \ - }) - -#define AMQP_MULTIPLE_RESPONSE_RPC(state, channel, classname, requestname, replynames, structname, ...) \ - ({ \ - structname _simple_rpc_request___ = (structname) { __VA_ARGS__ }; \ - amqp_simple_rpc(state, channel, \ - AMQP_EXPAND_METHOD(classname, requestname), \ - replynames, \ - &_simple_rpc_request___); \ - }) - - -extern amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, - char const *vhost, - int channel_max, - int frame_max, - int heartbeat, - amqp_sasl_method_enum sasl_method, ...); - -extern struct amqp_channel_open_ok_t_ *amqp_channel_open(amqp_connection_state_t state, - amqp_channel_t channel); +RABBITMQ_EXPORT char const *amqp_version(void); + +/* Exported empty data structures */ +RABBITMQ_EXPORT const amqp_bytes_t amqp_empty_bytes; +RABBITMQ_EXPORT const amqp_table_t amqp_empty_table; +RABBITMQ_EXPORT const amqp_array_t amqp_empty_array; + +/* Compatibility macros for the above, to avoid the need to update + code written against earlier versions of librabbitmq. */ +#define AMQP_EMPTY_BYTES amqp_empty_bytes +#define AMQP_EMPTY_TABLE amqp_empty_table +#define AMQP_EMPTY_ARRAY amqp_empty_array + +RABBITMQ_EXPORT void init_amqp_pool(amqp_pool_t *pool, size_t pagesize); +RABBITMQ_EXPORT void recycle_amqp_pool(amqp_pool_t *pool); +RABBITMQ_EXPORT void empty_amqp_pool(amqp_pool_t *pool); + +RABBITMQ_EXPORT void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount); +RABBITMQ_EXPORT void amqp_pool_alloc_bytes(amqp_pool_t *pool, + size_t amount, amqp_bytes_t *output); + +RABBITMQ_EXPORT amqp_bytes_t amqp_cstring_bytes(char const *cstr); +RABBITMQ_EXPORT amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src); +RABBITMQ_EXPORT amqp_bytes_t amqp_bytes_malloc(size_t amount); +RABBITMQ_EXPORT void amqp_bytes_free(amqp_bytes_t bytes); + +RABBITMQ_EXPORT amqp_connection_state_t amqp_new_connection(void); +RABBITMQ_EXPORT int amqp_get_sockfd(amqp_connection_state_t state); +RABBITMQ_EXPORT void amqp_set_sockfd(amqp_connection_state_t state, + int sockfd); +RABBITMQ_EXPORT int amqp_tune_connection(amqp_connection_state_t state, + int channel_max, + int frame_max, + int heartbeat); +RABBITMQ_EXPORT int amqp_get_channel_max(amqp_connection_state_t state); +RABBITMQ_EXPORT int amqp_destroy_connection(amqp_connection_state_t state); + +RABBITMQ_EXPORT int amqp_handle_input(amqp_connection_state_t state, + amqp_bytes_t received_data, + amqp_frame_t *decoded_frame); + +RABBITMQ_EXPORT amqp_boolean_t amqp_release_buffers_ok( + amqp_connection_state_t state); + +RABBITMQ_EXPORT void amqp_release_buffers(amqp_connection_state_t state); + +RABBITMQ_EXPORT void amqp_maybe_release_buffers(amqp_connection_state_t state); + +RABBITMQ_EXPORT int amqp_send_frame(amqp_connection_state_t state, + amqp_frame_t const *frame); + +RABBITMQ_EXPORT int amqp_table_entry_cmp(void const *entry1, + void const *entry2); + +RABBITMQ_EXPORT int amqp_open_socket(char const *hostname, + int portnumber); + +RABBITMQ_EXPORT int amqp_send_header(amqp_connection_state_t state); + +RABBITMQ_EXPORT amqp_boolean_t amqp_frames_enqueued( + amqp_connection_state_t state); + +RABBITMQ_EXPORT int amqp_simple_wait_frame(amqp_connection_state_t state, + amqp_frame_t *decoded_frame); + +RABBITMQ_EXPORT int amqp_simple_wait_method(amqp_connection_state_t state, + amqp_channel_t expected_channel, + amqp_method_number_t expected_method, + amqp_method_t *output); + +RABBITMQ_EXPORT int amqp_send_method(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_method_number_t id, + void *decoded); + +RABBITMQ_EXPORT amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t *expected_reply_ids, + void *decoded_request_method); + +RABBITMQ_EXPORT amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, + char const *vhost, + int channel_max, + int frame_max, + int heartbeat, + amqp_sasl_method_enum sasl_method, ...); + +RABBITMQ_EXPORT struct amqp_channel_open_ok_t_ *amqp_channel_open( + amqp_connection_state_t state, + amqp_channel_t channel); struct amqp_basic_properties_t_; -extern int amqp_basic_publish(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_boolean_t mandatory, - amqp_boolean_t immediate, - struct amqp_basic_properties_t_ const *properties, - amqp_bytes_t body); - -extern amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, - amqp_channel_t channel, - int code); -extern amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, - int code); - -extern struct amqp_exchange_declare_ok_t_ *amqp_exchange_declare(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t exchange, - amqp_bytes_t type, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_table_t arguments); - -extern struct amqp_queue_declare_ok_t_ *amqp_queue_declare(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_boolean_t exclusive, - amqp_boolean_t auto_delete, - amqp_table_t arguments); - -extern struct amqp_queue_delete_ok_t_ *amqp_queue_delete(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t if_unused, - amqp_boolean_t if_empty); - -extern struct amqp_queue_bind_ok_t_ *amqp_queue_bind(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments); - -extern struct amqp_queue_unbind_ok_t_ *amqp_queue_unbind(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments); - -extern struct amqp_basic_consume_ok_t_ *amqp_basic_consume(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t consumer_tag, - amqp_boolean_t no_local, - amqp_boolean_t no_ack, - amqp_boolean_t exclusive, - amqp_table_t filter); - -extern int amqp_basic_ack(amqp_connection_state_t state, - amqp_channel_t channel, - uint64_t delivery_tag, - amqp_boolean_t multiple); - -extern amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t no_ack); - -extern struct amqp_queue_purge_ok_t_ *amqp_queue_purge(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t no_wait); - -extern struct amqp_tx_select_ok_t_ *amqp_tx_select(amqp_connection_state_t state, - amqp_channel_t channel); - -extern struct amqp_tx_commit_ok_t_ *amqp_tx_commit(amqp_connection_state_t state, - amqp_channel_t channel); - -extern struct amqp_tx_rollback_ok_t_ *amqp_tx_rollback(amqp_connection_state_t state, - amqp_channel_t channel); +RABBITMQ_EXPORT int amqp_basic_publish(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t exchange, + amqp_bytes_t routing_key, + amqp_boolean_t mandatory, + amqp_boolean_t immediate, + struct amqp_basic_properties_t_ const *properties, + amqp_bytes_t body); + +RABBITMQ_EXPORT amqp_rpc_reply_t amqp_channel_close( + amqp_connection_state_t state, + amqp_channel_t channel, + int code); +RABBITMQ_EXPORT amqp_rpc_reply_t amqp_connection_close( + amqp_connection_state_t state, + int code); + +RABBITMQ_EXPORT struct amqp_exchange_declare_ok_t_ *amqp_exchange_declare( + amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t exchange, + amqp_bytes_t type, + amqp_boolean_t passive, + amqp_boolean_t durable, + amqp_table_t arguments); + +RABBITMQ_EXPORT struct amqp_queue_declare_ok_t_ *amqp_queue_declare( + amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_boolean_t passive, + amqp_boolean_t durable, + amqp_boolean_t exclusive, + amqp_boolean_t auto_delete, + amqp_table_t arguments); + +RABBITMQ_EXPORT struct amqp_queue_delete_ok_t_ *amqp_queue_delete( + amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_boolean_t if_unused, + amqp_boolean_t if_empty); + +RABBITMQ_EXPORT struct amqp_queue_bind_ok_t_ *amqp_queue_bind( + amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_bytes_t exchange, + amqp_bytes_t routing_key, + amqp_table_t arguments); + +RABBITMQ_EXPORT struct amqp_queue_unbind_ok_t_ *amqp_queue_unbind( + amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_bytes_t exchange, + amqp_bytes_t routing_key, + amqp_table_t arguments); + +RABBITMQ_EXPORT struct amqp_basic_consume_ok_t_ *amqp_basic_consume( + amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_bytes_t consumer_tag, + amqp_boolean_t no_local, + amqp_boolean_t no_ack, + amqp_boolean_t exclusive, + amqp_table_t filter); + +RABBITMQ_EXPORT int amqp_basic_ack(amqp_connection_state_t state, + amqp_channel_t channel, + uint64_t delivery_tag, + amqp_boolean_t multiple); + +RABBITMQ_EXPORT amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_boolean_t no_ack); + +RABBITMQ_EXPORT struct amqp_queue_purge_ok_t_ *amqp_queue_purge( + amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_boolean_t no_wait); + +RABBITMQ_EXPORT struct amqp_tx_select_ok_t_ *amqp_tx_select( + amqp_connection_state_t state, + amqp_channel_t channel); + +RABBITMQ_EXPORT struct amqp_tx_commit_ok_t_ *amqp_tx_commit( + amqp_connection_state_t state, + amqp_channel_t channel); + +RABBITMQ_EXPORT struct amqp_tx_rollback_ok_t_ *amqp_tx_rollback( + amqp_connection_state_t state, + amqp_channel_t channel); /* * Can be used to see if there is data still in the buffer, if so @@ -483,7 +435,8 @@ extern struct amqp_tx_rollback_ok_t_ *amqp_tx_rollback(amqp_connection_state_t s * * Possibly amqp_frames_enqueued should be used for this? */ -extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state); +RABBITMQ_EXPORT amqp_boolean_t amqp_data_in_buffer( + amqp_connection_state_t state); /* * For those API operations (such as amqp_basic_ack, @@ -498,7 +451,8 @@ extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state); * generally do NOT update this per-connection-global amqp_rpc_reply_t * instance. */ -extern amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state); +RABBITMQ_EXPORT amqp_rpc_reply_t amqp_get_rpc_reply( + amqp_connection_state_t state); /* * Get the error string for the given error code. @@ -506,7 +460,16 @@ extern amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state); * The returned string resides on the heap; the caller is responsible * for freeing it. */ -extern char *amqp_error_string(int err); +RABBITMQ_EXPORT char *amqp_error_string(int err); + +RABBITMQ_EXPORT int amqp_decode_table(amqp_bytes_t encoded, + amqp_pool_t *pool, + amqp_table_t *output, + size_t *offset); + +RABBITMQ_EXPORT int amqp_encode_table(amqp_bytes_t encoded, + amqp_table_t *input, + size_t *offset); #ifdef __cplusplus } diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index b2793ff..bf19761 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -52,6 +52,7 @@ #include <stdio.h> #include <string.h> #include <stdint.h> +#include <stdarg.h> #include "amqp.h" #include "amqp_framing.h" @@ -69,6 +70,12 @@ static const char *client_error_strings[ERROR_MAX] = { "connection closed unexpectedly", /* ERROR_CONNECTION_CLOSED */ }; +/* strdup is not in ISO C90! */ +static inline char *strdup(const char *str) +{ + return strcpy(malloc(strlen(str) + 1),str); +} + char *amqp_error_string(int err) { const char *str; @@ -93,6 +100,20 @@ char *amqp_error_string(int err) return strdup(str); } +void amqp_abort(const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fputc('\n', stderr); + abort(); +} + +const amqp_bytes_t amqp_empty_bytes = { 0, NULL }; +const amqp_table_t amqp_empty_table = { 0, NULL }; +const amqp_array_t amqp_empty_array = { 0, NULL }; + #define RPC_REPLY(replytype) \ (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL \ ? (replytype *) state->most_recent_api_result.reply.decoded \ @@ -101,11 +122,18 @@ char *amqp_error_string(int err) amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, CHANNEL, OPEN, OPEN_OK, - amqp_channel_open_t, - AMQP_EMPTY_BYTES); - return RPC_REPLY(amqp_channel_open_ok_t); + amqp_method_number_t replies[2] = { AMQP_CHANNEL_OPEN_OK_METHOD, 0}; + amqp_channel_open_t req; + req.out_of_band.bytes = NULL; + req.out_of_band.len = 0; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_CHANNEL_OPEN_METHOD, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } int amqp_basic_publish(amqp_connection_state_t state, @@ -120,18 +148,19 @@ int amqp_basic_publish(amqp_connection_state_t state, amqp_frame_t f; size_t body_offset; size_t usable_body_payload_size = state->frame_max - (HEADER_SIZE + FOOTER_SIZE); + int res; - amqp_basic_publish_t m = - (amqp_basic_publish_t) { - .exchange = exchange, - .routing_key = routing_key, - .mandatory = mandatory, - .immediate = immediate - }; - + amqp_basic_publish_t m; amqp_basic_properties_t default_properties; - AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m)); + m.exchange = exchange; + m.routing_key = routing_key; + m.mandatory = mandatory; + m.immediate = immediate; + + res = amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m); + if (res < 0) + return res; if (properties == NULL) { memset(&default_properties, 0, sizeof(default_properties)); @@ -143,7 +172,10 @@ int amqp_basic_publish(amqp_connection_state_t state, f.payload.properties.class_id = AMQP_BASIC_CLASS; f.payload.properties.body_size = body.len; f.payload.properties.decoded = (void *) properties; - AMQP_CHECK_RESULT(amqp_send_frame(state, &f)); + + res = amqp_send_frame(state, &f); + if (res < 0) + return res; body_offset = 0; while (1) { @@ -155,7 +187,7 @@ int amqp_basic_publish(amqp_connection_state_t state, f.frame_type = AMQP_FRAME_BODY; f.channel = channel; - f.payload.body_fragment.bytes = BUF_AT(body, body_offset); + f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset); if (remaining >= usable_body_payload_size) { f.payload.body_fragment.len = usable_body_payload_size; } else { @@ -163,7 +195,9 @@ int amqp_basic_publish(amqp_connection_state_t state, } body_offset += f.payload.body_fragment.len; - AMQP_CHECK_RESULT(amqp_send_frame(state, &f)); + res = amqp_send_frame(state, &f); + if (res < 0) + return res; } return 0; @@ -174,20 +208,34 @@ amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, int code) { char codestr[13]; - snprintf(codestr, sizeof(codestr), "%d", code); - return AMQP_SIMPLE_RPC(state, channel, CHANNEL, CLOSE, CLOSE_OK, - amqp_channel_close_t, - code, amqp_cstring_bytes(codestr), 0, 0); + amqp_method_number_t replies[2] = { AMQP_CHANNEL_CLOSE_OK_METHOD, 0}; + amqp_channel_close_t req; + + req.reply_code = code; + req.reply_text.bytes = codestr; + req.reply_text.len = sprintf(codestr, "%d", code); + req.class_id = 0; + req.method_id = 0; + + return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD, + replies, &req); } amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code) { char codestr[13]; - snprintf(codestr, sizeof(codestr), "%d", code); - return AMQP_SIMPLE_RPC(state, 0, CONNECTION, CLOSE, CLOSE_OK, - amqp_connection_close_t, - code, amqp_cstring_bytes(codestr), 0, 0); + amqp_method_number_t replies[2] = { AMQP_CONNECTION_CLOSE_OK_METHOD, 0}; + amqp_channel_close_t req; + + req.reply_code = code; + req.reply_text.bytes = codestr; + req.reply_text.len = sprintf(codestr, "%d", code); + req.class_id = 0; + req.method_id = 0; + + return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD, + replies, &req); } amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, @@ -198,11 +246,24 @@ amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, amqp_boolean_t durable, amqp_table_t arguments) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, EXCHANGE, DECLARE, DECLARE_OK, - amqp_exchange_declare_t, - 0, exchange, type, passive, durable, 0, 0, 0, arguments); - return RPC_REPLY(amqp_exchange_declare_ok_t); + amqp_method_number_t replies[2] = { AMQP_EXCHANGE_DECLARE_OK_METHOD, 0}; + amqp_exchange_declare_t req; + req.exchange = exchange; + req.type = type; + req.passive = passive; + req.durable = durable; + req.auto_delete = 0; + req.internal = 0; + req.nowait = 0; + req.arguments = arguments; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_EXCHANGE_DECLARE_METHOD, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state, @@ -214,11 +275,23 @@ amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state, amqp_boolean_t auto_delete, amqp_table_t arguments) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, QUEUE, DECLARE, DECLARE_OK, - amqp_queue_declare_t, - 0, queue, passive, durable, exclusive, auto_delete, 0, arguments); - return RPC_REPLY(amqp_queue_declare_ok_t); + amqp_method_number_t replies[2] = { AMQP_QUEUE_DECLARE_OK_METHOD, 0}; + amqp_queue_declare_t req; + req.queue = queue; + req.passive = passive; + req.durable = durable; + req.exclusive = exclusive; + req.auto_delete = auto_delete; + req.nowait = 0; + req.arguments = arguments; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_QUEUE_DECLARE_METHOD, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_queue_delete_ok_t *amqp_queue_delete(amqp_connection_state_t state, @@ -227,11 +300,20 @@ amqp_queue_delete_ok_t *amqp_queue_delete(amqp_connection_state_t state, amqp_boolean_t if_unused, amqp_boolean_t if_empty) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, QUEUE, DELETE, DELETE_OK, - amqp_queue_delete_t, - 0, queue, if_unused, if_empty, 0); - return RPC_REPLY(amqp_queue_delete_ok_t); + amqp_method_number_t replies[2] = { AMQP_QUEUE_DELETE_OK_METHOD, 0}; + amqp_queue_delete_t req; + req.queue = queue; + req.if_unused = if_unused; + req.if_empty = if_empty; + req.nowait = 0; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_QUEUE_DELETE_METHOD, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state, @@ -241,11 +323,22 @@ amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state, amqp_bytes_t routing_key, amqp_table_t arguments) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, QUEUE, BIND, BIND_OK, - amqp_queue_bind_t, - 0, queue, exchange, routing_key, 0, arguments); - return RPC_REPLY(amqp_queue_bind_ok_t); + amqp_method_number_t replies[2] = { AMQP_QUEUE_BIND_OK_METHOD, 0}; + amqp_queue_bind_t req; + req.ticket = 0; + req.queue = queue; + req.exchange = exchange; + req.routing_key = routing_key; + req.nowait = 0; + req.arguments = arguments; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_QUEUE_BIND_METHOD, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state, @@ -255,11 +348,21 @@ amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state, amqp_bytes_t routing_key, amqp_table_t arguments) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, QUEUE, UNBIND, UNBIND_OK, - amqp_queue_unbind_t, - 0, queue, exchange, routing_key, arguments); - return RPC_REPLY(amqp_queue_unbind_ok_t); + amqp_method_number_t replies[2] = { AMQP_QUEUE_UNBIND_OK_METHOD, 0}; + amqp_queue_unbind_t req; + req.ticket = 0; + req.queue = queue; + req.exchange = exchange; + req.routing_key = routing_key; + req.arguments = arguments; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_QUEUE_UNBIND_METHOD, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state, @@ -271,11 +374,24 @@ amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state, amqp_boolean_t exclusive, amqp_table_t filter) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, BASIC, CONSUME, CONSUME_OK, - amqp_basic_consume_t, - 0, queue, consumer_tag, no_local, no_ack, exclusive, 0, filter); - return RPC_REPLY(amqp_basic_consume_ok_t); + amqp_method_number_t replies[2] = { AMQP_BASIC_CONSUME_OK_METHOD, 0}; + amqp_basic_consume_t req; + req.ticket = 0; + req.queue = queue; + req.consumer_tag = consumer_tag; + req.no_local = no_local; + req.no_ack = no_ack; + req.exclusive = exclusive; + req.nowait = 0; + req.arguments = filter; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_BASIC_CONSUME_METHOD, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } int amqp_basic_ack(amqp_connection_state_t state, @@ -283,13 +399,10 @@ int amqp_basic_ack(amqp_connection_state_t state, uint64_t delivery_tag, amqp_boolean_t multiple) { - amqp_basic_ack_t m = - (amqp_basic_ack_t) { - .delivery_tag = delivery_tag, - .multiple = multiple - }; - AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m)); - return 0; + amqp_basic_ack_t m; + m.delivery_tag = delivery_tag; + m.multiple = multiple; + return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m); } amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state, @@ -297,11 +410,19 @@ amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state, amqp_bytes_t queue, amqp_boolean_t no_wait) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, QUEUE, PURGE, PURGE_OK, - amqp_queue_purge_t, - 0, queue, no_wait); - return RPC_REPLY(amqp_queue_purge_ok_t); + amqp_method_number_t replies[2] = { AMQP_QUEUE_PURGE_OK_METHOD, 0}; + amqp_queue_purge_t req; + req.ticket = 0; + req.queue = queue; + req.nowait = 0; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_QUEUE_PURGE_METHOD, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, @@ -312,38 +433,54 @@ amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD, AMQP_BASIC_GET_EMPTY_METHOD, 0 }; - state->most_recent_api_result = - AMQP_MULTIPLE_RESPONSE_RPC(state, channel, BASIC, GET, replies, - amqp_basic_get_t, - 0, queue, no_ack); + amqp_basic_get_t req; + req.ticket = 0; + req.queue = queue; + req.no_ack = no_ack; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_BASIC_GET_METHOD, + replies, &req); return state->most_recent_api_result; } amqp_tx_select_ok_t *amqp_tx_select(amqp_connection_state_t state, amqp_channel_t channel) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, TX, SELECT, SELECT_OK, - amqp_tx_select_t); - return RPC_REPLY(amqp_tx_select_ok_t); + amqp_method_number_t replies[2] = { AMQP_TX_SELECT_OK_METHOD, 0}; + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_TX_SELECT_METHOD, + replies, NULL); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_tx_commit_ok_t *amqp_tx_commit(amqp_connection_state_t state, amqp_channel_t channel) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, TX, COMMIT, COMMIT_OK, - amqp_tx_commit_t); - return RPC_REPLY(amqp_tx_commit_ok_t); + amqp_method_number_t replies[2] = { AMQP_TX_COMMIT_OK_METHOD, 0}; + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_TX_COMMIT_METHOD, + replies, NULL); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_tx_rollback_ok_t *amqp_tx_rollback(amqp_connection_state_t state, amqp_channel_t channel) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, TX, ROLLBACK, ROLLBACK_OK, - amqp_tx_rollback_t); - return RPC_REPLY(amqp_tx_rollback_ok_t); + amqp_method_number_t replies[2] = { AMQP_TX_ROLLBACK_OK_METHOD, 0}; + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_TX_ROLLBACK_METHOD, + replies, NULL); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 3d95e98..34d12f8 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -58,62 +58,56 @@ #include "amqp_framing.h" #include "amqp_private.h" -#include "socket.h" - #define INITIAL_FRAME_POOL_PAGE_SIZE 65536 #define INITIAL_DECODING_POOL_PAGE_SIZE 131072 #define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072 -#define ENFORCE_STATE(statevec, statenum) \ - { \ - amqp_connection_state_t _check_state = (statevec); \ - int _wanted_state = (statenum); \ - amqp_assert(_check_state->state == _wanted_state, \ - "Programming error: invalid AMQP connection state: expected %d, got %d", \ - _wanted_state, \ - _check_state->state); \ +#define ENFORCE_STATE(statevec, statenum) \ + { \ + amqp_connection_state_t _check_state = (statevec); \ + int _wanted_state = (statenum); \ + if (_check_state->state != _wanted_state) \ + amqp_abort("Programming error: invalid AMQP connection state: expected %d, got %d", \ + _wanted_state, \ + _check_state->state); \ } amqp_connection_state_t amqp_new_connection(void) { amqp_connection_state_t state = (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_)); - if (state == NULL) { + if (state == NULL) return NULL; - } init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE); init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE); - state->state = CONNECTION_STATE_IDLE; + if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0) + goto out_nomem; - state->inbound_buffer.bytes = NULL; - state->outbound_buffer.bytes = NULL; - if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0) { - empty_amqp_pool(&state->frame_pool); - empty_amqp_pool(&state->decoding_pool); - free(state); - return NULL; - } + state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len); + if (state->inbound_buffer.bytes == NULL) + goto out_nomem; - state->inbound_offset = 0; - state->target_size = HEADER_SIZE; + state->state = CONNECTION_STATE_INITIAL; + /* the server protocol version response is 8 bytes, which conveniently + is also the minimum frame size */ + state->target_size = 8; state->sockfd = -1; state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE; state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE); - if (state->sock_inbound_buffer.bytes == NULL) { - amqp_destroy_connection(state); - return NULL; - } - - state->sock_inbound_offset = 0; - state->sock_inbound_limit = 0; - - state->first_queued_frame = NULL; - state->last_queued_frame = NULL; + if (state->sock_inbound_buffer.bytes == NULL) + goto out_nomem; return state; + + out_nomem: + free(state->sock_inbound_buffer.bytes); + empty_amqp_pool(&state->frame_pool); + empty_amqp_pool(&state->decoding_pool); + free(state); + return NULL; } int amqp_get_sockfd(amqp_connection_state_t state) { @@ -180,153 +174,162 @@ static void return_to_idle(amqp_connection_state_t state) { state->state = CONNECTION_STATE_IDLE; } +static size_t consume_data(amqp_connection_state_t state, + amqp_bytes_t *received_data) +{ + /* how much data is available and will fit? */ + size_t bytes_consumed = state->target_size - state->inbound_offset; + if (received_data->len < bytes_consumed) + bytes_consumed = received_data->len; + + memcpy(amqp_offset(state->inbound_buffer.bytes, state->inbound_offset), + received_data->bytes, bytes_consumed); + state->inbound_offset += bytes_consumed; + received_data->bytes = amqp_offset(received_data->bytes, bytes_consumed); + received_data->len -= bytes_consumed; + + return bytes_consumed; +} + int amqp_handle_input(amqp_connection_state_t state, amqp_bytes_t received_data, amqp_frame_t *decoded_frame) { - int total_bytes_consumed = 0; - int bytes_consumed; + size_t bytes_consumed; + void *raw_frame; /* Returning frame_type of zero indicates either insufficient input, or a complete, ignored frame was read. */ decoded_frame->frame_type = 0; - read_more: - if (received_data.len == 0) { - return total_bytes_consumed; - } + if (received_data.len == 0) + return 0; if (state->state == CONNECTION_STATE_IDLE) { - state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len); - if (state->inbound_buffer.bytes == NULL) { + state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, + state->inbound_buffer.len); + if (state->inbound_buffer.bytes == NULL) /* state->inbound_buffer.len is always nonzero, because it corresponds to frame_max, which is not permitted to be less than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */ return -ERROR_NO_MEMORY; - } - state->state = CONNECTION_STATE_WAITING_FOR_HEADER; - } - bytes_consumed = state->target_size - state->inbound_offset; - if (received_data.len < bytes_consumed) { - bytes_consumed = received_data.len; + state->state = CONNECTION_STATE_HEADER; } - E_BYTES(state->inbound_buffer, state->inbound_offset, bytes_consumed, received_data.bytes); - state->inbound_offset += bytes_consumed; - total_bytes_consumed += bytes_consumed; + bytes_consumed = consume_data(state, &received_data); - assert(state->inbound_offset <= state->target_size); + /* do we have target_size data yet? if not, return with the + expectation that more will arrive */ + if (state->inbound_offset < state->target_size) + return bytes_consumed; - if (state->inbound_offset < state->target_size) { - return total_bytes_consumed; - } + raw_frame = state->inbound_buffer.bytes; switch (state->state) { - case CONNECTION_STATE_WAITING_FOR_HEADER: - if (D_8(state->inbound_buffer, 0) == AMQP_PSEUDOFRAME_PROTOCOL_HEADER && - D_16(state->inbound_buffer, 1) == AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL) - { - state->target_size = 8; - state->state = CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER; - } else { - state->target_size = D_32(state->inbound_buffer, 3) + HEADER_SIZE + FOOTER_SIZE; - state->state = CONNECTION_STATE_WAITING_FOR_BODY; - } - - /* Wind buffer forward, and try to read some body out of it. */ - received_data.len -= bytes_consumed; - received_data.bytes = ((char *) received_data.bytes) + bytes_consumed; - goto read_more; - - case CONNECTION_STATE_WAITING_FOR_BODY: { - int frame_type = D_8(state->inbound_buffer, 0); - -#if 0 - printf("recving:\n"); - amqp_dump(state->inbound_buffer.bytes, state->target_size); -#endif - - /* Check frame end marker (footer) */ - if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) { - return -ERROR_BAD_AMQP_DATA; - } - - decoded_frame->channel = D_16(state->inbound_buffer, 1); - - switch (frame_type) { - case AMQP_FRAME_METHOD: { - amqp_bytes_t encoded; - - /* Four bytes of method ID before the method args. */ - encoded.len = state->target_size - (HEADER_SIZE + 4 + FOOTER_SIZE); - encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 4, encoded.len); - - decoded_frame->frame_type = AMQP_FRAME_METHOD; - decoded_frame->payload.method.id = D_32(state->inbound_buffer, HEADER_SIZE); - AMQP_CHECK_RESULT(amqp_decode_method(decoded_frame->payload.method.id, - &state->decoding_pool, - encoded, - &decoded_frame->payload.method.decoded)); - break; - } - - case AMQP_FRAME_HEADER: { - amqp_bytes_t encoded; - - /* 12 bytes for properties header. */ - encoded.len = state->target_size - (HEADER_SIZE + 12 + FOOTER_SIZE); - encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 12, encoded.len); - - decoded_frame->frame_type = AMQP_FRAME_HEADER; - decoded_frame->payload.properties.class_id = D_16(state->inbound_buffer, HEADER_SIZE); - decoded_frame->payload.properties.body_size = D_64(state->inbound_buffer, HEADER_SIZE+4); - decoded_frame->payload.properties.raw = encoded; - AMQP_CHECK_RESULT(amqp_decode_properties(decoded_frame->payload.properties.class_id, - &state->decoding_pool, - encoded, - &decoded_frame->payload.properties.decoded)); - break; - } - - case AMQP_FRAME_BODY: { - size_t fragment_len = state->target_size - (HEADER_SIZE + FOOTER_SIZE); - - decoded_frame->frame_type = AMQP_FRAME_BODY; - decoded_frame->payload.body_fragment.len = fragment_len; - decoded_frame->payload.body_fragment.bytes = - D_BYTES(state->inbound_buffer, HEADER_SIZE, fragment_len); - break; - } - - case AMQP_FRAME_HEARTBEAT: - decoded_frame->frame_type = AMQP_FRAME_HEARTBEAT; - break; - - default: - /* Ignore the frame by not changing frame_type away from 0. */ - break; - } + case CONNECTION_STATE_INITIAL: + /* check for a protocol header from the server */ + if (memcmp(raw_frame, "AMQP", 4) == 0) { + decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER; + decoded_frame->channel = 0; + + decoded_frame->payload.protocol_header.transport_high + = amqp_d8(raw_frame, 4); + decoded_frame->payload.protocol_header.transport_low + = amqp_d8(raw_frame, 5); + decoded_frame->payload.protocol_header.protocol_version_major + = amqp_d8(raw_frame, 6); + decoded_frame->payload.protocol_header.protocol_version_minor + = amqp_d8(raw_frame, 7); return_to_idle(state); - return total_bytes_consumed; + return bytes_consumed; } - case CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER: - decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER; - decoded_frame->channel = AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL; - amqp_assert(D_8(state->inbound_buffer, 3) == (uint8_t) 'P', - "Invalid protocol header received"); - decoded_frame->payload.protocol_header.transport_high = D_8(state->inbound_buffer, 4); - decoded_frame->payload.protocol_header.transport_low = D_8(state->inbound_buffer, 5); - decoded_frame->payload.protocol_header.protocol_version_major = D_8(state->inbound_buffer, 6); - decoded_frame->payload.protocol_header.protocol_version_minor = D_8(state->inbound_buffer, 7); + /* it's not a protocol header; fall through to process it as a + regular frame header */ - return_to_idle(state); - return total_bytes_consumed; + case CONNECTION_STATE_HEADER: + /* frame length is 3 bytes in */ + state->target_size + = amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE; + state->state = CONNECTION_STATE_BODY; + + bytes_consumed += consume_data(state, &received_data); + + /* do we have target_size data yet? if not, return with the + expectation that more will arrive */ + if (state->inbound_offset < state->target_size) + return bytes_consumed; + + /* fall through to process body */ + + case CONNECTION_STATE_BODY: { + amqp_bytes_t encoded; + int res; + + /* Check frame end marker (footer) */ + if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END) + return -ERROR_BAD_AMQP_DATA; + + decoded_frame->frame_type = amqp_d8(raw_frame, 0); + decoded_frame->channel = amqp_d16(raw_frame, 1); + + switch (decoded_frame->frame_type) { + case AMQP_FRAME_METHOD: + decoded_frame->payload.method.id = amqp_d32(raw_frame, HEADER_SIZE); + encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 4); + encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE; + + res = amqp_decode_method(decoded_frame->payload.method.id, + &state->decoding_pool, encoded, + &decoded_frame->payload.method.decoded); + if (res < 0) + return res; + + break; + + case AMQP_FRAME_HEADER: + decoded_frame->payload.properties.class_id + = amqp_d16(raw_frame, HEADER_SIZE); + /* unused 2-byte weight field goes here */ + decoded_frame->payload.properties.body_size + = amqp_d64(raw_frame, HEADER_SIZE + 4); + encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 12); + encoded.len = state->target_size - HEADER_SIZE - 12 - FOOTER_SIZE; + decoded_frame->payload.properties.raw = encoded; + + res = amqp_decode_properties(decoded_frame->payload.properties.class_id, + &state->decoding_pool, encoded, + &decoded_frame->payload.properties.decoded); + if (res < 0) + return res; + + break; + + case AMQP_FRAME_BODY: + decoded_frame->payload.body_fragment.len + = state->target_size - HEADER_SIZE - FOOTER_SIZE; + decoded_frame->payload.body_fragment.bytes + = amqp_offset(raw_frame, HEADER_SIZE); + break; + + case AMQP_FRAME_HEARTBEAT: + break; default: - amqp_assert(0, "Internal error: invalid amqp_connection_state_t->state %d", state->state); + /* Ignore the frame */ + decoded_frame->frame_type = 0; + break; + } + + return_to_idle(state); + return bytes_consumed; + } + + default: + amqp_abort("Internal error: invalid amqp_connection_state_t->state %d", state->state); + return bytes_consumed; } } @@ -337,8 +340,8 @@ amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) { void amqp_release_buffers(amqp_connection_state_t state) { ENFORCE_STATE(state, CONNECTION_STATE_IDLE); - amqp_assert(state->first_queued_frame == NULL, - "Programming error: attempt to amqp_release_buffers while waiting events enqueued"); + if (state->first_queued_frame) + amqp_abort("Programming error: attempt to amqp_release_buffers while waiting events enqueued"); recycle_amqp_pool(&state->frame_pool); recycle_amqp_pool(&state->decoding_pool); @@ -350,103 +353,80 @@ void amqp_maybe_release_buffers(amqp_connection_state_t state) { } } -static int inner_send_frame(amqp_connection_state_t state, - amqp_frame_t const *frame, - amqp_bytes_t *encoded, - int *payload_len) +int amqp_send_frame(amqp_connection_state_t state, + const amqp_frame_t *frame) { - int separate_body; + void *out_frame = state->outbound_buffer.bytes; + int res; - E_8(state->outbound_buffer, 0, frame->frame_type); - E_16(state->outbound_buffer, 1, frame->channel); - switch (frame->frame_type) { - case AMQP_FRAME_METHOD: - E_32(state->outbound_buffer, HEADER_SIZE, frame->payload.method.id); - encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE); - encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded->len); - *payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id, - frame->payload.method.decoded, - *encoded)) + 4; - separate_body = 0; - break; + amqp_e8(out_frame, 0, frame->frame_type); + amqp_e16(out_frame, 1, frame->channel); - case AMQP_FRAME_HEADER: - E_16(state->outbound_buffer, HEADER_SIZE, frame->payload.properties.class_id); - E_16(state->outbound_buffer, HEADER_SIZE+2, 0); /* "weight" */ - E_64(state->outbound_buffer, HEADER_SIZE+4, frame->payload.properties.body_size); - encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE); - encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded->len); - *payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id, - frame->payload.properties.decoded, - *encoded)) + 12; - separate_body = 0; - break; + if (frame->frame_type == AMQP_FRAME_BODY) { + /* For a body frame, rather than copying data around, we use + writev to compose the frame */ + struct iovec iov[3]; + uint8_t frame_end_byte = AMQP_FRAME_END; + const amqp_bytes_t *body = &frame->payload.body_fragment; - case AMQP_FRAME_BODY: - *encoded = frame->payload.body_fragment; - *payload_len = encoded->len; - separate_body = 1; - break; + amqp_e32(out_frame, 3, body->len); - case AMQP_FRAME_HEARTBEAT: - *encoded = AMQP_EMPTY_BYTES; - *payload_len = 0; - separate_body = 0; - break; + iov[0].iov_base = out_frame; + iov[0].iov_len = HEADER_SIZE; + iov[1].iov_base = body->bytes; + iov[1].iov_len = body->len; + iov[2].iov_base = &frame_end_byte; + iov[2].iov_len = FOOTER_SIZE; - default: - abort(); + res = amqp_socket_writev(state->sockfd, iov, 3); } + else { + size_t out_frame_len; + amqp_bytes_t encoded; - E_32(state->outbound_buffer, 3, *payload_len); - if (!separate_body) { - E_8(state->outbound_buffer, *payload_len + HEADER_SIZE, AMQP_FRAME_END); - } + switch (frame->frame_type) { + case AMQP_FRAME_METHOD: + amqp_e32(out_frame, HEADER_SIZE, frame->payload.method.id); -#if 0 - if (separate_body) { - printf("sending body frame (header):\n"); - amqp_dump(state->outbound_buffer.bytes, HEADER_SIZE); - printf("sending body frame (payload):\n"); - amqp_dump(encoded->bytes, *payload_len); - } else { - printf("sending:\n"); - amqp_dump(state->outbound_buffer.bytes, *payload_len + HEADER_SIZE + FOOTER_SIZE); - } -#endif + encoded.bytes = amqp_offset(out_frame, HEADER_SIZE + 4); + encoded.len = state->outbound_buffer.len - HEADER_SIZE - 4 - FOOTER_SIZE; - return separate_body; -} + res = amqp_encode_method(frame->payload.method.id, + frame->payload.method.decoded, encoded); + if (res < 0) + return res; -int amqp_send_frame(amqp_connection_state_t state, - amqp_frame_t const *frame) -{ - amqp_bytes_t encoded; - int payload_len, res; - - res = inner_send_frame(state, frame, &encoded, &payload_len); - switch (res) { - case 0: - res = send(state->sockfd, state->outbound_buffer.bytes, - payload_len + (HEADER_SIZE + FOOTER_SIZE), 0); + out_frame_len = res + 4; break; - case 1: { - struct iovec iov[3]; - char frame_end_byte = AMQP_FRAME_END; - iov[0].iov_base = state->outbound_buffer.bytes; - iov[0].iov_len = HEADER_SIZE; - iov[1].iov_base = encoded.bytes; - iov[1].iov_len = payload_len; - iov[2].iov_base = &frame_end_byte; - assert(FOOTER_SIZE == 1); - iov[2].iov_len = FOOTER_SIZE; - res = amqp_socket_writev(state->sockfd, &iov[0], 3); + case AMQP_FRAME_HEADER: + amqp_e16(out_frame, HEADER_SIZE, frame->payload.properties.class_id); + amqp_e16(out_frame, HEADER_SIZE+2, 0); /* "weight" */ + amqp_e64(out_frame, HEADER_SIZE+4, frame->payload.properties.body_size); + + encoded.bytes = amqp_offset(out_frame, HEADER_SIZE + 12); + encoded.len = state->outbound_buffer.len - HEADER_SIZE - 12 - FOOTER_SIZE; + + res = amqp_encode_properties(frame->payload.properties.class_id, + frame->payload.properties.decoded, encoded); + if (res < 0) + return res; + + out_frame_len = res + 12; + break; + + case AMQP_FRAME_HEARTBEAT: + out_frame_len = 0; break; - } default: - return res; + abort(); + } + + amqp_e32(out_frame, 3, out_frame_len); + amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END); + res = send(state->sockfd, out_frame, + out_frame_len + HEADER_SIZE + FOOTER_SIZE, 0); } if (res < 0) @@ -454,35 +434,3 @@ int amqp_send_frame(amqp_connection_state_t state, else return 0; } - -int amqp_send_frame_to(amqp_connection_state_t state, - amqp_frame_t const *frame, - amqp_output_fn_t fn, - void *context) -{ - amqp_bytes_t encoded; - int payload_len; - int separate_body; - - separate_body = inner_send_frame(state, frame, &encoded, &payload_len); - switch (separate_body) { - case 0: - AMQP_CHECK_RESULT(fn(context, - state->outbound_buffer.bytes, - payload_len + (HEADER_SIZE + FOOTER_SIZE))); - return 0; - - case 1: - AMQP_CHECK_RESULT(fn(context, state->outbound_buffer.bytes, HEADER_SIZE)); - AMQP_CHECK_RESULT(fn(context, encoded.bytes, payload_len)); - { - assert(FOOTER_SIZE == 1); - char frame_end_byte = AMQP_FRAME_END; - AMQP_CHECK_RESULT(fn(context, &frame_end_byte, FOOTER_SIZE)); - } - return 0; - - default: - return separate_body; - } -} diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c index 021151a..3ad8c3e 100644 --- a/librabbitmq/amqp_mem.c +++ b/librabbitmq/amqp_mem.c @@ -56,7 +56,7 @@ #include <assert.h> #include "amqp.h" -#include "../config.h" +#include "config.h" char const *amqp_version(void) { return VERSION; /* defined in config.h */ @@ -196,5 +196,5 @@ amqp_bytes_t amqp_bytes_malloc(size_t amount) { } void amqp_bytes_free(amqp_bytes_t bytes) { - AMQP_BYTES_FREE(bytes); + free(bytes.bytes); } diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index c30663a..eaf6b2c 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -51,9 +51,7 @@ * ***** END LICENSE BLOCK ***** */ -#ifdef __cplusplus -extern "C" { -#endif +#include "config.h" /* Error numbering: Because of differences in error numbering on * different platforms, we want to keep error numbers opaque for @@ -78,42 +76,43 @@ extern "C" { extern char *amqp_os_error_string(int err); +#include "socket.h" + /* - * Connection states: + * Connection states: XXX FIX THIS * - * - CONNECTION_STATE_IDLE: initial state, and entered again after - * each frame is completed. Means that no bytes of the next frame - * have been seen yet. Connections may only be reconfigured, and the + * - CONNECTION_STATE_INITIAL: The initial state, when we cannot be + * sure if the next thing we will get is the first AMQP frame, or a + * protocol header from the server. + * + * - CONNECTION_STATE_IDLE: The normal state between + * frames. Connections may only be reconfigured, and the * connection's pools recycled, when in this state. Whenever we're * in this state, the inbound_buffer's bytes pointer must be NULL; * any other state, and it must point to a block of memory allocated * from the frame_pool. * - * - CONNECTION_STATE_WAITING_FOR_HEADER: Some bytes of an incoming - * frame have been seen, but not a complete frame header's worth. - * - * - CONNECTION_STATE_WAITING_FOR_BODY: A complete frame header has - * been seen, but the frame is not yet complete. When it is - * completed, it will be returned, and the connection will return to - * IDLE state. + * - CONNECTION_STATE_HEADER: Some bytes of an incoming frame have + * been seen, but not a complete frame header's worth. * - * - CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER: The beginning of a - * protocol version header has been seen, but the full eight bytes - * hasn't yet been received. When it is completed, it will be + * - CONNECTION_STATE_BODY: A complete frame header has been seen, but + * the frame is not yet complete. When it is completed, it will be * returned, and the connection will return to IDLE state. * */ typedef enum amqp_connection_state_enum_ { CONNECTION_STATE_IDLE = 0, - CONNECTION_STATE_WAITING_FOR_HEADER, - CONNECTION_STATE_WAITING_FOR_BODY, - CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER + CONNECTION_STATE_INITIAL, + CONNECTION_STATE_HEADER, + CONNECTION_STATE_BODY } amqp_connection_state_enum; /* 7 bytes up front, then payload, then 1 byte footer */ #define HEADER_SIZE 7 #define FOOTER_SIZE 1 +#define AMQP_PSEUDOFRAME_PROTOCOL_HEADER 'A' + typedef struct amqp_link_t_ { struct amqp_link_t_ *next; void *data; @@ -146,65 +145,107 @@ struct amqp_connection_state_t_ { amqp_rpc_reply_t most_recent_api_result; }; -#define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -ERROR_BAD_AMQP_DATA; } (v); }) -#define BUF_AT(b, o) (&(((uint8_t *) (b).bytes)[o])) - -#define D_8(b, o) CHECK_LIMIT(b, o, 1, * (uint8_t *) BUF_AT(b, o)) -#define D_16(b, o) CHECK_LIMIT(b, o, 2, ({uint16_t v; memcpy(&v, BUF_AT(b, o), 2); ntohs(v);})) -#define D_32(b, o) CHECK_LIMIT(b, o, 4, ({uint32_t v; memcpy(&v, BUF_AT(b, o), 4); ntohl(v);})) -#define D_64(b, o) ({ \ - uint64_t hi = D_32(b, o); \ - uint64_t lo = D_32(b, o + 4); \ - hi << 32 | lo; \ -}) - -#define D_BYTES(b, o, l) CHECK_LIMIT(b, o, l, BUF_AT(b, o)) - -#define E_8(b, o, v) CHECK_LIMIT(b, o, 1, * (uint8_t *) BUF_AT(b, o) = (v)) -#define E_16(b, o, v) CHECK_LIMIT(b, o, 2, ({uint16_t vv = htons(v); memcpy(BUF_AT(b, o), &vv, 2);})) -#define E_32(b, o, v) CHECK_LIMIT(b, o, 4, ({uint32_t vv = htonl(v); memcpy(BUF_AT(b, o), &vv, 4);})) -#define E_64(b, o, v) ({ \ - E_32(b, o, (uint32_t) (((uint64_t) v) >> 32)); \ - E_32(b, o + 4, (uint32_t) (((uint64_t) v) & 0xFFFFFFFF)); \ - }) - -#define E_BYTES(b, o, l, v) CHECK_LIMIT(b, o, l, memcpy(BUF_AT(b, o), (v), (l))) - -extern int amqp_decode_table(amqp_bytes_t encoded, - amqp_pool_t *pool, - amqp_table_t *output, - int *offsetptr); - -extern int amqp_encode_table(amqp_bytes_t encoded, - amqp_table_t *input, - int *offsetptr); - -#define amqp_assert(condition, ...) \ - ({ \ - if (!(condition)) { \ - fprintf(stderr, __VA_ARGS__); \ - fputc('\n', stderr); \ - abort(); \ - } \ - }) - -#define AMQP_CHECK_RESULT_CLEANUP(expr, stmts) \ - ({ \ - int _result = (expr); \ - if (_result < 0) { stmts; return _result; } \ - _result; \ - }) - -#define AMQP_CHECK_RESULT(expr) AMQP_CHECK_RESULT_CLEANUP(expr, ) - -#ifndef NDEBUG -extern void amqp_dump(void const *buffer, size_t len); -#else -#define amqp_dump(buffer, len) ((void) 0) -#endif +static inline void *amqp_offset(void *data, size_t offset) +{ + return (char *)data + offset; +} -#ifdef __cplusplus +/* assuming a machine that supports unaligned accesses (for now) */ + +#define DECLARE_CODEC_BASE_TYPE(bits, htonx, ntohx) \ + \ +static inline void amqp_e##bits(void *data, size_t offset, \ + uint##bits##_t val) \ +{ \ + *(uint##bits##_t *)amqp_offset(data, offset) = htonx(val); \ +} \ + \ +static inline uint##bits##_t amqp_d##bits(void *data, size_t offset) \ +{ \ + return ntohx(*(uint##bits##_t *)amqp_offset(data, offset)); \ +} \ + \ +static inline int amqp_encode_##bits(amqp_bytes_t encoded, size_t *offset, \ + uint##bits##_t input) \ + \ +{ \ + size_t o = *offset; \ + if ((*offset = o + bits / 8) <= encoded.len) { \ + amqp_e##bits(encoded.bytes, o, input); \ + return 1; \ + } \ + else { \ + return 0; \ + } \ +} \ + \ +static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \ + uint##bits##_t *output) \ + \ +{ \ + size_t o = *offset; \ + if ((*offset = o + bits / 8) <= encoded.len) { \ + *output = amqp_d##bits(encoded.bytes, o); \ + *output = ntohx(*(uint##bits##_t *)((char *)encoded.bytes + o)); \ + return 1; \ + } \ + else { \ + return 0; \ + } \ } -#endif + +/* assuming little endian (for now) */ + +#define DECLARE_XTOXLL(func) \ +static inline uint64_t func##ll(uint64_t val) \ +{ \ + union { \ + uint64_t whole; \ + uint32_t halves[2]; \ + } u; \ + uint32_t t; \ + u.whole = val; \ + t = u.halves[0]; \ + u.halves[0] = func##l(u.halves[1]); \ + u.halves[1] = func##l(t); \ + return u.whole; \ +} + +DECLARE_XTOXLL(hton) +DECLARE_XTOXLL(ntoh) + +DECLARE_CODEC_BASE_TYPE(8, (uint8_t), (uint8_t)) +DECLARE_CODEC_BASE_TYPE(16, htons, ntohs) +DECLARE_CODEC_BASE_TYPE(32, htonl, ntohl) +DECLARE_CODEC_BASE_TYPE(64, htonll, ntohll) + +static inline int amqp_encode_bytes(amqp_bytes_t encoded, size_t *offset, + amqp_bytes_t input) +{ + size_t o = *offset; + if ((*offset = o + input.len) <= encoded.len) { + memcpy(amqp_offset(encoded.bytes, o), input.bytes, input.len); + return 1; + } + else { + return 0; + } +} + +static inline int amqp_decode_bytes(amqp_bytes_t encoded, size_t *offset, + amqp_bytes_t *output, size_t len) +{ + size_t o = *offset; + if ((*offset = o + len) <= encoded.len) { + output->bytes = amqp_offset(encoded.bytes, o); + output->len = len; + return 1; + } + else { + return 0; + } +} + +extern void amqp_abort(const char *fmt, ...); #endif diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 13f6376..f23b42b 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -59,8 +59,6 @@ #include "amqp_framing.h" #include "amqp_private.h" -#include "socket.h" - int amqp_open_socket(char const *hostname, int portnumber) @@ -98,37 +96,28 @@ int amqp_open_socket(char const *hostname, return sockfd; } -static char *header() { - static char header[8]; - header[0] = 'A'; - header[1] = 'M'; - header[2] = 'Q'; - header[3] = 'P'; - header[4] = 0; - header[5] = AMQP_PROTOCOL_VERSION_MAJOR; - header[6] = AMQP_PROTOCOL_VERSION_MINOR; - header[7] = AMQP_PROTOCOL_VERSION_REVISION; - return header; -} - int amqp_send_header(amqp_connection_state_t state) { - return send(state->sockfd, header(), 8, 0); -} - -int amqp_send_header_to(amqp_connection_state_t state, - amqp_output_fn_t fn, - void *context) -{ - return fn(context, header(), 8); + static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0, + AMQP_PROTOCOL_VERSION_MAJOR, + AMQP_PROTOCOL_VERSION_MINOR, + AMQP_PROTOCOL_VERSION_REVISION }; + return send(state->sockfd, (void *)header, 8, 0); } static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) { + amqp_bytes_t res; + switch (method) { - case AMQP_SASL_METHOD_PLAIN: return (amqp_bytes_t) {.len = 5, .bytes = "PLAIN"}; - default: - amqp_assert(0, "Invalid SASL method: %d", (int) method); + case AMQP_SASL_METHOD_PLAIN: + res.bytes = "PLAIN"; + res.len = 5; + break; + + default: + amqp_abort("Invalid SASL method: %d", (int) method); } - abort(); /* unreachable */ + + return res; } static amqp_bytes_t sasl_response(amqp_pool_t *pool, @@ -143,20 +132,23 @@ static amqp_bytes_t sasl_response(amqp_pool_t *pool, size_t username_len = strlen(username); char *password = va_arg(args, char *); size_t password_len = strlen(password); + char *response_buf; + amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response); - if (response.bytes == NULL) { + if (response.bytes == NULL) /* We never request a zero-length block, because of the +2 above, so a NULL here really is ENOMEM. */ return response; - } - *BUF_AT(response, 0) = 0; - memcpy(((char *) response.bytes) + 1, username, username_len); - *BUF_AT(response, username_len + 1) = 0; - memcpy(((char *) response.bytes) + username_len + 2, password, password_len); + + response_buf = response.bytes; + response_buf[0] = 0; + memcpy(response_buf + 1, username, username_len); + response_buf[username_len + 1] = 0; + memcpy(response_buf + username_len + 2, password, password_len); break; } default: - amqp_assert(0, "Invalid SASL method: %d", (int) method); + amqp_abort("Invalid SASL method: %d", (int) method); } return response; @@ -178,33 +170,37 @@ static int wait_frame_inner(amqp_connection_state_t state, amqp_frame_t *decoded_frame) { while (1) { - int result; + int res; while (amqp_data_in_buffer(state)) { amqp_bytes_t buffer; buffer.len = state->sock_inbound_limit - state->sock_inbound_offset; buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset; - AMQP_CHECK_RESULT((result = amqp_handle_input(state, buffer, decoded_frame))); - state->sock_inbound_offset += result; + + res = amqp_handle_input(state, buffer, decoded_frame); + if (res < 0) + return res; + + state->sock_inbound_offset += res; if (decoded_frame->frame_type != 0) /* Complete frame was read. Return it. */ return 0; /* Incomplete or ignored frame. Keep processing input. */ - assert(result != 0); - } + assert(res != 0); + } - result = recv(state->sockfd, state->sock_inbound_buffer.bytes, + res = recv(state->sockfd, state->sock_inbound_buffer.bytes, state->sock_inbound_buffer.len, 0); - if (result <= 0) { - if (result == 0) + if (res <= 0) { + if (res == 0) return -ERROR_CONNECTION_CLOSED; else return -amqp_socket_error(); } - state->sock_inbound_limit = result; + state->sock_inbound_limit = res; state->sock_inbound_offset = 0; } } @@ -234,22 +230,22 @@ int amqp_simple_wait_method(amqp_connection_state_t state, int res = amqp_simple_wait_frame(state, &frame); if (res < 0) return res; - - amqp_assert(frame.channel == expected_channel, - "Expected 0x%08X method frame on channel %d, got frame on channel %d", - expected_method, - expected_channel, - frame.channel); - amqp_assert(frame.frame_type == AMQP_FRAME_METHOD, - "Expected 0x%08X method frame on channel %d, got frame type %d", - expected_method, - expected_channel, - frame.frame_type); - amqp_assert(frame.payload.method.id == expected_method, - "Expected method ID 0x%08X on channel %d, got ID 0x%08X", - expected_method, - expected_channel, - frame.payload.method.id); + + if (frame.channel != expected_channel) + amqp_abort("Expected 0x%08X method frame on channel %d, got frame on channel %d", + expected_method, + expected_channel, + frame.channel); + if (frame.frame_type != AMQP_FRAME_METHOD) + amqp_abort("Expected 0x%08X method frame on channel %d, got frame type %d", + expected_method, + expected_channel, + frame.frame_type); + if (frame.payload.method.id != expected_method) + amqp_abort("Expected method ID 0x%08X on channel %d, got ID 0x%08X", + expected_method, + expected_channel, + frame.payload.method.id); *output = frame.payload.method; return 0; } @@ -388,19 +384,23 @@ static int amqp_login_inner(amqp_connection_state_t state, } { - amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, sasl_method, vl); amqp_connection_start_ok_t s; - if (response_bytes.bytes == NULL) { + amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, + sasl_method, vl); + + if (response_bytes.bytes == NULL) return -ERROR_NO_MEMORY; - } - s = - (amqp_connection_start_ok_t) { - .client_properties = {.num_entries = 0, .entries = NULL}, - .mechanism = sasl_method_name(sasl_method), - .response = response_bytes, - .locale = {.len = 5, .bytes = "en_US"} - }; - AMQP_CHECK_RESULT(amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s)); + + s.client_properties.num_entries = 0; + s.client_properties.entries = NULL; + s.mechanism = sasl_method_name(sasl_method); + s.response = response_bytes; + s.locale.bytes = "en_US"; + s.locale.len = 5; + + res = amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s); + if (res < 0) + return res; } amqp_release_buffers(state); @@ -409,7 +409,7 @@ static int amqp_login_inner(amqp_connection_state_t state, &method); if (res < 0) return res; - + { amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded; server_channel_max = s->channel_max; @@ -417,28 +417,28 @@ static int amqp_login_inner(amqp_connection_state_t state, server_heartbeat = s->heartbeat; } - if (server_channel_max != 0 && server_channel_max < channel_max) { + if (server_channel_max != 0 && server_channel_max < channel_max) channel_max = server_channel_max; - } - if (server_frame_max != 0 && server_frame_max < frame_max) { + if (server_frame_max != 0 && server_frame_max < frame_max) frame_max = server_frame_max; - } - if (server_heartbeat != 0 && server_heartbeat < heartbeat) { + if (server_heartbeat != 0 && server_heartbeat < heartbeat) heartbeat = server_heartbeat; - } - AMQP_CHECK_RESULT(amqp_tune_connection(state, channel_max, frame_max, heartbeat)); + res = amqp_tune_connection(state, channel_max, frame_max, heartbeat); + if (res < 0) + return res; { - amqp_connection_tune_ok_t s = - (amqp_connection_tune_ok_t) { - .channel_max = channel_max, - .frame_max = frame_max, - .heartbeat = heartbeat - }; - AMQP_CHECK_RESULT(amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s)); + amqp_connection_tune_ok_t s; + s.frame_max = frame_max; + s.channel_max = channel_max; + s.heartbeat = heartbeat; + + res = amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s); + if (res < 0) + return res; } amqp_release_buffers(state); @@ -470,21 +470,20 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, } { - amqp_connection_open_t s = - (amqp_connection_open_t) { - .virtual_host = amqp_cstring_bytes(vhost), - .capabilities = {.len = 0, .bytes = NULL}, - .insist = 1 - }; amqp_method_number_t replies[] = { AMQP_CONNECTION_OPEN_OK_METHOD, 0 }; + amqp_connection_open_t s; + s.virtual_host = amqp_cstring_bytes(vhost); + s.capabilities.len = 0; + s.capabilities.bytes = NULL; + s.insist = 1; + result = amqp_simple_rpc(state, 0, AMQP_CONNECTION_OPEN_METHOD, (amqp_method_number_t *) &replies, &s); - if (result.reply_type != AMQP_RESPONSE_NORMAL) { + if (result.reply_type != AMQP_RESPONSE_NORMAL) return result; - } } amqp_maybe_release_buffers(state); diff --git a/librabbitmq/amqp_table.c b/librabbitmq/amqp_table.c index 3f5eb61..e85217f 100644 --- a/librabbitmq/amqp_table.c +++ b/librabbitmq/amqp_table.c @@ -55,7 +55,6 @@ #include "amqp.h" #include "amqp_private.h" -#include "socket.h" #include <assert.h> @@ -65,356 +64,360 @@ static int amqp_decode_field_value(amqp_bytes_t encoded, amqp_pool_t *pool, amqp_field_value_t *entry, - int *offsetptr); /* forward */ + size_t *offset); static int amqp_encode_field_value(amqp_bytes_t encoded, amqp_field_value_t *entry, - int *offsetptr); /* forward */ + size_t *offset); /*---------------------------------------------------------------------------*/ static int amqp_decode_array(amqp_bytes_t encoded, amqp_pool_t *pool, amqp_array_t *output, - int *offsetptr) + size_t *offset) { - int offset = *offsetptr; - uint32_t arraysize = D_32(encoded, offset); + uint32_t arraysize; int num_entries = 0; - amqp_field_value_t *entries = malloc(INITIAL_ARRAY_SIZE * sizeof(amqp_field_value_t)); int allocated_entries = INITIAL_ARRAY_SIZE; - int limit; + amqp_field_value_t *entries; + size_t limit; + int res; - if (entries == NULL) { - return -ERROR_NO_MEMORY; - } + if (!amqp_decode_32(encoded, offset, &arraysize)) + return -ERROR_BAD_AMQP_DATA; - offset += 4; - limit = offset + arraysize; + entries = malloc(allocated_entries * sizeof(amqp_field_value_t)); + if (entries == NULL) + return -ERROR_NO_MEMORY; - while (offset < limit) { + limit = *offset + arraysize; + while (*offset < limit) { if (num_entries >= allocated_entries) { void *newentries; allocated_entries = allocated_entries * 2; newentries = realloc(entries, allocated_entries * sizeof(amqp_field_value_t)); - if (newentries == NULL) { - free(entries); - return -ERROR_NO_MEMORY; - } + res = -ERROR_NO_MEMORY; + if (newentries == NULL) + goto out; + entries = newentries; } - AMQP_CHECK_RESULT_CLEANUP(amqp_decode_field_value(encoded, - pool, - &entries[num_entries], - &offset), - free(entries)); + res = amqp_decode_field_value(encoded, pool, &entries[num_entries], + offset); + if (res < 0) + goto out; + num_entries++; } output->num_entries = num_entries; output->entries = amqp_pool_alloc(pool, num_entries * sizeof(amqp_field_value_t)); - if (output->entries == NULL && num_entries > 0) { - /* NULL is legitimate if we requested a zero-length block. */ - free(entries); - return -ERROR_NO_MEMORY; - } + res = -ERROR_NO_MEMORY; + /* NULL is legitimate if we requested a zero-length block. */ + if (output->entries == NULL && num_entries > 0) + goto out; memcpy(output->entries, entries, num_entries * sizeof(amqp_field_value_t)); - free(entries); + res = 0; - *offsetptr = offset; - return 0; + out: + free(entries); + return res; } int amqp_decode_table(amqp_bytes_t encoded, amqp_pool_t *pool, amqp_table_t *output, - int *offsetptr) + size_t *offset) { - int offset = *offsetptr; - uint32_t tablesize = D_32(encoded, offset); + uint32_t tablesize; int num_entries = 0; - amqp_table_entry_t *entries = malloc(INITIAL_TABLE_SIZE * sizeof(amqp_table_entry_t)); + amqp_table_entry_t *entries; int allocated_entries = INITIAL_TABLE_SIZE; - int limit; + size_t limit; + int res; - if (entries == NULL) { - return -ERROR_NO_MEMORY; - } + if (!amqp_decode_32(encoded, offset, &tablesize)) + return -ERROR_BAD_AMQP_DATA; - offset += 4; - limit = offset + tablesize; + entries = malloc(allocated_entries * sizeof(amqp_table_entry_t)); + if (entries == NULL) + return -ERROR_NO_MEMORY; - while (offset < limit) { - size_t keylen; - amqp_table_entry_t *entry; + limit = *offset + tablesize; + while (*offset < limit) { + uint8_t keylen; - keylen = D_8(encoded, offset); - offset++; + res = -ERROR_BAD_AMQP_DATA; + if (!amqp_decode_8(encoded, offset, &keylen)) + goto out; if (num_entries >= allocated_entries) { void *newentries; allocated_entries = allocated_entries * 2; newentries = realloc(entries, allocated_entries * sizeof(amqp_table_entry_t)); - if (newentries == NULL) { - free(entries); - return -ERROR_NO_MEMORY; - } + res = -ERROR_NO_MEMORY; + if (newentries == NULL) + goto out; + entries = newentries; } - entry = &entries[num_entries]; - entry->key.len = keylen; - entry->key.bytes = D_BYTES(encoded, offset, keylen); - offset += keylen; + res = -ERROR_BAD_AMQP_DATA; + if (!amqp_decode_bytes(encoded, offset, &entries[num_entries].key, keylen)) + goto out; + + res = amqp_decode_field_value(encoded, pool, &entries[num_entries].value, + offset); + if (res < 0) + goto out; - AMQP_CHECK_RESULT_CLEANUP(amqp_decode_field_value(encoded, - pool, - &entry->value, - &offset), - free(entries)); num_entries++; } output->num_entries = num_entries; output->entries = amqp_pool_alloc(pool, num_entries * sizeof(amqp_table_entry_t)); - if (output->entries == NULL && num_entries > 0) { - /* NULL is legitimate if we requested a zero-length block. */ - free(entries); - return -ERROR_NO_MEMORY; - } + res = -ERROR_NO_MEMORY; + /* NULL is legitimate if we requested a zero-length block. */ + if (output->entries == NULL && num_entries > 0) + goto out; memcpy(output->entries, entries, num_entries * sizeof(amqp_table_entry_t)); - free(entries); + res = 0; - *offsetptr = offset; - return 0; + out: + free(entries); + return res; } static int amqp_decode_field_value(amqp_bytes_t encoded, amqp_pool_t *pool, amqp_field_value_t *entry, - int *offsetptr) + size_t *offset) { - int offset = *offsetptr; + int res = -ERROR_BAD_AMQP_DATA; - entry->kind = D_8(encoded, offset); - offset++; + if (!amqp_decode_8(encoded, offset, &entry->kind)) + goto out; + +#define TRIVIAL_FIELD_DECODER(bits) if (!amqp_decode_##bits(encoded, offset, &entry->value.u##bits)) goto out; break +#define SIMPLE_FIELD_DECODER(bits, dest, how) { uint##bits##_t val; if (!amqp_decode_##bits(encoded, offset, &val)) goto out; entry->value.dest = how; } break switch (entry->kind) { - case AMQP_FIELD_KIND_BOOLEAN: - entry->value.boolean = D_8(encoded, offset) ? 1 : 0; - offset++; - break; - case AMQP_FIELD_KIND_I8: - entry->value.i8 = (int8_t) D_8(encoded, offset); - offset++; - break; - case AMQP_FIELD_KIND_U8: - entry->value.u8 = D_8(encoded, offset); - offset++; - break; - case AMQP_FIELD_KIND_I16: - entry->value.i16 = (int16_t) D_16(encoded, offset); - offset += 2; - break; - case AMQP_FIELD_KIND_U16: - entry->value.u16 = D_16(encoded, offset); - offset += 2; - break; - case AMQP_FIELD_KIND_I32: - entry->value.i32 = (int32_t) D_32(encoded, offset); - offset += 4; - break; - case AMQP_FIELD_KIND_U32: - entry->value.u32 = D_32(encoded, offset); - offset += 4; - break; - case AMQP_FIELD_KIND_I64: - entry->value.i64 = (int64_t) D_64(encoded, offset); - offset += 8; - break; - case AMQP_FIELD_KIND_F32: - entry->value.u32 = D_32(encoded, offset); - /* and by punning, f32 magically gets the right value...! */ - offset += 4; - break; - case AMQP_FIELD_KIND_F64: - entry->value.u64 = D_64(encoded, offset); - /* and by punning, f64 magically gets the right value...! */ - offset += 8; - break; - case AMQP_FIELD_KIND_DECIMAL: - entry->value.decimal.decimals = D_8(encoded, offset); - offset++; - entry->value.decimal.value = D_32(encoded, offset); - offset += 4; - break; - case AMQP_FIELD_KIND_UTF8: - /* AMQP_FIELD_KIND_UTF8 and AMQP_FIELD_KIND_BYTES have the - same implementation, but different interpretations. */ - /* fall through */ - case AMQP_FIELD_KIND_BYTES: - entry->value.bytes.len = D_32(encoded, offset); - offset += 4; - entry->value.bytes.bytes = D_BYTES(encoded, offset, entry->value.bytes.len); - offset += entry->value.bytes.len; - break; - case AMQP_FIELD_KIND_ARRAY: - AMQP_CHECK_RESULT(amqp_decode_array(encoded, pool, &(entry->value.array), &offset)); - break; - case AMQP_FIELD_KIND_TIMESTAMP: - entry->value.u64 = D_64(encoded, offset); - offset += 8; - break; - case AMQP_FIELD_KIND_TABLE: - AMQP_CHECK_RESULT(amqp_decode_table(encoded, pool, &(entry->value.table), &offset)); - break; - case AMQP_FIELD_KIND_VOID: - break; - default: - return -ERROR_BAD_AMQP_DATA; + case AMQP_FIELD_KIND_BOOLEAN: + SIMPLE_FIELD_DECODER(8, boolean, val ? 1 : 0); + + case AMQP_FIELD_KIND_I8: + SIMPLE_FIELD_DECODER(8, i8, (int8_t)val); + case AMQP_FIELD_KIND_U8: + TRIVIAL_FIELD_DECODER(8); + + case AMQP_FIELD_KIND_I16: + SIMPLE_FIELD_DECODER(16, i16, (int16_t)val); + case AMQP_FIELD_KIND_U16: + TRIVIAL_FIELD_DECODER(16); + + case AMQP_FIELD_KIND_I32: + SIMPLE_FIELD_DECODER(32, i32, (int32_t)val); + case AMQP_FIELD_KIND_U32: + TRIVIAL_FIELD_DECODER(32); + + case AMQP_FIELD_KIND_I64: + SIMPLE_FIELD_DECODER(64, i64, (int64_t)val); + case AMQP_FIELD_KIND_U64: + TRIVIAL_FIELD_DECODER(64); + + case AMQP_FIELD_KIND_F32: + TRIVIAL_FIELD_DECODER(32); + /* and by punning, f32 magically gets the right value...! */ + + case AMQP_FIELD_KIND_F64: + TRIVIAL_FIELD_DECODER(64); + /* and by punning, f64 magically gets the right value...! */ + + case AMQP_FIELD_KIND_DECIMAL: + if (!amqp_decode_8(encoded, offset, &entry->value.decimal.decimals) + || !amqp_decode_32(encoded, offset, &entry->value.decimal.value)) + goto out; + break; + + case AMQP_FIELD_KIND_UTF8: + /* AMQP_FIELD_KIND_UTF8 and AMQP_FIELD_KIND_BYTES have the + same implementation, but different interpretations. */ + /* fall through */ + case AMQP_FIELD_KIND_BYTES: { + uint32_t len; + if (!amqp_decode_32(encoded, offset, &len) + || !amqp_decode_bytes(encoded, offset, &entry->value.bytes, len)) + goto out; + break; } - *offsetptr = offset; - return 0; + case AMQP_FIELD_KIND_ARRAY: + res = amqp_decode_array(encoded, pool, &(entry->value.array), offset); + goto out; + + case AMQP_FIELD_KIND_TIMESTAMP: + TRIVIAL_FIELD_DECODER(64); + + case AMQP_FIELD_KIND_TABLE: + res = amqp_decode_table(encoded, pool, &(entry->value.table), offset); + goto out; + + case AMQP_FIELD_KIND_VOID: + break; + + default: + goto out; + } + + res = 0; + + out: + return res; } /*---------------------------------------------------------------------------*/ static int amqp_encode_array(amqp_bytes_t encoded, amqp_array_t *input, - int *offsetptr) + size_t *offset) { - int offset = *offsetptr; - int arraysize_offset = offset; - int i; + size_t start = *offset; + int i, res; - offset += 4; /* skip space for the size of the array to be filled in later */ + *offset += 4; /* size of the array gets filled in later on */ for (i = 0; i < input->num_entries; i++) { - AMQP_CHECK_RESULT(amqp_encode_field_value(encoded, &(input->entries[i]), &offset)); + res = amqp_encode_field_value(encoded, &input->entries[i], offset); + if (res < 0) + goto out; } - E_32(encoded, arraysize_offset, (offset - *offsetptr - 4)); - *offsetptr = offset; - return 0; + if (amqp_encode_32(encoded, &start, *offset - start - 4)) + res = 0; + else + res = -ERROR_BAD_AMQP_DATA; + + out: + return res; } int amqp_encode_table(amqp_bytes_t encoded, amqp_table_t *input, - int *offsetptr) + size_t *offset) { - int offset = *offsetptr; - int tablesize_offset = offset; - int i; + size_t start = *offset; + int i, res; - offset += 4; /* skip space for the size of the table to be filled in later */ + *offset += 4; /* size of the table gets filled in later on */ for (i = 0; i < input->num_entries; i++) { - amqp_table_entry_t *entry = &(input->entries[i]); - - E_8(encoded, offset, entry->key.len); - offset++; + res = amqp_encode_8(encoded, offset, input->entries[i].key.len); + if (res < 0) + goto out; - E_BYTES(encoded, offset, entry->key.len, entry->key.bytes); - offset += entry->key.len; + res = amqp_encode_bytes(encoded, offset, input->entries[i].key); + if (res < 0) + goto out; - AMQP_CHECK_RESULT(amqp_encode_field_value(encoded, &(entry->value), &offset)); + res = amqp_encode_field_value(encoded, &input->entries[i].value, offset); + if (res < 0) + goto out; } - E_32(encoded, tablesize_offset, (offset - *offsetptr - 4)); - *offsetptr = offset; - return 0; + if (amqp_encode_32(encoded, &start, *offset - start - 4)) + res = 0; + else + res = -ERROR_BAD_AMQP_DATA; + + out: + return res; } static int amqp_encode_field_value(amqp_bytes_t encoded, amqp_field_value_t *entry, - int *offsetptr) + size_t *offset) { - int offset = *offsetptr; + int res = -ERROR_BAD_AMQP_DATA; + + if (!amqp_encode_8(encoded, offset, entry->kind)) + goto out; - E_8(encoded, offset, entry->kind); - offset++; +#define FIELD_ENCODER(bits, val) if (!amqp_encode_##bits(encoded, offset, val)) goto out; break switch (entry->kind) { - case AMQP_FIELD_KIND_BOOLEAN: - E_8(encoded, offset, entry->value.boolean ? 1 : 0); - offset++; - break; - case AMQP_FIELD_KIND_I8: - E_8(encoded, offset, (uint8_t) entry->value.i8); - offset++; - break; - case AMQP_FIELD_KIND_U8: - E_8(encoded, offset, entry->value.u8); - offset++; - break; - case AMQP_FIELD_KIND_I16: - E_16(encoded, offset, (uint16_t) entry->value.i16); - offset += 2; - break; - case AMQP_FIELD_KIND_U16: - E_16(encoded, offset, entry->value.u16); - offset += 2; - break; - case AMQP_FIELD_KIND_I32: - E_32(encoded, offset, (uint32_t) entry->value.i32); - offset += 4; - break; - case AMQP_FIELD_KIND_U32: - E_32(encoded, offset, entry->value.u32); - offset += 4; - break; - case AMQP_FIELD_KIND_I64: - E_64(encoded, offset, (uint64_t) entry->value.i64); - offset += 8; - break; - case AMQP_FIELD_KIND_F32: - /* by punning, u32 magically gets the right value...! */ - E_32(encoded, offset, entry->value.u32); - offset += 4; - break; - case AMQP_FIELD_KIND_F64: - /* by punning, u64 magically gets the right value...! */ - E_64(encoded, offset, entry->value.u64); - offset += 8; - break; - case AMQP_FIELD_KIND_DECIMAL: - E_8(encoded, offset, entry->value.decimal.decimals); - offset++; - E_32(encoded, offset, entry->value.decimal.value); - offset += 4; - break; - case AMQP_FIELD_KIND_UTF8: - /* AMQP_FIELD_KIND_UTF8 and AMQP_FIELD_KIND_BYTES have the - same implementation, but different interpretations. */ - /* fall through */ - case AMQP_FIELD_KIND_BYTES: - E_32(encoded, offset, entry->value.bytes.len); - offset += 4; - E_BYTES(encoded, offset, entry->value.bytes.len, entry->value.bytes.bytes); - offset += entry->value.bytes.len; - break; - case AMQP_FIELD_KIND_ARRAY: - AMQP_CHECK_RESULT(amqp_encode_array(encoded, &(entry->value.array), &offset)); - break; - case AMQP_FIELD_KIND_TIMESTAMP: - E_64(encoded, offset, entry->value.u64); - offset += 8; - break; - case AMQP_FIELD_KIND_TABLE: - AMQP_CHECK_RESULT(amqp_encode_table(encoded, &(entry->value.table), &offset)); - break; - case AMQP_FIELD_KIND_VOID: - break; - default: - abort(); + case AMQP_FIELD_KIND_BOOLEAN: + FIELD_ENCODER(8, entry->value.boolean ? 1 : 0); + + case AMQP_FIELD_KIND_I8: + FIELD_ENCODER(8, entry->value.i8); + case AMQP_FIELD_KIND_U8: + FIELD_ENCODER(8, entry->value.u8); + + case AMQP_FIELD_KIND_I16: + FIELD_ENCODER(16, entry->value.i16); + case AMQP_FIELD_KIND_U16: + FIELD_ENCODER(16, entry->value.u16); + + case AMQP_FIELD_KIND_I32: + FIELD_ENCODER(32, entry->value.i32); + case AMQP_FIELD_KIND_U32: + FIELD_ENCODER(32, entry->value.u32); + + case AMQP_FIELD_KIND_I64: + FIELD_ENCODER(64, entry->value.i64); + case AMQP_FIELD_KIND_U64: + FIELD_ENCODER(64, entry->value.u64); + + case AMQP_FIELD_KIND_F32: + /* by punning, u32 magically gets the right value...! */ + FIELD_ENCODER(32, entry->value.u32); + + case AMQP_FIELD_KIND_F64: + /* by punning, u64 magically gets the right value...! */ + FIELD_ENCODER(64, entry->value.u64); + + case AMQP_FIELD_KIND_DECIMAL: + if (!amqp_encode_8(encoded, offset, entry->value.decimal.decimals) + || !amqp_encode_32(encoded, offset, entry->value.decimal.value)) + goto out; + break; + + case AMQP_FIELD_KIND_UTF8: + /* AMQP_FIELD_KIND_UTF8 and AMQP_FIELD_KIND_BYTES have the + same implementation, but different interpretations. */ + /* fall through */ + case AMQP_FIELD_KIND_BYTES: + if (!amqp_encode_32(encoded, offset, entry->value.bytes.len) + || !amqp_encode_bytes(encoded, offset, entry->value.bytes)) + goto out; + break; + + case AMQP_FIELD_KIND_ARRAY: + res = amqp_encode_array(encoded, &entry->value.array, offset); + goto out; + + case AMQP_FIELD_KIND_TIMESTAMP: + FIELD_ENCODER(64, entry->value.u64); + + case AMQP_FIELD_KIND_TABLE: + res = amqp_encode_table(encoded, &entry->value.table, offset); + goto out; + + case AMQP_FIELD_KIND_VOID: + break; + + default: + abort(); } - *offsetptr = offset; - return 0; + res = 0; + + out: + return res; } /*---------------------------------------------------------------------------*/ diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py index f911966..6fd149e 100644 --- a/librabbitmq/codegen.py +++ b/librabbitmq/codegen.py @@ -52,18 +52,162 @@ from amqp_codegen import * import string import re -cTypeMap = { - 'octet': 'uint8_t', - 'shortstr': 'amqp_bytes_t', - 'longstr': 'amqp_bytes_t', - 'short': 'uint16_t', - 'long': 'uint32_t', - 'longlong': 'uint64_t', - 'bit': 'amqp_boolean_t', - 'table': 'amqp_table_t', - 'timestamp': 'uint64_t', + +class Emitter(object): + """An object the trivially emits generated code lines. + + This largely exists to be wrapped by more sophisticated emitter + classes. + """ + + def __init__(self, prefix): + self.prefix = prefix + + def emit(self, line): + """Emit a line of generated code.""" + print self.prefix + line + + +class BitDecoder(object): + """An emitter object that keeps track of the state involved in + decoding the AMQP bit type.""" + + def __init__(self, emitter): + self.emitter = emitter + self.bit = 0 + + def emit(self, line): + self.bit = 0 + self.emitter.emit(line) + + def decode_bit(self, lvalue): + """Generate code to decode a value of the AMQP bit type into + the given lvalue.""" + if self.bit == 0: + self.emitter.emit("if (!amqp_decode_8(encoded, &offset, &bit_buffer)) return -ERROR_BAD_AMQP_DATA;") + + self.emitter.emit("%s = (bit_buffer & (1 << %d)) ? 1 : 0;" + % (lvalue, self.bit)) + self.bit += 1 + if self.bit == 8: + self.bit = 0 + + +class BitEncoder(object): + """An emitter object that keeps track of the state involved in + encoding the AMQP bit type.""" + + def __init__(self, emitter): + self.emitter = emitter + self.bit = 0 + + def flush(self): + """Flush the state associated with AMQP bit types.""" + if self.bit: + self.emitter.emit("if (!amqp_encode_8(encoded, &offset, bit_buffer)) return -ERROR_BAD_AMQP_DATA;") + self.bit = 0 + + def emit(self, line): + self.flush() + self.emitter.emit(line) + + def encode_bit(self, value): + """Generate code to ebcode a value of the AMQP bit type from + the given value.""" + if self.bit == 0: + self.emitter.emit("bit_buffer = 0;") + + self.emitter.emit("if (%s) bit_buffer |= (1 << %d);" + % (value, self.bit)) + self.bit += 1 + if self.bit == 8: + self.flush() + + +class SimpleType(object): + """A AMQP type that corresponds to a simple scalar C value of a + certain width.""" + + def __init__(self, bits): + self.bits = bits + self.ctype = "uint%d_t" % (bits,) + + def decode(self, emitter, lvalue): + emitter.emit("if (!amqp_decode_%d(encoded, &offset, &%s)) return -ERROR_BAD_AMQP_DATA;" % (self.bits, lvalue)) + + def encode(self, emitter, value): + emitter.emit("if (!amqp_encode_%d(encoded, &offset, %s)) return -ERROR_BAD_AMQP_DATA;" % (self.bits, value)) + + +class StrType(object): + """The AMQP shortstr or longstr types.""" + + def __init__(self, lenbits): + self.lenbits = lenbits + self.ctype = "amqp_bytes_t" + + def decode(self, emitter, lvalue): + emitter.emit("{") + emitter.emit(" uint%d_t len;" % (self.lenbits,)) + emitter.emit(" if (!amqp_decode_%d(encoded, &offset, &len)" % (self.lenbits,)) + emitter.emit(" || !amqp_decode_bytes(encoded, &offset, &%s, len))" % (lvalue,)) + emitter.emit(" return -ERROR_BAD_AMQP_DATA;") + emitter.emit("}") + + def encode(self, emitter, value): + emitter.emit("if (!amqp_encode_%d(encoded, &offset, %s.len)" % (self.lenbits, value)) + emitter.emit(" || !amqp_encode_bytes(encoded, &offset, %s))" % (value,)) + emitter.emit(" return -ERROR_BAD_AMQP_DATA;") + + +class BitType(object): + """The AMQP bit type.""" + + def __init__(self): + self.ctype = "amqp_boolean_t" + + def decode(self, emitter, lvalue): + emitter.decode_bit(lvalue) + + def encode(self, emitter, value): + emitter.encode_bit(value) + + +class TableType(object): + """The AMQP table type.""" + + def __init__(self): + self.ctype = "amqp_table_t" + + def decode(self, emitter, lvalue): + emitter.emit("{") + emitter.emit(" int res = amqp_decode_table(encoded, pool, &(%s), &offset);" % (lvalue,)) + emitter.emit(" if (res < 0) return res;") + emitter.emit("}") + + def encode(self, emitter, value): + emitter.emit("{") + emitter.emit(" int res = amqp_encode_table(encoded, &(%s), &offset);" % (value,)) + emitter.emit(" if (res < 0) return res;") + emitter.emit("}") + + +types = { + 'octet': SimpleType(8), + 'short': SimpleType(16), + 'long': SimpleType(32), + 'longlong': SimpleType(64), + 'shortstr': StrType(8), + 'longstr': StrType(32), + 'bit': BitType(), + 'table': TableType(), + 'timestamp': SimpleType(64), } +def typeFor(spec, f): + """Get a representation of the AMQP type of a field.""" + return types[spec.resolveDomain(f.domain)] + def c_ize(s): s = s.replace('-', '_') s = s.replace(' ', '_') @@ -81,9 +225,6 @@ def cFlagName(c, f): return cConstantName(c.name + '_' + f.name) + '_FLAG' def genErl(spec): - def cType(domain): - return cTypeMap[spec.resolveDomain(domain)] - def fieldTempList(fields): return '[' + ', '.join(['F' + str(f.index) for f in fields]) + ']' @@ -93,78 +234,6 @@ def genErl(spec): def genLookupMethodName(m): print ' case %s: return "%s";' % (m.defName(), m.defName()) - def genSingleDecode(prefix, cLvalue, unresolved_domain): - type = spec.resolveDomain(unresolved_domain) - if type == 'shortstr': - print prefix + "%s.len = D_8(encoded, offset);" % (cLvalue,) - print prefix + "offset++;" - print prefix + "%s.bytes = D_BYTES(encoded, offset, %s.len);" % (cLvalue, cLvalue) - print prefix + "offset += %s.len;" % (cLvalue,) - elif type == 'longstr': - print prefix + "%s.len = D_32(encoded, offset);" % (cLvalue,) - print prefix + "offset += 4;" - print prefix + "%s.bytes = D_BYTES(encoded, offset, %s.len);" % (cLvalue, cLvalue) - print prefix + "offset += %s.len;" % (cLvalue,) - elif type == 'octet': - print prefix + "%s = D_8(encoded, offset);" % (cLvalue,) - print prefix + "offset++;" - elif type == 'short': - print prefix + "%s = D_16(encoded, offset);" % (cLvalue,) - print prefix + "offset += 2;" - elif type == 'long': - print prefix + "%s = D_32(encoded, offset);" % (cLvalue,) - print prefix + "offset += 4;" - elif type == 'longlong': - print prefix + "%s = D_64(encoded, offset);" % (cLvalue,) - print prefix + "offset += 8;" - elif type == 'timestamp': - print prefix + "%s = D_64(encoded, offset);" % (cLvalue,) - print prefix + "offset += 8;" - elif type == 'bit': - raise "Can't decode bit in genSingleDecode" - elif type == 'table': - print prefix + "table_result = amqp_decode_table(encoded, pool, &(%s), &offset);" % \ - (cLvalue,) - print prefix + "AMQP_CHECK_RESULT(table_result);" - else: - raise "Illegal domain in genSingleDecode", type - - def genSingleEncode(prefix, cValue, unresolved_domain): - type = spec.resolveDomain(unresolved_domain) - if type == 'shortstr': - print prefix + "E_8(encoded, offset, %s.len);" % (cValue,) - print prefix + "offset++;" - print prefix + "E_BYTES(encoded, offset, %s.len, %s.bytes);" % (cValue, cValue) - print prefix + "offset += %s.len;" % (cValue,) - elif type == 'longstr': - print prefix + "E_32(encoded, offset, %s.len);" % (cValue,) - print prefix + "offset += 4;" - print prefix + "E_BYTES(encoded, offset, %s.len, %s.bytes);" % (cValue, cValue) - print prefix + "offset += %s.len;" % (cValue,) - elif type == 'octet': - print prefix + "E_8(encoded, offset, %s);" % (cValue,) - print prefix + "offset++;" - elif type == 'short': - print prefix + "E_16(encoded, offset, %s);" % (cValue,) - print prefix + "offset += 2;" - elif type == 'long': - print prefix + "E_32(encoded, offset, %s);" % (cValue,) - print prefix + "offset += 4;" - elif type == 'longlong': - print prefix + "E_64(encoded, offset, %s);" % (cValue,) - print prefix + "offset += 8;" - elif type == 'timestamp': - print prefix + "E_64(encoded, offset, %s);" % (cValue,) - print prefix + "offset += 8;" - elif type == 'bit': - raise "Can't encode bit in genSingleDecode" - elif type == 'table': - print prefix + "table_result = amqp_encode_table(encoded, &(%s), &offset);" % \ - (cValue,) - print prefix + "if (table_result < 0) return table_result;" - else: - raise "Illegal domain in genSingleEncode", type - def genDecodeMethodFields(m): print " case %s: {" % (m.defName(),) if m.arguments: @@ -173,22 +242,11 @@ def genErl(spec): print " if (m == NULL) { return -ERROR_NO_MEMORY; }" else: print " %s *m = NULL; /* no fields */" % (m.structName(),) - bitindex = None + + emitter = BitDecoder(Emitter(" ")) for f in m.arguments: - if spec.resolveDomain(f.domain) == 'bit': - if bitindex is None: - bitindex = 0 - if bitindex >= 8: - bitindex = 0 - if bitindex == 0: - print " bit_buffer = D_8(encoded, offset);" - print " offset++;" - print " m->%s = (bit_buffer & (1 << %d)) ? 1 : 0;" % \ - (c_ize(f.name), bitindex) - bitindex = bitindex + 1 - else: - bitindex = None - genSingleDecode(" ", "m->%s" % (c_ize(f.name),), f.domain) + typeFor(spec, f).decode(emitter, "m->"+c_ize(f.name)) + print " *decoded = m;" print " return 0;" print " }" @@ -199,13 +257,13 @@ def genErl(spec): (c.structName(), c.structName(), c.structName()) print " if (p == NULL) { return -ERROR_NO_MEMORY; }" print " p->_flags = flags;" + + emitter = Emitter(" ") for f in c.fields: - if spec.resolveDomain(f.domain) == 'bit': - pass - else: - print " if (flags & %s) {" % (cFlagName(c, f),) - genSingleDecode(" ", "p->%s" % (c_ize(f.name),), f.domain) - print " }" + emitter.emit("if (flags & %s) {" % (cFlagName(c, f),)) + typeFor(spec, f).decode(emitter, "p->"+c_ize(f.name)) + emitter.emit("}") + print " *decoded = p;" print " return 0;" print " }" @@ -214,28 +272,12 @@ def genErl(spec): print " case %s: {" % (m.defName(),) if m.arguments: print " %s *m = (%s *) decoded;" % (m.structName(), m.structName()) - bitindex = None - def finishBits(): - if bitindex is not None: - print " E_8(encoded, offset, bit_buffer);" - print " offset++;" + + emitter = BitEncoder(Emitter(" ")) for f in m.arguments: - if spec.resolveDomain(f.domain) == 'bit': - if bitindex is None: - bitindex = 0 - print " bit_buffer = 0;" - if bitindex >= 8: - finishBits() - print " bit_buffer = 0;" - bitindex = 0 - print " if (m->%s) { bit_buffer |= (1 << %d); }" % \ - (c_ize(f.name), bitindex) - bitindex = bitindex + 1 - else: - finishBits() - bitindex = None - genSingleEncode(" ", "m->%s" % (c_ize(f.name),), f.domain) - finishBits() + typeFor(spec, f).encode(emitter, "m->"+c_ize(f.name)) + emitter.flush() + print " return offset;" print " }" @@ -243,13 +285,13 @@ def genErl(spec): print " case %d: {" % (c.index,) if c.fields: print " %s *p = (%s *) decoded;" % (c.structName(), c.structName()) + + emitter = Emitter(" ") for f in c.fields: - if spec.resolveDomain(f.domain) == 'bit': - pass - else: - print " if (flags & %s) {" % (cFlagName(c, f),) - genSingleEncode(" ", "p->%s" % (c_ize(f.name),), f.domain) - print " }" + emitter.emit(" if (flags & %s) {" % (cFlagName(c, f),)) + typeFor(spec, f).encode(emitter, "p->"+c_ize(f.name)) + emitter.emit("}") + print " return offset;" print " }" @@ -310,8 +352,7 @@ int amqp_decode_method(amqp_method_number_t methodNumber, amqp_bytes_t encoded, void **decoded) { - int offset = 0; - int table_result; + size_t offset = 0; uint8_t bit_buffer; switch (methodNumber) {""" @@ -326,16 +367,15 @@ int amqp_decode_properties(uint16_t class_id, amqp_bytes_t encoded, void **decoded) { - int offset = 0; - int table_result; + size_t offset = 0; amqp_flags_t flags = 0; int flagword_index = 0; - amqp_flags_t partial_flags; + uint16_t partial_flags; do { - partial_flags = D_16(encoded, offset); - offset += 2; + if (!amqp_decode_16(encoded, &offset, &partial_flags)) + return -ERROR_BAD_AMQP_DATA; flags |= (partial_flags << (flagword_index * 16)); flagword_index++; } while (partial_flags & 1); @@ -351,8 +391,7 @@ int amqp_encode_method(amqp_method_number_t methodNumber, void *decoded, amqp_bytes_t encoded) { - int offset = 0; - int table_result; + size_t offset = 0; uint8_t bit_buffer; switch (methodNumber) {""" @@ -366,8 +405,7 @@ int amqp_encode_properties(uint16_t class_id, void *decoded, amqp_bytes_t encoded) { - int offset = 0; - int table_result; + size_t offset = 0; /* Cheat, and get the flags out generically, relying on the similarity of structure between classes */ @@ -381,8 +419,8 @@ int amqp_encode_properties(uint16_t class_id, amqp_flags_t remainder = remaining_flags >> 16; uint16_t partial_flags = remaining_flags & 0xFFFE; if (remainder != 0) { partial_flags |= 1; } - E_16(encoded, offset, partial_flags); - offset += 2; + if (!amqp_encode_16(encoded, &offset, partial_flags)) + return -ERROR_BAD_AMQP_DATA; remaining_flags = remainder; } while (remaining_flags != 0); } @@ -394,14 +432,16 @@ int amqp_encode_properties(uint16_t class_id, }""" def genHrl(spec): - def cType(domain): - return cTypeMap[spec.resolveDomain(domain)] - def fieldDeclList(fields): - return ''.join([" %s %s;\n" % (cType(f.domain), c_ize(f.name)) for f in fields]) - + if fields: + return ''.join([" %s %s;\n" % (typeFor(spec, f).ctype, + c_ize(f.name)) + for f in fields]) + else: + return " char dummy; /* Dummy field to avoid empty struct */\n" + def propDeclList(fields): - return ''.join([" %s %s;\n" % (cType(f.domain), c_ize(f.name)) + return ''.join([" %s %s;\n" % (typeFor(spec, f).ctype, c_ize(f.name)) for f in fields if spec.resolveDomain(f.domain) != 'bit']) @@ -427,7 +467,7 @@ extern "C" { print """/* Function prototypes. */ extern char const *amqp_constant_name(int constantNumber); extern amqp_boolean_t amqp_constant_is_hard_error(int constantNumber); -extern char const *amqp_method_name(amqp_method_number_t methodNumber); +RABBITMQ_EXPORT char const *amqp_method_name(amqp_method_number_t methodNumber); extern amqp_boolean_t amqp_method_has_content(amqp_method_number_t methodNumber); extern int amqp_decode_method(amqp_method_number_t methodNumber, amqp_pool_t *pool, diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c index 9d37dfc..4f5368e 100644 --- a/librabbitmq/unix/socket.c +++ b/librabbitmq/unix/socket.c @@ -53,6 +53,7 @@ #include <fcntl.h> #include <stdint.h> #include <string.h> +#include <stdlib.h> #include "amqp.h" #include "amqp_private.h" @@ -77,7 +78,13 @@ int amqp_socket_socket(int domain, int type, int proto) } return s; -} +} + +/* strdup is not in ISO C90! */ +static inline char *strdup(const char *str) +{ + return strcpy(malloc(strlen(str) + 1),str); +} char *amqp_os_error_string(int err) { diff --git a/librabbitmq/windows/socket.c b/librabbitmq/windows/socket.c index 9c026bd..bef7b95 100644 --- a/librabbitmq/windows/socket.c +++ b/librabbitmq/windows/socket.c @@ -48,8 +48,12 @@ * ***** END LICENSE BLOCK ***** */ +/* See http://msdn.microsoft.com/en-us/library/ms737629%28VS.85%29.aspx */ +#define WIN32_LEAN_AND_MEAN + #include <windows.h> #include <stdint.h> +#include <stdlib.h> #include "amqp.h" #include "amqp_private.h" @@ -64,13 +68,19 @@ int amqp_socket_init(void) int res = WSAStartup(0x0202, &data); if (res) return -res; - + called_wsastartup = 1; } return 0; } +/* strdup is not in ISO C90! */ +static inline char *strdup(const char *str) +{ + return strcpy(malloc(strlen(str) + 1),str); +} + char *amqp_os_error_string(int err) { char *msg, *copy; diff --git a/librabbitmq/windows/socket.h b/librabbitmq/windows/socket.h index 3e0a378..38ca905 100644 --- a/librabbitmq/windows/socket.h +++ b/librabbitmq/windows/socket.h @@ -69,7 +69,7 @@ static inline int amqp_socket_setsockopt(int sock, int level, int optname, /* same as WSABUF */ struct iovec { u_long iov_len; - char *iov_base; + void *iov_base; }; static inline int amqp_socket_writev(int sock, struct iovec *iov, int nvecs) diff --git a/msinttypes/inttypes.h b/msinttypes/inttypes.h new file mode 100644 index 0000000..4b3828a --- /dev/null +++ b/msinttypes/inttypes.h @@ -0,0 +1,305 @@ +// ISO C9x compliant inttypes.h for Microsoft Visual Studio +// Based on ISO/IEC 9899:TC2 Committee draft (May 6, 2005) WG14/N1124 +// +// Copyright (c) 2006 Alexander Chemeris +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// 3. The name of the author may be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO +// EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; +// OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +/////////////////////////////////////////////////////////////////////////////// + +#ifndef _MSC_VER // [ +#error "Use this header only with Microsoft Visual C++ compilers!" +#endif // _MSC_VER ] + +#ifndef _MSC_INTTYPES_H_ // [ +#define _MSC_INTTYPES_H_ + +#if _MSC_VER > 1000 +#pragma once +#endif + +#include "stdint.h" + +// 7.8 Format conversion of integer types + +typedef struct { + intmax_t quot; + intmax_t rem; +} imaxdiv_t; + +// 7.8.1 Macros for format specifiers + +#if !defined(__cplusplus) || defined(__STDC_FORMAT_MACROS) // [ See footnote 185 at page 198 + +// The fprintf macros for signed integers are: +#define PRId8 "d" +#define PRIi8 "i" +#define PRIdLEAST8 "d" +#define PRIiLEAST8 "i" +#define PRIdFAST8 "d" +#define PRIiFAST8 "i" + +#define PRId16 "hd" +#define PRIi16 "hi" +#define PRIdLEAST16 "hd" +#define PRIiLEAST16 "hi" +#define PRIdFAST16 "hd" +#define PRIiFAST16 "hi" + +#define PRId32 "I32d" +#define PRIi32 "I32i" +#define PRIdLEAST32 "I32d" +#define PRIiLEAST32 "I32i" +#define PRIdFAST32 "I32d" +#define PRIiFAST32 "I32i" + +#define PRId64 "I64d" +#define PRIi64 "I64i" +#define PRIdLEAST64 "I64d" +#define PRIiLEAST64 "I64i" +#define PRIdFAST64 "I64d" +#define PRIiFAST64 "I64i" + +#define PRIdMAX "I64d" +#define PRIiMAX "I64i" + +#define PRIdPTR "Id" +#define PRIiPTR "Ii" + +// The fprintf macros for unsigned integers are: +#define PRIo8 "o" +#define PRIu8 "u" +#define PRIx8 "x" +#define PRIX8 "X" +#define PRIoLEAST8 "o" +#define PRIuLEAST8 "u" +#define PRIxLEAST8 "x" +#define PRIXLEAST8 "X" +#define PRIoFAST8 "o" +#define PRIuFAST8 "u" +#define PRIxFAST8 "x" +#define PRIXFAST8 "X" + +#define PRIo16 "ho" +#define PRIu16 "hu" +#define PRIx16 "hx" +#define PRIX16 "hX" +#define PRIoLEAST16 "ho" +#define PRIuLEAST16 "hu" +#define PRIxLEAST16 "hx" +#define PRIXLEAST16 "hX" +#define PRIoFAST16 "ho" +#define PRIuFAST16 "hu" +#define PRIxFAST16 "hx" +#define PRIXFAST16 "hX" + +#define PRIo32 "I32o" +#define PRIu32 "I32u" +#define PRIx32 "I32x" +#define PRIX32 "I32X" +#define PRIoLEAST32 "I32o" +#define PRIuLEAST32 "I32u" +#define PRIxLEAST32 "I32x" +#define PRIXLEAST32 "I32X" +#define PRIoFAST32 "I32o" +#define PRIuFAST32 "I32u" +#define PRIxFAST32 "I32x" +#define PRIXFAST32 "I32X" + +#define PRIo64 "I64o" +#define PRIu64 "I64u" +#define PRIx64 "I64x" +#define PRIX64 "I64X" +#define PRIoLEAST64 "I64o" +#define PRIuLEAST64 "I64u" +#define PRIxLEAST64 "I64x" +#define PRIXLEAST64 "I64X" +#define PRIoFAST64 "I64o" +#define PRIuFAST64 "I64u" +#define PRIxFAST64 "I64x" +#define PRIXFAST64 "I64X" + +#define PRIoMAX "I64o" +#define PRIuMAX "I64u" +#define PRIxMAX "I64x" +#define PRIXMAX "I64X" + +#define PRIoPTR "Io" +#define PRIuPTR "Iu" +#define PRIxPTR "Ix" +#define PRIXPTR "IX" + +// The fscanf macros for signed integers are: +#define SCNd8 "d" +#define SCNi8 "i" +#define SCNdLEAST8 "d" +#define SCNiLEAST8 "i" +#define SCNdFAST8 "d" +#define SCNiFAST8 "i" + +#define SCNd16 "hd" +#define SCNi16 "hi" +#define SCNdLEAST16 "hd" +#define SCNiLEAST16 "hi" +#define SCNdFAST16 "hd" +#define SCNiFAST16 "hi" + +#define SCNd32 "ld" +#define SCNi32 "li" +#define SCNdLEAST32 "ld" +#define SCNiLEAST32 "li" +#define SCNdFAST32 "ld" +#define SCNiFAST32 "li" + +#define SCNd64 "I64d" +#define SCNi64 "I64i" +#define SCNdLEAST64 "I64d" +#define SCNiLEAST64 "I64i" +#define SCNdFAST64 "I64d" +#define SCNiFAST64 "I64i" + +#define SCNdMAX "I64d" +#define SCNiMAX "I64i" + +#ifdef _WIN64 // [ +# define SCNdPTR "I64d" +# define SCNiPTR "I64i" +#else // _WIN64 ][ +# define SCNdPTR "ld" +# define SCNiPTR "li" +#endif // _WIN64 ] + +// The fscanf macros for unsigned integers are: +#define SCNo8 "o" +#define SCNu8 "u" +#define SCNx8 "x" +#define SCNX8 "X" +#define SCNoLEAST8 "o" +#define SCNuLEAST8 "u" +#define SCNxLEAST8 "x" +#define SCNXLEAST8 "X" +#define SCNoFAST8 "o" +#define SCNuFAST8 "u" +#define SCNxFAST8 "x" +#define SCNXFAST8 "X" + +#define SCNo16 "ho" +#define SCNu16 "hu" +#define SCNx16 "hx" +#define SCNX16 "hX" +#define SCNoLEAST16 "ho" +#define SCNuLEAST16 "hu" +#define SCNxLEAST16 "hx" +#define SCNXLEAST16 "hX" +#define SCNoFAST16 "ho" +#define SCNuFAST16 "hu" +#define SCNxFAST16 "hx" +#define SCNXFAST16 "hX" + +#define SCNo32 "lo" +#define SCNu32 "lu" +#define SCNx32 "lx" +#define SCNX32 "lX" +#define SCNoLEAST32 "lo" +#define SCNuLEAST32 "lu" +#define SCNxLEAST32 "lx" +#define SCNXLEAST32 "lX" +#define SCNoFAST32 "lo" +#define SCNuFAST32 "lu" +#define SCNxFAST32 "lx" +#define SCNXFAST32 "lX" + +#define SCNo64 "I64o" +#define SCNu64 "I64u" +#define SCNx64 "I64x" +#define SCNX64 "I64X" +#define SCNoLEAST64 "I64o" +#define SCNuLEAST64 "I64u" +#define SCNxLEAST64 "I64x" +#define SCNXLEAST64 "I64X" +#define SCNoFAST64 "I64o" +#define SCNuFAST64 "I64u" +#define SCNxFAST64 "I64x" +#define SCNXFAST64 "I64X" + +#define SCNoMAX "I64o" +#define SCNuMAX "I64u" +#define SCNxMAX "I64x" +#define SCNXMAX "I64X" + +#ifdef _WIN64 // [ +# define SCNoPTR "I64o" +# define SCNuPTR "I64u" +# define SCNxPTR "I64x" +# define SCNXPTR "I64X" +#else // _WIN64 ][ +# define SCNoPTR "lo" +# define SCNuPTR "lu" +# define SCNxPTR "lx" +# define SCNXPTR "lX" +#endif // _WIN64 ] + +#endif // __STDC_FORMAT_MACROS ] + +// 7.8.2 Functions for greatest-width integer types + +// 7.8.2.1 The imaxabs function +#define imaxabs _abs64 + +// 7.8.2.2 The imaxdiv function + +// This is modified version of div() function from Microsoft's div.c found +// in %MSVC.NET%\crt\src\div.c +#ifdef STATIC_IMAXDIV // [ +static +#else // STATIC_IMAXDIV ][ +_inline +#endif // STATIC_IMAXDIV ] +imaxdiv_t __cdecl imaxdiv(intmax_t numer, intmax_t denom) +{ + imaxdiv_t result; + + result.quot = numer / denom; + result.rem = numer % denom; + + if (numer < 0 && result.rem > 0) { + // did division wrong; must fix up + ++result.quot; + result.rem -= denom; + } + + return result; +} + +// 7.8.2.3 The strtoimax and strtoumax functions +#define strtoimax _strtoi64 +#define strtoumax _strtoui64 + +// 7.8.2.4 The wcstoimax and wcstoumax functions +#define wcstoimax _wcstoi64 +#define wcstoumax _wcstoui64 + + +#endif // _MSC_INTTYPES_H_ ] diff --git a/msinttypes/stdint.h b/msinttypes/stdint.h new file mode 100644 index 0000000..d02608a --- /dev/null +++ b/msinttypes/stdint.h @@ -0,0 +1,247 @@ +// ISO C9x compliant stdint.h for Microsoft Visual Studio +// Based on ISO/IEC 9899:TC2 Committee draft (May 6, 2005) WG14/N1124 +// +// Copyright (c) 2006-2008 Alexander Chemeris +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// 3. The name of the author may be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO +// EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; +// OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +/////////////////////////////////////////////////////////////////////////////// + +#ifndef _MSC_VER // [ +#error "Use this header only with Microsoft Visual C++ compilers!" +#endif // _MSC_VER ] + +#ifndef _MSC_STDINT_H_ // [ +#define _MSC_STDINT_H_ + +#if _MSC_VER > 1000 +#pragma once +#endif + +#include <limits.h> + +// For Visual Studio 6 in C++ mode and for many Visual Studio versions when +// compiling for ARM we should wrap <wchar.h> include with 'extern "C++" {}' +// or compiler give many errors like this: +// error C2733: second C linkage of overloaded function 'wmemchr' not allowed +#ifdef __cplusplus +extern "C" { +#endif +# include <wchar.h> +#ifdef __cplusplus +} +#endif + +// Define _W64 macros to mark types changing their size, like intptr_t. +#ifndef _W64 +# if !defined(__midl) && (defined(_X86_) || defined(_M_IX86)) && _MSC_VER >= 1300 +# define _W64 __w64 +# else +# define _W64 +# endif +#endif + + +// 7.18.1 Integer types + +// 7.18.1.1 Exact-width integer types + +// Visual Studio 6 and Embedded Visual C++ 4 doesn't +// realize that, e.g. char has the same size as __int8 +// so we give up on __intX for them. +#if (_MSC_VER < 1300) + typedef signed char int8_t; + typedef signed short int16_t; + typedef signed int int32_t; + typedef unsigned char uint8_t; + typedef unsigned short uint16_t; + typedef unsigned int uint32_t; +#else + typedef signed __int8 int8_t; + typedef signed __int16 int16_t; + typedef signed __int32 int32_t; + typedef unsigned __int8 uint8_t; + typedef unsigned __int16 uint16_t; + typedef unsigned __int32 uint32_t; +#endif +typedef signed __int64 int64_t; +typedef unsigned __int64 uint64_t; + + +// 7.18.1.2 Minimum-width integer types +typedef int8_t int_least8_t; +typedef int16_t int_least16_t; +typedef int32_t int_least32_t; +typedef int64_t int_least64_t; +typedef uint8_t uint_least8_t; +typedef uint16_t uint_least16_t; +typedef uint32_t uint_least32_t; +typedef uint64_t uint_least64_t; + +// 7.18.1.3 Fastest minimum-width integer types +typedef int8_t int_fast8_t; +typedef int16_t int_fast16_t; +typedef int32_t int_fast32_t; +typedef int64_t int_fast64_t; +typedef uint8_t uint_fast8_t; +typedef uint16_t uint_fast16_t; +typedef uint32_t uint_fast32_t; +typedef uint64_t uint_fast64_t; + +// 7.18.1.4 Integer types capable of holding object pointers +#ifdef _WIN64 // [ + typedef signed __int64 intptr_t; + typedef unsigned __int64 uintptr_t; +#else // _WIN64 ][ + typedef _W64 signed int intptr_t; + typedef _W64 unsigned int uintptr_t; +#endif // _WIN64 ] + +// 7.18.1.5 Greatest-width integer types +typedef int64_t intmax_t; +typedef uint64_t uintmax_t; + + +// 7.18.2 Limits of specified-width integer types + +#if !defined(__cplusplus) || defined(__STDC_LIMIT_MACROS) // [ See footnote 220 at page 257 and footnote 221 at page 259 + +// 7.18.2.1 Limits of exact-width integer types +#define INT8_MIN ((int8_t)_I8_MIN) +#define INT8_MAX _I8_MAX +#define INT16_MIN ((int16_t)_I16_MIN) +#define INT16_MAX _I16_MAX +#define INT32_MIN ((int32_t)_I32_MIN) +#define INT32_MAX _I32_MAX +#define INT64_MIN ((int64_t)_I64_MIN) +#define INT64_MAX _I64_MAX +#define UINT8_MAX _UI8_MAX +#define UINT16_MAX _UI16_MAX +#define UINT32_MAX _UI32_MAX +#define UINT64_MAX _UI64_MAX + +// 7.18.2.2 Limits of minimum-width integer types +#define INT_LEAST8_MIN INT8_MIN +#define INT_LEAST8_MAX INT8_MAX +#define INT_LEAST16_MIN INT16_MIN +#define INT_LEAST16_MAX INT16_MAX +#define INT_LEAST32_MIN INT32_MIN +#define INT_LEAST32_MAX INT32_MAX +#define INT_LEAST64_MIN INT64_MIN +#define INT_LEAST64_MAX INT64_MAX +#define UINT_LEAST8_MAX UINT8_MAX +#define UINT_LEAST16_MAX UINT16_MAX +#define UINT_LEAST32_MAX UINT32_MAX +#define UINT_LEAST64_MAX UINT64_MAX + +// 7.18.2.3 Limits of fastest minimum-width integer types +#define INT_FAST8_MIN INT8_MIN +#define INT_FAST8_MAX INT8_MAX +#define INT_FAST16_MIN INT16_MIN +#define INT_FAST16_MAX INT16_MAX +#define INT_FAST32_MIN INT32_MIN +#define INT_FAST32_MAX INT32_MAX +#define INT_FAST64_MIN INT64_MIN +#define INT_FAST64_MAX INT64_MAX +#define UINT_FAST8_MAX UINT8_MAX +#define UINT_FAST16_MAX UINT16_MAX +#define UINT_FAST32_MAX UINT32_MAX +#define UINT_FAST64_MAX UINT64_MAX + +// 7.18.2.4 Limits of integer types capable of holding object pointers +#ifdef _WIN64 // [ +# define INTPTR_MIN INT64_MIN +# define INTPTR_MAX INT64_MAX +# define UINTPTR_MAX UINT64_MAX +#else // _WIN64 ][ +# define INTPTR_MIN INT32_MIN +# define INTPTR_MAX INT32_MAX +# define UINTPTR_MAX UINT32_MAX +#endif // _WIN64 ] + +// 7.18.2.5 Limits of greatest-width integer types +#define INTMAX_MIN INT64_MIN +#define INTMAX_MAX INT64_MAX +#define UINTMAX_MAX UINT64_MAX + +// 7.18.3 Limits of other integer types + +#ifdef _WIN64 // [ +# define PTRDIFF_MIN _I64_MIN +# define PTRDIFF_MAX _I64_MAX +#else // _WIN64 ][ +# define PTRDIFF_MIN _I32_MIN +# define PTRDIFF_MAX _I32_MAX +#endif // _WIN64 ] + +#define SIG_ATOMIC_MIN INT_MIN +#define SIG_ATOMIC_MAX INT_MAX + +#ifndef SIZE_MAX // [ +# ifdef _WIN64 // [ +# define SIZE_MAX _UI64_MAX +# else // _WIN64 ][ +# define SIZE_MAX _UI32_MAX +# endif // _WIN64 ] +#endif // SIZE_MAX ] + +// WCHAR_MIN and WCHAR_MAX are also defined in <wchar.h> +#ifndef WCHAR_MIN // [ +# define WCHAR_MIN 0 +#endif // WCHAR_MIN ] +#ifndef WCHAR_MAX // [ +# define WCHAR_MAX _UI16_MAX +#endif // WCHAR_MAX ] + +#define WINT_MIN 0 +#define WINT_MAX _UI16_MAX + +#endif // __STDC_LIMIT_MACROS ] + + +// 7.18.4 Limits of other integer types + +#if !defined(__cplusplus) || defined(__STDC_CONSTANT_MACROS) // [ See footnote 224 at page 260 + +// 7.18.4.1 Macros for minimum-width integer constants + +#define INT8_C(val) val##i8 +#define INT16_C(val) val##i16 +#define INT32_C(val) val##i32 +#define INT64_C(val) val##i64 + +#define UINT8_C(val) val##ui8 +#define UINT16_C(val) val##ui16 +#define UINT32_C(val) val##ui32 +#define UINT64_C(val) val##ui64 + +// 7.18.4.2 Macros for greatest-width integer constants +#define INTMAX_C INT64_C +#define UINTMAX_C UINT64_C + +#endif // __STDC_CONSTANT_MACROS ] + + +#endif // _MSC_STDINT_H_ ] diff --git a/tests/Makefile.am b/tests/Makefile.am index 7c8a4fe..3bd5e6e 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -1,4 +1,16 @@ noinst_PROGRAMS = test_tables -AM_CFLAGS = -I$(top_srcdir)/librabbitmq -I$(top_srcdir)/librabbitmq/$(PLATFORM_DIR) +AM_CFLAGS = -I$(top_srcdir)/librabbitmq + +if GCC +# Because we want to build under Microsoft's C compiler (for which +# there is apparently no demand for C99 support), it's a good idea +# to have gcc tell us when we stray from the old standard. +AM_CFLAGS += -ansi -pedantic +endif + +if USE_MSINTTYPES +AM_CFLAGS += -I$(top_srcdir)/msinttypes +endif + AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la diff --git a/tests/test_tables.c b/tests/test_tables.c index be1994b..5c0fbb9 100644 --- a/tests/test_tables.c +++ b/tests/test_tables.c @@ -51,19 +51,15 @@ #include <stdlib.h> #include <stdio.h> #include <string.h> -#include <time.h> #include <inttypes.h> #include <amqp.h> -#include <amqp_framing.h> -#include <amqp_private.h> - -#include <unistd.h> -#include <assert.h> #include <math.h> +#define M_PI 3.14159265358979323846264338327 + static void dump_indent(int indent) { int i; for (i = 0; i < indent; i++) { putchar(' '); } @@ -127,86 +123,152 @@ static void dump_value(int indent, amqp_field_value_t v) { } } -static void test_table_codec(void) { - amqp_table_entry_t inner_entries[2] = - { AMQP_TABLE_ENTRY_I32("one", 54321), - AMQP_TABLE_ENTRY_UTF8("two", amqp_cstring_bytes("A long string")) }; - amqp_table_t inner_table = { .num_entries = sizeof(inner_entries) / sizeof(inner_entries[0]), - .entries = &inner_entries[0] }; - - amqp_field_value_t inner_values[2] = - { AMQP_FIELD_VALUE_I32(54321), - AMQP_FIELD_VALUE_UTF8(amqp_cstring_bytes("A long string")) }; - amqp_array_t inner_array = { .num_entries = sizeof(inner_values) / sizeof(inner_values[0]), - .entries = &inner_values[0] }; - - amqp_table_entry_t entries[14] = - { AMQP_TABLE_ENTRY_UTF8("longstr", amqp_cstring_bytes("Here is a long string")), - AMQP_TABLE_ENTRY_I32("signedint", 12345), - AMQP_TABLE_ENTRY_DECIMAL("decimal", AMQP_DECIMAL(3, 123456)), - AMQP_TABLE_ENTRY_TIMESTAMP("timestamp", 109876543209876), - AMQP_TABLE_ENTRY_TABLE("table", inner_table), - AMQP_TABLE_ENTRY_I8("byte", 255), - AMQP_TABLE_ENTRY_I64("long", 1234567890), - AMQP_TABLE_ENTRY_I16("short", 655), - AMQP_TABLE_ENTRY_BOOLEAN("bool", 1), - AMQP_TABLE_ENTRY_BYTES("binary", amqp_cstring_bytes("a binary string")), - AMQP_TABLE_ENTRY_VOID("void"), - AMQP_TABLE_ENTRY_ARRAY("array", inner_array), - AMQP_TABLE_ENTRY_F32("float", M_PI), - AMQP_TABLE_ENTRY_F64("double", M_PI) }; - amqp_table_t table = { .num_entries = sizeof(entries) / sizeof(entries[0]), - .entries = &entries[0] }; - - uint8_t pre_encoded_table[] = { - 0x00, 0x00, 0x00, 0xff, 0x07, 0x6c, 0x6f, 0x6e, - 0x67, 0x73, 0x74, 0x72, 0x53, 0x00, 0x00, 0x00, - 0x15, 0x48, 0x65, 0x72, 0x65, 0x20, 0x69, 0x73, - 0x20, 0x61, 0x20, 0x6c, 0x6f, 0x6e, 0x67, 0x20, - 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x09, 0x73, - 0x69, 0x67, 0x6e, 0x65, 0x64, 0x69, 0x6e, 0x74, - 0x49, 0x00, 0x00, 0x30, 0x39, 0x07, 0x64, 0x65, - 0x63, 0x69, 0x6d, 0x61, 0x6c, 0x44, 0x03, 0x00, - 0x01, 0xe2, 0x40, 0x09, 0x74, 0x69, 0x6d, 0x65, - 0x73, 0x74, 0x61, 0x6d, 0x70, 0x54, 0x00, 0x00, - 0x63, 0xee, 0xa0, 0x53, 0xc1, 0x94, 0x05, 0x74, - 0x61, 0x62, 0x6c, 0x65, 0x46, 0x00, 0x00, 0x00, - 0x1f, 0x03, 0x6f, 0x6e, 0x65, 0x49, 0x00, 0x00, - 0xd4, 0x31, 0x03, 0x74, 0x77, 0x6f, 0x53, 0x00, - 0x00, 0x00, 0x0d, 0x41, 0x20, 0x6c, 0x6f, 0x6e, - 0x67, 0x20, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, - 0x04, 0x62, 0x79, 0x74, 0x65, 0x62, 0xff, 0x04, - 0x6c, 0x6f, 0x6e, 0x67, 0x6c, 0x00, 0x00, 0x00, - 0x00, 0x49, 0x96, 0x02, 0xd2, 0x05, 0x73, 0x68, - 0x6f, 0x72, 0x74, 0x73, 0x02, 0x8f, 0x04, 0x62, - 0x6f, 0x6f, 0x6c, 0x74, 0x01, 0x06, 0x62, 0x69, - 0x6e, 0x61, 0x72, 0x79, 0x78, 0x00, 0x00, 0x00, - 0x0f, 0x61, 0x20, 0x62, 0x69, 0x6e, 0x61, 0x72, - 0x79, 0x20, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, - 0x04, 0x76, 0x6f, 0x69, 0x64, 0x56, 0x05, 0x61, - 0x72, 0x72, 0x61, 0x79, 0x41, 0x00, 0x00, 0x00, - 0x17, 0x49, 0x00, 0x00, 0xd4, 0x31, 0x53, 0x00, - 0x00, 0x00, 0x0d, 0x41, 0x20, 0x6c, 0x6f, 0x6e, - 0x67, 0x20, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, - 0x05, 0x66, 0x6c, 0x6f, 0x61, 0x74, 0x66, 0x40, - 0x49, 0x0f, 0xdb, 0x06, 0x64, 0x6f, 0x75, 0x62, - 0x6c, 0x65, 0x64, 0x40, 0x09, 0x21, 0xfb, 0x54, - 0x44, 0x2d, 0x18 - }; +static uint8_t pre_encoded_table[] = { + 0x00, 0x00, 0x00, 0xff, 0x07, 0x6c, 0x6f, 0x6e, + 0x67, 0x73, 0x74, 0x72, 0x53, 0x00, 0x00, 0x00, + 0x15, 0x48, 0x65, 0x72, 0x65, 0x20, 0x69, 0x73, + 0x20, 0x61, 0x20, 0x6c, 0x6f, 0x6e, 0x67, 0x20, + 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x09, 0x73, + 0x69, 0x67, 0x6e, 0x65, 0x64, 0x69, 0x6e, 0x74, + 0x49, 0x00, 0x00, 0x30, 0x39, 0x07, 0x64, 0x65, + 0x63, 0x69, 0x6d, 0x61, 0x6c, 0x44, 0x03, 0x00, + 0x01, 0xe2, 0x40, 0x09, 0x74, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x54, 0x00, 0x00, + 0x63, 0xee, 0xa0, 0x53, 0xc1, 0x94, 0x05, 0x74, + 0x61, 0x62, 0x6c, 0x65, 0x46, 0x00, 0x00, 0x00, + 0x1f, 0x03, 0x6f, 0x6e, 0x65, 0x49, 0x00, 0x00, + 0xd4, 0x31, 0x03, 0x74, 0x77, 0x6f, 0x53, 0x00, + 0x00, 0x00, 0x0d, 0x41, 0x20, 0x6c, 0x6f, 0x6e, + 0x67, 0x20, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, + 0x04, 0x62, 0x79, 0x74, 0x65, 0x62, 0xff, 0x04, + 0x6c, 0x6f, 0x6e, 0x67, 0x6c, 0x00, 0x00, 0x00, + 0x00, 0x49, 0x96, 0x02, 0xd2, 0x05, 0x73, 0x68, + 0x6f, 0x72, 0x74, 0x73, 0x02, 0x8f, 0x04, 0x62, + 0x6f, 0x6f, 0x6c, 0x74, 0x01, 0x06, 0x62, 0x69, + 0x6e, 0x61, 0x72, 0x79, 0x78, 0x00, 0x00, 0x00, + 0x0f, 0x61, 0x20, 0x62, 0x69, 0x6e, 0x61, 0x72, + 0x79, 0x20, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, + 0x04, 0x76, 0x6f, 0x69, 0x64, 0x56, 0x05, 0x61, + 0x72, 0x72, 0x61, 0x79, 0x41, 0x00, 0x00, 0x00, + 0x17, 0x49, 0x00, 0x00, 0xd4, 0x31, 0x53, 0x00, + 0x00, 0x00, 0x0d, 0x41, 0x20, 0x6c, 0x6f, 0x6e, + 0x67, 0x20, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, + 0x05, 0x66, 0x6c, 0x6f, 0x61, 0x74, 0x66, 0x40, + 0x49, 0x0f, 0xdb, 0x06, 0x64, 0x6f, 0x75, 0x62, + 0x6c, 0x65, 0x64, 0x40, 0x09, 0x21, 0xfb, 0x54, + 0x44, 0x2d, 0x18 +}; +static void test_table_codec(void) { amqp_pool_t pool; int result; + amqp_table_entry_t inner_entries[2]; + amqp_table_t inner_table; + + amqp_field_value_t inner_values[2]; + amqp_array_t inner_array; + + amqp_table_entry_t entries[14]; + amqp_table_t table; + + inner_entries[0].key = amqp_cstring_bytes("one"); + inner_entries[0].value.kind = AMQP_FIELD_KIND_I32; + inner_entries[0].value.value.i32 = 54321; + + inner_entries[1].key = amqp_cstring_bytes("two"); + inner_entries[1].value.kind = AMQP_FIELD_KIND_UTF8; + inner_entries[1].value.value.bytes = amqp_cstring_bytes("A long string"); + + inner_table.num_entries = 2; + inner_table.entries = inner_entries; + + inner_values[0].kind = AMQP_FIELD_KIND_I32; + inner_values[0].value.i32 = 54321; + + inner_values[1].kind = AMQP_FIELD_KIND_UTF8; + inner_values[1].value.bytes = amqp_cstring_bytes("A long string"); + + inner_array.num_entries = 2; + inner_array.entries = inner_values; + + entries[0].key = amqp_cstring_bytes("longstr"); + entries[0].value.kind = AMQP_FIELD_KIND_UTF8; + entries[0].value.value.bytes = amqp_cstring_bytes("Here is a long string"); + + entries[1].key = amqp_cstring_bytes("signedint"); + entries[1].value.kind = AMQP_FIELD_KIND_I32; + entries[1].value.value.i32 = 12345; + + entries[2].key = amqp_cstring_bytes("decimal"); + entries[2].value.kind = AMQP_FIELD_KIND_DECIMAL; + entries[2].value.value.decimal.decimals = 3; + entries[2].value.value.decimal.value = 123456; + + entries[3].key = amqp_cstring_bytes("timestamp"); + entries[3].value.kind = AMQP_FIELD_KIND_TIMESTAMP; + entries[3].value.value.u64 = 109876543209876; + + entries[4].key = amqp_cstring_bytes("table"); + entries[4].value.kind = AMQP_FIELD_KIND_TABLE; + entries[4].value.value.table = inner_table; + + entries[5].key = amqp_cstring_bytes("byte"); + entries[5].value.kind = AMQP_FIELD_KIND_I8; + entries[5].value.value.i8 = (int8_t)255; + + entries[6].key = amqp_cstring_bytes("long"); + entries[6].value.kind = AMQP_FIELD_KIND_I64; + entries[6].value.value.i64 = 1234567890; + + entries[7].key = amqp_cstring_bytes("short"); + entries[7].value.kind = AMQP_FIELD_KIND_I16; + entries[7].value.value.i16 = 655; + + entries[8].key = amqp_cstring_bytes("bool"); + entries[8].value.kind = AMQP_FIELD_KIND_BOOLEAN; + entries[8].value.value.boolean = 1; + + entries[9].key = amqp_cstring_bytes("binary"); + entries[9].value.kind = AMQP_FIELD_KIND_BYTES; + entries[9].value.value.bytes = amqp_cstring_bytes("a binary string"); + + entries[10].key = amqp_cstring_bytes("void"); + entries[10].value.kind = AMQP_FIELD_KIND_VOID; + + entries[11].key = amqp_cstring_bytes("array"); + entries[11].value.kind = AMQP_FIELD_KIND_ARRAY; + entries[11].value.value.array = inner_array; + + entries[12].key = amqp_cstring_bytes("float"); + entries[12].value.kind = AMQP_FIELD_KIND_F32; + entries[12].value.value.f32 = M_PI; + + entries[13].key = amqp_cstring_bytes("double"); + entries[13].value.kind = AMQP_FIELD_KIND_F64; + entries[13].value.value.f64 = M_PI; + + table.num_entries = 14; + table.entries = entries; + printf("AAAAAAAAAA\n"); - dump_value(0, (amqp_field_value_t) AMQP_FIELD_VALUE_TABLE(table)); + + { + amqp_field_value_t val; + val.kind = AMQP_FIELD_KIND_TABLE; + val.value.table = table; + dump_value(0, val); + } init_amqp_pool(&pool, 4096); { - amqp_bytes_t decoding_bytes = { .len = sizeof(pre_encoded_table), - .bytes = pre_encoded_table }; amqp_table_t decoded; - int decoding_offset = 0; + size_t decoding_offset = 0; + amqp_bytes_t decoding_bytes; + decoding_bytes.len = sizeof(pre_encoded_table); + decoding_bytes.bytes = pre_encoded_table; + result = amqp_decode_table(decoding_bytes, &pool, &decoded, &decoding_offset); if (result < 0) { char *errstr = amqp_error_string(-result); @@ -215,13 +277,20 @@ static void test_table_codec(void) { abort(); } printf("BBBBBBBBBB\n"); - dump_value(0, (amqp_field_value_t) AMQP_FIELD_VALUE_TABLE(decoded)); + + { + amqp_field_value_t val; + val.kind = AMQP_FIELD_KIND_TABLE; + val.value.table = decoded; + + dump_value(0, val); + } } { uint8_t encoding_buffer[4096]; amqp_bytes_t encoding_result; - int offset = 0; + size_t offset = 0; memset(&encoding_buffer[0], 0, sizeof(encoding_buffer)); encoding_result.len = sizeof(encoding_buffer); @@ -236,7 +305,7 @@ static void test_table_codec(void) { } if (offset != sizeof(pre_encoded_table)) { - printf("Offset should be %d, was %d\n", (int) sizeof(pre_encoded_table), offset); + printf("Offset should be %d, was %d\n", (int) sizeof(pre_encoded_table), (int)offset); abort(); } @@ -251,17 +320,8 @@ static void test_table_codec(void) { } int main(int argc, char const * const *argv) { - amqp_table_entry_t entries[8] = - { AMQP_TABLE_ENTRY_UTF8("zebra", amqp_cstring_bytes("last")), - AMQP_TABLE_ENTRY_UTF8("aardvark", amqp_cstring_bytes("first")), - AMQP_TABLE_ENTRY_UTF8("middle", amqp_cstring_bytes("third")), - AMQP_TABLE_ENTRY_I32("number", 1234), - AMQP_TABLE_ENTRY_DECIMAL("decimal", AMQP_DECIMAL(2, 1234)), - AMQP_TABLE_ENTRY_TIMESTAMP("time", (uint64_t) 1234123412341234LL), - AMQP_TABLE_ENTRY_UTF8("beta", amqp_cstring_bytes("second")), - AMQP_TABLE_ENTRY_UTF8("wombat", amqp_cstring_bytes("fourth")) }; - amqp_table_t table = { .num_entries = sizeof(entries) / sizeof(entries[0]), - .entries = &entries[0] }; + amqp_table_entry_t entries[8]; + amqp_table_t table; union { uint32_t i; @@ -272,6 +332,42 @@ int main(int argc, char const * const *argv) { double d; } vl; + entries[0].key = amqp_cstring_bytes("zebra"); + entries[0].value.kind = AMQP_FIELD_KIND_UTF8; + entries[0].value.value.bytes = amqp_cstring_bytes("last"); + + entries[1].key = amqp_cstring_bytes("aardvark"); + entries[1].value.kind = AMQP_FIELD_KIND_UTF8; + entries[1].value.value.bytes = amqp_cstring_bytes("first"); + + entries[2].key = amqp_cstring_bytes("middle"); + entries[2].value.kind = AMQP_FIELD_KIND_UTF8; + entries[2].value.value.bytes = amqp_cstring_bytes("third"); + + entries[3].key = amqp_cstring_bytes("number"); + entries[3].value.kind = AMQP_FIELD_KIND_I32; + entries[3].value.value.i32 = 1234; + + entries[4].key = amqp_cstring_bytes("decimal"); + entries[4].value.kind = AMQP_FIELD_KIND_DECIMAL; + entries[4].value.value.decimal.decimals = 2; + entries[4].value.value.decimal.value = 1234; + + entries[5].key = amqp_cstring_bytes("time"); + entries[5].value.kind = AMQP_FIELD_KIND_TIMESTAMP; + entries[5].value.value.u64 = 1234123412341234; + + entries[6].key = amqp_cstring_bytes("beta"); + entries[6].value.kind = AMQP_FIELD_KIND_UTF8; + entries[6].value.value.bytes = amqp_cstring_bytes("second"); + + entries[7].key = amqp_cstring_bytes("wombat"); + entries[7].value.kind = AMQP_FIELD_KIND_UTF8; + entries[7].value.value.bytes = amqp_cstring_bytes("fourth"); + + table.num_entries = 8; + table.entries = entries; + vi.f = M_PI; if ((sizeof(float) != 4) || (vi.i != 0x40490fdb)) { printf("*** ERROR: single floating point encoding does not work as expected\n"); @@ -294,7 +390,14 @@ int main(int argc, char const * const *argv) { qsort(table.entries, table.num_entries, sizeof(amqp_table_entry_t), &amqp_table_entry_cmp); printf("----------\n"); - dump_value(0, (amqp_field_value_t) AMQP_FIELD_VALUE_TABLE(table)); + + { + amqp_field_value_t val; + val.kind = AMQP_FIELD_KIND_TABLE; + val.value.table = table; + + dump_value(0, val); + } return 0; } diff --git a/tools/common.c b/tools/common.c index f8b6985..7b0a969 100644 --- a/tools/common.c +++ b/tools/common.c @@ -354,5 +354,5 @@ void process_all_options(int argc, const char **argv, amqp_bytes_t cstring_bytes(const char *str) { - return str ? amqp_cstring_bytes(str) : AMQP_EMPTY_BYTES; + return str ? amqp_cstring_bytes(str) : amqp_empty_bytes; } diff --git a/tools/consume.c b/tools/consume.c index 34037d9..434a6f5 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -100,7 +100,7 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, /* Declare the queue as auto-delete. */ amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1, queue_bytes, 0, 0, 1, 1, - AMQP_EMPTY_TABLE); + amqp_empty_table); if (!res) die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); @@ -119,7 +119,7 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, amqp_bytes_t eb = amqp_cstring_bytes(exchange); if (!amqp_queue_bind(conn, 1, queue_bytes, eb, cstring_bytes(routing_key), - AMQP_EMPTY_TABLE)) + amqp_empty_table)) die_rpc(amqp_get_rpc_reply(conn), "queue.bind"); } @@ -131,8 +131,8 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, int no_ack, const char * const *argv) { - if (!amqp_basic_consume(conn, 1, queue, AMQP_EMPTY_BYTES, 0, no_ack, - 0, AMQP_EMPTY_TABLE)) + if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack, + 0, amqp_empty_table)) die_rpc(amqp_get_rpc_reply(conn), "basic.consume"); for (;;) { diff --git a/tools/declare_queue.c b/tools/declare_queue.c index 3536455..145a15e 100644 --- a/tools/declare_queue.c +++ b/tools/declare_queue.c @@ -88,12 +88,11 @@ int main(int argc, const char **argv) durable, 0, 0, - AMQP_EMPTY_TABLE); - if (reply == NULL) { + amqp_empty_table); + if (reply == NULL) die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); - } - write(1, reply->queue.bytes, reply->queue.len); - write(1, "\n", strlen("\n")); + + printf("%.*s\n", (int)reply->queue.len, (char *)reply->queue.bytes); } close_connection(conn); return 0; diff --git a/tools/windows/compat.c b/tools/windows/compat.c index f0508b2..bce318b 100644 --- a/tools/windows/compat.c +++ b/tools/windows/compat.c @@ -61,13 +61,16 @@ int asprintf(char **strp, const char *fmt, ...) va_start(ap, fmt); len = _vscprintf(fmt, ap); + va_end(ap); + *strp = malloc(len+1); if (!*strp) return -1; - len = vsprintf(*strp, fmt, ap); - *strp[len] = 0; - + va_start(ap, fmt); + _vsnprintf(*strp, len+1, fmt, ap); va_end(ap); + + (*strp)[len] = 0; return len; } diff --git a/tools/windows/process.c b/tools/windows/process.c index 0a005bd..d0e162a 100644 --- a/tools/windows/process.c +++ b/tools/windows/process.c @@ -98,37 +98,57 @@ static char *make_command_line(const char *const *argv) dest = buf = malloc(len); if (!buf) die("allocating memory for subprocess command line"); - - *dest++ = '\"'; + + /* Here we perform the inverse of the CommandLineToArgvW + function. Note that it's rules are slightly crazy: A + sequence of backslashes only act to escape if followed by + double quotes. A seuqence of backslashes not followed by + double quotes is unaffected. */ for (i = 0;;) { const char *src = argv[i]; + int backslashes = 0; + + *dest++ = '\"'; + for (;;) { switch (*src) { case 0: goto done; case '\"': + for (; backslashes; backslashes--) + *dest++ = '\\'; + + *dest++ = '\\'; + *dest++ = '\"'; + break; + case '\\': + backslashes++; *dest++ = '\\'; - /* fall through */ + break; default: - *dest++ = *src++; + backslashes = 0; + *dest++ = *src; break; } + + src++; } - done: + done: + for (; backslashes; backslashes--) + *dest++ = '\\'; + + *dest++ = '\"'; if (!argv[++i]) break; - *dest++ = '\"'; *dest++ = ' '; - *dest++ = '\"'; } - *dest++ = '\"'; *dest++ = 0; return buf; } |