Skip to main content

hydro_lang/location/
process.rs

//! Definition of the [`Process`] location type, representing a single-node
//! compute location in a distributed Hydro program.
//!
//! A [`Process`] is the simplest kind of location: it corresponds to exactly one
//! machine (or OS process), and all live collections placed on it are materialized
//! on that single node. Use a process when the computation does not need to be
//! replicated or partitioned across multiple nodes.
//!
//! Processes are created via [`FlowBuilder::process`](crate::compile::builder::FlowBuilder::process)
//! and are parameterized by a **tag type** (`ProcessTag`) that lets the type
//! system distinguish different processes at compile time.
12
13use std::fmt::{Debug, Formatter};
14use std::marker::PhantomData;
15
16use super::{Location, LocationId};
17use crate::compile::builder::FlowState;
18use crate::location::{LocationKey, TopLevel};
19use crate::staging_util::Invariant;
20
21/// A single-node location in a distributed Hydro program.
22///
23/// `Process` represents exactly one machine (or OS process) and is one of the
24/// core location types that implements the [`Location`] trait. Live collections
25/// placed on a `Process` are materialized entirely on that single node.
26///
27/// The type parameter `ProcessTag` is a compile-time marker that differentiates
28/// distinct processes in the same dataflow graph (e.g. `Process<'a, Leader>` vs
29/// `Process<'a, Follower>`). It defaults to `()` when only one process is
30/// needed.
31///
32/// # Creating a Process
33/// ```rust,ignore
34/// let mut flow = FlowBuilder::new();
35/// let node = flow.process::<MyTag>();
36/// ```
37pub struct Process<'a, ProcessTag = ()> {
38    pub(crate) key: LocationKey,
39    pub(crate) flow_state: FlowState,
40    pub(crate) _phantom: Invariant<'a, ProcessTag>,
41}
42
43impl<P> Debug for Process<'_, P> {
44    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45        write!(f, "Process({})", self.key)
46    }
47}
48
49impl<P> Eq for Process<'_, P> {}
50impl<P> PartialEq for Process<'_, P> {
51    fn eq(&self, other: &Self) -> bool {
52        self.key == other.key && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
53    }
54}
55
56impl<P> Clone for Process<'_, P> {
57    fn clone(&self) -> Self {
58        Process {
59            key: self.key,
60            flow_state: self.flow_state.clone(),
61            _phantom: PhantomData,
62        }
63    }
64}
65
66impl<'a, P> super::dynamic::DynLocation for Process<'a, P> {
67    fn dyn_id(&self) -> LocationId {
68        LocationId::Process(self.key)
69    }
70
71    fn flow_state(&self) -> &FlowState {
72        &self.flow_state
73    }
74
75    fn is_top_level() -> bool {
76        true
77    }
78
79    fn multiversioned(&self) -> bool {
80        false // processes are always single-versioned
81    }
82
83    fn cluster_consistency() -> Option<super::dynamic::ClusterConsistency> {
84        None
85    }
86}
87
88impl<'a, P> Location<'a> for Process<'a, P> {
89    type Root = Self;
90
91    type DropConsistency = Self;
92
93    fn consistency() -> Option<super::dynamic::ClusterConsistency> {
94        None
95    }
96
97    fn root(&self) -> Self::Root {
98        self.clone()
99    }
100
101    fn drop_consistency(&self) -> Self::DropConsistency {
102        self.clone()
103    }
104
105    fn from_drop_consistency(l2: Self::DropConsistency) -> Self {
106        l2
107    }
108}
109
110impl<'a, P> TopLevel<'a> for Process<'a, P> {}
111
#[cfg(feature = "sim")]
impl<'a, P> Process<'a, P> {
    /// Builds the placeholder [`super::External`] location that simulated inputs on
    /// this process are attached to.
    ///
    /// Both [`Self::sim_input`] and [`Self::sim_atomic_input`] attach their port to an
    /// external location with key [`LocationKey::FIRST`] that shares this process's
    /// flow state. Constructing it in one place keeps the two entry points from
    /// drifting apart.
    fn sim_external_location(&self) -> super::External<'a, ()> {
        super::External {
            key: LocationKey::FIRST,
            flow_state: self.flow_state.clone(),
            _phantom: PhantomData,
        }
    }

    /// Sets up a simulated input port on this location for testing.
    ///
    /// Returns a handle to send messages to the location as well as a stream
    /// of received messages. This is only available when the `sim` feature is enabled.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    pub fn sim_input<
        T,
        O: crate::live_collections::stream::Ordering,
        R: crate::live_collections::stream::Retries,
    >(
        &self,
    ) -> (
        crate::sim::SimSender<T, O, R>,
        crate::live_collections::stream::Stream<
            T,
            Self,
            crate::live_collections::boundedness::Unbounded,
            O,
            R,
        >,
    )
    where
        T: serde::Serialize + serde::de::DeserializeOwned,
    {
        let external_location = self.sim_external_location();

        let (external, stream) = self.source_external_bincode(&external_location);

        (crate::sim::SimSender(external.port_id, PhantomData), stream)
    }

    /// Sets up a simulated atomic input port on this process for testing.
    ///
    /// Unlike [`Self::sim_input`], this returns a [`super::Atomic`] stream and a
    /// [`crate::sim::SimAtomicSender`] that synchronously sends data to that stream. The sender
    /// guarantees that after a value is sent, it will be immediately read by any downstream
    /// consumers without any buffering.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    pub fn sim_atomic_input<
        T,
        O: crate::live_collections::stream::Ordering,
        R: crate::live_collections::stream::Retries,
    >(
        &self,
    ) -> (
        crate::sim::SimAtomicSender<T, O, R>,
        crate::live_collections::stream::Stream<
            T,
            super::Atomic<Self>,
            crate::live_collections::boundedness::Unbounded,
            O,
            R,
        >,
    )
    where
        T: 'a + serde::Serialize + serde::de::DeserializeOwned,
    {
        // `PhantomData`, `Location` and `LocationKey` are already imported at module
        // level; only names not otherwise in scope are imported here.
        use stageleft::quote_type;
        use tokio_util::codec::LengthDelimitedCodec;

        use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
        use crate::live_collections::boundedness::Unbounded;
        use crate::live_collections::stream::Stream;
        use crate::location::dynamic::DynLocation;
        use crate::location::tick::Atomic;
        use crate::location::{NetworkHint, Tick};
        use crate::staging_util::get_this_crate;

        // Allocate a fresh clock id and wrap this process in an `Atomic` tick location;
        // the input stream below is created directly at that location.
        let clock_id = self.flow_state().borrow_mut().next_clock_id();
        let atomic_location = Atomic {
            tick: Tick {
                id: clock_id,
                l: self.clone(),
            },
        };

        let external_location = self.sim_external_location();

        let port_id = self.flow_state().borrow_mut().next_external_port();

        let root = get_this_crate();
        // The deserializer (receive side) and the serializer (dummy send side) are
        // both specialized to the element type `T`, so quote it once and reuse it.
        let t_type = quote_type::<T>();

        let deser_fn: syn::Expr = syn::parse_quote! {
            |res| {
                let b = res.unwrap();
                #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap()
            }
        };

        // Create the source stream at the Atomic location directly
        let stream: Stream<T, Atomic<Self>, Unbounded, O, R> = Stream::new(
            atomic_location.clone(),
            HydroNode::ExternalInput {
                from_external_key: external_location.key,
                from_port_id: port_id,
                from_many: false,
                codec_type: quote_type::<LengthDelimitedCodec>().into(),
                port_hint: NetworkHint::Auto,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: Some(deser_fn.into()),
                metadata: atomic_location.new_node_metadata(
                    Stream::<T, Atomic<Self>, Unbounded, O, R>::collection_kind(),
                ),
            },
        );

        // Wire up a dummy send side (empty stream) so the external port is paired
        let empty_stream: Stream<T, Self, _, _, _> = self.source_iter(stageleft::q!([]));
        let ser_fn: syn::Expr = syn::parse_quote! {
            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
            )
        };
        self.flow_state()
            .borrow_mut()
            .push_root(HydroRoot::SendExternal {
                to_external_key: external_location.key,
                to_port_id: port_id,
                to_many: false,
                unpaired: false,
                serialize_fn: Some(ser_fn.into()),
                instantiate_fn: DebugInstantiate::Building,
                input: Box::new(empty_stream.ir_node.replace(HydroNode::Placeholder)),
                op_metadata: HydroIrOpMetadata::new(),
            });

        (
            crate::sim::SimAtomicSender(crate::sim::SimSender(port_id, PhantomData)),
            stream,
        )
    }
}