In my previous post I introduced Kubernetes Pods, the basic building block of the Kubernetes ecosystem. In that post I discussed what a Pod is, how it fits into the Kubernetes system, and how to create, view, and delete a Pod. While it’s important to understand what a Pod is, it is not considered good practice to deploy Pods directly to perform tasks. This is because Naked Pods (Pods not bound to other objects) won’t be rescheduled in the event of a node failure. Instead, you should deploy higher-level abstractions that incorporate Pods.
In this post we’ll discuss Kubernetes Jobs, a higher-level abstraction for running batch processes. We’ll create Kubernetes Jobs to train a machine learning model and to perform batch inference.
What is a Job?
A Job is a Kubernetes controller that creates one or more Pods and ensures that a specified number of them complete their workloads successfully. When a Job is created, it creates and tracks the Pods specified in its configuration file. When the specified number of Pods complete successfully, the Job itself is complete. If a Pod fails, for example because its underlying node fails or reboots, the Job creates a new Pod to take its place. This means your application needs to handle being restarted in a new Pod.
There are several patterns for running Jobs. The simplest is to create one Job that reliably runs a single Pod to completion; the Job only creates additional Pods if that Pod fails. But Jobs also support parallel processing. For instance, you can specify a fixed number of completions, and the Job will ensure that many Pods terminate successfully. You can also specify a level of parallelism so that multiple Pods run at the same time, or incorporate a work queue to set up a parallel processing system. In that case, a Job launches multiple Pods, each of which pops items off the queue and performs some task.
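For example, here is a minimal sketch of a Job that asks for five successful completions with at most two Pods running in parallel; the name, image, and command are placeholders for illustration only:

apiVersion: batch/v1
kind: Job
metadata:
  name: parallel-job            # hypothetical name
spec:
  completions: 5                # the Job is done once 5 Pods succeed
  parallelism: 2                # run at most 2 Pods at a time
  template:
    spec:
      containers:
      - name: worker
        image: busybox          # placeholder image
        command: ["sh", "-c", "echo processing one unit of work"]
      restartPolicy: Never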
By default, when a Job completes, both the Job and the Pods it created remain in the system. This lets users view the logs of the completed Pods and check diagnostic information. It is up to the user to delete Jobs; when a Job is deleted, all of the Pods it created are deleted as well.
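If you'd rather not delete finished Jobs by hand, you can also set a TTL on the Job spec so that Kubernetes cleans up completed Jobs automatically (assuming your cluster version supports the TTL-after-finished feature); for example:

spec:
  ttlSecondsAfterFinished: 300  # delete the Job and its Pods 5 minutes after it finishes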
Why Kubernetes Jobs are Ideal for Machine Learning Tasks
Jobs are perfect for running batch processes – processes that run for a certain amount of time before exiting. This makes Jobs ideal for many types of production machine learning tasks such as feature engineering, cross-validation, model training, and batch inference. For instance, you can create a Kubernetes Job that trains a machine learning model and persists the model and training metadata to external storage. Then you can create another Job to perform batch inference. The Job would create a Pod that fetches the pre-trained model from storage, loads both the model and data into memory, performs inference, and stores the predictions.
Using Kubernetes Jobs for Machine Learning Tasks
Let’s examine how to create, view, update, and delete Jobs.
We’ll create two Kubernetes Jobs. One Job will train a machine learning model and serialize the model. The other Job will load the trained model into memory and perform batch inference. Since the Jobs will create Pods (and hence containers) that are independent from one another, we will need an external storage layer to persist the model object. We’ll use Amazon S3 as our storage layer to persist the model. Therefore, our application logic will look like the following:
- Training Job
  - Load some training data into memory.
  - Train a model.
  - Persist the trained model to S3.
- Inference Job
  - Load the trained model from S3.
  - Load inference data into memory.
  - Perform inference.
Let’s dive right in!
Note: This section assumes you have access to a Kubernetes cluster and have the kubectl command line client installed.
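Before going further, it's worth confirming that kubectl can actually reach your cluster; a quick check looks like this:

$ kubectl cluster-info
$ kubectl get nodes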
Docker Image for Kubernetes Machine Learning Jobs
Our Kubernetes Jobs will create Pods that run containers for performing model training and model inference. Since we are writing custom application logic, we need to build our own Docker image. Our image will need some Python dependencies installed, as well as the Python logic for model training and inference. Here is a Dockerfile for building that image:
FROM jupyter/scipy-notebook
RUN pip install awscli joblib boto3
RUN mkdir model
ENV MODEL_DIR=/home/jovyan/model
ENV MODEL_FILE=clf.joblib
ENV METADATA_FILE=metadata.json
ENV BUCKET_NAME=mlinproduction
COPY train.py ./train.py
COPY inference.py ./inference.py
We start with jupyter/scipy-notebook as our base image. We then install some additional Python libraries for serializing our model object and persisting it to S3. Next we set several environment variables, and finally we copy two Python scripts into the image. We’ll discuss the train.py and inference.py scripts in the following sections. If you need a refresher on Docker syntax, check out my Docker for Machine Learning blog series!
In order to use this image in our Kubernetes cluster, we first need to build the image and push it to a registry. To build the image, we use the docker build command:
$ docker build -t k8-model -f Dockerfile .
Sending build context to Docker daemon 11.78kB
Step 1/9 : FROM jupyter/scipy-notebook
---> 2fb85d5904cc
Step 2/9 : RUN pip install awscli joblib boto3
---> Using cache
---> 1c2dade95b41
Step 3/9 : RUN mkdir model
---> Using cache
---> 2fc99e33c252
Step 4/9 : ENV MODEL_DIR=/home/jovyan/model
---> Using cache
---> 7a0b58174a90
Step 5/9 : ENV MODEL_FILE=clf.joblib
---> Using cache
---> 59a14aa21638
Step 6/9 : ENV METADATA_FILE=metadata.json
---> Using cache
---> 1589a321f94f
Step 7/9 : ENV BUCKET_NAME=mlinproduction
---> Using cache
---> ef9d4d3b0713
Step 8/9 : COPY train.py ./train.py
---> Using cache
---> 9f1b80232a9b
Step 9/9 : COPY inference.py ./inference.py
---> 2d1fb06cd6ad
Successfully built 2d1fb06cd6ad
Successfully tagged k8-model:latest
Next we tag the image with the name of an image repository on the Docker Hub registry, and then push the image to that registry.
$ docker tag k8-model:latest lpatruno/k8-model:latest
$ docker push lpatruno/k8-model:latest
The push refers to repository [docker.io/lpatruno/k8-model]
0409ca0952f1: Pushed
6060f4226f64: Layer already exists
c1dd84c720fe: Layer already exists
1b87593f54f7: Layer already exists
03de148dfb0a: Layer already exists
b0f3e4f91d7b: Layer already exists
d678676e139c: Layer already exists
f1c34378f44b: Layer already exists
3e989afdb948: Layer already exists
5d8e59e8fa3d: Layer already exists
d0fac854ebed: Layer already exists
4e4c852921cc: Layer already exists
6db4e45cf563: Layer already exists
b9c6b5375a6e: Layer already exists
ec7a5c783ba6: Layer already exists
305d55183e3e: Layer already exists
e4da5278aad5: Layer already exists
88fb11447873: Layer already exists
c3c9a296a12d: Layer already exists
69ff1caa4c1a: Layer already exists
e9804e687894: Layer already exists
e8482936e318: Layer already exists
059ad60bcacf: Layer already exists
8db5f072feec: Layer already exists
67885e448177: Layer already exists
ec75999a0cb1: Layer already exists
65bdd50ee76a: Layer already exists
latest: digest: sha256:a4d76d8e3708b76ff999f06d312714c43ec7228d39bc723caaab7f84b90541ba size: 5968
I’ve made the image public so that anyone can use it. All you need to do is reference it as lpatruno/k8-model.
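Since the image is public on Docker Hub, you can also pull it directly instead of building it yourself:

$ docker pull lpatruno/k8-model:latest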
Kubernetes Jobs for Machine Learning Model Training
Training a Machine Learning Model in Python
Now that we have built and published our Docker image, let’s take a look at the Python code in train.py that trains a machine learning model.
import json
import os
import boto3
from joblib import dump
import matplotlib.pyplot as plt
import numpy as np
from sklearn import ensemble
from sklearn import datasets
from sklearn.utils import shuffle
from sklearn.metrics import mean_squared_error
MODEL_DIR = os.environ["MODEL_DIR"]
MODEL_FILE = os.environ["MODEL_FILE"]
METADATA_FILE = os.environ["METADATA_FILE"]
BUCKET_NAME = os.environ["BUCKET_NAME"]
MODEL_PATH = os.path.join(MODEL_DIR, MODEL_FILE)
METADATA_PATH = os.path.join(MODEL_DIR, METADATA_FILE)
# #############################################################################
# Load data
print("Loading data...")
boston = datasets.load_boston()
print("Splitting data...")
X, y = shuffle(boston.data, boston.target, random_state=13)
X = X.astype(np.float32)
offset = int(X.shape[0] * 0.9)
X_train, y_train = X[:offset], y[:offset]
X_test, y_test = X[offset:], y[offset:]
# #############################################################################
# Fit regression model
print("Fitting model...")
params = {'n_estimators': 500, 'max_depth': 4, 'min_samples_split': 2,
          'learning_rate': 0.01, 'loss': 'ls'}
clf = ensemble.GradientBoostingRegressor(**params)
clf.fit(X_train, y_train)
train_mse = mean_squared_error(y_train, clf.predict(X_train))
test_mse = mean_squared_error(y_test, clf.predict(X_test))
metadata = {
    "train_mean_square_error": train_mse,
    "test_mean_square_error": test_mse
}
print("Serializing model to: {}".format(MODEL_PATH))
dump(clf, MODEL_PATH)
print("Serializing metadata to: {}".format(METADATA_PATH))
with open(METADATA_PATH, 'w') as outfile:
    json.dump(metadata, outfile)
print("Moving to S3")
s3 = boto3.client('s3')
s3.upload_file(MODEL_PATH, BUCKET_NAME, MODEL_FILE)
This code starts by importing the necessary libraries and reading in the environment variables that were set in our Docker image. Next we load the Boston housing dataset from scikit-learn and split the data into training and testing sets. We then fit a gradient boosted tree on the data and calculate the training and testing mean squared errors. Finally we serialize both the model and the metrics to separate files, and then upload the file containing the serialized model to S3. In order for the boto3 library to upload the file to S3, we need to ensure that the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables are set. I’ll cover how to do that in the next section when we (finally) discuss how to create a Kubernetes Job.
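If you’d like to sanity-check train.py locally before running it on Kubernetes, you can export the same environment variables the Dockerfile sets and run the script directly. This is just a sketch; substitute your own AWS credentials and bucket name.

$ mkdir -p model
$ export MODEL_DIR=./model MODEL_FILE=clf.joblib METADATA_FILE=metadata.json BUCKET_NAME=mlinproduction
$ export AWS_ACCESS_KEY_ID=<your-access-key> AWS_SECRET_ACCESS_KEY=<your-secret-key>
$ python3 train.py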
Creating a Kubernetes Job
To create a Kubernetes Job, we need to create a yaml file containing the configuration data. Let’s walk through train.yaml, the config file for our model training Job:
apiVersion: batch/v1
kind: Job
metadata:
  name: train-job
spec:
  template:
    spec:
      containers:
      - name: train-container
        imagePullPolicy: Always
        image: lpatruno/k8-model:latest
        command: ["python3", "train.py"]
        env:
        - name: AWS_ACCESS_KEY_ID
          value: ""
        - name: AWS_SECRET_ACCESS_KEY
          value: ""
      restartPolicy: Never
  backoffLimit: 0
This yaml file contains four top-level keys. The apiVersion specifies which version of the Kubernetes API to use. The kind field specifies which type of Kubernetes resource we wish to create; in this case, we are creating a Job object. The metadata field holds identifying data such as the object’s name, and can also carry labels, arbitrary key/value attributes developers attach to Kubernetes objects. The docs contain a recommended set of labels, but I would recommend appending your own machine learning specific metadata as well. The spec field specifies the characteristics you want the object to have. Every Kubernetes object must contain a spec field, but its format differs from object to object (see the Kubernetes API Reference).
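For example, a metadata section that mixes a couple of the recommended labels with ML-specific ones might look like the following; the label values here are just illustrations:

metadata:
  name: train-job
  labels:
    app.kubernetes.io/name: k8-model
    app.kubernetes.io/component: training
    model: gradient-boosted-regressor   # ML-specific metadata you might add
    dataset: boston-housing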
The .spec.template field is a Pod template that specifies the Pods our Job should create. It has the same schema as a Pod, except that it is nested and doesn’t have an apiVersion or kind.
It’s worth going into some additional detail about the .spec.template.spec.containers section. This section lists the containers we wish the Pod to run. We give each container a name and specify both the image we wish to use (which points to the image we pushed to Docker Hub earlier) and the command we wish to run in the container (here, model training). The imagePullPolicy tells Kubernetes to always fetch the image from the registry rather than use a cached image, and the env field lists the environment variables we wish to set in the Pod. I have omitted the values of the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables, but you’ll need to set these if you wish to create the Job. The restartPolicy value determines whether failed containers should be restarted, and backoffLimit sets the number of retries before the Job is considered failed.
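As an aside, rather than pasting credentials directly into the yaml file, one option is to store them in a Kubernetes Secret (for example via kubectl create secret generic aws-credentials --from-literal=AWS_ACCESS_KEY_ID=... --from-literal=AWS_SECRET_ACCESS_KEY=...) and reference that Secret from the env section. A sketch, assuming such a Secret named aws-credentials exists:

env:
- name: AWS_ACCESS_KEY_ID
  valueFrom:
    secretKeyRef:
      name: aws-credentials
      key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
  valueFrom:
    secretKeyRef:
      name: aws-credentials
      key: AWS_SECRET_ACCESS_KEY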
To create the Job, simply run
$ kubectl create -f train.yaml
job.batch/train-job created
We can view the Job objects by running
$ kubectl get jobs
NAME COMPLETIONS DURATION AGE
train-job 1/1 9s 32s
In order to view the logs of the process, we first need to find the name of the Pod that the train-job Job launched. We can do this by running
$ kubectl get pods --selector=job-name=train-job
NAME READY STATUS RESTARTS AGE
train-job-s7bc4 0/1 Completed 0 114s
To view the logs of the Pod, simply run
$ kubectl logs train-job-s7bc4
Loading data...
Splitting data...
Fitting model...
Serializing model to: /home/jovyan/model/clf.joblib
Serializing metadata to: /home/jovyan/model/metadata.json
Moving to S3
Our model training Job succeeded and we now have a trained model object on S3!
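If you’d like to verify the upload, you can list the bucket with the AWS CLI (assuming your local credentials have access to the mlinproduction bucket used in the Dockerfile):

$ aws s3 ls s3://mlinproduction/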
You can delete a Job with the kubectl delete command. This will also delete any Pods created by the Job.
kubectl delete -f train.yaml
job.batch "train-job" deleted
Kubernetes Jobs for Machine Learning Batch Inference
Now that we’ve successfully created a Kubernetes Job to train a machine learning model and serialize the model object to S3, let’s cover batch inference. We’ll be using the same Docker image for inference as we used for training, since we copied the inference.py script into the image during the build process.
Batch Inference in Python
Let’s briefly describe the inference.py Python script for performing batch inference.
import os
import boto3
from botocore.exceptions import ClientError
from joblib import load
import numpy as np
from sklearn import datasets
from sklearn.utils import shuffle
MODEL_DIR = os.environ["MODEL_DIR"]
MODEL_FILE = os.environ["MODEL_FILE"]
METADATA_FILE = os.environ["METADATA_FILE"]
BUCKET_NAME = os.environ["BUCKET_NAME"]
MODEL_PATH = os.path.join(MODEL_DIR, MODEL_FILE)
METADATA_PATH = os.path.join(MODEL_DIR, METADATA_FILE)
def load_model():
    # Download the serialized model from S3 and deserialize it.
    s3 = boto3.resource('s3')
    try:
        s3.Bucket(BUCKET_NAME).download_file(MODEL_FILE, MODEL_PATH)
    except ClientError as e:
        if e.response['Error']['Code'] == "404":
            print("The object does not exist.")
        else:
            raise
    return load(MODEL_PATH)
def get_data():
    """
    Return data for inference.
    """
    print("Loading data...")
    boston = datasets.load_boston()
    X, y = shuffle(boston.data, boston.target, random_state=13)
    X = X.astype(np.float32)
    offset = int(X.shape[0] * 0.9)
    X_train, y_train = X[:offset], y[:offset]
    X_test, y_test = X[offset:], y[offset:]
    return X_test, y_test
print("Running inference...")
X, y = get_data()
# #############################################################################
# Load model
print("Loading model from: {}".format(MODEL_PATH))
clf = load_model()
# #############################################################################
# Run inference
print("Scoring observations...")
y_pred = clf.predict(X)
print(y_pred)
Again, this script starts by importing the necessary libraries and reading in the environment variables that were set in our Docker image. Next I define two functions. The load_model function downloads the serialized model file from S3. As was the case during training, the boto3 methods will raise authentication errors unless the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables are set. The get_data function loads data for inference. In this case, I’ve loaded the same data used during the model training and validation processes. In a production setting, you would replace this method with some mechanism for retrieving new data that needs to be scored. Next I call these methods, generate predictions, and print them to the screen. Again, in a production setting these scores would be persisted to some storage layer.
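To make that last point concrete, here is one way the end of inference.py could persist the predictions to S3 instead of printing them. This is just a sketch: the predictions.csv file name and S3 key are assumptions, and it reuses the os, boto3, MODEL_DIR, BUCKET_NAME, and y_pred names already defined in the script.

import csv

# Hypothetical file/key names for this sketch.
PREDICTIONS_FILE = "predictions.csv"
PREDICTIONS_PATH = os.path.join(MODEL_DIR, PREDICTIONS_FILE)

# Write one prediction per row to a local CSV file.
with open(PREDICTIONS_PATH, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerows([[pred] for pred in y_pred])

# Upload the predictions to the same S3 bucket used for the model.
s3 = boto3.client("s3")
s3.upload_file(PREDICTIONS_PATH, BUCKET_NAME, PREDICTIONS_FILE)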
A Kubernetes Job for Batch Inference
Most of the configuration for the Kubernetes inference Job is the same as that of the training Job. Here is the new configuration file inference.yaml:
apiVersion: batch/v1
kind: Job
metadata:
  name: inference-job
spec:
  template:
    spec:
      containers:
      - name: inference-container
        imagePullPolicy: Always
        image: lpatruno/k8-model:latest
        command: ["python3", "inference.py"]
        env:
        - name: AWS_ACCESS_KEY_ID
          value: ""
        - name: AWS_SECRET_ACCESS_KEY
          value: ""
      restartPolicy: Never
  backoffLimit: 0
The only fields that have changed are .metadata.name, the name of the container, and the command to run within that container. The first two are simply naming changes; the container's command field now calls the Python script that performs inference.
To create the Job, simply run
$ kubectl create -f inference.yaml
job.batch/inference-job created
We can view the Job objects by running
$ kubectl get jobs
NAME COMPLETIONS DURATION AGE
inference-job 0/1 19s 19s
In order to view the logs of the process, we first need to find the name of the Pod that the inference-job Job launched. We can do this by running
$ kubectl get pods --selector=job-name=inference-job
NAME READY STATUS RESTARTS AGE
inference-job-mv82s 0/1 Completed 0 36s
To view the logs of the Pod, simply run
$ kubectl logs inference-job-mv82s
Running inference...
Loading data...
Loading model from: /home/jovyan/model/clf.joblib
Scoring observations...
[ 15.32448686 27.68741572 24.17609927 31.94786177 10.40786467
34.38871141 22.05210667 11.58265489 13.21049075 42.87157933
33.03218733 15.77635169 23.93521876 19.79260258 25.43466604
20.55132127 13.67733317 47.48979635 17.70069362 21.51806638
22.57388848 16.97645106 16.25503893 20.57862843 14.57438158
11.81385445 24.78353556 37.64333157 30.29062179 19.67713185
23.19310437 25.06569372 18.65459129 30.26701253 8.97905481
13.8130382 14.21123728 17.3840622 19.83840166 23.83861108
20.44820805 15.32433651 25.8157052 16.47533793 19.2214524
19.86928427 21.47113681 21.56443118 24.64517965 22.43665872
22.25160648]
Our inference Job successfully retrieved the model object from S3 and performed inference.
You can delete a Job with the kubectl delete command. This will also delete any Pods created by the Job.
kubectl delete -f inference.yaml
job.batch "inference-job" deleted
Conclusion
Congratulations on making it all the way to the end of the post! We’ve covered all of the pieces necessary to train machine learning models and generate predictions on Kubernetes. By using Kubernetes Jobs to perform model training and batch inference, we ensure that our processes are fault tolerant and resilient: if the processes or underlying nodes fail, Kubernetes will reschedule the Jobs for us.
In our next post, we will discuss how to use Kubernetes CronJobs to schedule our training and inference processes to run on a recurring schedule. If you’d like to be notified when that post is published, sign up below and I’ll send you an email as soon as it’s ready!