summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java199
1 files changed, 183 insertions, 16 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
index f330e2f708..a8f75d2b9b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.federation;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -29,16 +30,19 @@ import org.apache.qpid.server.configuration.LinkConfig;
import org.apache.qpid.server.configuration.LinkConfigType;
import org.apache.qpid.server.transport.ServerSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.Binary;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionException;
-import org.apache.qpid.transport.ConnectionListener;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.TransportException;
-
-import java.util.Map;
-import java.util.UUID;
+import org.apache.qpid.transport.*;
+import org.apache.qpid.util.Strings;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -55,6 +59,14 @@ public class BrokerLink implements LinkConfig, ConnectionListener
private static final ScheduledThreadPoolExecutor _threadPool =
new ScheduledThreadPoolExecutor(CORE_POOL_SIZE);
+ private static final String TRANSPORT = "transport";
+ private static final String HOST = "host";
+ private static final String PORT = "port";
+ private static final String REMOTE_VHOST = "remoteVhost";
+ private static final String DURABLE = "durable";
+ private static final String AUTH_MECHANISM = "authMechanism";
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
private final String _transport;
@@ -68,7 +80,7 @@ public class BrokerLink implements LinkConfig, ConnectionListener
private final VirtualHost _virtualHost;
private UUID _id;
private AtomicBoolean _closing = new AtomicBoolean();
- private final long _createTime = System.currentTimeMillis();
+ private final long _createTime;
private Connection _qpidConnection;
private AtomicReference<Thread> _executor = new AtomicReference<Thread>();
private AtomicInteger _bridgeId = new AtomicInteger();
@@ -88,8 +100,10 @@ public class BrokerLink implements LinkConfig, ConnectionListener
{
doMakeConnection();
}
- };;
- ;
+ };
+
+
+
public static enum State
{
@@ -205,6 +219,44 @@ public class BrokerLink implements LinkConfig, ConnectionListener
}
};
+ public BrokerLink(final VirtualHost virtualHost, UUID id, long createTime, Map<String, String> arguments)
+ {
+ _virtualHost = virtualHost;
+ _id = id;
+ virtualHost.getConfigStore().persistentIdInUse(id);
+ _createTime = createTime;
+ _transport = arguments.get(TRANSPORT);
+
+ _host = arguments.get(HOST);
+ _port = Integer.parseInt(arguments.get(PORT));
+ _remoteVhost = arguments.get(REMOTE_VHOST);
+ _durable = Boolean.parseBoolean(arguments.get(DURABLE));
+ _authMechanism = arguments.get("authMechanism");
+ _username = arguments.get("username");
+ _password = arguments.get("password");
+
+ if(_durable)
+ {
+ try
+ {
+ _virtualHost.getDurableConfigurationStore().createBrokerLink(this);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ _qpidConnection = new Connection();
+ _connectionConfig = new ConnectionConfigAdapter();
+ _qpidConnection.addConnectionListener(this);
+
+
+ makeConnection();
+
+ }
+
public BrokerLink(final VirtualHost virtualHost,
final String transport,
@@ -212,10 +264,13 @@ public class BrokerLink implements LinkConfig, ConnectionListener
final int port,
final String remoteVhost,
final boolean durable,
- final String authMechanism, final String username, final String password)
+ final String authMechanism,
+ final String username,
+ final String password)
{
_virtualHost = virtualHost;
_transport = transport;
+ _createTime = System.currentTimeMillis();
_host = host;
_port = port;
_remoteVhost = remoteVhost;
@@ -223,15 +278,42 @@ public class BrokerLink implements LinkConfig, ConnectionListener
_authMechanism = authMechanism;
_username = username;
_password = password;
- _id = virtualHost.getConfigStore().createId();
+ _id = durable ? virtualHost.getConfigStore().createPersistentId() : virtualHost.getConfigStore().createId();
+
+ if(durable)
+ {
+ try
+ {
+ _virtualHost.getDurableConfigurationStore().createBrokerLink(this);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
_qpidConnection = new Connection();
_connectionConfig = new ConnectionConfigAdapter();
_qpidConnection.addConnectionListener(this);
-
makeConnection();
}
+ public Map<String,String> getArguments()
+ {
+ Map<String,String> arguments = new HashMap<String, String>();
+
+ arguments.put(TRANSPORT, _transport);
+ arguments.put(HOST, _host);
+ arguments.put(PORT, String.valueOf(_port));
+ arguments.put(REMOTE_VHOST, _remoteVhost);
+ arguments.put(DURABLE, String.valueOf(_durable));
+ arguments.put(AUTH_MECHANISM, _authMechanism);
+ arguments.put(USERNAME, _username);
+ arguments.put(PASSWORD, _password);
+
+ return Collections.unmodifiableMap(arguments);
+ }
+
private final boolean updateState(State expected, State newState)
{
return _stateUpdater.compareAndSet(this,expected,newState);
@@ -250,9 +332,50 @@ public class BrokerLink implements LinkConfig, ConnectionListener
{
try
{
+ _qpidConnection.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())
+ {
+ protected SaslClient createSaslClient(List<Object> brokerMechs) throws ConnectionException,
+ SaslException
+ {
+ Map<String,Object> saslProps = new HashMap<String,Object>();
+
+
+ CallbackHandler cbh = new CallbackHandler()
+ {
+ public void handle(final Callback[] callbacks)
+ throws IOException, UnsupportedCallbackException
+ {
+ for (int i = 0; i < callbacks.length; i++)
+ {
+ Callback cb = callbacks[i];
+ if (cb instanceof NameCallback)
+ {
+ ((NameCallback)cb).setName(_username);
+ }
+ else if (cb instanceof PasswordCallback)
+ {
+ ((PasswordCallback)cb).setPassword(_password.toCharArray());
+ }
+ else
+ {
+ throw new UnsupportedCallbackException(cb);
+ }
+ }
+
+ }
+ };
+ final SaslClient sc = Sasl.createSaslClient(new String[] {"PLAIN"}, null,
+ _conSettings.getSaslProtocol(),
+ _conSettings.getSaslServerName(),
+ saslProps, cbh);
+
+ return sc;
+ }});
+
_qpidConnection.connect(_host, _port, _remoteVhost, _username, _password, "ssl".equals(_transport), _authMechanism);
final Map<String,Object> serverProps = _qpidConnection.getServerProperties();
+
_remoteFederationTag = (String) serverProps.get(ServerPropertyNames.FEDERATION_TAG);
if(_remoteFederationTag == null)
{
@@ -445,6 +568,20 @@ public class BrokerLink implements LinkConfig, ConnectionListener
}
+ public void createBridge(final UUID id, final long createTime, final Map<String, String> arguments)
+ {
+ if(!_closing.get())
+ {
+ Bridge bridge = new Bridge(this, _bridgeId.incrementAndGet(), id, createTime, arguments);
+ if(_bridges.putIfAbsent(bridge, bridge) == null)
+ {
+
+ addBridge(bridge);
+ }
+ }
+ }
+
+
private void addBridge(final Bridge bridge)
{
getConfigStore().addConfiguredObject(bridge);
@@ -509,4 +646,34 @@ public class BrokerLink implements LinkConfig, ConnectionListener
{
return _remoteFederationTag;
}
+
+ public String getState()
+ {
+ return _state.name();
+ }
+
+ public String getLastError()
+ {
+ return _lastErrorMessage;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "BrokerLink{" +
+ " _id=" + _id +
+ ", _transport='" + _transport + '\'' +
+ ", _host='" + _host + '\'' +
+ ", _port=" + _port +
+ ", _remoteVhost='" + _remoteVhost + '\'' +
+ ", _durable=" + _durable +
+ ", _authMechanism='" + _authMechanism + '\'' +
+ ", _username='" + _username + '\'' +
+ ", _password='" + _password + '\'' +
+ ", _virtualHost=" + _virtualHost +
+ ", _createTime=" + _createTime +
+ ", _remoteFederationTag='" + _remoteFederationTag + '\'' +
+ ", _state=" + _state +
+ '}';
+ }
}