summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-08-06 16:27:50 +0000
committerTed Ross <tross@apache.org>2013-08-06 16:27:50 +0000
commit45ca380db142e921734a7cf7c8113fd719ed8687 (patch)
tree7b97894a32b62762af250d7074042a93aba1b97c
parent019bfab57680b3b2e65bda40aed3ddaf9d6024b1 (diff)
downloadqpid-python-45ca380db142e921734a7cf7c8113fd719ed8687.tar.gz
QPID-5045 - Refactored the router data structures to support message and link routing.
QPID-4997 - Fixed the thread safety problem. - Wrapped "pn_delivery" in the container to allow for explicit linkage of delivery pairs. - Removed the linkage between "message" and "delivery". Messages can now be discarded as soon as the last copy is delivered because their status will be tracked in the dx_delivery object. - Removed tx_handler from the set of callbacks in the container. There is no need for this notification (sendable delivery) because Dispatch does not create outbound deliveries until it is ready to send them (i.e. deliveries are created and advanced in one synchronous operation). - Replaced the out_fifo of messages per outbound-link with a pair of fifos (one for messages and one for state changes) per link. Note that even inbound-links need to send state changes outbound. This change addresses QPID-4997. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1511021 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--extras/dispatch/include/qpid/dispatch/container.h88
-rw-r--r--extras/dispatch/include/qpid/dispatch/message.h13
-rw-r--r--extras/dispatch/src/container.c180
-rw-r--r--extras/dispatch/src/message.c54
-rw-r--r--extras/dispatch/src/message_private.h2
-rw-r--r--extras/dispatch/src/router_node.c385
6 files changed, 465 insertions, 257 deletions
diff --git a/extras/dispatch/include/qpid/dispatch/container.h b/extras/dispatch/include/qpid/dispatch/container.h
index bf5a536ff1..a306895b50 100644
--- a/extras/dispatch/include/qpid/dispatch/container.h
+++ b/extras/dispatch/include/qpid/dispatch/container.h
@@ -51,10 +51,11 @@ typedef enum {
} dx_direction_t;
-typedef struct dx_node_t dx_node_t;
-typedef struct dx_link_t dx_link_t;
+typedef struct dx_node_t dx_node_t;
+typedef struct dx_link_t dx_link_t;
+typedef struct dx_delivery_t dx_delivery_t;
-typedef void (*dx_container_delivery_handler_t) (void *node_context, dx_link_t *link, pn_delivery_t *delivery);
+typedef void (*dx_container_delivery_handler_t) (void *node_context, dx_link_t *link, dx_delivery_t *delivery);
typedef int (*dx_container_link_handler_t) (void *node_context, dx_link_t *link);
typedef int (*dx_container_link_detach_handler_t) (void *node_context, dx_link_t *link, int closed);
typedef void (*dx_container_node_handler_t) (void *type_context, dx_node_t *node);
@@ -65,23 +66,68 @@ typedef struct {
void *type_context;
int allow_dynamic_creation;
- //
+ //=======================
// Node-Instance Handlers
+ //=======================
+
+ //
+ // rx_handler - Invoked when a new received delivery is avaliable for processing.
+ //
+ dx_container_delivery_handler_t rx_handler;
+
+ //
+ // disp_handler - Invoked when an existing delivery changes disposition
+ // or settlement state.
+ //
+ dx_container_delivery_handler_t disp_handler;
+
//
- dx_container_delivery_handler_t rx_handler;
- dx_container_delivery_handler_t tx_handler;
- dx_container_delivery_handler_t disp_handler;
- dx_container_link_handler_t incoming_handler;
- dx_container_link_handler_t outgoing_handler;
- dx_container_link_handler_t writable_handler;
- dx_container_link_detach_handler_t link_detach_handler;
+ // incoming_handler - Invoked when an attach for a new incoming link is received.
+ //
+ dx_container_link_handler_t incoming_handler;
+
+ //
+ // outgoing_handler - Invoked when an attach for a new outgoing link is received.
+ //
+ dx_container_link_handler_t outgoing_handler;
//
+ // writable_handler - Invoked when an outgoing link is available for sending either
+ // deliveries or disposition changes. The handler must check the
+ // link's credit to determine whether (and how many) message
+ // deliveries may be sent.
+ //
+ dx_container_link_handler_t writable_handler;
+
+ //
+ // link_detach_handler - Invoked when a link is detached.
+ //
+ dx_container_link_detach_handler_t link_detach_handler;
+
+ //===================
// Node-Type Handlers
+ //===================
+
+ //
+ // node_created_handler - Invoked when a new instance of the node-type is created.
//
dx_container_node_handler_t node_created_handler;
+
+ //
+ // node_destroyed_handler - Invoked when an instance of the node type is destroyed.
+ //
dx_container_node_handler_t node_destroyed_handler;
+
+ //
+ // inbound_conn_open_handler - Invoked when an incoming connection (via listener)
+ // is established.
+ //
dx_container_conn_handler_t inbound_conn_open_handler;
+
+ //
+ // outbound_conn_open_handler - Invoked when an outgoing connection (via connector)
+ // is established.
+ //
dx_container_conn_handler_t outbound_conn_open_handler;
} dx_node_type_t;
@@ -116,6 +162,26 @@ pn_terminus_t *dx_link_remote_target(dx_link_t *link);
void dx_link_activate(dx_link_t *link);
void dx_link_close(dx_link_t *link);
+/**
+ * Important: dx_delivery must never be called twice in a row without an intervening pn_link_advance.
+ * The Disatch architecture provides a hook for discovering when an outgoing link is writable
+ * and has credit. When a link is writable, a delivery is allocated, written, and advanced
+ * in one operation. If a backlog of pending deliveries is created, an assertion will be
+ * thrown.
+ */
+dx_delivery_t *dx_delivery(dx_link_t *link, pn_delivery_tag_t tag);
+void dx_delivery_free(dx_delivery_t *delivery, uint64_t final_disposition);
+void dx_delivery_set_peer(dx_delivery_t *delivery, dx_delivery_t *peer);
+dx_delivery_t *dx_delivery_peer(dx_delivery_t *delivery);
+void dx_delivery_set_context(dx_delivery_t *delivery, void *context);
+void *dx_delivery_context(dx_delivery_t *delivery);
+pn_delivery_t *dx_delivery_pn(dx_delivery_t *delivery);
+void dx_delivery_settle(dx_delivery_t *delivery);
+bool dx_delivery_settled(dx_delivery_t *delivery);
+bool dx_delivery_disp_changed(dx_delivery_t *delivery);
+uint64_t dx_delivery_disp(dx_delivery_t *delivery);
+dx_link_t *dx_delivery_link(dx_delivery_t *delivery);
+
typedef struct dx_link_item_t dx_link_item_t;
diff --git a/extras/dispatch/include/qpid/dispatch/message.h b/extras/dispatch/include/qpid/dispatch/message.h
index bc8f3f08b1..fa40314be9 100644
--- a/extras/dispatch/include/qpid/dispatch/message.h
+++ b/extras/dispatch/include/qpid/dispatch/message.h
@@ -19,12 +19,12 @@
* under the License.
*/
-#include <proton/engine.h>
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/alloc.h>
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/buffer.h>
#include <qpid/dispatch/compose.h>
+#include <qpid/dispatch/container.h>
// Callback for status change (confirmed persistent, loaded-in-memory, etc.)
@@ -97,16 +97,11 @@ dx_message_t *dx_message_copy(dx_message_t *qm);
int dx_message_persistent(dx_message_t *qm);
int dx_message_in_memory(dx_message_t *qm);
-void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery);
-pn_delivery_t *dx_message_out_delivery(dx_message_t *msg);
-void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery);
-pn_delivery_t *dx_message_in_delivery(dx_message_t *msg);
-
//
// Functions for received messages
//
-dx_message_t *dx_message_receive(pn_delivery_t *delivery);
-void dx_message_send(dx_message_t *msg, pn_link_t *link);
+dx_message_t *dx_message_receive(dx_delivery_t *delivery);
+void dx_message_send(dx_message_t *msg, dx_link_t *link);
int dx_message_check(dx_message_t *msg, dx_message_depth_t depth);
dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, dx_message_field_t field);
@@ -114,8 +109,6 @@ dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, dx_message_fie
ssize_t dx_message_field_length(dx_message_t *msg, dx_message_field_t field);
ssize_t dx_message_field_copy(dx_message_t *msg, dx_message_field_t field, void *buffer);
-pn_delivery_t *dx_message_inbound_delivery(dx_message_t *qm);
-
//
// Functions for composed messages
//
diff --git a/extras/dispatch/src/container.c b/extras/dispatch/src/container.c
index 0690874ab9..4a38cfc69c 100644
--- a/extras/dispatch/src/container.c
+++ b/extras/dispatch/src/container.c
@@ -47,6 +47,7 @@ ALLOC_DECLARE(dx_node_t);
ALLOC_DEFINE(dx_node_t);
ALLOC_DEFINE(dx_link_item_t);
+
struct dx_link_t {
pn_link_t *pn_link;
void *context;
@@ -56,6 +57,19 @@ struct dx_link_t {
ALLOC_DECLARE(dx_link_t);
ALLOC_DEFINE(dx_link_t);
+
+struct dx_delivery_t {
+ pn_delivery_t *pn_delivery;
+ dx_delivery_t *peer;
+ void *context;
+ uint64_t disposition;
+ dx_link_t *link;
+};
+
+ALLOC_DECLARE(dx_delivery_t);
+ALLOC_DEFINE(dx_delivery_t);
+
+
typedef struct dxc_node_type_t {
DEQ_LINKS(struct dxc_node_type_t);
const dx_node_type_t *ntype;
@@ -180,14 +194,25 @@ static int do_writable(pn_link_t *pn_link)
}
-static void process_receive(pn_delivery_t *delivery)
+static void do_receive(pn_delivery_t *pnd)
{
- pn_link_t *pn_link = pn_delivery_link(delivery);
- dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ pn_link_t *pn_link = pn_delivery_link(pnd);
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ dx_delivery_t *delivery = (dx_delivery_t*) pn_delivery_get_context(pnd);
if (link) {
dx_node_t *node = link->node;
if (node) {
+ if (!delivery) {
+ delivery = new_dx_delivery_t();
+ delivery->pn_delivery = pnd;
+ delivery->peer = 0;
+ delivery->context = 0;
+ delivery->disposition = 0;
+ delivery->link = link;
+ pn_delivery_set_context(pnd, delivery);
+ }
+
node->ntype->rx_handler(node->context, link, delivery);
return;
}
@@ -198,34 +223,18 @@ static void process_receive(pn_delivery_t *delivery)
//
pn_link_advance(pn_link);
pn_link_flow(pn_link, 1);
- pn_delivery_update(delivery, PN_REJECTED);
- pn_delivery_settle(delivery);
-}
-
-
-static void do_send(pn_delivery_t *delivery)
-{
- pn_link_t *pn_link = pn_delivery_link(delivery);
- dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
-
- if (link) {
- dx_node_t *node = link->node;
- if (node) {
- node->ntype->tx_handler(node->context, link, delivery);
- return;
- }
- }
-
- // TODO - Cancel the delivery
+ pn_delivery_update(pnd, PN_REJECTED);
+ pn_delivery_settle(pnd);
}
-static void do_updated(pn_delivery_t *delivery)
+static void do_updated(pn_delivery_t *pnd)
{
- pn_link_t *pn_link = pn_delivery_link(delivery);
- dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ pn_link_t *pn_link = pn_delivery_link(pnd);
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ dx_delivery_t *delivery = (dx_delivery_t*) pn_delivery_get_context(pnd);
- if (link) {
+ if (link && delivery) {
dx_node_t *node = link->node;
if (node)
node->ntype->disp_handler(node->context, link, delivery);
@@ -239,15 +248,15 @@ static int close_handler(void* unused, pn_connection_t *conn)
// Close all links, passing False as the 'closed' argument. These links are not
// being properly 'detached'. They are being orphaned.
//
- pn_link_t *pn_link = pn_link_head(conn, 0);
+ pn_link_t *pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE);
while (pn_link) {
dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
dx_node_t *node = link->node;
- if (node)
+ if (node && link)
node->ntype->link_detach_handler(node->context, link, 0);
pn_link_close(pn_link);
free_dx_link_t(link);
- pn_link = pn_link_next(pn_link, 0);
+ pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE);
}
// teardown all sessions
@@ -305,9 +314,7 @@ static int process_handler(dx_container_t *container, void* unused, pn_connectio
delivery = pn_work_head(conn);
while (delivery) {
if (pn_delivery_readable(delivery))
- process_receive(delivery);
- else if (pn_delivery_writable(delivery))
- do_send(delivery);
+ do_receive(delivery);
if (pn_delivery_updated(delivery)) {
do_updated(delivery);
@@ -325,7 +332,7 @@ static int process_handler(dx_container_t *container, void* unused, pn_connectio
pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
while (pn_link) {
assert(pn_session_connection(pn_link_session(pn_link)) == conn);
- if (pn_link_is_sender(pn_link) && pn_link_credit(pn_link) > 0)
+ if (pn_link_is_sender(pn_link))
event_count += do_writable(pn_link);
pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
}
@@ -701,3 +708,110 @@ void dx_link_close(dx_link_t *link)
}
+dx_delivery_t *dx_delivery(dx_link_t *link, pn_delivery_tag_t tag)
+{
+ pn_link_t *pnl = dx_link_pn(link);
+
+ //
+ // If there is a current delivery on this outgoing link, something
+ // is wrong with the delivey algorithm. We assume that the current
+ // delivery ('pnd' below) is the one created by pn_delivery. If it is
+ // not, then my understanding of how proton works is incorrect.
+ //
+ assert(!pn_link_current(pnl));
+
+ pn_delivery(pnl, tag);
+ pn_delivery_t *pnd = pn_link_current(pnl);
+
+ if (!pnd)
+ return 0;
+
+ dx_delivery_t *delivery = new_dx_delivery_t();
+ delivery->pn_delivery = pnd;
+ delivery->peer = 0;
+ delivery->context = 0;
+ delivery->disposition = 0;
+ delivery->link = link;
+ pn_delivery_set_context(pnd, delivery);
+
+ return delivery;
+}
+
+
+void dx_delivery_free(dx_delivery_t *delivery, uint64_t final_disposition)
+{
+ if (delivery->pn_delivery) {
+ if (final_disposition > 0)
+ pn_delivery_update(delivery->pn_delivery, final_disposition);
+ pn_delivery_set_context(delivery->pn_delivery, 0);
+ pn_delivery_settle(delivery->pn_delivery);
+ }
+ if (delivery->peer)
+ delivery->peer->peer = 0;
+ free_dx_delivery_t(delivery);
+}
+
+
+void dx_delivery_set_peer(dx_delivery_t *delivery, dx_delivery_t *peer)
+{
+ delivery->peer = peer;
+}
+
+
+void dx_delivery_set_context(dx_delivery_t *delivery, void *context)
+{
+ delivery->context = context;
+}
+
+
+void *dx_delivery_context(dx_delivery_t *delivery)
+{
+ return delivery->context;
+}
+
+
+dx_delivery_t *dx_delivery_peer(dx_delivery_t *delivery)
+{
+ return delivery->peer;
+}
+
+
+pn_delivery_t *dx_delivery_pn(dx_delivery_t *delivery)
+{
+ return delivery->pn_delivery;
+}
+
+
+void dx_delivery_settle(dx_delivery_t *delivery)
+{
+ if (delivery->pn_delivery) {
+ pn_delivery_settle(delivery->pn_delivery);
+ delivery->pn_delivery = 0;
+ }
+}
+
+
+bool dx_delivery_settled(dx_delivery_t *delivery)
+{
+ return pn_delivery_settled(delivery->pn_delivery);
+}
+
+
+bool dx_delivery_disp_changed(dx_delivery_t *delivery)
+{
+ return delivery->disposition != pn_delivery_remote_state(delivery->pn_delivery);
+}
+
+
+uint64_t dx_delivery_disp(dx_delivery_t *delivery)
+{
+ delivery->disposition = pn_delivery_remote_state(delivery->pn_delivery);
+ return delivery->disposition;
+}
+
+
+dx_link_t *dx_delivery_link(dx_delivery_t *delivery)
+{
+ return delivery->link;
+}
+
diff --git a/extras/dispatch/src/message.c b/extras/dispatch/src/message.c
index 6d75de76db..b6b8f688c7 100644
--- a/extras/dispatch/src/message.c
+++ b/extras/dispatch/src/message.c
@@ -343,8 +343,7 @@ dx_message_t *dx_allocate_message()
return 0;
DEQ_ITEM_INIT(msg);
- msg->content = new_dx_message_content_t();
- msg->out_delivery = 0;
+ msg->content = new_dx_message_content_t();
if (msg->content == 0) {
free_dx_message_t((dx_message_t*) msg);
@@ -397,8 +396,7 @@ dx_message_t *dx_message_copy(dx_message_t *in_msg)
return 0;
DEQ_ITEM_INIT(copy);
- copy->content = content;
- copy->out_delivery = 0;
+ copy->content = content;
sys_mutex_lock(content->lock);
content->ref_count++;
@@ -408,36 +406,11 @@ dx_message_t *dx_message_copy(dx_message_t *in_msg)
}
-void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery)
+dx_message_t *dx_message_receive(dx_delivery_t *delivery)
{
- ((dx_message_pvt_t*) msg)->out_delivery = delivery;
-}
-
-
-pn_delivery_t *dx_message_out_delivery(dx_message_t *msg)
-{
- return ((dx_message_pvt_t*) msg)->out_delivery;
-}
-
-
-void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery)
-{
- dx_message_content_t *content = MSG_CONTENT(msg);
- content->in_delivery = delivery;
-}
-
-
-pn_delivery_t *dx_message_in_delivery(dx_message_t *msg)
-{
- dx_message_content_t *content = MSG_CONTENT(msg);
- return content->in_delivery;
-}
-
-
-dx_message_t *dx_message_receive(pn_delivery_t *delivery)
-{
- pn_link_t *link = pn_delivery_link(delivery);
- dx_message_pvt_t *msg = (dx_message_pvt_t*) pn_delivery_get_context(delivery);
+ pn_delivery_t *pnd = dx_delivery_pn(delivery);
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) dx_delivery_context(delivery);
+ pn_link_t *link = pn_delivery_link(pnd);
ssize_t rc;
dx_buffer_t *buf;
@@ -448,15 +421,7 @@ dx_message_t *dx_message_receive(pn_delivery_t *delivery)
//
if (!msg) {
msg = (dx_message_pvt_t*) dx_allocate_message();
- pn_delivery_set_context(delivery, (void*) msg);
-
- //
- // Record the incoming delivery only if it is not settled. If it is
- // settled, it should not be recorded as no future operations on it are
- // permitted.
- //
- if (!pn_delivery_settled(delivery))
- msg->content->in_delivery = delivery;
+ dx_delivery_set_context(delivery, (void*) msg);
}
//
@@ -489,6 +454,7 @@ dx_message_t *dx_message_receive(pn_delivery_t *delivery)
DEQ_REMOVE_TAIL(msg->content->buffers);
dx_free_buffer(buf);
}
+ dx_delivery_set_context(delivery, 0);
return (dx_message_t*) msg;
}
@@ -520,14 +486,14 @@ dx_message_t *dx_message_receive(pn_delivery_t *delivery)
}
-void dx_message_send(dx_message_t *in_msg, pn_link_t *link)
+void dx_message_send(dx_message_t *in_msg, dx_link_t *link)
{
dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
dx_buffer_t *buf = DEQ_HEAD(msg->content->buffers);
// TODO - Handle cases where annotations have been added or modified
while (buf) {
- pn_link_send(link, (char*) dx_buffer_base(buf), dx_buffer_size(buf));
+ pn_link_send(dx_link_pn(link), (char*) dx_buffer_base(buf), dx_buffer_size(buf));
buf = DEQ_NEXT(buf);
}
}
diff --git a/extras/dispatch/src/message_private.h b/extras/dispatch/src/message_private.h
index ca39a84660..e5ba94f196 100644
--- a/extras/dispatch/src/message_private.h
+++ b/extras/dispatch/src/message_private.h
@@ -63,7 +63,6 @@ typedef struct {
sys_mutex_t *lock;
uint32_t ref_count; // The number of qmessages referencing this
dx_buffer_list_t buffers; // The buffer chain containing the message
- pn_delivery_t *in_delivery; // The delivery on which the message arrived
dx_field_location_t section_message_header; // The message header list
dx_field_location_t section_delivery_annotation; // The delivery annotation map
dx_field_location_t section_message_annotation; // The message annotation map
@@ -83,7 +82,6 @@ typedef struct {
typedef struct {
DEQ_LINKS(dx_message_t); // Deq linkage that overlays the dx_message_t
dx_message_content_t *content;
- pn_delivery_t *out_delivery;
} dx_message_pvt_t;
ALLOC_DECLARE(dx_message_t);
diff --git a/extras/dispatch/src/router_node.c b/extras/dispatch/src/router_node.c
index 9938ab113e..1c5e7c266c 100644
--- a/extras/dispatch/src/router_node.c
+++ b/extras/dispatch/src/router_node.c
@@ -62,21 +62,27 @@ typedef enum {
typedef struct dx_routed_event_t {
DEQ_LINKS(struct dx_routed_event_t);
- dx_message_t *message;
- bool settled;
- uint64_t disposition;
+ dx_delivery_t *delivery;
+ dx_message_t *message;
+ bool settle;
+ uint64_t disposition;
} dx_routed_event_t;
+ALLOC_DECLARE(dx_routed_event_t);
+ALLOC_DEFINE(dx_routed_event_t);
+DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t);
+
struct dx_router_link_t {
DEQ_LINKS(dx_router_link_t);
- dx_direction_t link_direction;
- dx_link_type_t link_type;
- dx_address_t *owning_addr; // [ref] Address record that owns this link
- dx_link_t *link; // [own] Link pointer
- dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link
- dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link
- dx_message_list_t out_fifo; // Message FIFO for outgoing messages. Unused for incoming links
+ dx_direction_t link_direction;
+ dx_link_type_t link_type;
+ dx_address_t *owning_addr; // [ref] Address record that owns this link
+ dx_link_t *link; // [own] Link pointer
+ dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link
+ dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link
+ dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages)
+ dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries
};
ALLOC_DECLARE(dx_router_link_t);
@@ -125,54 +131,128 @@ struct dx_router_t {
/**
- * Outbound Delivery Handler
+ * Outgoing Link Writable Handler
*/
-static void router_tx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+static int router_writable_link_handler(void* context, dx_link_t *link)
{
- dx_router_t *router = (dx_router_t*) context;
- pn_link_t *pn_link = pn_delivery_link(delivery);
- dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
- dx_message_t *msg;
- size_t size;
+ dx_router_t *router = (dx_router_t*) context;
+ dx_delivery_t *delivery;
+ dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+ pn_link_t *pn_link = dx_link_pn(link);
+ uint64_t tag;
+ int link_credit = pn_link_credit(pn_link);
+ dx_routed_event_list_t to_send;
+ dx_routed_event_list_t events;
+ dx_routed_event_t *re;
+ size_t offer;
+ int event_count = 0;
+
+ DEQ_INIT(to_send);
+ DEQ_INIT(events);
sys_mutex_lock(router->lock);
- msg = DEQ_HEAD(rlink->out_fifo);
- if (!msg) {
- // TODO - Recind the delivery
- sys_mutex_unlock(router->lock);
- return;
+
+ //
+ // Pull the non-delivery events into a local list so they can be processed without
+ // the lock being held.
+ //
+ re = DEQ_HEAD(rlink->event_fifo);
+ while (re) {
+ DEQ_REMOVE_HEAD(rlink->event_fifo);
+ DEQ_INSERT_TAIL(events, re);
+ re = DEQ_HEAD(rlink->event_fifo);
}
- DEQ_REMOVE_HEAD(rlink->out_fifo);
- size = (DEQ_SIZE(rlink->out_fifo));
+ //
+ // Under lock, move available deliveries from the msg_fifo to the local to_send
+ // list. Don't move more than we have credit to send.
+ //
+ if (link_credit > 0) {
+ tag = router->dtag;
+ re = DEQ_HEAD(rlink->msg_fifo);
+ while (re) {
+ DEQ_REMOVE_HEAD(rlink->msg_fifo);
+ DEQ_INSERT_TAIL(to_send, re);
+ if (DEQ_SIZE(to_send) == link_credit)
+ break;
+ re = DEQ_HEAD(rlink->msg_fifo);
+ }
+ router->dtag += DEQ_SIZE(to_send);
+ }
+
+ offer = DEQ_SIZE(rlink->msg_fifo);
sys_mutex_unlock(router->lock);
- dx_message_send(msg, pn_link);
+ //
+ // Deliver all the to_send messages downrange
+ //
+ re = DEQ_HEAD(to_send);
+ while (re) {
+ DEQ_REMOVE_HEAD(to_send);
+
+ //
+ // Get a delivery for the send. This will be the current deliver on the link.
+ //
+ tag++;
+ delivery = dx_delivery(link, pn_dtag((char*) &tag, 8));
+
+ //
+ // Send the message
+ //
+ dx_message_send(re->message, link);
+
+ //
+ // If there is an incoming delivery associated with this message, link it
+ // with the outgoing delivery. Otherwise, the message arrived pre-settled
+ // and should be sent presettled.
+ //
+ if (re->delivery) {
+ dx_delivery_set_peer(re->delivery, delivery);
+ dx_delivery_set_peer(delivery, re->delivery);
+ } else
+ dx_delivery_free(delivery, 0); // settle and free
+
+ pn_link_advance(pn_link);
+ event_count++;
+
+ dx_free_message(re->message);
+ free_dx_routed_event_t(re);
+ re = DEQ_HEAD(to_send);
+ }
//
- // If there is no incoming delivery, it was pre-settled. In this case,
- // we must pre-settle the outgoing delivery as well.
+ // Process the non-delivery events.
//
- if (dx_message_in_delivery(msg)) {
- pn_delivery_set_context(delivery, (void*) msg);
- dx_message_set_out_delivery(msg, delivery);
- } else {
- pn_delivery_settle(delivery);
- dx_free_message(msg);
+ re = DEQ_HEAD(events);
+ while (re) {
+ DEQ_REMOVE_HEAD(events);
+
+ if (re->delivery) {
+ if (re->disposition)
+ pn_delivery_update(dx_delivery_pn(re->delivery), re->disposition);
+ if (re->settle)
+ dx_delivery_free(re->delivery, 0);
+ }
+
+ free_dx_routed_event_t(re);
+ re = DEQ_HEAD(events);
}
- pn_link_advance(pn_link);
- pn_link_offered(pn_link, size);
+ //
+ // Set the offer to the number of messages remaining to be sent.
+ //
+ pn_link_offered(pn_link, offer);
+ return event_count;
}
/**
* Inbound Delivery Handler
*/
-static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *delivery)
{
dx_router_t *router = (dx_router_t*) context;
- pn_link_t *pn_link = pn_delivery_link(delivery);
+ pn_link_t *pn_link = dx_link_pn(link);
dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
dx_message_t *msg;
int valid_message = 0;
@@ -202,15 +282,27 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del
// the message in this case.
//
if (rlink->connected_link) {
- dx_router_link_t *clink = rlink->connected_link;
- pn_link_t *pn_outlink = dx_link_pn(clink->link);
- DEQ_INSERT_TAIL(clink->out_fifo, msg);
- sys_mutex_unlock(router->lock);
+ dx_router_link_t *clink = rlink->connected_link;
+ dx_routed_event_t *re = new_dx_routed_event_t();
- pn_link_offered(pn_outlink, DEQ_SIZE(clink->out_fifo));
- dx_link_activate(clink->link);
- sys_mutex_unlock(router->lock);
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = msg;
+ re->settle = false;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(clink->msg_fifo, re);
+ //
+ // If the incoming delivery is settled (pre-settled), don't link it into the routed
+ // event. If it's not settled, link it into the event for later handling.
+ //
+ if (dx_delivery_settled(delivery))
+ dx_delivery_free(delivery, 0);
+ else
+ re->delivery = delivery;
+
+ sys_mutex_unlock(router->lock);
+ dx_link_activate(clink->link);
return;
}
@@ -227,6 +319,8 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del
if (valid_message) {
dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO);
dx_address_t *addr;
+ int fanout = 0;
+
if (iter) {
dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
hash_retrieve(router->out_hash, iter, (void*) &addr);
@@ -261,10 +355,18 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del
//
dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
while (dest_link) {
- pn_link_t *pn_outlink = dx_link_pn(dest_link->link);
- dx_message_t *copy = dx_message_copy(msg);
- DEQ_INSERT_TAIL(dest_link->out_fifo, copy);
- pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo));
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+ fanout++;
+ if (fanout == 1 && !dx_delivery_settled(delivery))
+ re->delivery = delivery;
+
dx_link_activate(dest_link->link);
dest_link = DEQ_NEXT(dest_link);
}
@@ -279,39 +381,41 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del
else
dest_link = dest_node->peer_link;
if (dest_link) {
- pn_link_t *pn_outlink = dx_link_pn(dest_link->link);
- dx_message_t *copy = dx_message_copy(msg);
- DEQ_INSERT_TAIL(dest_link->out_fifo, copy);
- pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo));
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+ fanout++;
+ if (fanout == 1)
+ re->delivery = delivery;
+
dx_link_activate(dest_link->link);
}
dest_node = DEQ_NEXT(dest_node);
}
}
-
- } else {
- //
- // To field contains an unknown address. Release the message.
- //
- // TODO - Undeliverable processing
- pn_delivery_update(delivery, PN_RELEASED);
- pn_delivery_settle(delivery);
}
//
- // Since we are message-routing, there is no end-to-end disposition or
- // settlement. Accept and settle the delivery now.
+ // In message-routing mode, the handling of the incoming delivery depends on the
+ // number of copies of the received message that were forwarded.
//
- pn_delivery_update(delivery, PN_ACCEPTED);
- pn_delivery_settle(delivery);
+ if (handler) {
+ dx_delivery_free(delivery, PN_ACCEPTED);
+ } else if (fanout == 0) {
+ dx_delivery_free(delivery, PN_RELEASED);
+ } else if (fanout > 1)
+ dx_delivery_free(delivery, PN_ACCEPTED);
}
} else {
//
// Message is invalid. Reject the message.
//
- pn_delivery_update(delivery, PN_REJECTED);
- pn_delivery_settle(delivery);
- pn_delivery_set_context(delivery, 0);
+ dx_delivery_free(delivery, PN_REJECTED);
}
sys_mutex_unlock(router->lock);
@@ -328,54 +432,41 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del
/**
* Delivery Disposition Handler
*/
-static void router_disp_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+static void router_disp_handler(void* context, dx_link_t *link, dx_delivery_t *delivery)
{
- pn_link_t *pn_link = pn_delivery_link(delivery);
- //dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
-
- //
- // TODO - Propagate disposition and settlement between deliveries on a link-routed
- // link pair.
- //
- return;
-
- if (pn_link_is_sender(pn_link)) {
- uint64_t disp = pn_delivery_remote_state(delivery);
- dx_message_t *msg = pn_delivery_get_context(delivery);
- pn_delivery_t *activate = 0;
-
- if (msg) {
- assert(delivery == dx_message_out_delivery(msg));
- if (disp != 0) {
- activate = dx_message_in_delivery(msg);
- pn_delivery_update(activate, disp);
- // TODO - handling of the data accompanying RECEIVED/MODIFIED
- }
-
- if (pn_delivery_settled(delivery)) {
- //
- // Downstream delivery has been settled. Propagate the settlement
- // upstream.
- //
- activate = dx_message_in_delivery(msg);
- pn_delivery_settle(activate);
- pn_delivery_settle(delivery);
- dx_free_message(msg);
- }
+ dx_router_t *router = (dx_router_t*) context;
+ bool changed = dx_delivery_disp_changed(delivery);
+ uint64_t disp = dx_delivery_disp(delivery);
+ bool settled = dx_delivery_settled(delivery);
+ dx_delivery_t *peer = dx_delivery_peer(delivery);
- if (activate) {
- //
- // Activate the upstream/incoming link so that the settlement will
- // get pushed out.
- //
- dx_link_t *act_link = (dx_link_t*) pn_link_get_context(pn_delivery_link(activate));
- dx_link_activate(act_link);
- }
-
- return;
+ if (peer) {
+ //
+ // The case where this delivery has a peer.
+ //
+ if (changed || settled) {
+ dx_link_t *peer_link = dx_delivery_link(peer);
+ dx_router_link_t *prl = (dx_router_link_t*) dx_link_get_context(peer_link);
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = peer;
+ re->message = 0;
+ re->settle = settled;
+ re->disposition = changed ? disp : 0;
+
+ sys_mutex_lock(router->lock);
+ DEQ_INSERT_TAIL(prl->event_fifo, re);
+ sys_mutex_unlock(router->lock);
+
+ dx_link_activate(peer_link);
}
+
} else {
- // TODO - Handle disposition updates from upstream
+ //
+ // The no-peer case. Ignore status changes and echo settlement.
+ //
+ if (settled)
+ dx_delivery_free(delivery, 0);
}
}
@@ -396,7 +487,8 @@ static int router_incoming_link_handler(void* context, dx_link_t *link)
rlink->link = link;
rlink->connected_link = 0;
rlink->peer_link = 0;
- DEQ_INIT(rlink->out_fifo); // Won't be used
+ DEQ_INIT(rlink->event_fifo);
+ DEQ_INIT(rlink->msg_fifo);
dx_link_set_context(link, rlink);
@@ -443,7 +535,8 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
rlink->link = link;
rlink->connected_link = 0;
rlink->peer_link = 0;
- DEQ_INIT(rlink->out_fifo);
+ DEQ_INIT(rlink->event_fifo);
+ DEQ_INIT(rlink->msg_fifo);
dx_link_set_context(link, rlink);
@@ -475,38 +568,6 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
/**
- * Outgoing Link Writable Handler
- */
-static int router_writable_link_handler(void* context, dx_link_t *link)
-{
- dx_router_t *router = (dx_router_t*) context;
- int grant_delivery = 0;
- pn_delivery_t *delivery;
- dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
- pn_link_t *pn_link = dx_link_pn(link);
- uint64_t tag;
-
- sys_mutex_lock(router->lock);
- if (DEQ_SIZE(rlink->out_fifo) > 0) {
- grant_delivery = 1;
- tag = router->dtag++;
- }
- sys_mutex_unlock(router->lock);
-
- if (grant_delivery) {
- pn_delivery(pn_link, pn_dtag((char*) &tag, 8));
- delivery = pn_link_current(pn_link);
- if (delivery) {
- router_tx_handler(context, link, delivery);
- return 1;
- }
- }
-
- return 0;
-}
-
-
-/**
* Link Detached Handler
*/
static int router_link_detach_handler(void* context, dx_link_t *link, int closed)
@@ -539,8 +600,10 @@ static int router_link_detach_handler(void* context, dx_link_t *link, int closed
dx_field_iterator_free(iter);
}
}
- } else
+ } else {
DEQ_REMOVE(router->in_links, rlink);
+ free_dx_router_link_t(rlink);
+ }
sys_mutex_unlock(router->lock);
return 0;
@@ -580,7 +643,8 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co
rlink->link = receiver;
rlink->connected_link = 0;
rlink->peer_link = 0;
- DEQ_INIT(rlink->out_fifo); // Won't be used
+ DEQ_INIT(rlink->event_fifo);
+ DEQ_INIT(rlink->msg_fifo);
dx_link_set_context(receiver, rlink);
@@ -604,7 +668,8 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co
rlink->link = sender;
rlink->connected_link = 0;
rlink->peer_link = 0;
- DEQ_INIT(rlink->out_fifo);
+ DEQ_INIT(rlink->event_fifo);
+ DEQ_INIT(rlink->msg_fifo);
dx_link_set_context(sender, rlink);
@@ -647,7 +712,6 @@ static void dx_router_timer_handler(void *context)
static dx_node_type_t router_node = {"router", 0, 0,
router_rx_handler,
- router_tx_handler,
router_disp_handler,
router_incoming_link_handler,
router_outgoing_link_handler,
@@ -779,10 +843,14 @@ void dx_router_send(dx_dispatch_t *dx,
//
dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
while (dest_link) {
- pn_link_t *pn_outlink = dx_link_pn(dest_link->link);
- dx_message_t *copy = dx_message_copy(msg);
- DEQ_INSERT_TAIL(dest_link->out_fifo, copy);
- pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo));
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
dx_link_activate(dest_link->link);
dest_link = DEQ_NEXT(dest_link);
}
@@ -797,10 +865,13 @@ void dx_router_send(dx_dispatch_t *dx,
else
dest_link = dest_node->peer_link;
if (dest_link) {
- pn_link_t *pn_outlink = dx_link_pn(dest_link->link);
- dx_message_t *copy = dx_message_copy(msg);
- DEQ_INSERT_TAIL(dest_link->out_fifo, copy);
- pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo));
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
dx_link_activate(dest_link->link);
}
dest_node = DEQ_NEXT(dest_node);