Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public static void forecastTableFunctionErrorTest(
"701: The OUTPUT_START_TIME should be greater than the maximum timestamp of target time series. Expected greater than [5759] but found [5759].");

// OUTPUT_LENGTH error
String invalidOutputLengthSQL =
String invalidOutputLengthSQLWithZero =
String.format(
FORECAST_TABLE_FUNCTION_SQL_TEMPLATE,
modelInfo.getModelId(),
Expand All @@ -144,7 +144,24 @@ public static void forecastTableFunctionErrorTest(
0,
1,
"time");
errorTest(statement, invalidOutputLengthSQL, "701: OUTPUT_LENGTH should be greater than 0");
errorTest(
statement, invalidOutputLengthSQLWithZero, "701: OUTPUT_LENGTH should be greater than 0");

String invalidOutputLengthSQLWithOutOfRange =
String.format(
FORECAST_TABLE_FUNCTION_SQL_TEMPLATE,
modelInfo.getModelId(),
0,
5760,
2880,
5760,
2881,
1,
"time");
errorTest(
statement,
invalidOutputLengthSQLWithOutOfRange,
"1599: Error occurred while executing forecast:[Attribute output_length expect value between 1 and 2880, got 2881 instead.]");

// OUTPUT_INTERVAL error
String invalidOutputIntervalSQL =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,5 +155,13 @@ private void failTest(Statement statement) {
statement,
"UNLOAD MODEL sundial FROM DEVICES \"unknown\"",
"1507: Device ID [unknown] is not available. You can use 'SHOW AI_DEVICES' to retrieve the available devices.");
errorTest(
statement,
"LOAD MODEL sundial TO DEVICES \"0,0\"",
"1509: Device ID list contains duplicate entries.");
errorTest(
statement,
"UNLOAD MODEL sundial FROM DEVICES \"0,0\"",
"1510: Device ID list contains duplicate entries.");
}
}
2 changes: 2 additions & 0 deletions iotdb-core/ainode/iotdb/ainode/core/constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class TSStatusCode(Enum):
DROP_BUILTIN_MODEL_ERROR = 1506
DROP_MODEL_ERROR = 1507
UNAVAILABLE_AI_DEVICE_ERROR = 1508
LOAD_MODEL_ERROR = 1509
UNLOAD_MODEL_ERROR = 1510

INVALID_URI_ERROR = 1511
INVALID_INFERENCE_CONFIG = 1512
Expand Down
10 changes: 10 additions & 0 deletions iotdb-core/ainode/iotdb/ainode/core/rpc/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ def loadModel(self, req: TLoadModelReq) -> TSStatus:
status = self._ensure_model_is_registered(req.existingModelId)
if status.code != TSStatusCode.SUCCESS_STATUS.value:
return status
if len(set(req.deviceIdList)) != len(req.deviceIdList):
return TSStatus(
code=TSStatusCode.LOAD_MODEL_ERROR.value,
message="Device ID list contains duplicate entries.",
)
status = self._ensure_device_id_is_available(req.deviceIdList)
if status.code != TSStatusCode.SUCCESS_STATUS.value:
return status
Expand All @@ -104,6 +109,11 @@ def unloadModel(self, req: TUnloadModelReq) -> TSStatus:
status = self._ensure_model_is_registered(req.modelId)
if status.code != TSStatusCode.SUCCESS_STATUS.value:
return status
if len(set(req.deviceIdList)) != len(req.deviceIdList):
return TSStatus(
code=TSStatusCode.UNLOAD_MODEL_ERROR.value,
message="Device ID list contains duplicate entries.",
)
status = self._ensure_device_id_is_available(req.deviceIdList)
if status.code != TSStatusCode.SUCCESS_STATUS.value:
return status
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.exception.ainode;

import org.apache.iotdb.commons.exception.IoTDBRuntimeException;

import static org.apache.iotdb.rpc.TSStatusCode.CAN_NOT_CONNECT_AINODE;

