
building an end-to-end distributed machine learning system on kubernetes


a distributed ml system enables training and serving large language models, and building such a system will give you an edge in this era of deep learning.

every tech company that uses ml wants to serve its customers at scale. since datasets and models are getting huge, the way to do this is to train models in parallel by dividing the workload across different machines. keeping expensive GPUs idle is not at all recommended. utilizing multi-GPU training and optimizing inference saves costs and lets us serve customers at scale.

recently i was involved in a project where we developed a distributed, scalable ml service that serves hundreds of thousands of customers. tbh we faced a lot of challenges, and building a distributed system is tough. i started working on this project to get a deeper understanding of distributed ml systems.

what are distributed systems and why do we use them?

a distributed system is a group of nodes that talk to each other to achieve a specific task, such as streaming movies across devices, powering search engines, etc.

how are these complex models with millions or billions of parameters trained and served? the answer is distributed systems. they allow developers to handle massive datasets across multiple clusters, use automation tools, and benefit from hardware acceleration.

about the repository

this repository includes code and references to implement a scalable and reliable machine learning system. im automating machine learning tasks with kubernetes, argo workflows, kubeflow, and tensorflow. my aim is to construct machine learning pipelines that handle data ingestion, distributed training, model serving, and the management and monitoring of these workloads.

in this project im building an image classification end-to-end machine learning system.

steps involved are:

  1. setup
  2. data ingestion
  3. distributed training
  4. evaluation
  5. serving
  6. end-to-end workflow

setup

im using a macbook. in the real world people use cloud services like eks/gke. im installing tensorflow, docker desktop, kubectl, and k3d. im using conda as a package manager.

[1] im using tensorflow for data processing, model building and evaluation.

conda install tensorflow

[2] im using docker to create single- or multi-node clusters.

[3] im installing a cli utility called kubectl

brew install kubectl

[4] to use kubernetes on my machine, im installing k3d. its a lightweight wrapper to run k3s in docker. theres also minikube, which can use hyperkit instead of docker and is the usual recommendation, but i find k3d lean, memory efficient, and simpler.

wget -q -O - https://raw.githubusercontent.com/rancher/k3d/main/install.sh | bash

lets create a cluster. im not gonna create a multiserver cluster but you can.

k3d cluster create dist-ml --image rancher/k3s:v1.27.12-k3s1


see the cluster info

kubectl cluster-info

see the nodes info

kubectl get nodes


[5] im also installing kubectx to easily switch between clusters and kubens for namespaces. this is a very handy utility.

brew install kubectx

[6] im installing kubeflow training operator.

to install the training operator, create a namespace. namespaces provide a mechanism for isolating groups of resources within a single cluster like a logical separation.

create a namespace called kubeflow.

kubectl create namespace kubeflow

switch the current namespace from default to kubeflow.

kubens kubeflow

Note

i was getting an error: couldn't get resource list for metrics.k8s.io/v1beta1: the server is currently unable to handle the request. after googling a bit and updating the metrics-server deployment yaml to add hostNetwork: true after dnsPolicy, it started working again.
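for reference, this is roughly what that change looks like (a sketch; only the added field is shown, everything else in the deployment stays as-is, and on k3s the deployment lives in the kube-system namespace):

kubectl edit deployment metrics-server -n kube-system

# under spec.template.spec, right after the existing dnsPolicy field
hostNetwork: true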


finally install kubeflow training operator using custom resources.

kubectl apply -k "github.com/kubeflow/training-operator/manifests/overlays/standalone?ref=v1.7.0"


[7] im using argo workflows to construct an end-to-end workflow.

k8s fundamentals

wtf is a pod?

this is the most atomic unit of the kubernetes ecosystem.

we can create a simple kubernetes pod using a yaml manifest file (you can name it whatever you like).

apiVersion: v1
kind: Pod
metadata:
  name: whalesay
spec:
  containers:
  - name: whalesay
    image: docker/whalesay:latest
    command: [cowsay]
    args: ["Hello world"]

next, submit the job to our cluster.

kubectl create -f hello-world.yaml

we can see the status of the pod.

kubectl get pods -o wide

we can see what is being printed out in the container.

kubectl logs whalesay

if you want to get the details of a single pod with the raw yaml(there are other formats as well like json), then enter the following command.

kubectl get pod whalesay -o yaml

system architecture

this system includes a model training pipeline and an inference service. there are different design patterns for architecting such a system, and you can choose the one that fits your needs.


data ingestion

im using the fashion_mnist dataset which contains 70,000 grayscale images in 10 categories. the images contain different types of clothes at a resolution of 28x28 px.


60,000 images are used for training and 10,000 images are in the test set.

create a pipeline that runs on one machine

the tf.data api enables you to build complex input pipelines from simple, reusable pieces. its very efficient and enables handling large amounts of data, reading from different data formats, and performing complex transformations.

im loading the fashion_mnist dataset into a tf.data.Dataset object and casting the images to the float32 dtype. im normalizing the image pixel values from the [0, 255] range to the [0, 1] range. these are standard practices. im keeping an in-memory cache to improve performance, and shuffling the training data to add some randomness.

import tensorflow_datasets as tfds
import tensorflow as tf

def mnist_dataset():
    BUFFER_SIZE = 10000
    def scale(image, label):
        image = tf.cast(image, tf.float32)
        image /= 255
        return image, label
    datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
    mnist_train = datasets['train']
    return mnist_train.map(scale).cache().shuffle(BUFFER_SIZE)

im using tensorflow_datasets module to load the dataset. the above piece of code gives a shuffled dataset where each element consists of images and labels.

create a distributed data pipeline

to consume the dataset in a distributed fashion, lets use the same function with some tweaks. to train a model on multiple GPUs and use the extra computing power effectively, increase the global batch size.

Tip

use the largest batch size that fits the GPU memory

theres a MirroredStrategy() that can be used to train on a single machine with multiple GPUs. but to distribute training across multiple machines in a cluster (recommended, and my goal here), MultiWorkerMirroredStrategy() is the way to go.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

num_replicas_in_sync equals the number of devices taking part in the all-reduce operation. with the tf.distribute.MultiWorkerMirroredStrategy api, a keras model that was designed to run on a single worker can seamlessly work on multiple workers with minimal code changes.

what actually happens under the hood?

  1. each GPU performs the forward pass on a different slice of the input data and computes the loss

  2. next, each GPU computes the gradients based on that loss

  3. these gradients are then aggregated across all of the devices (using an all-reduce algorithm)

  4. the optimizer updates the weights using the reduced gradients, thereby keeping the devices in sync

Note

pytorch has DDP and FSDP
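to make the synchronous data-parallel loop concrete, heres a minimal framework-agnostic numpy sketch of the four steps above. this is only an illustration of the idea, not what tensorflow runs internally, and the toy linear model and learning rate are made up for the example.

import numpy as np

num_workers, lr = 4, 0.1
rng = np.random.default_rng(0)

true_w = np.array([1.0, -2.0, 0.5])
x = rng.normal(size=(64, 3))                 # full global batch
y = x @ true_w

w = np.zeros(3)                              # replicated weights, identical on every worker
shards_x = np.array_split(x, num_workers)    # each worker owns a slice of the batch
shards_y = np.array_split(y, num_workers)

for step in range(200):
    # 1) + 2) every worker does a forward pass and computes a gradient on its own shard
    grads = [2 * xs.T @ (xs @ w - ys) / len(xs) for xs, ys in zip(shards_x, shards_y)]
    # 3) all-reduce: average the per-worker gradients
    g = np.mean(grads, axis=0)
    # 4) every replica applies the same update, so the weights stay in sync
    w -= lr * g

print(w)  # approaches [1.0, -2.0, 0.5]

every replica starts from the same weights and applies the same averaged gradient, which is exactly why the replicas never drift apart.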

im enabling automatic data sharding across workers by setting tf.data.experimental.AutoShardPolicy to AutoShardPolicy.DATA. this setting is needed to ensure convergence and performance. sharding means handing each worker a subset of the entire dataset.

now the final training workflow can be written as below

with strategy.scope():
    dataset = mnist_dataset().batch(BATCH_SIZE).repeat()
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
    dataset = dataset.with_options(options)
    model = build_and_compile_cnn_model()

model.fit(dataset, epochs=3, steps_per_epoch=70)

lets create the neural net

i now have a distributed data ingestion and training component. next, im creating the neural net.

heres a simple nn

def build_and_compile_cnn_model():
    print("Training a simple neural net")

    model = tf.keras.Sequential([
      tf.keras.layers.Flatten(input_shape=(28, 28)),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
    ])

    model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

    return model

im instantiating the Adam optimizer, using accuracy to evaluate the model, and using sparse categorical cross entropy as the loss function (remember we have 10 categories to predict).

lets define some necessary callbacks which will execute during model training.

  1. tf.keras.callbacks.ModelCheckpoint saves the model at a certain frequency, for example, after every epoch.

checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

im defining the checkpoint directory to store the checkpoints and the naming pattern for the checkpoint files. checkpoints are important for restoring the weights if model training stops due to some issue.

  2. tf.keras.callbacks.TensorBoard writes a log for TensorBoard, which allows you to visualize the training graphs.

  3. tf.keras.callbacks.LearningRateScheduler schedules the learning rate to change after, for example, every epoch/batch.

def decay(epoch):
    if epoch < 3:
        return 1e-3
    elif epoch >= 3 and epoch < 7:
        return 1e-4
    else:
        return 1e-5

  4. PrintLR prints the learning rate at the end of each epoch.

class PrintLR(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        print('\nLearning rate for epoch {} is {}'.format(epoch + 1, model.optimizer.lr.numpy()))

now put all the components together.

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='./logs'),
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                       save_weights_only=True),
    tf.keras.callbacks.LearningRateScheduler(decay),
    PrintLR()
]

now i can finally train the model.

single_worker_model = build_and_compile_cnn_model()
single_worker_model.fit(dataset, epochs=3, steps_per_epoch=70, callbacks=callbacks)

im getting an accuracy of 94% on the training data. the exact accuracy doesn't matter here. remember, im building a system.

we've already implemented the distributed model training

to insert the distributed training logic and train the model on multiple workers, use the MultiWorkerMirroredStrategy with keras(tf as backend).

some theory

there are different ways to do distributed training and data parallelism is the most common one. there are two common ways to do distributed training with data parallelism:

  1. Synchronous training, where the training steps are synced across the workers and replicas. all workers train over different slices of the input data in sync and aggregate gradients at each step.

  2. Asynchronous training, where the training steps are not strictly synced. all workers train independently over the input data and update variables asynchronously. see parameter server training.

im using the MultiWorkerMirroredStrategy that does synchronous distributed training across multiple workers, each with potentially multiple GPUs. it replicates all variables and computations to each local device and uses distributed collective implementation (e.g. all-reduce) so that multiple workers can work together.

once its defined, i initiate the distributed input data pipeline and the model inside the strategy scope.

model saving and loading

to save the model, use model.save. the saving destinations (temporary dirs) need to be different for each worker.

  • for worker nodes, save the model to a temporary directory
  • for the master (chief), save the model to the provided directory

the temporary directories of the workers need to be unique to prevent errors. the model saved in all the directories is identical, and only the model saved by the chief should be referenced for restoring or serving.

im not saving the model to temporary directories, as doing so would waste my laptop's compute and memory. instead, im determining which worker node is the master and saving its model only.

to determine if the worker node is the master or not use the environment variable TF_CONFIG

an example configuration

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

_is_worker_master is a function that inspects the cluster specs and current task type and returns True if the worker is the master and False otherwise.

import json
import os

def _is_worker_master():
    return TASK_INDEX == 0

tf_config = json.loads(os.environ.get('TF_CONFIG') or '{}')
TASK_INDEX = tf_config['task']['index']

if _is_worker_master():
    model_path = args.saved_model_dir
else:
    model_path = args.saved_model_dir + '/worker_tmp_' + str(TASK_INDEX)

multi_worker_model.save(model_path)

app containerization 📦

to containerize the app, i have a python script called multi-worker-distributed-training.py that contains all the model code. now lets dockerize it.

FROM python:3.9
RUN pip install tensorflow==2.12.0 tensorflow_datasets==4.9.2
COPY multi-worker-distributed-training.py /

then build the docker image

docker build -f Dockerfile -t kubeflow/multi-worker-strategy:v0.1 .

next, import the image into the k3d cluster, since the cluster cannot access the local image registry.

k3d image import kubeflow/multi-worker-strategy:v0.1 --cluster dist-ml


while training a model in a pod, once the job completes or fails, the files in the pod are recycled by the kubernetes garbage collector. this means all the model checkpoints are lost and we no longer have a model for serving.

to avoid this we have to use PersistentVolume(PV) and PersistentVolumeClaim(PVC).

a PersistentVolume (PV) is a piece of storage in the cluster that has been provisioned by an administrator or dynamically provisioned. its a resource in the cluster just like a node is a cluster resource. PVs are volume plugins like volumes but have a lifecycle independent of any individual Pod that uses the PV. this means the storage will persist and live even when the pods are deleted.

a PersistentVolumeClaim (PVC) is a request for storage by a user. pods consume node resources and PVCs consume PV resources. pods can request specific levels of resources (CPU and Memory). claims can request specific size and access modes (e.g., they can be mounted ReadWriteOnce, ReadOnlyMany, or ReadWriteMany).

next, create a PVC to submit a request for storage that will be used in worker pods to store the trained model. im requesting 1GB storage with ReadWriteOnce mode.

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: strategy-volume
spec:
  accessModes: [ "ReadWriteOnce" ]
  volumeMode: Filesystem
  resources:
    requests:
      storage: 1Gi

kubectl create -f multi-worker-pvc.yaml

next, define a TFJob(model training with tf) specification with the image we built before that contains the distributed training script.

theres also the concept of deployments, and the main difference between deployments and jobs is how they handle a pod that is terminated. a deployment is intended to be a "service", e.g. it should be up-and-running, so it will try to restart the pods it is managing to match the desired number of replicas, while a job is intended to execute and successfully terminate.
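for contrast, heres what a minimal deployment manifest looks like (purely illustrative, not part of this project): kubernetes keeps restarting these pods to maintain the replica count, whereas the TFJob below runs the training to completion and then stops.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: hello-deployment        # illustrative name
spec:
  replicas: 2                   # k8s keeps 2 pods running at all times
  selector:
    matchLabels:
      app: hello
  template:
    metadata:
      labels:
        app: hello
    spec:
      containers:
      - name: hello
        image: nginx:alpine     # any long-running service image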

apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
  name: multi-worker-training
spec:
  runPolicy:
    cleanPodPolicy: None
  tfReplicaSpecs:
    Worker:
      replicas: 2
      restartPolicy: Never
      template:
        spec:
          containers:
            - name: tensorflow
              image: kubeflow/multi-worker-strategy:v0.1
              imagePullPolicy: IfNotPresent
              command: ["python", "/multi-worker-distributed-training.py", "--saved_model_dir", "/trained_model/saved_model_versions/2/", "--checkpoint_dir", "/trained_model/checkpoint", "--model_type", "cnn"]
              volumeMounts:
                - mountPath: /trained_model
                  name: training
              resources:
                limits:
                  cpu: 500m
          volumes:
            - name: training
              persistentVolumeClaim:
                claimName: strategy-volume

pass saved_model_dir and checkpoint_dir to the container. the volumes field specifies the persistent volume claim and the volumeMounts field specifies what folder to mount it to. the cleanPodPolicy in the TFJob spec controls the deletion of pods when a job terminates. the restartPolicy determines whether pods will be restarted when they exit.

submit the TFJob to our cluster and start our distributed model training.

kubectl create -f multi-worker-tfjob.yaml


we can see 2 pods running our distributed training as we've specified 2 workers.

  1. multi-worker-training-worker-0
  2. multi-worker-training-worker-1

we can also see the logs from the pod multi-worker-training-worker-0

kubectl logs multi-worker-training-worker-0


i trained a model and stored it in the /saved_model_versions/1/ path. we can edit/update the code and resubmit the job: delete the running job, rebuild the docker image, import it, and resubmit the job. these are the steps to repeat every time we change the code.

kubectl delete tfjob --all; docker build -f Dockerfile -t kubeflow/multi-worker-strategy:v0.1 .; k3d image import kubeflow/multi-worker-strategy:v0.1 --cluster dist-ml; kubectl create -f multi-worker-tfjob.yaml

next, evaluate the model's performance. we now have a trained model stored in the file path trained_model/saved_model_versions/2/.

kubectl create -f predict-service.yaml

to see the evals, exec into the running predict-service container

kubectl exec --stdin --tty predict-service -- /bin/bash

and execute predict-service.py, which loads the trained model and evaluates it on the test dataset.
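predict-service.py itself isn't shown here. a minimal sketch of what it does (the model path is assumed from the TFJob args above; the actual script in the repo may differ):

import tensorflow as tf
import tensorflow_datasets as tfds

def scale(image, label):
    image = tf.cast(image, tf.float32) / 255.0
    return image, label

# assumed location of the trained model inside the pod (mounted via the pvc)
model = tf.keras.models.load_model("trained_model/saved_model_versions/2")

datasets, _ = tfds.load(name='mnist', with_info=True, as_supervised=True)
ds = datasets['test'].map(scale).batch(64)

loss, accuracy = model.evaluate(ds)
print("test loss:", loss, "test accuracy:", accuracy)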

model selection

ive implemented a distributed model training component. in reality we might need to train multiple models and pick the top performing model for inference.

to implement this lets create two more models.

this nn has batch norm layers and more conv2d layers.

from tensorflow.keras import layers, models

def build_and_compile_cnn_model_with_batch_norm():
    print("Training CNN model with batch normalization")
    model = models.Sequential()
    model.add(layers.Input(shape=(28, 28, 1), name='image_bytes'))
    model.add(layers.Conv2D(32, (3, 3), activation='relu'))
    model.add(layers.BatchNormalization())
    model.add(layers.Activation('sigmoid'))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.BatchNormalization())
    model.add(layers.Activation('sigmoid'))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.Flatten())
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(10, activation='softmax'))

    model.summary()

    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    return model

im creating one more model with a dropout layer and some more conv2d layers.

def build_and_compile_cnn_model_with_dropout():
    print("Training CNN model with dropout")
    model = models.Sequential()
    model.add(layers.Input(shape=(28, 28, 1), name='image_bytes'))
    model.add(layers.Conv2D(32, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Dropout(0.5))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.Flatten())
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(10, activation='softmax'))

    model.summary()

    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    return model

train these models by submitting three different TFJobs with different --model_type and --saved_model_dir arguments

kubectl delete tfjob --all
kubectl apply -f multi-worker-tfjob.yaml

next, load the testing data and the trained model to evaluate its performance. the model with the highest accuracy score can be moved to a different folder and used for model serving.

import shutil

import tensorflow as tf
import tensorflow_datasets as tfds

BUFFER_SIZE = 10000

def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

best_model_path = ""
best_accuracy = 0

for i in range(3):
    model_path = "trained_models/saved_model_versions/" + str(i)
    model = tf.keras.models.load_model(model_path)

    datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
    mnist_test = datasets['test']
    ds = mnist_test.map(scale).cache().shuffle(BUFFER_SIZE).batch(64)
    loss, accuracy = model.evaluate(ds)

    if accuracy > best_accuracy:
      best_accuracy = accuracy
      best_model_path = model_path

dst = "trained_model/saved_model_versions/3"
shutil.copytree(best_model_path, dst)

add the above script to the Dockerfile, rebuild the image, and create a pod that runs the script for model selection.
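the dockerfile change is just one extra COPY line (assuming the selection script is named model-selection.py, matching the command in the pod spec below):

FROM python:3.9
RUN pip install tensorflow==2.12.0 tensorflow_datasets==4.9.2
COPY multi-worker-distributed-training.py /
COPY model-selection.py /

after rebuilding and importing the image as before, the model selection pod looks like this: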

apiVersion: v1
kind: Pod
metadata:
  name: model-selection
spec:
  containers:
  - name: predict
    image: kubeflow/multi-worker-strategy:v0.1
    command: ["python", "/model-selection.py"]
    volumeMounts:
    - name: model
      mountPath: /trained_model
  volumes:
  - name: model
    persistentVolumeClaim:
      claimName: strategy-volume


Inference

ive implemented the distributed training and model selection components. next, im creating a model serving component. this component takes the trained model from trained_model/saved_model_versions/3.

the inference service should be highly performant and robust. im not considering cost at this moment.

create a single server model inference service

model_path = "trained_models/saved_model_versions/3"
model = tf.keras.models.load_model(model_path)
datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
mnist_test = datasets['test']
ds = mnist_test.map(scale).cache().shuffle(BUFFER_SIZE).batch(64)
loss, accuracy = model.evaluate(ds)

im using TFServing to expose the model as an endpoint service.

# Environment variable with the path to the model
os.environ["MODEL_PATH"] = f"{model_path}"

nohup tensorflow_model_server \
  --port=8500 \
  --rest_api_port=8501 \
  --model_name=model \
  --model_base_path=$MODEL_PATH

nohup, short for no hang-up, is a command in linux systems that keeps processes running even after exiting the shell or terminal.
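once tensorflow_model_server is up, a quick sanity check against its rest endpoint looks something like this (a sketch: the model name comes from --model_name above, and you may need to adjust the instance shape to match the input signature of whichever model you saved):

import json
import numpy as np
import requests

# one all-zeros 28x28 "image"; adjust the shape to your model's input signature
instance = np.zeros((28, 28)).tolist()

resp = requests.post(
    "http://localhost:8501/v1/models/model:predict",   # rest_api_port and --model_name from above
    data=json.dumps({"instances": [instance]}),
)
print(resp.json())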

create a distributed model inference service

the above method works great if you're experimenting locally but there are more efficient ways for model serving.

tensorflow models contain a signature definition that defines the signature of a computation supported in a tensorflow graph. SignatureDefs aim to provide generic support for identifying the inputs and outputs of a function. im modifying this input layer with a preprocessing function so that clients can send base64-encoded images, which is a standard way of sending images through a restful api. then im saving the model with new serving signatures. the new signatures use python functions to preprocess the image from a JPEG to a tensor.

def _preprocess(bytes_inputs):
    decoded = tf.io.decode_jpeg(bytes_inputs, channels=1)
    resized = tf.image.resize(decoded, size=(28, 28))
    return tf.cast(resized, dtype=tf.uint8)

def _get_serve_image_fn(model):
    @tf.function(input_signature=[tf.TensorSpec([None], dtype=tf.string, name='image_bytes')])
    def serve_image_fn(bytes_inputs):
        decoded_images = tf.map_fn(_preprocess, bytes_inputs, dtype=tf.uint8)
        return model(decoded_images)
    return serve_image_fn

signatures = {
    "serving_default": _get_serve_image_fn(model).get_concrete_function(
        tf.TensorSpec(shape=[None], dtype=tf.string, name='image_bytes')
    )
}

tf.saved_model.save(multi_worker_model, model_path, signatures=signatures)

once theres an updated training script, rebuild the image and re-train the model.

im using kserve for the inference service. it enables serverless inferencing on k8s and provides performant, high-abstraction interfaces for common ml frameworks like tensorflow, pytorch, etc.

create an InferenceService yaml, which specifies the framework tensorflow and storageUri that is pointed to a saved tensorflow model.

apiVersion: "serving.kserve.io/v1beta1"
kind: InferenceService
metadata:
  name: tf-mnist
spec:
  predictor:
    model:
      modelFormat:
        name: tensorflow
      storageUri: "pvc://strategy-volume/saved_model_versions"

install kserve

curl -s "https://raw.githubusercontent.com/kserve/kserve/release-0.11/hack/quick_install.sh" | bash

apply the inference-service.yaml to create the InferenceService. by default, it exposes an HTTP/REST endpoint.

kubectl apply -f inference-service.yaml

Wait for the InferenceService to be in a ready state.

kubectl get isvc tf-mnist

run the prediction.

but first, determine and set the INGRESS_HOST and INGRESS_PORT. an ingress gateway is similar to an api gateway that load balances requests.
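on a cloud cluster where the istio ingress gateway gets an external load balancer, you would typically read these off the service directly (the standard kserve approach); on k3d theres no external ip, hence the port-forward below.

export INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].port}')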

test it locally using port-forward

INGRESS_GATEWAY_SERVICE=$(kubectl get svc --namespace istio-system --selector="app=istio-ingressgateway" --output jsonpath='{.items[0].metadata.name}')
kubectl port-forward --namespace istio-system svc/${INGRESS_GATEWAY_SERVICE} 8080:80

in a different terminal window

export INGRESS_HOST=localhost
export INGRESS_PORT=8080
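the request body in mnist-input.json has to match the image_bytes serving signature defined earlier, i.e. a base64-encoded jpeg per instance. heres a minimal sketch of how such a file could be generated (sample.jpg is a hypothetical input image; the mnist-input.json used here already exists in the project):

import base64
import json

# any small grayscale jpeg will do; sample.jpg is a placeholder filename
with open("sample.jpg", "rb") as f:
    encoded = base64.b64encode(f.read()).decode("utf-8")

# tf-serving / kserve convention for binary string tensors: wrap the base64 payload in {"b64": ...}
payload = {"instances": [{"image_bytes": {"b64": encoded}}]}

with open("mnist-input.json", "w") as f:
    json.dump(payload, f)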

send a sample request to our inference service.

lets use the curl command

MODEL_NAME=tf-mnist
INPUT_PATH=@./mnist-input.json
SERVICE_HOSTNAME=$(kubectl get inferenceservice $MODEL_NAME -n kubeflow -o jsonpath='{.status.url}' | cut -d "/" -f 3)
curl -v -H "Host: ${SERVICE_HOSTNAME}" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict -d $INPUT_PATH

alternatively, using the requests library in python

import json
import requests

input_path = "mnist-input.json"

with open(input_path) as json_file:
    data = json.load(json_file)

response = requests.post(
    url="http://localhost:8080/v1/models/tf-mnist:predict",
    data=json.dumps(data),
    headers={"Host": "tf-mnist.kubeflow.example.com"},
)
print(response.text)


the inference service is working as expected

auto-scale the inference service

at scale, to handle large amounts of traffic, systems should have multiple servers (replicas). kserve can autoscale based on the number of requests. the autoscaler can scale down to zero if the application is receiving no traffic, or we can specify a minimum number of replicas to keep around. the autoscaling.knative.dev/target annotation sets a soft limit. other specs can be configured too, like minReplicas, containerConcurrency, and scaleMetric.

apiVersion: "serving.kserve.io/v1beta1"
kind: InferenceService
metadata:
  name: tf-mnist
  annotations:
    autoscaling.knative.dev/target: "1"
spec:
  predictor:
    model:
      modelFormat:
        name: tensorflow
      storageUri: "pvc://strategy-volume/saved_model_versions"

im installing hey, a tiny program that sends some load to a web application. hey runs the provided number of requests at the provided concurrency level and prints stats.

# https://github.com/rakyll/hey
brew install hey
kubectl create -f inference-service.yaml

hey -z 30s -c 5 -m POST -host ${SERVICE_HOSTNAME} -D mnist-input.json "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/tf-mnist:predict"

im sending traffic for 30 seconds with 5 concurrent requests. as the scaling target is set to 1 and we load the service with 5 concurrent requests, the autoscaler tries scaling up to 5 pods. theres always a cold start time initially to spin up pods, and it may take longer (to pull the docker image) if the image is not cached on the node.
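to watch the scale-out while hey is running, keep an eye on the pods in another terminal (the label selector below is what kserve typically puts on predictor pods; a plain kubectl get pods -w works just as well):

kubectl get pods -l serving.kserve.io/inferenceservice=tf-mnist -w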

end-to-end workflow using argo workflows

lets stitch all the components together as shown in the system architecture. im using argo workflows to orchestrate the jobs we executed before in an end-to-end fashion. argo lets us build dag-based ci/cd-style workflows (similar to gitlab ci/cd) on k8s, and its the de facto engine for workflow orchestration on k8s.

im installing argo workflows

kubectl create namespace argo
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.4.11/install.yaml

im creating an end-to-end workflow with 4 steps

  1. data ingestion
  2. distributed training
  3. model selection
  4. model serving

apiVersion: argoproj.io/v1alpha1
kind: Workflow                  # new type of k8s spec
metadata:
  generateName: tfjob-wf-    # name of the workflow spec
spec:
  entrypoint: tfjob-wf          # invoke the tfjob template
  templates:
  - name: tfjob-wf
    steps:
    - - name: data-ingestion-step
        template: data-ingestion-step
    - - name: distributed-tf-training-steps
        template: distributed-tf-training-steps
    - - name: model-selection-step
        template: model-selection-step
    - - name: create-model-serving-service
        template: create-model-serving-service
  podGC:
    strategy: OnPodSuccess
  volumes:
  - name: model
    persistentVolumeClaim:
      claimName: strategy-volume

this is a multi-step workflow where the steps are executed sequentially (hence the double dash). podGC describes how to delete completed pods; deleting completed pods frees up resources. we use persistent storage to store the dataset and the trained models.

the first step is data ingestion. theres a memoize spec to cache the output of this step. memoization reduces cost and execution time. since we do not want to download the data every time, we cache it in a configMap. specify the name and key for the config-map cache, and set maxAge to 1h, which defines how long the cache is considered valid.

- name: data-ingestion-step
  serviceAccountName: argo
  memoize:
    cache:
      configMap:
        name: data-ingestion-config
        key: "data-ingestion-cache"
    maxAge: "1h"
  container:
    image: kubeflow/multi-worker-strategy:v0.1
    imagePullPolicy: IfNotPresent
    command: ["python", "/data-ingestion.py"]

execute the model training steps in parallel

- name: distributed-tf-training-steps
  steps:
  - - name: cnn-model
      template: cnn-model
    - name: cnn-model-with-dropout
      template: cnn-model-with-dropout
    - name: cnn-model-with-batch-norm
      template: cnn-model-with-batch-norm

create a step that runs distributed training for the model. create the TFJob using the manifest i created before. add successCondition and failureCondition so argo knows when the created TFJob has succeeded or failed. store each trained model in a different folder. create similar steps for the other two model configs.

- name: cnn-model
  serviceAccountName: training-operator
  resource:
    action: create
    setOwnerReference: true
    successCondition: status.replicaStatuses.Worker.succeeded = 2
    failureCondition: status.replicaStatuses.Worker.failed > 0
    manifest: |
      apiVersion: kubeflow.org/v1
      kind: TFJob
      metadata:
        generateName: multi-worker-training-
      spec:
        runPolicy:
          cleanPodPolicy: None
        tfReplicaSpecs:
          Worker:
            replicas: 2
            restartPolicy: Never
            template:
              spec:
                containers:
                  - name: tensorflow
                    image: kubeflow/multi-worker-strategy:v0.1
                    imagePullPolicy: IfNotPresent
                    command: ["python", "/multi-worker-distributed-training.py", "--saved_model_dir", "/trained_model/saved_model_versions/1/", "--checkpoint_dir", "/trained_model/checkpoint", "--model_type", "cnn"]
                    volumeMounts:
                      - mountPath: /trained_model
                        name: training
                    resources:
                      limits:
                        cpu: 500m
                volumes:
                  - name: training
                    persistentVolumeClaim:
                      claimName: strategy-volume

add the model selection step, similar to model-selection.yaml

- name: model-selection-step
  serviceAccountName: argo
  container:
    image: kubeflow/multi-worker-strategy:v0.1
    imagePullPolicy: IfNotPresent
    command: ["python", "/model-selection.py"]
    volumeMounts:
    - name: model
      mountPath: /trained_model

last step of the workflow is the model serving

- name: create-model-serving-service
  serviceAccountName: training-operator
  resource:
    action: create
    setOwnerReference: true
    successCondition: status.modelStatus.states.transitionStatus = UpToDate
    manifest: |
      apiVersion: "serving.kserve.io/v1beta1"
      kind: InferenceService
      metadata:
        name: tf-mnist
      spec:
        predictor:
          model:
            modelFormat:
              name: tensorflow
            storageUri: "pvc://strategy-volume/saved_model_versions"

run the workflow.

kubectl create -f workflow.yaml
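if you also install the argo cli, you can submit and follow the same workflow from the terminal instead of using kubectl (optional):

argo submit workflow.yaml --watch
argo list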

logger

logging is an essential component of a machine learning system. it helps debug issues, analyze performance, troubleshoot errors, gather insights, and implement a feedback loop. kserve makes this easy: we create a knative service called message-dumper that logs every request and response, along with a unique identifier for each request/response pair.

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: message-dumper
spec:
  template:
    spec:
      containers:
      - image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display

kubectl create -f message-dumper.yaml

include the logger which points to the message dumper URL in the InferenceService predictor.

logger:
  mode: all
  url: http://message-dumper.default/

read more about the kserve inference logger in the kserve docs.

Monitoring

🚧 WIP

MLflow

🚧 WIP

GitOps

🚧 WIP

Summary

  1. a distributed machine learning system is designed to train machine learning models on large datasets that cannot be processed on a single machine. distributing the computation is a necessity for training complex models with millions or even billions of parameters.
  2. k8s is a popular choice for building such complex distributed systems, and thats the reason ive used it here.
  3. tensorflow provides several strategies for distributed training, and ive used MultiWorkerMirroredStrategy here.
  4. ive used kserve for building an inference service that can be autoscaled based on traffic.
  5. argo workflows help build the end-to-end ci/cd-style pipeline.

todos

  • monitoring the metrics with prometheus and grafana
  • setup mlflow tracking server
  • add gitOps

Further reading and references

[1] Distributed Machine Learning Patterns by Yuan Tang

[2] Multi-worker training with Keras

[3] Custom training loop with Keras and MultiWorkerMirroredStrategy

[4] First InferenceService

[5] How to Train Really Large Models on Many GPUs?

[6] Kubernetes best practices