summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_socket.c
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-06-26 14:23:16 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-07-08 15:13:32 -0700
commit33ebeede97a81c2e82ac0c3a6d88d4db0695bf29 (patch)
tree1151d9a060cb9afdf6ac25b66ddbdd03e46f8674 /librabbitmq/amqp_socket.c
parent08af83a97a99c08aaa3878724a07829e0bb955da (diff)
downloadrabbitmq-c-github-ask-33ebeede97a81c2e82ac0c3a6d88d4db0695bf29.tar.gz
Add a high level API for consuming messages
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r--librabbitmq/amqp_socket.c74
1 files changed, 74 insertions, 0 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index be85ff5..ef9debd 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -782,6 +782,80 @@ beginrecv:
}
}
+int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame)
+{
+ amqp_link_t *link;
+ amqp_frame_t *frame_copy;
+
+ amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(state, frame->channel);
+
+ if (NULL == channel_pool) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
+ link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
+ frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
+
+ if (NULL == link || NULL == frame_copy) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
+ *frame_copy = *frame;
+ link->data = frame_copy;
+
+ if (NULL == state->first_queued_frame) {
+ state->first_queued_frame = link;
+ } else {
+ state->last_queued_frame->next = link;
+ }
+
+ link->next = NULL;
+ state->last_queued_frame = link;
+
+ return AMQP_STATUS_OK;
+}
+
+int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_frame_t *decoded_frame)
+{
+ amqp_frame_t *frame_ptr;
+ amqp_link_t *cur;
+ int res;
+
+ for (cur = state->first_queued_frame; NULL != cur; cur = cur->next) {
+ frame_ptr = cur->data;
+
+ if (channel == frame_ptr->channel) {
+ state->first_queued_frame = cur->next;
+ if (NULL == state->first_queued_frame) {
+ state->last_queued_frame = NULL;
+ }
+
+ *decoded_frame = *frame_ptr;
+
+ return AMQP_STATUS_OK;
+ }
+ }
+
+ while (1) {
+ res = wait_frame_inner(state, decoded_frame, NULL);
+
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+
+ if (channel == decoded_frame->channel) {
+ return AMQP_STATUS_OK;
+ } else {
+ res = amqp_queue_frame(state, decoded_frame);
+ if (res != AMQP_STATUS_OK) {
+ return res;
+ }
+ }
+ }
+}
+
int amqp_simple_wait_frame(amqp_connection_state_t state,
amqp_frame_t *decoded_frame)
{