Introduction

For many automated data processing tasks within the context of the Specialised Information Services (FID) at the University Library Frankfurt, we use the Python package luigi. This package proves especially useful when a task (e.g. loading data into a database) depends on other tasks that have to run successfully before it starts (e.g. first you need to download the data). luigi orchestrates all required tasks and their respective required task(s) and then processes everything for you. This approach makes the maintenance of tasks very easy: you only have to add or remove required tasks, and luigi handles the rest for you, so you don’t have to worry too much about the computer science behind it. But although luigi takes a lot of mental load off of you, it also requires strategies to handle common situations that you may find yourself in.

How can…

  1. data quality and security be assured by a luigi task?
  2. luigi remember completed tasks?
  3. existing code (Python and non-Python) be included in luigi tasks?
  4. secrets be handled in luigi?

In this post, we want to present some of our ideas that we apply to our luigi tasks and data processing in this context. These ideas should not necessarily be considered best practice, but may serve as a guideline for your own ideas.

The luigi developers also shared a whole lot of examples in the official repository. These examples cover far more ground than we can in a single post.

Data Validation

Data Validation is an important factor in processing data from external sources. It serves mainly two purposes:

  1. Make sure that each field in your data contains data in the format expected by later processes (e.g. ISO date formatting for dates).
  2. Reduce the attack surface for code injection.

There are several types of data validation, two of which are especially relevant to the kind of things we work on: Formal data validation and content centered data validation.

During formal validation, we make sure that the data at hand conforms (roughly) to what we expect of it in terms of its formal structure and layout. Checking a downloaded MODS XML file against a formalized specification of that format, e.g. an XML Schema file, would be an example of formal validation. This way, we make sure, for example, that only XML tags which are part of the specification are used and that these tags are nested in a permitted way. As a pre-processing step, formal validation contributes much to the success of the actual data processing steps and helps prevent the process from terminating before it is completed, which can be especially annoying in time-consuming tasks. As a post-processing step, formal validation helps to ascertain the quality of data exports and to save others from issues in processing the data, since the exported data obeys a specific standard or schema.
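A full XML Schema check usually relies on a third-party library such as lxml; as a minimal stdlib-only sketch of a formal pre-check, you can at least verify that a downloaded file is well-formed XML before feeding it to later tasks (the function name and example snippets are our own):

```python
import xml.etree.ElementTree as ET


def is_well_formed(xml_string: str) -> bool:
    """Returns True if the given string parses as well-formed XML."""
    try:
        ET.fromstring(xml_string)
        return True
    except ET.ParseError:
        return False


print(is_well_formed("<mods><title>Example</title></mods>"))  # True
print(is_well_formed("<mods><title>Example</mods>"))          # False
```

A task could run such a check right after the download and fail early, instead of crashing a long-running import much later.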

Content-centered validation, on the other hand, ensures that the content encoded in a specific format follows certain constraints. Is the content in the ISBN field a valid ISBN? Does the year field contain suspicious characters? Is the author called Bobby Tables? Content-centered validation helps, among other things, to prevent security issues, to enforce domain-specific constraints, and to improve the overall quality of the data, both for presentation on e.g. a website and in data exports.
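As a toy illustration of a content check like the ISBN question above, here is a sketch of an ISBN-10 checksum validation (our own helper, not part of any library; real-world code would also handle ISBN-13):

```python
def is_valid_isbn10(isbn: str) -> bool:
    """Checks the ISBN-10 checksum: the weighted digit sum must be divisible by 11."""
    digits = isbn.replace("-", "")
    if len(digits) != 10:
        return False
    total = 0
    for position, char in enumerate(digits):
        if char == "X" and position == 9:  # a trailing 'X' stands for the value 10
            value = 10
        elif char.isdigit():
            value = int(char)
        else:
            return False
        total += value * (10 - position)
    return total % 11 == 0


print(is_valid_isbn10("0-306-40615-2"))  # True
print(is_valid_isbn10("0-306-40615-3"))  # False
```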

On a technical level, both variants are implemented in luigi in much the same way. Therefore, we only give an example for content validation.

Example: Validation with Pydantic

pydantic is an example of a Python library which assists you with what we called ‘content-centered validation’ above. It allows you to define a data model in which you declare the (optional) attribute types of the data you expect.

from pydantic import BaseModel

class MyPointData(BaseModel):
    id: int
    longitude: float
    latitude: float

This will do the basic data validation for you, as specified by the type annotations. However, if your data is more complex (e.g. a value may only be valid within a specific range, as with longitude and latitude), you can add a validator decorator.

from pydantic import BaseModel, validator


class MyPointData(BaseModel):
    id: int
    longitude: float
    latitude: float

    @validator("latitude")
    def range_between_plus_and_minus_90_degree(cls, value: float) -> float:
        """Assures that the latitude is in the range of -90.0 and +90.0 ."""
        if -90.0 <= value <= 90.0:
            return value

        raise ValueError(
            f"The given latitude '{value}' is not in the range of -90 to +90."
        )

Here, we only show the validator for latitude, but you should get the idea.
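To see the model in action, here is a brief usage sketch (the coordinates are made-up example values; the snippet uses the pydantic v1-style validator shown above):

```python
from pydantic import BaseModel, ValidationError, validator


class MyPointData(BaseModel):
    id: int
    longitude: float
    latitude: float

    @validator("latitude")
    def range_between_plus_and_minus_90_degree(cls, value: float) -> float:
        if -90.0 <= value <= 90.0:
            return value
        raise ValueError(f"The given latitude '{value}' is not in the range of -90 to +90.")


point = MyPointData(id=1, longitude=8.66, latitude=50.11)  # passes validation

try:
    MyPointData(id=2, longitude=8.66, latitude=123.0)
except ValidationError as exc:
    print(exc)  # reports the offending field and our error message
```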

Task Completion Checks

luigi does not remember whether a given task was completed in a previous run. It only uses the complete() method of a task to check whether the specified task output files exist. If the output files do not exist, the task is not considered completed and will run as soon as possible. However, if a previous luigi run raised an exception and you need to restart the pipeline, you may not want to re-run all tasks that were already completed. Hence, you need a strategy to evaluate whether a task has run successfully, even across luigi runs.

The most obvious strategy is to simply let the task generate a file output that contains the generated data. If this file exists, that is proof that the task must have completed successfully. This may look like this:

import luigi


class DumpFile(luigi.Task):
    """Generates a data file."""

    def output(self):
        return luigi.LocalTarget("/tmp/my-file.txt")

    def run(self):
        with self.output().open("w") as f:
            f.write("This is a test!")

So, the file that was created in the run() method is also referenced by the output() method. When luigi checks whether the task has run, it will (indirectly) call the output() method and check whether the given file exists. If so, the task has already run and does not need to be re-run.

But what file should be written when a task loads data into a database? It may just write a “checkpoint” file. In other words, it creates an empty file with a unique name as soon as the task has run successfully.

import luigi


class LoadDataToDatabase(luigi.Task):
    """Generates a data file."""

    def output(self):
        return luigi.LocalTarget("/tmp/checkpoints/load-data-to-database.checkpoint")

    def run(self):
        # Load data into database

        # Create an empty checkpoint file
        self.output().open("w").close()

You may even consider writing additional data into that checkpoint file that can be read by the next task.
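As a sketch of that idea (the JSON layout and file location are our own choice), the loading task could record, for example, the number of loaded records, and a downstream task could read that back:

```python
import json
import tempfile
from pathlib import Path

checkpoint = Path(tempfile.gettempdir()) / "load-data-to-database.checkpoint"


def write_checkpoint(path: Path, records_loaded: int) -> None:
    """Stores metadata about the completed run in the checkpoint file."""
    path.write_text(json.dumps({"records_loaded": records_loaded}), encoding="utf-8")


def read_checkpoint(path: Path) -> dict:
    """Reads the metadata back, e.g. in a downstream task's run() method."""
    return json.loads(path.read_text(encoding="utf-8"))


write_checkpoint(checkpoint, 1234)
print(read_checkpoint(checkpoint))  # {'records_loaded': 1234}
```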

If you have Redis running in the background, there is an even more elegant way. luigi provides a luigi.contrib.RedisTarget, which can store a marker in Redis when a task is completed. Provided with an expiration time, you have full control over how long a specific task is considered completed. Similar features are implemented for e.g. PostgreSQL, MySQL, and more.
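If Redis is not available, a similar expiration behaviour can be approximated with plain checkpoint files. The following stdlib-only sketch (our own construction, not a luigi API) treats a checkpoint as stale after a given number of seconds; a task could call it from its complete() method to force periodic re-runs:

```python
import os
import time


def checkpoint_is_fresh(path: str, max_age_seconds: float) -> bool:
    """Returns True if the checkpoint file exists and is younger than max_age_seconds."""
    if not os.path.exists(path):
        return False
    age = time.time() - os.path.getmtime(path)
    return age < max_age_seconds
```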

Integration of existing code

Existing Python code

As luigi is, first of all, a Python library, the ’luigi-fication’ of existing Python code is relatively straightforward. That is, assuming your code is laid out in a way that facilitates re-use, which is conceptually quite similar to the structural prerequisites for good testability. If the functionality that you want to control via luigi is encapsulated in reusable, self-contained functions or classes that expose all required features via their signature, chances are high that you can reuse them in a luigi script right away.

As a brief example, here is the general structure that we use for scripts that should be usable both as stand-alone CLI tools as well as from luigi:

#!/usr/bin/env python
# demo1.py
import argparse
import logging


def get_args():
    parser = argparse.ArgumentParser("Demo Script")
    parser.add_argument("in_file", help="Input file")
    parser.add_argument("out_file", help="Output file")
    return parser.parse_args()


def do_stuff(in_file, out_file, logger=None):
    # Luigi entry point
    # do stuff
    if logger is not None:
        logger.info("I'm running.")
    return


def main():
    args = get_args()
    logging.basicConfig(level=logging.INFO)
    do_stuff(args.in_file, args.out_file, logging.getLogger(__name__))


if __name__ == "__main__":
    main()

The corresponding luigi task definition:

# demo1_pipeline.py
import logging
import luigi
from demo1 import do_stuff


class DemoTask(luigi.Task):
    in_file = luigi.Parameter()
    out_file = luigi.Parameter()

    def run(self):
        logger = logging.getLogger("luigi-interface")
        do_stuff(in_file=self.in_file, out_file=self.out_file, logger=logger)

Existing Non-Python code

In the case of non-Python code, you can make use of the subprocess module, which allows you to spawn new processes, even if they are not Python processes. We use it in some FIDs to execute XSLT, Perl, or PHP scripts. To transform an XML document with an XSLT stylesheet, for example, you could run the following function from a luigi task as described in the preceding subsection:

# demo running XSLT as subprocess in Python
import subprocess
from pathlib import Path

ENCODING = "utf-8"

def xslmap(in_file, out_file, stylesheet, saxon_path):
    cmdline = [
        "java", "-Xmx2048m", "-jar", saxon_path,
        f"-s:{in_file}",
        f"-xsl:{stylesheet}",
        f"-o:{out_file}",
    ]
    # Saxon writes the transformation result to out_file itself (-o:),
    # so we only capture the console output in a log file next to it.
    log_file = str(Path(out_file).with_suffix(".log"))

    with open(log_file, "w", encoding=ENCODING) as fh_log:
        subprocess.run(cmdline, stdout=fh_log, stderr=fh_log, check=True)

    return log_file

Secrets handling

It may be the case that a task needs to handle secrets like passwords or web tokens. It should be clear that you do not write them into your configuration file or (even worse) commit them to a version control system. Hence, we need to find a workaround in the luigi context, because luigi does not provide any functionality for this problem. The most professional way would be to have a service available that provides secrets-as-a-service (e.g. AWS Secrets Manager). However, services like this are not always available.

Within the FID context, we use .env files to separate secrets from code. By design, .env files stay in the directory where they are created. Hence, they are also listed in the .gitignore so that they are not accidentally committed to the repo. This is important because the .env files contain all your secrets in plain text in the format:

MY_DB_PASSWORD=my-S3cretPassw0rd
SITE_API_TOKEN=123456789abcdefgh

Using a local Python installation

For reading .env files in Python, the python-dotenv library handles all the loading for you:

import os

from dotenv import load_dotenv

load_dotenv()

db_password = os.getenv("MY_DB_PASSWORD")
print(db_password)
# Output: "my-S3cretPassw0rd"

python-dotenv loads the .env file from the current directory by default. Provided with another file path, you can load any other .env file you like. python-dotenv also provides a way to store all secrets in a dictionary instead of writing them into the environment, making this approach even more secure.

Using Docker Environment

If you are using Docker, you can use swarm mode to enable proper secret handling. However, if you run single containers (even under Docker Compose), you do not have this option. In the UB Labs team, we are currently in this situation and hence use .env files in this context as well.

Before moving on, you should know that, by the nature of this approach, all secrets available to a container can be read from anywhere inside the container. So, if this is a security concern for you, you may need to take another approach.

You can have multiple .env files in the same repository. They could be named .env.website or .env.db. Individual Docker containers can then load one or more of them when configured in the docker-compose.yml file.

services:
  luigi:
    env_file:
      - .env.website
      - .env.db
    build: 
      context: .
      target: production
      args:
        - DB_PASSWORD=$MY_DB_PASSWORD
        - SITE_API_TOKEN=$SITE_API_TOKEN
    image: luigi:latest

You can see that we can now access the variables from the .env files in the args of the Docker build process. Docker handles all the file access and reading for us.

You should also be aware that, if you use secrets in the build process, all args are stored in the history of the built image and can hence be accessed from the running container.
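If the secrets are only needed at runtime rather than during the build, a safer variant is to rely on env_file alone and skip the build args entirely, so nothing ends up in the image history. A sketch with the same service as above:

```yaml
services:
  luigi:
    env_file:
      - .env.db   # variables are injected into the running container only
    image: luigi:latest
```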

Wrap-Up

Although luigi solves many problems when creating a pipeline, it also has its limitations, which you need to consider and tackle with strategies appropriate for your use case. In an upcoming post, we will go into detail on one of our pipelines and how code can be structured when creating a luigi pipeline.