diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java | 199 |
1 files changed, 183 insertions, 16 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java index f330e2f708..a8f75d2b9b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.federation; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; @@ -29,16 +30,19 @@ import org.apache.qpid.server.configuration.LinkConfig; import org.apache.qpid.server.configuration.LinkConfigType; import org.apache.qpid.server.transport.ServerSession; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.Binary; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionException; -import org.apache.qpid.transport.ConnectionListener; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionDelegate; -import org.apache.qpid.transport.TransportException; - -import java.util.Map; -import java.util.UUID; +import org.apache.qpid.transport.*; +import org.apache.qpid.util.Strings; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import java.io.IOException; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -55,6 +59,14 @@ public class BrokerLink implements LinkConfig, ConnectionListener private static final ScheduledThreadPoolExecutor _threadPool = new ScheduledThreadPoolExecutor(CORE_POOL_SIZE); + private static final String TRANSPORT = "transport"; + private static final String HOST = "host"; + private static final String PORT = "port"; + private static final String REMOTE_VHOST = "remoteVhost"; + private static final String DURABLE = "durable"; + private static final String AUTH_MECHANISM = "authMechanism"; + private static final String USERNAME = "username"; + private static final String PASSWORD = "password"; private final String _transport; @@ -68,7 +80,7 @@ public class BrokerLink implements LinkConfig, ConnectionListener private final VirtualHost _virtualHost; private UUID _id; private AtomicBoolean _closing = new AtomicBoolean(); - private final long _createTime = System.currentTimeMillis(); + private final long _createTime; private Connection _qpidConnection; private AtomicReference<Thread> _executor = new AtomicReference<Thread>(); private AtomicInteger _bridgeId = new AtomicInteger(); @@ -88,8 +100,10 @@ public class BrokerLink implements LinkConfig, ConnectionListener { doMakeConnection(); } - };; - ; + }; + + + public static enum State { @@ -205,6 +219,44 @@ public class BrokerLink implements LinkConfig, ConnectionListener } }; + public BrokerLink(final VirtualHost virtualHost, UUID id, long createTime, Map<String, String> arguments) + { + _virtualHost = virtualHost; + _id = id; + virtualHost.getConfigStore().persistentIdInUse(id); + _createTime = createTime; + _transport = arguments.get(TRANSPORT); + + _host = arguments.get(HOST); + _port = Integer.parseInt(arguments.get(PORT)); + _remoteVhost = arguments.get(REMOTE_VHOST); + _durable = Boolean.parseBoolean(arguments.get(DURABLE)); + _authMechanism = arguments.get("authMechanism"); + _username = arguments.get("username"); + _password = arguments.get("password"); + + if(_durable) + { + try + { + _virtualHost.getDurableConfigurationStore().createBrokerLink(this); + } + catch (AMQStoreException e) + { + throw new RuntimeException(e); + } + } + + + _qpidConnection = new Connection(); + _connectionConfig = new ConnectionConfigAdapter(); + _qpidConnection.addConnectionListener(this); + + + makeConnection(); + + } + public BrokerLink(final VirtualHost virtualHost, final String transport, @@ -212,10 +264,13 @@ public class BrokerLink implements LinkConfig, ConnectionListener final int port, final String remoteVhost, final boolean durable, - final String authMechanism, final String username, final String password) + final String authMechanism, + final String username, + final String password) { _virtualHost = virtualHost; _transport = transport; + _createTime = System.currentTimeMillis(); _host = host; _port = port; _remoteVhost = remoteVhost; @@ -223,15 +278,42 @@ public class BrokerLink implements LinkConfig, ConnectionListener _authMechanism = authMechanism; _username = username; _password = password; - _id = virtualHost.getConfigStore().createId(); + _id = durable ? virtualHost.getConfigStore().createPersistentId() : virtualHost.getConfigStore().createId(); + + if(durable) + { + try + { + _virtualHost.getDurableConfigurationStore().createBrokerLink(this); + } + catch (AMQStoreException e) + { + throw new RuntimeException(e); + } + } _qpidConnection = new Connection(); _connectionConfig = new ConnectionConfigAdapter(); _qpidConnection.addConnectionListener(this); - makeConnection(); } + public Map<String,String> getArguments() + { + Map<String,String> arguments = new HashMap<String, String>(); + + arguments.put(TRANSPORT, _transport); + arguments.put(HOST, _host); + arguments.put(PORT, String.valueOf(_port)); + arguments.put(REMOTE_VHOST, _remoteVhost); + arguments.put(DURABLE, String.valueOf(_durable)); + arguments.put(AUTH_MECHANISM, _authMechanism); + arguments.put(USERNAME, _username); + arguments.put(PASSWORD, _password); + + return Collections.unmodifiableMap(arguments); + } + private final boolean updateState(State expected, State newState) { return _stateUpdater.compareAndSet(this,expected,newState); @@ -250,9 +332,50 @@ public class BrokerLink implements LinkConfig, ConnectionListener { try { + _qpidConnection.setConnectionDelegate(new ClientDelegate(new ConnectionSettings()) + { + protected SaslClient createSaslClient(List<Object> brokerMechs) throws ConnectionException, + SaslException + { + Map<String,Object> saslProps = new HashMap<String,Object>(); + + + CallbackHandler cbh = new CallbackHandler() + { + public void handle(final Callback[] callbacks) + throws IOException, UnsupportedCallbackException + { + for (int i = 0; i < callbacks.length; i++) + { + Callback cb = callbacks[i]; + if (cb instanceof NameCallback) + { + ((NameCallback)cb).setName(_username); + } + else if (cb instanceof PasswordCallback) + { + ((PasswordCallback)cb).setPassword(_password.toCharArray()); + } + else + { + throw new UnsupportedCallbackException(cb); + } + } + + } + }; + final SaslClient sc = Sasl.createSaslClient(new String[] {"PLAIN"}, null, + _conSettings.getSaslProtocol(), + _conSettings.getSaslServerName(), + saslProps, cbh); + + return sc; + }}); + _qpidConnection.connect(_host, _port, _remoteVhost, _username, _password, "ssl".equals(_transport), _authMechanism); final Map<String,Object> serverProps = _qpidConnection.getServerProperties(); + _remoteFederationTag = (String) serverProps.get(ServerPropertyNames.FEDERATION_TAG); if(_remoteFederationTag == null) { @@ -445,6 +568,20 @@ public class BrokerLink implements LinkConfig, ConnectionListener } + public void createBridge(final UUID id, final long createTime, final Map<String, String> arguments) + { + if(!_closing.get()) + { + Bridge bridge = new Bridge(this, _bridgeId.incrementAndGet(), id, createTime, arguments); + if(_bridges.putIfAbsent(bridge, bridge) == null) + { + + addBridge(bridge); + } + } + } + + private void addBridge(final Bridge bridge) { getConfigStore().addConfiguredObject(bridge); @@ -509,4 +646,34 @@ public class BrokerLink implements LinkConfig, ConnectionListener { return _remoteFederationTag; } + + public String getState() + { + return _state.name(); + } + + public String getLastError() + { + return _lastErrorMessage; + } + + @Override + public String toString() + { + return "BrokerLink{" + + " _id=" + _id + + ", _transport='" + _transport + '\'' + + ", _host='" + _host + '\'' + + ", _port=" + _port + + ", _remoteVhost='" + _remoteVhost + '\'' + + ", _durable=" + _durable + + ", _authMechanism='" + _authMechanism + '\'' + + ", _username='" + _username + '\'' + + ", _password='" + _password + '\'' + + ", _virtualHost=" + _virtualHost + + ", _createTime=" + _createTime + + ", _remoteFederationTag='" + _remoteFederationTag + '\'' + + ", _state=" + _state + + '}'; + } } |