Luigi Patterns
--------------

Code Reuse
~~~~~~~~~~

One nice thing about Luigi is that it's super easy to depend on tasks defined in other repos.
It's also trivial to have "forks" in the execution path,
where the output of one task may become the input of many other tasks.

Currently, no semantics for "intermediate" output are supported,
meaning that all output will be persisted indefinitely.
The upside of that is that if you try to run X -> Y, and Y crashes,
you can resume with the previously built X.
The downside is that you will have a lot of intermediate results on your file system.
A useful pattern is to put these files in a special directory and
have some kind of periodic garbage collection clean it up.
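
A minimal sketch of such a garbage collection job in plain Python. It assumes
that each top-level entry (file or directory) in the special directory is one
task output, and that anything not modified within the chosen retention window
can safely be deleted; the ``collect_garbage`` name is just for illustration.
Deleted outputs simply get rebuilt the next time a task that needs them runs.

.. code-block:: python

    import os
    import shutil
    import time


    def collect_garbage(root, max_age_seconds, now=None):
        """Remove top-level entries of ``root`` older than ``max_age_seconds``.

        Each entry is removed as a whole, so a directory output is never left
        half-deleted (an empty directory could still make its target look
        complete). Returns the removed paths, sorted.
        """
        now = time.time() if now is None else now
        with os.scandir(root) as it:
            entries = list(it)  # snapshot the listing before deleting anything
        removed = []
        for entry in entries:
            # Check the age and type of the entry itself, not of a symlink target.
            mtime = entry.stat(follow_symlinks=False).st_mtime
            if now - mtime <= max_age_seconds:
                continue
            if entry.is_dir(follow_symlinks=False):
                shutil.rmtree(entry.path)
            else:
                os.remove(entry.path)
            removed.append(entry.path)
        return sorted(removed)

Called from cron as, say, ``collect_garbage("/data/luigi-intermediate", 7 * 24 * 3600)``
(a hypothetical path and a seven-day window), it keeps the directory from growing forever.
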
Triggering Many Tasks
~~~~~~~~~~~~~~~~~~~~~
A convenient pattern is to have a dummy Task at the end of several
dependency chains, so you can trigger a multitude of pipelines by
specifying just one task in command line, similarly to how e.g. `make `_
works.

.. code:: python

    import datetime

    import luigi

    class AllReports(luigi.WrapperTask):
        date = luigi.DateParameter(default=datetime.date.today())

        def requires(self):
            # SomeReport, SomeOtherReport, etc. are regular tasks defined elsewhere
            yield SomeReport(self.date)
            yield SomeOtherReport(self.date)
            yield CropReport(self.date)
            yield TPSReport(self.date)
            yield FooBarBazReport(self.date)

This simple task will not do anything itself, but will invoke a bunch of
other tasks. On each invocation, Luigi will perform as many of the pending
jobs as possible (those which have all their dependencies present).

You'll need to use :class:`~luigi.task.WrapperTask` for this instead of the usual Task class, because this job will not produce any output of its own, and as such needs a way to indicate when it's complete. This class is used for tasks that only wrap other tasks and that by definition are done if all their requirements exist.
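
To see that completion rule in action, here is a small self-contained sketch
(it assumes luigi is installed). ``DailyReport`` is a hypothetical stand-in for
the real report tasks and writes into a throwaway temporary directory; once both
reports exist, the wrapper counts as complete even though it has no output of its own.

.. code-block:: python

    import datetime
    import os
    import tempfile

    import luigi

    OUTPUT_DIR = tempfile.mkdtemp()  # throwaway output location for the demo


    class DailyReport(luigi.Task):
        """Hypothetical stand-in for SomeReport, CropReport, etc."""
        report = luigi.Parameter()
        date = luigi.DateParameter()

        def output(self):
            name = f"{self.report}-{self.date:%Y-%m-%d}.txt"
            return luigi.LocalTarget(os.path.join(OUTPUT_DIR, name))

        def run(self):
            with self.output().open("w") as f:
                f.write(f"{self.report} for {self.date}\n")


    class AllDemoReports(luigi.WrapperTask):
        date = luigi.DateParameter()

        def requires(self):
            yield DailyReport(report="crop", date=self.date)
            yield DailyReport(report="tps", date=self.date)


    if __name__ == "__main__":
        luigi.build([AllDemoReports(date=datetime.date(2015, 1, 1))], local_scheduler=True)
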

Triggering recurring tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~

A common requirement is to have a daily report (or something else)
produced every night. Sometimes, though, for various reasons tasks will keep
crashing or lacking their required dependencies for more than a day,
which would lead to a missing deliverable for some date. Oops.

To ensure that the above AllReports task is eventually completed for
every day (value of the date parameter), one could, for example, add a loop in
the requires method that yields dependencies on the past few days preceding
self.date. Then, as long as Luigi keeps being invoked, the backlog of
jobs would catch up nicely after fixing intermittent problems.
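
A minimal sketch of that loop, assuming luigi is installed. ``DailyReport`` is a
hypothetical per-day task and the ``backlog_days`` parameter (default seven) is an
arbitrary choice; the wrapper requires the report for ``self.date`` and for each
preceding day in the window, so any day still missing is retried on the next run.

.. code-block:: python

    import datetime

    import luigi


    class DailyReport(luigi.Task):
        """Hypothetical per-day report; a real one defines output() and run()."""
        date = luigi.DateParameter()


    class AllReportsWithBacklog(luigi.WrapperTask):
        date = luigi.DateParameter(default=datetime.date.today())
        backlog_days = luigi.IntParameter(default=7)  # hypothetical window size

        def requires(self):
            # The report for self.date plus one dependency per preceding day.
            for days_ago in range(self.backlog_days + 1):
                yield DailyReport(date=self.date - datetime.timedelta(days=days_ago))

Every invocation re-checks each day in the window, which is fine for a handful
of days but is exactly the naive looping the range tools below improve on.
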

Luigi actually comes with a reusable tool for achieving this, called
:class:`~luigi.tools.range.RangeDailyBase` (resp. :class:`~luigi.tools.range.RangeHourlyBase`). Simply putting

.. code-block:: console

    luigi --module all_reports RangeDailyBase --of AllReports --start 2015-01-01

in your crontab will keep gaps from occurring from 2015-01-01
onwards. Note, though, that it will not always loop over everything from 2015-01-01
until the current time; by default it only reaches back a maximum of about three months.
See the :class:`~luigi.tools.range.RangeDailyBase` documentation for this and more knobs
for tweaking behavior. See also Monitoring below.
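
The same range can be set up from Python instead of the command line. A sketch,
assuming luigi is installed: ``AllReports`` below is only a placeholder for the
wrapper task above, and ``days_back`` is the ``RangeDailyBase`` knob that controls
how many days back from the current time contiguousness is ensured (its default
of 100 days is where the "about three months" comes from).

.. code-block:: python

    import datetime

    import luigi
    from luigi.tools.range import RangeDailyBase


    class AllReports(luigi.WrapperTask):
        """Placeholder for the AllReports wrapper defined earlier."""
        date = luigi.DateParameter()


    # Keep 2015-01-01 onwards gap-free, but only check the last 30 days
    # instead of the default 100.
    range_task = RangeDailyBase(
        of=AllReports,
        start=datetime.date(2015, 1, 1),
        days_back=30,
    )

    # To schedule it: luigi.build([range_task], local_scheduler=True)
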

Efficiently triggering recurring tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

RangeDailyBase, described above, is named like that because a more
efficient subclass exists, :class:`~luigi.tools.range.RangeDaily` (resp. :class:`~luigi.tools.range.RangeHourly`), tailored for
hundreds of task classes scheduled concurrently with contiguousness
requirements spanning years (which would incur redundant completeness
checks and scheduler overload using the naive looping approach). Usage:

.. code-block:: console

    luigi --module all_reports RangeDaily --of AllReports --start 2015-01-01

It has the same knobs as RangeDailyBase, with some added requirements.
Namely, the task must either implement an efficient ``bulk_complete`` method, or
write its output to a file system Target with the date parameter value
consistently represented in the file path.
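
A sketch of the first option, assuming luigi is installed. The hypothetical
``DailyExport`` task keeps one file per date in a single directory, so its
``bulk_complete`` classmethod can answer for many dates with one directory listing
instead of one ``exists()`` call per date. It assumes the range tool passes in the
candidate ``date`` values and expects back those that are already complete; the
``EXPORT_DIR`` path is a placeholder too.

.. code-block:: python

    import datetime
    import os

    import luigi

    EXPORT_DIR = "/data/exports"  # hypothetical output directory


    class DailyExport(luigi.Task):
        date = luigi.DateParameter()

        def output(self):
            return luigi.LocalTarget(os.path.join(EXPORT_DIR, f"{self.date:%Y-%m-%d}.tsv"))

        @classmethod
        def bulk_complete(cls, parameter_tuples):
            # A single listing answers the question for every candidate date.
            try:
                existing = set(os.listdir(EXPORT_DIR))
            except FileNotFoundError:
                return []
            return [d for d in parameter_tuples if f"{d:%Y-%m-%d}.tsv" in existing]

        def run(self):
            with self.output().open("w") as f:
                f.write("placeholder export\n")  # hypothetical export logic
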

Backfilling tasks
~~~~~~~~~~~~~~~~~

Another common use case is that you have tweaked the code of an existing
recurring task and want to schedule its recomputation over an interval
of dates, for that or any other reason. This is most conveniently achieved
with the range tools described above, just with both the start (inclusive)
and stop (exclusive) parameters specified:

.. code-block:: console

    luigi --module all_reports RangeDaily --of AllReportsV2 --start 2014-10-31 --stop 2014-12-25
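
Because ``start`` is inclusive and ``stop`` is exclusive, the command above covers
2014-10-31 through 2014-12-24. A quick plain-Python check of which dates such a
half-open interval spans (this only mimics the interval arithmetic; it is not the
range tool's own code):

.. code-block:: python

    import datetime


    def backfill_dates(start, stop):
        """Return the dates in the half-open interval [start, stop)."""
        return [start + datetime.timedelta(days=i) for i in range((stop - start).days)]


    dates = backfill_dates(datetime.date(2014, 10, 31), datetime.date(2014, 12, 25))
    print(dates[0], dates[-1], len(dates))  # 2014-10-31 2014-12-24 55
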

Propagating parameters with Range
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Some tasks you want to recur may include additional parameters which need to be configured.
The Range classes provide a parameter, ``of_params``, which accepts a :class:`~luigi.parameter.DictParameter`
and passes any parameters onwards for this purpose.

.. code-block:: console

    luigi RangeDaily --of MyTask --start 2014-10-31 --of-params '{"my_string_param": "123", "my_int_param": 123}'

Alternatively, you can specify parameters at the task family level (as described :ref:`here `);
however, these will not appear in the task name for the upstream Range task, which
can have implications for how the scheduler and visualizer handle task instances.

.. code-block:: console

    luigi RangeDaily --of MyTask --start 2014-10-31 --MyTask-my-param 123
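
When the range task is created from Python, the ``--of-params`` option becomes the
``of_params`` argument. A sketch assuming luigi is installed, with ``MyTask`` and
its two extra parameters as hypothetical placeholders:

.. code-block:: python

    import datetime

    import luigi
    from luigi.tools.range import RangeDaily


    class MyTask(luigi.Task):
        """Hypothetical daily task with two extra parameters."""
        date = luigi.DateParameter()
        my_string_param = luigi.Parameter()
        my_int_param = luigi.IntParameter()


    range_task = RangeDaily(
        of=MyTask,
        start=datetime.date(2014, 10, 31),
        # Passed on to every MyTask instance the range creates.
        of_params={"my_string_param": "123", "my_int_param": 123},
    )

    # To schedule it: luigi.build([range_task], local_scheduler=True)
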

.. _batch_method:

Batching multiple parameter values into a single run
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Sometimes it'll be faster to run multiple jobs together as a single
batch rather than running them each individually. When this is the case,
you can mark some parameters with a ``batch_method`` in their constructor
to tell the worker how to combine multiple values. One common way to do
this is by simply running only the task with the maximum value. This is good
for tasks that overwrite older data when a newer one runs. You accomplish this by
setting the ``batch_method`` to ``max``, like so:

.. code-block:: python

    import luigi

    class A(luigi.Task):
        date = luigi.DateParameter(batch_method=max)

What's exciting about this is that if you send multiple As to the
scheduler, it can combine them and return one. So if
``A(date=2016-07-28)``, ``A(date=2016-07-29)`` and
``A(date=2016-07-30)`` are all ready to run, you will start running
``A(date=2016-07-30)``. While this is running, the scheduler will show
``A(date=2016-07-28)``, ``A(date=2016-07-29)`` as batch running while
``A(date=2016-07-30)`` is running. When ``A(date=2016-07-30)`` is done
running and becomes FAILED or DONE, the other two tasks will be updated
to the same status.

If you want to limit how big a batch can get, simply set ``max_batch_size``.
So if you have

.. code-block:: python

    import luigi

    class A(luigi.Task):
        date = luigi.DateParameter(batch_method=max)
        max_batch_size = 10

then the scheduler will batch at most 10 jobs together. You probably do
not want to do this with the max batch method, but it can be helpful if
you use other methods. You can use any function that takes an iterable of
parameter values and returns a single parameter value.
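
For example, a comma-joining batch method suits a task that can load several
partitions in one run. A sketch assuming luigi is installed; ``LoadPartitions``,
its ``partition`` parameter and the ``load`` helper are hypothetical. The batch
method receives the individual values (here strings) and returns one combined
value, which ``run`` then splits again.

.. code-block:: python

    import luigi


    def join_partitions(values):
        """Batch method: combine individual partition names into one value."""
        return ",".join(sorted(values))


    def load(name):
        """Hypothetical per-partition loading step."""
        print("loading", name)


    class LoadPartitions(luigi.Task):
        partition = luigi.Parameter(batch_method=join_partitions)
        max_batch_size = 10  # never combine more than 10 partitions into one run

        def run(self):
            # When batched, self.partition holds something like "a,b,c".
            for name in self.partition.split(","):
                load(name)
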

If you have two max batch parameters, you'll get the max values for both
of them. Parameters that don't have a batch method are not combined: only
tasks with identical values for them are batched together. So if you have a class like

.. code-block:: python

    import luigi

    class A(luigi.Task):
        p1 = luigi.IntParameter(batch_method=max)
        p2 = luigi.IntParameter(batch_method=max)
        p3 = luigi.IntParameter()

and you create tasks ``A(p1=1, p2=2, p3=0)``, ``A(p1=2, p2=3, p3=0)``,
``A(p1=3, p2=4, p3=1)``, you'll get them batched as
``A(p1=2, p2=3, p3=0)`` and ``A(p1=3, p2=4, p3=1)``.
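
The grouping rule can be illustrated with a small plain-Python simulation. It only
mimics the behavior described above (tasks are grouped by their non-batched
parameters, then each batched parameter is combined with its batch method) and
ignores ``max_batch_size``; it is not the scheduler's actual code.

.. code-block:: python

    from collections import defaultdict


    def simulate_batches(tasks, batch_methods):
        """Group task parameter dicts the way the batching rules describe.

        ``batch_methods`` maps each batched parameter name to its batch method;
        every other parameter must match exactly for two tasks to share a batch.
        """
        groups = defaultdict(list)
        for params in tasks:
            key = tuple(sorted((k, v) for k, v in params.items() if k not in batch_methods))
            groups[key].append(params)

        batches = []
        for key, members in groups.items():
            combined = dict(key)
            for name, method in batch_methods.items():
                combined[name] = method([m[name] for m in members])
            batches.append(combined)
        return batches


    tasks = [
        {"p1": 1, "p2": 2, "p3": 0},
        {"p1": 2, "p2": 3, "p3": 0},
        {"p1": 3, "p2": 4, "p3": 1},
    ]
    print(simulate_batches(tasks, {"p1": max, "p2": max}))
    # [{'p3': 0, 'p1': 2, 'p2': 3}, {'p3': 1, 'p1': 3, 'p2': 4}]
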

Note that batched tasks do not take up :ref:`resources-config`; only the
task that ends up running will use resources. The scheduler only checks
that there are sufficient resources for each task individually before
batching them all together.

Tasks that regularly overwrite the same data source
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you are overwriting the same data source with every run, you'll
need to ensure that two batches can't run at the same time. You can do
this pretty easily by setting ``batch_method`` to ``max`` and setting a unique
resource:

.. code-block:: python

    import luigi

    class A(luigi.Task):
        date = luigi.DateParameter(batch_method=max)

        resources = {'overwrite_resource': 1}

Now if you have multiple tasks such as ``A(date=2016-06-01)``,
``A(date=2016-06-02)``, ``A(date=2016-06-03)``, the scheduler will just
tell you to run the highest available one and mark the lower ones as
batch_running. Using a unique resource will prevent multiple tasks from
writing to the same location at the same time if a new one becomes
available while others are running.

Avoiding concurrent writes to a single file
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Updating a single file from several tasks is almost always a bad idea, and you
need to be very confident that no other good solution exists before doing this.
If, however, you have no other option, then you will probably at least need to ensure that
no two tasks try to write to the file *simultaneously*.

By turning ``resources`` into a Python property, it can return a value dependent on
the task parameters or other dynamic attributes:

.. code-block:: python

    import luigi

    class A(luigi.Task):
        ...

        @property
        def resources(self):
            return {self.important_file_name: 1}

Since, by default, resources have a usage limit of 1, no two instances of Task A
will now run if they have the same ``important_file_name`` property.
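
A fuller sketch of the same idea, assuming luigi is installed; ``AppendToSharedFile``
and its ``important_file_name`` and ``line`` parameters are hypothetical. Instances
targeting different files request different resources and can run in parallel,
while two instances targeting the same file request the same resource.

.. code-block:: python

    import luigi


    class AppendToSharedFile(luigi.Task):
        important_file_name = luigi.Parameter()
        line = luigi.Parameter()

        @property
        def resources(self):
            # One resource per file name; with the default limit of 1, only one
            # task per file can hold it at a time.
            return {self.important_file_name: 1}

        def run(self):
            with open(self.important_file_name, "a") as f:
                f.write(self.line + "\n")
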

Decreasing resources of running tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

At scheduling time, the luigi scheduler needs to be aware of the maximum
resource consumption a task might have once it runs. For some tasks, however,
it can be beneficial to decrease the amount of consumed resources between two
steps within their run method (e.g. after some heavy computation). In this
case, a different task waiting for that particular resource can already be
scheduled.

.. code-block:: python

    import luigi

    class A(luigi.Task):

        # set maximum resources a priori
        resources = {"some_resource": 3}

        def run(self):
            # do something
            ...

            # decrease consumption of "some_resource" by one
            self.decrease_running_resources({"some_resource": 1})

            # continue with reduced resources
            ...

Monitoring task pipelines
~~~~~~~~~~~~~~~~~~~~~~~~~

Luigi comes with some existing ways in :py:mod:`luigi.notifications` to receive
notifications whenever tasks crash. Email is the most common way.

The range tools for recurring tasks mentioned above not only implement
reliable scheduling for you, but also emit events which you can use to
set up delay monitoring. That way you can implement alerts for when
jobs are stuck for prolonged periods lacking input data or otherwise
requiring attention.
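
Besides :py:mod:`luigi.notifications`, you can hook your own alerting into task
failures through luigi's event handlers. A minimal sketch, assuming luigi is
installed: the handler is registered for ``luigi.Event.FAILURE`` on all tasks and
merely records the failure, standing in for a hypothetical paging or chat
integration. ``AlwaysFails`` exists only to trigger the hook.

.. code-block:: python

    import luigi

    failures = []  # (task family, error message) pairs seen by the handler


    @luigi.Task.event_handler(luigi.Event.FAILURE)
    def send_alert(task, exception):
        # Hypothetical alerting hook; replace the append with a real notification.
        failures.append((task.task_family, str(exception)))


    class AlwaysFails(luigi.Task):
        def run(self):
            raise RuntimeError("input data missing")


    if __name__ == "__main__":
        luigi.build([AlwaysFails()], local_scheduler=True)
        print(failures)
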

.. _AtomicWrites:

Atomic Writes Problem
~~~~~~~~~~~~~~~~~~~~~

A very common mistake made by luigi plumbers is to write data partially to the
final destination, that is, not atomically. The problem arises because
completion checks in luigi are exactly as naive as running
:meth:`luigi.target.Target.exists`. And in many cases it just means checking if
a folder exists on disk. During the time we have partially written data, a task
depending on that output would think its input is complete. This can have
devastating effects, as in `the thanksgiving bug
`__.

The concept can be illustrated by imagining that we deal with data stored on
local disk and by running commands:

.. code-block:: console

    # This is the BAD way
    $ mkdir /outputs/final_output
    $ big-slow-calculation > /outputs/final_output/foo.data

As stated earlier, the problem is that only partial data exists for a duration,
yet we consider the data to be :meth:`~luigi.task.Task.complete` because the
output folder already exists. Here is a robust version of this:

.. code-block:: console

    # This is the good way
    $ mkdir /outputs/final_output-tmp-123456
    $ big-slow-calculation > /outputs/final_output-tmp-123456/foo.data
    $ mv --no-target-directory --no-clobber /outputs/final_output{-tmp-123456,}
    $ [[ -d /outputs/final_output-tmp-123456 ]] && rm -r /outputs/final_output-tmp-123456

Indeed, the good way is not as trivial. It involves coming up with a unique
directory name and a pretty complex ``mv`` line. The reason ``mv`` needs all
those flags is that we don't want ``mv`` to move a directory into a potentially
existing directory. A directory could already exist in exceptional cases, for
example when central locking fails and the same task somehow runs twice at
the same time. Lastly, in the exceptional case where the directory was never moved,
one might want to remove the temporary directory that never got used.
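
The same temporary-name-then-move idea in plain Python, for a single output file
instead of a directory. This is a sketch under two assumptions: a POSIX-like local
file system that supports hard links, and a temporary file created in the
destination's own directory so the final step never crosses file systems.
``os.link`` refuses to overwrite an existing path, which plays the role of
``mv --no-clobber``, and the ``finally`` clause removes the temporary name whatever
happens. ``write_atomically`` is a hypothetical helper name.

.. code-block:: python

    import os
    import tempfile


    def write_atomically(final_path, data):
        """Write ``data`` (bytes) so that readers never see a partial file."""
        directory = os.path.dirname(os.path.abspath(final_path))
        # Unique temporary name next to the destination (same file system).
        fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
        try:
            with os.fdopen(fd, "wb") as tmp:
                tmp.write(data)
                tmp.flush()
                os.fsync(tmp.fileno())  # make sure the bytes hit the disk first
            # Publish the complete file; raises FileExistsError instead of clobbering.
            os.link(tmp_path, final_path)
        finally:
            os.remove(tmp_path)  # drop the temporary name in every case
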

Note that this was an example where the storage was on local disk. For
each kind of storage (hard disk file, hdfs file, database table, etc.) this procedure
will look different. But does every luigi user need to implement that complexity?
Nope; thankfully, the luigi developers are aware of this, and luigi comes with many
built-in solutions. If you're dealing with a file system
(:class:`~luigi.target.FileSystemTarget`), you should consider using
:meth:`~luigi.target.FileSystemTarget.temporary_path`. For other targets, you
should ensure that the way you're writing your final output directory is
atomic.
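
A sketch of ``temporary_path`` in use, assuming luigi is installed. The hypothetical
``BigSlowCalculation`` task (with an illustrative ``out_dir`` parameter) writes into
the temporary path it is handed; an external program such as the
``big-slow-calculation`` command above could be given that path instead. The result
is moved to the final location only if the ``with`` block exits without an exception.

.. code-block:: python

    import os

    import luigi


    class BigSlowCalculation(luigi.Task):
        out_dir = luigi.Parameter()  # hypothetical parameter, e.g. "/outputs"

        def output(self):
            return luigi.LocalTarget(os.path.join(self.out_dir, "final_output", "foo.data"))

        def run(self):
            # temporary_path() yields a temporary path next to the final one;
            # the move to output().path happens only if this block finishes cleanly.
            with self.output().temporary_path() as tmp_path:
                # An external tool would receive tmp_path here instead.
                with open(tmp_path, "w") as f:
                    f.write("result of a big, slow calculation\n")

For plain Python writes, ``LocalTarget.open("w")`` already goes through a temporary
file, so ``temporary_path`` mainly matters when another program produces the output.
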

Sending messages to tasks
~~~~~~~~~~~~~~~~~~~~~~~~~

The central scheduler is able to send messages to particular tasks. When a running task accepts
messages, it can access a `multiprocessing.Queue `__
object storing incoming messages. You can implement custom behavior to react and respond to
messages:

.. code-block:: python

    import luigi

    class Example(luigi.Task):

        # common task setup
        ...

        # configure the task to accept all incoming messages
        accepts_messages = True

        def run(self):
            # this example runs some loop (some_loop() is a placeholder) and
            # listens for the "terminate" message, and responds to all other messages
            for _ in some_loop():
                # check incoming messages
                if not self.scheduler_messages.empty():
                    msg = self.scheduler_messages.get()
                    if msg.content == "terminate":
                        break
                    else:
                        msg.respond("unknown message")

            # finalize
            ...

Messages can be sent right from the scheduler UI, which also displays responses (if any). Note that
this feature is only available when the scheduler is configured to send messages (see the :ref:`scheduler-config` config) and the task is configured to accept them.