1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
//! Kernel Channels
//!
//! Kernel Channels are an async/await, MPSC queue, with a fixed backing storage (e.g. they are bounded).
use core::{cell::UnsafeCell, ops::Deref, ptr::NonNull};
use mnemos_alloc::containers::{Arc, ArrayBuf};
use spitebuf::MpScQueue;
pub use spitebuf::{DequeueError, EnqueueError};
use tracing;
/// A Kernel Channel
pub struct KChannel<T> {
q: Arc<MpScQueue<T, sealed::SpiteData<T>>>,
}
/// A Producer for a [KChannel].
///
/// A `KProducer` can be cloned multiple times, as the backing [KChannel]
/// is an MPSC queue.
pub struct KProducer<T> {
q: Arc<MpScQueue<T, sealed::SpiteData<T>>>,
}
/// A Consumer for a [KChannel].
///
/// Only a single `KConsumer` can exist at a time for each backing [KChannel],
/// as it is an MPSC queue. A `KConsumer` can also be used to create a new
/// [KProducer] instance.
pub struct KConsumer<T> {
q: Arc<MpScQueue<T, sealed::SpiteData<T>>>,
}
/// A type-erased [KProducer]. This is currently used only for implementing
/// the type-erased driver service registry.
///
/// It contains a VTable of functions necessary for operations while type-erased,
/// namely cloning and dropping.
pub(crate) struct ErasedKProducer {
erased_q: NonNull<MpScQueue<(), sealed::SpiteData<()>>>,
dropper: unsafe fn(NonNull<MpScQueue<(), sealed::SpiteData<()>>>),
cloner: unsafe fn(&Self) -> Self,
}
// KChannel
impl<T> Clone for KChannel<T> {
fn clone(&self) -> Self {
Self { q: self.q.clone() }
}
}
impl<T> Deref for KChannel<T> {
type Target = MpScQueue<T, sealed::SpiteData<T>>;
fn deref(&self) -> &Self::Target {
&self.q
}
}
fn right_size(mut cap: usize) -> usize {
if cap < 2 {
tracing::warn!(
was = cap,
now = 2,
"Increasing KChannel size to minimum size of two!"
);
}
let npot = cap.next_power_of_two();
if npot != cap {
tracing::warn!(
was = cap,
now = npot,
"Increasing KChannel size to a power of two!",
);
cap = npot;
}
cap
}
impl<T> KChannel<T> {
/// Create a new `KChannel<T>` with room for `count` elements on the given
/// Kernel's allocator.
///
/// `count` should be a power of two >= 2, or the capacity will be increased
/// automatically.
pub async fn new_async(count: usize) -> Self {
let ba = ArrayBuf::new_uninit(right_size(count)).await;
let q = MpScQueue::new(sealed::SpiteData { data: ba });
Self {
q: Arc::new(q).await,
}
}
/// Create a new `KChannel<T>` with room for `count` elements on the given
/// Kernel's allocator. Used for pre-async initialization steps
///
/// `count` should be a power of two >= 2, or the capacity will be increased
/// automatically.
pub fn new(count: usize) -> Self {
let ba = ArrayBuf::try_new_uninit(right_size(count)).unwrap();
let q = MpScQueue::new(sealed::SpiteData { data: ba });
Self {
q: Arc::try_new(q).map_err(drop).unwrap(),
}
}
/// Split the KChannel into a pair of [KProducer] and [KConsumer].
pub fn split(self) -> (KProducer<T>, KConsumer<T>) {
let q2 = self.q.clone();
let prod = KProducer { q: self.q };
let cons = KConsumer { q: q2 };
(prod, cons)
}
/// Convert the `KChannel` directly into a `KConsumer`
///
/// Because a [KConsumer] can be used to create a [KProducer], this method
/// is handy when the producer is not immediately needed.
pub fn into_consumer(self) -> KConsumer<T> {
KConsumer { q: self.q }
}
}
// KProducer
impl<T> Clone for KProducer<T> {
fn clone(&self) -> Self {
KProducer { q: self.q.clone() }
}
}
impl<T> KProducer<T> {
/// Attempt to immediately add an `item` to the end of the queue
///
/// Returns back the `item` if the queue is full
#[inline(always)]
pub fn enqueue_sync(&self, item: T) -> Result<(), EnqueueError<T>> {
self.q.enqueue_sync(item)
}
/// Attempt to asynchronously add an `item` to the end of the queue.
///
/// If the queue is full, this method will yield until there is space
/// available.
#[inline(always)]
pub async fn enqueue_async(&self, item: T) -> Result<(), EnqueueError<T>> {
self.q.enqueue_async(item).await
}
pub(crate) fn type_erase(self) -> ErasedKProducer {
let typed_q: NonNull<MpScQueue<T, sealed::SpiteData<T>>> = Arc::into_raw(self.q);
let erased_q: NonNull<MpScQueue<(), sealed::SpiteData<()>>> = typed_q.cast();
ErasedKProducer {
erased_q,
dropper: ErasedKProducer::drop_erased::<T>,
cloner: ErasedKProducer::clone_erased::<T>,
}
}
// TODO(eliza): replace this with "close on drop" behavior...
pub(crate) fn close(&mut self) {
self.q.close()
}
}
// KConsumer
impl<T> KConsumer<T> {
/// Immediately returns the item in the front of the queue, or
/// `None` if the queue is empty
#[inline(always)]
pub fn dequeue_sync(&self) -> Option<T> {
self.q.dequeue_sync()
}
/// Await the availability of an item from the front of the queue.
///
/// If no item is available, this function will yield until an item
/// has been enqueued
#[inline(always)]
pub async fn dequeue_async(&self) -> Result<T, DequeueError> {
self.q.dequeue_async().await
}
/// Create a [KProducer] for this KConsumer (and its backing [KChannel]).
pub fn producer(&self) -> KProducer<T> {
KProducer { q: self.q.clone() }
}
}
// ErasedKProducer
impl Clone for ErasedKProducer {
fn clone(&self) -> Self {
unsafe { (self.cloner)(self) }
}
}
impl ErasedKProducer {
/// Clone the ErasedKProducer. The resulting ErasedKProducer will be for the same
/// underlying [KChannel] and type.
pub(crate) fn clone_erased<T>(&self) -> Self {
let typed_q: NonNull<MpScQueue<T, sealed::SpiteData<T>>> = self.erased_q.cast();
unsafe {
Arc::increment_strong_count(typed_q.as_ptr());
}
Self {
erased_q: self.erased_q,
dropper: self.dropper,
cloner: self.cloner,
}
}
/// Clone the ErasedKProducer, while also re-typing to the unleaked [KProducer] type.
///
/// SAFETY:
///
/// The type `T` MUST be the same `T` that was used to create this ErasedKProducer,
/// otherwise undefined behavior will occur.
pub(crate) unsafe fn clone_typed<T>(&self) -> KProducer<T> {
let typed_q: NonNull<MpScQueue<T, sealed::SpiteData<T>>> = self.erased_q.cast();
let q = unsafe {
Arc::increment_strong_count(typed_q.as_ptr());
Arc::from_raw(typed_q)
};
KProducer { q }
}
/// Drop the ErasedKProducer, while also re-typing the leaked [KProducer] type.
///
/// SAFETY:
///
/// The type `T` MUST be the same `T` that was used to create this ErasedKProducer,
/// otherwise undefined behavior will occur.
pub(crate) unsafe fn drop_erased<T>(ptr: NonNull<MpScQueue<(), sealed::SpiteData<()>>>) {
let ptr = ptr.cast::<MpScQueue<T, sealed::SpiteData<T>>>();
let _ = Arc::from_raw(ptr);
}
}
impl Drop for ErasedKProducer {
fn drop(&mut self) {
unsafe {
(self.dropper)(self.erased_q);
}
}
}
pub(crate) mod sealed {
use mnemos_alloc::containers::ArrayBuf;
use super::*;
pub struct SpiteData<T> {
pub(crate) data: ArrayBuf<spitebuf::Cell<T>>,
}
unsafe impl<T: Sized> spitebuf::Storage<T> for SpiteData<T> {
fn buf(&self) -> (*const UnsafeCell<spitebuf::Cell<T>>, usize) {
let (ptr, len) = self.data.ptrlen();
(ptr.as_ptr().cast(), len)
}
}
}