From d565251db01f2b369404d4075d66898022bae802 Mon Sep 17 00:00:00 2001 From: Egor Timofeev Date: Thu, 8 May 2025 13:52:26 +0200 Subject: [PATCH 1/7] Use /checkpoints instead of events parsing --- pyproject.toml | 2 +- src/together/resources/finetune.py | 101 ++++++++++++++++++++++++++++- 2 files changed, 99 insertions(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 94226c18..2b5ccafd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,7 @@ build-backend = "poetry.masonry.api" [tool.poetry] name = "together" -version = "1.5.7" +version = "1.5.8" authors = ["Together AI "] description = "Python client for Together's Cloud Platform!" readme = "README.md" diff --git a/src/together/resources/finetune.py b/src/together/resources/finetune.py index 275d6839..89f97bb6 100644 --- a/src/together/resources/finetune.py +++ b/src/together/resources/finetune.py @@ -2,7 +2,7 @@ import re from pathlib import Path -from typing import List, Literal +from typing import Dict, List, Literal from rich import print as rprint @@ -222,6 +222,49 @@ def create_finetune_request( return finetune_request +def _parse_raw_checkpoints( + checkpoints: List[Dict[str, str]], id: str +) -> List[FinetuneCheckpoint]: + """ + Helper function to process raw checkpoints and create checkpoint list. + + Args: + checkpoints (List[Dict[str, str]]): List of raw checkpoints metadata + id (str): Fine-tune job ID + + Returns: + List[FinetuneCheckpoint]: List of available checkpoints + """ + had_adapters = any(ckpt["path"].endswith("_adapter") for ckpt in checkpoints) + + parsed_checkpoints = [] + for checkpoint in checkpoints: + checkpoint_path = checkpoint["path"] + step = checkpoint["step"] + + is_final = int(step) == 0 + checkpoint_name = f"{id}:step" if step else id + + if is_final: + if checkpoint_path.endswith("_adapter"): + checkpoint_type = "Final Adapter" + else: + checkpoint_type = "Final Merged" if had_adapters else "Final" + else: + checkpoint_type = "Intermediate" + + parsed_checkpoints.append( + FinetuneCheckpoint( + type=checkpoint_type, + timestamp=checkpoint["created_at"], + name=checkpoint_name, + ) + ) + + parsed_checkpoints.sort(key=lambda x: x.timestamp, reverse=True) + return parsed_checkpoints + + def _process_checkpoints_from_events( events: List[FinetuneEvent], id: str ) -> List[FinetuneCheckpoint]: @@ -551,7 +594,7 @@ def list_events(self, id: str) -> FinetuneListEvents: return FinetuneListEvents(**response.data) - def list_checkpoints(self, id: str) -> List[FinetuneCheckpoint]: + def list_checkpoints_from_events(self, id: str) -> List[FinetuneCheckpoint]: """ List available checkpoints for a fine-tuning job @@ -564,6 +607,32 @@ def list_checkpoints(self, id: str) -> List[FinetuneCheckpoint]: events = self.list_events(id).data or [] return _process_checkpoints_from_events(events, id) + def list_checkpoints(self, id: str) -> List[FinetuneCheckpoint]: + """ + List available checkpoints for a fine-tuning job + + Args: + id (str): Unique identifier of the fine-tune job to list checkpoints for + + Returns: + List[FinetuneCheckpoint]: List of available checkpoints + """ + requestor = api_requestor.APIRequestor( + client=self._client, + ) + + response, _, _ = requestor.request( + options=TogetherRequest( + method="GET", + url=f"fine-tunes/{id}/checkpoints", + ), + stream=False, + ) + assert isinstance(response, TogetherResponse) + + raw_checkpoints = response.data["data"] + return _parse_raw_checkpoints(raw_checkpoints, id) + def download( self, id: str, @@ -942,7 +1011,7 @@ async def list_events(self, id: str) -> FinetuneListEvents: return events_list - async def list_checkpoints(self, id: str) -> List[FinetuneCheckpoint]: + async def list_checkpoints_from_events(self, id: str) -> List[FinetuneCheckpoint]: """ List available checkpoints for a fine-tuning job @@ -956,6 +1025,32 @@ async def list_checkpoints(self, id: str) -> List[FinetuneCheckpoint]: events = events_list.data or [] return _process_checkpoints_from_events(events, id) + async def list_checkpoints(self, id: str) -> List[FinetuneCheckpoint]: + """ + List available checkpoints for a fine-tuning job + + Args: + id (str): Unique identifier of the fine-tune job to list checkpoints for + + Returns: + List[FinetuneCheckpoint]: List of available checkpoints + """ + requestor = api_requestor.APIRequestor( + client=self._client, + ) + + response, _, _ = await requestor.arequest( + options=TogetherRequest( + method="GET", + url=f"fine-tunes/{id}/checkpoints", + ), + stream=False, + ) + assert isinstance(response, TogetherResponse) + + raw_checkpoints = response.data["data"] + return _parse_raw_checkpoints(raw_checkpoints, id) + async def download( self, id: str, *, output: str | None = None, checkpoint_step: int = -1 ) -> str: From 6889b537aa75dbfeede12bc3dcb0b4b5cf552f1e Mon Sep 17 00:00:00 2001 From: Egor Timofeev Date: Thu, 8 May 2025 16:05:56 +0200 Subject: [PATCH 2/7] Fix events listing for async client --- src/together/resources/finetune.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/together/resources/finetune.py b/src/together/resources/finetune.py index 89f97bb6..fc6b24f7 100644 --- a/src/together/resources/finetune.py +++ b/src/together/resources/finetune.py @@ -1005,11 +1005,9 @@ async def list_events(self, id: str) -> FinetuneListEvents: ), stream=False, ) + assert isinstance(events_response, TogetherResponse) - # FIXME: API returns "data" field with no object type (should be "list") - events_list = FinetuneListEvents(object="list", **events_response.data) - - return events_list + return FinetuneListEvents(**events_response.data) async def list_checkpoints_from_events(self, id: str) -> List[FinetuneCheckpoint]: """ From 583a65b9f2cc8a60d2139871328b24112e776ca6 Mon Sep 17 00:00:00 2001 From: Egor Timofeev Date: Fri, 9 May 2025 17:14:45 +0200 Subject: [PATCH 3/7] Change the parsing logic due to the api changes --- src/together/resources/finetune.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/together/resources/finetune.py b/src/together/resources/finetune.py index fc6b24f7..0b750919 100644 --- a/src/together/resources/finetune.py +++ b/src/together/resources/finetune.py @@ -235,23 +235,22 @@ def _parse_raw_checkpoints( Returns: List[FinetuneCheckpoint]: List of available checkpoints """ - had_adapters = any(ckpt["path"].endswith("_adapter") for ckpt in checkpoints) + has_adapters = any(ckpt["is_adapter"] for ckpt in checkpoints) parsed_checkpoints = [] for checkpoint in checkpoints: - checkpoint_path = checkpoint["path"] step = checkpoint["step"] - - is_final = int(step) == 0 - checkpoint_name = f"{id}:step" if step else id + is_adapter = checkpoint["is_adapter"] + is_final = checkpoint["is_final"] + checkpoint_name = f"{id}:{step}" if not is_final else id if is_final: - if checkpoint_path.endswith("_adapter"): + if is_adapter: checkpoint_type = "Final Adapter" else: - checkpoint_type = "Final Merged" if had_adapters else "Final" + checkpoint_type = "Final Merged" if has_adapters else "Final" else: - checkpoint_type = "Intermediate" + checkpoint_type = "Intermediate (step {step})" parsed_checkpoints.append( FinetuneCheckpoint( From e52ed3892602eb32c0d9f9a6143ef44fd27283a5 Mon Sep 17 00:00:00 2001 From: Egor Timofeev Date: Fri, 9 May 2025 17:18:16 +0200 Subject: [PATCH 4/7] Fix string formatting --- src/together/resources/finetune.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/together/resources/finetune.py b/src/together/resources/finetune.py index 0b750919..a84f4231 100644 --- a/src/together/resources/finetune.py +++ b/src/together/resources/finetune.py @@ -250,7 +250,7 @@ def _parse_raw_checkpoints( else: checkpoint_type = "Final Merged" if has_adapters else "Final" else: - checkpoint_type = "Intermediate (step {step})" + checkpoint_type = f"Intermediate (step {step})" parsed_checkpoints.append( FinetuneCheckpoint( From de821b257ff213c7542193082d0e2787e9489d76 Mon Sep 17 00:00:00 2001 From: Egor Timofeev Date: Fri, 9 May 2025 17:57:54 +0200 Subject: [PATCH 5/7] Parsing updated --- src/together/resources/finetune.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/src/together/resources/finetune.py b/src/together/resources/finetune.py index a84f4231..1f3d44cf 100644 --- a/src/together/resources/finetune.py +++ b/src/together/resources/finetune.py @@ -235,22 +235,12 @@ def _parse_raw_checkpoints( Returns: List[FinetuneCheckpoint]: List of available checkpoints """ - has_adapters = any(ckpt["is_adapter"] for ckpt in checkpoints) parsed_checkpoints = [] for checkpoint in checkpoints: step = checkpoint["step"] - is_adapter = checkpoint["is_adapter"] - is_final = checkpoint["is_final"] - checkpoint_name = f"{id}:{step}" if not is_final else id - - if is_final: - if is_adapter: - checkpoint_type = "Final Adapter" - else: - checkpoint_type = "Final Merged" if has_adapters else "Final" - else: - checkpoint_type = f"Intermediate (step {step})" + checkpoint_type = checkpoint["checkpoint_type"] + checkpoint_name = f"{id}:{step}" if "intermediate" in checkpoint_type.lower() else id parsed_checkpoints.append( FinetuneCheckpoint( From 08d062dad9fb8bde830c6e1c2306cc1d17a3ebc8 Mon Sep 17 00:00:00 2001 From: Egor Timofeev Date: Fri, 9 May 2025 18:00:12 +0200 Subject: [PATCH 6/7] Formatting --- src/together/resources/finetune.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/together/resources/finetune.py b/src/together/resources/finetune.py index 1f3d44cf..b7752445 100644 --- a/src/together/resources/finetune.py +++ b/src/together/resources/finetune.py @@ -240,7 +240,9 @@ def _parse_raw_checkpoints( for checkpoint in checkpoints: step = checkpoint["step"] checkpoint_type = checkpoint["checkpoint_type"] - checkpoint_name = f"{id}:{step}" if "intermediate" in checkpoint_type.lower() else id + checkpoint_name = ( + f"{id}:{step}" if "intermediate" in checkpoint_type.lower() else id + ) parsed_checkpoints.append( FinetuneCheckpoint( From e32143044e729d482528a4c5e53470f20d6839a8 Mon Sep 17 00:00:00 2001 From: Egor Timofeev Date: Fri, 9 May 2025 18:16:30 +0200 Subject: [PATCH 7/7] Remove old implementation --- src/together/resources/finetune.py | 103 +---------------------------- 1 file changed, 2 insertions(+), 101 deletions(-) diff --git a/src/together/resources/finetune.py b/src/together/resources/finetune.py index b7752445..8d0bf97e 100644 --- a/src/together/resources/finetune.py +++ b/src/together/resources/finetune.py @@ -30,16 +30,8 @@ TrainingMethodSFT, TrainingType, ) -from together.types.finetune import ( - DownloadCheckpointType, - FinetuneEvent, - FinetuneEventType, -) -from together.utils import ( - get_event_step, - log_warn_once, - normalize_key, -) +from together.types.finetune import DownloadCheckpointType +from together.utils import log_warn_once, normalize_key _FT_JOB_WITH_STEP_REGEX = r"^ft-[\dabcdef-]+:\d+$" @@ -256,70 +248,6 @@ def _parse_raw_checkpoints( return parsed_checkpoints -def _process_checkpoints_from_events( - events: List[FinetuneEvent], id: str -) -> List[FinetuneCheckpoint]: - """ - Helper function to process events and create checkpoint list. - - Args: - events (List[FinetuneEvent]): List of fine-tune events to process - id (str): Fine-tune job ID - - Returns: - List[FinetuneCheckpoint]: List of available checkpoints - """ - checkpoints: List[FinetuneCheckpoint] = [] - - for event in events: - event_type = event.type - - if event_type == FinetuneEventType.CHECKPOINT_SAVE: - step = get_event_step(event) - checkpoint_name = f"{id}:{step}" if step is not None else id - - checkpoints.append( - FinetuneCheckpoint( - type=( - f"Intermediate (step {step})" - if step is not None - else "Intermediate" - ), - timestamp=event.created_at, - name=checkpoint_name, - ) - ) - elif event_type == FinetuneEventType.JOB_COMPLETE: - if hasattr(event, "model_path"): - checkpoints.append( - FinetuneCheckpoint( - type=( - "Final Merged" - if hasattr(event, "adapter_path") - else "Final" - ), - timestamp=event.created_at, - name=id, - ) - ) - - if hasattr(event, "adapter_path"): - checkpoints.append( - FinetuneCheckpoint( - type=( - "Final Adapter" if hasattr(event, "model_path") else "Final" - ), - timestamp=event.created_at, - name=id, - ) - ) - - # Sort by timestamp (newest first) - checkpoints.sort(key=lambda x: x.timestamp, reverse=True) - - return checkpoints - - class FineTuning: def __init__(self, client: TogetherClient) -> None: self._client = client @@ -585,19 +513,6 @@ def list_events(self, id: str) -> FinetuneListEvents: return FinetuneListEvents(**response.data) - def list_checkpoints_from_events(self, id: str) -> List[FinetuneCheckpoint]: - """ - List available checkpoints for a fine-tuning job - - Args: - id (str): Unique identifier of the fine-tune job to list checkpoints for - - Returns: - List[FinetuneCheckpoint]: List of available checkpoints - """ - events = self.list_events(id).data or [] - return _process_checkpoints_from_events(events, id) - def list_checkpoints(self, id: str) -> List[FinetuneCheckpoint]: """ List available checkpoints for a fine-tuning job @@ -1000,20 +915,6 @@ async def list_events(self, id: str) -> FinetuneListEvents: return FinetuneListEvents(**events_response.data) - async def list_checkpoints_from_events(self, id: str) -> List[FinetuneCheckpoint]: - """ - List available checkpoints for a fine-tuning job - - Args: - id (str): Unique identifier of the fine-tune job to list checkpoints for - - Returns: - List[FinetuneCheckpoint]: Object containing list of available checkpoints - """ - events_list = await self.list_events(id) - events = events_list.data or [] - return _process_checkpoints_from_events(events, id) - async def list_checkpoints(self, id: str) -> List[FinetuneCheckpoint]: """ List available checkpoints for a fine-tuning job