PQ Signature Performance
Analysis of post-quantum cryptographic signature performance in Lean Consensus clients.
This notebook examines:
- Attestation signing time (p50, p95, p99)
- Attestation verification time
- Signature counts (total, valid, invalid)
- Performance comparison across clients
Show code
import json
from pathlib import Path
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
# Set default renderer for static HTML output
import plotly.io as pio
pio.renderers.default = "notebook"
Show code
# Resolve devnet_id
DATA_DIR = Path("../data")

# `devnet_id` may be injected by an upstream parameter cell (e.g. papermill).
# Default it here so a fresh Restart-&-Run-All does not raise NameError.
if "devnet_id" not in globals():
    devnet_id = None

if devnet_id is None:
    # Fall back to the newest devnet recorded in the manifest.
    devnets_path = DATA_DIR / "devnets.json"
    if devnets_path.exists():
        with open(devnets_path) as f:
            devnets = json.load(f).get("devnets", [])
        if devnets:
            devnet_id = devnets[-1]["id"]  # Latest (assumes manifest is chronological)
            print(f"Using latest devnet: {devnet_id}")
    else:
        raise ValueError("No devnets.json found. Run 'just detect-devnets' first.")

# Fail loudly if the manifest existed but listed no devnets, instead of
# silently building a nonsense path from None below.
if devnet_id is None:
    raise ValueError("devnets.json contains no devnets")

DEVNET_DIR = DATA_DIR / devnet_id
print(f"Loading data from: {DEVNET_DIR}")
Show code
# Load devnet metadata
with open(DATA_DIR / "devnets.json") as f:
    devnets_data = json.load(f)

# Look up this run's entry in the manifest (None when the id is unknown).
devnet_info = next((d for d in devnets_data["devnets"] if d["id"] == devnet_id), None)

if devnet_info:
    print(f"Devnet: {devnet_info['id']}")
    print(f"Duration: {devnet_info['duration_hours']:.1f} hours")
    print(f"Time: {devnet_info['start_time']} to {devnet_info['end_time']}")
    # Separator was a mis-encoded character ("β"); restored to an arrow.
    print(f"Slots: {devnet_info['start_slot']} → {devnet_info['end_slot']}")
    print(f"Clients: {', '.join(devnet_info['clients'])}")
else:
    # Surface the mismatch instead of silently printing nothing — later cells
    # index into devnet_info and would fail with an opaque TypeError.
    print(f"Warning: devnet '{devnet_id}' not found in devnets.json")
Load Data
Show code
# Load the PQ signature timing samples collected for this devnet.
timing_df = pd.read_parquet(DEVNET_DIR / "pq_signature_timing.parquet")

# Quick shape/coverage check before plotting.
n_records = len(timing_df)
metric_names = timing_df["metric"].unique().tolist()
client_names = timing_df["client"].unique().tolist()

print(f"Loaded {n_records} timing records")
print(f"Metrics: {metric_names}")
print(f"Clients: {client_names}")
Show code
# Load PQ signature counts
counts_df = pd.read_parquet(DEVNET_DIR / "pq_signature_metrics.parquet")
print(f"Loaded {len(counts_df)} count records")
print(f"Metrics: {counts_df['metric'].unique().tolist()}")

# Unified client list from devnet metadata (includes all containers via cAdvisor)
all_clients = sorted(devnet_info["clients"])

