summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-26 19:40:24 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-26 19:40:24 +0000
commite7da5d6cbbf402aafc6772043cb6b003958cd4b8 (patch)
treecda63c883592cbdba5bedba5615df97e3fd02b84
parentc3067a75f5a0325cd42a6ade0a273b43ed808427 (diff)
downloadqpid-python-e7da5d6cbbf402aafc6772043cb6b003958cd4b8.tar.gz
QPID-6044 : Merged to 0.30 branch from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.30@1620708 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java100
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java74
3 files changed, 144 insertions, 33 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
index 6937d31b3a..d8b81a25b5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
@@ -65,4 +65,7 @@ public interface Port<X extends Port<X>> extends ConfiguredObject<X>
Collection<Connection> getConnections();
void start();
+
+ boolean isLocalMachine(final String host);
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
index d1abded988..c90215f141 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
@@ -21,6 +21,11 @@
package org.apache.qpid.server.model.port;
+import java.net.InetAddress;
+import java.net.InterfaceAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.UnknownHostException;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Collection;
@@ -29,6 +34,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
@@ -54,6 +63,60 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
{
private static final Logger LOGGER = Logger.getLogger(AbstractPort.class);
+ private static final Set<InetAddress> LOCAL_ADDRESSES = new CopyOnWriteArraySet<>();
+ private static final Set<String> LOCAL_ADDRESS_NAMES = new CopyOnWriteArraySet<>();
+ private static final Lock ADDRESS_LOCK = new ReentrantLock();
+ private static final AtomicBoolean ADDRESSES_COMPUTED = new AtomicBoolean();
+
+ static
+ {
+ Thread thread = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ Lock lock = ADDRESS_LOCK;
+
+ lock.lock();
+ try
+ {
+ for (NetworkInterface networkInterface : Collections.list(NetworkInterface.getNetworkInterfaces()))
+ {
+ for (InterfaceAddress inetAddress : networkInterface.getInterfaceAddresses())
+ {
+ InetAddress address = inetAddress.getAddress();
+ LOCAL_ADDRESSES.add(address);
+ String hostAddress = address.getHostAddress();
+ if (hostAddress != null)
+ {
+ LOCAL_ADDRESS_NAMES.add(hostAddress);
+ }
+ String hostName = address.getHostName();
+ if (hostName != null)
+ {
+ LOCAL_ADDRESS_NAMES.add(hostName);
+ }
+ String canonicalHostName = address.getCanonicalHostName();
+ if (canonicalHostName != null)
+ {
+ LOCAL_ADDRESS_NAMES.add(canonicalHostName);
+ }
+ }
+ }
+ }
+ catch (SocketException e)
+ {
+ // ignore
+ }
+ finally
+ {
+ ADDRESSES_COMPUTED.set(true);
+ lock.unlock();
+ }
+ }
+ }, "Network Address Resolver");
+ thread.start();
+ }
+
private final Broker<?> _broker;
private State _state = State.UNINITIALIZED;
@@ -335,7 +398,7 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
Collection<Protocol> portProtocols = existingPort.getProtocols();
if (portProtocols != null)
{
- final ArrayList<Protocol> intersection = new ArrayList(portProtocols);
+ final ArrayList<Protocol> intersection = new ArrayList<>(portProtocols);
intersection.retainAll(getProtocols());
if(!intersection.isEmpty())
{
@@ -345,4 +408,39 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
}
}
}
+
+ public boolean isLocalMachine(final String host)
+ {
+ while(!ADDRESSES_COMPUTED.get())
+ {
+ Lock lock = ADDRESS_LOCK;
+ lock.lock();
+ lock.unlock();
+ }
+
+ boolean isNetworkAddress = true;
+ if (!LOCAL_ADDRESS_NAMES.contains(host))
+ {
+ try
+ {
+ InetAddress inetAddress = InetAddress.getByName(host);
+ if (!LOCAL_ADDRESSES.contains(inetAddress))
+ {
+ isNetworkAddress = false;
+ }
+ else
+ {
+ LOCAL_ADDRESS_NAMES.add(host);
+ }
+ }
+ catch (UnknownHostException e)
+ {
+ // ignore
+ isNetworkAddress = false;
+ }
+ }
+ return isNetworkAddress;
+
+ }
+
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 2a48ccb2df..f97a223f4d 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -99,7 +99,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
private List<Action<? super Connection_1_0>> _closeTasks =
Collections.synchronizedList(new ArrayList<Action<? super Connection_1_0>>());
-
+ private boolean _closedOnOpen;
public Connection_1_0(Broker broker,
@@ -136,13 +136,17 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
}
_vhost = ((AmqpPort)_port).getVirtualHost(host);
-
+ if(_vhost == null && _port.isLocalMachine(host))
+ {
+ _vhost = ((AmqpPort)_port).getVirtualHost(_broker.getDefaultVirtualHost());
+ }
if(_vhost == null)
{
final Error err = new Error();
err.setCondition(AmqpError.NOT_FOUND);
err.setDescription("Unknown hostname " + _conn.getLocalHostname());
_conn.close(err);
+ _closedOnOpen = true;
}
else
{
@@ -153,48 +157,54 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
_subject.getPrivateCredentials().addAll(authSubject.getPrivateCredentials());
}
}
-
public void remoteSessionCreation(SessionEndpoint endpoint)
{
- final Session_1_0 session = new Session_1_0(this, endpoint);
- _sessions.add(session);
- sessionAdded(session);
- endpoint.setSessionEventListener(new SessionEventListener()
+ if(!_closedOnOpen)
{
- @Override
- public void remoteLinkCreation(final LinkEndpoint endpoint)
+ final Session_1_0 session = new Session_1_0(this, endpoint);
+ _sessions.add(session);
+ sessionAdded(session);
+ endpoint.setSessionEventListener(new SessionEventListener()
{
- Subject.doAs(session.getSubject(),new PrivilegedAction<Object>()
+ @Override
+ public void remoteLinkCreation(final LinkEndpoint endpoint)
{
- @Override
- public Object run()
+ Subject.doAs(session.getSubject(), new PrivilegedAction<Object>()
{
- session.remoteLinkCreation(endpoint);
- return null;
- }
- });
- }
-
- @Override
- public void remoteEnd(final End end)
- {
- Subject.doAs(session.getSubject(),new PrivilegedAction<Object>()
+ @Override
+ public Object run()
+ {
+ session.remoteLinkCreation(endpoint);
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public void remoteEnd(final End end)
{
- @Override
- public Object run()
+ Subject.doAs(session.getSubject(), new PrivilegedAction<Object>()
{
- session.remoteEnd(end);
- return null;
- }
- });
- }
- });
+ @Override
+ public Object run()
+ {
+ session.remoteEnd(end);
+ return null;
+ }
+ });
+ }
+ });
+ }
}
void sessionEnded(Session_1_0 session)
{
- _sessions.remove(session);
- sessionRemoved(session);
+ if(!_closedOnOpen)
+ {
+
+ _sessions.remove(session);
+ sessionRemoved(session);
+ }
}
public void removeDeleteTask(final Action<? super Connection_1_0> task)