Source code for hiper.metrics.swalk

# -*- coding: utf-8 -*-
"""
swalk.py

Implements s-walk efficiency metrics for hypergraphs.
"""

from typing import Dict

from hiper.core.hypernetwork import Hypernetwork
from .distance import HypergraphDistance


class SwalkEfficiency:
    r"""
    Computes average s-walk efficiency ℰs(H).

    The s-walk efficiency measures how efficiently nodes can reach each
    other via s-walks, computed as the average of inverse distances.

    Formula:
    :math:`\mathcal{E}_s(H) = \frac{1}{|V|(|V|-1)} \sum \frac{1}{d_H^s(u,v)}`
    for all pairs :math:`u,v \in V` with :math:`u \neq v`.

    Pairs whose distance is infinite or not strictly positive contribute
    an efficiency of 0.
    """

    def __init__(self, s: int = 1):
        """
        Initialize with s-walk parameter.

        Args:
            s: Parameter for s-walk computation; stored on the instance and
                passed to the ``HypergraphDistance`` calculator.
        """
        self.s = s
        self.distance_calculator = HypergraphDistance(s)

    def _pair_efficiency(self, hypernetwork: Hypernetwork, u, v) -> float:
        """Return 1/d(u, v), or 0.0 if the distance is infinite or <= 0."""
        distance = self.distance_calculator.compute_distance(
            hypernetwork, u, v)
        # NaN also lands in the 0.0 branch because ``nan > 0`` is False.
        if distance != float('inf') and distance > 0:
            return 1.0 / distance
        return 0.0

    def _ordered_pair_efficiencies(self, hypernetwork: Hypernetwork, nodes):
        """Yield the efficiency of every ordered pair (u, v) with u != v."""
        for u in nodes:
            for v in nodes:
                if u != v:
                    yield self._pair_efficiency(hypernetwork, u, v)

    def compute(self, hypernetwork: Hypernetwork) -> float:
        """
        Compute average s-walk efficiency.

        Every ordered pair of distinct nodes is evaluated, so (u, v) and
        (v, u) are both queried and both counted in the denominator.

        Args:
            hypernetwork: Target hypergraph.

        Returns:
            S-walk efficiency value ℰs(H). It lies in [0, 1] provided the
            distance calculator never reports a positive distance below 1
            (NOTE(review): presumed for s-walk hop counts -- confirm).
            Returns 0.0 when there are fewer than two nodes.
        """
        nodes = list(hypernetwork.nodes)
        if len(nodes) <= 1:
            return 0.0

        total_efficiency = 0.0
        pair_count = 0
        for efficiency in self._ordered_pair_efficiencies(
                hypernetwork, nodes):
            total_efficiency += efficiency
            pair_count += 1

        # pair_count can only be 0 here if no two node entries differ.
        return total_efficiency / pair_count if pair_count else 0.0

    def compute_detailed(self, hypernetwork: Hypernetwork) -> Dict[str, float]:
        """
        Compute detailed s-walk efficiency statistics.

        Args:
            hypernetwork: Target hypergraph.

        Returns:
            Dictionary with keys ``average_efficiency``, ``min_efficiency``,
            ``max_efficiency``, ``connected_pairs``, ``total_pairs`` and
            ``connectivity_ratio``. Pairs are ordered pairs of distinct
            nodes; a pair is "connected" when its distance is finite and
            strictly positive. All values are zero when no such pair
            exists to evaluate.
        """
        nodes = list(hypernetwork.nodes)
        efficiencies = []
        if len(nodes) > 1:
            efficiencies = list(
                self._ordered_pair_efficiencies(hypernetwork, nodes))

        # Guard the division and min()/max() below, mirroring the
        # pair_count check in compute().
        if not efficiencies:
            return {
                'average_efficiency': 0.0,
                'min_efficiency': 0.0,
                'max_efficiency': 0.0,
                'connected_pairs': 0,
                'total_pairs': 0,
                'connectivity_ratio': 0.0
            }

        total_pairs = len(efficiencies)
        # A pair's efficiency is positive exactly when its distance was
        # finite and > 0, i.e. when the pair counts as connected.
        connected_pairs = sum(1 for value in efficiencies if value > 0.0)

        return {
            'average_efficiency': sum(efficiencies) / total_pairs,
            'min_efficiency': min(efficiencies),
            'max_efficiency': max(efficiencies),
            'connected_pairs': connected_pairs,
            'total_pairs': total_pairs,
            'connectivity_ratio': connected_pairs / total_pairs
        }

    def compute_node_efficiencies(self, hypernetwork: Hypernetwork) -> Dict[
            int, float]:
        """
        Compute individual node efficiency scores.

        Unlike :meth:`compute`, each node's score is averaged over the
        targets it can actually reach (finite, positive distance), not over
        all other nodes.

        Args:
            hypernetwork: Target hypergraph.

        Returns:
            Dictionary mapping node IDs to their efficiency scores; a node
            that reaches no other node gets 0.0.
        """
        nodes = list(hypernetwork.nodes)
        node_efficiencies = {}

        for source in nodes:
            total_efficiency = 0.0
            reachable_count = 0
            for target in nodes:
                if source == target:
                    continue
                efficiency = self._pair_efficiency(
                    hypernetwork, source, target)
                if efficiency > 0.0:
                    total_efficiency += efficiency
                    reachable_count += 1

            node_efficiencies[source] = (
                total_efficiency / reachable_count
                if reachable_count else 0.0
            )

        return node_efficiencies

    def find_critical_nodes(self, hypernetwork: Hypernetwork,
                            top_k: int = 5) -> list:
        """
        Find nodes whose removal would most impact s-walk efficiency.

        For each node, a fresh ``Hypernetwork`` is built containing every
        hyperedge with that node filtered out (hyperedges left empty are
        dropped), and the efficiency of that copy is compared with the
        original. Only hyperedges are copied, so a node that ends up in no
        hyperedge is presumably absent from the copy.

        Args:
            hypernetwork: Target hypergraph.
            top_k: Number of critical nodes to return. A non-positive value
                yields an empty list.

        Returns:
            List of (node_id, efficiency_impact) tuples sorted by impact in
            descending order, where impact is the original efficiency minus
            the efficiency after removal.
        """
        # Avoid the costly per-node recomputation, and avoid the surprising
        # "all but the last |k|" result a negative slice bound would give.
        if top_k <= 0:
            return []

        original_efficiency = self.compute(hypernetwork)
        impacts = []

        for node_id in hypernetwork.nodes:
            # Rebuild the hypergraph without this node.
            test_hn = Hypernetwork()
            for edge_id in hypernetwork.edges:
                remaining_nodes = [
                    n for n in hypernetwork.get_nodes(edge_id)
                    if n != node_id
                ]
                if remaining_nodes:
                    test_hn.add_hyperedge(edge_id, remaining_nodes)

            new_efficiency = self.compute(test_hn)
            impacts.append((node_id, original_efficiency - new_efficiency))

        # Largest efficiency drop first.
        impacts.sort(key=lambda item: item[1], reverse=True)
        return impacts[:top_k]