From 3462b5fd79f8b5d8266d750501c037990861bd68 Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Fri, 23 Jan 2026 10:47:33 +0100 Subject: [PATCH 01/12] first version implemented --- ipyparallel/client/remotefunction.py | 2 ++ ipyparallel/client/view.py | 12 ++++++-- ipyparallel/controller/dictdb.py | 16 ++++++++++ ipyparallel/controller/hub.py | 37 ++++++++++++++++++++++++ ipyparallel/controller/task_scheduler.py | 8 +++++ ipyparallel/engine/kernel.py | 1 + 6 files changed, 73 insertions(+), 3 deletions(-) diff --git a/ipyparallel/client/remotefunction.py b/ipyparallel/client/remotefunction.py index 5b7a93509..5dd0573d4 100644 --- a/ipyparallel/client/remotefunction.py +++ b/ipyparallel/client/remotefunction.py @@ -195,12 +195,14 @@ def __init__( chunksize=None, ordered=True, return_exceptions=False, + task_label=None, **flags, ): super().__init__(view, f, block=block, **flags) self.chunksize = chunksize self.ordered = ordered self.return_exceptions = return_exceptions + self.task_label = task_label mapClass = Map.dists[dist] self.mapObject = mapClass() diff --git a/ipyparallel/client/view.py b/ipyparallel/client/view.py index 53712c392..f4b25d9b3 100644 --- a/ipyparallel/client/view.py +++ b/ipyparallel/client/view.py @@ -97,6 +97,7 @@ class View(HasTraits): # flags block = Bool(False) track = Bool(False) + task_label = Any() targets = Any() history = List() @@ -592,7 +593,7 @@ def _really_apply( return ar @sync_results - def map(self, f, *sequences, block=None, track=False, return_exceptions=False): + def map(self, f, *sequences, block=None, track=False, return_exceptions=False, task_label=None): """Parallel version of builtin `map`, using this View's `targets`. There will be one task per target, so work will be chunked @@ -1036,7 +1037,7 @@ def _broadcast_map(f, *sequence_names): return list(map(f, *sequences)) @_not_coalescing - def map(self, f, *sequences, block=None, track=False, return_exceptions=False): + def map(self, f, *sequences, block=None, track=False, return_exceptions=False, task_label=None): """Parallel version of builtin `map`, using this View's `targets`. There will be one task per engine, so work will be chunked @@ -1360,6 +1361,8 @@ def _really_apply( metadata = dict( after=after, follow=follow, timeout=timeout, targets=idents, retries=retries ) + if self.task_label: + metadata["task_label"] = self.task_label future = self.client.send_apply_request( self._socket, f, args, kwargs, track=track, metadata=metadata @@ -1389,6 +1392,7 @@ def map( chunksize=1, ordered=True, return_exceptions=False, + task_label=None, ): """Parallel version of builtin `map`, load-balanced by this View. @@ -1433,9 +1437,10 @@ def map( # default if block is None: block = self.block - assert len(sequences) > 0, "must have some sequences to map onto!" 
+ self.task_label = task_label # just for testing + pf = ParallelFunction( self, f, @@ -1443,6 +1448,7 @@ def map( chunksize=chunksize, ordered=ordered, return_exceptions=return_exceptions, + task_label=task_label, ) return pf.map(*sequences) diff --git a/ipyparallel/controller/dictdb.py b/ipyparallel/controller/dictdb.py index 4091e43af..6f81bbc09 100644 --- a/ipyparallel/controller/dictdb.py +++ b/ipyparallel/controller/dictdb.py @@ -61,6 +61,20 @@ '$exists': lambda a, b: (b and a is not None) or (a is None and not b), } +def _debug_output(where, msg, stack=False): + import inspect, traceback + with open("d:/dictdb_log.txt", "a") as f: + f.write(f"{where} [{datetime.now()}]: {msg['msg_id']}\n") + f.write(f"\tmsg={msg}\n") + #if.write(f"has metadata={'metadata' in msg}\n") + #if 'metadata' in msg: + # f.write(f"{msg['metadata']}\n") + f.write(f"has result_metadata={'result_metadata' in msg}\n") + if 'result_metadata' in msg: + f.write(f"{msg['result_metadata']}\n") + if stack: + for line in traceback.format_stack(): + f.write(line.strip()+"\n") def _add_tz(obj): if isinstance(obj, datetime): @@ -240,6 +254,7 @@ def add_record(self, msg_id, rec): """Add a new Task Record, by msg_id.""" if msg_id in self._records: raise KeyError(f"Already have msg_id {msg_id!r}") + #_debug_output("add_record", rec, False) self._check_dates(rec) self._records[msg_id] = rec self._add_bytes(rec) @@ -257,6 +272,7 @@ def update_record(self, msg_id, rec): """Update the data in an existing record.""" if msg_id in self._culled_ids: raise KeyError(f"Record {msg_id!r} has been culled for size") + #_debug_output("update_record", rec) self._check_dates(rec) _rec = self._records[msg_id] self._drop_bytes(_rec) diff --git a/ipyparallel/controller/hub.py b/ipyparallel/controller/hub.py index b166fbce2..23b14d7b3 100644 --- a/ipyparallel/controller/hub.py +++ b/ipyparallel/controller/hub.py @@ -50,6 +50,20 @@ def _printer(*args, **kwargs): print(kwargs) +def _debug_output(where, msg): + with open("d:/hub_log.txt", "a") as f: + f.write(f"{where} [{datetime.now()}]: ") + if 'msg_id' in msg: + f.write(f"{msg['msg_id']}\n") + else: + f.write(f"{msg}\n\n\n") + return + f.write(f"has metadata={'metadata' in msg}\n") + if 'metadata' in msg: + f.write(f"{msg['metadata']}\n") + f.write(f"{msg}\n\n\n") + + def empty_record(): """Return an empty dict with all record keys.""" return { @@ -75,6 +89,7 @@ def empty_record(): 'error': None, 'stdout': '', 'stderr': '', + 'task_label': None, } @@ -111,6 +126,7 @@ def init_record(msg): 'error': None, 'stdout': '', 'stderr': '', + 'task_label': msg['metadata'].get('task_label', None), } @@ -330,6 +346,9 @@ def dispatch_monitor_traffic(self, msg): return handler = self.monitor_handlers.get(switch, None) if handler is not None: + #if switch == b'intask': + # _debug_output(f"dispatch_monitor_traffic ({switch} - {handler})", msg) + handler(idents, msg) else: self.log.error("Unrecognized monitor topic: %r", switch) @@ -470,6 +489,8 @@ def save_queue_request(self, idents, msg): record['client_uuid'] = msg['header']['session'] record['queue'] = 'mux' + #_debug_output('save_queue_request', msg) + try: # it's posible iopub arrived first: existing = self.db.get_record(msg_id) @@ -516,6 +537,8 @@ def save_queue_result(self, idents, msg): ) return + #_debug_output('save_queue_result', msg) + eid = self.by_ident.get(queue_id, None) if eid is None: self.log.error("queue::unknown engine %r is sending a reply: ", queue_id) @@ -576,6 +599,8 @@ def save_broadcast_request(self, idents, msg): msg_id = 
header['msg_id'] self.pending.add(msg_id) + #_debug_output('save_broadcast_request',msg) + try: self.db.add_record(msg_id, record) except Exception as e: @@ -602,6 +627,8 @@ def save_broadcast_result(self, idents, msg): eid = self.by_ident.get(engine_uuid.encode("utf8"), None) status = md.get('status', None) + #_debug_output('save_broadcast_result', msg) + if msg_id in self.pending: self.log.info(f'broadcast:: broadcast {msg_id} finished on {eid}') self.pending.remove(msg_id) @@ -649,6 +676,8 @@ def save_task_request(self, idents, msg): return record = init_record(msg) + #_debug_output('save_task_request', msg) + record['client_uuid'] = msg['header']['session'] record['queue'] = 'task' header = msg['header'] @@ -708,6 +737,8 @@ def save_task_result(self, idents, msg): ) return + #_debug_output("save_task_result", msg) + parent = msg['parent_header'] if not parent: # print msg @@ -761,6 +792,8 @@ def save_task_destination(self, idents, msg): except Exception: self.log.error("task::invalid task tracking message", exc_info=True) return + #_debug_output("save_task_destination", msg) + content = msg['content'] # print (content) msg_id = content['msg_id'] @@ -788,6 +821,7 @@ def monitor_iopub_message(self, topics, msg): except Exception: self.log.error("iopub::invalid IOPub message", exc_info=True) return + #_debug_output("monitor_iopub_message", msg) msg_type = msg['header']['msg_type'] if msg_type == 'shutdown_reply': @@ -817,6 +851,8 @@ def save_iopub_message(self, topics, msg): msg_id = parent['msg_id'] msg_type = msg['header']['msg_type'] content = msg['content'] + #_debug_output("save_iopub_message", msg) + #_debug_output("save_iopub_message[parent]", parent) # ensure msg_id is in db try: @@ -871,6 +907,7 @@ def connection_request(self, client_id, msg): for eid, ec in self.engines.items(): jsonable[str(eid)] = ec.uuid content['engines'] = jsonable + #_debug_output("connection_request", msg) self.session.send( self.query, 'connection_reply', content, parent=msg, ident=client_id ) diff --git a/ipyparallel/controller/task_scheduler.py b/ipyparallel/controller/task_scheduler.py index b354b96f6..1389c75dd 100644 --- a/ipyparallel/controller/task_scheduler.py +++ b/ipyparallel/controller/task_scheduler.py @@ -18,6 +18,12 @@ # Chooser functions # ---------------------------------------------------------------------- +def _debug_output(where, msg): + with open("d:/task_scheduler_log.txt", "a") as f: + f.write(f"{where}: {msg['msg_id']}\n") + f.write(f"has metadata={'metadata' in msg}\n") + if 'metadata' in msg: + f.write(f"{msg['metadata']}\n\n") def plainrandom(loads): """Plain random pick.""" @@ -350,6 +356,8 @@ def dispatch_submission(self, raw_msg): # send to monitor self.mon_stream.send_multipart([b'intask'] + raw_msg, copy=False) + #_debug_output("dispatch_submission",msg) + header = msg['header'] md = msg['metadata'] msg_id = header['msg_id'] diff --git a/ipyparallel/engine/kernel.py b/ipyparallel/engine/kernel.py index 8c9001677..c27fabb27 100644 --- a/ipyparallel/engine/kernel.py +++ b/ipyparallel/engine/kernel.py @@ -76,6 +76,7 @@ def init_metadata(self, parent): 'is_broadcast': parent_metadata.get('is_broadcast', False), 'is_coalescing': parent_metadata.get('is_coalescing', False), 'original_msg_id': parent_metadata.get('original_msg_id', ''), + 'task_label': parent_metadata.get('task_label', None), } def finish_metadata(self, parent, metadata, reply_content): From a51f228807a37c50988e71195a4104f9a1aaa6ed Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Fri, 23 Jan 2026 11:08:56 
+0100 Subject: [PATCH 02/12] _debug_output function removed --- ipyparallel/controller/hub.py | 37 ------------------------ ipyparallel/controller/task_scheduler.py | 9 ------ 2 files changed, 46 deletions(-) diff --git a/ipyparallel/controller/hub.py b/ipyparallel/controller/hub.py index 23b14d7b3..59578c706 100644 --- a/ipyparallel/controller/hub.py +++ b/ipyparallel/controller/hub.py @@ -50,20 +50,6 @@ def _printer(*args, **kwargs): print(kwargs) -def _debug_output(where, msg): - with open("d:/hub_log.txt", "a") as f: - f.write(f"{where} [{datetime.now()}]: ") - if 'msg_id' in msg: - f.write(f"{msg['msg_id']}\n") - else: - f.write(f"{msg}\n\n\n") - return - f.write(f"has metadata={'metadata' in msg}\n") - if 'metadata' in msg: - f.write(f"{msg['metadata']}\n") - f.write(f"{msg}\n\n\n") - - def empty_record(): """Return an empty dict with all record keys.""" return { @@ -346,9 +332,6 @@ def dispatch_monitor_traffic(self, msg): return handler = self.monitor_handlers.get(switch, None) if handler is not None: - #if switch == b'intask': - # _debug_output(f"dispatch_monitor_traffic ({switch} - {handler})", msg) - handler(idents, msg) else: self.log.error("Unrecognized monitor topic: %r", switch) @@ -489,8 +472,6 @@ def save_queue_request(self, idents, msg): record['client_uuid'] = msg['header']['session'] record['queue'] = 'mux' - #_debug_output('save_queue_request', msg) - try: # it's posible iopub arrived first: existing = self.db.get_record(msg_id) @@ -537,8 +518,6 @@ def save_queue_result(self, idents, msg): ) return - #_debug_output('save_queue_result', msg) - eid = self.by_ident.get(queue_id, None) if eid is None: self.log.error("queue::unknown engine %r is sending a reply: ", queue_id) @@ -599,8 +578,6 @@ def save_broadcast_request(self, idents, msg): msg_id = header['msg_id'] self.pending.add(msg_id) - #_debug_output('save_broadcast_request',msg) - try: self.db.add_record(msg_id, record) except Exception as e: @@ -627,8 +604,6 @@ def save_broadcast_result(self, idents, msg): eid = self.by_ident.get(engine_uuid.encode("utf8"), None) status = md.get('status', None) - #_debug_output('save_broadcast_result', msg) - if msg_id in self.pending: self.log.info(f'broadcast:: broadcast {msg_id} finished on {eid}') self.pending.remove(msg_id) @@ -676,8 +651,6 @@ def save_task_request(self, idents, msg): return record = init_record(msg) - #_debug_output('save_task_request', msg) - record['client_uuid'] = msg['header']['session'] record['queue'] = 'task' header = msg['header'] @@ -737,8 +710,6 @@ def save_task_result(self, idents, msg): ) return - #_debug_output("save_task_result", msg) - parent = msg['parent_header'] if not parent: # print msg @@ -792,8 +763,6 @@ def save_task_destination(self, idents, msg): except Exception: self.log.error("task::invalid task tracking message", exc_info=True) return - #_debug_output("save_task_destination", msg) - content = msg['content'] # print (content) msg_id = content['msg_id'] @@ -821,8 +790,6 @@ def monitor_iopub_message(self, topics, msg): except Exception: self.log.error("iopub::invalid IOPub message", exc_info=True) return - #_debug_output("monitor_iopub_message", msg) - msg_type = msg['header']['msg_type'] if msg_type == 'shutdown_reply': session = msg['header']['session'] @@ -851,9 +818,6 @@ def save_iopub_message(self, topics, msg): msg_id = parent['msg_id'] msg_type = msg['header']['msg_type'] content = msg['content'] - #_debug_output("save_iopub_message", msg) - #_debug_output("save_iopub_message[parent]", parent) - # ensure msg_id is in db try: 
rec = self.db.get_record(msg_id) @@ -907,7 +871,6 @@ def connection_request(self, client_id, msg): for eid, ec in self.engines.items(): jsonable[str(eid)] = ec.uuid content['engines'] = jsonable - #_debug_output("connection_request", msg) self.session.send( self.query, 'connection_reply', content, parent=msg, ident=client_id ) diff --git a/ipyparallel/controller/task_scheduler.py b/ipyparallel/controller/task_scheduler.py index 1389c75dd..7865a3bbd 100644 --- a/ipyparallel/controller/task_scheduler.py +++ b/ipyparallel/controller/task_scheduler.py @@ -18,13 +18,6 @@ # Chooser functions # ---------------------------------------------------------------------- -def _debug_output(where, msg): - with open("d:/task_scheduler_log.txt", "a") as f: - f.write(f"{where}: {msg['msg_id']}\n") - f.write(f"has metadata={'metadata' in msg}\n") - if 'metadata' in msg: - f.write(f"{msg['metadata']}\n\n") - def plainrandom(loads): """Plain random pick.""" n = len(loads) @@ -356,8 +349,6 @@ def dispatch_submission(self, raw_msg): # send to monitor self.mon_stream.send_multipart([b'intask'] + raw_msg, copy=False) - #_debug_output("dispatch_submission",msg) - header = msg['header'] md = msg['metadata'] msg_id = header['msg_id'] From 0a9ee9d3673277bee2fecb8848074c5cc6fbc4a8 Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Fri, 23 Jan 2026 11:13:46 +0100 Subject: [PATCH 03/12] _debug_output function removed --- ipyparallel/controller/dictdb.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/ipyparallel/controller/dictdb.py b/ipyparallel/controller/dictdb.py index 6f81bbc09..a48a754be 100644 --- a/ipyparallel/controller/dictdb.py +++ b/ipyparallel/controller/dictdb.py @@ -61,21 +61,6 @@ '$exists': lambda a, b: (b and a is not None) or (a is None and not b), } -def _debug_output(where, msg, stack=False): - import inspect, traceback - with open("d:/dictdb_log.txt", "a") as f: - f.write(f"{where} [{datetime.now()}]: {msg['msg_id']}\n") - f.write(f"\tmsg={msg}\n") - #if.write(f"has metadata={'metadata' in msg}\n") - #if 'metadata' in msg: - # f.write(f"{msg['metadata']}\n") - f.write(f"has result_metadata={'result_metadata' in msg}\n") - if 'result_metadata' in msg: - f.write(f"{msg['result_metadata']}\n") - if stack: - for line in traceback.format_stack(): - f.write(line.strip()+"\n") - def _add_tz(obj): if isinstance(obj, datetime): obj = ensure_timezone(obj) @@ -254,7 +239,6 @@ def add_record(self, msg_id, rec): """Add a new Task Record, by msg_id.""" if msg_id in self._records: raise KeyError(f"Already have msg_id {msg_id!r}") - #_debug_output("add_record", rec, False) self._check_dates(rec) self._records[msg_id] = rec self._add_bytes(rec) @@ -272,7 +256,6 @@ def update_record(self, msg_id, rec): """Update the data in an existing record.""" if msg_id in self._culled_ids: raise KeyError(f"Record {msg_id!r} has been culled for size") - #_debug_output("update_record", rec) self._check_dates(rec) _rec = self._records[msg_id] self._drop_bytes(_rec) From c0ca7eaed4e6c72d76c5bd9b355a0b6f2d15ec3b Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Fri, 23 Jan 2026 13:00:28 +0100 Subject: [PATCH 04/12] improvements and support for direct view added --- ipyparallel/client/remotefunction.py | 2 +- ipyparallel/client/view.py | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ipyparallel/client/remotefunction.py b/ipyparallel/client/remotefunction.py index 5dd0573d4..8d92e928b 100644 --- a/ipyparallel/client/remotefunction.py +++ b/ipyparallel/client/remotefunction.py @@ 
-295,7 +295,7 @@ def __call__(self, *sequences, **kwargs): view = self.view if balanced else client[t] with view.temp_flags(block=False, **self.flags): - ar = view.apply(f, *args) + ar = view.apply(f, *args, task_label=self.task_label) # is this the right place to insert the task_label? ar.owner = False msg_id = ar.msg_ids[0] diff --git a/ipyparallel/client/view.py b/ipyparallel/client/view.py index f4b25d9b3..dbdc707ba 100644 --- a/ipyparallel/client/view.py +++ b/ipyparallel/client/view.py @@ -97,7 +97,6 @@ class View(HasTraits): # flags block = Bool(False) track = Bool(False) - task_label = Any() targets = Any() history = List() @@ -567,13 +566,16 @@ def _really_apply( _idents, _targets = self.client._build_targets(targets) futures = [] + task_label = kwargs.pop("task_label") if "task_label" in kwargs else None # is this the correct/best way of retieving task_label? + metadata = dict(task_label=task_label) + pf = PrePickled(f) pargs = [PrePickled(arg) for arg in args] pkwargs = {k: PrePickled(v) for k, v in kwargs.items()} for ident in _idents: future = self.client.send_apply_request( - self._socket, pf, pargs, pkwargs, track=track, ident=ident + self._socket, pf, pargs, pkwargs, track=track, ident=ident, metadata=metadata ) futures.append(future) if track: @@ -1356,13 +1358,13 @@ def _really_apply( # ensure *not* bytes idents = [ident.decode() for ident in idents] + task_label = kwargs.pop("task_label") if "task_label" in kwargs else None # is this the correct/best way of retieving task_label? + after = self._render_dependency(after) follow = self._render_dependency(follow) metadata = dict( - after=after, follow=follow, timeout=timeout, targets=idents, retries=retries + after=after, follow=follow, timeout=timeout, targets=idents, retries=retries, task_label=task_label ) - if self.task_label: - metadata["task_label"] = self.task_label future = self.client.send_apply_request( self._socket, f, args, kwargs, track=track, metadata=metadata @@ -1439,8 +1441,6 @@ def map( block = self.block assert len(sequences) > 0, "must have some sequences to map onto!" 
- self.task_label = task_label # just for testing - pf = ParallelFunction( self, f, From 4db4cf359e79898b41a0977189b631eb5746eb9c Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Fri, 23 Jan 2026 13:12:58 +0100 Subject: [PATCH 05/12] basic task label example added --- docs/source/examples/basic_task_label.py | 36 ++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 docs/source/examples/basic_task_label.py diff --git a/docs/source/examples/basic_task_label.py b/docs/source/examples/basic_task_label.py new file mode 100644 index 000000000..3e7c502f5 --- /dev/null +++ b/docs/source/examples/basic_task_label.py @@ -0,0 +1,36 @@ +""" Basic task label example""" + +import ipyparallel as ipp + +# start up ipp cluster with 2 engines +cluster = ipp.Cluster(n=2) +cluster.start_cluster_sync() + +rc = cluster.connect_client_sync() +rc.wait_for_engines(n=2) + +def wait(t): + import time + tic = time.time() + time.sleep(t) + return time.time()-tic + +# send tasks to cluster +balanced_view = True +if balanced_view: + # use load balanced view + dview = rc.load_balanced_view() + ar_list = [dview.map_async(wait, [2], task_label=f"task_label_{i:02}") for i in range(10)] + dview.wait(ar_list) +else: + # use direct view + dview = rc[:] + ar_list = [dview.apply_async(wait, 2, task_label=f"task_label_{i:02}") for i in range(10)] + dview.wait(ar_list) + +# query database +data = rc.db_query({'task_label': {"$nin": ""}}, keys=['msg_id', 'task_label', 'engine_uuid']) +for d in data: + print(f"msg_id={d['msg_id']}; task_label={d['task_label']}; engine_uuid={d['engine_uuid']}") + +cluster.stop_cluster_sync() \ No newline at end of file From f6d8c0abd7821c3cf07b88d5da32b912fd40fc6a Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Fri, 23 Jan 2026 13:19:06 +0100 Subject: [PATCH 06/12] minor change: restore original formating --- ipyparallel/client/view.py | 1 + ipyparallel/controller/dictdb.py | 1 + ipyparallel/controller/hub.py | 2 ++ ipyparallel/controller/task_scheduler.py | 1 + 4 files changed, 5 insertions(+) diff --git a/ipyparallel/client/view.py b/ipyparallel/client/view.py index dbdc707ba..5b7cfdf42 100644 --- a/ipyparallel/client/view.py +++ b/ipyparallel/client/view.py @@ -1439,6 +1439,7 @@ def map( # default if block is None: block = self.block + assert len(sequences) > 0, "must have some sequences to map onto!" 
pf = ParallelFunction( diff --git a/ipyparallel/controller/dictdb.py b/ipyparallel/controller/dictdb.py index a48a754be..4091e43af 100644 --- a/ipyparallel/controller/dictdb.py +++ b/ipyparallel/controller/dictdb.py @@ -61,6 +61,7 @@ '$exists': lambda a, b: (b and a is not None) or (a is None and not b), } + def _add_tz(obj): if isinstance(obj, datetime): obj = ensure_timezone(obj) diff --git a/ipyparallel/controller/hub.py b/ipyparallel/controller/hub.py index 59578c706..f83a15d50 100644 --- a/ipyparallel/controller/hub.py +++ b/ipyparallel/controller/hub.py @@ -790,6 +790,7 @@ def monitor_iopub_message(self, topics, msg): except Exception: self.log.error("iopub::invalid IOPub message", exc_info=True) return + msg_type = msg['header']['msg_type'] if msg_type == 'shutdown_reply': session = msg['header']['session'] @@ -818,6 +819,7 @@ def save_iopub_message(self, topics, msg): msg_id = parent['msg_id'] msg_type = msg['header']['msg_type'] content = msg['content'] + # ensure msg_id is in db try: rec = self.db.get_record(msg_id) diff --git a/ipyparallel/controller/task_scheduler.py b/ipyparallel/controller/task_scheduler.py index 7865a3bbd..b354b96f6 100644 --- a/ipyparallel/controller/task_scheduler.py +++ b/ipyparallel/controller/task_scheduler.py @@ -18,6 +18,7 @@ # Chooser functions # ---------------------------------------------------------------------- + def plainrandom(loads): """Plain random pick.""" n = len(loads) From 161a9878377bfd7d3162f388ab9ad8cc6248d281 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 23 Jan 2026 12:46:23 +0000 Subject: [PATCH 07/12] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- docs/source/examples/basic_task_label.py | 31 +++++++++++------ ipyparallel/client/remotefunction.py | 4 ++- ipyparallel/client/view.py | 43 ++++++++++++++++++++---- 3 files changed, 61 insertions(+), 17 deletions(-) diff --git a/docs/source/examples/basic_task_label.py b/docs/source/examples/basic_task_label.py index 3e7c502f5..77f100236 100644 --- a/docs/source/examples/basic_task_label.py +++ b/docs/source/examples/basic_task_label.py @@ -1,4 +1,4 @@ -""" Basic task label example""" +"""Basic task label example""" import ipyparallel as ipp @@ -9,28 +9,39 @@ rc = cluster.connect_client_sync() rc.wait_for_engines(n=2) + def wait(t): - import time - tic = time.time() - time.sleep(t) - return time.time()-tic + import time + + tic = time.time() + time.sleep(t) + return time.time() - tic + # send tasks to cluster balanced_view = True if balanced_view: # use load balanced view dview = rc.load_balanced_view() - ar_list = [dview.map_async(wait, [2], task_label=f"task_label_{i:02}") for i in range(10)] + ar_list = [ + dview.map_async(wait, [2], task_label=f"task_label_{i:02}") for i in range(10) + ] dview.wait(ar_list) else: # use direct view dview = rc[:] - ar_list = [dview.apply_async(wait, 2, task_label=f"task_label_{i:02}") for i in range(10)] + ar_list = [ + dview.apply_async(wait, 2, task_label=f"task_label_{i:02}") for i in range(10) + ] dview.wait(ar_list) # query database -data = rc.db_query({'task_label': {"$nin": ""}}, keys=['msg_id', 'task_label', 'engine_uuid']) +data = rc.db_query( + {'task_label': {"$nin": ""}}, keys=['msg_id', 'task_label', 'engine_uuid'] +) for d in data: - print(f"msg_id={d['msg_id']}; task_label={d['task_label']}; engine_uuid={d['engine_uuid']}") + print( + f"msg_id={d['msg_id']}; task_label={d['task_label']}; 
engine_uuid={d['engine_uuid']}" + ) -cluster.stop_cluster_sync() \ No newline at end of file +cluster.stop_cluster_sync() diff --git a/ipyparallel/client/remotefunction.py b/ipyparallel/client/remotefunction.py index 8d92e928b..36021bfa3 100644 --- a/ipyparallel/client/remotefunction.py +++ b/ipyparallel/client/remotefunction.py @@ -295,7 +295,9 @@ def __call__(self, *sequences, **kwargs): view = self.view if balanced else client[t] with view.temp_flags(block=False, **self.flags): - ar = view.apply(f, *args, task_label=self.task_label) # is this the right place to insert the task_label? + ar = view.apply( + f, *args, task_label=self.task_label + ) # is this the right place to insert the task_label? ar.owner = False msg_id = ar.msg_ids[0] diff --git a/ipyparallel/client/view.py b/ipyparallel/client/view.py index 5b7cfdf42..855787eed 100644 --- a/ipyparallel/client/view.py +++ b/ipyparallel/client/view.py @@ -566,7 +566,9 @@ def _really_apply( _idents, _targets = self.client._build_targets(targets) futures = [] - task_label = kwargs.pop("task_label") if "task_label" in kwargs else None # is this the correct/best way of retieving task_label? + task_label = ( + kwargs.pop("task_label") if "task_label" in kwargs else None + ) # is this the correct/best way of retieving task_label? metadata = dict(task_label=task_label) pf = PrePickled(f) @@ -575,7 +577,13 @@ def _really_apply( for ident in _idents: future = self.client.send_apply_request( - self._socket, pf, pargs, pkwargs, track=track, ident=ident, metadata=metadata + self._socket, + pf, + pargs, + pkwargs, + track=track, + ident=ident, + metadata=metadata, ) futures.append(future) if track: @@ -595,7 +603,15 @@ def _really_apply( return ar @sync_results - def map(self, f, *sequences, block=None, track=False, return_exceptions=False, task_label=None): + def map( + self, + f, + *sequences, + block=None, + track=False, + return_exceptions=False, + task_label=None, + ): """Parallel version of builtin `map`, using this View's `targets`. There will be one task per target, so work will be chunked @@ -1039,7 +1055,15 @@ def _broadcast_map(f, *sequence_names): return list(map(f, *sequences)) @_not_coalescing - def map(self, f, *sequences, block=None, track=False, return_exceptions=False, task_label=None): + def map( + self, + f, + *sequences, + block=None, + track=False, + return_exceptions=False, + task_label=None, + ): """Parallel version of builtin `map`, using this View's `targets`. There will be one task per engine, so work will be chunked @@ -1358,12 +1382,19 @@ def _really_apply( # ensure *not* bytes idents = [ident.decode() for ident in idents] - task_label = kwargs.pop("task_label") if "task_label" in kwargs else None # is this the correct/best way of retieving task_label? + task_label = ( + kwargs.pop("task_label") if "task_label" in kwargs else None + ) # is this the correct/best way of retieving task_label? 
after = self._render_dependency(after) follow = self._render_dependency(follow) metadata = dict( - after=after, follow=follow, timeout=timeout, targets=idents, retries=retries, task_label=task_label + after=after, + follow=follow, + timeout=timeout, + targets=idents, + retries=retries, + task_label=task_label, ) future = self.client.send_apply_request( From 8c7a833a063fbfcc5aff1c9be894cb2c0ca83f5f Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Mon, 26 Jan 2026 07:54:56 +0100 Subject: [PATCH 08/12] 'task_label' renamed to 'label' --- docs/source/examples/basic_task_label.py | 8 ++++---- ipyparallel/client/remotefunction.py | 8 ++++---- ipyparallel/client/view.py | 24 ++++++++++++------------ ipyparallel/controller/hub.py | 4 ++-- ipyparallel/engine/kernel.py | 2 +- 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/docs/source/examples/basic_task_label.py b/docs/source/examples/basic_task_label.py index 77f100236..a8075427f 100644 --- a/docs/source/examples/basic_task_label.py +++ b/docs/source/examples/basic_task_label.py @@ -24,24 +24,24 @@ def wait(t): # use load balanced view dview = rc.load_balanced_view() ar_list = [ - dview.map_async(wait, [2], task_label=f"task_label_{i:02}") for i in range(10) + dview.map_async(wait, [2], label=f"mylabel_{i:02}") for i in range(10) ] dview.wait(ar_list) else: # use direct view dview = rc[:] ar_list = [ - dview.apply_async(wait, 2, task_label=f"task_label_{i:02}") for i in range(10) + dview.apply_async(wait, 2, label=f"mylabel_{i:02}") for i in range(10) ] dview.wait(ar_list) # query database data = rc.db_query( - {'task_label': {"$nin": ""}}, keys=['msg_id', 'task_label', 'engine_uuid'] + {'label': {"$nin": ""}}, keys=['msg_id', 'label', 'engine_uuid'] ) for d in data: print( - f"msg_id={d['msg_id']}; task_label={d['task_label']}; engine_uuid={d['engine_uuid']}" + f"msg_id={d['msg_id']}; label={d['label']}; engine_uuid={d['engine_uuid']}" ) cluster.stop_cluster_sync() diff --git a/ipyparallel/client/remotefunction.py b/ipyparallel/client/remotefunction.py index 36021bfa3..ae2a65dce 100644 --- a/ipyparallel/client/remotefunction.py +++ b/ipyparallel/client/remotefunction.py @@ -195,14 +195,14 @@ def __init__( chunksize=None, ordered=True, return_exceptions=False, - task_label=None, + label=None, **flags, ): super().__init__(view, f, block=block, **flags) self.chunksize = chunksize self.ordered = ordered self.return_exceptions = return_exceptions - self.task_label = task_label + self.label = label mapClass = Map.dists[dist] self.mapObject = mapClass() @@ -296,8 +296,8 @@ def __call__(self, *sequences, **kwargs): view = self.view if balanced else client[t] with view.temp_flags(block=False, **self.flags): ar = view.apply( - f, *args, task_label=self.task_label - ) # is this the right place to insert the task_label? + f, *args, label=self.label + ) # is this the right place to insert the label? ar.owner = False msg_id = ar.msg_ids[0] diff --git a/ipyparallel/client/view.py b/ipyparallel/client/view.py index 855787eed..33eb14778 100644 --- a/ipyparallel/client/view.py +++ b/ipyparallel/client/view.py @@ -566,10 +566,10 @@ def _really_apply( _idents, _targets = self.client._build_targets(targets) futures = [] - task_label = ( - kwargs.pop("task_label") if "task_label" in kwargs else None - ) # is this the correct/best way of retieving task_label? - metadata = dict(task_label=task_label) + label = ( + kwargs.pop("label") if "label" in kwargs else None + ) # is this the correct/best way of retieving label? 
+ metadata = dict(label=label) pf = PrePickled(f) pargs = [PrePickled(arg) for arg in args] @@ -610,7 +610,7 @@ def map( block=None, track=False, return_exceptions=False, - task_label=None, + label=None, ): """Parallel version of builtin `map`, using this View's `targets`. @@ -1062,7 +1062,7 @@ def map( block=None, track=False, return_exceptions=False, - task_label=None, + label=None, ): """Parallel version of builtin `map`, using this View's `targets`. @@ -1382,9 +1382,9 @@ def _really_apply( # ensure *not* bytes idents = [ident.decode() for ident in idents] - task_label = ( - kwargs.pop("task_label") if "task_label" in kwargs else None - ) # is this the correct/best way of retieving task_label? + label = ( + kwargs.pop("label") if "label" in kwargs else None + ) # is this the correct/best way of retieving label? after = self._render_dependency(after) follow = self._render_dependency(follow) @@ -1394,7 +1394,7 @@ def _really_apply( timeout=timeout, targets=idents, retries=retries, - task_label=task_label, + label=label, ) future = self.client.send_apply_request( @@ -1425,7 +1425,7 @@ def map( chunksize=1, ordered=True, return_exceptions=False, - task_label=None, + label=None, ): """Parallel version of builtin `map`, load-balanced by this View. @@ -1480,7 +1480,7 @@ def map( chunksize=chunksize, ordered=ordered, return_exceptions=return_exceptions, - task_label=task_label, + label=label, ) return pf.map(*sequences) diff --git a/ipyparallel/controller/hub.py b/ipyparallel/controller/hub.py index f83a15d50..282ffa10f 100644 --- a/ipyparallel/controller/hub.py +++ b/ipyparallel/controller/hub.py @@ -75,7 +75,7 @@ def empty_record(): 'error': None, 'stdout': '', 'stderr': '', - 'task_label': None, + 'label': None, } @@ -112,7 +112,7 @@ def init_record(msg): 'error': None, 'stdout': '', 'stderr': '', - 'task_label': msg['metadata'].get('task_label', None), + 'label': msg['metadata'].get('label', None), } diff --git a/ipyparallel/engine/kernel.py b/ipyparallel/engine/kernel.py index c27fabb27..f346371c9 100644 --- a/ipyparallel/engine/kernel.py +++ b/ipyparallel/engine/kernel.py @@ -76,7 +76,7 @@ def init_metadata(self, parent): 'is_broadcast': parent_metadata.get('is_broadcast', False), 'is_coalescing': parent_metadata.get('is_coalescing', False), 'original_msg_id': parent_metadata.get('original_msg_id', ''), - 'task_label': parent_metadata.get('task_label', None), + 'label': parent_metadata.get('label', None), } def finish_metadata(self, parent, metadata, reply_content): From 133a9f7de78a35b13962b3897248d2b8477a5ead Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 26 Jan 2026 06:55:17 +0000 Subject: [PATCH 09/12] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- docs/source/examples/basic_task_label.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/docs/source/examples/basic_task_label.py b/docs/source/examples/basic_task_label.py index a8075427f..c7880337c 100644 --- a/docs/source/examples/basic_task_label.py +++ b/docs/source/examples/basic_task_label.py @@ -23,25 +23,17 @@ def wait(t): if balanced_view: # use load balanced view dview = rc.load_balanced_view() - ar_list = [ - dview.map_async(wait, [2], label=f"mylabel_{i:02}") for i in range(10) - ] + ar_list = [dview.map_async(wait, [2], label=f"mylabel_{i:02}") for i in range(10)] dview.wait(ar_list) else: # use direct view dview = rc[:] - ar_list = [ - dview.apply_async(wait, 2, 
label=f"mylabel_{i:02}") for i in range(10) - ] + ar_list = [dview.apply_async(wait, 2, label=f"mylabel_{i:02}") for i in range(10)] dview.wait(ar_list) # query database -data = rc.db_query( - {'label': {"$nin": ""}}, keys=['msg_id', 'label', 'engine_uuid'] -) +data = rc.db_query({'label': {"$nin": ""}}, keys=['msg_id', 'label', 'engine_uuid']) for d in data: - print( - f"msg_id={d['msg_id']}; label={d['label']}; engine_uuid={d['engine_uuid']}" - ) + print(f"msg_id={d['msg_id']}; label={d['label']}; engine_uuid={d['engine_uuid']}") cluster.stop_cluster_sync() From 182fa7e44fc5d704aa5a22e2f46adc57507da943 Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Mon, 26 Jan 2026 11:56:14 +0100 Subject: [PATCH 10/12] try to use existing mechanism to communicate the label parameter --- docs/source/examples/basic_task_label.py | 29 +++++++++++------------- ipyparallel/client/client.py | 1 + ipyparallel/client/remotefunction.py | 6 ++--- ipyparallel/client/view.py | 25 +++++++++++--------- 4 files changed, 30 insertions(+), 31 deletions(-) diff --git a/docs/source/examples/basic_task_label.py b/docs/source/examples/basic_task_label.py index a8075427f..8683a8778 100644 --- a/docs/source/examples/basic_task_label.py +++ b/docs/source/examples/basic_task_label.py @@ -18,22 +18,19 @@ def wait(t): return time.time() - tic -# send tasks to cluster -balanced_view = True -if balanced_view: - # use load balanced view - dview = rc.load_balanced_view() - ar_list = [ - dview.map_async(wait, [2], label=f"mylabel_{i:02}") for i in range(10) - ] - dview.wait(ar_list) -else: - # use direct view - dview = rc[:] - ar_list = [ - dview.apply_async(wait, 2, label=f"mylabel_{i:02}") for i in range(10) - ] - dview.wait(ar_list) +# use load balanced view +bview = rc.load_balanced_view() +ar_list1 = [ + bview.map_async(wait, [2], label=f"mylabel_{i:02}") for i in range(10) +] +bview.wait(ar_list1) + +# use direct view +dview = rc[:] +ar_list2 = [ + dview.apply_async(wait, 2, label=f"mylabel_{i+10:02}") for i in range(10) +] +dview.wait(ar_list2) # query database data = rc.db_query( diff --git a/ipyparallel/client/client.py b/ipyparallel/client/client.py index 8fec6f116..5a64fa736 100644 --- a/ipyparallel/client/client.py +++ b/ipyparallel/client/client.py @@ -220,6 +220,7 @@ def __init__(self, *args, **kwargs): 'stderr': '', 'outputs': [], 'data': {}, + 'label': None, } self.update(md) self.update(dict(*args, **kwargs)) diff --git a/ipyparallel/client/remotefunction.py b/ipyparallel/client/remotefunction.py index ae2a65dce..8baee3e65 100644 --- a/ipyparallel/client/remotefunction.py +++ b/ipyparallel/client/remotefunction.py @@ -195,14 +195,12 @@ def __init__( chunksize=None, ordered=True, return_exceptions=False, - label=None, **flags, ): super().__init__(view, f, block=block, **flags) self.chunksize = chunksize self.ordered = ordered self.return_exceptions = return_exceptions - self.label = label mapClass = Map.dists[dist] self.mapObject = mapClass() @@ -296,8 +294,8 @@ def __call__(self, *sequences, **kwargs): view = self.view if balanced else client[t] with view.temp_flags(block=False, **self.flags): ar = view.apply( - f, *args, label=self.label - ) # is this the right place to insert the label? 
+ f, *args + ) ar.owner = False msg_id = ar.msg_ids[0] diff --git a/ipyparallel/client/view.py b/ipyparallel/client/view.py index 33eb14778..dc6560e24 100644 --- a/ipyparallel/client/view.py +++ b/ipyparallel/client/view.py @@ -530,7 +530,7 @@ def use_pickle(self): @sync_results @save_ids def _really_apply( - self, f, args=None, kwargs=None, targets=None, block=None, track=None + self, f, args=None, kwargs=None, targets=None, block=None, track=None, label=None ): """calls f(*args, **kwargs) on remote engines, returning the result. @@ -562,15 +562,14 @@ def _really_apply( block = self.block if block is None else block track = self.track if track is None else track targets = self.targets if targets is None else targets - - _idents, _targets = self.client._build_targets(targets) - futures = [] - label = ( - kwargs.pop("label") if "label" in kwargs else None + kwargs.pop("label") if "label" in kwargs and label is None else label ) # is this the correct/best way of retieving label? metadata = dict(label=label) + _idents, _targets = self.client._build_targets(targets) + futures = [] + pf = PrePickled(f) pargs = [PrePickled(arg) for arg in args] pkwargs = {k: PrePickled(v) for k, v in kwargs.items()} @@ -1203,10 +1202,11 @@ class LoadBalancedView(View): after = Any() timeout = CFloat() retries = Integer(0) + label = Any() _task_scheme = Any() _flag_names = List( - ['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'] + ['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries', 'label'] ) _outstanding_maps = Set() @@ -1301,6 +1301,11 @@ def set_flags(self, **kwargs): raise ValueError(f"Invalid timeout: {t}") self.timeout = t + if 'label' in kwargs: + l = kwargs['label'] + if not isinstance(l, (str, type(None))): + raise TypeError(f"Invalid type for label: {type(l)!r}") + self.label = l @sync_results @save_ids @@ -1316,6 +1321,7 @@ def _really_apply( timeout=None, targets=None, retries=None, + label=None, ): """calls f(*args, **kwargs) on a remote engine, returning the result. @@ -1371,6 +1377,7 @@ def _really_apply( follow = self.follow if follow is None else follow timeout = self.timeout if timeout is None else timeout targets = self.targets if targets is None else targets + label = self.label if label is None else label if not isinstance(retries, int): raise TypeError(f'retries must be int, not {type(retries)!r}') @@ -1382,10 +1389,6 @@ def _really_apply( # ensure *not* bytes idents = [ident.decode() for ident in idents] - label = ( - kwargs.pop("label") if "label" in kwargs else None - ) # is this the correct/best way of retieving label? 
- after = self._render_dependency(after) follow = self._render_dependency(follow) metadata = dict( From 65b9cd3ea8edf2160de510aab1fd5ea407723355 Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Mon, 26 Jan 2026 12:01:21 +0100 Subject: [PATCH 11/12] formating corrected --- docs/source/examples/basic_task_label.py | 16 ++++------------ ipyparallel/client/remotefunction.py | 4 +--- ipyparallel/client/view.py | 9 ++++++++- 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/docs/source/examples/basic_task_label.py b/docs/source/examples/basic_task_label.py index 8683a8778..508b78336 100644 --- a/docs/source/examples/basic_task_label.py +++ b/docs/source/examples/basic_task_label.py @@ -20,25 +20,17 @@ def wait(t): # use load balanced view bview = rc.load_balanced_view() -ar_list1 = [ - bview.map_async(wait, [2], label=f"mylabel_{i:02}") for i in range(10) -] +ar_list1 = [bview.map_async(wait, [2], label=f"mylabel_{i:02}") for i in range(10)] bview.wait(ar_list1) # use direct view dview = rc[:] -ar_list2 = [ - dview.apply_async(wait, 2, label=f"mylabel_{i+10:02}") for i in range(10) -] +ar_list2 = [dview.apply_async(wait, 2, label=f"mylabel_{i + 10:02}") for i in range(10)] dview.wait(ar_list2) # query database -data = rc.db_query( - {'label': {"$nin": ""}}, keys=['msg_id', 'label', 'engine_uuid'] -) +data = rc.db_query({'label': {"$nin": ""}}, keys=['msg_id', 'label', 'engine_uuid']) for d in data: - print( - f"msg_id={d['msg_id']}; label={d['label']}; engine_uuid={d['engine_uuid']}" - ) + print(f"msg_id={d['msg_id']}; label={d['label']}; engine_uuid={d['engine_uuid']}") cluster.stop_cluster_sync() diff --git a/ipyparallel/client/remotefunction.py b/ipyparallel/client/remotefunction.py index 8baee3e65..5b7a93509 100644 --- a/ipyparallel/client/remotefunction.py +++ b/ipyparallel/client/remotefunction.py @@ -293,9 +293,7 @@ def __call__(self, *sequences, **kwargs): view = self.view if balanced else client[t] with view.temp_flags(block=False, **self.flags): - ar = view.apply( - f, *args - ) + ar = view.apply(f, *args) ar.owner = False msg_id = ar.msg_ids[0] diff --git a/ipyparallel/client/view.py b/ipyparallel/client/view.py index dc6560e24..1c4ac4a96 100644 --- a/ipyparallel/client/view.py +++ b/ipyparallel/client/view.py @@ -530,7 +530,14 @@ def use_pickle(self): @sync_results @save_ids def _really_apply( - self, f, args=None, kwargs=None, targets=None, block=None, track=None, label=None + self, + f, + args=None, + kwargs=None, + targets=None, + block=None, + track=None, + label=None, ): """calls f(*args, **kwargs) on remote engines, returning the result. 
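
Note on the flag mechanism: the final patch below (PATCH 12/12) adds `label` to the base `View._flag_names` next to `block` and `track`, so once the whole series is applied a label should be attachable either per call or persistently through the normal `set_flags`/`temp_flags` machinery. The following is only a hedged sketch of that usage under the patched ipyparallel; the connected client `rc`, the helper `slow_square`, and the label strings are illustrative, not part of the series.

# Hedged sketch: assumes the patched ipyparallel from this series (where
# 'label' is a view flag) and an already-connected Client named `rc`.
def slow_square(x):
    import time

    time.sleep(1)
    return x * x


view = rc.load_balanced_view()

# per-call label, as in docs/source/examples/basic_task_label.py
ar_one = view.apply_async(slow_square, 3, label="adhoc-task")

# persistent label: apply_async calls made while the flag is set should
# fall back to the view's label when no per-call label is given
with view.temp_flags(label="stage-1"):
    ar_many = [view.apply_async(slow_square, i) for i in range(4)]

view.wait([ar_one] + ar_many)
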
From 1b87befaaaf1b9605353ffdb7868772119a1ad0f Mon Sep 17 00:00:00 2001 From: Johannes Otepka Date: Mon, 26 Jan 2026 13:09:08 +0100 Subject: [PATCH 12/12] corrections to get map and apply to work for balanced and direct view --- docs/source/examples/basic_task_label.py | 21 ++++++++++++++---- ipyparallel/client/view.py | 27 ++++++++++++++++-------- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/docs/source/examples/basic_task_label.py b/docs/source/examples/basic_task_label.py index 508b78336..6a14f931d 100644 --- a/docs/source/examples/basic_task_label.py +++ b/docs/source/examples/basic_task_label.py @@ -20,13 +20,26 @@ def wait(t): # use load balanced view bview = rc.load_balanced_view() -ar_list1 = [bview.map_async(wait, [2], label=f"mylabel_{i:02}") for i in range(10)] -bview.wait(ar_list1) +ar_list_b1 = [ + bview.map_async(wait, [2], label=f"mylabel_map_{i:02}") for i in range(10) +] +ar_list_b2 = [ + bview.apply_async(wait, 2, label=f"mylabel_apply_{i:02}") for i in range(10) +] +bview.wait(ar_list_b1) +bview.wait(ar_list_b2) + # use direct view dview = rc[:] -ar_list2 = [dview.apply_async(wait, 2, label=f"mylabel_{i + 10:02}") for i in range(10)] -dview.wait(ar_list2) +ar_list_d1 = [ + dview.apply_async(wait, 2, label=f"mylabel_map_{i + 10:02}") for i in range(10) +] +ar_list_d2 = [ + dview.map_async(wait, [2], label=f"mylabel_apply_{i + 10:02}") for i in range(10) +] +dview.wait(ar_list_d1) +dview.wait(ar_list_d2) # query database data = rc.db_query({'label': {"$nin": ""}}, keys=['msg_id', 'label', 'engine_uuid']) diff --git a/ipyparallel/client/view.py b/ipyparallel/client/view.py index 1c4ac4a96..66ec28f30 100644 --- a/ipyparallel/client/view.py +++ b/ipyparallel/client/view.py @@ -98,6 +98,7 @@ class View(HasTraits): block = Bool(False) track = Bool(False) targets = Any() + label = Any() history = List() outstanding = Set() @@ -105,7 +106,7 @@ class View(HasTraits): client = Instance('ipyparallel.Client', allow_none=True) _socket = Any() - _flag_names = List(['targets', 'block', 'track']) + _flag_names = List(['targets', 'block', 'track', 'label']) _in_sync_results = Bool(False) _targets = Any() _idents = Any() @@ -569,9 +570,12 @@ def _really_apply( block = self.block if block is None else block track = self.track if track is None else track targets = self.targets if targets is None else targets + label = ( + self.label if label is None else label + ) # comes into play when calling map[_async] (self.label) label = ( kwargs.pop("label") if "label" in kwargs and label is None else label - ) # is this the correct/best way of retieving label? + ) # this is required can calling apply[_async] metadata = dict(label=label) _idents, _targets = self.client._build_targets(targets) @@ -658,7 +662,12 @@ def map( assert len(sequences) > 0, "must have some sequences to map onto!" 
pf = ParallelFunction( - self, f, block=block, track=track, return_exceptions=return_exceptions + self, + f, + block=block, + track=track, + return_exceptions=return_exceptions, + label=label, ) return pf.map(*sequences) @@ -1308,11 +1317,6 @@ def set_flags(self, **kwargs): raise ValueError(f"Invalid timeout: {t}") self.timeout = t - if 'label' in kwargs: - l = kwargs['label'] - if not isinstance(l, (str, type(None))): - raise TypeError(f"Invalid type for label: {type(l)!r}") - self.label = l @sync_results @save_ids @@ -1384,7 +1388,12 @@ def _really_apply( follow = self.follow if follow is None else follow timeout = self.timeout if timeout is None else timeout targets = self.targets if targets is None else targets - label = self.label if label is None else label + label = ( + self.label if label is None else label + ) # comes into play when calling map[_async] (self.label) + label = ( + kwargs.pop("label") if "label" in kwargs and label is None else label + ) # this is required can calling apply[_async] if not isinstance(retries, int): raise TypeError(f'retries must be int, not {type(retries)!r}')
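
With the full series applied, labelled tasks end up as a `label` field in the hub's task records and can be retrieved with `Client.db_query`, as the example added in PATCH 05/12 demonstrates. Below is a condensed, hedged sketch of the end-to-end flow under the final API (per-call `label=` on both load-balanced and direct views, followed by an exact-match query); the label values, the two-engine cluster size, and the `wait` helper are illustrative only.

# Hedged end-to-end sketch of the label feature introduced in this series.
import ipyparallel as ipp


def wait(t):
    import time

    time.sleep(t)
    return t


cluster = ipp.Cluster(n=2)
cluster.start_cluster_sync()
rc = cluster.connect_client_sync()
rc.wait_for_engines(n=2)

# label tasks on a load-balanced view and on a direct view
bview = rc.load_balanced_view()
dview = rc[:]
ars = [bview.apply_async(wait, 1, label=f"lb_{i:02}") for i in range(4)]
ars += [dview.map_async(wait, [1], label=f"dv_{i:02}") for i in range(4)]
rc.wait(ars)

# exact-match query against the new 'label' field of the task record
for rec in rc.db_query({'label': 'lb_00'}, keys=['msg_id', 'label', 'engine_uuid']):
    print(rec['msg_id'], rec['label'], rec['engine_uuid'])

cluster.stop_cluster_sync()
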