diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeForecastIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeForecastIT.java index e2d759d4bcf8c..eb4a981389e4f 100644 --- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeForecastIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeForecastIT.java @@ -133,7 +133,7 @@ public static void forecastTableFunctionErrorTest( "701: The OUTPUT_START_TIME should be greater than the maximum timestamp of target time series. Expected greater than [5759] but found [5759]."); // OUTPUT_LENGTH error - String invalidOutputLengthSQL = + String invalidOutputLengthSQLWithZero = String.format( FORECAST_TABLE_FUNCTION_SQL_TEMPLATE, modelInfo.getModelId(), @@ -144,7 +144,24 @@ public static void forecastTableFunctionErrorTest( 0, 1, "time"); - errorTest(statement, invalidOutputLengthSQL, "701: OUTPUT_LENGTH should be greater than 0"); + errorTest( + statement, invalidOutputLengthSQLWithZero, "701: OUTPUT_LENGTH should be greater than 0"); + + String invalidOutputLengthSQLWithOutOfRange = + String.format( + FORECAST_TABLE_FUNCTION_SQL_TEMPLATE, + modelInfo.getModelId(), + 0, + 5760, + 2880, + 5760, + 2881, + 1, + "time"); + errorTest( + statement, + invalidOutputLengthSQLWithOutOfRange, + "1599: Error occurred while executing forecast:[Attribute output_length expect value between 1 and 2880, got 2881 instead.]"); // OUTPUT_INTERVAL error String invalidOutputIntervalSQL = diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeInstanceManagementIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeInstanceManagementIT.java index 2ae1b860cd230..15ddce11ede11 100644 --- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeInstanceManagementIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeInstanceManagementIT.java @@ -155,5 +155,13 @@ private void failTest(Statement statement) { statement, "UNLOAD MODEL sundial FROM DEVICES \"unknown\"", "1507: Device ID [unknown] is not available. You can use 'SHOW AI_DEVICES' to retrieve the available devices."); + errorTest( + statement, + "LOAD MODEL sundial TO DEVICES \"0,0\"", + "1509: Device ID list contains duplicate entries."); + errorTest( + statement, + "UNLOAD MODEL sundial FROM DEVICES \"0,0\"", + "1510: Device ID list contains duplicate entries."); } } diff --git a/iotdb-core/ainode/iotdb/ainode/core/constant.py b/iotdb-core/ainode/iotdb/ainode/core/constant.py index 68f64a79afc45..b0019722630fa 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/constant.py +++ b/iotdb-core/ainode/iotdb/ainode/core/constant.py @@ -87,6 +87,8 @@ class TSStatusCode(Enum): DROP_BUILTIN_MODEL_ERROR = 1506 DROP_MODEL_ERROR = 1507 UNAVAILABLE_AI_DEVICE_ERROR = 1508 + LOAD_MODEL_ERROR = 1509 + UNLOAD_MODEL_ERROR = 1510 INVALID_URI_ERROR = 1511 INVALID_INFERENCE_CONFIG = 1512 diff --git a/iotdb-core/ainode/iotdb/ainode/core/rpc/handler.py b/iotdb-core/ainode/iotdb/ainode/core/rpc/handler.py index e925c3791b830..e303a2c4c352d 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/rpc/handler.py +++ b/iotdb-core/ainode/iotdb/ainode/core/rpc/handler.py @@ -92,6 +92,11 @@ def loadModel(self, req: TLoadModelReq) -> TSStatus: status = self._ensure_model_is_registered(req.existingModelId) if status.code != TSStatusCode.SUCCESS_STATUS.value: return status + if len(set(req.deviceIdList)) != len(req.deviceIdList): + return TSStatus( + code=TSStatusCode.LOAD_MODEL_ERROR.value, + message="Device ID list contains duplicate entries.", + ) status = self._ensure_device_id_is_available(req.deviceIdList) if status.code != TSStatusCode.SUCCESS_STATUS.value: return status @@ -104,6 +109,11 @@ def unloadModel(self, req: TUnloadModelReq) -> TSStatus: status = self._ensure_model_is_registered(req.modelId) if status.code != TSStatusCode.SUCCESS_STATUS.value: return status + if len(set(req.deviceIdList)) != len(req.deviceIdList): + return TSStatus( + code=TSStatusCode.UNLOAD_MODEL_ERROR.value, + message="Device ID list contains duplicate entries.", + ) status = self._ensure_device_id_is_available(req.deviceIdList) if status.code != TSStatusCode.SUCCESS_STATUS.value: return status diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/ainode/AINodeConnectionException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/ainode/AINodeConnectionException.java new file mode 100644 index 0000000000000..2d13d08c9afba --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/ainode/AINodeConnectionException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.exception.ainode; + +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; + +import static org.apache.iotdb.rpc.TSStatusCode.CAN_NOT_CONNECT_AINODE; + +public class AINodeConnectionException extends IoTDBRuntimeException { + + private static final String message = + "Failed to connect to AINode because [%s], please check the status of your AINode."; + + public AINodeConnectionException(Exception e) { + super(String.format(message, e.toString()), CAN_NOT_CONNECT_AINODE.getStatusCode()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ai/InferenceOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ai/InferenceOperator.java index 29e5580311d0b..8bf9f5ead14b1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ai/InferenceOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ai/InferenceOperator.java @@ -21,6 +21,8 @@ import org.apache.iotdb.ainode.rpc.thrift.TInferenceReq; import org.apache.iotdb.ainode.rpc.thrift.TInferenceResp; +import org.apache.iotdb.commons.client.exception.ClientManagerException; +import org.apache.iotdb.db.exception.ainode.AINodeConnectionException; import org.apache.iotdb.db.exception.runtime.ModelInferenceProcessException; import org.apache.iotdb.db.protocol.client.an.AINodeClient; import org.apache.iotdb.db.protocol.client.an.AINodeClientManager; @@ -33,6 +35,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.thrift.TException; import org.apache.tsfile.block.column.Column; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.read.common.block.TsBlock; @@ -247,8 +250,8 @@ private void submitInferenceTask() { new TInferenceReq( modelInferenceDescriptor.getModelId(), serde.serialize(inputTsBlock)) .setInferenceAttributes(modelInferenceDescriptor.getInferenceAttributes())); - } catch (Exception e) { - throw new ModelInferenceProcessException(e.getMessage()); + } catch (ClientManagerException | TException e) { + throw new AINodeConnectionException(e); } }, modelInferenceExecutor); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index db8e55c16dbb5..fac08cfe0e8df 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -175,6 +175,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.BatchProcessException; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.ainode.AINodeConnectionException; import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException; import org.apache.iotdb.db.exception.sql.SemanticException; @@ -3705,8 +3706,8 @@ public SettableFuture createModel(String modelId, String uri) } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (final TException | ClientManagerException e) { - future.setException(e); + } catch (final ClientManagerException | TException e) { + future.setException(new AINodeConnectionException(e)); } return future; } @@ -3723,7 +3724,7 @@ public SettableFuture dropModel(final String modelId) { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } } catch (final ClientManagerException | TException e) { - future.setException(e); + future.setException(new AINodeConnectionException(e)); } return future; } @@ -3743,8 +3744,8 @@ public SettableFuture showModels(final String modelId) { return future; } ShowModelsTask.buildTsBlock(resp, future); - } catch (final Exception e) { - future.setException(e); + } catch (final ClientManagerException | TException e) { + future.setException(new AINodeConnectionException(e)); } return future; } @@ -3762,8 +3763,8 @@ public SettableFuture showLoadedModels(List deviceIdLi return future; } ShowLoadedModelsTask.buildTsBlock(resp, future); - } catch (final Exception e) { - future.setException(e); + } catch (final ClientManagerException | TException e) { + future.setException(new AINodeConnectionException(e)); } return future; } @@ -3779,8 +3780,8 @@ public SettableFuture showAIDevices() { return future; } ShowAIDevicesTask.buildTsBlock(resp, future); - } catch (final Exception e) { - future.setException(e); + } catch (final ClientManagerException | TException e) { + future.setException(new AINodeConnectionException(e)); } return future; } @@ -3798,8 +3799,8 @@ public SettableFuture loadModel( } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (final Exception e) { - future.setException(e); + } catch (final ClientManagerException | TException e) { + future.setException(new AINodeConnectionException(e)); } return future; } @@ -3817,8 +3818,8 @@ public SettableFuture unloadModel( } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (final Exception e) { - future.setException(e); + } catch (final ClientManagerException | TException e) { + future.setException(new AINodeConnectionException(e)); } return future; } @@ -3850,8 +3851,8 @@ public SettableFuture createTuningTask( } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (final Exception e) { - future.setException(e); + } catch (final ClientManagerException | TException e) { + future.setException(new AINodeConnectionException(e)); } return future; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ClassifyTableFunction.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ClassifyTableFunction.java index 34a1a6b223981..61036302af874 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ClassifyTableFunction.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ClassifyTableFunction.java @@ -22,7 +22,9 @@ import org.apache.iotdb.ainode.rpc.thrift.TForecastReq; import org.apache.iotdb.ainode.rpc.thrift.TForecastResp; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; +import org.apache.iotdb.db.exception.ainode.AINodeConnectionException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.protocol.client.an.AINodeClient; import org.apache.iotdb.db.protocol.client.an.AINodeClientManager; @@ -44,6 +46,7 @@ import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification; import org.apache.iotdb.udf.api.type.Type; +import org.apache.thrift.TException; import org.apache.tsfile.block.column.Column; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.enums.TSDataType; @@ -71,7 +74,6 @@ import static org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex; import static org.apache.iotdb.db.queryengine.plan.relational.utils.ResultColumnAppender.createResultColumnAppender; -import static org.apache.iotdb.rpc.TSStatusCode.CAN_NOT_CONNECT_AINODE; public class ClassifyTableFunction implements TableFunction { @@ -367,8 +369,10 @@ private TsBlock classify() { try (AINodeClient client = CLIENT_MANAGER.borrowClient(AINodeClientManager.AINODE_ID_PLACEHOLDER)) { resp = client.forecast(new TForecastReq(modelId, SERDE.serialize(inputData), outputLength)); - } catch (Exception e) { - throw new IoTDBRuntimeException(e.getMessage(), CAN_NOT_CONNECT_AINODE.getStatusCode()); + } catch (ClientManagerException | TException e) { + throw new AINodeConnectionException(e); + } catch (IOException e) { + throw new RuntimeException(e); } if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java index 579802c542a41..1a0fdd4bb9634 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java @@ -22,7 +22,9 @@ import org.apache.iotdb.ainode.rpc.thrift.TForecastReq; import org.apache.iotdb.ainode.rpc.thrift.TForecastResp; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; +import org.apache.iotdb.db.exception.ainode.AINodeConnectionException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.protocol.client.an.AINodeClient; import org.apache.iotdb.db.protocol.client.an.AINodeClientManager; @@ -43,6 +45,7 @@ import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification; import org.apache.iotdb.udf.api.type.Type; +import org.apache.thrift.TException; import org.apache.tsfile.block.column.Column; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.enums.TSDataType; @@ -72,7 +75,6 @@ import static org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex; import static org.apache.iotdb.db.queryengine.plan.relational.utils.ResultColumnAppender.createResultColumnAppender; -import static org.apache.iotdb.rpc.TSStatusCode.CAN_NOT_CONNECT_AINODE; public class ForecastTableFunction implements TableFunction { @@ -563,8 +565,10 @@ protected TsBlock forecast() { client.forecast( new TForecastReq(modelId, SERDE.serialize(inputData), outputLength) .setOptions(options)); - } catch (Exception e) { - throw new IoTDBRuntimeException(e.getMessage(), CAN_NOT_CONNECT_AINODE.getStatusCode()); + } catch (ClientManagerException | TException e) { + throw new AINodeConnectionException(e); + } catch (IOException e) { + throw new RuntimeException(e); } if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/udf/UDTFForecast.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/udf/UDTFForecast.java index a6794a5896fb8..efcc4c82eaa33 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/udf/UDTFForecast.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/udf/UDTFForecast.java @@ -22,7 +22,9 @@ import org.apache.iotdb.ainode.rpc.thrift.TForecastReq; import org.apache.iotdb.ainode.rpc.thrift.TForecastResp; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; +import org.apache.iotdb.db.exception.ainode.AINodeConnectionException; import org.apache.iotdb.db.protocol.client.an.AINodeClient; import org.apache.iotdb.db.protocol.client.an.AINodeClientManager; import org.apache.iotdb.rpc.TSStatusCode; @@ -34,6 +36,7 @@ import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy; import org.apache.iotdb.udf.api.type.Type; +import org.apache.thrift.TException; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.TsBlockBuilder; @@ -213,9 +216,8 @@ private TsBlock forecast() throws Exception { client.forecast( new TForecastReq(model_id, serde.serialize(inputData), outputLength) .setOptions(options)); - } catch (Exception e) { - throw new IoTDBRuntimeException( - e.getMessage(), TSStatusCode.CAN_NOT_CONNECT_AINODE.getStatusCode()); + } catch (ClientManagerException | TException e) { + throw new AINodeConnectionException(e); } if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {