summaryrefslogtreecommitdiff
path: root/java/common/src/main/java/org/apache/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid')
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java9
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FieldTable.java46
-rw-r--r--java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java11
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java94
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java93
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java19
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java33
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java23
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java35
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java68
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/BindingURL.java1
12 files changed, 301 insertions, 135 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
index a8e7f47db0..39a9beb9e8 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -40,8 +40,6 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
private static final byte MINUS = (byte)'-';
private static final byte ZERO = (byte) '0';
-
-
private final class TokenizerImpl implements AMQShortStringTokenizer
{
private final byte _delim;
@@ -115,7 +113,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
private final int _length;
private static final char[] EMPTY_CHAR_ARRAY = new char[0];
-
+
public static final AMQShortString EMPTY_STRING = new AMQShortString((String)null);
public AMQShortString(byte[] data)
@@ -760,6 +758,11 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return false; //To change body of created methods use File | Settings | File Templates.
}
+ public static AMQShortString valueOf(Object obj)
+ {
+ return obj == null ? null : new AMQShortString(String.valueOf(obj));
+ }
+
public static void main(String args[])
{
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index 341238c667..bd566adf8f 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -21,15 +21,15 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
-
-import org.apache.qpid.AMQPInvalidClassException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.AMQPInvalidClassException;
+
import java.math.BigDecimal;
import java.util.Collections;
import java.util.Enumeration;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -68,8 +68,11 @@ public class FieldTable
public FieldTable(ByteBuffer buffer, long length) throws AMQFrameDecodingException
{
this();
- _encodedForm = buffer.slice();
- _encodedForm.limit((int) length);
+ ByteBuffer encodedForm = buffer.slice();
+ encodedForm.limit((int) length);
+ _encodedForm = ByteBuffer.allocate((int)length);
+ _encodedForm.put(encodedForm);
+ _encodedForm.flip();
_encodedSize = length;
buffer.skip((int) length);
}
@@ -829,6 +832,36 @@ public class FieldTable
recalculateEncodedSize();
}
+ public static Map<String, Object> convertToMap(final FieldTable fieldTable)
+ {
+ final Map<String, Object> map = new HashMap<String, Object>();
+
+ if(fieldTable != null)
+ {
+ fieldTable.processOverElements(
+ new FieldTableElementProcessor()
+ {
+
+ public boolean processElement(String propertyName, AMQTypedValue value)
+ {
+ Object val = value.getValue();
+ if(val instanceof AMQShortString)
+ {
+ val = val.toString();
+ }
+ map.put(propertyName, val);
+ return true;
+ }
+
+ public Object getResult()
+ {
+ return map;
+ }
+ });
+ }
+ return map;
+ }
+
public static interface FieldTableElementProcessor
{
@@ -1046,6 +1079,9 @@ public class FieldTable
{
final AMQShortString key = EncodingUtils.readAMQShortString(buffer);
+
+ _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read key '" + key);
+
AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer);
if (trace)
diff --git a/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
index 94869ab205..70d7d2c6b2 100644
--- a/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
@@ -3,6 +3,17 @@ package org.apache.qpid.thread;
public class DefaultThreadFactory implements ThreadFactory
{
+ private static class QpidThread extends Thread
+ {
+ private QpidThread(final Runnable target)
+ {
+ super(target);
+ }
+
+ }
+
+
+
public Thread createThread(Runnable r)
{
return new Thread(r);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index b12fbb75e6..74064c9d11 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -20,8 +20,19 @@
*/
package org.apache.qpid.transport;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+
+import org.apache.qpid.security.UsernamePasswordCallbackHandler;
import static org.apache.qpid.transport.Connection.State.OPEN;
+import org.apache.qpid.transport.util.Logger;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.ArrayList;
@@ -30,18 +41,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
-import org.apache.qpid.security.UsernamePasswordCallbackHandler;
-import org.apache.qpid.transport.util.Logger;
-import org.ietf.jgss.GSSContext;
-import org.ietf.jgss.GSSException;
-import org.ietf.jgss.GSSManager;
-import org.ietf.jgss.GSSName;
-import org.ietf.jgss.Oid;
-
/**
* ClientDelegate
@@ -54,20 +53,22 @@ public class ClientDelegate extends ConnectionDelegate
private static final String KRB5_OID_STR = "1.2.840.113554.1.2.2";
protected static Oid KRB5_OID;
-
- static {
- try {
+
+ static
+ {
+ try
+ {
KRB5_OID = new Oid(KRB5_OID_STR);
} catch (GSSException ignore) {}
}
-
+
private List<String> clientMechs;
private ConnectionSettings conSettings;
-
+
public ClientDelegate(ConnectionSettings settings)
{
this.conSettings = settings;
- this.clientMechs = Arrays.asList(settings.getSaslMechs().split(" "));
+ this.clientMechs = Arrays.asList(settings.getSaslMechs().split(" "));
}
public void init(Connection conn, ProtocolHeader hdr)
@@ -81,12 +82,17 @@ public class ClientDelegate extends ConnectionDelegate
@Override public void connectionStart(Connection conn, ConnectionStart start)
{
Map<String,Object> clientProperties = new HashMap<String,Object>();
+
+ if(this.conSettings.getClientProperties() != null)
+ {
+ clientProperties.putAll(this.conSettings.getClientProperties());
+ }
+
clientProperties.put("qpid.session_flow", 1);
clientProperties.put("qpid.client_pid",getPID());
- clientProperties.put("qpid.client_pid",clientProperties.get("qpid.client_pid"));
clientProperties.put("qpid.client_process",
System.getProperty("qpid.client_process","Qpid Java Client"));
-
+
List<Object> brokerMechs = start.getMechanisms();
if (brokerMechs == null || brokerMechs.isEmpty())
{
@@ -94,27 +100,29 @@ public class ClientDelegate extends ConnectionDelegate
(clientProperties, null, null, conn.getLocale());
return;
}
-
+
List<String> choosenMechs = new ArrayList<String>();
for (String mech:clientMechs)
{
- if (brokerMechs.contains(mech))
+ if (brokerMechs.contains(mech))
{
choosenMechs.add(mech);
}
}
-
+
if (choosenMechs.size() == 0)
{
conn.exception(new ConnectionException("The following SASL mechanisms " +
- clientMechs.toString() +
+ clientMechs.toString() +
" specified by the client are not supported by the broker"));
return;
}
-
+
String[] mechs = new String[choosenMechs.size()];
- choosenMechs.toArray(mechs);
-
+ choosenMechs.toArray(mechs);
+
+ conn.setServerProperties(start.getServerProperties());
+
try
{
Map<String,Object> saslProps = new HashMap<String,Object>();
@@ -162,8 +170,8 @@ public class ClientDelegate extends ConnectionDelegate
tune.getHeartbeatMin(),
tune.getHeartbeatMax()
);
- conn.connectionTuneOk(tune.getChannelMax(),
- tune.getMaxFrameSize(),
+ conn.connectionTuneOk(tune.getChannelMax(),
+ tune.getMaxFrameSize(),
hb_interval);
// The idle timeout is twice the heartbeat amount (in milisecs)
conn.setIdleTimeout(hb_interval*1000*2);
@@ -212,11 +220,11 @@ public class ClientDelegate extends ConnectionDelegate
return max;
}
}
-
+
private int getPID()
{
RuntimeMXBean rtb = ManagementFactory.getRuntimeMXBean();
- String processName = rtb.getName();
+ String processName = rtb.getName();
if (processName != null && processName.indexOf('@')>0)
{
try
@@ -236,36 +244,36 @@ public class ClientDelegate extends ConnectionDelegate
}
}
-
+
private String getUserID()
{
log.debug("Obtaining userID from kerberos");
String service = conSettings.getSaslProtocol() + "@" + conSettings.getSaslServerName();
GSSManager manager = GSSManager.getInstance();
-
- try
+
+ try
{
GSSName acceptorName = manager.createName(service,
GSSName.NT_HOSTBASED_SERVICE, KRB5_OID);
-
+
GSSContext secCtx = manager.createContext(acceptorName,
KRB5_OID,
null,
GSSContext.INDEFINITE_LIFETIME);
-
- secCtx.initSecContext(new byte[0], 0, 1);
-
+
+ secCtx.initSecContext(new byte[0], 0, 1);
+
if (secCtx.getSrcName() != null)
{
return secCtx.getSrcName().toString();
- }
-
- }
- catch (GSSException e)
+ }
+
+ }
+ catch (GSSException e)
{
log.warn("Unable to retrieve userID from Kerberos due to error",e);
}
-
+
return null;
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 17a13561c8..8c2da9d77a 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -20,24 +20,26 @@
*/
package org.apache.qpid.transport;
+import static org.apache.qpid.transport.Connection.State.CLOSED;
+import static org.apache.qpid.transport.Connection.State.CLOSING;
+import static org.apache.qpid.transport.Connection.State.NEW;
+import static org.apache.qpid.transport.Connection.State.OPEN;
+import static org.apache.qpid.transport.Connection.State.OPENING;
import org.apache.qpid.transport.network.ConnectionBinding;
import org.apache.qpid.transport.network.io.IoTransport;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
import org.apache.qpid.util.Strings;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslServer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import java.util.UUID;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslServer;
-
-import static org.apache.qpid.transport.Connection.State.*;
-
/**
* Connection
@@ -55,6 +57,7 @@ public class Connection extends ConnectionInvoker
private static final Logger log = Logger.get(Connection.class);
+
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
class DefaultConnectionListener implements ConnectionListener
@@ -67,6 +70,24 @@ public class Connection extends ConnectionInvoker
public void closed(Connection conn) {}
}
+ public static interface SessionFactory
+ {
+ Session newSession(Connection conn, Binary name, long expiry);
+ }
+
+ private static final class DefaultSessionFactory implements SessionFactory
+ {
+
+ public Session newSession(final Connection conn, final Binary name, final long expiry)
+ {
+ return new Session(conn, name, expiry);
+ }
+ }
+
+ private static final SessionFactory DEFAULT_SESSION_FACTORY = new DefaultSessionFactory();
+
+ private SessionFactory _sessionFactory = DEFAULT_SESSION_FACTORY;
+
private ConnectionDelegate delegate;
private Sender<ProtocolEvent> sender;
@@ -76,7 +97,7 @@ public class Connection extends ConnectionInvoker
private State state = NEW;
final private Object lock = new Object();
private long timeout = 60000;
- private List<ConnectionListener> listeners = new ArrayList<ConnectionListener>();
+ private List<ConnectionListener> listeners = new ArrayList<ConnectionListener>();
private ConnectionException error = null;
private int channelMax = 1;
@@ -85,6 +106,7 @@ public class Connection extends ConnectionInvoker
private SaslClient saslClient;
private int idleTimeout = 0;
private String _authorizationID;
+ private Map<String,Object> _serverProperties;
private String userID;
private ConnectionSettings conSettings;
@@ -111,7 +133,7 @@ public class Connection extends ConnectionInvoker
public void setSender(Sender<ProtocolEvent> sender)
{
this.sender = sender;
- sender.setIdleTimeout(idleTimeout);
+ sender.setIdleTimeout(idleTimeout);
}
protected void setState(State state)
@@ -157,7 +179,7 @@ public class Connection extends ConnectionInvoker
{
connect(host, port, vhost, username, password, false);
}
-
+
public void connect(String host, int port, String vhost, String username, String password, boolean ssl)
{
connect(host, port, vhost, username, password, ssl,"PLAIN");
@@ -165,6 +187,12 @@ public class Connection extends ConnectionInvoker
public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs)
{
+ connect(host, port, vhost, username, password, ssl,saslMechs, Collections.EMPTY_MAP);
+ }
+
+
+ public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs,Map<String,Object> clientProps)
+ {
ConnectionSettings settings = new ConnectionSettings();
settings.setHost(host);
settings.setPort(port);
@@ -173,11 +201,13 @@ public class Connection extends ConnectionInvoker
settings.setPassword(password);
settings.setUseSSL(ssl);
settings.setSaslMechs(saslMechs);
+ settings.setClientProperties(clientProps);
connect(settings);
}
-
+
public void connect(ConnectionSettings settings)
{
+
synchronized (lock)
{
conSettings = settings;
@@ -185,9 +215,9 @@ public class Connection extends ConnectionInvoker
userID = settings.getUsername();
delegate = new ClientDelegate(settings);
- IoTransport.connect(settings.getHost(),
- settings.getPort(),
- ConnectionBinding.get(this),
+ IoTransport.connect(settings.getHost(),
+ settings.getPort(),
+ ConnectionBinding.get(this),
settings.isUseSSL());
send(new ProtocolHeader(1, 0, 10));
@@ -264,7 +294,7 @@ public class Connection extends ConnectionInvoker
{
synchronized (lock)
{
- Session ssn = new Session(this, name, expiry);
+ Session ssn = _sessionFactory.newSession(this, name, expiry);
sessions.put(name, ssn);
map(ssn);
ssn.attach();
@@ -280,6 +310,13 @@ public class Connection extends ConnectionInvoker
}
}
+ public void setSessionFactory(SessionFactory sessionFactory)
+ {
+ assert sessionFactory != null;
+
+ _sessionFactory = sessionFactory;
+ }
+
public void setConnectionId(int id)
{
_connectionId = id;
@@ -425,7 +462,7 @@ public class Connection extends ConnectionInvoker
{
listener.exception(this, e);
}
-
+
}
public void exception(Throwable t)
@@ -481,7 +518,7 @@ public class Connection extends ConnectionInvoker
for (ConnectionListener listener: listeners)
{
listener.closed(this);
- }
+ }
}
public void close()
@@ -545,13 +582,13 @@ public class Connection extends ConnectionInvoker
public void setIdleTimeout(int i)
{
- idleTimeout = i;
+ idleTimeout = i;
if (sender != null)
- {
- sender.setIdleTimeout(i);
+ {
+ sender.setIdleTimeout(i);
}
}
-
+
public int getIdleTimeout()
{
return idleTimeout;
@@ -566,22 +603,32 @@ public class Connection extends ConnectionInvoker
{
return _authorizationID;
}
-
+
public String getUserID()
{
return userID;
}
-
+
public void setUserID(String id)
{
userID = id;
}
+ public void setServerProperties(final Map<String, Object> serverProperties)
+ {
+ _serverProperties = serverProperties == null ? Collections.EMPTY_MAP : serverProperties;
+ }
+
+ public Map<String, Object> getServerProperties()
+ {
+ return _serverProperties;
+ }
+
public String toString()
{
return String.format("conn:%x", System.identityHashCode(this));
}
-
+
public ConnectionSettings getConnectionSettings()
{
return conSettings;
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
index b25c2d7fd0..c063ef5e6f 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.transport;
-public class ConnectionSettings
+import java.util.Map;
+
+public class ConnectionSettings
{
String protocol = "tcp";
String host = "localhost";
@@ -32,12 +34,13 @@ public class ConnectionSettings
String saslServerName = "localhost";
int port = 5672;
int maxChannelCount = 32767;
- int maxFrameSize = 65535;
+ int maxFrameSize = 65535;
int heartbeatInterval;
boolean useSSL;
boolean useSASLEncryption;
boolean tcpNodelay;
-
+ private Map<String, Object> _clientProperties;
+
public boolean isTcpNodelay()
{
return tcpNodelay;
@@ -187,4 +190,14 @@ public class ConnectionSettings
{
this.maxFrameSize = maxFrameSize;
}
+
+ public void setClientProperties(final Map<String, Object> clientProperties)
+ {
+ _clientProperties = clientProperties;
+ }
+
+ public Map<String, Object> getClientProperties()
+ {
+ return _clientProperties;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 818bb19c08..d9a8e5550c 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -21,27 +21,32 @@
package org.apache.qpid.transport;
+import static org.apache.qpid.transport.Option.COMPLETED;
+import static org.apache.qpid.transport.Option.SYNC;
+import static org.apache.qpid.transport.Option.TIMELY_REPLY;
+import static org.apache.qpid.transport.Session.State.CLOSED;
+import static org.apache.qpid.transport.Session.State.CLOSING;
+import static org.apache.qpid.transport.Session.State.DETACHED;
+import static org.apache.qpid.transport.Session.State.NEW;
+import static org.apache.qpid.transport.Session.State.OPEN;
+import static org.apache.qpid.transport.Session.State.RESUMING;
import org.apache.qpid.transport.network.Frame;
-
+import static org.apache.qpid.transport.util.Functions.mod;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
+import static org.apache.qpid.util.Serial.ge;
+import static org.apache.qpid.util.Serial.gt;
+import static org.apache.qpid.util.Serial.le;
+import static org.apache.qpid.util.Serial.lt;
+import static org.apache.qpid.util.Serial.max;
+import static org.apache.qpid.util.Strings.toUTF8;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import static org.apache.qpid.transport.Option.*;
-import static org.apache.qpid.transport.Session.State.*;
-import static org.apache.qpid.transport.util.Functions.*;
-import static org.apache.qpid.util.Serial.*;
-import static org.apache.qpid.util.Strings.*;
-
/**
* Session
*
@@ -140,7 +145,7 @@ public class Session extends SessionInvoker
this.expiry = expiry;
}
- int getChannel()
+ public int getChannel()
{
return channel;
}
@@ -464,7 +469,7 @@ public class Session extends SessionInvoker
{
commandBytes -= m.getBodySize();
m.complete();
- commands[idx] = null;
+ commands[idx] = null;
}
}
if (le(lower, maxComplete + 1))
@@ -644,7 +649,7 @@ public class Session extends SessionInvoker
m.setSync(true);
}
needSync = !m.isSync();
-
+
try
{
send(m);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
index d18a0f64db..4486b03a67 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
@@ -23,6 +23,7 @@ package org.apache.qpid.transport.codec;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.util.UUID;
/**
@@ -315,6 +316,28 @@ public final class BBEncoder extends AbstractEncoder
}
}
+ public void writeBin128(UUID id)
+ {
+ byte[] data = new byte[16];
+
+ long msb = id.getMostSignificantBits();
+ long lsb = id.getLeastSignificantBits();
+
+ assert data.length == 16;
+ for (int i=7; i>=0; i--)
+ {
+ data[i] = (byte)(msb & 0xff);
+ msb = msb >> 8;
+ }
+
+ for (int i=15; i>=8; i--)
+ {
+ data[i] = (byte)(lsb & 0xff);
+ lsb = (lsb >> 8);
+ }
+ writeBin128(data);
+ }
+
public void writeFloat(float aFloat)
{
try
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index 3759fa238a..ab174b00b3 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -20,16 +20,6 @@
*/
package org.apache.qpid.transport.network;
-import static java.lang.Math.min;
-import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
-import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
-import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
-import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
-import static org.apache.qpid.transport.network.Frame.LAST_SEG;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolDelegate;
@@ -40,6 +30,15 @@ import org.apache.qpid.transport.SegmentType;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBEncoder;
+import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
+import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
+import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
+import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
+import static org.apache.qpid.transport.network.Frame.LAST_SEG;
+
+import static java.lang.Math.min;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
/**
@@ -55,7 +54,7 @@ public final class Disassembler implements Sender<ProtocolEvent>,
private final int maxPayload;
private final ByteBuffer header;
private final Object sendlock = new Object();
- private final ThreadLocal<BBEncoder> encoder = new ThreadLocal()
+ private final ThreadLocal<BBEncoder> encoder = new ThreadLocal<BBEncoder>()
{
public BBEncoder initialValue()
{
@@ -98,7 +97,7 @@ public final class Disassembler implements Sender<ProtocolEvent>,
}
}
- private final void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
+ private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
{
synchronized (sendlock)
{
@@ -227,8 +226,14 @@ public final class Disassembler implements Sender<ProtocolEvent>,
fragment(flags, type, method, methodSeg);
if (payload)
{
- fragment((byte) 0x0, SegmentType.HEADER, method, headerSeg);
- fragment(LAST_SEG, SegmentType.BODY, method, method.getBody());
+ ByteBuffer body = method.getBody();
+ fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER,
+ method, headerSeg);
+ if (body != null)
+ {
+ fragment(LAST_SEG, SegmentType.BODY, method, body);
+ }
+
}
}
}
@@ -237,7 +242,7 @@ public final class Disassembler implements Sender<ProtocolEvent>,
{
throw new IllegalArgumentException("" + error);
}
-
+
public void setIdleTimeout(int i)
{
sender.setIdleTimeout(i);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index 6144edb947..e0e06d22ec 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -60,10 +60,10 @@ final class IoReceiver implements Runnable
this.bufferSize = bufferSize;
this.socket = transport.getSocket();
this.timeout = timeout;
-
+
try
{
- receiverThread = Threading.getThreadFactory().createThread(this);
+ receiverThread = Threading.getThreadFactory().createThread(this);
}
catch(Exception e)
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
index b324cdd5a9..1a2869a815 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
@@ -21,14 +21,8 @@
package org.apache.qpid.transport.network.mina;
-import java.io.IOException;
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-
import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExecutorThreadModel;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoConnector;
@@ -48,6 +42,9 @@ import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.util.NewThreadExecutor;
import org.apache.mina.util.SessionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.ssl.SSLContextFactory;
@@ -55,14 +52,19 @@ import org.apache.qpid.thread.QpidThreadExecutor;
import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.NetworkDriverConfiguration;
import org.apache.qpid.transport.OpenException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{
private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
-
+
ProtocolEngine _protocolEngine;
private boolean _useNIO = false;
private int _processors = 4;
@@ -80,7 +82,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
private WriteFuture _lastWriteFuture;
private static final Logger _logger = LoggerFactory.getLogger(MINANetworkDriver.class);
-
+
public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO)
{
_useNIO = useNIO;
@@ -100,7 +102,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
_ioSession = session;
_ioSession.setAttachment(_protocolEngine);
}
-
+
public MINANetworkDriver()
{
@@ -110,7 +112,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{
_socketConnector = ioConnector;
}
-
+
public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine)
{
_socketConnector = ioConnector;
@@ -123,7 +125,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
_factory = factory;
_config = config;
-
+
if (_useNIO)
{
_acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(_processors,
@@ -135,6 +137,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
}
SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
+ sconfig.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Acceptor)"));
SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
if (config != null)
@@ -181,12 +184,12 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{
return _ioSession.getRemoteAddress();
}
-
+
public SocketAddress getLocalAddress()
{
return _ioSession.getLocalAddress();
}
-
+
public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
SSLContextFactory sslFactory) throws OpenException
@@ -195,7 +198,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{
_sslFactory = sslFactory;
}
-
+
if (_useNIO)
{
_socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor());
@@ -205,7 +208,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
_socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking
// connector
}
-
+
org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
// the MINA default is currently to use the pooled allocator although this may change in future
// once more testing of the performance of the simple allocator has been done
@@ -215,12 +218,23 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
}
SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig();
-
+ String s = "";
+ StackTraceElement[] trace = Thread.currentThread().getStackTrace();
+ for(StackTraceElement elt : trace)
+ {
+ if(elt.getClassName().contains("Test"))
+ {
+ s = elt.getClassName();
+ break;
+ }
+ }
+ cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Client)-"+s));
+
SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true);
scfg.setSendBufferSize((config != null) ? config.getSendBufferSize() : DEFAULT_BUFFER_SIZE);
scfg.setReceiveBufferSize((config != null) ? config.getReceiveBufferSize() : DEFAULT_BUFFER_SIZE);
-
+
// Don't have the connector's worker thread wait around for other
// connections (we only use
// one SocketConnector per connection at the moment anyway). This allows
@@ -230,7 +244,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{
((SocketConnector) _socketConnector).setWorkerTimeout(0);
}
-
+
ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg);
future.join();
if (!future.isConnected())
@@ -295,7 +309,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
if (_protocolEngine != null)
{
_protocolEngine.exception(throwable);
- }
+ }
else
{
_logger.error("Exception thrown and no ProtocolEngine to handle it", throwable);
@@ -307,12 +321,12 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
* Invoked when a message is received on a particular protocol session. Note
* that a protocol session is directly tied to a particular physical
* connection.
- *
+ *
* @param protocolSession
* the protocol session that received the message
* @param message
* the message itself (i.e. a decoded frame)
- *
+ *
* @throws Exception
* if the message cannot be processed
*/
@@ -376,7 +390,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{
_ioSession = protocolSession;
}
-
+
if (_acceptingConnections)
{
// Set up the protocol engine
@@ -389,12 +403,12 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
public void sessionIdle(IoSession session, IdleStatus status) throws Exception
{
if (IdleStatus.WRITER_IDLE.equals(status))
- {
+ {
((ProtocolEngine) session.getAttachment()).writerIdle();
}
else if (IdleStatus.READER_IDLE.equals(status))
{
- ((ProtocolEngine) session.getAttachment()).readerIdle();
+ ((ProtocolEngine) session.getAttachment()).readerIdle();
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
index 25450fea64..9996fff311 100644
--- a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
+++ b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
@@ -31,6 +31,7 @@ public interface BindingURL
public static final String OPTION_EXCLUSIVE = "exclusive";
public static final String OPTION_AUTODELETE = "autodelete";
public static final String OPTION_DURABLE = "durable";
+ public static final String OPTION_BROWSE = "browse";
public static final String OPTION_CLIENTID = "clientid";
public static final String OPTION_SUBSCRIPTION = "subscription";
public static final String OPTION_ROUTING_KEY = "routingkey";