summaryrefslogtreecommitdiff
path: root/java/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java')
-rw-r--r--java/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java193
1 files changed, 193 insertions, 0 deletions
diff --git a/java/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
new file mode 100644
index 0000000000..6e7efb3659
--- /dev/null
+++ b/java/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
@@ -0,0 +1,193 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.IoSession;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.ClusterMembershipBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.cluster.util.LogMessage;
+
+import java.net.InetSocketAddress;
+
+public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements InductionBuffer.MessageHandler
+{
+ private static final Logger _logger = Logger.getLogger(ClusteredProtocolHandler.class);
+ private final InductionBuffer _peerBuffer = new InductionBuffer(this);
+ private final InductionBuffer _clientBuffer = new InductionBuffer(this);
+ private final GroupManager _groupMgr;
+ private final ServerHandlerRegistry _handlers;
+
+ public ClusteredProtocolHandler(InetSocketAddress address)
+ {
+ this(ApplicationRegistry.getInstance(), address);
+ }
+
+ public ClusteredProtocolHandler(IApplicationRegistry registry, InetSocketAddress address)
+ {
+ this(registry.getQueueRegistry(), registry.getExchangeRegistry(), address);
+ }
+
+ public ClusteredProtocolHandler(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, InetSocketAddress address)
+ {
+ super(queueRegistry, exchangeRegistry);
+ ClusterBuilder builder = new ClusterBuilder(address);
+ _groupMgr = builder.getGroupManager();
+ _handlers = builder.getHandlerRegistry();
+ }
+
+ public ClusteredProtocolHandler(ClusteredProtocolHandler handler)
+ {
+ super(handler);
+ _groupMgr = handler._groupMgr;
+ _handlers = handler._handlers;
+ }
+
+ protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
+ {
+ new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers));
+ }
+
+ void connect(String join) throws Exception
+ {
+ if (join == null)
+ {
+ _groupMgr.establish();
+ }
+ else
+ {
+ _groupMgr.join(new SimpleMemberHandle(join));
+ }
+ }
+
+ private boolean inState(JoinState state)
+ {
+ return _groupMgr.getState().equals(state);
+ }
+
+ public void messageReceived(IoSession session, Object msg) throws Exception
+ {
+ JoinState state = _groupMgr.getState();
+ switch (state)
+ {
+ case JOINED:
+ _logger.debug(new LogMessage("Received {0}", msg));
+ super.messageReceived(session, msg);
+ break;
+ case JOINING:
+ case INITIATION:
+ case INDUCTION:
+ buffer(session, msg);
+ break;
+ default:
+ throw new AMQException("Received message while in state: " + state);
+ }
+ JoinState latest = _groupMgr.getState();
+ if (!latest.equals(state))
+ {
+ switch (latest)
+ {
+ case INDUCTION:
+ _logger.info("Reached induction, delivering buffered message from peers");
+ _peerBuffer.deliver();
+ break;
+ case JOINED:
+ _logger.info("Reached joined, delivering buffered message from clients");
+ _clientBuffer.deliver();
+ break;
+ }
+ }
+ }
+
+ private void buffer(IoSession session, Object msg) throws Exception
+ {
+ if (isBufferable(msg))
+ {
+ MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session);
+ if (peer == null)
+ {
+ _logger.debug(new LogMessage("Buffering {0} for client", msg));
+ _clientBuffer.receive(session, msg);
+ }
+ else if (inState(JoinState.JOINING) && isMembershipAnnouncement(msg))
+ {
+ _logger.debug(new LogMessage("Initial membership [{0}] received from {1}", msg, peer));
+ super.messageReceived(session, msg);
+ }
+ else if (inState(JoinState.INITIATION) && _groupMgr.isLeader(peer))
+ {
+ _logger.debug(new LogMessage("Replaying {0} from leader ", msg));
+ super.messageReceived(session, msg);
+ }
+ else if (inState(JoinState.INDUCTION))
+ {
+ _logger.debug(new LogMessage("Replaying {0} from peer {1}", msg, peer));
+ super.messageReceived(session, msg);
+ }
+ else
+ {
+ _logger.debug(new LogMessage("Buffering {0} for peer {1}", msg, peer));
+ _peerBuffer.receive(session, msg);
+ }
+ }
+ else
+ {
+ _logger.debug(new LogMessage("Received {0}", msg));
+ super.messageReceived(session, msg);
+ }
+ }
+
+ public void deliver(IoSession session, Object msg) throws Exception
+ {
+ _logger.debug(new LogMessage("Delivering {0}", msg));
+ super.messageReceived(session, msg);
+ }
+
+ private boolean isMembershipAnnouncement(Object msg)
+ {
+ return msg instanceof AMQFrame && (((AMQFrame) msg).bodyFrame instanceof ClusterMembershipBody);
+ }
+
+ private boolean isBufferable(Object msg)
+ {
+ return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).bodyFrame);
+ }
+
+ private boolean isBuffereable(AMQBody body)
+ {
+ return !(body instanceof ConnectionStartOkBody ||
+ body instanceof ConnectionTuneOkBody ||
+ body instanceof ConnectionSecureOkBody ||
+ body instanceof ConnectionOpenBody);
+ }
+}