diff options
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid')
12 files changed, 301 insertions, 135 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index a8e7f47db0..39a9beb9e8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -40,8 +40,6 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt private static final byte MINUS = (byte)'-'; private static final byte ZERO = (byte) '0'; - - private final class TokenizerImpl implements AMQShortStringTokenizer { private final byte _delim; @@ -115,7 +113,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt private final int _length; private static final char[] EMPTY_CHAR_ARRAY = new char[0]; - + public static final AMQShortString EMPTY_STRING = new AMQShortString((String)null); public AMQShortString(byte[] data) @@ -760,6 +758,11 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return false; //To change body of created methods use File | Settings | File Templates. } + public static AMQShortString valueOf(Object obj) + { + return obj == null ? null : new AMQShortString(String.valueOf(obj)); + } + public static void main(String args[]) { diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 341238c667..bd566adf8f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -21,15 +21,15 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; - -import org.apache.qpid.AMQPInvalidClassException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.AMQPInvalidClassException; + import java.math.BigDecimal; import java.util.Collections; import java.util.Enumeration; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -68,8 +68,11 @@ public class FieldTable public FieldTable(ByteBuffer buffer, long length) throws AMQFrameDecodingException { this(); - _encodedForm = buffer.slice(); - _encodedForm.limit((int) length); + ByteBuffer encodedForm = buffer.slice(); + encodedForm.limit((int) length); + _encodedForm = ByteBuffer.allocate((int)length); + _encodedForm.put(encodedForm); + _encodedForm.flip(); _encodedSize = length; buffer.skip((int) length); } @@ -829,6 +832,36 @@ public class FieldTable recalculateEncodedSize(); } + public static Map<String, Object> convertToMap(final FieldTable fieldTable) + { + final Map<String, Object> map = new HashMap<String, Object>(); + + if(fieldTable != null) + { + fieldTable.processOverElements( + new FieldTableElementProcessor() + { + + public boolean processElement(String propertyName, AMQTypedValue value) + { + Object val = value.getValue(); + if(val instanceof AMQShortString) + { + val = val.toString(); + } + map.put(propertyName, val); + return true; + } + + public Object getResult() + { + return map; + } + }); + } + return map; + } + public static interface FieldTableElementProcessor { @@ -1046,6 +1079,9 @@ public class FieldTable { final AMQShortString key = EncodingUtils.readAMQShortString(buffer); + + _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read key '" + key); + AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer); if (trace) diff --git a/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java index 94869ab205..70d7d2c6b2 100644 --- a/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java +++ b/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java @@ -3,6 +3,17 @@ package org.apache.qpid.thread; public class DefaultThreadFactory implements ThreadFactory { + private static class QpidThread extends Thread + { + private QpidThread(final Runnable target) + { + super(target); + } + + } + + + public Thread createThread(Runnable r) { return new Thread(r); diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index b12fbb75e6..74064c9d11 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -20,8 +20,19 @@ */ package org.apache.qpid.transport; +import org.ietf.jgss.GSSContext; +import org.ietf.jgss.GSSException; +import org.ietf.jgss.GSSManager; +import org.ietf.jgss.GSSName; +import org.ietf.jgss.Oid; + +import org.apache.qpid.security.UsernamePasswordCallbackHandler; import static org.apache.qpid.transport.Connection.State.OPEN; +import org.apache.qpid.transport.util.Logger; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.util.ArrayList; @@ -30,18 +41,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; - -import org.apache.qpid.security.UsernamePasswordCallbackHandler; -import org.apache.qpid.transport.util.Logger; -import org.ietf.jgss.GSSContext; -import org.ietf.jgss.GSSException; -import org.ietf.jgss.GSSManager; -import org.ietf.jgss.GSSName; -import org.ietf.jgss.Oid; - /** * ClientDelegate @@ -54,20 +53,22 @@ public class ClientDelegate extends ConnectionDelegate private static final String KRB5_OID_STR = "1.2.840.113554.1.2.2"; protected static Oid KRB5_OID; - - static { - try { + + static + { + try + { KRB5_OID = new Oid(KRB5_OID_STR); } catch (GSSException ignore) {} } - + private List<String> clientMechs; private ConnectionSettings conSettings; - + public ClientDelegate(ConnectionSettings settings) { this.conSettings = settings; - this.clientMechs = Arrays.asList(settings.getSaslMechs().split(" ")); + this.clientMechs = Arrays.asList(settings.getSaslMechs().split(" ")); } public void init(Connection conn, ProtocolHeader hdr) @@ -81,12 +82,17 @@ public class ClientDelegate extends ConnectionDelegate @Override public void connectionStart(Connection conn, ConnectionStart start) { Map<String,Object> clientProperties = new HashMap<String,Object>(); + + if(this.conSettings.getClientProperties() != null) + { + clientProperties.putAll(this.conSettings.getClientProperties()); + } + clientProperties.put("qpid.session_flow", 1); clientProperties.put("qpid.client_pid",getPID()); - clientProperties.put("qpid.client_pid",clientProperties.get("qpid.client_pid")); clientProperties.put("qpid.client_process", System.getProperty("qpid.client_process","Qpid Java Client")); - + List<Object> brokerMechs = start.getMechanisms(); if (brokerMechs == null || brokerMechs.isEmpty()) { @@ -94,27 +100,29 @@ public class ClientDelegate extends ConnectionDelegate (clientProperties, null, null, conn.getLocale()); return; } - + List<String> choosenMechs = new ArrayList<String>(); for (String mech:clientMechs) { - if (brokerMechs.contains(mech)) + if (brokerMechs.contains(mech)) { choosenMechs.add(mech); } } - + if (choosenMechs.size() == 0) { conn.exception(new ConnectionException("The following SASL mechanisms " + - clientMechs.toString() + + clientMechs.toString() + " specified by the client are not supported by the broker")); return; } - + String[] mechs = new String[choosenMechs.size()]; - choosenMechs.toArray(mechs); - + choosenMechs.toArray(mechs); + + conn.setServerProperties(start.getServerProperties()); + try { Map<String,Object> saslProps = new HashMap<String,Object>(); @@ -162,8 +170,8 @@ public class ClientDelegate extends ConnectionDelegate tune.getHeartbeatMin(), tune.getHeartbeatMax() ); - conn.connectionTuneOk(tune.getChannelMax(), - tune.getMaxFrameSize(), + conn.connectionTuneOk(tune.getChannelMax(), + tune.getMaxFrameSize(), hb_interval); // The idle timeout is twice the heartbeat amount (in milisecs) conn.setIdleTimeout(hb_interval*1000*2); @@ -212,11 +220,11 @@ public class ClientDelegate extends ConnectionDelegate return max; } } - + private int getPID() { RuntimeMXBean rtb = ManagementFactory.getRuntimeMXBean(); - String processName = rtb.getName(); + String processName = rtb.getName(); if (processName != null && processName.indexOf('@')>0) { try @@ -236,36 +244,36 @@ public class ClientDelegate extends ConnectionDelegate } } - + private String getUserID() { log.debug("Obtaining userID from kerberos"); String service = conSettings.getSaslProtocol() + "@" + conSettings.getSaslServerName(); GSSManager manager = GSSManager.getInstance(); - - try + + try { GSSName acceptorName = manager.createName(service, GSSName.NT_HOSTBASED_SERVICE, KRB5_OID); - + GSSContext secCtx = manager.createContext(acceptorName, KRB5_OID, null, GSSContext.INDEFINITE_LIFETIME); - - secCtx.initSecContext(new byte[0], 0, 1); - + + secCtx.initSecContext(new byte[0], 0, 1); + if (secCtx.getSrcName() != null) { return secCtx.getSrcName().toString(); - } - - } - catch (GSSException e) + } + + } + catch (GSSException e) { log.warn("Unable to retrieve userID from Kerberos due to error",e); } - + return null; } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 17a13561c8..8c2da9d77a 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -20,24 +20,26 @@ */ package org.apache.qpid.transport; +import static org.apache.qpid.transport.Connection.State.CLOSED; +import static org.apache.qpid.transport.Connection.State.CLOSING; +import static org.apache.qpid.transport.Connection.State.NEW; +import static org.apache.qpid.transport.Connection.State.OPEN; +import static org.apache.qpid.transport.Connection.State.OPENING; import org.apache.qpid.transport.network.ConnectionBinding; import org.apache.qpid.transport.network.io.IoTransport; import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.util.Waiter; import org.apache.qpid.util.Strings; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslServer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; - import java.util.UUID; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslServer; - -import static org.apache.qpid.transport.Connection.State.*; - /** * Connection @@ -55,6 +57,7 @@ public class Connection extends ConnectionInvoker private static final Logger log = Logger.get(Connection.class); + public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } class DefaultConnectionListener implements ConnectionListener @@ -67,6 +70,24 @@ public class Connection extends ConnectionInvoker public void closed(Connection conn) {} } + public static interface SessionFactory + { + Session newSession(Connection conn, Binary name, long expiry); + } + + private static final class DefaultSessionFactory implements SessionFactory + { + + public Session newSession(final Connection conn, final Binary name, final long expiry) + { + return new Session(conn, name, expiry); + } + } + + private static final SessionFactory DEFAULT_SESSION_FACTORY = new DefaultSessionFactory(); + + private SessionFactory _sessionFactory = DEFAULT_SESSION_FACTORY; + private ConnectionDelegate delegate; private Sender<ProtocolEvent> sender; @@ -76,7 +97,7 @@ public class Connection extends ConnectionInvoker private State state = NEW; final private Object lock = new Object(); private long timeout = 60000; - private List<ConnectionListener> listeners = new ArrayList<ConnectionListener>(); + private List<ConnectionListener> listeners = new ArrayList<ConnectionListener>(); private ConnectionException error = null; private int channelMax = 1; @@ -85,6 +106,7 @@ public class Connection extends ConnectionInvoker private SaslClient saslClient; private int idleTimeout = 0; private String _authorizationID; + private Map<String,Object> _serverProperties; private String userID; private ConnectionSettings conSettings; @@ -111,7 +133,7 @@ public class Connection extends ConnectionInvoker public void setSender(Sender<ProtocolEvent> sender) { this.sender = sender; - sender.setIdleTimeout(idleTimeout); + sender.setIdleTimeout(idleTimeout); } protected void setState(State state) @@ -157,7 +179,7 @@ public class Connection extends ConnectionInvoker { connect(host, port, vhost, username, password, false); } - + public void connect(String host, int port, String vhost, String username, String password, boolean ssl) { connect(host, port, vhost, username, password, ssl,"PLAIN"); @@ -165,6 +187,12 @@ public class Connection extends ConnectionInvoker public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs) { + connect(host, port, vhost, username, password, ssl,saslMechs, Collections.EMPTY_MAP); + } + + + public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs,Map<String,Object> clientProps) + { ConnectionSettings settings = new ConnectionSettings(); settings.setHost(host); settings.setPort(port); @@ -173,11 +201,13 @@ public class Connection extends ConnectionInvoker settings.setPassword(password); settings.setUseSSL(ssl); settings.setSaslMechs(saslMechs); + settings.setClientProperties(clientProps); connect(settings); } - + public void connect(ConnectionSettings settings) { + synchronized (lock) { conSettings = settings; @@ -185,9 +215,9 @@ public class Connection extends ConnectionInvoker userID = settings.getUsername(); delegate = new ClientDelegate(settings); - IoTransport.connect(settings.getHost(), - settings.getPort(), - ConnectionBinding.get(this), + IoTransport.connect(settings.getHost(), + settings.getPort(), + ConnectionBinding.get(this), settings.isUseSSL()); send(new ProtocolHeader(1, 0, 10)); @@ -264,7 +294,7 @@ public class Connection extends ConnectionInvoker { synchronized (lock) { - Session ssn = new Session(this, name, expiry); + Session ssn = _sessionFactory.newSession(this, name, expiry); sessions.put(name, ssn); map(ssn); ssn.attach(); @@ -280,6 +310,13 @@ public class Connection extends ConnectionInvoker } } + public void setSessionFactory(SessionFactory sessionFactory) + { + assert sessionFactory != null; + + _sessionFactory = sessionFactory; + } + public void setConnectionId(int id) { _connectionId = id; @@ -425,7 +462,7 @@ public class Connection extends ConnectionInvoker { listener.exception(this, e); } - + } public void exception(Throwable t) @@ -481,7 +518,7 @@ public class Connection extends ConnectionInvoker for (ConnectionListener listener: listeners) { listener.closed(this); - } + } } public void close() @@ -545,13 +582,13 @@ public class Connection extends ConnectionInvoker public void setIdleTimeout(int i) { - idleTimeout = i; + idleTimeout = i; if (sender != null) - { - sender.setIdleTimeout(i); + { + sender.setIdleTimeout(i); } } - + public int getIdleTimeout() { return idleTimeout; @@ -566,22 +603,32 @@ public class Connection extends ConnectionInvoker { return _authorizationID; } - + public String getUserID() { return userID; } - + public void setUserID(String id) { userID = id; } + public void setServerProperties(final Map<String, Object> serverProperties) + { + _serverProperties = serverProperties == null ? Collections.EMPTY_MAP : serverProperties; + } + + public Map<String, Object> getServerProperties() + { + return _serverProperties; + } + public String toString() { return String.format("conn:%x", System.identityHashCode(this)); } - + public ConnectionSettings getConnectionSettings() { return conSettings; diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java index b25c2d7fd0..c063ef5e6f 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.transport; -public class ConnectionSettings +import java.util.Map; + +public class ConnectionSettings { String protocol = "tcp"; String host = "localhost"; @@ -32,12 +34,13 @@ public class ConnectionSettings String saslServerName = "localhost"; int port = 5672; int maxChannelCount = 32767; - int maxFrameSize = 65535; + int maxFrameSize = 65535; int heartbeatInterval; boolean useSSL; boolean useSASLEncryption; boolean tcpNodelay; - + private Map<String, Object> _clientProperties; + public boolean isTcpNodelay() { return tcpNodelay; @@ -187,4 +190,14 @@ public class ConnectionSettings { this.maxFrameSize = maxFrameSize; } + + public void setClientProperties(final Map<String, Object> clientProperties) + { + _clientProperties = clientProperties; + } + + public Map<String, Object> getClientProperties() + { + return _clientProperties; + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 818bb19c08..d9a8e5550c 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -21,27 +21,32 @@ package org.apache.qpid.transport; +import static org.apache.qpid.transport.Option.COMPLETED; +import static org.apache.qpid.transport.Option.SYNC; +import static org.apache.qpid.transport.Option.TIMELY_REPLY; +import static org.apache.qpid.transport.Session.State.CLOSED; +import static org.apache.qpid.transport.Session.State.CLOSING; +import static org.apache.qpid.transport.Session.State.DETACHED; +import static org.apache.qpid.transport.Session.State.NEW; +import static org.apache.qpid.transport.Session.State.OPEN; +import static org.apache.qpid.transport.Session.State.RESUMING; import org.apache.qpid.transport.network.Frame; - +import static org.apache.qpid.transport.util.Functions.mod; import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.util.Waiter; +import static org.apache.qpid.util.Serial.ge; +import static org.apache.qpid.util.Serial.gt; +import static org.apache.qpid.util.Serial.le; +import static org.apache.qpid.util.Serial.lt; +import static org.apache.qpid.util.Serial.max; +import static org.apache.qpid.util.Strings.toUTF8; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import static org.apache.qpid.transport.Option.*; -import static org.apache.qpid.transport.Session.State.*; -import static org.apache.qpid.transport.util.Functions.*; -import static org.apache.qpid.util.Serial.*; -import static org.apache.qpid.util.Strings.*; - /** * Session * @@ -140,7 +145,7 @@ public class Session extends SessionInvoker this.expiry = expiry; } - int getChannel() + public int getChannel() { return channel; } @@ -464,7 +469,7 @@ public class Session extends SessionInvoker { commandBytes -= m.getBodySize(); m.complete(); - commands[idx] = null; + commands[idx] = null; } } if (le(lower, maxComplete + 1)) @@ -644,7 +649,7 @@ public class Session extends SessionInvoker m.setSync(true); } needSync = !m.isSync(); - + try { send(m); diff --git a/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java index d18a0f64db..4486b03a67 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java +++ b/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java @@ -23,6 +23,7 @@ package org.apache.qpid.transport.codec; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.UUID; /** @@ -315,6 +316,28 @@ public final class BBEncoder extends AbstractEncoder } } + public void writeBin128(UUID id) + { + byte[] data = new byte[16]; + + long msb = id.getMostSignificantBits(); + long lsb = id.getLeastSignificantBits(); + + assert data.length == 16; + for (int i=7; i>=0; i--) + { + data[i] = (byte)(msb & 0xff); + msb = msb >> 8; + } + + for (int i=15; i>=8; i--) + { + data[i] = (byte)(lsb & 0xff); + lsb = (lsb >> 8); + } + writeBin128(data); + } + public void writeFloat(float aFloat) { try diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index 3759fa238a..ab174b00b3 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -20,16 +20,6 @@ */ package org.apache.qpid.transport.network; -import static java.lang.Math.min; -import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; -import static org.apache.qpid.transport.network.Frame.FIRST_SEG; -import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; -import static org.apache.qpid.transport.network.Frame.LAST_FRAME; -import static org.apache.qpid.transport.network.Frame.LAST_SEG; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolDelegate; @@ -40,6 +30,15 @@ import org.apache.qpid.transport.SegmentType; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.codec.BBEncoder; +import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; +import static org.apache.qpid.transport.network.Frame.FIRST_SEG; +import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; +import static org.apache.qpid.transport.network.Frame.LAST_FRAME; +import static org.apache.qpid.transport.network.Frame.LAST_SEG; + +import static java.lang.Math.min; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; /** @@ -55,7 +54,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, private final int maxPayload; private final ByteBuffer header; private final Object sendlock = new Object(); - private final ThreadLocal<BBEncoder> encoder = new ThreadLocal() + private final ThreadLocal<BBEncoder> encoder = new ThreadLocal<BBEncoder>() { public BBEncoder initialValue() { @@ -98,7 +97,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, } } - private final void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf) + private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf) { synchronized (sendlock) { @@ -227,8 +226,14 @@ public final class Disassembler implements Sender<ProtocolEvent>, fragment(flags, type, method, methodSeg); if (payload) { - fragment((byte) 0x0, SegmentType.HEADER, method, headerSeg); - fragment(LAST_SEG, SegmentType.BODY, method, method.getBody()); + ByteBuffer body = method.getBody(); + fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, + method, headerSeg); + if (body != null) + { + fragment(LAST_SEG, SegmentType.BODY, method, body); + } + } } } @@ -237,7 +242,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, { throw new IllegalArgumentException("" + error); } - + public void setIdleTimeout(int i) { sender.setIdleTimeout(i); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 6144edb947..e0e06d22ec 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -60,10 +60,10 @@ final class IoReceiver implements Runnable this.bufferSize = bufferSize; this.socket = transport.getSocket(); this.timeout = timeout; - + try { - receiverThread = Threading.getThreadFactory().createThread(this); + receiverThread = Threading.getThreadFactory().createThread(this); } catch(Exception e) { diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index b324cdd5a9..1a2869a815 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -21,14 +21,8 @@ package org.apache.qpid.transport.network.mina; -import java.io.IOException; -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; - import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExecutorThreadModel; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.IoConnector; @@ -48,6 +42,9 @@ import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.util.NewThreadExecutor; import org.apache.mina.util.SessionUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.ssl.SSLContextFactory; @@ -55,14 +52,19 @@ import org.apache.qpid.thread.QpidThreadExecutor; import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.NetworkDriverConfiguration; import org.apache.qpid.transport.OpenException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; - + ProtocolEngine _protocolEngine; private boolean _useNIO = false; private int _processors = 4; @@ -80,7 +82,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver private WriteFuture _lastWriteFuture; private static final Logger _logger = LoggerFactory.getLogger(MINANetworkDriver.class); - + public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO) { _useNIO = useNIO; @@ -100,7 +102,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver _ioSession = session; _ioSession.setAttachment(_protocolEngine); } - + public MINANetworkDriver() { @@ -110,7 +112,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { _socketConnector = ioConnector; } - + public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine) { _socketConnector = ioConnector; @@ -123,7 +125,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver _factory = factory; _config = config; - + if (_useNIO) { _acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(_processors, @@ -135,6 +137,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver } SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); + sconfig.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Acceptor)")); SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); if (config != null) @@ -181,12 +184,12 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { return _ioSession.getRemoteAddress(); } - + public SocketAddress getLocalAddress() { return _ioSession.getLocalAddress(); } - + public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws OpenException @@ -195,7 +198,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { _sslFactory = sslFactory; } - + if (_useNIO) { _socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor()); @@ -205,7 +208,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking // connector } - + org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); // the MINA default is currently to use the pooled allocator although this may change in future // once more testing of the performance of the simple allocator has been done @@ -215,12 +218,23 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver } SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig(); - + String s = ""; + StackTraceElement[] trace = Thread.currentThread().getStackTrace(); + for(StackTraceElement elt : trace) + { + if(elt.getClassName().contains("Test")) + { + s = elt.getClassName(); + break; + } + } + cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Client)-"+s)); + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true); scfg.setSendBufferSize((config != null) ? config.getSendBufferSize() : DEFAULT_BUFFER_SIZE); scfg.setReceiveBufferSize((config != null) ? config.getReceiveBufferSize() : DEFAULT_BUFFER_SIZE); - + // Don't have the connector's worker thread wait around for other // connections (we only use // one SocketConnector per connection at the moment anyway). This allows @@ -230,7 +244,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { ((SocketConnector) _socketConnector).setWorkerTimeout(0); } - + ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg); future.join(); if (!future.isConnected()) @@ -295,7 +309,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver if (_protocolEngine != null) { _protocolEngine.exception(throwable); - } + } else { _logger.error("Exception thrown and no ProtocolEngine to handle it", throwable); @@ -307,12 +321,12 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver * Invoked when a message is received on a particular protocol session. Note * that a protocol session is directly tied to a particular physical * connection. - * + * * @param protocolSession * the protocol session that received the message * @param message * the message itself (i.e. a decoded frame) - * + * * @throws Exception * if the message cannot be processed */ @@ -376,7 +390,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { _ioSession = protocolSession; } - + if (_acceptingConnections) { // Set up the protocol engine @@ -389,12 +403,12 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver public void sessionIdle(IoSession session, IdleStatus status) throws Exception { if (IdleStatus.WRITER_IDLE.equals(status)) - { + { ((ProtocolEngine) session.getAttachment()).writerIdle(); } else if (IdleStatus.READER_IDLE.equals(status)) { - ((ProtocolEngine) session.getAttachment()).readerIdle(); + ((ProtocolEngine) session.getAttachment()).readerIdle(); } } diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java index 25450fea64..9996fff311 100644 --- a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java +++ b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java @@ -31,6 +31,7 @@ public interface BindingURL public static final String OPTION_EXCLUSIVE = "exclusive"; public static final String OPTION_AUTODELETE = "autodelete"; public static final String OPTION_DURABLE = "durable"; + public static final String OPTION_BROWSE = "browse"; public static final String OPTION_CLIENTID = "clientid"; public static final String OPTION_SUBSCRIPTION = "subscription"; public static final String OPTION_ROUTING_KEY = "routingkey"; |