Skip to main content

Creating and registering a component type

info

This feature is still in development and might change in patch releases. It’s not production ready, and the documentation may also evolve. Stay tuned for updates.

The dagster-components system makes it easy to create new component types that can be reused across your project.

In most cases, component types map to a specific technology. For example, you might have a DockerScriptComponent that executes a script in a Docker container, or a SnowflakeQueryComponent that runs a query on Snowflake.

note

Refer to the project structuring guide to learn how to create a components-compatible project.

Scaffolding component type files

For this example, we'll write a lightweight component that executes a shell command.

First, we use the dg command-line utility to scaffold a new component type:

dg scaffold component-type shell_command
Creating a Dagster component type at /.../my-component-library/my_component_library/lib/shell_command.py.
Scaffolded files for Dagster component type at /.../my-component-library/my_component_library/lib/shell_command..

This will add a new file to your project in the lib directory:

my_component_library/lib/shell_command.py
from dagster import Definitions
from dagster_components import (
Component,
ComponentLoadContext,
DefaultComponentScaffolder,
ResolvableSchema,
registered_component_type,
)

class ShellCommandSchema(ResolvableSchema):
...

@registered_component_type(name="shell_command")
class ShellCommand(Component):
"""COMPONENT SUMMARY HERE.

COMPONENT DESCRIPTION HERE.
"""

@classmethod
def get_schema(cls):
return ShellCommandSchema

@classmethod
def get_scaffolder(cls) -> DefaultComponentScaffolder:
return DefaultComponentScaffolder()

def build_defs(self, load_context: ComponentLoadContext) -> Definitions:
# Add definition construction logic here.
return Definitions()

This file contains the basic structure for the new component type. There are two methods that you'll need to implement:

  • get_schema: This method should return a Pydantic model that defines the schema for the component. This is the schema for the data that goes into component.yaml.
  • build_defs: This method should return a Definitions object for this component.

Defining a schema

The first step is to define a schema for the component. This means determining what aspects of the component should be customizable.

In this case, we'll want to define a few things:

  • The path to the shell script that we'll want to run.
  • The assets that we expect this script to produce.

To simplify common use cases, dagster-components provides schemas for common bits of configuration, such as AssetSpecSchema, which contains attributes that are common to all assets, such as the key, description, tags, and dependencies.

We can the schema for our component and add it to our class as follows:

my_component_library/lib/shell_command.py
from collections.abc import Sequence

from dagster_components import (
AssetSpecSchema,
Component,
ComponentLoadContext,
ResolvableSchema,
registered_component_type,
)

import dagster as dg


class ShellScriptSchema(ResolvableSchema):
script_path: str
asset_specs: Sequence[AssetSpecSchema]


@registered_component_type(name="shell_command")
class ShellCommand(Component):
"""Models a shell script as a Dagster asset."""

@classmethod
def get_schema(cls) -> type[ShellScriptSchema]:
return ShellScriptSchema

def build_defs(self, load_context: ComponentLoadContext) -> dg.Definitions: ...

Defining the Python class

Next, we'll want to translate this schema into fully resolved Python objects. For example, our schema defines asset_specs as Sequence[AssetSpecSchema], but at runtime we'll want to work with Sequence[AssetSpec].

By convention, we'll use the @dataclass decorator to simplify our class definition. We can define attributes for our class that line up with the properties in our schema, but this time we'll use the fully resolved types where appropriate.

Our path will still just be a string, but our asset_specs will be a list of AssetSpec objects. AssetSpecSchema implements ResolvableSchema[AssetSpec], which indicates that it can automatically resolve into an AssetSpec object, so we don't need to do any additional work to resolve this field for our component.

my_component_library/lib/shell_command.py
from collections.abc import Sequence
from dataclasses import dataclass

from dagster_components import (
AssetSpecSchema,
Component,
ComponentLoadContext,
ResolvableSchema,
registered_component_type,
)

import dagster as dg


class ShellScriptSchema(ResolvableSchema):
script_path: str
asset_specs: Sequence[AssetSpecSchema]


@registered_component_type(name="shell_command")
@dataclass
class ShellCommand(Component):
script_path: str
asset_specs: Sequence[dg.AssetSpec]

@classmethod
def get_schema(cls) -> type[ShellScriptSchema]:
return ShellScriptSchema

def build_defs(self, load_context: ComponentLoadContext) -> dg.Definitions: ...
tip

When defining a field on a component that isn't on the schema, or is of a different type, the components system allows you to provide custom resolution logic for that field. See the Providing resolution logic for non-standard types section for more information.

Building definitions

