diff options
Diffstat (limited to 'qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java')
-rw-r--r-- | qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java | 658 |
1 files changed, 329 insertions, 329 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java index 76a28d23e9..99b25d4193 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java @@ -1,329 +1,329 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.framing;
-
-import org.apache.qpid.amqp_1_0.codec.BinaryWriter;
-import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
-import org.apache.qpid.amqp_1_0.codec.ValueHandler;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.ErrorCondition;
-import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-
-import java.nio.ByteBuffer;
-import java.util.Formatter;
-
-public class FrameHandler implements ProtocolHandler
-{
- private ConnectionEndpoint _connection;
- private ValueHandler _typeHandler;
-
- enum State {
- SIZE_0,
- SIZE_1,
- SIZE_2,
- SIZE_3,
- PRE_PARSE,
- BUFFERING,
- PARSING,
- ERROR
- }
-
- private State _state = State.SIZE_0;
- private int _size;
-
- private ByteBuffer _buffer;
-
-
-
- public FrameHandler(final ConnectionEndpoint connection)
- {
- _connection = connection;
- _typeHandler = new ValueHandler(connection.getDescribedTypeRegistry());
-
- }
-
- public ProtocolHandler parse(ByteBuffer in)
- {
- try
- {
- Error frameParsingError = null;
- int size = _size;
- State state = _state;
- ByteBuffer oldIn = null;
-
- while(in.hasRemaining() && state != State.ERROR)
- {
-
- final int remaining = in.remaining();
- if(remaining == 0)
- {
- return this;
- }
-
-
- switch(state)
- {
- case SIZE_0:
- if(remaining >= 4)
- {
- size = in.getInt();
- state = State.PRE_PARSE;
- break;
- }
- else
- {
- size = (in.get() << 24) & 0xFF000000;
- if(!in.hasRemaining())
- {
- state = State.SIZE_1;
- break;
- }
- }
- case SIZE_1:
- size |= (in.get() << 16) & 0xFF0000;
- if(!in.hasRemaining())
- {
- state = State.SIZE_2;
- break;
- }
- case SIZE_2:
- size |= (in.get() << 8) & 0xFF00;
- if(!in.hasRemaining())
- {
- state = State.SIZE_3;
- break;
- }
- case SIZE_3:
- size |= in.get() & 0xFF;
- state = State.PRE_PARSE;
-
- case PRE_PARSE:
-
- if(size < 8)
- {
- frameParsingError = createFramingError("specified frame size %d smaller than minimum frame header size %d", _size, 8);
- state = State.ERROR;
- break;
- }
-
- else if(size > _connection.getMaxFrameSize())
- {
- frameParsingError = createFramingError("specified frame size %d larger than maximum frame header size %d", size, _connection.getMaxFrameSize());
- state = State.ERROR;
- break;
- }
-
- if(in.remaining() < size-4)
- {
- _buffer = ByteBuffer.allocate(size-4);
- _buffer.put(in);
- state = State.BUFFERING;
- break;
- }
- case BUFFERING:
- if(_buffer != null)
- {
- if(in.remaining() < _buffer.remaining())
- {
- _buffer.put(in);
- break;
- }
- else
- {
- ByteBuffer dup = in.duplicate();
- dup.limit(dup.position()+_buffer.remaining());
- int i = _buffer.remaining();
- int d = dup.remaining();
- in.position(in.position()+_buffer.remaining());
- _buffer.put(dup);
- oldIn = in;
- _buffer.flip();
- in = _buffer;
- state = State.PARSING;
- }
- }
-
- case PARSING:
-
- int dataOffset = (in.get() << 2) & 0x3FF;
-
- if(dataOffset < 8)
- {
- frameParsingError = createFramingError("specified frame data offset %d smaller than minimum frame header size %d", dataOffset, 8);
- state = State.ERROR;
- break;
- }
- else if(dataOffset > size)
- {
- frameParsingError = createFramingError("specified frame data offset %d larger than the frame size %d", dataOffset, _size);
- state = State.ERROR;
- break;
- }
-
- // type
-
- int type = in.get() & 0xFF;
- int channel = in.getShort() & 0xFF;
-
- if(type != 0 && type != 1)
- {
- frameParsingError = createFramingError("unknown frame type: %d", type);
- state = State.ERROR;
- break;
- }
-
- // channel
-
- /*if(channel > _connection.getChannelMax())
- {
- frameParsingError = createError(AmqpError.DECODE_ERROR,
- "frame received on invalid channel %d above channel-max %d",
- channel, _connection.getChannelMax());
-
- state = State.ERROR;
- }
-*/
- // ext header
- if(dataOffset!=8)
- {
- in.position(in.position()+dataOffset-8);
- }
-
- // oldIn null iff not working on duplicated buffer
- if(oldIn == null)
- {
- oldIn = in;
- in = in.duplicate();
- final int endPos = in.position() + size - dataOffset;
- in.limit(endPos);
- oldIn.position(endPos);
-
- }
-
- int inPos = in.position();
- int inLimit = in.limit();
- // PARSE HERE
- try
- {
- Object val = _typeHandler.parse(in);
-
- if(in.hasRemaining())
- {
- if(val instanceof Transfer)
- {
- ByteBuffer buf = ByteBuffer.allocate(in.remaining());
- buf.put(in);
- buf.flip();
- ((Transfer)val).setPayload(buf);
- }
- }
-
- _connection.receive((short)channel,val);
- reset();
- in = oldIn;
- oldIn = null;
- _buffer = null;
- state = State.SIZE_0;
- break;
-
-
- }
- catch (AmqpErrorException ex)
- {
- state = State.ERROR;
- frameParsingError = ex.getError();
- }
- catch (RuntimeException e)
- {
- in.position(inPos);
- in.limit(inLimit);
- System.err.println(toHex(in));
- throw e;
- }
- }
-
- }
-
- _state = state;
- _size = size;
-
- if(_state == State.ERROR)
- {
- _connection.handleError(frameParsingError);
- }
- return this;
- }
- catch(RuntimeException e)
- {
- e.printStackTrace();
- throw e;
- }
- }
-
- private static String toHex(ByteBuffer in)
- {
- Formatter formatter = new Formatter();
- int count = 0;
- while(in.hasRemaining())
- {
- formatter.format("%02x ", in.get() & 0xFF);
- if(count++ == 16)
- {
- formatter.format("\n");
- count = 0;
- }
-
- }
- return formatter.toString();
- }
-
- private Error createFramingError(String description, Object... args)
- {
- return createError(ConnectionError.FRAMING_ERROR, description, args);
- }
-
- private Error createError(final ErrorCondition framingError,
- final String description,
- final Object... args)
- {
- Error error = new Error();
- error.setCondition(framingError);
- Formatter formatter = new Formatter();
- error.setDescription(formatter.format(description, args).toString());
- return error;
- }
-
-
- private void reset()
- {
- _size = 0;
- _state = State.SIZE_0;
- }
-
-
- public boolean isDone()
- {
- return _state == State.ERROR || _connection.closedForInput();
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.framing; + +import org.apache.qpid.amqp_1_0.codec.BinaryWriter; +import org.apache.qpid.amqp_1_0.codec.ProtocolHandler; +import org.apache.qpid.amqp_1_0.codec.ValueHandler; +import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.ErrorCondition; +import org.apache.qpid.amqp_1_0.type.transport.ConnectionError; +import org.apache.qpid.amqp_1_0.type.transport.Error; +import org.apache.qpid.amqp_1_0.type.transport.Transfer; + +import java.nio.ByteBuffer; +import java.util.Formatter; + +public class FrameHandler implements ProtocolHandler +{ + private ConnectionEndpoint _connection; + private ValueHandler _typeHandler; + + enum State { + SIZE_0, + SIZE_1, + SIZE_2, + SIZE_3, + PRE_PARSE, + BUFFERING, + PARSING, + ERROR + } + + private State _state = State.SIZE_0; + private int _size; + + private ByteBuffer _buffer; + + + + public FrameHandler(final ConnectionEndpoint connection) + { + _connection = connection; + _typeHandler = new ValueHandler(connection.getDescribedTypeRegistry()); + + } + + public ProtocolHandler parse(ByteBuffer in) + { + try + { + Error frameParsingError = null; + int size = _size; + State state = _state; + ByteBuffer oldIn = null; + + while(in.hasRemaining() && state != State.ERROR) + { + + final int remaining = in.remaining(); + if(remaining == 0) + { + return this; + } + + + switch(state) + { + case SIZE_0: + if(remaining >= 4) + { + size = in.getInt(); + state = State.PRE_PARSE; + break; + } + else + { + size = (in.get() << 24) & 0xFF000000; + if(!in.hasRemaining()) + { + state = State.SIZE_1; + break; + } + } + case SIZE_1: + size |= (in.get() << 16) & 0xFF0000; + if(!in.hasRemaining()) + { + state = State.SIZE_2; + break; + } + case SIZE_2: + size |= (in.get() << 8) & 0xFF00; + if(!in.hasRemaining()) + { + state = State.SIZE_3; + break; + } + case SIZE_3: + size |= in.get() & 0xFF; + state = State.PRE_PARSE; + + case PRE_PARSE: + + if(size < 8) + { + frameParsingError = createFramingError("specified frame size %d smaller than minimum frame header size %d", _size, 8); + state = State.ERROR; + break; + } + + else if(size > _connection.getMaxFrameSize()) + { + frameParsingError = createFramingError("specified frame size %d larger than maximum frame header size %d", size, _connection.getMaxFrameSize()); + state = State.ERROR; + break; + } + + if(in.remaining() < size-4) + { + _buffer = ByteBuffer.allocate(size-4); + _buffer.put(in); + state = State.BUFFERING; + break; + } + case BUFFERING: + if(_buffer != null) + { + if(in.remaining() < _buffer.remaining()) + { + _buffer.put(in); + break; + } + else + { + ByteBuffer dup = in.duplicate(); + dup.limit(dup.position()+_buffer.remaining()); + int i = _buffer.remaining(); + int d = dup.remaining(); + in.position(in.position()+_buffer.remaining()); + _buffer.put(dup); + oldIn = in; + _buffer.flip(); + in = _buffer; + state = State.PARSING; + } + } + + case PARSING: + + int dataOffset = (in.get() << 2) & 0x3FF; + + if(dataOffset < 8) + { + frameParsingError = createFramingError("specified frame data offset %d smaller than minimum frame header size %d", dataOffset, 8); + state = State.ERROR; + break; + } + else if(dataOffset > size) + { + frameParsingError = createFramingError("specified frame data offset %d larger than the frame size %d", dataOffset, _size); + state = State.ERROR; + break; + } + + // type + + int type = in.get() & 0xFF; + int channel = in.getShort() & 0xFF; + + if(type != 0 && type != 1) + { + frameParsingError = createFramingError("unknown frame type: %d", type); + state = State.ERROR; + break; + } + + // channel + + /*if(channel > _connection.getChannelMax()) + { + frameParsingError = createError(AmqpError.DECODE_ERROR, + "frame received on invalid channel %d above channel-max %d", + channel, _connection.getChannelMax()); + + state = State.ERROR; + } +*/ + // ext header + if(dataOffset!=8) + { + in.position(in.position()+dataOffset-8); + } + + // oldIn null iff not working on duplicated buffer + if(oldIn == null) + { + oldIn = in; + in = in.duplicate(); + final int endPos = in.position() + size - dataOffset; + in.limit(endPos); + oldIn.position(endPos); + + } + + int inPos = in.position(); + int inLimit = in.limit(); + // PARSE HERE + try + { + Object val = _typeHandler.parse(in); + + if(in.hasRemaining()) + { + if(val instanceof Transfer) + { + ByteBuffer buf = ByteBuffer.allocate(in.remaining()); + buf.put(in); + buf.flip(); + ((Transfer)val).setPayload(buf); + } + } + + _connection.receive((short)channel,val); + reset(); + in = oldIn; + oldIn = null; + _buffer = null; + state = State.SIZE_0; + break; + + + } + catch (AmqpErrorException ex) + { + state = State.ERROR; + frameParsingError = ex.getError(); + } + catch (RuntimeException e) + { + in.position(inPos); + in.limit(inLimit); + System.err.println(toHex(in)); + throw e; + } + } + + } + + _state = state; + _size = size; + + if(_state == State.ERROR) + { + _connection.handleError(frameParsingError); + } + return this; + } + catch(RuntimeException e) + { + e.printStackTrace(); + throw e; + } + } + + private static String toHex(ByteBuffer in) + { + Formatter formatter = new Formatter(); + int count = 0; + while(in.hasRemaining()) + { + formatter.format("%02x ", in.get() & 0xFF); + if(count++ == 16) + { + formatter.format("\n"); + count = 0; + } + + } + return formatter.toString(); + } + + private Error createFramingError(String description, Object... args) + { + return createError(ConnectionError.FRAMING_ERROR, description, args); + } + + private Error createError(final ErrorCondition framingError, + final String description, + final Object... args) + { + Error error = new Error(); + error.setCondition(framingError); + Formatter formatter = new Formatter(); + error.setDescription(formatter.format(description, args).toString()); + return error; + } + + + private void reset() + { + _size = 0; + _state = State.SIZE_0; + } + + + public boolean isDone() + { + return _state == State.ERROR || _connection.closedForInput(); + } +} |