Introduction

At the UB JCS, we make extensive usage of the Python luigi framework for data engineering. The framework is capable of handling thousands of tasks, calculating non-circular task dependencies, and run over days. Additionally, it provides a convenient web control panel to see, e.g. the task dependencies in a tree diagram or start specific tasks.

Although luigi itself supports the user already by enforcing a very specific structure, there are still some things to consider when designing a data pipeline with luigi (for a general introduction, see in a previous post). In this post, I present ideas, that I learned while using luigi. Since luigi is a heavily object-oriented framework, some approaches in this post rely naturally on Software architecture patterns.

What would Mary Kondo do?

Although with luigi we are in the field of data science / data engineering, we still have to think like a programmer (or at least a little like Mary Kondo) when structuring our code. Hence, we want to have everything clean, separated, and modular.

After some years of working with luigi, I worked out a luigi project structure that looks like this:

1
2
3
4
5
6
7
8
9
src/
│  ├─ classes/        # All classes should live here
│  ├─ configuration/  # Luigi configurations
│  ├─ resources/      # Data or templates needed by scripts
│  ├─ scripts/        # Here live the scripts that do the heavy lifting
│  ├─ tasks/          # Here all tasks are defined.
│  ├─ luigi.ini       # The luigi config file
tests/                # Well... tests...
pyproject.toml

Of course, this is only the top-level structure. You are free (and encouraged) to further nest the given directories. And even if the project is still growing, with luigi and the given structure, it should be easy in the development process to push around files with only minor changes of the code.

The idea is that a task can call functions that are provided in scripts. However, only tasks import classes (e.g. factories, DAOs, serializers, abstract classes), initialize them and provide the initialised objects to a script (Fig. 1).

This structure allows to keep the details within the classes and use a common interface class to call different implementations of a common process. For example, each class could be a different data provider and the interface method collect_data() will gather the data from each data provider. But whether the data comes from a file or is crawled from the web, is left to the data provider class – the calling script is not aware of this. The interface only guarantees that the result will obey a specific format (e.g. a list of messenger objects). So you can add or remove new classes easily.

Three large fields entitled 'classes', 'scripts', and 'tasks' are shown. 'classes' holds a rounded box with dotted borders named 'Document Database Interface' with a label 'def write(value)'. Towards 'Document Database Interface' points an arrow from a box with solid borders saying 'SolrDatabase'. A second box with dotted borders in 'classes' is named 'DataproviderInterface' with a label 'def gather_data()'. An arrow points towards this box originating from a box with solid borders and named 'Provider #1'. The 'tasks' field holds a box saying 'class CollectDataFromAllProviders(luigi.Task)'. From this box two arrows split pointing at 'SolrDatabase' and 'Provider #1', respectively. The arrows are labeled 'import & initialization'. Another arrow labeled 'provide objects as parameters' points towards a box in the 'scripts' field. The box shows a small program snippet in Python that takes a list of providers and a database object. The program then iterates the provider list, calling the `gather_data()` method of the provider and hands the resulting data to the `write(data)` method of the database object.

Fig. 1: Separation of concerns in a fictional luigi task.

My recommendation follows the src project structure1, since it provides a nice separation of concerns in regard of code (everything in the src folder) and project management (pyproject.toml, tests, etc.). This structure has the “downside” (depending on perspective) that the code has to be installed before usage, which would not be necessary in a flat hierarchy. On the other side, the src structure assures that the tests only run on the code that is installed and you do not commit a broken package.

What are a task’s responsibilities?

The main interface that luigi provides are its tasks. A task is one step that does a specific job (like downloading a data file, iterating all data in a database, etc.). The three main responsibilities of a task are very nicely depicted by its three main methods: requires(), run(), and output(). So, a typical task may look like this:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import pathlib

import luigi


class MyImportantTask(luigi.task):
    """Performes a very important process.

    The current implementation involves X.
    """

    def requires(self):
        return AnotherImportantTask()

    def output(self):
        output_file_path = pathlib.Path("/luigi/checkpoints") / "MyImportantTask.checkpoint"
        return luigi.LocalTarget(output_file_path)

    def run(self):
        # Processing task

Here, I will cover only my thoughts on requires() and run().

Manage resources

A task’s first responsibility is to manage all resources necessary for the process to run. This does mean that it first makes sure that all depending tasks in requires() have run. This also means that it gathers data, e.g. provided by the configuration. This does not mean that it aggregates the resources by itself!

The task knows where to look for the resources and provides them to the processes in the run() method. Making sure that a data file is at some place on the disk or that a database is filled, is the responsibility of the depending tasks (in the requires() method). I recommend to be very granular when creating required tasks, because:

  1. If a task fails, a rerun will again process everything in run(), while a once completed tasks stays completed
  2. Other tasks may also depend on the same resource(s) (i.e. tasks)
  3. It scales much better

Delegate the job

A task may lack both methods requires() and output(), but having a task without run() is rare (but can be useful). So, what should run() actually do? It is not running the job! The responsibility of the run() method is to delegate the job to the script that does the actual job. The script does its work and, when it is done, it returns back to the task. Whether the script returns a value or not, depends on the situation. Hence, the run() method orchestrates the job, it does not perform the actual work itself.

Additionally, the run() method also has to use the aggregated resources of the task to provide the script with the necessary input arguments. This approach implies that run() provides simple parameters like paths to data files or a dict to process. More importantly, the run() method should also provide more complex structures like database objects to the script. The creation of the database connection should never be in a script, but the script should instead rely on dependency injection (DI). DI grants the code more flexibility and reduces the cost of change.

