Skip to main content

hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
15
16use std::fmt::Debug;
17use std::future::Future;
18use std::marker::PhantomData;
19use std::num::ParseIntError;
20use std::time::Duration;
21
22use bytes::{Bytes, BytesMut};
23use futures::stream::Stream as FuturesStream;
24use proc_macro2::Span;
25use quote::quote;
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use slotmap::{Key, new_key_type};
29use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
30use stageleft::{QuotedWithContext, q, quote_type};
31use syn::parse_quote;
32use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
33
34use crate::compile::ir::{
35    ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
36};
37use crate::forward_handle::ForwardRef;
38#[cfg(stageleft_runtime)]
39use crate::forward_handle::{CycleCollection, ForwardHandle};
40use crate::live_collections::boundedness::{Bounded, Unbounded};
41use crate::live_collections::keyed_stream::KeyedStream;
42use crate::live_collections::singleton::Singleton;
43use crate::live_collections::stream::{
44    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
45};
46#[cfg(stageleft_runtime)]
47use crate::location::dynamic::DynLocation;
48use crate::location::dynamic::{ClusterConsistency, LocationId};
49use crate::location::external_process::{
50    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
51};
52use crate::nondet::NonDet;
53use crate::properties::manual_proof;
54use crate::staging_util::get_this_crate;
55
56pub mod dynamic;
57
58pub mod external_process;
59pub use external_process::External;
60
61pub mod process;
62pub use process::Process;
63
64pub mod cluster;
65pub use cluster::Cluster;
66
67pub mod member_id;
68pub use member_id::{MemberId, TaglessMemberId};
69
70pub mod tick;
71pub use tick::{Atomic, Tick};
72
73/// An event indicating a change in membership status of a location in a group
74/// (e.g. a node in a [`Cluster`] or an external client connection).
75#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
76pub enum MembershipEvent {
77    /// The member has joined the group and is now active.
78    Joined,
79    /// The member has left the group and is no longer active.
80    Left,
81}
82
/// Configuration hint for the network transport behind an external connection.
///
/// The hint determines how the underlying TCP listener is set up when external
/// client connections are bound through methods such as
/// [`Location::bind_single_client`] or [`Location::bidi_external_many_bytes`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// Let the network configuration be selected automatically (e.g. an ephemeral port).
    Auto,
    /// Use a TCP port.
    ///
    /// - `None`: an available port is chosen automatically.
    /// - `Some(port)`: the given port number is used.
    TcpPort(Option<u16>),
}
98
99pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
100    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
101}
102
103#[stageleft::export(LocationKey)]
104new_key_type! {
105    /// A unique identifier for a clock tick.
106    pub struct LocationKey;
107}
108
109impl std::fmt::Display for LocationKey {
110    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111        write!(f, "loc{:?}", self.data()) // `"loc1v1"``
112    }
113}
114
115/// This is used for the ECS membership stream.
116/// TODO(mingwei): Make this more robust?
117impl std::str::FromStr for LocationKey {
118    type Err = Option<ParseIntError>;
119
120    fn from_str(s: &str) -> Result<Self, Self::Err> {
121        let nvn = s.strip_prefix("loc").ok_or(None)?;
122        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
123        let idx: u64 = idx.parse()?;
124        let ver: u64 = ver.parse()?;
125        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
126    }
127}
128
129impl LocationKey {
130    /// TODO(minwgei): Remove this and avoid magic key for simulator external.
131    /// The first location key, used by the simulator as the default external location.
132    pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); // `1v1`
133
134    /// A key for testing with index 1.
135    #[cfg(test)]
136    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000001)); // `1v255`
137
138    /// A key for testing with index 2.
139    #[cfg(test)]
140    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000002)); // `2v255`
141}
142
143/// This is used within `q!` code in docker and ECS.
144impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
145    type O = LocationKey;
146
147    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
148    where
149        Self: Sized,
150    {
151        let root = get_this_crate();
152        let n = Key::data(&self).as_ffi();
153        (
154            QuoteTokens {
155                prelude: None,
156                expr: Some(quote! {
157                    #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
158                }),
159            },
160            (),
161        )
162    }
163}
164
165/// A simple enum for the type of a root location.
166#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
167pub enum LocationType {
168    /// A process (single node).
169    Process,
170    /// A cluster (multiple nodes).
171    Cluster,
172    /// An external client.
173    External,
174}
175
176/// A top-level location (i.e. a [`Process`] or [`Cluster`]) that is outside a tick / atomic region.
177pub trait TopLevel<'a>: Location<'a> {}
178
179/// A location where data can be materialized and computation can be executed.
180///
181/// Hydro is a **global**, **distributed** programming model. This means that the data
182/// and computation in a Hydro program can be spread across multiple machines, data
183/// centers, and even continents. To achieve this, Hydro uses the concept of
184/// **locations** to keep track of _where_ data is located and computation is executed.
185///
186/// Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
187/// which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
188/// and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
189/// to allow live collections to be _moved_ between locations via network send/receive.
190///
191/// See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
192#[expect(
193    private_bounds,
194    reason = "only internal Hydro code can define location types"
195)]
196pub trait Location<'a>: DynLocation {
197    /// The root location type for this location.
198    ///
199    /// For top-level locations like [`Process`] and [`Cluster`], this is `Self`.
200    /// For nested locations like [`Tick`], this is the root location that contains it.
201    type Root: Location<'a>;
202
203    /// Location type with consistency guarantees dropped for the live collection on it.
204    type DropConsistency: Location<'a, DropConsistency = Self::DropConsistency>;
205
206    /// Returns the root location for this location.
207    ///
208    /// For top-level locations like [`Process`] and [`Cluster`], this returns `self`.
209    /// For nested locations like [`Tick`], this returns the root location that contains it.
210    fn root(&self) -> Self::Root;
211
212    /// This location but with consistency guarantees dropped for the live collection
213    fn drop_consistency(&self) -> Self::DropConsistency;
214    /// Gets the runtime enum variant for the current consistency level, if this is a cluster.
215    fn consistency() -> Option<ClusterConsistency>;
216
217    /// Updates the consistency guarantees to match that of the given location.
218    fn with_consistency_of<L2: Location<'a, DropConsistency = Self::DropConsistency>>(&self) -> L2 {
219        L2::from_drop_consistency(self.drop_consistency())
220    }
221
222    #[doc(hidden)]
223    fn from_drop_consistency(l2: Self::DropConsistency) -> Self;
224
225    /// Attempts to create a new [`Tick`] clock domain at this location.
226    ///
227    /// Returns `Some(Tick)` if this is a top-level location (like [`Process`] or [`Cluster`]),
228    /// or `None` if this location is already inside a tick (nested ticks are not supported).
229    ///
230    /// Prefer using [`Location::tick`] when you know the location is top-level.
231    fn try_tick(&self) -> Option<Tick<Self>> {
232        if Self::is_top_level() {
233            let id = self.flow_state().borrow_mut().next_clock_id();
234            Some(Tick {
235                id,
236                l: self.clone(),
237            })
238        } else {
239            None
240        }
241    }
242
243    /// Returns the unique identifier for this location.
244    fn id(&self) -> LocationId {
245        DynLocation::dyn_id(self)
246    }
247
248    /// Creates a new [`Tick`] clock domain at this location.
249    ///
250    /// A tick represents a logical clock that can be used to batch streaming data
251    /// into discrete time steps. This is useful for implementing iterative algorithms
252    /// or for synchronizing data across multiple streams.
253    ///
254    /// # Example
255    /// ```rust
256    /// # #[cfg(feature = "deploy")] {
257    /// # use hydro_lang::prelude::*;
258    /// # use futures::StreamExt;
259    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
260    /// let tick = process.tick();
261    /// let inside_tick = process
262    ///     .source_iter(q!(vec![1, 2, 3, 4]))
263    ///     .batch(&tick, nondet!(/** test */));
264    /// inside_tick.all_ticks()
265    /// # }, |mut stream| async move {
266    /// // 1, 2, 3, 4
267    /// # for w in vec![1, 2, 3, 4] {
268    /// #     assert_eq!(stream.next().await.unwrap(), w);
269    /// # }
270    /// # }));
271    /// # }
272    /// ```
273    fn tick(&self) -> Tick<Self> {
274        if let LocationId::Tick(_, _) = self.id() {
275            panic!("cannot create nested ticks");
276        }
277
278        let id = self.flow_state().borrow_mut().next_clock_id();
279        Tick {
280            id,
281            l: self.clone(),
282        }
283    }
284
285    /// Creates an unbounded stream that continuously emits unit values `()`.
286    ///
287    /// This is useful for driving computations that need to run continuously,
288    /// such as polling or heartbeat mechanisms.
289    ///
290    /// # Example
291    /// ```rust
292    /// # #[cfg(feature = "deploy")] {
293    /// # use hydro_lang::prelude::*;
294    /// # use futures::StreamExt;
295    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
296    /// let tick = process.tick();
297    /// process.spin()
298    ///     .batch(&tick, nondet!(/** test */))
299    ///     .map(q!(|_| 42))
300    ///     .all_ticks()
301    /// # }, |mut stream| async move {
302    /// // 42, 42, 42, ...
303    /// # assert_eq!(stream.next().await.unwrap(), 42);
304    /// # assert_eq!(stream.next().await.unwrap(), 42);
305    /// # assert_eq!(stream.next().await.unwrap(), 42);
306    /// # }));
307    /// # }
308    /// ```
309    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
310    where
311        Self: TopLevel<'a> + Sized,
312    {
313        Stream::new(
314            self.clone(),
315            HydroNode::Source {
316                source: HydroSource::Spin(),
317                metadata: self.new_node_metadata(Stream::<
318                    (),
319                    Self,
320                    Unbounded,
321                    TotalOrder,
322                    ExactlyOnce,
323                >::collection_kind()),
324            },
325        )
326    }
327
328    /// Creates a stream from an async [`FuturesStream`].
329    ///
330    /// This is useful for integrating with external async data sources,
331    /// such as network connections or file readers.
332    ///
333    /// # Example
334    /// ```rust
335    /// # #[cfg(feature = "deploy")] {
336    /// # use hydro_lang::prelude::*;
337    /// # use futures::StreamExt;
338    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
339    /// process.source_stream(q!(futures::stream::iter(vec![1, 2, 3])))
340    /// # }, |mut stream| async move {
341    /// // 1, 2, 3
342    /// # for w in vec![1, 2, 3] {
343    /// #     assert_eq!(stream.next().await.unwrap(), w);
344    /// # }
345    /// # }));
346    /// # }
347    /// ```
348    fn source_stream<T, E>(
349        &self,
350        e: impl QuotedWithContext<'a, E, Self>,
351    ) -> Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>
352    where
353        E: FuturesStream<Item = T> + Unpin,
354        Self: TopLevel<'a> + Sized,
355    {
356        let e = e.splice_untyped_ctx(self);
357
358        let target_location = self.drop_consistency();
359        Stream::new(
360            target_location.clone(),
361            HydroNode::Source {
362                source: HydroSource::Stream(e.into()),
363                metadata: target_location.new_node_metadata(Stream::<
364                    T,
365                    Self::DropConsistency,
366                    Unbounded,
367                    TotalOrder,
368                    ExactlyOnce,
369                >::collection_kind()),
370            },
371        )
372    }
373
374    /// Creates a bounded stream from an iterator.
375    ///
376    /// The iterator is evaluated once at runtime, and all elements are emitted
377    /// in order. This is useful for creating streams from static data or
378    /// for testing.
379    ///
380    /// # Example
381    /// ```rust
382    /// # #[cfg(feature = "deploy")] {
383    /// # use hydro_lang::prelude::*;
384    /// # use futures::StreamExt;
385    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
386    /// process.source_iter(q!(vec![1, 2, 3, 4]))
387    /// # }, |mut stream| async move {
388    /// // 1, 2, 3, 4
389    /// # for w in vec![1, 2, 3, 4] {
390    /// #     assert_eq!(stream.next().await.unwrap(), w);
391    /// # }
392    /// # }));
393    /// # }
394    /// ```
395    fn source_iter<T, E>(
396        &self,
397        e: impl QuotedWithContext<'a, E, Self>,
398    ) -> Stream<T, Self::DropConsistency, Bounded, TotalOrder, ExactlyOnce>
399    where
400        E: IntoIterator<Item = T>,
401        Self: Sized,
402    {
403        let e = e.splice_typed_ctx(self);
404
405        let target_location = self.drop_consistency();
406        Stream::new(
407            target_location.clone(),
408            HydroNode::Source {
409                source: HydroSource::Iter(e.into()),
410                metadata: target_location.new_node_metadata(Stream::<
411                    T,
412                    Self::DropConsistency,
413                    Bounded,
414                    TotalOrder,
415                    ExactlyOnce,
416                >::collection_kind()),
417            },
418        )
419    }
420
421    #[deprecated(note = "use .source_cluster_membership_stream(...) instead")]
422    /// Creates a stream of membership events for a cluster.
423    ///
424    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
425    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
426    /// keyed by the [`MemberId`] of the cluster member.
427    ///
428    /// This is useful for implementing protocols that need to track cluster membership,
429    /// such as broadcasting to all members or detecting failures.
430    ///
431    /// # Non-Determinism
432    /// This stream is non-deterministic because the timing of membership events, for example
433    /// if a node leaves, the membership event may not be received if the node left before the
434    /// stream was created.
435    ///
436    /// # Example
437    /// ```rust
438    /// # #[cfg(feature = "deploy")] {
439    /// # use hydro_lang::prelude::*;
440    /// # use futures::StreamExt;
441    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
442    /// let p1 = flow.process::<()>();
443    /// let workers: Cluster<()> = flow.cluster::<()>();
444    /// # // do nothing on each worker
445    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
446    /// let cluster_members = p1.source_cluster_members(&workers, nondet!(/** late joiners may miss events */));
447    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
448    /// // if there are 4 members in the cluster, we would see a join event for each
449    /// // { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }
450    /// # }, |mut stream| async move {
451    /// # let mut results = Vec::new();
452    /// # for w in 0..4 {
453    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
454    /// # }
455    /// # results.sort();
456    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
457    /// # }));
458    /// # }
459    /// ```
460    fn source_cluster_members<C: 'a>(
461        &self,
462        cluster: &Cluster<'a, C>,
463        nondet_start: NonDet,
464    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::DropConsistency, Unbounded>
465    where
466        Self: TopLevel<'a> + Sized,
467    {
468        self.source_cluster_membership_stream(cluster, nondet_start)
469    }
470
471    /// Creates a stream of membership events for a cluster.
472    ///
473    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
474    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
475    /// keyed by the [`MemberId`] of the cluster member.
476    ///
477    /// This is useful for implementing protocols that need to track cluster membership,
478    /// such as broadcasting to all members or detecting failures.
479    ///
480    /// # Non-Determinism
481    /// This stream is non-deterministic because the timing of membership events, for example
482    /// if a node leaves, the membership event may not be received if the node left before the
483    /// stream was created.
484    ///
485    /// # Example
486    /// ```rust
487    /// # #[cfg(feature = "deploy")] {
488    /// # use hydro_lang::prelude::*;
489    /// # use futures::StreamExt;
490    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
491    /// let p1 = flow.process::<()>();
492    /// let workers: Cluster<()> = flow.cluster::<()>();
493    /// # // do nothing on each worker
494    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
495    /// let cluster_members = p1.source_cluster_membership_stream(&workers, nondet!(/** late joiners may miss events */));
496    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
497    /// // if there are 4 members in the cluster, we would see a join event for each
498    /// // { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }
499    /// # }, |mut stream| async move {
500    /// # let mut results = Vec::new();
501    /// # for w in 0..4 {
502    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
503    /// # }
504    /// # results.sort();
505    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
506    /// # }));
507    /// # }
508    /// ```
509    fn source_cluster_membership_stream<C: 'a>(
510        &self,
511        cluster: &Cluster<'a, C>,
512        _nondet_start: NonDet,
513    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::DropConsistency, Unbounded>
514    where
515        Self: TopLevel<'a> + Sized,
516    {
517        let target_consistency = self.drop_consistency();
518        Stream::new(
519            target_consistency.clone(),
520            HydroNode::Source {
521                source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
522                metadata: target_consistency.new_node_metadata(Stream::<
523                    (TaglessMemberId, MembershipEvent),
524                    Self,
525                    Unbounded,
526                    TotalOrder,
527                    ExactlyOnce,
528                >::collection_kind(
529                )),
530            },
531        )
532        .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
533        .into_keyed()
534    }
535
536    /// Creates a one-way connection from an external process to receive raw bytes.
537    ///
538    /// Returns a port handle for the external process to connect to, and a stream
539    /// of received byte buffers.
540    ///
541    /// For bidirectional communication or typed data, see [`Location::bind_single_client`]
542    /// or [`Location::source_external_bincode`].
543    fn source_external_bytes<L>(
544        &self,
545        from: &External<L>,
546    ) -> (
547        ExternalBytesPort,
548        Stream<BytesMut, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
549    )
550    where
551        Self: TopLevel<'a> + Sized,
552    {
553        let (port, stream, sink) =
554            self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
555
556        sink.complete(stream.location().source_iter(q!([])));
557
558        (port, stream)
559    }
560
561    /// Creates a one-way connection from an external process to receive bincode-serialized data.
562    ///
563    /// Returns a sink handle for the external process to send data to, and a stream
564    /// of received values.
565    ///
566    /// For bidirectional communication, see [`Location::bind_single_client_bincode`].
567    #[expect(clippy::type_complexity, reason = "stream markers")]
568    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
569        &self,
570        from: &External<L>,
571    ) -> (
572        ExternalBincodeSink<T, NotMany, O, R>,
573        Stream<T, Self::DropConsistency, Unbounded, O, R>,
574    )
575    where
576        Self: TopLevel<'a> + Sized,
577        T: Serialize + DeserializeOwned,
578    {
579        let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
580        sink.complete(stream.location().source_iter(q!([])));
581
582        (
583            ExternalBincodeSink {
584                process_key: from.key,
585                port_id: port.port_id,
586                _phantom: PhantomData,
587            },
588            stream.weaken_ordering().weaken_retries(),
589        )
590    }
591
592    /// Creates an external input stream for embedded deployment mode.
593    ///
594    /// The `name` parameter specifies the name of the generated function parameter
595    /// that will supply data to this stream at runtime. The generated function will
596    /// accept an `impl Stream<Item = T> + Unpin` argument with this name.
597    fn embedded_input<T>(
598        &self,
599        name: impl Into<String>,
600    ) -> Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>
601    where
602        Self: TopLevel<'a> + Sized,
603    {
604        let ident = syn::Ident::new(&name.into(), Span::call_site());
605
606        let target_location = self.drop_consistency();
607        Stream::new(
608            target_location.clone(),
609            HydroNode::Source {
610                source: HydroSource::Embedded(ident),
611                metadata: target_location.new_node_metadata(Stream::<
612                    T,
613                    Self,
614                    Unbounded,
615                    TotalOrder,
616                    ExactlyOnce,
617                >::collection_kind()),
618            },
619        )
620    }
621
622    /// Creates an embedded singleton input for embedded deployment mode.
623    ///
624    /// The `name` parameter specifies the name of the generated function parameter
625    /// that will supply data to this singleton at runtime. The generated function will
626    /// accept a plain `T` parameter with this name.
627    fn embedded_singleton_input<T>(
628        &self,
629        name: impl Into<String>,
630    ) -> Singleton<T, Self::DropConsistency, Bounded>
631    where
632        Self: TopLevel<'a> + Sized,
633    {
634        let ident = syn::Ident::new(&name.into(), Span::call_site());
635
636        let target_location = self.drop_consistency();
637        Singleton::new(
638            target_location.clone(),
639            HydroNode::Source {
640                source: HydroSource::EmbeddedSingleton(ident),
641                metadata: target_location
642                    .new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
643            },
644        )
645    }
646
647    /// Establishes a server on this location to receive a bidirectional connection from a single
648    /// client, identified by the given `External` handle. Returns a port handle for the external
649    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
650    /// messages.
651    ///
652    /// # Example
653    /// ```rust
654    /// # #[cfg(feature = "deploy")] {
655    /// # use hydro_lang::prelude::*;
656    /// # use hydro_deploy::Deployment;
657    /// # use futures::{SinkExt, StreamExt};
658    /// # tokio_test::block_on(async {
659    /// # use bytes::Bytes;
660    /// # use hydro_lang::location::NetworkHint;
661    /// # use tokio_util::codec::LengthDelimitedCodec;
662    /// # let mut flow = FlowBuilder::new();
663    /// let node = flow.process::<()>();
664    /// let external = flow.external::<()>();
665    /// let (port, incoming, outgoing) =
666    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
667    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
668    ///     let mut resp: Vec<u8> = data.into();
669    ///     resp.push(42);
670    ///     resp.into() // : Bytes
671    /// })));
672    ///
673    /// # let mut deployment = Deployment::new();
674    /// let nodes = flow // ... with_process and with_external
675    /// #     .with_process(&node, deployment.Localhost())
676    /// #     .with_external(&external, deployment.Localhost())
677    /// #     .deploy(&mut deployment);
678    ///
679    /// deployment.deploy().await.unwrap();
680    /// deployment.start().await.unwrap();
681    ///
682    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
683    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
684    /// assert_eq!(
685    ///     external_out.next().await.unwrap().unwrap(),
686    ///     vec![1, 2, 3, 42]
687    /// );
688    /// # });
689    /// # }
690    /// ```
691    #[expect(clippy::type_complexity, reason = "stream markers")]
692    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
693        &self,
694        from: &External<L>,
695        port_hint: NetworkHint,
696    ) -> (
697        ExternalBytesPort<NotMany>,
698        Stream<<Codec as Decoder>::Item, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
699        ForwardHandle<'a, Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
700    )
701    where
702        Self: TopLevel<'a> + Sized,
703    {
704        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
705        let target_consistency = self.drop_consistency();
706
707        let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
708            T,
709            Self::DropConsistency,
710            Unbounded,
711            TotalOrder,
712            ExactlyOnce,
713        >>();
714        let mut flow_state_borrow = self.flow_state().borrow_mut();
715
716        flow_state_borrow.push_root(HydroRoot::SendExternal {
717            to_external_key: from.key,
718            to_port_id: next_external_port_id,
719            to_many: false,
720            unpaired: false,
721            serialize_fn: None,
722            instantiate_fn: DebugInstantiate::Building,
723            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
724            op_metadata: HydroIrOpMetadata::new(),
725        });
726
727        let raw_stream: Stream<
728            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
729            Self::DropConsistency,
730            Unbounded,
731            TotalOrder,
732            ExactlyOnce,
733        > = Stream::new(
734            target_consistency.clone(),
735            HydroNode::ExternalInput {
736                from_external_key: from.key,
737                from_port_id: next_external_port_id,
738                from_many: false,
739                codec_type: quote_type::<Codec>().into(),
740                port_hint,
741                instantiate_fn: DebugInstantiate::Building,
742                deserialize_fn: None,
743                metadata: target_consistency.new_node_metadata(Stream::<
744                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
745                    Self::DropConsistency,
746                    Unbounded,
747                    TotalOrder,
748                    ExactlyOnce,
749                >::collection_kind(
750                )),
751            },
752        );
753
754        (
755            ExternalBytesPort {
756                process_key: from.key,
757                port_id: next_external_port_id,
758                _phantom: PhantomData,
759            },
760            raw_stream.flatten_ordered(),
761            fwd_ref,
762        )
763    }
764
765    /// Establishes a bidirectional connection from a single external client using bincode serialization.
766    ///
767    /// Returns a port handle for the external process to connect to, a stream of incoming messages,
768    /// and a handle to send outgoing messages. This is a convenience wrapper around
769    /// [`Location::bind_single_client`] that uses bincode for serialization.
770    ///
771    /// # Type Parameters
772    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
773    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
774    #[expect(clippy::type_complexity, reason = "stream markers")]
775    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
776        &self,
777        from: &External<L>,
778    ) -> (
779        ExternalBincodeBidi<InT, OutT, NotMany>,
780        Stream<InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
781        ForwardHandle<'a, Stream<OutT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
782    )
783    where
784        Self: TopLevel<'a> + Sized,
785    {
786        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
787
788        let target_consistency = self.drop_consistency();
789        let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
790            OutT,
791            Self::DropConsistency,
792            Unbounded,
793            TotalOrder,
794            ExactlyOnce,
795        >>();
796        let mut flow_state_borrow = self.flow_state().borrow_mut();
797
798        let root = get_this_crate();
799
800        let out_t_type = quote_type::<OutT>();
801        let ser_fn: syn::Expr = syn::parse_quote! {
802            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
803                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
804            )
805        };
806
807        flow_state_borrow.push_root(HydroRoot::SendExternal {
808            to_external_key: from.key,
809            to_port_id: next_external_port_id,
810            to_many: false,
811            unpaired: false,
812            serialize_fn: Some(ser_fn.into()),
813            instantiate_fn: DebugInstantiate::Building,
814            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
815            op_metadata: HydroIrOpMetadata::new(),
816        });
817
818        let in_t_type = quote_type::<InT>();
819
820        let deser_fn: syn::Expr = syn::parse_quote! {
821            |res| {
822                let b = res.unwrap();
823                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
824            }
825        };
826
827        let raw_stream: Stream<InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce> =
828            Stream::new(
829                target_consistency.clone(),
830                HydroNode::ExternalInput {
831                    from_external_key: from.key,
832                    from_port_id: next_external_port_id,
833                    from_many: false,
834                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
835                    port_hint: NetworkHint::Auto,
836                    instantiate_fn: DebugInstantiate::Building,
837                    deserialize_fn: Some(deser_fn.into()),
838                    metadata: target_consistency.new_node_metadata(Stream::<
839                        InT,
840                        Self::DropConsistency,
841                        Unbounded,
842                        TotalOrder,
843                        ExactlyOnce,
844                    >::collection_kind(
845                    )),
846                },
847            );
848
849        (
850            ExternalBincodeBidi {
851                process_key: from.key,
852                port_id: next_external_port_id,
853                _phantom: PhantomData,
854            },
855            raw_stream,
856            fwd_ref,
857        )
858    }
859
860    /// Establishes a server on this location to receive bidirectional connections from multiple
861    /// external clients using raw bytes.
862    ///
863    /// Unlike [`Location::bind_single_client`], this method supports multiple concurrent client
864    /// connections. Each client is assigned a unique `u64` identifier.
865    ///
866    /// Returns:
867    /// - A port handle for external processes to connect to
868    /// - A keyed stream of incoming messages, keyed by client ID
869    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
870    /// - A handle to send outgoing messages, keyed by client ID
871    #[expect(clippy::type_complexity, reason = "stream markers")]
872    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
873        &self,
874        from: &External<L>,
875        port_hint: NetworkHint,
876    ) -> (
877        ExternalBytesPort<Many>,
878        KeyedStream<
879            u64,
880            <Codec as Decoder>::Item,
881            Self::DropConsistency,
882            Unbounded,
883            TotalOrder,
884            ExactlyOnce,
885        >,
886        KeyedStream<
887            u64,
888            MembershipEvent,
889            Self::DropConsistency,
890            Unbounded,
891            TotalOrder,
892            ExactlyOnce,
893        >,
894        ForwardHandle<
895            'a,
896            KeyedStream<u64, T, Self::DropConsistency, Unbounded, NoOrder, ExactlyOnce>,
897        >,
898    )
899    where
900        Self: TopLevel<'a> + Sized,
901    {
902        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
903
904        let target_consistency = self.drop_consistency();
905        let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
906            u64,
907            T,
908            Self::DropConsistency,
909            Unbounded,
910            NoOrder,
911            ExactlyOnce,
912        >>();
913        let mut flow_state_borrow = self.flow_state().borrow_mut();
914
915        flow_state_borrow.push_root(HydroRoot::SendExternal {
916            to_external_key: from.key,
917            to_port_id: next_external_port_id,
918            to_many: true,
919            unpaired: false,
920            serialize_fn: None,
921            instantiate_fn: DebugInstantiate::Building,
922            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
923            op_metadata: HydroIrOpMetadata::new(),
924        });
925
926        let raw_stream: Stream<
927            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
928            Self::DropConsistency,
929            Unbounded,
930            TotalOrder,
931            ExactlyOnce,
932        > = Stream::new(
933            target_consistency.clone(),
934            HydroNode::ExternalInput {
935                from_external_key: from.key,
936                from_port_id: next_external_port_id,
937                from_many: true,
938                codec_type: quote_type::<Codec>().into(),
939                port_hint,
940                instantiate_fn: DebugInstantiate::Building,
941                deserialize_fn: None,
942                metadata: target_consistency.new_node_metadata(Stream::<
943                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
944                    Self::DropConsistency,
945                    Unbounded,
946                    TotalOrder,
947                    ExactlyOnce,
948                >::collection_kind(
949                )),
950            },
951        );
952
953        let membership_stream_ident = syn::Ident::new(
954            &format!(
955                "__hydro_deploy_many_{}_{}_membership",
956                from.key, next_external_port_id
957            ),
958            Span::call_site(),
959        );
960        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
961        let raw_membership_stream: KeyedStream<
962            u64,
963            bool,
964            Self::DropConsistency,
965            Unbounded,
966            TotalOrder,
967            ExactlyOnce,
968        > = KeyedStream::new(
969            target_consistency.clone(),
970            HydroNode::Source {
971                source: HydroSource::Stream(membership_stream_expr.into()),
972                metadata: target_consistency.new_node_metadata(KeyedStream::<
973                    u64,
974                    bool,
975                    Self::DropConsistency,
976                    Unbounded,
977                    TotalOrder,
978                    ExactlyOnce,
979                >::collection_kind(
980                )),
981            },
982        );
983
984        (
985            ExternalBytesPort {
986                process_key: from.key,
987                port_id: next_external_port_id,
988                _phantom: PhantomData,
989            },
990            raw_stream
991                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
992                .into_keyed(),
993            raw_membership_stream.map(q!(|join| {
994                if join {
995                    MembershipEvent::Joined
996                } else {
997                    MembershipEvent::Left
998                }
999            })),
1000            fwd_ref,
1001        )
1002    }
1003
1004    /// Establishes a server on this location to receive bidirectional connections from multiple
1005    /// external clients using bincode serialization.
1006    ///
1007    /// Unlike [`Location::bind_single_client_bincode`], this method supports multiple concurrent
1008    /// client connections. Each client is assigned a unique `u64` identifier.
1009    ///
1010    /// Returns:
1011    /// - A port handle for external processes to connect to
1012    /// - A keyed stream of incoming messages, keyed by client ID
1013    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
1014    /// - A handle to send outgoing messages, keyed by client ID
1015    ///
1016    /// # Type Parameters
1017    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
1018    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
1019    #[expect(clippy::type_complexity, reason = "stream markers")]
1020    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
1021        &self,
1022        from: &External<L>,
1023    ) -> (
1024        ExternalBincodeBidi<InT, OutT, Many>,
1025        KeyedStream<u64, InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
1026        KeyedStream<
1027            u64,
1028            MembershipEvent,
1029            Self::DropConsistency,
1030            Unbounded,
1031            TotalOrder,
1032            ExactlyOnce,
1033        >,
1034        ForwardHandle<
1035            'a,
1036            KeyedStream<u64, OutT, Self::DropConsistency, Unbounded, NoOrder, ExactlyOnce>,
1037        >,
1038    )
1039    where
1040        Self: TopLevel<'a> + Sized,
1041    {
1042        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
1043
1044        let target_consistency = self.drop_consistency();
1045        let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
1046            u64,
1047            OutT,
1048            Self::DropConsistency,
1049            Unbounded,
1050            NoOrder,
1051            ExactlyOnce,
1052        >>();
1053        let mut flow_state_borrow = self.flow_state().borrow_mut();
1054
1055        let root = get_this_crate();
1056
1057        let out_t_type = quote_type::<OutT>();
1058        let ser_fn: syn::Expr = syn::parse_quote! {
1059            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
1060                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
1061            )
1062        };
1063
1064        flow_state_borrow.push_root(HydroRoot::SendExternal {
1065            to_external_key: from.key,
1066            to_port_id: next_external_port_id,
1067            to_many: true,
1068            unpaired: false,
1069            serialize_fn: Some(ser_fn.into()),
1070            instantiate_fn: DebugInstantiate::Building,
1071            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
1072            op_metadata: HydroIrOpMetadata::new(),
1073        });
1074
1075        let in_t_type = quote_type::<InT>();
1076
1077        let deser_fn: syn::Expr = syn::parse_quote! {
1078            |res| {
1079                let (id, b) = res.unwrap();
1080                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
1081            }
1082        };
1083
1084        let raw_stream: KeyedStream<
1085            u64,
1086            InT,
1087            Self::DropConsistency,
1088            Unbounded,
1089            TotalOrder,
1090            ExactlyOnce,
1091        > = KeyedStream::new(
1092            target_consistency.clone(),
1093            HydroNode::ExternalInput {
1094                from_external_key: from.key,
1095                from_port_id: next_external_port_id,
1096                from_many: true,
1097                codec_type: quote_type::<LengthDelimitedCodec>().into(),
1098                port_hint: NetworkHint::Auto,
1099                instantiate_fn: DebugInstantiate::Building,
1100                deserialize_fn: Some(deser_fn.into()),
1101                metadata: target_consistency.new_node_metadata(KeyedStream::<
1102                    u64,
1103                    InT,
1104                    Self::DropConsistency,
1105                    Unbounded,
1106                    TotalOrder,
1107                    ExactlyOnce,
1108                >::collection_kind(
1109                )),
1110            },
1111        );
1112
1113        let membership_stream_ident = syn::Ident::new(
1114            &format!(
1115                "__hydro_deploy_many_{}_{}_membership",
1116                from.key, next_external_port_id
1117            ),
1118            Span::call_site(),
1119        );
1120        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
1121        let raw_membership_stream: KeyedStream<
1122            u64,
1123            bool,
1124            Self::DropConsistency,
1125            Unbounded,
1126            TotalOrder,
1127            ExactlyOnce,
1128        > = KeyedStream::new(
1129            target_consistency.clone(),
1130            HydroNode::Source {
1131                source: HydroSource::Stream(membership_stream_expr.into()),
1132                metadata: target_consistency.new_node_metadata(KeyedStream::<
1133                    u64,
1134                    bool,
1135                    Self::DropConsistency,
1136                    Unbounded,
1137                    TotalOrder,
1138                    ExactlyOnce,
1139                >::collection_kind(
1140                )),
1141            },
1142        );
1143
1144        (
1145            ExternalBincodeBidi {
1146                process_key: from.key,
1147                port_id: next_external_port_id,
1148                _phantom: PhantomData,
1149            },
1150            raw_stream,
1151            raw_membership_stream.map(q!(|join| {
1152                if join {
1153                    MembershipEvent::Joined
1154                } else {
1155                    MembershipEvent::Left
1156                }
1157            })),
1158            fwd_ref,
1159        )
1160    }
1161
1162    /// Bridges user-owned async code to the dataflow as a **bidirectional sidecar**.
1163    ///
1164    /// The closure is called once at startup and must return a
1165    /// `(Stream<InT>, Sink<OutT>)` pair. The framework reads from the stream
1166    /// (items flowing *into* the dataflow) and writes to the sink (items flowing
1167    /// *out* to the sidecar). The user controls buffering, backpressure, and
1168    /// internal lifecycle — Hydro only sees the stream/sink interface.
1169    ///
1170    /// This will hopefully make it easy to integrate hydro with existing frameworks,
1171    /// for example grpc code generated service endpoints.
1172    ///
1173    /// # Returns
1174    /// - A `Stream<InT>` carrying items from the sidecar into the dataflow.
1175    /// - A [`ForwardHandle`] expecting a `Stream<OutT>` that the user completes
1176    ///   with items destined for the sidecar.
1177    ///
1178    /// # Example
1179    ///
1180    /// ```rust
1181    /// # #[cfg(feature = "deploy")] {
1182    /// # use hydro_lang::prelude::*;
1183    /// # use futures::StreamExt;
1184    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1185    /// // Sidecar that echoes whatever it receives back into the dataflow.
1186    /// let (inbound, response_handle) = process.sidecar_bidi::<String, String, _>(q!(|| {
1187    ///     let (to_df_tx, to_df_rx) = tokio::sync::mpsc::channel::<String>(16);
1188    ///     let (from_df_tx, mut from_df_rx) = tokio::sync::mpsc::channel::<String>(16);
1189    ///
1190    ///     // Spawn the sidecar: echoes items from the dataflow back into it.
1191    ///     tokio::spawn(async move {
1192    ///         while let Some(msg) = from_df_rx.recv().await {
1193    ///             to_df_tx.send(msg).await.ok();
1194    ///         }
1195    ///     });
1196    ///
1197    ///     // Return the framework-facing ends (concrete types, no boxing needed).
1198    ///     let stream = tokio_stream::wrappers::ReceiverStream::new(to_df_rx);
1199    ///     let sink = tokio_util::sync::PollSender::new(from_df_tx);
1200    ///     (stream, sink)
1201    /// }));
1202    ///
1203    /// // Send "hello" into the sidecar via the response channel.
1204    /// let input = process.source_stream(q!(futures::stream::iter(vec!["hello".to_string()])));
1205    /// response_handle.complete(input);
1206    ///
1207    /// // The sidecar echoes it back — assert we get "hello" out.
1208    /// inbound
1209    /// # }, |mut stream| async move {
1210    /// #     assert_eq!(stream.next().await.unwrap(), "hello");
1211    /// # }));
1212    /// # }
1213    /// ```
1214    #[expect(clippy::type_complexity, reason = "stream markers")]
1215    fn sidecar_bidi<InT: 'static, OutT: 'static, F>(
1216        &self,
1217        sidecar: impl QuotedWithContext<'a, F, Self>,
1218    ) -> (
1219        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
1220        ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
1221    )
1222    where
1223        Self: Sized + TopLevel<'a>,
1224    {
1225        let location_key = Location::id(self).key();
1226
1227        let sidecar_id = self.flow_state().borrow_mut().next_sidecar_id();
1228        let (stream_ident, sink_ident) = sidecar_id.idents();
1229
1230        let sidecar_closure: syn::Expr = sidecar.splice_untyped_ctx(self);
1231        self.flow_state()
1232            .borrow_mut()
1233            .sidecars
1234            .push(crate::compile::builder::Sidecar::Bidi {
1235                location_key,
1236                sidecar_id,
1237                sidecar_closure: Box::new(sidecar_closure),
1238            });
1239
1240        // Inbound stream: reads from the stream returned by the sidecar closure
1241        let source_expr: syn::Expr = parse_quote! {
1242            #stream_ident
1243        };
1244        let inbound: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
1245            self.clone(),
1246            HydroNode::Source {
1247                source: HydroSource::Stream(source_expr.into()),
1248                metadata: self.new_node_metadata(Stream::<
1249                    InT,
1250                    Self,
1251                    Unbounded,  // TODO: maybe bounded sidecars are interesting..?
1252                    TotalOrder, // TODO: NoOrder..?
1253                    ExactlyOnce,
1254                >::collection_kind()),
1255            },
1256        );
1257
1258        // Outbound: forward_ref cycle feeding the sink returned by the sidecar closure
1259        let (fwd_ref, to_sink): (
1260            ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
1261            Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>,
1262        ) = self.forward_ref();
1263
1264        let sink_expr: syn::Expr = parse_quote! {
1265            #sink_ident
1266        };
1267
1268        let sink_input_ir = to_sink.ir_node.replace(HydroNode::Placeholder);
1269        self.flow_state()
1270            .borrow_mut()
1271            .try_push_root(HydroRoot::DestSink {
1272                sink: sink_expr.into(),
1273                input: Box::new(sink_input_ir),
1274                op_metadata: HydroIrOpMetadata::new(),
1275            });
1276
1277        (inbound, fwd_ref)
1278    }
1279
1280    /// Constructs a [`Singleton`] materialized at this location with the given static value.
1281    ///
1282    /// See also: [`Tick::singleton`], for creating a singleton _within_ a tick, which requires
1283    /// `T: Clone`.
1284    ///
1285    /// # Example
1286    /// ```rust
1287    /// # #[cfg(feature = "deploy")] {
1288    /// # use hydro_lang::prelude::*;
1289    /// # use futures::StreamExt;
1290    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1291    /// let singleton = process.singleton(q!(5));
1292    /// # singleton.into_stream()
1293    /// # }, |mut stream| async move {
1294    /// // 5
1295    /// # assert_eq!(stream.next().await.unwrap(), 5);
1296    /// # }));
1297    /// # }
1298    /// ```
1299    fn singleton<T>(
1300        &self,
1301        e: impl QuotedWithContext<'a, T, Self>,
1302    ) -> Singleton<T, Self::DropConsistency, Bounded>
1303    where
1304        Self: Sized,
1305    {
1306        let e = e.splice_untyped_ctx(self);
1307
1308        let target_location = self.drop_consistency();
1309        Singleton::new(
1310            target_location.clone(),
1311            HydroNode::SingletonSource {
1312                value: e.into(),
1313                first_tick_only: false,
1314                metadata: target_location.new_node_metadata(Singleton::<
1315                    T,
1316                    Self::DropConsistency,
1317                    Bounded,
1318                >::collection_kind()),
1319            },
1320        )
1321    }
1322
1323    /// Constructs a [`Singleton`] by resolving an async [`Future`] to completion.
1324    ///
1325    /// This is a convenience method equivalent to
1326    /// `self.singleton(future_expr).resolve_future_blocking()`, which is a common
1327    /// pattern when initializing a singleton from an async computation.
1328    ///
1329    /// # Example
1330    /// ```rust
1331    /// # #[cfg(feature = "deploy")] {
1332    /// # use hydro_lang::prelude::*;
1333    /// # use futures::StreamExt;
1334    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1335    /// let singleton = process.singleton_future(q!(async { 42 }));
1336    /// singleton.into_stream()
1337    /// # }, |mut stream| async move {
1338    /// // 42
1339    /// # assert_eq!(stream.next().await.unwrap(), 42);
1340    /// # }));
1341    /// # }
1342    /// ```
1343    ///
1344    /// [`Future`]: std::future::Future
1345    fn singleton_future<F>(
1346        &self,
1347        e: impl QuotedWithContext<'a, F, Self>,
1348    ) -> Singleton<F::Output, Self::DropConsistency, Bounded>
1349    where
1350        F: Future,
1351        Self: Sized,
1352    {
1353        self.singleton(e).resolve_future_blocking()
1354    }
1355
1356    /// Generates a stream that emits `()` at a fixed interval.
1357    ///
1358    /// The first tick completes immediately. Missed ticks will be scheduled
1359    /// as soon as possible.
1360    ///
1361    /// Because this only emits `()`, the non-determinism of *when* events fire
1362    /// is captured by the `AtLeastOnce` retry semantics downstream, so no
1363    /// [`NonDet`] guard is required.
1364    fn source_interval(
1365        &self,
1366        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1367    ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1368    where
1369        Self: TopLevel<'a> + Sized,
1370    {
1371        self.source_stream(q!(tokio_stream::StreamExt::map(
1372            tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(interval)),
1373            |_| ()
1374        )))
1375        .assert_has_consistency_of_trusted(
1376            manual_proof!(/** interval does not reveal timestamps */),
1377        )
1378    }
1379
1380    /// Generates a stream that emits `()` at a fixed interval, after an
1381    /// initial delay.
1382    ///
1383    /// Because this only emits `()`, the non-determinism of *when* events fire
1384    /// is captured by the `AtLeastOnce` retry semantics downstream, so no
1385    /// [`NonDet`] guard is required.
1386    fn source_interval_delayed(
1387        &self,
1388        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1389        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1390    ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1391    where
1392        Self: TopLevel<'a> + Sized,
1393    {
1394        self.source_stream(q!(tokio_stream::StreamExt::map(
1395            tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
1396                tokio::time::Instant::now() + delay,
1397                interval,
1398            )),
1399            |_| ()
1400        )))
1401        .assert_has_consistency_of_trusted(
1402            manual_proof!(/** interval does not reveal timestamps */),
1403        )
1404    }
1405
1406    /// Creates a forward reference, allowing a stream to be used before its source is defined.
1407    ///
1408    /// Returns a `(handle, placeholder)` pair. Use the placeholder in the dataflow graph,
1409    /// then call `handle.complete(actual_stream)` to wire in the real source.
1410    ///
1411    /// This is useful for mutually-dependent dataflows or when the definition order
1412    /// doesn't match the data flow direction. For feedback loops, prefer [`Tick::cycle`]
1413    /// instead, which automatically defers values by one tick.
1414    ///
1415    /// # Panics
1416    /// Panics if the forward reference creates a synchronous cycle (i.e., the completed
1417    /// stream transitively depends on the placeholder without a `defer_tick` or network
1418    /// hop in between).
1419    ///
1420    /// # Example
1421    /// ```rust
1422    /// # #[cfg(feature = "deploy")] {
1423    /// # use hydro_lang::prelude::*;
1424    /// # use hydro_lang::live_collections::stream::NoOrder;
1425    /// # use futures::StreamExt;
1426    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1427    /// // Create a forward reference to define a stream that will be completed later
1428    /// let (complete, forward_stream) = process.forward_ref::<Stream<i32, _, _, NoOrder>>();
1429    ///
1430    /// // Use the forward reference as input to another computation
1431    /// let output: Stream<_, _, _, NoOrder> = forward_stream.map(q!(|x| x * 2));
1432    ///
1433    /// // Complete the forward reference with the actual source
1434    /// let source: Stream<_, _, Unbounded> = process.source_iter(q!([1, 2, 3])).into();
1435    /// complete.complete(source);
1436    /// output
1437    /// # }, |mut stream| async move {
1438    /// // 2, 4, 6
1439    /// # assert_eq!(stream.next().await.unwrap(), 2);
1440    /// # assert_eq!(stream.next().await.unwrap(), 4);
1441    /// # assert_eq!(stream.next().await.unwrap(), 6);
1442    /// # }));
1443    /// # }
1444    /// ```
1445    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1446    where
1447        S: CycleCollection<'a, ForwardRef, Location = Self>,
1448    {
1449        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1450        (
1451            ForwardHandle::new(cycle_id, Location::id(self)),
1452            S::create_source(cycle_id, self.clone()),
1453        )
1454    }
1455}
1456
1457#[cfg(feature = "deploy")]
1458#[cfg(test)]
1459mod tests {
1460    use std::collections::HashSet;
1461
1462    use futures::{SinkExt, StreamExt};
1463    use hydro_deploy::Deployment;
1464    use stageleft::q;
1465    use tokio_util::codec::LengthDelimitedCodec;
1466
1467    use crate::compile::builder::FlowBuilder;
1468    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1469    use crate::location::{Location, NetworkHint};
1470    use crate::nondet::nondet;
1471
1472    #[tokio::test]
1473    async fn top_level_singleton_replay_cardinality() {
1474        let mut deployment = Deployment::new();
1475
1476        let mut flow = FlowBuilder::new();
1477        let node = flow.process::<()>();
1478        let external = flow.external::<()>();
1479
1480        let (in_port, input) =
1481            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1482        let singleton = node.singleton(q!(123));
1483        let tick = node.tick();
1484        let out = input
1485            .batch(&tick, nondet!(/** test */))
1486            .cross_singleton(singleton.clone().snapshot(&tick, nondet!(/** test */)))
1487            .cross_singleton(
1488                singleton
1489                    .snapshot(&tick, nondet!(/** test */))
1490                    .into_stream()
1491                    .count(),
1492            )
1493            .all_ticks()
1494            .send_bincode_external(&external);
1495
1496        let nodes = flow
1497            .with_process(&node, deployment.Localhost())
1498            .with_external(&external, deployment.Localhost())
1499            .deploy(&mut deployment);
1500
1501        deployment.deploy().await.unwrap();
1502
1503        let mut external_in = nodes.connect(in_port).await;
1504        let mut external_out = nodes.connect(out).await;
1505
1506        deployment.start().await.unwrap();
1507
1508        external_in.send(1).await.unwrap();
1509        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1510
1511        external_in.send(2).await.unwrap();
1512        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1513    }
1514
1515    #[tokio::test]
1516    async fn tick_singleton_replay_cardinality() {
1517        let mut deployment = Deployment::new();
1518
1519        let mut flow = FlowBuilder::new();
1520        let node = flow.process::<()>();
1521        let external = flow.external::<()>();
1522
1523        let (in_port, input) =
1524            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1525        let tick = node.tick();
1526        let singleton = tick.singleton(q!(123));
1527        let out = input
1528            .batch(&tick, nondet!(/** test */))
1529            .cross_singleton(singleton.clone())
1530            .cross_singleton(singleton.into_stream().count())
1531            .all_ticks()
1532            .send_bincode_external(&external);
1533
1534        let nodes = flow
1535            .with_process(&node, deployment.Localhost())
1536            .with_external(&external, deployment.Localhost())
1537            .deploy(&mut deployment);
1538
1539        deployment.deploy().await.unwrap();
1540
1541        let mut external_in = nodes.connect(in_port).await;
1542        let mut external_out = nodes.connect(out).await;
1543
1544        deployment.start().await.unwrap();
1545
1546        external_in.send(1).await.unwrap();
1547        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1548
1549        external_in.send(2).await.unwrap();
1550        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1551    }
1552
1553    #[tokio::test]
1554    async fn external_bytes() {
1555        let mut deployment = Deployment::new();
1556
1557        let mut flow = FlowBuilder::new();
1558        let first_node = flow.process::<()>();
1559        let external = flow.external::<()>();
1560
1561        let (in_port, input) = first_node.source_external_bytes(&external);
1562        let out = input.send_bincode_external(&external);
1563
1564        let nodes = flow
1565            .with_process(&first_node, deployment.Localhost())
1566            .with_external(&external, deployment.Localhost())
1567            .deploy(&mut deployment);
1568
1569        deployment.deploy().await.unwrap();
1570
1571        let mut external_in = nodes.connect(in_port).await.1;
1572        let mut external_out = nodes.connect(out).await;
1573
1574        deployment.start().await.unwrap();
1575
1576        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1577
1578        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1579    }
1580
1581    #[tokio::test]
1582    async fn multi_external_source() {
1583        let mut deployment = Deployment::new();
1584
1585        let mut flow = FlowBuilder::new();
1586        let first_node = flow.process::<()>();
1587        let external = flow.external::<()>();
1588
1589        let (in_port, input, _membership, complete_sink) =
1590            first_node.bidi_external_many_bincode(&external);
1591        let out = input.entries().send_bincode_external(&external);
1592        complete_sink.complete(
1593            first_node
1594                .source_iter::<(u64, ()), _>(q!([]))
1595                .into_keyed()
1596                .weaken_ordering(),
1597        );
1598
1599        let nodes = flow
1600            .with_process(&first_node, deployment.Localhost())
1601            .with_external(&external, deployment.Localhost())
1602            .deploy(&mut deployment);
1603
1604        deployment.deploy().await.unwrap();
1605
1606        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1607        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1608        let external_out = nodes.connect(out).await;
1609
1610        deployment.start().await.unwrap();
1611
1612        external_in_1.send(123).await.unwrap();
1613        external_in_2.send(456).await.unwrap();
1614
1615        assert_eq!(
1616            external_out.take(2).collect::<HashSet<_>>().await,
1617            vec![(0, 123), (1, 456)].into_iter().collect()
1618        );
1619    }
1620
1621    #[tokio::test]
1622    async fn second_connection_only_multi_source() {
1623        let mut deployment = Deployment::new();
1624
1625        let mut flow = FlowBuilder::new();
1626        let first_node = flow.process::<()>();
1627        let external = flow.external::<()>();
1628
1629        let (in_port, input, _membership, complete_sink) =
1630            first_node.bidi_external_many_bincode(&external);
1631        let out = input.entries().send_bincode_external(&external);
1632        complete_sink.complete(
1633            first_node
1634                .source_iter::<(u64, ()), _>(q!([]))
1635                .into_keyed()
1636                .weaken_ordering(),
1637        );
1638
1639        let nodes = flow
1640            .with_process(&first_node, deployment.Localhost())
1641            .with_external(&external, deployment.Localhost())
1642            .deploy(&mut deployment);
1643
1644        deployment.deploy().await.unwrap();
1645
1646        // intentionally skipped to test stream waking logic
1647        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1648        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1649        let mut external_out = nodes.connect(out).await;
1650
1651        deployment.start().await.unwrap();
1652
1653        external_in_2.send(456).await.unwrap();
1654
1655        assert_eq!(external_out.next().await.unwrap(), (1, 456));
1656    }
1657
1658    #[tokio::test]
1659    async fn multi_external_bytes() {
1660        let mut deployment = Deployment::new();
1661
1662        let mut flow = FlowBuilder::new();
1663        let first_node = flow.process::<()>();
1664        let external = flow.external::<()>();
1665
1666        let (in_port, input, _membership, complete_sink) = first_node
1667            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1668        let out = input.entries().send_bincode_external(&external);
1669        complete_sink.complete(
1670            first_node
1671                .source_iter(q!([]))
1672                .into_keyed()
1673                .weaken_ordering(),
1674        );
1675
1676        let nodes = flow
1677            .with_process(&first_node, deployment.Localhost())
1678            .with_external(&external, deployment.Localhost())
1679            .deploy(&mut deployment);
1680
1681        deployment.deploy().await.unwrap();
1682
1683        let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1684        let mut external_in_2 = nodes.connect(in_port).await.1;
1685        let external_out = nodes.connect(out).await;
1686
1687        deployment.start().await.unwrap();
1688
1689        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1690        external_in_2.send(vec![4, 5].into()).await.unwrap();
1691
1692        assert_eq!(
1693            external_out.take(2).collect::<HashSet<_>>().await,
1694            vec![
1695                (0, (&[1u8, 2, 3] as &[u8]).into()),
1696                (1, (&[4u8, 5] as &[u8]).into())
1697            ]
1698            .into_iter()
1699            .collect()
1700        );
1701    }
1702
1703    #[tokio::test]
1704    async fn single_client_external_bytes() {
1705        let mut deployment = Deployment::new();
1706        let mut flow = FlowBuilder::new();
1707        let first_node = flow.process::<()>();
1708        let external = flow.external::<()>();
1709        let (port, input, complete_sink) = first_node
1710            .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1711        complete_sink.complete(input.map(q!(|data| {
1712            let mut resp: Vec<u8> = data.into();
1713            resp.push(42);
1714            resp.into() // : Bytes
1715        })));
1716
1717        let nodes = flow
1718            .with_process(&first_node, deployment.Localhost())
1719            .with_external(&external, deployment.Localhost())
1720            .deploy(&mut deployment);
1721
1722        deployment.deploy().await.unwrap();
1723        deployment.start().await.unwrap();
1724
1725        let (mut external_out, mut external_in) = nodes.connect(port).await;
1726
1727        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1728        assert_eq!(
1729            external_out.next().await.unwrap().unwrap(),
1730            vec![1, 2, 3, 42]
1731        );
1732    }
1733
1734    #[tokio::test]
1735    async fn echo_external_bytes() {
1736        let mut deployment = Deployment::new();
1737
1738        let mut flow = FlowBuilder::new();
1739        let first_node = flow.process::<()>();
1740        let external = flow.external::<()>();
1741
1742        let (port, input, _membership, complete_sink) = first_node
1743            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1744        complete_sink
1745            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1746
1747        let nodes = flow
1748            .with_process(&first_node, deployment.Localhost())
1749            .with_external(&external, deployment.Localhost())
1750            .deploy(&mut deployment);
1751
1752        deployment.deploy().await.unwrap();
1753
1754        let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1755        let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1756
1757        deployment.start().await.unwrap();
1758
1759        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1760        external_in_2.send(vec![4, 5].into()).await.unwrap();
1761
1762        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1763        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1764    }
1765
1766    #[tokio::test]
1767    async fn echo_external_bincode() {
1768        let mut deployment = Deployment::new();
1769
1770        let mut flow = FlowBuilder::new();
1771        let first_node = flow.process::<()>();
1772        let external = flow.external::<()>();
1773
1774        let (port, input, _membership, complete_sink) =
1775            first_node.bidi_external_many_bincode(&external);
1776        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1777
1778        let nodes = flow
1779            .with_process(&first_node, deployment.Localhost())
1780            .with_external(&external, deployment.Localhost())
1781            .deploy(&mut deployment);
1782
1783        deployment.deploy().await.unwrap();
1784
1785        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1786        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1787
1788        deployment.start().await.unwrap();
1789
1790        external_in_1.send("hi".to_owned()).await.unwrap();
1791        external_in_2.send("hello".to_owned()).await.unwrap();
1792
1793        assert_eq!(external_out_1.next().await.unwrap(), "HI");
1794        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1795    }
1796
1797    #[tokio::test]
1798    async fn closure_location_name() {
1799        let mut deployment = Deployment::new();
1800        let mut flow = FlowBuilder::new();
1801
1802        enum ClosureProcess {}
1803
1804        let node = flow.process::<ClosureProcess>();
1805        let external = flow.external::<()>();
1806
1807        let (in_port, input) =
1808            node.source_external_bincode::<_, i32, TotalOrder, ExactlyOnce>(&external);
1809        let out = input.send_bincode_external(&external);
1810
1811        let nodes = flow
1812            .with_process(&node, deployment.Localhost())
1813            .with_external(&external, deployment.Localhost())
1814            .deploy(&mut deployment);
1815
1816        deployment.deploy().await.unwrap();
1817
1818        let mut external_in = nodes.connect(in_port).await;
1819        let mut external_out = nodes.connect(out).await;
1820
1821        deployment.start().await.unwrap();
1822
1823        external_in.send(42).await.unwrap();
1824        assert_eq!(external_out.next().await.unwrap(), 42);
1825    }
1826}