diff options
Diffstat (limited to 'qpid/java/broker/src/main/java')
11 files changed, 423 insertions, 292 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index f8deb95628..c45e794145 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -20,6 +20,13 @@ */ package org.apache.qpid.server; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Properties; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; @@ -30,16 +37,9 @@ import org.apache.commons.cli.PosixParser; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import org.apache.log4j.xml.QpidLog4JConfigurator; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.FixedSizeByteBufferAllocator; -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.mina.util.NewThreadExecutor; import org.apache.qpid.AMQException; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean; import org.apache.qpid.server.information.management.ServerInformationMBean; @@ -48,19 +48,13 @@ import org.apache.qpid.server.logging.actors.BrokerActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.management.LoggingManagementMBean; import org.apache.qpid.server.logging.messages.BrokerMessages; -import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; -import org.apache.qpid.server.protocol.AMQPProtocolProvider; +import org.apache.qpid.server.protocol.AMQProtocolEngineFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.transport.QpidAcceptor; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.Properties; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; /** * Main entry point for AMQPD. @@ -300,20 +294,6 @@ public class Main _brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion() + " build: " + QpidProperties.getBuildVersion()); - ByteBuffer.setUseDirectBuffers(serverConfig.getEnableDirectBuffers()); - - // the MINA default is currently to use the pooled allocator although this may change in future - // once more testing of the performance of the simple allocator has been done - if (!serverConfig.getEnablePooledAllocator()) - { - ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator()); - } - - if (serverConfig.getUseBiasedWrites()) - { - System.setProperty("org.apache.qpid.use_write_biased_pool", "true"); - } - int port = serverConfig.getPort(); String portStr = commandLine.getOptionValue("p"); @@ -329,7 +309,54 @@ public class Main } } - bind(port, serverConfig); + String bindAddr = commandLine.getOptionValue("b"); + if (bindAddr == null) + { + bindAddr = serverConfig.getBind(); + } + InetAddress bindAddress = null; + if (bindAddr.equals("wildcard")) + { + bindAddress = new InetSocketAddress(port).getAddress(); + } + else + { + bindAddress = InetAddress.getByAddress(parseIP(bindAddr)); + } + + String keystorePath = serverConfig.getKeystorePath(); + String keystorePassword = serverConfig.getKeystorePassword(); + String certType = serverConfig.getCertType(); + SSLContextFactory sslFactory = null; + boolean isSsl = false; + + if (!serverConfig.getSSLOnly()) + { + NetworkDriver driver = new MINANetworkDriver(); + driver.bind(port, new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(), + serverConfig.getNetworkConfiguration(), null); + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), + new QpidAcceptor(driver,"TCP")); + CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port)); + } + + if (serverConfig.getEnableSSL()) + { + sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); + NetworkDriver driver = new MINANetworkDriver(); + driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress}, + new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory); + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), + new QpidAcceptor(driver,"TCP")); + CurrentActor.get().message(BrokerMessages.BRK_1002("TCP/SSL", serverConfig.getSSLPort())); + } + + //fixme qpid.AMQP should be using qpidproperties to get value + _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion() + + " build: " + QpidProperties.getBuildVersion()); + + CurrentActor.get().message(BrokerMessages.BRK_1004()); + } finally { @@ -358,114 +385,6 @@ public class Main } } - protected void bind(int port, ServerConfiguration config) throws BindException - { - String bindAddr = commandLine.getOptionValue("b"); - if (bindAddr == null) - { - bindAddr = config.getBind(); - } - - try - { - IoAcceptor acceptor; - - if (ApplicationRegistry.getInstance().getConfiguration().getQpidNIO()) - { - _logger.warn("Using Qpid Multithreaded IO Processing"); - acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(config.getProcessors(), new NewThreadExecutor()); - } - else - { - _logger.warn("Using Mina IO Processing"); - acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(config.getProcessors(), new NewThreadExecutor()); - } - - SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig(); - SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); - - sc.setReceiveBufferSize(config.getReceiveBufferSize()); - sc.setSendBufferSize(config.getWriteBufferSize()); - sc.setTcpNoDelay(config.getTcpNoDelay()); - - // if we do not use the executor pool threading model we get the default leader follower - // implementation provided by MINA - if (config.getEnableExecutorPool()) - { - sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); - } - - if (!config.getEnableSSL() || !config.getSSLOnly()) - { - AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler(); - InetSocketAddress bindAddress; - if (bindAddr.equals("wildcard")) - { - bindAddress = new InetSocketAddress(port); - } - else - { - bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port); - } - - bind(new QpidAcceptor(acceptor,"TCP"), bindAddress, handler, sconfig); - - //fixme qpid.AMQP should be using qpidproperties to get value - _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress); - } - - if (config.getEnableSSL()) - { - AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler(); - try - { - - bind(new QpidAcceptor(acceptor, "TCP/SSL"), new InetSocketAddress(config.getSSLPort()), handler, sconfig); - - //fixme qpid.AMQP should be using qpidproperties to get value - _brokerLogger.info("Qpid.AMQP listening on SSL port " + config.getSSLPort()); - - } - catch (IOException e) - { - _brokerLogger.error("Unable to listen on SSL port: " + e, e); - } - } - - //fixme qpid.AMQP should be using qpidproperties to get value - _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion() - + " build: " + QpidProperties.getBuildVersion()); - - CurrentActor.get().message(BrokerMessages.BRK_1004()); - - } - catch (Exception e) - { - _logger.error("Unable to bind service to registry: " + e, e); - //fixme this need tidying up - throw new BindException(e.getMessage()); - } - } - - /** - * Ensure that any bound Acceptors are recorded in the registry so they can be closed later. - * - * @param acceptor - * @param bindAddress - * @param handler - * @param sconfig - * - * @throws IOException from the acceptor.bind command - */ - private void bind(QpidAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfig) throws IOException - { - acceptor.getIoAcceptor().bind(bindAddress, handler, sconfig); - - CurrentActor.get().message(BrokerMessages.BRK_1002(acceptor.toString(), bindAddress.getPort())); - - ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor); - } - public static void main(String[] args) { //if the -Dlog4j.configuration property has not been set, enable the init override diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 7ea7738189..b3c8975c7c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.configuration.management.ConfigurationManagementMB import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.NetworkDriverConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -564,7 +565,7 @@ public class ServerConfiguration implements SignalHandler public boolean getSSLOnly() { - return getConfig().getBoolean("connector.ssl.sslOnly", true); + return getConfig().getBoolean("connector.ssl.sslOnly", false); } public int getSSLPort() @@ -613,4 +614,57 @@ public class ServerConfiguration implements SignalHandler getConfig().getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD)); } + + public NetworkDriverConfiguration getNetworkConfiguration() + { + return new NetworkDriverConfiguration() + { + + public Integer getTrafficClass() + { + return null; + } + + public Boolean getTcpNoDelay() + { + // Can't call parent getTcpNoDelay since it just calls this one + return getConfig().getBoolean("connector.tcpNoDelay", true); + } + + public Integer getSoTimeout() + { + return null; + } + + public Integer getSoLinger() + { + return null; + } + + public Integer getSendBufferSize() + { + return getBufferWriteLimit(); + } + + public Boolean getReuseAddress() + { + return null; + } + + public Integer getReceiveBufferSize() + { + return getBufferReadLimit(); + } + + public Boolean getOOBInline() + { + return null; + } + + public Boolean getKeepAlive() + { + return null; + } + }; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index d287595e2d..71e07172ed 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.connection; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; import org.apache.qpid.AMQConnectionException; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java index d64fde1c20..002269bbaa 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.connection; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 7bc4365152..49bdffb584 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -20,11 +20,25 @@ */ package org.apache.qpid.server.protocol; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import javax.management.JMException; +import javax.security.sasl.SaslServer; + import org.apache.log4j.Logger; -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.IoSession; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; @@ -36,8 +50,11 @@ import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQProtocolHeaderException; +import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; @@ -46,18 +63,23 @@ import org.apache.qpid.framing.MethodDispatcher; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.pool.Event; +import org.apache.qpid.pool.Job; +import org.apache.qpid.pool.PoolingFilter; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; +import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.handler.ServerMethodDispatcherImpl; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.AMQPConnectionActor; import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; +import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.output.ProtocolOutputConverter; @@ -67,22 +89,10 @@ import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.Sender; -import javax.management.JMException; -import javax.security.sasl.SaslServer; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.security.Principal; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicLong; - -public class AMQMinaProtocolSession implements AMQProtocolSession, Managable +public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession { private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); @@ -94,8 +104,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; - private final IoSession _minaProtocolSession; - private AMQShortString _contextKey; private AMQShortString _clientVersion = null; @@ -135,47 +143,43 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private Principal _authorizedID; private MethodDispatcher _dispatcher; private ProtocolSessionIdentifier _sessionIdentifier; - - private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L; - private org.apache.mina.common.WriteFuture _lastWriteFuture; - + // Create a simple ID that increments for ever new Session private final long _sessionID = idGenerator.getAndIncrement(); private AMQPConnectionActor _actor; private LogSubject _logSubject; + private NetworkDriver _networkDriver; + + private long _lastIoTime; + + private long _writtenBytes; + private long _readBytes; + + private Job _readJob; + private Job _writeJob; + + private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); + public ManagedObject getManagedObject() { return _managedObject; } - public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory) - throws AMQException + public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver) { _stateManager = new AMQStateManager(virtualHostRegistry, this); - _minaProtocolSession = session; - session.setAttachment(this); + _networkDriver = driver; + + _codecFactory = new AMQCodecFactory(true, this); - _codecFactory = codecFactory; + ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); + _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true); + _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false); _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); - _actor.message(ConnectionMessages.CON_1001(null, null, false, false)); - - try - { - IoServiceConfig config = session.getServiceConfig(); - ReadWriteThreadModel threadModel = (ReadWriteThreadModel) config.getThreadModel(); - threadModel.getAsynchronousReadFilter().createNewJobForSession(session); - threadModel.getAsynchronousWriteFilter().createNewJobForSession(session); - } - catch (RuntimeException e) - { - e.printStackTrace(); - throw e; - - } } private AMQProtocolSessionMBean createMBean() throws AMQException @@ -191,16 +195,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } } - public IoSession getIOSession() - { - return _minaProtocolSession; - } - - public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession) - { - return (AMQProtocolSession) minaProtocolSession.getAttachment(); - } - public long getSessionID() { return _sessionID; @@ -211,6 +205,42 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable return _actor; } + @Override + public void received(final ByteBuffer msg) + { + _lastIoTime = System.currentTimeMillis(); + try + { + final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); + fireAsynchEvent(_readJob, new Event(new Runnable() + { + @Override + public void run() + { + // Decode buffer + + for (AMQDataBlock dataBlock : dataBlocks) + { + try + { + dataBlockReceived(dataBlock); + } + catch (Exception e) + { + e.printStackTrace(); + closeProtocolSession(); + } + } + } + })); + } + catch (Exception e) + { + e.printStackTrace(); + closeProtocolSession(); + } + } + public void dataBlockReceived(AMQDataBlock message) throws Exception { _lastReceived = message; @@ -314,21 +344,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable null, mechanisms.getBytes(), locales.getBytes()); - _minaProtocolSession.write(responseBody.generateFrame(0)); + _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer()); } catch (AMQException e) { _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); - _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); - - // TODO: Close connection (but how to wait until message is sent?) - // ritchiem 2006-12-04 will this not do? - // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()])); - // future.join(); - // close connection - + _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); } } @@ -437,8 +460,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public void writeFrame(AMQDataBlock frame) { _lastSent = frame; - - _lastWriteFuture = _minaProtocolSession.write(frame); + final ByteBuffer buf = frame.toNioByteBuffer(); + _lastIoTime = System.currentTimeMillis(); + _writtenBytes += buf.remaining(); + fireAsynchEvent(_writeJob, new Event(new Runnable() + { + @Override + public void run() + { + _networkDriver.send(buf); + } + })); } public AMQShortString getContextKey() @@ -539,7 +571,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { _maxNoOfChannels = value; } - + public void commitTransactions(AMQChannel channel) throws AMQException { if ((channel != null) && channel.isTransactional()) @@ -555,7 +587,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable channel.rollback(); } } - + /** * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue * subscriptions (this may in turn remove queues if they are auto delete</li> </ul> @@ -628,8 +660,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { if (delay > 0) { - _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay); - _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, (int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); + _networkDriver.setMaxWriteIdle(delay); + _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); } } @@ -672,7 +704,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { task.doTask(this); } - + _closed = true; CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002()); @@ -699,21 +731,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public void closeProtocolSession() { - closeProtocolSession(true); - } - - public void closeProtocolSession(boolean waitLast) - { - if (waitLast && (_lastWriteFuture != null)) - { - _logger.debug("Waiting for last write to join."); - _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - } - - _logger.debug("REALLY Closing protocol session:" + _minaProtocolSession); - final CloseFuture future = _minaProtocolSession.close(); - future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - + _networkDriver.close(); try { _stateManager.changeState(AMQState.CONNECTION_CLOSED); @@ -726,7 +744,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public String toString() { - return _minaProtocolSession.getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")"); + return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")"); } public String dump() @@ -737,7 +755,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable /** @return an object that can be used to identity */ public Object getKey() { - return _minaProtocolSession.getRemoteAddress(); + return getRemoteAddress(); } /** @@ -748,7 +766,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable */ public String getLocalFQDN() { - SocketAddress address = _minaProtocolSession.getLocalAddress(); + SocketAddress address = _networkDriver.getLocalAddress(); // we use the vmpipe address in some tests hence the need for this rather ugly test. The host // information is used by SASL primary. if (address instanceof InetSocketAddress) @@ -764,7 +782,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable throw new IllegalArgumentException("Unsupported socket address class: " + address); } } - + public SaslServer getSaslServer() { return _saslServer; @@ -837,7 +855,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public Object getClientIdentifier() { - return (_minaProtocolSession != null) ? _minaProtocolSession.getRemoteAddress() : null; + return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null; } public VirtualHost getVirtualHost() @@ -867,7 +885,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { _taskList.remove(task); } - + public ProtocolOutputConverter getProtocolOutputConverter() { return _protocolOutputConverter; @@ -888,7 +906,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public SocketAddress getRemoteAddress() { - return _minaProtocolSession.getRemoteAddress(); + return _networkDriver.getRemoteAddress(); } public MethodRegistry getMethodRegistry() @@ -901,23 +919,136 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable return _dispatcher; } - public ProtocolSessionIdentifier getSessionIdentifier() + @Override + public void closed() { - return _sessionIdentifier; + try + { + closeSession(); + } + catch (AMQException e) + { + _logger.error("Could not close protocol engine", e); + } } - public String getClientVersion() + @Override + public void readerIdle() { - return (_clientVersion == null) ? null : _clientVersion.toString(); + // Nothing + } + + @Override + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + + @Override + public void writerIdle() + { + _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer()); } - public void setSender(Sender<java.nio.ByteBuffer> sender) + @Override + public void exception(Throwable throwable) { - // No-op, interface munging between this and AMQProtocolSession + if (throwable instanceof AMQProtocolHeaderException) + { + + writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); + _networkDriver.close(); + + _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable); + } + else if (throwable instanceof IOException) + { + _logger.error("IOException caught in" + this + ", session closed implictly: " + throwable); + } + else + { + _logger.error("Exception caught in" + this + ", closing session explictly: " + throwable, throwable); + + + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion()); + ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0); + + writeFrame(closeBody.generateFrame(0)); + + _networkDriver.close(); + } } + @Override public void init() { - // No-op, interface munging between this and AMQProtocolSession + // Do nothing + } + + @Override + public void setSender(Sender<ByteBuffer> sender) + { + // Do nothing + } + + @Override + public long getReadBytes() + { + return _readBytes; + } + + public long getWrittenBytes() + { + return _writtenBytes; + } + + public long getLastIoTime() + { + return _lastIoTime; + } + + public ProtocolSessionIdentifier getSessionIdentifier() + { + return _sessionIdentifier; + } + + public String getClientVersion() + { + return (_clientVersion == null) ? null : _clientVersion.toString(); + } + + /** + * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. + * + * @param job The job. + * @param event The event to hand off asynchronously. + */ + void fireAsynchEvent(Job job, Event event) + { + + job.add(event); + + final ExecutorService pool = _poolReference .getPool(); + + if(pool == null) + { + return; + } + + // rather than perform additional checks on pool to check that it hasn't shutdown. + // catch the RejectedExecutionException that will result from executing on a shutdown pool + if (job.activate()) + { + try + { + pool.execute(job); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + } + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java new file mode 100644 index 0000000000..ff0c007a60 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java @@ -0,0 +1,29 @@ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.NetworkDriver; + +public class AMQProtocolEngineFactory implements ProtocolEngineFactory +{ + private VirtualHostRegistry _vhosts; + + public AMQProtocolEngineFactory() + { + this(1); + } + + public AMQProtocolEngineFactory(Integer port) + { + _vhosts = ApplicationRegistry.getInstance(port).getVirtualHostRegistry(); + } + + + public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver) + { + return new AMQProtocolEngine(_vhosts, networkDriver); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index fff406bb3d..b0bef04354 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.virtualhost.VirtualHost; import java.security.Principal; +import java.util.List; public interface AMQProtocolSession extends AMQVersionAwareProtocolSession @@ -210,5 +211,19 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession public MethodDispatcher getMethodDispatcher(); public ProtocolSessionIdentifier getSessionIdentifier(); + + String getClientVersion(); + + long getLastIoTime(); + + long getWrittenBytes(); + + Long getMaximumNumberOfChannels(); + + void setMaximumNumberOfChannels(Long value); + + void commitTransactions(AMQChannel channel) throws AMQException; + + List<AMQChannel> getChannels(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index 81dbeeded2..8497c95e26 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -79,7 +79,7 @@ import org.apache.qpid.server.management.ManagedObject; @MBeanDescription("Management Bean for an AMQ Broker Connection") public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection { - private AMQMinaProtocolSession _session = null; + private AMQProtocolSession _protocolSession = null; private String _name = null; // openmbean data types for representing the channel attributes @@ -92,10 +92,10 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed new AMQShortString("Broker Management Console has closed the connection."); @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection") - public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException + public AMQProtocolSessionMBean(AMQProtocolSession amqProtocolSession) throws NotCompliantMBeanException, OpenDataException { super(ManagedConnection.class, ManagedConnection.TYPE, ManagedConnection.VERSION); - _session = session; + _protocolSession = amqProtocolSession; String remote = getRemoteAddress(); remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote; _name = jmxEncode(new StringBuffer(remote), 0).toString(); @@ -128,52 +128,52 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public String getClientId() { - return (_session.getContextKey() == null) ? null : _session.getContextKey().toString(); + return (_protocolSession.getContextKey() == null) ? null : _protocolSession.getContextKey().toString(); } public String getAuthorizedId() { - return (_session.getAuthorizedID() != null ) ? _session.getAuthorizedID().getName() : null; + return (_protocolSession.getAuthorizedID() != null ) ? _protocolSession.getAuthorizedID().getName() : null; } public String getVersion() { - return (_session.getClientVersion() == null) ? null : _session.getClientVersion().toString(); + return (_protocolSession.getClientVersion() == null) ? null : _protocolSession.getClientVersion().toString(); } public Date getLastIoTime() { - return new Date(_session.getIOSession().getLastIoTime()); + return new Date(_protocolSession.getLastIoTime()); } public String getRemoteAddress() { - return _session.getIOSession().getRemoteAddress().toString(); + return _protocolSession.getRemoteAddress().toString(); } public ManagedObject getParentObject() { - return _session.getVirtualHost().getManagedObject(); + return _protocolSession.getVirtualHost().getManagedObject(); } public Long getWrittenBytes() { - return _session.getIOSession().getWrittenBytes(); + return _protocolSession.getWrittenBytes(); } public Long getReadBytes() { - return _session.getIOSession().getReadBytes(); + return _protocolSession.getWrittenBytes(); } public Long getMaximumNumberOfChannels() { - return _session.getMaximumNumberOfChannels(); + return _protocolSession.getMaximumNumberOfChannels(); } public void setMaximumNumberOfChannels(Long value) { - _session.setMaximumNumberOfChannels(value); + _protocolSession.setMaximumNumberOfChannels(value); } public String getObjectInstanceName() @@ -192,13 +192,13 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); try { - AMQChannel channel = _session.getChannel(channelId); + AMQChannel channel = _protocolSession.getChannel(channelId); if (channel == null) { throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); } - _session.commitTransactions(channel); + _protocolSession.commitTransactions(channel); } catch (AMQException ex) { @@ -221,13 +221,13 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); try { - AMQChannel channel = _session.getChannel(channelId); + AMQChannel channel = _protocolSession.getChannel(channelId); if (channel == null) { throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); } - _session.rollbackTransactions(channel); + _protocolSession.commitTransactions(channel); } catch (AMQException ex) { @@ -248,7 +248,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public TabularData channels() throws OpenDataException { TabularDataSupport channelsList = new TabularDataSupport(_channelsType); - List<AMQChannel> list = _session.getChannels(); + List<AMQChannel> list = _protocolSession.getChannels(); for (AMQChannel channel : list) { @@ -274,7 +274,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public void closeConnection() throws JMException { - MethodRegistry methodRegistry = _session.getMethodRegistry(); + MethodRegistry methodRegistry = _protocolSession.getMethodRegistry(); ConnectionCloseBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode @@ -301,12 +301,12 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed try { - _session.writeFrame(responseBody.generateFrame(0)); + _protocolSession.writeFrame(responseBody.generateFrame(0)); try { - _session.closeSession(); + _protocolSession.closeSession(); } catch (AMQException ex) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index b6137e83de..9575bfa1ec 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -258,7 +258,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry for (InetSocketAddress bindAddress : _acceptors.keySet()) { QpidAcceptor acceptor = _acceptors.get(bindAddress); - acceptor.getIoAcceptor().unbind(bindAddress); + acceptor.getNetworkDriver().close(); CurrentActor.get().message(BrokerMessages.BRK_1003(acceptor.toString(), bindAddress.getPort())); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java index 810be8ae22..3a81932123 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.security.access.plugins.network; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; @@ -32,7 +31,6 @@ import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.security.access.ACLPluginFactory; @@ -180,13 +178,13 @@ public class FirewallPlugin extends AbstractACLPlugin @Override public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost) { - if (!(session instanceof AMQMinaProtocolSession)) + SocketAddress sockAddr = session.getRemoteAddress(); + if (!(sockAddr instanceof InetSocketAddress)) { - return AuthzResult.ABSTAIN; // We only deal with tcp sessions, which - // mean MINA right now + return AuthzResult.ABSTAIN; // We only deal with tcp sessions } - InetAddress addr = getInetAdressFromMinaSession((AMQMinaProtocolSession) session); + InetAddress addr = ((InetSocketAddress) sockAddr).getAddress(); if (addr == null) { @@ -213,19 +211,6 @@ public class FirewallPlugin extends AbstractACLPlugin } - private InetAddress getInetAdressFromMinaSession(AMQMinaProtocolSession session) - { - SocketAddress remote = session.getIOSession().getRemoteAddress(); - if (remote instanceof InetSocketAddress) - { - return ((InetSocketAddress) remote).getAddress(); - } - else - { - return null; - } - } - @Override public void setConfiguration(Configuration config) throws ConfigurationException { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java index 61cc7cdeb6..3ca22b60c8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java @@ -20,21 +20,21 @@ */ package org.apache.qpid.server.transport; -import org.apache.mina.common.IoAcceptor; +import org.apache.qpid.transport.NetworkDriver; public class QpidAcceptor { - IoAcceptor _acceptor; + NetworkDriver _driver; String _protocol; - public QpidAcceptor(IoAcceptor acceptor, String protocol) + public QpidAcceptor(NetworkDriver driver, String protocol) { - _acceptor = acceptor; + _driver = driver; _protocol = protocol; } - public IoAcceptor getIoAcceptor() + public NetworkDriver getNetworkDriver() { - return _acceptor; + return _driver; } public String toString() |