summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--librabbitmq/amqp.h62
-rw-r--r--librabbitmq/amqp_api.c232
-rw-r--r--librabbitmq/codegen.py119
3 files changed, 112 insertions, 301 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 1541583..144dc83 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -326,10 +326,6 @@ RABBITMQ_EXPORT amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
int heartbeat,
amqp_sasl_method_enum sasl_method, ...);
-RABBITMQ_EXPORT struct amqp_channel_open_ok_t_ *amqp_channel_open(
- amqp_connection_state_t state,
- amqp_channel_t channel);
-
struct amqp_basic_properties_t_;
RABBITMQ_EXPORT int amqp_basic_publish(amqp_connection_state_t state,
amqp_channel_t channel,
@@ -348,58 +344,6 @@ RABBITMQ_EXPORT amqp_rpc_reply_t amqp_connection_close(
amqp_connection_state_t state,
int code);
-RABBITMQ_EXPORT struct amqp_exchange_declare_ok_t_ *amqp_exchange_declare(
- amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t exchange,
- amqp_bytes_t type,
- amqp_boolean_t passive,
- amqp_boolean_t durable,
- amqp_table_t arguments);
-
-RABBITMQ_EXPORT struct amqp_queue_declare_ok_t_ *amqp_queue_declare(
- amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_boolean_t passive,
- amqp_boolean_t durable,
- amqp_boolean_t exclusive,
- amqp_boolean_t auto_delete,
- amqp_table_t arguments);
-
-RABBITMQ_EXPORT struct amqp_queue_delete_ok_t_ *amqp_queue_delete(
- amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_boolean_t if_unused,
- amqp_boolean_t if_empty);
-
-RABBITMQ_EXPORT struct amqp_queue_bind_ok_t_ *amqp_queue_bind(
- amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_bytes_t exchange,
- amqp_bytes_t routing_key,
- amqp_table_t arguments);
-
-RABBITMQ_EXPORT struct amqp_queue_unbind_ok_t_ *amqp_queue_unbind(
- amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_bytes_t exchange,
- amqp_bytes_t routing_key,
- amqp_table_t arguments);
-
-RABBITMQ_EXPORT struct amqp_basic_consume_ok_t_ *amqp_basic_consume(
- amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_bytes_t consumer_tag,
- amqp_boolean_t no_local,
- amqp_boolean_t no_ack,
- amqp_boolean_t exclusive,
- amqp_table_t filter);
-
RABBITMQ_EXPORT int amqp_basic_ack(amqp_connection_state_t state,
amqp_channel_t channel,
uint64_t delivery_tag,
@@ -410,12 +354,6 @@ RABBITMQ_EXPORT amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
amqp_bytes_t queue,
amqp_boolean_t no_ack);
-RABBITMQ_EXPORT struct amqp_queue_purge_ok_t_ *amqp_queue_purge(
- amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_boolean_t no_wait);
-
RABBITMQ_EXPORT struct amqp_tx_select_ok_t_ *amqp_tx_select(
amqp_connection_state_t state,
amqp_channel_t channel);
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index bf19761..456987d 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -119,23 +119,6 @@ const amqp_array_t amqp_empty_array = { 0, NULL };
? (replytype *) state->most_recent_api_result.reply.decoded \
: NULL)
-amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state,
- amqp_channel_t channel)
-{
- amqp_method_number_t replies[2] = { AMQP_CHANNEL_OPEN_OK_METHOD, 0};
- amqp_channel_open_t req;
- req.out_of_band.bytes = NULL;
- req.out_of_band.len = 0;
-
- state->most_recent_api_result = amqp_simple_rpc(state, channel,
- AMQP_CHANNEL_OPEN_METHOD,
- replies, &req);
- if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
- return state->most_recent_api_result.reply.decoded;
- else
- return NULL;
-}
-
int amqp_basic_publish(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_bytes_t exchange,
@@ -238,162 +221,6 @@ amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
replies, &req);
}
-amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t exchange,
- amqp_bytes_t type,
- amqp_boolean_t passive,
- amqp_boolean_t durable,
- amqp_table_t arguments)
-{
- amqp_method_number_t replies[2] = { AMQP_EXCHANGE_DECLARE_OK_METHOD, 0};
- amqp_exchange_declare_t req;
- req.exchange = exchange;
- req.type = type;
- req.passive = passive;
- req.durable = durable;
- req.auto_delete = 0;
- req.internal = 0;
- req.nowait = 0;
- req.arguments = arguments;
-
- state->most_recent_api_result = amqp_simple_rpc(state, channel,
- AMQP_EXCHANGE_DECLARE_METHOD,
- replies, &req);
- if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
- return state->most_recent_api_result.reply.decoded;
- else
- return NULL;
-}
-
-amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_boolean_t passive,
- amqp_boolean_t durable,
- amqp_boolean_t exclusive,
- amqp_boolean_t auto_delete,
- amqp_table_t arguments)
-{
- amqp_method_number_t replies[2] = { AMQP_QUEUE_DECLARE_OK_METHOD, 0};
- amqp_queue_declare_t req;
- req.queue = queue;
- req.passive = passive;
- req.durable = durable;
- req.exclusive = exclusive;
- req.auto_delete = auto_delete;
- req.nowait = 0;
- req.arguments = arguments;
-
- state->most_recent_api_result = amqp_simple_rpc(state, channel,
- AMQP_QUEUE_DECLARE_METHOD,
- replies, &req);
- if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
- return state->most_recent_api_result.reply.decoded;
- else
- return NULL;
-}
-
-amqp_queue_delete_ok_t *amqp_queue_delete(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_boolean_t if_unused,
- amqp_boolean_t if_empty)
-{
- amqp_method_number_t replies[2] = { AMQP_QUEUE_DELETE_OK_METHOD, 0};
- amqp_queue_delete_t req;
- req.queue = queue;
- req.if_unused = if_unused;
- req.if_empty = if_empty;
- req.nowait = 0;
-
- state->most_recent_api_result = amqp_simple_rpc(state, channel,
- AMQP_QUEUE_DELETE_METHOD,
- replies, &req);
- if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
- return state->most_recent_api_result.reply.decoded;
- else
- return NULL;
-}
-
-amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_bytes_t exchange,
- amqp_bytes_t routing_key,
- amqp_table_t arguments)
-{
- amqp_method_number_t replies[2] = { AMQP_QUEUE_BIND_OK_METHOD, 0};
- amqp_queue_bind_t req;
- req.ticket = 0;
- req.queue = queue;
- req.exchange = exchange;
- req.routing_key = routing_key;
- req.nowait = 0;
- req.arguments = arguments;
-
- state->most_recent_api_result = amqp_simple_rpc(state, channel,
- AMQP_QUEUE_BIND_METHOD,
- replies, &req);
- if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
- return state->most_recent_api_result.reply.decoded;
- else
- return NULL;
-}
-
-amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_bytes_t exchange,
- amqp_bytes_t routing_key,
- amqp_table_t arguments)
-{
- amqp_method_number_t replies[2] = { AMQP_QUEUE_UNBIND_OK_METHOD, 0};
- amqp_queue_unbind_t req;
- req.ticket = 0;
- req.queue = queue;
- req.exchange = exchange;
- req.routing_key = routing_key;
- req.arguments = arguments;
-
- state->most_recent_api_result = amqp_simple_rpc(state, channel,
- AMQP_QUEUE_UNBIND_METHOD,
- replies, &req);
- if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
- return state->most_recent_api_result.reply.decoded;
- else
- return NULL;
-}
-
-amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_bytes_t consumer_tag,
- amqp_boolean_t no_local,
- amqp_boolean_t no_ack,
- amqp_boolean_t exclusive,
- amqp_table_t filter)
-{
- amqp_method_number_t replies[2] = { AMQP_BASIC_CONSUME_OK_METHOD, 0};
- amqp_basic_consume_t req;
- req.ticket = 0;
- req.queue = queue;
- req.consumer_tag = consumer_tag;
- req.no_local = no_local;
- req.no_ack = no_ack;
- req.exclusive = exclusive;
- req.nowait = 0;
- req.arguments = filter;
-
- state->most_recent_api_result = amqp_simple_rpc(state, channel,
- AMQP_BASIC_CONSUME_METHOD,
- replies, &req);
- if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
- return state->most_recent_api_result.reply.decoded;
- else
- return NULL;
-}
-
int amqp_basic_ack(amqp_connection_state_t state,
amqp_channel_t channel,
uint64_t delivery_tag,
@@ -405,26 +232,6 @@ int amqp_basic_ack(amqp_connection_state_t state,
return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m);
}
-amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_boolean_t no_wait)
-{
- amqp_method_number_t replies[2] = { AMQP_QUEUE_PURGE_OK_METHOD, 0};
- amqp_queue_purge_t req;
- req.ticket = 0;
- req.queue = queue;
- req.nowait = 0;
-
- state->most_recent_api_result = amqp_simple_rpc(state, channel,
- AMQP_QUEUE_PURGE_METHOD,
- replies, &req);
- if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
- return state->most_recent_api_result.reply.decoded;
- else
- return NULL;
-}
-
amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_bytes_t queue,
@@ -444,45 +251,6 @@ amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
return state->most_recent_api_result;
}
-amqp_tx_select_ok_t *amqp_tx_select(amqp_connection_state_t state,
- amqp_channel_t channel)
-{
- amqp_method_number_t replies[2] = { AMQP_TX_SELECT_OK_METHOD, 0};
- state->most_recent_api_result = amqp_simple_rpc(state, channel,
- AMQP_TX_SELECT_METHOD,
- replies, NULL);
- if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
- return state->most_recent_api_result.reply.decoded;
- else
- return NULL;
-}
-
-amqp_tx_commit_ok_t *amqp_tx_commit(amqp_connection_state_t state,
- amqp_channel_t channel)
-{
- amqp_method_number_t replies[2] = { AMQP_TX_COMMIT_OK_METHOD, 0};
- state->most_recent_api_result = amqp_simple_rpc(state, channel,
- AMQP_TX_COMMIT_METHOD,
- replies, NULL);
- if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
- return state->most_recent_api_result.reply.decoded;
- else
- return NULL;
-}
-
-amqp_tx_rollback_ok_t *amqp_tx_rollback(amqp_connection_state_t state,
- amqp_channel_t channel)
-{
- amqp_method_number_t replies[2] = { AMQP_TX_ROLLBACK_OK_METHOD, 0};
- state->most_recent_api_result = amqp_simple_rpc(state, channel,
- AMQP_TX_ROLLBACK_METHOD,
- replies, NULL);
- if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
- return state->most_recent_api_result.reply.decoded;
- else
- return NULL;
-}
-
amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state)
{
return state->most_recent_api_result;
diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py
index 6fd149e..cded64a 100644
--- a/librabbitmq/codegen.py
+++ b/librabbitmq/codegen.py
@@ -112,7 +112,7 @@ class BitEncoder(object):
self.emitter.emit(line)
def encode_bit(self, value):
- """Generate code to ebcode a value of the AMQP bit type from
+ """Generate code to encode a value of the AMQP bit type from
the given value."""
if self.bit == 0:
self.emitter.emit("bit_buffer = 0;")
@@ -138,6 +138,8 @@ class SimpleType(object):
def encode(self, emitter, value):
emitter.emit("if (!amqp_encode_%d(encoded, &offset, %s)) return -ERROR_BAD_AMQP_DATA;" % (self.bits, value))
+ def literal(self, value):
+ return value
class StrType(object):
"""The AMQP shortstr or longstr types."""
@@ -159,6 +161,11 @@ class StrType(object):
emitter.emit(" || !amqp_encode_bytes(encoded, &offset, %s))" % (value,))
emitter.emit(" return -ERROR_BAD_AMQP_DATA;")
+ def literal(self, value):
+ if value != '':
+ raise NotImplementedError()
+
+ return "amqp_empty_bytes"
class BitType(object):
"""The AMQP bit type."""
@@ -172,6 +179,8 @@ class BitType(object):
def encode(self, emitter, value):
emitter.encode_bit(value)
+ def literal(self, value):
+ return {True: 1, False: 0}[value]
class TableType(object):
"""The AMQP table type."""
@@ -191,6 +200,8 @@ class TableType(object):
emitter.emit(" if (res < 0) return res;")
emitter.emit("}")
+ def literal(self, value):
+ raise NotImplementedError()
types = {
'octet': SimpleType(8),
@@ -213,11 +224,54 @@ def c_ize(s):
s = s.replace(' ', '_')
return s
+# When generating API functions corresponding to synchronous methods,
+# we need some information that isn't in the protocol def: Some
+# methods should not be exposed, indicated here by a False value.
+# Some methods should be exposed but certain fields should not be
+# exposed as parameters.
+apiMethodInfo = {
+ "amqp_connection_start": False, # application code should not use this
+ "amqp_connection_secure": False, # application code should not use this
+ "amqp_connection_tune": False, # application code should not use this
+ "amqp_connection_open": False, # application code should not use this
+ "amqp_connection_close": False, # needs special handling
+ "amqp_channel_open": ["out_of_band"],
+ "amqp_channel_close": False, # needs special handling
+ "amqp_access_request": False, # huh?
+ "amqp_exchange_declare": ["auto_delete", "internal"],
+ "amqp_basic_get": False, # get-ok has content
+}
+
+# When generating API functions corresponding to synchronous methods,
+# some fields should be suppressed everywhere. This dict names those
+# fields, and the fixed values to use for them.
+apiMethodsSuppressArgs = {"ticket": 0, "nowait": False}
+
AmqpMethod.defName = lambda m: cConstantName(c_ize(m.klass.name) + '_' + c_ize(m.name) + "_method")
-AmqpMethod.structName = lambda m: "amqp_" + c_ize(m.klass.name) + '_' + c_ize(m.name) + "_t"
+AmqpMethod.fullName = lambda m: "amqp_%s_%s" % (c_ize(m.klass.name), c_ize(m.name))
+AmqpMethod.structName = lambda m: m.fullName() + "_t"
AmqpClass.structName = lambda c: "amqp_" + c_ize(c.name) + "_properties_t"
+def methodApiPrototype(m):
+ fn = m.fullName()
+ info = apiMethodInfo.get(fn, [])
+
+ args = []
+ for f in m.arguments:
+ n = c_ize(f.name)
+ if n in apiMethodsSuppressArgs or n in info:
+ continue
+
+ args.append(", ")
+ args.append(typeFor(m.klass.spec, f).ctype)
+ args.append(" ")
+ args.append(n)
+
+ return "%s_ok_t *%s(amqp_connection_state_t state, amqp_channel_t channel%s)" % (fn, fn, ''.join(args))
+
+AmqpMethod.apiPrototype = methodApiPrototype
+
def cConstantName(s):
return 'AMQP_' + '_'.join(re.split('[- ]', s.upper()))
@@ -424,13 +478,56 @@ int amqp_encode_properties(uint16_t class_id,
remaining_flags = remainder;
} while (remaining_flags != 0);
}
-
+
switch (class_id) {"""
for c in spec.allClasses(): genEncodeProperties(c)
print """ default: return -ERROR_UNKNOWN_CLASS;
}
}"""
+ for m in methods:
+ if not m.isSynchronous:
+ continue
+
+ info = apiMethodInfo.get(m.fullName(), [])
+ if info is False:
+ continue
+
+ reply = cConstantName(c_ize(m.klass.name) + '_' + c_ize(m.name)
+ + "_ok_method")
+
+ print
+ print m.apiPrototype()
+ print "{"
+ print " amqp_method_number_t replies[2] = { %s, 0};" % (reply,)
+ print " %s req;" % (m.structName(),)
+
+ for f in m.arguments:
+ n = c_ize(f.name)
+
+ val = apiMethodsSuppressArgs.get(n)
+ if val is None and n in info:
+ val = f.defaultvalue
+
+ if val is None:
+ val = n
+ else:
+ val = typeFor(spec, f).literal(val)
+
+
+ print " req.%s = %s;" % (n, val)
+
+ print """
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ %s,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
+}
+""" % (m.defName(),)
+
def genHrl(spec):
def fieldDeclList(fields):
if fields:
@@ -439,7 +536,7 @@ def genHrl(spec):
for f in fields])
else:
return " char dummy; /* Dummy field to avoid empty struct */\n"
-
+
def propDeclList(fields):
return ''.join([" %s %s;\n" % (typeFor(spec, f).ctype, c_ize(f.name))
for f in fields
@@ -465,6 +562,7 @@ extern "C" {
print
print """/* Function prototypes. */
+
extern char const *amqp_constant_name(int constantNumber);
extern amqp_boolean_t amqp_constant_is_hard_error(int constantNumber);
RABBITMQ_EXPORT char const *amqp_method_name(amqp_method_number_t methodNumber);
@@ -485,7 +583,7 @@ extern int amqp_encode_properties(uint16_t class_id,
amqp_bytes_t encoded);
"""
- print "/* Method field records. */"
+ print "/* Method field records. */\n"
for m in methods:
methodid = m.klass.index << 16 | m.index
print "#define %s ((amqp_method_number_t) 0x%.08X) /* %d, %d; %d */" % \
@@ -515,7 +613,14 @@ extern int amqp_encode_properties(uint16_t class_id,
fieldDeclList(c.fields),
c.structName())
- print """#ifdef __cplusplus
+ print "/* API functions for methods */\n"
+
+ for m in methods:
+ if m.isSynchronous and apiMethodInfo.get(m.fullName()) is not False:
+ print "RABBITMQ_EXPORT %s;" % (m.apiPrototype(),)
+
+ print """
+#ifdef __cplusplus
}
#endif
@@ -526,6 +631,6 @@ def generateErl(specPath):
def generateHrl(specPath):
genHrl(AmqpSpec(specPath))
-
+
if __name__ == "__main__":
do_main(generateHrl, generateErl)