summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java199
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java56
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (renamed from qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java)335
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java42
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java23
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java12
11 files changed, 423 insertions, 292 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
index f8deb95628..c45e794145 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -20,6 +20,13 @@
*/
package org.apache.qpid.server;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Properties;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
@@ -30,16 +37,9 @@ import org.apache.commons.cli.PosixParser;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.xml.QpidLog4JConfigurator;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.FixedSizeByteBufferAllocator;
-import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.mina.util.NewThreadExecutor;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
import org.apache.qpid.server.information.management.ServerInformationMBean;
@@ -48,19 +48,13 @@ import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.management.LoggingManagementMBean;
import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
-import org.apache.qpid.server.protocol.AMQPProtocolProvider;
+import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.transport.QpidAcceptor;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Properties;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
/**
* Main entry point for AMQPD.
@@ -300,20 +294,6 @@ public class Main
_brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion()
+ " build: " + QpidProperties.getBuildVersion());
- ByteBuffer.setUseDirectBuffers(serverConfig.getEnableDirectBuffers());
-
- // the MINA default is currently to use the pooled allocator although this may change in future
- // once more testing of the performance of the simple allocator has been done
- if (!serverConfig.getEnablePooledAllocator())
- {
- ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
- }
-
- if (serverConfig.getUseBiasedWrites())
- {
- System.setProperty("org.apache.qpid.use_write_biased_pool", "true");
- }
-
int port = serverConfig.getPort();
String portStr = commandLine.getOptionValue("p");
@@ -329,7 +309,54 @@ public class Main
}
}
- bind(port, serverConfig);
+ String bindAddr = commandLine.getOptionValue("b");
+ if (bindAddr == null)
+ {
+ bindAddr = serverConfig.getBind();
+ }
+ InetAddress bindAddress = null;
+ if (bindAddr.equals("wildcard"))
+ {
+ bindAddress = new InetSocketAddress(port).getAddress();
+ }
+ else
+ {
+ bindAddress = InetAddress.getByAddress(parseIP(bindAddr));
+ }
+
+ String keystorePath = serverConfig.getKeystorePath();
+ String keystorePassword = serverConfig.getKeystorePassword();
+ String certType = serverConfig.getCertType();
+ SSLContextFactory sslFactory = null;
+ boolean isSsl = false;
+
+ if (!serverConfig.getSSLOnly())
+ {
+ NetworkDriver driver = new MINANetworkDriver();
+ driver.bind(port, new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(),
+ serverConfig.getNetworkConfiguration(), null);
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+ new QpidAcceptor(driver,"TCP"));
+ CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port));
+ }
+
+ if (serverConfig.getEnableSSL())
+ {
+ sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
+ NetworkDriver driver = new MINANetworkDriver();
+ driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress},
+ new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory);
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+ new QpidAcceptor(driver,"TCP"));
+ CurrentActor.get().message(BrokerMessages.BRK_1002("TCP/SSL", serverConfig.getSSLPort()));
+ }
+
+ //fixme qpid.AMQP should be using qpidproperties to get value
+ _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
+ + " build: " + QpidProperties.getBuildVersion());
+
+ CurrentActor.get().message(BrokerMessages.BRK_1004());
+
}
finally
{
@@ -358,114 +385,6 @@ public class Main
}
}
- protected void bind(int port, ServerConfiguration config) throws BindException
- {
- String bindAddr = commandLine.getOptionValue("b");
- if (bindAddr == null)
- {
- bindAddr = config.getBind();
- }
-
- try
- {
- IoAcceptor acceptor;
-
- if (ApplicationRegistry.getInstance().getConfiguration().getQpidNIO())
- {
- _logger.warn("Using Qpid Multithreaded IO Processing");
- acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(config.getProcessors(), new NewThreadExecutor());
- }
- else
- {
- _logger.warn("Using Mina IO Processing");
- acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(config.getProcessors(), new NewThreadExecutor());
- }
-
- SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
- SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
-
- sc.setReceiveBufferSize(config.getReceiveBufferSize());
- sc.setSendBufferSize(config.getWriteBufferSize());
- sc.setTcpNoDelay(config.getTcpNoDelay());
-
- // if we do not use the executor pool threading model we get the default leader follower
- // implementation provided by MINA
- if (config.getEnableExecutorPool())
- {
- sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
- }
-
- if (!config.getEnableSSL() || !config.getSSLOnly())
- {
- AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
- InetSocketAddress bindAddress;
- if (bindAddr.equals("wildcard"))
- {
- bindAddress = new InetSocketAddress(port);
- }
- else
- {
- bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
- }
-
- bind(new QpidAcceptor(acceptor,"TCP"), bindAddress, handler, sconfig);
-
- //fixme qpid.AMQP should be using qpidproperties to get value
- _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
- }
-
- if (config.getEnableSSL())
- {
- AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
- try
- {
-
- bind(new QpidAcceptor(acceptor, "TCP/SSL"), new InetSocketAddress(config.getSSLPort()), handler, sconfig);
-
- //fixme qpid.AMQP should be using qpidproperties to get value
- _brokerLogger.info("Qpid.AMQP listening on SSL port " + config.getSSLPort());
-
- }
- catch (IOException e)
- {
- _brokerLogger.error("Unable to listen on SSL port: " + e, e);
- }
- }
-
- //fixme qpid.AMQP should be using qpidproperties to get value
- _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
- + " build: " + QpidProperties.getBuildVersion());
-
- CurrentActor.get().message(BrokerMessages.BRK_1004());
-
- }
- catch (Exception e)
- {
- _logger.error("Unable to bind service to registry: " + e, e);
- //fixme this need tidying up
- throw new BindException(e.getMessage());
- }
- }
-
- /**
- * Ensure that any bound Acceptors are recorded in the registry so they can be closed later.
- *
- * @param acceptor
- * @param bindAddress
- * @param handler
- * @param sconfig
- *
- * @throws IOException from the acceptor.bind command
- */
- private void bind(QpidAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfig) throws IOException
- {
- acceptor.getIoAcceptor().bind(bindAddress, handler, sconfig);
-
- CurrentActor.get().message(BrokerMessages.BRK_1002(acceptor.toString(), bindAddress.getPort()));
-
- ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor);
- }
-
public static void main(String[] args)
{
//if the -Dlog4j.configuration property has not been set, enable the init override
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index 7ea7738189..b3c8975c7c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.configuration.management.ConfigurationManagementMB
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriverConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -564,7 +565,7 @@ public class ServerConfiguration implements SignalHandler
public boolean getSSLOnly()
{
- return getConfig().getBoolean("connector.ssl.sslOnly", true);
+ return getConfig().getBoolean("connector.ssl.sslOnly", false);
}
public int getSSLPort()
@@ -613,4 +614,57 @@ public class ServerConfiguration implements SignalHandler
getConfig().getLong("housekeeping.expiredMessageCheckPeriod",
DEFAULT_HOUSEKEEPING_PERIOD));
}
+
+ public NetworkDriverConfiguration getNetworkConfiguration()
+ {
+ return new NetworkDriverConfiguration()
+ {
+
+ public Integer getTrafficClass()
+ {
+ return null;
+ }
+
+ public Boolean getTcpNoDelay()
+ {
+ // Can't call parent getTcpNoDelay since it just calls this one
+ return getConfig().getBoolean("connector.tcpNoDelay", true);
+ }
+
+ public Integer getSoTimeout()
+ {
+ return null;
+ }
+
+ public Integer getSoLinger()
+ {
+ return null;
+ }
+
+ public Integer getSendBufferSize()
+ {
+ return getBufferWriteLimit();
+ }
+
+ public Boolean getReuseAddress()
+ {
+ return null;
+ }
+
+ public Integer getReceiveBufferSize()
+ {
+ return getBufferReadLimit();
+ }
+
+ public Boolean getOOBInline()
+ {
+ return null;
+ }
+
+ public Boolean getKeepAlive()
+ {
+ return null;
+ }
+ };
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index d287595e2d..71e07172ed 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.connection;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionException;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
index d64fde1c20..002269bbaa 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.connection;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 7bc4365152..49bdffb584 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -20,11 +20,25 @@
*/
package org.apache.qpid.server.protocol;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.JMException;
+import javax.security.sasl.SaslServer;
+
import org.apache.log4j.Logger;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.IoSession;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
@@ -36,8 +50,11 @@ import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQProtocolHeaderException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
@@ -46,18 +63,23 @@ import org.apache.qpid.framing.MethodDispatcher;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.pool.Event;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.PoolingFilter;
import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -67,22 +89,10 @@ import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Sender;
-import javax.management.JMException;
-import javax.security.sasl.SaslServer;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
+public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession
{
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
@@ -94,8 +104,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
- private final IoSession _minaProtocolSession;
-
private AMQShortString _contextKey;
private AMQShortString _clientVersion = null;
@@ -135,47 +143,43 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private Principal _authorizedID;
private MethodDispatcher _dispatcher;
private ProtocolSessionIdentifier _sessionIdentifier;
-
- private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L;
- private org.apache.mina.common.WriteFuture _lastWriteFuture;
-
+
// Create a simple ID that increments for ever new Session
private final long _sessionID = idGenerator.getAndIncrement();
private AMQPConnectionActor _actor;
private LogSubject _logSubject;
+ private NetworkDriver _networkDriver;
+
+ private long _lastIoTime;
+
+ private long _writtenBytes;
+ private long _readBytes;
+
+ private Job _readJob;
+ private Job _writeJob;
+
+ private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+
public ManagedObject getManagedObject()
{
return _managedObject;
}
- public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
- throws AMQException
+ public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver)
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
- _minaProtocolSession = session;
- session.setAttachment(this);
+ _networkDriver = driver;
+
+ _codecFactory = new AMQCodecFactory(true, this);
- _codecFactory = codecFactory;
+ ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+ _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false);
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
-
_actor.message(ConnectionMessages.CON_1001(null, null, false, false));
-
- try
- {
- IoServiceConfig config = session.getServiceConfig();
- ReadWriteThreadModel threadModel = (ReadWriteThreadModel) config.getThreadModel();
- threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
- threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
- }
- catch (RuntimeException e)
- {
- e.printStackTrace();
- throw e;
-
- }
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -191,16 +195,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
}
- public IoSession getIOSession()
- {
- return _minaProtocolSession;
- }
-
- public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession)
- {
- return (AMQProtocolSession) minaProtocolSession.getAttachment();
- }
-
public long getSessionID()
{
return _sessionID;
@@ -211,6 +205,42 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return _actor;
}
+ @Override
+ public void received(final ByteBuffer msg)
+ {
+ _lastIoTime = System.currentTimeMillis();
+ try
+ {
+ final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
+ fireAsynchEvent(_readJob, new Event(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ // Decode buffer
+
+ for (AMQDataBlock dataBlock : dataBlocks)
+ {
+ try
+ {
+ dataBlockReceived(dataBlock);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ closeProtocolSession();
+ }
+ }
+ }
+ }));
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ closeProtocolSession();
+ }
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
@@ -314,21 +344,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
null,
mechanisms.getBytes(),
locales.getBytes());
- _minaProtocolSession.write(responseBody.generateFrame(0));
+ _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer());
}
catch (AMQException e)
{
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
- _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
-
- // TODO: Close connection (but how to wait until message is sent?)
- // ritchiem 2006-12-04 will this not do?
- // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
- // future.join();
- // close connection
-
+ _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
}
}
@@ -437,8 +460,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public void writeFrame(AMQDataBlock frame)
{
_lastSent = frame;
-
- _lastWriteFuture = _minaProtocolSession.write(frame);
+ final ByteBuffer buf = frame.toNioByteBuffer();
+ _lastIoTime = System.currentTimeMillis();
+ _writtenBytes += buf.remaining();
+ fireAsynchEvent(_writeJob, new Event(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ _networkDriver.send(buf);
+ }
+ }));
}
public AMQShortString getContextKey()
@@ -539,7 +571,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
_maxNoOfChannels = value;
}
-
+
public void commitTransactions(AMQChannel channel) throws AMQException
{
if ((channel != null) && channel.isTransactional())
@@ -555,7 +587,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
channel.rollback();
}
}
-
+
/**
* Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
* subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
@@ -628,8 +660,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
if (delay > 0)
{
- _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
- _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, (int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
+ _networkDriver.setMaxWriteIdle(delay);
+ _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
}
}
@@ -672,7 +704,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
task.doTask(this);
}
-
+
_closed = true;
CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002());
@@ -699,21 +731,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public void closeProtocolSession()
{
- closeProtocolSession(true);
- }
-
- public void closeProtocolSession(boolean waitLast)
- {
- if (waitLast && (_lastWriteFuture != null))
- {
- _logger.debug("Waiting for last write to join.");
- _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- }
-
- _logger.debug("REALLY Closing protocol session:" + _minaProtocolSession);
- final CloseFuture future = _minaProtocolSession.close();
- future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
-
+ _networkDriver.close();
try
{
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
@@ -726,7 +744,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public String toString()
{
- return _minaProtocolSession.getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
+ return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
}
public String dump()
@@ -737,7 +755,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
/** @return an object that can be used to identity */
public Object getKey()
{
- return _minaProtocolSession.getRemoteAddress();
+ return getRemoteAddress();
}
/**
@@ -748,7 +766,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
*/
public String getLocalFQDN()
{
- SocketAddress address = _minaProtocolSession.getLocalAddress();
+ SocketAddress address = _networkDriver.getLocalAddress();
// we use the vmpipe address in some tests hence the need for this rather ugly test. The host
// information is used by SASL primary.
if (address instanceof InetSocketAddress)
@@ -764,7 +782,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
throw new IllegalArgumentException("Unsupported socket address class: " + address);
}
}
-
+
public SaslServer getSaslServer()
{
return _saslServer;
@@ -837,7 +855,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public Object getClientIdentifier()
{
- return (_minaProtocolSession != null) ? _minaProtocolSession.getRemoteAddress() : null;
+ return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null;
}
public VirtualHost getVirtualHost()
@@ -867,7 +885,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
_taskList.remove(task);
}
-
+
public ProtocolOutputConverter getProtocolOutputConverter()
{
return _protocolOutputConverter;
@@ -888,7 +906,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public SocketAddress getRemoteAddress()
{
- return _minaProtocolSession.getRemoteAddress();
+ return _networkDriver.getRemoteAddress();
}
public MethodRegistry getMethodRegistry()
@@ -901,23 +919,136 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return _dispatcher;
}
- public ProtocolSessionIdentifier getSessionIdentifier()
+ @Override
+ public void closed()
{
- return _sessionIdentifier;
+ try
+ {
+ closeSession();
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Could not close protocol engine", e);
+ }
}
- public String getClientVersion()
+ @Override
+ public void readerIdle()
{
- return (_clientVersion == null) ? null : _clientVersion.toString();
+ // Nothing
+ }
+
+ @Override
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
+ @Override
+ public void writerIdle()
+ {
+ _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer());
}
- public void setSender(Sender<java.nio.ByteBuffer> sender)
+ @Override
+ public void exception(Throwable throwable)
{
- // No-op, interface munging between this and AMQProtocolSession
+ if (throwable instanceof AMQProtocolHeaderException)
+ {
+
+ writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+ _networkDriver.close();
+
+ _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable);
+ }
+ else if (throwable instanceof IOException)
+ {
+ _logger.error("IOException caught in" + this + ", session closed implictly: " + throwable);
+ }
+ else
+ {
+ _logger.error("Exception caught in" + this + ", closing session explictly: " + throwable, throwable);
+
+
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion());
+ ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
+
+ writeFrame(closeBody.generateFrame(0));
+
+ _networkDriver.close();
+ }
}
+ @Override
public void init()
{
- // No-op, interface munging between this and AMQProtocolSession
+ // Do nothing
+ }
+
+ @Override
+ public void setSender(Sender<ByteBuffer> sender)
+ {
+ // Do nothing
+ }
+
+ @Override
+ public long getReadBytes()
+ {
+ return _readBytes;
+ }
+
+ public long getWrittenBytes()
+ {
+ return _writtenBytes;
+ }
+
+ public long getLastIoTime()
+ {
+ return _lastIoTime;
+ }
+
+ public ProtocolSessionIdentifier getSessionIdentifier()
+ {
+ return _sessionIdentifier;
+ }
+
+ public String getClientVersion()
+ {
+ return (_clientVersion == null) ? null : _clientVersion.toString();
+ }
+
+ /**
+ * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
+ *
+ * @param job The job.
+ * @param event The event to hand off asynchronously.
+ */
+ void fireAsynchEvent(Job job, Event event)
+ {
+
+ job.add(event);
+
+ final ExecutorService pool = _poolReference .getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
+
+ // rather than perform additional checks on pool to check that it hasn't shutdown.
+ // catch the RejectedExecutionException that will result from executing on a shutdown pool
+ if (job.activate())
+ {
+ try
+ {
+ pool.execute(job);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+ }
+
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
new file mode 100644
index 0000000000..ff0c007a60
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
@@ -0,0 +1,29 @@
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriver;
+
+public class AMQProtocolEngineFactory implements ProtocolEngineFactory
+{
+ private VirtualHostRegistry _vhosts;
+
+ public AMQProtocolEngineFactory()
+ {
+ this(1);
+ }
+
+ public AMQProtocolEngineFactory(Integer port)
+ {
+ _vhosts = ApplicationRegistry.getInstance(port).getVirtualHostRegistry();
+ }
+
+
+ public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
+ {
+ return new AMQProtocolEngine(_vhosts, networkDriver);
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index fff406bb3d..b0bef04354 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.security.Principal;
+import java.util.List;
public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
@@ -210,5 +211,19 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
public MethodDispatcher getMethodDispatcher();
public ProtocolSessionIdentifier getSessionIdentifier();
+
+ String getClientVersion();
+
+ long getLastIoTime();
+
+ long getWrittenBytes();
+
+ Long getMaximumNumberOfChannels();
+
+ void setMaximumNumberOfChannels(Long value);
+
+ void commitTransactions(AMQChannel channel) throws AMQException;
+
+ List<AMQChannel> getChannels();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index 81dbeeded2..8497c95e26 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -79,7 +79,7 @@ import org.apache.qpid.server.management.ManagedObject;
@MBeanDescription("Management Bean for an AMQ Broker Connection")
public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection
{
- private AMQMinaProtocolSession _session = null;
+ private AMQProtocolSession _protocolSession = null;
private String _name = null;
// openmbean data types for representing the channel attributes
@@ -92,10 +92,10 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
new AMQShortString("Broker Management Console has closed the connection.");
@MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
- public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException
+ public AMQProtocolSessionMBean(AMQProtocolSession amqProtocolSession) throws NotCompliantMBeanException, OpenDataException
{
super(ManagedConnection.class, ManagedConnection.TYPE, ManagedConnection.VERSION);
- _session = session;
+ _protocolSession = amqProtocolSession;
String remote = getRemoteAddress();
remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
_name = jmxEncode(new StringBuffer(remote), 0).toString();
@@ -128,52 +128,52 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
public String getClientId()
{
- return (_session.getContextKey() == null) ? null : _session.getContextKey().toString();
+ return (_protocolSession.getContextKey() == null) ? null : _protocolSession.getContextKey().toString();
}
public String getAuthorizedId()
{
- return (_session.getAuthorizedID() != null ) ? _session.getAuthorizedID().getName() : null;
+ return (_protocolSession.getAuthorizedID() != null ) ? _protocolSession.getAuthorizedID().getName() : null;
}
public String getVersion()
{
- return (_session.getClientVersion() == null) ? null : _session.getClientVersion().toString();
+ return (_protocolSession.getClientVersion() == null) ? null : _protocolSession.getClientVersion().toString();
}
public Date getLastIoTime()
{
- return new Date(_session.getIOSession().getLastIoTime());
+ return new Date(_protocolSession.getLastIoTime());
}
public String getRemoteAddress()
{
- return _session.getIOSession().getRemoteAddress().toString();
+ return _protocolSession.getRemoteAddress().toString();
}
public ManagedObject getParentObject()
{
- return _session.getVirtualHost().getManagedObject();
+ return _protocolSession.getVirtualHost().getManagedObject();
}
public Long getWrittenBytes()
{
- return _session.getIOSession().getWrittenBytes();
+ return _protocolSession.getWrittenBytes();
}
public Long getReadBytes()
{
- return _session.getIOSession().getReadBytes();
+ return _protocolSession.getWrittenBytes();
}
public Long getMaximumNumberOfChannels()
{
- return _session.getMaximumNumberOfChannels();
+ return _protocolSession.getMaximumNumberOfChannels();
}
public void setMaximumNumberOfChannels(Long value)
{
- _session.setMaximumNumberOfChannels(value);
+ _protocolSession.setMaximumNumberOfChannels(value);
}
public String getObjectInstanceName()
@@ -192,13 +192,13 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
try
{
- AMQChannel channel = _session.getChannel(channelId);
+ AMQChannel channel = _protocolSession.getChannel(channelId);
if (channel == null)
{
throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
}
- _session.commitTransactions(channel);
+ _protocolSession.commitTransactions(channel);
}
catch (AMQException ex)
{
@@ -221,13 +221,13 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
try
{
- AMQChannel channel = _session.getChannel(channelId);
+ AMQChannel channel = _protocolSession.getChannel(channelId);
if (channel == null)
{
throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
}
- _session.rollbackTransactions(channel);
+ _protocolSession.commitTransactions(channel);
}
catch (AMQException ex)
{
@@ -248,7 +248,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
public TabularData channels() throws OpenDataException
{
TabularDataSupport channelsList = new TabularDataSupport(_channelsType);
- List<AMQChannel> list = _session.getChannels();
+ List<AMQChannel> list = _protocolSession.getChannels();
for (AMQChannel channel : list)
{
@@ -274,7 +274,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
public void closeConnection() throws JMException
{
- MethodRegistry methodRegistry = _session.getMethodRegistry();
+ MethodRegistry methodRegistry = _protocolSession.getMethodRegistry();
ConnectionCloseBody responseBody =
methodRegistry.createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
// replyCode
@@ -301,12 +301,12 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
try
{
- _session.writeFrame(responseBody.generateFrame(0));
+ _protocolSession.writeFrame(responseBody.generateFrame(0));
try
{
- _session.closeSession();
+ _protocolSession.closeSession();
}
catch (AMQException ex)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index b6137e83de..9575bfa1ec 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -258,7 +258,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
for (InetSocketAddress bindAddress : _acceptors.keySet())
{
QpidAcceptor acceptor = _acceptors.get(bindAddress);
- acceptor.getIoAcceptor().unbind(bindAddress);
+ acceptor.getNetworkDriver().close();
CurrentActor.get().message(BrokerMessages.BRK_1003(acceptor.toString(), bindAddress.getPort()));
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
index 810be8ae22..3a81932123 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.security.access.plugins.network;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
@@ -32,7 +31,6 @@ import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.ACLPluginFactory;
@@ -180,13 +178,13 @@ public class FirewallPlugin extends AbstractACLPlugin
@Override
public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost)
{
- if (!(session instanceof AMQMinaProtocolSession))
+ SocketAddress sockAddr = session.getRemoteAddress();
+ if (!(sockAddr instanceof InetSocketAddress))
{
- return AuthzResult.ABSTAIN; // We only deal with tcp sessions, which
- // mean MINA right now
+ return AuthzResult.ABSTAIN; // We only deal with tcp sessions
}
- InetAddress addr = getInetAdressFromMinaSession((AMQMinaProtocolSession) session);
+ InetAddress addr = ((InetSocketAddress) sockAddr).getAddress();
if (addr == null)
{
@@ -213,19 +211,6 @@ public class FirewallPlugin extends AbstractACLPlugin
}
- private InetAddress getInetAdressFromMinaSession(AMQMinaProtocolSession session)
- {
- SocketAddress remote = session.getIOSession().getRemoteAddress();
- if (remote instanceof InetSocketAddress)
- {
- return ((InetSocketAddress) remote).getAddress();
- }
- else
- {
- return null;
- }
- }
-
@Override
public void setConfiguration(Configuration config) throws ConfigurationException
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
index 61cc7cdeb6..3ca22b60c8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
@@ -20,21 +20,21 @@
*/
package org.apache.qpid.server.transport;
-import org.apache.mina.common.IoAcceptor;
+import org.apache.qpid.transport.NetworkDriver;
public class QpidAcceptor
{
- IoAcceptor _acceptor;
+ NetworkDriver _driver;
String _protocol;
- public QpidAcceptor(IoAcceptor acceptor, String protocol)
+ public QpidAcceptor(NetworkDriver driver, String protocol)
{
- _acceptor = acceptor;
+ _driver = driver;
_protocol = protocol;
}
- public IoAcceptor getIoAcceptor()
+ public NetworkDriver getNetworkDriver()
{
- return _acceptor;
+ return _driver;
}
public String toString()