Skip to content

Commit 6c2f05c

Browse files
committed
public api for streaming endpoints [ch48372]
1 parent b650867 commit 6c2f05c

File tree

2 files changed

+429
-0
lines changed

2 files changed

+429
-0
lines changed

dataikuapi/dss/project.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import time, warnings, sys, os.path as osp
22
from .dataset import DSSDataset, DSSDatasetListItem, DSSManagedDatasetCreationHelper
3+
from .streaming_endpoint import DSSStreamingEndpoint, DSSStreamingEndpointListItem, DSSManagedStreamingEndpointCreationHelper
34
from .recipe import DSSRecipe
45
from . import recipe
56
from .managedfolder import DSSManagedFolder
@@ -344,6 +345,106 @@ def new_managed_dataset_creation_helper(self, dataset_name):
344345
"""
345346
return DSSManagedDatasetCreationHelper(self, dataset_name)
346347

348+
########################################################
349+
# Streaming endpoints
350+
########################################################
351+
352+
def list_streaming_endpoints(self, as_type="listitems"):
353+
"""
354+
List the streaming endpoints in this project.
355+
356+
:param str as_type: How to return the list. Supported values are "listitems" and "objects".
357+
:returns: The list of the streaming endpoints. If "as_type" is "listitems", each one as a :class:`streaming_endpoint.DSSStreamingEndpointListItem`.
358+
If "as_type" is "objects", each one as a :class:`streaming_endpoint.DSSStreamingEndpoint`
359+
:rtype: list
360+
"""
361+
items = self.client._perform_json("GET", "/projects/%s/streamingendpoints/" % self.project_key)
362+
if as_type == "listitems" or as_type == "listitem":
363+
return [DSSStreamingEndpointListItem(self.client, item) for item in items]
364+
elif as_type == "objects" or as_type == "object":
365+
return [DSSStreamingEndpoint(self.client, self.project_key, item["id"]) for item in items]
366+
else:
367+
raise ValueError("Unknown as_type")
368+
369+
def get_streaming_endpoint(self, streaming_endpoint_name):
370+
"""
371+
Get a handle to interact with a specific streaming endpoint
372+
373+
:param string streaming_endpoint_name: the name of the desired streaming endpoint
374+
375+
:returns: A :class:`dataikuapi.dss.streaming_endpoint.DSSStreamingEndpoint` streaming endpoint handle
376+
"""
377+
return DSSStreamingEndpoint(self.client, self.project_key, streaming_endpoint_name)
378+
379+
def create_streaming_endpoint(self, streaming_endpoint_name, type, params=None):
380+
"""
381+
Create a new streaming endpoint in the project, and return a handle to interact with it.
382+
383+
The precise structure of ``params`` depends on the specific streaming endpoint
384+
type. To know which fields exist for a given streaming endpoint type,
385+
create a streaming endpoint from the UI, and use :meth:`get_streaming_endpoint` to retrieve the configuration
386+
of the streaming endpoint and inspect it. Then reproduce a similar structure in the :meth:`create_streaming_endpoint` call.
387+
388+
Not all settings of a streaming endpoint can be set at creation time (for example partitioning). After creation,
389+
you'll have the ability to modify the streaming endpoint
390+
391+
:param string streaming_endpoint_name: the name for the new streaming endpoint
392+
:param string type: the type of the streaming endpoint
393+
:param dict params: the parameters for the type, as a JSON object (defaults to `{}`)
394+
395+
Returns:
396+
A :class:`dataikuapi.dss.streaming_endpoint.DSSStreamingEndpoint` streaming endpoint handle
397+
"""
398+
if params is None:
399+
params = {}
400+
obj = {
401+
"id" : streaming_endpoint_name,
402+
"projectKey" : self.project_key,
403+
"type" : type,
404+
"params" : params
405+
}
406+
self.client._perform_json("POST", "/projects/%s/streamingendpoints/" % self.project_key,
407+
body = obj)
408+
return DSSStreamingEndpoint(self.client, self.project_key, streaming_endpoint_name)
409+
410+
def create_kafka_streaming_endpoint(self, streaming_endpoint_name, connection=None, topic=None):
411+
obj = {
412+
"id" : streaming_endpoint_name,
413+
"projectKey" : self.project_key,
414+
"type" : "kafka",
415+
"params" : {}
416+
}
417+
if connection is not None:
418+
obj["params"]["connection"] = connection
419+
if topic is not None:
420+
obj["params"]["topic"] = topic
421+
self.client._perform_json("POST", "/projects/%s/streamingendpoints/" % self.project_key,
422+
body = obj)
423+
return DSSStreamingEndpoint(self.client, self.project_key, streaming_endpoint_name)
424+
425+
def create_httpsse_streaming_endpoint(self, streaming_endpoint_name, url=None):
426+
obj = {
427+
"id" : streaming_endpoint_name,
428+
"projectKey" : self.project_key,
429+
"type" : "httpsse",
430+
"params" : {}
431+
}
432+
if url is not None:
433+
obj["params"]["url"] = url
434+
self.client._perform_json("POST", "/projects/%s/streamingendpoints/" % self.project_key,
435+
body = obj)
436+
return DSSStreamingEndpoint(self.client, self.project_key, streaming_endpoint_name)
437+
438+
def new_managed_streaming_endpoint_creation_helper(self, streaming_endpoint_name, streaming_endpoint_type=None):
439+
"""
440+
Creates a helper class to create a managed streaming endpoint in the project
441+
442+
:param string streaming_endpoint_name: Name of the new streaming endpoint - must be unique in the project
443+
:param string streaming_endpoint_type: Type of the new streaming endpoint (optional if it can be inferred from a connection type)
444+
:return: A :class:`dataikuapi.dss.streaming_endpoint.DSSManagedStreamingEndpointCreationHelper` object to create the streaming endpoint
445+
"""
446+
return DSSManagedStreamingEndpointCreationHelper(self, streaming_endpoint_name, streaming_endpoint_type)
447+
347448
########################################################
348449
# Lab and ML
349450
# Don't forget to synchronize with DSSDataset.*

0 commit comments

Comments
 (0)