summaryrefslogtreecommitdiff
path: root/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java')
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java208
1 files changed, 208 insertions, 0 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java
new file mode 100644
index 0000000000..1d2d862301
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.naming.NamingException;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.qpid.disttest.DistributedTestException;
+import org.apache.qpid.disttest.Visitor;
+import org.apache.qpid.disttest.jms.ClientJmsDelegate;
+import org.apache.qpid.disttest.message.Command;
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.apache.qpid.disttest.message.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Client
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(Client.class);
+
+ private final ClientJmsDelegate _clientJmsDelegate;
+
+ private final CountDownLatch _latch = new CountDownLatch(1);
+ private Visitor _visitor;
+ private final AtomicReference<ClientState> _state;
+ private ParticipantExecutorRegistry _participantRegistry = new ParticipantExecutorRegistry();
+
+ public Client(final ClientJmsDelegate delegate) throws NamingException
+ {
+ _clientJmsDelegate = delegate;
+ _state = new AtomicReference<ClientState>(ClientState.CREATED);
+ _visitor = new ClientCommandVisitor(this, _clientJmsDelegate);
+ }
+
+ /**
+ * Register with the controller
+ */
+ public void start()
+ {
+ _clientJmsDelegate.setInstructionListener(this);
+ _clientJmsDelegate.sendRegistrationMessage();
+ _state.set(ClientState.READY);
+ }
+
+ public void stop()
+ {
+ _state.set(ClientState.STOPPED);
+ _latch.countDown();
+ }
+
+ public void addParticipantExecutor(final ParticipantExecutor participant)
+ {
+ _participantRegistry.add(participant);
+ }
+
+ public void waitUntilStopped()
+ {
+ waitUntilStopped(0);
+ }
+
+ public void waitUntilStopped(final long timeout)
+ {
+ try
+ {
+ if (timeout == 0)
+ {
+ _latch.await();
+ }
+ else
+ {
+ _latch.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ }
+ catch (final InterruptedException ie)
+ {
+ Thread.currentThread().interrupt();
+ }
+
+ _clientJmsDelegate.destroy();
+ }
+
+ public void processInstruction(final Command command)
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Client " + getClientName() + " received command: " + command);
+ }
+ String responseMessage = null;
+ try
+ {
+ command.accept(_visitor);
+ }
+ catch (final Exception e)
+ {
+ LOGGER.error("Error processing instruction", e);
+ responseMessage = e.getMessage();
+ }
+ finally
+ {
+ _clientJmsDelegate.sendResponseMessage(new Response(_clientJmsDelegate.getClientName(), command.getType(), responseMessage));
+ }
+ }
+
+ public ClientState getState()
+ {
+ return _state.get();
+ }
+
+ public String getClientName()
+ {
+ return _clientJmsDelegate.getClientName();
+ }
+
+ public void setClientCommandVisitor(final ClientCommandVisitor visitor)
+ {
+ _visitor = visitor;
+ }
+
+ public void startTest()
+ {
+ if (_state.compareAndSet(ClientState.READY, ClientState.RUNNING_TEST))
+ {
+ try
+ {
+ _clientJmsDelegate.startConnections();
+ for (final ParticipantExecutor executor : _participantRegistry.executors())
+ {
+ executor.start(this);
+ }
+ }
+ catch (final Exception e)
+ {
+ try
+ {
+ tearDownTest();
+ }
+ catch (final Exception e2)
+ {
+ // ignore
+ }
+ throw new DistributedTestException("Error starting test: " + _clientJmsDelegate.getClientName(), e);
+ }
+ }
+ else
+ {
+ throw new DistributedTestException("Client '" + _clientJmsDelegate.getClientName()
+ + "' is not in READY state:" + _state.get());
+ }
+ }
+
+ public void tearDownTest()
+ {
+ if (_state.compareAndSet(ClientState.RUNNING_TEST, ClientState.READY))
+ {
+ LOGGER.info("Tearing down test on client: " + _clientJmsDelegate.getClientName());
+
+ _clientJmsDelegate.closeTestConnections();
+ }
+ else
+ {
+ throw new DistributedTestException("Client '" + _clientJmsDelegate.getClientName() + "' is not in RUNNING_TEST state! Ignoring tearDownTest");
+ }
+
+
+ _participantRegistry.clear();
+ }
+
+ public void sendResults(ParticipantResult testResult)
+ {
+ _clientJmsDelegate.sendResponseMessage(testResult);
+ LOGGER.info("Sent test results " + testResult);
+ }
+
+ @Override
+ public String toString()
+ {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+ .append("clientJmsDelegate", _clientJmsDelegate).toString();
+ }
+
+ void setParticipantRegistry(ParticipantExecutorRegistry participantRegistry)
+ {
+ _participantRegistry = participantRegistry;
+ }
+
+}