diff options
Diffstat (limited to 'java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java')
-rw-r--r-- | java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java | 132 |
1 files changed, 132 insertions, 0 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java new file mode 100644 index 0000000000..d378647698 --- /dev/null +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java @@ -0,0 +1,132 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.client.handler.ConnectionCloseMethodHandler; +import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler; +import org.apache.qpid.client.handler.ConnectionSecureMethodHandler; +import org.apache.qpid.client.handler.ConnectionStartMethodHandler; +import org.apache.qpid.client.handler.ConnectionTuneMethodHandler; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.IllegalStateTransitionException; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionOpenOkBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionTuneBody; + +import java.util.HashMap; +import java.util.Map; + +/** + * An extension of client.AMQStateManager that allows different handlers to be registered. + * + */ +public class ClientHandlerRegistry extends AMQStateManager +{ + private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>(); + private final MemberHandle _identity; + + protected ClientHandlerRegistry(MemberHandle local) + { + super(AMQState.CONNECTION_NOT_STARTED, false); + + _identity = local; + + addHandler(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance(), + AMQState.CONNECTION_NOT_STARTED); + + addHandler(ConnectionTuneBody.class, new ConnectionTuneHandler(), + AMQState.CONNECTION_NOT_TUNED); + addHandler(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance(), + AMQState.CONNECTION_NOT_TUNED); + addHandler(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance(), + AMQState.CONNECTION_NOT_OPENED); + + addHandlers(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance(), + AMQState.CONNECTION_NOT_STARTED, + AMQState.CONNECTION_NOT_TUNED, + AMQState.CONNECTION_NOT_OPENED); + + } + + private ClientRegistry state(AMQState state) + { + ClientRegistry registry = _handlers.get(state); + if (registry == null) + { + registry = new ClientRegistry(); + _handlers.put(state, registry); + } + return registry; + } + + protected StateAwareMethodListener findStateTransitionHandler(AMQState state, AMQMethodBody frame) throws IllegalStateTransitionException + { + ClientRegistry registry = _handlers.get(state); + return registry == null ? null : registry.getHandler(frame); + } + + + <A extends Class<AMQMethodBody>> void addHandlers(Class type, StateAwareMethodListener handler, AMQState... states) + { + for (AMQState state : states) + { + addHandler(type, handler, state); + } + } + + <A extends Class<AMQMethodBody>> void addHandler(Class type, StateAwareMethodListener handler, AMQState state) + { + ClientRegistry registry = _handlers.get(state); + if (registry == null) + { + registry = new ClientRegistry(); + _handlers.put(state, registry); + } + registry.add(type, handler); + } + + static class ClientRegistry + { + private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener> registry + = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener>(); + + <A extends Class<AMQMethodBody>> void add(A type, StateAwareMethodListener handler) + { + registry.put(type, handler); + } + + StateAwareMethodListener getHandler(AMQMethodBody frame) + { + return registry.get(frame.getClass()); + } + } + + class ConnectionTuneHandler extends ConnectionTuneMethodHandler + { + protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist) + { + return super.createConnectionOpenFrame(channel, path, ClusterCapability.add(capabilities, _identity), insist); + } + } +} |