The NuGet Classification Pipeline in Code - Part 4
Across the previous articles I walked this package intelligence pipeline from requirements to tech stack to architecture. Quick recap: the goal is a pipeline that pulls package data from the NuGet Catalog API and classifies each package as open source or not, built on Dagster and fed by the NuGet catalog plus the SPDX license list. Part 3 planned the whole thing: the asset graph, the cadence table, the classification logic, and the quality gates. This part is the wiring, how each of those plans turns into actual Dagster assets, Postgres tables, and Python. Three assets backed by Postgres, walked in order: ingest, reference, enrich. All of the code in this post lives in the package-intelligence-pipeline repo.
Ingestion
The NuGet asset is a thin Dagster wrapper around a plain async worker (sync_nuget) that knows nothing about the orchestrator, which is what lets the sync run as python -m nuget_pipeline.sync.nuget and the tests skip standing up Dagster. The worker walks the catalog and writes each batch through a single bulk INSERT ... ON CONFLICT DO UPDATE into two tables, raw.nuget_packages and raw.nuget_versions, and crucially the data write, the commitTimeStamp watermark advance, and a heartbeat row all share one transaction, so a crash mid-run is a non-event: restart, read the watermark, resume from the last committed batch. The escape hatch is the raw_metadata JSONB column on each table, which stores the entire catalog leaf verbatim rather than just the columns I parse out. That decision paid for itself when the classifier later needed licenseUrl, a field I never columnized: enrichment just reads it back out of JSON instead of triggering a full re-walk of the catalog over the network. A max_pages config knob caps a run as a backfill safety valve, and a separate one-shot in sync/backfill_deletes.py reuses the same walker against its own sync_state row.
@dg.asset(name="raw_nuget_packages", group_name="nuget", compute_kind="python")
def raw_nuget_packages(context, config: NugetSyncConfig) -> dg.MaterializeResult:
    result = asyncio.run(_run_and_cleanup())  # sync_nuget(), then close_pool()
    ...
    return dg.MaterializeResult(metadata=meta)
# inside the worker, one batch, one transaction:
async with transaction() as conn, conn.cursor() as cur:
    await cur.execute(_PACKAGE_INSERT_HEAD + values_sql + _PACKAGE_INSERT_TAIL, params)
    # ... versions, then deletes ...
    await advance_watermark(conn, SOURCE, latest_watermark[0], ...)
    await heartbeat_run(conn, ctx.run_id, ...)
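To see why sharing one transaction makes a crash harmless, here is a minimal, self-contained sketch of the same idea. It is not the repo's worker: it swaps Postgres for an in-memory SQLite database so it runs anywhere, and the table layout, column names, and the `write_batch` helper are all hypothetical.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create a throwaway database with a data table and a watermark table."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE packages (package_id TEXT PRIMARY KEY, latest_version TEXT NOT NULL);
        CREATE TABLE sync_state (source TEXT PRIMARY KEY, watermark TEXT NOT NULL);
        """
    )
    return conn


UPSERT_PACKAGE = """
    INSERT INTO packages (package_id, latest_version) VALUES (?, ?)
    ON CONFLICT (package_id) DO UPDATE SET latest_version = excluded.latest_version
"""
UPSERT_WATERMARK = """
    INSERT INTO sync_state (source, watermark) VALUES (?, ?)
    ON CONFLICT (source) DO UPDATE SET watermark = excluded.watermark
"""


def write_batch(conn, rows, watermark, fail_before_commit=False):
    """Write one batch and advance the watermark in a single transaction."""
    with conn:  # commits on success, rolls back if the body raises
        conn.executemany(UPSERT_PACKAGE, rows)
        if fail_before_commit:
            raise RuntimeError("simulated crash mid-batch")
        conn.execute(UPSERT_WATERMARK, ("nuget", watermark))


def read_watermark(conn):
    """Return the last committed watermark, or None if nothing was committed."""
    row = conn.execute(
        "SELECT watermark FROM sync_state WHERE source = ?", ("nuget",)
    ).fetchone()
    return row[0] if row else None
```

If `write_batch` raises partway through, neither the rows nor the watermark are committed, so a restart that calls `read_watermark` resumes from the previous batch and simply replays the failed one. The upsert makes that replay idempotent.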
Reference data
The reference asset is the boring one, and that’s the point: SPDX publishes one ~730-entry licenses.json that changes maybe quarterly, so the worker fetches it, validates with Pydantic, and rewrites the whole table in a single transaction, no diffing or pagination. The watermark is the licenseListVersion and the caught-up check is plain string equality, not an ordering comparison, because versions only move forward and lexicographic ordering sorts 3.10.0 before 3.9.0, which is a footgun. Out of that table the classifier builds two structures: a trivial dictionary keyed by license id and an index from license URL to license id.
The URL index takes more care than it first looks. Matching a license URL against SPDX sounds like a plain dictionary lookup, but the same license shows up as http://, https://, and www.-prefixed variants with and without a trailing slash, so I had to write _normalize_url and decide exactly how aggressive it gets. It drops the scheme, lowercases the host, and strips www. and a trailing slash but deliberately leaves the path case alone, because .../licenses/MIT is not guaranteed to resolve the same as .../mit and over-normalizing would silently merge licenses that aren’t the same. Normalization is lossy: every rule trades a missed match against a wrong one. I tuned toward fewer wrong matches, and how many real matches slip through to unknown is a number Part 5 gets to find.
from urllib.parse import urlparse

def _normalize_url(url: str) -> str:
    parsed = urlparse(url.strip())
    if not parsed.scheme or not parsed.netloc:
        return ""  # not an absolute URL, nothing to match on
    host = parsed.netloc.lower()
    if host.startswith("www."):
        host = host[4:]
    path = parsed.path.rstrip("/")  # scheme dropped, host lowered; path case kept
    return f"{host}{path}"
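As a rough illustration of how the normalized form feeds the URL index, the sketch below rebuilds the normalizer and layers a hypothetical `build_url_index` on top of it. The input shape (license id mapped to a list of URLs), the "first license wins" tie-break, and the two sample entries are assumptions made for the example, not the repo's actual loader or data.

```python
from urllib.parse import urlparse


def normalize_url(url: str) -> str:
    """Reduce a license URL to 'host/path': no scheme, no www., no trailing slash."""
    parsed = urlparse(url.strip())
    if not parsed.scheme or not parsed.netloc:
        return ""
    host = parsed.netloc.lower()
    if host.startswith("www."):
        host = host[4:]
    return f"{host}{parsed.path.rstrip('/')}"  # path case is left untouched


def build_url_index(licenses: dict) -> dict:
    """Map each normalized URL to a license id (hypothetical helper)."""
    index = {}
    for license_id, urls in licenses.items():
        for url in urls:
            key = normalize_url(url)
            if key:
                # Assumption: if two licenses share a URL, the first one keeps it.
                index.setdefault(key, license_id)
    return index


# Illustrative entries only, not the real SPDX list.
licenses = {
    "MIT": ["https://opensource.org/license/mit/"],
    "Apache-2.0": ["https://www.apache.org/licenses/LICENSE-2.0"],
}
index = build_url_index(licenses)

# Scheme, host case, www. and the trailing slash all collapse to one key.
print(index.get(normalize_url("HTTP://WWW.Apache.ORG/licenses/LICENSE-2.0/")))  # Apache-2.0
# A path that differs only in case deliberately does not match.
print(index.get(normalize_url("https://apache.org/licenses/license-2.0")))  # None
```

The second lookup is the conservative trade-off in action: it is a missed match rather than a possibly wrong one.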
The classification engine
The centerpiece is a pure function. classify takes a license expression, a URL, the SPDX dictionary, and the URL index, and returns a verdict in a strict order: the expression wins, the URL is the fallback, and a package with neither is unknown.
The expression does most of the work. A NuGet licenseExpression is a tiny language, not a bare id (Apache-2.0 OR MIT, Apache-2.0+, GPL-2.0-only WITH Classpath-exception-2.0), so a regex tokenizer first reduces it to the base license ids it names. That means dropping parentheses and the AND/OR words, stripping a trailing +, and keeping the base license of a WITH clause while discarding the exception.
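A minimal sketch of that reduction might look like the following. The function name `base_license_ids` and the exact token pattern are my own assumptions for illustration, and the sketch treats only uppercase AND, OR, and WITH as operators.

```python
import re

# A token is a run of the characters that can appear in a license id or operator.
# Parentheses and whitespace are not in the class, so they are dropped for free.
_TOKEN = re.compile(r"[A-Za-z0-9.\-+]+")
_OPERATORS = {"AND", "OR"}


def base_license_ids(expression: str) -> list[str]:
    """Reduce a license expression to the base license ids it names."""
    ids = []
    skip_next = False
    for token in _TOKEN.findall(expression):
        if skip_next:
            # The token right after WITH is the exception id: discard it.
            skip_next = False
            continue
        if token == "WITH":
            skip_next = True
            continue
        if token in _OPERATORS:
            continue
        ids.append(token.rstrip("+"))  # "Apache-2.0+" names the base license Apache-2.0
    return ids


print(base_license_ids("Apache-2.0 OR MIT"))  # ['Apache-2.0', 'MIT']
print(base_license_ids("GPL-2.0-only WITH Classpath-exception-2.0"))  # ['GPL-2.0-only']
```

Because lowercase operators are not special-cased here, a malformed expression like `MIT or Apache-2.0` yields `or` as a term, which no license table will recognize.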
With a clean list of ids in hand, the decision is short:
- If SPDX doesn’t recognize a term, the package is unknown. The one exception is LicenseRef-*, SPDX’s marker for a custom license, which on NuGet is almost always proprietary.
- If any recognized term is OSI-approved, it’s open_source. (A consumer of MIT OR CC-BY-NC-4.0 can just pick MIT.)
- Otherwise it’s proprietary.
osi_terms = [t for t in terms if spdx_dict[t].is_osi_approved]
primary = osi_terms[0] if osi_terms else terms[0]
if osi_terms:
    return Classification("open_source", primary, normalized, True,
                          f"{primary} is OSI-approved")
return Classification("proprietary", primary, normalized, False,
                      f"declared license {primary} is not OSI-approved")
When there’s no expression, the URL fallback normalizes the license URL, looks it up in the SPDX-derived index, and as a last resort matches a couple of hardcoded Microsoft EULA patterns (go.microsoft.com/fwlink, aka.ms) that never show up in SPDX seeAlso but are unmistakably proprietary.
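The fallback order can be sketched as a small pure function. Everything here is an assumption made for illustration: the name `classify_by_url`, the `(verdict, reasoning)` return shape, the prefix-matching rule for the Microsoft patterns, and the choice to call a URL that maps to a non-OSI license proprietary. It expects a URL already reduced to the `host/path` form the normalizer produces.

```python
# Assumption: EULA links are matched by prefix on the normalized "host/path" form.
MICROSOFT_EULA_PREFIXES = ("go.microsoft.com/fwlink", "aka.ms/")


def classify_by_url(normalized_url: str, url_index: dict, osi_approved_ids: set) -> tuple:
    """Return (verdict, reasoning) for a package that has only a license URL."""
    if not normalized_url:
        return ("unknown", "no license expression and no usable license URL")

    license_id = url_index.get(normalized_url)
    if license_id is not None:
        if license_id in osi_approved_ids:
            return ("open_source", f"license URL maps to {license_id}, which is OSI-approved")
        return ("proprietary", f"license URL maps to {license_id}, which is not OSI-approved")

    # Last resort: links that never appear in SPDX seeAlso but point at EULAs.
    if normalized_url.startswith(MICROSOFT_EULA_PREFIXES):
        return ("proprietary", "license URL matches a Microsoft EULA pattern")

    return ("unknown", "license URL is not in the SPDX-derived index")


# Illustrative index and OSI set, not real reference data.
url_index = {"opensource.org/license/mit": "MIT"}
osi_ids = {"MIT"}

print(classify_by_url("opensource.org/license/mit", url_index, osi_ids)[0])  # open_source
print(classify_by_url("go.microsoft.com/fwlink", url_index, osi_ids)[0])  # proprietary
print(classify_by_url("example.com/license", url_index, osi_ids)[0])  # unknown
```

Checking the index before the hardcoded patterns keeps the SPDX data authoritative: the pattern list only ever decides rows the index could not.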
A handful of edge cases are why every verdict also carries a reasoning string. A malformed expression resolves to unknown rather than crashing. A valid SPDX id can still be non-OSI, so “is this a real license” and “is this open source” turn out to be separate questions. And treating LicenseRef-* as a confident proprietary call, instead of a shrug, is what moved the most rows out of unknown. The worker also tallies how each row was decided, by expression, URL, or nothing, which is how Part 5 gets to ask how big unknown really is.
Schedules, sensors, and automation
The cadence table from Part 3 maps almost straight onto code. NuGet and SPDX are plain ScheduleDefinitions, one firing every six hours and the other weekly on Mondays, both pinned to UTC and shipped stopped so nothing fires the moment the repo loads. The classifier, conspicuously, has no schedule: its cadence row read “after either upstream updates,” and that’s not a cron expression but an AutomationCondition.eager() on the asset, so Dagster materializes enrichment the instant either upstream does and I never have to keep a third schedule in sync with the two it depends on. The dependency graph is the schedule, and that’s the line that sold me on Dagster over cron-and-glue. The safety net the table footnoted is a staleness sensor that re-triggers NuGet if the last successful run in the audit table is older than twelve hours (two missed cycles, because one slip is noise and two is a pattern), backed by a sibling zombie sensor that marks sync_runs failed when a worker dies without reporting completion, so “last successful run” stays a number worth trusting. All of it ships stopped by default.
@dg.asset(
    name="enriched_nuget_package_oss_status",
    deps=[raw_nuget_packages, raw_spdx_licenses],
    automation_condition=dg.AutomationCondition.eager(),  # fires when either upstream updates
)
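The decision inside the staleness sensor is simple enough to isolate from Dagster entirely. The sketch below covers only that decision. The function name `should_retrigger` is hypothetical, and treating "no successful run on record" as stale is my assumption, not something the audit table logic is known to do.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Two missed six-hour cycles.
STALENESS_THRESHOLD = timedelta(hours=12)


def should_retrigger(
    last_success: Optional[datetime],
    now: datetime,
    threshold: timedelta = STALENESS_THRESHOLD,
) -> bool:
    """Return True when the last successful run is older than the threshold."""
    if last_success is None:
        # Assumption: a source that has never succeeded counts as stale.
        return True
    return now - last_success > threshold


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(should_retrigger(now - timedelta(hours=7), now))   # False: one slipped cycle
print(should_retrigger(now - timedelta(hours=13), now))  # True: two missed cycles
```

Keeping the comparison in a plain function means it can be unit-tested with fixed timestamps, with the sensor reduced to fetching `last_success` and acting on the boolean.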
Data quality gates and freshness
The quality checks Part 3 called for land as four layers in code, from the database outward:
- A Postgres CHECK constraint, so the classification column can only ever hold open_source/proprietary/unknown. A typo fails at the boundary, not three weeks later as a confusing dashboard number.
- Pydantic validation in the sync workers, rejecting malformed upstream payloads before they reach the database.
- A runtime guard that refuses to classify if the SPDX table loaded empty.
- Dagster asset checks, which are the ones that turn red in the UI.
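The innermost layer and the runtime guard are easy to demonstrate in isolation. This sketch uses an in-memory SQLite database in place of Postgres so it runs standalone. The table name, column names, and both helper functions are hypothetical stand-ins rather than the repo's schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE oss_status (
        package_id TEXT PRIMARY KEY,
        classification TEXT NOT NULL
            CHECK (classification IN ('open_source', 'proprietary', 'unknown'))
    )
    """
)


def insert_status(conn: sqlite3.Connection, package_id: str, classification: str) -> bool:
    """Insert one verdict; return False if the database rejects the value."""
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                "INSERT INTO oss_status (package_id, classification) VALUES (?, ?)",
                (package_id, classification),
            )
        return True
    except sqlite3.IntegrityError:
        return False


def require_reference_data(spdx_rows: list) -> None:
    """Runtime guard: refuse to classify against an empty license table."""
    if not spdx_rows:
        raise RuntimeError("SPDX table is empty; refusing to classify")


print(insert_status(conn, "Example.Core", "open_source"))    # True
print(insert_status(conn, "Example.Extras", "open-source"))  # False: hyphen typo rejected
```

The typo never reaches a table anyone queries, which is the point of pushing the constraint down to the database instead of trusting every writer.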
The load-bearing asset check is the SPDX OSI floor. SPDX reports around 150 OSI-approved licenses in steady state, and the whole open-source verdict leans on that one boolean. If an upstream schema change quietly zeroed it out, every package with a recognized license would flip to proprietary and nothing would look broken. So the check is blocking=True: drop below 100 and the SPDX asset goes red, which stops enrichment from running on poisoned reference data.
@dg.asset_check(asset=raw_spdx_licenses, name="raw_spdx_licenses_osi_floor", blocking=True)
def raw_spdx_licenses_osi_floor() -> dg.AssetCheckResult:
    osi_count = _run(_check)  # SELECT count(*) WHERE is_osi_approved
    return dg.AssetCheckResult(
        passed=osi_count >= SPDX_OSI_FLOOR,  # SPDX_OSI_FLOOR = 100, baseline ~150
        severity=dg.AssetCheckSeverity.ERROR,
        metadata={"osi_approved_count": osi_count, "floor": SPDX_OSI_FLOOR},
    )
The unknown-ratio check is the deliberate exception. It’s a WARN, not an error, set at a frank 60%, because v1 is known to leave a real chunk of packages in unknown (closing that gap with license-file probing is a v2 job). The point is to catch a regression, not to enforce a bar v1 was always going to miss.
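Stripped of the Dagster wrapper, the ratio logic could be sketched like this. The `RatioCheck` dataclass and `check_unknown_ratio` function are hypothetical names, and two details are my assumptions: a ratio of exactly 60% passes, and an empty table passes with a ratio of zero.

```python
from dataclasses import dataclass

UNKNOWN_RATIO_THRESHOLD = 0.60


@dataclass(frozen=True)
class RatioCheck:
    passed: bool
    severity: str
    unknown_ratio: float


def check_unknown_ratio(
    counts: dict, threshold: float = UNKNOWN_RATIO_THRESHOLD
) -> RatioCheck:
    """Compare the share of 'unknown' verdicts against a warning threshold."""
    total = sum(counts.values())
    # Assumption: with no rows at all there is nothing to warn about.
    ratio = counts.get("unknown", 0) / total if total else 0.0
    # Severity is always WARN: a failure flags a regression but blocks nothing.
    return RatioCheck(passed=ratio <= threshold, severity="WARN", unknown_ratio=ratio)


result = check_unknown_ratio({"open_source": 20, "proprietary": 0, "unknown": 80})
print(result.passed, result.severity)  # False WARN
```

The counts dictionary here is made-up input for the example, not a measurement from the catalog.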
Freshness is a separate set of checks, built with build_last_update_freshness_checks. Each threshold is just its upstream’s cadence plus a margin: twelve hours for NuGet (the six-hour schedule plus one cycle of slack, the same logic as the staleness sensor), ten days for SPDX, and twenty-four hours for enrichment.
What’s next
That’s the wiring. The plan from Part 3 is running code now: two raw assets ingesting NuGet and SPDX, an enrichment asset that classifies on top of them, schedules and a sensor driving the cadence, and four layers of checks keeping bad data out. Everything is in place except the one thing the code can’t tell you on its own, which is what the catalog actually looks like once you run it. How often do we land in unknown? How many packages declare a license at all? What’s the real open-source-versus-proprietary split? That’s Part 5, where we deploy this, let it run against the full catalog, and read what the data says.