summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-01-15 09:39:38 +0000
committerBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-01-15 09:39:38 +0000
commit6f5675f4963b6661048d19036ccd3d0e789d36cf (patch)
tree6c729977451b54d1d595018a61db2941118a792b /java
parentbd98b98b0dc2f15563a280967f9cd907bc7aa7c4 (diff)
downloadqpid-python-6f5675f4963b6661048d19036ccd3d0e789d36cf.tar.gz
checking for the AMQ MessageID of received message in ServiceRequestingClient.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@496260 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java14
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java49
-rw-r--r--java/perftests/src/main/java/perftests.log4j2
3 files changed, 33 insertions, 32 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
index 00528c3a5d..bab732e2a6 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
@@ -35,7 +35,7 @@ import java.net.UnknownHostException;
public class ServiceProvidingClient
{
private static final Logger _logger = Logger.getLogger(ServiceProvidingClient.class);
- private static final String MESSAGE_IDENTIFIER = "MessageIdentifier";
+
private MessageProducer _destinationProducer;
private Destination _responseDest;
@@ -57,8 +57,7 @@ public class ServiceProvidingClient
_logger.info("Delivery Mode: " + (deliveryMode == DeliveryMode.NON_PERSISTENT ? "Non Persistent" : "Persistent")
+ "\t isTransactional: " + _isTransactional);
- _connection = new AMQConnection(brokerDetails, username, password,
- clientName, virtualPath);
+ _connection = new AMQConnection(brokerDetails, username, password, clientName, virtualPath);
_connection.setConnectionListener(new ConnectionListener()
{
@@ -145,11 +144,6 @@ public class ServiceProvidingClient
_logger.info("timeSent value is: " + timesent);
msg.setLongProperty("timeSent", timesent);
}
- // this identifier set in the serviceRequestingClient is used to match the response with the request
- if (tm.propertyExists(MESSAGE_IDENTIFIER))
- {
- msg.setIntProperty(MESSAGE_IDENTIFIER, tm.getIntProperty(MESSAGE_IDENTIFIER));
- }
_destinationProducer.send(msg);
@@ -200,7 +194,6 @@ public class ServiceProvidingClient
_logger.error("Error: " + e, e);
}
-
int deliveryMode = DeliveryMode.NON_PERSISTENT;
boolean transactedMode = false;
@@ -237,9 +230,6 @@ public class ServiceProvidingClient
{
_logger.error("Error: " + e, e);
}
-
-
}
-
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
index 2c1c0ecff6..7d4b5eb24f 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
@@ -26,6 +26,7 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.message.TestMessageFactory;
+import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
@@ -47,8 +48,7 @@ public class ServiceRequestingClient implements ExceptionListener
{
private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class);
- private static final String MESSAGE_IDENTIFIER = "MessageIdentifier";
- private static int _messageIdentifier = 0;
+ private long _messageIdentifier = 0;
private String MESSAGE_DATA;
private AMQConnection _connection;
@@ -108,10 +108,6 @@ public class ServiceRequestingClient implements ExceptionListener
_log.info("Average latency now: " + _averageLatency);
}
}
- if (m.propertyExists(MESSAGE_IDENTIFIER))
- {
- _log.info("Received Message Identifier: " + m.getIntProperty(MESSAGE_IDENTIFIER));
- }
if(_isTransactional)
{
_session.commit();
@@ -127,6 +123,7 @@ public class ServiceRequestingClient implements ExceptionListener
_log.info("Received message count: " + _actualMessageCount);
}
+ checkForMessageID(m);
if (_actualMessageCount == _expectedMessageCount)
{
_completed = true;
@@ -149,6 +146,30 @@ public class ServiceRequestingClient implements ExceptionListener
}
}
+ /**
+ * Checks if the received AMQ Message ID(delivery tag) is in sequence, by comparing it with the AMQ MessageID
+ * of previous message.
+ * @param receivedMsg
+ */
+ private void checkForMessageID(Message receivedMsg)
+ {
+ try
+ {
+ JMSTextMessage msg = (JMSTextMessage)receivedMsg;
+ if (! (msg.getDeliveryTag() == _messageIdentifier + 1))
+ {
+ _log.info("Out of sequence message received. Previous AMQ MessageID= " + _messageIdentifier +
+ ", Received AMQ messageID= " + receivedMsg.getJMSMessageID());
+ }
+ _messageIdentifier = msg.getDeliveryTag();
+ }
+ catch (Exception ex)
+ {
+ _log.error("Error in checking messageID ", ex);
+ }
+
+ }
+
private void notifyWaiter()
{
if (_waiter != null)
@@ -178,10 +199,8 @@ public class ServiceRequestingClient implements ExceptionListener
_session = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
_producerSession = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
-
_connection.setExceptionListener(this);
-
AMQQueue destination = new AMQQueue(commandQueueName);
_producer = (MessageProducer) _producerSession.createProducer(destination);
_producer.setDisableMessageTimestamp(true);
@@ -195,7 +214,7 @@ public class ServiceRequestingClient implements ExceptionListener
//Send first message, then wait a bit to allow the provider to get initialised
TextMessage first = _session.createTextMessage(MESSAGE_DATA);
first.setJMSReplyTo(_tempDestination);
- send(first);
+ _producer.send(first);
if (_isTransactional)
{
_producerSession.commit();
@@ -219,13 +238,6 @@ public class ServiceRequestingClient implements ExceptionListener
}
}
- private void send(TextMessage msg) throws JMSException
- {
- msg.setIntProperty(MESSAGE_IDENTIFIER, ++_messageIdentifier);
- _producer.send(msg);
- _log.info("Sent Message Identifier: " + _messageIdentifier);
- }
-
/**
* Run the test and notify an object upon receipt of all responses.
*
@@ -245,7 +257,7 @@ public class ServiceRequestingClient implements ExceptionListener
long timeNow = System.currentTimeMillis();
msg.setLongProperty("timeSent", timeNow);
}
- send(msg);
+ _producer.send(msg);
if (_isTransactional)
{
_producerSession.commit();
@@ -263,8 +275,7 @@ public class ServiceRequestingClient implements ExceptionListener
private void createConnection(String brokerHosts, String clientID, String username, String password,
String vpath) throws AMQException, URLSyntaxException
{
- _connection = new AMQConnection(brokerHosts, username, password,
- clientID, vpath);
+ _connection = new AMQConnection(brokerHosts, username, password, clientID, vpath);
}
/**
diff --git a/java/perftests/src/main/java/perftests.log4j b/java/perftests/src/main/java/perftests.log4j
index d5196b4c79..f28c647e24 100644
--- a/java/perftests/src/main/java/perftests.log4j
+++ b/java/perftests/src/main/java/perftests.log4j
@@ -22,7 +22,7 @@ log4j.rootLogger=${root.logging.level}
log4j.logger.org.apache.qpid=${amqj.logging.level}, console
log4j.additivity.org.apache.qpid=false
-log4j.logger.org.apache.qpid.requestreply=${amqj.test.logging.level}, fileApp
+log4j.logger.org.apache.qpid.requestreply=${amqj.test.logging.level}
log4j.logger.org.apache.qpid.pingpong=${amqj.test.logging.level}
log4j.logger.org.apache.qpid.topic=${amqj.test.logging.level}