To implement checkpointing for interrupted spatial batches, persist a lightweight atomic state file that maps each spatial asset — file path, feature ID, or tile coordinate — to a completion flag. Wrap the processing loop in a SIGINT/SIGTERM handler that flushes state before exit, and on restart deserialize the file, filter out completed items, and resume from the exact offset. This eliminates redundant I/O and prevents partial writes in formats like GeoPackage and Shapefile.
This page is part of the Progress Tracking in Batch Jobs guide, which sits inside the broader Spatial Batch Processing & Async Workflows reference.
Prerequisites
| Requirement | Detail |
|---|---|
| Python | 3.9+ (built-in generic annotations such as dict[str, bool]; os.replace itself has been available since 3.3) |
| Standard library only | json, os, signal, pathlib, logging — no extra pip installs |
| Spatial I/O | pyogrio and geopandas for the processing steps shown below (plus pyarrow for use_arrow=True); rasterio only if you adapt the pattern to rasters; pip install pyogrio geopandas pyarrow |
| Operating system | os.replace (added in Python 3.3) is an atomic rename on POSIX systems such as Linux and macOS; on Windows it also overwrites an existing destination file |
For the async variant of this pattern — processing GeoJSON concurrently while checkpointing — see Processing 100k GeoJSON Files with Python asyncio.
Checkpoint Lifecycle
The diagram below shows the three states each spatial asset moves through — and the two points where a crash can occur — to illustrate why the checkpoint write must happen after the format-specific commit.
Complete Working Implementation
The script below is self-contained. It reprojects a directory of GeoPackage files to EPSG:4326 using pyogrio, checkpoints each completed file to spatial_batch_state.json, and resumes cleanly on restart. Replace the process_one_asset function body with your actual transformation logic.
#!/usr/bin/env python3
"""
Spatial batch processor with atomic checkpointing.
Usage:
python checkpoint_batch.py /data/raw_gpkg /data/output_gpkg
# Kill with Ctrl-C, then re-run — it resumes from the last completed file.
Requirements:
pip install pyogrio geopandas pyarrow
"""
import json
import logging
import os
import signal
import sys
from pathlib import Path
from typing import Optional
import pyogrio
import geopandas as gpd
# Configure root logging once at import time: INFO and above, formatted as
# "timestamp | LEVEL | message", written to stderr.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(message)s",
    stream=sys.stderr,
)
log = logging.getLogger(__name__)

CHECKPOINT_FILE = "spatial_batch_state.json"
TARGET_CRS = "EPSG:4326"


class SpatialCheckpoint:
    """Atomic, signal-aware checkpoint for spatial batch jobs.

    ``state`` maps an asset ID to a completion flag and is persisted as JSON
    at ``path``. Every save writes a sibling temp file and then swaps it in
    with ``os.replace`` so a crash mid-write never truncates the live file.
    Constructing an instance installs SIGINT/SIGTERM handlers that set
    ``interrupted`` and flush the state.
    """

    # True while _save() is writing. Class-level default so the guard also
    # works on instances whose __init__ was bypassed.
    _saving = False

    def __init__(self, path: str = CHECKPOINT_FILE) -> None:
        """Load any existing state from *path* and install signal handlers."""
        self.path = Path(path)
        self.state: dict[str, bool] = self._load()
        self.interrupted = False
        # Flush on Ctrl-C (SIGINT) and orchestrator shutdown (SIGTERM)
        signal.signal(signal.SIGINT, self._handle_signal)
        signal.signal(signal.SIGTERM, self._handle_signal)

    def _load(self) -> dict[str, bool]:
        """Return the persisted state, or ``{}`` if it is missing or unusable."""
        try:
            with open(self.path, encoding="utf-8") as fh:
                loaded = json.load(fh)
        except FileNotFoundError:
            return {}
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Truncated or invalid file (e.g. written non-atomically by
            # another tool). Start fresh: every asset is treated as pending
            # and will be reprocessed.
            log.warning("Checkpoint file corrupt; starting fresh: %s", self.path)
            return {}
        if not isinstance(loaded, dict):
            # Valid JSON but not a mapping; get_pending() needs dict lookups.
            log.warning("Checkpoint file corrupt; starting fresh: %s", self.path)
            return {}
        return loaded

    def _save(self) -> None:
        """Persist ``state`` via temp file + ``os.replace``."""
        # Append ".tmp" to the full name (state.json -> state.json.tmp) so two
        # checkpoints sharing a stem in one directory cannot collide.
        tmp = self.path.with_name(self.path.name + ".tmp")
        self._saving = True
        try:
            with open(tmp, "w", encoding="utf-8") as fh:
                json.dump(self.state, fh, indent=2)
                # Push the bytes to disk before the rename makes them live.
                fh.flush()
                os.fsync(fh.fileno())
            # Atomic rename on POSIX; os.replace is available since Python 3.3
            os.replace(tmp, self.path)
        finally:
            self._saving = False

    def _handle_signal(self, signum: int, frame: object) -> None:
        """Record the shutdown request and flush state if it is safe to do so."""
        log.info("Signal %d received — flushing checkpoint before exit.", signum)
        self.interrupted = True
        # Do NOT sys.exit() here; let the loop's `if checkpoint.interrupted` branch
        # exit cleanly so in-flight writes can finish their current iteration.
        if self._saving:
            # The handler interrupted a _save() already in progress. Starting
            # a second write to the same temp file would corrupt it, and the
            # in-flight save persists the current state anyway.
            return
        try:
            self._save()
        except OSError as exc:
            # Never let a failed flush raise out of the handler into whatever
            # code happened to be running when the signal arrived.
            log.error("Could not flush checkpoint %s: %s", self.path, exc)

    def mark_complete(self, asset_id: str) -> None:
        """Call this AFTER the format-specific commit succeeds — never before."""
        self.state[asset_id] = True
        self._save()

    def get_pending(self, all_assets: list[str]) -> list[str]:
        """Return only assets that have not yet been successfully processed."""
        return [a for a in all_assets if not self.state.get(a, False)]

    @property
    def completed_count(self) -> int:
        """Number of entries in ``state`` whose flag is truthy."""
        return sum(1 for v in self.state.values() if v)
def process_one_asset(src_path: Path, dst_dir: Path) -> None:
    """
    Read a GeoPackage with pyogrio, reproject to EPSG:4326, write output.

    pyogrio is preferred over fiona for vector I/O: it uses Arrow-backed
    columnar reads that are ~5–10x faster on large feature sets.

    Args:
        src_path: Source GeoPackage to read.
        dst_dir: Directory that receives the output; the file keeps the
            source file's name.

    Raises:
        ValueError: If the source layer carries no CRS.
    """
    gdf: gpd.GeoDataFrame = pyogrio.read_dataframe(
        str(src_path),
        use_arrow=True,  # Arrow-backed read; requires pyogrio >= 0.6
    )
    if gdf.crs is None:
        raise ValueError(f"No CRS on {src_path.name} — cannot reproject to {TARGET_CRS}")
    # Compare CRS objects, not their string form: str(gdf.crs) can be WKT or
    # another spelling of the same CRS, which a string comparison treats as
    # different and reprojects needlessly.
    if not gdf.crs.equals(TARGET_CRS):
        gdf = gdf.to_crs(TARGET_CRS)  # explicit EPSG:4326 coercion
    dst_path = dst_dir / src_path.name
    # pyogrio.write_dataframe wraps the underlying GPKG write in a single
    # transaction; on completion the SQLite COMMIT is issued before returning.
    pyogrio.write_dataframe(gdf, str(dst_path), driver="GPKG")
def process_spatial_batch(src_dir: Path, dst_dir: Path) -> int:
    """Process every ``*.gpkg`` under *src_dir*, resuming from the checkpoint.

    Args:
        src_dir: Directory scanned (non-recursively) for ``*.gpkg`` files.
        dst_dir: Output directory; created if it does not exist.

    Returns:
        0 when every pending asset was processed, 1 when the run was
        interrupted or at least one asset failed, 2 when no input files
        were found.
    """
    dst_dir.mkdir(parents=True, exist_ok=True)
    # Absolute paths keep asset IDs stable regardless of how src_dir was
    # spelled on the command line. Checkpoints written earlier with relative
    # keys no longer match, so those assets are reprocessed once.
    all_assets = sorted(str(p.resolve()) for p in src_dir.glob("*.gpkg"))
    if not all_assets:
        log.error("No .gpkg files found under %s", src_dir)
        return 2  # POSIX exit 2 = bad arguments / no input
    checkpoint = SpatialCheckpoint()
    pending = checkpoint.get_pending(all_assets)
    # Count only this batch's assets; the checkpoint file may also hold
    # entries that belong to other source directories.
    log.info(
        "Batch: %d total, %d already done, %d pending.",
        len(all_assets),
        len(all_assets) - len(pending),
        len(pending),
    )
    failed = 0
    for asset in pending:
        if checkpoint.interrupted:
            log.info("Graceful shutdown — stopping before %s.", Path(asset).name)
            return 1  # POSIX exit 1 = interrupted / non-fatal error
        src_path = Path(asset)
        log.info("Processing %s", src_path.name)
        try:
            process_one_asset(src_path, dst_dir)
            # mark_complete AFTER the write transaction commits
            checkpoint.mark_complete(asset)
        except Exception as exc:
            # Isolate failures: one bad file does not abort the batch.
            # Asset stays pending; it will be retried on the next run.
            failed += 1
            log.error("Failed %s: %s", src_path.name, exc)
            continue
        log.info("Done: %s", src_path.name)
    done = len(all_assets) - len(checkpoint.get_pending(all_assets))
    log.info("Batch finished. %d/%d assets processed.", done, len(all_assets))
    # A batch with failed assets is not a success: report the non-fatal
    # error code so orchestrators know a re-run is needed.
    return 1 if failed else 0
if __name__ == "__main__":
    # argparse is only needed when the file is run as a script.
    import argparse

    cli = argparse.ArgumentParser(description="Resumable spatial batch processor")
    # Both positionals are parsed straight into pathlib.Path objects.
    for arg_name, arg_help in (
        ("src_dir", "Directory of source .gpkg files"),
        ("dst_dir", "Destination directory for output .gpkg files"),
    ):
        cli.add_argument(arg_name, type=Path, help=arg_help)
    opts = cli.parse_args()
    # The batch function's return value becomes the process exit status.
    exit_code = process_spatial_batch(opts.src_dir, opts.dst_dir)
    sys.exit(exit_code)
Step Annotations
- `os.replace(str(tmp), str(self.path))` — crash-safe state flush. `os.replace` performs an atomic rename at the OS level. If the process crashes while writing to the `.tmp` file, the original `spatial_batch_state.json` is untouched. A plain `open(path, "w")` write can leave a zero-byte or truncated JSON file that raises `json.JSONDecodeError` on the next run. The `_load` method’s `JSONDecodeError` catch is a last-resort safety net, not a substitute for atomicity.
- `signal.signal(signal.SIGINT, self._handle_signal)` — graceful `Ctrl-C` handling. The handler sets `self.interrupted = True` and calls `_save()` but does not call `sys.exit()`. This lets the current iteration of the loop finish its `process_one_asset` call (including the SQLite COMMIT) before the loop’s `if checkpoint.interrupted` check exits cleanly. Forcing an immediate exit from inside a signal handler risks leaving a half-written GeoPackage on disk.
- `pyogrio.read_dataframe(..., use_arrow=True)` — columnar I/O for large feature sets. Arrow-backed reads via `pyogrio` load feature attribute tables directly into columnar memory without the row-by-row Python overhead of `fiona`. For files with hundreds of thousands of features and wide attribute schemas — common in parcel or building footprint datasets — this is 5–10x faster than `fiona.open`. The Chunked Vector Data Reading guide covers how to extend this pattern for datasets that exceed available RAM.
- `gdf.to_crs(TARGET_CRS)` — explicit EPSG:4326 coercion before write. Spatial format corruption frequently originates from silent CRS mismatches: a source file in EPSG:32632 (UTM zone 32N) written to a GeoPackage without reprojection, then consumed by a downstream tool that assumes EPSG:4326. Checking `gdf.crs is None` and reprojecting explicitly eliminates this class of corruption. See Error Handling in Spatial Pipelines for a structured approach to logging CRS mismatches across a batch.
- `checkpoint.mark_complete(asset)` placed after `process_one_asset` returns. `pyogrio.write_dataframe` issues its SQLite COMMIT before returning, so by the time `mark_complete` is called, the on-disk GeoPackage is in a consistent state. If you use a lower-level `sqlite3` connection directly, call `con.commit()` explicitly before `mark_complete` — never inside a `finally` block that also calls `mark_complete`, as a failed COMMIT would then incorrectly mark the asset as done.
- `return 1` on `checkpoint.interrupted`, `return 2` on no input, `return 0` on success. These follow the POSIX convention used throughout Spatial Batch Processing & Async Workflows: exit 0 = success, 1 = runtime interruption or non-fatal error, 2 = bad arguments or missing input. Orchestrators (Airflow, Prefect, shell scripts) can test `$?` to distinguish a clean resume-ready stop from a configuration problem.
Named Gotcha: Shapefiles and Orphaned Sidecars
GeoPackage (SQLite-backed) is transactional; if a write is interrupted, SQLite rolls back cleanly and the file is undamaged. Shapefiles are not transactional. An interrupted write leaves behind partial .shp, .shx, .dbf, and .cpg files. On the next run, pyogrio.read_dataframe on the orphaned set raises either pyogrio.errors.DataSourceError or returns a GeoDataFrame with zero features, neither of which is obviously the cause of the downstream failure.
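Fix: write Shapefiles to a temporary staging path, validate the feature count, then move the complete sidecar set into place (a multi-file move cannot be a single atomic operation, so move each file only after validation succeeds):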
import shutil
import tempfile
from pathlib import Path
import pyogrio
import geopandas as gpd
def write_shapefile_safe(gdf: gpd.GeoDataFrame, dst_path: Path) -> None:
    """
    Write a Shapefile via a staging directory to avoid orphaned sidecars.

    dst_path should be the .shp file; all sidecars share the same stem.
    The dataset is written and validated in a staging directory created
    next to the destination, then each file is renamed into place.

    Raises:
        RuntimeError: If the staged file's feature count differs from gdf's.
    """
    # Function-scope import: this snippet's import block does not include os.
    import os

    stem = dst_path.stem
    dst_dir = dst_path.parent
    dst_dir.mkdir(parents=True, exist_ok=True)
    # Stage inside the destination directory so the final moves are
    # same-filesystem renames. A staging dir under the system temp location
    # may sit on another filesystem, where a move becomes copy + delete.
    with tempfile.TemporaryDirectory(dir=dst_dir, prefix=f".{stem}-staging-") as staging:
        staging_dir = Path(staging)
        staging_path = staging_dir / f"{stem}.shp"
        pyogrio.write_dataframe(gdf, str(staging_path), driver="ESRI Shapefile")
        # Verify the staged output before moving
        staged_gdf = pyogrio.read_dataframe(str(staging_path))
        if len(staged_gdf) != len(gdf):
            raise RuntimeError(
                f"Staged shapefile feature count mismatch: "
                f"expected {len(gdf)}, got {len(staged_gdf)}"
            )
        # Move sidecars (.shx, .dbf, .cpg, .prj, ...) first and the .shp last,
        # so the new .shp never appears without its companions. The staging
        # dir holds only this dataset, so every file in it belongs to the set.
        staged_files = sorted(
            staging_dir.iterdir(),
            key=lambda p: p.suffix.lower() == ".shp",
        )
        for sidecar in staged_files:
            # os.replace overwrites an existing destination on every platform.
            os.replace(sidecar, dst_dir / sidecar.name)
Only call checkpoint.mark_complete(asset) after write_shapefile_safe returns without raising.
Verification
After an interrupted run and a resumed run, confirm the checkpoint state is consistent:
# Inspect the checkpoint file — all completed assets should show true
python3 - <<'EOF'
import json
import pathlib
import sys

checkpoint_path = pathlib.Path("spatial_batch_state.json")
if not checkpoint_path.exists():
    print("No checkpoint file found.")
    sys.exit(0)

# Checkpoint layout: {asset_id: completion_flag}
entries = json.loads(checkpoint_path.read_text())
completed = [asset for asset, is_done in entries.items() if is_done]
print(f"Checkpoint: {len(completed)}/{len(entries)} assets marked complete")

# Flag completed entries whose path no longer exists on disk
for asset in completed:
    asset_path = pathlib.Path(asset)
    if asset_path.exists():
        continue
    print(f" WARNING: checkpoint entry exists but file missing: {asset_path.name}")
EOF
# Cross-check: count output files vs checkpoint entries
python3 - <<'EOF'
import json
import pathlib
import sys

cp = pathlib.Path("spatial_batch_state.json")
if not cp.exists():
    print("No checkpoint file found.")
    sys.exit(0)
state = json.loads(cp.read_text())

# Checkpoint keys are SOURCE paths, while outputs live in a different
# directory under the same file name. Compare by file name; comparing full
# paths can never match and would report every asset as missing and extra.
done = {pathlib.Path(k).name for k, v in state.items() if v}
outputs = {p.name for p in pathlib.Path("/data/output_gpkg").glob("*.gpkg")}

missing_outputs = done - outputs
extra_outputs = outputs - done
if missing_outputs:
    print("Checkpoint says done but output file absent:")
    for name in sorted(missing_outputs):
        print(f" {name}")
if extra_outputs:
    print("Output file exists but not in checkpoint (may be from a previous run):")
    for name in sorted(extra_outputs):
        print(f" {name}")
if not missing_outputs and not extra_outputs:
    print("Checkpoint and output directory are in sync.")
EOF
FAQ
How often should I flush the checkpoint — after every asset, or in batches?
Flushing after every asset gives the tightest rework bound: a crash costs at most the one asset that was in flight. The overhead of an atomic JSON write (a small file write plus a rename) is negligible compared to a GDAL or pyogrio write. For high-throughput pipelines processing thousands of small tiles per second, batch the checkpoint flush every N items — record each completion in memory with checkpoint.state[asset] = True instead of calling mark_complete, and call checkpoint._save() only when processed_count % 50 == 0 and once more after the loop — and accept up to N assets of rework on crash.
Can I run two workers against the same checkpoint file?
Not safely with a plain JSON file — concurrent writes will corrupt it. For multi-worker resumable batches, replace the JSON file with a SQLite database and use INSERT OR IGNORE INTO completed (asset_id) VALUES (?) as the mark-complete operation. SQLite’s WAL mode supports multiple concurrent readers and one writer without lock contention for this workload. For distributed workers across machines, use Redis SETNX or PostgreSQL INSERT ... ON CONFLICT DO NOTHING.
What should I use as the asset ID — absolute path or relative path?
Always use absolute paths (str(Path(asset).resolve())). Relative paths break when the script is invoked from a different working directory or when the source directory is moved between runs. If assets can be re-named (e.g. ingested from an object-store URI), use a stable content hash instead: hashlib.sha256(Path(asset).read_bytes()).hexdigest()[:16].
How do I handle assets that should be skipped permanently (not retried)?
Add a separate failed key to the state dict alongside the completed key, or use a tri-value state ("pending", "done", "failed"). Update get_pending to filter out both "done" and "failed" entries. Log failures to a structured JSON error log (see Logging Spatial Transformation Results to Structured JSON) so they can be audited separately from the checkpoint state.
Related
- Progress Tracking in Batch Jobs — the parent guide covering thread-safe counters, Rich progress dashboards, and async-compatible renderers for the pipelines this checkpointing pattern protects
- Error Handling in Spatial Pipelines — structured logging, retry strategies, and exit-code conventions that complement checkpoint-based resumption
- Chunked Vector Data Reading — how to stream large vector datasets in fixed-size batches, a natural companion to per-chunk checkpointing