Outlier Example

architecture

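In this example we will:

  • Describe the project structure
  • Train (or download) the models
  • Create Tempo artifacts
  • Run unit tests
  • Save the outlier and service environments
  • Test locally on Docker
  • Deploy to production on Kubernetes with Tempo
  • Prepare a GitOps deployment with Kustomize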

Prerequisites

This notebook needs to be run in the tempo-examples conda environment defined below. Create it from the project root folder:

conda env create --name tempo-examples --file conda/tempo-examples.yaml

Project Structure

!tree -P "*.py"  -I "__init__.py|__pycache__" -L 2
.
├── artifacts
│   ├── model
│   ├── outlier
│   └── svc
├── k8s
│   └── rbac
├── README_files
├── src
│   ├── constants.py
│   ├── data.py
│   ├── outlier.py
│   ├── tempo.py
│   └── utils.py
└── tests
    └── test_tempo.py

9 directories, 6 files

Train Models

  • This section is where, as a data scientist, you do your work of training models and creating artifacts.

  • For this example we download a pretrained ResNet32 TensorFlow classification model for the CIFAR10 dataset and download (or train) an outlier detector on the same data.

import os
import logging
import numpy as np
import tempo

from tempo.utils import logger
from src.constants import ARTIFACTS_FOLDER

logger.setLevel(logging.ERROR)
logging.basicConfig(level=logging.ERROR)
from src.data import Cifar10
data = Cifar10()
(50000, 32, 32, 3) (50000, 1) (10000, 32, 32, 3) (10000, 1)

Download pretrained Resnet32 Tensorflow model for CIFAR10

!rclone --config ./rclone-gcs.conf copy gs://seldon-models/tfserving/cifar10/resnet32 ./artifacts/model

Download or train an outlier detector on CIFAR10 data

load_pretrained = True
if load_pretrained:  # load pre-trained detector
    !rclone --config ./rclone-gcs.conf copy gs://seldon-models/tempo/cifar10/outlier/cifar10 ./artifacts/outlier/cifar10
else:
    from src.outlier import train_outlier_detector
    train_outlier_detector(data, ARTIFACTS_FOLDER)

Create Tempo Artifacts

from src.tempo import create_outlier_cls, create_model, create_svc_cls

cifar10_model = create_model()
OutlierModel = create_outlier_cls()
outlier = OutlierModel()
Cifar10Svc = create_svc_cls(outlier, cifar10_model)
svc = Cifar10Svc()
Loading from /home/clive/work/mlops/fork-tempo/docs/examples/outlier/artifacts/outlier
# %load src/tempo.py
import json
import os

import numpy as np
from alibi_detect.base import NumpyEncoder
from src.constants import ARTIFACTS_FOLDER, MODEL_FOLDER, OUTLIER_FOLDER

from tempo.protocols.v2 import V2Protocol
from tempo.protocols.tensorflow import TensorflowProtocol
from tempo.serve.metadata import ModelFramework
from tempo.serve.model import Model
from tempo.serve.pipeline import PipelineModels
from tempo.serve.utils import model, pipeline, predictmethod


def create_outlier_cls():
    """Build and return the Tempo-decorated outlier detector class.

    The class is defined inside this factory, so the ``@model`` decorator is
    applied every time the factory is called.

    Returns:
        The decorated ``OutlierModel`` class itself (not an instance).
    """
    outlier_local_folder = os.path.join(ARTIFACTS_FOLDER, OUTLIER_FOLDER)

    @model(
        name="outlier",
        platform=ModelFramework.Custom,
        protocol=V2Protocol(),
        uri="s3://tempo/outlier/cifar10/outlier",
        local_folder=outlier_local_folder,
    )
    class OutlierModel(object):
        # NOTE: plain comments (not docstrings) are used inside the decorated
        # class so that its ``__doc__`` attributes stay exactly as before.

        def __init__(self):
            # Imported here, at construction time, rather than at module import.
            from alibi_detect.utils.saving import load_detector

            # The artifact location comes from the Tempo model details.
            artifacts_dir = self.get_tempo().details.local_folder
            print(f"Loading from {artifacts_dir}")
            # The detector is loaded from the "cifar10" sub-folder.
            self.od = load_detector(os.path.join(artifacts_dir, "cifar10"))

        @predictmethod
        def outlier(self, payload: np.ndarray) -> dict:
            # Run the loaded detector on ``payload`` and return its result as
            # a dict rebuilt from JSON.
            detector_output = self.od.predict(
                payload,
                # use 'feature' or 'instance' level
                outlier_type="instance",
                return_feature_score=True,
                # scores used to determine outliers
                return_instance_score=True,
            )

            # Round-trip through JSON with NumpyEncoder so that the value
            # handed back is what ``json.loads`` produces from the encoding.
            encoded = json.dumps(detector_output, cls=NumpyEncoder)
            return json.loads(encoded)

    return OutlierModel


def create_model():
    """Describe the ResNet32 CIFAR10 classifier as a Tempo ``Model``.

    Returns:
        A ``Model`` named ``"resnet32"`` using the Tensorflow protocol and
        platform, pointing at the remote ``gs://`` URI and at the local
        ``ARTIFACTS_FOLDER/MODEL_FOLDER`` directory.
    """
    model_local_folder = os.path.join(ARTIFACTS_FOLDER, MODEL_FOLDER)
    return Model(
        name="resnet32",
        protocol=TensorflowProtocol(),
        platform=ModelFramework.Tensorflow,
        uri="gs://seldon-models/tfserving/cifar10/resnet32",
        local_folder=model_local_folder,
    )


def create_svc_cls(outlier, model):
    """Build and return the Tempo pipeline class that gates the classifier.

    Args:
        outlier: The outlier component, registered in the pipeline as
            ``models.outlier``.
        model: The classifier component, registered in the pipeline as
            ``models.cifar10``.

    Returns:
        The decorated ``Cifar10Svc`` class itself (not an instance).
    """
    svc_local_folder = os.path.join(ARTIFACTS_FOLDER, "svc")

    @pipeline(
        name="cifar10-service",
        protocol=V2Protocol(),
        uri="s3://tempo/outlier/cifar10/svc",
        local_folder=svc_local_folder,
        models=PipelineModels(outlier=outlier, cifar10=model),
    )
    class Cifar10Svc(object):
        @predictmethod
        def predict(self, payload: np.ndarray) -> np.ndarray:
            # Ask the outlier component first; the classifier is only called
            # when the payload is not flagged.
            verdict = self.models.outlier(payload=payload)

            # Only the first entry of "is_outlier" is inspected.
            flagged = verdict["data"]["is_outlier"][0]
            if not flagged:
                return self.models.cifar10(payload)

            # Flagged payloads get an empty array instead of a prediction.
            return np.array([])

    return Cifar10Svc

Unit Tests

  • Here we run our unit tests to ensure the orchestration works before running on the actual models.

# %load tests/test_tempo.py
import numpy as np
from src.tempo import create_model, create_outlier_cls, create_svc_cls


def test_svc_outlier():
    """The service returns an empty array when the input is flagged."""
    classifier = create_model()
    detector_cls = create_outlier_cls()
    detector = detector_cls()
    service_cls = create_svc_cls(detector, classifier)
    service = service_cls()

    # Stub both pipeline members so only the orchestration logic is exercised.
    service.models.outlier = lambda payload: {"data": {"is_outlier": [1]}}
    service.models.cifar10 = lambda _payload: np.array([[0.2]])

    result = service(np.array([1]))
    assert result.shape[0] == 0


def test_svc_inlier():
    """The service forwards the classifier output when the input is not flagged."""
    classifier = create_model()
    detector_cls = create_outlier_cls()
    detector = detector_cls()
    service_cls = create_svc_cls(detector, classifier)
    service = service_cls()

    # Stub both pipeline members so only the orchestration logic is exercised.
    service.models.outlier = lambda payload: {"data": {"is_outlier": [0]}}
    service.models.cifar10 = lambda _payload: np.array([[0.2]])

    result = service(np.array([1]))
    assert result.shape[0] == 1
!python -m pytest tests/
============================= test session starts ==============================
platform linux -- Python 3.7.9, pytest-6.2.0, py-1.10.0, pluggy-0.13.1
rootdir: /home/clive/work/mlops/fork-tempo, configfile: setup.cfg
plugins: cases-3.4.6, cov-2.12.1, asyncio-0.14.0
collected 2 items                                                              

tests/test_tempo.py ..                                                   [100%]

=============================== warnings summary ===============================
../../../../../../anaconda3/envs/tempo-examples/lib/python3.7/site-packages/tensorflow/python/autograph/impl/api.py:22
  /home/clive/anaconda3/envs/tempo-examples/lib/python3.7/site-packages/tensorflow/python/autograph/impl/api.py:22: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
    import imp

../../../../../../anaconda3/envs/tempo-examples/lib/python3.7/site-packages/packaging/version.py:130
  /home/clive/anaconda3/envs/tempo-examples/lib/python3.7/site-packages/packaging/version.py:130: DeprecationWarning: Creating a LegacyVersion has been deprecated and will be removed in the next major release
    DeprecationWarning,

-- Docs: https://docs.pytest.org/en/stable/warnings.html
======================== 2 passed, 2 warnings in 4.69s =========================
Unresolved object in checkpoint: (root).encoder.fc_mean.kernel
Unresolved object in checkpoint: (root).encoder.fc_mean.bias
Unresolved object in checkpoint: (root).encoder.fc_log_var.kernel
Unresolved object in checkpoint: (root).encoder.fc_log_var.bias
A checkpoint was restored (e.g. tf.train.Checkpoint.restore or tf.keras.Model.load_weights) but not all checkpointed values were used. See above for specific issues. Use expect_partial() on the load status object, e.g. tf.train.Checkpoint.restore(...).expect_partial(), to silence these warnings, or use assert_consumed() to make the check explicit. See https://www.tensorflow.org/guide/checkpoint#loading_mechanics for details.
Unresolved object in checkpoint: (root).encoder.fc_mean.kernel
Unresolved object in checkpoint: (root).encoder.fc_mean.bias
Unresolved object in checkpoint: (root).encoder.fc_log_var.kernel
Unresolved object in checkpoint: (root).encoder.fc_log_var.bias
A checkpoint was restored (e.g. tf.train.Checkpoint.restore or tf.keras.Model.load_weights) but not all checkpointed values were used. See above for specific issues. Use expect_partial() on the load status object, e.g. tf.train.Checkpoint.restore(...).expect_partial(), to silence these warnings, or use assert_consumed() to make the check explicit. See https://www.tensorflow.org/guide/checkpoint#loading_mechanics for details.

Save Outlier and Svc Environments

tempo.save(OutlierModel)
Collecting packages...
Packing environment at '/home/clive/anaconda3/envs/tempo-c4fe11aa-1cd6-43dd-9fab-0dcb4fca7a62' to '/home/clive/work/mlops/fork-tempo/docs/examples/outlier/artifacts/outlier/environment.tar.gz'
[########################################] | 100% Completed |  1min 21.6s
tempo.save(Cifar10Svc)
Collecting packages...
Packing environment at '/home/clive/anaconda3/envs/tempo-cfface3b-1080-47d2-a3b6-113db8e286e5' to '/home/clive/work/mlops/fork-tempo/docs/examples/outlier/artifacts/svc/environment.tar.gz'
[########################################] | 100% Completed | 16.1s

Test Locally on Docker

Here we test our models using production images but running locally on Docker. This allows us to ensure the final production deployed model will behave as expected when deployed.

from tempo import deploy_local
remote_model = deploy_local(svc)
from src.utils import show_image
show_image(data.X_test[0:1])
remote_model.predict(payload=data.X_test[0:1])
png
array([[3.92254496e-09, 1.20455460e-11, 2.66011191e-09, 9.99992609e-01,
        2.52213306e-10, 5.40860242e-07, 6.75954425e-06, 4.75119076e-12,
        6.90874735e-09, 1.07275586e-11]])
from src.utils import create_cifar10_outlier

outlier_img = create_cifar10_outlier(data)
show_image(outlier_img)
remote_model.predict(payload=outlier_img)
png
array([], dtype=float64)
remote_model.undeploy()

Production Option 1 (Deploy to Kubernetes with Tempo)

  • Here we illustrate how to run the final models in “production” on Kubernetes by using Tempo to deploy them.

Prerequisites

Create a Kind Kubernetes cluster with Minio and Seldon Core installed using Ansible as described here.

!kubectl apply -f k8s/rbac -n production
secret/minio-secret configured
serviceaccount/tempo-pipeline unchanged
role.rbac.authorization.k8s.io/tempo-pipeline unchanged
rolebinding.rbac.authorization.k8s.io/tempo-pipeline-rolebinding unchanged
from tempo.examples.minio import create_minio_rclone
import os

create_minio_rclone(os.getcwd()+"/rclone-minio.conf")
tempo.upload(cifar10_model)
tempo.upload(outlier)
tempo.upload(svc)
from tempo.serve.metadata import SeldonCoreOptions
runtime_options = SeldonCoreOptions(**{
        "remote_options": {
            "namespace": "production",
            "authSecretName": "minio-secret"
        }
    })
from tempo import deploy_remote
remote_model = deploy_remote(svc, options=runtime_options)
from src.utils import show_image

show_image(data.X_test[0:1])
remote_model.predict(payload=data.X_test[0:1])
png
array([[3.92254496e-09, 1.20455460e-11, 2.66011191e-09, 9.99992609e-01,
        2.52213306e-10, 5.40860242e-07, 6.75954425e-06, 4.75119076e-12,
        6.90874735e-09, 1.07275586e-11]])
from src.utils import create_cifar10_outlier

outlier_img = create_cifar10_outlier(data)
show_image(outlier_img)
remote_model.predict(payload=outlier_img)
png
array([], dtype=float64)
remote_model.undeploy()

Production Option 2 (GitOps)

  • We create yaml to provide to our DevOps team to deploy to a production cluster

  • We add Kustomize patches to modify the base Kubernetes yaml created by Tempo

from tempo import manifest
from tempo.serve.metadata import SeldonCoreOptions
runtime_options = SeldonCoreOptions(**{
        "remote_options": {
            "namespace": "production",
            "authSecretName": "minio-secret"
        }
    })
yaml_str = manifest(svc, options=runtime_options)
with open(os.getcwd()+"/k8s/tempo.yaml","w") as f:
    f.write(yaml_str)
!kustomize build k8s
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  annotations:
    seldon.io/tempo-description: ""
    seldon.io/tempo-model: '{"model_details": {"name": "cifar10-service", "local_folder":
      "/home/clive/work/mlops/fork-tempo/docs/examples/outlier/artifacts/svc", "uri":
      "s3://tempo/outlier/cifar10/svc", "platform": "tempo", "inputs": {"args": [{"ty":
      "numpy.ndarray", "name": "payload"}]}, "outputs": {"args": [{"ty": "numpy.ndarray",
      "name": null}]}, "description": ""}, "protocol": "tempo.kfserving.protocol.KFServingV2Protocol",
      "runtime_options": {"runtime": "tempo.seldon.SeldonKubernetesRuntime", "state_options":
      {"state_type": "LOCAL", "key_prefix": "", "host": "", "port": ""}, "insights_options":
      {"worker_endpoint": "", "batch_size": 1, "parallelism": 1, "retries": 3, "window_time":
      0, "mode_type": "NONE", "in_asyncio": false}, "ingress_options": {"ingress":
      "tempo.ingress.istio.IstioIngress", "ssl": false, "verify_ssl": true}, "replicas":
      1, "minReplicas": null, "maxReplicas": null, "authSecretName": "minio-secret",
      "serviceAccountName": null, "add_svc_orchestrator": false, "namespace": "production"}}'
  labels:
    seldon.io/tempo: "true"
  name: cifar10-service
  namespace: production
spec:
  predictors:
  - annotations:
      seldon.io/no-engine: "true"
    componentSpecs:
    - spec:
        containers:
        - name: classifier
          resources:
            limits:
              cpu: 1
              memory: 1Gi
            requests:
              cpu: 500m
              memory: 500Mi
    graph:
      envSecretRefName: minio-secret
      implementation: TEMPO_SERVER
      modelUri: s3://tempo/outlier/cifar10/svc
      name: cifar10-service
      serviceAccountName: tempo-pipeline
      type: MODEL
    name: default
    replicas: 1
  protocol: kfserving
---
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  annotations:
    seldon.io/tempo-description: ""
    seldon.io/tempo-model: '{"model_details": {"name": "outlier", "local_folder":
      "/home/clive/work/mlops/fork-tempo/docs/examples/outlier/artifacts/outlier",
      "uri": "s3://tempo/outlier/cifar10/outlier", "platform": "custom", "inputs":
      {"args": [{"ty": "numpy.ndarray", "name": "payload"}]}, "outputs": {"args":
      [{"ty": "builtins.dict", "name": null}]}, "description": ""}, "protocol": "tempo.kfserving.protocol.KFServingV2Protocol",
      "runtime_options": {"runtime": "tempo.seldon.SeldonKubernetesRuntime", "state_options":
      {"state_type": "LOCAL", "key_prefix": "", "host": "", "port": ""}, "insights_options":
      {"worker_endpoint": "", "batch_size": 1, "parallelism": 1, "retries": 3, "window_time":
      0, "mode_type": "NONE", "in_asyncio": false}, "ingress_options": {"ingress":
      "tempo.ingress.istio.IstioIngress", "ssl": false, "verify_ssl": true}, "replicas":
      1, "minReplicas": null, "maxReplicas": null, "authSecretName": "minio-secret",
      "serviceAccountName": null, "add_svc_orchestrator": false, "namespace": "production"}}'
  labels:
    seldon.io/tempo: "true"
  name: outlier
  namespace: production
spec:
  predictors:
  - annotations:
      seldon.io/no-engine: "true"
    componentSpecs:
    - spec:
        containers:
        - args: []
          env:
          - name: MLSERVER_HTTP_PORT
            value: "9000"
          - name: MLSERVER_GRPC_PORT
            value: "9500"
          - name: MLSERVER_MODEL_IMPLEMENTATION
            value: tempo.mlserver.InferenceRuntime
          - name: MLSERVER_MODEL_NAME
            value: outlier
          - name: MLSERVER_MODEL_URI
            value: /mnt/models
          - name: TEMPO_RUNTIME_OPTIONS
            value: '{"runtime": "tempo.seldon.SeldonKubernetesRuntime", "state_options":
              {"state_type": "LOCAL", "key_prefix": "", "host": "", "port": ""}, "insights_options":
              {"worker_endpoint": "", "batch_size": 1, "parallelism": 1, "retries":
              3, "window_time": 0, "mode_type": "NONE", "in_asyncio": true}, "ingress_options":
              {"ingress": "tempo.ingress.istio.IstioIngress", "ssl": false, "verify_ssl":
              true}, "replicas": 1, "minReplicas": null, "maxReplicas": null, "authSecretName":
              "minio-secret", "serviceAccountName": null, "add_svc_orchestrator":
              false, "namespace": "production"}'
          image: seldonio/mlserver:0.3.2
          name: outlier
    graph:
      envSecretRefName: minio-secret
      implementation: TEMPO_SERVER
      modelUri: s3://tempo/outlier/cifar10/outlier
      name: outlier
      serviceAccountName: tempo-pipeline
      type: MODEL
    name: default
    replicas: 1
  protocol: kfserving
---
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  annotations:
    seldon.io/tempo-description: ""
    seldon.io/tempo-model: '{"model_details": {"name": "resnet32", "local_folder":
      "/home/clive/work/mlops/fork-tempo/docs/examples/outlier/artifacts/model", "uri":
      "gs://seldon-models/tfserving/cifar10/resnet32", "platform": "tensorflow", "inputs":
      {"args": [{"ty": "numpy.ndarray", "name": null}]}, "outputs": {"args": [{"ty":
      "numpy.ndarray", "name": null}]}, "description": ""}, "protocol": "tempo.kfserving.protocol.KFServingV1Protocol",
      "runtime_options": {"runtime": "tempo.seldon.SeldonKubernetesRuntime", "state_options":
      {"state_type": "LOCAL", "key_prefix": "", "host": "", "port": ""}, "insights_options":
      {"worker_endpoint": "", "batch_size": 1, "parallelism": 1, "retries": 3, "window_time":
      0, "mode_type": "NONE", "in_asyncio": false}, "ingress_options": {"ingress":
      "tempo.ingress.istio.IstioIngress", "ssl": false, "verify_ssl": true}, "replicas":
      1, "minReplicas": null, "maxReplicas": null, "authSecretName": "minio-secret",
      "serviceAccountName": null, "add_svc_orchestrator": false, "namespace": "production"}}'
  labels:
    seldon.io/tempo: "true"
  name: resnet32
  namespace: production
spec:
  predictors:
  - annotations:
      seldon.io/no-engine: "true"
    graph:
      envSecretRefName: minio-secret
      implementation: TENSORFLOW_SERVER
      modelUri: gs://seldon-models/tfserving/cifar10/resnet32
      name: resnet32
      type: MODEL
    name: default
    replicas: 1
  protocol: tensorflow