summaryrefslogtreecommitdiff
path: root/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
blob: e31345aa69397f3dc098545f6cd41f54f1dbeb81 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package org.apache.qpid.nclient.execution;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQRequestBody;
import org.apache.qpid.framing.AMQResponseBody;
import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.AbstractPhase;
import org.apache.qpid.nclient.core.AMQPConstants;

/**
 * Corresponds to Layer 2 in AMQP.
 * This phase handles the correlation of AMQP messages.
 * This class implements the 0.9 spec (request/response).
 */
public class ExecutionPhase extends AbstractPhase
{

    protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class);

    /**
     * Channel id -> {@link RequestManager}. Entries are created lazily the first
     * time a channel is seen. The raw type is kept so the protected field looks
     * exactly the same to subclasses.
     */
    protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap();

    /**
     * Channel id -> {@link ResponseManager}. Entries are created lazily the first
     * time a channel is seen. The raw type is kept so the protected field looks
     * exactly the same to subclasses.
     */
    protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap();

    /**
     * --------------------------------------------------
     * Phase related methods
     * --------------------------------------------------
     */

    // TODO: the managers for channel 0 should be registered in the init method, e.g.
    //_channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
    //_channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));

    /**
     * Handles an incoming frame. A request body is handed to the channel's
     * {@link ResponseManager}, a response body to the channel's
     * {@link RequestManager}; the resulting method event(s) are then passed to
     * the superclass implementation.
     *
     * Any exception raised while processing a frame (including one thrown by
     * the superclass call) is logged at error level and NOT rethrown. Frames
     * whose body is neither a request nor a response are not forwarded.
     *
     * @param msg the received message; must be an {@link AMQFrame}
     * @throws AMQPException declared by the phase contract; this implementation
     *                       logs processing failures instead of throwing them
     */
    public void messageReceived(Object msg) throws AMQPException
    {
        AMQFrame frame = (AMQFrame) msg;
        final AMQBody bodyFrame = frame.getBodyFrame();

        if (bodyFrame instanceof AMQRequestBody)
        {
            try
            {
                AMQPMethodEvent event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody) bodyFrame);
                super.messageReceived(event);
            }
            catch (Exception e)
            {
                // Best effort: a bad frame is reported but does not propagate.
                _logger.error("Error handling request", e);
            }
        }
        else if (bodyFrame instanceof AMQResponseBody)
        {
            try
            {
                List<AMQPMethodEvent> events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody) bodyFrame);
                for (AMQPMethodEvent event : events)
                {
                    super.messageReceived(event);
                }
            }
            catch (Exception e)
            {
                // Best effort: a bad frame is reported but does not propagate.
                _logger.error("Error handling response", e);
            }
        }
        else if (_logger.isDebugEnabled())
        {
            // Previously such frames vanished without any trace; they are still
            // not forwarded, but the drop is now visible when debugging.
            _logger.debug("Ignoring frame that is neither a request nor a response: " + bodyFrame);
        }
    }

    /**
     * Handles an outgoing method event. An event carrying
     * {@link AMQPConstants#EMPTY_CORRELATION_ID} is treated as a request and
     * produces a single frame; any other event is treated as a response and may
     * produce several frames. Every frame is passed to the superclass
     * implementation.
     *
     * @param msg the message to send; must be an {@link AMQPMethodEvent}
     * @throws AMQPException if the response could not be prepared, or if the
     *                       superclass call fails
     */
    public void messageSent(Object msg) throws AMQPException
    {
        AMQPMethodEvent evt = (AMQPMethodEvent) msg;
        if (evt.getCorrelationId() == AMQPConstants.EMPTY_CORRELATION_ID)
        {
            // No correlation id: this is a request.
            AMQFrame frame = handleRequest(evt);
            super.messageSent(frame);
        }
        else
        {
            // A correlation id is present: this is a response.
            List<AMQFrame> frames = handleResponse(evt);
            for (AMQFrame frame : frames)
            {
                super.messageSent(frame);
            }
        }
    }

    /**
     * ------------------------------------------------
     * Methods to handle request response
     * -----------------------------------------------
     */

    /**
     * Passes a received request body to the response manager of the channel.
     *
     * @param channelId   channel the frame arrived on
     * @param requestBody the received request body
     * @return the method event produced by the response manager
     * @throws Exception whatever the response manager throws
     */
    private AMQPMethodEvent messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception
    {
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Request frame received: " + requestBody);
        }
        return getResponseManager(channelId).requestReceived(requestBody);
    }

    /**
     * Passes a received response body to the request manager of the channel.
     *
     * @param channelId    channel the frame arrived on
     * @param responseBody the received response body
     * @return the method events produced by the request manager
     * @throws Exception whatever the request manager throws
     */
    private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody)
            throws Exception
    {
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Response frame received: " + responseBody);
        }
        return getRequestManager(channelId).responseReceived(responseBody);
    }

    /**
     * Lets the request manager of the event's channel turn an outgoing request
     * event into a frame.
     *
     * @param evt the outgoing request event
     * @return the frame to send
     */
    private AMQFrame handleRequest(AMQPMethodEvent evt)
    {
        return getRequestManager(evt.getChannelId()).sendRequest(evt);
    }

    /**
     * Lets the response manager of the event's channel turn an outgoing
     * response event into frames.
     *
     * @param evt the outgoing response event
     * @return the frames to send
     * @throws AMQPException wrapping any exception thrown by the response manager
     */
    private List<AMQFrame> handleResponse(AMQPMethodEvent evt) throws AMQPException
    {
        // Looked up outside the try block, as before, so only sendResponse
        // failures are wrapped.
        ResponseManager responseManager = getResponseManager(evt.getChannelId());
        try
        {
            return responseManager.sendResponse(evt);
        }
        catch (Exception e)
        {
            throw new AMQPException("Error handling response", e);
        }
    }

    /**
     * Returns the request manager registered for the channel, creating and
     * registering one if none exists yet.
     *
     * The registration uses putIfAbsent so that two threads hitting the same
     * new channel end up sharing one manager; a separate containsKey/put pair
     * would let the second put replace the first manager.
     *
     * @param channelId the channel id
     * @return the single request manager for that channel
     */
    private RequestManager getRequestManager(int channelId)
    {
        RequestManager manager = (RequestManager) _channelId2RequestMgrMap.get(channelId);
        if (manager == null)
        {
            // First argument is hard-coded to 0 as in the original code
            // (presumably the connection id - TODO confirm).
            RequestManager created = new RequestManager(0, channelId, false);
            // putIfAbsent returns the value already mapped, or null if ours was stored.
            Object existing = _channelId2RequestMgrMap.putIfAbsent(channelId, created);
            manager = (existing == null) ? created : (RequestManager) existing;
        }
        return manager;
    }

    /**
     * Returns the response manager registered for the channel, creating and
     * registering one if none exists yet.
     *
     * The registration uses putIfAbsent so that two threads hitting the same
     * new channel end up sharing one manager; a separate containsKey/put pair
     * would let the second put replace the first manager.
     *
     * @param channelId the channel id
     * @return the single response manager for that channel
     */
    private ResponseManager getResponseManager(int channelId)
    {
        ResponseManager manager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId);
        if (manager == null)
        {
            // First argument is hard-coded to 0 as in the original code
            // (presumably the connection id - TODO confirm).
            ResponseManager created = new ResponseManager(0, channelId, false);
            // putIfAbsent returns the value already mapped, or null if ours was stored.
            Object existing = _channelId2ResponseMgrMap.putIfAbsent(channelId, created);
            manager = (existing == null) ? created : (ResponseManager) existing;
        }
        return manager;
    }
}