PQ Signature Performance
Post-Quantum Signature Performance¶
Analysis of post-quantum cryptographic signature performance in Lean Consensus clients.
This notebook examines:
- Attestation signing time (p50, p95, p99)
- Attestation verification time
- Attestations included in aggregated signatures
- Signature aggregation time
- Aggregated signature verification time
- Performance comparison across clients
Show code
import json
from pathlib import Path
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
# Make "notebook" the default Plotly renderer so that fig.show() output
# is suitable for static HTML export of this notebook.
import plotly.io as pio
pio.renderers.default = "notebook"
Show code
# Resolve devnet_id.
# NOTE(review): devnet_id is expected to be defined before this cell runs
# (presumably injected as a notebook parameter -- not visible here). A value
# of None means "use the latest devnet listed in the manifest".
DATA_DIR = Path("../data")

if devnet_id is None:
    # Use latest devnet from manifest
    devnets_path = DATA_DIR / "devnets.json"
    if not devnets_path.exists():
        raise ValueError("No devnets.json found. Run 'just detect-devnets' first.")

    with open(devnets_path) as f:
        devnets = json.load(f).get("devnets", [])

    if not devnets:
        # Without this guard devnet_id would stay None and the path join
        # below would fail with an opaque TypeError.
        raise ValueError(
            f"No devnets listed in {devnets_path}. Run 'just detect-devnets' first."
        )

    devnet_id = devnets[-1]["id"]  # Latest (last manifest entry)
    print(f"Using latest devnet: {devnet_id}")

DEVNET_DIR = DATA_DIR / devnet_id
print(f"Loading data from: {DEVNET_DIR}")
Show code
# Load devnet metadata: find the manifest entry whose "id" matches devnet_id
# and print a short description of it.
with open(DATA_DIR / "devnets.json") as f:
    devnets_data = json.load(f)

devnet_info = next((d for d in devnets_data["devnets"] if d["id"] == devnet_id), None)
if devnet_info is None:
    # Later cells subscript devnet_info unconditionally (client list,
    # duration), so fail here with a clear message instead of a later
    # "'NoneType' object is not subscriptable".
    raise ValueError(
        f"Devnet {devnet_id!r} not found in {DATA_DIR / 'devnets.json'}"
    )

print(f"Devnet: {devnet_info['id']}")
print(f"Duration: {devnet_info['duration_hours']:.1f} hours")
print(f"Time: {devnet_info['start_time']} to {devnet_info['end_time']}")
print(f"Slots: {devnet_info['start_slot']} → {devnet_info['end_slot']}")
print(f"Clients: {', '.join(devnet_info['clients'])}")
Load Data¶
Show code
# Read the PQ signature timing samples for the selected devnet and report
# the row count plus the distinct metrics and clients found in the file.
timing_df = pd.read_parquet(DEVNET_DIR / "pq_signature_timing.parquet")

print(f"Loaded {len(timing_df)} timing records")
for heading, column in (("Metrics", "metric"), ("Clients", "client")):
    print(f"{heading}: {timing_df[column].unique().tolist()}")
Show code
# Load PQ signature counts
counts_df = pd.read_parquet(DEVNET_DIR / "pq_signature_metrics.parquet")
print(f"Loaded {len(counts_df)} count records")
print(f"Metrics: {counts_df['metric'].unique().tolist()}")

# Unified client list from devnet metadata (includes all containers via cAdvisor)
all_clients = sorted(devnet_info["clients"])

