use core::time::Duration;
use mnemos_trace_proto::{HostRequest, TraceEvent};
use portable_atomic::{AtomicBool, AtomicU64, AtomicU8, AtomicUsize, Ordering};
pub use tracing::*;
use tracing::{metadata::LevelFilter, subscriber::Interest};
use tracing_serde_structured::{AsSerde, SerializeRecordFields, SerializeSpanFields};
use crate::{comms::bbq, services::serial_mux};
pub struct SerialSubscriber {
tx: bbq::SpscProducer,
isr_tx: bbq::SpscProducer,
next_id: AtomicU64,
in_send: AtomicBool,
shared: &'static Shared,
}
struct Shared {
dropped_events: AtomicUsize,
dropped_spans: AtomicUsize,
dropped_metas: AtomicUsize,
dropped_span_activity: AtomicUsize,
max_level: AtomicU8,
}
static SHARED: Shared = Shared {
dropped_events: AtomicUsize::new(0),
dropped_spans: AtomicUsize::new(0),
dropped_metas: AtomicUsize::new(0),
dropped_span_activity: AtomicUsize::new(0),
max_level: AtomicU8::new(level_to_u8(LevelFilter::OFF)),
};
impl SerialSubscriber {
pub async fn start(k: &'static crate::Kernel, settings: SerialTraceSettings) -> Self {
SHARED
.max_level
.store(level_to_u8(settings.initial_level), Ordering::Release);
let (tx, rx) = bbq::new_spsc_channel(settings.tracebuf_capacity).await;
let (isr_tx, isr_rx) = bbq::new_spsc_channel(settings.tracebuf_capacity).await;
let subscriber = Self {
tx,
isr_tx,
next_id: AtomicU64::new(1),
in_send: AtomicBool::new(false),
shared: &SHARED,
};
k.spawn(async move {
let port = serial_mux::PortHandle::open(k, settings.port, settings.sendbuf_capacity)
.await
.expect("cannot initialize serial tracing, cannot open port 3!");
Self::worker(&SHARED, rx, isr_rx, port, k).await
})
.await;
subscriber
}
fn send_event<'a>(&self, sz: usize, event: impl FnOnce() -> TraceEvent<'a>) -> bool {
self.in_send.store(true, Ordering::Release);
let tx = if crate::isr::Isr::is_in_isr() {
&self.isr_tx
} else {
&self.tx
};
let Some(mut wgr) = tx.send_grant_exact_sync(sz) else {
return false;
};
let ev = event();
let len = match postcard::to_slice_cobs(&ev, &mut wgr[..]) {
Ok(encoded) => encoded.len(),
Err(_) => 0,
};
wgr.commit(len);
self.in_send.store(false, Ordering::Release);
len > 0
}
async fn worker(
shared: &'static Shared,
rx: bbq::Consumer,
isr_rx: bbq::Consumer,
port: serial_mux::PortHandle,
k: &'static crate::Kernel,
) {
use futures::FutureExt;
use maitake::time;
use postcard::accumulator::{CobsAccumulator, FeedResult};
let mut cobs_buf: CobsAccumulator<16> = CobsAccumulator::new();
let mut read_level = |rgr: bbq::GrantR| {
let mut window = &rgr[..];
let len = rgr.len();
'cobs: while !window.is_empty() {
window = match cobs_buf.feed_ref::<HostRequest>(window) {
FeedResult::Consumed => break 'cobs,
FeedResult::OverFull(new_wind) => new_wind,
FeedResult::DeserError(new_wind) => new_wind,
FeedResult::Success { data, remaining } => {
match data {
HostRequest::SetMaxLevel(lvl) => {
let level = lvl
.map(|lvl| lvl as u8)
.unwrap_or(level_to_u8(LevelFilter::OFF));
shared.max_level.store(level, Ordering::Release);
tracing::callsite::rebuild_interest_cache();
info!(
message = %"hello from mnemOS",
version = %env!("CARGO_PKG_VERSION"),
git = %format_args!(
"{}@{}",
env!("VERGEN_GIT_BRANCH"),
env!("VERGEN_GIT_DESCRIBE")
),
target = %env!("VERGEN_CARGO_TARGET_TRIPLE"),
profile = %if cfg!(debug_assertions) { "debug" } else { "release" },
);
}
}
remaining
}
};
}
rgr.release(len);
};
let mut encode_buf = [0u8; 32];
#[allow(clippy::never_loop)]
loop {
'idle: loop {
let heartbeat = {
let level = u8_to_level(shared.max_level.load(Ordering::Acquire))
.into_level()
.as_ref()
.map(AsSerde::as_serde);
postcard::to_slice_cobs(&TraceEvent::Heartbeat(level), &mut encode_buf[..])
.expect("failed to encode heartbeat msg")
};
port.send(heartbeat).await;
if let Ok(rgr) = k
.timer()
.timeout(time::Duration::from_secs(1), port.consumer().read_grant())
.await
{
read_level(rgr);
let ack = {
let level = u8_to_level(shared.max_level.load(Ordering::Acquire))
.into_level()
.as_ref()
.map(AsSerde::as_serde);
postcard::to_slice_cobs(&TraceEvent::Heartbeat(level), &mut encode_buf[..])
.expect("failed to encode heartbeat msg")
};
port.send(ack).await;
break 'idle;
}
}
loop {
futures::select_biased! {
rgr = isr_rx.read_grant().fuse() => {
let len = rgr.len();
port.send(&rgr[..]).await;
rgr.release(len);
},
rgr = rx.read_grant().fuse() => {
let len = rgr.len();
port.send(&rgr[..]).await;
rgr.release(len);
},
rgr = port.consumer().read_grant().fuse() => {
read_level(rgr);
},
_ = k.sleep(Duration::from_secs(3)).fuse() => {
let new_spans = shared.dropped_spans.swap(0, Ordering::Relaxed);
let events = shared.dropped_events.swap(0, Ordering::Relaxed);
let span_activity = shared.dropped_events.swap(0, Ordering::Relaxed);
let metas = shared.dropped_metas.swap(0, Ordering::Relaxed);
if new_spans + events + span_activity + metas > 0 {
let buf = {
let ev = TraceEvent::Discarded {
new_spans,
events,
span_activity,
metas,
};
postcard::to_slice_cobs(&ev, &mut encode_buf[..])
.expect("failed to encode dropped msg")
};
port.send(buf).await;
}
}
}
}
}
}
#[inline]
fn level_enabled(&self, metadata: &Metadata<'_>) -> bool {
metadata.level() <= &u8_to_level(self.shared.max_level.load(Ordering::Relaxed))
}
}
const BIGMSG_GRANT_SZ: usize = 256;
const TINYMSG_GRANT_SZ: usize = 8;
impl Subscriber for SerialSubscriber {
fn enabled(&self, metadata: &Metadata<'_>) -> bool {
self.level_enabled(metadata) && !self.in_send.load(Ordering::Acquire)
}
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
if !self.level_enabled(metadata) {
return Interest::never();
}
let id = metadata.callsite();
let sent = self.send_event(BIGMSG_GRANT_SZ, || TraceEvent::RegisterMeta {
id: mnemos_trace_proto::MetaId::from(id),
meta: metadata.as_serde(),
});
if !sent {
self.shared.dropped_metas.fetch_add(1, Ordering::Relaxed);
return Interest::never();
}
if metadata.target().starts_with("kernel::comms::bbq") {
return Interest::sometimes();
}
Interest::always()
}
fn max_level_hint(&self) -> Option<LevelFilter> {
Some(u8_to_level(self.shared.max_level.load(Ordering::Relaxed)))
}
fn new_span(&self, span: &span::Attributes<'_>) -> span::Id {
let id = {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
span::Id::from_u64(id)
};
if !self.send_event(BIGMSG_GRANT_SZ, || TraceEvent::NewSpan {
id: id.as_serde(),
meta: span.metadata().callsite().into(),
parent: span.parent().map(AsSerde::as_serde),
is_root: span.is_root(),
fields: SerializeSpanFields::Ser(span.values()),
}) {
self.shared.dropped_spans.fetch_add(1, Ordering::Relaxed);
}
id
}
fn record(&self, _: &span::Id, _: &span::Record<'_>) {
}
fn enter(&self, span: &span::Id) {
if !self.send_event(TINYMSG_GRANT_SZ, || TraceEvent::Enter(span.as_serde())) {
self.shared
.dropped_span_activity
.fetch_add(1, Ordering::Relaxed);
}
}
fn exit(&self, span: &span::Id) {
if !self.send_event(TINYMSG_GRANT_SZ, || TraceEvent::Exit(span.as_serde())) {
self.shared
.dropped_span_activity
.fetch_add(1, Ordering::Relaxed);
}
}
fn record_follows_from(&self, _: &span::Id, _: &span::Id) {
}
fn event(&self, event: &Event<'_>) {
if !self.send_event(BIGMSG_GRANT_SZ, || TraceEvent::Event {
meta: event.metadata().callsite().into(),
fields: SerializeRecordFields::Ser(event),
parent: event.parent().map(AsSerde::as_serde),
}) {
self.shared.dropped_events.fetch_add(1, Ordering::Relaxed);
}
}
fn clone_span(&self, span: &span::Id) -> span::Id {
if !self.send_event(TINYMSG_GRANT_SZ, || TraceEvent::CloneSpan(span.as_serde())) {
self.shared
.dropped_span_activity
.fetch_add(1, Ordering::Relaxed);
}
span.clone()
}
fn try_close(&self, span: span::Id) -> bool {
if !self.send_event(TINYMSG_GRANT_SZ, || TraceEvent::DropSpan(span.as_serde())) {
self.shared
.dropped_span_activity
.fetch_add(1, Ordering::Relaxed);
}
false
}
}
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct SerialTraceSettings {
#[serde(default)]
pub enabled: bool,
#[serde(default = "SerialTraceSettings::default_port")]
pub port: u16,
#[serde(default = "SerialTraceSettings::default_sendbuf_capacity")]
pub sendbuf_capacity: usize,
#[serde(default = "SerialTraceSettings::default_tracebuf_capacity")]
pub tracebuf_capacity: usize,
#[serde(with = "level_filter")]
#[serde(default = "SerialTraceSettings::default_initial_level")]
pub initial_level: LevelFilter,
}
pub const fn level_to_u8(level: LevelFilter) -> u8 {
match level {
LevelFilter::TRACE => 0,
LevelFilter::DEBUG => 1,
LevelFilter::INFO => 2,
LevelFilter::WARN => 3,
LevelFilter::ERROR => 4,
LevelFilter::OFF => 5,
}
}
pub const fn u8_to_level(level: u8) -> LevelFilter {
match level {
0 => LevelFilter::TRACE,
1 => LevelFilter::DEBUG,
2 => LevelFilter::INFO,
3 => LevelFilter::WARN,
4 => LevelFilter::ERROR,
_ => LevelFilter::OFF,
}
}
pub fn level_to_str(level: LevelFilter) -> &'static str {
match level {
LevelFilter::TRACE => "trace",
LevelFilter::DEBUG => "debug",
LevelFilter::INFO => "info",
LevelFilter::WARN => "warn",
LevelFilter::ERROR => "error",
LevelFilter::OFF => "off",
}
}
pub fn str_to_level(level: &str) -> Option<LevelFilter> {
level.parse().ok()
}
mod level_filter {
use serde::{de::Visitor, Deserializer, Serializer};
use super::{level_to_str, str_to_level};
pub fn serialize<S>(lf: &tracing::metadata::LevelFilter, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let lf_str = level_to_str(*lf);
s.serialize_str(lf_str)
}
pub fn deserialize<'de, D>(d: D) -> Result<tracing::metadata::LevelFilter, D::Error>
where
D: Deserializer<'de>,
{
struct LFVisitor;
impl<'de> Visitor<'de> for LFVisitor {
type Value = tracing::metadata::LevelFilter;
fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
formatter.write_str("a level filter as a u8 value")
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
str_to_level(v).ok_or_else(|| {
E::unknown_variant(v, &["trace", "debug", "info", "warn", "error", "off"])
})
}
}
d.deserialize_str(LFVisitor)
}
}
impl SerialTraceSettings {
pub const DEFAULT_PORT: u16 = serial_mux::WellKnown::BinaryTracing as u16;
pub const DEFAULT_SENDBUF_CAPACITY: usize = BIGMSG_GRANT_SZ * 4;
pub const DEFAULT_TRACEBUF_CAPACITY: usize = Self::DEFAULT_SENDBUF_CAPACITY * 4;
pub const DEFAULT_INITIAL_LEVEL: LevelFilter = LevelFilter::INFO;
const fn default_port() -> u16 {
Self::DEFAULT_PORT
}
const fn default_sendbuf_capacity() -> usize {
Self::DEFAULT_SENDBUF_CAPACITY
}
const fn default_tracebuf_capacity() -> usize {
Self::DEFAULT_TRACEBUF_CAPACITY
}
const fn default_initial_level() -> LevelFilter {
Self::DEFAULT_INITIAL_LEVEL
}
#[must_use]
pub const fn new() -> Self {
Self {
enabled: true, port: Self::DEFAULT_PORT,
sendbuf_capacity: Self::DEFAULT_SENDBUF_CAPACITY,
tracebuf_capacity: Self::DEFAULT_TRACEBUF_CAPACITY,
initial_level: Self::DEFAULT_INITIAL_LEVEL,
}
}
#[must_use]
pub fn with_port(self, port: impl Into<u16>) -> Self {
Self {
port: port.into(),
..self
}
}
#[must_use]
pub fn with_initial_level(self, level: impl Into<LevelFilter>) -> Self {
Self {
initial_level: level.into(),
..self
}
}
#[must_use]
pub const fn with_sendbuf_capacity(self, capacity: usize) -> Self {
Self {
sendbuf_capacity: capacity,
..self
}
}
#[must_use]
pub const fn with_tracebuf_capacity(self, capacity: usize) -> Self {
Self {
tracebuf_capacity: capacity,
..self
}
}
}
impl Default for SerialTraceSettings {
fn default() -> Self {
Self::new()
}
}