public class AINodeConnectionException extends IoTDBRuntimeException {

private static final String message =
"Failed to connect to AINode because [%s], please check the status of your AINode.";

public AINodeConnectionException(Exception e) {
super(String.format(message, e.toString()), CAN_NOT_CONNECT_AINODE.getStatusCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.iotdb.ainode.rpc.thrift.TInferenceReq;
import org.apache.iotdb.ainode.rpc.thrift.TInferenceResp;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.db.exception.ainode.AINodeConnectionException;
import org.apache.iotdb.db.exception.runtime.ModelInferenceProcessException;
import org.apache.iotdb.db.protocol.client.an.AINodeClient;
import org.apache.iotdb.db.protocol.client.an.AINodeClientManager;
Expand All @@ -33,6 +35,7 @@

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.thrift.TException;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.read.common.block.TsBlock;
Expand Down Expand Up @@ -247,8 +250,8 @@ private void submitInferenceTask() {
new TInferenceReq(
modelInferenceDescriptor.getModelId(), serde.serialize(inputTsBlock))
.setInferenceAttributes(modelInferenceDescriptor.getInferenceAttributes()));
} catch (Exception e) {
throw new ModelInferenceProcessException(e.getMessage());
} catch (ClientManagerException | TException e) {
throw new AINodeConnectionException(e);
}
},
modelInferenceExecutor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.ainode.AINodeConnectionException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.exception.sql.SemanticException;
Expand Down Expand Up @@ -3705,8 +3706,8 @@ public SettableFuture<ConfigTaskResult> createModel(String modelId, String uri)
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (final TException | ClientManagerException e) {
future.setException(e);
} catch (final ClientManagerException | TException e) {
future.setException(new AINodeConnectionException(e));
}
return future;
}
Expand All @@ -3723,7 +3724,7 @@ public SettableFuture<ConfigTaskResult> dropModel(final String modelId) {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (final ClientManagerException | TException e) {
future.setException(e);
future.setException(new AINodeConnectionException(e));
}
return future;
}
Expand All @@ -3743,8 +3744,8 @@ public SettableFuture<ConfigTaskResult> showModels(final String modelId) {
return future;
}
ShowModelsTask.buildTsBlock(resp, future);
} catch (final Exception e) {
future.setException(e);
} catch (final ClientManagerException | TException e) {
future.setException(new AINodeConnectionException(e));
}
return future;
}
Expand All @@ -3762,8 +3763,8 @@ public SettableFuture<ConfigTaskResult> showLoadedModels(List<String> deviceIdLi
return future;
}
ShowLoadedModelsTask.buildTsBlock(resp, future);
} catch (final Exception e) {
future.setException(e);
} catch (final ClientManagerException | TException e) {
future.setException(new AINodeConnectionException(e));
}
return future;
}
Expand All @@ -3779,8 +3780,8 @@ public SettableFuture<ConfigTaskResult> showAIDevices() {
return future;
}
ShowAIDevicesTask.buildTsBlock(resp, future);
} catch (final Exception e) {
future.setException(e);
} catch (final ClientManagerException | TException e) {
future.setException(new AINodeConnectionException(e));
}
return future;
}
Expand All @@ -3798,8 +3799,8 @@ public SettableFuture<ConfigTaskResult> loadModel(
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (final Exception e) {
future.setException(e);
} catch (final ClientManagerException | TException e) {
future.setException(new AINodeConnectionException(e));
}
return future;
}
Expand All @@ -3817,8 +3818,8 @@ public SettableFuture<ConfigTaskResult> unloadModel(
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (final Exception e) {
future.setException(e);
} catch (final ClientManagerException | TException e) {
future.setException(new AINodeConnectionException(e));
}
return future;
}
Expand Down Expand Up @@ -3850,8 +3851,8 @@ public SettableFuture<ConfigTaskResult> createTuningTask(
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (final Exception e) {
future.setException(e);
} catch (final ClientManagerException | TException e) {
future.setException(new AINodeConnectionException(e));
}
return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.iotdb.ainode.rpc.thrift.TForecastReq;
import org.apache.iotdb.ainode.rpc.thrift.TForecastResp;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.db.exception.ainode.AINodeConnectionException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.protocol.client.an.AINodeClient;
import org.apache.iotdb.db.protocol.client.an.AINodeClientManager;
Expand All @@ -44,6 +46,7 @@
import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
import org.apache.iotdb.udf.api.type.Type;

import org.apache.thrift.TException;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
Expand Down Expand Up @@ -71,7 +74,6 @@

import static org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
import static org.apache.iotdb.db.queryengine.plan.relational.utils.ResultColumnAppender.createResultColumnAppender;
import static org.apache.iotdb.rpc.TSStatusCode.CAN_NOT_CONNECT_AINODE;

public class ClassifyTableFunction implements TableFunction {

Expand Down Expand Up @@ -367,8 +369,10 @@ private TsBlock classify() {
try (AINodeClient client =
CLIENT_MANAGER.borrowClient(AINodeClientManager.AINODE_ID_PLACEHOLDER)) {
resp = client.forecast(new TForecastReq(modelId, SERDE.serialize(inputData), outputLength));
} catch (Exception e) {
throw new IoTDBRuntimeException(e.getMessage(), CAN_NOT_CONNECT_AINODE.getStatusCode());
} catch (ClientManagerException | TException e) {
throw new AINodeConnectionException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}

if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.iotdb.ainode.rpc.thrift.TForecastReq;
import org.apache.iotdb.ainode.rpc.thrift.TForecastResp;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.db.exception.ainode.AINodeConnectionException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.protocol.client.an.AINodeClient;
import org.apache.iotdb.db.protocol.client.an.AINodeClientManager;
Expand All @@ -43,6 +45,7 @@
import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
import org.apache.iotdb.udf.api.type.Type;

import org.apache.thrift.TException;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
Expand Down Expand Up @@ -72,7 +75,6 @@

import static org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
import static org.apache.iotdb.db.queryengine.plan.relational.utils.ResultColumnAppender.createResultColumnAppender;
import static org.apache.iotdb.rpc.TSStatusCode.CAN_NOT_CONNECT_AINODE;

public class ForecastTableFunction implements TableFunction {

Expand Down Expand Up @@ -563,8 +565,10 @@ protected TsBlock forecast() {
client.forecast(
new TForecastReq(modelId, SERDE.serialize(inputData), outputLength)
.setOptions(options));
} catch (Exception e) {
throw new IoTDBRuntimeException(e.getMessage(), CAN_NOT_CONNECT_AINODE.getStatusCode());
} catch (ClientManagerException | TException e) {
throw new AINodeConnectionException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}

if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.iotdb.ainode.rpc.thrift.TForecastReq;
import org.apache.iotdb.ainode.rpc.thrift.TForecastResp;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.db.exception.ainode.AINodeConnectionException;
import org.apache.iotdb.db.protocol.client.an.AINodeClient;
import org.apache.iotdb.db.protocol.client.an.AINodeClientManager;
import org.apache.iotdb.rpc.TSStatusCode;
Expand All @@ -34,6 +36,7 @@
import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;

import org.apache.thrift.TException;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
Expand Down Expand Up @@ -213,9 +216,8 @@ private TsBlock forecast() throws Exception {
client.forecast(
new TForecastReq(model_id, serde.serialize(inputData), outputLength)
.setOptions(options));
} catch (Exception e) {
throw new IoTDBRuntimeException(
e.getMessage(), TSStatusCode.CAN_NOT_CONNECT_AINODE.getStatusCode());
} catch (ClientManagerException | TException e) {
throw new AINodeConnectionException(e);
}

if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
Expand Down
Loading