From f5a82b90ca20e72b5f5f56a38d3d412c767a7fc6 Mon Sep 17 00:00:00 2001 From: Casion Date: Wed, 13 Aug 2025 13:16:04 +0800 Subject: [PATCH 01/20] update version to 1.8.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 28aba1ef3c..e70cf06919 100644 --- a/pom.xml +++ b/pom.xml @@ -102,7 +102,7 @@ - 1.7.0 + 1.8.0 3.9.2 4.2.0 From dac7e739d3f5852cae6467fdbeabecc737195af3 Mon Sep 17 00:00:00 2001 From: Kazuto Iris <78157415+kazutoiris@users.noreply.github.com> Date: Wed, 20 Aug 2025 15:17:32 +0800 Subject: [PATCH 02/20] Upgrade `LINKIS_VERSION` to `1.8.0` to Fix Integration Test Failure in GitHub Actions (#5250) * build(ci): update Linkis version to 1.8.0 Signed-off-by: kazutoiris <78157415+kazutoiris@users.noreply.github.com> * build(ci): update Docker publish workflow to use the current repository Signed-off-by: kazutoiris <78157415+kazutoiris@users.noreply.github.com> --------- Signed-off-by: kazutoiris <78157415+kazutoiris@users.noreply.github.com> --- .github/workflows/integration-test.yml | 2 +- .github/workflows/publish-docker.yaml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 8a26905abe..f99f8c30e6 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -45,7 +45,7 @@ jobs: TAG: ${{ github.sha }} SKIP_TEST: true HUB: ghcr.io/apache/linkis - LINKIS_VERSION: 1.7.0 + LINKIS_VERSION: 1.8.0 steps: - name: Free up disk space run: | diff --git a/.github/workflows/publish-docker.yaml b/.github/workflows/publish-docker.yaml index d9199040d8..1b7c675a56 100644 --- a/.github/workflows/publish-docker.yaml +++ b/.github/workflows/publish-docker.yaml @@ -33,8 +33,8 @@ jobs: env: TAG: ${{ github.sha }} SKIP_TEST: true - HUB: ghcr.io/apache/linkis - LINKIS_VERSION: 1.7.0 + HUB: ghcr.io/${{ github.repository }} + LINKIS_VERSION: 1.8.0 steps: - name: Checkout uses: actions/checkout@v4 From 4ee7ea45e1ad1e4cf0d6c4c4b1b10c37a0b6f541 Mon Sep 17 00:00:00 2001 From: Kazuto Iris <78157415+kazutoiris@users.noreply.github.com> Date: Fri, 29 Aug 2025 12:57:52 +0800 Subject: [PATCH 03/20] Fix KIND image loading, script typo, and cache directory creation (#5251) * fix(ci): pass `USING_KIND` variable to `install-mysql.sh` Signed-off-by: kazutoiris <78157415+kazutoiris@users.noreply.github.com> * fix(ci): correct typo in script name Signed-off-by: kazutoiris <78157415+kazutoiris@users.noreply.github.com> * fix(ci): create `TAR_CACHE_ROOT` directory if not exists Signed-off-by: kazutoiris <78157415+kazutoiris@users.noreply.github.com> --------- Signed-off-by: kazutoiris <78157415+kazutoiris@users.noreply.github.com> --- linkis-dist/bin/install-linkis-to-kubernetes.sh | 2 +- ...-with-mysql-jdbc.sh => make-linkis-image-with-mysql-jdbc.sh} | 0 linkis-dist/docker/scripts/utils.sh | 2 ++ 3 files changed, 3 insertions(+), 1 deletion(-) rename linkis-dist/docker/scripts/{make-linikis-image-with-mysql-jdbc.sh => make-linkis-image-with-mysql-jdbc.sh} (100%) diff --git a/linkis-dist/bin/install-linkis-to-kubernetes.sh b/linkis-dist/bin/install-linkis-to-kubernetes.sh index 44e84e989a..00681b27b9 100644 --- a/linkis-dist/bin/install-linkis-to-kubernetes.sh +++ b/linkis-dist/bin/install-linkis-to-kubernetes.sh @@ -93,7 +93,7 @@ create_kind_cluster(){ } #mysql installation install_mysql(){ - ${ROOT_DIR}/helm/scripts/install-mysql.sh + ${ROOT_DIR}/helm/scripts/install-mysql.sh $USING_KIND } #ldh installation install_ldh(){ diff --git a/linkis-dist/docker/scripts/make-linikis-image-with-mysql-jdbc.sh b/linkis-dist/docker/scripts/make-linkis-image-with-mysql-jdbc.sh similarity index 100% rename from linkis-dist/docker/scripts/make-linikis-image-with-mysql-jdbc.sh rename to linkis-dist/docker/scripts/make-linkis-image-with-mysql-jdbc.sh diff --git a/linkis-dist/docker/scripts/utils.sh b/linkis-dist/docker/scripts/utils.sh index f7813cfe70..8c8c181de0 100755 --- a/linkis-dist/docker/scripts/utils.sh +++ b/linkis-dist/docker/scripts/utils.sh @@ -20,6 +20,8 @@ download() { TAR_FILE=$2 HARD_LINK_ROOT=$3 + mkdir -p ${TAR_CACHE_ROOT} + if [ ! -f ${TAR_CACHE_ROOT}/${TAR_FILE} ]; then echo "- downloading ${TAR_FILE} to ${TAR_CACHE_ROOT} from ${TAR_URL}" curl -L ${TAR_URL} -o ${TAR_CACHE_ROOT}/${TAR_FILE} From 75f23a1e0f658de52cc913f96a525ceb990a0eed Mon Sep 17 00:00:00 2001 From: Kazuto Iris <78157415+kazutoiris@users.noreply.github.com> Date: Tue, 2 Sep 2025 12:37:54 +0800 Subject: [PATCH 04/20] chore: prepare to release 1.8.0 (#5254) Signed-off-by: kazutoiris <78157415+kazutoiris@users.noreply.github.com> --- linkis-dist/deploy-config/linkis-env.sh | 2 +- linkis-dist/docker/ldh.Dockerfile | 2 +- linkis-dist/docker/linkis.Dockerfile | 4 ++-- .../linkis/engineplugin/spark/config/SparkConfiguration.scala | 2 +- linkis-web/.env | 2 +- linkis-web/package.json | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/linkis-dist/deploy-config/linkis-env.sh b/linkis-dist/deploy-config/linkis-env.sh index cbae216437..f5eed8b7d3 100644 --- a/linkis-dist/deploy-config/linkis-env.sh +++ b/linkis-dist/deploy-config/linkis-env.sh @@ -167,7 +167,7 @@ export SERVER_HEAP_SIZE="512M" ##The extended lib such mysql-connector-java-*.jar #LINKIS_EXTENDED_LIB=/appcom/common/linkisExtendedLib -LINKIS_VERSION=1.7.0 +LINKIS_VERSION=1.8.0 # for install LINKIS_PUBLIC_MODULE=lib/linkis-commons/public-module diff --git a/linkis-dist/docker/ldh.Dockerfile b/linkis-dist/docker/ldh.Dockerfile index 0e6e02d1e1..8a1d64abce 100644 --- a/linkis-dist/docker/ldh.Dockerfile +++ b/linkis-dist/docker/ldh.Dockerfile @@ -34,7 +34,7 @@ ARG SPARK_HADOOP_VERSION=3.2 ARG FLINK_VERSION=1.12.2 ARG ZOOKEEPER_VERSION=3.5.9 -ARG LINKIS_VERSION=1.7.0 +ARG LINKIS_VERSION=1.8.0 RUN useradd -r -s /bin/bash -u 100001 -g root -G wheel hadoop diff --git a/linkis-dist/docker/linkis.Dockerfile b/linkis-dist/docker/linkis.Dockerfile index 21a8e192ac..2fd4df7d91 100644 --- a/linkis-dist/docker/linkis.Dockerfile +++ b/linkis-dist/docker/linkis.Dockerfile @@ -56,7 +56,7 @@ ENV TZ="Asia/Shanghai" ###################################################################### FROM linkis-base as linkis -ARG LINKIS_VERSION=1.7.0 +ARG LINKIS_VERSION=1.8.0 ARG LINKIS_SYSTEM_USER="hadoop" ARG LINKIS_SYSTEM_UID="9001" @@ -106,7 +106,7 @@ ENTRYPOINT ["/bin/bash"] ###################################################################### FROM ${IMAGE_BASE_WEB} as linkis-web -ARG LINKIS_VERSION=1.7.0 +ARG LINKIS_VERSION=1.8.0 ARG LINKIS_HOME=/opt/linkis ENV LINKIS_WEB_ROOT ${LINKIS_HOME}-web diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala index 716e42ffc6..9b0e184b73 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala @@ -121,7 +121,7 @@ object SparkConfiguration extends Logging { val LINKIS_SPARK_USEHIVECONTEXT = CommonVars[Boolean]("wds.linkis.spark.useHiveContext", true) val DEFAULT_SPARK_JAR_NAME = - CommonVars[String]("wds.linkis.ecp.spark.default.jar", "linkis-engineconn-core-1.7.0.jar") + CommonVars[String]("wds.linkis.ecp.spark.default.jar", "linkis-engineconn-core-1.8.0.jar") val ENGINE_JAR = CommonVars[String]("wds.linkis.enginemanager.core.jar", getMainJarName) diff --git a/linkis-web/.env b/linkis-web/.env index ca995c5f50..4660cdface 100644 --- a/linkis-web/.env +++ b/linkis-web/.env @@ -2,4 +2,4 @@ VUE_APP_HOST= BACKEND_URL=http://127.0.0.1:9001 VUE_APP_MN_CONFIG_PREFIX= VUE_APP_MN_CONFIG_SOCKET=/ws/api/entrance/connect -VUE_APP_VERSION=1.7.0 +VUE_APP_VERSION=1.8.0 diff --git a/linkis-web/package.json b/linkis-web/package.json index 55c9da9532..28a1c92a66 100644 --- a/linkis-web/package.json +++ b/linkis-web/package.json @@ -1,6 +1,6 @@ { "name": "linkis", - "version": "1.7.0", + "version": "1.8.0", "private": true, "scripts": { "serve": "vue-cli-service serve", From 76975786f4ff4e8d7bb9276c595e939fb93c405b Mon Sep 17 00:00:00 2001 From: v-kkhuang <62878639+v-kkhuang@users.noreply.github.com> Date: Mon, 22 Sep 2025 17:07:53 +0800 Subject: [PATCH 05/20] support azure (#5214) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * support azure * remove file * add azure conf --------- Co-authored-by: “v_kkhuang” <“420895376@qq.com”> --- linkis-commons/linkis-storage/pom.xml | 12 + .../impl/BuildAzureBlobFileSystem.java | 59 +++ .../storage/fs/impl/AzureBlobFileSystem.java | 401 ++++++++++++++++++ .../storage/utils/StorageConfiguration.scala | 6 +- .../linkis/storage/utils/StorageUtils.scala | 4 +- .../utils/StorageConfigurationTest.scala | 5 + linkis-dist/package/conf/linkis.properties | 6 +- pom.xml | 8 + 8 files changed, 498 insertions(+), 3 deletions(-) create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java diff --git a/linkis-commons/linkis-storage/pom.xml b/linkis-commons/linkis-storage/pom.xml index 6e04016fa7..8715b97c7a 100644 --- a/linkis-commons/linkis-storage/pom.xml +++ b/linkis-commons/linkis-storage/pom.xml @@ -105,6 +105,18 @@ 1.12.261 + + com.azure + azure-storage-blob + + + com.azure + azure-storage-common + + + com.azure + azure-identity + org.apache.parquet parquet-avro diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java new file mode 100644 index 0000000000..292bb952ed --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.storage.factory.impl; + +import org.apache.linkis.common.io.Fs; +import org.apache.linkis.storage.factory.BuildFactory; +import org.apache.linkis.storage.fs.impl.AzureBlobFileSystem; +import org.apache.linkis.storage.utils.StorageUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class BuildAzureBlobFileSystem implements BuildFactory { + private static final Logger LOG = LoggerFactory.getLogger(BuildAzureBlobFileSystem.class); + + @Override + public Fs getFs(String user, String proxyUser) { + AzureBlobFileSystem fs = new AzureBlobFileSystem(); + try { + fs.init(null); + } catch (IOException e) { + LOG.warn("get file system failed", e); + } + fs.setUser(user); + return fs; + } + + @Override + public Fs getFs(String user, String proxyUser, String label) { + AzureBlobFileSystem fs = new AzureBlobFileSystem(); + try { + fs.init(null); + } catch (IOException e) { + LOG.warn("get file system failed", e); + } + fs.setUser(user); + return fs; + } + + @Override + public String fsName() { + return StorageUtils.BLOB; + } +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java new file mode 100644 index 0000000000..67475aecf2 --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.fs.impl; + +import com.azure.core.util.polling.SyncPoller; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.models.BlobCopyInfo; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.specialized.BlobOutputStream; +import com.azure.storage.blob.specialized.BlockBlobClient; +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.storage.exception.StorageWarnException; +import org.apache.linkis.storage.fs.FileSystem; +import org.apache.linkis.storage.utils.StorageConfiguration; +import org.apache.linkis.storage.utils.StorageUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary.TO_BE_UNKNOW; +import static org.apache.linkis.storage.utils.StorageUtils.BLOB_SCHEMA; + +public class AzureBlobFileSystem extends FileSystem { + + private static final String SLASH = "/"; + + public static class PahtInfo { + private String schema = "http://"; // http + private String domain; // + private String container; // container name + private String blobName; // blob name + private String tail; + + public PahtInfo(String domain, String container, String blobName) { + this.domain = domain; + this.container = container; + this.blobName = blobName; + if (blobName != null) { + String[] names = blobName.split(SLASH, -1); + tail = names[names.length - 1]; + } + } + + public String toFullName() { + return schema + domain + SLASH + container + SLASH + blobName; + } + + public String getSchema() { + return schema; + } + + public String getDomain() { + return domain; + } + + public String getContainer() { + return container; + } + + public String getBlobName() { + return blobName; + } + + public String getTail() { + return tail; + } + + @Override + public String toString() { + return "PahtInfo{" + + "schema='" + schema + '\'' + + ", domain='" + domain + '\'' + + ", container='" + container + '\'' + + ", blobName='" + blobName + '\'' + + ", tail='" + tail + '\'' + + '}'; + } + } + + /** + * manipulate Azure storage resources and Blob container 管理命名空间下的存储资源和Blob容器 + */ + private BlobServiceClient serviceClient; + + /** + * getBlobContainerClient + * + * @param containerName + * @return client which can manipulate Azure Storage containers and their blobs.
+ * 操作一个容器和其blobs的客户端 + */ + private BlobContainerClient getBlobContainerClient(String containerName) { + return serviceClient.getBlobContainerClient(containerName); + } + + private PahtInfo azureLocation(String path) { + return this.azureLocation(new FsPath(path)); + } + + /** + * @param dest + * @return domain name,container name,blob name + */ + private PahtInfo azureLocation(FsPath dest) { + //https://myaccount.blob.core.windows.net/mycontainer/dir/blobname + // returns myaccount.blob.core.windows.net/mycontainer/dir/blobname + String path = dest.getPath(); + // myaccount.blob.core.windows.net/mycontainer/dir/blobname + // will split to myaccount.blob.core.windows.net + // and mycontainer/dir/blobname + String[] paths = path.split(SLASH, 2); + if (paths.length < 2) { + throw new IllegalArgumentException("file path error,with out container:" + path); + } + // split to container and blob object, + // container/dir/blobname will split to container and dir/blobname + String[] names = paths[1].split(SLASH, 2); + if (names.length < 2) { + return new PahtInfo(paths[0], names[0], null); + } else { + return new PahtInfo(paths[0], names[0], names[1]); + } + } + + /** + * init serviceClient + * + * @param properties + * @throws IOException + */ + @Override + public void init(Map properties) throws IOException { + + /** + * The storage account provides the top-level namespace for the Blob service. 每个账户提供了一个顶级的命名空间 + */ + String acctName = StorageConfiguration.AZURE_ACCT_NAME.getValue(properties); + String connectStr = StorageConfiguration.AZURE_ACCT_CONNECT_STR.getValue(properties); + // Azure SDK client builders accept the credential as a parameter + serviceClient = + new BlobServiceClientBuilder() + .endpoint(BLOB_SCHEMA + acctName + ".blob.core.windows.net/") + .connectionString(connectStr) + .buildClient(); + } + + /** + * name of the fileSystem + * + * @return + */ + @Override + public String fsName() { + return StorageUtils.BLOB; + } + + @Override + public String rootUserName() { + return ""; + } + + /** + * @param dest + * @return + * @throws IOException + */ + @Override + public FsPath get(String dest) throws IOException { + FsPath path = new FsPath(dest); + if (exists(path)) { + return path; + } else { + throw new StorageWarnException( + TO_BE_UNKNOW.getErrorCode(), + "File or folder does not exist or file name is garbled(文件或者文件夹不存在或者文件名乱码)"); + } + } + + /** + * Opens a blob input stream to download the blob. + * + * @param dest + * @return + * @throws BlobStorageException – If a storage service error occurred. + */ + @Override + public InputStream read(FsPath dest) { + PahtInfo result = azureLocation(dest); + BlobClient blobclient = + getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName()); + return blobclient.openInputStream(); + } + + /** + * @param dest + * @param overwrite + * @return + * @throws BlobStorageException – If a storage service error occurred. + * @see BlockBlobClient #getBlobOutputStream + */ + @Override + public OutputStream write(FsPath dest, boolean overwrite) { + + PahtInfo result = azureLocation(dest); + BlobClient blobclient = + getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName()); + return blobclient.getBlockBlobClient().getBlobOutputStream(overwrite); + } + + /** + * create a blob
+ * 创建一个对象("文件") + * + * @param dest + * @return + * @throws IOException + */ + @Override + public boolean create(String dest) throws IOException { + FsPath path = new FsPath(dest); + if (exists(path)) { + return false; + } + PahtInfo names = this.azureLocation(dest); + // TODO 如果是路径的话后面补一个文件. + if (!names.getTail().contains(".")) { + String tmp = names.toFullName() + SLASH + "_tmp.txt"; + names = this.azureLocation(tmp); + } + BlobContainerClient client = serviceClient.createBlobContainerIfNotExists(names.getContainer()); + try (BlobOutputStream bos = + client.getBlobClient(names.getBlobName()).getBlockBlobClient().getBlobOutputStream()) { + bos.write(1); + bos.flush(); + } + + return true; + } + + /** + * Flat listing 5000 results at a time,without deleted.
+ * 扁平化展示未删除的blob对象,最多5000条 TODO 分页接口,迭代器接口? + * + * @param path + * @return + * @throws IOException + */ + @Override + public List list(FsPath path) throws IOException { + final PahtInfo result = azureLocation(path); + return getBlobContainerClient(result.getContainer()).listBlobs().stream() + // Azure不会返回已删除对象 + .filter(item -> !item.isDeleted()) + .map(item -> { + FsPath tmp = new FsPath(result.toFullName() + SLASH + item.getName()); + // TODO 根据观察使用contentType来区别"对象"和"路径",但文档中没有具体的说明 + if (item.getProperties().getContentType() == null) { + tmp.setIsdir(true); + } + return tmp; + }) + .collect(Collectors.toList()); + } + + @Override + public boolean canRead(FsPath dest) throws IOException { + if (this.exists(dest)) { + return true; + } else { + return false; + } + } + + @Override + public boolean canWrite(FsPath dest) throws IOException { + if (this.exists(dest)) { + return true; + } else { + return false; + } + } + + @Override + public boolean exists(FsPath dest) throws IOException { + PahtInfo file = this.azureLocation(dest); + return getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).exists(); + } + + @Override + public boolean delete(FsPath dest) throws IOException { + PahtInfo file = this.azureLocation(dest); + return getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).deleteIfExists(); + } + + @Override + public boolean copy(String origin, String dest) throws IOException { + PahtInfo oriNames = this.azureLocation(origin); + PahtInfo destNames = this.azureLocation(dest); + + BlobClient oriClient = + getBlobContainerClient(oriNames.getContainer()).getBlobClient(oriNames.getBlobName()); + BlockBlobClient destClient = + getBlobContainerClient(destNames.getContainer()) + .getBlobClient(destNames.getBlobName()) + .getBlockBlobClient(); + SyncPoller poller = destClient.beginCopy(oriClient.getBlobUrl(), Duration.ofSeconds(2)); + poller.waitForCompletion(); + return true; + } + + @Override + public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException { + // 没有事务性保证 + this.copy(oldDest.getPath(), newDest.getPath()); + this.delete(oldDest); + return true; + } + + @Override + public boolean mkdir(FsPath dest) throws IOException { + return this.create(dest.getPath()); + } + + @Override + public boolean mkdirs(FsPath dest) throws IOException { + return this.mkdir(dest); + } + + // 下面这些方法可能都无法支持 + @Override + public String listRoot() throws IOException { + return ""; + } + + @Override + public long getTotalSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getFreeSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getUsableSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public boolean canExecute(FsPath dest) throws IOException { + return false; + } + + @Override + public boolean setOwner(FsPath dest, String user, String group) throws IOException { + return false; + } + + @Override + public boolean setOwner(FsPath dest, String user) throws IOException { + return false; + } + + @Override + public boolean setGroup(FsPath dest, String group) throws IOException { + return false; + } + + @Override + public boolean setPermission(FsPath dest, String permission) throws IOException { + return false; + } + + @Override + public void close() throws IOException { + } +} diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala index c73b00743d..17345c050a 100644 --- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala +++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala @@ -50,7 +50,8 @@ object StorageConfiguration { val STORAGE_BUILD_FS_CLASSES = CommonVars( "wds.linkis.storage.build.fs.classes", "org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem," + - "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem" + "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem," + + "org.apache.linkis.storage.factory.impl.BuildAzureBlobFileSystem" ) val IS_SHARE_NODE = CommonVars("wds.linkis.storage.is.share.node", true) @@ -117,4 +118,7 @@ object StorageConfiguration { val S3_BUCKET = CommonVars[String]("linkis.storage.s3.bucket", "", null, null) + val AZURE_ACCT_NAME = CommonVars[String]("linkis.storage.azure.acctName", "", null, null) + + val AZURE_ACCT_CONNECT_STR = CommonVars[String]("linkis.storage.azure.connectstr", "", null, null) } diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala index dd5d8c37ef..a38b0edc4c 100644 --- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala +++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala @@ -39,11 +39,13 @@ object StorageUtils extends Logging { val FILE = "file" val OSS = "oss" val S3 = "s3" + val BLOB = "https" val FILE_SCHEMA = "file://" val HDFS_SCHEMA = "hdfs://" val OSS_SCHEMA = "oss://" val S3_SCHEMA = "s3://" + val BLOB_SCHEMA = "https://" private val nf = NumberFormat.getInstance() nf.setGroupingUsed(false) @@ -202,7 +204,7 @@ object StorageUtils extends Logging { * @return */ def getFsPath(path: String): FsPath = { - if (path.startsWith(FILE_SCHEMA) || path.startsWith(HDFS_SCHEMA)) new FsPath(path) + if (path.startsWith(FILE_SCHEMA) || path.startsWith(HDFS_SCHEMA) || path.startsWith(BLOB_SCHEMA)) new FsPath(path) else { new FsPath(FILE_SCHEMA + path) } diff --git a/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala b/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala index 4d21655ebd..a821038005 100644 --- a/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala +++ b/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala @@ -60,6 +60,11 @@ class StorageConfigurationTest { "txt.TextResultSet,table.TableResultSet,io.IOResultSet,html.HtmlResultSet,picture.PictureResultSet", storageresultsetclasses ) + Assertions.assertEquals( + "org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem," + + "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem", + storagebuildfsclasses + ) Assertions.assertTrue(issharenode) Assertions.assertFalse(enableioproxy) Assertions.assertEquals("root", ioUser) diff --git a/linkis-dist/package/conf/linkis.properties b/linkis-dist/package/conf/linkis.properties index ae30dce4a6..7b0a9e7d56 100644 --- a/linkis-dist/package/conf/linkis.properties +++ b/linkis-dist/package/conf/linkis.properties @@ -120,4 +120,8 @@ linkis.storage.s3.access.key= linkis.storage.s3.secret.key= linkis.storage.s3.endpoint= linkis.storage.s3.region= -linkis.storage.s3.bucket= \ No newline at end of file +linkis.storage.s3.bucket= + +# azure file system +linkis.storage.azure.acctName= +linkis.storage.azure.connectstr= diff --git a/pom.xml b/pom.xml index e70cf06919..0d824dde84 100644 --- a/pom.xml +++ b/pom.xml @@ -227,6 +227,7 @@ 2021.0.8 2021.0.6.0 3.1.7 + 1.2.30 UTF-8 @@ -1378,6 +1379,13 @@ spring-cloud-starter-alibaba-nacos-discovery ${spring-cloud-alibaba.version}
+ + com.azure + azure-sdk-bom + ${azure.blob.bom} + pom + import + From 7d58d354f62bf6edf7418d4354a1a625efd62c89 Mon Sep 17 00:00:00 2001 From: Kazuto Iris <78157415+kazutoiris@users.noreply.github.com> Date: Sun, 28 Sep 2025 13:39:32 +0800 Subject: [PATCH 06/20] Add OAuth2 authentication support (#5253) * feat(mg-gateway): add OAuth2 authentication support - Add OAuth2 authentication configuration to GatewayConfiguration - Implement OAuth2Authentication - Update `SecurityFilter` and `UserRestful` to process OAuth2 request Signed-off-by: kazutoiris <78157415+kazutoiris@users.noreply.github.com> * feat(mg-gateway): add OAuth configuration - Add OAuth-related properties to `linkis-mg-gateway.properties` - Include support for GitHub OAuth as an example Signed-off-by: kazutoiris <78157415+kazutoiris@users.noreply.github.com> * style: reformat code Signed-off-by: kazutoiris <78157415+kazutoiris@users.noreply.github.com> * feat(mg-gateway): add OAuth in frontend - Add OAuth login option to the login page - Implement OAuth callback route and component - Add translations for OAuth login text Signed-off-by: kazutoiris <78157415+kazutoiris@users.noreply.github.com> * docs: add OAuth authentication documentation --------- Signed-off-by: kazutoiris <78157415+kazutoiris@users.noreply.github.com> --- docs/configuration/linkis-gateway-core.md | 8 + .../package/conf/linkis-mg-gateway.properties | 9 + .../gateway/config/GatewayConfiguration.scala | 9 + .../gateway/security/SecurityFilter.scala | 3 + .../linkis/gateway/security/UserRestful.scala | 15 + .../security/oauth/OAuth2Authentication.scala | 340 ++++++++++++++++++ linkis-web/src/common/i18n/en.json | 1 + linkis-web/src/common/i18n/zh.json | 1 + linkis-web/src/dss/router.js | 10 + linkis-web/src/dss/view/login/index.vue | 21 +- .../src/dss/view/login/oauthCallback.vue | 55 +++ 11 files changed, 471 insertions(+), 1 deletion(-) create mode 100644 linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/security/oauth/OAuth2Authentication.scala create mode 100644 linkis-web/src/dss/view/login/oauthCallback.vue diff --git a/docs/configuration/linkis-gateway-core.md b/docs/configuration/linkis-gateway-core.md index be933b2a26..5a4f55a3d1 100644 --- a/docs/configuration/linkis-gateway-core.md +++ b/docs/configuration/linkis-gateway-core.md @@ -36,3 +36,11 @@ |linkis-gateway-core|wds.linkis.gateway.this.schema| | gateway.this.schema| |linkis-gateway-core|wds.linkis.web.enable.water.mark|true| web.enable.water.mark| |linkis-gateway-core|wds.linkis.entrance.name| |linkis.entrance.name| +|linkis-gateway-core|wds.linkis.gateway.conf.enable.oauth.auth| false |wds.linkis.gateway.conf.enable.oauth.auth| +|linkis-gateway-core|wds.linkis.gateway.auth.oauth.authentication.url| |wds.linkis.gateway.auth.oauth.authentication.url| +|linkis-gateway-core|wds.linkis.gateway.auth.oauth.exchange.url| |wds.linkis.gateway.auth.oauth.exchange.url| +|linkis-gateway-core|wds.linkis.gateway.auth.oauth.validate.url| |wds.linkis.gateway.auth.oauth.validate.url| +|linkis-gateway-core|wds.linkis.gateway.auth.oauth.validate.field| |wds.linkis.gateway.auth.oauth.validate.field| +|linkis-gateway-core|wds.linkis.gateway.auth.oauth.client.id| |wds.linkis.gateway.auth.oauth.client.id| +|linkis-gateway-core|wds.linkis.gateway.auth.oauth.client.secret| |wds.linkis.gateway.auth.oauth.client.secret| +|linkis-gateway-core|wds.linkis.gateway.auth.oauth.scope| |wds.linkis.gateway.auth.oauth.scope| diff --git a/linkis-dist/package/conf/linkis-mg-gateway.properties b/linkis-dist/package/conf/linkis-mg-gateway.properties index 1f1d2416b4..0e4275677c 100644 --- a/linkis-dist/package/conf/linkis-mg-gateway.properties +++ b/linkis-dist/package/conf/linkis-mg-gateway.properties @@ -30,6 +30,15 @@ wds.linkis.ldap.proxy.baseDN= wds.linkis.ldap.proxy.userNameFormat= wds.linkis.admin.user=hadoop #wds.linkis.admin.password= +##OAuth +wds.linkis.oauth.enable=false +wds.linkis.oauth.url=https://github.com/login/oauth/authorize +wds.linkis.gateway.auth.oauth.exchange.url=https://github.com/login/oauth/access_token +wds.linkis.gateway.auth.oauth.validate.url=https://api.github.com/user +wds.linkis.gateway.auth.oauth.validate.field=login +wds.linkis.gateway.auth.oauth.client.id=YOUR_CLIENT_ID +wds.linkis.gateway.auth.oauth.client.secret=YOUR_CLIENT_SECRET +wds.linkis.gateway.auth.oauth.scope=user ##Spring spring.server.port=9001 diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewayConfiguration.scala b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewayConfiguration.scala index 5fc80d7afc..ccb7325b57 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewayConfiguration.scala +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewayConfiguration.scala @@ -42,6 +42,15 @@ object GatewayConfiguration { val TOKEN_AUTHENTICATION_SCAN_INTERVAL = CommonVars("wds.linkis.gateway.conf.token.auth.scan.interval", 1000 * 60 * 10) + val ENABLE_OAUTH_AUTHENTICATION = CommonVars("wds.linkis.gateway.conf.enable.oauth.auth", false) + val OAUTH_AUTHENTICATION_URL = CommonVars("wds.linkis.gateway.auth.oauth.authentication.url", "") + val OAUTH_EXCHANGE_URL = CommonVars("wds.linkis.gateway.auth.oauth.exchange.url", "") + val OAUTH_VALIDATE_URL = CommonVars("wds.linkis.gateway.auth.oauth.validate.url", "") + val OAUTH_VALIDATE_FIELD = CommonVars("wds.linkis.gateway.auth.oauth.validate.field", "") + val OAUTH_CLIENT_ID = CommonVars("wds.linkis.gateway.auth.oauth.client.id", "") + val OAUTH_CLIENT_SECRET = CommonVars("wds.linkis.gateway.auth.oauth.client.secret", "") + val OAUTH_SCOPE = CommonVars("wds.linkis.gateway.auth.oauth.scope", "") + val PASS_AUTH_REQUEST_URI = CommonVars("wds.linkis.gateway.conf.url.pass.auth", "/dws/").getValue.split(",") diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/security/SecurityFilter.scala b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/security/SecurityFilter.scala index 150ae565ef..9f170e9dd2 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/security/SecurityFilter.scala +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/security/SecurityFilter.scala @@ -23,6 +23,7 @@ import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.gateway.config.GatewayConfiguration import org.apache.linkis.gateway.config.GatewayConfiguration._ import org.apache.linkis.gateway.http.GatewayContext +import org.apache.linkis.gateway.security.oauth.OAuth2Authentication import org.apache.linkis.gateway.security.sso.SSOInterceptor import org.apache.linkis.gateway.security.token.TokenAuthentication import org.apache.linkis.server.{validateFailed, Message} @@ -127,6 +128,8 @@ object SecurityFilter extends Logging { logger.info("No login needed for proxy uri: " + gatewayContext.getRequest.getRequestURI) } else if (TokenAuthentication.isTokenRequest(gatewayContext)) { TokenAuthentication.tokenAuth(gatewayContext) + } else if (OAuth2Authentication.isOAuth2Request(gatewayContext)) { + OAuth2Authentication.OAuth2Entry(gatewayContext) } else { val userName = Utils.tryCatch(GatewaySSOUtils.getLoginUser(gatewayContext)) { case n @ (_: NonLoginException | _: LoginExpireException) => diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/security/UserRestful.scala b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/security/UserRestful.scala index 38d06b6b17..e79296c564 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/security/UserRestful.scala +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/security/UserRestful.scala @@ -20,6 +20,7 @@ package org.apache.linkis.gateway.security import org.apache.linkis.common.utils.{Logging, RSAUtils, Utils} import org.apache.linkis.gateway.config.GatewayConfiguration import org.apache.linkis.gateway.http.GatewayContext +import org.apache.linkis.gateway.security.oauth.OAuth2Authentication import org.apache.linkis.gateway.security.sso.SSOInterceptor import org.apache.linkis.gateway.security.token.TokenAuthentication import org.apache.linkis.protocol.usercontrol.{ @@ -87,6 +88,20 @@ abstract class AbstractUserRestful extends UserRestful with Logging { TokenAuthentication.tokenAuth(gatewayContext, true) return } + case "oauth-login" => + Utils.tryCatch { + val loginUser = GatewaySSOUtils.getLoginUsername(gatewayContext) + Message + .ok(loginUser + " already logged in, please log out before signing in(已经登录,请先退出再进行登录)!") + .data("userName", loginUser) + }(_ => { + OAuth2Authentication.OAuth2Auth(gatewayContext, true) + return + }) + case "oauth-redirect" => { + OAuth2Authentication.OAuth2Redirect(gatewayContext) + return + } case "logout" => logout(gatewayContext) case "userInfo" => userInfo(gatewayContext) case "publicKey" => publicKey(gatewayContext) diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/security/oauth/OAuth2Authentication.scala b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/security/oauth/OAuth2Authentication.scala new file mode 100644 index 0000000000..c62ab5b3be --- /dev/null +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/security/oauth/OAuth2Authentication.scala @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.gateway.security.oauth + +import org.apache.linkis.common.exception.LinkisCommonErrorException +import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.gateway.config.GatewayConfiguration +import org.apache.linkis.gateway.config.GatewayConfiguration._ +import org.apache.linkis.gateway.http.GatewayContext +import org.apache.linkis.gateway.security.{GatewaySSOUtils, SecurityFilter} +import org.apache.linkis.server.Message +import org.apache.linkis.server.conf.ServerConfiguration + +import org.apache.commons.io.IOUtils +import org.apache.commons.lang3.StringUtils + +import java.io.IOException +import java.net.{HttpURLConnection, URL} + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +object OAuth2Authentication extends Logging { + + private val objectMapper = new ObjectMapper() + objectMapper.registerModule(DefaultScalaModule) + + def isOAuth2Request(gatewayContext: GatewayContext): Boolean = { + val path = getMethod(gatewayContext) + path == "oauth_login" || path == "oauth_redirect" + } + + def OAuth2Entry(gatewayContext: GatewayContext, login: Boolean = false): Boolean = { + val path = getMethod(gatewayContext) + if (path == "oauth_redirect") { + OAuth2Redirect(gatewayContext) + } else if (path == "oauth_redirect") { + OAuth2Auth(gatewayContext, login) + } else { + val message = + Message.noLogin(s"未知 OAuth 请求") << gatewayContext.getRequest.getRequestURI + SecurityFilter.filterResponse(gatewayContext, message) + false + } + } + + private def getMethod(gatewayContext: GatewayContext) = { + var userURI = ServerConfiguration.BDP_SERVER_USER_URI.getValue + if (!userURI.endsWith("/")) userURI += "/" + val path = gatewayContext.getRequest.getRequestURI.replace(userURI, "") + path + } + + def OAuth2Redirect(gatewayContext: GatewayContext): Boolean = { + if (!ENABLE_OAUTH_AUTHENTICATION.getValue) { + val message = + Message.noLogin( + s"Gateway 未启用 OAuth 认证,请采用其他认证方式!" + ) << gatewayContext.getRequest.getRequestURI + SecurityFilter.filterResponse(gatewayContext, message) + return false + } + val message = + Message.ok("创建链接成功!").data("redirectUrl", generateAuthenticationUrl()) + SecurityFilter.filterResponse(gatewayContext, message) + true + } + + /** + * 生成OAuth认证的URL + * + * @note + * 认证完成回调链接需要在认证服务器上进行配置 + * @return + */ + private def generateAuthenticationUrl(): String = { + var oauthServerUrl = + s"${OAUTH_AUTHENTICATION_URL.getValue}?client_id=${OAUTH_CLIENT_ID.getValue}&response_type=code" + if (StringUtils.isNotBlank(OAUTH_SCOPE.getValue)) { + oauthServerUrl += s"&scope=${OAUTH_SCOPE.getValue}" + } + oauthServerUrl + } + + def OAuth2Auth(gatewayContext: GatewayContext, login: Boolean = false): Boolean = { + if (!ENABLE_OAUTH_AUTHENTICATION.getValue) { + val message = + Message.noLogin( + s"Gateway 未启用 OAuth 认证,请采用其他认证方式!" + ) << gatewayContext.getRequest.getRequestURI + SecurityFilter.filterResponse(gatewayContext, message) + return false + } + + val code = extractCode(gatewayContext) + val host = gatewayContext.getRequest.getRequestRealIpAddr() + + if (StringUtils.isBlank(code)) { + val message = + Message.noLogin(s"请在回调查询参数中返回code,以便完成OAuth认证!") << gatewayContext.getRequest.getRequestURI + SecurityFilter.filterResponse(gatewayContext, message) + return false + } + + var authMsg: Message = + Message.noLogin(s"无效的访问令牌 $code,无法完成 OAuth 认证!") << gatewayContext.getRequest.getRequestURI + + val accessToken = Utils.tryCatch(exchangeAccessToken(code, host))(t => { + authMsg = Message.noLogin( + s"OAuth exchange failed, code: $code, reason: ${t.getMessage}" + ) << gatewayContext.getRequest.getRequestURI + null + }) + + if (StringUtils.isNotBlank(accessToken)) { + val username = validateAccessToken(accessToken, host) + logger.info( + s"OAuth authentication succeed, uri: ${gatewayContext.getRequest.getRequestURI}, accessToken: $accessToken, username: $username." + ) + + if (login) { + GatewaySSOUtils.setLoginUser(gatewayContext, username) + val msg = + Message + .ok("login successful(登录成功)!") + .data("userName", username) + .data("enableWatermark", GatewayConfiguration.ENABLE_WATER_MARK.getValue) + .data("isAdmin", false) + SecurityFilter.filterResponse(gatewayContext, msg) + return true + } + + GatewaySSOUtils.setLoginUser(gatewayContext.getRequest, username) + true + } else { + logger.info( + s"OAuth exchange fail, uri: ${gatewayContext.getRequest.getRequestURI}, code: $code, host: $host." + ) + SecurityFilter.filterResponse(gatewayContext, authMsg) + false + } + } + + private def extractCode(gatewayContext: GatewayContext): String = { + Utils.tryCatch(gatewayContext.getRequest.getQueryParams.get("code")(0))(_ => null) + } + + /** + * 验证访问码的有效性并获取访问令牌 + * + * @param code + * 访问码 + * @param host + * 客户端主机 + * @return + * 访问令牌 + */ + private def exchangeAccessToken(code: String, host: String): String = { + val exchangeUrl = OAUTH_EXCHANGE_URL.getValue + + if (StringUtils.isBlank(exchangeUrl)) { + logger.warn(s"OAuth exchange url is not set") + } + if (StringUtils.isBlank(code)) { + logger.warn(s"OAuth exchange code is empty") + } + + Utils.tryCatch({ + val response = HttpUtils.post( + exchangeUrl, + data = objectMapper.writeValueAsString( + Map( + "client_id" -> OAUTH_CLIENT_ID.getValue, + "client_secret" -> OAUTH_CLIENT_SECRET.getValue, + "code" -> code, + "host" -> host + ) + ) + ) + objectMapper.readValue(response, classOf[Map[String, String]]).get("access_token").orNull + })(t => { + logger.warn(s"OAuth exchange failed, url: $exchangeUrl, reason: ${t.getMessage}") + null + }) + } + + /** + * 验证访问令牌的有效性并兑换用户名 + * + * @param accessToken + * 访问令牌 + * @param host + * 客户端主机 + * @return + * 用户名 + */ + private def validateAccessToken(accessToken: String, host: String): String = { + val url = OAUTH_VALIDATE_URL.getValue + + if (StringUtils.isBlank(url)) { + logger.warn(s"OAuth validate url is not set") + } + + if (StringUtils.isBlank(accessToken)) { + logger.warn(s"OAuth validate accessToken is empty") + } + + Utils.tryCatch({ + val response = HttpUtils.get(url, headers = Map("Authorization" -> s"Bearer $accessToken")) + objectMapper + .readValue(response, classOf[Map[String, String]]) + .get(OAUTH_VALIDATE_FIELD.getValue) + .orNull + })(t => { + logger.warn(s"OAuth validate failed, url: $url, reason: ${t.getMessage}") + null + }) + } + +} + +object HttpUtils extends Logging { + + def get( + url: String, + headers: Map[String, String] = Map.empty, + params: Map[String, String] = Map.empty + ): String = { + Utils.tryCatch { + val fullUrl = url + (if (params.nonEmpty) { + "?" + params.map { case (key, value) => s"$key=$value" }.mkString("&") + } else { + "" + }) + val connection = new URL(fullUrl).openConnection().asInstanceOf[HttpURLConnection] + connection.setRequestMethod("GET") + + headers.foreach { case (key, value) => + connection.setRequestProperty(key, value) + } + + if (!headers.contains("Accept")) { + connection.setRequestProperty("Accept", "application/json") + } + + val responseCode = connection.getResponseCode + if (!(responseCode >= 200 && responseCode < 300)) { + throw new IOException(s"HTTP GET request failed for URL: $url - $responseCode") + } + + val inputStream = connection.getInputStream + + try { + IOUtils.toString(inputStream, "UTF-8") + } finally { + inputStream.close() + connection.disconnect() + } + } { t => + logger.warn(s"Failed to execute HTTP GET request to $url", t) + throw new LinkisCommonErrorException( + 0, + s"HTTP GET request failed for URL: $url, reason: ${t.getMessage}" + ) + } + } + + def post(url: String, data: String, headers: Map[String, String] = Map.empty): String = { + Utils.tryCatch { + val connection = new URL(url).openConnection().asInstanceOf[HttpURLConnection] + try { + connection.setRequestMethod("POST") + connection.setDoOutput(true) + connection.setDoInput(true) + + headers.foreach { case (key, value) => + connection.setRequestProperty(key, value) + } + + if (!headers.contains("Content-Type")) { + connection.setRequestProperty("Content-Type", "application/json; charset=UTF-8") + } + + if (!headers.contains("Accept")) { + connection.setRequestProperty("Accept", "application/json") + } + + if (data != null && data.nonEmpty) { + val outputStream = connection.getOutputStream + try { + IOUtils.write(data, outputStream, "UTF-8") + } finally { + outputStream.close() + } + } + + val responseCode = connection.getResponseCode + if (!(responseCode >= 200 && responseCode < 300)) { + throw new IOException(s"HTTP POST request failed for URL: $url - $responseCode") + } + + val inputStream = connection.getInputStream + + try { + if (inputStream != null) { + IOUtils.toString(inputStream, "UTF-8") + } else { + "" + } + } finally { + if (inputStream != null) inputStream.close() + } + } finally { + connection.disconnect() + } + } { t => + logger.warn(s"Failed to execute HTTP POST request to $url", t) + throw new LinkisCommonErrorException( + 0, + s"HTTP POST request failed for URL: $url, reason: ${t.getMessage}" + ) + } + } + +} diff --git a/linkis-web/src/common/i18n/en.json b/linkis-web/src/common/i18n/en.json index aac078b18a..23b21bca44 100644 --- a/linkis-web/src/common/i18n/en.json +++ b/linkis-web/src/common/i18n/en.json @@ -265,6 +265,7 @@ "userName": "Please enter your username", "remenber": "Remember me", "login": "Login", + "oauthLogin": "OAuth Login", "passwordHint": "Please enter your password", "password": "Please enter password!", "loginSuccess": "Login Success", diff --git a/linkis-web/src/common/i18n/zh.json b/linkis-web/src/common/i18n/zh.json index 688153101e..cc4c24e0c2 100644 --- a/linkis-web/src/common/i18n/zh.json +++ b/linkis-web/src/common/i18n/zh.json @@ -266,6 +266,7 @@ "userName": "请输入用户名", "remenber": "记住当前用户", "login": "登录", + "oauthLogin": "OAuth 登录", "passwordHint": "请输入密码!", "loginSuccess": "登录成功", "haveLogin": "您已经登录,请不要重复登录", diff --git a/linkis-web/src/dss/router.js b/linkis-web/src/dss/router.js index 01b5ede649..bac6af2994 100644 --- a/linkis-web/src/dss/router.js +++ b/linkis-web/src/dss/router.js @@ -61,6 +61,16 @@ export default [ component: () => import('./view/login/index.vue'), }, + { + path: '/login/oauth/callback', + name: 'OAuthCallback', + meta: { + title: 'OAuthCallback', + publicPage: true, + }, + component: () => + import('./view/login/oauthCallback.vue'), + }, // Public pages, not subject to permission control(公用页面,不受权限控制) { path: '/500', diff --git a/linkis-web/src/dss/view/login/index.vue b/linkis-web/src/dss/view/login/index.vue index 81c6af0bdb..c3ec243b21 100644 --- a/linkis-web/src/dss/view/login/index.vue +++ b/linkis-web/src/dss/view/login/index.vue @@ -20,7 +20,7 @@ class="login" @keyup.enter.stop.prevent="handleSubmit('loginForm')">