Skip to content

Commit 68984f7

Browse files
authored
Merge pull request #71 from dataiku/task/streaming-endpoints-public-api
public api for streaming endpoints
2 parents 7d990f1 + 0d55b72 commit 68984f7

File tree

4 files changed

+505
-0
lines changed

4 files changed

+505
-0
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import time
2+
import sys
3+
from dataikuapi.utils import DataikuException
4+
5+
class DSSContinuousActivity(object):
6+
"""
7+
A continuous activity on the DSS instance
8+
"""
9+
def __init__(self, client, project_key, recipe_id):
10+
self.client = client
11+
self.recipe_id = recipe_id
12+
self.project_key = project_key
13+
14+
def start(self, loop_params={}):
15+
"""
16+
Start the activity
17+
"""
18+
return self.client._perform_json(
19+
"POST", "/projects/%s/continuous-activities/%s/start" % (self.project_key, self.recipe_id), body=loop_params)
20+
21+
def stop(self):
22+
"""
23+
Stop the activity
24+
"""
25+
self.client._perform_empty(
26+
"POST", "/projects/%s/continuous-activities/%s/stop" % (self.project_key, self.recipe_id))
27+
28+
def get_status(self):
29+
"""
30+
Get the current status of the continuous activity
31+
32+
Returns:
33+
the state of the continuous activity, as a JSON object
34+
"""
35+
return self.client._perform_json(
36+
"GET", "/projects/%s/continuous-activities/%s/" % (self.project_key, self.recipe_id))
37+
38+
def get_recipe(self):
39+
"""
40+
Return a handle on the associated recipe
41+
"""
42+
from .recipe import DSSRecipe
43+
return DSSRecipe(self.client, self.project_key, self.recipe_id)

