From aa369d00377a496b545f894c68717ae0b280f54f Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Sat, 6 Sep 2025 18:13:29 +0800 Subject: [PATCH 1/8] feat: add read/write/delete/list support for opfs --- core/Cargo.toml | 9 +- core/src/services/mod.rs | 2 +- core/src/services/opfs/backend.rs | 80 ++++++++++--- core/src/services/opfs/builder.rs | 66 +++++++++++ core/src/services/opfs/config.rs | 7 +- core/src/services/opfs/core.rs | 188 ++++++++++++++++++++++++------ core/src/services/opfs/delete.rs | 66 +++++++++++ core/src/services/opfs/docs.md | 10 +- core/src/services/opfs/error.rs | 15 ++- core/src/services/opfs/lister.rs | 178 ++++++++++++++++++++++++++++ core/src/services/opfs/mod.rs | 14 ++- core/src/services/opfs/reader.rs | 139 ++++++++++++++++++++++ core/src/services/opfs/utils.rs | 23 +--- core/src/services/opfs/writer.rs | 161 +++++++++++++++++++++++++ core/src/types/scheme.rs | 6 + 15 files changed, 881 insertions(+), 83 deletions(-) create mode 100644 core/src/services/opfs/builder.rs create mode 100644 core/src/services/opfs/delete.rs create mode 100644 core/src/services/opfs/lister.rs create mode 100644 core/src/services/opfs/reader.rs create mode 100644 core/src/services/opfs/writer.rs diff --git a/core/Cargo.toml b/core/Cargo.toml index d7de30dc9260..9ef014a570dd 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -369,16 +369,23 @@ js-sys = { version = "0.3.77", optional = true } wasm-bindgen = { version = "0.2.100", optional = true } wasm-bindgen-futures = { version = "0.4.50", optional = true } web-sys = { version = "0.3.77", optional = true, features = [ + "Blob", "Window", "File", + "FileSystemCreateWritableOptions", "FileSystemDirectoryHandle", "FileSystemFileHandle", "FileSystemGetDirectoryOptions", "FileSystemGetFileOptions", + "FileSystemHandle", + "FileSystemHandleKind", + "FileSystemRemoveOptions", "FileSystemWritableFileStream", "Navigator", + "ReadableStream", + "ReadableStreamDefaultReader", + "ReadableStreamReadResult", "StorageManager", - "FileSystemGetFileOptions", ] } # Layers diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index a78fd9a17b2a..2a54f2490e6a 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -202,5 +202,5 @@ pub use webhdfs::*; mod yandex_disk; pub use yandex_disk::*; -#[cfg(target_arch = "wasm32")] mod opfs; +pub use opfs::*; diff --git a/core/src/services/opfs/backend.rs b/core/src/services/opfs/backend.rs index c989b6f01d07..cd8a4bb9f8f9 100644 --- a/core/src/services/opfs/backend.rs +++ b/core/src/services/opfs/backend.rs @@ -17,34 +17,88 @@ use std::fmt::Debug; use std::sync::Arc; -use web_sys::FileSystemGetDirectoryOptions; -use super::utils::*; -use crate::raw::*; +use super::core::OpfsCore; +use super::reader::OpfsReader; +use super::writer::OpfsWriter; +use crate::raw::oio::OneShotDeleter; +use crate::services::opfs::delete::OpfsDeleter; +use crate::services::opfs::lister::OpfsLister; use crate::Result; +use crate::{raw::*, Capability}; /// OPFS Service backend -#[derive(Default, Debug, Clone)] -pub struct OpfsBackend {} +#[derive(Debug, Clone)] +pub struct OpfsBackend { + core: Arc, +} + +impl OpfsBackend { + pub(crate) fn new(core: Arc) -> Self { + Self { core } + } +} impl Access for OpfsBackend { - type Reader = (); + type Reader = OpfsReader; - type Writer = (); + type Writer = OpfsWriter; - type Lister = (); + type Lister = OpfsLister; - type Deleter = (); + type Deleter = OneShotDeleter; fn info(&self) -> Arc { - Arc::new(AccessorInfo::default()) + let info = AccessorInfo::default(); + info.set_native_capability(Capability { + stat: true, + + read: true, + + write: true, + write_can_empty: true, + write_can_append: true, + write_can_multi: true, + // write_with_if_not_exists: true, + create_dir: true, + delete: true, + + list: true, + + ..Default::default() + }); + Arc::new(info) + } + + async fn stat(&self, path: &str, _: OpStat) -> Result { + let metadata = self.core.opfs_stat(path).await?; + Ok(RpStat::new(metadata)) } async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { - let opt = FileSystemGetDirectoryOptions::new(); - opt.set_create(true); - get_directory_handle(path, &opt).await?; + self.core.opfs_create_dir(path).await?; Ok(RpCreateDir::default()) } + + async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let writer = OpfsWriter::new(self.core.clone(), path, op).await?; + Ok((RpWrite::default(), writer)) + } + + async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> { + let reader = OpfsReader::new(self.core.clone(), path, &op).await?; + Ok((RpRead::default(), reader)) + } + + async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { + let deleter = OneShotDeleter::new(OpfsDeleter::new(self.core.clone())); + Ok((RpDelete::default(), deleter)) + } + + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { + // TODO: list all entries recursively + let lister = OpfsLister::new(self.core.clone(), path).await?; + Ok((RpList::default(), lister)) + } } diff --git a/core/src/services/opfs/builder.rs b/core/src/services/opfs/builder.rs new file mode 100644 index 000000000000..e01b91bea48f --- /dev/null +++ b/core/src/services/opfs/builder.rs @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use crate::{raw::Access, Builder, Configurator, Error, ErrorKind, Result}; + +use super::{backend::OpfsBackend, config::OpfsConfig, core::OpfsCore}; + +impl Configurator for OpfsConfig { + type Builder = OpfsBuilder; + + fn into_builder(self) -> Self::Builder { + OpfsBuilder::new(self) + } +} + +#[derive(Default)] +pub struct OpfsBuilder { + config: OpfsConfig, +} + +impl OpfsBuilder { + pub(crate) fn new(config: OpfsConfig) -> Self { + Self { config } + } + + /// Set root for backend. + pub fn root(mut self, root: &str) -> Self { + self.config.root = if root.is_empty() { + None + } else { + Some(root.to_string()) + }; + + self + } +} + +impl Builder for OpfsBuilder { + type Config = OpfsConfig; + + fn build(self) -> Result { + let root = self.config.root.ok_or( + Error::new(ErrorKind::ConfigInvalid, "root is not specified") + .with_operation("Builder::build"), + )?; + + let core = Arc::new(OpfsCore::new(root)); + Ok(OpfsBackend::new(core)) + } +} diff --git a/core/src/services/opfs/config.rs b/core/src/services/opfs/config.rs index 7bf9382337c6..78e49d6e637f 100644 --- a/core/src/services/opfs/config.rs +++ b/core/src/services/opfs/config.rs @@ -19,7 +19,10 @@ use serde::Deserialize; use serde::Serialize; /// Config for OPFS. -#[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(default)] #[non_exhaustive] -pub struct OPFSConfig {} +pub struct OpfsConfig { + /// root dir for backend + pub root: Option, +} diff --git a/core/src/services/opfs/core.rs b/core/src/services/opfs/core.rs index 643af84bd95c..a7f6180e520a 100644 --- a/core/src/services/opfs/core.rs +++ b/core/src/services/opfs/core.rs @@ -17,58 +17,178 @@ use std::fmt::Debug; +use js_sys::AsyncIterator; use wasm_bindgen::JsCast; use wasm_bindgen_futures::JsFuture; use web_sys::File; -use web_sys::FileSystemWritableFileStream; +use web_sys::FileSystemDirectoryHandle; +use web_sys::FileSystemFileHandle; +use web_sys::FileSystemGetDirectoryOptions; +use web_sys::FileSystemGetFileOptions; -use crate::Error; +use crate::raw::build_abs_path; +use crate::raw::parse_datetime_from_from_timestamp_millis; +use crate::EntryMode; +use crate::Metadata; use crate::Result; use super::error::*; use super::utils::*; -#[derive(Default, Debug)] -pub struct OpfsCore {} +#[derive(Debug)] +pub struct OpfsCore { + root: String, +} impl OpfsCore { - #[allow(unused)] - async fn store_file(&self, file_name: &str, content: &[u8]) -> Result<(), Error> { - let handle = get_handle_by_filename(file_name).await?; + pub(crate) fn new(root: String) -> Self { + Self { root } + } - let writable: FileSystemWritableFileStream = JsFuture::from(handle.create_writable()) - .await - .and_then(JsCast::dyn_into) - .map_err(parse_js_error)?; - - // QuotaExceeded or NotAllowed - JsFuture::from( - writable - .write_with_u8_array(content) - .map_err(parse_js_error)?, - ) - .await - .map_err(parse_js_error)?; - - JsFuture::from(writable.close()) - .await - .map_err(parse_js_error)?; + pub(crate) fn path(&self, path: &str) -> String { + build_abs_path(&self.root, path) + } + + /// Build write path and ensure the parent dirs created + pub(crate) async fn ensure_write_path(&self, path: &str) -> Result<()> { + let opt = FileSystemGetDirectoryOptions::new(); + opt.set_create(true); + + let path = build_abs_path(&self.root, path); + let path = path + .trim_end_matches('/') + .rsplit_once('/') + .map(|s| s.0) + .unwrap_or("/"); + + get_directory_handle(path, &opt).await?; Ok(()) } - #[allow(unused)] - async fn read_file(&self, file_name: &str) -> Result, Error> { - let handle = get_handle_by_filename(file_name).await?; + pub(crate) async fn opfs_stat(&self, path: &str) -> Result { + let parent_handle = self.parent_dir_handle(path).await?; + let path = build_abs_path(&self.root, &path); + let last_component = path + .trim_end_matches('/') + .rsplit_once('/') + .map(|s| s.1) + .unwrap_or("/"); + + match JsFuture::from(parent_handle.get_directory_handle(last_component)).await { + // TODO: set content length for directory metadata + Ok(_) => Ok(Metadata::new(EntryMode::DIR)), + Err(err) => { + let err = js_sys::Error::from(err); + match String::from(err.name()).as_str() { + JS_TYPE_MISMATCH_ERROR => { + // the entry is a file and not a directory + let handle: FileSystemFileHandle = + JsFuture::from(parent_handle.get_file_handle(last_component)) + .await + .and_then(JsCast::dyn_into) + .map_err(parse_js_error)?; + + let file: File = JsFuture::from(handle.get_file()) + .await + .and_then(JsCast::dyn_into) + .map_err(parse_js_error)?; + + let last_modified = file.last_modified() as i64; + let metadata = Metadata::new(EntryMode::FILE) + .with_content_length(file.size() as u64) + .with_last_modified(parse_datetime_from_from_timestamp_millis( + last_modified, + )?); + + Ok(metadata) + } + _ => Err(parse_js_error(err.into())), + } + } + } + } + + pub(crate) async fn opfs_create_dir(&self, path: &str) -> Result<()> { + let opt = FileSystemGetDirectoryOptions::new(); + opt.set_create(true); + + self.dir_handle_with_option(path, &opt).await?; + + Ok(()) + } - let file: File = JsFuture::from(handle.get_file()) + /// List directory entries. Returns an [`AsyncIterator`] over the entries in the directory. + pub(crate) async fn opfs_list(&self, path: &str) -> Result { + let opt = FileSystemGetDirectoryOptions::new(); + + let handle = self.dir_handle_with_option(path, &opt).await?; + + Ok(handle.entries()) + } + + /// Get directory handle with options + pub(crate) async fn dir_handle_with_option( + &self, + path: &str, + opt: &FileSystemGetDirectoryOptions, + ) -> Result { + let path = build_abs_path(&self.root, path); + let dirs: Vec<&str> = path.trim_matches('/').split('/').collect(); + + let mut handle = get_root_directory_handle().await?; + for dir in dirs { + handle = JsFuture::from(handle.get_directory_handle_with_options(dir, &opt)) + .await + .and_then(JsCast::dyn_into) + .map_err(parse_js_error)?; + } + Ok(handle) + } + + /// Get parent directory handle + pub(crate) async fn parent_dir_handle(&self, path: &str) -> Result { + let path = build_abs_path(&self.root, path); + + let paths: Vec<&str> = path.trim_matches('/').split('/').collect(); + + let mut handle = get_root_directory_handle().await?; + for dir in paths[0..paths.len() - 1].iter() { + handle = JsFuture::from(handle.get_directory_handle(dir)) + .await + .and_then(JsCast::dyn_into) + .map_err(parse_js_error)?; + } + + Ok(handle) + } + + /// Get file handle + pub(crate) async fn file_handle(&self, path: &str) -> Result { + let opt = FileSystemGetFileOptions::new(); + self.file_handle_with_option(path, &opt).await + } + + /// Get file handle with options + pub(crate) async fn file_handle_with_option( + &self, + path: &str, + opt: &FileSystemGetFileOptions, + ) -> Result { + let path = build_abs_path(&self.root, path); + let paths: Vec<&str> = path.trim_matches('/').split('/').collect(); + + let mut handle = get_root_directory_handle().await?; + for dir in paths[0..paths.len() - 1].iter() { + handle = JsFuture::from(handle.get_directory_handle(dir)) + .await + .and_then(JsCast::dyn_into) + .map_err(parse_js_error)?; + } + + JsFuture::from(handle.get_file_handle_with_options(paths[paths.len() - 1], &opt)) .await .and_then(JsCast::dyn_into) - .map_err(parse_js_error)?; - let array_buffer = JsFuture::from(file.array_buffer()) - .await - .map_err(parse_js_error)?; - - Ok(js_sys::Uint8Array::new(&array_buffer).to_vec()) + .map_err(parse_js_error) } } diff --git a/core/src/services/opfs/delete.rs b/core/src/services/opfs/delete.rs new file mode 100644 index 000000000000..2765f1ae9d5e --- /dev/null +++ b/core/src/services/opfs/delete.rs @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use wasm_bindgen_futures::JsFuture; +use web_sys::FileSystemRemoveOptions; + +use crate::raw::{oio, OpDelete}; +use crate::services::opfs::error::parse_js_error; +use crate::{ErrorKind, Result}; + +use super::core::OpfsCore; + +pub struct OpfsDeleter { + core: Arc, +} + +impl OpfsDeleter { + pub(crate) fn new(core: Arc) -> Self { + Self { core } + } +} + +impl oio::OneShotDelete for OpfsDeleter { + async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> { + let handle = match self.core.parent_dir_handle(&path).await { + Ok(handle) => handle, + Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()), + Err(err) => return Err(err), + }; + + let path = self.core.path(&path); + let entry_name = path + .trim_end_matches('/') + .rsplit_once('/') + .map(|p| p.1) + .unwrap_or("/"); + + let opt = FileSystemRemoveOptions::new(); + opt.set_recursive(false); + + match JsFuture::from(handle.remove_entry_with_options(entry_name, &opt)) + .await + .map_err(parse_js_error) + { + Ok(_) => Ok(()), + Err(err) if err.kind() == ErrorKind::NotFound => Ok(()), + Err(err) => Err(err), + } + } +} diff --git a/core/src/services/opfs/docs.md b/core/src/services/opfs/docs.md index 79acba995573..bdc2619a1f4c 100644 --- a/core/src/services/opfs/docs.md +++ b/core/src/services/opfs/docs.md @@ -3,13 +3,13 @@ This service can be used to: - [ ] stat -- [ ] read -- [ ] write -- [ ] create_dir -- [ ] delete +- [x] read +- [x] write +- [x] create_dir +- [x] delete - [ ] copy - [ ] rename -- [ ] list +- [x] list - [ ] presign - [ ] blocking diff --git a/core/src/services/opfs/error.rs b/core/src/services/opfs/error.rs index 1f7d5cb87bd1..48f9ecb2bffb 100644 --- a/core/src/services/opfs/error.rs +++ b/core/src/services/opfs/error.rs @@ -19,9 +19,16 @@ use wasm_bindgen::JsValue; use crate::{Error, ErrorKind}; +pub(crate) const JS_NOT_FOUND_ERROR: &str = "NotFoundError"; +pub(crate) const JS_TYPE_MISMATCH_ERROR: &str = "TypeMismatchError"; + pub(crate) fn parse_js_error(msg: JsValue) -> Error { - Error::new( - ErrorKind::Unexpected, - msg.as_string().unwrap_or_else(String::new), - ) + let err = js_sys::Error::from(msg); + + let kind = match String::from(err.name()).as_str() { + JS_NOT_FOUND_ERROR => ErrorKind::NotFound, + _ => ErrorKind::Unexpected, + }; + + Error::new(kind, String::from(&err.message())) } diff --git a/core/src/services/opfs/lister.rs b/core/src/services/opfs/lister.rs new file mode 100644 index 000000000000..298d021b6fe6 --- /dev/null +++ b/core/src/services/opfs/lister.rs @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use futures::channel::{mpsc, oneshot}; +use futures::{SinkExt, StreamExt}; +use js_sys::{Array, AsyncIterator, IteratorNext, JsString}; +use wasm_bindgen::JsCast; +use wasm_bindgen_futures::JsFuture; +use web_sys::{FileSystemHandle, FileSystemHandleKind}; + +use crate::raw::{normalize_path, oio}; +use crate::services::opfs::error::parse_js_error; +use crate::{EntryMode, Error, ErrorKind, Metadata, Result}; + +use super::core::OpfsCore; + +enum ListRequest { + Next { + tx: oneshot::Sender>>, + }, +} + +pub struct OpfsLister { + done: bool, + current_path: Option, + tx: mpsc::UnboundedSender, +} + +impl OpfsLister { + pub(crate) async fn new(core: Arc, path: &str) -> Result { + let mut current_path = path.to_string(); + if !current_path.ends_with('/') { + current_path.push('/'); + } + + let (tx, rx) = mpsc::unbounded(); + + match core.opfs_list(path).await { + Ok(iter) => { + let prefix = current_path.clone(); + wasm_bindgen_futures::spawn_local(async move { + Self::run(iter, prefix, rx).await; + }); + Ok(Self { + current_path: Some(current_path), + done: false, + tx, + }) + } + Err(err) if err.kind() == ErrorKind::NotFound => Ok(Self { + tx, + done: true, + current_path: Some(current_path), + }), + Err(err) => Err(err), + } + } + + async fn run( + iter: AsyncIterator, + prefix: String, + mut rx: mpsc::UnboundedReceiver, + ) { + loop { + let Some(req) = rx.next().await else { + // OpfsLister is dropped, exit worker task + break; + }; + + match req { + ListRequest::Next { tx } => { + let fut = match iter.next().map(JsFuture::from) { + Ok(fut) => fut, + Err(err) => { + let _ = tx.send(Err(parse_js_error(err))); + return; + } + }; + + let entry_result = match fut.await { + Ok(entry) => { + let next = entry.unchecked_into::(); + let entry = if next.done() { + let _ = tx.send(Ok(None)); + return; + } else { + next.value() + }; + + let array = entry.dyn_into::().unwrap().to_vec(); + debug_assert_eq!(array.len(), 2, "expected 2 elements in array"); + + let Ok(mut path) = + array[0].clone().dyn_into::().map(String::from) + else { + let _ = tx.send(Err(Error::new( + ErrorKind::Unexpected, + format!("cast {:?} to JsString failed", array[0]), + ))); + break; + }; + + let Ok(handle) = array[1].clone().dyn_into::() else { + let _ = tx.send(Err(Error::new( + ErrorKind::Unexpected, + format!("cast {:?} to FileSystemHandle failed", array[1]), + ))); + break; + }; + + let mode = match handle.kind() { + FileSystemHandleKind::File => EntryMode::FILE, + FileSystemHandleKind::Directory => { + if !path.ends_with('/') { + path.push('/'); + } + EntryMode::DIR + } + _ => unreachable!(), + }; + let meta = Metadata::new(mode); + + Ok(Some(oio::Entry::new( + &normalize_path(&format!("{prefix}{path}")), + meta, + ))) + } + Err(err) => Err(parse_js_error(err)), + }; + + let _ = tx.send(entry_result); + } + } + } + } +} + +impl oio::List for OpfsLister { + async fn next(&mut self) -> Result> { + if self.done { + return Ok(None); + } + + // since list should return path itself, we return it first + if let Some(path) = self.current_path.take() { + let e = oio::Entry::new(path.as_str(), Metadata::new(EntryMode::DIR)); + return Ok(Some(e)); + } + + let (tx, rx) = oneshot::channel(); + let _ = self.tx.send(ListRequest::Next { tx }).await; + + match rx.await.unwrap() { + Ok(None) => { + self.done = true; + Ok(None) + } + Ok(e) => Ok(e), + Err(err) => Err(err), + } + } +} diff --git a/core/src/services/opfs/mod.rs b/core/src/services/opfs/mod.rs index c143d37b3aa5..15ade96a5bd1 100644 --- a/core/src/services/opfs/mod.rs +++ b/core/src/services/opfs/mod.rs @@ -18,13 +18,25 @@ #[cfg(feature = "services-opfs")] mod backend; #[cfg(feature = "services-opfs")] +mod builder; +#[cfg(feature = "services-opfs")] mod core; #[cfg(feature = "services-opfs")] -mod config; +mod delete; #[cfg(feature = "services-opfs")] mod error; +#[cfg(feature = "services-opfs")] +mod lister; +#[cfg(feature = "services-opfs")] +mod reader; +#[cfg(feature = "services-opfs")] +mod writer; + #[cfg(feature = "services-opfs")] mod utils; + +mod config; +pub use config::OpfsConfig; diff --git a/core/src/services/opfs/reader.rs b/core/src/services/opfs/reader.rs new file mode 100644 index 000000000000..6d6dde183426 --- /dev/null +++ b/core/src/services/opfs/reader.rs @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; +use std::u64; + +use bytes::{Bytes, BytesMut}; +use futures::channel::{mpsc, oneshot}; +use futures::{SinkExt, StreamExt}; +use js_sys::Uint8Array; +use wasm_bindgen::JsCast; +use wasm_bindgen_futures::JsFuture; +use web_sys::{File, ReadableStreamDefaultReader, ReadableStreamReadResult}; + +use crate::raw::{oio, OpRead}; +use crate::*; + +use super::core::OpfsCore; +use super::error::parse_js_error; + +enum ReadRequest { + Read { tx: oneshot::Sender> }, +} + +pub struct OpfsReader { + pos: usize, + end_pos: usize, + tx: mpsc::UnboundedSender, +} + +impl OpfsReader { + pub async fn new(core: Arc, path: &str, op: &OpRead) -> Result { + let pos = op.range().offset(); + let end_pos = op + .range() + .size() + .map(|sz| pos + sz) + .unwrap_or(i32::MAX as u64); + + let file_handle = core.file_handle(path).await?; + let file: File = JsFuture::from(file_handle.get_file()) + .await + .and_then(JsCast::dyn_into) + .map_err(parse_js_error)?; + + let read_stream = file + .slice_with_i32_and_i32(pos as i32, end_pos as i32) + .map_err(parse_js_error)? + .stream() + .get_reader() + .dyn_into::() + .map_err(|obj| parse_js_error(obj.into()))?; + + let (tx, rx) = mpsc::unbounded(); + + // everything in wasm-bindgen is non-send, so we need to spawn a new task + wasm_bindgen_futures::spawn_local(async move { + Self::run(read_stream, rx).await; + }); + + Ok(Self { + pos: pos as usize, + end_pos: end_pos as usize, + tx, + }) + } + + async fn run( + reader: ReadableStreamDefaultReader, + mut rx: mpsc::UnboundedReceiver, + ) { + loop { + let Some(req) = rx.next().await else { + // OpfsReader is dropped, exit worker task + break; + }; + match req { + ReadRequest::Read { tx } => { + let read_res = match JsFuture::from(reader.read()).await { + Ok(js_value) => ReadableStreamReadResult::from(js_value), + Err(err) => { + let _ = tx.send(Err(parse_js_error(err))); + break; + } + }; + + if read_res.get_done().unwrap_or(false) { + let _ = tx.send(Ok(Bytes::new())); + break; + } + + let chunk = match read_res.get_value().dyn_into::() { + Ok(chunk) => chunk, + Err(err) => { + let _ = tx.send(Err(parse_js_error(err))); + break; + } + }; + + let mut buf = BytesMut::with_capacity(chunk.byte_length() as usize); + buf.resize(chunk.byte_length() as usize, 0); + chunk.copy_to(buf.as_mut()); + + let _ = tx.send(Ok(buf.freeze())); + } + } + } + } +} + +impl oio::Read for OpfsReader { + async fn read(&mut self) -> Result { + if self.pos > self.end_pos { + return Ok(Buffer::new()); + } + + let (tx, rx) = oneshot::channel(); + let _ = self.tx.send(ReadRequest::Read { tx }).await; + + let bytes = rx.await.unwrap()?; + self.pos += bytes.len(); + + Ok(Buffer::from(bytes)) + } +} diff --git a/core/src/services/opfs/utils.rs b/core/src/services/opfs/utils.rs index 8aa97a9531b9..cd24d9aafa07 100644 --- a/core/src/services/opfs/utils.rs +++ b/core/src/services/opfs/utils.rs @@ -18,10 +18,7 @@ use crate::Result; use wasm_bindgen::JsCast; use wasm_bindgen_futures::JsFuture; -use web_sys::{ - window, FileSystemDirectoryHandle, FileSystemFileHandle, FileSystemGetDirectoryOptions, - FileSystemGetFileOptions, -}; +use web_sys::{window, FileSystemDirectoryHandle, FileSystemGetDirectoryOptions}; use super::error::*; @@ -50,21 +47,3 @@ pub(crate) async fn get_directory_handle( Ok(handle) } - -pub(crate) async fn get_handle_by_filename(filename: &str) -> Result { - let navigator = window().unwrap().navigator(); - let storage_manager = navigator.storage(); - let root: FileSystemDirectoryHandle = JsFuture::from(storage_manager.get_directory()) - .await - .and_then(JsCast::dyn_into) - .map_err(parse_js_error)?; - - // maybe the option should be exposed? - let opt = FileSystemGetFileOptions::new(); - opt.set_create(true); - - JsFuture::from(root.get_file_handle_with_options(filename, &opt)) - .await - .and_then(JsCast::dyn_into) - .map_err(parse_js_error) -} diff --git a/core/src/services/opfs/writer.rs b/core/src/services/opfs/writer.rs new file mode 100644 index 000000000000..717bdee0e68f --- /dev/null +++ b/core/src/services/opfs/writer.rs @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use bytes::{Buf, Bytes}; +use futures::channel::{mpsc, oneshot}; +use futures::{SinkExt, StreamExt}; +use wasm_bindgen::JsCast; +use wasm_bindgen_futures::JsFuture; +use web_sys::{ + File, FileSystemCreateWritableOptions, FileSystemGetFileOptions, FileSystemWritableFileStream, +}; + +use crate::raw::{oio, OpWrite}; +use crate::*; + +use super::core::OpfsCore; +use super::error::parse_js_error; + +enum WriterRequest { + Write { + buf: Bytes, + tx: oneshot::Sender>, + }, + Close { + tx: oneshot::Sender>, + }, +} + +pub struct OpfsWriter { + tx: mpsc::UnboundedSender, + pos: u64, +} + +impl OpfsWriter { + pub async fn new(core: Arc, path: &str, op: OpWrite) -> Result { + let (tx, rx) = mpsc::unbounded(); + let mut pos = 0; + + core.ensure_write_path(path).await?; + + let opt = FileSystemGetFileOptions::new(); + opt.set_create(true); + + let file_handle = core.file_handle_with_option(path, &opt).await?; + + let opt = FileSystemCreateWritableOptions::new(); + opt.set_keep_existing_data(op.append()); + + let write_stream: FileSystemWritableFileStream = + JsFuture::from(file_handle.create_writable_with_options(&opt)) + .await + .and_then(JsCast::dyn_into) + .map_err(parse_js_error)?; + + if op.append() { + let file: File = JsFuture::from(file_handle.get_file()) + .await + .and_then(JsCast::dyn_into) + .map_err(parse_js_error)?; + + JsFuture::from( + write_stream + .seek_with_f64(file.size()) + .map_err(parse_js_error)?, + ) + .await + .map_err(parse_js_error)?; + + pos += file.size() as u64; + } + + wasm_bindgen_futures::spawn_local(async move { + Self::run(write_stream, rx).await; + }); + + Ok(Self { tx, pos }) + } + + async fn run( + write_stream: FileSystemWritableFileStream, + mut rx: mpsc::UnboundedReceiver, + ) { + loop { + let Some(req) = rx.next().await else { + // OpfsWriter is dropped, exit worker task + break; + }; + + match req { + WriterRequest::Write { buf, tx } => { + let result = match write_stream.write_with_u8_array(&buf).map(JsFuture::from) { + Ok(fut) => match fut.await { + Ok(_) => Ok(()), + Err(err) => Err(parse_js_error(err)), + }, + Err(err) => Err(parse_js_error(err)), + }; + + let _ = tx.send(result); + } + WriterRequest::Close { tx } => { + let _ = match JsFuture::from(write_stream.close()).await { + Ok(_) => tx.send(Ok(())), + Err(err) => tx.send(Err(parse_js_error(err))), + }; + return; + } + }; + } + } +} + +impl oio::Write for OpfsWriter { + async fn write(&mut self, mut bs: Buffer) -> Result<()> { + while bs.has_remaining() { + let buf = bs.current(); + let n = buf.len(); + + let (tx, rx) = oneshot::channel(); + let _ = self.tx.send(WriterRequest::Write { buf, tx }).await; + + rx.await.unwrap()?; + self.pos += n as u64; + bs.advance(n); + } + + Ok(()) + } + + async fn close(&mut self) -> Result { + let (tx, rx) = oneshot::channel(); + let _ = self.tx.send(WriterRequest::Close { tx }).await; + + rx.await.unwrap()?; + + Ok(Metadata::default().with_content_length(self.pos)) + } + + async fn abort(&mut self) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "OPFS doesn't support abort", + )) + } +} diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 6d9c06b56c37..69f9b3b117c2 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -161,6 +161,8 @@ pub enum Scheme { Lakefs, /// [NebulaGraph](crate::services::NebulaGraph): NebulaGraph Services NebulaGraph, + /// [OPFS](crate::services::Opfs): OPFS Services + Opfs, /// Custom that allow users to implement services outside OpenDAL. /// /// # NOTE @@ -303,6 +305,8 @@ impl Scheme { Scheme::Surrealdb, #[cfg(feature = "services-lakefs")] Scheme::Lakefs, + #[cfg(feature = "services-opfs")] + Scheme::Opfs, ]) } } @@ -389,6 +393,7 @@ impl FromStr for Scheme { "hdfs_native" => Ok(Scheme::HdfsNative), "surrealdb" => Ok(Scheme::Surrealdb), "lakefs" => Ok(Scheme::Lakefs), + "opfs" => Ok(Scheme::Opfs), _ => Ok(Scheme::Custom(Box::leak(s.into_boxed_str()))), } } @@ -461,6 +466,7 @@ impl From for &'static str { Scheme::Surrealdb => "surrealdb", Scheme::Lakefs => "lakefs", Scheme::NebulaGraph => "nebula_graph", + Scheme::Opfs => "opfs", Scheme::Custom(v) => v, } } From e62bba2805dd6cdbb0ba40bedf50204e5353eb3a Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Sat, 6 Sep 2025 18:15:04 +0800 Subject: [PATCH 2/8] test: add tests for opfs --- .github/workflows/test_behavior.yml | 14 + .github/workflows/test_behavior_opfs.yml | 65 +++ core/edge/opfs_on_wasm/Cargo.toml | 41 ++ .../edge/opfs_on_wasm/src/async_create_dir.rs | 40 ++ core/edge/opfs_on_wasm/src/async_delete.rs | 196 ++++++++ core/edge/opfs_on_wasm/src/async_list.rs | 351 ++++++++++++++ core/edge/opfs_on_wasm/src/async_read.rs | 198 ++++++++ core/edge/opfs_on_wasm/src/async_write.rs | 449 ++++++++++++++++++ core/edge/opfs_on_wasm/src/lib.rs | 33 ++ core/edge/opfs_on_wasm/src/utils.rs | 157 ++++++ core/edge/opfs_on_wasm/webdriver.json | 15 + core/src/services/opfs/backend.rs | 2 +- core/src/services/opfs/mod.rs | 18 +- 13 files changed, 1569 insertions(+), 10 deletions(-) create mode 100644 .github/workflows/test_behavior_opfs.yml create mode 100644 core/edge/opfs_on_wasm/Cargo.toml create mode 100644 core/edge/opfs_on_wasm/src/async_create_dir.rs create mode 100644 core/edge/opfs_on_wasm/src/async_delete.rs create mode 100644 core/edge/opfs_on_wasm/src/async_list.rs create mode 100644 core/edge/opfs_on_wasm/src/async_read.rs create mode 100644 core/edge/opfs_on_wasm/src/async_write.rs create mode 100644 core/edge/opfs_on_wasm/src/lib.rs create mode 100644 core/edge/opfs_on_wasm/src/utils.rs create mode 100644 core/edge/opfs_on_wasm/webdriver.json diff --git a/.github/workflows/test_behavior.yml b/.github/workflows/test_behavior.yml index 4839dccb9cb5..4fb11d1f022c 100644 --- a/.github/workflows/test_behavior.yml +++ b/.github/workflows/test_behavior.yml @@ -202,3 +202,17 @@ jobs: with: os: ${{ matrix.os }} cases: ${{ toJson(matrix.cases) }} + + test_opfs: + name: core_opfs / ${{ matrix.os }} + needs: [plan] + if: fromJson(needs.plan.outputs.plan).components.core + secrets: inherit + strategy: + fail-fast: false + matrix: + include: ${{ fromJson(needs.plan.outputs.plan).core }} + uses: ./.github/workflows/test_behavior_opfs.yml + with: + os: ${{ matrix.os }} + cases: ${{ toJson(matrix.cases) }} diff --git a/.github/workflows/test_behavior_opfs.yml b/.github/workflows/test_behavior_opfs.yml new file mode 100644 index 000000000000..de46cefca4f5 --- /dev/null +++ b/.github/workflows/test_behavior_opfs.yml @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: OPFS Test + +on: + workflow_call: + inputs: + os: + required: true + type: string + cases: + required: true + type: string + +jobs: + test_opfs_on_wasm: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + - name: Setup Rust toolchain + uses: ./.github/actions/setup + + - name: Setup for wasm32 + run: | + rustup target add wasm32-unknown-unknown + + - name: Install Chrome Environment + run: | + mkdir -p /tmp/chrome + wget $(curl https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json | jq -r '.versions | sort_by(.version) | reverse | .[0] | .downloads.chrome | .[] | select(.platform == "linux64") | .url') + wget $(curl https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json | jq -r '.versions | sort_by(.version) | reverse | .[0] | .downloads.chromedriver | .[] | select(.platform == "linux64") | .url') + unzip chromedriver-linux64.zip + unzip chrome-linux64.zip + cp -r chrome-linux64/ /tmp/chrome/ + cp -r chromedriver-linux64 /tmp/chrome/chromedriver + + - name: Setup wasm-pack + uses: taiki-e/install-action@v2 + with: + tool: wasm-pack + + - name: Test wasm + working-directory: core/edge/opfs_on_wasm + env: + OPENDAL_TEST: opfs + run: | + export PATH=$PATH:/tmp/chrome/chrome-linux64/:/tmp/chrome/chromedriver-linux64/ + wasm-pack test --chrome --headless diff --git a/core/edge/opfs_on_wasm/Cargo.toml b/core/edge/opfs_on_wasm/Cargo.toml new file mode 100644 index 000000000000..92d77d8a4c87 --- /dev/null +++ b/core/edge/opfs_on_wasm/Cargo.toml @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "edge_test_opfs_on_wasm" + +edition.workspace = true +license.workspace = true +publish = false +rust-version.workspace = true +version.workspace = true + +[lib] +crate-type = ["cdylib"] + +[dependencies] +anyhow = { version = "1.0.30", features = ["std"] } +bytes = "1.6" +futures = "0.3" +opendal = { path = "../..", default-features = false, features = [ + "services-opfs", +] } +rand = { version = "0.8" } +sha2 = "0.10" +uuid = { version = "1", features = ["serde", "v4"] } + +wasm-bindgen-test = "0.3.41" diff --git a/core/edge/opfs_on_wasm/src/async_create_dir.rs b/core/edge/opfs_on_wasm/src/async_create_dir.rs new file mode 100644 index 000000000000..ccca3f5bca1b --- /dev/null +++ b/core/edge/opfs_on_wasm/src/async_create_dir.rs @@ -0,0 +1,40 @@ +#[cfg(test)] +mod tests { + + use anyhow::Result; + use opendal::*; + use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; + + wasm_bindgen_test_configure!(run_in_browser); + + use crate::*; + + #[wasm_bindgen_test] + /// Create dir with dir path should succeed. + pub async fn test_create_dir() -> Result<()> { + let op = operator(); + let path = TEST_FIXTURE.new_dir_path(); + + op.create_dir(&path).await?; + + let meta = op.stat(&path).await?; + assert_eq!(meta.mode(), EntryMode::DIR); + Ok(()) + } + + /// Create dir on existing dir should succeed. + #[wasm_bindgen_test] + pub async fn test_create_dir_existing() -> Result<()> { + let op = operator(); + let path = TEST_FIXTURE.new_dir_path(); + + op.create_dir(&path).await?; + + op.create_dir(&path).await?; + + let meta = op.stat(&path).await?; + assert_eq!(meta.mode(), EntryMode::DIR); + + Ok(()) + } +} diff --git a/core/edge/opfs_on_wasm/src/async_delete.rs b/core/edge/opfs_on_wasm/src/async_delete.rs new file mode 100644 index 000000000000..7a6895034d2b --- /dev/null +++ b/core/edge/opfs_on_wasm/src/async_delete.rs @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[cfg(test)] +mod tests { + + use anyhow::Result; + use opendal::raw::Access; + use opendal::*; + use wasm_bindgen_test::wasm_bindgen_test; + use wasm_bindgen_test::wasm_bindgen_test_configure; + + use crate::*; + + wasm_bindgen_test_configure!(run_in_browser); + + /// Delete existing file should succeed. + #[wasm_bindgen_test] + pub async fn test_delete_file() -> Result<()> { + let op = operator(); + let (path, content, _) = TEST_FIXTURE.new_file(op.clone()); + + op.write(&path, content).await.expect("write must succeed"); + + op.delete(&path).await?; + + // Stat it again to check. + assert!(!op.exists(&path).await?); + + Ok(()) + } + + /// Delete empty dir should succeed. + #[wasm_bindgen_test] + pub async fn test_delete_empty_dir() -> Result<()> { + let op = operator(); + if !op.info().full_capability().create_dir { + return Ok(()); + } + + let path = TEST_FIXTURE.new_dir_path(); + + op.create_dir(&path).await.expect("create must succeed"); + + op.delete(&path).await?; + + Ok(()) + } + + /// Delete file with special chars should succeed. + #[wasm_bindgen_test] + pub async fn test_delete_with_special_chars() -> Result<()> { + let op = operator(); + // Ignore test for atomicserver until https://github.com/atomicdata-dev/atomic-server/issues/663 addressed. + if op.info().scheme() == opendal::Scheme::Atomicserver { + return Ok(()); + } + + let path = format!("{} !@#$%^&()_+-=;',.txt", uuid::Uuid::new_v4()); + let (path, content, _) = TEST_FIXTURE.new_file_with_path(op.clone(), &path); + + op.write(&path, content).await.expect("write must succeed"); + + op.delete(&path).await?; + + // Stat it again to check. + assert!(!op.exists(&path).await?); + + Ok(()) + } + + /// Delete not existing file should also succeed. + #[wasm_bindgen_test] + pub async fn test_delete_not_existing() -> Result<()> { + let op = operator(); + let path = uuid::Uuid::new_v4().to_string(); + + op.delete(&path).await?; + + Ok(()) + } + + /// Remove one file + #[wasm_bindgen_test] + pub async fn test_remove_one_file() -> Result<()> { + let op = operator(); + let (path, content, _) = TEST_FIXTURE.new_file(op.clone()); + + op.write(&path, content.clone()) + .await + .expect("write must succeed"); + + op.delete_iter(vec![path.clone()]).await?; + + // Stat it again to check. + assert!(!op.exists(&path).await?); + + op.write(&format!("/{path}"), content) + .await + .expect("write must succeed"); + + op.delete_iter(vec![path.clone()]).await?; + + // Stat it again to check. + assert!(!op.exists(&path).await?); + + Ok(()) + } + + /// Delete via stream. + #[wasm_bindgen_test] + pub async fn test_delete_stream() -> Result<()> { + let op = operator(); + if !op.info().full_capability().create_dir { + return Ok(()); + } + // Gdrive think that this test is an abuse of their service and redirect us + // to an infinite loop. Let's ignore this test for gdrive. + if op.info().scheme() == Scheme::Gdrive { + return Ok(()); + } + + let dir = uuid::Uuid::new_v4().to_string(); + op.create_dir(&format!("{dir}/")) + .await + .expect("create must succeed"); + + let expected: Vec<_> = (0..100).collect(); + for path in expected.iter() { + op.write(&format!("{dir}/{path}"), "delete_stream").await?; + } + + let mut deleter = op.deleter().await?; + deleter + .delete_iter(expected.iter().map(|v| format!("{dir}/{v}"))) + .await?; + deleter.close().await?; + + // Stat it again to check. + for path in expected.iter() { + assert!( + !op.exists(&format!("{dir}/{path}")).await?, + "{path} should be removed" + ) + } + + Ok(()) + } + + #[wasm_bindgen_test] + pub async fn test_batch_delete() -> Result<()> { + let op = operator(); + let mut cap = op.info().full_capability(); + if cap.delete_max_size.unwrap_or(1) <= 1 { + return Ok(()); + } + + cap.delete_max_size = Some(2); + op.inner().info().update_full_capability(|_| cap); + + let mut files = Vec::new(); + for _ in 0..5 { + let (path, content, _) = TEST_FIXTURE.new_file(op.clone()); + op.write(path.as_str(), content) + .await + .expect("write must succeed"); + files.push(path); + } + + op.delete_iter(files.clone()) + .await + .expect("batch delete must succeed"); + + for path in files { + let stat = op.stat(path.as_str()).await; + assert!(stat.is_err()); + assert_eq!(stat.unwrap_err().kind(), ErrorKind::NotFound); + } + + Ok(()) + } +} diff --git a/core/edge/opfs_on_wasm/src/async_list.rs b/core/edge/opfs_on_wasm/src/async_list.rs new file mode 100644 index 000000000000..99942c0c8935 --- /dev/null +++ b/core/edge/opfs_on_wasm/src/async_list.rs @@ -0,0 +1,351 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[cfg(test)] +mod tests { + + use std::collections::HashMap; + + use anyhow::Result; + use futures::TryStreamExt; + use opendal::*; + use wasm_bindgen_test::console_log; + use wasm_bindgen_test::wasm_bindgen_test; + use wasm_bindgen_test::wasm_bindgen_test_configure; + + use crate::*; + + wasm_bindgen_test_configure!(run_in_browser); + + /// Check should be OK. + #[wasm_bindgen_test] + pub async fn test_check() -> Result<()> { + let op = operator(); + op.check().await.expect("operator check is ok"); + + Ok(()) + } + + /// List dir should return newly created file. + #[wasm_bindgen_test] + pub async fn test_list_dir() -> Result<()> { + let op = operator(); + let parent = uuid::Uuid::new_v4().to_string(); + let path = format!("{parent}/{}", uuid::Uuid::new_v4()); + console_log!("Generate a random file: {}", &path); + let (content, size) = gen_bytes(op.info().full_capability()); + + op.write(&path, content).await.expect("write must succeed"); + + let mut obs = op.lister(&format!("{parent}/")).await?; + let mut found = false; + while let Some(de) = obs.try_next().await? { + let meta = op.stat(de.path()).await?; + if de.path() == path { + assert_eq!(meta.mode(), EntryMode::FILE); + + assert_eq!(meta.content_length(), size as u64); + + found = true + } + } + assert!(found, "file should be found in list"); + + op.delete(&path).await.expect("delete must succeed"); + Ok(()) + } + + /// List prefix should return newly created file. + #[wasm_bindgen_test] + pub async fn test_list_prefix() -> Result<()> { + let op = operator(); + let path = uuid::Uuid::new_v4().to_string(); + console_log!("Generate a random file: {}", &path); + let (content, _) = gen_bytes(op.info().full_capability()); + + op.write(&path, content).await.expect("write must succeed"); + + let obs = op.list(&path).await?; + assert_eq!(obs.len(), 1); + assert_eq!(obs[0].path(), path); + assert_eq!(obs[0].metadata().mode(), EntryMode::FILE); + + op.delete(&path).await.expect("delete must succeed"); + Ok(()) + } + + /// listing a directory, which contains more objects than a single page can take. + #[wasm_bindgen_test] + pub async fn test_list_rich_dir() -> Result<()> { + let op = operator(); + // Gdrive think that this test is an abuse of their service and redirect us + // to an infinite loop. Let's ignore this test for gdrive. + if op.info().scheme() == Scheme::Gdrive { + return Ok(()); + } + + let parent = "test_list_rich_dir/"; + op.create_dir(parent).await?; + + let mut expected: Vec = (0..=10).map(|num| format!("{parent}file-{num}")).collect(); + for path in expected.iter() { + op.write(path, "test_list_rich_dir").await?; + } + expected.push(parent.to_string()); + + let mut objects = op.lister_with(parent).limit(5).await?; + let mut actual = vec![]; + while let Some(o) = objects.try_next().await? { + let path = o.path().to_string(); + actual.push(path) + } + expected.sort_unstable(); + actual.sort_unstable(); + + assert_eq!(actual, expected); + + op.remove_all(parent).await?; + Ok(()) + } + + /// List empty dir should return itself. + #[wasm_bindgen_test] + pub async fn test_list_empty_dir() -> Result<()> { + let op = operator(); + let dir = format!("{}/", uuid::Uuid::new_v4()); + + op.create_dir(&dir).await.expect("write must succeed"); + + // List "dir/" should return "dir/". + let mut obs = op.lister(&dir).await?; + let mut objects = HashMap::new(); + while let Some(de) = obs.try_next().await? { + objects.insert(de.path().to_string(), de); + } + assert_eq!( + objects.len(), + 1, + "only return the dir itself, but found: {objects:?}" + ); + assert_eq!( + objects[&dir].metadata().mode(), + EntryMode::DIR, + "given dir should exist and must be dir, but found: {objects:?}" + ); + + // List "dir" should return "dir/". + let mut obs = op.lister(dir.trim_end_matches('/')).await?; + let mut objects = HashMap::new(); + while let Some(de) = obs.try_next().await? { + objects.insert(de.path().to_string(), de); + } + assert_eq!( + objects.len(), + 1, + "only return the dir itself, but found: {objects:?}" + ); + assert_eq!( + objects[&dir].metadata().mode(), + EntryMode::DIR, + "given dir should exist and must be dir, but found: {objects:?}" + ); + + // List "dir/" recursively should return "dir/". + let mut obs = op.lister_with(&dir).recursive(true).await?; + let mut objects = HashMap::new(); + while let Some(de) = obs.try_next().await? { + objects.insert(de.path().to_string(), de); + } + assert_eq!( + objects.len(), + 1, + "only return the dir itself, but found: {objects:?}" + ); + assert_eq!( + objects[&dir].metadata().mode(), + EntryMode::DIR, + "given dir should exist and must be dir, but found: {objects:?}" + ); + + // List "dir" recursively should return "dir/". + let mut obs = op + .lister_with(dir.trim_end_matches('/')) + .recursive(true) + .await?; + let mut objects = HashMap::new(); + while let Some(de) = obs.try_next().await? { + objects.insert(de.path().to_string(), de); + } + assert_eq!(objects.len(), 1, "only return the dir itself"); + assert_eq!( + objects[&dir].metadata().mode(), + EntryMode::DIR, + "given dir should exist and must be dir" + ); + + op.delete(&dir).await.expect("delete must succeed"); + Ok(()) + } + + /// List non exist dir should return nothing. + #[wasm_bindgen_test] + pub async fn test_list_non_exist_dir() -> Result<()> { + let op = operator(); + let dir = format!("{}/", uuid::Uuid::new_v4()); + + let mut obs = op.lister(&dir).await?; + let mut objects = HashMap::new(); + while let Some(de) = obs.try_next().await? { + objects.insert(de.path().to_string(), de); + } + console_log!("got objects: {objects:?}"); + + assert_eq!(objects.len(), 0, "dir should only return empty"); + Ok(()) + } + + /// List dir should return correct sub dir. + #[wasm_bindgen_test] + pub async fn test_list_sub_dir() -> Result<()> { + let op = operator(); + let path = format!("{}/", uuid::Uuid::new_v4()); + + op.create_dir(&path).await.expect("create must succeed"); + + let mut obs = op.lister("/").await?; + let mut found = false; + let mut entries = vec![]; + while let Some(de) = obs.try_next().await? { + if de.path() == path { + let meta = op.stat(&path).await?; + assert_eq!(meta.mode(), EntryMode::DIR); + assert_eq!(de.name(), path); + + found = true + } + entries.push(de) + } + assert!( + found, + "dir should be found in list, but only got: {entries:?}" + ); + + op.delete(&path).await.expect("delete must succeed"); + Ok(()) + } + + /// List dir should also to list nested dir. + #[wasm_bindgen_test] + pub async fn test_list_nested_dir() -> Result<()> { + let op = operator(); + let parent = format!("{}/", uuid::Uuid::new_v4()); + op.create_dir(&parent) + .await + .expect("create dir must succeed"); + + let dir = format!("{parent}{}/", uuid::Uuid::new_v4()); + op.create_dir(&dir).await.expect("create must succeed"); + + let file_name = uuid::Uuid::new_v4().to_string(); + let file_path = format!("{dir}{file_name}"); + op.write(&file_path, "test_list_nested_dir") + .await + .expect("create must succeed"); + + let dir_name = format!("{}/", uuid::Uuid::new_v4()); + let dir_path = format!("{dir}{dir_name}"); + op.create_dir(&dir_path).await.expect("create must succeed"); + + let obs = op.list(&parent).await?; + assert_eq!(obs.len(), 2, "parent should got 2 entry"); + let objects: HashMap<&str, &Entry> = obs.iter().map(|e| (e.path(), e)).collect(); + assert_eq!( + objects + .get(parent.as_str()) + .expect("parent should be found in list") + .metadata() + .mode(), + EntryMode::DIR + ); + assert_eq!( + objects + .get(dir.as_str()) + .expect("dir should be found in list") + .metadata() + .mode(), + EntryMode::DIR + ); + + let mut obs = op.lister(&dir).await?; + let mut objects = HashMap::new(); + + while let Some(de) = obs.try_next().await? { + objects.insert(de.path().to_string(), de); + } + console_log!("got objects: {objects:?}"); + + assert_eq!(objects.len(), 3, "dir should only got 3 objects"); + + // Check file + let meta = op + .stat( + objects + .get(&file_path) + .expect("file should be found in list") + .path(), + ) + .await?; + assert_eq!(meta.mode(), EntryMode::FILE); + assert_eq!(meta.content_length(), 20); + + // Check dir + let meta = op + .stat( + objects + .get(&dir_path) + .expect("file should be found in list") + .path(), + ) + .await?; + assert_eq!(meta.mode(), EntryMode::DIR); + + op.delete(&file_path).await.expect("delete must succeed"); + op.delete(&dir_path).await.expect("delete must succeed"); + op.delete(&dir).await.expect("delete must succeed"); + Ok(()) + } + + /// List with path file should auto add / suffix. + #[wasm_bindgen_test] + pub async fn test_list_dir_with_file_path() -> Result<()> { + let op = operator(); + let parent = uuid::Uuid::new_v4().to_string(); + let file = format!("{parent}/{}", uuid::Uuid::new_v4()); + + let (content, _) = gen_bytes(op.info().full_capability()); + op.write(&file, content).await?; + + let obs = op.list(&parent).await?; + assert_eq!(obs.len(), 1); + assert_eq!(obs[0].path(), format!("{parent}/")); + assert_eq!(obs[0].metadata().mode(), EntryMode::DIR); + + op.delete(&file).await?; + + Ok(()) + } +} diff --git a/core/edge/opfs_on_wasm/src/async_read.rs b/core/edge/opfs_on_wasm/src/async_read.rs new file mode 100644 index 000000000000..5ea265508e72 --- /dev/null +++ b/core/edge/opfs_on_wasm/src/async_read.rs @@ -0,0 +1,198 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[cfg(test)] +mod tests { + + use futures::AsyncReadExt; + use futures::TryStreamExt; + use opendal::*; + use sha2::Digest; + use sha2::Sha256; + use wasm_bindgen_test::console_log; + use wasm_bindgen_test::wasm_bindgen_test; + use wasm_bindgen_test::wasm_bindgen_test_configure; + + use crate::*; + + wasm_bindgen_test_configure!(run_in_browser); + + /// Read full content should match. + #[wasm_bindgen_test] + pub async fn test_read_full() -> anyhow::Result<()> { + let op = operator(); + let (path, content, size) = TEST_FIXTURE.new_file(op.clone()); + + op.write(&path, content.clone()) + .await + .expect("write must succeed"); + + let bs = op.read(&path).await?.to_bytes(); + assert_eq!(size, bs.len(), "read size"); + assert_eq!( + format!("{:x}", Sha256::digest(&bs)), + format!("{:x}", Sha256::digest(&content)), + "read content" + ); + + Ok(()) + } + + /// Read range content should match. + #[wasm_bindgen_test] + pub async fn test_read_range() -> anyhow::Result<()> { + let op = operator(); + let (path, content, size) = TEST_FIXTURE.new_file(op.clone()); + let (offset, length) = gen_offset_length(size); + + op.write(&path, content.clone()) + .await + .expect("write must succeed"); + + let bs = op + .read_with(&path) + .range(offset..offset + length) + .await? + .to_bytes(); + assert_eq!(bs.len() as u64, length, "read size"); + assert_eq!( + format!("{:x}", Sha256::digest(&bs)), + format!( + "{:x}", + Sha256::digest(&content[offset as usize..(offset + length) as usize]) + ), + "read content" + ); + + Ok(()) + } + + /// Read full content should match. + #[wasm_bindgen_test] + pub async fn test_reader() -> anyhow::Result<()> { + let op = operator(); + let (path, content, size) = TEST_FIXTURE.new_file(op.clone()); + + op.write(&path, content.clone()) + .await + .expect("write must succeed"); + + // Reader. + let bs = op.reader(&path).await?.read(..).await?.to_bytes(); + assert_eq!(size, bs.len(), "read size"); + assert_eq!( + format!("{:x}", Sha256::digest(&bs)), + format!("{:x}", Sha256::digest(&content)), + "read content" + ); + + // Bytes Stream + let bs = op + .reader(&path) + .await? + .into_bytes_stream(..) + .await? + .try_fold(Vec::new(), |mut acc, chunk| { + acc.extend_from_slice(&chunk); + async { Ok(acc) } + }) + .await?; + assert_eq!(size, bs.len(), "read size"); + assert_eq!( + format!("{:x}", Sha256::digest(&bs)), + format!("{:x}", Sha256::digest(&content)), + "read content" + ); + + // Futures Reader + let mut futures_reader = op + .reader(&path) + .await? + .into_futures_async_read(0..size as u64) + .await?; + let mut bs = Vec::new(); + futures_reader.read_to_end(&mut bs).await?; + assert_eq!(size, bs.len(), "read size"); + assert_eq!( + format!("{:x}", Sha256::digest(&bs)), + format!("{:x}", Sha256::digest(&content)), + "read content" + ); + + Ok(()) + } + + /// Read not exist file should return NotFound + #[wasm_bindgen_test] + pub async fn test_read_not_exist() -> anyhow::Result<()> { + let op = operator(); + let path = uuid::Uuid::new_v4().to_string(); + + let bs = op.read(&path).await; + assert!(bs.is_err()); + assert_eq!(bs.unwrap_err().kind(), ErrorKind::NotFound); + + Ok(()) + } + + /// Read with dir path should return an error. + #[wasm_bindgen_test] + pub async fn test_read_with_dir_path() -> anyhow::Result<()> { + let op = operator(); + if !op.info().full_capability().create_dir { + return Ok(()); + } + + let path = TEST_FIXTURE.new_dir_path(); + + op.create_dir(&path).await.expect("write must succeed"); + + let result = op.read(&path).await; + assert!(result.is_err()); + assert_eq!(result.unwrap_err().kind(), ErrorKind::IsADirectory); + + Ok(()) + } + + /// Read file with special chars should succeed. + #[wasm_bindgen_test] + pub async fn test_read_with_special_chars() -> anyhow::Result<()> { + let op = operator(); + // Ignore test for atomicserver until https://github.com/atomicdata-dev/atomic-server/issues/663 addressed. + if op.info().scheme() == opendal::Scheme::Atomicserver { + console_log!("ignore test for atomicserver until https://github.com/atomicdata-dev/atomic-server/issues/663 is resolved"); + return Ok(()); + } + + let path = format!("{} !@#$%^&()_+-=;',.txt", uuid::Uuid::new_v4()); + let (path, content, size) = TEST_FIXTURE.new_file_with_path(op.clone(), &path); + + op.write(&path, content.clone()) + .await + .expect("write must succeed"); + + let bs = op.read(&path).await?.to_bytes(); + assert_eq!(size, bs.len(), "read size"); + assert_eq!( + format!("{:x}", Sha256::digest(&bs)), + format!("{:x}", Sha256::digest(&content)), + "read content" + ); + + Ok(()) + } +} diff --git a/core/edge/opfs_on_wasm/src/async_write.rs b/core/edge/opfs_on_wasm/src/async_write.rs new file mode 100644 index 000000000000..eec47e40a995 --- /dev/null +++ b/core/edge/opfs_on_wasm/src/async_write.rs @@ -0,0 +1,449 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod tests { + + use anyhow::Result; + use bytes::Bytes; + use futures::io::BufReader; + use futures::io::Cursor; + use futures::stream; + use futures::AsyncWriteExt; + use futures::SinkExt; + use futures::StreamExt; + use opendal::*; + use sha2::Digest; + use sha2::Sha256; + use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; + + use crate::*; + + wasm_bindgen_test_configure!(run_in_browser); + + /// Write a single file and test with stat. + #[wasm_bindgen_test] + pub async fn test_write_only() -> Result<()> { + let op = operator(); + let (path, content, size) = TEST_FIXTURE.new_file(op.clone()); + + op.write(&path, content).await?; + + let meta = op.stat(&path).await.expect("stat must succeed"); + assert_eq!(meta.content_length(), size as u64); + + Ok(()) + } + + /// Write a file with empty content. + #[wasm_bindgen_test] + pub async fn test_write_with_empty_content() -> Result<()> { + let op = operator(); + if !op.info().full_capability().write_can_empty { + return Ok(()); + } + + let path = TEST_FIXTURE.new_file_path(); + + let bs: Vec = vec![]; + op.write(&path, bs).await?; + + let meta = op.stat(&path).await.expect("stat must succeed"); + assert_eq!(meta.content_length(), 0); + Ok(()) + } + + /// Write file with dir path should return an error + #[wasm_bindgen_test] + pub async fn test_write_with_dir_path() -> Result<()> { + let op = operator(); + let path = TEST_FIXTURE.new_dir_path(); + + let result = op.write(&path, vec![1]).await; + assert!(result.is_err()); + assert_eq!(result.unwrap_err().kind(), ErrorKind::IsADirectory); + + Ok(()) + } + + /// Write a single file with special chars should succeed. + #[wasm_bindgen_test] + pub async fn test_write_with_special_chars() -> Result<()> { + let op = operator(); + // Ignore test for atomicserver until https://github.com/atomicdata-dev/atomic-server/issues/663 addressed. + if op.info().scheme() == opendal::Scheme::Atomicserver { + return Ok(()); + } + // Ignore test for vercel blob https://github.com/apache/opendal/pull/4103. + if op.info().scheme() == opendal::Scheme::VercelBlob { + return Ok(()); + } + + let path = format!("{} !@#$%^&()_+-=;',.txt", uuid::Uuid::new_v4()); + let (path, content, size) = TEST_FIXTURE.new_file_with_path(op.clone(), &path); + + op.write(&path, content).await?; + + let meta = op.stat(&path).await.expect("stat must succeed"); + assert_eq!(meta.content_length(), size as u64); + + Ok(()) + } + + #[wasm_bindgen_test] + pub async fn test_write_returns_metadata() -> Result<()> { + let op = operator(); + let (path, content, _) = TEST_FIXTURE.new_file(op.clone()); + + let meta = op.write(&path, content).await?; + let stat_meta = op.stat(&path).await?; + + assert_metadata(stat_meta, meta); + + Ok(()) + } + + /// Delete existing file should succeed. + #[wasm_bindgen_test] + pub async fn test_writer_abort() -> Result<()> { + let op = operator(); + let (path, content, _) = TEST_FIXTURE.new_file(op.clone()); + + let mut writer = match op.writer(&path).await { + Ok(writer) => writer, + Err(e) => { + assert_eq!(e.kind(), ErrorKind::Unsupported); + return Ok(()); + } + }; + + if let Err(e) = writer.write(content).await { + assert_eq!(e.kind(), ErrorKind::Unsupported); + return Ok(()); + } + + if let Err(e) = writer.abort().await { + assert_eq!(e.kind(), ErrorKind::Unsupported); + return Ok(()); + } + + // Aborted writer should not write actual file. + assert!(!op.exists(&path).await?); + Ok(()) + } + + /// Append data into writer + #[wasm_bindgen_test] + pub async fn test_writer_write() -> Result<()> { + let op = operator(); + if !(op.info().full_capability().write_can_multi) { + return Ok(()); + } + + let path = TEST_FIXTURE.new_file_path(); + let size = 5 * 1024 * 1024; // write file with 5 MiB + let content_a = gen_fixed_bytes(size); + let content_b = gen_fixed_bytes(size); + + let mut w = op.writer(&path).await?; + w.write(content_a.clone()).await?; + w.write(content_b.clone()).await?; + w.close().await?; + + let meta = op.stat(&path).await.expect("stat must succeed"); + assert_eq!(meta.content_length(), (size * 2) as u64); + + let bs = op.read(&path).await?.to_bytes(); + assert_eq!(bs.len(), size * 2, "read size"); + assert_eq!( + format!("{:x}", Sha256::digest(&bs[..size])), + format!("{:x}", Sha256::digest(content_a)), + "read content a" + ); + assert_eq!( + format!("{:x}", Sha256::digest(&bs[size..])), + format!("{:x}", Sha256::digest(content_b)), + "read content b" + ); + + Ok(()) + } + + /// Streaming data into writer + #[wasm_bindgen_test] + pub async fn test_writer_sink() -> Result<()> { + let op = operator(); + let cap = op.info().full_capability(); + if !(cap.write && cap.write_can_multi) { + return Ok(()); + } + + let path = TEST_FIXTURE.new_file_path(); + let size = 5 * 1024 * 1024; // write file with 5 MiB + let content_a = gen_fixed_bytes(size); + let content_b = gen_fixed_bytes(size); + let mut stream = stream::iter(vec![ + Bytes::from(content_a.clone()), + Bytes::from(content_b.clone()), + ]) + .map(Ok); + + let mut w = op + .writer_with(&path) + .chunk(4 * 1024 * 1024) + .await? + .into_bytes_sink(); + w.send_all(&mut stream).await?; + w.close().await?; + + let meta = op.stat(&path).await.expect("stat must succeed"); + assert_eq!(meta.content_length(), (size * 2) as u64); + + let bs = op.read(&path).await?.to_bytes(); + assert_eq!(bs.len(), size * 2, "read size"); + assert_eq!( + format!("{:x}", Sha256::digest(&bs[..size])), + format!("{:x}", Sha256::digest(content_a)), + "read content a" + ); + assert_eq!( + format!("{:x}", Sha256::digest(&bs[size..])), + format!("{:x}", Sha256::digest(content_b)), + "read content b" + ); + + Ok(()) + } + + /// Copy data from reader to writer + #[wasm_bindgen_test] + pub async fn test_writer_futures_copy() -> Result<()> { + let op = operator(); + if !(op.info().full_capability().write_can_multi) { + return Ok(()); + } + + let path = TEST_FIXTURE.new_file_path(); + let (content, size): (Vec, usize) = + gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024); + + let mut w = op + .writer_with(&path) + .chunk(8 * 1024 * 1024) + .await? + .into_futures_async_write(); + + // Wrap a buf reader here to make sure content is read in 1MiB chunks. + let mut cursor = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone())); + futures::io::copy_buf(&mut cursor, &mut w).await?; + w.close().await?; + + let meta = op.stat(&path).await.expect("stat must succeed"); + assert_eq!(meta.content_length(), size as u64); + + let bs = op.read(&path).await?.to_bytes(); + assert_eq!(bs.len(), size, "read size"); + assert_eq!( + format!("{:x}", Sha256::digest(&bs[..size])), + format!("{:x}", Sha256::digest(content)), + "read content" + ); + + Ok(()) + } + + #[wasm_bindgen_test] + pub async fn test_writer_return_metadata() -> Result<()> { + let op = operator(); + let cap = op.info().full_capability(); + if !cap.write_can_multi { + return Ok(()); + } + + let path = TEST_FIXTURE.new_file_path(); + let size = 5 * 1024 * 1024; // write file with 5 MiB + let content_a = gen_fixed_bytes(size); + let content_b = gen_fixed_bytes(size); + + let mut w = op.writer(&path).await?; + w.write(content_a.clone()).await?; + w.write(content_b.clone()).await?; + let meta = w.close().await?; + + let stat_meta = op.stat(&path).await.expect("stat must succeed"); + + assert_metadata(stat_meta, meta); + + Ok(()) + } + + /// Test append to a file must success. + #[wasm_bindgen_test] + pub async fn test_write_with_append() -> Result<()> { + let op = operator(); + let path = TEST_FIXTURE.new_file_path(); + let (content_one, size_one) = gen_bytes(op.info().full_capability()); + let (content_two, size_two) = gen_bytes(op.info().full_capability()); + + op.write_with(&path, content_one.clone()) + .append(true) + .await + .expect("append file first time must success"); + + let meta = op.stat(&path).await?; + assert_eq!(meta.content_length(), size_one as u64); + + op.write_with(&path, content_two.clone()) + .append(true) + .await + .expect("append to an existing file must success"); + + let bs = op + .read(&path) + .await + .expect("read file must success") + .to_bytes(); + + assert_eq!(bs.len(), size_one + size_two); + assert_eq!(bs[..size_one], content_one); + assert_eq!(bs[size_one..], content_two); + + Ok(()) + } + + #[wasm_bindgen_test] + pub async fn test_write_with_append_returns_metadata() -> Result<()> { + let op = operator(); + let cap = op.info().full_capability(); + + let path = TEST_FIXTURE.new_file_path(); + let (content_one, _) = gen_bytes(cap); + let (content_two, _) = gen_bytes(cap); + + op.write_with(&path, content_one.clone()) + .append(true) + .await + .expect("append file first time must success"); + + let meta = op + .write_with(&path, content_two.clone()) + .append(true) + .await + .expect("append to an existing file must success"); + + let stat_meta = op.stat(&path).await.expect("stat must succeed"); + assert_metadata(stat_meta, meta); + + Ok(()) + } + + fn assert_metadata(stat_meta: Metadata, meta: Metadata) { + assert_eq!(stat_meta.content_length(), meta.content_length()); + if meta.etag().is_some() { + assert_eq!(stat_meta.etag(), meta.etag()); + } + if meta.last_modified().is_some() { + assert_eq!(stat_meta.last_modified(), meta.last_modified()); + } + if meta.version().is_some() { + assert_eq!(stat_meta.version(), meta.version()); + } + if meta.content_md5().is_some() { + assert_eq!(stat_meta.content_md5(), meta.content_md5()); + } + if meta.content_type().is_some() { + assert_eq!(stat_meta.content_type(), meta.content_type()); + } + if meta.content_encoding().is_some() { + assert_eq!(stat_meta.content_encoding(), meta.content_encoding()); + } + if meta.content_disposition().is_some() { + assert_eq!(stat_meta.content_disposition(), meta.content_disposition()); + } + } + + /// Copy data from reader to writer + #[wasm_bindgen_test] + pub async fn test_writer_with_append() -> Result<()> { + let op = operator(); + let path = uuid::Uuid::new_v4().to_string(); + let (content, size): (Vec, usize) = + gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024); + + let mut a = op + .writer_with(&path) + .append(true) + .await? + .into_futures_async_write(); + + // Wrap a buf reader here to make sure content is read in 1MiB chunks. + let mut cursor = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone())); + futures::io::copy_buf(&mut cursor, &mut a).await?; + a.close().await?; + + let meta = op.stat(&path).await.expect("stat must succeed"); + assert_eq!(meta.content_length(), size as u64); + + let bs = op.read(&path).await?.to_bytes(); + assert_eq!(bs.len(), size, "read size"); + assert_eq!( + format!("{:x}", Sha256::digest(&bs[..size])), + format!("{:x}", Sha256::digest(content)), + "read content" + ); + + op.delete(&path).await.expect("delete must succeed"); + Ok(()) + } + + #[wasm_bindgen_test] + pub async fn test_writer_write_with_overwrite() -> Result<()> { + let op = operator(); + // ghac does not support overwrite + if op.info().scheme() == Scheme::Ghac { + return Ok(()); + } + + let path = uuid::Uuid::new_v4().to_string(); + let (content_one, _) = gen_bytes(op.info().full_capability()); + let (content_two, _) = gen_bytes(op.info().full_capability()); + + op.write(&path, content_one.clone()).await?; + let bs = op.read(&path).await?.to_bytes(); + assert_eq!( + format!("{:x}", Sha256::digest(&bs)), + format!("{:x}", Sha256::digest(&content_one)), + "read content_one" + ); + op.write(&path, content_two.clone()) + .await + .expect("write overwrite must succeed"); + let bs = op.read(&path).await?.to_bytes(); + assert_ne!( + format!("{:x}", Sha256::digest(&bs)), + format!("{:x}", Sha256::digest(&content_one)), + "content_one must be overwrote" + ); + assert_eq!( + format!("{:x}", Sha256::digest(&bs)), + format!("{:x}", Sha256::digest(&content_two)), + "read content_two" + ); + + op.delete(&path).await.expect("delete must succeed"); + Ok(()) + } +} diff --git a/core/edge/opfs_on_wasm/src/lib.rs b/core/edge/opfs_on_wasm/src/lib.rs new file mode 100644 index 000000000000..a377f3f5fc82 --- /dev/null +++ b/core/edge/opfs_on_wasm/src/lib.rs @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod async_create_dir; +mod async_delete; +mod async_list; +mod async_read; +mod async_write; +mod utils; + +use opendal::{services::OpfsConfig, Configurator, Operator}; +pub use utils::*; + +pub static TEST_FIXTURE: Fixture = Fixture::new(); + +pub(crate) fn operator() -> Operator { + let builder = OpfsConfig::default().into_builder().root("/a/b/"); + Operator::new(builder).unwrap().finish() +} diff --git a/core/edge/opfs_on_wasm/src/utils.rs b/core/edge/opfs_on_wasm/src/utils.rs new file mode 100644 index 000000000000..a535110bf601 --- /dev/null +++ b/core/edge/opfs_on_wasm/src/utils.rs @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::mem; +use std::sync::Mutex; + +use opendal::*; +use rand::distributions::uniform::SampleRange; +use rand::prelude::*; + +pub fn gen_bytes_with_range(range: impl SampleRange) -> (Vec, usize) { + let mut rng = thread_rng(); + + let size = rng.gen_range(range); + let mut content = vec![0; size]; + rng.fill_bytes(&mut content); + + (content, size) +} + +pub fn gen_bytes(cap: Capability) -> (Vec, usize) { + let max_size = cap.write_total_max_size.unwrap_or(4 * 1024 * 1024); + gen_bytes_with_range(1..max_size) +} + +pub fn gen_fixed_bytes(size: usize) -> Vec { + let (content, _) = gen_bytes_with_range(size..=size); + + content +} + +pub fn gen_offset_length(size: usize) -> (u64, u64) { + let mut rng = thread_rng(); + + // Make sure at least one byte is read. + let offset = rng.gen_range(0..size - 1); + let length = rng.gen_range(1..(size - offset)); + + (offset as u64, length as u64) +} + +pub struct Fixture { + pub paths: Mutex>, +} + +impl Default for Fixture { + fn default() -> Self { + Self::new() + } +} + +impl Fixture { + /// Create a new fixture + pub const fn new() -> Self { + Self { + paths: Mutex::new(vec![]), + } + } + + /// Add a path. + pub fn add_path(&self, path: String) { + self.paths.lock().unwrap().push(path); + } + + /// Create a new dir path + pub fn new_dir_path(&self) -> String { + let path = format!("{}/", uuid::Uuid::new_v4()); + self.paths.lock().unwrap().push(path.clone()); + + path + } + + /// Create a new file path + pub fn new_file_path(&self) -> String { + let path = format!("{}", uuid::Uuid::new_v4()); + self.paths.lock().unwrap().push(path.clone()); + + path + } + + /// Create a new file with random content + pub fn new_file(&self, op: impl Into) -> (String, Vec, usize) { + let max_size = op + .into() + .info() + .full_capability() + .write_total_max_size + .unwrap_or(4 * 1024 * 1024); + + self.new_file_with_range(uuid::Uuid::new_v4().to_string(), 1..max_size) + } + + pub fn new_file_with_path( + &self, + op: impl Into, + path: &str, + ) -> (String, Vec, usize) { + let max_size = op + .into() + .info() + .full_capability() + .write_total_max_size + .unwrap_or(4 * 1024 * 1024); + + self.new_file_with_range(path, 1..max_size) + } + + /// Create a new file with random content in range. + fn new_file_with_range( + &self, + path: impl Into, + range: impl SampleRange, + ) -> (String, Vec, usize) { + let path = path.into(); + self.paths.lock().unwrap().push(path.clone()); + + let mut rng = thread_rng(); + + let size = rng.gen_range(range); + let mut content = vec![0; size]; + rng.fill_bytes(&mut content); + + (path, content, size) + } + + /// Perform cleanup + pub async fn cleanup(&self, op: impl Into) { + let op = op.into(); + // Don't cleanup data if delete is not supported + if !op.info().full_capability().delete { + return; + } + + let paths: Vec<_> = mem::take(self.paths.lock().unwrap().as_mut()); + // Don't call delete if paths is empty + if paths.is_empty() { + return; + } + + // We try our best to clean up fixtures, but won't panic if failed. + let _ = op.delete_iter(paths).await; + } +} diff --git a/core/edge/opfs_on_wasm/webdriver.json b/core/edge/opfs_on_wasm/webdriver.json new file mode 100644 index 000000000000..c2d6865e644e --- /dev/null +++ b/core/edge/opfs_on_wasm/webdriver.json @@ -0,0 +1,15 @@ +{ + "moz:firefoxOptions": { + "prefs": { + "media.navigator.streams.fake": true, + "media.navigator.permission.disabled": true + }, + "args": [] + }, + "goog:chromeOptions": { + "args": [ + "--use-fake-device-for-media-stream", + "--use-fake-ui-for-media-stream" + ] + } +} diff --git a/core/src/services/opfs/backend.rs b/core/src/services/opfs/backend.rs index cd8a4bb9f8f9..a029aaee1835 100644 --- a/core/src/services/opfs/backend.rs +++ b/core/src/services/opfs/backend.rs @@ -59,7 +59,7 @@ impl Access for OpfsBackend { write_can_empty: true, write_can_append: true, write_can_multi: true, - // write_with_if_not_exists: true, + create_dir: true, delete: true, diff --git a/core/src/services/opfs/mod.rs b/core/src/services/opfs/mod.rs index 15ade96a5bd1..0315f66e0d91 100644 --- a/core/src/services/opfs/mod.rs +++ b/core/src/services/opfs/mod.rs @@ -15,27 +15,27 @@ // specific language governing permissions and limitations // under the License. -#[cfg(feature = "services-opfs")] +#[cfg(all(feature = "services-opfs", target_arch = "wasm32"))] mod backend; -#[cfg(feature = "services-opfs")] +#[cfg(all(feature = "services-opfs", target_arch = "wasm32"))] mod builder; -#[cfg(feature = "services-opfs")] +#[cfg(all(feature = "services-opfs", target_arch = "wasm32"))] mod core; -#[cfg(feature = "services-opfs")] +#[cfg(all(feature = "services-opfs", target_arch = "wasm32"))] mod delete; -#[cfg(feature = "services-opfs")] +#[cfg(all(feature = "services-opfs", target_arch = "wasm32"))] mod error; -#[cfg(feature = "services-opfs")] +#[cfg(all(feature = "services-opfs", target_arch = "wasm32"))] mod lister; -#[cfg(feature = "services-opfs")] +#[cfg(all(feature = "services-opfs", target_arch = "wasm32"))] mod reader; -#[cfg(feature = "services-opfs")] +#[cfg(all(feature = "services-opfs", target_arch = "wasm32"))] mod writer; -#[cfg(feature = "services-opfs")] +#[cfg(all(feature = "services-opfs", target_arch = "wasm32"))] mod utils; mod config; From 786979d93385cb98aa562f3876e0ef9ddedc8936 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Sat, 6 Sep 2025 23:26:11 +0800 Subject: [PATCH 3/8] fix: add license --- core/Cargo.lock | 14 ++++++++++++++ core/edge/opfs_on_wasm/src/async_create_dir.rs | 17 +++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/core/Cargo.lock b/core/Cargo.lock index 8e07e0b0cfc3..c6ea5df6d6dc 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -2720,6 +2720,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "edge_test_opfs_on_wasm" +version = "0.54.0" +dependencies = [ + "anyhow", + "bytes", + "futures", + "opendal", + "rand 0.8.5", + "sha2", + "uuid", + "wasm-bindgen-test", +] + [[package]] name = "edge_test_s3_read_on_wasm" version = "0.54.0" diff --git a/core/edge/opfs_on_wasm/src/async_create_dir.rs b/core/edge/opfs_on_wasm/src/async_create_dir.rs index ccca3f5bca1b..d6c2b22cd405 100644 --- a/core/edge/opfs_on_wasm/src/async_create_dir.rs +++ b/core/edge/opfs_on_wasm/src/async_create_dir.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + #[cfg(test)] mod tests { From d1a81243cebc11e8e237d63ed1f54368b6401109 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Mon, 22 Sep 2025 17:41:52 +0800 Subject: [PATCH 4/8] refactor: move opfs ci to test_edge --- .github/workflows/test_behavior.yml | 16 +----- .github/workflows/test_behavior_opfs.yml | 65 ------------------------ .github/workflows/test_edge.yml | 36 +++++++++++++ 3 files changed, 37 insertions(+), 80 deletions(-) delete mode 100644 .github/workflows/test_behavior_opfs.yml diff --git a/.github/workflows/test_behavior.yml b/.github/workflows/test_behavior.yml index 4fb11d1f022c..d7ebc1cd475e 100644 --- a/.github/workflows/test_behavior.yml +++ b/.github/workflows/test_behavior.yml @@ -201,18 +201,4 @@ jobs: uses: ./.github/workflows/test_behavior_integration_object_store.yml with: os: ${{ matrix.os }} - cases: ${{ toJson(matrix.cases) }} - - test_opfs: - name: core_opfs / ${{ matrix.os }} - needs: [plan] - if: fromJson(needs.plan.outputs.plan).components.core - secrets: inherit - strategy: - fail-fast: false - matrix: - include: ${{ fromJson(needs.plan.outputs.plan).core }} - uses: ./.github/workflows/test_behavior_opfs.yml - with: - os: ${{ matrix.os }} - cases: ${{ toJson(matrix.cases) }} + cases: ${{ toJson(matrix.cases) }} \ No newline at end of file diff --git a/.github/workflows/test_behavior_opfs.yml b/.github/workflows/test_behavior_opfs.yml deleted file mode 100644 index de46cefca4f5..000000000000 --- a/.github/workflows/test_behavior_opfs.yml +++ /dev/null @@ -1,65 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -name: OPFS Test - -on: - workflow_call: - inputs: - os: - required: true - type: string - cases: - required: true - type: string - -jobs: - test_opfs_on_wasm: - runs-on: ubuntu-latest - - steps: - - name: Checkout code - uses: actions/checkout@v4 - - name: Setup Rust toolchain - uses: ./.github/actions/setup - - - name: Setup for wasm32 - run: | - rustup target add wasm32-unknown-unknown - - - name: Install Chrome Environment - run: | - mkdir -p /tmp/chrome - wget $(curl https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json | jq -r '.versions | sort_by(.version) | reverse | .[0] | .downloads.chrome | .[] | select(.platform == "linux64") | .url') - wget $(curl https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json | jq -r '.versions | sort_by(.version) | reverse | .[0] | .downloads.chromedriver | .[] | select(.platform == "linux64") | .url') - unzip chromedriver-linux64.zip - unzip chrome-linux64.zip - cp -r chrome-linux64/ /tmp/chrome/ - cp -r chromedriver-linux64 /tmp/chrome/chromedriver - - - name: Setup wasm-pack - uses: taiki-e/install-action@v2 - with: - tool: wasm-pack - - - name: Test wasm - working-directory: core/edge/opfs_on_wasm - env: - OPENDAL_TEST: opfs - run: | - export PATH=$PATH:/tmp/chrome/chrome-linux64/:/tmp/chrome/chromedriver-linux64/ - wasm-pack test --chrome --headless diff --git a/.github/workflows/test_edge.yml b/.github/workflows/test_edge.yml index 8ea0a14240f1..4ac261c2d8ee 100644 --- a/.github/workflows/test_edge.yml +++ b/.github/workflows/test_edge.yml @@ -138,3 +138,39 @@ jobs: OPENDAL_S3_BUCKET: opendal-testing OPENDAL_S3_ROLE_ARN: arn:aws:iam::952853449216:role/opendal-testing OPENDAL_S3_REGION: ap-northeast-1 + + test_opfs_on_wasm: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + - name: Setup Rust toolchain + uses: ./.github/actions/setup + + - name: Setup for wasm32 + run: | + rustup target add wasm32-unknown-unknown + + - name: Install Chrome Environment + run: | + mkdir -p /tmp/chrome + wget $(curl https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json | jq -r '.versions | sort_by(.version) | reverse | .[0] | .downloads.chrome | .[] | select(.platform == "linux64") | .url') + wget $(curl https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json | jq -r '.versions | sort_by(.version) | reverse | .[0] | .downloads.chromedriver | .[] | select(.platform == "linux64") | .url') + unzip chromedriver-linux64.zip + unzip chrome-linux64.zip + cp -r chrome-linux64/ /tmp/chrome/ + cp -r chromedriver-linux64 /tmp/chrome/chromedriver + + - name: Setup wasm-pack + uses: taiki-e/install-action@v2 + with: + tool: wasm-pack + + - name: Test OPFS + working-directory: core/edge/opfs_on_wasm + env: + OPENDAL_TEST: opfs + run: | + export PATH=$PATH:/tmp/chrome/chrome-linux64/:/tmp/chrome/chromedriver-linux64/ + wasm-pack test --chrome --headless From 190638ba2aadd93a539d67b3d604308eeff49ced Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Tue, 16 Dec 2025 16:13:55 +0800 Subject: [PATCH 5/8] refactor: remove unused file --- core/src/services/mod.rs | 0 core/src/types/scheme.rs | 0 2 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 core/src/services/mod.rs delete mode 100644 core/src/types/scheme.rs diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs deleted file mode 100644 index e69de29bb2d1..000000000000 From 025edc4034e7cc0fc1077e4fbc46f08cac25f1fa Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Tue, 16 Dec 2025 16:44:49 +0800 Subject: [PATCH 6/8] refactor: remove test --- core/Cargo.lock | 14 -- core/edge/opfs_on_wasm/Cargo.toml | 41 ---- .../edge/opfs_on_wasm/src/async_create_dir.rs | 57 ----- core/edge/opfs_on_wasm/src/async_delete.rs | 196 ------------------ core/edge/opfs_on_wasm/src/lib.rs | 30 --- core/edge/opfs_on_wasm/src/utils.rs | 157 -------------- core/edge/opfs_on_wasm/webdriver.json | 15 -- 7 files changed, 510 deletions(-) delete mode 100644 core/edge/opfs_on_wasm/Cargo.toml delete mode 100644 core/edge/opfs_on_wasm/src/async_create_dir.rs delete mode 100644 core/edge/opfs_on_wasm/src/async_delete.rs delete mode 100644 core/edge/opfs_on_wasm/src/lib.rs delete mode 100644 core/edge/opfs_on_wasm/src/utils.rs delete mode 100644 core/edge/opfs_on_wasm/webdriver.json diff --git a/core/Cargo.lock b/core/Cargo.lock index 17610e656d01..fcc412432251 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -2719,20 +2719,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "edge_test_opfs_on_wasm" -version = "0.55.0" -dependencies = [ - "anyhow", - "bytes", - "futures", - "opendal", - "rand 0.8.5", - "sha2", - "uuid", - "wasm-bindgen-test", -] - [[package]] name = "edge_test_s3_read_on_wasm" version = "0.55.0" diff --git a/core/edge/opfs_on_wasm/Cargo.toml b/core/edge/opfs_on_wasm/Cargo.toml deleted file mode 100644 index 92d77d8a4c87..000000000000 --- a/core/edge/opfs_on_wasm/Cargo.toml +++ /dev/null @@ -1,41 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[package] -name = "edge_test_opfs_on_wasm" - -edition.workspace = true -license.workspace = true -publish = false -rust-version.workspace = true -version.workspace = true - -[lib] -crate-type = ["cdylib"] - -[dependencies] -anyhow = { version = "1.0.30", features = ["std"] } -bytes = "1.6" -futures = "0.3" -opendal = { path = "../..", default-features = false, features = [ - "services-opfs", -] } -rand = { version = "0.8" } -sha2 = "0.10" -uuid = { version = "1", features = ["serde", "v4"] } - -wasm-bindgen-test = "0.3.41" diff --git a/core/edge/opfs_on_wasm/src/async_create_dir.rs b/core/edge/opfs_on_wasm/src/async_create_dir.rs deleted file mode 100644 index d6c2b22cd405..000000000000 --- a/core/edge/opfs_on_wasm/src/async_create_dir.rs +++ /dev/null @@ -1,57 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#[cfg(test)] -mod tests { - - use anyhow::Result; - use opendal::*; - use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; - - wasm_bindgen_test_configure!(run_in_browser); - - use crate::*; - - #[wasm_bindgen_test] - /// Create dir with dir path should succeed. - pub async fn test_create_dir() -> Result<()> { - let op = operator(); - let path = TEST_FIXTURE.new_dir_path(); - - op.create_dir(&path).await?; - - let meta = op.stat(&path).await?; - assert_eq!(meta.mode(), EntryMode::DIR); - Ok(()) - } - - /// Create dir on existing dir should succeed. - #[wasm_bindgen_test] - pub async fn test_create_dir_existing() -> Result<()> { - let op = operator(); - let path = TEST_FIXTURE.new_dir_path(); - - op.create_dir(&path).await?; - - op.create_dir(&path).await?; - - let meta = op.stat(&path).await?; - assert_eq!(meta.mode(), EntryMode::DIR); - - Ok(()) - } -} diff --git a/core/edge/opfs_on_wasm/src/async_delete.rs b/core/edge/opfs_on_wasm/src/async_delete.rs deleted file mode 100644 index 7a6895034d2b..000000000000 --- a/core/edge/opfs_on_wasm/src/async_delete.rs +++ /dev/null @@ -1,196 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#[cfg(test)] -mod tests { - - use anyhow::Result; - use opendal::raw::Access; - use opendal::*; - use wasm_bindgen_test::wasm_bindgen_test; - use wasm_bindgen_test::wasm_bindgen_test_configure; - - use crate::*; - - wasm_bindgen_test_configure!(run_in_browser); - - /// Delete existing file should succeed. - #[wasm_bindgen_test] - pub async fn test_delete_file() -> Result<()> { - let op = operator(); - let (path, content, _) = TEST_FIXTURE.new_file(op.clone()); - - op.write(&path, content).await.expect("write must succeed"); - - op.delete(&path).await?; - - // Stat it again to check. - assert!(!op.exists(&path).await?); - - Ok(()) - } - - /// Delete empty dir should succeed. - #[wasm_bindgen_test] - pub async fn test_delete_empty_dir() -> Result<()> { - let op = operator(); - if !op.info().full_capability().create_dir { - return Ok(()); - } - - let path = TEST_FIXTURE.new_dir_path(); - - op.create_dir(&path).await.expect("create must succeed"); - - op.delete(&path).await?; - - Ok(()) - } - - /// Delete file with special chars should succeed. - #[wasm_bindgen_test] - pub async fn test_delete_with_special_chars() -> Result<()> { - let op = operator(); - // Ignore test for atomicserver until https://github.com/atomicdata-dev/atomic-server/issues/663 addressed. - if op.info().scheme() == opendal::Scheme::Atomicserver { - return Ok(()); - } - - let path = format!("{} !@#$%^&()_+-=;',.txt", uuid::Uuid::new_v4()); - let (path, content, _) = TEST_FIXTURE.new_file_with_path(op.clone(), &path); - - op.write(&path, content).await.expect("write must succeed"); - - op.delete(&path).await?; - - // Stat it again to check. - assert!(!op.exists(&path).await?); - - Ok(()) - } - - /// Delete not existing file should also succeed. - #[wasm_bindgen_test] - pub async fn test_delete_not_existing() -> Result<()> { - let op = operator(); - let path = uuid::Uuid::new_v4().to_string(); - - op.delete(&path).await?; - - Ok(()) - } - - /// Remove one file - #[wasm_bindgen_test] - pub async fn test_remove_one_file() -> Result<()> { - let op = operator(); - let (path, content, _) = TEST_FIXTURE.new_file(op.clone()); - - op.write(&path, content.clone()) - .await - .expect("write must succeed"); - - op.delete_iter(vec![path.clone()]).await?; - - // Stat it again to check. - assert!(!op.exists(&path).await?); - - op.write(&format!("/{path}"), content) - .await - .expect("write must succeed"); - - op.delete_iter(vec![path.clone()]).await?; - - // Stat it again to check. - assert!(!op.exists(&path).await?); - - Ok(()) - } - - /// Delete via stream. - #[wasm_bindgen_test] - pub async fn test_delete_stream() -> Result<()> { - let op = operator(); - if !op.info().full_capability().create_dir { - return Ok(()); - } - // Gdrive think that this test is an abuse of their service and redirect us - // to an infinite loop. Let's ignore this test for gdrive. - if op.info().scheme() == Scheme::Gdrive { - return Ok(()); - } - - let dir = uuid::Uuid::new_v4().to_string(); - op.create_dir(&format!("{dir}/")) - .await - .expect("create must succeed"); - - let expected: Vec<_> = (0..100).collect(); - for path in expected.iter() { - op.write(&format!("{dir}/{path}"), "delete_stream").await?; - } - - let mut deleter = op.deleter().await?; - deleter - .delete_iter(expected.iter().map(|v| format!("{dir}/{v}"))) - .await?; - deleter.close().await?; - - // Stat it again to check. - for path in expected.iter() { - assert!( - !op.exists(&format!("{dir}/{path}")).await?, - "{path} should be removed" - ) - } - - Ok(()) - } - - #[wasm_bindgen_test] - pub async fn test_batch_delete() -> Result<()> { - let op = operator(); - let mut cap = op.info().full_capability(); - if cap.delete_max_size.unwrap_or(1) <= 1 { - return Ok(()); - } - - cap.delete_max_size = Some(2); - op.inner().info().update_full_capability(|_| cap); - - let mut files = Vec::new(); - for _ in 0..5 { - let (path, content, _) = TEST_FIXTURE.new_file(op.clone()); - op.write(path.as_str(), content) - .await - .expect("write must succeed"); - files.push(path); - } - - op.delete_iter(files.clone()) - .await - .expect("batch delete must succeed"); - - for path in files { - let stat = op.stat(path.as_str()).await; - assert!(stat.is_err()); - assert_eq!(stat.unwrap_err().kind(), ErrorKind::NotFound); - } - - Ok(()) - } -} diff --git a/core/edge/opfs_on_wasm/src/lib.rs b/core/edge/opfs_on_wasm/src/lib.rs deleted file mode 100644 index ba2af04335f6..000000000000 --- a/core/edge/opfs_on_wasm/src/lib.rs +++ /dev/null @@ -1,30 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -mod async_create_dir; -mod async_delete; -mod utils; - -use opendal::{Configurator, Operator, services::OpfsConfig}; -pub use utils::*; - -pub static TEST_FIXTURE: Fixture = Fixture::new(); - -pub(crate) fn operator() -> Operator { - let builder = OpfsConfig::default().into_builder().root("/a/b/"); - Operator::new(builder).unwrap().finish() -} diff --git a/core/edge/opfs_on_wasm/src/utils.rs b/core/edge/opfs_on_wasm/src/utils.rs deleted file mode 100644 index a535110bf601..000000000000 --- a/core/edge/opfs_on_wasm/src/utils.rs +++ /dev/null @@ -1,157 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::mem; -use std::sync::Mutex; - -use opendal::*; -use rand::distributions::uniform::SampleRange; -use rand::prelude::*; - -pub fn gen_bytes_with_range(range: impl SampleRange) -> (Vec, usize) { - let mut rng = thread_rng(); - - let size = rng.gen_range(range); - let mut content = vec![0; size]; - rng.fill_bytes(&mut content); - - (content, size) -} - -pub fn gen_bytes(cap: Capability) -> (Vec, usize) { - let max_size = cap.write_total_max_size.unwrap_or(4 * 1024 * 1024); - gen_bytes_with_range(1..max_size) -} - -pub fn gen_fixed_bytes(size: usize) -> Vec { - let (content, _) = gen_bytes_with_range(size..=size); - - content -} - -pub fn gen_offset_length(size: usize) -> (u64, u64) { - let mut rng = thread_rng(); - - // Make sure at least one byte is read. - let offset = rng.gen_range(0..size - 1); - let length = rng.gen_range(1..(size - offset)); - - (offset as u64, length as u64) -} - -pub struct Fixture { - pub paths: Mutex>, -} - -impl Default for Fixture { - fn default() -> Self { - Self::new() - } -} - -impl Fixture { - /// Create a new fixture - pub const fn new() -> Self { - Self { - paths: Mutex::new(vec![]), - } - } - - /// Add a path. - pub fn add_path(&self, path: String) { - self.paths.lock().unwrap().push(path); - } - - /// Create a new dir path - pub fn new_dir_path(&self) -> String { - let path = format!("{}/", uuid::Uuid::new_v4()); - self.paths.lock().unwrap().push(path.clone()); - - path - } - - /// Create a new file path - pub fn new_file_path(&self) -> String { - let path = format!("{}", uuid::Uuid::new_v4()); - self.paths.lock().unwrap().push(path.clone()); - - path - } - - /// Create a new file with random content - pub fn new_file(&self, op: impl Into) -> (String, Vec, usize) { - let max_size = op - .into() - .info() - .full_capability() - .write_total_max_size - .unwrap_or(4 * 1024 * 1024); - - self.new_file_with_range(uuid::Uuid::new_v4().to_string(), 1..max_size) - } - - pub fn new_file_with_path( - &self, - op: impl Into, - path: &str, - ) -> (String, Vec, usize) { - let max_size = op - .into() - .info() - .full_capability() - .write_total_max_size - .unwrap_or(4 * 1024 * 1024); - - self.new_file_with_range(path, 1..max_size) - } - - /// Create a new file with random content in range. - fn new_file_with_range( - &self, - path: impl Into, - range: impl SampleRange, - ) -> (String, Vec, usize) { - let path = path.into(); - self.paths.lock().unwrap().push(path.clone()); - - let mut rng = thread_rng(); - - let size = rng.gen_range(range); - let mut content = vec![0; size]; - rng.fill_bytes(&mut content); - - (path, content, size) - } - - /// Perform cleanup - pub async fn cleanup(&self, op: impl Into) { - let op = op.into(); - // Don't cleanup data if delete is not supported - if !op.info().full_capability().delete { - return; - } - - let paths: Vec<_> = mem::take(self.paths.lock().unwrap().as_mut()); - // Don't call delete if paths is empty - if paths.is_empty() { - return; - } - - // We try our best to clean up fixtures, but won't panic if failed. - let _ = op.delete_iter(paths).await; - } -} diff --git a/core/edge/opfs_on_wasm/webdriver.json b/core/edge/opfs_on_wasm/webdriver.json deleted file mode 100644 index c2d6865e644e..000000000000 --- a/core/edge/opfs_on_wasm/webdriver.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "moz:firefoxOptions": { - "prefs": { - "media.navigator.streams.fake": true, - "media.navigator.permission.disabled": true - }, - "args": [] - }, - "goog:chromeOptions": { - "args": [ - "--use-fake-device-for-media-stream", - "--use-fake-ui-for-media-stream" - ] - } -} From 8d1d76df793319423e555badf5c2b333b96ed229 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Tue, 16 Dec 2025 18:20:20 +0800 Subject: [PATCH 7/8] refactor: remove unused code --- .github/workflows/test_behavior.yml | 2 +- core/core/src/services/opfs/backend.rs | 23 +-- core/core/src/services/opfs/builder.rs | 21 +- core/core/src/services/opfs/core.rs | 77 +------- core/core/src/services/opfs/docs.md | 6 +- core/core/src/services/opfs/mod.rs | 19 +- core/core/src/services/opfs/utils.rs | 19 +- dev/Cargo.lock | 260 ++----------------------- 8 files changed, 64 insertions(+), 363 deletions(-) diff --git a/.github/workflows/test_behavior.yml b/.github/workflows/test_behavior.yml index 93f73d8a8f4b..1558eac8ad6e 100644 --- a/.github/workflows/test_behavior.yml +++ b/.github/workflows/test_behavior.yml @@ -203,4 +203,4 @@ jobs: uses: ./.github/workflows/test_behavior_integration_object_store.yml with: os: ${{ matrix.os }} - cases: ${{ toJson(matrix.cases) }} \ No newline at end of file + cases: ${{ toJson(matrix.cases) }} diff --git a/core/core/src/services/opfs/backend.rs b/core/core/src/services/opfs/backend.rs index baf88bb309d3..d88c0bfdaad4 100644 --- a/core/core/src/services/opfs/backend.rs +++ b/core/core/src/services/opfs/backend.rs @@ -18,9 +18,6 @@ use std::fmt::Debug; use std::sync::Arc; -use web_sys::FileSystemGetDirectoryOptions; - -use super::utils::*; use crate::raw::*; use crate::services::opfs::core::OpfsCore; use crate::services::opfs::delete::OpfsDeleter; @@ -48,25 +45,7 @@ impl Access for OpfsBackend { type Deleter = oio::OneShotDeleter; fn info(&self) -> Arc { - let info = AccessorInfo::default(); - info.set_native_capability(Capability { - stat: true, - - read: true, - - write: true, - write_can_empty: true, - write_can_append: true, - write_can_multi: true, - - create_dir: true, - delete: true, - - list: true, - - ..Default::default() - }); - Arc::new(info) + self.core.info.clone() } async fn stat(&self, path: &str, _: OpStat) -> Result { diff --git a/core/core/src/services/opfs/builder.rs b/core/core/src/services/opfs/builder.rs index c9ce1295912a..0a2c06baab15 100644 --- a/core/core/src/services/opfs/builder.rs +++ b/core/core/src/services/opfs/builder.rs @@ -17,9 +17,12 @@ use std::sync::Arc; -use crate::{Builder, Configurator, Error, ErrorKind, Result, raw::Access}; +use crate::{ + Builder, Capability, Configurator, Error, ErrorKind, Result, + raw::{Access, AccessorInfo}, +}; -use super::{backend::OpfsBackend, config::OpfsConfig, core::OpfsCore}; +use super::{OPFS_SCHEME, backend::OpfsBackend, config::OpfsConfig, core::OpfsCore}; impl Configurator for OpfsConfig { type Builder = OpfsBuilder; @@ -40,6 +43,7 @@ impl OpfsBuilder { } /// Set root for backend. + #[allow(unused)] pub fn root(mut self, root: &str) -> Self { self.config.root = if root.is_empty() { None @@ -60,7 +64,18 @@ impl Builder for OpfsBuilder { .with_operation("Builder::build"), )?; - let core = Arc::new(OpfsCore::new(root)); + let info = AccessorInfo::default(); + info.set_scheme(OPFS_SCHEME) + .set_root(&root) + .set_native_capability(Capability { + stat: true, + create_dir: true, + delete: true, + ..Default::default() + }); + + let core = Arc::new(OpfsCore::new(Arc::new(info), root)); + Ok(OpfsBackend::new(core)) } } diff --git a/core/core/src/services/opfs/core.rs b/core/core/src/services/opfs/core.rs index 245e9bdc2497..3999306b58a4 100644 --- a/core/core/src/services/opfs/core.rs +++ b/core/core/src/services/opfs/core.rs @@ -16,15 +16,14 @@ // under the License. use std::fmt::Debug; +use std::sync::Arc; -use js_sys::AsyncIterator; use wasm_bindgen::JsCast; use wasm_bindgen_futures::JsFuture; use web_sys::File; use web_sys::FileSystemDirectoryHandle; use web_sys::FileSystemFileHandle; use web_sys::FileSystemGetDirectoryOptions; -use web_sys::FileSystemGetFileOptions; use crate::EntryMode; use crate::Metadata; @@ -35,36 +34,20 @@ use super::utils::*; use crate::raw::*; #[derive(Debug, Default)] -pub struct OpfsCore { - root: String, +pub(super) struct OpfsCore { + pub info: Arc, + pub root: String, } impl OpfsCore { - pub(crate) fn new(root: String) -> Self { - Self { root } + pub(crate) fn new(info: Arc, root: String) -> Self { + Self { info, root } } pub(crate) fn path(&self, path: &str) -> String { build_abs_path(&self.root, path) } - /// Build write path and ensure the parent dirs created - pub(crate) async fn ensure_write_path(&self, path: &str) -> Result<()> { - let opt = FileSystemGetDirectoryOptions::new(); - opt.set_create(true); - - let path = build_abs_path(&self.root, path); - let path = path - .trim_end_matches('/') - .rsplit_once('/') - .map(|s| s.0) - .unwrap_or("/"); - - get_directory_handle(path, &opt).await?; - - Ok(()) - } - pub(crate) async fn opfs_stat(&self, path: &str) -> Result { let parent_handle = self.parent_dir_handle(path).await?; let path = build_abs_path(&self.root, &path); @@ -93,12 +76,10 @@ impl OpfsCore { .and_then(JsCast::dyn_into) .map_err(parse_js_error)?; - // let last_modified = file.last_modified() as i64; - let metadata = - Metadata::new(EntryMode::FILE).with_content_length(file.size() as u64); - // .with_last_modified(parse_datetime_from_from_timestamp_millis( - // last_modified, - // )?); + let last_modified = file.last_modified() as i64; + let metadata = Metadata::new(EntryMode::FILE) + .with_content_length(file.size() as u64) + .with_last_modified(Timestamp::from_millisecond(last_modified)?); Ok(metadata) } @@ -117,15 +98,6 @@ impl OpfsCore { Ok(()) } - /// List directory entries. Returns an [`AsyncIterator`] over the entries in the directory. - pub(crate) async fn opfs_list(&self, path: &str) -> Result { - let opt = FileSystemGetDirectoryOptions::new(); - - let handle = self.dir_handle_with_option(path, &opt).await?; - - Ok(handle.entries()) - } - /// Get directory handle with options pub(crate) async fn dir_handle_with_option( &self, @@ -161,33 +133,4 @@ impl OpfsCore { Ok(handle) } - - /// Get file handle - pub(crate) async fn file_handle(&self, path: &str) -> Result { - let opt = FileSystemGetFileOptions::new(); - self.file_handle_with_option(path, &opt).await - } - - /// Get file handle with options - pub(crate) async fn file_handle_with_option( - &self, - path: &str, - opt: &FileSystemGetFileOptions, - ) -> Result { - let path = build_abs_path(&self.root, path); - let paths: Vec<&str> = path.trim_matches('/').split('/').collect(); - - let mut handle = get_root_directory_handle().await?; - for dir in paths[0..paths.len() - 1].iter() { - handle = JsFuture::from(handle.get_directory_handle(dir)) - .await - .and_then(JsCast::dyn_into) - .map_err(parse_js_error)?; - } - - JsFuture::from(handle.get_file_handle_with_options(paths[paths.len() - 1], &opt)) - .await - .and_then(JsCast::dyn_into) - .map_err(parse_js_error) - } } diff --git a/core/core/src/services/opfs/docs.md b/core/core/src/services/opfs/docs.md index a529c2ff3b5e..f255deddc8ef 100644 --- a/core/core/src/services/opfs/docs.md +++ b/core/core/src/services/opfs/docs.md @@ -2,11 +2,11 @@ This service can be used to: -- [ ] create_dir -- [ ] stat +- [x] create_dir +- [x] stat - [ ] read - [ ] write -- [ ] delete +- [x] delete - [ ] list - [ ] copy - [ ] rename diff --git a/core/core/src/services/opfs/mod.rs b/core/core/src/services/opfs/mod.rs index cccc43f02f86..fe2ffd27d325 100644 --- a/core/core/src/services/opfs/mod.rs +++ b/core/core/src/services/opfs/mod.rs @@ -15,24 +15,23 @@ // specific language governing permissions and limitations // under the License. -#[cfg(all(feature = "services-opfs", target_arch = "wasm32"))] /// Default scheme for opfs service. pub const OPFS_SCHEME: &str = "opfs"; +use crate::types::DEFAULT_OPERATOR_REGISTRY; + mod backend; -#[cfg(all(feature = "services-opfs", target_arch = "wasm32"))] mod builder; -#[cfg(all(feature = "services-opfs", target_arch = "wasm32"))] +mod config; mod core; - -#[cfg(all(feature = "services-opfs", target_arch = "wasm32"))] mod delete; - -#[cfg(all(feature = "services-opfs", target_arch = "wasm32"))] mod error; - -#[cfg(all(feature = "services-opfs", target_arch = "wasm32"))] mod utils; -mod config; +pub use builder::OpfsBuilder as OPFS; pub use config::OpfsConfig; + +#[ctor::ctor] +fn register_memory_service() { + DEFAULT_OPERATOR_REGISTRY.register::(OPFS_SCHEME); +} diff --git a/core/core/src/services/opfs/utils.rs b/core/core/src/services/opfs/utils.rs index 47149b8f40f9..ddd5a5a59440 100644 --- a/core/core/src/services/opfs/utils.rs +++ b/core/core/src/services/opfs/utils.rs @@ -18,7 +18,7 @@ use crate::Result; use wasm_bindgen::JsCast; use wasm_bindgen_futures::JsFuture; -use web_sys::{FileSystemDirectoryHandle, FileSystemGetDirectoryOptions, window}; +use web_sys::{FileSystemDirectoryHandle, window}; use super::error::*; @@ -30,20 +30,3 @@ pub(crate) async fn get_root_directory_handle() -> Result Result { - let dirs: Vec<&str> = dir.trim_matches('/').split('/').collect(); - - let mut handle = get_root_directory_handle().await?; - for dir in dirs { - handle = JsFuture::from(handle.get_directory_handle_with_options(dir, dir_opt)) - .await - .and_then(JsCast::dyn_into) - .map_err(parse_js_error)?; - } - - Ok(handle) -} diff --git a/dev/Cargo.lock b/dev/Cargo.lock index bb2736b5bd34..b776dabb200a 100644 --- a/dev/Cargo.lock +++ b/dev/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 4 +version = 3 [[package]] name = "adler2" @@ -149,15 +149,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" -[[package]] -name = "colored" -version = "3.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fde0e0ec90c9dfb3b4b1a0891a7dcd0e2bffde2f7efed5fe7c9bb00e5bfb915e" -dependencies = [ - "windows-sys", -] - [[package]] name = "cpufeatures" version = "0.2.17" @@ -233,6 +224,16 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "env_filter" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0" +dependencies = [ + "log", + "regex", +] + [[package]] name = "env_home" version = "0.1.0" @@ -245,17 +246,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" -[[package]] -name = "erased-serde" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89e8918065695684b2b0702da20382d5ae6065cf3327bc2d6436bd49a71ce9f3" -dependencies = [ - "serde", - "serde_core", - "typeid", -] - [[package]] name = "errno" version = "0.3.10" @@ -437,79 +427,20 @@ checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" [[package]] name = "log" -version = "0.4.29" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" -dependencies = [ - "sval", - "sval_ref", - "value-bag", -] +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "logforth" -version = "0.29.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40c105c59828d07aeb95b06f9a345b12869ddc249d44a7302697a66da439076f" -dependencies = [ - "logforth-append-file", - "logforth-bridge-log", - "logforth-core", - "logforth-layout-json", - "logforth-layout-text", -] - -[[package]] -name = "logforth-append-file" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d2ccb8b7e501c114e80069eb2b83c02a48039c23a7037e717b5b09a4ed306fb" -dependencies = [ - "jiff", - "logforth-core", -] - -[[package]] -name = "logforth-bridge-log" -version = "0.3.0" +version = "0.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4aa6ca548389fd166a995b5940e15b0dacbdd5a30f2f24eac9aa4bf664bda5c" -dependencies = [ - "log", - "logforth-core", -] - -[[package]] -name = "logforth-core" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a77869b8dba38c67ed19e1753e59d9faefdcc60557bc4e84db0348606a304ac5" +checksum = "f9b81df91671a0a96902d950498cc69b509291c37f6d49105d5a1b7ddacc727d" dependencies = [ "anyhow", - "value-bag", -] - -[[package]] -name = "logforth-layout-json" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01b80d310e0670560404a825f64dbd78a8761c5bb7da952513e90ba9dd525bd2" -dependencies = [ + "env_filter", "jiff", - "logforth-core", - "serde", - "serde_json", -] - -[[package]] -name = "logforth-layout-text" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2a4674e549a59eeac8e301584143186c433181bdc5460046a130becedef6a3d" -dependencies = [ - "colored", - "jiff", - "logforth-core", + "log", ] [[package]] @@ -687,52 +618,24 @@ checksum = "f79dfe2d285b0488816f30e700a7438c5a73d816b5b7d3ac72fbc48b0d185e03" [[package]] name = "serde" -version = "1.0.228" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" -dependencies = [ - "serde_core", - "serde_derive", -] - -[[package]] -name = "serde_buf" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc948de1bbead18a61be0b33182636603ea0239ca2577b9704fc39eba900e4e5" -dependencies = [ - "serde_core", -] - -[[package]] -name = "serde_core" -version = "1.0.228" +version = "1.0.216" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +checksum = "0b9781016e935a97e8beecf0c933758c97a5520d32930e460142b4cd80c6338e" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.228" +version = "1.0.216" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +checksum = "46f859dbbf73865c6627ed570e78961cd3ac92407a2d117204c49232485da55e" dependencies = [ "proc-macro2", "quote", "syn", ] -[[package]] -name = "serde_fmt" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e497af288b3b95d067a23a4f749f2861121ffcb2f6d8379310dcda040c345ed" -dependencies = [ - "serde_core", -] - [[package]] name = "serde_json" version = "1.0.138" @@ -772,84 +675,6 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" -[[package]] -name = "sval" -version = "2.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "502b8906c4736190684646827fbab1e954357dfe541013bbd7994d033d53a1ca" - -[[package]] -name = "sval_buffer" -version = "2.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4b854348b15b6c441bdd27ce9053569b016a0723eab2d015b1fd8e6abe4f708" -dependencies = [ - "sval", - "sval_ref", -] - -[[package]] -name = "sval_dynamic" -version = "2.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0bd9e8b74410ddad37c6962587c5f9801a2caadba9e11f3f916ee3f31ae4a1f" -dependencies = [ - "sval", -] - -[[package]] -name = "sval_fmt" -version = "2.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fe17b8deb33a9441280b4266c2d257e166bafbaea6e66b4b34ca139c91766d9" -dependencies = [ - "itoa", - "ryu", - "sval", -] - -[[package]] -name = "sval_json" -version = "2.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "854addb048a5bafb1f496c98e0ab5b9b581c3843f03ca07c034ae110d3b7c623" -dependencies = [ - "itoa", - "ryu", - "sval", -] - -[[package]] -name = "sval_nested" -version = "2.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96cf068f482108ff44ae8013477cb047a1665d5f1a635ad7cf79582c1845dce9" -dependencies = [ - "sval", - "sval_buffer", - "sval_ref", -] - -[[package]] -name = "sval_ref" -version = "2.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed02126365ffe5ab8faa0abd9be54fbe68d03d607cd623725b0a71541f8aaa6f" -dependencies = [ - "sval", -] - -[[package]] -name = "sval_serde" -version = "2.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a263383c6aa2076c4ef6011d3bae1b356edf6ea2613e3d8e8ebaa7b57dd707d5" -dependencies = [ - "serde_core", - "sval", - "sval_nested", -] - [[package]] name = "syn" version = "2.0.100" @@ -894,12 +719,6 @@ dependencies = [ "winnow", ] -[[package]] -name = "typeid" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" - [[package]] name = "typenum" version = "1.17.0" @@ -924,43 +743,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" -[[package]] -name = "value-bag" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ba6f5989077681266825251a52748b8c1d8a4ad098cc37e440103d0ea717fc0" -dependencies = [ - "value-bag-serde1", - "value-bag-sval2", -] - -[[package]] -name = "value-bag-serde1" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16530907bfe2999a1773ca5900a65101e092c70f642f25cc23ca0c43573262c5" -dependencies = [ - "erased-serde", - "serde_buf", - "serde_core", - "serde_fmt", -] - -[[package]] -name = "value-bag-sval2" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d00ae130edd690eaa877e4f40605d534790d1cf1d651e7685bd6a144521b251f" -dependencies = [ - "sval", - "sval_buffer", - "sval_dynamic", - "sval_fmt", - "sval_json", - "sval_ref", - "sval_serde", -] - [[package]] name = "version_check" version = "0.9.5" From 58274407e2b385c32189a7b2c0434f38233d6497 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Tue, 16 Dec 2025 21:59:23 +0800 Subject: [PATCH 8/8] feat: delete with recursive --- core/core/src/services/mod.rs | 2 ++ core/core/src/services/opfs/builder.rs | 2 ++ core/core/src/services/opfs/delete.rs | 4 ++-- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/core/core/src/services/mod.rs b/core/core/src/services/mod.rs index 2f211a4f5248..586e1340046c 100644 --- a/core/core/src/services/mod.rs +++ b/core/core/src/services/mod.rs @@ -216,3 +216,5 @@ pub use yandex_disk::*; #[cfg(all(target_arch = "wasm32", feature = "services-opfs"))] mod opfs; +#[cfg(all(target_arch = "wasm32", feature = "services-opfs"))] +pub use opfs::*; diff --git a/core/core/src/services/opfs/builder.rs b/core/core/src/services/opfs/builder.rs index 0a2c06baab15..3abc62243394 100644 --- a/core/core/src/services/opfs/builder.rs +++ b/core/core/src/services/opfs/builder.rs @@ -32,6 +32,7 @@ impl Configurator for OpfsConfig { } } +/// OPFS[https://developer.mozilla.org/en-US/docs/Web/API/File_System_API/Origin_private_file_system] backend support. #[derive(Default)] pub struct OpfsBuilder { config: OpfsConfig, @@ -71,6 +72,7 @@ impl Builder for OpfsBuilder { stat: true, create_dir: true, delete: true, + delete_with_recursive: true, ..Default::default() }); diff --git a/core/core/src/services/opfs/delete.rs b/core/core/src/services/opfs/delete.rs index df1e062149e5..ef880926fcb0 100644 --- a/core/core/src/services/opfs/delete.rs +++ b/core/core/src/services/opfs/delete.rs @@ -37,7 +37,7 @@ impl OpfsDeleter { } impl oio::OneShotDelete for OpfsDeleter { - async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> { + async fn delete_once(&self, path: String, op: OpDelete) -> Result<()> { let handle = match self.core.parent_dir_handle(&path).await { Ok(handle) => handle, Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()), @@ -52,7 +52,7 @@ impl oio::OneShotDelete for OpfsDeleter { .unwrap_or("/"); let opt = FileSystemRemoveOptions::new(); - opt.set_recursive(false); + opt.set_recursive(op.recursive()); match JsFuture::from(handle.remove_entry_with_options(entry_name, &opt)) .await