# Subplot grid shared by every figure below: at most two columns, with
# enough rows (ceiling division) to hold one subplot per client.
# Both dimensions are clamped to >= 1 so that an empty client list yields
# an empty 1x1 grid instead of a ZeroDivisionError here.
n_cols = max(1, min(len(all_clients), 2))
n_rows = max(1, -(-len(all_clients) // n_cols))
print(f"\nAll clients ({len(all_clients)}): {all_clients}")
Attestation Signing Time¶
How long does it take to sign an attestation using post-quantum cryptography?
Show code
# Attestation signing time: one subplot per client, one line per quantile.
# Filter to signing time metric
signing_df = timing_df[timing_df["metric"] == "signing"].copy()

if signing_df.empty:
    print("No signing time data available")
else:
    # Convert to milliseconds for readability
    signing_df["value_ms"] = signing_df["value"] * 1000

    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    colors = {"0.5": "#636EFA", "0.95": "#EF553B", "0.99": "#00CC96"}
    legend_added = set()  # quantiles that already own a legend entry

    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        cdf = signing_df[signing_df["client"] == client]

        if not cdf.empty:
            for q in sorted(cdf["quantile"].unique()):
                qdf = cdf[cdf["quantile"] == q].sort_values("timestamp")
                q_str = str(q)
                # Format with ":g" instead of int(): int() truncates, so a
                # product such as 0.29 * 100 == 28.999999999999996 would be
                # labelled "p28" and 0.999 would collide with "p99".
                label = f"p{q * 100:g}"
                # Each quantile appears in the legend once; legendgroup
                # links the same quantile across subplots.
                show_legend = q_str not in legend_added
                legend_added.add(q_str)
                fig.add_trace(
                    go.Scatter(
                        x=qdf["timestamp"], y=qdf["value_ms"],
                        name=label, legendgroup=q_str,
                        showlegend=show_legend,
                        line=dict(color=colors.get(q_str, "#AB63FA")),
                    ),
                    row=row, col=col,
                )
            fig.update_yaxes(title_text="ms", row=row, col=col)
        else:
            # No samples for this client: add an empty placeholder trace
            # and a centred "No data available" note in its subplot.
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
                row=row, col=col,
            )
            # Axis ids follow the row-major subplot index; the first
            # subplot's axes carry no numeric suffix ("x" / "y").
            _n = (row - 1) * n_cols + col
            _s = "" if _n == 1 else str(_n)
            fig.add_annotation(
                text="No data available",
                xref=f"x{_s} domain", yref=f"y{_s} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )

    fig.update_layout(
        title="Attestation Signing Time by Client",
        height=270 * n_rows,
    )
    fig.show()
Show code
# Per-client, per-quantile mean / min / max of the signing time in
# milliseconds, rounded to three decimals and rendered as a table.
if not signing_df.empty:
    summary = (
        signing_df.groupby(["client", "quantile"])["value_ms"]
        .agg(["mean", "min", "max"])
        .round(3)
    )
    summary.columns = ["Mean (ms)", "Min (ms)", "Max (ms)"]
    display(summary)
Attestation Verification Time¶
How long does it take to verify an attestation signature?
Show code
# Attestation verification time: one subplot per client, one line per
# quantile.
# Filter to verification time metric
verification_df = timing_df[timing_df["metric"] == "verification"].copy()

if verification_df.empty:
    print("No verification time data available")
else:
    # Scale by 1000 to plot in milliseconds (the y-axis is labelled "ms").
    verification_df["value_ms"] = verification_df["value"] * 1000

    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    colors = {"0.5": "#636EFA", "0.95": "#EF553B", "0.99": "#00CC96"}
    legend_added = set()  # quantiles that already own a legend entry

    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        cdf = verification_df[verification_df["client"] == client]

        if not cdf.empty:
            for q in sorted(cdf["quantile"].unique()):
                qdf = cdf[cdf["quantile"] == q].sort_values("timestamp")
                q_str = str(q)
                # Format with ":g" instead of int(): int() truncates, so a
                # product such as 0.29 * 100 == 28.999999999999996 would be
                # labelled "p28" and 0.999 would collide with "p99".
                label = f"p{q * 100:g}"
                # Each quantile appears in the legend once; legendgroup
                # links the same quantile across subplots.
                show_legend = q_str not in legend_added
                legend_added.add(q_str)
                fig.add_trace(
                    go.Scatter(
                        x=qdf["timestamp"], y=qdf["value_ms"],
                        name=label, legendgroup=q_str,
                        showlegend=show_legend,
                        line=dict(color=colors.get(q_str, "#AB63FA")),
                    ),
                    row=row, col=col,
                )
            fig.update_yaxes(title_text="ms", row=row, col=col)
        else:
            # No samples for this client: add an empty placeholder trace
            # and a centred "No data available" note in its subplot.
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
                row=row, col=col,
            )
            # Axis ids follow the row-major subplot index; the first
            # subplot's axes carry no numeric suffix ("x" / "y").
            _n = (row - 1) * n_cols + col
            _s = "" if _n == 1 else str(_n)
            fig.add_annotation(
                text="No data available",
                xref=f"x{_s} domain", yref=f"y{_s} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )

    fig.update_layout(
        title="Attestation Verification Time by Client",
        height=270 * n_rows,
    )
    fig.show()
Show code
# Per-client, per-quantile mean / min / max of the verification time in
# milliseconds, rounded to three decimals and rendered as a table.
if not verification_df.empty:
    summary = (
        verification_df.groupby(["client", "quantile"])["value_ms"]
        .agg(["mean", "min", "max"])
        .round(3)
    )
    summary.columns = ["Mean (ms)", "Min (ms)", "Max (ms)"]
    display(summary)
Attestations in Aggregated Signatures¶
Cumulative number of individual attestations included in aggregated signature proofs over time.
Show code
# Plot the attestations-in-aggregated-signatures counter over time, one
# subplot per client. Clients with no rows, or whose counter never rises
# above zero, get a "No data available" placeholder instead.
att_in_agg_df = counts_df[counts_df["metric"] == "lean_pq_sig_attestations_in_aggregated_signatures_total"]

if not att_in_agg_df.empty:
    fig = make_subplots(
        rows=n_rows,
        cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    for i, client in enumerate(all_clients):
        # Row-major placement on the shared grid (1-based for Plotly).
        grid_row, grid_col = divmod(i, n_cols)
        row, col = grid_row + 1, grid_col + 1
        cdf = att_in_agg_df[att_in_agg_df["client"] == client].sort_values("timestamp")

        has_counts = not cdf.empty and cdf["value"].max() > 0
        if has_counts:
            counter_trace = go.Scatter(
                x=cdf["timestamp"],
                y=cdf["value"],
                showlegend=False,
                line=dict(color="#636EFA"),
            )
            fig.add_trace(counter_trace, row=row, col=col)
            fig.update_yaxes(title_text="count", row=row, col=col)
            continue

        # Placeholder: empty trace plus a centred note in this subplot.
        empty_trace = go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo="skip")
        fig.add_trace(empty_trace, row=row, col=col)
        # The first subplot's axes have no numeric suffix; later ones use
        # their 1-based row-major index.
        axis_suffix = "" if i == 0 else str(i + 1)
        fig.add_annotation(
            text="No data available",
            xref=f"x{axis_suffix} domain",
            yref=f"y{axis_suffix} domain",
            x=0.5,
            y=0.5,
            showarrow=False,
            font=dict(size=12, color="#999"),
        )

    fig.update_layout(
        title="Attestations in Aggregated Signatures by Client",
        height=270 * n_rows,
    )
    fig.show()
else:
    print("No attestations in aggregated signatures data available")
Signature Aggregation Time¶
Time to build an aggregated signatures proof from individual post-quantum signatures.
Show code
# Signature aggregation (proof building) time: one subplot per client, one
# line per quantile.
# Filter to aggregate building metric
agg_build_df = timing_df[timing_df["metric"] == "agg_building"].copy()

if agg_build_df.empty:
    print("No signature aggregation timing data available")
else:
    # Scale by 1000 to plot in milliseconds (the y-axis is labelled "ms").
    agg_build_df["value_ms"] = agg_build_df["value"] * 1000

    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    colors = {"0.5": "#636EFA", "0.95": "#EF553B", "0.99": "#00CC96"}
    legend_added = set()  # quantiles that already own a legend entry

    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        cdf = agg_build_df[agg_build_df["client"] == client]

        if not cdf.empty:
            for q in sorted(cdf["quantile"].unique()):
                qdf = cdf[cdf["quantile"] == q].sort_values("timestamp")
                q_str = str(q)
                # Format with ":g" instead of int(): int() truncates, so a
                # product such as 0.29 * 100 == 28.999999999999996 would be
                # labelled "p28" and 0.999 would collide with "p99".
                label = f"p{q * 100:g}"
                # Each quantile appears in the legend once; legendgroup
                # links the same quantile across subplots.
                show_legend = q_str not in legend_added
                legend_added.add(q_str)
                fig.add_trace(
                    go.Scatter(
                        x=qdf["timestamp"], y=qdf["value_ms"],
                        name=label, legendgroup=q_str,
                        showlegend=show_legend,
                        line=dict(color=colors.get(q_str, "#AB63FA")),
                    ),
                    row=row, col=col,
                )
        else:
            # No samples for this client: add an empty placeholder trace
            # and a centred "No data available" note in its subplot.
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
                row=row, col=col,
            )
            # Axis ids follow the row-major subplot index; the first
            # subplot's axes carry no numeric suffix ("x" / "y").
            _n = (row - 1) * n_cols + col
            _s = "" if _n == 1 else str(_n)
            fig.add_annotation(
                text="No data available",
                xref=f"x{_s} domain", yref=f"y{_s} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )
        # Label the y-axis of every subplot, with or without data.
        fig.update_yaxes(title_text="ms", row=row, col=col)

    fig.update_layout(
        title="Signature Aggregation Time by Client",
        height=270 * n_rows,
    )
    fig.show()
Aggregated Signatures per Second¶
Rate of signature aggregations per second.
Show code
# Signature aggregation rate (per second), one subplot per client. The rate
# is the increase of the aggregated-signatures counter between consecutive
# samples divided by the elapsed seconds.
agg_total_df = counts_df[counts_df["metric"] == "lean_pq_sig_aggregated_signatures_total"]

if agg_total_df.empty:
    print("No signature aggregation count data available")
else:
    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        cdf = agg_total_df[agg_total_df["client"] == client].sort_values("timestamp")

        if not cdf.empty and cdf["value"].max() > 0:
            dt = cdf["timestamp"].diff().dt.total_seconds()
            dv = cdf["value"].diff()
            # Keep only intervals with positive elapsed time and a
            # non-negative increase. This drops the first row (NaN diff),
            # negative rates (counter resets) and zero-length intervals
            # from duplicate timestamps, which would otherwise yield
            # infinite rates that slip through a plain "rate >= 0" check.
            mask = (dt > 0) & (dv >= 0)
            rate = (dv / dt)[mask]
            ts = cdf["timestamp"][mask]
            if not rate.empty:
                fig.add_trace(
                    go.Scatter(
                        x=ts, y=rate,
                        showlegend=False,
                        line=dict(color="#636EFA"),
                    ),
                    row=row, col=col,
                )
                fig.update_yaxes(title_text="/s", row=row, col=col)
                continue

        # Reached when the client has no usable samples: add an empty
        # placeholder trace and a centred "No data available" note.
        fig.add_trace(
            go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
            row=row, col=col,
        )
        # Axis ids follow the row-major subplot index; the first subplot's
        # axes carry no numeric suffix ("x" / "y").
        _n = (row - 1) * n_cols + col
        _s = "" if _n == 1 else str(_n)
        fig.add_annotation(
            text="No data available",
            xref=f"x{_s} domain", yref=f"y{_s} domain",
            x=0.5, y=0.5,
            showarrow=False,
            font=dict(size=12, color="#999"),
        )

    fig.update_layout(
        title="Signature Aggregation Rate by Client",
        height=270 * n_rows,
    )
    fig.show()
Aggregated Signatures per Slot¶
Rate of signature aggregations per slot (4-second slot time).
Show code
# Signature aggregation rate per slot, one subplot per client. Reuses
# agg_total_df from the per-second cell and scales the per-second rate by
# the slot duration.
SLOT_TIME = 4  # seconds

if agg_total_df.empty:
    print("No signature aggregation count data available")
else:
    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        cdf = agg_total_df[agg_total_df["client"] == client].sort_values("timestamp")

        if not cdf.empty and cdf["value"].max() > 0:
            dt = cdf["timestamp"].diff().dt.total_seconds()
            dv = cdf["value"].diff()
            # Keep only intervals with positive elapsed time and a
            # non-negative increase. This drops the first row (NaN diff),
            # negative rates (counter resets) and zero-length intervals
            # from duplicate timestamps, which would otherwise yield
            # infinite rates that slip through a plain "rate >= 0" check.
            mask = (dt > 0) & (dv >= 0)
            rate = (dv / dt * SLOT_TIME)[mask]
            ts = cdf["timestamp"][mask]
            if not rate.empty:
                fig.add_trace(
                    go.Scatter(
                        x=ts, y=rate,
                        showlegend=False,
                        line=dict(color="#636EFA"),
                    ),
                    row=row, col=col,
                )
                fig.update_yaxes(title_text="/slot", row=row, col=col)
                continue

        # Reached when the client has no usable samples: add an empty
        # placeholder trace and a centred "No data available" note.
        fig.add_trace(
            go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
            row=row, col=col,
        )
        # Axis ids follow the row-major subplot index; the first subplot's
        # axes carry no numeric suffix ("x" / "y").
        _n = (row - 1) * n_cols + col
        _s = "" if _n == 1 else str(_n)
        fig.add_annotation(
            text="No data available",
            xref=f"x{_s} domain", yref=f"y{_s} domain",
            x=0.5, y=0.5,
            showarrow=False,
            font=dict(size=12, color="#999"),
        )

    fig.update_layout(
        title="Signature Aggregation Rate per Slot by Client",
        height=270 * n_rows,
    )
    fig.show()
Aggregated Signature Verification Time¶
Time to verify an aggregated post-quantum signatures proof.
Show code
# Aggregated signature verification time: one subplot per client, one line
# per quantile.
# Filter to aggregate verification metric
agg_ver_df = timing_df[timing_df["metric"] == "agg_verification"].copy()

if agg_ver_df.empty:
    print("No aggregated signature verification timing data available")
else:
    # Scale by 1000 to plot in milliseconds (the y-axis is labelled "ms").
    agg_ver_df["value_ms"] = agg_ver_df["value"] * 1000

    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    colors = {"0.5": "#636EFA", "0.95": "#EF553B", "0.99": "#00CC96"}
    legend_added = set()  # quantiles that already own a legend entry

    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        cdf = agg_ver_df[agg_ver_df["client"] == client]

        if not cdf.empty:
            for q in sorted(cdf["quantile"].unique()):
                qdf = cdf[cdf["quantile"] == q].sort_values("timestamp")
                q_str = str(q)
                # Format with ":g" instead of int(): int() truncates, so a
                # product such as 0.29 * 100 == 28.999999999999996 would be
                # labelled "p28" and 0.999 would collide with "p99".
                label = f"p{q * 100:g}"
                # Each quantile appears in the legend once; legendgroup
                # links the same quantile across subplots.
                show_legend = q_str not in legend_added
                legend_added.add(q_str)
                fig.add_trace(
                    go.Scatter(
                        x=qdf["timestamp"], y=qdf["value_ms"],
                        name=label, legendgroup=q_str,
                        showlegend=show_legend,
                        line=dict(color=colors.get(q_str, "#AB63FA")),
                    ),
                    row=row, col=col,
                )
        else:
            # No samples for this client: add an empty placeholder trace
            # and a centred "No data available" note in its subplot.
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
                row=row, col=col,
            )
            # Axis ids follow the row-major subplot index; the first
            # subplot's axes carry no numeric suffix ("x" / "y").
            _n = (row - 1) * n_cols + col
            _s = "" if _n == 1 else str(_n)
            fig.add_annotation(
                text="No data available",
                xref=f"x{_s} domain", yref=f"y{_s} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )
        # Label the y-axis of every subplot, with or without data.
        fig.update_yaxes(title_text="ms", row=row, col=col)

    fig.update_layout(
        title="Aggregated Signature Verification Time by Client",
        height=270 * n_rows,
    )
    fig.show()
Aggregated Signature Verification per Second¶
Rate of valid/invalid aggregated signature verifications per second.
Show code
# Calculate valid/invalid signature rate per client (per second), one
# subplot per client with one line for each status.
valid_df = counts_df[counts_df["metric"] == "lean_pq_sig_aggregated_signatures_valid_total"]
invalid_df = counts_df[counts_df["metric"] == "lean_pq_sig_aggregated_signatures_invalid_total"]

if valid_df.empty and invalid_df.empty:
    print("No signature count data available")
else:
    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    colors = {"valid": "#2ecc71", "invalid": "#e74c3c"}
    legend_added = set()  # statuses that already own a legend entry

    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        has_data = False

        for status, sdf in [("valid", valid_df), ("invalid", invalid_df)]:
            cdf = sdf[sdf["client"] == client].sort_values("timestamp")
            if cdf.empty or cdf["value"].max() == 0:
                continue

            # Compute rate: diff(value) / diff(timestamp) in per-second
            dt = cdf["timestamp"].diff().dt.total_seconds()
            dv = cdf["value"].diff()
            # Keep only intervals with positive elapsed time and a
            # non-negative increase. This drops the first row (NaN diff),
            # negative rates (counter resets) and zero-length intervals
            # from duplicate timestamps, which would otherwise yield
            # infinite rates that slip through a plain "rate >= 0" check.
            mask = (dt > 0) & (dv >= 0)
            rate = (dv / dt)[mask]
            ts = cdf["timestamp"][mask]
            if rate.empty:
                continue

            has_data = True
            # Each status appears in the legend once; legendgroup links the
            # same status across subplots.
            show_legend = status not in legend_added
            legend_added.add(status)
            fig.add_trace(
                go.Scatter(
                    x=ts, y=rate,
                    name=status, legendgroup=status,
                    showlegend=show_legend,
                    line=dict(color=colors[status]),
                ),
                row=row, col=col,
            )

        if has_data:
            fig.update_yaxes(title_text="/s", row=row, col=col)
        else:
            # Neither status produced a line: add an empty placeholder
            # trace and a centred "No data available" note.
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
                row=row, col=col,
            )
            # Axis ids follow the row-major subplot index; the first
            # subplot's axes carry no numeric suffix ("x" / "y").
            _n = (row - 1) * n_cols + col
            _s = "" if _n == 1 else str(_n)
            fig.add_annotation(
                text="No data available",
                xref=f"x{_s} domain", yref=f"y{_s} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )

    fig.update_layout(
        title="Aggregated Signature Verification Rate by Client",
        height=270 * n_rows,
    )
    fig.show()
Aggregated Signature Verification per Slot¶
Rate of valid/invalid aggregated signature verifications per slot (4-second slot time).
Show code
# Valid/invalid aggregated signature verification rate per slot, one
# subplot per client. Reuses valid_df / invalid_df from the per-second cell
# and scales the per-second rate by the slot duration.
SLOT_TIME = 4  # seconds

if valid_df.empty and invalid_df.empty:
    print("No signature count data available")
else:
    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )

    colors = {"valid": "#2ecc71", "invalid": "#e74c3c"}
    legend_added = set()  # statuses that already own a legend entry

    for i, client in enumerate(all_clients):
        row = i // n_cols + 1
        col = i % n_cols + 1
        has_data = False

        for status, sdf in [("valid", valid_df), ("invalid", invalid_df)]:
            cdf = sdf[sdf["client"] == client].sort_values("timestamp")
            if cdf.empty or cdf["value"].max() == 0:
                continue

            # Compute rate per slot: diff(value) / diff(seconds) * slot_time
            dt = cdf["timestamp"].diff().dt.total_seconds()
            dv = cdf["value"].diff()
            # Keep only intervals with positive elapsed time and a
            # non-negative increase. This drops the first row (NaN diff),
            # negative rates (counter resets) and zero-length intervals
            # from duplicate timestamps, which would otherwise yield
            # infinite rates that slip through a plain "rate >= 0" check.
            mask = (dt > 0) & (dv >= 0)
            rate = (dv / dt * SLOT_TIME)[mask]
            ts = cdf["timestamp"][mask]
            if rate.empty:
                continue

            has_data = True
            # Each status appears in the legend once; legendgroup links the
            # same status across subplots.
            show_legend = status not in legend_added
            legend_added.add(status)
            fig.add_trace(
                go.Scatter(
                    x=ts, y=rate,
                    name=status, legendgroup=status,
                    showlegend=show_legend,
                    line=dict(color=colors[status]),
                ),
                row=row, col=col,
            )

        if has_data:
            fig.update_yaxes(title_text="/slot", row=row, col=col)
        else:
            # Neither status produced a line: add an empty placeholder
            # trace and a centred "No data available" note.
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
                row=row, col=col,
            )
            # Axis ids follow the row-major subplot index; the first
            # subplot's axes carry no numeric suffix ("x" / "y").
            _n = (row - 1) * n_cols + col
            _s = "" if _n == 1 else str(_n)
            fig.add_annotation(
                text="No data available",
                xref=f"x{_s} domain", yref=f"y{_s} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )

    fig.update_layout(
        title="Aggregated Signature Verification Rate per Slot by Client",
        height=270 * n_rows,
    )
    fig.show()
Summary¶
Key findings from this devnet iteration:
Show code
# Generate summary statistics: devnet identity, duration, number of clients
# present in the timing data, and the mean of the P95 signing and
# verification times (milliseconds) across all samples.
print(f"Devnet: {devnet_id}")
print(f"Duration: {devnet_info['duration_hours']:.1f} hours")
print(f"Clients analyzed: {len(timing_df['client'].unique())}")
print()

if not signing_df.empty:
    p95_mean = signing_df[signing_df["quantile"] == 0.95]["value_ms"].mean()
    # The mean is NaN when no 0.95-quantile rows exist; report that
    # explicitly instead of printing "nan ms".
    if pd.isna(p95_mean):
        print("Average P95 signing time: n/a (no P95 samples)")
    else:
        print(f"Average P95 signing time: {p95_mean:.2f} ms")

if not verification_df.empty:
    p95_ver = verification_df[verification_df["quantile"] == 0.95]["value_ms"].mean()
    if pd.isna(p95_ver):
        print("Average P95 verification time: n/a (no P95 samples)")
    else:
        print(f"Average P95 verification time: {p95_ver:.2f} ms")