summaryrefslogtreecommitdiff
path: root/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerTest.java')
-rw-r--r--java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerTest.java231
1 files changed, 231 insertions, 0 deletions
diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerTest.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerTest.java
new file mode 100644
index 0000000000..d7ede7d3e0
--- /dev/null
+++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerTest.java
@@ -0,0 +1,231 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import junit.framework.TestCase;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.cluster.policy.StandardPolicies;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class BrokerTest extends TestCase
+{
+ //group request (no failure)
+ public void testGroupRequest_noFailure() throws AMQException
+ {
+ RecordingBroker[] brokers = new RecordingBroker[]{
+ new RecordingBroker("A", 1),
+ new RecordingBroker("B", 2),
+ new RecordingBroker("C", 3)
+ };
+ GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(brokers)));
+ GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler);
+ for (Broker b : brokers)
+ {
+ b.invoke(grpRequest);
+ }
+ grpRequest.finishedSend();
+
+ for (RecordingBroker b : brokers)
+ {
+ b.handleResponse(((AMQFrame) b.getMessages().get(0)).channel, new TestMethod("response"));
+ }
+
+ assertTrue("Handler did not receive response", handler.isCompleted());
+ }
+
+ //group request (failure)
+ public void testGroupRequest_failure() throws AMQException
+ {
+ RecordingBroker a = new RecordingBroker("A", 1);
+ RecordingBroker b = new RecordingBroker("B", 2);
+ RecordingBroker c = new RecordingBroker("C", 3);
+ RecordingBroker[] all = new RecordingBroker[]{a, b, c};
+ RecordingBroker[] succeeded = new RecordingBroker[]{a, c};
+
+ GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(succeeded)));
+ GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler);
+
+ for (Broker broker : all)
+ {
+ broker.invoke(grpRequest);
+ }
+ grpRequest.finishedSend();
+
+ for (RecordingBroker broker : succeeded)
+ {
+ broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).channel, new TestMethod("response"));
+ }
+ b.remove();
+
+ assertTrue("Handler did not receive response", handler.isCompleted());
+ }
+
+
+ //simple send (no response)
+ public void testSend_noResponse() throws AMQException
+ {
+ AMQBody[] msgs = new AMQBody[]{
+ new TestMethod("A"),
+ new TestMethod("B"),
+ new TestMethod("C")
+ };
+ RecordingBroker broker = new RecordingBroker("myhost", 1);
+ for (AMQBody msg : msgs)
+ {
+ broker.send(new SimpleSendable(msg), null);
+ }
+ List<AMQDataBlock> sent = broker.getMessages();
+ assertEquals(msgs.length, sent.size());
+ for (int i = 0; i < msgs.length; i++)
+ {
+ assertTrue(sent.get(i) instanceof AMQFrame);
+ assertEquals(msgs[i], ((AMQFrame) sent.get(i)).bodyFrame);
+ }
+ }
+
+ //simple send (no failure)
+ public void testSend_noFailure() throws AMQException
+ {
+ RecordingBroker broker = new RecordingBroker("myhost", 1);
+ BlockingHandler handler = new BlockingHandler();
+ broker.send(new SimpleSendable(new TestMethod("A")), handler);
+ List<AMQDataBlock> sent = broker.getMessages();
+ assertEquals(1, sent.size());
+ assertTrue(sent.get(0) instanceof AMQFrame);
+ assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+
+ broker.handleResponse(((AMQFrame) sent.get(0)).channel, new TestMethod("B"));
+
+ assertEquals(new TestMethod("B"), handler.getResponse());
+ }
+
+ //simple send (failure)
+ public void testSend_failure() throws AMQException
+ {
+ RecordingBroker broker = new RecordingBroker("myhost", 1);
+ BlockingHandler handler = new BlockingHandler();
+ broker.send(new SimpleSendable(new TestMethod("A")), handler);
+ List<AMQDataBlock> sent = broker.getMessages();
+ assertEquals(1, sent.size());
+ assertTrue(sent.get(0) instanceof AMQFrame);
+ assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+ broker.remove();
+ assertEquals(null, handler.getResponse());
+ assertTrue(handler.isCompleted());
+ assertTrue(handler.failed());
+ }
+
+ private static class TestMethod extends AMQMethodBody
+ {
+ private final Object id;
+
+ TestMethod(Object id)
+ {
+ this.id = id;
+ }
+
+ protected int getBodySize()
+ {
+ return 0;
+ }
+
+ protected int getClazz()
+ {
+ return 1002;
+ }
+
+ protected int getMethod()
+ {
+ return 1003;
+ }
+
+ protected void writeMethodPayload(ByteBuffer buffer)
+ {
+ }
+
+ protected byte getType()
+ {
+ return 0;
+ }
+
+ protected int getSize()
+ {
+ return 0;
+ }
+
+ protected void writePayload(ByteBuffer buffer)
+ {
+ }
+
+ protected void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+ }
+
+ protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ {
+ }
+
+ public boolean equals(Object o)
+ {
+ return o instanceof TestMethod && id.equals(((TestMethod) o).id);
+ }
+
+ public int hashCode()
+ {
+ return id.hashCode();
+ }
+
+ }
+
+ private static class GroupResponseValidator implements GroupResponseHandler
+ {
+ private final AMQMethodBody _response;
+ private final List<Member> _members;
+ private boolean _completed = false;
+
+ GroupResponseValidator(AMQMethodBody response, List<Member> members)
+ {
+ _response = response;
+ _members = members;
+ }
+
+ public void response(List<AMQMethodBody> responses, List<Member> members)
+ {
+ for (AMQMethodBody r : responses)
+ {
+ assertEquals(_response, r);
+ }
+ assertEquals(_members, members);
+ _completed = true;
+ }
+
+ boolean isCompleted()
+ {
+ return _completed;
+ }
+ }
+}