summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
blob: b2f4fcef841d5f20e42e5a7952da6dcdd5102824 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.client;

import javax.jms.JMSException;
import javax.jms.Message;

import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.url.BindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
{
    protected final Logger _logger = LoggerFactory.getLogger(getClass());
    private AMQSession_0_8.DestinationCache<AMQTopic> _topicDestinationCache;
    private AMQSession_0_8.DestinationCache<AMQQueue> _queueDestinationCache;

    private final RejectBehaviour _rejectBehaviour;

    protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
                                       String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession_0_8 session,
                                       AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow,
                                       boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
    {
        super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
              protocolHandler, rawSelector, prefetchHigh, prefetchLow, exclusive,
              acknowledgeMode, browseOnly, autoClose);
        final FieldTable consumerArguments = getArguments();
        if (isAutoClose())
        {
            consumerArguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
        }

        if (isBrowseOnly())
        {
            consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
        }

        _topicDestinationCache = session.getTopicDestinationCache();
        _queueDestinationCache = session.getQueueDestinationCache();

        if (destination.getRejectBehaviour() != null)
        {
            _rejectBehaviour = destination.getRejectBehaviour();
        }
        else
        {
            ConnectionURL connectionURL = connection.getConnectionURL();
            String rejectBehaviour = connectionURL.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR);
            if (rejectBehaviour != null)
            {
                _rejectBehaviour = RejectBehaviour.valueOf(rejectBehaviour.toUpperCase());
            }
            else
            {
                // use the default value for all connections, if not set
                rejectBehaviour = System.getProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.NORMAL.toString());
                _rejectBehaviour = RejectBehaviour.valueOf( rejectBehaviour.toUpperCase());
            }
        }
    }

    void sendCancel() throws AMQException, FailoverException
    {
        BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(_consumerTag)), false);

        final AMQFrame cancelFrame = body.generateFrame(_channelId);

        _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);

        if (_logger.isDebugEnabled())
        {
            _logger.debug("CancelOk'd for consumer:" + debugIdentity());
        }
    }

    public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
    {

        return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
                                             messageFrame.isRedelivered(), messageFrame.getExchange(),
                                             messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(),
                _queueDestinationCache, _topicDestinationCache);

    }

    Message receiveBrowse() throws JMSException
    {
        return receive();
    }

    void cleanupQueue() throws AMQException, FailoverException
    {
        
    }

    public RejectBehaviour getRejectBehaviour()
    {
        return _rejectBehaviour;
    }
}