|
16 | 16 |
|
17 | 17 | log = RunPodLogger() |
18 | 18 |
|
| 19 | +# _runpod_sls_get_jobs status codes |
| 20 | +STILL_WAITING = 0 |
| 21 | +OK = 1 |
| 22 | +ERROR_FROM_SERVER = 2 |
| 23 | +ERROR_BUFFER_TOO_SMALL = 3 |
19 | 24 |
|
20 | 25 | class CGetJobResult(ctypes.Structure): # pylint: disable=too-few-public-methods |
21 | 26 | """ |
@@ -132,25 +137,36 @@ def _json_serialize_job_data(self, job_data: Any) -> bytes: |
132 | 137 |
|
def get_jobs(self, max_concurrency: int, max_jobs: int) -> List[Dict[str, Any]]:
    """Get a job or jobs from the queue.

    Calls the native ``_get_jobs`` binding with a scratch buffer and
    interprets the returned status code.

    Args:
        max_concurrency: Passed through to the native call as a C int.
        max_jobs: Passed through to the native call as a C int.

    Returns:
        The decoded JSON payload as a list, or an empty list when the
        native call reports that it is still waiting for jobs.

    Raises:
        RuntimeError: If the native call reports a server error, a buffer
            that is too small, or an unrecognised status code.
    """
    buffer_size = 1024 * 1024 * 20  # 20MB buffer to store jobs in
    buf = ctypes.create_string_buffer(buffer_size)
    res: CGetJobResult = self._get_jobs(
        c_int(max_concurrency),
        c_int(max_jobs),
        byref(buf),
        # Reuse the known size; len(buf.raw) would copy the whole buffer
        # into a bytes object just to measure it.
        c_int(buffer_size),
    )
    n = res.res_len
    code = res.status_code

    if code == STILL_WAITING:
        return []  # still waiting for jobs

    if code == OK:  # success! the payload is stored in bytes 0..res_len of buf
        log.trace(f"decoding {n} bytes of JSON")
        # Slicing the ctypes array yields only the first n bytes, whereas
        # buf.raw[:n] materialises all 20MB before slicing.
        return list(json.loads(buf[:n].decode("utf-8")))

    # Failures are raised as RuntimeError (not ValueError): that is what this
    # method raised previously and what the polling loop's
    # `except RuntimeError` handler expects, so errors stay retryable.
    if code == ERROR_FROM_SERVER:
        try:
            detail = buf[:n].decode("utf-8")
        except UnicodeDecodeError:
            detail = "<failed to decode buffer>"
        if detail == "":
            detail = "<unknown error or buffer too small>"
        raise RuntimeError(
            f"_runpod_sls_get_jobs: status code 2: error from server: {detail}"
        )

    if code == ERROR_BUFFER_TOO_SMALL:
        raise RuntimeError("_runpod_sls_get_jobs: status code 3: buffer too small")

    raise RuntimeError(f"_runpod_sls_get_jobs: unknown status code {code}")
154 | 170 |
|
155 | 171 | def progress_update(self, job_id: str, json_data: bytes) -> bool: |
156 | 172 | """ |
@@ -263,9 +279,13 @@ async def run(config: Dict[str, Any]) -> None: |
263 | 279 | max_jobs = config.get("max_jobs", 1) |
264 | 280 |
|
265 | 281 | serverless_hook = Hook() |
266 | | - |
267 | 282 | while True: |
268 | | - jobs = serverless_hook.get_jobs(max_concurrency, max_jobs) |
| 283 | + try: |
| 284 | + jobs = serverless_hook.get_jobs(max_concurrency, max_jobs) |
| 285 | + except RuntimeError as err: |
| 286 | + log.error(f"SLS Core | Error getting jobs: {err}") |
| 287 | + await asyncio.sleep(0.2) # sleep for a bit before trying again |
| 288 | + continue |
269 | 289 |
|
270 | 290 | if len(jobs) == 0 or jobs is None: |
271 | 291 | await asyncio.sleep(0) |
|
0 commit comments