use super::{Command, Event, Shared, Watch};
use crate::{
stats::{self, Unsent},
ToProto, WatchRequest,
};
use console_api as proto;
use proto::resources::resource;
use tokio::sync::{mpsc, Notify};
use futures::FutureExt;
use std::{
sync::{
atomic::{AtomicBool, Ordering::*},
Arc,
},
time::{Duration, Instant},
};
use tracing_core::{span::Id, Metadata};
mod id_data;
mod shrink;
use self::id_data::{IdData, Include};
use self::shrink::{ShrinkMap, ShrinkVec};
/// State owned by the aggregation task.
///
/// It consumes instrumentation [`Event`]s and client [`Command`]s, tracks the
/// current tasks, resources and async ops together with their stats, and
/// periodically publishes updates to subscribed watchers (see `Aggregator::run`).
pub(crate) struct Aggregator {
/// Instrumentation events; drained without awaiting in `run`.
events: mpsc::Receiver<Event>,
/// Client commands: instrument subscriptions, task-detail watches, pause and resume.
rpcs: mpsc::Receiver<Command>,
/// Period of the publish tick in `run`.
publish_interval: Duration,
/// Forwarded to `drop_closed` during cleanup; presumably how long closed items
/// are kept before eviction (TODO confirm against `id_data`).
retention: Duration,
/// Shared state; used here for the flush signal and the dropped-event counters.
shared: Arc<Shared>,
/// Subscribers receiving `instrument::Update`s.
watchers: ShrinkVec<Watch<proto::instrument::Update>>,
/// Per-task subscribers receiving `TaskDetails`, keyed by task id.
details_watchers: ShrinkMap<Id, Vec<Watch<proto::tasks::TaskDetails>>>,
/// Every metadata registered so far; sent in full to each new subscriber.
all_metadata: ShrinkVec<proto::register_metadata::NewMetadata>,
/// Metadata registered since the last publish; taken by `publish`.
new_metadata: Vec<proto::register_metadata::NewMetadata>,
/// Known tasks.
tasks: IdData<Task>,
/// Stats for each task, keyed by the same id as `tasks`.
task_stats: IdData<Arc<stats::TaskStats>>,
/// Known resources.
resources: IdData<Resource>,
/// Stats for each resource, keyed by the same id as `resources`.
resource_stats: IdData<Arc<stats::ResourceStats>>,
/// Known async ops.
async_ops: IdData<AsyncOp>,
/// Stats for each async op, keyed by the same id as `async_ops`.
async_op_stats: IdData<Arc<stats::AsyncOpStats>>,
/// Poll ops recorded since the last publish; cloned for snapshots and taken for deltas.
poll_ops: Vec<proto::resources::PollOp>,
/// Whether publish ticks currently send updates (`Live`) or not (`Paused`).
temporality: Temporality,
/// Anchor used to convert `Instant`s into protobuf timestamps.
base_time: stats::TimeAnchor,
}
/// Signal that asks the aggregator to drain buffered events before its next
/// publish tick.
#[derive(Debug, Default)]
pub(crate) struct Flush {
/// Awaited by `Aggregator::run`; notified by `Flush::trigger`.
pub(crate) should_flush: Notify,
/// Set by `trigger` and cleared by `has_flushed`, so at most one
/// notification is issued per drain.
triggered: AtomicBool,
}
/// Whether the aggregator publishes updates on its interval tick.
#[derive(Debug)]
enum Temporality {
/// Publish ticks send updates to watchers.
Live,
/// Publish ticks are ignored; events are still drained and aggregated.
Paused,
}
/// Aggregator-side record of an instrumented resource.
struct Resource {
/// Resource id.
id: Id,
/// `true` while this record has not been reported; cleared by `take_unsent`.
is_dirty: AtomicBool,
/// Id of the parent resource, if any.
parent_id: Option<Id>,
/// Span metadata the resource was created from.
metadata: &'static Metadata<'static>,
/// Name of the concrete resource type.
concrete_type: String,
/// Resource kind, as reported on the wire.
kind: resource::Kind,
/// Source location, if known.
location: Option<proto::Location>,
/// Forwarded as-is to `proto::resources::Resource::is_internal`.
is_internal: bool,
}
/// Aggregator-side record of a spawned task.
struct Task {
/// Task id.
id: Id,
/// `true` while this record has not been reported; cleared by `take_unsent`.
is_dirty: AtomicBool,
/// Span metadata the task was spawned with.
metadata: &'static Metadata<'static>,
/// Recorded fields, already in wire format.
fields: Vec<proto::Field>,
/// Spawn location, if known.
location: Option<proto::Location>,
}
/// Aggregator-side record of an async operation on a resource.
struct AsyncOp {
/// Async op id.
id: Id,
/// `true` while this record has not been reported; cleared by `take_unsent`.
is_dirty: AtomicBool,
/// Id of the parent async op, if any.
parent_id: Option<Id>,
/// Id of the resource this op operates on.
resource_id: Id,
/// Span metadata the op was created from.
metadata: &'static Metadata<'static>,
/// Source description, forwarded as `proto::async_ops::AsyncOp::source`.
source: String,
}
impl Aggregator {
    /// Builds an aggregator that consumes `events` and `rpcs`.
    ///
    /// The publish interval and retention period are copied from `builder`.
    /// Every per-item collection starts empty and the aggregator starts live
    /// (unpaused). `base_time` anchors every timestamp this aggregator emits.
    pub(crate) fn new(
        events: mpsc::Receiver<Event>,
        rpcs: mpsc::Receiver<Command>,
        builder: &crate::Builder,
        shared: Arc<crate::Shared>,
        base_time: stats::TimeAnchor,
    ) -> Self {
        Self {
            shared,
            rpcs,
            publish_interval: builder.publish_interval,
            retention: builder.retention,
            events,
            watchers: Default::default(),
            details_watchers: Default::default(),
            all_metadata: Default::default(),
            new_metadata: Default::default(),
            tasks: IdData::default(),
            task_stats: IdData::default(),
            resources: IdData::default(),
            resource_stats: IdData::default(),
            async_ops: IdData::default(),
            async_op_stats: IdData::default(),
            poll_ops: Default::default(),
            temporality: Temporality::Live,
            base_time,
        }
    }

