1use std::fmt::{Debug, Formatter};
14use std::marker::PhantomData;
15
16use super::{Location, LocationId};
17use crate::compile::builder::FlowState;
18use crate::location::{LocationKey, TopLevel};
19use crate::staging_util::Invariant;
20
21pub struct Process<'a, ProcessTag = ()> {
38 pub(crate) key: LocationKey,
39 pub(crate) flow_state: FlowState,
40 pub(crate) _phantom: Invariant<'a, ProcessTag>,
41}
42
43impl<P> Debug for Process<'_, P> {
44 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45 write!(f, "Process({})", self.key)
46 }
47}
48
49impl<P> Eq for Process<'_, P> {}
50impl<P> PartialEq for Process<'_, P> {
51 fn eq(&self, other: &Self) -> bool {
52 self.key == other.key && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
53 }
54}
55
56impl<P> Clone for Process<'_, P> {
57 fn clone(&self) -> Self {
58 Process {
59 key: self.key,
60 flow_state: self.flow_state.clone(),
61 _phantom: PhantomData,
62 }
63 }
64}
65
66impl<'a, P> super::dynamic::DynLocation for Process<'a, P> {
67 fn dyn_id(&self) -> LocationId {
68 LocationId::Process(self.key)
69 }
70
71 fn flow_state(&self) -> &FlowState {
72 &self.flow_state
73 }
74
75 fn is_top_level() -> bool {
76 true
77 }
78
79 fn multiversioned(&self) -> bool {
80 false }
82
83 fn cluster_consistency() -> Option<super::dynamic::ClusterConsistency> {
84 None
85 }
86}
87
88impl<'a, P> Location<'a> for Process<'a, P> {
89 type Root = Self;
90
91 type DropConsistency = Self;
92
93 fn consistency() -> Option<super::dynamic::ClusterConsistency> {
94 None
95 }
96
97 fn root(&self) -> Self::Root {
98 self.clone()
99 }
100
101 fn drop_consistency(&self) -> Self::DropConsistency {
102 self.clone()
103 }
104
105 fn from_drop_consistency(l2: Self::DropConsistency) -> Self {
106 l2
107 }
108}
109
110impl<'a, P> TopLevel<'a> for Process<'a, P> {}
111
112#[cfg(feature = "sim")]
113impl<'a, P> Process<'a, P> {
114 #[expect(clippy::type_complexity, reason = "stream markers")]
119 pub fn sim_input<
120 T,
121 O: crate::live_collections::stream::Ordering,
122 R: crate::live_collections::stream::Retries,
123 >(
124 &self,
125 ) -> (
126 crate::sim::SimSender<T, O, R>,
127 crate::live_collections::stream::Stream<
128 T,
129 Self,
130 crate::live_collections::boundedness::Unbounded,
131 O,
132 R,
133 >,
134 )
135 where
136 T: serde::Serialize + serde::de::DeserializeOwned,
137 {
138 use crate::location::dynamic::DynLocation;
139
140 let external_location: super::External<'a, ()> = super::External {
141 key: LocationKey::FIRST,
142 flow_state: self.flow_state().clone(),
143 _phantom: PhantomData,
144 };
145
146 let (external, stream) = self.source_external_bincode(&external_location);
147
148 (crate::sim::SimSender(external.port_id, PhantomData), stream)
149 }
150
151 #[expect(clippy::type_complexity, reason = "stream markers")]
158 pub fn sim_atomic_input<
159 T,
160 O: crate::live_collections::stream::Ordering,
161 R: crate::live_collections::stream::Retries,
162 >(
163 &self,
164 ) -> (
165 crate::sim::SimAtomicSender<T, O, R>,
166 crate::live_collections::stream::Stream<
167 T,
168 super::Atomic<Self>,
169 crate::live_collections::boundedness::Unbounded,
170 O,
171 R,
172 >,
173 )
174 where
175 T: 'a + serde::Serialize + serde::de::DeserializeOwned,
176 {
177 use std::marker::PhantomData;
178
179 use stageleft::quote_type;
180 use tokio_util::codec::LengthDelimitedCodec;
181
182 use crate::compile::ir::{DebugInstantiate, HydroNode, HydroRoot};
183 use crate::live_collections::boundedness::Unbounded;
184 use crate::live_collections::stream::Stream;
185 use crate::location::dynamic::DynLocation;
186 use crate::location::tick::Atomic;
187 use crate::location::{Location, LocationKey, NetworkHint, Tick};
188 use crate::staging_util::get_this_crate;
189
190 let id = self.flow_state().borrow_mut().next_clock_id();
191 let atomic_location = Atomic {
192 tick: Tick {
193 id,
194 l: self.clone(),
195 },
196 };
197
198 let external_location: super::External<'a, ()> = super::External {
199 key: LocationKey::FIRST,
200 flow_state: self.flow_state().clone(),
201 _phantom: PhantomData,
202 };
203
204 let next_external_port_id = self.flow_state().borrow_mut().next_external_port();
205
206 let root = get_this_crate();
207 let in_t_type = quote_type::<T>();
208
209 let deser_fn: syn::Expr = syn::parse_quote! {
210 |res| {
211 let b = res.unwrap();
212 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
213 }
214 };
215
216 let stream: Stream<T, Atomic<Self>, Unbounded, O, R> = Stream::new(
218 atomic_location.clone(),
219 HydroNode::ExternalInput {
220 from_external_key: external_location.key,
221 from_port_id: next_external_port_id,
222 from_many: false,
223 codec_type: quote_type::<LengthDelimitedCodec>().into(),
224 port_hint: NetworkHint::Auto,
225 instantiate_fn: DebugInstantiate::Building,
226 deserialize_fn: Some(deser_fn.into()),
227 metadata: atomic_location.new_node_metadata(Stream::<
228 T,
229 Atomic<Self>,
230 Unbounded,
231 O,
232 R,
233 >::collection_kind()),
234 },
235 );
236
237 let empty_stream: Stream<T, Self, _, _, _> = self.source_iter(stageleft::q!([]));
239 let out_t_type = quote_type::<T>();
240 let ser_fn: syn::Expr = syn::parse_quote! {
241 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
242 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
243 )
244 };
245 self.flow_state()
246 .borrow_mut()
247 .push_root(HydroRoot::SendExternal {
248 to_external_key: external_location.key,
249 to_port_id: next_external_port_id,
250 to_many: false,
251 unpaired: false,
252 serialize_fn: Some(ser_fn.into()),
253 instantiate_fn: DebugInstantiate::Building,
254 input: Box::new(empty_stream.ir_node.replace(HydroNode::Placeholder)),
255 op_metadata: crate::compile::ir::HydroIrOpMetadata::new(),
256 });
257
258 (
259 crate::sim::SimAtomicSender(crate::sim::SimSender(next_external_port_id, PhantomData)),
260 stream,
261 )
262 }
263}