blob: 66d59910ed255a3d731cc31ace6af0d20917b7ef (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
import java.util.ArrayList;
import java.util.List;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.Section;
import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.UnsignedByte;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
import org.apache.qpid.amqp_1_0.type.messaging.Header;
import org.apache.qpid.amqp_1_0.type.messaging.Properties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0;
import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
/**
 * Converts the metadata of an AMQP 0-8 {@link AMQMessage} into the AMQP 1.0
 * sections (header, properties and application-properties) that make up a
 * {@link MessageMetaData_1_0}.
 */
public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMessage>
{
    /** Content type found on Java object messages produced by the Qpid 0.x client. */
    private static final String LEGACY_JAVA_OBJECT_CONTENT_TYPE = "application/java-object-stream";

    /** Content type substituted for {@link #LEGACY_JAVA_OBJECT_CONTENT_TYPE} on conversion. */
    private static final String SERIALIZED_JAVA_OBJECT_CONTENT_TYPE = "application/x-java-serialized-object";

    /**
     * @return the message class this converter accepts as input
     */
    @Override
    public Class<AMQMessage> getInputClass()
    {
        return AMQMessage.class;
    }

    /**
     * Builds the 1.0 metadata for the given 0-8 message. Three sections are
     * produced, in this order: header, properties, application-properties.
     *
     * @param serverMessage  the 0-8 message whose content header is converted
     * @param sectionEncoder encoder handed to the resulting metadata object
     * @return metadata wrapping the converted sections
     */
    protected MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage, SectionEncoder sectionEncoder)
    {
        final BasicContentHeaderProperties contentHeader =
                (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties();

        final List<Section> sections = new ArrayList<Section>(3);
        sections.add(buildHeader(serverMessage, contentHeader));
        sections.add(buildProperties(serverMessage, contentHeader));
        sections.add(new ApplicationProperties(FieldTable.convertToMap(contentHeader.getHeaders())));

        return new MessageMetaData_1_0(sections, sectionEncoder);
    }

    /** Creates the header section: durability, priority and (when applicable) ttl. */
    private static Header buildHeader(final AMQMessage serverMessage,
                                      final BasicContentHeaderProperties contentHeader)
    {
        final Header header = new Header();
        header.setDurable(serverMessage.isPersistent());
        header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority()));

        // A ttl is only set when the message expires after it arrived; it is
        // the difference between the expiration and the arrival time.
        final long expiration = serverMessage.getExpiration();
        final long arrivalTime = serverMessage.getArrivalTime();
        if (expiration > arrivalTime)
        {
            header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime));
        }
        return header;
    }

    /**
     * Creates the properties section from the 0-8 content header. Optional
     * values (correlation id, message id, reply-to, user id) are only set when
     * present. Creation-time, group-id, group-sequence, reply-to-group-id and
     * to are not populated by this converter.
     */
    private static Properties buildProperties(final AMQMessage serverMessage,
                                              final BasicContentHeaderProperties contentHeader)
    {
        final Properties props = new Properties();
        props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString()));

        // Modify the content type when we are dealing with java object messages
        // produced by the Qpid 0.x client. The comparison is done on the string
        // value so it does not rely on Symbol instances being identical.
        String contentType = contentHeader.getContentTypeAsString();
        if (LEGACY_JAVA_OBJECT_CONTENT_TYPE.equals(contentType))
        {
            contentType = SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
        }
        props.setContentType(Symbol.valueOf(contentType));

        final AMQShortString correlationId = contentHeader.getCorrelationId();
        if (correlationId != null)
        {
            props.setCorrelationId(new Binary(correlationId.getBytes()));
        }

        final AMQShortString messageId = contentHeader.getMessageId();
        if (messageId != null)
        {
            props.setMessageId(new Binary(messageId.getBytes()));
        }

        // Only set reply-to when one is present: String.valueOf(null reference)
        // would otherwise produce the literal text "null" as the reply address.
        final AMQShortString replyTo = contentHeader.getReplyTo();
        if (replyTo != null)
        {
            props.setReplyTo(replyTo.toString());
        }

        props.setSubject(serverMessage.getRoutingKey());

        if (contentHeader.getUserId() != null)
        {
            props.setUserId(new Binary(contentHeader.getUserId().getBytes()));
        }
        return props;
    }
}
|