diff options
author | Martin Ritchie <ritchiem@apache.org> | 2008-02-29 16:00:19 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2008-02-29 16:00:19 +0000 |
commit | a91e0a19c3ca405b659d21593f433fca692acbec (patch) | |
tree | 6f010166d7b5759520c39f8f48bcbc50e8290cef | |
parent | 48248ab5bcab430eddaa887c6468ca7d1939c9d3 (diff) | |
download | qpid-python-a91e0a19c3ca405b659d21593f433fca692acbec.tar.gz |
QPID-702 Removed cluster code as it has been scheduled for such since last year.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@632363 13f79535-47bb-0310-9956-ffa450edef68
91 files changed, 0 insertions, 8006 deletions
diff --git a/java/cluster/doc/design.doc b/java/cluster/doc/design.doc Binary files differdeleted file mode 100644 index c5bbf0f8a4..0000000000 --- a/java/cluster/doc/design.doc +++ /dev/null diff --git a/java/cluster/pom.xml b/java/cluster/pom.xml deleted file mode 100644 index 73c62520e6..0000000000 --- a/java/cluster/pom.xml +++ /dev/null @@ -1,69 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-cluster</artifactId> - <packaging>jar</packaging> - <version>1.0-incubating-M2.1-SNAPSHOT</version> - <name>Qpid Cluster</name> - <url>http://cwiki.apache.org/confluence/display/qpid</url> - - <parent> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid</artifactId> - <version>1.0-incubating-M2.1-SNAPSHOT</version> - </parent> - - <properties> - <topDirectoryLocation>..</topDirectoryLocation> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-common</artifactId> - </dependency> - <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-client</artifactId> - </dependency> - <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-broker</artifactId> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - </plugins> - </build> -</project> diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java deleted file mode 100644 index 2baaa344ef..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java +++ /dev/null @@ -1,42 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-
-/**
- * AMQConnectionWaitException represents a failure to connect to a cluster peer in a timely manner.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represents failure to connect to a cluster peer in a timely manner.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- */
-public class AMQConnectionWaitException extends AMQException
-{
- public AMQConnectionWaitException(String s, Throwable e)
- {
- super(s, e);
-
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java deleted file mode 100644 index 951bd22df0..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java +++ /dev/null @@ -1,46 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
-
-/**
- * AMQUnexpectedBodyTypeException represents a failure where a message body does not match its expected type. For example,
- * and AMQP method should have a method body.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represents a failure where a message body does not match its expected type.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Seems like this exception was created to handle an unsafe type cast that will never happen in practice. Would
- * be better just to leave that as a ClassCastException. Check that the framing layer will pick up the error first.
- */
-public class AMQUnexpectedBodyTypeException extends AMQException
-{
- public AMQUnexpectedBodyTypeException(Class<? extends AMQBody> expectedClass, AMQBody body)
- {
- super("Unexpected body type. Expected: " + expectedClass.getName() + "; got: " + body.getClass().getName());
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java deleted file mode 100644 index 4dd318f90d..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java +++ /dev/null @@ -1,45 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-
-/**
- * AMQUnexpectedFrameTypeException represents a failure when Mina passes an unexpected frame type.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represents failure to cast a frame to its expected type.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Seems like this exception was created to handle an unsafe type cast that will never happen in practice. Would
- * be better just to leave that as a ClassCastException. However, check the framing layer catches this error
- * first.
- */
-public class AMQUnexpectedFrameTypeException extends AMQException
-{
- public AMQUnexpectedFrameTypeException(String s)
- {
- super(s);
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java deleted file mode 100644 index 39508df566..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.framing.AMQMethodBody; - -public class BlockingHandler implements ResponseHandler -{ - private final Class _expected; - private boolean _completed; - private AMQMethodBody _response; - - - public BlockingHandler() - { - this(AMQMethodBody.class); - } - - public BlockingHandler(Class<? extends AMQMethodBody> expected) - { - _expected = expected; - } - - public void responded(AMQMethodBody response) - { - if (_expected.isInstance(response)) - { - _response = response; - completed(); - } - } - - public void removed() - { - completed(); - } - - private synchronized void completed() - { - _completed = true; - notifyAll(); - } - - synchronized void waitForCompletion() - { - while (!_completed) - { - try - { - wait(); - } - catch (InterruptedException ignore) - { - - } - } - } - - AMQMethodBody getResponse() - { - return _response; - } - - boolean failed() - { - return _response == null; - } - - boolean isCompleted() - { - return _completed; - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BroadcastPolicy.java deleted file mode 100644 index 145aa58574..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BroadcastPolicy.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -public interface BroadcastPolicy -{ - public boolean isComplete(int responded, int members); -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java deleted file mode 100644 index 7e2cf6da83..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** - * An implementation of the Member interface (through which data is sent to other - * peers in the cluster). This class provides a base from which subclasses can - * inherit some common behaviour for broadcasting GroupRequests and sending methods - * that may expect a response. It also extends the Member abstraction to support - * a richer set of operations that are useful within the package but should not be - * exposed outside of it. - * - */ -abstract class Broker extends SimpleMemberHandle implements Member -{ - private static final Logger _logger = Logger.getLogger(Broker.class); - private static final int DEFAULT_CHANNEL = 1; - private static final int START_CHANNEL = 2; - private static final int END_CHANNEL = 10000; - - - private MemberFailureListener _listener; - //a wrap-around counter to allocate _requests a unique channel: - private int _nextChannel = START_CHANNEL; - //outstanding _requests: - private final Map<Integer, ResponseHandler> _requests = new HashMap<Integer, ResponseHandler>(); - - Broker(String host, int port) - { - super(host, port); - } - - /** - * Allows a listener to be registered that will receive callbacks when communication - * to the peer this broker instance represents fails. - * @param listener the callback to be notified of failures - */ - public void addFailureListener(MemberFailureListener listener) - { - _listener = listener; - } - - /** - * Allows subclasses to signal comunication failures - */ - protected void failed() - { - if (_listener != null) - { - _listener.failed(this); - } - } - - /** - * Subclasses should call this on receiving message responses from the remote - * peer. They are matched to any outstanding request they might be response - * to, with the completion and callback of that request being managed if - * required. - * - * @param channel the channel on which the method was received - * @param response the response received - * @return true if the response matched an outstanding request - */ - protected synchronized boolean handleResponse(int channel, AMQMethodBody response) - { - ResponseHandler request = _requests.get(channel); - if (request == null) - { - if(!_requests.isEmpty()) - { - _logger.warn(new LogMessage("[next channel={3, integer}]: Response {0} on channel {1, integer} failed to match outstanding requests: {2}", response, channel, _requests, _nextChannel)); - } - return false; - } - else - { - request.responded(response); - return true; - } - } - - /** - * Called when this broker is excluded from the group. Any requests made on - * it are informed this member has left the group. - */ - synchronized void remove() - { - for (ResponseHandler r : _requests.values()) - { - r.removed(); - } - } - - /** - * Engages this broker in the specified group request - * - * @param request the request being made to a group of brokers - * @throws AMQException if there is any failure - */ - synchronized void invoke(GroupRequest request) throws AMQException - { - int channel = nextChannel(); - _requests.put(channel, new GroupRequestAdapter(request, channel)); - request.send(channel, this); - } - - /** - * Sends a message to the remote peer and undertakes to notify the specified - * handler of the response. - * - * @param msg the message to send - * @param handler the callback to notify of responses (or the removal of this broker - * from the group) - * @throws AMQException - */ - synchronized void send(Sendable msg, ResponseHandler handler) throws AMQException - { - int channel; - if (handler != null) - { - channel = nextChannel(); - _requests.put(channel, new RemovingWrapper(handler, channel)); - } - else - { - channel = DEFAULT_CHANNEL; - } - - msg.send(channel, this); - } - - private int nextChannel() - { - int channel = _nextChannel++; - if(_nextChannel >= END_CHANNEL) - { - _nextChannel = START_CHANNEL; - } - return channel; - } - - /** - * extablish connection without handling redirect - */ - abstract boolean connect() throws IOException, InterruptedException; - - /** - * Start connection process, including replay - */ - abstract void connectAsynch(Iterable<AMQMethodBody> msgs); - - /** - * Replay messages to the remote peer this instance represents. These messages - * must be sent before any others whose transmission is requested through send() etc. - * - * @param msgs - */ - abstract void replay(Iterable<AMQMethodBody> msgs); - - /** - * establish connection, handling redirect if required... - */ - abstract Broker connectToCluster() throws IOException, InterruptedException; - - private class GroupRequestAdapter implements ResponseHandler - { - private final GroupRequest request; - private final int channel; - - GroupRequestAdapter(GroupRequest request, int channel) - { - this.request = request; - this.channel = channel; - } - - public void responded(AMQMethodBody response) - { - request.responseReceived(Broker.this, response); - _requests.remove(channel); - } - - public void removed() - { - request.removed(Broker.this); - } - - public String toString() - { - return "GroupRequestAdapter{" + channel + ", " + request + "}"; - } - } - - private class RemovingWrapper implements ResponseHandler - { - private final ResponseHandler handler; - private final int channel; - - RemovingWrapper(ResponseHandler handler, int channel) - { - this.handler = handler; - this.channel = channel; - } - - public void responded(AMQMethodBody response) - { - handler.responded(response); - _requests.remove(channel); - } - - public void removed() - { - handler.removed(); - } - - public String toString() - { - return "RemovingWrapper{" + channel + ", " + handler + "}"; - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerFactory.java deleted file mode 100644 index 92c3c4e7bf..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerFactory.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -interface BrokerFactory -{ - public Broker create(MemberHandle handle); -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java deleted file mode 100644 index 755a341607..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java +++ /dev/null @@ -1,368 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.cluster.replay.ReplayManager; -import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.server.cluster.util.InvokeMultiple; -import org.apache.qpid.framing.AMQMethodBody; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * Manages the membership list of a group and the set of brokers representing the - * remote peers. The group should be initialised through a call to establish() - * or connectToLeader(). - * - */ -class BrokerGroup -{ - private static final Logger _logger = Logger.getLogger(BrokerGroup.class); - - private final InvokeMultiple<MembershipChangeListener> _changeListeners = new InvokeMultiple<MembershipChangeListener>(MembershipChangeListener.class); - private final ReplayManager _replayMgr; - private final MemberHandle _local; - private final BrokerFactory _factory; - private final Object _lock = new Object(); - private final Set<MemberHandle> _synch = new HashSet<MemberHandle>(); - private List<MemberHandle> _members; - private List<Broker> _peers = new ArrayList<Broker>(); - private JoinState _state = JoinState.UNINITIALISED; - - /** - * Creates an unitialised group. - * - * @param local a handle that represents the local broker - * @param replayMgr the replay manager to use when creating new brokers - * @param factory the factory through which broker instances are created - */ - BrokerGroup(MemberHandle local, ReplayManager replayMgr, BrokerFactory factory) - { - _replayMgr = replayMgr; - _local = local; - _factory = factory; - } - - /** - * Called to establish the local broker as the leader of a new group - */ - void establish() - { - synchronized (_lock) - { - setState(JoinState.JOINED); - _members = new ArrayList<MemberHandle>(); - _members.add(_local); - } - fireChange(); - } - - /** - * Called by prospect to connect to group - */ - Broker connectToLeader(MemberHandle handle) throws Exception - { - Broker leader = _factory.create(handle); - leader = leader.connectToCluster(); - synchronized (_lock) - { - setState(JoinState.JOINING); - _members = new ArrayList<MemberHandle>(); - _members.add(leader); - _peers.add(leader); - } - fireChange(); - return leader; - } - - /** - * Called by leader when handling a join request - */ - Broker connectToProspect(MemberHandle handle) throws IOException, InterruptedException - { - Broker prospect = _factory.create(handle); - prospect.connect(); - synchronized (_lock) - { - _members.add(prospect); - _peers.add(prospect); - } - fireChange(); - return prospect; - } - - /** - * Called in reponse to membership announcements. - * - * @param members the list of members now part of the group - */ - void setMembers(List<MemberHandle> members) - { - if (isJoined()) - { - List<Broker> old = _peers; - - synchronized (_lock) - { - _peers = getBrokers(members); - _members = new ArrayList<MemberHandle>(members); - } - - //remove those that are still members - old.removeAll(_peers); - - //handle failure of any brokers that haven't survived - for (Broker peer : old) - { - peer.remove(); - } - } - else - { - synchronized (_lock) - { - setState(JoinState.INITIATION); - _members = new ArrayList<MemberHandle>(members); - _synch.addAll(_members); - _synch.remove(_local); - } - } - fireChange(); - } - - List<MemberHandle> getMembers() - { - synchronized (_lock) - { - return Collections.unmodifiableList(_members); - } - } - - List<Broker> getPeers() - { - synchronized (_lock) - { - return _peers; - } - } - - /** - * Removes the member presented from the group - * @param peer the broker that should be removed - */ - void remove(Broker peer) - { - synchronized (_lock) - { - _peers.remove(peer); - _members.remove(peer); - } - fireChange(); - } - - MemberHandle getLocal() - { - return _local; - } - - Broker getLeader() - { - synchronized (_lock) - { - return _peers.size() > 0 ? _peers.get(0) : null; - } - } - - /** - * Allows a Broker instance to be retrieved for a given handle - * - * @param handle the handle for which a broker is sought - * @param create flag to indicate whther a broker should be created for the handle if - * one is not found within the list of known peers - * @return the broker corresponding to handle or null if a match cannot be found and - * create is false - */ - Broker findBroker(MemberHandle handle, boolean create) - { - if (handle instanceof Broker) - { - return (Broker) handle; - } - else - { - for (Broker b : getPeers()) - { - if (b.matches(handle)) - { - return b; - } - } - } - if (create) - { - Broker b = _factory.create(handle); - List<AMQMethodBody> msgs = _replayMgr.replay(isLeader(_local)); - _logger.info(new LogMessage("Replaying {0} from {1} to {2}", msgs, _local, b)); - b.connectAsynch(msgs); - - return b; - } - else - { - return null; - } - } - - /** - * @param member the member to test for leadership - * @return true if the passed in member is the group leader, false otherwise - */ - boolean isLeader(MemberHandle member) - { - synchronized (_lock) - { - return member.matches(_members.get(0)); - } - } - - /** - * @return true if the local broker is the group leader, false otherwise - */ - boolean isLeader() - { - return isLeader(_local); - } - - /** - * Used when the leader fails and the next broker in the list needs to - * assume leadership - * @return true if the action succeeds - */ - boolean assumeLeadership() - { - boolean valid; - synchronized (_lock) - { - valid = _members.size() > 1 && _local.matches(_members.get(1)); - if (valid) - { - _members.remove(0); - _peers.remove(0); - } - } - fireChange(); - return valid; - } - - /** - * Called in response to a Cluster.Synch message being received during the join - * process. This indicates that the member mentioned has replayed all necessary - * messages to the local broker. - * - * @param member the member from whom the synch messages was received - */ - void synched(MemberHandle member) - { - _logger.info(new LogMessage("Synchronised with {0}", member)); - synchronized (_lock) - { - if (isLeader(member)) - { - setState(JoinState.INDUCTION); - } - _synch.remove(member); - if (_synch.isEmpty()) - { - _peers = getBrokers(_members); - setState(JoinState.JOINED); - } - } - } - - - /** - * @return the state of the group - */ - JoinState getState() - { - synchronized (_lock) - { - return _state; - } - } - - void addMemberhipChangeListener(MembershipChangeListener l) - { - _changeListeners.addListener(l); - } - - void removeMemberhipChangeListener(MembershipChangeListener l) - { - _changeListeners.removeListener(l); - } - - - - private void setState(JoinState state) - { - _logger.info(new LogMessage("Changed state from {0} to {1}", _state, state)); - _state = state; - } - - private boolean isJoined() - { - return inState(JoinState.JOINED); - } - - private boolean inState(JoinState state) - { - return _state.equals(state); - } - - private List<Broker> getBrokers(List<MemberHandle> handles) - { - List<Broker> brokers = new ArrayList<Broker>(); - for (MemberHandle handle : handles) - { - if (!_local.matches(handle)) - { - brokers.add(findBroker(handle, true)); - } - } - return brokers; - } - - private void fireChange() - { - List<MemberHandle> members; - synchronized(this) - { - members = new ArrayList(_members); - } - _changeListeners.getProxy().changed(Collections.unmodifiableList(members)); - } -}
\ No newline at end of file diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java deleted file mode 100644 index 1b4a3e8327..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.mina.common.IoSession; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.framing.AMQMethodBody; - -/** - * Hack to assist with reuse of the client handlers for connection setup in - * the inter-broker communication within the cluster. - * - */ -class ClientAdapter implements MethodHandler -{ - private final AMQProtocolSession _session; - private final AMQStateManager _stateMgr; - - ClientAdapter(IoSession session, AMQStateManager stateMgr) - { - this(session, stateMgr, "guest", "guest", session.toString(), "/cluster"); - } - - ClientAdapter(IoSession session, AMQStateManager stateMgr, String user, String password, String name, String path) - { - _session = new SessionAdapter(session, new ConnectionAdapter(user, password, name, path)); - _stateMgr = stateMgr; - } - - public void handle(int channel, AMQMethodBody method) throws AMQException - { - AMQMethodEvent evt = new AMQMethodEvent(channel, method); - _stateMgr.methodReceived(evt); - } - - private class SessionAdapter extends AMQProtocolSession - { - public SessionAdapter(IoSession session, AMQConnection connection) - { - super(null, session, connection); - } - } - - private static class ConnectionAdapter extends AMQConnection - { - ConnectionAdapter(String username, String password, String clientName, String virtualPath) - { - super(username, password, clientName, virtualPath); - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java deleted file mode 100644 index f4a8e4c1e2..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.client.handler.ConnectionCloseMethodHandler; -import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler; -import org.apache.qpid.client.handler.ConnectionSecureMethodHandler; -import org.apache.qpid.client.handler.ConnectionStartMethodHandler; -import org.apache.qpid.client.handler.ConnectionTuneMethodHandler; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; -// import org.apache.qpid.client.state.IllegalStateTransitionException; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.framing.*; - -import java.util.HashMap; -import java.util.Map; - -/** - * An extension of client.AMQStateManager that allows different handlers to be registered. - * - */ -public class ClientHandlerRegistry extends AMQStateManager -{ - private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>(); - private final MemberHandle _identity; - - protected ClientHandlerRegistry(MemberHandle local, AMQProtocolSession protocolSession) - { - super(AMQState.CONNECTION_NOT_STARTED, false, protocolSession); - - _identity = local; - - addHandler(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance(), - AMQState.CONNECTION_NOT_STARTED); - - addHandler(ConnectionTuneBody.class, new ConnectionTuneHandler(), - AMQState.CONNECTION_NOT_TUNED); - addHandler(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance(), - AMQState.CONNECTION_NOT_TUNED); - addHandler(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance(), - AMQState.CONNECTION_NOT_OPENED); - - addHandlers(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance(), - AMQState.CONNECTION_NOT_STARTED, - AMQState.CONNECTION_NOT_TUNED, - AMQState.CONNECTION_NOT_OPENED); - - } - - private ClientRegistry state(AMQState state) - { - ClientRegistry registry = _handlers.get(state); - if (registry == null) - { - registry = new ClientRegistry(); - _handlers.put(state, registry); - } - return registry; - } - - protected StateAwareMethodListener findStateTransitionHandler(AMQState state, AMQMethodBody frame) //throws IllegalStateTransitionException - { - ClientRegistry registry = _handlers.get(state); - return registry == null ? null : registry.getHandler(frame); - } - - - <A extends Class<AMQMethodBody>> void addHandlers(Class type, StateAwareMethodListener handler, AMQState... states) - { - for (AMQState state : states) - { - addHandler(type, handler, state); - } - } - - <A extends Class<AMQMethodBody>> void addHandler(Class type, StateAwareMethodListener handler, AMQState state) - { - ClientRegistry registry = _handlers.get(state); - if (registry == null) - { - registry = new ClientRegistry(); - _handlers.put(state, registry); - } - registry.add(type, handler); - } - - static class ClientRegistry - { - private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener> registry - = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener>(); - - <A extends Class<AMQMethodBody>> void add(A type, StateAwareMethodListener handler) - { - registry.put(type, handler); - } - - StateAwareMethodListener getHandler(AMQMethodBody frame) - { - return registry.get(frame.getClass()); - } - } - - class ConnectionTuneHandler extends ConnectionTuneMethodHandler - { - protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist, byte major, byte minor) - { - // Be aware of possible changes to parameter order as versions change. - return ConnectionOpenBody.createAMQFrame(channel, - major, - minor, - // AMQP version (major, minor) - new AMQShortString(ClusterCapability.add(capabilities, _identity)), - // capabilities - insist, - // insist - path); - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java deleted file mode 100644 index 80f9ef62b1..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.server.cluster.handler.ClusterMethodHandlerFactory; -import org.apache.qpid.server.cluster.replay.RecordingMethodHandlerFactory; -import org.apache.qpid.server.cluster.replay.ReplayStore; - -import java.net.InetSocketAddress; - -class ClusterBuilder -{ - private final LoadTable loadTable = new LoadTable(); - private final ReplayStore replayStore = new ReplayStore(); - private final MemberHandle handle; - private final GroupManager groupMgr; - - ClusterBuilder(InetSocketAddress address) - { - handle = new SimpleMemberHandle(address.getHostName(), address.getPort()).resolve(); - groupMgr = new DefaultGroupManager(handle, getBrokerFactory(), replayStore, loadTable); - } - - GroupManager getGroupManager() - { - return groupMgr; - } - - ServerHandlerRegistry getHandlerRegistry() - { - return new ServerHandlerRegistry(getHandlerFactory(), null, null); - } - - private MethodHandlerFactory getHandlerFactory() - { - MethodHandlerFactory factory = new ClusterMethodHandlerFactory(groupMgr, loadTable); - //need to wrap relevant handlers with recording handler for easy replay: - return new RecordingMethodHandlerFactory(factory, replayStore); - } - - private BrokerFactory getBrokerFactory() - { - return new MinaBrokerProxyFactory(handle); - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java deleted file mode 100644 index 57c48f0611..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.framing.AMQShortString; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public class ClusterCapability -{ - public static final String PATTERN = ".*\\bcluster_peer=(\\S*:\\d*)\b*.*"; - public static final String PEER = "cluster_peer"; - - public static AMQShortString add(AMQShortString original, MemberHandle identity) - { - return original == null ? peer(identity) : new AMQShortString(original + " " + peer(identity)); - } - - private static AMQShortString peer(MemberHandle identity) - { - return new AMQShortString(PEER + "=" + identity.getDetails()); - } - - public static boolean contains(AMQShortString in) - { - return in != null; // && in.contains(in); - } - - public static MemberHandle getPeer(AMQShortString in) - { - Matcher matcher = Pattern.compile(PATTERN).matcher(in); - if (matcher.matches()) - { - return new SimpleMemberHandle(matcher.group(1)); - } - else - { - throw new RuntimeException("Could not find peer in '" + in + "'"); - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java deleted file mode 100644 index ee5aa48db9..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.log4j.Logger; -import org.apache.mina.common.IoSession; -import org.apache.qpid.AMQException; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.ConnectionOpenBody; -import org.apache.qpid.framing.ConnectionSecureOkBody; -import org.apache.qpid.framing.ConnectionStartOkBody; -import org.apache.qpid.framing.ConnectionTuneOkBody; -import org.apache.qpid.framing.ClusterMembershipBody; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; - -import java.net.InetSocketAddress; - -public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements InductionBuffer.MessageHandler -{ - private static final Logger _logger = Logger.getLogger(ClusteredProtocolHandler.class); - private final InductionBuffer _peerBuffer = new InductionBuffer(this); - private final InductionBuffer _clientBuffer = new InductionBuffer(this); - private final GroupManager _groupMgr; - private final ServerHandlerRegistry _handlers; - - public ClusteredProtocolHandler(InetSocketAddress address) - { - this(ApplicationRegistry.getInstance(), address); - } - - public ClusteredProtocolHandler(IApplicationRegistry registry, InetSocketAddress address) - { - super(registry); - ClusterBuilder builder = new ClusterBuilder(address); - _groupMgr = builder.getGroupManager(); - _handlers = builder.getHandlerRegistry(); - } - - public ClusteredProtocolHandler(ClusteredProtocolHandler handler) - { - super(handler); - _groupMgr = handler._groupMgr; - _handlers = handler._handlers; - } - - protected void createSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException - { - new ClusteredProtocolSession(session, virtualHostRegistry, codec, new ServerHandlerRegistry(_handlers, virtualHostRegistry, protocolSession)); - } - - void connect(String join) throws Exception - { - if (join == null) - { - _groupMgr.establish(); - } - else - { - _groupMgr.join(new SimpleMemberHandle(join)); - } - } - - private boolean inState(JoinState state) - { - return _groupMgr.getState().equals(state); - } - - public void messageReceived(IoSession session, Object msg) throws Exception - { - JoinState state = _groupMgr.getState(); - switch (state) - { - case JOINED: - _logger.debug(new LogMessage("Received {0}", msg)); - super.messageReceived(session, msg); - break; - case JOINING: - case INITIATION: - case INDUCTION: - buffer(session, msg); - break; - default: - throw new AMQException("Received message while in state: " + state); - } - JoinState latest = _groupMgr.getState(); - if (!latest.equals(state)) - { - switch (latest) - { - case INDUCTION: - _logger.info("Reached induction, delivering buffered message from peers"); - _peerBuffer.deliver(); - break; - case JOINED: - _logger.info("Reached joined, delivering buffered message from clients"); - _clientBuffer.deliver(); - break; - } - } - } - - private void buffer(IoSession session, Object msg) throws Exception - { - if (isBufferable(msg)) - { - MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session); - if (peer == null) - { - _logger.debug(new LogMessage("Buffering {0} for client", msg)); - _clientBuffer.receive(session, msg); - } - else if (inState(JoinState.JOINING) && isMembershipAnnouncement(msg)) - { - _logger.debug(new LogMessage("Initial membership [{0}] received from {1}", msg, peer)); - super.messageReceived(session, msg); - } - else if (inState(JoinState.INITIATION) && _groupMgr.isLeader(peer)) - { - _logger.debug(new LogMessage("Replaying {0} from leader ", msg)); - super.messageReceived(session, msg); - } - else if (inState(JoinState.INDUCTION)) - { - _logger.debug(new LogMessage("Replaying {0} from peer {1}", msg, peer)); - super.messageReceived(session, msg); - } - else - { - _logger.debug(new LogMessage("Buffering {0} for peer {1}", msg, peer)); - _peerBuffer.receive(session, msg); - } - } - else - { - _logger.debug(new LogMessage("Received {0}", msg)); - super.messageReceived(session, msg); - } - } - - public void deliver(IoSession session, Object msg) throws Exception - { - _logger.debug(new LogMessage("Delivering {0}", msg)); - super.messageReceived(session, msg); - } - - private boolean isMembershipAnnouncement(Object msg) - { - return msg instanceof AMQFrame && (((AMQFrame) msg).getBodyFrame() instanceof ClusterMembershipBody); - } - - private boolean isBufferable(Object msg) - { - return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).getBodyFrame()); - } - - private boolean isBuffereable(AMQBody body) - { - return !(body instanceof ConnectionStartOkBody || - body instanceof ConnectionTuneOkBody || - body instanceof ConnectionSecureOkBody || - body instanceof ConnectionOpenBody); - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java deleted file mode 100644 index eea660c4f0..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.mina.common.IoSession; -import org.apache.qpid.AMQException; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.state.AMQStateManager; - -public class ClusteredProtocolSession extends AMQMinaProtocolSession -{ - private MemberHandle _peer; - - public ClusteredProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException -// public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, -// ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) throws AMQException - { - super(session, virtualHostRegistry, codecFactory, stateManager); -// super(session, queueRegistry, exchangeRegistry, codecFactory); - } - - public boolean isPeerSession() - { - return _peer != null; - } - - public void setSessionPeer(MemberHandle peer) - { - _peer = peer; - } - - public MemberHandle getSessionPeer() - { - return _peer; - } - - public AMQChannel getChannel(int channelId) - throws AMQException - { - AMQChannel channel = super.getChannel(channelId); - if (isPeerSession() && channel == null) - { - channel = new OneUseChannel(channelId, getVirtualHost()); - addChannel(channel); - } - return channel; - } - - public static boolean isPeerSession(IoSession session) - { - return isPeerSession(getAMQProtocolSession(session)); - } - - public static boolean isPeerSession(AMQProtocolSession session) - { - return session instanceof ClusteredProtocolSession && ((ClusteredProtocolSession) session).isPeerSession(); - } - - public static void setSessionPeer(AMQProtocolSession session, MemberHandle peer) - { - ((ClusteredProtocolSession) session).setSessionPeer(peer); - } - - public static MemberHandle getSessionPeer(AMQProtocolSession session) - { - return ((ClusteredProtocolSession) session).getSessionPeer(); - } - - public static MemberHandle getSessionPeer(IoSession session) - { - return getSessionPeer(getAMQProtocolSession(session)); - } - - /** - * Cleans itself up after delivery of a message (publish frame, header and optional body frame(s)) - */ - private class OneUseChannel extends AMQChannel - { - public OneUseChannel(int channelId, VirtualHost virtualHost) - throws AMQException - { - super(ClusteredProtocolSession.this,channelId, - virtualHost.getMessageStore(), - virtualHost.getExchangeRegistry()); - } - - protected void routeCurrentMessage() throws AMQException - { - super.routeCurrentMessage(); - removeChannel(getChannelId()); - } - } - - public static boolean isPayloadFromPeer(AMQMessage payload) - { - return isPeerSession(payload.getPublisher()); - } - - public static boolean canRelay(AMQMessage payload, MemberHandle target) - { - //can only relay client messages that have not already been relayed to the given target - return !isPayloadFromPeer(payload) && !payload.checkToken(target); - } - -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java deleted file mode 100644 index a1f01eff46..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import java.io.IOException; - -class ConnectionStatusMonitor -{ - private boolean _complete; - private boolean _redirected; - private String _host; - private int _port; - private RuntimeException _error; - - synchronized void opened() - { - _complete = true; - notifyAll(); - } - - synchronized void redirect(String host, int port) - { - _complete = true; - _redirected = true; - this._host = host; - this._port = port; - } - - synchronized void failed(RuntimeException e) - { - _error = e; - _complete = true; - } - - synchronized boolean waitUntilOpen() throws InterruptedException - { - while (!_complete) - { - wait(); - } - if (_error != null) - { - throw _error; - } - return !_redirected; - } - - synchronized boolean isOpened() - { - return _complete; - } - - String getHost() - { - return _host; - } - - int getPort() - { - return _port; - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java deleted file mode 100644 index 2f473b63fb..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java +++ /dev/null @@ -1,396 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.qpid.server.cluster.policy.StandardPolicies; -import org.apache.qpid.server.cluster.replay.ReplayManager; -import org.apache.qpid.server.cluster.util.LogMessage; - -import java.util.List; - -public class DefaultGroupManager implements GroupManager, MemberFailureListener, BrokerFactory, StandardPolicies -{ - private static final Logger _logger = Logger.getLogger(DefaultGroupManager.class); - private final LoadTable _loadTable; - private final BrokerFactory _factory; - private final ReplayManager _replayMgr; - private final BrokerGroup _group; - - DefaultGroupManager(MemberHandle handle, BrokerFactory factory, ReplayManager replayMgr) - { - this(handle, factory, replayMgr, new LoadTable()); - } - - DefaultGroupManager(MemberHandle handle, BrokerFactory factory, ReplayManager replayMgr, LoadTable loadTable) - { - handle = SimpleMemberHandle.resolve(handle); - _logger.info(handle); - _loadTable = loadTable; - _factory = factory; - _replayMgr = replayMgr; - _group = new BrokerGroup(handle, _replayMgr, this); - } - - public JoinState getState() - { - return _group.getState(); - } - - public void addMemberhipChangeListener(MembershipChangeListener l) - { - _group.addMemberhipChangeListener(l); - } - - public void removeMemberhipChangeListener(MembershipChangeListener l) - { - _group.removeMemberhipChangeListener(l); - } - - public void broadcast(Sendable message) throws AMQException - { - for (Broker b : _group.getPeers()) - { - b.send(message, null); - } - } - - public void broadcast(Sendable message, BroadcastPolicy policy, GroupResponseHandler callback) throws AMQException - { - GroupRequest request = new GroupRequest(message, policy, callback); - for (Broker b : _group.getPeers()) - { - b.invoke(request); - } - request.finishedSend(); - } - - public void send(MemberHandle broker, Sendable message) throws AMQException - { - Broker destination = findBroker(broker); - if(destination == null) - { - _logger.warn(new LogMessage("Invalid destination sending {0}. {1} not known", message, broker)); - } - else - { - destination.send(message, null); - _logger.debug(new LogMessage("Sent {0} to {1}", message, broker)); - } - } - - private void send(Broker broker, Sendable message, ResponseHandler handler) throws AMQException - { - broker.send(message, handler); - } - - private void ping(Broker b) throws AMQException - { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterPingBody ping = new ClusterPingBody((byte)8, - (byte)0, - ClusterPingBody.getClazz((byte)8, (byte)0), - ClusterPingBody.getMethod((byte)8, (byte)0), - _group.getLocal().getDetails(), - _loadTable.getLocalLoad(), - true); - BlockingHandler handler = new BlockingHandler(); - send(getLeader(), new SimpleBodySendable(ping), handler); - handler.waitForCompletion(); - if (handler.failed()) - { - if (isLeader()) - { - handleFailure(b); - } - else - { - suspect(b); - } - } - else - { - _loadTable.setLoad(b, ((ClusterPingBody) handler.getResponse()).load); - } - } - - public void handlePing(MemberHandle member, long load) - { - _loadTable.setLoad(findBroker(member), load); - } - - public Member redirect() - { - return _loadTable.redirect(); - } - - public void establish() - { - _group.establish(); - _logger.info("Established cluster"); - } - - public void join(MemberHandle member) throws AMQException - { - member = SimpleMemberHandle.resolve(member); - - Broker leader = connectToLeader(member); - _logger.info(new LogMessage("Connected to {0}. joining", leader)); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterJoinBody join = new ClusterJoinBody((byte)8, - (byte)0, - ClusterJoinBody.getClazz((byte)8, (byte)0), - ClusterJoinBody.getMethod((byte)8, (byte)0), - _group.getLocal().getDetails()); - - send(leader, new SimpleBodySendable(join)); - } - - private Broker connectToLeader(MemberHandle member) throws AMQException - { - try - { - return _group.connectToLeader(member); - } - catch (Exception e) - { - throw new AMQException("Could not connect to leader: " + e, e); - } - } - - public void leave() throws AMQException - { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, - (byte)0, - ClusterLeaveBody.getClazz((byte)8, (byte)0), - ClusterLeaveBody.getMethod((byte)8, (byte)0), - _group.getLocal().getDetails()); - - send(getLeader(), new SimpleBodySendable(leave)); - } - - private void suspect(MemberHandle broker) throws AMQException - { - if (_group.isLeader(broker)) - { - //need new leader, if this broker is next in line it can assume leadership - if (_group.assumeLeadership()) - { - announceMembership(); - } - else - { - _logger.warn(new LogMessage("Leader failed. Expecting {0} to succeed.", _group.getMembers().get(1))); - } - } - else - { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, - (byte)0, - ClusterSuspectBody.getClazz((byte)8, (byte)0), - ClusterSuspectBody.getMethod((byte)8, (byte)0), - broker.getDetails()); - - send(getLeader(), new SimpleBodySendable(suspect)); - } - } - - - public void handleJoin(MemberHandle member) throws AMQException - { - _logger.info(new LogMessage("Handling join request for {0}", member)); - if(isLeader()) - { - //connect to the host and port specified: - Broker prospect = connectToProspect(member); - announceMembership(); - List<AMQMethodBody> msgs = _replayMgr.replay(true); - _logger.info(new LogMessage("Replaying {0} from leader to {1}", msgs, prospect)); - prospect.replay(msgs); - } - else - { - //pass request on to leader: - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0, - ClusterJoinBody.getClazz((byte)8, (byte)0), - ClusterJoinBody.getMethod((byte)8, (byte)0), - member.getDetails()); - - Broker leader = getLeader(); - send(leader, new SimpleBodySendable(request)); - _logger.info(new LogMessage("Passed join request for {0} to {1}", member, leader)); - } - } - - private Broker connectToProspect(MemberHandle member) throws AMQException - { - try - { - return _group.connectToProspect(member); - } - catch (Exception e) - { - e.printStackTrace(); - throw new AMQException("Could not connect to prospect: " + e, e); - } - } - - public void handleLeave(MemberHandle member) throws AMQException - { - handleFailure(findBroker(member)); - announceMembership(); - } - - public void handleSuspect(MemberHandle member) throws AMQException - { - Broker b = findBroker(member); - if(b != null) - { - //ping it to check it has failed, ping will handle failure if it has - ping(b); - announceMembership(); - } - } - - public void handleSynch(MemberHandle member) - { - _group.synched(member); - } - - private ClusterMembershipBody createAnnouncement(String membership) - { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0, - ClusterMembershipBody.getClazz((byte)8, (byte)0), - ClusterMembershipBody.getMethod((byte)8, (byte)0), - membership.getBytes()); - - - return announce; - } - - private void announceMembership() throws AMQException - { - String membership = SimpleMemberHandle.membersToString(_group.getMembers()); - ClusterMembershipBody announce = createAnnouncement(membership); - broadcast(new SimpleBodySendable(announce)); - _logger.info(new LogMessage("Membership announcement sent: {0}", membership)); - } - - private void handleFailure(Broker peer) - { - peer.remove(); - _group.remove(peer); - } - - public void handleMembershipAnnouncement(String membership) throws AMQException - { - _group.setMembers(SimpleMemberHandle.stringToMembers(membership)); - _logger.info(new LogMessage("Membership announcement received: {0}", membership)); - } - - public boolean isLeader() - { - return _group.isLeader(); - } - - public boolean isLeader(MemberHandle handle) - { - return _group.isLeader(handle); - } - - public Broker getLeader() - { - return _group.getLeader(); - } - - private Broker findBroker(MemberHandle handle) - { - return _group.findBroker(handle, false); - } - - public Member getMember(MemberHandle handle) - { - return findBroker(handle); - } - - public boolean isMember(MemberHandle member) - { - for (MemberHandle handle : _group.getMembers()) - { - if (handle.matches(member)) - { - return true; - } - } - return false; - } - - public MemberHandle getLocal() - { - return _group.getLocal(); - } - - public void failed(MemberHandle member) - { - if (isLeader()) - { - handleFailure(findBroker(member)); - try - { - announceMembership(); - } - catch (AMQException e) - { - _logger.error("Error announcing failure: " + e, e); - } - } - else - { - try - { - suspect(member); - } - catch (AMQException e) - { - _logger.error("Error sending suspect: " + e, e); - } - } - } - - public Broker create(MemberHandle handle) - { - Broker broker = _factory.create(handle); - broker.addFailureListener(this); - return broker; - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupManager.java deleted file mode 100644 index 5599ae4b1f..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupManager.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.AMQException; - -public interface GroupManager -{ - /** - * Establish a new cluster with the local member as the leader. - */ - public void establish(); - - /** - * Join the cluster to which member belongs - */ - public void join(MemberHandle member) throws AMQException; - - public void broadcast(Sendable message) throws AMQException; - - public void broadcast(Sendable message, BroadcastPolicy policy, GroupResponseHandler callback) throws AMQException; - - public void send(MemberHandle broker, Sendable message) throws AMQException; - - public void leave() throws AMQException; - - public void handleJoin(MemberHandle member) throws AMQException; - - public void handleLeave(MemberHandle member) throws AMQException; - - public void handleSuspect(MemberHandle member) throws AMQException; - - public void handlePing(MemberHandle member, long load); - - public void handleMembershipAnnouncement(String membership) throws AMQException; - - public void handleSynch(MemberHandle member); - - public boolean isLeader(); - - public boolean isLeader(MemberHandle handle); - - public boolean isMember(MemberHandle member); - - public MemberHandle redirect(); - - public MemberHandle getLocal(); - - public JoinState getState(); - - public void addMemberhipChangeListener(MembershipChangeListener l); - - public void removeMemberhipChangeListener(MembershipChangeListener l); -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java deleted file mode 100644 index 8ab7856e87..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Represents a method sent to a group of Member instances. Manages the responses, - * completion and callback. - * - */ -class GroupRequest -{ - private final Map<Member, AMQMethodBody> _responses = new HashMap<Member, AMQMethodBody>(); - private final List<Member> _brokers = new ArrayList<Member>(); - private boolean _sent; - - private final Sendable _request; - private final BroadcastPolicy _policy; - private final GroupResponseHandler _callback; - - GroupRequest(Sendable request, BroadcastPolicy policy, GroupResponseHandler callback) - { - _request = request; - _policy = policy; - _callback = callback; - } - - void send(int channel, Member session) throws AMQException - { - _brokers.add(session); - _request.send(channel, session); - } - - boolean finishedSend() - { - _sent = true; - return checkCompletion(); - } - - public boolean responseReceived(Member broker, AMQMethodBody response) - { - _responses.put(broker, response); - return checkCompletion(); - } - - public boolean removed(Member broker) - { - _brokers.remove(broker); - return checkCompletion(); - } - - private synchronized boolean checkCompletion() - { - return isComplete() && callback(); - } - - boolean isComplete() - { - return _sent && _policy != null && _policy.isComplete(_responses.size(), _brokers.size()); - } - - boolean callback() - { - _callback.response(getResults(), _brokers); - return true; - } - - List<AMQMethodBody> getResults() - { - List<AMQMethodBody> results = new ArrayList<AMQMethodBody>(_brokers.size()); - for (Member b : _brokers) - { - results.add(_responses.get(b)); - } - return results; - } - - public String toString() - { - return "GroupRequest{request=" + _request +", brokers=" + _brokers + ", responses=" + _responses + "}"; - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java deleted file mode 100644 index d2e9de2f39..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.framing.AMQMethodBody; - -import java.util.List; - -public interface GroupResponseHandler -{ - //Note: this implies that the response to a group request will always be a method body... - public void response(List<AMQMethodBody> responses, List<Member> members); -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/InductionBuffer.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/InductionBuffer.java deleted file mode 100644 index 586d7d4ae8..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/InductionBuffer.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.mina.common.IoSession; - -import java.util.LinkedList; -import java.util.Queue; - -/** - * Buffers any received messages until join completes. - * - */ -class InductionBuffer -{ - private final Queue<Message> _buffer = new LinkedList<Message>(); - private final MessageHandler _handler; - private boolean _buffering = true; - - InductionBuffer(MessageHandler handler) - { - _handler = handler; - } - - private void process() throws Exception - { - for (Message o = _buffer.poll(); o != null; o = _buffer.poll()) - { - o.deliver(_handler); - } - _buffering = false; - } - - synchronized void deliver() throws Exception - { - process(); - } - - synchronized void receive(IoSession session, Object msg) throws Exception - { - if (_buffering) - { - _buffer.offer(new Message(session, msg)); - } - else - { - _handler.deliver(session, msg); - } - } - - private static class Message - { - private final IoSession _session; - private final Object _msg; - - Message(IoSession session, Object msg) - { - _session = session; - _msg = msg; - } - - void deliver(MessageHandler handler) throws Exception - { - handler.deliver(_session, _msg); - } - } - - static interface MessageHandler - { - public void deliver(IoSession session, Object msg) throws Exception; - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/JoinState.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/JoinState.java deleted file mode 100644 index 5f92aa2971..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/JoinState.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -public enum JoinState -{ - UNINITIALISED, JOINING, INITIATION, INDUCTION, JOINED -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/LoadTable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/LoadTable.java deleted file mode 100644 index 13465a8615..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/LoadTable.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import java.util.HashMap; -import java.util.Map; -import java.util.PriorityQueue; - -/** - * Maintains loading information about the local member and its cluster peers. - * - */ -public class LoadTable -{ - private final Map<MemberHandle, Loading> _peers = new HashMap<MemberHandle, Loading>(); - private final PriorityQueue<Loading> _loads = new PriorityQueue<Loading>(); - private final Loading _local = new Loading(null); - - public LoadTable() - { - _loads.add(_local); - } - - public void setLoad(Member member, long load) - { - synchronized (_peers) - { - Loading loading = _peers.get(member); - if (loading == null) - { - loading = new Loading(member); - synchronized (_loads) - { - _loads.add(loading); - } - _peers.put(member, loading); - } - loading.load = load; - } - } - - public void incrementLocalLoad() - { - synchronized (_local) - { - _local.load++; - } - } - - public void decrementLocalLoad() - { - synchronized (_local) - { - _local.load--; - } - } - - public long getLocalLoad() - { - synchronized (_local) - { - return _local.load; - } - } - - public Member redirect() - { - synchronized (_loads) - { - return _loads.peek().member; - } - } - - private static class Loading implements Comparable - { - private final Member member; - private long load; - - Loading(Member member) - { - this.member = member; - } - - public int compareTo(Object o) - { - return (int) (load - ((Loading) o).load); - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java deleted file mode 100644 index 15752353d1..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.cli.PosixParser; -import org.apache.log4j.Logger; -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.transport.socket.nio.SocketAcceptor; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.qpid.pool.ReadWriteThreadModel; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; -import org.apache.qpid.server.transport.ConnectorConfiguration; - -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; - -/** - * TODO: This is a cut-and-paste from the original broker Main class. Would be preferrable to make that class more - * reuseable to avoid all this duplication. - */ -public class Main extends org.apache.qpid.server.Main -{ - private static final Logger _logger = Logger.getLogger(Main.class); - - protected Main(String[] args) - { - super(args); - } - - protected void setOptions(Options otions) - { - super.setOptions(options); - - //extensions: - Option join = OptionBuilder.withArgName("join").hasArg().withDescription("Join the specified cluster member. Overrides any value in the config file"). - withLongOpt("join").create("j"); - options.addOption(join); - } - - protected void bind(int port, ConnectorConfiguration connectorConfig) - { - try - { - IoAcceptor acceptor = new SocketAcceptor(); - SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig(); - SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); - - sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize); - sc.setSendBufferSize(connectorConfig.socketWriteBuferSize); - sc.setTcpNoDelay(true); - - // if we do not use the executor pool threading model we get the default leader follower - // implementation provided by MINA - if (connectorConfig.enableExecutorPool) - { - sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); - } - - String host = InetAddress.getLocalHost().getHostName(); - ClusteredProtocolHandler handler = new ClusteredProtocolHandler(new InetSocketAddress(host, port)); - if (!connectorConfig.enableSSL) - { - acceptor.bind(new InetSocketAddress(port), handler, sconfig); - _logger.info("Qpid.AMQP listening on non-SSL port " + port); - handler.connect(commandLine.getOptionValue("j")); - } - else - { - ClusteredProtocolHandler sslHandler = new ClusteredProtocolHandler(handler); - acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), sslHandler, sconfig); - _logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort); - } - } - catch (IOException e) - { - _logger.error("Unable to bind service to registry: " + e, e); - } - catch (Exception e) - { - _logger.error("Unable to connect to cluster: " + e, e); - } - } - - public static void main(String[] args) - { - new Main(args); - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Member.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Member.java deleted file mode 100644 index 3fbdfdde70..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Member.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQDataBlock; - -public interface Member extends MemberHandle -{ - public void send(AMQDataBlock data) throws AMQException; - - public void addFailureListener(MemberFailureListener listener); -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberFailureListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberFailureListener.java deleted file mode 100644 index 7ce45dffaa..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberFailureListener.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -interface MemberFailureListener -{ - public void failed(MemberHandle member); -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java deleted file mode 100644 index b8099a12f7..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.framing.AMQShortString; - -public interface MemberHandle -{ - public String getHost(); - - public int getPort(); - - public boolean matches(MemberHandle m); - - public boolean matches(String host, int port); - - public AMQShortString getDetails(); -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MembershipChangeListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MembershipChangeListener.java deleted file mode 100644 index 591e652e32..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MembershipChangeListener.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import java.util.List; - -public interface MembershipChangeListener -{ - public void changed(List<MemberHandle> members); -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java deleted file mode 100644 index a83f034021..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; - -interface MethodHandler -{ - public void handle(int channel, AMQMethodBody method) throws AMQException; -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerFactory.java deleted file mode 100644 index 9bf04f5458..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.server.state.AMQState; - -public interface MethodHandlerFactory -{ - public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry); -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java deleted file mode 100644 index 748a660bb8..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.server.state.StateAwareMethodListener; - -import java.util.HashMap; -import java.util.Map; - -public class MethodHandlerRegistry -{ - private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> registry = - new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>(); - - public <A extends AMQMethodBody, B extends Class<A>> MethodHandlerRegistry addHandler(B type, StateAwareMethodListener<A> handler) - { - registry.put(type, handler); - return this; - } - - public <B extends AMQMethodBody> StateAwareMethodListener<B> getHandler(B frame) - { - return (StateAwareMethodListener<B>) registry.get(frame.getClass()); - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java deleted file mode 100644 index b01ec491ec..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.log4j.Logger; -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.RuntimeIOException; -import org.apache.mina.filter.codec.ProtocolCodecFilter; -import org.apache.mina.transport.socket.nio.SocketConnector; -import org.apache.mina.transport.socket.nio.SocketConnectorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.ConnectionRedirectBody; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersion; - -import java.io.IOException; -import java.net.InetSocketAddress; - -/** - * A 'client stub' for a remote cluster peer, using MINA for IO Layer - * - */ -public class MinaBrokerProxy extends Broker implements MethodHandler -{ - private static final Logger _logger = Logger.getLogger(MinaBrokerProxy.class); - private final ConnectionStatusMonitor _connectionMonitor = new ConnectionStatusMonitor(); - private final ClientHandlerRegistry _legacyHandler; - private final MinaBinding _binding = new MinaBinding(); - private final MemberHandle _local; - private IoSession _session; - private MethodHandler _handler; - private Iterable<AMQMethodBody> _replay; - - MinaBrokerProxy(String host, int port, MemberHandle local) - { - super(host, port); - _local = local; - _legacyHandler = new ClientHandlerRegistry(local, null); - } - - private void init(IoSession session) - { - _session = session; - _handler = new ClientAdapter(session, _legacyHandler); - } - - private ConnectFuture connectImpl() - { - _logger.info("Connecting to cluster peer: " + getDetails()); - SocketConnector ioConnector = new SocketConnector(); - SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); - - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); - scfg.setTcpNoDelay(true); - scfg.setSendBufferSize(32768); - scfg.setReceiveBufferSize(32768); - InetSocketAddress address = new InetSocketAddress(getHost(), getPort()); - return ioConnector.connect(address, _binding); - } - - //extablish connection without handling redirect - boolean connect() throws IOException, InterruptedException - { - ConnectFuture future = connectImpl(); - // wait for connection to complete - future.join(); - // we call getSession which throws an IOException if there has been an error connecting - try - { - future.getSession(); - } - catch (RuntimeIOException e) - { - _connectionMonitor.failed(e); - _logger.error(new LogMessage("Could not connect to {0}: {1}", this, e), e); - throw e; - } - return _connectionMonitor.waitUntilOpen(); - } - - void connectAsynch(Iterable<AMQMethodBody> msgs) - { - _replay = msgs; - connectImpl(); - } - - void replay(Iterable<AMQMethodBody> msgs) - { - _replay = msgs; - if(_connectionMonitor.isOpened()) - { - replay(); - } - } - - //establish connection, handling redirect if required... - Broker connectToCluster() throws IOException, InterruptedException - { - connect(); - //wait until the connection is open or get a redirection - if (_connectionMonitor.waitUntilOpen()) - { - return this; - } - else - { - Broker broker = new MinaBrokerProxy(_connectionMonitor.getHost(), _connectionMonitor.getPort(), _local); - broker.connect(); - return broker; - } - } - - public void send(AMQDataBlock data) throws AMQConnectionWaitException - { - if (_session == null) - { - try - { - _connectionMonitor.waitUntilOpen(); - } - catch (InterruptedException e) - { - throw new AMQConnectionWaitException("Failed to send " + data + ": " + e, e); - } - } - _session.write(data); - } - - private void replay() - { - if(_replay != null) - { - for(AMQMethodBody b : _replay) - { - _session.write(new AMQFrame(0, b)); - } - } - } - - public void handle(int channel, AMQMethodBody method) throws AMQException - { - _logger.info(new LogMessage("Handling method: {0} for channel {1}", method, channel)); - if (!handleResponse(channel, method)) - { - _logger.warn(new LogMessage("Unhandled method: {0} for channel {1}", method, channel)); - } - } - - private void handleMethod(int channel, AMQMethodBody method) throws AMQException - { - if (method instanceof ConnectionRedirectBody) - { - //signal redirection to waiting thread - ConnectionRedirectBody redirect = (ConnectionRedirectBody) method; - String[] parts = redirect.host.toString().split(":"); - _connectionMonitor.redirect(parts[0], Integer.parseInt(parts[1])); - } - else - { - _handler.handle(channel, method); - if (AMQState.CONNECTION_OPEN.equals(_legacyHandler.getCurrentState()) && _handler != this) - { - _handler = this; - _logger.info(new LogMessage("Connection opened, handler switched")); - //replay any messages: - replay(); - //signal waiting thread: - _connectionMonitor.opened(); - } - } - } - - private void handleFrame(AMQFrame frame) throws AMQException - { - AMQBody body = frame.getBodyFrame(); - if (body instanceof AMQMethodBody) - { - handleMethod(frame.getChannel(), (AMQMethodBody) body); - } - else - { - throw new AMQUnexpectedBodyTypeException(AMQMethodBody.class, body); - } - } - - public String toString() - { - return "MinaBrokerProxy[" + (_session == null ? super.toString() : _session.getRemoteAddress()) + "]"; - } - - private class MinaBinding extends IoHandlerAdapter - { - public void sessionCreated(IoSession session) throws Exception - { - init(session); - _logger.info(new LogMessage("{0}: created", MinaBrokerProxy.this)); - ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false)); - session.getFilterChain().addLast("protocolFilter", pcf); - - /* Find last protocol version in protocol version list. Make sure last protocol version - listed in the build file (build-module.xml) is the latest version which will be used - here. */ - - session.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); - } - - public void sessionOpened(IoSession session) throws Exception - { - _logger.info(new LogMessage("{0}: opened", MinaBrokerProxy.this)); - } - - public void sessionClosed(IoSession session) throws Exception - { - _logger.info(new LogMessage("{0}: closed", MinaBrokerProxy.this)); - } - - public void exceptionCaught(IoSession session, Throwable throwable) throws Exception - { - _logger.error(new LogMessage("{0}: received {1}", MinaBrokerProxy.this, throwable), throwable); - if (! (throwable instanceof IOException)) - { - _session.close(); - } - failed(); - } - - public void messageReceived(IoSession session, Object object) throws Exception - { - if (object instanceof AMQFrame) - { - handleFrame((AMQFrame) object); - } - else - { - throw new AMQUnexpectedFrameTypeException("Received message of unrecognised type: " + object); - } - } - - public void messageSent(IoSession session, Object object) throws Exception - { - _logger.debug(new LogMessage("{0}: sent {1}", MinaBrokerProxy.this, object)); - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java deleted file mode 100644 index 5e70de7665..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -public class MinaBrokerProxyFactory implements BrokerFactory -{ - private final MemberHandle _local; - - MinaBrokerProxyFactory(MemberHandle local) - { - _local = local; - } - - public Broker create(MemberHandle handle) - { - return new MinaBrokerProxy(handle.getHost(), handle.getPort(), _local); - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java deleted file mode 100644 index fe76ca6505..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.framing.AMQMethodBody; - -public interface ResponseHandler -{ - public void responded(AMQMethodBody response); - - public void removed(); -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Sendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Sendable.java deleted file mode 100644 index 159612331c..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Sendable.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.AMQException; - -public interface Sendable -{ - public void send(int channel, Member member) throws AMQException; -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java deleted file mode 100644 index aadcfa4b4c..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.log4j.Logger; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.server.state.AMQState; -import org.apache.qpid.server.state.AMQStateManager; -//import org.apache.qpid.server.state.IllegalStateTransitionException; -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; - -import java.util.HashMap; -import java.util.Map; - -/** - * An extension of server.AMQStateManager that allows different handlers to be registered. - * - */ -class ServerHandlerRegistry extends AMQStateManager -{ - private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class); - private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>(); - - ServerHandlerRegistry(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) - { - super(AMQState.CONNECTION_NOT_STARTED, false, virtualHostRegistry, protocolSession); - } - - ServerHandlerRegistry(ServerHandlerRegistry s, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) - { - this(virtualHostRegistry, protocolSession); - _handlers.putAll(s._handlers); - } - - ServerHandlerRegistry(MethodHandlerFactory factory, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) - { - this(virtualHostRegistry, protocolSession); - init(factory); - } - - void setHandlers(AMQState state, MethodHandlerRegistry handlers) - { - _handlers.put(state, handlers); - } - - void init(MethodHandlerFactory factory) - { - for (AMQState s : AMQState.values()) - { - setHandlers(s, factory.register(s, new MethodHandlerRegistry())); - } - } - - protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState state, B frame) //throws IllegalStateTransitionException - { - MethodHandlerRegistry registry = _handlers.get(state); - StateAwareMethodListener<B> handler = (registry == null) ? null : registry.getHandler(frame); - if (handler == null) - { - _logger.warn(new LogMessage("No handler for {0}, {1}", state, frame)); - } - return handler; - } - - <A extends AMQMethodBody, B extends Class<A>> void addHandler(AMQState state, B type, StateAwareMethodListener<A> handler) - { - MethodHandlerRegistry registry = _handlers.get(state); - if (registry == null) - { - registry = new MethodHandlerRegistry(); - _handlers.put(state, registry); - } - registry.addHandler(type, handler); - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java deleted file mode 100644 index bd3757bf97..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQFrame; - -/** - */ -public class SimpleBodySendable implements Sendable -{ - private final AMQBody _body; - - public SimpleBodySendable(AMQBody body) - { - _body = body; - } - - public void send(int channel, Member member) throws AMQException - { - member.send(new AMQFrame(channel, _body)); - } - - public String toString() - { - return _body.toString(); - } - -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java deleted file mode 100644 index 1255094b1d..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.framing.AMQShortString; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.List; - -public class SimpleMemberHandle implements MemberHandle -{ - private final String _host; - private final int _port; - - public SimpleMemberHandle(String host, int port) - { - _host = host; - _port = port; - } - - public SimpleMemberHandle(AMQShortString details) - { - this(details.toString()); - } - - public SimpleMemberHandle(String details) - { - String[] parts = details.split(":"); - _host = parts[0]; - _port = Integer.parseInt(parts[1]); - } - - public SimpleMemberHandle(InetSocketAddress address) throws UnknownHostException - { - this(address.getAddress(), address.getPort()); - } - - public SimpleMemberHandle(InetAddress address, int port) throws UnknownHostException - { - this(canonical(address).getHostAddress(), port); - } - - public String getHost() - { - return _host; - } - - public int getPort() - { - return _port; - } - - public int hashCode() - { - return getPort(); - } - - public boolean equals(Object o) - { - return o instanceof MemberHandle && matches((MemberHandle) o); - } - - public boolean matches(MemberHandle m) - { - return matches(m.getHost(), m.getPort()); - } - - public boolean matches(String host, int port) - { - return _host.equals(host) && _port == port; - } - - public AMQShortString getDetails() - { - return new AMQShortString(_host + ":" + _port); - } - - public String toString() - { - return getDetails().toString(); - } - - static List<MemberHandle> stringToMembers(String membership) - { - String[] names = membership.split("\\s"); - List<MemberHandle> members = new ArrayList<MemberHandle>(); - for (String name : names) - { - members.add(new SimpleMemberHandle(name)); - } - return members; - } - - static String membersToString(List<MemberHandle> members) - { - StringBuffer buffer = new StringBuffer(); - boolean first = true; - for (MemberHandle m : members) - { - if (first) - { - first = false; - } - else - { - buffer.append(" "); - } - buffer.append(m.getDetails()); - } - - return buffer.toString(); - } - - private static InetAddress canonical(InetAddress address) throws UnknownHostException - { - if (address.isLoopbackAddress()) - { - return InetAddress.getLocalHost(); - } - else - { - return address; - } - } - - public MemberHandle resolve() - { - return resolve(this); - } - - public static MemberHandle resolve(MemberHandle handle) - { - try - { - return new SimpleMemberHandle(new InetSocketAddress(handle.getHost(), handle.getPort())); - } - catch (UnknownHostException e) - { - e.printStackTrace(); - return handle; - } - } - - -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java deleted file mode 100644 index 3699a9e128..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.amqp_8_0.MethodConverter_8_0; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; -import org.apache.qpid.server.queue.AMQMessage; - -import java.util.Iterator; - -public class SimpleSendable implements Sendable -{ - - //todo fixme - remove 0-8 hard coding - ProtocolVersionMethodConverter _methodConverter = new MethodConverter_8_0(); - - private final AMQMessage _message; - - public SimpleSendable(AMQMessage message) - { - _message = message; - } - - public void send(int channel, Member member) throws AMQException - { - member.send(new AMQFrame(channel, _methodConverter.convertToBody(_message.getMessagePublishInfo()))); - member.send(new AMQFrame(channel, _message.getContentHeaderBody())); - Iterator<ContentChunk> it = _message.getContentBodyIterator(); - while (it.hasNext()) - { - member.send(new AMQFrame(channel, _methodConverter.convertToBody(it.next()))); - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java deleted file mode 100644 index 86710e8a31..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.handler; - -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; - -import java.util.List; -import java.util.ArrayList; - -public class ChainedClusterMethodHandler <A extends AMQMethodBody> extends ClusterMethodHandler<A> -{ - private final List<ClusterMethodHandler<A>> _handlers; - - private ChainedClusterMethodHandler() - { - this(new ArrayList<ClusterMethodHandler<A>>()); - } - - public ChainedClusterMethodHandler(List<ClusterMethodHandler<A>> handlers) - { - _handlers = handlers; - } - - public ChainedClusterMethodHandler(ClusterMethodHandler<A>... handlers) - { - this(); - for(ClusterMethodHandler<A>handler: handlers) - { - _handlers.add(handler); - } - } - - protected final void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException - { - for(ClusterMethodHandler<A> handler : _handlers) - { - handler.peer(stateMgr, evt); - } - } - - protected final void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException - { - for(ClusterMethodHandler<A> handler : _handlers) - { - handler.client(stateMgr, evt); - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java deleted file mode 100644 index c9f6dbfb37..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.handler; - -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.log4j.Logger; - -import java.util.Map; -import java.util.HashMap; - -/** - * Maintains the default queue names for a channel, and alters subsequent frames where necessary - * to use this (i.e. when no queue is explictly specified). - * - */ -class ChannelQueueManager -{ - private static final Logger _logger = Logger.getLogger(ChannelQueueManager.class); - private final Map<Integer, AMQShortString> _channelQueues = new HashMap<Integer, AMQShortString>(); - - ClusterMethodHandler<QueueDeclareBody> createQueueDeclareHandler() - { - return new QueueDeclareHandler(); - } - - ClusterMethodHandler<QueueDeleteBody> createQueueDeleteHandler() - { - return new QueueDeleteHandler(); - } - - ClusterMethodHandler<QueueBindBody> createQueueBindHandler() - { - return new QueueBindHandler(); - } - - ClusterMethodHandler<BasicConsumeBody> createBasicConsumeHandler() - { - return new BasicConsumeHandler(); - } - - private void set(int channel, AMQShortString queue) - { - _channelQueues.put(channel, queue); - _logger.info(new LogMessage("Set default queue for {0} to {1}", channel, queue)); - } - - private AMQShortString get(int channel) - { - AMQShortString queue = _channelQueues.get(channel); - _logger.info(new LogMessage("Default queue for {0} is {1}", channel, queue)); - return queue; - } - - private class QueueDeclareHandler extends ClusterMethodHandler<QueueDeclareBody> - { - protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException - { - } - - protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException - { - set(evt.getChannelId(), evt.getMethod().queue); - } - } - private class QueueBindHandler extends ClusterMethodHandler<QueueBindBody> - { - protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException - { - } - - protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException - { - if(evt.getMethod().queue == null) - { - evt.getMethod().queue = get(evt.getChannelId()); - } - } - } - private class QueueDeleteHandler extends ClusterMethodHandler<QueueDeleteBody> - { - protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException - { - } - - protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException - { - if(evt.getMethod().queue == null) - { - evt.getMethod().queue = get(evt.getChannelId()); - } - } - } - - private class BasicConsumeHandler extends ClusterMethodHandler<BasicConsumeBody> - { - protected void peer(AMQStateManager stateMgr, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException - { - } - - protected void client(AMQStateManager stateMgr, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException - { - if(evt.getMethod().queue == null) - { - evt.getMethod().queue = get(evt.getChannelId()); - } - } - } - -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java deleted file mode 100644 index faab99b0f6..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.handler; - -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.cluster.ClusteredProtocolSession; -import org.apache.qpid.AMQException; - -public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A> -{ - public final void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException - { - AMQProtocolSession session = stateMgr.getProtocolSession(); - - if (ClusteredProtocolSession.isPeerSession(session)) - { - peer(stateMgr, evt); - } - else - { - client(stateMgr, evt); - } - } - - protected abstract void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException; - protected abstract void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException; -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java deleted file mode 100644 index e7509da32a..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.handler; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.qpid.server.cluster.ClusterCapability; -import org.apache.qpid.server.cluster.ClusteredProtocolSession; -import org.apache.qpid.server.cluster.GroupManager; -import org.apache.qpid.server.cluster.LoadTable; -import org.apache.qpid.server.cluster.MemberHandle; -import org.apache.qpid.server.cluster.MethodHandlerFactory; -import org.apache.qpid.server.cluster.MethodHandlerRegistry; -import org.apache.qpid.server.cluster.SimpleMemberHandle; -import org.apache.qpid.server.handler.ChannelCloseHandler; -import org.apache.qpid.server.handler.ChannelFlowHandler; -import org.apache.qpid.server.handler.ChannelOpenHandler; -import org.apache.qpid.server.handler.ConnectionCloseMethodHandler; -import org.apache.qpid.server.handler.ConnectionOpenMethodHandler; -import org.apache.qpid.server.handler.ConnectionSecureOkMethodHandler; -import org.apache.qpid.server.handler.ConnectionStartOkMethodHandler; -import org.apache.qpid.server.handler.ConnectionTuneOkMethodHandler; -import org.apache.qpid.server.handler.ExchangeDeclareHandler; -import org.apache.qpid.server.handler.ExchangeDeleteHandler; -import org.apache.qpid.server.handler.BasicCancelMethodHandler; -import org.apache.qpid.server.handler.BasicPublishMethodHandler; -import org.apache.qpid.server.handler.QueueBindHandler; -import org.apache.qpid.server.handler.QueueDeleteHandler; -import org.apache.qpid.server.handler.BasicQosHandler; -import org.apache.qpid.server.handler.TxSelectHandler; -import org.apache.qpid.server.handler.TxCommitHandler; -import org.apache.qpid.server.handler.TxRollbackHandler; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.state.AMQState; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; - -public class ClusterMethodHandlerFactory implements MethodHandlerFactory -{ - private final GroupManager _groupMgr; - private final LoadTable _loadTable; - - public ClusterMethodHandlerFactory(GroupManager groupMgr, LoadTable loadTable) - { - _groupMgr = groupMgr; - _loadTable = loadTable; - } - - public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry) - { - switch (state) - { - case CONNECTION_NOT_STARTED: - return registry.addHandler(ConnectionStartOkBody.class, ConnectionStartOkMethodHandler.getInstance()); - case CONNECTION_NOT_AUTH: - return registry.addHandler(ConnectionSecureOkBody.class, ConnectionSecureOkMethodHandler.getInstance()); - case CONNECTION_NOT_TUNED: - return registry.addHandler(ConnectionTuneOkBody.class, ConnectionTuneOkMethodHandler.getInstance()); - case CONNECTION_NOT_OPENED: - //connection.open override: - return registry.addHandler(ConnectionOpenBody.class, new ConnectionOpenHandler()); - case CONNECTION_OPEN: - return registerConnectionOpened(registry); - } - return registry; - } - - private MethodHandlerRegistry registerConnectionOpened(MethodHandlerRegistry registry) - { - //new cluster method handlers: - registry.addHandler(ClusterJoinBody.class, new JoinHandler()); - registry.addHandler(ClusterLeaveBody.class, new LeaveHandler()); - registry.addHandler(ClusterSuspectBody.class, new SuspectHandler()); - registry.addHandler(ClusterMembershipBody.class, new MembershipHandler()); - registry.addHandler(ClusterPingBody.class, new PingHandler()); - registry.addHandler(ClusterSynchBody.class, new SynchHandler()); - - //connection.close override: - registry.addHandler(ConnectionCloseBody.class, new ConnectionCloseHandler()); - - //replicated handlers: - registry.addHandler(ExchangeDeclareBody.class, replicated(ExchangeDeclareHandler.getInstance())); - registry.addHandler(ExchangeDeleteBody.class, replicated(ExchangeDeleteHandler.getInstance())); - - ChannelQueueManager channelQueueMgr = new ChannelQueueManager(); - - - LocalQueueDeclareHandler handler = new LocalQueueDeclareHandler(_groupMgr); - registry.addHandler(QueueDeclareBody.class, - chain(new QueueNameGenerator(handler), - channelQueueMgr.createQueueDeclareHandler(), - new ReplicatingHandler<QueueDeclareBody>(_groupMgr, handler))); - - registry.addHandler(QueueBindBody.class, chain(channelQueueMgr.createQueueBindHandler(), replicated(QueueBindHandler.getInstance()))); - registry.addHandler(QueueDeleteBody.class, chain(channelQueueMgr.createQueueDeleteHandler(), replicated(alternate(new QueueDeleteHandler(false), new QueueDeleteHandler(true))))); - registry.addHandler(BasicConsumeBody.class, chain(channelQueueMgr.createBasicConsumeHandler(), new ReplicatingConsumeHandler(_groupMgr))); - - //other modified handlers: - registry.addHandler(BasicCancelBody.class, alternate(new RemoteCancelHandler(), BasicCancelMethodHandler.getInstance())); - - //other unaffected handlers: - registry.addHandler(BasicPublishBody.class, BasicPublishMethodHandler.getInstance()); - registry.addHandler(BasicQosBody.class, BasicQosHandler.getInstance()); - registry.addHandler(ChannelOpenBody.class, ChannelOpenHandler.getInstance()); - registry.addHandler(ChannelCloseBody.class, ChannelCloseHandler.getInstance()); - registry.addHandler(ChannelFlowBody.class, ChannelFlowHandler.getInstance()); - registry.addHandler(TxSelectBody.class, TxSelectHandler.getInstance()); - registry.addHandler(TxCommitBody.class, TxCommitHandler.getInstance()); - registry.addHandler(TxRollbackBody.class, TxRollbackHandler.getInstance()); - - - return registry; - } - - private class SynchHandler implements StateAwareMethodListener<ClusterSynchBody> - { - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSynchBody> evt) throws AMQException - { - _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(stateManager.getProtocolSession())); - } - } - - private class JoinHandler implements StateAwareMethodListener<ClusterJoinBody> - { - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterJoinBody> evt) throws AMQException - { - _groupMgr.handleJoin(new SimpleMemberHandle(evt.getMethod().broker)); - } - } - - private class LeaveHandler implements StateAwareMethodListener<ClusterLeaveBody> - { - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException - { - _groupMgr.handleLeave(new SimpleMemberHandle(evt.getMethod().broker)); - } - } - - private class SuspectHandler implements StateAwareMethodListener<ClusterSuspectBody> - { - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException - { - _groupMgr.handleSuspect(new SimpleMemberHandle(evt.getMethod().broker)); - } - } - - private class MembershipHandler implements StateAwareMethodListener<ClusterMembershipBody> - { - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException - { - ClusterMembershipBody body = evt.getMethod(); - _groupMgr.handleMembershipAnnouncement(new String(body.members)); - } - } - - private class PingHandler implements StateAwareMethodListener<ClusterPingBody> - { - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterPingBody> evt) throws AMQException - { - MemberHandle peer = new SimpleMemberHandle(evt.getMethod().broker); - _groupMgr.handlePing(peer, evt.getMethod().load); - if (evt.getMethod().responseRequired) - { - evt.getMethod().load = _loadTable.getLocalLoad(); - stateManager.getProtocolSession().writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod())); - } - } - } - - private class ConnectionOpenHandler extends ExtendedHandler<ConnectionOpenBody> - { - ConnectionOpenHandler() - { - super(ConnectionOpenMethodHandler.getInstance()); - } - - void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionOpenBody> evt) - { - AMQShortString capabilities = evt.getMethod().capabilities; - if (ClusterCapability.contains(capabilities)) - { - ClusteredProtocolSession.setSessionPeer(stateMgr.getProtocolSession(), ClusterCapability.getPeer(capabilities)); - } - else - { - _loadTable.incrementLocalLoad(); - } - } - } - - private class ConnectionCloseHandler extends ExtendedHandler<ConnectionCloseBody> - { - ConnectionCloseHandler() - { - super(ConnectionCloseMethodHandler.getInstance()); - } - - void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionCloseBody> evt) - { - if (!ClusteredProtocolSession.isPeerSession(stateMgr.getProtocolSession())) - { - _loadTable.decrementLocalLoad(); - } - } - } - - private <B extends AMQMethodBody> ReplicatingHandler<B> replicated(StateAwareMethodListener<B> handler) - { - return new ReplicatingHandler<B>(_groupMgr, handler); - } - - private <B extends AMQMethodBody> StateAwareMethodListener<B> alternate(StateAwareMethodListener<B> peer, StateAwareMethodListener<B> client) - { - return new PeerHandler<B>(peer, client); - } - - private <B extends AMQMethodBody> StateAwareMethodListener<B> chain(ClusterMethodHandler<B>... h) - { - return new ChainedClusterMethodHandler<B>(h); - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java deleted file mode 100644 index a2f62f714b..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.handler; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; - -class ExtendedHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A> -{ - private final StateAwareMethodListener<A> _base; - - ExtendedHandler(StateAwareMethodListener<A> base) - { - _base = base; - } - - public void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException - { - preHandle(stateMgr, evt); - _base.methodReceived(stateMgr, evt); - postHandle(stateMgr, evt); - } - - void preHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException - { - } - - void postHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException - { - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/HandlerUtils.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/HandlerUtils.java deleted file mode 100644 index 0dc7fe00d2..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/HandlerUtils.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.handler; - -public abstract class HandlerUtils -{ -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java deleted file mode 100644 index f01a8349f2..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.handler; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.cluster.ClusteredProtocolSession; -import org.apache.qpid.server.cluster.GroupManager; -import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.server.cluster.MemberHandle; -import org.apache.qpid.server.handler.QueueDeclareHandler; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.ClusteredQueue; -import org.apache.qpid.server.queue.PrivateQueue; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.queue.RemoteQueueProxy; -import org.apache.qpid.server.virtualhost.VirtualHost; - -public class LocalQueueDeclareHandler extends QueueDeclareHandler -{ - private static final Logger _logger = Logger.getLogger(LocalQueueDeclareHandler.class); - private final GroupManager _groupMgr; - - LocalQueueDeclareHandler(GroupManager groupMgr) - { - _groupMgr = groupMgr; - } - - protected AMQShortString createName() - { - return new AMQShortString(super.createName().toString() + "@" + _groupMgr.getLocal().getDetails()); - } - - protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, AMQProtocolSession session) throws AMQException - { - //is it private or shared: - if (body.exclusive) - { - if (ClusteredProtocolSession.isPeerSession(session)) - { - //need to get peer from the session... - MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session); - _logger.debug(new LogMessage("Creating proxied queue {0} on behalf of {1}", body.queue, peer)); - return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, new AMQShortString(peer.getDetails()), body.autoDelete, virtualHost); - } - else - { - _logger.debug(new LogMessage("Creating local private queue {0}", body.queue)); - return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, virtualHost); - } - } - else - { - _logger.debug(new LogMessage("Creating local shared queue {0}", body.queue)); - return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, virtualHost); - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java deleted file mode 100644 index 8b0bb4b127..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.handler; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; - -public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T> -{ - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<T> evt) throws AMQException - { - } -} - diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java deleted file mode 100644 index 447e51ccd9..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.handler; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.server.cluster.ClusteredProtocolSession; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; - -/** - * Base for implementing handlers that carry out different actions based on whether the method they - * are handling was sent by a peer (i.e. another broker in the cluster) or a client (i.e. an end-user - * application). - * - */ -public class PeerHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A> -{ - private final StateAwareMethodListener<A> _peer; - private final StateAwareMethodListener<A> _client; - - PeerHandler(StateAwareMethodListener<A> peer, StateAwareMethodListener<A> client) - { - _peer = peer; - _client = client; - } - - protected void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException - { - _peer.methodReceived(stateMgr, evt); - } - - protected void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException - { - _client.methodReceived(stateMgr, evt); - } - -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java deleted file mode 100644 index a669171d3c..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.handler; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.state.AMQStateManager; - -/** - * Generates queue names for queues declared with no name. - * - */ -class QueueNameGenerator extends ClusterMethodHandler<QueueDeclareBody> -{ - private final LocalQueueDeclareHandler _handler; - - QueueNameGenerator(LocalQueueDeclareHandler handler) - { - _handler = handler; - } - - protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException - { - } - - protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) - throws AMQException - { - setName(evt.getMethod());//need to set the name before propagating this method - } - - protected void setName(QueueDeclareBody body) - { - if (body.queue == null) - { - body.queue = _handler.createName(); - } - } -} - diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java deleted file mode 100644 index f09763e1ad..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.handler; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.server.cluster.ClusteredProtocolSession; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.ClusteredQueue; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.virtualhost.VirtualHost; - -public class RemoteCancelHandler implements StateAwareMethodListener<BasicCancelBody> -{ - private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class); - - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicCancelBody> evt) throws AMQException - { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHost virtualHost = session.getVirtualHost(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - - - //By convention, consumers setup between brokers use the queue name as the consumer tag: - AMQQueue queue = queueRegistry.getQueue(evt.getMethod().consumerTag); - if (queue instanceof ClusteredQueue) - { - ((ClusteredQueue) queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session)); - } - else - { - _logger.warn("Got remote cancel request for non-clustered queue: " + queue); - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java deleted file mode 100644 index 073b13688c..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.handler; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicConsumeOkBody; -import org.apache.qpid.server.cluster.ClusteredProtocolSession; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.ClusteredQueue; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.virtualhost.VirtualHost; - -/** - * Handles consume requests from other cluster members. - * - */ -public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsumeBody> -{ - private final Logger _logger = Logger.getLogger(RemoteConsumeHandler.class); - - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException - { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHost virtualHost = session.getVirtualHost(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - - AMQQueue queue = queueRegistry.getQueue(evt.getMethod().queue); - if (queue instanceof ClusteredQueue) - { - ((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session)); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(), - (byte)8, (byte)0, // AMQP version (major, minor) - evt.getMethod().queue // consumerTag - )); - } - else - { - _logger.warn("Got remote consume request for non-clustered queue: " + queue); - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java deleted file mode 100644 index 897f8e4fb7..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.handler; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.server.cluster.BroadcastPolicy; -import org.apache.qpid.server.cluster.GroupManager; -import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.handler.BasicConsumeMethodHandler; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.virtualhost.VirtualHost; - -public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBody> -{ - ReplicatingConsumeHandler(GroupManager groupMgr) - { - this(groupMgr, null); - } - - ReplicatingConsumeHandler(GroupManager groupMgr, BroadcastPolicy policy) - { - super(groupMgr, base(), policy); - } - - protected void replicate(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException - { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHost virtualHost = session.getVirtualHost(); - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - - //only replicate if the queue in question is a shared queue - if (isShared(queueRegistry.getQueue(evt.getMethod().queue))) - { - super.replicate(stateManager, evt); - } - else - { - _logger.info(new LogMessage("Handling consume for private queue ({0}) locally", evt.getMethod())); - local(stateManager, evt); - _logger.info(new LogMessage("Handled consume for private queue ({0}) locally", evt.getMethod())); - - } - } - - protected boolean isShared(AMQQueue queue) - { - return queue != null && queue.isShared(); - } - - static StateAwareMethodListener<BasicConsumeBody> base() - { - return new PeerHandler<BasicConsumeBody>(peer(), client()); - } - - static StateAwareMethodListener<BasicConsumeBody> peer() - { - return new RemoteConsumeHandler(); - } - - static StateAwareMethodListener<BasicConsumeBody> client() - { - return BasicConsumeMethodHandler.getInstance(); - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java deleted file mode 100644 index 888fa4e426..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.handler; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.server.cluster.*; -import org.apache.qpid.server.cluster.policy.StandardPolicies; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.List; - -/** - * Basic template for handling methods that should be broadcast to the group and - * processed locally after 'completion' of this broadcast. - * - */ -class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A> implements StandardPolicies -{ - protected static final Logger _logger = Logger.getLogger(ReplicatingHandler.class); - - private final StateAwareMethodListener<A> _base; - private final GroupManager _groupMgr; - private final BroadcastPolicy _policy; - - ReplicatingHandler(GroupManager groupMgr, StateAwareMethodListener<A> base) - { - this(groupMgr, base, null); - } - - ReplicatingHandler(GroupManager groupMgr, StateAwareMethodListener<A> base, BroadcastPolicy policy) - { - _groupMgr = groupMgr; - _base = base; - _policy = policy; - } - - protected void peer(AMQStateManager stateManager, AMQMethodEvent<A> evt) throws AMQException - { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHost virtualHost = session.getVirtualHost(); - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - - local(stateManager, evt); - _logger.debug(new LogMessage("Handled {0} locally", evt.getMethod())); - } - - protected void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException - { - replicate(stateMgr, evt); - } - - protected void replicate(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException - { - if (_policy == null) - { - //asynch delivery - _groupMgr.broadcast(new SimpleBodySendable(evt.getMethod())); - local(stateMgr, evt); - } - else - { - Callback callback = new Callback(stateMgr, evt); - _groupMgr.broadcast(new SimpleBodySendable(evt.getMethod()), _policy, callback); - } - _logger.debug(new LogMessage("Replicated {0} to peers", evt.getMethod())); - } - - protected void local(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException - { - _base.methodReceived(stateMgr, evt); - } - - private class Callback implements GroupResponseHandler - { - private final AMQStateManager _stateMgr; - private final AMQMethodEvent<A> _evt; - - Callback(AMQStateManager stateMgr, AMQMethodEvent<A> evt) - { - _stateMgr = stateMgr; - _evt = evt; - } - - public void response(List<AMQMethodBody> responses, List<Member> members) - { - try - { - local(_stateMgr, _evt); - _logger.debug(new LogMessage("Handled {0} locally, in response to completion of replication", _evt.getMethod())); - } - catch (AMQException e) - { - _logger.error(new LogMessage("Error handling {0}:{1}", _evt.getMethod(), e), e); - } - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java deleted file mode 100644 index 8b0c638d63..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.handler; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; - -public class WrappedListener<T extends AMQMethodBody> implements StateAwareMethodListener<T> -{ - private final StateAwareMethodListener<T> _primary; - private final StateAwareMethodListener _post; - private final StateAwareMethodListener _pre; - - WrappedListener(StateAwareMethodListener<T> primary, StateAwareMethodListener pre, StateAwareMethodListener post) - { - _pre = check(pre); - _post = check(post); - _primary = check(primary); - } - - public void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<T> evt) throws AMQException - { - _pre.methodReceived(stateMgr, evt); - _primary.methodReceived(stateMgr, evt); - _post.methodReceived(stateMgr, evt); - } - - private static <T extends AMQMethodBody> StateAwareMethodListener<T> check(StateAwareMethodListener<T> in) - { - return in == null ? new NullListener<T>() : in; - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java deleted file mode 100644 index 5ec3c9660a..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.handler; - -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.server.cluster.MethodHandlerFactory; -import org.apache.qpid.server.cluster.MethodHandlerRegistry; -import org.apache.qpid.server.state.AMQState; -import org.apache.qpid.server.state.StateAwareMethodListener; - -public abstract class WrappingMethodHandlerFactory implements MethodHandlerFactory -{ - private final MethodHandlerFactory _delegate; - private final StateAwareMethodListener _pre; - private final StateAwareMethodListener _post; - - protected WrappingMethodHandlerFactory(MethodHandlerFactory delegate, - StateAwareMethodListener pre, - StateAwareMethodListener post) - { - _delegate = delegate; - _pre = pre; - _post = post; - } - - public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry) - { - if (isWrappableState(state)) - { - return wrap(_delegate.register(state, registry), state); - } - else - { - return _delegate.register(state, registry); - } - } - - protected abstract boolean isWrappableState(AMQState state); - - protected abstract Iterable<FrameDescriptor> getWrappableFrameTypes(AMQState state); - - private MethodHandlerRegistry wrap(MethodHandlerRegistry registry, AMQState state) - { - for (FrameDescriptor fd : getWrappableFrameTypes(state)) - { - wrap(registry, fd.type, fd.instance); - } - return registry; - } - - private <A extends AMQMethodBody, B extends Class<A>> void wrap(MethodHandlerRegistry r, B type, A frame) - { - r.addHandler(type, new WrappedListener<A>(r.getHandler(frame), _pre, _post)); - } - - protected static class FrameDescriptor<A extends AMQMethodBody, B extends Class<A>> - { - protected final A instance; - protected final B type; - - public FrameDescriptor(B type, A instance) - { - this.instance = instance; - this.type = type; - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java deleted file mode 100644 index 79cb558ede..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.policy; - -import org.apache.qpid.server.cluster.BroadcastPolicy; - -public class AsynchBroadcastPolicy implements BroadcastPolicy -{ - public boolean isComplete(int responded, int members) - { - return true; - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java deleted file mode 100644 index 42382c6e7a..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.policy; - -import org.apache.qpid.server.cluster.BroadcastPolicy; - -public class MajorityResponseBroadcastPolicy implements BroadcastPolicy -{ - public boolean isComplete(int responded, int members) - { - return responded > members / 2; - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java deleted file mode 100644 index e3072a6a40..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.policy; - -import org.apache.qpid.server.cluster.BroadcastPolicy; - -public class OneResponseBroadcastPolicy implements BroadcastPolicy -{ - public boolean isComplete(int responded, int members) - { - return responded > 0; - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/StandardPolicies.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/StandardPolicies.java deleted file mode 100644 index dbaf690d3a..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/StandardPolicies.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.policy; - -import org.apache.qpid.server.cluster.BroadcastPolicy; - -public interface StandardPolicies -{ - public static final BroadcastPolicy ASYNCH_POLICY = new AsynchBroadcastPolicy(); - public static final BroadcastPolicy SYNCH_POLICY = new SynchBroadcastPolicy(); -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java deleted file mode 100644 index 605b8dd51e..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.policy; - -import org.apache.qpid.server.cluster.BroadcastPolicy; - -public class SynchBroadcastPolicy implements BroadcastPolicy -{ - public boolean isComplete(int responded, int members) - { - return responded == members; - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java deleted file mode 100644 index 3664be58bc..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.replay; - -import org.apache.qpid.framing.AMQMethodBody; - -abstract class ChainedMethodRecorder <T extends AMQMethodBody> implements MethodRecorder<T> -{ - private final MethodRecorder<T> _recorder; - - ChainedMethodRecorder() - { - this(null); - } - - ChainedMethodRecorder(MethodRecorder<T> recorder) - { - _recorder = recorder; - } - - public final void record(T method) - { - if(!doRecord(method) && _recorder != null) - { - _recorder.record(method); - } - } - - protected abstract boolean doRecord(T method); -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java deleted file mode 100644 index 5a433b869b..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.replay; - -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.AMQShortString; - -import java.util.Map; -import java.util.HashMap; -import java.util.List; - -class ConsumerCounts -{ - private final Map<AMQShortString, Integer> _counts = new HashMap<AMQShortString, Integer>(); - - synchronized void increment(AMQShortString queue) - { - _counts.put(queue, get(queue) + 1); - } - - synchronized void decrement(AMQShortString queue) - { - _counts.put(queue, get(queue) - 1); - } - - private int get(AMQShortString queue) - { - Integer count = _counts.get(queue); - return count == null ? 0 : count; - } - - synchronized void replay(List<AMQMethodBody> messages) - { - for(AMQShortString queue : _counts.keySet()) - { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - BasicConsumeBody m = new BasicConsumeBody((byte)8, - (byte)0, - BasicConsumeBody.getClazz((byte)8, (byte)0), - BasicConsumeBody.getMethod((byte)8, (byte)0), - null, - queue, - false, - false, - false, - false, - queue, - 0); - m.queue = queue; - m.consumerTag = queue; - replay(m, messages); - } - } - - private void replay(BasicConsumeBody msg, List<AMQMethodBody> messages) - { - int count = _counts.get(msg.queue); - for(int i = 0; i < count; i++) - { - messages.add(msg); - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java deleted file mode 100644 index e45810438e..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.replay; - -import org.apache.qpid.framing.AMQMethodBody; - -/** - * Abstraction through which a method can be recorded for replay - * - */ -interface MethodRecorder<T extends AMQMethodBody> -{ - public void record(T method); -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java deleted file mode 100644 index 4d3fe1dbed..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.replay; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.ExchangeDeleteBody; -import org.apache.qpid.framing.QueueBindBody; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.server.cluster.MethodHandlerFactory; -import org.apache.qpid.server.cluster.MethodHandlerRegistry; -import org.apache.qpid.server.cluster.handler.WrappingMethodHandlerFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.state.AMQState; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; - -import java.util.Arrays; - -public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory -{ - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - private final byte major = (byte)8; - private final byte minor = (byte)0; - private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[] - { - new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor, QueueDeclareBody.getClazz(major, minor), QueueDeclareBody.getMethod(major, minor),null,false,false,false,false,false,null,0)), - new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor, QueueDeleteBody.getClazz(major, minor), QueueDeleteBody.getMethod(major, minor),false,false,false,null,0)), - new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor, QueueBindBody.getClazz(major, minor), QueueBindBody.getMethod(major, minor),null,null,false,null,null,0)), - new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor, ExchangeDeclareBody.getClazz(major, minor), ExchangeDeclareBody.getMethod(major, minor),null,false,false,null,false,false,false,0,null)), - new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor, ExchangeDeleteBody.getClazz(major, minor), ExchangeDeleteBody.getMethod(major, minor),null,false,false,0)), - new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor, BasicConsumeBody.getClazz(major, minor), BasicConsumeBody.getMethod(major, minor),null,null,false,false,false,false,null,0)), - new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor, BasicCancelBody.getClazz(major, minor), BasicCancelBody.getMethod(major, minor),null,false)) - }); - - - public RecordingMethodHandlerFactory(MethodHandlerFactory factory, ReplayStore store) - { - super(factory, null, store); - } - - protected boolean isWrappableState(AMQState state) - { - return AMQState.CONNECTION_OPEN.equals(state); - } - - protected Iterable<FrameDescriptor> getWrappableFrameTypes(AMQState state) - { - return _frames; - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java deleted file mode 100644 index 898cb80cb3..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.replay; - -import org.apache.qpid.server.cluster.Sendable; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQMethodBody; - -import java.util.List; - -/** - * Abstraction of a replay strategy for use in getting joining members up to - * date with respect to cluster state. - * - */ -public interface ReplayManager -{ - public List<AMQMethodBody> replay(boolean isLeader); -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java deleted file mode 100644 index d7bbb1c36b..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java +++ /dev/null @@ -1,311 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.replay; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.qpid.server.cluster.ClusteredProtocolSession; -import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.server.cluster.util.Bindings; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Stores method invocations for replay to new members. - * - */ -public class ReplayStore implements ReplayManager, StateAwareMethodListener -{ - private static final Logger _logger = Logger.getLogger(ReplayStore.class); - - private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _globalRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>(); - private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _localRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>(); - private final Map<AMQShortString, QueueDeclareBody> _sharedQueues = new ConcurrentHashMap<AMQShortString, QueueDeclareBody>(); - private final Map<AMQShortString, QueueDeclareBody> _privateQueues = new ConcurrentHashMap<AMQShortString, QueueDeclareBody>(); - private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _sharedBindings = new Bindings<AMQShortString, AMQShortString, QueueBindBody>(); - private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _privateBindings = new Bindings<AMQShortString, AMQShortString, QueueBindBody>(); - private final Map<AMQShortString, ExchangeDeclareBody> _exchanges = new ConcurrentHashMap<AMQShortString, ExchangeDeclareBody>(); - private final ConsumerCounts _consumers = new ConsumerCounts(); - - public ReplayStore() - { - _globalRecorders.put(QueueDeclareBody.class, new SharedQueueDeclareRecorder()); - _globalRecorders.put(QueueDeleteBody.class, new SharedQueueDeleteRecorder()); - _globalRecorders.put(QueueBindBody.class, new SharedQueueBindRecorder()); - _globalRecorders.put(ExchangeDeclareBody.class, new ExchangeDeclareRecorder()); - _globalRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder()); - - _localRecorders.put(QueueDeclareBody.class, new PrivateQueueDeclareRecorder()); - _localRecorders.put(QueueDeleteBody.class, new PrivateQueueDeleteRecorder()); - _localRecorders.put(QueueBindBody.class, new PrivateQueueBindRecorder()); - _localRecorders.put(BasicConsumeBody.class, new BasicConsumeRecorder()); - _localRecorders.put(BasicCancelBody.class, new BasicCancelRecorder()); - _localRecorders.put(ExchangeDeclareBody.class, new ExchangeDeclareRecorder()); - _localRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder()); - } - - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException - { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHost virtualHost = session.getVirtualHost(); - - _logger.debug(new LogMessage("Replay store received {0}", evt.getMethod())); - AMQMethodBody request = evt.getMethod(); - - //allow any (relevant) recorder registered for this type of request to record it: - MethodRecorder recorder = getRecorders(session).get(request.getClass()); - if (recorder != null) - { - recorder.record(request); - } - } - - private Map<Class<? extends AMQMethodBody>, MethodRecorder> getRecorders(AMQProtocolSession session) - { - if (ClusteredProtocolSession.isPeerSession(session)) - { - return _globalRecorders; - } - else - { - return _localRecorders; - } - } - - public List<AMQMethodBody> replay(boolean isLeader) - { - List<AMQMethodBody> methods = new ArrayList<AMQMethodBody>(); - methods.addAll(_exchanges.values()); - methods.addAll(_privateQueues.values()); - synchronized(_privateBindings) - { - methods.addAll(_privateBindings.values()); - } - if (isLeader) - { - methods.addAll(_sharedQueues.values()); - synchronized(_sharedBindings) - { - methods.addAll(_sharedBindings.values()); - } - } - _consumers.replay(methods); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - methods.add(new ClusterSynchBody((byte)8, (byte)0, ClusterSynchBody.getClazz((byte)8, (byte)0), ClusterSynchBody.getMethod((byte)8, (byte)0))); - return methods; - } - - private class BasicConsumeRecorder implements MethodRecorder<BasicConsumeBody> - { - public void record(BasicConsumeBody method) - { - if(_sharedQueues.containsKey(method.queue)) - { - _consumers.increment(method.queue); - } - } - } - - private class BasicCancelRecorder implements MethodRecorder<BasicCancelBody> - { - public void record(BasicCancelBody method) - { - if(_sharedQueues.containsKey(method.consumerTag)) - { - _consumers.decrement(method.consumerTag); - } - } - } - - private class SharedQueueDeclareRecorder extends QueueDeclareRecorder - { - SharedQueueDeclareRecorder() - { - super(false, _sharedQueues); - } - } - - private class PrivateQueueDeclareRecorder extends QueueDeclareRecorder - { - PrivateQueueDeclareRecorder() - { - super(true, _privateQueues, new SharedQueueDeclareRecorder()); - } - } - - private class SharedQueueDeleteRecorder extends QueueDeleteRecorder - { - SharedQueueDeleteRecorder() - { - super(_sharedQueues, _sharedBindings); - } - } - - private class PrivateQueueDeleteRecorder extends QueueDeleteRecorder - { - PrivateQueueDeleteRecorder() - { - super(_privateQueues, _privateBindings, new SharedQueueDeleteRecorder()); - } - } - - private class SharedQueueBindRecorder extends QueueBindRecorder - { - SharedQueueBindRecorder() - { - super(_sharedQueues, _sharedBindings); - } - } - - private class PrivateQueueBindRecorder extends QueueBindRecorder - { - PrivateQueueBindRecorder() - { - super(_privateQueues, _privateBindings, new SharedQueueBindRecorder()); - } - } - - - private static class QueueDeclareRecorder extends ChainedMethodRecorder<QueueDeclareBody> - { - private final boolean _exclusive; - private final Map<AMQShortString, QueueDeclareBody> _queues; - - QueueDeclareRecorder(boolean exclusive, Map<AMQShortString, QueueDeclareBody> queues) - { - _queues = queues; - _exclusive = exclusive; - } - - QueueDeclareRecorder(boolean exclusive, Map<AMQShortString, QueueDeclareBody> queues, QueueDeclareRecorder recorder) - { - super(recorder); - _queues = queues; - _exclusive = exclusive; - } - - - protected boolean doRecord(QueueDeclareBody method) - { - if (_exclusive == method.exclusive) - { - _queues.put(method.queue, method); - return true; - } - else - { - return false; - } - } - } - - private class QueueDeleteRecorder extends ChainedMethodRecorder<QueueDeleteBody> - { - private final Map<AMQShortString, QueueDeclareBody> _queues; - private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _bindings; - - QueueDeleteRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings) - { - this(queues, bindings, null); - } - - QueueDeleteRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings, QueueDeleteRecorder recorder) - { - super(recorder); - _queues = queues; - _bindings = bindings; - } - - protected boolean doRecord(QueueDeleteBody method) - { - if (_queues.remove(method.queue) != null) - { - _bindings.unbind1(method.queue); - return true; - } - else - { - return false; - } - } - } - - private class QueueBindRecorder extends ChainedMethodRecorder<QueueBindBody> - { - private final Map<AMQShortString, QueueDeclareBody> _queues; - private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _bindings; - - QueueBindRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings) - { - _queues = queues; - _bindings = bindings; - } - - QueueBindRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings, QueueBindRecorder recorder) - { - super(recorder); - _queues = queues; - _bindings = bindings; - } - - protected boolean doRecord(QueueBindBody method) - { - if (_queues.containsKey(method.queue)) - { - _bindings.bind(method.queue, method.exchange, method); - return true; - } - else - { - return false; - } - } - } - - private class ExchangeDeclareRecorder implements MethodRecorder<ExchangeDeclareBody> - { - public void record(ExchangeDeclareBody method) - { - _exchanges.put(method.exchange, method); - } - } - - private class ExchangeDeleteRecorder implements MethodRecorder<ExchangeDeleteBody> - { - public void record(ExchangeDeleteBody method) - { - _exchanges.remove(method.exchange); - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/Bindings.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/Bindings.java deleted file mode 100644 index 49de0a7cbf..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/Bindings.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.util; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; - -/** - * Maps two separate keys to a list of values. - * - */ -public class Bindings<K1, K2, V> -{ - private final MultiValuedMap<K1, Binding<K2>> _a = new MultiValuedMap<K1, Binding<K2>>(); - private final MultiValuedMap<K2, Binding<K1>> _b = new MultiValuedMap<K2, Binding<K1>>(); - private final Collection<V> _values = new HashSet<V>(); - - public void bind(K1 key1, K2 key2, V value) - { - _a.add(key1, new Binding<K2>(key2, value)); - _b.add(key2, new Binding<K1>(key1, value)); - _values.add(value); - } - - public void unbind1(K1 key1) - { - Collection<Binding<K2>> values = _a.remove(key1); - for (Binding<K2> v : values) - { - _b.remove(v.key); - _values.remove(v.value); - } - } - - public void unbind2(K2 key2) - { - Collection<Binding<K1>> values = _b.remove(key2); - for (Binding<K1> v : values) - { - _a.remove(v.key); - _values.remove(v.value); - } - } - - public Collection<V> values() - { - return Collections.unmodifiableCollection(_values); - } - - /** - * Value needs to hold key to the other map - */ - private class Binding<T> - { - private final T key; - private final V value; - - Binding(T key, V value) - { - this.key = key; - this.value = value; - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/InvokeMultiple.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/InvokeMultiple.java deleted file mode 100644 index 406fe45701..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/InvokeMultiple.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.util; - -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.util.Set; -import java.util.HashSet; - -/** - * Allows a method to be invoked on a list of listeners with one call - * - */ -public class InvokeMultiple <T> implements InvocationHandler -{ - private final Set<T> _targets = new HashSet<T>(); - private final T _proxy; - - public InvokeMultiple(Class<? extends T> type) - { - _proxy = (T) Proxy.newProxyInstance(type.getClassLoader(), new Class[]{type}, this); - } - - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable - { - Set<T> targets; - synchronized(this) - { - targets = new HashSet<T>(_targets); - } - - for(T target : targets) - { - method.invoke(target, args); - } - return null; - } - - public synchronized void addListener(T t) - { - _targets.add(t); - } - - public synchronized void removeListener(T t) - { - _targets.remove(t); - } - - public T getProxy() - { - return _proxy; - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/LogMessage.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/LogMessage.java deleted file mode 100644 index 9be90298ea..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/LogMessage.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.util; - -import java.text.MessageFormat; - -/** - * Convenience class to allow log messages to be specified in terms - * of MessageFormat patterns with a variable set of parameters. The - * production of the string is only done if toSTring is called so it - * works well with debug level messages, allowing complex messages - * to be specified that are only evaluated if actually printed. - * - */ -public class LogMessage -{ - private final String _message; - private final Object[] _args; - - public LogMessage(String message) - { - this(message, new Object[0]); - } - - public LogMessage(String message, Object... args) - { - _message = message; - _args = args; - } - - public String toString() - { - return MessageFormat.format(_message, _args); - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/MultiValuedMap.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/MultiValuedMap.java deleted file mode 100644 index ebe1fe47dd..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/MultiValuedMap.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster.util; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -/** - * Maps a key to a collection of values - * - */ -public class MultiValuedMap<K, V> -{ - private Map<K, Collection<V>> _map = new HashMap<K, Collection<V>>(); - - public boolean add(K key, V value) - { - Collection<V> values = get(key); - if (values == null) - { - values = createList(); - _map.put(key, values); - } - return values.add(value); - } - - public Collection<V> get(K key) - { - return _map.get(key); - } - - public Collection<V> remove(K key) - { - return _map.remove(key); - } - - protected Collection<V> createList() - { - return new ArrayList<V>(); - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java deleted file mode 100644 index 9fa96ece1e..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.cluster.*; -import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.concurrent.ConcurrentHashMap; - -/** - * Represents a shared queue in a cluster. The key difference is that as well as any - * local consumers, there may be consumers for this queue on other members of the - * cluster. - * - */ -public class ClusteredQueue extends AMQQueue -{ - private static final Logger _logger = Logger.getLogger(ClusteredQueue.class); - private final ConcurrentHashMap<SimpleMemberHandle, RemoteSubscriptionImpl> _peers = new ConcurrentHashMap<SimpleMemberHandle, RemoteSubscriptionImpl>(); - private final GroupManager _groupMgr; - private final NestedSubscriptionManager _subscriptions; - - public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) - throws AMQException - { - super(name, durable, owner, autoDelete, virtualHost, new ClusteredSubscriptionManager()); - _groupMgr = groupMgr; - _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers(); - } - - - public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException - { - _logger.info(new LogMessage("{0} delivered to clustered queue {1}", msg, this)); - super.process(storeContext, msg, deliverFirst); - } - - protected void autodelete() throws AMQException - { - if(!_subscriptions.hasActiveSubscribers()) - { - //delete locally: - delete(); - - //send deletion request to all other members: - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - QueueDeleteBody request = new QueueDeleteBody((byte)8, - (byte)0, - QueueDeleteBody.getClazz((byte)8,(byte)0), - QueueDeleteBody.getMethod((byte)8,(byte)0), - false, - false, - false, - getName(), - 0); - - _groupMgr.broadcast(new SimpleBodySendable(request)); - } - } - - public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException - { - //handle locally: - super.unregisterProtocolSession(ps, channel, consumerTag); - - //signal other members: - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - BasicCancelBody request = new BasicCancelBody((byte)8, - (byte)0, - BasicCancelBody.getClazz((byte)8, (byte)0), - BasicCancelBody.getMethod((byte)8, (byte)0), - getName(), - false); - - _groupMgr.broadcast(new SimpleBodySendable(request)); - } - - public void addRemoteSubcriber(MemberHandle peer) - { - _logger.info(new LogMessage("Added remote subscriber for {0} to clustered queue {1}", peer, this)); - //find (or create) a matching subscriber for the peer then increment the count - getSubscriber(key(peer), true).increment(); - } - - public void removeRemoteSubscriber(MemberHandle peer) - { - //find a matching subscriber for the peer then decrement the count - //if count is now zero, remove the subscriber - SimpleMemberHandle key = key(peer); - RemoteSubscriptionImpl s = getSubscriber(key, true); - if (s == null) - { - throw new RuntimeException("No subscriber for " + peer); - } - if (s.decrement()) - { - _peers.remove(key); - _subscriptions.removeSubscription(s); - } - } - - public void removeAllRemoteSubscriber(MemberHandle peer) - { - SimpleMemberHandle key = key(peer); - RemoteSubscriptionImpl s = getSubscriber(key, true); - _peers.remove(key); - _subscriptions.removeSubscription(s); - } - - private RemoteSubscriptionImpl getSubscriber(SimpleMemberHandle key, boolean create) - { - RemoteSubscriptionImpl s = _peers.get(key); - if (s == null && create) - { - return addSubscriber(key, new RemoteSubscriptionImpl(_groupMgr, key)); - } - else - { - return s; - } - } - - private RemoteSubscriptionImpl addSubscriber(SimpleMemberHandle key, RemoteSubscriptionImpl s) - { - RemoteSubscriptionImpl other = _peers.putIfAbsent(key, s); - if (other == null) - { - _subscriptions.addSubscription(s); - new SubscriberCleanup(key, this, _groupMgr); - return s; - } - else - { - return other; - } - } - - private SimpleMemberHandle key(MemberHandle peer) - { - return peer instanceof SimpleMemberHandle ? (SimpleMemberHandle) peer : (SimpleMemberHandle) SimpleMemberHandle.resolve(peer); - } - - static boolean isFromBroker(AMQMessage msg) - { - return ClusteredProtocolSession.isPayloadFromPeer(msg); - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java deleted file mode 100644 index 39ae7e3c3e..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.cluster.util.LogMessage; - -import java.util.List; - -class ClusteredSubscriptionManager extends SubscriptionSet -{ - private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class); - private final NestedSubscriptionManager _all; - - ClusteredSubscriptionManager() - { - this(new NestedSubscriptionManager()); - } - - private ClusteredSubscriptionManager(NestedSubscriptionManager all) - { - _all = all; - _all.addSubscription(new Parent()); - } - - NestedSubscriptionManager getAllSubscribers() - { - return _all; - } - - public boolean hasActiveSubscribers() - { - return _all.hasActiveSubscribers(); - } - - public Subscription nextSubscriber(AMQMessage msg) - { - if(ClusteredQueue.isFromBroker(msg)) - { - //if message is from another broker, it should only be delivered - //to another client to meet ordering constraints - Subscription s = super.nextSubscriber(msg); - _logger.info(new LogMessage("Returning next *client* subscriber {0}", s)); - if(s == null) - { - //TODO: deliver to another broker, but set the redelivered flag on the msg - //(this should be policy based) - - //for now just don't deliver it - return null; - } - else - { - return s; - } - } - Subscription s = _all.nextSubscriber(msg); - _logger.info(new LogMessage("Returning next subscriber {0}", s)); - return s; - } - - private class Parent implements WeightedSubscriptionManager - { - public int getWeight() - { - return ClusteredSubscriptionManager.this.getWeight(); - } - - public List<Subscription> getSubscriptions() - { - return ClusteredSubscriptionManager.super.getSubscriptions(); - } - - public boolean hasActiveSubscribers() - { - return ClusteredSubscriptionManager.super.hasActiveSubscribers(); - } - - public Subscription nextSubscriber(AMQMessage msg) - { - return ClusteredSubscriptionManager.super.nextSubscriber(msg); - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java deleted file mode 100644 index 0566c5203b..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.util.List; -import java.util.LinkedList; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * Distributes messages among a list of subsscription managers, using their - * weighting. - */ -class NestedSubscriptionManager implements SubscriptionManager -{ - private final List<WeightedSubscriptionManager> _subscribers = new CopyOnWriteArrayList<WeightedSubscriptionManager>(); - private int _iterations; - private int _index; - - void addSubscription(WeightedSubscriptionManager s) - { - _subscribers.add(s); - } - - void removeSubscription(WeightedSubscriptionManager s) - { - _subscribers.remove(s); - } - - - public List<Subscription> getSubscriptions() - { - List<Subscription> allSubs = new LinkedList<Subscription>(); - - for (WeightedSubscriptionManager subMans : _subscribers) - { - allSubs.addAll(subMans.getSubscriptions()); - } - - return allSubs; - } - - public boolean hasActiveSubscribers() - { - for (WeightedSubscriptionManager s : _subscribers) - { - if (s.hasActiveSubscribers()) - { - return true; - } - } - return false; - } - - public Subscription nextSubscriber(AMQMessage msg) - { - WeightedSubscriptionManager start = current(); - for (WeightedSubscriptionManager s = start; s != null; s = next(start)) - { - if (hasMore(s)) - { - return nextSubscriber(s); - } - } - return null; - } - - private Subscription nextSubscriber(WeightedSubscriptionManager s) - { - _iterations++; - return s.nextSubscriber(null); - } - - private WeightedSubscriptionManager current() - { - return _subscribers.isEmpty() ? null : _subscribers.get(_index); - } - - private boolean hasMore(WeightedSubscriptionManager s) - { - return _iterations < s.getWeight(); - } - - private WeightedSubscriptionManager next(WeightedSubscriptionManager start) - { - WeightedSubscriptionManager s = next(); - return s == start && !hasMore(s) ? null : s; - } - - private WeightedSubscriptionManager next() - { - _iterations = 0; - if (++_index >= _subscribers.size()) - { - _index = 0; - } - return _subscribers.get(_index); - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java deleted file mode 100644 index f8e4311a77..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.cluster.SimpleSendable; -import org.apache.qpid.server.cluster.GroupManager; -import org.apache.qpid.server.cluster.SimpleBodySendable; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.AMQShortString; - -import java.util.concurrent.Executor; - -/** - * Used to represent a private queue held locally. - * - */ -public class PrivateQueue extends AMQQueue -{ - private final GroupManager _groupMgr; - - public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) - throws AMQException - { - super(name, durable, owner, autoDelete, virtualHost); - _groupMgr = groupMgr; - - } - - protected void autodelete() throws AMQException - { - //delete locally: - super.autodelete(); - - //send delete request to peers: - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0, - QueueDeleteBody.getClazz((byte)8, (byte)0), - QueueDeleteBody.getMethod((byte)8, (byte)0), - false,false,false,null,0); - request.queue = getName(); - _groupMgr.broadcast(new SimpleBodySendable(request)); - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ProxiedQueueCleanup.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ProxiedQueueCleanup.java deleted file mode 100644 index efc0540c18..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ProxiedQueueCleanup.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.cluster.MembershipChangeListener; -import org.apache.qpid.server.cluster.MemberHandle; -import org.apache.qpid.server.cluster.GroupManager; -import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.AMQException; -import org.apache.log4j.Logger; - -import java.util.List; - -class ProxiedQueueCleanup implements MembershipChangeListener -{ - private static final Logger _logger = Logger.getLogger(ProxiedQueueCleanup.class); - - private final MemberHandle _subject; - private final RemoteQueueProxy _queue; - - ProxiedQueueCleanup(MemberHandle subject, RemoteQueueProxy queue) - { - _subject = subject; - _queue = queue; - } - - public void changed(List<MemberHandle> members) - { - if(!members.contains(_subject)) - { - try - { - _queue.delete(); - _logger.info(new LogMessage("Deleted {0} in response to exclusion of {1}", _queue, _subject)); - } - catch (AMQException e) - { - _logger.info(new LogMessage("Failed to delete {0} in response to exclusion of {1}: {2}", _queue, _subject, e), e); - } - } - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java deleted file mode 100644 index 2a83d65ae5..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.cluster.ClusteredProtocolSession; -import org.apache.qpid.server.cluster.GroupManager; -import org.apache.qpid.server.cluster.MemberHandle; -import org.apache.qpid.server.cluster.SimpleSendable; -import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.server.virtualhost.VirtualHost; - -/** - * TODO: separate out an abstract base class from AMQQueue from which this inherits. It does - * not require all the functionality currently in AMQQueue. - * - */ -public class RemoteQueueProxy extends AMQQueue -{ - private static final Logger _logger = Logger.getLogger(RemoteQueueProxy.class); - private final MemberHandle _target; - private final GroupManager _groupMgr; - - public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) - throws AMQException - { - super(name, durable, owner, autoDelete, virtualHost); - _target = target; - _groupMgr = groupMgr; - _groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this)); - } - - - public void deliver(AMQMessage msg) throws NoConsumersException - { - if (ClusteredProtocolSession.canRelay(msg, _target)) - { - try - { - _logger.debug(new LogMessage("Relaying {0} to {1}", msg, _target)); - relay(msg); - } - catch (NoConsumersException e) - { - throw e; - } - catch (AMQException e) - { - //TODO: sort out exception handling... - e.printStackTrace(); - } - } - else - { - _logger.debug(new LogMessage("Cannot relay {0} to {1}", msg, _target)); - } - } - - void relay(AMQMessage msg) throws AMQException - { - // TODO FIXME - can no longer update the publish body as it is an opaque wrapper object - // if cluster can handle immediate then it should wrap the wrapper... - -// BasicPublishBody publish = msg.getMessagePublishInfo(); -// publish.immediate = false; //can't as yet handle the immediate flag in a cluster - - // send this on to the broker for which it is acting as proxy: - _groupMgr.send(_target, new SimpleSendable(msg)); - } -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java deleted file mode 100644 index e396432cea..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.cluster.MemberHandle; -import org.apache.qpid.server.cluster.GroupManager; -import org.apache.qpid.server.cluster.SimpleSendable; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.AMQException; - -import java.util.Queue; -import java.util.List; - -class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager -{ - private final GroupManager _groupMgr; - private final MemberHandle _peer; - private boolean _suspended; - private int _count; - - RemoteSubscriptionImpl(GroupManager groupMgr, MemberHandle peer) - { - _groupMgr = groupMgr; - _peer = peer; - } - - synchronized void increment() - { - _count++; - } - - synchronized boolean decrement() - { - return --_count <= 0; - } - - public void send(AMQMessage msg, AMQQueue queue) - { - try - { - _groupMgr.send(_peer, new SimpleSendable(msg)); - } - catch (AMQException e) - { - //TODO: handle exceptions properly... - e.printStackTrace(); - } - } - - public synchronized void setSuspended(boolean suspended) - { - _suspended = suspended; - } - - public synchronized boolean isSuspended() - { - return _suspended; - } - - public synchronized int getWeight() - { - return _count; - } - - public List<Subscription> getSubscriptions() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean hasActiveSubscribers() - { - return getWeight() == 0; - } - - public Subscription nextSubscriber(AMQMessage msg) - { - return this; - } - - public void queueDeleted(AMQQueue queue) - { - if (queue instanceof ClusteredQueue) - { - ((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer); - } - } - - public boolean filtersMessages() - { - return false; - } - - public boolean hasInterest(AMQMessage msg) - { - return true; - } - - public Queue<AMQMessage> getPreDeliveryQueue() - { - return null; - } - - public Queue<AMQMessage> getResendQueue() - { - return null; - } - - public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages) - { - return messages; - } - - public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst) - { - //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl - } - - public boolean isAutoClose() - { - return false; - } - - public void close() - { - //no-op - } - - public boolean isClosed() - { - return false; - } - - public boolean isBrowser() - { - return false; - } - - public boolean wouldSuspend(AMQMessage msg) - { - return _suspended; - } - - public void addToResendQueue(AMQMessage msg) - { - //no-op - } - - public Object getSendLock() - { - return new Object(); - } - - public AMQChannel getChannel() - { - return null; - } - -} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/SubscriberCleanup.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/SubscriberCleanup.java deleted file mode 100644 index cc951a4709..0000000000 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/SubscriberCleanup.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.cluster.MembershipChangeListener; -import org.apache.qpid.server.cluster.MemberHandle; -import org.apache.qpid.server.cluster.GroupManager; -import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.log4j.Logger; - -import java.util.List; - -class SubscriberCleanup implements MembershipChangeListener -{ - private static final Logger _logger = Logger.getLogger(SubscriberCleanup.class); - - private final MemberHandle _subject; - private final ClusteredQueue _queue; - private final GroupManager _manager; - - SubscriberCleanup(MemberHandle subject, ClusteredQueue queue, GroupManager manager) - { - _subject = subject; - _queue = queue; - _manager = manager; - _manager.addMemberhipChangeListener(this); - } - - public void changed(List<MemberHandle> members) - { - if(!members.contains(_subject)) - { - _queue.removeAllRemoteSubscriber(_subject); - _manager.removeMemberhipChangeListener(this); - _logger.info(new LogMessage("Removed {0} from {1}", _subject, _queue)); - } - } -} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java deleted file mode 100644 index b91d7140e0..0000000000 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import junit.framework.TestCase; - -import java.io.IOException; -import java.util.Arrays; - -public class BrokerGroupTest extends TestCase -{ - private final MemberHandle a = new SimpleMemberHandle("A", 1); - private final MemberHandle b = new SimpleMemberHandle("B", 1); - private final MemberHandle c = new SimpleMemberHandle("C", 1); - private final MemberHandle d = new SimpleMemberHandle("D", 1); - - //join (new members perspective) - // (i) connectToLeader() - // ==> check state - // (ii) setMembers() - // ==> check state - // ==> check members - // (iii) synched(leader) - // ==> check state - // ==> check peers - // (iv) synched(other) - // ==> check state - // ==> check peers - // repeat for all others - public void testJoin_newMember() throws Exception - { - MemberHandle[] pre = new MemberHandle[]{a, b, c}; - MemberHandle[] post = new MemberHandle[]{a, b, c}; - - BrokerGroup group = new BrokerGroup(d, new TestReplayManager(), new TestBrokerFactory()); - assertEquals(JoinState.UNINITIALISED, group.getState()); - //(i) - group.connectToLeader(a); - assertEquals(JoinState.JOINING, group.getState()); - assertEquals("Wrong number of peers", 1, group.getPeers().size()); - //(ii) - group.setMembers(Arrays.asList(post)); - assertEquals(JoinState.INITIATION, group.getState()); - assertEquals(Arrays.asList(post), group.getMembers()); - //(iii) & (iv) - for (MemberHandle member : pre) - { - group.synched(member); - if (member == c) - { - assertEquals(JoinState.JOINED, group.getState()); - assertEquals("Wrong number of peers", pre.length, group.getPeers().size()); - } - else - { - assertEquals(JoinState.INDUCTION, group.getState()); - assertEquals("Wrong number of peers", 1, group.getPeers().size()); - } - } - } - - //join (leaders perspective) - // (i) extablish() - // ==> check state - // ==> check members - // ==> check peers - // (ii) connectToProspect() - // ==> check members - // ==> check peers - // repeat (ii) - public void testJoin_Leader() throws IOException, InterruptedException - { - MemberHandle[] prospects = new MemberHandle[]{b, c, d}; - - BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory()); - assertEquals(JoinState.UNINITIALISED, group.getState()); - //(i) - group.establish(); - assertEquals(JoinState.JOINED, group.getState()); - assertEquals("Wrong number of peers", 0, group.getPeers().size()); - assertEquals("Wrong number of members", 1, group.getMembers().size()); - assertEquals(a, group.getMembers().get(0)); - //(ii) - for (int i = 0; i < prospects.length; i++) - { - group.connectToProspect(prospects[i]); - assertEquals("Wrong number of peers", i + 1, group.getPeers().size()); - for (int j = 0; j <= i; j++) - { - assertTrue(prospects[i].matches(group.getPeers().get(i))); - } - assertEquals("Wrong number of members", i + 2, group.getMembers().size()); - assertEquals(a, group.getMembers().get(0)); - for (int j = 0; j <= i; j++) - { - assertEquals(prospects[i], group.getMembers().get(i + 1)); - } - } - } - - //join (general perspective) - // (i) set up group - // (ii) setMembers() - // ==> check members - // ==> check peers - public void testJoin_general() throws Exception - { - MemberHandle[] view1 = new MemberHandle[]{a, b, c}; - MemberHandle[] view2 = new MemberHandle[]{a, b, c, d}; - MemberHandle[] peers = new MemberHandle[]{a, b, d}; - - BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory()); - //(i) - group.connectToLeader(a); - group.setMembers(Arrays.asList(view1)); - for (MemberHandle h : view1) - { - group.synched(h); - } - //(ii) - group.setMembers(Arrays.asList(view2)); - assertEquals(Arrays.asList(view2), group.getMembers()); - assertEquals(peers.length, group.getPeers().size()); - for (int i = 0; i < peers.length; i++) - { - assertTrue(peers[i].matches(group.getPeers().get(i))); - } - } - - //leadership transfer (valid) - // (i) set up group - // (ii) assumeLeadership() - // ==> check return value - // ==> check members - // ==> check peers - // ==> check isLeader() - // ==> check isLeader(old_leader) - // ==> check isMember(old_leader) - public void testTransferLeadership_valid() throws Exception - { - MemberHandle[] view1 = new MemberHandle[]{a, b}; - MemberHandle[] view2 = new MemberHandle[]{a, b, c, d}; - MemberHandle[] view3 = new MemberHandle[]{b, c, d}; - - BrokerGroup group = new BrokerGroup(b, new TestReplayManager(), new TestBrokerFactory()); - //(i) - group.connectToLeader(a); - group.setMembers(Arrays.asList(view1)); - for (MemberHandle h : view1) - { - group.synched(h); - } - group.setMembers(Arrays.asList(view2)); - //(ii) - boolean result = group.assumeLeadership(); - assertTrue(result); - assertTrue(group.isLeader()); - assertFalse(group.isLeader(a)); - assertEquals(Arrays.asList(view3), group.getMembers()); - assertEquals(2, group.getPeers().size()); - assertTrue(c.matches(group.getPeers().get(0))); - assertTrue(d.matches(group.getPeers().get(1))); - } - - //leadership transfer (invalid) - // (i) set up group - // (ii) assumeLeadership() - // ==> check return value - // ==> check members - // ==> check peers - // ==> check isLeader() - // ==> check isLeader(old_leader) - // ==> check isMember(old_leader) - public void testTransferLeadership_invalid() throws Exception - { - MemberHandle[] view1 = new MemberHandle[]{a, b, c}; - MemberHandle[] view2 = new MemberHandle[]{a, b, c, d}; - - BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory()); - //(i) - group.connectToLeader(a); - group.setMembers(Arrays.asList(view1)); - for (MemberHandle h : view1) - { - group.synched(h); - } - group.setMembers(Arrays.asList(view2)); - //(ii) - boolean result = group.assumeLeadership(); - assertFalse(result); - assertFalse(group.isLeader()); - assertTrue(group.isLeader(a)); - assertEquals(Arrays.asList(view2), group.getMembers()); - assertEquals(3, group.getPeers().size()); - assertTrue(a.matches(group.getPeers().get(0))); - assertTrue(b.matches(group.getPeers().get(1))); - assertTrue(d.matches(group.getPeers().get(2))); - - } - - //leave (leaders perspective) - // (i) set up group - // (ii) remove a member - // ==> check members - // ==> check peers - // ==> check isMember(removed_member) - // repeat (ii) - public void testLeave_leader() - { - MemberHandle[] view1 = new MemberHandle[]{a, b, c, d}; - MemberHandle[] view2 = new MemberHandle[]{a, b, d}; - MemberHandle[] view3 = new MemberHandle[]{a, d}; - MemberHandle[] view4 = new MemberHandle[]{a}; - //(i) - BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory()); - group.establish(); - group.setMembers(Arrays.asList(view1)); - //(ii) - group.remove(group.findBroker(c, false)); - assertEquals(Arrays.asList(view2), group.getMembers()); - - group.remove(group.findBroker(b, false)); - assertEquals(Arrays.asList(view3), group.getMembers()); - - group.remove(group.findBroker(d, false)); - assertEquals(Arrays.asList(view4), group.getMembers()); - } - - - //leave (general perspective) - // (i) set up group - // (ii) setMember - // ==> check members - // ==> check peers - // ==> check isMember(removed_member) - // repeat (ii) - public void testLeave_general() - { - MemberHandle[] view1 = new MemberHandle[]{a, b, c, d}; - MemberHandle[] view2 = new MemberHandle[]{a, c, d}; - //(i) - BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory()); - group.establish(); //not strictly the correct way to build up the group, but ok for here - group.setMembers(Arrays.asList(view1)); - //(ii) - group.setMembers(Arrays.asList(view2)); - assertEquals(Arrays.asList(view2), group.getMembers()); - assertEquals(2, group.getPeers().size()); - assertTrue(a.matches(group.getPeers().get(0))); - assertTrue(d.matches(group.getPeers().get(1))); - } -} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java deleted file mode 100644 index f1da312eea..0000000000 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import junit.framework.TestCase; -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQFrameDecodingException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.server.cluster.policy.StandardPolicies; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -public class BrokerTest extends TestCase -{ - //group request (no failure) - public void testGroupRequest_noFailure() throws AMQException - { - RecordingBroker[] brokers = new RecordingBroker[]{ - new RecordingBroker("A", 1), - new RecordingBroker("B", 2), - new RecordingBroker("C", 3) - }; - GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(brokers))); - GroupRequest grpRequest = new GroupRequest(new SimpleBodySendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler); - for (Broker b : brokers) - { - b.invoke(grpRequest); - } - grpRequest.finishedSend(); - - for (RecordingBroker b : brokers) - { - b.handleResponse(((AMQFrame) b.getMessages().get(0)).getChannel(), new TestMethod("response")); - } - - assertTrue("Handler did not receive response", handler.isCompleted()); - } - - //group request (failure) - public void testGroupRequest_failure() throws AMQException - { - RecordingBroker a = new RecordingBroker("A", 1); - RecordingBroker b = new RecordingBroker("B", 2); - RecordingBroker c = new RecordingBroker("C", 3); - RecordingBroker[] all = new RecordingBroker[]{a, b, c}; - RecordingBroker[] succeeded = new RecordingBroker[]{a, c}; - - GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(succeeded))); - GroupRequest grpRequest = new GroupRequest(new SimpleBodySendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler); - - for (Broker broker : all) - { - broker.invoke(grpRequest); - } - grpRequest.finishedSend(); - - for (RecordingBroker broker : succeeded) - { - broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).getChannel(), new TestMethod("response")); - } - b.remove(); - - assertTrue("Handler did not receive response", handler.isCompleted()); - } - - - //simple send (no response) - public void testSend_noResponse() throws AMQException - { - AMQBody[] msgs = new AMQBody[]{ - new TestMethod("A"), - new TestMethod("B"), - new TestMethod("C") - }; - RecordingBroker broker = new RecordingBroker("myhost", 1); - for (AMQBody msg : msgs) - { - broker.send(new SimpleBodySendable(msg), null); - } - List<AMQDataBlock> sent = broker.getMessages(); - assertEquals(msgs.length, sent.size()); - for (int i = 0; i < msgs.length; i++) - { - assertTrue(sent.get(i) instanceof AMQFrame); - assertEquals(msgs[i], ((AMQFrame) sent.get(i)).getBodyFrame()); - } - } - - //simple send (no failure) - public void testSend_noFailure() throws AMQException - { - RecordingBroker broker = new RecordingBroker("myhost", 1); - BlockingHandler handler = new BlockingHandler(); - broker.send(new SimpleBodySendable(new TestMethod("A")), handler); - List<AMQDataBlock> sent = broker.getMessages(); - assertEquals(1, sent.size()); - assertTrue(sent.get(0) instanceof AMQFrame); - assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).getBodyFrame()); - - broker.handleResponse(((AMQFrame) sent.get(0)).getChannel(), new TestMethod("B")); - - assertEquals(new TestMethod("B"), handler.getResponse()); - } - - //simple send (failure) - public void testSend_failure() throws AMQException - { - RecordingBroker broker = new RecordingBroker("myhost", 1); - BlockingHandler handler = new BlockingHandler(); - broker.send(new SimpleBodySendable(new TestMethod("A")), handler); - List<AMQDataBlock> sent = broker.getMessages(); - assertEquals(1, sent.size()); - assertTrue(sent.get(0) instanceof AMQFrame); - assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).getBodyFrame()); - broker.remove(); - assertEquals(null, handler.getResponse()); - assertTrue(handler.isCompleted()); - assertTrue(handler.failed()); - } - - private static class TestMethod extends AMQMethodBody - { - private final Object id; - - TestMethod(Object id) - { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - super((byte)8, (byte)0); - this.id = id; - } - - protected int getBodySize() - { - return 0; - } - - protected int getClazz() - { - return 1002; - } - - protected int getMethod() - { - return 1003; - } - - protected void writeMethodPayload(ByteBuffer buffer) - { - } - - protected byte getType() - { - return 0; - } - - protected int getSize() - { - return 0; - } - - protected void writePayload(ByteBuffer buffer) - { - } - - protected void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException - { - } - - protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException - { - } - - public boolean equals(Object o) - { - return o instanceof TestMethod && id.equals(((TestMethod) o).id); - } - - public int hashCode() - { - return id.hashCode(); - } - - } - - private static class GroupResponseValidator implements GroupResponseHandler - { - private final AMQMethodBody _response; - private final List<Member> _members; - private boolean _completed = false; - - GroupResponseValidator(AMQMethodBody response, List<Member> members) - { - _response = response; - _members = members; - } - - public void response(List<AMQMethodBody> responses, List<Member> members) - { - for (AMQMethodBody r : responses) - { - assertEquals(_response, r); - } - assertEquals(_members, members); - _completed = true; - } - - boolean isCompleted() - { - return _completed; - } - } -} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java deleted file mode 100644 index 830a00f4c2..0000000000 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import junit.framework.TestCase; -import org.apache.qpid.framing.AMQShortString; - -public class ClusterCapabilityTest extends TestCase -{ - public void testStartWithNull() - { - MemberHandle peer = new SimpleMemberHandle("myhost:9999"); - AMQShortString c = ClusterCapability.add(null, peer); - assertTrue(ClusterCapability.contains(c)); - assertTrue(peer.matches(ClusterCapability.getPeer(c))); - } - - public void testStartWithText() - { - MemberHandle peer = new SimpleMemberHandle("myhost:9999"); - AMQShortString c = ClusterCapability.add(new AMQShortString("existing text"), peer); - assertTrue(ClusterCapability.contains(c)); - assertTrue(peer.matches(ClusterCapability.getPeer(c))); - } -} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java deleted file mode 100644 index 7e58add91e..0000000000 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.mina.common.IoSession; - -import java.util.List; -import java.util.ArrayList; - -import junit.framework.TestCase; - -public class InductionBufferTest extends TestCase -{ - public void test() throws Exception - { - IoSession session1 = new TestSession(); - IoSession session2 = new TestSession(); - IoSession session3 = new TestSession(); - - TestMessageHandler handler = new TestMessageHandler(); - InductionBuffer buffer = new InductionBuffer(handler); - - buffer.receive(session1, "one"); - buffer.receive(session2, "two"); - buffer.receive(session3, "three"); - - buffer.receive(session1, "four"); - buffer.receive(session1, "five"); - buffer.receive(session1, "six"); - - buffer.receive(session3, "seven"); - buffer.receive(session3, "eight"); - - handler.checkEmpty(); - buffer.deliver(); - - handler.check(session1, "one"); - handler.check(session2, "two"); - handler.check(session3, "three"); - - handler.check(session1, "four"); - handler.check(session1, "five"); - handler.check(session1, "six"); - - handler.check(session3, "seven"); - handler.check(session3, "eight"); - handler.checkEmpty(); - - buffer.receive(session1, "nine"); - buffer.receive(session2, "ten"); - buffer.receive(session3, "eleven"); - - handler.check(session1, "nine"); - handler.check(session2, "ten"); - handler.check(session3, "eleven"); - - handler.checkEmpty(); - } - - private static class TestMessageHandler implements InductionBuffer.MessageHandler - { - private final List<IoSession> _sessions = new ArrayList<IoSession>(); - private final List<Object> _msgs = new ArrayList<Object>(); - - public synchronized void deliver(IoSession session, Object msg) throws Exception - { - _sessions.add(session); - _msgs.add(msg); - } - - void check(IoSession actualSession, Object actualMsg) - { - assertFalse(_sessions.isEmpty()); - assertFalse(_msgs.isEmpty()); - IoSession expectedSession = _sessions.remove(0); - Object expectedMsg = _msgs.remove(0); - assertEquals(expectedSession, actualSession); - assertEquals(expectedMsg, actualMsg); - } - - void checkEmpty() - { - assertTrue(_sessions.isEmpty()); - assertTrue(_msgs.isEmpty()); - } - } -} - diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java deleted file mode 100644 index 1ec5154a98..0000000000 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQDataBlock; - -import java.util.ArrayList; -import java.util.List; - -class RecordingBroker extends TestBroker -{ - private final List<AMQDataBlock> _messages = new ArrayList<AMQDataBlock>(); - - RecordingBroker(String host, int port) - { - super(host, port); - } - - public void send(AMQDataBlock data) throws AMQException - { - _messages.add(data); - } - - List<AMQDataBlock> getMessages() - { - return _messages; - } - - void clear() - { - _messages.clear(); - } - -} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java deleted file mode 100644 index d3e972e273..0000000000 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -class RecordingBrokerFactory implements BrokerFactory -{ - public Broker create(MemberHandle handle) - { - return new RecordingBroker(handle.getHost(), handle.getPort()); - } -} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java deleted file mode 100644 index 86cde3cee7..0000000000 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; - -import javax.jms.JMSException; - -import junit.framework.TestCase; - -public class SimpleClusterTest extends TestCase -{ - public void testDeclareExchange() throws AMQException, JMSException, URLSyntaxException - { - AMQConnection con = new AMQConnection("localhost:9000", "guest", "guest", "test", "/test"); - AMQSession session = (AMQSession) con.createSession(false, AMQSession.NO_ACKNOWLEDGE); - System.out.println("Session created"); - session.declareExchange(new AMQShortString("my_exchange"), new AMQShortString("direct"), true); - System.out.println("Exchange declared"); - con.close(); - System.out.println("Connection closed"); - } -} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java deleted file mode 100644 index 8ff8357377..0000000000 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import junit.framework.TestCase; - -public class SimpleMemberHandleTest extends TestCase -{ - public void testMatches() - { - assertMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost", 8888)); - assertNoMatch(new SimpleMemberHandle("localhost", 8889), new SimpleMemberHandle("localhost", 8888)); - assertNoMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost2", 8888)); - } - - public void testResolve() - { - assertEquivalent(new SimpleMemberHandle("WGLAIBD8XGR0J:9000"), new SimpleMemberHandle("localhost:9000")); - } - - private void assertEquivalent(MemberHandle a, MemberHandle b) - { - String msg = a + " is not equivalent to " + b; - a = SimpleMemberHandle.resolve(a); - b = SimpleMemberHandle.resolve(b); - msg += "(" + a + " does not match " + b + ")"; - assertTrue(msg, a.matches(b)); - } - - private void assertMatch(MemberHandle a, MemberHandle b) - { - assertTrue(a + " does not match " + b, a.matches(b)); - } - - private void assertNoMatch(MemberHandle a, MemberHandle b) - { - assertFalse(a + " matches " + b, a.matches(b)); - } -} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java deleted file mode 100644 index d3ccbf0ac6..0000000000 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; - -import java.io.IOException; - -class TestBroker extends Broker -{ - TestBroker(String host, int port) - { - super(host, port); - } - - boolean connect() throws IOException, InterruptedException - { - return true; - } - - void connectAsynch(Iterable<AMQMethodBody> msgs) - { - replay(msgs); - } - - void replay(Iterable<AMQMethodBody> msgs) - { - try - { - for (AMQMethodBody b : msgs) - { - send(new AMQFrame(0, b)); - } - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - - Broker connectToCluster() throws IOException, InterruptedException - { - return this; - } - - public void send(AMQDataBlock data) throws AMQException - { - } -} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java deleted file mode 100644 index 92eaec876a..0000000000 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -class TestBrokerFactory implements BrokerFactory -{ - public Broker create(MemberHandle handle) - { - return new TestBroker(handle.getHost(), handle.getPort()); - } -} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java deleted file mode 100644 index c529c83cc0..0000000000 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.server.cluster.replay.ReplayManager; - -import java.util.ArrayList; -import java.util.List; - -class TestReplayManager implements ReplayManager -{ - private final List<AMQMethodBody> _msgs; - - TestReplayManager() - { - this(new ArrayList<AMQMethodBody>()); - } - - TestReplayManager(List<AMQMethodBody> msgs) - { - _msgs = msgs; - } - - public List<AMQMethodBody> replay(boolean isLeader) - { - return _msgs; - } -} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java deleted file mode 100644 index 86ec808924..0000000000 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.cluster; - -import org.apache.mina.common.*; - -import java.net.SocketAddress; -import java.util.Set; - -class TestSession implements IoSession -{ - public IoService getService() - { - return null; //TODO - } - - public IoServiceConfig getServiceConfig() - { - return null; //TODO - } - - public IoHandler getHandler() - { - return null; //TODO - } - - public IoSessionConfig getConfig() - { - return null; //TODO - } - - public IoFilterChain getFilterChain() - { - return null; //TODO - } - - public WriteFuture write(Object message) - { - return null; //TODO - } - - public CloseFuture close() - { - return null; //TODO - } - - public Object getAttachment() - { - return null; //TODO - } - - public Object setAttachment(Object attachment) - { - return null; //TODO - } - - public Object getAttribute(String key) - { - return null; //TODO - } - - public Object setAttribute(String key, Object value) - { - return null; //TODO - } - - public Object setAttribute(String key) - { - return null; //TODO - } - - public Object removeAttribute(String key) - { - return null; //TODO - } - - public boolean containsAttribute(String key) - { - return false; //TODO - } - - public Set getAttributeKeys() - { - return null; //TODO - } - - public TransportType getTransportType() - { - return null; //TODO - } - - public boolean isConnected() - { - return false; //TODO - } - - public boolean isClosing() - { - return false; //TODO - } - - public CloseFuture getCloseFuture() - { - return null; //TODO - } - - public SocketAddress getRemoteAddress() - { - return null; //TODO - } - - public SocketAddress getLocalAddress() - { - return null; //TODO - } - - public SocketAddress getServiceAddress() - { - return null; //TODO - } - - public int getIdleTime(IdleStatus status) - { - return 0; //TODO - } - - public long getIdleTimeInMillis(IdleStatus status) - { - return 0; //TODO - } - - public void setIdleTime(IdleStatus status, int idleTime) - { - //TODO - } - - public int getWriteTimeout() - { - return 0; //TODO - } - - public long getWriteTimeoutInMillis() - { - return 0; //TODO - } - - public void setWriteTimeout(int writeTimeout) - { - //TODO - } - - public TrafficMask getTrafficMask() - { - return null; //TODO - } - - public void setTrafficMask(TrafficMask trafficMask) - { - //TODO - } - - public void suspendRead() - { - //TODO - } - - public void suspendWrite() - { - //TODO - } - - public void resumeRead() - { - //TODO - } - - public void resumeWrite() - { - //TODO - } - - public long getReadBytes() - { - return 0; //TODO - } - - public long getWrittenBytes() - { - return 0; //TODO - } - - public long getReadMessages() - { - return 0; - } - - public long getWrittenMessages() - { - return 0; - } - - public long getWrittenWriteRequests() - { - return 0; //TODO - } - - public int getScheduledWriteRequests() - { - return 0; //TODO - } - - public int getScheduledWriteBytes() - { - return 0; //TODO - } - - public long getCreationTime() - { - return 0; //TODO - } - - public long getLastIoTime() - { - return 0; //TODO - } - - public long getLastReadTime() - { - return 0; //TODO - } - - public long getLastWriteTime() - { - return 0; //TODO - } - - public boolean isIdle(IdleStatus status) - { - return false; //TODO - } - - public int getIdleCount(IdleStatus status) - { - return 0; //TODO - } - - public long getLastIdleTime(IdleStatus status) - { - return 0; //TODO - } -} |