diff options
Diffstat (limited to 'java/common/src')
29 files changed, 722 insertions, 473 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java b/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java index d0b3272dec..a72997813a 100644 --- a/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java +++ b/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java @@ -22,6 +22,7 @@ package org.apache.qpidity; import java.io.UnsupportedEncodingException; import java.util.HashSet; +import java.util.List; import java.util.StringTokenizer; import org.apache.qpidity.security.AMQPCallbackHandler; @@ -29,13 +30,12 @@ import org.apache.qpidity.security.CallbackHandlerRegistry; public class SecurityHelper { - public static String chooseMechanism(String mechanisms) throws UnsupportedEncodingException + public static String chooseMechanism(List<Object> mechanisms) throws UnsupportedEncodingException { - StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); HashSet mechanismSet = new HashSet(); - while (tokenizer.hasMoreTokens()) + for (Object m : mechanisms) { - mechanismSet.add(tokenizer.nextToken()); + mechanismSet.add(m); } String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms(); diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index 2bd97f3aff..5f9917e30a 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -59,7 +59,7 @@ class ToyBroker extends SessionDelegate public void messageAcquire(Session context, MessageAcquire struct) { System.out.println("\n==================> messageAcquire " ); - context.messageAcquired(struct.getTransfers()); + context.executionResult((int) struct.getId(), new Acquired(struct.getTransfers())); } @Override public void queueDeclare(Session ssn, QueueDeclare qd) @@ -68,16 +68,16 @@ class ToyBroker extends SessionDelegate System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n"); } - @Override public void queueBind(Session ssn, QueueBind qb) + @Override public void exchangeBind(Session ssn, ExchangeBind qb) { - exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue()); - System.out.println("\n==================> bound queue: " + qb.getQueue() + " with routing key " + qb.getRoutingKey() + "\n"); + exchange.bindQueue(qb.getExchange(), qb.getBindingKey(),qb.getQueue()); + System.out.println("\n==================> bound queue: " + qb.getQueue() + " with binding key " + qb.getBindingKey() + "\n"); } @Override public void queueQuery(Session ssn, QueueQuery qq) { QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue()); - ssn.executionResult(qq.getId(), result); + ssn.executionResult((int) qq.getId(), result); } @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) @@ -112,7 +112,8 @@ class ToyBroker extends SessionDelegate { if (xfr == null || body == null) { - ssn.connectionClose(503, "no method segment", 0, 0); + ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, + "no method segment"); ssn.close(); return; } @@ -136,7 +137,7 @@ class ToyBroker extends SessionDelegate { if (xfr == null || body == null) { - ssn.connectionClose(503, "no method segment", 0, 0); + ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, "no method segment"); ssn.close(); return; } @@ -174,14 +175,16 @@ class ToyBroker extends SessionDelegate { RangeSet ranges = new RangeSet(); ranges.add(xfr.getId()); - ssn.messageReject(ranges, 0, "no such destination"); + ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE, + "no such destination"); } } private void transferMessageToPeer(Session ssn,String dest, Message m) { System.out.println("\n==================> Transfering message to: " +dest + "\n"); - ssn.messageTransfer(dest, (short)0, (short)0); + ssn.messageTransfer(dest, MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED); ssn.header(m.header); for (Data d : m.body) { diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java index 10b68bbb20..a3233afcbe 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java @@ -63,7 +63,7 @@ class ToyClient extends SessionDelegate public static final void main(String[] args) { Connection conn = MinaHandler.connect("0.0.0.0", 5672, - new ConnectionDelegate() + new ClientDelegate() { public SessionDelegate getSessionDelegate() { @@ -80,9 +80,9 @@ class ToyClient extends SessionDelegate TransportConstants.getVersionMinor()))); Channel ch = conn.getChannel(0); - Session ssn = new Session(); + Session ssn = new Session("my-session".getBytes()); ssn.attach(ch); - ssn.sessionOpen(1234); + ssn.sessionAttach(ssn.getName()); ssn.queueDeclare("asdf", null, null); ssn.sync(); @@ -111,13 +111,15 @@ class ToyClient extends SessionDelegate map.put("list", Arrays.asList(1, 2, 3)); map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); - ssn.messageTransfer("asdf", (short) 0, (short) 1); + ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED); ssn.header(new DeliveryProperties(), new MessageProperties().setApplicationHeaders(map)); ssn.data("this is the data"); ssn.endData(); - ssn.messageTransfer("fdsa", (short) 0, (short) 1); + ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED); ssn.data("this should be rejected"); ssn.endData(); ssn.sync(); diff --git a/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java b/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java index 224917ade1..89d7e7917f 100644 --- a/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java +++ b/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java @@ -241,28 +241,10 @@ public class XidImpl implements Xid * @return The String representation of this Xid * @throws QpidException In case of problem when converting this Xid into a string. */ - public static String convertToString(Xid xid) throws QpidException + public static org.apache.qpidity.transport.Xid convert(Xid xid) throws QpidException { - if (_logger.isDebugEnabled()) - { - _logger.debug("converting " + xid + " into a String"); - } - try - { - ByteArrayOutputStream res = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(res); - out.writeLong(xid.getFormatId()); - byte[] txId = xid.getGlobalTransactionId(); - byte[] brId = xid.getBranchQualifier(); - out.writeByte(txId.length); - out.writeByte(brId.length); - out.write(txId); - out.write(brId); - return res.toString(); - } - catch (IOException e) - { - throw new QpidException("cannot convert the xid " + xid + " into a String", null, e); - } + return new org.apache.qpidity.transport.Xid(xid.getFormatId(), + xid.getGlobalTransactionId(), + xid.getBranchQualifier()); } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java index 7327697088..651402bcb7 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java @@ -77,7 +77,7 @@ public class Channel extends Invoker connection.getConnectionDelegate().init(this, hdr); } - public void method(Void v, Method method) + public void control(Void v, Method method) { switch (method.getEncodedTrack()) { @@ -90,15 +90,17 @@ public class Channel extends Invoker case L3: method.delegate(session, sessionDelegate); break; - case L4: - method.delegate(session, sessionDelegate); - break; default: throw new IllegalStateException ("unknown track: " + method.getEncodedTrack()); } } + public void command(Void v, Method method) + { + method.delegate(session, sessionDelegate); + } + public void header(Void v, Header header) { header.delegate(session, sessionDelegate); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java index 84621fbe25..7f4365f515 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java @@ -32,15 +32,14 @@ import java.util.UUID; class ChannelDelegate extends MethodDelegate<Channel> { - public @Override void sessionOpen(Channel channel, SessionOpen open) + public @Override void sessionAttach(Channel channel, SessionAttach atch) { - Session ssn = new Session(); + Session ssn = new Session(atch.getName()); ssn.attach(channel); - long lifetime = open.getDetachedLifetime(); - ssn.sessionAttached(UUID.randomUUID(), lifetime); + ssn.sessionAttached(ssn.getName()); } - public @Override void sessionClosed(Channel channel, SessionClosed closed) + public @Override void sessionDetached(Channel channel, SessionDetached closed) { channel.getSession().closed(); // XXX: should we remove the channel from the connection? It diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java new file mode 100644 index 0000000000..699854fb3b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + + +/** + * ClientDelegate + * + */ + +public abstract class ClientDelegate extends ConnectionDelegate +{ + + public void init(Channel ch, ProtocolHeader hdr) + { + if (hdr.getMajor() != TransportConstants.getVersionMajor() && + hdr.getMinor() != TransportConstants.getVersionMinor()) + { + throw new RuntimeException("version missmatch: " + hdr); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java index 4815f1025f..cb5f05a185 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java @@ -26,7 +26,10 @@ import org.apache.qpidity.SecurityHelper; import org.apache.qpidity.QpidException; import java.io.UnsupportedEncodingException; + +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -79,17 +82,27 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> public void init(Channel ch, ProtocolHeader hdr) { - // XXX: hardcoded version - if (hdr.getMajor() != 0 && hdr.getMinor() != 10) + ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader + (1, + TransportConstants.getVersionMajor(), + TransportConstants.getVersionMinor()))); + if (hdr.getMajor() != TransportConstants.getVersionMajor() && + hdr.getMinor() != TransportConstants.getVersionMinor()) { // XXX - ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.getVersionMajor(), - TransportConstants.getVersionMinor()))); + ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader + (1, + TransportConstants.getVersionMajor(), + TransportConstants.getVersionMinor()))); ch.getConnection().close(); } else { - ch.connectionStart(hdr.getMajor(), hdr.getMinor(), null, "PLAIN", "utf8"); + List<Object> plain = new ArrayList<Object>(); + plain.add("PLAIN"); + List<Object> utf8 = new ArrayList<Object>(); + utf8.add("utf8"); + ch.connectionStart(null, plain, utf8); } } @@ -99,13 +112,13 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> @Override public void connectionStart(Channel context, ConnectionStart struct) { String mechanism = null; - String response = null; + byte[] response = null; try { mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms()); saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null, SecurityHelper.createCallbackHandler(mechanism,_username,_password )); - response = new String(saslClient.evaluateChallenge(new byte[0]),_locale); + response = saslClient.evaluateChallenge(new byte[0]); } catch (UnsupportedEncodingException e) { @@ -128,13 +141,9 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> { try { - String response = new String(saslClient.evaluateChallenge(struct.getChallenge().getBytes()),_locale); + byte[] response = saslClient.evaluateChallenge(struct.getChallenge()); context.connectionSecureOk(response); } - catch (UnsupportedEncodingException e) - { - // need error handling - } catch (SaslException e) { // need error handling @@ -144,14 +153,14 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> @Override public void connectionTune(Channel context, ConnectionTune struct) { // should update the channel max given by the broker. - context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat()); + context.connectionTuneOk(struct.getChannelMax(), struct.getMaxFrameSize(), struct.getHeartbeatMax()); context.connectionOpen(_virtualHost, null, Option.INSIST); } @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct) { - String knownHosts = struct.getKnownHosts(); + List<Object> knownHosts = struct.getKnownHosts(); if(_negotiationCompleteLock != null) { _negotiationCompleteLock.lock(); @@ -187,13 +196,13 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> byte[] challenge = null; if ( challenge == null) { - context.connectionTune(Integer.MAX_VALUE,maxFrame, 0); + context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE); } else { try { - context.connectionSecure(new String(challenge,_locale)); + context.connectionSecure(challenge); } catch(Exception e) { @@ -218,16 +227,16 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> try { saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password)); - byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes()); + byte[] challenge = saslServer.evaluateResponse(struct.getResponse()); if ( challenge == null) { - context.connectionTune(Integer.MAX_VALUE,maxFrame, 0); + context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE); } else { try { - context.connectionSecure(new String(challenge,_locale)); + context.connectionSecure(challenge); } catch(Exception e) { @@ -250,8 +259,9 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> @Override public void connectionOpen(Channel context, ConnectionOpen struct) { - String hosts = "amqp:1223243232325"; - context.connectionOpenOk(hosts); + List<Object> hosts = new ArrayList<Object>(); + hosts.add("amqp:1223243232325"); + context.connectionOpenOk(hosts); } public String getPassword() diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Data.java b/java/common/src/main/java/org/apache/qpidity/transport/Data.java index b9b8636d18..72c1c3331d 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Data.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Data.java @@ -93,7 +93,7 @@ public class Data implements ProtocolEvent { str.append(" | "); } - str.append(str(buf, 20)); + str.append(str(buf)); } str.append(")"); return str.toString(); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Method.java b/java/common/src/main/java/org/apache/qpidity/transport/Method.java index 7304260333..bc4ec6289d 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Method.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Method.java @@ -20,6 +20,7 @@ */ package org.apache.qpidity.transport; +import org.apache.qpidity.transport.network.Frame; /** * Method @@ -34,11 +35,12 @@ public abstract class Method extends Struct implements ProtocolEvent { // XXX: should generate separate factories for separate // namespaces - return (Method) Struct.create(type); + return (Method) StructFactory.createInstruction(type); } // XXX: command subclass? private long id; + private boolean sync = false; public final long getId() { @@ -50,6 +52,16 @@ public abstract class Method extends Struct implements ProtocolEvent this.id = id; } + public final boolean isSync() + { + return sync; + } + + void setSync(boolean value) + { + this.sync = value; + } + public abstract boolean hasPayload(); public abstract byte getEncodedTrack(); @@ -58,7 +70,14 @@ public abstract class Method extends Struct implements ProtocolEvent public <C> void delegate(C context, ProtocolDelegate<C> delegate) { - delegate.method(context, this); + if (getEncodedTrack() == Frame.L4) + { + delegate.command(context, this); + } + else + { + delegate.control(context, this); + } } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java index 6149c3876a..028d570416 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java @@ -31,7 +31,9 @@ public interface ProtocolDelegate<C> void init(C context, ProtocolHeader header); - void method(C context, Method method); + void control(C context, Method control); + + void command(C context, Method command); void header(C context, Header header); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java index 97d4be0d2e..ccbcfa99d0 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java @@ -98,6 +98,13 @@ public class RangeSet implements Iterable<Range> ranges.clear(); } + public RangeSet copy() + { + RangeSet copy = new RangeSet(); + copy.ranges.addAll(ranges); + return copy; + } + public String toString() { StringBuffer str = new StringBuffer(); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java index e4f4af95a5..f6f4062c6d 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.qpidity.transport.Option.*; /** * Session @@ -56,23 +57,34 @@ public class Session extends Invoker private static boolean ENABLE_REPLAY = false; private static final Logger log = Logger.get(Session.class); + private byte[] name; + private long timeout = 60000; + // channel may be null Channel channel; // incoming command count - private long commandsIn = 0; + long commandsIn = 0; // completed incoming commands private final RangeSet processed = new RangeSet(); - private long processedMark = -1; private Range syncPoint = null; // outgoing command count private long commandsOut = 0; private Map<Long,Method> commands = new HashMap<Long,Method>(); - private long mark = 0; + private long maxComplete = -1; private AtomicBoolean closed = new AtomicBoolean(false); + public Session(byte[] name) + { + this.name = name; + } + + public byte[] getName() + { + return name; + } public Map<Long,Method> getOutstandingCommands() { @@ -133,25 +145,12 @@ public class Session extends Invoker public void flushProcessed() { - boolean first = true; - RangeSet rest = new RangeSet(); + RangeSet copy; synchronized (processed) { - for (Range r: processed) - { - if (first && r.includes(processedMark)) - { - processedMark = r.getUpper(); - } - else - { - rest.add(r); - } - - first = false; - } + copy = processed.copy(); } - executionComplete(processedMark, rest); + sessionCompleted(copy); } void syncPoint() @@ -193,34 +192,34 @@ public class Session extends Invoker log.debug("%s complete(%d, %d)", this, lower, upper); synchronized (commands) { - for (long id = lower; id <= upper; id++) + for (long id = maxComplete; id <= upper; id++) { commands.remove(id); } + if (lower <= maxComplete + 1) + { + maxComplete = Math.max(maxComplete, upper); + } commands.notifyAll(); log.debug("%s commands remaining: %s", this, commands); } } - void complete(long mark) - { - synchronized (commands) - { - complete(this.mark, mark); - this.mark = mark; - commands.notifyAll(); - } - } - protected void invoke(Method m) { if (m.getEncodedTrack() == Frame.L4) { synchronized (commands) { - // You only need to keep the command if you need command level replay. - // If not we only need to keep track of commands to make sync work - commands.put(commandsOut++,(ENABLE_REPLAY?m:null)); + long next = commandsOut++; + if (next == 0) + { + sessionCommandPoint(0, 0); + } + if (ENABLE_REPLAY) + { + commands.put(next, m); + } channel.method(m); } } @@ -230,7 +229,7 @@ public class Session extends Invoker } } - public void header(Header header) + public void header(Header header) { channel.header(header); } @@ -269,21 +268,32 @@ public class Session extends Invoker public void sync() { + sync(timeout); + } + + public void sync(long timeout) + { log.debug("%s sync()", this); synchronized (commands) { long point = commandsOut - 1; - if (mark < point) + if (maxComplete < point) { - executionSync(); + ExecutionSync sync = new ExecutionSync(); + sync.setSync(true); + invoke(sync); } - while (!closed.get() && mark < point) + long start = System.currentTimeMillis(); + long elapsed = 0; + while (!closed.get() && elapsed < timeout && maxComplete < point) { try { - log.debug("%s waiting for[%d]: %s", this, point, commands); - commands.wait(); + log.debug("%s waiting for[%d]: %d, %s", this, point, + maxComplete, commands); + commands.wait(timeout - elapsed); + elapsed = System.currentTimeMillis() - start; } catch (InterruptedException e) { @@ -291,9 +301,16 @@ public class Session extends Invoker } } - if (mark < point) + if (maxComplete < point) { - throw new RuntimeException("session closed"); + if (closed.get()) + { + throw new RuntimeException("session closed"); + } + else + { + throw new RuntimeException("timed out waiting for sync"); + } } } } @@ -346,16 +363,19 @@ public class Session extends Invoker } } - public T get(long timeout, int nanos) + public T get(long timeout) { synchronized (this) { - while (!closed.get() && !isDone()) + long start = System.currentTimeMillis(); + long elapsed = 0; + while (!closed.get() && timeout - elapsed > 0 && !isDone()) { try { log.debug("%s waiting for result: %s", Session.this, this); - wait(timeout, nanos); + wait(timeout - elapsed); + elapsed = System.currentTimeMillis() - start; } catch (InterruptedException e) { @@ -364,22 +384,23 @@ public class Session extends Invoker } } - if (!isDone()) + if (isDone()) + { + return result; + } + else if (closed.get()) { throw new RuntimeException("session closed"); } - - return result; - } - - public T get(long timeout) - { - return get(timeout, 0); + else + { + return null; + } } public T get() { - return get(0); + return get(timeout); } public boolean isDone() @@ -396,7 +417,8 @@ public class Session extends Invoker public void close() { - sessionClose(); + sessionRequestTimeout(0); + sessionDetach(name); // XXX: channel.close(); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java index 5e36a77a98..442aba0e9b 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java @@ -35,20 +35,16 @@ public abstract class SessionDelegate { public void init(Session ssn, ProtocolHeader hdr) { } - public void method(Session ssn, Method method) { - if (method.getEncodedTrack() == Frame.L4) - { - method.setId(ssn.nextCommandId()); - } - + public void control(Session ssn, Method method) { method.dispatch(ssn, this); + } - if (method.getEncodedTrack() == Frame.L4) + public void command(Session ssn, Method method) { + method.setId(ssn.nextCommandId()); + method.dispatch(ssn, this); + if (!method.hasPayload()) { - if (!method.hasPayload()) - { - ssn.processed(method); - } + ssn.processed(method); } } @@ -60,12 +56,12 @@ public abstract class SessionDelegate @Override public void executionResult(Session ssn, ExecutionResult result) { - ssn.result(result.getCommandId(), result.getData()); + ssn.result(result.getCommandId(), result.getValue()); } - @Override public void executionComplete(Session ssn, ExecutionComplete excmp) + @Override public void sessionCompleted(Session ssn, SessionCompleted cmp) { - RangeSet ranges = excmp.getRangedExecutionSet(); + RangeSet ranges = cmp.getCommands(); if (ranges != null) { for (Range range : ranges) @@ -73,12 +69,27 @@ public abstract class SessionDelegate ssn.complete(range.getLower(), range.getUpper()); } } - ssn.complete(excmp.getCumulativeExecutionMark()); } - @Override public void executionFlush(Session ssn, ExecutionFlush flush) + @Override public void sessionFlush(Session ssn, SessionFlush flush) + { + if (flush.getCompleted()) + { + ssn.flushProcessed(); + } + if (flush.getConfirmed()) + { + throw new Error("not implemented"); + } + if (flush.getExpected()) + { + throw new Error("not implemented"); + } + } + + @Override public void sessionCommandPoint(Session ssn, SessionCommandPoint scp) { - ssn.flushProcessed(); + ssn.commandsIn = scp.getCommandId(); } @Override public void executionSync(Session ssn, ExecutionSync sync) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Struct.java b/java/common/src/main/java/org/apache/qpidity/transport/Struct.java index 200c3b68e3..f901f9e840 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Struct.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Struct.java @@ -71,8 +71,6 @@ public abstract class Struct implements Encodable return type; } - public abstract boolean hasTicket(); - private final boolean isBit(Field<?,?> f) { return f.getType().equals(Boolean.class); @@ -80,14 +78,7 @@ public abstract class Struct implements Encodable private final boolean packed() { - if (this instanceof Method) - { - return false; - } - else - { - return true; - } + return getPackWidth() > 0; } private final boolean encoded(Field<?,?> f) @@ -147,11 +138,6 @@ public abstract class Struct implements Encodable } } - if (hasTicket()) - { - dec.readShort(); - } - for (Field<?,?> f : fields) { if (encoded(f)) @@ -187,11 +173,6 @@ public abstract class Struct implements Encodable } } - if (hasTicket()) - { - enc.writeShort(0x0); - } - for (Field<?,?> f : fields) { if (encoded(f)) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java b/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java index 54429a1a4f..e9a0705de0 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java @@ -2,8 +2,9 @@ package org.apache.qpidity.transport; public class TransportConstants { - private static byte _protocol_version_minor = 0; - private static byte _protocol_version_major = 99; + + private static byte _protocol_version_minor = 10; + private static byte _protocol_version_major = 0; public static void setVersionMajor(byte value) { @@ -24,4 +25,5 @@ public class TransportConstants { return _protocol_version_minor; } + } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java index ae67483a23..0f6180f54a 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java @@ -20,7 +20,10 @@ */ package org.apache.qpidity.transport.codec; +import java.io.UnsupportedEncodingException; + import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -99,19 +102,19 @@ abstract class AbstractDecoder implements Decoder nbits = 0; } - public short readOctet() + public short readUint8() { return uget(); } - public int readShort() + public int readUint16() { int i = uget() << 8; i |= uget(); return i; } - public long readLong() + public long readUint32() { long l = uget() << 24; l |= uget() << 16; @@ -120,7 +123,12 @@ abstract class AbstractDecoder implements Decoder return l; } - public long readLonglong() + public int readSequenceNo() + { + return (int) readUint32(); + } + + public long readUint64() { long l = 0; for (int i = 0; i < 8; i++) @@ -130,31 +138,67 @@ abstract class AbstractDecoder implements Decoder return l; } - public long readTimestamp() + public long readDatetime() + { + return readUint64(); + } + + private static final String decode(byte[] bytes, String charset) { - return readLonglong(); + try + { + return new String(bytes, charset); + } + catch (UnsupportedEncodingException e) + { + throw new RuntimeException(e); + } } - public String readShortstr() + public String readStr8() + { + short size = readUint8(); + byte[] bytes = new byte[size]; + get(bytes); + return decode(bytes, "UTF-8"); + } + + public String readStr16() { - short size = readOctet(); + int size = readUint16(); byte[] bytes = new byte[size]; get(bytes); - return new String(bytes); + return decode(bytes, "UTF-8"); } - public String readLongstr() + public byte[] readVbin8() { - long size = readLong(); - byte[] bytes = new byte[(int) size]; + int size = readUint8(); + byte[] bytes = new byte[size]; get(bytes); - return new String(bytes); + return bytes; } - public RangeSet readRfc1982LongSet() + public byte[] readVbin16() { - int count = readShort()/8; + int size = readUint16(); + byte[] bytes = new byte[size]; + get(bytes); + return bytes; + } + + public byte[] readVbin32() + { + int size = (int) readUint32(); + byte[] bytes = new byte[size]; + get(bytes); + return bytes; + } + + public RangeSet readSequenceSet() + { + int count = readUint16()/8; if (count == 0) { return null; @@ -164,16 +208,21 @@ abstract class AbstractDecoder implements Decoder RangeSet ranges = new RangeSet(); for (int i = 0; i < count; i++) { - ranges.add(readLong(), readLong()); + ranges.add(readUint32(), readUint32()); } return ranges; } } + public RangeSet readByteRanges() + { + throw new Error("not implemented"); + } + public UUID readUuid() { - long msb = readLonglong(); - long lsb = readLonglong(); + long msb = readUint64(); + long lsb = readUint64(); return new UUID(msb, lsb); } @@ -194,34 +243,39 @@ abstract class AbstractDecoder implements Decoder return null; } } + if (type > 0) + { + int code = readUint16(); + assert code == type; + } st.read(this); return st; } - public Struct readLongStruct() + public Struct readStruct32() { - long size = readLong(); + long size = readUint32(); if (size == 0) { return null; } else { - int type = readShort(); + int type = readUint16(); Struct result = Struct.create(type); result.read(this); return result; } } - public Map<String,Object> readTable() + public Map<String,Object> readMap() { - long size = readLong(); + long size = readUint32(); int start = count; Map<String,Object> result = new LinkedHashMap(); while (count < start + size) { - String key = readShortstr(); + String key = readStr8(); byte code = get(); Type t = getType(code); Object value = read(t); @@ -230,9 +284,9 @@ abstract class AbstractDecoder implements Decoder return result; } - public List<Object> readSequence() + public List<Object> readList() { - long size = readLong(); + long size = readUint32(); int start = count; List<Object> result = new ArrayList(); while (count < start + size) @@ -247,10 +301,15 @@ abstract class AbstractDecoder implements Decoder public List<Object> readArray() { - long size = readLong(); + long size = readUint32(); + if (size == 0) + { + return Collections.EMPTY_LIST; + } + byte code = get(); Type t = getType(code); - long count = readLong(); + long count = readUint32(); List<Object> result = new ArrayList<Object>(); for (int i = 0; i < count; i++) @@ -291,11 +350,11 @@ abstract class AbstractDecoder implements Decoder switch (width) { case 1: - return readOctet(); + return readUint8(); case 2: - return readShort(); + return readUint16(); case 4: - return readLong(); + return readUint32(); default: throw new IllegalStateException("illegal width: " + width); } @@ -313,81 +372,72 @@ abstract class AbstractDecoder implements Decoder { switch (t) { - case OCTET: - case UNSIGNED_BYTE: - return readOctet(); - case SIGNED_BYTE: + case BIN8: + case UINT8: + return readUint8(); + case INT8: return get(); case CHAR: return (char) get(); case BOOLEAN: return get() > 0; - case TWO_OCTETS: - case UNSIGNED_SHORT: - return readShort(); + case BIN16: + case UINT16: + return readUint16(); - case SIGNED_SHORT: - return (short) readShort(); + case INT16: + return (short) readUint16(); - case FOUR_OCTETS: - case UNSIGNED_INT: - return readLong(); + case BIN32: + case UINT32: + return readUint32(); - case UTF32_CHAR: - case SIGNED_INT: - return (int) readLong(); + case CHAR_UTF32: + case INT32: + return (int) readUint32(); case FLOAT: - return Float.intBitsToFloat((int) readLong()); + return Float.intBitsToFloat((int) readUint32()); - case EIGHT_OCTETS: - case SIGNED_LONG: - case UNSIGNED_LONG: + case BIN64: + case UINT64: + case INT64: case DATETIME: - return readLonglong(); + return readUint64(); case DOUBLE: - return Double.longBitsToDouble(readLonglong()); - - case SIXTEEN_OCTETS: - case THIRTY_TWO_OCTETS: - case SIXTY_FOUR_OCTETS: - case _128_OCTETS: - case SHORT_BINARY: - case BINARY: - case LONG_BINARY: - return readBytes(t); + return Double.longBitsToDouble(readUint64()); case UUID: return readUuid(); - case SHORT_STRING: - case SHORT_UTF8_STRING: - case SHORT_UTF16_STRING: - case SHORT_UTF32_STRING: - case STRING: - case UTF8_STRING: - case UTF16_STRING: - case UTF32_STRING: - case LONG_STRING: - case LONG_UTF8_STRING: - case LONG_UTF16_STRING: - case LONG_UTF32_STRING: + case STR8: + return readStr8(); + + case STR16: + return readStr16(); + + case STR8_LATIN: + case STR8_UTF16: + case STR16_LATIN: + case STR16_UTF16: // XXX: need to do character conversion return new String(readBytes(t)); - case TABLE: - return readTable(); - case SEQUENCE: - return readSequence(); + case MAP: + return readMap(); + case LIST: + return readList(); case ARRAY: return readArray(); + case STRUCT32: + return readStruct32(); - case FIVE_OCTETS: - case DECIMAL: - case NINE_OCTETS: - case LONG_DECIMAL: + case BIN40: + case DEC32: + case BIN72: + case DEC64: // XXX: what types are we supposed to use here? return readBytes(t); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java index c2dd205d66..56b4537719 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java @@ -20,8 +20,11 @@ */ package org.apache.qpidity.transport.codec; +import java.io.UnsupportedEncodingException; + import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -48,17 +51,17 @@ abstract class AbstractEncoder implements Encoder static { ENCODINGS.put(Boolean.class, Type.BOOLEAN); - ENCODINGS.put(String.class, Type.LONG_STRING); - ENCODINGS.put(Long.class, Type.SIGNED_LONG); - ENCODINGS.put(Integer.class, Type.SIGNED_INT); - ENCODINGS.put(Short.class, Type.SIGNED_SHORT); - ENCODINGS.put(Byte.class, Type.SIGNED_BYTE); - ENCODINGS.put(Map.class, Type.TABLE); - ENCODINGS.put(List.class, Type.SEQUENCE); + ENCODINGS.put(String.class, Type.STR16); + ENCODINGS.put(Long.class, Type.INT64); + ENCODINGS.put(Integer.class, Type.INT32); + ENCODINGS.put(Short.class, Type.INT16); + ENCODINGS.put(Byte.class, Type.INT8); + ENCODINGS.put(Map.class, Type.MAP); + ENCODINGS.put(List.class, Type.LIST); ENCODINGS.put(Float.class, Type.FLOAT); ENCODINGS.put(Double.class, Type.DOUBLE); ENCODINGS.put(Character.class, Type.CHAR); - ENCODINGS.put(byte[].class, Type.LONG_BINARY); + ENCODINGS.put(byte[].class, Type.VBIN32); } protected Sizer sizer() @@ -120,14 +123,14 @@ abstract class AbstractEncoder implements Encoder flushBits(); } - public void writeOctet(short b) + public void writeUint8(short b) { assert b < 0x100; put((byte) b); } - public void writeShort(int s) + public void writeUint16(int s) { assert s < 0x10000; @@ -135,7 +138,7 @@ abstract class AbstractEncoder implements Encoder put(lsb(s)); } - public void writeLong(long i) + public void writeUint32(long i) { assert i < 0x100000000L; @@ -145,7 +148,12 @@ abstract class AbstractEncoder implements Encoder put(lsb(i)); } - public void writeLonglong(long l) + public void writeSequenceNo(int i) + { + writeUint32(i); + } + + public void writeUint64(long l) { for (int i = 0; i < 8; i++) { @@ -154,47 +162,101 @@ abstract class AbstractEncoder implements Encoder } - public void writeTimestamp(long l) + public void writeDatetime(long l) + { + writeUint64(l); + } + + private static final String checkLength(String s, int n) + { + if (s == null) + { + return ""; + } + + if (s.length() > n) + { + throw new IllegalArgumentException("string too long: " + s); + } + else + { + return s; + } + } + + private static final byte[] encode(String s, String charset) + { + try + { + return s.getBytes(charset); + } + catch (UnsupportedEncodingException e) + { + throw new RuntimeException(e); + } + } + + public void writeStr8(String s) { - writeLonglong(l); + s = checkLength(s, 255); + writeUint8((short) s.length()); + put(ByteBuffer.wrap(encode(s, "UTF-8"))); } + public void writeStr16(String s) + { + s = checkLength(s, 65535); + writeUint16(s.length()); + put(ByteBuffer.wrap(encode(s, "UTF-8"))); + } - public void writeShortstr(String s) + public void writeVbin8(byte[] bytes) { - if (s == null) { s = ""; } - if (s.length() > 255) { - throw new IllegalArgumentException(s); + if (bytes == null) { bytes = new byte[0]; } + if (bytes.length > 255) + { + throw new IllegalArgumentException("array too long: " + bytes.length); } - writeOctet((short) s.length()); - put(ByteBuffer.wrap(s.getBytes())); + writeUint8((short) bytes.length); + put(ByteBuffer.wrap(bytes)); } - public void writeLongstr(String s) + public void writeVbin16(byte[] bytes) { - if (s == null) { s = ""; } - writeLong(s.length()); - put(ByteBuffer.wrap(s.getBytes())); + if (bytes == null) { bytes = new byte[0]; } + writeUint16(bytes.length); + put(ByteBuffer.wrap(bytes)); } + public void writeVbin32(byte[] bytes) + { + if (bytes == null) { bytes = new byte[0]; } + writeUint32(bytes.length); + put(ByteBuffer.wrap(bytes)); + } - public void writeRfc1982LongSet(RangeSet ranges) + public void writeSequenceSet(RangeSet ranges) { if (ranges == null) { - writeShort((short) 0); + writeUint16((short) 0); } else { - writeShort(ranges.size() * 8); + writeUint16(ranges.size() * 8); for (Range range : ranges) { - writeLong(range.getLower()); - writeLong(range.getUpper()); + writeUint32(range.getLower()); + writeUint32(range.getUpper()); } } } + public void writeByteRanges(RangeSet ranges) + { + throw new Error("not implemented"); + } + public void writeUuid(UUID uuid) { long msb = 0; @@ -202,15 +264,10 @@ abstract class AbstractEncoder implements Encoder if (uuid != null) { msb = uuid.getMostSignificantBits(); - uuid.getLeastSignificantBits(); + lsb = uuid.getLeastSignificantBits(); } - writeLonglong(msb); - writeLonglong(lsb); - } - - public void writeContent(String c) - { - throw new Error("Deprecated"); + writeUint64(msb); + writeUint64(lsb); } public void writeStruct(int type, Struct s) @@ -237,22 +294,27 @@ abstract class AbstractEncoder implements Encoder } } + if (type > 0) + { + writeUint16(type); + } + s.write(this); } - public void writeLongStruct(Struct s) + public void writeStruct32(Struct s) { if (s == null) { - writeLong(0); + writeUint32(0); } else { Sizer sizer = sizer(); - sizer.writeShort(s.getEncodedType()); + sizer.writeUint16(s.getEncodedType()); s.write(sizer); - writeLong(sizer.size()); - writeShort(s.getEncodedType()); + writeUint32(sizer.size()); + writeUint16(s.getEncodedType()); s.write(this); } } @@ -309,46 +371,46 @@ abstract class AbstractEncoder implements Encoder return null; } - public void writeTable(Map<String,Object> table) + public void writeMap(Map<String,Object> map) { - if (table == null) + if (map == null) { - writeLong(0); + writeUint32(0); return; } Sizer sizer = sizer(); - sizer.writeTable(table); + sizer.writeMap(map); // XXX: - 4 - writeLong(sizer.size() - 4); - writeTableEntries(table); + writeUint32(sizer.size() - 4); + writeMapEntries(map); } - protected void writeTableEntries(Map<String,Object> table) + protected void writeMapEntries(Map<String,Object> map) { - for (Map.Entry<String,Object> entry : table.entrySet()) + for (Map.Entry<String,Object> entry : map.entrySet()) { String key = entry.getKey(); Object value = entry.getValue(); Type type = encoding(value); - writeShortstr(key); + writeStr8(key); put(type.code); write(type, value); } } - public void writeSequence(List<Object> sequence) + public void writeList(List<Object> list) { Sizer sizer = sizer(); - sizer.writeSequence(sequence); + sizer.writeList(list); // XXX: - 4 - writeLong(sizer.size() - 4); - writeSequenceEntries(sequence); + writeUint32(sizer.size() - 4); + writeListEntries(list); } - protected void writeSequenceEntries(List<Object> sequence) + protected void writeListEntries(List<Object> list) { - for (Object value : sequence) + for (Object value : list) { Type type = encoding(value); put(type.code); @@ -358,10 +420,15 @@ abstract class AbstractEncoder implements Encoder public void writeArray(List<Object> array) { + if (array == null) + { + array = Collections.EMPTY_LIST; + } + Sizer sizer = sizer(); sizer.writeArray(array); // XXX: -4 - writeLong(sizer.size() - 4); + writeUint32(sizer.size() - 4); writeArrayEntries(array); } @@ -371,7 +438,7 @@ abstract class AbstractEncoder implements Encoder if (array.isEmpty()) { - type = Type.VOID; + return; } else { @@ -380,6 +447,8 @@ abstract class AbstractEncoder implements Encoder put(type.code); + writeUint32(array.size()); + for (Object value : array) { write(type, value); @@ -409,13 +478,13 @@ abstract class AbstractEncoder implements Encoder switch (width) { case 1: - writeOctet((short) size); + writeUint8((short) size); break; case 2: - writeShort(size); + writeUint16(size); break; case 4: - writeLong(size); + writeUint32(size); break; default: throw new IllegalStateException("illegal width: " + width); @@ -444,11 +513,11 @@ abstract class AbstractEncoder implements Encoder { switch (t) { - case OCTET: - case UNSIGNED_BYTE: - writeOctet(coerce(Short.class, value)); + case BIN8: + case UINT8: + writeUint8(coerce(Short.class, value)); break; - case SIGNED_BYTE: + case INT8: put(coerce(Byte.class, value)); break; case CHAR: @@ -465,85 +534,78 @@ abstract class AbstractEncoder implements Encoder } break; - case TWO_OCTETS: - case UNSIGNED_SHORT: - writeShort(coerce(Integer.class, value)); + case BIN16: + case UINT16: + writeUint16(coerce(Integer.class, value)); break; - case SIGNED_SHORT: - writeShort(coerce(Short.class, value)); + case INT16: + writeUint16(coerce(Short.class, value)); break; - case FOUR_OCTETS: - case UNSIGNED_INT: - writeLong(coerce(Long.class, value)); + case BIN32: + case UINT32: + writeUint32(coerce(Long.class, value)); break; - case UTF32_CHAR: - case SIGNED_INT: - writeLong(coerce(Integer.class, value)); + case CHAR_UTF32: + case INT32: + writeUint32(coerce(Integer.class, value)); break; case FLOAT: - writeLong(Float.floatToIntBits(coerce(Float.class, value))); + writeUint32(Float.floatToIntBits(coerce(Float.class, value))); break; - case EIGHT_OCTETS: - case SIGNED_LONG: - case UNSIGNED_LONG: + case BIN64: + case UINT64: + case INT64: case DATETIME: - writeLonglong(coerce(Long.class, value)); + writeUint64(coerce(Long.class, value)); break; case DOUBLE: long bits = Double.doubleToLongBits(coerce(Double.class, value)); - writeLonglong(bits); - break; - - case SIXTEEN_OCTETS: - case THIRTY_TWO_OCTETS: - case SIXTY_FOUR_OCTETS: - case _128_OCTETS: - case SHORT_BINARY: - case BINARY: - case LONG_BINARY: - writeBytes(t, coerce(byte[].class, value)); + writeUint64(bits); break; case UUID: writeUuid(coerce(UUID.class, value)); break; - case SHORT_STRING: - case SHORT_UTF8_STRING: - case SHORT_UTF16_STRING: - case SHORT_UTF32_STRING: - case STRING: - case UTF8_STRING: - case UTF16_STRING: - case UTF32_STRING: - case LONG_STRING: - case LONG_UTF8_STRING: - case LONG_UTF16_STRING: - case LONG_UTF32_STRING: + case STR8: + writeStr8(coerce(String.class, value)); + break; + + case STR16: + writeStr16(coerce(String.class, value)); + break; + + case STR8_LATIN: + case STR8_UTF16: + case STR16_LATIN: + case STR16_UTF16: // XXX: need to do character conversion writeBytes(t, coerce(String.class, value).getBytes()); break; - case TABLE: - writeTable((Map<String,Object>) coerce(Map.class, value)); + case MAP: + writeMap((Map<String,Object>) coerce(Map.class, value)); break; - case SEQUENCE: - writeSequence(coerce(List.class, value)); + case LIST: + writeList(coerce(List.class, value)); break; case ARRAY: writeArray(coerce(List.class, value)); break; + case STRUCT32: + writeStruct32(coerce(Struct.class, value)); + break; - case FIVE_OCTETS: - case DECIMAL: - case NINE_OCTETS: - case LONG_DECIMAL: + case BIN40: + case DEC32: + case BIN72: + case DEC64: // XXX: what types are we supposed to use here? writeBytes(t, coerce(byte[].class, value)); break; diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java index f0738e0a91..df2d208118 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java @@ -38,26 +38,30 @@ public interface Decoder { boolean readBit(); - short readOctet(); - int readShort(); - long readLong(); - long readLonglong(); + short readUint8(); + int readUint16(); + long readUint32(); + long readUint64(); - long readTimestamp(); - - String readShortstr(); - String readLongstr(); - - RangeSet readRfc1982LongSet(); + long readDatetime(); UUID readUuid(); - String readContent(); + int readSequenceNo(); + RangeSet readSequenceSet(); // XXX + RangeSet readByteRanges(); // XXX - Struct readStruct(int type); - Struct readLongStruct(); + String readStr8(); + String readStr16(); - Map<String,Object> readTable(); - List<Object> readSequence(); + byte[] readVbin8(); + byte[] readVbin16(); + byte[] readVbin32(); + + Struct readStruct32(); + Map<String,Object> readMap(); + List<Object> readList(); List<Object> readArray(); + Struct readStruct(int type); + } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java index 1b2fe0213e..0449ba6702 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java @@ -40,26 +40,30 @@ public interface Encoder void flush(); void writeBit(boolean b); - void writeOctet(short b); - void writeShort(int s); - void writeLong(long i); - void writeLonglong(long l); + void writeUint8(short b); + void writeUint16(int s); + void writeUint32(long i); + void writeUint64(long l); - void writeTimestamp(long l); - - void writeShortstr(String s); - void writeLongstr(String s); - - void writeRfc1982LongSet(RangeSet ranges); + void writeDatetime(long l); void writeUuid(UUID uuid); - void writeContent(String c); + void writeSequenceNo(int s); + void writeSequenceSet(RangeSet ranges); // XXX + void writeByteRanges(RangeSet ranges); // XXX - void writeStruct(int type, Struct s); - void writeLongStruct(Struct s); + void writeStr8(String s); + void writeStr16(String s); - void writeTable(Map<String,Object> table); - void writeSequence(List<Object> sequence); + void writeVbin8(byte[] bytes); + void writeVbin16(byte[] bytes); + void writeVbin32(byte[] bytes); + + void writeStruct32(Struct s); + void writeMap(Map<String,Object> map); + void writeList(List<Object> list); void writeArray(List<Object> array); + void writeStruct(int type, Struct s); + } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java index b98bf98239..8ffdd5150b 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java @@ -38,30 +38,34 @@ public interface Sizer extends Encoder public static final Sizer NULL = new Sizer() { - public void flush() {}; + public void flush() {} - public void writeBit(boolean b) {}; - public void writeOctet(short b) {}; - public void writeShort(int s) {}; - public void writeLong(long i) {}; - public void writeLonglong(long l) {}; + public void writeBit(boolean b) {} + public void writeUint8(short b) {} + public void writeUint16(int s) {} + public void writeUint32(long i) {} + public void writeUint64(long l) {} - public void writeTimestamp(long l) {}; + public void writeDatetime(long l) {} + public void writeUuid(UUID uuid) {} - public void writeShortstr(String s) {}; - public void writeLongstr(String s) {}; + public void writeSequenceNo(int s) {} + public void writeSequenceSet(RangeSet ranges) {} // XXX + public void writeByteRanges(RangeSet ranges) {} // XXX - public void writeRfc1982LongSet(RangeSet ranges) {}; - public void writeUuid(UUID uuid) {}; + public void writeStr8(String s) {} + public void writeStr16(String s) {} - public void writeContent(String c) {}; + public void writeVbin8(byte[] bytes) {} + public void writeVbin16(byte[] bytes) {} + public void writeVbin32(byte[] bytes) {} - public void writeStruct(int type, Struct s) {}; - public void writeLongStruct(Struct s) {}; + public void writeStruct32(Struct s) {} + public void writeMap(Map<String,Object> map) {} + public void writeList(List<Object> list) {} + public void writeArray(List<Object> array) {} - public void writeTable(Map<String,Object> table) {}; - public void writeSequence(List<Object> sequence) {}; - public void writeArray(List<Object> array) {}; + public void writeStruct(int type, Struct s) {} public int getSize() { return 0; } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java index 5c35596fd8..d493245f0c 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java @@ -37,6 +37,7 @@ import org.apache.qpidity.transport.ProtocolError; import org.apache.qpidity.transport.ProtocolEvent; import org.apache.qpidity.transport.ProtocolHeader; import org.apache.qpidity.transport.Receiver; +import org.apache.qpidity.transport.SegmentType; import org.apache.qpidity.transport.Struct; @@ -118,7 +119,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { switch (frame.getType()) { - case Frame.BODY: + case BODY: emit(frame, new Data(frame, frame.isFirstFrame(), frame.isLastFrame())); break; @@ -158,22 +159,29 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate } } - private ProtocolEvent decode(Frame frame, byte type, List<ByteBuffer> segment) + private ProtocolEvent decode(Frame frame, SegmentType type, List<ByteBuffer> segment) { FragmentDecoder dec = new FragmentDecoder(segment.iterator()); switch (type) { - case Frame.METHOD: - int methodType = dec.readShort(); - Method method = Method.create(methodType); - method.read(dec); - return method; - case Frame.HEADER: + case CONTROL: + int controlType = dec.readUint16(); + Method control = Method.create(controlType); + control.read(dec); + return control; + case COMMAND: + int commandType = dec.readUint16(); + // read in the session header, right now we don't use it + dec.readUint16(); + Method command = Method.create(commandType); + command.read(dec); + return command; + case HEADER: List<Struct> structs = new ArrayList(); while (dec.hasRemaining()) { - structs.add(dec.readLongStruct()); + structs.add(dec.readStruct32()); } return new Header(structs,frame.isLastFrame() && frame.isLastSegment()); default: diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java index d38d8ded98..0357df6e86 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java @@ -31,6 +31,7 @@ import org.apache.qpidity.transport.ProtocolDelegate; import org.apache.qpidity.transport.ProtocolError; import org.apache.qpidity.transport.ProtocolEvent; import org.apache.qpidity.transport.ProtocolHeader; +import org.apache.qpidity.transport.SegmentType; import org.apache.qpidity.transport.Sender; import org.apache.qpidity.transport.Struct; @@ -76,50 +77,50 @@ public class Disassembler implements Sender<ConnectionEvent>, sender.close(); } - private void fragment(byte flags, byte type, ConnectionEvent event, + private void fragment(byte flags, SegmentType type, ConnectionEvent event, ByteBuffer buf, boolean first, boolean last) { if(!buf.hasRemaining()) { //empty data - byte nflags = flags; + byte nflags = flags; if (first) { nflags |= FIRST_FRAME; first = false; } - nflags |= LAST_FRAME; - Frame frame = new Frame(nflags, type, + nflags |= LAST_FRAME; + Frame frame = new Frame(nflags, type, event.getProtocolEvent().getEncodedTrack(), event.getChannel()); - // frame.addFragment(buf); + // frame.addFragment(buf); sender.send(frame); } else { - while (buf.hasRemaining()) - { - ByteBuffer slice = buf.slice(); - slice.limit(min(maxPayload, slice.remaining())); - buf.position(buf.position() + slice.remaining()); - - byte newflags = flags; - if (first) + while (buf.hasRemaining()) { - newflags |= FIRST_FRAME; - first = false; + ByteBuffer slice = buf.slice(); + slice.limit(min(maxPayload, slice.remaining())); + buf.position(buf.position() + slice.remaining()); + + byte newflags = flags; + if (first) + { + newflags |= FIRST_FRAME; + first = false; + } + if (last && !buf.hasRemaining()) + { + newflags |= LAST_FRAME; + } + + Frame frame = new Frame(newflags, type, + event.getProtocolEvent().getEncodedTrack(), + event.getChannel()); + frame.addFragment(slice); + sender.send(frame); } - if (last && !buf.hasRemaining()) - { - newflags |= LAST_FRAME; - } - - Frame frame = new Frame(newflags, type, - event.getProtocolEvent().getEncodedTrack(), - event.getChannel()); - frame.addFragment(slice); - sender.send(frame); - } } } @@ -128,15 +129,40 @@ public class Disassembler implements Sender<ConnectionEvent>, sender.send(header); } - public void method(ConnectionEvent event, Method method) + public void control(ConnectionEvent event, Method method) + { + method(event, method, SegmentType.CONTROL); + } + + public void command(ConnectionEvent event, Method method) + { + method(event, method, SegmentType.COMMAND); + } + + private void method(ConnectionEvent event, Method method, SegmentType type) { SizeEncoder sizer = new SizeEncoder(); - sizer.writeShort(method.getEncodedType()); + sizer.writeUint16(method.getEncodedType()); + if (type == SegmentType.COMMAND) + { + sizer.writeUint16(0); + } method.write(sizer); ByteBuffer buf = ByteBuffer.allocate(sizer.size()); BBEncoder enc = new BBEncoder(buf); - enc.writeShort(method.getEncodedType()); + enc.writeUint16(method.getEncodedType()); + if (type == SegmentType.COMMAND) + { + if (method.isSync()) + { + enc.writeUint16(0x0101); + } + else + { + enc.writeUint16(0x0100); + } + } method.write(enc); enc.flush(); buf.flip(); @@ -148,7 +174,7 @@ public class Disassembler implements Sender<ConnectionEvent>, flags |= LAST_SEG; } - fragment(flags, METHOD, event, buf, true, true); + fragment(flags, type, event, buf, true, true); } public void header(ConnectionEvent event, Header header) @@ -159,24 +185,24 @@ public class Disassembler implements Sender<ConnectionEvent>, SizeEncoder sizer = new SizeEncoder(); for (Struct st : header.getStructs()) { - sizer.writeLongStruct(st); + sizer.writeStruct32(st); } buf = ByteBuffer.allocate(sizer.size()); BBEncoder enc = new BBEncoder(buf); for (Struct st : header.getStructs()) { - enc.writeLongStruct(st); + enc.writeStruct32(st); enc.flush(); } header.setBuf(buf); } else { - buf = header.getBuf(); + buf = header.getBuf(); } buf.flip(); - fragment((byte) 0x0, HEADER, event, buf, true, true); + fragment((byte) 0x0, SegmentType.HEADER, event, buf, true, true); } public void data(ConnectionEvent event, Data data) @@ -187,7 +213,7 @@ public class Disassembler implements Sender<ConnectionEvent>, { ByteBuffer buf = it.next(); boolean last = data.isLast() && !it.hasNext(); - fragment(LAST_SEG, BODY, event, buf, first, last); + fragment(LAST_SEG, SegmentType.BODY, event, buf, first, last); first = false; } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java index a5c5db4dba..c5fb7b9986 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java @@ -20,6 +20,7 @@ */ package org.apache.qpidity.transport.network; +import org.apache.qpidity.transport.SegmentType; import org.apache.qpidity.transport.util.SliceIterator; import java.nio.ByteBuffer; @@ -48,10 +49,6 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer> public static final byte L3 = 2; public static final byte L4 = 3; - public static final byte METHOD = 1; - public static final byte HEADER = 2; - public static final byte BODY = 3; - public static final byte RESERVED = 0x0; public static final byte VERSION = 0x0; @@ -62,13 +59,13 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer> public static final byte LAST_FRAME = 0x1; final private byte flags; - final private byte type; + final private SegmentType type; final private byte track; final private int channel; final private List<ByteBuffer> fragments; private int size; - public Frame(byte flags, byte type, byte track, int channel) + public Frame(byte flags, SegmentType type, byte track, int channel) { this.flags = flags; this.type = type; @@ -99,7 +96,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer> return size; } - public byte getType() + public SegmentType getType() { return type; } @@ -153,7 +150,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer> { StringBuilder str = new StringBuilder(); str.append(String.format - ("[%05d %05d %1d %1d %d%d%d%d]", getChannel(), getSize(), + ("[%05d %05d %1d %1d %d%d%d%d] ", getChannel(), getSize(), getTrack(), getType(), isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0, isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0)); @@ -170,7 +167,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer> str.append(" | "); } - str.append(str(buf, 20)); + str.append(str(buf)); } return str.toString(); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java index 871c45743e..2d41a9f516 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java @@ -22,10 +22,10 @@ package org.apache.qpidity.transport.network; import java.nio.ByteBuffer; -import org.apache.qpidity.transport.Constant; import org.apache.qpidity.transport.ProtocolError; import org.apache.qpidity.transport.ProtocolHeader; import org.apache.qpidity.transport.Receiver; +import org.apache.qpidity.transport.SegmentType; import static org.apache.qpidity.transport.util.Functions.*; @@ -77,7 +77,7 @@ public class InputHandler implements Receiver<ByteBuffer> private byte minor; private byte flags; - private byte type; + private SegmentType type; private byte track; private int channel; private int size; @@ -146,7 +146,7 @@ public class InputHandler implements Receiver<ByteBuffer> flags = buf.get(); return FRAME_HDR_TYPE; case FRAME_HDR_TYPE: - type = buf.get(); + type = SegmentType.get(buf.get()); return FRAME_HDR_SIZE1; case FRAME_HDR_SIZE1: size = (0xFF & buf.get()) << 8; @@ -218,7 +218,7 @@ public class InputHandler implements Receiver<ByteBuffer> return FRAME_END; } case FRAME_END: - return expect(buf, Constant.FRAME_END, FRAME_HDR); + return expect(buf, OutputHandler.FRAME_END, FRAME_HDR); default: throw new IllegalStateException(); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java index 9f770bcb1c..8f615cf80d 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java @@ -67,11 +67,13 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate } } + public static final int FRAME_END = 0xCE; + public void frame(Frame frame) { ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE + frame.getSize() + 1); hdr.put(frame.getFlags()); - hdr.put(frame.getType()); + hdr.put((byte) frame.getType().getValue()); hdr.putShort((short) (frame.getSize() + HEADER_SIZE)); hdr.put(RESERVED); hdr.put(frame.getTrack()); @@ -84,7 +86,7 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate { hdr.put(buf); } - hdr.put((byte) Constant.FRAME_END); + hdr.put((byte) FRAME_END); hdr.flip(); synchronized (lock) { diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java index 2e4875cf42..7da719087c 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java @@ -200,7 +200,7 @@ public class MinaHandler<E> implements IoHandler ConnectionDelegate delegate) { return connect(host, port, new ConnectionBinding - (delegate, InputHandler.State.FRAME_HDR)); + (delegate, InputHandler.State.PROTO_HDR)); } private static class ConnectionBinding diff --git a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java index 7dbb3d30e8..14e756c047 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java @@ -52,15 +52,24 @@ public class Functions public static final String str(ByteBuffer buf, int limit) { StringBuilder str = new StringBuilder(); + str.append('"'); + for (int i = 0; i < min(buf.remaining(), limit); i++) { - if (i > 0 && i % 2 == 0) + byte c = buf.get(buf.position() + i); + + if (c > 31 && c < 127 && c != '\\') { - str.append(" "); + str.append((char)c); + } + else + { + str.append(String.format("\\x%02x", c)); } - str.append(String.format("%02x", buf.get(buf.position() + i))); } + str.append('"'); + return str.toString(); } diff --git a/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java index d7ece9b745..1a83786e12 100644 --- a/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java +++ b/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java @@ -100,12 +100,12 @@ public class ConnectionTest extends TestCase } Channel ch = conn.getChannel(0); - Session ssn = new Session(); + Session ssn = new Session("test".getBytes()); ssn.attach(ch); try { - ssn.sessionOpen(1234); + ssn.sessionAttach(ssn.getName()); fail("writing to a closed socket succeeded"); } catch (TransportException e) |