diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-08-05 19:33:11 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-08-05 19:33:11 +0000 |
commit | fe1cbdb8a780f78bf6e249e3d41d8de4cce22777 (patch) | |
tree | 9d55708c644751f83b78b497240b0e6317008b51 /qpid/java/common | |
parent | f04df8a8c20938c5b8bb176ca35db0376ad76c60 (diff) | |
download | qpid-python-fe1cbdb8a780f78bf6e249e3d41d8de4cce22777.tar.gz |
Profiling driven changes:
- made AMQShortString cache the toString() value
- added static initializer to IoTransport to disable use of pooled
byte buffers
- modified IoSender to permit buffering
- removed OutputHandler and eliminated intermediate Frame generation
between Disassembler and Sender<ByteBuffer> (IoSender)
- made Disassembler take advantage of IoSender's buffering
- removed Header and Data as distinct protocol events, added Header
and Body members to MessageTransfer
- modified Assembler and Disassembler to decode/encode Header and
Data directly to/from MessageTransfer
- modified Disassembler to only write data if encoding of headers is
successful
- added Strings.toUTF8(String) -> byte[] to do proper UTF-8 encoding
that is also fast for 7-bit ascii
- modified JMSTextMessage to use the Strings.toUTF8
- modified QpidBench to only generate 7-bit ascii when using
TextMessage
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@682887 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
25 files changed, 433 insertions, 681 deletions
diff --git a/qpid/java/common/Composite.tpl b/qpid/java/common/Composite.tpl index 37e3bf8853..283fa24641 100644 --- a/qpid/java/common/Composite.tpl +++ b/qpid/java/common/Composite.tpl @@ -1,5 +1,6 @@ package org.apache.qpid.transport; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -9,7 +10,6 @@ import java.util.UUID; import org.apache.qpid.transport.codec.Decoder; import org.apache.qpid.transport.codec.Encodable; import org.apache.qpid.transport.codec.Encoder; -import org.apache.qpid.transport.codec.Validator; import org.apache.qpid.transport.network.Frame; @@ -18,11 +18,13 @@ from genutil import * cls = klass(type)["@name"] +segments = type["segments"] + if type.name in ("control", "command"): base = "Method" size = 0 pack = 2 - if type["segments"]: + if segments: payload = "true" else: payload = "false" @@ -86,6 +88,10 @@ options = get_options(fields) for f in fields: if not f.empty: out(" private $(f.type) $(f.name);\n") + +if segments: + out(" private Header header;\n") + out(" private ByteBuffer body;\n") } ${ @@ -99,6 +105,10 @@ for f in fields: if f.option: continue out(" $(f.set)($(f.name));\n") +if segments: + out(" setHeader(header);\n") + out(" setBody(body);\n") + if options or base == "Method": out(""" for (int i=0; i < _options.length; i++) { @@ -154,7 +164,6 @@ else: } public final $name $(f.set)($(f.type) value) { - $(f.check) ${ if not f.empty: out(" this.$(f.name) = value;") @@ -173,6 +182,44 @@ if pack > 0: """) } +${ +if segments: + out(""" public final Header getHeader() { + return this.header; + } + + public final void setHeader(Header header) { + this.header = header; + } + + public final $name header(Header header) { + setHeader(header); + return this; + } + + public final ByteBuffer getBody() { + if (this.body == null) + { + return null; + } + else + { + return this.body.slice(); + } + } + + public final void setBody(ByteBuffer body) { + this.body = body; + } + + public final $name body(ByteBuffer body) + { + setBody(body); + return this; + } +""") +} + public void write(Encoder enc) { ${ diff --git a/qpid/java/common/Invoker.tpl b/qpid/java/common/Invoker.tpl index 21a17624a6..9158922df7 100644 --- a/qpid/java/common/Invoker.tpl +++ b/qpid/java/common/Invoker.tpl @@ -1,5 +1,6 @@ package org.apache.qpid.transport; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.UUID; @@ -32,9 +33,9 @@ for c in composites: jclass = "" out(""" - public final $jresult $(dromedary(name))($(", ".join(params))) { - $(jreturn)invoke(new $name($(", ".join(args)))$jclass); - } + public final $jresult $(dromedary(name))($(", ".join(params))) { + $(jreturn)invoke(new $name($(", ".join(args)))$jclass); + } """) } diff --git a/qpid/java/common/genutil.py b/qpid/java/common/genutil.py index 2f1caa41c4..f8f234548c 100644 --- a/qpid/java/common/genutil.py +++ b/qpid/java/common/genutil.py @@ -170,18 +170,15 @@ class Field: if self.type_node.name == "struct": self.read = "(%s) dec.readStruct(%s.TYPE)" % (tname, tname) self.write = "enc.writeStruct(%s.TYPE, check(struct).%s)" % (tname, self.name) - self.check = "" self.coder = "Struct" elif self.type_node.name == "domain": self.coder = camel(0, self.prim_type["@name"]) self.read = "%s.get(dec.read%s())" % (tname, self.coder) self.write = "enc.write%s(check(struct).%s.getValue())" % (self.coder, self.name) - self.check = "" else: self.coder = camel(0, self.type_node["@name"]) self.read = "dec.read%s()" % self.coder self.write = "enc.write%s(check(struct).%s)" % (self.coder, self.name) - self.check = "Validator.check%s(value);" % self.coder self.type = jtype(self.type_node) self.default = DEFAULTS.get(self.type, "null") self.has = camel(1, "has", self.name) @@ -214,6 +211,9 @@ def get_parameters(type, fields): options = True else: params.append("%s %s" % (f.type, f.name)) + if type["segments"]: + params.append("Header header") + params.append("ByteBuffer body") if options or type.name in ("control", "command"): params.append("Option ... _options") return params @@ -226,6 +226,9 @@ def get_arguments(type, fields): options = True else: args.append(f.name) + if type["segments"]: + args.append("header") + args.append("body") if options or type.name in ("control", "command"): args.append("_options") return args diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java index 56286a9b01..83d434b20a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java @@ -45,10 +45,6 @@ class ToyBroker extends SessionDelegate { private ToyExchange exchange; - private MessageTransfer xfr = null; - private DeliveryProperties props = null; - private Header header = null; - private List<Data> body = null; private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>(); public ToyBroker(ToyExchange exchange) @@ -103,22 +99,10 @@ class ToyBroker extends SessionDelegate @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { - this.xfr = xfr; - body = new ArrayList<Data>(); - System.out.println("received transfer " + xfr.getDestination()); - } - - @Override public void header(Session ssn, Header header) - { - if (xfr == null || body == null) - { - ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, - "no method segment"); - ssn.close(); - return; - } - - props = header.get(DeliveryProperties.class); + String dest = xfr.getDestination(); + System.out.println("received transfer " + dest); + Header header = xfr.getHeader(); + DeliveryProperties props = header.get(DeliveryProperties.class); if (props != null) { System.out.println("received headers routing_key " + props.getRoutingKey()); @@ -130,67 +114,31 @@ class ToyBroker extends SessionDelegate System.out.println(mp.getApplicationHeaders()); } - this.header = header; - } - - @Override public void data(Session ssn, Data data) - { - if (xfr == null || body == null) + if (exchange.route(dest,props.getRoutingKey(),xfr)) { - ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, "no method segment"); - ssn.close(); - return; + System.out.println("queued " + xfr); + dispatchMessages(ssn); } - - body.add(data); - - if (data.isLast()) + else { - String dest = xfr.getDestination(); - Message m = new Message(header, body); - if (exchange.route(dest,props.getRoutingKey(),m)) + if (props == null || !props.getDiscardUnroutable()) { - System.out.println("queued " + m); - dispatchMessages(ssn); + RangeSet ranges = new RangeSet(); + ranges.add(xfr.getId()); + ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE, + "no such destination"); } - else - { - - reject(ssn); - } - ssn.processed(xfr); - xfr = null; - body = null; - } - } - - private void reject(Session ssn) - { - if (props != null && props.getDiscardUnroutable()) - { - return; - } - else - { - RangeSet ranges = new RangeSet(); - ranges.add(xfr.getId()); - ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE, - "no such destination"); } + ssn.processed(xfr); } - private void transferMessageToPeer(Session ssn,String dest, Message m) + private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m) { System.out.println("\n==================> Transfering message to: " +dest + "\n"); - ssn.messageTransfer(dest, MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED); - ssn.header(m.header); - for (Data d : m.body) - { - ssn.data(d.getData()); - } - ssn.endData(); + ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, + m.getHeader(), m.getBody()); } private void dispatchMessages(Session ssn) @@ -204,8 +152,8 @@ class ToyBroker extends SessionDelegate private void checkAndSendMessagesToConsumer(Session ssn,String dest) { Consumer c = consumers.get(dest); - LinkedBlockingQueue<Message> queue = exchange.getQueue(c._queueName); - Message m = queue.poll(); + LinkedBlockingQueue<MessageTransfer> queue = exchange.getQueue(c._queueName); + MessageTransfer m = queue.poll(); while (m != null && c._credit>0) { transferMessageToPeer(ssn,dest,m); @@ -214,43 +162,6 @@ class ToyBroker extends SessionDelegate } } - class Message - { - private final Header header; - private final List<Data> body; - - public Message(Header header, List<Data> body) - { - this.header = header; - this.body = body; - } - - public String toString() - { - StringBuilder sb = new StringBuilder(); - - if (header != null) - { - boolean first = true; - for (Struct st : header.getStructs()) - { - if (first) { first = false; } - else { sb.append(" "); } - sb.append(st); - } - } - - for (Data d : body) - { - sb.append(" | "); - sb.append(d); - } - - return sb.toString(); - } - - } - // ugly, but who cares :) // assumes unit is always no of messages, not bytes // assumes it's credit mode and not window diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java index 27a48fb760..cb10859c9f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java @@ -20,6 +20,7 @@ */ package org.apache.qpid; +import java.nio.*; import java.util.*; import org.apache.qpid.transport.*; @@ -47,17 +48,9 @@ class ToyClient extends SessionDelegate } } - @Override public void header(Session ssn, Header header) + @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { - for (Struct st : header.getStructs()) - { - System.out.println("header: " + st); - } - } - - @Override public void data(Session ssn, Data data) - { - System.out.println("got data: " + data); + System.out.println("msg: " + xfr); } public static final void main(String[] args) @@ -111,16 +104,16 @@ class ToyClient extends SessionDelegate map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED); - ssn.header(new DeliveryProperties(), - new MessageProperties().setApplicationHeaders(map)); - ssn.data("this is the data"); - ssn.endData(); + MessageAcquireMode.PRE_ACQUIRED, + new Header(new DeliveryProperties(), + new MessageProperties() + .setApplicationHeaders(map)), + "this is the data"); ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED); - ssn.data("this should be rejected"); - ssn.endData(); + MessageAcquireMode.PRE_ACQUIRED, + null, + "this should be rejected"); ssn.sync(); Future<QueueQueryResult> future = ssn.queueQuery("asdf"); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java index 5c3c0ac0fb..c638679596 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java @@ -9,42 +9,43 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.qpid.ToyBroker.Message; +import org.apache.qpid.transport.MessageTransfer; + public class ToyExchange { final static String DIRECT = "amq.direct"; final static String TOPIC = "amq.topic"; - private Map<String,List<LinkedBlockingQueue<Message>>> directEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>(); - private Map<String,List<LinkedBlockingQueue<Message>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>(); - private Map<String,LinkedBlockingQueue<Message>> queues = new HashMap<String,LinkedBlockingQueue<Message>>(); + private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> directEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>(); + private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>(); + private Map<String,LinkedBlockingQueue<MessageTransfer>> queues = new HashMap<String,LinkedBlockingQueue<MessageTransfer>>(); public void createQueue(String name) { - queues.put(name, new LinkedBlockingQueue<Message>()); + queues.put(name, new LinkedBlockingQueue<MessageTransfer>()); } - public LinkedBlockingQueue<Message> getQueue(String name) + public LinkedBlockingQueue<MessageTransfer> getQueue(String name) { return queues.get(name); } public void bindQueue(String type,String binding,String queueName) { - LinkedBlockingQueue<Message> queue = queues.get(queueName); + LinkedBlockingQueue<MessageTransfer> queue = queues.get(queueName); binding = normalizeKey(binding); if(DIRECT.equals(type)) { if (directEx.containsKey(binding)) { - List<LinkedBlockingQueue<Message>> list = directEx.get(binding); + List<LinkedBlockingQueue<MessageTransfer>> list = directEx.get(binding); list.add(queue); } else { - List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>(); + List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>(); list.add(queue); directEx.put(binding,list); } @@ -53,21 +54,21 @@ public class ToyExchange { if (topicEx.containsKey(binding)) { - List<LinkedBlockingQueue<Message>> list = topicEx.get(binding); + List<LinkedBlockingQueue<MessageTransfer>> list = topicEx.get(binding); list.add(queue); } else { - List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>(); + List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>(); list.add(queue); topicEx.put(binding,list); } } } - public boolean route(String dest,String routingKey,Message msg) + public boolean route(String dest, String routingKey, MessageTransfer msg) { - List<LinkedBlockingQueue<Message>> queues; + List<LinkedBlockingQueue<MessageTransfer>> queues; if(DIRECT.equals(dest)) { queues = directEx.get(routingKey); @@ -101,9 +102,9 @@ public class ToyExchange } } - private List<LinkedBlockingQueue<Message>> matchWildCard(String routingKey) + private List<LinkedBlockingQueue<MessageTransfer>> matchWildCard(String routingKey) { - List<LinkedBlockingQueue<Message>> selected = new ArrayList<LinkedBlockingQueue<Message>>(); + List<LinkedBlockingQueue<MessageTransfer>> selected = new ArrayList<LinkedBlockingQueue<MessageTransfer>>(); for(String key: topicEx.keySet()) { @@ -111,7 +112,7 @@ public class ToyExchange Matcher m = p.matcher(routingKey); if (m.find()) { - for(LinkedBlockingQueue<Message> queue : topicEx.get(key)) + for(LinkedBlockingQueue<MessageTransfer> queue : topicEx.get(key)) { selected.add(queue); } @@ -121,9 +122,9 @@ public class ToyExchange return selected; } - private void storeMessage(Message msg,List<LinkedBlockingQueue<Message>> selected) + private void storeMessage(MessageTransfer msg,List<LinkedBlockingQueue<MessageTransfer>> selected) { - for(LinkedBlockingQueue<Message> queue : selected) + for(LinkedBlockingQueue<MessageTransfer> queue : selected) { queue.offer(msg); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index 2a248bf703..22f66ae556 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -418,9 +418,15 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return chars; } + private String str = null; + public String asString() { - return new String(asChars()); + if (str == null) + { + str = new String(asChars()); + } + return str; } public boolean equals(Object o) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java index 7a967adbba..624c29baff 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java @@ -53,11 +53,6 @@ public class Channel extends Invoker // session may be null private Session session; - private Lock commandLock = new ReentrantLock(); - private boolean first = true; - private ByteBuffer data = null; - private boolean batch = false; - public Channel(Connection connection, int channel, SessionDelegate delegate) { this.connection = connection; @@ -105,16 +100,6 @@ public class Channel extends Invoker method.delegate(session, sessionDelegate); } - public void header(Void v, Header header) - { - header.delegate(session, sessionDelegate); - } - - public void data(Void v, Data data) - { - data.delegate(session, sessionDelegate); - } - public void error(Void v, ProtocolError error) { throw new RuntimeException(error.getMessage()); @@ -157,62 +142,12 @@ public class Channel extends Invoker public void method(Method m) { - if (m.getEncodedTrack() == Frame.L4) - { - commandLock.lock(); - } - emit(m); - if (!m.isBatch() && !m.hasPayload()) - { - connection.flush(); - } - - batch = m.isBatch(); - - if (m.getEncodedTrack() == Frame.L4 && !m.hasPayload()) - { - commandLock.unlock(); - } - } - - public void header(Header header) - { - emit(header); - } - - public void data(ByteBuffer buf) - { - if (data != null) - { - emit(new Data(data, first, false)); - first = false; - } - - data = buf; - } - - public void data(String str) - { - data(str.getBytes()); - } - - public void data(byte[] bytes) - { - data(ByteBuffer.wrap(bytes)); - } - - public void end() - { - emit(new Data(data, first, true)); - first = true; - data = null; - if (!batch) + if (!m.isBatch()) { connection.flush(); } - commandLock.unlock(); } protected void invoke(Method m) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Data.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Data.java deleted file mode 100644 index fbf7428864..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Data.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport; - -import org.apache.qpid.transport.network.Frame; - -import java.nio.ByteBuffer; - -import java.util.Collections; - -import static org.apache.qpid.transport.util.Functions.*; - - -/** - * Data - * - */ - -public class Data implements ProtocolEvent -{ - - private final ByteBuffer data; - private final boolean first; - private final boolean last; - private int channel; - - public Data(ByteBuffer data, boolean first, boolean last) - { - this.data = data; - this.first = first; - this.last = last; - } - - public ByteBuffer getData() - { - return data.slice(); - } - - public boolean isFirst() - { - return first; - } - - public boolean isLast() - { - return last; - } - - public final int getChannel() - { - return channel; - } - - public final void setChannel(int channel) - { - this.channel = channel; - } - - public byte getEncodedTrack() - { - return Frame.L4; - } - - public <C> void delegate(C context, ProtocolDelegate<C> delegate) - { - delegate.data(context, this); - } - - public String toString() - { - StringBuffer str = new StringBuffer(); - str.append("ch="); - str.append(" "); - str.append("Data("); - str.append(str(data, 64)); - str.append(")"); - return str.toString(); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java index f8debcf923..87bdae3866 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java @@ -40,22 +40,6 @@ public class Echo extends SessionDelegate { this.xfr = xfr; ssn.invoke(xfr); - } - - public void header(Session ssn, Header hdr) - { - ssn.header(hdr); - } - - public void data(Session ssn, Data data) - { - ssn.data(data.getData()); - if (data.isLast()) - { - ssn.endData(); - } - - // XXX: should be able to get command-id from any segment ssn.processed(xfr); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java index 3b351ee828..9b6ab4951b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java @@ -22,6 +22,7 @@ package org.apache.qpid.transport; import org.apache.qpid.transport.network.Frame; +import java.util.Arrays; import java.util.List; import java.nio.ByteBuffer; @@ -32,33 +33,25 @@ import java.nio.ByteBuffer; * @author Rafael H. Schloming */ -public class Header implements ProtocolEvent { +public class Header { private final List<Struct> structs; - private ByteBuffer _buf; - private boolean _noPayload; - private int channel; - public Header(List<Struct> structs, boolean lastframe) + public Header(List<Struct> structs) { this.structs = structs; - _noPayload= lastframe; } - public List<Struct> getStructs() + public Header(Struct ... structs) { - return structs; + this(Arrays.asList(structs)); } - public void setBuf(ByteBuffer buf) + public List<Struct> getStructs() { - _buf = buf; + return structs; } - public ByteBuffer getBuf() - { - return _buf; - } public <T> T get(Class<T> klass) { for (Struct st : structs) @@ -72,36 +65,9 @@ public class Header implements ProtocolEvent { return null; } - public final int getChannel() - { - return channel; - } - - public final void setChannel(int channel) - { - this.channel = channel; - } - - public byte getEncodedTrack() - { - return Frame.L4; - } - - public <C> void delegate(C context, ProtocolDelegate<C> delegate) - { - delegate.header(context, this); - } - - public boolean hasNoPayload() - { - return _noPayload; - } - public String toString() { StringBuffer str = new StringBuffer(); - str.append("ch="); - str.append(channel); str.append(" Header("); boolean first = true; for (Struct s : structs) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java index 1c80d8c00c..6b99f6d5d3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java @@ -22,6 +22,10 @@ package org.apache.qpid.transport; import org.apache.qpid.transport.network.Frame; +import java.nio.ByteBuffer; + +import static org.apache.qpid.transport.util.Functions.*; + /** * Method * @@ -88,6 +92,26 @@ public abstract class Method extends Struct implements ProtocolEvent public abstract boolean hasPayload(); + public Header getHeader() + { + return null; + } + + public void setHeader(Header header) + { + throw new UnsupportedOperationException(); + } + + public ByteBuffer getBody() + { + return null; + } + + public void setBody(ByteBuffer body) + { + throw new UnsupportedOperationException(); + } + public abstract byte getEncodedTrack(); public abstract <C> void dispatch(C context, MethodDelegate<C> delegate); @@ -134,6 +158,21 @@ public abstract class Method extends Struct implements ProtocolEvent str.append(" "); str.append(super.toString()); + Header hdr = getHeader(); + if (hdr != null) + { + for (Struct st : hdr.getStructs()) + { + str.append("\n "); + str.append(st); + } + } + ByteBuffer body = getBody(); + if (body != null) + { + str.append("\n body="); + str.append(str(body, 64)); + } return str.toString(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java index 9fa28fbe23..a90948fc1d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java @@ -35,10 +35,6 @@ public interface ProtocolDelegate<C> void command(C context, Method command); - void header(C context, Header header); - - void data(C context, Data data); - void error(C context, ProtocolError error); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 8ec13c0ee7..1400bd2e5b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.qpid.transport.Option.*; import static org.apache.qpid.transport.util.Functions.*; import static org.apache.qpid.util.Serial.*; +import static org.apache.qpid.util.Strings.*; /** * Session @@ -271,7 +272,7 @@ public class Session extends Invoker } needSync = !m.isSync(); channel.method(m); - if (autoSync && !m.hasPayload()) + if (autoSync) { sync(); } @@ -290,50 +291,6 @@ public class Session extends Invoker } } - public void header(Header header) - { - channel.header(header); - } - - public Header header(List<Struct> structs) - { - Header res = new Header(structs, false); - header(res); - return res; - } - - public Header header(Struct ... structs) - { - return header(Arrays.asList(structs)); - } - - public void data(ByteBuffer buf) - { - channel.data(buf); - } - - public void data(String str) - { - channel.data(str); - } - - public void data(byte[] bytes) - { - channel.data(bytes); - } - - public void endData() - { - channel.end(); - synchronized (commands) - { - if (autoSync) - { - sync(); - } - } - } - public void sync() { sync(timeout); @@ -501,6 +458,26 @@ public class Session extends Invoker } + public final void messageTransfer(String destination, + MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, + Header header, + byte[] body, + Option ... _options) { + messageTransfer(destination, acceptMode, acquireMode, header, + ByteBuffer.wrap(body), _options); + } + + public final void messageTransfer(String destination, + MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, + Header header, + String body, + Option ... _options) { + messageTransfer(destination, acceptMode, acquireMode, header, + toUTF8(body), _options); + } + public void close() { sessionRequestTimeout(0); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index dc400d3098..b91763509c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -48,10 +48,6 @@ public abstract class SessionDelegate } } - public void header(Session ssn, Header header) { } - - public void data(Session ssn, Data data) { } - public void error(Session ssn, ProtocolError error) { } @Override public void executionResult(Session ssn, ExecutionResult result) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java index 788b6a55e3..390de881ab 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java @@ -35,24 +35,29 @@ public final class BBEncoder extends AbstractEncoder { private ByteBuffer out; + private int segment; public BBEncoder(int capacity) { out = ByteBuffer.allocate(capacity); out.order(ByteOrder.BIG_ENDIAN); + segment = 0; } public void init() { out.clear(); + segment = 0; } - public ByteBuffer done() + public ByteBuffer segment() { - out.flip(); - ByteBuffer encoded = ByteBuffer.allocate(out.remaining()); - encoded.put(out); - encoded.flip(); - return encoded; + int pos = out.position(); + out.position(segment); + ByteBuffer slice = out.slice(); + slice.limit(pos - segment); + out.position(pos); + segment = pos; + return slice; } private void grow(int size) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java index ae12d35209..c1d30eacc3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java @@ -154,7 +154,7 @@ public class Validator public static final void checkMap(Map<String,Object> map) { - if (map == null) + if (map == null || map.isEmpty()) { return; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index 2c09776c3d..b808156dc6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -30,7 +30,6 @@ import java.nio.ByteBuffer; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.transport.codec.Decoder; -import org.apache.qpid.transport.Data; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolError; @@ -51,6 +50,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate private final Receiver<ProtocolEvent> receiver; private final Map<Integer,List<Frame>> segments; + private final Method[] incomplete; private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>() { public BBDecoder initialValue() @@ -63,6 +63,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { this.receiver = receiver; segments = new HashMap<Integer,List<Frame>>(); + incomplete = new Method[64*1024]; } private int segmentKey(Frame frame) @@ -97,11 +98,6 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate receiver.received(event); } - private void emit(Frame frame, ProtocolEvent event) - { - emit(frame.getChannel(), event); - } - public void received(NetworkEvent event) { event.delegate(this); @@ -122,32 +118,18 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate emit(0, header); } - public void frame(Frame frame) - { - switch (frame.getType()) - { - case BODY: - emit(frame, new Data(frame.getBody(), frame.isFirstFrame(), - frame.isLastFrame())); - break; - default: - assemble(frame); - break; - } - } - public void error(ProtocolError error) { emit(0, error); } - private void assemble(Frame frame) + public void frame(Frame frame) { ByteBuffer segment; if (frame.isFirstFrame() && frame.isLastFrame()) { segment = frame.getBody(); - emit(frame, decode(frame, segment)); + assemble(frame, segment); } else { @@ -179,38 +161,63 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate segment.put(f.getBody()); } segment.flip(); - emit(frame, decode(frame, segment)); + assemble(frame, segment); } } } - private ProtocolEvent decode(Frame frame, ByteBuffer segment) + private void assemble(Frame frame, ByteBuffer segment) { BBDecoder dec = decoder.get(); dec.init(segment); + int channel = frame.getChannel(); + Method command; + switch (frame.getType()) { case CONTROL: int controlType = dec.readUint16(); Method control = Method.create(controlType); control.read(dec); - return control; + emit(channel, control); + break; case COMMAND: int commandType = dec.readUint16(); // read in the session header, right now we don't use it dec.readUint16(); - Method command = Method.create(commandType); + command = Method.create(commandType); command.read(dec); - return command; + if (command.hasPayload()) + { + incomplete[channel] = command; + } + else + { + emit(channel, command); + } + break; case HEADER: + command = incomplete[channel]; List<Struct> structs = new ArrayList(); while (dec.hasRemaining()) { structs.add(dec.readStruct32()); } - return new Header(structs, frame.isLastFrame() && frame.isLastSegment()); + command.setHeader(new Header(structs)); + if (frame.isLastSegment()) + { + incomplete[channel] = null; + emit(channel, command); + } + break; + case BODY: + command = incomplete[channel]; + command.setBody(segment); + incomplete[channel] = null; + emit(channel, command); + break; default: throw new IllegalStateException("unknown frame type: " + frame.getType()); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index 1ed446af2f..444c7d3f14 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -22,7 +22,6 @@ package org.apache.qpid.transport.network; import org.apache.qpid.transport.codec.BBEncoder; -import org.apache.qpid.transport.Data; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolDelegate; @@ -34,7 +33,8 @@ import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Struct; import java.nio.ByteBuffer; -import java.util.Iterator; +import java.nio.ByteOrder; +import java.util.List; import static org.apache.qpid.transport.network.Frame.*; @@ -46,12 +46,14 @@ import static java.lang.Math.*; * */ -public class Disassembler implements Sender<ProtocolEvent>, - ProtocolDelegate<Void> +public final class Disassembler implements Sender<ProtocolEvent>, + ProtocolDelegate<Void> { - private final Sender<NetworkEvent> sender; + private final Sender<ByteBuffer> sender; private final int maxPayload; + private final ByteBuffer header; + private final Object sendlock = new Object(); private final ThreadLocal<BBEncoder> encoder = new ThreadLocal() { public BBEncoder initialValue() @@ -60,7 +62,7 @@ public class Disassembler implements Sender<ProtocolEvent>, } }; - public Disassembler(Sender<NetworkEvent> sender, int maxFrame) + public Disassembler(Sender<ByteBuffer> sender, int maxFrame) { if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) { @@ -69,6 +71,8 @@ public class Disassembler implements Sender<ProtocolEvent>, } this.sender = sender; this.maxPayload = maxFrame - HEADER_SIZE; + this.header = ByteBuffer.allocate(HEADER_SIZE); + this.header.order(ByteOrder.BIG_ENDIAN); } @@ -79,60 +83,80 @@ public class Disassembler implements Sender<ProtocolEvent>, public void flush() { - sender.flush(); + synchronized (sendlock) + { + sender.flush(); + } } public void close() { - sender.close(); + synchronized (sendlock) + { + sender.close(); + } + } + + private final void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf) + { + synchronized (sendlock) + { + header.put(0, flags); + header.put(1, type); + header.putShort(2, (short) (size + HEADER_SIZE)); + header.put(5, track); + header.putShort(6, (short) channel); + + header.rewind(); + + sender.send(header); + + int limit = buf.limit(); + buf.limit(buf.position() + size); + sender.send(buf); + buf.limit(limit); + } } private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf, boolean first, boolean last) { + byte typeb = (byte) type.getValue(); byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0; - if(!buf.hasRemaining()) + int remaining = buf.remaining(); + while (true) { - //empty data - byte nflags = flags; + int size = min(maxPayload, remaining); + remaining -= size; + + byte newflags = flags; if (first) { - nflags |= FIRST_FRAME; + newflags |= FIRST_FRAME; first = false; } - nflags |= LAST_FRAME; - Frame frame = new Frame(nflags, type, track, event.getChannel(), buf.slice()); - sender.send(frame); - } - else - { - while (buf.hasRemaining()) + if (last && remaining == 0) { - ByteBuffer slice = buf.slice(); - slice.limit(min(maxPayload, slice.remaining())); - buf.position(buf.position() + slice.remaining()); - - byte newflags = flags; - if (first) - { - newflags |= FIRST_FRAME; - first = false; - } - if (last && !buf.hasRemaining()) - { - newflags |= LAST_FRAME; - } - - Frame frame = new Frame(newflags, type, track, event.getChannel(), slice); - sender.send(frame); + newflags |= LAST_FRAME; + } + + frame(newflags, typeb, track, event.getChannel(), size, buf); + + if (remaining == 0) + { + break; } } } public void init(Void v, ProtocolHeader header) { - sender.send(header); + synchronized (sendlock) + { + sender.send(header.toByteBuffer()); + sender.flush(); + } } public void control(Void v, Method method) @@ -170,48 +194,43 @@ public class Disassembler implements Sender<ProtocolEvent>, } } method.write(enc); - ByteBuffer buf = enc.done(); + ByteBuffer methodSeg = enc.segment(); byte flags = FIRST_SEG; - if (!method.hasPayload()) + boolean payload = method.hasPayload(); + if (!payload) { flags |= LAST_SEG; } - fragment(flags, type, method, buf, true, true); - } - - public void header(Void v, Header header) - { - ByteBuffer buf; - if (header.getBuf() == null) + ByteBuffer headerSeg = null; + if (payload) { - BBEncoder enc = encoder.get(); - enc.init(); - for (Struct st : header.getStructs()) + final Header hdr = method.getHeader(); + final List<Struct> structs = hdr.getStructs(); + final int nstructs = structs.size(); + for (int i = 0; i < nstructs; i++) { - enc.writeStruct32(st); + enc.writeStruct32(structs.get(i)); } - buf = enc.done(); - header.setBuf(buf); + headerSeg = enc.segment(); } - else + + synchronized (sendlock) { - buf = header.getBuf(); - buf.flip(); + fragment(flags, type, method, methodSeg, true, true); + if (payload) + { + fragment((byte) 0x0, SegmentType.HEADER, method, headerSeg, true, true); + fragment(LAST_SEG, SegmentType.BODY, method, method.getBody(), true, true); + } } - fragment((byte) 0x0, SegmentType.HEADER, header, buf, true, true); - } - - public void data(Void v, Data data) - { - fragment(LAST_SEG, SegmentType.BODY, data, data.getData(), data.isFirst(), data.isLast()); } public void error(Void v, ProtocolError error) { - sender.send(error); + throw new IllegalArgumentException("" + error); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutputHandler.java deleted file mode 100644 index b3f400a6e7..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutputHandler.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network; - -import java.nio.ByteBuffer; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.qpid.transport.Constant; -import org.apache.qpid.transport.ProtocolError; -import org.apache.qpid.transport.ProtocolHeader; -import org.apache.qpid.transport.Sender; - -import static org.apache.qpid.transport.network.Frame.*; - - -/** - * OutputHandler - * - */ - -public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate -{ - - private Sender<ByteBuffer> sender; - private Object lock = new Object(); - private int bytes = 0; - private List<Frame> frames = new ArrayList<Frame>(); - - public OutputHandler(Sender<ByteBuffer> sender) - { - this.sender = sender; - } - - public void send(NetworkEvent event) - { - event.delegate(this); - } - - public void close() - { - synchronized (lock) - { - sender.close(); - } - } - - public void init(ProtocolHeader header) - { - synchronized (lock) - { - sender.send(header.toByteBuffer()); - sender.flush(); - } - } - - public void frame(Frame frame) - { - synchronized (lock) - { - frames.add(frame); - bytes += HEADER_SIZE + frame.getSize(); - - if (bytes > 64*1024) - { - flush(); - } - } - } - - public void flush() - { - synchronized (lock) - { - ByteBuffer buf = ByteBuffer.allocate(bytes); - int nframes = frames.size(); - for (int i = 0; i < nframes; i++) - { - Frame frame = frames.get(i); - buf.put(frame.getFlags()); - buf.put((byte) frame.getType().getValue()); - buf.putShort((short) (frame.getSize() + HEADER_SIZE)); - // RESERVED - buf.put(RESERVED); - buf.put(frame.getTrack()); - buf.putShort((short) frame.getChannel()); - // RESERVED - buf.putInt(0); - buf.put(frame.getBody()); - } - buf.flip(); - - frames.clear(); - bytes = 0; - - sender.send(buf); - sender.flush(); - } - } - - public void error(ProtocolError error) - { - throw new IllegalStateException("XXX"); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 37910ade0d..7ac5649e99 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -23,7 +23,6 @@ import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; @@ -48,8 +47,9 @@ final class IoSender extends Thread implements Sender<ByteBuffer> private final OutputStream out; private final byte[] buffer; - private final AtomicInteger head = new AtomicInteger(START); - private final AtomicInteger tail = new AtomicInteger(START); + private volatile int head = START; + private volatile int tail = START; + private volatile boolean idle = true; private final Object notFull = new Object(); private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); @@ -96,16 +96,17 @@ final class IoSender extends Thread implements Sender<ByteBuffer> while (remaining > 0) { - final int hd = head.get(); - final int tl = tail.get(); + final int hd = head; + final int tl = tail; if (hd - tl >= size) { + flush(); synchronized (notFull) { long start = System.currentTimeMillis(); long elapsed = 0; - while (head.get() - tail.get() >= size && elapsed < timeout) + while (head - tail >= size && elapsed < timeout) { try { @@ -118,9 +119,9 @@ final class IoSender extends Thread implements Sender<ByteBuffer> elapsed += System.currentTimeMillis() - start; } - if (head.get() - tail.get() >= size) + if (head - tail >= size) { - throw new TransportException(String.format("write timed out: %s, %s", head.get(), tail.get())); + throw new TransportException(String.format("write timed out: %s, %s", head, tail)); } } continue; @@ -140,21 +141,20 @@ final class IoSender extends Thread implements Sender<ByteBuffer> } buf.get(buffer, hd_idx, length); - head.getAndAdd(length); - if (hd == tail.get()) - { - synchronized (notEmpty) - { - notEmpty.notify(); - } - } + head += length; remaining -= length; } } public void flush() { - // pass + if (idle) + { + synchronized (notEmpty) + { + notEmpty.notify(); + } + } } public void close() @@ -206,8 +206,8 @@ final class IoSender extends Thread implements Sender<ByteBuffer> while (true) { - final int hd = head.get(); - final int tl = tail.get(); + final int hd = head; + final int tl = tail; if (hd == tl) { @@ -216,9 +216,11 @@ final class IoSender extends Thread implements Sender<ByteBuffer> break; } + idle = true; + synchronized (notEmpty) { - while (head.get() == tail.get() && !closed.get()) + while (head == tail && !closed.get()) { try { @@ -231,6 +233,8 @@ final class IoSender extends Thread implements Sender<ByteBuffer> } } + idle = false; + continue; } @@ -258,8 +262,8 @@ final class IoSender extends Thread implements Sender<ByteBuffer> close(false); break; } - tail.getAndAdd(length); - if (head.get() - tl >= size) + tail += length; + if (head - tl >= size) { synchronized (notFull) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java index 52accb6b97..3b543b3e60 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -33,7 +33,6 @@ import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; -import org.apache.qpid.transport.network.OutputHandler; import org.apache.qpid.transport.util.Logger; /** @@ -48,6 +47,14 @@ import org.apache.qpid.transport.util.Logger; public final class IoTransport { + static + { + org.apache.mina.common.ByteBuffer.setAllocator + (new org.apache.mina.common.SimpleByteBufferAllocator()); + org.apache.mina.common.ByteBuffer.setUseDirectBuffers + (Boolean.getBoolean("amqj.enableDirectBuffers")); + } + private static final Logger log = Logger.get(IoTransport.class); private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024; @@ -104,8 +111,7 @@ public final class IoTransport sender = new IoSender(this, 2*writeBufferSize, timeout); Connection conn = new Connection - (new Disassembler(new OutputHandler(sender), 64*1024 - 1), - delegate); + (new Disassembler(sender, 64*1024 - 1), delegate); receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)), 2*readBufferSize, timeout); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java index bcac7c4e16..16a1e20b10 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java @@ -44,7 +44,6 @@ import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; -import org.apache.qpid.transport.network.OutputHandler; import static org.apache.qpid.transport.util.Functions.*; @@ -292,7 +291,7 @@ public class MinaHandler<E> implements IoHandler { // XXX: hardcoded max-frame return new Connection - (new Disassembler(new OutputHandler(sender), MAX_FRAME_SIZE), delegate); + (new Disassembler(sender, MAX_FRAME_SIZE), delegate); } public Receiver<java.nio.ByteBuffer> receiver(Connection conn) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java index f0161efe97..51e41b26f7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java @@ -17,7 +17,6 @@ import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; -import org.apache.qpid.transport.network.OutputHandler; public class NioHandler implements Runnable { @@ -68,8 +67,7 @@ public class NioHandler implements Runnable NioSender sender = new NioSender(_ch); Connection con = new Connection - (new Disassembler(new OutputHandler(sender), 64*1024 - 1), - delegate); + (new Disassembler(sender, 64*1024 - 1), delegate); con.setConnectionId(_count.incrementAndGet()); _handlers.put(con.getConnectionId(),sender); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java new file mode 100644 index 0000000000..4b199bafe6 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.io.UnsupportedEncodingException; + + +/** + * Strings + * + */ + +public final class Strings +{ + + private static final byte[] EMPTY = new byte[0]; + + private static final ThreadLocal<char[]> charbuf = new ThreadLocal() + { + public char[] initialValue() + { + return new char[4096]; + } + }; + + public static final byte[] toUTF8(String str) + { + if (str == null) + { + return EMPTY; + } + else + { + final int size = str.length(); + char[] chars = charbuf.get(); + if (size > chars.length) + { + chars = new char[Math.max(size, 2*chars.length)]; + charbuf.set(chars); + } + + str.getChars(0, size, chars, 0); + final byte[] bytes = new byte[size]; + for (int i = 0; i < size; i++) + { + if (chars[i] > 127) + { + try + { + return str.getBytes("UTF-8"); + } + catch (UnsupportedEncodingException e) + { + throw new RuntimeException(e); + } + } + + bytes[i] = (byte) chars[i]; + } + return bytes; + } + } + +} |