Automate your anomaly detection pipelines with Orchest and Clarify

Time series data is everywhere: software telemetry, sensors and IoT devices, finance, and more. The interesting thing about time series is that they usually present some sort of seasonality or periodicity. When these repeating patterns are interrupted, broken, or drift and evolve for any reason, we want to know about it: that’s the objective of time series anomaly detection. But detecting anomalies once is not enough: we want to continuously monitor the time series for anomalies, annotate them for further inspection, and be notified when they occur.

In this blog post, you will learn how to use Orchest to continuously run a time series anomaly detection pipeline, and Clarify to store and annotate the time series data.

“It has been said that history repeats itself. This is perhaps not quite correct; it merely rhymes.” — Theodor Reik

Why Orchest and Clarify?

Orchest is an open-source data pipeline orchestrator that allows you to design your data pipelines in a visual and friendly way and scale them with ease. Orchest supports Python and other languages, has a flexible mechanism for declaring your project dependencies, and can run your pipelines periodically as batch jobs.

Clarify, on the other hand, is a platform for time series intelligence that allows you to store and visualize time series data coming from different sources and collaboratively explore it with your team.

Both tools are a great match for your time series anomaly detection pipelines: in this blog post, you will read some time series demo data from Clarify in Orchest, train an anomaly detection model, continuously run that model on new batches of data, and write anomalies back to Clarify. The architecture will look as follows:

Requirements

  • A Clarify account
  • Access to an Orchest instance
  • (Optional) An IFTTT account

1. Preparing your time series data in Clarify

As a first step, load the Mystery Box demo dataset into your Clarify account. If you already chose it during the onboarding process, you don’t have to do anything. If you didn’t, or you’re not sure, head to your Admin panel, click on “Datasets”, then “Browse Public Datasets”, filter the name by “Mystery box”, click on the row, and finally “Add items from dataset”. If you did everything correctly, you should see 11/11 items added.

Next, you will need an Item ID to fetch the time series data using the Clarify Python client. To look it up, navigate to “Items” on your Admin panel, filter the Data Source by “Traffic data”, and click on “Passing cars (E18 Frognerstranda v/SK)”. The Item ID will appear on the right panel: save it for later.

Finally, you will need to generate a credentials file to access the data using the API. To do that, head to “Integrations” on your Admin panel, click “Add integration”, then “Next”, type “API credentials” for the name, click “Create”, and then “Done”. On the right panel that opens automatically, navigate to the “API access” tab and turn on “Clarify namespace”: this enables data access for the integration. Lastly, click “+ Create” right below, type “Orchest” for the name, click “Create”, and download the resulting clarify-credentials.json file. You will need this in the next step!

2. Create your Orchest project and define the requirements

It is now time to go to your Orchest instance! You will be creating two pipelines: one to train the model that you will run on demand, and another one for detecting the anomalies that will run periodically.

To set everything up, proceed as follows:

  1. Create a new project called “clarify-timeseries-orchest”.
  2. Wait for the default Environment to build.
  3. Create a new pipeline called “Train”.
  4. Get into JupyterLab and add an environment.yml file with these contents:

-- CODE language-yaml --
channels:
- conda-forge
dependencies:
- pandas
- matplotlib
- scikit-learn
- statsmodels
- ipympl
- ipywidgets>=7.0,<8
- pip
- pip:
  - pyclarify~=0.4.0b3
  - adtk

  5. Go back to the Environment and modify your setup script to look like this:

-- CODE language-bash --
#!/bin/bash

mamba env update --file environment.yml -n base

  6. Create a /data/clarify-timeseries-orchest directory and upload the clarify-credentials.json file you downloaded from Clarify to it. This way you make sure that you are not putting sensitive data under version control.
  7. Finally, add a new environment variable CLARIFY_CREDENTIALS to the Project, containing the path of the JSON credentials file (see the quick check below). For that, navigate to “Projects”, click on the gear (⚙️) to open the settings, and add the environment variable.
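
To double-check that your pipeline steps pick up the new variable, you can run a quick sanity check from a notebook or the integrated terminal. This is a minimal sketch; the printed path should match wherever you uploaded the credentials file:

-- CODE language-python --
import os

# Should print the configured path, e.g.
# /data/clarify-timeseries-orchest/clarify-credentials.json
print(os.environ.get("CLARIFY_CREDENTIALS"))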

Project files in Orchest are tracked under version control using git. This is a good moment to checkpoint your progress by making a commit, either using the JupyterLab Git UI or the integrated terminal.

3. Load the time series data into Orchest

You will need some boilerplate code to load the time series data from Clarify into Orchest. Create a notebook called load_timeseries_data.ipynb and verify that the credentials work:

-- CODE language-python --
import os
import datetime as dt
from pyclarify import Client as ClarifyClient, query
import orchest

credentials = os.environ["CLARIFY_CREDENTIALS"]
client = ClarifyClient(credentials)

Then, specify the Item ID you noted earlier, and add a variable that stores the minimum time of interest:

-- CODE language-python --
item_id = "..."
ts_start = "2021-08-10T00:00:00Z"

Next, write the query that will locate the desired time series from Clarify, using their Python client:

-- CODE language-python --
query_filter = query.Filter(fields={"id": query.In(value=[item_id])})
response = client.data_frame(
    filter=query_filter,
    gte=ts_start,  # minimum date
    lt=dt.datetime.today(),  # maximum date
)

And finally, extract the signal data and transform it to a pandas Series object:

-- CODE language-python --
signal = (
    response.result.data.to_pandas()
    [item_id]  # convert to Series
    .drop_duplicates()  # sanity check
    .asfreq("H")  # regularize to an hourly index (missing hours become NaN)
    .rename("traffic")  # use explicit name
)

You can now save the time series data as an output for the next step:

-- CODE language-python --
orchest.output(signal, name="timeseries")

As a small improvement, rather than hardcoding the Item ID inside the notebook, you can parametrize it using Pipeline parameters. It’s also a good opportunity to make the filters more generic so that you can reuse the same notebook to fetch only the last few hours of data. For that, go back to the Pipeline Editor and write the following contents under the “Parameters” textbox:

-- CODE language-json --
{
  "item_id": "cc3kdebfgirgtavrbjn0",
  "ts_last_hours": null
}

And replace the item_id and ts_start variable assignments inside the notebook with these two lines:

-- CODE language-python --
ts_last_hours = orchest.get_step_param("ts_last_hours")

ts_start = (
    (dt.datetime.today() - dt.timedelta(hours=ts_last_hours))
    if ts_last_hours
    else dt.datetime(2021, 8, 10, 0)
)
item_id = orchest.get_step_param("item_id")

Ready to use this data for training!

4. Train an anomaly detection model in Orchest

Now it’s time to add another step to the Train pipeline to train the model for real. We will use ADTK, a Python library that provides a scikit-learn compatible API on top of statsmodels.

First of all, let’s visualize the time series data. Create a new notebook train_ad_model.ipynb and run the following code:

-- CODE language-python --
from adtk.visualization import plot
from adtk.detector import SeasonalAD

import joblib

import orchest

signal = orchest.get_inputs()["timeseries"]

Since ipympl is listed in the requirements, you can enable interactive matplotlib plots, which will help you pan and zoom to explore the data more quickly.
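
For example, a minimal cell along these lines (assuming the signal variable from the previous step; the exact plotting call is just illustrative) turns on the interactive backend and plots the series:

-- CODE language-python --
# Enable the interactive matplotlib backend provided by ipympl
%matplotlib widget

# Quick look at the hourly traffic counts
signal.plot(figsize=(10, 4))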

⚠️ You might see Invalid response: 413 Request Entity Too Large if you add too many interactive plots. We are already aware of the problem. The solution is to clear the outputs or even temporarily disable interactive plots by running %matplotlib inline.

As you can see, there are some gaps in the time series, which will be problematic during model training. We have no choice but to interpolate:

-- CODE language-python --
signal_interp = signal.interpolate("time")

Next, create a SeasonalAD model with the default hyperparameters and call its fit_detect method to retrieve the anomalies:

-- CODE language-python --
seasonal_ad = SeasonalAD()
anomalies = seasonal_ad.fit_detect(signal_interp)

And finally, visualize the time series with the detected anomalies:

-- CODE language-python --
plot(
    signal_interp,
    anomaly=anomalies,
    anomaly_color="red",
    anomaly_tag="marker",
    figsize=(10, 4),
)

You can spot anomalies between Christmas and New Year’s Eve, which is expected for traffic data, as well as some of the data gaps that were interpolated. Not bad for a few lines of code!

Now is a good time to save the trained model. Since you will be reusing it in a different pipeline, we will use joblib to persist the model to disk explicitly, instead of the Orchest SDK data passing mechanism:

-- CODE language-python --
joblib.dump(seasonal_ad, "/data/clarify-timeseries-orchest/model.joblib")

Read on to apply this model to detect anomalies on schedule!

5. Run anomaly detection pipeline on new data

Create a new “Detect Anomalies” pipeline, and add the load_timeseries_data.ipynb notebook as a pipeline step. Yes: you will reuse the same notebook in two different pipelines! This time, however, change the parametrization so that only the last 24 hours are fetched:

-- CODE language-json --
{
  "item_id": "cc3kdebfgirgtavrbjn0",
  "ts_last_hours": 24
}

Next, add another notebook named detect_anomalies.ipynb, connect it to the previous step, and load both the trained model and the data:

-- CODE language-python --
from adtk.visualization import plot
import joblib
import orchest

model = joblib.load("/data/clarify-timeseries-orchest/model.joblib")

signal = orchest.get_inputs()["timeseries"]

Since the model is already trained, you can call its .detect() method to perform the anomaly detection. We will consider missing values to be anomalies too:

-- CODE language-python --
anomalies = model.detect(signal)
# Missing values count as anomalies too: NaN detections become False,
# and the originally missing timestamps are flagged as True
anomalies = anomalies.fillna(False) | anomalies.isnull()

And finally, send the results as outputs to the next steps. We will include a boolean flag that marks whether there are any anomalies or not, as well as the anomaly values themselves:

-- CODE language-python --
orchest.output(
    (anomalies.any(), signal.interpolate("time").loc[anomalies]),
    name="anomalies"
)

And here comes the interesting part: what to do with those anomalies?

6. Write back anomalies to Clarify

To see the anomalies in Clarify, you will need to create a new Item, which will be associated with the integration you added in the first step. You will then be able to combine both the data and the anomalies in the same timeline for easy visualization and annotation.

To proceed, choose an item name (for example traffic_anomalies), add a new notebook named write_anomalies_clarify.ipynb, and parametrize it as follows:

-- CODE language-json --
{
  "item_name": "traffic_anomalies"
}

Next, connect it to the previous step, and make it read the anomalies flag and the anomalies themselves:

-- CODE language-python --
import os
from pyclarify import Client as ClarifyClient, DataFrame as ClarifyDataFrame
import orchest

has_anomalies, anomalies = orchest.get_inputs()["anomalies"]
item_name = orchest.get_step_param("item_name")

Finally, you can wrap all the logic that writes the data back to Clarify in a conditional block so that it only executes if there are anomalies to write:

-- CODE language-python --
if has_anomalies:
    credentials = os.environ["CLARIFY_CREDENTIALS"]
    client = ClarifyClient(credentials)

    # Name the series after the configured item name so that the new signal
    # shows up in Clarify under that name
    data = ClarifyDataFrame.from_pandas(anomalies.rename(item_name).to_frame())
    response = client.insert(data)
    print(response.json())
else:
    print("No anomalies detected, skipping")

After the anomaly data is first written to your Clarify account, you will need to publish the new item to make it available. For that, navigate to your Clarify Admin panel, go to “Integrations”, and select the newly created item. You can now add some metadata to it and click the “Publish” button.

And finally, if you add both items to the same timeline, you will be able to visualize them both on Clarify. Neat!

7. Run the anomaly detection pipeline on schedule

Once you have the anomaly detection pipeline up and running, you are ready to run it on schedule. For that, you will make use of Orchest Jobs.

Navigate to “Jobs” in the interface, click on “+ Create job”, type “Detect anomalies (daily)” for the name, and choose the “Detect anomalies” pipeline. Next, pick “Recurring” under “Scheduling” and click on “Daily” (or type any other cron expression). Finally, click “Run job”.

From now onwards, the anomaly detection pipeline will run every day at midnight UTC and write any anomalies it detects to Clarify. Sweet!

Extra: Send a notification if anomalies are found

As an extra step, you can use Orchest to send a notification to any service of your liking when anomalies are found. For instance, you can use slack-sdk to send a Slack message to your channel, or post to a webhook that then triggers any other action.
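
For example, a minimal slack-sdk sketch could look like the following, where the SLACK_BOT_TOKEN environment variable and the channel name are illustrative placeholders rather than part of this project:

-- CODE language-python --
import os

from slack_sdk import WebClient

# Assumes a Slack bot token stored as an environment variable (placeholder name)
slack_client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
slack_client.chat_postMessage(
    channel="#anomaly-alerts",  # placeholder channel
    text="Anomalies detected in the traffic data!",
)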

As an example, you can create an IFTTT applet that sends a message to Slack when a certain URL is hit. This is some sample code:

-- CODE language-python --
import os

import requests
import orchest

has_anomalies, anomalies = orchest.get_inputs()["anomalies"]
event_name = "orchest_webhook_incoming"
# Requires the IFTTT webhooks key to be stored as an environment variable
webhooks_key = os.environ["IFTTT_WEBHOOKS_KEY"]

if has_anomalies:
    result = requests.post(
        f"https://maker.ifttt.com/trigger/{event_name}"
        f"/json/with/key/{webhooks_key}",
        json={
            "anomalies_found": len(anomalies)
        },
    )
    result.raise_for_status()
    print(result.text)

And this is what the Slack message could look like:

The possibilities are endless!

Orchest is a pipeline orchestrator with first-class support for scikit-learn, statsmodels, and other open source machine learning libraries. Import this example project, containing the Jupyter notebooks with the code from this article, into your Orchest Cloud.

Still no Orchest account? Get started for free!