Loggers#

Dagster includes a rich and extensible logging system. Dagster comes with a built-in logger that tracks all the execution events. You can also customize loggers to meet your own needs.

Relevant APIs#

NameDescription
@loggerThe decorator used to define loggers. The decorator returns a LoggerDefinition
LoggerDefinitionClass for loggers. You almost never want to use initialize this class directly. Instead, you should use the decorator above
OpExecutionContextThe context object available to an op compute function
InitLoggerContextThe context object passed to a custom logger's initialization function.
build_init_logger_contextA function to construct a InitLoggerContext outside of execution, primarily to be used for testing purposes.

Overview#

Loggers are job-scoped logging handlers, which will be automatically invoked whenever ops in a job log messages.


Defining a Logger#

By default, Dagster comes with a built-in logger that tracks all the execution events. You can find an example in the Using Built-in Loggers section.

The built-in Loggers are defined internally using the LoggerDefinition class. The @logger decorator exposes a simpler API for the common logging use case. It is typically what you'll use to define your own loggers. The decorated function should take a single argument, the init_context available during logger initialization, and return a logging.Logger. You can find an example in the Customizing Loggers section.

Using a Logger#

Logging from an Op#

Any op can emit log messages at any point in its computation:

@op
def hello_logs(context):
    context.log.info("Hello, world!")


@job
def demo_job():
    hello_logs()

Using Built-in Loggers#

When you run the above job in terminal, you'll find the messages have been logged through a built-in logger.

job-cli

The context object passed to every op execution includes the built-in log manager, context.log. It exposes the usual debug, info, warning, error, and critical methods you would expect anywhere else in Python.

When you run Dagster jobs in Dagit, you'll notice that log messages are visible as colored messages in the console:

Logs also stream back to the Dagit frontend in real time:

job-log-dagit

Dagit exposes a powerful facility for filtering log messages based on execution steps and log levels.

job-dagit-filter

Debugging with Logs#

What happens if we introduce an error into our op logic?

@op
def hello_logs_error(context):
    raise Exception("Somebody set up us the bomb")


@job
def demo_job_error():
    hello_logs_error()

Errors in user code are caught by the Dagster machinery to ensure jobs gracefully halt or continue to execute, but messages including the original stack trace get logged both to the console and back to Dagit.

Messages at level ERROR or above are highlighted both in Dagit and in the console logs, so we can easily pick them out of logs even without filtering.

job-dagit-error

In many cases, especially for local development, this log viewer, coupled with op reexecution, is sufficient to enable a fast debug cycle for job implementation.

Examples#

Configuring the built-in loggers#

Suppose that we've gotten the kinks out of our jobs developing locally, and now we want to run in production—without all of the log spew from DEBUG messages that was helpful during development.

Just like ops, loggers can be configured when you run a job. For example, to filter all messages below ERROR out of the colored console logger, add the following snippet to your config YAML:

loggers:
  console:
    config:
      log_level: ERROR

So when you execute the job with that config, you'll only see the ERROR level logs.

Customizing Loggers#

You may find yourself wanting to add or supplement the built-in loggers so that Dagster logs are integrated with the rest of your log aggregation and monitoring infrastructure.

For example, you may be operating in a containerized environment where container stdout is aggregated by a tool such as Logstash. In this kind of environment, where logs will be aggregated and parsed by machine, the multi-line output from the default colored console logger is unhelpful. Instead, we'd much prefer to see single-line, structured log messages like:

{"orig_message": "Hello, world!", "log_message_id": "49854579-e4d1-4289-8453-b3e177b20056", ...}

In fact, a logger that prints JSON-formatted single-line messages like this to the console is already included as dagster.loggers.json_console_logger. But let's look at how we might implement a simplified version of this logger.

Loggers are defined internally using the LoggerDefinition class, but, following a common pattern in the Dagster codebase, the @logger decorator exposes a simpler API for the common use case and is typically what you'll use to define your own loggers. The decorated function should take a single argument, the init_context available during logger initialization, and return a logging.Logger.

@logger(
    {
        "log_level": Field(str, is_required=False, default_value="INFO"),
        "name": Field(str, is_required=False, default_value="dagster"),
    },
    description="A JSON-formatted console logger",
)
def json_console_logger(init_context):
    level = init_context.logger_config["log_level"]
    name = init_context.logger_config["name"]

    klass = logging.getLoggerClass()
    logger_ = klass(name, level=level)

    handler = logging.StreamHandler()

    class JsonFormatter(logging.Formatter):
        def format(self, record):
            return json.dumps(record.__dict__)

    handler.setFormatter(JsonFormatter())
    logger_.addHandler(handler)

    return logger_


@op
def hello_logs(context):
    context.log.info("Hello, world!")


@job(logger_defs={"my_json_logger": json_console_logger})
def demo_job():
    hello_logs()

As you can see, you can specify the logger name in the run config. It also takes a config argument, representing the config that users can pass to the logger, for example:

loggers:
  my_json_logger:
    config:
      log_level: INFO

When you execute the job, you'll notice that you are no longer using the built-in logger but your custom json logger instead.

job-custom-logger

Specifying Default Repository Loggers#

Default loggers can be specified on a @repository using the default_logger_defs argument. These loggers will be added to every job specified on the repository, and will be used for any materializatons of software-defined assets provided to the repository.

Note that if you explicitly specify loggers on a job, they will override those provided to default_logger_defs.

from dagster import repository, define_asset_job, asset


@asset
def some_asset():
    ...


the_job = define_asset_job("the_job", selection="*")


@repository(default_logger_defs={"json_logger": json_console_logger})
def the_repo():
    # When loading this repository in dagit, json_logger will now be used when
    # the_job runs, or some_asset materializes.
    return [the_job, some_asset]

Testing Custom Loggers#

You can unit test the initialization method of a logger by invoking it.

def test_init_json_console_logger():
    logger_ = json_console_logger(None)
    assert logger_.level == 20
    assert logger_.name == "dagster"

If you need to provide config to the initialization of your logger, you can use the build_init_logger_context function to do so.

from dagster import build_init_logger_context


def test_init_json_console_logger_with_context():
    logger_ = json_console_logger(
        build_init_logger_context(logger_config={"name": "my_logger"})
    )
    assert logger_.level == 20
    assert logger_.name == "my_logger"

Patterns#

Environment-specific logging using jobs#

Logging is environment-specific: you don't want messages generated by data scientists' local development loops to be aggregated with production messages; on the other hand, you may find that in production console logging is irrelevant or even counterproductive.

Dagster recognizes this by attaching loggers to jobs so that you can seamlessly switch from, e.g., Cloudwatch logging in production to console logging in development and test, without changing any of your code.

@op
def log_op(context):
    context.log.info("Hello, world!")


@graph
def hello_logs():
    log_op()


local_logs = hello_logs.to_job(
    name="local_logs", logger_defs={"console": colored_console_logger}
)
prod_logs = hello_logs.to_job(
    name="prod_logs", logger_defs={"cloudwatch": cloudwatch_logger}
)

From Dagit, you can view and execute the prod_logs job and edit config in order to use the new Cloudwatch logger, for example:

loggers:
  cloudwatch:
    config:
      log_level: ERROR
      log_group_name: /my/cool/cloudwatch/log/group
      log_stream_name: very_good_log_stream