diff --git a/.github/workflows/tsan-test.yml b/.github/workflows/tsan-test.yml deleted file mode 100644 index e7d9142..0000000 --- a/.github/workflows/tsan-test.yml +++ /dev/null @@ -1,29 +0,0 @@ -on: - push: - branches: [ main ] - pull_request: - branches: [ main ] - -name: TSAN Integration Test - -jobs: - build: - runs-on: ubuntu-latest - strategy: - matrix: - include: - - build: "" - - build: "--release" - - steps: - - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: nightly - components: rust-src - - - run: cargo test ${{ matrix.build }} --features=short-potato --manifest-path bbqtest/Cargo.toml -Zbuild-std --target x86_64-unknown-linux-gnu -- --nocapture - env: - RUSTFLAGS: "-Z sanitizer=thread" - RUST_TEST_THREADS: 1 - TSAN_OPTIONS: "suppressions=${{ github.workspace }}/tsan-blacklist.txt" diff --git a/bbq2/.github/workflows/build.yml b/bbq2/.github/workflows/build.yml new file mode 100644 index 0000000..1672629 --- /dev/null +++ b/bbq2/.github/workflows/build.yml @@ -0,0 +1,42 @@ +name: Build and Test + +on: + push: + branches: ["main"] + pull_request: + branches: ["main"] + workflow_dispatch: + +jobs: + miri: + name: "Build all crates" + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install embedded target + # Note: once https://github.com/hawkw/mycelium/pull/538 lands we can test on + # thumbv6m-none-eabi + run: rustup target add thumbv7em-none-eabi + # + # BUILD + TEST + # + # no features, on std + - name: Check bbq2 (no features, on host) + run: cargo build --no-default-features + # default features, on std + - name: Check bbq2 (default features, on host) + run: cargo build + # std features, on std + - name: Check bbq2 (std features, on host) + run: cargo build --features=std + # std features, on std, test + - name: Test bbq2 (std features, on host) + run: cargo test --features=std + + # no features, on mcu + - name: Check bbq2 (no features, on mcu) + run: cargo build --no-default-features --target=thumbv7em-none-eabi + # default features, on mcu + - name: Check bbq2 (no features, on mcu) + run: cargo build --target=thumbv7em-none-eabi + diff --git a/bbq2/.github/workflows/miri.yml b/bbq2/.github/workflows/miri.yml new file mode 100644 index 0000000..d8ae0e3 --- /dev/null +++ b/bbq2/.github/workflows/miri.yml @@ -0,0 +1,22 @@ +name: Run miri tests + +on: + push: + branches: ["main"] + pull_request: + branches: ["main"] + workflow_dispatch: + +jobs: + miri: + name: "miri all the things" + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install miri component + run: rustup component add --toolchain nightly-x86_64-unknown-linux-gnu miri + # + # crate + # + - name: Miri test bbq2 + run: ./miri.sh diff --git a/bbq2/.gitignore b/bbq2/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/bbq2/.gitignore @@ -0,0 +1 @@ +/target diff --git a/bbq2/Cargo.lock b/bbq2/Cargo.lock new file mode 100644 index 0000000..91a15a3 --- /dev/null +++ b/bbq2/Cargo.lock @@ -0,0 +1,669 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "addr2line" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + +[[package]] +name = "backtrace" +version = "0.3.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + +[[package]] +name = "bbq2" +version = "0.4.2" +dependencies = [ + "const-init", + "critical-section", + "maitake-sync", + "tokio", +] + +[[package]] +name = "cc" +version = "1.0.101" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac367972e516d45567c7eafc73d24e1c193dcf200a8d94e9db7b3d38b349572d" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "const-init" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bd422bfb4f24a97243f60b6a4443e63d810c925d8da4bb2d8fde26a7c1d57ec" + +[[package]] +name = "cordyceps" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec10f0a762d93c4498d2e97a333805cb6250d60bead623f71d8034f9a4152ba3" +dependencies = [ + "loom 0.5.6", + "tracing", +] + +[[package]] +name = "critical-section" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" + +[[package]] +name = "generator" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc16584ff22b460a382b7feec54b23d2908d858152e5739a120b949293bd74e" +dependencies = [ + "cc", + "libc", + "log", + "rustversion", + "windows 0.48.0", +] + +[[package]] +name = "generator" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186014d53bc231d0090ef8d6f03e0920c54d85a5ed22f4f2f74315ec56cf83fb" +dependencies = [ + "cc", + "cfg-if", + "libc", + "log", + "rustversion", + "windows 0.54.0", +] + +[[package]] +name = "gimli" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" + +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + +[[package]] +name = "libc" +version = "0.2.155" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" + +[[package]] +name = "log" +version = "0.4.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" + +[[package]] +name = "loom" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5" +dependencies = [ + "cfg-if", + "generator 0.7.5", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator 0.8.1", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "maitake-sync" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0467b24ad105bb1873a1ad7b6b5515cd383f33f1df293ec05f82b5632d0e07cd" +dependencies = [ + "cordyceps", + "loom 0.7.2", + "mutex-traits", + "mycelium-bitfield", + "pin-project", + "portable-atomic", + "tracing", +] + +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "miniz_oxide" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" +dependencies = [ + "adler", +] + +[[package]] +name = "mutex-traits" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd54cb762feb1788c74f5d3387983864d2365ca1819043d2addb76db80169102" + +[[package]] +name = "mycelium-bitfield" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24e0cc5e2c585acbd15c5ce911dff71e1f4d5313f43345873311c4f5efd741cc" + +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + +[[package]] +name = "object" +version = "0.36.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "576dfe1fc8f9df304abb159d767a29d0476f7750fbf8aa7ad07816004a207434" +dependencies = [ + "memchr", +] + +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + +[[package]] +name = "pin-project" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" + +[[package]] +name = "portable-atomic" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0" + +[[package]] +name = "proc-macro2" +version = "1.0.86" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "regex" +version = "1.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata 0.4.7", + "regex-syntax 0.8.4", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", +] + +[[package]] +name = "regex-automata" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax 0.8.4", +] + +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + +[[package]] +name = "regex-syntax" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" + +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + +[[package]] +name = "rustversion" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" + +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + +[[package]] +name = "syn" +version = "2.0.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "901fa70d88b9d6c98022e23b4136f9f3e54e4662c3bc1bd1d84a42a9a0f0c1e9" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + +[[package]] +name = "tokio" +version = "1.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" +dependencies = [ + "backtrace", + "pin-project-lite", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-targets 0.48.5", +] + +[[package]] +name = "windows" +version = "0.54.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9252e5725dbed82865af151df558e754e4a3c2c30818359eb17465f1346a1b49" +dependencies = [ + "windows-core", + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-core" +version = "0.54.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12661b9c89351d684a50a8a643ce5f608e20243b9fb84687800163429f161d65" +dependencies = [ + "windows-result", + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-result" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e383302e8ec8515204254685643de10811af0ed97ea37210dc26fb0032647f8" +dependencies = [ + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" +dependencies = [ + "windows_aarch64_gnullvm 0.52.5", + "windows_aarch64_msvc 0.52.5", + "windows_i686_gnu 0.52.5", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.5", + "windows_x86_64_gnu 0.52.5", + "windows_x86_64_gnullvm 0.52.5", + "windows_x86_64_msvc 0.52.5", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" diff --git a/bbq2/Cargo.toml b/bbq2/Cargo.toml new file mode 100644 index 0000000..7c94bf5 --- /dev/null +++ b/bbq2/Cargo.toml @@ -0,0 +1,58 @@ +[package] +name = "bbq2" +version = "0.4.2" +description = "A SPSC, lockless, no_std, thread safe, queue, based on BipBuffers" +repository = "https://github.com/jamesmunns/bbq2" +authors = ["James Munns "] +edition = "2024" +readme = "README.md" + +categories = [ + "embedded", + "no-std", + "memory-management", +] +license = "MIT OR Apache-2.0" +keywords = [] +documentation = "https://docs.rs/bbq2/" + +[package.metadata.docs.rs] +rustdoc-args = ["--cfg", "docsrs"] +features = ["std"] + +[dependencies] +const-init = "1.0.0" + +[dependencies.maitake-sync] +version = "0.2" +default-features = false +optional = true + +[dependencies.critical-section] +version = "1.0" +default-features = false +optional = true + +[dev-dependencies.tokio] +version = "1.0" +features = ["macros", "rt", "time"] + +[features] +# NOTE: CAS atomics are switched using `#[cfg(target_has_atomic = "ptr")]` +default = [ + "maitake-sync-0_2", + "critical-section", +] +critical-section = [ + "dep:critical-section", +] +disable-cache-padding = [ + "maitake-sync?/no-cache-pad", +] +alloc = [] +std = [ + "alloc" +] +maitake-sync-0_2 = [ + "dep:maitake-sync", +] diff --git a/bbq2/README.md b/bbq2/README.md new file mode 100644 index 0000000..4a444ad --- /dev/null +++ b/bbq2/README.md @@ -0,0 +1,7 @@ +# bbq2 + +Now with sixteen great flavors! + +This is an attempt to re-implement [bbqueue](https://github.com/jamesmunns/bbqueue). + +No docs yet. Check out the unit tests in [`lib.rs`](./src/lib.rs) for usage :) diff --git a/bbq2/miri.sh b/bbq2/miri.sh new file mode 100755 index 0000000..5d13344 --- /dev/null +++ b/bbq2/miri.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +# We disable isolation because we use tokio sleep +# We disable leaks because ??? +# +# TODO: Can we eliminate some of these limitations for testing? +MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks" \ + cargo +nightly miri test \ + --target x86_64-unknown-linux-gnu \ + --features=std diff --git a/bbq2/src/lib.rs b/bbq2/src/lib.rs new file mode 100644 index 0000000..6537393 --- /dev/null +++ b/bbq2/src/lib.rs @@ -0,0 +1,241 @@ +//! bbq2 +//! +//! A new and improved bipbuffer queue. + +#![cfg_attr(not(any(test, feature = "std")), no_std)] + +#[cfg(feature = "alloc")] +extern crate alloc; + +/// Type aliases for different generic configurations +/// +pub mod nicknames; + +/// Producer and consumer interfaces +/// +pub mod prod_cons; + +/// Queue storage +/// +pub mod queue; + +/// Generic traits +/// +pub mod traits; + +/// Re-export of external types/traits +/// +pub mod export { + pub use const_init::ConstInit; +} + +#[cfg(all(test, feature = "alloc"))] +mod test { + use core::{ops::Deref, time::Duration}; + + use crate::{ + queue::{ArcBBQueue, BBQueue}, + traits::{ + coordination::cas::AtomicCoord, + notifier::maitake::MaiNotSpsc, + storage::{BoxedSlice, Inline}, + }, + }; + + #[cfg(all(target_has_atomic = "ptr", feature = "alloc"))] + #[test] + fn ux() { + use crate::traits::{notifier::blocking::Blocking, storage::BoxedSlice}; + + static BBQ: BBQueue, AtomicCoord, Blocking> = BBQueue::new(); + let _ = BBQ.stream_producer(); + let _ = BBQ.stream_consumer(); + + let buf2 = Inline::<64>::new(); + let bbq2: BBQueue<_, AtomicCoord, Blocking> = BBQueue::new_with_storage(&buf2); + let _ = bbq2.stream_producer(); + let _ = bbq2.stream_consumer(); + + let buf3 = BoxedSlice::new(64); + let bbq3: BBQueue<_, AtomicCoord, Blocking> = BBQueue::new_with_storage(buf3); + let _ = bbq3.stream_producer(); + let _ = bbq3.stream_consumer(); + } + + #[cfg(target_has_atomic = "ptr")] + #[test] + fn smoke() { + use crate::traits::notifier::blocking::Blocking; + use core::ops::Deref; + + static BBQ: BBQueue, AtomicCoord, Blocking> = BBQueue::new(); + let prod = BBQ.stream_producer(); + let cons = BBQ.stream_consumer(); + + let write_once = &[0x01, 0x02, 0x03, 0x04, 0x11, 0x12, 0x13, 0x14]; + let mut wgr = prod.grant_exact(8).unwrap(); + wgr.copy_from_slice(write_once); + wgr.commit(8); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.deref(), write_once.as_slice(),); + rgr.release(4); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.deref(), &write_once[4..]); + rgr.release(4); + + assert!(cons.read().is_err()); + } + + #[cfg(target_has_atomic = "ptr")] + #[test] + fn smoke_framed() { + use crate::traits::notifier::blocking::Blocking; + use core::ops::Deref; + + static BBQ: BBQueue, AtomicCoord, Blocking> = BBQueue::new(); + let prod = BBQ.framed_producer(); + let cons = BBQ.framed_consumer(); + + let write_once = &[0x01, 0x02, 0x03, 0x04, 0x11, 0x12]; + let mut wgr = prod.grant(8).unwrap(); + wgr[..6].copy_from_slice(write_once); + wgr.commit(6); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.deref(), write_once.as_slice()); + rgr.release(); + + assert!(cons.read().is_err()); + } + + #[cfg(target_has_atomic = "ptr")] + #[test] + fn framed_misuse() { + use crate::traits::notifier::blocking::Blocking; + + static BBQ: BBQueue, AtomicCoord, Blocking> = BBQueue::new(); + let prod = BBQ.stream_producer(); + let cons = BBQ.framed_consumer(); + + // Bad grant one: HUGE header value + let write_once = &[0xFF, 0xFF, 0x03, 0x04, 0x11, 0x12]; + let mut wgr = prod.grant_exact(6).unwrap(); + wgr[..6].copy_from_slice(write_once); + wgr.commit(6); + + assert!(cons.read().is_err()); + + { + // Clear the bad grant + let cons2 = BBQ.stream_consumer(); + let rgr = cons2.read().unwrap(); + rgr.release(6); + } + + // Bad grant two: too small of a grant + let write_once = &[0x00]; + let mut wgr = prod.grant_exact(1).unwrap(); + wgr[..1].copy_from_slice(write_once); + wgr.commit(1); + + assert!(cons.read().is_err()); + } + + #[tokio::test] + async fn asink() { + static BBQ: BBQueue, AtomicCoord, MaiNotSpsc> = BBQueue::new(); + let prod = BBQ.stream_producer(); + let cons = BBQ.stream_consumer(); + + let rxfut = tokio::task::spawn(async move { + let rgr = cons.wait_read().await; + assert_eq!(rgr.deref(), &[1, 2, 3]); + }); + + let txfut = tokio::task::spawn(async move { + tokio::time::sleep(Duration::from_millis(500)).await; + let mut wgr = prod.grant_exact(3).unwrap(); + wgr.copy_from_slice(&[1, 2, 3]); + wgr.commit(3); + }); + + // todo: timeouts + rxfut.await.unwrap(); + txfut.await.unwrap(); + } + + #[tokio::test] + async fn asink_framed() { + static BBQ: BBQueue, AtomicCoord, MaiNotSpsc> = BBQueue::new(); + let prod = BBQ.framed_producer(); + let cons = BBQ.framed_consumer(); + + let rxfut = tokio::task::spawn(async move { + let rgr = cons.wait_read().await; + assert_eq!(rgr.deref(), &[1, 2, 3]); + }); + + let txfut = tokio::task::spawn(async move { + tokio::time::sleep(Duration::from_millis(500)).await; + let mut wgr = prod.grant(3).unwrap(); + wgr.copy_from_slice(&[1, 2, 3]); + wgr.commit(3); + }); + + // todo: timeouts + rxfut.await.unwrap(); + txfut.await.unwrap(); + } + + #[tokio::test] + async fn arc1() { + let bbq: ArcBBQueue, AtomicCoord, MaiNotSpsc> = + ArcBBQueue::new_with_storage(Inline::new()); + let prod = bbq.stream_producer(); + let cons = bbq.stream_consumer(); + + let rxfut = tokio::task::spawn(async move { + let rgr = cons.wait_read().await; + assert_eq!(rgr.deref(), &[1, 2, 3]); + }); + + let txfut = tokio::task::spawn(async move { + tokio::time::sleep(Duration::from_millis(500)).await; + let mut wgr = prod.grant_exact(3).unwrap(); + wgr.copy_from_slice(&[1, 2, 3]); + wgr.commit(3); + }); + + // todo: timeouts + rxfut.await.unwrap(); + txfut.await.unwrap(); + } + + #[tokio::test] + async fn arc2() { + let bbq: ArcBBQueue = + ArcBBQueue::new_with_storage(BoxedSlice::new(64)); + let prod = bbq.stream_producer(); + let cons = bbq.stream_consumer(); + + let rxfut = tokio::task::spawn(async move { + let rgr = cons.wait_read().await; + assert_eq!(rgr.deref(), &[1, 2, 3]); + }); + + let txfut = tokio::task::spawn(async move { + tokio::time::sleep(Duration::from_millis(500)).await; + let mut wgr = prod.grant_exact(3).unwrap(); + wgr.copy_from_slice(&[1, 2, 3]); + wgr.commit(3); + }); + + // todo: timeouts + rxfut.await.unwrap(); + txfut.await.unwrap(); + + drop(bbq); + } +} diff --git a/bbq2/src/nicknames.rs b/bbq2/src/nicknames.rs new file mode 100644 index 0000000..99b1c9a --- /dev/null +++ b/bbq2/src/nicknames.rs @@ -0,0 +1,99 @@ +//! BBQueue Styles: Sixteen great flavors! +//! +//! | Storage | Coordination | Notifier | Arc? | Nickname | Source | +//! | :--- | :--- | :--- | :--- | :--- | :--- | +//! | Inline | Critical Section | Blocking | No | Jerk | Jamaica | +//! | Inline | Critical Section | Blocking | Yes | Asado | Argentina | +//! | Inline | Critical Section | Async | No | Memphis | USA | +//! | Inline | Critical Section | Async | Yes | Carolina | USA | +//! | Inline | Atomic | Blocking | No | Churrasco | Brazil | +//! | Inline | Atomic | Blocking | Yes | Barbacoa | Mexico | +//! | Inline | Atomic | Async | No | Texas | USA | +//! | Inline | Atomic | Async | Yes | KansasCity | USA | +//! | Heap | Critical Section | Blocking | No | Braai | South Africa | +//! | Heap | Critical Section | Blocking | Yes | Kebab | Türkiye | +//! | Heap | Critical Section | Async | No | SiuMei | Hong Kong | +//! | Heap | Critical Section | Async | Yes | Satay | SE Asia | +//! | Heap | Atomic | Blocking | No | YakiNiku | Japan | +//! | Heap | Atomic | Blocking | Yes | GogiGui | South Korea | +//! | Heap | Atomic | Async | No | Tandoori | India | +//! | Heap | Atomic | Async | Yes | Lechon | Philippines | + +#![allow(unused_imports)] + +#[cfg(feature = "alloc")] +use crate::queue::ArcBBQueue; +#[cfg(target_has_atomic = "ptr")] +use crate::traits::coordination::cas::AtomicCoord; +#[cfg(feature = "critical-section")] +use crate::traits::coordination::cs::CsCoord; +#[cfg(feature = "alloc")] +use crate::traits::storage::BoxedSlice; +use crate::{ + queue::BBQueue, + traits::{notifier::blocking::Blocking, storage::Inline}, +}; + +/// Inline Storage, Critical Section, Blocking, Borrowed +#[cfg(feature = "critical-section")] +pub type Jerk = BBQueue, CsCoord, Blocking>; + +/// Inline Storage, Critical Section, Async, Borrowed +#[cfg(feature = "critical-section")] +pub type Memphis = BBQueue, CsCoord, A>; + +/// Inline Storage, Atomics, Blocking, Borrowed +#[cfg(target_has_atomic = "ptr")] +pub type Churrasco = BBQueue, AtomicCoord, Blocking>; + +/// Inline Storage, Atomics, Async, Borrowed +#[cfg(target_has_atomic = "ptr")] +pub type Texas = BBQueue, AtomicCoord, A>; + +/// Heap Buffer, Critical Section, Blocking, Borrowed +#[cfg(all(feature = "alloc", feature = "critical-section"))] +pub type Braai = BBQueue; + +/// Heap Buffer, Critical Section, Async, Borrowed +#[cfg(all(feature = "alloc", feature = "critical-section"))] +pub type SiuMei = BBQueue; + +/// Heap Buffer, Atomics, Blocking, Borrowed +#[cfg(all(feature = "alloc", target_has_atomic = "ptr"))] +pub type YakiNiku = BBQueue; + +/// Heap Buffer, Atomics, Async, Borrowed +#[cfg(all(feature = "alloc", target_has_atomic = "ptr"))] +pub type Tandoori = BBQueue; + +/// Inline Storage, Critical Section, Blocking, Arc +#[cfg(all(feature = "alloc", feature = "critical-section"))] +pub type Asado = ArcBBQueue, CsCoord, Blocking>; + +/// Inline Storage, Critical Section, Async, Arc +#[cfg(all(feature = "alloc", feature = "critical-section"))] +pub type Carolina = ArcBBQueue, CsCoord, A>; + +/// Inline Storage, Atomics, Blocking, Arc +#[cfg(all(feature = "alloc", target_has_atomic = "ptr"))] +pub type Barbacoa = ArcBBQueue, AtomicCoord, Blocking>; + +/// Inline Storage, Atomics, Async, Arc +#[cfg(all(feature = "alloc", target_has_atomic = "ptr"))] +pub type KansasCity = ArcBBQueue, AtomicCoord, A>; + +/// Heap Buffer, Critical Section, Blocking, Arc +#[cfg(all(feature = "alloc", feature = "critical-section"))] +pub type Kebab = ArcBBQueue; + +/// Heap Buffer, Critical Section, Async, Arc +#[cfg(all(feature = "alloc", feature = "critical-section"))] +pub type Satay = ArcBBQueue; + +/// Heap Buffer, Atomics, Blocking, Arc +#[cfg(all(feature = "alloc", target_has_atomic = "ptr"))] +pub type GogiGui = ArcBBQueue; + +/// Heap Buffer, Atomics, Async, Arc +#[cfg(all(feature = "alloc", target_has_atomic = "ptr"))] +pub type Lechon = ArcBBQueue; diff --git a/bbq2/src/prod_cons/framed.rs b/bbq2/src/prod_cons/framed.rs new file mode 100644 index 0000000..c005832 --- /dev/null +++ b/bbq2/src/prod_cons/framed.rs @@ -0,0 +1,399 @@ +//! Framed byte queue interfaces +//! +//! Useful for sending data that has coherent frames, e.g. network packets + +use core::{ + marker::PhantomData, + ops::{Deref, DerefMut}, + ptr::NonNull, +}; + +use crate::traits::{ + bbqhdl::BbqHandle, + coordination::{Coord, ReadGrantError, WriteGrantError}, + notifier::{AsyncNotifier, Notifier}, + storage::Storage, +}; + +/// A trait that can be used as the "header" for separating framed storage. +/// +/// Framed interfaces use a `u16` by default, which only requires two bytes +/// for the header, with the limitation that the largest grant allowed is +/// 64KiB at a time. You can also use a `usize` allowing the maximum platform +/// available size. +/// +/// You should not have to implement this trait. +/// +/// # Safety +/// +/// Do it right +pub unsafe trait LenHeader: Into + Copy + Ord { + /// Should be `[u8; size_of::()]` + type Bytes; + /// Convert Self into Self::Bytes, in little endian order + fn to_le_bytes(&self) -> Self::Bytes; + /// Convert Self::Bytes (which is in little endian order) to Self. + fn from_le_bytes(by: Self::Bytes) -> Self; +} + +/// A producer handle that can be used to write framed chunks +pub struct FramedProducer +where + Q: BbqHandle, + H: LenHeader, +{ + pub(crate) bbq: Q::Target, + pub(crate) pd: PhantomData, +} + +/// A consumer handle that can be used to read framed chunks +pub struct FramedConsumer +where + Q: BbqHandle, + H: LenHeader, +{ + pub(crate) bbq: Q::Target, + pub(crate) pd: PhantomData, +} + +/// A writing grant into the storage buffer +/// +/// Grants implement Deref/DerefMut to access the contained storage. +#[must_use = "Write Grants must be committed to be effective"] +pub struct FramedGrantW +where + Q: BbqHandle, + H: LenHeader, +{ + bbq: Q::Target, + base_ptr: NonNull, + hdr: H, +} + +/// A reading grant into the storage buffer +/// +/// Grants implement Deref/DerefMut to access the contained storage. +/// +/// Write access is provided for read grants in case it is necessary to mutate +/// the storage in-place for decoding. +#[must_use = "Read Grants must be released to free space"] +pub struct FramedGrantR +where + Q: BbqHandle, + H: LenHeader, +{ + bbq: Q::Target, + body_ptr: NonNull, + hdr: H, +} + +// ---- impls ---- + +// ---- impl LenHeader ---- + +unsafe impl LenHeader for u16 { + type Bytes = [u8; 2]; + + #[inline(always)] + fn to_le_bytes(&self) -> Self::Bytes { + u16::to_le_bytes(*self) + } + + #[inline(always)] + fn from_le_bytes(by: Self::Bytes) -> Self { + u16::from_le_bytes(by) + } +} + +unsafe impl LenHeader for usize { + type Bytes = [u8; core::mem::size_of::()]; + + #[inline(always)] + fn to_le_bytes(&self) -> Self::Bytes { + usize::to_le_bytes(*self) + } + + #[inline(always)] + fn from_le_bytes(by: Self::Bytes) -> Self { + usize::from_le_bytes(by) + } +} + +// ---- impl FramedProducer ---- + +impl FramedProducer +where + Q: BbqHandle, + H: LenHeader, +{ + /// Attempt to obtain a write grant of the given (max) size + /// + /// The returned grant can be used to write up to `sz` bytes, though + /// a smaller size may be committed. Dropping the grant without calling + /// commit means that no data will be made visible to the consumer. + pub fn grant(&self, sz: H) -> Result, WriteGrantError> { + let (ptr, cap) = self.bbq.sto.ptr_len(); + let needed = sz.into() + core::mem::size_of::(); + + let offset = self.bbq.cor.grant_exact(cap, needed)?; + + let base_ptr = unsafe { + let p = ptr.as_ptr().byte_add(offset); + NonNull::new_unchecked(p) + }; + Ok(FramedGrantW { + bbq: self.bbq.clone(), + base_ptr, + hdr: sz, + }) + } +} + +impl FramedProducer +where + Q: BbqHandle, + Q::Notifier: AsyncNotifier, + H: LenHeader, +{ + /// Wait for the given write grant to become available + /// + /// If `sz` is larger than the storage buffer, this method will never + /// return. + /// + /// The returned grant can be used to write up to `sz` bytes, though + /// a smaller size may be committed. Dropping the grant without calling + /// commit means that no data will be made visible to the consumer. + pub async fn wait_grant(&self, sz: H) -> FramedGrantW { + self.bbq.not.wait_for_not_full(|| self.grant(sz).ok()).await + } +} + +// ---- impl FramedConsumer ---- + +impl FramedConsumer +where + Q: BbqHandle, + H: LenHeader, +{ + /// Attempt to receive a single frame + /// + /// The FramedConsumer has no control over the size of the read grant, + /// we see whatever size was written by the FramedProducer. + /// + /// The returned grant must be released to free the space in the buffer. + pub fn read(&self) -> Result, ReadGrantError> { + let (ptr, _cap) = self.bbq.sto.ptr_len(); + let (offset, grant_len) = self.bbq.cor.read()?; + + // Calculate the size so we can figure out where the body + // starts in the grant + let hdr_sz = const { core::mem::size_of::() }; + if hdr_sz > grant_len { + // This means that we got a read grant that doesn't even + // cover the size of a header - this should only be possible + // if you used a stream producer to create a grant, this is + // not compatible. We need to release the read grant, and + // return an error + self.bbq.cor.release_inner(0); + return Err(ReadGrantError::InconsistentFrameHeader); + } + + // Ptr is the base of (HDR, Body) + let ptr = unsafe { ptr.as_ptr().byte_add(offset) }; + // Read the potentially unaligned header + let hdr: H = unsafe { ptr.cast::().read_unaligned() }; + if (hdr_sz + hdr.into()) > grant_len { + // Again, the header value + header size are larger than + // the actual read grant, this means someone is doing + // something sketch. We need to release the read grant, + // and return an error + self.bbq.cor.release_inner(0); + return Err(ReadGrantError::InconsistentFrameHeader); + } + + // Get the body, which is the base ptr offset by the header size + let body_ptr = unsafe { + let p = ptr.byte_add(hdr_sz); + core::ptr::NonNull::new_unchecked(p) + }; + Ok(FramedGrantR { + bbq: self.bbq.clone(), + body_ptr, + hdr, + }) + } +} + +impl FramedConsumer +where + Q: BbqHandle, + Q::Notifier: AsyncNotifier, + H: LenHeader, +{ + pub async fn wait_read(&self) -> FramedGrantR { + self.bbq.not.wait_for_not_empty(|| self.read().ok()).await + } +} + +// ---- impl FramedGrantW ---- + +impl FramedGrantW +where + Q: BbqHandle, + H: LenHeader, +{ + /// Commit `used` bytes of the grant to be visible. + /// + /// If `used` is greater than the `sz` used to create this grant, the + /// amount will be clamped to `sz`. + pub fn commit(self, used: H) { + let (_ptr, cap) = self.bbq.sto.ptr_len(); + let hdrlen: usize = const { core::mem::size_of::() }; + let grant_len = hdrlen + self.hdr.into(); + let clamp_hdr = self.hdr.min(used); + let used_len: usize = hdrlen + clamp_hdr.into(); + + unsafe { + self.base_ptr + .cast::() + .as_ptr() + .write_unaligned(clamp_hdr); + } + + self.bbq.cor.commit_inner(cap, grant_len, used_len); + self.bbq.not.wake_one_consumer(); + core::mem::forget(self); + } + + /// Aborts the grant, making no frame available to the consumer + /// + /// Can be used to silence "must_use" errors. + pub fn abort(self) { + // The default behavior is to abort - do nothing, let the + // drop impl run + } +} + +impl Deref for FramedGrantW +where + Q: BbqHandle, + H: LenHeader, +{ + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + let len = self.hdr.into(); + let body_ptr = unsafe { + let hdr_sz = const { core::mem::size_of::() }; + self.base_ptr.as_ptr().byte_add(hdr_sz) + }; + unsafe { core::slice::from_raw_parts(body_ptr, len) } + } +} + +impl DerefMut for FramedGrantW +where + Q: BbqHandle, + H: LenHeader, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + let len = self.hdr.into(); + let body_ptr = unsafe { + let hdr_sz = const { core::mem::size_of::() }; + self.base_ptr.as_ptr().byte_add(hdr_sz) + }; + unsafe { core::slice::from_raw_parts_mut(body_ptr, len) } + } +} + +impl Drop for FramedGrantW +where + Q: BbqHandle, + H: LenHeader, +{ + fn drop(&mut self) { + // Default drop performs an "abort" + let (_ptr, cap) = self.bbq.sto.ptr_len(); + let hdrlen: usize = const { core::mem::size_of::() }; + let grant_len = hdrlen + self.hdr.into(); + self.bbq.cor.commit_inner(cap, grant_len, 0); + } +} + +unsafe impl Send for FramedGrantW +where + Q: BbqHandle, + Q::Target: Send, + H: LenHeader + Send, +{ +} + +// ---- impl FramedGrantR ---- + +impl FramedGrantR +where + Q: BbqHandle, + H: LenHeader, +{ + /// Release the entire read grant + /// + /// It is not possible to partially release a framed read grant. + pub fn release(self) { + let len: usize = self.hdr.into(); + let hdrlen: usize = const { core::mem::size_of::() }; + let used = len + hdrlen; + self.bbq.cor.release_inner(used); + self.bbq.not.wake_one_producer(); + core::mem::forget(self); + } + + /// Drop the grant WITHOUT releasing the message from the queue. + /// + /// The next call to read will observe the same packet again. + pub fn keep(self) { + // Default behavior is "keep" + } +} + +impl Deref for FramedGrantR +where + Q: BbqHandle, + H: LenHeader, +{ + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + let len: usize = self.hdr.into(); + unsafe { core::slice::from_raw_parts(self.body_ptr.as_ptr(), len) } + } +} + +impl DerefMut for FramedGrantR +where + Q: BbqHandle, + H: LenHeader, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + let len: usize = self.hdr.into(); + unsafe { core::slice::from_raw_parts_mut(self.body_ptr.as_ptr(), len) } + } +} + +impl Drop for FramedGrantR +where + Q: BbqHandle, + H: LenHeader, +{ + fn drop(&mut self) { + // Default behavior is "keep" - release zero bytes + self.bbq.cor.release_inner(0); + } +} + +unsafe impl Send for FramedGrantR +where + Q: BbqHandle, + Q::Target: Send, + H: LenHeader + Send, +{ +} diff --git a/bbq2/src/prod_cons/mod.rs b/bbq2/src/prod_cons/mod.rs new file mode 100644 index 0000000..9e9b34e --- /dev/null +++ b/bbq2/src/prod_cons/mod.rs @@ -0,0 +1,20 @@ +//! Producer and Consumer interfaces +//! +//! BBQueues can be used with one of two kinds of producer/consumer pairs: +//! +//! * **Framed**, where the consumer sees the exact chunks that were inserted by the +//! producer. This uses a small length header to note the length of the inserted frame. +//! This means that if the producer writes a 10 byte grant, a 20 byte grant, then a 30 +//! byte grant, the consumer will need to read three times to drain the queue, seeing the +//! 10, 20, and 30 byte chunks in order. This is useful when you are working with data that +//! has logical "frames", for example for network packets. +//! * **Stream**, where the consumer may potentially see multiple pushed chunks at once, with +//! no separation. This means that if the producer writes a 10 byte grant, a 20 byte grant, +//! then a 30 byte grant, the consumer could potentially see all 60 bytes in a single read +//! grant (if there is no wrap-around). +//! +//! You should NOT "mix and match" framed/stream consumers and producers. This will not cause +//! memory safety/UB issues, but will not work properly. + +pub mod framed; +pub mod stream; diff --git a/bbq2/src/prod_cons/stream.rs b/bbq2/src/prod_cons/stream.rs new file mode 100644 index 0000000..bcf18a6 --- /dev/null +++ b/bbq2/src/prod_cons/stream.rs @@ -0,0 +1,308 @@ +//! Stream byte queue interfaces +//! +//! Useful for sending stream-oriented data where the consumer doesn't care +//! about how the data was pushed, e.g. a serial port stream where multiple +//! writes from the software may be transferred out over DMA in a single +//! transfer. + +use core::{ + ops::{Deref, DerefMut}, + ptr::NonNull, +}; + +use crate::traits::{ + bbqhdl::BbqHandle, + coordination::{Coord, ReadGrantError, WriteGrantError}, + notifier::{AsyncNotifier, Notifier}, + storage::Storage, +}; + +/// A producer handle that may write data into the buffer +pub struct StreamProducer +where + Q: BbqHandle, +{ + pub(crate) bbq: Q::Target, +} + +/// A consumer handle that may read data from the buffer +pub struct StreamConsumer +where + Q: BbqHandle, +{ + pub(crate) bbq: Q::Target, +} + +/// A writing grant into the storage buffer +/// +/// Grants implement Deref/DerefMut to access the contained storage. +#[must_use = "Write Grants must be committed to be effective"] +pub struct StreamGrantW +where + Q: BbqHandle, +{ + bbq: Q::Target, + ptr: NonNull, + len: usize, + to_commit: usize, +} + +/// A reading grant into the storage buffer +/// +/// Grants implement Deref/DerefMut to access the contained storage. +/// +/// Write access is provided for read grants in case it is necessary to mutate +/// the storage in-place for decoding. +pub struct StreamGrantR +where + Q: BbqHandle, +{ + bbq: Q::Target, + ptr: NonNull, + len: usize, + to_release: usize, +} + +// ---- impls ---- + +// ---- StreamProducer ---- + +impl StreamProducer +where + Q: BbqHandle, +{ + /// Obtain a grant UP TO the given `max` size. + /// + /// If we return a grant, it will have a nonzero amount of space. + /// + /// If the grant represents LESS than `max` size, this is due to either: + /// + /// * There is less than `max` free space available in the queue for writing + /// * The grant represents the remaining space in the buffer that WOULDN'T cause + /// a wrap-around of the ring buffer + /// + /// This method will never cause an "early wraparound" of the ring buffer unless + /// there is no capacity without wrapping around. There may still be available + /// writing capacity in the buffer after commiting this write grant, so it may be + /// useful to call `grant_max_remaining` in a loop until `Err(WriteGrantError::InsufficientSize)` + /// is returned. + pub fn grant_max_remaining(&self, max: usize) -> Result, WriteGrantError> { + let (ptr, cap) = self.bbq.sto.ptr_len(); + let (offset, len) = self.bbq.cor.grant_max_remaining(cap, max)?; + let ptr = unsafe { + let p = ptr.as_ptr().byte_add(offset); + NonNull::new_unchecked(p) + }; + Ok(StreamGrantW { + bbq: self.bbq.clone(), + ptr, + len, + to_commit: 0, + }) + } + + /// Obtain a grant with EXACTLY `sz` capacity + /// + /// Unlike `grant_max_remaining`, if there is insufficient size at the "tail" of + /// the ring buffer, this method WILL cause a wrap-around to occur to attempt to + /// find the requested write capacity. + pub fn grant_exact(&self, sz: usize) -> Result, WriteGrantError> { + let (ptr, cap) = self.bbq.sto.ptr_len(); + let offset = self.bbq.cor.grant_exact(cap, sz)?; + let ptr = unsafe { + let p = ptr.as_ptr().byte_add(offset); + NonNull::new_unchecked(p) + }; + Ok(StreamGrantW { + bbq: self.bbq.clone(), + ptr, + len: sz, + to_commit: 0, + }) + } +} + +impl StreamProducer +where + Q: BbqHandle, + Q::Notifier: AsyncNotifier, +{ + /// Wait for a grant of any size, up to `max`, to become available + pub async fn wait_grant_max_remaining(&self, max: usize) -> StreamGrantW { + self.bbq + .not + .wait_for_not_full(|| self.grant_max_remaining(max).ok()) + .await + } + + /// Wait for a grant of EXACTLY `sz` to become available. + /// + /// If `sz` exceeds the capacity of the buffer, this method will never return. + pub async fn wait_grant_exact(&self, sz: usize) -> StreamGrantW { + self.bbq + .not + .wait_for_not_full(|| self.grant_exact(sz).ok()) + .await + } +} + +unsafe impl Send for StreamProducer {} + +// ---- StreamConsumer ---- + +impl StreamConsumer +where + Q: BbqHandle, +{ + /// Obtain a chunk of readable data + /// + /// The returned chunk may NOT represent all available data if the available + /// data wraps around the internal ring buffer. You may want to call `read` + /// in a loop until `Err(ReadGrantError::Empty)` is returned if you want to + /// drain the queue entirely. + pub fn read(&self) -> Result, ReadGrantError> { + let (ptr, _cap) = self.bbq.sto.ptr_len(); + let (offset, len) = self.bbq.cor.read()?; + let ptr = unsafe { + let p = ptr.as_ptr().byte_add(offset); + NonNull::new_unchecked(p) + }; + Ok(StreamGrantR { + bbq: self.bbq.clone(), + ptr, + len, + to_release: 0, + }) + } +} + +impl StreamConsumer +where + Q: BbqHandle, + Q::Notifier: AsyncNotifier, +{ + /// Wait for any read data to become available + pub async fn wait_read(&self) -> StreamGrantR { + self.bbq.not.wait_for_not_empty(|| self.read().ok()).await + } +} + +unsafe impl Send for StreamConsumer {} + +// ---- StreamGrantW ---- + +impl StreamGrantW +where + Q: BbqHandle, +{ + pub fn commit(self, used: usize) { + let (_, cap) = self.bbq.sto.ptr_len(); + let used = used.min(self.len); + self.bbq.cor.commit_inner(cap, self.len, used); + if used != 0 { + self.bbq.not.wake_one_consumer(); + } + core::mem::forget(self); + } +} + +impl Deref for StreamGrantW +where + Q: BbqHandle, +{ + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + unsafe { core::slice::from_raw_parts(self.ptr.as_ptr(), self.len) } + } +} + +impl DerefMut for StreamGrantW +where + Q: BbqHandle, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { core::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) } + } +} + +impl Drop for StreamGrantW +where + Q: BbqHandle, +{ + fn drop(&mut self) { + let StreamGrantW { + bbq, + ptr: _, + len, + to_commit, + } = self; + let (_, cap) = bbq.sto.ptr_len(); + let len = *len; + let used = (*to_commit).min(len); + bbq.cor.commit_inner(cap, len, used); + if used != 0 { + bbq.not.wake_one_consumer(); + } + } +} + +unsafe impl Send for StreamGrantW {} + +// ---- StreamGrantR ---- + +impl StreamGrantR +where + Q: BbqHandle, +{ + pub fn release(self, used: usize) { + let used = used.min(self.len); + self.bbq.cor.release_inner(used); + if used != 0 { + self.bbq.not.wake_one_producer(); + } + core::mem::forget(self); + } +} + +impl Deref for StreamGrantR +where + Q: BbqHandle, +{ + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + unsafe { core::slice::from_raw_parts(self.ptr.as_ptr(), self.len) } + } +} + +impl DerefMut for StreamGrantR +where + Q: BbqHandle, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { core::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) } + } +} + +impl Drop for StreamGrantR +where + Q: BbqHandle, +{ + fn drop(&mut self) { + let StreamGrantR { + bbq, + ptr: _, + len, + to_release, + } = self; + let len = *len; + let used = (*to_release).min(len); + bbq.cor.release_inner(used); + if used != 0 { + bbq.not.wake_one_producer(); + } + } +} + +unsafe impl Send for StreamGrantR {} diff --git a/bbq2/src/queue.rs b/bbq2/src/queue.rs new file mode 100644 index 0000000..f1b5b29 --- /dev/null +++ b/bbq2/src/queue.rs @@ -0,0 +1,136 @@ +use core::marker::PhantomData; + +use crate::{ + prod_cons::{ + framed::{FramedConsumer, FramedProducer}, + stream::{StreamConsumer, StreamProducer}, + }, + traits::{ + coordination::Coord, + notifier::Notifier, + storage::{ConstStorage, Storage}, + }, +}; + +#[cfg(feature = "alloc")] +use crate::traits::bbqhdl::BbqHandle; + +/// A standard bbqueue +pub struct BBQueue { + pub(crate) sto: S, + pub(crate) cor: C, + pub(crate) not: N, +} + +impl BBQueue { + pub fn new_with_storage(sto: S) -> Self { + Self { + sto, + cor: C::INIT, + not: N::INIT, + } + } +} + +/// A BBQueue wrapped in an Arc +#[cfg(feature = "alloc")] +pub struct ArcBBQueue(pub(crate) alloc::sync::Arc>); + +#[cfg(feature = "alloc")] +impl ArcBBQueue { + pub fn new_with_storage(sto: S) -> Self { + Self(alloc::sync::Arc::new(BBQueue::new_with_storage(sto))) + } +} + +#[allow(clippy::new_without_default)] +impl BBQueue { + pub const fn new() -> Self { + Self { + sto: S::INIT, + cor: C::INIT, + not: N::INIT, + } + } +} + +impl BBQueue { + pub const fn framed_producer(&self) -> FramedProducer<&'_ Self> { + FramedProducer { + bbq: self, + pd: PhantomData, + } + } + + pub const fn framed_consumer(&self) -> FramedConsumer<&'_ Self> { + FramedConsumer { + bbq: self, + pd: PhantomData, + } + } + + pub const fn stream_producer(&self) -> StreamProducer<&'_ Self> { + StreamProducer { bbq: self } + } + + pub const fn stream_consumer(&self) -> StreamConsumer<&'_ Self> { + StreamConsumer { bbq: self } + } +} + +#[cfg(feature = "alloc")] +impl crate::queue::ArcBBQueue { + pub fn framed_producer(&self) -> FramedProducer>> { + FramedProducer { + bbq: self.0.bbq_ref(), + pd: PhantomData, + } + } + + pub fn framed_consumer(&self) -> FramedConsumer>> { + FramedConsumer { + bbq: self.0.bbq_ref(), + pd: PhantomData, + } + } + + pub fn stream_producer(&self) -> StreamProducer>> { + StreamProducer { + bbq: self.0.bbq_ref(), + } + } + + pub fn stream_consumer(&self) -> StreamConsumer>> { + StreamConsumer { + bbq: self.0.bbq_ref(), + } + } +} + +#[cfg(test)] +mod test { + use crate::traits::{ + coordination::cas::AtomicCoord, notifier::blocking::Blocking, storage::Inline, + }; + + use super::*; + + type Queue = BBQueue, AtomicCoord, Blocking>; + static QUEUE: Queue = BBQueue::new(); + static PRODUCER: FramedProducer<&'static Queue, u16> = QUEUE.framed_producer(); + static CONSUMER: FramedConsumer<&'static Queue, u16> = QUEUE.framed_consumer(); + + #[test] + fn handles() { + let mut wgr = PRODUCER.grant(16).unwrap(); + wgr.iter_mut().for_each(|w| *w = 123); + wgr.commit(16); + + let rgr = CONSUMER.read().unwrap(); + assert_eq!(rgr.len(), 16); + for b in rgr.iter() { + assert_eq!(*b, 123); + } + rgr.release(); + } +} diff --git a/bbq2/src/traits/bbqhdl.rs b/bbq2/src/traits/bbqhdl.rs new file mode 100644 index 0000000..025a625 --- /dev/null +++ b/bbq2/src/traits/bbqhdl.rs @@ -0,0 +1,94 @@ +//! "Access" functionality +//! +//! This trait allows us to be generic over things like whether the +//! BBQueue is stored as a static (and we use a `&'static` reference +//! to it), or if the BBQueue is stored in an `Arc`, and we clone the +//! Arc when creating producers and consumers. +//! +//! While `storage` is where/how the *data* is stored, `bbqhdl` is where +//! the shared *header* is stored. +//! +//! The `BbqHandle` trait also serves to "bundle" all the other generics +//! into a single trait with associated types, meaning MOST of the time +//! you code only needs to be generic over `Q: BbqHandle`, and not all four +//! generic types. You can still set trait bounds in where clauses for things +//! like "has async notifications" by using additional `where`-clause like +//! `where Q::BbqHandle, Q::Coord: AsyncCoord`. + +use core::{marker::PhantomData, ops::Deref}; + +use crate::{ + prod_cons::{ + framed::{FramedConsumer, FramedProducer, LenHeader}, + stream::{StreamConsumer, StreamProducer}, + }, + queue::BBQueue, +}; + +use super::{coordination::Coord, notifier::Notifier, storage::Storage}; + +/// The "Access" trait +pub trait BbqHandle: Sized { + /// How we reference our BBQueue. + type Target: Deref> + Clone; + /// How the DATA of our BBQueue is stored + type Storage: Storage; + /// How the producers/consumers of this BBQueue coordinate + type Coord: Coord; + /// How we notify the producer/consumers of this BBQueue + type Notifier: Notifier; + + // Obtain a reference to + fn bbq_ref(&self) -> Self::Target; + + fn stream_producer(&self) -> StreamProducer { + StreamProducer { + bbq: self.bbq_ref(), + } + } + + fn stream_consumer(&self) -> StreamConsumer { + StreamConsumer { + bbq: self.bbq_ref(), + } + } + + fn framed_producer(&self) -> FramedProducer { + FramedProducer { + bbq: self.bbq_ref(), + pd: PhantomData, + } + } + + fn framed_consumer(&self) -> FramedConsumer { + FramedConsumer { + bbq: self.bbq_ref(), + pd: PhantomData, + } + } +} + +impl BbqHandle for &'_ BBQueue { + type Target = Self; + type Storage = S; + type Coord = C; + type Notifier = N; + + #[inline(always)] + fn bbq_ref(&self) -> Self::Target { + *self + } +} + +#[cfg(feature = "alloc")] +impl BbqHandle for alloc::sync::Arc> { + type Target = Self; + type Storage = S; + type Coord = C; + type Notifier = N; + + #[inline(always)] + fn bbq_ref(&self) -> Self::Target { + self.clone() + } +} diff --git a/bbq2/src/traits/coordination/cas.rs b/bbq2/src/traits/coordination/cas.rs new file mode 100644 index 0000000..11c6d86 --- /dev/null +++ b/bbq2/src/traits/coordination/cas.rs @@ -0,0 +1,280 @@ +//! Lock-free coordination based on Compare and Swap atomics + +use super::{Coord, ReadGrantError, WriteGrantError}; +use core::{ + cmp::min, + sync::atomic::{AtomicBool, AtomicUsize, Ordering}, +}; + +/// Coordination using CAS atomics +pub struct AtomicCoord { + /// Where the next byte will be written + write: AtomicUsize, + + /// Where the next byte will be read from + read: AtomicUsize, + + /// Used in the inverted case to mark the end of the + /// readable streak. Otherwise will == sizeof::(). + /// Writer is responsible for placing this at the correct + /// place when entering an inverted condition, and Reader + /// is responsible for moving it back to sizeof::() + /// when exiting the inverted condition + last: AtomicUsize, + + /// Used by the Writer to remember what bytes are currently + /// allowed to be written to, but are not yet ready to be + /// read from + reserve: AtomicUsize, + + /// Is there an active read grant? + read_in_progress: AtomicBool, + + /// Is there an active write grant? + write_in_progress: AtomicBool, +} + +impl AtomicCoord { + pub const fn new() -> Self { + Self { + write: AtomicUsize::new(0), + read: AtomicUsize::new(0), + last: AtomicUsize::new(0), + reserve: AtomicUsize::new(0), + read_in_progress: AtomicBool::new(false), + write_in_progress: AtomicBool::new(false), + } + } +} + +impl Default for AtomicCoord { + fn default() -> Self { + Self::new() + } +} + +unsafe impl Coord for AtomicCoord { + #[allow(clippy::declare_interior_mutable_const)] + const INIT: Self = Self::new(); + + fn reset(&self) { + // Re-initialize the buffer (not totally needed, but nice to do) + self.write.store(0, Ordering::Release); + self.read.store(0, Ordering::Release); + self.reserve.store(0, Ordering::Release); + self.last.store(0, Ordering::Release); + } + + fn grant_max_remaining( + &self, + capacity: usize, + mut sz: usize, + ) -> Result<(usize, usize), WriteGrantError> { + if self.write_in_progress.swap(true, Ordering::AcqRel) { + return Err(WriteGrantError::GrantInProgress); + } + + // Writer component. Must never write to `read`, + // be careful writing to `load` + let write = self.write.load(Ordering::Acquire); + let read = self.read.load(Ordering::Acquire); + let max = capacity; + + let already_inverted = write < read; + + let start = if already_inverted { + // In inverted case, read is always > write + let remain = read - write - 1; + + if remain != 0 { + sz = min(remain, sz); + write + } else { + // Inverted, no room is available + self.write_in_progress.store(false, Ordering::Release); + return Err(WriteGrantError::InsufficientSize); + } + } else { + #[allow(clippy::collapsible_if)] + if write != max { + // Some (or all) room remaining in un-inverted case + sz = min(max - write, sz); + write + } else { + // Not inverted, but need to go inverted + + // NOTE: We check read > 1, NOT read >= 1, because + // write must never == read in an inverted condition, since + // we will then not be able to tell if we are inverted or not + if read > 1 { + sz = min(read - 1, sz); + 0 + } else { + // Not invertible, no space + self.write_in_progress.store(false, Ordering::Release); + return Err(WriteGrantError::InsufficientSize); + } + } + }; + + // Safe write, only viewed by this task + self.reserve.store(start + sz, Ordering::Release); + + Ok((start, sz)) + } + + fn grant_exact(&self, capacity: usize, sz: usize) -> Result { + if self.write_in_progress.swap(true, Ordering::AcqRel) { + return Err(WriteGrantError::GrantInProgress); + } + + // Writer component. Must never write to `read`, + // be careful writing to `load` + let write = self.write.load(Ordering::Acquire); + let read = self.read.load(Ordering::Acquire); + let max = capacity; + let already_inverted = write < read; + + let start = if already_inverted { + if (write + sz) < read { + // Inverted, room is still available + write + } else { + // Inverted, no room is available + self.write_in_progress.store(false, Ordering::Release); + return Err(WriteGrantError::InsufficientSize); + } + } else { + #[allow(clippy::collapsible_if)] + if write + sz <= max { + // Non inverted condition + write + } else { + // Not inverted, but need to go inverted + + // NOTE: We check sz < read, NOT <=, because + // write must never == read in an inverted condition, since + // we will then not be able to tell if we are inverted or not + if sz < read { + // Invertible situation + 0 + } else { + // Not invertible, no space + self.write_in_progress.store(false, Ordering::Release); + return Err(WriteGrantError::InsufficientSize); + } + } + }; + + // Safe write, only viewed by this task + self.reserve.store(start + sz, Ordering::Release); + + Ok(start) + } + + fn read(&self) -> Result<(usize, usize), ReadGrantError> { + if self.read_in_progress.swap(true, Ordering::AcqRel) { + return Err(ReadGrantError::GrantInProgress); + } + + let write = self.write.load(Ordering::Acquire); + let last = self.last.load(Ordering::Acquire); + let mut read = self.read.load(Ordering::Acquire); + + // Resolve the inverted case or end of read + if (read == last) && (write < read) { + read = 0; + // This has some room for error, the other thread reads this + // Impact to Grant: + // Grant checks if read < write to see if inverted. If not inverted, but + // no space left, Grant will initiate an inversion, but will not trigger it + // Impact to Commit: + // Commit does not check read, but if Grant has started an inversion, + // grant could move Last to the prior write position + // MOVING READ BACKWARDS! + self.read.store(0, Ordering::Release); + } + + let sz = if write < read { + // Inverted, only believe last + last + } else { + // Not inverted, only believe write + write + } - read; + + if sz == 0 { + self.read_in_progress.store(false, Ordering::Release); + return Err(ReadGrantError::Empty); + } + + Ok((read, sz)) + } + + fn commit_inner(&self, capacity: usize, grant_len: usize, used: usize) { + // If there is no grant in progress, return early. This + // generally means we are dropping the grant within a + // wrapper structure + if !self.write_in_progress.load(Ordering::Acquire) { + return; + } + + // Writer component. Must never write to READ, + // be careful writing to LAST + + // Saturate the grant commit + let len = grant_len; + let used = min(len, used); + + let write = self.write.load(Ordering::Acquire); + self.reserve.fetch_sub(len - used, Ordering::AcqRel); + + let max = capacity; + let last = self.last.load(Ordering::Acquire); + let new_write = self.reserve.load(Ordering::Acquire); + + if (new_write < write) && (write != max) { + // We have already wrapped, but we are skipping some bytes at the end of the ring. + // Mark `last` where the write pointer used to be to hold the line here + self.last.store(write, Ordering::Release); + } else if new_write > last { + // We're about to pass the last pointer, which was previously the artificial + // end of the ring. Now that we've passed it, we can "unlock" the section + // that was previously skipped. + // + // Since new_write is strictly larger than last, it is safe to move this as + // the other thread will still be halted by the (about to be updated) write + // value + self.last.store(max, Ordering::Release); + } + // else: If new_write == last, either: + // * last == max, so no need to write, OR + // * If we write in the end chunk again, we'll update last to max next time + // * If we write to the start chunk in a wrap, we'll update last when we + // move write backwards + + // Write must be updated AFTER last, otherwise read could think it was + // time to invert early! + self.write.store(new_write, Ordering::Release); + + // Allow subsequent grants + self.write_in_progress.store(false, Ordering::Release); + } + + fn release_inner(&self, used: usize) { + // If there is no grant in progress, return early. This + // generally means we are dropping the grant within a + // wrapper structure + if !self.read_in_progress.load(Ordering::Acquire) { + return; + } + + // // This should always be checked by the public interfaces + // debug_assert!(used <= self.buf.len()); + + // This should be fine, purely incrementing + let _ = self.read.fetch_add(used, Ordering::Release); + + self.read_in_progress.store(false, Ordering::Release); + } +} diff --git a/bbq2/src/traits/coordination/cs.rs b/bbq2/src/traits/coordination/cs.rs new file mode 100644 index 0000000..16ec22c --- /dev/null +++ b/bbq2/src/traits/coordination/cs.rs @@ -0,0 +1,301 @@ +//! Mutex/Critical section based coordination +//! +//! This is provided so bbq2 is usable on bare metal targets that don't +//! have CAS atomics, like `cortex-m0`/`thumbv6m` targets. + +use super::{Coord, ReadGrantError, WriteGrantError}; +use core::{ + cmp::min, + sync::atomic::{AtomicBool, AtomicUsize, Ordering}, +}; + +/// Coordination that uses a critical section to perform coordination operations +/// +/// The critical section is only taken for a short time to obtain or release grants, +/// not for the entire duration of the grant. +pub struct CsCoord { + /// Where the next byte will be written + write: AtomicUsize, + + /// Where the next byte will be read from + read: AtomicUsize, + + /// Used in the inverted case to mark the end of the + /// readable streak. Otherwise will == sizeof::(). + /// Writer is responsible for placing this at the correct + /// place when entering an inverted condition, and Reader + /// is responsible for moving it back to sizeof::() + /// when exiting the inverted condition + last: AtomicUsize, + + /// Used by the Writer to remember what bytes are currently + /// allowed to be written to, but are not yet ready to be + /// read from + reserve: AtomicUsize, + + /// Is there an active read grant? + read_in_progress: AtomicBool, + + /// Is there an active write grant? + write_in_progress: AtomicBool, +} + +impl CsCoord { + pub const fn new() -> Self { + Self { + write: AtomicUsize::new(0), + read: AtomicUsize::new(0), + last: AtomicUsize::new(0), + reserve: AtomicUsize::new(0), + read_in_progress: AtomicBool::new(false), + write_in_progress: AtomicBool::new(false), + } + } +} + +impl Default for CsCoord { + fn default() -> Self { + Self::new() + } +} + +unsafe impl Coord for CsCoord { + #[allow(clippy::declare_interior_mutable_const)] + const INIT: Self = Self::new(); + + fn reset(&self) { + // Re-initialize the buffer (not totally needed, but nice to do) + self.write.store(0, Ordering::Release); + self.read.store(0, Ordering::Release); + self.reserve.store(0, Ordering::Release); + self.last.store(0, Ordering::Release); + } + + fn grant_max_remaining( + &self, + capacity: usize, + mut sz: usize, + ) -> Result<(usize, usize), WriteGrantError> { + critical_section::with(|_cs| { + if self.write_in_progress.load(Ordering::Relaxed) { + return Err(WriteGrantError::GrantInProgress); + } + self.write_in_progress.store(true, Ordering::Relaxed); + + // Writer component. Must never write to `read`, + // be careful writing to `load` + let write = self.write.load(Ordering::Relaxed); + let read = self.read.load(Ordering::Relaxed); + let max = capacity; + + let already_inverted = write < read; + + let start = if already_inverted { + // In inverted case, read is always > write + let remain = read - write - 1; + + if remain != 0 { + sz = min(remain, sz); + write + } else { + // Inverted, no room is available + self.write_in_progress.store(false, Ordering::Relaxed); + return Err(WriteGrantError::InsufficientSize); + } + } else { + #[allow(clippy::collapsible_if)] + if write != max { + // Some (or all) room remaining in un-inverted case + sz = min(max - write, sz); + write + } else { + // Not inverted, but need to go inverted + + // NOTE: We check read > 1, NOT read >= 1, because + // write must never == read in an inverted condition, since + // we will then not be able to tell if we are inverted or not + if read > 1 { + sz = min(read - 1, sz); + 0 + } else { + // Not invertible, no space + self.write_in_progress.store(false, Ordering::Relaxed); + return Err(WriteGrantError::InsufficientSize); + } + } + }; + + // Safe write, only viewed by this task + self.reserve.store(start + sz, Ordering::Relaxed); + + Ok((start, sz)) + }) + } + + fn grant_exact(&self, capacity: usize, sz: usize) -> Result { + critical_section::with(|_cs| { + if self.write_in_progress.load(Ordering::Relaxed) { + return Err(WriteGrantError::GrantInProgress); + } + self.write_in_progress.store(true, Ordering::Relaxed); + + // Writer component. Must never write to `read`, + // be careful writing to `load` + let write = self.write.load(Ordering::Relaxed); + let read = self.read.load(Ordering::Relaxed); + let max = capacity; + let already_inverted = write < read; + + let start = if already_inverted { + if (write + sz) < read { + // Inverted, room is still available + write + } else { + // Inverted, no room is available + self.write_in_progress.store(false, Ordering::Relaxed); + return Err(WriteGrantError::InsufficientSize); + } + } else { + #[allow(clippy::collapsible_if)] + if write + sz <= max { + // Non inverted condition + write + } else { + // Not inverted, but need to go inverted + + // NOTE: We check sz < read, NOT <=, because + // write must never == read in an inverted condition, since + // we will then not be able to tell if we are inverted or not + if sz < read { + // Invertible situation + 0 + } else { + // Not invertible, no space + self.write_in_progress.store(false, Ordering::Relaxed); + return Err(WriteGrantError::InsufficientSize); + } + } + }; + + // Safe write, only viewed by this task + self.reserve.store(start + sz, Ordering::Relaxed); + + Ok(start) + }) + } + + fn read(&self) -> Result<(usize, usize), ReadGrantError> { + critical_section::with(|_cs| { + if self.read_in_progress.load(Ordering::Relaxed) { + return Err(ReadGrantError::GrantInProgress); + } + self.read_in_progress.store(true, Ordering::Relaxed); + + let write = self.write.load(Ordering::Relaxed); + let last = self.last.load(Ordering::Relaxed); + let mut read = self.read.load(Ordering::Relaxed); + + // Resolve the inverted case or end of read + if (read == last) && (write < read) { + read = 0; + // This has some room for error, the other thread reads this + // Impact to Grant: + // Grant checks if read < write to see if inverted. If not inverted, but + // no space left, Grant will initiate an inversion, but will not trigger it + // Impact to Commit: + // Commit does not check read, but if Grant has started an inversion, + // grant could move Last to the prior write position + // MOVING READ BACKWARDS! + self.read.store(0, Ordering::Relaxed); + } + + let sz = if write < read { + // Inverted, only believe last + last + } else { + // Not inverted, only believe write + write + } - read; + + if sz == 0 { + self.read_in_progress.store(false, Ordering::Relaxed); + return Err(ReadGrantError::Empty); + } + + Ok((read, sz)) + }) + } + + fn commit_inner(&self, capacity: usize, grant_len: usize, used: usize) { + critical_section::with(|_cs| { + // If there is no grant in progress, return early. This + // generally means we are dropping the grant within a + // wrapper structure + if !self.write_in_progress.load(Ordering::Relaxed) { + return; + } + + // Writer component. Must never write to READ, + // be careful writing to LAST + + // Saturate the grant commit + let len = grant_len; + let used = min(len, used); + + let write = self.write.load(Ordering::Relaxed); + let old_reserve = self.reserve.load(Ordering::Relaxed); + self.reserve + .store(old_reserve - (len - used), Ordering::Relaxed); + + let max = capacity; + let last = self.last.load(Ordering::Relaxed); + let new_write = self.reserve.load(Ordering::Relaxed); + + if (new_write < write) && (write != max) { + // We have already wrapped, but we are skipping some bytes at the end of the ring. + // Mark `last` where the write pointer used to be to hold the line here + self.last.store(write, Ordering::Relaxed); + } else if new_write > last { + // We're about to pass the last pointer, which was previously the artificial + // end of the ring. Now that we've passed it, we can "unlock" the section + // that was previously skipped. + // + // Since new_write is strictly larger than last, it is safe to move this as + // the other thread will still be halted by the (about to be updated) write + // value + self.last.store(max, Ordering::Relaxed); + } + // else: If new_write == last, either: + // * last == max, so no need to write, OR + // * If we write in the end chunk again, we'll update last to max next time + // * If we write to the start chunk in a wrap, we'll update last when we + // move write backwards + + // Write must be updated AFTER last, otherwise read could think it was + // time to invert early! + self.write.store(new_write, Ordering::Relaxed); + + // Allow subsequent grants + self.write_in_progress.store(false, Ordering::Relaxed); + }) + } + + fn release_inner(&self, used: usize) { + critical_section::with(|_cs| { + // If there is no grant in progress, return early. This + // generally means we are dropping the grant within a + // wrapper structure + if !self.read_in_progress.load(Ordering::Acquire) { + return; + } + + // // This should always be checked by the public interfaces + // debug_assert!(used <= self.buf.len()); + + // This should be fine, purely incrementing + let old_read = self.read.load(Ordering::Relaxed); + self.read.store(used + old_read, Ordering::Relaxed); + self.read_in_progress.store(false, Ordering::Relaxed); + }) + } +} diff --git a/bbq2/src/traits/coordination/mod.rs b/bbq2/src/traits/coordination/mod.rs new file mode 100644 index 0000000..5eba2f8 --- /dev/null +++ b/bbq2/src/traits/coordination/mod.rs @@ -0,0 +1,69 @@ +//! "Coordination" functionality +//! +//! This trait is used to coordinate between Producers and Consumers. +//! +//! Unless you are on an embedded target without Compare and Swap atomics, e.g. +//! `cortex-m0`/`thumbv6m`, you almost certainly want to use the [`cas`] version +//! of coordination. +//! +//! The `cas` module is toggled automatically based on `#[cfg(target_has_atomic = "ptr")]`. + +#[cfg(target_has_atomic = "ptr")] +pub mod cas; + +#[cfg(feature = "critical-section")] +pub mod cs; + +/// Errors associated with obtaining a write grant +#[derive(PartialEq, Debug)] +pub enum WriteGrantError { + /// Unable to create write grant due to not enough room in the buffer + InsufficientSize, + /// Unable to create write grant due to existing write grant + GrantInProgress, +} + +/// Errors associated with obtaining a read grant +#[derive(PartialEq, Debug)] +pub enum ReadGrantError { + /// Unable to create write grant due to not enough room in the buffer + Empty, + /// Unable to create write grant due to existing write grant + GrantInProgress, + /// We observed a frame header that did not make sense. This should only + /// occur if a stream producer was used on one end and a frame consumer was + /// used on the other end. Don't do that. + /// + /// If you see this error and you are NOT doing that, please report it, as it + /// is a bug. + InconsistentFrameHeader, +} + +/// Coordination Handler +/// +/// The coordination handler is responsible for arbitrating access to the storage +/// +/// # Safety +/// +/// you must implement these correctly, or UB could happen +pub unsafe trait Coord { + const INIT: Self; + + // Reset all EXCEPT taken values back to the initial empty state + fn reset(&self); + + // Write Grants + + fn grant_max_remaining( + &self, + capacity: usize, + sz: usize, + ) -> Result<(usize, usize), WriteGrantError>; + fn grant_exact(&self, capacity: usize, sz: usize) -> Result; + fn commit_inner(&self, capacity: usize, grant_len: usize, used: usize); + + // Read Grants + + fn read(&self) -> Result<(usize, usize), ReadGrantError>; + fn release_inner(&self, used: usize); +} diff --git a/bbq2/src/traits/mod.rs b/bbq2/src/traits/mod.rs new file mode 100644 index 0000000..c1c216d --- /dev/null +++ b/bbq2/src/traits/mod.rs @@ -0,0 +1,4 @@ +pub mod bbqhdl; +pub mod coordination; +pub mod notifier; +pub mod storage; diff --git a/bbq2/src/traits/notifier/blocking.rs b/bbq2/src/traits/notifier/blocking.rs new file mode 100644 index 0000000..e171ac6 --- /dev/null +++ b/bbq2/src/traits/notifier/blocking.rs @@ -0,0 +1,14 @@ +use super::Notifier; +use const_init::ConstInit; + +pub struct Blocking; + +// Blocking performs no notification +impl Notifier for Blocking { + fn wake_one_consumer(&self) {} + fn wake_one_producer(&self) {} +} + +impl ConstInit for Blocking { + const INIT: Self = Blocking; +} diff --git a/bbq2/src/traits/notifier/maitake.rs b/bbq2/src/traits/notifier/maitake.rs new file mode 100644 index 0000000..ea8af99 --- /dev/null +++ b/bbq2/src/traits/notifier/maitake.rs @@ -0,0 +1,53 @@ +use const_init::ConstInit; +use maitake_sync::WaitCell; + +use super::{AsyncNotifier, Notifier}; + +/// A Maitake-Sync based SPSC notifier +/// +/// Usable for async context. Should not be used with multiple consumers or multiple producers +/// at the same time. +pub struct MaiNotSpsc { + not_empty: WaitCell, + not_full: WaitCell, +} + +impl MaiNotSpsc { + pub fn new() -> Self { + Self::INIT + } +} + +impl Default for MaiNotSpsc { + fn default() -> Self { + Self::new() + } +} + +impl ConstInit for MaiNotSpsc { + #[allow(clippy::declare_interior_mutable_const)] + const INIT: Self = Self { + not_empty: WaitCell::new(), + not_full: WaitCell::new(), + }; +} + +impl Notifier for MaiNotSpsc { + fn wake_one_consumer(&self) { + _ = self.not_empty.wake(); + } + + fn wake_one_producer(&self) { + _ = self.not_full.wake(); + } +} + +impl AsyncNotifier for MaiNotSpsc { + async fn wait_for_not_empty Option>(&self, f: F) -> T { + self.not_empty.wait_for_value(f).await.unwrap() + } + + async fn wait_for_not_full Option>(&self, f: F) -> T { + self.not_full.wait_for_value(f).await.unwrap() + } +} diff --git a/bbq2/src/traits/notifier/mod.rs b/bbq2/src/traits/notifier/mod.rs new file mode 100644 index 0000000..29c48c3 --- /dev/null +++ b/bbq2/src/traits/notifier/mod.rs @@ -0,0 +1,23 @@ +//! "Notification" functionality +//! +//! This functionality allows (or doesn't allow) for awaiting a read/write grant + +use const_init::ConstInit; + +#[cfg(feature = "maitake-sync-0_2")] +pub mod maitake; + +pub mod blocking; + +/// Non-async notifications +pub trait Notifier: ConstInit { + fn wake_one_consumer(&self); + fn wake_one_producer(&self); +} + +/// Async notifications +#[allow(async_fn_in_trait)] +pub trait AsyncNotifier: Notifier { + async fn wait_for_not_empty Option>(&self, f: F) -> T; + async fn wait_for_not_full Option>(&self, f: F) -> T; +} diff --git a/bbq2/src/traits/storage.rs b/bbq2/src/traits/storage.rs new file mode 100644 index 0000000..555a6ac --- /dev/null +++ b/bbq2/src/traits/storage.rs @@ -0,0 +1,146 @@ +//! "Storage" functionality +//! +//! This trait defines how the "data" part of the ring buffer is stored. +//! +//! This is typically "inline", e.g. stored as an owned `[u8; N]` array, +//! or heap allocated, e.g. as a `Box<[u8]>`. +//! +//! Inline storage is useful for static allocation, or cases where a fixed +//! buffer is useful. Heap storage is useful when you need dynamically sized +//! storage, e.g. of a size provided from CLI args or a configuration file +//! at runtime. + +use const_init::ConstInit; +use core::{cell::UnsafeCell, mem::MaybeUninit, ptr::NonNull}; + +#[cfg(feature = "alloc")] +use alloc::{boxed::Box, vec::Vec}; + +/// Trait for providing access to the storage +/// +/// Must always return the same ptr/len forever. +pub trait Storage { + fn ptr_len(&self) -> (NonNull, usize); +} + +/// A marker trait that the item is BOTH storage and can be initialized as a constant +/// +/// This allows for making `static` versions of the bbqueue. +pub trait ConstStorage: Storage + ConstInit {} +impl ConstStorage for T where T: Storage + ConstInit {} + +/// Inline/array-ful storage +#[repr(transparent)] +pub struct Inline { + buf: UnsafeCell>, +} + +unsafe impl Sync for Inline {} + +impl Inline { + pub const fn new() -> Self { + Self { + buf: UnsafeCell::new(MaybeUninit::zeroed()), + } + } +} + +impl Default for Inline { + fn default() -> Self { + Self::new() + } +} + +impl Storage for Inline { + fn ptr_len(&self) -> (NonNull, usize) { + if N == 0 { + return (NonNull::dangling(), N); + } + let ptr: *mut MaybeUninit<[u8; N]> = self.buf.get(); + let ptr: *mut u8 = ptr.cast(); + // SAFETY: UnsafeCell and MaybeUninit are both repr transparent, cast is + // sound to get to first byte element + let nn_ptr = unsafe { NonNull::new_unchecked(ptr) }; + (nn_ptr, N) + } +} + +#[allow(clippy::declare_interior_mutable_const)] +impl ConstInit for Inline { + const INIT: Self = Self::new(); +} + +impl Storage for &'_ Inline { + fn ptr_len(&self) -> (NonNull, usize) { + let len = N; + + let ptr: *mut MaybeUninit<[u8; N]> = self.buf.get(); + let ptr: *mut u8 = ptr.cast(); + let nn_ptr = unsafe { NonNull::new_unchecked(ptr) }; + + (nn_ptr, len) + } +} + +/// Boxed/heap-ful storage +#[cfg(feature = "alloc")] +pub struct BoxedSlice { + buf: Box<[UnsafeCell>]>, +} + +#[cfg(feature = "alloc")] +unsafe impl Sync for BoxedSlice {} + +#[cfg(feature = "alloc")] +impl BoxedSlice { + /// Create a new BoxedSlice with capacity `len`. + pub fn new(len: usize) -> Self { + let buf: Box<[UnsafeCell>]> = { + let mut v: Vec>> = Vec::with_capacity(len); + // Fields are already MaybeUninit, so valid capacity is valid len + unsafe { + v.set_len(len); + } + // We can zero each field now + v.iter_mut().for_each(|val| { + *val = UnsafeCell::new(MaybeUninit::zeroed()); + }); + v.into_boxed_slice() + }; + Self { buf } + } +} + +#[cfg(feature = "alloc")] +impl Storage for BoxedSlice { + fn ptr_len(&self) -> (NonNull, usize) { + let len = self.buf.len(); + + let ptr: *const UnsafeCell> = self.buf.as_ptr(); + let ptr: *mut MaybeUninit = UnsafeCell::raw_get(ptr); + let ptr: *mut u8 = ptr.cast(); + let nn_ptr = unsafe { NonNull::new_unchecked(ptr) }; + + (nn_ptr, len) + } +} + +#[cfg(test)] +mod test { + use super::{Inline, Storage}; + + #[test] + fn provenance_slice() { + let sli = Inline::<64>::new(); + let sli = &sli; + let (ptr, len) = <&Inline<64> as Storage>::ptr_len(&sli); + + // This test ensures that obtaining the pointer for ptr_len through a single + // element is sound + for i in 0..len { + unsafe { + ptr.as_ptr().add(i).write(i as u8); + } + } + } +}