Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions imap_processing/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,50 @@ def test_packet_file_to_datasets_flat_definition():
utils.packet_file_to_datasets(packet_files, packet_definition)


def test_combine_segmented_packets():
"""Test combine_segmented_packets function."""

# unsegmented, first, middle, last, unsegmented
sequence_flags = xr.DataArray(np.array([3, 1, 0, 2, 3]), dims=["epoch"])

binary_data = xr.DataArray(
np.array(
[
b"ABC",
b"123",
b"456",
b"789",
b"abc",
],
dtype=object,
),
dims=["epoch"],
)

ds = xr.Dataset(data_vars={"seq_flgs": sequence_flags, "packetdata": binary_data})

combined_ds = utils.combine_segmented_packets(ds, "packetdata")

expected_ds = xr.Dataset(
data_vars={
"seq_flgs": xr.DataArray(np.array([3, 1, 3]), dims=["epoch"]),
"packetdata": xr.DataArray(
np.array(
[
b"ABC",
b"123456789",
b"abc",
],
dtype=object,
),
dims=["epoch"],
),
}
)

xr.testing.assert_equal(combined_ds, expected_ds)


def test_extract_data_dict():
"""Test extract_data_dict function."""
data_vars = {
Expand Down
65 changes: 64 additions & 1 deletion imap_processing/ultra/l0/decom_ultra.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,73 @@
ULTRA_RATES,
PacketProperties,
)
from imap_processing.utils import convert_to_binary_string
from imap_processing.utils import combine_segmented_packets, convert_to_binary_string

logger = logging.getLogger(__name__)


def extract_initial_items_from_combined_packets(
packets: xr.Dataset,
) -> xr.Dataset:
"""
Extract metadata fields from the beginning of combined event_data packets.

Extracts bit fields from the first 20 bytes of each event_data array
and adds them as new variables to the dataset.

Parameters
----------
packets : xarray.Dataset
Dataset containing combined packets with event_data.

Returns
-------
xarray.Dataset
Dataset with extracted metadata fields added.
"""
# Initialize arrays for extracted fields
n_packets = len(packets.epoch)

# Preallocate arrays
sid = np.zeros(n_packets, dtype=np.uint8)
spin = np.zeros(n_packets, dtype=np.uint8)
abortflag = np.zeros(n_packets, dtype=np.uint8)
startdelay = np.zeros(n_packets, dtype=np.uint16)
p00 = np.zeros(n_packets, dtype=np.uint8)

# Extract the data array outside of the loop
binary_data = packets["packetdata"].data
# Extract fields from each packet
for pkt_idx in range(n_packets):
event_data = binary_data[pkt_idx]

sid[pkt_idx] = event_data[0]
spin[pkt_idx] = event_data[1]
abortflag[pkt_idx] = (event_data[2] >> 7) & 0x1
startdelay[pkt_idx] = int.from_bytes(event_data[2:4], byteorder="big") & 0x7FFF
p00[pkt_idx] = event_data[4]

# Remove the first 5 bytes after extraction
binary_data[pkt_idx] = event_data[5:]

# Add extracted fields to dataset
packets["sid"] = xr.DataArray(sid, dims=["epoch"])
packets["spin"] = xr.DataArray(spin, dims=["epoch"])
packets["abortflag"] = xr.DataArray(abortflag, dims=["epoch"])
packets["startdelay"] = xr.DataArray(startdelay, dims=["epoch"])
packets["p00"] = xr.DataArray(p00, dims=["epoch"])

return packets


def process_ultra_tof(ds: xr.Dataset, packet_props: PacketProperties) -> xr.Dataset:
"""
Unpack and decode Ultra TOF packets.

The TOF packets contain image data that may be split across multiple segmented
packets. This function combines the segmented packets and decompresses the image
data.

Parameters
----------
ds : xarray.Dataset
Expand All @@ -54,6 +112,11 @@ def process_ultra_tof(ds: xr.Dataset, packet_props: PacketProperties) -> xr.Data
dataset : xarray.Dataset
Dataset containing the decoded and decompressed data.
"""
# Combine segmented packets
ds = combine_segmented_packets(ds, binary_field_name="packetdata")
# Extract the header keys from each of the combined packetdata fields.
ds = extract_initial_items_from_combined_packets(ds)

scalar_keys = [key for key in ds.data_vars if key not in ("packetdata", "sid")]

image_planes = packet_props.image_planes
Expand Down
Loading