# Subplot grid: at most 2 columns, ceil-divided rows. Clamp both to >= 1 so an
# empty client list cannot yield n_cols == 0 (ZeroDivisionError on the next
# line) or a rows=0 grid in the plotting cells.
n_cols = max(min(len(all_clients), 2), 1)
n_rows = max(-(-len(all_clients) // n_cols), 1)
print(f"\nAll clients ({len(all_clients)}): {all_clients}")
Attestation Signature Counts
Total aggregated signatures produced by each client, broken down by validation result.
Show code
# Signature counts over time per client
# These are cumulative Prometheus counters — plot as-is to show growth over time.
# NOTE(review): mid-notebook import; `display` is also used by later summary cells.
from IPython.display import HTML, display

# Prometheus counter name -> (legend label, trace color).
count_metrics = {
    "lean_pq_sig_aggregated_signatures_total": ("Total", "#636EFA"),
    "lean_pq_sig_aggregated_signatures_valid_total": ("Valid", "#00CC96"),
    "lean_pq_sig_aggregated_signatures_invalid_total": ("Invalid", "#EF553B"),
}

if counts_df.empty:
    print("No signature count data available")
else:
    # Keep only the three counters above.
    sig_df = counts_df[counts_df["metric"].isin(count_metrics)].copy()
    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        # Spacing is normalized per row gap; scale by the number of gaps.
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )
    # Track labels already shown so each metric gets exactly one legend entry
    # across all subplots (traces share a legendgroup).
    legend_added = set()
    for i, client in enumerate(all_clients):
        # Map flat client index to a (row, col) cell in the grid.
        row = i // n_cols + 1
        col = i % n_cols + 1
        cdf = sig_df[sig_df["client"] == client]
        if not cdf.empty:
            for metric_name, (label, color) in count_metrics.items():
                mdf = cdf[cdf["metric"] == metric_name].sort_values("timestamp")
                if mdf.empty:
                    continue
                show_legend = label not in legend_added
                legend_added.add(label)
                fig.add_trace(
                    go.Scatter(
                        x=mdf["timestamp"], y=mdf["value"],
                        name=label, legendgroup=label,
                        showlegend=show_legend,
                        line=dict(color=color),
                    ),
                    row=row, col=col,
                )
            fig.update_yaxes(title_text="count", row=row, col=col)
        else:
            # Placeholder trace keeps the subplot's axes alive so the
            # "No data" annotation has a domain to anchor to.
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo="skip"),
                row=row, col=col,
            )
            # Plotly names subplot axes "x", "x2", "x3", ... (no suffix on #1).
            _n = (row - 1) * n_cols + col
            _s = "" if _n == 1 else str(_n)
            fig.add_annotation(
                text="No data available",
                xref=f"x{_s} domain", yref=f"y{_s} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )
    fig.update_layout(
        title="Cumulative Attestation Signature Counts by Client",
        height=270 * n_rows,  # fixed height per subplot row
    )
    fig.show()
Attestation Signing Time
How long does it take to sign an attestation using post-quantum cryptography?
Show code
# Filter to the attestation *signing* duration metric.
signing_df = timing_df[timing_df["metric"] == "signing"].copy()

if signing_df.empty:
    print("No signing time data available")
else:
    # Convert seconds -> milliseconds for readability.
    signing_df["value_ms"] = signing_df["value"] * 1000
    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        # Spacing is normalized per row gap; scale by the number of gaps.
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )
    # Quantile (as string) -> trace color; unknown quantiles fall back to purple.
    colors = {"0.5": "#636EFA", "0.95": "#EF553B", "0.99": "#00CC96"}
    # One legend entry per quantile across all subplots.
    legend_added = set()
    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        cdf = signing_df[signing_df["client"] == client]
        if not cdf.empty:
            for q in sorted(cdf["quantile"].unique()):
                qdf = cdf[cdf["quantile"] == q].sort_values("timestamp")
                q_str = str(q)
                # round(), not int(): int() truncates, so float noise
                # (e.g. 0.29 * 100 == 28.999...) could mislabel a percentile.
                label = f"p{round(q * 100)}"
                show_legend = q_str not in legend_added
                legend_added.add(q_str)
                fig.add_trace(
                    go.Scatter(
                        x=qdf["timestamp"], y=qdf["value_ms"],
                        name=label, legendgroup=q_str,
                        showlegend=show_legend,
                        line=dict(color=colors.get(q_str, "#AB63FA")),
                    ),
                    row=row, col=col,
                )
            fig.update_yaxes(title_text="ms", row=row, col=col)
        else:
            # Placeholder keeps the subplot axes so the annotation can anchor.
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo="skip"),
                row=row, col=col,
            )
            # Plotly names subplot axes "x", "x2", "x3", ... (no suffix on #1).
            _n = (row - 1) * n_cols + col
            _s = "" if _n == 1 else str(_n)
            fig.add_annotation(
                text="No data available",
                xref=f"x{_s} domain", yref=f"y{_s} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )
    fig.update_layout(
        title="Attestation Signing Time by Client",
        height=270 * n_rows,  # fixed height per subplot row
    )
    fig.show()
Show code
# Per-client / per-quantile spread of signing latency over the whole run.
if not signing_df.empty:
    signing_stats = (
        signing_df
        .groupby(["client", "quantile"])["value_ms"]
        .agg(["mean", "min", "max"])
        .round(3)
        .rename(columns={"mean": "Mean (ms)", "min": "Min (ms)", "max": "Max (ms)"})
    )
    display(signing_stats)
Attestation Verification Time
How long does it take to verify an attestation signature?
Show code
# Filter to the attestation *verification* duration metric.
verification_df = timing_df[timing_df["metric"] == "verification"].copy()

if verification_df.empty:
    print("No verification time data available")
else:
    # Convert seconds -> milliseconds for readability.
    verification_df["value_ms"] = verification_df["value"] * 1000
    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        # Spacing is normalized per row gap; scale by the number of gaps.
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )
    # Quantile (as string) -> trace color; unknown quantiles fall back to purple.
    colors = {"0.5": "#636EFA", "0.95": "#EF553B", "0.99": "#00CC96"}
    # One legend entry per quantile across all subplots.
    legend_added = set()
    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        cdf = verification_df[verification_df["client"] == client]
        if not cdf.empty:
            for q in sorted(cdf["quantile"].unique()):
                qdf = cdf[cdf["quantile"] == q].sort_values("timestamp")
                q_str = str(q)
                # round(), not int(): int() truncates, so float noise
                # (e.g. 0.29 * 100 == 28.999...) could mislabel a percentile.
                label = f"p{round(q * 100)}"
                show_legend = q_str not in legend_added
                legend_added.add(q_str)
                fig.add_trace(
                    go.Scatter(
                        x=qdf["timestamp"], y=qdf["value_ms"],
                        name=label, legendgroup=q_str,
                        showlegend=show_legend,
                        line=dict(color=colors.get(q_str, "#AB63FA")),
                    ),
                    row=row, col=col,
                )
            fig.update_yaxes(title_text="ms", row=row, col=col)
        else:
            # Placeholder keeps the subplot axes so the annotation can anchor.
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo="skip"),
                row=row, col=col,
            )
            # Plotly names subplot axes "x", "x2", "x3", ... (no suffix on #1).
            _n = (row - 1) * n_cols + col
            _s = "" if _n == 1 else str(_n)
            fig.add_annotation(
                text="No data available",
                xref=f"x{_s} domain", yref=f"y{_s} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )
    fig.update_layout(
        title="Attestation Verification Time by Client",
        height=270 * n_rows,  # fixed height per subplot row
    )
    fig.show()
Show code
# Per-client / per-quantile spread of verification latency over the whole run.
if not verification_df.empty:
    verification_stats = (
        verification_df
        .groupby(["client", "quantile"])["value_ms"]
        .agg(["mean", "min", "max"])
        .round(3)
        .rename(columns={"mean": "Mean (ms)", "min": "Min (ms)", "max": "Max (ms)"})
    )
    display(verification_stats)
Summary
Key findings from this devnet iteration:
Show code
# Headline numbers for this devnet iteration.
print(f"Devnet: {devnet_id}")
print(f"Duration: {devnet_info['duration_hours']:.1f} hours")
print(f"Clients analyzed: {len(timing_df['client'].unique())}")
print()

# Mean of the exported p95 series over the whole run, for each timing metric.
for metric_df, metric_label in ((signing_df, "signing"), (verification_df, "verification")):
    if not metric_df.empty:
        p95_avg = metric_df[metric_df["quantile"] == 0.95]["value_ms"].mean()
        print(f"Average P95 {metric_label} time: {p95_avg:.2f} ms")

if not counts_df.empty:
    def _final_total(metric_name: str) -> int:
        """Sum each client's final (max) cumulative-counter value for `metric_name`."""
        per_client = counts_df[counts_df["metric"] == metric_name].groupby("client")["value"].max()
        return int(per_client.sum())

    print(f"\nTotal signatures: {_final_total('lean_pq_sig_aggregated_signatures_total'):,}")
    print(f"Valid signatures: {_final_total('lean_pq_sig_aggregated_signatures_valid_total'):,}")
    print(f"Invalid signatures: {_final_total('lean_pq_sig_aggregated_signatures_invalid_total'):,}")