Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@

// Make sure all our public APIs have docs.
#![deny(missing_docs)]
// WASI supports are still unstable
#![cfg_attr(
all(feature = "services-fs", target_arch = "wasm32"),
feature(wasi_ext)
)]
#![cfg_attr(all(feature = "services-fs", target_arch = "wasm32"), feature(wasip2))]

// Private module with public types, they will be accessed via `opendal::Xxxx`
mod types;
Expand Down
2 changes: 2 additions & 0 deletions core/src/raw/adapters/kv/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl Scan for () {
}

/// A Scan implementation for all trivial non-async iterators
#[cfg(any(feature = "services-rocksdb", feature = "services-sled"))]
pub struct ScanStdIter<I>(I);

#[cfg(any(feature = "services-rocksdb", feature = "services-sled"))]
Expand All @@ -55,6 +56,7 @@ where
}
}

#[cfg(any(feature = "services-rocksdb", feature = "services-sled"))]
impl<I> Scan for ScanStdIter<I>
where
I: Iterator<Item = Result<String>> + Unpin + Send + Sync,
Expand Down
59 changes: 32 additions & 27 deletions core/src/raw/http_util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,19 @@
// specific language governing permissions and limitations
// under the License.

use std::convert::Infallible;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::future;
use std::mem;
use std::ops::Deref;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::task::Context;
use std::task::Poll;

use bytes::Bytes;
use futures::Future;
use futures::TryStreamExt;
use http::Request;
use http::Response;
use http_body::Frame;
use http_body::SizeHint;
use raw::oio::Read;

use super::parse_content_encoding;
Expand Down Expand Up @@ -180,7 +173,7 @@ impl HttpFetch for reqwest::Client {
if !body.is_empty() {
#[cfg(not(target_arch = "wasm32"))]
{
req_builder = req_builder.body(reqwest::Body::wrap(HttpBufferBody(body)))
req_builder = req_builder.body(reqwest::Body::wrap(body::HttpBufferBody(body)))
}
#[cfg(target_arch = "wasm32")]
{
Expand Down Expand Up @@ -250,27 +243,39 @@ fn is_temporary_error(err: &reqwest::Error) -> bool {
err.is_decode()
}

struct HttpBufferBody(Buffer);

impl http_body::Body for HttpBufferBody {
type Data = Bytes;
type Error = Infallible;

fn poll_frame(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.0.next() {
Some(bs) => Poll::Ready(Some(Ok(Frame::data(bs)))),
None => Poll::Ready(None),
#[cfg(not(target_arch = "wasm32"))]
mod body {
use std::convert::Infallible;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use bytes::Bytes;
use http_body::Frame;
use http_body::SizeHint;

pub(super) struct HttpBufferBody(pub(super) super::Buffer);

impl http_body::Body for HttpBufferBody {
type Data = Bytes;
type Error = Infallible;

fn poll_frame(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.0.next() {
Some(bs) => Poll::Ready(Some(Ok(Frame::data(bs)))),
None => Poll::Ready(None),
}
}
}

fn is_end_stream(&self) -> bool {
self.0.is_empty()
}
fn is_end_stream(&self) -> bool {
self.0.is_empty()
}

fn size_hint(&self) -> SizeHint {
SizeHint::with_exact(self.0.len() as u64)
fn size_hint(&self) -> SizeHint {
SizeHint::with_exact(self.0.len() as u64)
}
}
}
6 changes: 6 additions & 0 deletions core/src/services/fs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,9 @@ fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
use std::os::unix::fs::FileExt;
f.write_at(buf, offset).map_err(new_std_io_error)
}

#[cfg(target_os = "wasi")]
fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
use std::os::wasi::fs::FileExt;
f.write_at(buf, offset).map_err(new_std_io_error)
}
4 changes: 4 additions & 0 deletions core/src/types/execute/executors/tokio_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ pub struct TokioExecutor {}
impl Execute for TokioExecutor {
/// Tokio's JoinHandle has its own `abort` support, so dropping handle won't cancel the task.
fn execute(&self, f: BoxedStaticFuture<()>) {
#[cfg(not(target_arch = "wasm32"))]
let _handle = tokio::task::spawn(f);

#[cfg(target_arch = "wasm32")]
let _handle = tokio::task::spawn_local(f);
}
}

Expand Down
Loading