From cc6008279cb4a14c5db6c2590668176d9dc0a1f3 Mon Sep 17 00:00:00 2001 From: Greg Lucas Date: Tue, 6 Jan 2026 20:56:58 -0700 Subject: [PATCH 1/8] Handling codice segmented packets --- imap_processing/codice/codice_l1a.py | 3 + imap_processing/codice/codice_l1a_de.py | 189 ++++++- .../codice_packet_definition.xml | 467 +++++++----------- 3 files changed, 331 insertions(+), 328 deletions(-) diff --git a/imap_processing/codice/codice_l1a.py b/imap_processing/codice/codice_l1a.py index 5bc0a3a9a3..23d3d35cf1 100644 --- a/imap_processing/codice/codice_l1a.py +++ b/imap_processing/codice/codice_l1a.py @@ -64,6 +64,9 @@ def process_l1a( # noqa: PLR0912 datasets = [] for apid in datasets_by_apid: + if apid not in [CODICEAPID.COD_LO_PHA, CODICEAPID.COD_HI_PHA]: + # XXX Temporary skip all non-direct event apids + continue if apid not in [CODICEAPID.COD_LO_PHA, CODICEAPID.COD_HI_PHA]: # Get LUT file. Direct events do not need LUT lut_file = dependency.get_file_paths(descriptor="l1a-sci-lut") diff --git a/imap_processing/codice/codice_l1a_de.py b/imap_processing/codice/codice_l1a_de.py index 5991fa3749..952e8e476a 100644 --- a/imap_processing/codice/codice_l1a_de.py +++ b/imap_processing/codice/codice_l1a_de.py @@ -117,6 +117,153 @@ def group_data(packets: xr.Dataset) -> list[bytes]: return grouped_data +def combine_segmented_packets(packets: xr.Dataset) -> xr.Dataset: + """ + Combine segmented packets into unsegmented packets. + + All packets have (SHCOARSE, EVENT_DATA, CHKSUM) fields. To combine + the segmented packets, we only concatenate along the EVENT_DATA field + into the first packet of the group. + + Parameters + ---------- + packets : xarray.Dataset + Dataset containing the packets to combine. + + Returns + ------- + combined_packets : xarray.Dataset + Dataset containing the combined packets. + """ + # Identification of group starts (UNSEGMENTED or FIRST_SEGMENT) + is_group_start = (packets.seq_flgs.data == SegmentedPacketOrder.UNSEGMENTED) | ( + packets.seq_flgs.data == SegmentedPacketOrder.FIRST_SEGMENT + ) + + # Assign group IDs using cumulative sum - each group start increments the ID + group_ids = np.cumsum(is_group_start) + + # Get indices of packets we'll keep (first packet of each group) + group_start_indices = np.where(is_group_start)[0] + + # Concatenate event_data in-place for each group + for group_id in np.unique(group_ids): + # Find all packets belonging to this group + group_mask = group_ids == group_id + group_indices = np.where(group_mask)[0] + + # If multiple packets, concatenate into the first packet + if len(group_indices) > 1: + start_index = group_indices[0] + packets["event_data"].data[start_index] = np.sum( + packets.event_data.data[group_indices] + ) + + # Select only the first packet of each group + combined_packets = packets.isel(epoch=group_start_indices) + + return combined_packets + + +def extract_initial_items_from_combined_packets( + packets: xr.Dataset, +) -> xr.Dataset: + """ + Extract metadata fields from the beginning of combined event_data packets. + + Extracts bit fields from the first 20 bytes of each event_data array + and adds them as new variables to the dataset. + + Parameters + ---------- + packets : xr.Dataset + Dataset containing combined packets with event_data. + + Returns + ------- + xr.Dataset + Dataset with extracted metadata fields added. 
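For reference, a minimal standalone sketch of the cumulative-sum grouping that combine_segmented_packets() relies on. The seq_flgs values are fabricated and follow the convention used by this module (3 = unsegmented, 1 = first segment, 0 = continuation, 2 = last segment):

import numpy as np

# Fabricated sequence flags for seven packets: one unsegmented packet,
# a three-packet group, a second unsegmented packet, and a two-packet group.
seq_flgs = np.array([3, 1, 0, 2, 3, 1, 2])

# A new group starts at every UNSEGMENTED (3) or FIRST_SEGMENT (1) packet.
is_group_start = (seq_flgs == 3) | (seq_flgs == 1)

# The cumulative sum turns the boolean starts into per-packet group IDs,
# and the start indices mark the packet each group is folded into.
group_ids = np.cumsum(is_group_start)               # [1 2 2 2 3 4 4]
group_start_indices = np.where(is_group_start)[0]   # [0 1 4 5]

print(group_ids, group_start_indices)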
+ """ + # Initialize arrays for extracted fields + n_packets = len(packets.epoch) + + # Preallocate arrays + packet_version = np.zeros(n_packets, dtype=np.uint16) + spin_period = np.zeros(n_packets, dtype=np.uint16) + acq_start_seconds = np.zeros(n_packets, dtype=np.uint32) + acq_start_subseconds = np.zeros(n_packets, dtype=np.uint32) + spare_1 = np.zeros(n_packets, dtype=np.uint8) + st_bias_gain_mode = np.zeros(n_packets, dtype=np.uint8) + sw_bias_gain_mode = np.zeros(n_packets, dtype=np.uint8) + priority = np.zeros(n_packets, dtype=np.uint8) + suspect = np.zeros(n_packets, dtype=np.uint8) + compressed = np.zeros(n_packets, dtype=np.uint8) + num_events = np.zeros(n_packets, dtype=np.uint32) + byte_count = np.zeros(n_packets, dtype=np.uint32) + + # Extract fields from each packet + for pkt_idx in range(n_packets): + event_data = packets.event_data.data[pkt_idx] + + # Byte-aligned fields using frombuffer + packet_version[pkt_idx] = np.frombuffer(event_data[0:2], dtype=">u2")[0] + spin_period[pkt_idx] = np.frombuffer(event_data[2:4], dtype=">u2")[0] + acq_start_seconds[pkt_idx] = np.frombuffer(event_data[4:8], dtype=">u4")[0] + + # Non-byte-aligned fields (bytes 8-12 contain mixed bit fields) + # Extract 4 bytes and unpack bit fields + mixed_bytes = np.frombuffer(event_data[8:12], dtype=">u4")[0] + + # acq_start_subseconds: 20 bits (MSB) + acq_start_subseconds[pkt_idx] = (mixed_bytes >> 12) & 0xFFFFF + # spare_1: 2 bits + spare_1[pkt_idx] = (mixed_bytes >> 10) & 0x3 + # st_bias_gain_mode: 2 bits + st_bias_gain_mode[pkt_idx] = (mixed_bytes >> 8) & 0x3 + # sw_bias_gain_mode: 2 bits + sw_bias_gain_mode[pkt_idx] = (mixed_bytes >> 6) & 0x3 + # priority: 4 bits + priority[pkt_idx] = (mixed_bytes >> 2) & 0xF + # suspect: 1 bit + suspect[pkt_idx] = (mixed_bytes >> 1) & 0x1 + # compressed: 1 bit (LSB) + compressed[pkt_idx] = mixed_bytes & 0x1 + + # Remaining byte-aligned fields + num_events[pkt_idx] = np.frombuffer(event_data[12:16], dtype=">u4")[0] + byte_count[pkt_idx] = np.frombuffer(event_data[16:20], dtype=">u4")[0] + + # Remove the first 20 bytes from event_data + packets.event_data.data[pkt_idx] = event_data[20:] + + # Trim to the byte_count field + packets.event_data.data[pkt_idx] = packets.event_data.data[pkt_idx][ + : byte_count[pkt_idx] + ] + + if compressed[pkt_idx]: + packets.event_data.data[pkt_idx] = decompress( + packets.event_data.data[pkt_idx], + CoDICECompression.LOSSLESS, + ) + + # Add extracted fields to dataset + packets["packet_version"] = xr.DataArray(packet_version, dims=["epoch"]) + packets["spin_period"] = xr.DataArray(spin_period, dims=["epoch"]) + packets["acq_start_seconds"] = xr.DataArray(acq_start_seconds, dims=["epoch"]) + packets["acq_start_subseconds"] = xr.DataArray(acq_start_subseconds, dims=["epoch"]) + packets["spare_1"] = xr.DataArray(spare_1, dims=["epoch"]) + packets["st_bias_gain_mode"] = xr.DataArray(st_bias_gain_mode, dims=["epoch"]) + packets["sw_bias_gain_mode"] = xr.DataArray(sw_bias_gain_mode, dims=["epoch"]) + packets["priority"] = xr.DataArray(priority, dims=["epoch"]) + packets["suspect"] = xr.DataArray(suspect, dims=["epoch"]) + packets["compressed"] = xr.DataArray(compressed, dims=["epoch"]) + packets["num_events"] = xr.DataArray(num_events, dims=["epoch"]) + packets["byte_count"] = xr.DataArray(byte_count, dims=["epoch"]) + + return packets + + def unpack_bits(bit_structure: dict, de_data: np.ndarray) -> dict: """ Unpack 64-bit values into separate fields based on bit structure. 
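A quick self-check of the shift/mask arithmetic used above for the non-byte-aligned fields packed into bytes 8-12 (acq_start_subseconds 20 bits, spare 2, st_bias_gain_mode 2, sw_bias_gain_mode 2, priority 4, suspect 1, compressed 1). The field values are fabricated; only the bit layout comes from the patch:

# Pack fabricated values into one 32-bit word, then unpack them with the
# same shifts and masks used in extract_initial_items_from_combined_packets.
word = (123456 << 12) | (2 << 10) | (1 << 8) | (3 << 6) | (5 << 2) | (0 << 1) | 1

assert (word >> 12) & 0xFFFFF == 123456   # acq_start_subseconds (20 bits)
assert (word >> 10) & 0x3 == 2            # spare_1 (2 bits)
assert (word >> 8) & 0x3 == 1             # st_bias_gain_mode (2 bits)
assert (word >> 6) & 0x3 == 3             # sw_bias_gain_mode (2 bits)
assert (word >> 2) & 0xF == 5             # priority (4 bits)
assert (word >> 1) & 0x1 == 0             # suspect (1 bit)
assert word & 0x1 == 1                    # compressed (1 bit)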
@@ -162,7 +309,6 @@ def unpack_bits(bit_structure: dict, de_data: np.ndarray) -> dict: def process_de_data( packets: xr.Dataset, - decompressed_data: list[list[int]], apid: int, cdf_attrs: ImapCdfAttributes, ) -> xr.Dataset: @@ -203,7 +349,7 @@ def process_de_data( # Determine the number of epochs to help with data array initialization # There is one epoch per set of priorities - num_epochs = len(decompressed_data) // num_priorities + num_epochs = len(packets["epoch"]) // num_priorities # Initialize data arrays for unpacked 64-bits fields for field in bit_structure: @@ -227,13 +373,10 @@ def process_de_data( ) # Get num_events, data quality, and priorities data for beginning of packet_indexs - packet_index_starts = np.where( - (packets.seq_flgs.data == SegmentedPacketOrder.UNSEGMENTED) - | (packets.seq_flgs.data == SegmentedPacketOrder.FIRST_SEGMENT) - )[0] - num_events_arr = packets.num_events.data[packet_index_starts] - data_quality_arr = packets.suspect.data[packet_index_starts] - priorities_arr = packets.priority.data[packet_index_starts] + num_events_arr = packets.num_events.data + data_quality_arr = packets.suspect.data + priorities_arr = packets.priority.data + event_data_arr = packets.event_data.data # Initialize other fields of l1a that we want to # carry in L1A CDF file @@ -255,16 +398,15 @@ def process_de_data( # (epoch, (num_events * )). # num_events is a variable number per priority. for epoch_index in range(num_epochs): - # current epoch's grouped data are: - # current group's start index * 8 to next group's start indices * 8 - epoch_start = packet_index_starts[epoch_index] * num_priorities - epoch_end = packet_index_starts[epoch_index + 1] * num_priorities # Extract the decompressed data for current epoch. # epoch_data should be of shape ((num_priorities * num_events),) - epoch_data = decompressed_data[epoch_start:epoch_end] + epoch_start = epoch_index * num_priorities + epoch_end = (epoch_index + 1) * num_priorities + epoch_data = event_data_arr[epoch_start:epoch_end] # Extract these other data unordered_priority = priorities_arr[epoch_start:epoch_end] + print(unordered_priority) unordered_data_quality = data_quality_arr[epoch_start:epoch_end] unordered_num_events = num_events_arr[epoch_start:epoch_end] @@ -336,18 +478,11 @@ def l1a_direct_event(unpacked_dataset: xr.Dataset, apid: int) -> xr.Dataset: xarray.Dataset Processed L1A Direct Event dataset. """ - # Group segmented data. 
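A standalone illustration of the priority scatter described in the comments above: packets can arrive in any priority order, and fancy indexing drops each value into its priority-ordered slot. The arrays are fabricated:

import numpy as np

# Out-of-order priorities as they might arrive in one epoch, plus the
# per-packet event counts that travel with them.
unordered_priority = np.array([0, 4, 5, 1, 3, 2, 6, 7])
unordered_num_events = np.array([10, 14, 15, 11, 13, 12, 16, 17])

# Fancy indexing places each count into its priority slot.
num_events = np.zeros(8, dtype=np.uint16)
num_events[unordered_priority] = unordered_num_events

print(num_events)   # [10 11 12 13 14 15 16 17]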
- # TODO: this may get replaced with space_packet_parser's functionality - grouped_data = group_data(unpacked_dataset) - - # Decompress data shape is (epoch, priority * num_events) - decompressed_data = [ - decompress( - group, - CoDICECompression.LOSSLESS, - ) - for group in grouped_data - ] + print(unpacked_dataset) + new_dataset = combine_segmented_packets(unpacked_dataset) + new_dataset = extract_initial_items_from_combined_packets(new_dataset) + print(new_dataset) + # raise RuntimeError("Debug stop") # Gather the CDF attributes cdf_attrs = ImapCdfAttributes() @@ -355,7 +490,7 @@ def l1a_direct_event(unpacked_dataset: xr.Dataset, apid: int) -> xr.Dataset: cdf_attrs.add_instrument_variable_attrs("codice", "l1a") # Unpack DE packet data into CDF-ready variables - de_dataset = process_de_data(unpacked_dataset, decompressed_data, apid, cdf_attrs) + de_dataset = process_de_data(new_dataset, apid, cdf_attrs) # Determine the epochs to use in the dataset, which are the epochs whenever # there is a start of a segment and the priority is 0 diff --git a/imap_processing/codice/packet_definitions/codice_packet_definition.xml b/imap_processing/codice/packet_definitions/codice_packet_definition.xml index a50eb3d4b6..d90e269e1c 100644 --- a/imap_processing/codice/packet_definitions/codice_packet_definition.xml +++ b/imap_processing/codice/packet_definitions/codice_packet_definition.xml @@ -1,6 +1,6 @@ - + @@ -45,7 +45,7 @@ - + @@ -297,8 +297,17 @@ + + + + + + + + + - + @@ -358,7 +367,7 @@ - + @@ -439,7 +448,8 @@ - + + @@ -567,46 +577,28 @@ - - - - - - - + - - - - - - - + - - - - - - - + - - - - - - - + - + + + + + + + @@ -615,7 +607,8 @@ - + + @@ -624,7 +617,8 @@ - + + @@ -633,7 +627,8 @@ - + + @@ -688,7 +683,7 @@ - + @@ -697,8 +692,7 @@ - - + @@ -707,8 +701,7 @@ - - + @@ -717,7 +710,7 @@ - + @@ -726,7 +719,7 @@ - + @@ -796,19 +789,19 @@ - + - + - + - + - + @@ -835,15 +828,55 @@ - + - + + + + + - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -1115,7 +1148,7 @@ - + @@ -1145,7 +1178,13 @@ - + + + + + + + @@ -1154,7 +1193,13 @@ - + + + + + + + @@ -1163,7 +1208,13 @@ - + + + + + + + @@ -1172,7 +1223,13 @@ - + + + + + + + @@ -1181,7 +1238,13 @@ - + + + + + + + @@ -1474,74 +1537,12 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + @@ -1737,7 +1738,6 @@ - @@ -2422,74 +2422,12 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + @@ -3104,6 +3042,12 @@ + + INDICATES THE CURRENT OPERATIONAL STATE OF THE LO ESA SWEEP: +- NORMAL - BOTH ESAS ARE TRACKING TOGETHER +- RGFO - REDUCED GAIN FACTOR OPERATION; ESA-A IS REDUCED IN ORDER TO REDUCE THE GAIN FACTOR AND ALLOW FEWER IONS INTO THE DETECTOR +- NSO - NO SCAN OPERATION; BOTH ESAS ARE RETURNED TO A HIGH-ENERGY SETTING AND NO SCANNING IS DONE FOR THE REMAINDER OF THE ESA SWEEP + @@ -3127,17 +3071,27 @@ - + - - - - - - - - - + + ALARM PERSISTENCE = 3 IN OASIS + + + ALARM PERSISTENCE = 3 IN OASIS + + + ALARM PERSISTENCE = 3 IN OASIS + + + ALARM PERSISTENCE = 3 IN OASIS + + + ALARM PERSISTENCE = 3 IN OASIS + + + + + @@ -3159,11 +3113,13 @@ - - - - - + + + + + + EACH BIT INDICATES WHETHER THE CORRESPONDING MACRO IS CURRENTLY RUNNING (E.G. BIT 1 WILL BE SET IF MACRO 1 IS RUNNING) + INDICATES WHETHER ANY CATEGORY 1 LIMITS HAVE TRIGGERED. 
@@ -3194,7 +3150,7 @@ INDICATES WHETHER THE MOST RECENT TRIGGER WAS A MINIMUM OR MAXIMUM LIMIT - INDICATES THE ID OF THE MOST RECENT FDC TRIGGER + INDICATES THE TABLE INDEX OF THE MOST RECENT FDC TRIGGER INDICATES THE ACTION THAT WAS TAKEN FOR THE MOST RECENT FDC TRIGGER @@ -3208,7 +3164,7 @@ INDICATES WHETHER FSW CONTROL OF THE OPERATIONAL HEATER IS ENABLED - + INDICATES THE CURRENT STATE OF THE PHYSICAL HEATER OUTPUT @@ -3313,40 +3269,6 @@ SECONDARY HEADER - WHOLE-SECONDS PART OF SCLK - - PACKET VERSION - THIS WILL BE INCREMENTED EACH TIME THE FORMAT OF THE PACKET CHANGESCOD_LO_PHA. - - - SPIN PERIOD REPORTED BY THE SPACECRAFT IN THE TIME AND STATUS MESSAGE. REPORTED PERIOD IS THE PERIOD THAT WAS ACTIVE WHEN THE 16-SPIN ACQUISITION CYCLE STARTED. - - - FULL-SECONDS PORTION OF THE TIME AT WHICH THE 16-SPIN CYCLE STARTED - - - SUB-SECONDS PORTION OF THE TIME AT WHICH THE 16-SPIN CYCLE STARTED - - - SPARE FOR ALIGNMENT - - - BIAS GAIN MODE FOR THE SUPRATHERMAL SECTOR - - - BIAS GAIN MODE FOR THE SOLARWIND SECTOR - - - - INDICATES THAT THERE WAS SOME ERROR DETECTED DURING ACQUISITION OR PROCESSING OF THE DATA. ERRORS COULD INCLUDE CORRUPTED ACQUISITION MEMORY (I.E. EDAC ERRORS), TIMING VIOLATIONS, OR OTHER EVENTS THAT INTERRUPTED OR OTHERWISE AFFECTED DATA COLLECTION. - - - WHETHER THE EVENT DATA IS COMPRESSED. IF 1/YES, EVENT_DATA ARRAY IS COMPRESSED USING THE LZMA COMPRESSION ALGORITHM. - - - NUMBER OF EVENTS SELECTED FOR DOWNLINK (I.E. NUMBER OF EVENTS IN THE EVENT_DATA ARRAY) - - - NUMBER OF BYTES IN THE EVENT_DATA ARRAY. IF COMPRESSED, THIS VALUE REPRESENTS THE LENGTH OF THE COMPRESSED DATA. - OPTIONALLY COMPRESSED ARRAY OF EVENT DATA @@ -3999,40 +3921,6 @@ WHEN THIS ARRAY IS TOO LARGE FOR A SINGLE CCSDS PACKET, CODICE WILL UTILIZE THE SECONDARY HEADER - WHOLE-SECONDS PART OF SCLK - - PACKET VERSION - THIS WILL BE INCREMENTED EACH TIME THE FORMAT OF THE PACKET CHANGESCOD_LO_PHA. - - - SPIN PERIOD REPORTED BY THE SPACECRAFT IN THE TIME AND STATUS MESSAGE. REPORTED PERIOD IS THE PERIOD THAT WAS ACTIVE WHEN THE 16-SPIN ACQUISITION CYCLE STARTED. - - - FULL-SECONDS PORTION OF THE TIME AT WHICH THE 16-SPIN CYCLE STARTED - - - SUB-SECONDS PORTION OF THE TIME AT WHICH THE 16-SPIN CYCLE STARTED - - - SPARE FOR ALIGNMENT - - - BIAS GAIN MODE FOR THE SUPRATHERMAL SECTOR - - - BIAS GAIN MODE FOR THE SOLARWIND SECTOR - - - - INDICATES THAT THERE WAS SOME ERROR DETECTED DURING ACQUISITION OR PROCESSING OF THE DATA. ERRORS COULD INCLUDE CORRUPTED ACQUISITION MEMORY (I.E. EDAC ERRORS), TIMING VIOLATIONS, OR OTHER EVENTS THAT INTERRUPTED OR OTHERWISE AFFECTED DATA COLLECTION. - - - WHETHER THE EVENT DATA IS COMPRESSED. IF 1/YES, EVENT_DATA ARRAY IS COMPRESSED USING THE RICE COMPRESSION ALGORITHM. - - - NUMBER OF EVENTS SELECTED FOR DOWNLINK (I.E. NUMBER OF EVENTS IN THE EVENT_DATA ARRAY) - - - NUMBER OF BYTES IN THE EVENT_DATA ARRAY. IF COMPRESSED, THIS VALUE REPRESENTS THE LENGTH OF THE COMPRESSED DATA. 
- OPTIONALLY COMPRESSED ARRAY OF EVENT DATA @@ -4491,6 +4379,7 @@ WHEN THIS ARRAY IS TOO LARGE FOR A SINGLE CCSDS PACKET, CODICE WILL UTILIZE THE + @@ -4546,11 +4435,11 @@ WHEN THIS ARRAY IS TOO LARGE FOR A SINGLE CCSDS PACKET, CODICE WILL UTILIZE THE - - - - - + + + + + @@ -4565,7 +4454,7 @@ WHEN THIS ARRAY IS TOO LARGE FOR A SINGLE CCSDS PACKET, CODICE WILL UTILIZE THE - + @@ -4632,18 +4521,6 @@ WHEN THIS ARRAY IS TOO LARGE FOR A SINGLE CCSDS PACKET, CODICE WILL UTILIZE THE - - - - - - - - - - - - @@ -4908,18 +4785,6 @@ WHEN THIS ARRAY IS TOO LARGE FOR A SINGLE CCSDS PACKET, CODICE WILL UTILIZE THE - - - - - - - - - - - - From 15c9bb055c95c8b2cf82910d81e62bdb15512d99 Mon Sep 17 00:00:00 2001 From: Greg Lucas Date: Thu, 8 Jan 2026 16:27:55 -0700 Subject: [PATCH 2/8] MNT: Remove unused functions and switch away from from_buffer() calls --- imap_processing/codice/codice_l1a_de.py | 124 ++---------------------- 1 file changed, 9 insertions(+), 115 deletions(-) diff --git a/imap_processing/codice/codice_l1a_de.py b/imap_processing/codice/codice_l1a_de.py index 952e8e476a..32657089b1 100644 --- a/imap_processing/codice/codice_l1a_de.py +++ b/imap_processing/codice/codice_l1a_de.py @@ -17,106 +17,6 @@ from imap_processing.spice.time import met_to_ttj2000ns -def get_de_metadata(packets: xr.Dataset, packet_index: int) -> bytes: - """ - Gather and return packet metadata (From packet_version through byte_count). - - Extract the metadata in the packet_indexed direct event packet, which is then - used to construct the full data of the group of packet_indexs. - - Parameters - ---------- - packets : xarray.Dataset - The packet_indexed direct event packet data. - packet_index : int - The index of the packet_index of interest. - - Returns - ------- - metadata : bytes - The compressed metadata for the packet_indexed packet. - """ - # String together the metadata fields and convert the data to a bytes obj - metadata_str = "" - for field, num_bits in constants.DE_METADATA_FIELDS.items(): - metadata_str += f"{packets[field].data[packet_index]:0{num_bits}b}" - metadata_chunks = [metadata_str[i : i + 8] for i in range(0, len(metadata_str), 8)] - metadata_ints = [int(item, 2) for item in metadata_chunks] - metadata = bytes(metadata_ints) - - return metadata - - -def group_data(packets: xr.Dataset) -> list[bytes]: - """ - Organize continuation packets into appropriate groups. - - Some packets are continuation packets, as in, they are packets that are - part of a group of packets. These packets are marked by the `seq_flgs` field - in the CCSDS header of the packet. For CoDICE, the values are defined as - follows: - - 3 = Packet is not part of a group - 1 = Packet is the first packet of the group - 0 = Packet is in the middle of the group - 2 = Packet is the last packet of the group - - For packets that are part of a group, the byte count associated with the - first packet of the group signifies the byte count for the entire group. - - Parameters - ---------- - packets : xarray.Dataset - Dataset containing the packets to group. - - Returns - ------- - grouped_data : list[bytes] - The packet data, converted to bytes and grouped appropriately. 
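To make the removed XML entries above concrete: the 20-byte direct-event header they described is now parsed in Python, and this patch swaps the byte-aligned reads from np.frombuffer to int.from_bytes. A small sketch with fabricated values, following the offsets used in extract_initial_items_from_combined_packets:

# Fabricated 20-byte header: packet_version (u16), spin_period (u16),
# acq_start_seconds (u32), a 32-bit word of mixed bit fields,
# num_events (u32), byte_count (u32).
header = (
    (3).to_bytes(2, "big")
    + (719).to_bytes(2, "big")
    + (1000).to_bytes(4, "big")
    + (0).to_bytes(4, "big")
    + (42).to_bytes(4, "big")
    + (336).to_bytes(4, "big")
)
assert len(header) == 20

assert int.from_bytes(header[0:2], "big") == 3       # packet_version
assert int.from_bytes(header[2:4], "big") == 719     # spin_period
assert int.from_bytes(header[4:8], "big") == 1000    # acq_start_seconds
assert int.from_bytes(header[12:16], "big") == 42    # num_events
assert int.from_bytes(header[16:20], "big") == 336   # byte_count
# The event payload then follows as event_data[20 : 20 + byte_count].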
- """ - grouped_data = [] # Holds the properly grouped data to be decompressed - current_group = bytearray() # Temporary storage for current group - group_byte_count = None # Temporary storage for current group byte count - - for packet_index in range(len(packets.event_data.data)): - packet_data = packets.event_data.data[packet_index] - group_code = packets.seq_flgs.data[packet_index] - byte_count = packets.byte_count.data[packet_index] - - # If the group code is 3, this means the data is unsegmented - # and can be decompressed as-is - if group_code == SegmentedPacketOrder.UNSEGMENTED: - grouped_data.append(packet_data[:byte_count]) - - # If the group code is 1, this means the data is the first data in a - # group. Also, set the byte count for the group - elif group_code == SegmentedPacketOrder.FIRST_SEGMENT: - group_byte_count = byte_count - current_group += packet_data - - # If the group code is 0, this means the data is part of the middle of - # the group. - elif group_code == SegmentedPacketOrder.CONTINUATION_SEGMENT: - current_group += get_de_metadata(packets, packet_index) - current_group += packet_data - - # If the group code is 2, this means the data is the last data in the - # group - elif group_code == SegmentedPacketOrder.LAST_SEGMENT: - current_group += get_de_metadata(packets, packet_index) - current_group += packet_data - - # The grouped data is now ready to be decompressed - values_to_decompress = current_group[:group_byte_count] - grouped_data.append(values_to_decompress) - - # Reset the current group - current_group = bytearray() - group_byte_count = None - - return grouped_data - - def combine_segmented_packets(packets: xr.Dataset) -> xr.Dataset: """ Combine segmented packets into unsegmented packets. @@ -205,14 +105,14 @@ def extract_initial_items_from_combined_packets( for pkt_idx in range(n_packets): event_data = packets.event_data.data[pkt_idx] - # Byte-aligned fields using frombuffer - packet_version[pkt_idx] = np.frombuffer(event_data[0:2], dtype=">u2")[0] - spin_period[pkt_idx] = np.frombuffer(event_data[2:4], dtype=">u2")[0] - acq_start_seconds[pkt_idx] = np.frombuffer(event_data[4:8], dtype=">u4")[0] + # Byte-aligned fields using int.from_bytes + packet_version[pkt_idx] = int.from_bytes(event_data[0:2], byteorder="big") + spin_period[pkt_idx] = int.from_bytes(event_data[2:4], byteorder="big") + acq_start_seconds[pkt_idx] = int.from_bytes(event_data[4:8], byteorder="big") # Non-byte-aligned fields (bytes 8-12 contain mixed bit fields) # Extract 4 bytes and unpack bit fields - mixed_bytes = np.frombuffer(event_data[8:12], dtype=">u4")[0] + mixed_bytes = int.from_bytes(event_data[8:12], byteorder="big") # acq_start_subseconds: 20 bits (MSB) acq_start_subseconds[pkt_idx] = (mixed_bytes >> 12) & 0xFFFFF @@ -230,16 +130,12 @@ def extract_initial_items_from_combined_packets( compressed[pkt_idx] = mixed_bytes & 0x1 # Remaining byte-aligned fields - num_events[pkt_idx] = np.frombuffer(event_data[12:16], dtype=">u4")[0] - byte_count[pkt_idx] = np.frombuffer(event_data[16:20], dtype=">u4")[0] + num_events[pkt_idx] = int.from_bytes(event_data[12:16], byteorder="big") + byte_count[pkt_idx] = int.from_bytes(event_data[16:20], byteorder="big") # Remove the first 20 bytes from event_data - packets.event_data.data[pkt_idx] = event_data[20:] - - # Trim to the byte_count field - packets.event_data.data[pkt_idx] = packets.event_data.data[pkt_idx][ - : byte_count[pkt_idx] - ] + # Then trim to the byte_count value + packets.event_data.data[pkt_idx] = event_data[20 : 20 + 
byte_count[pkt_idx]] if compressed[pkt_idx]: packets.event_data.data[pkt_idx] = decompress( @@ -327,8 +223,6 @@ def process_de_data( packets : xarray.Dataset Dataset containing the packets, needed to determine priority order and data quality. - decompressed_data : list[list[int]] - The decompressed data to reshape, in the format [[]]. apid : int The sensor type, used primarily to determine if the data are from CoDICE-Lo or CoDICE-Hi. From 661adc64e910ec462c6316ab19b384110a70e42e Mon Sep 17 00:00:00 2001 From: Greg Lucas Date: Thu, 8 Jan 2026 17:20:01 -0700 Subject: [PATCH 3/8] Fixing up codice DE --- imap_processing/codice/codice_l1a_de.py | 82 +++++++++---------------- 1 file changed, 28 insertions(+), 54 deletions(-) diff --git a/imap_processing/codice/codice_l1a_de.py b/imap_processing/codice/codice_l1a_de.py index 32657089b1..1f9cdfbf71 100644 --- a/imap_processing/codice/codice_l1a_de.py +++ b/imap_processing/codice/codice_l1a_de.py @@ -292,44 +292,31 @@ def process_de_data( # (epoch, (num_events * )). # num_events is a variable number per priority. for epoch_index in range(num_epochs): - # Extract the decompressed data for current epoch. - # epoch_data should be of shape ((num_priorities * num_events),) - epoch_start = epoch_index * num_priorities - epoch_end = (epoch_index + 1) * num_priorities - epoch_data = event_data_arr[epoch_start:epoch_end] - - # Extract these other data - unordered_priority = priorities_arr[epoch_start:epoch_end] - print(unordered_priority) - unordered_data_quality = data_quality_arr[epoch_start:epoch_end] - unordered_num_events = num_events_arr[epoch_start:epoch_end] - # If priority array unique size is not same size as # num_priorities, then throw error. They should match. - if len(np.unique(unordered_priority)) != num_priorities: + per_epoch_priorities = priorities_arr[ + epoch_index * num_priorities : (epoch_index + 1) * num_priorities + ] + if len(np.unique(per_epoch_priorities)) != num_priorities: raise ValueError( f"Priority array for epoch {epoch_index} contains " - f"non-unique values: {unordered_priority}" + f"non-unique values: {per_epoch_priorities}" ) - # Until here, we have the out of order priority data. Data could have been - # collected in any priority order. Eg. - # priority - [0, 4, 5, 1, 3, 2, 6, 7] - # Now, we need to put data into their respective priority indexes - # in final arrays for the current epoch. Eg. put data into - # priority - [0, 1, 2, 3, 4, 5, 6, 7] - de_data["num_events"][epoch_index, unordered_priority] = unordered_num_events - de_data["data_quality"][epoch_index, unordered_priority] = ( - unordered_data_quality - ) - - # Fill the event data into it's bin in same logic as above. But + # Fill the event data into it's bin. But # since the epoch has different num_events per priority, # we need to loop and index accordingly. Otherwise, numpy throws # 'The detected shape was (n,) + inhomogeneous part' error. - for priority_index in range(len(unordered_priority)): - # Get num_events - priority_num_events = int(unordered_num_events[priority_index]) + for priority_index in range(num_priorities): + event_data_loc = epoch_index * num_priorities + priority_index + curr_priority = priorities_arr[event_data_loc] + + priority_num_events = num_events_arr[event_data_loc] + de_data["num_events"][epoch_index, curr_priority] = priority_num_events + de_data["data_quality"][epoch_index, curr_priority] = data_quality_arr[ + event_data_loc + ] + # Reshape epoch data into (num_events, 8). That 8 is 8-bytes that # make up 64-bits. 
Therefore, combine last 8 dimension into one to # get 64-bits event data that we need to unpack later. First, @@ -337,7 +324,7 @@ def process_de_data( # we need to make a copy and reverse the byte order # to match LSB order before we use .view. events_in_bytes = ( - np.array(epoch_data[priority_index], dtype=np.uint8) + np.array(event_data_arr[event_data_loc], dtype=np.uint8) .reshape(priority_num_events, 8)[:, ::-1] .copy() ) @@ -346,11 +333,10 @@ def process_de_data( unpacked_fields = unpack_bits(bit_structure, combined_64bits) # Put unpacked event data into their respective variable and priority # number bins - priority_num = int(unordered_priority[priority_index]) for field_name, field_data in unpacked_fields.items(): if field_name not in ["Priority", "Spare"]: de_data[field_name][ - epoch_index, priority_num, :priority_num_events + epoch_index, curr_priority, :priority_num_events ] = field_data return de_data @@ -372,11 +358,8 @@ def l1a_direct_event(unpacked_dataset: xr.Dataset, apid: int) -> xr.Dataset: xarray.Dataset Processed L1A Direct Event dataset. """ - print(unpacked_dataset) - new_dataset = combine_segmented_packets(unpacked_dataset) - new_dataset = extract_initial_items_from_combined_packets(new_dataset) - print(new_dataset) - # raise RuntimeError("Debug stop") + ds = combine_segmented_packets(unpacked_dataset) + ds = extract_initial_items_from_combined_packets(ds) # Gather the CDF attributes cdf_attrs = ImapCdfAttributes() @@ -384,20 +367,7 @@ def l1a_direct_event(unpacked_dataset: xr.Dataset, apid: int) -> xr.Dataset: cdf_attrs.add_instrument_variable_attrs("codice", "l1a") # Unpack DE packet data into CDF-ready variables - de_dataset = process_de_data(new_dataset, apid, cdf_attrs) - - # Determine the epochs to use in the dataset, which are the epochs whenever - # there is a start of a segment and the priority is 0 - epoch_indices = np.where( - ( - (unpacked_dataset.seq_flgs.data == SegmentedPacketOrder.UNSEGMENTED) - | (unpacked_dataset.seq_flgs.data == SegmentedPacketOrder.FIRST_SEGMENT) - ) - & (unpacked_dataset.priority.data == 0) - )[0] - acq_start_seconds = unpacked_dataset.acq_start_seconds[epoch_indices] - acq_start_subseconds = unpacked_dataset.acq_start_subseconds[epoch_indices] - spin_periods = unpacked_dataset.spin_period[epoch_indices] + de_dataset = process_de_data(ds, apid, cdf_attrs) # Calculate epoch variables using sensor id and apid # Provide 0 as default input for other inputs but they @@ -409,8 +379,12 @@ def l1a_direct_event(unpacked_dataset: xr.Dataset, apid: int) -> xr.Dataset: three_d_collapsed=0, view_id=0, ) + ds = ds.isel(epoch=slice(None, None, 8)) epochs, epochs_delta = get_codice_epoch_time( - acq_start_seconds, acq_start_subseconds, spin_periods, view_tab_info + ds["acq_start_seconds"], + ds["acq_start_subseconds"], + ds["spin_period"], + view_tab_info, ) # Define coordinates @@ -490,14 +464,14 @@ def l1a_direct_event(unpacked_dataset: xr.Dataset, apid: int) -> xr.Dataset: ) de_dataset["sw_bias_gain_mode"] = xr.DataArray( - unpacked_dataset["sw_bias_gain_mode"].data[epoch_indices], + ds["sw_bias_gain_mode"].values, name="sw_bias_gain_mode", dims=["epoch"], attrs=cdf_attrs.get_variable_attributes("sw_bias_gain_mode"), ) de_dataset["st_bias_gain_mode"] = xr.DataArray( - unpacked_dataset["st_bias_gain_mode"].data[epoch_indices], + ds["st_bias_gain_mode"].values, name="st_bias_gain_mode", dims=["epoch"], attrs=cdf_attrs.get_variable_attributes("st_bias_gain_mode"), From f414b223cb5cba6a209485ebfa4fec1025720734 Mon Sep 17 00:00:00 2001 From: 
Greg Lucas Date: Thu, 8 Jan 2026 20:28:58 -0700 Subject: [PATCH 4/8] Refactor codice l1a_de --- imap_processing/codice/codice_l1a_de.py | 318 ++++++++++++------------ 1 file changed, 153 insertions(+), 165 deletions(-) diff --git a/imap_processing/codice/codice_l1a_de.py b/imap_processing/codice/codice_l1a_de.py index 1f9cdfbf71..580bee3aac 100644 --- a/imap_processing/codice/codice_l1a_de.py +++ b/imap_processing/codice/codice_l1a_de.py @@ -209,7 +209,7 @@ def process_de_data( cdf_attrs: ImapCdfAttributes, ) -> xr.Dataset: """ - Reshape the decompressed direct event data into CDF-ready arrays. + Process direct event data into a complete CDF-ready dataset. Unpacking DE needs below for-loops because of many reasons, including: - Need of preserve fillval per field of various bit lengths @@ -221,8 +221,7 @@ def process_de_data( Parameters ---------- packets : xarray.Dataset - Dataset containing the packets, needed to determine priority order - and data quality. + Dataset containing the combined packets with extracted header fields. apid : int The sensor type, used primarily to determine if the data are from CoDICE-Lo or CoDICE-Hi. @@ -231,23 +230,132 @@ def process_de_data( Returns ------- - data : xarray.Dataset - Processed Direct Event data. + xarray.Dataset + Complete processed Direct Event dataset with coordinates and attributes. """ - # xr.Dataset to hold all the (soon to be restructured) direct event data - de_data = xr.Dataset() - - # Extract some useful variables + # Extract configuration for this APID num_priorities = constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid]["num_priorities"] bit_structure = constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid]["bit_structure"] - # Determine the number of epochs to help with data array initialization - # There is one epoch per set of priorities - num_epochs = len(packets["epoch"]) // num_priorities + # There is one epoch per complete set of priorities + # Discard any incomplete group at the end + num_packets = len(packets["epoch"]) + num_epochs = num_packets // num_priorities + num_complete_packets = num_epochs * num_priorities + + # TODO: What do we want to do for how to start/end the grouping? + # It is unclear to me how we know we are starting the priorities + # and epoch picking at the right spot. 
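The truncation added in this patch relies on integer division to keep only whole priority groups; a tiny worked example with fabricated counts:

# 27 packets with 8 priorities per epoch leaves three trailing packets
# that do not form a complete group and are dropped.
num_priorities = 8
num_packets = 27

num_epochs = num_packets // num_priorities            # 3
num_complete_packets = num_epochs * num_priorities    # 24

assert num_complete_packets == 24
# packets.isel(epoch=slice(None, num_complete_packets)) keeps packets 0-23.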
+ if num_complete_packets < num_packets: + # Truncate to only complete priority groups + packets = packets.isel(epoch=slice(None, num_complete_packets)) + + # --- Build coordinates first --- + # Get timing info from the first packet of each epoch (every num_priorities packets) + epoch_packet_indices = slice(None, None, num_priorities) - # Initialize data arrays for unpacked 64-bits fields + view_tab_info = ViewTabInfo( + apid=apid, + sensor=1 if apid == CODICEAPID.COD_HI_PHA else 0, + collapse_table=0, + three_d_collapsed=0, + view_id=0, + ) + epochs, epochs_delta = get_codice_epoch_time( + packets["acq_start_seconds"].isel(epoch=epoch_packet_indices), + packets["acq_start_subseconds"].isel(epoch=epoch_packet_indices), + packets["spin_period"].isel(epoch=epoch_packet_indices), + view_tab_info, + ) + + # Convert to numpy arrays if DataArrays + epochs_data = epochs.values if hasattr(epochs, "values") else epochs + epochs_delta_data = ( + epochs_delta.values if hasattr(epochs_delta, "values") else epochs_delta + ) + epoch_values = met_to_ttj2000ns(epochs_data) + + # Create the dataset with coordinates + de_data = xr.Dataset( + coords={ + "epoch": ( + "epoch", + epoch_values, + cdf_attrs.get_variable_attributes("epoch", check_schema=False), + ), + "epoch_delta_minus": ( + "epoch", + epochs_delta_data, + cdf_attrs.get_variable_attributes( + "epoch_delta_minus", check_schema=False + ), + ), + "epoch_delta_plus": ( + "epoch", + epochs_delta_data, + cdf_attrs.get_variable_attributes( + "epoch_delta_plus", check_schema=False + ), + ), + "event_num": ( + "event_num", + np.arange(constants.MAX_DE_EVENTS_PER_PACKET), + cdf_attrs.get_variable_attributes("event_num", check_schema=False), + ), + "event_num_label": ( + "event_num", + np.arange(constants.MAX_DE_EVENTS_PER_PACKET).astype(str), + cdf_attrs.get_variable_attributes( + "event_num_label", check_schema=False + ), + ), + "priority": ( + "priority", + np.arange(num_priorities), + cdf_attrs.get_variable_attributes("priority", check_schema=False), + ), + "priority_label": ( + "priority", + np.arange(num_priorities).astype(str), + cdf_attrs.get_variable_attributes("priority_label", check_schema=False), + ), + } + ) + + # Set global attributes + if apid == CODICEAPID.COD_LO_PHA: + de_data.attrs = cdf_attrs.get_global_attributes( + "imap_codice_l1a_lo-direct-events" + ) + # Add k_factor for Lo + de_data["k_factor"] = xr.DataArray( + np.array([constants.K_FACTOR]), + name="k_factor", + dims=["k_factor"], + attrs=cdf_attrs.get_variable_attributes("k_factor", check_schema=False), + ) + elif apid == CODICEAPID.COD_HI_PHA: + de_data.attrs = cdf_attrs.get_global_attributes( + "imap_codice_l1a_hi-direct-events" + ) + + # Add bias/gain mode variables (one per epoch, from first packet of each epoch) + de_data["sw_bias_gain_mode"] = xr.DataArray( + packets["sw_bias_gain_mode"].isel(epoch=epoch_packet_indices).values, + name="sw_bias_gain_mode", + dims=["epoch"], + attrs=cdf_attrs.get_variable_attributes("sw_bias_gain_mode"), + ) + de_data["st_bias_gain_mode"] = xr.DataArray( + packets["st_bias_gain_mode"].isel(epoch=epoch_packet_indices).values, + name="st_bias_gain_mode", + dims=["epoch"], + attrs=cdf_attrs.get_variable_attributes("st_bias_gain_mode"), + ) + + # --- Initialize data arrays for unpacked 64-bit fields --- for field in bit_structure: - if field not in ["Priority", "Spare"]: + if field not in ["priority", "spare"]: # Update attrs based on fillval per field fillval = bit_structure[field]["fillval"] dtype = bit_structure[field]["dtype"] @@ -257,7 
+365,7 @@ def process_de_data( ) de_data[field] = xr.DataArray( np.full( - (num_epochs, num_priorities, 10000), + (num_epochs, num_priorities, constants.MAX_DE_EVENTS_PER_PACKET), fillval, dtype=dtype, ), @@ -266,21 +374,13 @@ def process_de_data( attrs=attrs, ) - # Get num_events, data quality, and priorities data for beginning of packet_indexs - num_events_arr = packets.num_events.data - data_quality_arr = packets.suspect.data - priorities_arr = packets.priority.data - event_data_arr = packets.event_data.data - - # Initialize other fields of l1a that we want to - # carry in L1A CDF file + # Initialize 2D metadata arrays de_data["num_events"] = xr.DataArray( np.full((num_epochs, num_priorities), 65535, dtype=np.uint16), name="num_events", dims=["epoch", "priority"], attrs=cdf_attrs.get_variable_attributes("de_2d_attrs"), ) - de_data["data_quality"] = xr.DataArray( np.full((num_epochs, num_priorities), 65535, dtype=np.uint16), name="data_quality", @@ -288,53 +388,50 @@ def process_de_data( attrs=cdf_attrs.get_variable_attributes("de_2d_attrs"), ) - # As mentioned above, epoch data is of this shape: - # (epoch, (num_events * )). - # num_events is a variable number per priority. + # --- Unpack event data --- + # Extract arrays for faster access in loop + num_events_arr = packets.num_events.data + data_quality_arr = packets.suspect.data + priorities_arr = packets.priority.data + event_data_arr = packets.event_data.data + for epoch_index in range(num_epochs): - # If priority array unique size is not same size as - # num_priorities, then throw error. They should match. - per_epoch_priorities = priorities_arr[ - epoch_index * num_priorities : (epoch_index + 1) * num_priorities - ] + # Validate that all priorities are present for this epoch + epoch_start = epoch_index * num_priorities + epoch_end = epoch_start + num_priorities + per_epoch_priorities = priorities_arr[epoch_start:epoch_end] if len(np.unique(per_epoch_priorities)) != num_priorities: raise ValueError( f"Priority array for epoch {epoch_index} contains " f"non-unique values: {per_epoch_priorities}" ) - # Fill the event data into it's bin. But - # since the epoch has different num_events per priority, - # we need to loop and index accordingly. Otherwise, numpy throws - # 'The detected shape was (n,) + inhomogeneous part' error. + # Process each priority within this epoch for priority_index in range(num_priorities): - event_data_loc = epoch_index * num_priorities + priority_index - curr_priority = priorities_arr[event_data_loc] + pkt_idx = epoch_start + priority_index + curr_priority = priorities_arr[pkt_idx] + priority_num_events = num_events_arr[pkt_idx] - priority_num_events = num_events_arr[event_data_loc] de_data["num_events"][epoch_index, curr_priority] = priority_num_events de_data["data_quality"][epoch_index, curr_priority] = data_quality_arr[ - event_data_loc + pkt_idx ] - # Reshape epoch data into (num_events, 8). That 8 is 8-bytes that - # make up 64-bits. Therefore, combine last 8 dimension into one to - # get 64-bits event data that we need to unpack later. First, - # combine last 8 dimension into one 64-bits value - # we need to make a copy and reverse the byte order - # to match LSB order before we use .view. 
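A standalone check of the byte-reverse-then-view trick referenced in these comments, assuming a little-endian host; the event bytes are fabricated:

import numpy as np

# Two fabricated 8-byte big-endian events.
raw = np.array([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
                0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18],
               dtype=np.uint8)

# Reverse the bytes within each event, then reinterpret the contiguous
# copy as native (little-endian) uint64 values.
events = raw.reshape(2, 8)[:, ::-1].copy()
values = events.view(np.uint64)[:, 0]

assert values[0] == 0x0102030405060708
assert values[1] == 0x1112131415161718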
+ # Reshape event bytes into 64-bit values + # Each event is 8 bytes; reverse byte order for LSB unpacking events_in_bytes = ( - np.array(event_data_arr[event_data_loc], dtype=np.uint8) + np.array(event_data_arr[pkt_idx], dtype=np.uint8) .reshape(priority_num_events, 8)[:, ::-1] .copy() ) combined_64bits = events_in_bytes.view(np.uint64)[:, 0] - # Unpack 64-bits into fields + + # Unpack 64-bit values into individual fields unpacked_fields = unpack_bits(bit_structure, combined_64bits) - # Put unpacked event data into their respective variable and priority - # number bins + + # Store unpacked fields in the dataset for field_name, field_data in unpacked_fields.items(): - if field_name not in ["Priority", "Spare"]: + if field_name not in ["priority", "spare"]: de_data[field_name][ epoch_index, curr_priority, :priority_num_events ] = field_data @@ -358,123 +455,14 @@ def l1a_direct_event(unpacked_dataset: xr.Dataset, apid: int) -> xr.Dataset: xarray.Dataset Processed L1A Direct Event dataset. """ - ds = combine_segmented_packets(unpacked_dataset) - ds = extract_initial_items_from_combined_packets(ds) + # Combine segmented packets and extract header fields + packets = combine_segmented_packets(unpacked_dataset) + packets = extract_initial_items_from_combined_packets(packets) # Gather the CDF attributes cdf_attrs = ImapCdfAttributes() cdf_attrs.add_instrument_global_attrs("codice") cdf_attrs.add_instrument_variable_attrs("codice", "l1a") - # Unpack DE packet data into CDF-ready variables - de_dataset = process_de_data(ds, apid, cdf_attrs) - - # Calculate epoch variables using sensor id and apid - # Provide 0 as default input for other inputs but they - # are not used in epoch calculation - view_tab_info = ViewTabInfo( - apid=apid, - sensor=1 if apid == CODICEAPID.COD_HI_PHA else 0, - collapse_table=0, - three_d_collapsed=0, - view_id=0, - ) - ds = ds.isel(epoch=slice(None, None, 8)) - epochs, epochs_delta = get_codice_epoch_time( - ds["acq_start_seconds"], - ds["acq_start_subseconds"], - ds["spin_period"], - view_tab_info, - ) - - # Define coordinates - epoch = xr.DataArray( - met_to_ttj2000ns(epochs), - name="epoch", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes("epoch", check_schema=False), - ) - epoch_delta_minus = xr.DataArray( - epochs_delta, - name="epoch_delta_minus", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes( - "epoch_delta_minus", check_schema=False - ), - ) - epoch_delta_plus = xr.DataArray( - epochs_delta, - name="epoch_delta_plus", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes("epoch_delta_plus", check_schema=False), - ) - event_num = xr.DataArray( - np.arange(constants.MAX_DE_EVENTS_PER_PACKET), - name="event_num", - dims=["event_num"], - attrs=cdf_attrs.get_variable_attributes("event_num", check_schema=False), - ) - event_num_label = xr.DataArray( - np.arange(constants.MAX_DE_EVENTS_PER_PACKET).astype(str), - name="event_num_label", - dims=["event_num"], - attrs=cdf_attrs.get_variable_attributes("event_num_label", check_schema=False), - ) - priority = xr.DataArray( - np.arange(constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid]["num_priorities"]), - name="priority", - dims=["priority"], - attrs=cdf_attrs.get_variable_attributes("priority", check_schema=False), - ) - priority_label = xr.DataArray( - np.arange( - constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid]["num_priorities"] - ).astype(str), - name="priority_label", - dims=["priority"], - attrs=cdf_attrs.get_variable_attributes("priority_label", check_schema=False), - ) - - # Logical source 
id to lookup global attributes - if apid == CODICEAPID.COD_LO_PHA: - attrs = cdf_attrs.get_global_attributes("imap_codice_l1a_lo-direct-events") - elif apid == CODICEAPID.COD_HI_PHA: - attrs = cdf_attrs.get_global_attributes("imap_codice_l1a_hi-direct-events") - - # Add coordinates and global attributes to dataset - de_dataset = de_dataset.assign_coords( - epoch=epoch, - epoch_delta_minus=epoch_delta_minus, - epoch_delta_plus=epoch_delta_plus, - event_num=event_num, - event_num_label=event_num_label, - priority=priority, - priority_label=priority_label, - ) - de_dataset.attrs = attrs - - # Carry over these variables from unpacked dataset - if apid == CODICEAPID.COD_LO_PHA: - # Add k_factor - de_dataset["k_factor"] = xr.DataArray( - np.array([constants.K_FACTOR]), - name="k_factor", - dims=["k_factor"], - attrs=cdf_attrs.get_variable_attributes("k_factor", check_schema=False), - ) - - de_dataset["sw_bias_gain_mode"] = xr.DataArray( - ds["sw_bias_gain_mode"].values, - name="sw_bias_gain_mode", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes("sw_bias_gain_mode"), - ) - - de_dataset["st_bias_gain_mode"] = xr.DataArray( - ds["st_bias_gain_mode"].values, - name="st_bias_gain_mode", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes("st_bias_gain_mode"), - ) - - return de_dataset + # Process packets into complete CDF-ready dataset + return process_de_data(packets, apid, cdf_attrs) From 51f884a1ae6a231fbb625867192f1c2e66747716 Mon Sep 17 00:00:00 2001 From: Greg Lucas Date: Thu, 8 Jan 2026 20:39:46 -0700 Subject: [PATCH 5/8] Vectorizing codice bit unpacking --- imap_processing/codice/codice_l1a_de.py | 109 +++++++++++++++--------- 1 file changed, 70 insertions(+), 39 deletions(-) diff --git a/imap_processing/codice/codice_l1a_de.py b/imap_processing/codice/codice_l1a_de.py index 580bee3aac..35cdbf17d6 100644 --- a/imap_processing/codice/codice_l1a_de.py +++ b/imap_processing/codice/codice_l1a_de.py @@ -389,52 +389,83 @@ def process_de_data( ) # --- Unpack event data --- - # Extract arrays for faster access in loop - num_events_arr = packets.num_events.data - data_quality_arr = packets.suspect.data - priorities_arr = packets.priority.data - event_data_arr = packets.event_data.data - + # Extract arrays for faster access + num_events_arr = packets.num_events.values + data_quality_arr = packets.suspect.values + priorities_arr = packets.priority.values + event_data_arr = packets.event_data.values + + # Reshape priority-related arrays to (num_epochs, num_priorities) + priorities_2d = priorities_arr.reshape(num_epochs, num_priorities) + num_events_2d = num_events_arr.reshape(num_epochs, num_priorities) + data_quality_2d = data_quality_arr.reshape(num_epochs, num_priorities) + + # Validate all epochs have unique priorities for epoch_index in range(num_epochs): - # Validate that all priorities are present for this epoch - epoch_start = epoch_index * num_priorities - epoch_end = epoch_start + num_priorities - per_epoch_priorities = priorities_arr[epoch_start:epoch_end] - if len(np.unique(per_epoch_priorities)) != num_priorities: + if len(np.unique(priorities_2d[epoch_index])) != num_priorities: raise ValueError( f"Priority array for epoch {epoch_index} contains " - f"non-unique values: {per_epoch_priorities}" + f"non-unique values: {priorities_2d[epoch_index]}" ) - # Process each priority within this epoch - for priority_index in range(num_priorities): - pkt_idx = epoch_start + priority_index - curr_priority = priorities_arr[pkt_idx] - priority_num_events = num_events_arr[pkt_idx] - - 
de_data["num_events"][epoch_index, curr_priority] = priority_num_events - de_data["data_quality"][epoch_index, curr_priority] = data_quality_arr[ - pkt_idx - ] - - # Reshape event bytes into 64-bit values - # Each event is 8 bytes; reverse byte order for LSB unpacking - events_in_bytes = ( - np.array(event_data_arr[pkt_idx], dtype=np.uint8) - .reshape(priority_num_events, 8)[:, ::-1] - .copy() - ) - combined_64bits = events_in_bytes.view(np.uint64)[:, 0] + # Vectorized assignment of num_events and data_quality using advanced indexing + # For each epoch, use the priority values as column indices + epoch_indices = np.arange(num_epochs)[:, np.newaxis] + de_data["num_events"].values[epoch_indices, priorities_2d] = num_events_2d + de_data["data_quality"].values[epoch_indices, priorities_2d] = data_quality_2d + + # --- Process all events in a single batch --- + # Concatenate all event data into one large array, then unpack all at once + # This avoids calling unpack_bits in a loop + + # First, calculate total events and build index arrays for scattering results + total_events = int(np.sum(num_events_arr)) + + if total_events == 0: + return de_data + + # Preallocate the concatenated event bytes array + all_event_bytes = np.zeros((total_events, 8), dtype=np.uint8) + + # Build arrays to track where each event should go in the output + event_epoch_idx = np.zeros(total_events, dtype=np.int32) + event_priority_idx = np.zeros(total_events, dtype=np.int32) + event_position_idx = np.zeros(total_events, dtype=np.int32) + + # Fill the arrays by iterating through packets once + event_offset = 0 + for pkt_idx in range(num_complete_packets): + n_events = int(num_events_arr[pkt_idx]) + if n_events == 0: + continue + + epoch_idx = pkt_idx // num_priorities + priority_val = priorities_arr[pkt_idx] + + # Copy event bytes (reversed for LSB order) + pkt_events = np.array(event_data_arr[pkt_idx], dtype=np.uint8) + pkt_events = pkt_events.reshape(n_events, 8)[:, ::-1] + all_event_bytes[event_offset : event_offset + n_events] = pkt_events + + # Record destination indices + event_epoch_idx[event_offset : event_offset + n_events] = epoch_idx + event_priority_idx[event_offset : event_offset + n_events] = priority_val + event_position_idx[event_offset : event_offset + n_events] = np.arange(n_events) + + event_offset += n_events + + # Convert all events to 64-bit values at once + all_64bits = all_event_bytes.view(np.uint64)[:, 0] - # Unpack 64-bit values into individual fields - unpacked_fields = unpack_bits(bit_structure, combined_64bits) + # Unpack all events at once + unpacked_fields = unpack_bits(bit_structure, all_64bits) - # Store unpacked fields in the dataset - for field_name, field_data in unpacked_fields.items(): - if field_name not in ["priority", "spare"]: - de_data[field_name][ - epoch_index, curr_priority, :priority_num_events - ] = field_data + # Scatter results to the output arrays using advanced indexing + for field_name, field_data in unpacked_fields.items(): + if field_name not in ["priority", "spare"]: + de_data[field_name].values[ + event_epoch_idx, event_priority_idx, event_position_idx + ] = field_data return de_data From 827befcd84355c69910631e002ad32c4e9f34b79 Mon Sep 17 00:00:00 2001 From: Greg Lucas Date: Thu, 8 Jan 2026 21:03:03 -0700 Subject: [PATCH 6/8] Clean up the helper functions for codice de --- imap_processing/codice/codice_l1a.py | 3 - imap_processing/codice/codice_l1a_de.py | 350 +++++++++++++----------- 2 files changed, 187 insertions(+), 166 deletions(-) diff --git 
a/imap_processing/codice/codice_l1a.py b/imap_processing/codice/codice_l1a.py index 23d3d35cf1..5bc0a3a9a3 100644 --- a/imap_processing/codice/codice_l1a.py +++ b/imap_processing/codice/codice_l1a.py @@ -64,9 +64,6 @@ def process_l1a( # noqa: PLR0912 datasets = [] for apid in datasets_by_apid: - if apid not in [CODICEAPID.COD_LO_PHA, CODICEAPID.COD_HI_PHA]: - # XXX Temporary skip all non-direct event apids - continue if apid not in [CODICEAPID.COD_LO_PHA, CODICEAPID.COD_HI_PHA]: # Get LUT file. Direct events do not need LUT lut_file = dependency.get_file_paths(descriptor="l1a-sci-lut") diff --git a/imap_processing/codice/codice_l1a_de.py b/imap_processing/codice/codice_l1a_de.py index 35cdbf17d6..fb5677e64c 100644 --- a/imap_processing/codice/codice_l1a_de.py +++ b/imap_processing/codice/codice_l1a_de.py @@ -203,56 +203,33 @@ def unpack_bits(bit_structure: dict, de_data: np.ndarray) -> dict: return unpacked -def process_de_data( +def _create_dataset_coords( packets: xr.Dataset, apid: int, + num_priorities: int, cdf_attrs: ImapCdfAttributes, ) -> xr.Dataset: """ - Process direct event data into a complete CDF-ready dataset. - - Unpacking DE needs below for-loops because of many reasons, including: - - Need of preserve fillval per field of various bit lengths - - inability to use nan for 64-bits unpacking - - num_events being variable length per epoch - - binning priorities into its bins - - unpacking 64-bits into fields and indexing correctly + Create the output dataset with coordinates. Parameters ---------- - packets : xarray.Dataset - Dataset containing the combined packets with extracted header fields. + packets : xr.Dataset + Combined packets with extracted header fields. apid : int - The sensor type, used primarily to determine if the data are from - CoDICE-Lo or CoDICE-Hi. + APID for sensor type. + num_priorities : int + Number of priorities for this APID. cdf_attrs : ImapCdfAttributes - The CDF attributes to be added to the dataset. + CDF attributes manager. Returns ------- - xarray.Dataset - Complete processed Direct Event dataset with coordinates and attributes. + xr.Dataset + Dataset with coordinates defined. """ - # Extract configuration for this APID - num_priorities = constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid]["num_priorities"] - bit_structure = constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid]["bit_structure"] - - # There is one epoch per complete set of priorities - # Discard any incomplete group at the end - num_packets = len(packets["epoch"]) - num_epochs = num_packets // num_priorities - num_complete_packets = num_epochs * num_priorities - - # TODO: What do we want to do for how to start/end the grouping? - # It is unclear to me how we know we are starting the priorities - # and epoch picking at the right spot. 
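The renamed helper _create_dataset_coords takes its timing fields from the first packet of each epoch with a plain stride slice; a minimal sketch with fabricated stand-in values:

import numpy as np

# With one packet per priority, the first packet of every epoch is reached
# with a stride of num_priorities.
num_priorities = 8
packet_met = np.arange(24)   # stand-in for per-packet acquisition times

epoch_slice = slice(None, None, num_priorities)
assert list(packet_met[epoch_slice]) == [0, 8, 16]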
- if num_complete_packets < num_packets: - # Truncate to only complete priority groups - packets = packets.isel(epoch=slice(None, num_complete_packets)) - - # --- Build coordinates first --- - # Get timing info from the first packet of each epoch (every num_priorities packets) - epoch_packet_indices = slice(None, None, num_priorities) + # Get timing info from the first packet of each epoch + epoch_slice = slice(None, None, num_priorities) view_tab_info = ViewTabInfo( apid=apid, @@ -262,21 +239,18 @@ def process_de_data( view_id=0, ) epochs, epochs_delta = get_codice_epoch_time( - packets["acq_start_seconds"].isel(epoch=epoch_packet_indices), - packets["acq_start_subseconds"].isel(epoch=epoch_packet_indices), - packets["spin_period"].isel(epoch=epoch_packet_indices), + packets["acq_start_seconds"].isel(epoch=epoch_slice), + packets["acq_start_subseconds"].isel(epoch=epoch_slice), + packets["spin_period"].isel(epoch=epoch_slice), view_tab_info, ) - # Convert to numpy arrays if DataArrays - epochs_data = epochs.values if hasattr(epochs, "values") else epochs - epochs_delta_data = ( - epochs_delta.values if hasattr(epochs_delta, "values") else epochs_delta - ) + # Convert to numpy arrays + epochs_data = np.asarray(epochs) + epochs_delta_data = np.asarray(epochs_delta) epoch_values = met_to_ttj2000ns(epochs_data) - # Create the dataset with coordinates - de_data = xr.Dataset( + dataset = xr.Dataset( coords={ "epoch": ( "epoch", @@ -322,150 +296,200 @@ def process_de_data( } ) - # Set global attributes - if apid == CODICEAPID.COD_LO_PHA: - de_data.attrs = cdf_attrs.get_global_attributes( - "imap_codice_l1a_lo-direct-events" - ) - # Add k_factor for Lo - de_data["k_factor"] = xr.DataArray( - np.array([constants.K_FACTOR]), - name="k_factor", - dims=["k_factor"], - attrs=cdf_attrs.get_variable_attributes("k_factor", check_schema=False), - ) - elif apid == CODICEAPID.COD_HI_PHA: - de_data.attrs = cdf_attrs.get_global_attributes( - "imap_codice_l1a_hi-direct-events" - ) + return dataset - # Add bias/gain mode variables (one per epoch, from first packet of each epoch) - de_data["sw_bias_gain_mode"] = xr.DataArray( - packets["sw_bias_gain_mode"].isel(epoch=epoch_packet_indices).values, - name="sw_bias_gain_mode", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes("sw_bias_gain_mode"), - ) - de_data["st_bias_gain_mode"] = xr.DataArray( - packets["st_bias_gain_mode"].isel(epoch=epoch_packet_indices).values, - name="st_bias_gain_mode", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes("st_bias_gain_mode"), - ) - # --- Initialize data arrays for unpacked 64-bit fields --- - for field in bit_structure: - if field not in ["priority", "spare"]: - # Update attrs based on fillval per field - fillval = bit_structure[field]["fillval"] - dtype = bit_structure[field]["dtype"] - attrs = cdf_attrs.get_variable_attributes("de_3d_attrs") - attrs = apply_replacements_to_attrs( - attrs, {"num_digits": len(str(fillval)), "valid_max": fillval} - ) - de_data[field] = xr.DataArray( - np.full( - (num_epochs, num_priorities, constants.MAX_DE_EVENTS_PER_PACKET), - fillval, - dtype=dtype, - ), - name=field, - dims=["epoch", "priority", "event_num"], - attrs=attrs, - ) +def _unpack_and_store_events( + de_data: xr.Dataset, + packets: xr.Dataset, + num_priorities: int, + bit_structure: dict, + event_fields: list[str], +) -> xr.Dataset: + """ + Unpack all event data and store directly into the dataset arrays. 
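The coordinate construction above uses xarray's (dims, data, attrs) tuple form; a minimal self-contained sketch, with placeholder attribute dicts rather than the real CDF attributes:

import numpy as np
import xarray as xr

ds = xr.Dataset(
    coords={
        "epoch": ("epoch", np.array([0, 1, 2], dtype=np.int64), {"UNITS": "ns"}),
        "priority": ("priority", np.arange(8), {}),
        "priority_label": ("priority", np.arange(8).astype(str), {}),
    }
)
print(ds)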
-    # Initialize 2D metadata arrays
-    de_data["num_events"] = xr.DataArray(
-        np.full((num_epochs, num_priorities), 65535, dtype=np.uint16),
-        name="num_events",
-        dims=["epoch", "priority"],
-        attrs=cdf_attrs.get_variable_attributes("de_2d_attrs"),
-    )
-    de_data["data_quality"] = xr.DataArray(
-        np.full((num_epochs, num_priorities), 65535, dtype=np.uint16),
-        name="data_quality",
-        dims=["epoch", "priority"],
-        attrs=cdf_attrs.get_variable_attributes("de_2d_attrs"),
-    )
+    Parameters
+    ----------
+    de_data : xr.Dataset
+        Dataset to store unpacked events into (modified in place).
+    packets : xr.Dataset
+        Combined packets with extracted header fields.
+    num_priorities : int
+        Number of priorities per epoch.
+    bit_structure : dict
+        Bit structure defining how to unpack 64-bit event values.
+    event_fields : list[str]
+        List of field names to unpack (excludes priority/spare).
 
-    # --- Unpack event data ---
-    # Extract arrays for faster access
+    Returns
+    -------
+    xr.Dataset
+        The dataset with unpacked events stored.
+    """
+    # Extract arrays from packets dataset
     num_events_arr = packets.num_events.values
-    data_quality_arr = packets.suspect.values
     priorities_arr = packets.priority.values
     event_data_arr = packets.event_data.values
 
-    # Reshape priority-related arrays to (num_epochs, num_priorities)
-    priorities_2d = priorities_arr.reshape(num_epochs, num_priorities)
-    num_events_2d = num_events_arr.reshape(num_epochs, num_priorities)
-    data_quality_2d = data_quality_arr.reshape(num_epochs, num_priorities)
-
-    # Validate all epochs have unique priorities
-    for epoch_index in range(num_epochs):
-        if len(np.unique(priorities_2d[epoch_index])) != num_priorities:
-            raise ValueError(
-                f"Priority array for epoch {epoch_index} contains "
-                f"non-unique values: {priorities_2d[epoch_index]}"
-            )
-
-    # Vectorized assignment of num_events and data_quality using advanced indexing
-    # For each epoch, use the priority values as column indices
-    epoch_indices = np.arange(num_epochs)[:, np.newaxis]
-    de_data["num_events"].values[epoch_indices, priorities_2d] = num_events_2d
-    de_data["data_quality"].values[epoch_indices, priorities_2d] = data_quality_2d
-
-    # --- Process all events in a single batch ---
-    # Concatenate all event data into one large array, then unpack all at once
-    # This avoids calling unpack_bits in a loop
-
-    # First, calculate total events and build index arrays for scattering results
     total_events = int(np.sum(num_events_arr))
-
     if total_events == 0:
         return de_data
 
-    # Preallocate the concatenated event bytes array
-    all_event_bytes = np.zeros((total_events, 8), dtype=np.uint8)
+    num_packets = len(num_events_arr)
 
-    # Build arrays to track where each event should go in the output
+    # Preallocate arrays for concatenated events and their destination indices
+    all_event_bytes = np.zeros((total_events, 8), dtype=np.uint8)
     event_epoch_idx = np.zeros(total_events, dtype=np.int32)
     event_priority_idx = np.zeros(total_events, dtype=np.int32)
     event_position_idx = np.zeros(total_events, dtype=np.int32)
 
-    # Fill the arrays by iterating through packets once
-    event_offset = 0
-    for pkt_idx in range(num_complete_packets):
+    # Build concatenated event array and index mappings
+    offset = 0
+    for pkt_idx in range(num_packets):
         n_events = int(num_events_arr[pkt_idx])
         if n_events == 0:
             continue
-        epoch_idx = pkt_idx // num_priorities
-        priority_val = priorities_arr[pkt_idx]
+        # Extract and byte-reverse events for LSB unpacking
+        pkt_bytes = np.asarray(event_data_arr[pkt_idx], dtype=np.uint8)
+        pkt_bytes = pkt_bytes.reshape(n_events, 8)[:, ::-1]
+        all_event_bytes[offset : offset + n_events] = pkt_bytes
 
-        # Copy event bytes (reversed for LSB order)
-        pkt_events = np.array(event_data_arr[pkt_idx], dtype=np.uint8)
-        pkt_events = pkt_events.reshape(n_events, 8)[:, ::-1]
-        all_event_bytes[event_offset : event_offset + n_events] = pkt_events
+        # Record destination indices for scattering
+        event_epoch_idx[offset : offset + n_events] = pkt_idx // num_priorities
+        event_priority_idx[offset : offset + n_events] = priorities_arr[pkt_idx]
+        event_position_idx[offset : offset + n_events] = np.arange(n_events)
 
-        # Record destination indices
-        event_epoch_idx[event_offset : event_offset + n_events] = epoch_idx
-        event_priority_idx[event_offset : event_offset + n_events] = priority_val
-        event_position_idx[event_offset : event_offset + n_events] = np.arange(n_events)
+        offset += n_events
 
-        event_offset += n_events
+    # Convert bytes to 64-bit values and unpack all fields at once
+    all_64bits = all_event_bytes.view(np.uint64).ravel()
+    unpacked = unpack_bits(bit_structure, all_64bits)
 
-    # Convert all events to 64-bit values at once
-    all_64bits = all_event_bytes.view(np.uint64)[:, 0]
+    # Scatter unpacked values directly into the dataset arrays
+    for field in event_fields:
+        de_data[field].values[
+            event_epoch_idx, event_priority_idx, event_position_idx
+        ] = unpacked[field]
 
-    # Unpack all events at once
-    unpacked_fields = unpack_bits(bit_structure, all_64bits)
+    return de_data
+
+
+def process_de_data(
+    packets: xr.Dataset,
+    apid: int,
+    cdf_attrs: ImapCdfAttributes,
+) -> xr.Dataset:
+    """
+    Process direct event data into a complete CDF-ready dataset.
+
+    Parameters
+    ----------
+    packets : xarray.Dataset
+        Dataset containing the combined packets with extracted header fields.
+    apid : int
+        The APID identifying CoDICE-Lo or CoDICE-Hi.
+    cdf_attrs : ImapCdfAttributes
+        The CDF attributes manager.
+
+    Returns
+    -------
+    xarray.Dataset
+        Complete processed Direct Event dataset with coordinates and attributes.
+    """
+    # Get configuration for this APID
+    config = constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid]
+    num_priorities = config["num_priorities"]
+    bit_structure = config["bit_structure"]
+
+    # Truncate to complete priority groups only
+    num_packets = len(packets["epoch"])
+    num_epochs = num_packets // num_priorities
+    num_complete_packets = num_epochs * num_priorities
+    if num_complete_packets < num_packets:
+        packets = packets.isel(epoch=slice(None, num_complete_packets))
 
-    # Scatter results to the output arrays using advanced indexing
-    for field_name, field_data in unpacked_fields.items():
-        if field_name not in ["priority", "spare"]:
-            de_data[field_name].values[
-                event_epoch_idx, event_priority_idx, event_position_idx
-            ] = field_data
+    # Create dataset with coordinates
+    de_data = _create_dataset_coords(packets, apid, num_priorities, cdf_attrs)
+
+    # Set global attributes based on APID
+    if apid == CODICEAPID.COD_LO_PHA:
+        de_data.attrs = cdf_attrs.get_global_attributes(
+            "imap_codice_l1a_lo-direct-events"
+        )
+        de_data["k_factor"] = xr.DataArray(
+            np.array([constants.K_FACTOR]),
+            dims=["k_factor"],
+            attrs=cdf_attrs.get_variable_attributes("k_factor", check_schema=False),
+        )
+    else:
+        de_data.attrs = cdf_attrs.get_global_attributes(
+            "imap_codice_l1a_hi-direct-events"
+        )
+
+    # Add per-epoch metadata from first packet of each epoch
+    epoch_slice = slice(None, None, num_priorities)
+    for var in ["sw_bias_gain_mode", "st_bias_gain_mode"]:
+        de_data[var] = xr.DataArray(
+            packets[var].isel(epoch=epoch_slice).values,
+            dims=["epoch"],
+            attrs=cdf_attrs.get_variable_attributes(var),
+        )
+
+    # Initialize 3D event data arrays with fill values
+    event_fields = [f for f in bit_structure if f not in ["priority", "spare"]]
+    for field in event_fields:
+        info = bit_structure[field]
+        attrs = apply_replacements_to_attrs(
+            cdf_attrs.get_variable_attributes("de_3d_attrs"),
+            {"num_digits": len(str(info["fillval"])), "valid_max": info["fillval"]},
+        )
+        de_data[field] = xr.DataArray(
+            np.full(
+                (num_epochs, num_priorities, constants.MAX_DE_EVENTS_PER_PACKET),
+                info["fillval"],
+                dtype=info["dtype"],
+            ),
+            dims=["epoch", "priority", "event_num"],
+            attrs=attrs,
+        )
+
+    # Initialize 2D per-priority metadata arrays
+    for var in ["num_events", "data_quality"]:
+        de_data[var] = xr.DataArray(
+            np.full((num_epochs, num_priorities), 65535, dtype=np.uint16),
+            dims=["epoch", "priority"],
+            attrs=cdf_attrs.get_variable_attributes("de_2d_attrs"),
+        )
+
+    # Reshape packet arrays for validation and assignment
+    priorities_2d = packets.priority.values.reshape(num_epochs, num_priorities)
+    num_events_2d = packets.num_events.values.reshape(num_epochs, num_priorities)
+    data_quality_2d = packets.suspect.values.reshape(num_epochs, num_priorities)
+
+    # Validate each epoch has all unique priorities
+    unique_counts = np.array([len(np.unique(row)) for row in priorities_2d])
+    if np.any(unique_counts != num_priorities):
+        bad_epoch = np.argmax(unique_counts != num_priorities)
+        raise ValueError(
+            f"Priority array for epoch {bad_epoch} contains "
+            f"non-unique values: {priorities_2d[bad_epoch]}"
+        )
+
+    # Assign num_events and data_quality using priorities as column indices
+    epoch_idx = np.arange(num_epochs)[:, np.newaxis]
+    de_data["num_events"].values[epoch_idx, priorities_2d] = num_events_2d
+    de_data["data_quality"].values[epoch_idx, priorities_2d] = data_quality_2d
+
+    # Unpack all events and store directly into dataset arrays
+    de_data = _unpack_and_store_events(
+        de_data,
+        packets,
+        num_priorities,
+        bit_structure,
+        event_fields,
     )
 
     return de_data

From 1684b3728533adcc9f7b7a86ae62230bddf9418e Mon Sep 17 00:00:00 2001
From: Greg Lucas
Date: Fri, 9 Jan 2026 08:09:01 -0700
Subject: [PATCH 7/8] Remove source file update for codice packet def

---
 .../codice/packet_definitions/codice_packet_definition.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/imap_processing/codice/packet_definitions/codice_packet_definition.xml b/imap_processing/codice/packet_definitions/codice_packet_definition.xml
index d90e269e1c..829ca02f04 100644
--- a/imap_processing/codice/packet_definitions/codice_packet_definition.xml
+++ b/imap_processing/codice/packet_definitions/codice_packet_definition.xml
@@ -1,6 +1,6 @@
 
-
+
 
 

From d065c55f72ee34d112003cd3c644189750079b19 Mon Sep 17 00:00:00 2001
From: Greg Lucas
Date: Fri, 9 Jan 2026 09:06:23 -0700
Subject: [PATCH 8/8] Add spare back into dataset

---
 imap_processing/codice/codice_l1a_de.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/imap_processing/codice/codice_l1a_de.py b/imap_processing/codice/codice_l1a_de.py
index fb5677e64c..826c6ff488 100644
--- a/imap_processing/codice/codice_l1a_de.py
+++ b/imap_processing/codice/codice_l1a_de.py
@@ -438,7 +438,7 @@ def process_de_data(
         )
 
     # Initialize 3D event data arrays with fill values
-    event_fields = [f for f in bit_structure if f not in ["priority", "spare"]]
+    event_fields = [f for f in bit_structure if f not in ["priority"]]
     for field in event_fields:
        info = bit_structure[field]
        attrs = apply_replacements_to_attrs(
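
Note for reviewers: the two NumPy idioms this series leans on (byte-reversing each 8-byte event before viewing it as a uint64 word, then scattering per-event and per-priority values with advanced indexing) can be hard to follow inside the diff. The sketch below is not part of the patch: the 16-bit "tof" field, the array shapes, and the random test data are invented purely for illustration, and only the indexing pattern mirrors _unpack_and_store_events and the num_events/data_quality assignment above.

    import numpy as np

    # Hypothetical sizes: 2 epochs x 3 priorities, at most 4 events per packet.
    num_epochs, num_priorities, max_events = 2, 3, 4
    rng = np.random.default_rng(42)

    # One packet per (epoch, priority); each event_data blob is n_events * 8 raw bytes.
    num_events_arr = rng.integers(0, max_events + 1, size=num_epochs * num_priorities)
    priorities_arr = np.tile(np.arange(num_priorities), num_epochs)
    event_data_arr = [
        rng.integers(0, 256, size=int(n) * 8, dtype=np.uint8).tobytes()
        for n in num_events_arr
    ]

    total_events = int(num_events_arr.sum())
    all_event_bytes = np.zeros((total_events, 8), dtype=np.uint8)
    event_epoch_idx = np.zeros(total_events, dtype=np.int32)
    event_priority_idx = np.zeros(total_events, dtype=np.int32)
    event_position_idx = np.zeros(total_events, dtype=np.int32)

    offset = 0
    for pkt_idx, n_events in enumerate(num_events_arr):
        if n_events == 0:
            continue
        pkt_bytes = np.frombuffer(event_data_arr[pkt_idx], dtype=np.uint8)
        # Reverse each 8-byte event before the uint64 view, mirroring the
        # byte-reversal ("LSB unpacking") step in _unpack_and_store_events.
        all_event_bytes[offset : offset + n_events] = pkt_bytes.reshape(n_events, 8)[:, ::-1]
        event_epoch_idx[offset : offset + n_events] = pkt_idx // num_priorities
        event_priority_idx[offset : offset + n_events] = priorities_arr[pkt_idx]
        event_position_idx[offset : offset + n_events] = np.arange(n_events)
        offset += n_events

    # Unpack every event word in one vectorized pass (made-up 16-bit "tof" field).
    all_64bits = all_event_bytes.view(np.uint64).ravel()
    tof_values = (all_64bits & 0xFFFF).astype(np.uint16)

    # Scatter into an (epoch, priority, event_num) cube via advanced indexing.
    tof = np.full((num_epochs, num_priorities, max_events), 0xFFFF, dtype=np.uint16)
    tof[event_epoch_idx, event_priority_idx, event_position_idx] = tof_values

    # Per-priority metadata: the priority value itself selects the output column.
    priorities_2d = priorities_arr.reshape(num_epochs, num_priorities)
    num_events_2d = num_events_arr.reshape(num_epochs, num_priorities)
    counts = np.full((num_epochs, num_priorities), 65535, dtype=np.uint16)
    counts[np.arange(num_epochs)[:, None], priorities_2d] = num_events_2d

    print(counts)
    print(tof[0, 0, : counts[0, 0]])

The payoff of this pattern is that unpack_bits runs once over every event in the file instead of once per packet, and each packet's metadata lands in the column named by its priority value (validated above to be unique within each epoch) rather than by its arrival order.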