summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SecurityHelper.java8
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyBroker.java21
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyClient.java12
-rw-r--r--java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java26
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Channel.java10
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java9
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java41
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java52
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Data.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Method.java23
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java4
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java7
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Session.java130
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java45
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Struct.java21
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java6
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java212
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java288
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java34
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java34
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java38
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java26
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java96
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java15
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java8
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java6
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java15
-rw-r--r--java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java4
29 files changed, 722 insertions, 473 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java b/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java
index d0b3272dec..a72997813a 100644
--- a/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java
+++ b/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java
@@ -22,6 +22,7 @@ package org.apache.qpidity;
import java.io.UnsupportedEncodingException;
import java.util.HashSet;
+import java.util.List;
import java.util.StringTokenizer;
import org.apache.qpidity.security.AMQPCallbackHandler;
@@ -29,13 +30,12 @@ import org.apache.qpidity.security.CallbackHandlerRegistry;
public class SecurityHelper
{
- public static String chooseMechanism(String mechanisms) throws UnsupportedEncodingException
+ public static String chooseMechanism(List<Object> mechanisms) throws UnsupportedEncodingException
{
- StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
HashSet mechanismSet = new HashSet();
- while (tokenizer.hasMoreTokens())
+ for (Object m : mechanisms)
{
- mechanismSet.add(tokenizer.nextToken());
+ mechanismSet.add(m);
}
String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms();
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
index 2bd97f3aff..5f9917e30a 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
@@ -59,7 +59,7 @@ class ToyBroker extends SessionDelegate
public void messageAcquire(Session context, MessageAcquire struct)
{
System.out.println("\n==================> messageAcquire " );
- context.messageAcquired(struct.getTransfers());
+ context.executionResult((int) struct.getId(), new Acquired(struct.getTransfers()));
}
@Override public void queueDeclare(Session ssn, QueueDeclare qd)
@@ -68,16 +68,16 @@ class ToyBroker extends SessionDelegate
System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n");
}
- @Override public void queueBind(Session ssn, QueueBind qb)
+ @Override public void exchangeBind(Session ssn, ExchangeBind qb)
{
- exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue());
- System.out.println("\n==================> bound queue: " + qb.getQueue() + " with routing key " + qb.getRoutingKey() + "\n");
+ exchange.bindQueue(qb.getExchange(), qb.getBindingKey(),qb.getQueue());
+ System.out.println("\n==================> bound queue: " + qb.getQueue() + " with binding key " + qb.getBindingKey() + "\n");
}
@Override public void queueQuery(Session ssn, QueueQuery qq)
{
QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue());
- ssn.executionResult(qq.getId(), result);
+ ssn.executionResult((int) qq.getId(), result);
}
@Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
@@ -112,7 +112,8 @@ class ToyBroker extends SessionDelegate
{
if (xfr == null || body == null)
{
- ssn.connectionClose(503, "no method segment", 0, 0);
+ ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR,
+ "no method segment");
ssn.close();
return;
}
@@ -136,7 +137,7 @@ class ToyBroker extends SessionDelegate
{
if (xfr == null || body == null)
{
- ssn.connectionClose(503, "no method segment", 0, 0);
+ ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, "no method segment");
ssn.close();
return;
}
@@ -174,14 +175,16 @@ class ToyBroker extends SessionDelegate
{
RangeSet ranges = new RangeSet();
ranges.add(xfr.getId());
- ssn.messageReject(ranges, 0, "no such destination");
+ ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE,
+ "no such destination");
}
}
private void transferMessageToPeer(Session ssn,String dest, Message m)
{
System.out.println("\n==================> Transfering message to: " +dest + "\n");
- ssn.messageTransfer(dest, (short)0, (short)0);
+ ssn.messageTransfer(dest, MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED);
ssn.header(m.header);
for (Data d : m.body)
{
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java
index 10b68bbb20..a3233afcbe 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java
@@ -63,7 +63,7 @@ class ToyClient extends SessionDelegate
public static final void main(String[] args)
{
Connection conn = MinaHandler.connect("0.0.0.0", 5672,
- new ConnectionDelegate()
+ new ClientDelegate()
{
public SessionDelegate getSessionDelegate()
{
@@ -80,9 +80,9 @@ class ToyClient extends SessionDelegate
TransportConstants.getVersionMinor())));
Channel ch = conn.getChannel(0);
- Session ssn = new Session();
+ Session ssn = new Session("my-session".getBytes());
ssn.attach(ch);
- ssn.sessionOpen(1234);
+ ssn.sessionAttach(ssn.getName());
ssn.queueDeclare("asdf", null, null);
ssn.sync();
@@ -111,13 +111,15 @@ class ToyClient extends SessionDelegate
map.put("list", Arrays.asList(1, 2, 3));
map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
- ssn.messageTransfer("asdf", (short) 0, (short) 1);
+ ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED);
ssn.header(new DeliveryProperties(),
new MessageProperties().setApplicationHeaders(map));
ssn.data("this is the data");
ssn.endData();
- ssn.messageTransfer("fdsa", (short) 0, (short) 1);
+ ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED);
ssn.data("this should be rejected");
ssn.endData();
ssn.sync();
diff --git a/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java b/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java
index 224917ade1..89d7e7917f 100644
--- a/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java
+++ b/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java
@@ -241,28 +241,10 @@ public class XidImpl implements Xid
* @return The String representation of this Xid
* @throws QpidException In case of problem when converting this Xid into a string.
*/
- public static String convertToString(Xid xid) throws QpidException
+ public static org.apache.qpidity.transport.Xid convert(Xid xid) throws QpidException
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("converting " + xid + " into a String");
- }
- try
- {
- ByteArrayOutputStream res = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(res);
- out.writeLong(xid.getFormatId());
- byte[] txId = xid.getGlobalTransactionId();
- byte[] brId = xid.getBranchQualifier();
- out.writeByte(txId.length);
- out.writeByte(brId.length);
- out.write(txId);
- out.write(brId);
- return res.toString();
- }
- catch (IOException e)
- {
- throw new QpidException("cannot convert the xid " + xid + " into a String", null, e);
- }
+ return new org.apache.qpidity.transport.Xid(xid.getFormatId(),
+ xid.getGlobalTransactionId(),
+ xid.getBranchQualifier());
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
index 7327697088..651402bcb7 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
@@ -77,7 +77,7 @@ public class Channel extends Invoker
connection.getConnectionDelegate().init(this, hdr);
}
- public void method(Void v, Method method)
+ public void control(Void v, Method method)
{
switch (method.getEncodedTrack())
{
@@ -90,15 +90,17 @@ public class Channel extends Invoker
case L3:
method.delegate(session, sessionDelegate);
break;
- case L4:
- method.delegate(session, sessionDelegate);
- break;
default:
throw new IllegalStateException
("unknown track: " + method.getEncodedTrack());
}
}
+ public void command(Void v, Method method)
+ {
+ method.delegate(session, sessionDelegate);
+ }
+
public void header(Void v, Header header)
{
header.delegate(session, sessionDelegate);
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java
index 84621fbe25..7f4365f515 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java
@@ -32,15 +32,14 @@ import java.util.UUID;
class ChannelDelegate extends MethodDelegate<Channel>
{
- public @Override void sessionOpen(Channel channel, SessionOpen open)
+ public @Override void sessionAttach(Channel channel, SessionAttach atch)
{
- Session ssn = new Session();
+ Session ssn = new Session(atch.getName());
ssn.attach(channel);
- long lifetime = open.getDetachedLifetime();
- ssn.sessionAttached(UUID.randomUUID(), lifetime);
+ ssn.sessionAttached(ssn.getName());
}
- public @Override void sessionClosed(Channel channel, SessionClosed closed)
+ public @Override void sessionDetached(Channel channel, SessionDetached closed)
{
channel.getSession().closed();
// XXX: should we remove the channel from the connection? It
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java
new file mode 100644
index 0000000000..699854fb3b
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * ClientDelegate
+ *
+ */
+
+public abstract class ClientDelegate extends ConnectionDelegate
+{
+
+ public void init(Channel ch, ProtocolHeader hdr)
+ {
+ if (hdr.getMajor() != TransportConstants.getVersionMajor() &&
+ hdr.getMinor() != TransportConstants.getVersionMinor())
+ {
+ throw new RuntimeException("version missmatch: " + hdr);
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
index 4815f1025f..cb5f05a185 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
@@ -26,7 +26,10 @@ import org.apache.qpidity.SecurityHelper;
import org.apache.qpidity.QpidException;
import java.io.UnsupportedEncodingException;
+
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -79,17 +82,27 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
public void init(Channel ch, ProtocolHeader hdr)
{
- // XXX: hardcoded version
- if (hdr.getMajor() != 0 && hdr.getMinor() != 10)
+ ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader
+ (1,
+ TransportConstants.getVersionMajor(),
+ TransportConstants.getVersionMinor())));
+ if (hdr.getMajor() != TransportConstants.getVersionMajor() &&
+ hdr.getMinor() != TransportConstants.getVersionMinor())
{
// XXX
- ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.getVersionMajor(),
- TransportConstants.getVersionMinor())));
+ ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader
+ (1,
+ TransportConstants.getVersionMajor(),
+ TransportConstants.getVersionMinor())));
ch.getConnection().close();
}
else
{
- ch.connectionStart(hdr.getMajor(), hdr.getMinor(), null, "PLAIN", "utf8");
+ List<Object> plain = new ArrayList<Object>();
+ plain.add("PLAIN");
+ List<Object> utf8 = new ArrayList<Object>();
+ utf8.add("utf8");
+ ch.connectionStart(null, plain, utf8);
}
}
@@ -99,13 +112,13 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
@Override public void connectionStart(Channel context, ConnectionStart struct)
{
String mechanism = null;
- String response = null;
+ byte[] response = null;
try
{
mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms());
saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null,
SecurityHelper.createCallbackHandler(mechanism,_username,_password ));
- response = new String(saslClient.evaluateChallenge(new byte[0]),_locale);
+ response = saslClient.evaluateChallenge(new byte[0]);
}
catch (UnsupportedEncodingException e)
{
@@ -128,13 +141,9 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
{
try
{
- String response = new String(saslClient.evaluateChallenge(struct.getChallenge().getBytes()),_locale);
+ byte[] response = saslClient.evaluateChallenge(struct.getChallenge());
context.connectionSecureOk(response);
}
- catch (UnsupportedEncodingException e)
- {
- // need error handling
- }
catch (SaslException e)
{
// need error handling
@@ -144,14 +153,14 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
@Override public void connectionTune(Channel context, ConnectionTune struct)
{
// should update the channel max given by the broker.
- context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat());
+ context.connectionTuneOk(struct.getChannelMax(), struct.getMaxFrameSize(), struct.getHeartbeatMax());
context.connectionOpen(_virtualHost, null, Option.INSIST);
}
@Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct)
{
- String knownHosts = struct.getKnownHosts();
+ List<Object> knownHosts = struct.getKnownHosts();
if(_negotiationCompleteLock != null)
{
_negotiationCompleteLock.lock();
@@ -187,13 +196,13 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
byte[] challenge = null;
if ( challenge == null)
{
- context.connectionTune(Integer.MAX_VALUE,maxFrame, 0);
+ context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE);
}
else
{
try
{
- context.connectionSecure(new String(challenge,_locale));
+ context.connectionSecure(challenge);
}
catch(Exception e)
{
@@ -218,16 +227,16 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
try
{
saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password));
- byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes());
+ byte[] challenge = saslServer.evaluateResponse(struct.getResponse());
if ( challenge == null)
{
- context.connectionTune(Integer.MAX_VALUE,maxFrame, 0);
+ context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE);
}
else
{
try
{
- context.connectionSecure(new String(challenge,_locale));
+ context.connectionSecure(challenge);
}
catch(Exception e)
{
@@ -250,8 +259,9 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
@Override public void connectionOpen(Channel context, ConnectionOpen struct)
{
- String hosts = "amqp:1223243232325";
- context.connectionOpenOk(hosts);
+ List<Object> hosts = new ArrayList<Object>();
+ hosts.add("amqp:1223243232325");
+ context.connectionOpenOk(hosts);
}
public String getPassword()
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Data.java b/java/common/src/main/java/org/apache/qpidity/transport/Data.java
index b9b8636d18..72c1c3331d 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Data.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Data.java
@@ -93,7 +93,7 @@ public class Data implements ProtocolEvent
{
str.append(" | ");
}
- str.append(str(buf, 20));
+ str.append(str(buf));
}
str.append(")");
return str.toString();
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Method.java b/java/common/src/main/java/org/apache/qpidity/transport/Method.java
index 7304260333..bc4ec6289d 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Method.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Method.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpidity.transport;
+import org.apache.qpidity.transport.network.Frame;
/**
* Method
@@ -34,11 +35,12 @@ public abstract class Method extends Struct implements ProtocolEvent
{
// XXX: should generate separate factories for separate
// namespaces
- return (Method) Struct.create(type);
+ return (Method) StructFactory.createInstruction(type);
}
// XXX: command subclass?
private long id;
+ private boolean sync = false;
public final long getId()
{
@@ -50,6 +52,16 @@ public abstract class Method extends Struct implements ProtocolEvent
this.id = id;
}
+ public final boolean isSync()
+ {
+ return sync;
+ }
+
+ void setSync(boolean value)
+ {
+ this.sync = value;
+ }
+
public abstract boolean hasPayload();
public abstract byte getEncodedTrack();
@@ -58,7 +70,14 @@ public abstract class Method extends Struct implements ProtocolEvent
public <C> void delegate(C context, ProtocolDelegate<C> delegate)
{
- delegate.method(context, this);
+ if (getEncodedTrack() == Frame.L4)
+ {
+ delegate.command(context, this);
+ }
+ else
+ {
+ delegate.control(context, this);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java
index 6149c3876a..028d570416 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java
@@ -31,7 +31,9 @@ public interface ProtocolDelegate<C>
void init(C context, ProtocolHeader header);
- void method(C context, Method method);
+ void control(C context, Method control);
+
+ void command(C context, Method command);
void header(C context, Header header);
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
index 97d4be0d2e..ccbcfa99d0 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
@@ -98,6 +98,13 @@ public class RangeSet implements Iterable<Range>
ranges.clear();
}
+ public RangeSet copy()
+ {
+ RangeSet copy = new RangeSet();
+ copy.ranges.addAll(ranges);
+ return copy;
+ }
+
public String toString()
{
StringBuffer str = new StringBuffer();
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
index e4f4af95a5..f6f4062c6d 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.qpidity.transport.Option.*;
/**
* Session
@@ -56,23 +57,34 @@ public class Session extends Invoker
private static boolean ENABLE_REPLAY = false;
private static final Logger log = Logger.get(Session.class);
+ private byte[] name;
+ private long timeout = 60000;
+
// channel may be null
Channel channel;
// incoming command count
- private long commandsIn = 0;
+ long commandsIn = 0;
// completed incoming commands
private final RangeSet processed = new RangeSet();
- private long processedMark = -1;
private Range syncPoint = null;
// outgoing command count
private long commandsOut = 0;
private Map<Long,Method> commands = new HashMap<Long,Method>();
- private long mark = 0;
+ private long maxComplete = -1;
private AtomicBoolean closed = new AtomicBoolean(false);
+ public Session(byte[] name)
+ {
+ this.name = name;
+ }
+
+ public byte[] getName()
+ {
+ return name;
+ }
public Map<Long,Method> getOutstandingCommands()
{
@@ -133,25 +145,12 @@ public class Session extends Invoker
public void flushProcessed()
{
- boolean first = true;
- RangeSet rest = new RangeSet();
+ RangeSet copy;
synchronized (processed)
{
- for (Range r: processed)
- {
- if (first && r.includes(processedMark))
- {
- processedMark = r.getUpper();
- }
- else
- {
- rest.add(r);
- }
-
- first = false;
- }
+ copy = processed.copy();
}
- executionComplete(processedMark, rest);
+ sessionCompleted(copy);
}
void syncPoint()
@@ -193,34 +192,34 @@ public class Session extends Invoker
log.debug("%s complete(%d, %d)", this, lower, upper);
synchronized (commands)
{
- for (long id = lower; id <= upper; id++)
+ for (long id = maxComplete; id <= upper; id++)
{
commands.remove(id);
}
+ if (lower <= maxComplete + 1)
+ {
+ maxComplete = Math.max(maxComplete, upper);
+ }
commands.notifyAll();
log.debug("%s commands remaining: %s", this, commands);
}
}
- void complete(long mark)
- {
- synchronized (commands)
- {
- complete(this.mark, mark);
- this.mark = mark;
- commands.notifyAll();
- }
- }
-
protected void invoke(Method m)
{
if (m.getEncodedTrack() == Frame.L4)
{
synchronized (commands)
{
- // You only need to keep the command if you need command level replay.
- // If not we only need to keep track of commands to make sync work
- commands.put(commandsOut++,(ENABLE_REPLAY?m:null));
+ long next = commandsOut++;
+ if (next == 0)
+ {
+ sessionCommandPoint(0, 0);
+ }
+ if (ENABLE_REPLAY)
+ {
+ commands.put(next, m);
+ }
channel.method(m);
}
}
@@ -230,7 +229,7 @@ public class Session extends Invoker
}
}
- public void header(Header header)
+ public void header(Header header)
{
channel.header(header);
}
@@ -269,21 +268,32 @@ public class Session extends Invoker
public void sync()
{
+ sync(timeout);
+ }
+
+ public void sync(long timeout)
+ {
log.debug("%s sync()", this);
synchronized (commands)
{
long point = commandsOut - 1;
- if (mark < point)
+ if (maxComplete < point)
{
- executionSync();
+ ExecutionSync sync = new ExecutionSync();
+ sync.setSync(true);
+ invoke(sync);
}
- while (!closed.get() && mark < point)
+ long start = System.currentTimeMillis();
+ long elapsed = 0;
+ while (!closed.get() && elapsed < timeout && maxComplete < point)
{
try {
- log.debug("%s waiting for[%d]: %s", this, point, commands);
- commands.wait();
+ log.debug("%s waiting for[%d]: %d, %s", this, point,
+ maxComplete, commands);
+ commands.wait(timeout - elapsed);
+ elapsed = System.currentTimeMillis() - start;
}
catch (InterruptedException e)
{
@@ -291,9 +301,16 @@ public class Session extends Invoker
}
}
- if (mark < point)
+ if (maxComplete < point)
{
- throw new RuntimeException("session closed");
+ if (closed.get())
+ {
+ throw new RuntimeException("session closed");
+ }
+ else
+ {
+ throw new RuntimeException("timed out waiting for sync");
+ }
}
}
}
@@ -346,16 +363,19 @@ public class Session extends Invoker
}
}
- public T get(long timeout, int nanos)
+ public T get(long timeout)
{
synchronized (this)
{
- while (!closed.get() && !isDone())
+ long start = System.currentTimeMillis();
+ long elapsed = 0;
+ while (!closed.get() && timeout - elapsed > 0 && !isDone())
{
try
{
log.debug("%s waiting for result: %s", Session.this, this);
- wait(timeout, nanos);
+ wait(timeout - elapsed);
+ elapsed = System.currentTimeMillis() - start;
}
catch (InterruptedException e)
{
@@ -364,22 +384,23 @@ public class Session extends Invoker
}
}
- if (!isDone())
+ if (isDone())
+ {
+ return result;
+ }
+ else if (closed.get())
{
throw new RuntimeException("session closed");
}
-
- return result;
- }
-
- public T get(long timeout)
- {
- return get(timeout, 0);
+ else
+ {
+ return null;
+ }
}
public T get()
{
- return get(0);
+ return get(timeout);
}
public boolean isDone()
@@ -396,7 +417,8 @@ public class Session extends Invoker
public void close()
{
- sessionClose();
+ sessionRequestTimeout(0);
+ sessionDetach(name);
// XXX: channel.close();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
index 5e36a77a98..442aba0e9b 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
@@ -35,20 +35,16 @@ public abstract class SessionDelegate
{
public void init(Session ssn, ProtocolHeader hdr) { }
- public void method(Session ssn, Method method) {
- if (method.getEncodedTrack() == Frame.L4)
- {
- method.setId(ssn.nextCommandId());
- }
-
+ public void control(Session ssn, Method method) {
method.dispatch(ssn, this);
+ }
- if (method.getEncodedTrack() == Frame.L4)
+ public void command(Session ssn, Method method) {
+ method.setId(ssn.nextCommandId());
+ method.dispatch(ssn, this);
+ if (!method.hasPayload())
{
- if (!method.hasPayload())
- {
- ssn.processed(method);
- }
+ ssn.processed(method);
}
}
@@ -60,12 +56,12 @@ public abstract class SessionDelegate
@Override public void executionResult(Session ssn, ExecutionResult result)
{
- ssn.result(result.getCommandId(), result.getData());
+ ssn.result(result.getCommandId(), result.getValue());
}
- @Override public void executionComplete(Session ssn, ExecutionComplete excmp)
+ @Override public void sessionCompleted(Session ssn, SessionCompleted cmp)
{
- RangeSet ranges = excmp.getRangedExecutionSet();
+ RangeSet ranges = cmp.getCommands();
if (ranges != null)
{
for (Range range : ranges)
@@ -73,12 +69,27 @@ public abstract class SessionDelegate
ssn.complete(range.getLower(), range.getUpper());
}
}
- ssn.complete(excmp.getCumulativeExecutionMark());
}
- @Override public void executionFlush(Session ssn, ExecutionFlush flush)
+ @Override public void sessionFlush(Session ssn, SessionFlush flush)
+ {
+ if (flush.getCompleted())
+ {
+ ssn.flushProcessed();
+ }
+ if (flush.getConfirmed())
+ {
+ throw new Error("not implemented");
+ }
+ if (flush.getExpected())
+ {
+ throw new Error("not implemented");
+ }
+ }
+
+ @Override public void sessionCommandPoint(Session ssn, SessionCommandPoint scp)
{
- ssn.flushProcessed();
+ ssn.commandsIn = scp.getCommandId();
}
@Override public void executionSync(Session ssn, ExecutionSync sync)
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Struct.java b/java/common/src/main/java/org/apache/qpidity/transport/Struct.java
index 200c3b68e3..f901f9e840 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Struct.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Struct.java
@@ -71,8 +71,6 @@ public abstract class Struct implements Encodable
return type;
}
- public abstract boolean hasTicket();
-
private final boolean isBit(Field<?,?> f)
{
return f.getType().equals(Boolean.class);
@@ -80,14 +78,7 @@ public abstract class Struct implements Encodable
private final boolean packed()
{
- if (this instanceof Method)
- {
- return false;
- }
- else
- {
- return true;
- }
+ return getPackWidth() > 0;
}
private final boolean encoded(Field<?,?> f)
@@ -147,11 +138,6 @@ public abstract class Struct implements Encodable
}
}
- if (hasTicket())
- {
- dec.readShort();
- }
-
for (Field<?,?> f : fields)
{
if (encoded(f))
@@ -187,11 +173,6 @@ public abstract class Struct implements Encodable
}
}
- if (hasTicket())
- {
- enc.writeShort(0x0);
- }
-
for (Field<?,?> f : fields)
{
if (encoded(f))
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java b/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java
index 54429a1a4f..e9a0705de0 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java
@@ -2,8 +2,9 @@ package org.apache.qpidity.transport;
public class TransportConstants
{
- private static byte _protocol_version_minor = 0;
- private static byte _protocol_version_major = 99;
+
+ private static byte _protocol_version_minor = 10;
+ private static byte _protocol_version_major = 0;
public static void setVersionMajor(byte value)
{
@@ -24,4 +25,5 @@ public class TransportConstants
{
return _protocol_version_minor;
}
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
index ae67483a23..0f6180f54a 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
@@ -20,7 +20,10 @@
*/
package org.apache.qpidity.transport.codec;
+import java.io.UnsupportedEncodingException;
+
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -99,19 +102,19 @@ abstract class AbstractDecoder implements Decoder
nbits = 0;
}
- public short readOctet()
+ public short readUint8()
{
return uget();
}
- public int readShort()
+ public int readUint16()
{
int i = uget() << 8;
i |= uget();
return i;
}
- public long readLong()
+ public long readUint32()
{
long l = uget() << 24;
l |= uget() << 16;
@@ -120,7 +123,12 @@ abstract class AbstractDecoder implements Decoder
return l;
}
- public long readLonglong()
+ public int readSequenceNo()
+ {
+ return (int) readUint32();
+ }
+
+ public long readUint64()
{
long l = 0;
for (int i = 0; i < 8; i++)
@@ -130,31 +138,67 @@ abstract class AbstractDecoder implements Decoder
return l;
}
- public long readTimestamp()
+ public long readDatetime()
+ {
+ return readUint64();
+ }
+
+ private static final String decode(byte[] bytes, String charset)
{
- return readLonglong();
+ try
+ {
+ return new String(bytes, charset);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- public String readShortstr()
+ public String readStr8()
+ {
+ short size = readUint8();
+ byte[] bytes = new byte[size];
+ get(bytes);
+ return decode(bytes, "UTF-8");
+ }
+
+ public String readStr16()
{
- short size = readOctet();
+ int size = readUint16();
byte[] bytes = new byte[size];
get(bytes);
- return new String(bytes);
+ return decode(bytes, "UTF-8");
}
- public String readLongstr()
+ public byte[] readVbin8()
{
- long size = readLong();
- byte[] bytes = new byte[(int) size];
+ int size = readUint8();
+ byte[] bytes = new byte[size];
get(bytes);
- return new String(bytes);
+ return bytes;
}
- public RangeSet readRfc1982LongSet()
+ public byte[] readVbin16()
{
- int count = readShort()/8;
+ int size = readUint16();
+ byte[] bytes = new byte[size];
+ get(bytes);
+ return bytes;
+ }
+
+ public byte[] readVbin32()
+ {
+ int size = (int) readUint32();
+ byte[] bytes = new byte[size];
+ get(bytes);
+ return bytes;
+ }
+
+ public RangeSet readSequenceSet()
+ {
+ int count = readUint16()/8;
if (count == 0)
{
return null;
@@ -164,16 +208,21 @@ abstract class AbstractDecoder implements Decoder
RangeSet ranges = new RangeSet();
for (int i = 0; i < count; i++)
{
- ranges.add(readLong(), readLong());
+ ranges.add(readUint32(), readUint32());
}
return ranges;
}
}
+ public RangeSet readByteRanges()
+ {
+ throw new Error("not implemented");
+ }
+
public UUID readUuid()
{
- long msb = readLonglong();
- long lsb = readLonglong();
+ long msb = readUint64();
+ long lsb = readUint64();
return new UUID(msb, lsb);
}
@@ -194,34 +243,39 @@ abstract class AbstractDecoder implements Decoder
return null;
}
}
+ if (type > 0)
+ {
+ int code = readUint16();
+ assert code == type;
+ }
st.read(this);
return st;
}
- public Struct readLongStruct()
+ public Struct readStruct32()
{
- long size = readLong();
+ long size = readUint32();
if (size == 0)
{
return null;
}
else
{
- int type = readShort();
+ int type = readUint16();
Struct result = Struct.create(type);
result.read(this);
return result;
}
}
- public Map<String,Object> readTable()
+ public Map<String,Object> readMap()
{
- long size = readLong();
+ long size = readUint32();
int start = count;
Map<String,Object> result = new LinkedHashMap();
while (count < start + size)
{
- String key = readShortstr();
+ String key = readStr8();
byte code = get();
Type t = getType(code);
Object value = read(t);
@@ -230,9 +284,9 @@ abstract class AbstractDecoder implements Decoder
return result;
}
- public List<Object> readSequence()
+ public List<Object> readList()
{
- long size = readLong();
+ long size = readUint32();
int start = count;
List<Object> result = new ArrayList();
while (count < start + size)
@@ -247,10 +301,15 @@ abstract class AbstractDecoder implements Decoder
public List<Object> readArray()
{
- long size = readLong();
+ long size = readUint32();
+ if (size == 0)
+ {
+ return Collections.EMPTY_LIST;
+ }
+
byte code = get();
Type t = getType(code);
- long count = readLong();
+ long count = readUint32();
List<Object> result = new ArrayList<Object>();
for (int i = 0; i < count; i++)
@@ -291,11 +350,11 @@ abstract class AbstractDecoder implements Decoder
switch (width)
{
case 1:
- return readOctet();
+ return readUint8();
case 2:
- return readShort();
+ return readUint16();
case 4:
- return readLong();
+ return readUint32();
default:
throw new IllegalStateException("illegal width: " + width);
}
@@ -313,81 +372,72 @@ abstract class AbstractDecoder implements Decoder
{
switch (t)
{
- case OCTET:
- case UNSIGNED_BYTE:
- return readOctet();
- case SIGNED_BYTE:
+ case BIN8:
+ case UINT8:
+ return readUint8();
+ case INT8:
return get();
case CHAR:
return (char) get();
case BOOLEAN:
return get() > 0;
- case TWO_OCTETS:
- case UNSIGNED_SHORT:
- return readShort();
+ case BIN16:
+ case UINT16:
+ return readUint16();
- case SIGNED_SHORT:
- return (short) readShort();
+ case INT16:
+ return (short) readUint16();
- case FOUR_OCTETS:
- case UNSIGNED_INT:
- return readLong();
+ case BIN32:
+ case UINT32:
+ return readUint32();
- case UTF32_CHAR:
- case SIGNED_INT:
- return (int) readLong();
+ case CHAR_UTF32:
+ case INT32:
+ return (int) readUint32();
case FLOAT:
- return Float.intBitsToFloat((int) readLong());
+ return Float.intBitsToFloat((int) readUint32());
- case EIGHT_OCTETS:
- case SIGNED_LONG:
- case UNSIGNED_LONG:
+ case BIN64:
+ case UINT64:
+ case INT64:
case DATETIME:
- return readLonglong();
+ return readUint64();
case DOUBLE:
- return Double.longBitsToDouble(readLonglong());
-
- case SIXTEEN_OCTETS:
- case THIRTY_TWO_OCTETS:
- case SIXTY_FOUR_OCTETS:
- case _128_OCTETS:
- case SHORT_BINARY:
- case BINARY:
- case LONG_BINARY:
- return readBytes(t);
+ return Double.longBitsToDouble(readUint64());
case UUID:
return readUuid();
- case SHORT_STRING:
- case SHORT_UTF8_STRING:
- case SHORT_UTF16_STRING:
- case SHORT_UTF32_STRING:
- case STRING:
- case UTF8_STRING:
- case UTF16_STRING:
- case UTF32_STRING:
- case LONG_STRING:
- case LONG_UTF8_STRING:
- case LONG_UTF16_STRING:
- case LONG_UTF32_STRING:
+ case STR8:
+ return readStr8();
+
+ case STR16:
+ return readStr16();
+
+ case STR8_LATIN:
+ case STR8_UTF16:
+ case STR16_LATIN:
+ case STR16_UTF16:
// XXX: need to do character conversion
return new String(readBytes(t));
- case TABLE:
- return readTable();
- case SEQUENCE:
- return readSequence();
+ case MAP:
+ return readMap();
+ case LIST:
+ return readList();
case ARRAY:
return readArray();
+ case STRUCT32:
+ return readStruct32();
- case FIVE_OCTETS:
- case DECIMAL:
- case NINE_OCTETS:
- case LONG_DECIMAL:
+ case BIN40:
+ case DEC32:
+ case BIN72:
+ case DEC64:
// XXX: what types are we supposed to use here?
return readBytes(t);
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
index c2dd205d66..56b4537719 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
@@ -20,8 +20,11 @@
*/
package org.apache.qpidity.transport.codec;
+import java.io.UnsupportedEncodingException;
+
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,17 +51,17 @@ abstract class AbstractEncoder implements Encoder
static
{
ENCODINGS.put(Boolean.class, Type.BOOLEAN);
- ENCODINGS.put(String.class, Type.LONG_STRING);
- ENCODINGS.put(Long.class, Type.SIGNED_LONG);
- ENCODINGS.put(Integer.class, Type.SIGNED_INT);
- ENCODINGS.put(Short.class, Type.SIGNED_SHORT);
- ENCODINGS.put(Byte.class, Type.SIGNED_BYTE);
- ENCODINGS.put(Map.class, Type.TABLE);
- ENCODINGS.put(List.class, Type.SEQUENCE);
+ ENCODINGS.put(String.class, Type.STR16);
+ ENCODINGS.put(Long.class, Type.INT64);
+ ENCODINGS.put(Integer.class, Type.INT32);
+ ENCODINGS.put(Short.class, Type.INT16);
+ ENCODINGS.put(Byte.class, Type.INT8);
+ ENCODINGS.put(Map.class, Type.MAP);
+ ENCODINGS.put(List.class, Type.LIST);
ENCODINGS.put(Float.class, Type.FLOAT);
ENCODINGS.put(Double.class, Type.DOUBLE);
ENCODINGS.put(Character.class, Type.CHAR);
- ENCODINGS.put(byte[].class, Type.LONG_BINARY);
+ ENCODINGS.put(byte[].class, Type.VBIN32);
}
protected Sizer sizer()
@@ -120,14 +123,14 @@ abstract class AbstractEncoder implements Encoder
flushBits();
}
- public void writeOctet(short b)
+ public void writeUint8(short b)
{
assert b < 0x100;
put((byte) b);
}
- public void writeShort(int s)
+ public void writeUint16(int s)
{
assert s < 0x10000;
@@ -135,7 +138,7 @@ abstract class AbstractEncoder implements Encoder
put(lsb(s));
}
- public void writeLong(long i)
+ public void writeUint32(long i)
{
assert i < 0x100000000L;
@@ -145,7 +148,12 @@ abstract class AbstractEncoder implements Encoder
put(lsb(i));
}
- public void writeLonglong(long l)
+ public void writeSequenceNo(int i)
+ {
+ writeUint32(i);
+ }
+
+ public void writeUint64(long l)
{
for (int i = 0; i < 8; i++)
{
@@ -154,47 +162,101 @@ abstract class AbstractEncoder implements Encoder
}
- public void writeTimestamp(long l)
+ public void writeDatetime(long l)
+ {
+ writeUint64(l);
+ }
+
+ private static final String checkLength(String s, int n)
+ {
+ if (s == null)
+ {
+ return "";
+ }
+
+ if (s.length() > n)
+ {
+ throw new IllegalArgumentException("string too long: " + s);
+ }
+ else
+ {
+ return s;
+ }
+ }
+
+ private static final byte[] encode(String s, String charset)
+ {
+ try
+ {
+ return s.getBytes(charset);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void writeStr8(String s)
{
- writeLonglong(l);
+ s = checkLength(s, 255);
+ writeUint8((short) s.length());
+ put(ByteBuffer.wrap(encode(s, "UTF-8")));
}
+ public void writeStr16(String s)
+ {
+ s = checkLength(s, 65535);
+ writeUint16(s.length());
+ put(ByteBuffer.wrap(encode(s, "UTF-8")));
+ }
- public void writeShortstr(String s)
+ public void writeVbin8(byte[] bytes)
{
- if (s == null) { s = ""; }
- if (s.length() > 255) {
- throw new IllegalArgumentException(s);
+ if (bytes == null) { bytes = new byte[0]; }
+ if (bytes.length > 255)
+ {
+ throw new IllegalArgumentException("array too long: " + bytes.length);
}
- writeOctet((short) s.length());
- put(ByteBuffer.wrap(s.getBytes()));
+ writeUint8((short) bytes.length);
+ put(ByteBuffer.wrap(bytes));
}
- public void writeLongstr(String s)
+ public void writeVbin16(byte[] bytes)
{
- if (s == null) { s = ""; }
- writeLong(s.length());
- put(ByteBuffer.wrap(s.getBytes()));
+ if (bytes == null) { bytes = new byte[0]; }
+ writeUint16(bytes.length);
+ put(ByteBuffer.wrap(bytes));
}
+ public void writeVbin32(byte[] bytes)
+ {
+ if (bytes == null) { bytes = new byte[0]; }
+ writeUint32(bytes.length);
+ put(ByteBuffer.wrap(bytes));
+ }
- public void writeRfc1982LongSet(RangeSet ranges)
+ public void writeSequenceSet(RangeSet ranges)
{
if (ranges == null)
{
- writeShort((short) 0);
+ writeUint16((short) 0);
}
else
{
- writeShort(ranges.size() * 8);
+ writeUint16(ranges.size() * 8);
for (Range range : ranges)
{
- writeLong(range.getLower());
- writeLong(range.getUpper());
+ writeUint32(range.getLower());
+ writeUint32(range.getUpper());
}
}
}
+ public void writeByteRanges(RangeSet ranges)
+ {
+ throw new Error("not implemented");
+ }
+
public void writeUuid(UUID uuid)
{
long msb = 0;
@@ -202,15 +264,10 @@ abstract class AbstractEncoder implements Encoder
if (uuid != null)
{
msb = uuid.getMostSignificantBits();
- uuid.getLeastSignificantBits();
+ lsb = uuid.getLeastSignificantBits();
}
- writeLonglong(msb);
- writeLonglong(lsb);
- }
-
- public void writeContent(String c)
- {
- throw new Error("Deprecated");
+ writeUint64(msb);
+ writeUint64(lsb);
}
public void writeStruct(int type, Struct s)
@@ -237,22 +294,27 @@ abstract class AbstractEncoder implements Encoder
}
}
+ if (type > 0)
+ {
+ writeUint16(type);
+ }
+
s.write(this);
}
- public void writeLongStruct(Struct s)
+ public void writeStruct32(Struct s)
{
if (s == null)
{
- writeLong(0);
+ writeUint32(0);
}
else
{
Sizer sizer = sizer();
- sizer.writeShort(s.getEncodedType());
+ sizer.writeUint16(s.getEncodedType());
s.write(sizer);
- writeLong(sizer.size());
- writeShort(s.getEncodedType());
+ writeUint32(sizer.size());
+ writeUint16(s.getEncodedType());
s.write(this);
}
}
@@ -309,46 +371,46 @@ abstract class AbstractEncoder implements Encoder
return null;
}
- public void writeTable(Map<String,Object> table)
+ public void writeMap(Map<String,Object> map)
{
- if (table == null)
+ if (map == null)
{
- writeLong(0);
+ writeUint32(0);
return;
}
Sizer sizer = sizer();
- sizer.writeTable(table);
+ sizer.writeMap(map);
// XXX: - 4
- writeLong(sizer.size() - 4);
- writeTableEntries(table);
+ writeUint32(sizer.size() - 4);
+ writeMapEntries(map);
}
- protected void writeTableEntries(Map<String,Object> table)
+ protected void writeMapEntries(Map<String,Object> map)
{
- for (Map.Entry<String,Object> entry : table.entrySet())
+ for (Map.Entry<String,Object> entry : map.entrySet())
{
String key = entry.getKey();
Object value = entry.getValue();
Type type = encoding(value);
- writeShortstr(key);
+ writeStr8(key);
put(type.code);
write(type, value);
}
}
- public void writeSequence(List<Object> sequence)
+ public void writeList(List<Object> list)
{
Sizer sizer = sizer();
- sizer.writeSequence(sequence);
+ sizer.writeList(list);
// XXX: - 4
- writeLong(sizer.size() - 4);
- writeSequenceEntries(sequence);
+ writeUint32(sizer.size() - 4);
+ writeListEntries(list);
}
- protected void writeSequenceEntries(List<Object> sequence)
+ protected void writeListEntries(List<Object> list)
{
- for (Object value : sequence)
+ for (Object value : list)
{
Type type = encoding(value);
put(type.code);
@@ -358,10 +420,15 @@ abstract class AbstractEncoder implements Encoder
public void writeArray(List<Object> array)
{
+ if (array == null)
+ {
+ array = Collections.EMPTY_LIST;
+ }
+
Sizer sizer = sizer();
sizer.writeArray(array);
// XXX: -4
- writeLong(sizer.size() - 4);
+ writeUint32(sizer.size() - 4);
writeArrayEntries(array);
}
@@ -371,7 +438,7 @@ abstract class AbstractEncoder implements Encoder
if (array.isEmpty())
{
- type = Type.VOID;
+ return;
}
else
{
@@ -380,6 +447,8 @@ abstract class AbstractEncoder implements Encoder
put(type.code);
+ writeUint32(array.size());
+
for (Object value : array)
{
write(type, value);
@@ -409,13 +478,13 @@ abstract class AbstractEncoder implements Encoder
switch (width)
{
case 1:
- writeOctet((short) size);
+ writeUint8((short) size);
break;
case 2:
- writeShort(size);
+ writeUint16(size);
break;
case 4:
- writeLong(size);
+ writeUint32(size);
break;
default:
throw new IllegalStateException("illegal width: " + width);
@@ -444,11 +513,11 @@ abstract class AbstractEncoder implements Encoder
{
switch (t)
{
- case OCTET:
- case UNSIGNED_BYTE:
- writeOctet(coerce(Short.class, value));
+ case BIN8:
+ case UINT8:
+ writeUint8(coerce(Short.class, value));
break;
- case SIGNED_BYTE:
+ case INT8:
put(coerce(Byte.class, value));
break;
case CHAR:
@@ -465,85 +534,78 @@ abstract class AbstractEncoder implements Encoder
}
break;
- case TWO_OCTETS:
- case UNSIGNED_SHORT:
- writeShort(coerce(Integer.class, value));
+ case BIN16:
+ case UINT16:
+ writeUint16(coerce(Integer.class, value));
break;
- case SIGNED_SHORT:
- writeShort(coerce(Short.class, value));
+ case INT16:
+ writeUint16(coerce(Short.class, value));
break;
- case FOUR_OCTETS:
- case UNSIGNED_INT:
- writeLong(coerce(Long.class, value));
+ case BIN32:
+ case UINT32:
+ writeUint32(coerce(Long.class, value));
break;
- case UTF32_CHAR:
- case SIGNED_INT:
- writeLong(coerce(Integer.class, value));
+ case CHAR_UTF32:
+ case INT32:
+ writeUint32(coerce(Integer.class, value));
break;
case FLOAT:
- writeLong(Float.floatToIntBits(coerce(Float.class, value)));
+ writeUint32(Float.floatToIntBits(coerce(Float.class, value)));
break;
- case EIGHT_OCTETS:
- case SIGNED_LONG:
- case UNSIGNED_LONG:
+ case BIN64:
+ case UINT64:
+ case INT64:
case DATETIME:
- writeLonglong(coerce(Long.class, value));
+ writeUint64(coerce(Long.class, value));
break;
case DOUBLE:
long bits = Double.doubleToLongBits(coerce(Double.class, value));
- writeLonglong(bits);
- break;
-
- case SIXTEEN_OCTETS:
- case THIRTY_TWO_OCTETS:
- case SIXTY_FOUR_OCTETS:
- case _128_OCTETS:
- case SHORT_BINARY:
- case BINARY:
- case LONG_BINARY:
- writeBytes(t, coerce(byte[].class, value));
+ writeUint64(bits);
break;
case UUID:
writeUuid(coerce(UUID.class, value));
break;
- case SHORT_STRING:
- case SHORT_UTF8_STRING:
- case SHORT_UTF16_STRING:
- case SHORT_UTF32_STRING:
- case STRING:
- case UTF8_STRING:
- case UTF16_STRING:
- case UTF32_STRING:
- case LONG_STRING:
- case LONG_UTF8_STRING:
- case LONG_UTF16_STRING:
- case LONG_UTF32_STRING:
+ case STR8:
+ writeStr8(coerce(String.class, value));
+ break;
+
+ case STR16:
+ writeStr16(coerce(String.class, value));
+ break;
+
+ case STR8_LATIN:
+ case STR8_UTF16:
+ case STR16_LATIN:
+ case STR16_UTF16:
// XXX: need to do character conversion
writeBytes(t, coerce(String.class, value).getBytes());
break;
- case TABLE:
- writeTable((Map<String,Object>) coerce(Map.class, value));
+ case MAP:
+ writeMap((Map<String,Object>) coerce(Map.class, value));
break;
- case SEQUENCE:
- writeSequence(coerce(List.class, value));
+ case LIST:
+ writeList(coerce(List.class, value));
break;
case ARRAY:
writeArray(coerce(List.class, value));
break;
+ case STRUCT32:
+ writeStruct32(coerce(Struct.class, value));
+ break;
- case FIVE_OCTETS:
- case DECIMAL:
- case NINE_OCTETS:
- case LONG_DECIMAL:
+ case BIN40:
+ case DEC32:
+ case BIN72:
+ case DEC64:
// XXX: what types are we supposed to use here?
writeBytes(t, coerce(byte[].class, value));
break;
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java
index f0738e0a91..df2d208118 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java
@@ -38,26 +38,30 @@ public interface Decoder
{
boolean readBit();
- short readOctet();
- int readShort();
- long readLong();
- long readLonglong();
+ short readUint8();
+ int readUint16();
+ long readUint32();
+ long readUint64();
- long readTimestamp();
-
- String readShortstr();
- String readLongstr();
-
- RangeSet readRfc1982LongSet();
+ long readDatetime();
UUID readUuid();
- String readContent();
+ int readSequenceNo();
+ RangeSet readSequenceSet(); // XXX
+ RangeSet readByteRanges(); // XXX
- Struct readStruct(int type);
- Struct readLongStruct();
+ String readStr8();
+ String readStr16();
- Map<String,Object> readTable();
- List<Object> readSequence();
+ byte[] readVbin8();
+ byte[] readVbin16();
+ byte[] readVbin32();
+
+ Struct readStruct32();
+ Map<String,Object> readMap();
+ List<Object> readList();
List<Object> readArray();
+ Struct readStruct(int type);
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java
index 1b2fe0213e..0449ba6702 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java
@@ -40,26 +40,30 @@ public interface Encoder
void flush();
void writeBit(boolean b);
- void writeOctet(short b);
- void writeShort(int s);
- void writeLong(long i);
- void writeLonglong(long l);
+ void writeUint8(short b);
+ void writeUint16(int s);
+ void writeUint32(long i);
+ void writeUint64(long l);
- void writeTimestamp(long l);
-
- void writeShortstr(String s);
- void writeLongstr(String s);
-
- void writeRfc1982LongSet(RangeSet ranges);
+ void writeDatetime(long l);
void writeUuid(UUID uuid);
- void writeContent(String c);
+ void writeSequenceNo(int s);
+ void writeSequenceSet(RangeSet ranges); // XXX
+ void writeByteRanges(RangeSet ranges); // XXX
- void writeStruct(int type, Struct s);
- void writeLongStruct(Struct s);
+ void writeStr8(String s);
+ void writeStr16(String s);
- void writeTable(Map<String,Object> table);
- void writeSequence(List<Object> sequence);
+ void writeVbin8(byte[] bytes);
+ void writeVbin16(byte[] bytes);
+ void writeVbin32(byte[] bytes);
+
+ void writeStruct32(Struct s);
+ void writeMap(Map<String,Object> map);
+ void writeList(List<Object> list);
void writeArray(List<Object> array);
+ void writeStruct(int type, Struct s);
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java
index b98bf98239..8ffdd5150b 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java
@@ -38,30 +38,34 @@ public interface Sizer extends Encoder
public static final Sizer NULL = new Sizer()
{
- public void flush() {};
+ public void flush() {}
- public void writeBit(boolean b) {};
- public void writeOctet(short b) {};
- public void writeShort(int s) {};
- public void writeLong(long i) {};
- public void writeLonglong(long l) {};
+ public void writeBit(boolean b) {}
+ public void writeUint8(short b) {}
+ public void writeUint16(int s) {}
+ public void writeUint32(long i) {}
+ public void writeUint64(long l) {}
- public void writeTimestamp(long l) {};
+ public void writeDatetime(long l) {}
+ public void writeUuid(UUID uuid) {}
- public void writeShortstr(String s) {};
- public void writeLongstr(String s) {};
+ public void writeSequenceNo(int s) {}
+ public void writeSequenceSet(RangeSet ranges) {} // XXX
+ public void writeByteRanges(RangeSet ranges) {} // XXX
- public void writeRfc1982LongSet(RangeSet ranges) {};
- public void writeUuid(UUID uuid) {};
+ public void writeStr8(String s) {}
+ public void writeStr16(String s) {}
- public void writeContent(String c) {};
+ public void writeVbin8(byte[] bytes) {}
+ public void writeVbin16(byte[] bytes) {}
+ public void writeVbin32(byte[] bytes) {}
- public void writeStruct(int type, Struct s) {};
- public void writeLongStruct(Struct s) {};
+ public void writeStruct32(Struct s) {}
+ public void writeMap(Map<String,Object> map) {}
+ public void writeList(List<Object> list) {}
+ public void writeArray(List<Object> array) {}
- public void writeTable(Map<String,Object> table) {};
- public void writeSequence(List<Object> sequence) {};
- public void writeArray(List<Object> array) {};
+ public void writeStruct(int type, Struct s) {}
public int getSize() { return 0; }
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
index 5c35596fd8..d493245f0c 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
@@ -37,6 +37,7 @@ import org.apache.qpidity.transport.ProtocolError;
import org.apache.qpidity.transport.ProtocolEvent;
import org.apache.qpidity.transport.ProtocolHeader;
import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.SegmentType;
import org.apache.qpidity.transport.Struct;
@@ -118,7 +119,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
{
switch (frame.getType())
{
- case Frame.BODY:
+ case BODY:
emit(frame, new Data(frame, frame.isFirstFrame(),
frame.isLastFrame()));
break;
@@ -158,22 +159,29 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
}
}
- private ProtocolEvent decode(Frame frame, byte type, List<ByteBuffer> segment)
+ private ProtocolEvent decode(Frame frame, SegmentType type, List<ByteBuffer> segment)
{
FragmentDecoder dec = new FragmentDecoder(segment.iterator());
switch (type)
{
- case Frame.METHOD:
- int methodType = dec.readShort();
- Method method = Method.create(methodType);
- method.read(dec);
- return method;
- case Frame.HEADER:
+ case CONTROL:
+ int controlType = dec.readUint16();
+ Method control = Method.create(controlType);
+ control.read(dec);
+ return control;
+ case COMMAND:
+ int commandType = dec.readUint16();
+ // read in the session header, right now we don't use it
+ dec.readUint16();
+ Method command = Method.create(commandType);
+ command.read(dec);
+ return command;
+ case HEADER:
List<Struct> structs = new ArrayList();
while (dec.hasRemaining())
{
- structs.add(dec.readLongStruct());
+ structs.add(dec.readStruct32());
}
return new Header(structs,frame.isLastFrame() && frame.isLastSegment());
default:
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
index d38d8ded98..0357df6e86 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
@@ -31,6 +31,7 @@ import org.apache.qpidity.transport.ProtocolDelegate;
import org.apache.qpidity.transport.ProtocolError;
import org.apache.qpidity.transport.ProtocolEvent;
import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.SegmentType;
import org.apache.qpidity.transport.Sender;
import org.apache.qpidity.transport.Struct;
@@ -76,50 +77,50 @@ public class Disassembler implements Sender<ConnectionEvent>,
sender.close();
}
- private void fragment(byte flags, byte type, ConnectionEvent event,
+ private void fragment(byte flags, SegmentType type, ConnectionEvent event,
ByteBuffer buf, boolean first, boolean last)
{
if(!buf.hasRemaining())
{
//empty data
- byte nflags = flags;
+ byte nflags = flags;
if (first)
{
nflags |= FIRST_FRAME;
first = false;
}
- nflags |= LAST_FRAME;
- Frame frame = new Frame(nflags, type,
+ nflags |= LAST_FRAME;
+ Frame frame = new Frame(nflags, type,
event.getProtocolEvent().getEncodedTrack(),
event.getChannel());
- // frame.addFragment(buf);
+ // frame.addFragment(buf);
sender.send(frame);
}
else
{
- while (buf.hasRemaining())
- {
- ByteBuffer slice = buf.slice();
- slice.limit(min(maxPayload, slice.remaining()));
- buf.position(buf.position() + slice.remaining());
-
- byte newflags = flags;
- if (first)
+ while (buf.hasRemaining())
{
- newflags |= FIRST_FRAME;
- first = false;
+ ByteBuffer slice = buf.slice();
+ slice.limit(min(maxPayload, slice.remaining()));
+ buf.position(buf.position() + slice.remaining());
+
+ byte newflags = flags;
+ if (first)
+ {
+ newflags |= FIRST_FRAME;
+ first = false;
+ }
+ if (last && !buf.hasRemaining())
+ {
+ newflags |= LAST_FRAME;
+ }
+
+ Frame frame = new Frame(newflags, type,
+ event.getProtocolEvent().getEncodedTrack(),
+ event.getChannel());
+ frame.addFragment(slice);
+ sender.send(frame);
}
- if (last && !buf.hasRemaining())
- {
- newflags |= LAST_FRAME;
- }
-
- Frame frame = new Frame(newflags, type,
- event.getProtocolEvent().getEncodedTrack(),
- event.getChannel());
- frame.addFragment(slice);
- sender.send(frame);
- }
}
}
@@ -128,15 +129,40 @@ public class Disassembler implements Sender<ConnectionEvent>,
sender.send(header);
}
- public void method(ConnectionEvent event, Method method)
+ public void control(ConnectionEvent event, Method method)
+ {
+ method(event, method, SegmentType.CONTROL);
+ }
+
+ public void command(ConnectionEvent event, Method method)
+ {
+ method(event, method, SegmentType.COMMAND);
+ }
+
+ private void method(ConnectionEvent event, Method method, SegmentType type)
{
SizeEncoder sizer = new SizeEncoder();
- sizer.writeShort(method.getEncodedType());
+ sizer.writeUint16(method.getEncodedType());
+ if (type == SegmentType.COMMAND)
+ {
+ sizer.writeUint16(0);
+ }
method.write(sizer);
ByteBuffer buf = ByteBuffer.allocate(sizer.size());
BBEncoder enc = new BBEncoder(buf);
- enc.writeShort(method.getEncodedType());
+ enc.writeUint16(method.getEncodedType());
+ if (type == SegmentType.COMMAND)
+ {
+ if (method.isSync())
+ {
+ enc.writeUint16(0x0101);
+ }
+ else
+ {
+ enc.writeUint16(0x0100);
+ }
+ }
method.write(enc);
enc.flush();
buf.flip();
@@ -148,7 +174,7 @@ public class Disassembler implements Sender<ConnectionEvent>,
flags |= LAST_SEG;
}
- fragment(flags, METHOD, event, buf, true, true);
+ fragment(flags, type, event, buf, true, true);
}
public void header(ConnectionEvent event, Header header)
@@ -159,24 +185,24 @@ public class Disassembler implements Sender<ConnectionEvent>,
SizeEncoder sizer = new SizeEncoder();
for (Struct st : header.getStructs())
{
- sizer.writeLongStruct(st);
+ sizer.writeStruct32(st);
}
buf = ByteBuffer.allocate(sizer.size());
BBEncoder enc = new BBEncoder(buf);
for (Struct st : header.getStructs())
{
- enc.writeLongStruct(st);
+ enc.writeStruct32(st);
enc.flush();
}
header.setBuf(buf);
}
else
{
- buf = header.getBuf();
+ buf = header.getBuf();
}
buf.flip();
- fragment((byte) 0x0, HEADER, event, buf, true, true);
+ fragment((byte) 0x0, SegmentType.HEADER, event, buf, true, true);
}
public void data(ConnectionEvent event, Data data)
@@ -187,7 +213,7 @@ public class Disassembler implements Sender<ConnectionEvent>,
{
ByteBuffer buf = it.next();
boolean last = data.isLast() && !it.hasNext();
- fragment(LAST_SEG, BODY, event, buf, first, last);
+ fragment(LAST_SEG, SegmentType.BODY, event, buf, first, last);
first = false;
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
index a5c5db4dba..c5fb7b9986 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpidity.transport.network;
+import org.apache.qpidity.transport.SegmentType;
import org.apache.qpidity.transport.util.SliceIterator;
import java.nio.ByteBuffer;
@@ -48,10 +49,6 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer>
public static final byte L3 = 2;
public static final byte L4 = 3;
- public static final byte METHOD = 1;
- public static final byte HEADER = 2;
- public static final byte BODY = 3;
-
public static final byte RESERVED = 0x0;
public static final byte VERSION = 0x0;
@@ -62,13 +59,13 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer>
public static final byte LAST_FRAME = 0x1;
final private byte flags;
- final private byte type;
+ final private SegmentType type;
final private byte track;
final private int channel;
final private List<ByteBuffer> fragments;
private int size;
- public Frame(byte flags, byte type, byte track, int channel)
+ public Frame(byte flags, SegmentType type, byte track, int channel)
{
this.flags = flags;
this.type = type;
@@ -99,7 +96,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer>
return size;
}
- public byte getType()
+ public SegmentType getType()
{
return type;
}
@@ -153,7 +150,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer>
{
StringBuilder str = new StringBuilder();
str.append(String.format
- ("[%05d %05d %1d %1d %d%d%d%d]", getChannel(), getSize(),
+ ("[%05d %05d %1d %1d %d%d%d%d] ", getChannel(), getSize(),
getTrack(), getType(),
isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0,
isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0));
@@ -170,7 +167,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer>
str.append(" | ");
}
- str.append(str(buf, 20));
+ str.append(str(buf));
}
return str.toString();
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
index 871c45743e..2d41a9f516 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
@@ -22,10 +22,10 @@ package org.apache.qpidity.transport.network;
import java.nio.ByteBuffer;
-import org.apache.qpidity.transport.Constant;
import org.apache.qpidity.transport.ProtocolError;
import org.apache.qpidity.transport.ProtocolHeader;
import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.SegmentType;
import static org.apache.qpidity.transport.util.Functions.*;
@@ -77,7 +77,7 @@ public class InputHandler implements Receiver<ByteBuffer>
private byte minor;
private byte flags;
- private byte type;
+ private SegmentType type;
private byte track;
private int channel;
private int size;
@@ -146,7 +146,7 @@ public class InputHandler implements Receiver<ByteBuffer>
flags = buf.get();
return FRAME_HDR_TYPE;
case FRAME_HDR_TYPE:
- type = buf.get();
+ type = SegmentType.get(buf.get());
return FRAME_HDR_SIZE1;
case FRAME_HDR_SIZE1:
size = (0xFF & buf.get()) << 8;
@@ -218,7 +218,7 @@ public class InputHandler implements Receiver<ByteBuffer>
return FRAME_END;
}
case FRAME_END:
- return expect(buf, Constant.FRAME_END, FRAME_HDR);
+ return expect(buf, OutputHandler.FRAME_END, FRAME_HDR);
default:
throw new IllegalStateException();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
index 9f770bcb1c..8f615cf80d 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
@@ -67,11 +67,13 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate
}
}
+ public static final int FRAME_END = 0xCE;
+
public void frame(Frame frame)
{
ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE + frame.getSize() + 1);
hdr.put(frame.getFlags());
- hdr.put(frame.getType());
+ hdr.put((byte) frame.getType().getValue());
hdr.putShort((short) (frame.getSize() + HEADER_SIZE));
hdr.put(RESERVED);
hdr.put(frame.getTrack());
@@ -84,7 +86,7 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate
{
hdr.put(buf);
}
- hdr.put((byte) Constant.FRAME_END);
+ hdr.put((byte) FRAME_END);
hdr.flip();
synchronized (lock)
{
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java
index 2e4875cf42..7da719087c 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java
@@ -200,7 +200,7 @@ public class MinaHandler<E> implements IoHandler
ConnectionDelegate delegate)
{
return connect(host, port, new ConnectionBinding
- (delegate, InputHandler.State.FRAME_HDR));
+ (delegate, InputHandler.State.PROTO_HDR));
}
private static class ConnectionBinding
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
index 7dbb3d30e8..14e756c047 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
@@ -52,15 +52,24 @@ public class Functions
public static final String str(ByteBuffer buf, int limit)
{
StringBuilder str = new StringBuilder();
+ str.append('"');
+
for (int i = 0; i < min(buf.remaining(), limit); i++)
{
- if (i > 0 && i % 2 == 0)
+ byte c = buf.get(buf.position() + i);
+
+ if (c > 31 && c < 127 && c != '\\')
{
- str.append(" ");
+ str.append((char)c);
+ }
+ else
+ {
+ str.append(String.format("\\x%02x", c));
}
- str.append(String.format("%02x", buf.get(buf.position() + i)));
}
+ str.append('"');
+
return str.toString();
}
diff --git a/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java
index d7ece9b745..1a83786e12 100644
--- a/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java
+++ b/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java
@@ -100,12 +100,12 @@ public class ConnectionTest extends TestCase
}
Channel ch = conn.getChannel(0);
- Session ssn = new Session();
+ Session ssn = new Session("test".getBytes());
ssn.attach(ch);
try
{
- ssn.sessionOpen(1234);
+ ssn.sessionAttach(ssn.getName());
fail("writing to a closed socket succeeded");
}
catch (TransportException e)