Executors

Relevant APIs

| Name | Description |
| --- | --- |
| @executor | The decorator used to define executors. Defines an ExecutorDefinition. |
| ExecutorDefinition | An executor definition. |

Overview

Executors are responsible for executing steps within a job run. Once a run has launched and the process for the run (the run worker) has been allocated and started, the executor assumes responsibility for execution. Executors range from simple single-process executors that run steps serially to executors that manage per-step computational resources through a sophisticated control plane.
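To make this division of labor concrete, here is a minimal, self-contained Python sketch. It is not Dagster code: `SerialExecutor`, `PoolExecutor`, and `run_worker` are hypothetical names, steps are modeled as independent zero-argument callables (real executors also respect dependencies between steps), and the pooled variant uses threads rather than processes to keep the sketch simple. The point is only that the run worker hands a set of steps to an interchangeable executor object, and that object decides how the steps run.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict

# Hypothetical stand-in for a step: a zero-argument callable returning a value.
Step = Callable[[], object]


class SerialExecutor:
    """Runs every step one after another in the calling process."""

    def execute(self, steps: Dict[str, Step]) -> Dict[str, object]:
        # Dicts preserve insertion order, so steps run in the order given.
        return {name: fn() for name, fn in steps.items()}


class PoolExecutor:
    """Runs independent steps concurrently, at most max_concurrent at a time.

    Uses threads for simplicity; a real multiprocess executor would use
    separate processes instead.
    """

    def __init__(self, max_concurrent: int = 4) -> None:
        self.max_concurrent = max_concurrent

    def execute(self, steps: Dict[str, Step]) -> Dict[str, object]:
        with ThreadPoolExecutor(max_workers=self.max_concurrent) as pool:
            # Submit every step, then collect results keyed by step name.
            futures = {name: pool.submit(fn) for name, fn in steps.items()}
            return {name: fut.result() for name, fut in futures.items()}


def run_worker(steps: Dict[str, Step], executor) -> Dict[str, object]:
    # The run worker does not run steps itself; it delegates to the executor.
    return executor.execute(steps)
```

Swapping `SerialExecutor()` for `PoolExecutor(max_concurrent=2)` changes how the steps run without changing the steps themselves, which mirrors choosing a different executor for the same job.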

Specifying Executors

Every job has an executor. The default executor is multi_or_in_process_executor, which by default executes each step in its own process but can be configured to execute all steps within a single process.
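As a hedged illustration of that configuration, the run config below is written as Python dictionaries. It assumes the `multiprocess` / `in_process` selector keys and the `max_concurrent` field that recent Dagster releases use for multi_or_in_process_executor; the variable names are hypothetical, and the exact schema should be checked against the API reference for your Dagster version.

```python
# Assumed schema: run config for a job using the default
# multi_or_in_process_executor, selecting single-process execution.
run_in_one_process = {"execution": {"config": {"in_process": {}}}}

# Assumed schema: keep the default multiprocess mode but cap how many
# step processes may run at the same time.
run_multiprocess_capped = {
    "execution": {"config": {"multiprocess": {"max_concurrent": 4}}}
}
```

Either dictionary (or its YAML equivalent) would be supplied as run config when launching the job.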

An executor can be specified directly on the job by supplying an ExecutorDefinition to the executor_def parameter of @job or GraphDefinition.to_job.

from dagster import multiprocess_executor, job, graph

# Providing an executor using the job decorator
@job(executor_def=multiprocess_executor)
def the_job():
    ...


@graph
def the_graph():
    ...


# Providing an executor using graph_def.to_job(...)
other_job = the_graph.to_job(executor_def=multiprocess_executor)

A default executor can be specified for all jobs and assets provided to a repository using the default_executor_def argument of @repository. All jobs that don't specify an executor will use this default executor. If a job explicitly specifies an executor, the repository default is not used for that job.

from dagster import multiprocess_executor, define_asset_job, asset, job, repository


@asset
def the_asset():
    pass


asset_job = define_asset_job("the_job", selection="*")


@job
def op_job():
    ...


# op_job and asset_job will both use the default_executor_def,
# since neither defines its own executor.
@repository(default_executor_def=multiprocess_executor)
def the_repo():
    return [the_asset, asset_job, op_job]

Executing a job via JobDefinition.execute_in_process overrides the job's executor and uses in_process_executor instead.
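The rules above for deciding which executor a run uses can be summarized as a small precedence check. The sketch below is a plain-Python model of those rules, not Dagster's implementation: executors are represented by their names as strings, and `resolve_executor` is a hypothetical helper.

```python
from typing import Optional

# Name of the executor a job gets when nothing else is specified.
DEFAULT_EXECUTOR = "multi_or_in_process_executor"


def resolve_executor(
    job_executor: Optional[str],
    repo_default: Optional[str],
    via_execute_in_process: bool = False,
) -> str:
    """Return the executor a run would use under the rules described above."""
    if via_execute_in_process:
        # execute_in_process overrides whatever executor the job declares.
        return "in_process_executor"
    if job_executor is not None:
        # An executor set via @job or to_job wins over the repository default.
        return job_executor
    if repo_default is not None:
        # Otherwise fall back to @repository(default_executor_def=...).
        return repo_default
    return DEFAULT_EXECUTOR
```

For example, `resolve_executor(None, "multiprocess_executor")` models op_job in the repository example, which picks up the repository default.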

Example executors include:

  • in_process_executor: The execution plan runs serially within the [run worker](/deployment/overview#job-execution-flow) itself.
  • multiprocess_executor: Each step executes within its own spawned process. The level of parallelism is configurable.
  • dask_executor: Executes each step within a Dask task.
  • celery_executor: Executes each step within a Celery task.
  • docker_executor: Executes each step within a Docker container.
  • k8s_job_executor: Executes each step within an ephemeral Kubernetes pod.
  • celery_k8s_job_executor: Executes each step within an ephemeral Kubernetes pod, using Celery as a control plane for prioritization and queuing.
  • celery_docker_executor: Executes each step within a Docker container, using Celery as a control plane for prioritization and queuing.

Custom Executors

The executor system is pluggable, so it is possible to write your own executor to target a different execution substrate. However, this is not yet well documented, and the internal APIs involved are still in flux.