PQ Signature Performance

Post-Quantum Signature Performance

Analysis of post-quantum cryptographic signature performance in Lean Consensus clients.

This notebook examines:

  • Attestation signing time (p50, p95, p99)
  • Attestation verification time
  • Attestations included in aggregated signatures
  • Signature aggregation time
  • Aggregated signature verification time
  • Performance comparison across clients
Show code
import json
from pathlib import Path

import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Set default renderer for static HTML output
import plotly.io as pio
pio.renderers.default = "notebook"
Show code
# Resolve devnet_id
# NOTE(review): devnet_id is not defined in this cell; presumably it is
# injected by a parameters cell earlier in the notebook - confirm.
DATA_DIR = Path("../data")

if devnet_id is None:
    # No devnet requested: fall back to the last entry of the manifest,
    # which is treated as the latest devnet.
    devnets_path = DATA_DIR / "devnets.json"
    if not devnets_path.exists():
        raise ValueError("No devnets.json found. Run 'just detect-devnets' first.")
    with open(devnets_path) as f:
        devnets = json.load(f).get("devnets", [])
    if not devnets:
        # Without this check devnet_id would stay None and the path join
        # below would fail with an unrelated-looking TypeError.
        raise ValueError("devnets.json lists no devnets. Run 'just detect-devnets' first.")
    devnet_id = devnets[-1]["id"]  # Latest
    print(f"Using latest devnet: {devnet_id}")

# Directory holding the parquet files for the selected devnet.
DEVNET_DIR = DATA_DIR / devnet_id
print(f"Loading data from: {DEVNET_DIR}")
Loading data from: ../data/pqdevnet-20260202T1337Z
Show code
# Load devnet metadata
# devnet_info is the manifest entry whose "id" equals devnet_id, or None when
# the manifest has no such entry.
with open(DATA_DIR / "devnets.json") as f:
    devnets_data = json.load(f)
    devnet_info = next((d for d in devnets_data["devnets"] if d["id"] == devnet_id), None)

if devnet_info:
    print(f"Devnet: {devnet_info['id']}")
    print(f"Duration: {devnet_info['duration_hours']:.1f} hours")
    print(f"Time: {devnet_info['start_time']} to {devnet_info['end_time']}")
    # Separate the two slot numbers; without a separator they run together
    # into a single meaningless number.
    print(f"Slots: {devnet_info['start_slot']} → {devnet_info['end_slot']}")
    print(f"Clients: {', '.join(devnet_info['clients'])}")
Devnet: pqdevnet-20260202T1337Z
Duration: 2.3 hours
Time: 2026-02-02T13:37:21+00:00 to 2026-02-02T15:53:20+00:00
Slots: 0 → 57977
Clients: ethlambda, grandine, lantern, qlean, ream, zeam

Load Data

Show code
# Load PQ signature timing data
_timing_path = DEVNET_DIR / "pq_signature_timing.parquet"
timing_df = pd.read_parquet(_timing_path)
print(f"Loaded {len(timing_df)} timing records")

# Report the distinct metric and client labels present in the file.
_timing_metrics = timing_df["metric"].unique().tolist()
print(f"Metrics: {_timing_metrics}")
_timing_clients = timing_df["client"].unique().tolist()
print(f"Clients: {_timing_clients}")
Loaded 630 timing records
Metrics: ['signing', 'verification', 'agg_building', 'agg_verification']
Clients: ['lantern', 'ream', 'zeam', 'qlean']
Show code
# Load PQ signature counts
counts_df = pd.read_parquet(DEVNET_DIR / "pq_signature_metrics.parquet")
print(f"Loaded {len(counts_df)} count records")
print(f"Metrics: {counts_df['metric'].unique().tolist()}")

# Unified client list from devnet metadata (includes all containers via cAdvisor)
if not devnet_info:
    # The metadata lookup yields None for an unknown devnet id; fail with a
    # clear message instead of "'NoneType' object is not subscriptable".
    raise ValueError(f"Devnet {devnet_id!r} not found in devnets.json")
all_clients = sorted(devnet_info["clients"])

