diff --git a/imap_processing/codice/codice_l1a_de.py b/imap_processing/codice/codice_l1a_de.py index 5991fa3749..826c6ff488 100644 --- a/imap_processing/codice/codice_l1a_de.py +++ b/imap_processing/codice/codice_l1a_de.py @@ -17,104 +17,147 @@ from imap_processing.spice.time import met_to_ttj2000ns -def get_de_metadata(packets: xr.Dataset, packet_index: int) -> bytes: +def combine_segmented_packets(packets: xr.Dataset) -> xr.Dataset: """ - Gather and return packet metadata (From packet_version through byte_count). + Combine segmented packets into unsegmented packets. - Extract the metadata in the packet_indexed direct event packet, which is then - used to construct the full data of the group of packet_indexs. + All packets have (SHCOARSE, EVENT_DATA, CHKSUM) fields. To combine + the segmented packets, we only concatenate along the EVENT_DATA field + into the first packet of the group. Parameters ---------- packets : xarray.Dataset - The packet_indexed direct event packet data. - packet_index : int - The index of the packet_index of interest. + Dataset containing the packets to combine. Returns ------- - metadata : bytes - The compressed metadata for the packet_indexed packet. + combined_packets : xarray.Dataset + Dataset containing the combined packets. """ - # String together the metadata fields and convert the data to a bytes obj - metadata_str = "" - for field, num_bits in constants.DE_METADATA_FIELDS.items(): - metadata_str += f"{packets[field].data[packet_index]:0{num_bits}b}" - metadata_chunks = [metadata_str[i : i + 8] for i in range(0, len(metadata_str), 8)] - metadata_ints = [int(item, 2) for item in metadata_chunks] - metadata = bytes(metadata_ints) + # Identification of group starts (UNSEGMENTED or FIRST_SEGMENT) + is_group_start = (packets.seq_flgs.data == SegmentedPacketOrder.UNSEGMENTED) | ( + packets.seq_flgs.data == SegmentedPacketOrder.FIRST_SEGMENT + ) - return metadata + # Assign group IDs using cumulative sum - each group start increments the ID + group_ids = np.cumsum(is_group_start) + # Get indices of packets we'll keep (first packet of each group) + group_start_indices = np.where(is_group_start)[0] -def group_data(packets: xr.Dataset) -> list[bytes]: - """ - Organize continuation packets into appropriate groups. + # Concatenate event_data in-place for each group + for group_id in np.unique(group_ids): + # Find all packets belonging to this group + group_mask = group_ids == group_id + group_indices = np.where(group_mask)[0] + + # If multiple packets, concatenate into the first packet + if len(group_indices) > 1: + start_index = group_indices[0] + packets["event_data"].data[start_index] = np.sum( + packets.event_data.data[group_indices] + ) - Some packets are continuation packets, as in, they are packets that are - part of a group of packets. These packets are marked by the `seq_flgs` field - in the CCSDS header of the packet. For CoDICE, the values are defined as - follows: + # Select only the first packet of each group + combined_packets = packets.isel(epoch=group_start_indices) - 3 = Packet is not part of a group - 1 = Packet is the first packet of the group - 0 = Packet is in the middle of the group - 2 = Packet is the last packet of the group + return combined_packets - For packets that are part of a group, the byte count associated with the - first packet of the group signifies the byte count for the entire group. + +def extract_initial_items_from_combined_packets( + packets: xr.Dataset, +) -> xr.Dataset: + """ + Extract metadata fields from the beginning of combined event_data packets. + + Extracts bit fields from the first 20 bytes of each event_data array + and adds them as new variables to the dataset. Parameters ---------- - packets : xarray.Dataset - Dataset containing the packets to group. + packets : xr.Dataset + Dataset containing combined packets with event_data. Returns ------- - grouped_data : list[bytes] - The packet data, converted to bytes and grouped appropriately. + xr.Dataset + Dataset with extracted metadata fields added. """ - grouped_data = [] # Holds the properly grouped data to be decompressed - current_group = bytearray() # Temporary storage for current group - group_byte_count = None # Temporary storage for current group byte count - - for packet_index in range(len(packets.event_data.data)): - packet_data = packets.event_data.data[packet_index] - group_code = packets.seq_flgs.data[packet_index] - byte_count = packets.byte_count.data[packet_index] - - # If the group code is 3, this means the data is unsegmented - # and can be decompressed as-is - if group_code == SegmentedPacketOrder.UNSEGMENTED: - grouped_data.append(packet_data[:byte_count]) - - # If the group code is 1, this means the data is the first data in a - # group. Also, set the byte count for the group - elif group_code == SegmentedPacketOrder.FIRST_SEGMENT: - group_byte_count = byte_count - current_group += packet_data - - # If the group code is 0, this means the data is part of the middle of - # the group. - elif group_code == SegmentedPacketOrder.CONTINUATION_SEGMENT: - current_group += get_de_metadata(packets, packet_index) - current_group += packet_data - - # If the group code is 2, this means the data is the last data in the - # group - elif group_code == SegmentedPacketOrder.LAST_SEGMENT: - current_group += get_de_metadata(packets, packet_index) - current_group += packet_data - - # The grouped data is now ready to be decompressed - values_to_decompress = current_group[:group_byte_count] - grouped_data.append(values_to_decompress) + # Initialize arrays for extracted fields + n_packets = len(packets.epoch) + + # Preallocate arrays + packet_version = np.zeros(n_packets, dtype=np.uint16) + spin_period = np.zeros(n_packets, dtype=np.uint16) + acq_start_seconds = np.zeros(n_packets, dtype=np.uint32) + acq_start_subseconds = np.zeros(n_packets, dtype=np.uint32) + spare_1 = np.zeros(n_packets, dtype=np.uint8) + st_bias_gain_mode = np.zeros(n_packets, dtype=np.uint8) + sw_bias_gain_mode = np.zeros(n_packets, dtype=np.uint8) + priority = np.zeros(n_packets, dtype=np.uint8) + suspect = np.zeros(n_packets, dtype=np.uint8) + compressed = np.zeros(n_packets, dtype=np.uint8) + num_events = np.zeros(n_packets, dtype=np.uint32) + byte_count = np.zeros(n_packets, dtype=np.uint32) + + # Extract fields from each packet + for pkt_idx in range(n_packets): + event_data = packets.event_data.data[pkt_idx] + + # Byte-aligned fields using int.from_bytes + packet_version[pkt_idx] = int.from_bytes(event_data[0:2], byteorder="big") + spin_period[pkt_idx] = int.from_bytes(event_data[2:4], byteorder="big") + acq_start_seconds[pkt_idx] = int.from_bytes(event_data[4:8], byteorder="big") + + # Non-byte-aligned fields (bytes 8-12 contain mixed bit fields) + # Extract 4 bytes and unpack bit fields + mixed_bytes = int.from_bytes(event_data[8:12], byteorder="big") + + # acq_start_subseconds: 20 bits (MSB) + acq_start_subseconds[pkt_idx] = (mixed_bytes >> 12) & 0xFFFFF + # spare_1: 2 bits + spare_1[pkt_idx] = (mixed_bytes >> 10) & 0x3 + # st_bias_gain_mode: 2 bits + st_bias_gain_mode[pkt_idx] = (mixed_bytes >> 8) & 0x3 + # sw_bias_gain_mode: 2 bits + sw_bias_gain_mode[pkt_idx] = (mixed_bytes >> 6) & 0x3 + # priority: 4 bits + priority[pkt_idx] = (mixed_bytes >> 2) & 0xF + # suspect: 1 bit + suspect[pkt_idx] = (mixed_bytes >> 1) & 0x1 + # compressed: 1 bit (LSB) + compressed[pkt_idx] = mixed_bytes & 0x1 + + # Remaining byte-aligned fields + num_events[pkt_idx] = int.from_bytes(event_data[12:16], byteorder="big") + byte_count[pkt_idx] = int.from_bytes(event_data[16:20], byteorder="big") + + # Remove the first 20 bytes from event_data + # Then trim to the byte_count value + packets.event_data.data[pkt_idx] = event_data[20 : 20 + byte_count[pkt_idx]] + + if compressed[pkt_idx]: + packets.event_data.data[pkt_idx] = decompress( + packets.event_data.data[pkt_idx], + CoDICECompression.LOSSLESS, + ) - # Reset the current group - current_group = bytearray() - group_byte_count = None + # Add extracted fields to dataset + packets["packet_version"] = xr.DataArray(packet_version, dims=["epoch"]) + packets["spin_period"] = xr.DataArray(spin_period, dims=["epoch"]) + packets["acq_start_seconds"] = xr.DataArray(acq_start_seconds, dims=["epoch"]) + packets["acq_start_subseconds"] = xr.DataArray(acq_start_subseconds, dims=["epoch"]) + packets["spare_1"] = xr.DataArray(spare_1, dims=["epoch"]) + packets["st_bias_gain_mode"] = xr.DataArray(st_bias_gain_mode, dims=["epoch"]) + packets["sw_bias_gain_mode"] = xr.DataArray(sw_bias_gain_mode, dims=["epoch"]) + packets["priority"] = xr.DataArray(priority, dims=["epoch"]) + packets["suspect"] = xr.DataArray(suspect, dims=["epoch"]) + packets["compressed"] = xr.DataArray(compressed, dims=["epoch"]) + packets["num_events"] = xr.DataArray(num_events, dims=["epoch"]) + packets["byte_count"] = xr.DataArray(byte_count, dims=["epoch"]) - return grouped_data + return packets def unpack_bits(bit_structure: dict, de_data: np.ndarray) -> dict: @@ -160,162 +203,293 @@ def unpack_bits(bit_structure: dict, de_data: np.ndarray) -> dict: return unpacked -def process_de_data( +def _create_dataset_coords( packets: xr.Dataset, - decompressed_data: list[list[int]], apid: int, + num_priorities: int, cdf_attrs: ImapCdfAttributes, ) -> xr.Dataset: """ - Reshape the decompressed direct event data into CDF-ready arrays. - - Unpacking DE needs below for-loops because of many reasons, including: - - Need of preserve fillval per field of various bit lengths - - inability to use nan for 64-bits unpacking - - num_events being variable length per epoch - - binning priorities into its bins - - unpacking 64-bits into fields and indexing correctly + Create the output dataset with coordinates. Parameters ---------- - packets : xarray.Dataset - Dataset containing the packets, needed to determine priority order - and data quality. - decompressed_data : list[list[int]] - The decompressed data to reshape, in the format [[]]. + packets : xr.Dataset + Combined packets with extracted header fields. apid : int - The sensor type, used primarily to determine if the data are from - CoDICE-Lo or CoDICE-Hi. + APID for sensor type. + num_priorities : int + Number of priorities for this APID. cdf_attrs : ImapCdfAttributes - The CDF attributes to be added to the dataset. + CDF attributes manager. Returns ------- - data : xarray.Dataset - Processed Direct Event data. + xr.Dataset + Dataset with coordinates defined. """ - # xr.Dataset to hold all the (soon to be restructured) direct event data - de_data = xr.Dataset() - - # Extract some useful variables - num_priorities = constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid]["num_priorities"] - bit_structure = constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid]["bit_structure"] - - # Determine the number of epochs to help with data array initialization - # There is one epoch per set of priorities - num_epochs = len(decompressed_data) // num_priorities - - # Initialize data arrays for unpacked 64-bits fields - for field in bit_structure: - if field not in ["Priority", "Spare"]: - # Update attrs based on fillval per field - fillval = bit_structure[field]["fillval"] - dtype = bit_structure[field]["dtype"] - attrs = cdf_attrs.get_variable_attributes("de_3d_attrs") - attrs = apply_replacements_to_attrs( - attrs, {"num_digits": len(str(fillval)), "valid_max": fillval} - ) - de_data[field] = xr.DataArray( - np.full( - (num_epochs, num_priorities, 10000), - fillval, - dtype=dtype, - ), - name=field, - dims=["epoch", "priority", "event_num"], - attrs=attrs, - ) + # Get timing info from the first packet of each epoch + epoch_slice = slice(None, None, num_priorities) - # Get num_events, data quality, and priorities data for beginning of packet_indexs - packet_index_starts = np.where( - (packets.seq_flgs.data == SegmentedPacketOrder.UNSEGMENTED) - | (packets.seq_flgs.data == SegmentedPacketOrder.FIRST_SEGMENT) - )[0] - num_events_arr = packets.num_events.data[packet_index_starts] - data_quality_arr = packets.suspect.data[packet_index_starts] - priorities_arr = packets.priority.data[packet_index_starts] - - # Initialize other fields of l1a that we want to - # carry in L1A CDF file - de_data["num_events"] = xr.DataArray( - np.full((num_epochs, num_priorities), 65535, dtype=np.uint16), - name="num_events", - dims=["epoch", "priority"], - attrs=cdf_attrs.get_variable_attributes("de_2d_attrs"), + view_tab_info = ViewTabInfo( + apid=apid, + sensor=1 if apid == CODICEAPID.COD_HI_PHA else 0, + collapse_table=0, + three_d_collapsed=0, + view_id=0, + ) + epochs, epochs_delta = get_codice_epoch_time( + packets["acq_start_seconds"].isel(epoch=epoch_slice), + packets["acq_start_subseconds"].isel(epoch=epoch_slice), + packets["spin_period"].isel(epoch=epoch_slice), + view_tab_info, ) - de_data["data_quality"] = xr.DataArray( - np.full((num_epochs, num_priorities), 65535, dtype=np.uint16), - name="data_quality", - dims=["epoch", "priority"], - attrs=cdf_attrs.get_variable_attributes("de_2d_attrs"), + # Convert to numpy arrays + epochs_data = np.asarray(epochs) + epochs_delta_data = np.asarray(epochs_delta) + epoch_values = met_to_ttj2000ns(epochs_data) + + dataset = xr.Dataset( + coords={ + "epoch": ( + "epoch", + epoch_values, + cdf_attrs.get_variable_attributes("epoch", check_schema=False), + ), + "epoch_delta_minus": ( + "epoch", + epochs_delta_data, + cdf_attrs.get_variable_attributes( + "epoch_delta_minus", check_schema=False + ), + ), + "epoch_delta_plus": ( + "epoch", + epochs_delta_data, + cdf_attrs.get_variable_attributes( + "epoch_delta_plus", check_schema=False + ), + ), + "event_num": ( + "event_num", + np.arange(constants.MAX_DE_EVENTS_PER_PACKET), + cdf_attrs.get_variable_attributes("event_num", check_schema=False), + ), + "event_num_label": ( + "event_num", + np.arange(constants.MAX_DE_EVENTS_PER_PACKET).astype(str), + cdf_attrs.get_variable_attributes( + "event_num_label", check_schema=False + ), + ), + "priority": ( + "priority", + np.arange(num_priorities), + cdf_attrs.get_variable_attributes("priority", check_schema=False), + ), + "priority_label": ( + "priority", + np.arange(num_priorities).astype(str), + cdf_attrs.get_variable_attributes("priority_label", check_schema=False), + ), + } ) - # As mentioned above, epoch data is of this shape: - # (epoch, (num_events * )). - # num_events is a variable number per priority. - for epoch_index in range(num_epochs): - # current epoch's grouped data are: - # current group's start index * 8 to next group's start indices * 8 - epoch_start = packet_index_starts[epoch_index] * num_priorities - epoch_end = packet_index_starts[epoch_index + 1] * num_priorities - # Extract the decompressed data for current epoch. - # epoch_data should be of shape ((num_priorities * num_events),) - epoch_data = decompressed_data[epoch_start:epoch_end] - - # Extract these other data - unordered_priority = priorities_arr[epoch_start:epoch_end] - unordered_data_quality = data_quality_arr[epoch_start:epoch_end] - unordered_num_events = num_events_arr[epoch_start:epoch_end] - - # If priority array unique size is not same size as - # num_priorities, then throw error. They should match. - if len(np.unique(unordered_priority)) != num_priorities: - raise ValueError( - f"Priority array for epoch {epoch_index} contains " - f"non-unique values: {unordered_priority}" - ) + return dataset + + +def _unpack_and_store_events( + de_data: xr.Dataset, + packets: xr.Dataset, + num_priorities: int, + bit_structure: dict, + event_fields: list[str], +) -> xr.Dataset: + """ + Unpack all event data and store directly into the dataset arrays. + + Parameters + ---------- + de_data : xr.Dataset + Dataset to store unpacked events into (modified in place). + packets : xr.Dataset + Combined packets with extracted header fields. + num_priorities : int + Number of priorities per epoch. + bit_structure : dict + Bit structure defining how to unpack 64-bit event values. + event_fields : list[str] + List of field names to unpack (excludes priority/spare). + + Returns + ------- + xr.Dataset + The dataset with unpacked events stored. + """ + # Extract arrays from packets dataset + num_events_arr = packets.num_events.values + priorities_arr = packets.priority.values + event_data_arr = packets.event_data.values + + total_events = int(np.sum(num_events_arr)) + if total_events == 0: + return de_data + + num_packets = len(num_events_arr) + + # Preallocate arrays for concatenated events and their destination indices + all_event_bytes = np.zeros((total_events, 8), dtype=np.uint8) + event_epoch_idx = np.zeros(total_events, dtype=np.int32) + event_priority_idx = np.zeros(total_events, dtype=np.int32) + event_position_idx = np.zeros(total_events, dtype=np.int32) + + # Build concatenated event array and index mappings + offset = 0 + for pkt_idx in range(num_packets): + n_events = int(num_events_arr[pkt_idx]) + if n_events == 0: + continue + + # Extract and byte-reverse events for LSB unpacking + pkt_bytes = np.asarray(event_data_arr[pkt_idx], dtype=np.uint8) + pkt_bytes = pkt_bytes.reshape(n_events, 8)[:, ::-1] + all_event_bytes[offset : offset + n_events] = pkt_bytes + + # Record destination indices for scattering + event_epoch_idx[offset : offset + n_events] = pkt_idx // num_priorities + event_priority_idx[offset : offset + n_events] = priorities_arr[pkt_idx] + event_position_idx[offset : offset + n_events] = np.arange(n_events) + + offset += n_events + + # Convert bytes to 64-bit values and unpack all fields at once + all_64bits = all_event_bytes.view(np.uint64).ravel() + unpacked = unpack_bits(bit_structure, all_64bits) + + # Scatter unpacked values directly into the dataset arrays + for field in event_fields: + de_data[field].values[ + event_epoch_idx, event_priority_idx, event_position_idx + ] = unpacked[field] + + return de_data + - # Until here, we have the out of order priority data. Data could have been - # collected in any priority order. Eg. - # priority - [0, 4, 5, 1, 3, 2, 6, 7] - # Now, we need to put data into their respective priority indexes - # in final arrays for the current epoch. Eg. put data into - # priority - [0, 1, 2, 3, 4, 5, 6, 7] - de_data["num_events"][epoch_index, unordered_priority] = unordered_num_events - de_data["data_quality"][epoch_index, unordered_priority] = ( - unordered_data_quality +def process_de_data( + packets: xr.Dataset, + apid: int, + cdf_attrs: ImapCdfAttributes, +) -> xr.Dataset: + """ + Process direct event data into a complete CDF-ready dataset. + + Parameters + ---------- + packets : xarray.Dataset + Dataset containing the combined packets with extracted header fields. + apid : int + The APID identifying CoDICE-Lo or CoDICE-Hi. + cdf_attrs : ImapCdfAttributes + The CDF attributes manager. + + Returns + ------- + xarray.Dataset + Complete processed Direct Event dataset with coordinates and attributes. + """ + # Get configuration for this APID + config = constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid] + num_priorities = config["num_priorities"] + bit_structure = config["bit_structure"] + + # Truncate to complete priority groups only + num_packets = len(packets["epoch"]) + num_epochs = num_packets // num_priorities + num_complete_packets = num_epochs * num_priorities + if num_complete_packets < num_packets: + packets = packets.isel(epoch=slice(None, num_complete_packets)) + + # Create dataset with coordinates + de_data = _create_dataset_coords(packets, apid, num_priorities, cdf_attrs) + + # Set global attributes based on APID + if apid == CODICEAPID.COD_LO_PHA: + de_data.attrs = cdf_attrs.get_global_attributes( + "imap_codice_l1a_lo-direct-events" + ) + de_data["k_factor"] = xr.DataArray( + np.array([constants.K_FACTOR]), + dims=["k_factor"], + attrs=cdf_attrs.get_variable_attributes("k_factor", check_schema=False), + ) + else: + de_data.attrs = cdf_attrs.get_global_attributes( + "imap_codice_l1a_hi-direct-events" ) - # Fill the event data into it's bin in same logic as above. But - # since the epoch has different num_events per priority, - # we need to loop and index accordingly. Otherwise, numpy throws - # 'The detected shape was (n,) + inhomogeneous part' error. - for priority_index in range(len(unordered_priority)): - # Get num_events - priority_num_events = int(unordered_num_events[priority_index]) - # Reshape epoch data into (num_events, 8). That 8 is 8-bytes that - # make up 64-bits. Therefore, combine last 8 dimension into one to - # get 64-bits event data that we need to unpack later. First, - # combine last 8 dimension into one 64-bits value - # we need to make a copy and reverse the byte order - # to match LSB order before we use .view. - events_in_bytes = ( - np.array(epoch_data[priority_index], dtype=np.uint8) - .reshape(priority_num_events, 8)[:, ::-1] - .copy() - ) - combined_64bits = events_in_bytes.view(np.uint64)[:, 0] - # Unpack 64-bits into fields - unpacked_fields = unpack_bits(bit_structure, combined_64bits) - # Put unpacked event data into their respective variable and priority - # number bins - priority_num = int(unordered_priority[priority_index]) - for field_name, field_data in unpacked_fields.items(): - if field_name not in ["Priority", "Spare"]: - de_data[field_name][ - epoch_index, priority_num, :priority_num_events - ] = field_data + # Add per-epoch metadata from first packet of each epoch + epoch_slice = slice(None, None, num_priorities) + for var in ["sw_bias_gain_mode", "st_bias_gain_mode"]: + de_data[var] = xr.DataArray( + packets[var].isel(epoch=epoch_slice).values, + dims=["epoch"], + attrs=cdf_attrs.get_variable_attributes(var), + ) + + # Initialize 3D event data arrays with fill values + event_fields = [f for f in bit_structure if f not in ["priority"]] + for field in event_fields: + info = bit_structure[field] + attrs = apply_replacements_to_attrs( + cdf_attrs.get_variable_attributes("de_3d_attrs"), + {"num_digits": len(str(info["fillval"])), "valid_max": info["fillval"]}, + ) + de_data[field] = xr.DataArray( + np.full( + (num_epochs, num_priorities, constants.MAX_DE_EVENTS_PER_PACKET), + info["fillval"], + dtype=info["dtype"], + ), + dims=["epoch", "priority", "event_num"], + attrs=attrs, + ) + + # Initialize 2D per-priority metadata arrays + for var in ["num_events", "data_quality"]: + de_data[var] = xr.DataArray( + np.full((num_epochs, num_priorities), 65535, dtype=np.uint16), + dims=["epoch", "priority"], + attrs=cdf_attrs.get_variable_attributes("de_2d_attrs"), + ) + + # Reshape packet arrays for validation and assignment + priorities_2d = packets.priority.values.reshape(num_epochs, num_priorities) + num_events_2d = packets.num_events.values.reshape(num_epochs, num_priorities) + data_quality_2d = packets.suspect.values.reshape(num_epochs, num_priorities) + + # Validate each epoch has all unique priorities + unique_counts = np.array([len(np.unique(row)) for row in priorities_2d]) + if np.any(unique_counts != num_priorities): + bad_epoch = np.argmax(unique_counts != num_priorities) + raise ValueError( + f"Priority array for epoch {bad_epoch} contains " + f"non-unique values: {priorities_2d[bad_epoch]}" + ) + + # Assign num_events and data_quality using priorities as column indices + epoch_idx = np.arange(num_epochs)[:, np.newaxis] + de_data["num_events"].values[epoch_idx, priorities_2d] = num_events_2d + de_data["data_quality"].values[epoch_idx, priorities_2d] = data_quality_2d + + # Unpack all events and store directly into dataset arrays + de_data = _unpack_and_store_events( + de_data, + packets, + num_priorities, + bit_structure, + event_fields, + ) return de_data @@ -336,142 +510,14 @@ def l1a_direct_event(unpacked_dataset: xr.Dataset, apid: int) -> xr.Dataset: xarray.Dataset Processed L1A Direct Event dataset. """ - # Group segmented data. - # TODO: this may get replaced with space_packet_parser's functionality - grouped_data = group_data(unpacked_dataset) - - # Decompress data shape is (epoch, priority * num_events) - decompressed_data = [ - decompress( - group, - CoDICECompression.LOSSLESS, - ) - for group in grouped_data - ] + # Combine segmented packets and extract header fields + packets = combine_segmented_packets(unpacked_dataset) + packets = extract_initial_items_from_combined_packets(packets) # Gather the CDF attributes cdf_attrs = ImapCdfAttributes() cdf_attrs.add_instrument_global_attrs("codice") cdf_attrs.add_instrument_variable_attrs("codice", "l1a") - # Unpack DE packet data into CDF-ready variables - de_dataset = process_de_data(unpacked_dataset, decompressed_data, apid, cdf_attrs) - - # Determine the epochs to use in the dataset, which are the epochs whenever - # there is a start of a segment and the priority is 0 - epoch_indices = np.where( - ( - (unpacked_dataset.seq_flgs.data == SegmentedPacketOrder.UNSEGMENTED) - | (unpacked_dataset.seq_flgs.data == SegmentedPacketOrder.FIRST_SEGMENT) - ) - & (unpacked_dataset.priority.data == 0) - )[0] - acq_start_seconds = unpacked_dataset.acq_start_seconds[epoch_indices] - acq_start_subseconds = unpacked_dataset.acq_start_subseconds[epoch_indices] - spin_periods = unpacked_dataset.spin_period[epoch_indices] - - # Calculate epoch variables using sensor id and apid - # Provide 0 as default input for other inputs but they - # are not used in epoch calculation - view_tab_info = ViewTabInfo( - apid=apid, - sensor=1 if apid == CODICEAPID.COD_HI_PHA else 0, - collapse_table=0, - three_d_collapsed=0, - view_id=0, - ) - epochs, epochs_delta = get_codice_epoch_time( - acq_start_seconds, acq_start_subseconds, spin_periods, view_tab_info - ) - - # Define coordinates - epoch = xr.DataArray( - met_to_ttj2000ns(epochs), - name="epoch", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes("epoch", check_schema=False), - ) - epoch_delta_minus = xr.DataArray( - epochs_delta, - name="epoch_delta_minus", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes( - "epoch_delta_minus", check_schema=False - ), - ) - epoch_delta_plus = xr.DataArray( - epochs_delta, - name="epoch_delta_plus", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes("epoch_delta_plus", check_schema=False), - ) - event_num = xr.DataArray( - np.arange(constants.MAX_DE_EVENTS_PER_PACKET), - name="event_num", - dims=["event_num"], - attrs=cdf_attrs.get_variable_attributes("event_num", check_schema=False), - ) - event_num_label = xr.DataArray( - np.arange(constants.MAX_DE_EVENTS_PER_PACKET).astype(str), - name="event_num_label", - dims=["event_num"], - attrs=cdf_attrs.get_variable_attributes("event_num_label", check_schema=False), - ) - priority = xr.DataArray( - np.arange(constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid]["num_priorities"]), - name="priority", - dims=["priority"], - attrs=cdf_attrs.get_variable_attributes("priority", check_schema=False), - ) - priority_label = xr.DataArray( - np.arange( - constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid]["num_priorities"] - ).astype(str), - name="priority_label", - dims=["priority"], - attrs=cdf_attrs.get_variable_attributes("priority_label", check_schema=False), - ) - - # Logical source id to lookup global attributes - if apid == CODICEAPID.COD_LO_PHA: - attrs = cdf_attrs.get_global_attributes("imap_codice_l1a_lo-direct-events") - elif apid == CODICEAPID.COD_HI_PHA: - attrs = cdf_attrs.get_global_attributes("imap_codice_l1a_hi-direct-events") - - # Add coordinates and global attributes to dataset - de_dataset = de_dataset.assign_coords( - epoch=epoch, - epoch_delta_minus=epoch_delta_minus, - epoch_delta_plus=epoch_delta_plus, - event_num=event_num, - event_num_label=event_num_label, - priority=priority, - priority_label=priority_label, - ) - de_dataset.attrs = attrs - - # Carry over these variables from unpacked dataset - if apid == CODICEAPID.COD_LO_PHA: - # Add k_factor - de_dataset["k_factor"] = xr.DataArray( - np.array([constants.K_FACTOR]), - name="k_factor", - dims=["k_factor"], - attrs=cdf_attrs.get_variable_attributes("k_factor", check_schema=False), - ) - - de_dataset["sw_bias_gain_mode"] = xr.DataArray( - unpacked_dataset["sw_bias_gain_mode"].data[epoch_indices], - name="sw_bias_gain_mode", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes("sw_bias_gain_mode"), - ) - - de_dataset["st_bias_gain_mode"] = xr.DataArray( - unpacked_dataset["st_bias_gain_mode"].data[epoch_indices], - name="st_bias_gain_mode", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes("st_bias_gain_mode"), - ) - - return de_dataset + # Process packets into complete CDF-ready dataset + return process_de_data(packets, apid, cdf_attrs) diff --git a/imap_processing/codice/packet_definitions/codice_packet_definition.xml b/imap_processing/codice/packet_definitions/codice_packet_definition.xml index a50eb3d4b6..829ca02f04 100644 --- a/imap_processing/codice/packet_definitions/codice_packet_definition.xml +++ b/imap_processing/codice/packet_definitions/codice_packet_definition.xml