Now that we've defined how the component is parameterized, we need to define how to turn those parameters into a Definitions object.

To do so, we'll want to override the build_defs method, which is responsible for returning a Definitions object containing all definitions related to the component.

Our build_defs method will create a single @asset that executes the provided shell script. By convention, we'll put the code to actually execute this asset inside of a function called execute. This makes it easier for future developers to create subclasses of this component.

my_component_library/lib/shell_command.py
import subprocess
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path

from dagster_components import (
AssetSpecSchema,
Component,
ComponentLoadContext,
ResolutionContext,
ResolvableSchema,
registered_component_type,
)

import dagster as dg


class ShellScriptSchema(ResolvableSchema):
script_path: str
asset_specs: Sequence[AssetSpecSchema]


def resolve_asset_specs(
context: ResolutionContext, schema: ShellScriptSchema
) -> Sequence[dg.AssetSpec]:
return context.resolve_value(schema.asset_specs)


@registered_component_type(name="shell_command")
@dataclass
class ShellCommand(Component):
"""Models a shell script as a Dagster asset."""

script_path: str
asset_specs: Sequence[dg.AssetSpec]

@classmethod
def get_schema(cls) -> type[ShellScriptSchema]:
return ShellScriptSchema

def build_defs(self, load_context: ComponentLoadContext) -> dg.Definitions:
resolved_script_path = Path(load_context.path, self.script_path).absolute()

@dg.multi_asset(name=Path(self.script_path).stem, specs=self.asset_specs)
def _asset(context: dg.AssetExecutionContext):
self.execute(resolved_script_path, context)

return dg.Definitions(assets=[_asset])

def execute(self, resolved_script_path: Path, context: dg.AssetExecutionContext):
return subprocess.run(["sh", str(resolved_script_path)], check=True)

Component registration

Following the steps above will automatically register your component type in your environment. You can now run:

dg list component-type
Using /.../my-component-library/.venv/bin/dagster-components
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Component Type ┃ Summary ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ definitions@dagster_components │ Wraps an arbitrary set of │
│ │ Dagster definitions. │
│ pipes_subprocess_script_collection@dagster_components │ Assets that wrap Python │
│ │ scripts executed with │
│ │ Dagster's │
│ │ PipesSubprocessClient. │
│ shell_command@my_component_library │ Models a shell script as a │
│ │ Dagster asset. │
└───────────────────────────────────────────────────────┴────────────────────────────────┘

and see your new component type in the list of available component types.

You can also view automatically generated documentation describing your new component type by running:

dg docs component-type shell_command@my_component_library

Now, you can use this component type to create new component instances.

Configuring custom scaffolding

Once your component type is registered, instances of the component type can be scaffolded using the dg scaffold component command:

dg scaffold component 'shell_command@my_component_library' my_shell_command
Using /.../my-component-library/.venv/bin/dagster-components
Creating a Dagster component instance folder at /.../my-component-library/my_component_library/components/my_shell_command.
Using /.../my-component-library/.venv/bin/dagster-components

By default, this will create a new directory alongside an unpopulated component.yaml file. However, you can customize this behavior by implementing a get_scaffolder method on your component type.

In this case, we might want to scaffold a template shell script alongside a filled-out component.yaml file, which we accomplish with a custom scaffolder:

my_component_library/lib/shell_command.py
import os
import subprocess
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import Any

from dagster_components import (
AssetSpecSchema,
Component,
ComponentLoadContext,
ComponentScaffolder,
ComponentScaffoldRequest,
ResolvableSchema,
registered_component_type,
scaffold_component_yaml,
)

import dagster as dg


class ShellCommandScaffolder(ComponentScaffolder):
"""Scaffolds a template shell script alongside a filled-out component YAML file."""

def scaffold(self, request: ComponentScaffoldRequest, params: Any) -> None:
scaffold_component_yaml(
request,
{
"script_path": "script.sh",
"asset_specs": [
{"key": "my_asset", "description": "Output of running a script"}
],
},
)
script_path = Path(request.component_instance_root_path) / "script.sh"
script_path.write_text("#!/bin/bash\n\necho 'Hello, world!'")
os.chmod(script_path, 0o755)




class ShellScriptSchema(ResolvableSchema):
script_path: str
asset_specs: Sequence[AssetSpecSchema]


@registered_component_type(name="shell_command")
@dataclass
class ShellCommand(Component):
"""Models a shell script as a Dagster asset."""

script_path: str
asset_specs: Sequence[dg.AssetSpec]

@classmethod
def get_schema(cls) -> type[ShellScriptSchema]:
return ShellScriptSchema

