Networking
Networking¶
P2P networking analysis for PQ Devnet clients.
This notebook examines:
- Peer connections over time
- Peer connection and disconnection events
- Attestation arrivals (valid vs invalid, by source)
- Network bandwidth per client (rx/tx throughput)
Show code
# Standard library
import json
from pathlib import Path

# Third-party
import pandas as pd
import plotly.graph_objects as go
import plotly.io as pio
from plotly.subplots import make_subplots

from IPython.display import display

# Default renderer so figures embed in static HTML exports of this notebook.
pio.renderers.default = "notebook"
Show code
# Resolve devnet_id (expected to be injected earlier, e.g. as a papermill parameter).
DATA_DIR = Path("../data")

if devnet_id is None:
    # Fall back to the most recent devnet listed in the manifest.
    devnets_path = DATA_DIR / "devnets.json"
    if not devnets_path.exists():
        raise ValueError("No devnets.json found. Run 'just detect-devnets' first.")
    with open(devnets_path) as f:
        devnets = json.load(f).get("devnets", [])
    if not devnets:
        # BUGFIX: previously an empty manifest fell through silently with
        # devnet_id=None, producing a cryptic TypeError at DATA_DIR / devnet_id
        # below. Fail with an actionable message instead.
        raise ValueError("devnets.json contains no devnets. Run 'just detect-devnets' first.")
    devnet_id = devnets[-1]["id"]  # Latest (manifest is ordered oldest-to-newest)
    print(f"Using latest devnet: {devnet_id}")

DEVNET_DIR = DATA_DIR / devnet_id
print(f"Loading data from: {DEVNET_DIR}")
Show code
# Load devnet metadata and locate the entry matching devnet_id.
devnets_data = json.loads((DATA_DIR / "devnets.json").read_text())

devnet_info = None
for entry in devnets_data["devnets"]:
    if entry["id"] == devnet_id:
        devnet_info = entry
        break

if devnet_info:
    print(f"Devnet: {devnet_info['id']}")
    print(f"Duration: {devnet_info['duration_hours']:.1f} hours")
    print(f"Time: {devnet_info['start_time']} to {devnet_info['end_time']}")
    print(f"Slots: {devnet_info['start_slot']} \u2192 {devnet_info['end_slot']}")
    print(f"Clients: {', '.join(devnet_info['clients'])}")
Load Data¶
Show code
# Network peer counts: collapse duplicate scrapes by taking the max per (client, timestamp).
peers_df = (
    pd.read_parquet(DEVNET_DIR / "network_peers.parquet")
    .groupby(["client", "timestamp"], as_index=False)["value"]
    .max()
)
print(f"Peers: {len(peers_df)} records, clients: {sorted(peers_df['client'].unique())}")

# Peer connection/disconnection events — file is optional.
peer_events_path = DEVNET_DIR / "peer_events.parquet"
if not peer_events_path.exists():
    peer_events_df = pd.DataFrame()
    print("Peer events: no data")
else:
    peer_events_df = (
        pd.read_parquet(peer_events_path)
        .groupby(["client", "metric", "timestamp"], as_index=False)["value"]
        .max()
    )
    print(f"Peer events: {len(peer_events_df)} records, clients: {sorted(peer_events_df['client'].unique())}")

# Attestation counters, keyed by client / metric / source.
att_df = (
    pd.read_parquet(DEVNET_DIR / "attestation_metrics.parquet")
    .groupby(["client", "metric", "source", "timestamp"], as_index=False)["value"]
    .max()
)
print(f"Attestations: {len(att_df)} records, clients: {sorted(att_df['client'].unique())}")
print(f"Attestation metrics: {sorted(att_df['metric'].unique())}")
print(f"Attestation sources: {sorted(att_df['source'].unique())}")
# Container-level network throughput (rx/tx), excluding monitoring infrastructure.
EXCLUDED_CONTAINERS = {"unknown", "cadvisor", "prometheus", "promtail", "node-exporter", "node_exporter", "grafana"}
net_path = DEVNET_DIR / "container_network.parquet"
if net_path.exists():
    net_df = pd.read_parquet(net_path)
    net_df = net_df[~net_df["container"].isin(EXCLUDED_CONTAINERS)]
    net_df = net_df.groupby(["container", "metric", "timestamp"], as_index=False)["value"].sum()
    print(f"Network throughput: {len(net_df)} records, containers: {sorted(net_df['container'].unique())}")
