Parallel Caching with Hamilton

Setup

import os
from module import n, N
import module
import module2
from hamilton import driver
from hamilton.execution import executors

Code: module.py and module2.py.

Write example data

regenerate = False

if not os.path.exists("data"):
  os.mkdir("data")

for i in range(N):
  dir = f"data/{i}"
  if not os.path.exists(dir):
    os.mkdir(dir)
  if not os.path.exists(f"{dir}/data.csv") or regenerate:
    with open(f"{dir}/data.csv", "w") as f:
      f.write("x,y\n")
      l = [f"{i},{j}" for j in range(n)]
      f.write("\n".join(l))

Hamilton

Multiprocessing, Parallelizable, Caching

Pipeline of module.py

dr = (
    driver.Builder()
    .with_modules(module)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
    .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))
    .with_cache(recompute=True)
    .build()
)
dr.visualize_execution(["result"])

Rebuilding cache

%%timeit -r1 -n 3
dr = (
    driver.Builder()
    .with_modules(module)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
    .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))
    .with_cache(recompute=True)
    .build()
)
r = dr.execute(["result"])["result"]
16.6 s ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)

From cache

%%timeit -r1 -n 3
dr = (
    driver.Builder()
    .with_modules(module)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
    .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))
    .with_cache()
    .build()
)
r = dr.execute(["result"])["result"]
7.69 s ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)

Synchronous executor, Parallelizable, Caching

Rebuilding cache

%%timeit -r1 -n 3
dr = (
    driver.Builder()
    .with_modules(module)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
    .with_remote_executor(executors.SynchronousLocalTaskExecutor())
    .with_cache(recompute=True)
    .build()
)
r = dr.execute(["result"])["result"]
20.6 s ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)

From cache

%%timeit -r1 -n 3
dr = (
    driver.Builder()
    .with_modules(module)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
    .with_remote_executor(executors.SynchronousLocalTaskExecutor())
    .with_cache()
    .build()
)
r = dr.execute(["result"])["result"]
7.1 s ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)

No Parallelizable, Caching

Pipeline of module2.py

dr = (
    driver.Builder()
    .with_modules(module2)
    .with_cache(recompute=True)
    .build()
)
dr.visualize_execution(["result"])

Rebuilding cache

%%timeit -r1 -n 3
dr = (
    driver.Builder()
    .with_modules(module2)
    .with_cache(recompute=True)
    .build()
)
r = dr.execute(["result"])["result"]
6.91 s ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)

From cache

%%timeit -r1 -n 3
dr = (
    driver.Builder()
    .with_modules(module2)
    .with_cache()
    .build()
)
r = dr.execute(["result"])["result"]
11.4 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)