diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java | 319 |
1 files changed, 319 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java new file mode 100644 index 0000000000..393a350c07 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java @@ -0,0 +1,319 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.protocol; + +import java.nio.ByteBuffer; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConnectionMessages; +import org.apache.qpid.server.protocol.BrokerReceiverFactory.VERSION; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.transport.ServerConnection; +import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.NetworkTransport; + +public class BrokerReceiver implements Receiver<java.nio.ByteBuffer> +{ + private static final Logger _logger = Logger.getLogger(BrokerReceiver.class); + + private static final AtomicLong _idGenerator = new AtomicLong(0); + + private NetworkConnection _network; + private NetworkTransport _transport; + private Sender<ByteBuffer> _sender; + private Set<VERSION> _supported; + private String _fqdn; + private IApplicationRegistry _appRegistry; + private long _conenctionId; + + private volatile Receiver<java.nio.ByteBuffer> _delegate = new SelfDelegateProtocolEngine(); + + public BrokerReceiver(IApplicationRegistry appRegistry, + String fqdn, + Set<VERSION> supported, + NetworkTransport transport, + NetworkConnection network) + { + _appRegistry = appRegistry; + _fqdn = fqdn; + _supported = supported; + _transport = transport; + _network = network; + _sender = _network.getSender(); + _conenctionId = _idGenerator.incrementAndGet(); + + CurrentActor.get().message(ConnectionMessages.OPEN(null, null, false, false)); + } + + public void closed() + { + _delegate.closed(); + _network.close(); + } + + public void received(ByteBuffer msg) + { + _delegate.received(msg); + } + + public void exception(Throwable t) + { + _delegate.exception(t); + } + + private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8; + + private static final byte[] AMQP_0_8_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 1, + (byte) 1, + (byte) 8, + (byte) 0 + }; + + private static final byte[] AMQP_0_9_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 1, + (byte) 1, + (byte) 0, + (byte) 9 + }; + +private static final byte[] AMQP_0_9_1_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 0, + (byte) 0, + (byte) 9, + (byte) 1 + }; + + + private static final byte[] AMQP_0_10_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 1, + (byte) 1, + (byte) 0, + (byte) 10 + }; + + private static interface DelegateCreator + { + VERSION getVersion(); + byte[] getHeaderIdentifier(); + Receiver<java.nio.ByteBuffer> getProtocolEngine(); + } + + private DelegateCreator creator_0_8 = new DelegateCreator() + { + public VERSION getVersion() + { + return VERSION.v0_8; + } + + public byte[] getHeaderIdentifier() + { + return AMQP_0_8_HEADER; + } + + public Receiver<java.nio.ByteBuffer> getProtocolEngine() + { + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _transport, _network, _sender, _conenctionId); + } + }; + + private DelegateCreator creator_0_9 = new DelegateCreator() + { + public VERSION getVersion() + { + return VERSION.v0_9; + } + + public byte[] getHeaderIdentifier() + { + return AMQP_0_9_HEADER; + } + + public Receiver<java.nio.ByteBuffer> getProtocolEngine() + { + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _transport, _network, _sender, _conenctionId); + } + }; + + private DelegateCreator creator_0_9_1 = new DelegateCreator() + { + public VERSION getVersion() + { + return VERSION.v0_9_1; + } + + public byte[] getHeaderIdentifier() + { + return AMQP_0_9_1_HEADER; + } + + public Receiver<java.nio.ByteBuffer> getProtocolEngine() + { + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _transport, _network, _sender, _conenctionId); + } + }; + + private DelegateCreator creator_0_10 = new DelegateCreator() + { + public VERSION getVersion() + { + return VERSION.v0_10; + } + + public byte[] getHeaderIdentifier() + { + return AMQP_0_10_HEADER; + } + + public Receiver<java.nio.ByteBuffer> getProtocolEngine() + { + final ConnectionDelegate connDelegate = new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn); + + ServerConnection conn = new ServerConnection(_conenctionId); + conn.setConnectionDelegate(connDelegate); + + return new ProtocolEngine_0_10(conn, _appRegistry, _network); + } + }; + + private final DelegateCreator[] _creators = new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 }; + + + private class ClosedDelegateProtocolEngine implements Receiver<java.nio.ByteBuffer> + { + public void received(ByteBuffer msg) + { + _logger.error("Error processing incoming data, could not negotiate a common protocol"); + } + + public void exception(Throwable t) + { + _logger.error("Error establishing session", t); + } + + public void closed() + { + + } + } + + private class SelfDelegateProtocolEngine implements Receiver<java.nio.ByteBuffer> + { + private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES); + + public void received(ByteBuffer msg) + { + ByteBuffer msgheader = msg.duplicate(); + if(_header.remaining() > msgheader.limit()) + { + msg.position(msg.limit()); + } + else + { + msgheader.limit(_header.remaining()); + msg.position(_header.remaining()); + } + + _header.put(msgheader); + + if(!_header.hasRemaining()) + { + _header.flip(); + byte[] headerBytes = new byte[MINIMUM_REQUIRED_HEADER_BYTES]; + _header.get(headerBytes); + + + Receiver<java.nio.ByteBuffer> newDelegate = null; + byte[] newestSupported = null; + + for(int i = 0; newDelegate == null && i < _creators.length; i++) + { + + if(_supported.contains(_creators[i].getVersion())) + { + newestSupported = _creators[i].getHeaderIdentifier(); + byte[] compareBytes = _creators[i].getHeaderIdentifier(); + boolean equal = true; + for(int j = 0; equal && j<compareBytes.length; j++) + { + equal = headerBytes[j] == compareBytes[j]; + } + if(equal) + { + newDelegate = _creators[i].getProtocolEngine(); + } + } + } + + // If no delegate is found then send back the most recent support protocol version id + if(newDelegate == null) + { + _sender.send(ByteBuffer.wrap(newestSupported)); + _sender.close(); + _delegate = new ClosedDelegateProtocolEngine(); + } + else + { + _delegate = newDelegate; + + _header.flip(); + _delegate.received(_header); + if(msg.hasRemaining()) + { + _delegate.received(msg); + } + } + } + } + + public void exception(Throwable t) + { + _logger.error("Error establishing session", t); + } + + public void closed() + { + + } + } +} |