DI enforces a design in which a script does not rely on a specific class but on an abstract interface. If you, at some point in the development process, decide to change the database for a specific task, you can simply change it in the task, while the script does not care about the change – it still works. Of course, this requires a level of abstraction in your class design, but decouples the code strongly.

Use a central configuration

In many instances of the luigi documentation, you will see that tasks define their own configuration parameters like so (from the luigi documentation):

1
2
3
class DailyReport(luigi.contrib.hadoop.JobTask):
    date = luigi.DateParameter(default=datetime.date.today())
    # ...

This may be useful in some cases, but applied in general, it becomes a mess very fast. Hence, I recommend to have configuration files in a configurations folder. A configuration file may contain one or multiple luigi configuration classes that handle all the data input from the configuration file.

Example configuration in a file src/configurations/general.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import pathlib

import luigi


class GeneralConfiguration(luigi.Config):
    """General settings that may be useful in many tasks."""

    date = luigi.DateParameter(default=datetime.date.today())
    checkpoint_path = luigi.Parameter(default="/luigi/checkpoints/")
    base_data_path = luigi.Parameter(default="/data/")
    mail_recipients = luigi.ListParameter(default=["me@company.com"])

    @property
    def resources_path() -> pathlib.Path:
        """The path of the base resource directory."""
        return pathlib.Path(self.base_data_path) / "resources"


class DocumentDatabaseConfiguration(luigi.Config):
    """Configuration data specific for the document database."""

    url = luigi.Parameter(default="http://document-db/api")
    username = luigi.Parameter(default="dummy")
    password = luigi.Parameter()


class GraphDatabaseConfiguration(luigi.Config):
    # and so on...

Using the property decorator in a configuration class allows you to orchestrate resources both in a central position and in a dynamic fashion.

In the example above, I put all configuration classes into a single configuration file. However, I recommend to separate them into multiple configuration files with meaningful names. In this case, it would make sense to put the two classes DocumentDatabaseConfiguration and GraphDatabaseConfiguration into a file src/configurations/databases.py.

With this approach, all your configuration lives in one place, and tasks can use it very conveniently:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import luigi

from configurations.general import GeneralConfiguration


class MyTask(luigi.Task):
    """Runs another important task."""

    general_settings = GeneralConfiguration()

    def output(self):
        checkpoint_directory = self.general_settings.checkpoint_path
        # generate the output

    def run(self):
        resource_directory = self.general_settings.resources_path
        # run the process

Let luigi send you (lovely) messages

You can configure luigi to notify you via mail, if a run fails. This allows you to react immediately and fix the problem. This is the most obvious way to use luigi’s notifications.

Additionally, luigi allows you to send messages e. g. on successful runs. This makes especially sense, if luigi is triggered by a cron job. After luigi completed a specific task, it will notify you via mail – giving you the warm feeling that you are a mastermind of automation.

You can trigger the notification dispatch by using the on_success() method of the Task class:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import luigi

from configurations.general import GeneralConfiguration
from tasks.commons import send_success_mail


class MyNotificatingTask(luigi.Task):
    """After it completes, it sends a notification."""

    general_settings = GeneralConfiguration()

    def on_success(self):
        luigi.notifications.send_email_smtp(
            sender="bot@company.com",
            subject="[SUCCESS] The job ran like a charm!",
            message="Description what was done...",
            recipients=self.general_settings.mail_recipients,
            image_png=None
        )

    def run():
        # some processing

You want to use on_success() (or on_failure(exception) to handle failing tasks) for sending notifications. There is also a method named complete(), but that is called by the luigi daemon to check before running the task, if the task needs to be run or not.

Be aware that you are not restricted to get mails. When importing e. g. the slack_sdk package, you can setup notifications via Slack.

Although in this example luigi’s send_email_smtp is called directly, I recommend to define a function in a file like src/tasks/commons.py to handle the reusable parts.

Aggregate tasks into groups

If you have multiply independent task working towards a common goal, it may come in handy to gather them in a luigi.WrapperTask. Let’s say, you have a machine learning task that depends on multiple other tasks that gather the data. You can then aggregate these required tasks in a common task:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import luigi

class MachineLearningDataAggregration(luigi.WrapperTask):
    """Handles the aggregation of all necessary data for my machine learning model."""

    def requires(self):
        yield DownloadAllDogImages()
        yield DownloadAllCatImages()
        yield NormalizeImageSizesOfAllImages()
        yield GenerateImageLabels()

This is a valid task that can be referenced by another task as requirement. The luigi.WrapperTask will successfully complete, after all its depending tasks are completed.

Have some tests to check for circular imports

When writing luigi tasks and distributing them in multiple scripts, it can happen quite fast that luigi raises exceptions, because of circular imports. To check for this circumstance in your development environment and not in production, you should have at least a test for circular imports (but you should write tests for all your scripts and classes anyway).

This test only needs to import your main module that start your pipeline. For example, let’s assume, we have the luigi starting task in a file named run_luigi.py. A test for circular imports would look like this:

1
2
3
4
5
import importlib

class TestImports:
    def test_prevent_circular_imports(self):
        importlib.import_module('run_luigi')

When running the test, all imports will be done recursively and any circular import will cause an exception. The test should run in less than an second and saves you a lot of frustration.

Conclusion

I hope, I could give you some new ideas of how to work with luigi and how to apply its myriads of possibilities. If you have questions or comments, do not hesitate to contact us via ublabs@ub.uni-frankfurt.de .


  1. For more details, please see https://packaging.python.org/en/latest/discussions/src-layout-vs-flat-layout/, but also be referred to the Python Sample Project. And if you really want to go into the rabbit hole, see the Stack Overflow discussion↩︎