diff options
author | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
commit | 913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch) | |
tree | 7ea442d6867d0076f1c9ea4f4265664059e7aff5 /java/cluster/src | |
download | qpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz |
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze
Repository Root: https://etp.108.redhat.com/svn/etp
Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48
Revision: 608
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/src')
73 files changed, 6158 insertions, 0 deletions
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/BlockingHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/BlockingHandler.java new file mode 100644 index 0000000000..3e7a2af01f --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/BlockingHandler.java @@ -0,0 +1,88 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.framing.AMQMethodBody; + +public class BlockingHandler implements ResponseHandler +{ + private final Class _expected; + private boolean _completed; + private AMQMethodBody _response; + + + public BlockingHandler() + { + this(AMQMethodBody.class); + } + + public BlockingHandler(Class<? extends AMQMethodBody> expected) + { + _expected = expected; + } + + public void responded(AMQMethodBody response) + { + if (_expected.isInstance(response)) + { + _response = response; + completed(); + } + } + + public void removed() + { + completed(); + } + + private synchronized void completed() + { + _completed = true; + notifyAll(); + } + + synchronized void waitForCompletion() + { + while (!_completed) + { + try + { + wait(); + } + catch (InterruptedException ignore) + { + + } + } + } + + AMQMethodBody getResponse() + { + return _response; + } + + boolean failed() + { + return _response == null; + } + + boolean isCompleted() + { + return _completed; + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/BroadcastPolicy.java b/java/cluster/src/org/apache/qpid/server/cluster/BroadcastPolicy.java new file mode 100644 index 0000000000..6d56afe50b --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/BroadcastPolicy.java @@ -0,0 +1,23 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +public interface BroadcastPolicy +{ + public boolean isComplete(int responded, int members); +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Broker.java b/java/cluster/src/org/apache/qpid/server/cluster/Broker.java new file mode 100644 index 0000000000..d8a3fd1b76 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/Broker.java @@ -0,0 +1,244 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * An implementation of the Member interface (through which data is sent to other + * peers in the cluster). This class provides a base from which subclasses can + * inherit some common behaviour for broadcasting GroupRequests and sending methods + * that may expect a response. It also extends the Member abstraction to support + * a richer set of operations that are useful within the package but should not be + * exposed outside of it. + * + */ +abstract class Broker extends SimpleMemberHandle implements Member +{ + private static final Logger _logger = Logger.getLogger(Broker.class); + private static final int DEFAULT_CHANNEL = 1; + private static final int START_CHANNEL = 2; + private static final int END_CHANNEL = 10000; + + + private MemberFailureListener _listener; + //a wrap-around counter to allocate _requests a unique channel: + private int _nextChannel = START_CHANNEL; + //outstanding _requests: + private final Map<Integer, ResponseHandler> _requests = new HashMap<Integer, ResponseHandler>(); + + Broker(String host, int port) + { + super(host, port); + } + + /** + * Allows a listener to be registered that will receive callbacks when communication + * to the peer this broker instance represents fails. + * @param listener the callback to be notified of failures + */ + public void addFailureListener(MemberFailureListener listener) + { + _listener = listener; + } + + /** + * Allows subclasses to signal comunication failures + */ + protected void failed() + { + if (_listener != null) + { + _listener.failed(this); + } + } + + /** + * Subclasses should call this on receiving message responses from the remote + * peer. They are matched to any outstanding request they might be response + * to, with the completion and callback of that request being managed if + * required. + * + * @param channel the channel on which the method was received + * @param response the response received + * @return true if the response matched an outstanding request + */ + protected synchronized boolean handleResponse(int channel, AMQMethodBody response) + { + ResponseHandler request = _requests.get(channel); + if (request == null) + { + if(!_requests.isEmpty()) + { + _logger.warn(new LogMessage("[next channel={3, integer}]: Response {0} on channel {1, integer} failed to match outstanding requests: {2}", response, channel, _requests, _nextChannel)); + } + return false; + } + else + { + request.responded(response); + return true; + } + } + + /** + * Called when this broker is excluded from the group. Any requests made on + * it are informed this member has left the group. + */ + synchronized void remove() + { + for (ResponseHandler r : _requests.values()) + { + r.removed(); + } + } + + /** + * Engages this broker in the specified group request + * + * @param request the request being made to a group of brokers + * @throws AMQException if there is any failure + */ + synchronized void invoke(GroupRequest request) throws AMQException + { + int channel = nextChannel(); + _requests.put(channel, new GroupRequestAdapter(request, channel)); + request.send(channel, this); + } + + /** + * Sends a message to the remote peer and undertakes to notify the specified + * handler of the response. + * + * @param msg the message to send + * @param handler the callback to notify of responses (or the removal of this broker + * from the group) + * @throws AMQException + */ + synchronized void send(Sendable msg, ResponseHandler handler) throws AMQException + { + int channel; + if (handler != null) + { + channel = nextChannel(); + _requests.put(channel, new RemovingWrapper(handler, channel)); + } + else + { + channel = DEFAULT_CHANNEL; + } + + msg.send(channel, this); + } + + private int nextChannel() + { + int channel = _nextChannel++; + if(_nextChannel >= END_CHANNEL) + { + _nextChannel = START_CHANNEL; + } + return channel; + } + + /** + * extablish connection without handling redirect + */ + abstract boolean connect() throws IOException, InterruptedException; + + /** + * Start connection process, including replay + */ + abstract void connectAsynch(Iterable<AMQMethodBody> msgs); + + /** + * Replay messages to the remote peer this instance represents. These messages + * must be sent before any others whose transmission is requested through send() etc. + * + * @param msgs + */ + abstract void replay(Iterable<AMQMethodBody> msgs); + + /** + * establish connection, handling redirect if required... + */ + abstract Broker connectToCluster() throws IOException, InterruptedException; + + private class GroupRequestAdapter implements ResponseHandler + { + private final GroupRequest request; + private final int channel; + + GroupRequestAdapter(GroupRequest request, int channel) + { + this.request = request; + this.channel = channel; + } + + public void responded(AMQMethodBody response) + { + request.responseReceived(Broker.this, response); + _requests.remove(channel); + } + + public void removed() + { + request.removed(Broker.this); + } + + public String toString() + { + return "GroupRequestAdapter{" + channel + ", " + request + "}"; + } + } + + private class RemovingWrapper implements ResponseHandler + { + private final ResponseHandler handler; + private final int channel; + + RemovingWrapper(ResponseHandler handler, int channel) + { + this.handler = handler; + this.channel = channel; + } + + public void responded(AMQMethodBody response) + { + handler.responded(response); + _requests.remove(channel); + } + + public void removed() + { + handler.removed(); + } + + public String toString() + { + return "RemovingWrapper{" + channel + ", " + handler + "}"; + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/BrokerFactory.java b/java/cluster/src/org/apache/qpid/server/cluster/BrokerFactory.java new file mode 100644 index 0000000000..75a47a168f --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/BrokerFactory.java @@ -0,0 +1,23 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +interface BrokerFactory +{ + public Broker create(MemberHandle handle); +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java b/java/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java new file mode 100644 index 0000000000..db667879f6 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java @@ -0,0 +1,365 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.cluster.replay.ReplayManager; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.cluster.util.InvokeMultiple; +import org.apache.qpid.framing.AMQMethodBody; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Manages the membership list of a group and the set of brokers representing the + * remote peers. The group should be initialised through a call to establish() + * or connectToLeader(). + * + */ +class BrokerGroup +{ + private static final Logger _logger = Logger.getLogger(BrokerGroup.class); + + private final InvokeMultiple<MembershipChangeListener> _changeListeners = new InvokeMultiple<MembershipChangeListener>(MembershipChangeListener.class); + private final ReplayManager _replayMgr; + private final MemberHandle _local; + private final BrokerFactory _factory; + private final Object _lock = new Object(); + private final Set<MemberHandle> _synch = new HashSet<MemberHandle>(); + private List<MemberHandle> _members; + private List<Broker> _peers = new ArrayList<Broker>(); + private JoinState _state = JoinState.UNINITIALISED; + + /** + * Creates an unitialised group. + * + * @param local a handle that represents the local broker + * @param replayMgr the replay manager to use when creating new brokers + * @param factory the factory through which broker instances are created + */ + BrokerGroup(MemberHandle local, ReplayManager replayMgr, BrokerFactory factory) + { + _replayMgr = replayMgr; + _local = local; + _factory = factory; + } + + /** + * Called to establish the local broker as the leader of a new group + */ + void establish() + { + synchronized (_lock) + { + setState(JoinState.JOINED); + _members = new ArrayList<MemberHandle>(); + _members.add(_local); + } + fireChange(); + } + + /** + * Called by prospect to connect to group + */ + Broker connectToLeader(MemberHandle handle) throws Exception + { + Broker leader = _factory.create(handle); + leader = leader.connectToCluster(); + synchronized (_lock) + { + setState(JoinState.JOINING); + _members = new ArrayList<MemberHandle>(); + _members.add(leader); + _peers.add(leader); + } + fireChange(); + return leader; + } + + /** + * Called by leader when handling a join request + */ + Broker connectToProspect(MemberHandle handle) throws IOException, InterruptedException + { + Broker prospect = _factory.create(handle); + prospect.connect(); + synchronized (_lock) + { + _members.add(prospect); + _peers.add(prospect); + } + fireChange(); + return prospect; + } + + /** + * Called in reponse to membership announcements. + * + * @param members the list of members now part of the group + */ + void setMembers(List<MemberHandle> members) + { + if (isJoined()) + { + List<Broker> old = _peers; + + synchronized (_lock) + { + _peers = getBrokers(members); + _members = new ArrayList<MemberHandle>(members); + } + + //remove those that are still members + old.removeAll(_peers); + + //handle failure of any brokers that haven't survived + for (Broker peer : old) + { + peer.remove(); + } + } + else + { + synchronized (_lock) + { + setState(JoinState.INITIATION); + _members = new ArrayList<MemberHandle>(members); + _synch.addAll(_members); + _synch.remove(_local); + } + } + fireChange(); + } + + List<MemberHandle> getMembers() + { + synchronized (_lock) + { + return Collections.unmodifiableList(_members); + } + } + + List<Broker> getPeers() + { + synchronized (_lock) + { + return _peers; + } + } + + /** + * Removes the member presented from the group + * @param peer the broker that should be removed + */ + void remove(Broker peer) + { + synchronized (_lock) + { + _peers.remove(peer); + _members.remove(peer); + } + fireChange(); + } + + MemberHandle getLocal() + { + return _local; + } + + Broker getLeader() + { + synchronized (_lock) + { + return _peers.size() > 0 ? _peers.get(0) : null; + } + } + + /** + * Allows a Broker instance to be retrieved for a given handle + * + * @param handle the handle for which a broker is sought + * @param create flag to indicate whther a broker should be created for the handle if + * one is not found within the list of known peers + * @return the broker corresponding to handle or null if a match cannot be found and + * create is false + */ + Broker findBroker(MemberHandle handle, boolean create) + { + if (handle instanceof Broker) + { + return (Broker) handle; + } + else + { + for (Broker b : getPeers()) + { + if (b.matches(handle)) + { + return b; + } + } + } + if (create) + { + Broker b = _factory.create(handle); + List<AMQMethodBody> msgs = _replayMgr.replay(isLeader(_local)); + _logger.info(new LogMessage("Replaying {0} from {1} to {2}", msgs, _local, b)); + b.connectAsynch(msgs); + + return b; + } + else + { + return null; + } + } + + /** + * @param member the member to test for leadership + * @return true if the passed in member is the group leader, false otherwise + */ + boolean isLeader(MemberHandle member) + { + synchronized (_lock) + { + return member.matches(_members.get(0)); + } + } + + /** + * @return true if the local broker is the group leader, false otherwise + */ + boolean isLeader() + { + return isLeader(_local); + } + + /** + * Used when the leader fails and the next broker in the list needs to + * assume leadership + * @return true if the action succeeds + */ + boolean assumeLeadership() + { + boolean valid; + synchronized (_lock) + { + valid = _members.size() > 1 && _local.matches(_members.get(1)); + if (valid) + { + _members.remove(0); + _peers.remove(0); + } + } + fireChange(); + return valid; + } + + /** + * Called in response to a Cluster.Synch message being received during the join + * process. This indicates that the member mentioned has replayed all necessary + * messages to the local broker. + * + * @param member the member from whom the synch messages was received + */ + void synched(MemberHandle member) + { + _logger.info(new LogMessage("Synchronised with {0}", member)); + synchronized (_lock) + { + if (isLeader(member)) + { + setState(JoinState.INDUCTION); + } + _synch.remove(member); + if (_synch.isEmpty()) + { + _peers = getBrokers(_members); + setState(JoinState.JOINED); + } + } + } + + + /** + * @return the state of the group + */ + JoinState getState() + { + synchronized (_lock) + { + return _state; + } + } + + void addMemberhipChangeListener(MembershipChangeListener l) + { + _changeListeners.addListener(l); + } + + void removeMemberhipChangeListener(MembershipChangeListener l) + { + _changeListeners.removeListener(l); + } + + + + private void setState(JoinState state) + { + _logger.info(new LogMessage("Changed state from {0} to {1}", _state, state)); + _state = state; + } + + private boolean isJoined() + { + return inState(JoinState.JOINED); + } + + private boolean inState(JoinState state) + { + return _state.equals(state); + } + + private List<Broker> getBrokers(List<MemberHandle> handles) + { + List<Broker> brokers = new ArrayList<Broker>(); + for (MemberHandle handle : handles) + { + if (!_local.matches(handle)) + { + brokers.add(findBroker(handle, true)); + } + } + return brokers; + } + + private void fireChange() + { + List<MemberHandle> members; + synchronized(this) + { + members = new ArrayList(_members); + } + _changeListeners.getProxy().changed(Collections.unmodifiableList(members)); + } +}
\ No newline at end of file diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/org/apache/qpid/server/cluster/ClientAdapter.java new file mode 100644 index 0000000000..6909b199ce --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/ClientAdapter.java @@ -0,0 +1,70 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.mina.common.IoSession; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.framing.AMQMethodBody; + +/** + * Hack to assist with reuse of the client handlers for connection setup in + * the inter-broker communication within the cluster. + * + */ +class ClientAdapter implements MethodHandler +{ + private final AMQProtocolSession _session; + private final AMQStateManager _stateMgr; + + ClientAdapter(IoSession session, AMQStateManager stateMgr) + { + this(session, stateMgr, "guest", "guest", session.toString(), "/cluster"); + } + + ClientAdapter(IoSession session, AMQStateManager stateMgr, String user, String password, String name, String path) + { + _session = new SessionAdapter(session, new ConnectionAdapter(user, password, name, path)); + _stateMgr = stateMgr; + } + + public void handle(int channel, AMQMethodBody method) throws AMQException + { + AMQMethodEvent evt = new AMQMethodEvent(channel, method, _session); + _stateMgr.methodReceived(evt); + } + + private class SessionAdapter extends AMQProtocolSession + { + public SessionAdapter(IoSession session, AMQConnection connection) + { + super(null, session, connection); + } + } + + private static class ConnectionAdapter extends AMQConnection + { + ConnectionAdapter(String username, String password, String clientName, String virtualPath) + { + super(username, password, clientName, virtualPath); + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/org/apache/qpid/server/cluster/ClientHandlerRegistry.java new file mode 100644 index 0000000000..d378647698 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/ClientHandlerRegistry.java @@ -0,0 +1,132 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.client.handler.ConnectionCloseMethodHandler; +import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler; +import org.apache.qpid.client.handler.ConnectionSecureMethodHandler; +import org.apache.qpid.client.handler.ConnectionStartMethodHandler; +import org.apache.qpid.client.handler.ConnectionTuneMethodHandler; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.IllegalStateTransitionException; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionOpenOkBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionTuneBody; + +import java.util.HashMap; +import java.util.Map; + +/** + * An extension of client.AMQStateManager that allows different handlers to be registered. + * + */ +public class ClientHandlerRegistry extends AMQStateManager +{ + private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>(); + private final MemberHandle _identity; + + protected ClientHandlerRegistry(MemberHandle local) + { + super(AMQState.CONNECTION_NOT_STARTED, false); + + _identity = local; + + addHandler(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance(), + AMQState.CONNECTION_NOT_STARTED); + + addHandler(ConnectionTuneBody.class, new ConnectionTuneHandler(), + AMQState.CONNECTION_NOT_TUNED); + addHandler(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance(), + AMQState.CONNECTION_NOT_TUNED); + addHandler(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance(), + AMQState.CONNECTION_NOT_OPENED); + + addHandlers(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance(), + AMQState.CONNECTION_NOT_STARTED, + AMQState.CONNECTION_NOT_TUNED, + AMQState.CONNECTION_NOT_OPENED); + + } + + private ClientRegistry state(AMQState state) + { + ClientRegistry registry = _handlers.get(state); + if (registry == null) + { + registry = new ClientRegistry(); + _handlers.put(state, registry); + } + return registry; + } + + protected StateAwareMethodListener findStateTransitionHandler(AMQState state, AMQMethodBody frame) throws IllegalStateTransitionException + { + ClientRegistry registry = _handlers.get(state); + return registry == null ? null : registry.getHandler(frame); + } + + + <A extends Class<AMQMethodBody>> void addHandlers(Class type, StateAwareMethodListener handler, AMQState... states) + { + for (AMQState state : states) + { + addHandler(type, handler, state); + } + } + + <A extends Class<AMQMethodBody>> void addHandler(Class type, StateAwareMethodListener handler, AMQState state) + { + ClientRegistry registry = _handlers.get(state); + if (registry == null) + { + registry = new ClientRegistry(); + _handlers.put(state, registry); + } + registry.add(type, handler); + } + + static class ClientRegistry + { + private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener> registry + = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener>(); + + <A extends Class<AMQMethodBody>> void add(A type, StateAwareMethodListener handler) + { + registry.put(type, handler); + } + + StateAwareMethodListener getHandler(AMQMethodBody frame) + { + return registry.get(frame.getClass()); + } + } + + class ConnectionTuneHandler extends ConnectionTuneMethodHandler + { + protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist) + { + return super.createConnectionOpenFrame(channel, path, ClusterCapability.add(capabilities, _identity), insist); + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/org/apache/qpid/server/cluster/ClusterBuilder.java new file mode 100644 index 0000000000..eb2992a690 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/ClusterBuilder.java @@ -0,0 +1,60 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.server.cluster.handler.ClusterMethodHandlerFactory; +import org.apache.qpid.server.cluster.replay.RecordingMethodHandlerFactory; +import org.apache.qpid.server.cluster.replay.ReplayStore; + +import java.net.InetSocketAddress; + +class ClusterBuilder +{ + private final LoadTable loadTable = new LoadTable(); + private final ReplayStore replayStore = new ReplayStore(); + private final MemberHandle handle; + private final GroupManager groupMgr; + + ClusterBuilder(InetSocketAddress address) + { + handle = new SimpleMemberHandle(address.getHostName(), address.getPort()).resolve(); + groupMgr = new DefaultGroupManager(handle, getBrokerFactory(), replayStore, loadTable); + } + + GroupManager getGroupManager() + { + return groupMgr; + } + + ServerHandlerRegistry getHandlerRegistry() + { + return new ServerHandlerRegistry(getHandlerFactory()); + } + + private MethodHandlerFactory getHandlerFactory() + { + MethodHandlerFactory factory = new ClusterMethodHandlerFactory(groupMgr, loadTable); + //need to wrap relevant handlers with recording handler for easy replay: + return new RecordingMethodHandlerFactory(factory, replayStore); + } + + private BrokerFactory getBrokerFactory() + { + return new MinaBrokerProxyFactory(handle); + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClusterCapability.java b/java/cluster/src/org/apache/qpid/server/cluster/ClusterCapability.java new file mode 100644 index 0000000000..9bf2e02d3c --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/ClusterCapability.java @@ -0,0 +1,55 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class ClusterCapability +{ + public static final String PATTERN = ".*\\bcluster_peer=(\\S*:\\d*)\b*.*"; + public static final String PEER = "cluster_peer"; + + public static String add(String original, MemberHandle identity) + { + return original == null ? peer(identity) : original + " " + peer(identity); + } + + private static String peer(MemberHandle identity) + { + return PEER + "=" + identity.getDetails(); + } + + public static boolean contains(String in) + { + return in != null && in.contains(in); + } + + public static MemberHandle getPeer(String in) + { + Matcher matcher = Pattern.compile(PATTERN).matcher(in); + if (matcher.matches()) + { + return new SimpleMemberHandle(matcher.group(1)); + } + else + { + throw new RuntimeException("Could not find peer in '" + in + "'"); + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java new file mode 100644 index 0000000000..65b188484c --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java @@ -0,0 +1,190 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.log4j.Logger; +import org.apache.mina.common.IoSession; +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.ConnectionOpenBody; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.framing.ClusterMembershipBody; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.cluster.util.LogMessage; + +import java.net.InetSocketAddress; + +public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements InductionBuffer.MessageHandler +{ + private static final Logger _logger = Logger.getLogger(ClusteredProtocolHandler.class); + private final InductionBuffer _peerBuffer = new InductionBuffer(this); + private final InductionBuffer _clientBuffer = new InductionBuffer(this); + private final GroupManager _groupMgr; + private final ServerHandlerRegistry _handlers; + + public ClusteredProtocolHandler(InetSocketAddress address) + { + this(ApplicationRegistry.getInstance(), address); + } + + public ClusteredProtocolHandler(IApplicationRegistry registry, InetSocketAddress address) + { + this(registry.getQueueRegistry(), registry.getExchangeRegistry(), address); + } + + public ClusteredProtocolHandler(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, InetSocketAddress address) + { + super(queueRegistry, exchangeRegistry); + ClusterBuilder builder = new ClusterBuilder(address); + _groupMgr = builder.getGroupManager(); + _handlers = builder.getHandlerRegistry(); + } + + public ClusteredProtocolHandler(ClusteredProtocolHandler handler) + { + super(handler); + _groupMgr = handler._groupMgr; + _handlers = handler._handlers; + } + + protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException + { + new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers)); + } + + void connect(String join) throws Exception + { + if (join == null) + { + _groupMgr.establish(); + } + else + { + _groupMgr.join(new SimpleMemberHandle(join)); + } + } + + private boolean inState(JoinState state) + { + return _groupMgr.getState().equals(state); + } + + public void messageReceived(IoSession session, Object msg) throws Exception + { + JoinState state = _groupMgr.getState(); + switch (state) + { + case JOINED: + _logger.debug(new LogMessage("Received {0}", msg)); + super.messageReceived(session, msg); + break; + case JOINING: + case INITIATION: + case INDUCTION: + buffer(session, msg); + break; + default: + throw new AMQException("Received message while in state: " + state); + } + JoinState latest = _groupMgr.getState(); + if (!latest.equals(state)) + { + switch (latest) + { + case INDUCTION: + _logger.info("Reached induction, delivering buffered message from peers"); + _peerBuffer.deliver(); + break; + case JOINED: + _logger.info("Reached joined, delivering buffered message from clients"); + _clientBuffer.deliver(); + break; + } + } + } + + private void buffer(IoSession session, Object msg) throws Exception + { + if (isBufferable(msg)) + { + MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session); + if (peer == null) + { + _logger.debug(new LogMessage("Buffering {0} for client", msg)); + _clientBuffer.receive(session, msg); + } + else if (inState(JoinState.JOINING) && isMembershipAnnouncement(msg)) + { + _logger.debug(new LogMessage("Initial membership [{0}] received from {1}", msg, peer)); + super.messageReceived(session, msg); + } + else if (inState(JoinState.INITIATION) && _groupMgr.isLeader(peer)) + { + _logger.debug(new LogMessage("Replaying {0} from leader ", msg)); + super.messageReceived(session, msg); + } + else if (inState(JoinState.INDUCTION)) + { + _logger.debug(new LogMessage("Replaying {0} from peer {1}", msg, peer)); + super.messageReceived(session, msg); + } + else + { + _logger.debug(new LogMessage("Buffering {0} for peer {1}", msg, peer)); + _peerBuffer.receive(session, msg); + } + } + else + { + _logger.debug(new LogMessage("Received {0}", msg)); + super.messageReceived(session, msg); + } + } + + public void deliver(IoSession session, Object msg) throws Exception + { + _logger.debug(new LogMessage("Delivering {0}", msg)); + super.messageReceived(session, msg); + } + + private boolean isMembershipAnnouncement(Object msg) + { + return msg instanceof AMQFrame && (((AMQFrame) msg).bodyFrame instanceof ClusterMembershipBody); + } + + private boolean isBufferable(Object msg) + { + return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).bodyFrame); + } + + private boolean isBuffereable(AMQBody body) + { + return !(body instanceof ConnectionStartOkBody || + body instanceof ConnectionTuneOkBody || + body instanceof ConnectionSecureOkBody || + body instanceof ConnectionOpenBody); + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolSession.java new file mode 100644 index 0000000000..e5efe941b3 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolSession.java @@ -0,0 +1,132 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.mina.common.IoSession; +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.state.AMQStateManager; + +public class ClusteredProtocolSession extends AMQMinaProtocolSession +{ + private MemberHandle _peer; + + public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) + throws AMQException + { + super(session, queueRegistry, exchangeRegistry, codecFactory, stateManager); + } + + public boolean isPeerSession() + { + return _peer != null; + } + + public void setSessionPeer(MemberHandle peer) + { + _peer = peer; + } + + public MemberHandle getSessionPeer() + { + return _peer; + } + + public AMQChannel getChannel(int channelId) + throws AMQException + { + AMQChannel channel = super.getChannel(channelId); + if (isPeerSession() && channel == null) + { + channel = new OneUseChannel(channelId); + addChannel(channel); + } + return channel; + } + + public static boolean isPeerSession(IoSession session) + { + return isPeerSession(getAMQProtocolSession(session)); + } + + public static boolean isPeerSession(AMQProtocolSession session) + { + return session instanceof ClusteredProtocolSession && ((ClusteredProtocolSession) session).isPeerSession(); + } + + public static void setSessionPeer(AMQProtocolSession session, MemberHandle peer) + { + ((ClusteredProtocolSession) session).setSessionPeer(peer); + } + + public static MemberHandle getSessionPeer(AMQProtocolSession session) + { + return ((ClusteredProtocolSession) session).getSessionPeer(); + } + + public static MemberHandle getSessionPeer(IoSession session) + { + return getSessionPeer(getAMQProtocolSession(session)); + } + + /** + * Cleans itself up after delivery of a message (publish frame, header and optional body frame(s)) + */ + private class OneUseChannel extends AMQChannel + { + public OneUseChannel(int channelId) + throws AMQException + { + this(channelId, ApplicationRegistry.getInstance()); + } + + public OneUseChannel(int channelId, IApplicationRegistry registry) + throws AMQException + { + super(channelId, + registry.getMessageStore(), + registry.getExchangeRegistry()); + } + + protected void routeCurrentMessage() throws AMQException + { + super.routeCurrentMessage(); + removeChannel(getChannelId()); + } + } + + public static boolean isPayloadFromPeer(AMQMessage payload) + { + return isPeerSession(payload.getPublisher()); + } + + public static boolean canRelay(AMQMessage payload, MemberHandle target) + { + //can only relay client messages that have not already been relayed to the given target + return !isPayloadFromPeer(payload) && !payload.checkToken(target); + } + +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java b/java/cluster/src/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java new file mode 100644 index 0000000000..650240fa70 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java @@ -0,0 +1,77 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import java.io.IOException; + +class ConnectionStatusMonitor +{ + private boolean _complete; + private boolean _redirected; + private String _host; + private int _port; + private RuntimeException _error; + + synchronized void opened() + { + _complete = true; + notifyAll(); + } + + synchronized void redirect(String host, int port) + { + _complete = true; + _redirected = true; + this._host = host; + this._port = port; + } + + synchronized void failed(RuntimeException e) + { + _error = e; + _complete = true; + } + + synchronized boolean waitUntilOpen() throws InterruptedException + { + while (!_complete) + { + wait(); + } + if (_error != null) + { + throw _error; + } + return !_redirected; + } + + synchronized boolean isOpened() + { + return _complete; + } + + String getHost() + { + return _host; + } + + int getPort() + { + return _port; + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/org/apache/qpid/server/cluster/DefaultGroupManager.java new file mode 100644 index 0000000000..980b36cf21 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/DefaultGroupManager.java @@ -0,0 +1,366 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ClusterJoinBody; +import org.apache.qpid.framing.ClusterLeaveBody; +import org.apache.qpid.framing.ClusterMembershipBody; +import org.apache.qpid.framing.ClusterPingBody; +import org.apache.qpid.framing.ClusterSuspectBody; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.cluster.policy.StandardPolicies; +import org.apache.qpid.server.cluster.replay.ReplayManager; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.cluster.util.InvokeMultiple; + +import java.util.List; + +public class DefaultGroupManager implements GroupManager, MemberFailureListener, BrokerFactory, StandardPolicies +{ + private static final Logger _logger = Logger.getLogger(DefaultGroupManager.class); + private final LoadTable _loadTable; + private final BrokerFactory _factory; + private final ReplayManager _replayMgr; + private final BrokerGroup _group; + + DefaultGroupManager(MemberHandle handle, BrokerFactory factory, ReplayManager replayMgr) + { + this(handle, factory, replayMgr, new LoadTable()); + } + + DefaultGroupManager(MemberHandle handle, BrokerFactory factory, ReplayManager replayMgr, LoadTable loadTable) + { + handle = SimpleMemberHandle.resolve(handle); + _logger.info(handle); + _loadTable = loadTable; + _factory = factory; + _replayMgr = replayMgr; + _group = new BrokerGroup(handle, _replayMgr, this); + } + + public JoinState getState() + { + return _group.getState(); + } + + public void addMemberhipChangeListener(MembershipChangeListener l) + { + _group.addMemberhipChangeListener(l); + } + + public void removeMemberhipChangeListener(MembershipChangeListener l) + { + _group.removeMemberhipChangeListener(l); + } + + public void broadcast(Sendable message) throws AMQException + { + for (Broker b : _group.getPeers()) + { + b.send(message, null); + } + } + + public void broadcast(Sendable message, BroadcastPolicy policy, GroupResponseHandler callback) throws AMQException + { + GroupRequest request = new GroupRequest(message, policy, callback); + for (Broker b : _group.getPeers()) + { + b.invoke(request); + } + request.finishedSend(); + } + + public void send(MemberHandle broker, Sendable message) throws AMQException + { + Broker destination = findBroker(broker); + if(destination == null) + { + _logger.warn(new LogMessage("Invalid destination sending {0}. {1} not known", message, broker)); + } + else + { + destination.send(message, null); + _logger.debug(new LogMessage("Sent {0} to {1}", message, broker)); + } + } + + private void send(Broker broker, Sendable message, ResponseHandler handler) throws AMQException + { + broker.send(message, handler); + } + + private void ping(Broker b) throws AMQException + { + ClusterPingBody ping = new ClusterPingBody(); + ping.broker = _group.getLocal().getDetails(); + ping.responseRequired = true; + ping.load = _loadTable.getLocalLoad(); + BlockingHandler handler = new BlockingHandler(); + send(getLeader(), new SimpleSendable(ping), handler); + handler.waitForCompletion(); + if (handler.failed()) + { + if (isLeader()) + { + handleFailure(b); + } + else + { + suspect(b); + } + } + else + { + _loadTable.setLoad(b, ((ClusterPingBody) handler.getResponse()).load); + } + } + + public void handlePing(MemberHandle member, long load) + { + _loadTable.setLoad(findBroker(member), load); + } + + public Member redirect() + { + return _loadTable.redirect(); + } + + public void establish() + { + _group.establish(); + _logger.info("Established cluster"); + } + + public void join(MemberHandle member) throws AMQException + { + member = SimpleMemberHandle.resolve(member); + + Broker leader = connectToLeader(member); + _logger.info(new LogMessage("Connected to {0}. joining", leader)); + ClusterJoinBody join = new ClusterJoinBody(); + join.broker = _group.getLocal().getDetails(); + send(leader, new SimpleSendable(join)); + } + + private Broker connectToLeader(MemberHandle member) throws AMQException + { + try + { + return _group.connectToLeader(member); + } + catch (Exception e) + { + throw new AMQException("Could not connect to leader: " + e, e); + } + } + + public void leave() throws AMQException + { + ClusterLeaveBody leave = new ClusterLeaveBody(); + leave.broker = _group.getLocal().getDetails(); + send(getLeader(), new SimpleSendable(leave)); + } + + private void suspect(MemberHandle broker) throws AMQException + { + if (_group.isLeader(broker)) + { + //need new leader, if this broker is next in line it can assume leadership + if (_group.assumeLeadership()) + { + announceMembership(); + } + else + { + _logger.warn(new LogMessage("Leader failed. Expecting {0} to succeed.", _group.getMembers().get(1))); + } + } + else + { + ClusterSuspectBody suspect = new ClusterSuspectBody(); + suspect.broker = broker.getDetails(); + send(getLeader(), new SimpleSendable(suspect)); + } + } + + + public void handleJoin(MemberHandle member) throws AMQException + { + _logger.info(new LogMessage("Handling join request for {0}", member)); + if(isLeader()) + { + //connect to the host and port specified: + Broker prospect = connectToProspect(member); + announceMembership(); + List<AMQMethodBody> msgs = _replayMgr.replay(true); + _logger.info(new LogMessage("Replaying {0} from leader to {1}", msgs, prospect)); + prospect.replay(msgs); + } + else + { + //pass request on to leader: + ClusterJoinBody request = new ClusterJoinBody(); + request.broker = member.getDetails(); + Broker leader = getLeader(); + send(leader, new SimpleSendable(request)); + _logger.info(new LogMessage("Passed join request for {0} to {1}", member, leader)); + } + } + + private Broker connectToProspect(MemberHandle member) throws AMQException + { + try + { + return _group.connectToProspect(member); + } + catch (Exception e) + { + e.printStackTrace(); + throw new AMQException("Could not connect to prospect: " + e, e); + } + } + + public void handleLeave(MemberHandle member) throws AMQException + { + handleFailure(findBroker(member)); + announceMembership(); + } + + public void handleSuspect(MemberHandle member) throws AMQException + { + Broker b = findBroker(member); + if(b != null) + { + //ping it to check it has failed, ping will handle failure if it has + ping(b); + announceMembership(); + } + } + + public void handleSynch(MemberHandle member) + { + _group.synched(member); + } + + private ClusterMembershipBody createAnnouncement(String membership) + { + ClusterMembershipBody announce = new ClusterMembershipBody(); + //TODO: revise this way of converting String to bytes... + announce.members = membership.getBytes(); + return announce; + } + + private void announceMembership() throws AMQException + { + String membership = SimpleMemberHandle.membersToString(_group.getMembers()); + ClusterMembershipBody announce = createAnnouncement(membership); + broadcast(new SimpleSendable(announce)); + _logger.info(new LogMessage("Membership announcement sent: {0}", membership)); + } + + private void handleFailure(Broker peer) + { + peer.remove(); + _group.remove(peer); + } + + public void handleMembershipAnnouncement(String membership) throws AMQException + { + _group.setMembers(SimpleMemberHandle.stringToMembers(membership)); + _logger.info(new LogMessage("Membership announcement received: {0}", membership)); + } + + public boolean isLeader() + { + return _group.isLeader(); + } + + public boolean isLeader(MemberHandle handle) + { + return _group.isLeader(handle); + } + + public Broker getLeader() + { + return _group.getLeader(); + } + + private Broker findBroker(MemberHandle handle) + { + return _group.findBroker(handle, false); + } + + public Member getMember(MemberHandle handle) + { + return findBroker(handle); + } + + public boolean isMember(MemberHandle member) + { + for (MemberHandle handle : _group.getMembers()) + { + if (handle.matches(member)) + { + return true; + } + } + return false; + } + + public MemberHandle getLocal() + { + return _group.getLocal(); + } + + public void failed(MemberHandle member) + { + if (isLeader()) + { + handleFailure(findBroker(member)); + try + { + announceMembership(); + } + catch (AMQException e) + { + _logger.error("Error announcing failure: " + e, e); + } + } + else + { + try + { + suspect(member); + } + catch (AMQException e) + { + _logger.error("Error sending suspect: " + e, e); + } + } + } + + public Broker create(MemberHandle handle) + { + Broker broker = _factory.create(handle); + broker.addFailureListener(this); + return broker; + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/GroupManager.java b/java/cluster/src/org/apache/qpid/server/cluster/GroupManager.java new file mode 100644 index 0000000000..d3b33f6fe3 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/GroupManager.java @@ -0,0 +1,69 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; + +public interface GroupManager +{ + /** + * Establish a new cluster with the local member as the leader. + */ + public void establish(); + + /** + * Join the cluster to which member belongs + */ + public void join(MemberHandle member) throws AMQException; + + public void broadcast(Sendable message) throws AMQException; + + public void broadcast(Sendable message, BroadcastPolicy policy, GroupResponseHandler callback) throws AMQException; + + public void send(MemberHandle broker, Sendable message) throws AMQException; + + public void leave() throws AMQException; + + public void handleJoin(MemberHandle member) throws AMQException; + + public void handleLeave(MemberHandle member) throws AMQException; + + public void handleSuspect(MemberHandle member) throws AMQException; + + public void handlePing(MemberHandle member, long load); + + public void handleMembershipAnnouncement(String membership) throws AMQException; + + public void handleSynch(MemberHandle member); + + public boolean isLeader(); + + public boolean isLeader(MemberHandle handle); + + public boolean isMember(MemberHandle member); + + public MemberHandle redirect(); + + public MemberHandle getLocal(); + + public JoinState getState(); + + public void addMemberhipChangeListener(MembershipChangeListener l); + + public void removeMemberhipChangeListener(MembershipChangeListener l); +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/GroupRequest.java b/java/cluster/src/org/apache/qpid/server/cluster/GroupRequest.java new file mode 100644 index 0000000000..6c45c6e655 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/GroupRequest.java @@ -0,0 +1,104 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Represents a method sent to a group of Member instances. Manages the responses, + * completion and callback. + * + */ +class GroupRequest +{ + private final Map<Member, AMQMethodBody> _responses = new HashMap<Member, AMQMethodBody>(); + private final List<Member> _brokers = new ArrayList<Member>(); + private boolean _sent; + + private final Sendable _request; + private final BroadcastPolicy _policy; + private final GroupResponseHandler _callback; + + GroupRequest(Sendable request, BroadcastPolicy policy, GroupResponseHandler callback) + { + _request = request; + _policy = policy; + _callback = callback; + } + + void send(int channel, Member session) throws AMQException + { + _brokers.add(session); + _request.send(channel, session); + } + + boolean finishedSend() + { + _sent = true; + return checkCompletion(); + } + + public boolean responseReceived(Member broker, AMQMethodBody response) + { + _responses.put(broker, response); + return checkCompletion(); + } + + public boolean removed(Member broker) + { + _brokers.remove(broker); + return checkCompletion(); + } + + private synchronized boolean checkCompletion() + { + return isComplete() && callback(); + } + + boolean isComplete() + { + return _sent && _policy != null && _policy.isComplete(_responses.size(), _brokers.size()); + } + + boolean callback() + { + _callback.response(getResults(), _brokers); + return true; + } + + List<AMQMethodBody> getResults() + { + List<AMQMethodBody> results = new ArrayList<AMQMethodBody>(_brokers.size()); + for (Member b : _brokers) + { + results.add(_responses.get(b)); + } + return results; + } + + public String toString() + { + return "GroupRequest{request=" + _request +", brokers=" + _brokers + ", responses=" + _responses + "}"; + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/GroupResponseHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/GroupResponseHandler.java new file mode 100644 index 0000000000..c12d06a337 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/GroupResponseHandler.java @@ -0,0 +1,28 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.framing.AMQMethodBody; + +import java.util.List; + +public interface GroupResponseHandler +{ + //Note: this implies that the response to a group request will always be a method body... + public void response(List<AMQMethodBody> responses, List<Member> members); +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/InductionBuffer.java b/java/cluster/src/org/apache/qpid/server/cluster/InductionBuffer.java new file mode 100644 index 0000000000..76e15b88ec --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/InductionBuffer.java @@ -0,0 +1,87 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.mina.common.IoSession; + +import java.util.LinkedList; +import java.util.Queue; + +/** + * Buffers any received messages until join completes. + * + */ +class InductionBuffer +{ + private final Queue<Message> _buffer = new LinkedList<Message>(); + private final MessageHandler _handler; + private boolean _buffering = true; + + InductionBuffer(MessageHandler handler) + { + _handler = handler; + } + + private void process() throws Exception + { + for (Message o = _buffer.poll(); o != null; o = _buffer.poll()) + { + o.deliver(_handler); + } + _buffering = false; + } + + synchronized void deliver() throws Exception + { + process(); + } + + synchronized void receive(IoSession session, Object msg) throws Exception + { + if (_buffering) + { + _buffer.offer(new Message(session, msg)); + } + else + { + _handler.deliver(session, msg); + } + } + + private static class Message + { + private final IoSession _session; + private final Object _msg; + + Message(IoSession session, Object msg) + { + _session = session; + _msg = msg; + } + + void deliver(MessageHandler handler) throws Exception + { + handler.deliver(_session, _msg); + } + } + + static interface MessageHandler + { + public void deliver(IoSession session, Object msg) throws Exception; + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/JoinState.java b/java/cluster/src/org/apache/qpid/server/cluster/JoinState.java new file mode 100644 index 0000000000..af618631e4 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/JoinState.java @@ -0,0 +1,23 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +public enum JoinState +{ + UNINITIALISED, JOINING, INITIATION, INDUCTION, JOINED +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/LoadTable.java b/java/cluster/src/org/apache/qpid/server/cluster/LoadTable.java new file mode 100644 index 0000000000..d95229505f --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/LoadTable.java @@ -0,0 +1,104 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; + +/** + * Maintains loading information about the local member and its cluster peers. + * + */ +public class LoadTable +{ + private final Map<MemberHandle, Loading> _peers = new HashMap<MemberHandle, Loading>(); + private final PriorityQueue<Loading> _loads = new PriorityQueue<Loading>(); + private final Loading _local = new Loading(null); + + public LoadTable() + { + _loads.add(_local); + } + + public void setLoad(Member member, long load) + { + synchronized (_peers) + { + Loading loading = _peers.get(member); + if (loading == null) + { + loading = new Loading(member); + synchronized (_loads) + { + _loads.add(loading); + } + _peers.put(member, loading); + } + loading.load = load; + } + } + + public void incrementLocalLoad() + { + synchronized (_local) + { + _local.load++; + } + } + + public void decrementLocalLoad() + { + synchronized (_local) + { + _local.load--; + } + } + + public long getLocalLoad() + { + synchronized (_local) + { + return _local.load; + } + } + + public Member redirect() + { + synchronized (_loads) + { + return _loads.peek().member; + } + } + + private static class Loading implements Comparable + { + private final Member member; + private long load; + + Loading(Member member) + { + this.member = member; + } + + public int compareTo(Object o) + { + return (int) (load - ((Loading) o).load); + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Main.java b/java/cluster/src/org/apache/qpid/server/cluster/Main.java new file mode 100644 index 0000000000..94ec2042a2 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/Main.java @@ -0,0 +1,117 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.apache.log4j.Logger; +import org.apache.mina.common.IoAcceptor; +import org.apache.mina.transport.socket.nio.SocketAcceptor; +import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.qpid.server.transport.ConnectorConfiguration; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; + +/** + * TODO: This is a cut-and-paste from the original broker Main class. Would be preferrable + * to make that class more reuseable to avoid all this duplication. + * + */ +public class Main extends org.apache.qpid.server.Main +{ + private static final Logger _logger = Logger.getLogger(Main.class); + + protected Main(String[] args) + { + super(args); + } + + protected void setOptions(Options otions) + { + super.setOptions(options); + + //extensions: + Option join = OptionBuilder.withArgName("join").hasArg().withDescription("Join the specified cluster member. Overrides any value in the config file"). + withLongOpt("join").create("j"); + options.addOption(join); + } + + protected void bind(int port, ConnectorConfiguration connectorConfig) + { + try + { + IoAcceptor acceptor = new SocketAcceptor(); + SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig(); + SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); + + sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize); + sc.setSendBufferSize(connectorConfig.socketWriteBuferSize); + sc.setTcpNoDelay(true); + + // if we do not use the executor pool threading model we get the default leader follower + // implementation provided by MINA + if (connectorConfig.enableExecutorPool) + { + sconfig.setThreadModel(new ReadWriteThreadModel()); + } + + String host = InetAddress.getLocalHost().getHostName(); + ClusteredProtocolHandler handler = new ClusteredProtocolHandler(new InetSocketAddress(host, port)); + if (connectorConfig.enableNonSSL) + { + acceptor.bind(new InetSocketAddress(port), handler, sconfig); + _logger.info("Qpid.AMQP listening on non-SSL port " + port); + handler.connect(commandLine.getOptionValue("j")); + } + + if (connectorConfig.enableSSL) + { + ClusteredProtocolHandler sslHandler = new ClusteredProtocolHandler(handler); + sslHandler.setUseSSL(true); + acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), handler, sconfig); + _logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort); + } + } + catch (IOException e) + { + _logger.error("Unable to bind service to registry: " + e, e); + } + catch (Exception e) + { + _logger.error("Unable to connect to cluster: " + e, e); + } + } + + public static void main(String[] args) + { + new Main(args); + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Member.java b/java/cluster/src/org/apache/qpid/server/cluster/Member.java new file mode 100644 index 0000000000..d7825aac6e --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/Member.java @@ -0,0 +1,28 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; + +public interface Member extends MemberHandle +{ + public void send(AMQDataBlock data) throws AMQException; + + public void addFailureListener(MemberFailureListener listener); +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MemberFailureListener.java b/java/cluster/src/org/apache/qpid/server/cluster/MemberFailureListener.java new file mode 100644 index 0000000000..4b56eaf962 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/MemberFailureListener.java @@ -0,0 +1,23 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +interface MemberFailureListener +{ + public void failed(MemberHandle member); +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MemberHandle.java b/java/cluster/src/org/apache/qpid/server/cluster/MemberHandle.java new file mode 100644 index 0000000000..75f4d19103 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/MemberHandle.java @@ -0,0 +1,31 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +public interface MemberHandle +{ + public String getHost(); + + public int getPort(); + + public boolean matches(MemberHandle m); + + public boolean matches(String host, int port); + + public String getDetails(); +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MembershipChangeListener.java b/java/cluster/src/org/apache/qpid/server/cluster/MembershipChangeListener.java new file mode 100644 index 0000000000..a62a413bc6 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/MembershipChangeListener.java @@ -0,0 +1,25 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import java.util.List; + +public interface MembershipChangeListener +{ + public void changed(List<MemberHandle> members); +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/MethodHandler.java new file mode 100644 index 0000000000..cd3e00012e --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/MethodHandler.java @@ -0,0 +1,26 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; + +interface MethodHandler +{ + public void handle(int channel, AMQMethodBody method) throws AMQException; +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerFactory.java b/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerFactory.java new file mode 100644 index 0000000000..f4a721dbc5 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerFactory.java @@ -0,0 +1,25 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.server.state.AMQState; + +public interface MethodHandlerFactory +{ + public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry); +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerRegistry.java b/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerRegistry.java new file mode 100644 index 0000000000..4870f9daaa --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerRegistry.java @@ -0,0 +1,41 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +import java.util.HashMap; +import java.util.Map; + +public class MethodHandlerRegistry +{ + private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> registry = + new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>(); + + public <A extends AMQMethodBody, B extends Class<A>> MethodHandlerRegistry addHandler(B type, StateAwareMethodListener<A> handler) + { + registry.put(type, handler); + return this; + } + + public <B extends AMQMethodBody> StateAwareMethodListener<B> getHandler(B frame) + { + return (StateAwareMethodListener<B>) registry.get(frame.getClass()); + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java new file mode 100644 index 0000000000..f193862e78 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java @@ -0,0 +1,269 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.RuntimeIOException; +import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ConnectionRedirectBody; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersionList; + +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * A 'client stub' for a remote cluster peer, using MINA for IO Layer + * + */ +public class MinaBrokerProxy extends Broker implements MethodHandler +{ + private static final Logger _logger = Logger.getLogger(MinaBrokerProxy.class); + private final ConnectionStatusMonitor _connectionMonitor = new ConnectionStatusMonitor(); + private final ClientHandlerRegistry _legacyHandler; + private final MinaBinding _binding = new MinaBinding(); + private final MemberHandle _local; + private IoSession _session; + private MethodHandler _handler; + private Iterable<AMQMethodBody> _replay; + + MinaBrokerProxy(String host, int port, MemberHandle local) + { + super(host, port); + _local = local; + _legacyHandler = new ClientHandlerRegistry(local); + } + + private void init(IoSession session) + { + _session = session; + _handler = new ClientAdapter(session, _legacyHandler); + } + + private ConnectFuture connectImpl() + { + _logger.info("Connecting to cluster peer: " + getDetails()); + SocketConnector ioConnector = new SocketConnector(); + SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); + + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + scfg.setTcpNoDelay(true); + scfg.setSendBufferSize(32768); + scfg.setReceiveBufferSize(32768); + InetSocketAddress address = new InetSocketAddress(getHost(), getPort()); + return ioConnector.connect(address, _binding); + } + + //extablish connection without handling redirect + boolean connect() throws IOException, InterruptedException + { + ConnectFuture future = connectImpl(); + // wait for connection to complete + future.join(); + // we call getSession which throws an IOException if there has been an error connecting + try + { + future.getSession(); + } + catch (RuntimeIOException e) + { + _connectionMonitor.failed(e); + _logger.error(new LogMessage("Could not connect to {0}: {1}", this, e), e); + throw e; + } + return _connectionMonitor.waitUntilOpen(); + } + + void connectAsynch(Iterable<AMQMethodBody> msgs) + { + _replay = msgs; + connectImpl(); + } + + void replay(Iterable<AMQMethodBody> msgs) + { + _replay = msgs; + if(_connectionMonitor.isOpened()) + { + replay(); + } + } + + //establish connection, handling redirect if required... + Broker connectToCluster() throws IOException, InterruptedException + { + connect(); + //wait until the connection is open or get a redirection + if (_connectionMonitor.waitUntilOpen()) + { + return this; + } + else + { + Broker broker = new MinaBrokerProxy(_connectionMonitor.getHost(), _connectionMonitor.getPort(), _local); + broker.connect(); + return broker; + } + } + + public void send(AMQDataBlock data) throws AMQException + { + if (_session == null) + { + try + { + _connectionMonitor.waitUntilOpen(); + } + catch (Exception e) + { + throw new AMQException("Failed to send " + data + ": " + e, e); + } + } + _session.write(data); + } + + private void replay() + { + if(_replay != null) + { + for(AMQMethodBody b : _replay) + { + _session.write(new AMQFrame(0, b)); + } + } + } + + public void handle(int channel, AMQMethodBody method) throws AMQException + { + _logger.info(new LogMessage("Handling method: {0} for channel {1}", method, channel)); + if (!handleResponse(channel, method)) + { + _logger.warn(new LogMessage("Unhandled method: {0} for channel {1}", method, channel)); + } + } + + private void handleMethod(int channel, AMQMethodBody method) throws AMQException + { + if (method instanceof ConnectionRedirectBody) + { + //signal redirection to waiting thread + ConnectionRedirectBody redirect = (ConnectionRedirectBody) method; + String[] parts = redirect.host.split(":"); + _connectionMonitor.redirect(parts[0], Integer.parseInt(parts[1])); + } + else + { + _handler.handle(channel, method); + if (AMQState.CONNECTION_OPEN.equals(_legacyHandler.getCurrentState()) && _handler != this) + { + _handler = this; + _logger.info(new LogMessage("Connection opened, handler switched")); + //replay any messages: + replay(); + //signal waiting thread: + _connectionMonitor.opened(); + } + } + } + + private void handleFrame(AMQFrame frame) throws AMQException + { + AMQBody body = frame.bodyFrame; + if (body instanceof AMQMethodBody) + { + handleMethod(frame.channel, (AMQMethodBody) body); + } + else + { + throw new AMQException("Client only expects method body, got: " + body); + } + } + + public String toString() + { + return "MinaBrokerProxy[" + (_session == null ? super.toString() : _session.getRemoteAddress()) + "]"; + } + + private class MinaBinding extends IoHandlerAdapter implements ProtocolVersionList + { + public void sessionCreated(IoSession session) throws Exception + { + init(session); + _logger.info(new LogMessage("{0}: created", MinaBrokerProxy.this)); + ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false)); + session.getFilterChain().addLast("protocolFilter", pcf); + + /* Find last protocol version in protocol version list. Make sure last protocol version + listed in the build file (build-module.xml) is the latest version which will be used + here. */ + int i = pv.length - 1; + session.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); + } + + public void sessionOpened(IoSession session) throws Exception + { + _logger.info(new LogMessage("{0}: opened", MinaBrokerProxy.this)); + } + + public void sessionClosed(IoSession session) throws Exception + { + _logger.info(new LogMessage("{0}: closed", MinaBrokerProxy.this)); + } + + public void exceptionCaught(IoSession session, Throwable throwable) throws Exception + { + _logger.error(new LogMessage("{0}: received {1}", MinaBrokerProxy.this, throwable), throwable); + if (! (throwable instanceof IOException)) + { + _session.close(); + } + failed(); + } + + public void messageReceived(IoSession session, Object object) throws Exception + { + if (object instanceof AMQFrame) + { + handleFrame((AMQFrame) object); + } + else + { + throw new AMQException("Received message of unrecognised type: " + object); + } + } + + public void messageSent(IoSession session, Object object) throws Exception + { + _logger.debug(new LogMessage("{0}: sent {1}", MinaBrokerProxy.this, object)); + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java b/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java new file mode 100644 index 0000000000..9cf9beeba7 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java @@ -0,0 +1,33 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +public class MinaBrokerProxyFactory implements BrokerFactory +{ + private final MemberHandle _local; + + MinaBrokerProxyFactory(MemberHandle local) + { + _local = local; + } + + public Broker create(MemberHandle handle) + { + return new MinaBrokerProxy(handle.getHost(), handle.getPort(), _local); + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ResponseHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/ResponseHandler.java new file mode 100644 index 0000000000..3d751fabba --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/ResponseHandler.java @@ -0,0 +1,27 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.framing.AMQMethodBody; + +public interface ResponseHandler +{ + public void responded(AMQMethodBody response); + + public void removed(); +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Sendable.java b/java/cluster/src/org/apache/qpid/server/cluster/Sendable.java new file mode 100644 index 0000000000..ad50a5a737 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/Sendable.java @@ -0,0 +1,25 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; + +public interface Sendable +{ + public void send(int channel, Member member) throws AMQException; +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/org/apache/qpid/server/cluster/ServerHandlerRegistry.java new file mode 100644 index 0000000000..2b4408b66c --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/ServerHandlerRegistry.java @@ -0,0 +1,91 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.state.AMQState; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.IllegalStateTransitionException; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.cluster.util.LogMessage; + +import java.util.HashMap; +import java.util.Map; + +/** + * An extension of server.AMQStateManager that allows different handlers to be registered. + * + */ +class ServerHandlerRegistry extends AMQStateManager +{ + private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class); + private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>(); + + ServerHandlerRegistry() + { + super(AMQState.CONNECTION_NOT_STARTED, false); + } + + ServerHandlerRegistry(ServerHandlerRegistry s) + { + this(); + _handlers.putAll(s._handlers); + } + + ServerHandlerRegistry(MethodHandlerFactory factory) + { + this(); + init(factory); + } + + void setHandlers(AMQState state, MethodHandlerRegistry handlers) + { + _handlers.put(state, handlers); + } + + void init(MethodHandlerFactory factory) + { + for (AMQState s : AMQState.values()) + { + setHandlers(s, factory.register(s, new MethodHandlerRegistry())); + } + } + + protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState state, B frame) throws IllegalStateTransitionException + { + MethodHandlerRegistry registry = _handlers.get(state); + StateAwareMethodListener<B> handler = (registry == null) ? null : registry.getHandler(frame); + if (handler == null) + { + _logger.warn(new LogMessage("No handler for {0}, {1}", state, frame)); + } + return handler; + } + + <A extends AMQMethodBody, B extends Class<A>> void addHandler(AMQState state, B type, StateAwareMethodListener<A> handler) + { + MethodHandlerRegistry registry = _handlers.get(state); + if (registry == null) + { + registry = new MethodHandlerRegistry(); + _handlers.put(state, registry); + } + registry.addHandler(type, handler); + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/SimpleMemberHandle.java b/java/cluster/src/org/apache/qpid/server/cluster/SimpleMemberHandle.java new file mode 100644 index 0000000000..f0b1db25e6 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/SimpleMemberHandle.java @@ -0,0 +1,156 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + +public class SimpleMemberHandle implements MemberHandle +{ + private final String _host; + private final int _port; + + public SimpleMemberHandle(String host, int port) + { + _host = host; + _port = port; + } + + public SimpleMemberHandle(String details) + { + String[] parts = details.split(":"); + _host = parts[0]; + _port = Integer.parseInt(parts[1]); + } + + public SimpleMemberHandle(InetSocketAddress address) throws UnknownHostException + { + this(address.getAddress(), address.getPort()); + } + + public SimpleMemberHandle(InetAddress address, int port) throws UnknownHostException + { + this(canonical(address).getHostAddress(), port); + } + + public String getHost() + { + return _host; + } + + public int getPort() + { + return _port; + } + + public int hashCode() + { + return getPort(); + } + + public boolean equals(Object o) + { + return o instanceof MemberHandle && matches((MemberHandle) o); + } + + public boolean matches(MemberHandle m) + { + return matches(m.getHost(), m.getPort()); + } + + public boolean matches(String host, int port) + { + return _host.equals(host) && _port == port; + } + + public String getDetails() + { + return _host + ":" + _port; + } + + public String toString() + { + return getDetails(); + } + + static List<MemberHandle> stringToMembers(String membership) + { + String[] names = membership.split("\\s"); + List<MemberHandle> members = new ArrayList<MemberHandle>(); + for (String name : names) + { + members.add(new SimpleMemberHandle(name)); + } + return members; + } + + static String membersToString(List<MemberHandle> members) + { + StringBuffer buffer = new StringBuffer(); + boolean first = true; + for (MemberHandle m : members) + { + if (first) + { + first = false; + } + else + { + buffer.append(" "); + } + buffer.append(m.getDetails()); + } + + return buffer.toString(); + } + + private static InetAddress canonical(InetAddress address) throws UnknownHostException + { + if (address.isLoopbackAddress()) + { + return InetAddress.getLocalHost(); + } + else + { + return address; + } + } + + public MemberHandle resolve() + { + return resolve(this); + } + + public static MemberHandle resolve(MemberHandle handle) + { + try + { + return new SimpleMemberHandle(new InetSocketAddress(handle.getHost(), handle.getPort())); + } + catch (UnknownHostException e) + { + e.printStackTrace(); + return handle; + } + } + + +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/SimpleSendable.java b/java/cluster/src/org/apache/qpid/server/cluster/SimpleSendable.java new file mode 100644 index 0000000000..4b75e76d97 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/SimpleSendable.java @@ -0,0 +1,53 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQFrame; + +import java.util.Arrays; +import java.util.List; + +public class SimpleSendable implements Sendable +{ + private final List<AMQBody> _bodies; + + public SimpleSendable(AMQBody body) + { + this(Arrays.asList(body)); + } + + public SimpleSendable(List<AMQBody> bodies) + { + _bodies = bodies; + } + + public void send(int channel, Member member) throws AMQException + { + for (AMQBody body : _bodies) + { + member.send(new AMQFrame(channel, body)); + } + } + + public String toString() + { + return _bodies.toString(); + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java new file mode 100644 index 0000000000..5ee07af596 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java @@ -0,0 +1,69 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; + +import java.util.List; +import java.util.ArrayList; + +public class ChainedClusterMethodHandler <A extends AMQMethodBody> extends ClusterMethodHandler<A> +{ + private final List<ClusterMethodHandler<A>> _handlers; + + private ChainedClusterMethodHandler() + { + this(new ArrayList<ClusterMethodHandler<A>>()); + } + + public ChainedClusterMethodHandler(List<ClusterMethodHandler<A>> handlers) + { + _handlers = handlers; + } + + public ChainedClusterMethodHandler(ClusterMethodHandler<A>... handlers) + { + this(); + for(ClusterMethodHandler<A>handler: handlers) + { + _handlers.add(handler); + } + } + + protected final void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + { + for(ClusterMethodHandler<A> handler : _handlers) + { + handler.peer(stateMgr, queues, exchanges, session, evt); + } + } + + protected final void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + { + for(ClusterMethodHandler<A> handler : _handlers) + { + handler.client(stateMgr, queues, exchanges, session, evt); + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java new file mode 100644 index 0000000000..cd25bd178a --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java @@ -0,0 +1,136 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.log4j.Logger; + +import java.util.Map; +import java.util.HashMap; + +/** + * Maintains the default queue names for a channel, and alters subsequent frames where necessary + * to use this (i.e. when no queue is explictly specified). + * + */ +class ChannelQueueManager +{ + private static final Logger _logger = Logger.getLogger(ChannelQueueManager.class); + private final Map<Integer, String> _channelQueues = new HashMap<Integer, String>(); + + ClusterMethodHandler<QueueDeclareBody> createQueueDeclareHandler() + { + return new QueueDeclareHandler(); + } + + ClusterMethodHandler<QueueDeleteBody> createQueueDeleteHandler() + { + return new QueueDeleteHandler(); + } + + ClusterMethodHandler<QueueBindBody> createQueueBindHandler() + { + return new QueueBindHandler(); + } + + ClusterMethodHandler<BasicConsumeBody> createBasicConsumeHandler() + { + return new BasicConsumeHandler(); + } + + private void set(int channel, String queue) + { + _channelQueues.put(channel, queue); + _logger.info(new LogMessage("Set default queue for {0} to {1}", channel, queue)); + } + + private String get(int channel) + { + String queue = _channelQueues.get(channel); + _logger.info(new LogMessage("Default queue for {0} is {1}", channel, queue)); + return queue; + } + + private class QueueDeclareHandler extends ClusterMethodHandler<QueueDeclareBody> + { + protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + { + } + + protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + { + set(evt.getChannelId(), evt.getMethod().queue); + } + } + private class QueueBindHandler extends ClusterMethodHandler<QueueBindBody> + { + protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException + { + } + + protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException + { + if(evt.getMethod().queue == null) + { + evt.getMethod().queue = get(evt.getChannelId()); + } + } + } + private class QueueDeleteHandler extends ClusterMethodHandler<QueueDeleteBody> + { + protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException + { + } + + protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException + { + if(evt.getMethod().queue == null) + { + evt.getMethod().queue = get(evt.getChannelId()); + } + } + } + + private class BasicConsumeHandler extends ClusterMethodHandler<BasicConsumeBody> + { + protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + { + } + + protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + { + if(evt.getMethod().queue == null) + { + evt.getMethod().queue = get(evt.getChannelId()); + } + } + } + +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java new file mode 100644 index 0000000000..9e4444819e --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java @@ -0,0 +1,46 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.AMQException; + +public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A> +{ + public final void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + { + if (ClusteredProtocolSession.isPeerSession(session)) + { + peer(stateMgr, queues, exchanges, session, evt); + } + else + { + client(stateMgr, queues, exchanges, session, evt); + } + } + + protected abstract void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException; + protected abstract void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException; +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java new file mode 100644 index 0000000000..a1684b399f --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java @@ -0,0 +1,278 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.BasicCancelBody; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ClusterJoinBody; +import org.apache.qpid.framing.ClusterLeaveBody; +import org.apache.qpid.framing.ClusterMembershipBody; +import org.apache.qpid.framing.ClusterPingBody; +import org.apache.qpid.framing.ClusterSuspectBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionOpenBody; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeleteBody; +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.ClusterSynchBody; +import org.apache.qpid.framing.BasicQosBody; +import org.apache.qpid.framing.TxSelectBody; +import org.apache.qpid.framing.TxCommitBody; +import org.apache.qpid.framing.TxRollbackBody; +import org.apache.qpid.server.cluster.ClusterCapability; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.LoadTable; +import org.apache.qpid.server.cluster.MemberHandle; +import org.apache.qpid.server.cluster.MethodHandlerFactory; +import org.apache.qpid.server.cluster.MethodHandlerRegistry; +import org.apache.qpid.server.cluster.SimpleMemberHandle; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.handler.ChannelCloseHandler; +import org.apache.qpid.server.handler.ChannelFlowHandler; +import org.apache.qpid.server.handler.ChannelOpenHandler; +import org.apache.qpid.server.handler.ConnectionCloseMethodHandler; +import org.apache.qpid.server.handler.ConnectionOpenMethodHandler; +import org.apache.qpid.server.handler.ConnectionSecureOkMethodHandler; +import org.apache.qpid.server.handler.ConnectionStartOkMethodHandler; +import org.apache.qpid.server.handler.ConnectionTuneOkMethodHandler; +import org.apache.qpid.server.handler.ExchangeDeclareHandler; +import org.apache.qpid.server.handler.ExchangeDeleteHandler; +import org.apache.qpid.server.handler.BasicCancelMethodHandler; +import org.apache.qpid.server.handler.BasicPublishMethodHandler; +import org.apache.qpid.server.handler.QueueBindHandler; +import org.apache.qpid.server.handler.QueueDeleteHandler; +import org.apache.qpid.server.handler.BasicQosHandler; +import org.apache.qpid.server.handler.TxSelectHandler; +import org.apache.qpid.server.handler.TxCommitHandler; +import org.apache.qpid.server.handler.TxRollbackHandler; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQState; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class ClusterMethodHandlerFactory implements MethodHandlerFactory +{ + private final GroupManager _groupMgr; + private final LoadTable _loadTable; + + public ClusterMethodHandlerFactory(GroupManager groupMgr, LoadTable loadTable) + { + _groupMgr = groupMgr; + _loadTable = loadTable; + } + + public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry) + { + switch (state) + { + case CONNECTION_NOT_STARTED: + return registry.addHandler(ConnectionStartOkBody.class, ConnectionStartOkMethodHandler.getInstance()); + case CONNECTION_NOT_AUTH: + return registry.addHandler(ConnectionSecureOkBody.class, ConnectionSecureOkMethodHandler.getInstance()); + case CONNECTION_NOT_TUNED: + return registry.addHandler(ConnectionTuneOkBody.class, ConnectionTuneOkMethodHandler.getInstance()); + case CONNECTION_NOT_OPENED: + //connection.open override: + return registry.addHandler(ConnectionOpenBody.class, new ConnectionOpenHandler()); + case CONNECTION_OPEN: + return registerConnectionOpened(registry); + } + return registry; + } + + private MethodHandlerRegistry registerConnectionOpened(MethodHandlerRegistry registry) + { + //new cluster method handlers: + registry.addHandler(ClusterJoinBody.class, new JoinHandler()); + registry.addHandler(ClusterLeaveBody.class, new LeaveHandler()); + registry.addHandler(ClusterSuspectBody.class, new SuspectHandler()); + registry.addHandler(ClusterMembershipBody.class, new MembershipHandler()); + registry.addHandler(ClusterPingBody.class, new PingHandler()); + registry.addHandler(ClusterSynchBody.class, new SynchHandler()); + + //connection.close override: + registry.addHandler(ConnectionCloseBody.class, new ConnectionCloseHandler()); + + //replicated handlers: + registry.addHandler(ExchangeDeclareBody.class, replicated(ExchangeDeclareHandler.getInstance())); + registry.addHandler(ExchangeDeleteBody.class, replicated(ExchangeDeleteHandler.getInstance())); + + ChannelQueueManager channelQueueMgr = new ChannelQueueManager(); + + + LocalQueueDeclareHandler handler = new LocalQueueDeclareHandler(_groupMgr); + registry.addHandler(QueueDeclareBody.class, + chain(new QueueNameGenerator(handler), + channelQueueMgr.createQueueDeclareHandler(), + new ReplicatingHandler<QueueDeclareBody>(_groupMgr, handler))); + + registry.addHandler(QueueBindBody.class, chain(channelQueueMgr.createQueueBindHandler(), replicated(QueueBindHandler.getInstance()))); + registry.addHandler(QueueDeleteBody.class, chain(channelQueueMgr.createQueueDeleteHandler(), replicated(alternate(new QueueDeleteHandler(false), new QueueDeleteHandler(true))))); + registry.addHandler(BasicConsumeBody.class, chain(channelQueueMgr.createBasicConsumeHandler(), new ReplicatingConsumeHandler(_groupMgr))); + + //other modified handlers: + registry.addHandler(BasicCancelBody.class, alternate(new RemoteCancelHandler(), BasicCancelMethodHandler.getInstance())); + + //other unaffected handlers: + registry.addHandler(BasicPublishBody.class, BasicPublishMethodHandler.getInstance()); + registry.addHandler(BasicQosBody.class, BasicQosHandler.getInstance()); + registry.addHandler(ChannelOpenBody.class, ChannelOpenHandler.getInstance()); + registry.addHandler(ChannelCloseBody.class, ChannelCloseHandler.getInstance()); + registry.addHandler(ChannelFlowBody.class, ChannelFlowHandler.getInstance()); + registry.addHandler(TxSelectBody.class, TxSelectHandler.getInstance()); + registry.addHandler(TxCommitBody.class, TxCommitHandler.getInstance()); + registry.addHandler(TxRollbackBody.class, TxRollbackHandler.getInstance()); + + + return registry; + } + + private class SynchHandler implements StateAwareMethodListener<ClusterSynchBody> + { + public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, AMQProtocolSession session, + AMQMethodEvent<ClusterSynchBody> evt) throws AMQException + { + _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(session)); + } + } + + private class JoinHandler implements StateAwareMethodListener<ClusterJoinBody> + { + public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, AMQProtocolSession session, + AMQMethodEvent<ClusterJoinBody> evt) throws AMQException + { + _groupMgr.handleJoin(new SimpleMemberHandle(evt.getMethod().broker)); + } + } + + private class LeaveHandler implements StateAwareMethodListener<ClusterLeaveBody> + { + public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, + AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException + { + _groupMgr.handleLeave(new SimpleMemberHandle(evt.getMethod().broker)); + } + } + + private class SuspectHandler implements StateAwareMethodListener<ClusterSuspectBody> + { + public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, + AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException + { + _groupMgr.handleSuspect(new SimpleMemberHandle(evt.getMethod().broker)); + } + } + + private class MembershipHandler implements StateAwareMethodListener<ClusterMembershipBody> + { + public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, AMQProtocolSession session, + AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException + { + ClusterMembershipBody body = evt.getMethod(); + _groupMgr.handleMembershipAnnouncement(new String(body.members)); + } + } + + private class PingHandler implements StateAwareMethodListener<ClusterPingBody> + { + public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, AMQProtocolSession session, + AMQMethodEvent<ClusterPingBody> evt) throws AMQException + { + MemberHandle peer = new SimpleMemberHandle(evt.getMethod().broker); + _groupMgr.handlePing(peer, evt.getMethod().load); + if (evt.getMethod().responseRequired) + { + evt.getMethod().load = _loadTable.getLocalLoad(); + session.writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod())); + } + } + } + + private class ConnectionOpenHandler extends ExtendedHandler<ConnectionOpenBody> + { + ConnectionOpenHandler() + { + super(ConnectionOpenMethodHandler.getInstance()); + } + + void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionOpenBody> evt) + { + String capabilities = evt.getMethod().capabilities; + if (ClusterCapability.contains(capabilities)) + { + ClusteredProtocolSession.setSessionPeer(session, ClusterCapability.getPeer(capabilities)); + } + else + { + _loadTable.incrementLocalLoad(); + } + } + } + + private class ConnectionCloseHandler extends ExtendedHandler<ConnectionCloseBody> + { + ConnectionCloseHandler() + { + super(ConnectionCloseMethodHandler.getInstance()); + } + + void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionCloseBody> evt) + { + if (!ClusteredProtocolSession.isPeerSession(session)) + { + _loadTable.decrementLocalLoad(); + } + } + } + + private <B extends AMQMethodBody> ReplicatingHandler<B> replicated(StateAwareMethodListener<B> handler) + { + return new ReplicatingHandler<B>(_groupMgr, handler); + } + + private <B extends AMQMethodBody> StateAwareMethodListener<B> alternate(StateAwareMethodListener<B> peer, StateAwareMethodListener<B> client) + { + return new PeerHandler<B>(peer, client); + } + + private <B extends AMQMethodBody> StateAwareMethodListener<B> chain(ClusterMethodHandler<B>... h) + { + return new ChainedClusterMethodHandler<B>(h); + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/ExtendedHandler.java new file mode 100644 index 0000000000..08dcfbe121 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/ExtendedHandler.java @@ -0,0 +1,52 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +class ExtendedHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A> +{ + private final StateAwareMethodListener<A> _base; + + ExtendedHandler(StateAwareMethodListener<A> base) + { + _base = base; + } + + public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + { + preHandle(stateMgr, session, evt); + _base.methodReceived(stateMgr, queues, exchanges, session, evt); + postHandle(stateMgr, session, evt); + } + + void preHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + { + } + + void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + { + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/HandlerUtils.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/HandlerUtils.java new file mode 100644 index 0000000000..27eff59685 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/HandlerUtils.java @@ -0,0 +1,22 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +public abstract class HandlerUtils +{ +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java new file mode 100644 index 0000000000..997c055d77 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java @@ -0,0 +1,74 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.cluster.MemberHandle; +import org.apache.qpid.server.handler.QueueDeclareHandler; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.ClusteredQueue; +import org.apache.qpid.server.queue.PrivateQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.RemoteQueueProxy; + +public class LocalQueueDeclareHandler extends QueueDeclareHandler +{ + private static final Logger _logger = Logger.getLogger(LocalQueueDeclareHandler.class); + private final GroupManager _groupMgr; + + LocalQueueDeclareHandler(GroupManager groupMgr) + { + _groupMgr = groupMgr; + } + + protected String createName() + { + return super.createName() + "@" + _groupMgr.getLocal().getDetails(); + } + + protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session) throws AMQException + { + //is it private or shared: + if (body.exclusive) + { + if (ClusteredProtocolSession.isPeerSession(session)) + { + //need to get peer from the session... + MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session); + _logger.debug(new LogMessage("Creating proxied queue {0} on behalf of {1}", body.queue, peer)); + return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, peer.getDetails(), body.autoDelete, registry); + } + else + { + _logger.debug(new LogMessage("Creating local private queue {0}", body.queue)); + return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, registry); + } + } + else + { + _logger.debug(new LogMessage("Creating local shared queue {0}", body.queue)); + return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, registry); + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/NullListener.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/NullListener.java new file mode 100644 index 0000000000..7fae2c6598 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/NullListener.java @@ -0,0 +1,35 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T> +{ + public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<T> evt) throws AMQException + { + } +} + diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/PeerHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/PeerHandler.java new file mode 100644 index 0000000000..a2a570f045 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/PeerHandler.java @@ -0,0 +1,57 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +/** + * Base for implementing handlers that carry out different actions based on whether the method they + * are handling was sent by a peer (i.e. another broker in the cluster) or a client (i.e. an end-user + * application). + * + */ +public class PeerHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A> +{ + private final StateAwareMethodListener<A> _peer; + private final StateAwareMethodListener<A> _client; + + PeerHandler(StateAwareMethodListener<A> peer, StateAwareMethodListener<A> client) + { + _peer = peer; + _client = client; + } + + protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + { + _peer.methodReceived(stateMgr, queues, exchanges, session, evt); + } + + protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + { + _client.methodReceived(stateMgr, queues, exchanges, session, evt); + } + +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java new file mode 100644 index 0000000000..2f15373eba --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java @@ -0,0 +1,60 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; + +/** + * Generates queue names for queues declared with no name. + * + */ +class QueueNameGenerator extends ClusterMethodHandler<QueueDeclareBody> +{ + private final LocalQueueDeclareHandler _handler; + + QueueNameGenerator(LocalQueueDeclareHandler handler) + { + _handler = handler; + } + + protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + { + } + + protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, + AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) + throws AMQException + { + setName(evt.getMethod());//need to set the name before propagating this method + } + + protected void setName(QueueDeclareBody body) + { + if (body.queue == null) + { + body.queue = _handler.createName(); + } + } +} + diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java new file mode 100644 index 0000000000..9a598d7f07 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java @@ -0,0 +1,50 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicCancelBody; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.ClusteredQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class RemoteCancelHandler implements StateAwareMethodListener<BasicCancelBody> +{ + private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class); + + public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicCancelBody> evt) throws AMQException + { + //By convention, consumers setup between brokers use the queue name as the consumer tag: + AMQQueue queue = queues.getQueue(evt.getMethod().consumerTag); + if (queue instanceof ClusteredQueue) + { + ((ClusteredQueue) queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session)); + } + else + { + _logger.warn("Got remote cancel request for non-clustered queue: " + queue); + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java new file mode 100644 index 0000000000..24ce4087fb --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java @@ -0,0 +1,55 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.BasicConsumeOkBody; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.ClusteredQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +/** + * Handles consume requests from other cluster members. + * + */ +public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsumeBody> +{ + private final Logger _logger = Logger.getLogger(RemoteConsumeHandler.class); + + public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + { + AMQQueue queue = queues.getQueue(evt.getMethod().queue); + if (queue instanceof ClusteredQueue) + { + ((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session)); + session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(), evt.getMethod().queue)); + } + else + { + _logger.warn("Got remote consume request for non-clustered queue: " + queue); + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java new file mode 100644 index 0000000000..4a895fcd0a --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java @@ -0,0 +1,81 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.server.cluster.BroadcastPolicy; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.handler.BasicConsumeMethodHandler; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBody> +{ + ReplicatingConsumeHandler(GroupManager groupMgr) + { + this(groupMgr, null); + } + + ReplicatingConsumeHandler(GroupManager groupMgr, BroadcastPolicy policy) + { + super(groupMgr, base(), policy); + } + + protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + { + //only replicate if the queue in question is a shared queue + if (isShared(queues.getQueue(evt.getMethod().queue))) + { + super.replicate(stateMgr, queues, exchanges, session, evt); + } + else + { + _logger.info(new LogMessage("Handling consume for private queue ({0}) locally", evt.getMethod())); + local(stateMgr, queues, exchanges, session, evt); + _logger.info(new LogMessage("Handled consume for private queue ({0}) locally", evt.getMethod())); + + } + } + + protected boolean isShared(AMQQueue queue) + { + return queue != null && queue.isShared(); + } + + static StateAwareMethodListener<BasicConsumeBody> base() + { + return new PeerHandler<BasicConsumeBody>(peer(), client()); + } + + static StateAwareMethodListener<BasicConsumeBody> peer() + { + return new RemoteConsumeHandler(); + } + + static StateAwareMethodListener<BasicConsumeBody> client() + { + return BasicConsumeMethodHandler.getInstance(); + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java new file mode 100644 index 0000000000..a5fab27d16 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java @@ -0,0 +1,127 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.cluster.BroadcastPolicy; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.GroupResponseHandler; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.cluster.Member; +import org.apache.qpid.server.cluster.SimpleSendable; +import org.apache.qpid.server.cluster.policy.StandardPolicies; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +import java.util.List; + +/** + * Basic template for handling methods that should be broadcast to the group and + * processed locally after 'completion' of this broadcast. + * + */ +class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A> implements StandardPolicies +{ + protected static final Logger _logger = Logger.getLogger(ReplicatingHandler.class); + + private final StateAwareMethodListener<A> _base; + private final GroupManager _groupMgr; + private final BroadcastPolicy _policy; + + ReplicatingHandler(GroupManager groupMgr, StateAwareMethodListener<A> base) + { + this(groupMgr, base, null); + } + + ReplicatingHandler(GroupManager groupMgr, StateAwareMethodListener<A> base, BroadcastPolicy policy) + { + _groupMgr = groupMgr; + _base = base; + _policy = policy; + } + + protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + { + local(stateMgr, queues, exchanges, session, evt); + _logger.debug(new LogMessage("Handled {0} locally", evt.getMethod())); + } + + protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + { + replicate(stateMgr, queues, exchanges, session, evt); + } + + protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + { + if (_policy == null) + { + //asynch delivery + _groupMgr.broadcast(new SimpleSendable(evt.getMethod())); + local(stateMgr, queues, exchanges, session, evt); + } + else + { + Callback callback = new Callback(stateMgr, queues, exchanges, session, evt); + _groupMgr.broadcast(new SimpleSendable(evt.getMethod()), _policy, callback); + } + _logger.debug(new LogMessage("Replicated {0} to peers", evt.getMethod())); + } + + protected void local(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + { + _base.methodReceived(stateMgr, queues, exchanges, session, evt); + } + + private class Callback implements GroupResponseHandler + { + private final AMQStateManager _stateMgr; + private final QueueRegistry _queues; + private final ExchangeRegistry _exchanges; + private final AMQProtocolSession _session; + private final AMQMethodEvent<A> _evt; + + Callback(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) + { + _stateMgr = stateMgr; + _queues = queues; + _exchanges = exchanges; + _session = session; + _evt = evt; + } + + public void response(List<AMQMethodBody> responses, List<Member> members) + { + try + { + local(_stateMgr, _queues, _exchanges, _session, _evt); + _logger.debug(new LogMessage("Handled {0} locally, in response to completion of replication", _evt.getMethod())); + } + catch (AMQException e) + { + _logger.error(new LogMessage("Error handling {0}:{1}", _evt.getMethod(), e), e); + } + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappedListener.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappedListener.java new file mode 100644 index 0000000000..18c3f5ce58 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappedListener.java @@ -0,0 +1,53 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class WrappedListener<T extends AMQMethodBody> implements StateAwareMethodListener<T> +{ + private final StateAwareMethodListener<T> _primary; + private final StateAwareMethodListener _post; + private final StateAwareMethodListener _pre; + + WrappedListener(StateAwareMethodListener<T> primary, StateAwareMethodListener pre, StateAwareMethodListener post) + { + _pre = check(pre); + _post = check(post); + _primary = check(primary); + } + + public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<T> evt) throws AMQException + { + _pre.methodReceived(stateMgr, queues, exchanges, session, evt); + _primary.methodReceived(stateMgr, queues, exchanges, session, evt); + _post.methodReceived(stateMgr, queues, exchanges, session, evt); + } + + private static <T extends AMQMethodBody> StateAwareMethodListener<T> check(StateAwareMethodListener<T> in) + { + return in == null ? new NullListener<T>() : in; + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java new file mode 100644 index 0000000000..912651d3ce --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java @@ -0,0 +1,82 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.cluster.MethodHandlerFactory; +import org.apache.qpid.server.cluster.MethodHandlerRegistry; +import org.apache.qpid.server.state.AMQState; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public abstract class WrappingMethodHandlerFactory implements MethodHandlerFactory +{ + private final MethodHandlerFactory _delegate; + private final StateAwareMethodListener _pre; + private final StateAwareMethodListener _post; + + protected WrappingMethodHandlerFactory(MethodHandlerFactory delegate, + StateAwareMethodListener pre, + StateAwareMethodListener post) + { + _delegate = delegate; + _pre = pre; + _post = post; + } + + public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry) + { + if (isWrappableState(state)) + { + return wrap(_delegate.register(state, registry), state); + } + else + { + return _delegate.register(state, registry); + } + } + + protected abstract boolean isWrappableState(AMQState state); + + protected abstract Iterable<FrameDescriptor> getWrappableFrameTypes(AMQState state); + + private MethodHandlerRegistry wrap(MethodHandlerRegistry registry, AMQState state) + { + for (FrameDescriptor fd : getWrappableFrameTypes(state)) + { + wrap(registry, fd.type, fd.instance); + } + return registry; + } + + private <A extends AMQMethodBody, B extends Class<A>> void wrap(MethodHandlerRegistry r, B type, A frame) + { + r.addHandler(type, new WrappedListener<A>(r.getHandler(frame), _pre, _post)); + } + + protected static class FrameDescriptor<A extends AMQMethodBody, B extends Class<A>> + { + protected final A instance; + protected final B type; + + public FrameDescriptor(B type, A instance) + { + this.instance = instance; + this.type = type; + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java b/java/cluster/src/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java new file mode 100644 index 0000000000..044d264380 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java @@ -0,0 +1,28 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.policy; + +import org.apache.qpid.server.cluster.BroadcastPolicy; + +public class AsynchBroadcastPolicy implements BroadcastPolicy +{ + public boolean isComplete(int responded, int members) + { + return true; + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java b/java/cluster/src/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java new file mode 100644 index 0000000000..cd004c36d0 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java @@ -0,0 +1,28 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.policy; + +import org.apache.qpid.server.cluster.BroadcastPolicy; + +public class MajorityResponseBroadcastPolicy implements BroadcastPolicy +{ + public boolean isComplete(int responded, int members) + { + return responded > members / 2; + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java b/java/cluster/src/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java new file mode 100644 index 0000000000..0aca6fbe97 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java @@ -0,0 +1,28 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.policy; + +import org.apache.qpid.server.cluster.BroadcastPolicy; + +public class OneResponseBroadcastPolicy implements BroadcastPolicy +{ + public boolean isComplete(int responded, int members) + { + return responded > 0; + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/StandardPolicies.java b/java/cluster/src/org/apache/qpid/server/cluster/policy/StandardPolicies.java new file mode 100644 index 0000000000..fbee483967 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/policy/StandardPolicies.java @@ -0,0 +1,26 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.policy; + +import org.apache.qpid.server.cluster.BroadcastPolicy; + +public interface StandardPolicies +{ + public static final BroadcastPolicy ASYNCH_POLICY = new AsynchBroadcastPolicy(); + public static final BroadcastPolicy SYNCH_POLICY = new SynchBroadcastPolicy(); +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java b/java/cluster/src/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java new file mode 100644 index 0000000000..1c6023d35d --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java @@ -0,0 +1,28 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.policy; + +import org.apache.qpid.server.cluster.BroadcastPolicy; + +public class SynchBroadcastPolicy implements BroadcastPolicy +{ + public boolean isComplete(int responded, int members) + { + return responded == members; + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java b/java/cluster/src/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java new file mode 100644 index 0000000000..cbbc8679a8 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java @@ -0,0 +1,45 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.replay; + +import org.apache.qpid.framing.AMQMethodBody; + +abstract class ChainedMethodRecorder <T extends AMQMethodBody> implements MethodRecorder<T> +{ + private final MethodRecorder<T> _recorder; + + ChainedMethodRecorder() + { + this(null); + } + + ChainedMethodRecorder(MethodRecorder<T> recorder) + { + _recorder = recorder; + } + + public final void record(T method) + { + if(!doRecord(method) && _recorder != null) + { + _recorder.record(method); + } + } + + protected abstract boolean doRecord(T method); +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/org/apache/qpid/server/cluster/replay/ConsumerCounts.java new file mode 100644 index 0000000000..7ba51108f5 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/replay/ConsumerCounts.java @@ -0,0 +1,66 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.replay; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.BasicConsumeBody; + +import java.util.Map; +import java.util.HashMap; +import java.util.List; + +class ConsumerCounts +{ + private final Map<String, Integer> _counts = new HashMap<String, Integer>(); + + synchronized void increment(String queue) + { + _counts.put(queue, get(queue) + 1); + } + + synchronized void decrement(String queue) + { + _counts.put(queue, get(queue) - 1); + } + + private int get(String queue) + { + Integer count = _counts.get(queue); + return count == null ? 0 : count; + } + + synchronized void replay(List<AMQMethodBody> messages) + { + for(String queue : _counts.keySet()) + { + BasicConsumeBody m = new BasicConsumeBody(); + m.queue = queue; + m.consumerTag = queue; + replay(m, messages); + } + } + + private void replay(BasicConsumeBody msg, List<AMQMethodBody> messages) + { + int count = _counts.get(msg.queue); + for(int i = 0; i < count; i++) + { + messages.add(msg); + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/MethodRecorder.java b/java/cluster/src/org/apache/qpid/server/cluster/replay/MethodRecorder.java new file mode 100644 index 0000000000..8d5ce4f18e --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/replay/MethodRecorder.java @@ -0,0 +1,29 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.replay; + +import org.apache.qpid.framing.AMQMethodBody; + +/** + * Abstraction through which a method can be recorded for replay + * + */ +interface MethodRecorder<T extends AMQMethodBody> +{ + public void record(T method); +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java new file mode 100644 index 0000000000..638ec64e09 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java @@ -0,0 +1,70 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.replay; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.BasicCancelBody; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeleteBody; +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.server.cluster.MethodHandlerFactory; +import org.apache.qpid.server.cluster.MethodHandlerRegistry; +import org.apache.qpid.server.cluster.handler.WrappingMethodHandlerFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQState; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +import java.util.Arrays; + +public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory +{ + private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[] + { + new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody()), + new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody()), + new FrameDescriptor(QueueBindBody.class, new QueueBindBody()), + new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody()), + new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody()), + new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody()), + new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody()) + }); + + + public RecordingMethodHandlerFactory(MethodHandlerFactory factory, ReplayStore store) + { + super(factory, null, store); + } + + protected boolean isWrappableState(AMQState state) + { + return AMQState.CONNECTION_OPEN.equals(state); + } + + protected Iterable<FrameDescriptor> getWrappableFrameTypes(AMQState state) + { + return _frames; + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayManager.java b/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayManager.java new file mode 100644 index 0000000000..1f555b0f91 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayManager.java @@ -0,0 +1,34 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.replay; + +import org.apache.qpid.server.cluster.Sendable; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQMethodBody; + +import java.util.List; + +/** + * Abstraction of a replay strategy for use in getting joining members up to + * date with respect to cluster state. + * + */ +public interface ReplayManager +{ + public List<AMQMethodBody> replay(boolean isLeader); +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayStore.java new file mode 100644 index 0000000000..66bd8e0b0c --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayStore.java @@ -0,0 +1,310 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.replay; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeleteBody; +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.ClusterSynchBody; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.BasicCancelBody; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.cluster.util.Bindings; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Stores method invocations for replay to new members. + * + */ +public class ReplayStore implements ReplayManager, StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ReplayStore.class); + + private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _globalRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>(); + private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _localRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>(); + private final Map<String, QueueDeclareBody> _sharedQueues = new ConcurrentHashMap<String, QueueDeclareBody>(); + private final Map<String, QueueDeclareBody> _privateQueues = new ConcurrentHashMap<String, QueueDeclareBody>(); + private final Bindings<String, String, QueueBindBody> _sharedBindings = new Bindings<String, String, QueueBindBody>(); + private final Bindings<String, String, QueueBindBody> _privateBindings = new Bindings<String, String, QueueBindBody>(); + private final Map<String, ExchangeDeclareBody> _exchanges = new ConcurrentHashMap<String, ExchangeDeclareBody>(); + private final ConsumerCounts _consumers = new ConsumerCounts(); + + public ReplayStore() + { + _globalRecorders.put(QueueDeclareBody.class, new SharedQueueDeclareRecorder()); + _globalRecorders.put(QueueDeleteBody.class, new SharedQueueDeleteRecorder()); + _globalRecorders.put(QueueBindBody.class, new SharedQueueBindRecorder()); + _globalRecorders.put(ExchangeDeclareBody.class, new ExchangeDeclareRecorder()); + _globalRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder()); + + _localRecorders.put(QueueDeclareBody.class, new PrivateQueueDeclareRecorder()); + _localRecorders.put(QueueDeleteBody.class, new PrivateQueueDeleteRecorder()); + _localRecorders.put(QueueBindBody.class, new PrivateQueueBindRecorder()); + _localRecorders.put(BasicConsumeBody.class, new BasicConsumeRecorder()); + _localRecorders.put(BasicCancelBody.class, new BasicCancelRecorder()); + _localRecorders.put(ExchangeDeclareBody.class, new ExchangeDeclareRecorder()); + _localRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder()); + } + + public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent evt) throws AMQException + { + _logger.debug(new LogMessage("Replay store received {0}", evt.getMethod())); + AMQMethodBody request = evt.getMethod(); + + //allow any (relevant) recorder registered for this type of request to record it: + MethodRecorder recorder = getRecorders(session).get(request.getClass()); + if (recorder != null) + { + recorder.record(request); + } + } + + private Map<Class<? extends AMQMethodBody>, MethodRecorder> getRecorders(AMQProtocolSession session) + { + if (ClusteredProtocolSession.isPeerSession(session)) + { + return _globalRecorders; + } + else + { + return _localRecorders; + } + } + + public List<AMQMethodBody> replay(boolean isLeader) + { + List<AMQMethodBody> methods = new ArrayList<AMQMethodBody>(); + methods.addAll(_exchanges.values()); + methods.addAll(_privateQueues.values()); + synchronized(_privateBindings) + { + methods.addAll(_privateBindings.values()); + } + if (isLeader) + { + methods.addAll(_sharedQueues.values()); + synchronized(_sharedBindings) + { + methods.addAll(_sharedBindings.values()); + } + } + _consumers.replay(methods); + methods.add(new ClusterSynchBody()); + return methods; + } + + private class BasicConsumeRecorder implements MethodRecorder<BasicConsumeBody> + { + public void record(BasicConsumeBody method) + { + if(_sharedQueues.containsKey(method.queue)) + { + _consumers.increment(method.queue); + } + } + } + + private class BasicCancelRecorder implements MethodRecorder<BasicCancelBody> + { + public void record(BasicCancelBody method) + { + if(_sharedQueues.containsKey(method.consumerTag)) + { + _consumers.decrement(method.consumerTag); + } + } + } + + private class SharedQueueDeclareRecorder extends QueueDeclareRecorder + { + SharedQueueDeclareRecorder() + { + super(false, _sharedQueues); + } + } + + private class PrivateQueueDeclareRecorder extends QueueDeclareRecorder + { + PrivateQueueDeclareRecorder() + { + super(true, _privateQueues, new SharedQueueDeclareRecorder()); + } + } + + private class SharedQueueDeleteRecorder extends QueueDeleteRecorder + { + SharedQueueDeleteRecorder() + { + super(_sharedQueues, _sharedBindings); + } + } + + private class PrivateQueueDeleteRecorder extends QueueDeleteRecorder + { + PrivateQueueDeleteRecorder() + { + super(_privateQueues, _privateBindings, new SharedQueueDeleteRecorder()); + } + } + + private class SharedQueueBindRecorder extends QueueBindRecorder + { + SharedQueueBindRecorder() + { + super(_sharedQueues, _sharedBindings); + } + } + + private class PrivateQueueBindRecorder extends QueueBindRecorder + { + PrivateQueueBindRecorder() + { + super(_privateQueues, _privateBindings, new SharedQueueBindRecorder()); + } + } + + + private static class QueueDeclareRecorder extends ChainedMethodRecorder<QueueDeclareBody> + { + private final boolean _exclusive; + private final Map<String, QueueDeclareBody> _queues; + + QueueDeclareRecorder(boolean exclusive, Map<String, QueueDeclareBody> queues) + { + _queues = queues; + _exclusive = exclusive; + } + + QueueDeclareRecorder(boolean exclusive, Map<String, QueueDeclareBody> queues, QueueDeclareRecorder recorder) + { + super(recorder); + _queues = queues; + _exclusive = exclusive; + } + + + protected boolean doRecord(QueueDeclareBody method) + { + if (_exclusive == method.exclusive) + { + _queues.put(method.queue, method); + return true; + } + else + { + return false; + } + } + } + + private class QueueDeleteRecorder extends ChainedMethodRecorder<QueueDeleteBody> + { + private final Map<String, QueueDeclareBody> _queues; + private final Bindings<String, String, QueueBindBody> _bindings; + + QueueDeleteRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings) + { + this(queues, bindings, null); + } + + QueueDeleteRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings, QueueDeleteRecorder recorder) + { + super(recorder); + _queues = queues; + _bindings = bindings; + } + + protected boolean doRecord(QueueDeleteBody method) + { + if (_queues.remove(method.queue) != null) + { + _bindings.unbind1(method.queue); + return true; + } + else + { + return false; + } + } + } + + private class QueueBindRecorder extends ChainedMethodRecorder<QueueBindBody> + { + private final Map<String, QueueDeclareBody> _queues; + private final Bindings<String, String, QueueBindBody> _bindings; + + QueueBindRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings) + { + _queues = queues; + _bindings = bindings; + } + + QueueBindRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings, QueueBindRecorder recorder) + { + super(recorder); + _queues = queues; + _bindings = bindings; + } + + protected boolean doRecord(QueueBindBody method) + { + if (_queues.containsKey(method.queue)) + { + _bindings.bind(method.queue, method.exchange, method); + return true; + } + else + { + return false; + } + } + } + + private class ExchangeDeclareRecorder implements MethodRecorder<ExchangeDeclareBody> + { + public void record(ExchangeDeclareBody method) + { + _exchanges.put(method.exchange, method); + } + } + + private class ExchangeDeleteRecorder implements MethodRecorder<ExchangeDeleteBody> + { + public void record(ExchangeDeleteBody method) + { + _exchanges.remove(method.exchange); + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/util/Bindings.java b/java/cluster/src/org/apache/qpid/server/cluster/util/Bindings.java new file mode 100644 index 0000000000..cca3953f34 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/util/Bindings.java @@ -0,0 +1,80 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.util; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; + +/** + * Maps two separate keys to a list of values. + * + */ +public class Bindings<K1, K2, V> +{ + private final MultiValuedMap<K1, Binding<K2>> _a = new MultiValuedMap<K1, Binding<K2>>(); + private final MultiValuedMap<K2, Binding<K1>> _b = new MultiValuedMap<K2, Binding<K1>>(); + private final Collection<V> _values = new HashSet<V>(); + + public void bind(K1 key1, K2 key2, V value) + { + _a.add(key1, new Binding<K2>(key2, value)); + _b.add(key2, new Binding<K1>(key1, value)); + _values.add(value); + } + + public void unbind1(K1 key1) + { + Collection<Binding<K2>> values = _a.remove(key1); + for (Binding<K2> v : values) + { + _b.remove(v.key); + _values.remove(v.value); + } + } + + public void unbind2(K2 key2) + { + Collection<Binding<K1>> values = _b.remove(key2); + for (Binding<K1> v : values) + { + _a.remove(v.key); + _values.remove(v.value); + } + } + + public Collection<V> values() + { + return Collections.unmodifiableCollection(_values); + } + + /** + * Value needs to hold key to the other map + */ + private class Binding<T> + { + private final T key; + private final V value; + + Binding(T key, V value) + { + this.key = key; + this.value = value; + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/util/InvokeMultiple.java b/java/cluster/src/org/apache/qpid/server/cluster/util/InvokeMultiple.java new file mode 100644 index 0000000000..5bdc824060 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/util/InvokeMultiple.java @@ -0,0 +1,69 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.util; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.Set; +import java.util.HashSet; + +/** + * Allows a method to be invoked on a list of listeners with one call + * + */ +public class InvokeMultiple <T> implements InvocationHandler +{ + private final Set<T> _targets = new HashSet<T>(); + private final T _proxy; + + public InvokeMultiple(Class<? extends T> type) + { + _proxy = (T) Proxy.newProxyInstance(type.getClassLoader(), new Class[]{type}, this); + } + + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable + { + Set<T> targets; + synchronized(this) + { + targets = new HashSet<T>(_targets); + } + + for(T target : targets) + { + method.invoke(target, args); + } + return null; + } + + public synchronized void addListener(T t) + { + _targets.add(t); + } + + public synchronized void removeListener(T t) + { + _targets.remove(t); + } + + public T getProxy() + { + return _proxy; + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/util/LogMessage.java b/java/cluster/src/org/apache/qpid/server/cluster/util/LogMessage.java new file mode 100644 index 0000000000..9824041358 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/util/LogMessage.java @@ -0,0 +1,50 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.util; + +import java.text.MessageFormat; + +/** + * Convenience class to allow log messages to be specified in terms + * of MessageFormat patterns with a variable set of parameters. The + * production of the string is only done if toSTring is called so it + * works well with debug level messages, allowing complex messages + * to be specified that are only evaluated if actually printed. + * + */ +public class LogMessage +{ + private final String _message; + private final Object[] _args; + + public LogMessage(String message) + { + this(message, new Object[0]); + } + + public LogMessage(String message, Object... args) + { + _message = message; + _args = args; + } + + public String toString() + { + return MessageFormat.format(_message, _args); + } +} diff --git a/java/cluster/src/org/apache/qpid/server/cluster/util/MultiValuedMap.java b/java/cluster/src/org/apache/qpid/server/cluster/util/MultiValuedMap.java new file mode 100644 index 0000000000..1c4e3da6f3 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/cluster/util/MultiValuedMap.java @@ -0,0 +1,58 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Maps a key to a collection of values + * + */ +public class MultiValuedMap<K, V> +{ + private Map<K, Collection<V>> _map = new HashMap<K, Collection<V>>(); + + public boolean add(K key, V value) + { + Collection<V> values = get(key); + if (values == null) + { + values = createList(); + _map.put(key, values); + } + return values.add(value); + } + + public Collection<V> get(K key) + { + return _map.get(key); + } + + public Collection<V> remove(K key) + { + return _map.remove(key); + } + + protected Collection<V> createList() + { + return new ArrayList<V>(); + } +} diff --git a/java/cluster/src/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/org/apache/qpid/server/queue/ClusteredQueue.java new file mode 100644 index 0000000000..14d893b040 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/queue/ClusteredQueue.java @@ -0,0 +1,161 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicCancelBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.server.cluster.*; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.protocol.AMQProtocolSession; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; + +/** + * Represents a shared queue in a cluster. The key difference is that as well as any + * local consumers, there may be consumers for this queue on other members of the + * cluster. + * + */ +public class ClusteredQueue extends AMQQueue +{ + private static final Logger _logger = Logger.getLogger(ClusteredQueue.class); + private final ConcurrentHashMap<SimpleMemberHandle, RemoteSubscriptionImpl> _peers = new ConcurrentHashMap<SimpleMemberHandle, RemoteSubscriptionImpl>(); + private final GroupManager _groupMgr; + private final NestedSubscriptionManager _subscriptions; + + public ClusteredQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry) + throws AMQException + { + super(name, durable, owner, autoDelete, queueRegistry, new ClusteredSubscriptionManager()); + _groupMgr = groupMgr; + _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers(); + } + + public ClusteredQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) + throws AMQException + { + super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new ClusteredSubscriptionManager(), + new SubscriptionImpl.Factory()); + _groupMgr = groupMgr; + _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers(); + } + + public void deliver(AMQMessage message) throws AMQException + { + _logger.info(new LogMessage("{0} delivered to clustered queue {1}", message, this)); + super.deliver(message); + } + + protected void autodelete() throws AMQException + { + if(!_subscriptions.hasActiveSubscribers()) + { + //delete locally: + delete(); + + //send deletion request to all other members: + QueueDeleteBody request = new QueueDeleteBody(); + request.queue = getName(); + _groupMgr.broadcast(new SimpleSendable(request)); + } + } + + public void unregisterProtocolSession(AMQProtocolSession ps, int channel, String consumerTag) throws AMQException + { + //handle locally: + super.unregisterProtocolSession(ps, channel, consumerTag); + + //signal other members: + BasicCancelBody request = new BasicCancelBody(); + request.consumerTag = getName(); + _groupMgr.broadcast(new SimpleSendable(request)); + } + + public void addRemoteSubcriber(MemberHandle peer) + { + _logger.info(new LogMessage("Added remote subscriber for {0} to clustered queue {1}", peer, this)); + //find (or create) a matching subscriber for the peer then increment the count + getSubscriber(key(peer), true).increment(); + } + + public void removeRemoteSubscriber(MemberHandle peer) + { + //find a matching subscriber for the peer then decrement the count + //if count is now zero, remove the subscriber + SimpleMemberHandle key = key(peer); + RemoteSubscriptionImpl s = getSubscriber(key, true); + if (s == null) + { + throw new RuntimeException("No subscriber for " + peer); + } + if (s.decrement()) + { + _peers.remove(key); + _subscriptions.removeSubscription(s); + } + } + + public void removeAllRemoteSubscriber(MemberHandle peer) + { + SimpleMemberHandle key = key(peer); + RemoteSubscriptionImpl s = getSubscriber(key, true); + _peers.remove(key); + _subscriptions.removeSubscription(s); + } + + private RemoteSubscriptionImpl getSubscriber(SimpleMemberHandle key, boolean create) + { + RemoteSubscriptionImpl s = _peers.get(key); + if (s == null && create) + { + return addSubscriber(key, new RemoteSubscriptionImpl(_groupMgr, key)); + } + else + { + return s; + } + } + + private RemoteSubscriptionImpl addSubscriber(SimpleMemberHandle key, RemoteSubscriptionImpl s) + { + RemoteSubscriptionImpl other = _peers.putIfAbsent(key, s); + if (other == null) + { + _subscriptions.addSubscription(s); + new SubscriberCleanup(key, this, _groupMgr); + return s; + } + else + { + return other; + } + } + + private SimpleMemberHandle key(MemberHandle peer) + { + return peer instanceof SimpleMemberHandle ? (SimpleMemberHandle) peer : (SimpleMemberHandle) SimpleMemberHandle.resolve(peer); + } + + static boolean isFromBroker(AMQMessage msg) + { + return ClusteredProtocolSession.isPayloadFromPeer(msg); + } +} diff --git a/java/cluster/src/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java b/java/cluster/src/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java new file mode 100644 index 0000000000..0005b20fb1 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java @@ -0,0 +1,92 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.cluster.util.LogMessage; + +class ClusteredSubscriptionManager extends SubscriptionSet +{ + private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class); + private final NestedSubscriptionManager _all; + + ClusteredSubscriptionManager() + { + this(new NestedSubscriptionManager()); + } + + private ClusteredSubscriptionManager(NestedSubscriptionManager all) + { + _all = all; + _all.addSubscription(new Parent()); + } + + NestedSubscriptionManager getAllSubscribers() + { + return _all; + } + + public boolean hasActiveSubscribers() + { + return _all.hasActiveSubscribers(); + } + + public Subscription nextSubscriber(AMQMessage msg) + { + if(ClusteredQueue.isFromBroker(msg)) + { + //if message is from another broker, it should only be delivered + //to another client to meet ordering constraints + Subscription s = super.nextSubscriber(msg); + _logger.info(new LogMessage("Returning next *client* subscriber {0}", s)); + if(s == null) + { + //TODO: deliver to another broker, but set the redelivered flag on the msg + //(this should be policy based) + + //for now just don't deliver it + return null; + } + else + { + return s; + } + } + Subscription s = _all.nextSubscriber(msg); + _logger.info(new LogMessage("Returning next subscriber {0}", s)); + return s; + } + + private class Parent implements WeightedSubscriptionManager + { + public int getWeight() + { + return ClusteredSubscriptionManager.this.getWeight(); + } + + public boolean hasActiveSubscribers() + { + return ClusteredSubscriptionManager.super.hasActiveSubscribers(); + } + + public Subscription nextSubscriber(AMQMessage msg) + { + return ClusteredSubscriptionManager.super.nextSubscriber(msg); + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/queue/NestedSubscriptionManager.java b/java/cluster/src/org/apache/qpid/server/queue/NestedSubscriptionManager.java new file mode 100644 index 0000000000..0bb6537930 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/queue/NestedSubscriptionManager.java @@ -0,0 +1,100 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * Distributes messages among a list of subsscription managers, using their + * weighting. + * + */ +class NestedSubscriptionManager implements SubscriptionManager +{ + private final List<WeightedSubscriptionManager> _subscribers = new CopyOnWriteArrayList<WeightedSubscriptionManager>(); + private int _iterations; + private int _index; + + void addSubscription(WeightedSubscriptionManager s) + { + _subscribers.add(s); + } + + void removeSubscription(WeightedSubscriptionManager s) + { + _subscribers.remove(s); + } + + public boolean hasActiveSubscribers() + { + for(WeightedSubscriptionManager s : _subscribers) + { + if(s.hasActiveSubscribers()) + { + return true; + } + } + return false; + } + + public Subscription nextSubscriber(AMQMessage msg) + { + WeightedSubscriptionManager start = current(); + for(WeightedSubscriptionManager s = start; s != null; s = next(start)) + { + if(hasMore(s)) + { + return nextSubscriber(s); + } + } + return null; + } + + private Subscription nextSubscriber(WeightedSubscriptionManager s) + { + _iterations++; + return s.nextSubscriber(null); + } + + private WeightedSubscriptionManager current() + { + return _subscribers.isEmpty() ? null : _subscribers.get(_index); + } + + private boolean hasMore(WeightedSubscriptionManager s) + { + return _iterations < s.getWeight(); + } + + private WeightedSubscriptionManager next(WeightedSubscriptionManager start) + { + WeightedSubscriptionManager s = next(); + return s == start && !hasMore(s) ? null : s; + } + + private WeightedSubscriptionManager next() + { + _iterations = 0; + if(++_index >= _subscribers.size()) + { + _index = 0; + } + return _subscribers.get(_index); + } +} diff --git a/java/cluster/src/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/org/apache/qpid/server/queue/PrivateQueue.java new file mode 100644 index 0000000000..1e7e13a577 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/queue/PrivateQueue.java @@ -0,0 +1,60 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.cluster.SimpleSendable; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.framing.QueueDeleteBody; + +import java.util.concurrent.Executor; + +/** + * Used to represent a private queue held locally. + * + */ +public class PrivateQueue extends AMQQueue +{ + private final GroupManager _groupMgr; + + public PrivateQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry) + throws AMQException + { + super(name, durable, owner, autoDelete, queueRegistry); + _groupMgr = groupMgr; + + } + + public PrivateQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) + throws AMQException + { + super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery); + _groupMgr = groupMgr; + } + + protected void autodelete() throws AMQException + { + //delete locally: + super.autodelete(); + + //send delete request to peers: + QueueDeleteBody request = new QueueDeleteBody(); + request.queue = getName(); + _groupMgr.broadcast(new SimpleSendable(request)); + } +} diff --git a/java/cluster/src/org/apache/qpid/server/queue/ProxiedQueueCleanup.java b/java/cluster/src/org/apache/qpid/server/queue/ProxiedQueueCleanup.java new file mode 100644 index 0000000000..dac8b616e5 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/queue/ProxiedQueueCleanup.java @@ -0,0 +1,57 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.cluster.MembershipChangeListener; +import org.apache.qpid.server.cluster.MemberHandle; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; + +import java.util.List; + +class ProxiedQueueCleanup implements MembershipChangeListener +{ + private static final Logger _logger = Logger.getLogger(ProxiedQueueCleanup.class); + + private final MemberHandle _subject; + private final RemoteQueueProxy _queue; + + ProxiedQueueCleanup(MemberHandle subject, RemoteQueueProxy queue) + { + _subject = subject; + _queue = queue; + } + + public void changed(List<MemberHandle> members) + { + if(!members.contains(_subject)) + { + try + { + _queue.delete(); + _logger.info(new LogMessage("Deleted {0} in response to exclusion of {1}", _queue, _subject)); + } + catch (AMQException e) + { + _logger.info(new LogMessage("Failed to delete {0} in response to exclusion of {1}: {2}", _queue, _subject, e), e); + } + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/org/apache/qpid/server/queue/RemoteQueueProxy.java new file mode 100644 index 0000000000..a9a467b306 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/queue/RemoteQueueProxy.java @@ -0,0 +1,106 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.cluster.MemberHandle; +import org.apache.qpid.server.cluster.SimpleSendable; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; + +/** + * TODO: separate out an abstract base class from AMQQueue from which this inherits. It does + * not require all the functionality currently in AMQQueue. + * + */ +public class RemoteQueueProxy extends AMQQueue +{ + private static final Logger _logger = Logger.getLogger(RemoteQueueProxy.class); + private final MemberHandle _target; + private final GroupManager _groupMgr; + + public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry) + throws AMQException + { + super(name, durable, owner, autoDelete, queueRegistry); + _target = target; + _groupMgr = groupMgr; + _groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this)); + } + + public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) + throws AMQException + { + super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery); + _target = target; + _groupMgr = groupMgr; + _groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this)); + } + + public void deliver(AMQMessage msg) throws NoConsumersException + { + if (ClusteredProtocolSession.canRelay(msg, _target)) + { + try + { + _logger.debug(new LogMessage("Relaying {0} to {1}", msg, _target)); + relay(msg); + } + catch (NoConsumersException e) + { + throw e; + } + catch (AMQException e) + { + //TODO: sort out exception handling... + e.printStackTrace(); + } + } + else + { + _logger.debug(new LogMessage("Cannot relay {0} to {1}", msg, _target)); + } + } + + void relay(AMQMessage msg) throws AMQException + { + BasicPublishBody publish = msg.getPublishBody(); + ContentHeaderBody header = msg.getContentHeaderBody(); + List<ContentBody> bodies = msg.getContentBodies(); + + //(i) construct a new publishing block: + publish.immediate = false;//can't as yet handle the immediate flag in a cluster + List<AMQBody> parts = new ArrayList<AMQBody>(2 + bodies.size()); + parts.add(publish); + parts.add(header); + parts.addAll(bodies); + + //(ii) send this on to the broker for which it is acting as proxy: + _groupMgr.send(_target, new SimpleSendable(parts)); + } +} diff --git a/java/cluster/src/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java new file mode 100644 index 0000000000..9de7a5c849 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java @@ -0,0 +1,93 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.cluster.MemberHandle; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.SimpleSendable; +import org.apache.qpid.AMQException; + +class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager +{ + private final GroupManager _groupMgr; + private final MemberHandle _peer; + private boolean _suspended; + private int _count; + + RemoteSubscriptionImpl(GroupManager groupMgr, MemberHandle peer) + { + _groupMgr = groupMgr; + _peer = peer; + } + + synchronized void increment() + { + _count++; + } + + synchronized boolean decrement() + { + return --_count <= 0; + } + + public void send(AMQMessage msg, AMQQueue queue) + { + try + { + _groupMgr.send(_peer, new SimpleSendable(msg.getPayload())); + } + catch (AMQException e) + { + //TODO: handle exceptions properly... + e.printStackTrace(); + } + } + + public synchronized void setSuspended(boolean suspended) + { + _suspended = suspended; + } + + public synchronized boolean isSuspended() + { + return _suspended; + } + + public synchronized int getWeight() + { + return _count; + } + + public boolean hasActiveSubscribers() + { + return getWeight() == 0; + } + + public Subscription nextSubscriber(AMQMessage msg) + { + return this; + } + + public void queueDeleted(AMQQueue queue) + { + if(queue instanceof ClusteredQueue) + { + ((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer); + } + } +} diff --git a/java/cluster/src/org/apache/qpid/server/queue/SubscriberCleanup.java b/java/cluster/src/org/apache/qpid/server/queue/SubscriberCleanup.java new file mode 100644 index 0000000000..cdb6f8f4d2 --- /dev/null +++ b/java/cluster/src/org/apache/qpid/server/queue/SubscriberCleanup.java @@ -0,0 +1,53 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.cluster.MembershipChangeListener; +import org.apache.qpid.server.cluster.MemberHandle; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.log4j.Logger; + +import java.util.List; + +class SubscriberCleanup implements MembershipChangeListener +{ + private static final Logger _logger = Logger.getLogger(SubscriberCleanup.class); + + private final MemberHandle _subject; + private final ClusteredQueue _queue; + private final GroupManager _manager; + + SubscriberCleanup(MemberHandle subject, ClusteredQueue queue, GroupManager manager) + { + _subject = subject; + _queue = queue; + _manager = manager; + _manager.addMemberhipChangeListener(this); + } + + public void changed(List<MemberHandle> members) + { + if(!members.contains(_subject)) + { + _queue.removeAllRemoteSubscriber(_subject); + _manager.removeMemberhipChangeListener(this); + _logger.info(new LogMessage("Removed {0} from {1}", _subject, _queue)); + } + } +} |