    /// Drives the aggregator until the RPC channel or the event channel closes.
    ///
    /// Each iteration waits for exactly one of:
    /// - the publish interval ticking (publish, unless paused),
    /// - a flush request via `shared.flush` (drain only, no publish),
    /// - a client [`Command`] (apply it, no publish).
    ///
    /// It then drains every event that is already buffered, publishes if a tick
    /// asked for it and someone is subscribed, and runs `cleanup_closed`.
    pub(crate) async fn run(mut self) {
        let mut publish = tokio::time::interval(self.publish_interval);
        loop {
            let should_send = tokio::select! {
                // Publish tick: send an update unless a client paused us.
                _ = publish.tick() => {
                    match self.temporality {
                        Temporality::Live => true,
                        Temporality::Paused => false,
                    }
                }

                // Flush request: wake up only to drain the event buffer.
                _ = self.shared.flush.should_flush.notified() => {
                    tracing::debug!("approaching capacity; draining buffer");
                    false
                }

                // Client command: apply it; publishing waits for the next tick.
                cmd = self.rpcs.recv() => {
                    match cmd {
                        Some(Command::Instrument(subscription)) => {
                            self.add_instrument_subscription(subscription);
                        },
                        Some(Command::WatchTaskDetail(watch_request)) => {
                            self.add_task_detail_subscription(watch_request);
                        },
                        Some(Command::Pause) => {
                            self.temporality = Temporality::Paused;
                        }
                        Some(Command::Resume) => {
                            self.temporality = Temporality::Live;
                        }
                        None => {
                            tracing::debug!("rpc channel closed, terminating");
                            return;
                        }
                    };
                    false
                }
            };

            // Drain buffered events with `now_or_never` instead of awaiting
            // `recv`, so this task is woken only by the three sources above and
            // not by every individual event. `now_or_never` yields `None` as
            // soon as no event is immediately available.
            let mut drained = false;
            while let Some(event) = self.events.recv().now_or_never() {
                match event {
                    Some(event) => {
                        self.update_state(event);
                        drained = true;
                    }
                    // The sending side is gone; no more events will arrive.
                    None => {
                        tracing::debug!("event channel closed; terminating");
                        return;
                    }
                };
            }

            if !self.watchers.is_empty() && should_send {
                self.publish();
            }
            self.cleanup_closed();
            if drained {
                // Re-arm the flush trigger now that the buffer was emptied.
                self.shared.flush.has_flushed();
            }
        }
    }

