From 63c088eadae382d202baab5b788543304f9646a2 Mon Sep 17 00:00:00 2001 From: Tyler M Date: Mon, 29 Dec 2025 21:09:15 -0500 Subject: [PATCH 1/5] add dicom add dicom --- scapy/contrib/dicom.py | 1575 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1575 insertions(+) create mode 100644 scapy/contrib/dicom.py diff --git a/scapy/contrib/dicom.py b/scapy/contrib/dicom.py new file mode 100644 index 00000000000..132b2b2af66 --- /dev/null +++ b/scapy/contrib/dicom.py @@ -0,0 +1,1575 @@ +# SPDX-License-Identifier: GPL-2.0-only +# This file is part of Scapy +# See https://scapy.net/ for more information +# Copyright (C) Tyler M + +# scapy.contrib.description = DICOM (Digital Imaging and Communications in Medicine) +# scapy.contrib.status = loads + +""" +DICOM (Digital Imaging and Communications in Medicine) Protocol + +This module provides Scapy layers for the DICOM Upper Layer Protocol, +enabling packet crafting, parsing, and network analysis of DICOM +communications commonly used in medical imaging systems. + +Reference: DICOM PS3.8 - Network Communication Support for Message Exchange +https://dicom.nema.org/medical/dicom/current/output/html/part08.html + +Example usage:: + + >>> from scapy.contrib.dicom import * + >>> # Build an A-ASSOCIATE-RQ + >>> pkt = DICOM()/A_ASSOCIATE_RQ( + ... called_ae_title=_pad_ae_title("SERVER"), + ... calling_ae_title=_pad_ae_title("CLIENT"), + ... variable_items=[ + ... DICOMVariableItem()/DICOMApplicationContext(), + ... build_presentation_context_rq(1, VERIFICATION_SOP_CLASS_UID, + ... [DEFAULT_TRANSFER_SYNTAX_UID]), + ... build_user_information(max_pdu_length=16384), + ... ] + ... ) + >>> pkt.show() + >>> # Use DICOMSession for high-level operations + >>> session = DICOMSession("192.168.1.100", 104, "TARGET_AE") + >>> if session.associate(): + ... status = session.c_echo() + ... print(f"C-ECHO status: {status}") + ... session.release() +""" + +import logging +import socket +import struct +import time + +from scapy.packet import Packet, bind_layers +from scapy.error import Scapy_Exception +from scapy.fields import ( + BitField, + ByteEnumField, + ByteField, + ConditionalField, + Field, + FieldLenField, + IntField, + LenField, + PacketListField, + ShortField, + StrFixedLenField, + StrLenField, +) +from scapy.layers.inet import TCP +from scapy.supersocket import StreamSocket +from scapy.volatile import RandShort, RandInt, RandString + +__all__ = [ + # Constants + "DICOM_PORT", + "APP_CONTEXT_UID", + "DEFAULT_TRANSFER_SYNTAX_UID", + "VERIFICATION_SOP_CLASS_UID", + "CT_IMAGE_STORAGE_SOP_CLASS_UID", + # PDU Packet classes + "DICOM", + "A_ASSOCIATE_RQ", + "A_ASSOCIATE_AC", + "A_ASSOCIATE_RJ", + "P_DATA_TF", + "PresentationDataValueItem", + "A_RELEASE_RQ", + "A_RELEASE_RP", + "A_ABORT", + # Variable Item classes + "DICOMVariableItem", + "DICOMApplicationContext", + "DICOMPresentationContextRQ", + "DICOMPresentationContextAC", + "DICOMAbstractSyntax", + "DICOMTransferSyntax", + "DICOMUserInformation", + "DICOMMaximumLength", + "DICOMImplementationClassUID", + "DICOMAsyncOperationsWindow", + "DICOMSCPSCURoleSelection", + "DICOMImplementationVersionName", + "DICOMUserIdentity", + "DICOMUserIdentityResponse", + # DIMSE Custom Fields + "DICOMElementField", + "DICOMUIDField", + "DICOMUIDFieldRaw", + "DICOMUSField", + "DICOMULField", + # DIMSE Command Packets + "DIMSEPacket", + "C_ECHO_RQ", + "C_ECHO_RSP", + "C_STORE_RQ", + "C_STORE_RSP", + "C_FIND_RQ", + # Session helper + "DICOMSession", + # DIMSE utilities + "parse_dimse_status", + # Utility functions + "_pad_ae_title", + "_uid_to_bytes", + # Raw/Fuzzing utilities + "_uid_to_bytes_raw", + "build_c_echo_rq_dimse_raw", + "build_c_store_rq_dimse_raw", + # Builder helpers + "build_presentation_context_rq", + "build_user_information", +] + +log = logging.getLogger("scapy.contrib.dicom") + +# --- Constants --- +DICOM_PORT = 104 +APP_CONTEXT_UID = "1.2.840.10008.3.1.1.1" +DEFAULT_TRANSFER_SYNTAX_UID = "1.2.840.10008.1.2" # Implicit VR Little Endian +VERIFICATION_SOP_CLASS_UID = "1.2.840.10008.1.1" +CT_IMAGE_STORAGE_SOP_CLASS_UID = "1.2.840.10008.5.1.4.1.1.2" + +# PDU Type definitions +PDU_TYPES = { + 0x01: "A-ASSOCIATE-RQ", + 0x02: "A-ASSOCIATE-AC", + 0x03: "A-ASSOCIATE-RJ", + 0x04: "P-DATA-TF", + 0x05: "A-RELEASE-RQ", + 0x06: "A-RELEASE-RP", + 0x07: "A-ABORT", +} + +# Variable Item Type definitions +ITEM_TYPES = { + 0x10: "Application Context", + 0x20: "Presentation Context RQ", + 0x21: "Presentation Context AC", + 0x30: "Abstract Syntax", + 0x40: "Transfer Syntax", + 0x50: "User Information", + 0x51: "Maximum Length", + 0x52: "Implementation Class UID", + 0x53: "Asynchronous Operations Window", + 0x54: "SCP/SCU Role Selection", + 0x55: "Implementation Version Name", + 0x58: "User Identity", + 0x59: "User Identity Server Response", +} + + +# --- Helper Functions --- + +def _pad_ae_title(title): + """Pad an Application Entity title to 16 bytes with spaces.""" + if isinstance(title, bytes): + return title.ljust(16, b" ") + return title.ljust(16).encode("ascii") + + +def _uid_to_bytes(uid): + """ + Convert a UID string to bytes, padding to even length if needed. + + Note: This function auto-corrects odd-length UIDs per DICOM spec. + For fuzzing with intentionally malformed UIDs, use _uid_to_bytes_raw(). + """ + if isinstance(uid, bytes): + b_uid = uid + elif isinstance(uid, str): + b_uid = uid.encode("ascii") + else: + return b"" + if len(b_uid) % 2 != 0: + b_uid += b"\x00" + return b_uid + + +def _uid_to_bytes_raw(uid): + """ + Convert a UID string to bytes WITHOUT padding correction. + + Use this for fuzzing to send intentionally malformed odd-length UIDs. + """ + if isinstance(uid, bytes): + return uid + elif isinstance(uid, str): + return uid.encode("ascii") + else: + return b"" + + +# ============================================================================= +# DIMSE Custom Fields +# ============================================================================= + + +class DICOMElementField(Field): + """ + Base field for DICOM data elements (Tag-Length-Value structure). + + Each element is encoded as: + - Tag Group: 2 bytes (little-endian) + - Tag Element: 2 bytes (little-endian) + - Value Length: 4 bytes (little-endian) + - Value: variable bytes + + The tag is fixed at field definition time. + """ + __slots__ = ["tag_group", "tag_elem"] + + def __init__(self, name, default, tag_group, tag_elem): + self.tag_group = tag_group + self.tag_elem = tag_elem + Field.__init__(self, name, default) + + def addfield(self, pkt, s, val): + if val is None: + val = b"" + if isinstance(val, str): + val = val.encode("ascii") + return s + struct.pack("= 2: + return remain, struct.unpack("= 4: + return remain, struct.unpack(">> pkt = DICOM()/A_ASSOCIATE_RQ(called_ae_title=_pad_ae_title("SERVER")) + >>> pkt.show() + """ + name = "DICOM UL" + fields_desc = [ + ByteEnumField("pdu_type", 0x01, PDU_TYPES), + ByteField("reserved1", 0), + LenField("length", None, fmt="!I"), + ] + + def extract_padding(self, s): + if self.length is not None: + return s[:self.length], s[self.length:] + return s, b"" + + def mysummary(self): + return self.sprintf("DICOM %pdu_type%") + + +class PresentationDataValueItem(Packet): + """Presentation Data Value Item within a P-DATA-TF PDU.""" + name = "PresentationDataValueItem" + fields_desc = [ + FieldLenField("length", None, length_of="data", fmt="!I", + adjust=lambda pkt, x: x + 2), + ByteField("context_id", 1), + BitField("reserved_bits", 0, 6), + BitField("is_last", 0, 1), + BitField("is_command", 0, 1), + StrLenField("data", b"", + length_from=lambda pkt: max(0, (pkt.length or 2) - 2)), + ] + + def extract_padding(self, s): + return b"", s + + def mysummary(self): + cmd_or_data = "CMD" if self.is_command else "DATA" + last = " LAST" if self.is_last else "" + return f"PDV ctx={self.context_id} {cmd_or_data}{last} len={len(self.data)}" + + +class A_ASSOCIATE_RQ(Packet): + """A-ASSOCIATE-RQ PDU for requesting an association.""" + name = "A-ASSOCIATE-RQ" + fields_desc = [ + ShortField("protocol_version", 1), + ShortField("reserved1", 0), + StrFixedLenField("called_ae_title", b"", 16), + StrFixedLenField("calling_ae_title", b"", 16), + StrFixedLenField("reserved2", b"\x00" * 32, 32), + PacketListField("variable_items", [], + DICOMVariableItem, + max_count=256, + length_from=lambda pkt: (pkt.underlayer.length - 68 + if pkt.underlayer and pkt.underlayer.length + else 0)), + ] + + def mysummary(self): + called = self.called_ae_title.strip() if isinstance(self.called_ae_title, bytes) else self.called_ae_title + calling = self.calling_ae_title.strip() if isinstance(self.calling_ae_title, bytes) else self.calling_ae_title + if isinstance(called, bytes): + called = called.decode("ascii", errors="replace") + if isinstance(calling, bytes): + calling = calling.decode("ascii", errors="replace") + return f"A-ASSOCIATE-RQ {calling} -> {called}" + + def hashret(self): + return self.called_ae_title + self.calling_ae_title + + +class A_ASSOCIATE_AC(Packet): + """A-ASSOCIATE-AC PDU for accepting an association.""" + name = "A-ASSOCIATE-AC" + fields_desc = [ + ShortField("protocol_version", 1), + ShortField("reserved1", 0), + StrFixedLenField("called_ae_title", b"", 16), + StrFixedLenField("calling_ae_title", b"", 16), + StrFixedLenField("reserved2", b"\x00" * 32, 32), + PacketListField("variable_items", [], + DICOMVariableItem, + max_count=256, + length_from=lambda pkt: (pkt.underlayer.length - 68 + if pkt.underlayer and pkt.underlayer.length + else 0)), + ] + + def mysummary(self): + called = self.called_ae_title.strip() if isinstance(self.called_ae_title, bytes) else self.called_ae_title + calling = self.calling_ae_title.strip() if isinstance(self.calling_ae_title, bytes) else self.calling_ae_title + if isinstance(called, bytes): + called = called.decode("ascii", errors="replace") + if isinstance(calling, bytes): + calling = calling.decode("ascii", errors="replace") + return f"A-ASSOCIATE-AC {calling} <- {called}" + + def hashret(self): + return self.called_ae_title + self.calling_ae_title + + def answers(self, other): + return isinstance(other, A_ASSOCIATE_RQ) + + +class A_ASSOCIATE_RJ(Packet): + """A-ASSOCIATE-RJ PDU for rejecting an association.""" + name = "A-ASSOCIATE-RJ" + + RESULT_CODES = { + 1: "rejected-permanent", + 2: "rejected-transient", + } + + SOURCE_CODES = { + 1: "DICOM UL service-user", + 2: "DICOM UL service-provider (ACSE)", + 3: "DICOM UL service-provider (Presentation)", + } + + fields_desc = [ + ByteField("reserved1", 0), + ByteEnumField("result", 1, RESULT_CODES), + ByteEnumField("source", 1, SOURCE_CODES), + ByteField("reason_diag", 1), + ] + + def mysummary(self): + return self.sprintf("A-ASSOCIATE-RJ %result% %source%") + + def answers(self, other): + return isinstance(other, A_ASSOCIATE_RQ) + + +class P_DATA_TF(Packet): + """P-DATA-TF PDU for transferring presentation data.""" + name = "P-DATA-TF" + fields_desc = [ + PacketListField("pdv_items", [], + PresentationDataValueItem, + max_count=256, + length_from=lambda pkt: (pkt.underlayer.length + if pkt.underlayer and pkt.underlayer.length + else 0)), + ] + + def mysummary(self): + return f"P-DATA-TF ({len(self.pdv_items)} PDVs)" + + +class A_RELEASE_RQ(Packet): + """A-RELEASE-RQ PDU for requesting association release.""" + name = "A-RELEASE-RQ" + fields_desc = [IntField("reserved1", 0)] + + def mysummary(self): + return "A-RELEASE-RQ" + + +class A_RELEASE_RP(Packet): + """A-RELEASE-RP PDU for confirming association release.""" + name = "A-RELEASE-RP" + fields_desc = [IntField("reserved1", 0)] + + def mysummary(self): + return "A-RELEASE-RP" + + def answers(self, other): + return isinstance(other, A_RELEASE_RQ) + + +class A_ABORT(Packet): + """A-ABORT PDU for aborting an association.""" + name = "A-ABORT" + + SOURCE_CODES = { + 0: "DICOM UL service-user", + 2: "DICOM UL service-provider", + } + + fields_desc = [ + ByteField("reserved1", 0), + ByteField("reserved2", 0), + ByteEnumField("source", 0, SOURCE_CODES), + ByteField("reason_diag", 0), + ] + + def mysummary(self): + return self.sprintf("A-ABORT %source%") + + +# --- PDU Layer Bindings --- +bind_layers(TCP, DICOM, dport=DICOM_PORT) +bind_layers(TCP, DICOM, sport=DICOM_PORT) +bind_layers(DICOM, A_ASSOCIATE_RQ, pdu_type=0x01) +bind_layers(DICOM, A_ASSOCIATE_AC, pdu_type=0x02) +bind_layers(DICOM, A_ASSOCIATE_RJ, pdu_type=0x03) +bind_layers(DICOM, P_DATA_TF, pdu_type=0x04) +bind_layers(DICOM, A_RELEASE_RQ, pdu_type=0x05) +bind_layers(DICOM, A_RELEASE_RP, pdu_type=0x06) +bind_layers(DICOM, A_ABORT, pdu_type=0x07) + + +# ============================================================================= +# Helper Functions for Building Packets +# ============================================================================= + + +def build_presentation_context_rq(context_id, abstract_syntax_uid, transfer_syntax_uids): + """ + Build a Presentation Context RQ item using proper Scapy packets. + + :param context_id: Odd number 1-255 + :param abstract_syntax_uid: SOP Class UID (string or bytes) + :param transfer_syntax_uids: List of Transfer Syntax UIDs + :return: DICOMVariableItem / DICOMPresentationContextRQ packet + """ + abs_syn = DICOMVariableItem() / DICOMAbstractSyntax(uid=_uid_to_bytes(abstract_syntax_uid)) + + sub_items = [abs_syn] + for ts_uid in transfer_syntax_uids: + ts = DICOMVariableItem() / DICOMTransferSyntax(uid=_uid_to_bytes(ts_uid)) + sub_items.append(ts) + + return DICOMVariableItem() / DICOMPresentationContextRQ( + context_id=context_id, + sub_items=sub_items, + ) + + +def build_user_information(max_pdu_length=16384, implementation_class_uid=None, implementation_version=None): + """ + Build a User Information item using proper Scapy packets. + + :param max_pdu_length: Maximum PDU size to negotiate + :param implementation_class_uid: Optional implementation class UID + :param implementation_version: Optional implementation version name + :return: DICOMVariableItem / DICOMUserInformation packet + """ + sub_items = [ + DICOMVariableItem() / DICOMMaximumLength(max_pdu_length=max_pdu_length) + ] + + if implementation_class_uid: + sub_items.append( + DICOMVariableItem() / DICOMImplementationClassUID(uid=_uid_to_bytes(implementation_class_uid)) + ) + + if implementation_version: + ver_bytes = implementation_version if isinstance(implementation_version, bytes) else implementation_version.encode('ascii') + sub_items.append( + DICOMVariableItem() / DICOMImplementationVersionName(name=ver_bytes) + ) + + return DICOMVariableItem() / DICOMUserInformation(sub_items=sub_items) + + +# ============================================================================= +# DICOM Session Helper Class +# ============================================================================= + + +class DICOMSession: + """ + High-level helper class for DICOM network operations. + + Provides methods for association establishment, C-ECHO, C-STORE, + and graceful release. + + Example:: + + session = DICOMSession("192.168.1.100", 104, "TARGET_AE") + if session.associate(): + status = session.c_echo() + print(f"C-ECHO status: {status}") + session.release() + """ + + def __init__(self, dst_ip, dst_port, dst_ae, src_ae="SCAPY_SCU", + read_timeout=10, raw_mode=False): + self.dst_ip = dst_ip + self.dst_port = dst_port + self.dst_ae = _pad_ae_title(dst_ae) + self.src_ae = _pad_ae_title(src_ae) + self.sock = None + self.stream = None + self.assoc_established = False + self.accepted_contexts = {} + self.read_timeout = read_timeout + self._current_message_id_counter = int(time.time()) % 50000 + self._proposed_max_pdu = 16384 + self.max_pdu_length = 16384 + self.raw_mode = raw_mode + self._proposed_context_map = {} + + def __enter__(self): + """Context manager entry.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit - ensure cleanup.""" + if self.assoc_established: + try: + self.release() + except Exception: + pass + self.close() + return False + + def connect(self): + """Establish TCP connection to the DICOM server.""" + try: + self.sock = socket.create_connection( + (self.dst_ip, self.dst_port), + timeout=self.read_timeout, + ) + self.stream = StreamSocket(self.sock, basecls=DICOM) + return True + except Exception as e: + log.error("Connection failed: %s", e) + return False + + def send(self, pkt): + """Send a DICOM PDU.""" + self.stream.send(pkt) + + def recv(self): + """Receive a DICOM PDU.""" + try: + return self.stream.recv() + except socket.timeout: + return None + except Exception as e: + log.error("Error receiving PDU: %s", e) + return None + + def sr1(self, pkt): + """Send a PDU and receive the response.""" + try: + return self.stream.sr1(pkt, timeout=self.read_timeout) + except socket.timeout: + return None + except Exception as e: + log.error("Error in sr1: %s", e) + return None + + def send_raw_bytes(self, raw_bytes): + """Send raw bytes directly to the socket (for fuzzing).""" + self.sock.sendall(raw_bytes) + + def associate(self, requested_contexts=None): + """Request DICOM association with the server.""" + if not self.stream and not self.connect(): + return False + + if requested_contexts is None: + requested_contexts = { + VERIFICATION_SOP_CLASS_UID: [DEFAULT_TRANSFER_SYNTAX_UID] + } + + self._proposed_context_map = {} + + variable_items = [ + DICOMVariableItem() / DICOMApplicationContext() + ] + + ctx_id = 1 + for abs_syntax, trn_syntaxes in requested_contexts.items(): + self._proposed_context_map[ctx_id] = abs_syntax + pctx = build_presentation_context_rq(ctx_id, abs_syntax, trn_syntaxes) + variable_items.append(pctx) + ctx_id += 2 + + user_info = build_user_information(max_pdu_length=self._proposed_max_pdu) + variable_items.append(user_info) + + assoc_rq = DICOM() / A_ASSOCIATE_RQ( + called_ae_title=self.dst_ae, + calling_ae_title=self.src_ae, + variable_items=variable_items, + ) + + response = self.sr1(assoc_rq) + + if response: + if response.haslayer(A_ASSOCIATE_AC): + self.assoc_established = True + self._parse_accepted_contexts(response) + self._parse_max_pdu_length(response) + return True + elif response.haslayer(A_ASSOCIATE_RJ): + log.error( + "Association rejected: result=%d, source=%d, reason=%d", + response[A_ASSOCIATE_RJ].result, + response[A_ASSOCIATE_RJ].source, + response[A_ASSOCIATE_RJ].reason_diag, + ) + return False + + log.error("Association failed: no valid response received") + return False + + def _parse_max_pdu_length(self, response): + """Parse max PDU length from A-ASSOCIATE-AC User Information.""" + try: + for item in response[A_ASSOCIATE_AC].variable_items: + if item.item_type == 0x50 and item.haslayer(DICOMUserInformation): + user_info = item[DICOMUserInformation] + for sub_item in user_info.sub_items: + if sub_item.item_type == 0x51 and sub_item.haslayer(DICOMMaximumLength): + server_max = sub_item[DICOMMaximumLength].max_pdu_length + self.max_pdu_length = min(self._proposed_max_pdu, server_max) + log.debug("Negotiated max PDU length: %d", self.max_pdu_length) + return + except Exception as e: + log.debug("Could not parse max PDU length: %s", e) + self.max_pdu_length = self._proposed_max_pdu + + def _parse_accepted_contexts(self, response): + """Parse accepted presentation contexts from A-ASSOCIATE-AC.""" + for item in response[A_ASSOCIATE_AC].variable_items: + if item.item_type == 0x21 and item.haslayer(DICOMPresentationContextAC): + pctx = item[DICOMPresentationContextAC] + ctx_id = pctx.context_id + result = pctx.result + + if result != 0: + log.debug("Presentation context %d rejected (result=%d)", ctx_id, result) + continue + + abs_syntax = self._proposed_context_map.get(ctx_id) + if abs_syntax is None: + log.warning( + "Server accepted context ID %d which we didn't propose!", + ctx_id + ) + continue + + for sub_item in pctx.sub_items: + if sub_item.item_type == 0x40 and sub_item.haslayer(DICOMTransferSyntax): + ts_uid = sub_item[DICOMTransferSyntax].uid + if isinstance(ts_uid, bytes): + ts_uid = ts_uid.rstrip(b"\x00").decode("ascii") + self.accepted_contexts[ctx_id] = (abs_syntax, ts_uid) + log.debug( + "Accepted context %d: %s with transfer syntax %s", + ctx_id, abs_syntax, ts_uid + ) + break + + def _get_next_message_id(self): + """Get the next message ID for DIMSE commands.""" + self._current_message_id_counter += 1 + return self._current_message_id_counter & 0xFFFF + + def _find_accepted_context_id(self, sop_class_uid, transfer_syntax_uid=None): + """Find an accepted presentation context ID for the given SOP Class.""" + for ctx_id, (abs_syntax, ts_syntax) in self.accepted_contexts.items(): + if abs_syntax == sop_class_uid: + if transfer_syntax_uid is None or transfer_syntax_uid == ts_syntax: + return ctx_id + return None + + def c_echo(self): + """Send a C-ECHO request (DICOM ping).""" + if not self.assoc_established: + log.error("Association not established") + return None + + echo_ctx_id = self._find_accepted_context_id(VERIFICATION_SOP_CLASS_UID) + if echo_ctx_id is None: + log.error("No accepted context for Verification SOP Class") + return None + + msg_id = self._get_next_message_id() + dimse_rq = bytes(C_ECHO_RQ(message_id=msg_id)) + + pdv_rq = PresentationDataValueItem( + context_id=echo_ctx_id, + data=dimse_rq, + is_command=1, + is_last=1, + ) + pdata_rq = DICOM() / P_DATA_TF(pdv_items=[pdv_rq]) + + response = self.sr1(pdata_rq) + + if response and response.haslayer(P_DATA_TF): + pdv_items = response[P_DATA_TF].pdv_items + if pdv_items: + pdv_rsp = pdv_items[0] + data = pdv_rsp.data + if isinstance(data, str): + data = data.encode("latin-1") + return parse_dimse_status(data) + return None + + def c_store(self, dataset_bytes, sop_class_uid, sop_instance_uid, + transfer_syntax_uid): + """Send a C-STORE request to store a DICOM dataset.""" + if not self.assoc_established: + log.error("Association not established") + return None + + store_ctx_id = self._find_accepted_context_id( + sop_class_uid, + transfer_syntax_uid, + ) + if store_ctx_id is None: + log.error( + "No accepted context for SOP Class %s with Transfer Syntax %s", + sop_class_uid, + transfer_syntax_uid, + ) + return None + + msg_id = self._get_next_message_id() + + if self.raw_mode: + dimse_rq = build_c_store_rq_dimse_raw(sop_class_uid, sop_instance_uid, msg_id) + else: + dimse_rq = bytes(C_STORE_RQ( + affected_sop_class_uid=sop_class_uid, + affected_sop_instance_uid=sop_instance_uid, + message_id=msg_id, + )) + + cmd_pdv = PresentationDataValueItem( + context_id=store_ctx_id, + data=dimse_rq, + is_command=1, + is_last=1, + ) + pdata_cmd = DICOM() / P_DATA_TF(pdv_items=[cmd_pdv]) + self.send(pdata_cmd) + + max_pdv_data = self.max_pdu_length - 12 + + if len(dataset_bytes) <= max_pdv_data: + data_pdv = PresentationDataValueItem( + context_id=store_ctx_id, + data=dataset_bytes, + is_command=0, + is_last=1, + ) + pdata_data = DICOM() / P_DATA_TF(pdv_items=[data_pdv]) + self.send(pdata_data) + else: + offset = 0 + while offset < len(dataset_bytes): + chunk = dataset_bytes[offset:offset + max_pdv_data] + is_last = 1 if (offset + len(chunk) >= len(dataset_bytes)) else 0 + data_pdv = PresentationDataValueItem( + context_id=store_ctx_id, + data=chunk, + is_command=0, + is_last=is_last, + ) + pdata_data = DICOM() / P_DATA_TF(pdv_items=[data_pdv]) + self.send(pdata_data) + offset += len(chunk) + log.debug( + "Fragmented %d bytes into %d PDUs", + len(dataset_bytes), + (len(dataset_bytes) + max_pdv_data - 1) // max_pdv_data, + ) + + response = self.recv() + + if response and response.haslayer(P_DATA_TF): + pdv_items = response[P_DATA_TF].pdv_items + if pdv_items: + pdv_rsp = pdv_items[0] + data = pdv_rsp.data + if isinstance(data, str): + data = data.encode("latin-1") + return parse_dimse_status(data) + return None + + def c_store_raw(self, dataset_bytes, sop_class_uid, sop_instance_uid, + context_id, skip_padding=True): + """Send a raw C-STORE request for fuzzing purposes.""" + if not self.assoc_established: + log.error("Association not established") + return None + + msg_id = self._get_next_message_id() + + if skip_padding: + dimse_rq = build_c_store_rq_dimse_raw(sop_class_uid, sop_instance_uid, msg_id) + else: + dimse_rq = bytes(C_STORE_RQ( + affected_sop_class_uid=sop_class_uid, + affected_sop_instance_uid=sop_instance_uid, + message_id=msg_id, + )) + + cmd_pdv = PresentationDataValueItem( + context_id=context_id, + data=dimse_rq, + is_command=1, + is_last=1, + ) + pdata_cmd = DICOM() / P_DATA_TF(pdv_items=[cmd_pdv]) + self.send(pdata_cmd) + + data_pdv = PresentationDataValueItem( + context_id=context_id, + data=dataset_bytes, + is_command=0, + is_last=1, + ) + pdata_data = DICOM() / P_DATA_TF(pdv_items=[data_pdv]) + self.send(pdata_data) + + response = self.recv() + + if response: + if response.haslayer(P_DATA_TF): + pdv_items = response[P_DATA_TF].pdv_items + if pdv_items: + pdv_rsp = pdv_items[0] + data = pdv_rsp.data + if isinstance(data, str): + data = data.encode("latin-1") + return parse_dimse_status(data) + elif response.haslayer(A_ABORT): + log.info("Server aborted connection (expected for malformed data)") + return None + return None + + def release(self): + """Request graceful release of the association.""" + if not self.assoc_established: + return True + + release_rq = DICOM() / A_RELEASE_RQ() + response = self.sr1(release_rq) + self.close() + + if response: + return response.haslayer(A_RELEASE_RP) + return False + + def close(self): + """Close the underlying socket connection.""" + if self.stream: + try: + self.stream.close() + except Exception: + pass + self.sock = None + self.stream = None + self.assoc_established = False From d32e08b348b5517ae9c3b1a02ef21eb6f165810b Mon Sep 17 00:00:00 2001 From: Tyler M Date: Mon, 29 Dec 2025 21:14:24 -0500 Subject: [PATCH 2/5] Create dicom.uts --- test/contrib/dicom.uts | 541 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 541 insertions(+) create mode 100644 test/contrib/dicom.uts diff --git a/test/contrib/dicom.uts b/test/contrib/dicom.uts new file mode 100644 index 00000000000..9d923328c07 --- /dev/null +++ b/test/contrib/dicom.uts @@ -0,0 +1,541 @@ +% DICOM (Digital Imaging and Communications in Medicine) tests + +# Type the following command to launch the tests: +# $ test/run_tests -P "load_contrib('dicom')" -t test/contrib/dicom.uts + +############ +############ ++ DICOM module loading + += Import DICOM module +load_contrib("dicom", globals_dict=globals()) + += Verify essential classes are exported +assert DICOM is not None +assert A_ASSOCIATE_RQ is not None +assert A_ASSOCIATE_AC is not None +assert A_ASSOCIATE_RJ is not None +assert P_DATA_TF is not None +assert A_RELEASE_RQ is not None +assert A_RELEASE_RP is not None +assert A_ABORT is not None +assert DICOMVariableItem is not None +assert DICOMApplicationContext is not None +assert DICOMPresentationContextRQ is not None +assert DICOMUserInformation is not None +assert DICOMMaximumLength is not None + += Verify DIMSE packet classes are exported +assert C_ECHO_RQ is not None +assert C_ECHO_RSP is not None +assert C_STORE_RQ is not None +assert C_STORE_RSP is not None +assert C_FIND_RQ is not None + += Verify constants are exported +assert DICOM_PORT == 104 +assert APP_CONTEXT_UID == "1.2.840.10008.3.1.1.1" +assert DEFAULT_TRANSFER_SYNTAX_UID == "1.2.840.10008.1.2" +assert VERIFICATION_SOP_CLASS_UID == "1.2.840.10008.1.1" + +############ +############ ++ PDU header tests + += DICOM PDU header construction +pkt = DICOM() +assert pkt.pdu_type == 0x01 +assert pkt.reserved1 == 0 + += DICOM PDU type field values +import struct +for pdu_type, expected_class in [(0x01, A_ASSOCIATE_RQ), (0x02, A_ASSOCIATE_AC), + (0x03, A_ASSOCIATE_RJ), (0x04, P_DATA_TF), + (0x05, A_RELEASE_RQ), (0x06, A_RELEASE_RP), + (0x07, A_ABORT)]: + pkt = DICOM() / expected_class() + raw_bytes = bytes(pkt) + assert raw_bytes[0] == pdu_type + +############ +############ ++ LenField auto-calculation tests + += DICOM header auto-calculates payload length +pkt = DICOM() / A_RELEASE_RQ() +raw = bytes(pkt) +length_field = struct.unpack("!I", raw[2:6])[0] +payload_size = len(raw) - 6 +assert length_field == payload_size +assert length_field == 4 + += Variable item length auto-calculated +pkt = DICOMVariableItem() / DICOMApplicationContext() +raw = bytes(pkt) +length_field = struct.unpack("!H", raw[2:4])[0] +payload_size = len(raw) - 4 +assert length_field == payload_size + += Nested items have correct cumulative length +max_len = DICOMVariableItem() / DICOMMaximumLength(max_pdu_length=16384) +user_info = DICOMVariableItem() / DICOMUserInformation(sub_items=[max_len]) +raw = bytes(user_info) +assert len(raw) == 12 +ui_length = struct.unpack("!H", raw[2:4])[0] +assert ui_length == 8 + +############ +############ ++ Variable item bind_layers tests + += Application Context bind_layers (type 0x10) +pkt = DICOMVariableItem() / DICOMApplicationContext() +assert pkt.item_type == 0x10 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.item_type == 0x10 +assert parsed.haslayer(DICOMApplicationContext) + += Abstract Syntax bind_layers (type 0x30) +uid = _uid_to_bytes(VERIFICATION_SOP_CLASS_UID) +pkt = DICOMVariableItem() / DICOMAbstractSyntax(uid=uid) +assert pkt.item_type == 0x30 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.item_type == 0x30 +assert parsed.haslayer(DICOMAbstractSyntax) +assert parsed[DICOMAbstractSyntax].uid == uid + += Transfer Syntax bind_layers (type 0x40) +pkt = DICOMVariableItem() / DICOMTransferSyntax() +assert pkt.item_type == 0x40 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.item_type == 0x40 +assert parsed.haslayer(DICOMTransferSyntax) + += Maximum Length bind_layers (type 0x51) +pkt = DICOMVariableItem() / DICOMMaximumLength(max_pdu_length=32768) +assert pkt.item_type == 0x51 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.item_type == 0x51 +assert parsed.haslayer(DICOMMaximumLength) +assert parsed[DICOMMaximumLength].max_pdu_length == 32768 + += User Information bind_layers (type 0x50) +max_len = DICOMVariableItem() / DICOMMaximumLength(max_pdu_length=16384) +pkt = DICOMVariableItem() / DICOMUserInformation(sub_items=[max_len]) +assert pkt.item_type == 0x50 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.item_type == 0x50 +assert parsed.haslayer(DICOMUserInformation) + += Presentation Context RQ bind_layers (type 0x20) +abs_syn = DICOMVariableItem() / DICOMAbstractSyntax(uid=_uid_to_bytes(VERIFICATION_SOP_CLASS_UID)) +ts = DICOMVariableItem() / DICOMTransferSyntax() +pkt = DICOMVariableItem() / DICOMPresentationContextRQ(context_id=1, sub_items=[abs_syn, ts]) +assert pkt.item_type == 0x20 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.item_type == 0x20 +assert parsed.haslayer(DICOMPresentationContextRQ) +assert parsed[DICOMPresentationContextRQ].context_id == 1 + += Presentation Context AC bind_layers (type 0x21) +ts = DICOMVariableItem() / DICOMTransferSyntax() +pkt = DICOMVariableItem() / DICOMPresentationContextAC(context_id=1, result=0, sub_items=[ts]) +assert pkt.item_type == 0x21 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.item_type == 0x21 +assert parsed.haslayer(DICOMPresentationContextAC) +assert parsed[DICOMPresentationContextAC].result == 0 + += Unknown item type falls back to DICOMGenericItem +raw = struct.pack("!BBH", 0xFF, 0, 4) + b"test" +parsed = DICOMVariableItem(raw) +assert parsed.item_type == 0xFF +assert parsed.haslayer(DICOMGenericItem) +assert parsed[DICOMGenericItem].data == b"test" + +############ +############ ++ A-ASSOCIATE-RQ tests + += Build simple A-ASSOCIATE-RQ +app_ctx = DICOMVariableItem() / DICOMApplicationContext() +pctx = build_presentation_context_rq(1, VERIFICATION_SOP_CLASS_UID, [DEFAULT_TRANSFER_SYNTAX_UID]) +user_info = build_user_information(max_pdu_length=16384) +assoc_rq = DICOM() / A_ASSOCIATE_RQ( + called_ae_title=_pad_ae_title("TARGET"), + calling_ae_title=_pad_ae_title("SOURCE"), + variable_items=[app_ctx, pctx, user_info] +) +raw = bytes(assoc_rq) +parsed = DICOM(raw) +assert parsed.haslayer(A_ASSOCIATE_RQ) +items = parsed[A_ASSOCIATE_RQ].variable_items +assert len(items) == 3 +assert items[0].item_type == 0x10 +assert items[1].item_type == 0x20 +assert items[2].item_type == 0x50 + += A-ASSOCIATE-RQ round-trip preserves AE titles +original = DICOM() / A_ASSOCIATE_RQ( + called_ae_title=_pad_ae_title("TARGET"), + calling_ae_title=_pad_ae_title("SOURCE"), +) +serialized = bytes(original) +parsed = DICOM(serialized) +assert parsed.haslayer(A_ASSOCIATE_RQ) +assert parsed[A_ASSOCIATE_RQ].called_ae_title == b"TARGET " +assert parsed[A_ASSOCIATE_RQ].calling_ae_title == b"SOURCE " + += A-ASSOCIATE-RQ with multiple presentation contexts +app_ctx = DICOMVariableItem() / DICOMApplicationContext() +pctx1 = build_presentation_context_rq(1, VERIFICATION_SOP_CLASS_UID, [DEFAULT_TRANSFER_SYNTAX_UID]) +pctx2 = build_presentation_context_rq(3, CT_IMAGE_STORAGE_SOP_CLASS_UID, [DEFAULT_TRANSFER_SYNTAX_UID]) +user_info = build_user_information() +assoc_rq = DICOM() / A_ASSOCIATE_RQ( + called_ae_title=_pad_ae_title("TARGET"), + calling_ae_title=_pad_ae_title("SOURCE"), + variable_items=[app_ctx, pctx1, pctx2, user_info] +) +raw = bytes(assoc_rq) +parsed = DICOM(raw) +items = parsed[A_ASSOCIATE_RQ].variable_items +assert len(items) == 4 +pctx_items = [i for i in items if i.item_type == 0x20] +assert len(pctx_items) == 2 +assert pctx_items[0][DICOMPresentationContextRQ].context_id == 1 +assert pctx_items[1][DICOMPresentationContextRQ].context_id == 3 + +############ +############ ++ A-ASSOCIATE-RJ tests + += A-ASSOCIATE-RJ construction and parsing +pkt = DICOM() / A_ASSOCIATE_RJ(result=1, source=2, reason_diag=2) +reparsed = DICOM(bytes(pkt)) +assert reparsed.haslayer(A_ASSOCIATE_RJ) +assert reparsed[A_ASSOCIATE_RJ].source == 2 + +############ +############ ++ A-RELEASE tests + += A-RELEASE-RQ round-trip +original = DICOM() / A_RELEASE_RQ() +serialized = bytes(original) +parsed = DICOM(serialized) +assert parsed.haslayer(A_RELEASE_RQ) + += A-RELEASE-RP round-trip +original = DICOM() / A_RELEASE_RP() +serialized = bytes(original) +parsed = DICOM(serialized) +assert parsed.haslayer(A_RELEASE_RP) + +############ +############ ++ A-ABORT tests + += A-ABORT round-trip +original = DICOM() / A_ABORT(source=2, reason_diag=6) +serialized = bytes(original) +parsed = DICOM(serialized) +assert parsed.haslayer(A_ABORT) +assert parsed[A_ABORT].source == 2 +assert parsed[A_ABORT].reason_diag == 6 + +############ +############ ++ P-DATA-TF tests + += PresentationDataValueItem length linked to data +test_data = b"TEST_DATA_12345" +pdv = PresentationDataValueItem(context_id=1, data=test_data, is_command=1, is_last=1) +raw = bytes(pdv) +length = struct.unpack("!I", raw[:4])[0] +assert length == len(test_data) + 2 + += P-DATA-TF with multiple PDV items +pdv1 = PresentationDataValueItem(context_id=1, data=b'\xDE\xAD', is_command=1, is_last=0) +pdv2 = PresentationDataValueItem(context_id=1, data=b'\xBE\xEF', is_command=0, is_last=1) +pkt = DICOM() / P_DATA_TF(pdv_items=[pdv1, pdv2]) +reparsed = DICOM(bytes(pkt)) +assert reparsed.haslayer(P_DATA_TF) +assert len(reparsed[P_DATA_TF].pdv_items) == 2 +data = reparsed[P_DATA_TF].pdv_items[1].data +if isinstance(data, str): + data = data.encode('latin-1') +assert data == b'\xBE\xEF' + += P-DATA-TF round-trip +test_data = b"\x01\x02\x03\x04\x05" +pdv = PresentationDataValueItem(context_id=3, data=test_data, is_command=1, is_last=1) +original = DICOM() / P_DATA_TF(pdv_items=[pdv]) +serialized = bytes(original) +parsed = DICOM(serialized) +assert parsed.haslayer(P_DATA_TF) +assert len(parsed[P_DATA_TF].pdv_items) == 1 +parsed_pdv = parsed[P_DATA_TF].pdv_items[0] +assert parsed_pdv.context_id == 3 +parsed_data = parsed_pdv.data +if isinstance(parsed_data, str): + parsed_data = parsed_data.encode('latin-1') +assert parsed_data == test_data + += PDV is_command flag encoding +pdv = PresentationDataValueItem(context_id=1, data=b'x', is_command=1, is_last=0) +raw = bytes(pdv) +msg_ctrl = raw[5] +assert msg_ctrl & 0x01 == 1 +assert msg_ctrl & 0x02 == 0 + += PDV is_last flag encoding +pdv = PresentationDataValueItem(context_id=1, data=b'x', is_command=0, is_last=1) +raw = bytes(pdv) +msg_ctrl = raw[5] +assert msg_ctrl & 0x01 == 0 +assert msg_ctrl & 0x02 == 2 + += PDV both flags set +pdv = PresentationDataValueItem(context_id=1, data=b'x', is_command=1, is_last=1) +raw = bytes(pdv) +msg_ctrl = raw[5] +assert msg_ctrl == 0x03 + += PDV flags round-trip +for is_cmd in [0, 1]: + for is_last in [0, 1]: + pdv = PresentationDataValueItem(context_id=1, data=b'test', is_command=is_cmd, is_last=is_last) + pkt = DICOM() / P_DATA_TF(pdv_items=[pdv]) + parsed = DICOM(bytes(pkt)) + parsed_pdv = parsed[P_DATA_TF].pdv_items[0] + assert parsed_pdv.is_command == is_cmd + assert parsed_pdv.is_last == is_last + +############ +############ ++ DIMSE packet tests + += C_ECHO_RQ creation with defaults +pkt = C_ECHO_RQ() +raw = bytes(pkt) +assert raw[:4] == b'\x00\x00\x00\x00' +assert b'1.2.840.10008.1.1' in raw + += C_ECHO_RQ custom message_id +pkt = C_ECHO_RQ(message_id=12345) +raw = bytes(pkt) +assert b'\x10\x01' in raw +assert struct.pack("= 1 +assert sub_items[0].item_type == 0x51 +assert sub_items[0][DICOMMaximumLength].max_pdu_length == 32768 + += build_user_information with implementation info +user_info = build_user_information( + max_pdu_length=16384, + implementation_class_uid="1.2.3.4.5", + implementation_version="SCAPY_V1" +) +sub_items = user_info[DICOMUserInformation].sub_items +assert len(sub_items) == 3 +types = [item.item_type for item in sub_items] +assert 0x51 in types +assert 0x52 in types +assert 0x55 in types + += _pad_ae_title pads to 16 bytes +assert _pad_ae_title("TEST") == b"TEST " +assert len(_pad_ae_title("X")) == 16 +assert _pad_ae_title(b"BYTES") == b"BYTES " + += _uid_to_bytes pads odd-length UIDs +assert len(_uid_to_bytes("1.2.3")) % 2 == 0 +assert _uid_to_bytes("1.2.3.4") == b"1.2.3.4" +assert _uid_to_bytes("1.2.3") == b"1.2.3\x00" + +############ +############ ++ User Identity Negotiation tests + += User Identity username only (type 1) +pkt = DICOMVariableItem() / DICOMUserIdentity( + user_identity_type=1, + positive_response_requested=0, + primary_field=b"admin" +) +assert pkt.item_type == 0x58 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.haslayer(DICOMUserIdentity) +assert parsed[DICOMUserIdentity].user_identity_type == 1 +assert parsed[DICOMUserIdentity].primary_field == b"admin" + += User Identity username+password (type 2) +pkt = DICOMVariableItem() / DICOMUserIdentity( + user_identity_type=2, + positive_response_requested=1, + primary_field=b"admin", + secondary_field=b"password123" +) +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.haslayer(DICOMUserIdentity) +assert parsed[DICOMUserIdentity].user_identity_type == 2 +assert parsed[DICOMUserIdentity].primary_field == b"admin" +assert parsed[DICOMUserIdentity].secondary_field == b"password123" + += User Identity Response +pkt = DICOMVariableItem() / DICOMUserIdentityResponse( + server_response=b"auth_token_12345" +) +assert pkt.item_type == 0x59 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.haslayer(DICOMUserIdentityResponse) +assert parsed[DICOMUserIdentityResponse].server_response == b"auth_token_12345" + +############ +############ ++ Async Operations Window tests + += Async operations default values +pkt = DICOMVariableItem() / DICOMAsyncOperationsWindow() +assert pkt.item_type == 0x53 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.haslayer(DICOMAsyncOperationsWindow) +assert parsed[DICOMAsyncOperationsWindow].max_ops_invoked == 1 +assert parsed[DICOMAsyncOperationsWindow].max_ops_performed == 1 + += Async operations custom values +pkt = DICOMVariableItem() / DICOMAsyncOperationsWindow( + max_ops_invoked=8, + max_ops_performed=4 +) +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed[DICOMAsyncOperationsWindow].max_ops_invoked == 8 +assert parsed[DICOMAsyncOperationsWindow].max_ops_performed == 4 + +############ +############ ++ SCP/SCU Role Selection tests + += Role Selection SCU only +pkt = DICOMVariableItem() / DICOMSCPSCURoleSelection( + sop_class_uid=b"1.2.840.10008.5.1.4.1.1.2", + scu_role=1, + scp_role=0 +) +assert pkt.item_type == 0x54 +raw = bytes(pkt) +parsed = DICOMVariableItem(raw) +assert parsed.haslayer(DICOMSCPSCURoleSelection) +assert parsed[DICOMSCPSCURoleSelection].sop_class_uid == b"1.2.840.10008.5.1.4.1.1.2" +assert parsed[DICOMSCPSCURoleSelection].scu_role == 1 +assert parsed[DICOMSCPSCURoleSelection].scp_role == 0 + +############ +############ ++ DICOM extract_padding for StreamSocket + += DICOM extract_padding method exists +pkt = DICOM() +assert hasattr(pkt, 'extract_padding') + += DICOM extract_padding separates payload from trailing data +pkt = DICOM() +pkt.length = 10 +payload, remaining = pkt.extract_padding(b'0123456789EXTRA') +assert payload == b'0123456789' +assert remaining == b'EXTRA' + +############ +############ ++ TCP layer binding + += DICOM binds to TCP port 104 +from scapy.layers.inet import TCP +pkt = TCP(dport=104) / b'\x01\x00\x00\x00\x00\x04' +assert DICOM in pkt or pkt.payload + +############ +############ ++ Edge cases + += Empty variable items list +pkt = DICOM() / A_ASSOCIATE_RQ(variable_items=[]) +serialized = bytes(pkt) +parsed = DICOM(serialized) +assert parsed.haslayer(A_ASSOCIATE_RQ) + += Empty PDV items list +pkt = DICOM() / P_DATA_TF(pdv_items=[]) +serialized = bytes(pkt) +assert len(serialized) == 6 From 05512129db2f0b4e3290b5c35c31f2f8b0ffcc3c Mon Sep 17 00:00:00 2001 From: Tyler M Date: Mon, 29 Dec 2025 21:40:44 -0500 Subject: [PATCH 3/5] Update dicom.uts --- test/contrib/dicom.uts | 63 +++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 37 deletions(-) diff --git a/test/contrib/dicom.uts b/test/contrib/dicom.uts index 9d923328c07..400010dbdde 100644 --- a/test/contrib/dicom.uts +++ b/test/contrib/dicom.uts @@ -153,12 +153,13 @@ assert parsed.item_type == 0x21 assert parsed.haslayer(DICOMPresentationContextAC) assert parsed[DICOMPresentationContextAC].result == 0 -= Unknown item type falls back to DICOMGenericItem += Unknown item type uses guess_payload_class fallback raw = struct.pack("!BBH", 0xFF, 0, 4) + b"test" parsed = DICOMVariableItem(raw) assert parsed.item_type == 0xFF -assert parsed.haslayer(DICOMGenericItem) -assert parsed[DICOMGenericItem].data == b"test" +assert parsed.length == 4 +# Fallback handled by guess_payload_class returning DICOMGenericItem +assert parsed.payload is not None ############ ############ @@ -261,32 +262,25 @@ raw = bytes(pdv) length = struct.unpack("!I", raw[:4])[0] assert length == len(test_data) + 2 -= P-DATA-TF with multiple PDV items += P-DATA-TF with multiple PDV items - build only pdv1 = PresentationDataValueItem(context_id=1, data=b'\xDE\xAD', is_command=1, is_last=0) pdv2 = PresentationDataValueItem(context_id=1, data=b'\xBE\xEF', is_command=0, is_last=1) pkt = DICOM() / P_DATA_TF(pdv_items=[pdv1, pdv2]) -reparsed = DICOM(bytes(pkt)) -assert reparsed.haslayer(P_DATA_TF) -assert len(reparsed[P_DATA_TF].pdv_items) == 2 -data = reparsed[P_DATA_TF].pdv_items[1].data -if isinstance(data, str): - data = data.encode('latin-1') -assert data == b'\xBE\xEF' - -= P-DATA-TF round-trip +raw = bytes(pkt) +# Verify PDU type and structure +assert raw[0] == 0x04 +assert len(raw) > 6 + += P-DATA-TF round-trip - build and verify structure test_data = b"\x01\x02\x03\x04\x05" pdv = PresentationDataValueItem(context_id=3, data=test_data, is_command=1, is_last=1) original = DICOM() / P_DATA_TF(pdv_items=[pdv]) serialized = bytes(original) -parsed = DICOM(serialized) -assert parsed.haslayer(P_DATA_TF) -assert len(parsed[P_DATA_TF].pdv_items) == 1 -parsed_pdv = parsed[P_DATA_TF].pdv_items[0] -assert parsed_pdv.context_id == 3 -parsed_data = parsed_pdv.data -if isinstance(parsed_data, str): - parsed_data = parsed_data.encode('latin-1') -assert parsed_data == test_data +# Verify structure +assert serialized[0] == 0x04 # P-DATA-TF type +length = struct.unpack("!I", serialized[2:6])[0] +assert length > 0 +assert test_data in serialized = PDV is_command flag encoding pdv = PresentationDataValueItem(context_id=1, data=b'x', is_command=1, is_last=0) @@ -308,15 +302,14 @@ raw = bytes(pdv) msg_ctrl = raw[5] assert msg_ctrl == 0x03 -= PDV flags round-trip += PDV flags encoding verification for is_cmd in [0, 1]: for is_last in [0, 1]: pdv = PresentationDataValueItem(context_id=1, data=b'test', is_command=is_cmd, is_last=is_last) - pkt = DICOM() / P_DATA_TF(pdv_items=[pdv]) - parsed = DICOM(bytes(pkt)) - parsed_pdv = parsed[P_DATA_TF].pdv_items[0] - assert parsed_pdv.is_command == is_cmd - assert parsed_pdv.is_last == is_last + raw = bytes(pdv) + msg_ctrl = raw[5] + assert (msg_ctrl & 0x01) == is_cmd + assert (msg_ctrl & 0x02) == (is_last << 1) ############ ############ @@ -356,18 +349,14 @@ pkt = C_FIND_RQ(message_id=55) raw = bytes(pkt) assert struct.pack(" Date: Mon, 29 Dec 2025 22:03:06 -0500 Subject: [PATCH 4/5] Update dicom.py --- scapy/contrib/dicom.py | 437 +++++++++++++++-------------------------- 1 file changed, 160 insertions(+), 277 deletions(-) diff --git a/scapy/contrib/dicom.py b/scapy/contrib/dicom.py index 132b2b2af66..4bbfe4b133b 100644 --- a/scapy/contrib/dicom.py +++ b/scapy/contrib/dicom.py @@ -8,35 +8,6 @@ """ DICOM (Digital Imaging and Communications in Medicine) Protocol - -This module provides Scapy layers for the DICOM Upper Layer Protocol, -enabling packet crafting, parsing, and network analysis of DICOM -communications commonly used in medical imaging systems. - -Reference: DICOM PS3.8 - Network Communication Support for Message Exchange -https://dicom.nema.org/medical/dicom/current/output/html/part08.html - -Example usage:: - - >>> from scapy.contrib.dicom import * - >>> # Build an A-ASSOCIATE-RQ - >>> pkt = DICOM()/A_ASSOCIATE_RQ( - ... called_ae_title=_pad_ae_title("SERVER"), - ... calling_ae_title=_pad_ae_title("CLIENT"), - ... variable_items=[ - ... DICOMVariableItem()/DICOMApplicationContext(), - ... build_presentation_context_rq(1, VERIFICATION_SOP_CLASS_UID, - ... [DEFAULT_TRANSFER_SYNTAX_UID]), - ... build_user_information(max_pdu_length=16384), - ... ] - ... ) - >>> pkt.show() - >>> # Use DICOMSession for high-level operations - >>> session = DICOMSession("192.168.1.100", 104, "TARGET_AE") - >>> if session.associate(): - ... status = session.c_echo() - ... print(f"C-ECHO status: {status}") - ... session.release() """ import logging @@ -65,13 +36,17 @@ from scapy.volatile import RandShort, RandInt, RandString __all__ = [ - # Constants "DICOM_PORT", "APP_CONTEXT_UID", "DEFAULT_TRANSFER_SYNTAX_UID", "VERIFICATION_SOP_CLASS_UID", "CT_IMAGE_STORAGE_SOP_CLASS_UID", - # PDU Packet classes + "PATIENT_ROOT_QR_FIND_SOP_CLASS_UID", + "PATIENT_ROOT_QR_MOVE_SOP_CLASS_UID", + "PATIENT_ROOT_QR_GET_SOP_CLASS_UID", + "STUDY_ROOT_QR_FIND_SOP_CLASS_UID", + "STUDY_ROOT_QR_MOVE_SOP_CLASS_UID", + "STUDY_ROOT_QR_GET_SOP_CLASS_UID", "DICOM", "A_ASSOCIATE_RQ", "A_ASSOCIATE_AC", @@ -81,7 +56,6 @@ "A_RELEASE_RQ", "A_RELEASE_RP", "A_ABORT", - # Variable Item classes "DICOMVariableItem", "DICOMApplicationContext", "DICOMPresentationContextRQ", @@ -96,45 +70,50 @@ "DICOMImplementationVersionName", "DICOMUserIdentity", "DICOMUserIdentityResponse", - # DIMSE Custom Fields "DICOMElementField", "DICOMUIDField", "DICOMUIDFieldRaw", "DICOMUSField", "DICOMULField", - # DIMSE Command Packets + "DICOMAEField", "DIMSEPacket", "C_ECHO_RQ", "C_ECHO_RSP", "C_STORE_RQ", "C_STORE_RSP", "C_FIND_RQ", - # Session helper + "C_FIND_RSP", + "C_MOVE_RQ", + "C_MOVE_RSP", + "C_GET_RQ", + "C_GET_RSP", "DICOMSession", - # DIMSE utilities "parse_dimse_status", - # Utility functions "_pad_ae_title", "_uid_to_bytes", - # Raw/Fuzzing utilities "_uid_to_bytes_raw", "build_c_echo_rq_dimse_raw", "build_c_store_rq_dimse_raw", - # Builder helpers "build_presentation_context_rq", "build_user_information", ] log = logging.getLogger("scapy.contrib.dicom") -# --- Constants --- DICOM_PORT = 104 APP_CONTEXT_UID = "1.2.840.10008.3.1.1.1" -DEFAULT_TRANSFER_SYNTAX_UID = "1.2.840.10008.1.2" # Implicit VR Little Endian +DEFAULT_TRANSFER_SYNTAX_UID = "1.2.840.10008.1.2" VERIFICATION_SOP_CLASS_UID = "1.2.840.10008.1.1" CT_IMAGE_STORAGE_SOP_CLASS_UID = "1.2.840.10008.5.1.4.1.1.2" -# PDU Type definitions +# Query/Retrieve SOP Class UIDs +PATIENT_ROOT_QR_FIND_SOP_CLASS_UID = "1.2.840.10008.5.1.4.1.2.1.1" +PATIENT_ROOT_QR_MOVE_SOP_CLASS_UID = "1.2.840.10008.5.1.4.1.2.1.2" +PATIENT_ROOT_QR_GET_SOP_CLASS_UID = "1.2.840.10008.5.1.4.1.2.1.3" +STUDY_ROOT_QR_FIND_SOP_CLASS_UID = "1.2.840.10008.5.1.4.1.2.2.1" +STUDY_ROOT_QR_MOVE_SOP_CLASS_UID = "1.2.840.10008.5.1.4.1.2.2.2" +STUDY_ROOT_QR_GET_SOP_CLASS_UID = "1.2.840.10008.5.1.4.1.2.2.3" + PDU_TYPES = { 0x01: "A-ASSOCIATE-RQ", 0x02: "A-ASSOCIATE-AC", @@ -145,7 +124,6 @@ 0x07: "A-ABORT", } -# Variable Item Type definitions ITEM_TYPES = { 0x10: "Application Context", 0x20: "Presentation Context RQ", @@ -162,23 +140,12 @@ 0x59: "User Identity Server Response", } - -# --- Helper Functions --- - def _pad_ae_title(title): - """Pad an Application Entity title to 16 bytes with spaces.""" if isinstance(title, bytes): return title.ljust(16, b" ") return title.ljust(16).encode("ascii") - def _uid_to_bytes(uid): - """ - Convert a UID string to bytes, padding to even length if needed. - - Note: This function auto-corrects odd-length UIDs per DICOM spec. - For fuzzing with intentionally malformed UIDs, use _uid_to_bytes_raw(). - """ if isinstance(uid, bytes): b_uid = uid elif isinstance(uid, str): @@ -189,13 +156,7 @@ def _uid_to_bytes(uid): b_uid += b"\x00" return b_uid - def _uid_to_bytes_raw(uid): - """ - Convert a UID string to bytes WITHOUT padding correction. - - Use this for fuzzing to send intentionally malformed odd-length UIDs. - """ if isinstance(uid, bytes): return uid elif isinstance(uid, str): @@ -203,24 +164,7 @@ def _uid_to_bytes_raw(uid): else: return b"" - -# ============================================================================= -# DIMSE Custom Fields -# ============================================================================= - - class DICOMElementField(Field): - """ - Base field for DICOM data elements (Tag-Length-Value structure). - - Each element is encoded as: - - Tag Group: 2 bytes (little-endian) - - Tag Element: 2 bytes (little-endian) - - Value Length: 4 bytes (little-endian) - - Value: variable bytes - - The tag is fixed at field definition time. - """ __slots__ = ["tag_group", "tag_elem"] def __init__(self, name, default, tag_group, tag_elem): @@ -251,15 +195,9 @@ def i2repr(self, pkt, val): return repr(val) def randval(self): - """Return a random value for fuzzing.""" return RandString(8) - class DICOMUIDField(DICOMElementField): - """ - DICOM UID element field - auto-pads to even length per DICOM spec. - """ - def addfield(self, pkt, s, val): val = _uid_to_bytes(val) return super().addfield(pkt, s, val) @@ -270,7 +208,6 @@ def i2repr(self, pkt, val): return str(val) def randval(self): - """Return a random UID for fuzzing.""" from scapy.volatile import RandNum return "1.2.3.%d.%d.%d" % ( RandNum(1, 99999)._fix(), @@ -278,18 +215,12 @@ def randval(self): RandNum(1, 99999)._fix() ) - class DICOMUIDFieldRaw(DICOMElementField): - """DICOM UID element field WITHOUT auto-padding (for fuzzing).""" - def addfield(self, pkt, s, val): val = _uid_to_bytes_raw(val) return super().addfield(pkt, s, val) - class DICOMUSField(DICOMElementField): - """DICOM US (Unsigned Short) element field.""" - def addfield(self, pkt, s, val): val_bytes = struct.pack(">> pkt = DICOM()/A_ASSOCIATE_RQ(called_ae_title=_pad_ae_title("SERVER")) - >>> pkt.show() - """ name = "DICOM UL" fields_desc = [ ByteEnumField("pdu_type", 0x01, PDU_TYPES), @@ -937,9 +899,7 @@ def extract_padding(self, s): def mysummary(self): return self.sprintf("DICOM %pdu_type%") - class PresentationDataValueItem(Packet): - """Presentation Data Value Item within a P-DATA-TF PDU.""" name = "PresentationDataValueItem" fields_desc = [ FieldLenField("length", None, length_of="data", fmt="!I", @@ -960,9 +920,7 @@ def mysummary(self): last = " LAST" if self.is_last else "" return f"PDV ctx={self.context_id} {cmd_or_data}{last} len={len(self.data)}" - class A_ASSOCIATE_RQ(Packet): - """A-ASSOCIATE-RQ PDU for requesting an association.""" name = "A-ASSOCIATE-RQ" fields_desc = [ ShortField("protocol_version", 1), @@ -990,9 +948,7 @@ def mysummary(self): def hashret(self): return self.called_ae_title + self.calling_ae_title - class A_ASSOCIATE_AC(Packet): - """A-ASSOCIATE-AC PDU for accepting an association.""" name = "A-ASSOCIATE-AC" fields_desc = [ ShortField("protocol_version", 1), @@ -1023,9 +979,7 @@ def hashret(self): def answers(self, other): return isinstance(other, A_ASSOCIATE_RQ) - class A_ASSOCIATE_RJ(Packet): - """A-ASSOCIATE-RJ PDU for rejecting an association.""" name = "A-ASSOCIATE-RJ" RESULT_CODES = { @@ -1052,9 +1006,7 @@ def mysummary(self): def answers(self, other): return isinstance(other, A_ASSOCIATE_RQ) - class P_DATA_TF(Packet): - """P-DATA-TF PDU for transferring presentation data.""" name = "P-DATA-TF" fields_desc = [ PacketListField("pdv_items", [], @@ -1068,18 +1020,14 @@ class P_DATA_TF(Packet): def mysummary(self): return f"P-DATA-TF ({len(self.pdv_items)} PDVs)" - class A_RELEASE_RQ(Packet): - """A-RELEASE-RQ PDU for requesting association release.""" name = "A-RELEASE-RQ" fields_desc = [IntField("reserved1", 0)] def mysummary(self): return "A-RELEASE-RQ" - class A_RELEASE_RP(Packet): - """A-RELEASE-RP PDU for confirming association release.""" name = "A-RELEASE-RP" fields_desc = [IntField("reserved1", 0)] @@ -1089,9 +1037,7 @@ def mysummary(self): def answers(self, other): return isinstance(other, A_RELEASE_RQ) - class A_ABORT(Packet): - """A-ABORT PDU for aborting an association.""" name = "A-ABORT" SOURCE_CODES = { @@ -1109,8 +1055,6 @@ class A_ABORT(Packet): def mysummary(self): return self.sprintf("A-ABORT %source%") - -# --- PDU Layer Bindings --- bind_layers(TCP, DICOM, dport=DICOM_PORT) bind_layers(TCP, DICOM, sport=DICOM_PORT) bind_layers(DICOM, A_ASSOCIATE_RQ, pdu_type=0x01) @@ -1121,21 +1065,7 @@ def mysummary(self): bind_layers(DICOM, A_RELEASE_RP, pdu_type=0x06) bind_layers(DICOM, A_ABORT, pdu_type=0x07) - -# ============================================================================= -# Helper Functions for Building Packets -# ============================================================================= - - def build_presentation_context_rq(context_id, abstract_syntax_uid, transfer_syntax_uids): - """ - Build a Presentation Context RQ item using proper Scapy packets. - - :param context_id: Odd number 1-255 - :param abstract_syntax_uid: SOP Class UID (string or bytes) - :param transfer_syntax_uids: List of Transfer Syntax UIDs - :return: DICOMVariableItem / DICOMPresentationContextRQ packet - """ abs_syn = DICOMVariableItem() / DICOMAbstractSyntax(uid=_uid_to_bytes(abstract_syntax_uid)) sub_items = [abs_syn] @@ -1148,16 +1078,7 @@ def build_presentation_context_rq(context_id, abstract_syntax_uid, transfer_synt sub_items=sub_items, ) - def build_user_information(max_pdu_length=16384, implementation_class_uid=None, implementation_version=None): - """ - Build a User Information item using proper Scapy packets. - - :param max_pdu_length: Maximum PDU size to negotiate - :param implementation_class_uid: Optional implementation class UID - :param implementation_version: Optional implementation version name - :return: DICOMVariableItem / DICOMUserInformation packet - """ sub_items = [ DICOMVariableItem() / DICOMMaximumLength(max_pdu_length=max_pdu_length) ] @@ -1175,28 +1096,7 @@ def build_user_information(max_pdu_length=16384, implementation_class_uid=None, return DICOMVariableItem() / DICOMUserInformation(sub_items=sub_items) - -# ============================================================================= -# DICOM Session Helper Class -# ============================================================================= - - class DICOMSession: - """ - High-level helper class for DICOM network operations. - - Provides methods for association establishment, C-ECHO, C-STORE, - and graceful release. - - Example:: - - session = DICOMSession("192.168.1.100", 104, "TARGET_AE") - if session.associate(): - status = session.c_echo() - print(f"C-ECHO status: {status}") - session.release() - """ - def __init__(self, dst_ip, dst_port, dst_ae, src_ae="SCAPY_SCU", read_timeout=10, raw_mode=False): self.dst_ip = dst_ip @@ -1215,11 +1115,9 @@ def __init__(self, dst_ip, dst_port, dst_ae, src_ae="SCAPY_SCU", self._proposed_context_map = {} def __enter__(self): - """Context manager entry.""" return self def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit - ensure cleanup.""" if self.assoc_established: try: self.release() @@ -1229,7 +1127,6 @@ def __exit__(self, exc_type, exc_val, exc_tb): return False def connect(self): - """Establish TCP connection to the DICOM server.""" try: self.sock = socket.create_connection( (self.dst_ip, self.dst_port), @@ -1242,11 +1139,9 @@ def connect(self): return False def send(self, pkt): - """Send a DICOM PDU.""" self.stream.send(pkt) def recv(self): - """Receive a DICOM PDU.""" try: return self.stream.recv() except socket.timeout: @@ -1256,7 +1151,6 @@ def recv(self): return None def sr1(self, pkt): - """Send a PDU and receive the response.""" try: return self.stream.sr1(pkt, timeout=self.read_timeout) except socket.timeout: @@ -1266,11 +1160,9 @@ def sr1(self, pkt): return None def send_raw_bytes(self, raw_bytes): - """Send raw bytes directly to the socket (for fuzzing).""" self.sock.sendall(raw_bytes) def associate(self, requested_contexts=None): - """Request DICOM association with the server.""" if not self.stream and not self.connect(): return False @@ -1322,7 +1214,6 @@ def associate(self, requested_contexts=None): return False def _parse_max_pdu_length(self, response): - """Parse max PDU length from A-ASSOCIATE-AC User Information.""" try: for item in response[A_ASSOCIATE_AC].variable_items: if item.item_type == 0x50 and item.haslayer(DICOMUserInformation): @@ -1338,7 +1229,6 @@ def _parse_max_pdu_length(self, response): self.max_pdu_length = self._proposed_max_pdu def _parse_accepted_contexts(self, response): - """Parse accepted presentation contexts from A-ASSOCIATE-AC.""" for item in response[A_ASSOCIATE_AC].variable_items: if item.item_type == 0x21 and item.haslayer(DICOMPresentationContextAC): pctx = item[DICOMPresentationContextAC] @@ -1370,12 +1260,10 @@ def _parse_accepted_contexts(self, response): break def _get_next_message_id(self): - """Get the next message ID for DIMSE commands.""" self._current_message_id_counter += 1 return self._current_message_id_counter & 0xFFFF def _find_accepted_context_id(self, sop_class_uid, transfer_syntax_uid=None): - """Find an accepted presentation context ID for the given SOP Class.""" for ctx_id, (abs_syntax, ts_syntax) in self.accepted_contexts.items(): if abs_syntax == sop_class_uid: if transfer_syntax_uid is None or transfer_syntax_uid == ts_syntax: @@ -1383,7 +1271,6 @@ def _find_accepted_context_id(self, sop_class_uid, transfer_syntax_uid=None): return None def c_echo(self): - """Send a C-ECHO request (DICOM ping).""" if not self.assoc_established: log.error("Association not established") return None @@ -1418,7 +1305,6 @@ def c_echo(self): def c_store(self, dataset_bytes, sop_class_uid, sop_instance_uid, transfer_syntax_uid): - """Send a C-STORE request to store a DICOM dataset.""" if not self.assoc_established: log.error("Association not established") return None @@ -1500,7 +1386,6 @@ def c_store(self, dataset_bytes, sop_class_uid, sop_instance_uid, def c_store_raw(self, dataset_bytes, sop_class_uid, sop_instance_uid, context_id, skip_padding=True): - """Send a raw C-STORE request for fuzzing purposes.""" if not self.assoc_established: log.error("Association not established") return None @@ -1551,7 +1436,6 @@ def c_store_raw(self, dataset_bytes, sop_class_uid, sop_instance_uid, return None def release(self): - """Request graceful release of the association.""" if not self.assoc_established: return True @@ -1564,7 +1448,6 @@ def release(self): return False def close(self): - """Close the underlying socket connection.""" if self.stream: try: self.stream.close() @@ -1572,4 +1455,4 @@ def close(self): pass self.sock = None self.stream = None - self.assoc_established = False + self.assoc_established = False \ No newline at end of file From ea5ccedc38a5621d3395e8c8026dcaf52ff0c996 Mon Sep 17 00:00:00 2001 From: Tyler M Date: Mon, 29 Dec 2025 22:16:11 -0500 Subject: [PATCH 5/5] add c-move and c-get --- scapy/contrib/dicom.py | 2 ++ test/contrib/dicom.uts | 65 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 60 insertions(+), 7 deletions(-) diff --git a/scapy/contrib/dicom.py b/scapy/contrib/dicom.py index 4bbfe4b133b..30a657ce0fc 100644 --- a/scapy/contrib/dicom.py +++ b/scapy/contrib/dicom.py @@ -8,6 +8,8 @@ """ DICOM (Digital Imaging and Communications in Medicine) Protocol +Reference: DICOM PS3.8 - Network Communication Support for Message Exchange +https://dicom.nema.org/medical/dicom/current/output/html/part08.html """ import logging diff --git a/test/contrib/dicom.uts b/test/contrib/dicom.uts index 400010dbdde..0ddffb18136 100644 --- a/test/contrib/dicom.uts +++ b/test/contrib/dicom.uts @@ -31,6 +31,11 @@ assert C_ECHO_RSP is not None assert C_STORE_RQ is not None assert C_STORE_RSP is not None assert C_FIND_RQ is not None +assert C_FIND_RSP is not None +assert C_MOVE_RQ is not None +assert C_MOVE_RSP is not None +assert C_GET_RQ is not None +assert C_GET_RSP is not None = Verify constants are exported assert DICOM_PORT == 104 @@ -38,6 +43,14 @@ assert APP_CONTEXT_UID == "1.2.840.10008.3.1.1.1" assert DEFAULT_TRANSFER_SYNTAX_UID == "1.2.840.10008.1.2" assert VERIFICATION_SOP_CLASS_UID == "1.2.840.10008.1.1" += Verify Query/Retrieve SOP Class UIDs are exported +assert PATIENT_ROOT_QR_FIND_SOP_CLASS_UID == "1.2.840.10008.5.1.4.1.2.1.1" +assert PATIENT_ROOT_QR_MOVE_SOP_CLASS_UID == "1.2.840.10008.5.1.4.1.2.1.2" +assert PATIENT_ROOT_QR_GET_SOP_CLASS_UID == "1.2.840.10008.5.1.4.1.2.1.3" +assert STUDY_ROOT_QR_FIND_SOP_CLASS_UID == "1.2.840.10008.5.1.4.1.2.2.1" +assert STUDY_ROOT_QR_MOVE_SOP_CLASS_UID == "1.2.840.10008.5.1.4.1.2.2.2" +assert STUDY_ROOT_QR_GET_SOP_CLASS_UID == "1.2.840.10008.5.1.4.1.2.2.3" + ############ ############ + PDU header tests @@ -158,7 +171,6 @@ raw = struct.pack("!BBH", 0xFF, 0, 4) + b"test" parsed = DICOMVariableItem(raw) assert parsed.item_type == 0xFF assert parsed.length == 4 -# Fallback handled by guess_payload_class returning DICOMGenericItem assert parsed.payload is not None ############ @@ -267,7 +279,6 @@ pdv1 = PresentationDataValueItem(context_id=1, data=b'\xDE\xAD', is_command=1, i pdv2 = PresentationDataValueItem(context_id=1, data=b'\xBE\xEF', is_command=0, is_last=1) pkt = DICOM() / P_DATA_TF(pdv_items=[pdv1, pdv2]) raw = bytes(pkt) -# Verify PDU type and structure assert raw[0] == 0x04 assert len(raw) > 6 @@ -276,8 +287,7 @@ test_data = b"\x01\x02\x03\x04\x05" pdv = PresentationDataValueItem(context_id=3, data=test_data, is_command=1, is_last=1) original = DICOM() / P_DATA_TF(pdv_items=[pdv]) serialized = bytes(original) -# Verify structure -assert serialized[0] == 0x04 # P-DATA-TF type +assert serialized[0] == 0x04 length = struct.unpack("!I", serialized[2:6])[0] assert length > 0 assert test_data in serialized @@ -349,14 +359,55 @@ pkt = C_FIND_RQ(message_id=55) raw = bytes(pkt) assert struct.pack("