Say you have a method that fetches some data and runs two heavy computations on it, one after the other:

```python
def compute():
    # step 1
    data = fetch_data()
    # step 2
    data_1 = heavy_computation_1(data, a=1, b=2, c=3)
    # step 3
    data_2 = heavy_computation_2(data_1, x=1, y=2, z=3)
    return data_2
```
Suppose you want to experiment with the `x`, `y` and `z` parameters, and you find yourself running this method over and over with different parameter values, checking every time if the output suits you. To get a new `heavy_computation_2` result you're continuously running `fetch_data` and `heavy_computation_1` on (probably) the same data.

This is where Luigi helps: let's split `compute()` into tasks:

```python
import luigi
from luigi.local_target import LocalTarget
from fetching import fetch_data
from processing import heavy_computation_1, heavy_computation_2
from json import dumps, loads


class FetchData(luigi.Task):
    def run(self):
        data = fetch_data()
        with self.output().open("w") as outfile:
            outfile.write(dumps(data))

    def output(self):
        return LocalTarget("output/data.json")


class HeavyComputationOne(luigi.Task):
    a = luigi.IntParameter(default=42)
    b = luigi.IntParameter(default=42)
    c = luigi.IntParameter(default=42)

    def requires(self):
        return FetchData()

    def run(self):
        with self.input().open("r") as infile:
            input_data = loads(infile.read())
        data = heavy_computation_1(input_data, a=self.a, b=self.b, c=self.c)
        with self.output().open("w") as outfile:
            outfile.write(dumps(data))

    def output(self):
        return LocalTarget(f"output/processed_data_1-{self.a}-{self.b}-{self.c}.json")


class HeavyComputationTwo(luigi.Task):
    x = luigi.IntParameter(default=42)
    y = luigi.IntParameter(default=42)
    z = luigi.IntParameter(default=42)

    def requires(self):
        return HeavyComputationOne(a=1, b=2, c=3)

    def run(self):
        with self.input().open("r") as infile:
            input_data = loads(infile.read())
        data = heavy_computation_2(input_data, x=self.x, y=self.y, z=self.z)
        with self.output().open("w") as outfile:
            outfile.write(dumps(data))

    def output(self):
        return LocalTarget(f"output/processed_data_2-{self.x}-{self.y}-{self.z}.json")
```
Every Luigi task defines up to three methods:

- `requires()`, which defines the task requirements (if any)
- `run()`, which contains the task business logic
- `output()`, which defines the kind of output (in this case files on the local filesystem, but they could be e.g. files on S3 or many more things)

Now you can run the `HeavyComputationTwo` task from the CLI (yes, Luigi will generate the CLI for you automatically) N times, using different values of the `x`, `y` and `z` parameters. Luigi will run the requirements (`HeavyComputationOne` and `FetchData`) only once, instead of re-running the whole `compute()` method every time.
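A minimal sketch of what a run could look like, using `luigi.build()` programmatically (the comment shows the equivalent auto-generated CLI call):

```python
import luigi
from tasks import HeavyComputationTwo

# Equivalent auto-generated CLI call:
#   python -m luigi --module tasks HeavyComputationTwo --x 1 --y 2 --z 3 --local-scheduler
luigi.build([HeavyComputationTwo(x=1, y=2, z=3)], local_scheduler=True)
```

Re-run it with the same parameters and Luigi will see the output targets already exist and do nothing; change only `z` and just `HeavyComputationTwo` runs again.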
Another benefit of splitting the pipeline into tasks: they're easy to test. A test usually follows the Given/When/Then structure:

```python
def test_something():
    # Given an initial situation
    # When something happens
    # Then there's an outcome
```
For example, here's a test for the `FetchData` task (ideally we write the test first!):

```python
import luigi
from tasks import FetchData
from json import loads


def setup_db():
    # here we set up a local test DB with test data
    pass


def test_fetch_data_task():
    setup_db()
    task = FetchData()
    assert luigi.build([task], local_scheduler=True)
    assert task.output().exists()
    data = loads(task.output().open("r").read())
    assert len(data) > 0
    # other assertions (if needed)
```
This test exercises the whole task, not the `fetch_data()` method alone. It's good practice to test business-logic functions separately from the `run()` one, since they can also be used outside a Luigi task: if inside `FetchData` you're calling `fetch_data()`, you need to unit-test it as well :-)
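A minimal sketch of such a unit test, reusing the `setup_db()` helper above and assuming `fetch_data()` returns a non-empty collection when run against the test DB:

```python
from fetching import fetch_data


def test_fetch_data():
    # Given a local test DB with known content
    setup_db()
    # When we call the function directly, without any Luigi machinery
    data = fetch_data()
    # Then we get the expected records back
    assert len(data) > 0
```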
The pipeline is now `FetchData` -> `HeavyComputationOne` -> `HeavyComputationTwo`. Notice that we hard-coded the `a`, `b` and `c` params in the `HeavyComputationTwo` task requirements. If you want to run `HeavyComputationTwo` but you need e.g. to specify different `a`, `b` or `c` values besides the `x`, `y` and `z` ones, do you need to declare all the `HeavyComputationOne` parameters again?
```python
class HeavyComputationTwo(luigi.Task):
    a = luigi.IntParameter(default=42)
    b = luigi.IntParameter(default=42)
    c = luigi.IntParameter(default=42)
    x = luigi.IntParameter(default=42)
    y = luigi.IntParameter(default=42)
    z = luigi.IntParameter(default=42)
    # ...
```
Luckily no: Luigi provides the `inherits` decorator in its `util` module. You can decorate the `HeavyComputationTwo` class, specifying that it inherits from `HeavyComputationOne`, in this way:

```python
from tasks import HeavyComputationOne
from luigi.util import inherits


@inherits(HeavyComputationOne)
class HeavyComputationTwo(luigi.Task):
    x = luigi.IntParameter(default=42)
    y = luigi.IntParameter(default=42)
    z = luigi.IntParameter(default=42)
    # ...
```
`a`, `b` and `c` are now inherited from `HeavyComputationOne`, and there's no need to re-declare them anymore.
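With the decorator in place, `requires()` no longer needs hard-coded values either. A sketch of what it could look like, using Luigi's `Task.clone()` to forward the inherited parameters (an illustration, assuming the declarations above):

```python
@inherits(HeavyComputationOne)
class HeavyComputationTwo(luigi.Task):
    # x, y, z declared as above...

    def requires(self):
        # clone() instantiates the requirement with this task's
        # inherited a, b and c parameter values
        return self.clone(HeavyComputationOne)
```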
One last thing: sometimes a single task is so heavy that plain `multiprocessing` is not enough. In that case you can spin up a local Dask cluster inside the task's `run()` method:

```python
import luigi
import dask.bag as db
from dask.distributed import Client


class HeavyTask(luigi.Task):
    def run(self):
        # ...
        with create_local_cluster(memory_limit="4GB", n_workers=6) as cluster, Client(cluster):
            bag = db.from_sequence(data)
            output += bag.map_partitions(heavy_method, arg1=arg1).compute()
```
where `create_local_cluster()` simply wraps the related Dask class:

```python
from dask.distributed import LocalCluster


def create_local_cluster(n_workers: int = 4, **kwargs) -> LocalCluster:
    return LocalCluster(
        n_workers=n_workers,
        worker_dashboard_address=None,
        **kwargs,
    )
```
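Outside of any Luigi task, the helper can be exercised like this (a toy example, not from the original pipeline):

```python
import dask.bag as db
from dask.distributed import Client

# LocalCluster and Client are both context managers, so workers and
# connections are cleaned up when the block exits.
with create_local_cluster(n_workers=2) as cluster, Client(cluster):
    squares = db.from_sequence(range(10)).map(lambda n: n * n).compute()
    print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```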