diff --git a/ldclient/config.py b/ldclient/config.py index 32b28dfc..fbc88ac8 100644 --- a/ldclient/config.py +++ b/ldclient/config.py @@ -9,7 +9,11 @@ from ldclient.feature_store import InMemoryFeatureStore from ldclient.hook import Hook -from ldclient.impl.util import log, validate_application_info +from ldclient.impl.util import ( + log, + validate_application_info, + validate_sdk_key_format +) from ldclient.interfaces import ( BigSegmentStore, DataSourceUpdateSink, @@ -261,7 +265,7 @@ def __init__( :param omit_anonymous_contexts: Sets whether anonymous contexts should be omitted from index and identify events. :param payload_filter_key: The payload filter is used to selectively limited the flags and segments delivered in the data source payload. """ - self.__sdk_key = sdk_key + self.__sdk_key = validate_sdk_key_format(sdk_key, log) self.__base_uri = base_uri.rstrip('/') self.__events_uri = events_uri.rstrip('/') @@ -302,6 +306,7 @@ def __init__( def copy_with_new_sdk_key(self, new_sdk_key: str) -> 'Config': """Returns a new ``Config`` instance that is the same as this one, except for having a different SDK key. + The key will not be updated if the provided key contains invalid characters. :param new_sdk_key: the new SDK key """ @@ -542,8 +547,8 @@ def data_source_update_sink(self) -> Optional[DataSourceUpdateSink]: return self._data_source_update_sink def _validate(self): - if self.offline is False and self.sdk_key is None or self.sdk_key == '': - log.warning("Missing or blank sdk_key.") + if self.offline is False and self.sdk_key == '': + log.warning("Missing or blank SDK key") __all__ = ['Config', 'BigSegmentsConfig', 'HTTPConfig'] diff --git a/ldclient/impl/util.py b/ldclient/impl/util.py index 4fbaf110..e60feb9d 100644 --- a/ldclient/impl/util.py +++ b/ldclient/impl/util.py @@ -27,8 +27,13 @@ def timedelta_millis(delta: timedelta) -> float: __BASE_TYPES__ = (str, float, int, bool) +# Maximum length for SDK keys +_MAX_SDK_KEY_LENGTH = 8192 -_retryable_statuses = [400, 408, 429] +_RETRYABLE_STATUSES = [400, 408, 429] + +# Compiled regex pattern for valid characters in application values and SDK keys +_VALID_CHARACTERS_REGEX = re.compile(r"[^a-zA-Z0-9._-]") def validate_application_info(application: dict, logger: logging.Logger) -> dict: @@ -46,13 +51,35 @@ def validate_application_value(value: Any, name: str, logger: logging.Logger) -> logger.warning('Value of application[%s] was longer than 64 characters and was discarded' % name) return "" - if re.search(r"[^a-zA-Z0-9._-]", value): + if _VALID_CHARACTERS_REGEX.search(value): logger.warning('Value of application[%s] contained invalid characters and was discarded' % name) return "" return value +def validate_sdk_key_format(sdk_key: str, logger: logging.Logger) -> str: + """ + Validates that an SDK key does not contain invalid characters and is not too long for our systems. + + :param sdk_key: the SDK key to validate + :param logger: the logger to use for logging warnings + :return: the validated SDK key, or empty string if the SDK key is invalid + """ + if sdk_key is None or sdk_key == '': + return "" + + if not isinstance(sdk_key, str): + return "" + if len(sdk_key) > _MAX_SDK_KEY_LENGTH: + logger.warning('SDK key was longer than %d characters and was discarded' % _MAX_SDK_KEY_LENGTH) + return "" + if _VALID_CHARACTERS_REGEX.search(sdk_key): + logger.warning('SDK key contained invalid characters and was discarded') + return "" + return sdk_key + + def _headers(config): base_headers = _base_headers(config) base_headers.update({'Content-Type': "application/json"}) @@ -106,7 +133,7 @@ def throw_if_unsuccessful_response(resp): def is_http_error_recoverable(status): if status >= 400 and status < 500: - return status in _retryable_statuses # all other 4xx besides these are unrecoverable + return status in _RETRYABLE_STATUSES # all other 4xx besides these are unrecoverable return True # all other errors are recoverable diff --git a/ldclient/testing/impl/test_util.py b/ldclient/testing/impl/test_util.py new file mode 100644 index 00000000..ea843e4b --- /dev/null +++ b/ldclient/testing/impl/test_util.py @@ -0,0 +1,65 @@ +import logging + +from ldclient.impl.util import validate_sdk_key_format + + +def test_validate_sdk_key_format_valid(): + """Test validation of valid SDK keys""" + logger = logging.getLogger('test') + valid_keys = [ + "sdk-12345678-1234-1234-1234-123456789012", + "valid-sdk-key-123", + "VALID_SDK_KEY_456", + "test.key_with.dots", + "test-key-with-hyphens" + ] + + for key in valid_keys: + result = validate_sdk_key_format(key, logger) + assert result == key # Should return the same key if valid + + +def test_validate_sdk_key_format_invalid(): + """Test validation of invalid SDK keys""" + logger = logging.getLogger('test') + invalid_keys = [ + "sdk-key-with-\x00-null", + "sdk-key-with-\n-newline", + "sdk-key-with-\t-tab", + "sdk key with spaces", + "sdk@key#with$special%chars", + "sdk/key\\with/slashes" + ] + + for key in invalid_keys: + result = validate_sdk_key_format(key, logger) + assert result == '' # Should return empty string for invalid keys + + +def test_validate_sdk_key_format_non_string(): + """Test validation of non-string SDK keys""" + logger = logging.getLogger('test') + non_string_values = [123, object(), [], {}] + + for value in non_string_values: + result = validate_sdk_key_format(value, logger) + assert result == '' # Should return empty string for non-string values + + +def test_validate_sdk_key_format_empty_and_none(): + """Test validation of empty and None SDK keys""" + logger = logging.getLogger('test') + assert validate_sdk_key_format("", logger) == '' # Empty string should return empty string + assert validate_sdk_key_format(None, logger) == '' # None should return empty string + + +def test_validate_sdk_key_format_max_length(): + """Test validation of SDK key maximum length""" + logger = logging.getLogger('test') + valid_key = "a" * 8192 + result = validate_sdk_key_format(valid_key, logger) + assert result == valid_key # Should return the same key if valid + + invalid_key = "a" * 8193 + result = validate_sdk_key_format(invalid_key, logger) + assert result == '' # Should return empty string for keys that are too long diff --git a/ldclient/testing/test_config.py b/ldclient/testing/test_config.py index 77fc5b34..b5321d36 100644 --- a/ldclient/testing/test_config.py +++ b/ldclient/testing/test_config.py @@ -45,6 +45,71 @@ def test_trims_trailing_slashes_on_uris(): assert config.stream_base_uri == "https://blog.launchdarkly.com" +def test_sdk_key_validation_valid_keys(): + """Test that valid SDK keys are accepted""" + valid_keys = [ + "sdk-12345678-1234-1234-1234-123456789012", + "valid-sdk-key-123", + "VALID_SDK_KEY_456", + "test.key_with.dots", + "test-key-with-hyphens" + ] + + for key in valid_keys: + config = Config(sdk_key=key) + assert config.sdk_key == key + + +def test_sdk_key_validation_invalid_keys(): + """Test that invalid SDK keys are not set""" + invalid_keys = [ + "sdk-key-with-\x00-null", + "sdk-key-with-\n-newline", + "sdk-key-with-\t-tab", + "sdk key with spaces", + "sdk@key#with$special%chars", + "sdk/key\\with/slashes" + ] + + for key in invalid_keys: + config = Config(sdk_key=key) + assert config.sdk_key == '' + + +def test_sdk_key_validation_empty_key(): + """Test that empty SDK keys are accepted""" + config = Config(sdk_key="") + assert config.sdk_key == "" + + +def test_sdk_key_validation_none_key(): + """Test that None SDK keys are accepted""" + config = Config(sdk_key=None) + assert config.sdk_key == '' + + +def test_sdk_key_validation_max_length(): + """Test SDK key maximum length validation""" + valid_key = "a" * 8192 + config = Config(sdk_key=valid_key) + assert config.sdk_key == valid_key + + invalid_key = "a" * 8193 + config = Config(sdk_key=invalid_key) + assert config.sdk_key == '' + + +def test_copy_with_new_sdk_key_validation(): + """Test that copy_with_new_sdk_key validates the new key""" + original_config = Config(sdk_key="valid-key") + + new_config = original_config.copy_with_new_sdk_key("another-valid-key") + assert new_config.sdk_key == "another-valid-key" + + invalid_config = original_config.copy_with_new_sdk_key("invalid key with spaces") + assert invalid_config.sdk_key == '' + + def application_can_be_set_and_read(): application = {"id": "my-id", "version": "abcdef"} config = Config(sdk_key="SDK_KEY", application=application)