import os
from module import n, N
import module
import module2
from hamilton import driver
from hamilton.execution import executors
Parallel Caching with Hamilton
Setup
Code: module.py and module2.py.
Write example data
# Set to True to overwrite existing CSVs; otherwise only missing files are written.
regenerate = False

# Create data/0 ... data/{N-1}, each containing a data.csv with an "x,y" header
# followed by n rows of "i,j" (N and n are imported from module.py).
os.makedirs("data", exist_ok=True)

for i in range(N):
    part_dir = os.path.join("data", str(i))
    os.makedirs(part_dir, exist_ok=True)
    csv_path = os.path.join(part_dir, "data.csv")
    if regenerate or not os.path.exists(csv_path):
        rows = [f"{i},{j}" for j in range(n)]
        with open(csv_path, "w", encoding="utf-8") as f:
            f.write("x,y\n")
            # No trailing newline after the last row, matching the original output.
            f.write("\n".join(rows))
Hamilton
Multiprocessing, Parallelizable, Caching
Pipeline of module.py
# Driver for module.py: dynamic execution is enabled (needed for Parallelizable
# nodes), non-parallel work runs synchronously in-process, parallel tasks go to a
# multiprocessing executor (max_tasks=5), and recompute=True rebuilds the cache.
dr = (
    driver.Builder()
    .with_modules(module)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
    .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))
    .with_cache(recompute=True)
    .build()
)
dr.visualize_execution(["result"])
Rebuilding cache
%%timeit -r1 -n 3
= (
dr
driver.Builder()
.with_modules(module)=True)
.enable_dynamic_execution(allow_experimental_mode
.with_local_executor(executors.SynchronousLocalTaskExecutor())=5))
.with_remote_executor(executors.MultiProcessingExecutor(max_tasks=True)
.with_cache(recompute
.build()
)= dr.execute(["result"])["result"] r
16.6 s ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)
From cache
%%timeit -r1 -n 3
= (
dr
driver.Builder()
.with_modules(module)=True)
.enable_dynamic_execution(allow_experimental_mode
.with_local_executor(executors.SynchronousLocalTaskExecutor())=5))
.with_remote_executor(executors.MultiProcessingExecutor(max_tasks
.with_cache()
.build()
)= dr.execute(["result"])["result"] r
7.69 s ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)
Synchronous executor, Parallelizable, Caching
Rebuilding cache
%%timeit -r1 -n 3
= (
dr
driver.Builder()
.with_modules(module)=True)
.enable_dynamic_execution(allow_experimental_mode
.with_local_executor(executors.SynchronousLocalTaskExecutor())
.with_remote_executor(executors.SynchronousLocalTaskExecutor())=True)
.with_cache(recompute
.build()
)= dr.execute(["result"])["result"] r
20.6 s ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)
From cache
%%timeit -r1 -n 3
= (
dr
driver.Builder()
.with_modules(module)=True)
.enable_dynamic_execution(allow_experimental_mode
.with_local_executor(executors.SynchronousLocalTaskExecutor())
.with_remote_executor(executors.SynchronousLocalTaskExecutor())
.with_cache()
.build()
)= dr.execute(["result"])["result"] r
7.1 s ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)
No Parallelizable, Caching
Pipeline of module2.py
# Driver for module2.py: no dynamic execution (no Parallelizable nodes),
# cache rebuilt from scratch via recompute=True.
dr = (
    driver.Builder()
    .with_modules(module2)
    .with_cache(recompute=True)
    .build()
)
dr.visualize_execution(["result"])
Rebuilding cache
%%timeit -r1 -n 3
= (
dr
driver.Builder()
.with_modules(module2)=True)
.with_cache(recompute
.build()
)= dr.execute(["result"])["result"] r
6.91 s ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)
From cache
%%timeit -r1 -n 3
= (
dr
driver.Builder()
.with_modules(module2)
.with_cache()
.build()
)= dr.execute(["result"])["result"] r
11.4 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)