diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java | 449 |
1 files changed, 449 insertions, 0 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java new file mode 100644 index 0000000000..ffd5e750b4 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java @@ -0,0 +1,449 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import java.io.PrintWriter; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.UUID; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.security.auth.callback.CallbackHandler; +import org.apache.qpid.amqp_1_0.codec.FrameWriter; +import org.apache.qpid.amqp_1_0.codec.ProtocolHandler; +import org.apache.qpid.amqp_1_0.framing.AMQFrame; +import org.apache.qpid.amqp_1_0.framing.OversizeFrameException; +import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler; +import org.apache.qpid.amqp_1_0.transport.CallbackHandlerSource; +import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; +import org.apache.qpid.amqp_1_0.transport.Container; +import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.FrameBody; +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.configuration.ConfigStore; +import org.apache.qpid.server.configuration.ConnectionConfigType; +import org.apache.qpid.server.protocol.v1_0.Connection_1_0; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; + +public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler +{ + private long _readBytes; + private long _writtenBytes; + private final UUID _id; + private final IApplicationRegistry _appRegistry; + private long _createTime = System.currentTimeMillis(); + private ConnectionEndpoint _conn; + private long _connectionId; + + private static final ByteBuffer HEADER = + ByteBuffer.wrap(new byte[] + { + (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte) 3, + (byte) 1, + (byte) 0, + (byte) 0 + }); + + private static final ByteBuffer PROTOCOL_HEADER = + ByteBuffer.wrap(new byte[] + { + (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte) 0, + (byte) 1, + (byte) 0, + (byte) 0 + }); + + + private FrameWriter _frameWriter; + private ProtocolHandler _frameHandler; + private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024); + private Object _sendLock = new Object(); + private byte _major; + private byte _minor; + private byte _revision; + private PrintWriter _out; + private NetworkConnection _network; + private Sender<ByteBuffer> _sender; + + + static enum State { + A, + M, + Q, + P, + PROTOCOL, + MAJOR, + MINOR, + REVISION, + FRAME + } + + private State _state = State.A; + + + public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final IApplicationRegistry appRegistry, + long id) + { + _id = appRegistry.getConfigStore().createId(); + _connectionId = id; + _appRegistry = appRegistry; + + if(networkDriver != null) + { + setNetworkConnection(networkDriver, networkDriver.getSender()); + } + } + + + public SocketAddress getRemoteAddress() + { + return _network.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _network.getLocalAddress(); + } + + public long getReadBytes() + { + return _readBytes; + } + + public long getWrittenBytes() + { + return _writtenBytes; + } + + public void writerIdle() + { + //Todo + } + + public void readerIdle() + { + //Todo + } + + public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) + { + _network = network; + _sender = sender; + + Container container = new Container(_appRegistry.getBrokerId().toString()); + + _conn = new ConnectionEndpoint(container, asCallbackHandlerSource(ApplicationRegistry.getInstance() + .getAuthenticationManager())); + _conn.setConnectionEventListener(new Connection_1_0(_appRegistry)); + _conn.setRemoteAddress(getRemoteAddress()); + + + _conn.setFrameOutputHandler(this); + _conn.setSaslFrameOutput(this); + + _conn.setOnSaslComplete(new Runnable() + { + + + public void run() + { + if(_conn.isAuthenticated()) + { + _sender.send(PROTOCOL_HEADER.duplicate()); + _sender.flush(); + } + else + { + _network.close(); + } + } + }); + _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry()); + _frameHandler = new SASLFrameHandler(_conn); + + _sender.send(HEADER.duplicate()); + _sender.flush(); + + _conn.initiateSASL(); + + + } + + private CallbackHandlerSource asCallbackHandlerSource(final AuthenticationManager authenticationManager) + { + return new CallbackHandlerSource() + { + @Override + public CallbackHandler getHandler(String mechanism) + { + return authenticationManager.getHandler(mechanism); + } + }; + } + + public String getAddress() + { + return getRemoteAddress().toString(); + } + + + public ConfigStore getConfigStore() + { + return _appRegistry.getConfigStore(); + } + + public UUID getId() + { + return _id; + } + + public ConnectionConfigType getConfigType() + { + return ConnectionConfigType.getInstance(); + } + + public boolean isDurable() + { + return false; + } + + private final Logger RAW_LOGGER = Logger.getLogger("RAW"); + + + public synchronized void received(ByteBuffer msg) + { + if(RAW_LOGGER.isLoggable(Level.FINE)) + { + ByteBuffer dup = msg.duplicate(); + byte[] data = new byte[dup.remaining()]; + dup.get(data); + Binary bin = new Binary(data); + RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString()); + } + _readBytes += msg.remaining(); + switch(_state) + { + case A: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + break; + } + case M: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.M; + break; + } + + case Q: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.Q; + break; + } + case P: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.P; + break; + } + case PROTOCOL: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.PROTOCOL; + break; + } + case MAJOR: + if(msg.hasRemaining()) + { + _major = msg.get(); + } + else + { + _state = State.MAJOR; + break; + } + case MINOR: + if(msg.hasRemaining()) + { + _minor = msg.get(); + } + else + { + _state = State.MINOR; + break; + } + case REVISION: + if(msg.hasRemaining()) + { + _revision = msg.get(); + + _state = State.FRAME; + } + else + { + _state = State.REVISION; + break; + } + case FRAME: + if(msg.hasRemaining()) + { + _frameHandler = _frameHandler.parse(msg); + } + } + + } + + public void exception(Throwable t) + { + t.printStackTrace(); + } + + public void closed() + { + // todo + _conn.inputClosed(); + if(_conn != null && _conn.getConnectionEventListener() != null) + { + ((Connection_1_0)_conn.getConnectionEventListener()).closed(); + } + + } + + public long getCreateTime() + { + return _createTime; + } + + + public boolean canSend() + { + return true; + } + + public void send(final AMQFrame amqFrame) + { + send(amqFrame, null); + } + + private static final Logger FRAME_LOGGER = Logger.getLogger("FRM"); + + + public void send(final AMQFrame amqFrame, ByteBuffer buf) + { + + synchronized(_sendLock) + { + + if(FRAME_LOGGER.isLoggable(Level.FINE)) + { + FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody()); + } + + + _frameWriter.setValue(amqFrame); + + + + ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize()); + + int size = _frameWriter.writeToBuffer(dup); + if(size > _conn.getMaxFrameSize()) + { + throw new OversizeFrameException(amqFrame,size); + } + + dup.flip(); + _writtenBytes += dup.limit(); + + if(RAW_LOGGER.isLoggable(Level.FINE)) + { + ByteBuffer dup2 = dup.duplicate(); + byte[] data = new byte[dup2.remaining()]; + dup2.get(data); + Binary bin = new Binary(data); + RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString()); + } + + + _sender.send(dup); + _sender.flush(); + + + } + } + + public void send(short channel, FrameBody body) + { + AMQFrame frame = AMQFrame.createAMQFrame(channel, body); + send(frame); + + } + + public void close() + { + _sender.close(); + } + + public void setLogOutput(final PrintWriter out) + { + _out = out; + } + + public long getConnectionId() + { + return _connectionId; + } + +} |