diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java index 7f35bb1743ff0..7ff4ff75efef2 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java @@ -323,6 +323,13 @@ public void testSourcePermission() { "create database root.db", "create timeSeries root.db.device.measurement int32"), null); + // Write some aligned historical data + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "create aligned timeSeries root.vehicle.plane(temperature DOUBLE, pressure INT32)", + "insert into root.vehicle.plane(temperature, pressure) values (36.5, 1103)")); + // Transfer snapshot try (final Connection connection = senderEnv.getConnection(); final Statement statement = connection.createStatement()) { @@ -384,6 +391,13 @@ public void testSourcePermission() { "count(root.vehicle.car.pressure),", Collections.singleton("0,")); + // Exception, skip + TestUtils.assertDataAlwaysOnEnv( + receiverEnv, + "select count(temperature) from root.vehicle.plane", + "count(root.vehicle.plane.temperature),", + Collections.singleton("0,")); + // Alter pipe, throw exception if no privileges try (final Connection connection = senderEnv.getConnection(); final Statement statement = connection.createStatement()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java index 8c70d49874c2b..81db578ace16a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java @@ -31,6 +31,7 @@ import org.apache.iotdb.commons.schema.column.ColumnHeader; import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp; import org.apache.iotdb.confignode.rpc.thrift.TDBPrivilege; import org.apache.iotdb.confignode.rpc.thrift.TListUserInfo; @@ -114,6 +115,7 @@ public static AccessControl getAccessControl() { return accessControl; } + @TestOnly public static void setAccessControl(AccessControl accessControl) { AuthorityChecker.accessControl = accessControl; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java index 46f07bd40feeb..d0f720c2743fa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java @@ -155,6 +155,7 @@ public boolean isGeneratedByPipe() { @Override public void throwIfNoPrivilege() { + // The privilege will be parsed at PipeEventCollector if (skipIfNoPrivileges || !(deleteDataNode instanceof RelationalDeleteDataNode)) { return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 19881c955f062..b576ac507b875 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -287,9 +287,6 @@ public boolean isGeneratedByPipe() { @Override public void throwIfNoPrivilege() throws Exception { - if (skipIfNoPrivileges) { - return; - } final InsertNode node = insertNode; if (Objects.isNull(node)) { // Event is released, skip privilege check @@ -320,10 +317,14 @@ private void checkTableName(final String tableName) { userName, new QualifiedObjectName(getTableModelDatabaseName(), tableName), new UserEntity(Long.parseLong(userId), userName, cliHostname))) { - throw new AccessDeniedException( - String.format( - "No privilege for SELECT for user %s at table %s.%s", - userName, tableModelDatabaseName, tableName)); + if (skipIfNoPrivileges) { + shouldParse4Privilege = true; + } else { + throw new AccessDeniedException( + String.format( + "No privilege for SELECT for user %s at table %s.%s", + userName, tableModelDatabaseName, tableName)); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 8590fe2002e13..5c9985fe748b7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -485,67 +485,13 @@ private void moveToNextChunkReader() // Notice that the data in one chunk group is either aligned or non-aligned // There is no need to consider non-aligned chunks when there are value chunks currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER; - long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; + final long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; chunkHeader = tsFileSequenceReader.readChunkHeader(marker); - final long nextMarkerOffset = - tsFileSequenceReader.position() + chunkHeader.getDataSize(); - - if (Objects.isNull(currentDevice)) { - tsFileSequenceReader.position(nextMarkerOffset); - break; - } - - if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) - == TsFileConstant.TIME_COLUMN_MASK) { - timeChunkList.add( - new Chunk( - chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize()))); - isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER); - break; - } - - if (!treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { - tsFileSequenceReader.position(nextMarkerOffset); + if (filterChunk(currentChunkHeaderOffset, chunkHeader, false, marker)) { break; } - // Skip the chunk if it is fully deleted by mods - if (!currentModifications.isEmpty()) { - final Statistics statistics = - findNonAlignedChunkStatistics( - tsFileSequenceReader.getIChunkMetadataList( - currentDevice, chunkHeader.getMeasurementID()), - currentChunkHeaderOffset); - if (statistics != null - && ModsOperationUtil.isAllDeletedByMods( - currentDevice, - chunkHeader.getMeasurementID(), - statistics.getStartTime(), - statistics.getEndTime(), - currentModifications)) { - tsFileSequenceReader.position(nextMarkerOffset); - break; - } - } - - if (Objects.nonNull(entity)) { - final TSStatus status = - AuthorityChecker.getAccessControl() - .checkSeriesPrivilege4Pipe( - entity, - Collections.singletonList( - new MeasurementPath(currentDevice, chunkHeader.getMeasurementID())), - PrivilegeType.READ_DATA); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - if (skipIfNoPrivileges) { - tsFileSequenceReader.position(nextMarkerOffset); - break; - } - throw new AccessDeniedException(status.getMessage()); - } - } - if (chunkHeader.getDataSize() > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { PipeDataNodeResourceManager.memory() .forceResize(allocatedMemoryBlockForChunk, chunkHeader.getDataSize()); @@ -573,37 +519,13 @@ private void moveToNextChunkReader() case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: { if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) { - long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; + final long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; chunkHeader = tsFileSequenceReader.readChunkHeader(marker); - final long nextMarkerOffset = - tsFileSequenceReader.position() + chunkHeader.getDataSize(); - if (Objects.isNull(currentDevice) - || !treePattern.matchesMeasurement( - currentDevice, chunkHeader.getMeasurementID())) { - tsFileSequenceReader.position(nextMarkerOffset); + if (filterChunk(currentChunkHeaderOffset, chunkHeader, true, marker)) { break; } - if (!currentModifications.isEmpty()) { - // Skip the chunk if it is fully deleted by mods - final Statistics statistics = - findAlignedChunkStatistics( - tsFileSequenceReader.getIChunkMetadataList( - currentDevice, chunkHeader.getMeasurementID()), - currentChunkHeaderOffset); - if (statistics != null - && ModsOperationUtil.isAllDeletedByMods( - currentDevice, - chunkHeader.getMeasurementID(), - statistics.getStartTime(), - statistics.getEndTime(), - currentModifications)) { - tsFileSequenceReader.position(nextMarkerOffset); - break; - } - } - // Increase value index final int valueIndex = measurementIndexMap.compute( @@ -694,6 +616,77 @@ private void moveToNextChunkReader() } } + private boolean filterChunk( + final long currentChunkHeaderOffset, + final ChunkHeader chunkHeader, + final boolean isAlignedValueChunk, + final byte marker) + throws IOException, IllegalPathException { + final long nextMarkerOffset = tsFileSequenceReader.position() + chunkHeader.getDataSize(); + + if (Objects.isNull(currentDevice)) { + tsFileSequenceReader.position(nextMarkerOffset); + return true; + } + + if (!isAlignedValueChunk) { + if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) + == TsFileConstant.TIME_COLUMN_MASK) { + timeChunkList.add( + new Chunk(chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize()))); + isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER); + return true; + } + } + + if (!treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { + tsFileSequenceReader.position(nextMarkerOffset); + return true; + } + + // Skip the chunk if it is fully deleted by mods + if (!currentModifications.isEmpty()) { + final Statistics statistics = + isAlignedValueChunk + ? findAlignedChunkStatistics( + tsFileSequenceReader.getIChunkMetadataList( + currentDevice, chunkHeader.getMeasurementID()), + currentChunkHeaderOffset) + : findNonAlignedChunkStatistics( + tsFileSequenceReader.getIChunkMetadataList( + currentDevice, chunkHeader.getMeasurementID()), + currentChunkHeaderOffset); + if (statistics != null + && ModsOperationUtil.isAllDeletedByMods( + currentDevice, + chunkHeader.getMeasurementID(), + statistics.getStartTime(), + statistics.getEndTime(), + currentModifications)) { + tsFileSequenceReader.position(nextMarkerOffset); + return true; + } + } + + if (Objects.nonNull(entity)) { + final TSStatus status = + AuthorityChecker.getAccessControl() + .checkSeriesPrivilege4Pipe( + entity, + Collections.singletonList( + new MeasurementPath(currentDevice, chunkHeader.getMeasurementID())), + PrivilegeType.READ_DATA); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + if (skipIfNoPrivileges) { + tsFileSequenceReader.position(nextMarkerOffset); + return true; + } + throw new AccessDeniedException(status.getMessage()); + } + } + return false; + } + private boolean recordAlignedChunk(final List valueChunkList, final byte marker) throws IOException { if (!valueChunkList.isEmpty()) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java index 46401beb4850b..5ba0843bf8003 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java @@ -42,6 +42,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; import org.apache.iotdb.db.utils.constant.TestConstant; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.enums.CompressionType; @@ -144,6 +145,8 @@ public void testAuthCheck() throws Exception { treeEvent.throwIfNoPrivilege(); Assert.assertTrue(treeEvent.shouldParse4Privilege()); + Assert.assertThrows(PipeException.class, treeEvent::toTabletInsertionEvents); + treeEvent.setTreeSchemaMap(Collections.singletonMap(deviceID, new String[] {"s0", "s1"})); Assert.assertThrows(AccessDeniedException.class, treeEvent::throwIfNoPrivilege); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index ae45860b7479e..8814190755e27 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -94,14 +94,14 @@ public void tearDown() throws Exception { } @Test - public void testQueryContainer() throws Exception { + public void testQueryParser() throws Exception { final long startTime = System.currentTimeMillis(); testToTabletInsertionEvents(true); System.out.println(System.currentTimeMillis() - startTime); } @Test - public void testScanContainer() throws Exception { + public void testScanParser() throws Exception { final long startTime = System.currentTimeMillis(); testToTabletInsertionEvents(false); System.out.println(System.currentTimeMillis() - startTime); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java index afe0ebe22df5e..a55312471b300 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java @@ -423,6 +423,11 @@ public boolean needToCommit() { return true; } + // If user has privilege: Do nothing + // If user doesn't have privilege, and skip if == true: set shouldParse4Privilege = true + // (The DeleteDataEvent will be parsed regardless of the flag, while insert node and tsFile will + // be parsed iff this flag == true) + // If user doesn't have privilege, and skip if == false: throw exception public void throwIfNoPrivilege() throws Exception { // Do nothing by default }