summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java')
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java140
1 files changed, 140 insertions, 0 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
new file mode 100644
index 0000000000..f02908391a
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedByte;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.qpid.server.message.internal.InternalMessage;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+public class MessageConverter_Internal_to_v1_0 extends MessageConverter_to_1_0<InternalMessage>
+{
+ private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+
+ public Class<InternalMessage> getInputClass()
+ {
+ return InternalMessage.class;
+ }
+
+
+ @Override
+ protected MessageMetaData_1_0 convertMetaData(final InternalMessage serverMessage,
+ final SectionEncoder sectionEncoder)
+ {
+ List<Section> sections = new ArrayList<Section>(3);
+ Header header = new Header();
+
+ header.setDurable(serverMessage.isPersistent());
+ header.setPriority(UnsignedByte.valueOf(serverMessage.getMessageHeader().getPriority()));
+ if(serverMessage.getExpiration() != 0l && serverMessage.getArrivalTime() !=0l && serverMessage.getExpiration() >= serverMessage.getArrivalTime())
+ {
+ header.setTtl(UnsignedInteger.valueOf(serverMessage.getExpiration()-serverMessage.getArrivalTime()));
+ }
+
+ sections.add(header);
+
+ Properties properties = new Properties();
+ properties.setCorrelationId(serverMessage.getMessageHeader().getCorrelationId());
+ properties.setCreationTime(new Date(serverMessage.getMessageHeader().getTimestamp()));
+ properties.setMessageId(serverMessage.getMessageHeader().getMessageId());
+ final String userId = serverMessage.getMessageHeader().getUserId();
+ if(userId != null)
+ {
+ properties.setUserId(new Binary(userId.getBytes(UTF_8)));
+ }
+ properties.setReplyTo(serverMessage.getMessageHeader().getReplyTo());
+
+ sections.add(properties);
+
+ if(!serverMessage.getMessageHeader().getHeaderNames().isEmpty())
+ {
+ ApplicationProperties applicationProperties = new ApplicationProperties(serverMessage.getMessageHeader().getHeaderMap() );
+ sections.add(applicationProperties);
+ }
+ return new MessageMetaData_1_0(sections, sectionEncoder);
+
+ }
+
+ protected Section getBodySection(final InternalMessage serverMessage, final String mimeType)
+ {
+ return convertToBody(serverMessage.getMessageBody());
+ }
+
+
+ @Override
+ public String getType()
+ {
+ return "Internal to v1-0";
+ }
+
+
+ public Section convertToBody(Object object)
+ {
+ if(object instanceof String)
+ {
+ return new AmqpValue(object);
+ }
+ else if(object instanceof byte[])
+ {
+ return new Data(new Binary((byte[])object));
+ }
+ else if(object instanceof Map)
+ {
+ return new AmqpValue(MessageConverter_to_1_0.fixMapValues((Map)object));
+ }
+ else if(object instanceof List)
+ {
+ return new AmqpValue(MessageConverter_to_1_0.fixListValues((List)object));
+ }
+ else
+ {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ try
+ {
+ ObjectOutputStream os = new ObjectOutputStream(bytesOut);
+ os.writeObject(object);
+ return new Data(new Binary(bytesOut.toByteArray()));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+}