Skip to content

Commit ac5dd12

Browse files
authored
Fix fokring and instrument OpenAI (#1498)
1 parent d930e39 commit ac5dd12

File tree

4 files changed

+38
-6
lines changed

4 files changed

+38
-6
lines changed

custom_model_runner/datarobot_drum/drum/common.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from opentelemetry.sdk.resources import Resource
2727
from opentelemetry.sdk.trace import TracerProvider
2828
from opentelemetry.sdk.trace.export import BatchSpanProcessor
29+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
2930
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
3031

3132

@@ -135,7 +136,7 @@ def make_otel_endpoint(datarobot_endpoint):
135136
return result
136137

137138

138-
def setup_tracer(runtime_parameters):
139+
def setup_tracer(runtime_parameters, options):
139140
"""Setups OTEL tracer.
140141
141142
OTEL is configured by OTEL_EXPORTER_OTLP_ENDPOINT and
@@ -145,6 +146,8 @@ def setup_tracer(runtime_parameters):
145146
----------
146147
runtime_parameters: Type[RuntimeParameters] class handles runtime parameters for custom modes
147148
used to check if OTEL configuration from user.
149+
options: argparse.Namespace: object obtained from argparser filled with user supplied
150+
command argumetns
148151
Returns
149152
-------
150153
None
@@ -166,7 +169,19 @@ def setup_tracer(runtime_parameters):
166169
resource = Resource.create()
167170
otlp_exporter = OTLPSpanExporter()
168171
provider = TracerProvider(resource=resource)
169-
provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
172+
173+
# In case of NIM flask server is configured to run in multiprocessing
174+
# mode that uses fork. Since BatchSpanProcessor start background thread
175+
# with bunch of locks, OTEL simply deadlocks and does not offlooad any
176+
# spans. Even if we start BatchSpanProcessor per fork, batches often
177+
# missing due to process exits before all data offloaded. In forking
178+
# case we use SimpleSpanProcessor (mostly NIMs) otherwise BatchSpanProcessor
179+
# (most frequent case)
180+
if options.max_workers > 1:
181+
provider.add_span_processor(SimpleSpanProcessor(otlp_exporter))
182+
else:
183+
provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
184+
170185
trace.set_tracer_provider(provider)
171186

172187
endpoint = main_endpoint or trace_endpoint

custom_model_runner/datarobot_drum/drum/gpu_predictors/base.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,31 @@
4848
from datarobot_drum.drum.server import HTTP_513_DRUM_PIPELINE_ERROR
4949
from datarobot_drum.drum.root_predictors.chat_helpers import is_openai_model
5050

51+
52+
logger = logging.getLogger(__name__)
53+
5154
# OpenAI client isn't a required dependency for DRUM, so we need to check if it's available
5255
try:
5356
from openai import OpenAI
5457
from openai.resources.chat.completions import Completions
5558

5659
COMPLETIONS_CREATE_SIGNATURE = inspect.signature(Completions.create)
5760
_HAS_OPENAI = True
61+
62+
try:
63+
# disable "anonymous" tracking from traceloop
64+
os.environ["TRACELOOP_TRACE_CONTENT"] = "false"
65+
from opentelemetry.instrumentation.openai import OpenAIInstrumentor
66+
67+
OpenAIInstrumentor().instrument()
68+
except (ImportError, ModuleNotFoundError):
69+
msg = """Instrumentation for openai is not loaded, make sure appropriate
70+
packages are installed:
71+
72+
pip install opentelemetry-instrumentation-openai
73+
"""
74+
logger.warning(msg)
75+
5876
except ImportError:
5977
_HAS_OPENAI = False
6078

custom_model_runner/datarobot_drum/drum/main.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,11 @@ def signal_handler(sig, frame):
9191
CMRunnerArgsRegistry.verify_options(options)
9292
_setup_required_environment_variables(options)
9393

94-
setup_tracer(RuntimeParameters)
95-
9694
if RuntimeParameters.has("CUSTOM_MODEL_WORKERS"):
9795
options.max_workers = RuntimeParameters.get("CUSTOM_MODEL_WORKERS")
9896
runtime.options = options
9997

98+
setup_tracer(RuntimeParameters, options)
10099
signal.signal(signal.SIGINT, signal_handler)
101100
signal.signal(signal.SIGTERM, signal_handler)
102101

tests/unit/datarobot_drum/drum/test_main.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def test_custom_model_workers(
3535
runtime_params.has.return_value = False
3636

3737
main()
38-
runtime_params.has.assert_called_with("CUSTOM_MODEL_WORKERS")
38+
runtime_params.has.assert_any_call("CUSTOM_MODEL_WORKERS")
3939
if workers_param:
40-
runtime_params.get.assert_called_with("CUSTOM_MODEL_WORKERS")
40+
runtime_params.get.assert_any_call("CUSTOM_MODEL_WORKERS")
4141
assert expected_workers == options.max_workers

0 commit comments

Comments
 (0)