diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-09-14 21:36:08 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-09-14 21:36:08 +0000 |
commit | 3dbaae07a361295caabd4e4fdcef50d2ae62b9ce (patch) | |
tree | 5677bb13cd26c00ec7329487257a5545436e49f6 | |
parent | 33ec74425f6eab2f9fb7de6a3c4e73a21bcfdffb (diff) | |
download | qpid-python-3dbaae07a361295caabd4e4fdcef50d2ae62b9ce.tar.gz |
QPID-6098 : [JMS AMQP 1.0 Client] use content-type of incoming messages to drive JMS message type
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1624911 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java | 200 |
1 files changed, 162 insertions, 38 deletions
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java index c266f11e32..d120e4eadf 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java @@ -19,14 +19,22 @@ package org.apache.qpid.amqp_1_0.jms.impl; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.qpid.amqp_1_0.client.Message; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.Symbol; import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence; import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; @@ -133,23 +141,76 @@ class MessageFactory } else if(bodySection instanceof Data) { - if(properties != null && ObjectMessageImpl.CONTENT_TYPE.equals(properties.getContentType())) + Data dataSection = (Data) bodySection; + + Symbol contentType = properties == null ? null : properties.getContentType(); + + if(ObjectMessageImpl.CONTENT_TYPE.equals(contentType)) { message = new ObjectMessageImpl(header, deliveryAnnotations, messageAnnotations, properties, appProperties, - (Data) bodySection, + dataSection, footer, _session); } + else if(contentType != null) + { + ContentType contentTypeObj = parseContentType(contentType.toString()); + // todo : content encoding - e.g. gzip + String contentTypeType = contentTypeObj.getType(); + String contentTypeSubType = contentTypeObj.getSubType(); + if ("text".equals(contentTypeType) + || ("application".equals(contentTypeType) + && contentTypeSubType != null + && ("xml".equals(contentTypeSubType) + || "json".equals(contentTypeSubType) + || "xml-dtd".equals(contentTypeSubType) + || "javascript".equals(contentTypeSubType) + || contentTypeSubType.endsWith("+xml") + || contentTypeSubType.endsWith("+json")))) + { + Charset charset; + if (contentTypeObj.getParameterNames().contains("charset")) + { + charset = Charset.forName(contentTypeObj.getParameter("charset").toUpperCase()); + } + else + { + charset = StandardCharsets.US_ASCII; + } + Binary binary = dataSection.getValue(); + String data = + new String(binary.getArray(), binary.getArrayOffset(), binary.getLength(), charset); + message = new TextMessageImpl(header, deliveryAnnotations, messageAnnotations, properties, + appProperties, data, footer, _session); + } + else + { + message = new BytesMessageImpl(header, + deliveryAnnotations, + messageAnnotations, + properties, + appProperties, + dataSection, + footer, + _session); + } + } else { message = new BytesMessageImpl(header, deliveryAnnotations, - messageAnnotations, properties, appProperties, (Data) bodySection, footer, _session); + messageAnnotations, + properties, + appProperties, + dataSection, + footer, + _session); } + } else if(bodySection instanceof AmqpSequence) { @@ -158,63 +219,126 @@ class MessageFactory messageAnnotations, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session ); } - - /*else if(bodySection instanceof AmqpDataSection) + else { - AmqpDataSection dataSection = (AmqpDataSection) bodySection; + message = new AmqpMessageImpl(header, + deliveryAnnotations, + messageAnnotations, properties,appProperties,body,footer, _session); + } + } + else + { + message = new AmqpMessageImpl(header, + deliveryAnnotations, + messageAnnotations, properties,appProperties,body,footer, _session); + } - List<Object> data = new ArrayList<Object>(); + message.setReadOnly(); - ListIterator<Object> dataIter = dataSection.iterator(); + return message; + } - while(dataIter.hasNext()) - { - data.add(dataIter.next()); - } - if(data.size() == 1) + static interface ContentType + { + String getType(); + String getSubType(); + Set<String> getParameterNames(); + String getParameter(String name); + } + + static ContentType parseContentType(String contentType) + { + int subTypeSeparator = contentType.indexOf("/"); + final String type = contentType.substring(0, subTypeSeparator).toLowerCase().trim(); + String subTypePart = contentType.substring(subTypeSeparator +1).toLowerCase().trim(); + if(subTypePart.contains(";")) + { + subTypePart = subTypePart.substring(0,subTypePart.indexOf(";")).trim(); + } + if(subTypePart.contains("(")) + { + subTypePart = subTypePart.substring(0,subTypePart.indexOf("(")).trim(); + } + final String subType = subTypePart; + final Map<String,String> parameters = new HashMap<>(); + + if(contentType.substring(subTypeSeparator +1).contains(";")) + { + parseContentTypeParameters(contentType.substring(contentType.indexOf(";",subTypeSeparator +1)+1), parameters); + } + return new ContentType() { - final Object obj = data.get(0); - if( obj instanceof String) + @Override + public String getType() { - message = new TextMessageImpl(header,properties,appProperties,(String) data.get(0),footer, _session); + return type; } - else if(obj instanceof JavaSerializable) + + @Override + public String getSubType() { - // TODO - ObjectMessage - message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session); + return subType; } - else if(obj instanceof Serializable) + + @Override + public Set<String> getParameterNames() { - message = new ObjectMessageImpl(header,properties,footer,appProperties,(Serializable)obj, _session); + return Collections.unmodifiableMap(parameters).keySet(); } - else + + @Override + public String getParameter(final String name) { - message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session); + return parameters.get(name); } + }; + } + + private static void parseContentTypeParameters(final String parameterString, final Map<String, String> parameters) + { + int equalsIndex = parameterString.indexOf("="); + if(equalsIndex != -1) + { + String paramName = parameterString.substring(0,equalsIndex).trim(); + String valuePart = equalsIndex == parameterString.length() - 1 ? "" : parameterString.substring(equalsIndex+1).trim(); + String remainder; + if(valuePart.startsWith("\"")) + { + int closeQuoteIndex = valuePart.indexOf("\"", 1); + if(closeQuoteIndex != -1) + { + parameters.put(paramName, valuePart.substring(1, closeQuoteIndex)); + remainder = (closeQuoteIndex == valuePart.length()-1) ? "" : valuePart.substring(closeQuoteIndex+1); } else { - // not a text message - message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session); + remainder = ""; } - }*/ + } else { - message = new AmqpMessageImpl(header, - deliveryAnnotations, - messageAnnotations, properties,appProperties,body,footer, _session); + Pattern pattern = Pattern.compile("\\s|;|\\("); + Matcher matcher = pattern.matcher(valuePart); + if(matcher.matches()) + { + parameters.put(paramName, valuePart.substring(0,matcher.start())); + remainder = valuePart.substring(matcher.start()); + } + else + { + parameters.put(paramName, valuePart); + remainder = ""; + } } - } - else - { - message = new AmqpMessageImpl(header, - deliveryAnnotations, - messageAnnotations, properties,appProperties,body,footer, _session); - } - message.setReadOnly(); + int paramSep = remainder.indexOf(";"); + if(paramSep != -1 && paramSep != remainder.length()-1) + { + parseContentTypeParameters(remainder.substring(paramSep+1),parameters); + } + } - return message; } + } |