From 0e01cdafd689417aa245d4779bfe4749a7211d64 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 21 Jan 2026 10:26:32 +0800 Subject: [PATCH 01/13] fix --- .../scan/TsFileInsertionEventScanParser.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 8590fe2002e1..6fad3440a34e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -604,6 +604,23 @@ private void moveToNextChunkReader() } } + if (Objects.nonNull(entity)) { + final TSStatus status = + AuthorityChecker.getAccessControl() + .checkSeriesPrivilege4Pipe( + entity, + Collections.singletonList( + new MeasurementPath(currentDevice, chunkHeader.getMeasurementID())), + PrivilegeType.READ_DATA); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + if (skipIfNoPrivileges) { + tsFileSequenceReader.position(nextMarkerOffset); + break; + } + throw new AccessDeniedException(status.getMessage()); + } + } + // Increase value index final int valueIndex = measurementIndexMap.compute( From 2ebb6005d00915c1c31c60a35c4ba1e7e7c33ce4 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 21 Jan 2026 10:45:47 +0800 Subject: [PATCH 02/13] it --- .../treemodel/manual/IoTDBPipePermissionIT.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java index 7f35bb1743ff..4371dc3095a0 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java @@ -323,6 +323,13 @@ public void testSourcePermission() { "create database root.db", "create timeSeries root.db.device.measurement int32"), null); + // Write some aligned historical data + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "create aligned timeSeries root.vehicle.plane(temperature DOUBLE, pressure INT32)", + "insert into root.vehicle.plane(temperature, pressure) values (36.5, 1103)")); + // Transfer snapshot try (final Connection connection = senderEnv.getConnection(); final Statement statement = connection.createStatement()) { @@ -384,6 +391,13 @@ public void testSourcePermission() { "count(root.vehicle.car.pressure),", Collections.singleton("0,")); + // Exception, skip + TestUtils.assertDataAlwaysOnEnv( + receiverEnv, + "select count(temperature) from root.vehicle.plane", + "count(root.vehicle.car.pressure),", + Collections.singleton("0,")); + // Alter pipe, throw exception if no privileges try (final Connection connection = senderEnv.getConnection(); final Statement statement = connection.createStatement()) { From 76434bf9331995805e5d315120f9ceb4ee442676 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 21 Jan 2026 11:34:07 +0800 Subject: [PATCH 03/13] fix --- .../scan/TsFileInsertionEventScanParser.java | 150 +++++++----------- 1 file changed, 60 insertions(+), 90 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 6fad3440a34e..c510119bf9ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -485,14 +485,9 @@ private void moveToNextChunkReader() // Notice that the data in one chunk group is either aligned or non-aligned // There is no need to consider non-aligned chunks when there are value chunks currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER; - long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; chunkHeader = tsFileSequenceReader.readChunkHeader(marker); - final long nextMarkerOffset = - tsFileSequenceReader.position() + chunkHeader.getDataSize(); - - if (Objects.isNull(currentDevice)) { - tsFileSequenceReader.position(nextMarkerOffset); + if (filterChunk(chunkHeader, false)) { break; } @@ -505,47 +500,6 @@ private void moveToNextChunkReader() break; } - if (!treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { - tsFileSequenceReader.position(nextMarkerOffset); - break; - } - - // Skip the chunk if it is fully deleted by mods - if (!currentModifications.isEmpty()) { - final Statistics statistics = - findNonAlignedChunkStatistics( - tsFileSequenceReader.getIChunkMetadataList( - currentDevice, chunkHeader.getMeasurementID()), - currentChunkHeaderOffset); - if (statistics != null - && ModsOperationUtil.isAllDeletedByMods( - currentDevice, - chunkHeader.getMeasurementID(), - statistics.getStartTime(), - statistics.getEndTime(), - currentModifications)) { - tsFileSequenceReader.position(nextMarkerOffset); - break; - } - } - - if (Objects.nonNull(entity)) { - final TSStatus status = - AuthorityChecker.getAccessControl() - .checkSeriesPrivilege4Pipe( - entity, - Collections.singletonList( - new MeasurementPath(currentDevice, chunkHeader.getMeasurementID())), - PrivilegeType.READ_DATA); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - if (skipIfNoPrivileges) { - tsFileSequenceReader.position(nextMarkerOffset); - break; - } - throw new AccessDeniedException(status.getMessage()); - } - } - if (chunkHeader.getDataSize() > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { PipeDataNodeResourceManager.memory() .forceResize(allocatedMemoryBlockForChunk, chunkHeader.getDataSize()); @@ -573,54 +527,12 @@ private void moveToNextChunkReader() case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: { if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) { - long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; chunkHeader = tsFileSequenceReader.readChunkHeader(marker); - final long nextMarkerOffset = - tsFileSequenceReader.position() + chunkHeader.getDataSize(); - if (Objects.isNull(currentDevice) - || !treePattern.matchesMeasurement( - currentDevice, chunkHeader.getMeasurementID())) { - tsFileSequenceReader.position(nextMarkerOffset); + if (filterChunk(chunkHeader, true)) { break; } - if (!currentModifications.isEmpty()) { - // Skip the chunk if it is fully deleted by mods - final Statistics statistics = - findAlignedChunkStatistics( - tsFileSequenceReader.getIChunkMetadataList( - currentDevice, chunkHeader.getMeasurementID()), - currentChunkHeaderOffset); - if (statistics != null - && ModsOperationUtil.isAllDeletedByMods( - currentDevice, - chunkHeader.getMeasurementID(), - statistics.getStartTime(), - statistics.getEndTime(), - currentModifications)) { - tsFileSequenceReader.position(nextMarkerOffset); - break; - } - } - - if (Objects.nonNull(entity)) { - final TSStatus status = - AuthorityChecker.getAccessControl() - .checkSeriesPrivilege4Pipe( - entity, - Collections.singletonList( - new MeasurementPath(currentDevice, chunkHeader.getMeasurementID())), - PrivilegeType.READ_DATA); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - if (skipIfNoPrivileges) { - tsFileSequenceReader.position(nextMarkerOffset); - break; - } - throw new AccessDeniedException(status.getMessage()); - } - } - // Increase value index final int valueIndex = measurementIndexMap.compute( @@ -711,6 +623,64 @@ private void moveToNextChunkReader() } } + private boolean filterChunk(final ChunkHeader chunkHeader, final boolean isAligned) + throws IOException, IllegalPathException { + long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; + final long nextMarkerOffset = tsFileSequenceReader.position() + chunkHeader.getDataSize(); + + if (Objects.isNull(currentDevice)) { + tsFileSequenceReader.position(nextMarkerOffset); + return true; + } + + if (!treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { + tsFileSequenceReader.position(nextMarkerOffset); + return true; + } + + // Skip the chunk if it is fully deleted by mods + if (!currentModifications.isEmpty()) { + final Statistics statistics = + isAligned + ? findAlignedChunkStatistics( + tsFileSequenceReader.getIChunkMetadataList( + currentDevice, chunkHeader.getMeasurementID()), + currentChunkHeaderOffset) + : findNonAlignedChunkStatistics( + tsFileSequenceReader.getIChunkMetadataList( + currentDevice, chunkHeader.getMeasurementID()), + currentChunkHeaderOffset); + if (statistics != null + && ModsOperationUtil.isAllDeletedByMods( + currentDevice, + chunkHeader.getMeasurementID(), + statistics.getStartTime(), + statistics.getEndTime(), + currentModifications)) { + tsFileSequenceReader.position(nextMarkerOffset); + return true; + } + } + + if (Objects.nonNull(entity)) { + final TSStatus status = + AuthorityChecker.getAccessControl() + .checkSeriesPrivilege4Pipe( + entity, + Collections.singletonList( + new MeasurementPath(currentDevice, chunkHeader.getMeasurementID())), + PrivilegeType.READ_DATA); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + if (skipIfNoPrivileges) { + tsFileSequenceReader.position(nextMarkerOffset); + return true; + } + throw new AccessDeniedException(status.getMessage()); + } + } + return false; + } + private boolean recordAlignedChunk(final List valueChunkList, final byte marker) throws IOException { if (!valueChunkList.isEmpty()) { From 66aeb41c9ea06a1c6139a2955dcea85088223814 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 21 Jan 2026 15:29:46 +0800 Subject: [PATCH 04/13] swap-sequence --- .../parser/scan/TsFileInsertionEventScanParser.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index c510119bf9ce..d14eff835185 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -487,10 +487,6 @@ private void moveToNextChunkReader() currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER; chunkHeader = tsFileSequenceReader.readChunkHeader(marker); - if (filterChunk(chunkHeader, false)) { - break; - } - if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) == TsFileConstant.TIME_COLUMN_MASK) { timeChunkList.add( @@ -500,6 +496,10 @@ private void moveToNextChunkReader() break; } + if (filterChunk(chunkHeader, false)) { + break; + } + if (chunkHeader.getDataSize() > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { PipeDataNodeResourceManager.memory() .forceResize(allocatedMemoryBlockForChunk, chunkHeader.getDataSize()); From dc3b57d7e1a6a9183531fa24edbbab60083df0e0 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 21 Jan 2026 17:57:31 +0800 Subject: [PATCH 05/13] fix --- .../pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java index 4371dc3095a0..7ff4ff75efef 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java @@ -395,7 +395,7 @@ public void testSourcePermission() { TestUtils.assertDataAlwaysOnEnv( receiverEnv, "select count(temperature) from root.vehicle.plane", - "count(root.vehicle.car.pressure),", + "count(root.vehicle.plane.temperature),", Collections.singleton("0,")); // Alter pipe, throw exception if no privileges From 88d769493fa2983c20487f67a103bbd222843d02 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 21 Jan 2026 19:04:20 +0800 Subject: [PATCH 06/13] fix --- .../scan/TsFileInsertionEventScanParser.java | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index d14eff835185..1a30688a14a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -487,16 +487,7 @@ private void moveToNextChunkReader() currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER; chunkHeader = tsFileSequenceReader.readChunkHeader(marker); - if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) - == TsFileConstant.TIME_COLUMN_MASK) { - timeChunkList.add( - new Chunk( - chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize()))); - isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER); - break; - } - - if (filterChunk(chunkHeader, false)) { + if (filterChunk(chunkHeader, false, marker)) { break; } @@ -529,7 +520,7 @@ private void moveToNextChunkReader() if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) { chunkHeader = tsFileSequenceReader.readChunkHeader(marker); - if (filterChunk(chunkHeader, true)) { + if (filterChunk(chunkHeader, true, marker)) { break; } @@ -623,7 +614,7 @@ private void moveToNextChunkReader() } } - private boolean filterChunk(final ChunkHeader chunkHeader, final boolean isAligned) + private boolean filterChunk(final ChunkHeader chunkHeader, final boolean isAligned, final byte marker) throws IOException, IllegalPathException { long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; final long nextMarkerOffset = tsFileSequenceReader.position() + chunkHeader.getDataSize(); @@ -633,6 +624,17 @@ private boolean filterChunk(final ChunkHeader chunkHeader, final boolean isAlign return true; } + if (!isAligned) { + if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) + == TsFileConstant.TIME_COLUMN_MASK) { + timeChunkList.add( + new Chunk( + chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize()))); + isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER); + return true; + } + } + if (!treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { tsFileSequenceReader.position(nextMarkerOffset); return true; From f53465d6d3fd73a236cbd1b350b6b0dab6757f04 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 21 Jan 2026 19:21:09 +0800 Subject: [PATCH 07/13] spoltess --- .../parser/scan/TsFileInsertionEventScanParser.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 1a30688a14a1..a565cf9db712 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -614,7 +614,8 @@ private void moveToNextChunkReader() } } - private boolean filterChunk(final ChunkHeader chunkHeader, final boolean isAligned, final byte marker) + private boolean filterChunk( + final ChunkHeader chunkHeader, final boolean isAligned, final byte marker) throws IOException, IllegalPathException { long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; final long nextMarkerOffset = tsFileSequenceReader.position() + chunkHeader.getDataSize(); @@ -626,10 +627,9 @@ private boolean filterChunk(final ChunkHeader chunkHeader, final boolean isAlign if (!isAligned) { if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) - == TsFileConstant.TIME_COLUMN_MASK) { + == TsFileConstant.TIME_COLUMN_MASK) { timeChunkList.add( - new Chunk( - chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize()))); + new Chunk(chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize()))); isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER); return true; } From 590403258a6b6681280fdca15134493be809cc6d Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 21 Jan 2026 19:26:56 +0800 Subject: [PATCH 08/13] stupid-bug --- .../parser/scan/TsFileInsertionEventScanParser.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index a565cf9db712..5f9c0fc16f8b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -485,9 +485,10 @@ private void moveToNextChunkReader() // Notice that the data in one chunk group is either aligned or non-aligned // There is no need to consider non-aligned chunks when there are value chunks currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER; + final long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; chunkHeader = tsFileSequenceReader.readChunkHeader(marker); - if (filterChunk(chunkHeader, false, marker)) { + if (filterChunk(currentChunkHeaderOffset, chunkHeader, false, marker)) { break; } @@ -518,9 +519,10 @@ private void moveToNextChunkReader() case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: { if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) { + final long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; chunkHeader = tsFileSequenceReader.readChunkHeader(marker); - if (filterChunk(chunkHeader, true, marker)) { + if (filterChunk(currentChunkHeaderOffset, chunkHeader, true, marker)) { break; } @@ -615,9 +617,11 @@ private void moveToNextChunkReader() } private boolean filterChunk( - final ChunkHeader chunkHeader, final boolean isAligned, final byte marker) + final long currentChunkHeaderOffset, + final ChunkHeader chunkHeader, + final boolean isAligned, + final byte marker) throws IOException, IllegalPathException { - long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; final long nextMarkerOffset = tsFileSequenceReader.position() + chunkHeader.getDataSize(); if (Objects.isNull(currentDevice)) { From 3b2438b987bce3da04f9843aa0f7dc5575d961da Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 22 Jan 2026 10:25:05 +0800 Subject: [PATCH 09/13] jq --- .../tsfile/parser/scan/TsFileInsertionEventScanParser.java | 6 +++--- .../iotdb/db/pipe/event/TsFileInsertionEventParserTest.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 5f9c0fc16f8b..5c9985fe748b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -619,7 +619,7 @@ private void moveToNextChunkReader() private boolean filterChunk( final long currentChunkHeaderOffset, final ChunkHeader chunkHeader, - final boolean isAligned, + final boolean isAlignedValueChunk, final byte marker) throws IOException, IllegalPathException { final long nextMarkerOffset = tsFileSequenceReader.position() + chunkHeader.getDataSize(); @@ -629,7 +629,7 @@ private boolean filterChunk( return true; } - if (!isAligned) { + if (!isAlignedValueChunk) { if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) == TsFileConstant.TIME_COLUMN_MASK) { timeChunkList.add( @@ -647,7 +647,7 @@ private boolean filterChunk( // Skip the chunk if it is fully deleted by mods if (!currentModifications.isEmpty()) { final Statistics statistics = - isAligned + isAlignedValueChunk ? findAlignedChunkStatistics( tsFileSequenceReader.getIChunkMetadataList( currentDevice, chunkHeader.getMeasurementID()), diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index ae45860b7479..8814190755e2 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -94,14 +94,14 @@ public void tearDown() throws Exception { } @Test - public void testQueryContainer() throws Exception { + public void testQueryParser() throws Exception { final long startTime = System.currentTimeMillis(); testToTabletInsertionEvents(true); System.out.println(System.currentTimeMillis() - startTime); } @Test - public void testScanContainer() throws Exception { + public void testScanParser() throws Exception { final long startTime = System.currentTimeMillis(); testToTabletInsertionEvents(false); System.out.println(System.currentTimeMillis() - startTime); From d0a0752e99623dff0802d98d6a145cb0838d8c79 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 22 Jan 2026 11:12:52 +0800 Subject: [PATCH 10/13] slightly --- .../main/java/org/apache/iotdb/db/auth/AuthorityChecker.java | 2 ++ .../iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java | 3 +++ 2 files changed, 5 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java index 8c70d49874c2..81db578ace16 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java @@ -31,6 +31,7 @@ import org.apache.iotdb.commons.schema.column.ColumnHeader; import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp; import org.apache.iotdb.confignode.rpc.thrift.TDBPrivilege; import org.apache.iotdb.confignode.rpc.thrift.TListUserInfo; @@ -114,6 +115,7 @@ public static AccessControl getAccessControl() { return accessControl; } + @TestOnly public static void setAccessControl(AccessControl accessControl) { AuthorityChecker.accessControl = accessControl; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java index 46401beb4850..5ba0843bf800 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java @@ -42,6 +42,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; import org.apache.iotdb.db.utils.constant.TestConstant; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.enums.CompressionType; @@ -144,6 +145,8 @@ public void testAuthCheck() throws Exception { treeEvent.throwIfNoPrivilege(); Assert.assertTrue(treeEvent.shouldParse4Privilege()); + Assert.assertThrows(PipeException.class, treeEvent::toTabletInsertionEvents); + treeEvent.setTreeSchemaMap(Collections.singletonMap(deviceID, new String[] {"s0", "s1"})); Assert.assertThrows(AccessDeniedException.class, treeEvent::throwIfNoPrivilege); From 9d374645f1dc33ebe27d975faf0712e8568ce3a3 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 22 Jan 2026 14:38:52 +0800 Subject: [PATCH 11/13] fix --- .../PipeInsertNodeTabletInsertionEvent.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 19881c955f06..b576ac507b87 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -287,9 +287,6 @@ public boolean isGeneratedByPipe() { @Override public void throwIfNoPrivilege() throws Exception { - if (skipIfNoPrivileges) { - return; - } final InsertNode node = insertNode; if (Objects.isNull(node)) { // Event is released, skip privilege check @@ -320,10 +317,14 @@ private void checkTableName(final String tableName) { userName, new QualifiedObjectName(getTableModelDatabaseName(), tableName), new UserEntity(Long.parseLong(userId), userName, cliHostname))) { - throw new AccessDeniedException( - String.format( - "No privilege for SELECT for user %s at table %s.%s", - userName, tableModelDatabaseName, tableName)); + if (skipIfNoPrivileges) { + shouldParse4Privilege = true; + } else { + throw new AccessDeniedException( + String.format( + "No privilege for SELECT for user %s at table %s.%s", + userName, tableModelDatabaseName, tableName)); + } } } From dde367f1bcae3eb8715049659c8471a7c8d1db03 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 22 Jan 2026 15:02:05 +0800 Subject: [PATCH 12/13] comment --- .../pipe/event/common/deletion/PipeDeleteDataNodeEvent.java | 1 + .../org/apache/iotdb/commons/pipe/event/EnrichedEvent.java | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java index 46f07bd40fee..d0f720c2743f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java @@ -155,6 +155,7 @@ public boolean isGeneratedByPipe() { @Override public void throwIfNoPrivilege() { + // The privilege will be parsed at PipeEventCollector if (skipIfNoPrivileges || !(deleteDataNode instanceof RelationalDeleteDataNode)) { return; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java index afe0ebe22df5..792aa259ad7a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java @@ -423,6 +423,10 @@ public boolean needToCommit() { return true; } + // If user has privilege: Do nothing + // If user doesn't have privilege, and skip if == true: set shouldParse4Privilege = true + // (The DeleteDataEvent will be parsed regardless of the flag) + // If user doesn't have privilege, and skip if == false: throw exception public void throwIfNoPrivilege() throws Exception { // Do nothing by default } From e19c3f09c0ba7b82afa97dd7025b07256e2a874c Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 22 Jan 2026 15:03:23 +0800 Subject: [PATCH 13/13] fix --- .../org/apache/iotdb/commons/pipe/event/EnrichedEvent.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java index 792aa259ad7a..a55312471b30 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java @@ -425,7 +425,8 @@ public boolean needToCommit() { // If user has privilege: Do nothing // If user doesn't have privilege, and skip if == true: set shouldParse4Privilege = true - // (The DeleteDataEvent will be parsed regardless of the flag) + // (The DeleteDataEvent will be parsed regardless of the flag, while insert node and tsFile will + // be parsed iff this flag == true) // If user doesn't have privilege, and skip if == false: throw exception public void throwIfNoPrivilege() throws Exception { // Do nothing by default