/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System;
using System.Collections;
using log4net;
using Apache.Qpid.Buffer;
using Apache.Qpid.Codec;
using Apache.Qpid.Codec.Demux;

namespace Apache.Qpid.Framing
{
    /// <summary>
    /// Message decoder that recognises a framed data block in a <see cref="ByteBuffer"/>
    /// and turns it into an <see cref="AMQFrame"/>.
    /// </summary>
    /// <remarks>
    /// The layout read by this class is: frame type (1 byte), channel (unsigned 16 bit),
    /// body size (unsigned 32 bit), the body itself, and a single frame-end marker byte.
    /// </remarks>
    public class AMQDataBlockDecoder : IMessageDecoder
    {
        /// <summary>Value the byte following the frame body must have.</summary>
        private const byte FrameEndMarker = 0xCE;

        /// <summary>
        /// Bytes that must follow the (already consumed) type byte for a frame with an
        /// empty body: channel (2) + body size (4) + frame-end marker (1).
        /// </summary>
        private const int MinBytesAfterType = 2 + 4 + 1;

        private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQDataBlockDecoder));

        /// <summary>Maps a frame type byte to the body factory registered for it.</summary>
        private readonly Hashtable _supportedBodies = new Hashtable();

        private bool _disabled = false;

        /// <summary>
        /// Creates a decoder with body factories registered for method, content header,
        /// content body and heartbeat frames.
        /// </summary>
        public AMQDataBlockDecoder()
        {
            _supportedBodies[AMQMethodBody.TYPE] = AMQMethodBodyFactory.GetInstance();
            _supportedBodies[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.GetInstance();
            _supportedBodies[ContentBody.TYPE] = ContentBodyFactory.GetInstance();
            _supportedBodies[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
        }

        /// <summary>
        /// Inspects <paramref name="input"/> to decide whether it holds a complete frame
        /// that this decoder can handle.
        /// </summary>
        /// <remarks>
        /// This method reads (and therefore advances past) the frame header bytes of
        /// <paramref name="input"/>. NOTE(review): presumably the calling codec restores the
        /// buffer position before invoking <see cref="Decode"/> - confirm against the caller.
        /// </remarks>
        /// <param name="input">Buffer positioned at the start of a candidate frame.</param>
        /// <returns>
        /// <c>NOT_OK</c> when the decoder is disabled, the first byte is 'A' (looks like a
        /// protocol initiation), the frame type is zero, or no body factory is registered for
        /// the type; <c>NEED_DATA</c> when the buffer does not yet hold the whole frame;
        /// <c>OK</c> otherwise.
        /// </returns>
        public MessageDecoderResult Decodable(ByteBuffer input)
        {
            if (_disabled)
            {
                return MessageDecoderResult.NOT_OK;
            }

            // Nothing can be decided until at least the frame type byte is available.
            if (input.Remaining < 1)
            {
                return MessageDecoderResult.NEED_DATA;
            }
            byte type = input.GetByte();

            // we have to check this isn't a protocol initiation frame here - we can't tell later on and we end up
            // waiting for more data. This could be improved if MINA supported some kind of state awareness when decoding
            if ((char)type == 'A')
            {
                _logger.Error("Received what appears to be a protocol initiation frame");
                return MessageDecoderResult.NOT_OK;
            }

            // The type byte has already been consumed above, so only the channel, body size
            // and frame-end byte still have to be present. Requiring one byte more than that
            // would leave a complete empty-body frame (e.g. a heartbeat) stuck on NEED_DATA
            // until unrelated bytes arrived.
            if (input.Remaining < MinBytesAfterType)
            {
                return MessageDecoderResult.NEED_DATA;
            }

            int channel = input.GetUInt16();
            long bodySize = input.GetUInt32();

            // bodySize can be zero. Channel and body size come from unsigned reads and so can
            // never be negative; a zero frame type is the only invalid header value to reject.
            if (type == 0)
            {
                _logger.Error(String.Format("Error decoding frame: Type={0}, Channel={1}, BodySize={2}", type, channel, bodySize));
                return MessageDecoderResult.NOT_OK;
            }

            // The body plus the trailing frame-end byte must be fully buffered.
            if (input.Remaining < (bodySize + 1))
            {
                return MessageDecoderResult.NEED_DATA;
            }

            if (!IsSupportedFrameType(type))
            {
                return MessageDecoderResult.NOT_OK;
            }

            if (_logger.IsDebugEnabled)
            {
                // we have read 7 bytes so far, so output 7 + bodysize + 1 (for end byte) to get complete data block size
                // this logging statement is useful when looking at exactly what size of data is coming in/out
                // the broker
                _logger.Debug("Able to decode data block of size " + (bodySize + 8));
            }
            return MessageDecoderResult.OK;
        }

        /// <summary>
        /// Reports whether a body factory is registered for <paramref name="frameType"/>,
        /// logging a warning when there is none.
        /// </summary>
        /// <param name="frameType">Frame type byte read from the wire.</param>
        /// <returns><c>true</c> when the type has a registered body factory.</returns>
        private bool IsSupportedFrameType(byte frameType)
        {
            bool result = _supportedBodies.ContainsKey(frameType);

            if (!result)
            {
                _logger.Warn("AMQDataBlockDecoder does not handle frame type " + frameType);
            }

            return result;
        }

        /// <summary>
        /// Reads one frame (header, body and frame-end marker) from <paramref name="input"/>.
        /// </summary>
        /// <param name="input">Buffer positioned at the frame type byte.</param>
        /// <returns>The populated <see cref="AMQFrame"/>.</returns>
        /// <exception cref="AMQFrameDecodingException">
        /// No body factory is registered for the frame type.
        /// </exception>
        /// <exception cref="FormatException">
        /// The byte following the body is not the frame-end marker 0xCE.
        /// </exception>
        protected Object CreateAndPopulateFrame(ByteBuffer input)
        {
            byte type = input.GetByte();
            ushort channel = input.GetUInt16();
            uint bodySize = input.GetUInt32();

            IBodyFactory bodyFactory = (IBodyFactory)_supportedBodies[type];
            if (bodyFactory == null)
            {
                throw new AMQFrameDecodingException("Unsupported body type: " + type);
            }

            AMQFrame frame = new AMQFrame();
            frame.PopulateFromBuffer(input, channel, bodySize, bodyFactory);

            // Every frame must be terminated by the frame-end marker.
            byte marker = input.GetByte();
            if (marker != FrameEndMarker)
            {
                throw new FormatException("marker is not 0xCE");
            }
            return frame;
        }

        /// <summary>
        /// Decodes one frame from <paramref name="input"/> and writes it to
        /// <paramref name="output"/>.
        /// </summary>
        /// <param name="input">Buffer positioned at the start of a frame.</param>
        /// <param name="output">Sink that receives the decoded frame.</param>
        /// <returns>Always <c>OK</c>; failures surface as exceptions.</returns>
        public MessageDecoderResult Decode(ByteBuffer input, IProtocolDecoderOutput output)
        {
            output.Write(CreateAndPopulateFrame(input));

            return MessageDecoderResult.OK;
        }

        /// <summary>
        /// When set to <c>true</c>, <see cref="Decodable"/> answers <c>NOT_OK</c> for every buffer.
        /// </summary>
        public bool Disabled
        {
            set
            {
                _disabled = value;
            }
        }
    }
}