    /// Asks each item collection to drop its closed entries.
    ///
    /// Each `drop_closed` call receives the current time, the retention period
    /// and whether any instrument watcher is subscribed. The exact eviction rule
    /// lives in `id_data`.
    fn cleanup_closed(&mut self) {
        let now = Instant::now();
        let has_watchers = !self.watchers.is_empty();
        self.tasks
            .drop_closed(&mut self.task_stats, now, self.retention, has_watchers);
        self.resources
            .drop_closed(&mut self.resource_stats, now, self.retention, has_watchers);
        self.async_ops
            .drop_closed(&mut self.async_op_stats, now, self.retention, has_watchers);
    }

    /// Registers a new instrument subscriber.
    ///
    /// The subscriber first receives a full snapshot (`Include::All`) of tasks,
    /// resources, async ops and all metadata. It is kept only if that initial
    /// send succeeds.
    ///
    /// NOTE(review): building the snapshot also swaps the shared dropped-event
    /// counters to zero, so existing watchers will not see those counts.
    fn add_instrument_subscription(&mut self, subscription: Watch<proto::instrument::Update>) {
        tracing::debug!("new instrument subscription");
        let task_update = Some(self.task_update(Include::All));
        let resource_update = Some(self.resource_update(Include::All));
        let async_op_update = Some(self.async_op_update(Include::All));
        let now = Instant::now();
        let update = &proto::instrument::Update {
            task_update,
            resource_update,
            async_op_update,
            now: Some(self.base_time.to_timestamp(now)),
            new_metadata: Some(proto::RegisterMetadata {
                metadata: (*self.all_metadata).clone(),
            }),
        };
        // A failed initial send means the subscriber is already unusable.
        if subscription.update(update) {
            self.watchers.push(subscription)
        }
    }

    /// Builds the task portion of an update and resets the dropped-task counter.
    fn task_update(&mut self, include: Include) -> proto::tasks::TaskUpdate {
        proto::tasks::TaskUpdate {
            new_tasks: self.tasks.as_proto_list(include, &self.base_time),
            stats_update: self.task_stats.as_proto(include, &self.base_time),
            dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
        }
    }

    /// Builds the resource portion of an update and resets the dropped-resource counter.
    ///
    /// Pending poll ops are cloned for a full snapshot (`Include::All`). For a
    /// delta (`Include::UpdatedOnly`) they are taken, which empties the buffer.
    fn resource_update(&mut self, include: Include) -> proto::resources::ResourceUpdate {
        let new_poll_ops = match include {
            Include::All => self.poll_ops.clone(),
            Include::UpdatedOnly => std::mem::take(&mut self.poll_ops),
        };
        proto::resources::ResourceUpdate {
            new_resources: self.resources.as_proto_list(include, &self.base_time),
            stats_update: self.resource_stats.as_proto(include, &self.base_time),
            new_poll_ops,
            dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
        }
    }

    /// Builds the async-op portion of an update and resets the dropped-async-op counter.
    fn async_op_update(&mut self, include: Include) -> proto::async_ops::AsyncOpUpdate {
        proto::async_ops::AsyncOpUpdate {
            new_async_ops: self.async_ops.as_proto_list(include, &self.base_time),
            stats_update: self.async_op_stats.as_proto(include, &self.base_time),
            dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
        }
    }

    /// Registers a subscriber for one task's details.
    ///
    /// If the task has stats, a new channel of capacity `buffer` is created, its
    /// receiver is handed back through `stream_sender`, and the current
    /// histograms are sent as the first message. The watch is stored only if
    /// both steps succeed. If the task is unknown, `stream_sender` is dropped
    /// unused; presumably the RPC layer reports that as "not found".
    fn add_task_detail_subscription(
        &mut self,
        watch_request: WatchRequest<proto::tasks::TaskDetails>,
    ) {
        let WatchRequest {
            id,
            stream_sender,
            buffer,
        } = watch_request;
        tracing::debug!(id = ?id, "new task details subscription");
        if let Some(stats) = self.task_stats.get(&id) {
            let (tx, rx) = mpsc::channel(buffer);
            let subscription = Watch(tx);
            let now = Some(self.base_time.to_timestamp(Instant::now()));
            if stream_sender.send(rx).is_ok()
                && subscription.update(&proto::tasks::TaskDetails {
                    task_id: Some(id.clone().into()),
                    now,
                    poll_times_histogram: Some(stats.poll_duration_histogram()),
                    scheduled_times_histogram: Some(stats.scheduled_duration_histogram()),
                })
            {
                self.details_watchers
                    .entry(id.clone())
                    .or_insert_with(Vec::new)
                    .push(subscription);
            }
        }
    }

