summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArchit Aggarwal <architag@amazon.com>2021-02-10 15:25:25 -0800
committerGitHub <noreply@github.com>2021-02-10 15:25:25 -0800
commit693212bd052d397cd390ef2d4ae4c3f318d2f7f5 (patch)
treec5c6abde13e797f59a357fe8c96b39273229b3b7
parent5bb198cdb84af5a225aad6c894d8717e536e4104 (diff)
downloadfreertos-git-693212bd052d397cd390ef2d4ae4c3f318d2f7f5.tar.gz
Update Jobs demo to use DescribeJobExecution instead of StartNextPendingJobExecution API (#496)
It is recommended by the AWS IoT Jobs service to use DescribeJobExecution API for scaling purposes instead of StartNextPendingJobExecution API. Thus, update the Jobs demo to replace the latter API call with the former API call.
-rwxr-xr-xFreeRTOS-Plus/Demo/AWS/Jobs_Windows_Simulator/Jobs_Demo/DemoTasks/JobsDemoExample.c138
m---------FreeRTOS-Plus/Source/AWS/jobs0
2 files changed, 94 insertions, 44 deletions
diff --git a/FreeRTOS-Plus/Demo/AWS/Jobs_Windows_Simulator/Jobs_Demo/DemoTasks/JobsDemoExample.c b/FreeRTOS-Plus/Demo/AWS/Jobs_Windows_Simulator/Jobs_Demo/DemoTasks/JobsDemoExample.c
index 60d9b1dc2..cc55c9884 100755
--- a/FreeRTOS-Plus/Demo/AWS/Jobs_Windows_Simulator/Jobs_Demo/DemoTasks/JobsDemoExample.c
+++ b/FreeRTOS-Plus/Demo/AWS/Jobs_Windows_Simulator/Jobs_Demo/DemoTasks/JobsDemoExample.c
@@ -142,7 +142,7 @@
* This demo program expects this key to be in the Job document. It is a key
* specific to this demo.
*/
-#define jobsexampleQUERY_KEY_FOR_ACTION jobsexampleQUERY_KEY_FOR_JOBS_DOC ".action"
+#define jobsexampleQUERY_KEY_FOR_ACTION "action"
/**
* @brief The length of #jobsexampleQUERY_KEY_FOR_ACTION.
@@ -157,7 +157,7 @@
* is either "publish" or "print". It represents the message that should be
* published or printed, respectively.
*/
-#define jobsexampleQUERY_KEY_FOR_MESSAGE jobsexampleQUERY_KEY_FOR_JOBS_DOC ".message"
+#define jobsexampleQUERY_KEY_FOR_MESSAGE "message"
/**
* @brief The length of #jobsexampleQUERY_KEY_FOR_MESSAGE.
@@ -172,7 +172,7 @@
* is "publish". It represents the MQTT topic on which the message should be
* published.
*/
-#define jobsexampleQUERY_KEY_FOR_TOPIC jobsexampleQUERY_KEY_FOR_JOBS_DOC ".topic"
+#define jobsexampleQUERY_KEY_FOR_TOPIC "topic"
/**
* @brief The length of #jobsexampleQUERY_KEY_FOR_TOPIC.
@@ -181,14 +181,14 @@
/**
* @brief Utility macro to generate the PUBLISH topic string to the
- * DescribePendingJobExecution API of AWS IoT Jobs service for requesting
+ * DescribeJobExecution API of AWS IoT Jobs service for requesting
* the next pending job information.
*
* @param[in] thingName The name of the Thing resource to query for the
* next pending job.
*/
-#define START_NEXT_JOB_TOPIC( thingName ) \
- ( JOBS_API_PREFIX thingName JOBS_API_BRIDGE JOBS_API_STARTNEXT )
+#define DESCRIBE_NEXT_JOB_TOPIC( thingName ) \
+ ( JOBS_API_PREFIX thingName JOBS_API_BRIDGE JOBS_API_JOBID_NEXT "/" JOBS_API_GETPENDING )
/**
* @brief Utility macro to generate the subscription topic string for the
@@ -239,8 +239,8 @@ typedef enum JobActionType
/*-----------------------------------------------------------*/
-/**
- * @brief Each compilation unit that consumes the NetworkContext must define it.
+/**
+ * @brief Each compilation unit that consumes the NetworkContext must define it.
* It should contain a single pointer to the type of your desired transport.
* When using multiple transports in the same compilation unit, define this pointer as void *.
*
@@ -271,14 +271,29 @@ static TlsTransportParams_t xTlsTransportParams;
/**
* @brief Static buffer used to hold MQTT messages being sent and received.
*/
-static uint8_t ucSharedBuffer[ democonfigNETWORK_BUFFER_SIZE ];
+static uint8_t usMqttConnectionBuffer[ democonfigNETWORK_BUFFER_SIZE ];
+
+/**
+ * @brief Static buffer used to hold the job ID of the single job that
+ * is executed at a time in the demo. This buffer allows re-use of the MQTT
+ * connection context for sending status updates of a job while it is being
+ * processed.
+ */
+static uint8_t usJobIdBuffer[ democonfigNETWORK_BUFFER_SIZE ];
+
+/**
+ * @brief Static buffer used to hold the job document of the single job that
+ * is executed at a time in the demo. This buffer allows re-use of the MQTT
+ * connection context for sending status updates of a job while it is being processed.
+ */
+static uint8_t usJobsDocumentBuffer[ democonfigNETWORK_BUFFER_SIZE ];
/**
* @brief Static buffer used to hold MQTT messages being sent and received.
*/
static MQTTFixedBuffer_t xBuffer =
{
- .pBuffer = ucSharedBuffer,
+ .pBuffer = usMqttConnectionBuffer,
.size = democonfigNETWORK_BUFFER_SIZE
};
@@ -325,12 +340,11 @@ static void prvEventCallback( MQTTContext_t * pxMqttContext,
MQTTDeserializedInfo_t * pxDeserializedInfo );
/**
- * @brief Process payload from NextJobExecutionChanged and StartNextPendingJobExecution
+ * @brief Process payload from NextJobExecutionChanged and DescribeJobExecution
* API MQTT topics of AWS IoT Jobs service.
*
- * This handler parses the payload received about the next pending job to identify
- * the action requested in the job document, and perform the appropriate
- * action to execute the job.
+ * This handler parses the received payload about the next pending job, identifies
+ * the action requested in the job document, and executes the action.
*
* @param[in] pPublishInfo Deserialized publish info pointer for the incoming
* packet.
@@ -354,14 +368,16 @@ static void prvSendUpdateForJob( char * pcJobId,
* It parses the received job document, executes the job depending on the job "Action" type, and
* sends an update to AWS for the Job.
*
- * @param[in] pxPublishInfo The PUBLISH packet containing the job document received from the
- * AWS IoT Jobs service.
* @param[in] pcJobId The ID of the job to execute.
* @param[in] usJobIdLength The length of the job ID string.
+ * @param[in] pcJobDocument The JSON document associated with the @a pcJobID job
+ * that is to be processed.
+ * @param[in] usDocumentLength The length of the job document.
*/
-static void prvProcessJobDocument( MQTTPublishInfo_t * pxPublishInfo,
- char * pcJobId,
- uint16_t usJobIdLength );
+static void prvProcessJobDocument( char * pcJobId,
+ uint16_t usJobIdLength,
+ char * pcJobDocument,
+ uint16_t jobDocumentLength );
/**
* @brief The task used to demonstrate the Jobs library API.
@@ -443,19 +459,22 @@ static void prvSendUpdateForJob( char * pcJobId,
}
}
-static void prvProcessJobDocument( MQTTPublishInfo_t * pxPublishInfo,
- char * pcJobId,
- uint16_t usJobIdLength )
+static void prvProcessJobDocument( char * pcJobId,
+ uint16_t usJobIdLength,
+ char * pcJobDocument,
+ uint16_t jobDocumentLength )
{
char * pcAction = NULL;
size_t uActionLength = 0U;
JSONStatus_t xJsonStatus = JSONSuccess;
- configASSERT( pxPublishInfo != NULL );
- configASSERT( ( pxPublishInfo->pPayload != NULL ) && ( pxPublishInfo->payloadLength > 0 ) );
+ configASSERT( pcJobId != NULL );
+ configASSERT( usJobIdLength > 0 );
+ configASSERT( pcJobDocument != NULL );
+ configASSERT( jobDocumentLength > 0 );
- xJsonStatus = JSON_Search( ( char * ) pxPublishInfo->pPayload,
- pxPublishInfo->payloadLength,
+ xJsonStatus = JSON_Search( pcJobDocument,
+ jobDocumentLength,
jobsexampleQUERY_KEY_FOR_ACTION,
jobsexampleQUERY_KEY_FOR_ACTION_LENGTH,
&pcAction,
@@ -485,8 +504,8 @@ static void prvProcessJobDocument( MQTTPublishInfo_t * pxPublishInfo,
case JOB_ACTION_PRINT:
LogInfo( ( "Received job contains \"print\" action." ) );
- xJsonStatus = JSON_Search( ( char * ) pxPublishInfo->pPayload,
- pxPublishInfo->payloadLength,
+ xJsonStatus = JSON_Search( pcJobDocument,
+ jobDocumentLength,
jobsexampleQUERY_KEY_FOR_MESSAGE,
jobsexampleQUERY_KEY_FOR_MESSAGE_LENGTH,
&pcMessage,
@@ -517,8 +536,8 @@ static void prvProcessJobDocument( MQTTPublishInfo_t * pxPublishInfo,
char * pcTopic = NULL;
size_t ulTopicLength = 0U;
- xJsonStatus = JSON_Search( ( char * ) pxPublishInfo->pPayload,
- pxPublishInfo->payloadLength,
+ xJsonStatus = JSON_Search( pcJobDocument,
+ jobDocumentLength,
jobsexampleQUERY_KEY_FOR_TOPIC,
jobsexampleQUERY_KEY_FOR_TOPIC_LENGTH,
&pcTopic,
@@ -532,8 +551,8 @@ static void prvProcessJobDocument( MQTTPublishInfo_t * pxPublishInfo,
}
else
{
- xJsonStatus = JSON_Search( ( char * ) pxPublishInfo->pPayload,
- pxPublishInfo->payloadLength,
+ xJsonStatus = JSON_Search( pcJobDocument,
+ jobDocumentLength,
jobsexampleQUERY_KEY_FOR_MESSAGE,
jobsexampleQUERY_KEY_FOR_MESSAGE_LENGTH,
&pcMessage,
@@ -590,7 +609,7 @@ static void prvNextJobHandler( MQTTPublishInfo_t * pxPublishInfo )
else
{
char * pcJobId = NULL;
- size_t ulJobIdLength = 0U;
+ size_t ulJobIdLength = 0UL;
/* Parse the Job ID of the next pending job execution from the JSON payload. */
if( JSON_Search( ( char * ) pxPublishInfo->pPayload,
@@ -607,12 +626,43 @@ static void prvNextJobHandler( MQTTPublishInfo_t * pxPublishInfo )
}
else
{
+ char * pcJobDocLoc = NULL;
+ size_t ulJobDocLength = 0UL;
+
configASSERT( ulJobIdLength < JOBS_JOBID_MAX_LENGTH );
LogInfo( ( "Received a Job from AWS IoT Jobs service: JobId=%.*s",
ulJobIdLength, pcJobId ) );
- /* Process the Job document and execute the job. */
- prvProcessJobDocument( pxPublishInfo, pcJobId, ( uint16_t ) ulJobIdLength );
+ /* Copy the Job ID in the global buffer. This is done so that
+ * the MQTT context's network buffer can be used for sending jobs
+ * status updates to the AWS IoT Jobs service. */
+ memcpy( usJobIdBuffer, pcJobId, ulJobIdLength );
+
+ /* Search for the jobs document in the payload. */
+ if( JSON_Search( ( char * ) pxPublishInfo->pPayload,
+ pxPublishInfo->payloadLength,
+ jobsexampleQUERY_KEY_FOR_JOBS_DOC,
+ jobsexampleQUERY_KEY_FOR_JOBS_DOC_LENGTH,
+ &pcJobDocLoc,
+ &ulJobDocLength ) != JSONSuccess )
+ {
+ LogWarn( ( "Failed to parse document of next job received from AWS IoT Jobs service: "
+ "Topic=%.*s, JobID=%.*s",
+ pxPublishInfo->topicNameLength, pxPublishInfo->pTopicName,
+ ulJobIdLength, pcJobId ) );
+ }
+ else
+ {
+ /* Copy the Job document in buffer. This is done so that the MQTT connection buffer can
+ * be used for sending jobs status updates to the AWS IoT Jobs service. */
+ memcpy( usJobsDocumentBuffer, pcJobDocLoc, ulJobDocLength );
+
+ /* Process the Job document and execute the job. */
+ prvProcessJobDocument( usJobIdBuffer,
+ ( uint16_t ) ulJobIdLength,
+ usJobsDocumentBuffer,
+ ulJobDocLength );
+ }
}
}
}
@@ -663,7 +713,7 @@ static void prvEventCallback( MQTTContext_t * pxMqttContext,
if( xStatus == JobsSuccess )
{
/* Upon successful return, the messageType has been filled in. */
- if( ( topicType == JobsStartNextSuccess ) || ( topicType == JobsNextJobChanged ) )
+ if( ( topicType == JobsDescribeSuccess ) || ( topicType == JobsNextJobChanged ) )
{
/* Handler function to process payload. */
prvNextJobHandler( pxDeserializedInfo->pPublishInfo );
@@ -748,10 +798,10 @@ void vStartJobsDemo( void )
*
* This function also shows that the communication with the AWS IoT Jobs services does
* not require explicit subscriptions to the response MQTT topics for request commands that
- * sent to the MQTT APIs (like StartNextPendingJobExecution API) of the service. The service
+ * sent to the MQTT APIs (like DescribeJobExecution API) of the service. The service
* will send messages on the response topics for the request commands on the same
* MQTT connection irrespective of whether the client subscribes to the response topics.
- * Therefore, this demo processes incoming messages from response topics of StartNextPendingJobExecution
+ * Therefore, this demo processes incoming messages from response topics of DescribeJobExecution
* and UpdateJobExecution APIs without explicitly subscribing to the topics.
*/
void prvJobsDemoTask( void * pvParameters )
@@ -829,23 +879,23 @@ void prvJobsDemoTask( void * pvParameters )
if( xDemoStatus == pdPASS )
{
- /* Publish to AWS IoT Jobs on the StartNextPendingJobExecution API to request the next pending job.
+ /* Publish to AWS IoT Jobs on the DescribeJobExecution API to request the next pending job.
*
* Note: It is not required to make MQTT subscriptions to the response topics of the
- * StartNextPendingJobExecution API because the AWS IoT Jobs service sends responses for
+ * DescribeJobExecution API because the AWS IoT Jobs service sends responses for
* the PUBLISH commands on the same MQTT connection irrespective of whether the client has subscribed
* to the response topics or not.
* This demo processes incoming messages from the response topics of the API in the prvEventCallback()
* handler that is supplied to the coreMQTT library. */
if( xPublishToTopic( &xMqttContext,
- START_NEXT_JOB_TOPIC( democonfigTHING_NAME ),
- sizeof( START_NEXT_JOB_TOPIC( democonfigTHING_NAME ) ) - 1,
+ DESCRIBE_NEXT_JOB_TOPIC( democonfigTHING_NAME ),
+ sizeof( DESCRIBE_NEXT_JOB_TOPIC( democonfigTHING_NAME ) ) - 1,
NULL,
0 ) != pdPASS )
{
xDemoStatus = pdFAIL;
- LogError( ( "Failed to publish to StartNextPendingJobExecution API of AWS IoT Jobs service: "
- "Topic=%s", START_NEXT_JOB_TOPIC( democonfigTHING_NAME ) ) );
+ LogError( ( "Failed to publish to DescribeJobExecution API of AWS IoT Jobs service: "
+ "Topic=%s", DESCRIBE_NEXT_JOB_TOPIC( democonfigTHING_NAME ) ) );
}
}
diff --git a/FreeRTOS-Plus/Source/AWS/jobs b/FreeRTOS-Plus/Source/AWS/jobs
-Subproject 5844fb9a7c6bc3d12f0e4ef143d873d516cf4cc
+Subproject c46950faa2c1adb0071a78e949959a407212ff2