Optimizing CI/CD process for machine learning application in Kubernetes

Problem statement

Companies are sometimes so focused on product development and feature releases that security, velocity, and cost optimization suffer (this is especially true for startups). Automat-it, as a company of DevOps experts, does not just implement customer requests; we also analyze environments and prepare recommendations for possible improvements. One of our customers had a Docker image that contained a machine learning model: ~1 GB of application code, ~1 GB of Python dependencies, and ~30 GB of ML model files pulled from Git LFS during the CI process. The application itself was a Kubernetes Job that was scheduled on demand; a node was also provisioned on demand and terminated once the Job finished, so we could not benefit from the Docker caching mechanism.

The main idea of Docker applications is that they are lightweight and ephemeral (they can be removed and recreated quickly) and can scale fast. The total image size was ~32 GB (with thousands of small files), which led to:

  • slow CI/CD (release velocity): ~1 hour per CI/CD job
  • slow scaling (meeting end-user demand): a new Job takes ~25 minutes to start on a new node
  • high storage cost
  • high data transfer cost

According to ECR pricing and this usage, the costs were as follows:
– the company stored the 50 latest Docker images in ECR (lifecycle policy), so ECR held ~1600 GB of data. Every GB costs $0.10 per month, so the total was ~$160 per month
– the company also paid for Data Transfer OUT. The first 10 TB of traffic costs $0.09 per GB. Because EKS nodes are started and stopped many times per day (a node is launched when a Job appears and stopped when the Job finishes), every new node has no local cache and must pull the Docker image from ECR. Assuming 1 job per hour (24 jobs per day), the Docker image is pulled by nodes 24 times per day, which is ~770 GB of traffic. That costs ~$69 per day, or ~$2,000 per month, and the bill only grows with more Jobs per day.
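The numbers above can be reproduced with a short back-of-the-envelope calculation; a minimal sketch, assuming the values described (a 32 GB image, 50 stored images, 24 pulls per day):

# Rough ECR cost estimate for the described usage.
# Assumed inputs: 32 GB image, 50 images kept by the lifecycle policy, 24 pulls per day.
IMAGE_SIZE_GB = 32
IMAGES_STORED = 50
PULLS_PER_DAY = 24

STORAGE_PRICE_PER_GB_MONTH = 0.10  # ECR storage, $ per GB-month
TRANSFER_PRICE_PER_GB = 0.09       # Data Transfer OUT, first 10 TB tier

storage_per_month = IMAGE_SIZE_GB * IMAGES_STORED * STORAGE_PRICE_PER_GB_MONTH
transfer_per_day = IMAGE_SIZE_GB * PULLS_PER_DAY * TRANSFER_PRICE_PER_GB

print(f"ECR storage: ~{storage_per_month:.0f}$ per month")   # ~160$ per month
print(f"Data transfer: ~{transfer_per_day:.0f}$ per day, "
      f"~{transfer_per_day * 30:.0f}$ per month")             # ~69$ per day, ~2074$ per month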

The initial CI/CD workflow took about 1 hour. The scheme is below:

Solution overview

The first question that we asked was about the ML model: who develops and trains it, and how often. It turned out to be an open-source ML model that is updated only once every few months. It was simply downloaded into the container and used in the ML inference process.
We decided that, in this case, we don't need to download it every time we build a new Docker image (at least once per day). Instead, we can include the ML model in the AMI of the EKS node. Every new EKS node is created from a custom AMI that already contains the ML model files, and that folder can be mounted into the Kubernetes Job at launch.

When we launch the Kubernetes Job, we mount the directory from the node into the Job at a particular path. The application already knows that it should load the model from the provided path, so the code can still use the files that were previously pulled from LFS.
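For illustration, here is a minimal sketch of such a Job created with the official Kubernetes Python client. The image name, namespace, and the mount path inside the container are hypothetical placeholders; /home/ec2-user/models is the model directory baked into the custom AMI (see the Packer provisioning below):

from kubernetes import client, config

config.load_kube_config()  # or config.load_incluster_config() when running inside the cluster

# Expose the model directory baked into the node AMI to the Job via a hostPath volume.
models_volume = client.V1Volume(
    name="ml-models",
    host_path=client.V1HostPathVolumeSource(path="/home/ec2-user/models", type="Directory"),
)
models_mount = client.V1VolumeMount(
    name="ml-models",
    mount_path="/opt/ml/models",  # hypothetical path the application loads the model from
    read_only=True,
)

job = client.V1Job(
    api_version="batch/v1",
    kind="Job",
    metadata=client.V1ObjectMeta(name="ml-inference"),
    spec=client.V1JobSpec(
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(
                restart_policy="Never",
                containers=[
                    client.V1Container(
                        name="inference",
                        image="<account>.dkr.ecr.us-east-2.amazonaws.com/ml-app:latest",  # placeholder
                        volume_mounts=[models_mount],
                    )
                ],
                volumes=[models_volume],
            )
        )
    ),
)

client.BatchV1Api().create_namespaced_job(namespace="default", body=job)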

As per our estimation, this reduces the CI/CD workflow time and the costs by 90+% (the image shrinks from ~32 GB to ~2 GB of application code and Python dependencies once the model lives in the AMI):

  • ~3 minutes instead of ~1 hour for a CI/CD run
  • ~$10 per month instead of ~$160 per month for ECR storage
  • ~$4 per day instead of ~$69 per day for ECR data transfer

AMI build automation

Once we implemented the above solution manually (prepared and tested the custom AMI for the EKS nodes), the described issue was resolved, but we got a valid question from the customer: "How will I do the same in the future when I need to update the AMI with a new ML model?" Moreover, we needed to minimize the downtime of the application during the EKS node replacement. We designed and implemented the following workflow (a Jenkins job):

  1. The customer manually runs the Jenkins job when an AMI upgrade is required.
  2. Git checkout.
  3. A new AMI is built using HashiCorp Packer (the code is shown below).
  4. The new AMI is shared with the relevant AWS account.
  5. EKS nodes are redeployed using the new AMI (the Python script is shown below):

5.1 Disable the cluster-autoscaler (scale it to 0).
5.2 Create a new version of the EKS node Launch Template with the new AMI.
5.3 Get the list of current k8s nodes (old_nodes) and taint them to disallow scheduling of new pods.
5.4 Double the size of the auto-scaling group (scale out) in order to keep meeting end-user demand.
5.5 Enable scale-in protection for all instances.
5.6 Make sure that the new k8s nodes are in Ready state.
5.7 In a loop, check the state of K8s Jobs on the old k8s nodes in the specified namespace.
5.8 Drain each old node where all jobs are completed.
5.9 Disable scale-in protection for that old instance, where no jobs are running.
5.10 Scale the ASG in by 1, terminating the freed node.
5.11 If any Job is still not completed within 2 hours, it is forcibly deleted and the node is terminated.
5.12 Enable the cluster-autoscaler again.

We have to disable the cluster-autoscaler in this workflow because it can remove nodes even when scale-in protection is enabled. Jobs of different durations can run on the nodes, which is why we continuously check every node and disable scale-in protection only for a node where nothing is running; as a result we terminate only free nodes and end users are not affected. Because we tainted the old nodes and created a new set of nodes (without taints), all Jobs that appear during the node upgrade are scheduled on the new nodes.

The Packer code is:

packer {
  required_plugins {
    amazon = {
      version = ">= 1.1.0"
      source  = "github.com/hashicorp/amazon"
    }
  }
}

### Variables
variable "ami_prefix" {
  type    = string
  default = "packer-eks-ml"
}

variable "region" {
  type = string
  default = "us-east-2"
}

variable "source_ami" {
  type    = string
  default = "ami-0d439f6ee9d27dcf3" # EKS 1.22
}

variable "instance_type" {
  type    = string
  default = "m5.large"
}

variable "ssh_username" {
  type    = string
  default = "ec2-user"
}

variable "branch" {
  type    = string
  description = "Git branch to checkout inside the node"
}

variable "secret" {
  type    = string
  default = "mgmt/gh"
  description = "AWS Secrets Manager secret where GitHub token is stored"
}

variable "pvovision_iam_profile" {
  type    = string
  default = "packer_ec2-mgmt"
}

variable "jenkins_build" {
  type = number
  default = 0
}

### AMI
# https://www.packer.io/plugins/builders/amazon/ebs
source "amazon-ebs" "eks" {
  ami_name             = "${var.ami_prefix}-${var.jenkins_build}"
  instance_type        = var.instance_type
  region               = var.region
  source_ami           = var.source_ami
  ssh_username         = var.ssh_username
  iam_instance_profile = var.provision_iam_profile

  launch_block_device_mappings {
    device_name           = "/dev/xvda"
    delete_on_termination = true
    volume_type           = "gp2"
    volume_size           = 50
  }

  aws_polling {
    delay_seconds = 30
    max_attempts  = 200
  }
}

build {
  name = "packer"
  sources = [
    "source.amazon-ebs.eks"
  ]
  provisioner "shell" {
    environment_vars =  [
      "SECRET=${var.secret}",
      "BRANCH=${var.branch}",
      "REGION=${var.region}"
    ]
    script = "provision.sh"
  }
}

The provision.sh script logs in to GitHub and pulls the ML model into the /home/ec2-user/models directory, which will later be mounted into the pod.

The Python script is below:
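
# Redeploys EKS worker nodes onto a new AMI with zero downtime: creates a new
# Launch Template version, disables the cluster-autoscaler, taints the old nodes,
# doubles the ASG, waits for the new nodes to become Ready, then drains and
# removes the old nodes one by one as their jobs complete.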

import boto3
import datetime
import sys
import os
import re
import time

AWS_ACCOUNT = os.environ.get('AWS_ACCOUNT')
REGION = os.environ.get('REGION')
ENV = os.environ.get('ENV')
AMI_ID = os.environ.get('AMI_ID')
AMI_DESCRIPTION = os.environ.get('AMI_DESCRIPTION')

ROLE = f"arn:aws:iam::{AWS_ACCOUNT}:role/redeploy_nodes-{ENV}"
LT = f"eks_worker_lt-{ENV}"
ASG = f"eks_worker-{ENV}"
CLUSTER_NAME = f"eks-{ENV}"
KUBE_ROLE = f"arn:aws:iam::{AWS_ACCOUNT}:role/eks_admin_role-{ENV}"

K8S_JOBS = ['algopipeline', 'scraper']
K8S_JOBS_FILTERS = {'namespace': ENV, 'jobs': K8S_JOBS}

DEBUG = True

sts_client = boto3.client('sts')
assumed_role_object = sts_client.assume_role(
    RoleArn=ROLE,
    DurationSeconds=60 * 60 * 4,  # no less than the job timeout plus the Packer build (~1 hour)
    RoleSessionName="redeploy")
credentials = assumed_role_object['Credentials']
config = {
    'region_name': REGION,
    'aws_access_key_id': credentials['AccessKeyId'],
    'aws_secret_access_key': credentials['SecretAccessKey'],
    'aws_session_token': credentials['SessionToken']
}

ec2_client = boto3.client('ec2', **config)
asg_client = boto3.client('autoscaling', **config)

os.environ["AWS_ACCESS_KEY_ID"] = credentials['AccessKeyId']
os.environ["AWS_SECRET_ACCESS_KEY"] = credentials['SecretAccessKey']
os.environ["AWS_DEFAULT_REGION"] = REGION
os.environ["AWS_SESSION_TOKEN"] = credentials['SessionToken']


def create_launch_template_version(client, launch_template, ami, description):
    filters = [{'Name': 'launch-template-name', 'Values': [launch_template]}]
    response = client.describe_launch_templates(Filters=filters)

    launch_template_id = response['LaunchTemplates'][0]['LaunchTemplateId']
    print("Launch Template ID:", launch_template_id)

    launch_template_latest_version = response['LaunchTemplates'][0]['LatestVersionNumber']
    print("The latest Launch Template version:", launch_template_latest_version)
    client.create_launch_template_version(
        LaunchTemplateId=launch_template_id,
        SourceVersion=str(launch_template_latest_version),
        VersionDescription=description,
        LaunchTemplateData={'ImageId': ami})


def run_cmd(cmd):
    output = os.popen(cmd).read()
    print_debug(f"{cmd}\n{output}\n", DEBUG)
    return output


def gen_k8s_config(region, cluster_name, kube_role):
    cmd = f"aws eks update-kubeconfig --region {region} --name {cluster_name} --role-arn {kube_role}"
    run_cmd(cmd)


def set_cluster_autoscaler(is_enabled=True):
    name = "deployment/cluster-autoscaler-aws-cluster-autoscaler"
    cmd = f"kubectl scale --replicas={int(is_enabled)} {name} -n kube-system"
    run_cmd(cmd)


def get_k8s_nodes():
    cmd = "kubectl get nodes --selector='pool=worker'"
    output = run_cmd(cmd)

    regex = r'^ip[^ ]+'
    nodes = re.findall(regex, output, re.MULTILINE)
    return nodes


def taint_k8s_nodes(nodes):
    for node in nodes:
        cmd = f"kubectl taint nodes {node} dedicated=worker:NoSchedule"
        run_cmd(cmd)


def get_asg_capacity(client, asg):
    response = client.describe_auto_scaling_groups(AutoScalingGroupNames=[asg])
    desired_instances = response['AutoScalingGroups'][0]['DesiredCapacity']
    max_instances = response['AutoScalingGroups'][0]['MaxSize']
    capacity = {'desired': desired_instances, 'max': max_instances}
    msg = ', '.join([f'{k} - {v}' for k, v in capacity.items()])
    print_debug(msg, DEBUG)
    return capacity


def scale_asg(client, capacity, asg):
    client.update_auto_scaling_group(
        AutoScalingGroupName=asg,
        MaxSize=capacity['max'],
        DesiredCapacity=capacity['desired'])
    msg = f"Scale ASG: desired - {capacity['desired']}, max - {capacity['max']}"
    print_debug(msg, DEBUG)


def get_asg_instances(client, asg):
    response = client.describe_auto_scaling_groups(AutoScalingGroupNames=[asg])
    instances = response['AutoScalingGroups'][0]['Instances']
    return instances


def wait_asg_status(client, capacity_cur, capacity_exp, asg, timeouts):

    # wait until the ASG reports the expected number of instances
    while capacity_cur < capacity_exp:
        capacity_cur = len(get_asg_instances(client, asg))
        time.sleep(timeouts['iteration'])

    cur_time = datetime.datetime.now()
    deadline_time = cur_time + datetime.timedelta(seconds=timeouts['deadline'])

    while cur_time <= deadline_time:
        cur_time = datetime.datetime.now()
        instances_status = [i['LifecycleState'] for i in get_asg_instances(client, asg)]

        set_status = set(instances_status)
        if len(set_status) == 1 and 'InService' in set_status:
            break

        time.sleep(timeouts['iteration'])
    else:
        msg = f"{timeouts['deadline']} is expired: some EC2 is not InService"
        sys.exit(msg)


def set_asg_protection(client, asg, instances, is_enabled=True):
    client.set_instance_protection(
        InstanceIds=instances,
        AutoScalingGroupName=asg,
        ProtectedFromScaleIn=is_enabled)

def get_k8s_nodes_status():
    cmd = "kubectl get nodes --selector='pool=worker'"
    output = run_cmd(cmd)

    regex = r'^(ip[^ ]+)\s+([^ ]+)'
    nodes = re.findall(regex, output, re.MULTILINE)
    return nodes


def wait_k8s_nodes_status(k8s_nodes, timeouts):
    cur_time = datetime.datetime.now()
    deadline_time = cur_time + datetime.timedelta(seconds=timeouts['deadline'])

    nodes = get_k8s_nodes_status()
    while cur_time <= deadline_time:
        nodes_status = set([node[1] for node in nodes])
        if len(nodes) == k8s_nodes:
            if len(nodes_status) == 1 and 'Ready' in nodes_status:
                break
        nodes = get_k8s_nodes_status()

        cur_time = datetime.datetime.now()
        time.sleep(timeouts['iteration'])
    else:
        msg = f"{timeouts['deadline']} is expired: kubernetes nodes are not ready"
        sys.exit(msg)


def drain_k8s_node(node):
    cmd = f'kubectl drain {node} --ignore-daemonsets'
    run_cmd(cmd)


def get_k8s_pods_status(filters, node):
    namespace = filters['namespace']
    cmd = f"kubectl get pods -n {namespace} -o wide --field-selector spec.nodeName={node}"
    output = run_cmd(cmd)

    jobs = '|'.join(filters['jobs'])
    regex = rf'^((?:{jobs})[^ ]+)\s+\d+/\d+\s+([^ ]+)'
    match = re.findall(regex, output, re.MULTILINE)
    return match


def replace_nodes(k8s_jobs_filters, k8s_nodes_old, timeouts):

    cur_time = datetime.datetime.now()
    deadline_time = cur_time + datetime.timedelta(seconds=timeouts['deadline'])

    while cur_time <= deadline_time:
        # check only old k8s nodes with Ready status
        nodes = get_k8s_nodes_status()
        k8s_nodes_ready = [node[0] for node in nodes if node[0] in k8s_nodes_old and node[1] == 'Ready']

        if not k8s_nodes_ready:
            break

        for node in k8s_nodes_ready:
            pods = get_k8s_pods_status(k8s_jobs_filters, node)
            statuses = [pod[1] for pod in pods if pod]
            # drain the node only when no job pods are left running on it
            if not pods or 'Running' not in set(statuses):
                drain_k8s_node(node)

                instances = get_instance_id(node).split()
                set_asg_protection(asg_client, ASG, instances, is_enabled=False)

                cur_capacity = get_asg_capacity(asg_client, ASG)
                new_capacity = {'desired': cur_capacity['desired'] - 1,
                                'max': cur_capacity['max']}
                scale_asg(asg_client, new_capacity, ASG)

        time.sleep(timeouts['iteration'])
        cur_time = datetime.datetime.now()
    else:
        msg = f"{timeouts['deadline']} timeout for jobs is expired"
        print(msg)

        nodes = get_k8s_nodes_status()
        k8s_nodes_ready = [node[0] for node in nodes if node[0] in k8s_nodes_old and node[1] == 'Ready']
        msg = "Force drain nodes:"
        print(msg, k8s_nodes_ready)
        for node in k8s_nodes_ready:
            drain_k8s_node(node)

        instances = [get_instance_id(node) for node in k8s_nodes_ready]
        msg = "Disable scale-in protection:"
        print(msg, instances)
        set_asg_protection(asg_client, ASG, instances, is_enabled=False)

        msg = "Force scale down ASG:"
        print(msg, capacity_old)  # capacity_old is defined at module level in __main__
        scale_asg(asg_client, capacity_old, ASG)

        # wait until the instance count in the ASG drops back to the desired capacity
        msg = "Wait until old instances in ASG are terminated"
        print(msg)
        iteration_timeout = 5
        while len(instances) != capacity_old['desired']:
            instances = get_asg_instances(asg_client, ASG)
            time.sleep(iteration_timeout)


def get_instance_id(node):
    cmd = f'kubectl describe node/{node}'
    output = run_cmd(cmd)

    regex = r'^ProviderID:\s+aws:///[-a-zA-Z0-9]+/([^\n]+)'
    instance = re.findall(regex, output, re.MULTILINE)[0]
    return instance


def print_debug(msg, is_enabled=False):
    if is_enabled:
        print(msg)


if __name__ == '__main__':
    # create new Launch Template version with new AMI id
    print("Create new Launch Template version")
    create_launch_template_version(ec2_client, LT, AMI_ID, AMI_DESCRIPTION)

    # generate k8s config
    print("Generate kube-config file")
    gen_k8s_config(REGION, CLUSTER_NAME, KUBE_ROLE)

    # disable cluster autoscaler
    print("Disable cluster-autoscaler")
    set_cluster_autoscaler(is_enabled=False)

    # taint k8s nodes
    print("Taint current k8s nodes")
    k8s_old_nodes = get_k8s_nodes()
    taint_k8s_nodes(k8s_old_nodes)

    # scale up ASG (x2 max and desired capacity)
    print("Double scale up ASG")
    capacity_old = get_asg_capacity(asg_client, ASG)
    capacity_double = {'desired': capacity_old['desired'] * 2,
                       'max': capacity_old['max'] * 2}
    scale_asg(asg_client, capacity_double, ASG)

    # wait until ASG instances are in InService status
    print("Wait until ASG instances are in InService status")
    timeouts = {'iteration': 5, 'deadline': 300}
    wait_asg_status(asg_client, capacity_old['desired'], capacity_double['desired'], ASG, timeouts)

    # set enable scale-in protection for all ASG instances
    print("Enable scale-in protection for all ASG instances")
    instances_all = get_asg_instances(asg_client, ASG)
    instances_ids = [i['InstanceId'] for i in instances_all]
    set_asg_protection(asg_client, ASG, instances_ids)

    # wait until k8s nodes are in Ready status
    print("Wait until k8s nodes are in Ready status")
    timeouts = {'iteration': 5, 'deadline': 300}
    wait_k8s_nodes_status(capacity_double['desired'], timeouts)

    # wait 2 hours or until pods controlled by jobs are completed
    print("Wait 2 hours or until pods controlled by jobs are completed")
    timeouts = {'deadline': 60 * 60 * 2, 'iteration': 60}
    replace_nodes(K8S_JOBS_FILTERS, k8s_old_nodes, timeouts)

    # scale the ASG back to the original capacity (desired and max)
    scale_asg(asg_client, capacity_old, ASG)

    # disable scale-in protection for NEW instances
    print("Disable scale-in protection for all (rest) nodes")
    instances_all = get_asg_instances(asg_client, ASG)
    instances_ids = [i['InstanceId'] for i in instances_all]
    set_asg_protection(asg_client, ASG, instances_ids, is_enabled=False)

    # enable cluster-autoscaler
    print("Enable cluster-autoscaler")
    set_cluster_autoscaler(is_enabled=True)

Conclusion

In this post, we looked at a solution that helped our customer reduce ECR storage and traffic costs by ~90%, speed up the overall CI process from 1 hour to ~3 minutes, and let Kubernetes Jobs start within 1 minute instead of the 25 minutes it took before the optimization. HashiCorp Packer was used to prepare a custom AMI for the EKS nodes, and Jenkins and Python were used to achieve a zero-downtime node upgrade process.