107 changes: 69 additions & 38 deletions conda_forge_tick/migrators/core.py
@@ -704,6 +704,69 @@ def migrator_uid(self, attrs: "AttrsTypedDict") -> "MigrationUidTypedDict":

return d

def get_blocking_predecessors(
self, attrs: "AttrsTypedDict", graph: nx.DiGraph | None = None
) -> list[tuple[str, dict | None]]:
"""Get list of predecessors (dependencies) that are blocking this package.

Parameters
----------
attrs : AttrsTypedDict
The node attributes
graph : nx.DiGraph | None, optional
Graph to use for finding predecessors. If None, uses self.graph.
Useful when the node might not be in self.graph.

        Returns
        -------
        list of tuple
            List of ``(node_name, pr_data)`` tuples for blocking predecessors.
            ``pr_data`` is None if the predecessor does not have this migration
            in its PRed list, and a dict with PR info if the predecessor has an
            open PR for this migration.
        """
use_graph = graph if graph is not None else self.graph
if use_graph is None:
return []

feedstock_name = attrs.get("feedstock_name")
if feedstock_name not in use_graph.nodes():
return []

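        # Some migrators never set ignored_deps_per_node; default to an empty
        # mapping so the lookups below are safe.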
ignored_deps = getattr(self, "ignored_deps_per_node", {})

blocking = []
for node, payload in _gen_active_feedstocks_payloads(
use_graph.predecessors(feedstock_name),
use_graph,
):
            if node in ignored_deps.get(feedstock_name, []):
continue

muid = frozen_to_json_friendly(self.migrator_uid(payload))

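            # No PRed entry with this uid: the predecessor has not yet been
            # issued a PR for this migration.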
if muid not in _sanitized_muids(
payload.get("pr_info", {}).get("PRed", []),
):
logger.debug("not yet built: %s", node)
blocking.append((node, None))
continue

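            # Find the PRed entry matching this migration; it can be missing
            # due to PRed json loss from a past bad graph deploy outage.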
m_pred_json = None
for pr_json in payload.get("pr_info", {}).get("PRed", []):
if pr_json.get("data") == muid["data"]:
m_pred_json = pr_json
break

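            # If the bot is missing the PR record we assume the PR is open so
            # that errors halt the migration and can be fixed.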
if (
m_pred_json
and m_pred_json.get("PR", {"state": "open"}).get("state", "") == "open"
):
logger.debug("not yet built: %s", node)
blocking.append((node, m_pred_json.get("PR", {})))

return blocking

def order(
self,
graph: nx.DiGraph,
@@ -910,44 +973,12 @@ def all_predecessors_issued(self, attrs: "AttrsTypedDict") -> bool:
return True

def predecessors_not_yet_built(self, attrs: "AttrsTypedDict") -> bool:
# Check if all upstreams have been built
if self.graph is None:
raise ValueError("graph is None")
for node, payload in _gen_active_feedstocks_payloads(
self.graph.predecessors(attrs["feedstock_name"]),
self.graph,
):
if node in self.ignored_deps_per_node.get(
attrs.get("feedstock_name", None),
[],
):
continue

muid = frozen_to_json_friendly(self.migrator_uid(payload))

if muid not in _sanitized_muids(
payload.get("pr_info", {}).get("PRed", []),
):
logger.debug("not yet built: %s", node)
return True

            # This handles PRed json loss from a past bad graph deploy outage
for m_pred_json in payload.get("pr_info", {}).get("PRed", []):
if m_pred_json["data"] == muid["data"]:
break
else:
m_pred_json = None

# note that if the bot is missing the PR we assume it is open
# so that errors halt the migration and can be fixed
if (
m_pred_json
and m_pred_json.get("PR", {"state": "open"}).get("state", "") == "open"
):
logger.debug("not yet built: %s", node)
return True

return False
"""Check if any predecessors are blocking this package.

Returns True if any predecessor is blocking, False otherwise.
This method uses get_blocking_predecessors to determine blocking status.
"""
return len(self.get_blocking_predecessors(attrs)) > 0

def filter_not_in_migration(self, attrs, not_bad_str_start=""):
if super().filter_not_in_migration(attrs, not_bad_str_start):
166 changes: 165 additions & 1 deletion conda_forge_tick/status_report.py
@@ -24,6 +24,7 @@
ArchRebuild,
GraphMigrator,
MatplotlibBase,
MigrationYaml,
MigrationYamlCreator,
Migrator,
OSXArm,
@@ -52,6 +53,21 @@
"unstable",
]

# Cache for migrators by name, loaded on demand
_migrators_by_name_cache: Dict[str, Migrator] | None = None


def _get_migrators_by_name() -> Dict[str, Migrator]:
"""Get mapping of migrator report_name to migrator instance.

Uses a module-level cache to avoid reloading migrators multiple times.
"""
global _migrators_by_name_cache
if _migrators_by_name_cache is None:
migrators = load_migrators(skip_paused=False)
_migrators_by_name_cache = {m.report_name: m for m in migrators}
return _migrators_by_name_cache


def _sorted_set_json(obj: Any) -> Any:
"""If obj is a set, return sorted(obj). Else, raise TypeError.
@@ -164,6 +180,37 @@ def write_version_migrator_status(migrator, mctx):
)


def _get_waiting_migrators(migrator: Migrator, attrs: dict) -> list[str]:
    """Get the list of migrators that this package is still waiting for.

    Returns the full ``wait_for_migrators`` list if any required migration is
    missing from the package's PRed entries or has a PR that is not yet
    closed; returns an empty list otherwise.
    """
if not isinstance(migrator, MigrationYaml):
return []

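    # The ``__migrator`` section of the migration yaml can declare other
    # migrations that must finish before this one proceeds.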
migrator_payload = migrator.loaded_yaml.get("__migrator", {})
wait_for_migrators = migrator_payload.get("wait_for_migrators", [])

if not wait_for_migrators:
return []

found_migrators = set()
for migration in attrs.get("pr_info", {}).get("PRed", []):
name = migration.get("data", {}).get("name", "")
if not name or name not in wait_for_migrators:
continue
found_migrators.add(name)
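        # A required migration whose PR is not yet closed means the package is
        # still waiting on every migrator in the list.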
state = migration.get("PR", {}).get("state", "")
if state != "closed":
return list(wait_for_migrators)

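    # Required migrations with no PRed entry at all also mean we are waiting.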
missing_migrators = set(wait_for_migrators) - found_migrators
if missing_migrators:
return list(wait_for_migrators)

return []


def graph_migrator_status(
migrator: Migrator,
gx: nx.DiGraph,
@@ -200,6 +247,7 @@

for node, node_attrs in gx2.nodes.items():
attrs = node_attrs["payload"]

# remove archived from status
if attrs.get("archived", False):
continue
@@ -344,20 +392,117 @@
)
node_metadata["pr_status"] = pr_json["PR"].get("mergeable_state", "")

# Collect waiting migrators info for creating fake nodes
waiting_migrators_map: Dict[str, list[str]] = {}
for node, node_attrs in gx2.nodes.items():
attrs = node_attrs["payload"]

# remove archived from status
if attrs.get("archived", False):
continue

# Check if waiting for other migrators
waiting_migrators = _get_waiting_migrators(migrator, attrs)
if waiting_migrators:
waiting_migrators_map[node] = waiting_migrators

fake_migrator_nodes: Dict[str, Dict] = {}

    for node, migrator_names in waiting_migrators_map.items():
        node_attrs = gx2.nodes[node]
        attrs = node_attrs["payload"]
        migrators_by_name = _get_migrators_by_name()

        for migrator_name in migrator_names:
            if migrator_name not in migrators_by_name:
                continue
            waiting_migrator = migrators_by_name[migrator_name]

nuid = waiting_migrator.migrator_uid(attrs)
nuid_data = frozen_to_json_friendly(nuid)["data"]

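            # Collect every PRed entry whose uid matches this waiting
            # migrator; each match becomes a fake parent node below.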
matching_prs = []
for pr_json in attrs.get("pr_info", {}).get("PRed", []):
if pr_json and pr_json.get("data") == nuid_data:
matching_prs.append(pr_json)

if not matching_prs:
continue

for matching_pr_json in matching_prs:
pr_data = matching_pr_json.get("PR", {})
pr_number = pr_data.get("number")
if pr_number is None:
continue

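                # Synthetic node representing the blocking migration PR; the
                # name encodes the migrator, the feedstock, and the PR number.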
fake_parent = f"migrator_{migrator_name}_{node}_{pr_number}"

if fake_parent not in gx2.nodes():
gx2.add_node(fake_parent, payload={})

pr_url = pr_data.get("html_url", "")
pr_status = pr_data.get("state", "")

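                # Fall back to the canonical feedstock PR URL when the stored
                # PR data lacks an html_url.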
if not pr_url:
feedstock_name = attrs.get("feedstock_name", node)
pr_url = f"https://github.com/conda-forge/{feedstock_name}-feedstock/pull/{pr_number}"

fake_migrator_nodes[fake_parent] = {
"pre_pr_migrator_status": "",
"pr_url": pr_url,
"pr_status": pr_status,
}
feedstock_metadata[fake_parent] = fake_migrator_nodes[fake_parent]

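                # A closed PR should not be blocking; report it as a bot error
                # and wire up the real blocking predecessors for visibility.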
if pr_status == "closed":
out["bot-error"].add(fake_parent)
print(
f"Package '{node}' waiting for migrator '{migrator_name}' but PR #{pr_number} is already closed. "
f"Waiting logic may be incorrect.",
flush=True,
)
blocking_preds = waiting_migrator.get_blocking_predecessors(attrs, gx2)
for pred_node, _ in blocking_preds:
if pred_node in gx2.nodes() and not gx2.has_edge(
pred_node, fake_parent
):
gx2.add_edge(pred_node, fake_parent)
else:
out["in-pr"].add(fake_parent)

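                # Link the fake parent to the waiting package so the status
                # graph shows the package as blocked by the migration PR.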
if node in gx2.nodes() and not gx2.has_edge(fake_parent, node):
gx2.add_edge(fake_parent, node)

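    # Fill in graph-derived metadata for the fake nodes, mirroring the fields
    # computed for real feedstock nodes above.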
for node_name, node_metadata in fake_migrator_nodes.items():
node_metadata["num_descendants"] = len(nx.descendants(gx2, node_name))
node_metadata["immediate_children"] = [
k
for k in sorted(gx2.successors(node_name))
            if not gx2.nodes[k].get("payload", {}).get("archived", False)
]

out2: Dict = {}
for k in out.keys():
# Include all items, even if not in build_sequence (like fake migrator nodes)
        out2[k] = list(
            sorted(
                out[k],
                key=lambda x: (
                    build_sequence.index(x)
                    if x in build_sequence
                    else len(build_sequence),
                    x,
                ),
            ),
        )

out2["_feedstock_status"] = feedstock_metadata

for (e0, e1), edge_attrs in gx2.edges.items():
# Skip edges involving fake migrator nodes - handle separately
if e0.startswith("migrator_") or e1.startswith("migrator_"):
continue

if (
e0 not in out["done"]
and e1 not in out["done"]
@@ -366,6 +511,25 @@
):
gv.edge(e0, e1)

# Add nodes and edges for fake migrator parents in visualization
# (Metadata and awaiting-parents status already added above, before sorting)
for node_name in gx2.nodes():
if node_name.startswith("migrator_"):
            migrator_display_name = node_name.removeprefix("migrator_")

# Style migrator nodes differently (awaiting-parents color is #fde725)
gv.node(
node_name,
label=_clean_text(migrator_display_name),
fillcolor="#fde725", # Same color as awaiting-parents
style="filled,dashed",
fontcolor="black",
)
# Add edges from migrator to waiting packages with dashed style
for successor in gx2.successors(node_name):
if successor not in out["done"]:
gv.edge(node_name, successor, style="dashed", color="orange")

print(" len(gv):", num_viz, flush=True)
out2["_num_viz"] = num_viz
