diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-09-27 15:23:04 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-09-27 15:23:04 +0000 |
commit | e1c164dd219a1d81dba0ad7f9ac185b0292a5660 (patch) | |
tree | 4531f83f34f0a2ac7317fda0711692ba0f536b28 | |
parent | b89747e29966e0d78e59974ae54ee5ee5a91c669 (diff) | |
download | qpid-python-e1c164dd219a1d81dba0ad7f9ac185b0292a5660.tar.gz |
QPID-2801: Implement LogSubject Interface in 0-10 Subscription/ServerConnection/ServerSession Objects
Committed patch from Sorins <ssuciu@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1001779 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 135 insertions, 18 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 2510c12eae..9f97d479e3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -20,41 +20,64 @@ */ package org.apache.qpid.server.subscription; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT; + import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.configuration.ConfigStore; +import org.apache.qpid.server.configuration.ConfiguredObject; +import org.apache.qpid.server.configuration.SessionConfig; +import org.apache.qpid.server.configuration.SubscriptionConfig; +import org.apache.qpid.server.configuration.SubscriptionConfigType; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.CreditCreditManager; import org.apache.qpid.server.flow.WindowCreditManager; import org.apache.qpid.server.flow.FlowCreditManager_0_10; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.SubscriptionActor; -import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject; -import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.SubscriptionActor; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.transport.ServerSession; -import org.apache.qpid.server.configuration.*; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageDeliveryPriority; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.Struct; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; -import org.apache.qpid.transport.*; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.ConcurrentHashMap; -import java.util.*; import java.nio.ByteBuffer; -public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig +public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject { private static final AtomicLong idGenerator = new AtomicLong(0); @@ -94,7 +117,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private ConcurrentHashMap<Integer, QueueEntry> _sentMap = new ConcurrentHashMap<Integer, QueueEntry>(); private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0]; - private LogSubject _logSubject; private LogActor _logActor; private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); private UUID _id; @@ -157,7 +179,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr _trace = (String) arguments.get("qpid.trace.id"); _id = getConfigStore().createId(); getConfigStore().addConfiguredObject(this); - _logSubject = new SubscriptionLogSubject(this); _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this); } @@ -874,4 +895,49 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { return _createTime; } + + public String toLogString() + { + String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(), + _queue.getNameShortString()); + String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "(" + // queueString is "vh(/{0})/qu({1}) " so need to trim + + queueInfo.substring(0, queueInfo.length() - 1) + ")" + "] "; + return result; + } + + private String getFilterLogString() + { + StringBuilder filterLogString = new StringBuilder(); + String delimiter = ", "; + boolean hasEntries = false; + if (_filters != null && _filters.hasFilters()) + { + filterLogString.append(_filters.toString()); + hasEntries = true; + } + + if (isBrowser()) + { + if (hasEntries) + { + filterLogString.append(delimiter); + } + filterLogString.append("Browser"); + hasEntries = true; + } + + if (isDurable()) + { + if (hasEntries) + { + filterLogString.append(delimiter); + } + filterLogString.append("Durable"); + hasEntries = true; + } + + return filterLogString.toString(); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index 8c7b374791..3c924f3231 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -20,6 +20,11 @@ */ package org.apache.qpid.server.transport; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; + import org.apache.qpid.server.configuration.ConnectionConfig; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.protocol.AMQConnectionModel; @@ -38,6 +43,8 @@ import org.apache.qpid.transport.ExecutionErrorCode; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.AMQException; +import java.text.MessageFormat; + public class ServerConnection extends Connection implements AMQConnectionModel { private ConnectionConfig _config; @@ -120,4 +127,39 @@ public class ServerConnection extends Connection implements AMQConnectionModel ((ServerSession)session).close(); } + + public String toLogString() { + boolean hasVirtualHost = (null != this.getVirtualHost()); + boolean hasPrincipal = (null != getAuthorizationID()); + + if (hasPrincipal && hasVirtualHost) + { + return " [" + + MessageFormat.format(CONNECTION_FORMAT, + getConnectionId(), + getAuthorizationID(), + _config.getAddress(), + getVirtualHost().getName()) + + "] "; + } + else if (hasPrincipal) + { + return " [" + + MessageFormat.format(USER_FORMAT, + getConnectionId(), + getAuthorizationID(), + _config.getAddress()) + + "] "; + + } + else + { + return " [" + + MessageFormat.format(SOCKET_FORMAT, + this.getConnectionId(), + _config.getAddress()) + + "] "; + } + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 7393b17243..1f4e32a3e0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -20,9 +20,13 @@ */ package org.apache.qpid.server.transport; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; +import static org.apache.qpid.util.Serial.gt; + import com.sun.security.auth.UserPrincipal; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.ConnectionConfig; @@ -51,10 +55,10 @@ import org.apache.qpid.transport.Range; import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionDelegate; -import static org.apache.qpid.util.Serial.gt; import java.lang.ref.WeakReference; import java.security.Principal; +import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -67,7 +71,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; -public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel +public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject { private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); @@ -581,14 +585,19 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo public LogSubject getLogSubject() { - return new LogSubject() - { - public String toLogString() - { - return "[ ]"; - } + return (LogSubject) this; + } + + @Override + public String toLogString() + { + return " [" + + MessageFormat.format(CHANNEL_FORMAT, getId().toString(), getClientID(), + ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(), + this.getVirtualHost().getName(), + this.getChannel()) + + "] "; - }; } } |