[client] Introduce api protocol helper for listening connectors (#850)#851
[client] Introduce api protocol helper for listening connectors (#850)#851richard-julien merged 6 commits intomasterfrom
Conversation
+ Add support for multi env to fix queue protocol var name
There was a problem hiding this comment.
Thank you for adding this feature
I've left some comments regarding the code.
Do you think we should extend the unit tests in tests/02-integration, as they currently only cover AMQP cases?
I'll be happy to answer your questions and address any remarks.
| @@ -41,7 +45,7 @@ def start_loop(loop): | |||
|
|
|||
|
|
|||
| def get_config_variable( | |||
There was a problem hiding this comment.
I understand this "OR like" behavior is introduced for the "QUEUE_PROTOCOL" backwards compatibility even though we now recommend "CONNECTOR_QUEUE_PROTOCOL" (addition in line 931).
1/ As the get_config_variable is used in many other place, do not you think we could avoid this change in the get_config_var method and only use the OR behavior for this specific variable loading process
2/ Should we add a deprecation warning ?
self.queue_protocol = get_config_variable(
env_var="CONNECTOR_QUEUE_PROTOCOL",
yaml_path=["connector", "queue_protocol"],
config=config,
)
if not self.queue_protocol: # for backwards compatibility
self.queue_protocol = get_config_variable(
env_var="QUEUE_PROTOCOL",
yaml_path=None,
config=config,
)
if self.queue_protocol:
raise DeprecationWarning("QUEUE_PROTOCOL is deprecated, please use CONNECTOR_QUEUE_PROTOCOL instead.")
if not self.queue_protocol:
self.queue_protocol = "amqp"
| try: | ||
| authorization: str = request.headers.get("Authorization") | ||
| scheme, token = authorization.split() | ||
| if scheme.lower() != "bearer" or token != self.opencti_token: | ||
| return {"error": "Invalid credentials"} | ||
| except Exception: | ||
| return {"error": "Invalid credentials"} |
There was a problem hiding this comment.
I think we can remove the try-except clause by
- adding value check to avoid :
"authorization.split() => Attribute error NoneType as no attribute 'split'" in case no "Authorization" key in headers dict
or "scheme, token => ValueError not enough/to many values to unpack " in case Authorization is empty or badly formatted string.
| try: | |
| authorization: str = request.headers.get("Authorization") | |
| scheme, token = authorization.split() | |
| if scheme.lower() != "bearer" or token != self.opencti_token: | |
| return {"error": "Invalid credentials"} | |
| except Exception: | |
| return {"error": "Invalid credentials"} | |
| authorization: str = request.headers.get("Authorization", "") # empty string default value for typing | |
| items = authorization.split() if isinstance(authorization, str) else [] | |
| if len(items) != 2 or items[0].lower() != "bearer" or items[1] != self.opencti_token: | |
| return {"error": "Invalid credentials"} | |
| try: | ||
| data = await request.json() # Get the JSON payload | ||
| except Exception as e: | ||
| return {"error": "Invalid JSON payload", "details": str(e)} |
There was a problem hiding this comment.
| try: | |
| data = await request.json() # Get the JSON payload | |
| except Exception as e: | |
| return {"error": "Invalid JSON payload", "details": str(e)} | |
| try: | |
| data = await request.json() # Get the JSON payload | |
| except json.JSONDecodeError as e: | |
| return {"error": "Invalid JSON payload", "details": str(e)} |
| "Failing reporting the processing" | ||
| ) | ||
|
|
||
| async def _process_callback(self, request: Request): |
There was a problem hiding this comment.
I would recommend to use JSONResponse with HTTP code rather than dictionnary for easier future debugging
from fastapi.responses import JSONResponse
...
async def _process_callback(self, request: Request) -> JSONResponse:
# 01. Check the authentication
...
return JSONResponse(status_code=401, content={"error": "Invalid credentials"})
...
# 02. Parse the data and execute
return JSONResponse(status_code=400, content={"error": "Invalid JSON payload", "details": str(e)})
...
return JSONResponse(status_code=500, content={"error": "Error processing message", "details": str(e)})
...
# all good
return JSONResponse(status_code=202, content={"message": "Message successfully processed"})
See #850
Related to OpenCTI-Platform/opencti#10046