    /// Sends a delta update to every instrument watcher and fresh histograms to
    /// every task-details watcher.
    ///
    /// Watchers whose send fails are removed. Detail-watcher entries are also
    /// removed once their task has no stats or no watchers remain.
    fn publish(&mut self) {
        let new_metadata = if !self.new_metadata.is_empty() {
            Some(proto::RegisterMetadata {
                metadata: std::mem::take(&mut self.new_metadata),
            })
        } else {
            None
        };
        let task_update = Some(self.task_update(Include::UpdatedOnly));
        let resource_update = Some(self.resource_update(Include::UpdatedOnly));
        let async_op_update = Some(self.async_op_update(Include::UpdatedOnly));
        let update = proto::instrument::Update {
            now: Some(self.base_time.to_timestamp(Instant::now())),
            new_metadata,
            task_update,
            resource_update,
            async_op_update,
        };
        self.watchers
            .retain_and_shrink(|watch: &Watch<proto::instrument::Update>| watch.update(&update));

        // Borrow the stats separately so the closure can run while
        // `details_watchers` is borrowed mutably.
        let stats = &self.task_stats;
        self.details_watchers.retain_and_shrink(|id, watchers| {
            if let Some(task_stats) = stats.get(id) {
                let details = proto::tasks::TaskDetails {
                    task_id: Some(id.clone().into()),
                    now: Some(self.base_time.to_timestamp(Instant::now())),
                    poll_times_histogram: Some(task_stats.poll_duration_histogram()),
                    scheduled_times_histogram: Some(task_stats.scheduled_duration_histogram()),
                };
                watchers.retain(|watch| watch.update(&details));
                !watchers.is_empty()
            } else {
                false
            }
        });
    }

    /// Folds one instrumentation event into the aggregator's state.
    ///
    /// New tasks, resources and async ops are inserted as dirty (unsent)
    /// together with their stats handles. Metadata goes into both the full
    /// metadata list and the pending-delta list.
    fn update_state(&mut self, event: Event) {
        match event {
            Event::Metadata(meta) => {
                self.all_metadata.push(meta.into());
                self.new_metadata.push(meta.into());
            }
            Event::Spawn {
                id,
                metadata,
                stats,
                fields,
                location,
            } => {
                self.tasks.insert(
                    id.clone(),
                    Task {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        metadata,
                        fields,
                        location,
                    },
                );
                self.task_stats.insert(id, stats);
            }
            Event::Resource {
                id,
                parent_id,
                metadata,
                kind,
                concrete_type,
                location,
                is_internal,
                stats,
            } => {
                self.resources.insert(
                    id.clone(),
                    Resource {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        parent_id,
                        kind,
                        metadata,
                        concrete_type,
                        location,
                        is_internal,
                    },
                );
                self.resource_stats.insert(id, stats);
            }
            Event::PollOp {
                metadata,
                resource_id,
                op_name,
                async_op_id,
                task_id,
                is_ready,
            } => {
                // `poll_ops` is only drained by `publish`, and `publish` only
                // runs while at least one watcher is subscribed. Recording poll
                // ops with no subscriber would grow the buffer without bound
                // (and bloat the first subscriber's snapshot), so skip them.
                if self.watchers.is_empty() {
                    return;
                }
                let poll_op = proto::resources::PollOp {
                    metadata: Some(metadata.into()),
                    resource_id: Some(resource_id.into()),
                    name: op_name,
                    task_id: Some(task_id.into()),
                    async_op_id: Some(async_op_id.into()),
                    is_ready,
                };
                self.poll_ops.push(poll_op);
            }
            Event::AsyncResourceOp {
                id,
                source,
                resource_id,
                metadata,
                parent_id,
                stats,
            } => {
                self.async_ops.insert(
                    id.clone(),
                    AsyncOp {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        resource_id,
                        metadata,
                        source,
                        parent_id,
                    },
                );
                self.async_op_stats.insert(id, stats);
            }
        }
    }
}
impl Flush {
    /// Requests an early drain of the aggregator's event buffer.
    ///
    /// Only the call that flips `triggered` from `false` to `true` sends the
    /// notification. While a drain is already pending, further calls do nothing.
    pub(crate) fn trigger(&self) {
        let first_to_trigger = self
            .triggered
            .compare_exchange(false, true, AcqRel, Acquire)
            .is_ok();
        if first_to_trigger {
            self.should_flush.notify_one();
        }
    }

