diff --git a/modules/rntuple.mjs b/modules/rntuple.mjs
index fe29976ec..7ebac27ed 100644
--- a/modules/rntuple.mjs
+++ b/modules/rntuple.mjs
@@ -114,6 +114,7 @@ class RBufferReader {
 }
 
 
+
 const ENTupleColumnType = {
    kBit: 0x00,
    kByte: 0x01,
@@ -962,143 +963,440 @@ function getSelectorFieldName(selector, i) {
    return isStr(br) ? br : br?.fieldName;
 }
 
-// Read and process the next data cluster from the RNTuple
-async function readNextCluster(rntuple, selector) {
-   const builder = rntuple.builder;
+class ReaderItem {
+
+   constructor(column, name) {
+      this.column = column;
+      this.id = column.index;
+      this.coltype = column.coltype;
+      this.splittype = 0;
+      this.page = -1; // current page for the reading
+      this.name = name;
+      this.sz = 0;
+
+      // special handling of split types
+      if ((this.coltype >= ENTupleColumnType.kSplitInt16) && (this.coltype <= ENTupleColumnType.kSplitIndex64)) {
+         this.splittype = this.coltype;
+         this.coltype -= (ENTupleColumnType.kSplitInt16 - ENTupleColumnType.kInt16);
+      }
+   }
 
-   // Add validation
-   if (!builder.clusterSummaries)
-      throw new Error('No cluster summaries available - possibly incomplete file reading');
+   cleanup() {
+      this.views = null;
+      this.view = null;
+      this.view_len = 0;
+   }
 
-   const clusterIndex = selector.currentCluster,
-         clusterSummary = builder.clusterSummaries[clusterIndex],
-         // Gather all pages for this cluster from selected fields only
-         pages = [],
-         // Collect only selected field names from selector
-         selectedFields = [];
+   init_o() {
+      this.o = 0;
+      this.o2 = 0; // for bit count
+      this.view = this.views.shift();
+      this.view_len = this.view.byteLength;
+   }
 
-   if (!clusterSummary) {
-      selector.Terminate(clusterIndex > 0);
-      return false;
+   shift_o(sz) {
+      this.o += sz;
+      while ((this.o >= this.view_len) && this.view_len) {
+         this.o -= this.view_len;
+         if (this.views.length) {
+            this.view = this.views.shift();
+            this.view_len = this.view.byteLength;
+         } else {
+            this.view = null;
+            this.view_len = 0;
+         }
+      }
+   }
+
+   shift(entries) {
+      if (this.sz)
+         this.shift_o(this.sz * entries);
+      else {
+         while (entries-- > 0)
+            this.func({});
+      }
+   }
+
+   /** @summary Simple column with fixed element size */
+   is_simple() { return this.sz > 0; }
+
+   assignReadFunc() {
+      switch (this.coltype) {
+         case ENTupleColumnType.kBit: {
+            this.func = function(obj) {
+               if (this.o2 === 0)
+                  this.byte = this.view.getUint8(this.o);
+               obj[this.name] = ((this.byte >>> this.o2++) & 1) === 1;
+               if (this.o2 === 8) {
+                  this.o2 = 0;
+                  this.shift_o(1);
+               }
+            };
+            break;
+         }
+         case ENTupleColumnType.kReal64:
+            this.func = function(obj) {
+               obj[this.name] = this.view.getFloat64(this.o, LITTLE_ENDIAN);
+               this.shift_o(8);
+            };
+            this.sz = 8;
+            break;
+         case ENTupleColumnType.kReal32:
+            this.func = function(obj) {
+               obj[this.name] = this.view.getFloat32(this.o, LITTLE_ENDIAN);
+               this.shift_o(4);
+            };
+            this.sz = 4;
+            break;
+         case ENTupleColumnType.kInt64:
+         case ENTupleColumnType.kIndex64:
+            this.func = function(obj) {
+               // FIXME: let process BigInt in the TTree::Draw
+               obj[this.name] = Number(this.view.getBigInt64(this.o, LITTLE_ENDIAN));
+               this.shift_o(8);
+            };
+            this.sz = 8;
+            break;
+         case ENTupleColumnType.kUInt64:
+            this.func = function(obj) {
+               // FIXME: let process BigInt in the TTree::Draw
+               obj[this.name] = Number(this.view.getBigUint64(this.o, LITTLE_ENDIAN));
+               this.shift_o(8);
+            };
+            this.sz = 8;
+            break;
+         case ENTupleColumnType.kInt32:
+         case ENTupleColumnType.kIndex32:
+            this.func = function(obj) {
+               obj[this.name] = this.view.getInt32(this.o, LITTLE_ENDIAN);
+               this.shift_o(4);
+            };
+            this.sz = 4;
+            break;
+         case ENTupleColumnType.kUInt32:
+            this.func = function(obj) {
+               obj[this.name] = this.view.getUint32(this.o, LITTLE_ENDIAN);
+               this.shift_o(4);
+            };
+            this.sz = 4;
+            break;
+         case ENTupleColumnType.kInt16:
+            this.func = function(obj) {
+               obj[this.name] = this.view.getInt16(this.o, LITTLE_ENDIAN);
+               this.shift_o(2);
+            };
+            this.sz = 2;
+            break;
+         case ENTupleColumnType.kUInt16:
+            this.func = function(obj) {
+               obj[this.name] = this.view.getUint16(this.o, LITTLE_ENDIAN);
+               this.shift_o(2);
+            };
+            this.sz = 2;
+            break;
+         case ENTupleColumnType.kInt8:
+            this.func = function(obj) {
+               obj[this.name] = this.view.getInt8(this.o);
+               this.shift_o(1);
+            };
+            this.sz = 1;
+            break;
+         case ENTupleColumnType.kUInt8:
+         case ENTupleColumnType.kByte:
+            this.func = function(obj) {
+               obj[this.name] = this.view.getUint8(this.o);
+               this.shift_o(1);
+            };
+            this.sz = 1;
+            break;
+         case ENTupleColumnType.kChar:
+            this.func = function(obj) {
+               obj[this.name] = String.fromCharCode(this.view.getInt8(this.o));
+               this.shift_o(1);
+            };
+            this.sz = 1;
+            break;
+         default:
+            throw new Error(`Unsupported column type: ${this.coltype}`);
+      }
+   }
+
+   /** @summary identify if this item is used as offset for std::string or similar */
+   is_offset_item() { return this.item1; }
+
+   assignStringReader(item1) {
+      this.item1 = item1;
+      this.off0 = 0;
+      this.$tgt = {};
+
+      item1.func0 = item1.func;
+      item1.shift0 = item1.shift;
+
+      // assign noop
+      item1.func = item1.shift = () => {};
+
+      this.func = function(tgtobj) {
+         this.item1.func0(this.$tgt);
+         const off = Number(this.$tgt[this.name]);
+         let len = off - this.off0, s = '';
+         while (len-- > 0) {
+            s += String.fromCharCode(this.view.getInt8(this.o));
+            this.shift_o(1);
+         }
+         tgtobj[this.name] = s;
+         this.off0 = off;
+      };
+
+      this.shift = function(entries) {
+         if (entries > 0) {
+            this.item1.shift0(entries);
+            this.item1.func0(this.$tgt);
+            this.off0 = Number(this.$tgt[this.name]);
+            this.shift_o(this.off0);
+         }
+      };
    }
 
-   for (let i = 0; i < selector.numBranches(); ++i)
-      selectedFields.push(getSelectorFieldName(selector, i));
+   collectPages(cluster_locations, dataToRead, itemsToRead, pagesToRead, emin, emax, elist) {
+      const pages = cluster_locations[this.id].pages;
 
-   // For each selected field, collect its columns' pages
-   for (const fieldName of selectedFields) {
-      const columns = rntuple.fieldToColumns[fieldName];
-      if (!columns)
-         throw new Error(`Selected field '${fieldName}' not found in RNTuple`);
+      this.views = new Array(pages.length);
 
-      for (const colDesc of columns) {
-         const colEntry = builder.pageLocations[clusterIndex]?.[colDesc.index];
+      let e0 = 0;
+      for (let p = 0; p < pages.length; ++p) {
+         const page = pages[p],
+               e1 = e0 + Number(page.numElements),
+               margin = this.is_offset_item() ? 1 : 0; // offset for previous entry has to be read as well
 
-         // When the data is missing or broken
-         if (!colEntry || !colEntry.pages)
-            throw new Error(`No pages for column ${colDesc.index} in cluster ${clusterIndex}`);
+         let is_entries_inside = false;
+         if (elist?.length)
+            elist.forEach(e => { is_entries_inside ||= (e >= e0) && (e - margin < e1); });
+         else
+            is_entries_inside = ((e0 >= emin - margin) && (e0 < emax)) || ((e1 > emin - margin) && (e1 <= emax));
 
-         for (const page of colEntry.pages)
-            pages.push({ page, colDesc, fieldName });
+         if (!this.is_simple() || is_entries_inside) {
+            itemsToRead.push(this);
+            dataToRead.push(Number(page.locator.offset), page.locator.size);
+            pagesToRead.push(p);
+            this.views[p] = null; // placeholder, filled after request
+         } else
+            this.views[p] = { byteLength: this.sz * Number(page.numElements) }; // dummy entry only to allow proper navigation
+
+         e0 = e1;
+      }
+   }
 
-   selector.currentCluster++;
+   async unzipBlob(blob, cluster_locations, page_indx) {
+      const colEntry = cluster_locations[this.id], // Access column entry
+            numElements = Number(colEntry.pages[page_indx].numElements),
+            elementSize = this.column.bitsOnStorage / 8;
+
+      let expectedSize = numElements * elementSize;
+      // Special handling for boolean fields
+      if (this.coltype === ENTupleColumnType.kBit)
+         expectedSize = Math.ceil(numElements / 8);
+
+      // Check if data is compressed
+      if ((colEntry.compression === 0) || (blob.byteLength === expectedSize))
+         return blob; // Uncompressed: use blob directly
+
+      // Try decompression
+      return R__unzip(blob, expectedSize).then(result => {
+         return result || blob; // Fallback to original blob ??
+      }).catch(err => {
+         throw new Error(`Failed to unzip page ${page_indx} for column ${this.id}: ${err.message}`);
+      });
+   }
+
+   reconstructBlob(rawblob, page_indx) {
+      if (!(rawblob instanceof DataView))
+         throw new Error(`Invalid blob type for column ${this.id}: ${Object.prototype.toString.call(rawblob)}`);
 
-   // Early exit if no pages to read (i.e., no selected fields matched)
-   if (pages.length === 0) {
-      selector.Terminate(false);
-      return false;
+      const originalColtype = this.column.coltype,
+            data = recontructUnsplitBuffer(rawblob, this.column);
+
+      let view;
+
+      // Handle split index types
+      if (originalColtype === ENTupleColumnType.kSplitIndex32 || originalColtype === ENTupleColumnType.kSplitIndex64)
+         view = new DataView(DecodeDeltaIndex(data.blob, data.coltype).blob.buffer);
+      // Handle Split Signed Int types
+      else if (originalColtype === ENTupleColumnType.kSplitInt16 || originalColtype === ENTupleColumnType.kSplitInt32 || originalColtype === ENTupleColumnType.kSplitInt64)
+         view = new DataView(decodeZigzag(data.blob, data.coltype).blob.buffer);
+      else if (data.blob instanceof DataView)
+         view = data.blob;
+      else
+         view = new DataView(data.blob);
+
+      this.views[page_indx] = view;
    }
 
-   // Build flat array of [offset, size, offset, size, ...] to read pages
-   const dataToRead = pages.flatMap(p =>
-      [Number(p.page.locator.offset), Number(p.page.locator.size)]
-   );
-
-   return rntuple.$file.readBuffer(dataToRead).then(blobsRaw => {
-      const blobs = Array.isArray(blobsRaw) ? blobsRaw : [blobsRaw],
-            unzipPromises = blobs.map((blob, idx) => {
-               const { page, colDesc } = pages[idx],
-                     colEntry = builder.pageLocations[clusterIndex][colDesc.index], // Access column entry
-                     numElements = Number(page.numElements),
-                     elementSize = colDesc.bitsOnStorage / 8;
-
-               // Check if data is compressed
-               if (colEntry.compression === 0)
-                  return Promise.resolve(blob); // Uncompressed: use blob directly
-               const expectedSize = numElements * elementSize;
-
-               // Special handling for boolean fields
-               if (colDesc.coltype === ENTupleColumnType.kBit) {
-                  const expectedBoolSize = Math.ceil(numElements / 8);
-                  if (blob.byteLength === expectedBoolSize)
-                     return Promise.resolve(blob);
-                  // Try decompression but catch errors for boolean fields
-                  return R__unzip(blob, expectedBoolSize).catch(err => {
-                     throw new Error(`Failed to unzip boolean page ${idx}: ${err.message}`);
-                  });
-               }
+}
 
-               // If the blob is already the expected size, treat as uncompressed
-               if (blob.byteLength === expectedSize)
-                  return Promise.resolve(blob);
-
-               // Try decompression
-               return R__unzip(blob, expectedSize).then(result => {
-                  if (!result)
-                     return blob; // Fallback to original blob
-                  return result;
-               }).catch(err => {
-                  throw new Error(`Failed to unzip page ${idx}: ${err.message}`);
-               });
-            });
+async function rntupleProcess(rntuple, selector, args = {}) {
+   const handle = {
+      rntuple, // keep rntuple reference
+      file: rntuple.$file, // keep file reference
+      selector, // reference on selector
+      arr: [], // list of special handles per columns, more than one column per field may exist
+      current_cluster: 0, // current cluster to process
+      current_cluster_first_entry: 0, // first entry in current cluster
+      current_cluster_last_entry: 0, // last entry in current cluster
+      current_entry: 0, // current processed entry
+      process_arrays: false, // one can process all branches as arrays
+      firstentry: 0, // first entry in the rntuple
+      lastentry: 0 // last entry in the rntuple
+   };
+
+   function readNextPortion(inc_cluster) {
+      if (inc_cluster) {
+         handle.current_cluster++;
+         handle.current_cluster_first_entry = handle.current_cluster_last_entry;
+      }
 
-      return Promise.all(unzipPromises).then(unzipBlobs => {
-         rntuple._clusterData = {}; // store deserialized data per column index
+      const locations = rntuple.builder.pageLocations[handle.current_cluster];
+      if (!locations) {
+         selector.Terminate(true);
+         return selector;
+      }
 
-         for (let i = 0; i < unzipBlobs.length; ++i) {
-            const blob = unzipBlobs[i];
-            // Ensure blob is a DataView
-            if (!(blob instanceof DataView))
-               throw new Error(`Invalid blob type for page ${i}: ${Object.prototype.toString.call(blob)}`);
-            const colDesc = pages[i].colDesc,
-                  values = builder.deserializePage(blob, colDesc, pages[i].page);
+      const numClusterEntries = rntuple.builder.clusterSummaries[handle.current_cluster].numEntries;
 
-            // Support multiple representations (e.g., string fields with offsets + payload)
-            if (!rntuple._clusterData[colDesc.index])
-               rntuple._clusterData[colDesc.index] = [];
+      handle.current_cluster_last_entry = handle.current_cluster_first_entry + numClusterEntries;
 
-            rntuple._clusterData[colDesc.index].push(values);
-         }
+      // calculate entries which can be extracted from the cluster
+      let emin, emax;
+      const dataToRead = [], itemsToRead = [], pagesToRead = [], elist = [];
+
+      if (handle.process_entries) {
+         let i = handle.process_entries_indx;
+         while ((i < handle.process_entries.length) && (handle.process_entries[i] < handle.current_cluster_last_entry))
+            elist.push(handle.process_entries[i++] - handle.current_cluster_first_entry);
+         emin = elist[0];
+         emax = elist[elist.length - 1];
+      } else {
+         emin = handle.current_entry - handle.current_cluster_first_entry;
+         emax = Math.min(numClusterEntries, handle.process_max - handle.current_cluster_first_entry);
+      }
 
-         const numEntries = clusterSummary.numEntries;
-         for (let i = 0; i < numEntries; ++i) {
-            for (let b = 0; b < selector.numBranches(); ++b) {
-               const fieldName = getSelectorFieldName(selector, b),
-                     tgtName = selector.nameOfBranch(b);
+      // loop over all columns and request required pages
+      for (let i = 0; i < handle.arr.length; ++i)
+         handle.arr[i].collectPages(locations, dataToRead, itemsToRead, pagesToRead, emin, emax, elist);
 
-               selector.tgtobj[tgtName] = readEntry(rntuple, fieldName, clusterIndex, i);
+      return rntuple.$file.readBuffer(dataToRead).then(blobsRaw => {
+         const blobs = Array.isArray(blobsRaw) ? blobsRaw : [blobsRaw],
+               unzipPromises = blobs.map((blob, idx) => itemsToRead[idx].unzipBlob(blob, locations, pagesToRead[idx]));
+         return Promise.all(unzipPromises);
+      }).then(unzipBlobs => {
+         unzipBlobs.map((rawblob, idx) => itemsToRead[idx].reconstructBlob(rawblob, pagesToRead[idx]));
+
+         for (let indx = 0; indx < handle.arr.length; ++indx)
+            handle.arr[indx].init_o();
+
+         let skip_entries = handle.current_entry - handle.current_cluster_first_entry;
+
+         while (handle.current_entry < handle.current_cluster_last_entry) {
+            for (let i = 0; i < handle.arr.length; ++i) {
+               if (skip_entries > 0)
+                  handle.arr[i].shift(skip_entries);
+               handle.arr[i].func(selector.tgtobj);
+            }
+            skip_entries = 0;
+
+            selector.Process(handle.current_entry);
+
+            if (handle.process_entries) {
+               if (++handle.process_entries_indx >= handle.process_entries.length) {
+                  selector.Terminate(true);
+                  return selector;
+               }
+               const prev_entry = handle.current_entry;
+               handle.current_entry = handle.process_entries[handle.process_entries_indx];
+               skip_entries = handle.current_entry - prev_entry - 1;
+            } else if (++handle.current_entry >= handle.process_max) {
+               selector.Terminate(true);
+               return selector;
             }
-            selector.Process(selector.currentEntry++);
          }
-         return readNextCluster(rntuple, selector);
+         return readNextPortion(true);
      });
-   });
-}
+   }
+
+   return readHeaderFooter(rntuple).then(res => {
+      if (!res)
+         throw new Error('Not able to read header for the RNtuple');
+
+      for (let i = 0; i < selector.numBranches(); ++i) {
+         const name = getSelectorFieldName(selector, i);
+         if (!name)
+            throw new Error(`Not able to extract name for field ${i}`);
+
+         // TODO: fieldToColumns can be out out
+         const columns = rntuple.fieldToColumns[name];
+         if (!columns)
+            throw new Error(`No columns found for field '${name}' in RNTuple`);
+
+         const tgtname = selector.nameOfBranch(i),
+               item = new ReaderItem(columns[0], tgtname);
+         item.assignReadFunc();
+         handle.arr.push(item);
+
+         if (columns.length === 2) {
+            const item2 = new ReaderItem(columns[1], tgtname);
+            item2.assignStringReader(item);
+            handle.arr.push(item2);
+         }
+      }
+
+      // calculate number of entries
+      rntuple.builder.clusterSummaries.forEach(summary => { handle.lastentry += summary.numEntries; });
 
-// TODO args can later be used to filter fields, limit entries, etc.
-// Create reader and deserialize doubles from the buffer
-function rntupleProcess(rntuple, selector, args) {
-   return readHeaderFooter(rntuple).then(() => {
-      selector.Begin();
-      selector.currentCluster = 0;
-      selector.currentEntry = 0;
-      return readNextCluster(rntuple, selector, args);
+      if (handle.firstentry >= handle.lastentry)
+         throw new Error('Not able to find entries in the RNtuple');
+
+      // select range of entries to process
+      handle.process_min = handle.firstentry;
+      handle.process_max = handle.lastentry;
+
+      if (args.elist) {
+         args.firstentry = args.elist.at(0);
+         args.numentries = args.elist.at(-1) - args.elist.at(0) + 1;
+         handle.process_entries = args.elist;
+         handle.process_entries_indx = 0;
+         handle.process_arrays = false; // do not use arrays process for selected entries
+      }
+
+      if (Number.isInteger(args.firstentry) && (args.firstentry > handle.firstentry) && (args.firstentry < handle.lastentry))
+         handle.process_min = args.firstentry;
+
+      if (Number.isInteger(args.numentries) && (args.numentries > 0))
+         handle.process_max = Math.min(handle.process_max, handle.process_min + args.numentries);
+
+      // first check from which cluster one should start
+      for (let indx = 0, emin = 0; indx < rntuple.builder.clusterSummaries.length; ++indx) {
+         const summary = rntuple.builder.clusterSummaries[indx],
+               emax = emin + summary.numEntries;
+         if ((handle.process_min >= emin) && (handle.process_min < emax)) {
+            handle.current_cluster = indx;
+            handle.current_cluster_first_entry = emin;
+            break;
+         }
+         emin = emax;
+      }
+
+      if (handle.current_cluster < 0)
+         throw new Error(`Not able to find cluster for entry ${handle.process_min} in the RNtuple`);
+
+      handle.current_entry = handle.process_min;
+
+      selector.Begin(rntuple);
+
+      return readNextPortion();
    }).then(() => selector);
 }
+
 
 class TDrawSelectorTuple extends TDrawSelector {
 
    /** @summary Return total number of entries
@@ -1120,30 +1418,6 @@ class TDrawSelectorTuple extends TDrawSelector {
    /** @summary Returns true if field can be used as array */
    isArrayBranch(/* tuple, br */) { return false; }
 
-   /** @summary Begin of processing */
-   Begin(tree) {
-      super.Begin(tree);
-      this._first_entry = true;
-      this._has_bigint = false;
-   }
-
-   /** @summary Process entry */
-   Process(entry) {
-      if (this._first_entry || this._has_bigint) {
-         for (let n = 0; n < this.numBranches(); ++n) {
-            const name = this.nameOfBranch(n);
-            if (typeof this.tgtobj[name] === 'bigint') {
-               this.tgtobj[name] = Number(this.tgtobj[name]);
-               this._has_bigint = true;
-            }
-         }
-      }
-
-      this._first_entry = false;
-
-      super.Process(entry);
-   }
-
 } // class TDrawSelectorTuple
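
Note for reviewers (not part of the patch): reconstructBlob() above delegates the split-column
handling to helpers that already exist in rntuple.mjs (recontructUnsplitBuffer, DecodeDeltaIndex,
decodeZigzag) and are not shown in this diff. Below is a minimal standalone sketch of the
underlying encoding, assuming the documented RNTuple byte-split layout (byte k of every element
stored contiguously) and the standard zigzag mapping; the names are illustrative only and do not
match the module:

   // Regroup a byte-split page: the input holds all first bytes, then all second bytes, ...
   function unsplitBytes(src /* Uint8Array */, elementSize) {
      const n = src.length / elementSize, out = new Uint8Array(src.length);
      for (let i = 0; i < n; ++i) {
         for (let b = 0; b < elementSize; ++b)
            out[i*elementSize + b] = src[b*n + i];
      }
      return out;
   }

   // Standard zigzag decode for values fitting into 32 bit: 0,1,2,3,... -> 0,-1,1,-2,...
   function zigzagDecode(u) { return (u >>> 1) ^ -(u & 1); }

After un-splitting, SplitIndex columns additionally need the delta decoding handled by
DecodeDeltaIndex(), while SplitInt columns only need the zigzag step, which matches the two
branches in reconstructBlob().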