summaryrefslogtreecommitdiff
path: root/app/workers
diff options
context:
space:
mode:
authorShinya Maeda <shinya@gitlab.com>2017-10-01 00:54:22 +0900
committerShinya Maeda <shinya@gitlab.com>2017-10-01 00:54:22 +0900
commite499c1c39dbea505858874ee47436641df3d93d4 (patch)
treebce2333e86abfbdba7e55d15bb1fe8e432657832 /app/workers
parentc30546f4aa073f44e97b49f47c57a9a89062c3c6 (diff)
downloadgitlab-ce-e499c1c39dbea505858874ee47436641df3d93d4.tar.gz
Replace reactive_cache by multipel sidekiq workers
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/cluster_creation_worker.rb47
-rw-r--r--app/workers/wait_for_cluster_creation_worker.rb85
2 files changed, 132 insertions, 0 deletions
diff --git a/app/workers/cluster_creation_worker.rb b/app/workers/cluster_creation_worker.rb
new file mode 100644
index 00000000000..0b547089b94
--- /dev/null
+++ b/app/workers/cluster_creation_worker.rb
@@ -0,0 +1,47 @@
+class ClusterCreationWorker
+ include Sidekiq::Worker
+ include DedicatedSidekiqQueue
+
+ def perform(cluster_id)
+ cluster = Ci::Cluster.find_by_id(cluster_id)
+
+ unless cluster
+ return Rails.logger.error "Cluster object is not found; #{cluster_id}"
+ end
+
+ api_client =
+ GoogleApi::CloudPlatform::Client.new(cluster.gcp_token, nil)
+
+ operation = api_client.projects_zones_clusters_create(
+ cluster.gcp_project_id,
+ cluster.cluster_zone,
+ cluster.cluster_name,
+ cluster.cluster_size,
+ machine_type: cluster.machine_type
+ )
+
+ if operation.is_a?(StandardError)
+ return cluster.error!("Failed to request to CloudPlatform; #{operation.message}")
+ end
+
+ unless operation.status == 'RUNNING' || operation.status == 'PENDING'
+ return cluster.error!("Operation status is unexpected; #{operation.status_message}")
+ end
+
+ operation_id = api_client.parse_operation_id(operation.self_link)
+
+ unless operation_id
+ return cluster.error!('Can not find operation_id from self_link')
+ end
+
+ if cluster.update(status: Ci::Cluster.statuses[:creating],
+ gcp_operation_id: operation_id)
+ WaitForClusterCreationWorker.perform_in(
+ WaitForClusterCreationWorker::INITIAL_INTERVAL,
+ cluster.id
+ )
+ else
+ return cluster.error!("Failed to update cluster record; #{cluster.errors}")
+ end
+ end
+end
diff --git a/app/workers/wait_for_cluster_creation_worker.rb b/app/workers/wait_for_cluster_creation_worker.rb
new file mode 100644
index 00000000000..e0dce5a60cd
--- /dev/null
+++ b/app/workers/wait_for_cluster_creation_worker.rb
@@ -0,0 +1,85 @@
+class WaitForClusterCreationWorker
+ include Sidekiq::Worker
+ include DedicatedSidekiqQueue
+
+ INITIAL_INTERVAL = 2.minutes
+ EAGER_INTERVAL = 10.seconds
+ TIMEOUT = 20.minutes
+
+ def perform(cluster_id)
+ cluster = Ci::Cluster.find_by_id(cluster_id)
+
+ unless cluster
+ return Rails.logger.error "Cluster object is not found; #{cluster_id}"
+ end
+
+ api_client =
+ GoogleApi::CloudPlatform::Client.new(cluster.gcp_token, nil)
+
+ operation = api_client.projects_zones_operations(
+ cluster.gcp_project_id,
+ cluster.cluster_zone,
+ cluster.gcp_operation_id
+ )
+
+ if operation.is_a?(StandardError)
+ return cluster.error!("Failed to request to CloudPlatform; #{operation.message}")
+ end
+
+ case operation.status
+ when 'DONE'
+ integrate(api_client, cluster)
+ when 'RUNNING'
+ if Time.now < operation.start_time.to_time + TIMEOUT
+ WaitForClusterCreationWorker.perform_in(EAGER_INTERVAL, cluster.id)
+ else
+ return cluster.error!("Cluster creation time exceeds timeout; #{TIMEOUT}")
+ end
+ else
+ return cluster.error!("Unexpected operation status; #{operation.status_message}")
+ end
+ end
+
+ def integrate(api_client, cluster)
+ # Get cluster details (end point, etc)
+ gke_cluster = api_client.projects_zones_clusters_get(
+ cluster.gcp_project_id,
+ cluster.cluster_zone,
+ cluster.cluster_name
+ )
+
+ if gke_cluster.is_a?(StandardError)
+ return cluster.error!("Failed to request to CloudPlatform; #{gke_cluster.message}")
+ end
+
+ # Get k8s token
+ kubernetes_token = ''
+ KubernetesService.new.tap do |ks|
+ ks.api_url = 'https://' + gke_cluster.endpoint
+ ks.ca_pem = Base64.decode64(gke_cluster.master_auth.cluster_ca_certificate)
+ ks.username = gke_cluster.master_auth.username
+ ks.password = gke_cluster.master_auth.password
+ secrets = ks.read_secrets
+ secrets.each do |secret|
+ name = secret.dig('metadata', 'name')
+ if /default-token/ =~ name
+ token_base64 = secret.dig('data', 'token')
+ kubernetes_token = Base64.decode64(token_base64)
+ break
+ end
+ end
+ end
+
+ unless kubernetes_token
+ return cluster.error!('Failed to get a default token of kubernetes')
+ end
+
+ # k8s endpoint, ca_cert
+ endpoint = 'https://' + gke_cluster.endpoint
+ ca_cert = Base64.decode64(gke_cluster.master_auth.cluster_ca_certificate)
+ username = gke_cluster.master_auth.username
+ password = gke_cluster.master_auth.password
+
+ Ci::IntegrateClusterService.new.execute(cluster, endpoint, ca_cert, kubernetes_token, username, password)
+ end
+end