Source code for chipfiring.CFRank

"""
Rank calculation for divisors on chip-firing graphs.

This module provides functionality to calculate the rank of divisors on chip-firing graphs,
which is an important invariant in the theory of divisors on graphs. The rank measures
how much freedom you have to move chips around while keeping the divisor effective.

The implementation uses the Efficient Winnability Detection (EWD) algorithm as a building
block and provides both standard and optimized calculation modes.
"""
from __future__ import annotations
from .CFDivisor import CFDivisor
from .CFOrientation import CFOrientation
from .algo import EWD
import itertools
from multiprocessing import Pool, cpu_count
from collections import Counter


[docs] class CFRank: """ A class that holds the result of a rank calculation. This class stores both the computed rank value of a divisor and the detailed logs of the calculation process. It is typically created and returned by the `rank()` function rather than being instantiated directly. Attributes: logs (List[str]): Sequential log messages from the rank calculation process. rank (int): The computed rank value, accessible as a property. Example: >>> result = rank(divisor) >>> print(f"The rank is: {result.rank}") >>> for log in result.logs: ... print(log) """
[docs] def __init__(self): """Internal constructor to initialize the CFRank object.""" self.logs = [] self._rank_value = None self._divisor = None
[docs] def log(self, message: str): """Internal method to add a log message to the logs list.""" self.logs.append(message)
@property def rank(self) -> int: """ Retrieve the calculated rank value. Returns: int: The rank value of the divisor. Raises: ValueError: If no rank calculation has been performed yet. """ if self._rank_value is None: raise ValueError("No rank has been calculated yet.") return self._rank_value
[docs] def _calculate_rank(self, divisor: CFDivisor, optimized: bool = False) -> "CFRank": """ Internal method to calculate the rank of a given divisor. """ self.logs = [] # Reset logs for new calculation self._divisor = divisor graph = divisor.graph # 1. Call EWD on the divisor; if unwinnable, return -1 self.log("Step 1: Checking initial winnability through EWD algorithm...") initial_winnable, _, _, _ = EWD(graph, divisor, optimized=False) if not initial_winnable: self.log("Initial divisor is not winnable. So, rank: -1") self._rank_value = -1 return self self.log("Initial divisor is winnable. Proceeding to step 2.") if optimized: self.log( "Optimized mode is enabled. Checking if we can apply theoretical shortcuts before proceeding." ) D = self._divisor # Using Corollary 4.4.3 from Dhyey Mavani's Math thesis: if D.get_total_degree() > 2 * graph.get_genus() - 2: self.log( "Optimized mode: D has degree > 2g-2. Using Corollary 4.4.3 from Dhyey Mavani's Math thesis to skip step 2, and return rank(D) = degree(D) - genus(G)." ) self._rank_value = D.get_total_degree() - graph.get_genus() return self # Check if the degree of (K-D) < degree of D, if so, run next step on (K-D) orientation = CFOrientation(graph, []) K = orientation.canonical_divisor() if (K - D).get_total_degree() < D.get_total_degree(): self.log( "Optimized mode: (K-D) has lower degree than D. Running next step on (K-D)." ) self._divisor = K - D else: self.log( "Optimized mode: (K-D) has degree >= that of D. Running next step on D itself." ) # 2. Sort the vertices by name sorted_vertices = sorted(list(graph.vertices), key=lambda v: v.name) k = 1 self.log("Step 2: Iteratively removing k chips and checking winnability.") while True: self.log(f"\n-- Current k: {k} --") def generate_sub_divisors_for_k(): """ Generates all divisors D-E where E is an effective divisor of degree k. """ # Create all effective divisors E of degree k. # An effective divisor is a sum of vertices, so we can get all of them # by taking combinations with replacement of the vertices. for combination_of_vertices in itertools.combinations_with_replacement( sorted_vertices, k ): vertex_counts = Counter(v.name for v in combination_of_vertices) # Create the divisor E to subtract subtraction_divisor = CFDivisor( graph, list(vertex_counts.items()) ) # Return D - E yield self._divisor - subtraction_divisor any_unwinnable_found_for_k = False num_divisors_processed_for_k = 0 def _check_winnability(sub_divisor): nonlocal num_divisors_processed_for_k num_divisors_processed_for_k += 1 winnable, _, _, _ = EWD( sub_divisor.graph, sub_divisor, optimized=False ) if not winnable: # Log from the worker process if an unwinnable divisor is found # This might not be perfectly sequential in the main log, but it's informative print( f" Found unwinnable divisor for k={k}. Terminating pool for this k." ) return winnable try: self.log(f" Starting parallel processing for k={k}...") with Pool(processes=cpu_count()) as pool: # The number of divisors can be large, so use imap_unordered for lazy evaluation results_iterator = pool.imap_unordered( _check_winnability, generate_sub_divisors_for_k(), ) for i, winnable_res in enumerate(results_iterator): self.log(f" Processed (k={k}, item {i+1}): Winnable -> {winnable_res}") if not winnable_res: any_unwinnable_found_for_k = True pool.terminate() # Stop processing further items for this k break self.log(f" Parallel processing finished for k={k}.") except Exception as e: self.log( f" Multiprocessing failed for k={k}: {e}. Falling back to sequential execution." ) any_unwinnable_found_for_k = False # Reset for sequential run num_divisors_processed_for_k = 0 # Reset for sequential run self.log(f" Starting sequential processing for k={k}...") for sub_divisor in generate_sub_divisors_for_k(): num_divisors_processed_for_k += 1 winnable_res, _, _, _ = EWD( sub_divisor.graph, sub_divisor, optimized=False ) self.log( f" Processed (k={k}, item {num_divisors_processed_for_k}): Divisor {sub_divisor.degrees_to_str()} -> Winnable: {winnable_res}" ) if not winnable_res: any_unwinnable_found_for_k = True self.log( f" Found unwinnable divisor {sub_divisor.degrees_to_str()} for k={k}." ) break self.log(f" Sequential processing finished for k={k}.") if any_unwinnable_found_for_k: self.log( f" For k={k}, an unwinnable configuration was found. Rank: {k-1}" ) self._rank_value = k - 1 return self else: self.log( f" All {num_divisors_processed_for_k} processed configurations for k={k} were winnable. Incrementing k." ) k += 1
# Loop continues for the next k
[docs] def get_log_summary(self) -> str: """ Get a complete log of the rank calculation process. Returns: str: A string containing all log messages from the calculation. If no logs are available, returns "No calculation logs available." Example: >>> result = rank(divisor) >>> print(result.get_log_summary()) """ if not self.logs: return "No calculation logs available." return "\n".join(self.logs)
[docs] def rank(divisor: CFDivisor, optimized: bool = False) -> CFRank: """ Calculate the rank of a given divisor. In divisor theory, the rank r(D) of a divisor D is defined as the largest integer r such that D-E is equivalent to an effective divisor for all effective divisors E of degree r. If D is not equivalent to an effective divisor, then r(D) = -1. The rank is computed as follows: 1. If EWD(divisor) is not winnable, return -1. 2. Starting with k = 1, consider all effective divisors E of degree k. 3. For each such divisor E, check if D-E is winnable using the EWD algorithm. These checks are performed in parallel for a given k. 4. If all checks for the current k are winnable, increment k and repeat step 2. 5. Otherwise (if any check for D-E returns not winnable), the rank is k-1. Args: divisor: The CFDivisor object for which to calculate the rank. optimized: Whether to use optimized rank calculation. (default: False) If True, theoretical shortcuts like Corollary 4.4.3 from Dhyey Mavani's thesis will be used when applicable to speed up calculations. The log will indicate when these optimizations are used. Returns: CFRank: An object with the calculated rank accessible via .rank property and calculation logs accessible via .logs attribute. One can also access the full log summary using .get_log_summary(). Example: >>> result = rank(divisor) >>> print(f"Rank: {result.rank}") >>> print(result.get_log_summary()) """ return CFRank()._calculate_rank(divisor, optimized)
[docs] def r(divisor : CFDivisor, optimized: bool = False) -> int: """ Calculate the rank of the given divisor, as in the funciton "rank." This funcion returns only the rank itself, as an integer, without the logs. Implemented as a wrapper around "rank." Args: divisor: The CFDivisor object for which to calculate the rank. optimized: Whether to use optimized rank calculation. (default: False) If True, theoretical shortcuts like Corollary 4.4.3 from Dhyey Mavani's thesis will be used when applicable to speed up calculations. The log will indicate when these optimizations are used. Returns: int: The rank of the divisor. Example: >>> result = r(divisor) >>> print(f"Rank: {result}") """ return CFRank()._calculate_rank(divisor, optimized).rank