summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java346
1 files changed, 346 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
new file mode 100644
index 0000000000..54cd709af3
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -0,0 +1,346 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
+
+import java.text.MessageFormat;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.configuration.ConnectionConfig;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionCloseCode;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.ExecutionException;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.Session;
+
+public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject
+{
+ private ConnectionConfig _config;
+ private Runnable _onOpenTask;
+ private AtomicBoolean _logClosed = new AtomicBoolean(false);
+ private LogActor _actor = GenericActor.getInstance(this);
+
+ private ApplicationRegistry _registry;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+
+ public ServerConnection()
+ {
+
+ }
+
+ public UUID getId()
+ {
+ return _config.getId();
+ }
+
+ @Override
+ protected void invoke(Method method)
+ {
+ super.invoke(method);
+ }
+
+ @Override
+ protected void setState(State state)
+ {
+ super.setState(state);
+
+ if (state == State.OPEN)
+ {
+ if (_onOpenTask != null)
+ {
+ _onOpenTask.run();
+ }
+ _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", true, true));
+
+ getVirtualHost().getConnectionRegistry().registerConnection(this);
+ }
+
+ if (state == State.CLOSE_RCVD || state == State.CLOSED || state == State.CLOSING)
+ {
+ if(_virtualHost != null)
+ {
+ _virtualHost.getConnectionRegistry().deregisterConnection(this);
+ }
+ }
+
+ if (state == State.CLOSED)
+ {
+ logClosed();
+ }
+ }
+
+ protected void logClosed()
+ {
+ if(_logClosed.compareAndSet(false, true))
+ {
+ CurrentActor.get().message(this, ConnectionMessages.CLOSE());
+ }
+ }
+
+ @Override
+ public ServerConnectionDelegate getConnectionDelegate()
+ {
+ return (ServerConnectionDelegate) super.getConnectionDelegate();
+ }
+
+ public void setConnectionDelegate(ServerConnectionDelegate delegate)
+ {
+ super.setConnectionDelegate(delegate);
+ }
+
+ private VirtualHost _virtualHost;
+
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public void setVirtualHost(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+
+ initialiseStatistics();
+ }
+
+ public void setConnectionConfig(final ConnectionConfig config)
+ {
+ _config = config;
+ }
+
+ public ConnectionConfig getConfig()
+ {
+ return _config;
+ }
+
+ public void onOpen(final Runnable task)
+ {
+ _onOpenTask = task;
+ }
+
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ {
+ ExecutionException ex = new ExecutionException();
+ ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR;
+ try
+ {
+ code = ExecutionErrorCode.get(cause.getCode());
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // Ignore, already set to INTERNAL_ERROR
+ }
+ ex.setErrorCode(code);
+ ex.setDescription(message);
+ ((ServerSession)session).invoke(ex);
+
+ ((ServerSession)session).close();
+ }
+
+ public LogSubject getLogSubject()
+ {
+ return (LogSubject) this;
+ }
+
+ @Override
+ public void received(ProtocolEvent event)
+ {
+ if (event.isConnectionControl())
+ {
+ CurrentActor.set(_actor);
+ }
+ else
+ {
+ ServerSession channel = (ServerSession) getSession(event.getChannel());
+ LogActor channelActor = null;
+
+ if (channel != null)
+ {
+ channelActor = channel.getLogActor();
+ }
+
+ CurrentActor.set(channelActor == null ? _actor : channelActor);
+ }
+
+ try
+ {
+ super.received(event);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ }
+
+ public String toLogString()
+ {
+ boolean hasVirtualHost = (null != this.getVirtualHost());
+ boolean hasPrincipal = (null != getAuthorizationID());
+
+ if (hasPrincipal && hasVirtualHost)
+ {
+ return "[" +
+ MessageFormat.format(CONNECTION_FORMAT,
+ getConnectionId(),
+ getClientId(),
+ getConfig().getAddress(),
+ getVirtualHost().getName())
+ + "] ";
+ }
+ else if (hasPrincipal)
+ {
+ return "[" +
+ MessageFormat.format(USER_FORMAT,
+ getConnectionId(),
+ getClientId(),
+ getConfig().getAddress())
+ + "] ";
+
+ }
+ else
+ {
+ return "[" +
+ MessageFormat.format(SOCKET_FORMAT,
+ getConnectionId(),
+ getConfig().getAddress())
+ + "] ";
+ }
+ }
+
+ public LogActor getLogActor()
+ {
+ return _actor;
+ }
+
+ @Override
+ public void close(AMQConstant cause, String message) throws AMQException
+ {
+ ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
+ try
+ {
+ replyCode = ConnectionCloseCode.get(cause.getCode());
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // Ignore
+ }
+ close(replyCode, message);
+ }
+
+ @Override
+ public List<AMQSessionModel> getSessionModels()
+ {
+ List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+ for (Session ssn : getChannels())
+ {
+ sessions.add((AMQSessionModel) ssn);
+ }
+ return sessions;
+ }
+
+ public void registerMessageDelivered(long messageSize)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
+ }
+ _virtualHost.registerMessageDelivered(messageSize);
+ }
+
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
+ }
+ _virtualHost.registerMessageReceived(messageSize, timestamp);
+ }
+
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ public void resetStatistics()
+ {
+ _messagesDelivered.reset();
+ _dataDelivered.reset();
+ _messagesReceived.reset();
+ _dataReceived.reset();
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+ _virtualHost.getApplicationRegistry().getConfiguration().isStatisticsGenerationConnectionsEnabled());
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
+ _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
+ _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
+ _dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
+}