def build_defs(self, load_context: ComponentLoadContext) -> dg.Definitions:
resolved_script_path = Path(load_context.path, self.script_path).absolute()

@dg.multi_asset(name=Path(self.script_path).stem, specs=self.asset_specs)
def _asset(context: dg.AssetExecutionContext):
self.execute(resolved_script_path, context)

return dg.Definitions(assets=[_asset])

def execute(self, resolved_script_path: Path, context: dg.AssetExecutionContext):
return subprocess.run(["sh", str(resolved_script_path)], check=True)

@classmethod
def get_scaffolder(cls) -> ComponentScaffolder:
return ShellCommandScaffolder()


Now, when we run dg scaffold component, we'll see that a template shell script is created alongside a filled-out component.yaml file:

my_component_library/components/my_shell_command/component.yaml
type: shell_command@my_component_library

attributes:
script_path: script.sh
asset_specs:
- key: my_asset
description: Output of running a script
my_component_library/components/my_shell_command/script.sh
#!/bin/bash

echo 'Hello, world!'

[Advanced] Providing resolution logic for non-standard types

In most cases, the types you use in your component schema and in the component class will be the same, or will have out-of-the-box resolution logic, as in the case of AssetSpecSchema and AssetSpec.

However, in some cases you may want to use a type that doesn't have an existing schema equivalent. In this case, you can provide a function that will resolve the value to the desired type by providing an annotation on the field with Annotated[<type>, FieldResolver(...)].

For example, we might want to provide an API client to our component, which can be configured with an API key in YAML, or a mock client in tests:

from dataclasses import dataclass
from typing import Annotated

from dagster_components import (
Component,
ComponentLoadContext,
FieldResolver,
ResolutionContext,
ResolvableSchema,
registered_component_type,
)

import dagster as dg


class MyApiClient:
def __init__(self, api_key: str): ...


class MyComponentSchema(ResolvableSchema):
api_key: str


def resolve_api_key(
context: ResolutionContext, schema: MyComponentSchema
) -> MyApiClient:
return MyApiClient(api_key=schema.api_key)


@registered_component_type(name="my_component")
@dataclass
class MyComponent(Component):
# FieldResolver specifies a function used to map input matching the schema
# to a value for this field
api_client: Annotated[MyApiClient, FieldResolver(resolve_api_key)]

@classmethod
def get_schema(cls) -> type[MyComponentSchema]:
return MyComponentSchema

def build_defs(self, load_context: ComponentLoadContext) -> dg.Definitions: ...

[Advanced] Customize rendering of YAML values

The components system supports a rich templating syntax that allows you to load arbitrary Python values based off of your component.yaml file. All string values in a ResolvableModel can be templated using the Jinja2 templating engine, and may be resolved into arbitrary Python types. This allows you to expose complex object types, such as PartitionsDefinition or AutomationCondition to users of your component, even if they're working in pure YAML.

You can define custom values that will be made available to the templating engine by defining a get_additional_scope classmethod on your component. In our case, we can define a "daily_partitions" function which returns a DailyPartitionsDefinition object with a pre-defined start date:

import subprocess
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import Any

from dagster_components import (
AssetSpecSchema,
Component,
ComponentLoadContext,
ResolutionContext,
ResolvableSchema,
registered_component_type,
)

import dagster as dg


class ShellScriptSchema(ResolvableSchema):
script_path: str
asset_specs: Sequence[AssetSpecSchema]


def resolve_asset_specs(
context: ResolutionContext, schema: ShellScriptSchema
) -> Sequence[dg.AssetSpec]:
return context.resolve_value(schema.asset_specs)


@registered_component_type(name="shell_command")
@dataclass
class ShellCommand(Component):
script_path: str
asset_specs: Sequence[dg.AssetSpec]

@classmethod
def get_additional_scope(cls) -> Mapping[str, Any]:
return {
"daily_partitions": dg.DailyPartitionsDefinition(start_date="2024-01-01")
}

@classmethod
def get_schema(cls) -> type[ShellScriptSchema]:
return ShellScriptSchema

def build_defs(self, load_context: ComponentLoadContext) -> dg.Definitions:
@dg.multi_asset(name=Path(self.script_path).stem, specs=self.asset_specs)
def _asset(context: dg.AssetExecutionContext):
self.execute(context)

return dg.Definitions(assets=[_asset])

def execute(self, context: dg.AssetExecutionContext):
return subprocess.run(["sh", self.script_path], check=True)

When a user instantiates this component, they will be able to use this custom scope in their component.yaml file:

component_type: my_component

attributes:
script_path: script.sh
asset_specs:
- key: a
partitions_def: "{{ daily_partitions }}"

Next steps