Tento článek je technickým návodem, jak ve Fabric Data Factory (nebo Azure data factory) realizovat hromadný import tabulek pomocí jediné pipeline pomocí ForEach kontejneru a externího JSON konfiguračního souboru
Tento postup umožňuje snadné přidávání, odebírání i úpravu tabulek bez změn samotné pipeline. V json souboru si můžeme nastavit spoustu užitečných nastavení, které následně můžeme spravovat z jednoho místa pro všechny tabulky.
- Název zdrojové tabulky
- Cílový název parquet souboru
- is_enabled (jestli se má tabulka zpracovávat)
- Primární klíč
- Metoda ingestion (full load vs incremental load)
- návazný notebook pro import do delta tabulky
- a další…
Datová akvizice a problém manuálního vytváření velkého množství datových toků
Při budování datové platformy často potřebujeme často stáhnout a zpracovat velké množství tabulek ze zdrojových systémů. Vytvářet samostatnou pipeline pro každou tabulku je časově dost neefektivní, protože to ve finále mohou být stovky objektů.
Elegantní řešení je použít jeden hromadný orchestrátor (pro 1 zdrojový systém), který iteruje přes seznam tabulek uložený v JSON souboru a provádí jejich import univerzálním způsobem. Ve fabricu máme možnost, jak toho dosáhnout a budeme k tomu potřebovat:
- 1x Fabric Data factory pipeline
- json konfigurační soubor uložený v Data lake
- AdventureWorks light databázi jako zdrojová data
A to je vlastně vše.
Poznámka: Ne vždy je tento přístup možné praktikovat z různých objektivních důvodů. Nicméně v případě, kdy jsou zdrojem nějaká strukturovaná data, např z databáze a podobně, tak to jde.
Princip řešení ve Fabric data factory pro automatizaci pipeline
Fabric Data Factory je nástroj pro orchestraci datových toků napříč Fabric prostředím. Umožňuje spouštění a řízení copy aktivit, transformací, notebooků, webových volání, parametrizaci datasetů a automatizaci komplexních ETL/ELT procesů. Právě díky parametrizaci je ideální pro hromadnou akvizici dat.
- Konfigurační JSON obsahuje seznam tabulek a jejich metadata (nastavení)
- Pipeline provede
Lookupnad JSON souborem. - Volitelná aktivita
Filtervybere pouze položky sis_enabled = true. V budoucnu můžeme chtít tabulku vypnout/zapnout dle potřeby - Pomocí
ForEachproběhne iterace přes jednotlivé tabulky a nastavení - Každá iterace spustí copy aktivitu, která stáhne data ze zdroje do parquet souboru
- Po stažení dat spustíme notebook pro transformaci nebo zápis do Delta tabulek.
Ukázkový JSON config pro řízení hromadného importu
Tento JSON slouží jako hlavní zdroj řízení — definuje všechny tabulky, které budeme importovat. Json ukládáme typicky do Lakehouse files.
{
"sourceSystem": "AdventureWorks",
"tables": [
{
"schema": "SalesLT",
"tableName": "Address",
"parquetName": "address",
"deltaSchema": "adventureworks",
"deltaName": "bt_aw_address",
"deltaNotebook": "2a0b0af8...",
"source_updated_at": "ModifiedDate",
"source_businesskey": "AddressID",
"is_enabled": true,
"ingestion_method": "fullload"
},
{
"schema": "SalesLT",
"tableName": "Customer",
"parquetName": "customer",
"deltaSchema": "adventureworks",
"deltaName": "bt_aw_customer",
"deltaNotebook": "acf935d3...",
"source_updated_at": "ModifiedDate",
"source_businesskey": "CustomerID",
"is_enabled": true,
"ingestion_method": "fullload"
}
/* ...další tabulky... */
]
}
Krok 1 – Příprava json konfiguračního souboru
Uložte JSON konfig například do Fabric Lakehouse nebo ADLS: /_configs/{sourceSystem}_{Name}.json.
Krok 2 – Vytvoření nové pipeline
Vytvořte novou pipeline
Krok 3 – Lookup aktivita a JSON konfigurace
Přídáme aktivitu Lookup a načteme JSON config z Lakehouse.
Krok 4 – ForEach loop aktivita a nastavení jako expression
Přídáme ForEach loop aktivitu a nastavíme, že si má kolekci dat (seznam tabulek) přečíst z našeho json souboru z lookup aktivity. Zaklikneme sekvenční zpracování dat.
Krok 5 – If conditions aktivita a nastavení
Musíme ještě zajistit filtrování na náš parametr is_enabled json. To se dá udělat pomocí filter aktivity nebo pomocí if conditions aktivity.
- Přetáhneme if conditions aktivitu dovnitř foreach aktivity a klikneme na edit (tím nás to přenese dovnitř ifcondition aktivity)
- nastavíme aktivitu tak, aby si přešetla nastaveni “is_enabled” pro konkrétní zpracovávanou tabulku. Pokud je hodnota is_enabled = true, tak se zpracují aktivity uvedené v sekci True (viz krok dále). Pokud je hodnota is_enabled = false, tak se nestane nic
Krok 6 – Copy Activity – univerzální import a automatizace ingest souborové struktury
- (a) Source dataset parametrizujte výrazem:
@item().schema,@item().tableName.
- (b) Source – u source ještě můžeme nastavit další sloupce dle libosti pres advanced – additional columns. Označené sloupce jsou důležité pro další zpracování v silver vrstvě a automatizaci templates – nemusíme definovat ručně primární klíče a sloupec dle kterého dělat SCD 2, což nám dává základ pro další automatizaci – viz Fabric | dbt – Slowly changing dimension (SCD 2) – Snapshots a Check Strategie v dbt s příkladem
- Sink dataset si naparametrizujeme tak, abychom parquet soubory ukládali ve formátu, který nám pak umožňuje pozdější snadnou manipulaci s daty a partition pruning:1
/data-lake/
└── <source_system>/
└── <table_name>/
└── <ingestion_method>/
└── year=<ingestion_year>/
└── month=<ingestion_month>/
└── day=<ingestion_day>/
└── <ingestion_timestamp>data.parquet
- File path nastavíme jako
@concat(pipeline().parameters.p_destination_root,'/',
pipeline().parameters.p_source_system, '/',
item().parquetName,
'/ingestion_method=',item().ingestion_method,
'/ingestion_year=', formatDateTime(utcNow(),'yyyy'),
'/ingestion_month=', formatDateTime(utcNow(),'MM'),
'/ingestion_day=', formatDateTime(utcNow(),'dd')
)
- a file name:
@concat(formatDateTime(utcNow(),'yyyyMMdd_HHmmss'),'_',item().parquetName,'.parquet').
Krokem 5 bycho mměli mít zajištěno, že veškeré tabulky, které jsou enabled v našem konfiguračním souboru se zpracují a uloží jako parquety. Volitelně můžeme ještě nastavit spuštění nějakého návazného kroku. V konfiguračním souboru máme jako referenci guid na notebook, který můžeme spustit poté, co se nakopírují data. Tento notebook může obsahovat nějakou další logiku – například deduplikaci, čištění, scd 2 nebo něco jiného.
Závěr, test a kontrola
Po spuštění připraveného řešení vidíme, že se data postupně zpracovávají pouze pokud je is_enabled = true (označené se nezpracovalo – tabulka byla skipnuta):
Když se podíváme do lakehouse, tak vidíme, že se nám vytvořilo krásná souborová struktura, která je připravena pro pohodlnou práci s parquet soubory, partition pruning a pohodlnou maintenance.
Tento přístup umožňuje zpracovávat desítky tabulek pomocí jediné pipeline, jednoduše je přidávat či odebírat a výrazně zjednodušuje správu datové akvizice. Stačí aktualizovat JSON soubor a pipeline bude automaticky provádět import podle nové konfigurace. V případě většího objemu dat nebo tabulek je ale potřeba se ještě zamyslet nad optimalizací a asynchronním zpracováním dat.
Reference
- Microsoft dokumentace, Efficient Data Partitioning with Microsoft Fabric: Best Practices and Implementation Guide [on-line]. [cit. 2025-11-17]. Dostupné z WWW: https://techcommunity.microsoft.com/blog/fasttrackforazureblog/efficient-data-partitioning-with-microsoft-fabric-best-practices-and-implementat/3890097











