
Quickstart

The dagster-iceberg library provides I/O managers that save Dagster assets as Iceberg tables and load those tables back when downstream assets need them.

Prerequisites

To follow the steps in this guide, you'll need to:

Defining the I/O manager

To use an Iceberg I/O manager, add it to your Definitions:


from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager

from dagster import Definitions

# Local directory that holds the SQLite catalog database and the Iceberg data files
warehouse_path = "/tmp/warehouse"

resources = {
    "io_manager": PyArrowIcebergIOManager(
        name="default",  # catalog name
        config=IcebergCatalogConfig(
            properties={
                "type": "sql",
                "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
                "warehouse": f"file://{warehouse_path}",
            }
        ),
        namespace="default",  # Iceberg namespace the asset tables are written to
    )
}

# my_table and my_table_with_year are the assets defined in the sections below
defs = Definitions(assets=[my_table, my_table_with_year], resources=resources)
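The SQL catalog configured above stores its metadata in a SQLite file inside warehouse_path, and SQLite cannot create that file if the directory does not exist yet. A minimal sketch, assuming you want to build the same properties dict in one place: the hypothetical helper `sql_catalog_properties` creates the directory first and then returns the properties shown above.

```python
from pathlib import Path


def sql_catalog_properties(warehouse_path: str) -> dict[str, str]:
    """Hypothetical helper: build SQL-catalog properties for a local warehouse.

    It mirrors the `properties` dict passed to IcebergCatalogConfig and creates
    the directory up front, because SQLite cannot create its database file
    inside a missing directory.
    """
    warehouse = Path(warehouse_path).absolute()
    warehouse.mkdir(parents=True, exist_ok=True)
    return {
        "type": "sql",
        # SQLite database that records the catalog's tables
        "uri": f"sqlite:///{warehouse}/pyiceberg_catalog.db",
        # Root directory under which Iceberg data and metadata files are written
        "warehouse": f"file://{warehouse}",
    }
```

You could then pass config=IcebergCatalogConfig(properties=sql_catalog_properties(warehouse_path)) to the I/O manager.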

Storing a Dagster asset as an Iceberg table

When the asset is materialized, the I/O manager automatically writes the returned PyArrow table to an Iceberg table in your warehouse:


import pyarrow as pa

from dagster import asset


@asset
def my_table() -> pa.Table:
    n_legs = pa.array([2, 4, 5, 100])
    animals = pa.array(["Flamingo", "Horse", "Brittle stars", "Centipede"])
    names = ["n_legs", "animals"]
    return pa.Table.from_arrays([n_legs, animals], names=names)

Loading Iceberg tables in downstream assets

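The I/O manager also loads stored tables from your warehouse when a downstream asset depends on them. Here, the my_table parameter tells Dagster to load the my_table asset and pass it in as a PyArrow table: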


import pyarrow as pa

from dagster import asset


@asset
def my_table_with_year(my_table: pa.Table) -> pa.Table:
    year = [2021, 2022, 2019, 2021]
    return my_table.append_column("year", [year])