summaryrefslogtreecommitdiff
path: root/extras
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-10-08 20:33:27 +0000
committerTed Ross <tross@apache.org>2013-10-08 20:33:27 +0000
commite730b6b64b77365702cb6ebf81d94e27142da182 (patch)
tree5e83a84564821a6186461221438ef1cf034ad211 /extras
parent81b2d98bdd80db979df3757591c5f5c2a8c4fbf7 (diff)
downloadqpid-python-e730b6b64b77365702cb6ebf81d94e27142da182.tar.gz
QPID-5218 - Fixed crash caused by fanned-out non-presettled messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1530415 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'extras')
-rw-r--r--extras/dispatch/src/router_node.c5
-rw-r--r--extras/dispatch/tests/system_tests_one_router.py59
2 files changed, 61 insertions, 3 deletions
diff --git a/extras/dispatch/src/router_node.c b/extras/dispatch/src/router_node.c
index de69e21605..8efbf6a2af 100644
--- a/extras/dispatch/src/router_node.c
+++ b/extras/dispatch/src/router_node.c
@@ -547,7 +547,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
fanout++;
- if (fanout == 1)
+ if (fanout == 1 && !dx_delivery_settled(delivery))
re->delivery = delivery;
addr->deliveries_transit++;
@@ -569,8 +569,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
dx_delivery_free(delivery, PN_ACCEPTED);
} else if (fanout == 0) {
dx_delivery_free(delivery, PN_RELEASED);
- } else if (fanout > 1)
- dx_delivery_free(delivery, PN_ACCEPTED);
+ }
}
} else {
//
diff --git a/extras/dispatch/tests/system_tests_one_router.py b/extras/dispatch/tests/system_tests_one_router.py
index 8d486e2b85..28ad9a98d7 100644
--- a/extras/dispatch/tests/system_tests_one_router.py
+++ b/extras/dispatch/tests/system_tests_one_router.py
@@ -138,6 +138,65 @@ class RouterTest(unittest.TestCase):
M4.stop()
+ def test_2a_multicast_unsettled(self):
+ addr = "amqp://0.0.0.0:20000/pre_settled/multicast/1"
+ M1 = Messenger()
+ M2 = Messenger()
+ M3 = Messenger()
+ M4 = Messenger()
+
+ M1.timeout = 1.0
+ M2.timeout = 1.0
+ M3.timeout = 1.0
+ M4.timeout = 1.0
+
+ M1.outgoing_window = 5
+ M2.incoming_window = 5
+ M3.incoming_window = 5
+ M4.incoming_window = 5
+
+ M1.start()
+ M2.start()
+ M3.start()
+ M4.start()
+ self.subscribe(M2, addr)
+ self.subscribe(M3, addr)
+ self.subscribe(M4, addr)
+
+ tm = Message()
+ rm = Message()
+
+ tm.address = addr
+ for i in range(2):
+ tm.body = {'number': i}
+ M1.put(tm)
+ M1.send(0)
+
+ for i in range(2):
+ M2.recv(1)
+ trk = M2.get(rm)
+ M2.accept(trk)
+ M2.settle(trk)
+ self.assertEqual(i, rm.body['number'])
+
+ M3.recv(1)
+ trk = M3.get(rm)
+ M3.accept(trk)
+ M3.settle(trk)
+ self.assertEqual(i, rm.body['number'])
+
+ M4.recv(1)
+ trk = M4.get(rm)
+ M4.accept(trk)
+ M4.settle(trk)
+ self.assertEqual(i, rm.body['number'])
+
+ M1.stop()
+ M2.stop()
+ M3.stop()
+ M4.stop()
+
+
def test_3_propagated_disposition(self):
addr = "amqp://0.0.0.0:20000/unsettled/1"
M1 = Messenger()