Kubernetes Jobs for Machine Learning


In my previous post I introduced Kubernetes Pods, the basic building block of the Kubernetes ecosystem. In that post I discussed what a Pod is, how it fits into the Kubernetes system, and how to create, view, and delete a Pod. While it’s important to understand what a Pod is, it is not considered good practice to deploy Pods directly to perform tasks. This is because Naked Pods (Pods not bound to other objects) won’t be rescheduled in the event of a node failure. Instead, you should deploy higher-level abstractions that incorporate Pods.

In this post we’ll discuss Kubernetes Jobs, a higher-level abstraction for running batch processes. We’ll create Kubernetes Jobs to train a machine learning model and to perform batch inference.

What is a Job?

A Job is a Kubernetes controller that creates one or more Pods and ensures that a specified number of them successfully complete their workloads. When a Job is created, it creates and tracks the Pods that were specified in its configuration file. When a specified number of Pods complete, the Job is complete. If a Pod fails due to underlying node failure or reboot, the Job object will create a new Pod to take its place. This means that the application needs to handle the case when it’s restarted in a new Pod.

There are several patterns for running Jobs. The simplest is a single Job that reliably runs one Pod to completion; the Job only creates additional Pods if that Pod fails. But Jobs also support parallel processing. For instance, you can specify n>1 Pod completions, and the Job will ensure that n Pods successfully terminate. You can also set a level of parallelism so that multiple Pods run at the same time, or incorporate a work queue to build a parallel processing system. In that case, the Job launches multiple Pods, each of which pops work from the queue and processes it.
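
For example, a Job that needs five successful completions but should run at most two Pods at a time might be sketched like this (the image and command are placeholders for whatever worker logic pulls items from your queue):

apiVersion: batch/v1
kind: Job
metadata:
  name: parallel-job
spec:
  completions: 5       # the Job is done once 5 Pods terminate successfully
  parallelism: 2       # run at most 2 Pods at the same time
  template:
    spec:
      containers:
      - name: worker
        image: my-worker-image             # placeholder image
        command: ["python3", "worker.py"]  # placeholder worker script
      restartPolicy: Never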

By default, when a Job completes, both the Job and the Pods it created remain in the system. This lets users view the logs of the completed Pods and check diagnostic information. It is up to the user to delete Jobs; when a Job is deleted, all the Pods it created are deleted as well.
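
If your cluster includes the TTL-after-finished controller, you can also ask Kubernetes to clean up a finished Job for you by setting .spec.ttlSecondsAfterFinished; here is a minimal sketch (the image and command are placeholders):

apiVersion: batch/v1
kind: Job
metadata:
  name: cleanup-example
spec:
  ttlSecondsAfterFinished: 3600   # delete the Job and its Pods one hour after it finishes
  template:
    spec:
      containers:
      - name: main
        image: busybox              # placeholder image
        command: ["echo", "done"]
      restartPolicy: Never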

Why Kubernetes Jobs are Ideal for Machine Learning Tasks

Jobs are perfect for running batch processes – processes that run for a finite amount of time and then exit. This makes Jobs ideal for many types of production machine learning tasks such as feature engineering, cross-validation, model training, and batch inference. For instance, you can create a Kubernetes Job that trains a machine learning model and persists the model and training metadata to external storage. Then you can create another Job to perform batch inference. That Job would create a Pod that fetches the pre-trained model from storage, loads both the model and data into memory, performs inference, and stores the predictions.

Using Kubernetes Jobs for Machine Learning Tasks

Let’s examine how to create, view, update, and delete Jobs.

We’ll create two Kubernetes Jobs. One Job will train a machine learning model and serialize the model. The other Job will load the trained model into memory and perform batch inference. Since the Jobs will create Pods (and hence containers) that are independent of one another, we need an external storage layer to persist the model object. We’ll use Amazon S3 as that storage layer. Our application logic will therefore look like the following:

  • Training Job
    1. Load some training data into memory.
    2. Train a model.
    3. Persist the trained model to S3.
  • Inference Job
    1. Load the trained model from S3.
    2. Load inference data into memory.
    3. Perform inference.

Let’s dive right in!

Note: This section assumes you have access to a Kubernetes cluster and have the kubectl command line client installed.

Docker Image for Kubernetes Machine Learning Jobs

Our Kubernetes Jobs will create Pods that run containers for model training and model inference. Since we are writing custom application logic, we need to build our own Docker image. The image needs a few Python dependencies installed as well as the Python scripts for model training and inference. Here is a Dockerfile for building that image:

FROM jupyter/scipy-notebook

RUN pip install awscli joblib boto3

RUN mkdir model
ENV MODEL_DIR=/home/jovyan/model
ENV MODEL_FILE=clf.joblib
ENV METADATA_FILE=metadata.json
ENV BUCKET_NAME=mlinproduction

COPY train.py ./train.py
COPY inference.py ./inference.py

We start with jupyter/scipy-notebook as our base image. We then install some additional Python libraries for serializing our model object and persisting it to S3. Next we set several environment variables, and finally we copy two Python scripts into the image. We’ll discuss the train.py and inference.py scripts in the following sections. If you need a refresher on Docker syntax, check out my Docker for Machine Learning blog series!

In order to use this image in our Kubernetes cluster, we first need to build the image and push it to a registry. To build the image, we need to use the docker build command:

$ docker build -t k8-model -f Dockerfile .
Sending build context to Docker daemon  11.78kB
Step 1/9 : FROM jupyter/scipy-notebook
 ---> 2fb85d5904cc
Step 2/9 : RUN pip install awscli joblib boto3
 ---> Using cache
 ---> 1c2dade95b41
Step 3/9 : RUN mkdir model
 ---> Using cache
 ---> 2fc99e33c252
Step 4/9 : ENV MODEL_DIR=/home/jovyan/model
 ---> Using cache
 ---> 7a0b58174a90
Step 5/9 : ENV MODEL_FILE=clf.joblib
 ---> Using cache
 ---> 59a14aa21638
Step 6/9 : ENV METADATA_FILE=metadata.json
 ---> Using cache
 ---> 1589a321f94f
Step 7/9 : ENV BUCKET_NAME=mlinproduction
 ---> Using cache
 ---> ef9d4d3b0713
Step 8/9 : COPY train.py ./train.py
 ---> Using cache
 ---> 9f1b80232a9b
Step 9/9 : COPY inference.py ./inference.py
 ---> 2d1fb06cd6ad
Successfully built 2d1fb06cd6ad
Successfully tagged k8-model:latest

Next we tag the image with the name of an image repository on the Docker Hub registry, and then push the image to that registry.

$ docker tag k8-model:latest lpatruno/k8-model:latest
$ docker push lpatruno/k8-model:latest
The push refers to repository [docker.io/lpatruno/k8-model]
0409ca0952f1: Pushed 
6060f4226f64: Layer already exists 
c1dd84c720fe: Layer already exists 
1b87593f54f7: Layer already exists 
03de148dfb0a: Layer already exists 
b0f3e4f91d7b: Layer already exists 
d678676e139c: Layer already exists 
f1c34378f44b: Layer already exists 
3e989afdb948: Layer already exists 
5d8e59e8fa3d: Layer already exists 
d0fac854ebed: Layer already exists 
4e4c852921cc: Layer already exists 
6db4e45cf563: Layer already exists 
b9c6b5375a6e: Layer already exists 
ec7a5c783ba6: Layer already exists 
305d55183e3e: Layer already exists 
e4da5278aad5: Layer already exists 
88fb11447873: Layer already exists 
c3c9a296a12d: Layer already exists 
69ff1caa4c1a: Layer already exists 
e9804e687894: Layer already exists 
e8482936e318: Layer already exists 
059ad60bcacf: Layer already exists 
8db5f072feec: Layer already exists 
67885e448177: Layer already exists 
ec75999a0cb1: Layer already exists 
65bdd50ee76a: Layer already exists 
latest: digest: sha256:a4d76d8e3708b76ff999f06d312714c43ec7228d39bc723caaab7f84b90541ba size: 5968

I’ve made the image public so that anyone can use it. All you need to do is reference it as lpatruno/k8-model.
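
For example, you can pull the image directly from Docker Hub:

$ docker pull lpatruno/k8-model:latest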

Kubernetes Jobs for Machine Learning Model Training

Training a Machine Learning Model in Python

Now that we have built and published our Docker image, let’s take a look at the Python code in train.py that trains a machine learning model.

import json
import os

import boto3
from joblib import dump
import numpy as np
from sklearn import ensemble
from sklearn import datasets
from sklearn.utils import shuffle
from sklearn.metrics import mean_squared_error


MODEL_DIR = os.environ["MODEL_DIR"]
MODEL_FILE = os.environ["MODEL_FILE"]
METADATA_FILE = os.environ["METADATA_FILE"]
BUCKET_NAME = os.environ["BUCKET_NAME"]
MODEL_PATH = os.path.join(MODEL_DIR, MODEL_FILE)
METADATA_PATH = os.path.join(MODEL_DIR, METADATA_FILE)


# #############################################################################
# Load data
print("Loading data...")
boston = datasets.load_boston()

print("Splitting data...")
X, y = shuffle(boston.data, boston.target, random_state=13)
X = X.astype(np.float32)
offset = int(X.shape[0] * 0.9)
X_train, y_train = X[:offset], y[:offset]
X_test, y_test = X[offset:], y[offset:]

# #############################################################################
# Fit regression model
print("Fitting model...")
params = {'n_estimators': 500, 'max_depth': 4, 'min_samples_split': 2,
          'learning_rate': 0.01, 'loss': 'ls'}
clf = ensemble.GradientBoostingRegressor(**params)

clf.fit(X_train, y_train)
train_mse = mean_squared_error(y_train, clf.predict(X_train))
test_mse = mean_squared_error(y_test, clf.predict(X_test))
metadata = {
    "train_mean_square_error": train_mse,
    "test_mean_square_error": test_mse
}

print("Serializing model to: {}".format(MODEL_PATH))
dump(clf, MODEL_PATH)

print("Serializing metadata to: {}".format(METADATA_PATH))
with open(METADATA_PATH, 'w') as outfile:  
    json.dump(metadata, outfile)

print("Moving to S3")
s3 = boto3.client('s3')
s3.upload_file(MODEL_PATH, BUCKET_NAME, MODEL_FILE)

This code starts by importing the necessary libraries and reading in the environment variables that were set in our Docker image. Next we load the Boston housing dataset from scikit-learn and split the data into training and testing sets. We then fit a gradient boosted tree regressor on the data and calculate the training and testing mean squared errors. Finally we serialize both the model and the metrics to separate files, and then upload the file containing the serialized model to S3. In order for the boto3 library to upload the file to S3, we need to ensure that the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables are set. I’ll cover how to do that in the next section when we (finally) discuss how to create a Kubernetes Job.
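
Before creating the Job, one way to sanity-check the training script is to run the container locally and pass the AWS credentials as environment variables. This is just a local smoke test, with placeholders for your own keys (you would also need write access to the configured S3 bucket):

$ docker run --rm \
    -e AWS_ACCESS_KEY_ID=<your-access-key-id> \
    -e AWS_SECRET_ACCESS_KEY=<your-secret-access-key> \
    lpatruno/k8-model:latest python3 train.py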

Creating a Kubernetes Job

To create a Kubernetes Job, we need to create a yaml file containing the configuration data. Let’s walk through train.yaml, the config file for our model training Job:

apiVersion: batch/v1
kind: Job
metadata:
  name: train-job
spec:
  template:
    spec:
      containers:
      - name: train-container
        imagePullPolicy: Always
        image: lpatruno/k8-model:latest
        command: ["python3",  "train.py"]
        env:
        - name: AWS_ACCESS_KEY_ID
          value: ""
        - name: AWS_SECRET_ACCESS_KEY
          value: ""
      restartPolicy: Never
  backoffLimit: 0

This yaml file contains four top-level keys. The apiVersion specifies which version of the Kubernetes API to use. The kind field specifies which type of Kubernetes resource we wish to create; in this case, we are creating a Job object. The metadata field holds data that identifies the object, such as its name and an optional set of labels, arbitrary key/value attributes developers can attach to Kubernetes objects. The docs contain a recommended set of labels, but I would recommend appending your own machine learning specific metadata as well. The spec field specifies the characteristics you want the object to have. Every Kubernetes object must contain a spec field, but its format differs from object to object (see the Kubernetes API Reference).
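
For example, you could extend the metadata section with labels that describe the model being trained; the keys and values below are only illustrative:

metadata:
  name: train-job
  labels:
    app.kubernetes.io/name: k8-model
    app.kubernetes.io/component: training
    model-type: gradient-boosted-regressor   # illustrative ML-specific label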

The .spec.template field is a Pod template that specifies the Pods our Job should create. It has the same schema as a Pod, except that it is nested and does not have an apiVersion or kind.

It’s worth going into some additional detail about the .spec.template.spec.containers section. This section lists the containers we wish the Pod to run. We give each container a name and specify both the image we wish to use (which points to the image we pushed to Docker Hub earlier) and the command we wish to run in the container (here, model training). The imagePullPolicy tells Kubernetes to always fetch the image from the registry rather than use a cached copy, and the env field lists the environment variables we wish to set in the Pod. I have omitted the values of the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables, but you’ll need to set these if you wish to create the Job. The restartPolicy value determines whether failed containers should be restarted, and the backoffLimit specifies how many times to retry the Job before marking it as failed.
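
Rather than pasting credentials into the yaml file, one option (not used in this post) is to store them in a Kubernetes Secret and reference them with secretKeyRef. Here is a minimal sketch of the env section, assuming a Secret named aws-credentials with those two keys already exists:

env:
- name: AWS_ACCESS_KEY_ID
  valueFrom:
    secretKeyRef:
      name: aws-credentials        # hypothetical Secret name
      key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
  valueFrom:
    secretKeyRef:
      name: aws-credentials
      key: AWS_SECRET_ACCESS_KEY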

To create the Job, simply run

$ kubectl create -f train.yaml
job.batch/train-job created

We can view the Job objects by running

$ kubectl get jobs
NAME        COMPLETIONS   DURATION   AGE
train-job   1/1           9s         32s

In order to view the logs of the process, we first need to find the name of the Pod that the train-job Job launched. We can do this by running

$ kubectl get pods --selector=job-name=train-job
NAME              READY   STATUS      RESTARTS   AGE
train-job-s7bc4   0/1     Completed   0          114s

To view the logs of the Pod, simply run

$ kubectl logs train-job-s7bc4
Loading data...
Splitting data...
Fitting model...
Serializing model to: /home/jovyan/model/clf.joblib
Serializing metadata to: /home/jovyan/model/metadata.json
Moving to S3

Our model training Job succeeded and we now have a trained model object on S3!

You can delete a Job with the kubectl delete command. This will also delete any Pods created by the Job.

$ kubectl delete -f train.yaml
job.batch "train-job" deleted

Kubernetes Jobs for Machine Learning Batch Inference

Now that we’ve successfully created a Kubernetes Job to train a machine learning model and serialize the model object to S3, let’s cover batch inference. We’ll use the same Docker image for inference as we used for training, since we copied the inference.py script into the image during the build process.

Batch Inference in Python

Let’s briefly describe the inference.py Python script for performing batch inference.

import os

import boto3
from botocore.exceptions import ClientError
from joblib import load
import numpy as np
from sklearn import datasets
from sklearn.utils import shuffle


MODEL_DIR = os.environ["MODEL_DIR"]
MODEL_FILE = os.environ["MODEL_FILE"]
METADATA_FILE = os.environ["METADATA_FILE"]
BUCKET_NAME = os.environ["BUCKET_NAME"]
MODEL_PATH = os.path.join(MODEL_DIR, MODEL_FILE)
METADATA_PATH = os.path.join(MODEL_DIR, METADATA_FILE)

def load_model():
    """Download the serialized model from S3 and deserialize it."""
    s3 = boto3.resource('s3')
    try:
        s3.Bucket(BUCKET_NAME).download_file(MODEL_FILE, MODEL_PATH)
    except ClientError as e:
        if e.response['Error']['Code'] == "404":
            print("The model object does not exist in S3.")
        raise
    return load(MODEL_PATH)

def get_data():
    """
    Return data for inference.
    """
    print("Loading data...")
    boston = datasets.load_boston()
    X, y = shuffle(boston.data, boston.target, random_state=13)
    X = X.astype(np.float32)
    offset = int(X.shape[0] * 0.9)
    X_train, y_train = X[:offset], y[:offset]
    X_test, y_test = X[offset:], y[offset:]
    return X_test, y_test

print("Running inference...")
X, y = get_data()

# #############################################################################
# Load model
print("Loading model from: {}".format(MODEL_PATH))
clf = load_model()

# #############################################################################
# Run inference
print("Scoring observations...")
y_pred = clf.predict(X)
print(y_pred)

Again, this script starts by importing the necessary libraries and reading in the environment variables that were set in our Docker image. Next I define two functions. The load_model function downloads the serialized model file from S3; as was the case during training, the boto3 calls will raise authentication errors unless the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables are set. The get_data function loads data for inference. In this case, I’ve loaded the same data used during model training and validation. In a production setting, you would replace this function with some mechanism for retrieving new data that needs to be scored. Next I call these functions, generate predictions, and print them to the screen. Again, in a production setting these scores would be persisted to some storage layer.
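
As a sketch of what that persistence step might look like, you could dump the predictions to a JSON file and upload it to the same S3 bucket. The helper below and its key name are assumptions, not part of the original script; it reuses the MODEL_DIR and BUCKET_NAME variables defined at the top of inference.py:

import json

def persist_predictions(y_pred, bucket=BUCKET_NAME, key="predictions.json"):
    """Write predictions to a local JSON file and upload it to S3 (illustrative only)."""
    local_path = os.path.join(MODEL_DIR, key)
    with open(local_path, "w") as outfile:
        json.dump({"predictions": y_pred.tolist()}, outfile)
    boto3.client("s3").upload_file(local_path, bucket, key)

# e.g. call persist_predictions(y_pred) after the call to clf.predict(X)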

A Kubernetes Job for Batch Inference

Most of the configuration for the Kubernetes inference Job is the same as that of the training Job. Here is the new configuration file inference.yaml:

apiVersion: batch/v1
kind: Job
metadata:
  name: inference-job
spec:
  template:
    spec:
      containers:
      - name: inference-container
        imagePullPolicy: Always
        image: lpatruno/k8-model:latest
        command: ["python3",  "inference.py"]
        env:
        - name: AWS_ACCESS_KEY_ID
          value: ""
        - name: AWS_SECRET_ACCESS_KEY
          value: ""
      restartPolicy: Never
  backoffLimit: 0

The only fields that have changed are the Job name (.metadata.name), the container name, and the command to run within that container. The command field now calls the Python script that performs inference.

To create the Job, simply run

$ kubectl create -f inference.yaml
job.batch/inference-job created

We can view the Job objects by running

$ kubectl get jobs
NAME            COMPLETIONS   DURATION   AGE
inference-job   0/1           19s        19s

In order to view the logs of the process, we first need to find the name of the Pod that the inference-job Job launched. We can do this by running

$ kubectl get pods --selector=job-name=inference-job
NAME                  READY   STATUS      RESTARTS   AGE
inference-job-mv82s   0/1     Completed   0          36s

To view the logs of the Pod, simply run

$ kubectl logs inference-job-mv82s
Running inference...
Loading data...
Loading model from: /home/jovyan/model/clf.joblib
Scoring observations...
[ 15.32448686  27.68741572  24.17609927  31.94786177  10.40786467
  34.38871141  22.05210667  11.58265489  13.21049075  42.87157933
  33.03218733  15.77635169  23.93521876  19.79260258  25.43466604
  20.55132127  13.67733317  47.48979635  17.70069362  21.51806638
  22.57388848  16.97645106  16.25503893  20.57862843  14.57438158
  11.81385445  24.78353556  37.64333157  30.29062179  19.67713185
  23.19310437  25.06569372  18.65459129  30.26701253   8.97905481
  13.8130382   14.21123728  17.3840622   19.83840166  23.83861108
  20.44820805  15.32433651  25.8157052   16.47533793  19.2214524
  19.86928427  21.47113681  21.56443118  24.64517965  22.43665872
  22.25160648]

Our inference Job successfully retrieved the model object from S3 and performed inference.

You can delete a Job with the kubectl delete command. This will also delete any Pods created by the Job.

$ kubectl delete -f inference.yaml
job.batch "inference-job" deleted

Conclusion

Congratulations on making it all the way to the end of the post! We’ve covered all of the pieces necessary to train machine learning models and generate predictions on Kubernetes. By using Kubernetes Jobs to perform model training and batch inference, we ensure that our processes are fault tolerant and resilient: if the processes or underlying nodes fail, Kubernetes will reschedule the Jobs for us.

In our next post, we will discuss how to use Kubernetes CronJobs to schedule our training and inference processes to run on a recurring schedule. If you’d like to be notified when that post is published, sign up below and I’ll send you an email as soon as it’s ready!
