Quickstart
Dagster supports saving and loading Iceberg tables as assets using I/O managers.
Prerequisites
To follow the steps in this guide, you'll need to:
- Create a temporary location for Iceberg and set up the catalog.
- Create the default namespace: catalog.create_namespace("default")
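The two prerequisite steps can be sketched as follows. This is a minimal sketch, assuming a SQLite-backed PyIceberg catalog (SqlCatalog) and the same /tmp/warehouse location used in the I/O manager configuration later in this guide. The helper names catalog_properties and set_up_catalog are hypothetical and exist only for illustration.

```python
import os


def catalog_properties(warehouse_path: str) -> dict:
    """Build catalog properties for a SQLite-backed catalog (hypothetical helper)."""
    return {
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"file://{warehouse_path}",
    }


def set_up_catalog(warehouse_path: str = "/tmp/warehouse"):
    """Create the warehouse directory, the catalog, and the default namespace."""
    # Imported lazily so that catalog_properties works without pyiceberg installed.
    # Assumption: pyiceberg is installed with SQLite support.
    from pyiceberg.catalog.sql import SqlCatalog

    # Temporary location for Iceberg metadata and data files.
    os.makedirs(warehouse_path, exist_ok=True)

    catalog = SqlCatalog("default", **catalog_properties(warehouse_path))
    # Expect an error here if the namespace already exists.
    catalog.create_namespace("default")
    return catalog
```

Calling set_up_catalog() once before the first materialization should be enough. The returned catalog object can be kept around to inspect tables later.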
Defining the I/O manager
To use an Iceberg I/O manager, add it to your Definitions:
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager
from dagster import Definitions

warehouse_path = "/tmp/warehouse"

resources = {
    "io_manager": PyArrowIcebergIOManager(
        name="default",
        config=IcebergCatalogConfig(
            properties={
                "type": "sql",
                "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
                "warehouse": f"file://{warehouse_path}",
            }
        ),
        namespace="default",
    )
}

defs = Definitions(assets=[my_table, my_table_with_year], resources=resources)
Storing a Dagster asset as an Iceberg table
When an asset returns a PyArrow table, the I/O manager automatically persists it to your warehouse as an Iceberg table:
import pyarrow as pa
from dagster import asset

@asset
def my_table() -> pa.Table:
    n_legs = pa.array([2, 4, 5, 100])
    animals = pa.array(["Flamingo", "Horse", "Brittle stars", "Centipede"])
    names = ["n_legs", "animals"]
    return pa.Table.from_arrays([n_legs, animals], names=names)
Loading Iceberg tables in downstream assets
The I/O manager also loads the data stored in your warehouse when a dependent asset takes the upstream asset as an argument:
import pyarrow as pa
from dagster import asset

@asset
def my_table_with_year(my_table: pa.Table) -> pa.Table:
    year = [2021, 2022, 2019, 2021]
    return my_table.append_column("year", [year])
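To check that both assets landed in the warehouse, one option is to materialize them in-process and read the result back through the catalog. The following is a minimal sketch, not part of the original guide. It assumes the table is named after the asset and stored in the configured namespace, and that catalog is a PyIceberg catalog pointing at the same warehouse. The helpers table_identifier and materialize_and_read are hypothetical names.

```python
def table_identifier(namespace: str, asset_name: str) -> str:
    """Build a "namespace.table" identifier (hypothetical helper).

    Assumption: the I/O manager names the table after the asset.
    """
    return f"{namespace}.{asset_name}"


def materialize_and_read(assets, resources, catalog, asset_name="my_table_with_year"):
    """Materialize the given assets, then load one stored table as a PyArrow table."""
    # Imported lazily so that table_identifier works without dagster installed.
    from dagster import materialize

    # Runs the assets in-process using the supplied resources (including the I/O manager).
    result = materialize(assets, resources=resources)
    if not result.success:
        raise RuntimeError("materialization failed")

    # Read the stored Iceberg table back through the catalog.
    table = catalog.load_table(table_identifier("default", asset_name))
    return table.scan().to_arrow()
```

With the assets and resources defined in this guide, materialize_and_read([my_table, my_table_with_year], resources, catalog) would be expected to return the four animal rows plus the year column, provided the naming assumption above holds.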