diff options
author | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-01-15 09:39:38 +0000 |
---|---|---|
committer | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-01-15 09:39:38 +0000 |
commit | 6f5675f4963b6661048d19036ccd3d0e789d36cf (patch) | |
tree | 6c729977451b54d1d595018a61db2941118a792b /java | |
parent | bd98b98b0dc2f15563a280967f9cd907bc7aa7c4 (diff) | |
download | qpid-python-6f5675f4963b6661048d19036ccd3d0e789d36cf.tar.gz |
checking for the AMQ MessageID of received message in ServiceRequestingClient.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@496260 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
3 files changed, 33 insertions, 32 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java index 00528c3a5d..bab732e2a6 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java @@ -35,7 +35,7 @@ import java.net.UnknownHostException; public class ServiceProvidingClient { private static final Logger _logger = Logger.getLogger(ServiceProvidingClient.class); - private static final String MESSAGE_IDENTIFIER = "MessageIdentifier"; + private MessageProducer _destinationProducer; private Destination _responseDest; @@ -57,8 +57,7 @@ public class ServiceProvidingClient _logger.info("Delivery Mode: " + (deliveryMode == DeliveryMode.NON_PERSISTENT ? "Non Persistent" : "Persistent") + "\t isTransactional: " + _isTransactional); - _connection = new AMQConnection(brokerDetails, username, password, - clientName, virtualPath); + _connection = new AMQConnection(brokerDetails, username, password, clientName, virtualPath); _connection.setConnectionListener(new ConnectionListener() { @@ -145,11 +144,6 @@ public class ServiceProvidingClient _logger.info("timeSent value is: " + timesent); msg.setLongProperty("timeSent", timesent); } - // this identifier set in the serviceRequestingClient is used to match the response with the request - if (tm.propertyExists(MESSAGE_IDENTIFIER)) - { - msg.setIntProperty(MESSAGE_IDENTIFIER, tm.getIntProperty(MESSAGE_IDENTIFIER)); - } _destinationProducer.send(msg); @@ -200,7 +194,6 @@ public class ServiceProvidingClient _logger.error("Error: " + e, e); } - int deliveryMode = DeliveryMode.NON_PERSISTENT; boolean transactedMode = false; @@ -237,9 +230,6 @@ public class ServiceProvidingClient { _logger.error("Error: " + e, e); } - - } - } diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java index 2c1c0ecff6..7d4b5eb24f 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java @@ -26,6 +26,7 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.message.TestMessageFactory; +import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.MessageProducer; import org.apache.qpid.jms.Session; @@ -47,8 +48,7 @@ public class ServiceRequestingClient implements ExceptionListener { private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class); - private static final String MESSAGE_IDENTIFIER = "MessageIdentifier"; - private static int _messageIdentifier = 0; + private long _messageIdentifier = 0; private String MESSAGE_DATA; private AMQConnection _connection; @@ -108,10 +108,6 @@ public class ServiceRequestingClient implements ExceptionListener _log.info("Average latency now: " + _averageLatency); } } - if (m.propertyExists(MESSAGE_IDENTIFIER)) - { - _log.info("Received Message Identifier: " + m.getIntProperty(MESSAGE_IDENTIFIER)); - } if(_isTransactional) { _session.commit(); @@ -127,6 +123,7 @@ public class ServiceRequestingClient implements ExceptionListener _log.info("Received message count: " + _actualMessageCount); } + checkForMessageID(m); if (_actualMessageCount == _expectedMessageCount) { _completed = true; @@ -149,6 +146,30 @@ public class ServiceRequestingClient implements ExceptionListener } } + /** + * Checks if the received AMQ Message ID(delivery tag) is in sequence, by comparing it with the AMQ MessageID + * of previous message. + * @param receivedMsg + */ + private void checkForMessageID(Message receivedMsg) + { + try + { + JMSTextMessage msg = (JMSTextMessage)receivedMsg; + if (! (msg.getDeliveryTag() == _messageIdentifier + 1)) + { + _log.info("Out of sequence message received. Previous AMQ MessageID= " + _messageIdentifier + + ", Received AMQ messageID= " + receivedMsg.getJMSMessageID()); + } + _messageIdentifier = msg.getDeliveryTag(); + } + catch (Exception ex) + { + _log.error("Error in checking messageID ", ex); + } + + } + private void notifyWaiter() { if (_waiter != null) @@ -178,10 +199,8 @@ public class ServiceRequestingClient implements ExceptionListener _session = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE); _producerSession = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE); - _connection.setExceptionListener(this); - AMQQueue destination = new AMQQueue(commandQueueName); _producer = (MessageProducer) _producerSession.createProducer(destination); _producer.setDisableMessageTimestamp(true); @@ -195,7 +214,7 @@ public class ServiceRequestingClient implements ExceptionListener //Send first message, then wait a bit to allow the provider to get initialised TextMessage first = _session.createTextMessage(MESSAGE_DATA); first.setJMSReplyTo(_tempDestination); - send(first); + _producer.send(first); if (_isTransactional) { _producerSession.commit(); @@ -219,13 +238,6 @@ public class ServiceRequestingClient implements ExceptionListener } } - private void send(TextMessage msg) throws JMSException - { - msg.setIntProperty(MESSAGE_IDENTIFIER, ++_messageIdentifier); - _producer.send(msg); - _log.info("Sent Message Identifier: " + _messageIdentifier); - } - /** * Run the test and notify an object upon receipt of all responses. * @@ -245,7 +257,7 @@ public class ServiceRequestingClient implements ExceptionListener long timeNow = System.currentTimeMillis(); msg.setLongProperty("timeSent", timeNow); } - send(msg); + _producer.send(msg); if (_isTransactional) { _producerSession.commit(); @@ -263,8 +275,7 @@ public class ServiceRequestingClient implements ExceptionListener private void createConnection(String brokerHosts, String clientID, String username, String password, String vpath) throws AMQException, URLSyntaxException { - _connection = new AMQConnection(brokerHosts, username, password, - clientID, vpath); + _connection = new AMQConnection(brokerHosts, username, password, clientID, vpath); } /** diff --git a/java/perftests/src/main/java/perftests.log4j b/java/perftests/src/main/java/perftests.log4j index d5196b4c79..f28c647e24 100644 --- a/java/perftests/src/main/java/perftests.log4j +++ b/java/perftests/src/main/java/perftests.log4j @@ -22,7 +22,7 @@ log4j.rootLogger=${root.logging.level} log4j.logger.org.apache.qpid=${amqj.logging.level}, console
log4j.additivity.org.apache.qpid=false
-log4j.logger.org.apache.qpid.requestreply=${amqj.test.logging.level}, fileApp
+log4j.logger.org.apache.qpid.requestreply=${amqj.test.logging.level}
log4j.logger.org.apache.qpid.pingpong=${amqj.test.logging.level}
log4j.logger.org.apache.qpid.topic=${amqj.test.logging.level}
|