    /// Re-arms `trigger` after the aggregator has drained buffered events.
    fn has_flushed(&self) {
        // A failed exchange just means no drain was pending, so the result is ignored.
        let _ = self
            .triggered
            .compare_exchange(true, false, AcqRel, Acquire);
    }
}
impl<T: Clone> Watch<T> {
    /// Tries to enqueue a clone of `update` on this watcher's channel without waiting.
    ///
    /// Returns `false` when no slot can be reserved. Per tokio's `try_reserve`,
    /// that happens when the channel is full or closed. Callers drop watchers
    /// for which this returns `false`.
    fn update(&self, update: &T) -> bool {
        match self.0.try_reserve() {
            Ok(permit) => {
                permit.send(Ok(update.clone()));
                true
            }
            Err(_) => false,
        }
    }
}
impl ToProto for Task {
    type Output = proto::tasks::Task;

    /// Converts the task record into its wire form.
    ///
    /// Tasks are always reported as `Kind::Spawn` with an empty parent list.
    /// The time anchor is unused because no timestamps are embedded.
    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
        let spawn_kind = proto::tasks::task::Kind::Spawn as i32;
        proto::tasks::Task {
            id: Some(self.id.clone().into()),
            kind: spawn_kind,
            metadata: Some(self.metadata.into()),
            parents: Vec::new(),
            fields: self.fields.clone(),
            location: self.location.clone(),
        }
    }
}
impl Unsent for Task {
    /// Reports whether the task still has changes that have not been sent.
    fn is_unsent(&self) -> bool {
        self.is_dirty.load(Acquire)
    }

    /// Atomically clears the dirty flag, returning whether it was set.
    fn take_unsent(&self) -> bool {
        self.is_dirty.swap(false, AcqRel)
    }
}
impl ToProto for Resource {
    type Output = proto::resources::Resource;

    /// Converts the resource record into its wire form.
    ///
    /// The time anchor is unused because no timestamps are embedded.
    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
        let parent_resource_id = self.parent_id.clone().map(Into::into);
        proto::resources::Resource {
            id: Some(self.id.clone().into()),
            parent_resource_id,
            kind: Some(self.kind.clone()),
            metadata: Some(self.metadata.into()),
            concrete_type: self.concrete_type.clone(),
            location: self.location.clone(),
            is_internal: self.is_internal,
        }
    }
}
impl Unsent for Resource {
    /// Reports whether the resource still has changes that have not been sent.
    fn is_unsent(&self) -> bool {
        self.is_dirty.load(Acquire)
    }

    /// Atomically clears the dirty flag, returning whether it was set.
    fn take_unsent(&self) -> bool {
        self.is_dirty.swap(false, AcqRel)
    }
}
impl ToProto for AsyncOp {
    type Output = proto::async_ops::AsyncOp;

    /// Converts the async-op record into its wire form.
    ///
    /// The time anchor is unused because no timestamps are embedded.
    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
        let parent_async_op_id = self.parent_id.clone().map(Into::into);
        proto::async_ops::AsyncOp {
            id: Some(self.id.clone().into()),
            metadata: Some(self.metadata.into()),
            resource_id: Some(self.resource_id.clone().into()),
            source: self.source.clone(),
            parent_async_op_id,
        }
    }
}
impl Unsent for AsyncOp {
    /// Reports whether the async op still has changes that have not been sent.
    fn is_unsent(&self) -> bool {
        self.is_dirty.load(Acquire)
    }

    /// Atomically clears the dirty flag, returning whether it was set.
    fn take_unsent(&self) -> bool {
        self.is_dirty.swap(false, AcqRel)
    }
}