summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-11-17 13:24:07 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-11-17 13:24:07 +0000
commit024a50f566496535cdfe5af0e70916a052845822 (patch)
tree1c3b07f7932f81d46a2117dbc732a92e1a1931de
parent5b41470ca661d528dbafadb5865eaaa2a4342c21 (diff)
parent56a9a6779917bf0dc4432a39dc9a533af65d12a2 (diff)
downloadrabbitmq-c-github-ask-024a50f566496535cdfe5af0e70916a052845822.tar.gz
merge bug23423 into default
-rw-r--r--README.windows122
-rw-r--r--configure.ac30
-rwxr-xr-xetc/build-ms.sh62
-rw-r--r--examples/Makefile.am35
-rw-r--r--examples/amqp_bind.c6
-rw-r--r--examples/amqp_consumer.c23
-rw-r--r--examples/amqp_exchange_declare.c6
-rw-r--r--examples/amqp_listen.c14
-rw-r--r--examples/amqp_listenq.c8
-rw-r--r--examples/amqp_producer.c23
-rw-r--r--examples/amqp_sendstring.c6
-rw-r--r--examples/amqp_unbind.c6
-rw-r--r--examples/unix/platform_utils.c (renamed from examples/example_utils.c)67
-rw-r--r--examples/utils.c (renamed from librabbitmq/amqp_debug.c)59
-rw-r--r--examples/utils.h (renamed from examples/example_utils.h)9
-rw-r--r--examples/windows/platform_utils.c66
-rw-r--r--librabbitmq/Makefile.am16
-rw-r--r--librabbitmq/amqp.h467
-rw-r--r--librabbitmq/amqp_api.c303
-rw-r--r--librabbitmq/amqp_connection.c480
-rw-r--r--librabbitmq/amqp_mem.c4
-rw-r--r--librabbitmq/amqp_private.h197
-rw-r--r--librabbitmq/amqp_socket.c185
-rw-r--r--librabbitmq/amqp_table.c497
-rw-r--r--librabbitmq/codegen.py346
-rw-r--r--librabbitmq/unix/socket.c9
-rw-r--r--librabbitmq/windows/socket.c12
-rw-r--r--librabbitmq/windows/socket.h2
-rw-r--r--msinttypes/inttypes.h305
-rw-r--r--msinttypes/stdint.h247
-rw-r--r--tests/Makefile.am14
-rw-r--r--tests/test_tables.c285
-rw-r--r--tools/common.c2
-rw-r--r--tools/consume.c8
-rw-r--r--tools/declare_queue.c9
-rw-r--r--tools/windows/compat.c9
-rw-r--r--tools/windows/process.c36
37 files changed, 2507 insertions, 1468 deletions
diff --git a/README.windows b/README.windows
index 33c8f06..836a3be 100644
--- a/README.windows
+++ b/README.windows
@@ -1,76 +1,106 @@
-# rabbitmq-c and Windows
+# Using rabbitmq-c on Windows
-rabbitmq-c can now be built on Windows using the MinGW/MSYS ports of
-the GNU toolchain and miscellaneous utilities. This includes the
-example programs and tools.
+There are two approaches to building rabbitmq-c under Windows:
-The results are native Windows DLLs and EXEs, and can be used without
-having MinGW installed. But the librabbitmq header files currently
-use GCC extensions, and for this reason it is still not possible to
-use Microsoft's C/C++ to build applications against the librabbitmq
-DLL. Hopefully this will get fixed before long.
+- Build using the MinGW/MSYS (MinGW/MSYS is a port of the GNU
+ toolchain and utilities to Windows, including the gcc compiler).
+ The results of building in this way are native Windows DLLs and
+ EXEs, and can be used without having MinGW installed. The drawback
+ to this approach is that you cannot safely call the resulting
+ librabbitmq DLL from code compiled with Microsoft's C compiler. The
+ advantage is that the whole of rabbitmq-c can be built under
+ Windows, including the tools.
+- Build using Microsoft's C compiler. You will still need to install
+ MinGW/MSYS in order to run the rabbitmq-c build scripts, but
+ Microsoft's compiler is used to compile the code. The resulting
+ librabbitmq DLL can be used from code compiled with Microsoft's C
+ compiler (i.e. code developed in Visual Studio). The downside to
+ this approach is that the rabbitmq-c tools cannot be built, due to
+ dependencies on other libraries.
-# Building rabbitmq-c
-rabbitmq-c is built on Windows using MinGW and MSYS. In brief, MinGW
-is a native port of the GNU toolchain to Windows; MSYS is a set of
-ports of common GNU utilities to run under Windows, so that typical
-autotools-based builds will work there. MinGW/MSYS can be used to
-build native Windows applications and DLLs, which do not depend on
-MinGW/MSYS to run.
+## Common steps
-So to build rabbitmq-c on Windows, you need to download and install
-the relevant parts of MinGW/MSYS. This can be fairly time consuming -
-there are dozens of files to be downloaded and unpacked. To make it
-easier, we provide a bash script that automates this process, in
-rabbitmq-c/etc/install-mingw.sh. You can run this under cygwin, or
-under Linux and copy the results over or put them on a shared drive.
-Some MinGW packages are .tar.lzma files, so it requires a system with
-the xz compression utility and a tar that supports the -J option.
+With either of the approaches, the initial steps are the same: You
+should download and install MinGW/MSYS and Python.
-Run install-mingw.sh specifying the destination directory, e.g.
+Installing installing the relevant parts of MinGW/MSYS can be fairly
+time consuming - there are dozens of files to be downloaded and
+unpacked. To make it easier, we provide a bash script that automates
+this process, in `rabbitmq-c/etc/install-mingw.sh`. You can run this
+script under cygwin or Linux (obviously if you use Linux you'll need
+to transfer the resulting files over to the Windows machine).
+
+Note that some MinGW packages are .tar.lzma files, so it requires a
+system with the xz compression utility and a tar that supports the -J
+option. Recent cygwin and Linux distros should be fine here.
+
+Run the install-mingw.sh script specifying the destination directory,
+e.g.
$ etc/install-mingw.sh mingw
-Python is needed for the rabbitmq-c build, so you will also need to
-install python under Windows. The Windows installer from python.org
+This will download all the required MinGW/MSYS packages, and unpack
+them into the `mingw` directory.
+
+The other prerequisite for the rabbitmq-c build is Python. The
+Windows installer from python.org for the latest 2.x version of Python
will do fine.
-You will need to copy the source code for rabbitmq-c and
-rabbitmq-codegen somewhere under your mingw directory.
+You will also need to copy the source code for rabbitmq-c and
+rabbitmq-codegen somewhere under your `mingw` directory.
-Open a cmd window, and ensure that both the MinGW bin directory and
-the python install directory are in the path, e.g.
+Then to start the MSYS bash shell, open a `cmd` window, and ensure
+that both the MinGW bin directory and the python install directory are
+in the path, e.g.
- C:\>set PATH=%PATH%;C:\mingw\bin;C:\Python26
+ C:\>set PATH=%PATH%;C:\mingw\bin;C:\Python27
Then start bash, and run the following mount command (substituting the
-Windows path of your MinGW install):
+Windows path of your MinGW install if it isn't `C:\mingw`):
C:\>bash
bash-3.1$ mount 'C:\mingw' /mingw
-Then go to the rabbitmq-c directory. If you got the rabbitmq-c
-directory from Mercurial (which is the only way to get it at the
-moment), you will need to run autoreconf to produce the configuration
-scripts:
+Finally, go to wherever you copied the rabbitmq-c source.
+
+ bash-3.1$ cd /rabbitmq-c
+
+
+## Building rabbitmq-c with Microsoft's C compiler
+
+The Microsoft C/C++ compiler is part of MS Visual Studio, including
+the gratis Visual Studio Express. Visual Studio 2005 and higher are
+known to work.
+
+Start by following the steps in the previous section. The GNU build
+tools have limited support for Microsoft toolchain, but the
+install-mingw.sh script will install versions of the packages that are
+known to be suitable. In particular, only libtool version 2.2.7a is
+known to work; later versions have been reported to introduce
+problems.
+
+Once you are at the bash prompt, build rabbitmq-c by running the
+script in `rabbitmq-c/etc/build-ms.sh`:
- bash-3.1$ autoreconf -i
+ bash-3.1$ etc/build-ms.sh
-This will produce a few lines of informational output while it runs,
-but as long as it doesn't mention any errors, you are ok.
+You should end up with a directory `build` containing the librabbitmq
+DLL, the corresponding .lib file, and header files. These are
+sufficient to create applications using librabbitmq within Visual
+Studio.
-Finally, configure and make:
- bash-3.1$ ./configure && make
- [...]
+## Building rabbitmq-c with gcc
+There is no script to build rabbitmq-c with gcc, but it is as
+documented in the README file:
-# Running the tools without mingw
+ bash-3.1$ autoreconf -i && ./configure && make
-You can run the resulting tools EXEs without the rest of MinGW. To do
-this, copy the following files into a directory:
+You can run the resulting tool EXEs without needing the rest of MinGW. To do
+this, copy the following files into a single directory:
- rabbitmq-c/tools/.libs/*.exe
diff --git a/configure.ac b/configure.ac
index be7e695..4077362 100644
--- a/configure.ac
+++ b/configure.ac
@@ -15,9 +15,9 @@ AM_PROG_LIBTOOL
dnl Header-file checks
AC_HEADER_STDC
-dnl Only use -Wall if we have gcc
if test "x$GCC" = "xyes"; then
- if test -z "`echo "$CFLAGS" | grep "\-Wall" 2> /dev/null`" ; then
+ dnl Only use -Wall if we have gcc
+ if ! echo "$CFLAGS" | grep "\-Wall" 2> /dev/null ; then
CFLAGS="$CFLAGS -Wall"
fi
fi
@@ -35,6 +35,16 @@ AS_IF([test "x$windows" = xyes],
[AC_DEFINE([WINDOWS], [1], [Define to 1 if on Windows.])]
)
+AM_CONDITIONAL(GCC, test "x$GCC" = xyes)
+
+# Detect how to declare inline functions. Because we will sometimes
+# use "-ansi -pedantic" with gcc, we need to make sure the result will
+# work in that context.
+orig_cflags="$CFLAGS"
+AS_IF([test "x$GCC" = "xyes"], [CFLAGS="$CFLAGS -ansi -pedantic"])
+AC_C_INLINE
+CFLAGS="$orig_cflags"
+
dnl Decide which API abstraction layer to use
PLATFORM_DIR=unix
if test "x$windows" = xyes ; then
@@ -87,10 +97,22 @@ AC_SUBST(AMQP_CODEGEN_DIR)
AC_SUBST(AMQP_SPEC_JSON_PATH)
AC_SUBST(PYTHON)
-dnl Decide which extra win32 libs we need
+dnl Decide which extra win32 libs we need, and handle other special
+dnl cases when building with the Microsoft compiler
EXTRA_LIBS=
-AS_IF([test "x$windows" = xyes], [EXTRA_LIBS="-lws2_32 $EXTRA_LIBS"])
+USE_MISINTTYPES=
+AS_IF([test "x$windows" = xyes],
+ [
+ AS_IF([test "x$GCC" = xyes],
+ [EXTRA_LIBS="-lws2_32 $EXTRA_LIBS"],
+ [
+ EXTRA_LIBS="ws2_32.lib $EXTRA_LIBS"
+ USE_MSINTTYPES=yes
+ ])
+ ])
+
AC_SUBST(EXTRA_LIBS)
+AM_CONDITIONAL(USE_MSINTTYPES, test "x$USE_MSINTTYPES" != "x")
dnl Check for libpopt, which we need to build the tools
AC_ARG_WITH([popt],
diff --git a/etc/build-ms.sh b/etc/build-ms.sh
new file mode 100755
index 0000000..4a70b35
--- /dev/null
+++ b/etc/build-ms.sh
@@ -0,0 +1,62 @@
+#!/bin/bash
+
+# Build rabbitmq-c using Microsoft's C compiler
+
+set -e
+
+# Locate the necessary lib and include directories
+
+drive=$(echo "$SYSTEMDRIVE" | sed 's|^\([A-Za-z]\):$|/\1|')
+
+for vsvers in 10.0 9.0 8 ; do
+ vsdir="$drive/Program Files/Microsoft Visual Studio $vsvers"
+ [ -x "$vsdir/VC/bin/cl.exe" ] && break
+
+ vsdir="$drive/Program Files (x86)/Microsoft Visual Studio $vsvers"
+ [ -x "$vsdir/VC/bin/cl.exe" ] && break
+
+ vsdir=
+done
+
+if [ -z "$vsdir" ] ; then
+ echo "Couldn't find suitable Visual Studio installation"
+ exit 1
+fi
+
+echo "Using Visual Studio install at $vsdir"
+
+for sdkpath in "Microsoft SDKs/Windows/"{v7.0A,v6.0A} "Microsoft Visual Studio 8/VC/PlatformSDK" ; do
+ sdkdir="$drive/Program Files/$sdkpath"
+ [ -d "$sdkdir/lib" -a -d "$sdkdir/include" ] && break
+
+ sdkdir="$drive/Program Files (x86)/$sdkpath"
+ [ -d "$sdkdir/lib" -a -d "$sdkdir/include" ] && break
+
+ sdkdir=
+done
+
+if [ -z "$sdkdir" ] ; then
+ echo "Couldn't find suitable Windows SDK installation"
+ exit 1
+fi
+
+echo "Using Windows SDK install at $sdkdir"
+
+PATH="$PATH:$vsdir/VC/bin:$vsdir/Common7/IDE"
+LIB="$vsdir/VC/lib:$sdkdir/lib"
+INCLUDE="$vsdir/VC/include:$sdkdir/include"
+export PATH LIB INCLUDE
+
+# Do the build
+set -x
+autoreconf -i
+./configure CC=cl.exe LD=link.exe CFLAGS='-nologo'
+sed -i -e 's/^fix_srcfile_path=.*$/fix_srcfile_path=""/;s/^deplibs_check_method=.*$/deplibs_check_method=pass_all/;/^archive_cmds=/s/-link -dll/& -implib:\\$libname.\\$libext/' libtool
+make
+
+# Copy the results of the build into one place, as "make install"
+# isn't too useful here.
+mkdir -p build/lib build/include build/bin
+cp -a librabbitmq/.libs/*.dll examples/.libs/*.exe build/bin
+cp -a msinttypes/*.h librabbitmq/amqp.h librabbitmq/amqp_framing.h build/include
+cp -a librabbitmq/*.exp librabbitmq/*.lib build/lib
diff --git a/examples/Makefile.am b/examples/Makefile.am
index 9655021..ea10f77 100644
--- a/examples/Makefile.am
+++ b/examples/Makefile.am
@@ -1,16 +1,29 @@
-noinst_PROGRAMS = amqp_sendstring amqp_exchange_declare amqp_listen amqp_producer amqp_consumer \
- amqp_unbind amqp_bind amqp_listenq
+noinst_PROGRAMS = amqp_sendstring amqp_exchange_declare amqp_listen amqp_producer amqp_consumer amqp_unbind amqp_bind amqp_listenq
AM_CFLAGS = -I$(top_srcdir)/librabbitmq
+
+if GCC
+# Because we want to build under Microsoft's C compiler (for which
+# there is apparently no demand for C99 support), it's a good idea
+# to have gcc tell us when we stray from the old standard.
+AM_CFLAGS += -ansi -pedantic
+endif
+
+if USE_MSINTTYPES
+AM_CFLAGS += -I$(top_srcdir)/msinttypes
+endif
+
AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la
-noinst_HEADERS = example_utils.h
+noinst_HEADERS = utils.h
+
+COMMON_SOURCES = utils.c $(PLATFORM_DIR)/platform_utils.c
-amqp_sendstring_SOURCES = amqp_sendstring.c example_utils.c
-amqp_exchange_declare_SOURCES = amqp_exchange_declare.c example_utils.c
-amqp_listen_SOURCES = amqp_listen.c example_utils.c
-amqp_producer_SOURCES = amqp_producer.c example_utils.c
-amqp_consumer_SOURCES = amqp_consumer.c example_utils.c
-amqp_unbind_SOURCES = amqp_unbind.c example_utils.c
-amqp_bind_SOURCES = amqp_bind.c example_utils.c
-amqp_listenq_SOURCES = amqp_listenq.c example_utils.c
+amqp_sendstring_SOURCES = amqp_sendstring.c $(COMMON_SOURCES)
+amqp_exchange_declare_SOURCES = amqp_exchange_declare.c $(COMMON_SOURCES)
+amqp_listen_SOURCES = amqp_listen.c $(COMMON_SOURCES)
+amqp_producer_SOURCES = amqp_producer.c $(COMMON_SOURCES)
+amqp_consumer_SOURCES = amqp_consumer.c $(COMMON_SOURCES)
+amqp_unbind_SOURCES = amqp_unbind.c $(COMMON_SOURCES)
+amqp_bind_SOURCES = amqp_bind.c $(COMMON_SOURCES)
+amqp_listenq_SOURCES = amqp_listenq.c $(COMMON_SOURCES)
diff --git a/examples/amqp_bind.c b/examples/amqp_bind.c
index 1f183a5..561b84d 100644
--- a/examples/amqp_bind.c
+++ b/examples/amqp_bind.c
@@ -56,9 +56,7 @@
#include <amqp.h>
#include <amqp_framing.h>
-#include <unistd.h>
-
-#include "example_utils.h"
+#include "utils.h"
int main(int argc, char const * const *argv) {
char const *hostname;
@@ -94,7 +92,7 @@ int main(int argc, char const * const *argv) {
amqp_cstring_bytes(queue),
amqp_cstring_bytes(exchange),
amqp_cstring_bytes(bindingkey),
- AMQP_EMPTY_TABLE);
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding");
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c
index b1754f5..2710643 100644
--- a/examples/amqp_consumer.c
+++ b/examples/amqp_consumer.c
@@ -56,27 +56,26 @@
#include <amqp.h>
#include <amqp_framing.h>
-#include <unistd.h>
#include <assert.h>
-#include "example_utils.h"
+#include "utils.h"
#define SUMMARY_EVERY_US 1000000
static void run(amqp_connection_state_t conn)
{
- long long start_time = now_microseconds();
+ uint64_t start_time = now_microseconds();
int received = 0;
int previous_received = 0;
- long long previous_report_time = start_time;
- long long next_summary_time = start_time + SUMMARY_EVERY_US;
+ uint64_t previous_report_time = start_time;
+ uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
amqp_frame_t frame;
int result;
size_t body_received;
size_t body_target;
- long long now;
+ uint64_t now;
while (1) {
now = now_microseconds();
@@ -150,8 +149,8 @@ int main(int argc, char const * const *argv) {
hostname = argv[1];
port = atoi(argv[2]);
- exchange = "amq.direct"; //argv[3];
- bindingkey = "test queue"; //argv[4];
+ exchange = "amq.direct"; /* argv[3]; */
+ bindingkey = "test queue"; /* argv[4]; */
conn = amqp_new_connection();
@@ -163,8 +162,8 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
- amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES, 0, 0, 0, 1,
- AMQP_EMPTY_TABLE);
+ amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
queuename = amqp_bytes_malloc_dup(r->queue);
if (queuename.bytes == NULL) {
@@ -174,10 +173,10 @@ int main(int argc, char const * const *argv) {
}
amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
- AMQP_EMPTY_TABLE);
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
- amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0, AMQP_EMPTY_TABLE);
+ amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
run(conn);
diff --git a/examples/amqp_exchange_declare.c b/examples/amqp_exchange_declare.c
index e77ac52..9b410ca 100644
--- a/examples/amqp_exchange_declare.c
+++ b/examples/amqp_exchange_declare.c
@@ -56,9 +56,7 @@
#include <amqp.h>
#include <amqp_framing.h>
-#include <unistd.h>
-
-#include "example_utils.h"
+#include "utils.h"
int main(int argc, char const * const *argv) {
char const *hostname;
@@ -89,7 +87,7 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(exchangetype),
- 0, 0, AMQP_EMPTY_TABLE);
+ 0, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange");
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c
index f208652..86c4555 100644
--- a/examples/amqp_listen.c
+++ b/examples/amqp_listen.c
@@ -56,13 +56,9 @@
#include <amqp.h>
#include <amqp_framing.h>
-#include <unistd.h>
#include <assert.h>
-#include "example_utils.h"
-
-/* Private: compiled out in NDEBUG mode */
-extern void amqp_dump(void const *buffer, size_t len);
+#include "utils.h"
int main(int argc, char const * const *argv) {
char const *hostname;
@@ -95,8 +91,8 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
- amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES, 0, 0, 0, 1,
- AMQP_EMPTY_TABLE);
+ amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
queuename = amqp_bytes_malloc_dup(r->queue);
if (queuename.bytes == NULL) {
@@ -106,10 +102,10 @@ int main(int argc, char const * const *argv) {
}
amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
- AMQP_EMPTY_TABLE);
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
- amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0, AMQP_EMPTY_TABLE);
+ amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c
index 98c389f..c7c2a1e 100644
--- a/examples/amqp_listenq.c
+++ b/examples/amqp_listenq.c
@@ -56,13 +56,9 @@
#include <amqp.h>
#include <amqp_framing.h>
-#include <unistd.h>
#include <assert.h>
-#include "example_utils.h"
-
-/* Private: compiled out in NDEBUG mode */
-extern void amqp_dump(void const *buffer, size_t len);
+#include "utils.h"
int main(int argc, char const * const *argv) {
char const *hostname;
@@ -90,7 +86,7 @@ int main(int argc, char const * const *argv) {
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
- amqp_basic_consume(conn, 1, amqp_cstring_bytes(queuename), AMQP_EMPTY_BYTES, 0, 0, 0, AMQP_EMPTY_TABLE);
+ amqp_basic_consume(conn, 1, amqp_cstring_bytes(queuename), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
diff --git a/examples/amqp_producer.c b/examples/amqp_producer.c
index b83e030..77c3e92 100644
--- a/examples/amqp_producer.c
+++ b/examples/amqp_producer.c
@@ -56,9 +56,7 @@
#include <amqp.h>
#include <amqp_framing.h>
-#include <unistd.h>
-
-#include "example_utils.h"
+#include "utils.h"
#define SUMMARY_EVERY_US 1000000
@@ -67,21 +65,26 @@ static void send_batch(amqp_connection_state_t conn,
int rate_limit,
int message_count)
{
- long long start_time = now_microseconds();
+ uint64_t start_time = now_microseconds();
int i;
int sent = 0;
int previous_sent = 0;
- long long previous_report_time = start_time;
- long long next_summary_time = start_time + SUMMARY_EVERY_US;
+ uint64_t previous_report_time = start_time;
+ uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
char message[256];
+ amqp_bytes_t message_bytes;
for (i = 0; i < sizeof(message); i++) {
message[i] = i & 0xff;
}
+ message_bytes.len = sizeof(message);
+ message_bytes.bytes = message;
+
for (i = 0; i < message_count; i++) {
- long long now = now_microseconds();
+ uint64_t now = now_microseconds();
+
die_on_error(amqp_basic_publish(conn,
1,
amqp_cstring_bytes("amq.direct"),
@@ -89,7 +92,7 @@ static void send_batch(amqp_connection_state_t conn,
0,
0,
NULL,
- (amqp_bytes_t) {.len = sizeof(message), .bytes = message}),
+ message_bytes),
"Publishing");
sent++;
if (now > next_summary_time) {
@@ -104,13 +107,13 @@ static void send_batch(amqp_connection_state_t conn,
}
while (((i * 1000000.0) / (now - start_time)) > rate_limit) {
- usleep(2000);
+ microsleep(2000);
now = now_microseconds();
}
}
{
- long long stop_time = now_microseconds();
+ uint64_t stop_time = now_microseconds();
int total_delta = stop_time - start_time;
printf("PRODUCER - Message count: %d\n", message_count);
diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c
index ccd3866..3c94e41 100644
--- a/examples/amqp_sendstring.c
+++ b/examples/amqp_sendstring.c
@@ -56,9 +56,7 @@
#include <amqp.h>
#include <amqp_framing.h>
-#include <unistd.h>
-
-#include "example_utils.h"
+#include "utils.h"
int main(int argc, char const * const *argv) {
char const *hostname;
@@ -94,7 +92,7 @@ int main(int argc, char const * const *argv) {
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
- props.delivery_mode = 2; // persistent delivery mode
+ props.delivery_mode = 2; /* persistent delivery mode */
die_on_error(amqp_basic_publish(conn,
1,
amqp_cstring_bytes(exchange),
diff --git a/examples/amqp_unbind.c b/examples/amqp_unbind.c
index 4b92e12..fad3e2b 100644
--- a/examples/amqp_unbind.c
+++ b/examples/amqp_unbind.c
@@ -56,9 +56,7 @@
#include <amqp.h>
#include <amqp_framing.h>
-#include <unistd.h>
-
-#include "example_utils.h"
+#include "utils.h"
int main(int argc, char const * const *argv) {
char const *hostname;
@@ -94,7 +92,7 @@ int main(int argc, char const * const *argv) {
amqp_cstring_bytes(queue),
amqp_cstring_bytes(exchange),
amqp_cstring_bytes(bindingkey),
- AMQP_EMPTY_TABLE);
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding");
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
diff --git a/examples/example_utils.c b/examples/unix/platform_utils.c
index 48f21f9..b039c31 100644
--- a/examples/example_utils.c
+++ b/examples/unix/platform_utils.c
@@ -48,69 +48,22 @@
* ***** END LICENSE BLOCK *****
*/
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
+/* For usleep */
+#define _BSD_SOURCE
#include <stdint.h>
-#include <amqp.h>
-#include <amqp_framing.h>
#include <sys/time.h>
#include <unistd.h>
-void die_on_error(int x, char const *context) {
- if (x < 0) {
- char *errstr = amqp_error_string(-x);
- fprintf(stderr, "%s: %s\n", context, errstr);
- free(errstr);
- exit(1);
- }
-}
-
-void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
- switch (x.reply_type) {
- case AMQP_RESPONSE_NORMAL:
- return;
-
- case AMQP_RESPONSE_NONE:
- fprintf(stderr, "%s: missing RPC reply type!", context);
- break;
-
- case AMQP_RESPONSE_LIBRARY_EXCEPTION:
- fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error));
- break;
-
- case AMQP_RESPONSE_SERVER_EXCEPTION:
- switch (x.reply.id) {
- case AMQP_CONNECTION_CLOSE_METHOD: {
- amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
- fprintf(stderr, "%s: server connection error %d, message: %.*s",
- context,
- m->reply_code,
- (int) m->reply_text.len, (char *) m->reply_text.bytes);
- break;
- }
- case AMQP_CHANNEL_CLOSE_METHOD: {
- amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
- fprintf(stderr, "%s: server channel error %d, message: %.*s",
- context,
- m->reply_code,
- (int) m->reply_text.len, (char *) m->reply_text.bytes);
- break;
- }
- default:
- fprintf(stderr, "%s: unknown server error, method id 0x%08X", context, x.reply.id);
- break;
- }
- break;
- }
-
- exit(1);
-}
-
-long long now_microseconds(void) {
+uint64_t now_microseconds(void)
+{
struct timeval tv;
gettimeofday(&tv, NULL);
- return (long long) tv.tv_sec * 1000000 + (long long) tv.tv_usec;
+ return (uint64_t) tv.tv_sec * 1000000 + (uint64_t) tv.tv_usec;
+}
+
+void microsleep(int usec)
+{
+ usleep(usec);
}
diff --git a/librabbitmq/amqp_debug.c b/examples/utils.c
index 7258696..1837be8 100644
--- a/librabbitmq/amqp_debug.c
+++ b/examples/utils.c
@@ -48,12 +48,67 @@
* ***** END LICENSE BLOCK *****
*/
-#include <stdio.h>
#include <stdlib.h>
+#include <stdio.h>
#include <string.h>
-
#include <ctype.h>
+#include <stdint.h>
+#include <amqp.h>
+#include <amqp_framing.h>
+
+#include "utils.h"
+
+void die_on_error(int x, char const *context) {
+ if (x < 0) {
+ char *errstr = amqp_error_string(-x);
+ fprintf(stderr, "%s: %s\n", context, errstr);
+ free(errstr);
+ exit(1);
+ }
+}
+
+void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
+ switch (x.reply_type) {
+ case AMQP_RESPONSE_NORMAL:
+ return;
+
+ case AMQP_RESPONSE_NONE:
+ fprintf(stderr, "%s: missing RPC reply type!\n", context);
+ break;
+
+ case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+ fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error));
+ break;
+
+ case AMQP_RESPONSE_SERVER_EXCEPTION:
+ switch (x.reply.id) {
+ case AMQP_CONNECTION_CLOSE_METHOD: {
+ amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
+ fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
+ context,
+ m->reply_code,
+ (int) m->reply_text.len, (char *) m->reply_text.bytes);
+ break;
+ }
+ case AMQP_CHANNEL_CLOSE_METHOD: {
+ amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
+ fprintf(stderr, "%s: server channel error %d, message: %.*s\n",
+ context,
+ m->reply_code,
+ (int) m->reply_text.len, (char *) m->reply_text.bytes);
+ break;
+ }
+ default:
+ fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
+ break;
+ }
+ break;
+ }
+
+ exit(1);
+}
+
static void dump_row(long count, int numinrow, int *chs) {
int i;
diff --git a/examples/example_utils.h b/examples/utils.h
index 3dd8ec1..e5ac9a3 100644
--- a/examples/example_utils.h
+++ b/examples/utils.h
@@ -1,5 +1,5 @@
-#ifndef librabbitmq_examples_example_utils_h
-#define librabbitmq_examples_example_utils_h
+#ifndef librabbitmq_examples_utils_h
+#define librabbitmq_examples_utils_h
/*
* ***** BEGIN LICENSE BLOCK *****
@@ -54,6 +54,9 @@
extern void die_on_error(int x, char const *context);
extern void die_on_amqp_error(amqp_rpc_reply_t x, char const *context);
-extern long long now_microseconds(void);
+extern void amqp_dump(void const *buffer, size_t len);
+
+extern uint64_t now_microseconds(void);
+extern void microsleep(int usec);
#endif
diff --git a/examples/windows/platform_utils.c b/examples/windows/platform_utils.c
new file mode 100644
index 0000000..915dd27
--- /dev/null
+++ b/examples/windows/platform_utils.c
@@ -0,0 +1,66 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2009 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2009 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <stdint.h>
+
+#include <windows.h>
+
+uint64_t now_microseconds(void)
+{
+ FILETIME ft;
+ GetSystemTimeAsFileTime(&ft);
+ return (((uint64_t)ft.dwHighDateTime << 32) | (uint64_t)ft.dwLowDateTime)
+ / 10;
+}
+
+void microsleep(int usec)
+{
+ Sleep(usec / 1000);
+}
diff --git a/librabbitmq/Makefile.am b/librabbitmq/Makefile.am
index 82b9f30..a0f8987 100644
--- a/librabbitmq/Makefile.am
+++ b/librabbitmq/Makefile.am
@@ -1,7 +1,19 @@
lib_LTLIBRARIES = librabbitmq.la
-AM_CFLAGS = -I$(srcdir)/$(PLATFORM_DIR)
-librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_debug.c amqp_api.c $(PLATFORM_DIR)/socket.c
+AM_CFLAGS = -I$(srcdir)/$(PLATFORM_DIR) -DBUILDING_LIBRABBITMQ
+
+if GCC
+# Because we want to build under Microsoft's C compiler (for which
+# there is apparently no demand for C99 support), it's a good idea
+# to have gcc tell us when we stray from the old standard.
+AM_CFLAGS += -ansi -pedantic
+endif
+
+if USE_MSINTTYPES
+AM_CFLAGS += -I$(top_srcdir)/msinttypes
+endif
+
+librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_api.c $(PLATFORM_DIR)/socket.c
librabbitmq_la_LDFLAGS = -no-undefined
librabbitmq_la_LIBADD = $(EXTRA_LIBS)
nodist_librabbitmq_la_SOURCES = amqp_framing.c
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 40c8292..1541583 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -55,6 +55,16 @@
extern "C" {
#endif
+#ifdef _WIN32
+#ifdef BUILDING_LIBRABBITMQ
+#define RABBITMQ_EXPORT extern __declspec(dllexport)
+#else
+#define RABBITMQ_EXPORT extern __declspec(dllimport)
+#endif
+#else
+#define RABBITMQ_EXPORT extern
+#endif
+
typedef int amqp_boolean_t;
typedef uint32_t amqp_method_number_t;
typedef uint32_t amqp_flags_t;
@@ -65,29 +75,21 @@ typedef struct amqp_bytes_t_ {
void *bytes;
} amqp_bytes_t;
-#define AMQP_EMPTY_BYTES ((amqp_bytes_t) { .len = 0, .bytes = NULL })
-
typedef struct amqp_decimal_t_ {
- int decimals;
+ uint8_t decimals;
uint32_t value;
} amqp_decimal_t;
-#define AMQP_DECIMAL(d,v) ((amqp_decimal_t) { .decimals = (d), .value = (v) })
-
typedef struct amqp_table_t_ {
int num_entries;
struct amqp_table_entry_t_ *entries;
} amqp_table_t;
-#define AMQP_EMPTY_TABLE ((amqp_table_t) { .num_entries = 0, .entries = NULL })
-
typedef struct amqp_array_t_ {
int num_entries;
struct amqp_field_value_t_ *entries;
} amqp_array_t;
-#define AMQP_EMPTY_ARRAY ((amqp_array_t) { .num_entries = 0, .entries = NULL })
-
/*
0-9 0-9-1 Qpid/Rabbit Type Remarks
---------------------------------------------------------------------------
@@ -128,7 +130,7 @@ the code.
*/
typedef struct amqp_field_value_t_ {
- char kind;
+ uint8_t kind;
union {
amqp_boolean_t boolean;
int8_t i8;
@@ -162,6 +164,7 @@ typedef enum {
AMQP_FIELD_KIND_I32 = 'I',
AMQP_FIELD_KIND_U32 = 'i',
AMQP_FIELD_KIND_I64 = 'l',
+ AMQP_FIELD_KIND_U64 = 'L',
AMQP_FIELD_KIND_F32 = 'f',
AMQP_FIELD_KIND_F64 = 'd',
AMQP_FIELD_KIND_DECIMAL = 'D',
@@ -170,47 +173,9 @@ typedef enum {
AMQP_FIELD_KIND_TIMESTAMP = 'T',
AMQP_FIELD_KIND_TABLE = 'F',
AMQP_FIELD_KIND_VOID = 'V',
- AMQP_FIELD_KIND_BYTES = 'x',
+ AMQP_FIELD_KIND_BYTES = 'x'
} amqp_field_value_kind_t;
-#define _AMQP_TEINIT(ke,ki,v) {.key = (ke), .value = {.kind = AMQP_FIELD_KIND_##ki, .value = {v}}}
-#define AMQP_TABLE_ENTRY_BOOLEAN(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), BOOLEAN, .boolean = (v))
-#define AMQP_TABLE_ENTRY_I8(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), I8, .i8 = (v))
-#define AMQP_TABLE_ENTRY_U8(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), U8, .u8 = (v))
-#define AMQP_TABLE_ENTRY_I16(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), I16, .i16 = (v))
-#define AMQP_TABLE_ENTRY_U16(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), U16, .u16 = (v))
-#define AMQP_TABLE_ENTRY_I32(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), I32, .i32 = (v))
-#define AMQP_TABLE_ENTRY_U32(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), U32, .u32 = (v))
-#define AMQP_TABLE_ENTRY_I64(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), I64, .i64 = (v))
-#define AMQP_TABLE_ENTRY_F32(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), F32, .f32 = (v))
-#define AMQP_TABLE_ENTRY_F64(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), F64, .f64 = (v))
-#define AMQP_TABLE_ENTRY_DECIMAL(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), DECIMAL, .decimal = (v))
-#define AMQP_TABLE_ENTRY_UTF8(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), UTF8, .bytes = (v))
-#define AMQP_TABLE_ENTRY_ARRAY(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), ARRAY, .array = (v))
-#define AMQP_TABLE_ENTRY_TIMESTAMP(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), TIMESTAMP, .u64 = (v))
-#define AMQP_TABLE_ENTRY_TABLE(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), TABLE, .table = (v))
-#define AMQP_TABLE_ENTRY_VOID(k) _AMQP_TEINIT(amqp_cstring_bytes(k), VOID, .u8 = 0)
-#define AMQP_TABLE_ENTRY_BYTES(k,v) _AMQP_TEINIT(amqp_cstring_bytes(k), BYTES, .bytes = (v))
-
-#define _AMQP_FVINIT(ki,v) {.kind = AMQP_FIELD_KIND_##ki, .value = {v}}
-#define AMQP_FIELD_VALUE_BOOLEAN(v) _AMQP_FVINIT(BOOLEAN, .boolean = (v))
-#define AMQP_FIELD_VALUE_I8(v) _AMQP_FVINIT(I8, .i8 = (v))
-#define AMQP_FIELD_VALUE_U8(v) _AMQP_FVINIT(U8, .u8 = (v))
-#define AMQP_FIELD_VALUE_I16(v) _AMQP_FVINIT(I16, .i16 = (v))
-#define AMQP_FIELD_VALUE_U16(v) _AMQP_FVINIT(U16, .u16 = (v))
-#define AMQP_FIELD_VALUE_I32(v) _AMQP_FVINIT(I32, .i32 = (v))
-#define AMQP_FIELD_VALUE_U32(v) _AMQP_FVINIT(U32, .u32 = (v))
-#define AMQP_FIELD_VALUE_I64(v) _AMQP_FVINIT(I64, .i64 = (v))
-#define AMQP_FIELD_VALUE_F32(v) _AMQP_FVINIT(F32, .f32 = (v))
-#define AMQP_FIELD_VALUE_F64(v) _AMQP_FVINIT(F64, .f64 = (v))
-#define AMQP_FIELD_VALUE_DECIMAL(v) _AMQP_FVINIT(DECIMAL, .decimal = (v))
-#define AMQP_FIELD_VALUE_UTF8(v) _AMQP_FVINIT(UTF8, .bytes = (v))
-#define AMQP_FIELD_VALUE_ARRAY(v) _AMQP_FVINIT(ARRAY, .array = (v))
-#define AMQP_FIELD_VALUE_TIMESTAMP(v) _AMQP_FVINIT(TIMESTAMP, .u64 = (v))
-#define AMQP_FIELD_VALUE_TABLE(v) _AMQP_FVINIT(TABLE, .table = (v))
-#define AMQP_FIELD_VALUE_VOID(k) _AMQP_FVINIT(VOID, .u8 = 0)
-#define AMQP_FIELD_VALUE_BYTES(v) _AMQP_FVINIT(BYTES, .bytes = (v))
-
typedef struct amqp_pool_blocklist_t_ {
int num_blocks;
void **blocklist;
@@ -270,211 +235,198 @@ typedef enum amqp_sasl_method_enum_ {
AMQP_SASL_METHOD_PLAIN = 0
} amqp_sasl_method_enum;
-#define AMQP_PSEUDOFRAME_PROTOCOL_HEADER ((uint8_t) 'A')
-#define AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL ((amqp_channel_t) ((((int) 'M') << 8) | ((int) 'Q')))
-
-typedef int (*amqp_output_fn_t)(void *context, void *buffer, size_t count);
-
/* Opaque struct. */
typedef struct amqp_connection_state_t_ *amqp_connection_state_t;
-extern char const *amqp_version(void);
-
-extern void init_amqp_pool(amqp_pool_t *pool, size_t pagesize);
-extern void recycle_amqp_pool(amqp_pool_t *pool);
-extern void empty_amqp_pool(amqp_pool_t *pool);
-
-extern void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount);
-extern void amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output);
-
-extern amqp_bytes_t amqp_cstring_bytes(char const *cstr);
-extern amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src);
-extern amqp_bytes_t amqp_bytes_malloc(size_t amount);
-extern void amqp_bytes_free(amqp_bytes_t bytes);
-
-#define AMQP_BYTES_FREE(b) \
- ({ \
- if ((b).bytes != NULL) { \
- free((b).bytes); \
- (b).bytes = NULL; \
- } \
- })
-
-extern amqp_connection_state_t amqp_new_connection(void);
-extern int amqp_get_sockfd(amqp_connection_state_t state);
-extern void amqp_set_sockfd(amqp_connection_state_t state,
- int sockfd);
-extern int amqp_tune_connection(amqp_connection_state_t state,
- int channel_max,
- int frame_max,
- int heartbeat);
-extern int amqp_get_channel_max(amqp_connection_state_t state);
-extern int amqp_destroy_connection(amqp_connection_state_t state);
-
-extern int amqp_handle_input(amqp_connection_state_t state,
- amqp_bytes_t received_data,
- amqp_frame_t *decoded_frame);
-
-extern amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state);
-
-extern void amqp_release_buffers(amqp_connection_state_t state);
-
-extern void amqp_maybe_release_buffers(amqp_connection_state_t state);
-
-extern int amqp_send_frame(amqp_connection_state_t state,
- amqp_frame_t const *frame);
-extern int amqp_send_frame_to(amqp_connection_state_t state,
- amqp_frame_t const *frame,
- amqp_output_fn_t fn,
- void *context);
-
-extern int amqp_table_entry_cmp(void const *entry1, void const *entry2);
-
-extern int amqp_open_socket(char const *hostname, int portnumber);
-
-extern int amqp_send_header(amqp_connection_state_t state);
-extern int amqp_send_header_to(amqp_connection_state_t state,
- amqp_output_fn_t fn,
- void *context);
-
-extern amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state);
-
-extern int amqp_simple_wait_frame(amqp_connection_state_t state,
- amqp_frame_t *decoded_frame);
-
-extern int amqp_simple_wait_method(amqp_connection_state_t state,
- amqp_channel_t expected_channel,
- amqp_method_number_t expected_method,
- amqp_method_t *output);
-
-extern int amqp_send_method(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t id,
- void *decoded);
-
-extern amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t *expected_reply_ids,
- void *decoded_request_method);
-
-#define AMQP_EXPAND_METHOD(classname, methodname) (AMQP_ ## classname ## _ ## methodname ## _METHOD)
-
-#define AMQP_SIMPLE_RPC(state, channel, classname, requestname, replyname, structname, ...) \
- ({ \
- structname _simple_rpc_request___ = (structname) { __VA_ARGS__ }; \
- amqp_method_number_t _replies__[2] = { AMQP_EXPAND_METHOD(classname, replyname), 0}; \
- amqp_simple_rpc(state, channel, \
- AMQP_EXPAND_METHOD(classname, requestname), \
- (amqp_method_number_t *)&_replies__, \
- &_simple_rpc_request___); \
- })
-
-#define AMQP_MULTIPLE_RESPONSE_RPC(state, channel, classname, requestname, replynames, structname, ...) \
- ({ \
- structname _simple_rpc_request___ = (structname) { __VA_ARGS__ }; \
- amqp_simple_rpc(state, channel, \
- AMQP_EXPAND_METHOD(classname, requestname), \
- replynames, \
- &_simple_rpc_request___); \
- })
-
-
-extern amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
- char const *vhost,
- int channel_max,
- int frame_max,
- int heartbeat,
- amqp_sasl_method_enum sasl_method, ...);
-
-extern struct amqp_channel_open_ok_t_ *amqp_channel_open(amqp_connection_state_t state,
- amqp_channel_t channel);
+RABBITMQ_EXPORT char const *amqp_version(void);
+
+/* Exported empty data structures */
+RABBITMQ_EXPORT const amqp_bytes_t amqp_empty_bytes;
+RABBITMQ_EXPORT const amqp_table_t amqp_empty_table;
+RABBITMQ_EXPORT const amqp_array_t amqp_empty_array;
+
+/* Compatibility macros for the above, to avoid the need to update
+ code written against earlier versions of librabbitmq. */
+#define AMQP_EMPTY_BYTES amqp_empty_bytes
+#define AMQP_EMPTY_TABLE amqp_empty_table
+#define AMQP_EMPTY_ARRAY amqp_empty_array
+
+RABBITMQ_EXPORT void init_amqp_pool(amqp_pool_t *pool, size_t pagesize);
+RABBITMQ_EXPORT void recycle_amqp_pool(amqp_pool_t *pool);
+RABBITMQ_EXPORT void empty_amqp_pool(amqp_pool_t *pool);
+
+RABBITMQ_EXPORT void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount);
+RABBITMQ_EXPORT void amqp_pool_alloc_bytes(amqp_pool_t *pool,
+ size_t amount, amqp_bytes_t *output);
+
+RABBITMQ_EXPORT amqp_bytes_t amqp_cstring_bytes(char const *cstr);
+RABBITMQ_EXPORT amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src);
+RABBITMQ_EXPORT amqp_bytes_t amqp_bytes_malloc(size_t amount);
+RABBITMQ_EXPORT void amqp_bytes_free(amqp_bytes_t bytes);
+
+RABBITMQ_EXPORT amqp_connection_state_t amqp_new_connection(void);
+RABBITMQ_EXPORT int amqp_get_sockfd(amqp_connection_state_t state);
+RABBITMQ_EXPORT void amqp_set_sockfd(amqp_connection_state_t state,
+ int sockfd);
+RABBITMQ_EXPORT int amqp_tune_connection(amqp_connection_state_t state,
+ int channel_max,
+ int frame_max,
+ int heartbeat);
+RABBITMQ_EXPORT int amqp_get_channel_max(amqp_connection_state_t state);
+RABBITMQ_EXPORT int amqp_destroy_connection(amqp_connection_state_t state);
+
+RABBITMQ_EXPORT int amqp_handle_input(amqp_connection_state_t state,
+ amqp_bytes_t received_data,
+ amqp_frame_t *decoded_frame);
+
+RABBITMQ_EXPORT amqp_boolean_t amqp_release_buffers_ok(
+ amqp_connection_state_t state);
+
+RABBITMQ_EXPORT void amqp_release_buffers(amqp_connection_state_t state);
+
+RABBITMQ_EXPORT void amqp_maybe_release_buffers(amqp_connection_state_t state);
+
+RABBITMQ_EXPORT int amqp_send_frame(amqp_connection_state_t state,
+ amqp_frame_t const *frame);
+
+RABBITMQ_EXPORT int amqp_table_entry_cmp(void const *entry1,
+ void const *entry2);
+
+RABBITMQ_EXPORT int amqp_open_socket(char const *hostname,
+ int portnumber);
+
+RABBITMQ_EXPORT int amqp_send_header(amqp_connection_state_t state);
+
+RABBITMQ_EXPORT amqp_boolean_t amqp_frames_enqueued(
+ amqp_connection_state_t state);
+
+RABBITMQ_EXPORT int amqp_simple_wait_frame(amqp_connection_state_t state,
+ amqp_frame_t *decoded_frame);
+
+RABBITMQ_EXPORT int amqp_simple_wait_method(amqp_connection_state_t state,
+ amqp_channel_t expected_channel,
+ amqp_method_number_t expected_method,
+ amqp_method_t *output);
+
+RABBITMQ_EXPORT int amqp_send_method(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_method_number_t id,
+ void *decoded);
+
+RABBITMQ_EXPORT amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t *expected_reply_ids,
+ void *decoded_request_method);
+
+RABBITMQ_EXPORT amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
+ char const *vhost,
+ int channel_max,
+ int frame_max,
+ int heartbeat,
+ amqp_sasl_method_enum sasl_method, ...);
+
+RABBITMQ_EXPORT struct amqp_channel_open_ok_t_ *amqp_channel_open(
+ amqp_connection_state_t state,
+ amqp_channel_t channel);
struct amqp_basic_properties_t_;
-extern int amqp_basic_publish(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t exchange,
- amqp_bytes_t routing_key,
- amqp_boolean_t mandatory,
- amqp_boolean_t immediate,
- struct amqp_basic_properties_t_ const *properties,
- amqp_bytes_t body);
-
-extern amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
- amqp_channel_t channel,
- int code);
-extern amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
- int code);
-
-extern struct amqp_exchange_declare_ok_t_ *amqp_exchange_declare(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t exchange,
- amqp_bytes_t type,
- amqp_boolean_t passive,
- amqp_boolean_t durable,
- amqp_table_t arguments);
-
-extern struct amqp_queue_declare_ok_t_ *amqp_queue_declare(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_boolean_t passive,
- amqp_boolean_t durable,
- amqp_boolean_t exclusive,
- amqp_boolean_t auto_delete,
- amqp_table_t arguments);
-
-extern struct amqp_queue_delete_ok_t_ *amqp_queue_delete(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_boolean_t if_unused,
- amqp_boolean_t if_empty);
-
-extern struct amqp_queue_bind_ok_t_ *amqp_queue_bind(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_bytes_t exchange,
- amqp_bytes_t routing_key,
- amqp_table_t arguments);
-
-extern struct amqp_queue_unbind_ok_t_ *amqp_queue_unbind(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_bytes_t exchange,
- amqp_bytes_t routing_key,
- amqp_table_t arguments);
-
-extern struct amqp_basic_consume_ok_t_ *amqp_basic_consume(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_bytes_t consumer_tag,
- amqp_boolean_t no_local,
- amqp_boolean_t no_ack,
- amqp_boolean_t exclusive,
- amqp_table_t filter);
-
-extern int amqp_basic_ack(amqp_connection_state_t state,
- amqp_channel_t channel,
- uint64_t delivery_tag,
- amqp_boolean_t multiple);
-
-extern amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_boolean_t no_ack);
-
-extern struct amqp_queue_purge_ok_t_ *amqp_queue_purge(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_boolean_t no_wait);
-
-extern struct amqp_tx_select_ok_t_ *amqp_tx_select(amqp_connection_state_t state,
- amqp_channel_t channel);
-
-extern struct amqp_tx_commit_ok_t_ *amqp_tx_commit(amqp_connection_state_t state,
- amqp_channel_t channel);
-
-extern struct amqp_tx_rollback_ok_t_ *amqp_tx_rollback(amqp_connection_state_t state,
- amqp_channel_t channel);
+RABBITMQ_EXPORT int amqp_basic_publish(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t exchange,
+ amqp_bytes_t routing_key,
+ amqp_boolean_t mandatory,
+ amqp_boolean_t immediate,
+ struct amqp_basic_properties_t_ const *properties,
+ amqp_bytes_t body);
+
+RABBITMQ_EXPORT amqp_rpc_reply_t amqp_channel_close(
+ amqp_connection_state_t state,
+ amqp_channel_t channel,
+ int code);
+RABBITMQ_EXPORT amqp_rpc_reply_t amqp_connection_close(
+ amqp_connection_state_t state,
+ int code);
+
+RABBITMQ_EXPORT struct amqp_exchange_declare_ok_t_ *amqp_exchange_declare(
+ amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t exchange,
+ amqp_bytes_t type,
+ amqp_boolean_t passive,
+ amqp_boolean_t durable,
+ amqp_table_t arguments);
+
+RABBITMQ_EXPORT struct amqp_queue_declare_ok_t_ *amqp_queue_declare(
+ amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t passive,
+ amqp_boolean_t durable,
+ amqp_boolean_t exclusive,
+ amqp_boolean_t auto_delete,
+ amqp_table_t arguments);
+
+RABBITMQ_EXPORT struct amqp_queue_delete_ok_t_ *amqp_queue_delete(
+ amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t if_unused,
+ amqp_boolean_t if_empty);
+
+RABBITMQ_EXPORT struct amqp_queue_bind_ok_t_ *amqp_queue_bind(
+ amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_bytes_t exchange,
+ amqp_bytes_t routing_key,
+ amqp_table_t arguments);
+
+RABBITMQ_EXPORT struct amqp_queue_unbind_ok_t_ *amqp_queue_unbind(
+ amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_bytes_t exchange,
+ amqp_bytes_t routing_key,
+ amqp_table_t arguments);
+
+RABBITMQ_EXPORT struct amqp_basic_consume_ok_t_ *amqp_basic_consume(
+ amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_bytes_t consumer_tag,
+ amqp_boolean_t no_local,
+ amqp_boolean_t no_ack,
+ amqp_boolean_t exclusive,
+ amqp_table_t filter);
+
+RABBITMQ_EXPORT int amqp_basic_ack(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ uint64_t delivery_tag,
+ amqp_boolean_t multiple);
+
+RABBITMQ_EXPORT amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t no_ack);
+
+RABBITMQ_EXPORT struct amqp_queue_purge_ok_t_ *amqp_queue_purge(
+ amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t no_wait);
+
+RABBITMQ_EXPORT struct amqp_tx_select_ok_t_ *amqp_tx_select(
+ amqp_connection_state_t state,
+ amqp_channel_t channel);
+
+RABBITMQ_EXPORT struct amqp_tx_commit_ok_t_ *amqp_tx_commit(
+ amqp_connection_state_t state,
+ amqp_channel_t channel);
+
+RABBITMQ_EXPORT struct amqp_tx_rollback_ok_t_ *amqp_tx_rollback(
+ amqp_connection_state_t state,
+ amqp_channel_t channel);
/*
* Can be used to see if there is data still in the buffer, if so
@@ -483,7 +435,8 @@ extern struct amqp_tx_rollback_ok_t_ *amqp_tx_rollback(amqp_connection_state_t s
*
* Possibly amqp_frames_enqueued should be used for this?
*/
-extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state);
+RABBITMQ_EXPORT amqp_boolean_t amqp_data_in_buffer(
+ amqp_connection_state_t state);
/*
* For those API operations (such as amqp_basic_ack,
@@ -498,7 +451,8 @@ extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state);
* generally do NOT update this per-connection-global amqp_rpc_reply_t
* instance.
*/
-extern amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state);
+RABBITMQ_EXPORT amqp_rpc_reply_t amqp_get_rpc_reply(
+ amqp_connection_state_t state);
/*
* Get the error string for the given error code.
@@ -506,7 +460,16 @@ extern amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state);
* The returned string resides on the heap; the caller is responsible
* for freeing it.
*/
-extern char *amqp_error_string(int err);
+RABBITMQ_EXPORT char *amqp_error_string(int err);
+
+RABBITMQ_EXPORT int amqp_decode_table(amqp_bytes_t encoded,
+ amqp_pool_t *pool,
+ amqp_table_t *output,
+ size_t *offset);
+
+RABBITMQ_EXPORT int amqp_encode_table(amqp_bytes_t encoded,
+ amqp_table_t *input,
+ size_t *offset);
#ifdef __cplusplus
}
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index b2793ff..bf19761 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -52,6 +52,7 @@
#include <stdio.h>
#include <string.h>
#include <stdint.h>
+#include <stdarg.h>
#include "amqp.h"
#include "amqp_framing.h"
@@ -69,6 +70,12 @@ static const char *client_error_strings[ERROR_MAX] = {
"connection closed unexpectedly", /* ERROR_CONNECTION_CLOSED */
};
+/* strdup is not in ISO C90! */
+static inline char *strdup(const char *str)
+{
+ return strcpy(malloc(strlen(str) + 1),str);
+}
+
char *amqp_error_string(int err)
{
const char *str;
@@ -93,6 +100,20 @@ char *amqp_error_string(int err)
return strdup(str);
}
+void amqp_abort(const char *fmt, ...)
+{
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fputc('\n', stderr);
+ abort();
+}
+
+const amqp_bytes_t amqp_empty_bytes = { 0, NULL };
+const amqp_table_t amqp_empty_table = { 0, NULL };
+const amqp_array_t amqp_empty_array = { 0, NULL };
+
#define RPC_REPLY(replytype) \
(state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL \
? (replytype *) state->most_recent_api_result.reply.decoded \
@@ -101,11 +122,18 @@ char *amqp_error_string(int err)
amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state,
amqp_channel_t channel)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, CHANNEL, OPEN, OPEN_OK,
- amqp_channel_open_t,
- AMQP_EMPTY_BYTES);
- return RPC_REPLY(amqp_channel_open_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_CHANNEL_OPEN_OK_METHOD, 0};
+ amqp_channel_open_t req;
+ req.out_of_band.bytes = NULL;
+ req.out_of_band.len = 0;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_CHANNEL_OPEN_METHOD,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
int amqp_basic_publish(amqp_connection_state_t state,
@@ -120,18 +148,19 @@ int amqp_basic_publish(amqp_connection_state_t state,
amqp_frame_t f;
size_t body_offset;
size_t usable_body_payload_size = state->frame_max - (HEADER_SIZE + FOOTER_SIZE);
+ int res;
- amqp_basic_publish_t m =
- (amqp_basic_publish_t) {
- .exchange = exchange,
- .routing_key = routing_key,
- .mandatory = mandatory,
- .immediate = immediate
- };
-
+ amqp_basic_publish_t m;
amqp_basic_properties_t default_properties;
- AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m));
+ m.exchange = exchange;
+ m.routing_key = routing_key;
+ m.mandatory = mandatory;
+ m.immediate = immediate;
+
+ res = amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m);
+ if (res < 0)
+ return res;
if (properties == NULL) {
memset(&default_properties, 0, sizeof(default_properties));
@@ -143,7 +172,10 @@ int amqp_basic_publish(amqp_connection_state_t state,
f.payload.properties.class_id = AMQP_BASIC_CLASS;
f.payload.properties.body_size = body.len;
f.payload.properties.decoded = (void *) properties;
- AMQP_CHECK_RESULT(amqp_send_frame(state, &f));
+
+ res = amqp_send_frame(state, &f);
+ if (res < 0)
+ return res;
body_offset = 0;
while (1) {
@@ -155,7 +187,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
f.frame_type = AMQP_FRAME_BODY;
f.channel = channel;
- f.payload.body_fragment.bytes = BUF_AT(body, body_offset);
+ f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset);
if (remaining >= usable_body_payload_size) {
f.payload.body_fragment.len = usable_body_payload_size;
} else {
@@ -163,7 +195,9 @@ int amqp_basic_publish(amqp_connection_state_t state,
}
body_offset += f.payload.body_fragment.len;
- AMQP_CHECK_RESULT(amqp_send_frame(state, &f));
+ res = amqp_send_frame(state, &f);
+ if (res < 0)
+ return res;
}
return 0;
@@ -174,20 +208,34 @@ amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
int code)
{
char codestr[13];
- snprintf(codestr, sizeof(codestr), "%d", code);
- return AMQP_SIMPLE_RPC(state, channel, CHANNEL, CLOSE, CLOSE_OK,
- amqp_channel_close_t,
- code, amqp_cstring_bytes(codestr), 0, 0);
+ amqp_method_number_t replies[2] = { AMQP_CHANNEL_CLOSE_OK_METHOD, 0};
+ amqp_channel_close_t req;
+
+ req.reply_code = code;
+ req.reply_text.bytes = codestr;
+ req.reply_text.len = sprintf(codestr, "%d", code);
+ req.class_id = 0;
+ req.method_id = 0;
+
+ return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD,
+ replies, &req);
}
amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
int code)
{
char codestr[13];
- snprintf(codestr, sizeof(codestr), "%d", code);
- return AMQP_SIMPLE_RPC(state, 0, CONNECTION, CLOSE, CLOSE_OK,
- amqp_connection_close_t,
- code, amqp_cstring_bytes(codestr), 0, 0);
+ amqp_method_number_t replies[2] = { AMQP_CONNECTION_CLOSE_OK_METHOD, 0};
+ amqp_channel_close_t req;
+
+ req.reply_code = code;
+ req.reply_text.bytes = codestr;
+ req.reply_text.len = sprintf(codestr, "%d", code);
+ req.class_id = 0;
+ req.method_id = 0;
+
+ return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD,
+ replies, &req);
}
amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state,
@@ -198,11 +246,24 @@ amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state,
amqp_boolean_t durable,
amqp_table_t arguments)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, EXCHANGE, DECLARE, DECLARE_OK,
- amqp_exchange_declare_t,
- 0, exchange, type, passive, durable, 0, 0, 0, arguments);
- return RPC_REPLY(amqp_exchange_declare_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_EXCHANGE_DECLARE_OK_METHOD, 0};
+ amqp_exchange_declare_t req;
+ req.exchange = exchange;
+ req.type = type;
+ req.passive = passive;
+ req.durable = durable;
+ req.auto_delete = 0;
+ req.internal = 0;
+ req.nowait = 0;
+ req.arguments = arguments;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_EXCHANGE_DECLARE_METHOD,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state,
@@ -214,11 +275,23 @@ amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state,
amqp_boolean_t auto_delete,
amqp_table_t arguments)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, QUEUE, DECLARE, DECLARE_OK,
- amqp_queue_declare_t,
- 0, queue, passive, durable, exclusive, auto_delete, 0, arguments);
- return RPC_REPLY(amqp_queue_declare_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_QUEUE_DECLARE_OK_METHOD, 0};
+ amqp_queue_declare_t req;
+ req.queue = queue;
+ req.passive = passive;
+ req.durable = durable;
+ req.exclusive = exclusive;
+ req.auto_delete = auto_delete;
+ req.nowait = 0;
+ req.arguments = arguments;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_QUEUE_DECLARE_METHOD,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_queue_delete_ok_t *amqp_queue_delete(amqp_connection_state_t state,
@@ -227,11 +300,20 @@ amqp_queue_delete_ok_t *amqp_queue_delete(amqp_connection_state_t state,
amqp_boolean_t if_unused,
amqp_boolean_t if_empty)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, QUEUE, DELETE, DELETE_OK,
- amqp_queue_delete_t,
- 0, queue, if_unused, if_empty, 0);
- return RPC_REPLY(amqp_queue_delete_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_QUEUE_DELETE_OK_METHOD, 0};
+ amqp_queue_delete_t req;
+ req.queue = queue;
+ req.if_unused = if_unused;
+ req.if_empty = if_empty;
+ req.nowait = 0;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_QUEUE_DELETE_METHOD,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state,
@@ -241,11 +323,22 @@ amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state,
amqp_bytes_t routing_key,
amqp_table_t arguments)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, QUEUE, BIND, BIND_OK,
- amqp_queue_bind_t,
- 0, queue, exchange, routing_key, 0, arguments);
- return RPC_REPLY(amqp_queue_bind_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_QUEUE_BIND_OK_METHOD, 0};
+ amqp_queue_bind_t req;
+ req.ticket = 0;
+ req.queue = queue;
+ req.exchange = exchange;
+ req.routing_key = routing_key;
+ req.nowait = 0;
+ req.arguments = arguments;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_QUEUE_BIND_METHOD,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state,
@@ -255,11 +348,21 @@ amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state,
amqp_bytes_t routing_key,
amqp_table_t arguments)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, QUEUE, UNBIND, UNBIND_OK,
- amqp_queue_unbind_t,
- 0, queue, exchange, routing_key, arguments);
- return RPC_REPLY(amqp_queue_unbind_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_QUEUE_UNBIND_OK_METHOD, 0};
+ amqp_queue_unbind_t req;
+ req.ticket = 0;
+ req.queue = queue;
+ req.exchange = exchange;
+ req.routing_key = routing_key;
+ req.arguments = arguments;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_QUEUE_UNBIND_METHOD,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state,
@@ -271,11 +374,24 @@ amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state,
amqp_boolean_t exclusive,
amqp_table_t filter)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, BASIC, CONSUME, CONSUME_OK,
- amqp_basic_consume_t,
- 0, queue, consumer_tag, no_local, no_ack, exclusive, 0, filter);
- return RPC_REPLY(amqp_basic_consume_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_BASIC_CONSUME_OK_METHOD, 0};
+ amqp_basic_consume_t req;
+ req.ticket = 0;
+ req.queue = queue;
+ req.consumer_tag = consumer_tag;
+ req.no_local = no_local;
+ req.no_ack = no_ack;
+ req.exclusive = exclusive;
+ req.nowait = 0;
+ req.arguments = filter;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_BASIC_CONSUME_METHOD,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
int amqp_basic_ack(amqp_connection_state_t state,
@@ -283,13 +399,10 @@ int amqp_basic_ack(amqp_connection_state_t state,
uint64_t delivery_tag,
amqp_boolean_t multiple)
{
- amqp_basic_ack_t m =
- (amqp_basic_ack_t) {
- .delivery_tag = delivery_tag,
- .multiple = multiple
- };
- AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m));
- return 0;
+ amqp_basic_ack_t m;
+ m.delivery_tag = delivery_tag;
+ m.multiple = multiple;
+ return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m);
}
amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state,
@@ -297,11 +410,19 @@ amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state,
amqp_bytes_t queue,
amqp_boolean_t no_wait)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, QUEUE, PURGE, PURGE_OK,
- amqp_queue_purge_t,
- 0, queue, no_wait);
- return RPC_REPLY(amqp_queue_purge_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_QUEUE_PURGE_OK_METHOD, 0};
+ amqp_queue_purge_t req;
+ req.ticket = 0;
+ req.queue = queue;
+ req.nowait = 0;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_QUEUE_PURGE_METHOD,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
@@ -312,38 +433,54 @@ amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD,
AMQP_BASIC_GET_EMPTY_METHOD,
0 };
- state->most_recent_api_result =
- AMQP_MULTIPLE_RESPONSE_RPC(state, channel, BASIC, GET, replies,
- amqp_basic_get_t,
- 0, queue, no_ack);
+ amqp_basic_get_t req;
+ req.ticket = 0;
+ req.queue = queue;
+ req.no_ack = no_ack;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_BASIC_GET_METHOD,
+ replies, &req);
return state->most_recent_api_result;
}
amqp_tx_select_ok_t *amqp_tx_select(amqp_connection_state_t state,
amqp_channel_t channel)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, TX, SELECT, SELECT_OK,
- amqp_tx_select_t);
- return RPC_REPLY(amqp_tx_select_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_TX_SELECT_OK_METHOD, 0};
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_TX_SELECT_METHOD,
+ replies, NULL);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_tx_commit_ok_t *amqp_tx_commit(amqp_connection_state_t state,
amqp_channel_t channel)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, TX, COMMIT, COMMIT_OK,
- amqp_tx_commit_t);
- return RPC_REPLY(amqp_tx_commit_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_TX_COMMIT_OK_METHOD, 0};
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_TX_COMMIT_METHOD,
+ replies, NULL);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_tx_rollback_ok_t *amqp_tx_rollback(amqp_connection_state_t state,
amqp_channel_t channel)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, TX, ROLLBACK, ROLLBACK_OK,
- amqp_tx_rollback_t);
- return RPC_REPLY(amqp_tx_rollback_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_TX_ROLLBACK_OK_METHOD, 0};
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_TX_ROLLBACK_METHOD,
+ replies, NULL);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state)
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 3d95e98..34d12f8 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -58,62 +58,56 @@
#include "amqp_framing.h"
#include "amqp_private.h"
-#include "socket.h"
-
#define INITIAL_FRAME_POOL_PAGE_SIZE 65536
#define INITIAL_DECODING_POOL_PAGE_SIZE 131072
#define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072
-#define ENFORCE_STATE(statevec, statenum) \
- { \
- amqp_connection_state_t _check_state = (statevec); \
- int _wanted_state = (statenum); \
- amqp_assert(_check_state->state == _wanted_state, \
- "Programming error: invalid AMQP connection state: expected %d, got %d", \
- _wanted_state, \
- _check_state->state); \
+#define ENFORCE_STATE(statevec, statenum) \
+ { \
+ amqp_connection_state_t _check_state = (statevec); \
+ int _wanted_state = (statenum); \
+ if (_check_state->state != _wanted_state) \
+ amqp_abort("Programming error: invalid AMQP connection state: expected %d, got %d", \
+ _wanted_state, \
+ _check_state->state); \
}
amqp_connection_state_t amqp_new_connection(void) {
amqp_connection_state_t state =
(amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_));
- if (state == NULL) {
+ if (state == NULL)
return NULL;
- }
init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE);
init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE);
- state->state = CONNECTION_STATE_IDLE;
+ if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0)
+ goto out_nomem;
- state->inbound_buffer.bytes = NULL;
- state->outbound_buffer.bytes = NULL;
- if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0) {
- empty_amqp_pool(&state->frame_pool);
- empty_amqp_pool(&state->decoding_pool);
- free(state);
- return NULL;
- }
+ state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);
+ if (state->inbound_buffer.bytes == NULL)
+ goto out_nomem;
- state->inbound_offset = 0;
- state->target_size = HEADER_SIZE;
+ state->state = CONNECTION_STATE_INITIAL;
+ /* the server protocol version response is 8 bytes, which conveniently
+ is also the minimum frame size */
+ state->target_size = 8;
state->sockfd = -1;
state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE;
state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE);
- if (state->sock_inbound_buffer.bytes == NULL) {
- amqp_destroy_connection(state);
- return NULL;
- }
-
- state->sock_inbound_offset = 0;
- state->sock_inbound_limit = 0;
-
- state->first_queued_frame = NULL;
- state->last_queued_frame = NULL;
+ if (state->sock_inbound_buffer.bytes == NULL)
+ goto out_nomem;
return state;
+
+ out_nomem:
+ free(state->sock_inbound_buffer.bytes);
+ empty_amqp_pool(&state->frame_pool);
+ empty_amqp_pool(&state->decoding_pool);
+ free(state);
+ return NULL;
}
int amqp_get_sockfd(amqp_connection_state_t state) {
@@ -180,153 +174,162 @@ static void return_to_idle(amqp_connection_state_t state) {
state->state = CONNECTION_STATE_IDLE;
}
+static size_t consume_data(amqp_connection_state_t state,
+ amqp_bytes_t *received_data)
+{
+ /* how much data is available and will fit? */
+ size_t bytes_consumed = state->target_size - state->inbound_offset;
+ if (received_data->len < bytes_consumed)
+ bytes_consumed = received_data->len;
+
+ memcpy(amqp_offset(state->inbound_buffer.bytes, state->inbound_offset),
+ received_data->bytes, bytes_consumed);
+ state->inbound_offset += bytes_consumed;
+ received_data->bytes = amqp_offset(received_data->bytes, bytes_consumed);
+ received_data->len -= bytes_consumed;
+
+ return bytes_consumed;
+}
+
int amqp_handle_input(amqp_connection_state_t state,
amqp_bytes_t received_data,
amqp_frame_t *decoded_frame)
{
- int total_bytes_consumed = 0;
- int bytes_consumed;
+ size_t bytes_consumed;
+ void *raw_frame;
/* Returning frame_type of zero indicates either insufficient input,
or a complete, ignored frame was read. */
decoded_frame->frame_type = 0;
- read_more:
- if (received_data.len == 0) {
- return total_bytes_consumed;
- }
+ if (received_data.len == 0)
+ return 0;
if (state->state == CONNECTION_STATE_IDLE) {
- state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);
- if (state->inbound_buffer.bytes == NULL) {
+ state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool,
+ state->inbound_buffer.len);
+ if (state->inbound_buffer.bytes == NULL)
/* state->inbound_buffer.len is always nonzero, because it
corresponds to frame_max, which is not permitted to be less
than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */
return -ERROR_NO_MEMORY;
- }
- state->state = CONNECTION_STATE_WAITING_FOR_HEADER;
- }
- bytes_consumed = state->target_size - state->inbound_offset;
- if (received_data.len < bytes_consumed) {
- bytes_consumed = received_data.len;
+ state->state = CONNECTION_STATE_HEADER;
}
- E_BYTES(state->inbound_buffer, state->inbound_offset, bytes_consumed, received_data.bytes);
- state->inbound_offset += bytes_consumed;
- total_bytes_consumed += bytes_consumed;
+ bytes_consumed = consume_data(state, &received_data);
- assert(state->inbound_offset <= state->target_size);
+ /* do we have target_size data yet? if not, return with the
+ expectation that more will arrive */
+ if (state->inbound_offset < state->target_size)
+ return bytes_consumed;
- if (state->inbound_offset < state->target_size) {
- return total_bytes_consumed;
- }
+ raw_frame = state->inbound_buffer.bytes;
switch (state->state) {
- case CONNECTION_STATE_WAITING_FOR_HEADER:
- if (D_8(state->inbound_buffer, 0) == AMQP_PSEUDOFRAME_PROTOCOL_HEADER &&
- D_16(state->inbound_buffer, 1) == AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL)
- {
- state->target_size = 8;
- state->state = CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER;
- } else {
- state->target_size = D_32(state->inbound_buffer, 3) + HEADER_SIZE + FOOTER_SIZE;
- state->state = CONNECTION_STATE_WAITING_FOR_BODY;
- }
-
- /* Wind buffer forward, and try to read some body out of it. */
- received_data.len -= bytes_consumed;
- received_data.bytes = ((char *) received_data.bytes) + bytes_consumed;
- goto read_more;
-
- case CONNECTION_STATE_WAITING_FOR_BODY: {
- int frame_type = D_8(state->inbound_buffer, 0);
-
-#if 0
- printf("recving:\n");
- amqp_dump(state->inbound_buffer.bytes, state->target_size);
-#endif
-
- /* Check frame end marker (footer) */
- if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) {
- return -ERROR_BAD_AMQP_DATA;
- }
-
- decoded_frame->channel = D_16(state->inbound_buffer, 1);
-
- switch (frame_type) {
- case AMQP_FRAME_METHOD: {
- amqp_bytes_t encoded;
-
- /* Four bytes of method ID before the method args. */
- encoded.len = state->target_size - (HEADER_SIZE + 4 + FOOTER_SIZE);
- encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 4, encoded.len);
-
- decoded_frame->frame_type = AMQP_FRAME_METHOD;
- decoded_frame->payload.method.id = D_32(state->inbound_buffer, HEADER_SIZE);
- AMQP_CHECK_RESULT(amqp_decode_method(decoded_frame->payload.method.id,
- &state->decoding_pool,
- encoded,
- &decoded_frame->payload.method.decoded));
- break;
- }
-
- case AMQP_FRAME_HEADER: {
- amqp_bytes_t encoded;
-
- /* 12 bytes for properties header. */
- encoded.len = state->target_size - (HEADER_SIZE + 12 + FOOTER_SIZE);
- encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 12, encoded.len);
-
- decoded_frame->frame_type = AMQP_FRAME_HEADER;
- decoded_frame->payload.properties.class_id = D_16(state->inbound_buffer, HEADER_SIZE);
- decoded_frame->payload.properties.body_size = D_64(state->inbound_buffer, HEADER_SIZE+4);
- decoded_frame->payload.properties.raw = encoded;
- AMQP_CHECK_RESULT(amqp_decode_properties(decoded_frame->payload.properties.class_id,
- &state->decoding_pool,
- encoded,
- &decoded_frame->payload.properties.decoded));
- break;
- }
-
- case AMQP_FRAME_BODY: {
- size_t fragment_len = state->target_size - (HEADER_SIZE + FOOTER_SIZE);
-
- decoded_frame->frame_type = AMQP_FRAME_BODY;
- decoded_frame->payload.body_fragment.len = fragment_len;
- decoded_frame->payload.body_fragment.bytes =
- D_BYTES(state->inbound_buffer, HEADER_SIZE, fragment_len);
- break;
- }
-
- case AMQP_FRAME_HEARTBEAT:
- decoded_frame->frame_type = AMQP_FRAME_HEARTBEAT;
- break;
-
- default:
- /* Ignore the frame by not changing frame_type away from 0. */
- break;
- }
+ case CONNECTION_STATE_INITIAL:
+ /* check for a protocol header from the server */
+ if (memcmp(raw_frame, "AMQP", 4) == 0) {
+ decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER;
+ decoded_frame->channel = 0;
+
+ decoded_frame->payload.protocol_header.transport_high
+ = amqp_d8(raw_frame, 4);
+ decoded_frame->payload.protocol_header.transport_low
+ = amqp_d8(raw_frame, 5);
+ decoded_frame->payload.protocol_header.protocol_version_major
+ = amqp_d8(raw_frame, 6);
+ decoded_frame->payload.protocol_header.protocol_version_minor
+ = amqp_d8(raw_frame, 7);
return_to_idle(state);
- return total_bytes_consumed;
+ return bytes_consumed;
}
- case CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER:
- decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER;
- decoded_frame->channel = AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL;
- amqp_assert(D_8(state->inbound_buffer, 3) == (uint8_t) 'P',
- "Invalid protocol header received");
- decoded_frame->payload.protocol_header.transport_high = D_8(state->inbound_buffer, 4);
- decoded_frame->payload.protocol_header.transport_low = D_8(state->inbound_buffer, 5);
- decoded_frame->payload.protocol_header.protocol_version_major = D_8(state->inbound_buffer, 6);
- decoded_frame->payload.protocol_header.protocol_version_minor = D_8(state->inbound_buffer, 7);
+ /* it's not a protocol header; fall through to process it as a
+ regular frame header */
- return_to_idle(state);
- return total_bytes_consumed;
+ case CONNECTION_STATE_HEADER:
+ /* frame length is 3 bytes in */
+ state->target_size
+ = amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE;
+ state->state = CONNECTION_STATE_BODY;
+
+ bytes_consumed += consume_data(state, &received_data);
+
+ /* do we have target_size data yet? if not, return with the
+ expectation that more will arrive */
+ if (state->inbound_offset < state->target_size)
+ return bytes_consumed;
+
+ /* fall through to process body */
+
+ case CONNECTION_STATE_BODY: {
+ amqp_bytes_t encoded;
+ int res;
+
+ /* Check frame end marker (footer) */
+ if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END)
+ return -ERROR_BAD_AMQP_DATA;
+
+ decoded_frame->frame_type = amqp_d8(raw_frame, 0);
+ decoded_frame->channel = amqp_d16(raw_frame, 1);
+
+ switch (decoded_frame->frame_type) {
+ case AMQP_FRAME_METHOD:
+ decoded_frame->payload.method.id = amqp_d32(raw_frame, HEADER_SIZE);
+ encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 4);
+ encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE;
+
+ res = amqp_decode_method(decoded_frame->payload.method.id,
+ &state->decoding_pool, encoded,
+ &decoded_frame->payload.method.decoded);
+ if (res < 0)
+ return res;
+
+ break;
+
+ case AMQP_FRAME_HEADER:
+ decoded_frame->payload.properties.class_id
+ = amqp_d16(raw_frame, HEADER_SIZE);
+ /* unused 2-byte weight field goes here */
+ decoded_frame->payload.properties.body_size
+ = amqp_d64(raw_frame, HEADER_SIZE + 4);
+ encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 12);
+ encoded.len = state->target_size - HEADER_SIZE - 12 - FOOTER_SIZE;
+ decoded_frame->payload.properties.raw = encoded;
+
+ res = amqp_decode_properties(decoded_frame->payload.properties.class_id,
+ &state->decoding_pool, encoded,
+ &decoded_frame->payload.properties.decoded);
+ if (res < 0)
+ return res;
+
+ break;
+
+ case AMQP_FRAME_BODY:
+ decoded_frame->payload.body_fragment.len
+ = state->target_size - HEADER_SIZE - FOOTER_SIZE;
+ decoded_frame->payload.body_fragment.bytes
+ = amqp_offset(raw_frame, HEADER_SIZE);
+ break;
+
+ case AMQP_FRAME_HEARTBEAT:
+ break;
default:
- amqp_assert(0, "Internal error: invalid amqp_connection_state_t->state %d", state->state);
+ /* Ignore the frame */
+ decoded_frame->frame_type = 0;
+ break;
+ }
+
+ return_to_idle(state);
+ return bytes_consumed;
+ }
+
+ default:
+ amqp_abort("Internal error: invalid amqp_connection_state_t->state %d", state->state);
+ return bytes_consumed;
}
}
@@ -337,8 +340,8 @@ amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) {
void amqp_release_buffers(amqp_connection_state_t state) {
ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
- amqp_assert(state->first_queued_frame == NULL,
- "Programming error: attempt to amqp_release_buffers while waiting events enqueued");
+ if (state->first_queued_frame)
+ amqp_abort("Programming error: attempt to amqp_release_buffers while waiting events enqueued");
recycle_amqp_pool(&state->frame_pool);
recycle_amqp_pool(&state->decoding_pool);
@@ -350,103 +353,80 @@ void amqp_maybe_release_buffers(amqp_connection_state_t state) {
}
}
-static int inner_send_frame(amqp_connection_state_t state,
- amqp_frame_t const *frame,
- amqp_bytes_t *encoded,
- int *payload_len)
+int amqp_send_frame(amqp_connection_state_t state,
+ const amqp_frame_t *frame)
{
- int separate_body;
+ void *out_frame = state->outbound_buffer.bytes;
+ int res;
- E_8(state->outbound_buffer, 0, frame->frame_type);
- E_16(state->outbound_buffer, 1, frame->channel);
- switch (frame->frame_type) {
- case AMQP_FRAME_METHOD:
- E_32(state->outbound_buffer, HEADER_SIZE, frame->payload.method.id);
- encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE);
- encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded->len);
- *payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id,
- frame->payload.method.decoded,
- *encoded)) + 4;
- separate_body = 0;
- break;
+ amqp_e8(out_frame, 0, frame->frame_type);
+ amqp_e16(out_frame, 1, frame->channel);
- case AMQP_FRAME_HEADER:
- E_16(state->outbound_buffer, HEADER_SIZE, frame->payload.properties.class_id);
- E_16(state->outbound_buffer, HEADER_SIZE+2, 0); /* "weight" */
- E_64(state->outbound_buffer, HEADER_SIZE+4, frame->payload.properties.body_size);
- encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE);
- encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded->len);
- *payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id,
- frame->payload.properties.decoded,
- *encoded)) + 12;
- separate_body = 0;
- break;
+ if (frame->frame_type == AMQP_FRAME_BODY) {
+ /* For a body frame, rather than copying data around, we use
+ writev to compose the frame */
+ struct iovec iov[3];
+ uint8_t frame_end_byte = AMQP_FRAME_END;
+ const amqp_bytes_t *body = &frame->payload.body_fragment;
- case AMQP_FRAME_BODY:
- *encoded = frame->payload.body_fragment;
- *payload_len = encoded->len;
- separate_body = 1;
- break;
+ amqp_e32(out_frame, 3, body->len);
- case AMQP_FRAME_HEARTBEAT:
- *encoded = AMQP_EMPTY_BYTES;
- *payload_len = 0;
- separate_body = 0;
- break;
+ iov[0].iov_base = out_frame;
+ iov[0].iov_len = HEADER_SIZE;
+ iov[1].iov_base = body->bytes;
+ iov[1].iov_len = body->len;
+ iov[2].iov_base = &frame_end_byte;
+ iov[2].iov_len = FOOTER_SIZE;
- default:
- abort();
+ res = amqp_socket_writev(state->sockfd, iov, 3);
}
+ else {
+ size_t out_frame_len;
+ amqp_bytes_t encoded;
- E_32(state->outbound_buffer, 3, *payload_len);
- if (!separate_body) {
- E_8(state->outbound_buffer, *payload_len + HEADER_SIZE, AMQP_FRAME_END);
- }
+ switch (frame->frame_type) {
+ case AMQP_FRAME_METHOD:
+ amqp_e32(out_frame, HEADER_SIZE, frame->payload.method.id);
-#if 0
- if (separate_body) {
- printf("sending body frame (header):\n");
- amqp_dump(state->outbound_buffer.bytes, HEADER_SIZE);
- printf("sending body frame (payload):\n");
- amqp_dump(encoded->bytes, *payload_len);
- } else {
- printf("sending:\n");
- amqp_dump(state->outbound_buffer.bytes, *payload_len + HEADER_SIZE + FOOTER_SIZE);
- }
-#endif
+ encoded.bytes = amqp_offset(out_frame, HEADER_SIZE + 4);
+ encoded.len = state->outbound_buffer.len - HEADER_SIZE - 4 - FOOTER_SIZE;
- return separate_body;
-}
+ res = amqp_encode_method(frame->payload.method.id,
+ frame->payload.method.decoded, encoded);
+ if (res < 0)
+ return res;
-int amqp_send_frame(amqp_connection_state_t state,
- amqp_frame_t const *frame)
-{
- amqp_bytes_t encoded;
- int payload_len, res;
-
- res = inner_send_frame(state, frame, &encoded, &payload_len);
- switch (res) {
- case 0:
- res = send(state->sockfd, state->outbound_buffer.bytes,
- payload_len + (HEADER_SIZE + FOOTER_SIZE), 0);
+ out_frame_len = res + 4;
break;
- case 1: {
- struct iovec iov[3];
- char frame_end_byte = AMQP_FRAME_END;
- iov[0].iov_base = state->outbound_buffer.bytes;
- iov[0].iov_len = HEADER_SIZE;
- iov[1].iov_base = encoded.bytes;
- iov[1].iov_len = payload_len;
- iov[2].iov_base = &frame_end_byte;
- assert(FOOTER_SIZE == 1);
- iov[2].iov_len = FOOTER_SIZE;
- res = amqp_socket_writev(state->sockfd, &iov[0], 3);
+ case AMQP_FRAME_HEADER:
+ amqp_e16(out_frame, HEADER_SIZE, frame->payload.properties.class_id);
+ amqp_e16(out_frame, HEADER_SIZE+2, 0); /* "weight" */
+ amqp_e64(out_frame, HEADER_SIZE+4, frame->payload.properties.body_size);
+
+ encoded.bytes = amqp_offset(out_frame, HEADER_SIZE + 12);
+ encoded.len = state->outbound_buffer.len - HEADER_SIZE - 12 - FOOTER_SIZE;
+
+ res = amqp_encode_properties(frame->payload.properties.class_id,
+ frame->payload.properties.decoded, encoded);
+ if (res < 0)
+ return res;
+
+ out_frame_len = res + 12;
+ break;
+
+ case AMQP_FRAME_HEARTBEAT:
+ out_frame_len = 0;
break;
- }
default:
- return res;
+ abort();
+ }
+
+ amqp_e32(out_frame, 3, out_frame_len);
+ amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END);
+ res = send(state->sockfd, out_frame,
+ out_frame_len + HEADER_SIZE + FOOTER_SIZE, 0);
}
if (res < 0)
@@ -454,35 +434,3 @@ int amqp_send_frame(amqp_connection_state_t state,
else
return 0;
}
-
-int amqp_send_frame_to(amqp_connection_state_t state,
- amqp_frame_t const *frame,
- amqp_output_fn_t fn,
- void *context)
-{
- amqp_bytes_t encoded;
- int payload_len;
- int separate_body;
-
- separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
- switch (separate_body) {
- case 0:
- AMQP_CHECK_RESULT(fn(context,
- state->outbound_buffer.bytes,
- payload_len + (HEADER_SIZE + FOOTER_SIZE)));
- return 0;
-
- case 1:
- AMQP_CHECK_RESULT(fn(context, state->outbound_buffer.bytes, HEADER_SIZE));
- AMQP_CHECK_RESULT(fn(context, encoded.bytes, payload_len));
- {
- assert(FOOTER_SIZE == 1);
- char frame_end_byte = AMQP_FRAME_END;
- AMQP_CHECK_RESULT(fn(context, &frame_end_byte, FOOTER_SIZE));
- }
- return 0;
-
- default:
- return separate_body;
- }
-}
diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c
index 021151a..3ad8c3e 100644
--- a/librabbitmq/amqp_mem.c
+++ b/librabbitmq/amqp_mem.c
@@ -56,7 +56,7 @@
#include <assert.h>
#include "amqp.h"
-#include "../config.h"
+#include "config.h"
char const *amqp_version(void) {
return VERSION; /* defined in config.h */
@@ -196,5 +196,5 @@ amqp_bytes_t amqp_bytes_malloc(size_t amount) {
}
void amqp_bytes_free(amqp_bytes_t bytes) {
- AMQP_BYTES_FREE(bytes);
+ free(bytes.bytes);
}
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index c30663a..eaf6b2c 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -51,9 +51,7 @@
* ***** END LICENSE BLOCK *****
*/
-#ifdef __cplusplus
-extern "C" {
-#endif
+#include "config.h"
/* Error numbering: Because of differences in error numbering on
* different platforms, we want to keep error numbers opaque for
@@ -78,42 +76,43 @@ extern "C" {
extern char *amqp_os_error_string(int err);
+#include "socket.h"
+
/*
- * Connection states:
+ * Connection states: XXX FIX THIS
*
- * - CONNECTION_STATE_IDLE: initial state, and entered again after
- * each frame is completed. Means that no bytes of the next frame
- * have been seen yet. Connections may only be reconfigured, and the
+ * - CONNECTION_STATE_INITIAL: The initial state, when we cannot be
+ * sure if the next thing we will get is the first AMQP frame, or a
+ * protocol header from the server.
+ *
+ * - CONNECTION_STATE_IDLE: The normal state between
+ * frames. Connections may only be reconfigured, and the
* connection's pools recycled, when in this state. Whenever we're
* in this state, the inbound_buffer's bytes pointer must be NULL;
* any other state, and it must point to a block of memory allocated
* from the frame_pool.
*
- * - CONNECTION_STATE_WAITING_FOR_HEADER: Some bytes of an incoming
- * frame have been seen, but not a complete frame header's worth.
- *
- * - CONNECTION_STATE_WAITING_FOR_BODY: A complete frame header has
- * been seen, but the frame is not yet complete. When it is
- * completed, it will be returned, and the connection will return to
- * IDLE state.
+ * - CONNECTION_STATE_HEADER: Some bytes of an incoming frame have
+ * been seen, but not a complete frame header's worth.
*
- * - CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER: The beginning of a
- * protocol version header has been seen, but the full eight bytes
- * hasn't yet been received. When it is completed, it will be
+ * - CONNECTION_STATE_BODY: A complete frame header has been seen, but
+ * the frame is not yet complete. When it is completed, it will be
* returned, and the connection will return to IDLE state.
*
*/
typedef enum amqp_connection_state_enum_ {
CONNECTION_STATE_IDLE = 0,
- CONNECTION_STATE_WAITING_FOR_HEADER,
- CONNECTION_STATE_WAITING_FOR_BODY,
- CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER
+ CONNECTION_STATE_INITIAL,
+ CONNECTION_STATE_HEADER,
+ CONNECTION_STATE_BODY
} amqp_connection_state_enum;
/* 7 bytes up front, then payload, then 1 byte footer */
#define HEADER_SIZE 7
#define FOOTER_SIZE 1
+#define AMQP_PSEUDOFRAME_PROTOCOL_HEADER 'A'
+
typedef struct amqp_link_t_ {
struct amqp_link_t_ *next;
void *data;
@@ -146,65 +145,107 @@ struct amqp_connection_state_t_ {
amqp_rpc_reply_t most_recent_api_result;
};
-#define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -ERROR_BAD_AMQP_DATA; } (v); })
-#define BUF_AT(b, o) (&(((uint8_t *) (b).bytes)[o]))
-
-#define D_8(b, o) CHECK_LIMIT(b, o, 1, * (uint8_t *) BUF_AT(b, o))
-#define D_16(b, o) CHECK_LIMIT(b, o, 2, ({uint16_t v; memcpy(&v, BUF_AT(b, o), 2); ntohs(v);}))
-#define D_32(b, o) CHECK_LIMIT(b, o, 4, ({uint32_t v; memcpy(&v, BUF_AT(b, o), 4); ntohl(v);}))
-#define D_64(b, o) ({ \
- uint64_t hi = D_32(b, o); \
- uint64_t lo = D_32(b, o + 4); \
- hi << 32 | lo; \
-})
-
-#define D_BYTES(b, o, l) CHECK_LIMIT(b, o, l, BUF_AT(b, o))
-
-#define E_8(b, o, v) CHECK_LIMIT(b, o, 1, * (uint8_t *) BUF_AT(b, o) = (v))
-#define E_16(b, o, v) CHECK_LIMIT(b, o, 2, ({uint16_t vv = htons(v); memcpy(BUF_AT(b, o), &vv, 2);}))
-#define E_32(b, o, v) CHECK_LIMIT(b, o, 4, ({uint32_t vv = htonl(v); memcpy(BUF_AT(b, o), &vv, 4);}))
-#define E_64(b, o, v) ({ \
- E_32(b, o, (uint32_t) (((uint64_t) v) >> 32)); \
- E_32(b, o + 4, (uint32_t) (((uint64_t) v) & 0xFFFFFFFF)); \
- })
-
-#define E_BYTES(b, o, l, v) CHECK_LIMIT(b, o, l, memcpy(BUF_AT(b, o), (v), (l)))
-
-extern int amqp_decode_table(amqp_bytes_t encoded,
- amqp_pool_t *pool,
- amqp_table_t *output,
- int *offsetptr);
-
-extern int amqp_encode_table(amqp_bytes_t encoded,
- amqp_table_t *input,
- int *offsetptr);
-
-#define amqp_assert(condition, ...) \
- ({ \
- if (!(condition)) { \
- fprintf(stderr, __VA_ARGS__); \
- fputc('\n', stderr); \
- abort(); \
- } \
- })
-
-#define AMQP_CHECK_RESULT_CLEANUP(expr, stmts) \
- ({ \
- int _result = (expr); \
- if (_result < 0) { stmts; return _result; } \
- _result; \
- })
-
-#define AMQP_CHECK_RESULT(expr) AMQP_CHECK_RESULT_CLEANUP(expr, )
-
-#ifndef NDEBUG
-extern void amqp_dump(void const *buffer, size_t len);
-#else
-#define amqp_dump(buffer, len) ((void) 0)
-#endif
+static inline void *amqp_offset(void *data, size_t offset)
+{
+ return (char *)data + offset;
+}
-#ifdef __cplusplus
+/* assuming a machine that supports unaligned accesses (for now) */
+
+#define DECLARE_CODEC_BASE_TYPE(bits, htonx, ntohx) \
+ \
+static inline void amqp_e##bits(void *data, size_t offset, \
+ uint##bits##_t val) \
+{ \
+ *(uint##bits##_t *)amqp_offset(data, offset) = htonx(val); \
+} \
+ \
+static inline uint##bits##_t amqp_d##bits(void *data, size_t offset) \
+{ \
+ return ntohx(*(uint##bits##_t *)amqp_offset(data, offset)); \
+} \
+ \
+static inline int amqp_encode_##bits(amqp_bytes_t encoded, size_t *offset, \
+ uint##bits##_t input) \
+ \
+{ \
+ size_t o = *offset; \
+ if ((*offset = o + bits / 8) <= encoded.len) { \
+ amqp_e##bits(encoded.bytes, o, input); \
+ return 1; \
+ } \
+ else { \
+ return 0; \
+ } \
+} \
+ \
+static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \
+ uint##bits##_t *output) \
+ \
+{ \
+ size_t o = *offset; \
+ if ((*offset = o + bits / 8) <= encoded.len) { \
+ *output = amqp_d##bits(encoded.bytes, o); \
+ *output = ntohx(*(uint##bits##_t *)((char *)encoded.bytes + o)); \
+ return 1; \
+ } \
+ else { \
+ return 0; \
+ } \
}
-#endif
+
+/* assuming little endian (for now) */
+
+#define DECLARE_XTOXLL(func) \
+static inline uint64_t func##ll(uint64_t val) \
+{ \
+ union { \
+ uint64_t whole; \
+ uint32_t halves[2]; \
+ } u; \
+ uint32_t t; \
+ u.whole = val; \
+ t = u.halves[0]; \
+ u.halves[0] = func##l(u.halves[1]); \
+ u.halves[1] = func##l(t); \
+ return u.whole; \
+}
+
+DECLARE_XTOXLL(hton)
+DECLARE_XTOXLL(ntoh)
+
+DECLARE_CODEC_BASE_TYPE(8, (uint8_t), (uint8_t))
+DECLARE_CODEC_BASE_TYPE(16, htons, ntohs)
+DECLARE_CODEC_BASE_TYPE(32, htonl, ntohl)
+DECLARE_CODEC_BASE_TYPE(64, htonll, ntohll)
+
+static inline int amqp_encode_bytes(amqp_bytes_t encoded, size_t *offset,
+ amqp_bytes_t input)
+{
+ size_t o = *offset;
+ if ((*offset = o + input.len) <= encoded.len) {
+ memcpy(amqp_offset(encoded.bytes, o), input.bytes, input.len);
+ return 1;
+ }
+ else {
+ return 0;
+ }
+}
+
+static inline int amqp_decode_bytes(amqp_bytes_t encoded, size_t *offset,
+ amqp_bytes_t *output, size_t len)
+{
+ size_t o = *offset;
+ if ((*offset = o + len) <= encoded.len) {
+ output->bytes = amqp_offset(encoded.bytes, o);
+ output->len = len;
+ return 1;
+ }
+ else {
+ return 0;
+ }
+}
+
+extern void amqp_abort(const char *fmt, ...);
#endif
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 13f6376..f23b42b 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -59,8 +59,6 @@
#include "amqp_framing.h"
#include "amqp_private.h"
-#include "socket.h"
-
int amqp_open_socket(char const *hostname,
int portnumber)
@@ -98,37 +96,28 @@ int amqp_open_socket(char const *hostname,
return sockfd;
}
-static char *header() {
- static char header[8];
- header[0] = 'A';
- header[1] = 'M';
- header[2] = 'Q';
- header[3] = 'P';
- header[4] = 0;
- header[5] = AMQP_PROTOCOL_VERSION_MAJOR;
- header[6] = AMQP_PROTOCOL_VERSION_MINOR;
- header[7] = AMQP_PROTOCOL_VERSION_REVISION;
- return header;
-}
-
int amqp_send_header(amqp_connection_state_t state) {
- return send(state->sockfd, header(), 8, 0);
-}
-
-int amqp_send_header_to(amqp_connection_state_t state,
- amqp_output_fn_t fn,
- void *context)
-{
- return fn(context, header(), 8);
+ static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0,
+ AMQP_PROTOCOL_VERSION_MAJOR,
+ AMQP_PROTOCOL_VERSION_MINOR,
+ AMQP_PROTOCOL_VERSION_REVISION };
+ return send(state->sockfd, (void *)header, 8, 0);
}
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
+ amqp_bytes_t res;
+
switch (method) {
- case AMQP_SASL_METHOD_PLAIN: return (amqp_bytes_t) {.len = 5, .bytes = "PLAIN"};
- default:
- amqp_assert(0, "Invalid SASL method: %d", (int) method);
+ case AMQP_SASL_METHOD_PLAIN:
+ res.bytes = "PLAIN";
+ res.len = 5;
+ break;
+
+ default:
+ amqp_abort("Invalid SASL method: %d", (int) method);
}
- abort(); /* unreachable */
+
+ return res;
}
static amqp_bytes_t sasl_response(amqp_pool_t *pool,
@@ -143,20 +132,23 @@ static amqp_bytes_t sasl_response(amqp_pool_t *pool,
size_t username_len = strlen(username);
char *password = va_arg(args, char *);
size_t password_len = strlen(password);
+ char *response_buf;
+
amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response);
- if (response.bytes == NULL) {
+ if (response.bytes == NULL)
/* We never request a zero-length block, because of the +2
above, so a NULL here really is ENOMEM. */
return response;
- }
- *BUF_AT(response, 0) = 0;
- memcpy(((char *) response.bytes) + 1, username, username_len);
- *BUF_AT(response, username_len + 1) = 0;
- memcpy(((char *) response.bytes) + username_len + 2, password, password_len);
+
+ response_buf = response.bytes;
+ response_buf[0] = 0;
+ memcpy(response_buf + 1, username, username_len);
+ response_buf[username_len + 1] = 0;
+ memcpy(response_buf + username_len + 2, password, password_len);
break;
}
default:
- amqp_assert(0, "Invalid SASL method: %d", (int) method);
+ amqp_abort("Invalid SASL method: %d", (int) method);
}
return response;
@@ -178,33 +170,37 @@ static int wait_frame_inner(amqp_connection_state_t state,
amqp_frame_t *decoded_frame)
{
while (1) {
- int result;
+ int res;
while (amqp_data_in_buffer(state)) {
amqp_bytes_t buffer;
buffer.len = state->sock_inbound_limit - state->sock_inbound_offset;
buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset;
- AMQP_CHECK_RESULT((result = amqp_handle_input(state, buffer, decoded_frame)));
- state->sock_inbound_offset += result;
+
+ res = amqp_handle_input(state, buffer, decoded_frame);
+ if (res < 0)
+ return res;
+
+ state->sock_inbound_offset += res;
if (decoded_frame->frame_type != 0)
/* Complete frame was read. Return it. */
return 0;
/* Incomplete or ignored frame. Keep processing input. */
- assert(result != 0);
- }
+ assert(res != 0);
+ }
- result = recv(state->sockfd, state->sock_inbound_buffer.bytes,
+ res = recv(state->sockfd, state->sock_inbound_buffer.bytes,
state->sock_inbound_buffer.len, 0);
- if (result <= 0) {
- if (result == 0)
+ if (res <= 0) {
+ if (res == 0)
return -ERROR_CONNECTION_CLOSED;
else
return -amqp_socket_error();
}
- state->sock_inbound_limit = result;
+ state->sock_inbound_limit = res;
state->sock_inbound_offset = 0;
}
}
@@ -234,22 +230,22 @@ int amqp_simple_wait_method(amqp_connection_state_t state,
int res = amqp_simple_wait_frame(state, &frame);
if (res < 0)
return res;
-
- amqp_assert(frame.channel == expected_channel,
- "Expected 0x%08X method frame on channel %d, got frame on channel %d",
- expected_method,
- expected_channel,
- frame.channel);
- amqp_assert(frame.frame_type == AMQP_FRAME_METHOD,
- "Expected 0x%08X method frame on channel %d, got frame type %d",
- expected_method,
- expected_channel,
- frame.frame_type);
- amqp_assert(frame.payload.method.id == expected_method,
- "Expected method ID 0x%08X on channel %d, got ID 0x%08X",
- expected_method,
- expected_channel,
- frame.payload.method.id);
+
+ if (frame.channel != expected_channel)
+ amqp_abort("Expected 0x%08X method frame on channel %d, got frame on channel %d",
+ expected_method,
+ expected_channel,
+ frame.channel);
+ if (frame.frame_type != AMQP_FRAME_METHOD)
+ amqp_abort("Expected 0x%08X method frame on channel %d, got frame type %d",
+ expected_method,
+ expected_channel,
+ frame.frame_type);
+ if (frame.payload.method.id != expected_method)
+ amqp_abort("Expected method ID 0x%08X on channel %d, got ID 0x%08X",
+ expected_method,
+ expected_channel,
+ frame.payload.method.id);
*output = frame.payload.method;
return 0;
}
@@ -388,19 +384,23 @@ static int amqp_login_inner(amqp_connection_state_t state,
}
{
- amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, sasl_method, vl);
amqp_connection_start_ok_t s;
- if (response_bytes.bytes == NULL) {
+ amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool,
+ sasl_method, vl);
+
+ if (response_bytes.bytes == NULL)
return -ERROR_NO_MEMORY;
- }
- s =
- (amqp_connection_start_ok_t) {
- .client_properties = {.num_entries = 0, .entries = NULL},
- .mechanism = sasl_method_name(sasl_method),
- .response = response_bytes,
- .locale = {.len = 5, .bytes = "en_US"}
- };
- AMQP_CHECK_RESULT(amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s));
+
+ s.client_properties.num_entries = 0;
+ s.client_properties.entries = NULL;
+ s.mechanism = sasl_method_name(sasl_method);
+ s.response = response_bytes;
+ s.locale.bytes = "en_US";
+ s.locale.len = 5;
+
+ res = amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s);
+ if (res < 0)
+ return res;
}
amqp_release_buffers(state);
@@ -409,7 +409,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
&method);
if (res < 0)
return res;
-
+
{
amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded;
server_channel_max = s->channel_max;
@@ -417,28 +417,28 @@ static int amqp_login_inner(amqp_connection_state_t state,
server_heartbeat = s->heartbeat;
}
- if (server_channel_max != 0 && server_channel_max < channel_max) {
+ if (server_channel_max != 0 && server_channel_max < channel_max)
channel_max = server_channel_max;
- }
- if (server_frame_max != 0 && server_frame_max < frame_max) {
+ if (server_frame_max != 0 && server_frame_max < frame_max)
frame_max = server_frame_max;
- }
- if (server_heartbeat != 0 && server_heartbeat < heartbeat) {
+ if (server_heartbeat != 0 && server_heartbeat < heartbeat)
heartbeat = server_heartbeat;
- }
- AMQP_CHECK_RESULT(amqp_tune_connection(state, channel_max, frame_max, heartbeat));
+ res = amqp_tune_connection(state, channel_max, frame_max, heartbeat);
+ if (res < 0)
+ return res;
{
- amqp_connection_tune_ok_t s =
- (amqp_connection_tune_ok_t) {
- .channel_max = channel_max,
- .frame_max = frame_max,
- .heartbeat = heartbeat
- };
- AMQP_CHECK_RESULT(amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s));
+ amqp_connection_tune_ok_t s;
+ s.frame_max = frame_max;
+ s.channel_max = channel_max;
+ s.heartbeat = heartbeat;
+
+ res = amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s);
+ if (res < 0)
+ return res;
}
amqp_release_buffers(state);
@@ -470,21 +470,20 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
}
{
- amqp_connection_open_t s =
- (amqp_connection_open_t) {
- .virtual_host = amqp_cstring_bytes(vhost),
- .capabilities = {.len = 0, .bytes = NULL},
- .insist = 1
- };
amqp_method_number_t replies[] = { AMQP_CONNECTION_OPEN_OK_METHOD, 0 };
+ amqp_connection_open_t s;
+ s.virtual_host = amqp_cstring_bytes(vhost);
+ s.capabilities.len = 0;
+ s.capabilities.bytes = NULL;
+ s.insist = 1;
+
result = amqp_simple_rpc(state,
0,
AMQP_CONNECTION_OPEN_METHOD,
(amqp_method_number_t *) &replies,
&s);
- if (result.reply_type != AMQP_RESPONSE_NORMAL) {
+ if (result.reply_type != AMQP_RESPONSE_NORMAL)
return result;
- }
}
amqp_maybe_release_buffers(state);
diff --git a/librabbitmq/amqp_table.c b/librabbitmq/amqp_table.c
index 3f5eb61..e85217f 100644
--- a/librabbitmq/amqp_table.c
+++ b/librabbitmq/amqp_table.c
@@ -55,7 +55,6 @@
#include "amqp.h"
#include "amqp_private.h"
-#include "socket.h"
#include <assert.h>
@@ -65,356 +64,360 @@
static int amqp_decode_field_value(amqp_bytes_t encoded,
amqp_pool_t *pool,
amqp_field_value_t *entry,
- int *offsetptr); /* forward */
+ size_t *offset);
static int amqp_encode_field_value(amqp_bytes_t encoded,
amqp_field_value_t *entry,
- int *offsetptr); /* forward */
+ size_t *offset);
/*---------------------------------------------------------------------------*/
static int amqp_decode_array(amqp_bytes_t encoded,
amqp_pool_t *pool,
amqp_array_t *output,
- int *offsetptr)
+ size_t *offset)
{
- int offset = *offsetptr;
- uint32_t arraysize = D_32(encoded, offset);
+ uint32_t arraysize;
int num_entries = 0;
- amqp_field_value_t *entries = malloc(INITIAL_ARRAY_SIZE * sizeof(amqp_field_value_t));
int allocated_entries = INITIAL_ARRAY_SIZE;
- int limit;
+ amqp_field_value_t *entries;
+ size_t limit;
+ int res;
- if (entries == NULL) {
- return -ERROR_NO_MEMORY;
- }
+ if (!amqp_decode_32(encoded, offset, &arraysize))
+ return -ERROR_BAD_AMQP_DATA;
- offset += 4;
- limit = offset + arraysize;
+ entries = malloc(allocated_entries * sizeof(amqp_field_value_t));
+ if (entries == NULL)
+ return -ERROR_NO_MEMORY;
- while (offset < limit) {
+ limit = *offset + arraysize;
+ while (*offset < limit) {
if (num_entries >= allocated_entries) {
void *newentries;
allocated_entries = allocated_entries * 2;
newentries = realloc(entries, allocated_entries * sizeof(amqp_field_value_t));
- if (newentries == NULL) {
- free(entries);
- return -ERROR_NO_MEMORY;
- }
+ res = -ERROR_NO_MEMORY;
+ if (newentries == NULL)
+ goto out;
+
entries = newentries;
}
- AMQP_CHECK_RESULT_CLEANUP(amqp_decode_field_value(encoded,
- pool,
- &entries[num_entries],
- &offset),
- free(entries));
+ res = amqp_decode_field_value(encoded, pool, &entries[num_entries],
+ offset);
+ if (res < 0)
+ goto out;
+
num_entries++;
}
output->num_entries = num_entries;
output->entries = amqp_pool_alloc(pool, num_entries * sizeof(amqp_field_value_t));
- if (output->entries == NULL && num_entries > 0) {
- /* NULL is legitimate if we requested a zero-length block. */
- free(entries);
- return -ERROR_NO_MEMORY;
- }
+ res = -ERROR_NO_MEMORY;
+ /* NULL is legitimate if we requested a zero-length block. */
+ if (output->entries == NULL && num_entries > 0)
+ goto out;
memcpy(output->entries, entries, num_entries * sizeof(amqp_field_value_t));
- free(entries);
+ res = 0;
- *offsetptr = offset;
- return 0;
+ out:
+ free(entries);
+ return res;
}
int amqp_decode_table(amqp_bytes_t encoded,
amqp_pool_t *pool,
amqp_table_t *output,
- int *offsetptr)
+ size_t *offset)
{
- int offset = *offsetptr;
- uint32_t tablesize = D_32(encoded, offset);
+ uint32_t tablesize;
int num_entries = 0;
- amqp_table_entry_t *entries = malloc(INITIAL_TABLE_SIZE * sizeof(amqp_table_entry_t));
+ amqp_table_entry_t *entries;
int allocated_entries = INITIAL_TABLE_SIZE;
- int limit;
+ size_t limit;
+ int res;
- if (entries == NULL) {
- return -ERROR_NO_MEMORY;
- }
+ if (!amqp_decode_32(encoded, offset, &tablesize))
+ return -ERROR_BAD_AMQP_DATA;
- offset += 4;
- limit = offset + tablesize;
+ entries = malloc(allocated_entries * sizeof(amqp_table_entry_t));
+ if (entries == NULL)
+ return -ERROR_NO_MEMORY;
- while (offset < limit) {
- size_t keylen;
- amqp_table_entry_t *entry;
+ limit = *offset + tablesize;
+ while (*offset < limit) {
+ uint8_t keylen;
- keylen = D_8(encoded, offset);
- offset++;
+ res = -ERROR_BAD_AMQP_DATA;
+ if (!amqp_decode_8(encoded, offset, &keylen))
+ goto out;
if (num_entries >= allocated_entries) {
void *newentries;
allocated_entries = allocated_entries * 2;
newentries = realloc(entries, allocated_entries * sizeof(amqp_table_entry_t));
- if (newentries == NULL) {
- free(entries);
- return -ERROR_NO_MEMORY;
- }
+ res = -ERROR_NO_MEMORY;
+ if (newentries == NULL)
+ goto out;
+
entries = newentries;
}
- entry = &entries[num_entries];
- entry->key.len = keylen;
- entry->key.bytes = D_BYTES(encoded, offset, keylen);
- offset += keylen;
+ res = -ERROR_BAD_AMQP_DATA;
+ if (!amqp_decode_bytes(encoded, offset, &entries[num_entries].key, keylen))
+ goto out;
+
+ res = amqp_decode_field_value(encoded, pool, &entries[num_entries].value,
+ offset);
+ if (res < 0)
+ goto out;
- AMQP_CHECK_RESULT_CLEANUP(amqp_decode_field_value(encoded,
- pool,
- &entry->value,
- &offset),
- free(entries));
num_entries++;
}
output->num_entries = num_entries;
output->entries = amqp_pool_alloc(pool, num_entries * sizeof(amqp_table_entry_t));
- if (output->entries == NULL && num_entries > 0) {
- /* NULL is legitimate if we requested a zero-length block. */
- free(entries);
- return -ERROR_NO_MEMORY;
- }
+ res = -ERROR_NO_MEMORY;
+ /* NULL is legitimate if we requested a zero-length block. */
+ if (output->entries == NULL && num_entries > 0)
+ goto out;
memcpy(output->entries, entries, num_entries * sizeof(amqp_table_entry_t));
- free(entries);
+ res = 0;
- *offsetptr = offset;
- return 0;
+ out:
+ free(entries);
+ return res;
}
static int amqp_decode_field_value(amqp_bytes_t encoded,
amqp_pool_t *pool,
amqp_field_value_t *entry,
- int *offsetptr)
+ size_t *offset)
{
- int offset = *offsetptr;
+ int res = -ERROR_BAD_AMQP_DATA;
- entry->kind = D_8(encoded, offset);
- offset++;
+ if (!amqp_decode_8(encoded, offset, &entry->kind))
+ goto out;
+
+#define TRIVIAL_FIELD_DECODER(bits) if (!amqp_decode_##bits(encoded, offset, &entry->value.u##bits)) goto out; break
+#define SIMPLE_FIELD_DECODER(bits, dest, how) { uint##bits##_t val; if (!amqp_decode_##bits(encoded, offset, &val)) goto out; entry->value.dest = how; } break
switch (entry->kind) {
- case AMQP_FIELD_KIND_BOOLEAN:
- entry->value.boolean = D_8(encoded, offset) ? 1 : 0;
- offset++;
- break;
- case AMQP_FIELD_KIND_I8:
- entry->value.i8 = (int8_t) D_8(encoded, offset);
- offset++;
- break;
- case AMQP_FIELD_KIND_U8:
- entry->value.u8 = D_8(encoded, offset);
- offset++;
- break;
- case AMQP_FIELD_KIND_I16:
- entry->value.i16 = (int16_t) D_16(encoded, offset);
- offset += 2;
- break;
- case AMQP_FIELD_KIND_U16:
- entry->value.u16 = D_16(encoded, offset);
- offset += 2;
- break;
- case AMQP_FIELD_KIND_I32:
- entry->value.i32 = (int32_t) D_32(encoded, offset);
- offset += 4;
- break;
- case AMQP_FIELD_KIND_U32:
- entry->value.u32 = D_32(encoded, offset);
- offset += 4;
- break;
- case AMQP_FIELD_KIND_I64:
- entry->value.i64 = (int64_t) D_64(encoded, offset);
- offset += 8;
- break;
- case AMQP_FIELD_KIND_F32:
- entry->value.u32 = D_32(encoded, offset);
- /* and by punning, f32 magically gets the right value...! */
- offset += 4;
- break;
- case AMQP_FIELD_KIND_F64:
- entry->value.u64 = D_64(encoded, offset);
- /* and by punning, f64 magically gets the right value...! */
- offset += 8;
- break;
- case AMQP_FIELD_KIND_DECIMAL:
- entry->value.decimal.decimals = D_8(encoded, offset);
- offset++;
- entry->value.decimal.value = D_32(encoded, offset);
- offset += 4;
- break;
- case AMQP_FIELD_KIND_UTF8:
- /* AMQP_FIELD_KIND_UTF8 and AMQP_FIELD_KIND_BYTES have the
- same implementation, but different interpretations. */
- /* fall through */
- case AMQP_FIELD_KIND_BYTES:
- entry->value.bytes.len = D_32(encoded, offset);
- offset += 4;
- entry->value.bytes.bytes = D_BYTES(encoded, offset, entry->value.bytes.len);
- offset += entry->value.bytes.len;
- break;
- case AMQP_FIELD_KIND_ARRAY:
- AMQP_CHECK_RESULT(amqp_decode_array(encoded, pool, &(entry->value.array), &offset));
- break;
- case AMQP_FIELD_KIND_TIMESTAMP:
- entry->value.u64 = D_64(encoded, offset);
- offset += 8;
- break;
- case AMQP_FIELD_KIND_TABLE:
- AMQP_CHECK_RESULT(amqp_decode_table(encoded, pool, &(entry->value.table), &offset));
- break;
- case AMQP_FIELD_KIND_VOID:
- break;
- default:
- return -ERROR_BAD_AMQP_DATA;
+ case AMQP_FIELD_KIND_BOOLEAN:
+ SIMPLE_FIELD_DECODER(8, boolean, val ? 1 : 0);
+
+ case AMQP_FIELD_KIND_I8:
+ SIMPLE_FIELD_DECODER(8, i8, (int8_t)val);
+ case AMQP_FIELD_KIND_U8:
+ TRIVIAL_FIELD_DECODER(8);
+
+ case AMQP_FIELD_KIND_I16:
+ SIMPLE_FIELD_DECODER(16, i16, (int16_t)val);
+ case AMQP_FIELD_KIND_U16:
+ TRIVIAL_FIELD_DECODER(16);
+
+ case AMQP_FIELD_KIND_I32:
+ SIMPLE_FIELD_DECODER(32, i32, (int32_t)val);
+ case AMQP_FIELD_KIND_U32:
+ TRIVIAL_FIELD_DECODER(32);
+
+ case AMQP_FIELD_KIND_I64:
+ SIMPLE_FIELD_DECODER(64, i64, (int64_t)val);
+ case AMQP_FIELD_KIND_U64:
+ TRIVIAL_FIELD_DECODER(64);
+
+ case AMQP_FIELD_KIND_F32:
+ TRIVIAL_FIELD_DECODER(32);
+ /* and by punning, f32 magically gets the right value...! */
+
+ case AMQP_FIELD_KIND_F64:
+ TRIVIAL_FIELD_DECODER(64);
+ /* and by punning, f64 magically gets the right value...! */
+
+ case AMQP_FIELD_KIND_DECIMAL:
+ if (!amqp_decode_8(encoded, offset, &entry->value.decimal.decimals)
+ || !amqp_decode_32(encoded, offset, &entry->value.decimal.value))
+ goto out;
+ break;
+
+ case AMQP_FIELD_KIND_UTF8:
+ /* AMQP_FIELD_KIND_UTF8 and AMQP_FIELD_KIND_BYTES have the
+ same implementation, but different interpretations. */
+ /* fall through */
+ case AMQP_FIELD_KIND_BYTES: {
+ uint32_t len;
+ if (!amqp_decode_32(encoded, offset, &len)
+ || !amqp_decode_bytes(encoded, offset, &entry->value.bytes, len))
+ goto out;
+ break;
}
- *offsetptr = offset;
- return 0;
+ case AMQP_FIELD_KIND_ARRAY:
+ res = amqp_decode_array(encoded, pool, &(entry->value.array), offset);
+ goto out;
+
+ case AMQP_FIELD_KIND_TIMESTAMP:
+ TRIVIAL_FIELD_DECODER(64);
+
+ case AMQP_FIELD_KIND_TABLE:
+ res = amqp_decode_table(encoded, pool, &(entry->value.table), offset);
+ goto out;
+
+ case AMQP_FIELD_KIND_VOID:
+ break;
+
+ default:
+ goto out;
+ }
+
+ res = 0;
+
+ out:
+ return res;
}
/*---------------------------------------------------------------------------*/
static int amqp_encode_array(amqp_bytes_t encoded,
amqp_array_t *input,
- int *offsetptr)
+ size_t *offset)
{
- int offset = *offsetptr;
- int arraysize_offset = offset;
- int i;
+ size_t start = *offset;
+ int i, res;
- offset += 4; /* skip space for the size of the array to be filled in later */
+ *offset += 4; /* size of the array gets filled in later on */
for (i = 0; i < input->num_entries; i++) {
- AMQP_CHECK_RESULT(amqp_encode_field_value(encoded, &(input->entries[i]), &offset));
+ res = amqp_encode_field_value(encoded, &input->entries[i], offset);
+ if (res < 0)
+ goto out;
}
- E_32(encoded, arraysize_offset, (offset - *offsetptr - 4));
- *offsetptr = offset;
- return 0;
+ if (amqp_encode_32(encoded, &start, *offset - start - 4))
+ res = 0;
+ else
+ res = -ERROR_BAD_AMQP_DATA;
+
+ out:
+ return res;
}
int amqp_encode_table(amqp_bytes_t encoded,
amqp_table_t *input,
- int *offsetptr)
+ size_t *offset)
{
- int offset = *offsetptr;
- int tablesize_offset = offset;
- int i;
+ size_t start = *offset;
+ int i, res;
- offset += 4; /* skip space for the size of the table to be filled in later */
+ *offset += 4; /* size of the table gets filled in later on */
for (i = 0; i < input->num_entries; i++) {
- amqp_table_entry_t *entry = &(input->entries[i]);
-
- E_8(encoded, offset, entry->key.len);
- offset++;
+ res = amqp_encode_8(encoded, offset, input->entries[i].key.len);
+ if (res < 0)
+ goto out;
- E_BYTES(encoded, offset, entry->key.len, entry->key.bytes);
- offset += entry->key.len;
+ res = amqp_encode_bytes(encoded, offset, input->entries[i].key);
+ if (res < 0)
+ goto out;
- AMQP_CHECK_RESULT(amqp_encode_field_value(encoded, &(entry->value), &offset));
+ res = amqp_encode_field_value(encoded, &input->entries[i].value, offset);
+ if (res < 0)
+ goto out;
}
- E_32(encoded, tablesize_offset, (offset - *offsetptr - 4));
- *offsetptr = offset;
- return 0;
+ if (amqp_encode_32(encoded, &start, *offset - start - 4))
+ res = 0;
+ else
+ res = -ERROR_BAD_AMQP_DATA;
+
+ out:
+ return res;
}
static int amqp_encode_field_value(amqp_bytes_t encoded,
amqp_field_value_t *entry,
- int *offsetptr)
+ size_t *offset)
{
- int offset = *offsetptr;
+ int res = -ERROR_BAD_AMQP_DATA;
+
+ if (!amqp_encode_8(encoded, offset, entry->kind))
+ goto out;
- E_8(encoded, offset, entry->kind);
- offset++;
+#define FIELD_ENCODER(bits, val) if (!amqp_encode_##bits(encoded, offset, val)) goto out; break
switch (entry->kind) {
- case AMQP_FIELD_KIND_BOOLEAN:
- E_8(encoded, offset, entry->value.boolean ? 1 : 0);
- offset++;
- break;
- case AMQP_FIELD_KIND_I8:
- E_8(encoded, offset, (uint8_t) entry->value.i8);
- offset++;
- break;
- case AMQP_FIELD_KIND_U8:
- E_8(encoded, offset, entry->value.u8);
- offset++;
- break;
- case AMQP_FIELD_KIND_I16:
- E_16(encoded, offset, (uint16_t) entry->value.i16);
- offset += 2;
- break;
- case AMQP_FIELD_KIND_U16:
- E_16(encoded, offset, entry->value.u16);
- offset += 2;
- break;
- case AMQP_FIELD_KIND_I32:
- E_32(encoded, offset, (uint32_t) entry->value.i32);
- offset += 4;
- break;
- case AMQP_FIELD_KIND_U32:
- E_32(encoded, offset, entry->value.u32);
- offset += 4;
- break;
- case AMQP_FIELD_KIND_I64:
- E_64(encoded, offset, (uint64_t) entry->value.i64);
- offset += 8;
- break;
- case AMQP_FIELD_KIND_F32:
- /* by punning, u32 magically gets the right value...! */
- E_32(encoded, offset, entry->value.u32);
- offset += 4;
- break;
- case AMQP_FIELD_KIND_F64:
- /* by punning, u64 magically gets the right value...! */
- E_64(encoded, offset, entry->value.u64);
- offset += 8;
- break;
- case AMQP_FIELD_KIND_DECIMAL:
- E_8(encoded, offset, entry->value.decimal.decimals);
- offset++;
- E_32(encoded, offset, entry->value.decimal.value);
- offset += 4;
- break;
- case AMQP_FIELD_KIND_UTF8:
- /* AMQP_FIELD_KIND_UTF8 and AMQP_FIELD_KIND_BYTES have the
- same implementation, but different interpretations. */
- /* fall through */
- case AMQP_FIELD_KIND_BYTES:
- E_32(encoded, offset, entry->value.bytes.len);
- offset += 4;
- E_BYTES(encoded, offset, entry->value.bytes.len, entry->value.bytes.bytes);
- offset += entry->value.bytes.len;
- break;
- case AMQP_FIELD_KIND_ARRAY:
- AMQP_CHECK_RESULT(amqp_encode_array(encoded, &(entry->value.array), &offset));
- break;
- case AMQP_FIELD_KIND_TIMESTAMP:
- E_64(encoded, offset, entry->value.u64);
- offset += 8;
- break;
- case AMQP_FIELD_KIND_TABLE:
- AMQP_CHECK_RESULT(amqp_encode_table(encoded, &(entry->value.table), &offset));
- break;
- case AMQP_FIELD_KIND_VOID:
- break;
- default:
- abort();
+ case AMQP_FIELD_KIND_BOOLEAN:
+ FIELD_ENCODER(8, entry->value.boolean ? 1 : 0);
+
+ case AMQP_FIELD_KIND_I8:
+ FIELD_ENCODER(8, entry->value.i8);
+ case AMQP_FIELD_KIND_U8:
+ FIELD_ENCODER(8, entry->value.u8);
+
+ case AMQP_FIELD_KIND_I16:
+ FIELD_ENCODER(16, entry->value.i16);
+ case AMQP_FIELD_KIND_U16:
+ FIELD_ENCODER(16, entry->value.u16);
+
+ case AMQP_FIELD_KIND_I32:
+ FIELD_ENCODER(32, entry->value.i32);
+ case AMQP_FIELD_KIND_U32:
+ FIELD_ENCODER(32, entry->value.u32);
+
+ case AMQP_FIELD_KIND_I64:
+ FIELD_ENCODER(64, entry->value.i64);
+ case AMQP_FIELD_KIND_U64:
+ FIELD_ENCODER(64, entry->value.u64);
+
+ case AMQP_FIELD_KIND_F32:
+ /* by punning, u32 magically gets the right value...! */
+ FIELD_ENCODER(32, entry->value.u32);
+
+ case AMQP_FIELD_KIND_F64:
+ /* by punning, u64 magically gets the right value...! */
+ FIELD_ENCODER(64, entry->value.u64);
+
+ case AMQP_FIELD_KIND_DECIMAL:
+ if (!amqp_encode_8(encoded, offset, entry->value.decimal.decimals)
+ || !amqp_encode_32(encoded, offset, entry->value.decimal.value))
+ goto out;
+ break;
+
+ case AMQP_FIELD_KIND_UTF8:
+ /* AMQP_FIELD_KIND_UTF8 and AMQP_FIELD_KIND_BYTES have the
+ same implementation, but different interpretations. */
+ /* fall through */
+ case AMQP_FIELD_KIND_BYTES:
+ if (!amqp_encode_32(encoded, offset, entry->value.bytes.len)
+ || !amqp_encode_bytes(encoded, offset, entry->value.bytes))
+ goto out;
+ break;
+
+ case AMQP_FIELD_KIND_ARRAY:
+ res = amqp_encode_array(encoded, &entry->value.array, offset);
+ goto out;
+
+ case AMQP_FIELD_KIND_TIMESTAMP:
+ FIELD_ENCODER(64, entry->value.u64);
+
+ case AMQP_FIELD_KIND_TABLE:
+ res = amqp_encode_table(encoded, &entry->value.table, offset);
+ goto out;
+
+ case AMQP_FIELD_KIND_VOID:
+ break;
+
+ default:
+ abort();
}
- *offsetptr = offset;
- return 0;
+ res = 0;
+
+ out:
+ return res;
}
/*---------------------------------------------------------------------------*/
diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py
index f911966..6fd149e 100644
--- a/librabbitmq/codegen.py
+++ b/librabbitmq/codegen.py
@@ -52,18 +52,162 @@ from amqp_codegen import *
import string
import re
-cTypeMap = {
- 'octet': 'uint8_t',
- 'shortstr': 'amqp_bytes_t',
- 'longstr': 'amqp_bytes_t',
- 'short': 'uint16_t',
- 'long': 'uint32_t',
- 'longlong': 'uint64_t',
- 'bit': 'amqp_boolean_t',
- 'table': 'amqp_table_t',
- 'timestamp': 'uint64_t',
+
+class Emitter(object):
+ """An object the trivially emits generated code lines.
+
+ This largely exists to be wrapped by more sophisticated emitter
+ classes.
+ """
+
+ def __init__(self, prefix):
+ self.prefix = prefix
+
+ def emit(self, line):
+ """Emit a line of generated code."""
+ print self.prefix + line
+
+
+class BitDecoder(object):
+ """An emitter object that keeps track of the state involved in
+ decoding the AMQP bit type."""
+
+ def __init__(self, emitter):
+ self.emitter = emitter
+ self.bit = 0
+
+ def emit(self, line):
+ self.bit = 0
+ self.emitter.emit(line)
+
+ def decode_bit(self, lvalue):
+ """Generate code to decode a value of the AMQP bit type into
+ the given lvalue."""
+ if self.bit == 0:
+ self.emitter.emit("if (!amqp_decode_8(encoded, &offset, &bit_buffer)) return -ERROR_BAD_AMQP_DATA;")
+
+ self.emitter.emit("%s = (bit_buffer & (1 << %d)) ? 1 : 0;"
+ % (lvalue, self.bit))
+ self.bit += 1
+ if self.bit == 8:
+ self.bit = 0
+
+
+class BitEncoder(object):
+ """An emitter object that keeps track of the state involved in
+ encoding the AMQP bit type."""
+
+ def __init__(self, emitter):
+ self.emitter = emitter
+ self.bit = 0
+
+ def flush(self):
+ """Flush the state associated with AMQP bit types."""
+ if self.bit:
+ self.emitter.emit("if (!amqp_encode_8(encoded, &offset, bit_buffer)) return -ERROR_BAD_AMQP_DATA;")
+ self.bit = 0
+
+ def emit(self, line):
+ self.flush()
+ self.emitter.emit(line)
+
+ def encode_bit(self, value):
+ """Generate code to ebcode a value of the AMQP bit type from
+ the given value."""
+ if self.bit == 0:
+ self.emitter.emit("bit_buffer = 0;")
+
+ self.emitter.emit("if (%s) bit_buffer |= (1 << %d);"
+ % (value, self.bit))
+ self.bit += 1
+ if self.bit == 8:
+ self.flush()
+
+
+class SimpleType(object):
+ """A AMQP type that corresponds to a simple scalar C value of a
+ certain width."""
+
+ def __init__(self, bits):
+ self.bits = bits
+ self.ctype = "uint%d_t" % (bits,)
+
+ def decode(self, emitter, lvalue):
+ emitter.emit("if (!amqp_decode_%d(encoded, &offset, &%s)) return -ERROR_BAD_AMQP_DATA;" % (self.bits, lvalue))
+
+ def encode(self, emitter, value):
+ emitter.emit("if (!amqp_encode_%d(encoded, &offset, %s)) return -ERROR_BAD_AMQP_DATA;" % (self.bits, value))
+
+
+class StrType(object):
+ """The AMQP shortstr or longstr types."""
+
+ def __init__(self, lenbits):
+ self.lenbits = lenbits
+ self.ctype = "amqp_bytes_t"
+
+ def decode(self, emitter, lvalue):
+ emitter.emit("{")
+ emitter.emit(" uint%d_t len;" % (self.lenbits,))
+ emitter.emit(" if (!amqp_decode_%d(encoded, &offset, &len)" % (self.lenbits,))
+ emitter.emit(" || !amqp_decode_bytes(encoded, &offset, &%s, len))" % (lvalue,))
+ emitter.emit(" return -ERROR_BAD_AMQP_DATA;")
+ emitter.emit("}")
+
+ def encode(self, emitter, value):
+ emitter.emit("if (!amqp_encode_%d(encoded, &offset, %s.len)" % (self.lenbits, value))
+ emitter.emit(" || !amqp_encode_bytes(encoded, &offset, %s))" % (value,))
+ emitter.emit(" return -ERROR_BAD_AMQP_DATA;")
+
+
+class BitType(object):
+ """The AMQP bit type."""
+
+ def __init__(self):
+ self.ctype = "amqp_boolean_t"
+
+ def decode(self, emitter, lvalue):
+ emitter.decode_bit(lvalue)
+
+ def encode(self, emitter, value):
+ emitter.encode_bit(value)
+
+
+class TableType(object):
+ """The AMQP table type."""
+
+ def __init__(self):
+ self.ctype = "amqp_table_t"
+
+ def decode(self, emitter, lvalue):
+ emitter.emit("{")
+ emitter.emit(" int res = amqp_decode_table(encoded, pool, &(%s), &offset);" % (lvalue,))
+ emitter.emit(" if (res < 0) return res;")
+ emitter.emit("}")
+
+ def encode(self, emitter, value):
+ emitter.emit("{")
+ emitter.emit(" int res = amqp_encode_table(encoded, &(%s), &offset);" % (value,))
+ emitter.emit(" if (res < 0) return res;")
+ emitter.emit("}")
+
+
+types = {
+ 'octet': SimpleType(8),
+ 'short': SimpleType(16),
+ 'long': SimpleType(32),
+ 'longlong': SimpleType(64),
+ 'shortstr': StrType(8),
+ 'longstr': StrType(32),
+ 'bit': BitType(),
+ 'table': TableType(),
+ 'timestamp': SimpleType(64),
}
+def typeFor(spec, f):
+ """Get a representation of the AMQP type of a field."""
+ return types[spec.resolveDomain(f.domain)]
+
def c_ize(s):
s = s.replace('-', '_')
s = s.replace(' ', '_')
@@ -81,9 +225,6 @@ def cFlagName(c, f):
return cConstantName(c.name + '_' + f.name) + '_FLAG'
def genErl(spec):
- def cType(domain):
- return cTypeMap[spec.resolveDomain(domain)]
-
def fieldTempList(fields):
return '[' + ', '.join(['F' + str(f.index) for f in fields]) + ']'
@@ -93,78 +234,6 @@ def genErl(spec):
def genLookupMethodName(m):
print ' case %s: return "%s";' % (m.defName(), m.defName())
- def genSingleDecode(prefix, cLvalue, unresolved_domain):
- type = spec.resolveDomain(unresolved_domain)
- if type == 'shortstr':
- print prefix + "%s.len = D_8(encoded, offset);" % (cLvalue,)
- print prefix + "offset++;"
- print prefix + "%s.bytes = D_BYTES(encoded, offset, %s.len);" % (cLvalue, cLvalue)
- print prefix + "offset += %s.len;" % (cLvalue,)
- elif type == 'longstr':
- print prefix + "%s.len = D_32(encoded, offset);" % (cLvalue,)
- print prefix + "offset += 4;"
- print prefix + "%s.bytes = D_BYTES(encoded, offset, %s.len);" % (cLvalue, cLvalue)
- print prefix + "offset += %s.len;" % (cLvalue,)
- elif type == 'octet':
- print prefix + "%s = D_8(encoded, offset);" % (cLvalue,)
- print prefix + "offset++;"
- elif type == 'short':
- print prefix + "%s = D_16(encoded, offset);" % (cLvalue,)
- print prefix + "offset += 2;"
- elif type == 'long':
- print prefix + "%s = D_32(encoded, offset);" % (cLvalue,)
- print prefix + "offset += 4;"
- elif type == 'longlong':
- print prefix + "%s = D_64(encoded, offset);" % (cLvalue,)
- print prefix + "offset += 8;"
- elif type == 'timestamp':
- print prefix + "%s = D_64(encoded, offset);" % (cLvalue,)
- print prefix + "offset += 8;"
- elif type == 'bit':
- raise "Can't decode bit in genSingleDecode"
- elif type == 'table':
- print prefix + "table_result = amqp_decode_table(encoded, pool, &(%s), &offset);" % \
- (cLvalue,)
- print prefix + "AMQP_CHECK_RESULT(table_result);"
- else:
- raise "Illegal domain in genSingleDecode", type
-
- def genSingleEncode(prefix, cValue, unresolved_domain):
- type = spec.resolveDomain(unresolved_domain)
- if type == 'shortstr':
- print prefix + "E_8(encoded, offset, %s.len);" % (cValue,)
- print prefix + "offset++;"
- print prefix + "E_BYTES(encoded, offset, %s.len, %s.bytes);" % (cValue, cValue)
- print prefix + "offset += %s.len;" % (cValue,)
- elif type == 'longstr':
- print prefix + "E_32(encoded, offset, %s.len);" % (cValue,)
- print prefix + "offset += 4;"
- print prefix + "E_BYTES(encoded, offset, %s.len, %s.bytes);" % (cValue, cValue)
- print prefix + "offset += %s.len;" % (cValue,)
- elif type == 'octet':
- print prefix + "E_8(encoded, offset, %s);" % (cValue,)
- print prefix + "offset++;"
- elif type == 'short':
- print prefix + "E_16(encoded, offset, %s);" % (cValue,)
- print prefix + "offset += 2;"
- elif type == 'long':
- print prefix + "E_32(encoded, offset, %s);" % (cValue,)
- print prefix + "offset += 4;"
- elif type == 'longlong':
- print prefix + "E_64(encoded, offset, %s);" % (cValue,)
- print prefix + "offset += 8;"
- elif type == 'timestamp':
- print prefix + "E_64(encoded, offset, %s);" % (cValue,)
- print prefix + "offset += 8;"
- elif type == 'bit':
- raise "Can't encode bit in genSingleDecode"
- elif type == 'table':
- print prefix + "table_result = amqp_encode_table(encoded, &(%s), &offset);" % \
- (cValue,)
- print prefix + "if (table_result < 0) return table_result;"
- else:
- raise "Illegal domain in genSingleEncode", type
-
def genDecodeMethodFields(m):
print " case %s: {" % (m.defName(),)
if m.arguments:
@@ -173,22 +242,11 @@ def genErl(spec):
print " if (m == NULL) { return -ERROR_NO_MEMORY; }"
else:
print " %s *m = NULL; /* no fields */" % (m.structName(),)
- bitindex = None
+
+ emitter = BitDecoder(Emitter(" "))
for f in m.arguments:
- if spec.resolveDomain(f.domain) == 'bit':
- if bitindex is None:
- bitindex = 0
- if bitindex >= 8:
- bitindex = 0
- if bitindex == 0:
- print " bit_buffer = D_8(encoded, offset);"
- print " offset++;"
- print " m->%s = (bit_buffer & (1 << %d)) ? 1 : 0;" % \
- (c_ize(f.name), bitindex)
- bitindex = bitindex + 1
- else:
- bitindex = None
- genSingleDecode(" ", "m->%s" % (c_ize(f.name),), f.domain)
+ typeFor(spec, f).decode(emitter, "m->"+c_ize(f.name))
+
print " *decoded = m;"
print " return 0;"
print " }"
@@ -199,13 +257,13 @@ def genErl(spec):
(c.structName(), c.structName(), c.structName())
print " if (p == NULL) { return -ERROR_NO_MEMORY; }"
print " p->_flags = flags;"
+
+ emitter = Emitter(" ")
for f in c.fields:
- if spec.resolveDomain(f.domain) == 'bit':
- pass
- else:
- print " if (flags & %s) {" % (cFlagName(c, f),)
- genSingleDecode(" ", "p->%s" % (c_ize(f.name),), f.domain)
- print " }"
+ emitter.emit("if (flags & %s) {" % (cFlagName(c, f),))
+ typeFor(spec, f).decode(emitter, "p->"+c_ize(f.name))
+ emitter.emit("}")
+
print " *decoded = p;"
print " return 0;"
print " }"
@@ -214,28 +272,12 @@ def genErl(spec):
print " case %s: {" % (m.defName(),)
if m.arguments:
print " %s *m = (%s *) decoded;" % (m.structName(), m.structName())
- bitindex = None
- def finishBits():
- if bitindex is not None:
- print " E_8(encoded, offset, bit_buffer);"
- print " offset++;"
+
+ emitter = BitEncoder(Emitter(" "))
for f in m.arguments:
- if spec.resolveDomain(f.domain) == 'bit':
- if bitindex is None:
- bitindex = 0
- print " bit_buffer = 0;"
- if bitindex >= 8:
- finishBits()
- print " bit_buffer = 0;"
- bitindex = 0
- print " if (m->%s) { bit_buffer |= (1 << %d); }" % \
- (c_ize(f.name), bitindex)
- bitindex = bitindex + 1
- else:
- finishBits()
- bitindex = None
- genSingleEncode(" ", "m->%s" % (c_ize(f.name),), f.domain)
- finishBits()
+ typeFor(spec, f).encode(emitter, "m->"+c_ize(f.name))
+ emitter.flush()
+
print " return offset;"
print " }"
@@ -243,13 +285,13 @@ def genErl(spec):
print " case %d: {" % (c.index,)
if c.fields:
print " %s *p = (%s *) decoded;" % (c.structName(), c.structName())
+
+ emitter = Emitter(" ")
for f in c.fields:
- if spec.resolveDomain(f.domain) == 'bit':
- pass
- else:
- print " if (flags & %s) {" % (cFlagName(c, f),)
- genSingleEncode(" ", "p->%s" % (c_ize(f.name),), f.domain)
- print " }"
+ emitter.emit(" if (flags & %s) {" % (cFlagName(c, f),))
+ typeFor(spec, f).encode(emitter, "p->"+c_ize(f.name))
+ emitter.emit("}")
+
print " return offset;"
print " }"
@@ -310,8 +352,7 @@ int amqp_decode_method(amqp_method_number_t methodNumber,
amqp_bytes_t encoded,
void **decoded)
{
- int offset = 0;
- int table_result;
+ size_t offset = 0;
uint8_t bit_buffer;
switch (methodNumber) {"""
@@ -326,16 +367,15 @@ int amqp_decode_properties(uint16_t class_id,
amqp_bytes_t encoded,
void **decoded)
{
- int offset = 0;
- int table_result;
+ size_t offset = 0;
amqp_flags_t flags = 0;
int flagword_index = 0;
- amqp_flags_t partial_flags;
+ uint16_t partial_flags;
do {
- partial_flags = D_16(encoded, offset);
- offset += 2;
+ if (!amqp_decode_16(encoded, &offset, &partial_flags))
+ return -ERROR_BAD_AMQP_DATA;
flags |= (partial_flags << (flagword_index * 16));
flagword_index++;
} while (partial_flags & 1);
@@ -351,8 +391,7 @@ int amqp_encode_method(amqp_method_number_t methodNumber,
void *decoded,
amqp_bytes_t encoded)
{
- int offset = 0;
- int table_result;
+ size_t offset = 0;
uint8_t bit_buffer;
switch (methodNumber) {"""
@@ -366,8 +405,7 @@ int amqp_encode_properties(uint16_t class_id,
void *decoded,
amqp_bytes_t encoded)
{
- int offset = 0;
- int table_result;
+ size_t offset = 0;
/* Cheat, and get the flags out generically, relying on the
similarity of structure between classes */
@@ -381,8 +419,8 @@ int amqp_encode_properties(uint16_t class_id,
amqp_flags_t remainder = remaining_flags >> 16;
uint16_t partial_flags = remaining_flags & 0xFFFE;
if (remainder != 0) { partial_flags |= 1; }
- E_16(encoded, offset, partial_flags);
- offset += 2;
+ if (!amqp_encode_16(encoded, &offset, partial_flags))
+ return -ERROR_BAD_AMQP_DATA;
remaining_flags = remainder;
} while (remaining_flags != 0);
}
@@ -394,14 +432,16 @@ int amqp_encode_properties(uint16_t class_id,
}"""
def genHrl(spec):
- def cType(domain):
- return cTypeMap[spec.resolveDomain(domain)]
-
def fieldDeclList(fields):
- return ''.join([" %s %s;\n" % (cType(f.domain), c_ize(f.name)) for f in fields])
-
+ if fields:
+ return ''.join([" %s %s;\n" % (typeFor(spec, f).ctype,
+ c_ize(f.name))
+ for f in fields])
+ else:
+ return " char dummy; /* Dummy field to avoid empty struct */\n"
+
def propDeclList(fields):
- return ''.join([" %s %s;\n" % (cType(f.domain), c_ize(f.name))
+ return ''.join([" %s %s;\n" % (typeFor(spec, f).ctype, c_ize(f.name))
for f in fields
if spec.resolveDomain(f.domain) != 'bit'])
@@ -427,7 +467,7 @@ extern "C" {
print """/* Function prototypes. */
extern char const *amqp_constant_name(int constantNumber);
extern amqp_boolean_t amqp_constant_is_hard_error(int constantNumber);
-extern char const *amqp_method_name(amqp_method_number_t methodNumber);
+RABBITMQ_EXPORT char const *amqp_method_name(amqp_method_number_t methodNumber);
extern amqp_boolean_t amqp_method_has_content(amqp_method_number_t methodNumber);
extern int amqp_decode_method(amqp_method_number_t methodNumber,
amqp_pool_t *pool,
diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c
index 9d37dfc..4f5368e 100644
--- a/librabbitmq/unix/socket.c
+++ b/librabbitmq/unix/socket.c
@@ -53,6 +53,7 @@
#include <fcntl.h>
#include <stdint.h>
#include <string.h>
+#include <stdlib.h>
#include "amqp.h"
#include "amqp_private.h"
@@ -77,7 +78,13 @@ int amqp_socket_socket(int domain, int type, int proto)
}
return s;
-}
+}
+
+/* strdup is not in ISO C90! */
+static inline char *strdup(const char *str)
+{
+ return strcpy(malloc(strlen(str) + 1),str);
+}
char *amqp_os_error_string(int err)
{
diff --git a/librabbitmq/windows/socket.c b/librabbitmq/windows/socket.c
index 9c026bd..bef7b95 100644
--- a/librabbitmq/windows/socket.c
+++ b/librabbitmq/windows/socket.c
@@ -48,8 +48,12 @@
* ***** END LICENSE BLOCK *****
*/
+/* See http://msdn.microsoft.com/en-us/library/ms737629%28VS.85%29.aspx */
+#define WIN32_LEAN_AND_MEAN
+
#include <windows.h>
#include <stdint.h>
+#include <stdlib.h>
#include "amqp.h"
#include "amqp_private.h"
@@ -64,13 +68,19 @@ int amqp_socket_init(void)
int res = WSAStartup(0x0202, &data);
if (res)
return -res;
-
+
called_wsastartup = 1;
}
return 0;
}
+/* strdup is not in ISO C90! */
+static inline char *strdup(const char *str)
+{
+ return strcpy(malloc(strlen(str) + 1),str);
+}
+
char *amqp_os_error_string(int err)
{
char *msg, *copy;
diff --git a/librabbitmq/windows/socket.h b/librabbitmq/windows/socket.h
index 3e0a378..38ca905 100644
--- a/librabbitmq/windows/socket.h
+++ b/librabbitmq/windows/socket.h
@@ -69,7 +69,7 @@ static inline int amqp_socket_setsockopt(int sock, int level, int optname,
/* same as WSABUF */
struct iovec {
u_long iov_len;
- char *iov_base;
+ void *iov_base;
};
static inline int amqp_socket_writev(int sock, struct iovec *iov, int nvecs)
diff --git a/msinttypes/inttypes.h b/msinttypes/inttypes.h
new file mode 100644
index 0000000..4b3828a
--- /dev/null
+++ b/msinttypes/inttypes.h
@@ -0,0 +1,305 @@
+// ISO C9x compliant inttypes.h for Microsoft Visual Studio
+// Based on ISO/IEC 9899:TC2 Committee draft (May 6, 2005) WG14/N1124
+//
+// Copyright (c) 2006 Alexander Chemeris
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+//
+// 3. The name of the author may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
+// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+// EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+// OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+///////////////////////////////////////////////////////////////////////////////
+
+#ifndef _MSC_VER // [
+#error "Use this header only with Microsoft Visual C++ compilers!"
+#endif // _MSC_VER ]
+
+#ifndef _MSC_INTTYPES_H_ // [
+#define _MSC_INTTYPES_H_
+
+#if _MSC_VER > 1000
+#pragma once
+#endif
+
+#include "stdint.h"
+
+// 7.8 Format conversion of integer types
+
+typedef struct {
+ intmax_t quot;
+ intmax_t rem;
+} imaxdiv_t;
+
+// 7.8.1 Macros for format specifiers
+
+#if !defined(__cplusplus) || defined(__STDC_FORMAT_MACROS) // [ See footnote 185 at page 198
+
+// The fprintf macros for signed integers are:
+#define PRId8 "d"
+#define PRIi8 "i"
+#define PRIdLEAST8 "d"
+#define PRIiLEAST8 "i"
+#define PRIdFAST8 "d"
+#define PRIiFAST8 "i"
+
+#define PRId16 "hd"
+#define PRIi16 "hi"
+#define PRIdLEAST16 "hd"
+#define PRIiLEAST16 "hi"
+#define PRIdFAST16 "hd"
+#define PRIiFAST16 "hi"
+
+#define PRId32 "I32d"
+#define PRIi32 "I32i"
+#define PRIdLEAST32 "I32d"
+#define PRIiLEAST32 "I32i"
+#define PRIdFAST32 "I32d"
+#define PRIiFAST32 "I32i"
+
+#define PRId64 "I64d"
+#define PRIi64 "I64i"
+#define PRIdLEAST64 "I64d"
+#define PRIiLEAST64 "I64i"
+#define PRIdFAST64 "I64d"
+#define PRIiFAST64 "I64i"
+
+#define PRIdMAX "I64d"
+#define PRIiMAX "I64i"
+
+#define PRIdPTR "Id"
+#define PRIiPTR "Ii"
+
+// The fprintf macros for unsigned integers are:
+#define PRIo8 "o"
+#define PRIu8 "u"
+#define PRIx8 "x"
+#define PRIX8 "X"
+#define PRIoLEAST8 "o"
+#define PRIuLEAST8 "u"
+#define PRIxLEAST8 "x"
+#define PRIXLEAST8 "X"
+#define PRIoFAST8 "o"
+#define PRIuFAST8 "u"
+#define PRIxFAST8 "x"
+#define PRIXFAST8 "X"
+
+#define PRIo16 "ho"
+#define PRIu16 "hu"
+#define PRIx16 "hx"
+#define PRIX16 "hX"
+#define PRIoLEAST16 "ho"
+#define PRIuLEAST16 "hu"
+#define PRIxLEAST16 "hx"
+#define PRIXLEAST16 "hX"
+#define PRIoFAST16 "ho"
+#define PRIuFAST16 "hu"
+#define PRIxFAST16 "hx"
+#define PRIXFAST16 "hX"
+
+#define PRIo32 "I32o"
+#define PRIu32 "I32u"
+#define PRIx32 "I32x"
+#define PRIX32 "I32X"
+#define PRIoLEAST32 "I32o"
+#define PRIuLEAST32 "I32u"
+#define PRIxLEAST32 "I32x"
+#define PRIXLEAST32 "I32X"
+#define PRIoFAST32 "I32o"
+#define PRIuFAST32 "I32u"
+#define PRIxFAST32 "I32x"
+#define PRIXFAST32 "I32X"
+
+#define PRIo64 "I64o"
+#define PRIu64 "I64u"
+#define PRIx64 "I64x"
+#define PRIX64 "I64X"
+#define PRIoLEAST64 "I64o"
+#define PRIuLEAST64 "I64u"
+#define PRIxLEAST64 "I64x"
+#define PRIXLEAST64 "I64X"
+#define PRIoFAST64 "I64o"
+#define PRIuFAST64 "I64u"
+#define PRIxFAST64 "I64x"
+#define PRIXFAST64 "I64X"
+
+#define PRIoMAX "I64o"
+#define PRIuMAX "I64u"
+#define PRIxMAX "I64x"
+#define PRIXMAX "I64X"
+
+#define PRIoPTR "Io"
+#define PRIuPTR "Iu"
+#define PRIxPTR "Ix"
+#define PRIXPTR "IX"
+
+// The fscanf macros for signed integers are:
+#define SCNd8 "d"
+#define SCNi8 "i"
+#define SCNdLEAST8 "d"
+#define SCNiLEAST8 "i"
+#define SCNdFAST8 "d"
+#define SCNiFAST8 "i"
+
+#define SCNd16 "hd"
+#define SCNi16 "hi"
+#define SCNdLEAST16 "hd"
+#define SCNiLEAST16 "hi"
+#define SCNdFAST16 "hd"
+#define SCNiFAST16 "hi"
+
+#define SCNd32 "ld"
+#define SCNi32 "li"
+#define SCNdLEAST32 "ld"
+#define SCNiLEAST32 "li"
+#define SCNdFAST32 "ld"
+#define SCNiFAST32 "li"
+
+#define SCNd64 "I64d"
+#define SCNi64 "I64i"
+#define SCNdLEAST64 "I64d"
+#define SCNiLEAST64 "I64i"
+#define SCNdFAST64 "I64d"
+#define SCNiFAST64 "I64i"
+
+#define SCNdMAX "I64d"
+#define SCNiMAX "I64i"
+
+#ifdef _WIN64 // [
+# define SCNdPTR "I64d"
+# define SCNiPTR "I64i"
+#else // _WIN64 ][
+# define SCNdPTR "ld"
+# define SCNiPTR "li"
+#endif // _WIN64 ]
+
+// The fscanf macros for unsigned integers are:
+#define SCNo8 "o"
+#define SCNu8 "u"
+#define SCNx8 "x"
+#define SCNX8 "X"
+#define SCNoLEAST8 "o"
+#define SCNuLEAST8 "u"
+#define SCNxLEAST8 "x"
+#define SCNXLEAST8 "X"
+#define SCNoFAST8 "o"
+#define SCNuFAST8 "u"
+#define SCNxFAST8 "x"
+#define SCNXFAST8 "X"
+
+#define SCNo16 "ho"
+#define SCNu16 "hu"
+#define SCNx16 "hx"
+#define SCNX16 "hX"
+#define SCNoLEAST16 "ho"
+#define SCNuLEAST16 "hu"
+#define SCNxLEAST16 "hx"
+#define SCNXLEAST16 "hX"
+#define SCNoFAST16 "ho"
+#define SCNuFAST16 "hu"
+#define SCNxFAST16 "hx"
+#define SCNXFAST16 "hX"
+
+#define SCNo32 "lo"
+#define SCNu32 "lu"
+#define SCNx32 "lx"
+#define SCNX32 "lX"
+#define SCNoLEAST32 "lo"
+#define SCNuLEAST32 "lu"
+#define SCNxLEAST32 "lx"
+#define SCNXLEAST32 "lX"
+#define SCNoFAST32 "lo"
+#define SCNuFAST32 "lu"
+#define SCNxFAST32 "lx"
+#define SCNXFAST32 "lX"
+
+#define SCNo64 "I64o"
+#define SCNu64 "I64u"
+#define SCNx64 "I64x"
+#define SCNX64 "I64X"
+#define SCNoLEAST64 "I64o"
+#define SCNuLEAST64 "I64u"
+#define SCNxLEAST64 "I64x"
+#define SCNXLEAST64 "I64X"
+#define SCNoFAST64 "I64o"
+#define SCNuFAST64 "I64u"
+#define SCNxFAST64 "I64x"
+#define SCNXFAST64 "I64X"
+
+#define SCNoMAX "I64o"
+#define SCNuMAX "I64u"
+#define SCNxMAX "I64x"
+#define SCNXMAX "I64X"
+
+#ifdef _WIN64 // [
+# define SCNoPTR "I64o"
+# define SCNuPTR "I64u"
+# define SCNxPTR "I64x"
+# define SCNXPTR "I64X"
+#else // _WIN64 ][
+# define SCNoPTR "lo"
+# define SCNuPTR "lu"
+# define SCNxPTR "lx"
+# define SCNXPTR "lX"
+#endif // _WIN64 ]
+
+#endif // __STDC_FORMAT_MACROS ]
+
+// 7.8.2 Functions for greatest-width integer types
+
+// 7.8.2.1 The imaxabs function
+#define imaxabs _abs64
+
+// 7.8.2.2 The imaxdiv function
+
+// This is modified version of div() function from Microsoft's div.c found
+// in %MSVC.NET%\crt\src\div.c
+#ifdef STATIC_IMAXDIV // [
+static
+#else // STATIC_IMAXDIV ][
+_inline
+#endif // STATIC_IMAXDIV ]
+imaxdiv_t __cdecl imaxdiv(intmax_t numer, intmax_t denom)
+{
+ imaxdiv_t result;
+
+ result.quot = numer / denom;
+ result.rem = numer % denom;
+
+ if (numer < 0 && result.rem > 0) {
+ // did division wrong; must fix up
+ ++result.quot;
+ result.rem -= denom;
+ }
+
+ return result;
+}
+
+// 7.8.2.3 The strtoimax and strtoumax functions
+#define strtoimax _strtoi64
+#define strtoumax _strtoui64
+
+// 7.8.2.4 The wcstoimax and wcstoumax functions
+#define wcstoimax _wcstoi64
+#define wcstoumax _wcstoui64
+
+
+#endif // _MSC_INTTYPES_H_ ]
diff --git a/msinttypes/stdint.h b/msinttypes/stdint.h
new file mode 100644
index 0000000..d02608a
--- /dev/null
+++ b/msinttypes/stdint.h
@@ -0,0 +1,247 @@
+// ISO C9x compliant stdint.h for Microsoft Visual Studio
+// Based on ISO/IEC 9899:TC2 Committee draft (May 6, 2005) WG14/N1124
+//
+// Copyright (c) 2006-2008 Alexander Chemeris
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+//
+// 3. The name of the author may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
+// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+// EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+// OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+///////////////////////////////////////////////////////////////////////////////
+
+#ifndef _MSC_VER // [
+#error "Use this header only with Microsoft Visual C++ compilers!"
+#endif // _MSC_VER ]
+
+#ifndef _MSC_STDINT_H_ // [
+#define _MSC_STDINT_H_
+
+#if _MSC_VER > 1000
+#pragma once
+#endif
+
+#include <limits.h>
+
+// For Visual Studio 6 in C++ mode and for many Visual Studio versions when
+// compiling for ARM we should wrap <wchar.h> include with 'extern "C++" {}'
+// or compiler give many errors like this:
+// error C2733: second C linkage of overloaded function 'wmemchr' not allowed
+#ifdef __cplusplus
+extern "C" {
+#endif
+# include <wchar.h>
+#ifdef __cplusplus
+}
+#endif
+
+// Define _W64 macros to mark types changing their size, like intptr_t.
+#ifndef _W64
+# if !defined(__midl) && (defined(_X86_) || defined(_M_IX86)) && _MSC_VER >= 1300
+# define _W64 __w64
+# else
+# define _W64
+# endif
+#endif
+
+
+// 7.18.1 Integer types
+
+// 7.18.1.1 Exact-width integer types
+
+// Visual Studio 6 and Embedded Visual C++ 4 doesn't
+// realize that, e.g. char has the same size as __int8
+// so we give up on __intX for them.
+#if (_MSC_VER < 1300)
+ typedef signed char int8_t;
+ typedef signed short int16_t;
+ typedef signed int int32_t;
+ typedef unsigned char uint8_t;
+ typedef unsigned short uint16_t;
+ typedef unsigned int uint32_t;
+#else
+ typedef signed __int8 int8_t;
+ typedef signed __int16 int16_t;
+ typedef signed __int32 int32_t;
+ typedef unsigned __int8 uint8_t;
+ typedef unsigned __int16 uint16_t;
+ typedef unsigned __int32 uint32_t;
+#endif
+typedef signed __int64 int64_t;
+typedef unsigned __int64 uint64_t;
+
+
+// 7.18.1.2 Minimum-width integer types
+typedef int8_t int_least8_t;
+typedef int16_t int_least16_t;
+typedef int32_t int_least32_t;
+typedef int64_t int_least64_t;
+typedef uint8_t uint_least8_t;
+typedef uint16_t uint_least16_t;
+typedef uint32_t uint_least32_t;
+typedef uint64_t uint_least64_t;
+
+// 7.18.1.3 Fastest minimum-width integer types
+typedef int8_t int_fast8_t;
+typedef int16_t int_fast16_t;
+typedef int32_t int_fast32_t;
+typedef int64_t int_fast64_t;
+typedef uint8_t uint_fast8_t;
+typedef uint16_t uint_fast16_t;
+typedef uint32_t uint_fast32_t;
+typedef uint64_t uint_fast64_t;
+
+// 7.18.1.4 Integer types capable of holding object pointers
+#ifdef _WIN64 // [
+ typedef signed __int64 intptr_t;
+ typedef unsigned __int64 uintptr_t;
+#else // _WIN64 ][
+ typedef _W64 signed int intptr_t;
+ typedef _W64 unsigned int uintptr_t;
+#endif // _WIN64 ]
+
+// 7.18.1.5 Greatest-width integer types
+typedef int64_t intmax_t;
+typedef uint64_t uintmax_t;
+
+
+// 7.18.2 Limits of specified-width integer types
+
+#if !defined(__cplusplus) || defined(__STDC_LIMIT_MACROS) // [ See footnote 220 at page 257 and footnote 221 at page 259
+
+// 7.18.2.1 Limits of exact-width integer types
+#define INT8_MIN ((int8_t)_I8_MIN)
+#define INT8_MAX _I8_MAX
+#define INT16_MIN ((int16_t)_I16_MIN)
+#define INT16_MAX _I16_MAX
+#define INT32_MIN ((int32_t)_I32_MIN)
+#define INT32_MAX _I32_MAX
+#define INT64_MIN ((int64_t)_I64_MIN)
+#define INT64_MAX _I64_MAX
+#define UINT8_MAX _UI8_MAX
+#define UINT16_MAX _UI16_MAX
+#define UINT32_MAX _UI32_MAX
+#define UINT64_MAX _UI64_MAX
+
+// 7.18.2.2 Limits of minimum-width integer types
+#define INT_LEAST8_MIN INT8_MIN
+#define INT_LEAST8_MAX INT8_MAX
+#define INT_LEAST16_MIN INT16_MIN
+#define INT_LEAST16_MAX INT16_MAX
+#define INT_LEAST32_MIN INT32_MIN
+#define INT_LEAST32_MAX INT32_MAX
+#define INT_LEAST64_MIN INT64_MIN
+#define INT_LEAST64_MAX INT64_MAX
+#define UINT_LEAST8_MAX UINT8_MAX
+#define UINT_LEAST16_MAX UINT16_MAX
+#define UINT_LEAST32_MAX UINT32_MAX
+#define UINT_LEAST64_MAX UINT64_MAX
+
+// 7.18.2.3 Limits of fastest minimum-width integer types
+#define INT_FAST8_MIN INT8_MIN
+#define INT_FAST8_MAX INT8_MAX
+#define INT_FAST16_MIN INT16_MIN
+#define INT_FAST16_MAX INT16_MAX
+#define INT_FAST32_MIN INT32_MIN
+#define INT_FAST32_MAX INT32_MAX
+#define INT_FAST64_MIN INT64_MIN
+#define INT_FAST64_MAX INT64_MAX
+#define UINT_FAST8_MAX UINT8_MAX
+#define UINT_FAST16_MAX UINT16_MAX
+#define UINT_FAST32_MAX UINT32_MAX
+#define UINT_FAST64_MAX UINT64_MAX
+
+// 7.18.2.4 Limits of integer types capable of holding object pointers
+#ifdef _WIN64 // [
+# define INTPTR_MIN INT64_MIN
+# define INTPTR_MAX INT64_MAX
+# define UINTPTR_MAX UINT64_MAX
+#else // _WIN64 ][
+# define INTPTR_MIN INT32_MIN
+# define INTPTR_MAX INT32_MAX
+# define UINTPTR_MAX UINT32_MAX
+#endif // _WIN64 ]
+
+// 7.18.2.5 Limits of greatest-width integer types
+#define INTMAX_MIN INT64_MIN
+#define INTMAX_MAX INT64_MAX
+#define UINTMAX_MAX UINT64_MAX
+
+// 7.18.3 Limits of other integer types
+
+#ifdef _WIN64 // [
+# define PTRDIFF_MIN _I64_MIN
+# define PTRDIFF_MAX _I64_MAX
+#else // _WIN64 ][
+# define PTRDIFF_MIN _I32_MIN
+# define PTRDIFF_MAX _I32_MAX
+#endif // _WIN64 ]
+
+#define SIG_ATOMIC_MIN INT_MIN
+#define SIG_ATOMIC_MAX INT_MAX
+
+#ifndef SIZE_MAX // [
+# ifdef _WIN64 // [
+# define SIZE_MAX _UI64_MAX
+# else // _WIN64 ][
+# define SIZE_MAX _UI32_MAX
+# endif // _WIN64 ]
+#endif // SIZE_MAX ]
+
+// WCHAR_MIN and WCHAR_MAX are also defined in <wchar.h>
+#ifndef WCHAR_MIN // [
+# define WCHAR_MIN 0
+#endif // WCHAR_MIN ]
+#ifndef WCHAR_MAX // [
+# define WCHAR_MAX _UI16_MAX
+#endif // WCHAR_MAX ]
+
+#define WINT_MIN 0
+#define WINT_MAX _UI16_MAX
+
+#endif // __STDC_LIMIT_MACROS ]
+
+
+// 7.18.4 Limits of other integer types
+
+#if !defined(__cplusplus) || defined(__STDC_CONSTANT_MACROS) // [ See footnote 224 at page 260
+
+// 7.18.4.1 Macros for minimum-width integer constants
+
+#define INT8_C(val) val##i8
+#define INT16_C(val) val##i16
+#define INT32_C(val) val##i32
+#define INT64_C(val) val##i64
+
+#define UINT8_C(val) val##ui8
+#define UINT16_C(val) val##ui16
+#define UINT32_C(val) val##ui32
+#define UINT64_C(val) val##ui64
+
+// 7.18.4.2 Macros for greatest-width integer constants
+#define INTMAX_C INT64_C
+#define UINTMAX_C UINT64_C
+
+#endif // __STDC_CONSTANT_MACROS ]
+
+
+#endif // _MSC_STDINT_H_ ]
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 7c8a4fe..3bd5e6e 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -1,4 +1,16 @@
noinst_PROGRAMS = test_tables
-AM_CFLAGS = -I$(top_srcdir)/librabbitmq -I$(top_srcdir)/librabbitmq/$(PLATFORM_DIR)
+AM_CFLAGS = -I$(top_srcdir)/librabbitmq
+
+if GCC
+# Because we want to build under Microsoft's C compiler (for which
+# there is apparently no demand for C99 support), it's a good idea
+# to have gcc tell us when we stray from the old standard.
+AM_CFLAGS += -ansi -pedantic
+endif
+
+if USE_MSINTTYPES
+AM_CFLAGS += -I$(top_srcdir)/msinttypes
+endif
+
AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la
diff --git a/tests/test_tables.c b/tests/test_tables.c
index be1994b..5c0fbb9 100644
--- a/tests/test_tables.c
+++ b/tests/test_tables.c
@@ -51,19 +51,15 @@
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
-#include <time.h>
#include <inttypes.h>
#include <amqp.h>
-#include <amqp_framing.h>
-#include <amqp_private.h>
-
-#include <unistd.h>
-#include <assert.h>
#include <math.h>
+#define M_PI 3.14159265358979323846264338327
+
static void dump_indent(int indent) {
int i;
for (i = 0; i < indent; i++) { putchar(' '); }
@@ -127,86 +123,152 @@ static void dump_value(int indent, amqp_field_value_t v) {
}
}
-static void test_table_codec(void) {
- amqp_table_entry_t inner_entries[2] =
- { AMQP_TABLE_ENTRY_I32("one", 54321),
- AMQP_TABLE_ENTRY_UTF8("two", amqp_cstring_bytes("A long string")) };
- amqp_table_t inner_table = { .num_entries = sizeof(inner_entries) / sizeof(inner_entries[0]),
- .entries = &inner_entries[0] };
-
- amqp_field_value_t inner_values[2] =
- { AMQP_FIELD_VALUE_I32(54321),
- AMQP_FIELD_VALUE_UTF8(amqp_cstring_bytes("A long string")) };
- amqp_array_t inner_array = { .num_entries = sizeof(inner_values) / sizeof(inner_values[0]),
- .entries = &inner_values[0] };
-
- amqp_table_entry_t entries[14] =
- { AMQP_TABLE_ENTRY_UTF8("longstr", amqp_cstring_bytes("Here is a long string")),
- AMQP_TABLE_ENTRY_I32("signedint", 12345),
- AMQP_TABLE_ENTRY_DECIMAL("decimal", AMQP_DECIMAL(3, 123456)),
- AMQP_TABLE_ENTRY_TIMESTAMP("timestamp", 109876543209876),
- AMQP_TABLE_ENTRY_TABLE("table", inner_table),
- AMQP_TABLE_ENTRY_I8("byte", 255),
- AMQP_TABLE_ENTRY_I64("long", 1234567890),
- AMQP_TABLE_ENTRY_I16("short", 655),
- AMQP_TABLE_ENTRY_BOOLEAN("bool", 1),
- AMQP_TABLE_ENTRY_BYTES("binary", amqp_cstring_bytes("a binary string")),
- AMQP_TABLE_ENTRY_VOID("void"),
- AMQP_TABLE_ENTRY_ARRAY("array", inner_array),
- AMQP_TABLE_ENTRY_F32("float", M_PI),
- AMQP_TABLE_ENTRY_F64("double", M_PI) };
- amqp_table_t table = { .num_entries = sizeof(entries) / sizeof(entries[0]),
- .entries = &entries[0] };
-
- uint8_t pre_encoded_table[] = {
- 0x00, 0x00, 0x00, 0xff, 0x07, 0x6c, 0x6f, 0x6e,
- 0x67, 0x73, 0x74, 0x72, 0x53, 0x00, 0x00, 0x00,
- 0x15, 0x48, 0x65, 0x72, 0x65, 0x20, 0x69, 0x73,
- 0x20, 0x61, 0x20, 0x6c, 0x6f, 0x6e, 0x67, 0x20,
- 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x09, 0x73,
- 0x69, 0x67, 0x6e, 0x65, 0x64, 0x69, 0x6e, 0x74,
- 0x49, 0x00, 0x00, 0x30, 0x39, 0x07, 0x64, 0x65,
- 0x63, 0x69, 0x6d, 0x61, 0x6c, 0x44, 0x03, 0x00,
- 0x01, 0xe2, 0x40, 0x09, 0x74, 0x69, 0x6d, 0x65,
- 0x73, 0x74, 0x61, 0x6d, 0x70, 0x54, 0x00, 0x00,
- 0x63, 0xee, 0xa0, 0x53, 0xc1, 0x94, 0x05, 0x74,
- 0x61, 0x62, 0x6c, 0x65, 0x46, 0x00, 0x00, 0x00,
- 0x1f, 0x03, 0x6f, 0x6e, 0x65, 0x49, 0x00, 0x00,
- 0xd4, 0x31, 0x03, 0x74, 0x77, 0x6f, 0x53, 0x00,
- 0x00, 0x00, 0x0d, 0x41, 0x20, 0x6c, 0x6f, 0x6e,
- 0x67, 0x20, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67,
- 0x04, 0x62, 0x79, 0x74, 0x65, 0x62, 0xff, 0x04,
- 0x6c, 0x6f, 0x6e, 0x67, 0x6c, 0x00, 0x00, 0x00,
- 0x00, 0x49, 0x96, 0x02, 0xd2, 0x05, 0x73, 0x68,
- 0x6f, 0x72, 0x74, 0x73, 0x02, 0x8f, 0x04, 0x62,
- 0x6f, 0x6f, 0x6c, 0x74, 0x01, 0x06, 0x62, 0x69,
- 0x6e, 0x61, 0x72, 0x79, 0x78, 0x00, 0x00, 0x00,
- 0x0f, 0x61, 0x20, 0x62, 0x69, 0x6e, 0x61, 0x72,
- 0x79, 0x20, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67,
- 0x04, 0x76, 0x6f, 0x69, 0x64, 0x56, 0x05, 0x61,
- 0x72, 0x72, 0x61, 0x79, 0x41, 0x00, 0x00, 0x00,
- 0x17, 0x49, 0x00, 0x00, 0xd4, 0x31, 0x53, 0x00,
- 0x00, 0x00, 0x0d, 0x41, 0x20, 0x6c, 0x6f, 0x6e,
- 0x67, 0x20, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67,
- 0x05, 0x66, 0x6c, 0x6f, 0x61, 0x74, 0x66, 0x40,
- 0x49, 0x0f, 0xdb, 0x06, 0x64, 0x6f, 0x75, 0x62,
- 0x6c, 0x65, 0x64, 0x40, 0x09, 0x21, 0xfb, 0x54,
- 0x44, 0x2d, 0x18
- };
+static uint8_t pre_encoded_table[] = {
+ 0x00, 0x00, 0x00, 0xff, 0x07, 0x6c, 0x6f, 0x6e,
+ 0x67, 0x73, 0x74, 0x72, 0x53, 0x00, 0x00, 0x00,
+ 0x15, 0x48, 0x65, 0x72, 0x65, 0x20, 0x69, 0x73,
+ 0x20, 0x61, 0x20, 0x6c, 0x6f, 0x6e, 0x67, 0x20,
+ 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x09, 0x73,
+ 0x69, 0x67, 0x6e, 0x65, 0x64, 0x69, 0x6e, 0x74,
+ 0x49, 0x00, 0x00, 0x30, 0x39, 0x07, 0x64, 0x65,
+ 0x63, 0x69, 0x6d, 0x61, 0x6c, 0x44, 0x03, 0x00,
+ 0x01, 0xe2, 0x40, 0x09, 0x74, 0x69, 0x6d, 0x65,
+ 0x73, 0x74, 0x61, 0x6d, 0x70, 0x54, 0x00, 0x00,
+ 0x63, 0xee, 0xa0, 0x53, 0xc1, 0x94, 0x05, 0x74,
+ 0x61, 0x62, 0x6c, 0x65, 0x46, 0x00, 0x00, 0x00,
+ 0x1f, 0x03, 0x6f, 0x6e, 0x65, 0x49, 0x00, 0x00,
+ 0xd4, 0x31, 0x03, 0x74, 0x77, 0x6f, 0x53, 0x00,
+ 0x00, 0x00, 0x0d, 0x41, 0x20, 0x6c, 0x6f, 0x6e,
+ 0x67, 0x20, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67,
+ 0x04, 0x62, 0x79, 0x74, 0x65, 0x62, 0xff, 0x04,
+ 0x6c, 0x6f, 0x6e, 0x67, 0x6c, 0x00, 0x00, 0x00,
+ 0x00, 0x49, 0x96, 0x02, 0xd2, 0x05, 0x73, 0x68,
+ 0x6f, 0x72, 0x74, 0x73, 0x02, 0x8f, 0x04, 0x62,
+ 0x6f, 0x6f, 0x6c, 0x74, 0x01, 0x06, 0x62, 0x69,
+ 0x6e, 0x61, 0x72, 0x79, 0x78, 0x00, 0x00, 0x00,
+ 0x0f, 0x61, 0x20, 0x62, 0x69, 0x6e, 0x61, 0x72,
+ 0x79, 0x20, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67,
+ 0x04, 0x76, 0x6f, 0x69, 0x64, 0x56, 0x05, 0x61,
+ 0x72, 0x72, 0x61, 0x79, 0x41, 0x00, 0x00, 0x00,
+ 0x17, 0x49, 0x00, 0x00, 0xd4, 0x31, 0x53, 0x00,
+ 0x00, 0x00, 0x0d, 0x41, 0x20, 0x6c, 0x6f, 0x6e,
+ 0x67, 0x20, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67,
+ 0x05, 0x66, 0x6c, 0x6f, 0x61, 0x74, 0x66, 0x40,
+ 0x49, 0x0f, 0xdb, 0x06, 0x64, 0x6f, 0x75, 0x62,
+ 0x6c, 0x65, 0x64, 0x40, 0x09, 0x21, 0xfb, 0x54,
+ 0x44, 0x2d, 0x18
+};
+static void test_table_codec(void) {
amqp_pool_t pool;
int result;
+ amqp_table_entry_t inner_entries[2];
+ amqp_table_t inner_table;
+
+ amqp_field_value_t inner_values[2];
+ amqp_array_t inner_array;
+
+ amqp_table_entry_t entries[14];
+ amqp_table_t table;
+
+ inner_entries[0].key = amqp_cstring_bytes("one");
+ inner_entries[0].value.kind = AMQP_FIELD_KIND_I32;
+ inner_entries[0].value.value.i32 = 54321;
+
+ inner_entries[1].key = amqp_cstring_bytes("two");
+ inner_entries[1].value.kind = AMQP_FIELD_KIND_UTF8;
+ inner_entries[1].value.value.bytes = amqp_cstring_bytes("A long string");
+
+ inner_table.num_entries = 2;
+ inner_table.entries = inner_entries;
+
+ inner_values[0].kind = AMQP_FIELD_KIND_I32;
+ inner_values[0].value.i32 = 54321;
+
+ inner_values[1].kind = AMQP_FIELD_KIND_UTF8;
+ inner_values[1].value.bytes = amqp_cstring_bytes("A long string");
+
+ inner_array.num_entries = 2;
+ inner_array.entries = inner_values;
+
+ entries[0].key = amqp_cstring_bytes("longstr");
+ entries[0].value.kind = AMQP_FIELD_KIND_UTF8;
+ entries[0].value.value.bytes = amqp_cstring_bytes("Here is a long string");
+
+ entries[1].key = amqp_cstring_bytes("signedint");
+ entries[1].value.kind = AMQP_FIELD_KIND_I32;
+ entries[1].value.value.i32 = 12345;
+
+ entries[2].key = amqp_cstring_bytes("decimal");
+ entries[2].value.kind = AMQP_FIELD_KIND_DECIMAL;
+ entries[2].value.value.decimal.decimals = 3;
+ entries[2].value.value.decimal.value = 123456;
+
+ entries[3].key = amqp_cstring_bytes("timestamp");
+ entries[3].value.kind = AMQP_FIELD_KIND_TIMESTAMP;
+ entries[3].value.value.u64 = 109876543209876;
+
+ entries[4].key = amqp_cstring_bytes("table");
+ entries[4].value.kind = AMQP_FIELD_KIND_TABLE;
+ entries[4].value.value.table = inner_table;
+
+ entries[5].key = amqp_cstring_bytes("byte");
+ entries[5].value.kind = AMQP_FIELD_KIND_I8;
+ entries[5].value.value.i8 = (int8_t)255;
+
+ entries[6].key = amqp_cstring_bytes("long");
+ entries[6].value.kind = AMQP_FIELD_KIND_I64;
+ entries[6].value.value.i64 = 1234567890;
+
+ entries[7].key = amqp_cstring_bytes("short");
+ entries[7].value.kind = AMQP_FIELD_KIND_I16;
+ entries[7].value.value.i16 = 655;
+
+ entries[8].key = amqp_cstring_bytes("bool");
+ entries[8].value.kind = AMQP_FIELD_KIND_BOOLEAN;
+ entries[8].value.value.boolean = 1;
+
+ entries[9].key = amqp_cstring_bytes("binary");
+ entries[9].value.kind = AMQP_FIELD_KIND_BYTES;
+ entries[9].value.value.bytes = amqp_cstring_bytes("a binary string");
+
+ entries[10].key = amqp_cstring_bytes("void");
+ entries[10].value.kind = AMQP_FIELD_KIND_VOID;
+
+ entries[11].key = amqp_cstring_bytes("array");
+ entries[11].value.kind = AMQP_FIELD_KIND_ARRAY;
+ entries[11].value.value.array = inner_array;
+
+ entries[12].key = amqp_cstring_bytes("float");
+ entries[12].value.kind = AMQP_FIELD_KIND_F32;
+ entries[12].value.value.f32 = M_PI;
+
+ entries[13].key = amqp_cstring_bytes("double");
+ entries[13].value.kind = AMQP_FIELD_KIND_F64;
+ entries[13].value.value.f64 = M_PI;
+
+ table.num_entries = 14;
+ table.entries = entries;
+
printf("AAAAAAAAAA\n");
- dump_value(0, (amqp_field_value_t) AMQP_FIELD_VALUE_TABLE(table));
+
+ {
+ amqp_field_value_t val;
+ val.kind = AMQP_FIELD_KIND_TABLE;
+ val.value.table = table;
+ dump_value(0, val);
+ }
init_amqp_pool(&pool, 4096);
{
- amqp_bytes_t decoding_bytes = { .len = sizeof(pre_encoded_table),
- .bytes = pre_encoded_table };
amqp_table_t decoded;
- int decoding_offset = 0;
+ size_t decoding_offset = 0;
+ amqp_bytes_t decoding_bytes;
+ decoding_bytes.len = sizeof(pre_encoded_table);
+ decoding_bytes.bytes = pre_encoded_table;
+
result = amqp_decode_table(decoding_bytes, &pool, &decoded, &decoding_offset);
if (result < 0) {
char *errstr = amqp_error_string(-result);
@@ -215,13 +277,20 @@ static void test_table_codec(void) {
abort();
}
printf("BBBBBBBBBB\n");
- dump_value(0, (amqp_field_value_t) AMQP_FIELD_VALUE_TABLE(decoded));
+
+ {
+ amqp_field_value_t val;
+ val.kind = AMQP_FIELD_KIND_TABLE;
+ val.value.table = decoded;
+
+ dump_value(0, val);
+ }
}
{
uint8_t encoding_buffer[4096];
amqp_bytes_t encoding_result;
- int offset = 0;
+ size_t offset = 0;
memset(&encoding_buffer[0], 0, sizeof(encoding_buffer));
encoding_result.len = sizeof(encoding_buffer);
@@ -236,7 +305,7 @@ static void test_table_codec(void) {
}
if (offset != sizeof(pre_encoded_table)) {
- printf("Offset should be %d, was %d\n", (int) sizeof(pre_encoded_table), offset);
+ printf("Offset should be %d, was %d\n", (int) sizeof(pre_encoded_table), (int)offset);
abort();
}
@@ -251,17 +320,8 @@ static void test_table_codec(void) {
}
int main(int argc, char const * const *argv) {
- amqp_table_entry_t entries[8] =
- { AMQP_TABLE_ENTRY_UTF8("zebra", amqp_cstring_bytes("last")),
- AMQP_TABLE_ENTRY_UTF8("aardvark", amqp_cstring_bytes("first")),
- AMQP_TABLE_ENTRY_UTF8("middle", amqp_cstring_bytes("third")),
- AMQP_TABLE_ENTRY_I32("number", 1234),
- AMQP_TABLE_ENTRY_DECIMAL("decimal", AMQP_DECIMAL(2, 1234)),
- AMQP_TABLE_ENTRY_TIMESTAMP("time", (uint64_t) 1234123412341234LL),
- AMQP_TABLE_ENTRY_UTF8("beta", amqp_cstring_bytes("second")),
- AMQP_TABLE_ENTRY_UTF8("wombat", amqp_cstring_bytes("fourth")) };
- amqp_table_t table = { .num_entries = sizeof(entries) / sizeof(entries[0]),
- .entries = &entries[0] };
+ amqp_table_entry_t entries[8];
+ amqp_table_t table;
union {
uint32_t i;
@@ -272,6 +332,42 @@ int main(int argc, char const * const *argv) {
double d;
} vl;
+ entries[0].key = amqp_cstring_bytes("zebra");
+ entries[0].value.kind = AMQP_FIELD_KIND_UTF8;
+ entries[0].value.value.bytes = amqp_cstring_bytes("last");
+
+ entries[1].key = amqp_cstring_bytes("aardvark");
+ entries[1].value.kind = AMQP_FIELD_KIND_UTF8;
+ entries[1].value.value.bytes = amqp_cstring_bytes("first");
+
+ entries[2].key = amqp_cstring_bytes("middle");
+ entries[2].value.kind = AMQP_FIELD_KIND_UTF8;
+ entries[2].value.value.bytes = amqp_cstring_bytes("third");
+
+ entries[3].key = amqp_cstring_bytes("number");
+ entries[3].value.kind = AMQP_FIELD_KIND_I32;
+ entries[3].value.value.i32 = 1234;
+
+ entries[4].key = amqp_cstring_bytes("decimal");
+ entries[4].value.kind = AMQP_FIELD_KIND_DECIMAL;
+ entries[4].value.value.decimal.decimals = 2;
+ entries[4].value.value.decimal.value = 1234;
+
+ entries[5].key = amqp_cstring_bytes("time");
+ entries[5].value.kind = AMQP_FIELD_KIND_TIMESTAMP;
+ entries[5].value.value.u64 = 1234123412341234;
+
+ entries[6].key = amqp_cstring_bytes("beta");
+ entries[6].value.kind = AMQP_FIELD_KIND_UTF8;
+ entries[6].value.value.bytes = amqp_cstring_bytes("second");
+
+ entries[7].key = amqp_cstring_bytes("wombat");
+ entries[7].value.kind = AMQP_FIELD_KIND_UTF8;
+ entries[7].value.value.bytes = amqp_cstring_bytes("fourth");
+
+ table.num_entries = 8;
+ table.entries = entries;
+
vi.f = M_PI;
if ((sizeof(float) != 4) || (vi.i != 0x40490fdb)) {
printf("*** ERROR: single floating point encoding does not work as expected\n");
@@ -294,7 +390,14 @@ int main(int argc, char const * const *argv) {
qsort(table.entries, table.num_entries, sizeof(amqp_table_entry_t), &amqp_table_entry_cmp);
printf("----------\n");
- dump_value(0, (amqp_field_value_t) AMQP_FIELD_VALUE_TABLE(table));
+
+ {
+ amqp_field_value_t val;
+ val.kind = AMQP_FIELD_KIND_TABLE;
+ val.value.table = table;
+
+ dump_value(0, val);
+ }
return 0;
}
diff --git a/tools/common.c b/tools/common.c
index f8b6985..7b0a969 100644
--- a/tools/common.c
+++ b/tools/common.c
@@ -354,5 +354,5 @@ void process_all_options(int argc, const char **argv,
amqp_bytes_t cstring_bytes(const char *str)
{
- return str ? amqp_cstring_bytes(str) : AMQP_EMPTY_BYTES;
+ return str ? amqp_cstring_bytes(str) : amqp_empty_bytes;
}
diff --git a/tools/consume.c b/tools/consume.c
index 34037d9..434a6f5 100644
--- a/tools/consume.c
+++ b/tools/consume.c
@@ -100,7 +100,7 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
/* Declare the queue as auto-delete. */
amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1,
queue_bytes, 0, 0, 1, 1,
- AMQP_EMPTY_TABLE);
+ amqp_empty_table);
if (!res)
die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
@@ -119,7 +119,7 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
amqp_bytes_t eb = amqp_cstring_bytes(exchange);
if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
cstring_bytes(routing_key),
- AMQP_EMPTY_TABLE))
+ amqp_empty_table))
die_rpc(amqp_get_rpc_reply(conn),
"queue.bind");
}
@@ -131,8 +131,8 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
int no_ack, const char * const *argv)
{
- if (!amqp_basic_consume(conn, 1, queue, AMQP_EMPTY_BYTES, 0, no_ack,
- 0, AMQP_EMPTY_TABLE))
+ if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack,
+ 0, amqp_empty_table))
die_rpc(amqp_get_rpc_reply(conn), "basic.consume");
for (;;) {
diff --git a/tools/declare_queue.c b/tools/declare_queue.c
index 3536455..145a15e 100644
--- a/tools/declare_queue.c
+++ b/tools/declare_queue.c
@@ -88,12 +88,11 @@ int main(int argc, const char **argv)
durable,
0,
0,
- AMQP_EMPTY_TABLE);
- if (reply == NULL) {
+ amqp_empty_table);
+ if (reply == NULL)
die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
- }
- write(1, reply->queue.bytes, reply->queue.len);
- write(1, "\n", strlen("\n"));
+
+ printf("%.*s\n", (int)reply->queue.len, (char *)reply->queue.bytes);
}
close_connection(conn);
return 0;
diff --git a/tools/windows/compat.c b/tools/windows/compat.c
index f0508b2..bce318b 100644
--- a/tools/windows/compat.c
+++ b/tools/windows/compat.c
@@ -61,13 +61,16 @@ int asprintf(char **strp, const char *fmt, ...)
va_start(ap, fmt);
len = _vscprintf(fmt, ap);
+ va_end(ap);
+
*strp = malloc(len+1);
if (!*strp)
return -1;
- len = vsprintf(*strp, fmt, ap);
- *strp[len] = 0;
-
+ va_start(ap, fmt);
+ _vsnprintf(*strp, len+1, fmt, ap);
va_end(ap);
+
+ (*strp)[len] = 0;
return len;
}
diff --git a/tools/windows/process.c b/tools/windows/process.c
index 0a005bd..d0e162a 100644
--- a/tools/windows/process.c
+++ b/tools/windows/process.c
@@ -98,37 +98,57 @@ static char *make_command_line(const char *const *argv)
dest = buf = malloc(len);
if (!buf)
die("allocating memory for subprocess command line");
-
- *dest++ = '\"';
+
+ /* Here we perform the inverse of the CommandLineToArgvW
+ function. Note that it's rules are slightly crazy: A
+ sequence of backslashes only act to escape if followed by
+ double quotes. A seuqence of backslashes not followed by
+ double quotes is unaffected. */
for (i = 0;;) {
const char *src = argv[i];
+ int backslashes = 0;
+
+ *dest++ = '\"';
+
for (;;) {
switch (*src) {
case 0:
goto done;
case '\"':
+ for (; backslashes; backslashes--)
+ *dest++ = '\\';
+
+ *dest++ = '\\';
+ *dest++ = '\"';
+ break;
+
case '\\':
+ backslashes++;
*dest++ = '\\';
- /* fall through */
+ break;
default:
- *dest++ = *src++;
+ backslashes = 0;
+ *dest++ = *src;
break;
}
+
+ src++;
}
- done:
+ done:
+ for (; backslashes; backslashes--)
+ *dest++ = '\\';
+
+ *dest++ = '\"';
if (!argv[++i])
break;
- *dest++ = '\"';
*dest++ = ' ';
- *dest++ = '\"';
}
- *dest++ = '\"';
*dest++ = 0;
return buf;
}