A type change upstream, a null spike overnight — your pipeline loads it anyway. Block bad data before it reaches your warehouse.
Catches schema drift, null spikes, and type mismatches in under 10ms — before your load task runs.
Your DAG runs green. The extract task succeeds. The load task inserts every row. A type changed upstream, or a field went null — but nobody knows until the dashboard breaks tomorrow morning.
DataScreenIQ screens the payload before storage. A BLOCK stops the pipeline instantly — bad rows go to a dead-letter queue, not your database.
report = client.screen(rows, source="orders")
if report.is_blocked:
    raise ValueError(report.summary())  # task fails, DAG stops
Install the SDK, drop in the integration, get PASS / WARN / BLOCK on every run.
Add datascreeniq to your Airflow environment or requirements file.
Add DATASCREENIQ_API_KEY to your Airflow Variables or environment. Get a free key at datascreeniq.com.
Insert a PythonOperator task between your extract and load tasks. Pass the extracted rows through the screen call.
Set per-source thresholds in the DataScreenIQ dashboard or inline in the task. Payments can block at 1% null; events can tolerate 40%.
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

import datascreeniq as dsiq
from datascreeniq.exceptions import DataQualityError


@dag(schedule_interval="@hourly", start_date=days_ago(1), catchup=False)
def orders_pipeline():
    @task
    def extract():
        # your existing extract logic
        return fetch_from_source()

    @task
    def quality_gate(rows):
        client = dsiq.Client()  # reads DATASCREENIQ_API_KEY from env
        try:
            client.screen(rows, source="orders").raise_on_block()
        except DataQualityError as e:
            # Re-raise with a readable summary: task fails, DAG stops
            raise ValueError(f"Quality gate failed: {e.report.summary()}")
        return rows  # only passes through on PASS or WARN

    @task
    def load(rows):
        insert_to_warehouse(rows)  # only runs if quality_gate passed

    rows = extract()
    clean = quality_gate(rows)
    load(clean)


dag = orders_pipeline()
To fail on WARN as well, replace raise_on_block() with a custom check on report.is_warn.

# Tighter thresholds for financial data
client.screen(
    rows,
    source="payments",
    options={"thresholds": {
        "null_rate_warn": 0.01,   # WARN if > 1% nulls
        "null_rate_block": 0.02,  # BLOCK if > 2% nulls
    }}
)
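A stricter gate that fails on WARN can be sketched in plain Python. The `Report` class below is a minimal stand-in for the report object the SDK's `screen()` call returns; its shape here is an assumption for illustration, not the SDK's actual class:

```python
class Report:
    """Minimal stand-in for the SDK's report object (hypothetical shape)."""

    def __init__(self, verdict, detail=""):
        self.verdict = verdict
        self.detail = detail

    @property
    def is_warn(self):
        return self.verdict == "WARN"

    @property
    def is_blocked(self):
        return self.verdict == "BLOCK"

    def summary(self):
        return f"{self.verdict} {self.detail}".strip()


def strict_gate(report):
    """Fail the task on WARN as well as BLOCK (stricter than raise_on_block())."""
    if report.is_warn or report.is_blocked:
        raise ValueError(f"Quality gate failed: {report.summary()}")
    return True


strict_gate(Report("PASS"))  # passes silently
```

Dropping `strict_gate` into the quality_gate task in place of `raise_on_block()` makes WARN a hard stop for sources where degraded data is as bad as broken data.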
DataScreenIQ runs 18 quality checks in a single pass — null rates, type mismatches, schema drift, outliers, duplicate rates, and more. The result is one of three verdicts.
PASS: All checks within thresholds. Pipeline proceeds to load. No action needed.
WARN: Quality degraded past the WARN threshold but still below the BLOCK threshold. Load proceeds; issue flagged for review.
BLOCK: Critical quality issue detected. Load prevented. Dead-letter queue or alert triggered.
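The WARN/BLOCK cut-over can be illustrated with the null-rate thresholds from the payments example above. This is illustrative logic only, not the SDK's implementation of its checks:

```python
def null_rate_verdict(rows, field, warn=0.01, block=0.02):
    """Map one field's null rate to a verdict (illustrative, not the SDK)."""
    nulls = sum(1 for row in rows if row.get(field) is None)
    rate = nulls / len(rows)
    if rate > block:
        return "BLOCK"
    if rate > warn:
        return "WARN"
    return "PASS"


rows = [{"amount": 10}] * 97 + [{"amount": None}] * 3  # 3% nulls
print(null_rate_verdict(rows, "amount"))  # BLOCK: 0.03 exceeds the 2% ceiling
```

In the real service, 18 such checks run in a single pass and the worst individual result determines the overall verdict returned to the gate task.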
DataScreenIQ drops into any pipeline that can make an HTTP call.
Block merges when data quality fails.
Catch schema drift in transformed data.
Quality gate flow with alerting on BLOCK.
Try DataScreenIQ in 60 seconds.
Free tier: 500K rows/month. No credit card. API key in 30 seconds.
Get a free API key →