Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions docs/source/examples/basic_task_label.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Basic task label example"""

import ipyparallel as ipp

# start up ipp cluster with 2 engines
cluster = ipp.Cluster(n=2)
cluster.start_cluster_sync()

rc = cluster.connect_client_sync()
rc.wait_for_engines(n=2)


def wait(t):
import time

tic = time.time()
time.sleep(t)
return time.time() - tic


# use load balanced view
bview = rc.load_balanced_view()
ar_list_b1 = [
bview.map_async(wait, [2], label=f"mylabel_map_{i:02}") for i in range(10)
]
ar_list_b2 = [
bview.apply_async(wait, 2, label=f"mylabel_apply_{i:02}") for i in range(10)
]
bview.wait(ar_list_b1)
bview.wait(ar_list_b2)


# use direct view
dview = rc[:]
ar_list_d1 = [
dview.apply_async(wait, 2, label=f"mylabel_map_{i + 10:02}") for i in range(10)
]
ar_list_d2 = [
dview.map_async(wait, [2], label=f"mylabel_apply_{i + 10:02}") for i in range(10)
]
dview.wait(ar_list_d1)
dview.wait(ar_list_d2)

# query database
data = rc.db_query({'label': {"$nin": ""}}, keys=['msg_id', 'label', 'engine_uuid'])
for d in data:
print(f"msg_id={d['msg_id']}; label={d['label']}; engine_uuid={d['engine_uuid']}")

cluster.stop_cluster_sync()
1 change: 1 addition & 0 deletions ipyparallel/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ def __init__(self, *args, **kwargs):
'stderr': '',
'outputs': [],
'data': {},
'label': None,
}
self.update(md)
self.update(dict(*args, **kwargs))
Expand Down
73 changes: 65 additions & 8 deletions ipyparallel/client/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,15 @@ class View(HasTraits):
block = Bool(False)
track = Bool(False)
targets = Any()
label = Any()

history = List()
outstanding = Set()
results = Dict()
client = Instance('ipyparallel.Client', allow_none=True)

_socket = Any()
_flag_names = List(['targets', 'block', 'track'])
_flag_names = List(['targets', 'block', 'track', 'label'])
_in_sync_results = Bool(False)
_targets = Any()
_idents = Any()
Expand Down Expand Up @@ -530,7 +531,14 @@ def use_pickle(self):
@sync_results
@save_ids
def _really_apply(
self, f, args=None, kwargs=None, targets=None, block=None, track=None
self,
f,
args=None,
kwargs=None,
targets=None,
block=None,
track=None,
label=None,
):
"""calls f(*args, **kwargs) on remote engines, returning the result.

Expand Down Expand Up @@ -562,6 +570,13 @@ def _really_apply(
block = self.block if block is None else block
track = self.track if track is None else track
targets = self.targets if targets is None else targets
label = (
self.label if label is None else label
) # comes into play when calling map[_async] (self.label)
label = (
kwargs.pop("label") if "label" in kwargs and label is None else label
) # this is required when calling apply[_async]
metadata = dict(label=label)

_idents, _targets = self.client._build_targets(targets)
futures = []
Expand All @@ -572,7 +587,13 @@ def _really_apply(

for ident in _idents:
future = self.client.send_apply_request(
self._socket, pf, pargs, pkwargs, track=track, ident=ident
self._socket,
pf,
pargs,
pkwargs,
track=track,
ident=ident,
metadata=metadata,
)
futures.append(future)
if track:
Expand All @@ -592,7 +613,15 @@ def _really_apply(
return ar

@sync_results
def map(self, f, *sequences, block=None, track=False, return_exceptions=False):
def map(
self,
f,
*sequences,
block=None,
track=False,
return_exceptions=False,
label=None,
):
"""Parallel version of builtin `map`, using this View's `targets`.

There will be one task per target, so work will be chunked
Expand Down Expand Up @@ -633,7 +662,12 @@ def map(self, f, *sequences, block=None, track=False, return_exceptions=False):

assert len(sequences) > 0, "must have some sequences to map onto!"
pf = ParallelFunction(
self, f, block=block, track=track, return_exceptions=return_exceptions
self,
f,
block=block,
track=track,
return_exceptions=return_exceptions,
label=label,
)
return pf.map(*sequences)

Expand Down Expand Up @@ -1036,7 +1070,15 @@ def _broadcast_map(f, *sequence_names):
return list(map(f, *sequences))

@_not_coalescing
def map(self, f, *sequences, block=None, track=False, return_exceptions=False):
def map(
self,
f,
*sequences,
block=None,
track=False,
return_exceptions=False,
label=None,
):
"""Parallel version of builtin `map`, using this View's `targets`.

There will be one task per engine, so work will be chunked
Expand Down Expand Up @@ -1176,10 +1218,11 @@ class LoadBalancedView(View):
after = Any()
timeout = CFloat()
retries = Integer(0)
label = Any()

_task_scheme = Any()
_flag_names = List(
['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries']
['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries', 'label']
)
_outstanding_maps = Set()

Expand Down Expand Up @@ -1289,6 +1332,7 @@ def _really_apply(
timeout=None,
targets=None,
retries=None,
label=None,
):
"""calls f(*args, **kwargs) on a remote engine, returning the result.

Expand Down Expand Up @@ -1344,6 +1388,12 @@ def _really_apply(
follow = self.follow if follow is None else follow
timeout = self.timeout if timeout is None else timeout
targets = self.targets if targets is None else targets
label = (
self.label if label is None else label
) # comes into play when calling map[_async] (self.label)
label = (
kwargs.pop("label") if "label" in kwargs and label is None else label
) # this is required when calling apply[_async]

if not isinstance(retries, int):
raise TypeError(f'retries must be int, not {type(retries)!r}')
Expand All @@ -1358,7 +1408,12 @@ def _really_apply(
after = self._render_dependency(after)
follow = self._render_dependency(follow)
metadata = dict(
after=after, follow=follow, timeout=timeout, targets=idents, retries=retries
after=after,
follow=follow,
timeout=timeout,
targets=idents,
retries=retries,
label=label,
)

future = self.client.send_apply_request(
Expand Down Expand Up @@ -1389,6 +1444,7 @@ def map(
chunksize=1,
ordered=True,
return_exceptions=False,
label=None,
):
"""Parallel version of builtin `map`, load-balanced by this View.

Expand Down Expand Up @@ -1443,6 +1499,7 @@ def map(
chunksize=chunksize,
ordered=ordered,
return_exceptions=return_exceptions,
label=label,
)
return pf.map(*sequences)

Expand Down
2 changes: 2 additions & 0 deletions ipyparallel/controller/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def empty_record():
'error': None,
'stdout': '',
'stderr': '',
'label': None,
}


Expand Down Expand Up @@ -111,6 +112,7 @@ def init_record(msg):
'error': None,
'stdout': '',
'stderr': '',
'label': msg['metadata'].get('label', None),
}


Expand Down
1 change: 1 addition & 0 deletions ipyparallel/engine/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def init_metadata(self, parent):
'is_broadcast': parent_metadata.get('is_broadcast', False),
'is_coalescing': parent_metadata.get('is_coalescing', False),
'original_msg_id': parent_metadata.get('original_msg_id', ''),
'label': parent_metadata.get('label', None),
}

def finish_metadata(self, parent, metadata, reply_content):
Expand Down