from __future__ import annotations
import json
from .CFGraph import CFGraph
from .CFDivisor import CFDivisor
from .CFOrientation import CFOrientation
from .CFiringScript import CFiringScript
[docs]
class CFDataProcessor:
"""
A class to handle data input and output for chip-firing objects
in various formats like .txt, .json, and .tex.
"""
[docs]
def __init__(self):
"""
Initialize the CFDataProcessor.
"""
pass
# --- JSON Methods ---
[docs]
def read_json(self, file_path: str, object_type: str):
"""
Reads a CF object from a .json file.
Args:
file_path (str): The path to the .json file.
object_type (str): The type of CF object to read ('graph', 'divisor', 'orientation', 'firingscript').
Returns:
A CF object (e.g., CFGraph, CFDivisor) or None if reading fails.
"""
try:
with open(file_path, 'r') as f:
data = json.load(f)
if object_type.lower() == 'graph':
return CFGraph.from_dict(data)
elif object_type.lower() == 'divisor':
return CFDivisor.from_dict(data)
elif object_type.lower() == 'orientation':
return CFOrientation.from_dict(data)
elif object_type.lower() == 'firingscript':
return CFiringScript.from_dict(data)
else:
print(f"Unsupported object_type for JSON reading: {object_type}. Please use 'graph', 'divisor', 'orientation', or 'firingscript'.")
return None
except FileNotFoundError:
print(f"Error: File not found at {file_path}")
return None
except json.JSONDecodeError:
print(f"Error: Could not decode JSON from {file_path}")
return None
except ValueError as ve:
print(f"Error processing data for {object_type}: {ve}")
return None
except Exception as e:
print(f"An unexpected error occurred while reading JSON: {e}")
return None
[docs]
def to_json(self, cf_object, file_path: str):
"""
Writes a CF object to a .json file.
Args:
cf_object: The CF object to serialize (e.g., CFGraph, CFDivisor).
file_path (str): The path to save the .json file.
"""
data_to_write = None
if isinstance(cf_object, (CFGraph, CFDivisor, CFOrientation, CFiringScript)):
data_to_write = cf_object.to_dict()
else:
raise ValueError(f"Unsupported object type for JSON serialization: {type(cf_object)}")
if data_to_write is not None:
try:
with open(file_path, 'w') as f:
json.dump(data_to_write, f, indent=4)
print(f"Successfully wrote object to {file_path}.")
except Exception as e:
print(f"An error occurred while writing JSON to {file_path}: {e}")
else:
print(f"No data to write for object of type {type(cf_object)}.")
# --- TXT Methods ---
[docs]
def read_txt(self, file_path: str, object_type: str):
"""
Reads a CF object from a .txt file.
Args:
file_path (str): The path to the .txt file.
object_type (str): The type of CF object to read.
Returns:
A CF object or None if reading fails.
"""
try:
with open(file_path, 'r') as f:
lines = [line.strip() for line in f if line.strip()] # Read non-empty lines
if object_type.lower() == 'graph':
vertex_names = []
edges = []
for line in lines:
if line.startswith("VERTICES:"):
vertex_names = [name.strip() for name in line.replace("VERTICES:", "").split(',')]
elif line.startswith("EDGE:"):
parts = [part.strip() for part in line.replace("EDGE:", "").split(',')]
if len(parts) == 3:
edges.append((parts[0], parts[1], int(parts[2])))
else:
print(f"Warning: Malformed EDGE line: {line}")
if not vertex_names:
raise ValueError("VERTICES line missing or empty in TXT file for graph.")
return CFGraph(set(vertex_names), edges)
elif object_type.lower() == 'divisor':
graph_vertex_names = []
graph_edges = []
divisor_degrees_list = []
parsing_degrees = False
for line in lines:
if line.startswith("GRAPH_VERTICES:"):
graph_vertex_names = [name.strip() for name in line.replace("GRAPH_VERTICES:", "").split(',')]
elif line.startswith("GRAPH_EDGE:"):
parts = [part.strip() for part in line.replace("GRAPH_EDGE:", "").split(',')]
if len(parts) == 3:
graph_edges.append((parts[0], parts[1], int(parts[2])))
else:
print(f"Warning: Malformed GRAPH_EDGE line: {line}")
elif line == "---DEGREES---":
parsing_degrees = True
elif line.startswith("DEGREE:") and parsing_degrees:
parts = [part.strip() for part in line.replace("DEGREE:", "").split(',')]
if len(parts) == 2:
divisor_degrees_list.append((parts[0], int(parts[1])))
else:
print(f"Warning: Malformed DEGREE line: {line}")
if not graph_vertex_names:
raise ValueError("GRAPH_VERTICES line missing or empty in TXT file for divisor.")
graph = CFGraph(set(graph_vertex_names), graph_edges)
# For CFDivisor constructor, all graph vertices will default to degree 0
# if not specified in divisor_degrees_list.
return CFDivisor(graph, divisor_degrees_list)
elif object_type.lower() == 'orientation':
graph_vertex_names = []
graph_edges = []
orientations_list = []
parsing_orientations = False
for line in lines:
if line.startswith("GRAPH_VERTICES:"):
graph_vertex_names = [name.strip() for name in line.replace("GRAPH_VERTICES:", "").split(',')]
elif line.startswith("GRAPH_EDGE:"):
parts = [part.strip() for part in line.replace("GRAPH_EDGE:", "").split(',')]
if len(parts) == 3:
graph_edges.append((parts[0], parts[1], int(parts[2])))
else:
print(f"Warning: Malformed GRAPH_EDGE line: {line}")
elif line == "---ORIENTATIONS---":
parsing_orientations = True
elif line.startswith("ORIENTED:") and parsing_orientations:
parts = [part.strip() for part in line.replace("ORIENTED:", "").split(',')]
if len(parts) == 2:
orientations_list.append((parts[0], parts[1]))
else:
print(f"Warning: Malformed ORIENTED line: {line}")
if not graph_vertex_names:
raise ValueError("GRAPH_VERTICES missing for orientation.")
graph = CFGraph(set(graph_vertex_names), graph_edges)
return CFOrientation(graph, orientations_list)
elif object_type.lower() == 'firingscript':
graph_vertex_names = []
graph_edges = []
script_dict = {}
parsing_script = False
for line in lines:
if line.startswith("GRAPH_VERTICES:"):
graph_vertex_names = [name.strip() for name in line.replace("GRAPH_VERTICES:", "").split(',')]
elif line.startswith("GRAPH_EDGE:"):
parts = [part.strip() for part in line.replace("GRAPH_EDGE:", "").split(',')]
if len(parts) == 3:
graph_edges.append((parts[0], parts[1], int(parts[2])))
else:
print(f"Warning: Malformed GRAPH_EDGE line: {line}")
elif line == "---SCRIPT---":
parsing_script = True
elif line.startswith("FIRING:") and parsing_script:
parts = [part.strip() for part in line.replace("FIRING:", "").split(',')]
if len(parts) == 2:
script_dict[parts[0]] = int(parts[1])
else:
print(f"Warning: Malformed FIRING line: {line}")
if not graph_vertex_names:
raise ValueError("GRAPH_VERTICES missing for firingscript.")
graph = CFGraph(set(graph_vertex_names), graph_edges)
return CFiringScript(graph, script_dict)
else:
print(f"Unsupported object_type for TXT reading: {object_type}")
return None
except FileNotFoundError:
print(f"Error: File not found at {file_path}")
return None
except ValueError as ve:
print(f"Error processing TXT data for {object_type}: {ve}")
return None
except Exception as e:
print(f"An unexpected error occurred while reading TXT from {file_path}: {e}")
return None
[docs]
def to_txt(self, cf_object, file_path: str):
"""
Writes a CF object to a .txt file.
Args:
cf_object: The CF object to serialize.
file_path (str): The path to save the .txt file.
"""
try:
lines_to_write = []
if isinstance(cf_object, CFGraph):
vertex_names = sorted([v.name for v in cf_object.vertices])
lines_to_write.append(f"VERTICES: {', '.join(vertex_names)}")
# Use to_dict to get canonical edge list
graph_dict = cf_object.to_dict()
for edge_data in sorted(graph_dict.get("edges", [])):
lines_to_write.append(f"EDGE: {edge_data[0]}, {edge_data[1]}, {edge_data[2]}")
elif isinstance(cf_object, CFDivisor):
graph = cf_object.graph
graph_vertex_names = sorted([v.name for v in graph.vertices])
lines_to_write.append(f"GRAPH_VERTICES: {', '.join(graph_vertex_names)}")
graph_dict = graph.to_dict()
for edge_data in sorted(graph_dict.get("edges", [])):
lines_to_write.append(f"GRAPH_EDGE: {edge_data[0]}, {edge_data[1]}, {edge_data[2]}")
lines_to_write.append("---DEGREES---")
# Sort degrees by vertex name for consistent output
sorted_degrees = sorted(cf_object.degrees.items(), key=lambda item: item[0].name)
for vertex, degree in sorted_degrees:
lines_to_write.append(f"DEGREE: {vertex.name}, {degree}")
elif isinstance(cf_object, CFOrientation):
graph = cf_object.graph
graph_vertex_names = sorted([v.name for v in graph.vertices])
lines_to_write.append(f"GRAPH_VERTICES: {', '.join(graph_vertex_names)}")
graph_dict = graph.to_dict()
for edge_data in sorted(graph_dict.get("edges", [])):
lines_to_write.append(f"GRAPH_EDGE: {edge_data[0]}, {edge_data[1]}, {edge_data[2]}")
lines_to_write.append("---ORIENTATIONS---")
# Use CFOrientation's to_dict to get canonical orientations
orientation_data = cf_object.to_dict().get("orientations", [])
for source, sink in sorted(orientation_data):
lines_to_write.append(f"ORIENTED: {source}, {sink}")
elif isinstance(cf_object, CFiringScript):
graph = cf_object.graph
graph_vertex_names = sorted([v.name for v in graph.vertices])
lines_to_write.append(f"GRAPH_VERTICES: {', '.join(graph_vertex_names)}")
graph_dict = graph.to_dict()
for edge_data in sorted(graph_dict.get("edges", [])):
lines_to_write.append(f"GRAPH_EDGE: {edge_data[0]}, {edge_data[1]}, {edge_data[2]}")
lines_to_write.append("---SCRIPT---")
# Get script data (name: firings), sort by name for consistency
script_data = cf_object.script # .script property gives all vertices
sorted_script_items = sorted(script_data.items())
for vertex_name, firings in sorted_script_items:
if firings != 0: # Only write non-zero firings for brevity
lines_to_write.append(f"FIRING: {vertex_name}, {firings}")
else:
print(f"Unsupported object type for TXT serialization: {type(cf_object)}")
lines_to_write.append(f"Object type: {type(cf_object)}")
lines_to_write.append("Data: TXT representation not implemented for this type.")
with open(file_path, 'w') as f:
for line in lines_to_write:
f.write(line + "\n")
print(f"Successfully wrote object to {file_path}.")
except Exception as e:
print(f"An error occurred while writing TXT to {file_path}: {e}")
# --- TeX Methods ---
[docs]
def to_tex(self, cf_object, file_path: str):
"""
Writes a CF object to a .tex file using basic TikZ representation.
Args:
cf_object: The CF object to serialize (CFGraph or CFDivisor).
file_path (str): The path to save the .tex file.
"""
_tikz_node_positions = {} # To store node names for edges e.g. (A) -> tikz_id
def _get_tikz_node_label(vertex_name, obj_type, obj_instance):
label = vertex_name.replace("_", "\\_")
if obj_type == 'divisor':
# Don't include degree in the node label
label = label
elif obj_type == 'firingscript':
# Don't include firing info in the node label
label = label
return label
def _generate_tikz_nodes(vertices_list, obj_type, obj_instance):
tex_node_lines = []
if not vertices_list:
return tex_node_lines
# Basic positioning
# First node
v_obj_first = vertices_list[0]
node_tikz_id = v_obj_first.name.replace("_", "") # TikZ node IDs can't have underscores usually
_tikz_node_positions[v_obj_first.name] = node_tikz_id
label = _get_tikz_node_label(v_obj_first.name, obj_type, obj_instance)
tex_node_lines.append(f" \\node[state] ({node_tikz_id}) {{{label}}};")
prev_node_tikz_id = node_tikz_id
# Subsequent nodes
for i, v_obj in enumerate(vertices_list[1:]):
current_real_idx = i + 1 # original index in vertices_list for positioning logic
node_tikz_id = v_obj.name.replace("_", "")
_tikz_node_positions[v_obj.name] = node_tikz_id
label = _get_tikz_node_label(v_obj.name, obj_type, obj_instance)
pos_str = "right=of " + prev_node_tikz_id
# Rudimentary grid: find the anchor node for 'below of'
# This assumes vertices_list is sorted consistently.
if current_real_idx % 3 == 1: # New row
anchor_idx = current_real_idx - (current_real_idx % 3) -1
if current_real_idx == 1:
anchor_idx = 0 # first in second row
else:
anchor_idx = current_real_idx - 1 - ( (current_real_idx-1) %3)
anchor_node_name_orig = vertices_list[anchor_idx].name
anchor_node_tikz_id = _tikz_node_positions[anchor_node_name_orig]
pos_str = "below=of " + anchor_node_tikz_id
elif current_real_idx % 3 == 2: # Third in row
# prev_node_tikz_id is already correct from previous iteration
pos_str = "right=of " + prev_node_tikz_id
# else: pos_str is "right=of " prev_node_tikz_id (default, for first and second in row)
tex_node_lines.append(f" \\node[state] ({node_tikz_id}) [{pos_str}] {{{label}}};")
prev_node_tikz_id = node_tikz_id
return tex_node_lines
tex_lines = [
"\\documentclass{article}",
"\\usepackage{tikz}",
"\\usetikzlibrary{automata, positioning, arrows.meta, shapes}",
"",
"\\begin{document}",
"",
"\\begin{tikzpicture}[shorten >=1pt, node distance=2.5cm, on grid, auto,",
" every state/.style={draw=black!50, thick, minimum size=0.8cm, inner sep=3pt},",
" edge_label/.style={midway, fill=white, inner sep=1pt, font=\\small}",
"]",
""
]
_tikz_node_positions.clear() # Clear for current object
try:
obj_specific_type_str = None # For _get_tikz_node_label
if isinstance(cf_object, CFGraph):
obj_specific_type_str = 'graph'
tex_lines.append("% Graph Definition")
# Vertices must be Vertex objects for CFGraph
vertices_for_layout = sorted(list(cf_object.vertices), key=lambda v: v.name)
tex_lines.extend(_generate_tikz_nodes(vertices_for_layout, obj_specific_type_str, cf_object))
tex_lines.append("")
tex_lines.append("% Edges")
graph_dict = cf_object.to_dict()
for v1_name, v2_name, valence in sorted(graph_dict.get("edges", [])):
u_node_id = _tikz_node_positions.get(v1_name)
v_node_id = _tikz_node_positions.get(v2_name)
if u_node_id and v_node_id:
# Draw multiple edges based on valence
for i in range(valence):
if valence == 1:
bend = 0
else:
if i % 2 == 0:
bend = 5
else:
bend = -5
bend_magnitude = 5 * (i // 2)
bend = (bend + bend_magnitude) if bend > 0 else (bend - bend_magnitude)
if bend > 0:
tex_lines.append(f" \\path[-] ({u_node_id}) edge[bend right={bend}] ({v_node_id});")
else:
tex_lines.append(f" \\path[-] ({u_node_id}) edge[bend left={abs(bend)}] ({v_node_id});")
elif isinstance(cf_object, CFDivisor):
obj_specific_type_str = 'divisor'
tex_lines.append("% Divisor Definition (Graph with Chip Counts)")
graph = cf_object.graph
vertices_for_layout = sorted(list(graph.vertices), key=lambda v: v.name)
tex_lines.extend(_generate_tikz_nodes(vertices_for_layout, obj_specific_type_str, cf_object))
# Add degree information as labels next to nodes
tex_lines.append("")
tex_lines.append("% Degree information as labels")
for vertex in vertices_for_layout:
degree = cf_object.degrees.get(vertex, 0)
node_id = _tikz_node_positions.get(vertex.name)
tex_lines.append(f" \\node[anchor=west, xshift=1pt, yshift=1pt, at=({node_id}.north east), font=\\small] {{{degree}}};")
tex_lines.append("")
tex_lines.append("% Edges")
graph_dict = graph.to_dict()
for v1_name, v2_name, valence in sorted(graph_dict.get("edges", [])):
u_node_id = _tikz_node_positions.get(v1_name)
v_node_id = _tikz_node_positions.get(v2_name)
if u_node_id and v_node_id:
# Draw multiple edges based on valence
for i in range(valence):
if valence == 1:
bend = 0
else:
if i % 2 == 0:
bend = 5
else:
bend = -5
bend_magnitude = 5 * (i // 2)
bend = (bend + bend_magnitude) if bend > 0 else (bend - bend_magnitude)
if bend > 0:
tex_lines.append(f" \\path[->] ({u_node_id}) edge[bend right={bend}] ({v_node_id});")
else:
tex_lines.append(f" \\path[->] ({u_node_id}) edge[bend left={abs(bend)}] ({v_node_id});")
elif isinstance(cf_object, CFOrientation):
obj_specific_type_str = 'orientation'
tex_lines.append("% Orientation Definition")
graph = cf_object.graph
vertices_for_layout = sorted(list(graph.vertices), key=lambda v: v.name)
tex_lines.extend(_generate_tikz_nodes(vertices_for_layout, obj_specific_type_str, cf_object))
tex_lines.append("")
tex_lines.append("% Edges with Orientations")
graph_edges_dict = { tuple(sorted((e[0],e[1]))): e[2] for e in graph.to_dict().get("edges", []) }
# Get orientations from the object: list of [source, sink]
oriented_pairs_set = { tuple(o) for o in cf_object.to_dict().get("orientations", []) }
for v1_s_name, v2_s_name in sorted(graph_edges_dict.keys()): # Iterate unique graph edges
valence = graph_edges_dict[(v1_s_name, v2_s_name)]
u_node_id = _tikz_node_positions.get(v1_s_name)
v_node_id = _tikz_node_positions.get(v2_s_name)
if not (u_node_id and v_node_id):
continue
# Determine arrow style based on orientation
if (v1_s_name, v2_s_name) in oriented_pairs_set: # v1 -> v2
# Draw multiple directed edges
for i in range(valence):
if valence == 1:
bend = 0
else:
if i % 2 == 0:
bend = 5
else:
bend = -5
bend_magnitude = 5 * (i // 2)
bend = (bend + bend_magnitude) if bend > 0 else (bend - bend_magnitude)
if bend > 0:
tex_lines.append(f" \\path[->] ({u_node_id}) edge[bend right={bend}] ({v_node_id});")
else:
tex_lines.append(f" \\path[->] ({u_node_id}) edge[bend left={abs(bend)}] ({v_node_id});")
elif (v2_s_name, v1_s_name) in oriented_pairs_set: # v2 -> v1
# Draw multiple directed edges in reverse
for i in range(valence):
if valence == 1:
bend = 0
else:
if i % 2 == 0:
bend = 5
else:
bend = -5
bend_magnitude = 5 * (i // 2)
bend = (bend + bend_magnitude) if bend > 0 else (bend - bend_magnitude)
if bend > 0:
tex_lines.append(f" \\path[->] ({v_node_id}) edge[bend right={bend}] ({u_node_id});")
else:
tex_lines.append(f" \\path[->] ({v_node_id}) edge[bend left={abs(bend)}] ({u_node_id});")
else: # Not in orientation list, draw as undirected or per specific style
# Draw multiple undirected edges
for i in range(valence):
if valence == 1:
bend = 0
else:
if i % 2:
bend = 5
else:
bend = -5
bend_magnitude = 5 * (i // 2)
bend = (bend + bend_magnitude) if bend > 0 else (bend - bend_magnitude)
if bend > 0:
tex_lines.append(f" \\path[-] ({u_node_id}) edge[bend right={bend}] ({v_node_id});")
else:
tex_lines.append(f" \\path[-] ({u_node_id}) edge[bend left={abs(bend)}] ({v_node_id});")
elif isinstance(cf_object, CFiringScript):
obj_specific_type_str = 'firingscript'
tex_lines.append("% Firing Script Definition")
graph = cf_object.graph
vertices_for_layout = sorted(list(graph.vertices), key=lambda v: v.name)
tex_lines.extend(_generate_tikz_nodes(vertices_for_layout, obj_specific_type_str, cf_object))
# Add firing information as labels next to nodes
tex_lines.append("")
tex_lines.append("% Firing information as labels")
for vertex in vertices_for_layout:
firings = cf_object.script.get(vertex.name, 0)
if firings != 0: # Only show non-zero firing counts
node_id = _tikz_node_positions.get(vertex.name)
tex_lines.append(f" \\node[anchor=west, xshift=1pt, yshift=1pt, at=({node_id}.north east), font=\\small] {{{firings}}};")
tex_lines.append("")
tex_lines.append("% Edges (structure only, firings are shown next to nodes)")
graph_dict = graph.to_dict()
for v1_name, v2_name, valence in sorted(graph_dict.get("edges", [])):
u_node_id = _tikz_node_positions.get(v1_name)
v_node_id = _tikz_node_positions.get(v2_name)
if u_node_id and v_node_id:
# Draw multiple edges based on valence
for i in range(valence):
if valence == 1:
bend = 0
else:
if i % 2 == 0:
bend = 5
else:
bend = -5
bend_magnitude = 5 * (i // 2)
bend = (bend + bend_magnitude) if bend > 0 else (bend - bend_magnitude)
if bend > 0:
tex_lines.append(f" \\path[-] ({u_node_id}) edge[bend right={bend}] ({v_node_id});")
else:
tex_lines.append(f" \\path[-] ({u_node_id}) edge[bend left={abs(bend)}] ({v_node_id});")
else:
print(f"Unsupported object type for TeX serialization: {type(cf_object)}")
tex_lines.append(f"% Object type: {type(cf_object)}")
tex_lines.append("% Data: TeX representation not implemented for this type.")
tex_lines.extend([
"",
"\\end{tikzpicture}",
"",
"\\end{document}"
])
with open(file_path, 'w') as f:
for line in tex_lines:
f.write(line + "\n")
print(f"Successfully wrote TeX representation to {file_path}.")
except Exception as e:
print(f"An error occurred while writing TeX to {file_path}: {e}")
raise e
if __name__ == "__main__":
processor = CFDataProcessor()
graph = processor.read_txt("tests/data/txt/favorite_graph.txt", "graph")
processor.to_tex(graph, "tests/data/tex/favorite_graph.tex")
divisor = processor.read_txt("tests/data/txt/favorite_divisor.txt", "divisor")
processor.to_tex(divisor, "tests/data/tex/favorite_divisor.tex")
orientation = processor.read_txt("tests/data/txt/favorite_orientation.txt", "orientation")
processor.to_tex(orientation, "tests/data/tex/favorite_orientation.tex")
firingscript = processor.read_txt("tests/data/txt/favorite_firing_script.txt", "firingscript")
processor.to_tex(firingscript, "tests/data/tex/favorite_firing_script.tex")