summaryrefslogtreecommitdiff
path: root/integration/internal
diff options
context:
space:
mode:
authorDrew Erny <drew.erny@docker.com>2019-12-11 10:05:03 -0600
committerDrew Erny <derny@mirantis.com>2020-01-13 13:21:12 -0600
commit30d9fe30b1c1bf52f15a41e0b106a1542a167e04 (patch)
treef3363dfa99bc6d739d05fe10506ccc1c8b25ee29 /integration/internal
parent4d63209d94276d66d9db39174bfe0127f4b9a8dc (diff)
downloaddocker-30d9fe30b1c1bf52f15a41e0b106a1542a167e04.tar.gz
Add swarm jobs
Adds support for ReplicatedJob and GlobalJob service modes. These modes allow running service which execute tasks that exit upon success, instead of daemon-type tasks. Signed-off-by: Drew Erny <drew.erny@docker.com>
Diffstat (limited to 'integration/internal')
-rw-r--r--integration/internal/swarm/service.go9
-rw-r--r--integration/internal/swarm/states.go81
2 files changed, 89 insertions, 1 deletions
diff --git a/integration/internal/swarm/service.go b/integration/internal/swarm/service.go
index ac0bded03c..ccf945b6fa 100644
--- a/integration/internal/swarm/service.go
+++ b/integration/internal/swarm/service.go
@@ -20,7 +20,7 @@ import (
// ServicePoll tweaks the pollSettings for `service`
func ServicePoll(config *poll.Settings) {
// Override the default pollSettings for `service` resource here ...
- config.Timeout = 30 * time.Second
+ config.Timeout = 15 * time.Second
config.Delay = 100 * time.Millisecond
if runtime.GOARCH == "arm64" || runtime.GOARCH == "arm" {
config.Timeout = 90 * time.Second
@@ -91,6 +91,13 @@ func CreateServiceSpec(t *testing.T, opts ...ServiceSpecOpt) swarmtypes.ServiceS
return spec
}
+// ServiceWithMode sets the mode of the service to the provided mode.
+func ServiceWithMode(mode swarmtypes.ServiceMode) func(*swarmtypes.ServiceSpec) {
+ return func(spec *swarmtypes.ServiceSpec) {
+ spec.Mode = mode
+ }
+}
+
// ServiceWithInit sets whether the service should use init or not
func ServiceWithInit(b *bool) func(*swarmtypes.ServiceSpec) {
return func(spec *swarmtypes.ServiceSpec) {
diff --git a/integration/internal/swarm/states.go b/integration/internal/swarm/states.go
index bf95b786ca..77b3f30209 100644
--- a/integration/internal/swarm/states.go
+++ b/integration/internal/swarm/states.go
@@ -2,6 +2,7 @@ package swarm
import (
"context"
+ "fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
@@ -82,3 +83,83 @@ func RunningTasksCount(client client.ServiceAPIClient, serviceID string, instanc
}
}
}
+
+// JobComplete is a poll function for determining that a ReplicatedJob is
+// completed additionally, while polling, it verifies that the job never
+// exceeds MaxConcurrent running tasks
+func JobComplete(client client.CommonAPIClient, service swarmtypes.Service) func(log poll.LogT) poll.Result {
+ filter := filters.NewArgs()
+ filter.Add("service", service.ID)
+
+ var jobIteration swarmtypes.Version
+ if service.JobStatus != nil {
+ jobIteration = service.JobStatus.JobIteration
+ }
+
+ maxRaw := service.Spec.Mode.ReplicatedJob.MaxConcurrent
+ totalRaw := service.Spec.Mode.ReplicatedJob.TotalCompletions
+
+ max := int(*maxRaw)
+ total := int(*totalRaw)
+
+ previousResult := ""
+
+ return func(log poll.LogT) poll.Result {
+ tasks, err := client.TaskList(context.Background(), types.TaskListOptions{
+ Filters: filter,
+ })
+
+ if err != nil {
+ poll.Error(err)
+ }
+
+ var running int
+ var completed int
+
+ var runningSlot []int
+ var runningID []string
+
+ for _, task := range tasks {
+ // make sure the task has the same job iteration
+ if task.JobIteration == nil || task.JobIteration.Index != jobIteration.Index {
+ continue
+ }
+ switch task.Status.State {
+ case swarmtypes.TaskStateRunning:
+ running++
+ runningSlot = append(runningSlot, task.Slot)
+ runningID = append(runningID, task.ID)
+ case swarmtypes.TaskStateComplete:
+ completed++
+ }
+ }
+
+ switch {
+ case running > max:
+ return poll.Error(fmt.Errorf(
+ "number of running tasks (%v) exceeds max (%v)", running, max,
+ ))
+ case (completed + running) > total:
+ return poll.Error(fmt.Errorf(
+ "number of tasks exceeds total (%v), %v running and %v completed",
+ total, running, completed,
+ ))
+ case completed == total && running == 0:
+ return poll.Success()
+ default:
+ newRes := fmt.Sprintf(
+ "Completed: %2d Running: %v\n\t%v",
+ completed, runningSlot, runningID,
+ )
+ if newRes == previousResult {
+ } else {
+ previousResult = newRes
+ }
+
+ return poll.Continue(
+ "Job not yet finished, %v completed and %v running out of %v total",
+ completed, running, total,
+ )
+ }
+ }
+}