summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-11-07 12:36:36 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-11-07 12:36:36 +0000
commit793a6b31f7ad6b27f0683eef52ae92e05d7f8fee (patch)
tree0bbd157b996c54e27bfbaadf6de6f0153909d630
parentc346e8d12bd8a287695f9a427e22e9c837b866cb (diff)
downloadqpid-python-793a6b31f7ad6b27f0683eef52ae92e05d7f8fee.tar.gz
QPID-160 Addition of JMSXUserID to all messages through the java broker.
As this will cause the headers to be re-encoded it can be disabled in the config.xml. Default is enabled as the sample config.xml should have all features enabled so that testing can observe the interactions. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@592729 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/etc/config.xml7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java199
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java26
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java35
5 files changed, 161 insertions, 115 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml
index b5b81bbeb0..2257a612b3 100644
--- a/java/broker/etc/config.xml
+++ b/java/broker/etc/config.xml
@@ -8,9 +8,9 @@
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- -
+ -
- http://www.apache.org/licenses/LICENSE-2.0
- -
+ -
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -50,6 +50,7 @@
<enableDirectBuffers>false</enableDirectBuffers>
<framesize>65535</framesize>
<compressBufferOnQueue>false</compressBufferOnQueue>
+ <enableJMSXUserID>true</enableJMSXUserID>
</advanced>
<security>
@@ -84,7 +85,7 @@
<jmx>
<access>${conf}/jmxremote.access</access>
<principal-database>passwordfile</principal-database>
- </jmx>
+ </jmx>
</security>
<virtualhosts>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index d3b459c48a..17300d6b50 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -23,7 +23,9 @@ package org.apache.qpid.server;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
@@ -43,6 +45,7 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.configuration.Configurator;
import java.util.Collection;
import java.util.HashMap;
@@ -118,9 +121,17 @@ public class AMQChannel
private final AMQProtocolSession _session;
private boolean _closing;
+ @Configured(path = "advanced.enableJMSXUserID",
+ defaultValue = "true")
+ public boolean ENABLE_JMSXUserID;
+
+
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
- throws AMQException
+ throws AMQException
{
+ //Set values from configuration
+ Configurator.configure(this);
+
_session = session;
_channelId = channelId;
_storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
@@ -199,7 +210,7 @@ public class AMQChannel
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession)
- throws AMQException
+ throws AMQException
{
if (_currentMessage == null)
{
@@ -212,6 +223,16 @@ public class AMQChannel
_log.trace(debugIdentity() + "Content header received on channel " + _channelId);
}
+ if (ENABLE_JMSXUserID)
+ {
+ //Set JMSXUserID
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeaderBody.properties;
+ //fixme: fudge for QPID-677
+ properties.getHeaders().keySet();
+
+ properties.setUserId(protocolSession.getAuthorizedID().getName());
+ }
+
_currentMessage.setContentHeaderBody(contentHeaderBody);
_currentMessage.setExpiration();
@@ -245,8 +266,8 @@ public class AMQChannel
// returns true iff the message was delivered (i.e. if all data was
// received
if (_currentMessage.addContentBodyFrame(_storeContext,
- protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
- contentBody)))
+ protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
+ contentBody)))
{
// callback to allow the context to do any post message processing
// primary use is to allow message return processing in the non-tx case
@@ -303,7 +324,7 @@ public class AMQChannel
* @throws AMQException if something goes wrong
*/
public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
+ FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -327,18 +348,19 @@ public class AMQChannel
{
_log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size());
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
+ {
- public boolean callback(UnacknowledgedMessage message) throws AMQException
- {
- _log.debug(message);
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ _log.debug(message);
- return true;
- }
+ return true;
+ }
- public void visitComplete()
- { }
- });
+ public void visitComplete()
+ {
+ }
+ });
}
AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
@@ -418,7 +440,7 @@ public class AMQChannel
if (_log.isDebugEnabled())
{
_log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag
- + ") with a queue(" + queue + ") for " + consumerTag);
+ + ") with a queue(" + queue + ") for " + consumerTag);
}
}
}
@@ -464,7 +486,7 @@ public class AMQChannel
// if (_nonTransactedContext == null)
{
_nonTransactedContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
+ new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
@@ -527,7 +549,7 @@ public class AMQChannel
// if (_nonTransactedContext == null)
{
_nonTransactedContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
+ new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
@@ -547,7 +569,7 @@ public class AMQChannel
else
{
_log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity()
- + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
+ + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
// _log.error("Requested requeue of message:" + deliveryTag +
// " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
//
@@ -558,25 +580,26 @@ public class AMQChannel
else
{
_log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
- + _unacknowledgedMessageMap.size());
+ + _unacknowledgedMessageMap.size());
if (_log.isDebugEnabled())
{
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
- int count = 0;
+ {
+ int count = 0;
- public boolean callback(UnacknowledgedMessage message) throws AMQException
- {
- _log.debug(
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ _log.debug(
(count++) + ": (" + message.message.debugIdentity() + ")" + "[" + message.deliveryTag + "]");
- return false; // Continue
- }
+ return false; // Continue
+ }
- public void visitComplete()
- { }
- });
+ public void visitComplete()
+ {
+ }
+ });
}
}
@@ -603,53 +626,54 @@ public class AMQChannel
// Marking messages who still have a consumer for to be resent
// and those that don't to be requeued.
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- public boolean callback(UnacknowledgedMessage message) throws AMQException
+ AMQShortString consumerTag = message.consumerTag;
+ AMQMessage msg = message.message;
+ msg.setRedelivered(true);
+ if (consumerTag != null)
{
- AMQShortString consumerTag = message.consumerTag;
- AMQMessage msg = message.message;
- msg.setRedelivered(true);
- if (consumerTag != null)
+ // Consumer exists
+ if (_consumerTag2QueueMap.containsKey(consumerTag))
{
- // Consumer exists
- if (_consumerTag2QueueMap.containsKey(consumerTag))
- {
- msgToResend.add(message);
- }
- else // consumer has gone
- {
- msgToRequeue.add(message);
- }
+ msgToResend.add(message);
}
- else
+ else // consumer has gone
+ {
+ msgToRequeue.add(message);
+ }
+ }
+ else
+ {
+ // Message has no consumer tag, so was "delivered" to a GET
+ // or consumer no longer registered
+ // cannot resend, so re-queue.
+ if (message.queue != null)
{
- // Message has no consumer tag, so was "delivered" to a GET
- // or consumer no longer registered
- // cannot resend, so re-queue.
- if (message.queue != null)
+ if (requeue)
{
- if (requeue)
- {
- msgToRequeue.add(message);
- }
- else
- {
- _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
- }
+ msgToRequeue.add(message);
}
else
{
- _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
+ _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
}
}
-
- // false means continue processing
- return false;
+ else
+ {
+ _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
+ }
}
- public void visitComplete()
- { }
- });
+ // false means continue processing
+ return false;
+ }
+
+ public void visitComplete()
+ {
+ }
+ });
// Process Messages to Resend
if (_log.isDebugEnabled())
@@ -704,7 +728,7 @@ public class AMQChannel
if (_log.isDebugEnabled())
{
_log.debug("Subscription(" + System.identityHashCode(sub)
- + ") closed during resend so requeuing message");
+ + ") closed during resend so requeuing message");
}
// move this message to requeue
msgToRequeue.add(message);
@@ -714,7 +738,7 @@ public class AMQChannel
if (_log.isDebugEnabled())
{
_log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:"
- + System.identityHashCode(sub));
+ + System.identityHashCode(sub));
}
sub.addToResendQueue(msg);
@@ -728,7 +752,7 @@ public class AMQChannel
if (_log.isInfoEnabled())
{
_log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
- + ")to prevent loss");
+ + ")to prevent loss");
}
// move this message to requeue
msgToRequeue.add(message);
@@ -752,7 +776,7 @@ public class AMQChannel
if (_nonTransactedContext == null)
{
_nonTransactedContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
+ new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
@@ -786,29 +810,30 @@ public class AMQChannel
public void queueDeleted(final AMQQueue queue) throws AMQException
{
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- public boolean callback(UnacknowledgedMessage message) throws AMQException
+ if (message.queue == queue)
{
- if (message.queue == queue)
+ try
{
- try
- {
- message.discard(_storeContext);
- message.queue = null;
- }
- catch (AMQException e)
- {
- _log.error(
+ message.discard(_storeContext);
+ message.queue = null;
+ }
+ catch (AMQException e)
+ {
+ _log.error(
"Error decrementing ref count on message " + message.message.getMessageId() + ": " + e, e);
- }
}
-
- return false;
}
- public void visitComplete()
- { }
- });
+ return false;
+ }
+
+ public void visitComplete()
+ {
+ }
+ });
}
/**
@@ -856,8 +881,8 @@ public class AMQChannel
boolean suspend;
suspend =
- ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark))
- || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()));
+ ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark))
+ || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()));
setSuspended(suspend);
}
@@ -942,7 +967,7 @@ public class AMQChannel
{
AMQMessage message = bouncedMessage.getAMQMessage();
session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
- new AMQShortString(bouncedMessage.getMessage()));
+ new AMQShortString(bouncedMessage.getMessage()));
message.decrementReference(_storeContext);
}
@@ -959,7 +984,7 @@ public class AMQChannel
else
{
boolean willSuspend =
- ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark));
+ ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark));
if (!willSuspend)
{
final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();
diff --git a/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
index a5e89ef4fc..07d5d5dd32 100644
--- a/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
+++ b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
@@ -29,9 +29,10 @@ import org.apache.qpid.framing.AMQShortString;
public enum CustomJMSXProperty
{
JMS_AMQP_NULL,
- JMS_QPID_DESTTYPE,
+ JMS_QPID_DESTTYPE,
JMSXGroupID,
- JMSXGroupSeq;
+ JMSXGroupSeq,
+ JMSXUserID;
private final AMQShortString _nameAsShortString;
@@ -47,7 +48,7 @@ public enum CustomJMSXProperty
}
private static Enumeration _names;
-
+
public static synchronized Enumeration asEnumeration()
{
if(_names == null)
@@ -60,6 +61,6 @@ public enum CustomJMSXProperty
}
_names = Collections.enumeration(nameList);
}
- return _names;
+ return _names;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 2dfeb19268..b029770946 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -73,11 +73,11 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
_headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
_strictAMQP =
- Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
+ Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
}
protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
this(contentHeader, deliveryTag);
@@ -201,7 +201,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
if (!(destination instanceof AMQDestination))
{
throw new IllegalArgumentException(
- "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
+ "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
}
final AMQDestination amqd = (AMQDestination) destination;
@@ -391,12 +391,26 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public String getStringProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+
+ if (propertyName.startsWith("JMSX"))
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
+ if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
+ {
+ return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
+ }
+
+ return null;
}
+ else
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
- return getJmsHeaders().getString(propertyName);
+ return getJmsHeaders().getString(propertyName);
+ }
}
public Object getObjectProperty(String propertyName) throws JMSException
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
index dce9667ff2..d2a7ba301b 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
@@ -59,6 +59,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
private final List<String> messages = new ArrayList<String>();
private int _count = 1;
public String _connectionString = "vm://:1";
+ private static final String USERNAME = "guest";
protected void setUp() throws Exception
{
@@ -109,7 +110,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
_logger.error("Run Number:" + run++);
try
{
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
+ init(new AMQConnection(_connectionString, USERNAME, "guest", randomize("Client"), "test"));
}
catch (Exception e)
{
@@ -175,7 +176,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
_logger.trace("Message:" + m);
Assert.assertEquals("Check temp queue has been set correctly", m.getJMSReplyTo().toString(),
- m.getStringProperty("TempQueue"));
+ m.getStringProperty("TempQueue"));
m.setJMSType("Test");
m.setLongProperty("UnsignedInt", (long) 4294967295L);
@@ -210,7 +211,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
try
{
((AMQMessage) m).setDecimalProperty(new AMQShortString("decimal-bad-scale"),
- bd.setScale(Byte.MAX_VALUE + 1));
+ bd.setScale(Byte.MAX_VALUE + 1));
fail("UnsupportedOperationException should be thrown as scale can't be correctly transmitted");
}
catch (UnsupportedOperationException uoe)
@@ -248,47 +249,51 @@ public class PropertyValueTest extends TestCase implements MessageListener
Assert.assertEquals("Check Boolean properties are correctly transported", true, m.getBooleanProperty("Bool"));
Assert.assertEquals("Check Byte properties are correctly transported", (byte) Byte.MAX_VALUE,
- m.getByteProperty("Byte"));
+ m.getByteProperty("Byte"));
Assert.assertEquals("Check Double properties are correctly transported", (double) Double.MAX_VALUE,
- m.getDoubleProperty("Double"));
+ m.getDoubleProperty("Double"));
Assert.assertEquals("Check Float properties are correctly transported", (float) Float.MAX_VALUE,
- m.getFloatProperty("Float"));
+ m.getFloatProperty("Float"));
Assert.assertEquals("Check Int properties are correctly transported", (int) Integer.MAX_VALUE,
- m.getIntProperty("Int"));
+ m.getIntProperty("Int"));
Assert.assertEquals("Check CorrelationID properties are correctly transported", "Correlation",
- m.getJMSCorrelationID());
+ m.getJMSCorrelationID());
Assert.assertEquals("Check Priority properties are correctly transported", 8, m.getJMSPriority());
// Queue
Assert.assertEquals("Check ReplyTo properties are correctly transported", m.getStringProperty("TempQueue"),
- m.getJMSReplyTo().toString());
+ m.getJMSReplyTo().toString());
Assert.assertEquals("Check Type properties are correctly transported", "Test", m.getJMSType());
Assert.assertEquals("Check Short properties are correctly transported", (short) Short.MAX_VALUE,
- m.getShortProperty("Short"));
+ m.getShortProperty("Short"));
Assert.assertEquals("Check UnsignedInt properties are correctly transported", (long) 4294967295L,
- m.getLongProperty("UnsignedInt"));
+ m.getLongProperty("UnsignedInt"));
Assert.assertEquals("Check Long properties are correctly transported", (long) Long.MAX_VALUE,
- m.getLongProperty("Long"));
+ m.getLongProperty("Long"));
Assert.assertEquals("Check String properties are correctly transported", "Test", m.getStringProperty("String"));
// AMQP Tests Specific values
Assert.assertEquals("Check Timestamp properties are correctly transported", m.getStringProperty("time-str"),
- ((AMQMessage) m).getTimestampProperty(new AMQShortString("time")).toString());
+ ((AMQMessage) m).getTimestampProperty(new AMQShortString("time")).toString());
// Decimal
BigDecimal bd = new BigDecimal(Integer.MAX_VALUE);
Assert.assertEquals("Check decimal properties are correctly transported", bd.setScale(Byte.MAX_VALUE),
- ((AMQMessage) m).getDecimalProperty(new AMQShortString("decimal")));
+ ((AMQMessage) m).getDecimalProperty(new AMQShortString("decimal")));
// Void
((AMQMessage) m).setVoidProperty(new AMQShortString("void"));
Assert.assertTrue("Check void properties are correctly transported",
- ((AMQMessage) m).getPropertyHeaders().containsKey("void"));
+ ((AMQMessage) m).getPropertyHeaders().containsKey("void"));
+
+ //JMSXUserID
+ Assert.assertEquals("Check 'JMSXUserID' is supported ", USERNAME,
+ m.getStringProperty("JMSXUserID"));
}
received.clear();