From 5cf96f160dd6c390ed4f40927766d61d7120f6c0 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 7 Sep 2011 09:52:21 +0000 Subject: NO-JIRA: Merged from trunk to amqp-1-0 sandbox branch git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1166088 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 23 +- .../main/java/org/apache/qpid/server/Broker.java | 17 +- .../src/main/java/org/apache/qpid/server/Main.java | 2 +- .../qpid/server/connection/ConnectionRegistry.java | 11 +- .../handler/ConnectionCloseMethodHandler.java | 2 + .../qpid/server/protocol/AMQProtocolEngine.java | 146 +++--- .../qpid/server/protocol/AMQProtocolSession.java | 4 +- .../protocol/MultiVersionProtocolEngine.java | 43 +- .../MultiVersionProtocolEngineFactory.java | 6 + .../qpid/server/protocol/ProtocolEngine_0_10.java | 26 +- .../protocol/InternalTestProtocolSession.java | 2 +- qpid/java/build.xml | 2 +- .../qpid/client/AMQConnectionDelegate_0_10.java | 27 +- .../qpid/client/AMQConnectionDelegate_8_0.java | 9 +- .../apache/qpid/client/AMQConnectionFactory.java | 65 ++- .../org/apache/qpid/client/AMQSession_0_10.java | 7 + .../qpid/client/failover/FailoverRetrySupport.java | 4 +- .../client/message/AMQMessageDelegate_0_10.java | 61 ++- .../qpid/client/protocol/AMQProtocolHandler.java | 151 ++---- .../protocol/ProtocolBufferMonitorFilter.java | 115 ----- .../jndi/PropertiesFileInitialContextFactory.java | 21 +- .../apache/qpid/client/protocol/MockIoSession.java | 312 ------------ .../qpid/test/unit/jndi/JNDIPropertyFileTest.java | 19 + .../org/apache/qpid/codec/AMQCodecFactory.java | 17 +- .../java/org/apache/qpid/codec/AMQDecoder.java | 188 +------ .../java/org/apache/qpid/codec/AMQEncoder.java | 66 --- .../apache/qpid/framing/AMQDataBlockDecoder.java | 18 - .../apache/qpid/framing/AMQDataBlockEncoder.java | 18 +- .../org/apache/qpid/framing/ContentHeaderBody.java | 11 + .../src/main/java/org/apache/qpid/pool/Job.java | 253 ---------- .../java/org/apache/qpid/protocol/AMQConstant.java | 2 +- .../org/apache/qpid/protocol/ProtocolEngine.java | 5 + .../qpid/protocol/ProtocolEngineFactory.java | 2 +- .../org/apache/qpid/transport/ClientDelegate.java | 5 +- .../java/org/apache/qpid/transport/Connection.java | 42 +- .../qpid/transport/SenderClosedException.java | 52 ++ .../qpid/transport/network/NetworkConnection.java | 2 + .../qpid/transport/network/NetworkTransport.java | 2 - .../network/OutgoingNetworkTransport.java | 2 + .../apache/qpid/transport/network/Transport.java | 9 +- .../transport/network/io/IoNetworkConnection.java | 8 +- .../transport/network/io/IoNetworkTransport.java | 146 +++++- .../qpid/transport/network/io/IoReceiver.java | 19 +- .../apache/qpid/transport/network/io/IoSender.java | 13 +- .../network/mina/MinaNetworkConnection.java | 81 ---- .../transport/network/mina/MinaNetworkHandler.java | 157 ------ .../network/mina/MinaNetworkTransport.java | 221 --------- .../qpid/transport/network/mina/MinaSender.java | 79 --- .../transport/network/security/SecurityLayer.java | 147 +----- .../network/security/SecurityLayerFactory.java | 161 ++++++ .../network/security/ssl/SSLReceiver.java | 11 +- .../transport/network/security/ssl/SSLSender.java | 13 +- .../java/org/apache/qpid/session/TestSession.java | 277 ----------- .../qpid/transport/TestNetworkConnection.java | 22 +- .../qpid/transport/network/TransportTest.java | 5 +- .../network/mina/MinaNetworkHandlerTest.java | 540 --------------------- .../qpid/server/failover/FailoverMethodTest.java | 3 +- .../qpid/server/security/acl/ExternalACLTest.java | 42 +- .../qpid/test/client/QueueBrowserAutoAckTest.java | 4 +- .../client/connection/ConnectionFactoryTest.java | 41 ++ .../client/connection/ExceptionListenerTest.java | 6 +- .../client/protocol/AMQProtocolSessionTest.java | 1 - .../qpid/test/utils/protocol/TestIoSession.java | 104 ---- qpid/java/test-profiles/CPPExcludes | 12 + qpid/java/test-profiles/Excludes | 2 + qpid/java/test-profiles/Java010Excludes | 11 + 66 files changed, 968 insertions(+), 2927 deletions(-) delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java delete mode 100644 qpid/java/client/src/test/java/org/apache/qpid/client/protocol/MockIoSession.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/transport/SenderClosedException.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java delete mode 100644 qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java delete mode 100644 qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java delete mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/test/utils/protocol/TestIoSession.java diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 7bba540bd7..bb29592433 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -76,6 +76,7 @@ import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.TransportException; import java.util.ArrayList; import java.util.Collection; @@ -207,7 +208,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; } - + private void incrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -386,6 +387,13 @@ public class AMQChannel implements SessionConfig, AMQSessionModel _currentMessage = null; throw e; } + catch (RuntimeException e) + { + // we want to make sure we don't keep a reference to the message in the + // event of an error + _currentMessage = null; + throw e; + } } protected void routeCurrentMessage() throws AMQException @@ -457,6 +465,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel _tag2SubscriptionMap.remove(tag); throw e; } + catch (RuntimeException e) + { + _tag2SubscriptionMap.remove(tag); + throw e; + } return tag; } @@ -514,7 +527,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } catch (AMQException e) { - _logger.error("Caught AMQException whilst attempting to reque:" + e); + _logger.error("Caught AMQException whilst attempting to requeue:" + e); + } + catch (TransportException e) + { + _logger.error("Caught TransportException whilst attempting to requeue:" + e); } getConfigStore().removeConfiguredObject(this); @@ -1432,7 +1449,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { _session.mgmtCloseChannel(_channelId); } - + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException { if (inTransaction()) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java index e29b07b3e7..3abdf47688 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java @@ -57,7 +57,6 @@ import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.network.IncomingNetworkTransport; import org.apache.qpid.transport.network.Transport; -import org.apache.qpid.transport.network.mina.MinaNetworkTransport; public class Broker { @@ -88,7 +87,7 @@ public class Broker startup(new BrokerOptions()); } - public void startup(BrokerOptions options) throws Exception + public void startup(final BrokerOptions options) throws Exception { try { @@ -201,9 +200,9 @@ public class Broker { for(int port : ports) { - final Set supported = + final Set supported = getSupportedVersions(port, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8); - final NetworkTransportConfiguration settings = + final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP); final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); @@ -226,12 +225,12 @@ public class Broker for(int sslPort : sslPorts) { - final Set supported = + final Set supported = getSupportedVersions(sslPort, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8); - final NetworkTransportConfiguration settings = + final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(serverConfig, sslPort, bindAddress.getHostName(), Transport.TCP); - final IncomingNetworkTransport transport = new MinaNetworkTransport(); + final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory(hostName, supported); @@ -252,7 +251,7 @@ public class Broker } private static Set getSupportedVersions(final int port, final Set exclude_0_10, - final Set exclude_0_9_1, final Set exclude_0_9, + final Set exclude_0_9_1, final Set exclude_0_9, final Set exclude_0_8) { final EnumSet supported = EnumSet.allOf(AmqpProtocolVersion.class); @@ -276,7 +275,7 @@ public class Broker return supported; } - + private File getConfigFile(final String fileName, final String defaultFileName, final String qpidHome, boolean throwOnFileNotFound) throws InitException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index 449b52d737..15d23e30b6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -63,7 +63,7 @@ public class Main { execute(); } - catch(Exception e) + catch(Throwable e) { System.err.println("Exception during startup: " + e); e.printStackTrace(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index 3786c2020c..1c01ce465d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -29,6 +29,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.common.Closeable; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQProtocolEngine; +import org.apache.qpid.transport.TransportException; public class ConnectionRegistry implements IConnectionRegistry, Closeable { @@ -44,19 +46,24 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable /** Close all of the currently open connections. */ public void close() { + _logger.debug("Closing connection registry :" + _registry.size() + " connections."); while (!_registry.isEmpty()) { AMQConnectionModel connection = _registry.get(0); - closeConnection(connection, AMQConstant.INTERNAL_ERROR, "Broker is shutting down"); + closeConnection(connection, AMQConstant.CONNECTION_FORCED, "Broker is shutting down"); } } - + public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message) { try { connection.close(cause, message); } + catch (TransportException e) + { + _logger.warn("Error closing connection:" + e.getMessage()); + } catch (AMQException e) { _logger.warn("Error closing connection:" + e.getMessage()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java index dade5d5f54..f8e4eab0b6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java @@ -68,5 +68,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener _sender; + private NetworkConnection _network; + private Sender _sender; public ManagedObject getManagedObject() { @@ -184,11 +180,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { _stateManager = new AMQStateManager(virtualHostRegistry, this); _codecFactory = new AMQCodecFactory(true, this); - _poolReference.acquireExecutorService(); - _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); - _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); - _network = network; - _sender = _network.getSender(); + + setNetworkConnection(network); _sessionID = connectionId; _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); @@ -204,6 +197,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr initialiseStatistics(); } + public void setNetworkConnection(NetworkConnection network) + { + setNetworkConnection(network, network.getSender()); + } + + public void setNetworkConnection(NetworkConnection network, Sender sender) + { + _network = network; + _sender = sender; + } + private AMQProtocolSessionMBean createMBean() throws JMException { return new AMQProtocolSessionMBean(this); @@ -240,26 +244,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr try { final ArrayList dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() + for (AMQDataBlock dataBlock : dataBlocks) { - public void run() + try { - // Decode buffer - - for (AMQDataBlock dataBlock : dataBlocks) - { - try - { - dataBlockReceived(dataBlock); - } - catch (Exception e) - { - _logger.error("Unexpected exception when processing datablock", e); - closeProtocolSession(); - } - } + dataBlockReceived(dataBlock); + } + catch (Exception e) + { + _logger.error("Unexpected exception when processing datablock", e); + closeProtocolSession(); } - }); + } } catch (Exception e) { @@ -337,6 +333,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr closeChannel(channelId); throw e; } + catch (TransportException e) + { + closeChannel(channelId); + throw e; + } } finally { @@ -368,6 +369,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr mechanisms.getBytes(), locales.getBytes()); _sender.send(responseBody.generateFrame(0).toNioByteBuffer()); + _sender.flush(); } catch (AMQException e) @@ -375,6 +377,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); _sender.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); + _sender.flush(); } } @@ -430,19 +433,19 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr AMQConstant.CHANNEL_ERROR.getName().toString()); _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, ce, false); + closeConnection(channelId, ce); } } catch (AMQConnectionException e) { _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, e, false); + closeConnection(channelId, e); } catch (AMQSecurityException e) { AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, ce, false); + closeConnection(channelId, ce); } } catch (Exception e) @@ -485,19 +488,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr * * @param frame the frame to write */ - public void writeFrame(AMQDataBlock frame) + public synchronized void writeFrame(AMQDataBlock frame) { _lastSent = frame; final ByteBuffer buf = frame.toNioByteBuffer(); _lastIoTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); - Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable() - { - public void run() - { - _sender.send(buf); - } - }); + _sender.send(buf); + _sender.flush(); } public AMQShortString getContextKey() @@ -729,7 +727,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } closeAllChannels(); - + getConfigStore().removeConfiguredObject(this); if (_managedObject != null) @@ -749,7 +747,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _closed = true; notifyAll(); } - _poolReference.releaseExecutorService(); CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE()); } } @@ -772,27 +769,32 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } } - public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException + public void closeConnection(int channelId, AMQConnectionException e) throws AMQException { - if (_logger.isInfoEnabled()) + try { - _logger.info("Closing connection due to: " + e); - } - - markChannelAwaitingCloseOk(channelId); - closeSession(); - _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(e.getCloseFrame(channelId)); + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e); + } - if (closeProtocolSession) + markChannelAwaitingCloseOk(channelId); + closeSession(); + _stateManager.changeState(AMQState.CONNECTION_CLOSING); + writeFrame(e.getCloseFrame(channelId)); + } + finally { closeProtocolSession(); } + + } public void closeProtocolSession() { - _sender.close(); + _network.close(); + try { _stateManager.changeState(AMQState.CONNECTION_CLOSED); @@ -801,6 +803,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { _logger.info(e.getMessage()); } + catch (TransportException e) + { + _logger.info(e.getMessage()); + } } public String toString() @@ -923,7 +929,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _virtualHost = virtualHost; _virtualHost.getConnectionRegistry().registerConnection(this); - + _configStore.addConfiguredObject(this); try @@ -960,12 +966,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } _authorizedSubject = authorizedSubject; } - + public Subject getAuthorizedSubject() { return _authorizedSubject; } - + public Principal getAuthorizedPrincipal() { return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject); @@ -1001,6 +1007,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { _logger.error("Could not close protocol engine", e); } + catch (TransportException e) + { + _logger.error("Could not close protocol engine", e); + } } public void readerIdle() @@ -1154,7 +1164,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { return false; } - + public void mgmtClose() { MethodRegistry methodRegistry = getMethodRegistry(); @@ -1256,7 +1266,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr new AMQShortString(message), 0,0); - writeFrame(responseBody.generateFrame((Integer)session.getID())); + writeFrame(responseBody.generateFrame((Integer)session.getID())); } public void close(AMQConstant cause, String message) throws AMQException @@ -1264,12 +1274,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr closeConnection(0, new AMQConnectionException(cause, message, 0, 0, getProtocolOutputConverter().getProtocolMajorVersion(), getProtocolOutputConverter().getProtocolMinorVersion(), - (Throwable) null), true); + (Throwable) null)); } public List getSessionModels() { - List sessions = new ArrayList(); + List sessions = new ArrayList(); for (AMQChannel channel : getChannels()) { sessions.add((AMQSessionModel) channel); @@ -1301,27 +1311,27 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } _virtualHost.registerMessageReceived(messageSize, timestamp); } - + public StatisticsCounter getMessageReceiptStatistics() { return _messagesReceived; } - + public StatisticsCounter getDataReceiptStatistics() { return _dataReceived; } - + public StatisticsCounter getMessageDeliveryStatistics() { return _messagesDelivered; } - + public StatisticsCounter getDataDeliveryStatistics() { return _dataDelivered; } - + public void resetStatistics() { _messagesDelivered.reset(); @@ -1334,7 +1344,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled()); - + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID()); _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 9116bf2767..c1b5b02f8f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -163,8 +163,10 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth /** This must be called when the session is _closed in order to free up any resources managed by the session. */ void closeSession() throws AMQException; + void closeProtocolSession(); + /** This must be called to close the session in order to free up any resources managed by the session. */ - void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException; + void closeConnection(int channelId, AMQConnectionException e) throws AMQException; /** @return a key that uniquely identifies this session */ diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index e1287443df..3852253058 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -44,7 +44,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private IApplicationRegistry _appRegistry; private NetworkConnection _network; private Sender _sender; - + private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine(); public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, @@ -52,15 +52,24 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine Set supported, NetworkConnection network, long id) + { + this(appRegistry,fqdn,supported,id); + setNetworkConnection(network); + } + + public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, + String fqdn, + Set supported, + long id) { _id = id; _appRegistry = appRegistry; _fqdn = fqdn; _supported = supported; - _network = network; - _sender = _network.getSender(); + } + public SocketAddress getRemoteAddress() { return _delegate.getRemoteAddress(); @@ -96,6 +105,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _delegate.readerIdle(); } + public void received(ByteBuffer msg) { _delegate.received(msg); @@ -180,6 +190,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine (byte) 0 }; + public void setNetworkConnection(NetworkConnection networkConnection) + { + setNetworkConnection(networkConnection, networkConnection.getSender()); + } + + public void setNetworkConnection(NetworkConnection network, Sender sender) + { + _network = network; + _sender = sender; + } + + private static interface DelegateCreator { AmqpProtocolVersion getVersion(); @@ -365,6 +387,11 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } + public void setNetworkConnection(NetworkConnection network, Sender sender) + { + + } + public long getConnectionId() { return _id; @@ -443,14 +470,19 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine if(newDelegate == null) { _sender.send(ByteBuffer.wrap(newestSupported)); + _sender.flush(); _delegate = new ClosedDelegateProtocolEngine(); + + _network.close(); + } else { _delegate = newDelegate; _header.flip(); + _delegate.setNetworkConnection(_network, _sender); _delegate.received(_header); if(msg.hasRemaining()) { @@ -486,5 +518,10 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine { } + + public void setNetworkConnection(NetworkConnection network, Sender sender) + { + + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index 8a7159bdc2..7e327b221f 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -48,4 +48,10 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory { return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement()); } + + public ServerProtocolEngine newProtocolEngine() + { + return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, ID_GENERATOR.getAndIncrement()); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java index 3ff650a480..48a8a1bf42 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; @@ -31,6 +32,7 @@ import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.registry.IApplicationRegistry; import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.util.UUID; public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine, ConnectionConfig @@ -52,11 +54,16 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol super(new Assembler(conn)); _connection = conn; _connection.setConnectionConfig(this); - _network = network; + _id = appRegistry.getConfigStore().createId(); _appRegistry = appRegistry; - _connection.setSender(new Disassembler(_network.getSender(), MAX_FRAME_SIZE)); + if(network != null) + { + setNetworkConnection(network); + } + + _connection.onOpen(new Runnable() { public void run() @@ -65,6 +72,19 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } }); + } + + public void setNetworkConnection(NetworkConnection network) + { + setNetworkConnection(network, network.getSender()); + } + + public void setNetworkConnection(NetworkConnection network, Sender sender) + { + _network = network; + + _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE)); + // FIXME Two log messages to maintain compatibility with earlier protocol versions _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false)); _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true)); @@ -186,7 +206,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol { return false; } - + public void mgmtClose() { _connection.mgmtClose(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index c058d0b0d8..5a411c6807 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -196,7 +196,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr return _closed; } - public void closeProtocolSession(boolean waitLast) + public void closeProtocolSession() { // Override as we don't have a real IOSession to close. // The alternative is to fully implement the TestIOSession to return a CloseFuture from close(); diff --git a/qpid/java/build.xml b/qpid/java/build.xml index 4761bad877..ff2cfd0e9e 100644 --- a/qpid/java/build.xml +++ b/qpid/java/build.xml @@ -77,7 +77,7 @@ - + diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 6b9cf909a8..6dfbb969e7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -1,6 +1,6 @@ package org.apache.qpid.client; /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -8,16 +8,16 @@ package org.apache.qpid.client; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ @@ -43,6 +43,7 @@ import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionClose; +import org.apache.qpid.transport.ConnectionCloseCode; import org.apache.qpid.transport.ConnectionException; import org.apache.qpid.transport.ConnectionListener; import org.apache.qpid.transport.ConnectionSettings; @@ -58,7 +59,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec * This class logger. */ private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class); - + /** * The name of the UUID property */ @@ -273,10 +274,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } ConnectionClose close = exc.getClose(); - if (close == null) + if (close == null || close.getReplyCode() == ConnectionCloseCode.CONNECTION_FORCED) { _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1)); - + try { if (_conn.firePreFailover(false) && _conn.attemptReconnection()) @@ -345,12 +346,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { return ProtocolVersion.v0_10; } - + public String getUUID() { - return (String)_qpidConnection.getServerProperties().get(UUID_NAME); + return (String)_qpidConnection.getServerProperties().get(UUID_NAME); } - + private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail) { ConnectionSettings conSettings = brokerDetail.buildConnectionSettings(); @@ -358,7 +359,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec conSettings.setVhost(_conn.getVirtualHost()); conSettings.setUsername(_conn.getUsername()); conSettings.setPassword(_conn.getPassword()); - + // Pass client name from connection URL Map clientProps = new HashMap(); try @@ -375,7 +376,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return conSettings; } - + // The idle_timeout prop is in milisecs while // the new heartbeat prop is in secs private int getHeartbeatInterval(BrokerDetails brokerDetail) @@ -390,7 +391,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT)); } - else if (Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME) != null) + else if (Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME) != null) { heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000; _logger.warn("JVM arg -Didle_timeout= is deprecated, please use -Dqpid.heartbeat="); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 8bc9889050..948f5178a6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.ConnectException; import java.nio.channels.UnresolvedAddressException; import java.security.GeneralSecurityException; +import java.security.Security; import java.text.MessageFormat; import java.util.ArrayList; import java.util.EnumSet; @@ -55,6 +56,8 @@ import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.OutgoingNetworkTransport; import org.apache.qpid.transport.network.Transport; +import org.apache.qpid.transport.network.security.SecurityLayer; +import org.apache.qpid.transport.network.security.SecurityLayerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,9 +121,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } } + SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings); + OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion()); - NetworkConnection network = transport.connect(settings, _conn._protocolHandler, sslContext); - _conn._protocolHandler.setNetworkConnection(network); + NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn._protocolHandler), sslContext); + _conn._protocolHandler.setNetworkConnection(network, securityLayer.sender(network.getSender())); _conn._protocolHandler.getProtocolSession().init(); // this blocks until the connection has been set up or when an error // has prevented the connection being set up diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java index fc2d6c94eb..f0c003e02a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java @@ -113,27 +113,35 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF public Connection createConnection(String userName, String password, String id) throws JMSException { - try + if (_connectionDetails != null) { - _connectionDetails.setUsername(userName); - _connectionDetails.setPassword(password); - - if (id != null && !id.equals("")) + try { - _connectionDetails.setClientName(id); - } - else if (_connectionDetails.getClientName() == null || _connectionDetails.getClientName().equals("")) + ConnectionURL connectionDetails = new AMQConnectionURL(_connectionDetails.toString()); + connectionDetails.setUsername(userName); + connectionDetails.setPassword(password); + + if (id != null && !id.equals("")) + { + connectionDetails.setClientName(id); + } + else if (connectionDetails.getClientName() == null || connectionDetails.getClientName().equals("")) + { + connectionDetails.setClientName(getUniqueClientID()); + } + return new AMQConnection(connectionDetails); + } + catch (Exception e) { - _connectionDetails.setClientName(getUniqueClientID()); + JMSException jmse = new JMSException("Error creating connection: " + e.getMessage()); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; } - return new AMQConnection(_connectionDetails); } - catch (Exception e) + else { - JMSException jmse = new JMSException("Error creating connection: " + e.getMessage()); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; + throw new JMSException("The connection factory wasn't created with a proper URL, the connection details are empty"); } } @@ -285,19 +293,30 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF { if (_connectionDetails != null) { - _connectionDetails.setUsername(username); - _connectionDetails.setPassword(password); - - if (_connectionDetails.getClientName() == null || _connectionDetails.getClientName().equals("")) + try { - _connectionDetails.setClientName(getUniqueClientID()); + ConnectionURL connectionDetails = new AMQConnectionURL(_connectionDetails.toString()); + connectionDetails.setUsername(username); + connectionDetails.setPassword(password); + + if (connectionDetails.getClientName() == null || connectionDetails.getClientName().equals("")) + { + connectionDetails.setClientName(getUniqueClientID()); + } + return new XAConnectionImpl(connectionDetails); + } + catch (Exception e) + { + JMSException jmse = new JMSException("Error creating XA Connection: " + e.getMessage()); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; } } else { - throw new JMSException("A URL must be specified to access XA connections"); - } - return createXAConnection(); + throw new JMSException("The connection factory wasn't created with a proper URL, the connection details are empty"); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 75d96d67af..b5868cd235 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1007,6 +1007,13 @@ public class AMQSession_0_10 extends AMQSession Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception. * * - * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see - * {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary + * @todo Another continuation. Could use an interface Continuation (as described in other todos) + * Then have a wrapping continuation (this), which blocks on an arbitrary * Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation. * Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type * to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index 182b7b65d8..1c2c46cf51 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -47,6 +47,7 @@ import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.client.CustomJMSXProperty; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.Message; +import org.apache.qpid.messaging.Address; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.ExchangeQueryResult; import org.apache.qpid.transport.Future; @@ -55,6 +56,8 @@ import org.apache.qpid.transport.MessageDeliveryMode; import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.ReplyTo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This extends AbstractAMQMessageDelegate which contains common code between @@ -63,6 +66,7 @@ import org.apache.qpid.transport.ReplyTo; */ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate { + private static final Logger _logger = LoggerFactory.getLogger(AMQMessageDelegate_0_10.class); private static final Map> _destinationCache = Collections.synchronizedMap(new HashMap>()); public static final String JMS_TYPE = "x-jms-type"; @@ -95,8 +99,22 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate AMQDestination dest; - dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()), + if (AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL) + { + dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()), new AMQShortString(_deliveryProps.getRoutingKey())); + } + else + { + String subject = null; + if (messageProps != null && messageProps.getApplicationHeaders() != null) + { + subject = (String)messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT); + } + dest = (AMQDestination) convertToAddressBasedDestination(_deliveryProps.getExchange(), + _deliveryProps.getRoutingKey(), subject); + } + setJMSDestination(dest); } @@ -242,13 +260,50 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate String exchange = replyTo.getExchange(); String routingKey = replyTo.getRoutingKey(); - dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey)); + if (AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL) + { + + dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey)); + } + else + { + dest = convertToAddressBasedDestination(exchange,routingKey,null); + } _destinationCache.put(replyTo, new SoftReference(dest)); } return dest; } } + + private Destination convertToAddressBasedDestination(String exchange, String routingKey, String subject) + { + String addr; + if ("".equals(exchange)) // type Queue + { + subject = (subject == null) ? "" : "/" + subject; + addr = routingKey + subject; + } + else + { + addr = exchange + "/" + routingKey; + } + + try + { + return AMQDestination.createDestination("ADDR:" + addr.toString()); + } + catch(Exception e) + { + // An exception is only thrown here if the address syntax is invalid. + // Logging the exception, but not throwing as this is only important to Qpid developers. + // An exception here means a bug in the code. + _logger.error("Exception when constructing an address string from the ReplyTo struct"); + + // falling back to the old way of doing it to ensure the application continues. + return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey)); + } + } public void setJMSReplyTo(Destination destination) throws JMSException { @@ -337,7 +392,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate Destination replyTo = getJMSReplyTo(); if(replyTo != null) { - return ((AMQDestination)replyTo).toURL(); + return ((AMQDestination)replyTo).toString(); } else { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 34c6468629..6d6cd9cae5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -31,7 +31,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import org.apache.mina.filter.codec.ProtocolCodecException; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -57,8 +56,6 @@ import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.pool.Job; -import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; @@ -169,14 +166,12 @@ public class AMQProtocolHandler implements ProtocolEngine /** Object to lock on when changing the latch */ private Object _failoverLatchChange = new Object(); private AMQCodecFactory _codecFactory; - private Job _readJob; - private Job _writeJob; - private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); + private ProtocolVersion _suggestedProtocolVersion; private long _writtenBytes; private long _readBytes; - private NetworkTransport _transport; + private NetworkConnection _network; private Sender _sender; @@ -191,24 +186,6 @@ public class AMQProtocolHandler implements ProtocolEngine _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); _codecFactory = new AMQCodecFactory(false, _protocolSession); - _poolReference.setThreadFactory(new ThreadFactory() - { - - public Thread newThread(final Runnable runnable) - { - try - { - return Threading.getThreadFactory().createThread(runnable); - } - catch (Exception e) - { - throw new RuntimeException("Failed to create thread", e); - } - } - }); - _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); - _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); - _poolReference.acquireExecutorService(); _failoverHandler = new FailoverHandler(this); } @@ -329,17 +306,7 @@ public class AMQProtocolHandler implements ProtocolEngine } else { - - if (cause instanceof ProtocolCodecException) - { - _logger.info("Protocol Exception caught NOT going to attempt failover as " + - "cause isn't AMQConnectionClosedException: " + cause, cause); - - AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); - propagateExceptionToAllWaiters(amqe); - } _connection.exceptionReceived(cause); - } // FIXME Need to correctly handle other exceptions. Things like ... @@ -433,76 +400,63 @@ public class AMQProtocolHandler implements ProtocolEngine public void received(ByteBuffer msg) { + _readBytes += msg.remaining(); try { - _readBytes += msg.remaining(); final ArrayList dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() + // Decode buffer + + for (AMQDataBlock message : dataBlocks) { - public void run() - { - // Decode buffer + if (PROTOCOL_DEBUG) + { + _protocolLogger.info(String.format("RECV: [%s] %s", this, message)); + } - for (AMQDataBlock message : dataBlocks) + if(message instanceof AMQFrame) { + final boolean debug = _logger.isDebugEnabled(); + final long msgNumber = ++_messageReceivedCount; - try - { - if (PROTOCOL_DEBUG) - { - _protocolLogger.info(String.format("RECV: [%s] %s", this, message)); - } - - if(message instanceof AMQFrame) - { - final boolean debug = _logger.isDebugEnabled(); - final long msgNumber = ++_messageReceivedCount; - - if (debug && ((msgNumber % 1000) == 0)) - { - _logger.debug("Received " + _messageReceivedCount + " protocol messages"); - } - - AMQFrame frame = (AMQFrame) message; - - final AMQBody bodyFrame = frame.getBodyFrame(); - - HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - - bodyFrame.handle(frame.getChannel(), _protocolSession); - - _connection.bytesReceived(_readBytes); - } - else if (message instanceof ProtocolInitiation) - { - // We get here if the server sends a response to our initial protocol header - // suggesting an alternate ProtocolVersion; the server will then close the - // connection. - ProtocolInitiation protocolInit = (ProtocolInitiation) message; - _suggestedProtocolVersion = protocolInit.checkVersion(); - _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion); - - // get round a bug in old versions of qpid whereby the connection is not closed - _stateManager.changeState(AMQState.CONNECTION_CLOSED); - } - } - catch (Exception e) + if (debug && ((msgNumber % 1000) == 0)) { - _logger.error("Exception processing frame", e); - propagateExceptionToFrameListeners(e); - exception(e); + _logger.debug("Received " + _messageReceivedCount + " protocol messages"); } + + AMQFrame frame = (AMQFrame) message; + + final AMQBody bodyFrame = frame.getBodyFrame(); + + HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); + + bodyFrame.handle(frame.getChannel(), _protocolSession); + + _connection.bytesReceived(_readBytes); + } + else if (message instanceof ProtocolInitiation) + { + // We get here if the server sends a response to our initial protocol header + // suggesting an alternate ProtocolVersion; the server will then close the + // connection. + ProtocolInitiation protocolInit = (ProtocolInitiation) message; + _suggestedProtocolVersion = protocolInit.checkVersion(); + _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion); + + // get round a bug in old versions of qpid whereby the connection is not closed + _stateManager.changeState(AMQState.CONNECTION_CLOSED); } } - }); } catch (Exception e) { + _logger.error("Exception processing frame", e); propagateExceptionToFrameListeners(e); exception(e); } + + } public void methodBodyReceived(final int channelId, final AMQBody bodyFrame) @@ -568,17 +522,13 @@ public class AMQProtocolHandler implements ProtocolEngine writeFrame(frame, false); } - public void writeFrame(AMQDataBlock frame, boolean wait) + public synchronized void writeFrame(AMQDataBlock frame, boolean wait) { final ByteBuffer buf = frame.toNioByteBuffer(); _writtenBytes += buf.remaining(); - Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable() - { - public void run() - { - _sender.send(buf); - } - }); + _sender.send(buf); + _sender.flush(); + if (PROTOCOL_DEBUG) { _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame)); @@ -595,10 +545,6 @@ public class AMQProtocolHandler implements ProtocolEngine _connection.bytesSent(_writtenBytes); - if (wait) - { - _sender.flush(); - } } /** @@ -723,7 +669,7 @@ public class AMQProtocolHandler implements ProtocolEngine _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); } } - _poolReference.releaseExecutorService(); + } /** @return the number of bytes read from this protocol session */ @@ -840,9 +786,14 @@ public class AMQProtocolHandler implements ProtocolEngine } public void setNetworkConnection(NetworkConnection network) + { + setNetworkConnection(network, network.getSender()); + } + + public void setNetworkConnection(NetworkConnection network, Sender sender) { _network = network; - _sender = network.getSender(); + _sender = sender; } /** @param delay delay in seconds (not ms) */ diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java deleted file mode 100644 index bbd0a7b144..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import org.apache.mina.common.IoFilterAdapter; -import org.apache.mina.common.IoSession; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A MINA filter that monitors the numbers of messages pending to be sent by MINA. It outputs a message - * when a threshold has been exceeded, and has a frequency configuration so that messages are not output - * too often. - * - */ -public class ProtocolBufferMonitorFilter extends IoFilterAdapter -{ - private static final Logger _logger = LoggerFactory.getLogger(ProtocolBufferMonitorFilter.class); - - public static final long DEFAULT_FREQUENCY = 5000; - - public static final int DEFAULT_THRESHOLD = 3000; - - private int _bufferedMessages = 0; - - private int _threshold; - - private long _lastMessageOutputTime; - - private long _outputFrequencyInMillis; - - public ProtocolBufferMonitorFilter() - { - _threshold = DEFAULT_THRESHOLD; - _outputFrequencyInMillis = DEFAULT_FREQUENCY; - } - - public ProtocolBufferMonitorFilter(int threshold, long frequency) - { - _threshold = threshold; - _outputFrequencyInMillis = frequency; - } - - public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception - { - _bufferedMessages++; - if (_bufferedMessages > _threshold) - { - long now = System.currentTimeMillis(); - if ((now - _lastMessageOutputTime) > _outputFrequencyInMillis) - { - _logger.warn("Protocol message buffer exceeded threshold of " + _threshold + ". Current backlog: " - + _bufferedMessages); - _lastMessageOutputTime = now; - } - } - - nextFilter.messageReceived(session, message); - } - - public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception - { - _bufferedMessages--; - nextFilter.messageSent(session, message); - } - - public int getBufferedMessages() - { - return _bufferedMessages; - } - - public int getThreshold() - { - return _threshold; - } - - public void setThreshold(int threshold) - { - _threshold = threshold; - } - - public long getOutputFrequencyInMillis() - { - return _outputFrequencyInMillis; - } - - public void setOutputFrequencyInMillis(long outputFrequencyInMillis) - { - _outputFrequencyInMillis = outputFrequencyInMillis; - } - - public long getLastMessageOutputTime() - { - return _lastMessageOutputTime; - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index fec5af55c1..b480f56c07 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -36,6 +36,7 @@ import javax.jms.Queue; import javax.jms.Topic; import javax.naming.Context; import javax.naming.NamingException; +import javax.naming.ConfigurationException; import javax.naming.spi.InitialContextFactory; import org.apache.qpid.client.AMQConnectionFactory; @@ -139,7 +140,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor return new ReadOnlyContext(environment, data); } - protected void createConnectionFactories(Map data, Hashtable environment) + protected void createConnectionFactories(Map data, Hashtable environment) throws ConfigurationException { for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) { @@ -157,7 +158,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor } } - protected void createDestinations(Map data, Hashtable environment) + protected void createDestinations(Map data, Hashtable environment) throws ConfigurationException { for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) { @@ -225,7 +226,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor /** * Factory method to create new Connection Factory instances */ - protected ConnectionFactory createFactory(String url) + protected ConnectionFactory createFactory(String url) throws ConfigurationException { try { @@ -233,16 +234,18 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor } catch (URLSyntaxException urlse) { - _logger.warn("Unable to createFactories:" + urlse); - } + _logger.warn("Unable to create factory:" + urlse); - return null; + ConfigurationException ex = new ConfigurationException("Failed to parse entry: " + urlse + " due to : " + urlse.getMessage()); + ex.initCause(urlse); + throw ex; + } } /** * Factory method to create new Destination instances from an AMQP BindingURL */ - protected Destination createDestination(String str) + protected Destination createDestination(String str) throws ConfigurationException { try { @@ -252,7 +255,9 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor { _logger.warn("Unable to create destination:" + e, e); - return null; + ConfigurationException ex = new ConfigurationException("Failed to parse entry: " + str + " due to : " + e.getMessage()); + ex.initCause(e); + throw ex; } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/MockIoSession.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/MockIoSession.java deleted file mode 100644 index f0938a4bc0..0000000000 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/MockIoSession.java +++ /dev/null @@ -1,312 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import org.apache.mina.common.*; -import org.apache.mina.common.support.DefaultCloseFuture; -import org.apache.mina.common.support.DefaultWriteFuture; -import org.apache.mina.common.support.AbstractIoFilterChain; -import org.apache.qpid.client.protocol.AMQProtocolSession; - -import java.net.SocketAddress; -import java.net.InetSocketAddress; -import java.util.Set; - -public class MockIoSession implements IoSession -{ - private AMQProtocolSession _protocolSession; - - /** - * Stores the last response written - */ - private Object _lastWrittenObject; - - private boolean _closing; - private IoFilterChain _filterChain; - - public MockIoSession() - { - _filterChain = new AbstractIoFilterChain(this) - { - protected void doWrite(IoSession ioSession, IoFilter.WriteRequest writeRequest) throws Exception - { - - } - - protected void doClose(IoSession ioSession) throws Exception - { - - } - }; - } - - public Object getLastWrittenObject() - { - return _lastWrittenObject; - } - - public IoService getService() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public IoServiceConfig getServiceConfig() - { - return null; - } - - public IoHandler getHandler() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public IoSessionConfig getConfig() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public IoFilterChain getFilterChain() - { - return _filterChain; - } - - public WriteFuture write(Object message) - { - WriteFuture wf = new DefaultWriteFuture(null); - _lastWrittenObject = message; - return wf; - } - - public CloseFuture close() - { - _closing = true; - CloseFuture cf = new DefaultCloseFuture(null); - cf.setClosed(); - return cf; - } - - public Object getAttachment() - { - return _protocolSession; - } - - public Object setAttachment(Object attachment) - { - Object current = _protocolSession; - _protocolSession = (AMQProtocolSession) attachment; - return current; - } - - public Object getAttribute(String key) - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public Object setAttribute(String key, Object value) - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public Object setAttribute(String key) - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public Object removeAttribute(String key) - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean containsAttribute(String key) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public Set getAttributeKeys() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public TransportType getTransportType() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isConnected() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isClosing() - { - return _closing; - } - - public CloseFuture getCloseFuture() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public SocketAddress getRemoteAddress() - { - return new InetSocketAddress("127.0.0.1", 1234); //To change body of implemented methods use File | Settings | File Templates. - } - - public SocketAddress getLocalAddress() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public SocketAddress getServiceAddress() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public int getIdleTime(IdleStatus status) - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getIdleTimeInMillis(IdleStatus status) - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setIdleTime(IdleStatus status, int idleTime) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public int getWriteTimeout() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getWriteTimeoutInMillis() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setWriteTimeout(int writeTimeout) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public TrafficMask getTrafficMask() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setTrafficMask(TrafficMask trafficMask) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void suspendRead() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void suspendWrite() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void resumeRead() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void resumeWrite() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public long getReadBytes() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getWrittenBytes() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getReadMessages() - { - return 0L; - } - - public long getWrittenMessages() - { - return 0L; - } - - public long getWrittenWriteRequests() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public int getScheduledWriteRequests() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public int getScheduledWriteBytes() - { - return 0; //TODO - } - - public long getCreationTime() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getLastIoTime() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getLastReadTime() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getLastWriteTime() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isIdle(IdleStatus status) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public int getIdleCount(IdleStatus status) - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getLastIdleTime(IdleStatus status) - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } -} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java index a1b14d5723..2052312f54 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java @@ -24,6 +24,7 @@ import java.util.Properties; import javax.jms.Queue; import javax.jms.Topic; +import javax.naming.ConfigurationException; import javax.naming.Context; import javax.naming.InitialContext; @@ -67,4 +68,22 @@ public class JNDIPropertyFileTest extends TestCase assertEquals("Topic" + i + "WithSpace",bindingKey.asString()); } } + + public void testConfigurationErrors() throws Exception + { + Properties properties = new Properties(); + properties.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"); + properties.put("destination.my-queue","amq.topic/test;create:always}"); + + try + { + ctx = new InitialContext(properties); + fail("A configuration exception should be thrown with details about the address syntax error"); + } + catch(ConfigurationException e) + { + assertTrue("Incorrect exception", e.getMessage().contains("Failed to parse entry: amq.topic/test;create:always}")); + } + + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java index 591dbd085b..c81af9760b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java @@ -20,9 +20,6 @@ */ package org.apache.qpid.codec; -import org.apache.mina.filter.codec.ProtocolCodecFactory; -import org.apache.mina.filter.codec.ProtocolDecoder; -import org.apache.mina.filter.codec.ProtocolEncoder; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** @@ -31,14 +28,11 @@ import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; * *

*
CRC Card
Responsibilities Collaborations. - *
Supply the protocol encoder. {@link AMQEncoder} *
Supply the protocol decoder. {@link AMQDecoder} *
*/ -public class AMQCodecFactory implements ProtocolCodecFactory +public class AMQCodecFactory { - /** Holds the protocol encoder. */ - private final AMQEncoder _encoder = new AMQEncoder(); /** Holds the protocol decoder. */ private final AMQDecoder _frameDecoder; @@ -56,15 +50,6 @@ public class AMQCodecFactory implements ProtocolCodecFactory _frameDecoder = new AMQDecoder(expectProtocolInitiation, session); } - /** - * Gets the AMQP encoder. - * - * @return The AMQP encoder. - */ - public ProtocolEncoder getEncoder() - { - return _encoder; - } /** * Gets the AMQP decoder. diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index 281c0761d9..7732ff2fd5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -23,10 +23,7 @@ package org.apache.qpid.codec; import java.util.ArrayList; import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoSession; import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.filter.codec.CumulativeProtocolDecoder; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQDataBlockDecoder; @@ -54,11 +51,8 @@ import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; * @todo If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the * per-session overhead. */ -public class AMQDecoder extends CumulativeProtocolDecoder +public class AMQDecoder { - - private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer"; - /** Holds the 'normal' AMQP data decoder. */ private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder(); @@ -84,98 +78,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder _bodyFactory = new AMQMethodBodyFactory(session); } - /** - * Delegates decoding AMQP from the data buffer that Mina has retrieved from the wire, to the data or protocol - * intiation decoders. - * - * @param session The Mina session. - * @param in The raw byte buffer. - * @param out The Mina object output gatherer to write decoded objects to. - * - * @return true if the data was decoded, false if more is needed and the data should accumulate. - * - * @throws Exception If the data cannot be decoded for any reason. - */ - protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception - { - - boolean decoded; - if (_expectProtocolInitiation - || (firstDecode - && (in.remaining() > 0) - && (in.get(in.position()) == (byte)'A'))) - { - decoded = doDecodePI(session, in, out); - } - else - { - decoded = doDecodeDataBlock(session, in, out); - } - if(firstDecode && decoded) - { - firstDecode = false; - } - return decoded; - } - - /** - * Decodes AMQP data, delegating the decoding to an {@link AMQDataBlockDecoder}. - * - * @param session The Mina session. - * @param in The raw byte buffer. - * @param out The Mina object output gatherer to write decoded objects to. - * - * @return true if the data was decoded, false if more is needed and the data should accumulate. - * - * @throws Exception If the data cannot be decoded for any reason. - */ - protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception - { - int pos = in.position(); - boolean enoughData = _dataBlockDecoder.decodable(in.buf()); - in.position(pos); - if (!enoughData) - { - // returning false means it will leave the contents in the buffer and - // call us again when more data has been read - return false; - } - else - { - _dataBlockDecoder.decode(session, in, out); - return true; - } - } - - /** - * Decodes an AMQP initiation, delegating the decoding to a {@link ProtocolInitiation.Decoder}. - * - * @param session The Mina session. - * @param in The raw byte buffer. - * @param out The Mina object output gatherer to write decoded objects to. - * - * @return true if the data was decoded, false if more is needed and the data should accumulate. - * - * @throws Exception If the data cannot be decoded for any reason. - */ - private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception - { - boolean enoughData = _piDecoder.decodable(in.buf()); - if (!enoughData) - { - // returning false means it will leave the contents in the buffer and - // call us again when more data has been read - return false; - } - else - { - ProtocolInitiation pi = new ProtocolInitiation(in.buf()); - out.write(pi); - - return true; - } - } /** * Sets the protocol initation flag, that determines whether decoding is handled by the data decoder of the protocol @@ -190,97 +93,8 @@ public class AMQDecoder extends CumulativeProtocolDecoder } - /** - * Cumulates content of in into internal buffer and forwards - * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. - * doDecode() is invoked repeatedly until it returns false - * and the cumulative buffer is compacted after decoding ends. - * - * @throws IllegalStateException if your doDecode() returned - * true not consuming the cumulative buffer. - */ - public void decode( IoSession session, ByteBuffer in, - ProtocolDecoderOutput out ) throws Exception - { - ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER ); - // if we have a session buffer, append data to that otherwise - // use the buffer read from the network directly - if( buf != null ) - { - buf.put( in ); - buf.flip(); - } - else - { - buf = in; - } - - for( ;; ) - { - int oldPos = buf.position(); - boolean decoded = doDecode( session, buf, out ); - if( decoded ) - { - if( buf.position() == oldPos ) - { - throw new IllegalStateException( - "doDecode() can't return true when buffer is not consumed." ); - } - - if( !buf.hasRemaining() ) - { - break; - } - } - else - { - break; - } - } - - // if there is any data left that cannot be decoded, we store - // it in a buffer in the session and next time this decoder is - // invoked the session buffer gets appended to - if ( buf.hasRemaining() ) - { - storeRemainingInSession( buf, session ); - } - else - { - removeSessionBuffer( session ); - } - } - - /** - * Releases the cumulative buffer used by the specified session. - * Please don't forget to call super.dispose( session ) when - * you override this method. - */ - public void dispose( IoSession session ) throws Exception - { - removeSessionBuffer( session ); - } - - private void removeSessionBuffer(IoSession session) - { - ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER ); - if( buf != null ) - { - buf.release(); - session.removeAttribute( BUFFER ); - } - } - private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator(); - private void storeRemainingInSession(ByteBuffer buf, IoSession session) - { - ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false ); - remainingBuf.setAutoExpand( true ); - remainingBuf.put( buf ); - session.setAttribute( BUFFER, remainingBuf ); - } - public ArrayList decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java deleted file mode 100644 index 53f48ae1c8..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.codec; - -import org.apache.mina.common.IoSession; -import org.apache.mina.filter.codec.ProtocolEncoder; -import org.apache.mina.filter.codec.ProtocolEncoderOutput; - -import org.apache.qpid.framing.AMQDataBlockEncoder; - -/** - * AMQEncoder delegates encoding of AMQP to a data encoder. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Delegate AMQP encoding. {@link AMQDataBlockEncoder} - *
- * - * @todo This class just delegates to another, so seems to be pointless. Unless it is going to handle some - * responsibilities in the future, then drop it. - */ -public class AMQEncoder implements ProtocolEncoder -{ - /** The data encoder that is delegated to. */ - private AMQDataBlockEncoder _dataBlockEncoder = new AMQDataBlockEncoder(); - - /** - * Encodes AMQP. - * - * @param session The Mina session. - * @param message The data object to encode. - * @param out The Mina writer to output the raw byte data to. - * - * @throws Exception If the data cannot be encoded for any reason. - */ - public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception - { - _dataBlockEncoder.encode(session, message, out); - } - - /** - * Does nothing. Called by Mina to allow this to clean up resources when it is no longer needed. - * - * @param session The Mina session. - */ - public void dispose(IoSession session) - { } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 228867b2b0..0187fa96a9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -21,17 +21,12 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoSession; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; - -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class AMQDataBlockDecoder { - private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY"; private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE]; @@ -106,19 +101,6 @@ public class AMQDataBlockDecoder return frame; } - public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception - { - AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); - if (bodyFactory == null) - { - AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); - bodyFactory = new AMQMethodBodyFactory(protocolSession); - session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory); - } - - out.write(createAndPopulateFrame(bodyFactory, in)); - } - public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException { return decodable(msg.buf()); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java index 374644b4f2..d3b8ecf8bd 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java @@ -21,9 +21,6 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoSession; -import org.apache.mina.filter.codec.ProtocolEncoderOutput; -import org.apache.mina.filter.codec.demux.MessageEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +28,7 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.Set; -public final class AMQDataBlockEncoder implements MessageEncoder +public final class AMQDataBlockEncoder { private static final Logger _logger = LoggerFactory.getLogger(AMQDataBlockEncoder.class); @@ -40,19 +37,6 @@ public final class AMQDataBlockEncoder implements MessageEncoder public AMQDataBlockEncoder() { } - public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception - { - final AMQDataBlock frame = (AMQDataBlock) message; - - final ByteBuffer buffer = frame.toByteBuffer(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'"); - } - - out.write(buffer); - } public Set getMessageTypes() { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 30db3b8be7..7526d4c756 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -138,4 +138,15 @@ public class ContentHeaderBody implements AMQBody { properties = props; } + + @Override + public String toString() + { + return "ContentHeaderBody{" + + "classId=" + classId + + ", weight=" + weight + + ", bodySize=" + bodySize + + ", properties=" + properties + + '}'; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java deleted file mode 100644 index 82b600de88..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.pool; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation. - * The {@link Event}s themselves provide methods to process themselves, so processing a job simply consists of sequentially - * processing all of its aggregated events. - * - * The constructor accepts a maximum number of events for the job, and only runs up to that maximum number when - * processing the job, but the add method does not enforce this maximum. In other words, not all the enqueued events - * may be processed in each run of the job, several runs may be required to clear the queue. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Aggregate many coninuations together into a single continuation. - *
Sequentially process aggregated continuations. {@link Event} - *
Provide running and completion status of the aggregate continuation. - *
Execute a terminal continuation upon job completion. {@link JobCompletionHandler} - *
- * - * @todo Could make Job implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as a - * continuation. Job is a continuation that aggregates other continuations and as such is a usefull re-usable - * piece of code. There may be other palces than the mina filter chain where continuation batching is used within - * qpid, so abstracting this out could provide a usefull building block. This also opens the way to different - * kinds of job with a common interface, e.g. parallel or sequential jobs etc. - * - * @todo For better re-usability could make the completion handler optional. Only run it when one is set. - */ -public class Job implements ReadWriteRunnable -{ - - /** Defines the maximum number of events that will be batched into a single job. */ - public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); - - /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */ - private final int _maxEvents; - - /** Holds the queue of events that make up the job. */ - private final java.util.Queue _eventQueue = new ConcurrentLinkedQueue(); - - /** Holds a status flag, that indicates when the job is actively running. */ - private final AtomicBoolean _active = new AtomicBoolean(); - - private final boolean _readJob; - - private ReferenceCountingExecutorService _poolReference; - - private final static Logger _logger = LoggerFactory.getLogger(Job.class); - - public Job(ReferenceCountingExecutorService poolReference, int maxEvents, boolean readJob) - { - _poolReference = poolReference; - _maxEvents = maxEvents; - _readJob = readJob; - } - - /** - * Enqueus a continuation for sequential processing by this job. - * - * @param evt The continuation to enqueue. - */ - public void add(Runnable evt) - { - _eventQueue.add(evt); - } - - /** - * Sequentially processes, up to the maximum number per job, the aggregated continuations in enqueued in this job. - */ - boolean processAll() - { - // limit the number of events processed in one run - int i = _maxEvents; - while( --i != 0 ) - { - Runnable e = _eventQueue.poll(); - if (e == null) - { - return true; - } - else - { - e.run(); - } - } - return false; - } - - /** - * Tests if there are no more enqueued continuations to process. - * - * @return true if there are no enqueued continuations in this job, false otherwise. - */ - public boolean isComplete() - { - return _eventQueue.peek() == null; - } - - /** - * Marks this job as active if it is inactive. This method is thread safe. - * - * @return true if this job was inactive and has now been marked as active, false otherwise. - */ - public boolean activate() - { - return _active.compareAndSet(false, true); - } - - /** - * Marks this job as inactive. This method is thread safe. - */ - public void deactivate() - { - _active.set(false); - } - - /** - * Processes a batch of aggregated continuations, marks this job as inactive and call the terminal continuation. - */ - public void run() - { - if(processAll()) - { - deactivate(); - completed(); - } - else - { - notCompleted(); - } - } - - public boolean isRead() - { - return _readJob; - } - - /** - * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. - * - * @param job The job. - * @param event The event to hand off asynchronously. - */ - public static void fireAsynchEvent(ExecutorService pool, Job job, Runnable event) - { - - job.add(event); - - - if(pool == null) - { - return; - } - - // rather than perform additional checks on pool to check that it hasn't shutdown. - // catch the RejectedExecutionException that will result from executing on a shutdown pool - if (job.activate()) - { - try - { - pool.execute(job); - } - catch(RejectedExecutionException e) - { - _logger.warn("Thread pool shutdown while tasks still outstanding"); - } - } - - } - - /** - * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing - * of a batch of events this is called. This method simply re-activates the job, if it has more events to process. - * - * @param session The Mina session to work in. - * @param job The job that completed. - */ - public void completed() - { - if (!isComplete()) - { - final ExecutorService pool = _poolReference.getPool(); - - if(pool == null) - { - return; - } - - - // ritchiem : 2006-12-13 Do we need to perform the additional checks here? - // Can the pool be shutdown at this point? - if (activate()) - { - try - { - pool.execute(this); - } - catch(RejectedExecutionException e) - { - _logger.warn("Thread pool shutdown while tasks still outstanding"); - } - - } - } - } - - public void notCompleted() - { - final ExecutorService pool = _poolReference.getPool(); - - if(pool == null) - { - return; - } - - try - { - pool.execute(this); - } - catch(RejectedExecutionException e) - { - _logger.warn("Thread pool shutdown while tasks still outstanding"); - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java index f0f2652ce3..f9f6ca8444 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -80,7 +80,7 @@ public final class AMQConstant /** * An operator intervened to close the connection for some reason. The client may retry at some later date. */ - public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true); + public static final AMQConstant CONNECTION_FORCED = new AMQConstant(320, "connection forced", true); /** The client tried to work with an unknown virtual host or cluster. */ public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index 48a3df734a..fd651a2b66 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -21,8 +21,11 @@ package org.apache.qpid.protocol; import java.net.SocketAddress; +import java.nio.ByteBuffer; import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; /** * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received @@ -53,4 +56,6 @@ public interface ProtocolEngine extends Receiver void readerIdle(); + public void setNetworkConnection(NetworkConnection network, Sender sender); + } \ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java index 4e40b78440..7378edff0c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java @@ -26,6 +26,6 @@ public interface ProtocolEngineFactory { // Returns a new instance of a ProtocolEngine - ProtocolEngine newProtocolEngine(NetworkConnection network); + ProtocolEngine newProtocolEngine(); } \ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index c8b7ad2a5e..e421f06901 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -210,10 +210,7 @@ public class ClientDelegate extends ConnectionDelegate } else if (sc.getMechanismName().equals("EXTERNAL")) { - if (conn.getSecurityLayer() != null) - { - conn.setUserID(conn.getSecurityLayer().getUserID()); - } + conn.setUserID(conn.getSecurityLayer().getUserID()); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 82a6cdaa67..d6ddbaa061 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -47,6 +47,7 @@ import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.OutgoingNetworkTransport; import org.apache.qpid.transport.network.Transport; import org.apache.qpid.transport.network.security.SecurityLayer; +import org.apache.qpid.transport.network.security.SecurityLayerFactory; import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.util.Waiter; import org.apache.qpid.util.Strings; @@ -72,6 +73,7 @@ public class Connection extends ConnectionInvoker public static final int MAX_CHANNEL_MAX = 0xFFFF; public static final int MIN_USABLE_CHANNEL_NUM = 0; + public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING } static class DefaultConnectionListener implements ConnectionListener @@ -124,9 +126,9 @@ public class Connection extends ConnectionInvoker private ConnectionSettings conSettings; private SecurityLayer securityLayer; private String _clientId; - + private final AtomicBoolean connectionLost = new AtomicBoolean(false); - + public Connection() {} public void setConnectionDelegate(ConnectionDelegate delegate) @@ -239,12 +241,22 @@ public class Connection extends ConnectionInvoker userID = settings.getUsername(); delegate = new ClientDelegate(settings); - securityLayer = new SecurityLayer(this); + securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings()); OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10); - Receiver receiver = securityLayer.receiver(new InputHandler(new Assembler(this))); - NetworkConnection network = transport.connect(settings, receiver, null); - sender = new Disassembler(securityLayer.sender(network.getSender()), settings.getMaxFrameSize()); + Receiver secureReceiver = securityLayer.receiver(new InputHandler(new Assembler(this))); + if(secureReceiver instanceof ConnectionListener) + { + addConnectionListener((ConnectionListener)secureReceiver); + } + + NetworkConnection network = transport.connect(settings, secureReceiver, null); + final Sender secureSender = securityLayer.sender(network.getSender()); + if(secureSender instanceof ConnectionListener) + { + addConnectionListener((ConnectionListener)secureSender); + } + sender = new Disassembler(secureSender, settings.getMaxFrameSize()); send(new ProtocolHeader(1, 0, 10)); @@ -326,14 +338,14 @@ public class Connection extends ConnectionInvoker Waiter w = new Waiter(lock, timeout); while (w.hasTime() && state != OPEN && error == null) { - w.await(); + w.await(); } - + if (state != OPEN) { throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state); } - + Session ssn = _sessionFactory.newSession(this, name, expiry); sessions.put(name, ssn); map(ssn); @@ -475,13 +487,13 @@ public class Connection extends ConnectionInvoker ssn.setState(Session.State.CLOSED); } else - { + { map(ssn); ssn.attach(); ssn.resume(); } } - + for (Binary ssn_name : transactedSessions) { sessions.remove(ssn_name); @@ -572,12 +584,12 @@ public class Connection extends ConnectionInvoker { close(ConnectionCloseCode.NORMAL, null); } - + public void mgmtClose() { close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface."); } - + public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options) { synchronized (lock) @@ -680,12 +692,12 @@ public class Connection extends ConnectionInvoker { return conSettings; } - + public SecurityLayer getSecurityLayer() { return securityLayer; } - + public boolean isConnectionResuming() { return connectionLost.get(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SenderClosedException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SenderClosedException.java new file mode 100644 index 0000000000..924c327861 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SenderClosedException.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + + +/** + * SenderClosedException + * + */ + +public class SenderClosedException extends SenderException +{ + + public SenderClosedException(String message, Throwable cause) + { + super(message, cause); + } + + public SenderClosedException(String message) + { + super(message); + } + + public SenderClosedException(Throwable cause) + { + super(cause); + } + + public void rethrow() + { + throw new SenderClosedException(getMessage(), this); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java index 1f69973b96..7384702525 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java @@ -29,6 +29,8 @@ public interface NetworkConnection { Sender getSender(); + void start(); + void close(); /** diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java index 4610c2351e..f71d39c381 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java @@ -27,6 +27,4 @@ package org.apache.qpid.transport.network; public interface NetworkTransport { public void close(); - - public NetworkConnection getConnection(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java index 0bae46e8eb..c3c248761c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java @@ -29,5 +29,7 @@ import org.apache.qpid.transport.Receiver; public interface OutgoingNetworkTransport extends NetworkTransport { + public NetworkConnection getConnection(); + public NetworkConnection connect(ConnectionSettings settings, Receiver delegate, SSLContext sslContext); } \ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java index 2c10a30f10..da4349ba86 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java @@ -37,7 +37,6 @@ public class Transport public static final String QPID_BROKER_TRANSPORT_PROPNAME = "qpid.broker.transport"; // Can't reference the class directly here, as this would preclude the ability to bundle transports separately. - private static final String MINA_TRANSPORT_CLASSNAME = "org.apache.qpid.transport.network.mina.MinaNetworkTransport"; private static final String IO_TRANSPORT_CLASSNAME = "org.apache.qpid.transport.network.io.IoNetworkTransport"; public static final String TCP = "tcp"; @@ -47,9 +46,9 @@ public class Transport static { final Map map = new HashMap(); - map.put(ProtocolVersion.v8_0, MINA_TRANSPORT_CLASSNAME); - map.put(ProtocolVersion.v0_9, MINA_TRANSPORT_CLASSNAME); - map.put(ProtocolVersion.v0_91, MINA_TRANSPORT_CLASSNAME); + map.put(ProtocolVersion.v8_0, IO_TRANSPORT_CLASSNAME); + map.put(ProtocolVersion.v0_9, IO_TRANSPORT_CLASSNAME); + map.put(ProtocolVersion.v0_91, IO_TRANSPORT_CLASSNAME); map.put(ProtocolVersion.v0_10, IO_TRANSPORT_CLASSNAME); OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP = Collections.unmodifiableMap(map); @@ -58,7 +57,7 @@ public class Transport public static IncomingNetworkTransport getIncomingTransportInstance() { return (IncomingNetworkTransport) loadTransportClass( - System.getProperty(QPID_BROKER_TRANSPORT_PROPNAME, MINA_TRANSPORT_CLASSNAME)); + System.getProperty(QPID_BROKER_TRANSPORT_PROPNAME, IO_TRANSPORT_CLASSNAME)); } public static OutgoingNetworkTransport getOutgoingTransportInstance( diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java index cca1fc46c9..52cc6363a9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -37,7 +37,7 @@ public class IoNetworkConnection implements NetworkConnection private final long _timeout; private final IoSender _ioSender; private final IoReceiver _ioReceiver; - + public IoNetworkConnection(Socket socket, Receiver delegate, int sendBufferSize, int receiveBufferSize, long timeout) { @@ -45,9 +45,15 @@ public class IoNetworkConnection implements NetworkConnection _timeout = timeout; _ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout); + _ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout); + _ioSender.registerCloseListener(_ioReceiver); + } + + public void start() + { _ioReceiver.initiate(); _ioSender.initiate(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index 30e2856c59..7f0f04f9c4 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -21,22 +21,21 @@ package org.apache.qpid.transport.network.io; import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketException; +import java.net.*; import java.nio.ByteBuffer; import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLServerSocketFactory; -import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.TransportException; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.transport.*; +import org.apache.qpid.transport.network.IncomingNetworkTransport; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.OutgoingNetworkTransport; import org.apache.qpid.transport.util.Logger; -public class IoNetworkTransport implements OutgoingNetworkTransport +public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport { static { @@ -51,12 +50,13 @@ public class IoNetworkTransport implements OutgoingNetworkTransport private Socket _socket; private IoNetworkConnection _connection; private long _timeout = 60000; - + private AcceptingThread _acceptor; + public NetworkConnection connect(ConnectionSettings settings, Receiver delegate, SSLContext sslContext) { int sendBufferSize = settings.getWriteBufferSize(); int receiveBufferSize = settings.getReadBufferSize(); - + try { _socket = new Socket(); @@ -84,6 +84,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport try { _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout); + _connection.start(); } catch(Exception e) { @@ -104,11 +105,134 @@ public class IoNetworkTransport implements OutgoingNetworkTransport public void close() { - _connection.close(); + if(_connection != null) + { + _connection.close(); + } + if(_acceptor != null) + { + _acceptor.close(); + } } public NetworkConnection getConnection() { return _connection; } + + public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext) + { + + try + { + _acceptor = new AcceptingThread(config, factory, sslContext); + + _acceptor.start(); + } + catch (IOException e) + { + throw new TransportException("Unable to start server socket", e); + } + + + } + + private class AcceptingThread extends Thread + { + private NetworkTransportConfiguration _config; + private ProtocolEngineFactory _factory; + private SSLContext _sslContent; + private ServerSocket _serverSocket; + + private AcceptingThread(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext) + throws IOException + { + _config = config; + _factory = factory; + _sslContent = sslContext; + + InetSocketAddress address = new InetSocketAddress(config.getHost(), config.getPort()); + + if(sslContext == null) + { + _serverSocket = new ServerSocket(); + } + else + { + SSLServerSocketFactory socketFactory = sslContext.getServerSocketFactory(); + _serverSocket = socketFactory.createServerSocket(); + } + + _serverSocket.bind(address); + _serverSocket.setReuseAddress(true); + + + } + + + /** + Close the underlying ServerSocket if it has not already been closed. + */ + public void close() + { + if (!_serverSocket.isClosed()) + { + try + { + _serverSocket.close(); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + } + + @Override + public void run() + { + try + { + while (true) + { + try + { + Socket socket = _serverSocket.accept(); + socket.setTcpNoDelay(_config.getTcpNoDelay()); + + final Integer sendBufferSize = _config.getSendBufferSize(); + final Integer receiveBufferSize = _config.getReceiveBufferSize(); + + socket.setSendBufferSize(sendBufferSize); + socket.setReceiveBufferSize(receiveBufferSize); + + ProtocolEngine engine = _factory.newProtocolEngine(); + + NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout); + + + engine.setNetworkConnection(connection, connection.getSender()); + + connection.start(); + + + } + catch(RuntimeException e) + { + LOGGER.error(e, "Error in Acceptor thread " + _config.getPort()); + } + } + } + catch (IOException e) + { + LOGGER.debug(e, "SocketException - no new connections will be accepted on port " + + _config.getPort()); + } + } + + + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index fea87fc350..d4b5975e54 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -92,13 +92,23 @@ final class IoReceiver implements Runnable, Closeable { try { - if (shutdownBroken) + try { - socket.close(); + if (shutdownBroken) + { + socket.close(); + } + else + { + socket.shutdownInput(); + } } - else + catch(SocketException se) { - socket.shutdownInput(); + if(!socket.isClosed() && !socket.isInputShutdown()) + { + throw se; + } } if (block && Thread.currentThread() != receiverThread) { @@ -117,6 +127,7 @@ final class IoReceiver implements Runnable, Closeable { throw new TransportException(e); } + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 1bb515624c..473d4d95ff 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.qpid.common.Closeable; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.util.Logger; @@ -59,7 +60,7 @@ public final class IoSender implements Runnable, Sender private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread senderThread; private final List _listeners = new ArrayList(); - + private volatile Throwable exception = null; public IoSender(Socket socket, int bufferSize, long timeout) @@ -80,13 +81,13 @@ public final class IoSender implements Runnable, Sender try { //Create but deliberately don't start the thread. - senderThread = Threading.getThreadFactory().createThread(this); + senderThread = Threading.getThreadFactory().createThread(this); } catch(Exception e) { throw new Error("Error creating IOSender thread",e); } - + senderThread.setDaemon(true); senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); } @@ -110,7 +111,7 @@ public final class IoSender implements Runnable, Sender { if (closed.get()) { - throw new SenderException("sender is closed", exception); + throw new SenderClosedException("sender is closed", exception); } final int size = buffer.length; @@ -143,7 +144,7 @@ public final class IoSender implements Runnable, Sender if (closed.get()) { - throw new SenderException("sender is closed", exception); + throw new SenderClosedException("sender is closed", exception); } if (head - tail >= size) @@ -255,7 +256,7 @@ public final class IoSender implements Runnable, Sender public void run() { - final int size = buffer.length; + final int size = buffer.length; while (true) { final int hd = head; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java deleted file mode 100644 index 0f433f6eeb..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java +++ /dev/null @@ -1,81 +0,0 @@ -/* -* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.mina; - -import java.net.SocketAddress; -import java.nio.ByteBuffer; - -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoSession; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.NetworkConnection; - -public class MinaNetworkConnection implements NetworkConnection -{ - private IoSession _session; - private Sender _sender; - - public MinaNetworkConnection(IoSession session) - { - _session = session; - _sender = new MinaSender(_session); - } - - public Sender getSender() - { - return _sender; - } - - public void close() - { - _session.close(); - } - - public SocketAddress getRemoteAddress() - { - return _session.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _session.getLocalAddress(); - } - - public long getReadBytes() - { - return _session.getReadBytes(); - } - - public long getWrittenBytes() - { - return _session.getWrittenBytes(); - } - - public void setMaxWriteIdle(int sec) - { - _session.setIdleTime(IdleStatus.WRITER_IDLE, sec); - } - - public void setMaxReadIdle(int sec) - { - _session.setIdleTime(IdleStatus.READER_IDLE, sec); - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java deleted file mode 100644 index 0e4492e31b..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.transport.network.mina; - -import javax.net.ssl.SSLContext; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.util.SessionUtil; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.transport.network.NetworkConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MinaNetworkHandler extends IoHandlerAdapter -{ - private static final Logger LOGGER = LoggerFactory.getLogger(MinaNetworkHandler.class); - - private ProtocolEngineFactory _factory; - private SSLContext _sslContext = null; - private boolean _useClientMode; - - static - { - boolean directBuffers = Boolean.getBoolean("amqj.enableDirectBuffers"); - LOGGER.debug("Using " + (directBuffers ? "direct" : "heap") + " buffers"); - ByteBuffer.setUseDirectBuffers(directBuffers); - - //override the MINA defaults to prevent use of the PooledByteBufferAllocator - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - } - - public MinaNetworkHandler(SSLContext sslContext, ProtocolEngineFactory factory) - { - _sslContext = sslContext; - _factory = factory; - if(_factory == null) - { - _useClientMode = true; - } - } - - public MinaNetworkHandler(SSLContext sslContext) - { - this(sslContext, null); - } - - public void messageReceived(IoSession session, Object message) - { - ProtocolEngine engine = (ProtocolEngine) session.getAttachment(); - ByteBuffer buf = (ByteBuffer) message; - try - { - engine.received(buf.buf()); - } - catch (RuntimeException re) - { - engine.exception(re); - } - } - - public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception - { - ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment(); - if(engine != null) - { - LOGGER.error("Exception caught by Mina", throwable); - engine.exception(throwable); - } - else - { - LOGGER.error("Exception caught by Mina but without protocol engine to handle it", throwable); - } - } - - public void sessionCreated(IoSession ioSession) throws Exception - { - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("Created session: " + ioSession.getRemoteAddress()); - } - - SessionUtil.initialize(ioSession); - - if (_sslContext != null) - { - SSLFilter sslFilter = new SSLFilter(_sslContext); - sslFilter.setUseClientMode(_useClientMode); - - ioSession.getFilterChain().addFirst("sslFilter",sslFilter); - } - - if (_factory != null) - { - NetworkConnection netConn = new MinaNetworkConnection(ioSession); - - ProtocolEngine engine = _factory.newProtocolEngine(netConn); - ioSession.setAttachment(engine); - } - } - - public void sessionClosed(IoSession ioSession) throws Exception - { - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("closed: " + ioSession.getRemoteAddress()); - } - - ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment(); - if(engine != null) - { - engine.closed(); - } - else - { - LOGGER.error("Unable to close ProtocolEngine as none was present"); - } - } - - - public void sessionIdle(IoSession session, IdleStatus status) throws Exception - { - if (IdleStatus.WRITER_IDLE.equals(status)) - { - ((ProtocolEngine) session.getAttachment()).writerIdle(); - } - else if (IdleStatus.READER_IDLE.equals(status)) - { - ((ProtocolEngine) session.getAttachment()).readerIdle(); - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java deleted file mode 100644 index 85b42da2b2..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java +++ /dev/null @@ -1,221 +0,0 @@ -/* -* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.mina; - -import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; - -import javax.net.ssl.SSLContext; - -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.ExecutorThreadModel; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoSession; -import org.apache.mina.transport.socket.nio.SocketAcceptor; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; -import org.apache.mina.transport.socket.nio.SocketConnector; -import org.apache.mina.transport.socket.nio.SocketConnectorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.mina.util.NewThreadExecutor; - -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.thread.QpidThreadExecutor; -import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.SocketConnectorFactory; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.IncomingNetworkTransport; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; -import org.apache.qpid.transport.network.Transport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MinaNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport -{ - private static final int UNKNOWN = -1; - private static final int TCP = 0; - - public NetworkConnection _connection; - private SocketAcceptor _acceptor; - private InetSocketAddress _address; - - public NetworkConnection connect(ConnectionSettings settings, - Receiver delegate, SSLContext sslContext) - { - int transport = getTransport(settings.getProtocol()); - - IoConnectorCreator stc; - switch(transport) - { - case TCP: - stc = new IoConnectorCreator(new SocketConnectorFactory() - { - public IoConnector newConnector() - { - return new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector - } - }); - _connection = stc.connect(delegate, settings, sslContext); - break; - case UNKNOWN: - default: - throw new TransportException("Unknown protocol: " + settings.getProtocol()); - } - - return _connection; - } - - private static int getTransport(String transport) - { - if (transport.equals(Transport.TCP)) - { - return TCP; - } - - return UNKNOWN; - } - - public void close() - { - if(_connection != null) - { - _connection.close(); - } - if (_acceptor != null) - { - _acceptor.unbindAll(); - } - } - - public NetworkConnection getConnection() - { - return _connection; - } - - public void accept(final NetworkTransportConfiguration config, final ProtocolEngineFactory factory, - final SSLContext sslContext) - { - int processors = config.getConnectorProcessors(); - - if (Transport.TCP.equalsIgnoreCase(config.getTransport())) - { - _acceptor = new SocketAcceptor(processors, new NewThreadExecutor()); - - SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); - sconfig.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Acceptor)")); - SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); - sc.setTcpNoDelay(config.getTcpNoDelay()); - sc.setSendBufferSize(config.getSendBufferSize()); - sc.setReceiveBufferSize(config.getReceiveBufferSize()); - - if (config.getHost().equals(WILDCARD_ADDRESS)) - { - _address = new InetSocketAddress(config.getPort()); - } - else - { - _address = new InetSocketAddress(config.getHost(), config.getPort()); - } - } - else - { - throw new TransportException("Unknown transport: " + config.getTransport()); - } - - try - { - _acceptor.bind(_address, new MinaNetworkHandler(sslContext, factory)); - } - catch (IOException e) - { - throw new TransportException("Could not bind to " + _address, e); - } - } - - - private static class IoConnectorCreator - { - private static final Logger LOGGER = LoggerFactory.getLogger(IoConnectorCreator.class); - - private static final int CLIENT_DEFAULT_BUFFER_SIZE = 32 * 1024; - - private SocketConnectorFactory _ioConnectorFactory; - - public IoConnectorCreator(SocketConnectorFactory socketConnectorFactory) - { - _ioConnectorFactory = socketConnectorFactory; - } - - public NetworkConnection connect(Receiver receiver, ConnectionSettings settings, SSLContext sslContext) - { - final IoConnector ioConnector = _ioConnectorFactory.newConnector(); - final SocketAddress address; - final String protocol = settings.getProtocol(); - final int port = settings.getPort(); - - if (Transport.TCP.equalsIgnoreCase(protocol)) - { - address = new InetSocketAddress(settings.getHost(), port); - } - else - { - throw new TransportException("Unknown transport: " + protocol); - } - - LOGGER.info("Attempting connection to " + address); - - if (ioConnector instanceof SocketConnector) - { - SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); - cfg.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Client)")); - - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); - scfg.setTcpNoDelay(true); - scfg.setSendBufferSize(CLIENT_DEFAULT_BUFFER_SIZE); - scfg.setReceiveBufferSize(CLIENT_DEFAULT_BUFFER_SIZE); - - // Don't have the connector's worker thread wait around for other - // connections (we only use one SocketConnector per connection - // at the moment anyway). This allows short-running - // clients (like unit tests) to complete quickly. - ((SocketConnector) ioConnector).setWorkerTimeout(0); - } - - ConnectFuture future = ioConnector.connect(address, new MinaNetworkHandler(sslContext), ioConnector.getDefaultConfig()); - future.join(); - if (!future.isConnected()) - { - throw new TransportException("Could not open connection"); - } - - IoSession session = future.getSession(); - session.setAttachment(receiver); - - return new MinaNetworkConnection(session); - } - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java deleted file mode 100644 index be114e2fa1..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.mina; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.WriteFuture; -import org.apache.qpid.transport.Sender; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * MinaSender - */ -public class MinaSender implements Sender -{ - private static final Logger _log = LoggerFactory.getLogger(MinaSender.class); - - private final IoSession _session; - private WriteFuture _lastWrite; - - public MinaSender(IoSession session) - { - _session = session; - } - - public synchronized void send(java.nio.ByteBuffer msg) - { - _log.debug("sending data:"); - ByteBuffer mina = ByteBuffer.allocate(msg.limit()); - mina.put(msg); - mina.flip(); - _lastWrite = _session.write(mina); - _log.debug("sent data:"); - } - - public synchronized void flush() - { - if (_lastWrite != null) - { - _lastWrite.join(); - } - } - - public void close() - { - // MINA will sometimes throw away in-progress writes when you ask it to close - flush(); - CloseFuture closed = _session.close(); - closed.join(); - } - - public void setIdleTimeout(int i) - { - //TODO: - //We are instead using the setMax[Read|Write]IdleTime methods in - //MinaNetworkConnection for this. Should remove this method from - //sender interface, but currently being used by IoSender for 0-10. - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java index e80f8904a3..9fd65c6e51 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java @@ -27,7 +27,6 @@ import javax.net.ssl.SSLEngine; import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionListener; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; @@ -38,148 +37,12 @@ import org.apache.qpid.transport.network.security.ssl.SSLReceiver; import org.apache.qpid.transport.network.security.ssl.SSLSender; import org.apache.qpid.transport.network.security.ssl.SSLUtil; -public class SecurityLayer +public interface SecurityLayer { - ConnectionSettings settings; - Connection con; - SSLSecurityLayer sslLayer; - SASLSecurityLayer saslLayer; - public SecurityLayer(Connection con) - { - this.con = con; - this.settings = con.getConnectionSettings(); - if (settings.isUseSSL()) - { - sslLayer = new SSLSecurityLayer(); - } - if (settings.isUseSASLEncryption()) - { - saslLayer = new SASLSecurityLayer(); - } - } + public Sender sender(Sender delegate); + public Receiver receiver(Receiver delegate); + public String getUserID(); - public Sender sender(Sender delegate) - { - Sender sender = delegate; - - if (settings.isUseSSL()) - { - sender = sslLayer.sender(sender); - } - - if (settings.isUseSASLEncryption()) - { - sender = saslLayer.sender(sender); - } - - return sender; - } - - public Receiver receiver(Receiver delegate) - { - Receiver receiver = delegate; - - if (settings.isUseSSL()) - { - receiver = sslLayer.receiver(receiver); - } - - if (settings.isUseSASLEncryption()) - { - receiver = saslLayer.receiver(receiver); - } - - return receiver; - } - - public String getUserID() - { - if (settings.isUseSSL()) - { - return sslLayer.getUserID(); - } - else - { - return null; - } - } - - class SSLSecurityLayer - { - final SSLEngine _engine; - final SSLStatus _sslStatus = new SSLStatus(); - - public SSLSecurityLayer() - { - SSLContext sslCtx; - try - { - sslCtx = SSLContextFactory - .buildClientContext(settings.getTrustStorePath(), - settings.getTrustStorePassword(), - settings.getTrustStoreCertType(), - settings.getKeyStorePath(), - settings.getKeyStorePassword(), - settings.getKeyStoreCertType(), - settings.getCertAlias()); - } - catch (Exception e) - { - throw new TransportException("Error creating SSL Context", e); - } - - try - { - _engine = sslCtx.createSSLEngine(); - _engine.setUseClientMode(true); - } - catch(Exception e) - { - throw new TransportException("Error creating SSL Engine", e); - } - } - - public SSLSender sender(Sender delegate) - { - SSLSender sender = new SSLSender(_engine, delegate, _sslStatus); - sender.setConnectionSettings(settings); - return sender; - } - - public SSLReceiver receiver(Receiver delegate) - { - SSLReceiver receiver = new SSLReceiver(_engine, delegate, _sslStatus); - receiver.setConnectionSettings(settings); - return receiver; - } - - public String getUserID() - { - return SSLUtil.retriveIdentity(_engine); - } - - } - - class SASLSecurityLayer - { - public SASLSecurityLayer() - { - } - - public SASLSender sender(Sender delegate) - { - SASLSender sender = new SASLSender(delegate); - con.addConnectionListener((ConnectionListener)sender); - return sender; - } - - public SASLReceiver receiver(Receiver delegate) - { - SASLReceiver receiver = new SASLReceiver(delegate); - con.addConnectionListener((ConnectionListener)receiver); - return receiver; - } - - } } + diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java new file mode 100644 index 0000000000..17f89c34ef --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java @@ -0,0 +1,161 @@ +package org.apache.qpid.transport.network.security; + +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.*; +import org.apache.qpid.transport.network.security.sasl.SASLReceiver; +import org.apache.qpid.transport.network.security.sasl.SASLSender; +import org.apache.qpid.transport.network.security.ssl.SSLReceiver; +import org.apache.qpid.transport.network.security.ssl.SSLSender; +import org.apache.qpid.transport.network.security.ssl.SSLUtil; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import java.nio.ByteBuffer; + +public class SecurityLayerFactory +{ + public static SecurityLayer newInstance(ConnectionSettings settings) + { + + SecurityLayer layer = NullSecurityLayer.getInstance(); + + if (settings.isUseSSL()) + { + layer = new SSLSecurityLayer(settings, layer); + } + if (settings.isUseSASLEncryption()) + { + layer = new SASLSecurityLayer(layer); + } + + return layer; + + } + + static class SSLSecurityLayer implements SecurityLayer + { + + private final SSLEngine _engine; + private final SSLStatus _sslStatus = new SSLStatus(); + private String _hostname; + private SecurityLayer _layer; + + + public SSLSecurityLayer(ConnectionSettings settings, SecurityLayer layer) + { + + SSLContext sslCtx; + _layer = layer; + try + { + sslCtx = SSLContextFactory + .buildClientContext(settings.getTrustStorePath(), + settings.getTrustStorePassword(), + settings.getTrustStoreCertType(), + settings.getKeyStorePath(), + settings.getKeyStorePassword(), + settings.getKeyStoreCertType(), + settings.getCertAlias()); + } + catch (Exception e) + { + throw new TransportException("Error creating SSL Context", e); + } + + if(settings.isVerifyHostname()) + { + _hostname = settings.getHost(); + } + + try + { + _engine = sslCtx.createSSLEngine(); + _engine.setUseClientMode(true); + } + catch(Exception e) + { + throw new TransportException("Error creating SSL Engine", e); + } + + } + + public Sender sender(Sender delegate) + { + SSLSender sender = new SSLSender(_engine, _layer.sender(delegate), _sslStatus); + sender.setHostname(_hostname); + return sender; + } + + public Receiver receiver(Receiver delegate) + { + SSLReceiver receiver = new SSLReceiver(_engine, _layer.receiver(delegate), _sslStatus); + receiver.setHostname(_hostname); + return receiver; + } + + public String getUserID() + { + return SSLUtil.retriveIdentity(_engine); + } + } + + + static class SASLSecurityLayer implements SecurityLayer + { + + private SecurityLayer _layer; + + SASLSecurityLayer(SecurityLayer layer) + { + _layer = layer; + } + + public SASLSender sender(Sender delegate) + { + SASLSender sender = new SASLSender(_layer.sender(delegate)); + return sender; + } + + public SASLReceiver receiver(Receiver delegate) + { + SASLReceiver receiver = new SASLReceiver(_layer.receiver(delegate)); + return receiver; + } + + public String getUserID() + { + return _layer.getUserID(); + } + } + + + static class NullSecurityLayer implements SecurityLayer + { + + private static final NullSecurityLayer INSTANCE = new NullSecurityLayer(); + + private NullSecurityLayer() + { + } + + public Sender sender(Sender delegate) + { + return delegate; + } + + public Receiver receiver(Receiver delegate) + { + return delegate; + } + + public String getUserID() + { + return null; + } + + public static NullSecurityLayer getInstance() + { + return INSTANCE; + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java index 878f0b2352..8ad40bbfd3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java @@ -43,10 +43,11 @@ public class SSLReceiver implements Receiver private final int sslBufSize; private final ByteBuffer localBuffer; private final SSLStatus _sslStatus; - private ConnectionSettings settings; private ByteBuffer appData; private boolean dataCached = false; + private String _hostname; + public SSLReceiver(final SSLEngine engine, final Receiver delegate, final SSLStatus sslStatus) { this.engine = engine; @@ -57,9 +58,9 @@ public class SSLReceiver implements Receiver _sslStatus = sslStatus; } - public void setConnectionSettings(ConnectionSettings settings) + public void setHostname(String hostname) { - this.settings = settings; + _hostname = hostname; } public void closed() @@ -166,9 +167,9 @@ public class SSLReceiver implements Receiver handshakeStatus = engine.getHandshakeStatus(); case FINISHED: - if (this.settings != null && this.settings.isVerifyHostname() ) + if (_hostname != null) { - SSLUtil.verifyHostname(engine, this.settings.getHost()); + SSLUtil.verifyHostname(engine, _hostname); } case NEED_WRAP: diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java index 5e0ee93cb8..6f5aa6d86e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java @@ -44,8 +44,9 @@ public class SSLSender implements Sender private final ByteBuffer netData; private final long timeout; private final SSLStatus _sslStatus; - private ConnectionSettings settings; - + + private String _hostname; + private final AtomicBoolean closed = new AtomicBoolean(false); @@ -59,9 +60,9 @@ public class SSLSender implements Sender _sslStatus = sslStatus; } - public void setConnectionSettings(ConnectionSettings settings) + public void setHostname(String hostname) { - this.settings = settings; + _hostname = hostname; } public void close() @@ -237,9 +238,9 @@ public class SSLSender implements Sender break; case FINISHED: - if (this.settings != null && this.settings.isVerifyHostname() ) + if (_hostname != null) { - SSLUtil.verifyHostname(engine, this.settings.getHost()); + SSLUtil.verifyHostname(engine, _hostname); } case NOT_HANDSHAKING: diff --git a/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java b/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java deleted file mode 100644 index aafc91b03b..0000000000 --- a/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.session; - -import org.apache.mina.common.*; - -import java.net.SocketAddress; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentHashMap; - -public class TestSession implements IoSession -{ - private final ConcurrentMap attributes = new ConcurrentHashMap(); - - public TestSession() - { - } - - public IoService getService() - { - return null; //TODO - } - - public IoServiceConfig getServiceConfig() - { - return null; //TODO - } - - public IoHandler getHandler() - { - return null; //TODO - } - - public IoSessionConfig getConfig() - { - return null; //TODO - } - - public IoFilterChain getFilterChain() - { - return null; //TODO - } - - public WriteFuture write(Object message) - { - return null; //TODO - } - - public CloseFuture close() - { - return null; //TODO - } - - public Object getAttachment() - { - return getAttribute(""); - } - - public Object setAttachment(Object attachment) - { - return setAttribute("",attachment); - } - - public Object getAttribute(String key) - { - return attributes.get(key); - } - - public Object setAttribute(String key, Object value) - { - return attributes.put(key,value); - } - - public Object setAttribute(String key) - { - return attributes.put(key, Boolean.TRUE); - } - - public Object removeAttribute(String key) - { - return attributes.remove(key); - } - - public boolean containsAttribute(String key) - { - return attributes.containsKey(key); - } - - public Set getAttributeKeys() - { - return attributes.keySet(); - } - - public TransportType getTransportType() - { - return null; //TODO - } - - public boolean isConnected() - { - return false; //TODO - } - - public boolean isClosing() - { - return false; //TODO - } - - public CloseFuture getCloseFuture() - { - return null; //TODO - } - - public SocketAddress getRemoteAddress() - { - return null; //TODO - } - - public SocketAddress getLocalAddress() - { - return null; //TODO - } - - public SocketAddress getServiceAddress() - { - return null; //TODO - } - - public int getIdleTime(IdleStatus status) - { - return 0; //TODO - } - - public long getIdleTimeInMillis(IdleStatus status) - { - return 0; //TODO - } - - public void setIdleTime(IdleStatus status, int idleTime) - { - //TODO - } - - public int getWriteTimeout() - { - return 0; //TODO - } - - public long getWriteTimeoutInMillis() - { - return 0; //TODO - } - - public void setWriteTimeout(int writeTimeout) - { - //TODO - } - - public TrafficMask getTrafficMask() - { - return null; //TODO - } - - public void setTrafficMask(TrafficMask trafficMask) - { - //TODO - } - - public void suspendRead() - { - //TODO - } - - public void suspendWrite() - { - //TODO - } - - public void resumeRead() - { - //TODO - } - - public void resumeWrite() - { - //TODO - } - - public long getReadBytes() - { - return 0; //TODO - } - - public long getWrittenBytes() - { - return 0; //TODO - } - - public long getReadMessages() - { - return 0; - } - - public long getWrittenMessages() - { - return 0; - } - - public long getWrittenWriteRequests() - { - return 0; //TODO - } - - public int getScheduledWriteRequests() - { - return 0; //TODO - } - - public int getScheduledWriteBytes() - { - return 0; //TODO - } - - public long getCreationTime() - { - return 0; //TODO - } - - public long getLastIoTime() - { - return 0; //TODO - } - - public long getLastReadTime() - { - return 0; //TODO - } - - public long getLastWriteTime() - { - return 0; //TODO - } - - public boolean isIdle(IdleStatus status) - { - return false; //TODO - } - - public int getIdleCount(IdleStatus status) - { - return 0; //TODO - } - - public long getLastIdleTime(IdleStatus status) - { - return 0; //TODO - } -} diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java index 8686c17414..8533c64fab 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java @@ -49,10 +49,12 @@ public class TestNetworkConnection implements NetworkConnection _sender = new MockSender(); } + + public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, NetworkTransportConfiguration config, SSLContextFactory sslFactory) throws BindException { - + } public SocketAddress getLocalAddress() @@ -68,37 +70,37 @@ public class TestNetworkConnection implements NetworkConnection public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkTransportConfiguration config, SSLContextFactory sslFactory) throws OpenException { - + } public void setMaxReadIdle(int idleTime) { - + } public void setMaxWriteIdle(int idleTime) { - + } public void close() { - + } public void flush() { - + } public void send(ByteBuffer msg) { - + } public void setIdleTimeout(int i) { - + } public void setPort(int port) @@ -135,4 +137,8 @@ public class TestNetworkConnection implements NetworkConnection { return _sender; } + + public void start() + { + } } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java index d2fab7d163..7039b904e3 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java @@ -33,7 +33,6 @@ import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.io.IoNetworkTransport; -import org.apache.qpid.transport.network.mina.MinaNetworkTransport; public class TransportTest extends QpidTestCase { @@ -44,7 +43,7 @@ public class TransportTest extends QpidTestCase { final OutgoingNetworkTransport networkTransport = Transport.getOutgoingTransportInstance(ProtocolVersion.v8_0); assertNotNull(networkTransport); - assertTrue(networkTransport instanceof MinaNetworkTransport); + assertTrue(networkTransport instanceof IoNetworkTransport); } public void testGloballyOverriddenOutgoingTransportForv0_8() throws Exception @@ -76,7 +75,7 @@ public class TransportTest extends QpidTestCase { final IncomingNetworkTransport networkTransport = Transport.getIncomingTransportInstance(); assertNotNull(networkTransport); - assertTrue(networkTransport instanceof MinaNetworkTransport); + assertTrue(networkTransport instanceof IoNetworkTransport); } public void testOverriddenGetIncomingTransport() throws Exception diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java deleted file mode 100644 index 976b141fc0..0000000000 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java +++ /dev/null @@ -1,540 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.transport.network.mina; - -import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.IncomingNetworkTransport; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; -import org.apache.qpid.transport.network.Transport; - -public class MinaNetworkHandlerTest extends QpidTestCase -{ - - private static final String TEST_DATA = "YHALOTHAR"; - private int _testPort; - private IncomingNetworkTransport _server; - private OutgoingNetworkTransport _client; - private CountingProtocolEngine _countingEngine; // Keeps a count of how many bytes it's read - private Exception _thrownEx; - private ConnectionSettings _clientSettings; - private NetworkConnection _network; - private TestNetworkTransportConfiguration _brokerSettings; - - @Override - public void setUp() throws Exception - { - String host = InetAddress.getLocalHost().getHostName(); - _testPort = findFreePort(); - - _clientSettings = new ConnectionSettings(); - _clientSettings.setHost(host); - _clientSettings.setPort(_testPort); - - _brokerSettings = new TestNetworkTransportConfiguration(_testPort, host); - - _server = new MinaNetworkTransport(); - _client = new MinaNetworkTransport(); - _thrownEx = null; - _countingEngine = new CountingProtocolEngine(); - } - - @Override - public void tearDown() - { - if (_server != null) - { - _server.close(); - } - - if (_client != null) - { - _client.close(); - } - } - - /** - * Tests that a socket can't be opened if a driver hasn't been bound - * to the port and can be opened if a driver has been bound. - */ - public void testBindOpen() throws Exception - { - try - { - _client.connect(_clientSettings, _countingEngine, null); - } - catch (TransportException e) - { - _thrownEx = e; - } - - assertNotNull("Open should have failed since no engine bound", _thrownEx); - - _server.accept(_brokerSettings, null, null); - - _client.connect(_clientSettings, _countingEngine, null); - } - - /** - * Tests that a socket can't be opened after a bound NetworkDriver has been closed - */ - public void testBindOpenCloseOpen() throws Exception - { - _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); - _client.connect(_clientSettings, _countingEngine, null); - _client.close(); - _server.close(); - - try - { - _client.connect(_clientSettings, _countingEngine, null); - } - catch (TransportException e) - { - _thrownEx = e; - } - assertNotNull("Open should have failed", _thrownEx); - } - - /** - * Checks that the right exception is thrown when binding a NetworkDriver to an already - * existing socket. - */ - public void testBindPortInUse() - { - try - { - _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); - } - catch (TransportException e) - { - fail("First bind should not fail"); - } - - try - { - IncomingNetworkTransport second = new MinaNetworkTransport(); - second.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); - } - catch (TransportException e) - { - _thrownEx = e; - } - assertNotNull("Second bind should throw BindException", _thrownEx); - } - - /** - * Tests that binding to the wildcard address succeeds and a client can - * connect via localhost. - */ - public void testWildcardBind() throws Exception - { - TestNetworkTransportConfiguration serverSettings = - new TestNetworkTransportConfiguration(_testPort, WILDCARD_ADDRESS); - - _server.accept(serverSettings, null, null); - - try - { - _client.connect(_clientSettings, _countingEngine, null); - } - catch (TransportException e) - { - fail("Open should have succeeded since we used a wildcard bind"); - } - } - - /** - * tests that bytes sent on a network driver are received at the other end - */ - public void testSend() throws Exception - { - // Open a connection from a counting engine to an echo engine - _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); - _network = _client.connect(_clientSettings, _countingEngine, null); - - // Tell the counting engine how much data we're sending - _countingEngine.setNewLatch(TEST_DATA.getBytes().length); - - // Send the data and wait for up to 2 seconds to get it back - _network.getSender().send(ByteBuffer.wrap(TEST_DATA.getBytes())); - _countingEngine.getLatch().await(2, TimeUnit.SECONDS); - - // Check what we got - assertEquals("Wrong amount of data recieved", TEST_DATA.getBytes().length, _countingEngine.getReadBytes()); - } - - /** - * Opens a connection with a low read idle and check that it gets triggered - * - */ - public void testSetReadIdle() throws Exception - { - // Open a connection from a counting engine to an echo engine - _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); - _network = _client.connect(_clientSettings, _countingEngine, null); - assertFalse("Reader should not have been idle", _countingEngine.getReaderHasBeenIdle()); - _network.setMaxReadIdle(1); - sleepForAtLeast(1500); - assertTrue("Reader should have been idle", _countingEngine.getReaderHasBeenIdle()); - } - - /** - * Opens a connection with a low write idle and check that it gets triggered - * - */ - public void testSetWriteIdle() throws Exception - { - // Open a connection from a counting engine to an echo engine - _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); - _network = _client.connect(_clientSettings, _countingEngine, null); - assertFalse("Reader should not have been idle", _countingEngine.getWriterHasBeenIdle()); - _network.setMaxWriteIdle(1); - sleepForAtLeast(1500); - assertTrue("Reader should have been idle", _countingEngine.getWriterHasBeenIdle()); - } - - - /** - * Creates and then closes a connection from client to server and checks that the server - * has its closed() method called. Then creates a new client and closes the server to check - * that the client has its closed() method called. - */ - public void testClosed() throws Exception - { - // Open a connection from a counting engine to an echo engine - EchoProtocolEngineSingletonFactory factory = new EchoProtocolEngineSingletonFactory(); - _server.accept(_brokerSettings, factory, null); - _network = _client.connect(_clientSettings, _countingEngine, null); - EchoProtocolEngine serverEngine = null; - while (serverEngine == null) - { - serverEngine = factory.getEngine(); - if (serverEngine == null) - { - try - { - Thread.sleep(10); - } - catch (InterruptedException e) - { - } - } - } - assertFalse("Server should not have been closed", serverEngine.getClosed()); - serverEngine.setNewLatch(1); - _client.close(); - try - { - serverEngine.getLatch().await(2, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - } - assertTrue("Server should have been closed", serverEngine.getClosed()); - - _client.connect(_clientSettings, _countingEngine, null); - _countingEngine.setClosed(false); - assertFalse("Client should not have been closed", _countingEngine.getClosed()); - _countingEngine.setNewLatch(1); - _server.close(); - try - { - _countingEngine.getLatch().await(2, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - } - assertTrue("Client should have been closed", _countingEngine.getClosed()); - } - - /** - * Create a connection and instruct the client to throw an exception when it gets some data - * and that the latch gets counted down. - */ - public void testExceptionCaught() throws Exception - { - _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); - _network = _client.connect(_clientSettings, _countingEngine, null); - - - assertEquals("Exception should not have been thrown", 1, - _countingEngine.getExceptionLatch().getCount()); - _countingEngine.setErrorOnNextRead(true); - _countingEngine.setNewLatch(TEST_DATA.getBytes().length); - _network.getSender().send(ByteBuffer.wrap(TEST_DATA.getBytes())); - _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS); - assertEquals("Exception should have been thrown", 0, - _countingEngine.getExceptionLatch().getCount()); - } - - /** - * Opens a connection and checks that the remote address is the one that was asked for - */ - public void testGetRemoteAddress() throws Exception - { - _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); - _network = _client.connect(_clientSettings, _countingEngine, null); - assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), _testPort), - _network.getRemoteAddress()); - } - - private class EchoProtocolEngineSingletonFactory implements ProtocolEngineFactory - { - private EchoProtocolEngine _engine = null; - - public ProtocolEngine newProtocolEngine(NetworkConnection network) - { - if (_engine == null) - { - _engine = new EchoProtocolEngine(network); - } - return getEngine(); - } - - public EchoProtocolEngine getEngine() - { - return _engine; - } - } - - public class CountingProtocolEngine implements ServerProtocolEngine - { - public ArrayList _receivedBytes = new ArrayList(); - private int _readBytes; - private CountDownLatch _latch = new CountDownLatch(0); - private boolean _readerHasBeenIdle; - private boolean _writerHasBeenIdle; - private boolean _closed = false; - private boolean _nextReadErrors = false; - private CountDownLatch _exceptionLatch = new CountDownLatch(1); - - public void closed() - { - setClosed(true); - _latch.countDown(); - } - - public void setErrorOnNextRead(boolean b) - { - _nextReadErrors = b; - } - - public void setNewLatch(int length) - { - _latch = new CountDownLatch(length); - } - - public long getReadBytes() - { - return _readBytes; - } - - public SocketAddress getRemoteAddress() - { - return _network.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _network.getLocalAddress(); - } - - public long getWrittenBytes() - { - return 0; - } - - public void readerIdle() - { - _readerHasBeenIdle = true; - } - - public void writeFrame(AMQDataBlock frame) - { - - } - - public void writerIdle() - { - _writerHasBeenIdle = true; - } - - public void exception(Throwable t) - { - _exceptionLatch.countDown(); - } - - public CountDownLatch getExceptionLatch() - { - return _exceptionLatch; - } - - public void received(ByteBuffer msg) - { - // increment read bytes and count down the latch for that many - int bytes = msg.remaining(); - _readBytes += bytes; - for (int i = 0; i < bytes; i++) - { - _latch.countDown(); - } - - // Throw an error if we've been asked too, but we can still count - if (_nextReadErrors) - { - throw new RuntimeException("Was asked to error"); - } - } - - public CountDownLatch getLatch() - { - return _latch; - } - - public boolean getWriterHasBeenIdle() - { - return _writerHasBeenIdle; - } - - public boolean getReaderHasBeenIdle() - { - return _readerHasBeenIdle; - } - - public void setClosed(boolean _closed) - { - this._closed = _closed; - } - - public boolean getClosed() - { - return _closed; - } - - public long getConnectionId() - { - return -1; - } - - } - - private class EchoProtocolEngine extends CountingProtocolEngine - { - private NetworkConnection _echoNetwork; - - public EchoProtocolEngine(NetworkConnection network) - { - _echoNetwork = network; - } - - public void received(ByteBuffer msg) - { - super.received(msg); - msg.rewind(); - _echoNetwork.getSender().send(msg); - } - } - - public static void sleepForAtLeast(long period) - { - long start = System.currentTimeMillis(); - long timeLeft = period; - while (timeLeft > 0) - { - try - { - Thread.sleep(timeLeft); - } - catch (InterruptedException e) - { - // Ignore it - } - timeLeft = period - (System.currentTimeMillis() - start); - } - } - - private static class TestNetworkTransportConfiguration implements NetworkTransportConfiguration - { - private int _port; - private String _host; - - public TestNetworkTransportConfiguration(final int port, final String host) - { - _port = port; - _host = host; - } - - public Boolean getTcpNoDelay() - { - return true; - } - - public Integer getReceiveBufferSize() - { - return 32768; - } - - public Integer getSendBufferSize() - { - return 32768; - } - - public Integer getPort() - { - return _port; - } - - public String getHost() - { - return _host; - } - - public String getTransport() - { - return Transport.TCP; - } - - public Integer getConnectorProcessors() - { - return 4; - } - - } -} \ No newline at end of file diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java index 30d2d851a0..98cdf94ac9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import javax.jms.ExceptionListener; import javax.jms.JMSException; +import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; @@ -139,7 +140,7 @@ public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionL public void onException(JMSException e) { - if (e.getLinkedException() instanceof AMQDisconnectedException) + if (e.getLinkedException() instanceof AMQDisconnectedException || e.getLinkedException() instanceof AMQConnectionClosedException) { _logger.debug("Received AMQDisconnectedException"); _failoverComplete.countDown(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java index d1ba725721..782ca22965 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java @@ -67,14 +67,14 @@ public class ExternalACLTest extends AbstractACLTestCase fail("Connection was not created due to:" + e); } } - + public void testAccessVhostAuthorisedGuestSuccess() throws IOException, Exception { //The 'guest' user has no access to the 'test' vhost, as tested below in testAccessNoRights(), and so - //is unable to perform actions such as connecting (and by extension, creating a queue, and consuming - //from a queue etc). In order to test the vhost-wide 'access' ACL right, the 'guest' user has been given + //is unable to perform actions such as connecting (and by extension, creating a queue, and consuming + //from a queue etc). In order to test the vhost-wide 'access' ACL right, the 'guest' user has been given //this right in the 'test2' vhost. - + try { //get a connection to the 'test2' vhost using the guest user and perform various actions. @@ -106,7 +106,7 @@ public class ExternalACLTest extends AbstractACLTestCase fail("Test failed due to:" + e.getMessage()); } } - + public void testAccessNoRightsFailure() throws Exception { try @@ -115,7 +115,7 @@ public class ExternalACLTest extends AbstractACLTestCase Session sess = conn.createSession(true, Session.SESSION_TRANSACTED); conn.start(); sess.rollback(); - + fail("Connection was created."); } catch (JMSException e) @@ -126,11 +126,11 @@ public class ExternalACLTest extends AbstractACLTestCase Throwable cause = linkedException.getCause(); assertNotNull("Cause was null", cause); assertTrue("Wrong linked exception type", cause instanceof AMQException); - AMQConstant errorCode = isBroker010() ? AMQConstant.CONTEXT_IN_USE : AMQConstant.ACCESS_REFUSED; + AMQConstant errorCode = isBroker010() ? AMQConstant.CONNECTION_FORCED : AMQConstant.ACCESS_REFUSED; assertEquals("Incorrect error code received", errorCode, ((AMQException) cause).getErrorCode()); } } - + public void testClientDeleteQueueSuccess() throws Exception { try @@ -155,7 +155,7 @@ public class ExternalACLTest extends AbstractACLTestCase fail("Test failed due to:" + e.getMessage()); } } - + public void testServerDeleteQueueFailure() throws Exception { try @@ -207,13 +207,13 @@ public class ExternalACLTest extends AbstractACLTestCase try { Connection conn = getConnection("test", "client", "guest"); - + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); sess.createConsumer(sess.createQueue("IllegalQueue")); - + fail("Test failed as consumer was created."); } catch (JMSException e) @@ -253,10 +253,10 @@ public class ExternalACLTest extends AbstractACLTestCase Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); - + //Create a Named Queue ((AMQSession) sess).createQueue(new AMQShortString("IllegalQueue"), false, false, false); - + fail("Test failed as Queue creation succeded."); //conn will be automatically closed } @@ -385,7 +385,7 @@ public class ExternalACLTest extends AbstractACLTestCase try { Connection conn = getConnection("test", "client", "guest"); - + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); @@ -405,13 +405,13 @@ public class ExternalACLTest extends AbstractACLTestCase try { Connection conn = getConnection("test", "server", "guest"); - + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); sess.createConsumer(sess.createTemporaryQueue()); - + fail("Test failed as consumer was created."); } catch (JMSException e) @@ -446,7 +446,7 @@ public class ExternalACLTest extends AbstractACLTestCase try { Connection conn = getConnection("test", "server", "guest"); - + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); @@ -480,13 +480,13 @@ public class ExternalACLTest extends AbstractACLTestCase check403Exception(e.getLinkedException()); } } - + public void testServerCreateAutoDeleteQueueInvalid() throws NamingException, JMSException, AMQException, Exception { try { Connection connection = getConnection("test", "server", "guest"); - + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); @@ -630,8 +630,8 @@ public class ExternalACLTest extends AbstractACLTestCase check403Exception(e.getLinkedException()); } } - - + + @Override public String getConfig() { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java index 97d825177c..d6caf05d33 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java @@ -59,7 +59,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase _queue = _clientSession.createQueue(getTestQueueName()); _clientSession.createConsumer(_queue).close(); - + //Ensure there are no messages on the queue to start with. checkQueueDepth(0); } @@ -490,7 +490,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase } } - assertTrue("We should get atleast " + messages + " msgs.", msgCount >= messages); + assertTrue("We should get atleast " + messages + " msgs (found " + msgCount +").", msgCount >= messages); if (_logger.isDebugEnabled()) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java new file mode 100644 index 0000000000..545081fb43 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java @@ -0,0 +1,41 @@ +package org.apache.qpid.test.unit.client.connection; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class ConnectionFactoryTest extends QpidBrokerTestCase +{ + + /** + * The username & password specified should not override the default + * specified in the URL. + */ + public void testCreateConnectionWithUsernamePassword() throws Exception + { + + String brokerUrl = getBroker().toString(); + String URL = "amqp://guest:guest@clientID/test?brokerlist='" + brokerUrl + "'"; + AMQConnectionFactory factory = new AMQConnectionFactory(URL); + + AMQConnection con = (AMQConnection)factory.createConnection(); + assertEquals("Usernames used is different from the one in URL","guest",con.getConnectionURL().getUsername()); + assertEquals("Password used is different from the one in URL","guest",con.getConnectionURL().getPassword()); + + try + { + AMQConnection con2 = (AMQConnection)factory.createConnection("user","pass"); + assertEquals("Usernames used is different from the one in URL","user",con2.getConnectionURL().getUsername()); + assertEquals("Password used is different from the one in URL","pass",con2.getConnectionURL().getPassword()); + } + catch(Exception e) + { + // ignore + } + + AMQConnection con3 = (AMQConnection)factory.createConnection(); + assertEquals("Usernames used is different from the one in URL","guest",con3.getConnectionURL().getUsername()); + assertEquals("Password used is different from the one in URL","guest",con3.getConnectionURL().getPassword()); + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java index cec9d292cf..0057422c8f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java @@ -47,13 +47,15 @@ public class ExceptionListenerTest extends QpidBrokerTestCase { public void onException(JMSException e) { + _logger.debug("&&&&&&&&&&&&&&&&&&&&&&&&&&&& Caught exception &&&&&&&&&&&&&&&&&&&&&&&&&&&& ", e); fired.countDown(); } }); - + _logger.debug("%%%%%%%%%%%%%%%% Stopping Broker %%%%%%%%%%%%%%%%%%%%%"); stopBroker(); + _logger.debug("%%%%%%%%%%%%%%%% Stopped Broker %%%%%%%%%%%%%%%%%%%%%"); - if (!fired.await(3, TimeUnit.SECONDS)) + if (!fired.await(5, TimeUnit.SECONDS)) { fail("exception listener was not fired"); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java index 5701b5a1fd..836684c965 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -25,7 +25,6 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; -import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQProtocolSession; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/protocol/TestIoSession.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/protocol/TestIoSession.java deleted file mode 100644 index f1eb8159b6..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/protocol/TestIoSession.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.utils.protocol; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; - -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoService; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.IoSessionConfig; -import org.apache.mina.common.TransportType; -import org.apache.mina.common.support.BaseIoSession; - -public class TestIoSession extends BaseIoSession { - - private String _stringLocalAddress; - private int _localPort; - - public SocketAddress getLocalAddress() - { - //create a new address for testing purposes using member variables - return new InetSocketAddress(_stringLocalAddress,_localPort); - } - - protected void updateTrafficMask() { - //dummy - } - - public IoService getService() { - return null; - } - - public IoServiceConfig getServiceConfig() { - return null; - } - - public IoHandler getHandler() { - return null; - } - - public IoSessionConfig getConfig() { - return null; - } - - public IoFilterChain getFilterChain() { - return null; - } - - public TransportType getTransportType() { - return null; - } - - public SocketAddress getRemoteAddress() { - return null; - } - - public SocketAddress getServiceAddress() { - return null; - } - - public int getScheduledWriteRequests() { - return 0; - } - - public int getScheduledWriteBytes() { - return 0; - } - - public String getStringLocalAddress() { - return _stringLocalAddress; - } - - public void setStringLocalAddress(String _stringLocalAddress) { - this._stringLocalAddress = _stringLocalAddress; - } - - public int getLocalPort() { - return _localPort; - } - - public void setLocalPort(int _localPort) { - this._localPort = _localPort; - } -} diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 2e32754943..29f585b300 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -175,3 +175,15 @@ org.apache.qpid.server.management.AMQUserManagementMBeanTest#* // QPID-3133: On 0-10, the exception listener is currently not invoked when reconnection fails to occurs. org.apache.qpid.server.failover.FailoverMethodTest#* + +//QPID-3468: exclude QueueBrowser related failover tests +org.apache.qpid.test.client.QueueBrowserAutoAckTest#testFailoverWithQueueBrowser +org.apache.qpid.test.client.QueueBrowserAutoAckTest#testFailoverAsQueueBrowserCreated +org.apache.qpid.test.client.QueueBrowserClientAckTest#testFailoverWithQueueBrowser +org.apache.qpid.test.client.QueueBrowserClientAckTest#testFailoverAsQueueBrowserCreated +org.apache.qpid.test.client.QueueBrowserNoAckTest#testFailoverWithQueueBrowser +org.apache.qpid.test.client.QueueBrowserNoAckTest#testFailoverAsQueueBrowserCreated +org.apache.qpid.test.client.QueueBrowserPreAckTest#testFailoverWithQueueBrowser +org.apache.qpid.test.client.QueueBrowserPreAckTest#testFailoverAsQueueBrowserCreated +org.apache.qpid.test.client.QueueBrowserDupsOkTest#testFailoverWithQueueBrowser +org.apache.qpid.test.client.QueueBrowserDupsOkTest#testFailoverAsQueueBrowserCreated diff --git a/qpid/java/test-profiles/Excludes b/qpid/java/test-profiles/Excludes index 56a256f191..b1edd07f87 100644 --- a/qpid/java/test-profiles/Excludes +++ b/qpid/java/test-profiles/Excludes @@ -43,3 +43,5 @@ org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#* // QPID-2418 : The queue backing the dur sub is not currently deleted at subscription change, so the test will fail. org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart + +org.apache.qpid.client.ssl.SSLTest#testVerifyLocalHostLocalDomain diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index e180566699..fe0a53bdfc 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -82,3 +82,14 @@ org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverWithQueueBrow org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#* org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#* +//QPID-3468: exclude QueueBrowser related failover tests +org.apache.qpid.test.client.QueueBrowserAutoAckTest#testFailoverWithQueueBrowser +org.apache.qpid.test.client.QueueBrowserAutoAckTest#testFailoverAsQueueBrowserCreated +org.apache.qpid.test.client.QueueBrowserClientAckTest#testFailoverWithQueueBrowser +org.apache.qpid.test.client.QueueBrowserClientAckTest#testFailoverAsQueueBrowserCreated +org.apache.qpid.test.client.QueueBrowserNoAckTest#testFailoverWithQueueBrowser +org.apache.qpid.test.client.QueueBrowserNoAckTest#testFailoverAsQueueBrowserCreated +org.apache.qpid.test.client.QueueBrowserPreAckTest#testFailoverWithQueueBrowser +org.apache.qpid.test.client.QueueBrowserPreAckTest#testFailoverAsQueueBrowserCreated +org.apache.qpid.test.client.QueueBrowserDupsOkTest#testFailoverWithQueueBrowser +org.apache.qpid.test.client.QueueBrowserDupsOkTest#testFailoverAsQueueBrowserCreated -- cgit v1.2.1