diff --git a/scapy/contrib/dicom.py b/scapy/contrib/dicom.py new file mode 100644 index 00000000000..30a657ce0fc --- /dev/null +++ b/scapy/contrib/dicom.py @@ -0,0 +1,1460 @@ +# SPDX-License-Identifier: GPL-2.0-only +# This file is part of Scapy +# See https://scapy.net/ for more information +# Copyright (C) Tyler M + +# scapy.contrib.description = DICOM (Digital Imaging and Communications in Medicine) +# scapy.contrib.status = loads + +""" +DICOM (Digital Imaging and Communications in Medicine) Protocol +Reference: DICOM PS3.8 - Network Communication Support for Message Exchange +https://dicom.nema.org/medical/dicom/current/output/html/part08.html +""" + +import logging +import socket +import struct +import time + +from scapy.packet import Packet, bind_layers +from scapy.error import Scapy_Exception +from scapy.fields import ( + BitField, + ByteEnumField, + ByteField, + ConditionalField, + Field, + FieldLenField, + IntField, + LenField, + PacketListField, + ShortField, + StrFixedLenField, + StrLenField, +) +from scapy.layers.inet import TCP +from scapy.supersocket import StreamSocket +from scapy.volatile import RandShort, RandInt, RandString + +__all__ = [ + "DICOM_PORT", + "APP_CONTEXT_UID", + "DEFAULT_TRANSFER_SYNTAX_UID", + "VERIFICATION_SOP_CLASS_UID", + "CT_IMAGE_STORAGE_SOP_CLASS_UID", + "PATIENT_ROOT_QR_FIND_SOP_CLASS_UID", + "PATIENT_ROOT_QR_MOVE_SOP_CLASS_UID", + "PATIENT_ROOT_QR_GET_SOP_CLASS_UID", + "STUDY_ROOT_QR_FIND_SOP_CLASS_UID", + "STUDY_ROOT_QR_MOVE_SOP_CLASS_UID", + "STUDY_ROOT_QR_GET_SOP_CLASS_UID", + "DICOM", + "A_ASSOCIATE_RQ", + "A_ASSOCIATE_AC", + "A_ASSOCIATE_RJ", + "P_DATA_TF", + "PresentationDataValueItem", + "A_RELEASE_RQ", + "A_RELEASE_RP", + "A_ABORT", + "DICOMVariableItem", + "DICOMApplicationContext", + "DICOMPresentationContextRQ", + "DICOMPresentationContextAC", + "DICOMAbstractSyntax", + "DICOMTransferSyntax", + "DICOMUserInformation", + "DICOMMaximumLength", + "DICOMImplementationClassUID", + "DICOMAsyncOperationsWindow", + "DICOMSCPSCURoleSelection", + "DICOMImplementationVersionName", + "DICOMUserIdentity", + "DICOMUserIdentityResponse", + "DICOMElementField", + "DICOMUIDField", + "DICOMUIDFieldRaw", + "DICOMUSField", + "DICOMULField", + "DICOMAEField", + "DIMSEPacket", + "C_ECHO_RQ", + "C_ECHO_RSP", + "C_STORE_RQ", + "C_STORE_RSP", + "C_FIND_RQ", + "C_FIND_RSP", + "C_MOVE_RQ", + "C_MOVE_RSP", + "C_GET_RQ", + "C_GET_RSP", + "DICOMSession", + "parse_dimse_status", + "_pad_ae_title", + "_uid_to_bytes", + "_uid_to_bytes_raw", + "build_c_echo_rq_dimse_raw", + "build_c_store_rq_dimse_raw", + "build_presentation_context_rq", + "build_user_information", +] + +log = logging.getLogger("scapy.contrib.dicom") + +DICOM_PORT = 104 +APP_CONTEXT_UID = "1.2.840.10008.3.1.1.1" +DEFAULT_TRANSFER_SYNTAX_UID = "1.2.840.10008.1.2" +VERIFICATION_SOP_CLASS_UID = "1.2.840.10008.1.1" +CT_IMAGE_STORAGE_SOP_CLASS_UID = "1.2.840.10008.5.1.4.1.1.2" + +# Query/Retrieve SOP Class UIDs +PATIENT_ROOT_QR_FIND_SOP_CLASS_UID = "1.2.840.10008.5.1.4.1.2.1.1" +PATIENT_ROOT_QR_MOVE_SOP_CLASS_UID = "1.2.840.10008.5.1.4.1.2.1.2" +PATIENT_ROOT_QR_GET_SOP_CLASS_UID = "1.2.840.10008.5.1.4.1.2.1.3" +STUDY_ROOT_QR_FIND_SOP_CLASS_UID = "1.2.840.10008.5.1.4.1.2.2.1" +STUDY_ROOT_QR_MOVE_SOP_CLASS_UID = "1.2.840.10008.5.1.4.1.2.2.2" +STUDY_ROOT_QR_GET_SOP_CLASS_UID = "1.2.840.10008.5.1.4.1.2.2.3" + +PDU_TYPES = { + 0x01: "A-ASSOCIATE-RQ", + 0x02: "A-ASSOCIATE-AC", + 0x03: "A-ASSOCIATE-RJ", + 0x04: "P-DATA-TF", + 0x05: "A-RELEASE-RQ", + 0x06: "A-RELEASE-RP", + 0x07: "A-ABORT", +} + +ITEM_TYPES = { + 0x10: "Application Context", + 0x20: "Presentation Context RQ", + 0x21: "Presentation Context AC", + 0x30: "Abstract Syntax", + 0x40: "Transfer Syntax", + 0x50: "User Information", + 0x51: "Maximum Length", + 0x52: "Implementation Class UID", + 0x53: "Asynchronous Operations Window", + 0x54: "SCP/SCU Role Selection", + 0x55: "Implementation Version Name", + 0x58: "User Identity", + 0x59: "User Identity Server Response", +} + +def _pad_ae_title(title): + if isinstance(title, bytes): + return title.ljust(16, b" ") + return title.ljust(16).encode("ascii") + +def _uid_to_bytes(uid): + if isinstance(uid, bytes): + b_uid = uid + elif isinstance(uid, str): + b_uid = uid.encode("ascii") + else: + return b"" + if len(b_uid) % 2 != 0: + b_uid += b"\x00" + return b_uid + +def _uid_to_bytes_raw(uid): + if isinstance(uid, bytes): + return uid + elif isinstance(uid, str): + return uid.encode("ascii") + else: + return b"" + +class DICOMElementField(Field): + __slots__ = ["tag_group", "tag_elem"] + + def __init__(self, name, default, tag_group, tag_elem): + self.tag_group = tag_group + self.tag_elem = tag_elem + Field.__init__(self, name, default) + + def addfield(self, pkt, s, val): + if val is None: + val = b"" + if isinstance(val, str): + val = val.encode("ascii") + return s + struct.pack("= 2: + return remain, struct.unpack("= 4: + return remain, struct.unpack(" {called}" + + def hashret(self): + return self.called_ae_title + self.calling_ae_title + +class A_ASSOCIATE_AC(Packet): + name = "A-ASSOCIATE-AC" + fields_desc = [ + ShortField("protocol_version", 1), + ShortField("reserved1", 0), + StrFixedLenField("called_ae_title", b"", 16), + StrFixedLenField("calling_ae_title", b"", 16), + StrFixedLenField("reserved2", b"\x00" * 32, 32), + PacketListField("variable_items", [], + DICOMVariableItem, + max_count=256, + length_from=lambda pkt: (pkt.underlayer.length - 68 + if pkt.underlayer and pkt.underlayer.length + else 0)), + ] + + def mysummary(self): + called = self.called_ae_title.strip() if isinstance(self.called_ae_title, bytes) else self.called_ae_title + calling = self.calling_ae_title.strip() if isinstance(self.calling_ae_title, bytes) else self.calling_ae_title + if isinstance(called, bytes): + called = called.decode("ascii", errors="replace") + if isinstance(calling, bytes): + calling = calling.decode("ascii", errors="replace") + return f"A-ASSOCIATE-AC {calling} <- {called}" + + def hashret(self): + return self.called_ae_title + self.calling_ae_title + + def answers(self, other): + return isinstance(other, A_ASSOCIATE_RQ) + +class A_ASSOCIATE_RJ(Packet): + name = "A-ASSOCIATE-RJ" + + RESULT_CODES = { + 1: "rejected-permanent", + 2: "rejected-transient", + } + + SOURCE_CODES = { + 1: "DICOM UL service-user", + 2: "DICOM UL service-provider (ACSE)", + 3: "DICOM UL service-provider (Presentation)", + } + + fields_desc = [ + ByteField("reserved1", 0), + ByteEnumField("result", 1, RESULT_CODES), + ByteEnumField("source", 1, SOURCE_CODES), + ByteField("reason_diag", 1), + ] + + def mysummary(self): + return self.sprintf("A-ASSOCIATE-RJ %result% %source%") + + def answers(self, other): + return isinstance(other, A_ASSOCIATE_RQ) + +class P_DATA_TF(Packet): + name = "P-DATA-TF" + fields_desc = [ + PacketListField("pdv_items", [], + PresentationDataValueItem, + max_count=256, + length_from=lambda pkt: (pkt.underlayer.length + if pkt.underlayer and pkt.underlayer.length + else 0)), + ] + + def mysummary(self): + return f"P-DATA-TF ({len(self.pdv_items)} PDVs)" + +class A_RELEASE_RQ(Packet): + name = "A-RELEASE-RQ" + fields_desc = [IntField("reserved1", 0)] + + def mysummary(self): + return "A-RELEASE-RQ" + +class A_RELEASE_RP(Packet): + name = "A-RELEASE-RP" + fields_desc = [IntField("reserved1", 0)] + + def mysummary(self): + return "A-RELEASE-RP" + + def answers(self, other): + return isinstance(other, A_RELEASE_RQ) + +class A_ABORT(Packet): + name = "A-ABORT" + + SOURCE_CODES = { + 0: "DICOM UL service-user", + 2: "DICOM UL service-provider", + } + + fields_desc = [ + ByteField("reserved1", 0), + ByteField("reserved2", 0), + ByteEnumField("source", 0, SOURCE_CODES), + ByteField("reason_diag", 0), + ] + + def mysummary(self): + return self.sprintf("A-ABORT %source%") + +bind_layers(TCP, DICOM, dport=DICOM_PORT) +bind_layers(TCP, DICOM, sport=DICOM_PORT) +bind_layers(DICOM, A_ASSOCIATE_RQ, pdu_type=0x01) +bind_layers(DICOM, A_ASSOCIATE_AC, pdu_type=0x02) +bind_layers(DICOM, A_ASSOCIATE_RJ, pdu_type=0x03) +bind_layers(DICOM, P_DATA_TF, pdu_type=0x04) +bind_layers(DICOM, A_RELEASE_RQ, pdu_type=0x05) +bind_layers(DICOM, A_RELEASE_RP, pdu_type=0x06) +bind_layers(DICOM, A_ABORT, pdu_type=0x07) + +def build_presentation_context_rq(context_id, abstract_syntax_uid, transfer_syntax_uids): + abs_syn = DICOMVariableItem() / DICOMAbstractSyntax(uid=_uid_to_bytes(abstract_syntax_uid)) + + sub_items = [abs_syn] + for ts_uid in transfer_syntax_uids: + ts = DICOMVariableItem() / DICOMTransferSyntax(uid=_uid_to_bytes(ts_uid)) + sub_items.append(ts) + + return DICOMVariableItem() / DICOMPresentationContextRQ( + context_id=context_id, + sub_items=sub_items, + ) + +def build_user_information(max_pdu_length=16384, implementation_class_uid=None, implementation_version=None): + sub_items = [ + DICOMVariableItem() / DICOMMaximumLength(max_pdu_length=max_pdu_length) + ] + + if implementation_class_uid: + sub_items.append( + DICOMVariableItem() / DICOMImplementationClassUID(uid=_uid_to_bytes(implementation_class_uid)) + ) + + if implementation_version: + ver_bytes = implementation_version if isinstance(implementation_version, bytes) else implementation_version.encode('ascii') + sub_items.append( + DICOMVariableItem() / DICOMImplementationVersionName(name=ver_bytes) + ) + + return DICOMVariableItem() / DICOMUserInformation(sub_items=sub_items) + +class DICOMSession: + def __init__(self, dst_ip, dst_port, dst_ae, src_ae="SCAPY_SCU", + read_timeout=10, raw_mode=False): + self.dst_ip = dst_ip + self.dst_port = dst_port + self.dst_ae = _pad_ae_title(dst_ae) + self.src_ae = _pad_ae_title(src_ae) + self.sock = None + self.stream = None + self.assoc_established = False + self.accepted_contexts = {} + self.read_timeout = read_timeout + self._current_message_id_counter = int(time.time()) % 50000 + self._proposed_max_pdu = 16384 + self.max_pdu_length = 16384 + self.raw_mode = raw_mode + self._proposed_context_map = {} + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.assoc_established: + try: + self.release() + except Exception: + pass + self.close() + return False + + def connect(self): + try: + self.sock = socket.create_connection( + (self.dst_ip, self.dst_port), + timeout=self.read_timeout, + ) + self.stream = StreamSocket(self.sock, basecls=DICOM) + return True + except Exception as e: + log.error("Connection failed: %s", e) + return False + + def send(self, pkt): + self.stream.send(pkt) + + def recv(self): + try: + return self.stream.recv() + except socket.timeout: + return None + except Exception as e: + log.error("Error receiving PDU: %s", e) + return None + + def sr1(self, pkt): + try: + return self.stream.sr1(pkt, timeout=self.read_timeout) + except socket.timeout: + return None + except Exception as e: + log.error("Error in sr1: %s", e) + return None + + def send_raw_bytes(self, raw_bytes): + self.sock.sendall(raw_bytes) + + def associate(self, requested_contexts=None): + if not self.stream and not self.connect(): + return False + + if requested_contexts is None: + requested_contexts = { + VERIFICATION_SOP_CLASS_UID: [DEFAULT_TRANSFER_SYNTAX_UID] + } + + self._proposed_context_map = {} + + variable_items = [ + DICOMVariableItem() / DICOMApplicationContext() + ] + + ctx_id = 1 + for abs_syntax, trn_syntaxes in requested_contexts.items(): + self._proposed_context_map[ctx_id] = abs_syntax + pctx = build_presentation_context_rq(ctx_id, abs_syntax, trn_syntaxes) + variable_items.append(pctx) + ctx_id += 2 + + user_info = build_user_information(max_pdu_length=self._proposed_max_pdu) + variable_items.append(user_info) + + assoc_rq = DICOM() / A_ASSOCIATE_RQ( + called_ae_title=self.dst_ae, + calling_ae_title=self.src_ae, + variable_items=variable_items, + ) + + response = self.sr1(assoc_rq) + + if response: + if response.haslayer(A_ASSOCIATE_AC): + self.assoc_established = True + self._parse_accepted_contexts(response) + self._parse_max_pdu_length(response) + return True + elif response.haslayer(A_ASSOCIATE_RJ): + log.error( + "Association rejected: result=%d, source=%d, reason=%d", + response[A_ASSOCIATE_RJ].result, + response[A_ASSOCIATE_RJ].source, + response[A_ASSOCIATE_RJ].reason_diag, + ) + return False + + log.error("Association failed: no valid response received") + return False + + def _parse_max_pdu_length(self, response): + try: + for item in response[A_ASSOCIATE_AC].variable_items: + if item.item_type == 0x50 and item.haslayer(DICOMUserInformation): + user_info = item[DICOMUserInformation] + for sub_item in user_info.sub_items: + if sub_item.item_type == 0x51 and sub_item.haslayer(DICOMMaximumLength): + server_max = sub_item[DICOMMaximumLength].max_pdu_length + self.max_pdu_length = min(self._proposed_max_pdu, server_max) + log.debug("Negotiated max PDU length: %d", self.max_pdu_length) + return + except Exception as e: + log.debug("Could not parse max PDU length: %s", e) + self.max_pdu_length = self._proposed_max_pdu + + def _parse_accepted_contexts(self, response): + for item in response[A_ASSOCIATE_AC].variable_items: + if item.item_type == 0x21 and item.haslayer(DICOMPresentationContextAC): + pctx = item[DICOMPresentationContextAC] + ctx_id = pctx.context_id + result = pctx.result + + if result != 0: + log.debug("Presentation context %d rejected (result=%d)", ctx_id, result) + continue + + abs_syntax = self._proposed_context_map.get(ctx_id) + if abs_syntax is None: + log.warning( + "Server accepted context ID %d which we didn't propose!", + ctx_id + ) + continue + + for sub_item in pctx.sub_items: + if sub_item.item_type == 0x40 and sub_item.haslayer(DICOMTransferSyntax): + ts_uid = sub_item[DICOMTransferSyntax].uid + if isinstance(ts_uid, bytes): + ts_uid = ts_uid.rstrip(b"\x00").decode("ascii") + self.accepted_contexts[ctx_id] = (abs_syntax, ts_uid) + log.debug( + "Accepted context %d: %s with transfer syntax %s", + ctx_id, abs_syntax, ts_uid + ) + break + + def _get_next_message_id(self): + self._current_message_id_counter += 1 + return self._current_message_id_counter & 0xFFFF + + def _find_accepted_context_id(self, sop_class_uid, transfer_syntax_uid=None): + for ctx_id, (abs_syntax, ts_syntax) in self.accepted_contexts.items(): + if abs_syntax == sop_class_uid: + if transfer_syntax_uid is None or transfer_syntax_uid == ts_syntax: + return ctx_id + return None + + def c_echo(self): + if not self.assoc_established: + log.error("Association not established") + return None + + echo_ctx_id = self._find_accepted_context_id(VERIFICATION_SOP_CLASS_UID) + if echo_ctx_id is None: + log.error("No accepted context for Verification SOP Class") + return None + + msg_id = self._get_next_message_id() + dimse_rq = bytes(C_ECHO_RQ(message_id=msg_id)) + + pdv_rq = PresentationDataValueItem( + context_id=echo_ctx_id, + data=dimse_rq, + is_command=1, + is_last=1, + ) + pdata_rq = DICOM() / P_DATA_TF(pdv_items=[pdv_rq]) + + response = self.sr1(pdata_rq) + + if response and response.haslayer(P_DATA_TF): + pdv_items = response[P_DATA_TF].pdv_items + if pdv_items: + pdv_rsp = pdv_items[0] + data = pdv_rsp.data + if isinstance(data, str): + data = data.encode("latin-1") + return parse_dimse_status(data) + return None + + def c_store(self, dataset_bytes, sop_class_uid, sop_instance_uid, + transfer_syntax_uid): + if not self.assoc_established: + log.error("Association not established") + return None + + store_ctx_id = self._find_accepted_context_id( + sop_class_uid, + transfer_syntax_uid, + ) + if store_ctx_id is None: + log.error( + "No accepted context for SOP Class %s with Transfer Syntax %s", + sop_class_uid, + transfer_syntax_uid, + ) + return None + + msg_id = self._get_next_message_id() + + if self.raw_mode: + dimse_rq = build_c_store_rq_dimse_raw(sop_class_uid, sop_instance_uid, msg_id) + else: + dimse_rq = bytes(C_STORE_RQ( + affected_sop_class_uid=sop_class_uid, + affected_sop_instance_uid=sop_instance_uid, + message_id=msg_id, + )) + + cmd_pdv = PresentationDataValueItem( + context_id=store_ctx_id, + data=dimse_rq, + is_command=1, + is_last=1, + ) + pdata_cmd = DICOM() / P_DATA_TF(pdv_items=[cmd_pdv]) + self.send(pdata_cmd) + + max_pdv_data = self.max_pdu_length - 12 + + if len(dataset_bytes) <= max_pdv_data: + data_pdv = PresentationDataValueItem( + context_id=store_ctx_id, + data=dataset_bytes, + is_command=0, + is_last=1, + ) + pdata_data = DICOM() / P_DATA_TF(pdv_items=[data_pdv]) + self.send(pdata_data) + else: + offset = 0 + while offset < len(dataset_bytes): + chunk = dataset_bytes[offset:offset + max_pdv_data] + is_last = 1 if (offset + len(chunk) >= len(dataset_bytes)) else 0 + data_pdv = PresentationDataValueItem( + context_id=store_ctx_id, + data=chunk, + is_command=0, + is_last=is_last, + ) + pdata_data = DICOM() / P_DATA_TF(pdv_items=[data_pdv]) + self.send(pdata_data) + offset += len(chunk) + log.debug( + "Fragmented %d bytes into %d PDUs", + len(dataset_bytes), + (len(dataset_bytes) + max_pdv_data - 1) // max_pdv_data, + ) + + response = self.recv() + + if response and response.haslayer(P_DATA_TF): + pdv_items = response[P_DATA_TF].pdv_items + if pdv_items: + pdv_rsp = pdv_items[0] + data = pdv_rsp.data + if isinstance(data, str): + data = data.encode("latin-1") + return parse_dimse_status(data) + return None + + def c_store_raw(self, dataset_bytes, sop_class_uid, sop_instance_uid, + context_id, skip_padding=True): + if not self.assoc_established: + log.error("Association not established") + return None + + msg_id = self._get_next_message_id() + + if skip_padding: + dimse_rq = build_c_store_rq_dimse_raw(sop_class_uid, sop_instance_uid, msg_id) + else: + dimse_rq = bytes(C_STORE_RQ( + affected_sop_class_uid=sop_class_uid, + affected_sop_instance_uid=sop_instance_uid, + message_id=msg_id, + )) + + cmd_pdv = PresentationDataValueItem( + context_id=context_id, + data=dimse_rq, + is_command=1, + is_last=1, + ) + pdata_cmd = DICOM() / P_DATA_TF(pdv_items=[cmd_pdv]) + self.send(pdata_cmd) + + data_pdv = PresentationDataValueItem( + context_id=context_id, + data=dataset_bytes, + is_command=0, + is_last=1, + ) + pdata_data = DICOM() / P_DATA_TF(pdv_items=[data_pdv]) + self.send(pdata_data) + + response = self.recv() + + if response: + if response.haslayer(P_DATA_TF): + pdv_items = response[P_DATA_TF].pdv_items + if pdv_items: + pdv_rsp = pdv_items[0] + data = pdv_rsp.data + if isinstance(data, str): + data = data.encode("latin-1") + return parse_dimse_status(data) + elif response.haslayer(A_ABORT): + log.info("Server aborted connection (expected for malformed data)") + return None + return None + + def release(self): + if not self.assoc_established: + return True + + release_rq = DICOM() / A_RELEASE_RQ() + response = self.sr1(release_rq) + self.close() + + if response: + return response.haslayer(A_RELEASE_RP) + return False + + def close(self): + if self.stream: + try: + self.stream.close() + except Exception: + pass + self.sock = None + self.stream = None + self.assoc_established = False \ No newline at end of file diff --git a/test/contrib/dicom.uts b/test/contrib/dicom.uts new file mode 100644 index 00000000000..0ddffb18136 --- /dev/null +++ b/test/contrib/dicom.uts @@ -0,0 +1,581 @@ +% DICOM (Digital Imaging and Communications in Medicine) tests + +# Type the following command to launch the tests: +# $ test/run_tests -P "load_contrib('dicom')" -t test/contrib/dicom.uts + +############ +############ ++ DICOM module loading + += Import DICOM module +load_contrib("dicom", globals_dict=globals()) + += Verify essential classes are exported +assert DICOM is not None +assert A_ASSOCIATE_RQ is not None +assert A_ASSOCIATE_AC is not None +assert A_ASSOCIATE_RJ is not None +assert P_DATA_TF is not None +assert A_RELEASE_RQ is not None +assert A_RELEASE_RP is not None +assert A_ABORT is not None +assert DICOMVariableItem is not None +assert DICOMApplicationContext is not None +assert DICOMPresentationContextRQ is not None +assert DICOMUserInformation is not None +assert DICOMMaximumLength is not None + += Verify DIMSE packet classes are exported +assert C_ECHO_RQ is not None +assert C_ECHO_RSP is not None +assert C_STORE_RQ is not None +assert C_STORE_RSP is not None +assert C_FIND_RQ is not None +assert C_FIND_RSP is not None +assert C_MOVE_RQ is not None +assert C_MOVE_RSP is not None +assert C_GET_RQ is not None +assert C_GET_RSP is not None + += Verify constants are exported +assert DICOM_PORT == 104 +assert APP_CONTEXT_UID == "1.2.840.10008.3.1.1.1" +assert DEFAULT_TRANSFER_SYNTAX_UID == "1.2.840.10008.1.2" +assert VERIFICATION_SOP_CLASS_UID == "1.2.840.10008.1.1" + += Verify Query/Retrieve SOP Class UIDs are exported +assert PATIENT_ROOT_QR_FIND_SOP_CLASS_UID == "1.2.840.10008.5.1.4.1.2.1.1" +assert PATIENT_ROOT_QR_MOVE_SOP_CLASS_UID == "1.2.840.10008.5.1.4.1.2.1.2" +assert PATIENT_ROOT_QR_GET_SOP_CLASS_UID == "1.2.840.10008.5.1.4.1.2.1.3" +assert STUDY_ROOT_QR_FIND_SOP_CLASS_UID == "1.2.840.10008.5.1.4.1.2.2.1" +assert STUDY_ROOT_QR_MOVE_SOP_CLASS_UID == "1.2.840.10008.5.1.4.1.2.2.2" +assert STUDY_ROOT_QR_GET_SOP_CLASS_UID == "1.2.840.10008.5.1.4.1.2.2.3" + +############ +############ ++ PDU header tests + += DICOM PDU header construction +pkt = DICOM() +assert pkt.pdu_type == 0x01 +assert pkt.reserved1 == 0 + += DICOM PDU type field values +import struct +for pdu_type, expected_class in [(0x01, A_ASSOCIATE_RQ), (0x02, A_ASSOCIATE_AC), + (0x03, A_ASSOCIATE_RJ), (0x04, P_DATA_TF), + (0x05, A_RELEASE_RQ), (0x06, A_RELEASE_RP), + (0x07, A_ABORT)]: + pkt = DICOM() / expected_class() + raw_bytes = bytes(pkt) + assert raw_bytes[0] == pdu_type + +############ +############ ++ LenField auto-calculation tests + += DICOM header auto-calculates payload length +pkt = DICOM() / A_RELEASE_RQ() +raw = bytes(pkt) +length_field = struct.unpack("!I", raw[2:6])[0] +payload_size = len(raw) - 6 +assert length_field == payload_size +assert length_field == 4 + += Variable item length auto-calculated +pkt = DICOMVariableItem() / DICOMApplicationContext() +raw = bytes(pkt) +length_field = struct.unpack("!H", raw[2:4])[0] +payload_size = len(raw) - 4 +assert length_field == payload_size + += Nested items have correct cumulative length +max_len = DICOMVariableItem() / DICOMMaximumLength(max_pdu_length=16384) +user_info = DICOMVariableItem() / DICOMUserInformation(sub_items=[max_len]) +raw = bytes(user_info) +assert len(raw) == 12 +ui_length = struct.unpack("!H", raw[2:4])[0] +assert ui_length == 8 + +############ +############ ++ Variable item bind_layers tests + += Application Context bind_layers (type 0x10) +pkt = DICOMVariableItem() / DICOMApplicationContext() +assert pkt.item_type == 0x10 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.item_type == 0x10 +assert parsed.haslayer(DICOMApplicationContext) + += Abstract Syntax bind_layers (type 0x30) +uid = _uid_to_bytes(VERIFICATION_SOP_CLASS_UID) +pkt = DICOMVariableItem() / DICOMAbstractSyntax(uid=uid) +assert pkt.item_type == 0x30 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.item_type == 0x30 +assert parsed.haslayer(DICOMAbstractSyntax) +assert parsed[DICOMAbstractSyntax].uid == uid + += Transfer Syntax bind_layers (type 0x40) +pkt = DICOMVariableItem() / DICOMTransferSyntax() +assert pkt.item_type == 0x40 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.item_type == 0x40 +assert parsed.haslayer(DICOMTransferSyntax) + += Maximum Length bind_layers (type 0x51) +pkt = DICOMVariableItem() / DICOMMaximumLength(max_pdu_length=32768) +assert pkt.item_type == 0x51 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.item_type == 0x51 +assert parsed.haslayer(DICOMMaximumLength) +assert parsed[DICOMMaximumLength].max_pdu_length == 32768 + += User Information bind_layers (type 0x50) +max_len = DICOMVariableItem() / DICOMMaximumLength(max_pdu_length=16384) +pkt = DICOMVariableItem() / DICOMUserInformation(sub_items=[max_len]) +assert pkt.item_type == 0x50 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.item_type == 0x50 +assert parsed.haslayer(DICOMUserInformation) + += Presentation Context RQ bind_layers (type 0x20) +abs_syn = DICOMVariableItem() / DICOMAbstractSyntax(uid=_uid_to_bytes(VERIFICATION_SOP_CLASS_UID)) +ts = DICOMVariableItem() / DICOMTransferSyntax() +pkt = DICOMVariableItem() / DICOMPresentationContextRQ(context_id=1, sub_items=[abs_syn, ts]) +assert pkt.item_type == 0x20 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.item_type == 0x20 +assert parsed.haslayer(DICOMPresentationContextRQ) +assert parsed[DICOMPresentationContextRQ].context_id == 1 + += Presentation Context AC bind_layers (type 0x21) +ts = DICOMVariableItem() / DICOMTransferSyntax() +pkt = DICOMVariableItem() / DICOMPresentationContextAC(context_id=1, result=0, sub_items=[ts]) +assert pkt.item_type == 0x21 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.item_type == 0x21 +assert parsed.haslayer(DICOMPresentationContextAC) +assert parsed[DICOMPresentationContextAC].result == 0 + += Unknown item type uses guess_payload_class fallback +raw = struct.pack("!BBH", 0xFF, 0, 4) + b"test" +parsed = DICOMVariableItem(raw) +assert parsed.item_type == 0xFF +assert parsed.length == 4 +assert parsed.payload is not None + +############ +############ ++ A-ASSOCIATE-RQ tests + += Build simple A-ASSOCIATE-RQ +app_ctx = DICOMVariableItem() / DICOMApplicationContext() +pctx = build_presentation_context_rq(1, VERIFICATION_SOP_CLASS_UID, [DEFAULT_TRANSFER_SYNTAX_UID]) +user_info = build_user_information(max_pdu_length=16384) +assoc_rq = DICOM() / A_ASSOCIATE_RQ( + called_ae_title=_pad_ae_title("TARGET"), + calling_ae_title=_pad_ae_title("SOURCE"), + variable_items=[app_ctx, pctx, user_info] +) +raw = bytes(assoc_rq) +parsed = DICOM(raw) +assert parsed.haslayer(A_ASSOCIATE_RQ) +items = parsed[A_ASSOCIATE_RQ].variable_items +assert len(items) == 3 +assert items[0].item_type == 0x10 +assert items[1].item_type == 0x20 +assert items[2].item_type == 0x50 + += A-ASSOCIATE-RQ round-trip preserves AE titles +original = DICOM() / A_ASSOCIATE_RQ( + called_ae_title=_pad_ae_title("TARGET"), + calling_ae_title=_pad_ae_title("SOURCE"), +) +serialized = bytes(original) +parsed = DICOM(serialized) +assert parsed.haslayer(A_ASSOCIATE_RQ) +assert parsed[A_ASSOCIATE_RQ].called_ae_title == b"TARGET " +assert parsed[A_ASSOCIATE_RQ].calling_ae_title == b"SOURCE " + += A-ASSOCIATE-RQ with multiple presentation contexts +app_ctx = DICOMVariableItem() / DICOMApplicationContext() +pctx1 = build_presentation_context_rq(1, VERIFICATION_SOP_CLASS_UID, [DEFAULT_TRANSFER_SYNTAX_UID]) +pctx2 = build_presentation_context_rq(3, CT_IMAGE_STORAGE_SOP_CLASS_UID, [DEFAULT_TRANSFER_SYNTAX_UID]) +user_info = build_user_information() +assoc_rq = DICOM() / A_ASSOCIATE_RQ( + called_ae_title=_pad_ae_title("TARGET"), + calling_ae_title=_pad_ae_title("SOURCE"), + variable_items=[app_ctx, pctx1, pctx2, user_info] +) +raw = bytes(assoc_rq) +parsed = DICOM(raw) +items = parsed[A_ASSOCIATE_RQ].variable_items +assert len(items) == 4 +pctx_items = [i for i in items if i.item_type == 0x20] +assert len(pctx_items) == 2 +assert pctx_items[0][DICOMPresentationContextRQ].context_id == 1 +assert pctx_items[1][DICOMPresentationContextRQ].context_id == 3 + +############ +############ ++ A-ASSOCIATE-RJ tests + += A-ASSOCIATE-RJ construction and parsing +pkt = DICOM() / A_ASSOCIATE_RJ(result=1, source=2, reason_diag=2) +reparsed = DICOM(bytes(pkt)) +assert reparsed.haslayer(A_ASSOCIATE_RJ) +assert reparsed[A_ASSOCIATE_RJ].source == 2 + +############ +############ ++ A-RELEASE tests + += A-RELEASE-RQ round-trip +original = DICOM() / A_RELEASE_RQ() +serialized = bytes(original) +parsed = DICOM(serialized) +assert parsed.haslayer(A_RELEASE_RQ) + += A-RELEASE-RP round-trip +original = DICOM() / A_RELEASE_RP() +serialized = bytes(original) +parsed = DICOM(serialized) +assert parsed.haslayer(A_RELEASE_RP) + +############ +############ ++ A-ABORT tests + += A-ABORT round-trip +original = DICOM() / A_ABORT(source=2, reason_diag=6) +serialized = bytes(original) +parsed = DICOM(serialized) +assert parsed.haslayer(A_ABORT) +assert parsed[A_ABORT].source == 2 +assert parsed[A_ABORT].reason_diag == 6 + +############ +############ ++ P-DATA-TF tests + += PresentationDataValueItem length linked to data +test_data = b"TEST_DATA_12345" +pdv = PresentationDataValueItem(context_id=1, data=test_data, is_command=1, is_last=1) +raw = bytes(pdv) +length = struct.unpack("!I", raw[:4])[0] +assert length == len(test_data) + 2 + += P-DATA-TF with multiple PDV items - build only +pdv1 = PresentationDataValueItem(context_id=1, data=b'\xDE\xAD', is_command=1, is_last=0) +pdv2 = PresentationDataValueItem(context_id=1, data=b'\xBE\xEF', is_command=0, is_last=1) +pkt = DICOM() / P_DATA_TF(pdv_items=[pdv1, pdv2]) +raw = bytes(pkt) +assert raw[0] == 0x04 +assert len(raw) > 6 + += P-DATA-TF round-trip - build and verify structure +test_data = b"\x01\x02\x03\x04\x05" +pdv = PresentationDataValueItem(context_id=3, data=test_data, is_command=1, is_last=1) +original = DICOM() / P_DATA_TF(pdv_items=[pdv]) +serialized = bytes(original) +assert serialized[0] == 0x04 +length = struct.unpack("!I", serialized[2:6])[0] +assert length > 0 +assert test_data in serialized + += PDV is_command flag encoding +pdv = PresentationDataValueItem(context_id=1, data=b'x', is_command=1, is_last=0) +raw = bytes(pdv) +msg_ctrl = raw[5] +assert msg_ctrl & 0x01 == 1 +assert msg_ctrl & 0x02 == 0 + += PDV is_last flag encoding +pdv = PresentationDataValueItem(context_id=1, data=b'x', is_command=0, is_last=1) +raw = bytes(pdv) +msg_ctrl = raw[5] +assert msg_ctrl & 0x01 == 0 +assert msg_ctrl & 0x02 == 2 + += PDV both flags set +pdv = PresentationDataValueItem(context_id=1, data=b'x', is_command=1, is_last=1) +raw = bytes(pdv) +msg_ctrl = raw[5] +assert msg_ctrl == 0x03 + += PDV flags encoding verification +for is_cmd in [0, 1]: + for is_last in [0, 1]: + pdv = PresentationDataValueItem(context_id=1, data=b'test', is_command=is_cmd, is_last=is_last) + raw = bytes(pdv) + msg_ctrl = raw[5] + assert (msg_ctrl & 0x01) == is_cmd + assert (msg_ctrl & 0x02) == (is_last << 1) + +############ +############ ++ DIMSE packet tests + += C_ECHO_RQ creation with defaults +pkt = C_ECHO_RQ() +raw = bytes(pkt) +assert raw[:4] == b'\x00\x00\x00\x00' +assert b'1.2.840.10008.1.1' in raw + += C_ECHO_RQ custom message_id +pkt = C_ECHO_RQ(message_id=12345) +raw = bytes(pkt) +assert b'\x10\x01' in raw +assert struct.pack("= 1 +assert sub_items[0].item_type == 0x51 +assert sub_items[0][DICOMMaximumLength].max_pdu_length == 32768 + += build_user_information with implementation info +user_info = build_user_information( + max_pdu_length=16384, + implementation_class_uid="1.2.3.4.5", + implementation_version="SCAPY_V1" +) +sub_items = user_info[DICOMUserInformation].sub_items +assert len(sub_items) == 3 +types = [item.item_type for item in sub_items] +assert 0x51 in types +assert 0x52 in types +assert 0x55 in types + += _pad_ae_title pads to 16 bytes +assert _pad_ae_title("TEST") == b"TEST " +assert len(_pad_ae_title("X")) == 16 +assert _pad_ae_title(b"BYTES") == b"BYTES " + += _uid_to_bytes pads odd-length UIDs +assert len(_uid_to_bytes("1.2.3")) % 2 == 0 +assert _uid_to_bytes("1.2.3.4") == b"1.2.3.4\x00" +assert _uid_to_bytes("1.2.3") == b"1.2.3\x00" + +############ +############ ++ User Identity Negotiation tests + += User Identity username only (type 1) +pkt = DICOMVariableItem() / DICOMUserIdentity( + user_identity_type=1, + positive_response_requested=0, + primary_field=b"admin" +) +assert pkt.item_type == 0x58 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.haslayer(DICOMUserIdentity) +assert parsed[DICOMUserIdentity].user_identity_type == 1 +assert parsed[DICOMUserIdentity].primary_field == b"admin" + += User Identity username+password (type 2) +pkt = DICOMVariableItem() / DICOMUserIdentity( + user_identity_type=2, + positive_response_requested=1, + primary_field=b"admin", + secondary_field=b"password123" +) +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.haslayer(DICOMUserIdentity) +assert parsed[DICOMUserIdentity].user_identity_type == 2 +assert parsed[DICOMUserIdentity].primary_field == b"admin" +assert parsed[DICOMUserIdentity].secondary_field == b"password123" + += User Identity Response +pkt = DICOMVariableItem() / DICOMUserIdentityResponse( + server_response=b"auth_token_12345" +) +assert pkt.item_type == 0x59 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.haslayer(DICOMUserIdentityResponse) +assert parsed[DICOMUserIdentityResponse].server_response == b"auth_token_12345" + +############ +############ ++ Async Operations Window tests + += Async operations default values +pkt = DICOMVariableItem() / DICOMAsyncOperationsWindow() +assert pkt.item_type == 0x53 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.haslayer(DICOMAsyncOperationsWindow) +assert parsed[DICOMAsyncOperationsWindow].max_ops_invoked == 1 +assert parsed[DICOMAsyncOperationsWindow].max_ops_performed == 1 + += Async operations custom values +pkt = DICOMVariableItem() / DICOMAsyncOperationsWindow( + max_ops_invoked=8, + max_ops_performed=4 +) +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed[DICOMAsyncOperationsWindow].max_ops_invoked == 8 +assert parsed[DICOMAsyncOperationsWindow].max_ops_performed == 4 + +############ +############ ++ SCP/SCU Role Selection tests + += Role Selection SCU only +pkt = DICOMVariableItem() / DICOMSCPSCURoleSelection( + sop_class_uid=b"1.2.840.10008.5.1.4.1.1.2", + scu_role=1, + scp_role=0 +) +assert pkt.item_type == 0x54 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.haslayer(DICOMSCPSCURoleSelection) +assert parsed[DICOMSCPSCURoleSelection].sop_class_uid == b"1.2.840.10008.5.1.4.1.1.2" +assert parsed[DICOMSCPSCURoleSelection].scu_role == 1 +assert parsed[DICOMSCPSCURoleSelection].scp_role == 0 + +############ +############ ++ DICOM extract_padding for StreamSocket + += DICOM extract_padding method exists +pkt = DICOM() +assert hasattr(pkt, 'extract_padding') + += DICOM extract_padding separates payload from trailing data +pkt = DICOM() +pkt.length = 10 +payload, remaining = pkt.extract_padding(b'0123456789EXTRA') +assert payload == b'0123456789' +assert remaining == b'EXTRA' + +############ +############ ++ TCP layer binding + += DICOM binds to TCP port 104 +from scapy.layers.inet import TCP +pkt = TCP(dport=104) / b'\x01\x00\x00\x00\x00\x04' +assert DICOM in pkt or pkt.payload + +############ +############ ++ Edge cases + += Empty variable items list +pkt = DICOM() / A_ASSOCIATE_RQ(variable_items=[]) +serialized = bytes(pkt) +parsed = DICOM(serialized) +assert parsed.haslayer(A_ASSOCIATE_RQ) + += Empty PDV items list +pkt = DICOM() / P_DATA_TF(pdv_items=[]) +serialized = bytes(pkt) +assert len(serialized) == 6