summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-08-05 19:33:11 +0000
committerRafael H. Schloming <rhs@apache.org>2008-08-05 19:33:11 +0000
commitfe1cbdb8a780f78bf6e249e3d41d8de4cce22777 (patch)
tree9d55708c644751f83b78b497240b0e6317008b51 /qpid/java/common
parentf04df8a8c20938c5b8bb176ca35db0376ad76c60 (diff)
downloadqpid-python-fe1cbdb8a780f78bf6e249e3d41d8de4cce22777.tar.gz
Profiling driven changes:
- made AMQShortString cache the toString() value - added static initializer to IoTransport to disable use of pooled byte buffers - modified IoSender to permit buffering - removed OutputHandler and eliminated intermediate Frame generation between Disassembler and Sender<ByteBuffer> (IoSender) - made Disassembler take advantage of IoSender's buffering - removed Header and Data as distinct protocol events, added Header and Body members to MessageTransfer - modified Assembler and Disassembler to decode/encode Header and Data directly to/from MessageTransfer - modified Disassembler to only write data if encoding of headers is successful - added Strings.toUTF8(String) -> byte[] to do proper UTF-8 encoding that is also fast for 7-bit ascii - modified JMSTextMessage to use the Strings.toUTF8 - modified QpidBench to only generate 7-bit ascii when using TextMessage git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@682887 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/Composite.tpl53
-rw-r--r--qpid/java/common/Invoker.tpl7
-rw-r--r--qpid/java/common/genutil.py9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java129
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java29
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java37
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java67
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Data.java98
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java48
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java39
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java67
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java17
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java63
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java143
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutputHandler.java125
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java48
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java12
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java82
25 files changed, 433 insertions, 681 deletions
diff --git a/qpid/java/common/Composite.tpl b/qpid/java/common/Composite.tpl
index 37e3bf8853..283fa24641 100644
--- a/qpid/java/common/Composite.tpl
+++ b/qpid/java/common/Composite.tpl
@@ -1,5 +1,6 @@
package org.apache.qpid.transport;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@@ -9,7 +10,6 @@ import java.util.UUID;
import org.apache.qpid.transport.codec.Decoder;
import org.apache.qpid.transport.codec.Encodable;
import org.apache.qpid.transport.codec.Encoder;
-import org.apache.qpid.transport.codec.Validator;
import org.apache.qpid.transport.network.Frame;
@@ -18,11 +18,13 @@ from genutil import *
cls = klass(type)["@name"]
+segments = type["segments"]
+
if type.name in ("control", "command"):
base = "Method"
size = 0
pack = 2
- if type["segments"]:
+ if segments:
payload = "true"
else:
payload = "false"
@@ -86,6 +88,10 @@ options = get_options(fields)
for f in fields:
if not f.empty:
out(" private $(f.type) $(f.name);\n")
+
+if segments:
+ out(" private Header header;\n")
+ out(" private ByteBuffer body;\n")
}
${
@@ -99,6 +105,10 @@ for f in fields:
if f.option: continue
out(" $(f.set)($(f.name));\n")
+if segments:
+ out(" setHeader(header);\n")
+ out(" setBody(body);\n")
+
if options or base == "Method":
out("""
for (int i=0; i < _options.length; i++) {
@@ -154,7 +164,6 @@ else:
}
public final $name $(f.set)($(f.type) value) {
- $(f.check)
${
if not f.empty:
out(" this.$(f.name) = value;")
@@ -173,6 +182,44 @@ if pack > 0:
""")
}
+${
+if segments:
+ out(""" public final Header getHeader() {
+ return this.header;
+ }
+
+ public final void setHeader(Header header) {
+ this.header = header;
+ }
+
+ public final $name header(Header header) {
+ setHeader(header);
+ return this;
+ }
+
+ public final ByteBuffer getBody() {
+ if (this.body == null)
+ {
+ return null;
+ }
+ else
+ {
+ return this.body.slice();
+ }
+ }
+
+ public final void setBody(ByteBuffer body) {
+ this.body = body;
+ }
+
+ public final $name body(ByteBuffer body)
+ {
+ setBody(body);
+ return this;
+ }
+""")
+}
+
public void write(Encoder enc)
{
${
diff --git a/qpid/java/common/Invoker.tpl b/qpid/java/common/Invoker.tpl
index 21a17624a6..9158922df7 100644
--- a/qpid/java/common/Invoker.tpl
+++ b/qpid/java/common/Invoker.tpl
@@ -1,5 +1,6 @@
package org.apache.qpid.transport;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -32,9 +33,9 @@ for c in composites:
jclass = ""
out("""
- public final $jresult $(dromedary(name))($(", ".join(params))) {
- $(jreturn)invoke(new $name($(", ".join(args)))$jclass);
- }
+ public final $jresult $(dromedary(name))($(", ".join(params))) {
+ $(jreturn)invoke(new $name($(", ".join(args)))$jclass);
+ }
""")
}
diff --git a/qpid/java/common/genutil.py b/qpid/java/common/genutil.py
index 2f1caa41c4..f8f234548c 100644
--- a/qpid/java/common/genutil.py
+++ b/qpid/java/common/genutil.py
@@ -170,18 +170,15 @@ class Field:
if self.type_node.name == "struct":
self.read = "(%s) dec.readStruct(%s.TYPE)" % (tname, tname)
self.write = "enc.writeStruct(%s.TYPE, check(struct).%s)" % (tname, self.name)
- self.check = ""
self.coder = "Struct"
elif self.type_node.name == "domain":
self.coder = camel(0, self.prim_type["@name"])
self.read = "%s.get(dec.read%s())" % (tname, self.coder)
self.write = "enc.write%s(check(struct).%s.getValue())" % (self.coder, self.name)
- self.check = ""
else:
self.coder = camel(0, self.type_node["@name"])
self.read = "dec.read%s()" % self.coder
self.write = "enc.write%s(check(struct).%s)" % (self.coder, self.name)
- self.check = "Validator.check%s(value);" % self.coder
self.type = jtype(self.type_node)
self.default = DEFAULTS.get(self.type, "null")
self.has = camel(1, "has", self.name)
@@ -214,6 +211,9 @@ def get_parameters(type, fields):
options = True
else:
params.append("%s %s" % (f.type, f.name))
+ if type["segments"]:
+ params.append("Header header")
+ params.append("ByteBuffer body")
if options or type.name in ("control", "command"):
params.append("Option ... _options")
return params
@@ -226,6 +226,9 @@ def get_arguments(type, fields):
options = True
else:
args.append(f.name)
+ if type["segments"]:
+ args.append("header")
+ args.append("body")
if options or type.name in ("control", "command"):
args.append("_options")
return args
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java
index 56286a9b01..83d434b20a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java
@@ -45,10 +45,6 @@ class ToyBroker extends SessionDelegate
{
private ToyExchange exchange;
- private MessageTransfer xfr = null;
- private DeliveryProperties props = null;
- private Header header = null;
- private List<Data> body = null;
private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>();
public ToyBroker(ToyExchange exchange)
@@ -103,22 +99,10 @@ class ToyBroker extends SessionDelegate
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
- this.xfr = xfr;
- body = new ArrayList<Data>();
- System.out.println("received transfer " + xfr.getDestination());
- }
-
- @Override public void header(Session ssn, Header header)
- {
- if (xfr == null || body == null)
- {
- ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR,
- "no method segment");
- ssn.close();
- return;
- }
-
- props = header.get(DeliveryProperties.class);
+ String dest = xfr.getDestination();
+ System.out.println("received transfer " + dest);
+ Header header = xfr.getHeader();
+ DeliveryProperties props = header.get(DeliveryProperties.class);
if (props != null)
{
System.out.println("received headers routing_key " + props.getRoutingKey());
@@ -130,67 +114,31 @@ class ToyBroker extends SessionDelegate
System.out.println(mp.getApplicationHeaders());
}
- this.header = header;
- }
-
- @Override public void data(Session ssn, Data data)
- {
- if (xfr == null || body == null)
+ if (exchange.route(dest,props.getRoutingKey(),xfr))
{
- ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, "no method segment");
- ssn.close();
- return;
+ System.out.println("queued " + xfr);
+ dispatchMessages(ssn);
}
-
- body.add(data);
-
- if (data.isLast())
+ else
{
- String dest = xfr.getDestination();
- Message m = new Message(header, body);
- if (exchange.route(dest,props.getRoutingKey(),m))
+ if (props == null || !props.getDiscardUnroutable())
{
- System.out.println("queued " + m);
- dispatchMessages(ssn);
+ RangeSet ranges = new RangeSet();
+ ranges.add(xfr.getId());
+ ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE,
+ "no such destination");
}
- else
- {
-
- reject(ssn);
- }
- ssn.processed(xfr);
- xfr = null;
- body = null;
- }
- }
-
- private void reject(Session ssn)
- {
- if (props != null && props.getDiscardUnroutable())
- {
- return;
- }
- else
- {
- RangeSet ranges = new RangeSet();
- ranges.add(xfr.getId());
- ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE,
- "no such destination");
}
+ ssn.processed(xfr);
}
- private void transferMessageToPeer(Session ssn,String dest, Message m)
+ private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m)
{
System.out.println("\n==================> Transfering message to: " +dest + "\n");
- ssn.messageTransfer(dest, MessageAcceptMode.EXPLICIT,
- MessageAcquireMode.PRE_ACQUIRED);
- ssn.header(m.header);
- for (Data d : m.body)
- {
- ssn.data(d.getData());
- }
- ssn.endData();
+ ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED,
+ m.getHeader(), m.getBody());
}
private void dispatchMessages(Session ssn)
@@ -204,8 +152,8 @@ class ToyBroker extends SessionDelegate
private void checkAndSendMessagesToConsumer(Session ssn,String dest)
{
Consumer c = consumers.get(dest);
- LinkedBlockingQueue<Message> queue = exchange.getQueue(c._queueName);
- Message m = queue.poll();
+ LinkedBlockingQueue<MessageTransfer> queue = exchange.getQueue(c._queueName);
+ MessageTransfer m = queue.poll();
while (m != null && c._credit>0)
{
transferMessageToPeer(ssn,dest,m);
@@ -214,43 +162,6 @@ class ToyBroker extends SessionDelegate
}
}
- class Message
- {
- private final Header header;
- private final List<Data> body;
-
- public Message(Header header, List<Data> body)
- {
- this.header = header;
- this.body = body;
- }
-
- public String toString()
- {
- StringBuilder sb = new StringBuilder();
-
- if (header != null)
- {
- boolean first = true;
- for (Struct st : header.getStructs())
- {
- if (first) { first = false; }
- else { sb.append(" "); }
- sb.append(st);
- }
- }
-
- for (Data d : body)
- {
- sb.append(" | ");
- sb.append(d);
- }
-
- return sb.toString();
- }
-
- }
-
// ugly, but who cares :)
// assumes unit is always no of messages, not bytes
// assumes it's credit mode and not window
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
index 27a48fb760..cb10859c9f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid;
+import java.nio.*;
import java.util.*;
import org.apache.qpid.transport.*;
@@ -47,17 +48,9 @@ class ToyClient extends SessionDelegate
}
}
- @Override public void header(Session ssn, Header header)
+ @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
- for (Struct st : header.getStructs())
- {
- System.out.println("header: " + st);
- }
- }
-
- @Override public void data(Session ssn, Data data)
- {
- System.out.println("got data: " + data);
+ System.out.println("msg: " + xfr);
}
public static final void main(String[] args)
@@ -111,16 +104,16 @@ class ToyClient extends SessionDelegate
map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT,
- MessageAcquireMode.PRE_ACQUIRED);
- ssn.header(new DeliveryProperties(),
- new MessageProperties().setApplicationHeaders(map));
- ssn.data("this is the data");
- ssn.endData();
+ MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties(),
+ new MessageProperties()
+ .setApplicationHeaders(map)),
+ "this is the data");
ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT,
- MessageAcquireMode.PRE_ACQUIRED);
- ssn.data("this should be rejected");
- ssn.endData();
+ MessageAcquireMode.PRE_ACQUIRED,
+ null,
+ "this should be rejected");
ssn.sync();
Future<QueueQueryResult> future = ssn.queueQuery("asdf");
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java
index 5c3c0ac0fb..c638679596 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java
@@ -9,42 +9,43 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.qpid.ToyBroker.Message;
+import org.apache.qpid.transport.MessageTransfer;
+
public class ToyExchange
{
final static String DIRECT = "amq.direct";
final static String TOPIC = "amq.topic";
- private Map<String,List<LinkedBlockingQueue<Message>>> directEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>();
- private Map<String,List<LinkedBlockingQueue<Message>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>();
- private Map<String,LinkedBlockingQueue<Message>> queues = new HashMap<String,LinkedBlockingQueue<Message>>();
+ private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> directEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>();
+ private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>();
+ private Map<String,LinkedBlockingQueue<MessageTransfer>> queues = new HashMap<String,LinkedBlockingQueue<MessageTransfer>>();
public void createQueue(String name)
{
- queues.put(name, new LinkedBlockingQueue<Message>());
+ queues.put(name, new LinkedBlockingQueue<MessageTransfer>());
}
- public LinkedBlockingQueue<Message> getQueue(String name)
+ public LinkedBlockingQueue<MessageTransfer> getQueue(String name)
{
return queues.get(name);
}
public void bindQueue(String type,String binding,String queueName)
{
- LinkedBlockingQueue<Message> queue = queues.get(queueName);
+ LinkedBlockingQueue<MessageTransfer> queue = queues.get(queueName);
binding = normalizeKey(binding);
if(DIRECT.equals(type))
{
if (directEx.containsKey(binding))
{
- List<LinkedBlockingQueue<Message>> list = directEx.get(binding);
+ List<LinkedBlockingQueue<MessageTransfer>> list = directEx.get(binding);
list.add(queue);
}
else
{
- List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>();
+ List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>();
list.add(queue);
directEx.put(binding,list);
}
@@ -53,21 +54,21 @@ public class ToyExchange
{
if (topicEx.containsKey(binding))
{
- List<LinkedBlockingQueue<Message>> list = topicEx.get(binding);
+ List<LinkedBlockingQueue<MessageTransfer>> list = topicEx.get(binding);
list.add(queue);
}
else
{
- List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>();
+ List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>();
list.add(queue);
topicEx.put(binding,list);
}
}
}
- public boolean route(String dest,String routingKey,Message msg)
+ public boolean route(String dest, String routingKey, MessageTransfer msg)
{
- List<LinkedBlockingQueue<Message>> queues;
+ List<LinkedBlockingQueue<MessageTransfer>> queues;
if(DIRECT.equals(dest))
{
queues = directEx.get(routingKey);
@@ -101,9 +102,9 @@ public class ToyExchange
}
}
- private List<LinkedBlockingQueue<Message>> matchWildCard(String routingKey)
+ private List<LinkedBlockingQueue<MessageTransfer>> matchWildCard(String routingKey)
{
- List<LinkedBlockingQueue<Message>> selected = new ArrayList<LinkedBlockingQueue<Message>>();
+ List<LinkedBlockingQueue<MessageTransfer>> selected = new ArrayList<LinkedBlockingQueue<MessageTransfer>>();
for(String key: topicEx.keySet())
{
@@ -111,7 +112,7 @@ public class ToyExchange
Matcher m = p.matcher(routingKey);
if (m.find())
{
- for(LinkedBlockingQueue<Message> queue : topicEx.get(key))
+ for(LinkedBlockingQueue<MessageTransfer> queue : topicEx.get(key))
{
selected.add(queue);
}
@@ -121,9 +122,9 @@ public class ToyExchange
return selected;
}
- private void storeMessage(Message msg,List<LinkedBlockingQueue<Message>> selected)
+ private void storeMessage(MessageTransfer msg,List<LinkedBlockingQueue<MessageTransfer>> selected)
{
- for(LinkedBlockingQueue<Message> queue : selected)
+ for(LinkedBlockingQueue<MessageTransfer> queue : selected)
{
queue.offer(msg);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
index 2a248bf703..22f66ae556 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -418,9 +418,15 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return chars;
}
+ private String str = null;
+
public String asString()
{
- return new String(asChars());
+ if (str == null)
+ {
+ str = new String(asChars());
+ }
+ return str;
}
public boolean equals(Object o)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java
index 7a967adbba..624c29baff 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java
@@ -53,11 +53,6 @@ public class Channel extends Invoker
// session may be null
private Session session;
- private Lock commandLock = new ReentrantLock();
- private boolean first = true;
- private ByteBuffer data = null;
- private boolean batch = false;
-
public Channel(Connection connection, int channel, SessionDelegate delegate)
{
this.connection = connection;
@@ -105,16 +100,6 @@ public class Channel extends Invoker
method.delegate(session, sessionDelegate);
}
- public void header(Void v, Header header)
- {
- header.delegate(session, sessionDelegate);
- }
-
- public void data(Void v, Data data)
- {
- data.delegate(session, sessionDelegate);
- }
-
public void error(Void v, ProtocolError error)
{
throw new RuntimeException(error.getMessage());
@@ -157,62 +142,12 @@ public class Channel extends Invoker
public void method(Method m)
{
- if (m.getEncodedTrack() == Frame.L4)
- {
- commandLock.lock();
- }
-
emit(m);
- if (!m.isBatch() && !m.hasPayload())
- {
- connection.flush();
- }
-
- batch = m.isBatch();
-
- if (m.getEncodedTrack() == Frame.L4 && !m.hasPayload())
- {
- commandLock.unlock();
- }
- }
-
- public void header(Header header)
- {
- emit(header);
- }
-
- public void data(ByteBuffer buf)
- {
- if (data != null)
- {
- emit(new Data(data, first, false));
- first = false;
- }
-
- data = buf;
- }
-
- public void data(String str)
- {
- data(str.getBytes());
- }
-
- public void data(byte[] bytes)
- {
- data(ByteBuffer.wrap(bytes));
- }
-
- public void end()
- {
- emit(new Data(data, first, true));
- first = true;
- data = null;
- if (!batch)
+ if (!m.isBatch())
{
connection.flush();
}
- commandLock.unlock();
}
protected void invoke(Method m)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Data.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Data.java
deleted file mode 100644
index fbf7428864..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Data.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport;
-
-import org.apache.qpid.transport.network.Frame;
-
-import java.nio.ByteBuffer;
-
-import java.util.Collections;
-
-import static org.apache.qpid.transport.util.Functions.*;
-
-
-/**
- * Data
- *
- */
-
-public class Data implements ProtocolEvent
-{
-
- private final ByteBuffer data;
- private final boolean first;
- private final boolean last;
- private int channel;
-
- public Data(ByteBuffer data, boolean first, boolean last)
- {
- this.data = data;
- this.first = first;
- this.last = last;
- }
-
- public ByteBuffer getData()
- {
- return data.slice();
- }
-
- public boolean isFirst()
- {
- return first;
- }
-
- public boolean isLast()
- {
- return last;
- }
-
- public final int getChannel()
- {
- return channel;
- }
-
- public final void setChannel(int channel)
- {
- this.channel = channel;
- }
-
- public byte getEncodedTrack()
- {
- return Frame.L4;
- }
-
- public <C> void delegate(C context, ProtocolDelegate<C> delegate)
- {
- delegate.data(context, this);
- }
-
- public String toString()
- {
- StringBuffer str = new StringBuffer();
- str.append("ch=");
- str.append(" ");
- str.append("Data(");
- str.append(str(data, 64));
- str.append(")");
- return str.toString();
- }
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
index f8debcf923..87bdae3866 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
@@ -40,22 +40,6 @@ public class Echo extends SessionDelegate
{
this.xfr = xfr;
ssn.invoke(xfr);
- }
-
- public void header(Session ssn, Header hdr)
- {
- ssn.header(hdr);
- }
-
- public void data(Session ssn, Data data)
- {
- ssn.data(data.getData());
- if (data.isLast())
- {
- ssn.endData();
- }
-
- // XXX: should be able to get command-id from any segment
ssn.processed(xfr);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
index 3b351ee828..9b6ab4951b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
@@ -22,6 +22,7 @@ package org.apache.qpid.transport;
import org.apache.qpid.transport.network.Frame;
+import java.util.Arrays;
import java.util.List;
import java.nio.ByteBuffer;
@@ -32,33 +33,25 @@ import java.nio.ByteBuffer;
* @author Rafael H. Schloming
*/
-public class Header implements ProtocolEvent {
+public class Header {
private final List<Struct> structs;
- private ByteBuffer _buf;
- private boolean _noPayload;
- private int channel;
- public Header(List<Struct> structs, boolean lastframe)
+ public Header(List<Struct> structs)
{
this.structs = structs;
- _noPayload= lastframe;
}
- public List<Struct> getStructs()
+ public Header(Struct ... structs)
{
- return structs;
+ this(Arrays.asList(structs));
}
- public void setBuf(ByteBuffer buf)
+ public List<Struct> getStructs()
{
- _buf = buf;
+ return structs;
}
- public ByteBuffer getBuf()
- {
- return _buf;
- }
public <T> T get(Class<T> klass)
{
for (Struct st : structs)
@@ -72,36 +65,9 @@ public class Header implements ProtocolEvent {
return null;
}
- public final int getChannel()
- {
- return channel;
- }
-
- public final void setChannel(int channel)
- {
- this.channel = channel;
- }
-
- public byte getEncodedTrack()
- {
- return Frame.L4;
- }
-
- public <C> void delegate(C context, ProtocolDelegate<C> delegate)
- {
- delegate.header(context, this);
- }
-
- public boolean hasNoPayload()
- {
- return _noPayload;
- }
-
public String toString()
{
StringBuffer str = new StringBuffer();
- str.append("ch=");
- str.append(channel);
str.append(" Header(");
boolean first = true;
for (Struct s : structs)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
index 1c80d8c00c..6b99f6d5d3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
@@ -22,6 +22,10 @@ package org.apache.qpid.transport;
import org.apache.qpid.transport.network.Frame;
+import java.nio.ByteBuffer;
+
+import static org.apache.qpid.transport.util.Functions.*;
+
/**
* Method
*
@@ -88,6 +92,26 @@ public abstract class Method extends Struct implements ProtocolEvent
public abstract boolean hasPayload();
+ public Header getHeader()
+ {
+ return null;
+ }
+
+ public void setHeader(Header header)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public ByteBuffer getBody()
+ {
+ return null;
+ }
+
+ public void setBody(ByteBuffer body)
+ {
+ throw new UnsupportedOperationException();
+ }
+
public abstract byte getEncodedTrack();
public abstract <C> void dispatch(C context, MethodDelegate<C> delegate);
@@ -134,6 +158,21 @@ public abstract class Method extends Struct implements ProtocolEvent
str.append(" ");
str.append(super.toString());
+ Header hdr = getHeader();
+ if (hdr != null)
+ {
+ for (Struct st : hdr.getStructs())
+ {
+ str.append("\n ");
+ str.append(st);
+ }
+ }
+ ByteBuffer body = getBody();
+ if (body != null)
+ {
+ str.append("\n body=");
+ str.append(str(body, 64));
+ }
return str.toString();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java
index 9fa28fbe23..a90948fc1d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java
@@ -35,10 +35,6 @@ public interface ProtocolDelegate<C>
void command(C context, Method command);
- void header(C context, Header header);
-
- void data(C context, Data data);
-
void error(C context, ProtocolError error);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 8ec13c0ee7..1400bd2e5b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.qpid.transport.Option.*;
import static org.apache.qpid.transport.util.Functions.*;
import static org.apache.qpid.util.Serial.*;
+import static org.apache.qpid.util.Strings.*;
/**
* Session
@@ -271,7 +272,7 @@ public class Session extends Invoker
}
needSync = !m.isSync();
channel.method(m);
- if (autoSync && !m.hasPayload())
+ if (autoSync)
{
sync();
}
@@ -290,50 +291,6 @@ public class Session extends Invoker
}
}
- public void header(Header header)
- {
- channel.header(header);
- }
-
- public Header header(List<Struct> structs)
- {
- Header res = new Header(structs, false);
- header(res);
- return res;
- }
-
- public Header header(Struct ... structs)
- {
- return header(Arrays.asList(structs));
- }
-
- public void data(ByteBuffer buf)
- {
- channel.data(buf);
- }
-
- public void data(String str)
- {
- channel.data(str);
- }
-
- public void data(byte[] bytes)
- {
- channel.data(bytes);
- }
-
- public void endData()
- {
- channel.end();
- synchronized (commands)
- {
- if (autoSync)
- {
- sync();
- }
- }
- }
-
public void sync()
{
sync(timeout);
@@ -501,6 +458,26 @@ public class Session extends Invoker
}
+ public final void messageTransfer(String destination,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ Header header,
+ byte[] body,
+ Option ... _options) {
+ messageTransfer(destination, acceptMode, acquireMode, header,
+ ByteBuffer.wrap(body), _options);
+ }
+
+ public final void messageTransfer(String destination,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ Header header,
+ String body,
+ Option ... _options) {
+ messageTransfer(destination, acceptMode, acquireMode, header,
+ toUTF8(body), _options);
+ }
+
public void close()
{
sessionRequestTimeout(0);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index dc400d3098..b91763509c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -48,10 +48,6 @@ public abstract class SessionDelegate
}
}
- public void header(Session ssn, Header header) { }
-
- public void data(Session ssn, Data data) { }
-
public void error(Session ssn, ProtocolError error) { }
@Override public void executionResult(Session ssn, ExecutionResult result)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
index 788b6a55e3..390de881ab 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
@@ -35,24 +35,29 @@ public final class BBEncoder extends AbstractEncoder
{
private ByteBuffer out;
+ private int segment;
public BBEncoder(int capacity) {
out = ByteBuffer.allocate(capacity);
out.order(ByteOrder.BIG_ENDIAN);
+ segment = 0;
}
public void init()
{
out.clear();
+ segment = 0;
}
- public ByteBuffer done()
+ public ByteBuffer segment()
{
- out.flip();
- ByteBuffer encoded = ByteBuffer.allocate(out.remaining());
- encoded.put(out);
- encoded.flip();
- return encoded;
+ int pos = out.position();
+ out.position(segment);
+ ByteBuffer slice = out.slice();
+ slice.limit(pos - segment);
+ out.position(pos);
+ segment = pos;
+ return slice;
}
private void grow(int size)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java
index ae12d35209..c1d30eacc3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java
@@ -154,7 +154,7 @@ public class Validator
public static final void checkMap(Map<String,Object> map)
{
- if (map == null)
+ if (map == null || map.isEmpty())
{
return;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
index 2c09776c3d..b808156dc6 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
@@ -30,7 +30,6 @@ import java.nio.ByteBuffer;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.transport.codec.Decoder;
-import org.apache.qpid.transport.Data;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolError;
@@ -51,6 +50,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
private final Receiver<ProtocolEvent> receiver;
private final Map<Integer,List<Frame>> segments;
+ private final Method[] incomplete;
private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>()
{
public BBDecoder initialValue()
@@ -63,6 +63,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
{
this.receiver = receiver;
segments = new HashMap<Integer,List<Frame>>();
+ incomplete = new Method[64*1024];
}
private int segmentKey(Frame frame)
@@ -97,11 +98,6 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
receiver.received(event);
}
- private void emit(Frame frame, ProtocolEvent event)
- {
- emit(frame.getChannel(), event);
- }
-
public void received(NetworkEvent event)
{
event.delegate(this);
@@ -122,32 +118,18 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
emit(0, header);
}
- public void frame(Frame frame)
- {
- switch (frame.getType())
- {
- case BODY:
- emit(frame, new Data(frame.getBody(), frame.isFirstFrame(),
- frame.isLastFrame()));
- break;
- default:
- assemble(frame);
- break;
- }
- }
-
public void error(ProtocolError error)
{
emit(0, error);
}
- private void assemble(Frame frame)
+ public void frame(Frame frame)
{
ByteBuffer segment;
if (frame.isFirstFrame() && frame.isLastFrame())
{
segment = frame.getBody();
- emit(frame, decode(frame, segment));
+ assemble(frame, segment);
}
else
{
@@ -179,38 +161,63 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
segment.put(f.getBody());
}
segment.flip();
- emit(frame, decode(frame, segment));
+ assemble(frame, segment);
}
}
}
- private ProtocolEvent decode(Frame frame, ByteBuffer segment)
+ private void assemble(Frame frame, ByteBuffer segment)
{
BBDecoder dec = decoder.get();
dec.init(segment);
+ int channel = frame.getChannel();
+ Method command;
+
switch (frame.getType())
{
case CONTROL:
int controlType = dec.readUint16();
Method control = Method.create(controlType);
control.read(dec);
- return control;
+ emit(channel, control);
+ break;
case COMMAND:
int commandType = dec.readUint16();
// read in the session header, right now we don't use it
dec.readUint16();
- Method command = Method.create(commandType);
+ command = Method.create(commandType);
command.read(dec);
- return command;
+ if (command.hasPayload())
+ {
+ incomplete[channel] = command;
+ }
+ else
+ {
+ emit(channel, command);
+ }
+ break;
case HEADER:
+ command = incomplete[channel];
List<Struct> structs = new ArrayList();
while (dec.hasRemaining())
{
structs.add(dec.readStruct32());
}
- return new Header(structs, frame.isLastFrame() && frame.isLastSegment());
+ command.setHeader(new Header(structs));
+ if (frame.isLastSegment())
+ {
+ incomplete[channel] = null;
+ emit(channel, command);
+ }
+ break;
+ case BODY:
+ command = incomplete[channel];
+ command.setBody(segment);
+ incomplete[channel] = null;
+ emit(channel, command);
+ break;
default:
throw new IllegalStateException("unknown frame type: " + frame.getType());
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index 1ed446af2f..444c7d3f14 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -22,7 +22,6 @@ package org.apache.qpid.transport.network;
import org.apache.qpid.transport.codec.BBEncoder;
-import org.apache.qpid.transport.Data;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolDelegate;
@@ -34,7 +33,8 @@ import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.Struct;
import java.nio.ByteBuffer;
-import java.util.Iterator;
+import java.nio.ByteOrder;
+import java.util.List;
import static org.apache.qpid.transport.network.Frame.*;
@@ -46,12 +46,14 @@ import static java.lang.Math.*;
*
*/
-public class Disassembler implements Sender<ProtocolEvent>,
- ProtocolDelegate<Void>
+public final class Disassembler implements Sender<ProtocolEvent>,
+ ProtocolDelegate<Void>
{
- private final Sender<NetworkEvent> sender;
+ private final Sender<ByteBuffer> sender;
private final int maxPayload;
+ private final ByteBuffer header;
+ private final Object sendlock = new Object();
private final ThreadLocal<BBEncoder> encoder = new ThreadLocal()
{
public BBEncoder initialValue()
@@ -60,7 +62,7 @@ public class Disassembler implements Sender<ProtocolEvent>,
}
};
- public Disassembler(Sender<NetworkEvent> sender, int maxFrame)
+ public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
{
if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
{
@@ -69,6 +71,8 @@ public class Disassembler implements Sender<ProtocolEvent>,
}
this.sender = sender;
this.maxPayload = maxFrame - HEADER_SIZE;
+ this.header = ByteBuffer.allocate(HEADER_SIZE);
+ this.header.order(ByteOrder.BIG_ENDIAN);
}
@@ -79,60 +83,80 @@ public class Disassembler implements Sender<ProtocolEvent>,
public void flush()
{
- sender.flush();
+ synchronized (sendlock)
+ {
+ sender.flush();
+ }
}
public void close()
{
- sender.close();
+ synchronized (sendlock)
+ {
+ sender.close();
+ }
+ }
+
+ private final void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
+ {
+ synchronized (sendlock)
+ {
+ header.put(0, flags);
+ header.put(1, type);
+ header.putShort(2, (short) (size + HEADER_SIZE));
+ header.put(5, track);
+ header.putShort(6, (short) channel);
+
+ header.rewind();
+
+ sender.send(header);
+
+ int limit = buf.limit();
+ buf.limit(buf.position() + size);
+ sender.send(buf);
+ buf.limit(limit);
+ }
}
private void fragment(byte flags, SegmentType type, ProtocolEvent event,
ByteBuffer buf, boolean first, boolean last)
{
+ byte typeb = (byte) type.getValue();
byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
- if(!buf.hasRemaining())
+ int remaining = buf.remaining();
+ while (true)
{
- //empty data
- byte nflags = flags;
+ int size = min(maxPayload, remaining);
+ remaining -= size;
+
+ byte newflags = flags;
if (first)
{
- nflags |= FIRST_FRAME;
+ newflags |= FIRST_FRAME;
first = false;
}
- nflags |= LAST_FRAME;
- Frame frame = new Frame(nflags, type, track, event.getChannel(), buf.slice());
- sender.send(frame);
- }
- else
- {
- while (buf.hasRemaining())
+ if (last && remaining == 0)
{
- ByteBuffer slice = buf.slice();
- slice.limit(min(maxPayload, slice.remaining()));
- buf.position(buf.position() + slice.remaining());
-
- byte newflags = flags;
- if (first)
- {
- newflags |= FIRST_FRAME;
- first = false;
- }
- if (last && !buf.hasRemaining())
- {
- newflags |= LAST_FRAME;
- }
-
- Frame frame = new Frame(newflags, type, track, event.getChannel(), slice);
- sender.send(frame);
+ newflags |= LAST_FRAME;
+ }
+
+ frame(newflags, typeb, track, event.getChannel(), size, buf);
+
+ if (remaining == 0)
+ {
+ break;
}
}
}
public void init(Void v, ProtocolHeader header)
{
- sender.send(header);
+ synchronized (sendlock)
+ {
+ sender.send(header.toByteBuffer());
+ sender.flush();
+ }
}
public void control(Void v, Method method)
@@ -170,48 +194,43 @@ public class Disassembler implements Sender<ProtocolEvent>,
}
}
method.write(enc);
- ByteBuffer buf = enc.done();
+ ByteBuffer methodSeg = enc.segment();
byte flags = FIRST_SEG;
- if (!method.hasPayload())
+ boolean payload = method.hasPayload();
+ if (!payload)
{
flags |= LAST_SEG;
}
- fragment(flags, type, method, buf, true, true);
- }
-
- public void header(Void v, Header header)
- {
- ByteBuffer buf;
- if (header.getBuf() == null)
+ ByteBuffer headerSeg = null;
+ if (payload)
{
- BBEncoder enc = encoder.get();
- enc.init();
- for (Struct st : header.getStructs())
+ final Header hdr = method.getHeader();
+ final List<Struct> structs = hdr.getStructs();
+ final int nstructs = structs.size();
+ for (int i = 0; i < nstructs; i++)
{
- enc.writeStruct32(st);
+ enc.writeStruct32(structs.get(i));
}
- buf = enc.done();
- header.setBuf(buf);
+ headerSeg = enc.segment();
}
- else
+
+ synchronized (sendlock)
{
- buf = header.getBuf();
- buf.flip();
+ fragment(flags, type, method, methodSeg, true, true);
+ if (payload)
+ {
+ fragment((byte) 0x0, SegmentType.HEADER, method, headerSeg, true, true);
+ fragment(LAST_SEG, SegmentType.BODY, method, method.getBody(), true, true);
+ }
}
- fragment((byte) 0x0, SegmentType.HEADER, header, buf, true, true);
- }
-
- public void data(Void v, Data data)
- {
- fragment(LAST_SEG, SegmentType.BODY, data, data.getData(), data.isFirst(), data.isLast());
}
public void error(Void v, ProtocolError error)
{
- sender.send(error);
+ throw new IllegalArgumentException("" + error);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutputHandler.java
deleted file mode 100644
index b3f400a6e7..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutputHandler.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network;
-
-import java.nio.ByteBuffer;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.qpid.transport.Constant;
-import org.apache.qpid.transport.ProtocolError;
-import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Sender;
-
-import static org.apache.qpid.transport.network.Frame.*;
-
-
-/**
- * OutputHandler
- *
- */
-
-public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate
-{
-
- private Sender<ByteBuffer> sender;
- private Object lock = new Object();
- private int bytes = 0;
- private List<Frame> frames = new ArrayList<Frame>();
-
- public OutputHandler(Sender<ByteBuffer> sender)
- {
- this.sender = sender;
- }
-
- public void send(NetworkEvent event)
- {
- event.delegate(this);
- }
-
- public void close()
- {
- synchronized (lock)
- {
- sender.close();
- }
- }
-
- public void init(ProtocolHeader header)
- {
- synchronized (lock)
- {
- sender.send(header.toByteBuffer());
- sender.flush();
- }
- }
-
- public void frame(Frame frame)
- {
- synchronized (lock)
- {
- frames.add(frame);
- bytes += HEADER_SIZE + frame.getSize();
-
- if (bytes > 64*1024)
- {
- flush();
- }
- }
- }
-
- public void flush()
- {
- synchronized (lock)
- {
- ByteBuffer buf = ByteBuffer.allocate(bytes);
- int nframes = frames.size();
- for (int i = 0; i < nframes; i++)
- {
- Frame frame = frames.get(i);
- buf.put(frame.getFlags());
- buf.put((byte) frame.getType().getValue());
- buf.putShort((short) (frame.getSize() + HEADER_SIZE));
- // RESERVED
- buf.put(RESERVED);
- buf.put(frame.getTrack());
- buf.putShort((short) frame.getChannel());
- // RESERVED
- buf.putInt(0);
- buf.put(frame.getBody());
- }
- buf.flip();
-
- frames.clear();
- bytes = 0;
-
- sender.send(buf);
- sender.flush();
- }
- }
-
- public void error(ProtocolError error)
- {
- throw new IllegalStateException("XXX");
- }
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 37910ade0d..7ac5649e99 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -23,7 +23,6 @@ import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
@@ -48,8 +47,9 @@ final class IoSender extends Thread implements Sender<ByteBuffer>
private final OutputStream out;
private final byte[] buffer;
- private final AtomicInteger head = new AtomicInteger(START);
- private final AtomicInteger tail = new AtomicInteger(START);
+ private volatile int head = START;
+ private volatile int tail = START;
+ private volatile boolean idle = true;
private final Object notFull = new Object();
private final Object notEmpty = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -96,16 +96,17 @@ final class IoSender extends Thread implements Sender<ByteBuffer>
while (remaining > 0)
{
- final int hd = head.get();
- final int tl = tail.get();
+ final int hd = head;
+ final int tl = tail;
if (hd - tl >= size)
{
+ flush();
synchronized (notFull)
{
long start = System.currentTimeMillis();
long elapsed = 0;
- while (head.get() - tail.get() >= size && elapsed < timeout)
+ while (head - tail >= size && elapsed < timeout)
{
try
{
@@ -118,9 +119,9 @@ final class IoSender extends Thread implements Sender<ByteBuffer>
elapsed += System.currentTimeMillis() - start;
}
- if (head.get() - tail.get() >= size)
+ if (head - tail >= size)
{
- throw new TransportException(String.format("write timed out: %s, %s", head.get(), tail.get()));
+ throw new TransportException(String.format("write timed out: %s, %s", head, tail));
}
}
continue;
@@ -140,21 +141,20 @@ final class IoSender extends Thread implements Sender<ByteBuffer>
}
buf.get(buffer, hd_idx, length);
- head.getAndAdd(length);
- if (hd == tail.get())
- {
- synchronized (notEmpty)
- {
- notEmpty.notify();
- }
- }
+ head += length;
remaining -= length;
}
}
public void flush()
{
- // pass
+ if (idle)
+ {
+ synchronized (notEmpty)
+ {
+ notEmpty.notify();
+ }
+ }
}
public void close()
@@ -206,8 +206,8 @@ final class IoSender extends Thread implements Sender<ByteBuffer>
while (true)
{
- final int hd = head.get();
- final int tl = tail.get();
+ final int hd = head;
+ final int tl = tail;
if (hd == tl)
{
@@ -216,9 +216,11 @@ final class IoSender extends Thread implements Sender<ByteBuffer>
break;
}
+ idle = true;
+
synchronized (notEmpty)
{
- while (head.get() == tail.get() && !closed.get())
+ while (head == tail && !closed.get())
{
try
{
@@ -231,6 +233,8 @@ final class IoSender extends Thread implements Sender<ByteBuffer>
}
}
+ idle = false;
+
continue;
}
@@ -258,8 +262,8 @@ final class IoSender extends Thread implements Sender<ByteBuffer>
close(false);
break;
}
- tail.getAndAdd(length);
- if (head.get() - tl >= size)
+ tail += length;
+ if (head - tl >= size)
{
synchronized (notFull)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
index 52accb6b97..3b543b3e60 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
@@ -33,7 +33,6 @@ import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.OutputHandler;
import org.apache.qpid.transport.util.Logger;
/**
@@ -48,6 +47,14 @@ import org.apache.qpid.transport.util.Logger;
public final class IoTransport
{
+ static
+ {
+ org.apache.mina.common.ByteBuffer.setAllocator
+ (new org.apache.mina.common.SimpleByteBufferAllocator());
+ org.apache.mina.common.ByteBuffer.setUseDirectBuffers
+ (Boolean.getBoolean("amqj.enableDirectBuffers"));
+ }
+
private static final Logger log = Logger.get(IoTransport.class);
private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024;
@@ -104,8 +111,7 @@ public final class IoTransport
sender = new IoSender(this, 2*writeBufferSize, timeout);
Connection conn = new Connection
- (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
- delegate);
+ (new Disassembler(sender, 64*1024 - 1), delegate);
receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)),
2*readBufferSize, timeout);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
index bcac7c4e16..16a1e20b10 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
@@ -44,7 +44,6 @@ import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.OutputHandler;
import static org.apache.qpid.transport.util.Functions.*;
@@ -292,7 +291,7 @@ public class MinaHandler<E> implements IoHandler
{
// XXX: hardcoded max-frame
return new Connection
- (new Disassembler(new OutputHandler(sender), MAX_FRAME_SIZE), delegate);
+ (new Disassembler(sender, MAX_FRAME_SIZE), delegate);
}
public Receiver<java.nio.ByteBuffer> receiver(Connection conn)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
index f0161efe97..51e41b26f7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
@@ -17,7 +17,6 @@ import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.OutputHandler;
public class NioHandler implements Runnable
{
@@ -68,8 +67,7 @@ public class NioHandler implements Runnable
NioSender sender = new NioSender(_ch);
Connection con = new Connection
- (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
- delegate);
+ (new Disassembler(sender, 64*1024 - 1), delegate);
con.setConnectionId(_count.incrementAndGet());
_handlers.put(con.getConnectionId(),sender);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java
new file mode 100644
index 0000000000..4b199bafe6
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.io.UnsupportedEncodingException;
+
+
+/**
+ * Strings
+ *
+ */
+
+public final class Strings
+{
+
+ private static final byte[] EMPTY = new byte[0];
+
+ private static final ThreadLocal<char[]> charbuf = new ThreadLocal()
+ {
+ public char[] initialValue()
+ {
+ return new char[4096];
+ }
+ };
+
+ public static final byte[] toUTF8(String str)
+ {
+ if (str == null)
+ {
+ return EMPTY;
+ }
+ else
+ {
+ final int size = str.length();
+ char[] chars = charbuf.get();
+ if (size > chars.length)
+ {
+ chars = new char[Math.max(size, 2*chars.length)];
+ charbuf.set(chars);
+ }
+
+ str.getChars(0, size, chars, 0);
+ final byte[] bytes = new byte[size];
+ for (int i = 0; i < size; i++)
+ {
+ if (chars[i] > 127)
+ {
+ try
+ {
+ return str.getBytes("UTF-8");
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ bytes[i] = (byte) chars[i];
+ }
+ return bytes;
+ }
+ }
+
+}