diff options
author | Drew Erny <drew.erny@docker.com> | 2019-12-11 10:05:03 -0600 |
---|---|---|
committer | Drew Erny <derny@mirantis.com> | 2020-01-13 13:21:12 -0600 |
commit | 30d9fe30b1c1bf52f15a41e0b106a1542a167e04 (patch) | |
tree | f3363dfa99bc6d739d05fe10506ccc1c8b25ee29 /integration/internal | |
parent | 4d63209d94276d66d9db39174bfe0127f4b9a8dc (diff) | |
download | docker-30d9fe30b1c1bf52f15a41e0b106a1542a167e04.tar.gz |
Add swarm jobs
Adds support for ReplicatedJob and GlobalJob service modes. These modes
allow running service which execute tasks that exit upon success,
instead of daemon-type tasks.
Signed-off-by: Drew Erny <drew.erny@docker.com>
Diffstat (limited to 'integration/internal')
-rw-r--r-- | integration/internal/swarm/service.go | 9 | ||||
-rw-r--r-- | integration/internal/swarm/states.go | 81 |
2 files changed, 89 insertions, 1 deletions
diff --git a/integration/internal/swarm/service.go b/integration/internal/swarm/service.go index ac0bded03c..ccf945b6fa 100644 --- a/integration/internal/swarm/service.go +++ b/integration/internal/swarm/service.go @@ -20,7 +20,7 @@ import ( // ServicePoll tweaks the pollSettings for `service` func ServicePoll(config *poll.Settings) { // Override the default pollSettings for `service` resource here ... - config.Timeout = 30 * time.Second + config.Timeout = 15 * time.Second config.Delay = 100 * time.Millisecond if runtime.GOARCH == "arm64" || runtime.GOARCH == "arm" { config.Timeout = 90 * time.Second @@ -91,6 +91,13 @@ func CreateServiceSpec(t *testing.T, opts ...ServiceSpecOpt) swarmtypes.ServiceS return spec } +// ServiceWithMode sets the mode of the service to the provided mode. +func ServiceWithMode(mode swarmtypes.ServiceMode) func(*swarmtypes.ServiceSpec) { + return func(spec *swarmtypes.ServiceSpec) { + spec.Mode = mode + } +} + // ServiceWithInit sets whether the service should use init or not func ServiceWithInit(b *bool) func(*swarmtypes.ServiceSpec) { return func(spec *swarmtypes.ServiceSpec) { diff --git a/integration/internal/swarm/states.go b/integration/internal/swarm/states.go index bf95b786ca..77b3f30209 100644 --- a/integration/internal/swarm/states.go +++ b/integration/internal/swarm/states.go @@ -2,6 +2,7 @@ package swarm import ( "context" + "fmt" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/filters" @@ -82,3 +83,83 @@ func RunningTasksCount(client client.ServiceAPIClient, serviceID string, instanc } } } + +// JobComplete is a poll function for determining that a ReplicatedJob is +// completed additionally, while polling, it verifies that the job never +// exceeds MaxConcurrent running tasks +func JobComplete(client client.CommonAPIClient, service swarmtypes.Service) func(log poll.LogT) poll.Result { + filter := filters.NewArgs() + filter.Add("service", service.ID) + + var jobIteration swarmtypes.Version + if service.JobStatus != nil { + jobIteration = service.JobStatus.JobIteration + } + + maxRaw := service.Spec.Mode.ReplicatedJob.MaxConcurrent + totalRaw := service.Spec.Mode.ReplicatedJob.TotalCompletions + + max := int(*maxRaw) + total := int(*totalRaw) + + previousResult := "" + + return func(log poll.LogT) poll.Result { + tasks, err := client.TaskList(context.Background(), types.TaskListOptions{ + Filters: filter, + }) + + if err != nil { + poll.Error(err) + } + + var running int + var completed int + + var runningSlot []int + var runningID []string + + for _, task := range tasks { + // make sure the task has the same job iteration + if task.JobIteration == nil || task.JobIteration.Index != jobIteration.Index { + continue + } + switch task.Status.State { + case swarmtypes.TaskStateRunning: + running++ + runningSlot = append(runningSlot, task.Slot) + runningID = append(runningID, task.ID) + case swarmtypes.TaskStateComplete: + completed++ + } + } + + switch { + case running > max: + return poll.Error(fmt.Errorf( + "number of running tasks (%v) exceeds max (%v)", running, max, + )) + case (completed + running) > total: + return poll.Error(fmt.Errorf( + "number of tasks exceeds total (%v), %v running and %v completed", + total, running, completed, + )) + case completed == total && running == 0: + return poll.Success() + default: + newRes := fmt.Sprintf( + "Completed: %2d Running: %v\n\t%v", + completed, runningSlot, runningID, + ) + if newRes == previousResult { + } else { + previousResult = newRes + } + + return poll.Continue( + "Job not yet finished, %v completed and %v running out of %v total", + completed, running, total, + ) + } + } +} |