else:
    net_df = pd.DataFrame()
    print("Network throughput: no data")

# Unified client list from devnet metadata (includes all containers via cAdvisor).
# BUGFIX: the metadata cell above leaves devnet_info as None when devnet_id is
# not in the manifest; indexing None raised a cryptic TypeError here. Fail with
# a clear message instead.
if devnet_info is None:
    raise ValueError(f"Devnet '{devnet_id}' not found in devnets.json; cannot determine client list.")
all_clients = sorted(devnet_info["clients"])

# Subplot grid: up to 2 columns, ceiling division for rows. Clamp n_cols to >= 1
# so an empty client list cannot raise ZeroDivisionError.
n_cols = max(min(len(all_clients), 2), 1)
n_rows = -(-len(all_clients) // n_cols)
print(f"\nAll clients ({len(all_clients)}): {all_clients}")
Peer Connections¶
Number of connected P2P peers over time. More peers generally means better attestation propagation and network resilience. Drops to 0 or 1 may indicate connectivity issues.
Show code
# One subplot per client showing the connected-peer count over time.
fig = make_subplots(
    rows=n_rows, cols=n_cols,
    subplot_titles=all_clients,
    vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
    horizontal_spacing=0.08,
)

for idx, client in enumerate(all_clients):
    grid_r, grid_c = divmod(idx, n_cols)
    grid_r += 1
    grid_c += 1
    client_series = peers_df[peers_df["client"] == client].sort_values("timestamp")
    if client_series.empty:
        # Invisible trace keeps the subplot axes alive; annotation explains the gap.
        fig.add_trace(
            go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
            row=grid_r, col=grid_c,
        )
        axis_num = (grid_r - 1) * n_cols + grid_c
        axis_suffix = "" if axis_num == 1 else str(axis_num)
        fig.add_annotation(
            text="No data available",
            xref=f"x{axis_suffix} domain", yref=f"y{axis_suffix} domain",
            x=0.5, y=0.5,
            showarrow=False,
            font=dict(size=12, color="#999"),
        )
    else:
        fig.add_trace(
            go.Scatter(
                x=client_series["timestamp"], y=client_series["value"],
                name=client, showlegend=False,
                line=dict(color="#636EFA"),
            ),
            row=grid_r, col=grid_c,
        )
    fig.update_yaxes(title_text="Peers", row=grid_r, col=grid_c)

fig.update_layout(
    title="Connected Peers Over Time",
    height=270 * n_rows,
)
fig.show()
Peer Connection & Disconnection Events¶
Connection and disconnection events per minute. Spikes in disconnections may indicate network instability or incompatible peers being dropped.
Show code
# Per-client connection/disconnection rates, derived from cumulative event counters.
if peer_events_df.empty:
    print("No peer event data available")
else:
    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=all_clients,
        vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
        horizontal_spacing=0.08,
    )
    colors = {"connection": "#00CC96", "disconnection": "#EF553B"}
    legend_added = set()
    for idx, client in enumerate(all_clients):
        grid_r, grid_c = divmod(idx, n_cols)
        grid_r += 1
        grid_c += 1
        client_events = peer_events_df[peer_events_df["client"] == client]
        if client_events.empty:
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
                row=grid_r, col=grid_c,
            )
            axis_num = (grid_r - 1) * n_cols + grid_c
            axis_suffix = "" if axis_num == 1 else str(axis_num)
            fig.add_annotation(
                text="No data available",
                xref=f"x{axis_suffix} domain", yref=f"y{axis_suffix} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )
        else:
            for event_kind in ("connection", "disconnection"):
                kind_df = client_events[client_events["metric"] == event_kind].sort_values("timestamp").copy()
                if kind_df.empty:
                    continue
                # Diff the cumulative counter; drop the leading NaN and any
                # negative deltas (counter resets).
                # NOTE(review): the y-axis label assumes a 1-minute scrape
                # interval — confirm against the collector configuration.
                kind_df["rate"] = kind_df["value"].diff()
                kind_df = kind_df[(kind_df["rate"] >= 0) & kind_df["rate"].notna()]
                if kind_df.empty:
                    continue
                first_occurrence = event_kind not in legend_added
                legend_added.add(event_kind)
                fig.add_trace(
                    go.Scatter(
                        x=kind_df["timestamp"], y=kind_df["rate"],
                        name=event_kind, legendgroup=event_kind,
                        showlegend=first_occurrence,
                        line=dict(color=colors[event_kind]),
                    ),
                    row=grid_r, col=grid_c,
                )
        fig.update_yaxes(title_text="Events/min", row=grid_r, col=grid_c)
    fig.update_layout(
        title="Peer Connection & Disconnection Events by Client",
        height=270 * n_rows,
    )
    fig.show()
Attestation Arrivals¶
Cumulative valid and invalid attestations received per client. Attestations arrive via two channels:
- gossip: received directly from peers over the P2P network
- block: included in received blocks
High invalid counts may indicate signature verification failures or incompatible messages.
Show code
# Cumulative attestation counts per client, split by validity and arrival source.
fig = make_subplots(
    rows=n_rows, cols=n_cols,
    subplot_titles=all_clients,
    vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
    horizontal_spacing=0.08,
)
colors = {
    ("valid", "gossip"): "#636EFA",
    ("valid", "block"): "#00CC96",
    ("valid", "unknown"): "#AB63FA",
    ("invalid", "gossip"): "#EF553B",
    ("invalid", "block"): "#FFA15A",
    ("invalid", "unknown"): "#FF6692",
}
legend_added = set()
for i, client in enumerate(all_clients):
    row = i // n_cols + 1
    col = i % n_cols + 1
    cdf = att_df[att_df["client"] == client]
    if not cdf.empty:
        for metric in ["lean_attestations_valid_total", "lean_attestations_invalid_total"]:
            mdf = cdf[cdf["metric"] == metric]
            # BUGFIX: the original test ('"valid" in metric') was true for BOTH
            # metrics because "valid" is a substring of "invalid", so every
            # invalid series was labelled and coloured as valid (the invalid
            # color entries above were unreachable). Test for "invalid" instead.
            validity = "invalid" if "invalid" in metric else "valid"
            for source in sorted(mdf["source"].unique()):
                sdf = mdf[mdf["source"] == source].sort_values("timestamp")
                if sdf.empty or sdf["value"].max() == 0:
                    continue
                # Insert a None row wherever the counter drops (reset) so the
                # line breaks instead of drawing a misleading downward slope.
                resets = sdf["value"].diff() < 0
                if resets.any():
                    rows = []
                    for idx, is_reset in resets.items():
                        if is_reset:
                            rows.append({"timestamp": sdf.loc[idx, "timestamp"], "value": None})
                        rows.append(sdf.loc[idx].to_dict())
                    sdf = pd.DataFrame(rows)
                key = (validity, source)
                label = f"{validity} ({source})"
                show_legend = key not in legend_added
                legend_added.add(key)
                fig.add_trace(
                    go.Scatter(
                        x=sdf["timestamp"], y=sdf["value"],
                        name=label, legendgroup=label,
                        showlegend=show_legend,
                        line=dict(color=colors.get(key, "#636EFA")),
                        connectgaps=False,
                    ),
                    row=row, col=col,
                )
    else:
        fig.add_trace(
            go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
            row=row, col=col,
        )
        _n = (row - 1) * n_cols + col
        _s = "" if _n == 1 else str(_n)
        fig.add_annotation(
            text="No data available",
            xref=f"x{_s} domain", yref=f"y{_s} domain",
            x=0.5, y=0.5,
            showarrow=False,
            font=dict(size=12, color="#999"),
        )
    fig.update_yaxes(title_text="Count", row=row, col=col)
fig.update_layout(
    title="Attestation Counts by Client",
    height=270 * n_rows,
)
fig.show()
Show code
# Attestation summary: final (max) counter value per client / validity / source.
att_summary_rows = []
for client in all_clients:
    row_data = {"Client": client}
    cdf = att_df[att_df["client"] == client]
    for metric in ["lean_attestations_valid_total", "lean_attestations_invalid_total"]:
        mdf = cdf[cdf["metric"] == metric]
        # BUGFIX: '"valid" in metric' matched both metrics ("invalid" contains
        # "valid"), so invalid counts were written under "Valid (...)" columns,
        # silently overwriting the real valid counts. Test for "invalid".
        validity = "Invalid" if "invalid" in metric else "Valid"
        for source in sorted(mdf["source"].unique()):
            sdf = mdf[mdf["source"] == source]
            if not sdf.empty:
                col_name = f"{validity} ({source})"
                # Max of a cumulative counter ~= final count (ignores resets).
                row_data[col_name] = f"{sdf['value'].max():.0f}"
    att_summary_rows.append(row_data)
if att_summary_rows:
    att_summary = pd.DataFrame(att_summary_rows).set_index("Client").fillna("-")
    display(att_summary)
Attestation Counts per Slot¶
Estimated attestations received per slot (4 seconds). Computed by diffing cumulative counters between consecutive scrapes and dividing by the number of slots elapsed in each interval (15 slots per minute at the nominal 1-minute scrape). Shows combined valid attestations across all sources.
Show code
# Slot timing constants.
SLOT_DURATION = 4  # seconds
SLOTS_PER_MINUTE = 60 / SLOT_DURATION  # kept for reference; the rate below uses the actual scrape dt

# Combine valid attestations across every source, per client and timestamp.
valid_att = att_df[att_df["metric"] == "lean_attestations_valid_total"].copy()
valid_per_client = valid_att.groupby(["client", "timestamp"], as_index=False)["value"].sum()

fig = make_subplots(
    rows=n_rows, cols=n_cols,
    subplot_titles=all_clients,
    vertical_spacing=0.12 / max(n_rows - 1, 1) * 2,
    horizontal_spacing=0.08,
)

for idx, client in enumerate(all_clients):
    grid_r, grid_c = divmod(idx, n_cols)
    grid_r += 1
    grid_c += 1
    series = valid_per_client[valid_per_client["client"] == client].sort_values("timestamp").copy()
    if series.empty:
        fig.add_trace(
            go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
            row=grid_r, col=grid_c,
        )
        axis_num = (grid_r - 1) * n_cols + grid_c
        axis_suffix = "" if axis_num == 1 else str(axis_num)
        fig.add_annotation(
            text="No data available",
            xref=f"x{axis_suffix} domain", yref=f"y{axis_suffix} domain",
            x=0.5, y=0.5,
            showarrow=False,
            font=dict(size=12, color="#999"),
        )
    else:
        # Counter delta per scrape, normalised by the slots elapsed in that
        # interval; negative deltas (resets) and zero/negative dt are dropped.
        series["delta"] = series["value"].diff()
        series["dt"] = series["timestamp"].diff().dt.total_seconds()
        series = series[(series["delta"] >= 0) & (series["dt"] > 0) & series["delta"].notna()]
        if not series.empty:
            series["per_slot"] = series["delta"] / (series["dt"] / SLOT_DURATION)
            fig.add_trace(
                go.Scatter(
                    x=series["timestamp"], y=series["per_slot"],
                    name=client, showlegend=False,
                    line=dict(color="#636EFA"),
                ),
                row=grid_r, col=grid_c,
            )
    fig.update_yaxes(title_text="Atts/slot", row=grid_r, col=grid_c)

fig.update_layout(
    title="Valid Attestations Received per Slot by Client",
    height=270 * n_rows,
)
fig.show()
Show code
# Per-client network throughput (rx/tx) with EIP-7870 bandwidth tiers for reference.
if net_df.empty:
    print("No network throughput data available")
else:
    def mbps_to_kbps(mbps: float) -> float:
        """Convert megabits/s to the 1024-based KB/s scale used on the y-axis."""
        return mbps * 1e6 / 8 / 1024

    EIP7870_TIERS = [15, 25, 50]  # Mbps

    # Client containers follow the "<client>_0" naming convention.
    client_net = net_df[net_df["container"].str.endswith("_0")].copy()
    client_net["value_kb"] = client_net["value"] / 1024

    # Derive container list from all_clients for consistency.
    all_net_containers = [f"{c}_0" for c in all_clients]
    n_cols_net = min(len(all_net_containers), 2)
    n_rows_net = -(-len(all_net_containers) // n_cols_net)

    fig = make_subplots(
        rows=n_rows_net, cols=n_cols_net,
        subplot_titles=all_net_containers,
        vertical_spacing=0.12 / max(n_rows_net - 1, 1) * 2,
        horizontal_spacing=0.08,
    )
    colors = {"rx": "#636EFA", "tx": "#EF553B"}
    legend_added = set()
    for idx, container in enumerate(all_net_containers):
        grid_r, grid_c = divmod(idx, n_cols_net)
        grid_r += 1
        grid_c += 1
        cont_df = client_net[client_net["container"] == container]
        if cont_df.empty:
            fig.add_trace(
                go.Scatter(x=[None], y=[None], showlegend=False, hoverinfo='skip'),
                row=grid_r, col=grid_c,
            )
            axis_num = (grid_r - 1) * n_cols_net + grid_c
            axis_suffix = "" if axis_num == 1 else str(axis_num)
            fig.add_annotation(
                text="No data available",
                xref=f"x{axis_suffix} domain", yref=f"y{axis_suffix} domain",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=12, color="#999"),
            )
        else:
            for direction in ("rx", "tx"):
                dir_df = cont_df[cont_df["metric"] == direction].sort_values("timestamp")
                if dir_df.empty:
                    continue
                first_occurrence = direction not in legend_added
                legend_added.add(direction)
                fig.add_trace(
                    go.Scatter(
                        x=dir_df["timestamp"], y=dir_df["value_kb"],
                        name=direction, legendgroup=direction,
                        showlegend=first_occurrence,
                        line=dict(color=colors[direction]),
                    ),
                    row=grid_r, col=grid_c,
                )
            # EIP-7870 reference lines (only drawn on subplots that have data).
            for mbps in EIP7870_TIERS:
                fig.add_hline(
                    y=mbps_to_kbps(mbps),
                    row=grid_r, col=grid_c,
                    line=dict(color="#888", dash="dash", width=1),
                    annotation=dict(
                        text=f"{mbps} Mbps",
                        font=dict(size=9, color="#888"),
                    ),
                )
        fig.update_yaxes(title_text="KB/s", row=grid_r, col=grid_c)
    fig.update_layout(
        title="Network Throughput per Client (RX vs TX)",
        height=270 * n_rows_net,
    )
    fig.show()
Summary¶
Show code
def format_bytes_per_sec(val: float) -> str:
    """Render a bytes-per-second value with a human-readable 1024-based unit.

    Scales the magnitude down by 1024 until it fits below one step of the next
    unit, capping at TB/s (so very large values print as e.g. "1024.0 TB/s").
    """
    units = ("B/s", "KB/s", "MB/s", "GB/s", "TB/s")
    scaled = val
    unit_idx = 0
    while abs(scaled) >= 1024 and unit_idx < len(units) - 1:
        scaled /= 1024
        unit_idx += 1
    return f"{scaled:.1f} {units[unit_idx]}"
# Per-client summary combining peer stats, attestation totals, and bandwidth.
summary_rows = []
for client in all_clients:
    summary_row = {"Client": client}

    # Peer statistics.
    peer_values = peers_df[peers_df["client"] == client]["value"]
    if not peer_values.empty:
        summary_row["Avg Peers"] = f"{peer_values.mean():.1f}"
        summary_row["Min Peers"] = f"{peer_values.min():.0f}"

    # Attestation totals (max of the cumulative counters).
    att_rows = att_df[att_df["client"] == client]
    valid_total = att_rows[att_rows["metric"] == "lean_attestations_valid_total"]["value"].max()
    invalid_total = att_rows[att_rows["metric"] == "lean_attestations_invalid_total"]["value"].max()
    if pd.notna(valid_total):
        summary_row["Valid Atts"] = f"{valid_total:.0f}"
    if pd.notna(invalid_total) and invalid_total > 0:
        summary_row["Invalid Atts"] = f"{invalid_total:.0f}"

    # Average bandwidth for the client's container ("<client>_0").
    container_name = f"{client}_0"
    if not net_df.empty:
        net_rows = net_df[net_df["container"] == container_name]
        rx_values = net_rows[net_rows["metric"] == "rx"]["value"]
        tx_values = net_rows[net_rows["metric"] == "tx"]["value"]
        if not rx_values.empty:
            summary_row["Avg RX"] = format_bytes_per_sec(rx_values.mean())
        if not tx_values.empty:
            summary_row["Avg TX"] = format_bytes_per_sec(tx_values.mean())

    summary_rows.append(summary_row)

if summary_rows:
    summary_df = pd.DataFrame(summary_rows).set_index("Client").fillna("-")
    display(summary_df)

print(f"\nDevnet: {devnet_id}")
if devnet_info:
    print(f"Duration: {devnet_info['duration_hours']:.1f} hours")