dataikuapi/dss/project.py

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import time, warnings, sys, os.path as osp
22
from .dataset import DSSDataset, DSSDatasetListItem, DSSManagedDatasetCreationHelper
3+
from .streaming_endpoint import DSSStreamingEndpoint, DSSStreamingEndpointListItem, DSSManagedStreamingEndpointCreationHelper
34
from .recipe import DSSRecipe
45
from . import recipe
56
from .managedfolder import DSSManagedFolder
67
from .savedmodel import DSSSavedModel
78
from .job import DSSJob, DSSJobWaiter
9+
from .continuousactivity import DSSContinuousActivity
810
from .scenario import DSSScenario
911
from .apiservice import DSSAPIService
1012
from .future import DSSFuture
@@ -344,6 +346,106 @@ def new_managed_dataset_creation_helper(self, dataset_name):
344346
"""
345347
return DSSManagedDatasetCreationHelper(self, dataset_name)
346348

349+
########################################################
350+
# Streaming endpoints
351+
########################################################
352+
353+
def list_streaming_endpoints(self, as_type="listitems"):
354+
"""
355+
List the streaming endpoints in this project.
356+
357+
:param str as_type: How to return the list. Supported values are "listitems" and "objects".
358+
:returns: The list of the streaming endpoints. If "as_type" is "listitems", each one as a :class:`streaming_endpoint.DSSStreamingEndpointListItem`.
359+
If "as_type" is "objects", each one as a :class:`streaming_endpoint.DSSStreamingEndpoint`
360+
:rtype: list
361+
"""
362+
items = self.client._perform_json("GET", "/projects/%s/streamingendpoints/" % self.project_key)
363+
if as_type == "listitems" or as_type == "listitem":
364+
return [DSSStreamingEndpointListItem(self.client, item) for item in items]
365+
elif as_type == "objects" or as_type == "object":
366+
return [DSSStreamingEndpoint(self.client, self.project_key, item["id"]) for item in items]
367+
else:
368+
raise ValueError("Unknown as_type")
369+
370+
def get_streaming_endpoint(self, streaming_endpoint_name):
371+
"""
372+
Get a handle to interact with a specific streaming endpoint
373+
374+
:param string streaming_endpoint_name: the name of the desired streaming endpoint
375+
376+
:returns: A :class:`dataikuapi.dss.streaming_endpoint.DSSStreamingEndpoint` streaming endpoint handle
377+
"""
378+
return DSSStreamingEndpoint(self.client, self.project_key, streaming_endpoint_name)
379+
380+
def create_streaming_endpoint(self, streaming_endpoint_name, type, params=None):
381+
"""
382+
Create a new streaming endpoint in the project, and return a handle to interact with it.
383+
384+
The precise structure of ``params`` depends on the specific streaming endpoint
385+
type. To know which fields exist for a given streaming endpoint type,
386+
create a streaming endpoint from the UI, and use :meth:`get_streaming_endpoint` to retrieve the configuration
387+
of the streaming endpoint and inspect it. Then reproduce a similar structure in the :meth:`create_streaming_endpoint` call.
388+
389+
Not all settings of a streaming endpoint can be set at creation time (for example partitioning). After creation,
390+
you'll have the ability to modify the streaming endpoint
391+
392+
:param string streaming_endpoint_name: the name for the new streaming endpoint
393+
:param string type: the type of the streaming endpoint
394+
:param dict params: the parameters for the type, as a JSON object (defaults to `{}`)
395+
396+
Returns:
397+
A :class:`dataikuapi.dss.streaming_endpoint.DSSStreamingEndpoint` streaming endpoint handle
398+
"""
399+
if params is None:
400+
params = {}
401+
obj = {
402+
"id" : streaming_endpoint_name,
403+
"projectKey" : self.project_key,
404+
"type" : type,
405+
"params" : params
406+
}
407+
self.client._perform_json("POST", "/projects/%s/streamingendpoints/" % self.project_key,
408+
body = obj)
409+
return DSSStreamingEndpoint(self.client, self.project_key, streaming_endpoint_name)
410+
411+
def create_kafka_streaming_endpoint(self, streaming_endpoint_name, connection=None, topic=None):
412+
obj = {
413+
"id" : streaming_endpoint_name,
414+
"projectKey" : self.project_key,
415+
"type" : "kafka",
416+
"params" : {}
417+
}
418+
if connection is not None:
419+
obj["params"]["connection"] = connection
420+
if topic is not None:
421+
obj["params"]["topic"] = topic
422+
self.client._perform_json("POST", "/projects/%s/streamingendpoints/" % self.project_key,
423+
body = obj)
424+
return DSSStreamingEndpoint(self.client, self.project_key, streaming_endpoint_name)
425+
426+
def create_httpsse_streaming_endpoint(self, streaming_endpoint_name, url=None):
427+
obj = {
428+
"id" : streaming_endpoint_name,
429+
"projectKey" : self.project_key,
430+
"type" : "httpsse",
431+
"params" : {}
432+
}
433+
if url is not None:
434+
obj["params"]["url"] = url
435+
self.client._perform_json("POST", "/projects/%s/streamingendpoints/" % self.project_key,
436+
body = obj)
437+
return DSSStreamingEndpoint(self.client, self.project_key, streaming_endpoint_name)
438+
439+
def new_managed_streaming_endpoint_creation_helper(self, streaming_endpoint_name, streaming_endpoint_type=None):
440+
"""
441+
Creates a helper class to create a managed streaming endpoint in the project
442+
443+
:param string streaming_endpoint_name: Name of the new streaming endpoint - must be unique in the project
444+
:param string streaming_endpoint_type: Type of the new streaming endpoint (optional if it can be inferred from a connection type)
445+
:return: A :class:`dataikuapi.dss.streaming_endpoint.DSSManagedStreamingEndpointCreationHelper` object to create the streaming endpoint
446+
"""
447+
return DSSManagedStreamingEndpointCreationHelper(self, streaming_endpoint_name, streaming_endpoint_type)
448+
347449
########################################################
348450
# Lab and ML
349451
# Don't forget to synchronize with DSSDataset.*
@@ -626,6 +728,32 @@ def new_job_definition_builder(self, job_type='NON_RECURSIVE_FORCED_BUILD'):
626728
warnings.warn("new_job_definition_builder is deprecated, please use new_job", DeprecationWarning)
627729
return JobDefinitionBuilder(self, job_type)
628730

731+
########################################################
732+
# Continuous activities
733+
########################################################
734+
735+
def list_continuous_activities(self, as_objects=True):
736+
"""
737+
List the continuous activities in this project
738+
739+
Returns:
740+
a list of the continuous activities, each one as a JSON object, containing both the definition and the state
741+
"""
742+
list = self.client._perform_json("GET", "/projects/%s/continuous-activities/" % self.project_key)
743+
if as_objects:
744+
return [DSSContinuousActivity(self.client, a['projectKey'], a['recipeId']) for a in list]
745+
else:
746+
return list
747+
748+
def get_continuous_activity(self, recipe_id):
749+
"""
750+
Get a handler to interact with a specific continuous activities
751+
752+
Returns:
753+
A :class:`dataikuapi.dss.continuousactivity.DSSContinuousActivity` job handle
754+
"""
755+
return DSSContinuousActivity(self.client, self.project_key, recipe_id)
756+
629757
########################################################
630758
# Variables
631759
########################################################

dataikuapi/dss/recipe.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,12 @@ def get_object_discussions(self):
190190
"""
191191
return DSSObjectDiscussions(self.client, self.project_key, "RECIPE", self.recipe_name)
192192

193+
def get_continuous_activity(self):
194+
"""
195+
Return a handle on the associated recipe
196+
"""
197+
from .continuousactivity import DSSContinuousActivity
198+
return DSSContinuousActivity(self.client, self.project_key, self.recipe_name)
193199

194200
class DSSRecipeStatus(object):
195201
"""Status of a recipce.

0 commit comments

Comments
 (0)