summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java512
1 files changed, 512 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
new file mode 100644
index 0000000000..fa2fb9ead1
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
@@ -0,0 +1,512 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.federation;
+
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.ConnectionConfig;
+import org.apache.qpid.server.configuration.ConnectionConfigType;
+import org.apache.qpid.server.configuration.LinkConfig;
+import org.apache.qpid.server.configuration.LinkConfigType;
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionException;
+import org.apache.qpid.transport.ConnectionListener;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionDelegate;
+import org.apache.qpid.transport.TransportException;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+public class BrokerLink implements LinkConfig, ConnectionListener
+{
+
+ private static final int CORE_POOL_SIZE = 4;
+
+ private static final ScheduledThreadPoolExecutor _threadPool =
+ new ScheduledThreadPoolExecutor(CORE_POOL_SIZE);
+
+
+ private final String _transport;
+ private final String _host;
+ private final int _port;
+ private final String _remoteVhost;
+ private final boolean _durable;
+ private final String _authMechanism;
+ private final String _username;
+ private final String _password;
+ private final VirtualHost _virtualHost;
+ private UUID _id;
+ private AtomicBoolean _closing = new AtomicBoolean();
+ private final long _createTime = System.currentTimeMillis();
+ private Connection _qpidConnection;
+ private AtomicReference<Thread> _executor = new AtomicReference<Thread>();
+ private AtomicInteger _bridgeId = new AtomicInteger();
+
+ private final ConcurrentHashMap<Bridge,Bridge> _bridges = new ConcurrentHashMap<Bridge,Bridge>();
+ private final ConcurrentHashMap<Bridge,Bridge> _activeBridges = new ConcurrentHashMap<Bridge,Bridge>();
+ private final ConcurrentLinkedQueue<Bridge> _pendingBridges = new ConcurrentLinkedQueue<Bridge>();
+ private String _remoteFederationTag;
+
+ private ConnectionConfig _connectionConfig;
+ private ConnectionException _exception;
+ private String _lastErrorMessage;
+ private int _retryDelay = 1;
+ private final Runnable _makeConnectionTask = new Runnable()
+ {
+ public void run()
+ {
+ doMakeConnection();
+ }
+ };;
+ ;
+
+ public static enum State
+ {
+ OPERATIONAL,
+ DOWN,
+ ESTABLISHING,
+ DELETED
+ }
+
+
+ private volatile State _state = State.DOWN;
+
+ private static final AtomicReferenceFieldUpdater<BrokerLink, State> _stateUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(BrokerLink.class, State.class, "_state");
+
+ private class ConnectionConfigAdapter implements ConnectionConfig
+ {
+ private long _adapterCreateTime = System.currentTimeMillis();
+ private UUID _id = BrokerLink.this.getConfigStore().createId();
+
+ public VirtualHost getVirtualHost()
+ {
+ return BrokerLink.this.getVirtualHost();
+ }
+
+ public String getAddress()
+ {
+ return _host+":"+_port;
+ }
+
+ public Boolean isIncoming()
+ {
+ return false;
+ }
+
+ public Boolean isSystemConnection()
+ {
+ return true;
+ }
+
+ public Boolean isFederationLink()
+ {
+ return true;
+ }
+
+ public String getAuthId()
+ {
+ return _username;
+ }
+
+ public String getRemoteProcessName()
+ {
+ return null;
+ }
+
+ public Integer getRemotePID()
+ {
+ return null;
+ }
+
+ public Integer getRemoteParentPID()
+ {
+ return null;
+ }
+
+ public ConfigStore getConfigStore()
+ {
+ return getVirtualHost().getConfigStore();
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public ConnectionConfigType getConfigType()
+ {
+ return ConnectionConfigType.getInstance();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return getVirtualHost();
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public long getCreateTime()
+ {
+ return _adapterCreateTime;
+ }
+
+ public Boolean isShadow()
+ {
+ return false;
+ }
+
+ public void mgmtClose()
+ {
+ _connectionConfig.mgmtClose();
+ }
+ }
+
+ private class SessionFactory implements Connection.SessionFactory
+ {
+
+ public Session newSession(final Connection conn, final Binary name, final long expiry)
+ {
+ return new ServerSession(conn, new SessionDelegate(), name, expiry, _connectionConfig);
+ }
+ };
+
+
+ public BrokerLink(final VirtualHost virtualHost,
+ final String transport,
+ final String host,
+ final int port,
+ final String remoteVhost,
+ final boolean durable,
+ final String authMechanism, final String username, final String password)
+ {
+ _virtualHost = virtualHost;
+ _transport = transport;
+ _host = host;
+ _port = port;
+ _remoteVhost = remoteVhost;
+ _durable = durable;
+ _authMechanism = authMechanism;
+ _username = username;
+ _password = password;
+ _id = virtualHost.getConfigStore().createId();
+ _qpidConnection = new Connection();
+ _connectionConfig = new ConnectionConfigAdapter();
+ _qpidConnection.addConnectionListener(this);
+
+
+ makeConnection();
+ }
+
+ private final boolean updateState(State expected, State newState)
+ {
+ return _stateUpdater.compareAndSet(this,expected,newState);
+ }
+
+ private void makeConnection()
+ {
+ _threadPool.execute(_makeConnectionTask);
+ }
+
+
+
+ private void doMakeConnection()
+ {
+ if(updateState(State.DOWN, State.ESTABLISHING))
+ {
+ try
+ {
+ _qpidConnection.connect(_host, _port, _remoteVhost, _username, _password, "ssl".equals(_transport), _authMechanism);
+
+ final Map<String,Object> serverProps = _qpidConnection.getServerProperties();
+ _remoteFederationTag = (String) serverProps.get("qpid.federation_tag");
+ if(_remoteFederationTag == null)
+ {
+ _remoteFederationTag = UUID.fromString(_transport+":"+_host+":"+_port).toString();
+ }
+ _qpidConnection.setSessionFactory(new SessionFactory());
+ _qpidConnection.setAuthorizationID(_username == null ? "" : _username);
+
+ updateState(State.ESTABLISHING, State.OPERATIONAL);
+
+ _retryDelay = 1;
+
+ for(Bridge bridge : _bridges.values())
+ {
+ if(_state != State.OPERATIONAL)
+ {
+ break;
+ }
+ addBridge(bridge);
+ }
+
+
+ }
+ catch (TransportException e)
+ {
+ _lastErrorMessage = e.getMessage();
+ if(_retryDelay < 60)
+ {
+ _retryDelay <<= 1;
+ }
+
+ updateState(State.ESTABLISHING, State.DOWN);
+ _activeBridges.clear();
+ scheduleConnectionRetry();
+ }
+ }
+ }
+
+ private void scheduleConnectionRetry()
+ {
+ if(_state != State.DELETED)
+ {
+ _threadPool.schedule(_makeConnectionTask, _retryDelay, TimeUnit.SECONDS);
+ }
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public String getTransport()
+ {
+ return _transport;
+ }
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public int getPort()
+ {
+ return _port;
+ }
+
+ public String getRemoteVhost()
+ {
+ return _remoteVhost;
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public LinkConfigType getConfigType()
+ {
+ return LinkConfigType.getInstance();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return getVirtualHost();
+ }
+
+ public boolean isDurable()
+ {
+ return _durable;
+ }
+
+ public String getAuthMechanism()
+ {
+ return _authMechanism;
+ }
+
+ public String getUsername()
+ {
+ return _username;
+ }
+
+ public String getPassword()
+ {
+ return _password;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final BrokerLink that = (BrokerLink) o;
+
+ if (_port != that._port)
+ {
+ return false;
+ }
+ if (_host != null ? !_host.equals(that._host) : that._host != null)
+ {
+ return false;
+ }
+ if (_remoteVhost != null ? !_remoteVhost.equals(that._remoteVhost) : that._remoteVhost != null)
+ {
+ return false;
+ }
+ if (_transport != null ? !_transport.equals(that._transport) : that._transport != null)
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = _transport != null ? _transport.hashCode() : 0;
+ result = 31 * result + (_host != null ? _host.hashCode() : 0);
+ result = 31 * result + _port;
+ result = 31 * result + (_remoteVhost != null ? _remoteVhost.hashCode() : 0);
+ return result;
+ }
+
+ public void close()
+ {
+ if(_closing.compareAndSet(false,true))
+ {
+ // TODO - close connection
+ for(Bridge bridge : _bridges.values())
+ {
+ bridge.close();
+ }
+ _bridges.clear();
+
+ _virtualHost.removeBrokerConnection(this);
+ }
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+ public void createBridge(final boolean durable,
+ final boolean dynamic,
+ final boolean srcIsQueue,
+ final boolean srcIsLocal,
+ final String src,
+ final String dest,
+ final String key,
+ final String tag,
+ final String excludes)
+ {
+ if(!_closing.get())
+ {
+ Bridge bridge = new Bridge(this, _bridgeId.incrementAndGet(), durable,dynamic,srcIsQueue,srcIsLocal,src,dest,key,tag,excludes);
+ if(_bridges.putIfAbsent(bridge, bridge) == null)
+ {
+
+ addBridge(bridge);
+ }
+ }
+
+
+ }
+
+ private void addBridge(final Bridge bridge)
+ {
+ getConfigStore().addConfiguredObject(bridge);
+
+ if(_state == State.OPERATIONAL && (_activeBridges.putIfAbsent(bridge,bridge) == null))
+ {
+
+
+ Session session = _qpidConnection.createSession("Bridge("
+ + (bridge.isDurable() ? "durable" : "transient")
+ + "," + (bridge.isDynamic() ? "dynamic" : "static")
+ + "," + (bridge.isQueueBridge() ? "queue" : "exchange")
+ + "," + (bridge.isLocalSource() ? "local-src" : "remote-src")
+ + ",[Source: '" + bridge.getSource() + "']"
+ + ",[Destination: '" + bridge.getDestination() + "']"
+ + ",[Key: '" + bridge.getKey() + "']"
+ + ",[Tag: '" + bridge.getTag() + "']"
+ + ".[Excludes: '" + bridge.getExcludes() + "'])");
+ bridge.setSession(session);
+
+
+ if(_closing.get())
+ {
+ bridge.close();
+ }
+ }
+
+ }
+
+ public void opened(final Connection connection)
+ {
+ // this method not called
+ }
+
+ public void exception(final Connection connection, final ConnectionException exception)
+ {
+ _exception = exception;
+ _lastErrorMessage = exception.getMessage();
+
+ }
+
+ public void closed(final Connection connection)
+ {
+ State currentState = _state;
+ if(currentState != State.DOWN && currentState != State.DELETED && updateState(currentState, State.DOWN))
+ {
+ scheduleConnectionRetry();
+ }
+ }
+
+ public ConfigStore getConfigStore()
+ {
+ return getVirtualHost().getConfigStore();
+ }
+
+ public String getFederationTag()
+ {
+ return getVirtualHost().getFederationTag();
+ }
+
+ public String getRemoteFederationTag()
+ {
+ return _remoteFederationTag;
+ }
+}