summaryrefslogtreecommitdiff
path: root/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
blob: 8db955afbc22e91a8f22d4f8caedd37c4df0c357 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package org.apache.qpid.nclient.execution;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQRequestBody;
import org.apache.qpid.framing.AMQResponseBody;
import org.apache.qpid.framing.RequestResponseMappingException;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.AbstractPhase;
import org.apache.qpid.nclient.core.QpidConstants;
import org.apache.qpid.nclient.model.AMQPMethodEvent;
import org.apache.qpid.protocol.AMQMethodEvent;

/**
 * Corresponds to Layer 2 in AMQP.
 * This phase handles the correlation of AMQP messages.
 * This class implements the 0.9 spec (request/response).
 */
public class ExecutionPhase extends AbstractPhase
{
    protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class);

    /** RequestManager instances looked up by channel id; consulted for outbound requests and inbound responses. */
    protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap();

    /** ResponseManager instances looked up by channel id; consulted for inbound requests and outbound responses. */
    protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap();

    /**
     * --------------------------------------------------
     * Phase related methods
     * --------------------------------------------------
     */

    // TODO: the managers for channel 0 should be registered in the init method, e.g.
    //_channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
    //_channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));

    /**
     * Handles an inbound frame. A request body is handed to the channel's
     * ResponseManager and a response body to the channel's RequestManager;
     * the resulting method event(s) are then passed on via
     * {@code super.messageReceived}.
     *
     * Frames whose body is neither an AMQRequestBody nor an AMQResponseBody
     * are ignored by this method.
     *
     * Any exception raised while correlating or forwarding (including one
     * raised by {@code super.messageReceived}) is logged and NOT rethrown.
     *
     * @param msg the inbound message; must be an {@link AMQFrame}
     * @throws AMQPException declared by the phase contract
     */
    public void messageReceived(Object msg) throws AMQPException
    {
        final AMQFrame frame = (AMQFrame) msg;
        final AMQBody bodyFrame = frame.getBodyFrame();

        if (bodyFrame instanceof AMQRequestBody)
        {
            try
            {
                AMQPMethodEvent event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody) bodyFrame);
                super.messageReceived(event);
            }
            catch (Exception e)
            {
                _logger.error("Error handling request",e);
            }
        }
        else if (bodyFrame instanceof AMQResponseBody)
        {
            try
            {
                // A single response may yield several method events.
                List<AMQPMethodEvent> events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody) bodyFrame);
                for (AMQPMethodEvent event : events)
                {
                    super.messageReceived(event);
                }
            }
            catch (Exception e)
            {
                _logger.error("Error handling response",e);
            }
        }
    }

    /**
     * Handles an outbound method event. An event whose correlation id equals
     * {@link QpidConstants#EMPTY_CORRELATION_ID} is treated as a request and
     * prepared by the channel's RequestManager; any other event is treated as
     * a response and prepared by the channel's ResponseManager. The resulting
     * frame(s) are passed on via {@code super.messageSent}.
     *
     * @param msg the outbound message; must be an {@link AMQPMethodEvent}
     * @throws AMQPException if no manager is registered for the event's
     *         channel, if preparing a response fails, or if the next phase fails
     */
    public void messageSent(Object msg) throws AMQPException
    {
        final AMQPMethodEvent evt = (AMQPMethodEvent) msg;
        if (evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID)
        {
            // This is a request
            AMQFrame frame = handleRequest(evt);
            super.messageSent(frame);
        }
        else
        {
            // This is a response
            List<AMQFrame> frames = handleResponse(evt);
            for (AMQFrame frame : frames)
            {
                super.messageSent(frame);
            }
        }
    }

    /**
     * ------------------------------------------------
     * Methods to handle request response
     * -----------------------------------------------
     */

    /**
     * Passes an inbound request body to the ResponseManager of the channel.
     *
     * @param channelId   channel the frame arrived on
     * @param requestBody the received request body
     * @return the method event produced by the ResponseManager
     * @throws Exception if no ResponseManager is registered for the channel,
     *         or if the manager rejects the request
     */
    private AMQPMethodEvent messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception
    {
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Request frame received: " + requestBody);
        }
        ResponseManager responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId);
        if (responseManager == null)
        {
            throw new AMQException("Unable to find ResponseManager for channel " + channelId);
        }
        return responseManager.requestReceived(requestBody);
    }

    /**
     * Passes an inbound response body to the RequestManager of the channel.
     *
     * @param channelId    channel the frame arrived on
     * @param responseBody the received response body
     * @return the method events produced by the RequestManager
     * @throws Exception if no RequestManager is registered for the channel,
     *         or if the manager rejects the response
     */
    private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody) throws Exception
    {
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Response frame received: " + responseBody);
        }
        RequestManager requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId);
        if (requestManager == null)
        {
            throw new AMQException("Unable to find RequestManager for channel " + channelId);
        }
        return requestManager.responseReceived(responseBody);
    }

    /**
     * Asks the RequestManager of the event's channel to build the outbound
     * request frame.
     *
     * @param evt the outbound request event
     * @return the frame to send
     * @throws AMQPException if no RequestManager is registered for the channel
     */
    private AMQFrame handleRequest(AMQPMethodEvent evt) throws AMQPException
    {
        RequestManager requestManager = (RequestManager) _channelId2RequestMgrMap.get(evt.getChannelId());
        if (requestManager == null)
        {
            // Fail with a descriptive error instead of a bare NullPointerException,
            // mirroring the check performed on the receive path.
            throw new AMQPException("Error handling request",
                    new AMQException("Unable to find RequestManager for channel " + evt.getChannelId()));
        }
        return requestManager.sendRequest(evt);
    }

    /**
     * Asks the ResponseManager of the event's channel to build the outbound
     * response frame(s).
     *
     * @param evt the outbound response event
     * @return the frames to send
     * @throws AMQPException if no ResponseManager is registered for the
     *         channel, or if the manager fails to prepare the response
     */
    private List<AMQFrame> handleResponse(AMQPMethodEvent evt) throws AMQPException
    {
        ResponseManager responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(evt.getChannelId());
        try
        {
            if (responseManager == null)
            {
                // Wrapped by the catch below, so callers still see an AMQPException,
                // but now with a cause that names the offending channel.
                throw new AMQException("Unable to find ResponseManager for channel " + evt.getChannelId());
            }
            return responseManager.sendResponse(evt);
        }
        catch (Exception e)
        {
            throw new AMQPException("Error handling response",e);
        }
    }
}