66import time
77import uuid
88import pickle
9- import fcntl
109import tempfile
1110from typing import Any , Dict , Optional , Set
1211
12+ from filelock import FileLock
13+
1314from .rp_logger import RunPodLogger
1415
1516
@@ -95,25 +96,23 @@ def _load_state(self):
9596 os .path .exists (self ._STATE_FILE )
9697 and os .path .getsize (self ._STATE_FILE ) > 0
9798 ):
98- with open (self ._STATE_FILE , "rb" ) as f :
99- fcntl .flock (f , fcntl .LOCK_SH )
100- try :
101- loaded_jobs = pickle .load (f )
102- # Clear current state and add loaded jobs
103- super ().clear ()
104- for job in loaded_jobs :
105- set .add (
106- self , job
107- ) # Use set.add to avoid triggering _save_state
108-
109- except (EOFError , pickle .UnpicklingError ):
110- # Handle empty or corrupted file
111- log .debug (
112- "JobsProgress: Failed to load state file, starting with empty state"
113- )
114- pass
115- finally :
116- fcntl .flock (f , fcntl .LOCK_UN )
99+ with FileLock (self ._STATE_FILE + '.lock' ):
100+ with open (self ._STATE_FILE , "rb" ) as f :
101+ try :
102+ loaded_jobs = pickle .load (f )
103+ # Clear current state and add loaded jobs
104+ super ().clear ()
105+ for job in loaded_jobs :
106+ set .add (
107+ self , job
108+ ) # Use set.add to avoid triggering _save_state
109+
110+ except (EOFError , pickle .UnpicklingError ):
111+ # Handle empty or corrupted file
112+ log .debug (
113+ "JobsProgress: Failed to load state file, starting with empty state"
114+ )
115+ pass
117116
118117 except FileNotFoundError :
119118 log .debug ("JobsProgress: No state file found, starting with empty state" )
@@ -123,17 +122,14 @@ def _save_state(self):
123122 """Save jobs state to pickle file with atomic write and file locking."""
124123 try :
125124 # Use temporary file for atomic write
126- with tempfile .NamedTemporaryFile (
127- dir = self ._STATE_DIR , delete = False , mode = "wb"
128- ) as temp_f :
129- fcntl .flock (temp_f , fcntl .LOCK_EX )
130- try :
125+ with FileLock (self ._STATE_FILE + '.lock' ):
126+ with tempfile .NamedTemporaryFile (
127+ dir = self ._STATE_DIR , delete = False , mode = "wb"
128+ ) as temp_f :
131129 pickle .dump (set (self ), temp_f )
132- finally :
133- fcntl .flock (temp_f , fcntl .LOCK_UN )
134-
135- # Atomically replace the state file
136- os .replace (temp_f .name , self ._STATE_FILE )
130+
131+ # Atomically replace the state file
132+ os .replace (temp_f .name , self ._STATE_FILE )
137133 except Exception as e :
138134 log .error (f"Failed to save job state: { e } " )
139135
0 commit comments