summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-09-27 15:23:04 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-09-27 15:23:04 +0000
commite1c164dd219a1d81dba0ad7f9ac185b0292a5660 (patch)
tree4531f83f34f0a2ac7317fda0711692ba0f536b28
parentb89747e29966e0d78e59974ae54ee5ee5a91c669 (diff)
downloadqpid-python-e1c164dd219a1d81dba0ad7f9ac185b0292a5660.tar.gz
QPID-2801: Implement LogSubject Interface in 0-10 Subscription/ServerConnection/ServerSession Objects
Committed patch from Sorins <ssuciu@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1001779 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java84
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java42
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java27
3 files changed, 135 insertions, 18 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 2510c12eae..9f97d479e3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -20,41 +20,64 @@
*/
package org.apache.qpid.server.subscription;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT;
+
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.configuration.SubscriptionConfig;
+import org.apache.qpid.server.configuration.SubscriptionConfigType;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.CreditCreditManager;
import org.apache.qpid.server.flow.WindowCreditManager;
import org.apache.qpid.server.flow.FlowCreditManager_0_10;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.SubscriptionActor;
-import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
-import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.SubscriptionActor;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.transport.ServerSession;
-import org.apache.qpid.server.configuration.*;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.Struct;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.*;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.*;
import java.nio.ByteBuffer;
-public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig
+public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
{
private static final AtomicLong idGenerator = new AtomicLong(0);
@@ -94,7 +117,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
private ConcurrentHashMap<Integer, QueueEntry> _sentMap = new ConcurrentHashMap<Integer, QueueEntry>();
private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
- private LogSubject _logSubject;
private LogActor _logActor;
private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
private UUID _id;
@@ -157,7 +179,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
_trace = (String) arguments.get("qpid.trace.id");
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
- _logSubject = new SubscriptionLogSubject(this);
_logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
}
@@ -874,4 +895,49 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
return _createTime;
}
+
+ public String toLogString()
+ {
+ String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(),
+ _queue.getNameShortString());
+ String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "("
+ // queueString is "vh(/{0})/qu({1}) " so need to trim
+ + queueInfo.substring(0, queueInfo.length() - 1) + ")" + "] ";
+ return result;
+ }
+
+ private String getFilterLogString()
+ {
+ StringBuilder filterLogString = new StringBuilder();
+ String delimiter = ", ";
+ boolean hasEntries = false;
+ if (_filters != null && _filters.hasFilters())
+ {
+ filterLogString.append(_filters.toString());
+ hasEntries = true;
+ }
+
+ if (isBrowser())
+ {
+ if (hasEntries)
+ {
+ filterLogString.append(delimiter);
+ }
+ filterLogString.append("Browser");
+ hasEntries = true;
+ }
+
+ if (isDurable())
+ {
+ if (hasEntries)
+ {
+ filterLogString.append(delimiter);
+ }
+ filterLogString.append("Durable");
+ hasEntries = true;
+ }
+
+ return filterLogString.toString();
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index 8c7b374791..3c924f3231 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -20,6 +20,11 @@
*/
package org.apache.qpid.server.transport;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
+
import org.apache.qpid.server.configuration.ConnectionConfig;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -38,6 +43,8 @@ import org.apache.qpid.transport.ExecutionErrorCode;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.AMQException;
+import java.text.MessageFormat;
+
public class ServerConnection extends Connection implements AMQConnectionModel
{
private ConnectionConfig _config;
@@ -120,4 +127,39 @@ public class ServerConnection extends Connection implements AMQConnectionModel
((ServerSession)session).close();
}
+
+ public String toLogString() {
+ boolean hasVirtualHost = (null != this.getVirtualHost());
+ boolean hasPrincipal = (null != getAuthorizationID());
+
+ if (hasPrincipal && hasVirtualHost)
+ {
+ return " [" +
+ MessageFormat.format(CONNECTION_FORMAT,
+ getConnectionId(),
+ getAuthorizationID(),
+ _config.getAddress(),
+ getVirtualHost().getName())
+ + "] ";
+ }
+ else if (hasPrincipal)
+ {
+ return " [" +
+ MessageFormat.format(USER_FORMAT,
+ getConnectionId(),
+ getAuthorizationID(),
+ _config.getAddress())
+ + "] ";
+
+ }
+ else
+ {
+ return " [" +
+ MessageFormat.format(SOCKET_FORMAT,
+ this.getConnectionId(),
+ _config.getAddress())
+ + "] ";
+ }
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 7393b17243..1f4e32a3e0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -20,9 +20,13 @@
*/
package org.apache.qpid.server.transport;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import static org.apache.qpid.util.Serial.gt;
+
import com.sun.security.auth.UserPrincipal;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ConnectionConfig;
@@ -51,10 +55,10 @@ import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
-import static org.apache.qpid.util.Serial.gt;
import java.lang.ref.WeakReference;
import java.security.Principal;
+import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -67,7 +71,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
-public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel
+public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject
{
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
@@ -581,14 +585,19 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
public LogSubject getLogSubject()
{
- return new LogSubject()
- {
- public String toLogString()
- {
- return "[ ]";
- }
+ return (LogSubject) this;
+ }
+
+ @Override
+ public String toLogString()
+ {
+ return " [" +
+ MessageFormat.format(CHANNEL_FORMAT, getId().toString(), getClientID(),
+ ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(),
+ this.getVirtualHost().getName(),
+ this.getChannel())
+ + "] ";
- };
}
}