# Subplot grid shared by every per-client figure: at most two columns and
# ceil(clients / columns) rows. Both are clamped to 1 so an empty client list
# neither divides by zero nor requests a zero-row grid.
n_cols = max(1, min(len(all_clients), 2))
n_rows = max(1, -(-len(all_clients) // n_cols))
print(f"\nAll clients ({len(all_clients)}): {all_clients}")
Loaded 883 count records
Metrics: ['lean_pq_sig_aggregated_signatures_valid_total', 'lean_pq_sig_aggregated_signatures_invalid_total', 'lean_pq_sig_aggregated_signatures_total', 'lean_pq_sig_attestations_in_aggregated_signatures_total']

All clients (6): ['ethlambda', 'grandine', 'lantern', 'qlean', 'ream', 'zeam']

Attestation Signing Time

How long does it take to sign an attestation using post-quantum cryptography?

Show code
# Filter to signing time metric
signing_df = timing_df[timing_df["metric"] == "signing"].copy()

if signing_df.empty:
    print("No signing time data available")
else:
    # Convert to milliseconds for readability
    signing_df["value_ms"] = signing_df["value"] * 1000

    # One subplot per entry of all_clients on the n_rows x n_cols grid.
    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    # Line colour per quantile (keyed by str(quantile)); other quantiles use
    # the fallback colour passed to colors.get() below.
    colors = {"0.5": "#636EFA", "0.95": "#EF553B", "0.99": "#00CC96"}
    # Quantiles that already own a legend entry, so each is listed only once.
    legend_added = set()

    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        cdf = signing_df[signing_df["client"] == client]
        if not cdf.empty:
            for q in sorted(cdf["quantile"].unique()):
                qdf = cdf[cdf["quantile"] == q].sort_values("timestamp")
                q_str = str(q)
                # Format with ":g" rather than int(): int() truncates, so a
                # product such as 0.57 * 100 == 56.99999999999999 would be
                # labelled "p56", and 0.999 would collide with 0.99 as "p99".
                label = f"p{q * 100:g}"
                show_legend = q_str not in legend_added
                legend_added.add(q_str)
                fig.add_trace(
                    go.Scatter(
                        x=qdf["timestamp"], y=qdf["value_ms"],
                        name=label, legendgroup=q_str,
                        showlegend=show_legend,
                        line=dict(color=colors.get(q_str, "#AB63FA")),
                    ),
                    row=row, col=col,
                )
            fig.update_yaxes(title_text="ms", row=row, col=col)

        else:
            # No rows for this client: add an empty trace and a centred note.
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
                row=row, col=col,
            )
            # Axis references carry the 1-based subplot index, except the
            # first subplot, which has no numeric suffix.
            _n = (row - 1) * n_cols + col
            _s = "" if _n == 1 else str(_n)
            fig.add_annotation(
                text="No data available",
                xref=f"x{_s} domain", yref=f"y{_s} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )
    fig.update_layout(
        title="Attestation Signing Time by Client",
        height=270 * n_rows,
    )
    fig.show()
Show code
# Summary statistics by client
if not signing_df.empty:
    # Mean / min / max of the millisecond values per (client, quantile) pair,
    # rounded to three decimals and given display-friendly column names.
    _per_group = signing_df.groupby(["client", "quantile"])["value_ms"]
    _stats = _per_group.agg(["mean", "min", "max"]).round(3)
    summary = _stats.rename(
        columns={"mean": "Mean (ms)", "min": "Min (ms)", "max": "Max (ms)"}
    )
    display(summary)
Mean (ms) Min (ms) Max (ms)
client quantile
lantern 0.50 2.500 2.50 2.500
0.95 4.750 4.75 4.750
0.99 4.950 4.95 4.950
ream 0.50 2.555 2.50 2.687
0.95 4.951 4.75 6.400
0.99 7.530 4.95 16.600
zeam 0.50 2.889 2.50 5.577
0.95 6.320 4.75 9.558
0.99 7.050 4.95 9.912

Attestation Verification Time

How long does it take to verify an attestation signature?

Show code
# Filter to verification time metric
verification_df = timing_df[timing_df["metric"] == "verification"].copy()

if verification_df.empty:
    print("No verification time data available")
else:
    # Convert to milliseconds for readability
    verification_df["value_ms"] = verification_df["value"] * 1000

    # One subplot per entry of all_clients on the n_rows x n_cols grid.
    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    # Line colour per quantile (keyed by str(quantile)); other quantiles use
    # the fallback colour passed to colors.get() below.
    colors = {"0.5": "#636EFA", "0.95": "#EF553B", "0.99": "#00CC96"}
    # Quantiles that already own a legend entry, so each is listed only once.
    legend_added = set()

    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        cdf = verification_df[verification_df["client"] == client]
        if not cdf.empty:
            for q in sorted(cdf["quantile"].unique()):
                qdf = cdf[cdf["quantile"] == q].sort_values("timestamp")
                q_str = str(q)
                # Format with ":g" rather than int(): int() truncates, so a
                # product such as 0.57 * 100 == 56.99999999999999 would be
                # labelled "p56", and 0.999 would collide with 0.99 as "p99".
                label = f"p{q * 100:g}"
                show_legend = q_str not in legend_added
                legend_added.add(q_str)
                fig.add_trace(
                    go.Scatter(
                        x=qdf["timestamp"], y=qdf["value_ms"],
                        name=label, legendgroup=q_str,
                        showlegend=show_legend,
                        line=dict(color=colors.get(q_str, "#AB63FA")),
                    ),
                    row=row, col=col,
                )
            fig.update_yaxes(title_text="ms", row=row, col=col)

        else:
            # No rows for this client: add an empty trace and a centred note.
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
                row=row, col=col,
            )
            # Axis references carry the 1-based subplot index, except the
            # first subplot, which has no numeric suffix.
            _n = (row - 1) * n_cols + col
            _s = "" if _n == 1 else str(_n)
            fig.add_annotation(
                text="No data available",
                xref=f"x{_s} domain", yref=f"y{_s} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )
    fig.update_layout(
        title="Attestation Verification Time by Client",
        height=270 * n_rows,
    )
    fig.show()
Show code
# Summary statistics by client
if not verification_df.empty:
    # Mean / min / max of the millisecond values per (client, quantile) pair,
    # rounded to three decimals and given display-friendly column names.
    _per_group = verification_df.groupby(["client", "quantile"])["value_ms"]
    _stats = _per_group.agg(["mean", "min", "max"]).round(3)
    summary = _stats.rename(
        columns={"mean": "Mean (ms)", "min": "Min (ms)", "max": "Max (ms)"}
    )
    display(summary)
Mean (ms) Min (ms) Max (ms)
client quantile
lantern 0.50 2.500 2.500 2.500
0.95 4.750 4.750 4.750
0.99 4.950 4.950 4.950
ream 0.50 34.478 22.500 37.083
0.95 48.516 46.875 49.458
0.99 52.140 49.375 83.500
zeam 0.50 3.153 2.500 4.345
0.95 36.068 4.750 88.676
0.99 47.129 4.950 97.735

Attestations in Aggregated Signatures

Cumulative number of individual attestations included in aggregated signature proofs over time.

Show code
# Rows of the cumulative "attestations in aggregated signatures" counter.
att_in_agg_df = counts_df[counts_df["metric"] == "lean_pq_sig_attestations_in_aggregated_signatures_total"]

if att_in_agg_df.empty:
    print("No attestations in aggregated signatures data available")
else:
    # One subplot per entry of all_clients on the n_rows x n_cols grid.
    fig = make_subplots(
        rows=n_rows,
        cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    for idx, client in enumerate(all_clients):
        # Row-major placement; plotly grid positions are 1-based.
        grid_row, grid_col = divmod(idx, n_cols)
        row, col = grid_row + 1, grid_col + 1

        series = att_in_agg_df[att_in_agg_df["client"] == client].sort_values("timestamp")
        has_counts = (not series.empty) and series["value"].max() > 0

        if has_counts:
            counter_trace = go.Scatter(
                x=series["timestamp"],
                y=series["value"],
                showlegend=False,
                line=dict(color="#636EFA"),
            )
            fig.add_trace(counter_trace, row=row, col=col)
            fig.update_yaxes(title_text="count", row=row, col=col)
            continue

        # Nothing to plot (no rows, or the counter never rose above zero):
        # add an empty trace and a centred note instead.
        placeholder = go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo="skip")
        fig.add_trace(placeholder, row=row, col=col)
        # Axis references carry the 1-based subplot index, except the first
        # subplot, which has no numeric suffix.
        axis_suffix = "" if idx == 0 else str(idx + 1)
        fig.add_annotation(
            text="No data available",
            xref=f"x{axis_suffix} domain",
            yref=f"y{axis_suffix} domain",
            x=0.5,
            y=0.5,
            showarrow=False,
            font=dict(size=12, color="#999"),
        )

    fig.update_layout(
        title="Attestations in Aggregated Signatures by Client",
        height=270 * n_rows,
    )
    fig.show()

Signature Aggregation Time

Time to build an aggregated signature proof from individual post-quantum signatures.

Show code
# Filter to aggregate building metric
agg_build_df = timing_df[timing_df["metric"] == "agg_building"].copy()

if agg_build_df.empty:
    print("No signature aggregation timing data available")
else:
    # Convert to milliseconds for readability
    agg_build_df["value_ms"] = agg_build_df["value"] * 1000

    # One subplot per entry of all_clients on the n_rows x n_cols grid.
    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    # Line colour per quantile (keyed by str(quantile)); other quantiles use
    # the fallback colour passed to colors.get() below.
    colors = {"0.5": "#636EFA", "0.95": "#EF553B", "0.99": "#00CC96"}
    # Quantiles that already own a legend entry, so each is listed only once.
    legend_added = set()

    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        cdf = agg_build_df[agg_build_df["client"] == client]
        if not cdf.empty:
            for q in sorted(cdf["quantile"].unique()):
                qdf = cdf[cdf["quantile"] == q].sort_values("timestamp")
                q_str = str(q)
                # Format with ":g" rather than int(): int() truncates, so a
                # product such as 0.57 * 100 == 56.99999999999999 would be
                # labelled "p56", and 0.999 would collide with 0.99 as "p99".
                label = f"p{q * 100:g}"
                show_legend = q_str not in legend_added
                legend_added.add(q_str)
                fig.add_trace(
                    go.Scatter(
                        x=qdf["timestamp"], y=qdf["value_ms"],
                        name=label, legendgroup=q_str,
                        showlegend=show_legend,
                        line=dict(color=colors.get(q_str, "#AB63FA")),
                    ),
                    row=row, col=col,
                )
        else:
            # No rows for this client: add an empty trace and a centred note.
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
                row=row, col=col,
            )
            # Axis references carry the 1-based subplot index, except the
            # first subplot, which has no numeric suffix.
            _n = (row - 1) * n_cols + col
            _s = "" if _n == 1 else str(_n)
            fig.add_annotation(
                text="No data available",
                xref=f"x{_s} domain", yref=f"y{_s} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )
        # The unit label is applied to every subplot, with or without data.
        fig.update_yaxes(title_text="ms", row=row, col=col)

    fig.update_layout(
        title="Signature Aggregation Time by Client",
        height=270 * n_rows,
    )
    fig.show()

Aggregated Signatures per Second

Rate of signature aggregations per second.

Show code
# Rows of the cumulative "aggregated signatures" counter.
agg_total_df = counts_df[counts_df["metric"] == "lean_pq_sig_aggregated_signatures_total"]

if agg_total_df.empty:
    print("No signature aggregation count data available")
else:
    # One subplot per entry of all_clients on the n_rows x n_cols grid.
    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        cdf = agg_total_df[agg_total_df["client"] == client].sort_values("timestamp")
        if not cdf.empty and cdf["value"].max() > 0:
            # Rate = counter increase / elapsed seconds between consecutive
            # samples; the first sample has no predecessor and is skipped.
            dt = cdf["timestamp"].diff().dt.total_seconds()
            dv = cdf["value"].diff()
            rate = (dv / dt).iloc[1:]
            ts = cdf["timestamp"].iloc[1:]
            # Drop negative rates (counter resets) and samples with no elapsed
            # time: duplicate timestamps make dv / dt infinite, which would
            # otherwise pass the ">= 0" test and be plotted.
            mask = (rate >= 0) & (dt.iloc[1:] > 0)
            rate = rate[mask]
            ts = ts[mask]
            if not rate.empty:
                fig.add_trace(
                    go.Scatter(
                        x=ts, y=rate,
                        showlegend=False,
                        line=dict(color="#636EFA"),
                    ),
                    row=row, col=col,
                )
                fig.update_yaxes(title_text="/s", row=row, col=col)
                continue
        # Reached when there is nothing to plot for this client: add an empty
        # trace and a centred note instead.
        fig.add_trace(
            go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
            row=row, col=col,
        )
        # Axis references carry the 1-based subplot index, except the first
        # subplot, which has no numeric suffix.
        _n = (row - 1) * n_cols + col
        _s = "" if _n == 1 else str(_n)
        fig.add_annotation(
            text="No data available",
            xref=f"x{_s} domain", yref=f"y{_s} domain",
            x=0.5, y=0.5,
            showarrow=False,
            font=dict(size=12, color="#999"),
        )

    fig.update_layout(
        title="Signature Aggregation Rate by Client",
        height=270 * n_rows,
    )
    fig.show()

Aggregated Signatures per Slot

Rate of signature aggregations per slot (4-second slot time).

Show code
SLOT_TIME = 4  # seconds

if agg_total_df.empty:
    print("No signature aggregation count data available")
else:
    # One subplot per entry of all_clients on the n_rows x n_cols grid.
    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        cdf = agg_total_df[agg_total_df["client"] == client].sort_values("timestamp")
        if not cdf.empty and cdf["value"].max() > 0:
            # Per-slot rate = counter increase / elapsed seconds * SLOT_TIME;
            # the first sample has no predecessor and is skipped.
            dt = cdf["timestamp"].diff().dt.total_seconds()
            dv = cdf["value"].diff()
            rate = (dv / dt * SLOT_TIME).iloc[1:]
            ts = cdf["timestamp"].iloc[1:]
            # Drop negative rates (counter resets) and samples with no elapsed
            # time: duplicate timestamps make dv / dt infinite, which would
            # otherwise pass the ">= 0" test and be plotted.
            mask = (rate >= 0) & (dt.iloc[1:] > 0)
            rate = rate[mask]
            ts = ts[mask]
            if not rate.empty:
                fig.add_trace(
                    go.Scatter(
                        x=ts, y=rate,
                        showlegend=False,
                        line=dict(color="#636EFA"),
                    ),
                    row=row, col=col,
                )
                fig.update_yaxes(title_text="/slot", row=row, col=col)
                continue
        # Reached when there is nothing to plot for this client: add an empty
        # trace and a centred note instead.
        fig.add_trace(
            go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
            row=row, col=col,
        )
        # Axis references carry the 1-based subplot index, except the first
        # subplot, which has no numeric suffix.
        _n = (row - 1) * n_cols + col
        _s = "" if _n == 1 else str(_n)
        fig.add_annotation(
            text="No data available",
            xref=f"x{_s} domain", yref=f"y{_s} domain",
            x=0.5, y=0.5,
            showarrow=False,
            font=dict(size=12, color="#999"),
        )

    fig.update_layout(
        title="Signature Aggregation Rate per Slot by Client",
        height=270 * n_rows,
    )
    fig.show()

Aggregated Signature Verification Time

Time to verify an aggregated post-quantum signature proof.

Show code
# Filter to aggregate verification metric
agg_ver_df = timing_df[timing_df["metric"] == "agg_verification"].copy()

if agg_ver_df.empty:
    print("No aggregated signature verification timing data available")
else:
    # Convert to milliseconds for readability
    agg_ver_df["value_ms"] = agg_ver_df["value"] * 1000

    # One subplot per entry of all_clients on the n_rows x n_cols grid.
    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    # Line colour per quantile (keyed by str(quantile)); other quantiles use
    # the fallback colour passed to colors.get() below.
    colors = {"0.5": "#636EFA", "0.95": "#EF553B", "0.99": "#00CC96"}
    # Quantiles that already own a legend entry, so each is listed only once.
    legend_added = set()

    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        cdf = agg_ver_df[agg_ver_df["client"] == client]
        if not cdf.empty:
            for q in sorted(cdf["quantile"].unique()):
                qdf = cdf[cdf["quantile"] == q].sort_values("timestamp")
                q_str = str(q)
                # Format with ":g" rather than int(): int() truncates, so a
                # product such as 0.57 * 100 == 56.99999999999999 would be
                # labelled "p56", and 0.999 would collide with 0.99 as "p99".
                label = f"p{q * 100:g}"
                show_legend = q_str not in legend_added
                legend_added.add(q_str)
                fig.add_trace(
                    go.Scatter(
                        x=qdf["timestamp"], y=qdf["value_ms"],
                        name=label, legendgroup=q_str,
                        showlegend=show_legend,
                        line=dict(color=colors.get(q_str, "#AB63FA")),
                    ),
                    row=row, col=col,
                )
        else:
            # No rows for this client: add an empty trace and a centred note.
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
                row=row, col=col,
            )
            # Axis references carry the 1-based subplot index, except the
            # first subplot, which has no numeric suffix.
            _n = (row - 1) * n_cols + col
            _s = "" if _n == 1 else str(_n)
            fig.add_annotation(
                text="No data available",
                xref=f"x{_s} domain", yref=f"y{_s} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )
        # The unit label is applied to every subplot, with or without data.
        fig.update_yaxes(title_text="ms", row=row, col=col)

    fig.update_layout(
        title="Aggregated Signature Verification Time by Client",
        height=270 * n_rows,
    )
    fig.show()

Aggregated Signature Verification per Second

Rate of valid/invalid aggregated signature verifications per second.

Show code
# Calculate valid/invalid signature rate per client
valid_df = counts_df[counts_df["metric"] == "lean_pq_sig_aggregated_signatures_valid_total"]
invalid_df = counts_df[counts_df["metric"] == "lean_pq_sig_aggregated_signatures_invalid_total"]

if valid_df.empty and invalid_df.empty:
    print("No signature count data available")
else:
    # One subplot per entry of all_clients on the n_rows x n_cols grid.
    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    colors = {"valid": "#2ecc71", "invalid": "#e74c3c"}
    # Statuses that already own a legend entry, so each is listed only once.
    legend_added = set()

    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        # Becomes True once at least one status produced a plottable series.
        has_data = False

        for status, sdf in [("valid", valid_df), ("invalid", invalid_df)]:
            cdf = sdf[sdf["client"] == client].sort_values("timestamp")
            if cdf.empty or cdf["value"].max() == 0:
                continue
            # Compute rate: diff(value) / diff(timestamp) in per-second
            dt = cdf["timestamp"].diff().dt.total_seconds()
            dv = cdf["value"].diff()
            rate = (dv / dt).iloc[1:]  # per second, skip first NaN
            ts = cdf["timestamp"].iloc[1:]
            # Drop negative rates (counter resets) and samples with no elapsed
            # time: duplicate timestamps make dv / dt infinite, which would
            # otherwise pass the ">= 0" test and be plotted.
            mask = (rate >= 0) & (dt.iloc[1:] > 0)
            rate = rate[mask]
            ts = ts[mask]
            if rate.empty:
                continue
            has_data = True
            show_legend = status not in legend_added
            legend_added.add(status)
            fig.add_trace(
                go.Scatter(
                    x=ts, y=rate,
                    name=status, legendgroup=status,
                    showlegend=show_legend,
                    line=dict(color=colors[status]),
                ),
                row=row, col=col,
            )

        if has_data:
            fig.update_yaxes(title_text="/s", row=row, col=col)

        else:
            # Neither status had data: add an empty trace and a centred note.
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
                row=row, col=col,
            )
            # Axis references carry the 1-based subplot index, except the
            # first subplot, which has no numeric suffix.
            _n = (row - 1) * n_cols + col
            _s = "" if _n == 1 else str(_n)
            fig.add_annotation(
                text="No data available",
                xref=f"x{_s} domain", yref=f"y{_s} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )
    fig.update_layout(
        title="Aggregated Signature Verification Rate by Client",
        height=270 * n_rows,
    )
    fig.show()

Aggregated Signature Verification per Slot

Rate of valid/invalid aggregated signature verifications per slot (4-second slot time).

Show code
SLOT_TIME = 4  # seconds

if valid_df.empty and invalid_df.empty:
    print("No signature count data available")
else:
    # One subplot per entry of all_clients on the n_rows x n_cols grid.
    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    colors = {"valid": "#2ecc71", "invalid": "#e74c3c"}
    # Statuses that already own a legend entry, so each is listed only once.
    legend_added = set()

    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        # Becomes True once at least one status produced a plottable series.
        has_data = False

        for status, sdf in [("valid", valid_df), ("invalid", invalid_df)]:
            cdf = sdf[sdf["client"] == client].sort_values("timestamp")
            if cdf.empty or cdf["value"].max() == 0:
                continue
            # Compute rate per slot: diff(value) / diff(seconds) * slot_time
            dt = cdf["timestamp"].diff().dt.total_seconds()
            dv = cdf["value"].diff()
            rate = (dv / dt * SLOT_TIME).iloc[1:]
            ts = cdf["timestamp"].iloc[1:]
            # Drop negative rates (counter resets) and samples with no elapsed
            # time: duplicate timestamps make dv / dt infinite, which would
            # otherwise pass the ">= 0" test and be plotted.
            mask = (rate >= 0) & (dt.iloc[1:] > 0)
            rate = rate[mask]
            ts = ts[mask]
            if rate.empty:
                continue
            has_data = True
            show_legend = status not in legend_added
            legend_added.add(status)
            fig.add_trace(
                go.Scatter(
                    x=ts, y=rate,
                    name=status, legendgroup=status,
                    showlegend=show_legend,
                    line=dict(color=colors[status]),
                ),
                row=row, col=col,
            )

        if has_data:
            fig.update_yaxes(title_text="/slot", row=row, col=col)

        else:
            # Neither status had data: add an empty trace and a centred note.
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
                row=row, col=col,
            )
            # Axis references carry the 1-based subplot index, except the
            # first subplot, which has no numeric suffix.
            _n = (row - 1) * n_cols + col
            _s = "" if _n == 1 else str(_n)
            fig.add_annotation(
                text="No data available",
                xref=f"x{_s} domain", yref=f"y{_s} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )
    fig.update_layout(
        title="Aggregated Signature Verification Rate per Slot by Client",
        height=270 * n_rows,
    )
    fig.show()

Summary

Key findings from this devnet iteration:

Show code
# Generate summary statistics
print(f"Devnet: {devnet_id}")
print(f"Duration: {devnet_info['duration_hours']:.1f} hours")
print(f"Clients analyzed: {len(timing_df['client'].unique())}")
print()

# Mean of every p95 sample (all clients and timestamps pooled) for signing.
if not signing_df.empty:
    _signing_is_p95 = signing_df["quantile"] == 0.95
    p95_mean = signing_df.loc[_signing_is_p95, "value_ms"].mean()
    print(f"Average P95 signing time: {p95_mean:.2f} ms")

# Same pooled p95 mean for verification.
if not verification_df.empty:
    _verification_is_p95 = verification_df["quantile"] == 0.95
    p95_ver = verification_df.loc[_verification_is_p95, "value_ms"].mean()
    print(f"Average P95 verification time: {p95_ver:.2f} ms")
Devnet: pqdevnet-20260202T1337Z
Duration: 2.3 hours
Clients analyzed: 4

Average P95 signing time: 5.63 ms
Average P95 verification time: 37.36 ms