From 01067e3c3b61757f2ac82e495f0a9650b1b8f69e Mon Sep 17 00:00:00 2001 From: Greg Lucas Date: Thu, 15 Jan 2026 07:22:57 -0500 Subject: [PATCH] ENH/FIX: Add segmented packet handling for Ultra packets This adds a helper function to combine segmented packets together after XTCE parsing. We can't handle this in the XTCE definition itself due to the variable length packets. We also need to update the Ultra packet definition to have a larger variable length field because the header items are actually only stored in the first packet, not all the middle/last packets too. --- imap_processing/tests/test_utils.py | 44 ++ imap_processing/ultra/l0/decom_ultra.py | 65 ++- .../packet_definitions/ULTRA_SCI_COMBINED.xml | 444 +----------------- imap_processing/utils.py | 74 +++ 4 files changed, 194 insertions(+), 433 deletions(-) diff --git a/imap_processing/tests/test_utils.py b/imap_processing/tests/test_utils.py index 6dd5853a15..1f5c0ca512 100644 --- a/imap_processing/tests/test_utils.py +++ b/imap_processing/tests/test_utils.py @@ -262,6 +262,50 @@ def test_packet_file_to_datasets_flat_definition(): utils.packet_file_to_datasets(packet_files, packet_definition) +def test_combine_segmented_packets(): + """Test combine_segmented_packets function.""" + + # unsegmented, first, middle, last, unsegmented + sequence_flags = xr.DataArray(np.array([3, 1, 0, 2, 3]), dims=["epoch"]) + + binary_data = xr.DataArray( + np.array( + [ + b"ABC", + b"123", + b"456", + b"789", + b"abc", + ], + dtype=object, + ), + dims=["epoch"], + ) + + ds = xr.Dataset(data_vars={"seq_flgs": sequence_flags, "packetdata": binary_data}) + + combined_ds = utils.combine_segmented_packets(ds, "packetdata") + + expected_ds = xr.Dataset( + data_vars={ + "seq_flgs": xr.DataArray(np.array([3, 1, 3]), dims=["epoch"]), + "packetdata": xr.DataArray( + np.array( + [ + b"ABC", + b"123456789", + b"abc", + ], + dtype=object, + ), + dims=["epoch"], + ), + } + ) + + xr.testing.assert_equal(combined_ds, expected_ds) + + def test_extract_data_dict(): """Test extract_data_dict function.""" data_vars = { diff --git a/imap_processing/ultra/l0/decom_ultra.py b/imap_processing/ultra/l0/decom_ultra.py index 1388996dab..7c7f5227a6 100644 --- a/imap_processing/ultra/l0/decom_ultra.py +++ b/imap_processing/ultra/l0/decom_ultra.py @@ -32,15 +32,73 @@ ULTRA_RATES, PacketProperties, ) -from imap_processing.utils import convert_to_binary_string +from imap_processing.utils import combine_segmented_packets, convert_to_binary_string logger = logging.getLogger(__name__) +def extract_initial_items_from_combined_packets( + packets: xr.Dataset, +) -> xr.Dataset: + """ + Extract metadata fields from the beginning of combined event_data packets. + + Extracts bit fields from the first 20 bytes of each event_data array + and adds them as new variables to the dataset. + + Parameters + ---------- + packets : xarray.Dataset + Dataset containing combined packets with event_data. + + Returns + ------- + xarray.Dataset + Dataset with extracted metadata fields added. + """ + # Initialize arrays for extracted fields + n_packets = len(packets.epoch) + + # Preallocate arrays + sid = np.zeros(n_packets, dtype=np.uint8) + spin = np.zeros(n_packets, dtype=np.uint8) + abortflag = np.zeros(n_packets, dtype=np.uint8) + startdelay = np.zeros(n_packets, dtype=np.uint16) + p00 = np.zeros(n_packets, dtype=np.uint8) + + # Extract the data array outside of the loop + binary_data = packets["packetdata"].data + # Extract fields from each packet + for pkt_idx in range(n_packets): + event_data = binary_data[pkt_idx] + + sid[pkt_idx] = event_data[0] + spin[pkt_idx] = event_data[1] + abortflag[pkt_idx] = (event_data[2] >> 7) & 0x1 + startdelay[pkt_idx] = int.from_bytes(event_data[2:4], byteorder="big") & 0x7FFF + p00[pkt_idx] = event_data[4] + + # Remove the first 5 bytes after extraction + binary_data[pkt_idx] = event_data[5:] + + # Add extracted fields to dataset + packets["sid"] = xr.DataArray(sid, dims=["epoch"]) + packets["spin"] = xr.DataArray(spin, dims=["epoch"]) + packets["abortflag"] = xr.DataArray(abortflag, dims=["epoch"]) + packets["startdelay"] = xr.DataArray(startdelay, dims=["epoch"]) + packets["p00"] = xr.DataArray(p00, dims=["epoch"]) + + return packets + + def process_ultra_tof(ds: xr.Dataset, packet_props: PacketProperties) -> xr.Dataset: """ Unpack and decode Ultra TOF packets. + The TOF packets contain image data that may be split across multiple segmented + packets. This function combines the segmented packets and decompresses the image + data. + Parameters ---------- ds : xarray.Dataset @@ -54,6 +112,11 @@ def process_ultra_tof(ds: xr.Dataset, packet_props: PacketProperties) -> xr.Data dataset : xarray.Dataset Dataset containing the decoded and decompressed data. """ + # Combine segmented packets + ds = combine_segmented_packets(ds, binary_field_name="packetdata") + # Extract the header keys from each of the combined packetdata fields. + ds = extract_initial_items_from_combined_packets(ds) + scalar_keys = [key for key in ds.data_vars if key not in ("packetdata", "sid")] image_planes = packet_props.image_planes diff --git a/imap_processing/ultra/packet_definitions/ULTRA_SCI_COMBINED.xml b/imap_processing/ultra/packet_definitions/ULTRA_SCI_COMBINED.xml index df36118015..21fb527295 100644 --- a/imap_processing/ultra/packet_definitions/ULTRA_SCI_COMBINED.xml +++ b/imap_processing/ultra/packet_definitions/ULTRA_SCI_COMBINED.xml @@ -4664,27 +4664,12 @@ - - - - - - - - - - - - - - - - + @@ -4692,27 +4677,12 @@ - - - - - - - - - - - - - - - - + @@ -4720,27 +4690,12 @@ - - - - - - - - - - - - - - - - + @@ -4748,27 +4703,12 @@ - - - - - - - - - - - - - - - - + @@ -4776,27 +4716,12 @@ - - - - - - - - - - - - - - - - + @@ -4804,27 +4729,12 @@ - - - - - - - - - - - - - - - - + @@ -9662,27 +9572,12 @@ - - - - - - - - - - - - - - - - + @@ -9690,27 +9585,12 @@ - - - - - - - - - - - - - - - - + @@ -9718,27 +9598,12 @@ - - - - - - - - - - - - - - - - + @@ -9746,27 +9611,12 @@ - - - - - - - - - - - - - - - - + @@ -9774,27 +9624,12 @@ - - - - - - - - - - - - - - - - + @@ -9802,27 +9637,12 @@ - - - - - - - - - - - - - - - - + @@ -11561,126 +11381,36 @@ CCSDS Packet 2nd Header Coarse Time - - Science ID - - - Spin number at integration start - - - Integration aborted - - - Integration start delay (ms) - - - Starting pixel - Image Packet Data CCSDS Packet 2nd Header Coarse Time - - Science ID - - - Spin number at integration start - - - Integration aborted - - - Integration start delay (ms) - - - Starting pixel - Image Packet Data CCSDS Packet 2nd Header Coarse Time - - Science ID - - - Spin number at integration start - - - Integration aborted - - - Integration start delay (ms) - - - Starting pixel - Image Packet Data CCSDS Packet 2nd Header Coarse Time - - Science ID - - - Spin number at integration start - - - Integration aborted - - - Integration start delay (ms) - - - Starting pixel - Image Packet Data CCSDS Packet 2nd Header Coarse Time - - Science ID - - - Spin number at integration start - - - Integration aborted - - - Integration start delay (ms) - - - Starting pixel - Image Packet Data CCSDS Packet 2nd Header Coarse Time - - Science ID - - - Spin number at integration start - - - Integration aborted - - - Integration start delay (ms) - - - Starting pixel - Image Packet Data @@ -13346,126 +13076,36 @@ CCSDS Packet 2nd Header Coarse Time - - Science ID - - - Spin number at integration start - - - Integration aborted - - - Integration start delay (ms) - - - Starting pixel - Image Packet Data CCSDS Packet 2nd Header Coarse Time - - Science ID - - - Spin number at integration start - - - Integration aborted - - - Integration start delay (ms) - - - Starting pixel - Image Packet Data CCSDS Packet 2nd Header Coarse Time - - Science ID - - - Spin number at integration start - - - Integration aborted - - - Integration start delay (ms) - - - Starting pixel - Image Packet Data CCSDS Packet 2nd Header Coarse Time - - Science ID - - - Spin number at integration start - - - Integration aborted - - - Integration start delay (ms) - - - Starting pixel - Image Packet Data CCSDS Packet 2nd Header Coarse Time - - Science ID - - - Spin number at integration start - - - Integration aborted - - - Integration start delay (ms) - - - Starting pixel - Image Packet Data CCSDS Packet 2nd Header Coarse Time - - Science ID - - - Spin number at integration start - - - Integration aborted - - - Integration start delay (ms) - - - Starting pixel - Image Packet Data @@ -14287,11 +13927,6 @@ - - - - - @@ -14303,11 +13938,6 @@ - - - - - @@ -14319,11 +13949,6 @@ - - - - - @@ -14335,11 +13960,6 @@ - - - - - @@ -14351,11 +13971,6 @@ - - - - - @@ -14367,11 +13982,6 @@ - - - - - @@ -15155,11 +14765,6 @@ - - - - - @@ -15171,11 +14776,6 @@ - - - - - @@ -15187,11 +14787,6 @@ - - - - - @@ -15203,11 +14798,6 @@ - - - - - @@ -15219,11 +14809,6 @@ - - - - - @@ -15235,11 +14820,6 @@ - - - - - diff --git a/imap_processing/utils.py b/imap_processing/utils.py index 6456fc839a..cbfd5fcb7a 100644 --- a/imap_processing/utils.py +++ b/imap_processing/utils.py @@ -10,6 +10,7 @@ import space_packet_parser as spp import xarray as xr from space_packet_parser.exceptions import UnrecognizedPacketTypeError +from space_packet_parser.generators.ccsds import SequenceFlags from space_packet_parser.xtce import definitions, encodings, parameter_types from imap_processing.spice.time import met_to_ttj2000ns @@ -349,6 +350,79 @@ def packet_file_to_datasets( return dataset_by_apid +def combine_segmented_packets( + packets: xr.Dataset, binary_field_name: str = "packetdata" +) -> xr.Dataset: + """ + Combine segmented packets into unsegmented packets. + + To combine the segmented packets, we only concatenate along the `binary_field_name` + and place all values into the first packet of the group. The binary_field_name + is the name of the XTCE Parameter that contains the binary data for the packet. + The other fields are left as-is from the first packet of the group. + + Parameters + ---------- + packets : xarray.Dataset + Dataset containing the packets to combine. + binary_field_name : str, default "packetdata" + Name of the binary field in the dataset representing the packet data. + Defined in the XTCE definition for each instrument. + + Returns + ------- + combined_packets : xarray.Dataset + Dataset containing the combined packets. + """ + # Identification of group starts + # NOTE: seq_flgs is the same variable name for all instruments on IMAP + # but could be different for other missions depending on the XTCE definition. + is_group_start = (packets["seq_flgs"].data == SequenceFlags.UNSEGMENTED) | ( + packets["seq_flgs"].data == SequenceFlags.FIRST + ) + + # Assign group IDs using cumulative sum - each group start increments the ID + group_ids = np.cumsum(is_group_start) + + # Get indices of packets we'll keep (first packet of each group) + group_start_indices = np.where(is_group_start)[0] + + # Concatenate binary data in-place for each group + for group_id in np.unique(group_ids): + # Find all packets belonging to this group + group_mask = group_ids == group_id + group_indices = np.where(group_mask)[0] + + # If multiple packets, concatenate into the first packet + # [b"abc", b"def", b"ghi"] -> b"abcdefghi" + if len(group_indices) > 1: + start_index = group_indices[0] + # Lets do some quick validation on these packets since we've had + # some missing packet groups in the past + seq_flags = packets["seq_flgs"].data[group_indices] + if ( + seq_flags[0] != SequenceFlags.FIRST + or seq_flags[-1] != SequenceFlags.LAST + or ( + len(seq_flags) > 2 + and not np.all(seq_flags[1:-1] == SequenceFlags.CONTINUATION) + ) + ): + logger.warning( + f"Incorrect/incomplete sequence flags in group {group_id}. " + f"Flags: {seq_flags}, " + f"SHCOARSEs: {packets['shcoarse'].data[group_indices]}" + ) + packets[binary_field_name].data[start_index] = np.sum( + packets[binary_field_name].data[group_indices] + ) + + # Select only the first packet of each group (drop the middle/last packets) + combined_packets = packets.isel(epoch=group_start_indices) + + return combined_packets + + def packet_generator( packet_file: str | Path, xtce_packet_definition: str | Path,