Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,13 @@ public void testSourcePermission() {
"create database root.db", "create timeSeries root.db.device.measurement int32"),
null);

// Write some aligned historical data
TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"create aligned timeSeries root.vehicle.plane(temperature DOUBLE, pressure INT32)",
"insert into root.vehicle.plane(temperature, pressure) values (36.5, 1103)"));

// Transfer snapshot
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
Expand Down Expand Up @@ -384,6 +391,13 @@ public void testSourcePermission() {
"count(root.vehicle.car.pressure),",
Collections.singleton("0,"));

// Exception, skip
TestUtils.assertDataAlwaysOnEnv(
receiverEnv,
"select count(temperature) from root.vehicle.plane",
"count(root.vehicle.plane.temperature),",
Collections.singleton("0,"));

// Alter pipe, throw exception if no privileges
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iotdb.commons.schema.column.ColumnHeader;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TDBPrivilege;
import org.apache.iotdb.confignode.rpc.thrift.TListUserInfo;
Expand Down Expand Up @@ -114,6 +115,7 @@ public static AccessControl getAccessControl() {
return accessControl;
}

@TestOnly
public static void setAccessControl(AccessControl accessControl) {
AuthorityChecker.accessControl = accessControl;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public boolean isGeneratedByPipe() {

@Override
public void throwIfNoPrivilege() {
// The privilege will be parsed at PipeEventCollector
if (skipIfNoPrivileges || !(deleteDataNode instanceof RelationalDeleteDataNode)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,6 @@ public boolean isGeneratedByPipe() {

@Override
public void throwIfNoPrivilege() throws Exception {
if (skipIfNoPrivileges) {
return;
}
final InsertNode node = insertNode;
if (Objects.isNull(node)) {
// Event is released, skip privilege check
Expand Down Expand Up @@ -320,10 +317,14 @@ private void checkTableName(final String tableName) {
userName,
new QualifiedObjectName(getTableModelDatabaseName(), tableName),
new UserEntity(Long.parseLong(userId), userName, cliHostname))) {
throw new AccessDeniedException(
String.format(
"No privilege for SELECT for user %s at table %s.%s",
userName, tableModelDatabaseName, tableName));
if (skipIfNoPrivileges) {
shouldParse4Privilege = true;
} else {
throw new AccessDeniedException(
String.format(
"No privilege for SELECT for user %s at table %s.%s",
userName, tableModelDatabaseName, tableName));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,67 +485,13 @@ private void moveToNextChunkReader()
// Notice that the data in one chunk group is either aligned or non-aligned
// There is no need to consider non-aligned chunks when there are value chunks
currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER;
long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1;
final long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1;
chunkHeader = tsFileSequenceReader.readChunkHeader(marker);

final long nextMarkerOffset =
tsFileSequenceReader.position() + chunkHeader.getDataSize();

if (Objects.isNull(currentDevice)) {
tsFileSequenceReader.position(nextMarkerOffset);
break;
}

if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
== TsFileConstant.TIME_COLUMN_MASK) {
timeChunkList.add(
new Chunk(
chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())));
isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER);
break;
}

if (!treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) {
tsFileSequenceReader.position(nextMarkerOffset);
if (filterChunk(currentChunkHeaderOffset, chunkHeader, false, marker)) {
break;
}

// Skip the chunk if it is fully deleted by mods
if (!currentModifications.isEmpty()) {
final Statistics statistics =
findNonAlignedChunkStatistics(
tsFileSequenceReader.getIChunkMetadataList(
currentDevice, chunkHeader.getMeasurementID()),
currentChunkHeaderOffset);
if (statistics != null
&& ModsOperationUtil.isAllDeletedByMods(
currentDevice,
chunkHeader.getMeasurementID(),
statistics.getStartTime(),
statistics.getEndTime(),
currentModifications)) {
tsFileSequenceReader.position(nextMarkerOffset);
break;
}
}

if (Objects.nonNull(entity)) {
final TSStatus status =
AuthorityChecker.getAccessControl()
.checkSeriesPrivilege4Pipe(
entity,
Collections.singletonList(
new MeasurementPath(currentDevice, chunkHeader.getMeasurementID())),
PrivilegeType.READ_DATA);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
if (skipIfNoPrivileges) {
tsFileSequenceReader.position(nextMarkerOffset);
break;
}
throw new AccessDeniedException(status.getMessage());
}
}

if (chunkHeader.getDataSize() > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
PipeDataNodeResourceManager.memory()
.forceResize(allocatedMemoryBlockForChunk, chunkHeader.getDataSize());
Expand Down Expand Up @@ -573,37 +519,13 @@ private void moveToNextChunkReader()
case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
{
if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) {
long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1;
final long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1;
chunkHeader = tsFileSequenceReader.readChunkHeader(marker);

final long nextMarkerOffset =
tsFileSequenceReader.position() + chunkHeader.getDataSize();
if (Objects.isNull(currentDevice)
|| !treePattern.matchesMeasurement(
currentDevice, chunkHeader.getMeasurementID())) {
tsFileSequenceReader.position(nextMarkerOffset);
if (filterChunk(currentChunkHeaderOffset, chunkHeader, true, marker)) {
break;
}

if (!currentModifications.isEmpty()) {
// Skip the chunk if it is fully deleted by mods
final Statistics statistics =
findAlignedChunkStatistics(
tsFileSequenceReader.getIChunkMetadataList(
currentDevice, chunkHeader.getMeasurementID()),
currentChunkHeaderOffset);
if (statistics != null
&& ModsOperationUtil.isAllDeletedByMods(
currentDevice,
chunkHeader.getMeasurementID(),
statistics.getStartTime(),
statistics.getEndTime(),
currentModifications)) {
tsFileSequenceReader.position(nextMarkerOffset);
break;
}
}

// Increase value index
final int valueIndex =
measurementIndexMap.compute(
Expand Down Expand Up @@ -694,6 +616,77 @@ private void moveToNextChunkReader()
}
}

private boolean filterChunk(
final long currentChunkHeaderOffset,
final ChunkHeader chunkHeader,
final boolean isAlignedValueChunk,
final byte marker)
throws IOException, IllegalPathException {
final long nextMarkerOffset = tsFileSequenceReader.position() + chunkHeader.getDataSize();

if (Objects.isNull(currentDevice)) {
tsFileSequenceReader.position(nextMarkerOffset);
return true;
}

if (!isAlignedValueChunk) {
if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
== TsFileConstant.TIME_COLUMN_MASK) {
timeChunkList.add(
new Chunk(chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())));
isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER);
return true;
}
}

if (!treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) {
tsFileSequenceReader.position(nextMarkerOffset);
return true;
}

// Skip the chunk if it is fully deleted by mods
if (!currentModifications.isEmpty()) {
final Statistics statistics =
isAlignedValueChunk
? findAlignedChunkStatistics(
tsFileSequenceReader.getIChunkMetadataList(
currentDevice, chunkHeader.getMeasurementID()),
currentChunkHeaderOffset)
: findNonAlignedChunkStatistics(
tsFileSequenceReader.getIChunkMetadataList(
currentDevice, chunkHeader.getMeasurementID()),
currentChunkHeaderOffset);
if (statistics != null
&& ModsOperationUtil.isAllDeletedByMods(
currentDevice,
chunkHeader.getMeasurementID(),
statistics.getStartTime(),
statistics.getEndTime(),
currentModifications)) {
tsFileSequenceReader.position(nextMarkerOffset);
return true;
}
}

if (Objects.nonNull(entity)) {
final TSStatus status =
AuthorityChecker.getAccessControl()
.checkSeriesPrivilege4Pipe(
entity,
Collections.singletonList(
new MeasurementPath(currentDevice, chunkHeader.getMeasurementID())),
PrivilegeType.READ_DATA);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
if (skipIfNoPrivileges) {
tsFileSequenceReader.position(nextMarkerOffset);
return true;
}
throw new AccessDeniedException(status.getMessage());
}
}
return false;
}

private boolean recordAlignedChunk(final List<Chunk> valueChunkList, final byte marker)
throws IOException {
if (!valueChunkList.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import org.apache.iotdb.db.utils.constant.TestConstant;
import org.apache.iotdb.pipe.api.exception.PipeException;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
Expand Down Expand Up @@ -144,6 +145,8 @@ public void testAuthCheck() throws Exception {
treeEvent.throwIfNoPrivilege();
Assert.assertTrue(treeEvent.shouldParse4Privilege());

Assert.assertThrows(PipeException.class, treeEvent::toTabletInsertionEvents);

treeEvent.setTreeSchemaMap(Collections.singletonMap(deviceID, new String[] {"s0", "s1"}));
Assert.assertThrows(AccessDeniedException.class, treeEvent::throwIfNoPrivilege);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,14 @@ public void tearDown() throws Exception {
}

@Test
public void testQueryContainer() throws Exception {
public void testQueryParser() throws Exception {
final long startTime = System.currentTimeMillis();
testToTabletInsertionEvents(true);
System.out.println(System.currentTimeMillis() - startTime);
}

@Test
public void testScanContainer() throws Exception {
public void testScanParser() throws Exception {
final long startTime = System.currentTimeMillis();
testToTabletInsertionEvents(false);
System.out.println(System.currentTimeMillis() - startTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,11 @@ public boolean needToCommit() {
return true;
}

// If user has privilege: Do nothing
// If user doesn't have privilege, and skip if == true: set shouldParse4Privilege = true
// (The DeleteDataEvent will be parsed regardless of the flag, while insert node and tsFile will
// be parsed iff this flag == true)
// If user doesn't have privilege, and skip if == false: throw exception
public void throwIfNoPrivilege() throws Exception {
// Do nothing by default
}
Expand Down
Loading