diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-07-10 01:15:43 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-07-10 01:15:43 +0000 |
commit | d0e58803b1baba4d423c9054a28b12bb3b78ded8 (patch) | |
tree | c6b6ccf03c6f48c4fbe2f1dcf4dac86a99f0df82 /qpid/java/common | |
parent | 2a545d4432fb46adedc9fd899a42f69534d4a80a (diff) | |
download | qpid-python-d0e58803b1baba4d423c9054a28b12bb3b78ded8.tar.gz |
QPID-1062: moved channel id into the ProtocolEvent interface and removed ConnectionEvent, this removes the overhead of creating ConnectionEvents
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@675397 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
14 files changed, 119 insertions, 116 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java index a3233afcbe..f6d2829504 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java @@ -75,9 +75,10 @@ class ToyClient extends SessionDelegate } public void closed() {} }); - conn.send(new ConnectionEvent(0, new ProtocolHeader(1, - TransportConstants.getVersionMajor(), - TransportConstants.getVersionMinor()))); + conn.send(new ProtocolHeader + (1, + TransportConstants.getVersionMajor(), + TransportConstants.getVersionMinor())); Channel ch = conn.getChannel(0); Session ssn = new Session("my-session".getBytes()); diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java index eb37ce1590..50341d2c20 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java @@ -151,7 +151,8 @@ public class Channel extends Invoker private void emit(ProtocolEvent event) { - connection.send(new ConnectionEvent(channel, event)); + event.setChannel(channel); + connection.send(event); } public void method(Method m) diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java index 15116be1c3..d83bc7d2f5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java @@ -41,12 +41,12 @@ import java.nio.ByteBuffer; */ public class Connection - implements Receiver<ConnectionEvent>, Sender<ConnectionEvent> + implements Receiver<ProtocolEvent>, Sender<ProtocolEvent> { private static final Logger log = Logger.get(Connection.class); - final private Sender<ConnectionEvent> sender; + final private Sender<ProtocolEvent> sender; final private ConnectionDelegate delegate; private int channelMax = 1; // want to make this final @@ -54,7 +54,7 @@ public class Connection final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>(); - public Connection(Sender<ConnectionEvent> sender, + public Connection(Sender<ProtocolEvent> sender, ConnectionDelegate delegate) { this.sender = sender; @@ -76,14 +76,14 @@ public class Connection return delegate; } - public void received(ConnectionEvent event) + public void received(ProtocolEvent event) { log.debug("RECV: [%s] %s", this, event); Channel channel = getChannel(event.getChannel()); - channel.received(event.getProtocolEvent()); + channel.received(event); } - public void send(ConnectionEvent event) + public void send(ProtocolEvent event) { log.debug("SEND: [%s] %s", this, event); sender.send(event); diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java index 14344991c6..962dd9a5da 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java @@ -82,18 +82,18 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> public void init(Channel ch, ProtocolHeader hdr) { - ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader - (1, - TransportConstants.getVersionMajor(), - TransportConstants.getVersionMinor()))); + ch.getConnection().send(new ProtocolHeader + (1, + TransportConstants.getVersionMajor(), + TransportConstants.getVersionMinor())); if (hdr.getMajor() != TransportConstants.getVersionMajor() && hdr.getMinor() != TransportConstants.getVersionMinor()) { // XXX - ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader - (1, - TransportConstants.getVersionMajor(), - TransportConstants.getVersionMinor()))); + ch.getConnection().send(new ProtocolHeader + (1, + TransportConstants.getVersionMajor(), + TransportConstants.getVersionMinor())); ch.getConnection().close(); } else diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java deleted file mode 100644 index 62d8f4d99d..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.transport; - - -/** - * ConnectionEvent - * - */ - -public class ConnectionEvent -{ - - private final int channel; - private final ProtocolEvent event; - - public ConnectionEvent(int channel, ProtocolEvent event) - { - this.channel = channel; - this.event = event; - } - - public int getChannel() - { - return channel; - } - - public ProtocolEvent getProtocolEvent() - { - return event; - } - - public String toString() - { - return String.format("[%d] %s", channel, event); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java index 8792518834..31e5a99935 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java @@ -40,6 +40,7 @@ public class Data implements ProtocolEvent private final ByteBuffer data; private final boolean first; private final boolean last; + private int channel; public Data(ByteBuffer data, boolean first, boolean last) { @@ -63,6 +64,16 @@ public class Data implements ProtocolEvent return last; } + public final int getChannel() + { + return channel; + } + + public final void setChannel(int channel) + { + this.channel = channel; + } + public byte getEncodedTrack() { return Frame.L4; @@ -76,6 +87,8 @@ public class Data implements ProtocolEvent public String toString() { StringBuffer str = new StringBuffer(); + str.append("ch="); + str.append(" "); str.append("Data("); str.append(str(data, 64)); str.append(")"); diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java index ae11bb0c69..221fc37f58 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java @@ -37,6 +37,7 @@ public class Header implements ProtocolEvent { private final List<Struct> structs; private ByteBuffer _buf; private boolean _noPayload; + private int channel; public Header(List<Struct> structs, boolean lastframe) { @@ -71,6 +72,16 @@ public class Header implements ProtocolEvent { return null; } + public final int getChannel() + { + return channel; + } + + public final void setChannel(int channel) + { + this.channel = channel; + } + public byte getEncodedTrack() { return Frame.L4; @@ -82,15 +93,16 @@ public class Header implements ProtocolEvent { } public boolean hasNoPayload() - { - return _noPayload; - } - + { + return _noPayload; + } public String toString() { StringBuffer str = new StringBuffer(); - str.append("Header("); + str.append("ch="); + str.append(channel); + str.append(" Header("); boolean first = true; for (Struct s : structs) { diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java index a0605e6e66..6589cc79b4 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java @@ -40,6 +40,7 @@ public abstract class Method extends Struct implements ProtocolEvent // XXX: command subclass? private int id; + private int channel; private boolean idSet = false; private boolean sync = false; private boolean batch = false; @@ -55,6 +56,16 @@ public abstract class Method extends Struct implements ProtocolEvent this.idSet = true; } + public final int getChannel() + { + return channel; + } + + public final void setChannel(int channel) + { + this.channel = channel; + } + public final boolean isSync() { return sync; @@ -97,18 +108,18 @@ public abstract class Method extends Struct implements ProtocolEvent { StringBuilder str = new StringBuilder(); + str.append("ch="); + str.append(channel); + if (getEncodedTrack() == Frame.L4 && idSet) { - str.append("id="); + str.append(" id="); str.append(id); } if (sync || batch) { - if (str.length() > 0) - { - str.append(" "); - } + str.append(" "); str.append("["); if (sync) { @@ -121,11 +132,9 @@ public abstract class Method extends Struct implements ProtocolEvent str.append("]"); } - if (str.length() > 0) - { - str.append(" "); - } + str.append(" "); str.append(super.toString()); + return str.toString(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java index 59fc3d2552..586695af22 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java @@ -30,9 +30,10 @@ import org.apache.qpidity.transport.network.NetworkEvent; * @author Rafael H. Schloming */ -public class ProtocolError implements NetworkEvent, ProtocolEvent +public final class ProtocolError implements NetworkEvent, ProtocolEvent { + private int channel; private final byte track; private final String format; private final Object[] args; @@ -44,6 +45,16 @@ public class ProtocolError implements NetworkEvent, ProtocolEvent this.args = args; } + public int getChannel() + { + return channel; + } + + public void setChannel(int channel) + { + this.channel = channel; + } + public byte getEncodedTrack() { return track; diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java index 0b38dc6f28..03439be9ce 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java @@ -29,6 +29,10 @@ package org.apache.qpidity.transport; public interface ProtocolEvent { + int getChannel(); + + void setChannel(int channel); + byte getEncodedTrack(); <C> void delegate(C context, ProtocolDelegate<C> delegate); diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java index f9cd6f3947..eea945c7c6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java @@ -33,9 +33,7 @@ import org.apache.qpidity.transport.network.Frame; * @author Rafael H. Schloming */ -//RA making this public until we sort out the package issues - -public class ProtocolHeader implements NetworkEvent, ProtocolEvent +public final class ProtocolHeader implements NetworkEvent, ProtocolEvent { private static final byte[] AMQP = {'A', 'M', 'Q', 'P' }; @@ -44,6 +42,7 @@ public class ProtocolHeader implements NetworkEvent, ProtocolEvent final private byte instance; final private byte major; final private byte minor; + private int channel; public ProtocolHeader(byte instance, byte major, byte minor) { @@ -72,6 +71,16 @@ public class ProtocolHeader implements NetworkEvent, ProtocolEvent return minor; } + public int getChannel() + { + return channel; + } + + public void setChannel(int channel) + { + this.channel = channel; + } + public byte getEncodedTrack() { return Frame.L1; diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java index e188c15b35..45e2576e7b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java @@ -30,7 +30,6 @@ import java.nio.ByteBuffer; import org.apache.qpidity.transport.codec.BBDecoder; import org.apache.qpidity.transport.codec.Decoder; -import org.apache.qpidity.transport.ConnectionEvent; import org.apache.qpidity.transport.Data; import org.apache.qpidity.transport.Header; import org.apache.qpidity.transport.Method; @@ -50,7 +49,7 @@ import org.apache.qpidity.transport.Struct; public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { - private final Receiver<ConnectionEvent> receiver; + private final Receiver<ProtocolEvent> receiver; private final Map<Integer,List<Frame>> segments; private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>() { @@ -60,7 +59,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate } }; - public Assembler(Receiver<ConnectionEvent> receiver) + public Assembler(Receiver<ProtocolEvent> receiver) { this.receiver = receiver; segments = new HashMap<Integer,List<Frame>>(); @@ -94,7 +93,8 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate private void emit(int channel, ProtocolEvent event) { - receiver.received(new ConnectionEvent(channel, event)); + event.setChannel(channel); + receiver.received(event); } private void emit(Frame frame, ProtocolEvent event) diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java index 074057df56..fe7939c0d8 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java @@ -22,7 +22,6 @@ package org.apache.qpidity.transport.network; import org.apache.qpidity.transport.codec.BBEncoder; -import org.apache.qpidity.transport.ConnectionEvent; import org.apache.qpidity.transport.Data; import org.apache.qpidity.transport.Header; import org.apache.qpidity.transport.Method; @@ -47,8 +46,8 @@ import static java.lang.Math.*; * */ -public class Disassembler implements Sender<ConnectionEvent>, - ProtocolDelegate<ConnectionEvent> +public class Disassembler implements Sender<ProtocolEvent>, + ProtocolDelegate<Void> { private final Sender<NetworkEvent> sender; @@ -73,9 +72,9 @@ public class Disassembler implements Sender<ConnectionEvent>, } - public void send(ConnectionEvent event) + public void send(ProtocolEvent event) { - event.getProtocolEvent().delegate(event, this); + event.delegate(null, this); } public void flush() @@ -88,10 +87,10 @@ public class Disassembler implements Sender<ConnectionEvent>, sender.close(); } - private void fragment(byte flags, SegmentType type, ConnectionEvent event, + private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf, boolean first, boolean last) { - byte track = event.getProtocolEvent().getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0; + byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0; if(!buf.hasRemaining()) { @@ -131,19 +130,19 @@ public class Disassembler implements Sender<ConnectionEvent>, } } - public void init(ConnectionEvent event, ProtocolHeader header) + public void init(Void v, ProtocolHeader header) { sender.send(header); } - public void control(ConnectionEvent event, Method method) + public void control(Void v, Method method) { - method(event, method, SegmentType.CONTROL); + method(method, SegmentType.CONTROL); } - public void command(ConnectionEvent event, Method method) + public void command(Void v, Method method) { - method(event, method, SegmentType.COMMAND); + method(method, SegmentType.COMMAND); } private ByteBuffer copy(ByteBuffer src) @@ -154,7 +153,7 @@ public class Disassembler implements Sender<ConnectionEvent>, return buf; } - private void method(ConnectionEvent event, Method method, SegmentType type) + private void method(Method method, SegmentType type) { BBEncoder enc = encoder.get(); enc.init(); @@ -180,10 +179,10 @@ public class Disassembler implements Sender<ConnectionEvent>, flags |= LAST_SEG; } - fragment(flags, type, event, buf, true, true); + fragment(flags, type, method, buf, true, true); } - public void header(ConnectionEvent event, Header header) + public void header(Void v, Header header) { ByteBuffer buf; if (header.getBuf() == null) @@ -202,15 +201,15 @@ public class Disassembler implements Sender<ConnectionEvent>, buf = header.getBuf(); buf.flip(); } - fragment((byte) 0x0, SegmentType.HEADER, event, buf, true, true); + fragment((byte) 0x0, SegmentType.HEADER, header, buf, true, true); } - public void data(ConnectionEvent event, Data data) + public void data(Void v, Data data) { - fragment(LAST_SEG, SegmentType.BODY, event, data.getData(), data.isFirst(), data.isLast()); + fragment(LAST_SEG, SegmentType.BODY, data, data.getData(), data.isFirst(), data.isLast()); } - public void error(ConnectionEvent event, ProtocolError error) + public void error(Void v, ProtocolError error) { sender.send(error); } diff --git a/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java index 1a83786e12..281b618408 100644 --- a/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java @@ -86,7 +86,7 @@ public class ConnectionTest extends TestCase } }); - conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10))); + conn.send(new ProtocolHeader(1, 0, 10)); return conn; } |