
Commit 6316585

[MMM-19334] Add support for OTEL. (#1411)
* Add support for OTEL.
* Set OTEL_EXPORTER_OTLP_ENDPOINT.
* Add extra header for DR permissions.
* Instrument moderation proactively.
1 parent 1204a8e commit 6316585

File tree

5 files changed: +133 −48 lines

custom_model_runner/datarobot_drum/drum/adapters/model_adapters/python_model_adapter.py

Lines changed: 16 additions & 1 deletion
@@ -27,7 +27,6 @@
 from datarobot_drum.drum.artifact_predictors.xgboost_predictor import XGBoostPredictor
 from datarobot_drum.drum.artifact_predictors.onnx_predictor import ONNXPredictor
 from datarobot_drum.drum.root_predictors.chat_helpers import is_openai_model
-
 from datarobot_drum.drum.common import (
     reroute_stdout_to_stderr,
     SupportedPayloadFormats,
@@ -123,6 +122,22 @@ def __init__(self, model_dir, target_type=None):
                     "Unexpected empty target name for text generation, "
                     "vector database, or agentic workflow target."
                 )
+            # Instrument HTTP clients in order to get nice traces from the moderation
+            # library. We are doing this here because the moderation library is loaded
+            # before custom.py.
+            try:
+                from opentelemetry.instrumentation.requests import RequestsInstrumentor
+                from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
+
+                RequestsInstrumentor().instrument()
+                AioHttpClientInstrumentor().instrument()
+            except (ImportError, ModuleNotFoundError):
+                msg = """Instrumentation for requests or aiohttp is not loaded; make sure the
+                appropriate packages are installed:
+
+                    pip install opentelemetry-instrumentation-requests
+                    pip install opentelemetry-instrumentation-aiohttp-client
+                """
+                self._logger.warning(msg)
             self._load_moderation_hooks()
         else:
             self._target_name = None
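These instrumentors patch `requests` and `aiohttp` process-wide, so any outbound HTTP call made by the moderation library shows up as a client span under whatever tracer provider is configured. A minimal standalone sketch of the same pattern, using a console exporter rather than DRUM's OTLP setup (the URL is illustrative only):

    import requests
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
    from opentelemetry.instrumentation.requests import RequestsInstrumentor

    # Configure a provider that prints finished spans to stdout.
    provider = TracerProvider()
    provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)

    # Patches requests globally; every call now emits an HTTP client span.
    RequestsInstrumentor().instrument()

    requests.get("https://example.com")  # span printed on completion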

custom_model_runner/datarobot_drum/drum/common.py

Lines changed: 61 additions & 0 deletions
@@ -9,6 +9,7 @@
 import sys
 from contextvars import ContextVar
 from distutils.util import strtobool
+from urllib.parse import urlparse, urlunparse

 from contextlib import contextmanager
 from pathlib import Path
@@ -20,6 +21,12 @@
     PayloadFormat,
 )
 from datarobot_drum.drum.exceptions import DrumCommonException
+from opentelemetry import trace, context
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator


 ctx_request_id = ContextVar("request_id")
@@ -119,3 +126,57 @@ def to_bool(value):


 FIT_METADATA_FILENAME = "fit_runtime_data.json"
+
+
+def make_otel_endpoint(datarobot_endpoint):
+    parsed_url = urlparse(datarobot_endpoint)
+    stripped_url = (parsed_url.scheme, parsed_url.netloc, "otel", "", "", "")
+    result = urlunparse(stripped_url)
+    return result
+
+
+def setup_tracer(runtime_parameters):
+    # OTEL is disabled by default for now.
+    if not (
+        runtime_parameters.has("OTEL_SDK_ENABLED") and runtime_parameters.get("OTEL_SDK_ENABLED")
+    ):
+        return
+    # If deployment_id is not found, this is most likely custom model
+    # testing.
+    deployment_id = os.environ.get("MLOPS_DEPLOYMENT_ID", os.environ.get("DEPLOYMENT_ID"))
+    if not deployment_id:
+        return
+
+    service_name = f"deployment-{deployment_id}"
+    resource = Resource.create(
+        {
+            "service.name": service_name,
+            "datarobot.deployment_id": deployment_id,
+        }
+    )
+    key = os.environ.get("DATAROBOT_API_TOKEN")
+    datarobot_endpoint = os.environ.get("DATAROBOT_ENDPOINT")
+    if not key or not datarobot_endpoint:
+        return
+    endpoint = make_otel_endpoint(datarobot_endpoint)
+
+    os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = endpoint
+    headers = {
+        "Authorization": f"Bearer {key}",
+        "X-DataRobot-Entity-Id": f"entity=deployment; id={deployment_id};",
+    }
+    otlp_exporter = OTLPSpanExporter(headers=headers)
+    provider = TracerProvider(resource=resource)
+    provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
+    trace.set_tracer_provider(provider)
+
+
+@contextmanager
+def otel_context(tracer, span_name, carrier):
+    ctx = TraceContextTextMapPropagator().extract(carrier=carrier)
+    token = context.attach(ctx)
+    try:
+        with tracer.start_as_current_span(span_name) as span:
+            yield span
+    finally:
+        context.detach(token)
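A quick sketch of how these two helpers behave; the endpoint URL and traceparent value below are illustrative only:

    from opentelemetry import trace
    from datarobot_drum.drum.common import make_otel_endpoint, otel_context

    # make_otel_endpoint keeps scheme and host but replaces the API path with /otel:
    make_otel_endpoint("https://app.datarobot.com/api/v2")  # -> "https://app.datarobot.com/otel"

    # otel_context extracts W3C trace context from a carrier (e.g. incoming HTTP
    # headers), so the new span continues the caller's trace instead of starting
    # a fresh one.
    tracer = trace.get_tracer(__name__)
    carrier = {"traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}
    with otel_context(tracer, "drum.example", carrier) as span:
        span.set_attribute("example.attr", True)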

custom_model_runner/datarobot_drum/drum/main.py

Lines changed: 4 additions & 1 deletion
@@ -43,7 +43,7 @@
 import sys

 from datarobot_drum.drum.args_parser import CMRunnerArgsRegistry
-from datarobot_drum.drum.common import config_logging
+from datarobot_drum.drum.common import config_logging, setup_tracer
 from datarobot_drum.drum.enum import RunMode
 from datarobot_drum.drum.enum import ExitCodes
 from datarobot_drum.drum.exceptions import DrumSchemaValidationException
@@ -90,6 +90,9 @@ def signal_handler(sig, frame):
     options = arg_parser.parse_args()
     CMRunnerArgsRegistry.verify_options(options)
     _setup_required_environment_variables(options)
+    # Env vars may set up the OTEL configuration, so set up the
+    # tracer after all env vars have been updated.
+    setup_tracer(RuntimeParameters)
     if RuntimeParameters.has("CUSTOM_MODEL_WORKERS"):
         options.max_workers = RuntimeParameters.get("CUSTOM_MODEL_WORKERS")
     runtime.options = options
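For local testing, the tracer can be switched on before `drum server` starts. A hedged sketch, assuming the usual MLOPS_RUNTIME_PARAM_* JSON convention for runtime parameters (an assumption, not shown in this diff) and hypothetical credential values:

    import json
    import os

    # Assumption: RuntimeParameters reads MLOPS_RUNTIME_PARAM_* env vars holding
    # JSON objects with "type" and "payload" keys.
    os.environ["MLOPS_RUNTIME_PARAM_OTEL_SDK_ENABLED"] = json.dumps(
        {"type": "boolean", "payload": True}
    )
    # setup_tracer() returns early unless all of these are present:
    os.environ["MLOPS_DEPLOYMENT_ID"] = "abc123"  # hypothetical deployment id
    os.environ["DATAROBOT_API_TOKEN"] = "<api-token>"  # sent as Bearer auth to the OTLP endpoint
    os.environ["DATAROBOT_ENDPOINT"] = "https://app.datarobot.com/api/v2"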

custom_model_runner/datarobot_drum/drum/root_predictors/prediction_server.py

Lines changed: 48 additions & 46 deletions
@@ -12,6 +12,7 @@
 from flask import Response, jsonify, request
 from werkzeug.exceptions import HTTPException

+from opentelemetry import trace
 from datarobot_drum.drum.description import version as drum_version
 from datarobot_drum.drum.enum import (
     FLASK_EXT_FILE_NAME,
@@ -38,10 +39,14 @@
     get_flask_app,
 )
 from datarobot_drum.profiler.stats_collector import StatsCollector, StatsOperation
+from datarobot_drum.drum.common import otel_context

 logger = logging.getLogger(LOGGER_NAME_PREFIX + "." + __name__)


+tracer = trace.get_tracer(__name__)
+
+
 class PredictionServer(PredictMixin):
     def __init__(self, params: dict):
         self._params = params
@@ -157,54 +162,50 @@ def health():
         @model_api.route("/invocations", methods=["POST"])
         def predict():
             logger.debug("Entering predict() endpoint")
-
-            self._pre_predict_and_transform()
-            try:
-                response, response_status = self.do_predict_structured(logger=logger)
-            finally:
-                self._post_predict_and_transform()
+            with otel_context(tracer, "drum.invocations", request.headers):
+                self._pre_predict_and_transform()
+                try:
+                    response, response_status = self.do_predict_structured(logger=logger)
+                finally:
+                    self._post_predict_and_transform()

             return response, response_status

         @model_api.route("/transform/", methods=["POST"])
         def transform():
             logger.debug("Entering transform() endpoint")
-
-            self._pre_predict_and_transform()
-
-            try:
-                response, response_status = self.do_transform(logger=logger)
-            finally:
-                self._post_predict_and_transform()
+            with otel_context(tracer, "drum.transform", request.headers):
+                self._pre_predict_and_transform()
+                try:
+                    response, response_status = self.do_transform(logger=logger)
+                finally:
+                    self._post_predict_and_transform()

             return response, response_status

         @model_api.route("/predictionsUnstructured/", methods=["POST"])
         @model_api.route("/predictUnstructured/", methods=["POST"])
         def predict_unstructured():
             logger.debug("Entering predict() endpoint")
-
-            self._pre_predict_and_transform()
-
-            try:
-                response, response_status = self.do_predict_unstructured(logger=logger)
-            finally:
-                self._post_predict_and_transform()
-
+            with otel_context(tracer, "drum.predictUnstructured", request.headers):
+                self._pre_predict_and_transform()
+                try:
+                    response, response_status = self.do_predict_unstructured(logger=logger)
+                finally:
+                    self._post_predict_and_transform()
             return (response, response_status)

         # Chat routes are defined without trailing slash because this is required by the OpenAI python client.
         @model_api.route("/chat/completions", methods=["POST"])
         @model_api.route("/v1/chat/completions", methods=["POST"])
         def chat():
             logger.debug("Entering chat endpoint")
-
-            self._pre_predict_and_transform()
-
-            try:
-                response, response_status = self.do_chat(logger=logger)
-            finally:
-                self._post_predict_and_transform()
+            with otel_context(tracer, "drum.chat.completions", request.headers):
+                self._pre_predict_and_transform()
+                try:
+                    response, response_status = self.do_chat(logger=logger)
+                finally:
+                    self._post_predict_and_transform()

             return response, response_status

@@ -226,24 +227,25 @@ def get_supported_llm_models():
         @model_api.route("/directAccess/<path:path>", methods=["GET", "POST", "PUT"])
         @model_api.route("/nim/<path:path>", methods=["GET", "POST", "PUT"])
         def forward_request(path):
-            if not hasattr(self._predictor, "openai_host") or not hasattr(
-                self._predictor, "openai_port"
-            ):
-                return {
-                    "message": "This endpoint is only supported by OpenAI based predictors"
-                }, HTTP_400_BAD_REQUEST
-
-            openai_host = self._predictor.openai_host
-            openai_port = self._predictor.openai_port
-
-            resp = requests.request(
-                method=request.method,
-                url=f"http://{openai_host}:{openai_port}/{path.rstrip('/')}",
-                headers=request.headers,
-                params=request.args,
-                data=request.get_data(),
-                allow_redirects=False,
-            )
+            with otel_context(tracer, "drum.directAccess", request.headers) as span:
+                if not hasattr(self._predictor, "openai_host") or not hasattr(
+                    self._predictor, "openai_port"
+                ):
+                    msg = "This endpoint is only supported by OpenAI based predictors"
+                    span.set_status(trace.StatusCode.ERROR, msg)
+                    return {"message": msg}, HTTP_400_BAD_REQUEST
+
+                openai_host = self._predictor.openai_host
+                openai_port = self._predictor.openai_port
+
+                resp = requests.request(
+                    method=request.method,
+                    url=f"http://{openai_host}:{openai_port}/{path.rstrip('/')}",
+                    headers=request.headers,
+                    params=request.args,
+                    data=request.get_data(),
+                    allow_redirects=False,
+                )

             return Response(resp.content, status=resp.status_code, headers=dict(resp.headers))
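Since every route now extracts W3C trace context from the request headers, a caller that injects a traceparent header will see the server's drum.* spans attached to its own trace. A client-side sketch; the address and CSV payload are assumptions for illustration, and the client needs its own TracerProvider configured for inject() to emit a header:

    import requests
    from opentelemetry import trace
    from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

    tracer = trace.get_tracer("drum-client")
    with tracer.start_as_current_span("client.predict"):
        headers = {"Content-Type": "text/csv"}
        # Writes the current span context into the "traceparent" header.
        TraceContextTextMapPropagator().inject(headers)
        resp = requests.post(
            "http://localhost:6789/invocations",  # assumed local `drum server` address
            data="a,b\n1,2\n",
            headers=headers,
        )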
249251

custom_model_runner/requirements.txt

Lines changed: 4 additions & 0 deletions
@@ -24,3 +24,7 @@ pydantic
 datarobot-storage
 datarobot-mlops>=10.2.0 # Required for the 'set_api_spooler' with arugments
 datarobot>=3.1.0,<4
+# otel
+opentelemetry-api
+opentelemetry-sdk
+opentelemetry-exporter-otlp-proto-http
